python (3.12.0)
1 import _thread
2 import asyncio
3 import contextvars
4 import re
5 import signal
6 import threading
7 import unittest
8 from test.test_asyncio import utils as test_utils
9 from unittest import mock
10 from unittest.mock import patch
11
12
13 def tearDownModule():
14 asyncio.set_event_loop_policy(None)
15
16
17 def interrupt_self():
18 _thread.interrupt_main()
19
20
21 class ESC[4;38;5;81mTestPolicy(ESC[4;38;5;149masyncioESC[4;38;5;149m.ESC[4;38;5;149mAbstractEventLoopPolicy):
22
23 def __init__(self, loop_factory):
24 self.loop_factory = loop_factory
25 self.loop = None
26
27 def get_event_loop(self):
28 # shouldn't ever be called by asyncio.run()
29 raise RuntimeError
30
31 def new_event_loop(self):
32 return self.loop_factory()
33
34 def set_event_loop(self, loop):
35 if loop is not None:
36 # we want to check if the loop is closed
37 # in BaseTest.tearDown
38 self.loop = loop
39
40
41 class ESC[4;38;5;81mBaseTest(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
42
43 def new_loop(self):
44 loop = asyncio.BaseEventLoop()
45 loop._process_events = mock.Mock()
46 # Mock waking event loop from select
47 loop._write_to_self = mock.Mock()
48 loop._write_to_self.return_value = None
49 loop._selector = mock.Mock()
50 loop._selector.select.return_value = ()
51 loop.shutdown_ag_run = False
52
53 async def shutdown_asyncgens():
54 loop.shutdown_ag_run = True
55 loop.shutdown_asyncgens = shutdown_asyncgens
56
57 return loop
58
59 def setUp(self):
60 super().setUp()
61
62 policy = TestPolicy(self.new_loop)
63 asyncio.set_event_loop_policy(policy)
64
65 def tearDown(self):
66 policy = asyncio.get_event_loop_policy()
67 if policy.loop is not None:
68 self.assertTrue(policy.loop.is_closed())
69 self.assertTrue(policy.loop.shutdown_ag_run)
70
71 asyncio.set_event_loop_policy(None)
72 super().tearDown()
73
74
75 class ESC[4;38;5;81mRunTests(ESC[4;38;5;149mBaseTest):
76
77 def test_asyncio_run_return(self):
78 async def main():
79 await asyncio.sleep(0)
80 return 42
81
82 self.assertEqual(asyncio.run(main()), 42)
83
84 def test_asyncio_run_raises(self):
85 async def main():
86 await asyncio.sleep(0)
87 raise ValueError('spam')
88
89 with self.assertRaisesRegex(ValueError, 'spam'):
90 asyncio.run(main())
91
92 def test_asyncio_run_only_coro(self):
93 for o in {1, lambda: None}:
94 with self.subTest(obj=o), \
95 self.assertRaisesRegex(ValueError,
96 'a coroutine was expected'):
97 asyncio.run(o)
98
99 def test_asyncio_run_debug(self):
100 async def main(expected):
101 loop = asyncio.get_event_loop()
102 self.assertIs(loop.get_debug(), expected)
103
104 asyncio.run(main(False), debug=False)
105 asyncio.run(main(True), debug=True)
106 with mock.patch('asyncio.coroutines._is_debug_mode', lambda: True):
107 asyncio.run(main(True))
108 asyncio.run(main(False), debug=False)
109 with mock.patch('asyncio.coroutines._is_debug_mode', lambda: False):
110 asyncio.run(main(True), debug=True)
111 asyncio.run(main(False))
112
113 def test_asyncio_run_from_running_loop(self):
114 async def main():
115 coro = main()
116 try:
117 asyncio.run(coro)
118 finally:
119 coro.close() # Suppress ResourceWarning
120
121 with self.assertRaisesRegex(RuntimeError,
122 'cannot be called from a running'):
123 asyncio.run(main())
124
125 def test_asyncio_run_cancels_hanging_tasks(self):
126 lo_task = None
127
128 async def leftover():
129 await asyncio.sleep(0.1)
130
131 async def main():
132 nonlocal lo_task
133 lo_task = asyncio.create_task(leftover())
134 return 123
135
136 self.assertEqual(asyncio.run(main()), 123)
137 self.assertTrue(lo_task.done())
138
139 def test_asyncio_run_reports_hanging_tasks_errors(self):
140 lo_task = None
141 call_exc_handler_mock = mock.Mock()
142
143 async def leftover():
144 try:
145 await asyncio.sleep(0.1)
146 except asyncio.CancelledError:
147 1 / 0
148
149 async def main():
150 loop = asyncio.get_running_loop()
151 loop.call_exception_handler = call_exc_handler_mock
152
153 nonlocal lo_task
154 lo_task = asyncio.create_task(leftover())
155 return 123
156
157 self.assertEqual(asyncio.run(main()), 123)
158 self.assertTrue(lo_task.done())
159
160 call_exc_handler_mock.assert_called_with({
161 'message': test_utils.MockPattern(r'asyncio.run.*shutdown'),
162 'task': lo_task,
163 'exception': test_utils.MockInstanceOf(ZeroDivisionError)
164 })
165
166 def test_asyncio_run_closes_gens_after_hanging_tasks_errors(self):
167 spinner = None
168 lazyboy = None
169
170 class ESC[4;38;5;81mFancyExit(ESC[4;38;5;149mException):
171 pass
172
173 async def fidget():
174 while True:
175 yield 1
176 await asyncio.sleep(1)
177
178 async def spin():
179 nonlocal spinner
180 spinner = fidget()
181 try:
182 async for the_meaning_of_life in spinner: # NoQA
183 pass
184 except asyncio.CancelledError:
185 1 / 0
186
187 async def main():
188 loop = asyncio.get_running_loop()
189 loop.call_exception_handler = mock.Mock()
190
191 nonlocal lazyboy
192 lazyboy = asyncio.create_task(spin())
193 raise FancyExit
194
195 with self.assertRaises(FancyExit):
196 asyncio.run(main())
197
198 self.assertTrue(lazyboy.done())
199
200 self.assertIsNone(spinner.ag_frame)
201 self.assertFalse(spinner.ag_running)
202
203 def test_asyncio_run_set_event_loop(self):
204 #See https://github.com/python/cpython/issues/93896
205
206 async def main():
207 await asyncio.sleep(0)
208 return 42
209
210 policy = asyncio.get_event_loop_policy()
211 policy.set_event_loop = mock.Mock()
212 asyncio.run(main())
213 self.assertTrue(policy.set_event_loop.called)
214
215 def test_asyncio_run_without_uncancel(self):
216 # See https://github.com/python/cpython/issues/95097
217 class ESC[4;38;5;81mTask:
218 def __init__(self, loop, coro, **kwargs):
219 self._task = asyncio.Task(coro, loop=loop, **kwargs)
220
221 def cancel(self, *args, **kwargs):
222 return self._task.cancel(*args, **kwargs)
223
224 def add_done_callback(self, *args, **kwargs):
225 return self._task.add_done_callback(*args, **kwargs)
226
227 def remove_done_callback(self, *args, **kwargs):
228 return self._task.remove_done_callback(*args, **kwargs)
229
230 @property
231 def _asyncio_future_blocking(self):
232 return self._task._asyncio_future_blocking
233
234 def result(self, *args, **kwargs):
235 return self._task.result(*args, **kwargs)
236
237 def done(self, *args, **kwargs):
238 return self._task.done(*args, **kwargs)
239
240 def cancelled(self, *args, **kwargs):
241 return self._task.cancelled(*args, **kwargs)
242
243 def exception(self, *args, **kwargs):
244 return self._task.exception(*args, **kwargs)
245
246 def get_loop(self, *args, **kwargs):
247 return self._task.get_loop(*args, **kwargs)
248
249
250 async def main():
251 interrupt_self()
252 await asyncio.Event().wait()
253
254 def new_event_loop():
255 loop = self.new_loop()
256 loop.set_task_factory(Task)
257 return loop
258
259 asyncio.set_event_loop_policy(TestPolicy(new_event_loop))
260 with self.assertRaises(asyncio.CancelledError):
261 asyncio.run(main())
262
263 def test_asyncio_run_loop_factory(self):
264 factory = mock.Mock()
265 loop = factory.return_value = self.new_loop()
266
267 async def main():
268 self.assertEqual(asyncio.get_running_loop(), loop)
269
270 asyncio.run(main(), loop_factory=factory)
271 factory.assert_called_once_with()
272
273
274 class ESC[4;38;5;81mRunnerTests(ESC[4;38;5;149mBaseTest):
275
276 def test_non_debug(self):
277 with asyncio.Runner(debug=False) as runner:
278 self.assertFalse(runner.get_loop().get_debug())
279
280 def test_debug(self):
281 with asyncio.Runner(debug=True) as runner:
282 self.assertTrue(runner.get_loop().get_debug())
283
284 def test_custom_factory(self):
285 loop = mock.Mock()
286 with asyncio.Runner(loop_factory=lambda: loop) as runner:
287 self.assertIs(runner.get_loop(), loop)
288
289 def test_run(self):
290 async def f():
291 await asyncio.sleep(0)
292 return 'done'
293
294 with asyncio.Runner() as runner:
295 self.assertEqual('done', runner.run(f()))
296 loop = runner.get_loop()
297
298 with self.assertRaisesRegex(
299 RuntimeError,
300 "Runner is closed"
301 ):
302 runner.get_loop()
303
304 self.assertTrue(loop.is_closed())
305
306 def test_run_non_coro(self):
307 with asyncio.Runner() as runner:
308 with self.assertRaisesRegex(
309 ValueError,
310 "a coroutine was expected"
311 ):
312 runner.run(123)
313
314 def test_run_future(self):
315 with asyncio.Runner() as runner:
316 with self.assertRaisesRegex(
317 ValueError,
318 "a coroutine was expected"
319 ):
320 fut = runner.get_loop().create_future()
321 runner.run(fut)
322
323 def test_explicit_close(self):
324 runner = asyncio.Runner()
325 loop = runner.get_loop()
326 runner.close()
327 with self.assertRaisesRegex(
328 RuntimeError,
329 "Runner is closed"
330 ):
331 runner.get_loop()
332
333 self.assertTrue(loop.is_closed())
334
335 def test_double_close(self):
336 runner = asyncio.Runner()
337 loop = runner.get_loop()
338
339 runner.close()
340 self.assertTrue(loop.is_closed())
341
342 # the second call is no-op
343 runner.close()
344 self.assertTrue(loop.is_closed())
345
346 def test_second_with_block_raises(self):
347 ret = []
348
349 async def f(arg):
350 ret.append(arg)
351
352 runner = asyncio.Runner()
353 with runner:
354 runner.run(f(1))
355
356 with self.assertRaisesRegex(
357 RuntimeError,
358 "Runner is closed"
359 ):
360 with runner:
361 runner.run(f(2))
362
363 self.assertEqual([1], ret)
364
365 def test_run_keeps_context(self):
366 cvar = contextvars.ContextVar("cvar", default=-1)
367
368 async def f(val):
369 old = cvar.get()
370 await asyncio.sleep(0)
371 cvar.set(val)
372 return old
373
374 async def get_context():
375 return contextvars.copy_context()
376
377 with asyncio.Runner() as runner:
378 self.assertEqual(-1, runner.run(f(1)))
379 self.assertEqual(1, runner.run(f(2)))
380
381 self.assertEqual(2, runner.run(get_context()).get(cvar))
382
383 def test_recursive_run(self):
384 async def g():
385 pass
386
387 async def f():
388 runner.run(g())
389
390 with asyncio.Runner() as runner:
391 with self.assertWarnsRegex(
392 RuntimeWarning,
393 "coroutine .+ was never awaited",
394 ):
395 with self.assertRaisesRegex(
396 RuntimeError,
397 re.escape(
398 "Runner.run() cannot be called from a running event loop"
399 ),
400 ):
401 runner.run(f())
402
403 def test_interrupt_call_soon(self):
404 # The only case when task is not suspended by waiting a future
405 # or another task
406 assert threading.current_thread() is threading.main_thread()
407
408 async def coro():
409 with self.assertRaises(asyncio.CancelledError):
410 while True:
411 await asyncio.sleep(0)
412 raise asyncio.CancelledError()
413
414 with asyncio.Runner() as runner:
415 runner.get_loop().call_later(0.1, interrupt_self)
416 with self.assertRaises(KeyboardInterrupt):
417 runner.run(coro())
418
419 def test_interrupt_wait(self):
420 # interrupting when waiting a future cancels both future and main task
421 assert threading.current_thread() is threading.main_thread()
422
423 async def coro(fut):
424 with self.assertRaises(asyncio.CancelledError):
425 await fut
426 raise asyncio.CancelledError()
427
428 with asyncio.Runner() as runner:
429 fut = runner.get_loop().create_future()
430 runner.get_loop().call_later(0.1, interrupt_self)
431
432 with self.assertRaises(KeyboardInterrupt):
433 runner.run(coro(fut))
434
435 self.assertTrue(fut.cancelled())
436
437 def test_interrupt_cancelled_task(self):
438 # interrupting cancelled main task doesn't raise KeyboardInterrupt
439 assert threading.current_thread() is threading.main_thread()
440
441 async def subtask(task):
442 await asyncio.sleep(0)
443 task.cancel()
444 interrupt_self()
445
446 async def coro():
447 asyncio.create_task(subtask(asyncio.current_task()))
448 await asyncio.sleep(10)
449
450 with asyncio.Runner() as runner:
451 with self.assertRaises(asyncio.CancelledError):
452 runner.run(coro())
453
454 def test_signal_install_not_supported_ok(self):
455 # signal.signal() can throw if the "main thread" doesn't have signals enabled
456 assert threading.current_thread() is threading.main_thread()
457
458 async def coro():
459 pass
460
461 with asyncio.Runner() as runner:
462 with patch.object(
463 signal,
464 "signal",
465 side_effect=ValueError(
466 "signal only works in main thread of the main interpreter"
467 )
468 ):
469 runner.run(coro())
470
471 def test_set_event_loop_called_once(self):
472 # See https://github.com/python/cpython/issues/95736
473 async def coro():
474 pass
475
476 policy = asyncio.get_event_loop_policy()
477 policy.set_event_loop = mock.Mock()
478 runner = asyncio.Runner()
479 runner.run(coro())
480 runner.run(coro())
481
482 self.assertEqual(1, policy.set_event_loop.call_count)
483 runner.close()
484
485
486 if __name__ == '__main__':
487 unittest.main()