python (3.12.0)

(root)/
lib/
python3.12/
test/
test_asyncio/
test_tasks.py
       1  """Tests for tasks.py."""
       2  
       3  import collections
       4  import contextvars
       5  import gc
       6  import io
       7  import random
       8  import re
       9  import sys
      10  import traceback
      11  import types
      12  import unittest
      13  from unittest import mock
      14  from types import GenericAlias
      15  
      16  import asyncio
      17  from asyncio import futures
      18  from asyncio import tasks
      19  from test.test_asyncio import utils as test_utils
      20  from test import support
      21  from test.support.script_helper import assert_python_ok
      22  
      23  
      24  def tearDownModule():
      25      asyncio.set_event_loop_policy(None)
      26  
      27  
      28  async def coroutine_function():
      29      pass
      30  
      31  
      32  def format_coroutine(qualname, state, src, source_traceback, generator=False):
      33      if generator:
      34          state = '%s' % state
      35      else:
      36          state = '%s, defined' % state
      37      if source_traceback is not None:
      38          frame = source_traceback[-1]
      39          return ('coro=<%s() %s at %s> created at %s:%s'
      40                  % (qualname, state, src, frame[0], frame[1]))
      41      else:
      42          return 'coro=<%s() %s at %s>' % (qualname, state, src)
      43  
      44  
      45  def get_innermost_context(exc):
      46      """
      47      Return information about the innermost exception context in the chain.
      48      """
      49      depth = 0
      50      while True:
      51          context = exc.__context__
      52          if context is None:
      53              break
      54  
      55          exc = context
      56          depth += 1
      57  
      58      return (type(exc), exc.args, depth)
      59  
      60  
      61  class ESC[4;38;5;81mDummy:
      62  
      63      def __repr__(self):
      64          return '<Dummy>'
      65  
      66      def __call__(self, *args):
      67          pass
      68  
      69  
      70  class ESC[4;38;5;81mCoroLikeObject:
      71      def send(self, v):
      72          raise StopIteration(42)
      73  
      74      def throw(self, *exc):
      75          pass
      76  
      77      def close(self):
      78          pass
      79  
      80      def __await__(self):
      81          return self
      82  
      83  
      84  class ESC[4;38;5;81mBaseTaskTests:
      85  
      86      Task = None
      87      Future = None
      88  
      89      def new_task(self, loop, coro, name='TestTask', context=None):
      90          return self.__class__.Task(coro, loop=loop, name=name, context=context)
      91  
      92      def new_future(self, loop):
      93          return self.__class__.Future(loop=loop)
      94  
      95      def setUp(self):
      96          super().setUp()
      97          self.loop = self.new_test_loop()
      98          self.loop.set_task_factory(self.new_task)
      99          self.loop.create_future = lambda: self.new_future(self.loop)
     100  
     101      def test_generic_alias(self):
     102          task = self.__class__.Task[str]
     103          self.assertEqual(task.__args__, (str,))
     104          self.assertIsInstance(task, GenericAlias)
     105  
     106      def test_task_cancel_message_getter(self):
     107          async def coro():
     108              pass
     109          t = self.new_task(self.loop, coro())
     110          self.assertTrue(hasattr(t, '_cancel_message'))
     111          self.assertEqual(t._cancel_message, None)
     112  
     113          t.cancel('my message')
     114          self.assertEqual(t._cancel_message, 'my message')
     115  
     116          with self.assertRaises(asyncio.CancelledError) as cm:
     117              self.loop.run_until_complete(t)
     118  
     119          self.assertEqual('my message', cm.exception.args[0])
     120  
     121      def test_task_cancel_message_setter(self):
     122          async def coro():
     123              pass
     124          t = self.new_task(self.loop, coro())
     125          t.cancel('my message')
     126          t._cancel_message = 'my new message'
     127          self.assertEqual(t._cancel_message, 'my new message')
     128  
     129          with self.assertRaises(asyncio.CancelledError) as cm:
     130              self.loop.run_until_complete(t)
     131  
     132          self.assertEqual('my new message', cm.exception.args[0])
     133  
     134      def test_task_del_collect(self):
     135          class ESC[4;38;5;81mEvil:
     136              def __del__(self):
     137                  gc.collect()
     138  
     139          async def run():
     140              return Evil()
     141  
     142          self.loop.run_until_complete(
     143              asyncio.gather(*[
     144                  self.new_task(self.loop, run()) for _ in range(100)
     145              ]))
     146  
     147      def test_other_loop_future(self):
     148          other_loop = asyncio.new_event_loop()
     149          fut = self.new_future(other_loop)
     150  
     151          async def run(fut):
     152              await fut
     153  
     154          try:
     155              with self.assertRaisesRegex(RuntimeError,
     156                                          r'Task .* got Future .* attached'):
     157                  self.loop.run_until_complete(run(fut))
     158          finally:
     159              other_loop.close()
     160  
     161      def test_task_awaits_on_itself(self):
     162  
     163          async def test():
     164              await task
     165  
     166          task = asyncio.ensure_future(test(), loop=self.loop)
     167  
     168          with self.assertRaisesRegex(RuntimeError,
     169                                      'Task cannot await on itself'):
     170              self.loop.run_until_complete(task)
     171  
     172      def test_task_class(self):
     173          async def notmuch():
     174              return 'ok'
     175          t = self.new_task(self.loop, notmuch())
     176          self.loop.run_until_complete(t)
     177          self.assertTrue(t.done())
     178          self.assertEqual(t.result(), 'ok')
     179          self.assertIs(t._loop, self.loop)
     180          self.assertIs(t.get_loop(), self.loop)
     181  
     182          loop = asyncio.new_event_loop()
     183          self.set_event_loop(loop)
     184          t = self.new_task(loop, notmuch())
     185          self.assertIs(t._loop, loop)
     186          loop.run_until_complete(t)
     187          loop.close()
     188  
     189      def test_ensure_future_coroutine(self):
     190          async def notmuch():
     191              return 'ok'
     192          t = asyncio.ensure_future(notmuch(), loop=self.loop)
     193          self.assertIs(t._loop, self.loop)
     194          self.loop.run_until_complete(t)
     195          self.assertTrue(t.done())
     196          self.assertEqual(t.result(), 'ok')
     197  
     198          a = notmuch()
     199          self.addCleanup(a.close)
     200          with self.assertRaisesRegex(RuntimeError, 'no current event loop'):
     201              asyncio.ensure_future(a)
     202  
     203          async def test():
     204              return asyncio.ensure_future(notmuch())
     205          t = self.loop.run_until_complete(test())
     206          self.assertIs(t._loop, self.loop)
     207          self.loop.run_until_complete(t)
     208          self.assertTrue(t.done())
     209          self.assertEqual(t.result(), 'ok')
     210  
     211          # Deprecated in 3.10, undeprecated in 3.12
     212          asyncio.set_event_loop(self.loop)
     213          self.addCleanup(asyncio.set_event_loop, None)
     214          t = asyncio.ensure_future(notmuch())
     215          self.assertIs(t._loop, self.loop)
     216          self.loop.run_until_complete(t)
     217          self.assertTrue(t.done())
     218          self.assertEqual(t.result(), 'ok')
     219  
     220      def test_ensure_future_future(self):
     221          f_orig = self.new_future(self.loop)
     222          f_orig.set_result('ko')
     223  
     224          f = asyncio.ensure_future(f_orig)
     225          self.loop.run_until_complete(f)
     226          self.assertTrue(f.done())
     227          self.assertEqual(f.result(), 'ko')
     228          self.assertIs(f, f_orig)
     229  
     230          loop = asyncio.new_event_loop()
     231          self.set_event_loop(loop)
     232  
     233          with self.assertRaises(ValueError):
     234              f = asyncio.ensure_future(f_orig, loop=loop)
     235  
     236          loop.close()
     237  
     238          f = asyncio.ensure_future(f_orig, loop=self.loop)
     239          self.assertIs(f, f_orig)
     240  
     241      def test_ensure_future_task(self):
     242          async def notmuch():
     243              return 'ok'
     244          t_orig = self.new_task(self.loop, notmuch())
     245          t = asyncio.ensure_future(t_orig)
     246          self.loop.run_until_complete(t)
     247          self.assertTrue(t.done())
     248          self.assertEqual(t.result(), 'ok')
     249          self.assertIs(t, t_orig)
     250  
     251          loop = asyncio.new_event_loop()
     252          self.set_event_loop(loop)
     253  
     254          with self.assertRaises(ValueError):
     255              t = asyncio.ensure_future(t_orig, loop=loop)
     256  
     257          loop.close()
     258  
     259          t = asyncio.ensure_future(t_orig, loop=self.loop)
     260          self.assertIs(t, t_orig)
     261  
     262      def test_ensure_future_awaitable(self):
     263          class ESC[4;38;5;81mAw:
     264              def __init__(self, coro):
     265                  self.coro = coro
     266              def __await__(self):
     267                  return self.coro.__await__()
     268  
     269          async def coro():
     270              return 'ok'
     271  
     272          loop = asyncio.new_event_loop()
     273          self.set_event_loop(loop)
     274          fut = asyncio.ensure_future(Aw(coro()), loop=loop)
     275          loop.run_until_complete(fut)
     276          self.assertEqual(fut.result(), 'ok')
     277  
     278      def test_ensure_future_task_awaitable(self):
     279          class ESC[4;38;5;81mAw:
     280              def __await__(self):
     281                  return asyncio.sleep(0, result='ok').__await__()
     282  
     283          loop = asyncio.new_event_loop()
     284          self.set_event_loop(loop)
     285          task = asyncio.ensure_future(Aw(), loop=loop)
     286          loop.run_until_complete(task)
     287          self.assertTrue(task.done())
     288          self.assertEqual(task.result(), 'ok')
     289          self.assertIsInstance(task.get_coro(), types.CoroutineType)
     290          loop.close()
     291  
     292      def test_ensure_future_neither(self):
     293          with self.assertRaises(TypeError):
     294              asyncio.ensure_future('ok')
     295  
     296      def test_ensure_future_error_msg(self):
     297          loop = asyncio.new_event_loop()
     298          f = self.new_future(self.loop)
     299          with self.assertRaisesRegex(ValueError, 'The future belongs to a '
     300                                      'different loop than the one specified as '
     301                                      'the loop argument'):
     302              asyncio.ensure_future(f, loop=loop)
     303          loop.close()
     304  
     305      def test_get_stack(self):
     306          T = None
     307  
     308          async def foo():
     309              await bar()
     310  
     311          async def bar():
     312              # test get_stack()
     313              f = T.get_stack(limit=1)
     314              try:
     315                  self.assertEqual(f[0].f_code.co_name, 'foo')
     316              finally:
     317                  f = None
     318  
     319              # test print_stack()
     320              file = io.StringIO()
     321              T.print_stack(limit=1, file=file)
     322              file.seek(0)
     323              tb = file.read()
     324              self.assertRegex(tb, r'foo\(\) running')
     325  
     326          async def runner():
     327              nonlocal T
     328              T = asyncio.ensure_future(foo(), loop=self.loop)
     329              await T
     330  
     331          self.loop.run_until_complete(runner())
     332  
     333      def test_task_repr(self):
     334          self.loop.set_debug(False)
     335  
     336          async def notmuch():
     337              return 'abc'
     338  
     339          # test coroutine function
     340          self.assertEqual(notmuch.__name__, 'notmuch')
     341          self.assertRegex(notmuch.__qualname__,
     342                           r'\w+.test_task_repr.<locals>.notmuch')
     343          self.assertEqual(notmuch.__module__, __name__)
     344  
     345          filename, lineno = test_utils.get_function_source(notmuch)
     346          src = "%s:%s" % (filename, lineno)
     347  
     348          # test coroutine object
     349          gen = notmuch()
     350          coro_qualname = 'BaseTaskTests.test_task_repr.<locals>.notmuch'
     351          self.assertEqual(gen.__name__, 'notmuch')
     352          self.assertEqual(gen.__qualname__, coro_qualname)
     353  
     354          # test pending Task
     355          t = self.new_task(self.loop, gen)
     356          t.add_done_callback(Dummy())
     357  
     358          coro = format_coroutine(coro_qualname, 'running', src,
     359                                  t._source_traceback, generator=True)
     360          self.assertEqual(repr(t),
     361                           "<Task pending name='TestTask' %s cb=[<Dummy>()]>" % coro)
     362  
     363          # test cancelling Task
     364          t.cancel()  # Does not take immediate effect!
     365          self.assertEqual(repr(t),
     366                           "<Task cancelling name='TestTask' %s cb=[<Dummy>()]>" % coro)
     367  
     368          # test cancelled Task
     369          self.assertRaises(asyncio.CancelledError,
     370                            self.loop.run_until_complete, t)
     371          coro = format_coroutine(coro_qualname, 'done', src,
     372                                  t._source_traceback)
     373          self.assertEqual(repr(t),
     374                           "<Task cancelled name='TestTask' %s>" % coro)
     375  
     376          # test finished Task
     377          t = self.new_task(self.loop, notmuch())
     378          self.loop.run_until_complete(t)
     379          coro = format_coroutine(coro_qualname, 'done', src,
     380                                  t._source_traceback)
     381          self.assertEqual(repr(t),
     382                           "<Task finished name='TestTask' %s result='abc'>" % coro)
     383  
     384      def test_task_repr_autogenerated(self):
     385          async def notmuch():
     386              return 123
     387  
     388          t1 = self.new_task(self.loop, notmuch(), None)
     389          t2 = self.new_task(self.loop, notmuch(), None)
     390          self.assertNotEqual(repr(t1), repr(t2))
     391  
     392          match1 = re.match(r"^<Task pending name='Task-(\d+)'", repr(t1))
     393          self.assertIsNotNone(match1)
     394          match2 = re.match(r"^<Task pending name='Task-(\d+)'", repr(t2))
     395          self.assertIsNotNone(match2)
     396  
     397          # Autogenerated task names should have monotonically increasing numbers
     398          self.assertLess(int(match1.group(1)), int(match2.group(1)))
     399          self.loop.run_until_complete(t1)
     400          self.loop.run_until_complete(t2)
     401  
     402      def test_task_set_name_pylong(self):
     403          # test that setting the task name to a PyLong explicitly doesn't
     404          # incorrectly trigger the deferred name formatting logic
     405          async def notmuch():
     406              return 123
     407  
     408          t = self.new_task(self.loop, notmuch(), name=987654321)
     409          self.assertEqual(t.get_name(), '987654321')
     410          t.set_name(123456789)
     411          self.assertEqual(t.get_name(), '123456789')
     412          self.loop.run_until_complete(t)
     413  
     414      def test_task_repr_name_not_str(self):
     415          async def notmuch():
     416              return 123
     417  
     418          t = self.new_task(self.loop, notmuch())
     419          t.set_name({6})
     420          self.assertEqual(t.get_name(), '{6}')
     421          self.loop.run_until_complete(t)
     422  
     423      def test_task_repr_wait_for(self):
     424          self.loop.set_debug(False)
     425  
     426          async def wait_for(fut):
     427              return await fut
     428  
     429          fut = self.new_future(self.loop)
     430          task = self.new_task(self.loop, wait_for(fut))
     431          test_utils.run_briefly(self.loop)
     432          self.assertRegex(repr(task),
     433                           '<Task .* wait_for=%s>' % re.escape(repr(fut)))
     434  
     435          fut.set_result(None)
     436          self.loop.run_until_complete(task)
     437  
     438      def test_task_basics(self):
     439  
     440          async def outer():
     441              a = await inner1()
     442              b = await inner2()
     443              return a+b
     444  
     445          async def inner1():
     446              return 42
     447  
     448          async def inner2():
     449              return 1000
     450  
     451          t = outer()
     452          self.assertEqual(self.loop.run_until_complete(t), 1042)
     453  
     454      def test_exception_chaining_after_await(self):
     455          # Test that when awaiting on a task when an exception is already
     456          # active, if the task raises an exception it will be chained
     457          # with the original.
     458          loop = asyncio.new_event_loop()
     459          self.set_event_loop(loop)
     460  
     461          async def raise_error():
     462              raise ValueError
     463  
     464          async def run():
     465              try:
     466                  raise KeyError(3)
     467              except Exception as exc:
     468                  task = self.new_task(loop, raise_error())
     469                  try:
     470                      await task
     471                  except Exception as exc:
     472                      self.assertEqual(type(exc), ValueError)
     473                      chained = exc.__context__
     474                      self.assertEqual((type(chained), chained.args),
     475                          (KeyError, (3,)))
     476  
     477          try:
     478              task = self.new_task(loop, run())
     479              loop.run_until_complete(task)
     480          finally:
     481              loop.close()
     482  
     483      def test_exception_chaining_after_await_with_context_cycle(self):
     484          # Check trying to create an exception context cycle:
     485          # https://bugs.python.org/issue40696
     486          has_cycle = None
     487          loop = asyncio.new_event_loop()
     488          self.set_event_loop(loop)
     489  
     490          async def process_exc(exc):
     491              raise exc
     492  
     493          async def run():
     494              nonlocal has_cycle
     495              try:
     496                  raise KeyError('a')
     497              except Exception as exc:
     498                  task = self.new_task(loop, process_exc(exc))
     499                  try:
     500                      await task
     501                  except BaseException as exc:
     502                      has_cycle = (exc is exc.__context__)
     503                      # Prevent a hang if has_cycle is True.
     504                      exc.__context__ = None
     505  
     506          try:
     507              task = self.new_task(loop, run())
     508              loop.run_until_complete(task)
     509          finally:
     510              loop.close()
     511          # This also distinguishes from the initial has_cycle=None.
     512          self.assertEqual(has_cycle, False)
     513  
     514  
     515      def test_cancelling(self):
     516          loop = asyncio.new_event_loop()
     517  
     518          async def task():
     519              await asyncio.sleep(10)
     520  
     521          try:
     522              t = self.new_task(loop, task())
     523              self.assertFalse(t.cancelling())
     524              self.assertNotIn(" cancelling ", repr(t))
     525              self.assertTrue(t.cancel())
     526              self.assertTrue(t.cancelling())
     527              self.assertIn(" cancelling ", repr(t))
     528  
     529              # Since we commented out two lines from Task.cancel(),
     530              # this t.cancel() call now returns True.
     531              # self.assertFalse(t.cancel())
     532              self.assertTrue(t.cancel())
     533  
     534              with self.assertRaises(asyncio.CancelledError):
     535                  loop.run_until_complete(t)
     536          finally:
     537              loop.close()
     538  
     539      def test_uncancel_basic(self):
     540          loop = asyncio.new_event_loop()
     541  
     542          async def task():
     543              try:
     544                  await asyncio.sleep(10)
     545              except asyncio.CancelledError:
     546                  asyncio.current_task().uncancel()
     547                  await asyncio.sleep(10)
     548  
     549          try:
     550              t = self.new_task(loop, task())
     551              loop.run_until_complete(asyncio.sleep(0.01))
     552  
     553              # Cancel first sleep
     554              self.assertTrue(t.cancel())
     555              self.assertIn(" cancelling ", repr(t))
     556              self.assertEqual(t.cancelling(), 1)
     557              self.assertFalse(t.cancelled())  # Task is still not complete
     558              loop.run_until_complete(asyncio.sleep(0.01))
     559  
     560              # after .uncancel()
     561              self.assertNotIn(" cancelling ", repr(t))
     562              self.assertEqual(t.cancelling(), 0)
     563              self.assertFalse(t.cancelled())  # Task is still not complete
     564  
     565              # Cancel second sleep
     566              self.assertTrue(t.cancel())
     567              self.assertEqual(t.cancelling(), 1)
     568              self.assertFalse(t.cancelled())  # Task is still not complete
     569              with self.assertRaises(asyncio.CancelledError):
     570                  loop.run_until_complete(t)
     571              self.assertTrue(t.cancelled())  # Finally, task complete
     572              self.assertTrue(t.done())
     573  
     574              # uncancel is no longer effective after the task is complete
     575              t.uncancel()
     576              self.assertTrue(t.cancelled())
     577              self.assertTrue(t.done())
     578          finally:
     579              loop.close()
     580  
     581      def test_uncancel_structured_blocks(self):
     582          # This test recreates the following high-level structure using uncancel()::
     583          #
     584          #     async def make_request_with_timeout():
     585          #         try:
     586          #             async with asyncio.timeout(1):
     587          #                 # Structured block affected by the timeout:
     588          #                 await make_request()
     589          #                 await make_another_request()
     590          #         except TimeoutError:
     591          #             pass  # There was a timeout
     592          #         # Outer code not affected by the timeout:
     593          #         await unrelated_code()
     594  
     595          loop = asyncio.new_event_loop()
     596  
     597          async def make_request_with_timeout(*, sleep: float, timeout: float):
     598              task = asyncio.current_task()
     599              loop = task.get_loop()
     600  
     601              timed_out = False
     602              structured_block_finished = False
     603              outer_code_reached = False
     604  
     605              def on_timeout():
     606                  nonlocal timed_out
     607                  timed_out = True
     608                  task.cancel()
     609  
     610              timeout_handle = loop.call_later(timeout, on_timeout)
     611              try:
     612                  try:
     613                      # Structured block affected by the timeout
     614                      await asyncio.sleep(sleep)
     615                      structured_block_finished = True
     616                  finally:
     617                      timeout_handle.cancel()
     618                      if (
     619                          timed_out
     620                          and task.uncancel() == 0
     621                          and type(sys.exception()) is asyncio.CancelledError
     622                      ):
     623                          # Note the five rules that are needed here to satisfy proper
     624                          # uncancellation:
     625                          #
     626                          # 1. handle uncancellation in a `finally:` block to allow for
     627                          #    plain returns;
     628                          # 2. our `timed_out` flag is set, meaning that it was our event
     629                          #    that triggered the need to uncancel the task, regardless of
     630                          #    what exception is raised;
     631                          # 3. we can call `uncancel()` because *we* called `cancel()`
     632                          #    before;
     633                          # 4. we call `uncancel()` but we only continue converting the
     634                          #    CancelledError to TimeoutError if `uncancel()` caused the
     635                          #    cancellation request count go down to 0.  We need to look
     636                          #    at the counter vs having a simple boolean flag because our
     637                          #    code might have been nested (think multiple timeouts). See
     638                          #    commit 7fce1063b6e5a366f8504e039a8ccdd6944625cd for
     639                          #    details.
     640                          # 5. we only convert CancelledError to TimeoutError; for other
     641                          #    exceptions raised due to the cancellation (like
     642                          #    a ConnectionLostError from a database client), simply
     643                          #    propagate them.
     644                          #
     645                          # Those checks need to take place in this exact order to make
     646                          # sure the `cancelling()` counter always stays in sync.
     647                          #
     648                          # Additionally, the original stimulus to `cancel()` the task
     649                          # needs to be unscheduled to avoid re-cancelling the task later.
     650                          # Here we do it by cancelling `timeout_handle` in the `finally:`
     651                          # block.
     652                          raise TimeoutError
     653              except TimeoutError:
     654                  self.assertTrue(timed_out)
     655  
     656              # Outer code not affected by the timeout:
     657              outer_code_reached = True
     658              await asyncio.sleep(0)
     659              return timed_out, structured_block_finished, outer_code_reached
     660  
     661          try:
     662              # Test which timed out.
     663              t1 = self.new_task(loop, make_request_with_timeout(sleep=10.0, timeout=0.1))
     664              timed_out, structured_block_finished, outer_code_reached = (
     665                  loop.run_until_complete(t1)
     666              )
     667              self.assertTrue(timed_out)
     668              self.assertFalse(structured_block_finished)  # it was cancelled
     669              self.assertTrue(outer_code_reached)  # task got uncancelled after leaving
     670                                                   # the structured block and continued until
     671                                                   # completion
     672              self.assertEqual(t1.cancelling(), 0) # no pending cancellation of the outer task
     673  
     674              # Test which did not time out.
     675              t2 = self.new_task(loop, make_request_with_timeout(sleep=0, timeout=10.0))
     676              timed_out, structured_block_finished, outer_code_reached = (
     677                  loop.run_until_complete(t2)
     678              )
     679              self.assertFalse(timed_out)
     680              self.assertTrue(structured_block_finished)
     681              self.assertTrue(outer_code_reached)
     682              self.assertEqual(t2.cancelling(), 0)
     683          finally:
     684              loop.close()
     685  
     686      def test_cancel(self):
     687  
     688          def gen():
     689              when = yield
     690              self.assertAlmostEqual(10.0, when)
     691              yield 0
     692  
     693          loop = self.new_test_loop(gen)
     694  
     695          async def task():
     696              await asyncio.sleep(10.0)
     697              return 12
     698  
     699          t = self.new_task(loop, task())
     700          loop.call_soon(t.cancel)
     701          with self.assertRaises(asyncio.CancelledError):
     702              loop.run_until_complete(t)
     703          self.assertTrue(t.done())
     704          self.assertTrue(t.cancelled())
     705          self.assertFalse(t.cancel())
     706  
     707      def test_cancel_with_message_then_future_result(self):
     708          # Test Future.result() after calling cancel() with a message.
     709          cases = [
     710              ((), ()),
     711              ((None,), ()),
     712              (('my message',), ('my message',)),
     713              # Non-string values should roundtrip.
     714              ((5,), (5,)),
     715          ]
     716          for cancel_args, expected_args in cases:
     717              with self.subTest(cancel_args=cancel_args):
     718                  loop = asyncio.new_event_loop()
     719                  self.set_event_loop(loop)
     720  
     721                  async def sleep():
     722                      await asyncio.sleep(10)
     723  
     724                  async def coro():
     725                      task = self.new_task(loop, sleep())
     726                      await asyncio.sleep(0)
     727                      task.cancel(*cancel_args)
     728                      done, pending = await asyncio.wait([task])
     729                      task.result()
     730  
     731                  task = self.new_task(loop, coro())
     732                  with self.assertRaises(asyncio.CancelledError) as cm:
     733                      loop.run_until_complete(task)
     734                  exc = cm.exception
     735                  self.assertEqual(exc.args, expected_args)
     736  
     737                  actual = get_innermost_context(exc)
     738                  self.assertEqual(actual,
     739                      (asyncio.CancelledError, expected_args, 0))
     740  
     741      def test_cancel_with_message_then_future_exception(self):
     742          # Test Future.exception() after calling cancel() with a message.
     743          cases = [
     744              ((), ()),
     745              ((None,), ()),
     746              (('my message',), ('my message',)),
     747              # Non-string values should roundtrip.
     748              ((5,), (5,)),
     749          ]
     750          for cancel_args, expected_args in cases:
     751              with self.subTest(cancel_args=cancel_args):
     752                  loop = asyncio.new_event_loop()
     753                  self.set_event_loop(loop)
     754  
     755                  async def sleep():
     756                      await asyncio.sleep(10)
     757  
     758                  async def coro():
     759                      task = self.new_task(loop, sleep())
     760                      await asyncio.sleep(0)
     761                      task.cancel(*cancel_args)
     762                      done, pending = await asyncio.wait([task])
     763                      task.exception()
     764  
     765                  task = self.new_task(loop, coro())
     766                  with self.assertRaises(asyncio.CancelledError) as cm:
     767                      loop.run_until_complete(task)
     768                  exc = cm.exception
     769                  self.assertEqual(exc.args, expected_args)
     770  
     771                  actual = get_innermost_context(exc)
     772                  self.assertEqual(actual,
     773                      (asyncio.CancelledError, expected_args, 0))
     774  
     775      def test_cancellation_exception_context(self):
     776          loop = asyncio.new_event_loop()
     777          self.set_event_loop(loop)
     778          fut = loop.create_future()
     779  
     780          async def sleep():
     781              fut.set_result(None)
     782              await asyncio.sleep(10)
     783  
     784          async def coro():
     785              inner_task = self.new_task(loop, sleep())
     786              await fut
     787              loop.call_soon(inner_task.cancel, 'msg')
     788              try:
     789                  await inner_task
     790              except asyncio.CancelledError as ex:
     791                  raise ValueError("cancelled") from ex
     792  
     793          task = self.new_task(loop, coro())
     794          with self.assertRaises(ValueError) as cm:
     795              loop.run_until_complete(task)
     796          exc = cm.exception
     797          self.assertEqual(exc.args, ('cancelled',))
     798  
     799          actual = get_innermost_context(exc)
     800          self.assertEqual(actual,
     801              (asyncio.CancelledError, ('msg',), 1))
     802  
     803      def test_cancel_with_message_before_starting_task(self):
     804          loop = asyncio.new_event_loop()
     805          self.set_event_loop(loop)
     806  
     807          async def sleep():
     808              await asyncio.sleep(10)
     809  
     810          async def coro():
     811              task = self.new_task(loop, sleep())
     812              # We deliberately leave out the sleep here.
     813              task.cancel('my message')
     814              done, pending = await asyncio.wait([task])
     815              task.exception()
     816  
     817          task = self.new_task(loop, coro())
     818          with self.assertRaises(asyncio.CancelledError) as cm:
     819              loop.run_until_complete(task)
     820          exc = cm.exception
     821          self.assertEqual(exc.args, ('my message',))
     822  
     823          actual = get_innermost_context(exc)
     824          self.assertEqual(actual,
     825              (asyncio.CancelledError, ('my message',), 0))
     826  
     827      def test_cancel_yield(self):
     828          async def task():
     829              await asyncio.sleep(0)
     830              await asyncio.sleep(0)
     831              return 12
     832  
     833          t = self.new_task(self.loop, task())
     834          test_utils.run_briefly(self.loop)  # start coro
     835          t.cancel()
     836          self.assertRaises(
     837              asyncio.CancelledError, self.loop.run_until_complete, t)
     838          self.assertTrue(t.done())
     839          self.assertTrue(t.cancelled())
     840          self.assertFalse(t.cancel())
     841  
     842      def test_cancel_inner_future(self):
     843          f = self.new_future(self.loop)
     844  
     845          async def task():
     846              await f
     847              return 12
     848  
     849          t = self.new_task(self.loop, task())
     850          test_utils.run_briefly(self.loop)  # start task
     851          f.cancel()
     852          with self.assertRaises(asyncio.CancelledError):
     853              self.loop.run_until_complete(t)
     854          self.assertTrue(f.cancelled())
     855          self.assertTrue(t.cancelled())
     856  
     857      def test_cancel_both_task_and_inner_future(self):
     858          f = self.new_future(self.loop)
     859  
     860          async def task():
     861              await f
     862              return 12
     863  
     864          t = self.new_task(self.loop, task())
     865          test_utils.run_briefly(self.loop)
     866  
     867          f.cancel()
     868          t.cancel()
     869  
     870          with self.assertRaises(asyncio.CancelledError):
     871              self.loop.run_until_complete(t)
     872  
     873          self.assertTrue(t.done())
     874          self.assertTrue(f.cancelled())
     875          self.assertTrue(t.cancelled())
     876  
     877      def test_cancel_task_catching(self):
     878          fut1 = self.new_future(self.loop)
     879          fut2 = self.new_future(self.loop)
     880  
     881          async def task():
     882              await fut1
     883              try:
     884                  await fut2
     885              except asyncio.CancelledError:
     886                  return 42
     887  
     888          t = self.new_task(self.loop, task())
     889          test_utils.run_briefly(self.loop)
     890          self.assertIs(t._fut_waiter, fut1)  # White-box test.
     891          fut1.set_result(None)
     892          test_utils.run_briefly(self.loop)
     893          self.assertIs(t._fut_waiter, fut2)  # White-box test.
     894          t.cancel()
     895          self.assertTrue(fut2.cancelled())
     896          res = self.loop.run_until_complete(t)
     897          self.assertEqual(res, 42)
     898          self.assertFalse(t.cancelled())
     899  
     900      def test_cancel_task_ignoring(self):
     901          fut1 = self.new_future(self.loop)
     902          fut2 = self.new_future(self.loop)
     903          fut3 = self.new_future(self.loop)
     904  
     905          async def task():
     906              await fut1
     907              try:
     908                  await fut2
     909              except asyncio.CancelledError:
     910                  pass
     911              res = await fut3
     912              return res
     913  
     914          t = self.new_task(self.loop, task())
     915          test_utils.run_briefly(self.loop)
     916          self.assertIs(t._fut_waiter, fut1)  # White-box test.
     917          fut1.set_result(None)
     918          test_utils.run_briefly(self.loop)
     919          self.assertIs(t._fut_waiter, fut2)  # White-box test.
     920          t.cancel()
     921          self.assertTrue(fut2.cancelled())
     922          test_utils.run_briefly(self.loop)
     923          self.assertIs(t._fut_waiter, fut3)  # White-box test.
     924          fut3.set_result(42)
     925          res = self.loop.run_until_complete(t)
     926          self.assertEqual(res, 42)
     927          self.assertFalse(fut3.cancelled())
     928          self.assertFalse(t.cancelled())
     929  
     930      def test_cancel_current_task(self):
     931          loop = asyncio.new_event_loop()
     932          self.set_event_loop(loop)
     933  
     934          async def task():
     935              t.cancel()
     936              self.assertTrue(t._must_cancel)  # White-box test.
     937              # The sleep should be cancelled immediately.
     938              await asyncio.sleep(100)
     939              return 12
     940  
     941          t = self.new_task(loop, task())
     942          self.assertFalse(t.cancelled())
     943          self.assertRaises(
     944              asyncio.CancelledError, loop.run_until_complete, t)
     945          self.assertTrue(t.done())
     946          self.assertTrue(t.cancelled())
     947          self.assertFalse(t._must_cancel)  # White-box test.
     948          self.assertFalse(t.cancel())
     949  
     950      def test_cancel_at_end(self):
     951          """coroutine end right after task is cancelled"""
     952          loop = asyncio.new_event_loop()
     953          self.set_event_loop(loop)
     954  
     955          async def task():
     956              t.cancel()
     957              self.assertTrue(t._must_cancel)  # White-box test.
     958              return 12
     959  
     960          t = self.new_task(loop, task())
     961          self.assertFalse(t.cancelled())
     962          self.assertRaises(
     963              asyncio.CancelledError, loop.run_until_complete, t)
     964          self.assertTrue(t.done())
     965          self.assertTrue(t.cancelled())
     966          self.assertFalse(t._must_cancel)  # White-box test.
     967          self.assertFalse(t.cancel())
     968  
     969      def test_cancel_awaited_task(self):
     970          # This tests for a relatively rare condition when
     971          # a task cancellation is requested for a task which is not
     972          # currently blocked, such as a task cancelling itself.
     973          # In this situation we must ensure that whatever next future
     974          # or task the cancelled task blocks on is cancelled correctly
     975          # as well.  See also bpo-34872.
     976          loop = asyncio.new_event_loop()
     977          self.addCleanup(lambda: loop.close())
     978  
     979          task = nested_task = None
     980          fut = self.new_future(loop)
     981  
     982          async def nested():
     983              await fut
     984  
     985          async def coro():
     986              nonlocal nested_task
     987              # Create a sub-task and wait for it to run.
     988              nested_task = self.new_task(loop, nested())
     989              await asyncio.sleep(0)
     990  
     991              # Request the current task to be cancelled.
     992              task.cancel()
     993              # Block on the nested task, which should be immediately
     994              # cancelled.
     995              await nested_task
     996  
     997          task = self.new_task(loop, coro())
     998          with self.assertRaises(asyncio.CancelledError):
     999              loop.run_until_complete(task)
    1000  
    1001          self.assertTrue(task.cancelled())
    1002          self.assertTrue(nested_task.cancelled())
    1003          self.assertTrue(fut.cancelled())
    1004  
    1005      def assert_text_contains(self, text, substr):
    1006          if substr not in text:
    1007              raise RuntimeError(f'text {substr!r} not found in:\n>>>{text}<<<')
    1008  
    1009      def test_cancel_traceback_for_future_result(self):
    1010          # When calling Future.result() on a cancelled task, check that the
    1011          # line of code that was interrupted is included in the traceback.
    1012          loop = asyncio.new_event_loop()
    1013          self.set_event_loop(loop)
    1014  
    1015          async def nested():
    1016              # This will get cancelled immediately.
    1017              await asyncio.sleep(10)
    1018  
    1019          async def coro():
    1020              task = self.new_task(loop, nested())
    1021              await asyncio.sleep(0)
    1022              task.cancel()
    1023              await task  # search target
    1024  
    1025          task = self.new_task(loop, coro())
    1026          try:
    1027              loop.run_until_complete(task)
    1028          except asyncio.CancelledError:
    1029              tb = traceback.format_exc()
    1030              self.assert_text_contains(tb, "await asyncio.sleep(10)")
    1031              # The intermediate await should also be included.
    1032              self.assert_text_contains(tb, "await task  # search target")
    1033          else:
    1034              self.fail('CancelledError did not occur')
    1035  
    1036      def test_cancel_traceback_for_future_exception(self):
    1037          # When calling Future.exception() on a cancelled task, check that the
    1038          # line of code that was interrupted is included in the traceback.
    1039          loop = asyncio.new_event_loop()
    1040          self.set_event_loop(loop)
    1041  
    1042          async def nested():
    1043              # This will get cancelled immediately.
    1044              await asyncio.sleep(10)
    1045  
    1046          async def coro():
    1047              task = self.new_task(loop, nested())
    1048              await asyncio.sleep(0)
    1049              task.cancel()
    1050              done, pending = await asyncio.wait([task])
    1051              task.exception()  # search target
    1052  
    1053          task = self.new_task(loop, coro())
    1054          try:
    1055              loop.run_until_complete(task)
    1056          except asyncio.CancelledError:
    1057              tb = traceback.format_exc()
    1058              self.assert_text_contains(tb, "await asyncio.sleep(10)")
    1059              # The intermediate await should also be included.
    1060              self.assert_text_contains(tb,
    1061                  "task.exception()  # search target")
    1062          else:
    1063              self.fail('CancelledError did not occur')
    1064  
    1065      def test_stop_while_run_in_complete(self):
    1066  
    1067          def gen():
    1068              when = yield
    1069              self.assertAlmostEqual(0.1, when)
    1070              when = yield 0.1
    1071              self.assertAlmostEqual(0.2, when)
    1072              when = yield 0.1
    1073              self.assertAlmostEqual(0.3, when)
    1074              yield 0.1
    1075  
    1076          loop = self.new_test_loop(gen)
    1077  
    1078          x = 0
    1079  
    1080          async def task():
    1081              nonlocal x
    1082              while x < 10:
    1083                  await asyncio.sleep(0.1)
    1084                  x += 1
    1085                  if x == 2:
    1086                      loop.stop()
    1087  
    1088          t = self.new_task(loop, task())
    1089          with self.assertRaises(RuntimeError) as cm:
    1090              loop.run_until_complete(t)
    1091          self.assertEqual(str(cm.exception),
    1092                           'Event loop stopped before Future completed.')
    1093          self.assertFalse(t.done())
    1094          self.assertEqual(x, 2)
    1095          self.assertAlmostEqual(0.3, loop.time())
    1096  
    1097          t.cancel()
    1098          self.assertRaises(asyncio.CancelledError, loop.run_until_complete, t)
    1099  
    1100      def test_log_traceback(self):
    1101          async def coro():
    1102              pass
    1103  
    1104          task = self.new_task(self.loop, coro())
    1105          with self.assertRaisesRegex(ValueError, 'can only be set to False'):
    1106              task._log_traceback = True
    1107          self.loop.run_until_complete(task)
    1108  
    1109      def test_wait(self):
    1110  
    1111          def gen():
    1112              when = yield
    1113              self.assertAlmostEqual(0.1, when)
    1114              when = yield 0
    1115              self.assertAlmostEqual(0.15, when)
    1116              yield 0.15
    1117  
    1118          loop = self.new_test_loop(gen)
    1119  
    1120          a = self.new_task(loop, asyncio.sleep(0.1))
    1121          b = self.new_task(loop, asyncio.sleep(0.15))
    1122  
    1123          async def foo():
    1124              done, pending = await asyncio.wait([b, a])
    1125              self.assertEqual(done, set([a, b]))
    1126              self.assertEqual(pending, set())
    1127              return 42
    1128  
    1129          res = loop.run_until_complete(self.new_task(loop, foo()))
    1130          self.assertEqual(res, 42)
    1131          self.assertAlmostEqual(0.15, loop.time())
    1132  
    1133          # Doing it again should take no time and exercise a different path.
    1134          res = loop.run_until_complete(self.new_task(loop, foo()))
    1135          self.assertAlmostEqual(0.15, loop.time())
    1136          self.assertEqual(res, 42)
    1137  
    1138      def test_wait_duplicate_coroutines(self):
    1139  
    1140          async def coro(s):
    1141              return s
    1142          c = self.loop.create_task(coro('test'))
    1143          task = self.new_task(
    1144              self.loop,
    1145              asyncio.wait([c, c, self.loop.create_task(coro('spam'))]))
    1146  
    1147          done, pending = self.loop.run_until_complete(task)
    1148  
    1149          self.assertFalse(pending)
    1150          self.assertEqual(set(f.result() for f in done), {'test', 'spam'})
    1151  
    1152      def test_wait_errors(self):
    1153          self.assertRaises(
    1154              ValueError, self.loop.run_until_complete,
    1155              asyncio.wait(set()))
    1156  
    1157          # -1 is an invalid return_when value
    1158          sleep_coro = asyncio.sleep(10.0)
    1159          wait_coro = asyncio.wait([sleep_coro], return_when=-1)
    1160          self.assertRaises(ValueError,
    1161                            self.loop.run_until_complete, wait_coro)
    1162  
    1163          sleep_coro.close()
    1164  
    1165      def test_wait_first_completed(self):
    1166  
    1167          def gen():
    1168              when = yield
    1169              self.assertAlmostEqual(10.0, when)
    1170              when = yield 0
    1171              self.assertAlmostEqual(0.1, when)
    1172              yield 0.1
    1173  
    1174          loop = self.new_test_loop(gen)
    1175  
    1176          a = self.new_task(loop, asyncio.sleep(10.0))
    1177          b = self.new_task(loop, asyncio.sleep(0.1))
    1178          task = self.new_task(
    1179              loop,
    1180              asyncio.wait([b, a], return_when=asyncio.FIRST_COMPLETED))
    1181  
    1182          done, pending = loop.run_until_complete(task)
    1183          self.assertEqual({b}, done)
    1184          self.assertEqual({a}, pending)
    1185          self.assertFalse(a.done())
    1186          self.assertTrue(b.done())
    1187          self.assertIsNone(b.result())
    1188          self.assertAlmostEqual(0.1, loop.time())
    1189  
    1190          # move forward to close generator
    1191          loop.advance_time(10)
    1192          loop.run_until_complete(asyncio.wait([a, b]))
    1193  
    1194      def test_wait_really_done(self):
    1195          # there is possibility that some tasks in the pending list
    1196          # became done but their callbacks haven't all been called yet
    1197  
    1198          async def coro1():
    1199              await asyncio.sleep(0)
    1200  
    1201          async def coro2():
    1202              await asyncio.sleep(0)
    1203              await asyncio.sleep(0)
    1204  
    1205          a = self.new_task(self.loop, coro1())
    1206          b = self.new_task(self.loop, coro2())
    1207          task = self.new_task(
    1208              self.loop,
    1209              asyncio.wait([b, a], return_when=asyncio.FIRST_COMPLETED))
    1210  
    1211          done, pending = self.loop.run_until_complete(task)
    1212          self.assertEqual({a, b}, done)
    1213          self.assertTrue(a.done())
    1214          self.assertIsNone(a.result())
    1215          self.assertTrue(b.done())
    1216          self.assertIsNone(b.result())
    1217  
    1218      def test_wait_first_exception(self):
    1219  
    1220          def gen():
    1221              when = yield
    1222              self.assertAlmostEqual(10.0, when)
    1223              yield 0
    1224  
    1225          loop = self.new_test_loop(gen)
    1226  
    1227          # first_exception, task already has exception
    1228          a = self.new_task(loop, asyncio.sleep(10.0))
    1229  
    1230          async def exc():
    1231              raise ZeroDivisionError('err')
    1232  
    1233          b = self.new_task(loop, exc())
    1234          task = self.new_task(
    1235              loop,
    1236              asyncio.wait([b, a], return_when=asyncio.FIRST_EXCEPTION))
    1237  
    1238          done, pending = loop.run_until_complete(task)
    1239          self.assertEqual({b}, done)
    1240          self.assertEqual({a}, pending)
    1241          self.assertAlmostEqual(0, loop.time())
    1242  
    1243          # move forward to close generator
    1244          loop.advance_time(10)
    1245          loop.run_until_complete(asyncio.wait([a, b]))
    1246  
    1247      def test_wait_first_exception_in_wait(self):
    1248  
    1249          def gen():
    1250              when = yield
    1251              self.assertAlmostEqual(10.0, when)
    1252              when = yield 0
    1253              self.assertAlmostEqual(0.01, when)
    1254              yield 0.01
    1255  
    1256          loop = self.new_test_loop(gen)
    1257  
    1258          # first_exception, exception during waiting
    1259          a = self.new_task(loop, asyncio.sleep(10.0))
    1260  
    1261          async def exc():
    1262              await asyncio.sleep(0.01)
    1263              raise ZeroDivisionError('err')
    1264  
    1265          b = self.new_task(loop, exc())
    1266          task = asyncio.wait([b, a], return_when=asyncio.FIRST_EXCEPTION)
    1267  
    1268          done, pending = loop.run_until_complete(task)
    1269          self.assertEqual({b}, done)
    1270          self.assertEqual({a}, pending)
    1271          self.assertAlmostEqual(0.01, loop.time())
    1272  
    1273          # move forward to close generator
    1274          loop.advance_time(10)
    1275          loop.run_until_complete(asyncio.wait([a, b]))
    1276  
    1277      def test_wait_with_exception(self):
    1278  
    1279          def gen():
    1280              when = yield
    1281              self.assertAlmostEqual(0.1, when)
    1282              when = yield 0
    1283              self.assertAlmostEqual(0.15, when)
    1284              yield 0.15
    1285  
    1286          loop = self.new_test_loop(gen)
    1287  
    1288          a = self.new_task(loop, asyncio.sleep(0.1))
    1289  
    1290          async def sleeper():
    1291              await asyncio.sleep(0.15)
    1292              raise ZeroDivisionError('really')
    1293  
    1294          b = self.new_task(loop, sleeper())
    1295  
    1296          async def foo():
    1297              done, pending = await asyncio.wait([b, a])
    1298              self.assertEqual(len(done), 2)
    1299              self.assertEqual(pending, set())
    1300              errors = set(f for f in done if f.exception() is not None)
    1301              self.assertEqual(len(errors), 1)
    1302  
    1303          loop.run_until_complete(self.new_task(loop, foo()))
    1304          self.assertAlmostEqual(0.15, loop.time())
    1305  
    1306          loop.run_until_complete(self.new_task(loop, foo()))
    1307          self.assertAlmostEqual(0.15, loop.time())
    1308  
    1309      def test_wait_with_timeout(self):
    1310  
    1311          def gen():
    1312              when = yield
    1313              self.assertAlmostEqual(0.1, when)
    1314              when = yield 0
    1315              self.assertAlmostEqual(0.15, when)
    1316              when = yield 0
    1317              self.assertAlmostEqual(0.11, when)
    1318              yield 0.11
    1319  
    1320          loop = self.new_test_loop(gen)
    1321  
    1322          a = self.new_task(loop, asyncio.sleep(0.1))
    1323          b = self.new_task(loop, asyncio.sleep(0.15))
    1324  
    1325          async def foo():
    1326              done, pending = await asyncio.wait([b, a], timeout=0.11)
    1327              self.assertEqual(done, set([a]))
    1328              self.assertEqual(pending, set([b]))
    1329  
    1330          loop.run_until_complete(self.new_task(loop, foo()))
    1331          self.assertAlmostEqual(0.11, loop.time())
    1332  
    1333          # move forward to close generator
    1334          loop.advance_time(10)
    1335          loop.run_until_complete(asyncio.wait([a, b]))
    1336  
    1337      def test_wait_concurrent_complete(self):
    1338  
    1339          def gen():
    1340              when = yield
    1341              self.assertAlmostEqual(0.1, when)
    1342              when = yield 0
    1343              self.assertAlmostEqual(0.15, when)
    1344              when = yield 0
    1345              self.assertAlmostEqual(0.1, when)
    1346              yield 0.1
    1347  
    1348          loop = self.new_test_loop(gen)
    1349  
    1350          a = self.new_task(loop, asyncio.sleep(0.1))
    1351          b = self.new_task(loop, asyncio.sleep(0.15))
    1352  
    1353          done, pending = loop.run_until_complete(
    1354              asyncio.wait([b, a], timeout=0.1))
    1355  
    1356          self.assertEqual(done, set([a]))
    1357          self.assertEqual(pending, set([b]))
    1358          self.assertAlmostEqual(0.1, loop.time())
    1359  
    1360          # move forward to close generator
    1361          loop.advance_time(10)
    1362          loop.run_until_complete(asyncio.wait([a, b]))
    1363  
    1364      def test_wait_with_iterator_of_tasks(self):
    1365  
    1366          def gen():
    1367              when = yield
    1368              self.assertAlmostEqual(0.1, when)
    1369              when = yield 0
    1370              self.assertAlmostEqual(0.15, when)
    1371              yield 0.15
    1372  
    1373          loop = self.new_test_loop(gen)
    1374  
    1375          a = self.new_task(loop, asyncio.sleep(0.1))
    1376          b = self.new_task(loop, asyncio.sleep(0.15))
    1377  
    1378          async def foo():
    1379              done, pending = await asyncio.wait(iter([b, a]))
    1380              self.assertEqual(done, set([a, b]))
    1381              self.assertEqual(pending, set())
    1382              return 42
    1383  
    1384          res = loop.run_until_complete(self.new_task(loop, foo()))
    1385          self.assertEqual(res, 42)
    1386          self.assertAlmostEqual(0.15, loop.time())
    1387  
    1388  
    1389      def test_wait_generator(self):
    1390          async def func(a):
    1391              return a
    1392  
    1393          loop = self.new_test_loop()
    1394  
    1395          async def main():
    1396              tasks = (self.new_task(loop, func(i)) for i in range(10))
    1397              done, pending = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
    1398              self.assertEqual(len(done), 10)
    1399              self.assertEqual(len(pending), 0)
    1400  
    1401          loop.run_until_complete(main())
    1402  
    1403  
    1404      def test_as_completed(self):
    1405  
    1406          def gen():
    1407              yield 0
    1408              yield 0
    1409              yield 0.01
    1410              yield 0
    1411  
    1412          loop = self.new_test_loop(gen)
    1413          # disable "slow callback" warning
    1414          loop.slow_callback_duration = 1.0
    1415          completed = set()
    1416          time_shifted = False
    1417  
    1418          async def sleeper(dt, x):
    1419              nonlocal time_shifted
    1420              await asyncio.sleep(dt)
    1421              completed.add(x)
    1422              if not time_shifted and 'a' in completed and 'b' in completed:
    1423                  time_shifted = True
    1424                  loop.advance_time(0.14)
    1425              return x
    1426  
    1427          a = sleeper(0.01, 'a')
    1428          b = sleeper(0.01, 'b')
    1429          c = sleeper(0.15, 'c')
    1430  
    1431          async def foo():
    1432              values = []
    1433              for f in asyncio.as_completed([b, c, a]):
    1434                  values.append(await f)
    1435              return values
    1436  
    1437          res = loop.run_until_complete(self.new_task(loop, foo()))
    1438          self.assertAlmostEqual(0.15, loop.time())
    1439          self.assertTrue('a' in res[:2])
    1440          self.assertTrue('b' in res[:2])
    1441          self.assertEqual(res[2], 'c')
    1442  
    1443      def test_as_completed_with_timeout(self):
    1444  
    1445          def gen():
    1446              yield
    1447              yield 0
    1448              yield 0
    1449              yield 0.1
    1450  
    1451          loop = self.new_test_loop(gen)
    1452  
    1453          a = loop.create_task(asyncio.sleep(0.1, 'a'))
    1454          b = loop.create_task(asyncio.sleep(0.15, 'b'))
    1455  
    1456          async def foo():
    1457              values = []
    1458              for f in asyncio.as_completed([a, b], timeout=0.12):
    1459                  if values:
    1460                      loop.advance_time(0.02)
    1461                  try:
    1462                      v = await f
    1463                      values.append((1, v))
    1464                  except asyncio.TimeoutError as exc:
    1465                      values.append((2, exc))
    1466              return values
    1467  
    1468          res = loop.run_until_complete(self.new_task(loop, foo()))
    1469          self.assertEqual(len(res), 2, res)
    1470          self.assertEqual(res[0], (1, 'a'))
    1471          self.assertEqual(res[1][0], 2)
    1472          self.assertIsInstance(res[1][1], asyncio.TimeoutError)
    1473          self.assertAlmostEqual(0.12, loop.time())
    1474  
    1475          # move forward to close generator
    1476          loop.advance_time(10)
    1477          loop.run_until_complete(asyncio.wait([a, b]))
    1478  
    1479      def test_as_completed_with_unused_timeout(self):
    1480  
    1481          def gen():
    1482              yield
    1483              yield 0
    1484              yield 0.01
    1485  
    1486          loop = self.new_test_loop(gen)
    1487  
    1488          a = asyncio.sleep(0.01, 'a')
    1489  
    1490          async def foo():
    1491              for f in asyncio.as_completed([a], timeout=1):
    1492                  v = await f
    1493                  self.assertEqual(v, 'a')
    1494  
    1495          loop.run_until_complete(self.new_task(loop, foo()))
    1496  
    1497      def test_as_completed_reverse_wait(self):
    1498  
    1499          def gen():
    1500              yield 0
    1501              yield 0.05
    1502              yield 0
    1503  
    1504          loop = self.new_test_loop(gen)
    1505  
    1506          a = asyncio.sleep(0.05, 'a')
    1507          b = asyncio.sleep(0.10, 'b')
    1508          fs = {a, b}
    1509  
    1510          async def test():
    1511              futs = list(asyncio.as_completed(fs))
    1512              self.assertEqual(len(futs), 2)
    1513  
    1514              x = await futs[1]
    1515              self.assertEqual(x, 'a')
    1516              self.assertAlmostEqual(0.05, loop.time())
    1517              loop.advance_time(0.05)
    1518              y = await futs[0]
    1519              self.assertEqual(y, 'b')
    1520              self.assertAlmostEqual(0.10, loop.time())
    1521  
    1522          loop.run_until_complete(test())
    1523  
    1524      def test_as_completed_concurrent(self):
    1525  
    1526          def gen():
    1527              when = yield
    1528              self.assertAlmostEqual(0.05, when)
    1529              when = yield 0
    1530              self.assertAlmostEqual(0.05, when)
    1531              yield 0.05
    1532  
    1533          a = asyncio.sleep(0.05, 'a')
    1534          b = asyncio.sleep(0.05, 'b')
    1535          fs = {a, b}
    1536  
    1537          async def test():
    1538              futs = list(asyncio.as_completed(fs))
    1539              self.assertEqual(len(futs), 2)
    1540              done, pending = await asyncio.wait(
    1541                  [asyncio.ensure_future(fut) for fut in futs]
    1542              )
    1543              self.assertEqual(set(f.result() for f in done), {'a', 'b'})
    1544  
    1545          loop = self.new_test_loop(gen)
    1546          loop.run_until_complete(test())
    1547  
    1548      def test_as_completed_duplicate_coroutines(self):
    1549  
    1550          async def coro(s):
    1551              return s
    1552  
    1553          async def runner():
    1554              result = []
    1555              c = coro('ham')
    1556              for f in asyncio.as_completed([c, c, coro('spam')]):
    1557                  result.append(await f)
    1558              return result
    1559  
    1560          fut = self.new_task(self.loop, runner())
    1561          self.loop.run_until_complete(fut)
    1562          result = fut.result()
    1563          self.assertEqual(set(result), {'ham', 'spam'})
    1564          self.assertEqual(len(result), 2)
    1565  
    1566      def test_as_completed_coroutine_without_loop(self):
    1567          async def coro():
    1568              return 42
    1569  
    1570          a = coro()
    1571          self.addCleanup(a.close)
    1572  
    1573          futs = asyncio.as_completed([a])
    1574          with self.assertRaisesRegex(RuntimeError, 'no current event loop'):
    1575              list(futs)
    1576  
    1577      def test_as_completed_coroutine_use_running_loop(self):
    1578          loop = self.new_test_loop()
    1579  
    1580          async def coro():
    1581              return 42
    1582  
    1583          async def test():
    1584              futs = list(asyncio.as_completed([coro()]))
    1585              self.assertEqual(len(futs), 1)
    1586              self.assertEqual(await futs[0], 42)
    1587  
    1588          loop.run_until_complete(test())
    1589  
    1590      def test_sleep(self):
    1591  
    1592          def gen():
    1593              when = yield
    1594              self.assertAlmostEqual(0.05, when)
    1595              when = yield 0.05
    1596              self.assertAlmostEqual(0.1, when)
    1597              yield 0.05
    1598  
    1599          loop = self.new_test_loop(gen)
    1600  
    1601          async def sleeper(dt, arg):
    1602              await asyncio.sleep(dt/2)
    1603              res = await asyncio.sleep(dt/2, arg)
    1604              return res
    1605  
    1606          t = self.new_task(loop, sleeper(0.1, 'yeah'))
    1607          loop.run_until_complete(t)
    1608          self.assertTrue(t.done())
    1609          self.assertEqual(t.result(), 'yeah')
    1610          self.assertAlmostEqual(0.1, loop.time())
    1611  
    1612      def test_sleep_cancel(self):
    1613  
    1614          def gen():
    1615              when = yield
    1616              self.assertAlmostEqual(10.0, when)
    1617              yield 0
    1618  
    1619          loop = self.new_test_loop(gen)
    1620  
    1621          t = self.new_task(loop, asyncio.sleep(10.0, 'yeah'))
    1622  
    1623          handle = None
    1624          orig_call_later = loop.call_later
    1625  
    1626          def call_later(delay, callback, *args):
    1627              nonlocal handle
    1628              handle = orig_call_later(delay, callback, *args)
    1629              return handle
    1630  
    1631          loop.call_later = call_later
    1632          test_utils.run_briefly(loop)
    1633  
    1634          self.assertFalse(handle._cancelled)
    1635  
    1636          t.cancel()
    1637          test_utils.run_briefly(loop)
    1638          self.assertTrue(handle._cancelled)
    1639  
    1640      def test_task_cancel_sleeping_task(self):
    1641  
    1642          def gen():
    1643              when = yield
    1644              self.assertAlmostEqual(0.1, when)
    1645              when = yield 0
    1646              self.assertAlmostEqual(5000, when)
    1647              yield 0.1
    1648  
    1649          loop = self.new_test_loop(gen)
    1650  
    1651          async def sleep(dt):
    1652              await asyncio.sleep(dt)
    1653  
    1654          async def doit():
    1655              sleeper = self.new_task(loop, sleep(5000))
    1656              loop.call_later(0.1, sleeper.cancel)
    1657              try:
    1658                  await sleeper
    1659              except asyncio.CancelledError:
    1660                  return 'cancelled'
    1661              else:
    1662                  return 'slept in'
    1663  
    1664          doer = doit()
    1665          self.assertEqual(loop.run_until_complete(doer), 'cancelled')
    1666          self.assertAlmostEqual(0.1, loop.time())
    1667  
    1668      def test_task_cancel_waiter_future(self):
    1669          fut = self.new_future(self.loop)
    1670  
    1671          async def coro():
    1672              await fut
    1673  
    1674          task = self.new_task(self.loop, coro())
    1675          test_utils.run_briefly(self.loop)
    1676          self.assertIs(task._fut_waiter, fut)
    1677  
    1678          task.cancel()
    1679          test_utils.run_briefly(self.loop)
    1680          self.assertRaises(
    1681              asyncio.CancelledError, self.loop.run_until_complete, task)
    1682          self.assertIsNone(task._fut_waiter)
    1683          self.assertTrue(fut.cancelled())
    1684  
    1685      def test_task_set_methods(self):
    1686          async def notmuch():
    1687              return 'ko'
    1688  
    1689          gen = notmuch()
    1690          task = self.new_task(self.loop, gen)
    1691  
    1692          with self.assertRaisesRegex(RuntimeError, 'not support set_result'):
    1693              task.set_result('ok')
    1694  
    1695          with self.assertRaisesRegex(RuntimeError, 'not support set_exception'):
    1696              task.set_exception(ValueError())
    1697  
    1698          self.assertEqual(
    1699              self.loop.run_until_complete(task),
    1700              'ko')
    1701  
    1702      def test_step_result_future(self):
    1703          # If coroutine returns future, task waits on this future.
    1704  
    1705          class ESC[4;38;5;81mFut(ESC[4;38;5;149masyncioESC[4;38;5;149m.ESC[4;38;5;149mFuture):
    1706              def __init__(self, *args, **kwds):
    1707                  self.cb_added = False
    1708                  super().__init__(*args, **kwds)
    1709  
    1710              def add_done_callback(self, *args, **kwargs):
    1711                  self.cb_added = True
    1712                  super().add_done_callback(*args, **kwargs)
    1713  
    1714          fut = Fut(loop=self.loop)
    1715          result = None
    1716  
    1717          async def wait_for_future():
    1718              nonlocal result
    1719              result = await fut
    1720  
    1721          t = self.new_task(self.loop, wait_for_future())
    1722          test_utils.run_briefly(self.loop)
    1723          self.assertTrue(fut.cb_added)
    1724  
    1725          res = object()
    1726          fut.set_result(res)
    1727          test_utils.run_briefly(self.loop)
    1728          self.assertIs(res, result)
    1729          self.assertTrue(t.done())
    1730          self.assertIsNone(t.result())
    1731  
    1732      def test_baseexception_during_cancel(self):
    1733  
    1734          def gen():
    1735              when = yield
    1736              self.assertAlmostEqual(10.0, when)
    1737              yield 0
    1738  
    1739          loop = self.new_test_loop(gen)
    1740  
    1741          async def sleeper():
    1742              await asyncio.sleep(10)
    1743  
    1744          base_exc = SystemExit()
    1745  
    1746          async def notmutch():
    1747              try:
    1748                  await sleeper()
    1749              except asyncio.CancelledError:
    1750                  raise base_exc
    1751  
    1752          task = self.new_task(loop, notmutch())
    1753          test_utils.run_briefly(loop)
    1754  
    1755          task.cancel()
    1756          self.assertFalse(task.done())
    1757  
    1758          self.assertRaises(SystemExit, test_utils.run_briefly, loop)
    1759  
    1760          self.assertTrue(task.done())
    1761          self.assertFalse(task.cancelled())
    1762          self.assertIs(task.exception(), base_exc)
    1763  
    1764      def test_iscoroutinefunction(self):
    1765          def fn():
    1766              pass
    1767  
    1768          self.assertFalse(asyncio.iscoroutinefunction(fn))
    1769  
    1770          def fn1():
    1771              yield
    1772          self.assertFalse(asyncio.iscoroutinefunction(fn1))
    1773  
    1774          async def fn2():
    1775              pass
    1776          self.assertTrue(asyncio.iscoroutinefunction(fn2))
    1777  
    1778          self.assertFalse(asyncio.iscoroutinefunction(mock.Mock()))
    1779          self.assertTrue(asyncio.iscoroutinefunction(mock.AsyncMock()))
    1780  
    1781      def test_coroutine_non_gen_function(self):
    1782          async def func():
    1783              return 'test'
    1784  
    1785          self.assertTrue(asyncio.iscoroutinefunction(func))
    1786  
    1787          coro = func()
    1788          self.assertTrue(asyncio.iscoroutine(coro))
    1789  
    1790          res = self.loop.run_until_complete(coro)
    1791          self.assertEqual(res, 'test')
    1792  
    1793      def test_coroutine_non_gen_function_return_future(self):
    1794          fut = self.new_future(self.loop)
    1795  
    1796          async def func():
    1797              return fut
    1798  
    1799          async def coro():
    1800              fut.set_result('test')
    1801  
    1802          t1 = self.new_task(self.loop, func())
    1803          t2 = self.new_task(self.loop, coro())
    1804          res = self.loop.run_until_complete(t1)
    1805          self.assertEqual(res, fut)
    1806          self.assertIsNone(t2.result())
    1807  
    1808      def test_current_task(self):
    1809          self.assertIsNone(asyncio.current_task(loop=self.loop))
    1810  
    1811          async def coro(loop):
    1812              self.assertIs(asyncio.current_task(), task)
    1813  
    1814              self.assertIs(asyncio.current_task(None), task)
    1815              self.assertIs(asyncio.current_task(), task)
    1816  
    1817          task = self.new_task(self.loop, coro(self.loop))
    1818          self.loop.run_until_complete(task)
    1819          self.assertIsNone(asyncio.current_task(loop=self.loop))
    1820  
    1821      def test_current_task_with_interleaving_tasks(self):
    1822          self.assertIsNone(asyncio.current_task(loop=self.loop))
    1823  
    1824          fut1 = self.new_future(self.loop)
    1825          fut2 = self.new_future(self.loop)
    1826  
    1827          async def coro1(loop):
    1828              self.assertTrue(asyncio.current_task() is task1)
    1829              await fut1
    1830              self.assertTrue(asyncio.current_task() is task1)
    1831              fut2.set_result(True)
    1832  
    1833          async def coro2(loop):
    1834              self.assertTrue(asyncio.current_task() is task2)
    1835              fut1.set_result(True)
    1836              await fut2
    1837              self.assertTrue(asyncio.current_task() is task2)
    1838  
    1839          task1 = self.new_task(self.loop, coro1(self.loop))
    1840          task2 = self.new_task(self.loop, coro2(self.loop))
    1841  
    1842          self.loop.run_until_complete(asyncio.wait((task1, task2)))
    1843          self.assertIsNone(asyncio.current_task(loop=self.loop))
    1844  
    1845      # Some thorough tests for cancellation propagation through
    1846      # coroutines, tasks and wait().
    1847  
    1848      def test_yield_future_passes_cancel(self):
    1849          # Cancelling outer() cancels inner() cancels waiter.
    1850          proof = 0
    1851          waiter = self.new_future(self.loop)
    1852  
    1853          async def inner():
    1854              nonlocal proof
    1855              try:
    1856                  await waiter
    1857              except asyncio.CancelledError:
    1858                  proof += 1
    1859                  raise
    1860              else:
    1861                  self.fail('got past sleep() in inner()')
    1862  
    1863          async def outer():
    1864              nonlocal proof
    1865              try:
    1866                  await inner()
    1867              except asyncio.CancelledError:
    1868                  proof += 100  # Expect this path.
    1869              else:
    1870                  proof += 10
    1871  
    1872          f = asyncio.ensure_future(outer(), loop=self.loop)
    1873          test_utils.run_briefly(self.loop)
    1874          f.cancel()
    1875          self.loop.run_until_complete(f)
    1876          self.assertEqual(proof, 101)
    1877          self.assertTrue(waiter.cancelled())
    1878  
    1879      def test_yield_wait_does_not_shield_cancel(self):
    1880          # Cancelling outer() makes wait() return early, leaves inner()
    1881          # running.
    1882          proof = 0
    1883          waiter = self.new_future(self.loop)
    1884  
    1885          async def inner():
    1886              nonlocal proof
    1887              await waiter
    1888              proof += 1
    1889  
    1890          async def outer():
    1891              nonlocal proof
    1892              with self.assertWarns(DeprecationWarning):
    1893                  d, p = await asyncio.wait([asyncio.create_task(inner())])
    1894              proof += 100
    1895  
    1896          f = asyncio.ensure_future(outer(), loop=self.loop)
    1897          test_utils.run_briefly(self.loop)
    1898          f.cancel()
    1899          self.assertRaises(
    1900              asyncio.CancelledError, self.loop.run_until_complete, f)
    1901          waiter.set_result(None)
    1902          test_utils.run_briefly(self.loop)
    1903          self.assertEqual(proof, 1)
    1904  
    1905      def test_shield_result(self):
    1906          inner = self.new_future(self.loop)
    1907          outer = asyncio.shield(inner)
    1908          inner.set_result(42)
    1909          res = self.loop.run_until_complete(outer)
    1910          self.assertEqual(res, 42)
    1911  
    1912      def test_shield_exception(self):
    1913          inner = self.new_future(self.loop)
    1914          outer = asyncio.shield(inner)
    1915          test_utils.run_briefly(self.loop)
    1916          exc = RuntimeError('expected')
    1917          inner.set_exception(exc)
    1918          test_utils.run_briefly(self.loop)
    1919          self.assertIs(outer.exception(), exc)
    1920  
    1921      def test_shield_cancel_inner(self):
    1922          inner = self.new_future(self.loop)
    1923          outer = asyncio.shield(inner)
    1924          test_utils.run_briefly(self.loop)
    1925          inner.cancel()
    1926          test_utils.run_briefly(self.loop)
    1927          self.assertTrue(outer.cancelled())
    1928  
    1929      def test_shield_cancel_outer(self):
    1930          inner = self.new_future(self.loop)
    1931          outer = asyncio.shield(inner)
    1932          test_utils.run_briefly(self.loop)
    1933          outer.cancel()
    1934          test_utils.run_briefly(self.loop)
    1935          self.assertTrue(outer.cancelled())
    1936          self.assertEqual(0, 0 if outer._callbacks is None else len(outer._callbacks))
    1937  
    1938      def test_shield_shortcut(self):
    1939          fut = self.new_future(self.loop)
    1940          fut.set_result(42)
    1941          res = self.loop.run_until_complete(asyncio.shield(fut))
    1942          self.assertEqual(res, 42)
    1943  
    1944      def test_shield_effect(self):
    1945          # Cancelling outer() does not affect inner().
    1946          proof = 0
    1947          waiter = self.new_future(self.loop)
    1948  
    1949          async def inner():
    1950              nonlocal proof
    1951              await waiter
    1952              proof += 1
    1953  
    1954          async def outer():
    1955              nonlocal proof
    1956              await asyncio.shield(inner())
    1957              proof += 100
    1958  
    1959          f = asyncio.ensure_future(outer(), loop=self.loop)
    1960          test_utils.run_briefly(self.loop)
    1961          f.cancel()
    1962          with self.assertRaises(asyncio.CancelledError):
    1963              self.loop.run_until_complete(f)
    1964          waiter.set_result(None)
    1965          test_utils.run_briefly(self.loop)
    1966          self.assertEqual(proof, 1)
    1967  
    1968      def test_shield_gather(self):
    1969          child1 = self.new_future(self.loop)
    1970          child2 = self.new_future(self.loop)
    1971          parent = asyncio.gather(child1, child2)
    1972          outer = asyncio.shield(parent)
    1973          test_utils.run_briefly(self.loop)
    1974          outer.cancel()
    1975          test_utils.run_briefly(self.loop)
    1976          self.assertTrue(outer.cancelled())
    1977          child1.set_result(1)
    1978          child2.set_result(2)
    1979          test_utils.run_briefly(self.loop)
    1980          self.assertEqual(parent.result(), [1, 2])
    1981  
    1982      def test_gather_shield(self):
    1983          child1 = self.new_future(self.loop)
    1984          child2 = self.new_future(self.loop)
    1985          inner1 = asyncio.shield(child1)
    1986          inner2 = asyncio.shield(child2)
    1987          parent = asyncio.gather(inner1, inner2)
    1988          test_utils.run_briefly(self.loop)
    1989          parent.cancel()
    1990          # This should cancel inner1 and inner2 but bot child1 and child2.
    1991          test_utils.run_briefly(self.loop)
    1992          self.assertIsInstance(parent.exception(), asyncio.CancelledError)
    1993          self.assertTrue(inner1.cancelled())
    1994          self.assertTrue(inner2.cancelled())
    1995          child1.set_result(1)
    1996          child2.set_result(2)
    1997          test_utils.run_briefly(self.loop)
    1998  
    1999      def test_shield_coroutine_without_loop(self):
    2000          async def coro():
    2001              return 42
    2002  
    2003          inner = coro()
    2004          self.addCleanup(inner.close)
    2005          with self.assertRaisesRegex(RuntimeError, 'no current event loop'):
    2006              asyncio.shield(inner)
    2007  
    2008      def test_shield_coroutine_use_running_loop(self):
    2009          async def coro():
    2010              return 42
    2011  
    2012          async def test():
    2013              return asyncio.shield(coro())
    2014          outer = self.loop.run_until_complete(test())
    2015          self.assertEqual(outer._loop, self.loop)
    2016          res = self.loop.run_until_complete(outer)
    2017          self.assertEqual(res, 42)
    2018  
    2019      def test_shield_coroutine_use_global_loop(self):
    2020          # Deprecated in 3.10, undeprecated in 3.12
    2021          async def coro():
    2022              return 42
    2023  
    2024          asyncio.set_event_loop(self.loop)
    2025          self.addCleanup(asyncio.set_event_loop, None)
    2026          outer = asyncio.shield(coro())
    2027          self.assertEqual(outer._loop, self.loop)
    2028          res = self.loop.run_until_complete(outer)
    2029          self.assertEqual(res, 42)
    2030  
    2031      def test_as_completed_invalid_args(self):
    2032          fut = self.new_future(self.loop)
    2033  
    2034          # as_completed() expects a list of futures, not a future instance
    2035          self.assertRaises(TypeError, self.loop.run_until_complete,
    2036              asyncio.as_completed(fut))
    2037          coro = coroutine_function()
    2038          self.assertRaises(TypeError, self.loop.run_until_complete,
    2039              asyncio.as_completed(coro))
    2040          coro.close()
    2041  
    2042      def test_wait_invalid_args(self):
    2043          fut = self.new_future(self.loop)
    2044  
    2045          # wait() expects a list of futures, not a future instance
    2046          self.assertRaises(TypeError, self.loop.run_until_complete,
    2047              asyncio.wait(fut))
    2048          coro = coroutine_function()
    2049          self.assertRaises(TypeError, self.loop.run_until_complete,
    2050              asyncio.wait(coro))
    2051          coro.close()
    2052  
    2053          # wait() expects at least a future
    2054          self.assertRaises(ValueError, self.loop.run_until_complete,
    2055              asyncio.wait([]))
    2056  
    2057      def test_log_destroyed_pending_task(self):
    2058          Task = self.__class__.Task
    2059  
    2060          async def kill_me(loop):
    2061              future = self.new_future(loop)
    2062              await future
    2063              # at this point, the only reference to kill_me() task is
    2064              # the Task._wakeup() method in future._callbacks
    2065              raise Exception("code never reached")
    2066  
    2067          mock_handler = mock.Mock()
    2068          self.loop.set_debug(True)
    2069          self.loop.set_exception_handler(mock_handler)
    2070  
    2071          # schedule the task
    2072          coro = kill_me(self.loop)
    2073          task = asyncio.ensure_future(coro, loop=self.loop)
    2074  
    2075          self.assertEqual(asyncio.all_tasks(loop=self.loop), {task})
    2076  
    2077          asyncio.set_event_loop(None)
    2078  
    2079          # execute the task so it waits for future
    2080          self.loop._run_once()
    2081          self.assertEqual(len(self.loop._ready), 0)
    2082  
    2083          coro = None
    2084          source_traceback = task._source_traceback
    2085          task = None
    2086  
    2087          # no more reference to kill_me() task: the task is destroyed by the GC
    2088          support.gc_collect()
    2089  
    2090          self.assertEqual(asyncio.all_tasks(loop=self.loop), set())
    2091  
    2092          mock_handler.assert_called_with(self.loop, {
    2093              'message': 'Task was destroyed but it is pending!',
    2094              'task': mock.ANY,
    2095              'source_traceback': source_traceback,
    2096          })
    2097          mock_handler.reset_mock()
    2098  
    2099      @mock.patch('asyncio.base_events.logger')
    2100      def test_tb_logger_not_called_after_cancel(self, m_log):
    2101          loop = asyncio.new_event_loop()
    2102          self.set_event_loop(loop)
    2103  
    2104          async def coro():
    2105              raise TypeError
    2106  
    2107          async def runner():
    2108              task = self.new_task(loop, coro())
    2109              await asyncio.sleep(0.05)
    2110              task.cancel()
    2111              task = None
    2112  
    2113          loop.run_until_complete(runner())
    2114          self.assertFalse(m_log.error.called)
    2115  
    2116      def test_task_source_traceback(self):
    2117          self.loop.set_debug(True)
    2118  
    2119          task = self.new_task(self.loop, coroutine_function())
    2120          lineno = sys._getframe().f_lineno - 1
    2121          self.assertIsInstance(task._source_traceback, list)
    2122          self.assertEqual(task._source_traceback[-2][:3],
    2123                           (__file__,
    2124                            lineno,
    2125                            'test_task_source_traceback'))
    2126          self.loop.run_until_complete(task)
    2127  
    2128      def test_cancel_gather_1(self):
    2129          """Ensure that a gathering future refuses to be cancelled once all
    2130          children are done"""
    2131          loop = asyncio.new_event_loop()
    2132          self.addCleanup(loop.close)
    2133  
    2134          fut = self.new_future(loop)
    2135          async def create():
    2136              # The indirection fut->child_coro is needed since otherwise the
    2137              # gathering task is done at the same time as the child future
    2138              async def child_coro():
    2139                  return await fut
    2140              gather_future = asyncio.gather(child_coro())
    2141              return asyncio.ensure_future(gather_future)
    2142          gather_task = loop.run_until_complete(create())
    2143  
    2144          cancel_result = None
    2145          def cancelling_callback(_):
    2146              nonlocal cancel_result
    2147              cancel_result = gather_task.cancel()
    2148          fut.add_done_callback(cancelling_callback)
    2149  
    2150          fut.set_result(42) # calls the cancelling_callback after fut is done()
    2151  
    2152          # At this point the task should complete.
    2153          loop.run_until_complete(gather_task)
    2154  
    2155          # Python issue #26923: asyncio.gather drops cancellation
    2156          self.assertEqual(cancel_result, False)
    2157          self.assertFalse(gather_task.cancelled())
    2158          self.assertEqual(gather_task.result(), [42])
    2159  
    2160      def test_cancel_gather_2(self):
    2161          cases = [
    2162              ((), ()),
    2163              ((None,), ()),
    2164              (('my message',), ('my message',)),
    2165              # Non-string values should roundtrip.
    2166              ((5,), (5,)),
    2167          ]
    2168          for cancel_args, expected_args in cases:
    2169              with self.subTest(cancel_args=cancel_args):
    2170                  loop = asyncio.new_event_loop()
    2171                  self.addCleanup(loop.close)
    2172  
    2173                  async def test():
    2174                      time = 0
    2175                      while True:
    2176                          time += 0.05
    2177                          await asyncio.gather(asyncio.sleep(0.05),
    2178                                               return_exceptions=True)
    2179                          if time > 1:
    2180                              return
    2181  
    2182                  async def main():
    2183                      qwe = self.new_task(loop, test())
    2184                      await asyncio.sleep(0.2)
    2185                      qwe.cancel(*cancel_args)
    2186                      await qwe
    2187  
    2188                  try:
    2189                      loop.run_until_complete(main())
    2190                  except asyncio.CancelledError as exc:
    2191                      self.assertEqual(exc.args, expected_args)
    2192                      actual = get_innermost_context(exc)
    2193                      self.assertEqual(
    2194                          actual,
    2195                          (asyncio.CancelledError, expected_args, 0),
    2196                      )
    2197                  else:
    2198                      self.fail(
    2199                          'gather() does not propagate CancelledError '
    2200                          'raised by inner task to the gather() caller.'
    2201                      )
    2202  
    2203      def test_exception_traceback(self):
    2204          # See http://bugs.python.org/issue28843
    2205  
    2206          async def foo():
    2207              1 / 0
    2208  
    2209          async def main():
    2210              task = self.new_task(self.loop, foo())
    2211              await asyncio.sleep(0)  # skip one loop iteration
    2212              self.assertIsNotNone(task.exception().__traceback__)
    2213  
    2214          self.loop.run_until_complete(main())
    2215  
    2216      @mock.patch('asyncio.base_events.logger')
    2217      def test_error_in_call_soon(self, m_log):
    2218          def call_soon(callback, *args, **kwargs):
    2219              raise ValueError
    2220          self.loop.call_soon = call_soon
    2221  
    2222          async def coro():
    2223              pass
    2224  
    2225          self.assertFalse(m_log.error.called)
    2226  
    2227          with self.assertRaises(ValueError):
    2228              gen = coro()
    2229              try:
    2230                  self.new_task(self.loop, gen)
    2231              finally:
    2232                  gen.close()
    2233          gc.collect()  # For PyPy or other GCs.
    2234  
    2235          self.assertTrue(m_log.error.called)
    2236          message = m_log.error.call_args[0][0]
    2237          self.assertIn('Task was destroyed but it is pending', message)
    2238  
    2239          self.assertEqual(asyncio.all_tasks(self.loop), set())
    2240  
    2241      def test_create_task_with_noncoroutine(self):
    2242          with self.assertRaisesRegex(TypeError,
    2243                                      "a coroutine was expected, got 123"):
    2244              self.new_task(self.loop, 123)
    2245  
    2246          # test it for the second time to ensure that caching
    2247          # in asyncio.iscoroutine() doesn't break things.
    2248          with self.assertRaisesRegex(TypeError,
    2249                                      "a coroutine was expected, got 123"):
    2250              self.new_task(self.loop, 123)
    2251  
    2252      def test_create_task_with_async_function(self):
    2253  
    2254          async def coro():
    2255              pass
    2256  
    2257          task = self.new_task(self.loop, coro())
    2258          self.assertIsInstance(task, self.Task)
    2259          self.loop.run_until_complete(task)
    2260  
    2261          # test it for the second time to ensure that caching
    2262          # in asyncio.iscoroutine() doesn't break things.
    2263          task = self.new_task(self.loop, coro())
    2264          self.assertIsInstance(task, self.Task)
    2265          self.loop.run_until_complete(task)
    2266  
    2267      def test_create_task_with_asynclike_function(self):
    2268          task = self.new_task(self.loop, CoroLikeObject())
    2269          self.assertIsInstance(task, self.Task)
    2270          self.assertEqual(self.loop.run_until_complete(task), 42)
    2271  
    2272          # test it for the second time to ensure that caching
    2273          # in asyncio.iscoroutine() doesn't break things.
    2274          task = self.new_task(self.loop, CoroLikeObject())
    2275          self.assertIsInstance(task, self.Task)
    2276          self.assertEqual(self.loop.run_until_complete(task), 42)
    2277  
    2278      def test_bare_create_task(self):
    2279  
    2280          async def inner():
    2281              return 1
    2282  
    2283          async def coro():
    2284              task = asyncio.create_task(inner())
    2285              self.assertIsInstance(task, self.Task)
    2286              ret = await task
    2287              self.assertEqual(1, ret)
    2288  
    2289          self.loop.run_until_complete(coro())
    2290  
    2291      def test_bare_create_named_task(self):
    2292  
    2293          async def coro_noop():
    2294              pass
    2295  
    2296          async def coro():
    2297              task = asyncio.create_task(coro_noop(), name='No-op')
    2298              self.assertEqual(task.get_name(), 'No-op')
    2299              await task
    2300  
    2301          self.loop.run_until_complete(coro())
    2302  
    2303      def test_context_1(self):
    2304          cvar = contextvars.ContextVar('cvar', default='nope')
    2305  
    2306          async def sub():
    2307              await asyncio.sleep(0.01)
    2308              self.assertEqual(cvar.get(), 'nope')
    2309              cvar.set('something else')
    2310  
    2311          async def main():
    2312              self.assertEqual(cvar.get(), 'nope')
    2313              subtask = self.new_task(loop, sub())
    2314              cvar.set('yes')
    2315              self.assertEqual(cvar.get(), 'yes')
    2316              await subtask
    2317              self.assertEqual(cvar.get(), 'yes')
    2318  
    2319          loop = asyncio.new_event_loop()
    2320          try:
    2321              task = self.new_task(loop, main())
    2322              loop.run_until_complete(task)
    2323          finally:
    2324              loop.close()
    2325  
    2326      def test_context_2(self):
    2327          cvar = contextvars.ContextVar('cvar', default='nope')
    2328  
    2329          async def main():
    2330              def fut_on_done(fut):
    2331                  # This change must not pollute the context
    2332                  # of the "main()" task.
    2333                  cvar.set('something else')
    2334  
    2335              self.assertEqual(cvar.get(), 'nope')
    2336  
    2337              for j in range(2):
    2338                  fut = self.new_future(loop)
    2339                  fut.add_done_callback(fut_on_done)
    2340                  cvar.set(f'yes{j}')
    2341                  loop.call_soon(fut.set_result, None)
    2342                  await fut
    2343                  self.assertEqual(cvar.get(), f'yes{j}')
    2344  
    2345                  for i in range(3):
    2346                      # Test that task passed its context to add_done_callback:
    2347                      cvar.set(f'yes{i}-{j}')
    2348                      await asyncio.sleep(0.001)
    2349                      self.assertEqual(cvar.get(), f'yes{i}-{j}')
    2350  
    2351          loop = asyncio.new_event_loop()
    2352          try:
    2353              task = self.new_task(loop, main())
    2354              loop.run_until_complete(task)
    2355          finally:
    2356              loop.close()
    2357  
    2358          self.assertEqual(cvar.get(), 'nope')
    2359  
    2360      def test_context_3(self):
    2361          # Run 100 Tasks in parallel, each modifying cvar.
    2362  
    2363          cvar = contextvars.ContextVar('cvar', default=-1)
    2364  
    2365          async def sub(num):
    2366              for i in range(10):
    2367                  cvar.set(num + i)
    2368                  await asyncio.sleep(random.uniform(0.001, 0.05))
    2369                  self.assertEqual(cvar.get(), num + i)
    2370  
    2371          async def main():
    2372              tasks = []
    2373              for i in range(100):
    2374                  task = loop.create_task(sub(random.randint(0, 10)))
    2375                  tasks.append(task)
    2376  
    2377              await asyncio.gather(*tasks)
    2378  
    2379          loop = asyncio.new_event_loop()
    2380          try:
    2381              loop.run_until_complete(main())
    2382          finally:
    2383              loop.close()
    2384  
    2385          self.assertEqual(cvar.get(), -1)
    2386  
    2387      def test_context_4(self):
    2388          cvar = contextvars.ContextVar('cvar')
    2389  
    2390          async def coro(val):
    2391              await asyncio.sleep(0)
    2392              cvar.set(val)
    2393  
    2394          async def main():
    2395              ret = []
    2396              ctx = contextvars.copy_context()
    2397              ret.append(ctx.get(cvar))
    2398              t1 = self.new_task(loop, coro(1), context=ctx)
    2399              await t1
    2400              ret.append(ctx.get(cvar))
    2401              t2 = self.new_task(loop, coro(2), context=ctx)
    2402              await t2
    2403              ret.append(ctx.get(cvar))
    2404              return ret
    2405  
    2406          loop = asyncio.new_event_loop()
    2407          try:
    2408              task = self.new_task(loop, main())
    2409              ret = loop.run_until_complete(task)
    2410          finally:
    2411              loop.close()
    2412  
    2413          self.assertEqual([None, 1, 2], ret)
    2414  
    2415      def test_context_5(self):
    2416          cvar = contextvars.ContextVar('cvar')
    2417  
    2418          async def coro(val):
    2419              await asyncio.sleep(0)
    2420              cvar.set(val)
    2421  
    2422          async def main():
    2423              ret = []
    2424              ctx = contextvars.copy_context()
    2425              ret.append(ctx.get(cvar))
    2426              t1 = asyncio.create_task(coro(1), context=ctx)
    2427              await t1
    2428              ret.append(ctx.get(cvar))
    2429              t2 = asyncio.create_task(coro(2), context=ctx)
    2430              await t2
    2431              ret.append(ctx.get(cvar))
    2432              return ret
    2433  
    2434          loop = asyncio.new_event_loop()
    2435          try:
    2436              task = self.new_task(loop, main())
    2437              ret = loop.run_until_complete(task)
    2438          finally:
    2439              loop.close()
    2440  
    2441          self.assertEqual([None, 1, 2], ret)
    2442  
    2443      def test_context_6(self):
    2444          cvar = contextvars.ContextVar('cvar')
    2445  
    2446          async def coro(val):
    2447              await asyncio.sleep(0)
    2448              cvar.set(val)
    2449  
    2450          async def main():
    2451              ret = []
    2452              ctx = contextvars.copy_context()
    2453              ret.append(ctx.get(cvar))
    2454              t1 = loop.create_task(coro(1), context=ctx)
    2455              await t1
    2456              ret.append(ctx.get(cvar))
    2457              t2 = loop.create_task(coro(2), context=ctx)
    2458              await t2
    2459              ret.append(ctx.get(cvar))
    2460              return ret
    2461  
    2462          loop = asyncio.new_event_loop()
    2463          try:
    2464              task = loop.create_task(main())
    2465              ret = loop.run_until_complete(task)
    2466          finally:
    2467              loop.close()
    2468  
    2469          self.assertEqual([None, 1, 2], ret)
    2470  
    2471      def test_get_coro(self):
    2472          loop = asyncio.new_event_loop()
    2473          coro = coroutine_function()
    2474          try:
    2475              task = self.new_task(loop, coro)
    2476              loop.run_until_complete(task)
    2477              self.assertIs(task.get_coro(), coro)
    2478          finally:
    2479              loop.close()
    2480  
    2481      def test_get_context(self):
    2482          loop = asyncio.new_event_loop()
    2483          coro = coroutine_function()
    2484          context = contextvars.copy_context()
    2485          try:
    2486              task = self.new_task(loop, coro, context=context)
    2487              loop.run_until_complete(task)
    2488              self.assertIs(task.get_context(), context)
    2489          finally:
    2490              loop.close()
    2491  
    2492  
    2493  def add_subclass_tests(cls):
    2494      BaseTask = cls.Task
    2495      BaseFuture = cls.Future
    2496  
    2497      if BaseTask is None or BaseFuture is None:
    2498          return cls
    2499  
    2500      class ESC[4;38;5;81mCommonFuture:
    2501          def __init__(self, *args, **kwargs):
    2502              self.calls = collections.defaultdict(lambda: 0)
    2503              super().__init__(*args, **kwargs)
    2504  
    2505          def add_done_callback(self, *args, **kwargs):
    2506              self.calls['add_done_callback'] += 1
    2507              return super().add_done_callback(*args, **kwargs)
    2508  
    2509      class ESC[4;38;5;81mTask(ESC[4;38;5;149mCommonFuture, ESC[4;38;5;149mBaseTask):
    2510          pass
    2511  
    2512      class ESC[4;38;5;81mFuture(ESC[4;38;5;149mCommonFuture, ESC[4;38;5;149mBaseFuture):
    2513          pass
    2514  
    2515      def test_subclasses_ctask_cfuture(self):
    2516          fut = self.Future(loop=self.loop)
    2517  
    2518          async def func():
    2519              self.loop.call_soon(lambda: fut.set_result('spam'))
    2520              return await fut
    2521  
    2522          task = self.Task(func(), loop=self.loop)
    2523  
    2524          result = self.loop.run_until_complete(task)
    2525  
    2526          self.assertEqual(result, 'spam')
    2527  
    2528          self.assertEqual(
    2529              dict(task.calls),
    2530              {'add_done_callback': 1})
    2531  
    2532          self.assertEqual(
    2533              dict(fut.calls),
    2534              {'add_done_callback': 1})
    2535  
    2536      # Add patched Task & Future back to the test case
    2537      cls.Task = Task
    2538      cls.Future = Future
    2539  
    2540      # Add an extra unit-test
    2541      cls.test_subclasses_ctask_cfuture = test_subclasses_ctask_cfuture
    2542  
    2543      # Disable the "test_task_source_traceback" test
    2544      # (the test is hardcoded for a particular call stack, which
    2545      # is slightly different for Task subclasses)
    2546      cls.test_task_source_traceback = None
    2547  
    2548      return cls
    2549  
    2550  
    2551  class ESC[4;38;5;81mSetMethodsTest:
    2552  
    2553      def test_set_result_causes_invalid_state(self):
    2554          Future = type(self).Future
    2555          self.loop.call_exception_handler = exc_handler = mock.Mock()
    2556  
    2557          async def foo():
    2558              await asyncio.sleep(0.1)
    2559              return 10
    2560  
    2561          coro = foo()
    2562          task = self.new_task(self.loop, coro)
    2563          Future.set_result(task, 'spam')
    2564  
    2565          self.assertEqual(
    2566              self.loop.run_until_complete(task),
    2567              'spam')
    2568  
    2569          exc_handler.assert_called_once()
    2570          exc = exc_handler.call_args[0][0]['exception']
    2571          with self.assertRaisesRegex(asyncio.InvalidStateError,
    2572                                      r'step\(\): already done'):
    2573              raise exc
    2574  
    2575          coro.close()
    2576  
    2577      def test_set_exception_causes_invalid_state(self):
    2578          class ESC[4;38;5;81mMyExc(ESC[4;38;5;149mException):
    2579              pass
    2580  
    2581          Future = type(self).Future
    2582          self.loop.call_exception_handler = exc_handler = mock.Mock()
    2583  
    2584          async def foo():
    2585              await asyncio.sleep(0.1)
    2586              return 10
    2587  
    2588          coro = foo()
    2589          task = self.new_task(self.loop, coro)
    2590          Future.set_exception(task, MyExc())
    2591  
    2592          with self.assertRaises(MyExc):
    2593              self.loop.run_until_complete(task)
    2594  
    2595          exc_handler.assert_called_once()
    2596          exc = exc_handler.call_args[0][0]['exception']
    2597          with self.assertRaisesRegex(asyncio.InvalidStateError,
    2598                                      r'step\(\): already done'):
    2599              raise exc
    2600  
    2601          coro.close()
    2602  
    2603  
    2604  @unittest.skipUnless(hasattr(futures, '_CFuture') and
    2605                       hasattr(tasks, '_CTask'),
    2606                       'requires the C _asyncio module')
    2607  class ESC[4;38;5;81mCTask_CFuture_Tests(ESC[4;38;5;149mBaseTaskTests, ESC[4;38;5;149mSetMethodsTest,
    2608                            ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    2609  
    2610      Task = getattr(tasks, '_CTask', None)
    2611      Future = getattr(futures, '_CFuture', None)
    2612  
    2613      @support.refcount_test
    2614      def test_refleaks_in_task___init__(self):
    2615          gettotalrefcount = support.get_attribute(sys, 'gettotalrefcount')
    2616          async def coro():
    2617              pass
    2618          task = self.new_task(self.loop, coro())
    2619          self.loop.run_until_complete(task)
    2620          refs_before = gettotalrefcount()
    2621          for i in range(100):
    2622              task.__init__(coro(), loop=self.loop)
    2623              self.loop.run_until_complete(task)
    2624          self.assertAlmostEqual(gettotalrefcount() - refs_before, 0, delta=10)
    2625  
    2626      def test_del__log_destroy_pending_segfault(self):
    2627          async def coro():
    2628              pass
    2629          task = self.new_task(self.loop, coro())
    2630          self.loop.run_until_complete(task)
    2631          with self.assertRaises(AttributeError):
    2632              del task._log_destroy_pending
    2633  
    2634  
    2635  @unittest.skipUnless(hasattr(futures, '_CFuture') and
    2636                       hasattr(tasks, '_CTask'),
    2637                       'requires the C _asyncio module')
    2638  @add_subclass_tests
    2639  class ESC[4;38;5;81mCTask_CFuture_SubclassTests(ESC[4;38;5;149mBaseTaskTests, ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    2640  
    2641      Task = getattr(tasks, '_CTask', None)
    2642      Future = getattr(futures, '_CFuture', None)
    2643  
    2644  
    2645  @unittest.skipUnless(hasattr(tasks, '_CTask'),
    2646                       'requires the C _asyncio module')
    2647  @add_subclass_tests
    2648  class ESC[4;38;5;81mCTaskSubclass_PyFuture_Tests(ESC[4;38;5;149mBaseTaskTests, ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    2649  
    2650      Task = getattr(tasks, '_CTask', None)
    2651      Future = futures._PyFuture
    2652  
    2653  
    2654  @unittest.skipUnless(hasattr(futures, '_CFuture'),
    2655                       'requires the C _asyncio module')
    2656  @add_subclass_tests
    2657  class ESC[4;38;5;81mPyTask_CFutureSubclass_Tests(ESC[4;38;5;149mBaseTaskTests, ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    2658  
    2659      Future = getattr(futures, '_CFuture', None)
    2660      Task = tasks._PyTask
    2661  
    2662  
    2663  @unittest.skipUnless(hasattr(tasks, '_CTask'),
    2664                       'requires the C _asyncio module')
    2665  class ESC[4;38;5;81mCTask_PyFuture_Tests(ESC[4;38;5;149mBaseTaskTests, ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    2666  
    2667      Task = getattr(tasks, '_CTask', None)
    2668      Future = futures._PyFuture
    2669  
    2670  
    2671  @unittest.skipUnless(hasattr(futures, '_CFuture'),
    2672                       'requires the C _asyncio module')
    2673  class ESC[4;38;5;81mPyTask_CFuture_Tests(ESC[4;38;5;149mBaseTaskTests, ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    2674  
    2675      Task = tasks._PyTask
    2676      Future = getattr(futures, '_CFuture', None)
    2677  
    2678  
    2679  class ESC[4;38;5;81mPyTask_PyFuture_Tests(ESC[4;38;5;149mBaseTaskTests, ESC[4;38;5;149mSetMethodsTest,
    2680                              ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    2681  
    2682      Task = tasks._PyTask
    2683      Future = futures._PyFuture
    2684  
    2685  
    2686  @add_subclass_tests
    2687  class ESC[4;38;5;81mPyTask_PyFuture_SubclassTests(ESC[4;38;5;149mBaseTaskTests, ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    2688      Task = tasks._PyTask
    2689      Future = futures._PyFuture
    2690  
    2691  
    2692  @unittest.skipUnless(hasattr(tasks, '_CTask'),
    2693                       'requires the C _asyncio module')
    2694  class ESC[4;38;5;81mCTask_Future_Tests(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    2695  
    2696      def test_foobar(self):
    2697          class ESC[4;38;5;81mFut(ESC[4;38;5;149masyncioESC[4;38;5;149m.ESC[4;38;5;149mFuture):
    2698              @property
    2699              def get_loop(self):
    2700                  raise AttributeError
    2701  
    2702          async def coro():
    2703              await fut
    2704              return 'spam'
    2705  
    2706          self.loop = asyncio.new_event_loop()
    2707          try:
    2708              fut = Fut(loop=self.loop)
    2709              self.loop.call_later(0.1, fut.set_result, 1)
    2710              task = self.loop.create_task(coro())
    2711              res = self.loop.run_until_complete(task)
    2712          finally:
    2713              self.loop.close()
    2714  
    2715          self.assertEqual(res, 'spam')
    2716  
    2717  
    2718  class ESC[4;38;5;81mBaseTaskIntrospectionTests:
    2719      _register_task = None
    2720      _unregister_task = None
    2721      _enter_task = None
    2722      _leave_task = None
    2723  
    2724      def test__register_task_1(self):
    2725          class ESC[4;38;5;81mTaskLike:
    2726              @property
    2727              def _loop(self):
    2728                  return loop
    2729  
    2730              def done(self):
    2731                  return False
    2732  
    2733          task = TaskLike()
    2734          loop = mock.Mock()
    2735  
    2736          self.assertEqual(asyncio.all_tasks(loop), set())
    2737          self._register_task(task)
    2738          self.assertEqual(asyncio.all_tasks(loop), {task})
    2739          self._unregister_task(task)
    2740  
    2741      def test__register_task_2(self):
    2742          class ESC[4;38;5;81mTaskLike:
    2743              def get_loop(self):
    2744                  return loop
    2745  
    2746              def done(self):
    2747                  return False
    2748  
    2749          task = TaskLike()
    2750          loop = mock.Mock()
    2751  
    2752          self.assertEqual(asyncio.all_tasks(loop), set())
    2753          self._register_task(task)
    2754          self.assertEqual(asyncio.all_tasks(loop), {task})
    2755          self._unregister_task(task)
    2756  
    2757      def test__register_task_3(self):
    2758          class ESC[4;38;5;81mTaskLike:
    2759              def get_loop(self):
    2760                  return loop
    2761  
    2762              def done(self):
    2763                  return True
    2764  
    2765          task = TaskLike()
    2766          loop = mock.Mock()
    2767  
    2768          self.assertEqual(asyncio.all_tasks(loop), set())
    2769          self._register_task(task)
    2770          self.assertEqual(asyncio.all_tasks(loop), set())
    2771          self._unregister_task(task)
    2772  
    2773      def test__enter_task(self):
    2774          task = mock.Mock()
    2775          loop = mock.Mock()
    2776          self.assertIsNone(asyncio.current_task(loop))
    2777          self._enter_task(loop, task)
    2778          self.assertIs(asyncio.current_task(loop), task)
    2779          self._leave_task(loop, task)
    2780  
    2781      def test__enter_task_failure(self):
    2782          task1 = mock.Mock()
    2783          task2 = mock.Mock()
    2784          loop = mock.Mock()
    2785          self._enter_task(loop, task1)
    2786          with self.assertRaises(RuntimeError):
    2787              self._enter_task(loop, task2)
    2788          self.assertIs(asyncio.current_task(loop), task1)
    2789          self._leave_task(loop, task1)
    2790  
    2791      def test__leave_task(self):
    2792          task = mock.Mock()
    2793          loop = mock.Mock()
    2794          self._enter_task(loop, task)
    2795          self._leave_task(loop, task)
    2796          self.assertIsNone(asyncio.current_task(loop))
    2797  
    2798      def test__leave_task_failure1(self):
    2799          task1 = mock.Mock()
    2800          task2 = mock.Mock()
    2801          loop = mock.Mock()
    2802          self._enter_task(loop, task1)
    2803          with self.assertRaises(RuntimeError):
    2804              self._leave_task(loop, task2)
    2805          self.assertIs(asyncio.current_task(loop), task1)
    2806          self._leave_task(loop, task1)
    2807  
    2808      def test__leave_task_failure2(self):
    2809          task = mock.Mock()
    2810          loop = mock.Mock()
    2811          with self.assertRaises(RuntimeError):
    2812              self._leave_task(loop, task)
    2813          self.assertIsNone(asyncio.current_task(loop))
    2814  
    2815      def test__unregister_task(self):
    2816          task = mock.Mock()
    2817          loop = mock.Mock()
    2818          task.get_loop = lambda: loop
    2819          self._register_task(task)
    2820          self._unregister_task(task)
    2821          self.assertEqual(asyncio.all_tasks(loop), set())
    2822  
    2823      def test__unregister_task_not_registered(self):
    2824          task = mock.Mock()
    2825          loop = mock.Mock()
    2826          self._unregister_task(task)
    2827          self.assertEqual(asyncio.all_tasks(loop), set())
    2828  
    2829  
    2830  class ESC[4;38;5;81mPyIntrospectionTests(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase, ESC[4;38;5;149mBaseTaskIntrospectionTests):
    2831      _register_task = staticmethod(tasks._py_register_task)
    2832      _unregister_task = staticmethod(tasks._py_unregister_task)
    2833      _enter_task = staticmethod(tasks._py_enter_task)
    2834      _leave_task = staticmethod(tasks._py_leave_task)
    2835  
    2836  
    2837  @unittest.skipUnless(hasattr(tasks, '_c_register_task'),
    2838                       'requires the C _asyncio module')
    2839  class ESC[4;38;5;81mCIntrospectionTests(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase, ESC[4;38;5;149mBaseTaskIntrospectionTests):
    2840      if hasattr(tasks, '_c_register_task'):
    2841          _register_task = staticmethod(tasks._c_register_task)
    2842          _unregister_task = staticmethod(tasks._c_unregister_task)
    2843          _enter_task = staticmethod(tasks._c_enter_task)
    2844          _leave_task = staticmethod(tasks._c_leave_task)
    2845      else:
    2846          _register_task = _unregister_task = _enter_task = _leave_task = None
    2847  
    2848  
    2849  class ESC[4;38;5;81mBaseCurrentLoopTests:
    2850      current_task = None
    2851  
    2852      def setUp(self):
    2853          super().setUp()
    2854          self.loop = asyncio.new_event_loop()
    2855          self.set_event_loop(self.loop)
    2856  
    2857      def new_task(self, coro):
    2858          raise NotImplementedError
    2859  
    2860      def test_current_task_no_running_loop(self):
    2861          self.assertIsNone(self.current_task(loop=self.loop))
    2862  
    2863      def test_current_task_no_running_loop_implicit(self):
    2864          with self.assertRaisesRegex(RuntimeError, 'no running event loop'):
    2865              self.current_task()
    2866  
    2867      def test_current_task_with_implicit_loop(self):
    2868          async def coro():
    2869              self.assertIs(self.current_task(loop=self.loop), task)
    2870  
    2871              self.assertIs(self.current_task(None), task)
    2872              self.assertIs(self.current_task(), task)
    2873  
    2874          task = self.new_task(coro())
    2875          self.loop.run_until_complete(task)
    2876          self.assertIsNone(self.current_task(loop=self.loop))
    2877  
    2878  
    2879  class ESC[4;38;5;81mPyCurrentLoopTests(ESC[4;38;5;149mBaseCurrentLoopTests, ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    2880      current_task = staticmethod(tasks._py_current_task)
    2881  
    2882      def new_task(self, coro):
    2883          return tasks._PyTask(coro, loop=self.loop)
    2884  
    2885  
    2886  @unittest.skipUnless(hasattr(tasks, '_CTask') and
    2887                       hasattr(tasks, '_c_current_task'),
    2888                       'requires the C _asyncio module')
    2889  class ESC[4;38;5;81mCCurrentLoopTests(ESC[4;38;5;149mBaseCurrentLoopTests, ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    2890      if hasattr(tasks, '_c_current_task'):
    2891          current_task = staticmethod(tasks._c_current_task)
    2892      else:
    2893          current_task = None
    2894  
    2895      def new_task(self, coro):
    2896          return getattr(tasks, '_CTask')(coro, loop=self.loop)
    2897  
    2898  
    2899  class ESC[4;38;5;81mGenericTaskTests(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    2900  
    2901      def test_future_subclass(self):
    2902          self.assertTrue(issubclass(asyncio.Task, asyncio.Future))
    2903  
    2904      @support.cpython_only
    2905      def test_asyncio_module_compiled(self):
    2906          # Because of circular imports it's easy to make _asyncio
    2907          # module non-importable.  This is a simple test that will
    2908          # fail on systems where C modules were successfully compiled
    2909          # (hence the test for _functools etc), but _asyncio somehow didn't.
    2910          try:
    2911              import _functools
    2912              import _json
    2913              import _pickle
    2914          except ImportError:
    2915              self.skipTest('C modules are not available')
    2916          else:
    2917              try:
    2918                  import _asyncio
    2919              except ImportError:
    2920                  self.fail('_asyncio module is missing')
    2921  
    2922  
    2923  class ESC[4;38;5;81mGatherTestsBase:
    2924  
    2925      def setUp(self):
    2926          super().setUp()
    2927          self.one_loop = self.new_test_loop()
    2928          self.other_loop = self.new_test_loop()
    2929          self.set_event_loop(self.one_loop, cleanup=False)
    2930  
    2931      def _run_loop(self, loop):
    2932          while loop._ready:
    2933              test_utils.run_briefly(loop)
    2934  
    2935      def _check_success(self, **kwargs):
    2936          a, b, c = [self.one_loop.create_future() for i in range(3)]
    2937          fut = self._gather(*self.wrap_futures(a, b, c), **kwargs)
    2938          cb = test_utils.MockCallback()
    2939          fut.add_done_callback(cb)
    2940          b.set_result(1)
    2941          a.set_result(2)
    2942          self._run_loop(self.one_loop)
    2943          self.assertEqual(cb.called, False)
    2944          self.assertFalse(fut.done())
    2945          c.set_result(3)
    2946          self._run_loop(self.one_loop)
    2947          cb.assert_called_once_with(fut)
    2948          self.assertEqual(fut.result(), [2, 1, 3])
    2949  
    2950      def test_success(self):
    2951          self._check_success()
    2952          self._check_success(return_exceptions=False)
    2953  
    2954      def test_result_exception_success(self):
    2955          self._check_success(return_exceptions=True)
    2956  
    2957      def test_one_exception(self):
    2958          a, b, c, d, e = [self.one_loop.create_future() for i in range(5)]
    2959          fut = self._gather(*self.wrap_futures(a, b, c, d, e))
    2960          cb = test_utils.MockCallback()
    2961          fut.add_done_callback(cb)
    2962          exc = ZeroDivisionError()
    2963          a.set_result(1)
    2964          b.set_exception(exc)
    2965          self._run_loop(self.one_loop)
    2966          self.assertTrue(fut.done())
    2967          cb.assert_called_once_with(fut)
    2968          self.assertIs(fut.exception(), exc)
    2969          # Does nothing
    2970          c.set_result(3)
    2971          d.cancel()
    2972          e.set_exception(RuntimeError())
    2973          e.exception()
    2974  
    2975      def test_return_exceptions(self):
    2976          a, b, c, d = [self.one_loop.create_future() for i in range(4)]
    2977          fut = self._gather(*self.wrap_futures(a, b, c, d),
    2978                             return_exceptions=True)
    2979          cb = test_utils.MockCallback()
    2980          fut.add_done_callback(cb)
    2981          exc = ZeroDivisionError()
    2982          exc2 = RuntimeError()
    2983          b.set_result(1)
    2984          c.set_exception(exc)
    2985          a.set_result(3)
    2986          self._run_loop(self.one_loop)
    2987          self.assertFalse(fut.done())
    2988          d.set_exception(exc2)
    2989          self._run_loop(self.one_loop)
    2990          self.assertTrue(fut.done())
    2991          cb.assert_called_once_with(fut)
    2992          self.assertEqual(fut.result(), [3, 1, exc, exc2])
    2993  
    2994      def test_env_var_debug(self):
    2995          code = '\n'.join((
    2996              'import asyncio.coroutines',
    2997              'print(asyncio.coroutines._is_debug_mode())'))
    2998  
    2999          # Test with -E to not fail if the unit test was run with
    3000          # PYTHONASYNCIODEBUG set to a non-empty string
    3001          sts, stdout, stderr = assert_python_ok('-E', '-c', code)
    3002          self.assertEqual(stdout.rstrip(), b'False')
    3003  
    3004          sts, stdout, stderr = assert_python_ok('-c', code,
    3005                                                 PYTHONASYNCIODEBUG='',
    3006                                                 PYTHONDEVMODE='')
    3007          self.assertEqual(stdout.rstrip(), b'False')
    3008  
    3009          sts, stdout, stderr = assert_python_ok('-c', code,
    3010                                                 PYTHONASYNCIODEBUG='1',
    3011                                                 PYTHONDEVMODE='')
    3012          self.assertEqual(stdout.rstrip(), b'True')
    3013  
    3014          sts, stdout, stderr = assert_python_ok('-E', '-c', code,
    3015                                                 PYTHONASYNCIODEBUG='1',
    3016                                                 PYTHONDEVMODE='')
    3017          self.assertEqual(stdout.rstrip(), b'False')
    3018  
    3019          # -X dev
    3020          sts, stdout, stderr = assert_python_ok('-E', '-X', 'dev',
    3021                                                 '-c', code)
    3022          self.assertEqual(stdout.rstrip(), b'True')
    3023  
    3024  
    3025  class ESC[4;38;5;81mFutureGatherTests(ESC[4;38;5;149mGatherTestsBase, ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    3026  
    3027      def wrap_futures(self, *futures):
    3028          return futures
    3029  
    3030      def _gather(self, *args, **kwargs):
    3031          return asyncio.gather(*args, **kwargs)
    3032  
    3033      def test_constructor_empty_sequence_without_loop(self):
    3034          with self.assertRaisesRegex(RuntimeError, 'no current event loop'):
    3035              asyncio.gather()
    3036  
    3037      def test_constructor_empty_sequence_use_running_loop(self):
    3038          async def gather():
    3039              return asyncio.gather()
    3040          fut = self.one_loop.run_until_complete(gather())
    3041          self.assertIsInstance(fut, asyncio.Future)
    3042          self.assertIs(fut._loop, self.one_loop)
    3043          self._run_loop(self.one_loop)
    3044          self.assertTrue(fut.done())
    3045          self.assertEqual(fut.result(), [])
    3046  
    3047      def test_constructor_empty_sequence_use_global_loop(self):
    3048          # Deprecated in 3.10, undeprecated in 3.12
    3049          asyncio.set_event_loop(self.one_loop)
    3050          self.addCleanup(asyncio.set_event_loop, None)
    3051          fut = asyncio.gather()
    3052          self.assertIsInstance(fut, asyncio.Future)
    3053          self.assertIs(fut._loop, self.one_loop)
    3054          self._run_loop(self.one_loop)
    3055          self.assertTrue(fut.done())
    3056          self.assertEqual(fut.result(), [])
    3057  
    3058      def test_constructor_heterogenous_futures(self):
    3059          fut1 = self.one_loop.create_future()
    3060          fut2 = self.other_loop.create_future()
    3061          with self.assertRaises(ValueError):
    3062              asyncio.gather(fut1, fut2)
    3063  
    3064      def test_constructor_homogenous_futures(self):
    3065          children = [self.other_loop.create_future() for i in range(3)]
    3066          fut = asyncio.gather(*children)
    3067          self.assertIs(fut._loop, self.other_loop)
    3068          self._run_loop(self.other_loop)
    3069          self.assertFalse(fut.done())
    3070          fut = asyncio.gather(*children)
    3071          self.assertIs(fut._loop, self.other_loop)
    3072          self._run_loop(self.other_loop)
    3073          self.assertFalse(fut.done())
    3074  
    3075      def test_one_cancellation(self):
    3076          a, b, c, d, e = [self.one_loop.create_future() for i in range(5)]
    3077          fut = asyncio.gather(a, b, c, d, e)
    3078          cb = test_utils.MockCallback()
    3079          fut.add_done_callback(cb)
    3080          a.set_result(1)
    3081          b.cancel()
    3082          self._run_loop(self.one_loop)
    3083          self.assertTrue(fut.done())
    3084          cb.assert_called_once_with(fut)
    3085          self.assertFalse(fut.cancelled())
    3086          self.assertIsInstance(fut.exception(), asyncio.CancelledError)
    3087          # Does nothing
    3088          c.set_result(3)
    3089          d.cancel()
    3090          e.set_exception(RuntimeError())
    3091          e.exception()
    3092  
    3093      def test_result_exception_one_cancellation(self):
    3094          a, b, c, d, e, f = [self.one_loop.create_future()
    3095                              for i in range(6)]
    3096          fut = asyncio.gather(a, b, c, d, e, f, return_exceptions=True)
    3097          cb = test_utils.MockCallback()
    3098          fut.add_done_callback(cb)
    3099          a.set_result(1)
    3100          zde = ZeroDivisionError()
    3101          b.set_exception(zde)
    3102          c.cancel()
    3103          self._run_loop(self.one_loop)
    3104          self.assertFalse(fut.done())
    3105          d.set_result(3)
    3106          e.cancel()
    3107          rte = RuntimeError()
    3108          f.set_exception(rte)
    3109          res = self.one_loop.run_until_complete(fut)
    3110          self.assertIsInstance(res[2], asyncio.CancelledError)
    3111          self.assertIsInstance(res[4], asyncio.CancelledError)
    3112          res[2] = res[4] = None
    3113          self.assertEqual(res, [1, zde, None, 3, None, rte])
    3114          cb.assert_called_once_with(fut)
    3115  
    3116  
    3117  class ESC[4;38;5;81mCoroutineGatherTests(ESC[4;38;5;149mGatherTestsBase, ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    3118  
    3119      def wrap_futures(self, *futures):
    3120          coros = []
    3121          for fut in futures:
    3122              async def coro(fut=fut):
    3123                  return await fut
    3124              coros.append(coro())
    3125          return coros
    3126  
    3127      def _gather(self, *args, **kwargs):
    3128          async def coro():
    3129              return asyncio.gather(*args, **kwargs)
    3130          return self.one_loop.run_until_complete(coro())
    3131  
    3132      def test_constructor_without_loop(self):
    3133          async def coro():
    3134              return 'abc'
    3135          gen1 = coro()
    3136          self.addCleanup(gen1.close)
    3137          gen2 = coro()
    3138          self.addCleanup(gen2.close)
    3139          with self.assertRaisesRegex(RuntimeError, 'no current event loop'):
    3140              asyncio.gather(gen1, gen2)
    3141  
    3142      def test_constructor_use_running_loop(self):
    3143          async def coro():
    3144              return 'abc'
    3145          gen1 = coro()
    3146          gen2 = coro()
    3147          async def gather():
    3148              return asyncio.gather(gen1, gen2)
    3149          fut = self.one_loop.run_until_complete(gather())
    3150          self.assertIs(fut._loop, self.one_loop)
    3151          self.one_loop.run_until_complete(fut)
    3152  
    3153      def test_constructor_use_global_loop(self):
    3154          # Deprecated in 3.10, undeprecated in 3.12
    3155          async def coro():
    3156              return 'abc'
    3157          asyncio.set_event_loop(self.other_loop)
    3158          self.addCleanup(asyncio.set_event_loop, None)
    3159          gen1 = coro()
    3160          gen2 = coro()
    3161          fut = asyncio.gather(gen1, gen2)
    3162          self.assertIs(fut._loop, self.other_loop)
    3163          self.other_loop.run_until_complete(fut)
    3164  
    3165      def test_duplicate_coroutines(self):
    3166          async def coro(s):
    3167              return s
    3168          c = coro('abc')
    3169          fut = self._gather(c, c, coro('def'), c)
    3170          self._run_loop(self.one_loop)
    3171          self.assertEqual(fut.result(), ['abc', 'abc', 'def', 'abc'])
    3172  
    3173      def test_cancellation_broadcast(self):
    3174          # Cancelling outer() cancels all children.
    3175          proof = 0
    3176          waiter = self.one_loop.create_future()
    3177  
    3178          async def inner():
    3179              nonlocal proof
    3180              await waiter
    3181              proof += 1
    3182  
    3183          child1 = asyncio.ensure_future(inner(), loop=self.one_loop)
    3184          child2 = asyncio.ensure_future(inner(), loop=self.one_loop)
    3185          gatherer = None
    3186  
    3187          async def outer():
    3188              nonlocal proof, gatherer
    3189              gatherer = asyncio.gather(child1, child2)
    3190              await gatherer
    3191              proof += 100
    3192  
    3193          f = asyncio.ensure_future(outer(), loop=self.one_loop)
    3194          test_utils.run_briefly(self.one_loop)
    3195          self.assertTrue(f.cancel())
    3196          with self.assertRaises(asyncio.CancelledError):
    3197              self.one_loop.run_until_complete(f)
    3198          self.assertFalse(gatherer.cancel())
    3199          self.assertTrue(waiter.cancelled())
    3200          self.assertTrue(child1.cancelled())
    3201          self.assertTrue(child2.cancelled())
    3202          test_utils.run_briefly(self.one_loop)
    3203          self.assertEqual(proof, 0)
    3204  
    3205      def test_exception_marking(self):
    3206          # Test for the first line marked "Mark exception retrieved."
    3207  
    3208          async def inner(f):
    3209              await f
    3210              raise RuntimeError('should not be ignored')
    3211  
    3212          a = self.one_loop.create_future()
    3213          b = self.one_loop.create_future()
    3214  
    3215          async def outer():
    3216              await asyncio.gather(inner(a), inner(b))
    3217  
    3218          f = asyncio.ensure_future(outer(), loop=self.one_loop)
    3219          test_utils.run_briefly(self.one_loop)
    3220          a.set_result(None)
    3221          test_utils.run_briefly(self.one_loop)
    3222          b.set_result(None)
    3223          test_utils.run_briefly(self.one_loop)
    3224          self.assertIsInstance(f.exception(), RuntimeError)
    3225  
    3226      def test_issue46672(self):
    3227          with mock.patch(
    3228              'asyncio.base_events.BaseEventLoop.call_exception_handler',
    3229          ):
    3230              async def coro(s):
    3231                  return s
    3232              c = coro('abc')
    3233  
    3234              with self.assertRaises(TypeError):
    3235                  self._gather(c, {})
    3236              self._run_loop(self.one_loop)
    3237              # NameError should not happen:
    3238              self.one_loop.call_exception_handler.assert_not_called()
    3239  
    3240  
    3241  class ESC[4;38;5;81mRunCoroutineThreadsafeTests(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    3242      """Test case for asyncio.run_coroutine_threadsafe."""
    3243  
    3244      def setUp(self):
    3245          super().setUp()
    3246          self.loop = asyncio.new_event_loop()
    3247          self.set_event_loop(self.loop) # Will cleanup properly
    3248  
    3249      async def add(self, a, b, fail=False, cancel=False):
    3250          """Wait 0.05 second and return a + b."""
    3251          await asyncio.sleep(0.05)
    3252          if fail:
    3253              raise RuntimeError("Fail!")
    3254          if cancel:
    3255              asyncio.current_task(self.loop).cancel()
    3256              await asyncio.sleep(0)
    3257          return a + b
    3258  
    3259      def target(self, fail=False, cancel=False, timeout=None,
    3260                 advance_coro=False):
    3261          """Run add coroutine in the event loop."""
    3262          coro = self.add(1, 2, fail=fail, cancel=cancel)
    3263          future = asyncio.run_coroutine_threadsafe(coro, self.loop)
    3264          if advance_coro:
    3265              # this is for test_run_coroutine_threadsafe_task_factory_exception;
    3266              # otherwise it spills errors and breaks **other** unittests, since
    3267              # 'target' is interacting with threads.
    3268  
    3269              # With this call, `coro` will be advanced.
    3270              self.loop.call_soon_threadsafe(coro.send, None)
    3271          try:
    3272              return future.result(timeout)
    3273          finally:
    3274              future.done() or future.cancel()
    3275  
    3276      def test_run_coroutine_threadsafe(self):
    3277          """Test coroutine submission from a thread to an event loop."""
    3278          future = self.loop.run_in_executor(None, self.target)
    3279          result = self.loop.run_until_complete(future)
    3280          self.assertEqual(result, 3)
    3281  
    3282      def test_run_coroutine_threadsafe_with_exception(self):
    3283          """Test coroutine submission from a thread to an event loop
    3284          when an exception is raised."""
    3285          future = self.loop.run_in_executor(None, self.target, True)
    3286          with self.assertRaises(RuntimeError) as exc_context:
    3287              self.loop.run_until_complete(future)
    3288          self.assertIn("Fail!", exc_context.exception.args)
    3289  
    3290      def test_run_coroutine_threadsafe_with_timeout(self):
    3291          """Test coroutine submission from a thread to an event loop
    3292          when a timeout is raised."""
    3293          callback = lambda: self.target(timeout=0)
    3294          future = self.loop.run_in_executor(None, callback)
    3295          with self.assertRaises(asyncio.TimeoutError):
    3296              self.loop.run_until_complete(future)
    3297          test_utils.run_briefly(self.loop)
    3298          # Check that there's no pending task (add has been cancelled)
    3299          for task in asyncio.all_tasks(self.loop):
    3300              self.assertTrue(task.done())
    3301  
    3302      def test_run_coroutine_threadsafe_task_cancelled(self):
    3303          """Test coroutine submission from a thread to an event loop
    3304          when the task is cancelled."""
    3305          callback = lambda: self.target(cancel=True)
    3306          future = self.loop.run_in_executor(None, callback)
    3307          with self.assertRaises(asyncio.CancelledError):
    3308              self.loop.run_until_complete(future)
    3309  
    3310      def test_run_coroutine_threadsafe_task_factory_exception(self):
    3311          """Test coroutine submission from a thread to an event loop
    3312          when the task factory raise an exception."""
    3313  
    3314          def task_factory(loop, coro):
    3315              raise NameError
    3316  
    3317          run = self.loop.run_in_executor(
    3318              None, lambda: self.target(advance_coro=True))
    3319  
    3320          # Set exception handler
    3321          callback = test_utils.MockCallback()
    3322          self.loop.set_exception_handler(callback)
    3323  
    3324          # Set corrupted task factory
    3325          self.addCleanup(self.loop.set_task_factory,
    3326                          self.loop.get_task_factory())
    3327          self.loop.set_task_factory(task_factory)
    3328  
    3329          # Run event loop
    3330          with self.assertRaises(NameError) as exc_context:
    3331              self.loop.run_until_complete(run)
    3332  
    3333          # Check exceptions
    3334          self.assertEqual(len(callback.call_args_list), 1)
    3335          (loop, context), kwargs = callback.call_args
    3336          self.assertEqual(context['exception'], exc_context.exception)
    3337  
    3338  
    3339  class ESC[4;38;5;81mSleepTests(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    3340      def setUp(self):
    3341          super().setUp()
    3342          self.loop = asyncio.new_event_loop()
    3343          self.set_event_loop(self.loop)
    3344  
    3345      def tearDown(self):
    3346          self.loop.close()
    3347          self.loop = None
    3348          super().tearDown()
    3349  
    3350      def test_sleep_zero(self):
    3351          result = 0
    3352  
    3353          def inc_result(num):
    3354              nonlocal result
    3355              result += num
    3356  
    3357          async def coro():
    3358              self.loop.call_soon(inc_result, 1)
    3359              self.assertEqual(result, 0)
    3360              num = await asyncio.sleep(0, result=10)
    3361              self.assertEqual(result, 1) # inc'ed by call_soon
    3362              inc_result(num) # num should be 11
    3363  
    3364          self.loop.run_until_complete(coro())
    3365          self.assertEqual(result, 11)
    3366  
    3367  
    3368  class ESC[4;38;5;81mCompatibilityTests(ESC[4;38;5;149mtest_utilsESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
    3369      # Tests for checking a bridge between old-styled coroutines
    3370      # and async/await syntax
    3371  
    3372      def setUp(self):
    3373          super().setUp()
    3374          self.loop = asyncio.new_event_loop()
    3375          self.set_event_loop(self.loop)
    3376  
    3377      def tearDown(self):
    3378          self.loop.close()
    3379          self.loop = None
    3380          super().tearDown()
    3381  
    3382  
    3383  if __name__ == '__main__':
    3384      unittest.main()