(root)/
Python-3.12.0/
Lib/
test/
test__xxsubinterpreters.py
       1  import contextlib
       2  import itertools
       3  import os
       4  import pickle
       5  import sys
       6  from textwrap import dedent
       7  import threading
       8  import unittest
       9  
      10  import _testcapi
      11  from test import support
      12  from test.support import import_helper
      13  from test.support import script_helper
      14  
      15  
      16  interpreters = import_helper.import_module('_xxsubinterpreters')
      17  
      18  
      19  ##################################
      20  # helpers
      21  
      22  def _captured_script(script):
      23      r, w = os.pipe()
      24      indented = script.replace('\n', '\n                ')
      25      wrapped = dedent(f"""
      26          import contextlib
      27          with open({w}, 'w', encoding="utf-8") as spipe:
      28              with contextlib.redirect_stdout(spipe):
      29                  {indented}
      30          """)
      31      return wrapped, open(r, encoding="utf-8")
      32  
      33  
      34  def _run_output(interp, request, shared=None):
      35      script, rpipe = _captured_script(request)
      36      with rpipe:
      37          interpreters.run_string(interp, script, shared)
      38          return rpipe.read()
      39  
      40  
      41  def _wait_for_interp_to_run(interp, timeout=None):
      42      # bpo-37224: Running this test file in multiprocesses will fail randomly.
      43      # The failure reason is that the thread can't acquire the cpu to
      44      # run subinterpreter eariler than the main thread in multiprocess.
      45      if timeout is None:
      46          timeout = support.SHORT_TIMEOUT
      47      for _ in support.sleeping_retry(timeout, error=False):
      48          if interpreters.is_running(interp):
      49              break
      50      else:
      51          raise RuntimeError('interp is not running')
      52  
      53  
      54  @contextlib.contextmanager
      55  def _running(interp):
      56      r, w = os.pipe()
      57      def run():
      58          interpreters.run_string(interp, dedent(f"""
      59              # wait for "signal"
      60              with open({r}, encoding="utf-8") as rpipe:
      61                  rpipe.read()
      62              """))
      63  
      64      t = threading.Thread(target=run)
      65      t.start()
      66      _wait_for_interp_to_run(interp)
      67  
      68      yield
      69  
      70      with open(w, 'w', encoding="utf-8") as spipe:
      71          spipe.write('done')
      72      t.join()
      73  
      74  
      75  def clean_up_interpreters():
      76      for id in interpreters.list_all():
      77          if id == 0:  # main
      78              continue
      79          try:
      80              interpreters.destroy(id)
      81          except RuntimeError:
      82              pass  # already destroyed
      83  
      84  
      85  class ESC[4;38;5;81mTestBase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      86  
      87      def tearDown(self):
      88          clean_up_interpreters()
      89  
      90  
      91  ##################################
      92  # misc. tests
      93  
      94  class ESC[4;38;5;81mIsShareableTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      95  
      96      def test_default_shareables(self):
      97          shareables = [
      98                  # singletons
      99                  None,
     100                  # builtin objects
     101                  b'spam',
     102                  'spam',
     103                  10,
     104                  -10,
     105                  ]
     106          for obj in shareables:
     107              with self.subTest(obj):
     108                  self.assertTrue(
     109                      interpreters.is_shareable(obj))
     110  
     111      def test_not_shareable(self):
     112          class ESC[4;38;5;81mCheese:
     113              def __init__(self, name):
     114                  self.name = name
     115              def __str__(self):
     116                  return self.name
     117  
     118          class ESC[4;38;5;81mSubBytes(ESC[4;38;5;149mbytes):
     119              """A subclass of a shareable type."""
     120  
     121          not_shareables = [
     122                  # singletons
     123                  True,
     124                  False,
     125                  NotImplemented,
     126                  ...,
     127                  # builtin types and objects
     128                  type,
     129                  object,
     130                  object(),
     131                  Exception(),
     132                  100.0,
     133                  # user-defined types and objects
     134                  Cheese,
     135                  Cheese('Wensleydale'),
     136                  SubBytes(b'spam'),
     137                  ]
     138          for obj in not_shareables:
     139              with self.subTest(repr(obj)):
     140                  self.assertFalse(
     141                      interpreters.is_shareable(obj))
     142  
     143  
     144  class ESC[4;38;5;81mShareableTypeTests(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     145  
     146      def _assert_values(self, values):
     147          for obj in values:
     148              with self.subTest(obj):
     149                  xid = _testcapi.get_crossinterp_data(obj)
     150                  got = _testcapi.restore_crossinterp_data(xid)
     151  
     152                  self.assertEqual(got, obj)
     153                  self.assertIs(type(got), type(obj))
     154  
     155      def test_singletons(self):
     156          for obj in [None]:
     157              with self.subTest(obj):
     158                  xid = _testcapi.get_crossinterp_data(obj)
     159                  got = _testcapi.restore_crossinterp_data(xid)
     160  
     161                  # XXX What about between interpreters?
     162                  self.assertIs(got, obj)
     163  
     164      def test_types(self):
     165          self._assert_values([
     166              b'spam',
     167              9999,
     168              ])
     169  
     170      def test_bytes(self):
     171          self._assert_values(i.to_bytes(2, 'little', signed=True)
     172                              for i in range(-1, 258))
     173  
     174      def test_strs(self):
     175          self._assert_values(['hello world', '你好世界', ''])
     176  
     177      def test_int(self):
     178          self._assert_values(itertools.chain(range(-1, 258),
     179                                              [sys.maxsize, -sys.maxsize - 1]))
     180  
     181      def test_non_shareable_int(self):
     182          ints = [
     183              sys.maxsize + 1,
     184              -sys.maxsize - 2,
     185              2**1000,
     186          ]
     187          for i in ints:
     188              with self.subTest(i):
     189                  with self.assertRaises(OverflowError):
     190                      _testcapi.get_crossinterp_data(i)
     191  
     192  
     193  class ESC[4;38;5;81mModuleTests(ESC[4;38;5;149mTestBase):
     194  
     195      def test_import_in_interpreter(self):
     196          _run_output(
     197              interpreters.create(),
     198              'import _xxsubinterpreters as _interpreters',
     199          )
     200  
     201  
     202  ##################################
     203  # interpreter tests
     204  
     205  class ESC[4;38;5;81mListAllTests(ESC[4;38;5;149mTestBase):
     206  
     207      def test_initial(self):
     208          main = interpreters.get_main()
     209          ids = interpreters.list_all()
     210          self.assertEqual(ids, [main])
     211  
     212      def test_after_creating(self):
     213          main = interpreters.get_main()
     214          first = interpreters.create()
     215          second = interpreters.create()
     216          ids = interpreters.list_all()
     217          self.assertEqual(ids, [main, first, second])
     218  
     219      def test_after_destroying(self):
     220          main = interpreters.get_main()
     221          first = interpreters.create()
     222          second = interpreters.create()
     223          interpreters.destroy(first)
     224          ids = interpreters.list_all()
     225          self.assertEqual(ids, [main, second])
     226  
     227  
     228  class ESC[4;38;5;81mGetCurrentTests(ESC[4;38;5;149mTestBase):
     229  
     230      def test_main(self):
     231          main = interpreters.get_main()
     232          cur = interpreters.get_current()
     233          self.assertEqual(cur, main)
     234          self.assertIsInstance(cur, interpreters.InterpreterID)
     235  
     236      def test_subinterpreter(self):
     237          main = interpreters.get_main()
     238          interp = interpreters.create()
     239          out = _run_output(interp, dedent("""
     240              import _xxsubinterpreters as _interpreters
     241              cur = _interpreters.get_current()
     242              print(cur)
     243              assert isinstance(cur, _interpreters.InterpreterID)
     244              """))
     245          cur = int(out.strip())
     246          _, expected = interpreters.list_all()
     247          self.assertEqual(cur, expected)
     248          self.assertNotEqual(cur, main)
     249  
     250  
     251  class ESC[4;38;5;81mGetMainTests(ESC[4;38;5;149mTestBase):
     252  
     253      def test_from_main(self):
     254          [expected] = interpreters.list_all()
     255          main = interpreters.get_main()
     256          self.assertEqual(main, expected)
     257          self.assertIsInstance(main, interpreters.InterpreterID)
     258  
     259      def test_from_subinterpreter(self):
     260          [expected] = interpreters.list_all()
     261          interp = interpreters.create()
     262          out = _run_output(interp, dedent("""
     263              import _xxsubinterpreters as _interpreters
     264              main = _interpreters.get_main()
     265              print(main)
     266              assert isinstance(main, _interpreters.InterpreterID)
     267              """))
     268          main = int(out.strip())
     269          self.assertEqual(main, expected)
     270  
     271  
     272  class ESC[4;38;5;81mIsRunningTests(ESC[4;38;5;149mTestBase):
     273  
     274      def test_main(self):
     275          main = interpreters.get_main()
     276          self.assertTrue(interpreters.is_running(main))
     277  
     278      @unittest.skip('Fails on FreeBSD')
     279      def test_subinterpreter(self):
     280          interp = interpreters.create()
     281          self.assertFalse(interpreters.is_running(interp))
     282  
     283          with _running(interp):
     284              self.assertTrue(interpreters.is_running(interp))
     285          self.assertFalse(interpreters.is_running(interp))
     286  
     287      def test_from_subinterpreter(self):
     288          interp = interpreters.create()
     289          out = _run_output(interp, dedent(f"""
     290              import _xxsubinterpreters as _interpreters
     291              if _interpreters.is_running({interp}):
     292                  print(True)
     293              else:
     294                  print(False)
     295              """))
     296          self.assertEqual(out.strip(), 'True')
     297  
     298      def test_already_destroyed(self):
     299          interp = interpreters.create()
     300          interpreters.destroy(interp)
     301          with self.assertRaises(RuntimeError):
     302              interpreters.is_running(interp)
     303  
     304      def test_does_not_exist(self):
     305          with self.assertRaises(RuntimeError):
     306              interpreters.is_running(1_000_000)
     307  
     308      def test_bad_id(self):
     309          with self.assertRaises(ValueError):
     310              interpreters.is_running(-1)
     311  
     312  
     313  class ESC[4;38;5;81mInterpreterIDTests(ESC[4;38;5;149mTestBase):
     314  
     315      def test_with_int(self):
     316          id = interpreters.InterpreterID(10, force=True)
     317  
     318          self.assertEqual(int(id), 10)
     319  
     320      def test_coerce_id(self):
     321          class ESC[4;38;5;81mInt(ESC[4;38;5;149mstr):
     322              def __index__(self):
     323                  return 10
     324  
     325          id = interpreters.InterpreterID(Int(), force=True)
     326          self.assertEqual(int(id), 10)
     327  
     328      def test_bad_id(self):
     329          self.assertRaises(TypeError, interpreters.InterpreterID, object())
     330          self.assertRaises(TypeError, interpreters.InterpreterID, 10.0)
     331          self.assertRaises(TypeError, interpreters.InterpreterID, '10')
     332          self.assertRaises(TypeError, interpreters.InterpreterID, b'10')
     333          self.assertRaises(ValueError, interpreters.InterpreterID, -1)
     334          self.assertRaises(OverflowError, interpreters.InterpreterID, 2**64)
     335  
     336      def test_does_not_exist(self):
     337          id = interpreters.create()
     338          with self.assertRaises(RuntimeError):
     339              interpreters.InterpreterID(int(id) + 1)  # unforced
     340  
     341      def test_str(self):
     342          id = interpreters.InterpreterID(10, force=True)
     343          self.assertEqual(str(id), '10')
     344  
     345      def test_repr(self):
     346          id = interpreters.InterpreterID(10, force=True)
     347          self.assertEqual(repr(id), 'InterpreterID(10)')
     348  
     349      def test_equality(self):
     350          id1 = interpreters.create()
     351          id2 = interpreters.InterpreterID(int(id1))
     352          id3 = interpreters.create()
     353  
     354          self.assertTrue(id1 == id1)
     355          self.assertTrue(id1 == id2)
     356          self.assertTrue(id1 == int(id1))
     357          self.assertTrue(int(id1) == id1)
     358          self.assertTrue(id1 == float(int(id1)))
     359          self.assertTrue(float(int(id1)) == id1)
     360          self.assertFalse(id1 == float(int(id1)) + 0.1)
     361          self.assertFalse(id1 == str(int(id1)))
     362          self.assertFalse(id1 == 2**1000)
     363          self.assertFalse(id1 == float('inf'))
     364          self.assertFalse(id1 == 'spam')
     365          self.assertFalse(id1 == id3)
     366  
     367          self.assertFalse(id1 != id1)
     368          self.assertFalse(id1 != id2)
     369          self.assertTrue(id1 != id3)
     370  
     371  
     372  class ESC[4;38;5;81mCreateTests(ESC[4;38;5;149mTestBase):
     373  
     374      def test_in_main(self):
     375          id = interpreters.create()
     376          self.assertIsInstance(id, interpreters.InterpreterID)
     377  
     378          self.assertIn(id, interpreters.list_all())
     379  
     380      @unittest.skip('enable this test when working on pystate.c')
     381      def test_unique_id(self):
     382          seen = set()
     383          for _ in range(100):
     384              id = interpreters.create()
     385              interpreters.destroy(id)
     386              seen.add(id)
     387  
     388          self.assertEqual(len(seen), 100)
     389  
     390      def test_in_thread(self):
     391          lock = threading.Lock()
     392          id = None
     393          def f():
     394              nonlocal id
     395              id = interpreters.create()
     396              lock.acquire()
     397              lock.release()
     398  
     399          t = threading.Thread(target=f)
     400          with lock:
     401              t.start()
     402          t.join()
     403          self.assertIn(id, interpreters.list_all())
     404  
     405      def test_in_subinterpreter(self):
     406          main, = interpreters.list_all()
     407          id1 = interpreters.create()
     408          out = _run_output(id1, dedent("""
     409              import _xxsubinterpreters as _interpreters
     410              id = _interpreters.create()
     411              print(id)
     412              assert isinstance(id, _interpreters.InterpreterID)
     413              """))
     414          id2 = int(out.strip())
     415  
     416          self.assertEqual(set(interpreters.list_all()), {main, id1, id2})
     417  
     418      def test_in_threaded_subinterpreter(self):
     419          main, = interpreters.list_all()
     420          id1 = interpreters.create()
     421          id2 = None
     422          def f():
     423              nonlocal id2
     424              out = _run_output(id1, dedent("""
     425                  import _xxsubinterpreters as _interpreters
     426                  id = _interpreters.create()
     427                  print(id)
     428                  """))
     429              id2 = int(out.strip())
     430  
     431          t = threading.Thread(target=f)
     432          t.start()
     433          t.join()
     434  
     435          self.assertEqual(set(interpreters.list_all()), {main, id1, id2})
     436  
     437      def test_after_destroy_all(self):
     438          before = set(interpreters.list_all())
     439          # Create 3 subinterpreters.
     440          ids = []
     441          for _ in range(3):
     442              id = interpreters.create()
     443              ids.append(id)
     444          # Now destroy them.
     445          for id in ids:
     446              interpreters.destroy(id)
     447          # Finally, create another.
     448          id = interpreters.create()
     449          self.assertEqual(set(interpreters.list_all()), before | {id})
     450  
     451      def test_after_destroy_some(self):
     452          before = set(interpreters.list_all())
     453          # Create 3 subinterpreters.
     454          id1 = interpreters.create()
     455          id2 = interpreters.create()
     456          id3 = interpreters.create()
     457          # Now destroy 2 of them.
     458          interpreters.destroy(id1)
     459          interpreters.destroy(id3)
     460          # Finally, create another.
     461          id = interpreters.create()
     462          self.assertEqual(set(interpreters.list_all()), before | {id, id2})
     463  
     464  
     465  class ESC[4;38;5;81mDestroyTests(ESC[4;38;5;149mTestBase):
     466  
     467      def test_one(self):
     468          id1 = interpreters.create()
     469          id2 = interpreters.create()
     470          id3 = interpreters.create()
     471          self.assertIn(id2, interpreters.list_all())
     472          interpreters.destroy(id2)
     473          self.assertNotIn(id2, interpreters.list_all())
     474          self.assertIn(id1, interpreters.list_all())
     475          self.assertIn(id3, interpreters.list_all())
     476  
     477      def test_all(self):
     478          before = set(interpreters.list_all())
     479          ids = set()
     480          for _ in range(3):
     481              id = interpreters.create()
     482              ids.add(id)
     483          self.assertEqual(set(interpreters.list_all()), before | ids)
     484          for id in ids:
     485              interpreters.destroy(id)
     486          self.assertEqual(set(interpreters.list_all()), before)
     487  
     488      def test_main(self):
     489          main, = interpreters.list_all()
     490          with self.assertRaises(RuntimeError):
     491              interpreters.destroy(main)
     492  
     493          def f():
     494              with self.assertRaises(RuntimeError):
     495                  interpreters.destroy(main)
     496  
     497          t = threading.Thread(target=f)
     498          t.start()
     499          t.join()
     500  
     501      def test_already_destroyed(self):
     502          id = interpreters.create()
     503          interpreters.destroy(id)
     504          with self.assertRaises(RuntimeError):
     505              interpreters.destroy(id)
     506  
     507      def test_does_not_exist(self):
     508          with self.assertRaises(RuntimeError):
     509              interpreters.destroy(1_000_000)
     510  
     511      def test_bad_id(self):
     512          with self.assertRaises(ValueError):
     513              interpreters.destroy(-1)
     514  
     515      def test_from_current(self):
     516          main, = interpreters.list_all()
     517          id = interpreters.create()
     518          script = dedent(f"""
     519              import _xxsubinterpreters as _interpreters
     520              try:
     521                  _interpreters.destroy({id})
     522              except RuntimeError:
     523                  pass
     524              """)
     525  
     526          interpreters.run_string(id, script)
     527          self.assertEqual(set(interpreters.list_all()), {main, id})
     528  
     529      def test_from_sibling(self):
     530          main, = interpreters.list_all()
     531          id1 = interpreters.create()
     532          id2 = interpreters.create()
     533          script = dedent(f"""
     534              import _xxsubinterpreters as _interpreters
     535              _interpreters.destroy({id2})
     536              """)
     537          interpreters.run_string(id1, script)
     538  
     539          self.assertEqual(set(interpreters.list_all()), {main, id1})
     540  
     541      def test_from_other_thread(self):
     542          id = interpreters.create()
     543          def f():
     544              interpreters.destroy(id)
     545  
     546          t = threading.Thread(target=f)
     547          t.start()
     548          t.join()
     549  
     550      def test_still_running(self):
     551          main, = interpreters.list_all()
     552          interp = interpreters.create()
     553          with _running(interp):
     554              self.assertTrue(interpreters.is_running(interp),
     555                              msg=f"Interp {interp} should be running before destruction.")
     556  
     557              with self.assertRaises(RuntimeError,
     558                                     msg=f"Should not be able to destroy interp {interp} while it's still running."):
     559                  interpreters.destroy(interp)
     560              self.assertTrue(interpreters.is_running(interp))
     561  
     562  
     563  class ESC[4;38;5;81mRunStringTests(ESC[4;38;5;149mTestBase):
     564  
     565      def setUp(self):
     566          super().setUp()
     567          self.id = interpreters.create()
     568  
     569      def test_success(self):
     570          script, file = _captured_script('print("it worked!", end="")')
     571          with file:
     572              interpreters.run_string(self.id, script)
     573              out = file.read()
     574  
     575          self.assertEqual(out, 'it worked!')
     576  
     577      def test_in_thread(self):
     578          script, file = _captured_script('print("it worked!", end="")')
     579          with file:
     580              def f():
     581                  interpreters.run_string(self.id, script)
     582  
     583              t = threading.Thread(target=f)
     584              t.start()
     585              t.join()
     586              out = file.read()
     587  
     588          self.assertEqual(out, 'it worked!')
     589  
     590      def test_create_thread(self):
     591          subinterp = interpreters.create()
     592          script, file = _captured_script("""
     593              import threading
     594              def f():
     595                  print('it worked!', end='')
     596  
     597              t = threading.Thread(target=f)
     598              t.start()
     599              t.join()
     600              """)
     601          with file:
     602              interpreters.run_string(subinterp, script)
     603              out = file.read()
     604  
     605          self.assertEqual(out, 'it worked!')
     606  
     607      def test_create_daemon_thread(self):
     608          with self.subTest('isolated'):
     609              expected = 'spam spam spam spam spam'
     610              subinterp = interpreters.create(isolated=True)
     611              script, file = _captured_script(f"""
     612                  import threading
     613                  def f():
     614                      print('it worked!', end='')
     615  
     616                  try:
     617                      t = threading.Thread(target=f, daemon=True)
     618                      t.start()
     619                      t.join()
     620                  except RuntimeError:
     621                      print('{expected}', end='')
     622                  """)
     623              with file:
     624                  interpreters.run_string(subinterp, script)
     625                  out = file.read()
     626  
     627              self.assertEqual(out, expected)
     628  
     629          with self.subTest('not isolated'):
     630              subinterp = interpreters.create(isolated=False)
     631              script, file = _captured_script("""
     632                  import threading
     633                  def f():
     634                      print('it worked!', end='')
     635  
     636                  t = threading.Thread(target=f, daemon=True)
     637                  t.start()
     638                  t.join()
     639                  """)
     640              with file:
     641                  interpreters.run_string(subinterp, script)
     642                  out = file.read()
     643  
     644              self.assertEqual(out, 'it worked!')
     645  
     646      def test_shareable_types(self):
     647          interp = interpreters.create()
     648          objects = [
     649              None,
     650              'spam',
     651              b'spam',
     652              42,
     653          ]
     654          for obj in objects:
     655              with self.subTest(obj):
     656                  interpreters.run_string(
     657                      interp,
     658                      f'assert(obj == {obj!r})',
     659                      shared=dict(obj=obj),
     660                  )
     661  
     662      def test_os_exec(self):
     663          expected = 'spam spam spam spam spam'
     664          subinterp = interpreters.create()
     665          script, file = _captured_script(f"""
     666              import os, sys
     667              try:
     668                  os.execl(sys.executable)
     669              except RuntimeError:
     670                  print('{expected}', end='')
     671              """)
     672          with file:
     673              interpreters.run_string(subinterp, script)
     674              out = file.read()
     675  
     676          self.assertEqual(out, expected)
     677  
     678      @support.requires_fork()
     679      def test_fork(self):
     680          import tempfile
     681          with tempfile.NamedTemporaryFile('w+', encoding="utf-8") as file:
     682              file.write('')
     683              file.flush()
     684  
     685              expected = 'spam spam spam spam spam'
     686              script = dedent(f"""
     687                  import os
     688                  try:
     689                      os.fork()
     690                  except RuntimeError:
     691                      with open('{file.name}', 'w', encoding='utf-8') as out:
     692                          out.write('{expected}')
     693                  """)
     694              interpreters.run_string(self.id, script)
     695  
     696              file.seek(0)
     697              content = file.read()
     698              self.assertEqual(content, expected)
     699  
     700      def test_already_running(self):
     701          with _running(self.id):
     702              with self.assertRaises(RuntimeError):
     703                  interpreters.run_string(self.id, 'print("spam")')
     704  
     705      def test_does_not_exist(self):
     706          id = 0
     707          while id in interpreters.list_all():
     708              id += 1
     709          with self.assertRaises(RuntimeError):
     710              interpreters.run_string(id, 'print("spam")')
     711  
     712      def test_error_id(self):
     713          with self.assertRaises(ValueError):
     714              interpreters.run_string(-1, 'print("spam")')
     715  
     716      def test_bad_id(self):
     717          with self.assertRaises(TypeError):
     718              interpreters.run_string('spam', 'print("spam")')
     719  
     720      def test_bad_script(self):
     721          with self.assertRaises(TypeError):
     722              interpreters.run_string(self.id, 10)
     723  
     724      def test_bytes_for_script(self):
     725          with self.assertRaises(TypeError):
     726              interpreters.run_string(self.id, b'print("spam")')
     727  
     728      @contextlib.contextmanager
     729      def assert_run_failed(self, exctype, msg=None):
     730          with self.assertRaises(interpreters.RunFailedError) as caught:
     731              yield
     732          if msg is None:
     733              self.assertEqual(str(caught.exception).split(':')[0],
     734                               str(exctype))
     735          else:
     736              self.assertEqual(str(caught.exception),
     737                               "{}: {}".format(exctype, msg))
     738  
     739      def test_invalid_syntax(self):
     740          with self.assert_run_failed(SyntaxError):
     741              # missing close paren
     742              interpreters.run_string(self.id, 'print("spam"')
     743  
     744      def test_failure(self):
     745          with self.assert_run_failed(Exception, 'spam'):
     746              interpreters.run_string(self.id, 'raise Exception("spam")')
     747  
     748      def test_SystemExit(self):
     749          with self.assert_run_failed(SystemExit, '42'):
     750              interpreters.run_string(self.id, 'raise SystemExit(42)')
     751  
     752      def test_sys_exit(self):
     753          with self.assert_run_failed(SystemExit):
     754              interpreters.run_string(self.id, dedent("""
     755                  import sys
     756                  sys.exit()
     757                  """))
     758  
     759          with self.assert_run_failed(SystemExit, '42'):
     760              interpreters.run_string(self.id, dedent("""
     761                  import sys
     762                  sys.exit(42)
     763                  """))
     764  
     765      def test_with_shared(self):
     766          r, w = os.pipe()
     767  
     768          shared = {
     769                  'spam': b'ham',
     770                  'eggs': b'-1',
     771                  'cheddar': None,
     772                  }
     773          script = dedent(f"""
     774              eggs = int(eggs)
     775              spam = 42
     776              result = spam + eggs
     777  
     778              ns = dict(vars())
     779              del ns['__builtins__']
     780              import pickle
     781              with open({w}, 'wb') as chan:
     782                  pickle.dump(ns, chan)
     783              """)
     784          interpreters.run_string(self.id, script, shared)
     785          with open(r, 'rb') as chan:
     786              ns = pickle.load(chan)
     787  
     788          self.assertEqual(ns['spam'], 42)
     789          self.assertEqual(ns['eggs'], -1)
     790          self.assertEqual(ns['result'], 41)
     791          self.assertIsNone(ns['cheddar'])
     792  
     793      def test_shared_overwrites(self):
     794          interpreters.run_string(self.id, dedent("""
     795              spam = 'eggs'
     796              ns1 = dict(vars())
     797              del ns1['__builtins__']
     798              """))
     799  
     800          shared = {'spam': b'ham'}
     801          script = dedent("""
     802              ns2 = dict(vars())
     803              del ns2['__builtins__']
     804          """)
     805          interpreters.run_string(self.id, script, shared)
     806  
     807          r, w = os.pipe()
     808          script = dedent(f"""
     809              ns = dict(vars())
     810              del ns['__builtins__']
     811              import pickle
     812              with open({w}, 'wb') as chan:
     813                  pickle.dump(ns, chan)
     814              """)
     815          interpreters.run_string(self.id, script)
     816          with open(r, 'rb') as chan:
     817              ns = pickle.load(chan)
     818  
     819          self.assertEqual(ns['ns1']['spam'], 'eggs')
     820          self.assertEqual(ns['ns2']['spam'], b'ham')
     821          self.assertEqual(ns['spam'], b'ham')
     822  
     823      def test_shared_overwrites_default_vars(self):
     824          r, w = os.pipe()
     825  
     826          shared = {'__name__': b'not __main__'}
     827          script = dedent(f"""
     828              spam = 42
     829  
     830              ns = dict(vars())
     831              del ns['__builtins__']
     832              import pickle
     833              with open({w}, 'wb') as chan:
     834                  pickle.dump(ns, chan)
     835              """)
     836          interpreters.run_string(self.id, script, shared)
     837          with open(r, 'rb') as chan:
     838              ns = pickle.load(chan)
     839  
     840          self.assertEqual(ns['__name__'], b'not __main__')
     841  
     842      def test_main_reused(self):
     843          r, w = os.pipe()
     844          interpreters.run_string(self.id, dedent(f"""
     845              spam = True
     846  
     847              ns = dict(vars())
     848              del ns['__builtins__']
     849              import pickle
     850              with open({w}, 'wb') as chan:
     851                  pickle.dump(ns, chan)
     852              del ns, pickle, chan
     853              """))
     854          with open(r, 'rb') as chan:
     855              ns1 = pickle.load(chan)
     856  
     857          r, w = os.pipe()
     858          interpreters.run_string(self.id, dedent(f"""
     859              eggs = False
     860  
     861              ns = dict(vars())
     862              del ns['__builtins__']
     863              import pickle
     864              with open({w}, 'wb') as chan:
     865                  pickle.dump(ns, chan)
     866              """))
     867          with open(r, 'rb') as chan:
     868              ns2 = pickle.load(chan)
     869  
     870          self.assertIn('spam', ns1)
     871          self.assertNotIn('eggs', ns1)
     872          self.assertIn('eggs', ns2)
     873          self.assertIn('spam', ns2)
     874  
     875      def test_execution_namespace_is_main(self):
     876          r, w = os.pipe()
     877  
     878          script = dedent(f"""
     879              spam = 42
     880  
     881              ns = dict(vars())
     882              ns['__builtins__'] = str(ns['__builtins__'])
     883              import pickle
     884              with open({w}, 'wb') as chan:
     885                  pickle.dump(ns, chan)
     886              """)
     887          interpreters.run_string(self.id, script)
     888          with open(r, 'rb') as chan:
     889              ns = pickle.load(chan)
     890  
     891          ns.pop('__builtins__')
     892          ns.pop('__loader__')
     893          self.assertEqual(ns, {
     894              '__name__': '__main__',
     895              '__annotations__': {},
     896              '__doc__': None,
     897              '__package__': None,
     898              '__spec__': None,
     899              'spam': 42,
     900              })
     901  
     902      # XXX Fix this test!
     903      @unittest.skip('blocking forever')
     904      def test_still_running_at_exit(self):
     905          script = dedent("""
     906          from textwrap import dedent
     907          import threading
     908          import _xxsubinterpreters as _interpreters
     909          id = _interpreters.create()
     910          def f():
     911              _interpreters.run_string(id, dedent('''
     912                  import time
     913                  # Give plenty of time for the main interpreter to finish.
     914                  time.sleep(1_000_000)
     915                  '''))
     916  
     917          t = threading.Thread(target=f)
     918          t.start()
     919          """)
     920          with support.temp_dir() as dirname:
     921              filename = script_helper.make_script(dirname, 'interp', script)
     922              with script_helper.spawn_python(filename) as proc:
     923                  retcode = proc.wait()
     924  
     925          self.assertEqual(retcode, 0)
     926  
     927  
     928  if __name__ == '__main__':
     929      unittest.main()