1 import asyncio
2 import unittest
3 import time
4 from test import support
5
6
7 def tearDownModule():
8 asyncio.set_event_loop_policy(None)
9
10
11 # The following value can be used as a very small timeout:
12 # it passes check "timeout > 0", but has almost
13 # no effect on the test performance
14 _EPSILON = 0.0001
15
16
17 class ESC[4;38;5;81mSlowTask:
18 """ Task will run for this defined time, ignoring cancel requests """
19 TASK_TIMEOUT = 0.2
20
21 def __init__(self):
22 self.exited = False
23
24 async def run(self):
25 exitat = time.monotonic() + self.TASK_TIMEOUT
26
27 while True:
28 tosleep = exitat - time.monotonic()
29 if tosleep <= 0:
30 break
31
32 try:
33 await asyncio.sleep(tosleep)
34 except asyncio.CancelledError:
35 pass
36
37 self.exited = True
38
39
40 class ESC[4;38;5;81mAsyncioWaitForTest(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mIsolatedAsyncioTestCase):
41
42 async def test_asyncio_wait_for_cancelled(self):
43 t = SlowTask()
44
45 waitfortask = asyncio.create_task(
46 asyncio.wait_for(t.run(), t.TASK_TIMEOUT * 2))
47 await asyncio.sleep(0)
48 waitfortask.cancel()
49 await asyncio.wait({waitfortask})
50
51 self.assertTrue(t.exited)
52
53 async def test_asyncio_wait_for_timeout(self):
54 t = SlowTask()
55
56 try:
57 await asyncio.wait_for(t.run(), t.TASK_TIMEOUT / 2)
58 except asyncio.TimeoutError:
59 pass
60
61 self.assertTrue(t.exited)
62
63 async def test_wait_for_timeout_less_then_0_or_0_future_done(self):
64 loop = asyncio.get_running_loop()
65
66 fut = loop.create_future()
67 fut.set_result('done')
68
69 ret = await asyncio.wait_for(fut, 0)
70
71 self.assertEqual(ret, 'done')
72 self.assertTrue(fut.done())
73
74 async def test_wait_for_timeout_less_then_0_or_0_coroutine_do_not_started(self):
75 foo_started = False
76
77 async def foo():
78 nonlocal foo_started
79 foo_started = True
80
81 with self.assertRaises(asyncio.TimeoutError):
82 await asyncio.wait_for(foo(), 0)
83
84 self.assertEqual(foo_started, False)
85
86 async def test_wait_for_timeout_less_then_0_or_0(self):
87 loop = asyncio.get_running_loop()
88
89 for timeout in [0, -1]:
90 with self.subTest(timeout=timeout):
91 foo_running = None
92 started = loop.create_future()
93
94 async def foo():
95 nonlocal foo_running
96 foo_running = True
97 started.set_result(None)
98 try:
99 await asyncio.sleep(10)
100 finally:
101 foo_running = False
102 return 'done'
103
104 fut = asyncio.create_task(foo())
105 await started
106
107 with self.assertRaises(asyncio.TimeoutError):
108 await asyncio.wait_for(fut, timeout)
109
110 self.assertTrue(fut.done())
111 # it should have been cancelled due to the timeout
112 self.assertTrue(fut.cancelled())
113 self.assertEqual(foo_running, False)
114
115 async def test_wait_for(self):
116 foo_running = None
117
118 async def foo():
119 nonlocal foo_running
120 foo_running = True
121 try:
122 await asyncio.sleep(support.LONG_TIMEOUT)
123 finally:
124 foo_running = False
125 return 'done'
126
127 fut = asyncio.create_task(foo())
128
129 with self.assertRaises(asyncio.TimeoutError):
130 await asyncio.wait_for(fut, 0.1)
131 self.assertTrue(fut.done())
132 # it should have been cancelled due to the timeout
133 self.assertTrue(fut.cancelled())
134 self.assertEqual(foo_running, False)
135
136 async def test_wait_for_blocking(self):
137 async def coro():
138 return 'done'
139
140 res = await asyncio.wait_for(coro(), timeout=None)
141 self.assertEqual(res, 'done')
142
143 async def test_wait_for_race_condition(self):
144 loop = asyncio.get_running_loop()
145
146 fut = loop.create_future()
147 task = asyncio.wait_for(fut, timeout=0.2)
148 loop.call_later(0.1, fut.set_result, "ok")
149 res = await task
150 self.assertEqual(res, "ok")
151
152 async def test_wait_for_cancellation_race_condition(self):
153 async def inner():
154 with self.assertRaises(asyncio.CancelledError):
155 await asyncio.sleep(1)
156 return 1
157
158 result = await asyncio.wait_for(inner(), timeout=.01)
159 self.assertEqual(result, 1)
160
161 async def test_wait_for_waits_for_task_cancellation(self):
162 task_done = False
163
164 async def inner():
165 nonlocal task_done
166 try:
167 await asyncio.sleep(10)
168 except asyncio.CancelledError:
169 await asyncio.sleep(_EPSILON)
170 raise
171 finally:
172 task_done = True
173
174 inner_task = asyncio.create_task(inner())
175
176 with self.assertRaises(asyncio.TimeoutError) as cm:
177 await asyncio.wait_for(inner_task, timeout=_EPSILON)
178
179 self.assertTrue(task_done)
180 chained = cm.exception.__context__
181 self.assertEqual(type(chained), asyncio.CancelledError)
182
183 async def test_wait_for_waits_for_task_cancellation_w_timeout_0(self):
184 task_done = False
185
186 async def foo():
187 async def inner():
188 nonlocal task_done
189 try:
190 await asyncio.sleep(10)
191 except asyncio.CancelledError:
192 await asyncio.sleep(_EPSILON)
193 raise
194 finally:
195 task_done = True
196
197 inner_task = asyncio.create_task(inner())
198 await asyncio.sleep(_EPSILON)
199 await asyncio.wait_for(inner_task, timeout=0)
200
201 with self.assertRaises(asyncio.TimeoutError) as cm:
202 await foo()
203
204 self.assertTrue(task_done)
205 chained = cm.exception.__context__
206 self.assertEqual(type(chained), asyncio.CancelledError)
207
208 async def test_wait_for_reraises_exception_during_cancellation(self):
209 class ESC[4;38;5;81mFooException(ESC[4;38;5;149mException):
210 pass
211
212 async def foo():
213 async def inner():
214 try:
215 await asyncio.sleep(0.2)
216 finally:
217 raise FooException
218
219 inner_task = asyncio.create_task(inner())
220
221 await asyncio.wait_for(inner_task, timeout=_EPSILON)
222
223 with self.assertRaises(FooException):
224 await foo()
225
226 async def test_wait_for_self_cancellation(self):
227 async def inner():
228 try:
229 await asyncio.sleep(0.3)
230 except asyncio.CancelledError:
231 try:
232 await asyncio.sleep(0.3)
233 except asyncio.CancelledError:
234 await asyncio.sleep(0.3)
235
236 return 42
237
238 inner_task = asyncio.create_task(inner())
239
240 wait = asyncio.wait_for(inner_task, timeout=0.1)
241
242 # Test that wait_for itself is properly cancellable
243 # even when the initial task holds up the initial cancellation.
244 task = asyncio.create_task(wait)
245 await asyncio.sleep(0.2)
246 task.cancel()
247
248 with self.assertRaises(asyncio.CancelledError):
249 await task
250
251 self.assertEqual(await inner_task, 42)
252
253 async def _test_cancel_wait_for(self, timeout):
254 loop = asyncio.get_running_loop()
255
256 async def blocking_coroutine():
257 fut = loop.create_future()
258 # Block: fut result is never set
259 await fut
260
261 task = asyncio.create_task(blocking_coroutine())
262
263 wait = asyncio.create_task(asyncio.wait_for(task, timeout))
264 loop.call_soon(wait.cancel)
265
266 with self.assertRaises(asyncio.CancelledError):
267 await wait
268
269 # Python issue #23219: cancelling the wait must also cancel the task
270 self.assertTrue(task.cancelled())
271
272 async def test_cancel_blocking_wait_for(self):
273 await self._test_cancel_wait_for(None)
274
275 async def test_cancel_wait_for(self):
276 await self._test_cancel_wait_for(60.0)
277
278
279 if __name__ == '__main__':
280 unittest.main()