(root)/
Python-3.11.7/
Lib/
test/
support/
interpreters.py
       1  """Subinterpreters High Level Module."""
       2  
       3  import time
       4  import _xxsubinterpreters as _interpreters
       5  
       6  # aliases:
       7  from _xxsubinterpreters import (
       8      ChannelError, ChannelNotFoundError, ChannelEmptyError,
       9      is_shareable,
      10  )
      11  
      12  
      13  __all__ = [
      14      'Interpreter', 'get_current', 'get_main', 'create', 'list_all',
      15      'SendChannel', 'RecvChannel',
      16      'create_channel', 'list_all_channels', 'is_shareable',
      17      'ChannelError', 'ChannelNotFoundError',
      18      'ChannelEmptyError',
      19      ]
      20  
      21  
      22  def create(*, isolated=True):
      23      """Return a new (idle) Python interpreter."""
      24      id = _interpreters.create(isolated=isolated)
      25      return Interpreter(id, isolated=isolated)
      26  
      27  
      28  def list_all():
      29      """Return all existing interpreters."""
      30      return [Interpreter(id) for id in _interpreters.list_all()]
      31  
      32  
      33  def get_current():
      34      """Return the currently running interpreter."""
      35      id = _interpreters.get_current()
      36      return Interpreter(id)
      37  
      38  
      39  def get_main():
      40      """Return the main interpreter."""
      41      id = _interpreters.get_main()
      42      return Interpreter(id)
      43  
      44  
      45  class ESC[4;38;5;81mInterpreter:
      46      """A single Python interpreter."""
      47  
      48      def __init__(self, id, *, isolated=None):
      49          if not isinstance(id, (int, _interpreters.InterpreterID)):
      50              raise TypeError(f'id must be an int, got {id!r}')
      51          self._id = id
      52          self._isolated = isolated
      53  
      54      def __repr__(self):
      55          data = dict(id=int(self._id), isolated=self._isolated)
      56          kwargs = (f'{k}={v!r}' for k, v in data.items())
      57          return f'{type(self).__name__}({", ".join(kwargs)})'
      58  
      59      def __hash__(self):
      60          return hash(self._id)
      61  
      62      def __eq__(self, other):
      63          if not isinstance(other, Interpreter):
      64              return NotImplemented
      65          else:
      66              return other._id == self._id
      67  
      68      @property
      69      def id(self):
      70          return self._id
      71  
      72      @property
      73      def isolated(self):
      74          if self._isolated is None:
      75              # XXX The low-level function has not been added yet.
      76              # See bpo-....
      77              self._isolated = _interpreters.is_isolated(self._id)
      78          return self._isolated
      79  
      80      def is_running(self):
      81          """Return whether or not the identified interpreter is running."""
      82          return _interpreters.is_running(self._id)
      83  
      84      def close(self):
      85          """Finalize and destroy the interpreter.
      86  
      87          Attempting to destroy the current interpreter results
      88          in a RuntimeError.
      89          """
      90          return _interpreters.destroy(self._id)
      91  
      92      def run(self, src_str, /, *, channels=None):
      93          """Run the given source code in the interpreter.
      94  
      95          This blocks the current Python thread until done.
      96          """
      97          _interpreters.run_string(self._id, src_str, channels)
      98  
      99  
     100  def create_channel():
     101      """Return (recv, send) for a new cross-interpreter channel.
     102  
     103      The channel may be used to pass data safely between interpreters.
     104      """
     105      cid = _interpreters.channel_create()
     106      recv, send = RecvChannel(cid), SendChannel(cid)
     107      return recv, send
     108  
     109  
     110  def list_all_channels():
     111      """Return a list of (recv, send) for all open channels."""
     112      return [(RecvChannel(cid), SendChannel(cid))
     113              for cid in _interpreters.channel_list_all()]
     114  
     115  
     116  class ESC[4;38;5;81m_ChannelEnd:
     117      """The base class for RecvChannel and SendChannel."""
     118  
     119      def __init__(self, id):
     120          if not isinstance(id, (int, _interpreters.ChannelID)):
     121              raise TypeError(f'id must be an int, got {id!r}')
     122          self._id = id
     123  
     124      def __repr__(self):
     125          return f'{type(self).__name__}(id={int(self._id)})'
     126  
     127      def __hash__(self):
     128          return hash(self._id)
     129  
     130      def __eq__(self, other):
     131          if isinstance(self, RecvChannel):
     132              if not isinstance(other, RecvChannel):
     133                  return NotImplemented
     134          elif not isinstance(other, SendChannel):
     135              return NotImplemented
     136          return other._id == self._id
     137  
     138      @property
     139      def id(self):
     140          return self._id
     141  
     142  
     143  _NOT_SET = object()
     144  
     145  
     146  class ESC[4;38;5;81mRecvChannel(ESC[4;38;5;149m_ChannelEnd):
     147      """The receiving end of a cross-interpreter channel."""
     148  
     149      def recv(self, *, _sentinel=object(), _delay=10 / 1000):  # 10 milliseconds
     150          """Return the next object from the channel.
     151  
     152          This blocks until an object has been sent, if none have been
     153          sent already.
     154          """
     155          obj = _interpreters.channel_recv(self._id, _sentinel)
     156          while obj is _sentinel:
     157              time.sleep(_delay)
     158              obj = _interpreters.channel_recv(self._id, _sentinel)
     159          return obj
     160  
     161      def recv_nowait(self, default=_NOT_SET):
     162          """Return the next object from the channel.
     163  
     164          If none have been sent then return the default if one
     165          is provided or fail with ChannelEmptyError.  Otherwise this
     166          is the same as recv().
     167          """
     168          if default is _NOT_SET:
     169              return _interpreters.channel_recv(self._id)
     170          else:
     171              return _interpreters.channel_recv(self._id, default)
     172  
     173  
     174  class ESC[4;38;5;81mSendChannel(ESC[4;38;5;149m_ChannelEnd):
     175      """The sending end of a cross-interpreter channel."""
     176  
     177      def send(self, obj):
     178          """Send the object (i.e. its data) to the channel's receiving end.
     179  
     180          This blocks until the object is received.
     181          """
     182          _interpreters.channel_send(self._id, obj)
     183          # XXX We are missing a low-level channel_send_wait().
     184          # See bpo-32604 and gh-19829.
     185          # Until that shows up we fake it:
     186          time.sleep(2)
     187  
     188      def send_nowait(self, obj):
     189          """Send the object to the channel's receiving end.
     190  
     191          If the object is immediately received then return True
     192          (else False).  Otherwise this is the same as send().
     193          """
     194          # XXX Note that at the moment channel_send() only ever returns
     195          # None.  This should be fixed when channel_send_wait() is added.
     196          # See bpo-32604 and gh-19829.
     197          return _interpreters.channel_send(self._id, obj)