(root)/
Python-3.11.7/
Lib/
asyncio/
runners.py
       1  __all__ = ('Runner', 'run')
       2  
       3  import contextvars
       4  import enum
       5  import functools
       6  import threading
       7  import signal
       8  import sys
       9  from . import coroutines
      10  from . import events
      11  from . import exceptions
      12  from . import tasks
      13  
      14  
      15  class ESC[4;38;5;81m_State(ESC[4;38;5;149menumESC[4;38;5;149m.ESC[4;38;5;149mEnum):
      16      CREATED = "created"
      17      INITIALIZED = "initialized"
      18      CLOSED = "closed"
      19  
      20  
      21  class ESC[4;38;5;81mRunner:
      22      """A context manager that controls event loop life cycle.
      23  
      24      The context manager always creates a new event loop,
      25      allows to run async functions inside it,
      26      and properly finalizes the loop at the context manager exit.
      27  
      28      If debug is True, the event loop will be run in debug mode.
      29      If loop_factory is passed, it is used for new event loop creation.
      30  
      31      asyncio.run(main(), debug=True)
      32  
      33      is a shortcut for
      34  
      35      with asyncio.Runner(debug=True) as runner:
      36          runner.run(main())
      37  
      38      The run() method can be called multiple times within the runner's context.
      39  
      40      This can be useful for interactive console (e.g. IPython),
      41      unittest runners, console tools, -- everywhere when async code
      42      is called from existing sync framework and where the preferred single
      43      asyncio.run() call doesn't work.
      44  
      45      """
      46  
      47      # Note: the class is final, it is not intended for inheritance.
      48  
      49      def __init__(self, *, debug=None, loop_factory=None):
      50          self._state = _State.CREATED
      51          self._debug = debug
      52          self._loop_factory = loop_factory
      53          self._loop = None
      54          self._context = None
      55          self._interrupt_count = 0
      56          self._set_event_loop = False
      57  
      58      def __enter__(self):
      59          self._lazy_init()
      60          return self
      61  
      62      def __exit__(self, exc_type, exc_val, exc_tb):
      63          self.close()
      64  
      65      def close(self):
      66          """Shutdown and close event loop."""
      67          if self._state is not _State.INITIALIZED:
      68              return
      69          try:
      70              loop = self._loop
      71              _cancel_all_tasks(loop)
      72              loop.run_until_complete(loop.shutdown_asyncgens())
      73              loop.run_until_complete(loop.shutdown_default_executor())
      74          finally:
      75              if self._set_event_loop:
      76                  events.set_event_loop(None)
      77              loop.close()
      78              self._loop = None
      79              self._state = _State.CLOSED
      80  
      81      def get_loop(self):
      82          """Return embedded event loop."""
      83          self._lazy_init()
      84          return self._loop
      85  
      86      def run(self, coro, *, context=None):
      87          """Run a coroutine inside the embedded event loop."""
      88          if not coroutines.iscoroutine(coro):
      89              raise ValueError("a coroutine was expected, got {!r}".format(coro))
      90  
      91          if events._get_running_loop() is not None:
      92              # fail fast with short traceback
      93              raise RuntimeError(
      94                  "Runner.run() cannot be called from a running event loop")
      95  
      96          self._lazy_init()
      97  
      98          if context is None:
      99              context = self._context
     100          task = self._loop.create_task(coro, context=context)
     101  
     102          if (threading.current_thread() is threading.main_thread()
     103              and signal.getsignal(signal.SIGINT) is signal.default_int_handler
     104          ):
     105              sigint_handler = functools.partial(self._on_sigint, main_task=task)
     106              try:
     107                  signal.signal(signal.SIGINT, sigint_handler)
     108              except ValueError:
     109                  # `signal.signal` may throw if `threading.main_thread` does
     110                  # not support signals (e.g. embedded interpreter with signals
     111                  # not registered - see gh-91880)
     112                  sigint_handler = None
     113          else:
     114              sigint_handler = None
     115  
     116          self._interrupt_count = 0
     117          try:
     118              return self._loop.run_until_complete(task)
     119          except exceptions.CancelledError:
     120              if self._interrupt_count > 0:
     121                  uncancel = getattr(task, "uncancel", None)
     122                  if uncancel is not None and uncancel() == 0:
     123                      raise KeyboardInterrupt()
     124              raise  # CancelledError
     125          finally:
     126              if (sigint_handler is not None
     127                  and signal.getsignal(signal.SIGINT) is sigint_handler
     128              ):
     129                  signal.signal(signal.SIGINT, signal.default_int_handler)
     130  
     131      def _lazy_init(self):
     132          if self._state is _State.CLOSED:
     133              raise RuntimeError("Runner is closed")
     134          if self._state is _State.INITIALIZED:
     135              return
     136          if self._loop_factory is None:
     137              self._loop = events.new_event_loop()
     138              if not self._set_event_loop:
     139                  # Call set_event_loop only once to avoid calling
     140                  # attach_loop multiple times on child watchers
     141                  events.set_event_loop(self._loop)
     142                  self._set_event_loop = True
     143          else:
     144              self._loop = self._loop_factory()
     145          if self._debug is not None:
     146              self._loop.set_debug(self._debug)
     147          self._context = contextvars.copy_context()
     148          self._state = _State.INITIALIZED
     149  
     150      def _on_sigint(self, signum, frame, main_task):
     151          self._interrupt_count += 1
     152          if self._interrupt_count == 1 and not main_task.done():
     153              main_task.cancel()
     154              # wakeup loop if it is blocked by select() with long timeout
     155              self._loop.call_soon_threadsafe(lambda: None)
     156              return
     157          raise KeyboardInterrupt()
     158  
     159  
     160  def run(main, *, debug=None):
     161      """Execute the coroutine and return the result.
     162  
     163      This function runs the passed coroutine, taking care of
     164      managing the asyncio event loop and finalizing asynchronous
     165      generators.
     166  
     167      This function cannot be called when another asyncio event loop is
     168      running in the same thread.
     169  
     170      If debug is True, the event loop will be run in debug mode.
     171  
     172      This function always creates a new event loop and closes it at the end.
     173      It should be used as a main entry point for asyncio programs, and should
     174      ideally only be called once.
     175  
     176      Example:
     177  
     178          async def main():
     179              await asyncio.sleep(1)
     180              print('hello')
     181  
     182          asyncio.run(main())
     183      """
     184      if events._get_running_loop() is not None:
     185          # fail fast with short traceback
     186          raise RuntimeError(
     187              "asyncio.run() cannot be called from a running event loop")
     188  
     189      with Runner(debug=debug) as runner:
     190          return runner.run(main)
     191  
     192  
     193  def _cancel_all_tasks(loop):
     194      to_cancel = tasks.all_tasks(loop)
     195      if not to_cancel:
     196          return
     197  
     198      for task in to_cancel:
     199          task.cancel()
     200  
     201      loop.run_until_complete(tasks.gather(*to_cancel, return_exceptions=True))
     202  
     203      for task in to_cancel:
     204          if task.cancelled():
     205              continue
     206          if task.exception() is not None:
     207              loop.call_exception_handler({
     208                  'message': 'unhandled exception during asyncio.run() shutdown',
     209                  'exception': task.exception(),
     210                  'task': task,
     211              })