python (3.11.7)

(root)/
lib/
python3.11/
asyncio/
futures.py
       1  """A Future class similar to the one in PEP 3148."""
       2  
       3  __all__ = (
       4      'Future', 'wrap_future', 'isfuture',
       5  )
       6  
       7  import concurrent.futures
       8  import contextvars
       9  import logging
      10  import sys
      11  from types import GenericAlias
      12  
      13  from . import base_futures
      14  from . import events
      15  from . import exceptions
      16  from . import format_helpers
      17  
      18  
      19  isfuture = base_futures.isfuture
      20  
      21  
      22  _PENDING = base_futures._PENDING
      23  _CANCELLED = base_futures._CANCELLED
      24  _FINISHED = base_futures._FINISHED
      25  
      26  
      27  STACK_DEBUG = logging.DEBUG - 1  # heavy-duty debugging
      28  
      29  
      30  class ESC[4;38;5;81mFuture:
      31      """This class is *almost* compatible with concurrent.futures.Future.
      32  
      33      Differences:
      34  
      35      - This class is not thread-safe.
      36  
      37      - result() and exception() do not take a timeout argument and
      38        raise an exception when the future isn't done yet.
      39  
      40      - Callbacks registered with add_done_callback() are always called
      41        via the event loop's call_soon().
      42  
      43      - This class is not compatible with the wait() and as_completed()
      44        methods in the concurrent.futures package.
      45  
      46      (In Python 3.4 or later we may be able to unify the implementations.)
      47      """
      48  
      49      # Class variables serving as defaults for instance variables.
      50      _state = _PENDING
      51      _result = None
      52      _exception = None
      53      _loop = None
      54      _source_traceback = None
      55      _cancel_message = None
      56      # A saved CancelledError for later chaining as an exception context.
      57      _cancelled_exc = None
      58  
      59      # This field is used for a dual purpose:
      60      # - Its presence is a marker to declare that a class implements
      61      #   the Future protocol (i.e. is intended to be duck-type compatible).
      62      #   The value must also be not-None, to enable a subclass to declare
      63      #   that it is not compatible by setting this to None.
      64      # - It is set by __iter__() below so that Task._step() can tell
      65      #   the difference between
      66      #   `await Future()` or`yield from Future()` (correct) vs.
      67      #   `yield Future()` (incorrect).
      68      _asyncio_future_blocking = False
      69  
      70      __log_traceback = False
      71  
      72      def __init__(self, *, loop=None):
      73          """Initialize the future.
      74  
      75          The optional event_loop argument allows explicitly setting the event
      76          loop object used by the future. If it's not provided, the future uses
      77          the default event loop.
      78          """
      79          if loop is None:
      80              self._loop = events._get_event_loop()
      81          else:
      82              self._loop = loop
      83          self._callbacks = []
      84          if self._loop.get_debug():
      85              self._source_traceback = format_helpers.extract_stack(
      86                  sys._getframe(1))
      87  
      88      def __repr__(self):
      89          return base_futures._future_repr(self)
      90  
      91      def __del__(self):
      92          if not self.__log_traceback:
      93              # set_exception() was not called, or result() or exception()
      94              # has consumed the exception
      95              return
      96          exc = self._exception
      97          context = {
      98              'message':
      99                  f'{self.__class__.__name__} exception was never retrieved',
     100              'exception': exc,
     101              'future': self,
     102          }
     103          if self._source_traceback:
     104              context['source_traceback'] = self._source_traceback
     105          self._loop.call_exception_handler(context)
     106  
     107      __class_getitem__ = classmethod(GenericAlias)
     108  
     109      @property
     110      def _log_traceback(self):
     111          return self.__log_traceback
     112  
     113      @_log_traceback.setter
     114      def _log_traceback(self, val):
     115          if val:
     116              raise ValueError('_log_traceback can only be set to False')
     117          self.__log_traceback = False
     118  
     119      def get_loop(self):
     120          """Return the event loop the Future is bound to."""
     121          loop = self._loop
     122          if loop is None:
     123              raise RuntimeError("Future object is not initialized.")
     124          return loop
     125  
     126      def _make_cancelled_error(self):
     127          """Create the CancelledError to raise if the Future is cancelled.
     128  
     129          This should only be called once when handling a cancellation since
     130          it erases the saved context exception value.
     131          """
     132          if self._cancelled_exc is not None:
     133              exc = self._cancelled_exc
     134              self._cancelled_exc = None
     135              return exc
     136  
     137          if self._cancel_message is None:
     138              exc = exceptions.CancelledError()
     139          else:
     140              exc = exceptions.CancelledError(self._cancel_message)
     141          exc.__context__ = self._cancelled_exc
     142          # Remove the reference since we don't need this anymore.
     143          self._cancelled_exc = None
     144          return exc
     145  
     146      def cancel(self, msg=None):
     147          """Cancel the future and schedule callbacks.
     148  
     149          If the future is already done or cancelled, return False.  Otherwise,
     150          change the future's state to cancelled, schedule the callbacks and
     151          return True.
     152          """
     153          self.__log_traceback = False
     154          if self._state != _PENDING:
     155              return False
     156          self._state = _CANCELLED
     157          self._cancel_message = msg
     158          self.__schedule_callbacks()
     159          return True
     160  
     161      def __schedule_callbacks(self):
     162          """Internal: Ask the event loop to call all callbacks.
     163  
     164          The callbacks are scheduled to be called as soon as possible. Also
     165          clears the callback list.
     166          """
     167          callbacks = self._callbacks[:]
     168          if not callbacks:
     169              return
     170  
     171          self._callbacks[:] = []
     172          for callback, ctx in callbacks:
     173              self._loop.call_soon(callback, self, context=ctx)
     174  
     175      def cancelled(self):
     176          """Return True if the future was cancelled."""
     177          return self._state == _CANCELLED
     178  
     179      # Don't implement running(); see http://bugs.python.org/issue18699
     180  
     181      def done(self):
     182          """Return True if the future is done.
     183  
     184          Done means either that a result / exception are available, or that the
     185          future was cancelled.
     186          """
     187          return self._state != _PENDING
     188  
     189      def result(self):
     190          """Return the result this future represents.
     191  
     192          If the future has been cancelled, raises CancelledError.  If the
     193          future's result isn't yet available, raises InvalidStateError.  If
     194          the future is done and has an exception set, this exception is raised.
     195          """
     196          if self._state == _CANCELLED:
     197              exc = self._make_cancelled_error()
     198              raise exc
     199          if self._state != _FINISHED:
     200              raise exceptions.InvalidStateError('Result is not ready.')
     201          self.__log_traceback = False
     202          if self._exception is not None:
     203              raise self._exception.with_traceback(self._exception_tb)
     204          return self._result
     205  
     206      def exception(self):
     207          """Return the exception that was set on this future.
     208  
     209          The exception (or None if no exception was set) is returned only if
     210          the future is done.  If the future has been cancelled, raises
     211          CancelledError.  If the future isn't done yet, raises
     212          InvalidStateError.
     213          """
     214          if self._state == _CANCELLED:
     215              exc = self._make_cancelled_error()
     216              raise exc
     217          if self._state != _FINISHED:
     218              raise exceptions.InvalidStateError('Exception is not set.')
     219          self.__log_traceback = False
     220          return self._exception
     221  
     222      def add_done_callback(self, fn, *, context=None):
     223          """Add a callback to be run when the future becomes done.
     224  
     225          The callback is called with a single argument - the future object. If
     226          the future is already done when this is called, the callback is
     227          scheduled with call_soon.
     228          """
     229          if self._state != _PENDING:
     230              self._loop.call_soon(fn, self, context=context)
     231          else:
     232              if context is None:
     233                  context = contextvars.copy_context()
     234              self._callbacks.append((fn, context))
     235  
     236      # New method not in PEP 3148.
     237  
     238      def remove_done_callback(self, fn):
     239          """Remove all instances of a callback from the "call when done" list.
     240  
     241          Returns the number of callbacks removed.
     242          """
     243          filtered_callbacks = [(f, ctx)
     244                                for (f, ctx) in self._callbacks
     245                                if f != fn]
     246          removed_count = len(self._callbacks) - len(filtered_callbacks)
     247          if removed_count:
     248              self._callbacks[:] = filtered_callbacks
     249          return removed_count
     250  
     251      # So-called internal methods (note: no set_running_or_notify_cancel()).
     252  
     253      def set_result(self, result):
     254          """Mark the future done and set its result.
     255  
     256          If the future is already done when this method is called, raises
     257          InvalidStateError.
     258          """
     259          if self._state != _PENDING:
     260              raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
     261          self._result = result
     262          self._state = _FINISHED
     263          self.__schedule_callbacks()
     264  
     265      def set_exception(self, exception):
     266          """Mark the future done and set an exception.
     267  
     268          If the future is already done when this method is called, raises
     269          InvalidStateError.
     270          """
     271          if self._state != _PENDING:
     272              raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
     273          if isinstance(exception, type):
     274              exception = exception()
     275          if type(exception) is StopIteration:
     276              raise TypeError("StopIteration interacts badly with generators "
     277                              "and cannot be raised into a Future")
     278          self._exception = exception
     279          self._exception_tb = exception.__traceback__
     280          self._state = _FINISHED
     281          self.__schedule_callbacks()
     282          self.__log_traceback = True
     283  
     284      def __await__(self):
     285          if not self.done():
     286              self._asyncio_future_blocking = True
     287              yield self  # This tells Task to wait for completion.
     288          if not self.done():
     289              raise RuntimeError("await wasn't used with future")
     290          return self.result()  # May raise too.
     291  
     292      __iter__ = __await__  # make compatible with 'yield from'.
     293  
     294  
     295  # Needed for testing purposes.
     296  _PyFuture = Future
     297  
     298  
     299  def _get_loop(fut):
     300      # Tries to call Future.get_loop() if it's available.
     301      # Otherwise fallbacks to using the old '_loop' property.
     302      try:
     303          get_loop = fut.get_loop
     304      except AttributeError:
     305          pass
     306      else:
     307          return get_loop()
     308      return fut._loop
     309  
     310  
     311  def _set_result_unless_cancelled(fut, result):
     312      """Helper setting the result only if the future was not cancelled."""
     313      if fut.cancelled():
     314          return
     315      fut.set_result(result)
     316  
     317  
     318  def _convert_future_exc(exc):
     319      exc_class = type(exc)
     320      if exc_class is concurrent.futures.CancelledError:
     321          return exceptions.CancelledError(*exc.args)
     322      elif exc_class is concurrent.futures.TimeoutError:
     323          return exceptions.TimeoutError(*exc.args)
     324      elif exc_class is concurrent.futures.InvalidStateError:
     325          return exceptions.InvalidStateError(*exc.args)
     326      else:
     327          return exc
     328  
     329  
     330  def _set_concurrent_future_state(concurrent, source):
     331      """Copy state from a future to a concurrent.futures.Future."""
     332      assert source.done()
     333      if source.cancelled():
     334          concurrent.cancel()
     335      if not concurrent.set_running_or_notify_cancel():
     336          return
     337      exception = source.exception()
     338      if exception is not None:
     339          concurrent.set_exception(_convert_future_exc(exception))
     340      else:
     341          result = source.result()
     342          concurrent.set_result(result)
     343  
     344  
     345  def _copy_future_state(source, dest):
     346      """Internal helper to copy state from another Future.
     347  
     348      The other Future may be a concurrent.futures.Future.
     349      """
     350      assert source.done()
     351      if dest.cancelled():
     352          return
     353      assert not dest.done()
     354      if source.cancelled():
     355          dest.cancel()
     356      else:
     357          exception = source.exception()
     358          if exception is not None:
     359              dest.set_exception(_convert_future_exc(exception))
     360          else:
     361              result = source.result()
     362              dest.set_result(result)
     363  
     364  
     365  def _chain_future(source, destination):
     366      """Chain two futures so that when one completes, so does the other.
     367  
     368      The result (or exception) of source will be copied to destination.
     369      If destination is cancelled, source gets cancelled too.
     370      Compatible with both asyncio.Future and concurrent.futures.Future.
     371      """
     372      if not isfuture(source) and not isinstance(source,
     373                                                 concurrent.futures.Future):
     374          raise TypeError('A future is required for source argument')
     375      if not isfuture(destination) and not isinstance(destination,
     376                                                      concurrent.futures.Future):
     377          raise TypeError('A future is required for destination argument')
     378      source_loop = _get_loop(source) if isfuture(source) else None
     379      dest_loop = _get_loop(destination) if isfuture(destination) else None
     380  
     381      def _set_state(future, other):
     382          if isfuture(future):
     383              _copy_future_state(other, future)
     384          else:
     385              _set_concurrent_future_state(future, other)
     386  
     387      def _call_check_cancel(destination):
     388          if destination.cancelled():
     389              if source_loop is None or source_loop is dest_loop:
     390                  source.cancel()
     391              else:
     392                  source_loop.call_soon_threadsafe(source.cancel)
     393  
     394      def _call_set_state(source):
     395          if (destination.cancelled() and
     396                  dest_loop is not None and dest_loop.is_closed()):
     397              return
     398          if dest_loop is None or dest_loop is source_loop:
     399              _set_state(destination, source)
     400          else:
     401              if dest_loop.is_closed():
     402                  return
     403              dest_loop.call_soon_threadsafe(_set_state, destination, source)
     404  
     405      destination.add_done_callback(_call_check_cancel)
     406      source.add_done_callback(_call_set_state)
     407  
     408  
     409  def wrap_future(future, *, loop=None):
     410      """Wrap concurrent.futures.Future object."""
     411      if isfuture(future):
     412          return future
     413      assert isinstance(future, concurrent.futures.Future), \
     414          f'concurrent.futures.Future is expected, got {future!r}'
     415      if loop is None:
     416          loop = events._get_event_loop()
     417      new_future = loop.create_future()
     418      _chain_future(future, new_future)
     419      return new_future
     420  
     421  
     422  try:
     423      import _asyncio
     424  except ImportError:
     425      pass
     426  else:
     427      # _CFuture is needed for tests.
     428      Future = _CFuture = _asyncio.Future