(root)/
Python-3.11.7/
Lib/
test/
test_asyncio/
test_waitfor.py
       1  import asyncio
       2  import unittest
       3  import time
       4  from test import support
       5  
       6  
       7  def tearDownModule():
       8      asyncio.set_event_loop_policy(None)
       9  
      10  
      11  # The following value can be used as a very small timeout:
      12  # it passes check "timeout > 0", but has almost
      13  # no effect on the test performance
      14  _EPSILON = 0.0001
      15  
      16  
      17  class ESC[4;38;5;81mSlowTask:
      18      """ Task will run for this defined time, ignoring cancel requests """
      19      TASK_TIMEOUT = 0.2
      20  
      21      def __init__(self):
      22          self.exited = False
      23  
      24      async def run(self):
      25          exitat = time.monotonic() + self.TASK_TIMEOUT
      26  
      27          while True:
      28              tosleep = exitat - time.monotonic()
      29              if tosleep <= 0:
      30                  break
      31  
      32              try:
      33                  await asyncio.sleep(tosleep)
      34              except asyncio.CancelledError:
      35                  pass
      36  
      37          self.exited = True
      38  
      39  
      40  class ESC[4;38;5;81mAsyncioWaitForTest(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mIsolatedAsyncioTestCase):
      41  
      42      async def test_asyncio_wait_for_cancelled(self):
      43          t = SlowTask()
      44  
      45          waitfortask = asyncio.create_task(
      46              asyncio.wait_for(t.run(), t.TASK_TIMEOUT * 2))
      47          await asyncio.sleep(0)
      48          waitfortask.cancel()
      49          await asyncio.wait({waitfortask})
      50  
      51          self.assertTrue(t.exited)
      52  
      53      async def test_asyncio_wait_for_timeout(self):
      54          t = SlowTask()
      55  
      56          try:
      57              await asyncio.wait_for(t.run(), t.TASK_TIMEOUT / 2)
      58          except asyncio.TimeoutError:
      59              pass
      60  
      61          self.assertTrue(t.exited)
      62  
      63      async def test_wait_for_timeout_less_then_0_or_0_future_done(self):
      64          loop = asyncio.get_running_loop()
      65  
      66          fut = loop.create_future()
      67          fut.set_result('done')
      68  
      69          ret = await asyncio.wait_for(fut, 0)
      70  
      71          self.assertEqual(ret, 'done')
      72          self.assertTrue(fut.done())
      73  
      74      async def test_wait_for_timeout_less_then_0_or_0_coroutine_do_not_started(self):
      75          foo_started = False
      76  
      77          async def foo():
      78              nonlocal foo_started
      79              foo_started = True
      80  
      81          with self.assertRaises(asyncio.TimeoutError):
      82              await asyncio.wait_for(foo(), 0)
      83  
      84          self.assertEqual(foo_started, False)
      85  
      86      async def test_wait_for_timeout_less_then_0_or_0(self):
      87          loop = asyncio.get_running_loop()
      88  
      89          for timeout in [0, -1]:
      90              with self.subTest(timeout=timeout):
      91                  foo_running = None
      92                  started = loop.create_future()
      93  
      94                  async def foo():
      95                      nonlocal foo_running
      96                      foo_running = True
      97                      started.set_result(None)
      98                      try:
      99                          await asyncio.sleep(10)
     100                      finally:
     101                          foo_running = False
     102                      return 'done'
     103  
     104                  fut = asyncio.create_task(foo())
     105                  await started
     106  
     107                  with self.assertRaises(asyncio.TimeoutError):
     108                      await asyncio.wait_for(fut, timeout)
     109  
     110                  self.assertTrue(fut.done())
     111                  # it should have been cancelled due to the timeout
     112                  self.assertTrue(fut.cancelled())
     113                  self.assertEqual(foo_running, False)
     114  
     115      async def test_wait_for(self):
     116          foo_running = None
     117  
     118          async def foo():
     119              nonlocal foo_running
     120              foo_running = True
     121              try:
     122                  await asyncio.sleep(support.LONG_TIMEOUT)
     123              finally:
     124                  foo_running = False
     125              return 'done'
     126  
     127          fut = asyncio.create_task(foo())
     128  
     129          with self.assertRaises(asyncio.TimeoutError):
     130              await asyncio.wait_for(fut, 0.1)
     131          self.assertTrue(fut.done())
     132          # it should have been cancelled due to the timeout
     133          self.assertTrue(fut.cancelled())
     134          self.assertEqual(foo_running, False)
     135  
     136      async def test_wait_for_blocking(self):
     137          async def coro():
     138              return 'done'
     139  
     140          res = await asyncio.wait_for(coro(), timeout=None)
     141          self.assertEqual(res, 'done')
     142  
     143      async def test_wait_for_race_condition(self):
     144          loop = asyncio.get_running_loop()
     145  
     146          fut = loop.create_future()
     147          task = asyncio.wait_for(fut, timeout=0.2)
     148          loop.call_later(0.1, fut.set_result, "ok")
     149          res = await task
     150          self.assertEqual(res, "ok")
     151  
     152      async def test_wait_for_cancellation_race_condition(self):
     153          async def inner():
     154              with self.assertRaises(asyncio.CancelledError):
     155                  await asyncio.sleep(1)
     156              return 1
     157  
     158          result = await asyncio.wait_for(inner(), timeout=.01)
     159          self.assertEqual(result, 1)
     160  
     161      async def test_wait_for_waits_for_task_cancellation(self):
     162          task_done = False
     163  
     164          async def inner():
     165              nonlocal task_done
     166              try:
     167                  await asyncio.sleep(10)
     168              except asyncio.CancelledError:
     169                  await asyncio.sleep(_EPSILON)
     170                  raise
     171              finally:
     172                  task_done = True
     173  
     174          inner_task = asyncio.create_task(inner())
     175  
     176          with self.assertRaises(asyncio.TimeoutError) as cm:
     177              await asyncio.wait_for(inner_task, timeout=_EPSILON)
     178  
     179          self.assertTrue(task_done)
     180          chained = cm.exception.__context__
     181          self.assertEqual(type(chained), asyncio.CancelledError)
     182  
     183      async def test_wait_for_waits_for_task_cancellation_w_timeout_0(self):
     184          task_done = False
     185  
     186          async def foo():
     187              async def inner():
     188                  nonlocal task_done
     189                  try:
     190                      await asyncio.sleep(10)
     191                  except asyncio.CancelledError:
     192                      await asyncio.sleep(_EPSILON)
     193                      raise
     194                  finally:
     195                      task_done = True
     196  
     197              inner_task = asyncio.create_task(inner())
     198              await asyncio.sleep(_EPSILON)
     199              await asyncio.wait_for(inner_task, timeout=0)
     200  
     201          with self.assertRaises(asyncio.TimeoutError) as cm:
     202              await foo()
     203  
     204          self.assertTrue(task_done)
     205          chained = cm.exception.__context__
     206          self.assertEqual(type(chained), asyncio.CancelledError)
     207  
     208      async def test_wait_for_reraises_exception_during_cancellation(self):
     209          class ESC[4;38;5;81mFooException(ESC[4;38;5;149mException):
     210              pass
     211  
     212          async def foo():
     213              async def inner():
     214                  try:
     215                      await asyncio.sleep(0.2)
     216                  finally:
     217                      raise FooException
     218  
     219              inner_task = asyncio.create_task(inner())
     220  
     221              await asyncio.wait_for(inner_task, timeout=_EPSILON)
     222  
     223          with self.assertRaises(FooException):
     224              await foo()
     225  
     226      async def test_wait_for_self_cancellation(self):
     227          async def inner():
     228              try:
     229                  await asyncio.sleep(0.3)
     230              except asyncio.CancelledError:
     231                  try:
     232                      await asyncio.sleep(0.3)
     233                  except asyncio.CancelledError:
     234                      await asyncio.sleep(0.3)
     235  
     236              return 42
     237  
     238          inner_task = asyncio.create_task(inner())
     239  
     240          wait = asyncio.wait_for(inner_task, timeout=0.1)
     241  
     242          # Test that wait_for itself is properly cancellable
     243          # even when the initial task holds up the initial cancellation.
     244          task = asyncio.create_task(wait)
     245          await asyncio.sleep(0.2)
     246          task.cancel()
     247  
     248          with self.assertRaises(asyncio.CancelledError):
     249              await task
     250  
     251          self.assertEqual(await inner_task, 42)
     252  
     253      async def _test_cancel_wait_for(self, timeout):
     254          loop = asyncio.get_running_loop()
     255  
     256          async def blocking_coroutine():
     257              fut = loop.create_future()
     258              # Block: fut result is never set
     259              await fut
     260  
     261          task = asyncio.create_task(blocking_coroutine())
     262  
     263          wait = asyncio.create_task(asyncio.wait_for(task, timeout))
     264          loop.call_soon(wait.cancel)
     265  
     266          with self.assertRaises(asyncio.CancelledError):
     267              await wait
     268  
     269          # Python issue #23219: cancelling the wait must also cancel the task
     270          self.assertTrue(task.cancelled())
     271  
     272      async def test_cancel_blocking_wait_for(self):
     273          await self._test_cancel_wait_for(None)
     274  
     275      async def test_cancel_wait_for(self):
     276          await self._test_cancel_wait_for(60.0)
     277  
     278  
     279  if __name__ == '__main__':
     280      unittest.main()