python (3.11.7)

(root)/
lib/
python3.11/
concurrent/
futures/
_base.py
       1  # Copyright 2009 Brian Quinlan. All Rights Reserved.
       2  # Licensed to PSF under a Contributor Agreement.
       3  
       4  __author__ = 'Brian Quinlan (brian@sweetapp.com)'
       5  
       6  import collections
       7  import logging
       8  import threading
       9  import time
      10  import types
      11  
      12  FIRST_COMPLETED = 'FIRST_COMPLETED'
      13  FIRST_EXCEPTION = 'FIRST_EXCEPTION'
      14  ALL_COMPLETED = 'ALL_COMPLETED'
      15  _AS_COMPLETED = '_AS_COMPLETED'
      16  
      17  # Possible future states (for internal use by the futures package).
      18  PENDING = 'PENDING'
      19  RUNNING = 'RUNNING'
      20  # The future was cancelled by the user...
      21  CANCELLED = 'CANCELLED'
      22  # ...and _Waiter.add_cancelled() was called by a worker.
      23  CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'
      24  FINISHED = 'FINISHED'
      25  
      26  _FUTURE_STATES = [
      27      PENDING,
      28      RUNNING,
      29      CANCELLED,
      30      CANCELLED_AND_NOTIFIED,
      31      FINISHED
      32  ]
      33  
      34  _STATE_TO_DESCRIPTION_MAP = {
      35      PENDING: "pending",
      36      RUNNING: "running",
      37      CANCELLED: "cancelled",
      38      CANCELLED_AND_NOTIFIED: "cancelled",
      39      FINISHED: "finished"
      40  }
      41  
      42  # Logger for internal use by the futures package.
      43  LOGGER = logging.getLogger("concurrent.futures")
      44  
      45  class ESC[4;38;5;81mError(ESC[4;38;5;149mException):
      46      """Base class for all future-related exceptions."""
      47      pass
      48  
      49  class ESC[4;38;5;81mCancelledError(ESC[4;38;5;149mError):
      50      """The Future was cancelled."""
      51      pass
      52  
      53  TimeoutError = TimeoutError  # make local alias for the standard exception
      54  
      55  class ESC[4;38;5;81mInvalidStateError(ESC[4;38;5;149mError):
      56      """The operation is not allowed in this state."""
      57      pass
      58  
      59  class ESC[4;38;5;81m_Waiter(ESC[4;38;5;149mobject):
      60      """Provides the event that wait() and as_completed() block on."""
      61      def __init__(self):
      62          self.event = threading.Event()
      63          self.finished_futures = []
      64  
      65      def add_result(self, future):
      66          self.finished_futures.append(future)
      67  
      68      def add_exception(self, future):
      69          self.finished_futures.append(future)
      70  
      71      def add_cancelled(self, future):
      72          self.finished_futures.append(future)
      73  
      74  class ESC[4;38;5;81m_AsCompletedWaiter(ESC[4;38;5;149m_Waiter):
      75      """Used by as_completed()."""
      76  
      77      def __init__(self):
      78          super(_AsCompletedWaiter, self).__init__()
      79          self.lock = threading.Lock()
      80  
      81      def add_result(self, future):
      82          with self.lock:
      83              super(_AsCompletedWaiter, self).add_result(future)
      84              self.event.set()
      85  
      86      def add_exception(self, future):
      87          with self.lock:
      88              super(_AsCompletedWaiter, self).add_exception(future)
      89              self.event.set()
      90  
      91      def add_cancelled(self, future):
      92          with self.lock:
      93              super(_AsCompletedWaiter, self).add_cancelled(future)
      94              self.event.set()
      95  
      96  class ESC[4;38;5;81m_FirstCompletedWaiter(ESC[4;38;5;149m_Waiter):
      97      """Used by wait(return_when=FIRST_COMPLETED)."""
      98  
      99      def add_result(self, future):
     100          super().add_result(future)
     101          self.event.set()
     102  
     103      def add_exception(self, future):
     104          super().add_exception(future)
     105          self.event.set()
     106  
     107      def add_cancelled(self, future):
     108          super().add_cancelled(future)
     109          self.event.set()
     110  
     111  class ESC[4;38;5;81m_AllCompletedWaiter(ESC[4;38;5;149m_Waiter):
     112      """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED)."""
     113  
     114      def __init__(self, num_pending_calls, stop_on_exception):
     115          self.num_pending_calls = num_pending_calls
     116          self.stop_on_exception = stop_on_exception
     117          self.lock = threading.Lock()
     118          super().__init__()
     119  
     120      def _decrement_pending_calls(self):
     121          with self.lock:
     122              self.num_pending_calls -= 1
     123              if not self.num_pending_calls:
     124                  self.event.set()
     125  
     126      def add_result(self, future):
     127          super().add_result(future)
     128          self._decrement_pending_calls()
     129  
     130      def add_exception(self, future):
     131          super().add_exception(future)
     132          if self.stop_on_exception:
     133              self.event.set()
     134          else:
     135              self._decrement_pending_calls()
     136  
     137      def add_cancelled(self, future):
     138          super().add_cancelled(future)
     139          self._decrement_pending_calls()
     140  
     141  class ESC[4;38;5;81m_AcquireFutures(ESC[4;38;5;149mobject):
     142      """A context manager that does an ordered acquire of Future conditions."""
     143  
     144      def __init__(self, futures):
     145          self.futures = sorted(futures, key=id)
     146  
     147      def __enter__(self):
     148          for future in self.futures:
     149              future._condition.acquire()
     150  
     151      def __exit__(self, *args):
     152          for future in self.futures:
     153              future._condition.release()
     154  
     155  def _create_and_install_waiters(fs, return_when):
     156      if return_when == _AS_COMPLETED:
     157          waiter = _AsCompletedWaiter()
     158      elif return_when == FIRST_COMPLETED:
     159          waiter = _FirstCompletedWaiter()
     160      else:
     161          pending_count = sum(
     162                  f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs)
     163  
     164          if return_when == FIRST_EXCEPTION:
     165              waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True)
     166          elif return_when == ALL_COMPLETED:
     167              waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False)
     168          else:
     169              raise ValueError("Invalid return condition: %r" % return_when)
     170  
     171      for f in fs:
     172          f._waiters.append(waiter)
     173  
     174      return waiter
     175  
     176  
     177  def _yield_finished_futures(fs, waiter, ref_collect):
     178      """
     179      Iterate on the list *fs*, yielding finished futures one by one in
     180      reverse order.
     181      Before yielding a future, *waiter* is removed from its waiters
     182      and the future is removed from each set in the collection of sets
     183      *ref_collect*.
     184  
     185      The aim of this function is to avoid keeping stale references after
     186      the future is yielded and before the iterator resumes.
     187      """
     188      while fs:
     189          f = fs[-1]
     190          for futures_set in ref_collect:
     191              futures_set.remove(f)
     192          with f._condition:
     193              f._waiters.remove(waiter)
     194          del f
     195          # Careful not to keep a reference to the popped value
     196          yield fs.pop()
     197  
     198  
     199  def as_completed(fs, timeout=None):
     200      """An iterator over the given futures that yields each as it completes.
     201  
     202      Args:
     203          fs: The sequence of Futures (possibly created by different Executors) to
     204              iterate over.
     205          timeout: The maximum number of seconds to wait. If None, then there
     206              is no limit on the wait time.
     207  
     208      Returns:
     209          An iterator that yields the given Futures as they complete (finished or
     210          cancelled). If any given Futures are duplicated, they will be returned
     211          once.
     212  
     213      Raises:
     214          TimeoutError: If the entire result iterator could not be generated
     215              before the given timeout.
     216      """
     217      if timeout is not None:
     218          end_time = timeout + time.monotonic()
     219  
     220      fs = set(fs)
     221      total_futures = len(fs)
     222      with _AcquireFutures(fs):
     223          finished = set(
     224                  f for f in fs
     225                  if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
     226          pending = fs - finished
     227          waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
     228      finished = list(finished)
     229      try:
     230          yield from _yield_finished_futures(finished, waiter,
     231                                             ref_collect=(fs,))
     232  
     233          while pending:
     234              if timeout is None:
     235                  wait_timeout = None
     236              else:
     237                  wait_timeout = end_time - time.monotonic()
     238                  if wait_timeout < 0:
     239                      raise TimeoutError(
     240                              '%d (of %d) futures unfinished' % (
     241                              len(pending), total_futures))
     242  
     243              waiter.event.wait(wait_timeout)
     244  
     245              with waiter.lock:
     246                  finished = waiter.finished_futures
     247                  waiter.finished_futures = []
     248                  waiter.event.clear()
     249  
     250              # reverse to keep finishing order
     251              finished.reverse()
     252              yield from _yield_finished_futures(finished, waiter,
     253                                                 ref_collect=(fs, pending))
     254  
     255      finally:
     256          # Remove waiter from unfinished futures
     257          for f in fs:
     258              with f._condition:
     259                  f._waiters.remove(waiter)
     260  
     261  DoneAndNotDoneFutures = collections.namedtuple(
     262          'DoneAndNotDoneFutures', 'done not_done')
     263  def wait(fs, timeout=None, return_when=ALL_COMPLETED):
     264      """Wait for the futures in the given sequence to complete.
     265  
     266      Args:
     267          fs: The sequence of Futures (possibly created by different Executors) to
     268              wait upon.
     269          timeout: The maximum number of seconds to wait. If None, then there
     270              is no limit on the wait time.
     271          return_when: Indicates when this function should return. The options
     272              are:
     273  
     274              FIRST_COMPLETED - Return when any future finishes or is
     275                                cancelled.
     276              FIRST_EXCEPTION - Return when any future finishes by raising an
     277                                exception. If no future raises an exception
     278                                then it is equivalent to ALL_COMPLETED.
     279              ALL_COMPLETED -   Return when all futures finish or are cancelled.
     280  
     281      Returns:
     282          A named 2-tuple of sets. The first set, named 'done', contains the
     283          futures that completed (is finished or cancelled) before the wait
     284          completed. The second set, named 'not_done', contains uncompleted
     285          futures. Duplicate futures given to *fs* are removed and will be
     286          returned only once.
     287      """
     288      fs = set(fs)
     289      with _AcquireFutures(fs):
     290          done = {f for f in fs
     291                     if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]}
     292          not_done = fs - done
     293          if (return_when == FIRST_COMPLETED) and done:
     294              return DoneAndNotDoneFutures(done, not_done)
     295          elif (return_when == FIRST_EXCEPTION) and done:
     296              if any(f for f in done
     297                     if not f.cancelled() and f.exception() is not None):
     298                  return DoneAndNotDoneFutures(done, not_done)
     299  
     300          if len(done) == len(fs):
     301              return DoneAndNotDoneFutures(done, not_done)
     302  
     303          waiter = _create_and_install_waiters(fs, return_when)
     304  
     305      waiter.event.wait(timeout)
     306      for f in fs:
     307          with f._condition:
     308              f._waiters.remove(waiter)
     309  
     310      done.update(waiter.finished_futures)
     311      return DoneAndNotDoneFutures(done, fs - done)
     312  
     313  
     314  def _result_or_cancel(fut, timeout=None):
     315      try:
     316          try:
     317              return fut.result(timeout)
     318          finally:
     319              fut.cancel()
     320      finally:
     321          # Break a reference cycle with the exception in self._exception
     322          del fut
     323  
     324  
     325  class ESC[4;38;5;81mFuture(ESC[4;38;5;149mobject):
     326      """Represents the result of an asynchronous computation."""
     327  
     328      def __init__(self):
     329          """Initializes the future. Should not be called by clients."""
     330          self._condition = threading.Condition()
     331          self._state = PENDING
     332          self._result = None
     333          self._exception = None
     334          self._waiters = []
     335          self._done_callbacks = []
     336  
     337      def _invoke_callbacks(self):
     338          for callback in self._done_callbacks:
     339              try:
     340                  callback(self)
     341              except Exception:
     342                  LOGGER.exception('exception calling callback for %r', self)
     343  
     344      def __repr__(self):
     345          with self._condition:
     346              if self._state == FINISHED:
     347                  if self._exception:
     348                      return '<%s at %#x state=%s raised %s>' % (
     349                          self.__class__.__name__,
     350                          id(self),
     351                          _STATE_TO_DESCRIPTION_MAP[self._state],
     352                          self._exception.__class__.__name__)
     353                  else:
     354                      return '<%s at %#x state=%s returned %s>' % (
     355                          self.__class__.__name__,
     356                          id(self),
     357                          _STATE_TO_DESCRIPTION_MAP[self._state],
     358                          self._result.__class__.__name__)
     359              return '<%s at %#x state=%s>' % (
     360                      self.__class__.__name__,
     361                      id(self),
     362                     _STATE_TO_DESCRIPTION_MAP[self._state])
     363  
     364      def cancel(self):
     365          """Cancel the future if possible.
     366  
     367          Returns True if the future was cancelled, False otherwise. A future
     368          cannot be cancelled if it is running or has already completed.
     369          """
     370          with self._condition:
     371              if self._state in [RUNNING, FINISHED]:
     372                  return False
     373  
     374              if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
     375                  return True
     376  
     377              self._state = CANCELLED
     378              self._condition.notify_all()
     379  
     380          self._invoke_callbacks()
     381          return True
     382  
     383      def cancelled(self):
     384          """Return True if the future was cancelled."""
     385          with self._condition:
     386              return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]
     387  
     388      def running(self):
     389          """Return True if the future is currently executing."""
     390          with self._condition:
     391              return self._state == RUNNING
     392  
     393      def done(self):
     394          """Return True if the future was cancelled or finished executing."""
     395          with self._condition:
     396              return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]
     397  
     398      def __get_result(self):
     399          if self._exception:
     400              try:
     401                  raise self._exception
     402              finally:
     403                  # Break a reference cycle with the exception in self._exception
     404                  self = None
     405          else:
     406              return self._result
     407  
     408      def add_done_callback(self, fn):
     409          """Attaches a callable that will be called when the future finishes.
     410  
     411          Args:
     412              fn: A callable that will be called with this future as its only
     413                  argument when the future completes or is cancelled. The callable
     414                  will always be called by a thread in the same process in which
     415                  it was added. If the future has already completed or been
     416                  cancelled then the callable will be called immediately. These
     417                  callables are called in the order that they were added.
     418          """
     419          with self._condition:
     420              if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
     421                  self._done_callbacks.append(fn)
     422                  return
     423          try:
     424              fn(self)
     425          except Exception:
     426              LOGGER.exception('exception calling callback for %r', self)
     427  
     428      def result(self, timeout=None):
     429          """Return the result of the call that the future represents.
     430  
     431          Args:
     432              timeout: The number of seconds to wait for the result if the future
     433                  isn't done. If None, then there is no limit on the wait time.
     434  
     435          Returns:
     436              The result of the call that the future represents.
     437  
     438          Raises:
     439              CancelledError: If the future was cancelled.
     440              TimeoutError: If the future didn't finish executing before the given
     441                  timeout.
     442              Exception: If the call raised then that exception will be raised.
     443          """
     444          try:
     445              with self._condition:
     446                  if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
     447                      raise CancelledError()
     448                  elif self._state == FINISHED:
     449                      return self.__get_result()
     450  
     451                  self._condition.wait(timeout)
     452  
     453                  if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
     454                      raise CancelledError()
     455                  elif self._state == FINISHED:
     456                      return self.__get_result()
     457                  else:
     458                      raise TimeoutError()
     459          finally:
     460              # Break a reference cycle with the exception in self._exception
     461              self = None
     462  
     463      def exception(self, timeout=None):
     464          """Return the exception raised by the call that the future represents.
     465  
     466          Args:
     467              timeout: The number of seconds to wait for the exception if the
     468                  future isn't done. If None, then there is no limit on the wait
     469                  time.
     470  
     471          Returns:
     472              The exception raised by the call that the future represents or None
     473              if the call completed without raising.
     474  
     475          Raises:
     476              CancelledError: If the future was cancelled.
     477              TimeoutError: If the future didn't finish executing before the given
     478                  timeout.
     479          """
     480  
     481          with self._condition:
     482              if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
     483                  raise CancelledError()
     484              elif self._state == FINISHED:
     485                  return self._exception
     486  
     487              self._condition.wait(timeout)
     488  
     489              if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
     490                  raise CancelledError()
     491              elif self._state == FINISHED:
     492                  return self._exception
     493              else:
     494                  raise TimeoutError()
     495  
     496      # The following methods should only be used by Executors and in tests.
     497      def set_running_or_notify_cancel(self):
     498          """Mark the future as running or process any cancel notifications.
     499  
     500          Should only be used by Executor implementations and unit tests.
     501  
     502          If the future has been cancelled (cancel() was called and returned
     503          True) then any threads waiting on the future completing (though calls
     504          to as_completed() or wait()) are notified and False is returned.
     505  
     506          If the future was not cancelled then it is put in the running state
     507          (future calls to running() will return True) and True is returned.
     508  
     509          This method should be called by Executor implementations before
     510          executing the work associated with this future. If this method returns
     511          False then the work should not be executed.
     512  
     513          Returns:
     514              False if the Future was cancelled, True otherwise.
     515  
     516          Raises:
     517              RuntimeError: if this method was already called or if set_result()
     518                  or set_exception() was called.
     519          """
     520          with self._condition:
     521              if self._state == CANCELLED:
     522                  self._state = CANCELLED_AND_NOTIFIED
     523                  for waiter in self._waiters:
     524                      waiter.add_cancelled(self)
     525                  # self._condition.notify_all() is not necessary because
     526                  # self.cancel() triggers a notification.
     527                  return False
     528              elif self._state == PENDING:
     529                  self._state = RUNNING
     530                  return True
     531              else:
     532                  LOGGER.critical('Future %s in unexpected state: %s',
     533                                  id(self),
     534                                  self._state)
     535                  raise RuntimeError('Future in unexpected state')
     536  
     537      def set_result(self, result):
     538          """Sets the return value of work associated with the future.
     539  
     540          Should only be used by Executor implementations and unit tests.
     541          """
     542          with self._condition:
     543              if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}:
     544                  raise InvalidStateError('{}: {!r}'.format(self._state, self))
     545              self._result = result
     546              self._state = FINISHED
     547              for waiter in self._waiters:
     548                  waiter.add_result(self)
     549              self._condition.notify_all()
     550          self._invoke_callbacks()
     551  
     552      def set_exception(self, exception):
     553          """Sets the result of the future as being the given exception.
     554  
     555          Should only be used by Executor implementations and unit tests.
     556          """
     557          with self._condition:
     558              if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}:
     559                  raise InvalidStateError('{}: {!r}'.format(self._state, self))
     560              self._exception = exception
     561              self._state = FINISHED
     562              for waiter in self._waiters:
     563                  waiter.add_exception(self)
     564              self._condition.notify_all()
     565          self._invoke_callbacks()
     566  
     567      __class_getitem__ = classmethod(types.GenericAlias)
     568  
     569  class ESC[4;38;5;81mExecutor(ESC[4;38;5;149mobject):
     570      """This is an abstract base class for concrete asynchronous executors."""
     571  
     572      def submit(self, fn, /, *args, **kwargs):
     573          """Submits a callable to be executed with the given arguments.
     574  
     575          Schedules the callable to be executed as fn(*args, **kwargs) and returns
     576          a Future instance representing the execution of the callable.
     577  
     578          Returns:
     579              A Future representing the given call.
     580          """
     581          raise NotImplementedError()
     582  
     583      def map(self, fn, *iterables, timeout=None, chunksize=1):
     584          """Returns an iterator equivalent to map(fn, iter).
     585  
     586          Args:
     587              fn: A callable that will take as many arguments as there are
     588                  passed iterables.
     589              timeout: The maximum number of seconds to wait. If None, then there
     590                  is no limit on the wait time.
     591              chunksize: The size of the chunks the iterable will be broken into
     592                  before being passed to a child process. This argument is only
     593                  used by ProcessPoolExecutor; it is ignored by
     594                  ThreadPoolExecutor.
     595  
     596          Returns:
     597              An iterator equivalent to: map(func, *iterables) but the calls may
     598              be evaluated out-of-order.
     599  
     600          Raises:
     601              TimeoutError: If the entire result iterator could not be generated
     602                  before the given timeout.
     603              Exception: If fn(*args) raises for any values.
     604          """
     605          if timeout is not None:
     606              end_time = timeout + time.monotonic()
     607  
     608          fs = [self.submit(fn, *args) for args in zip(*iterables)]
     609  
     610          # Yield must be hidden in closure so that the futures are submitted
     611          # before the first iterator value is required.
     612          def result_iterator():
     613              try:
     614                  # reverse to keep finishing order
     615                  fs.reverse()
     616                  while fs:
     617                      # Careful not to keep a reference to the popped future
     618                      if timeout is None:
     619                          yield _result_or_cancel(fs.pop())
     620                      else:
     621                          yield _result_or_cancel(fs.pop(), end_time - time.monotonic())
     622              finally:
     623                  for future in fs:
     624                      future.cancel()
     625          return result_iterator()
     626  
     627      def shutdown(self, wait=True, *, cancel_futures=False):
     628          """Clean-up the resources associated with the Executor.
     629  
     630          It is safe to call this method several times. Otherwise, no other
     631          methods can be called after this one.
     632  
     633          Args:
     634              wait: If True then shutdown will not return until all running
     635                  futures have finished executing and the resources used by the
     636                  executor have been reclaimed.
     637              cancel_futures: If True then shutdown will cancel all pending
     638                  futures. Futures that are completed or running will not be
     639                  cancelled.
     640          """
     641          pass
     642  
     643      def __enter__(self):
     644          return self
     645  
     646      def __exit__(self, exc_type, exc_val, exc_tb):
     647          self.shutdown(wait=True)
     648          return False
     649  
     650  
     651  class ESC[4;38;5;81mBrokenExecutor(ESC[4;38;5;149mRuntimeError):
     652      """
     653      Raised when a executor has become non-functional after a severe failure.
     654      """