(root)/
Python-3.12.0/
Lib/
test/
test_asyncio/
test_waitfor.py
       1  import asyncio
       2  import unittest
       3  import time
       4  
       5  
       6  def tearDownModule():
       7      asyncio.set_event_loop_policy(None)
       8  
       9  
      10  # The following value can be used as a very small timeout:
      11  # it passes check "timeout > 0", but has almost
      12  # no effect on the test performance
      13  _EPSILON = 0.0001
      14  
      15  
      16  class ESC[4;38;5;81mSlowTask:
      17      """ Task will run for this defined time, ignoring cancel requests """
      18      TASK_TIMEOUT = 0.2
      19  
      20      def __init__(self):
      21          self.exited = False
      22  
      23      async def run(self):
      24          exitat = time.monotonic() + self.TASK_TIMEOUT
      25  
      26          while True:
      27              tosleep = exitat - time.monotonic()
      28              if tosleep <= 0:
      29                  break
      30  
      31              try:
      32                  await asyncio.sleep(tosleep)
      33              except asyncio.CancelledError:
      34                  pass
      35  
      36          self.exited = True
      37  
      38  
      39  class ESC[4;38;5;81mAsyncioWaitForTest(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mIsolatedAsyncioTestCase):
      40  
      41      async def test_asyncio_wait_for_cancelled(self):
      42          t = SlowTask()
      43  
      44          waitfortask = asyncio.create_task(
      45              asyncio.wait_for(t.run(), t.TASK_TIMEOUT * 2))
      46          await asyncio.sleep(0)
      47          waitfortask.cancel()
      48          await asyncio.wait({waitfortask})
      49  
      50          self.assertTrue(t.exited)
      51  
      52      async def test_asyncio_wait_for_timeout(self):
      53          t = SlowTask()
      54  
      55          try:
      56              await asyncio.wait_for(t.run(), t.TASK_TIMEOUT / 2)
      57          except asyncio.TimeoutError:
      58              pass
      59  
      60          self.assertTrue(t.exited)
      61  
      62      async def test_wait_for_timeout_less_then_0_or_0_future_done(self):
      63          loop = asyncio.get_running_loop()
      64  
      65          fut = loop.create_future()
      66          fut.set_result('done')
      67  
      68          t0 = loop.time()
      69          ret = await asyncio.wait_for(fut, 0)
      70          t1 = loop.time()
      71  
      72          self.assertEqual(ret, 'done')
      73          self.assertTrue(fut.done())
      74          self.assertLess(t1 - t0, 0.1)
      75  
      76      async def test_wait_for_timeout_less_then_0_or_0_coroutine_do_not_started(self):
      77          loop = asyncio.get_running_loop()
      78  
      79          foo_started = False
      80  
      81          async def foo():
      82              nonlocal foo_started
      83              foo_started = True
      84  
      85          with self.assertRaises(asyncio.TimeoutError):
      86              t0 = loop.time()
      87              await asyncio.wait_for(foo(), 0)
      88          t1 = loop.time()
      89  
      90          self.assertEqual(foo_started, False)
      91          self.assertLess(t1 - t0, 0.1)
      92  
      93      async def test_wait_for_timeout_less_then_0_or_0(self):
      94          loop = asyncio.get_running_loop()
      95  
      96          for timeout in [0, -1]:
      97              with self.subTest(timeout=timeout):
      98                  foo_running = None
      99                  started = loop.create_future()
     100  
     101                  async def foo():
     102                      nonlocal foo_running
     103                      foo_running = True
     104                      started.set_result(None)
     105                      try:
     106                          await asyncio.sleep(10)
     107                      finally:
     108                          foo_running = False
     109                      return 'done'
     110  
     111                  fut = asyncio.create_task(foo())
     112                  await started
     113  
     114                  with self.assertRaises(asyncio.TimeoutError):
     115                      t0 = loop.time()
     116                      await asyncio.wait_for(fut, timeout)
     117                  t1 = loop.time()
     118  
     119                  self.assertTrue(fut.done())
     120                  # it should have been cancelled due to the timeout
     121                  self.assertTrue(fut.cancelled())
     122                  self.assertEqual(foo_running, False)
     123                  self.assertLess(t1 - t0, 0.1)
     124  
     125      async def test_wait_for(self):
     126          loop = asyncio.get_running_loop()
     127          foo_running = None
     128  
     129          async def foo():
     130              nonlocal foo_running
     131              foo_running = True
     132              try:
     133                  await asyncio.sleep(10)
     134              finally:
     135                  foo_running = False
     136              return 'done'
     137  
     138          fut = asyncio.create_task(foo())
     139  
     140          with self.assertRaises(asyncio.TimeoutError):
     141              t0 = loop.time()
     142              await asyncio.wait_for(fut, 0.1)
     143          t1 = loop.time()
     144          self.assertTrue(fut.done())
     145          # it should have been cancelled due to the timeout
     146          self.assertTrue(fut.cancelled())
     147          self.assertLess(t1 - t0, 0.5)
     148          self.assertEqual(foo_running, False)
     149  
     150      async def test_wait_for_blocking(self):
     151          async def coro():
     152              return 'done'
     153  
     154          res = await asyncio.wait_for(coro(), timeout=None)
     155          self.assertEqual(res, 'done')
     156  
     157      async def test_wait_for_race_condition(self):
     158          loop = asyncio.get_running_loop()
     159  
     160          fut = loop.create_future()
     161          task = asyncio.wait_for(fut, timeout=0.2)
     162          loop.call_soon(fut.set_result, "ok")
     163          res = await task
     164          self.assertEqual(res, "ok")
     165  
     166      async def test_wait_for_cancellation_race_condition(self):
     167          async def inner():
     168              with self.assertRaises(asyncio.CancelledError):
     169                  await asyncio.sleep(1)
     170              return 1
     171  
     172          result = await asyncio.wait_for(inner(), timeout=.01)
     173          self.assertEqual(result, 1)
     174  
     175      async def test_wait_for_waits_for_task_cancellation(self):
     176          task_done = False
     177  
     178          async def inner():
     179              nonlocal task_done
     180              try:
     181                  await asyncio.sleep(10)
     182              except asyncio.CancelledError:
     183                  await asyncio.sleep(_EPSILON)
     184                  raise
     185              finally:
     186                  task_done = True
     187  
     188          inner_task = asyncio.create_task(inner())
     189  
     190          with self.assertRaises(asyncio.TimeoutError) as cm:
     191              await asyncio.wait_for(inner_task, timeout=_EPSILON)
     192  
     193          self.assertTrue(task_done)
     194          chained = cm.exception.__context__
     195          self.assertEqual(type(chained), asyncio.CancelledError)
     196  
     197      async def test_wait_for_waits_for_task_cancellation_w_timeout_0(self):
     198          task_done = False
     199  
     200          async def foo():
     201              async def inner():
     202                  nonlocal task_done
     203                  try:
     204                      await asyncio.sleep(10)
     205                  except asyncio.CancelledError:
     206                      await asyncio.sleep(_EPSILON)
     207                      raise
     208                  finally:
     209                      task_done = True
     210  
     211              inner_task = asyncio.create_task(inner())
     212              await asyncio.sleep(_EPSILON)
     213              await asyncio.wait_for(inner_task, timeout=0)
     214  
     215          with self.assertRaises(asyncio.TimeoutError) as cm:
     216              await foo()
     217  
     218          self.assertTrue(task_done)
     219          chained = cm.exception.__context__
     220          self.assertEqual(type(chained), asyncio.CancelledError)
     221  
     222      async def test_wait_for_reraises_exception_during_cancellation(self):
     223          class ESC[4;38;5;81mFooException(ESC[4;38;5;149mException):
     224              pass
     225  
     226          async def foo():
     227              async def inner():
     228                  try:
     229                      await asyncio.sleep(0.2)
     230                  finally:
     231                      raise FooException
     232  
     233              inner_task = asyncio.create_task(inner())
     234  
     235              await asyncio.wait_for(inner_task, timeout=_EPSILON)
     236  
     237          with self.assertRaises(FooException):
     238              await foo()
     239  
     240      async def _test_cancel_wait_for(self, timeout):
     241          loop = asyncio.get_running_loop()
     242  
     243          async def blocking_coroutine():
     244              fut = loop.create_future()
     245              # Block: fut result is never set
     246              await fut
     247  
     248          task = asyncio.create_task(blocking_coroutine())
     249  
     250          wait = asyncio.create_task(asyncio.wait_for(task, timeout))
     251          loop.call_soon(wait.cancel)
     252  
     253          with self.assertRaises(asyncio.CancelledError):
     254              await wait
     255  
     256          # Python issue #23219: cancelling the wait must also cancel the task
     257          self.assertTrue(task.cancelled())
     258  
     259      async def test_cancel_blocking_wait_for(self):
     260          await self._test_cancel_wait_for(None)
     261  
     262      async def test_cancel_wait_for(self):
     263          await self._test_cancel_wait_for(60.0)
     264  
     265      async def test_wait_for_cancel_suppressed(self):
     266          # GH-86296: Supressing CancelledError is discouraged
     267          # but if a task subpresses CancelledError and returns a value,
     268          # `wait_for` should return the value instead of raising CancelledError.
     269          # This is the same behavior as `asyncio.timeout`.
     270  
     271          async def return_42():
     272              try:
     273                  await asyncio.sleep(10)
     274              except asyncio.CancelledError:
     275                  return 42
     276  
     277          res = await asyncio.wait_for(return_42(), timeout=0.1)
     278          self.assertEqual(res, 42)
     279  
     280  
     281      async def test_wait_for_issue86296(self):
     282          # GH-86296: The task should get cancelled and not run to completion.
     283          # inner completes in one cycle of the event loop so it
     284          # completes before the task is cancelled.
     285  
     286          async def inner():
     287              return 'done'
     288  
     289          inner_task = asyncio.create_task(inner())
     290          reached_end = False
     291  
     292          async def wait_for_coro():
     293              await asyncio.wait_for(inner_task, timeout=100)
     294              await asyncio.sleep(1)
     295              nonlocal reached_end
     296              reached_end = True
     297  
     298          task = asyncio.create_task(wait_for_coro())
     299          self.assertFalse(task.done())
     300          # Run the task
     301          await asyncio.sleep(0)
     302          task.cancel()
     303          with self.assertRaises(asyncio.CancelledError):
     304              await task
     305          self.assertTrue(inner_task.done())
     306          self.assertEqual(await inner_task, 'done')
     307          self.assertFalse(reached_end)
     308  
     309  
     310  class ESC[4;38;5;81mWaitForShieldTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mIsolatedAsyncioTestCase):
     311  
     312      async def test_zero_timeout(self):
     313          # `asyncio.shield` creates a new task which wraps the passed in
     314          # awaitable and shields it from cancellation so with timeout=0
     315          # the task returned by `asyncio.shield` aka shielded_task gets
     316          # cancelled immediately and the task wrapped by it is scheduled
     317          # to run.
     318  
     319          async def coro():
     320              await asyncio.sleep(0.01)
     321              return 'done'
     322  
     323          task = asyncio.create_task(coro())
     324          with self.assertRaises(asyncio.TimeoutError):
     325              shielded_task = asyncio.shield(task)
     326              await asyncio.wait_for(shielded_task, timeout=0)
     327  
     328          # Task is running in background
     329          self.assertFalse(task.done())
     330          self.assertFalse(task.cancelled())
     331          self.assertTrue(shielded_task.cancelled())
     332  
     333          # Wait for the task to complete
     334          await asyncio.sleep(0.1)
     335          self.assertTrue(task.done())
     336  
     337  
     338      async def test_none_timeout(self):
     339          # With timeout=None the timeout is disabled so it
     340          # runs till completion.
     341          async def coro():
     342              await asyncio.sleep(0.1)
     343              return 'done'
     344  
     345          task = asyncio.create_task(coro())
     346          await asyncio.wait_for(asyncio.shield(task), timeout=None)
     347  
     348          self.assertTrue(task.done())
     349          self.assertEqual(await task, "done")
     350  
     351      async def test_shielded_timeout(self):
     352          # shield prevents the task from being cancelled.
     353          async def coro():
     354              await asyncio.sleep(0.1)
     355              return 'done'
     356  
     357          task = asyncio.create_task(coro())
     358          with self.assertRaises(asyncio.TimeoutError):
     359              await asyncio.wait_for(asyncio.shield(task), timeout=0.01)
     360  
     361          self.assertFalse(task.done())
     362          self.assertFalse(task.cancelled())
     363          self.assertEqual(await task, "done")
     364  
     365  
     366  if __name__ == '__main__':
     367      unittest.main()