1  """Tests for asyncio/timeouts.py"""
       2  
       3  import unittest
       4  import time
       5  
       6  import asyncio
       7  
       8  
       9  def tearDownModule():
      10      asyncio.set_event_loop_policy(None)
      11  
      12  
      13  class ESC[4;38;5;81mTimeoutTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mIsolatedAsyncioTestCase):
      14  
      15      async def test_timeout_basic(self):
      16          with self.assertRaises(TimeoutError):
      17              async with asyncio.timeout(0.01) as cm:
      18                  await asyncio.sleep(10)
      19          self.assertTrue(cm.expired())
      20  
      21      async def test_timeout_at_basic(self):
      22          loop = asyncio.get_running_loop()
      23  
      24          with self.assertRaises(TimeoutError):
      25              deadline = loop.time() + 0.01
      26              async with asyncio.timeout_at(deadline) as cm:
      27                  await asyncio.sleep(10)
      28          self.assertTrue(cm.expired())
      29          self.assertEqual(deadline, cm.when())
      30  
      31      async def test_nested_timeouts(self):
      32          loop = asyncio.get_running_loop()
      33          cancelled = False
      34          with self.assertRaises(TimeoutError):
      35              deadline = loop.time() + 0.01
      36              async with asyncio.timeout_at(deadline) as cm1:
      37                  # Only the topmost context manager should raise TimeoutError
      38                  try:
      39                      async with asyncio.timeout_at(deadline) as cm2:
      40                          await asyncio.sleep(10)
      41                  except asyncio.CancelledError:
      42                      cancelled = True
      43                      raise
      44          self.assertTrue(cancelled)
      45          self.assertTrue(cm1.expired())
      46          self.assertTrue(cm2.expired())
      47  
      48      async def test_waiter_cancelled(self):
      49          loop = asyncio.get_running_loop()
      50          cancelled = False
      51          with self.assertRaises(TimeoutError):
      52              async with asyncio.timeout(0.01):
      53                  try:
      54                      await asyncio.sleep(10)
      55                  except asyncio.CancelledError:
      56                      cancelled = True
      57                      raise
      58          self.assertTrue(cancelled)
      59  
      60      async def test_timeout_not_called(self):
      61          loop = asyncio.get_running_loop()
      62          t0 = loop.time()
      63          async with asyncio.timeout(10) as cm:
      64              await asyncio.sleep(0.01)
      65          t1 = loop.time()
      66  
      67          self.assertFalse(cm.expired())
      68          # 2 sec for slow CI boxes
      69          self.assertLess(t1-t0, 2)
      70          self.assertGreater(cm.when(), t1)
      71  
      72      async def test_timeout_disabled(self):
      73          loop = asyncio.get_running_loop()
      74          t0 = loop.time()
      75          async with asyncio.timeout(None) as cm:
      76              await asyncio.sleep(0.01)
      77          t1 = loop.time()
      78  
      79          self.assertFalse(cm.expired())
      80          self.assertIsNone(cm.when())
      81          # 2 sec for slow CI boxes
      82          self.assertLess(t1-t0, 2)
      83  
      84      async def test_timeout_at_disabled(self):
      85          loop = asyncio.get_running_loop()
      86          t0 = loop.time()
      87          async with asyncio.timeout_at(None) as cm:
      88              await asyncio.sleep(0.01)
      89          t1 = loop.time()
      90  
      91          self.assertFalse(cm.expired())
      92          self.assertIsNone(cm.when())
      93          # 2 sec for slow CI boxes
      94          self.assertLess(t1-t0, 2)
      95  
      96      async def test_timeout_zero(self):
      97          loop = asyncio.get_running_loop()
      98          t0 = loop.time()
      99          with self.assertRaises(TimeoutError):
     100              async with asyncio.timeout(0) as cm:
     101                  await asyncio.sleep(10)
     102          t1 = loop.time()
     103          self.assertTrue(cm.expired())
     104          # 2 sec for slow CI boxes
     105          self.assertLess(t1-t0, 2)
     106          self.assertTrue(t0 <= cm.when() <= t1)
     107  
     108      async def test_timeout_zero_sleep_zero(self):
     109          loop = asyncio.get_running_loop()
     110          t0 = loop.time()
     111          with self.assertRaises(TimeoutError):
     112              async with asyncio.timeout(0) as cm:
     113                  await asyncio.sleep(0)
     114          t1 = loop.time()
     115          self.assertTrue(cm.expired())
     116          # 2 sec for slow CI boxes
     117          self.assertLess(t1-t0, 2)
     118          self.assertTrue(t0 <= cm.when() <= t1)
     119  
     120      async def test_timeout_in_the_past_sleep_zero(self):
     121          loop = asyncio.get_running_loop()
     122          t0 = loop.time()
     123          with self.assertRaises(TimeoutError):
     124              async with asyncio.timeout(-11) as cm:
     125                  await asyncio.sleep(0)
     126          t1 = loop.time()
     127          self.assertTrue(cm.expired())
     128          # 2 sec for slow CI boxes
     129          self.assertLess(t1-t0, 2)
     130          self.assertTrue(t0 >= cm.when() <= t1)
     131  
     132      async def test_foreign_exception_passed(self):
     133          with self.assertRaises(KeyError):
     134              async with asyncio.timeout(0.01) as cm:
     135                  raise KeyError
     136          self.assertFalse(cm.expired())
     137  
     138      async def test_foreign_exception_on_timeout(self):
     139          async def crash():
     140              try:
     141                  await asyncio.sleep(1)
     142              finally:
     143                  1/0
     144          with self.assertRaises(ZeroDivisionError):
     145              async with asyncio.timeout(0.01):
     146                  await crash()
     147  
     148      async def test_foreign_cancel_doesnt_timeout_if_not_expired(self):
     149          with self.assertRaises(asyncio.CancelledError):
     150              async with asyncio.timeout(10) as cm:
     151                  asyncio.current_task().cancel()
     152                  await asyncio.sleep(10)
     153          self.assertFalse(cm.expired())
     154  
     155      async def test_outer_task_is_not_cancelled(self):
     156          async def outer() -> None:
     157              with self.assertRaises(TimeoutError):
     158                  async with asyncio.timeout(0.001):
     159                      await asyncio.sleep(10)
     160  
     161          task = asyncio.create_task(outer())
     162          await task
     163          self.assertFalse(task.cancelled())
     164          self.assertTrue(task.done())
     165  
     166      async def test_nested_timeouts_concurrent(self):
     167          with self.assertRaises(TimeoutError):
     168              async with asyncio.timeout(0.002):
     169                  with self.assertRaises(TimeoutError):
     170                      async with asyncio.timeout(0.1):
     171                          # Pretend we crunch some numbers.
     172                          time.sleep(0.01)
     173                          await asyncio.sleep(1)
     174  
     175      async def test_nested_timeouts_loop_busy(self):
     176          # After the inner timeout is an expensive operation which should
     177          # be stopped by the outer timeout.
     178          loop = asyncio.get_running_loop()
     179          # Disable a message about long running task
     180          loop.slow_callback_duration = 10
     181          t0 = loop.time()
     182          with self.assertRaises(TimeoutError):
     183              async with asyncio.timeout(0.1):  # (1)
     184                  with self.assertRaises(TimeoutError):
     185                      async with asyncio.timeout(0.01):  # (2)
     186                          # Pretend the loop is busy for a while.
     187                          time.sleep(0.1)
     188                          await asyncio.sleep(1)
     189                  # TimeoutError was cought by (2)
     190                  await asyncio.sleep(10) # This sleep should be interrupted by (1)
     191          t1 = loop.time()
     192          self.assertTrue(t0 <= t1 <= t0 + 1)
     193  
     194      async def test_reschedule(self):
     195          loop = asyncio.get_running_loop()
     196          fut = loop.create_future()
     197          deadline1 = loop.time() + 10
     198          deadline2 = deadline1 + 20
     199  
     200          async def f():
     201              async with asyncio.timeout_at(deadline1) as cm:
     202                  fut.set_result(cm)
     203                  await asyncio.sleep(50)
     204  
     205          task = asyncio.create_task(f())
     206          cm = await fut
     207  
     208          self.assertEqual(cm.when(), deadline1)
     209          cm.reschedule(deadline2)
     210          self.assertEqual(cm.when(), deadline2)
     211          cm.reschedule(None)
     212          self.assertIsNone(cm.when())
     213  
     214          task.cancel()
     215  
     216          with self.assertRaises(asyncio.CancelledError):
     217              await task
     218          self.assertFalse(cm.expired())
     219  
     220      async def test_repr_active(self):
     221          async with asyncio.timeout(10) as cm:
     222              self.assertRegex(repr(cm), r"<Timeout \[active\] when=\d+\.\d*>")
     223  
     224      async def test_repr_expired(self):
     225          with self.assertRaises(TimeoutError):
     226              async with asyncio.timeout(0.01) as cm:
     227                  await asyncio.sleep(10)
     228          self.assertEqual(repr(cm), "<Timeout [expired]>")
     229  
     230      async def test_repr_finished(self):
     231          async with asyncio.timeout(10) as cm:
     232              await asyncio.sleep(0)
     233  
     234          self.assertEqual(repr(cm), "<Timeout [finished]>")
     235  
     236      async def test_repr_disabled(self):
     237          async with asyncio.timeout(None) as cm:
     238              self.assertEqual(repr(cm), r"<Timeout [active] when=None>")
     239  
     240      async def test_nested_timeout_in_finally(self):
     241          with self.assertRaises(TimeoutError):
     242              async with asyncio.timeout(0.01):
     243                  try:
     244                      await asyncio.sleep(1)
     245                  finally:
     246                      with self.assertRaises(TimeoutError):
     247                          async with asyncio.timeout(0.01):
     248                              await asyncio.sleep(10)
     249  
     250      async def test_timeout_after_cancellation(self):
     251          try:
     252              asyncio.current_task().cancel()
     253              await asyncio.sleep(1)  # work which will be cancelled
     254          except asyncio.CancelledError:
     255              pass
     256          finally:
     257              with self.assertRaises(TimeoutError):
     258                  async with asyncio.timeout(0.0):
     259                      await asyncio.sleep(1)  # some cleanup
     260  
     261      async def test_cancel_in_timeout_after_cancellation(self):
     262          try:
     263              asyncio.current_task().cancel()
     264              await asyncio.sleep(1)  # work which will be cancelled
     265          except asyncio.CancelledError:
     266              pass
     267          finally:
     268              with self.assertRaises(asyncio.CancelledError):
     269                  async with asyncio.timeout(1.0):
     270                      asyncio.current_task().cancel()
     271                      await asyncio.sleep(2)  # some cleanup
     272  
     273      async def test_timeout_exception_cause (self):
     274          with self.assertRaises(asyncio.TimeoutError) as exc:
     275              async with asyncio.timeout(0):
     276                  await asyncio.sleep(1)
     277          cause = exc.exception.__cause__
     278          assert isinstance(cause, asyncio.CancelledError)
     279  
     280  
     281  if __name__ == '__main__':
     282      unittest.main()