python (3.12.0)

(root)/
lib/
python3.12/
test/
test_interpreters.py
       1  import contextlib
       2  import os
       3  import threading
       4  from textwrap import dedent
       5  import unittest
       6  import time
       7  
       8  from test import support
       9  from test.support import import_helper
      10  from test.support import threading_helper
      11  _interpreters = import_helper.import_module('_xxsubinterpreters')
      12  _channels = import_helper.import_module('_xxinterpchannels')
      13  from test.support import interpreters
      14  
      15  
      16  def _captured_script(script):
      17      r, w = os.pipe()
      18      indented = script.replace('\n', '\n                ')
      19      wrapped = dedent(f"""
      20          import contextlib
      21          with open({w}, 'w', encoding='utf-8') as spipe:
      22              with contextlib.redirect_stdout(spipe):
      23                  {indented}
      24          """)
      25      return wrapped, open(r, encoding='utf-8')
      26  
      27  
      28  def clean_up_interpreters():
      29      for interp in interpreters.list_all():
      30          if interp.id == 0:  # main
      31              continue
      32          try:
      33              interp.close()
      34          except RuntimeError:
      35              pass  # already destroyed
      36  
      37  
      38  def _run_output(interp, request, channels=None):
      39      script, rpipe = _captured_script(request)
      40      with rpipe:
      41          interp.run(script, channels=channels)
      42          return rpipe.read()
      43  
      44  
      45  @contextlib.contextmanager
      46  def _running(interp):
      47      r, w = os.pipe()
      48      def run():
      49          interp.run(dedent(f"""
      50              # wait for "signal"
      51              with open({r}) as rpipe:
      52                  rpipe.read()
      53              """))
      54  
      55      t = threading.Thread(target=run)
      56      t.start()
      57  
      58      yield
      59  
      60      with open(w, 'w') as spipe:
      61          spipe.write('done')
      62      t.join()
      63  
      64  
      65  class ESC[4;38;5;81mTestBase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      66  
      67      def tearDown(self):
      68          clean_up_interpreters()
      69  
      70  
      71  class ESC[4;38;5;81mCreateTests(ESC[4;38;5;149mTestBase):
      72  
      73      def test_in_main(self):
      74          interp = interpreters.create()
      75          self.assertIsInstance(interp, interpreters.Interpreter)
      76          self.assertIn(interp, interpreters.list_all())
      77  
      78      def test_in_thread(self):
      79          lock = threading.Lock()
      80          interp = None
      81          def f():
      82              nonlocal interp
      83              interp = interpreters.create()
      84              lock.acquire()
      85              lock.release()
      86          t = threading.Thread(target=f)
      87          with lock:
      88              t.start()
      89          t.join()
      90          self.assertIn(interp, interpreters.list_all())
      91  
      92      def test_in_subinterpreter(self):
      93          main, = interpreters.list_all()
      94          interp = interpreters.create()
      95          out = _run_output(interp, dedent("""
      96              from test.support import interpreters
      97              interp = interpreters.create()
      98              print(interp.id)
      99              """))
     100          interp2 = interpreters.Interpreter(int(out))
     101          self.assertEqual(interpreters.list_all(), [main, interp, interp2])
     102  
     103      def test_after_destroy_all(self):
     104          before = set(interpreters.list_all())
     105          # Create 3 subinterpreters.
     106          interp_lst = []
     107          for _ in range(3):
     108              interps = interpreters.create()
     109              interp_lst.append(interps)
     110          # Now destroy them.
     111          for interp in interp_lst:
     112              interp.close()
     113          # Finally, create another.
     114          interp = interpreters.create()
     115          self.assertEqual(set(interpreters.list_all()), before | {interp})
     116  
     117      def test_after_destroy_some(self):
     118          before = set(interpreters.list_all())
     119          # Create 3 subinterpreters.
     120          interp1 = interpreters.create()
     121          interp2 = interpreters.create()
     122          interp3 = interpreters.create()
     123          # Now destroy 2 of them.
     124          interp1.close()
     125          interp2.close()
     126          # Finally, create another.
     127          interp = interpreters.create()
     128          self.assertEqual(set(interpreters.list_all()), before | {interp3, interp})
     129  
     130  
     131  class ESC[4;38;5;81mGetCurrentTests(ESC[4;38;5;149mTestBase):
     132  
     133      def test_main(self):
     134          main = interpreters.get_main()
     135          current = interpreters.get_current()
     136          self.assertEqual(current, main)
     137  
     138      def test_subinterpreter(self):
     139          main = _interpreters.get_main()
     140          interp = interpreters.create()
     141          out = _run_output(interp, dedent("""
     142              from test.support import interpreters
     143              cur = interpreters.get_current()
     144              print(cur.id)
     145              """))
     146          current = interpreters.Interpreter(int(out))
     147          self.assertNotEqual(current, main)
     148  
     149  
     150  class ESC[4;38;5;81mListAllTests(ESC[4;38;5;149mTestBase):
     151  
     152      def test_initial(self):
     153          interps = interpreters.list_all()
     154          self.assertEqual(1, len(interps))
     155  
     156      def test_after_creating(self):
     157          main = interpreters.get_current()
     158          first = interpreters.create()
     159          second = interpreters.create()
     160  
     161          ids = []
     162          for interp in interpreters.list_all():
     163              ids.append(interp.id)
     164  
     165          self.assertEqual(ids, [main.id, first.id, second.id])
     166  
     167      def test_after_destroying(self):
     168          main = interpreters.get_current()
     169          first = interpreters.create()
     170          second = interpreters.create()
     171          first.close()
     172  
     173          ids = []
     174          for interp in interpreters.list_all():
     175              ids.append(interp.id)
     176  
     177          self.assertEqual(ids, [main.id, second.id])
     178  
     179  
     180  class ESC[4;38;5;81mTestInterpreterAttrs(ESC[4;38;5;149mTestBase):
     181  
     182      def test_id_type(self):
     183          main = interpreters.get_main()
     184          current = interpreters.get_current()
     185          interp = interpreters.create()
     186          self.assertIsInstance(main.id, _interpreters.InterpreterID)
     187          self.assertIsInstance(current.id, _interpreters.InterpreterID)
     188          self.assertIsInstance(interp.id, _interpreters.InterpreterID)
     189  
     190      def test_main_id(self):
     191          main = interpreters.get_main()
     192          self.assertEqual(main.id, 0)
     193  
     194      def test_custom_id(self):
     195          interp = interpreters.Interpreter(1)
     196          self.assertEqual(interp.id, 1)
     197  
     198          with self.assertRaises(TypeError):
     199              interpreters.Interpreter('1')
     200  
     201      def test_id_readonly(self):
     202          interp = interpreters.Interpreter(1)
     203          with self.assertRaises(AttributeError):
     204              interp.id = 2
     205  
     206      @unittest.skip('not ready yet (see bpo-32604)')
     207      def test_main_isolated(self):
     208          main = interpreters.get_main()
     209          self.assertFalse(main.isolated)
     210  
     211      @unittest.skip('not ready yet (see bpo-32604)')
     212      def test_subinterpreter_isolated_default(self):
     213          interp = interpreters.create()
     214          self.assertFalse(interp.isolated)
     215  
     216      def test_subinterpreter_isolated_explicit(self):
     217          interp1 = interpreters.create(isolated=True)
     218          interp2 = interpreters.create(isolated=False)
     219          self.assertTrue(interp1.isolated)
     220          self.assertFalse(interp2.isolated)
     221  
     222      @unittest.skip('not ready yet (see bpo-32604)')
     223      def test_custom_isolated_default(self):
     224          interp = interpreters.Interpreter(1)
     225          self.assertFalse(interp.isolated)
     226  
     227      def test_custom_isolated_explicit(self):
     228          interp1 = interpreters.Interpreter(1, isolated=True)
     229          interp2 = interpreters.Interpreter(1, isolated=False)
     230          self.assertTrue(interp1.isolated)
     231          self.assertFalse(interp2.isolated)
     232  
     233      def test_isolated_readonly(self):
     234          interp = interpreters.Interpreter(1)
     235          with self.assertRaises(AttributeError):
     236              interp.isolated = True
     237  
     238      def test_equality(self):
     239          interp1 = interpreters.create()
     240          interp2 = interpreters.create()
     241          self.assertEqual(interp1, interp1)
     242          self.assertNotEqual(interp1, interp2)
     243  
     244  
     245  class ESC[4;38;5;81mTestInterpreterIsRunning(ESC[4;38;5;149mTestBase):
     246  
     247      def test_main(self):
     248          main = interpreters.get_main()
     249          self.assertTrue(main.is_running())
     250  
     251      @unittest.skip('Fails on FreeBSD')
     252      def test_subinterpreter(self):
     253          interp = interpreters.create()
     254          self.assertFalse(interp.is_running())
     255  
     256          with _running(interp):
     257              self.assertTrue(interp.is_running())
     258          self.assertFalse(interp.is_running())
     259  
     260      def test_from_subinterpreter(self):
     261          interp = interpreters.create()
     262          out = _run_output(interp, dedent(f"""
     263              import _xxsubinterpreters as _interpreters
     264              if _interpreters.is_running({interp.id}):
     265                  print(True)
     266              else:
     267                  print(False)
     268              """))
     269          self.assertEqual(out.strip(), 'True')
     270  
     271      def test_already_destroyed(self):
     272          interp = interpreters.create()
     273          interp.close()
     274          with self.assertRaises(RuntimeError):
     275              interp.is_running()
     276  
     277      def test_does_not_exist(self):
     278          interp = interpreters.Interpreter(1_000_000)
     279          with self.assertRaises(RuntimeError):
     280              interp.is_running()
     281  
     282      def test_bad_id(self):
     283          interp = interpreters.Interpreter(-1)
     284          with self.assertRaises(ValueError):
     285              interp.is_running()
     286  
     287  
     288  class ESC[4;38;5;81mTestInterpreterClose(ESC[4;38;5;149mTestBase):
     289  
     290      def test_basic(self):
     291          main = interpreters.get_main()
     292          interp1 = interpreters.create()
     293          interp2 = interpreters.create()
     294          interp3 = interpreters.create()
     295          self.assertEqual(set(interpreters.list_all()),
     296                           {main, interp1, interp2, interp3})
     297          interp2.close()
     298          self.assertEqual(set(interpreters.list_all()),
     299                           {main, interp1, interp3})
     300  
     301      def test_all(self):
     302          before = set(interpreters.list_all())
     303          interps = set()
     304          for _ in range(3):
     305              interp = interpreters.create()
     306              interps.add(interp)
     307          self.assertEqual(set(interpreters.list_all()), before | interps)
     308          for interp in interps:
     309              interp.close()
     310          self.assertEqual(set(interpreters.list_all()), before)
     311  
     312      def test_main(self):
     313          main, = interpreters.list_all()
     314          with self.assertRaises(RuntimeError):
     315              main.close()
     316  
     317          def f():
     318              with self.assertRaises(RuntimeError):
     319                  main.close()
     320  
     321          t = threading.Thread(target=f)
     322          t.start()
     323          t.join()
     324  
     325      def test_already_destroyed(self):
     326          interp = interpreters.create()
     327          interp.close()
     328          with self.assertRaises(RuntimeError):
     329              interp.close()
     330  
     331      def test_does_not_exist(self):
     332          interp = interpreters.Interpreter(1_000_000)
     333          with self.assertRaises(RuntimeError):
     334              interp.close()
     335  
     336      def test_bad_id(self):
     337          interp = interpreters.Interpreter(-1)
     338          with self.assertRaises(ValueError):
     339              interp.close()
     340  
     341      def test_from_current(self):
     342          main, = interpreters.list_all()
     343          interp = interpreters.create()
     344          out = _run_output(interp, dedent(f"""
     345              from test.support import interpreters
     346              interp = interpreters.Interpreter({int(interp.id)})
     347              try:
     348                  interp.close()
     349              except RuntimeError:
     350                  print('failed')
     351              """))
     352          self.assertEqual(out.strip(), 'failed')
     353          self.assertEqual(set(interpreters.list_all()), {main, interp})
     354  
     355      def test_from_sibling(self):
     356          main, = interpreters.list_all()
     357          interp1 = interpreters.create()
     358          interp2 = interpreters.create()
     359          self.assertEqual(set(interpreters.list_all()),
     360                           {main, interp1, interp2})
     361          interp1.run(dedent(f"""
     362              from test.support import interpreters
     363              interp2 = interpreters.Interpreter(int({interp2.id}))
     364              interp2.close()
     365              interp3 = interpreters.create()
     366              interp3.close()
     367              """))
     368          self.assertEqual(set(interpreters.list_all()), {main, interp1})
     369  
     370      def test_from_other_thread(self):
     371          interp = interpreters.create()
     372          def f():
     373              interp.close()
     374  
     375          t = threading.Thread(target=f)
     376          t.start()
     377          t.join()
     378  
     379      @unittest.skip('Fails on FreeBSD')
     380      def test_still_running(self):
     381          main, = interpreters.list_all()
     382          interp = interpreters.create()
     383          with _running(interp):
     384              with self.assertRaises(RuntimeError):
     385                  interp.close()
     386              self.assertTrue(interp.is_running())
     387  
     388  
     389  class ESC[4;38;5;81mTestInterpreterRun(ESC[4;38;5;149mTestBase):
     390  
     391      def test_success(self):
     392          interp = interpreters.create()
     393          script, file = _captured_script('print("it worked!", end="")')
     394          with file:
     395              interp.run(script)
     396              out = file.read()
     397  
     398          self.assertEqual(out, 'it worked!')
     399  
     400      def test_in_thread(self):
     401          interp = interpreters.create()
     402          script, file = _captured_script('print("it worked!", end="")')
     403          with file:
     404              def f():
     405                  interp.run(script)
     406  
     407              t = threading.Thread(target=f)
     408              t.start()
     409              t.join()
     410              out = file.read()
     411  
     412          self.assertEqual(out, 'it worked!')
     413  
     414      @support.requires_fork()
     415      def test_fork(self):
     416          interp = interpreters.create()
     417          import tempfile
     418          with tempfile.NamedTemporaryFile('w+', encoding='utf-8') as file:
     419              file.write('')
     420              file.flush()
     421  
     422              expected = 'spam spam spam spam spam'
     423              script = dedent(f"""
     424                  import os
     425                  try:
     426                      os.fork()
     427                  except RuntimeError:
     428                      with open('{file.name}', 'w', encoding='utf-8') as out:
     429                          out.write('{expected}')
     430                  """)
     431              interp.run(script)
     432  
     433              file.seek(0)
     434              content = file.read()
     435              self.assertEqual(content, expected)
     436  
     437      @unittest.skip('Fails on FreeBSD')
     438      def test_already_running(self):
     439          interp = interpreters.create()
     440          with _running(interp):
     441              with self.assertRaises(RuntimeError):
     442                  interp.run('print("spam")')
     443  
     444      def test_does_not_exist(self):
     445          interp = interpreters.Interpreter(1_000_000)
     446          with self.assertRaises(RuntimeError):
     447              interp.run('print("spam")')
     448  
     449      def test_bad_id(self):
     450          interp = interpreters.Interpreter(-1)
     451          with self.assertRaises(ValueError):
     452              interp.run('print("spam")')
     453  
     454      def test_bad_script(self):
     455          interp = interpreters.create()
     456          with self.assertRaises(TypeError):
     457              interp.run(10)
     458  
     459      def test_bytes_for_script(self):
     460          interp = interpreters.create()
     461          with self.assertRaises(TypeError):
     462              interp.run(b'print("spam")')
     463  
     464      # test_xxsubinterpreters covers the remaining Interpreter.run() behavior.
     465  
     466  
     467  @unittest.skip('these are crashing, likely just due just to _xxsubinterpreters (see gh-105699)')
     468  class ESC[4;38;5;81mStressTests(ESC[4;38;5;149mTestBase):
     469  
     470      # In these tests we generally want a lot of interpreters,
     471      # but not so many that any test takes too long.
     472  
     473      @support.requires_resource('cpu')
     474      def test_create_many_sequential(self):
     475          alive = []
     476          for _ in range(100):
     477              interp = interpreters.create()
     478              alive.append(interp)
     479  
     480      @support.requires_resource('cpu')
     481      def test_create_many_threaded(self):
     482          alive = []
     483          def task():
     484              interp = interpreters.create()
     485              alive.append(interp)
     486          threads = (threading.Thread(target=task) for _ in range(200))
     487          with threading_helper.start_threads(threads):
     488              pass
     489  
     490  
     491  class ESC[4;38;5;81mTestIsShareable(ESC[4;38;5;149mTestBase):
     492  
     493      def test_default_shareables(self):
     494          shareables = [
     495                  # singletons
     496                  None,
     497                  # builtin objects
     498                  b'spam',
     499                  'spam',
     500                  10,
     501                  -10,
     502                  ]
     503          for obj in shareables:
     504              with self.subTest(obj):
     505                  shareable = interpreters.is_shareable(obj)
     506                  self.assertTrue(shareable)
     507  
     508      def test_not_shareable(self):
     509          class ESC[4;38;5;81mCheese:
     510              def __init__(self, name):
     511                  self.name = name
     512              def __str__(self):
     513                  return self.name
     514  
     515          class ESC[4;38;5;81mSubBytes(ESC[4;38;5;149mbytes):
     516              """A subclass of a shareable type."""
     517  
     518          not_shareables = [
     519                  # singletons
     520                  True,
     521                  False,
     522                  NotImplemented,
     523                  ...,
     524                  # builtin types and objects
     525                  type,
     526                  object,
     527                  object(),
     528                  Exception(),
     529                  100.0,
     530                  # user-defined types and objects
     531                  Cheese,
     532                  Cheese('Wensleydale'),
     533                  SubBytes(b'spam'),
     534                  ]
     535          for obj in not_shareables:
     536              with self.subTest(repr(obj)):
     537                  self.assertFalse(
     538                      interpreters.is_shareable(obj))
     539  
     540  
     541  class ESC[4;38;5;81mTestChannels(ESC[4;38;5;149mTestBase):
     542  
     543      def test_create(self):
     544          r, s = interpreters.create_channel()
     545          self.assertIsInstance(r, interpreters.RecvChannel)
     546          self.assertIsInstance(s, interpreters.SendChannel)
     547  
     548      def test_list_all(self):
     549          self.assertEqual(interpreters.list_all_channels(), [])
     550          created = set()
     551          for _ in range(3):
     552              ch = interpreters.create_channel()
     553              created.add(ch)
     554          after = set(interpreters.list_all_channels())
     555          self.assertEqual(after, created)
     556  
     557  
     558  class ESC[4;38;5;81mTestRecvChannelAttrs(ESC[4;38;5;149mTestBase):
     559  
     560      def test_id_type(self):
     561          rch, _ = interpreters.create_channel()
     562          self.assertIsInstance(rch.id, _channels.ChannelID)
     563  
     564      def test_custom_id(self):
     565          rch = interpreters.RecvChannel(1)
     566          self.assertEqual(rch.id, 1)
     567  
     568          with self.assertRaises(TypeError):
     569              interpreters.RecvChannel('1')
     570  
     571      def test_id_readonly(self):
     572          rch = interpreters.RecvChannel(1)
     573          with self.assertRaises(AttributeError):
     574              rch.id = 2
     575  
     576      def test_equality(self):
     577          ch1, _ = interpreters.create_channel()
     578          ch2, _ = interpreters.create_channel()
     579          self.assertEqual(ch1, ch1)
     580          self.assertNotEqual(ch1, ch2)
     581  
     582  
     583  class ESC[4;38;5;81mTestSendChannelAttrs(ESC[4;38;5;149mTestBase):
     584  
     585      def test_id_type(self):
     586          _, sch = interpreters.create_channel()
     587          self.assertIsInstance(sch.id, _channels.ChannelID)
     588  
     589      def test_custom_id(self):
     590          sch = interpreters.SendChannel(1)
     591          self.assertEqual(sch.id, 1)
     592  
     593          with self.assertRaises(TypeError):
     594              interpreters.SendChannel('1')
     595  
     596      def test_id_readonly(self):
     597          sch = interpreters.SendChannel(1)
     598          with self.assertRaises(AttributeError):
     599              sch.id = 2
     600  
     601      def test_equality(self):
     602          _, ch1 = interpreters.create_channel()
     603          _, ch2 = interpreters.create_channel()
     604          self.assertEqual(ch1, ch1)
     605          self.assertNotEqual(ch1, ch2)
     606  
     607  
     608  class ESC[4;38;5;81mTestSendRecv(ESC[4;38;5;149mTestBase):
     609  
     610      def test_send_recv_main(self):
     611          r, s = interpreters.create_channel()
     612          orig = b'spam'
     613          s.send_nowait(orig)
     614          obj = r.recv()
     615  
     616          self.assertEqual(obj, orig)
     617          self.assertIsNot(obj, orig)
     618  
     619      def test_send_recv_same_interpreter(self):
     620          interp = interpreters.create()
     621          interp.run(dedent("""
     622              from test.support import interpreters
     623              r, s = interpreters.create_channel()
     624              orig = b'spam'
     625              s.send_nowait(orig)
     626              obj = r.recv()
     627              assert obj == orig, 'expected: obj == orig'
     628              assert obj is not orig, 'expected: obj is not orig'
     629              """))
     630  
     631      @unittest.skip('broken (see BPO-...)')
     632      def test_send_recv_different_interpreters(self):
     633          r1, s1 = interpreters.create_channel()
     634          r2, s2 = interpreters.create_channel()
     635          orig1 = b'spam'
     636          s1.send_nowait(orig1)
     637          out = _run_output(
     638              interpreters.create(),
     639              dedent(f"""
     640                  obj1 = r.recv()
     641                  assert obj1 == b'spam', 'expected: obj1 == orig1'
     642                  # When going to another interpreter we get a copy.
     643                  assert id(obj1) != {id(orig1)}, 'expected: obj1 is not orig1'
     644                  orig2 = b'eggs'
     645                  print(id(orig2))
     646                  s.send_nowait(orig2)
     647                  """),
     648              channels=dict(r=r1, s=s2),
     649              )
     650          obj2 = r2.recv()
     651  
     652          self.assertEqual(obj2, b'eggs')
     653          self.assertNotEqual(id(obj2), int(out))
     654  
     655      def test_send_recv_different_threads(self):
     656          r, s = interpreters.create_channel()
     657  
     658          def f():
     659              while True:
     660                  try:
     661                      obj = r.recv()
     662                      break
     663                  except interpreters.ChannelEmptyError:
     664                      time.sleep(0.1)
     665              s.send(obj)
     666          t = threading.Thread(target=f)
     667          t.start()
     668  
     669          orig = b'spam'
     670          s.send(orig)
     671          t.join()
     672          obj = r.recv()
     673  
     674          self.assertEqual(obj, orig)
     675          self.assertIsNot(obj, orig)
     676  
     677      def test_send_recv_nowait_main(self):
     678          r, s = interpreters.create_channel()
     679          orig = b'spam'
     680          s.send_nowait(orig)
     681          obj = r.recv_nowait()
     682  
     683          self.assertEqual(obj, orig)
     684          self.assertIsNot(obj, orig)
     685  
     686      def test_send_recv_nowait_main_with_default(self):
     687          r, _ = interpreters.create_channel()
     688          obj = r.recv_nowait(None)
     689  
     690          self.assertIsNone(obj)
     691  
     692      def test_send_recv_nowait_same_interpreter(self):
     693          interp = interpreters.create()
     694          interp.run(dedent("""
     695              from test.support import interpreters
     696              r, s = interpreters.create_channel()
     697              orig = b'spam'
     698              s.send_nowait(orig)
     699              obj = r.recv_nowait()
     700              assert obj == orig, 'expected: obj == orig'
     701              # When going back to the same interpreter we get the same object.
     702              assert obj is not orig, 'expected: obj is not orig'
     703              """))
     704  
     705      @unittest.skip('broken (see BPO-...)')
     706      def test_send_recv_nowait_different_interpreters(self):
     707          r1, s1 = interpreters.create_channel()
     708          r2, s2 = interpreters.create_channel()
     709          orig1 = b'spam'
     710          s1.send_nowait(orig1)
     711          out = _run_output(
     712              interpreters.create(),
     713              dedent(f"""
     714                  obj1 = r.recv_nowait()
     715                  assert obj1 == b'spam', 'expected: obj1 == orig1'
     716                  # When going to another interpreter we get a copy.
     717                  assert id(obj1) != {id(orig1)}, 'expected: obj1 is not orig1'
     718                  orig2 = b'eggs'
     719                  print(id(orig2))
     720                  s.send_nowait(orig2)
     721                  """),
     722              channels=dict(r=r1, s=s2),
     723              )
     724          obj2 = r2.recv_nowait()
     725  
     726          self.assertEqual(obj2, b'eggs')
     727          self.assertNotEqual(id(obj2), int(out))
     728  
     729      def test_recv_channel_does_not_exist(self):
     730          ch = interpreters.RecvChannel(1_000_000)
     731          with self.assertRaises(interpreters.ChannelNotFoundError):
     732              ch.recv()
     733  
     734      def test_send_channel_does_not_exist(self):
     735          ch = interpreters.SendChannel(1_000_000)
     736          with self.assertRaises(interpreters.ChannelNotFoundError):
     737              ch.send(b'spam')
     738  
     739      def test_recv_nowait_channel_does_not_exist(self):
     740          ch = interpreters.RecvChannel(1_000_000)
     741          with self.assertRaises(interpreters.ChannelNotFoundError):
     742              ch.recv_nowait()
     743  
     744      def test_send_nowait_channel_does_not_exist(self):
     745          ch = interpreters.SendChannel(1_000_000)
     746          with self.assertRaises(interpreters.ChannelNotFoundError):
     747              ch.send_nowait(b'spam')
     748  
     749      def test_recv_nowait_empty(self):
     750          ch, _ = interpreters.create_channel()
     751          with self.assertRaises(interpreters.ChannelEmptyError):
     752              ch.recv_nowait()
     753  
     754      def test_recv_nowait_default(self):
     755          default = object()
     756          rch, sch = interpreters.create_channel()
     757          obj1 = rch.recv_nowait(default)
     758          sch.send_nowait(None)
     759          sch.send_nowait(1)
     760          sch.send_nowait(b'spam')
     761          sch.send_nowait(b'eggs')
     762          obj2 = rch.recv_nowait(default)
     763          obj3 = rch.recv_nowait(default)
     764          obj4 = rch.recv_nowait()
     765          obj5 = rch.recv_nowait(default)
     766          obj6 = rch.recv_nowait(default)
     767  
     768          self.assertIs(obj1, default)
     769          self.assertIs(obj2, None)
     770          self.assertEqual(obj3, 1)
     771          self.assertEqual(obj4, b'spam')
     772          self.assertEqual(obj5, b'eggs')
     773          self.assertIs(obj6, default)