python (3.12.0)
1 """Tests for tasks.py."""
2
3 import collections
4 import contextvars
5 import gc
6 import io
7 import random
8 import re
9 import sys
10 import traceback
11 import types
12 import unittest
13 from unittest import mock
14 from types import GenericAlias
15
16 import asyncio
17 from asyncio import futures
18 from asyncio import tasks
19 from test.test_asyncio import utils as test_utils
20 from test import support
21 from test.support.script_helper import assert_python_ok
22
23
def tearDownModule():
    """Reset the asyncio event loop policy by installing ``None``."""
    asyncio.set_event_loop_policy(None)
26
27
async def coroutine_function():
    """Coroutine function with an empty body; it completes with ``None``."""
    return None
30
31
def format_coroutine(qualname, state, src, source_traceback, generator=False):
    """Build the ``coro=<...>`` fragment used when comparing task reprs.

    *state* is used as-is when *generator* is true, otherwise ', defined'
    is appended to it.  When *source_traceback* is not None, the first two
    items of its last entry are appended as ``created at <a>:<b>``.
    """
    template = '%s' if generator else '%s, defined'
    state = template % state
    if source_traceback is None:
        return 'coro=<%s() %s at %s>' % (qualname, state, src)
    last = source_traceback[-1]
    return ('coro=<%s() %s at %s> created at %s:%s'
            % (qualname, state, src, last[0], last[1]))
43
44
def get_innermost_context(exc):
    """
    Follow ``__context__`` links from *exc* to the end of the chain.

    Returns a ``(type, args, depth)`` tuple describing the last exception
    reached, where depth is the number of links followed (0 when *exc*
    itself has no context).
    """
    innermost = exc
    hops = 0
    while (parent := innermost.__context__) is not None:
        innermost = parent
        hops += 1
    return (type(innermost), innermost.args, hops)
59
60
class Dummy:
    """Callable placeholder that ignores its arguments and has a fixed repr."""

    def __call__(self, *args):
        # Accept any positional arguments and do nothing.
        return None

    def __repr__(self):
        return '<Dummy>'
68
69
class CoroLikeObject:
    """Minimal object exposing send/throw/close/__await__ like a coroutine.

    ``send()`` always finishes immediately by raising ``StopIteration(42)``;
    ``throw()`` and ``close()`` do nothing.
    """

    def __await__(self):
        # The object acts as its own iterator.
        return self

    def send(self, v):
        raise StopIteration(42)

    def throw(self, *exc):
        return None

    def close(self):
        return None
82
83
84 class ESC[4;38;5;81mBaseTaskTests:
85
86 Task = None
87 Future = None
88
def new_task(self, loop, coro, name='TestTask', context=None):
    """Instantiate the class-level ``Task`` attribute for *coro* on *loop*."""
    task_factory = self.__class__.Task
    return task_factory(coro, loop=loop, name=name, context=context)
91
def new_future(self, loop):
    """Instantiate the class-level ``Future`` attribute bound to *loop*."""
    future_factory = self.__class__.Future
    return future_factory(loop=loop)
94
def setUp(self):
    """Create the test loop and route task/future creation through this class.

    The loop's task factory is set to ``new_task`` and its ``create_future``
    is replaced so that both use the Task/Future classes under test.
    """
    super().setUp()
    test_loop = self.new_test_loop()
    self.loop = test_loop
    test_loop.set_task_factory(self.new_task)
    # Look up self.loop at call time, exactly as the factory is invoked.
    test_loop.create_future = lambda: self.new_future(self.loop)
100
def test_generic_alias(self):
    """Subscripting the Task class yields a GenericAlias carrying its args."""
    alias = self.__class__.Task[str]
    self.assertEqual(alias.__args__, (str,))
    self.assertIsInstance(alias, GenericAlias)
105
def test_task_cancel_message_getter(self):
    """``_cancel_message`` starts as None and reflects the cancel() argument."""
    async def coro():
        pass

    task = self.new_task(self.loop, coro())
    self.assertTrue(hasattr(task, '_cancel_message'))
    self.assertEqual(task._cancel_message, None)

    task.cancel('my message')
    self.assertEqual(task._cancel_message, 'my message')

    # The message must also surface on the raised CancelledError.
    with self.assertRaises(asyncio.CancelledError) as raised:
        self.loop.run_until_complete(task)

    self.assertEqual('my message', raised.exception.args[0])
120
def test_task_cancel_message_setter(self):
    """Overwriting ``_cancel_message`` changes the CancelledError argument."""
    async def coro():
        pass

    task = self.new_task(self.loop, coro())
    task.cancel('my message')
    task._cancel_message = 'my new message'
    self.assertEqual(task._cancel_message, 'my new message')

    with self.assertRaises(asyncio.CancelledError) as raised:
        self.loop.run_until_complete(task)

    self.assertEqual('my new message', raised.exception.args[0])
133
def test_task_del_collect(self):
    """Run 100 tasks whose results trigger gc.collect() when finalized."""
    class Evil:
        # Finalizer forces a full garbage collection.
        def __del__(self):
            gc.collect()

    async def run():
        return Evil()

    # The task list is a temporary passed straight to gather(); no
    # assertion is made beyond the run completing without error.
    self.loop.run_until_complete(
        asyncio.gather(*[
            self.new_task(self.loop, run()) for _ in range(100)
        ]))
146
def test_other_loop_future(self):
    """Awaiting a future created for another loop raises RuntimeError."""
    foreign_loop = asyncio.new_event_loop()
    foreign_fut = self.new_future(foreign_loop)

    async def run(fut):
        await fut

    expected_msg = r'Task .* got Future .* attached'
    try:
        with self.assertRaisesRegex(RuntimeError, expected_msg):
            self.loop.run_until_complete(run(foreign_fut))
    finally:
        # Always release the second loop, even if the assertion fails.
        foreign_loop.close()
160
def test_task_awaits_on_itself(self):
    """A task whose coroutine awaits the task itself fails with RuntimeError."""

    async def test():
        # `task` is bound below, before the coroutine first runs.
        await task

    task = asyncio.ensure_future(test(), loop=self.loop)

    expected_msg = 'Task cannot await on itself'
    with self.assertRaisesRegex(RuntimeError, expected_msg):
        self.loop.run_until_complete(task)
171
def test_task_class(self):
    """A task reports its result and the loop it was created with."""
    async def notmuch():
        return 'ok'

    first = self.new_task(self.loop, notmuch())
    self.loop.run_until_complete(first)
    self.assertTrue(first.done())
    self.assertEqual(first.result(), 'ok')
    self.assertIs(first._loop, self.loop)
    self.assertIs(first.get_loop(), self.loop)

    # Repeat with a freshly created loop passed explicitly.
    other_loop = asyncio.new_event_loop()
    self.set_event_loop(other_loop)
    second = self.new_task(other_loop, notmuch())
    self.assertIs(second._loop, other_loop)
    other_loop.run_until_complete(second)
    other_loop.close()
188
def test_ensure_future_coroutine(self):
    """Check which loop ensure_future() attaches a coroutine's task to.

    Four cases run in sequence: an explicit ``loop=`` argument; no loop
    argument (expected to raise RuntimeError matching 'no current event
    loop'); a call made from inside a running coroutine; and a call made
    after asyncio.set_event_loop(self.loop).
    """
    async def notmuch():
        return 'ok'
    # Case 1: explicit loop argument.
    t = asyncio.ensure_future(notmuch(), loop=self.loop)
    self.assertIs(t._loop, self.loop)
    self.loop.run_until_complete(t)
    self.assertTrue(t.done())
    self.assertEqual(t.result(), 'ok')

    # Case 2: no loop argument; the coroutine is closed during cleanup
    # because it is never scheduled.
    a = notmuch()
    self.addCleanup(a.close)
    with self.assertRaisesRegex(RuntimeError, 'no current event loop'):
        asyncio.ensure_future(a)

    # Case 3: called from within a coroutine run by self.loop.
    async def test():
        return asyncio.ensure_future(notmuch())
    t = self.loop.run_until_complete(test())
    self.assertIs(t._loop, self.loop)
    self.loop.run_until_complete(t)
    self.assertTrue(t.done())
    self.assertEqual(t.result(), 'ok')

    # Case 4: a current loop has been set explicitly.
    # Deprecated in 3.10, undeprecated in 3.12
    asyncio.set_event_loop(self.loop)
    self.addCleanup(asyncio.set_event_loop, None)
    t = asyncio.ensure_future(notmuch())
    self.assertIs(t._loop, self.loop)
    self.loop.run_until_complete(t)
    self.assertTrue(t.done())
    self.assertEqual(t.result(), 'ok')
219
def test_ensure_future_future(self):
    """ensure_future() returns an existing future unchanged.

    Passing a different loop than the future's own must raise ValueError.
    """
    original = self.new_future(self.loop)
    original.set_result('ko')

    wrapped = asyncio.ensure_future(original)
    self.loop.run_until_complete(wrapped)
    self.assertTrue(wrapped.done())
    self.assertEqual(wrapped.result(), 'ko')
    self.assertIs(wrapped, original)

    other_loop = asyncio.new_event_loop()
    self.set_event_loop(other_loop)

    with self.assertRaises(ValueError):
        asyncio.ensure_future(original, loop=other_loop)

    other_loop.close()

    # Passing the future's own loop is accepted.
    wrapped = asyncio.ensure_future(original, loop=self.loop)
    self.assertIs(wrapped, original)
240
def test_ensure_future_task(self):
    """ensure_future() returns an existing task unchanged.

    Passing a different loop than the task's own must raise ValueError.
    """
    async def notmuch():
        return 'ok'

    original = self.new_task(self.loop, notmuch())
    wrapped = asyncio.ensure_future(original)
    self.loop.run_until_complete(wrapped)
    self.assertTrue(wrapped.done())
    self.assertEqual(wrapped.result(), 'ok')
    self.assertIs(wrapped, original)

    other_loop = asyncio.new_event_loop()
    self.set_event_loop(other_loop)

    with self.assertRaises(ValueError):
        asyncio.ensure_future(original, loop=other_loop)

    other_loop.close()

    # Passing the task's own loop is accepted.
    wrapped = asyncio.ensure_future(original, loop=self.loop)
    self.assertIs(wrapped, original)
261
def test_ensure_future_awaitable(self):
    """ensure_future() accepts an object that only defines ``__await__``."""
    class Aw:
        # Delegates awaiting to the wrapped coroutine.
        def __init__(self, coro):
            self.coro = coro

        def __await__(self):
            return self.coro.__await__()

    async def coro():
        return 'ok'

    private_loop = asyncio.new_event_loop()
    self.set_event_loop(private_loop)
    wrapped = asyncio.ensure_future(Aw(coro()), loop=private_loop)
    private_loop.run_until_complete(wrapped)
    self.assertEqual(wrapped.result(), 'ok')
277
def test_ensure_future_task_awaitable(self):
    """An awaitable passed to ensure_future() ends up wrapped in a coroutine."""
    class Aw:
        def __await__(self):
            return asyncio.sleep(0, result='ok').__await__()

    private_loop = asyncio.new_event_loop()
    self.set_event_loop(private_loop)
    wrapped = asyncio.ensure_future(Aw(), loop=private_loop)
    private_loop.run_until_complete(wrapped)
    self.assertTrue(wrapped.done())
    self.assertEqual(wrapped.result(), 'ok')
    # The task's coroutine must be a real coroutine object.
    self.assertIsInstance(wrapped.get_coro(), types.CoroutineType)
    private_loop.close()
291
def test_ensure_future_neither(self):
    """ensure_future() rejects a plain string with TypeError."""
    not_awaitable = 'ok'
    with self.assertRaises(TypeError):
        asyncio.ensure_future(not_awaitable)
295
def test_ensure_future_error_msg(self):
    """Check the ValueError message for a future passed with a foreign loop.

    A future created for ``self.loop`` is handed to ensure_future() together
    with a different, freshly created loop.
    """
    loop = asyncio.new_event_loop()
    try:
        f = self.new_future(self.loop)
        with self.assertRaisesRegex(ValueError, 'The future belongs to a '
                                    'different loop than the one specified as '
                                    'the loop argument'):
            asyncio.ensure_future(f, loop=loop)
    finally:
        # Close the extra loop even when the assertion fails, so a failing
        # run does not leak an unclosed event loop.
        loop.close()
304
def test_get_stack(self):
    """Exercise Task.get_stack() and Task.print_stack() from inside the task.

    The function names 'foo' and 'bar' are significant: the assertions
    below match against them.
    """
    # Rebound by runner() to the task wrapping foo().
    T = None

    async def foo():
        await bar()

    async def bar():
        # test get_stack()
        f = T.get_stack(limit=1)
        try:
            self.assertEqual(f[0].f_code.co_name, 'foo')
        finally:
            # Drop the reference to the frame list.
            f = None

        # test print_stack()
        file = io.StringIO()
        T.print_stack(limit=1, file=file)
        file.seek(0)
        tb = file.read()
        self.assertRegex(tb, r'foo\(\) running')

    async def runner():
        nonlocal T
        T = asyncio.ensure_future(foo(), loop=self.loop)
        await T

    self.loop.run_until_complete(runner())
332
def test_task_repr(self):
    """Check repr() of a task in pending, cancelling, cancelled and finished states.

    Expected strings are built with format_coroutine() from the coroutine's
    qualified name and the source location reported by
    test_utils.get_function_source().
    """
    self.loop.set_debug(False)

    async def notmuch():
        return 'abc'

    # test coroutine function
    self.assertEqual(notmuch.__name__, 'notmuch')
    self.assertRegex(notmuch.__qualname__,
                     r'\w+.test_task_repr.<locals>.notmuch')
    self.assertEqual(notmuch.__module__, __name__)

    filename, lineno = test_utils.get_function_source(notmuch)
    src = "%s:%s" % (filename, lineno)

    # test coroutine object
    gen = notmuch()
    coro_qualname = 'BaseTaskTests.test_task_repr.<locals>.notmuch'
    self.assertEqual(gen.__name__, 'notmuch')
    self.assertEqual(gen.__qualname__, coro_qualname)

    # test pending Task
    t = self.new_task(self.loop, gen)
    t.add_done_callback(Dummy())

    coro = format_coroutine(coro_qualname, 'running', src,
                            t._source_traceback, generator=True)
    self.assertEqual(repr(t),
                     "<Task pending name='TestTask' %s cb=[<Dummy>()]>" % coro)

    # test cancelling Task
    t.cancel()  # Does not take immediate effect!
    self.assertEqual(repr(t),
                     "<Task cancelling name='TestTask' %s cb=[<Dummy>()]>" % coro)

    # test cancelled Task
    self.assertRaises(asyncio.CancelledError,
                      self.loop.run_until_complete, t)
    coro = format_coroutine(coro_qualname, 'done', src,
                            t._source_traceback)
    self.assertEqual(repr(t),
                     "<Task cancelled name='TestTask' %s>" % coro)

    # test finished Task
    t = self.new_task(self.loop, notmuch())
    self.loop.run_until_complete(t)
    coro = format_coroutine(coro_qualname, 'done', src,
                            t._source_traceback)
    self.assertEqual(repr(t),
                     "<Task finished name='TestTask' %s result='abc'>" % coro)
383
def test_task_repr_autogenerated(self):
    """Tasks created with name=None get distinct 'Task-<n>' names."""
    async def notmuch():
        return 123

    name_pattern = r"^<Task pending name='Task-(\d+)'"

    first = self.new_task(self.loop, notmuch(), None)
    second = self.new_task(self.loop, notmuch(), None)
    self.assertNotEqual(repr(first), repr(second))

    first_match = re.match(name_pattern, repr(first))
    self.assertIsNotNone(first_match)
    second_match = re.match(name_pattern, repr(second))
    self.assertIsNotNone(second_match)

    # Autogenerated task names should have monotonically increasing numbers
    first_number = int(first_match.group(1))
    second_number = int(second_match.group(1))
    self.assertLess(first_number, second_number)
    self.loop.run_until_complete(first)
    self.loop.run_until_complete(second)
401
def test_task_set_name_pylong(self):
    """An int given as a task name is reported back as its string form."""
    # test that setting the task name to a PyLong explicitly doesn't
    # incorrectly trigger the deferred name formatting logic
    async def notmuch():
        return 123

    task = self.new_task(self.loop, notmuch(), name=987654321)
    self.assertEqual(task.get_name(), '987654321')
    task.set_name(123456789)
    self.assertEqual(task.get_name(), '123456789')
    self.loop.run_until_complete(task)
413
def test_task_repr_name_not_str(self):
    """A non-string name passed to set_name() is returned as a string."""
    async def notmuch():
        return 123

    task = self.new_task(self.loop, notmuch())
    task.set_name({6})
    self.assertEqual(task.get_name(), '{6}')
    self.loop.run_until_complete(task)
422
def test_task_repr_wait_for(self):
    """The repr of a blocked task includes ``wait_for=<repr of the future>``."""
    self.loop.set_debug(False)

    async def wait_for(fut):
        return await fut

    blocker = self.new_future(self.loop)
    task = self.new_task(self.loop, wait_for(blocker))
    test_utils.run_briefly(self.loop)
    expected = '<Task .* wait_for=%s>' % re.escape(repr(blocker))
    self.assertRegex(repr(task), expected)

    # Let the task finish so it is not left pending.
    blocker.set_result(None)
    self.loop.run_until_complete(task)
437
def test_task_basics(self):
    """A coroutine awaiting two other coroutines returns their combined result."""

    async def inner1():
        return 42

    async def inner2():
        return 1000

    async def outer():
        a = await inner1()
        b = await inner2()
        return a+b

    result = self.loop.run_until_complete(outer())
    self.assertEqual(result, 1042)
453
def test_exception_chaining_after_await(self):
    """Check exception chaining across an awaited task.

    A task that raises ValueError is awaited while a KeyError is being
    handled; the ValueError's ``__context__`` must be that KeyError.
    """
    # Test that when awaiting on a task when an exception is already
    # active, if the task raises an exception it will be chained
    # with the original.
    loop = asyncio.new_event_loop()
    self.set_event_loop(loop)

    async def raise_error():
        raise ValueError

    async def run():
        try:
            raise KeyError(3)
        except Exception as exc:
            task = self.new_task(loop, raise_error())
            try:
                await task
            except Exception as exc:
                self.assertEqual(type(exc), ValueError)
                chained = exc.__context__
                self.assertEqual((type(chained), chained.args),
                                 (KeyError, (3,)))
            else:
                # Without this branch the test would pass vacuously if
                # awaiting the task did not raise at all.
                self.fail('awaiting the task did not raise')

    try:
        task = self.new_task(loop, run())
        loop.run_until_complete(task)
    finally:
        loop.close()
482
def test_exception_chaining_after_await_with_context_cycle(self):
    """Re-raising the active exception from a task must not make it its own context."""
    # Check trying to create an exception context cycle:
    # https://bugs.python.org/issue40696
    # None means "the except block below never ran".
    has_cycle = None
    loop = asyncio.new_event_loop()
    self.set_event_loop(loop)

    async def process_exc(exc):
        raise exc

    async def run():
        nonlocal has_cycle
        try:
            raise KeyError('a')
        except Exception as exc:
            # The task re-raises the very exception being handled here.
            task = self.new_task(loop, process_exc(exc))
            try:
                await task
            except BaseException as exc:
                has_cycle = (exc is exc.__context__)
                # Prevent a hang if has_cycle is True.
                exc.__context__ = None

    try:
        task = self.new_task(loop, run())
        loop.run_until_complete(task)
    finally:
        loop.close()
    # This also distinguishes from the initial has_cycle=None.
    self.assertEqual(has_cycle, False)
513
514
def test_cancelling(self):
    """Check cancelling() and the ' cancelling ' repr marker around cancel()."""
    loop = asyncio.new_event_loop()

    async def task():
        await asyncio.sleep(10)

    try:
        t = self.new_task(loop, task())
        # Before any cancel request.
        self.assertFalse(t.cancelling())
        self.assertNotIn(" cancelling ", repr(t))
        # After the first cancel request.
        self.assertTrue(t.cancel())
        self.assertTrue(t.cancelling())
        self.assertIn(" cancelling ", repr(t))

        # Since we commented out two lines from Task.cancel(),
        # this t.cancel() call now returns True.
        # self.assertFalse(t.cancel())
        self.assertTrue(t.cancel())

        with self.assertRaises(asyncio.CancelledError):
            loop.run_until_complete(t)
    finally:
        loop.close()
538
def test_uncancel_basic(self):
    """Cancel a task, let it uncancel() itself, then cancel it for good.

    cancelling(), cancelled() and the repr are checked after each step.
    """
    loop = asyncio.new_event_loop()

    async def task():
        try:
            await asyncio.sleep(10)
        except asyncio.CancelledError:
            # Withdraw the cancellation request and keep running.
            asyncio.current_task().uncancel()
            await asyncio.sleep(10)

    try:
        t = self.new_task(loop, task())
        # Run the loop briefly so the task reaches its first sleep.
        loop.run_until_complete(asyncio.sleep(0.01))

        # Cancel first sleep
        self.assertTrue(t.cancel())
        self.assertIn(" cancelling ", repr(t))
        self.assertEqual(t.cancelling(), 1)
        self.assertFalse(t.cancelled())  # Task is still not complete
        loop.run_until_complete(asyncio.sleep(0.01))

        # after .uncancel()
        self.assertNotIn(" cancelling ", repr(t))
        self.assertEqual(t.cancelling(), 0)
        self.assertFalse(t.cancelled())  # Task is still not complete

        # Cancel second sleep
        self.assertTrue(t.cancel())
        self.assertEqual(t.cancelling(), 1)
        self.assertFalse(t.cancelled())  # Task is still not complete
        with self.assertRaises(asyncio.CancelledError):
            loop.run_until_complete(t)
        self.assertTrue(t.cancelled())  # Finally, task complete
        self.assertTrue(t.done())

        # uncancel is no longer effective after the task is complete
        t.uncancel()
        self.assertTrue(t.cancelled())
        self.assertTrue(t.done())
    finally:
        loop.close()
580
def test_uncancel_structured_blocks(self):
    """Emulate a timeout block by hand using cancel() and uncancel().

    The helper coroutine is run twice: once with a timeout shorter than
    its sleep, and once with a sleep of 0 and a long timeout.
    """
    # This test recreates the following high-level structure using uncancel()::
    #
    #     async def make_request_with_timeout():
    #         try:
    #             async with asyncio.timeout(1):
    #                 # Structured block affected by the timeout:
    #                 await make_request()
    #                 await make_another_request()
    #         except TimeoutError:
    #             pass  # There was a timeout
    #         # Outer code not affected by the timeout:
    #         await unrelated_code()

    loop = asyncio.new_event_loop()

    async def make_request_with_timeout(*, sleep: float, timeout: float):
        # Returns (timed_out, structured_block_finished, outer_code_reached).
        task = asyncio.current_task()
        loop = task.get_loop()

        timed_out = False
        structured_block_finished = False
        outer_code_reached = False

        def on_timeout():
            nonlocal timed_out
            timed_out = True
            task.cancel()

        timeout_handle = loop.call_later(timeout, on_timeout)
        try:
            try:
                # Structured block affected by the timeout
                await asyncio.sleep(sleep)
                structured_block_finished = True
            finally:
                timeout_handle.cancel()
                if (
                    timed_out
                    and task.uncancel() == 0
                    and type(sys.exception()) is asyncio.CancelledError
                ):
                    # Note the five rules that are needed here to satisfy proper
                    # uncancellation:
                    #
                    # 1. handle uncancellation in a `finally:` block to allow for
                    #    plain returns;
                    # 2. our `timed_out` flag is set, meaning that it was our event
                    #    that triggered the need to uncancel the task, regardless of
                    #    what exception is raised;
                    # 3. we can call `uncancel()` because *we* called `cancel()`
                    #    before;
                    # 4. we call `uncancel()` but we only continue converting the
                    #    CancelledError to TimeoutError if `uncancel()` caused the
                    #    cancellation request count to go down to 0.  We need to look
                    #    at the counter vs having a simple boolean flag because our
                    #    code might have been nested (think multiple timeouts). See
                    #    commit 7fce1063b6e5a366f8504e039a8ccdd6944625cd for
                    #    details.
                    # 5. we only convert CancelledError to TimeoutError; for other
                    #    exceptions raised due to the cancellation (like
                    #    a ConnectionLostError from a database client), simply
                    #    propagate them.
                    #
                    # Those checks need to take place in this exact order to make
                    # sure the `cancelling()` counter always stays in sync.
                    #
                    # Additionally, the original stimulus to `cancel()` the task
                    # needs to be unscheduled to avoid re-cancelling the task later.
                    # Here we do it by cancelling `timeout_handle` in the `finally:`
                    # block.
                    raise TimeoutError
        except TimeoutError:
            self.assertTrue(timed_out)

        # Outer code not affected by the timeout:
        outer_code_reached = True
        await asyncio.sleep(0)
        return timed_out, structured_block_finished, outer_code_reached

    try:
        # Test which timed out.
        t1 = self.new_task(loop, make_request_with_timeout(sleep=10.0, timeout=0.1))
        timed_out, structured_block_finished, outer_code_reached = (
            loop.run_until_complete(t1)
        )
        self.assertTrue(timed_out)
        self.assertFalse(structured_block_finished)  # it was cancelled
        self.assertTrue(outer_code_reached)  # task got uncancelled after leaving
                                             # the structured block and continued until
                                             # completion
        self.assertEqual(t1.cancelling(), 0)  # no pending cancellation of the outer task

        # Test which did not time out.
        t2 = self.new_task(loop, make_request_with_timeout(sleep=0, timeout=10.0))
        timed_out, structured_block_finished, outer_code_reached = (
            loop.run_until_complete(t2)
        )
        self.assertFalse(timed_out)
        self.assertTrue(structured_block_finished)
        self.assertTrue(outer_code_reached)
        self.assertEqual(t2.cancelling(), 0)
    finally:
        loop.close()
685
def test_cancel(self):
    """A task cancelled via call_soon() ends cancelled; a later cancel() returns False."""

    # Generator handed to new_test_loop(); presumably it scripts the test
    # loop's clock (see test_utils) -- here it asserts a `when` of ~10.0.
    def gen():
        when = yield
        self.assertAlmostEqual(10.0, when)
        yield 0

    loop = self.new_test_loop(gen)

    async def task():
        await asyncio.sleep(10.0)
        return 12

    t = self.new_task(loop, task())
    # Schedule the cancellation instead of calling it directly.
    loop.call_soon(t.cancel)
    with self.assertRaises(asyncio.CancelledError):
        loop.run_until_complete(t)
    self.assertTrue(t.done())
    self.assertTrue(t.cancelled())
    # Cancelling an already finished task reports False.
    self.assertFalse(t.cancel())
706
def test_cancel_with_message_then_future_result(self):
    """Check the CancelledError raised by result() for several cancel() arguments."""
    # Test Future.result() after calling cancel() with a message.
    # Each case is (arguments passed to cancel(), expected exception args).
    cases = [
        ((), ()),
        ((None,), ()),
        (('my message',), ('my message',)),
        # Non-string values should roundtrip.
        ((5,), (5,)),
    ]
    for cancel_args, expected_args in cases:
        with self.subTest(cancel_args=cancel_args):
            # A fresh loop is created for every case.
            loop = asyncio.new_event_loop()
            self.set_event_loop(loop)

            async def sleep():
                await asyncio.sleep(10)

            async def coro():
                task = self.new_task(loop, sleep())
                # Yield once before requesting the cancellation.
                await asyncio.sleep(0)
                task.cancel(*cancel_args)
                done, pending = await asyncio.wait([task])
                task.result()

            task = self.new_task(loop, coro())
            with self.assertRaises(asyncio.CancelledError) as cm:
                loop.run_until_complete(task)
            exc = cm.exception
            self.assertEqual(exc.args, expected_args)

            # Depth 0: the raised exception has no __context__.
            actual = get_innermost_context(exc)
            self.assertEqual(actual,
                             (asyncio.CancelledError, expected_args, 0))
740
def test_cancel_with_message_then_future_exception(self):
    """Check the CancelledError raised by exception() for several cancel() arguments."""
    # Test Future.exception() after calling cancel() with a message.
    # Each case is (arguments passed to cancel(), expected exception args).
    cases = [
        ((), ()),
        ((None,), ()),
        (('my message',), ('my message',)),
        # Non-string values should roundtrip.
        ((5,), (5,)),
    ]
    for cancel_args, expected_args in cases:
        with self.subTest(cancel_args=cancel_args):
            # A fresh loop is created for every case.
            loop = asyncio.new_event_loop()
            self.set_event_loop(loop)

            async def sleep():
                await asyncio.sleep(10)

            async def coro():
                task = self.new_task(loop, sleep())
                # Yield once before requesting the cancellation.
                await asyncio.sleep(0)
                task.cancel(*cancel_args)
                done, pending = await asyncio.wait([task])
                task.exception()

            task = self.new_task(loop, coro())
            with self.assertRaises(asyncio.CancelledError) as cm:
                loop.run_until_complete(task)
            exc = cm.exception
            self.assertEqual(exc.args, expected_args)

            # Depth 0: the raised exception has no __context__.
            actual = get_innermost_context(exc)
            self.assertEqual(actual,
                             (asyncio.CancelledError, expected_args, 0))
774
def test_cancellation_exception_context(self):
    """A ValueError raised from a CancelledError keeps it as its context."""
    loop = asyncio.new_event_loop()
    self.set_event_loop(loop)
    # Resolved by sleep() once it has started running.
    fut = loop.create_future()

    async def sleep():
        fut.set_result(None)
        await asyncio.sleep(10)

    async def coro():
        inner_task = self.new_task(loop, sleep())
        # Wait until the inner task has started.
        await fut
        loop.call_soon(inner_task.cancel, 'msg')
        try:
            await inner_task
        except asyncio.CancelledError as ex:
            raise ValueError("cancelled") from ex

    task = self.new_task(loop, coro())
    with self.assertRaises(ValueError) as cm:
        loop.run_until_complete(task)
    exc = cm.exception
    self.assertEqual(exc.args, ('cancelled',))

    # Exactly one __context__ link, ending at the CancelledError('msg').
    actual = get_innermost_context(exc)
    self.assertEqual(actual,
                     (asyncio.CancelledError, ('msg',), 1))
802
def test_cancel_with_message_before_starting_task(self):
    """The cancel message is kept when the task is cancelled right after creation."""
    loop = asyncio.new_event_loop()
    self.set_event_loop(loop)

    async def sleep():
        await asyncio.sleep(10)

    async def coro():
        task = self.new_task(loop, sleep())
        # We deliberately leave out the sleep here.
        task.cancel('my message')
        done, pending = await asyncio.wait([task])
        task.exception()

    task = self.new_task(loop, coro())
    with self.assertRaises(asyncio.CancelledError) as cm:
        loop.run_until_complete(task)
    exc = cm.exception
    self.assertEqual(exc.args, ('my message',))

    # Depth 0: the raised exception has no __context__.
    actual = get_innermost_context(exc)
    self.assertEqual(actual,
                     (asyncio.CancelledError, ('my message',), 0))
826
def test_cancel_yield(self):
    """Cancelling a task that has already started running cancels it."""
    async def task():
        await asyncio.sleep(0)
        await asyncio.sleep(0)
        return 12

    started = self.new_task(self.loop, task())
    test_utils.run_briefly(self.loop)  # start coro
    started.cancel()
    with self.assertRaises(asyncio.CancelledError):
        self.loop.run_until_complete(started)
    self.assertTrue(started.done())
    self.assertTrue(started.cancelled())
    # A second cancel() on a finished task reports False.
    self.assertFalse(started.cancel())
841
def test_cancel_inner_future(self):
    """Cancelling the future a task awaits also cancels the task."""
    inner = self.new_future(self.loop)

    async def task():
        await inner
        return 12

    outer = self.new_task(self.loop, task())
    test_utils.run_briefly(self.loop)  # start task
    inner.cancel()
    with self.assertRaises(asyncio.CancelledError):
        self.loop.run_until_complete(outer)
    self.assertTrue(inner.cancelled())
    self.assertTrue(outer.cancelled())
856
def test_cancel_both_task_and_inner_future(self):
    """Cancelling the awaited future and then the task leaves both cancelled."""
    inner = self.new_future(self.loop)

    async def task():
        await inner
        return 12

    outer = self.new_task(self.loop, task())
    test_utils.run_briefly(self.loop)

    # Order matters: the future first, then the task.
    inner.cancel()
    outer.cancel()

    with self.assertRaises(asyncio.CancelledError):
        self.loop.run_until_complete(outer)

    self.assertTrue(outer.done())
    self.assertTrue(inner.cancelled())
    self.assertTrue(outer.cancelled())
876
def test_cancel_task_catching(self):
    """A task that catches CancelledError and returns is not marked cancelled."""
    fut1 = self.new_future(self.loop)
    fut2 = self.new_future(self.loop)

    async def task():
        await fut1
        try:
            await fut2
        except asyncio.CancelledError:
            return 42

    t = self.new_task(self.loop, task())
    test_utils.run_briefly(self.loop)
    self.assertIs(t._fut_waiter, fut1)  # White-box test.
    fut1.set_result(None)
    test_utils.run_briefly(self.loop)
    self.assertIs(t._fut_waiter, fut2)  # White-box test.
    # cancel() is expected to cancel the future the task is waiting on.
    t.cancel()
    self.assertTrue(fut2.cancelled())
    res = self.loop.run_until_complete(t)
    self.assertEqual(res, 42)
    self.assertFalse(t.cancelled())
899
def test_cancel_task_ignoring(self):
    """A task that swallows CancelledError goes on to await the next future."""
    fut1 = self.new_future(self.loop)
    fut2 = self.new_future(self.loop)
    fut3 = self.new_future(self.loop)

    async def task():
        await fut1
        try:
            await fut2
        except asyncio.CancelledError:
            pass
        res = await fut3
        return res

    t = self.new_task(self.loop, task())
    test_utils.run_briefly(self.loop)
    self.assertIs(t._fut_waiter, fut1)  # White-box test.
    fut1.set_result(None)
    test_utils.run_briefly(self.loop)
    self.assertIs(t._fut_waiter, fut2)  # White-box test.
    # cancel() is expected to cancel the future the task is waiting on.
    t.cancel()
    self.assertTrue(fut2.cancelled())
    test_utils.run_briefly(self.loop)
    self.assertIs(t._fut_waiter, fut3)  # White-box test.
    fut3.set_result(42)
    res = self.loop.run_until_complete(t)
    self.assertEqual(res, 42)
    self.assertFalse(fut3.cancelled())
    self.assertFalse(t.cancelled())
929
def test_cancel_current_task(self):
    """A task that cancels itself is cancelled at its next await."""
    loop = asyncio.new_event_loop()
    self.set_event_loop(loop)

    async def task():
        # `t` is the task running this coroutine (bound below).
        t.cancel()
        self.assertTrue(t._must_cancel)  # White-box test.
        # The sleep should be cancelled immediately.
        await asyncio.sleep(100)
        return 12

    t = self.new_task(loop, task())
    self.assertFalse(t.cancelled())
    self.assertRaises(
        asyncio.CancelledError, loop.run_until_complete, t)
    self.assertTrue(t.done())
    self.assertTrue(t.cancelled())
    self.assertFalse(t._must_cancel)  # White-box test.
    self.assertFalse(t.cancel())
949
def test_cancel_at_end(self):
    """The coroutine ends right after the task is cancelled."""
    loop = asyncio.new_event_loop()
    self.set_event_loop(loop)

    async def task():
        # `t` is the task running this coroutine (bound below); the
        # coroutine returns without awaiting anything after cancel().
        t.cancel()
        self.assertTrue(t._must_cancel)  # White-box test.
        return 12

    t = self.new_task(loop, task())
    self.assertFalse(t.cancelled())
    self.assertRaises(
        asyncio.CancelledError, loop.run_until_complete, t)
    self.assertTrue(t.done())
    self.assertTrue(t.cancelled())
    self.assertFalse(t._must_cancel)  # White-box test.
    self.assertFalse(t.cancel())
968
def test_cancel_awaited_task(self):
    """A self-cancelled task propagates cancellation to the task it then awaits."""
    # This tests for a relatively rare condition when
    # a task cancellation is requested for a task which is not
    # currently blocked, such as a task cancelling itself.
    # In this situation we must ensure that whatever next future
    # or task the cancelled task blocks on is cancelled correctly
    # as well.  See also bpo-34872.
    loop = asyncio.new_event_loop()
    self.addCleanup(lambda: loop.close())

    # Both names are rebound later; coro() reaches them through closures.
    task = nested_task = None
    fut = self.new_future(loop)

    async def nested():
        await fut

    async def coro():
        nonlocal nested_task
        # Create a sub-task and wait for it to run.
        nested_task = self.new_task(loop, nested())
        await asyncio.sleep(0)

        # Request the current task to be cancelled.
        task.cancel()
        # Block on the nested task, which should be immediately
        # cancelled.
        await nested_task

    task = self.new_task(loop, coro())
    with self.assertRaises(asyncio.CancelledError):
        loop.run_until_complete(task)

    # The cancellation must have reached every level.
    self.assertTrue(task.cancelled())
    self.assertTrue(nested_task.cancelled())
    self.assertTrue(fut.cancelled())
1004
def assert_text_contains(self, text, substr):
    """Raise RuntimeError unless *substr* occurs in *text*.

    The error message quotes the missing substring and the full text.
    """
    if substr in text:
        return
    raise RuntimeError(f'text {substr!r} not found in:\n>>>{text}<<<')
1008
def test_cancel_traceback_for_future_result(self):
    """The CancelledError traceback shows both the interrupted and the awaiting line.

    The source text of the marked lines below is searched for verbatim in
    the formatted traceback, so those lines must not be reworded.
    """
    # When calling Future.result() on a cancelled task, check that the
    # line of code that was interrupted is included in the traceback.
    loop = asyncio.new_event_loop()
    self.set_event_loop(loop)

    async def nested():
        # This will get cancelled immediately.
        await asyncio.sleep(10)

    async def coro():
        task = self.new_task(loop, nested())
        await asyncio.sleep(0)
        task.cancel()
        await task  # search target

    task = self.new_task(loop, coro())
    try:
        loop.run_until_complete(task)
    except asyncio.CancelledError:
        tb = traceback.format_exc()
        self.assert_text_contains(tb, "await asyncio.sleep(10)")
        # The intermediate await should also be included.
        self.assert_text_contains(tb, "await task  # search target")
    else:
        self.fail('CancelledError did not occur')
1035
def test_cancel_traceback_for_future_exception(self):
    """The CancelledError traceback shows the interrupted line and the exception() call.

    The source text of the marked lines below is searched for verbatim in
    the formatted traceback, so those lines must not be reworded.
    """
    # When calling Future.exception() on a cancelled task, check that the
    # line of code that was interrupted is included in the traceback.
    loop = asyncio.new_event_loop()
    self.set_event_loop(loop)

    async def nested():
        # This will get cancelled immediately.
        await asyncio.sleep(10)

    async def coro():
        task = self.new_task(loop, nested())
        await asyncio.sleep(0)
        task.cancel()
        done, pending = await asyncio.wait([task])
        task.exception()  # search target

    task = self.new_task(loop, coro())
    try:
        loop.run_until_complete(task)
    except asyncio.CancelledError:
        tb = traceback.format_exc()
        self.assert_text_contains(tb, "await asyncio.sleep(10)")
        # The intermediate await should also be included.
        self.assert_text_contains(tb,
            "task.exception()  # search target")
    else:
        self.fail('CancelledError did not occur')
1064
def test_stop_while_run_in_complete(self):
    """Stopping the loop inside run_until_complete() raises RuntimeError.

    The task is left unfinished by the stop and is cancelled at the end.
    """

    # Generator handed to new_test_loop(); presumably it scripts the test
    # loop's clock (see test_utils) -- it asserts `when` values of
    # ~0.1, ~0.2 and ~0.3 in turn.
    def gen():
        when = yield
        self.assertAlmostEqual(0.1, when)
        when = yield 0.1
        self.assertAlmostEqual(0.2, when)
        when = yield 0.1
        self.assertAlmostEqual(0.3, when)
        yield 0.1

    loop = self.new_test_loop(gen)

    # Counts completed sleeps inside task().
    x = 0

    async def task():
        nonlocal x
        while x < 10:
            await asyncio.sleep(0.1)
            x += 1
            if x == 2:
                loop.stop()

    t = self.new_task(loop, task())
    with self.assertRaises(RuntimeError) as cm:
        loop.run_until_complete(t)
    self.assertEqual(str(cm.exception),
                     'Event loop stopped before Future completed.')
    self.assertFalse(t.done())
    self.assertEqual(x, 2)
    self.assertAlmostEqual(0.3, loop.time())

    # Clean up the still-pending task.
    t.cancel()
    self.assertRaises(asyncio.CancelledError, loop.run_until_complete, t)
1099
def test_log_traceback(self):
    """Assigning True to ``_log_traceback`` is rejected with ValueError."""
    async def coro():
        pass

    pending_task = self.new_task(self.loop, coro())
    expected_msg = 'can only be set to False'
    with self.assertRaisesRegex(ValueError, expected_msg):
        pending_task._log_traceback = True
    # Let the task finish so it is not left pending.
    self.loop.run_until_complete(pending_task)
1108
def test_wait(self):
    # wait() with default arguments returns once every task is done.

    def gen():
        when = yield
        self.assertAlmostEqual(0.1, when)
        when = yield 0
        self.assertAlmostEqual(0.15, when)
        yield 0.15

    loop = self.new_test_loop(gen)

    short = self.new_task(loop, asyncio.sleep(0.1))
    long = self.new_task(loop, asyncio.sleep(0.15))

    async def wait_for_both():
        finished, unfinished = await asyncio.wait([long, short])
        self.assertEqual(finished, {short, long})
        self.assertEqual(unfinished, set())
        return 42

    first = loop.run_until_complete(self.new_task(loop, wait_for_both()))
    self.assertEqual(first, 42)
    self.assertAlmostEqual(0.15, loop.time())

    # Doing it again should take no time and exercise a different path.
    second = loop.run_until_complete(self.new_task(loop, wait_for_both()))
    self.assertAlmostEqual(0.15, loop.time())
    self.assertEqual(second, 42)
1137
def test_wait_duplicate_coroutines(self):
    # The same task listed twice is only reported once by wait().

    async def echo(value):
        return value

    repeated = self.loop.create_task(echo('test'))
    waitables = [repeated, repeated, self.loop.create_task(echo('spam'))]
    waiter = self.new_task(self.loop, asyncio.wait(waitables))

    done, pending = self.loop.run_until_complete(waiter)

    self.assertFalse(pending)
    self.assertEqual({f.result() for f in done}, {'test', 'spam'})
1151
def test_wait_errors(self):
    # An empty collection of futures is rejected.
    empty_wait = asyncio.wait(set())
    with self.assertRaises(ValueError):
        self.loop.run_until_complete(empty_wait)

    # -1 is an invalid return_when value
    sleep_coro = asyncio.sleep(10.0)
    bad_wait = asyncio.wait([sleep_coro], return_when=-1)
    with self.assertRaises(ValueError):
        self.loop.run_until_complete(bad_wait)

    # The sleep coroutine was never awaited; close it explicitly.
    sleep_coro.close()
1164
def test_wait_first_completed(self):
    # FIRST_COMPLETED: wait() returns as soon as the short sleep ends.

    def gen():
        when = yield
        self.assertAlmostEqual(10.0, when)
        when = yield 0
        self.assertAlmostEqual(0.1, when)
        yield 0.1

    loop = self.new_test_loop(gen)

    slow = self.new_task(loop, asyncio.sleep(10.0))
    quick = self.new_task(loop, asyncio.sleep(0.1))
    waiter = self.new_task(
        loop,
        asyncio.wait([quick, slow], return_when=asyncio.FIRST_COMPLETED))

    done, pending = loop.run_until_complete(waiter)
    self.assertEqual({quick}, done)
    self.assertEqual({slow}, pending)
    self.assertFalse(slow.done())
    self.assertTrue(quick.done())
    self.assertIsNone(quick.result())
    self.assertAlmostEqual(0.1, loop.time())

    # move forward to close generator
    loop.advance_time(10)
    loop.run_until_complete(asyncio.wait([slow, quick]))
1193
def test_wait_really_done(self):
    # there is possibility that some tasks in the pending list
    # became done but their callbacks haven't all been called yet

    async def one_step():
        await asyncio.sleep(0)

    async def two_steps():
        await asyncio.sleep(0)
        await asyncio.sleep(0)

    first = self.new_task(self.loop, one_step())
    second = self.new_task(self.loop, two_steps())
    waiter = self.new_task(
        self.loop,
        asyncio.wait([second, first],
                     return_when=asyncio.FIRST_COMPLETED))

    done, pending = self.loop.run_until_complete(waiter)
    # Both tasks are expected in the done set, and both finished cleanly.
    self.assertEqual({first, second}, done)
    self.assertTrue(first.done())
    self.assertIsNone(first.result())
    self.assertTrue(second.done())
    self.assertIsNone(second.result())
1217
def test_wait_first_exception(self):

    def gen():
        when = yield
        self.assertAlmostEqual(10.0, when)
        yield 0

    loop = self.new_test_loop(gen)

    # first_exception, task already has exception
    sleeper = self.new_task(loop, asyncio.sleep(10.0))

    async def fail_now():
        raise ZeroDivisionError('err')

    failing = self.new_task(loop, fail_now())
    waiter = self.new_task(
        loop,
        asyncio.wait([failing, sleeper],
                     return_when=asyncio.FIRST_EXCEPTION))

    done, pending = loop.run_until_complete(waiter)
    self.assertEqual({failing}, done)
    self.assertEqual({sleeper}, pending)
    # No simulated time passed before wait() returned.
    self.assertAlmostEqual(0, loop.time())

    # move forward to close generator
    loop.advance_time(10)
    loop.run_until_complete(asyncio.wait([sleeper, failing]))
1246
def test_wait_first_exception_in_wait(self):

    def gen():
        when = yield
        self.assertAlmostEqual(10.0, when)
        when = yield 0
        self.assertAlmostEqual(0.01, when)
        yield 0.01

    loop = self.new_test_loop(gen)

    # first_exception, exception during waiting
    sleeper = self.new_task(loop, asyncio.sleep(10.0))

    async def fail_later():
        await asyncio.sleep(0.01)
        raise ZeroDivisionError('err')

    failing = self.new_task(loop, fail_later())
    # The wait() coroutine is handed to the loop directly, not via a task.
    wait_coro = asyncio.wait([failing, sleeper],
                             return_when=asyncio.FIRST_EXCEPTION)

    done, pending = loop.run_until_complete(wait_coro)
    self.assertEqual({failing}, done)
    self.assertEqual({sleeper}, pending)
    self.assertAlmostEqual(0.01, loop.time())

    # move forward to close generator
    loop.advance_time(10)
    loop.run_until_complete(asyncio.wait([sleeper, failing]))
1276
def test_wait_with_exception(self):
    # With the default return_when, a failing task does not make wait()
    # return early: both tasks end up in the done set.

    def gen():
        when = yield
        self.assertAlmostEqual(0.1, when)
        when = yield 0
        self.assertAlmostEqual(0.15, when)
        yield 0.15

    loop = self.new_test_loop(gen)

    healthy = self.new_task(loop, asyncio.sleep(0.1))

    async def sleep_then_fail():
        await asyncio.sleep(0.15)
        raise ZeroDivisionError('really')

    failing = self.new_task(loop, sleep_then_fail())

    async def check():
        done, pending = await asyncio.wait([failing, healthy])
        self.assertEqual(len(done), 2)
        self.assertEqual(pending, set())
        failed = {f for f in done if f.exception() is not None}
        self.assertEqual(len(failed), 1)

    loop.run_until_complete(self.new_task(loop, check()))
    self.assertAlmostEqual(0.15, loop.time())

    # A second pass over already-finished tasks must not advance time.
    loop.run_until_complete(self.new_task(loop, check()))
    self.assertAlmostEqual(0.15, loop.time())
1308
def test_wait_with_timeout(self):
    # A timeout between the two sleeps splits them into done/pending.

    def gen():
        when = yield
        self.assertAlmostEqual(0.1, when)
        when = yield 0
        self.assertAlmostEqual(0.15, when)
        when = yield 0
        self.assertAlmostEqual(0.11, when)
        yield 0.11

    loop = self.new_test_loop(gen)

    short = self.new_task(loop, asyncio.sleep(0.1))
    long = self.new_task(loop, asyncio.sleep(0.15))

    async def check():
        done, pending = await asyncio.wait([long, short], timeout=0.11)
        self.assertEqual(done, {short})
        self.assertEqual(pending, {long})

    loop.run_until_complete(self.new_task(loop, check()))
    self.assertAlmostEqual(0.11, loop.time())

    # move forward to close generator
    loop.advance_time(10)
    loop.run_until_complete(asyncio.wait([short, long]))
1336
def test_wait_concurrent_complete(self):
    # The timeout equals the shorter sleep: that task must still be
    # reported as done while the longer one remains pending.

    def gen():
        when = yield
        self.assertAlmostEqual(0.1, when)
        when = yield 0
        self.assertAlmostEqual(0.15, when)
        when = yield 0
        self.assertAlmostEqual(0.1, when)
        yield 0.1

    loop = self.new_test_loop(gen)

    short = self.new_task(loop, asyncio.sleep(0.1))
    long = self.new_task(loop, asyncio.sleep(0.15))

    wait_coro = asyncio.wait([long, short], timeout=0.1)
    done, pending = loop.run_until_complete(wait_coro)

    self.assertEqual(done, {short})
    self.assertEqual(pending, {long})
    self.assertAlmostEqual(0.1, loop.time())

    # move forward to close generator
    loop.advance_time(10)
    loop.run_until_complete(asyncio.wait([short, long]))
1363
def test_wait_with_iterator_of_tasks(self):
    # wait() accepts a one-shot iterator, not only a list or set.

    def gen():
        when = yield
        self.assertAlmostEqual(0.1, when)
        when = yield 0
        self.assertAlmostEqual(0.15, when)
        yield 0.15

    loop = self.new_test_loop(gen)

    short = self.new_task(loop, asyncio.sleep(0.1))
    long = self.new_task(loop, asyncio.sleep(0.15))

    async def check():
        task_iter = iter([long, short])
        done, pending = await asyncio.wait(task_iter)
        self.assertEqual(done, {short, long})
        self.assertEqual(pending, set())
        return 42

    outcome = loop.run_until_complete(self.new_task(loop, check()))
    self.assertEqual(outcome, 42)
    self.assertAlmostEqual(0.15, loop.time())
1387
1388
def test_wait_generator(self):
    # wait() accepts a generator expression that lazily creates tasks.
    async def identity(value):
        return value

    loop = self.new_test_loop()

    async def main():
        # Must stay a generator expression: that is what is under test.
        lazy_tasks = (self.new_task(loop, identity(i)) for i in range(10))
        done, pending = await asyncio.wait(
            lazy_tasks, return_when=asyncio.ALL_COMPLETED)
        self.assertEqual(len(done), 10)
        self.assertEqual(len(pending), 0)

    loop.run_until_complete(main())
1402
1403
def test_as_completed(self):
    # as_completed() yields results in completion order: the two 0.01s
    # sleepers come first (in either order), the 0.15s sleeper last.

    def gen():
        yield 0
        yield 0
        yield 0.01
        yield 0

    loop = self.new_test_loop(gen)
    # disable "slow callback" warning
    loop.slow_callback_duration = 1.0
    completed = set()
    time_shifted = False

    async def sleeper(dt, x):
        nonlocal time_shifted
        await asyncio.sleep(dt)
        completed.add(x)
        # Once both short sleepers are done, jump the clock forward a
        # single time so that the long sleeper becomes due.
        if not time_shifted and 'a' in completed and 'b' in completed:
            time_shifted = True
            loop.advance_time(0.14)
        return x

    a = sleeper(0.01, 'a')
    b = sleeper(0.01, 'b')
    c = sleeper(0.15, 'c')

    async def foo():
        values = []
        for f in asyncio.as_completed([b, c, a]):
            values.append(await f)
        return values

    res = loop.run_until_complete(self.new_task(loop, foo()))
    self.assertAlmostEqual(0.15, loop.time())
    # assertIn reports the actual contents on failure, unlike
    # assertTrue(x in y).
    self.assertIn('a', res[:2])
    self.assertIn('b', res[:2])
    self.assertEqual(res[2], 'c')
1442
def test_as_completed_with_timeout(self):
    # With timeout=0.12 the 0.1s task delivers its result and the 0.15s
    # task is reported as a TimeoutError.

    def gen():
        yield
        yield 0
        yield 0
        yield 0.1

    loop = self.new_test_loop(gen)

    quick = loop.create_task(asyncio.sleep(0.1, 'a'))
    slow = loop.create_task(asyncio.sleep(0.15, 'b'))

    async def collect():
        outcomes = []
        for fut in asyncio.as_completed([quick, slow], timeout=0.12):
            if outcomes:
                # After the first result, push the clock past the timeout.
                loop.advance_time(0.02)
            try:
                value = await fut
            except asyncio.TimeoutError as exc:
                outcomes.append((2, exc))
            else:
                outcomes.append((1, value))
        return outcomes

    res = loop.run_until_complete(self.new_task(loop, collect()))
    self.assertEqual(len(res), 2, res)
    self.assertEqual(res[0], (1, 'a'))
    self.assertEqual(res[1][0], 2)
    self.assertIsInstance(res[1][1], asyncio.TimeoutError)
    self.assertAlmostEqual(0.12, loop.time())

    # move forward to close generator
    loop.advance_time(10)
    loop.run_until_complete(asyncio.wait([quick, slow]))
1478
def test_as_completed_with_unused_timeout(self):
    # A timeout that never fires must not affect the delivered result.

    def gen():
        yield
        yield 0
        yield 0.01

    loop = self.new_test_loop(gen)

    sleep_coro = asyncio.sleep(0.01, 'a')

    async def consume():
        for fut in asyncio.as_completed([sleep_coro], timeout=1):
            value = await fut
            self.assertEqual(value, 'a')

    loop.run_until_complete(self.new_task(loop, consume()))
1496
def test_as_completed_reverse_wait(self):
    # Awaiting the yielded awaitables out of order still delivers results
    # in completion order.

    def gen():
        yield 0
        yield 0.05
        yield 0

    loop = self.new_test_loop(gen)

    short = asyncio.sleep(0.05, 'a')
    long = asyncio.sleep(0.10, 'b')
    coros = {short, long}

    async def check():
        awaitables = list(asyncio.as_completed(coros))
        self.assertEqual(len(awaitables), 2)

        # Await the second awaitable first: it still gets the first result.
        first_value = await awaitables[1]
        self.assertEqual(first_value, 'a')
        self.assertAlmostEqual(0.05, loop.time())
        loop.advance_time(0.05)
        second_value = await awaitables[0]
        self.assertEqual(second_value, 'b')
        self.assertAlmostEqual(0.10, loop.time())

    loop.run_until_complete(check())
1523
def test_as_completed_concurrent(self):
    # Every awaitable produced by as_completed() can be awaited
    # concurrently through wait().

    def gen():
        when = yield
        self.assertAlmostEqual(0.05, when)
        when = yield 0
        self.assertAlmostEqual(0.05, when)
        yield 0.05

    first = asyncio.sleep(0.05, 'a')
    second = asyncio.sleep(0.05, 'b')
    coros = {first, second}

    async def check():
        awaitables = list(asyncio.as_completed(coros))
        self.assertEqual(len(awaitables), 2)
        wrapped = [asyncio.ensure_future(item) for item in awaitables]
        done, pending = await asyncio.wait(wrapped)
        self.assertEqual({f.result() for f in done}, {'a', 'b'})

    # The loop is created only after the coroutines, as in the original.
    loop = self.new_test_loop(gen)
    loop.run_until_complete(check())
1547
def test_as_completed_duplicate_coroutines(self):
    # A coroutine object listed twice is only run and reported once.

    async def echo(value):
        return value

    async def runner():
        repeated = echo('ham')
        collected = []
        for fut in asyncio.as_completed([repeated, repeated, echo('spam')]):
            collected.append(await fut)
        return collected

    runner_task = self.new_task(self.loop, runner())
    self.loop.run_until_complete(runner_task)
    result = runner_task.result()
    self.assertEqual(set(result), {'ham', 'spam'})
    self.assertEqual(len(result), 2)
1565
def test_as_completed_coroutine_without_loop(self):
    # Without a current event loop, iterating as_completed() over a bare
    # coroutine must raise RuntimeError.
    async def answer():
        return 42

    orphan = answer()
    # The coroutine is never awaited; close it when the test ends.
    self.addCleanup(orphan.close)

    pending_iter = asyncio.as_completed([orphan])
    with self.assertRaisesRegex(RuntimeError, 'no current event loop'):
        list(pending_iter)
1576
def test_as_completed_coroutine_use_running_loop(self):
    # Called from inside a running loop, as_completed() accepts a bare
    # coroutine and delivers its result.
    loop = self.new_test_loop()

    async def answer():
        return 42

    async def check():
        awaitables = list(asyncio.as_completed([answer()]))
        self.assertEqual(len(awaitables), 1)
        value = await awaitables[0]
        self.assertEqual(value, 42)

    loop.run_until_complete(check())
1589
def test_sleep(self):
    # Two consecutive half-length sleeps add up on the loop clock, and
    # sleep() hands back its optional result argument.

    def gen():
        when = yield
        self.assertAlmostEqual(0.05, when)
        when = yield 0.05
        self.assertAlmostEqual(0.1, when)
        yield 0.05

    loop = self.new_test_loop(gen)

    async def sleeper(dt, arg):
        half = dt/2
        await asyncio.sleep(half)
        return await asyncio.sleep(half, arg)

    sleep_task = self.new_task(loop, sleeper(0.1, 'yeah'))
    loop.run_until_complete(sleep_task)
    self.assertTrue(sleep_task.done())
    self.assertEqual(sleep_task.result(), 'yeah')
    self.assertAlmostEqual(0.1, loop.time())
1611
def test_sleep_cancel(self):
    # Cancelling a sleeping task must also cancel the timer handle that
    # sleep() registered through loop.call_later().

    def gen():
        when = yield
        self.assertAlmostEqual(10.0, when)
        yield 0

    loop = self.new_test_loop(gen)

    sleep_task = self.new_task(loop, asyncio.sleep(10.0, 'yeah'))

    timer = None
    real_call_later = loop.call_later

    def recording_call_later(delay, callback, *args):
        # Delegate to the real call_later() and remember its handle.
        nonlocal timer
        timer = real_call_later(delay, callback, *args)
        return timer

    loop.call_later = recording_call_later
    test_utils.run_briefly(loop)

    self.assertFalse(timer._cancelled)

    sleep_task.cancel()
    test_utils.run_briefly(loop)
    self.assertTrue(timer._cancelled)
1639
def test_task_cancel_sleeping_task(self):
    # A task awaiting another task sees CancelledError when the awaited
    # task is cancelled from a timer callback.

    def gen():
        when = yield
        self.assertAlmostEqual(0.1, when)
        when = yield 0
        self.assertAlmostEqual(5000, when)
        yield 0.1

    loop = self.new_test_loop(gen)

    async def long_sleep(dt):
        await asyncio.sleep(dt)

    async def doit():
        sleeper = self.new_task(loop, long_sleep(5000))
        loop.call_later(0.1, sleeper.cancel)
        try:
            await sleeper
        except asyncio.CancelledError:
            return 'cancelled'
        return 'slept in'

    outcome = loop.run_until_complete(doit())
    self.assertEqual(outcome, 'cancelled')
    self.assertAlmostEqual(0.1, loop.time())
1667
def test_task_cancel_waiter_future(self):
    # Cancelling a task cancels the future it is blocked on and clears
    # the task's _fut_waiter reference.
    blocker = self.new_future(self.loop)

    async def wait_on_blocker():
        await blocker

    waiting = self.new_task(self.loop, wait_on_blocker())
    test_utils.run_briefly(self.loop)
    self.assertIs(waiting._fut_waiter, blocker)

    waiting.cancel()
    test_utils.run_briefly(self.loop)
    with self.assertRaises(asyncio.CancelledError):
        self.loop.run_until_complete(waiting)
    self.assertIsNone(waiting._fut_waiter)
    self.assertTrue(blocker.cancelled())
1684
def test_task_set_methods(self):
    # Tasks refuse set_result()/set_exception(); the coroutine's own
    # return value is still delivered afterwards.
    async def notmuch():
        return 'ko'

    coro_obj = notmuch()
    task = self.new_task(self.loop, coro_obj)

    with self.assertRaisesRegex(RuntimeError, 'not support set_result'):
        task.set_result('ok')

    with self.assertRaisesRegex(RuntimeError, 'not support set_exception'):
        task.set_exception(ValueError())

    outcome = self.loop.run_until_complete(task)
    self.assertEqual(outcome, 'ko')
1701
def test_step_result_future(self):
    # If coroutine returns future, task waits on this future.

    class Fut(asyncio.Future):
        # Future subclass that records whether a done-callback was added.
        def __init__(self, *args, **kwds):
            self.cb_added = False
            super().__init__(*args, **kwds)

        def add_done_callback(self, *args, **kwargs):
            self.cb_added = True
            super().add_done_callback(*args, **kwargs)

    fut = Fut(loop=self.loop)
    result = None

    async def wait_for_future():
        nonlocal result
        result = await fut

    t = self.new_task(self.loop, wait_for_future())
    test_utils.run_briefly(self.loop)
    # The task must have subscribed to the future it is blocked on.
    self.assertTrue(fut.cb_added)

    res = object()
    fut.set_result(res)
    test_utils.run_briefly(self.loop)
    # The exact object set on the future is what the coroutine received.
    self.assertIs(res, result)
    self.assertTrue(t.done())
    self.assertIsNone(t.result())
1731
def test_baseexception_during_cancel(self):
    # A BaseException raised while handling cancellation propagates out
    # of the loop and is stored as the task's exception; the task does
    # not count as cancelled.

    def gen():
        when = yield
        self.assertAlmostEqual(10.0, when)
        yield 0

    loop = self.new_test_loop(gen)

    async def sleeper():
        await asyncio.sleep(10)

    base_exc = SystemExit()

    async def notmutch():
        try:
            await sleeper()
        except asyncio.CancelledError:
            raise base_exc

    victim = self.new_task(loop, notmutch())
    test_utils.run_briefly(loop)

    victim.cancel()
    self.assertFalse(victim.done())

    with self.assertRaises(SystemExit):
        test_utils.run_briefly(loop)

    self.assertTrue(victim.done())
    self.assertFalse(victim.cancelled())
    self.assertIs(victim.exception(), base_exc)
1763
def test_iscoroutinefunction(self):
    # Only `async def` functions and AsyncMock objects are reported as
    # coroutine functions.
    def plain():
        pass

    self.assertFalse(asyncio.iscoroutinefunction(plain))

    def generator():
        yield

    self.assertFalse(asyncio.iscoroutinefunction(generator))

    async def native():
        pass

    self.assertTrue(asyncio.iscoroutinefunction(native))

    sync_mock = mock.Mock()
    async_mock = mock.AsyncMock()
    self.assertFalse(asyncio.iscoroutinefunction(sync_mock))
    self.assertTrue(asyncio.iscoroutinefunction(async_mock))
1780
def test_coroutine_non_gen_function(self):
    # An `async def` without any await is a coroutine function whose
    # coroutine can be run to completion by the loop.
    async def func():
        return 'test'

    self.assertTrue(asyncio.iscoroutinefunction(func))

    coro_obj = func()
    self.assertTrue(asyncio.iscoroutine(coro_obj))

    outcome = self.loop.run_until_complete(coro_obj)
    self.assertEqual(outcome, 'test')
1792
def test_coroutine_non_gen_function_return_future(self):
    # Returning (not awaiting) a future makes the future object itself
    # the task's result.
    fut = self.new_future(self.loop)

    async def return_future():
        return fut

    async def resolve_future():
        fut.set_result('test')

    returner = self.new_task(self.loop, return_future())
    resolver = self.new_task(self.loop, resolve_future())
    outcome = self.loop.run_until_complete(returner)
    self.assertEqual(outcome, fut)
    self.assertIsNone(resolver.result())
1807
def test_current_task(self):
    # current_task() is None outside of a task and is the running task
    # inside it, whether or not loop=None is passed explicitly.
    self.assertIsNone(asyncio.current_task(loop=self.loop))

    async def coro(loop):
        self.assertIs(asyncio.current_task(), running)

        self.assertIs(asyncio.current_task(None), running)
        self.assertIs(asyncio.current_task(), running)

    running = self.new_task(self.loop, coro(self.loop))
    self.loop.run_until_complete(running)
    self.assertIsNone(asyncio.current_task(loop=self.loop))
1820
def test_current_task_with_interleaving_tasks(self):
    # current_task() must track whichever task is executing as two tasks
    # hand control back and forth through a pair of futures.
    self.assertIsNone(asyncio.current_task(loop=self.loop))

    fut1 = self.new_future(self.loop)
    fut2 = self.new_future(self.loop)

    # assertIs is used instead of assertTrue(x is y) so that a failure
    # reports both objects.
    async def coro1(loop):
        self.assertIs(asyncio.current_task(), task1)
        await fut1
        self.assertIs(asyncio.current_task(), task1)
        fut2.set_result(True)

    async def coro2(loop):
        self.assertIs(asyncio.current_task(), task2)
        fut1.set_result(True)
        await fut2
        self.assertIs(asyncio.current_task(), task2)

    task1 = self.new_task(self.loop, coro1(self.loop))
    task2 = self.new_task(self.loop, coro2(self.loop))

    self.loop.run_until_complete(asyncio.wait((task1, task2)))
    self.assertIsNone(asyncio.current_task(loop=self.loop))
1844
1845 # Some thorough tests for cancellation propagation through
1846 # coroutines, tasks and wait().
1847
def test_yield_future_passes_cancel(self):
    # Cancelling outer() cancels inner() cancels waiter.
    proof = 0
    waiter = self.new_future(self.loop)

    async def inner():
        nonlocal proof
        try:
            await waiter
        except asyncio.CancelledError:
            proof += 1
            raise
        # Only reachable if the await completed without cancellation.
        self.fail('got past sleep() in inner()')

    async def outer():
        nonlocal proof
        try:
            await inner()
        except asyncio.CancelledError:
            proof += 100  # Expect this path.
            return
        proof += 10

    outer_task = asyncio.ensure_future(outer(), loop=self.loop)
    test_utils.run_briefly(self.loop)
    outer_task.cancel()
    # outer() swallows the cancellation, so this completes normally.
    self.loop.run_until_complete(outer_task)
    self.assertEqual(proof, 101)
    self.assertTrue(waiter.cancelled())
1878
def test_yield_wait_does_not_shield_cancel(self):
    # Cancelling outer() makes wait() return early, leaves inner()
    # running.
    proof = 0
    waiter = self.new_future(self.loop)

    async def inner():
        nonlocal proof
        await waiter
        proof += 1

    async def outer():
        nonlocal proof
        # wait() is given a Task, which emits no DeprecationWarning, so
        # no assertWarns() context is used here: it would only alter the
        # global warning filters across the await and could never be
        # satisfied, because this await is always left via CancelledError.
        await asyncio.wait([asyncio.create_task(inner())])
        proof += 100

    f = asyncio.ensure_future(outer(), loop=self.loop)
    test_utils.run_briefly(self.loop)
    f.cancel()
    with self.assertRaises(asyncio.CancelledError):
        self.loop.run_until_complete(f)
    # inner() was not cancelled: it still completes once waiter is set.
    waiter.set_result(None)
    test_utils.run_briefly(self.loop)
    self.assertEqual(proof, 1)
1904
def test_shield_result(self):
    # The result set on the inner future shows up on the shield.
    protected = self.new_future(self.loop)
    shield = asyncio.shield(protected)
    protected.set_result(42)
    outcome = self.loop.run_until_complete(shield)
    self.assertEqual(outcome, 42)
1911
def test_shield_exception(self):
    # An exception set on the inner future is the shield's exception,
    # as the very same object.
    protected = self.new_future(self.loop)
    shield = asyncio.shield(protected)
    test_utils.run_briefly(self.loop)
    error = RuntimeError('expected')
    protected.set_exception(error)
    test_utils.run_briefly(self.loop)
    self.assertIs(shield.exception(), error)
1920
def test_shield_cancel_inner(self):
    # Cancelling the inner future cancels the shield as well.
    protected = self.new_future(self.loop)
    shield = asyncio.shield(protected)
    test_utils.run_briefly(self.loop)
    protected.cancel()
    test_utils.run_briefly(self.loop)
    self.assertTrue(shield.cancelled())
1928
def test_shield_cancel_outer(self):
    # Cancelling the shield itself leaves no callbacks registered on it.
    inner = self.new_future(self.loop)
    outer = asyncio.shield(inner)
    test_utils.run_briefly(self.loop)
    outer.cancel()
    test_utils.run_briefly(self.loop)
    self.assertTrue(outer.cancelled())
    # _callbacks may be None rather than an empty container.
    if outer._callbacks is None:
        remaining = 0
    else:
        remaining = len(outer._callbacks)
    self.assertEqual(0, remaining)
1937
def test_shield_shortcut(self):
    # Shielding an already-completed future still yields its result.
    finished = self.new_future(self.loop)
    finished.set_result(42)
    shield = asyncio.shield(finished)
    outcome = self.loop.run_until_complete(shield)
    self.assertEqual(outcome, 42)
1943
def test_shield_effect(self):
    # Cancelling outer() does not affect inner().
    proof = 0
    waiter = self.new_future(self.loop)

    async def inner():
        nonlocal proof
        await waiter
        proof += 1

    async def outer():
        nonlocal proof
        await asyncio.shield(inner())
        proof += 100

    outer_task = asyncio.ensure_future(outer(), loop=self.loop)
    test_utils.run_briefly(self.loop)
    outer_task.cancel()
    with self.assertRaises(asyncio.CancelledError):
        self.loop.run_until_complete(outer_task)
    # Only inner() gets to bump proof once the waiter is released.
    waiter.set_result(None)
    test_utils.run_briefly(self.loop)
    self.assertEqual(proof, 1)
1967
def test_shield_gather(self):
    # Cancelling a shield around gather() leaves the gather running: it
    # still collects the children's results afterwards.
    first = self.new_future(self.loop)
    second = self.new_future(self.loop)
    gathered = asyncio.gather(first, second)
    shield = asyncio.shield(gathered)
    test_utils.run_briefly(self.loop)
    shield.cancel()
    test_utils.run_briefly(self.loop)
    self.assertTrue(shield.cancelled())
    first.set_result(1)
    second.set_result(2)
    test_utils.run_briefly(self.loop)
    self.assertEqual(gathered.result(), [1, 2])
1981
def test_gather_shield(self):
    first = self.new_future(self.loop)
    second = self.new_future(self.loop)
    shield1 = asyncio.shield(first)
    shield2 = asyncio.shield(second)
    gathered = asyncio.gather(shield1, shield2)
    test_utils.run_briefly(self.loop)
    gathered.cancel()
    # This should cancel shield1 and shield2 but not first and second.
    test_utils.run_briefly(self.loop)
    self.assertIsInstance(gathered.exception(), asyncio.CancelledError)
    self.assertTrue(shield1.cancelled())
    self.assertTrue(shield2.cancelled())
    # The underlying futures are still open and accept results.
    first.set_result(1)
    second.set_result(2)
    test_utils.run_briefly(self.loop)
1998
def test_shield_coroutine_without_loop(self):
    # shield() on a bare coroutine needs an event loop; without a
    # current one it must raise RuntimeError.
    async def answer():
        return 42

    orphan = answer()
    # The coroutine is never awaited; close it when the test ends.
    self.addCleanup(orphan.close)
    with self.assertRaisesRegex(RuntimeError, 'no current event loop'):
        asyncio.shield(orphan)
2007
def test_shield_coroutine_use_running_loop(self):
    # shield() called inside a running loop binds to that loop.
    async def answer():
        return 42

    async def make_shield():
        return asyncio.shield(answer())

    shield = self.loop.run_until_complete(make_shield())
    self.assertEqual(shield._loop, self.loop)
    outcome = self.loop.run_until_complete(shield)
    self.assertEqual(outcome, 42)
2018
def test_shield_coroutine_use_global_loop(self):
    # Deprecated in 3.10, undeprecated in 3.12
    async def answer():
        return 42

    # Install self.loop as the current loop and undo that afterwards.
    asyncio.set_event_loop(self.loop)
    self.addCleanup(asyncio.set_event_loop, None)
    shield = asyncio.shield(answer())
    self.assertEqual(shield._loop, self.loop)
    outcome = self.loop.run_until_complete(shield)
    self.assertEqual(outcome, 42)
2030
def test_as_completed_invalid_args(self):
    # as_completed() expects a list of futures, not a future instance.
    # TypeError should be raised either on iterator construction or on
    # the first iteration, so the result is advanced with next().
    # Handing it to run_until_complete() would not exercise this check:
    # a generator object is not awaitable and is rejected with TypeError
    # whatever as_completed() itself validated.
    fut = self.new_future(self.loop)
    with self.assertRaises(TypeError):
        next(asyncio.as_completed(fut))

    # Same check for a bare coroutine object.
    coro = coroutine_function()
    try:
        with self.assertRaises(TypeError):
            next(asyncio.as_completed(coro))
    finally:
        # The coroutine is never awaited; close it explicitly.
        coro.close()
2041
def test_wait_invalid_args(self):
    fut = self.new_future(self.loop)

    # wait() expects a list of futures, not a future instance
    wait_on_future = asyncio.wait(fut)
    with self.assertRaises(TypeError):
        self.loop.run_until_complete(wait_on_future)

    coro = coroutine_function()
    wait_on_coro = asyncio.wait(coro)
    with self.assertRaises(TypeError):
        self.loop.run_until_complete(wait_on_coro)
    coro.close()

    # wait() expects at least a future
    wait_on_nothing = asyncio.wait([])
    with self.assertRaises(ValueError):
        self.loop.run_until_complete(wait_on_nothing)
2056
def test_log_destroyed_pending_task(self):
    # A task that is garbage-collected while still pending must be
    # reported to the loop's exception handler.

    async def kill_me(loop):
        future = self.new_future(loop)
        await future
        # at this point, the only reference to kill_me() task is
        # the Task._wakeup() method in future._callbacks
        raise Exception("code never reached")

    mock_handler = mock.Mock()
    self.loop.set_debug(True)
    self.loop.set_exception_handler(mock_handler)

    # schedule the task
    coro = kill_me(self.loop)
    task = asyncio.ensure_future(coro, loop=self.loop)

    self.assertEqual(asyncio.all_tasks(loop=self.loop), {task})

    asyncio.set_event_loop(None)

    # execute the task so it waits for future
    self.loop._run_once()
    self.assertEqual(len(self.loop._ready), 0)

    # Drop every local reference; keep only the traceback needed for the
    # final assertion.
    coro = None
    source_traceback = task._source_traceback
    task = None

    # no more reference to kill_me() task: the task is destroyed by the GC
    support.gc_collect()

    self.assertEqual(asyncio.all_tasks(loop=self.loop), set())

    mock_handler.assert_called_with(self.loop, {
        'message': 'Task was destroyed but it is pending!',
        'task': mock.ANY,
        'source_traceback': source_traceback,
    })
    mock_handler.reset_mock()
2098
@mock.patch('asyncio.base_events.logger')
def test_tb_logger_not_called_after_cancel(self, m_log):
    # No error may be logged for a task that raised, was then cancelled
    # and finally dropped.
    loop = asyncio.new_event_loop()
    self.set_event_loop(loop)

    async def failing():
        raise TypeError

    async def runner():
        doomed = self.new_task(loop, failing())
        await asyncio.sleep(0.05)
        doomed.cancel()
        # Release the only reference held by this coroutine.
        doomed = None

    loop.run_until_complete(runner())
    self.assertFalse(m_log.error.called)
2115
def test_task_source_traceback(self):
    # In debug mode a task records where it was created.
    self.loop.set_debug(True)

    # NOTE: the next two statements must stay on adjacent lines, because
    # the expected line number is derived as "current line - 1".
    task = self.new_task(self.loop, coroutine_function())
    lineno = sys._getframe().f_lineno - 1
    self.assertIsInstance(task._source_traceback, list)
    expected_frame = (__file__, lineno, 'test_task_source_traceback')
    # Entry [-2] is this test's frame.
    self.assertEqual(task._source_traceback[-2][:3], expected_frame)
    self.loop.run_until_complete(task)
2127
def test_cancel_gather_1(self):
    """Ensure that a gathering future refuses to be cancelled once all
    children are done"""
    loop = asyncio.new_event_loop()
    self.addCleanup(loop.close)

    fut = self.new_future(loop)

    async def create():
        # The indirection fut->child_coro is needed since otherwise the
        # gathering task is done at the same time as the child future
        async def child_coro():
            return await fut
        gathered = asyncio.gather(child_coro())
        return asyncio.ensure_future(gathered)

    gather_task = loop.run_until_complete(create())

    cancel_result = None

    def cancelling_callback(_):
        # Try to cancel the gathering task and record the outcome.
        nonlocal cancel_result
        cancel_result = gather_task.cancel()

    fut.add_done_callback(cancelling_callback)

    fut.set_result(42)  # calls the cancelling_callback after fut is done()

    # At this point the task should complete.
    loop.run_until_complete(gather_task)

    # Python issue #26923: asyncio.gather drops cancellation
    self.assertEqual(cancel_result, False)
    self.assertFalse(gather_task.cancelled())
    self.assertEqual(gather_task.result(), [42])
2159
def test_cancel_gather_2(self):
    # Cancelling a task blocked in gather() must surface CancelledError
    # to the awaiting caller, carrying the cancel() arguments.
    cases = [
        ((), ()),
        ((None,), ()),
        (('my message',), ('my message',)),
        # Non-string values should roundtrip.
        ((5,), (5,)),
    ]
    for cancel_args, expected_args in cases:
        with self.subTest(cancel_args=cancel_args):
            loop = asyncio.new_event_loop()
            self.addCleanup(loop.close)

            async def gather_loop():
                # Repeatedly gather a short sleep for about one second.
                elapsed = 0
                while True:
                    elapsed += 0.05
                    await asyncio.gather(asyncio.sleep(0.05),
                                         return_exceptions=True)
                    if elapsed > 1:
                        return

            async def main():
                worker = self.new_task(loop, gather_loop())
                await asyncio.sleep(0.2)
                worker.cancel(*cancel_args)
                await worker

            try:
                loop.run_until_complete(main())
            except asyncio.CancelledError as exc:
                self.assertEqual(exc.args, expected_args)
                innermost = get_innermost_context(exc)
                # Depth 0: the error has no chained context.
                expected = (asyncio.CancelledError, expected_args, 0)
                self.assertEqual(innermost, expected)
            else:
                self.fail(
                    'gather() does not propagate CancelledError '
                    'raised by inner task to the gather() caller.'
                )
2202
def test_exception_traceback(self):
    # See http://bugs.python.org/issue28843

    async def divide_by_zero():
        1 / 0

    async def main():
        failed = self.new_task(self.loop, divide_by_zero())
        await asyncio.sleep(0)  # skip one loop iteration
        stored = failed.exception()
        self.assertIsNotNone(stored.__traceback__)

    self.loop.run_until_complete(main())
2215
@mock.patch('asyncio.base_events.logger')
def test_error_in_call_soon(self, m_log):
    # If loop.call_soon() fails while a task is being created, the
    # half-built task is logged as destroyed-while-pending and is not
    # left registered with the loop.
    def failing_call_soon(callback, *args, **kwargs):
        raise ValueError
    self.loop.call_soon = failing_call_soon

    async def noop():
        pass

    self.assertFalse(m_log.error.called)

    with self.assertRaises(ValueError):
        coro_obj = noop()
        try:
            self.new_task(self.loop, coro_obj)
        finally:
            # The coroutine never ran; close it explicitly.
            coro_obj.close()
    gc.collect()  # For PyPy or other GCs.

    self.assertTrue(m_log.error.called)
    message = m_log.error.call_args[0][0]
    self.assertIn('Task was destroyed but it is pending', message)

    self.assertEqual(asyncio.all_tasks(self.loop), set())
2240
2241 def test_create_task_with_noncoroutine(self):
2242 with self.assertRaisesRegex(TypeError,
2243 "a coroutine was expected, got 123"):
2244 self.new_task(self.loop, 123)
2245
2246 # test it for the second time to ensure that caching
2247 # in asyncio.iscoroutine() doesn't break things.
2248 with self.assertRaisesRegex(TypeError,
2249 "a coroutine was expected, got 123"):
2250 self.new_task(self.loop, 123)
2251
2252 def test_create_task_with_async_function(self):
2253
2254 async def coro():
2255 pass
2256
2257 task = self.new_task(self.loop, coro())
2258 self.assertIsInstance(task, self.Task)
2259 self.loop.run_until_complete(task)
2260
2261 # test it for the second time to ensure that caching
2262 # in asyncio.iscoroutine() doesn't break things.
2263 task = self.new_task(self.loop, coro())
2264 self.assertIsInstance(task, self.Task)
2265 self.loop.run_until_complete(task)
2266
def test_create_task_with_asynclike_function(self):
    # Two identical rounds: the second one ensures that caching
    # in asyncio.iscoroutine() doesn't break things.
    for _ in range(2):
        task = self.new_task(self.loop, CoroLikeObject())
        self.assertIsInstance(task, self.Task)
        outcome = self.loop.run_until_complete(task)
        self.assertEqual(outcome, 42)
2277
2278 def test_bare_create_task(self):
2279
2280 async def inner():
2281 return 1
2282
2283 async def coro():
2284 task = asyncio.create_task(inner())
2285 self.assertIsInstance(task, self.Task)
2286 ret = await task
2287 self.assertEqual(1, ret)
2288
2289 self.loop.run_until_complete(coro())
2290
2291 def test_bare_create_named_task(self):
2292
2293 async def coro_noop():
2294 pass
2295
2296 async def coro():
2297 task = asyncio.create_task(coro_noop(), name='No-op')
2298 self.assertEqual(task.get_name(), 'No-op')
2299 await task
2300
2301 self.loop.run_until_complete(coro())
2302
2303 def test_context_1(self):
2304 cvar = contextvars.ContextVar('cvar', default='nope')
2305
2306 async def sub():
2307 await asyncio.sleep(0.01)
2308 self.assertEqual(cvar.get(), 'nope')
2309 cvar.set('something else')
2310
2311 async def main():
2312 self.assertEqual(cvar.get(), 'nope')
2313 subtask = self.new_task(loop, sub())
2314 cvar.set('yes')
2315 self.assertEqual(cvar.get(), 'yes')
2316 await subtask
2317 self.assertEqual(cvar.get(), 'yes')
2318
2319 loop = asyncio.new_event_loop()
2320 try:
2321 task = self.new_task(loop, main())
2322 loop.run_until_complete(task)
2323 finally:
2324 loop.close()
2325
2326 def test_context_2(self):
2327 cvar = contextvars.ContextVar('cvar', default='nope')
2328
2329 async def main():
2330 def fut_on_done(fut):
2331 # This change must not pollute the context
2332 # of the "main()" task.
2333 cvar.set('something else')
2334
2335 self.assertEqual(cvar.get(), 'nope')
2336
2337 for j in range(2):
2338 fut = self.new_future(loop)
2339 fut.add_done_callback(fut_on_done)
2340 cvar.set(f'yes{j}')
2341 loop.call_soon(fut.set_result, None)
2342 await fut
2343 self.assertEqual(cvar.get(), f'yes{j}')
2344
2345 for i in range(3):
2346 # Test that task passed its context to add_done_callback:
2347 cvar.set(f'yes{i}-{j}')
2348 await asyncio.sleep(0.001)
2349 self.assertEqual(cvar.get(), f'yes{i}-{j}')
2350
2351 loop = asyncio.new_event_loop()
2352 try:
2353 task = self.new_task(loop, main())
2354 loop.run_until_complete(task)
2355 finally:
2356 loop.close()
2357
2358 self.assertEqual(cvar.get(), 'nope')
2359
2360 def test_context_3(self):
2361 # Run 100 Tasks in parallel, each modifying cvar.
2362
2363 cvar = contextvars.ContextVar('cvar', default=-1)
2364
2365 async def sub(num):
2366 for i in range(10):
2367 cvar.set(num + i)
2368 await asyncio.sleep(random.uniform(0.001, 0.05))
2369 self.assertEqual(cvar.get(), num + i)
2370
2371 async def main():
2372 tasks = []
2373 for i in range(100):
2374 task = loop.create_task(sub(random.randint(0, 10)))
2375 tasks.append(task)
2376
2377 await asyncio.gather(*tasks)
2378
2379 loop = asyncio.new_event_loop()
2380 try:
2381 loop.run_until_complete(main())
2382 finally:
2383 loop.close()
2384
2385 self.assertEqual(cvar.get(), -1)
2386
2387 def test_context_4(self):
2388 cvar = contextvars.ContextVar('cvar')
2389
2390 async def coro(val):
2391 await asyncio.sleep(0)
2392 cvar.set(val)
2393
2394 async def main():
2395 ret = []
2396 ctx = contextvars.copy_context()
2397 ret.append(ctx.get(cvar))
2398 t1 = self.new_task(loop, coro(1), context=ctx)
2399 await t1
2400 ret.append(ctx.get(cvar))
2401 t2 = self.new_task(loop, coro(2), context=ctx)
2402 await t2
2403 ret.append(ctx.get(cvar))
2404 return ret
2405
2406 loop = asyncio.new_event_loop()
2407 try:
2408 task = self.new_task(loop, main())
2409 ret = loop.run_until_complete(task)
2410 finally:
2411 loop.close()
2412
2413 self.assertEqual([None, 1, 2], ret)
2414
2415 def test_context_5(self):
2416 cvar = contextvars.ContextVar('cvar')
2417
2418 async def coro(val):
2419 await asyncio.sleep(0)
2420 cvar.set(val)
2421
2422 async def main():
2423 ret = []
2424 ctx = contextvars.copy_context()
2425 ret.append(ctx.get(cvar))
2426 t1 = asyncio.create_task(coro(1), context=ctx)
2427 await t1
2428 ret.append(ctx.get(cvar))
2429 t2 = asyncio.create_task(coro(2), context=ctx)
2430 await t2
2431 ret.append(ctx.get(cvar))
2432 return ret
2433
2434 loop = asyncio.new_event_loop()
2435 try:
2436 task = self.new_task(loop, main())
2437 ret = loop.run_until_complete(task)
2438 finally:
2439 loop.close()
2440
2441 self.assertEqual([None, 1, 2], ret)
2442
2443 def test_context_6(self):
2444 cvar = contextvars.ContextVar('cvar')
2445
2446 async def coro(val):
2447 await asyncio.sleep(0)
2448 cvar.set(val)
2449
2450 async def main():
2451 ret = []
2452 ctx = contextvars.copy_context()
2453 ret.append(ctx.get(cvar))
2454 t1 = loop.create_task(coro(1), context=ctx)
2455 await t1
2456 ret.append(ctx.get(cvar))
2457 t2 = loop.create_task(coro(2), context=ctx)
2458 await t2
2459 ret.append(ctx.get(cvar))
2460 return ret
2461
2462 loop = asyncio.new_event_loop()
2463 try:
2464 task = loop.create_task(main())
2465 ret = loop.run_until_complete(task)
2466 finally:
2467 loop.close()
2468
2469 self.assertEqual([None, 1, 2], ret)
2470
def test_get_coro(self):
    # get_coro() must hand back the very coroutine object given to the task.
    coro = coroutine_function()
    loop = asyncio.new_event_loop()
    try:
        finished = self.new_task(loop, coro)
        loop.run_until_complete(finished)
        self.assertIs(finished.get_coro(), coro)
    finally:
        loop.close()
2480
def test_get_context(self):
    # get_context() must hand back the very context object given to the task.
    context = contextvars.copy_context()
    coro = coroutine_function()
    loop = asyncio.new_event_loop()
    try:
        finished = self.new_task(loop, coro, context=context)
        loop.run_until_complete(finished)
        self.assertIs(finished.get_context(), context)
    finally:
        loop.close()
2491
2492
2493 def add_subclass_tests(cls):
2494 BaseTask = cls.Task
2495 BaseFuture = cls.Future
2496
2497 if BaseTask is None or BaseFuture is None:
2498 return cls
2499
2500 class ESC[4;38;5;81mCommonFuture:
2501 def __init__(self, *args, **kwargs):
2502 self.calls = collections.defaultdict(lambda: 0)
2503 super().__init__(*args, **kwargs)
2504
2505 def add_done_callback(self, *args, **kwargs):
2506 self.calls['add_done_callback'] += 1
2507 return super().add_done_callback(*args, **kwargs)
2508
2509 class ESC[4;38;5;81mTask(ESC[4;38;5;149mCommonFuture, ESC[4;38;5;149mBaseTask):
2510 pass
2511
2512 class ESC[4;38;5;81mFuture(ESC[4;38;5;149mCommonFuture, ESC[4;38;5;149mBaseFuture):
2513 pass
2514
2515 def test_subclasses_ctask_cfuture(self):
2516 fut = self.Future(loop=self.loop)
2517
2518 async def func():
2519 self.loop.call_soon(lambda: fut.set_result('spam'))
2520 return await fut
2521
2522 task = self.Task(func(), loop=self.loop)
2523
2524 result = self.loop.run_until_complete(task)
2525
2526 self.assertEqual(result, 'spam')
2527
2528 self.assertEqual(
2529 dict(task.calls),
2530 {'add_done_callback': 1})
2531
2532 self.assertEqual(
2533 dict(fut.calls),
2534 {'add_done_callback': 1})
2535
2536 # Add patched Task & Future back to the test case
2537 cls.Task = Task
2538 cls.Future = Future
2539
2540 # Add an extra unit-test
2541 cls.test_subclasses_ctask_cfuture = test_subclasses_ctask_cfuture
2542
2543 # Disable the "test_task_source_traceback" test
2544 # (the test is hardcoded for a particular call stack, which
2545 # is slightly different for Task subclasses)
2546 cls.test_task_source_traceback = None
2547
2548 return cls
2549
2550
2551 class ESC[4;38;5;81mSetMethodsTest:
2552
2553 def test_set_result_causes_invalid_state(self):
2554 Future = type(self).Future
2555 self.loop.call_exception_handler = exc_handler = mock.Mock()
2556
2557 async def foo():
2558 await asyncio.sleep(0.1)
2559 return 10
2560
2561 coro = foo()
2562 task = self.new_task(self.loop, coro)
2563 Future.set_result(task, 'spam')
2564
2565 self.assertEqual(
2566 self.loop.run_until_complete(task),
2567 'spam')
2568
2569 exc_handler.assert_called_once()
2570 exc = exc_handler.call_args[0][0]['exception']
2571 with self.assertRaisesRegex(asyncio.InvalidStateError,
2572 r'step\(\): already done'):
2573 raise exc
2574
2575 coro.close()
2576
2577 def test_set_exception_causes_invalid_state(self):
2578 class ESC[4;38;5;81mMyExc(ESC[4;38;5;149mException):
2579 pass
2580
2581 Future = type(self).Future
2582 self.loop.call_exception_handler = exc_handler = mock.Mock()
2583
2584 async def foo():
2585 await asyncio.sleep(0.1)
2586 return 10
2587
2588 coro = foo()
2589 task = self.new_task(self.loop, coro)
2590 Future.set_exception(task, MyExc())
2591
2592 with self.assertRaises(MyExc):
2593 self.loop.run_until_complete(task)
2594
2595 exc_handler.assert_called_once()
2596 exc = exc_handler.call_args[0][0]['exception']
2597 with self.assertRaisesRegex(asyncio.InvalidStateError,
2598 r'step\(\): already done'):
2599 raise exc
2600
2601 coro.close()
2602
2603
@unittest.skipUnless(
    hasattr(futures, '_CFuture') and hasattr(tasks, '_CTask'),
    'requires the C _asyncio module',
)
class CTask_CFuture_Tests(BaseTaskTests, SetMethodsTest,
                          test_utils.TestCase):
    # Shared task tests run against tasks._CTask and futures._CFuture,
    # plus a few checks that only apply to that Task implementation.

    Task = getattr(tasks, '_CTask', None)
    Future = getattr(futures, '_CFuture', None)

    @support.refcount_test
    def test_refleaks_in_task___init__(self):
        gettotalrefcount = support.get_attribute(sys, 'gettotalrefcount')

        async def coro():
            pass

        task = self.new_task(self.loop, coro())
        self.loop.run_until_complete(task)
        refs_before = gettotalrefcount()
        # Re-initialising the same task repeatedly must not leak references.
        for _ in range(100):
            task.__init__(coro(), loop=self.loop)
            self.loop.run_until_complete(task)
        self.assertAlmostEqual(gettotalrefcount() - refs_before, 0, delta=10)

    def test_del__log_destroy_pending_segfault(self):
        async def coro():
            pass

        task = self.new_task(self.loop, coro())
        self.loop.run_until_complete(task)
        # Deleting the attribute has to raise instead of crashing.
        with self.assertRaises(AttributeError):
            del task._log_destroy_pending
2633
2634
@unittest.skipUnless(
    hasattr(futures, '_CFuture') and hasattr(tasks, '_CTask'),
    'requires the C _asyncio module',
)
@add_subclass_tests
class CTask_CFuture_SubclassTests(BaseTaskTests, test_utils.TestCase):
    # Shared task tests against subclasses of tasks._CTask and
    # futures._CFuture (the subclasses come from add_subclass_tests).

    Future = getattr(futures, '_CFuture', None)
    Task = getattr(tasks, '_CTask', None)
2643
2644
@unittest.skipUnless(
    hasattr(tasks, '_CTask'),
    'requires the C _asyncio module',
)
@add_subclass_tests
class CTaskSubclass_PyFuture_Tests(BaseTaskTests, test_utils.TestCase):
    # Shared task tests against subclasses of tasks._CTask and
    # futures._PyFuture (the subclasses come from add_subclass_tests).

    Future = futures._PyFuture
    Task = getattr(tasks, '_CTask', None)
2652
2653
@unittest.skipUnless(
    hasattr(futures, '_CFuture'),
    'requires the C _asyncio module',
)
@add_subclass_tests
class PyTask_CFutureSubclass_Tests(BaseTaskTests, test_utils.TestCase):
    # Shared task tests against subclasses of tasks._PyTask and
    # futures._CFuture (the subclasses come from add_subclass_tests).

    Task = tasks._PyTask
    Future = getattr(futures, '_CFuture', None)
2661
2662
@unittest.skipUnless(
    hasattr(tasks, '_CTask'),
    'requires the C _asyncio module',
)
class CTask_PyFuture_Tests(BaseTaskTests, test_utils.TestCase):
    # Shared task tests against tasks._CTask paired with futures._PyFuture.

    Future = futures._PyFuture
    Task = getattr(tasks, '_CTask', None)
2669
2670
@unittest.skipUnless(
    hasattr(futures, '_CFuture'),
    'requires the C _asyncio module',
)
class PyTask_CFuture_Tests(BaseTaskTests, test_utils.TestCase):
    # Shared task tests against tasks._PyTask paired with futures._CFuture.

    Future = getattr(futures, '_CFuture', None)
    Task = tasks._PyTask
2677
2678
class PyTask_PyFuture_Tests(BaseTaskTests, SetMethodsTest,
                            test_utils.TestCase):
    # Shared task tests (including the SetMethodsTest mixin) against
    # tasks._PyTask paired with futures._PyFuture.

    Future = futures._PyFuture
    Task = tasks._PyTask
2684
2685
@add_subclass_tests
class PyTask_PyFuture_SubclassTests(BaseTaskTests, test_utils.TestCase):
    # Shared task tests against subclasses of tasks._PyTask and
    # futures._PyFuture (the subclasses come from add_subclass_tests).
    Future = futures._PyFuture
    Task = tasks._PyTask
2690
2691
@unittest.skipUnless(
    hasattr(tasks, '_CTask'),
    'requires the C _asyncio module',
)
class CTask_Future_Tests(test_utils.TestCase):

    def test_foobar(self):
        # A Future subclass whose "get_loop" attribute raises
        # AttributeError must still be awaitable from a task.
        class Fut(asyncio.Future):
            @property
            def get_loop(self):
                raise AttributeError

        async def coro():
            await fut
            return 'spam'

        loop = self.loop = asyncio.new_event_loop()
        try:
            fut = Fut(loop=loop)
            loop.call_later(0.1, fut.set_result, 1)
            res = loop.run_until_complete(loop.create_task(coro()))
        finally:
            loop.close()

        self.assertEqual(res, 'spam')
2716
2717
class BaseTaskIntrospectionTests:
    # Mixin for the register/unregister and enter/leave task hooks.
    # Concrete test classes supply the four callables below.
    _register_task = None
    _unregister_task = None
    _enter_task = None
    _leave_task = None

    def test__register_task_1(self):
        # Task-like object exposing its loop through a "_loop" property.
        class TaskLike:
            @property
            def _loop(self):
                return loop

            def done(self):
                return False

        loop = mock.Mock()
        task = TaskLike()

        self.assertEqual(asyncio.all_tasks(loop), set())
        self._register_task(task)
        self.assertEqual(asyncio.all_tasks(loop), {task})
        self._unregister_task(task)

    def test__register_task_2(self):
        # Task-like object exposing its loop through get_loop().
        class TaskLike:
            def get_loop(self):
                return loop

            def done(self):
                return False

        loop = mock.Mock()
        task = TaskLike()

        self.assertEqual(asyncio.all_tasks(loop), set())
        self._register_task(task)
        self.assertEqual(asyncio.all_tasks(loop), {task})
        self._unregister_task(task)

    def test__register_task_3(self):
        # A registered task reporting done() is left out of all_tasks().
        class TaskLike:
            def get_loop(self):
                return loop

            def done(self):
                return True

        loop = mock.Mock()
        task = TaskLike()

        self.assertEqual(asyncio.all_tasks(loop), set())
        self._register_task(task)
        self.assertEqual(asyncio.all_tasks(loop), set())
        self._unregister_task(task)

    def test__enter_task(self):
        loop, task = mock.Mock(), mock.Mock()
        self.assertIsNone(asyncio.current_task(loop))
        self._enter_task(loop, task)
        self.assertIs(asyncio.current_task(loop), task)
        self._leave_task(loop, task)

    def test__enter_task_failure(self):
        loop = mock.Mock()
        first, second = mock.Mock(), mock.Mock()
        self._enter_task(loop, first)
        # Entering another task while one is current is an error.
        with self.assertRaises(RuntimeError):
            self._enter_task(loop, second)
        self.assertIs(asyncio.current_task(loop), first)
        self._leave_task(loop, first)

    def test__leave_task(self):
        loop, task = mock.Mock(), mock.Mock()
        self._enter_task(loop, task)
        self._leave_task(loop, task)
        self.assertIsNone(asyncio.current_task(loop))

    def test__leave_task_failure1(self):
        loop = mock.Mock()
        first, second = mock.Mock(), mock.Mock()
        self._enter_task(loop, first)
        # Leaving a task other than the current one is an error.
        with self.assertRaises(RuntimeError):
            self._leave_task(loop, second)
        self.assertIs(asyncio.current_task(loop), first)
        self._leave_task(loop, first)

    def test__leave_task_failure2(self):
        loop, task = mock.Mock(), mock.Mock()
        # Leaving without a prior enter is an error.
        with self.assertRaises(RuntimeError):
            self._leave_task(loop, task)
        self.assertIsNone(asyncio.current_task(loop))

    def test__unregister_task(self):
        loop, task = mock.Mock(), mock.Mock()
        task.get_loop = lambda: loop
        self._register_task(task)
        self._unregister_task(task)
        self.assertEqual(asyncio.all_tasks(loop), set())

    def test__unregister_task_not_registered(self):
        loop, task = mock.Mock(), mock.Mock()
        # Unregistering a task that was never registered must not raise.
        self._unregister_task(task)
        self.assertEqual(asyncio.all_tasks(loop), set())
2828
2829
class PyIntrospectionTests(test_utils.TestCase, BaseTaskIntrospectionTests):
    # Introspection tests bound to the tasks._py_* hook functions.
    _enter_task = staticmethod(tasks._py_enter_task)
    _leave_task = staticmethod(tasks._py_leave_task)
    _register_task = staticmethod(tasks._py_register_task)
    _unregister_task = staticmethod(tasks._py_unregister_task)
2835
2836
@unittest.skipUnless(
    hasattr(tasks, '_c_register_task'),
    'requires the C _asyncio module',
)
class CIntrospectionTests(test_utils.TestCase, BaseTaskIntrospectionTests):
    # Introspection tests bound to the tasks._c_* hook functions.  The
    # class body still has to execute when those functions are missing,
    # hence the None fallbacks.
    if hasattr(tasks, '_c_register_task'):
        _enter_task = staticmethod(tasks._c_enter_task)
        _leave_task = staticmethod(tasks._c_leave_task)
        _register_task = staticmethod(tasks._c_register_task)
        _unregister_task = staticmethod(tasks._c_unregister_task)
    else:
        _register_task = None
        _unregister_task = None
        _enter_task = None
        _leave_task = None
2847
2848
class BaseCurrentLoopTests:
    # Mixin for current_task(); concrete classes provide the
    # current_task callable and a new_task() factory.
    current_task = None

    def setUp(self):
        super().setUp()
        loop = asyncio.new_event_loop()
        self.loop = loop
        self.set_event_loop(loop)

    def new_task(self, coro):
        raise NotImplementedError

    def test_current_task_no_running_loop(self):
        # An explicit loop that is not running has no current task.
        current = self.current_task(loop=self.loop)
        self.assertIsNone(current)

    def test_current_task_no_running_loop_implicit(self):
        # Without a loop argument a running loop is required.
        with self.assertRaisesRegex(RuntimeError, 'no running event loop'):
            self.current_task()

    def test_current_task_with_implicit_loop(self):
        async def coro():
            # Explicit loop, explicit None and no argument must all agree.
            self.assertIs(self.current_task(loop=self.loop), task)

            self.assertIs(self.current_task(None), task)
            self.assertIs(self.current_task(), task)

        task = self.new_task(coro())
        self.loop.run_until_complete(task)
        self.assertIsNone(self.current_task(loop=self.loop))
2877
2878
class PyCurrentLoopTests(BaseCurrentLoopTests, test_utils.TestCase):
    # current_task() tests using tasks._py_current_task and tasks._PyTask.
    current_task = staticmethod(tasks._py_current_task)

    def new_task(self, coro):
        task_factory = tasks._PyTask
        return task_factory(coro, loop=self.loop)
2884
2885
@unittest.skipUnless(
    hasattr(tasks, '_CTask') and hasattr(tasks, '_c_current_task'),
    'requires the C _asyncio module',
)
class CCurrentLoopTests(BaseCurrentLoopTests, test_utils.TestCase):
    # current_task() tests using tasks._c_current_task and tasks._CTask.
    # The class body still has to execute when the function is missing.
    if hasattr(tasks, '_c_current_task'):
        current_task = staticmethod(tasks._c_current_task)
    else:
        current_task = None

    def new_task(self, coro):
        task_factory = tasks._CTask
        return task_factory(coro, loop=self.loop)
2897
2898
class GenericTaskTests(test_utils.TestCase):

    def test_future_subclass(self):
        self.assertTrue(issubclass(asyncio.Task, asyncio.Future))

    @support.cpython_only
    def test_asyncio_module_compiled(self):
        # Circular imports make it easy to end up with a non-importable
        # _asyncio module.  This simple test fails on systems where
        # C modules were compiled successfully (which is why _functools
        # etc are probed first) but _asyncio somehow was not.
        try:
            import _functools
            import _json
            import _pickle
        except ImportError:
            # skipTest() raises, so the _asyncio probe below is not reached.
            self.skipTest('C modules are not available')

        try:
            import _asyncio
        except ImportError:
            self.fail('_asyncio module is missing')
2921
2922
class GatherTestsBase:
    # Mixin with gather() tests shared by the future-based and the
    # coroutine-based test classes, which supply wrap_futures()
    # and _gather().

    def setUp(self):
        super().setUp()
        self.one_loop = self.new_test_loop()
        self.other_loop = self.new_test_loop()
        self.set_event_loop(self.one_loop, cleanup=False)

    def _run_loop(self, loop):
        # Keep running short bursts until nothing is ready any more.
        while loop._ready:
            test_utils.run_briefly(loop)

    def _check_success(self, **kwargs):
        first, second, third = (self.one_loop.create_future()
                                for _ in range(3))
        fut = self._gather(*self.wrap_futures(first, second, third),
                           **kwargs)
        cb = test_utils.MockCallback()
        fut.add_done_callback(cb)
        # Complete the children out of order; the result keeps
        # the order in which they were passed.
        second.set_result(1)
        first.set_result(2)
        self._run_loop(self.one_loop)
        self.assertEqual(cb.called, False)
        self.assertFalse(fut.done())
        third.set_result(3)
        self._run_loop(self.one_loop)
        cb.assert_called_once_with(fut)
        self.assertEqual(fut.result(), [2, 1, 3])

    def test_success(self):
        self._check_success()
        self._check_success(return_exceptions=False)

    def test_result_exception_success(self):
        self._check_success(return_exceptions=True)

    def test_one_exception(self):
        a, b, c, d, e = (self.one_loop.create_future() for _ in range(5))
        fut = self._gather(*self.wrap_futures(a, b, c, d, e))
        cb = test_utils.MockCallback()
        fut.add_done_callback(cb)
        exc = ZeroDivisionError()
        a.set_result(1)
        b.set_exception(exc)
        self._run_loop(self.one_loop)
        self.assertTrue(fut.done())
        cb.assert_called_once_with(fut)
        self.assertIs(fut.exception(), exc)
        # Does nothing
        c.set_result(3)
        d.cancel()
        e.set_exception(RuntimeError())
        e.exception()

    def test_return_exceptions(self):
        a, b, c, d = (self.one_loop.create_future() for _ in range(4))
        fut = self._gather(*self.wrap_futures(a, b, c, d),
                           return_exceptions=True)
        cb = test_utils.MockCallback()
        fut.add_done_callback(cb)
        exc = ZeroDivisionError()
        exc2 = RuntimeError()
        b.set_result(1)
        c.set_exception(exc)
        a.set_result(3)
        self._run_loop(self.one_loop)
        # One child is still pending, so is the gathering future.
        self.assertFalse(fut.done())
        d.set_exception(exc2)
        self._run_loop(self.one_loop)
        self.assertTrue(fut.done())
        cb.assert_called_once_with(fut)
        self.assertEqual(fut.result(), [3, 1, exc, exc2])

    def test_env_var_debug(self):
        code = '\n'.join((
            'import asyncio.coroutines',
            'print(asyncio.coroutines._is_debug_mode())'))

        def debug_mode(*args, **env_vars):
            # Run the snippet in a child interpreter and return
            # what it printed, without the trailing newline.
            _, stdout, _ = assert_python_ok(*args, **env_vars)
            return stdout.rstrip()

        # Test with -E to not fail if the unit test was run with
        # PYTHONASYNCIODEBUG set to a non-empty string
        self.assertEqual(debug_mode('-E', '-c', code), b'False')

        self.assertEqual(
            debug_mode('-c', code,
                       PYTHONASYNCIODEBUG='',
                       PYTHONDEVMODE=''),
            b'False')

        self.assertEqual(
            debug_mode('-c', code,
                       PYTHONASYNCIODEBUG='1',
                       PYTHONDEVMODE=''),
            b'True')

        self.assertEqual(
            debug_mode('-E', '-c', code,
                       PYTHONASYNCIODEBUG='1',
                       PYTHONDEVMODE=''),
            b'False')

        # -X dev
        self.assertEqual(debug_mode('-E', '-X', 'dev', '-c', code),
                         b'True')
3023
3024
class FutureGatherTests(GatherTestsBase, test_utils.TestCase):
    # gather() tests in which the children are plain futures.

    def wrap_futures(self, *futures):
        return futures

    def _gather(self, *args, **kwargs):
        return asyncio.gather(*args, **kwargs)

    def _check_empty_gather(self, fut):
        # Shared checks for gather() called without any children.
        self.assertIsInstance(fut, asyncio.Future)
        self.assertIs(fut._loop, self.one_loop)
        self._run_loop(self.one_loop)
        self.assertTrue(fut.done())
        self.assertEqual(fut.result(), [])

    def test_constructor_empty_sequence_without_loop(self):
        with self.assertRaisesRegex(RuntimeError, 'no current event loop'):
            asyncio.gather()

    def test_constructor_empty_sequence_use_running_loop(self):
        async def gather():
            return asyncio.gather()
        self._check_empty_gather(self.one_loop.run_until_complete(gather()))

    def test_constructor_empty_sequence_use_global_loop(self):
        # Deprecated in 3.10, undeprecated in 3.12
        asyncio.set_event_loop(self.one_loop)
        self.addCleanup(asyncio.set_event_loop, None)
        self._check_empty_gather(asyncio.gather())

    def test_constructor_heterogenous_futures(self):
        # Children that belong to different loops are rejected.
        fut1 = self.one_loop.create_future()
        fut2 = self.other_loop.create_future()
        with self.assertRaises(ValueError):
            asyncio.gather(fut1, fut2)

    def test_constructor_homogenous_futures(self):
        children = [self.other_loop.create_future() for _ in range(3)]
        # The same children are gathered twice, with identical checks.
        for _ in range(2):
            fut = asyncio.gather(*children)
            self.assertIs(fut._loop, self.other_loop)
            self._run_loop(self.other_loop)
            self.assertFalse(fut.done())

    def test_one_cancellation(self):
        a, b, c, d, e = (self.one_loop.create_future() for _ in range(5))
        fut = asyncio.gather(a, b, c, d, e)
        cb = test_utils.MockCallback()
        fut.add_done_callback(cb)
        a.set_result(1)
        b.cancel()
        self._run_loop(self.one_loop)
        self.assertTrue(fut.done())
        cb.assert_called_once_with(fut)
        # The gathering future is not cancelled itself; it carries
        # the child's CancelledError as its exception.
        self.assertFalse(fut.cancelled())
        self.assertIsInstance(fut.exception(), asyncio.CancelledError)
        # Does nothing
        c.set_result(3)
        d.cancel()
        e.set_exception(RuntimeError())
        e.exception()

    def test_result_exception_one_cancellation(self):
        a, b, c, d, e, f = (self.one_loop.create_future()
                            for _ in range(6))
        fut = asyncio.gather(a, b, c, d, e, f, return_exceptions=True)
        cb = test_utils.MockCallback()
        fut.add_done_callback(cb)
        a.set_result(1)
        zde = ZeroDivisionError()
        b.set_exception(zde)
        c.cancel()
        self._run_loop(self.one_loop)
        self.assertFalse(fut.done())
        d.set_result(3)
        e.cancel()
        rte = RuntimeError()
        f.set_exception(rte)
        res = self.one_loop.run_until_complete(fut)
        # Check the cancelled slots by type, then blank them out so
        # the whole list can be compared by value.
        for index in (2, 4):
            self.assertIsInstance(res[index], asyncio.CancelledError)
        res[2] = res[4] = None
        self.assertEqual(res, [1, zde, None, 3, None, rte])
        cb.assert_called_once_with(fut)
3115
3116
class CoroutineGatherTests(GatherTestsBase, test_utils.TestCase):
    # gather() tests in which the children are coroutines.

    def wrap_futures(self, *futures):
        # Wrap every future in a coroutine that just awaits it.
        async def coro(fut):
            return await fut
        return [coro(fut) for fut in futures]

    def _gather(self, *args, **kwargs):
        # Call gather() from inside a coroutine run by one_loop.
        async def call_gather():
            return asyncio.gather(*args, **kwargs)
        return self.one_loop.run_until_complete(call_gather())

    def test_constructor_without_loop(self):
        async def coro():
            return 'abc'
        gens = [coro(), coro()]
        for gen in gens:
            self.addCleanup(gen.close)
        with self.assertRaisesRegex(RuntimeError, 'no current event loop'):
            asyncio.gather(*gens)

    def test_constructor_use_running_loop(self):
        async def coro():
            return 'abc'
        gen1, gen2 = coro(), coro()

        async def gather():
            return asyncio.gather(gen1, gen2)

        fut = self.one_loop.run_until_complete(gather())
        self.assertIs(fut._loop, self.one_loop)
        self.one_loop.run_until_complete(fut)

    def test_constructor_use_global_loop(self):
        # Deprecated in 3.10, undeprecated in 3.12
        async def coro():
            return 'abc'
        asyncio.set_event_loop(self.other_loop)
        self.addCleanup(asyncio.set_event_loop, None)
        fut = asyncio.gather(coro(), coro())
        self.assertIs(fut._loop, self.other_loop)
        self.other_loop.run_until_complete(fut)

    def test_duplicate_coroutines(self):
        async def coro(s):
            return s
        # The same coroutine object appears three times.
        repeated = coro('abc')
        fut = self._gather(repeated, repeated, coro('def'), repeated)
        self._run_loop(self.one_loop)
        self.assertEqual(fut.result(), ['abc', 'abc', 'def', 'abc'])

    def test_cancellation_broadcast(self):
        # Cancelling outer() cancels all children.
        proof = 0
        waiter = self.one_loop.create_future()

        async def inner():
            nonlocal proof
            await waiter
            proof += 1

        children = [asyncio.ensure_future(inner(), loop=self.one_loop)
                    for _ in range(2)]
        gatherer = None

        async def outer():
            nonlocal proof, gatherer
            gatherer = asyncio.gather(*children)
            await gatherer
            proof += 100

        f = asyncio.ensure_future(outer(), loop=self.one_loop)
        test_utils.run_briefly(self.one_loop)
        self.assertTrue(f.cancel())
        with self.assertRaises(asyncio.CancelledError):
            self.one_loop.run_until_complete(f)
        self.assertFalse(gatherer.cancel())
        self.assertTrue(waiter.cancelled())
        for child in children:
            self.assertTrue(child.cancelled())
        test_utils.run_briefly(self.one_loop)
        # Neither inner() nor outer() got past its await.
        self.assertEqual(proof, 0)

    def test_exception_marking(self):
        # Test for the first line marked "Mark exception retrieved."

        async def inner(f):
            await f
            raise RuntimeError('should not be ignored')

        a = self.one_loop.create_future()
        b = self.one_loop.create_future()

        async def outer():
            await asyncio.gather(inner(a), inner(b))

        f = asyncio.ensure_future(outer(), loop=self.one_loop)
        test_utils.run_briefly(self.one_loop)
        # Release the two children one after the other.
        for released in (a, b):
            released.set_result(None)
            test_utils.run_briefly(self.one_loop)
        self.assertIsInstance(f.exception(), RuntimeError)

    def test_issue46672(self):
        async def coro(s):
            return s

        with mock.patch(
            'asyncio.base_events.BaseEventLoop.call_exception_handler',
        ):
            c = coro('abc')

            # A dict is not a valid child for gather().
            with self.assertRaises(TypeError):
                self._gather(c, {})
            self._run_loop(self.one_loop)
            # NameError should not happen:
            self.one_loop.call_exception_handler.assert_not_called()
3239
3240
3241 class ESC[4;38;5;81mRunCoroutineThreadsafeTests(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
3242 """Test case for asyncio.run_coroutine_threadsafe."""
3243
def setUp(self):
    super().setUp()
    loop = asyncio.new_event_loop()
    self.loop = loop
    self.set_event_loop(loop)  # Will cleanup properly
3248
3249 async def add(self, a, b, fail=False, cancel=False):
3250 """Wait 0.05 second and return a + b."""
3251 await asyncio.sleep(0.05)
3252 if fail:
3253 raise RuntimeError("Fail!")
3254 if cancel:
3255 asyncio.current_task(self.loop).cancel()
3256 await asyncio.sleep(0)
3257 return a + b
3258
3259 def target(self, fail=False, cancel=False, timeout=None,
3260 advance_coro=False):
3261 """Run add coroutine in the event loop."""
3262 coro = self.add(1, 2, fail=fail, cancel=cancel)
3263 future = asyncio.run_coroutine_threadsafe(coro, self.loop)
3264 if advance_coro:
3265 # this is for test_run_coroutine_threadsafe_task_factory_exception;
3266 # otherwise it spills errors and breaks **other** unittests, since
3267 # 'target' is interacting with threads.
3268
3269 # With this call, `coro` will be advanced.
3270 self.loop.call_soon_threadsafe(coro.send, None)
3271 try:
3272 return future.result(timeout)
3273 finally:
3274 future.done() or future.cancel()
3275
3276 def test_run_coroutine_threadsafe(self):
3277 """Test coroutine submission from a thread to an event loop."""
3278 future = self.loop.run_in_executor(None, self.target)
3279 result = self.loop.run_until_complete(future)
3280 self.assertEqual(result, 3)
3281
3282 def test_run_coroutine_threadsafe_with_exception(self):
3283 """Test coroutine submission from a thread to an event loop
3284 when an exception is raised."""
3285 future = self.loop.run_in_executor(None, self.target, True)
3286 with self.assertRaises(RuntimeError) as exc_context:
3287 self.loop.run_until_complete(future)
3288 self.assertIn("Fail!", exc_context.exception.args)
3289
3290 def test_run_coroutine_threadsafe_with_timeout(self):
3291 """Test coroutine submission from a thread to an event loop
3292 when a timeout is raised."""
3293 callback = lambda: self.target(timeout=0)
3294 future = self.loop.run_in_executor(None, callback)
3295 with self.assertRaises(asyncio.TimeoutError):
3296 self.loop.run_until_complete(future)
3297 test_utils.run_briefly(self.loop)
3298 # Check that there's no pending task (add has been cancelled)
3299 for task in asyncio.all_tasks(self.loop):
3300 self.assertTrue(task.done())
3301
3302 def test_run_coroutine_threadsafe_task_cancelled(self):
3303 """Test coroutine submission from a thread to an event loop
3304 when the task is cancelled."""
3305 callback = lambda: self.target(cancel=True)
3306 future = self.loop.run_in_executor(None, callback)
3307 with self.assertRaises(asyncio.CancelledError):
3308 self.loop.run_until_complete(future)
3309
3310 def test_run_coroutine_threadsafe_task_factory_exception(self):
3311 """Test coroutine submission from a thread to an event loop
3312 when the task factory raise an exception."""
3313
3314 def task_factory(loop, coro):
3315 raise NameError
3316
3317 run = self.loop.run_in_executor(
3318 None, lambda: self.target(advance_coro=True))
3319
3320 # Set exception handler
3321 callback = test_utils.MockCallback()
3322 self.loop.set_exception_handler(callback)
3323
3324 # Set corrupted task factory
3325 self.addCleanup(self.loop.set_task_factory,
3326 self.loop.get_task_factory())
3327 self.loop.set_task_factory(task_factory)
3328
3329 # Run event loop
3330 with self.assertRaises(NameError) as exc_context:
3331 self.loop.run_until_complete(run)
3332
3333 # Check exceptions
3334 self.assertEqual(len(callback.call_args_list), 1)
3335 (loop, context), kwargs = callback.call_args
3336 self.assertEqual(context['exception'], exc_context.exception)
3337
3338
class SleepTests(test_utils.TestCase):
    """Tests for asyncio.sleep(), run on a private event loop."""

    def setUp(self):
        super().setUp()
        self.loop = asyncio.new_event_loop()
        self.set_event_loop(self.loop)

    def tearDown(self):
        self.loop.close()
        self.loop = None
        super().tearDown()

    def test_sleep_zero(self):
        # sleep(0) must yield to the loop once (so the callback scheduled
        # with call_soon() runs) and hand back its ``result`` argument.
        result = 0

        def inc_result(num):
            nonlocal result
            result += num

        async def coro():
            self.loop.call_soon(inc_result, 1)
            self.assertEqual(result, 0)
            num = await asyncio.sleep(0, result=10)
            self.assertEqual(result, 1)  # inc'ed by call_soon
            inc_result(num)  # num is 10, so result becomes 11

        self.loop.run_until_complete(coro())
        self.assertEqual(result, 11)
3366
3367
class CompatibilityTests(test_utils.TestCase):
    # Tests for checking a bridge between old-styled coroutines
    # and async/await syntax
    #
    # Only the per-test loop fixture is defined in this class; it
    # contains no test methods of its own.

    def setUp(self):
        super().setUp()
        self.loop = asyncio.new_event_loop()
        self.set_event_loop(self.loop)

    def tearDown(self):
        self.loop.close()
        self.loop = None
        super().tearDown()
3381
3382
# Run the test suite when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()