1 """Tests for asyncio/timeouts.py"""
2
3 import unittest
4 import time
5
6 import asyncio
7 from asyncio import tasks
8
9 from test.test_asyncio.utils import await_without_task
10
11
def tearDownModule():
    """Module-level unittest fixture: reset the asyncio event loop policy.

    Called with ``None`` so that asyncio falls back to its default policy
    after the tests in this module have run.
    """
    default_policy = None
    asyncio.set_event_loop_policy(default_policy)
14
class TimeoutTests(unittest.IsolatedAsyncioTestCase):
    """Behavioural tests for ``asyncio.timeout()`` and ``asyncio.timeout_at()``.

    Every test runs as a coroutine inside the event loop provided by
    ``IsolatedAsyncioTestCase``.  Per-test notes are written as ``#`` comments
    rather than docstrings so that verbose test output keeps showing the
    test method names.
    """

    async def test_timeout_basic(self):
        # A short relative timeout interrupts a long sleep with TimeoutError
        # and the context manager reports itself as expired.
        with self.assertRaises(TimeoutError):
            async with asyncio.timeout(0.01) as cm:
                await asyncio.sleep(10)
        self.assertTrue(cm.expired())

    async def test_timeout_at_basic(self):
        # Same as above, but with an absolute deadline; when() must return
        # exactly the deadline that was passed in.
        loop = asyncio.get_running_loop()

        with self.assertRaises(TimeoutError):
            deadline = loop.time() + 0.01
            async with asyncio.timeout_at(deadline) as cm:
                await asyncio.sleep(10)
        self.assertTrue(cm.expired())
        self.assertEqual(deadline, cm.when())

    async def test_nested_timeouts(self):
        # Two nested timeouts share one deadline: the inner one must let
        # CancelledError propagate, and both must end up expired.
        loop = asyncio.get_running_loop()
        cancelled = False
        with self.assertRaises(TimeoutError):
            deadline = loop.time() + 0.01
            async with asyncio.timeout_at(deadline) as cm1:
                # Only the topmost context manager should raise TimeoutError
                try:
                    async with asyncio.timeout_at(deadline) as cm2:
                        await asyncio.sleep(10)
                except asyncio.CancelledError:
                    cancelled = True
                    raise
        self.assertTrue(cancelled)
        self.assertTrue(cm1.expired())
        self.assertTrue(cm2.expired())

    async def test_waiter_cancelled(self):
        # Code inside the block observes CancelledError; only the context
        # manager exit converts it into TimeoutError.
        cancelled = False
        with self.assertRaises(TimeoutError):
            async with asyncio.timeout(0.01):
                try:
                    await asyncio.sleep(10)
                except asyncio.CancelledError:
                    cancelled = True
                    raise
        self.assertTrue(cancelled)

    async def test_timeout_not_called(self):
        # The body finishes well before the deadline: nothing is raised and
        # the deadline is still in the future.
        loop = asyncio.get_running_loop()
        async with asyncio.timeout(10) as cm:
            await asyncio.sleep(0.01)
        t1 = loop.time()

        self.assertFalse(cm.expired())
        self.assertGreater(cm.when(), t1)

    async def test_timeout_disabled(self):
        # timeout(None) means "no deadline": never expires, when() is None.
        async with asyncio.timeout(None) as cm:
            await asyncio.sleep(0.01)

        self.assertFalse(cm.expired())
        self.assertIsNone(cm.when())

    async def test_timeout_at_disabled(self):
        # timeout_at(None) behaves like timeout(None).
        async with asyncio.timeout_at(None) as cm:
            await asyncio.sleep(0.01)

        self.assertFalse(cm.expired())
        self.assertIsNone(cm.when())

    async def test_timeout_zero(self):
        # A zero delay expires immediately; the recorded deadline must lie
        # between the loop times sampled before and after the block.
        loop = asyncio.get_running_loop()
        t0 = loop.time()
        with self.assertRaises(TimeoutError):
            async with asyncio.timeout(0) as cm:
                await asyncio.sleep(10)
        t1 = loop.time()
        self.assertTrue(cm.expired())
        # Separate bound checks so a failure reports the offending values.
        self.assertLessEqual(t0, cm.when())
        self.assertLessEqual(cm.when(), t1)

    async def test_timeout_zero_sleep_zero(self):
        # Even a bare sleep(0) yield is enough for a zero timeout to fire.
        loop = asyncio.get_running_loop()
        t0 = loop.time()
        with self.assertRaises(TimeoutError):
            async with asyncio.timeout(0) as cm:
                await asyncio.sleep(0)
        t1 = loop.time()
        self.assertTrue(cm.expired())
        self.assertLessEqual(t0, cm.when())
        self.assertLessEqual(cm.when(), t1)

    async def test_timeout_in_the_past_sleep_zero(self):
        # A negative delay puts the deadline in the past, so when() must not
        # exceed either of the loop times sampled around the block.
        loop = asyncio.get_running_loop()
        t0 = loop.time()
        with self.assertRaises(TimeoutError):
            async with asyncio.timeout(-11) as cm:
                await asyncio.sleep(0)
        t1 = loop.time()
        self.assertTrue(cm.expired())
        self.assertLessEqual(cm.when(), t0)
        self.assertLessEqual(cm.when(), t1)

    async def test_foreign_exception_passed(self):
        # An unrelated exception raised before the deadline propagates
        # unchanged and does not mark the timeout as expired.
        with self.assertRaises(KeyError):
            async with asyncio.timeout(0.01) as cm:
                raise KeyError
        self.assertFalse(cm.expired())

    async def test_foreign_exception_on_timeout(self):
        # If cleanup code raises while the timeout is cancelling the task,
        # that exception wins over TimeoutError.
        async def crash():
            try:
                await asyncio.sleep(1)
            finally:
                1/0
        with self.assertRaises(ZeroDivisionError):
            async with asyncio.timeout(0.01):
                await crash()

    async def test_foreign_cancel_doesnt_timeout_if_not_expired(self):
        # A cancellation that did not come from the timeout stays a
        # CancelledError and leaves the timeout unexpired.
        with self.assertRaises(asyncio.CancelledError):
            async with asyncio.timeout(10) as cm:
                asyncio.current_task().cancel()
                await asyncio.sleep(10)
        self.assertFalse(cm.expired())

    async def test_outer_task_is_not_cancelled(self):
        # Handling the TimeoutError inside a task must leave that task
        # finished normally, not cancelled.
        async def outer() -> None:
            with self.assertRaises(TimeoutError):
                async with asyncio.timeout(0.001):
                    await asyncio.sleep(10)

        task = asyncio.create_task(outer())
        await task
        self.assertFalse(task.cancelled())
        self.assertTrue(task.done())

    async def test_nested_timeouts_concurrent(self):
        # Both the outer and the inner context manager are expected to raise
        # TimeoutError when the loop is blocked past the outer deadline.
        with self.assertRaises(TimeoutError):
            async with asyncio.timeout(0.002):
                with self.assertRaises(TimeoutError):
                    async with asyncio.timeout(0.1):
                        # Pretend we crunch some numbers.
                        time.sleep(0.01)
                        await asyncio.sleep(1)

    async def test_nested_timeouts_loop_busy(self):
        # After the inner timeout is an expensive operation which should
        # be stopped by the outer timeout.
        loop = asyncio.get_running_loop()
        # Disable a message about long running task
        loop.slow_callback_duration = 10
        t0 = loop.time()
        with self.assertRaises(TimeoutError):
            async with asyncio.timeout(0.1):  # (1)
                with self.assertRaises(TimeoutError):
                    async with asyncio.timeout(0.01):  # (2)
                        # Pretend the loop is busy for a while.
                        time.sleep(0.1)
                        await asyncio.sleep(1)
                # TimeoutError was caught by (2)
                await asyncio.sleep(10)  # This sleep should be interrupted by (1)
        t1 = loop.time()
        # The whole test must finish within a second, far short of sleep(10).
        self.assertLessEqual(t0, t1)
        self.assertLessEqual(t1, t0 + 1)

    async def test_reschedule(self):
        # reschedule() moves the deadline, or removes it with None, and a
        # later external cancel is not reported as a timeout.
        loop = asyncio.get_running_loop()
        fut = loop.create_future()
        deadline1 = loop.time() + 10
        deadline2 = deadline1 + 20

        async def f():
            async with asyncio.timeout_at(deadline1) as cm:
                # Hand the context manager to the test body, then block.
                fut.set_result(cm)
                await asyncio.sleep(50)

        task = asyncio.create_task(f())
        cm = await fut

        self.assertEqual(cm.when(), deadline1)
        cm.reschedule(deadline2)
        self.assertEqual(cm.when(), deadline2)
        cm.reschedule(None)
        self.assertIsNone(cm.when())

        task.cancel()

        with self.assertRaises(asyncio.CancelledError):
            await task
        self.assertFalse(cm.expired())

    async def test_repr_active(self):
        # While entered, repr() shows the active state and the deadline.
        async with asyncio.timeout(10) as cm:
            self.assertRegex(repr(cm), r"<Timeout \[active\] when=\d+\.\d*>")

    async def test_repr_expired(self):
        # After the deadline fired, repr() shows the expired state.
        with self.assertRaises(TimeoutError):
            async with asyncio.timeout(0.01) as cm:
                await asyncio.sleep(10)
        self.assertEqual(repr(cm), "<Timeout [expired]>")

    async def test_repr_finished(self):
        # After a normal exit, repr() shows the finished state.
        async with asyncio.timeout(10) as cm:
            await asyncio.sleep(0)

        self.assertEqual(repr(cm), "<Timeout [finished]>")

    async def test_repr_disabled(self):
        # A disabled timeout is still "active" while entered; when is None.
        async with asyncio.timeout(None) as cm:
            self.assertEqual(repr(cm), r"<Timeout [active] when=None>")

    async def test_nested_timeout_in_finally(self):
        # A fresh timeout used in a finally block, while the outer timeout
        # is already unwinding, must still work and raise on its own.
        with self.assertRaises(TimeoutError):
            async with asyncio.timeout(0.01):
                try:
                    await asyncio.sleep(1)
                finally:
                    with self.assertRaises(TimeoutError):
                        async with asyncio.timeout(0.01):
                            await asyncio.sleep(10)

    async def test_timeout_after_cancellation(self):
        # A timeout entered after the task swallowed an earlier cancellation
        # must still report its own expiry as TimeoutError.
        try:
            asyncio.current_task().cancel()
            await asyncio.sleep(1)  # work which will be cancelled
        except asyncio.CancelledError:
            pass
        finally:
            with self.assertRaises(TimeoutError):
                async with asyncio.timeout(0.0):
                    await asyncio.sleep(1)  # some cleanup

    async def test_cancel_in_timeout_after_cancellation(self):
        # A new cancel issued inside the timeout block, after an earlier
        # swallowed cancellation, surfaces as CancelledError, not TimeoutError.
        try:
            asyncio.current_task().cancel()
            await asyncio.sleep(1)  # work which will be cancelled
        except asyncio.CancelledError:
            pass
        finally:
            with self.assertRaises(asyncio.CancelledError):
                async with asyncio.timeout(1.0):
                    asyncio.current_task().cancel()
                    await asyncio.sleep(2)  # some cleanup

    async def test_timeout_exception_cause(self):
        # The TimeoutError must be chained from the CancelledError that
        # interrupted the body.
        with self.assertRaises(TimeoutError) as exc:
            async with asyncio.timeout(0):
                await asyncio.sleep(1)
        cause = exc.exception.__cause__
        # Use a unittest assertion: a bare ``assert`` is removed under -O.
        self.assertIsInstance(cause, asyncio.CancelledError)

    async def test_timeout_already_entered(self):
        # Re-entering the same context manager while it is active fails.
        async with asyncio.timeout(0.01) as cm:
            with self.assertRaisesRegex(RuntimeError, "has already been entered"):
                async with cm:
                    pass

    async def test_timeout_double_enter(self):
        # Re-entering the context manager after it has exited fails too.
        async with asyncio.timeout(0.01) as cm:
            pass
        with self.assertRaisesRegex(RuntimeError, "has already been entered"):
            async with cm:
                pass

    async def test_timeout_finished(self):
        # reschedule() is rejected once the block has finished normally.
        async with asyncio.timeout(0.01) as cm:
            pass
        with self.assertRaisesRegex(RuntimeError, "finished"):
            cm.reschedule(0.02)

    async def test_timeout_expired(self):
        # reschedule() is rejected once the timeout has expired.
        with self.assertRaises(TimeoutError):
            async with asyncio.timeout(0.01) as cm:
                await asyncio.sleep(1)
        with self.assertRaisesRegex(RuntimeError, "expired"):
            cm.reschedule(0.02)

    async def test_timeout_expiring(self):
        # reschedule() is rejected in the window after the deadline fired
        # but before the context manager has exited.
        async with asyncio.timeout(0.01) as cm:
            with self.assertRaises(asyncio.CancelledError):
                await asyncio.sleep(1)
            with self.assertRaisesRegex(RuntimeError, "expiring"):
                cm.reschedule(0.02)

    async def test_timeout_not_entered(self):
        # reschedule() is rejected before the context manager is entered.
        cm = asyncio.timeout(0.01)
        with self.assertRaisesRegex(RuntimeError, "has not been entered"):
            cm.reschedule(0.02)

    async def test_timeout_without_task(self):
        # Entering outside of a task fails, and the context manager then
        # still counts as "not entered".
        cm = asyncio.timeout(0.01)
        with self.assertRaisesRegex(RuntimeError, "task"):
            await await_without_task(cm.__aenter__())
        with self.assertRaisesRegex(RuntimeError, "has not been entered"):
            cm.reschedule(0.02)
306
307
# Run the tests with unittest's command-line runner when this file is
# executed directly as a script.
if __name__ == '__main__':
    unittest.main()