(root)/
Python-3.11.7/
Lib/
asyncio/
__main__.py
       1  import ast
       2  import asyncio
       3  import code
       4  import concurrent.futures
       5  import inspect
       6  import sys
       7  import threading
       8  import types
       9  import warnings
      10  
      11  from . import futures
      12  
      13  
      14  class ESC[4;38;5;81mAsyncIOInteractiveConsole(ESC[4;38;5;149mcodeESC[4;38;5;149m.ESC[4;38;5;149mInteractiveConsole):
      15  
      16      def __init__(self, locals, loop):
      17          super().__init__(locals)
      18          self.compile.compiler.flags |= ast.PyCF_ALLOW_TOP_LEVEL_AWAIT
      19  
      20          self.loop = loop
      21  
      22      def runcode(self, code):
      23          future = concurrent.futures.Future()
      24  
      25          def callback():
      26              global repl_future
      27              global repl_future_interrupted
      28  
      29              repl_future = None
      30              repl_future_interrupted = False
      31  
      32              func = types.FunctionType(code, self.locals)
      33              try:
      34                  coro = func()
      35              except SystemExit:
      36                  raise
      37              except KeyboardInterrupt as ex:
      38                  repl_future_interrupted = True
      39                  future.set_exception(ex)
      40                  return
      41              except BaseException as ex:
      42                  future.set_exception(ex)
      43                  return
      44  
      45              if not inspect.iscoroutine(coro):
      46                  future.set_result(coro)
      47                  return
      48  
      49              try:
      50                  repl_future = self.loop.create_task(coro)
      51                  futures._chain_future(repl_future, future)
      52              except BaseException as exc:
      53                  future.set_exception(exc)
      54  
      55          loop.call_soon_threadsafe(callback)
      56  
      57          try:
      58              return future.result()
      59          except SystemExit:
      60              raise
      61          except BaseException:
      62              if repl_future_interrupted:
      63                  self.write("\nKeyboardInterrupt\n")
      64              else:
      65                  self.showtraceback()
      66  
      67  
      68  class ESC[4;38;5;81mREPLThread(ESC[4;38;5;149mthreadingESC[4;38;5;149m.ESC[4;38;5;149mThread):
      69  
      70      def run(self):
      71          try:
      72              banner = (
      73                  f'asyncio REPL {sys.version} on {sys.platform}\n'
      74                  f'Use "await" directly instead of "asyncio.run()".\n'
      75                  f'Type "help", "copyright", "credits" or "license" '
      76                  f'for more information.\n'
      77                  f'{getattr(sys, "ps1", ">>> ")}import asyncio'
      78              )
      79  
      80              console.interact(
      81                  banner=banner,
      82                  exitmsg='exiting asyncio REPL...')
      83          finally:
      84              warnings.filterwarnings(
      85                  'ignore',
      86                  message=r'^coroutine .* was never awaited$',
      87                  category=RuntimeWarning)
      88  
      89              loop.call_soon_threadsafe(loop.stop)
      90  
      91  
      92  if __name__ == '__main__':
      93      loop = asyncio.new_event_loop()
      94      asyncio.set_event_loop(loop)
      95  
      96      repl_locals = {'asyncio': asyncio}
      97      for key in {'__name__', '__package__',
      98                  '__loader__', '__spec__',
      99                  '__builtins__', '__file__'}:
     100          repl_locals[key] = locals()[key]
     101  
     102      console = AsyncIOInteractiveConsole(repl_locals, loop)
     103  
     104      repl_future = None
     105      repl_future_interrupted = False
     106  
     107      try:
     108          import readline  # NoQA
     109      except ImportError:
     110          pass
     111  
     112      repl_thread = REPLThread()
     113      repl_thread.daemon = True
     114      repl_thread.start()
     115  
     116      while True:
     117          try:
     118              loop.run_forever()
     119          except KeyboardInterrupt:
     120              if repl_future and not repl_future.done():
     121                  repl_future.cancel()
     122                  repl_future_interrupted = True
     123              continue
     124          else:
     125              break