# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

"""Implements ProcessPoolExecutor.

The following diagram and text describe the data-flow through the system:

|======================= In-process =====================|== Out-of-process ==|

+----------+     +----------+       +--------+     +-----------+    +---------+
|          |  => | Work Ids |       |        |     | Call Q    |    | Process |
|          |     +----------+       |        |     +-----------+    |  Pool   |
|          |     | ...      |       |        |     | ...       |    +---------+
|          |     | 6        |    => |        |  => | 5, call() | => |         |
|          |     | 7        |       |        |     | ...       |    |         |
| Process  |     | ...      |       | Local  |     +-----------+    | Process |
|  Pool    |     +----------+       | Worker |                      |  #1..n  |
| Executor |                        | Thread |                      |         |
|          |     +------------+     |        |     +-----------+    |         |
|          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
|          |     +------------+     |        |     +-----------+    |         |
|          |     | 6: call()  |     |        |     | ...       |    |         |
|          |     |    future  |     |        |     | 4, result |    |         |
|          |     | ...        |     |        |     | 3, except |    |         |
+----------+     +------------+     +--------+     +-----------+    +---------+

Executor.submit() called:
- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
- adds the id of the _WorkItem to the "Work Ids" queue

Local worker thread:
- reads work ids from the "Work Ids" queue and looks up the corresponding
  _WorkItem from the "Work Items" dict: if the work item has been cancelled
  then it is simply removed from the dict, otherwise it is repackaged as a
  _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
  until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
  calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
- reads _ResultItems from "Result Q", updates the future stored in the
  "Work Items" dict and deletes the dict entry

Process #1..n:
- reads _CallItems from "Call Q", executes the calls, and puts the resulting
  _ResultItems in "Result Q"
"""

__author__ = 'Brian Quinlan (brian@sweetapp.com)'

import os
from concurrent.futures import _base
import queue
import multiprocessing as mp
# This import is required to load the multiprocessing.connection submodule
# so that it can be accessed later as `mp.connection`
import multiprocessing.connection
from multiprocessing.queues import Queue
import threading
import weakref
from functools import partial
import itertools
import sys
from traceback import format_exception


_threads_wakeups = weakref.WeakKeyDictionary()
_global_shutdown = False


class _ThreadWakeup:
    def __init__(self):
        self._closed = False
        self._reader, self._writer = mp.Pipe(duplex=False)

    def close(self):
        if not self._closed:
            self._closed = True
            self._writer.close()
            self._reader.close()

    def wakeup(self):
        if not self._closed:
            self._writer.send_bytes(b"")

    def clear(self):
        if not self._closed:
            while self._reader.poll():
                self._reader.recv_bytes()
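
# A minimal, hedged sketch of the wakeup mechanism above (standalone example,
# not part of this module): one byte written to the non-duplex pipe is enough
# to make a blocking multiprocessing.connection.wait() return.
#
#     import multiprocessing as mp
#     import multiprocessing.connection
#
#     if __name__ == '__main__':
#         reader, writer = mp.Pipe(duplex=False)
#         writer.send_bytes(b"")                  # wakeup()
#         ready = mp.connection.wait([reader])    # returns immediately
#         assert reader in ready
#         while reader.poll():                    # clear()
#             reader.recv_bytes()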


def _python_exit():
    global _global_shutdown
    _global_shutdown = True
    items = list(_threads_wakeups.items())
    for _, thread_wakeup in items:
        # call not protected by ProcessPoolExecutor._shutdown_lock
        thread_wakeup.wakeup()
    for t, _ in items:
        t.join()

# Register for `_python_exit()` to be called just before joining all
# non-daemon threads. This is used instead of `atexit.register()` for
# compatibility with subinterpreters, which no longer support daemon threads.
# See bpo-39812 for context.
threading._register_atexit(_python_exit)

# Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for
# work while a larger number will make Future.cancel() succeed less frequently
# (Futures in the call queue cannot be cancelled).
EXTRA_QUEUED_CALLS = 1


# On Windows, WaitForMultipleObjects is used to wait for processes to finish.
# It can wait on, at most, 63 objects. There is an overhead of two objects:
# - the result queue reader
# - the thread wakeup reader
_MAX_WINDOWS_WORKERS = 63 - 2

# Hack to embed stringification of remote traceback in local traceback

class _RemoteTraceback(Exception):
    def __init__(self, tb):
        self.tb = tb
    def __str__(self):
        return self.tb

class _ExceptionWithTraceback:
    def __init__(self, exc, tb):
        tb = ''.join(format_exception(type(exc), exc, tb))
        self.exc = exc
        # Traceback object needs to be garbage-collected as its frames
        # contain references to all the objects in the exception scope
        self.exc.__traceback__ = None
        self.tb = '\n"""\n%s"""' % tb
    def __reduce__(self):
        return _rebuild_exc, (self.exc, self.tb)

def _rebuild_exc(exc, tb):
    exc.__cause__ = _RemoteTraceback(tb)
    return exc
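
# A hedged sketch of how the helpers above cooperate across the process
# boundary (illustrative only; the ValueError is an arbitrary example):
#
#     import pickle
#
#     try:
#         raise ValueError("boom")
#     except ValueError as e:
#         wrapped = _ExceptionWithTraceback(e, e.__traceback__)
#     # Pickling calls __reduce__(), so unpickling runs _rebuild_exc() and
#     # reattaches the formatted remote traceback as __cause__.
#     exc = pickle.loads(pickle.dumps(wrapped))
#     assert isinstance(exc.__cause__, _RemoteTraceback)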

class _WorkItem(object):
    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

class _ResultItem(object):
    def __init__(self, work_id, exception=None, result=None, exit_pid=None):
        self.work_id = work_id
        self.exception = exception
        self.result = result
        self.exit_pid = exit_pid

class _CallItem(object):
    def __init__(self, work_id, fn, args, kwargs):
        self.work_id = work_id
        self.fn = fn
        self.args = args
        self.kwargs = kwargs


class _SafeQueue(Queue):
    """Queue subclass that sets the exception on the future object linked to
    a job when that job's _CallItem cannot be fed to the worker processes."""
    def __init__(self, max_size=0, *, ctx, pending_work_items, shutdown_lock,
                 thread_wakeup):
        self.pending_work_items = pending_work_items
        self.shutdown_lock = shutdown_lock
        self.thread_wakeup = thread_wakeup
        super().__init__(max_size, ctx=ctx)

    def _on_queue_feeder_error(self, e, obj):
        if isinstance(obj, _CallItem):
            tb = format_exception(type(e), e, e.__traceback__)
            e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
            work_item = self.pending_work_items.pop(obj.work_id, None)
            with self.shutdown_lock:
                self.thread_wakeup.wakeup()
            # work_item can be None if another process terminated. In this
            # case, the executor_manager_thread fails all work_items
            # with BrokenProcessPool
            if work_item is not None:
                work_item.future.set_exception(e)
        else:
            super()._on_queue_feeder_error(e, obj)


def _get_chunks(*iterables, chunksize):
    """Iterates over zip()ed iterables in chunks."""
    it = zip(*iterables)
    while True:
        chunk = tuple(itertools.islice(it, chunksize))
        if not chunk:
            return
        yield chunk
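
# For instance (hedged, illustrative): with a single iterable and chunksize=2,
# zip() yields 1-tuples of arguments, which are then grouped into chunks:
#
#     >>> list(_get_chunks('abcde', chunksize=2))
#     [(('a',), ('b',)), (('c',), ('d',)), (('e',),)]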


def _process_chunk(fn, chunk):
    """Processes a chunk of an iterable passed to map.

    Runs the function passed to map() on a chunk of the
    iterable passed to map.

    This function is run in a separate process.
    """
    return [fn(*args) for args in chunk]


def _sendback_result(result_queue, work_id, result=None, exception=None,
                     exit_pid=None):
    """Safely send back the given result or exception"""
    try:
        result_queue.put(_ResultItem(work_id, result=result,
                                     exception=exception, exit_pid=exit_pid))
    except BaseException as e:
        exc = _ExceptionWithTraceback(e, e.__traceback__)
        result_queue.put(_ResultItem(work_id, exception=exc,
                                     exit_pid=exit_pid))


def _process_worker(call_queue, result_queue, initializer, initargs, max_tasks=None):
    """Evaluates calls from call_queue and places the results in result_queue.

    This worker is run in a separate process.

    Args:
        call_queue: A ctx.Queue of _CallItems that will be read and
            evaluated by the worker.
        result_queue: A ctx.Queue of _ResultItems that will be written
            to by the worker.
        initializer: A callable initializer, or None
        initargs: A tuple of args for the initializer
    """
    if initializer is not None:
        try:
            initializer(*initargs)
        except BaseException:
            _base.LOGGER.critical('Exception in initializer:', exc_info=True)
            # The parent will notice that the process stopped and
            # mark the pool broken
            return
    num_tasks = 0
    exit_pid = None
    while True:
        call_item = call_queue.get(block=True)
        if call_item is None:
            # Wake up queue management thread
            result_queue.put(os.getpid())
            return

        if max_tasks is not None:
            num_tasks += 1
            if num_tasks >= max_tasks:
                exit_pid = os.getpid()

        try:
            r = call_item.fn(*call_item.args, **call_item.kwargs)
        except BaseException as e:
            exc = _ExceptionWithTraceback(e, e.__traceback__)
            _sendback_result(result_queue, call_item.work_id, exception=exc,
                             exit_pid=exit_pid)
        else:
            _sendback_result(result_queue, call_item.work_id, result=r,
                             exit_pid=exit_pid)
            del r

        # Liberate the resource as soon as possible, to avoid holding onto
        # open files or shared memory that is not needed anymore
        del call_item

        if exit_pid is not None:
            return
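
# A hedged sketch of the shutdown handshake above (illustrative only): the
# manager thread stops a worker by enqueueing a None sentinel, and the worker
# replies with its pid so the parent can join it without marking the pool
# broken.
#
#     call_queue.put(None)             # manager: ask one worker to exit
#     ...                              # worker: result_queue.put(os.getpid())
#     pid = result_queue.get()         # manager: reap the exiting worker
#     processes.pop(pid).join()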


class _ExecutorManagerThread(threading.Thread):
    """Manages the communication between this process and the worker processes.

    The manager is run in a local thread.

    Args:
        executor: A reference to the ProcessPoolExecutor that owns
            this thread. A weakref will be owned by the manager as well as
            references to internal objects used to introspect the state of
            the executor.
    """

    def __init__(self, executor):
        # Store references to necessary internals of the executor.

        # A _ThreadWakeup to allow waking up the queue_manager_thread from the
        # main Thread and avoid deadlocks caused by permanently locked queues.
        self.thread_wakeup = executor._executor_manager_thread_wakeup
        self.shutdown_lock = executor._shutdown_lock

        # A weakref.ref to the ProcessPoolExecutor that owns this thread. Used
        # to determine if the ProcessPoolExecutor has been garbage collected
        # and that the manager can exit.
        # When the executor gets garbage collected, the weakref callback
        # will wake up the queue management thread so that it can terminate
        # if there is no pending work item.
        def weakref_cb(_,
                       thread_wakeup=self.thread_wakeup,
                       shutdown_lock=self.shutdown_lock):
            mp.util.debug('Executor collected: triggering callback for'
                          ' QueueManager wakeup')
            with shutdown_lock:
                thread_wakeup.wakeup()

        self.executor_reference = weakref.ref(executor, weakref_cb)

        # A list of the ctx.Process instances used as workers.
        self.processes = executor._processes

        # A ctx.Queue that will be filled with _CallItems derived from
        # _WorkItems for processing by the process workers.
        self.call_queue = executor._call_queue

        # A ctx.SimpleQueue of _ResultItems generated by the process workers.
        self.result_queue = executor._result_queue

        # A queue.Queue of work ids e.g. Queue([5, 6, ...]).
        self.work_ids_queue = executor._work_ids

        # Maximum number of tasks a worker process can execute before
        # exiting safely
        self.max_tasks_per_child = executor._max_tasks_per_child

        # A dict mapping work ids to _WorkItems e.g.
        # {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        self.pending_work_items = executor._pending_work_items

        super().__init__()

    def run(self):
        # Main loop for the executor manager thread.

        while True:
            self.add_call_item_to_queue()

            result_item, is_broken, cause = self.wait_result_broken_or_wakeup()

            if is_broken:
                self.terminate_broken(cause)
                return
            if result_item is not None:
                self.process_result_item(result_item)

                process_exited = result_item.exit_pid is not None
                if process_exited:
                    p = self.processes.pop(result_item.exit_pid)
                    p.join()

                # Delete reference to result_item to avoid keeping references
                # while waiting on new results.
                del result_item

                if executor := self.executor_reference():
                    if process_exited:
                        with self.shutdown_lock:
                            executor._adjust_process_count()
                    else:
                        executor._idle_worker_semaphore.release()
                    del executor

            if self.is_shutting_down():
                self.flag_executor_shutting_down()

                # When only canceled futures remain in pending_work_items, our
                # next call to wait_result_broken_or_wakeup would hang forever.
                # This makes sure we have some running futures or none at all.
                self.add_call_item_to_queue()

                # Since no new work items can be added, it is safe to shutdown
                # this thread if there are no pending work items.
                if not self.pending_work_items:
                    self.join_executor_internals()
                    return

    def add_call_item_to_queue(self):
        # Fills call_queue with _WorkItems from pending_work_items.
        # This function never blocks.
        while True:
            if self.call_queue.full():
                return
            try:
                work_id = self.work_ids_queue.get(block=False)
            except queue.Empty:
                return
            else:
                work_item = self.pending_work_items[work_id]

                if work_item.future.set_running_or_notify_cancel():
                    self.call_queue.put(_CallItem(work_id,
                                                  work_item.fn,
                                                  work_item.args,
                                                  work_item.kwargs),
                                        block=True)
                else:
                    del self.pending_work_items[work_id]
                    continue

    def wait_result_broken_or_wakeup(self):
        # Wait for a result to be ready in the result_queue while checking
        # that all worker processes are still running, or for a wakeup signal
        # to be sent. The wakeup signals come either from new tasks being
        # submitted, from the executor being shutdown/gc-ed, or from the
        # shutdown of the python interpreter.
        result_reader = self.result_queue._reader
        assert not self.thread_wakeup._closed
        wakeup_reader = self.thread_wakeup._reader
        readers = [result_reader, wakeup_reader]
        worker_sentinels = [p.sentinel for p in list(self.processes.values())]
        ready = mp.connection.wait(readers + worker_sentinels)

        cause = None
        is_broken = True
        result_item = None
        if result_reader in ready:
            try:
                result_item = result_reader.recv()
                is_broken = False
            except BaseException as e:
                cause = format_exception(type(e), e, e.__traceback__)

        elif wakeup_reader in ready:
            is_broken = False

        with self.shutdown_lock:
            self.thread_wakeup.clear()

        return result_item, is_broken, cause

    def process_result_item(self, result_item):
        # Process the received result_item. This can be either the PID of a
        # worker that exited gracefully or a _ResultItem.

        if isinstance(result_item, int):
            # Clean shutdown of a worker using its PID
            # (avoids marking the executor broken)
            assert self.is_shutting_down()
            p = self.processes.pop(result_item)
            p.join()
            if not self.processes:
                self.join_executor_internals()
                return
        else:
            # Received a _ResultItem so mark the future as completed.
            work_item = self.pending_work_items.pop(result_item.work_id, None)
            # work_item can be None if another process terminated (see above)
            if work_item is not None:
                if result_item.exception:
                    work_item.future.set_exception(result_item.exception)
                else:
                    work_item.future.set_result(result_item.result)

    def is_shutting_down(self):
        # Check whether we should start shutting down the executor.
        executor = self.executor_reference()
        # No more work items can be added if:
        #   - The interpreter is shutting down OR
        #   - The executor that owns this worker has been collected OR
        #   - The executor that owns this worker has been shutdown.
        return (_global_shutdown or executor is None
                or executor._shutdown_thread)

    def terminate_broken(self, cause):
        # Terminate the executor because it is in a broken state. The cause
        # argument can be used to display more information on the error that
        # led the executor into becoming broken.

        # Mark the process pool broken so that submits fail right now.
        executor = self.executor_reference()
        if executor is not None:
            executor._broken = ('A child process terminated '
                                'abruptly, the process pool is not '
                                'usable anymore')
            executor._shutdown_thread = True
            executor = None

        # All pending tasks are to be marked failed with the following
        # BrokenProcessPool error
        bpe = BrokenProcessPool("A process in the process pool was "
                                "terminated abruptly while the future was "
                                "running or pending.")
        if cause is not None:
            bpe.__cause__ = _RemoteTraceback(
                f"\n'''\n{''.join(cause)}'''")

        # Mark pending tasks as failed.
        for work_id, work_item in self.pending_work_items.items():
            work_item.future.set_exception(bpe)
            # Delete references to object. See issue16284
            del work_item
        self.pending_work_items.clear()

        # Terminate remaining workers forcibly: the queues or their
        # locks may be in a dirty state and block forever.
        for p in self.processes.values():
            p.terminate()

        # Prevent queue writing to a pipe which is no longer read.
        # https://github.com/python/cpython/issues/94777
        self.call_queue._reader.close()

        # clean up resources
        self.join_executor_internals()

    def flag_executor_shutting_down(self):
        # Flag the executor as shutting down and, if requested, cancel
        # remaining tasks as early as possible, provided the executor has
        # not been gc-ed yet.
        executor = self.executor_reference()
        if executor is not None:
            executor._shutdown_thread = True
            # Cancel pending work items if requested.
            if executor._cancel_pending_futures:
                # Cancel all pending futures and update pending_work_items
                # to only have futures that are currently running.
                new_pending_work_items = {}
                for work_id, work_item in self.pending_work_items.items():
                    if not work_item.future.cancel():
                        new_pending_work_items[work_id] = work_item
                self.pending_work_items = new_pending_work_items
                # Drain work_ids_queue since we no longer need to
                # add items to the call queue.
                while True:
                    try:
                        self.work_ids_queue.get_nowait()
                    except queue.Empty:
                        break
                # Make sure we do this only once to not waste time looping
                # on running processes over and over.
                executor._cancel_pending_futures = False

    def shutdown_workers(self):
        n_children_to_stop = self.get_n_children_alive()
        n_sentinels_sent = 0
        # Send the right number of sentinels, to make sure all children are
        # properly terminated.
        while (n_sentinels_sent < n_children_to_stop
                and self.get_n_children_alive() > 0):
            for i in range(n_children_to_stop - n_sentinels_sent):
                try:
                    self.call_queue.put_nowait(None)
                    n_sentinels_sent += 1
                except queue.Full:
                    break

    def join_executor_internals(self):
        self.shutdown_workers()
        # Release the queue's resources as soon as possible.
        self.call_queue.close()
        self.call_queue.join_thread()
        with self.shutdown_lock:
            self.thread_wakeup.close()
        # If .join() is not called on the created processes then
        # some ctx.Queue methods may deadlock on Mac OS X.
        for p in self.processes.values():
            p.join()

    def get_n_children_alive(self):
        # This is an upper bound on the number of children alive.
        return sum(p.is_alive() for p in self.processes.values())


_system_limits_checked = False
_system_limited = None


def _check_system_limits():
    global _system_limits_checked, _system_limited
    if _system_limits_checked:
        if _system_limited:
            raise NotImplementedError(_system_limited)
    _system_limits_checked = True
    try:
        import multiprocessing.synchronize
    except ImportError:
        _system_limited = (
            "This Python build lacks multiprocessing.synchronize, usually due "
            "to named semaphores being unavailable on this platform."
        )
        raise NotImplementedError(_system_limited)
    try:
        nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if nsems_max == -1:
        # undetermined limit, assume that limit is determined
        # by available memory only
        return
    if nsems_max >= 256:
        # minimum number of semaphores available
        # according to POSIX
        return
    _system_limited = ("system provides too few semaphores (%d"
                       " available, 256 necessary)" % nsems_max)
    raise NotImplementedError(_system_limited)


def _chain_from_iterable_of_lists(iterable):
    """
    Specialized implementation of itertools.chain.from_iterable.
    Each item in *iterable* should be a list. This function is
    careful not to keep references to yielded objects.
    """
    for element in iterable:
        element.reverse()
        while element:
            yield element.pop()
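
# For instance (hedged, illustrative): this behaves like
# itertools.chain.from_iterable, but consumes each list as it goes so the
# generator keeps no reference to results that were already yielded:
#
#     >>> list(_chain_from_iterable_of_lists([[1, 2], [3]]))
#     [1, 2, 3]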


class BrokenProcessPool(_base.BrokenExecutor):
    """
    Raised when a process in a ProcessPoolExecutor terminated abruptly
    while a future was in the running state.
    """


class ProcessPoolExecutor(_base.Executor):
    def __init__(self, max_workers=None, mp_context=None,
                 initializer=None, initargs=(), *, max_tasks_per_child=None):
        """Initializes a new ProcessPoolExecutor instance.

        Args:
            max_workers: The maximum number of processes that can be used to
                execute the given calls. If None or not given then as many
                worker processes will be created as the machine has processors.
            mp_context: A multiprocessing context to launch the workers created
                using the multiprocessing.get_context('start method') API. This
                object should provide SimpleQueue, Queue and Process.
            initializer: A callable used to initialize worker processes.
            initargs: A tuple of arguments to pass to the initializer.
            max_tasks_per_child: The maximum number of tasks a worker process
                can complete before it will exit and be replaced with a fresh
                worker process. The default of None means worker processes will
                live as long as the executor. Requires a non-'fork' mp_context
                start method. When given, we default to using 'spawn' if no
                mp_context is supplied.
        """
        _check_system_limits()

        if max_workers is None:
            self._max_workers = os.cpu_count() or 1
            if sys.platform == 'win32':
                self._max_workers = min(_MAX_WINDOWS_WORKERS,
                                        self._max_workers)
        else:
            if max_workers <= 0:
                raise ValueError("max_workers must be greater than 0")
            elif (sys.platform == 'win32' and
                  max_workers > _MAX_WINDOWS_WORKERS):
                raise ValueError(
                    f"max_workers must be <= {_MAX_WINDOWS_WORKERS}")

            self._max_workers = max_workers

        if mp_context is None:
            if max_tasks_per_child is not None:
                mp_context = mp.get_context("spawn")
            else:
                mp_context = mp.get_context()
        self._mp_context = mp_context

        # https://github.com/python/cpython/issues/90622
        self._safe_to_dynamically_spawn_children = (
            self._mp_context.get_start_method(allow_none=False) != "fork")

        if initializer is not None and not callable(initializer):
            raise TypeError("initializer must be a callable")
        self._initializer = initializer
        self._initargs = initargs

        if max_tasks_per_child is not None:
            if not isinstance(max_tasks_per_child, int):
                raise TypeError("max_tasks_per_child must be an integer")
            elif max_tasks_per_child <= 0:
                raise ValueError("max_tasks_per_child must be >= 1")
            if self._mp_context.get_start_method(allow_none=False) == "fork":
                # https://github.com/python/cpython/issues/90622
                raise ValueError("max_tasks_per_child is incompatible with"
                                 " the 'fork' multiprocessing start method;"
                                 " supply a different mp_context.")
        self._max_tasks_per_child = max_tasks_per_child

        # Management thread
        self._executor_manager_thread = None

        # Map of pids to processes
        self._processes = {}

        # Shutdown is a two-step process.
        self._shutdown_thread = False
        self._shutdown_lock = threading.Lock()
        self._idle_worker_semaphore = threading.Semaphore(0)
        self._broken = False
        self._queue_count = 0
        self._pending_work_items = {}
        self._cancel_pending_futures = False

        # _ThreadWakeup is a communication channel used to interrupt the wait
        # of the main loop of executor_manager_thread from another thread (e.g.
        # when calling executor.submit or executor.shutdown). We do not use the
        # _result_queue to send wakeup signals to the executor_manager_thread
        # as it could result in a deadlock if a worker process dies with the
        # _result_queue write lock still acquired.
        #
        # _shutdown_lock must be locked to access _ThreadWakeup.
        self._executor_manager_thread_wakeup = _ThreadWakeup()

        # Create communication channels for the executor
        # Make the call queue slightly larger than the number of processes to
        # prevent the worker processes from idling. But don't make it too big
        # because futures in the call queue cannot be cancelled.
        queue_size = self._max_workers + EXTRA_QUEUED_CALLS
        self._call_queue = _SafeQueue(
            max_size=queue_size, ctx=self._mp_context,
            pending_work_items=self._pending_work_items,
            shutdown_lock=self._shutdown_lock,
            thread_wakeup=self._executor_manager_thread_wakeup)
        # Killed worker processes can produce spurious "broken pipe"
        # tracebacks in the queue's own worker thread. But we detect killed
        # processes anyway, so silence the tracebacks.
        self._call_queue._ignore_epipe = True
        self._result_queue = mp_context.SimpleQueue()
        self._work_ids = queue.Queue()
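
    # A hedged usage sketch of the constructor options above (illustrative
    # only; the numbers are arbitrary): each worker is replaced after 10
    # tasks, and the 'spawn' start method is implied because no mp_context
    # is given.
    #
    #     with ProcessPoolExecutor(max_workers=4,
    #                              max_tasks_per_child=10) as executor:
    #         ...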

    def _start_executor_manager_thread(self):
        if self._executor_manager_thread is None:
            # Start the processes so that their sentinels are known.
            if not self._safe_to_dynamically_spawn_children:  # ie, using fork.
                self._launch_processes()
            self._executor_manager_thread = _ExecutorManagerThread(self)
            self._executor_manager_thread.start()
            _threads_wakeups[self._executor_manager_thread] = \
                self._executor_manager_thread_wakeup

    def _adjust_process_count(self):
        # if there's an idle process, we don't need to spawn a new one.
        if self._idle_worker_semaphore.acquire(blocking=False):
            return

        process_count = len(self._processes)
        if process_count < self._max_workers:
            # Assertion disabled as this codepath is also used to replace a
            # worker that unexpectedly dies, even when using the 'fork' start
            # method. That means there is still a potential deadlock bug. If a
            # 'fork' mp_context worker dies, we'll be forking a new one when
            # we know a thread is running (self._executor_manager_thread).
            #assert self._safe_to_dynamically_spawn_children or not self._executor_manager_thread, 'https://github.com/python/cpython/issues/90622'
            self._spawn_process()

    def _launch_processes(self):
        # https://github.com/python/cpython/issues/90622
        assert not self._executor_manager_thread, (
                'Processes cannot be fork()ed after the thread has started, '
                'deadlock in the child processes could result.')
        for _ in range(len(self._processes), self._max_workers):
            self._spawn_process()

    def _spawn_process(self):
        p = self._mp_context.Process(
            target=_process_worker,
            args=(self._call_queue,
                  self._result_queue,
                  self._initializer,
                  self._initargs,
                  self._max_tasks_per_child))
        p.start()
        self._processes[p.pid] = p

    def submit(self, fn, /, *args, **kwargs):
        with self._shutdown_lock:
            if self._broken:
                raise BrokenProcessPool(self._broken)
            if self._shutdown_thread:
                raise RuntimeError('cannot schedule new futures after shutdown')
            if _global_shutdown:
                raise RuntimeError('cannot schedule new futures after '
                                   'interpreter shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._pending_work_items[self._queue_count] = w
            self._work_ids.put(self._queue_count)
            self._queue_count += 1
            # Wake up queue management thread
            self._executor_manager_thread_wakeup.wakeup()

            if self._safe_to_dynamically_spawn_children:
                self._adjust_process_count()
            self._start_executor_manager_thread()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Returns an iterator equivalent to map(fn, *iterables).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: If greater than one, the iterables will be chopped into
                chunks of size chunksize and submitted to the process pool.
                If set to one, the items in the iterables will be sent one at
                a time.

        Returns:
            An iterator equivalent to: map(fn, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
        """
        if chunksize < 1:
            raise ValueError("chunksize must be >= 1.")

        results = super().map(partial(_process_chunk, fn),
                              _get_chunks(*iterables, chunksize=chunksize),
                              timeout=timeout)
        return _chain_from_iterable_of_lists(results)
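
    # A hedged usage sketch of map() with chunking (illustrative only; the
    # callable and the numbers are arbitrary): each worker receives batches
    # of 100 items instead of paying one IPC round-trip per item.
    #
    #     with ProcessPoolExecutor() as executor:
    #         for result in executor.map(abs, range(-1000, 0), chunksize=100):
    #             ...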

    def shutdown(self, wait=True, *, cancel_futures=False):
        with self._shutdown_lock:
            self._cancel_pending_futures = cancel_futures
            self._shutdown_thread = True
            if self._executor_manager_thread_wakeup is not None:
                # Wake up queue management thread
                self._executor_manager_thread_wakeup.wakeup()

        if self._executor_manager_thread is not None and wait:
            self._executor_manager_thread.join()
        # To reduce the risk of opening too many files, remove references to
        # objects that use file descriptors.
        self._executor_manager_thread = None
        self._call_queue = None
        if self._result_queue is not None and wait:
            self._result_queue.close()
        self._result_queue = None
        self._processes = None
        self._executor_manager_thread_wakeup = None

    shutdown.__doc__ = _base.Executor.shutdown.__doc__
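

# A hedged sketch of non-blocking shutdown (illustrative only): pending
# futures that have not started running are cancelled, and the call returns
# without waiting for the workers to exit.
#
#     executor = ProcessPoolExecutor()
#     futures = [executor.submit(pow, 2, i) for i in range(100)]
#     executor.shutdown(wait=False, cancel_futures=True)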