1 import _thread
2 import asyncio
3 import contextvars
4 import gc
5 import re
6 import signal
7 import threading
8 import unittest
9 from test.test_asyncio import utils as test_utils
10 from unittest import mock
11 from unittest.mock import patch
12
13
def tearDownModule():
    """Reset the global asyncio event loop policy after this module's tests.

    Passing ``None`` to ``asyncio.set_event_loop_policy()`` discards any
    policy installed by the tests in this module.
    """
    asyncio.set_event_loop_policy(None)
16
17
def interrupt_self():
    """Deliver an interrupt to the main thread via ``_thread.interrupt_main()``.

    The tests use this to simulate the user pressing Ctrl-C while a
    coroutine is running.
    """
    _thread.interrupt_main()
20
21
class TestPolicy(asyncio.AbstractEventLoopPolicy):
    """Event loop policy that records the loop handed to ``set_event_loop()``.

    New loops come from the ``loop_factory`` callable given to the
    constructor.  The most recent non-``None`` loop passed to
    ``set_event_loop()`` is kept in ``self.loop`` so that it can be
    inspected later.
    """

    def __init__(self, loop_factory):
        # Zero-argument callable used by new_event_loop().
        self.loop_factory = loop_factory
        # Last non-None loop passed to set_event_loop(); None until then.
        self.loop = None

    def get_event_loop(self):
        # shouldn't ever be called by asyncio.run()
        raise RuntimeError

    def new_event_loop(self):
        """Return a fresh loop produced by the configured factory."""
        return self.loop_factory()

    def set_event_loop(self, loop):
        """Remember *loop*; a ``None`` argument leaves the record untouched."""
        if loop is not None:
            # we want to check if the loop is closed
            # in BaseTest.tearDown
            self.loop = loop
40
41
class BaseTest(unittest.TestCase):
    """Common fixture: installs a ``TestPolicy`` that hands out mocked loops.

    ``tearDown`` verifies that any loop registered with the policy during
    the test has been closed and had its async generators shut down.
    """

    def new_loop(self):
        """Build a ``BaseEventLoop`` whose selector and wake-up hooks are mocks.

        The returned loop carries a ``shutdown_ag_run`` flag that flips to
        ``True`` once its replaced ``shutdown_asyncgens()`` coroutine runs.
        """
        loop = asyncio.BaseEventLoop()
        loop._process_events = mock.Mock()
        # Mock waking event loop from select
        loop._write_to_self = mock.Mock()
        loop._write_to_self.return_value = None
        loop._selector = mock.Mock()
        loop._selector.select.return_value = ()
        loop.shutdown_ag_run = False

        async def shutdown_asyncgens():
            # Only records that the shutdown step was executed.
            loop.shutdown_ag_run = True
        loop.shutdown_asyncgens = shutdown_asyncgens

        return loop

    def setUp(self):
        super().setUp()

        policy = TestPolicy(self.new_loop)
        asyncio.set_event_loop_policy(policy)

    def tearDown(self):
        policy = asyncio.get_event_loop_policy()
        try:
            if policy.loop is not None:
                self.assertTrue(policy.loop.is_closed())
                self.assertTrue(policy.loop.shutdown_ag_run)
        finally:
            # Reset the policy even when a check above fails, so a failing
            # test cannot leave its custom policy installed for later tests.
            asyncio.set_event_loop_policy(None)
            super().tearDown()
74
75
class RunTests(BaseTest):
    """Tests for the ``asyncio.run()`` entry point."""

    def test_asyncio_run_return(self):
        # The coroutine's return value is what asyncio.run() returns.
        async def main():
            await asyncio.sleep(0)
            return 42

        self.assertEqual(asyncio.run(main()), 42)

    def test_asyncio_run_raises(self):
        # An exception raised inside the coroutine reaches the caller.
        async def main():
            await asyncio.sleep(0)
            raise ValueError('spam')

        with self.assertRaisesRegex(ValueError, 'spam'):
            asyncio.run(main())

    def test_asyncio_run_only_coro(self):
        # Neither a plain value nor a plain function is accepted.
        for o in {1, lambda: None}:
            with self.subTest(obj=o), \
                    self.assertRaisesRegex(ValueError,
                                           'a coroutine was expected'):
                asyncio.run(o)

    def test_asyncio_run_debug(self):
        # An explicit debug= argument wins over the patched
        # coroutines._is_debug_mode(); without it the patched value is used.
        async def main(expected):
            loop = asyncio.get_event_loop()
            self.assertIs(loop.get_debug(), expected)

        asyncio.run(main(False), debug=False)
        asyncio.run(main(True), debug=True)
        with mock.patch('asyncio.coroutines._is_debug_mode', lambda: True):
            asyncio.run(main(True))
            asyncio.run(main(False), debug=False)
        with mock.patch('asyncio.coroutines._is_debug_mode', lambda: False):
            asyncio.run(main(True), debug=True)
            asyncio.run(main(False))

    def test_asyncio_run_from_running_loop(self):
        # Calling asyncio.run() from inside a running coroutine must fail.
        async def main():
            coro = main()
            try:
                asyncio.run(coro)
            finally:
                coro.close()  # Suppress ResourceWarning

        with self.assertRaisesRegex(RuntimeError,
                                    'cannot be called from a running'):
            asyncio.run(main())

    def test_asyncio_run_cancels_hanging_tasks(self):
        # A task still sleeping when main() returns must be done by the
        # time asyncio.run() returns.
        lo_task = None

        async def leftover():
            await asyncio.sleep(0.1)

        async def main():
            nonlocal lo_task
            lo_task = asyncio.create_task(leftover())
            return 123

        self.assertEqual(asyncio.run(main()), 123)
        self.assertTrue(lo_task.done())

    def test_asyncio_run_reports_hanging_tasks_errors(self):
        # A leftover task that raises while being cancelled is reported
        # through the loop's exception handler.
        lo_task = None
        call_exc_handler_mock = mock.Mock()

        async def leftover():
            try:
                await asyncio.sleep(0.1)
            except asyncio.CancelledError:
                1 / 0

        async def main():
            loop = asyncio.get_running_loop()
            loop.call_exception_handler = call_exc_handler_mock

            nonlocal lo_task
            lo_task = asyncio.create_task(leftover())
            return 123

        self.assertEqual(asyncio.run(main()), 123)
        self.assertTrue(lo_task.done())

        call_exc_handler_mock.assert_called_with({
            'message': test_utils.MockPattern(r'asyncio.run.*shutdown'),
            'task': lo_task,
            'exception': test_utils.MockInstanceOf(ZeroDivisionError)
        })

    def test_asyncio_run_closes_gens_after_hanging_tasks_errors(self):
        # Even when main() raises and a leftover task errors on
        # cancellation, the async generator it was iterating ends up
        # finalized (no frame, not running).
        spinner = None
        lazyboy = None

        class FancyExit(Exception):
            pass

        async def fidget():
            while True:
                yield 1
                await asyncio.sleep(1)

        async def spin():
            nonlocal spinner
            spinner = fidget()
            try:
                async for the_meaning_of_life in spinner:  # NoQA
                    pass
            except asyncio.CancelledError:
                1 / 0

        async def main():
            loop = asyncio.get_running_loop()
            loop.call_exception_handler = mock.Mock()

            nonlocal lazyboy
            lazyboy = asyncio.create_task(spin())
            raise FancyExit

        with self.assertRaises(FancyExit):
            asyncio.run(main())

        self.assertTrue(lazyboy.done())

        self.assertIsNone(spinner.ag_frame)
        self.assertFalse(spinner.ag_running)

    def test_asyncio_run_set_event_loop(self):
        # See https://github.com/python/cpython/issues/93896

        async def main():
            await asyncio.sleep(0)
            return 42

        policy = asyncio.get_event_loop_policy()
        policy.set_event_loop = mock.Mock()
        asyncio.run(main())
        self.assertTrue(policy.set_event_loop.called)

    def test_asyncio_run_without_uncancel(self):
        # See https://github.com/python/cpython/issues/95097
        # Thin wrapper around asyncio.Task that deliberately exposes no
        # uncancel() method.
        class Task:
            def __init__(self, loop, coro, **kwargs):
                self._task = asyncio.Task(coro, loop=loop, **kwargs)

            def cancel(self, *args, **kwargs):
                return self._task.cancel(*args, **kwargs)

            def add_done_callback(self, *args, **kwargs):
                return self._task.add_done_callback(*args, **kwargs)

            def remove_done_callback(self, *args, **kwargs):
                return self._task.remove_done_callback(*args, **kwargs)

            @property
            def _asyncio_future_blocking(self):
                return self._task._asyncio_future_blocking

            def result(self, *args, **kwargs):
                return self._task.result(*args, **kwargs)

            def done(self, *args, **kwargs):
                return self._task.done(*args, **kwargs)

            def cancelled(self, *args, **kwargs):
                return self._task.cancelled(*args, **kwargs)

            def exception(self, *args, **kwargs):
                return self._task.exception(*args, **kwargs)

            def get_loop(self, *args, **kwargs):
                return self._task.get_loop(*args, **kwargs)

        async def main():
            interrupt_self()
            await asyncio.Event().wait()

        def new_event_loop():
            loop = self.new_loop()
            loop.set_task_factory(Task)
            return loop

        asyncio.set_event_loop_policy(TestPolicy(new_event_loop))
        with self.assertRaises(asyncio.CancelledError):
            asyncio.run(main())
263
264
class RunnerTests(BaseTest):
    """Tests for the ``asyncio.Runner`` context manager."""

    def test_non_debug(self):
        with asyncio.Runner(debug=False) as runner:
            self.assertFalse(runner.get_loop().get_debug())

    def test_debug(self):
        with asyncio.Runner(debug=True) as runner:
            self.assertTrue(runner.get_loop().get_debug())

    def test_custom_factory(self):
        # The object returned by loop_factory is used as the runner's loop.
        loop = mock.Mock()
        with asyncio.Runner(loop_factory=lambda: loop) as runner:
            self.assertIs(runner.get_loop(), loop)

    def test_run(self):
        async def f():
            await asyncio.sleep(0)
            return 'done'

        with asyncio.Runner() as runner:
            self.assertEqual('done', runner.run(f()))
            loop = runner.get_loop()

        # Leaving the with-block closes the runner and its loop.
        with self.assertRaisesRegex(
            RuntimeError,
            "Runner is closed"
        ):
            runner.get_loop()

        self.assertTrue(loop.is_closed())

    def test_run_non_coro(self):
        with asyncio.Runner() as runner:
            with self.assertRaisesRegex(
                ValueError,
                "a coroutine was expected"
            ):
                runner.run(123)

    def test_run_future(self):
        with asyncio.Runner() as runner:
            # Create the future outside the assertion so that only
            # runner.run() is expected to raise.
            fut = runner.get_loop().create_future()
            with self.assertRaisesRegex(
                ValueError,
                "a coroutine was expected"
            ):
                runner.run(fut)

    def test_explicit_close(self):
        runner = asyncio.Runner()
        loop = runner.get_loop()
        runner.close()
        with self.assertRaisesRegex(
            RuntimeError,
            "Runner is closed"
        ):
            runner.get_loop()

        self.assertTrue(loop.is_closed())

    def test_double_close(self):
        runner = asyncio.Runner()
        loop = runner.get_loop()

        runner.close()
        self.assertTrue(loop.is_closed())

        # the second call is no-op
        runner.close()
        self.assertTrue(loop.is_closed())

    def test_second_with_block_raises(self):
        # A runner cannot be re-entered once its first with-block has exited.
        ret = []

        async def f(arg):
            ret.append(arg)

        runner = asyncio.Runner()
        with runner:
            runner.run(f(1))

        with self.assertRaisesRegex(
            RuntimeError,
            "Runner is closed"
        ):
            with runner:
                runner.run(f(2))

        self.assertEqual([1], ret)

    def test_run_keeps_context(self):
        # A context variable set during one run() call is visible in the
        # next run() call on the same runner.
        cvar = contextvars.ContextVar("cvar", default=-1)

        async def f(val):
            old = cvar.get()
            await asyncio.sleep(0)
            cvar.set(val)
            return old

        async def get_context():
            return contextvars.copy_context()

        with asyncio.Runner() as runner:
            self.assertEqual(-1, runner.run(f(1)))
            self.assertEqual(1, runner.run(f(2)))

            self.assertEqual(2, runner.run(get_context()).get(cvar))

    def test_recursive_run(self):
        # Calling runner.run() from a coroutine that the same runner is
        # already running is rejected; the inner coroutine is never awaited.
        async def g():
            pass

        async def f():
            runner.run(g())

        with asyncio.Runner() as runner:
            with self.assertWarnsRegex(
                RuntimeWarning,
                "coroutine .+ was never awaited",
            ):
                with self.assertRaisesRegex(
                    RuntimeError,
                    re.escape(
                        "Runner.run() cannot be called from a running event loop"
                    ),
                ):
                    runner.run(f())

    def test_interrupt_call_soon(self):
        # The only case when task is not suspended by waiting a future
        # or another task
        self.assertIs(threading.current_thread(), threading.main_thread())

        async def coro():
            with self.assertRaises(asyncio.CancelledError):
                while True:
                    await asyncio.sleep(0)
            raise asyncio.CancelledError()

        with asyncio.Runner() as runner:
            runner.get_loop().call_later(0.1, interrupt_self)
            with self.assertRaises(KeyboardInterrupt):
                runner.run(coro())

    def test_interrupt_wait(self):
        # interrupting when waiting a future cancels both future and main task
        self.assertIs(threading.current_thread(), threading.main_thread())

        async def coro(fut):
            with self.assertRaises(asyncio.CancelledError):
                await fut
            raise asyncio.CancelledError()

        with asyncio.Runner() as runner:
            fut = runner.get_loop().create_future()
            runner.get_loop().call_later(0.1, interrupt_self)

            with self.assertRaises(KeyboardInterrupt):
                runner.run(coro(fut))

            self.assertTrue(fut.cancelled())

    def test_interrupt_cancelled_task(self):
        # interrupting cancelled main task doesn't raise KeyboardInterrupt
        self.assertIs(threading.current_thread(), threading.main_thread())

        async def subtask(task):
            await asyncio.sleep(0)
            task.cancel()
            interrupt_self()

        async def coro():
            asyncio.create_task(subtask(asyncio.current_task()))
            await asyncio.sleep(10)

        with asyncio.Runner() as runner:
            with self.assertRaises(asyncio.CancelledError):
                runner.run(coro())

    def test_signal_install_not_supported_ok(self):
        # signal.signal() can throw if the "main thread" doesn't have
        # signals enabled
        self.assertIs(threading.current_thread(), threading.main_thread())

        async def coro():
            pass

        with asyncio.Runner() as runner:
            with patch.object(
                signal,
                "signal",
                side_effect=ValueError(
                    "signal only works in main thread of the main interpreter"
                )
            ):
                runner.run(coro())

    def test_set_event_loop_called_once(self):
        # See https://github.com/python/cpython/issues/95736
        async def coro():
            pass

        policy = asyncio.get_event_loop_policy()
        policy.set_event_loop = mock.Mock()
        runner = asyncio.Runner()
        try:
            runner.run(coro())
            runner.run(coro())

            # The call count is asserted before close() runs.
            self.assertEqual(1, policy.set_event_loop.call_count)
        finally:
            # Close the runner even if the assertion above fails.
            runner.close()
475
476
# Run this module's tests when it is executed directly as a script.
if __name__ == '__main__':
    unittest.main()