(root)/
Python-3.11.7/
Lib/
test/
test__xxsubinterpreters.py
       1  from collections import namedtuple
       2  import contextlib
       3  import itertools
       4  import os
       5  import pickle
       6  import sys
       7  from textwrap import dedent
       8  import threading
       9  import time
      10  import unittest
      11  
      12  from test import support
      13  from test.support import import_helper
      14  from test.support import script_helper
      15  
      16  
      17  interpreters = import_helper.import_module('_xxsubinterpreters')
      18  
      19  
      20  ##################################
      21  # helpers
      22  
      23  def _captured_script(script):
      24      r, w = os.pipe()
      25      indented = script.replace('\n', '\n                ')
      26      wrapped = dedent(f"""
      27          import contextlib
      28          with open({w}, 'w', encoding="utf-8") as spipe:
      29              with contextlib.redirect_stdout(spipe):
      30                  {indented}
      31          """)
      32      return wrapped, open(r, encoding="utf-8")
      33  
      34  
      35  def _run_output(interp, request, shared=None):
      36      script, rpipe = _captured_script(request)
      37      with rpipe:
      38          interpreters.run_string(interp, script, shared)
      39          return rpipe.read()
      40  
      41  
      42  def _wait_for_interp_to_run(interp, timeout=None):
      43      # bpo-37224: Running this test file in multiprocesses will fail randomly.
      44      # The failure reason is that the thread can't acquire the cpu to
      45      # run subinterpreter eariler than the main thread in multiprocess.
      46      if timeout is None:
      47          timeout = support.SHORT_TIMEOUT
      48      for _ in support.sleeping_retry(timeout, error=False):
      49          if interpreters.is_running(interp):
      50              break
      51      else:
      52          raise RuntimeError('interp is not running')
      53  
      54  
      55  @contextlib.contextmanager
      56  def _running(interp):
      57      r, w = os.pipe()
      58      def run():
      59          interpreters.run_string(interp, dedent(f"""
      60              # wait for "signal"
      61              with open({r}, encoding="utf-8") as rpipe:
      62                  rpipe.read()
      63              """))
      64  
      65      t = threading.Thread(target=run)
      66      t.start()
      67      _wait_for_interp_to_run(interp)
      68  
      69      yield
      70  
      71      with open(w, 'w', encoding="utf-8") as spipe:
      72          spipe.write('done')
      73      t.join()
      74  
      75  
      76  #@contextmanager
      77  #def run_threaded(id, source, **shared):
      78  #    def run():
      79  #        run_interp(id, source, **shared)
      80  #    t = threading.Thread(target=run)
      81  #    t.start()
      82  #    yield
      83  #    t.join()
      84  
      85  
      86  def run_interp(id, source, **shared):
      87      _run_interp(id, source, shared)
      88  
      89  
      90  def _run_interp(id, source, shared, _mainns={}):
      91      source = dedent(source)
      92      main = interpreters.get_main()
      93      if main == id:
      94          if interpreters.get_current() != main:
      95              raise RuntimeError
      96          # XXX Run a func?
      97          exec(source, _mainns)
      98      else:
      99          interpreters.run_string(id, source, shared)
     100  
     101  
     102  class ESC[4;38;5;81mInterpreter(ESC[4;38;5;149mnamedtuple('Interpreter', 'name id')):
     103  
     104      @classmethod
     105      def from_raw(cls, raw):
     106          if isinstance(raw, cls):
     107              return raw
     108          elif isinstance(raw, str):
     109              return cls(raw)
     110          else:
     111              raise NotImplementedError
     112  
     113      def __new__(cls, name=None, id=None):
     114          main = interpreters.get_main()
     115          if id == main:
     116              if not name:
     117                  name = 'main'
     118              elif name != 'main':
     119                  raise ValueError(
     120                      'name mismatch (expected "main", got "{}")'.format(name))
     121              id = main
     122          elif id is not None:
     123              if not name:
     124                  name = 'interp'
     125              elif name == 'main':
     126                  raise ValueError('name mismatch (unexpected "main")')
     127              if not isinstance(id, interpreters.InterpreterID):
     128                  id = interpreters.InterpreterID(id)
     129          elif not name or name == 'main':
     130              name = 'main'
     131              id = main
     132          else:
     133              id = interpreters.create()
     134          self = super().__new__(cls, name, id)
     135          return self
     136  
     137  
     138  # XXX expect_channel_closed() is unnecessary once we improve exc propagation.
     139  
     140  @contextlib.contextmanager
     141  def expect_channel_closed():
     142      try:
     143          yield
     144      except interpreters.ChannelClosedError:
     145          pass
     146      else:
     147          assert False, 'channel not closed'
     148  
     149  
     150  class ESC[4;38;5;81mChannelAction(ESC[4;38;5;149mnamedtuple('ChannelAction', 'action end interp')):
     151  
     152      def __new__(cls, action, end=None, interp=None):
     153          if not end:
     154              end = 'both'
     155          if not interp:
     156              interp = 'main'
     157          self = super().__new__(cls, action, end, interp)
     158          return self
     159  
     160      def __init__(self, *args, **kwargs):
     161          if self.action == 'use':
     162              if self.end not in ('same', 'opposite', 'send', 'recv'):
     163                  raise ValueError(self.end)
     164          elif self.action in ('close', 'force-close'):
     165              if self.end not in ('both', 'same', 'opposite', 'send', 'recv'):
     166                  raise ValueError(self.end)
     167          else:
     168              raise ValueError(self.action)
     169          if self.interp not in ('main', 'same', 'other', 'extra'):
     170              raise ValueError(self.interp)
     171  
     172      def resolve_end(self, end):
     173          if self.end == 'same':
     174              return end
     175          elif self.end == 'opposite':
     176              return 'recv' if end == 'send' else 'send'
     177          else:
     178              return self.end
     179  
     180      def resolve_interp(self, interp, other, extra):
     181          if self.interp == 'same':
     182              return interp
     183          elif self.interp == 'other':
     184              if other is None:
     185                  raise RuntimeError
     186              return other
     187          elif self.interp == 'extra':
     188              if extra is None:
     189                  raise RuntimeError
     190              return extra
     191          elif self.interp == 'main':
     192              if interp.name == 'main':
     193                  return interp
     194              elif other and other.name == 'main':
     195                  return other
     196              else:
     197                  raise RuntimeError
     198          # Per __init__(), there aren't any others.
     199  
     200  
     201  class ESC[4;38;5;81mChannelState(ESC[4;38;5;149mnamedtuple('ChannelState', 'pending closed')):
     202  
     203      def __new__(cls, pending=0, *, closed=False):
     204          self = super().__new__(cls, pending, closed)
     205          return self
     206  
     207      def incr(self):
     208          return type(self)(self.pending + 1, closed=self.closed)
     209  
     210      def decr(self):
     211          return type(self)(self.pending - 1, closed=self.closed)
     212  
     213      def close(self, *, force=True):
     214          if self.closed:
     215              if not force or self.pending == 0:
     216                  return self
     217          return type(self)(0 if force else self.pending, closed=True)
     218  
     219  
     220  def run_action(cid, action, end, state, *, hideclosed=True):
     221      if state.closed:
     222          if action == 'use' and end == 'recv' and state.pending:
     223              expectfail = False
     224          else:
     225              expectfail = True
     226      else:
     227          expectfail = False
     228  
     229      try:
     230          result = _run_action(cid, action, end, state)
     231      except interpreters.ChannelClosedError:
     232          if not hideclosed and not expectfail:
     233              raise
     234          result = state.close()
     235      else:
     236          if expectfail:
     237              raise ...  # XXX
     238      return result
     239  
     240  
     241  def _run_action(cid, action, end, state):
     242      if action == 'use':
     243          if end == 'send':
     244              interpreters.channel_send(cid, b'spam')
     245              return state.incr()
     246          elif end == 'recv':
     247              if not state.pending:
     248                  try:
     249                      interpreters.channel_recv(cid)
     250                  except interpreters.ChannelEmptyError:
     251                      return state
     252                  else:
     253                      raise Exception('expected ChannelEmptyError')
     254              else:
     255                  interpreters.channel_recv(cid)
     256                  return state.decr()
     257          else:
     258              raise ValueError(end)
     259      elif action == 'close':
     260          kwargs = {}
     261          if end in ('recv', 'send'):
     262              kwargs[end] = True
     263          interpreters.channel_close(cid, **kwargs)
     264          return state.close()
     265      elif action == 'force-close':
     266          kwargs = {
     267              'force': True,
     268              }
     269          if end in ('recv', 'send'):
     270              kwargs[end] = True
     271          interpreters.channel_close(cid, **kwargs)
     272          return state.close(force=True)
     273      else:
     274          raise ValueError(action)
     275  
     276  
     277  def clean_up_interpreters():
     278      for id in interpreters.list_all():
     279          if id == 0:  # main
     280              continue
     281          try:
     282              interpreters.destroy(id)
     283          except RuntimeError:
     284              pass  # already destroyed
     285  
     286  
     287  def clean_up_channels():
     288      for cid in interpreters.channel_list_all():
     289          try:
     290              interpreters.channel_destroy(cid)
     291          except interpreters.ChannelNotFoundError:
     292              pass  # already destroyed
     293  
     294  
     295  class ESC[4;38;5;81mTestBase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     296  
     297      def tearDown(self):
     298          clean_up_interpreters()
     299          clean_up_channels()
     300  
     301  
     302  ##################################
     303  # misc. tests
     304  
     305  class ESC[4;38;5;81mIsShareableTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     306  
     307      def test_default_shareables(self):
     308          shareables = [
     309                  # singletons
     310                  None,
     311                  # builtin objects
     312                  b'spam',
     313                  'spam',
     314                  10,
     315                  -10,
     316                  ]
     317          for obj in shareables:
     318              with self.subTest(obj):
     319                  self.assertTrue(
     320                      interpreters.is_shareable(obj))
     321  
     322      def test_not_shareable(self):
     323          class ESC[4;38;5;81mCheese:
     324              def __init__(self, name):
     325                  self.name = name
     326              def __str__(self):
     327                  return self.name
     328  
     329          class ESC[4;38;5;81mSubBytes(ESC[4;38;5;149mbytes):
     330              """A subclass of a shareable type."""
     331  
     332          not_shareables = [
     333                  # singletons
     334                  True,
     335                  False,
     336                  NotImplemented,
     337                  ...,
     338                  # builtin types and objects
     339                  type,
     340                  object,
     341                  object(),
     342                  Exception(),
     343                  100.0,
     344                  # user-defined types and objects
     345                  Cheese,
     346                  Cheese('Wensleydale'),
     347                  SubBytes(b'spam'),
     348                  ]
     349          for obj in not_shareables:
     350              with self.subTest(repr(obj)):
     351                  self.assertFalse(
     352                      interpreters.is_shareable(obj))
     353  
     354  
     355  class ESC[4;38;5;81mShareableTypeTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     356  
     357      def setUp(self):
     358          super().setUp()
     359          self.cid = interpreters.channel_create()
     360  
     361      def tearDown(self):
     362          interpreters.channel_destroy(self.cid)
     363          super().tearDown()
     364  
     365      def _assert_values(self, values):
     366          for obj in values:
     367              with self.subTest(obj):
     368                  interpreters.channel_send(self.cid, obj)
     369                  got = interpreters.channel_recv(self.cid)
     370  
     371                  self.assertEqual(got, obj)
     372                  self.assertIs(type(got), type(obj))
     373                  # XXX Check the following in the channel tests?
     374                  #self.assertIsNot(got, obj)
     375  
     376      def test_singletons(self):
     377          for obj in [None]:
     378              with self.subTest(obj):
     379                  interpreters.channel_send(self.cid, obj)
     380                  got = interpreters.channel_recv(self.cid)
     381  
     382                  # XXX What about between interpreters?
     383                  self.assertIs(got, obj)
     384  
     385      def test_types(self):
     386          self._assert_values([
     387              b'spam',
     388              9999,
     389              self.cid,
     390              ])
     391  
     392      def test_bytes(self):
     393          self._assert_values(i.to_bytes(2, 'little', signed=True)
     394                              for i in range(-1, 258))
     395  
     396      def test_strs(self):
     397          self._assert_values(['hello world', '你好世界', ''])
     398  
     399      def test_int(self):
     400          self._assert_values(itertools.chain(range(-1, 258),
     401                                              [sys.maxsize, -sys.maxsize - 1]))
     402  
     403      def test_non_shareable_int(self):
     404          ints = [
     405              sys.maxsize + 1,
     406              -sys.maxsize - 2,
     407              2**1000,
     408          ]
     409          for i in ints:
     410              with self.subTest(i):
     411                  with self.assertRaises(OverflowError):
     412                      interpreters.channel_send(self.cid, i)
     413  
     414  
     415  ##################################
     416  # interpreter tests
     417  
     418  class ESC[4;38;5;81mListAllTests(ESC[4;38;5;149mTestBase):
     419  
     420      def test_initial(self):
     421          main = interpreters.get_main()
     422          ids = interpreters.list_all()
     423          self.assertEqual(ids, [main])
     424  
     425      def test_after_creating(self):
     426          main = interpreters.get_main()
     427          first = interpreters.create()
     428          second = interpreters.create()
     429          ids = interpreters.list_all()
     430          self.assertEqual(ids, [main, first, second])
     431  
     432      def test_after_destroying(self):
     433          main = interpreters.get_main()
     434          first = interpreters.create()
     435          second = interpreters.create()
     436          interpreters.destroy(first)
     437          ids = interpreters.list_all()
     438          self.assertEqual(ids, [main, second])
     439  
     440  
     441  class ESC[4;38;5;81mGetCurrentTests(ESC[4;38;5;149mTestBase):
     442  
     443      def test_main(self):
     444          main = interpreters.get_main()
     445          cur = interpreters.get_current()
     446          self.assertEqual(cur, main)
     447          self.assertIsInstance(cur, interpreters.InterpreterID)
     448  
     449      def test_subinterpreter(self):
     450          main = interpreters.get_main()
     451          interp = interpreters.create()
     452          out = _run_output(interp, dedent("""
     453              import _xxsubinterpreters as _interpreters
     454              cur = _interpreters.get_current()
     455              print(cur)
     456              assert isinstance(cur, _interpreters.InterpreterID)
     457              """))
     458          cur = int(out.strip())
     459          _, expected = interpreters.list_all()
     460          self.assertEqual(cur, expected)
     461          self.assertNotEqual(cur, main)
     462  
     463  
     464  class ESC[4;38;5;81mGetMainTests(ESC[4;38;5;149mTestBase):
     465  
     466      def test_from_main(self):
     467          [expected] = interpreters.list_all()
     468          main = interpreters.get_main()
     469          self.assertEqual(main, expected)
     470          self.assertIsInstance(main, interpreters.InterpreterID)
     471  
     472      def test_from_subinterpreter(self):
     473          [expected] = interpreters.list_all()
     474          interp = interpreters.create()
     475          out = _run_output(interp, dedent("""
     476              import _xxsubinterpreters as _interpreters
     477              main = _interpreters.get_main()
     478              print(main)
     479              assert isinstance(main, _interpreters.InterpreterID)
     480              """))
     481          main = int(out.strip())
     482          self.assertEqual(main, expected)
     483  
     484  
     485  class ESC[4;38;5;81mIsRunningTests(ESC[4;38;5;149mTestBase):
     486  
     487      def test_main(self):
     488          main = interpreters.get_main()
     489          self.assertTrue(interpreters.is_running(main))
     490  
     491      @unittest.skip('Fails on FreeBSD')
     492      def test_subinterpreter(self):
     493          interp = interpreters.create()
     494          self.assertFalse(interpreters.is_running(interp))
     495  
     496          with _running(interp):
     497              self.assertTrue(interpreters.is_running(interp))
     498          self.assertFalse(interpreters.is_running(interp))
     499  
     500      def test_from_subinterpreter(self):
     501          interp = interpreters.create()
     502          out = _run_output(interp, dedent(f"""
     503              import _xxsubinterpreters as _interpreters
     504              if _interpreters.is_running({interp}):
     505                  print(True)
     506              else:
     507                  print(False)
     508              """))
     509          self.assertEqual(out.strip(), 'True')
     510  
     511      def test_already_destroyed(self):
     512          interp = interpreters.create()
     513          interpreters.destroy(interp)
     514          with self.assertRaises(RuntimeError):
     515              interpreters.is_running(interp)
     516  
     517      def test_does_not_exist(self):
     518          with self.assertRaises(RuntimeError):
     519              interpreters.is_running(1_000_000)
     520  
     521      def test_bad_id(self):
     522          with self.assertRaises(ValueError):
     523              interpreters.is_running(-1)
     524  
     525  
     526  class ESC[4;38;5;81mInterpreterIDTests(ESC[4;38;5;149mTestBase):
     527  
     528      def test_with_int(self):
     529          id = interpreters.InterpreterID(10, force=True)
     530  
     531          self.assertEqual(int(id), 10)
     532  
     533      def test_coerce_id(self):
     534          class ESC[4;38;5;81mInt(ESC[4;38;5;149mstr):
     535              def __index__(self):
     536                  return 10
     537  
     538          id = interpreters.InterpreterID(Int(), force=True)
     539          self.assertEqual(int(id), 10)
     540  
     541      def test_bad_id(self):
     542          self.assertRaises(TypeError, interpreters.InterpreterID, object())
     543          self.assertRaises(TypeError, interpreters.InterpreterID, 10.0)
     544          self.assertRaises(TypeError, interpreters.InterpreterID, '10')
     545          self.assertRaises(TypeError, interpreters.InterpreterID, b'10')
     546          self.assertRaises(ValueError, interpreters.InterpreterID, -1)
     547          self.assertRaises(OverflowError, interpreters.InterpreterID, 2**64)
     548  
     549      def test_does_not_exist(self):
     550          id = interpreters.channel_create()
     551          with self.assertRaises(RuntimeError):
     552              interpreters.InterpreterID(int(id) + 1)  # unforced
     553  
     554      def test_str(self):
     555          id = interpreters.InterpreterID(10, force=True)
     556          self.assertEqual(str(id), '10')
     557  
     558      def test_repr(self):
     559          id = interpreters.InterpreterID(10, force=True)
     560          self.assertEqual(repr(id), 'InterpreterID(10)')
     561  
     562      def test_equality(self):
     563          id1 = interpreters.create()
     564          id2 = interpreters.InterpreterID(int(id1))
     565          id3 = interpreters.create()
     566  
     567          self.assertTrue(id1 == id1)
     568          self.assertTrue(id1 == id2)
     569          self.assertTrue(id1 == int(id1))
     570          self.assertTrue(int(id1) == id1)
     571          self.assertTrue(id1 == float(int(id1)))
     572          self.assertTrue(float(int(id1)) == id1)
     573          self.assertFalse(id1 == float(int(id1)) + 0.1)
     574          self.assertFalse(id1 == str(int(id1)))
     575          self.assertFalse(id1 == 2**1000)
     576          self.assertFalse(id1 == float('inf'))
     577          self.assertFalse(id1 == 'spam')
     578          self.assertFalse(id1 == id3)
     579  
     580          self.assertFalse(id1 != id1)
     581          self.assertFalse(id1 != id2)
     582          self.assertTrue(id1 != id3)
     583  
     584  
     585  class ESC[4;38;5;81mCreateTests(ESC[4;38;5;149mTestBase):
     586  
     587      def test_in_main(self):
     588          id = interpreters.create()
     589          self.assertIsInstance(id, interpreters.InterpreterID)
     590  
     591          self.assertIn(id, interpreters.list_all())
     592  
     593      @unittest.skip('enable this test when working on pystate.c')
     594      def test_unique_id(self):
     595          seen = set()
     596          for _ in range(100):
     597              id = interpreters.create()
     598              interpreters.destroy(id)
     599              seen.add(id)
     600  
     601          self.assertEqual(len(seen), 100)
     602  
     603      def test_in_thread(self):
     604          lock = threading.Lock()
     605          id = None
     606          def f():
     607              nonlocal id
     608              id = interpreters.create()
     609              lock.acquire()
     610              lock.release()
     611  
     612          t = threading.Thread(target=f)
     613          with lock:
     614              t.start()
     615          t.join()
     616          self.assertIn(id, interpreters.list_all())
     617  
     618      def test_in_subinterpreter(self):
     619          main, = interpreters.list_all()
     620          id1 = interpreters.create()
     621          out = _run_output(id1, dedent("""
     622              import _xxsubinterpreters as _interpreters
     623              id = _interpreters.create()
     624              print(id)
     625              assert isinstance(id, _interpreters.InterpreterID)
     626              """))
     627          id2 = int(out.strip())
     628  
     629          self.assertEqual(set(interpreters.list_all()), {main, id1, id2})
     630  
     631      def test_in_threaded_subinterpreter(self):
     632          main, = interpreters.list_all()
     633          id1 = interpreters.create()
     634          id2 = None
     635          def f():
     636              nonlocal id2
     637              out = _run_output(id1, dedent("""
     638                  import _xxsubinterpreters as _interpreters
     639                  id = _interpreters.create()
     640                  print(id)
     641                  """))
     642              id2 = int(out.strip())
     643  
     644          t = threading.Thread(target=f)
     645          t.start()
     646          t.join()
     647  
     648          self.assertEqual(set(interpreters.list_all()), {main, id1, id2})
     649  
     650      def test_after_destroy_all(self):
     651          before = set(interpreters.list_all())
     652          # Create 3 subinterpreters.
     653          ids = []
     654          for _ in range(3):
     655              id = interpreters.create()
     656              ids.append(id)
     657          # Now destroy them.
     658          for id in ids:
     659              interpreters.destroy(id)
     660          # Finally, create another.
     661          id = interpreters.create()
     662          self.assertEqual(set(interpreters.list_all()), before | {id})
     663  
     664      def test_after_destroy_some(self):
     665          before = set(interpreters.list_all())
     666          # Create 3 subinterpreters.
     667          id1 = interpreters.create()
     668          id2 = interpreters.create()
     669          id3 = interpreters.create()
     670          # Now destroy 2 of them.
     671          interpreters.destroy(id1)
     672          interpreters.destroy(id3)
     673          # Finally, create another.
     674          id = interpreters.create()
     675          self.assertEqual(set(interpreters.list_all()), before | {id, id2})
     676  
     677  
     678  class ESC[4;38;5;81mDestroyTests(ESC[4;38;5;149mTestBase):
     679  
     680      def test_one(self):
     681          id1 = interpreters.create()
     682          id2 = interpreters.create()
     683          id3 = interpreters.create()
     684          self.assertIn(id2, interpreters.list_all())
     685          interpreters.destroy(id2)
     686          self.assertNotIn(id2, interpreters.list_all())
     687          self.assertIn(id1, interpreters.list_all())
     688          self.assertIn(id3, interpreters.list_all())
     689  
     690      def test_all(self):
     691          before = set(interpreters.list_all())
     692          ids = set()
     693          for _ in range(3):
     694              id = interpreters.create()
     695              ids.add(id)
     696          self.assertEqual(set(interpreters.list_all()), before | ids)
     697          for id in ids:
     698              interpreters.destroy(id)
     699          self.assertEqual(set(interpreters.list_all()), before)
     700  
     701      def test_main(self):
     702          main, = interpreters.list_all()
     703          with self.assertRaises(RuntimeError):
     704              interpreters.destroy(main)
     705  
     706          def f():
     707              with self.assertRaises(RuntimeError):
     708                  interpreters.destroy(main)
     709  
     710          t = threading.Thread(target=f)
     711          t.start()
     712          t.join()
     713  
     714      def test_already_destroyed(self):
     715          id = interpreters.create()
     716          interpreters.destroy(id)
     717          with self.assertRaises(RuntimeError):
     718              interpreters.destroy(id)
     719  
     720      def test_does_not_exist(self):
     721          with self.assertRaises(RuntimeError):
     722              interpreters.destroy(1_000_000)
     723  
     724      def test_bad_id(self):
     725          with self.assertRaises(ValueError):
     726              interpreters.destroy(-1)
     727  
     728      def test_from_current(self):
     729          main, = interpreters.list_all()
     730          id = interpreters.create()
     731          script = dedent(f"""
     732              import _xxsubinterpreters as _interpreters
     733              try:
     734                  _interpreters.destroy({id})
     735              except RuntimeError:
     736                  pass
     737              """)
     738  
     739          interpreters.run_string(id, script)
     740          self.assertEqual(set(interpreters.list_all()), {main, id})
     741  
     742      def test_from_sibling(self):
     743          main, = interpreters.list_all()
     744          id1 = interpreters.create()
     745          id2 = interpreters.create()
     746          script = dedent(f"""
     747              import _xxsubinterpreters as _interpreters
     748              _interpreters.destroy({id2})
     749              """)
     750          interpreters.run_string(id1, script)
     751  
     752          self.assertEqual(set(interpreters.list_all()), {main, id1})
     753  
     754      def test_from_other_thread(self):
     755          id = interpreters.create()
     756          def f():
     757              interpreters.destroy(id)
     758  
     759          t = threading.Thread(target=f)
     760          t.start()
     761          t.join()
     762  
     763      def test_still_running(self):
     764          main, = interpreters.list_all()
     765          interp = interpreters.create()
     766          with _running(interp):
     767              self.assertTrue(interpreters.is_running(interp),
     768                              msg=f"Interp {interp} should be running before destruction.")
     769  
     770              with self.assertRaises(RuntimeError,
     771                                     msg=f"Should not be able to destroy interp {interp} while it's still running."):
     772                  interpreters.destroy(interp)
     773              self.assertTrue(interpreters.is_running(interp))
     774  
     775  
     776  class ESC[4;38;5;81mRunStringTests(ESC[4;38;5;149mTestBase):
     777  
     778      def setUp(self):
     779          super().setUp()
     780          self.id = interpreters.create()
     781  
     782      def test_success(self):
     783          script, file = _captured_script('print("it worked!", end="")')
     784          with file:
     785              interpreters.run_string(self.id, script)
     786              out = file.read()
     787  
     788          self.assertEqual(out, 'it worked!')
     789  
     790      def test_in_thread(self):
     791          script, file = _captured_script('print("it worked!", end="")')
     792          with file:
     793              def f():
     794                  interpreters.run_string(self.id, script)
     795  
     796              t = threading.Thread(target=f)
     797              t.start()
     798              t.join()
     799              out = file.read()
     800  
     801          self.assertEqual(out, 'it worked!')
     802  
     803      def test_create_thread(self):
     804          subinterp = interpreters.create(isolated=False)
     805          script, file = _captured_script("""
     806              import threading
     807              def f():
     808                  print('it worked!', end='')
     809  
     810              t = threading.Thread(target=f)
     811              t.start()
     812              t.join()
     813              """)
     814          with file:
     815              interpreters.run_string(subinterp, script)
     816              out = file.read()
     817  
     818          self.assertEqual(out, 'it worked!')
     819  
     820      @support.requires_fork()
     821      def test_fork(self):
     822          import tempfile
     823          with tempfile.NamedTemporaryFile('w+', encoding="utf-8") as file:
     824              file.write('')
     825              file.flush()
     826  
     827              expected = 'spam spam spam spam spam'
     828              script = dedent(f"""
     829                  import os
     830                  try:
     831                      os.fork()
     832                  except RuntimeError:
     833                      with open('{file.name}', 'w', encoding='utf-8') as out:
     834                          out.write('{expected}')
     835                  """)
     836              interpreters.run_string(self.id, script)
     837  
     838              file.seek(0)
     839              content = file.read()
     840              self.assertEqual(content, expected)
     841  
     842      def test_already_running(self):
     843          with _running(self.id):
     844              with self.assertRaises(RuntimeError):
     845                  interpreters.run_string(self.id, 'print("spam")')
     846  
     847      def test_does_not_exist(self):
     848          id = 0
     849          while id in interpreters.list_all():
     850              id += 1
     851          with self.assertRaises(RuntimeError):
     852              interpreters.run_string(id, 'print("spam")')
     853  
     854      def test_error_id(self):
     855          with self.assertRaises(ValueError):
     856              interpreters.run_string(-1, 'print("spam")')
     857  
     858      def test_bad_id(self):
     859          with self.assertRaises(TypeError):
     860              interpreters.run_string('spam', 'print("spam")')
     861  
     862      def test_bad_script(self):
     863          with self.assertRaises(TypeError):
     864              interpreters.run_string(self.id, 10)
     865  
     866      def test_bytes_for_script(self):
     867          with self.assertRaises(TypeError):
     868              interpreters.run_string(self.id, b'print("spam")')
     869  
     870      @contextlib.contextmanager
     871      def assert_run_failed(self, exctype, msg=None):
     872          with self.assertRaises(interpreters.RunFailedError) as caught:
     873              yield
     874          if msg is None:
     875              self.assertEqual(str(caught.exception).split(':')[0],
     876                               str(exctype))
     877          else:
     878              self.assertEqual(str(caught.exception),
     879                               "{}: {}".format(exctype, msg))
     880  
     881      def test_invalid_syntax(self):
     882          with self.assert_run_failed(SyntaxError):
     883              # missing close paren
     884              interpreters.run_string(self.id, 'print("spam"')
     885  
     886      def test_failure(self):
     887          with self.assert_run_failed(Exception, 'spam'):
     888              interpreters.run_string(self.id, 'raise Exception("spam")')
     889  
     890      def test_SystemExit(self):
     891          with self.assert_run_failed(SystemExit, '42'):
     892              interpreters.run_string(self.id, 'raise SystemExit(42)')
     893  
     894      def test_sys_exit(self):
     895          with self.assert_run_failed(SystemExit):
     896              interpreters.run_string(self.id, dedent("""
     897                  import sys
     898                  sys.exit()
     899                  """))
     900  
     901          with self.assert_run_failed(SystemExit, '42'):
     902              interpreters.run_string(self.id, dedent("""
     903                  import sys
     904                  sys.exit(42)
     905                  """))
     906  
     907      def test_with_shared(self):
     908          r, w = os.pipe()
     909  
     910          shared = {
     911                  'spam': b'ham',
     912                  'eggs': b'-1',
     913                  'cheddar': None,
     914                  }
     915          script = dedent(f"""
     916              eggs = int(eggs)
     917              spam = 42
     918              result = spam + eggs
     919  
     920              ns = dict(vars())
     921              del ns['__builtins__']
     922              import pickle
     923              with open({w}, 'wb') as chan:
     924                  pickle.dump(ns, chan)
     925              """)
     926          interpreters.run_string(self.id, script, shared)
     927          with open(r, 'rb') as chan:
     928              ns = pickle.load(chan)
     929  
     930          self.assertEqual(ns['spam'], 42)
     931          self.assertEqual(ns['eggs'], -1)
     932          self.assertEqual(ns['result'], 41)
     933          self.assertIsNone(ns['cheddar'])
     934  
     935      def test_shared_overwrites(self):
     936          interpreters.run_string(self.id, dedent("""
     937              spam = 'eggs'
     938              ns1 = dict(vars())
     939              del ns1['__builtins__']
     940              """))
     941  
     942          shared = {'spam': b'ham'}
     943          script = dedent(f"""
     944              ns2 = dict(vars())
     945              del ns2['__builtins__']
     946          """)
     947          interpreters.run_string(self.id, script, shared)
     948  
     949          r, w = os.pipe()
     950          script = dedent(f"""
     951              ns = dict(vars())
     952              del ns['__builtins__']
     953              import pickle
     954              with open({w}, 'wb') as chan:
     955                  pickle.dump(ns, chan)
     956              """)
     957          interpreters.run_string(self.id, script)
     958          with open(r, 'rb') as chan:
     959              ns = pickle.load(chan)
     960  
     961          self.assertEqual(ns['ns1']['spam'], 'eggs')
     962          self.assertEqual(ns['ns2']['spam'], b'ham')
     963          self.assertEqual(ns['spam'], b'ham')
     964  
     965      def test_shared_overwrites_default_vars(self):
     966          r, w = os.pipe()
     967  
     968          shared = {'__name__': b'not __main__'}
     969          script = dedent(f"""
     970              spam = 42
     971  
     972              ns = dict(vars())
     973              del ns['__builtins__']
     974              import pickle
     975              with open({w}, 'wb') as chan:
     976                  pickle.dump(ns, chan)
     977              """)
     978          interpreters.run_string(self.id, script, shared)
     979          with open(r, 'rb') as chan:
     980              ns = pickle.load(chan)
     981  
     982          self.assertEqual(ns['__name__'], b'not __main__')
     983  
     984      def test_main_reused(self):
     985          r, w = os.pipe()
     986          interpreters.run_string(self.id, dedent(f"""
     987              spam = True
     988  
     989              ns = dict(vars())
     990              del ns['__builtins__']
     991              import pickle
     992              with open({w}, 'wb') as chan:
     993                  pickle.dump(ns, chan)
     994              del ns, pickle, chan
     995              """))
     996          with open(r, 'rb') as chan:
     997              ns1 = pickle.load(chan)
     998  
     999          r, w = os.pipe()
    1000          interpreters.run_string(self.id, dedent(f"""
    1001              eggs = False
    1002  
    1003              ns = dict(vars())
    1004              del ns['__builtins__']
    1005              import pickle
    1006              with open({w}, 'wb') as chan:
    1007                  pickle.dump(ns, chan)
    1008              """))
    1009          with open(r, 'rb') as chan:
    1010              ns2 = pickle.load(chan)
    1011  
    1012          self.assertIn('spam', ns1)
    1013          self.assertNotIn('eggs', ns1)
    1014          self.assertIn('eggs', ns2)
    1015          self.assertIn('spam', ns2)
    1016  
    1017      def test_execution_namespace_is_main(self):
    1018          r, w = os.pipe()
    1019  
    1020          script = dedent(f"""
    1021              spam = 42
    1022  
    1023              ns = dict(vars())
    1024              ns['__builtins__'] = str(ns['__builtins__'])
    1025              import pickle
    1026              with open({w}, 'wb') as chan:
    1027                  pickle.dump(ns, chan)
    1028              """)
    1029          interpreters.run_string(self.id, script)
    1030          with open(r, 'rb') as chan:
    1031              ns = pickle.load(chan)
    1032  
    1033          ns.pop('__builtins__')
    1034          ns.pop('__loader__')
    1035          self.assertEqual(ns, {
    1036              '__name__': '__main__',
    1037              '__annotations__': {},
    1038              '__doc__': None,
    1039              '__package__': None,
    1040              '__spec__': None,
    1041              'spam': 42,
    1042              })
    1043  
    1044      # XXX Fix this test!
    1045      @unittest.skip('blocking forever')
    1046      def test_still_running_at_exit(self):
    1047          script = dedent(f"""
    1048          from textwrap import dedent
    1049          import threading
    1050          import _xxsubinterpreters as _interpreters
    1051          id = _interpreters.create()
    1052          def f():
    1053              _interpreters.run_string(id, dedent('''
    1054                  import time
    1055                  # Give plenty of time for the main interpreter to finish.
    1056                  time.sleep(1_000_000)
    1057                  '''))
    1058  
    1059          t = threading.Thread(target=f)
    1060          t.start()
    1061          """)
    1062          with support.temp_dir() as dirname:
    1063              filename = script_helper.make_script(dirname, 'interp', script)
    1064              with script_helper.spawn_python(filename) as proc:
    1065                  retcode = proc.wait()
    1066  
    1067          self.assertEqual(retcode, 0)
    1068  
    1069  
    1070  ##################################
    1071  # channel tests
    1072  
    1073  class ESC[4;38;5;81mChannelIDTests(ESC[4;38;5;149mTestBase):
    1074  
    1075      def test_default_kwargs(self):
    1076          cid = interpreters._channel_id(10, force=True)
    1077  
    1078          self.assertEqual(int(cid), 10)
    1079          self.assertEqual(cid.end, 'both')
    1080  
    1081      def test_with_kwargs(self):
    1082          cid = interpreters._channel_id(10, send=True, force=True)
    1083          self.assertEqual(cid.end, 'send')
    1084  
    1085          cid = interpreters._channel_id(10, send=True, recv=False, force=True)
    1086          self.assertEqual(cid.end, 'send')
    1087  
    1088          cid = interpreters._channel_id(10, recv=True, force=True)
    1089          self.assertEqual(cid.end, 'recv')
    1090  
    1091          cid = interpreters._channel_id(10, recv=True, send=False, force=True)
    1092          self.assertEqual(cid.end, 'recv')
    1093  
    1094          cid = interpreters._channel_id(10, send=True, recv=True, force=True)
    1095          self.assertEqual(cid.end, 'both')
    1096  
    1097      def test_coerce_id(self):
    1098          class ESC[4;38;5;81mInt(ESC[4;38;5;149mstr):
    1099              def __index__(self):
    1100                  return 10
    1101  
    1102          cid = interpreters._channel_id(Int(), force=True)
    1103          self.assertEqual(int(cid), 10)
    1104  
    1105      def test_bad_id(self):
    1106          self.assertRaises(TypeError, interpreters._channel_id, object())
    1107          self.assertRaises(TypeError, interpreters._channel_id, 10.0)
    1108          self.assertRaises(TypeError, interpreters._channel_id, '10')
    1109          self.assertRaises(TypeError, interpreters._channel_id, b'10')
    1110          self.assertRaises(ValueError, interpreters._channel_id, -1)
    1111          self.assertRaises(OverflowError, interpreters._channel_id, 2**64)
    1112  
    1113      def test_bad_kwargs(self):
    1114          with self.assertRaises(ValueError):
    1115              interpreters._channel_id(10, send=False, recv=False)
    1116  
    1117      def test_does_not_exist(self):
    1118          cid = interpreters.channel_create()
    1119          with self.assertRaises(interpreters.ChannelNotFoundError):
    1120              interpreters._channel_id(int(cid) + 1)  # unforced
    1121  
    1122      def test_str(self):
    1123          cid = interpreters._channel_id(10, force=True)
    1124          self.assertEqual(str(cid), '10')
    1125  
    1126      def test_repr(self):
    1127          cid = interpreters._channel_id(10, force=True)
    1128          self.assertEqual(repr(cid), 'ChannelID(10)')
    1129  
    1130          cid = interpreters._channel_id(10, send=True, force=True)
    1131          self.assertEqual(repr(cid), 'ChannelID(10, send=True)')
    1132  
    1133          cid = interpreters._channel_id(10, recv=True, force=True)
    1134          self.assertEqual(repr(cid), 'ChannelID(10, recv=True)')
    1135  
    1136          cid = interpreters._channel_id(10, send=True, recv=True, force=True)
    1137          self.assertEqual(repr(cid), 'ChannelID(10)')
    1138  
    1139      def test_equality(self):
    1140          cid1 = interpreters.channel_create()
    1141          cid2 = interpreters._channel_id(int(cid1))
    1142          cid3 = interpreters.channel_create()
    1143  
    1144          self.assertTrue(cid1 == cid1)
    1145          self.assertTrue(cid1 == cid2)
    1146          self.assertTrue(cid1 == int(cid1))
    1147          self.assertTrue(int(cid1) == cid1)
    1148          self.assertTrue(cid1 == float(int(cid1)))
    1149          self.assertTrue(float(int(cid1)) == cid1)
    1150          self.assertFalse(cid1 == float(int(cid1)) + 0.1)
    1151          self.assertFalse(cid1 == str(int(cid1)))
    1152          self.assertFalse(cid1 == 2**1000)
    1153          self.assertFalse(cid1 == float('inf'))
    1154          self.assertFalse(cid1 == 'spam')
    1155          self.assertFalse(cid1 == cid3)
    1156  
    1157          self.assertFalse(cid1 != cid1)
    1158          self.assertFalse(cid1 != cid2)
    1159          self.assertTrue(cid1 != cid3)
    1160  
    1161  
class ChannelTests(TestBase):
    1163  
    def test_create_cid(self):
        """channel_create() returns a ChannelID instance."""
        new_channel = interpreters.channel_create()
        self.assertIsInstance(new_channel, interpreters.ChannelID)
    1167  
    def test_sequential_ids(self):
        """Channels created back to back receive consecutive IDs."""
        existing = set(interpreters.channel_list_all())
        first = interpreters.channel_create()
        second = interpreters.channel_create()
        third = interpreters.channel_create()
        # Only the channels made by this test should be new in the listing.
        created = set(interpreters.channel_list_all()) - existing

        self.assertEqual(second, int(first) + 1)
        self.assertEqual(third, int(second) + 1)
        self.assertEqual(created, {first, second, third})
    1178  
    def test_ids_global(self):
        """A channel made in a second interpreter gets the next ID in line."""
        # The same script is run in two different subinterpreters; each run
        # prints the ID of the channel it created.
        create_and_print = dedent("""
            import _xxsubinterpreters as _interpreters
            cid = _interpreters.channel_create()
            print(cid)
            """)

        id1 = interpreters.create()
        first_output = _run_output(id1, create_and_print)
        cid1 = int(first_output.strip())

        id2 = interpreters.create()
        second_output = _run_output(id2, create_and_print)
        cid2 = int(second_output.strip())

        self.assertEqual(cid2, int(cid1) + 1)
    1197  
    def test_channel_list_interpreters_none(self):
        """A freshly created channel lists no interpreters on either end."""
        cid = interpreters.channel_create()
        senders = interpreters.channel_list_interpreters(cid, send=True)
        receivers = interpreters.channel_list_interpreters(cid, send=False)
        self.assertEqual(senders, [])
        self.assertEqual(receivers, [])
    1206  
    def test_channel_list_interpreters_basic(self):
        """Sending or receiving associates an interpreter with that end."""
        interp0 = interpreters.get_main()
        cid = interpreters.channel_create()
        interpreters.channel_send(cid, "send")

        # So far only the main interpreter has used the channel (to send).
        senders = interpreters.channel_list_interpreters(cid, send=True)
        receivers = interpreters.channel_list_interpreters(cid, send=False)
        self.assertEqual(senders, [interp0])
        self.assertEqual(receivers, [])

        interp1 = interpreters.create()
        recv_script = dedent(f"""
            import _xxsubinterpreters as _interpreters
            obj = _interpreters.channel_recv({cid})
            """)
        _run_output(interp1, recv_script)

        # Now the subinterpreter has used the receiving end as well.
        senders = interpreters.channel_list_interpreters(cid, send=True)
        receivers = interpreters.channel_list_interpreters(cid, send=False)
        self.assertEqual(senders, [interp0])
        self.assertEqual(receivers, [interp1])
    1228  
    def test_channel_list_interpreters_multiple(self):
        """Several interpreters can be listed on each end of one channel."""
        interp0 = interpreters.get_main()
        interp1, interp2, interp3 = (interpreters.create() for _ in range(3))
        cid = interpreters.channel_create()

        # Senders: the main interpreter and interp1.
        interpreters.channel_send(cid, "send")
        _run_output(interp1, dedent(f"""
            import _xxsubinterpreters as _interpreters
            _interpreters.channel_send({cid}, "send")
            """))

        # Receivers: interp2 and interp3, each taking one queued item.
        recv_script = dedent(f"""
            import _xxsubinterpreters as _interpreters
            obj = _interpreters.channel_recv({cid})
            """)
        for receiver in (interp2, interp3):
            _run_output(receiver, recv_script)

        senders = interpreters.channel_list_interpreters(cid, send=True)
        receivers = interpreters.channel_list_interpreters(cid, send=False)
        self.assertEqual(set(senders), {interp0, interp1})
        self.assertEqual(set(receivers), {interp2, interp3})
    1254  
    def test_channel_list_interpreters_destroyed(self):
        """A destroyed interpreter disappears from the channel's listing."""
        interp0 = interpreters.get_main()
        interp1 = interpreters.create()
        cid = interpreters.channel_create()
        interpreters.channel_send(cid, "send")
        recv_script = dedent(f"""
            import _xxsubinterpreters as _interpreters
            obj = _interpreters.channel_recv({cid})
            """)
        _run_output(interp1, recv_script)

        # Before the destroy: exactly one interpreter on each end.
        senders = interpreters.channel_list_interpreters(cid, send=True)
        receivers = interpreters.channel_list_interpreters(cid, send=False)
        self.assertEqual(senders, [interp0])
        self.assertEqual(receivers, [interp1])

        interpreters.destroy(interp1)

        # After the destroy: the receiving end no longer lists interp1.
        senders = interpreters.channel_list_interpreters(cid, send=True)
        receivers = interpreters.channel_list_interpreters(cid, send=False)
        self.assertEqual(senders, [interp0])
        self.assertEqual(receivers, [])
    1277  
    def test_channel_list_interpreters_released(self):
        """Releasing a channel end removes the interpreter from the listing."""
        # Arrange: main interpreter on the send end, two subinterpreters on
        # the receive end.
        interp0 = interpreters.get_main()
        interp1 = interpreters.create()
        interp2 = interpreters.create()
        cid = interpreters.channel_create()
        recv_script = dedent(f"""
            import _xxsubinterpreters as _interpreters
            obj = _interpreters.channel_recv({cid})
            """)
        for receiver in (interp1, interp2):
            # One item is queued for each receiver just before it runs.
            interpreters.channel_send(cid, "data")
            _run_output(receiver, recv_script)

        # Verify the arrangement.
        senders = interpreters.channel_list_interpreters(cid, send=True)
        receivers = interpreters.channel_list_interpreters(cid, send=False)
        self.assertEqual(len(senders), 1)
        self.assertEqual(len(receivers), 2)

        # Main lets go of the send end, which should then list nobody.
        interpreters.channel_release(cid, send=True)
        senders = interpreters.channel_list_interpreters(cid, send=True)
        receivers = interpreters.channel_list_interpreters(cid, send=False)
        self.assertEqual(len(senders), 0)
        self.assertEqual(len(receivers), 2)

        # interp2 releases the channel; only interp1 should remain listed
        # on the receive end.
        release_script = dedent(f"""
            import _xxsubinterpreters as _interpreters
            _interpreters.channel_release({cid})
            """)
        _run_output(interp2, release_script)
        senders = interpreters.channel_list_interpreters(cid, send=True)
        receivers = interpreters.channel_list_interpreters(cid, send=False)
        self.assertEqual(len(senders), 0)
        self.assertEqual(receivers, [interp1])
    1320  
    def test_channel_list_interpreters_closed(self):
        """Listing either end of a force-closed channel raises."""
        interp0 = interpreters.get_main()
        interp1 = interpreters.create()
        cid = interpreters.channel_create()
        # Queue an item so the channel is not empty when it gets closed.
        interpreters.channel_send(cid, "send")

        # Starting point: one sender, no receivers.
        senders = interpreters.channel_list_interpreters(cid, send=True)
        receivers = interpreters.channel_list_interpreters(cid, send=False)
        self.assertEqual(len(senders), 1)
        self.assertEqual(len(receivers), 0)

        interpreters.channel_close(cid, force=True)

        # After the forced close, neither end can be listed any more.
        closed_error = interpreters.ChannelClosedError
        with self.assertRaises(closed_error):
            interpreters.channel_list_interpreters(cid, send=True)
        with self.assertRaises(closed_error):
            interpreters.channel_list_interpreters(cid, send=False)
    1342  
    def test_channel_list_interpreters_closed_send_end(self):
        """Listing a channel whose send end was closed first."""
        interp0 = interpreters.get_main()
        interp1 = interpreters.create()
        cid = interpreters.channel_create()
        # Queue an item so the channel is not empty when the end is closed.
        interpreters.channel_send(cid, "send")

        # Starting point: one sender, no receivers.
        senders = interpreters.channel_list_interpreters(cid, send=True)
        receivers = interpreters.channel_list_interpreters(cid, send=False)
        self.assertEqual(len(senders), 1)
        self.assertEqual(len(receivers), 0)

        closed_error = interpreters.ChannelClosedError

        # Step 1: close only the send end.  Listing that end now raises,
        # while the receive end can still be listed because the channel
        # still holds an item.
        interpreters.channel_close(cid, send=True)
        with self.assertRaises(closed_error):
            interpreters.channel_list_interpreters(cid, send=True)
        receivers = interpreters.channel_list_interpreters(cid, send=False)
        self.assertEqual(len(receivers), 0)

        # Step 2: force-close the channel from a subinterpreter.  Now both
        # ends raise.
        close_script = dedent(f"""
            import _xxsubinterpreters as _interpreters
            _interpreters.channel_close({cid}, force=True)
            """)
        _run_output(interp1, close_script)
        with self.assertRaises(closed_error):
            interpreters.channel_list_interpreters(cid, send=True)
        with self.assertRaises(closed_error):
            interpreters.channel_list_interpreters(cid, send=False)
    1376  
    1377      ####################
    1378  
    def test_send_recv_main(self):
        """An object sent and received in the main interpreter is a copy."""
        cid = interpreters.channel_create()
        sent = b'spam'
        interpreters.channel_send(cid, sent)
        received = interpreters.channel_recv(cid)

        # Equal value, but not the very same object.
        self.assertEqual(received, sent)
        self.assertIsNot(received, sent)
    1387  
    def test_send_recv_same_interpreter(self):
        """Send and receive both happen inside a single subinterpreter."""
        id1 = interpreters.create()
        # The checks are assert statements inside the script itself.
        script = dedent("""
            import _xxsubinterpreters as _interpreters
            cid = _interpreters.channel_create()
            orig = b'spam'
            _interpreters.channel_send(cid, orig)
            obj = _interpreters.channel_recv(cid)
            assert obj is not orig
            assert obj == orig
            """)
        _run_output(id1, script)
    1399  
    def test_send_recv_different_interpreters(self):
        """An object sent from a subinterpreter is received in main."""
        cid = interpreters.channel_create()
        id1 = interpreters.create()
        send_script = dedent(f"""
            import _xxsubinterpreters as _interpreters
            _interpreters.channel_send({cid}, b'spam')
            """)
        _run_output(id1, send_script)

        received = interpreters.channel_recv(cid)
        self.assertEqual(received, b'spam')
    1410  
    def test_send_recv_different_threads(self):
        """A worker thread echoes back what the main thread sends."""
        cid = interpreters.channel_create()

        def echo():
            # Poll until an item arrives, then put the same item back.
            while True:
                try:
                    received = interpreters.channel_recv(cid)
                except interpreters.ChannelEmptyError:
                    time.sleep(0.1)
                else:
                    break
            interpreters.channel_send(cid, received)

        worker = threading.Thread(target=echo)
        worker.start()

        interpreters.channel_send(cid, b'spam')
        worker.join()
        echoed = interpreters.channel_recv(cid)

        self.assertEqual(echoed, b'spam')
    1430  
    def test_send_recv_different_interpreters_and_threads(self):
        """A subinterpreter in a worker thread answers main's message."""
        cid = interpreters.channel_create()
        id1 = interpreters.create()
        out = None

        def run_responder():
            # The script waits for b'spam' and replies with b'eggs'.
            nonlocal out
            out = _run_output(id1, dedent(f"""
                import time
                import _xxsubinterpreters as _interpreters
                while True:
                    try:
                        obj = _interpreters.channel_recv({cid})
                        break
                    except _interpreters.ChannelEmptyError:
                        time.sleep(0.1)
                assert(obj == b'spam')
                _interpreters.channel_send({cid}, b'eggs')
                """))

        worker = threading.Thread(target=run_responder)
        worker.start()

        interpreters.channel_send(cid, b'spam')
        worker.join()
        reply = interpreters.channel_recv(cid)

        self.assertEqual(reply, b'eggs')
    1458  
    def test_send_not_found(self):
        """Sending on channel ID 10, which was never created, raises."""
        self.assertRaises(interpreters.ChannelNotFoundError,
                          interpreters.channel_send, 10, b'spam')
    1462  
    def test_recv_not_found(self):
        """Receiving on channel ID 10, which was never created, raises."""
        self.assertRaises(interpreters.ChannelNotFoundError,
                          interpreters.channel_recv, 10)
    1466  
    def test_recv_empty(self):
        """Receiving from a channel with nothing queued raises."""
        empty_channel = interpreters.channel_create()
        self.assertRaises(interpreters.ChannelEmptyError,
                          interpreters.channel_recv, empty_channel)
    1471  
    def test_recv_default(self):
        """channel_recv() returns the default only when nothing is queued."""
        default = object()
        cid = interpreters.channel_create()

        # Nothing queued yet, so the default is expected here.
        obj1 = interpreters.channel_recv(cid, default)

        for item in (None, 1, b'spam', b'eggs'):
            interpreters.channel_send(cid, item)

        obj2 = interpreters.channel_recv(cid, default)
        obj3 = interpreters.channel_recv(cid, default)
        obj4 = interpreters.channel_recv(cid)
        obj5 = interpreters.channel_recv(cid, default)
        # The queue has been drained, so the default is expected again.
        obj6 = interpreters.channel_recv(cid, default)

        self.assertIs(obj1, default)
        self.assertIs(obj2, None)
        self.assertEqual(obj3, 1)
        self.assertEqual(obj4, b'spam')
        self.assertEqual(obj5, b'eggs')
        self.assertIs(obj6, default)
    1492  
    def test_run_string_arg_unresolved(self):
        # The send end of the channel is shared with the subinterpreter
        # under the name "cid"; the script prints which end it got and
        # sends an item back through it.
        cid = interpreters.channel_create()
        interp = interpreters.create()

        script = dedent("""
            import _xxsubinterpreters as _interpreters
            print(cid.end)
            _interpreters.channel_send(cid, b'spam')
            """)
        out = _run_output(interp, script, {'cid': cid.send})
        received = interpreters.channel_recv(cid)

        self.assertEqual(received, b'spam')
        self.assertEqual(out.strip(), 'send')
    1507  
    # XXX For now there is no high-level channel into which the
    # sent channel ID can be converted...
    # Note: this test caused crashes on some buildbots (bpo-33615).
    @unittest.skip('disabled until high-level channels exist')
    def test_run_string_arg_resolved(self):
        # Same scenario as the "unresolved" variant, except that the
        # channel ID is created with _resolve=True and the script
        # reaches the ID through the shared object's ".id" attribute.
        cid = interpreters.channel_create()
        cid = interpreters._channel_id(cid, _resolve=True)
        interp = interpreters.create()

        script = dedent("""
            import _xxsubinterpreters as _interpreters
            print(chan.id.end)
            _interpreters.channel_send(chan.id, b'spam')
            """)
        out = _run_output(interp, script, {'chan': cid.send})
        received = interpreters.channel_recv(cid)

        self.assertEqual(received, b'spam')
        self.assertEqual(out.strip(), 'send')
    1527  
    1528      # close
    1529  
    def test_close_single_user(self):
        # Use both ends from the main interpreter, leaving the channel
        # empty, then close it.
        cid = interpreters.channel_create()
        interpreters.channel_send(cid, b'spam')
        interpreters.channel_recv(cid)
        interpreters.channel_close(cid)

        # Neither end is usable afterwards.
        self.assertRaises(interpreters.ChannelClosedError,
                          interpreters.channel_send, cid, b'eggs')
        self.assertRaises(interpreters.ChannelClosedError,
                          interpreters.channel_recv, cid)
    1540  
    def test_close_multiple_users(self):
        cid = interpreters.channel_create()
        id1 = interpreters.create()
        id2 = interpreters.create()
        # id1 sends an item and id2 receives it, so both subinterpreters
        # have used the channel before it is closed.
        interpreters.run_string(id1, dedent(f"""
            import _xxsubinterpreters as _interpreters
            _interpreters.channel_send({cid}, b'spam')
            """))
        interpreters.run_string(id2, dedent(f"""
            import _xxsubinterpreters as _interpreters
            _interpreters.channel_recv({cid})
            """))
        interpreters.channel_close(cid)

        # After the close issued from the main interpreter, a send fails
        # in each of the two subinterpreters.
        for interp in (id1, id2):
            with self.assertRaises(interpreters.RunFailedError) as cm:
                interpreters.run_string(interp, dedent(f"""
                    _interpreters.channel_send({cid}, b'spam')
                    """))
            self.assertIn('ChannelClosedError', str(cm.exception))
    1564  
    def test_close_multiple_times(self):
        cid = interpreters.channel_create()
        interpreters.channel_send(cid, b'spam')
        interpreters.channel_recv(cid)
        interpreters.channel_close(cid)

        # A second close of the same channel is rejected.
        self.assertRaises(interpreters.ChannelClosedError,
                          interpreters.channel_close, cid)
    1573  
    def test_close_empty(self):
        # Try every combination of the send/recv flags on a channel that
        # has been used and emptied; in each case both ends must end up
        # closed.  "send" varies fastest, giving the subtest order
        # (False, False), (True, False), (False, True), (True, True).
        for recv in (False, True):
            for send in (False, True):
                with self.subTest((send, recv)):
                    cid = interpreters.channel_create()
                    interpreters.channel_send(cid, b'spam')
                    interpreters.channel_recv(cid)
                    interpreters.channel_close(cid, send=send, recv=recv)

                    self.assertRaises(interpreters.ChannelClosedError,
                                      interpreters.channel_send, cid, b'eggs')
                    self.assertRaises(interpreters.ChannelClosedError,
                                      interpreters.channel_recv, cid)
    1592  
    def test_close_defaults_with_unused_items(self):
        cid = interpreters.channel_create()
        for item in (b'spam', b'ham'):
            interpreters.channel_send(cid, item)

        # With default arguments, closing is refused while items are
        # still queued ...
        self.assertRaises(interpreters.ChannelNotEmptyError,
                          interpreters.channel_close, cid)
        # ... and the channel stays usable in both directions.
        interpreters.channel_recv(cid)
        interpreters.channel_send(cid, b'eggs')
    1602  
    def test_close_recv_with_unused_items_unforced(self):
        cid = interpreters.channel_create()
        for item in (b'spam', b'ham'):
            interpreters.channel_send(cid, item)

        # An unforced close of the recv end is refused while items are
        # still queued ...
        self.assertRaises(interpreters.ChannelNotEmptyError,
                          interpreters.channel_close, cid, recv=True)
        # ... the channel stays usable ...
        interpreters.channel_recv(cid)
        interpreters.channel_send(cid, b'eggs')
        # ... and once the remaining two items are drained the same
        # close goes through.
        for _ in range(2):
            interpreters.channel_recv(cid)
        interpreters.channel_close(cid, recv=True)
    1615  
    def test_close_send_with_unused_items_unforced(self):
        cid = interpreters.channel_create()
        for item in (b'spam', b'ham'):
            interpreters.channel_send(cid, item)
        # Closing only the send end is accepted with items still queued.
        interpreters.channel_close(cid, send=True)

        self.assertRaises(interpreters.ChannelClosedError,
                          interpreters.channel_send, cid, b'eggs')
        # The two queued items can still be received; only after that
        # does the recv end report the channel as closed.
        for _ in range(2):
            interpreters.channel_recv(cid)
        self.assertRaises(interpreters.ChannelClosedError,
                          interpreters.channel_recv, cid)
    1628  
    def test_close_both_with_unused_items_unforced(self):
        cid = interpreters.channel_create()
        for item in (b'spam', b'ham'):
            interpreters.channel_send(cid, item)

        # An unforced close of both ends is refused while items are
        # still queued ...
        self.assertRaises(interpreters.ChannelNotEmptyError,
                          interpreters.channel_close, cid,
                          recv=True, send=True)
        # ... the channel stays usable ...
        interpreters.channel_recv(cid)
        interpreters.channel_send(cid, b'eggs')
        # ... and after draining the remaining two items the recv end
        # can be closed.
        for _ in range(2):
            interpreters.channel_recv(cid)
        interpreters.channel_close(cid, recv=True)
    1641  
    def test_close_recv_with_unused_items_forced(self):
        cid = interpreters.channel_create()
        for item in (b'spam', b'ham'):
            interpreters.channel_send(cid, item)
        # force=True lets the recv end be closed with items still queued.
        interpreters.channel_close(cid, recv=True, force=True)

        # Both ends report the channel as closed afterwards.
        self.assertRaises(interpreters.ChannelClosedError,
                          interpreters.channel_send, cid, b'eggs')
        self.assertRaises(interpreters.ChannelClosedError,
                          interpreters.channel_recv, cid)
    1652  
    def test_close_send_with_unused_items_forced(self):
        cid = interpreters.channel_create()
        for item in (b'spam', b'ham'):
            interpreters.channel_send(cid, item)
        # Forced close of the send end with items still queued.
        interpreters.channel_close(cid, send=True, force=True)

        # Both ends report the channel as closed afterwards; the queued
        # items are no longer receivable.
        self.assertRaises(interpreters.ChannelClosedError,
                          interpreters.channel_send, cid, b'eggs')
        self.assertRaises(interpreters.ChannelClosedError,
                          interpreters.channel_recv, cid)
    1663  
    def test_close_both_with_unused_items_forced(self):
        cid = interpreters.channel_create()
        for item in (b'spam', b'ham'):
            interpreters.channel_send(cid, item)
        # Forced close of both ends with items still queued.
        interpreters.channel_close(cid, send=True, recv=True, force=True)

        # Both ends report the channel as closed afterwards.
        self.assertRaises(interpreters.ChannelClosedError,
                          interpreters.channel_send, cid, b'eggs')
        self.assertRaises(interpreters.ChannelClosedError,
                          interpreters.channel_recv, cid)
    1674  
    def test_close_never_used(self):
        # Close a channel that nothing was ever sent through.
        cid = interpreters.channel_create()
        interpreters.channel_close(cid)

        self.assertRaises(interpreters.ChannelClosedError,
                          interpreters.channel_send, cid, b'spam')
        self.assertRaises(interpreters.ChannelClosedError,
                          interpreters.channel_recv, cid)
    1683  
    def test_close_by_unassociated_interp(self):
        cid = interpreters.channel_create()
        interpreters.channel_send(cid, b'spam')
        # A subinterpreter that has never sent or received on the
        # channel force-closes it.
        interp = interpreters.create()
        interpreters.run_string(interp, dedent(f"""
            import _xxsubinterpreters as _interpreters
            _interpreters.channel_close({cid}, force=True)
            """))

        # The main interpreter sees the channel as closed: the queued
        # item cannot be received and a further close is rejected.
        self.assertRaises(interpreters.ChannelClosedError,
                          interpreters.channel_recv, cid)
        self.assertRaises(interpreters.ChannelClosedError,
                          interpreters.channel_close, cid)
    1696  
    def test_close_used_multiple_times_by_single_user(self):
        cid = interpreters.channel_create()
        # Three sends but only one receive: two items are still queued
        # when the channel is force-closed.
        for _ in range(3):
            interpreters.channel_send(cid, b'spam')
        interpreters.channel_recv(cid)
        interpreters.channel_close(cid, force=True)

        self.assertRaises(interpreters.ChannelClosedError,
                          interpreters.channel_send, cid, b'eggs')
        self.assertRaises(interpreters.ChannelClosedError,
                          interpreters.channel_recv, cid)
    1709  
    def test_channel_list_interpreters_invalid_channel(self):
        cid = interpreters.channel_create()

        # Test for invalid channel ID.
        self.assertRaises(interpreters.ChannelNotFoundError,
                          interpreters.channel_list_interpreters,
                          1000, send=True)

        # Test for a channel that has been closed.
        interpreters.channel_close(cid)
        self.assertRaises(interpreters.ChannelClosedError,
                          interpreters.channel_list_interpreters,
                          cid, send=True)
    1720  
    def test_channel_list_interpreters_invalid_args(self):
        # Tests for invalid arguments passed to the API: calling with
        # only the channel ID (no "send" argument) is a TypeError.
        cid = interpreters.channel_create()
        self.assertRaises(TypeError,
                          interpreters.channel_list_interpreters, cid)
    1726  
    1727  
    1728  class ESC[4;38;5;81mChannelReleaseTests(ESC[4;38;5;149mTestBase):
    1729  
    1730      # XXX Add more test coverage a la the tests for close().
    1731  
    1732      """
    1733      - main / interp / other
    1734      - run in: current thread / new thread / other thread / different threads
    1735      - end / opposite
    1736      - force / no force
    1737      - used / not used  (associated / not associated)
    1738      - empty / emptied / never emptied / partly emptied
    1739      - closed / not closed
    1740      - released / not released
    1741      - creator (interp) / other
    1742      - associated interpreter not running
    1743      - associated interpreter destroyed
    1744      """
    1745  
    1746      """
    1747      use
    1748      pre-release
    1749      release
    1750      after
    1751      check
    1752      """
    1753  
    1754      """
    1755      release in:         main, interp1
    1756      creator:            same, other (incl. interp2)
    1757  
    1758      use:                None,send,recv,send/recv in None,same,other(incl. interp2),same+other(incl. interp2),all
    1759      pre-release:        None,send,recv,both in None,same,other(incl. interp2),same+other(incl. interp2),all
    1760      pre-release forced: None,send,recv,both in None,same,other(incl. interp2),same+other(incl. interp2),all
    1761  
    1762      release:            same
    1763      release forced:     same
    1764  
    1765      use after:          None,send,recv,send/recv in None,same,other(incl. interp2),same+other(incl. interp2),all
    1766      release after:      None,send,recv,send/recv in None,same,other(incl. interp2),same+other(incl. interp2),all
    1767      check released:     send/recv for same/other(incl. interp2)
    1768      check closed:       send/recv for same/other(incl. interp2)
    1769      """
    1770  
    1771      def test_single_user(self):
    1772          cid = interpreters.channel_create()
    1773          interpreters.channel_send(cid, b'spam')
    1774          interpreters.channel_recv(cid)
    1775          interpreters.channel_release(cid, send=True, recv=True)
    1776  
    1777          with self.assertRaises(interpreters.ChannelClosedError):
    1778              interpreters.channel_send(cid, b'eggs')
    1779          with self.assertRaises(interpreters.ChannelClosedError):
    1780              interpreters.channel_recv(cid)
    1781  
    1782      def test_multiple_users(self):
    1783          cid = interpreters.channel_create()
    1784          id1 = interpreters.create()
    1785          id2 = interpreters.create()
    1786          interpreters.run_string(id1, dedent(f"""
    1787              import _xxsubinterpreters as _interpreters
    1788              _interpreters.channel_send({cid}, b'spam')
    1789              """))
    1790          out = _run_output(id2, dedent(f"""
    1791              import _xxsubinterpreters as _interpreters
    1792              obj = _interpreters.channel_recv({cid})
    1793              _interpreters.channel_release({cid})
    1794              print(repr(obj))
    1795              """))
    1796          interpreters.run_string(id1, dedent(f"""
    1797              _interpreters.channel_release({cid})
    1798              """))
    1799  
    1800          self.assertEqual(out.strip(), "b'spam'")
    1801  
    1802      def test_no_kwargs(self):
    1803          cid = interpreters.channel_create()
    1804          interpreters.channel_send(cid, b'spam')
    1805          interpreters.channel_recv(cid)
    1806          interpreters.channel_release(cid)
    1807  
    1808          with self.assertRaises(interpreters.ChannelClosedError):
    1809              interpreters.channel_send(cid, b'eggs')
    1810          with self.assertRaises(interpreters.ChannelClosedError):
    1811              interpreters.channel_recv(cid)
    1812  
    1813      def test_multiple_times(self):
    1814          cid = interpreters.channel_create()
    1815          interpreters.channel_send(cid, b'spam')
    1816          interpreters.channel_recv(cid)
    1817          interpreters.channel_release(cid, send=True, recv=True)
    1818  
    1819          with self.assertRaises(interpreters.ChannelClosedError):
    1820              interpreters.channel_release(cid, send=True, recv=True)
    1821  
    1822      def test_with_unused_items(self):
    1823          cid = interpreters.channel_create()
    1824          interpreters.channel_send(cid, b'spam')
    1825          interpreters.channel_send(cid, b'ham')
    1826          interpreters.channel_release(cid, send=True, recv=True)
    1827  
    1828          with self.assertRaises(interpreters.ChannelClosedError):
    1829              interpreters.channel_recv(cid)
    1830  
    1831      def test_never_used(self):
    1832          cid = interpreters.channel_create()
    1833          interpreters.channel_release(cid)
    1834  
    1835          with self.assertRaises(interpreters.ChannelClosedError):
    1836              interpreters.channel_send(cid, b'spam')
    1837          with self.assertRaises(interpreters.ChannelClosedError):
    1838              interpreters.channel_recv(cid)
    1839  
    1840      def test_by_unassociated_interp(self):
    1841          cid = interpreters.channel_create()
    1842          interpreters.channel_send(cid, b'spam')
    1843          interp = interpreters.create()
    1844          interpreters.run_string(interp, dedent(f"""
    1845              import _xxsubinterpreters as _interpreters
    1846              _interpreters.channel_release({cid})
    1847              """))
    1848          obj = interpreters.channel_recv(cid)
    1849          interpreters.channel_release(cid)
    1850  
    1851          with self.assertRaises(interpreters.ChannelClosedError):
    1852              interpreters.channel_send(cid, b'eggs')
    1853          self.assertEqual(obj, b'spam')
    1854  
    1855      def test_close_if_unassociated(self):
    1856          # XXX Something's not right with this test...
    1857          cid = interpreters.channel_create()
    1858          interp = interpreters.create()
    1859          interpreters.run_string(interp, dedent(f"""
    1860              import _xxsubinterpreters as _interpreters
    1861              obj = _interpreters.channel_send({cid}, b'spam')
    1862              _interpreters.channel_release({cid})
    1863              """))
    1864  
    1865          with self.assertRaises(interpreters.ChannelClosedError):
    1866              interpreters.channel_recv(cid)
    1867  
    1868      def test_partially(self):
    1869          # XXX Is partial close too weird/confusing?
    1870          cid = interpreters.channel_create()
    1871          interpreters.channel_send(cid, None)
    1872          interpreters.channel_recv(cid)
    1873          interpreters.channel_send(cid, b'spam')
    1874          interpreters.channel_release(cid, send=True)
    1875          obj = interpreters.channel_recv(cid)
    1876  
    1877          self.assertEqual(obj, b'spam')
    1878  
    1879      def test_used_multiple_times_by_single_user(self):
    1880          cid = interpreters.channel_create()
    1881          interpreters.channel_send(cid, b'spam')
    1882          interpreters.channel_send(cid, b'spam')
    1883          interpreters.channel_send(cid, b'spam')
    1884          interpreters.channel_recv(cid)
    1885          interpreters.channel_release(cid, send=True, recv=True)
    1886  
    1887          with self.assertRaises(interpreters.ChannelClosedError):
    1888              interpreters.channel_send(cid, b'eggs')
    1889          with self.assertRaises(interpreters.ChannelClosedError):
    1890              interpreters.channel_recv(cid)
    1891  
    1892  
    1893  class ESC[4;38;5;81mChannelCloseFixture(ESC[4;38;5;149mnamedtuple('ChannelCloseFixture',
    1894                                       'end interp other extra creator')):
    1895  
    1896      # Set this to True to avoid creating interpreters, e.g. when
    1897      # scanning through test permutations without running them.
    1898      QUICK = False
    1899  
    1900      def __new__(cls, end, interp, other, extra, creator):
    1901          assert end in ('send', 'recv')
    1902          if cls.QUICK:
    1903              known = {}
    1904          else:
    1905              interp = Interpreter.from_raw(interp)
    1906              other = Interpreter.from_raw(other)
    1907              extra = Interpreter.from_raw(extra)
    1908              known = {
    1909                  interp.name: interp,
    1910                  other.name: other,
    1911                  extra.name: extra,
    1912                  }
    1913          if not creator:
    1914              creator = 'same'
    1915          self = super().__new__(cls, end, interp, other, extra, creator)
    1916          self._prepped = set()
    1917          self._state = ChannelState()
    1918          self._known = known
    1919          return self
    1920  
    1921      @property
    1922      def state(self):
    1923          return self._state
    1924  
    1925      @property
    1926      def cid(self):
    1927          try:
    1928              return self._cid
    1929          except AttributeError:
    1930              creator = self._get_interpreter(self.creator)
    1931              self._cid = self._new_channel(creator)
    1932              return self._cid
    1933  
    1934      def get_interpreter(self, interp):
    1935          interp = self._get_interpreter(interp)
    1936          self._prep_interpreter(interp)
    1937          return interp
    1938  
    1939      def expect_closed_error(self, end=None):
    1940          if end is None:
    1941              end = self.end
    1942          if end == 'recv' and self.state.closed == 'send':
    1943              return False
    1944          return bool(self.state.closed)
    1945  
    1946      def prep_interpreter(self, interp):
    1947          self._prep_interpreter(interp)
    1948  
    1949      def record_action(self, action, result):
    1950          self._state = result
    1951  
    1952      def clean_up(self):
    1953          clean_up_interpreters()
    1954          clean_up_channels()
    1955  
    1956      # internal methods
    1957  
    1958      def _new_channel(self, creator):
    1959          if creator.name == 'main':
    1960              return interpreters.channel_create()
    1961          else:
    1962              ch = interpreters.channel_create()
    1963              run_interp(creator.id, f"""
    1964                  import _xxsubinterpreters
    1965                  cid = _xxsubinterpreters.channel_create()
    1966                  # We purposefully send back an int to avoid tying the
    1967                  # channel to the other interpreter.
    1968                  _xxsubinterpreters.channel_send({ch}, int(cid))
    1969                  del _xxsubinterpreters
    1970                  """)
    1971              self._cid = interpreters.channel_recv(ch)
    1972          return self._cid
    1973  
    1974      def _get_interpreter(self, interp):
    1975          if interp in ('same', 'interp'):
    1976              return self.interp
    1977          elif interp == 'other':
    1978              return self.other
    1979          elif interp == 'extra':
    1980              return self.extra
    1981          else:
    1982              name = interp
    1983              try:
    1984                  interp = self._known[name]
    1985              except KeyError:
    1986                  interp = self._known[name] = Interpreter(name)
    1987              return interp
    1988  
    1989      def _prep_interpreter(self, interp):
    1990          if interp.id in self._prepped:
    1991              return
    1992          self._prepped.add(interp.id)
    1993          if interp.name == 'main':
    1994              return
    1995          run_interp(interp.id, f"""
    1996              import _xxsubinterpreters as interpreters
    1997              import test.test__xxsubinterpreters as helpers
    1998              ChannelState = helpers.ChannelState
    1999              try:
    2000                  cid
    2001              except NameError:
    2002                  cid = interpreters._channel_id({self.cid})
    2003              """)
    2004  
    2005  
    2006  @unittest.skip('these tests take several hours to run')
class ExhaustiveChannelTests(TestBase):
    2008  
    2009      """
    2010      - main / interp / other
    2011      - run in: current thread / new thread / other thread / different threads
    2012      - end / opposite
    2013      - force / no force
    2014      - used / not used  (associated / not associated)
    2015      - empty / emptied / never emptied / partly emptied
    2016      - closed / not closed
    2017      - released / not released
    2018      - creator (interp) / other
    2019      - associated interpreter not running
    2020      - associated interpreter destroyed
    2021  
    2022      - close after unbound
    2023      """
    2024  
    2025      """
    2026      use
    2027      pre-close
    2028      close
    2029      after
    2030      check
    2031      """
    2032  
    2033      """
    2034      close in:         main, interp1
    2035      creator:          same, other, extra
    2036  
    2037      use:              None,send,recv,send/recv in None,same,other,same+other,all
    2038      pre-close:        None,send,recv in None,same,other,same+other,all
    2039      pre-close forced: None,send,recv in None,same,other,same+other,all
    2040  
    2041      close:            same
    2042      close forced:     same
    2043  
    2044      use after:        None,send,recv,send/recv in None,same,other,extra,same+other,all
    2045      close after:      None,send,recv,send/recv in None,same,other,extra,same+other,all
    2046      check closed:     send/recv for same/other(incl. interp2)
    2047      """
    2048  
    def iter_action_sets(self):
        # Generate the action lists to exercise, covering:
        # - used / not used  (associated / not associated)
        # - empty / emptied / never emptied / partly emptied
        # - closed / not closed
        # - released / not released
        #
        # Each yielded value is a list of actions made of an optional
        # "use" part, an optional "close" part and an optional single
        # post-close action.
        pairs = (('same', 'other'), ('other', 'extra'))

        def closed_variants(prefix):
            # For both interpreter pairs: prefix + every close set, each
            # followed by its variants with one post-close action.
            for first, second in pairs:
                for closing in self._iter_close_action_sets(first, second):
                    actions = prefix + closing
                    yield actions
                    for after in self._iter_post_close_action_sets():
                        yield actions + after

        # never used
        yield []

        # only pre-closed (and possibly used after)
        yield from closed_variants([])

        # used
        for first, second in pairs:
            for useactions in self._iter_use_action_sets(first, second):
                yield useactions
                yield from closed_variants(useactions)
    2093  
    def _iter_use_action_sets(self, interp1, interp2):
        # Yield lists of 'use' actions for the two given interpreters.
        interps = (interp1, interp2)

        def use(end, interp):
            return ChannelAction('use', end, interp)

        # only recv end used, then never emptied (only send end used):
        # each interpreter alone, then both.
        for end in ('recv', 'send'):
            yield [use(end, interp1)]
            yield [use(end, interp2)]
            yield [use(end, interp1), use(end, interp2)]

        # partially emptied: two sends, one recv, every assignment of
        # interpreters (product() iterates like the nested loops would,
        # rightmost position fastest).
        for sender1, sender2, receiver in itertools.product(interps, repeat=3):
            yield [
                use('send', sender1),
                use('send', sender2),
                use('recv', receiver),
                ]

        # fully emptied: two sends, two recvs, every assignment.
        for sender1, sender2, receiver1, receiver2 in itertools.product(
                interps, repeat=4):
            yield [
                use('send', sender1),
                use('send', sender2),
                use('recv', receiver1),
                use('recv', receiver2),
                ]
    2142  
    def _iter_close_action_sets(self, interp1, interp2):
        # Yield lists of close actions for the two given interpreters.
        ends = ('recv', 'send')
        interps = (interp1, interp2)

        # A single end closed: forced first, then unforced; for each,
        # every interpreter with every end.
        for op in ('force-close', 'close'):
            for interp, end in itertools.product(interps, ends):
                yield [
                    ChannelAction(op, end, interp),
                    ]

        # Both ends closed (recv first), with every mix of forced and
        # unforced operations and of interpreters.
        ops = ('close', 'force-close')
        for recvop, sendop, recv, send in itertools.product(
                ops, ops, interps, interps):
            yield [
                ChannelAction(recvop, 'recv', recv),
                ChannelAction(sendop, 'send', send),
                ]
    2161  
    def _iter_post_close_action_sets(self):
        """Yield the single-action 'use' sets for each interpreter role.

        For each of 'same', 'extra' and 'other' (in that order) one set
        using the recv end is produced, followed by one using the send end.
        """
        for where in ('same', 'extra', 'other'):
            for end in ('recv', 'send'):
                yield [ChannelAction('use', end, where)]
    2170  
    def run_actions(self, fix, actions):
        """Run each action in *actions*, in order, against fixture *fix*."""
        for step in actions:
            self.run_action(fix, step)
    2174  
    def run_action(self, fix, action, *, hideclosed=True):
        """Run a single action for fixture *fix* and record its result.

        The action's end and interpreter are resolved against the fixture
        and the resolved interpreter is prepared before anything runs.
        When that interpreter is named 'main' the module-level
        run_action() is called directly; otherwise a script doing the same
        call is handed to run_interp() and the resulting state is read
        back over a temporary channel.  Either way the result is passed to
        fix.record_action().
        """
        end = action.resolve_end(fix.end)
        target = action.resolve_interp(fix.interp, fix.other, fix.extra)
        fix.prep_interpreter(target)
        if target.name != 'main':
            # The script reports back with two messages: the pending count
            # packed into one byte, then a bytes object that is non-empty
            # only when the channel is closed.
            # NOTE(review): this temporary channel is never explicitly
            # released or destroyed here -- confirm that is intended.
            reply_cid = interpreters.channel_create()
            run_interp(target.id, f"""
                result = helpers.run_action(
                    {fix.cid},
                    {action.action!r},
                    {end!r},
                    {fix.state!r},
                    hideclosed={hideclosed},
                    )
                interpreters.channel_send({reply_cid}, result.pending.to_bytes(1, 'little'))
                interpreters.channel_send({reply_cid}, b'X' if result.closed else b'')
                """)
            # Receive in the same order the script sent.
            pending = int.from_bytes(
                interpreters.channel_recv(reply_cid), 'little')
            closed = bool(interpreters.channel_recv(reply_cid))
            result = ChannelState(pending=pending, closed=closed)
        else:
            result = run_action(
                fix.cid,
                action.action,
                end,
                fix.state,
                hideclosed=hideclosed,
            )
        fix.record_action(action, result)
    2206  
    def iter_fixtures(self):
        """Yield a ChannelCloseFixture for every fixture combination.

        The combinations are the product of the interpreter role triples
        below, the channel creator ('same', 'other', 'creator') and the
        end ('send', 'recv'), with the end varying fastest.
        """
        # XXX threads?
        # Each row is (interp, other, extra).
        roles = [
            ('main', 'interp', 'extra'),
            ('interp', 'main', 'extra'),
            ('interp1', 'interp2', 'extra'),
            ('interp1', 'interp2', 'main'),
        ]
        creators = ('same', 'other', 'creator')
        ends = ('send', 'recv')
        combos = itertools.product(roles, creators, ends)
        for (interp, other, extra), creator, end in combos:
            yield ChannelCloseFixture(end, interp, other, extra, creator)
    2219  
    def _close(self, fix, *, force):
        """Run the close (or force-close) action on the fixture's end.

        The action targets the 'same' interpreter and is run with
        hideclosed=False.  When the fixture expects a closed error, the
        action must raise ChannelClosedError.
        """
        if force:
            op = 'force-close'
        else:
            op = 'close'
        action = ChannelAction(op, fix.end, 'same')
        if fix.expect_closed_error():
            with self.assertRaises(interpreters.ChannelClosedError):
                self.run_action(fix, action, hideclosed=False)
            return
        self.run_action(fix, action, hideclosed=False)
    2228  
    def _assert_closed_in_interp(self, fix, interp=None):
        """Check that every channel operation reports the channel closed.

        recv, send, close and forced close are each attempted.  With no
        interpreter given, or one named 'main', the calls are made
        directly and must raise ChannelClosedError.  Otherwise each call
        is run as a script in the given interpreter.
        """
        if interp is not None and interp.name != 'main':
            # The scripts rely on "cid", "helpers" and "interpreters"
            # already being defined in the target interpreter
            # (presumably by the fixture's interpreter prep -- confirm).
            run_interp(interp.id, """
                with helpers.expect_channel_closed():
                    interpreters.channel_recv(cid)
                """)
            run_interp(interp.id, """
                with helpers.expect_channel_closed():
                    interpreters.channel_send(cid, b'spam')
                """)
            run_interp(interp.id, """
                with helpers.expect_channel_closed():
                    interpreters.channel_close(cid)
                """)
            run_interp(interp.id, """
                with helpers.expect_channel_closed():
                    interpreters.channel_close(cid, force=True)
                """)
            return

        closed_error = interpreters.ChannelClosedError
        with self.assertRaises(closed_error):
            interpreters.channel_recv(fix.cid)
        with self.assertRaises(closed_error):
            interpreters.channel_send(fix.cid, b'spam')
        with self.assertRaises(closed_error):
            interpreters.channel_close(fix.cid)
        with self.assertRaises(closed_error):
            interpreters.channel_close(fix.cid, force=True)
    2256  
    def _assert_closed(self, fix):
        """Check that the fixture's channel is closed everywhere.

        The recorded state must be closed.  After receiving as many items
        as the recorded state says are pending, the closed checks are run
        directly, then in the 'same' and 'other' interpreters (skipping
        whichever is 'main'), and finally in a 'fresh' interpreter.
        """
        self.assertTrue(fix.state.closed)

        # Receive the items the recorded state still counts as pending
        # before expecting recv to fail.
        pending = fix.state.pending
        for _ in range(pending):
            interpreters.channel_recv(fix.cid)
        self._assert_closed_in_interp(fix)

        for role in ('same', 'other'):
            target = fix.get_interpreter(role)
            if target.name != 'main':
                self._assert_closed_in_interp(fix, target)

        fresh = fix.get_interpreter('fresh')
        self._assert_closed_in_interp(fix, fresh)
    2272  
    2273      def _iter_close_tests(self, verbose=False):
    2274          i = 0
    2275          for actions in self.iter_action_sets():
    2276              print()
    2277              for fix in self.iter_fixtures():
    2278                  i += 1
    2279                  if i > 1000:
    2280                      return
    2281                  if verbose:
    2282                      if (i - 1) % 6 == 0:
    2283                          print()
    2284                      print(i, fix, '({} actions)'.format(len(actions)))
    2285                  else:
    2286                      if (i - 1) % 6 == 0:
    2287                          print(' ', end='')
    2288                      print('.', end=''); sys.stdout.flush()
    2289                  yield i, fix, actions
    2290              if verbose:
    2291                  print('---')
    2292          print()
    2293  
    2294      # This is useful for scanning through the possible tests.
    def _skim_close_tests(self):
        """Iterate over every close-test combination without running it.

        ChannelCloseFixture.QUICK is set to True while iterating and put
        back to its previous value afterwards, so the class-level flag
        does not leak into fixtures created later in the same process.
        """
        saved = getattr(ChannelCloseFixture, 'QUICK', False)
        ChannelCloseFixture.QUICK = True
        try:
            for _ in self._iter_close_tests():
                pass
        finally:
            ChannelCloseFixture.QUICK = saved
    2299  
    def test_close(self):
        # For every combination: prepare the fixture's interpreter, run
        # the action set, do a plain close and check the channel is closed.
        # (A comment rather than a docstring, so unittest's verbose output
        # keeps showing the method name.)
        for index, fix, actions in self._iter_close_tests():
            label = f'{index} {fix}  {actions}'
            with self.subTest(label):
                fix.prep_interpreter(fix.interp)
                self.run_actions(fix, actions)

                self._close(fix, force=False)

                self._assert_closed(fix)
            # XXX Things slow down if we have too many interpreters.
            fix.clean_up()
    2311  
    def test_force_close(self):
        # For every combination: prepare the fixture's interpreter, run
        # the action set, force-close and check the channel is closed.
        # (A comment rather than a docstring, so unittest's verbose output
        # keeps showing the method name.)
        for index, fix, actions in self._iter_close_tests():
            label = f'{index} {fix}  {actions}'
            with self.subTest(label):
                fix.prep_interpreter(fix.interp)
                self.run_actions(fix, actions)

                self._close(fix, force=True)

                self._assert_closed(fix)
            # XXX Things slow down if we have too many interpreters.
            fix.clean_up()
    2323  
    2324  
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()