python (3.11.7)

(root)/
lib/
python3.11/
concurrent/
futures/
process.py
       1  # Copyright 2009 Brian Quinlan. All Rights Reserved.
       2  # Licensed to PSF under a Contributor Agreement.
       3  
       4  """Implements ProcessPoolExecutor.
       5  
       6  The following diagram and text describe the data-flow through the system:
       7  
       8  |======================= In-process =====================|== Out-of-process ==|
       9  
      10  +----------+     +----------+       +--------+     +-----------+    +---------+
      11  |          |  => | Work Ids |       |        |     | Call Q    |    | Process |
      12  |          |     +----------+       |        |     +-----------+    |  Pool   |
      13  |          |     | ...      |       |        |     | ...       |    +---------+
      14  |          |     | 6        |    => |        |  => | 5, call() | => |         |
      15  |          |     | 7        |       |        |     | ...       |    |         |
      16  | Process  |     | ...      |       | Local  |     +-----------+    | Process |
      17  |  Pool    |     +----------+       | Worker |                      |  #1..n  |
      18  | Executor |                        | Thread |                      |         |
      19  |          |     +----------- +     |        |     +-----------+    |         |
      20  |          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
      21  |          |     +------------+     |        |     +-----------+    |         |
      22  |          |     | 6: call()  |     |        |     | ...       |    |         |
      23  |          |     |    future  |     |        |     | 4, result |    |         |
      24  |          |     | ...        |     |        |     | 3, except |    |         |
      25  +----------+     +------------+     +--------+     +-----------+    +---------+
      26  
      27  Executor.submit() called:
      28  - creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
      29  - adds the id of the _WorkItem to the "Work Ids" queue
      30  
      31  Local worker thread:
      32  - reads work ids from the "Work Ids" queue and looks up the corresponding
      33    WorkItem from the "Work Items" dict: if the work item has been cancelled then
      34    it is simply removed from the dict, otherwise it is repackaged as a
      35    _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
      36    until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
      37    calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
      38  - reads _ResultItems from "Result Q", updates the future stored in the
      39    "Work Items" dict and deletes the dict entry
      40  
      41  Process #1..n:
      42  - reads _CallItems from "Call Q", executes the calls, and puts the resulting
      43    _ResultItems in "Result Q"
      44  """
      45  
      46  __author__ = 'Brian Quinlan (brian@sweetapp.com)'
      47  
      48  import os
      49  from concurrent.futures import _base
      50  import queue
      51  import multiprocessing as mp
      52  import multiprocessing.connection
      53  from multiprocessing.queues import Queue
      54  import threading
      55  import weakref
      56  from functools import partial
      57  import itertools
      58  import sys
      59  from traceback import format_exception
      60  
      61  
      62  _threads_wakeups = weakref.WeakKeyDictionary()
      63  _global_shutdown = False
      64  
      65  
      66  class ESC[4;38;5;81m_ThreadWakeup:
      67      def __init__(self):
      68          self._closed = False
      69          self._reader, self._writer = mp.Pipe(duplex=False)
      70  
      71      def close(self):
      72          # Please note that we do not take the shutdown lock when
      73          # calling clear() (to avoid deadlocking) so this method can
      74          # only be called safely from the same thread as all calls to
      75          # clear() even if you hold the shutdown lock. Otherwise we
      76          # might try to read from the closed pipe.
      77          if not self._closed:
      78              self._closed = True
      79              self._writer.close()
      80              self._reader.close()
      81  
      82      def wakeup(self):
      83          if not self._closed:
      84              self._writer.send_bytes(b"")
      85  
      86      def clear(self):
      87          if not self._closed:
      88              while self._reader.poll():
      89                  self._reader.recv_bytes()
      90  
      91  
      92  def _python_exit():
      93      global _global_shutdown
      94      _global_shutdown = True
      95      items = list(_threads_wakeups.items())
      96      for _, thread_wakeup in items:
      97          # call not protected by ProcessPoolExecutor._shutdown_lock
      98          thread_wakeup.wakeup()
      99      for t, _ in items:
     100          t.join()
     101  
     102  # Register for `_python_exit()` to be called just before joining all
     103  # non-daemon threads. This is used instead of `atexit.register()` for
     104  # compatibility with subinterpreters, which no longer support daemon threads.
     105  # See bpo-39812 for context.
     106  threading._register_atexit(_python_exit)
     107  
     108  # Controls how many more calls than processes will be queued in the call queue.
     109  # A smaller number will mean that processes spend more time idle waiting for
     110  # work while a larger number will make Future.cancel() succeed less frequently
     111  # (Futures in the call queue cannot be cancelled).
     112  EXTRA_QUEUED_CALLS = 1
     113  
     114  
     115  # On Windows, WaitForMultipleObjects is used to wait for processes to finish.
     116  # It can wait on, at most, 63 objects. There is an overhead of two objects:
     117  # - the result queue reader
     118  # - the thread wakeup reader
     119  _MAX_WINDOWS_WORKERS = 63 - 2
     120  
     121  # Hack to embed stringification of remote traceback in local traceback
     122  
     123  class ESC[4;38;5;81m_RemoteTraceback(ESC[4;38;5;149mException):
     124      def __init__(self, tb):
     125          self.tb = tb
     126      def __str__(self):
     127          return self.tb
     128  
     129  class ESC[4;38;5;81m_ExceptionWithTraceback:
     130      def __init__(self, exc, tb):
     131          tb = ''.join(format_exception(type(exc), exc, tb))
     132          self.exc = exc
     133          # Traceback object needs to be garbage-collected as its frames
     134          # contain references to all the objects in the exception scope
     135          self.exc.__traceback__ = None
     136          self.tb = '\n"""\n%s"""' % tb
     137      def __reduce__(self):
     138          return _rebuild_exc, (self.exc, self.tb)
     139  
     140  def _rebuild_exc(exc, tb):
     141      exc.__cause__ = _RemoteTraceback(tb)
     142      return exc
     143  
     144  class ESC[4;38;5;81m_WorkItem(ESC[4;38;5;149mobject):
     145      def __init__(self, future, fn, args, kwargs):
     146          self.future = future
     147          self.fn = fn
     148          self.args = args
     149          self.kwargs = kwargs
     150  
     151  class ESC[4;38;5;81m_ResultItem(ESC[4;38;5;149mobject):
     152      def __init__(self, work_id, exception=None, result=None, exit_pid=None):
     153          self.work_id = work_id
     154          self.exception = exception
     155          self.result = result
     156          self.exit_pid = exit_pid
     157  
     158  class ESC[4;38;5;81m_CallItem(ESC[4;38;5;149mobject):
     159      def __init__(self, work_id, fn, args, kwargs):
     160          self.work_id = work_id
     161          self.fn = fn
     162          self.args = args
     163          self.kwargs = kwargs
     164  
     165  
     166  class ESC[4;38;5;81m_SafeQueue(ESC[4;38;5;149mQueue):
     167      """Safe Queue set exception to the future object linked to a job"""
     168      def __init__(self, max_size=0, *, ctx, pending_work_items, shutdown_lock,
     169                   thread_wakeup):
     170          self.pending_work_items = pending_work_items
     171          self.shutdown_lock = shutdown_lock
     172          self.thread_wakeup = thread_wakeup
     173          super().__init__(max_size, ctx=ctx)
     174  
     175      def _on_queue_feeder_error(self, e, obj):
     176          if isinstance(obj, _CallItem):
     177              tb = format_exception(type(e), e, e.__traceback__)
     178              e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
     179              work_item = self.pending_work_items.pop(obj.work_id, None)
     180              with self.shutdown_lock:
     181                  self.thread_wakeup.wakeup()
     182              # work_item can be None if another process terminated. In this
     183              # case, the executor_manager_thread fails all work_items
     184              # with BrokenProcessPool
     185              if work_item is not None:
     186                  work_item.future.set_exception(e)
     187          else:
     188              super()._on_queue_feeder_error(e, obj)
     189  
     190  
     191  def _get_chunks(*iterables, chunksize):
     192      """ Iterates over zip()ed iterables in chunks. """
     193      it = zip(*iterables)
     194      while True:
     195          chunk = tuple(itertools.islice(it, chunksize))
     196          if not chunk:
     197              return
     198          yield chunk
     199  
     200  
     201  def _process_chunk(fn, chunk):
     202      """ Processes a chunk of an iterable passed to map.
     203  
     204      Runs the function passed to map() on a chunk of the
     205      iterable passed to map.
     206  
     207      This function is run in a separate process.
     208  
     209      """
     210      return [fn(*args) for args in chunk]
     211  
     212  
     213  def _sendback_result(result_queue, work_id, result=None, exception=None,
     214                       exit_pid=None):
     215      """Safely send back the given result or exception"""
     216      try:
     217          result_queue.put(_ResultItem(work_id, result=result,
     218                                       exception=exception, exit_pid=exit_pid))
     219      except BaseException as e:
     220          exc = _ExceptionWithTraceback(e, e.__traceback__)
     221          result_queue.put(_ResultItem(work_id, exception=exc,
     222                                       exit_pid=exit_pid))
     223  
     224  
     225  def _process_worker(call_queue, result_queue, initializer, initargs, max_tasks=None):
     226      """Evaluates calls from call_queue and places the results in result_queue.
     227  
     228      This worker is run in a separate process.
     229  
     230      Args:
     231          call_queue: A ctx.Queue of _CallItems that will be read and
     232              evaluated by the worker.
     233          result_queue: A ctx.Queue of _ResultItems that will written
     234              to by the worker.
     235          initializer: A callable initializer, or None
     236          initargs: A tuple of args for the initializer
     237      """
     238      if initializer is not None:
     239          try:
     240              initializer(*initargs)
     241          except BaseException:
     242              _base.LOGGER.critical('Exception in initializer:', exc_info=True)
     243              # The parent will notice that the process stopped and
     244              # mark the pool broken
     245              return
     246      num_tasks = 0
     247      exit_pid = None
     248      while True:
     249          call_item = call_queue.get(block=True)
     250          if call_item is None:
     251              # Wake up queue management thread
     252              result_queue.put(os.getpid())
     253              return
     254  
     255          if max_tasks is not None:
     256              num_tasks += 1
     257              if num_tasks >= max_tasks:
     258                  exit_pid = os.getpid()
     259  
     260          try:
     261              r = call_item.fn(*call_item.args, **call_item.kwargs)
     262          except BaseException as e:
     263              exc = _ExceptionWithTraceback(e, e.__traceback__)
     264              _sendback_result(result_queue, call_item.work_id, exception=exc,
     265                               exit_pid=exit_pid)
     266          else:
     267              _sendback_result(result_queue, call_item.work_id, result=r,
     268                               exit_pid=exit_pid)
     269              del r
     270  
     271          # Liberate the resource as soon as possible, to avoid holding onto
     272          # open files or shared memory that is not needed anymore
     273          del call_item
     274  
     275          if exit_pid is not None:
     276              return
     277  
     278  
     279  class ESC[4;38;5;81m_ExecutorManagerThread(ESC[4;38;5;149mthreadingESC[4;38;5;149m.ESC[4;38;5;149mThread):
     280      """Manages the communication between this process and the worker processes.
     281  
     282      The manager is run in a local thread.
     283  
     284      Args:
     285          executor: A reference to the ProcessPoolExecutor that owns
     286              this thread. A weakref will be own by the manager as well as
     287              references to internal objects used to introspect the state of
     288              the executor.
     289      """
     290  
     291      def __init__(self, executor):
     292          # Store references to necessary internals of the executor.
     293  
     294          # A _ThreadWakeup to allow waking up the queue_manager_thread from the
     295          # main Thread and avoid deadlocks caused by permanently locked queues.
     296          self.thread_wakeup = executor._executor_manager_thread_wakeup
     297          self.shutdown_lock = executor._shutdown_lock
     298  
     299          # A weakref.ref to the ProcessPoolExecutor that owns this thread. Used
     300          # to determine if the ProcessPoolExecutor has been garbage collected
     301          # and that the manager can exit.
     302          # When the executor gets garbage collected, the weakref callback
     303          # will wake up the queue management thread so that it can terminate
     304          # if there is no pending work item.
     305          def weakref_cb(_,
     306                         thread_wakeup=self.thread_wakeup,
     307                         shutdown_lock=self.shutdown_lock):
     308              mp.util.debug('Executor collected: triggering callback for'
     309                            ' QueueManager wakeup')
     310              with shutdown_lock:
     311                  thread_wakeup.wakeup()
     312  
     313          self.executor_reference = weakref.ref(executor, weakref_cb)
     314  
     315          # A list of the ctx.Process instances used as workers.
     316          self.processes = executor._processes
     317  
     318          # A ctx.Queue that will be filled with _CallItems derived from
     319          # _WorkItems for processing by the process workers.
     320          self.call_queue = executor._call_queue
     321  
     322          # A ctx.SimpleQueue of _ResultItems generated by the process workers.
     323          self.result_queue = executor._result_queue
     324  
     325          # A queue.Queue of work ids e.g. Queue([5, 6, ...]).
     326          self.work_ids_queue = executor._work_ids
     327  
     328          # Maximum number of tasks a worker process can execute before
     329          # exiting safely
     330          self.max_tasks_per_child = executor._max_tasks_per_child
     331  
     332          # A dict mapping work ids to _WorkItems e.g.
     333          #     {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
     334          self.pending_work_items = executor._pending_work_items
     335  
     336          super().__init__()
     337  
     338      def run(self):
     339          # Main loop for the executor manager thread.
     340  
     341          while True:
     342              self.add_call_item_to_queue()
     343  
     344              result_item, is_broken, cause = self.wait_result_broken_or_wakeup()
     345  
     346              if is_broken:
     347                  self.terminate_broken(cause)
     348                  return
     349              if result_item is not None:
     350                  self.process_result_item(result_item)
     351  
     352                  process_exited = result_item.exit_pid is not None
     353                  if process_exited:
     354                      p = self.processes.pop(result_item.exit_pid)
     355                      p.join()
     356  
     357                  # Delete reference to result_item to avoid keeping references
     358                  # while waiting on new results.
     359                  del result_item
     360  
     361                  if executor := self.executor_reference():
     362                      if process_exited:
     363                          with self.shutdown_lock:
     364                              executor._adjust_process_count()
     365                      else:
     366                          executor._idle_worker_semaphore.release()
     367                      del executor
     368  
     369              if self.is_shutting_down():
     370                  self.flag_executor_shutting_down()
     371  
     372                  # When only canceled futures remain in pending_work_items, our
     373                  # next call to wait_result_broken_or_wakeup would hang forever.
     374                  # This makes sure we have some running futures or none at all.
     375                  self.add_call_item_to_queue()
     376  
     377                  # Since no new work items can be added, it is safe to shutdown
     378                  # this thread if there are no pending work items.
     379                  if not self.pending_work_items:
     380                      self.join_executor_internals()
     381                      return
     382  
     383      def add_call_item_to_queue(self):
     384          # Fills call_queue with _WorkItems from pending_work_items.
     385          # This function never blocks.
     386          while True:
     387              if self.call_queue.full():
     388                  return
     389              try:
     390                  work_id = self.work_ids_queue.get(block=False)
     391              except queue.Empty:
     392                  return
     393              else:
     394                  work_item = self.pending_work_items[work_id]
     395  
     396                  if work_item.future.set_running_or_notify_cancel():
     397                      self.call_queue.put(_CallItem(work_id,
     398                                                    work_item.fn,
     399                                                    work_item.args,
     400                                                    work_item.kwargs),
     401                                          block=True)
     402                  else:
     403                      del self.pending_work_items[work_id]
     404                      continue
     405  
     406      def wait_result_broken_or_wakeup(self):
     407          # Wait for a result to be ready in the result_queue while checking
     408          # that all worker processes are still running, or for a wake up
     409          # signal send. The wake up signals come either from new tasks being
     410          # submitted, from the executor being shutdown/gc-ed, or from the
     411          # shutdown of the python interpreter.
     412          result_reader = self.result_queue._reader
     413          assert not self.thread_wakeup._closed
     414          wakeup_reader = self.thread_wakeup._reader
     415          readers = [result_reader, wakeup_reader]
     416          worker_sentinels = [p.sentinel for p in list(self.processes.values())]
     417          ready = mp.connection.wait(readers + worker_sentinels)
     418  
     419          cause = None
     420          is_broken = True
     421          result_item = None
     422          if result_reader in ready:
     423              try:
     424                  result_item = result_reader.recv()
     425                  is_broken = False
     426              except BaseException as e:
     427                  cause = format_exception(type(e), e, e.__traceback__)
     428  
     429          elif wakeup_reader in ready:
     430              is_broken = False
     431  
     432          # No need to hold the _shutdown_lock here because:
     433          # 1. we're the only thread to use the wakeup reader
     434          # 2. we're also the only thread to call thread_wakeup.close()
     435          # 3. we want to avoid a possible deadlock when both reader and writer
     436          #    would block (gh-105829)
     437          self.thread_wakeup.clear()
     438  
     439          return result_item, is_broken, cause
     440  
     441      def process_result_item(self, result_item):
     442          # Process the received a result_item. This can be either the PID of a
     443          # worker that exited gracefully or a _ResultItem
     444  
     445          if isinstance(result_item, int):
     446              # Clean shutdown of a worker using its PID
     447              # (avoids marking the executor broken)
     448              assert self.is_shutting_down()
     449              p = self.processes.pop(result_item)
     450              p.join()
     451              if not self.processes:
     452                  self.join_executor_internals()
     453                  return
     454          else:
     455              # Received a _ResultItem so mark the future as completed.
     456              work_item = self.pending_work_items.pop(result_item.work_id, None)
     457              # work_item can be None if another process terminated (see above)
     458              if work_item is not None:
     459                  if result_item.exception:
     460                      work_item.future.set_exception(result_item.exception)
     461                  else:
     462                      work_item.future.set_result(result_item.result)
     463  
     464      def is_shutting_down(self):
     465          # Check whether we should start shutting down the executor.
     466          executor = self.executor_reference()
     467          # No more work items can be added if:
     468          #   - The interpreter is shutting down OR
     469          #   - The executor that owns this worker has been collected OR
     470          #   - The executor that owns this worker has been shutdown.
     471          return (_global_shutdown or executor is None
     472                  or executor._shutdown_thread)
     473  
     474      def terminate_broken(self, cause):
     475          # Terminate the executor because it is in a broken state. The cause
     476          # argument can be used to display more information on the error that
     477          # lead the executor into becoming broken.
     478  
     479          # Mark the process pool broken so that submits fail right now.
     480          executor = self.executor_reference()
     481          if executor is not None:
     482              executor._broken = ('A child process terminated '
     483                                  'abruptly, the process pool is not '
     484                                  'usable anymore')
     485              executor._shutdown_thread = True
     486              executor = None
     487  
     488          # All pending tasks are to be marked failed with the following
     489          # BrokenProcessPool error
     490          bpe = BrokenProcessPool("A process in the process pool was "
     491                                  "terminated abruptly while the future was "
     492                                  "running or pending.")
     493          if cause is not None:
     494              bpe.__cause__ = _RemoteTraceback(
     495                  f"\n'''\n{''.join(cause)}'''")
     496  
     497          # Mark pending tasks as failed.
     498          for work_id, work_item in self.pending_work_items.items():
     499              work_item.future.set_exception(bpe)
     500              # Delete references to object. See issue16284
     501              del work_item
     502          self.pending_work_items.clear()
     503  
     504          # Terminate remaining workers forcibly: the queues or their
     505          # locks may be in a dirty state and block forever.
     506          for p in self.processes.values():
     507              p.terminate()
     508  
     509          # Prevent queue writing to a pipe which is no longer read.
     510          # https://github.com/python/cpython/issues/94777
     511          self.call_queue._reader.close()
     512  
     513          # gh-107219: Close the connection writer which can unblock
     514          # Queue._feed() if it was stuck in send_bytes().
     515          if sys.platform == 'win32':
     516              self.call_queue._writer.close()
     517  
     518          # clean up resources
     519          self.join_executor_internals()
     520  
     521      def flag_executor_shutting_down(self):
     522          # Flag the executor as shutting down and cancel remaining tasks if
     523          # requested as early as possible if it is not gc-ed yet.
     524          executor = self.executor_reference()
     525          if executor is not None:
     526              executor._shutdown_thread = True
     527              # Cancel pending work items if requested.
     528              if executor._cancel_pending_futures:
     529                  # Cancel all pending futures and update pending_work_items
     530                  # to only have futures that are currently running.
     531                  new_pending_work_items = {}
     532                  for work_id, work_item in self.pending_work_items.items():
     533                      if not work_item.future.cancel():
     534                          new_pending_work_items[work_id] = work_item
     535                  self.pending_work_items = new_pending_work_items
     536                  # Drain work_ids_queue since we no longer need to
     537                  # add items to the call queue.
     538                  while True:
     539                      try:
     540                          self.work_ids_queue.get_nowait()
     541                      except queue.Empty:
     542                          break
     543                  # Make sure we do this only once to not waste time looping
     544                  # on running processes over and over.
     545                  executor._cancel_pending_futures = False
     546  
     547      def shutdown_workers(self):
     548          n_children_to_stop = self.get_n_children_alive()
     549          n_sentinels_sent = 0
     550          # Send the right number of sentinels, to make sure all children are
     551          # properly terminated.
     552          while (n_sentinels_sent < n_children_to_stop
     553                  and self.get_n_children_alive() > 0):
     554              for i in range(n_children_to_stop - n_sentinels_sent):
     555                  try:
     556                      self.call_queue.put_nowait(None)
     557                      n_sentinels_sent += 1
     558                  except queue.Full:
     559                      break
     560  
     561      def join_executor_internals(self):
     562          self.shutdown_workers()
     563          # Release the queue's resources as soon as possible.
     564          self.call_queue.close()
     565          self.call_queue.join_thread()
     566          with self.shutdown_lock:
     567              self.thread_wakeup.close()
     568          # If .join() is not called on the created processes then
     569          # some ctx.Queue methods may deadlock on Mac OS X.
     570          for p in self.processes.values():
     571              p.join()
     572  
     573      def get_n_children_alive(self):
     574          # This is an upper bound on the number of children alive.
     575          return sum(p.is_alive() for p in self.processes.values())
     576  
     577  
     578  _system_limits_checked = False
     579  _system_limited = None
     580  
     581  
     582  def _check_system_limits():
     583      global _system_limits_checked, _system_limited
     584      if _system_limits_checked:
     585          if _system_limited:
     586              raise NotImplementedError(_system_limited)
     587      _system_limits_checked = True
     588      try:
     589          import multiprocessing.synchronize
     590      except ImportError:
     591          _system_limited = (
     592              "This Python build lacks multiprocessing.synchronize, usually due "
     593              "to named semaphores being unavailable on this platform."
     594          )
     595          raise NotImplementedError(_system_limited)
     596      try:
     597          nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
     598      except (AttributeError, ValueError):
     599          # sysconf not available or setting not available
     600          return
     601      if nsems_max == -1:
     602          # indetermined limit, assume that limit is determined
     603          # by available memory only
     604          return
     605      if nsems_max >= 256:
     606          # minimum number of semaphores available
     607          # according to POSIX
     608          return
     609      _system_limited = ("system provides too few semaphores (%d"
     610                         " available, 256 necessary)" % nsems_max)
     611      raise NotImplementedError(_system_limited)
     612  
     613  
     614  def _chain_from_iterable_of_lists(iterable):
     615      """
     616      Specialized implementation of itertools.chain.from_iterable.
     617      Each item in *iterable* should be a list.  This function is
     618      careful not to keep references to yielded objects.
     619      """
     620      for element in iterable:
     621          element.reverse()
     622          while element:
     623              yield element.pop()
     624  
     625  
     626  class ESC[4;38;5;81mBrokenProcessPool(ESC[4;38;5;149m_baseESC[4;38;5;149m.ESC[4;38;5;149mBrokenExecutor):
     627      """
     628      Raised when a process in a ProcessPoolExecutor terminated abruptly
     629      while a future was in the running state.
     630      """
     631  
     632  
     633  class ESC[4;38;5;81mProcessPoolExecutor(ESC[4;38;5;149m_baseESC[4;38;5;149m.ESC[4;38;5;149mExecutor):
     634      def __init__(self, max_workers=None, mp_context=None,
     635                   initializer=None, initargs=(), *, max_tasks_per_child=None):
     636          """Initializes a new ProcessPoolExecutor instance.
     637  
     638          Args:
     639              max_workers: The maximum number of processes that can be used to
     640                  execute the given calls. If None or not given then as many
     641                  worker processes will be created as the machine has processors.
     642              mp_context: A multiprocessing context to launch the workers. This
     643                  object should provide SimpleQueue, Queue and Process. Useful
     644                  to allow specific multiprocessing start methods.
     645              initializer: A callable used to initialize worker processes.
     646              initargs: A tuple of arguments to pass to the initializer.
     647              max_tasks_per_child: The maximum number of tasks a worker process
     648                  can complete before it will exit and be replaced with a fresh
     649                  worker process. The default of None means worker process will
     650                  live as long as the executor. Requires a non-'fork' mp_context
     651                  start method. When given, we default to using 'spawn' if no
     652                  mp_context is supplied.
     653          """
     654          _check_system_limits()
     655  
     656          if max_workers is None:
     657              self._max_workers = os.cpu_count() or 1
     658              if sys.platform == 'win32':
     659                  self._max_workers = min(_MAX_WINDOWS_WORKERS,
     660                                          self._max_workers)
     661          else:
     662              if max_workers <= 0:
     663                  raise ValueError("max_workers must be greater than 0")
     664              elif (sys.platform == 'win32' and
     665                  max_workers > _MAX_WINDOWS_WORKERS):
     666                  raise ValueError(
     667                      f"max_workers must be <= {_MAX_WINDOWS_WORKERS}")
     668  
     669              self._max_workers = max_workers
     670  
     671          if mp_context is None:
     672              if max_tasks_per_child is not None:
     673                  mp_context = mp.get_context("spawn")
     674              else:
     675                  mp_context = mp.get_context()
     676          self._mp_context = mp_context
     677  
     678          # https://github.com/python/cpython/issues/90622
     679          self._safe_to_dynamically_spawn_children = (
     680                  self._mp_context.get_start_method(allow_none=False) != "fork")
     681  
     682          if initializer is not None and not callable(initializer):
     683              raise TypeError("initializer must be a callable")
     684          self._initializer = initializer
     685          self._initargs = initargs
     686  
     687          if max_tasks_per_child is not None:
     688              if not isinstance(max_tasks_per_child, int):
     689                  raise TypeError("max_tasks_per_child must be an integer")
     690              elif max_tasks_per_child <= 0:
     691                  raise ValueError("max_tasks_per_child must be >= 1")
     692              if self._mp_context.get_start_method(allow_none=False) == "fork":
     693                  # https://github.com/python/cpython/issues/90622
     694                  raise ValueError("max_tasks_per_child is incompatible with"
     695                                   " the 'fork' multiprocessing start method;"
     696                                   " supply a different mp_context.")
     697          self._max_tasks_per_child = max_tasks_per_child
     698  
     699          # Management thread
     700          self._executor_manager_thread = None
     701  
     702          # Map of pids to processes
     703          self._processes = {}
     704  
     705          # Shutdown is a two-step process.
     706          self._shutdown_thread = False
     707          self._shutdown_lock = threading.Lock()
     708          self._idle_worker_semaphore = threading.Semaphore(0)
     709          self._broken = False
     710          self._queue_count = 0
     711          self._pending_work_items = {}
     712          self._cancel_pending_futures = False
     713  
     714          # _ThreadWakeup is a communication channel used to interrupt the wait
     715          # of the main loop of executor_manager_thread from another thread (e.g.
     716          # when calling executor.submit or executor.shutdown). We do not use the
     717          # _result_queue to send wakeup signals to the executor_manager_thread
     718          # as it could result in a deadlock if a worker process dies with the
     719          # _result_queue write lock still acquired.
     720          #
     721          # _shutdown_lock must be locked to access _ThreadWakeup.close() and
     722          # .wakeup(). Care must also be taken to not call clear or close from
     723          # more than one thread since _ThreadWakeup.clear() is not protected by
     724          # the _shutdown_lock
     725          self._executor_manager_thread_wakeup = _ThreadWakeup()
     726  
     727          # Create communication channels for the executor
     728          # Make the call queue slightly larger than the number of processes to
     729          # prevent the worker processes from idling. But don't make it too big
     730          # because futures in the call queue cannot be cancelled.
     731          queue_size = self._max_workers + EXTRA_QUEUED_CALLS
     732          self._call_queue = _SafeQueue(
     733              max_size=queue_size, ctx=self._mp_context,
     734              pending_work_items=self._pending_work_items,
     735              shutdown_lock=self._shutdown_lock,
     736              thread_wakeup=self._executor_manager_thread_wakeup)
     737          # Killed worker processes can produce spurious "broken pipe"
     738          # tracebacks in the queue's own worker thread. But we detect killed
     739          # processes anyway, so silence the tracebacks.
     740          self._call_queue._ignore_epipe = True
     741          self._result_queue = mp_context.SimpleQueue()
     742          self._work_ids = queue.Queue()
     743  
     744      def _start_executor_manager_thread(self):
     745          if self._executor_manager_thread is None:
     746              # Start the processes so that their sentinels are known.
     747              if not self._safe_to_dynamically_spawn_children:  # ie, using fork.
     748                  self._launch_processes()
     749              self._executor_manager_thread = _ExecutorManagerThread(self)
     750              self._executor_manager_thread.start()
     751              _threads_wakeups[self._executor_manager_thread] = \
     752                  self._executor_manager_thread_wakeup
     753  
     754      def _adjust_process_count(self):
     755          # if there's an idle process, we don't need to spawn a new one.
     756          if self._idle_worker_semaphore.acquire(blocking=False):
     757              return
     758  
     759          process_count = len(self._processes)
     760          if process_count < self._max_workers:
     761              # Assertion disabled as this codepath is also used to replace a
     762              # worker that unexpectedly dies, even when using the 'fork' start
     763              # method. That means there is still a potential deadlock bug. If a
     764              # 'fork' mp_context worker dies, we'll be forking a new one when
     765              # we know a thread is running (self._executor_manager_thread).
     766              #assert self._safe_to_dynamically_spawn_children or not self._executor_manager_thread, 'https://github.com/python/cpython/issues/90622'
     767              self._spawn_process()
     768  
     769      def _launch_processes(self):
     770          # https://github.com/python/cpython/issues/90622
     771          assert not self._executor_manager_thread, (
     772                  'Processes cannot be fork()ed after the thread has started, '
     773                  'deadlock in the child processes could result.')
     774          for _ in range(len(self._processes), self._max_workers):
     775              self._spawn_process()
     776  
     777      def _spawn_process(self):
     778          p = self._mp_context.Process(
     779              target=_process_worker,
     780              args=(self._call_queue,
     781                    self._result_queue,
     782                    self._initializer,
     783                    self._initargs,
     784                    self._max_tasks_per_child))
     785          p.start()
     786          self._processes[p.pid] = p
     787  
     788      def submit(self, fn, /, *args, **kwargs):
     789          with self._shutdown_lock:
     790              if self._broken:
     791                  raise BrokenProcessPool(self._broken)
     792              if self._shutdown_thread:
     793                  raise RuntimeError('cannot schedule new futures after shutdown')
     794              if _global_shutdown:
     795                  raise RuntimeError('cannot schedule new futures after '
     796                                     'interpreter shutdown')
     797  
     798              f = _base.Future()
     799              w = _WorkItem(f, fn, args, kwargs)
     800  
     801              self._pending_work_items[self._queue_count] = w
     802              self._work_ids.put(self._queue_count)
     803              self._queue_count += 1
     804              # Wake up queue management thread
     805              self._executor_manager_thread_wakeup.wakeup()
     806  
     807              if self._safe_to_dynamically_spawn_children:
     808                  self._adjust_process_count()
     809              self._start_executor_manager_thread()
     810              return f
     811      submit.__doc__ = _base.Executor.submit.__doc__
     812  
     813      def map(self, fn, *iterables, timeout=None, chunksize=1):
     814          """Returns an iterator equivalent to map(fn, iter).
     815  
     816          Args:
     817              fn: A callable that will take as many arguments as there are
     818                  passed iterables.
     819              timeout: The maximum number of seconds to wait. If None, then there
     820                  is no limit on the wait time.
     821              chunksize: If greater than one, the iterables will be chopped into
     822                  chunks of size chunksize and submitted to the process pool.
     823                  If set to one, the items in the list will be sent one at a time.
     824  
     825          Returns:
     826              An iterator equivalent to: map(func, *iterables) but the calls may
     827              be evaluated out-of-order.
     828  
     829          Raises:
     830              TimeoutError: If the entire result iterator could not be generated
     831                  before the given timeout.
     832              Exception: If fn(*args) raises for any values.
     833          """
     834          if chunksize < 1:
     835              raise ValueError("chunksize must be >= 1.")
     836  
     837          results = super().map(partial(_process_chunk, fn),
     838                                _get_chunks(*iterables, chunksize=chunksize),
     839                                timeout=timeout)
     840          return _chain_from_iterable_of_lists(results)
     841  
     842      def shutdown(self, wait=True, *, cancel_futures=False):
     843          with self._shutdown_lock:
     844              self._cancel_pending_futures = cancel_futures
     845              self._shutdown_thread = True
     846              if self._executor_manager_thread_wakeup is not None:
     847                  # Wake up queue management thread
     848                  self._executor_manager_thread_wakeup.wakeup()
     849  
     850          if self._executor_manager_thread is not None and wait:
     851              self._executor_manager_thread.join()
     852          # To reduce the risk of opening too many files, remove references to
     853          # objects that use file descriptors.
     854          self._executor_manager_thread = None
     855          self._call_queue = None
     856          if self._result_queue is not None and wait:
     857              self._result_queue.close()
     858          self._result_queue = None
     859          self._processes = None
     860          self._executor_manager_thread_wakeup = None
     861  
     862      shutdown.__doc__ = _base.Executor.shutdown.__doc__