# python (3.12.0)
1 import asyncio
2 import unittest
3 import time
4
5
def tearDownModule():
    """Reset the asyncio event loop policy to the default.

    Passing ``None`` to ``asyncio.set_event_loop_policy`` restores the
    default policy, so whatever policy was installed while this module's
    tests ran does not leak into tests that run afterwards.
    """
    asyncio.set_event_loop_policy(None)
8
9
# The following value can be used as a very small timeout:
# it passes the "timeout > 0" check, but has almost
# no effect on the test performance.
_EPSILON = 0.0001
14
15
class SlowTask:
    """Task will run for this defined time, ignoring cancel requests."""

    # How long run() keeps going, in seconds.
    TASK_TIMEOUT = 0.2

    def __init__(self):
        # Set to True only once run() has reached its end.
        self.exited = False

    async def run(self):
        """Sleep until TASK_TIMEOUT seconds have elapsed, then set ``exited``.

        A CancelledError raised while sleeping is swallowed and the
        remaining time is slept again, so the coroutine always runs for
        the full duration.
        """
        deadline = time.monotonic() + self.TASK_TIMEOUT

        # Recompute the remaining time on every pass so that an interrupted
        # sleep resumes for only what is left of the deadline.
        while (remaining := deadline - time.monotonic()) > 0:
            try:
                await asyncio.sleep(remaining)
            except asyncio.CancelledError:
                # Deliberately ignore the cancel request and keep waiting.
                pass

        self.exited = True
37
38
class AsyncioWaitForTest(unittest.IsolatedAsyncioTestCase):
    """Tests for asyncio.wait_for(): timeouts, cancellation and results."""

    # NOTE: the test methods are described with comments rather than
    # docstrings so that unittest keeps reporting them by method name.

    async def test_asyncio_wait_for_cancelled(self):
        # After the wait_for() task has been cancelled and waited on, the
        # wrapped SlowTask (which ignores cancellation) must have finished.
        t = SlowTask()

        waitfortask = asyncio.create_task(
            asyncio.wait_for(t.run(), t.TASK_TIMEOUT * 2))
        # Yield once so the new task gets to run before it is cancelled.
        await asyncio.sleep(0)
        waitfortask.cancel()
        await asyncio.wait({waitfortask})

        self.assertTrue(t.exited)

    async def test_asyncio_wait_for_timeout(self):
        # The timeout is half of what SlowTask needs.  A TimeoutError is
        # tolerated but not required; what is asserted is that wait_for()
        # only returned after SlowTask ran to its end.
        t = SlowTask()

        try:
            await asyncio.wait_for(t.run(), t.TASK_TIMEOUT / 2)
        except asyncio.TimeoutError:
            pass

        self.assertTrue(t.exited)

    async def test_wait_for_timeout_less_then_0_or_0_future_done(self):
        # A future that already has a result is returned promptly even
        # with timeout=0.
        loop = asyncio.get_running_loop()

        fut = loop.create_future()
        fut.set_result('done')

        t0 = loop.time()
        ret = await asyncio.wait_for(fut, 0)
        t1 = loop.time()

        self.assertEqual(ret, 'done')
        self.assertTrue(fut.done())
        self.assertLess(t1 - t0, 0.1)

    async def test_wait_for_timeout_less_then_0_or_0_coroutine_do_not_started(self):
        # With timeout=0, wait_for() raises TimeoutError and the coroutine
        # body never gets to run.
        loop = asyncio.get_running_loop()

        foo_started = False

        async def foo():
            nonlocal foo_started
            foo_started = True

        with self.assertRaises(asyncio.TimeoutError):
            t0 = loop.time()
            await asyncio.wait_for(foo(), 0)
        # Taken after the block: nothing following the raising await
        # inside the block would ever execute.
        t1 = loop.time()

        self.assertEqual(foo_started, False)
        self.assertLess(t1 - t0, 0.1)

    async def test_wait_for_timeout_less_then_0_or_0(self):
        # A running task waited on with a zero or negative timeout is
        # cancelled promptly and TimeoutError is raised.
        loop = asyncio.get_running_loop()

        for timeout in [0, -1]:
            with self.subTest(timeout=timeout):
                foo_running = None
                started = loop.create_future()

                async def foo():
                    nonlocal foo_running
                    foo_running = True
                    started.set_result(None)
                    try:
                        await asyncio.sleep(10)
                    finally:
                        foo_running = False
                    return 'done'

                fut = asyncio.create_task(foo())
                # Make sure foo() is really running before timing out.
                await started

                with self.assertRaises(asyncio.TimeoutError):
                    t0 = loop.time()
                    await asyncio.wait_for(fut, timeout)
                t1 = loop.time()

                self.assertTrue(fut.done())
                # it should have been cancelled due to the timeout
                self.assertTrue(fut.cancelled())
                self.assertEqual(foo_running, False)
                self.assertLess(t1 - t0, 0.1)

    async def test_wait_for(self):
        # A task that outlives a positive timeout is cancelled and
        # TimeoutError is raised well before the task's own 10s sleep ends.
        loop = asyncio.get_running_loop()
        foo_running = None

        async def foo():
            nonlocal foo_running
            foo_running = True
            try:
                await asyncio.sleep(10)
            finally:
                foo_running = False
            return 'done'

        fut = asyncio.create_task(foo())

        with self.assertRaises(asyncio.TimeoutError):
            t0 = loop.time()
            await asyncio.wait_for(fut, 0.1)
        t1 = loop.time()
        self.assertTrue(fut.done())
        # it should have been cancelled due to the timeout
        self.assertTrue(fut.cancelled())
        self.assertLess(t1 - t0, 0.5)
        self.assertEqual(foo_running, False)

    async def test_wait_for_blocking(self):
        # timeout=None: wait_for() simply returns the coroutine's result.
        async def coro():
            return 'done'

        res = await asyncio.wait_for(coro(), timeout=None)
        self.assertEqual(res, 'done')

    async def test_wait_for_race_condition(self):
        # The future's result is scheduled with call_soon() after the
        # wait_for() coroutine is created; the result must be returned.
        loop = asyncio.get_running_loop()

        fut = loop.create_future()
        task = asyncio.wait_for(fut, timeout=0.2)
        loop.call_soon(fut.set_result, "ok")
        res = await task
        self.assertEqual(res, "ok")

    async def test_wait_for_cancellation_race_condition(self):
        # inner() absorbs the CancelledError caused by the timeout and
        # returns a value; wait_for() must hand that value back.
        async def inner():
            with self.assertRaises(asyncio.CancelledError):
                await asyncio.sleep(1)
            return 1

        result = await asyncio.wait_for(inner(), timeout=.01)
        self.assertEqual(result, 1)

    async def test_wait_for_waits_for_task_cancellation(self):
        # The task needs one more short sleep to finish its cancellation
        # handling; wait_for() must not raise until that has completed.
        task_done = False

        async def inner():
            nonlocal task_done
            try:
                await asyncio.sleep(10)
            except asyncio.CancelledError:
                await asyncio.sleep(_EPSILON)
                raise
            finally:
                task_done = True

        inner_task = asyncio.create_task(inner())

        with self.assertRaises(asyncio.TimeoutError) as cm:
            await asyncio.wait_for(inner_task, timeout=_EPSILON)

        self.assertTrue(task_done)
        # The TimeoutError must carry the CancelledError as its context.
        chained = cm.exception.__context__
        self.assertEqual(type(chained), asyncio.CancelledError)

    async def test_wait_for_waits_for_task_cancellation_w_timeout_0(self):
        # Same as above, but the already-running task is waited on with
        # timeout=0.
        task_done = False

        async def foo():
            async def inner():
                nonlocal task_done
                try:
                    await asyncio.sleep(10)
                except asyncio.CancelledError:
                    await asyncio.sleep(_EPSILON)
                    raise
                finally:
                    task_done = True

            inner_task = asyncio.create_task(inner())
            # Give inner() time to start before waiting with timeout=0.
            await asyncio.sleep(_EPSILON)
            await asyncio.wait_for(inner_task, timeout=0)

        with self.assertRaises(asyncio.TimeoutError) as cm:
            await foo()

        self.assertTrue(task_done)
        chained = cm.exception.__context__
        self.assertEqual(type(chained), asyncio.CancelledError)

    async def test_wait_for_reraises_exception_during_cancellation(self):
        # An exception raised by the task while it is being cancelled
        # propagates out of wait_for().
        class FooException(Exception):
            pass

        async def foo():
            async def inner():
                try:
                    await asyncio.sleep(0.2)
                finally:
                    raise FooException

            inner_task = asyncio.create_task(inner())

            await asyncio.wait_for(inner_task, timeout=_EPSILON)

        with self.assertRaises(FooException):
            await foo()

    async def _test_cancel_wait_for(self, timeout):
        # Shared body for the two cancel tests below: cancel a wait_for()
        # task that wraps a task which would otherwise block forever.
        loop = asyncio.get_running_loop()

        async def blocking_coroutine():
            fut = loop.create_future()
            # Block: fut result is never set
            await fut

        task = asyncio.create_task(blocking_coroutine())

        wait = asyncio.create_task(asyncio.wait_for(task, timeout))
        loop.call_soon(wait.cancel)

        with self.assertRaises(asyncio.CancelledError):
            await wait

        # Python issue #23219: cancelling the wait must also cancel the task
        self.assertTrue(task.cancelled())

    async def test_cancel_blocking_wait_for(self):
        await self._test_cancel_wait_for(None)

    async def test_cancel_wait_for(self):
        await self._test_cancel_wait_for(60.0)

    async def test_wait_for_cancel_suppressed(self):
        # GH-86296: Suppressing CancelledError is discouraged
        # but if a task suppresses CancelledError and returns a value,
        # `wait_for` should return the value instead of raising CancelledError.
        # This is the same behavior as `asyncio.timeout`.

        async def return_42():
            try:
                await asyncio.sleep(10)
            except asyncio.CancelledError:
                return 42

        res = await asyncio.wait_for(return_42(), timeout=0.1)
        self.assertEqual(res, 42)

    async def test_wait_for_issue86296(self):
        # GH-86296: The task should get cancelled and not run to completion.
        # inner completes in one cycle of the event loop so it
        # completes before the task is cancelled.

        async def inner():
            return 'done'

        inner_task = asyncio.create_task(inner())
        reached_end = False

        async def wait_for_coro():
            nonlocal reached_end
            await asyncio.wait_for(inner_task, timeout=100)
            await asyncio.sleep(1)
            reached_end = True

        task = asyncio.create_task(wait_for_coro())
        self.assertFalse(task.done())
        # Run the task
        await asyncio.sleep(0)
        task.cancel()
        with self.assertRaises(asyncio.CancelledError):
            await task
        self.assertTrue(inner_task.done())
        self.assertEqual(await inner_task, 'done')
        self.assertFalse(reached_end)
308
309
class WaitForShieldTests(unittest.IsolatedAsyncioTestCase):
    """Tests for asyncio.wait_for() applied to asyncio.shield()-ed tasks."""

    # NOTE: the test methods are described with comments rather than
    # docstrings so that unittest keeps reporting them by method name.

    async def test_zero_timeout(self):
        # `asyncio.shield` creates a new task which wraps the passed in
        # awaitable and shields it from cancellation so with timeout=0
        # the task returned by `asyncio.shield` aka shielded_task gets
        # cancelled immediately and the task wrapped by it is scheduled
        # to run.

        async def coro():
            await asyncio.sleep(0.01)
            return 'done'

        task = asyncio.create_task(coro())
        with self.assertRaises(asyncio.TimeoutError):
            shielded_task = asyncio.shield(task)
            await asyncio.wait_for(shielded_task, timeout=0)

        # Task is running in background
        self.assertFalse(task.done())
        self.assertFalse(task.cancelled())
        self.assertTrue(shielded_task.cancelled())

        # Wait for the task to complete
        await asyncio.sleep(0.1)
        self.assertTrue(task.done())

    async def test_none_timeout(self):
        # With timeout=None the timeout is disabled so it
        # runs till completion.
        async def coro():
            await asyncio.sleep(0.1)
            return 'done'

        task = asyncio.create_task(coro())
        await asyncio.wait_for(asyncio.shield(task), timeout=None)

        self.assertTrue(task.done())
        self.assertEqual(await task, "done")

    async def test_shielded_timeout(self):
        # shield prevents the task from being cancelled.
        async def coro():
            await asyncio.sleep(0.1)
            return 'done'

        task = asyncio.create_task(coro())
        with self.assertRaises(asyncio.TimeoutError):
            await asyncio.wait_for(asyncio.shield(task), timeout=0.01)

        # The timeout only hit the shield; the inner task keeps running
        # and still produces its result.
        self.assertFalse(task.done())
        self.assertFalse(task.cancelled())
        self.assertEqual(await task, "done")
364
365
# Run the tests when this module is executed directly as a script.
if __name__ == '__main__':
    unittest.main()