python (3.11.7)

(root)/
lib/
python3.11/
concurrent/
futures/
thread.py
       1  # Copyright 2009 Brian Quinlan. All Rights Reserved.
       2  # Licensed to PSF under a Contributor Agreement.
       3  
       4  """Implements ThreadPoolExecutor."""
       5  
       6  __author__ = 'Brian Quinlan (brian@sweetapp.com)'
       7  
       8  from concurrent.futures import _base
       9  import itertools
      10  import queue
      11  import threading
      12  import types
      13  import weakref
      14  import os
      15  
      16  
      17  _threads_queues = weakref.WeakKeyDictionary()
      18  _shutdown = False
      19  # Lock that ensures that new workers are not created while the interpreter is
      20  # shutting down. Must be held while mutating _threads_queues and _shutdown.
      21  _global_shutdown_lock = threading.Lock()
      22  
      23  def _python_exit():
      24      global _shutdown
      25      with _global_shutdown_lock:
      26          _shutdown = True
      27      items = list(_threads_queues.items())
      28      for t, q in items:
      29          q.put(None)
      30      for t, q in items:
      31          t.join()
      32  
      33  # Register for `_python_exit()` to be called just before joining all
      34  # non-daemon threads. This is used instead of `atexit.register()` for
      35  # compatibility with subinterpreters, which no longer support daemon threads.
      36  # See bpo-39812 for context.
      37  threading._register_atexit(_python_exit)
      38  
      39  # At fork, reinitialize the `_global_shutdown_lock` lock in the child process
      40  if hasattr(os, 'register_at_fork'):
      41      os.register_at_fork(before=_global_shutdown_lock.acquire,
      42                          after_in_child=_global_shutdown_lock._at_fork_reinit,
      43                          after_in_parent=_global_shutdown_lock.release)
      44  
      45  
      46  class ESC[4;38;5;81m_WorkItem(ESC[4;38;5;149mobject):
      47      def __init__(self, future, fn, args, kwargs):
      48          self.future = future
      49          self.fn = fn
      50          self.args = args
      51          self.kwargs = kwargs
      52  
      53      def run(self):
      54          if not self.future.set_running_or_notify_cancel():
      55              return
      56  
      57          try:
      58              result = self.fn(*self.args, **self.kwargs)
      59          except BaseException as exc:
      60              self.future.set_exception(exc)
      61              # Break a reference cycle with the exception 'exc'
      62              self = None
      63          else:
      64              self.future.set_result(result)
      65  
      66      __class_getitem__ = classmethod(types.GenericAlias)
      67  
      68  
      69  def _worker(executor_reference, work_queue, initializer, initargs):
      70      if initializer is not None:
      71          try:
      72              initializer(*initargs)
      73          except BaseException:
      74              _base.LOGGER.critical('Exception in initializer:', exc_info=True)
      75              executor = executor_reference()
      76              if executor is not None:
      77                  executor._initializer_failed()
      78              return
      79      try:
      80          while True:
      81              work_item = work_queue.get(block=True)
      82              if work_item is not None:
      83                  work_item.run()
      84                  # Delete references to object. See issue16284
      85                  del work_item
      86  
      87                  # attempt to increment idle count
      88                  executor = executor_reference()
      89                  if executor is not None:
      90                      executor._idle_semaphore.release()
      91                  del executor
      92                  continue
      93  
      94              executor = executor_reference()
      95              # Exit if:
      96              #   - The interpreter is shutting down OR
      97              #   - The executor that owns the worker has been collected OR
      98              #   - The executor that owns the worker has been shutdown.
      99              if _shutdown or executor is None or executor._shutdown:
     100                  # Flag the executor as shutting down as early as possible if it
     101                  # is not gc-ed yet.
     102                  if executor is not None:
     103                      executor._shutdown = True
     104                  # Notice other workers
     105                  work_queue.put(None)
     106                  return
     107              del executor
     108      except BaseException:
     109          _base.LOGGER.critical('Exception in worker', exc_info=True)
     110  
     111  
     112  class ESC[4;38;5;81mBrokenThreadPool(ESC[4;38;5;149m_baseESC[4;38;5;149m.ESC[4;38;5;149mBrokenExecutor):
     113      """
     114      Raised when a worker thread in a ThreadPoolExecutor failed initializing.
     115      """
     116  
     117  
     118  class ESC[4;38;5;81mThreadPoolExecutor(ESC[4;38;5;149m_baseESC[4;38;5;149m.ESC[4;38;5;149mExecutor):
     119  
     120      # Used to assign unique thread names when thread_name_prefix is not supplied.
     121      _counter = itertools.count().__next__
     122  
     123      def __init__(self, max_workers=None, thread_name_prefix='',
     124                   initializer=None, initargs=()):
     125          """Initializes a new ThreadPoolExecutor instance.
     126  
     127          Args:
     128              max_workers: The maximum number of threads that can be used to
     129                  execute the given calls.
     130              thread_name_prefix: An optional name prefix to give our threads.
     131              initializer: A callable used to initialize worker threads.
     132              initargs: A tuple of arguments to pass to the initializer.
     133          """
     134          if max_workers is None:
     135              # ThreadPoolExecutor is often used to:
     136              # * CPU bound task which releases GIL
     137              # * I/O bound task (which releases GIL, of course)
     138              #
     139              # We use cpu_count + 4 for both types of tasks.
     140              # But we limit it to 32 to avoid consuming surprisingly large resource
     141              # on many core machine.
     142              max_workers = min(32, (os.cpu_count() or 1) + 4)
     143          if max_workers <= 0:
     144              raise ValueError("max_workers must be greater than 0")
     145  
     146          if initializer is not None and not callable(initializer):
     147              raise TypeError("initializer must be a callable")
     148  
     149          self._max_workers = max_workers
     150          self._work_queue = queue.SimpleQueue()
     151          self._idle_semaphore = threading.Semaphore(0)
     152          self._threads = set()
     153          self._broken = False
     154          self._shutdown = False
     155          self._shutdown_lock = threading.Lock()
     156          self._thread_name_prefix = (thread_name_prefix or
     157                                      ("ThreadPoolExecutor-%d" % self._counter()))
     158          self._initializer = initializer
     159          self._initargs = initargs
     160  
     161      def submit(self, fn, /, *args, **kwargs):
     162          with self._shutdown_lock, _global_shutdown_lock:
     163              if self._broken:
     164                  raise BrokenThreadPool(self._broken)
     165  
     166              if self._shutdown:
     167                  raise RuntimeError('cannot schedule new futures after shutdown')
     168              if _shutdown:
     169                  raise RuntimeError('cannot schedule new futures after '
     170                                     'interpreter shutdown')
     171  
     172              f = _base.Future()
     173              w = _WorkItem(f, fn, args, kwargs)
     174  
     175              self._work_queue.put(w)
     176              self._adjust_thread_count()
     177              return f
     178      submit.__doc__ = _base.Executor.submit.__doc__
     179  
     180      def _adjust_thread_count(self):
     181          # if idle threads are available, don't spin new threads
     182          if self._idle_semaphore.acquire(timeout=0):
     183              return
     184  
     185          # When the executor gets lost, the weakref callback will wake up
     186          # the worker threads.
     187          def weakref_cb(_, q=self._work_queue):
     188              q.put(None)
     189  
     190          num_threads = len(self._threads)
     191          if num_threads < self._max_workers:
     192              thread_name = '%s_%d' % (self._thread_name_prefix or self,
     193                                       num_threads)
     194              t = threading.Thread(name=thread_name, target=_worker,
     195                                   args=(weakref.ref(self, weakref_cb),
     196                                         self._work_queue,
     197                                         self._initializer,
     198                                         self._initargs))
     199              t.start()
     200              self._threads.add(t)
     201              _threads_queues[t] = self._work_queue
     202  
     203      def _initializer_failed(self):
     204          with self._shutdown_lock:
     205              self._broken = ('A thread initializer failed, the thread pool '
     206                              'is not usable anymore')
     207              # Drain work queue and mark pending futures failed
     208              while True:
     209                  try:
     210                      work_item = self._work_queue.get_nowait()
     211                  except queue.Empty:
     212                      break
     213                  if work_item is not None:
     214                      work_item.future.set_exception(BrokenThreadPool(self._broken))
     215  
     216      def shutdown(self, wait=True, *, cancel_futures=False):
     217          with self._shutdown_lock:
     218              self._shutdown = True
     219              if cancel_futures:
     220                  # Drain all work items from the queue, and then cancel their
     221                  # associated futures.
     222                  while True:
     223                      try:
     224                          work_item = self._work_queue.get_nowait()
     225                      except queue.Empty:
     226                          break
     227                      if work_item is not None:
     228                          work_item.future.cancel()
     229  
     230              # Send a wake-up to prevent threads calling
     231              # _work_queue.get(block=True) from permanently blocking.
     232              self._work_queue.put(None)
     233          if wait:
     234              for t in self._threads:
     235                  t.join()
     236      shutdown.__doc__ = _base.Executor.shutdown.__doc__