(root)/
Python-3.12.0/
Lib/
asyncio/
tasks.py
       1  """Support for tasks, coroutines and the scheduler."""
       2  
       3  __all__ = (
       4      'Task', 'create_task',
       5      'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
       6      'wait', 'wait_for', 'as_completed', 'sleep',
       7      'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
       8      'current_task', 'all_tasks',
       9      'create_eager_task_factory', 'eager_task_factory',
      10      '_register_task', '_unregister_task', '_enter_task', '_leave_task',
      11  )
      12  
      13  import concurrent.futures
      14  import contextvars
      15  import functools
      16  import inspect
      17  import itertools
      18  import types
      19  import warnings
      20  import weakref
      21  from types import GenericAlias
      22  
      23  from . import base_tasks
      24  from . import coroutines
      25  from . import events
      26  from . import exceptions
      27  from . import futures
      28  from . import timeouts
      29  
      30  # Helper to generate new task names
      31  # This uses itertools.count() instead of a "+= 1" operation because the latter
      32  # is not thread safe. See bpo-11866 for a longer explanation.
      33  _task_name_counter = itertools.count(1).__next__
      34  
      35  
      36  def current_task(loop=None):
      37      """Return a currently executed task."""
      38      if loop is None:
      39          loop = events.get_running_loop()
      40      return _current_tasks.get(loop)
      41  
      42  
      43  def all_tasks(loop=None):
      44      """Return a set of all tasks for the loop."""
      45      if loop is None:
      46          loop = events.get_running_loop()
      47      # capturing the set of eager tasks first, so if an eager task "graduates"
      48      # to a regular task in another thread, we don't risk missing it.
      49      eager_tasks = list(_eager_tasks)
      50      # Looping over the WeakSet isn't safe as it can be updated from another
      51      # thread, therefore we cast it to list prior to filtering. The list cast
      52      # itself requires iteration, so we repeat it several times ignoring
      53      # RuntimeErrors (which are not very likely to occur).
      54      # See issues 34970 and 36607 for details.
      55      scheduled_tasks = None
      56      i = 0
      57      while True:
      58          try:
      59              scheduled_tasks = list(_scheduled_tasks)
      60          except RuntimeError:
      61              i += 1
      62              if i >= 1000:
      63                  raise
      64          else:
      65              break
      66      return {t for t in itertools.chain(scheduled_tasks, eager_tasks)
      67              if futures._get_loop(t) is loop and not t.done()}
      68  
      69  
      70  def _set_task_name(task, name):
      71      if name is not None:
      72          try:
      73              set_name = task.set_name
      74          except AttributeError:
      75              warnings.warn("Task.set_name() was added in Python 3.8, "
      76                        "the method support will be mandatory for third-party "
      77                        "task implementations since 3.13.",
      78                        DeprecationWarning, stacklevel=3)
      79          else:
      80              set_name(name)
      81  
      82  
      83  class ESC[4;38;5;81mTask(ESC[4;38;5;149mfuturesESC[4;38;5;149m.ESC[4;38;5;149m_PyFuture):  # Inherit Python Task implementation
      84                                  # from a Python Future implementation.
      85  
      86      """A coroutine wrapped in a Future."""
      87  
      88      # An important invariant maintained while a Task not done:
      89      #
      90      # - Either _fut_waiter is None, and _step() is scheduled;
      91      # - or _fut_waiter is some Future, and _step() is *not* scheduled.
      92      #
      93      # The only transition from the latter to the former is through
      94      # _wakeup().  When _fut_waiter is not None, one of its callbacks
      95      # must be _wakeup().
      96  
      97      # If False, don't log a message if the task is destroyed whereas its
      98      # status is still pending
      99      _log_destroy_pending = True
     100  
     101      def __init__(self, coro, *, loop=None, name=None, context=None,
     102                   eager_start=False):
     103          super().__init__(loop=loop)
     104          if self._source_traceback:
     105              del self._source_traceback[-1]
     106          if not coroutines.iscoroutine(coro):
     107              # raise after Future.__init__(), attrs are required for __del__
     108              # prevent logging for pending task in __del__
     109              self._log_destroy_pending = False
     110              raise TypeError(f"a coroutine was expected, got {coro!r}")
     111  
     112          if name is None:
     113              self._name = f'Task-{_task_name_counter()}'
     114          else:
     115              self._name = str(name)
     116  
     117          self._num_cancels_requested = 0
     118          self._must_cancel = False
     119          self._fut_waiter = None
     120          self._coro = coro
     121          if context is None:
     122              self._context = contextvars.copy_context()
     123          else:
     124              self._context = context
     125  
     126          if eager_start and self._loop.is_running():
     127              self.__eager_start()
     128          else:
     129              self._loop.call_soon(self.__step, context=self._context)
     130              _register_task(self)
     131  
     132      def __del__(self):
     133          if self._state == futures._PENDING and self._log_destroy_pending:
     134              context = {
     135                  'task': self,
     136                  'message': 'Task was destroyed but it is pending!',
     137              }
     138              if self._source_traceback:
     139                  context['source_traceback'] = self._source_traceback
     140              self._loop.call_exception_handler(context)
     141          super().__del__()
     142  
     143      __class_getitem__ = classmethod(GenericAlias)
     144  
     145      def __repr__(self):
     146          return base_tasks._task_repr(self)
     147  
     148      def get_coro(self):
     149          return self._coro
     150  
     151      def get_context(self):
     152          return self._context
     153  
     154      def get_name(self):
     155          return self._name
     156  
     157      def set_name(self, value):
     158          self._name = str(value)
     159  
     160      def set_result(self, result):
     161          raise RuntimeError('Task does not support set_result operation')
     162  
     163      def set_exception(self, exception):
     164          raise RuntimeError('Task does not support set_exception operation')
     165  
     166      def get_stack(self, *, limit=None):
     167          """Return the list of stack frames for this task's coroutine.
     168  
     169          If the coroutine is not done, this returns the stack where it is
     170          suspended.  If the coroutine has completed successfully or was
     171          cancelled, this returns an empty list.  If the coroutine was
     172          terminated by an exception, this returns the list of traceback
     173          frames.
     174  
     175          The frames are always ordered from oldest to newest.
     176  
     177          The optional limit gives the maximum number of frames to
     178          return; by default all available frames are returned.  Its
     179          meaning differs depending on whether a stack or a traceback is
     180          returned: the newest frames of a stack are returned, but the
     181          oldest frames of a traceback are returned.  (This matches the
     182          behavior of the traceback module.)
     183  
     184          For reasons beyond our control, only one stack frame is
     185          returned for a suspended coroutine.
     186          """
     187          return base_tasks._task_get_stack(self, limit)
     188  
     189      def print_stack(self, *, limit=None, file=None):
     190          """Print the stack or traceback for this task's coroutine.
     191  
     192          This produces output similar to that of the traceback module,
     193          for the frames retrieved by get_stack().  The limit argument
     194          is passed to get_stack().  The file argument is an I/O stream
     195          to which the output is written; by default output is written
     196          to sys.stderr.
     197          """
     198          return base_tasks._task_print_stack(self, limit, file)
     199  
     200      def cancel(self, msg=None):
     201          """Request that this task cancel itself.
     202  
     203          This arranges for a CancelledError to be thrown into the
     204          wrapped coroutine on the next cycle through the event loop.
     205          The coroutine then has a chance to clean up or even deny
     206          the request using try/except/finally.
     207  
     208          Unlike Future.cancel, this does not guarantee that the
     209          task will be cancelled: the exception might be caught and
     210          acted upon, delaying cancellation of the task or preventing
     211          cancellation completely.  The task may also return a value or
     212          raise a different exception.
     213  
     214          Immediately after this method is called, Task.cancelled() will
     215          not return True (unless the task was already cancelled).  A
     216          task will be marked as cancelled when the wrapped coroutine
     217          terminates with a CancelledError exception (even if cancel()
     218          was not called).
     219  
     220          This also increases the task's count of cancellation requests.
     221          """
     222          self._log_traceback = False
     223          if self.done():
     224              return False
     225          self._num_cancels_requested += 1
     226          # These two lines are controversial.  See discussion starting at
     227          # https://github.com/python/cpython/pull/31394#issuecomment-1053545331
     228          # Also remember that this is duplicated in _asynciomodule.c.
     229          # if self._num_cancels_requested > 1:
     230          #     return False
     231          if self._fut_waiter is not None:
     232              if self._fut_waiter.cancel(msg=msg):
     233                  # Leave self._fut_waiter; it may be a Task that
     234                  # catches and ignores the cancellation so we may have
     235                  # to cancel it again later.
     236                  return True
     237          # It must be the case that self.__step is already scheduled.
     238          self._must_cancel = True
     239          self._cancel_message = msg
     240          return True
     241  
     242      def cancelling(self):
     243          """Return the count of the task's cancellation requests.
     244  
     245          This count is incremented when .cancel() is called
     246          and may be decremented using .uncancel().
     247          """
     248          return self._num_cancels_requested
     249  
     250      def uncancel(self):
     251          """Decrement the task's count of cancellation requests.
     252  
     253          This should be called by the party that called `cancel()` on the task
     254          beforehand.
     255  
     256          Returns the remaining number of cancellation requests.
     257          """
     258          if self._num_cancels_requested > 0:
     259              self._num_cancels_requested -= 1
     260          return self._num_cancels_requested
     261  
     262      def __eager_start(self):
     263          prev_task = _swap_current_task(self._loop, self)
     264          try:
     265              _register_eager_task(self)
     266              try:
     267                  self._context.run(self.__step_run_and_handle_result, None)
     268              finally:
     269                  _unregister_eager_task(self)
     270          finally:
     271              try:
     272                  curtask = _swap_current_task(self._loop, prev_task)
     273                  assert curtask is self
     274              finally:
     275                  if self.done():
     276                      self._coro = None
     277                      self = None  # Needed to break cycles when an exception occurs.
     278                  else:
     279                      _register_task(self)
     280  
     281      def __step(self, exc=None):
     282          if self.done():
     283              raise exceptions.InvalidStateError(
     284                  f'_step(): already done: {self!r}, {exc!r}')
     285          if self._must_cancel:
     286              if not isinstance(exc, exceptions.CancelledError):
     287                  exc = self._make_cancelled_error()
     288              self._must_cancel = False
     289          self._fut_waiter = None
     290  
     291          _enter_task(self._loop, self)
     292          try:
     293              self.__step_run_and_handle_result(exc)
     294          finally:
     295              _leave_task(self._loop, self)
     296              self = None  # Needed to break cycles when an exception occurs.
     297  
     298      def __step_run_and_handle_result(self, exc):
     299          coro = self._coro
     300          try:
     301              if exc is None:
     302                  # We use the `send` method directly, because coroutines
     303                  # don't have `__iter__` and `__next__` methods.
     304                  result = coro.send(None)
     305              else:
     306                  result = coro.throw(exc)
     307          except StopIteration as exc:
     308              if self._must_cancel:
     309                  # Task is cancelled right before coro stops.
     310                  self._must_cancel = False
     311                  super().cancel(msg=self._cancel_message)
     312              else:
     313                  super().set_result(exc.value)
     314          except exceptions.CancelledError as exc:
     315              # Save the original exception so we can chain it later.
     316              self._cancelled_exc = exc
     317              super().cancel()  # I.e., Future.cancel(self).
     318          except (KeyboardInterrupt, SystemExit) as exc:
     319              super().set_exception(exc)
     320              raise
     321          except BaseException as exc:
     322              super().set_exception(exc)
     323          else:
     324              blocking = getattr(result, '_asyncio_future_blocking', None)
     325              if blocking is not None:
     326                  # Yielded Future must come from Future.__iter__().
     327                  if futures._get_loop(result) is not self._loop:
     328                      new_exc = RuntimeError(
     329                          f'Task {self!r} got Future '
     330                          f'{result!r} attached to a different loop')
     331                      self._loop.call_soon(
     332                          self.__step, new_exc, context=self._context)
     333                  elif blocking:
     334                      if result is self:
     335                          new_exc = RuntimeError(
     336                              f'Task cannot await on itself: {self!r}')
     337                          self._loop.call_soon(
     338                              self.__step, new_exc, context=self._context)
     339                      else:
     340                          result._asyncio_future_blocking = False
     341                          result.add_done_callback(
     342                              self.__wakeup, context=self._context)
     343                          self._fut_waiter = result
     344                          if self._must_cancel:
     345                              if self._fut_waiter.cancel(
     346                                      msg=self._cancel_message):
     347                                  self._must_cancel = False
     348                  else:
     349                      new_exc = RuntimeError(
     350                          f'yield was used instead of yield from '
     351                          f'in task {self!r} with {result!r}')
     352                      self._loop.call_soon(
     353                          self.__step, new_exc, context=self._context)
     354  
     355              elif result is None:
     356                  # Bare yield relinquishes control for one event loop iteration.
     357                  self._loop.call_soon(self.__step, context=self._context)
     358              elif inspect.isgenerator(result):
     359                  # Yielding a generator is just wrong.
     360                  new_exc = RuntimeError(
     361                      f'yield was used instead of yield from for '
     362                      f'generator in task {self!r} with {result!r}')
     363                  self._loop.call_soon(
     364                      self.__step, new_exc, context=self._context)
     365              else:
     366                  # Yielding something else is an error.
     367                  new_exc = RuntimeError(f'Task got bad yield: {result!r}')
     368                  self._loop.call_soon(
     369                      self.__step, new_exc, context=self._context)
     370          finally:
     371              self = None  # Needed to break cycles when an exception occurs.
     372  
     373      def __wakeup(self, future):
     374          try:
     375              future.result()
     376          except BaseException as exc:
     377              # This may also be a cancellation.
     378              self.__step(exc)
     379          else:
     380              # Don't pass the value of `future.result()` explicitly,
     381              # as `Future.__iter__` and `Future.__await__` don't need it.
     382              # If we call `_step(value, None)` instead of `_step()`,
     383              # Python eval loop would use `.send(value)` method call,
     384              # instead of `__next__()`, which is slower for futures
     385              # that return non-generator iterators from their `__iter__`.
     386              self.__step()
     387          self = None  # Needed to break cycles when an exception occurs.
     388  
     389  
     390  _PyTask = Task
     391  
     392  
     393  try:
     394      import _asyncio
     395  except ImportError:
     396      pass
     397  else:
     398      # _CTask is needed for tests.
     399      Task = _CTask = _asyncio.Task
     400  
     401  
     402  def create_task(coro, *, name=None, context=None):
     403      """Schedule the execution of a coroutine object in a spawn task.
     404  
     405      Return a Task object.
     406      """
     407      loop = events.get_running_loop()
     408      if context is None:
     409          # Use legacy API if context is not needed
     410          task = loop.create_task(coro)
     411      else:
     412          task = loop.create_task(coro, context=context)
     413  
     414      _set_task_name(task, name)
     415      return task
     416  
     417  
     418  # wait() and as_completed() similar to those in PEP 3148.
     419  
     420  FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
     421  FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
     422  ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
     423  
     424  
     425  async def wait(fs, *, timeout=None, return_when=ALL_COMPLETED):
     426      """Wait for the Futures or Tasks given by fs to complete.
     427  
     428      The fs iterable must not be empty.
     429  
     430      Coroutines will be wrapped in Tasks.
     431  
     432      Returns two sets of Future: (done, pending).
     433  
     434      Usage:
     435  
     436          done, pending = await asyncio.wait(fs)
     437  
     438      Note: This does not raise TimeoutError! Futures that aren't done
     439      when the timeout occurs are returned in the second set.
     440      """
     441      if futures.isfuture(fs) or coroutines.iscoroutine(fs):
     442          raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
     443      if not fs:
     444          raise ValueError('Set of Tasks/Futures is empty.')
     445      if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
     446          raise ValueError(f'Invalid return_when value: {return_when}')
     447  
     448      fs = set(fs)
     449  
     450      if any(coroutines.iscoroutine(f) for f in fs):
     451          raise TypeError("Passing coroutines is forbidden, use tasks explicitly.")
     452  
     453      loop = events.get_running_loop()
     454      return await _wait(fs, timeout, return_when, loop)
     455  
     456  
     457  def _release_waiter(waiter, *args):
     458      if not waiter.done():
     459          waiter.set_result(None)
     460  
     461  
     462  async def wait_for(fut, timeout):
     463      """Wait for the single Future or coroutine to complete, with timeout.
     464  
     465      Coroutine will be wrapped in Task.
     466  
     467      Returns result of the Future or coroutine.  When a timeout occurs,
     468      it cancels the task and raises TimeoutError.  To avoid the task
     469      cancellation, wrap it in shield().
     470  
     471      If the wait is cancelled, the task is also cancelled.
     472  
     473      If the task supresses the cancellation and returns a value instead,
     474      that value is returned.
     475  
     476      This function is a coroutine.
     477      """
     478      # The special case for timeout <= 0 is for the following case:
     479      #
     480      # async def test_waitfor():
     481      #     func_started = False
     482      #
     483      #     async def func():
     484      #         nonlocal func_started
     485      #         func_started = True
     486      #
     487      #     try:
     488      #         await asyncio.wait_for(func(), 0)
     489      #     except asyncio.TimeoutError:
     490      #         assert not func_started
     491      #     else:
     492      #         assert False
     493      #
     494      # asyncio.run(test_waitfor())
     495  
     496  
     497      if timeout is not None and timeout <= 0:
     498          fut = ensure_future(fut)
     499  
     500          if fut.done():
     501              return fut.result()
     502  
     503          await _cancel_and_wait(fut)
     504          try:
     505              return fut.result()
     506          except exceptions.CancelledError as exc:
     507              raise TimeoutError from exc
     508  
     509      async with timeouts.timeout(timeout):
     510          return await fut
     511  
     512  async def _wait(fs, timeout, return_when, loop):
     513      """Internal helper for wait().
     514  
     515      The fs argument must be a collection of Futures.
     516      """
     517      assert fs, 'Set of Futures is empty.'
     518      waiter = loop.create_future()
     519      timeout_handle = None
     520      if timeout is not None:
     521          timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
     522      counter = len(fs)
     523  
     524      def _on_completion(f):
     525          nonlocal counter
     526          counter -= 1
     527          if (counter <= 0 or
     528              return_when == FIRST_COMPLETED or
     529              return_when == FIRST_EXCEPTION and (not f.cancelled() and
     530                                                  f.exception() is not None)):
     531              if timeout_handle is not None:
     532                  timeout_handle.cancel()
     533              if not waiter.done():
     534                  waiter.set_result(None)
     535  
     536      for f in fs:
     537          f.add_done_callback(_on_completion)
     538  
     539      try:
     540          await waiter
     541      finally:
     542          if timeout_handle is not None:
     543              timeout_handle.cancel()
     544          for f in fs:
     545              f.remove_done_callback(_on_completion)
     546  
     547      done, pending = set(), set()
     548      for f in fs:
     549          if f.done():
     550              done.add(f)
     551          else:
     552              pending.add(f)
     553      return done, pending
     554  
     555  
     556  async def _cancel_and_wait(fut):
     557      """Cancel the *fut* future or task and wait until it completes."""
     558  
     559      loop = events.get_running_loop()
     560      waiter = loop.create_future()
     561      cb = functools.partial(_release_waiter, waiter)
     562      fut.add_done_callback(cb)
     563  
     564      try:
     565          fut.cancel()
     566          # We cannot wait on *fut* directly to make
     567          # sure _cancel_and_wait itself is reliably cancellable.
     568          await waiter
     569      finally:
     570          fut.remove_done_callback(cb)
     571  
     572  
     573  # This is *not* a @coroutine!  It is just an iterator (yielding Futures).
     574  def as_completed(fs, *, timeout=None):
     575      """Return an iterator whose values are coroutines.
     576  
     577      When waiting for the yielded coroutines you'll get the results (or
     578      exceptions!) of the original Futures (or coroutines), in the order
     579      in which and as soon as they complete.
     580  
     581      This differs from PEP 3148; the proper way to use this is:
     582  
     583          for f in as_completed(fs):
     584              result = await f  # The 'await' may raise.
     585              # Use result.
     586  
     587      If a timeout is specified, the 'await' will raise
     588      TimeoutError when the timeout occurs before all Futures are done.
     589  
     590      Note: The futures 'f' are not necessarily members of fs.
     591      """
     592      if futures.isfuture(fs) or coroutines.iscoroutine(fs):
     593          raise TypeError(f"expect an iterable of futures, not {type(fs).__name__}")
     594  
     595      from .queues import Queue  # Import here to avoid circular import problem.
     596      done = Queue()
     597  
     598      loop = events.get_event_loop()
     599      todo = {ensure_future(f, loop=loop) for f in set(fs)}
     600      timeout_handle = None
     601  
     602      def _on_timeout():
     603          for f in todo:
     604              f.remove_done_callback(_on_completion)
     605              done.put_nowait(None)  # Queue a dummy value for _wait_for_one().
     606          todo.clear()  # Can't do todo.remove(f) in the loop.
     607  
     608      def _on_completion(f):
     609          if not todo:
     610              return  # _on_timeout() was here first.
     611          todo.remove(f)
     612          done.put_nowait(f)
     613          if not todo and timeout_handle is not None:
     614              timeout_handle.cancel()
     615  
     616      async def _wait_for_one():
     617          f = await done.get()
     618          if f is None:
     619              # Dummy value from _on_timeout().
     620              raise exceptions.TimeoutError
     621          return f.result()  # May raise f.exception().
     622  
     623      for f in todo:
     624          f.add_done_callback(_on_completion)
     625      if todo and timeout is not None:
     626          timeout_handle = loop.call_later(timeout, _on_timeout)
     627      for _ in range(len(todo)):
     628          yield _wait_for_one()
     629  
     630  
     631  @types.coroutine
     632  def __sleep0():
     633      """Skip one event loop run cycle.
     634  
     635      This is a private helper for 'asyncio.sleep()', used
     636      when the 'delay' is set to 0.  It uses a bare 'yield'
     637      expression (which Task.__step knows how to handle)
     638      instead of creating a Future object.
     639      """
     640      yield
     641  
     642  
     643  async def sleep(delay, result=None):
     644      """Coroutine that completes after a given time (in seconds)."""
     645      if delay <= 0:
     646          await __sleep0()
     647          return result
     648  
     649      loop = events.get_running_loop()
     650      future = loop.create_future()
     651      h = loop.call_later(delay,
     652                          futures._set_result_unless_cancelled,
     653                          future, result)
     654      try:
     655          return await future
     656      finally:
     657          h.cancel()
     658  
     659  
     660  def ensure_future(coro_or_future, *, loop=None):
     661      """Wrap a coroutine or an awaitable in a future.
     662  
     663      If the argument is a Future, it is returned directly.
     664      """
     665      if futures.isfuture(coro_or_future):
     666          if loop is not None and loop is not futures._get_loop(coro_or_future):
     667              raise ValueError('The future belongs to a different loop than '
     668                              'the one specified as the loop argument')
     669          return coro_or_future
     670      should_close = True
     671      if not coroutines.iscoroutine(coro_or_future):
     672          if inspect.isawaitable(coro_or_future):
     673              async def _wrap_awaitable(awaitable):
     674                  return await awaitable
     675  
     676              coro_or_future = _wrap_awaitable(coro_or_future)
     677              should_close = False
     678          else:
     679              raise TypeError('An asyncio.Future, a coroutine or an awaitable '
     680                              'is required')
     681  
     682      if loop is None:
     683          loop = events.get_event_loop()
     684      try:
     685          return loop.create_task(coro_or_future)
     686      except RuntimeError:
     687          if should_close:
     688              coro_or_future.close()
     689          raise
     690  
     691  
     692  class ESC[4;38;5;81m_GatheringFuture(ESC[4;38;5;149mfuturesESC[4;38;5;149m.ESC[4;38;5;149mFuture):
     693      """Helper for gather().
     694  
     695      This overrides cancel() to cancel all the children and act more
     696      like Task.cancel(), which doesn't immediately mark itself as
     697      cancelled.
     698      """
     699  
     700      def __init__(self, children, *, loop):
     701          assert loop is not None
     702          super().__init__(loop=loop)
     703          self._children = children
     704          self._cancel_requested = False
     705  
     706      def cancel(self, msg=None):
     707          if self.done():
     708              return False
     709          ret = False
     710          for child in self._children:
     711              if child.cancel(msg=msg):
     712                  ret = True
     713          if ret:
     714              # If any child tasks were actually cancelled, we should
     715              # propagate the cancellation request regardless of
     716              # *return_exceptions* argument.  See issue 32684.
     717              self._cancel_requested = True
     718          return ret
     719  
     720  
     721  def gather(*coros_or_futures, return_exceptions=False):
     722      """Return a future aggregating results from the given coroutines/futures.
     723  
     724      Coroutines will be wrapped in a future and scheduled in the event
     725      loop. They will not necessarily be scheduled in the same order as
     726      passed in.
     727  
     728      All futures must share the same event loop.  If all the tasks are
     729      done successfully, the returned future's result is the list of
     730      results (in the order of the original sequence, not necessarily
     731      the order of results arrival).  If *return_exceptions* is True,
     732      exceptions in the tasks are treated the same as successful
     733      results, and gathered in the result list; otherwise, the first
     734      raised exception will be immediately propagated to the returned
     735      future.
     736  
     737      Cancellation: if the outer Future is cancelled, all children (that
     738      have not completed yet) are also cancelled.  If any child is
     739      cancelled, this is treated as if it raised CancelledError --
     740      the outer Future is *not* cancelled in this case.  (This is to
     741      prevent the cancellation of one child to cause other children to
     742      be cancelled.)
     743  
     744      If *return_exceptions* is False, cancelling gather() after it
     745      has been marked done won't cancel any submitted awaitables.
     746      For instance, gather can be marked done after propagating an
     747      exception to the caller, therefore, calling ``gather.cancel()``
     748      after catching an exception (raised by one of the awaitables) from
     749      gather won't cancel any other awaitables.
     750      """
     751      if not coros_or_futures:
     752          loop = events.get_event_loop()
     753          outer = loop.create_future()
     754          outer.set_result([])
     755          return outer
     756  
     757      def _done_callback(fut):
     758          nonlocal nfinished
     759          nfinished += 1
     760  
     761          if outer is None or outer.done():
     762              if not fut.cancelled():
     763                  # Mark exception retrieved.
     764                  fut.exception()
     765              return
     766  
     767          if not return_exceptions:
     768              if fut.cancelled():
     769                  # Check if 'fut' is cancelled first, as
     770                  # 'fut.exception()' will *raise* a CancelledError
     771                  # instead of returning it.
     772                  exc = fut._make_cancelled_error()
     773                  outer.set_exception(exc)
     774                  return
     775              else:
     776                  exc = fut.exception()
     777                  if exc is not None:
     778                      outer.set_exception(exc)
     779                      return
     780  
     781          if nfinished == nfuts:
     782              # All futures are done; create a list of results
     783              # and set it to the 'outer' future.
     784              results = []
     785  
     786              for fut in children:
     787                  if fut.cancelled():
     788                      # Check if 'fut' is cancelled first, as 'fut.exception()'
     789                      # will *raise* a CancelledError instead of returning it.
     790                      # Also, since we're adding the exception return value
     791                      # to 'results' instead of raising it, don't bother
     792                      # setting __context__.  This also lets us preserve
     793                      # calling '_make_cancelled_error()' at most once.
     794                      res = exceptions.CancelledError(
     795                          '' if fut._cancel_message is None else
     796                          fut._cancel_message)
     797                  else:
     798                      res = fut.exception()
     799                      if res is None:
     800                          res = fut.result()
     801                  results.append(res)
     802  
     803              if outer._cancel_requested:
     804                  # If gather is being cancelled we must propagate the
     805                  # cancellation regardless of *return_exceptions* argument.
     806                  # See issue 32684.
     807                  exc = fut._make_cancelled_error()
     808                  outer.set_exception(exc)
     809              else:
     810                  outer.set_result(results)
     811  
     812      arg_to_fut = {}
     813      children = []
     814      nfuts = 0
     815      nfinished = 0
     816      done_futs = []
     817      loop = None
     818      outer = None  # bpo-46672
     819      for arg in coros_or_futures:
     820          if arg not in arg_to_fut:
     821              fut = ensure_future(arg, loop=loop)
     822              if loop is None:
     823                  loop = futures._get_loop(fut)
     824              if fut is not arg:
     825                  # 'arg' was not a Future, therefore, 'fut' is a new
     826                  # Future created specifically for 'arg'.  Since the caller
     827                  # can't control it, disable the "destroy pending task"
     828                  # warning.
     829                  fut._log_destroy_pending = False
     830  
     831              nfuts += 1
     832              arg_to_fut[arg] = fut
     833              if fut.done():
     834                  done_futs.append(fut)
     835              else:
     836                  fut.add_done_callback(_done_callback)
     837  
     838          else:
     839              # There's a duplicate Future object in coros_or_futures.
     840              fut = arg_to_fut[arg]
     841  
     842          children.append(fut)
     843  
     844      outer = _GatheringFuture(children, loop=loop)
     845      # Run done callbacks after GatheringFuture created so any post-processing
     846      # can be performed at this point
     847      # optimization: in the special case that *all* futures finished eagerly,
     848      # this will effectively complete the gather eagerly, with the last
     849      # callback setting the result (or exception) on outer before returning it
     850      for fut in done_futs:
     851          _done_callback(fut)
     852      return outer
     853  
     854  
     855  def shield(arg):
     856      """Wait for a future, shielding it from cancellation.
     857  
     858      The statement
     859  
     860          task = asyncio.create_task(something())
     861          res = await shield(task)
     862  
     863      is exactly equivalent to the statement
     864  
     865          res = await something()
     866  
     867      *except* that if the coroutine containing it is cancelled, the
     868      task running in something() is not cancelled.  From the POV of
     869      something(), the cancellation did not happen.  But its caller is
     870      still cancelled, so the yield-from expression still raises
     871      CancelledError.  Note: If something() is cancelled by other means
     872      this will still cancel shield().
     873  
     874      If you want to completely ignore cancellation (not recommended)
     875      you can combine shield() with a try/except clause, as follows:
     876  
     877          task = asyncio.create_task(something())
     878          try:
     879              res = await shield(task)
     880          except CancelledError:
     881              res = None
     882  
     883      Save a reference to tasks passed to this function, to avoid
     884      a task disappearing mid-execution. The event loop only keeps
     885      weak references to tasks. A task that isn't referenced elsewhere
     886      may get garbage collected at any time, even before it's done.
     887      """
     888      inner = ensure_future(arg)
     889      if inner.done():
     890          # Shortcut.
     891          return inner
     892      loop = futures._get_loop(inner)
     893      outer = loop.create_future()
     894  
     895      def _inner_done_callback(inner):
     896          if outer.cancelled():
     897              if not inner.cancelled():
     898                  # Mark inner's result as retrieved.
     899                  inner.exception()
     900              return
     901  
     902          if inner.cancelled():
     903              outer.cancel()
     904          else:
     905              exc = inner.exception()
     906              if exc is not None:
     907                  outer.set_exception(exc)
     908              else:
     909                  outer.set_result(inner.result())
     910  
     911  
     912      def _outer_done_callback(outer):
     913          if not inner.done():
     914              inner.remove_done_callback(_inner_done_callback)
     915  
     916      inner.add_done_callback(_inner_done_callback)
     917      outer.add_done_callback(_outer_done_callback)
     918      return outer
     919  
     920  
     921  def run_coroutine_threadsafe(coro, loop):
     922      """Submit a coroutine object to a given event loop.
     923  
     924      Return a concurrent.futures.Future to access the result.
     925      """
     926      if not coroutines.iscoroutine(coro):
     927          raise TypeError('A coroutine object is required')
     928      future = concurrent.futures.Future()
     929  
     930      def callback():
     931          try:
     932              futures._chain_future(ensure_future(coro, loop=loop), future)
     933          except (SystemExit, KeyboardInterrupt):
     934              raise
     935          except BaseException as exc:
     936              if future.set_running_or_notify_cancel():
     937                  future.set_exception(exc)
     938              raise
     939  
     940      loop.call_soon_threadsafe(callback)
     941      return future
     942  
     943  
     944  def create_eager_task_factory(custom_task_constructor):
     945      """Create a function suitable for use as a task factory on an event-loop.
     946  
     947          Example usage:
     948  
     949              loop.set_task_factory(
     950                  asyncio.create_eager_task_factory(my_task_constructor))
     951  
     952          Now, tasks created will be started immediately (rather than being first
     953          scheduled to an event loop). The constructor argument can be any callable
     954          that returns a Task-compatible object and has a signature compatible
     955          with `Task.__init__`; it must have the `eager_start` keyword argument.
     956  
     957          Most applications will use `Task` for `custom_task_constructor` and in
     958          this case there's no need to call `create_eager_task_factory()`
     959          directly. Instead the  global `eager_task_factory` instance can be
     960          used. E.g. `loop.set_task_factory(asyncio.eager_task_factory)`.
     961          """
     962  
     963      def factory(loop, coro, *, name=None, context=None):
     964          return custom_task_constructor(
     965              coro, loop=loop, name=name, context=context, eager_start=True)
     966  
     967      return factory
     968  
     969  
     970  eager_task_factory = create_eager_task_factory(Task)
     971  
     972  
     973  # Collectively these two sets hold references to the complete set of active
     974  # tasks. Eagerly executed tasks use a faster regular set as an optimization
     975  # but may graduate to a WeakSet if the task blocks on IO.
     976  _scheduled_tasks = weakref.WeakSet()
     977  _eager_tasks = set()
     978  
     979  # Dictionary containing tasks that are currently active in
     980  # all running event loops.  {EventLoop: Task}
     981  _current_tasks = {}
     982  
     983  
     984  def _register_task(task):
     985      """Register an asyncio Task scheduled to run on an event loop."""
     986      _scheduled_tasks.add(task)
     987  
     988  
     989  def _register_eager_task(task):
     990      """Register an asyncio Task about to be eagerly executed."""
     991      _eager_tasks.add(task)
     992  
     993  
     994  def _enter_task(loop, task):
     995      current_task = _current_tasks.get(loop)
     996      if current_task is not None:
     997          raise RuntimeError(f"Cannot enter into task {task!r} while another "
     998                             f"task {current_task!r} is being executed.")
     999      _current_tasks[loop] = task
    1000  
    1001  
    1002  def _leave_task(loop, task):
    1003      current_task = _current_tasks.get(loop)
    1004      if current_task is not task:
    1005          raise RuntimeError(f"Leaving task {task!r} does not match "
    1006                             f"the current task {current_task!r}.")
    1007      del _current_tasks[loop]
    1008  
    1009  
    1010  def _swap_current_task(loop, task):
    1011      prev_task = _current_tasks.get(loop)
    1012      if task is None:
    1013          del _current_tasks[loop]
    1014      else:
    1015          _current_tasks[loop] = task
    1016      return prev_task
    1017  
    1018  
    1019  def _unregister_task(task):
    1020      """Unregister a completed, scheduled Task."""
    1021      _scheduled_tasks.discard(task)
    1022  
    1023  
    1024  def _unregister_eager_task(task):
    1025      """Unregister a task which finished its first eager step."""
    1026      _eager_tasks.discard(task)
    1027  
    1028  
    1029  _py_current_task = current_task
    1030  _py_register_task = _register_task
    1031  _py_register_eager_task = _register_eager_task
    1032  _py_unregister_task = _unregister_task
    1033  _py_unregister_eager_task = _unregister_eager_task
    1034  _py_enter_task = _enter_task
    1035  _py_leave_task = _leave_task
    1036  _py_swap_current_task = _swap_current_task
    1037  
    1038  
    1039  try:
    1040      from _asyncio import (_register_task, _register_eager_task,
    1041                            _unregister_task, _unregister_eager_task,
    1042                            _enter_task, _leave_task, _swap_current_task,
    1043                            _scheduled_tasks, _eager_tasks, _current_tasks,
    1044                            current_task)
    1045  except ImportError:
    1046      pass
    1047  else:
    1048      _c_current_task = current_task
    1049      _c_register_task = _register_task
    1050      _c_register_eager_task = _register_eager_task
    1051      _c_unregister_task = _unregister_task
    1052      _c_unregister_eager_task = _unregister_eager_task
    1053      _c_enter_task = _enter_task
    1054      _c_leave_task = _leave_task
    1055      _c_swap_current_task = _swap_current_task