python (3.12.0)

(root)/
lib/
python3.12/
concurrent/
futures/
process.py
       1  # Copyright 2009 Brian Quinlan. All Rights Reserved.
       2  # Licensed to PSF under a Contributor Agreement.
       3  
       4  """Implements ProcessPoolExecutor.
       5  
       6  The following diagram and text describe the data-flow through the system:
       7  
       8  |======================= In-process =====================|== Out-of-process ==|
       9  
      10  +----------+     +----------+       +--------+     +-----------+    +---------+
      11  |          |  => | Work Ids |       |        |     | Call Q    |    | Process |
      12  |          |     +----------+       |        |     +-----------+    |  Pool   |
      13  |          |     | ...      |       |        |     | ...       |    +---------+
      14  |          |     | 6        |    => |        |  => | 5, call() | => |         |
      15  |          |     | 7        |       |        |     | ...       |    |         |
      16  | Process  |     | ...      |       | Local  |     +-----------+    | Process |
      17  |  Pool    |     +----------+       | Worker |                      |  #1..n  |
      18  | Executor |                        | Thread |                      |         |
      19  |          |     +------------+     |        |     +-----------+    |         |
      20  |          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
      21  |          |     +------------+     |        |     +-----------+    |         |
      22  |          |     | 6: call()  |     |        |     | ...       |    |         |
      23  |          |     |    future  |     |        |     | 4, result |    |         |
      24  |          |     | ...        |     |        |     | 3, except |    |         |
      25  +----------+     +------------+     +--------+     +-----------+    +---------+
      26  
      27  Executor.submit() called:
      28  - creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
      29  - adds the id of the _WorkItem to the "Work Ids" queue
      30  
      31  Local worker thread:
      32  - reads work ids from the "Work Ids" queue and looks up the corresponding
      33    WorkItem from the "Work Items" dict: if the work item has been cancelled then
      34    it is simply removed from the dict, otherwise it is repackaged as a
      35    _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
      36    until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
      37    calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
      38  - reads _ResultItems from "Result Q", updates the future stored in the
      39    "Work Items" dict and deletes the dict entry
      40  
      41  Process #1..n:
      42  - reads _CallItems from "Call Q", executes the calls, and puts the resulting
      43    _ResultItems in "Result Q"
      44  """
      45  
      46  __author__ = 'Brian Quinlan (brian@sweetapp.com)'
      47  
      48  import os
      49  from concurrent.futures import _base
      50  import queue
      51  import multiprocessing as mp
      52  # This import is required to load the multiprocessing.connection submodule
      53  # so that it can be accessed later as `mp.connection`
      54  import multiprocessing.connection
      55  from multiprocessing.queues import Queue
      56  import threading
      57  import weakref
      58  from functools import partial
      59  import itertools
      60  import sys
      61  from traceback import format_exception
      62  
      63  
# Maps each executor manager thread to its wakeup object. _python_exit()
# iterates over the (thread, wakeup) pairs to wake and then join every thread.
# Keys are held weakly (WeakKeyDictionary).
_threads_wakeups = weakref.WeakKeyDictionary()
# Set to True by _python_exit(); manager threads read it to decide whether
# they should start shutting down.
_global_shutdown = False
      66  
      67  
      68  class ESC[4;38;5;81m_ThreadWakeup:
      69      def __init__(self):
      70          self._closed = False
      71          self._reader, self._writer = mp.Pipe(duplex=False)
      72  
      73      def close(self):
      74          if not self._closed:
      75              self._closed = True
      76              self._writer.close()
      77              self._reader.close()
      78  
      79      def wakeup(self):
      80          if not self._closed:
      81              self._writer.send_bytes(b"")
      82  
      83      def clear(self):
      84          if not self._closed:
      85              while self._reader.poll():
      86                  self._reader.recv_bytes()
      87  
      88  
      89  def _python_exit():
      90      global _global_shutdown
      91      _global_shutdown = True
      92      items = list(_threads_wakeups.items())
      93      for _, thread_wakeup in items:
      94          # call not protected by ProcessPoolExecutor._shutdown_lock
      95          thread_wakeup.wakeup()
      96      for t, _ in items:
      97          t.join()
      98  
# Register `_python_exit()` to be called just before all non-daemon threads
# are joined. This is used instead of `atexit.register()` for
# compatibility with subinterpreters, which no longer support daemon threads.
# See bpo-39812 for context.
threading._register_atexit(_python_exit)
     104  
# Controls how many more calls than processes will be queued in the call queue.
# A smaller number means that processes spend more time idle waiting for
# work, while a larger number makes Future.cancel() succeed less frequently
# (Futures in the call queue cannot be cancelled).
EXTRA_QUEUED_CALLS = 1


# On Windows, WaitForMultipleObjects is used to wait for processes to finish.
# It can wait on, at most, 63 objects. There is an overhead of two objects:
# - the result queue reader
# - the thread wakeup reader
# which leaves 61 slots for worker processes.
_MAX_WINDOWS_WORKERS = 63 - 2
     117  
     118  # Hack to embed stringification of remote traceback in local traceback
     119  
     120  class ESC[4;38;5;81m_RemoteTraceback(ESC[4;38;5;149mException):
     121      def __init__(self, tb):
     122          self.tb = tb
     123      def __str__(self):
     124          return self.tb
     125  
     126  class ESC[4;38;5;81m_ExceptionWithTraceback:
     127      def __init__(self, exc, tb):
     128          tb = ''.join(format_exception(type(exc), exc, tb))
     129          self.exc = exc
     130          # Traceback object needs to be garbage-collected as its frames
     131          # contain references to all the objects in the exception scope
     132          self.exc.__traceback__ = None
     133          self.tb = '\n"""\n%s"""' % tb
     134      def __reduce__(self):
     135          return _rebuild_exc, (self.exc, self.tb)
     136  
     137  def _rebuild_exc(exc, tb):
     138      exc.__cause__ = _RemoteTraceback(tb)
     139      return exc
     140  
     141  class ESC[4;38;5;81m_WorkItem(ESC[4;38;5;149mobject):
     142      def __init__(self, future, fn, args, kwargs):
     143          self.future = future
     144          self.fn = fn
     145          self.args = args
     146          self.kwargs = kwargs
     147  
     148  class ESC[4;38;5;81m_ResultItem(ESC[4;38;5;149mobject):
     149      def __init__(self, work_id, exception=None, result=None, exit_pid=None):
     150          self.work_id = work_id
     151          self.exception = exception
     152          self.result = result
     153          self.exit_pid = exit_pid
     154  
     155  class ESC[4;38;5;81m_CallItem(ESC[4;38;5;149mobject):
     156      def __init__(self, work_id, fn, args, kwargs):
     157          self.work_id = work_id
     158          self.fn = fn
     159          self.args = args
     160          self.kwargs = kwargs
     161  
     162  
     163  class ESC[4;38;5;81m_SafeQueue(ESC[4;38;5;149mQueue):
     164      """Safe Queue set exception to the future object linked to a job"""
     165      def __init__(self, max_size=0, *, ctx, pending_work_items, shutdown_lock,
     166                   thread_wakeup):
     167          self.pending_work_items = pending_work_items
     168          self.shutdown_lock = shutdown_lock
     169          self.thread_wakeup = thread_wakeup
     170          super().__init__(max_size, ctx=ctx)
     171  
     172      def _on_queue_feeder_error(self, e, obj):
     173          if isinstance(obj, _CallItem):
     174              tb = format_exception(type(e), e, e.__traceback__)
     175              e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
     176              work_item = self.pending_work_items.pop(obj.work_id, None)
     177              with self.shutdown_lock:
     178                  self.thread_wakeup.wakeup()
     179              # work_item can be None if another process terminated. In this
     180              # case, the executor_manager_thread fails all work_items
     181              # with BrokenProcessPool
     182              if work_item is not None:
     183                  work_item.future.set_exception(e)
     184          else:
     185              super()._on_queue_feeder_error(e, obj)
     186  
     187  
     188  def _get_chunks(*iterables, chunksize):
     189      """ Iterates over zip()ed iterables in chunks. """
     190      it = zip(*iterables)
     191      while True:
     192          chunk = tuple(itertools.islice(it, chunksize))
     193          if not chunk:
     194              return
     195          yield chunk
     196  
     197  
     198  def _process_chunk(fn, chunk):
     199      """ Processes a chunk of an iterable passed to map.
     200  
     201      Runs the function passed to map() on a chunk of the
     202      iterable passed to map.
     203  
     204      This function is run in a separate process.
     205  
     206      """
     207      return [fn(*args) for args in chunk]
     208  
     209  
     210  def _sendback_result(result_queue, work_id, result=None, exception=None,
     211                       exit_pid=None):
     212      """Safely send back the given result or exception"""
     213      try:
     214          result_queue.put(_ResultItem(work_id, result=result,
     215                                       exception=exception, exit_pid=exit_pid))
     216      except BaseException as e:
     217          exc = _ExceptionWithTraceback(e, e.__traceback__)
     218          result_queue.put(_ResultItem(work_id, exception=exc,
     219                                       exit_pid=exit_pid))
     220  
     221  
     222  def _process_worker(call_queue, result_queue, initializer, initargs, max_tasks=None):
     223      """Evaluates calls from call_queue and places the results in result_queue.
     224  
     225      This worker is run in a separate process.
     226  
     227      Args:
     228          call_queue: A ctx.Queue of _CallItems that will be read and
     229              evaluated by the worker.
     230          result_queue: A ctx.Queue of _ResultItems that will written
     231              to by the worker.
     232          initializer: A callable initializer, or None
     233          initargs: A tuple of args for the initializer
     234      """
     235      if initializer is not None:
     236          try:
     237              initializer(*initargs)
     238          except BaseException:
     239              _base.LOGGER.critical('Exception in initializer:', exc_info=True)
     240              # The parent will notice that the process stopped and
     241              # mark the pool broken
     242              return
     243      num_tasks = 0
     244      exit_pid = None
     245      while True:
     246          call_item = call_queue.get(block=True)
     247          if call_item is None:
     248              # Wake up queue management thread
     249              result_queue.put(os.getpid())
     250              return
     251  
     252          if max_tasks is not None:
     253              num_tasks += 1
     254              if num_tasks >= max_tasks:
     255                  exit_pid = os.getpid()
     256  
     257          try:
     258              r = call_item.fn(*call_item.args, **call_item.kwargs)
     259          except BaseException as e:
     260              exc = _ExceptionWithTraceback(e, e.__traceback__)
     261              _sendback_result(result_queue, call_item.work_id, exception=exc,
     262                               exit_pid=exit_pid)
     263          else:
     264              _sendback_result(result_queue, call_item.work_id, result=r,
     265                               exit_pid=exit_pid)
     266              del r
     267  
     268          # Liberate the resource as soon as possible, to avoid holding onto
     269          # open files or shared memory that is not needed anymore
     270          del call_item
     271  
     272          if exit_pid is not None:
     273              return
     274  
     275  
     276  class ESC[4;38;5;81m_ExecutorManagerThread(ESC[4;38;5;149mthreadingESC[4;38;5;149m.ESC[4;38;5;149mThread):
     277      """Manages the communication between this process and the worker processes.
     278  
     279      The manager is run in a local thread.
     280  
     281      Args:
     282          executor: A reference to the ProcessPoolExecutor that owns
     283              this thread. A weakref will be own by the manager as well as
     284              references to internal objects used to introspect the state of
     285              the executor.
     286      """
     287  
     288      def __init__(self, executor):
     289          # Store references to necessary internals of the executor.
     290  
     291          # A _ThreadWakeup to allow waking up the queue_manager_thread from the
     292          # main Thread and avoid deadlocks caused by permanently locked queues.
     293          self.thread_wakeup = executor._executor_manager_thread_wakeup
     294          self.shutdown_lock = executor._shutdown_lock
     295  
     296          # A weakref.ref to the ProcessPoolExecutor that owns this thread. Used
     297          # to determine if the ProcessPoolExecutor has been garbage collected
     298          # and that the manager can exit.
     299          # When the executor gets garbage collected, the weakref callback
     300          # will wake up the queue management thread so that it can terminate
     301          # if there is no pending work item.
     302          def weakref_cb(_,
     303                         thread_wakeup=self.thread_wakeup,
     304                         shutdown_lock=self.shutdown_lock):
     305              mp.util.debug('Executor collected: triggering callback for'
     306                            ' QueueManager wakeup')
     307              with shutdown_lock:
     308                  thread_wakeup.wakeup()
     309  
     310          self.executor_reference = weakref.ref(executor, weakref_cb)
     311  
     312          # A list of the ctx.Process instances used as workers.
     313          self.processes = executor._processes
     314  
     315          # A ctx.Queue that will be filled with _CallItems derived from
     316          # _WorkItems for processing by the process workers.
     317          self.call_queue = executor._call_queue
     318  
     319          # A ctx.SimpleQueue of _ResultItems generated by the process workers.
     320          self.result_queue = executor._result_queue
     321  
     322          # A queue.Queue of work ids e.g. Queue([5, 6, ...]).
     323          self.work_ids_queue = executor._work_ids
     324  
     325          # Maximum number of tasks a worker process can execute before
     326          # exiting safely
     327          self.max_tasks_per_child = executor._max_tasks_per_child
     328  
     329          # A dict mapping work ids to _WorkItems e.g.
     330          #     {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
     331          self.pending_work_items = executor._pending_work_items
     332  
     333          super().__init__()
     334  
     335      def run(self):
     336          # Main loop for the executor manager thread.
     337  
     338          while True:
     339              self.add_call_item_to_queue()
     340  
     341              result_item, is_broken, cause = self.wait_result_broken_or_wakeup()
     342  
     343              if is_broken:
     344                  self.terminate_broken(cause)
     345                  return
     346              if result_item is not None:
     347                  self.process_result_item(result_item)
     348  
     349                  process_exited = result_item.exit_pid is not None
     350                  if process_exited:
     351                      p = self.processes.pop(result_item.exit_pid)
     352                      p.join()
     353  
     354                  # Delete reference to result_item to avoid keeping references
     355                  # while waiting on new results.
     356                  del result_item
     357  
     358                  if executor := self.executor_reference():
     359                      if process_exited:
     360                          with self.shutdown_lock:
     361                              executor._adjust_process_count()
     362                      else:
     363                          executor._idle_worker_semaphore.release()
     364                      del executor
     365  
     366              if self.is_shutting_down():
     367                  self.flag_executor_shutting_down()
     368  
     369                  # When only canceled futures remain in pending_work_items, our
     370                  # next call to wait_result_broken_or_wakeup would hang forever.
     371                  # This makes sure we have some running futures or none at all.
     372                  self.add_call_item_to_queue()
     373  
     374                  # Since no new work items can be added, it is safe to shutdown
     375                  # this thread if there are no pending work items.
     376                  if not self.pending_work_items:
     377                      self.join_executor_internals()
     378                      return
     379  
     380      def add_call_item_to_queue(self):
     381          # Fills call_queue with _WorkItems from pending_work_items.
     382          # This function never blocks.
     383          while True:
     384              if self.call_queue.full():
     385                  return
     386              try:
     387                  work_id = self.work_ids_queue.get(block=False)
     388              except queue.Empty:
     389                  return
     390              else:
     391                  work_item = self.pending_work_items[work_id]
     392  
     393                  if work_item.future.set_running_or_notify_cancel():
     394                      self.call_queue.put(_CallItem(work_id,
     395                                                    work_item.fn,
     396                                                    work_item.args,
     397                                                    work_item.kwargs),
     398                                          block=True)
     399                  else:
     400                      del self.pending_work_items[work_id]
     401                      continue
     402  
     403      def wait_result_broken_or_wakeup(self):
     404          # Wait for a result to be ready in the result_queue while checking
     405          # that all worker processes are still running, or for a wake up
     406          # signal send. The wake up signals come either from new tasks being
     407          # submitted, from the executor being shutdown/gc-ed, or from the
     408          # shutdown of the python interpreter.
     409          result_reader = self.result_queue._reader
     410          assert not self.thread_wakeup._closed
     411          wakeup_reader = self.thread_wakeup._reader
     412          readers = [result_reader, wakeup_reader]
     413          worker_sentinels = [p.sentinel for p in list(self.processes.values())]
     414          ready = mp.connection.wait(readers + worker_sentinels)
     415  
     416          cause = None
     417          is_broken = True
     418          result_item = None
     419          if result_reader in ready:
     420              try:
     421                  result_item = result_reader.recv()
     422                  is_broken = False
     423              except BaseException as e:
     424                  cause = format_exception(type(e), e, e.__traceback__)
     425  
     426          elif wakeup_reader in ready:
     427              is_broken = False
     428  
     429          with self.shutdown_lock:
     430              self.thread_wakeup.clear()
     431  
     432          return result_item, is_broken, cause
     433  
     434      def process_result_item(self, result_item):
     435          # Process the received a result_item. This can be either the PID of a
     436          # worker that exited gracefully or a _ResultItem
     437  
     438          if isinstance(result_item, int):
     439              # Clean shutdown of a worker using its PID
     440              # (avoids marking the executor broken)
     441              assert self.is_shutting_down()
     442              p = self.processes.pop(result_item)
     443              p.join()
     444              if not self.processes:
     445                  self.join_executor_internals()
     446                  return
     447          else:
     448              # Received a _ResultItem so mark the future as completed.
     449              work_item = self.pending_work_items.pop(result_item.work_id, None)
     450              # work_item can be None if another process terminated (see above)
     451              if work_item is not None:
     452                  if result_item.exception:
     453                      work_item.future.set_exception(result_item.exception)
     454                  else:
     455                      work_item.future.set_result(result_item.result)
     456  
     457      def is_shutting_down(self):
     458          # Check whether we should start shutting down the executor.
     459          executor = self.executor_reference()
     460          # No more work items can be added if:
     461          #   - The interpreter is shutting down OR
     462          #   - The executor that owns this worker has been collected OR
     463          #   - The executor that owns this worker has been shutdown.
     464          return (_global_shutdown or executor is None
     465                  or executor._shutdown_thread)
     466  
     467      def terminate_broken(self, cause):
     468          # Terminate the executor because it is in a broken state. The cause
     469          # argument can be used to display more information on the error that
     470          # lead the executor into becoming broken.
     471  
     472          # Mark the process pool broken so that submits fail right now.
     473          executor = self.executor_reference()
     474          if executor is not None:
     475              executor._broken = ('A child process terminated '
     476                                  'abruptly, the process pool is not '
     477                                  'usable anymore')
     478              executor._shutdown_thread = True
     479              executor = None
     480  
     481          # All pending tasks are to be marked failed with the following
     482          # BrokenProcessPool error
     483          bpe = BrokenProcessPool("A process in the process pool was "
     484                                  "terminated abruptly while the future was "
     485                                  "running or pending.")
     486          if cause is not None:
     487              bpe.__cause__ = _RemoteTraceback(
     488                  f"\n'''\n{''.join(cause)}'''")
     489  
     490          # Mark pending tasks as failed.
     491          for work_id, work_item in self.pending_work_items.items():
     492              work_item.future.set_exception(bpe)
     493              # Delete references to object. See issue16284
     494              del work_item
     495          self.pending_work_items.clear()
     496  
     497          # Terminate remaining workers forcibly: the queues or their
     498          # locks may be in a dirty state and block forever.
     499          for p in self.processes.values():
     500              p.terminate()
     501  
     502          # Prevent queue writing to a pipe which is no longer read.
     503          # https://github.com/python/cpython/issues/94777
     504          self.call_queue._reader.close()
     505  
     506          # clean up resources
     507          self.join_executor_internals()
     508  
     509      def flag_executor_shutting_down(self):
     510          # Flag the executor as shutting down and cancel remaining tasks if
     511          # requested as early as possible if it is not gc-ed yet.
     512          executor = self.executor_reference()
     513          if executor is not None:
     514              executor._shutdown_thread = True
     515              # Cancel pending work items if requested.
     516              if executor._cancel_pending_futures:
     517                  # Cancel all pending futures and update pending_work_items
     518                  # to only have futures that are currently running.
     519                  new_pending_work_items = {}
     520                  for work_id, work_item in self.pending_work_items.items():
     521                      if not work_item.future.cancel():
     522                          new_pending_work_items[work_id] = work_item
     523                  self.pending_work_items = new_pending_work_items
     524                  # Drain work_ids_queue since we no longer need to
     525                  # add items to the call queue.
     526                  while True:
     527                      try:
     528                          self.work_ids_queue.get_nowait()
     529                      except queue.Empty:
     530                          break
     531                  # Make sure we do this only once to not waste time looping
     532                  # on running processes over and over.
     533                  executor._cancel_pending_futures = False
     534  
     535      def shutdown_workers(self):
     536          n_children_to_stop = self.get_n_children_alive()
     537          n_sentinels_sent = 0
     538          # Send the right number of sentinels, to make sure all children are
     539          # properly terminated.
     540          while (n_sentinels_sent < n_children_to_stop
     541                  and self.get_n_children_alive() > 0):
     542              for i in range(n_children_to_stop - n_sentinels_sent):
     543                  try:
     544                      self.call_queue.put_nowait(None)
     545                      n_sentinels_sent += 1
     546                  except queue.Full:
     547                      break
     548  
     549      def join_executor_internals(self):
     550          self.shutdown_workers()
     551          # Release the queue's resources as soon as possible.
     552          self.call_queue.close()
     553          self.call_queue.join_thread()
     554          with self.shutdown_lock:
     555              self.thread_wakeup.close()
     556          # If .join() is not called on the created processes then
     557          # some ctx.Queue methods may deadlock on Mac OS X.
     558          for p in self.processes.values():
     559              p.join()
     560  
     561      def get_n_children_alive(self):
     562          # This is an upper bound on the number of children alive.
     563          return sum(p.is_alive() for p in self.processes.values())
     564  
     565  
     566  _system_limits_checked = False
     567  _system_limited = None
     568  
     569  
     570  def _check_system_limits():
     571      global _system_limits_checked, _system_limited
     572      if _system_limits_checked:
     573          if _system_limited:
     574              raise NotImplementedError(_system_limited)
     575      _system_limits_checked = True
     576      try:
     577          import multiprocessing.synchronize
     578      except ImportError:
     579          _system_limited = (
     580              "This Python build lacks multiprocessing.synchronize, usually due "
     581              "to named semaphores being unavailable on this platform."
     582          )
     583          raise NotImplementedError(_system_limited)
     584      try:
     585          nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
     586      except (AttributeError, ValueError):
     587          # sysconf not available or setting not available
     588          return
     589      if nsems_max == -1:
     590          # indetermined limit, assume that limit is determined
     591          # by available memory only
     592          return
     593      if nsems_max >= 256:
     594          # minimum number of semaphores available
     595          # according to POSIX
     596          return
     597      _system_limited = ("system provides too few semaphores (%d"
     598                         " available, 256 necessary)" % nsems_max)
     599      raise NotImplementedError(_system_limited)
     600  
     601  
     602  def _chain_from_iterable_of_lists(iterable):
     603      """
     604      Specialized implementation of itertools.chain.from_iterable.
     605      Each item in *iterable* should be a list.  This function is
     606      careful not to keep references to yielded objects.
     607      """
     608      for element in iterable:
     609          element.reverse()
     610          while element:
     611              yield element.pop()
     612  
     613  
     614  class ESC[4;38;5;81mBrokenProcessPool(ESC[4;38;5;149m_baseESC[4;38;5;149m.ESC[4;38;5;149mBrokenExecutor):
     615      """
     616      Raised when a process in a ProcessPoolExecutor terminated abruptly
     617      while a future was in the running state.
     618      """
     619  
     620  
     621  class ESC[4;38;5;81mProcessPoolExecutor(ESC[4;38;5;149m_baseESC[4;38;5;149m.ESC[4;38;5;149mExecutor):
     622      def __init__(self, max_workers=None, mp_context=None,
     623                   initializer=None, initargs=(), *, max_tasks_per_child=None):
     624          """Initializes a new ProcessPoolExecutor instance.
     625  
     626          Args:
     627              max_workers: The maximum number of processes that can be used to
     628                  execute the given calls. If None or not given then as many
     629                  worker processes will be created as the machine has processors.
     630              mp_context: A multiprocessing context to launch the workers created
     631                  using the multiprocessing.get_context('start method') API. This
     632                  object should provide SimpleQueue, Queue and Process.
     633              initializer: A callable used to initialize worker processes.
     634              initargs: A tuple of arguments to pass to the initializer.
     635              max_tasks_per_child: The maximum number of tasks a worker process
     636                  can complete before it will exit and be replaced with a fresh
     637                  worker process. The default of None means worker process will
     638                  live as long as the executor. Requires a non-'fork' mp_context
     639                  start method. When given, we default to using 'spawn' if no
     640                  mp_context is supplied.
     641          """
     642          _check_system_limits()
     643  
     644          if max_workers is None:
     645              self._max_workers = os.cpu_count() or 1
     646              if sys.platform == 'win32':
     647                  self._max_workers = min(_MAX_WINDOWS_WORKERS,
     648                                          self._max_workers)
     649          else:
     650              if max_workers <= 0:
     651                  raise ValueError("max_workers must be greater than 0")
     652              elif (sys.platform == 'win32' and
     653                  max_workers > _MAX_WINDOWS_WORKERS):
     654                  raise ValueError(
     655                      f"max_workers must be <= {_MAX_WINDOWS_WORKERS}")
     656  
     657              self._max_workers = max_workers
     658  
     659          if mp_context is None:
     660              if max_tasks_per_child is not None:
     661                  mp_context = mp.get_context("spawn")
     662              else:
     663                  mp_context = mp.get_context()
     664          self._mp_context = mp_context
     665  
     666          # https://github.com/python/cpython/issues/90622
     667          self._safe_to_dynamically_spawn_children = (
     668                  self._mp_context.get_start_method(allow_none=False) != "fork")
     669  
     670          if initializer is not None and not callable(initializer):
     671              raise TypeError("initializer must be a callable")
     672          self._initializer = initializer
     673          self._initargs = initargs
     674  
     675          if max_tasks_per_child is not None:
     676              if not isinstance(max_tasks_per_child, int):
     677                  raise TypeError("max_tasks_per_child must be an integer")
     678              elif max_tasks_per_child <= 0:
     679                  raise ValueError("max_tasks_per_child must be >= 1")
     680              if self._mp_context.get_start_method(allow_none=False) == "fork":
     681                  # https://github.com/python/cpython/issues/90622
     682                  raise ValueError("max_tasks_per_child is incompatible with"
     683                                   " the 'fork' multiprocessing start method;"
     684                                   " supply a different mp_context.")
     685          self._max_tasks_per_child = max_tasks_per_child
     686  
     687          # Management thread
     688          self._executor_manager_thread = None
     689  
     690          # Map of pids to processes
     691          self._processes = {}
     692  
     693          # Shutdown is a two-step process.
     694          self._shutdown_thread = False
     695          self._shutdown_lock = threading.Lock()
     696          self._idle_worker_semaphore = threading.Semaphore(0)
     697          self._broken = False
     698          self._queue_count = 0
     699          self._pending_work_items = {}
     700          self._cancel_pending_futures = False
     701  
     702          # _ThreadWakeup is a communication channel used to interrupt the wait
     703          # of the main loop of executor_manager_thread from another thread (e.g.
     704          # when calling executor.submit or executor.shutdown). We do not use the
     705          # _result_queue to send wakeup signals to the executor_manager_thread
     706          # as it could result in a deadlock if a worker process dies with the
     707          # _result_queue write lock still acquired.
     708          #
     709          # _shutdown_lock must be locked to access _ThreadWakeup.
     710          self._executor_manager_thread_wakeup = _ThreadWakeup()
     711  
     712          # Create communication channels for the executor
     713          # Make the call queue slightly larger than the number of processes to
     714          # prevent the worker processes from idling. But don't make it too big
     715          # because futures in the call queue cannot be cancelled.
     716          queue_size = self._max_workers + EXTRA_QUEUED_CALLS
     717          self._call_queue = _SafeQueue(
     718              max_size=queue_size, ctx=self._mp_context,
     719              pending_work_items=self._pending_work_items,
     720              shutdown_lock=self._shutdown_lock,
     721              thread_wakeup=self._executor_manager_thread_wakeup)
     722          # Killed worker processes can produce spurious "broken pipe"
     723          # tracebacks in the queue's own worker thread. But we detect killed
     724          # processes anyway, so silence the tracebacks.
     725          self._call_queue._ignore_epipe = True
     726          self._result_queue = mp_context.SimpleQueue()
     727          self._work_ids = queue.Queue()
     728  
     729      def _start_executor_manager_thread(self):
     730          if self._executor_manager_thread is None:
     731              # Start the processes so that their sentinels are known.
     732              if not self._safe_to_dynamically_spawn_children:  # ie, using fork.
     733                  self._launch_processes()
     734              self._executor_manager_thread = _ExecutorManagerThread(self)
     735              self._executor_manager_thread.start()
     736              _threads_wakeups[self._executor_manager_thread] = \
     737                  self._executor_manager_thread_wakeup
     738  
     739      def _adjust_process_count(self):
     740          # if there's an idle process, we don't need to spawn a new one.
     741          if self._idle_worker_semaphore.acquire(blocking=False):
     742              return
     743  
     744          process_count = len(self._processes)
     745          if process_count < self._max_workers:
     746              # Assertion disabled as this codepath is also used to replace a
     747              # worker that unexpectedly dies, even when using the 'fork' start
     748              # method. That means there is still a potential deadlock bug. If a
     749              # 'fork' mp_context worker dies, we'll be forking a new one when
     750              # we know a thread is running (self._executor_manager_thread).
     751              #assert self._safe_to_dynamically_spawn_children or not self._executor_manager_thread, 'https://github.com/python/cpython/issues/90622'
     752              self._spawn_process()
     753  
     754      def _launch_processes(self):
     755          # https://github.com/python/cpython/issues/90622
     756          assert not self._executor_manager_thread, (
     757                  'Processes cannot be fork()ed after the thread has started, '
     758                  'deadlock in the child processes could result.')
     759          for _ in range(len(self._processes), self._max_workers):
     760              self._spawn_process()
     761  
     762      def _spawn_process(self):
     763          p = self._mp_context.Process(
     764              target=_process_worker,
     765              args=(self._call_queue,
     766                    self._result_queue,
     767                    self._initializer,
     768                    self._initargs,
     769                    self._max_tasks_per_child))
     770          p.start()
     771          self._processes[p.pid] = p
     772  
     773      def submit(self, fn, /, *args, **kwargs):
     774          with self._shutdown_lock:
     775              if self._broken:
     776                  raise BrokenProcessPool(self._broken)
     777              if self._shutdown_thread:
     778                  raise RuntimeError('cannot schedule new futures after shutdown')
     779              if _global_shutdown:
     780                  raise RuntimeError('cannot schedule new futures after '
     781                                     'interpreter shutdown')
     782  
     783              f = _base.Future()
     784              w = _WorkItem(f, fn, args, kwargs)
     785  
     786              self._pending_work_items[self._queue_count] = w
     787              self._work_ids.put(self._queue_count)
     788              self._queue_count += 1
     789              # Wake up queue management thread
     790              self._executor_manager_thread_wakeup.wakeup()
     791  
     792              if self._safe_to_dynamically_spawn_children:
     793                  self._adjust_process_count()
     794              self._start_executor_manager_thread()
     795              return f
     796      submit.__doc__ = _base.Executor.submit.__doc__
     797  
     798      def map(self, fn, *iterables, timeout=None, chunksize=1):
     799          """Returns an iterator equivalent to map(fn, iter).
     800  
     801          Args:
     802              fn: A callable that will take as many arguments as there are
     803                  passed iterables.
     804              timeout: The maximum number of seconds to wait. If None, then there
     805                  is no limit on the wait time.
     806              chunksize: If greater than one, the iterables will be chopped into
     807                  chunks of size chunksize and submitted to the process pool.
     808                  If set to one, the items in the list will be sent one at a time.
     809  
     810          Returns:
     811              An iterator equivalent to: map(func, *iterables) but the calls may
     812              be evaluated out-of-order.
     813  
     814          Raises:
     815              TimeoutError: If the entire result iterator could not be generated
     816                  before the given timeout.
     817              Exception: If fn(*args) raises for any values.
     818          """
     819          if chunksize < 1:
     820              raise ValueError("chunksize must be >= 1.")
     821  
     822          results = super().map(partial(_process_chunk, fn),
     823                                _get_chunks(*iterables, chunksize=chunksize),
     824                                timeout=timeout)
     825          return _chain_from_iterable_of_lists(results)
     826  
     827      def shutdown(self, wait=True, *, cancel_futures=False):
     828          with self._shutdown_lock:
     829              self._cancel_pending_futures = cancel_futures
     830              self._shutdown_thread = True
     831              if self._executor_manager_thread_wakeup is not None:
     832                  # Wake up queue management thread
     833                  self._executor_manager_thread_wakeup.wakeup()
     834  
     835          if self._executor_manager_thread is not None and wait:
     836              self._executor_manager_thread.join()
     837          # To reduce the risk of opening too many files, remove references to
     838          # objects that use file descriptors.
     839          self._executor_manager_thread = None
     840          self._call_queue = None
     841          if self._result_queue is not None and wait:
     842              self._result_queue.close()
     843          self._result_queue = None
     844          self._processes = None
     845          self._executor_manager_thread_wakeup = None
     846  
     847      shutdown.__doc__ = _base.Executor.shutdown.__doc__