(root)/
Python-3.11.7/
Lib/
test/
test_asyncio/
test_runners.py
       1  import _thread
       2  import asyncio
       3  import contextvars
       4  import gc
       5  import re
       6  import signal
       7  import threading
       8  import unittest
       9  from test.test_asyncio import utils as test_utils
      10  from unittest import mock
      11  from unittest.mock import patch
      12  
      13  
      14  def tearDownModule():
      15      asyncio.set_event_loop_policy(None)
      16  
      17  
      18  def interrupt_self():
      19      _thread.interrupt_main()
      20  
      21  
      22  class ESC[4;38;5;81mTestPolicy(ESC[4;38;5;149masyncioESC[4;38;5;149m.ESC[4;38;5;149mAbstractEventLoopPolicy):
      23  
      24      def __init__(self, loop_factory):
      25          self.loop_factory = loop_factory
      26          self.loop = None
      27  
      28      def get_event_loop(self):
      29          # shouldn't ever be called by asyncio.run()
      30          raise RuntimeError
      31  
      32      def new_event_loop(self):
      33          return self.loop_factory()
      34  
      35      def set_event_loop(self, loop):
      36          if loop is not None:
      37              # we want to check if the loop is closed
      38              # in BaseTest.tearDown
      39              self.loop = loop
      40  
      41  
      42  class ESC[4;38;5;81mBaseTest(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      43  
      44      def new_loop(self):
      45          loop = asyncio.BaseEventLoop()
      46          loop._process_events = mock.Mock()
      47          # Mock waking event loop from select
      48          loop._write_to_self = mock.Mock()
      49          loop._write_to_self.return_value = None
      50          loop._selector = mock.Mock()
      51          loop._selector.select.return_value = ()
      52          loop.shutdown_ag_run = False
      53  
      54          async def shutdown_asyncgens():
      55              loop.shutdown_ag_run = True
      56          loop.shutdown_asyncgens = shutdown_asyncgens
      57  
      58          return loop
      59  
      60      def setUp(self):
      61          super().setUp()
      62  
      63          policy = TestPolicy(self.new_loop)
      64          asyncio.set_event_loop_policy(policy)
      65  
      66      def tearDown(self):
      67          policy = asyncio.get_event_loop_policy()
      68          if policy.loop is not None:
      69              self.assertTrue(policy.loop.is_closed())
      70              self.assertTrue(policy.loop.shutdown_ag_run)
      71  
      72          asyncio.set_event_loop_policy(None)
      73          super().tearDown()
      74  
      75  
      76  class ESC[4;38;5;81mRunTests(ESC[4;38;5;149mBaseTest):
      77  
      78      def test_asyncio_run_return(self):
      79          async def main():
      80              await asyncio.sleep(0)
      81              return 42
      82  
      83          self.assertEqual(asyncio.run(main()), 42)
      84  
      85      def test_asyncio_run_raises(self):
      86          async def main():
      87              await asyncio.sleep(0)
      88              raise ValueError('spam')
      89  
      90          with self.assertRaisesRegex(ValueError, 'spam'):
      91              asyncio.run(main())
      92  
      93      def test_asyncio_run_only_coro(self):
      94          for o in {1, lambda: None}:
      95              with self.subTest(obj=o), \
      96                      self.assertRaisesRegex(ValueError,
      97                                             'a coroutine was expected'):
      98                  asyncio.run(o)
      99  
     100      def test_asyncio_run_debug(self):
     101          async def main(expected):
     102              loop = asyncio.get_event_loop()
     103              self.assertIs(loop.get_debug(), expected)
     104  
     105          asyncio.run(main(False), debug=False)
     106          asyncio.run(main(True), debug=True)
     107          with mock.patch('asyncio.coroutines._is_debug_mode', lambda: True):
     108              asyncio.run(main(True))
     109              asyncio.run(main(False), debug=False)
     110          with mock.patch('asyncio.coroutines._is_debug_mode', lambda: False):
     111              asyncio.run(main(True), debug=True)
     112              asyncio.run(main(False))
     113  
     114      def test_asyncio_run_from_running_loop(self):
     115          async def main():
     116              coro = main()
     117              try:
     118                  asyncio.run(coro)
     119              finally:
     120                  coro.close()  # Suppress ResourceWarning
     121  
     122          with self.assertRaisesRegex(RuntimeError,
     123                                      'cannot be called from a running'):
     124              asyncio.run(main())
     125  
     126      def test_asyncio_run_cancels_hanging_tasks(self):
     127          lo_task = None
     128  
     129          async def leftover():
     130              await asyncio.sleep(0.1)
     131  
     132          async def main():
     133              nonlocal lo_task
     134              lo_task = asyncio.create_task(leftover())
     135              return 123
     136  
     137          self.assertEqual(asyncio.run(main()), 123)
     138          self.assertTrue(lo_task.done())
     139  
     140      def test_asyncio_run_reports_hanging_tasks_errors(self):
     141          lo_task = None
     142          call_exc_handler_mock = mock.Mock()
     143  
     144          async def leftover():
     145              try:
     146                  await asyncio.sleep(0.1)
     147              except asyncio.CancelledError:
     148                  1 / 0
     149  
     150          async def main():
     151              loop = asyncio.get_running_loop()
     152              loop.call_exception_handler = call_exc_handler_mock
     153  
     154              nonlocal lo_task
     155              lo_task = asyncio.create_task(leftover())
     156              return 123
     157  
     158          self.assertEqual(asyncio.run(main()), 123)
     159          self.assertTrue(lo_task.done())
     160  
     161          call_exc_handler_mock.assert_called_with({
     162              'message': test_utils.MockPattern(r'asyncio.run.*shutdown'),
     163              'task': lo_task,
     164              'exception': test_utils.MockInstanceOf(ZeroDivisionError)
     165          })
     166  
     167      def test_asyncio_run_closes_gens_after_hanging_tasks_errors(self):
     168          spinner = None
     169          lazyboy = None
     170  
     171          class ESC[4;38;5;81mFancyExit(ESC[4;38;5;149mException):
     172              pass
     173  
     174          async def fidget():
     175              while True:
     176                  yield 1
     177                  await asyncio.sleep(1)
     178  
     179          async def spin():
     180              nonlocal spinner
     181              spinner = fidget()
     182              try:
     183                  async for the_meaning_of_life in spinner:  # NoQA
     184                      pass
     185              except asyncio.CancelledError:
     186                  1 / 0
     187  
     188          async def main():
     189              loop = asyncio.get_running_loop()
     190              loop.call_exception_handler = mock.Mock()
     191  
     192              nonlocal lazyboy
     193              lazyboy = asyncio.create_task(spin())
     194              raise FancyExit
     195  
     196          with self.assertRaises(FancyExit):
     197              asyncio.run(main())
     198  
     199          self.assertTrue(lazyboy.done())
     200  
     201          self.assertIsNone(spinner.ag_frame)
     202          self.assertFalse(spinner.ag_running)
     203  
     204      def test_asyncio_run_set_event_loop(self):
     205          #See https://github.com/python/cpython/issues/93896
     206  
     207          async def main():
     208              await asyncio.sleep(0)
     209              return 42
     210  
     211          policy = asyncio.get_event_loop_policy()
     212          policy.set_event_loop = mock.Mock()
     213          asyncio.run(main())
     214          self.assertTrue(policy.set_event_loop.called)
     215  
     216      def test_asyncio_run_without_uncancel(self):
     217          # See https://github.com/python/cpython/issues/95097
     218          class ESC[4;38;5;81mTask:
     219              def __init__(self, loop, coro, **kwargs):
     220                  self._task = asyncio.Task(coro, loop=loop, **kwargs)
     221  
     222              def cancel(self, *args, **kwargs):
     223                  return self._task.cancel(*args, **kwargs)
     224  
     225              def add_done_callback(self, *args, **kwargs):
     226                  return self._task.add_done_callback(*args, **kwargs)
     227  
     228              def remove_done_callback(self, *args, **kwargs):
     229                  return self._task.remove_done_callback(*args, **kwargs)
     230  
     231              @property
     232              def _asyncio_future_blocking(self):
     233                  return self._task._asyncio_future_blocking
     234  
     235              def result(self, *args, **kwargs):
     236                  return self._task.result(*args, **kwargs)
     237  
     238              def done(self, *args, **kwargs):
     239                  return self._task.done(*args, **kwargs)
     240  
     241              def cancelled(self, *args, **kwargs):
     242                  return self._task.cancelled(*args, **kwargs)
     243  
     244              def exception(self, *args, **kwargs):
     245                  return self._task.exception(*args, **kwargs)
     246  
     247              def get_loop(self, *args, **kwargs):
     248                  return self._task.get_loop(*args, **kwargs)
     249  
     250  
     251          async def main():
     252              interrupt_self()
     253              await asyncio.Event().wait()
     254  
     255          def new_event_loop():
     256              loop = self.new_loop()
     257              loop.set_task_factory(Task)
     258              return loop
     259  
     260          asyncio.set_event_loop_policy(TestPolicy(new_event_loop))
     261          with self.assertRaises(asyncio.CancelledError):
     262              asyncio.run(main())
     263  
     264  
     265  class ESC[4;38;5;81mRunnerTests(ESC[4;38;5;149mBaseTest):
     266  
     267      def test_non_debug(self):
     268          with asyncio.Runner(debug=False) as runner:
     269              self.assertFalse(runner.get_loop().get_debug())
     270  
     271      def test_debug(self):
     272          with asyncio.Runner(debug=True) as runner:
     273              self.assertTrue(runner.get_loop().get_debug())
     274  
     275      def test_custom_factory(self):
     276          loop = mock.Mock()
     277          with asyncio.Runner(loop_factory=lambda: loop) as runner:
     278              self.assertIs(runner.get_loop(), loop)
     279  
     280      def test_run(self):
     281          async def f():
     282              await asyncio.sleep(0)
     283              return 'done'
     284  
     285          with asyncio.Runner() as runner:
     286              self.assertEqual('done', runner.run(f()))
     287              loop = runner.get_loop()
     288  
     289          with self.assertRaisesRegex(
     290              RuntimeError,
     291              "Runner is closed"
     292          ):
     293              runner.get_loop()
     294  
     295          self.assertTrue(loop.is_closed())
     296  
     297      def test_run_non_coro(self):
     298          with asyncio.Runner() as runner:
     299              with self.assertRaisesRegex(
     300                  ValueError,
     301                  "a coroutine was expected"
     302              ):
     303                  runner.run(123)
     304  
     305      def test_run_future(self):
     306          with asyncio.Runner() as runner:
     307              with self.assertRaisesRegex(
     308                  ValueError,
     309                  "a coroutine was expected"
     310              ):
     311                  fut = runner.get_loop().create_future()
     312                  runner.run(fut)
     313  
     314      def test_explicit_close(self):
     315          runner = asyncio.Runner()
     316          loop = runner.get_loop()
     317          runner.close()
     318          with self.assertRaisesRegex(
     319                  RuntimeError,
     320                  "Runner is closed"
     321          ):
     322              runner.get_loop()
     323  
     324          self.assertTrue(loop.is_closed())
     325  
     326      def test_double_close(self):
     327          runner = asyncio.Runner()
     328          loop = runner.get_loop()
     329  
     330          runner.close()
     331          self.assertTrue(loop.is_closed())
     332  
     333          # the second call is no-op
     334          runner.close()
     335          self.assertTrue(loop.is_closed())
     336  
     337      def test_second_with_block_raises(self):
     338          ret = []
     339  
     340          async def f(arg):
     341              ret.append(arg)
     342  
     343          runner = asyncio.Runner()
     344          with runner:
     345              runner.run(f(1))
     346  
     347          with self.assertRaisesRegex(
     348              RuntimeError,
     349              "Runner is closed"
     350          ):
     351              with runner:
     352                  runner.run(f(2))
     353  
     354          self.assertEqual([1], ret)
     355  
     356      def test_run_keeps_context(self):
     357          cvar = contextvars.ContextVar("cvar", default=-1)
     358  
     359          async def f(val):
     360              old = cvar.get()
     361              await asyncio.sleep(0)
     362              cvar.set(val)
     363              return old
     364  
     365          async def get_context():
     366              return contextvars.copy_context()
     367  
     368          with asyncio.Runner() as runner:
     369              self.assertEqual(-1, runner.run(f(1)))
     370              self.assertEqual(1, runner.run(f(2)))
     371  
     372              self.assertEqual(2, runner.run(get_context()).get(cvar))
     373  
     374      def test_recursive_run(self):
     375          async def g():
     376              pass
     377  
     378          async def f():
     379              runner.run(g())
     380  
     381          with asyncio.Runner() as runner:
     382              with self.assertWarnsRegex(
     383                  RuntimeWarning,
     384                  "coroutine .+ was never awaited",
     385              ):
     386                  with self.assertRaisesRegex(
     387                      RuntimeError,
     388                      re.escape(
     389                          "Runner.run() cannot be called from a running event loop"
     390                      ),
     391                  ):
     392                      runner.run(f())
     393  
     394      def test_interrupt_call_soon(self):
     395          # The only case when task is not suspended by waiting a future
     396          # or another task
     397          assert threading.current_thread() is threading.main_thread()
     398  
     399          async def coro():
     400              with self.assertRaises(asyncio.CancelledError):
     401                  while True:
     402                      await asyncio.sleep(0)
     403              raise asyncio.CancelledError()
     404  
     405          with asyncio.Runner() as runner:
     406              runner.get_loop().call_later(0.1, interrupt_self)
     407              with self.assertRaises(KeyboardInterrupt):
     408                  runner.run(coro())
     409  
     410      def test_interrupt_wait(self):
     411          # interrupting when waiting a future cancels both future and main task
     412          assert threading.current_thread() is threading.main_thread()
     413  
     414          async def coro(fut):
     415              with self.assertRaises(asyncio.CancelledError):
     416                  await fut
     417              raise asyncio.CancelledError()
     418  
     419          with asyncio.Runner() as runner:
     420              fut = runner.get_loop().create_future()
     421              runner.get_loop().call_later(0.1, interrupt_self)
     422  
     423              with self.assertRaises(KeyboardInterrupt):
     424                  runner.run(coro(fut))
     425  
     426              self.assertTrue(fut.cancelled())
     427  
     428      def test_interrupt_cancelled_task(self):
     429          # interrupting cancelled main task doesn't raise KeyboardInterrupt
     430          assert threading.current_thread() is threading.main_thread()
     431  
     432          async def subtask(task):
     433              await asyncio.sleep(0)
     434              task.cancel()
     435              interrupt_self()
     436  
     437          async def coro():
     438              asyncio.create_task(subtask(asyncio.current_task()))
     439              await asyncio.sleep(10)
     440  
     441          with asyncio.Runner() as runner:
     442              with self.assertRaises(asyncio.CancelledError):
     443                  runner.run(coro())
     444  
     445      def test_signal_install_not_supported_ok(self):
     446          # signal.signal() can throw if the "main thread" doensn't have signals enabled
     447          assert threading.current_thread() is threading.main_thread()
     448  
     449          async def coro():
     450              pass
     451  
     452          with asyncio.Runner() as runner:
     453              with patch.object(
     454                  signal,
     455                  "signal",
     456                  side_effect=ValueError(
     457                      "signal only works in main thread of the main interpreter"
     458                  )
     459              ):
     460                  runner.run(coro())
     461  
     462      def test_set_event_loop_called_once(self):
     463          # See https://github.com/python/cpython/issues/95736
     464          async def coro():
     465              pass
     466  
     467          policy = asyncio.get_event_loop_policy()
     468          policy.set_event_loop = mock.Mock()
     469          runner = asyncio.Runner()
     470          runner.run(coro())
     471          runner.run(coro())
     472  
     473          self.assertEqual(1, policy.set_event_loop.call_count)
     474          runner.close()
     475  
     476  
     477  if __name__ == '__main__':
     478      unittest.main()