1  import _thread
       2  import asyncio
       3  import contextvars
       4  import re
       5  import signal
       6  import threading
       7  import unittest
       8  from test.test_asyncio import utils as test_utils
       9  from unittest import mock
      10  from unittest.mock import patch
      11  
      12  
      13  def tearDownModule():
      14      asyncio.set_event_loop_policy(None)
      15  
      16  
      17  def interrupt_self():
      18      _thread.interrupt_main()
      19  
      20  
      21  class ESC[4;38;5;81mTestPolicy(ESC[4;38;5;149masyncioESC[4;38;5;149m.ESC[4;38;5;149mAbstractEventLoopPolicy):
      22  
      23      def __init__(self, loop_factory):
      24          self.loop_factory = loop_factory
      25          self.loop = None
      26  
      27      def get_event_loop(self):
      28          # shouldn't ever be called by asyncio.run()
      29          raise RuntimeError
      30  
      31      def new_event_loop(self):
      32          return self.loop_factory()
      33  
      34      def set_event_loop(self, loop):
      35          if loop is not None:
      36              # we want to check if the loop is closed
      37              # in BaseTest.tearDown
      38              self.loop = loop
      39  
      40  
      41  class ESC[4;38;5;81mBaseTest(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      42  
      43      def new_loop(self):
      44          loop = asyncio.BaseEventLoop()
      45          loop._process_events = mock.Mock()
      46          # Mock waking event loop from select
      47          loop._write_to_self = mock.Mock()
      48          loop._write_to_self.return_value = None
      49          loop._selector = mock.Mock()
      50          loop._selector.select.return_value = ()
      51          loop.shutdown_ag_run = False
      52  
      53          async def shutdown_asyncgens():
      54              loop.shutdown_ag_run = True
      55          loop.shutdown_asyncgens = shutdown_asyncgens
      56  
      57          return loop
      58  
      59      def setUp(self):
      60          super().setUp()
      61  
      62          policy = TestPolicy(self.new_loop)
      63          asyncio.set_event_loop_policy(policy)
      64  
      65      def tearDown(self):
      66          policy = asyncio.get_event_loop_policy()
      67          if policy.loop is not None:
      68              self.assertTrue(policy.loop.is_closed())
      69              self.assertTrue(policy.loop.shutdown_ag_run)
      70  
      71          asyncio.set_event_loop_policy(None)
      72          super().tearDown()
      73  
      74  
      75  class ESC[4;38;5;81mRunTests(ESC[4;38;5;149mBaseTest):
      76  
      77      def test_asyncio_run_return(self):
      78          async def main():
      79              await asyncio.sleep(0)
      80              return 42
      81  
      82          self.assertEqual(asyncio.run(main()), 42)
      83  
      84      def test_asyncio_run_raises(self):
      85          async def main():
      86              await asyncio.sleep(0)
      87              raise ValueError('spam')
      88  
      89          with self.assertRaisesRegex(ValueError, 'spam'):
      90              asyncio.run(main())
      91  
      92      def test_asyncio_run_only_coro(self):
      93          for o in {1, lambda: None}:
      94              with self.subTest(obj=o), \
      95                      self.assertRaisesRegex(ValueError,
      96                                             'a coroutine was expected'):
      97                  asyncio.run(o)
      98  
      99      def test_asyncio_run_debug(self):
     100          async def main(expected):
     101              loop = asyncio.get_event_loop()
     102              self.assertIs(loop.get_debug(), expected)
     103  
     104          asyncio.run(main(False), debug=False)
     105          asyncio.run(main(True), debug=True)
     106          with mock.patch('asyncio.coroutines._is_debug_mode', lambda: True):
     107              asyncio.run(main(True))
     108              asyncio.run(main(False), debug=False)
     109          with mock.patch('asyncio.coroutines._is_debug_mode', lambda: False):
     110              asyncio.run(main(True), debug=True)
     111              asyncio.run(main(False))
     112  
     113      def test_asyncio_run_from_running_loop(self):
     114          async def main():
     115              coro = main()
     116              try:
     117                  asyncio.run(coro)
     118              finally:
     119                  coro.close()  # Suppress ResourceWarning
     120  
     121          with self.assertRaisesRegex(RuntimeError,
     122                                      'cannot be called from a running'):
     123              asyncio.run(main())
     124  
     125      def test_asyncio_run_cancels_hanging_tasks(self):
     126          lo_task = None
     127  
     128          async def leftover():
     129              await asyncio.sleep(0.1)
     130  
     131          async def main():
     132              nonlocal lo_task
     133              lo_task = asyncio.create_task(leftover())
     134              return 123
     135  
     136          self.assertEqual(asyncio.run(main()), 123)
     137          self.assertTrue(lo_task.done())
     138  
     139      def test_asyncio_run_reports_hanging_tasks_errors(self):
     140          lo_task = None
     141          call_exc_handler_mock = mock.Mock()
     142  
     143          async def leftover():
     144              try:
     145                  await asyncio.sleep(0.1)
     146              except asyncio.CancelledError:
     147                  1 / 0
     148  
     149          async def main():
     150              loop = asyncio.get_running_loop()
     151              loop.call_exception_handler = call_exc_handler_mock
     152  
     153              nonlocal lo_task
     154              lo_task = asyncio.create_task(leftover())
     155              return 123
     156  
     157          self.assertEqual(asyncio.run(main()), 123)
     158          self.assertTrue(lo_task.done())
     159  
     160          call_exc_handler_mock.assert_called_with({
     161              'message': test_utils.MockPattern(r'asyncio.run.*shutdown'),
     162              'task': lo_task,
     163              'exception': test_utils.MockInstanceOf(ZeroDivisionError)
     164          })
     165  
     166      def test_asyncio_run_closes_gens_after_hanging_tasks_errors(self):
     167          spinner = None
     168          lazyboy = None
     169  
     170          class ESC[4;38;5;81mFancyExit(ESC[4;38;5;149mException):
     171              pass
     172  
     173          async def fidget():
     174              while True:
     175                  yield 1
     176                  await asyncio.sleep(1)
     177  
     178          async def spin():
     179              nonlocal spinner
     180              spinner = fidget()
     181              try:
     182                  async for the_meaning_of_life in spinner:  # NoQA
     183                      pass
     184              except asyncio.CancelledError:
     185                  1 / 0
     186  
     187          async def main():
     188              loop = asyncio.get_running_loop()
     189              loop.call_exception_handler = mock.Mock()
     190  
     191              nonlocal lazyboy
     192              lazyboy = asyncio.create_task(spin())
     193              raise FancyExit
     194  
     195          with self.assertRaises(FancyExit):
     196              asyncio.run(main())
     197  
     198          self.assertTrue(lazyboy.done())
     199  
     200          self.assertIsNone(spinner.ag_frame)
     201          self.assertFalse(spinner.ag_running)
     202  
     203      def test_asyncio_run_set_event_loop(self):
     204          #See https://github.com/python/cpython/issues/93896
     205  
     206          async def main():
     207              await asyncio.sleep(0)
     208              return 42
     209  
     210          policy = asyncio.get_event_loop_policy()
     211          policy.set_event_loop = mock.Mock()
     212          asyncio.run(main())
     213          self.assertTrue(policy.set_event_loop.called)
     214  
     215      def test_asyncio_run_without_uncancel(self):
     216          # See https://github.com/python/cpython/issues/95097
     217          class ESC[4;38;5;81mTask:
     218              def __init__(self, loop, coro, **kwargs):
     219                  self._task = asyncio.Task(coro, loop=loop, **kwargs)
     220  
     221              def cancel(self, *args, **kwargs):
     222                  return self._task.cancel(*args, **kwargs)
     223  
     224              def add_done_callback(self, *args, **kwargs):
     225                  return self._task.add_done_callback(*args, **kwargs)
     226  
     227              def remove_done_callback(self, *args, **kwargs):
     228                  return self._task.remove_done_callback(*args, **kwargs)
     229  
     230              @property
     231              def _asyncio_future_blocking(self):
     232                  return self._task._asyncio_future_blocking
     233  
     234              def result(self, *args, **kwargs):
     235                  return self._task.result(*args, **kwargs)
     236  
     237              def done(self, *args, **kwargs):
     238                  return self._task.done(*args, **kwargs)
     239  
     240              def cancelled(self, *args, **kwargs):
     241                  return self._task.cancelled(*args, **kwargs)
     242  
     243              def exception(self, *args, **kwargs):
     244                  return self._task.exception(*args, **kwargs)
     245  
     246              def get_loop(self, *args, **kwargs):
     247                  return self._task.get_loop(*args, **kwargs)
     248  
     249  
     250          async def main():
     251              interrupt_self()
     252              await asyncio.Event().wait()
     253  
     254          def new_event_loop():
     255              loop = self.new_loop()
     256              loop.set_task_factory(Task)
     257              return loop
     258  
     259          asyncio.set_event_loop_policy(TestPolicy(new_event_loop))
     260          with self.assertRaises(asyncio.CancelledError):
     261              asyncio.run(main())
     262  
     263      def test_asyncio_run_loop_factory(self):
     264          factory = mock.Mock()
     265          loop = factory.return_value = self.new_loop()
     266  
     267          async def main():
     268              self.assertEqual(asyncio.get_running_loop(), loop)
     269  
     270          asyncio.run(main(), loop_factory=factory)
     271          factory.assert_called_once_with()
     272  
     273  
     274  class ESC[4;38;5;81mRunnerTests(ESC[4;38;5;149mBaseTest):
     275  
     276      def test_non_debug(self):
     277          with asyncio.Runner(debug=False) as runner:
     278              self.assertFalse(runner.get_loop().get_debug())
     279  
     280      def test_debug(self):
     281          with asyncio.Runner(debug=True) as runner:
     282              self.assertTrue(runner.get_loop().get_debug())
     283  
     284      def test_custom_factory(self):
     285          loop = mock.Mock()
     286          with asyncio.Runner(loop_factory=lambda: loop) as runner:
     287              self.assertIs(runner.get_loop(), loop)
     288  
     289      def test_run(self):
     290          async def f():
     291              await asyncio.sleep(0)
     292              return 'done'
     293  
     294          with asyncio.Runner() as runner:
     295              self.assertEqual('done', runner.run(f()))
     296              loop = runner.get_loop()
     297  
     298          with self.assertRaisesRegex(
     299              RuntimeError,
     300              "Runner is closed"
     301          ):
     302              runner.get_loop()
     303  
     304          self.assertTrue(loop.is_closed())
     305  
     306      def test_run_non_coro(self):
     307          with asyncio.Runner() as runner:
     308              with self.assertRaisesRegex(
     309                  ValueError,
     310                  "a coroutine was expected"
     311              ):
     312                  runner.run(123)
     313  
     314      def test_run_future(self):
     315          with asyncio.Runner() as runner:
     316              with self.assertRaisesRegex(
     317                  ValueError,
     318                  "a coroutine was expected"
     319              ):
     320                  fut = runner.get_loop().create_future()
     321                  runner.run(fut)
     322  
     323      def test_explicit_close(self):
     324          runner = asyncio.Runner()
     325          loop = runner.get_loop()
     326          runner.close()
     327          with self.assertRaisesRegex(
     328                  RuntimeError,
     329                  "Runner is closed"
     330          ):
     331              runner.get_loop()
     332  
     333          self.assertTrue(loop.is_closed())
     334  
     335      def test_double_close(self):
     336          runner = asyncio.Runner()
     337          loop = runner.get_loop()
     338  
     339          runner.close()
     340          self.assertTrue(loop.is_closed())
     341  
     342          # the second call is no-op
     343          runner.close()
     344          self.assertTrue(loop.is_closed())
     345  
     346      def test_second_with_block_raises(self):
     347          ret = []
     348  
     349          async def f(arg):
     350              ret.append(arg)
     351  
     352          runner = asyncio.Runner()
     353          with runner:
     354              runner.run(f(1))
     355  
     356          with self.assertRaisesRegex(
     357              RuntimeError,
     358              "Runner is closed"
     359          ):
     360              with runner:
     361                  runner.run(f(2))
     362  
     363          self.assertEqual([1], ret)
     364  
     365      def test_run_keeps_context(self):
     366          cvar = contextvars.ContextVar("cvar", default=-1)
     367  
     368          async def f(val):
     369              old = cvar.get()
     370              await asyncio.sleep(0)
     371              cvar.set(val)
     372              return old
     373  
     374          async def get_context():
     375              return contextvars.copy_context()
     376  
     377          with asyncio.Runner() as runner:
     378              self.assertEqual(-1, runner.run(f(1)))
     379              self.assertEqual(1, runner.run(f(2)))
     380  
     381              self.assertEqual(2, runner.run(get_context()).get(cvar))
     382  
     383      def test_recursive_run(self):
     384          async def g():
     385              pass
     386  
     387          async def f():
     388              runner.run(g())
     389  
     390          with asyncio.Runner() as runner:
     391              with self.assertWarnsRegex(
     392                  RuntimeWarning,
     393                  "coroutine .+ was never awaited",
     394              ):
     395                  with self.assertRaisesRegex(
     396                      RuntimeError,
     397                      re.escape(
     398                          "Runner.run() cannot be called from a running event loop"
     399                      ),
     400                  ):
     401                      runner.run(f())
     402  
     403      def test_interrupt_call_soon(self):
     404          # The only case when task is not suspended by waiting a future
     405          # or another task
     406          assert threading.current_thread() is threading.main_thread()
     407  
     408          async def coro():
     409              with self.assertRaises(asyncio.CancelledError):
     410                  while True:
     411                      await asyncio.sleep(0)
     412              raise asyncio.CancelledError()
     413  
     414          with asyncio.Runner() as runner:
     415              runner.get_loop().call_later(0.1, interrupt_self)
     416              with self.assertRaises(KeyboardInterrupt):
     417                  runner.run(coro())
     418  
     419      def test_interrupt_wait(self):
     420          # interrupting when waiting a future cancels both future and main task
     421          assert threading.current_thread() is threading.main_thread()
     422  
     423          async def coro(fut):
     424              with self.assertRaises(asyncio.CancelledError):
     425                  await fut
     426              raise asyncio.CancelledError()
     427  
     428          with asyncio.Runner() as runner:
     429              fut = runner.get_loop().create_future()
     430              runner.get_loop().call_later(0.1, interrupt_self)
     431  
     432              with self.assertRaises(KeyboardInterrupt):
     433                  runner.run(coro(fut))
     434  
     435              self.assertTrue(fut.cancelled())
     436  
     437      def test_interrupt_cancelled_task(self):
     438          # interrupting cancelled main task doesn't raise KeyboardInterrupt
     439          assert threading.current_thread() is threading.main_thread()
     440  
     441          async def subtask(task):
     442              await asyncio.sleep(0)
     443              task.cancel()
     444              interrupt_self()
     445  
     446          async def coro():
     447              asyncio.create_task(subtask(asyncio.current_task()))
     448              await asyncio.sleep(10)
     449  
     450          with asyncio.Runner() as runner:
     451              with self.assertRaises(asyncio.CancelledError):
     452                  runner.run(coro())
     453  
     454      def test_signal_install_not_supported_ok(self):
     455          # signal.signal() can throw if the "main thread" doesn't have signals enabled
     456          assert threading.current_thread() is threading.main_thread()
     457  
     458          async def coro():
     459              pass
     460  
     461          with asyncio.Runner() as runner:
     462              with patch.object(
     463                  signal,
     464                  "signal",
     465                  side_effect=ValueError(
     466                      "signal only works in main thread of the main interpreter"
     467                  )
     468              ):
     469                  runner.run(coro())
     470  
     471      def test_set_event_loop_called_once(self):
     472          # See https://github.com/python/cpython/issues/95736
     473          async def coro():
     474              pass
     475  
     476          policy = asyncio.get_event_loop_policy()
     477          policy.set_event_loop = mock.Mock()
     478          runner = asyncio.Runner()
     479          runner.run(coro())
     480          runner.run(coro())
     481  
     482          self.assertEqual(1, policy.set_event_loop.call_count)
     483          runner.close()
     484  
     485  
     486  if __name__ == '__main__':
     487      unittest.main()