python (3.12.0)
1 """Support for tasks, coroutines and the scheduler."""
2
3 __all__ = (
4 'Task', 'create_task',
5 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
6 'wait', 'wait_for', 'as_completed', 'sleep',
7 'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
8 'current_task', 'all_tasks',
9 'create_eager_task_factory', 'eager_task_factory',
10 '_register_task', '_unregister_task', '_enter_task', '_leave_task',
11 )
12
13 import concurrent.futures
14 import contextvars
15 import functools
16 import inspect
17 import itertools
18 import types
19 import warnings
20 import weakref
21 from types import GenericAlias
22
23 from . import base_tasks
24 from . import coroutines
25 from . import events
26 from . import exceptions
27 from . import futures
28 from . import timeouts
29
30 # Helper to generate new task names
31 # This uses itertools.count() instead of a "+= 1" operation because the latter
32 # is not thread safe. See bpo-11866 for a longer explanation.
33 _task_name_counter = itertools.count(1).__next__
34
35
36 def current_task(loop=None):
37 """Return a currently executed task."""
38 if loop is None:
39 loop = events.get_running_loop()
40 return _current_tasks.get(loop)
41
42
43 def all_tasks(loop=None):
44 """Return a set of all tasks for the loop."""
45 if loop is None:
46 loop = events.get_running_loop()
47 # capturing the set of eager tasks first, so if an eager task "graduates"
48 # to a regular task in another thread, we don't risk missing it.
49 eager_tasks = list(_eager_tasks)
50 # Looping over the WeakSet isn't safe as it can be updated from another
51 # thread, therefore we cast it to list prior to filtering. The list cast
52 # itself requires iteration, so we repeat it several times ignoring
53 # RuntimeErrors (which are not very likely to occur).
54 # See issues 34970 and 36607 for details.
55 scheduled_tasks = None
56 i = 0
57 while True:
58 try:
59 scheduled_tasks = list(_scheduled_tasks)
60 except RuntimeError:
61 i += 1
62 if i >= 1000:
63 raise
64 else:
65 break
66 return {t for t in itertools.chain(scheduled_tasks, eager_tasks)
67 if futures._get_loop(t) is loop and not t.done()}
68
69
70 def _set_task_name(task, name):
71 if name is not None:
72 try:
73 set_name = task.set_name
74 except AttributeError:
75 warnings.warn("Task.set_name() was added in Python 3.8, "
76 "the method support will be mandatory for third-party "
77 "task implementations since 3.13.",
78 DeprecationWarning, stacklevel=3)
79 else:
80 set_name(name)
81
82
83 class ESC[4;38;5;81mTask(ESC[4;38;5;149mfuturesESC[4;38;5;149m.ESC[4;38;5;149m_PyFuture): # Inherit Python Task implementation
84 # from a Python Future implementation.
85
86 """A coroutine wrapped in a Future."""
87
88 # An important invariant maintained while a Task not done:
89 #
90 # - Either _fut_waiter is None, and _step() is scheduled;
91 # - or _fut_waiter is some Future, and _step() is *not* scheduled.
92 #
93 # The only transition from the latter to the former is through
94 # _wakeup(). When _fut_waiter is not None, one of its callbacks
95 # must be _wakeup().
96
97 # If False, don't log a message if the task is destroyed whereas its
98 # status is still pending
99 _log_destroy_pending = True
100
101 def __init__(self, coro, *, loop=None, name=None, context=None,
102 eager_start=False):
103 super().__init__(loop=loop)
104 if self._source_traceback:
105 del self._source_traceback[-1]
106 if not coroutines.iscoroutine(coro):
107 # raise after Future.__init__(), attrs are required for __del__
108 # prevent logging for pending task in __del__
109 self._log_destroy_pending = False
110 raise TypeError(f"a coroutine was expected, got {coro!r}")
111
112 if name is None:
113 self._name = f'Task-{_task_name_counter()}'
114 else:
115 self._name = str(name)
116
117 self._num_cancels_requested = 0
118 self._must_cancel = False
119 self._fut_waiter = None
120 self._coro = coro
121 if context is None:
122 self._context = contextvars.copy_context()
123 else:
124 self._context = context
125
126 if eager_start and self._loop.is_running():
127 self.__eager_start()
128 else:
129 self._loop.call_soon(self.__step, context=self._context)
130 _register_task(self)
131
132 def __del__(self):
133 if self._state == futures._PENDING and self._log_destroy_pending:
134 context = {
135 'task': self,
136 'message': 'Task was destroyed but it is pending!',
137 }
138 if self._source_traceback:
139 context['source_traceback'] = self._source_traceback
140 self._loop.call_exception_handler(context)
141 super().__del__()
142
143 __class_getitem__ = classmethod(GenericAlias)
144
145 def __repr__(self):
146 return base_tasks._task_repr(self)
147
148 def get_coro(self):
149 return self._coro
150
151 def get_context(self):
152 return self._context
153
154 def get_name(self):
155 return self._name
156
157 def set_name(self, value):
158 self._name = str(value)
159
160 def set_result(self, result):
161 raise RuntimeError('Task does not support set_result operation')
162
163 def set_exception(self, exception):
164 raise RuntimeError('Task does not support set_exception operation')
165
166 def get_stack(self, *, limit=None):
167 """Return the list of stack frames for this task's coroutine.
168
169 If the coroutine is not done, this returns the stack where it is
170 suspended. If the coroutine has completed successfully or was
171 cancelled, this returns an empty list. If the coroutine was
172 terminated by an exception, this returns the list of traceback
173 frames.
174
175 The frames are always ordered from oldest to newest.
176
177 The optional limit gives the maximum number of frames to
178 return; by default all available frames are returned. Its
179 meaning differs depending on whether a stack or a traceback is
180 returned: the newest frames of a stack are returned, but the
181 oldest frames of a traceback are returned. (This matches the
182 behavior of the traceback module.)
183
184 For reasons beyond our control, only one stack frame is
185 returned for a suspended coroutine.
186 """
187 return base_tasks._task_get_stack(self, limit)
188
189 def print_stack(self, *, limit=None, file=None):
190 """Print the stack or traceback for this task's coroutine.
191
192 This produces output similar to that of the traceback module,
193 for the frames retrieved by get_stack(). The limit argument
194 is passed to get_stack(). The file argument is an I/O stream
195 to which the output is written; by default output is written
196 to sys.stderr.
197 """
198 return base_tasks._task_print_stack(self, limit, file)
199
200 def cancel(self, msg=None):
201 """Request that this task cancel itself.
202
203 This arranges for a CancelledError to be thrown into the
204 wrapped coroutine on the next cycle through the event loop.
205 The coroutine then has a chance to clean up or even deny
206 the request using try/except/finally.
207
208 Unlike Future.cancel, this does not guarantee that the
209 task will be cancelled: the exception might be caught and
210 acted upon, delaying cancellation of the task or preventing
211 cancellation completely. The task may also return a value or
212 raise a different exception.
213
214 Immediately after this method is called, Task.cancelled() will
215 not return True (unless the task was already cancelled). A
216 task will be marked as cancelled when the wrapped coroutine
217 terminates with a CancelledError exception (even if cancel()
218 was not called).
219
220 This also increases the task's count of cancellation requests.
221 """
222 self._log_traceback = False
223 if self.done():
224 return False
225 self._num_cancels_requested += 1
226 # These two lines are controversial. See discussion starting at
227 # https://github.com/python/cpython/pull/31394#issuecomment-1053545331
228 # Also remember that this is duplicated in _asynciomodule.c.
229 # if self._num_cancels_requested > 1:
230 # return False
231 if self._fut_waiter is not None:
232 if self._fut_waiter.cancel(msg=msg):
233 # Leave self._fut_waiter; it may be a Task that
234 # catches and ignores the cancellation so we may have
235 # to cancel it again later.
236 return True
237 # It must be the case that self.__step is already scheduled.
238 self._must_cancel = True
239 self._cancel_message = msg
240 return True
241
242 def cancelling(self):
243 """Return the count of the task's cancellation requests.
244
245 This count is incremented when .cancel() is called
246 and may be decremented using .uncancel().
247 """
248 return self._num_cancels_requested
249
250 def uncancel(self):
251 """Decrement the task's count of cancellation requests.
252
253 This should be called by the party that called `cancel()` on the task
254 beforehand.
255
256 Returns the remaining number of cancellation requests.
257 """
258 if self._num_cancels_requested > 0:
259 self._num_cancels_requested -= 1
260 return self._num_cancels_requested
261
262 def __eager_start(self):
263 prev_task = _swap_current_task(self._loop, self)
264 try:
265 _register_eager_task(self)
266 try:
267 self._context.run(self.__step_run_and_handle_result, None)
268 finally:
269 _unregister_eager_task(self)
270 finally:
271 try:
272 curtask = _swap_current_task(self._loop, prev_task)
273 assert curtask is self
274 finally:
275 if self.done():
276 self._coro = None
277 self = None # Needed to break cycles when an exception occurs.
278 else:
279 _register_task(self)
280
281 def __step(self, exc=None):
282 if self.done():
283 raise exceptions.InvalidStateError(
284 f'_step(): already done: {self!r}, {exc!r}')
285 if self._must_cancel:
286 if not isinstance(exc, exceptions.CancelledError):
287 exc = self._make_cancelled_error()
288 self._must_cancel = False
289 self._fut_waiter = None
290
291 _enter_task(self._loop, self)
292 try:
293 self.__step_run_and_handle_result(exc)
294 finally:
295 _leave_task(self._loop, self)
296 self = None # Needed to break cycles when an exception occurs.
297
298 def __step_run_and_handle_result(self, exc):
299 coro = self._coro
300 try:
301 if exc is None:
302 # We use the `send` method directly, because coroutines
303 # don't have `__iter__` and `__next__` methods.
304 result = coro.send(None)
305 else:
306 result = coro.throw(exc)
307 except StopIteration as exc:
308 if self._must_cancel:
309 # Task is cancelled right before coro stops.
310 self._must_cancel = False
311 super().cancel(msg=self._cancel_message)
312 else:
313 super().set_result(exc.value)
314 except exceptions.CancelledError as exc:
315 # Save the original exception so we can chain it later.
316 self._cancelled_exc = exc
317 super().cancel() # I.e., Future.cancel(self).
318 except (KeyboardInterrupt, SystemExit) as exc:
319 super().set_exception(exc)
320 raise
321 except BaseException as exc:
322 super().set_exception(exc)
323 else:
324 blocking = getattr(result, '_asyncio_future_blocking', None)
325 if blocking is not None:
326 # Yielded Future must come from Future.__iter__().
327 if futures._get_loop(result) is not self._loop:
328 new_exc = RuntimeError(
329 f'Task {self!r} got Future '
330 f'{result!r} attached to a different loop')
331 self._loop.call_soon(
332 self.__step, new_exc, context=self._context)
333 elif blocking:
334 if result is self:
335 new_exc = RuntimeError(
336 f'Task cannot await on itself: {self!r}')
337 self._loop.call_soon(
338 self.__step, new_exc, context=self._context)
339 else:
340 result._asyncio_future_blocking = False
341 result.add_done_callback(
342 self.__wakeup, context=self._context)
343 self._fut_waiter = result
344 if self._must_cancel:
345 if self._fut_waiter.cancel(
346 msg=self._cancel_message):
347 self._must_cancel = False
348 else:
349 new_exc = RuntimeError(
350 f'yield was used instead of yield from '
351 f'in task {self!r} with {result!r}')
352 self._loop.call_soon(
353 self.__step, new_exc, context=self._context)
354
355 elif result is None:
356 # Bare yield relinquishes control for one event loop iteration.
357 self._loop.call_soon(self.__step, context=self._context)
358 elif inspect.isgenerator(result):
359 # Yielding a generator is just wrong.
360 new_exc = RuntimeError(
361 f'yield was used instead of yield from for '
362 f'generator in task {self!r} with {result!r}')
363 self._loop.call_soon(
364 self.__step, new_exc, context=self._context)
365 else:
366 # Yielding something else is an error.
367 new_exc = RuntimeError(f'Task got bad yield: {result!r}')
368 self._loop.call_soon(
369 self.__step, new_exc, context=self._context)
370 finally:
371 self = None # Needed to break cycles when an exception occurs.
372
373 def __wakeup(self, future):
374 try:
375 future.result()
376 except BaseException as exc:
377 # This may also be a cancellation.
378 self.__step(exc)
379 else:
380 # Don't pass the value of `future.result()` explicitly,
381 # as `Future.__iter__` and `Future.__await__` don't need it.
382 # If we call `_step(value, None)` instead of `_step()`,
383 # Python eval loop would use `.send(value)` method call,
384 # instead of `__next__()`, which is slower for futures
385 # that return non-generator iterators from their `__iter__`.
386 self.__step()
387 self = None # Needed to break cycles when an exception occurs.
388
389
390 _PyTask = Task
391
392
393 try:
394 import _asyncio
395 except ImportError:
396 pass
397 else:
398 # _CTask is needed for tests.
399 Task = _CTask = _asyncio.Task
400
401
402 def create_task(coro, *, name=None, context=None):
403 """Schedule the execution of a coroutine object in a spawn task.
404
405 Return a Task object.
406 """
407 loop = events.get_running_loop()
408 if context is None:
409 # Use legacy API if context is not needed
410 task = loop.create_task(coro)
411 else:
412 task = loop.create_task(coro, context=context)
413
414 _set_task_name(task, name)
415 return task
416
417
418 # wait() and as_completed() similar to those in PEP 3148.
419
420 FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
421 FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
422 ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
423
424
425 async def wait(fs, *, timeout=None, return_when=ALL_COMPLETED):
426 """Wait for the Futures or Tasks given by fs to complete.
427
428 The fs iterable must not be empty.
429
430 Coroutines will be wrapped in Tasks.
431
432 Returns two sets of Future: (done, pending).
433
434 Usage:
435
436 done, pending = await asyncio.wait(fs)
437
438 Note: This does not raise TimeoutError! Futures that aren't done
439 when the timeout occurs are returned in the second set.
440 """
441 if futures.isfuture(fs) or coroutines.iscoroutine(fs):
442 raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
443 if not fs:
444 raise ValueError('Set of Tasks/Futures is empty.')
445 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
446 raise ValueError(f'Invalid return_when value: {return_when}')
447
448 fs = set(fs)
449
450 if any(coroutines.iscoroutine(f) for f in fs):
451 raise TypeError("Passing coroutines is forbidden, use tasks explicitly.")
452
453 loop = events.get_running_loop()
454 return await _wait(fs, timeout, return_when, loop)
455
456
457 def _release_waiter(waiter, *args):
458 if not waiter.done():
459 waiter.set_result(None)
460
461
462 async def wait_for(fut, timeout):
463 """Wait for the single Future or coroutine to complete, with timeout.
464
465 Coroutine will be wrapped in Task.
466
467 Returns result of the Future or coroutine. When a timeout occurs,
468 it cancels the task and raises TimeoutError. To avoid the task
469 cancellation, wrap it in shield().
470
471 If the wait is cancelled, the task is also cancelled.
472
473 If the task supresses the cancellation and returns a value instead,
474 that value is returned.
475
476 This function is a coroutine.
477 """
478 # The special case for timeout <= 0 is for the following case:
479 #
480 # async def test_waitfor():
481 # func_started = False
482 #
483 # async def func():
484 # nonlocal func_started
485 # func_started = True
486 #
487 # try:
488 # await asyncio.wait_for(func(), 0)
489 # except asyncio.TimeoutError:
490 # assert not func_started
491 # else:
492 # assert False
493 #
494 # asyncio.run(test_waitfor())
495
496
497 if timeout is not None and timeout <= 0:
498 fut = ensure_future(fut)
499
500 if fut.done():
501 return fut.result()
502
503 await _cancel_and_wait(fut)
504 try:
505 return fut.result()
506 except exceptions.CancelledError as exc:
507 raise TimeoutError from exc
508
509 async with timeouts.timeout(timeout):
510 return await fut
511
512 async def _wait(fs, timeout, return_when, loop):
513 """Internal helper for wait().
514
515 The fs argument must be a collection of Futures.
516 """
517 assert fs, 'Set of Futures is empty.'
518 waiter = loop.create_future()
519 timeout_handle = None
520 if timeout is not None:
521 timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
522 counter = len(fs)
523
524 def _on_completion(f):
525 nonlocal counter
526 counter -= 1
527 if (counter <= 0 or
528 return_when == FIRST_COMPLETED or
529 return_when == FIRST_EXCEPTION and (not f.cancelled() and
530 f.exception() is not None)):
531 if timeout_handle is not None:
532 timeout_handle.cancel()
533 if not waiter.done():
534 waiter.set_result(None)
535
536 for f in fs:
537 f.add_done_callback(_on_completion)
538
539 try:
540 await waiter
541 finally:
542 if timeout_handle is not None:
543 timeout_handle.cancel()
544 for f in fs:
545 f.remove_done_callback(_on_completion)
546
547 done, pending = set(), set()
548 for f in fs:
549 if f.done():
550 done.add(f)
551 else:
552 pending.add(f)
553 return done, pending
554
555
556 async def _cancel_and_wait(fut):
557 """Cancel the *fut* future or task and wait until it completes."""
558
559 loop = events.get_running_loop()
560 waiter = loop.create_future()
561 cb = functools.partial(_release_waiter, waiter)
562 fut.add_done_callback(cb)
563
564 try:
565 fut.cancel()
566 # We cannot wait on *fut* directly to make
567 # sure _cancel_and_wait itself is reliably cancellable.
568 await waiter
569 finally:
570 fut.remove_done_callback(cb)
571
572
573 # This is *not* a @coroutine! It is just an iterator (yielding Futures).
574 def as_completed(fs, *, timeout=None):
575 """Return an iterator whose values are coroutines.
576
577 When waiting for the yielded coroutines you'll get the results (or
578 exceptions!) of the original Futures (or coroutines), in the order
579 in which and as soon as they complete.
580
581 This differs from PEP 3148; the proper way to use this is:
582
583 for f in as_completed(fs):
584 result = await f # The 'await' may raise.
585 # Use result.
586
587 If a timeout is specified, the 'await' will raise
588 TimeoutError when the timeout occurs before all Futures are done.
589
590 Note: The futures 'f' are not necessarily members of fs.
591 """
592 if futures.isfuture(fs) or coroutines.iscoroutine(fs):
593 raise TypeError(f"expect an iterable of futures, not {type(fs).__name__}")
594
595 from .queues import Queue # Import here to avoid circular import problem.
596 done = Queue()
597
598 loop = events.get_event_loop()
599 todo = {ensure_future(f, loop=loop) for f in set(fs)}
600 timeout_handle = None
601
602 def _on_timeout():
603 for f in todo:
604 f.remove_done_callback(_on_completion)
605 done.put_nowait(None) # Queue a dummy value for _wait_for_one().
606 todo.clear() # Can't do todo.remove(f) in the loop.
607
608 def _on_completion(f):
609 if not todo:
610 return # _on_timeout() was here first.
611 todo.remove(f)
612 done.put_nowait(f)
613 if not todo and timeout_handle is not None:
614 timeout_handle.cancel()
615
616 async def _wait_for_one():
617 f = await done.get()
618 if f is None:
619 # Dummy value from _on_timeout().
620 raise exceptions.TimeoutError
621 return f.result() # May raise f.exception().
622
623 for f in todo:
624 f.add_done_callback(_on_completion)
625 if todo and timeout is not None:
626 timeout_handle = loop.call_later(timeout, _on_timeout)
627 for _ in range(len(todo)):
628 yield _wait_for_one()
629
630
631 @types.coroutine
632 def __sleep0():
633 """Skip one event loop run cycle.
634
635 This is a private helper for 'asyncio.sleep()', used
636 when the 'delay' is set to 0. It uses a bare 'yield'
637 expression (which Task.__step knows how to handle)
638 instead of creating a Future object.
639 """
640 yield
641
642
643 async def sleep(delay, result=None):
644 """Coroutine that completes after a given time (in seconds)."""
645 if delay <= 0:
646 await __sleep0()
647 return result
648
649 loop = events.get_running_loop()
650 future = loop.create_future()
651 h = loop.call_later(delay,
652 futures._set_result_unless_cancelled,
653 future, result)
654 try:
655 return await future
656 finally:
657 h.cancel()
658
659
660 def ensure_future(coro_or_future, *, loop=None):
661 """Wrap a coroutine or an awaitable in a future.
662
663 If the argument is a Future, it is returned directly.
664 """
665 if futures.isfuture(coro_or_future):
666 if loop is not None and loop is not futures._get_loop(coro_or_future):
667 raise ValueError('The future belongs to a different loop than '
668 'the one specified as the loop argument')
669 return coro_or_future
670 should_close = True
671 if not coroutines.iscoroutine(coro_or_future):
672 if inspect.isawaitable(coro_or_future):
673 async def _wrap_awaitable(awaitable):
674 return await awaitable
675
676 coro_or_future = _wrap_awaitable(coro_or_future)
677 should_close = False
678 else:
679 raise TypeError('An asyncio.Future, a coroutine or an awaitable '
680 'is required')
681
682 if loop is None:
683 loop = events.get_event_loop()
684 try:
685 return loop.create_task(coro_or_future)
686 except RuntimeError:
687 if should_close:
688 coro_or_future.close()
689 raise
690
691
692 class ESC[4;38;5;81m_GatheringFuture(ESC[4;38;5;149mfuturesESC[4;38;5;149m.ESC[4;38;5;149mFuture):
693 """Helper for gather().
694
695 This overrides cancel() to cancel all the children and act more
696 like Task.cancel(), which doesn't immediately mark itself as
697 cancelled.
698 """
699
700 def __init__(self, children, *, loop):
701 assert loop is not None
702 super().__init__(loop=loop)
703 self._children = children
704 self._cancel_requested = False
705
706 def cancel(self, msg=None):
707 if self.done():
708 return False
709 ret = False
710 for child in self._children:
711 if child.cancel(msg=msg):
712 ret = True
713 if ret:
714 # If any child tasks were actually cancelled, we should
715 # propagate the cancellation request regardless of
716 # *return_exceptions* argument. See issue 32684.
717 self._cancel_requested = True
718 return ret
719
720
721 def gather(*coros_or_futures, return_exceptions=False):
722 """Return a future aggregating results from the given coroutines/futures.
723
724 Coroutines will be wrapped in a future and scheduled in the event
725 loop. They will not necessarily be scheduled in the same order as
726 passed in.
727
728 All futures must share the same event loop. If all the tasks are
729 done successfully, the returned future's result is the list of
730 results (in the order of the original sequence, not necessarily
731 the order of results arrival). If *return_exceptions* is True,
732 exceptions in the tasks are treated the same as successful
733 results, and gathered in the result list; otherwise, the first
734 raised exception will be immediately propagated to the returned
735 future.
736
737 Cancellation: if the outer Future is cancelled, all children (that
738 have not completed yet) are also cancelled. If any child is
739 cancelled, this is treated as if it raised CancelledError --
740 the outer Future is *not* cancelled in this case. (This is to
741 prevent the cancellation of one child to cause other children to
742 be cancelled.)
743
744 If *return_exceptions* is False, cancelling gather() after it
745 has been marked done won't cancel any submitted awaitables.
746 For instance, gather can be marked done after propagating an
747 exception to the caller, therefore, calling ``gather.cancel()``
748 after catching an exception (raised by one of the awaitables) from
749 gather won't cancel any other awaitables.
750 """
751 if not coros_or_futures:
752 loop = events.get_event_loop()
753 outer = loop.create_future()
754 outer.set_result([])
755 return outer
756
757 def _done_callback(fut):
758 nonlocal nfinished
759 nfinished += 1
760
761 if outer is None or outer.done():
762 if not fut.cancelled():
763 # Mark exception retrieved.
764 fut.exception()
765 return
766
767 if not return_exceptions:
768 if fut.cancelled():
769 # Check if 'fut' is cancelled first, as
770 # 'fut.exception()' will *raise* a CancelledError
771 # instead of returning it.
772 exc = fut._make_cancelled_error()
773 outer.set_exception(exc)
774 return
775 else:
776 exc = fut.exception()
777 if exc is not None:
778 outer.set_exception(exc)
779 return
780
781 if nfinished == nfuts:
782 # All futures are done; create a list of results
783 # and set it to the 'outer' future.
784 results = []
785
786 for fut in children:
787 if fut.cancelled():
788 # Check if 'fut' is cancelled first, as 'fut.exception()'
789 # will *raise* a CancelledError instead of returning it.
790 # Also, since we're adding the exception return value
791 # to 'results' instead of raising it, don't bother
792 # setting __context__. This also lets us preserve
793 # calling '_make_cancelled_error()' at most once.
794 res = exceptions.CancelledError(
795 '' if fut._cancel_message is None else
796 fut._cancel_message)
797 else:
798 res = fut.exception()
799 if res is None:
800 res = fut.result()
801 results.append(res)
802
803 if outer._cancel_requested:
804 # If gather is being cancelled we must propagate the
805 # cancellation regardless of *return_exceptions* argument.
806 # See issue 32684.
807 exc = fut._make_cancelled_error()
808 outer.set_exception(exc)
809 else:
810 outer.set_result(results)
811
812 arg_to_fut = {}
813 children = []
814 nfuts = 0
815 nfinished = 0
816 done_futs = []
817 loop = None
818 outer = None # bpo-46672
819 for arg in coros_or_futures:
820 if arg not in arg_to_fut:
821 fut = ensure_future(arg, loop=loop)
822 if loop is None:
823 loop = futures._get_loop(fut)
824 if fut is not arg:
825 # 'arg' was not a Future, therefore, 'fut' is a new
826 # Future created specifically for 'arg'. Since the caller
827 # can't control it, disable the "destroy pending task"
828 # warning.
829 fut._log_destroy_pending = False
830
831 nfuts += 1
832 arg_to_fut[arg] = fut
833 if fut.done():
834 done_futs.append(fut)
835 else:
836 fut.add_done_callback(_done_callback)
837
838 else:
839 # There's a duplicate Future object in coros_or_futures.
840 fut = arg_to_fut[arg]
841
842 children.append(fut)
843
844 outer = _GatheringFuture(children, loop=loop)
845 # Run done callbacks after GatheringFuture created so any post-processing
846 # can be performed at this point
847 # optimization: in the special case that *all* futures finished eagerly,
848 # this will effectively complete the gather eagerly, with the last
849 # callback setting the result (or exception) on outer before returning it
850 for fut in done_futs:
851 _done_callback(fut)
852 return outer
853
854
855 def shield(arg):
856 """Wait for a future, shielding it from cancellation.
857
858 The statement
859
860 task = asyncio.create_task(something())
861 res = await shield(task)
862
863 is exactly equivalent to the statement
864
865 res = await something()
866
867 *except* that if the coroutine containing it is cancelled, the
868 task running in something() is not cancelled. From the POV of
869 something(), the cancellation did not happen. But its caller is
870 still cancelled, so the yield-from expression still raises
871 CancelledError. Note: If something() is cancelled by other means
872 this will still cancel shield().
873
874 If you want to completely ignore cancellation (not recommended)
875 you can combine shield() with a try/except clause, as follows:
876
877 task = asyncio.create_task(something())
878 try:
879 res = await shield(task)
880 except CancelledError:
881 res = None
882
883 Save a reference to tasks passed to this function, to avoid
884 a task disappearing mid-execution. The event loop only keeps
885 weak references to tasks. A task that isn't referenced elsewhere
886 may get garbage collected at any time, even before it's done.
887 """
888 inner = ensure_future(arg)
889 if inner.done():
890 # Shortcut.
891 return inner
892 loop = futures._get_loop(inner)
893 outer = loop.create_future()
894
895 def _inner_done_callback(inner):
896 if outer.cancelled():
897 if not inner.cancelled():
898 # Mark inner's result as retrieved.
899 inner.exception()
900 return
901
902 if inner.cancelled():
903 outer.cancel()
904 else:
905 exc = inner.exception()
906 if exc is not None:
907 outer.set_exception(exc)
908 else:
909 outer.set_result(inner.result())
910
911
912 def _outer_done_callback(outer):
913 if not inner.done():
914 inner.remove_done_callback(_inner_done_callback)
915
916 inner.add_done_callback(_inner_done_callback)
917 outer.add_done_callback(_outer_done_callback)
918 return outer
919
920
921 def run_coroutine_threadsafe(coro, loop):
922 """Submit a coroutine object to a given event loop.
923
924 Return a concurrent.futures.Future to access the result.
925 """
926 if not coroutines.iscoroutine(coro):
927 raise TypeError('A coroutine object is required')
928 future = concurrent.futures.Future()
929
930 def callback():
931 try:
932 futures._chain_future(ensure_future(coro, loop=loop), future)
933 except (SystemExit, KeyboardInterrupt):
934 raise
935 except BaseException as exc:
936 if future.set_running_or_notify_cancel():
937 future.set_exception(exc)
938 raise
939
940 loop.call_soon_threadsafe(callback)
941 return future
942
943
944 def create_eager_task_factory(custom_task_constructor):
945 """Create a function suitable for use as a task factory on an event-loop.
946
947 Example usage:
948
949 loop.set_task_factory(
950 asyncio.create_eager_task_factory(my_task_constructor))
951
952 Now, tasks created will be started immediately (rather than being first
953 scheduled to an event loop). The constructor argument can be any callable
954 that returns a Task-compatible object and has a signature compatible
955 with `Task.__init__`; it must have the `eager_start` keyword argument.
956
957 Most applications will use `Task` for `custom_task_constructor` and in
958 this case there's no need to call `create_eager_task_factory()`
959 directly. Instead the global `eager_task_factory` instance can be
960 used. E.g. `loop.set_task_factory(asyncio.eager_task_factory)`.
961 """
962
963 def factory(loop, coro, *, name=None, context=None):
964 return custom_task_constructor(
965 coro, loop=loop, name=name, context=context, eager_start=True)
966
967 return factory
968
969
970 eager_task_factory = create_eager_task_factory(Task)
971
972
973 # Collectively these two sets hold references to the complete set of active
974 # tasks. Eagerly executed tasks use a faster regular set as an optimization
975 # but may graduate to a WeakSet if the task blocks on IO.
976 _scheduled_tasks = weakref.WeakSet()
977 _eager_tasks = set()
978
979 # Dictionary containing tasks that are currently active in
980 # all running event loops. {EventLoop: Task}
981 _current_tasks = {}
982
983
984 def _register_task(task):
985 """Register an asyncio Task scheduled to run on an event loop."""
986 _scheduled_tasks.add(task)
987
988
989 def _register_eager_task(task):
990 """Register an asyncio Task about to be eagerly executed."""
991 _eager_tasks.add(task)
992
993
994 def _enter_task(loop, task):
995 current_task = _current_tasks.get(loop)
996 if current_task is not None:
997 raise RuntimeError(f"Cannot enter into task {task!r} while another "
998 f"task {current_task!r} is being executed.")
999 _current_tasks[loop] = task
1000
1001
1002 def _leave_task(loop, task):
1003 current_task = _current_tasks.get(loop)
1004 if current_task is not task:
1005 raise RuntimeError(f"Leaving task {task!r} does not match "
1006 f"the current task {current_task!r}.")
1007 del _current_tasks[loop]
1008
1009
1010 def _swap_current_task(loop, task):
1011 prev_task = _current_tasks.get(loop)
1012 if task is None:
1013 del _current_tasks[loop]
1014 else:
1015 _current_tasks[loop] = task
1016 return prev_task
1017
1018
1019 def _unregister_task(task):
1020 """Unregister a completed, scheduled Task."""
1021 _scheduled_tasks.discard(task)
1022
1023
1024 def _unregister_eager_task(task):
1025 """Unregister a task which finished its first eager step."""
1026 _eager_tasks.discard(task)
1027
1028
1029 _py_current_task = current_task
1030 _py_register_task = _register_task
1031 _py_register_eager_task = _register_eager_task
1032 _py_unregister_task = _unregister_task
1033 _py_unregister_eager_task = _unregister_eager_task
1034 _py_enter_task = _enter_task
1035 _py_leave_task = _leave_task
1036 _py_swap_current_task = _swap_current_task
1037
1038
1039 try:
1040 from _asyncio import (_register_task, _register_eager_task,
1041 _unregister_task, _unregister_eager_task,
1042 _enter_task, _leave_task, _swap_current_task,
1043 _scheduled_tasks, _eager_tasks, _current_tasks,
1044 current_task)
1045 except ImportError:
1046 pass
1047 else:
1048 _c_current_task = current_task
1049 _c_register_task = _register_task
1050 _c_register_eager_task = _register_eager_task
1051 _c_unregister_task = _unregister_task
1052 _c_unregister_eager_task = _unregister_eager_task
1053 _c_enter_task = _enter_task
1054 _c_leave_task = _leave_task
1055 _c_swap_current_task = _swap_current_task