python (3.11.7)

(root)/
lib/
python3.11/
test/
test_interpreters.py
       1  import contextlib
       2  import os
       3  import threading
       4  from textwrap import dedent
       5  import unittest
       6  import time
       7  
       8  from test import support
       9  from test.support import import_helper
      10  _interpreters = import_helper.import_module('_xxsubinterpreters')
      11  from test.support import interpreters
      12  
      13  
      14  def _captured_script(script):
      15      r, w = os.pipe()
      16      indented = script.replace('\n', '\n                ')
      17      wrapped = dedent(f"""
      18          import contextlib
      19          with open({w}, 'w', encoding='utf-8') as spipe:
      20              with contextlib.redirect_stdout(spipe):
      21                  {indented}
      22          """)
      23      return wrapped, open(r, encoding='utf-8')
      24  
      25  
      26  def clean_up_interpreters():
      27      for interp in interpreters.list_all():
      28          if interp.id == 0:  # main
      29              continue
      30          try:
      31              interp.close()
      32          except RuntimeError:
      33              pass  # already destroyed
      34  
      35  
      36  def _run_output(interp, request, channels=None):
      37      script, rpipe = _captured_script(request)
      38      with rpipe:
      39          interp.run(script, channels=channels)
      40          return rpipe.read()
      41  
      42  
      43  @contextlib.contextmanager
      44  def _running(interp):
      45      r, w = os.pipe()
      46      def run():
      47          interp.run(dedent(f"""
      48              # wait for "signal"
      49              with open({r}) as rpipe:
      50                  rpipe.read()
      51              """))
      52  
      53      t = threading.Thread(target=run)
      54      t.start()
      55  
      56      yield
      57  
      58      with open(w, 'w') as spipe:
      59          spipe.write('done')
      60      t.join()
      61  
      62  
      63  class ESC[4;38;5;81mTestBase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
      64  
      65      def tearDown(self):
      66          clean_up_interpreters()
      67  
      68  
      69  class ESC[4;38;5;81mCreateTests(ESC[4;38;5;149mTestBase):
      70  
      71      def test_in_main(self):
      72          interp = interpreters.create()
      73          self.assertIsInstance(interp, interpreters.Interpreter)
      74          self.assertIn(interp, interpreters.list_all())
      75  
      76      def test_in_thread(self):
      77          lock = threading.Lock()
      78          interp = None
      79          def f():
      80              nonlocal interp
      81              interp = interpreters.create()
      82              lock.acquire()
      83              lock.release()
      84          t = threading.Thread(target=f)
      85          with lock:
      86              t.start()
      87          t.join()
      88          self.assertIn(interp, interpreters.list_all())
      89  
      90      def test_in_subinterpreter(self):
      91          main, = interpreters.list_all()
      92          interp = interpreters.create()
      93          out = _run_output(interp, dedent("""
      94              from test.support import interpreters
      95              interp = interpreters.create()
      96              print(interp.id)
      97              """))
      98          interp2 = interpreters.Interpreter(int(out))
      99          self.assertEqual(interpreters.list_all(), [main, interp, interp2])
     100  
     101      def test_after_destroy_all(self):
     102          before = set(interpreters.list_all())
     103          # Create 3 subinterpreters.
     104          interp_lst = []
     105          for _ in range(3):
     106              interps = interpreters.create()
     107              interp_lst.append(interps)
     108          # Now destroy them.
     109          for interp in interp_lst:
     110              interp.close()
     111          # Finally, create another.
     112          interp = interpreters.create()
     113          self.assertEqual(set(interpreters.list_all()), before | {interp})
     114  
     115      def test_after_destroy_some(self):
     116          before = set(interpreters.list_all())
     117          # Create 3 subinterpreters.
     118          interp1 = interpreters.create()
     119          interp2 = interpreters.create()
     120          interp3 = interpreters.create()
     121          # Now destroy 2 of them.
     122          interp1.close()
     123          interp2.close()
     124          # Finally, create another.
     125          interp = interpreters.create()
     126          self.assertEqual(set(interpreters.list_all()), before | {interp3, interp})
     127  
     128  
     129  class ESC[4;38;5;81mGetCurrentTests(ESC[4;38;5;149mTestBase):
     130  
     131      def test_main(self):
     132          main = interpreters.get_main()
     133          current = interpreters.get_current()
     134          self.assertEqual(current, main)
     135  
     136      def test_subinterpreter(self):
     137          main = _interpreters.get_main()
     138          interp = interpreters.create()
     139          out = _run_output(interp, dedent("""
     140              from test.support import interpreters
     141              cur = interpreters.get_current()
     142              print(cur.id)
     143              """))
     144          current = interpreters.Interpreter(int(out))
     145          self.assertNotEqual(current, main)
     146  
     147  
     148  class ESC[4;38;5;81mListAllTests(ESC[4;38;5;149mTestBase):
     149  
     150      def test_initial(self):
     151          interps = interpreters.list_all()
     152          self.assertEqual(1, len(interps))
     153  
     154      def test_after_creating(self):
     155          main = interpreters.get_current()
     156          first = interpreters.create()
     157          second = interpreters.create()
     158  
     159          ids = []
     160          for interp in interpreters.list_all():
     161              ids.append(interp.id)
     162  
     163          self.assertEqual(ids, [main.id, first.id, second.id])
     164  
     165      def test_after_destroying(self):
     166          main = interpreters.get_current()
     167          first = interpreters.create()
     168          second = interpreters.create()
     169          first.close()
     170  
     171          ids = []
     172          for interp in interpreters.list_all():
     173              ids.append(interp.id)
     174  
     175          self.assertEqual(ids, [main.id, second.id])
     176  
     177  
     178  class ESC[4;38;5;81mTestInterpreterAttrs(ESC[4;38;5;149mTestBase):
     179  
     180      def test_id_type(self):
     181          main = interpreters.get_main()
     182          current = interpreters.get_current()
     183          interp = interpreters.create()
     184          self.assertIsInstance(main.id, _interpreters.InterpreterID)
     185          self.assertIsInstance(current.id, _interpreters.InterpreterID)
     186          self.assertIsInstance(interp.id, _interpreters.InterpreterID)
     187  
     188      def test_main_id(self):
     189          main = interpreters.get_main()
     190          self.assertEqual(main.id, 0)
     191  
     192      def test_custom_id(self):
     193          interp = interpreters.Interpreter(1)
     194          self.assertEqual(interp.id, 1)
     195  
     196          with self.assertRaises(TypeError):
     197              interpreters.Interpreter('1')
     198  
     199      def test_id_readonly(self):
     200          interp = interpreters.Interpreter(1)
     201          with self.assertRaises(AttributeError):
     202              interp.id = 2
     203  
     204      @unittest.skip('not ready yet (see bpo-32604)')
     205      def test_main_isolated(self):
     206          main = interpreters.get_main()
     207          self.assertFalse(main.isolated)
     208  
     209      @unittest.skip('not ready yet (see bpo-32604)')
     210      def test_subinterpreter_isolated_default(self):
     211          interp = interpreters.create()
     212          self.assertFalse(interp.isolated)
     213  
     214      def test_subinterpreter_isolated_explicit(self):
     215          interp1 = interpreters.create(isolated=True)
     216          interp2 = interpreters.create(isolated=False)
     217          self.assertTrue(interp1.isolated)
     218          self.assertFalse(interp2.isolated)
     219  
     220      @unittest.skip('not ready yet (see bpo-32604)')
     221      def test_custom_isolated_default(self):
     222          interp = interpreters.Interpreter(1)
     223          self.assertFalse(interp.isolated)
     224  
     225      def test_custom_isolated_explicit(self):
     226          interp1 = interpreters.Interpreter(1, isolated=True)
     227          interp2 = interpreters.Interpreter(1, isolated=False)
     228          self.assertTrue(interp1.isolated)
     229          self.assertFalse(interp2.isolated)
     230  
     231      def test_isolated_readonly(self):
     232          interp = interpreters.Interpreter(1)
     233          with self.assertRaises(AttributeError):
     234              interp.isolated = True
     235  
     236      def test_equality(self):
     237          interp1 = interpreters.create()
     238          interp2 = interpreters.create()
     239          self.assertEqual(interp1, interp1)
     240          self.assertNotEqual(interp1, interp2)
     241  
     242  
     243  class ESC[4;38;5;81mTestInterpreterIsRunning(ESC[4;38;5;149mTestBase):
     244  
     245      def test_main(self):
     246          main = interpreters.get_main()
     247          self.assertTrue(main.is_running())
     248  
     249      @unittest.skip('Fails on FreeBSD')
     250      def test_subinterpreter(self):
     251          interp = interpreters.create()
     252          self.assertFalse(interp.is_running())
     253  
     254          with _running(interp):
     255              self.assertTrue(interp.is_running())
     256          self.assertFalse(interp.is_running())
     257  
     258      def test_from_subinterpreter(self):
     259          interp = interpreters.create()
     260          out = _run_output(interp, dedent(f"""
     261              import _xxsubinterpreters as _interpreters
     262              if _interpreters.is_running({interp.id}):
     263                  print(True)
     264              else:
     265                  print(False)
     266              """))
     267          self.assertEqual(out.strip(), 'True')
     268  
     269      def test_already_destroyed(self):
     270          interp = interpreters.create()
     271          interp.close()
     272          with self.assertRaises(RuntimeError):
     273              interp.is_running()
     274  
     275      def test_does_not_exist(self):
     276          interp = interpreters.Interpreter(1_000_000)
     277          with self.assertRaises(RuntimeError):
     278              interp.is_running()
     279  
     280      def test_bad_id(self):
     281          interp = interpreters.Interpreter(-1)
     282          with self.assertRaises(ValueError):
     283              interp.is_running()
     284  
     285  
     286  class ESC[4;38;5;81mTestInterpreterClose(ESC[4;38;5;149mTestBase):
     287  
     288      def test_basic(self):
     289          main = interpreters.get_main()
     290          interp1 = interpreters.create()
     291          interp2 = interpreters.create()
     292          interp3 = interpreters.create()
     293          self.assertEqual(set(interpreters.list_all()),
     294                           {main, interp1, interp2, interp3})
     295          interp2.close()
     296          self.assertEqual(set(interpreters.list_all()),
     297                           {main, interp1, interp3})
     298  
     299      def test_all(self):
     300          before = set(interpreters.list_all())
     301          interps = set()
     302          for _ in range(3):
     303              interp = interpreters.create()
     304              interps.add(interp)
     305          self.assertEqual(set(interpreters.list_all()), before | interps)
     306          for interp in interps:
     307              interp.close()
     308          self.assertEqual(set(interpreters.list_all()), before)
     309  
     310      def test_main(self):
     311          main, = interpreters.list_all()
     312          with self.assertRaises(RuntimeError):
     313              main.close()
     314  
     315          def f():
     316              with self.assertRaises(RuntimeError):
     317                  main.close()
     318  
     319          t = threading.Thread(target=f)
     320          t.start()
     321          t.join()
     322  
     323      def test_already_destroyed(self):
     324          interp = interpreters.create()
     325          interp.close()
     326          with self.assertRaises(RuntimeError):
     327              interp.close()
     328  
     329      def test_does_not_exist(self):
     330          interp = interpreters.Interpreter(1_000_000)
     331          with self.assertRaises(RuntimeError):
     332              interp.close()
     333  
     334      def test_bad_id(self):
     335          interp = interpreters.Interpreter(-1)
     336          with self.assertRaises(ValueError):
     337              interp.close()
     338  
     339      def test_from_current(self):
     340          main, = interpreters.list_all()
     341          interp = interpreters.create()
     342          out = _run_output(interp, dedent(f"""
     343              from test.support import interpreters
     344              interp = interpreters.Interpreter({int(interp.id)})
     345              try:
     346                  interp.close()
     347              except RuntimeError:
     348                  print('failed')
     349              """))
     350          self.assertEqual(out.strip(), 'failed')
     351          self.assertEqual(set(interpreters.list_all()), {main, interp})
     352  
     353      def test_from_sibling(self):
     354          main, = interpreters.list_all()
     355          interp1 = interpreters.create()
     356          interp2 = interpreters.create()
     357          self.assertEqual(set(interpreters.list_all()),
     358                           {main, interp1, interp2})
     359          interp1.run(dedent(f"""
     360              from test.support import interpreters
     361              interp2 = interpreters.Interpreter(int({interp2.id}))
     362              interp2.close()
     363              interp3 = interpreters.create()
     364              interp3.close()
     365              """))
     366          self.assertEqual(set(interpreters.list_all()), {main, interp1})
     367  
     368      def test_from_other_thread(self):
     369          interp = interpreters.create()
     370          def f():
     371              interp.close()
     372  
     373          t = threading.Thread(target=f)
     374          t.start()
     375          t.join()
     376  
     377      @unittest.skip('Fails on FreeBSD')
     378      def test_still_running(self):
     379          main, = interpreters.list_all()
     380          interp = interpreters.create()
     381          with _running(interp):
     382              with self.assertRaises(RuntimeError):
     383                  interp.close()
     384              self.assertTrue(interp.is_running())
     385  
     386  
     387  class ESC[4;38;5;81mTestInterpreterRun(ESC[4;38;5;149mTestBase):
     388  
     389      def test_success(self):
     390          interp = interpreters.create()
     391          script, file = _captured_script('print("it worked!", end="")')
     392          with file:
     393              interp.run(script)
     394              out = file.read()
     395  
     396          self.assertEqual(out, 'it worked!')
     397  
     398      def test_in_thread(self):
     399          interp = interpreters.create()
     400          script, file = _captured_script('print("it worked!", end="")')
     401          with file:
     402              def f():
     403                  interp.run(script)
     404  
     405              t = threading.Thread(target=f)
     406              t.start()
     407              t.join()
     408              out = file.read()
     409  
     410          self.assertEqual(out, 'it worked!')
     411  
     412      @support.requires_fork()
     413      def test_fork(self):
     414          interp = interpreters.create()
     415          import tempfile
     416          with tempfile.NamedTemporaryFile('w+', encoding='utf-8') as file:
     417              file.write('')
     418              file.flush()
     419  
     420              expected = 'spam spam spam spam spam'
     421              script = dedent(f"""
     422                  import os
     423                  try:
     424                      os.fork()
     425                  except RuntimeError:
     426                      with open('{file.name}', 'w', encoding='utf-8') as out:
     427                          out.write('{expected}')
     428                  """)
     429              interp.run(script)
     430  
     431              file.seek(0)
     432              content = file.read()
     433              self.assertEqual(content, expected)
     434  
     435      @unittest.skip('Fails on FreeBSD')
     436      def test_already_running(self):
     437          interp = interpreters.create()
     438          with _running(interp):
     439              with self.assertRaises(RuntimeError):
     440                  interp.run('print("spam")')
     441  
     442      def test_does_not_exist(self):
     443          interp = interpreters.Interpreter(1_000_000)
     444          with self.assertRaises(RuntimeError):
     445              interp.run('print("spam")')
     446  
     447      def test_bad_id(self):
     448          interp = interpreters.Interpreter(-1)
     449          with self.assertRaises(ValueError):
     450              interp.run('print("spam")')
     451  
     452      def test_bad_script(self):
     453          interp = interpreters.create()
     454          with self.assertRaises(TypeError):
     455              interp.run(10)
     456  
     457      def test_bytes_for_script(self):
     458          interp = interpreters.create()
     459          with self.assertRaises(TypeError):
     460              interp.run(b'print("spam")')
     461  
     462      # test_xxsubinterpreters covers the remaining Interpreter.run() behavior.
     463  
     464  
     465  class ESC[4;38;5;81mTestIsShareable(ESC[4;38;5;149mTestBase):
     466  
     467      def test_default_shareables(self):
     468          shareables = [
     469                  # singletons
     470                  None,
     471                  # builtin objects
     472                  b'spam',
     473                  'spam',
     474                  10,
     475                  -10,
     476                  ]
     477          for obj in shareables:
     478              with self.subTest(obj):
     479                  shareable = interpreters.is_shareable(obj)
     480                  self.assertTrue(shareable)
     481  
     482      def test_not_shareable(self):
     483          class ESC[4;38;5;81mCheese:
     484              def __init__(self, name):
     485                  self.name = name
     486              def __str__(self):
     487                  return self.name
     488  
     489          class ESC[4;38;5;81mSubBytes(ESC[4;38;5;149mbytes):
     490              """A subclass of a shareable type."""
     491  
     492          not_shareables = [
     493                  # singletons
     494                  True,
     495                  False,
     496                  NotImplemented,
     497                  ...,
     498                  # builtin types and objects
     499                  type,
     500                  object,
     501                  object(),
     502                  Exception(),
     503                  100.0,
     504                  # user-defined types and objects
     505                  Cheese,
     506                  Cheese('Wensleydale'),
     507                  SubBytes(b'spam'),
     508                  ]
     509          for obj in not_shareables:
     510              with self.subTest(repr(obj)):
     511                  self.assertFalse(
     512                      interpreters.is_shareable(obj))
     513  
     514  
     515  class ESC[4;38;5;81mTestChannels(ESC[4;38;5;149mTestBase):
     516  
     517      def test_create(self):
     518          r, s = interpreters.create_channel()
     519          self.assertIsInstance(r, interpreters.RecvChannel)
     520          self.assertIsInstance(s, interpreters.SendChannel)
     521  
     522      def test_list_all(self):
     523          self.assertEqual(interpreters.list_all_channels(), [])
     524          created = set()
     525          for _ in range(3):
     526              ch = interpreters.create_channel()
     527              created.add(ch)
     528          after = set(interpreters.list_all_channels())
     529          self.assertEqual(after, created)
     530  
     531  
     532  class ESC[4;38;5;81mTestRecvChannelAttrs(ESC[4;38;5;149mTestBase):
     533  
     534      def test_id_type(self):
     535          rch, _ = interpreters.create_channel()
     536          self.assertIsInstance(rch.id, _interpreters.ChannelID)
     537  
     538      def test_custom_id(self):
     539          rch = interpreters.RecvChannel(1)
     540          self.assertEqual(rch.id, 1)
     541  
     542          with self.assertRaises(TypeError):
     543              interpreters.RecvChannel('1')
     544  
     545      def test_id_readonly(self):
     546          rch = interpreters.RecvChannel(1)
     547          with self.assertRaises(AttributeError):
     548              rch.id = 2
     549  
     550      def test_equality(self):
     551          ch1, _ = interpreters.create_channel()
     552          ch2, _ = interpreters.create_channel()
     553          self.assertEqual(ch1, ch1)
     554          self.assertNotEqual(ch1, ch2)
     555  
     556  
     557  class ESC[4;38;5;81mTestSendChannelAttrs(ESC[4;38;5;149mTestBase):
     558  
     559      def test_id_type(self):
     560          _, sch = interpreters.create_channel()
     561          self.assertIsInstance(sch.id, _interpreters.ChannelID)
     562  
     563      def test_custom_id(self):
     564          sch = interpreters.SendChannel(1)
     565          self.assertEqual(sch.id, 1)
     566  
     567          with self.assertRaises(TypeError):
     568              interpreters.SendChannel('1')
     569  
     570      def test_id_readonly(self):
     571          sch = interpreters.SendChannel(1)
     572          with self.assertRaises(AttributeError):
     573              sch.id = 2
     574  
     575      def test_equality(self):
     576          _, ch1 = interpreters.create_channel()
     577          _, ch2 = interpreters.create_channel()
     578          self.assertEqual(ch1, ch1)
     579          self.assertNotEqual(ch1, ch2)
     580  
     581  
     582  class ESC[4;38;5;81mTestSendRecv(ESC[4;38;5;149mTestBase):
     583  
     584      def test_send_recv_main(self):
     585          r, s = interpreters.create_channel()
     586          orig = b'spam'
     587          s.send_nowait(orig)
     588          obj = r.recv()
     589  
     590          self.assertEqual(obj, orig)
     591          self.assertIsNot(obj, orig)
     592  
     593      def test_send_recv_same_interpreter(self):
     594          interp = interpreters.create()
     595          interp.run(dedent("""
     596              from test.support import interpreters
     597              r, s = interpreters.create_channel()
     598              orig = b'spam'
     599              s.send_nowait(orig)
     600              obj = r.recv()
     601              assert obj == orig, 'expected: obj == orig'
     602              assert obj is not orig, 'expected: obj is not orig'
     603              """))
     604  
     605      @unittest.skip('broken (see BPO-...)')
     606      def test_send_recv_different_interpreters(self):
     607          r1, s1 = interpreters.create_channel()
     608          r2, s2 = interpreters.create_channel()
     609          orig1 = b'spam'
     610          s1.send_nowait(orig1)
     611          out = _run_output(
     612              interpreters.create(),
     613              dedent(f"""
     614                  obj1 = r.recv()
     615                  assert obj1 == b'spam', 'expected: obj1 == orig1'
     616                  # When going to another interpreter we get a copy.
     617                  assert id(obj1) != {id(orig1)}, 'expected: obj1 is not orig1'
     618                  orig2 = b'eggs'
     619                  print(id(orig2))
     620                  s.send_nowait(orig2)
     621                  """),
     622              channels=dict(r=r1, s=s2),
     623              )
     624          obj2 = r2.recv()
     625  
     626          self.assertEqual(obj2, b'eggs')
     627          self.assertNotEqual(id(obj2), int(out))
     628  
     629      def test_send_recv_different_threads(self):
     630          r, s = interpreters.create_channel()
     631  
     632          def f():
     633              while True:
     634                  try:
     635                      obj = r.recv()
     636                      break
     637                  except interpreters.ChannelEmptyError:
     638                      time.sleep(0.1)
     639              s.send(obj)
     640          t = threading.Thread(target=f)
     641          t.start()
     642  
     643          orig = b'spam'
     644          s.send(orig)
     645          t.join()
     646          obj = r.recv()
     647  
     648          self.assertEqual(obj, orig)
     649          self.assertIsNot(obj, orig)
     650  
     651      def test_send_recv_nowait_main(self):
     652          r, s = interpreters.create_channel()
     653          orig = b'spam'
     654          s.send_nowait(orig)
     655          obj = r.recv_nowait()
     656  
     657          self.assertEqual(obj, orig)
     658          self.assertIsNot(obj, orig)
     659  
     660      def test_send_recv_nowait_main_with_default(self):
     661          r, _ = interpreters.create_channel()
     662          obj = r.recv_nowait(None)
     663  
     664          self.assertIsNone(obj)
     665  
     666      def test_send_recv_nowait_same_interpreter(self):
     667          interp = interpreters.create()
     668          interp.run(dedent("""
     669              from test.support import interpreters
     670              r, s = interpreters.create_channel()
     671              orig = b'spam'
     672              s.send_nowait(orig)
     673              obj = r.recv_nowait()
     674              assert obj == orig, 'expected: obj == orig'
     675              # When going back to the same interpreter we get the same object.
     676              assert obj is not orig, 'expected: obj is not orig'
     677              """))
     678  
     679      @unittest.skip('broken (see BPO-...)')
     680      def test_send_recv_nowait_different_interpreters(self):
     681          r1, s1 = interpreters.create_channel()
     682          r2, s2 = interpreters.create_channel()
     683          orig1 = b'spam'
     684          s1.send_nowait(orig1)
     685          out = _run_output(
     686              interpreters.create(),
     687              dedent(f"""
     688                  obj1 = r.recv_nowait()
     689                  assert obj1 == b'spam', 'expected: obj1 == orig1'
     690                  # When going to another interpreter we get a copy.
     691                  assert id(obj1) != {id(orig1)}, 'expected: obj1 is not orig1'
     692                  orig2 = b'eggs'
     693                  print(id(orig2))
     694                  s.send_nowait(orig2)
     695                  """),
     696              channels=dict(r=r1, s=s2),
     697              )
     698          obj2 = r2.recv_nowait()
     699  
     700          self.assertEqual(obj2, b'eggs')
     701          self.assertNotEqual(id(obj2), int(out))
     702  
     703      def test_recv_channel_does_not_exist(self):
     704          ch = interpreters.RecvChannel(1_000_000)
     705          with self.assertRaises(interpreters.ChannelNotFoundError):
     706              ch.recv()
     707  
     708      def test_send_channel_does_not_exist(self):
     709          ch = interpreters.SendChannel(1_000_000)
     710          with self.assertRaises(interpreters.ChannelNotFoundError):
     711              ch.send(b'spam')
     712  
     713      def test_recv_nowait_channel_does_not_exist(self):
     714          ch = interpreters.RecvChannel(1_000_000)
     715          with self.assertRaises(interpreters.ChannelNotFoundError):
     716              ch.recv_nowait()
     717  
     718      def test_send_nowait_channel_does_not_exist(self):
     719          ch = interpreters.SendChannel(1_000_000)
     720          with self.assertRaises(interpreters.ChannelNotFoundError):
     721              ch.send_nowait(b'spam')
     722  
     723      def test_recv_nowait_empty(self):
     724          ch, _ = interpreters.create_channel()
     725          with self.assertRaises(interpreters.ChannelEmptyError):
     726              ch.recv_nowait()
     727  
     728      def test_recv_nowait_default(self):
     729          default = object()
     730          rch, sch = interpreters.create_channel()
     731          obj1 = rch.recv_nowait(default)
     732          sch.send_nowait(None)
     733          sch.send_nowait(1)
     734          sch.send_nowait(b'spam')
     735          sch.send_nowait(b'eggs')
     736          obj2 = rch.recv_nowait(default)
     737          obj3 = rch.recv_nowait(default)
     738          obj4 = rch.recv_nowait()
     739          obj5 = rch.recv_nowait(default)
     740          obj6 = rch.recv_nowait(default)
     741  
     742          self.assertIs(obj1, default)
     743          self.assertIs(obj2, None)
     744          self.assertEqual(obj3, 1)
     745          self.assertEqual(obj4, b'spam')
     746          self.assertEqual(obj5, b'eggs')
     747          self.assertIs(obj6, default)