python (3.12.0)

(root)/
lib/
python3.12/
test/
test__xxinterpchannels.py
       1  from collections import namedtuple
       2  import contextlib
       3  import sys
       4  from textwrap import dedent
       5  import threading
       6  import time
       7  import unittest
       8  
       9  from test.support import import_helper
      10  
      11  from test.test__xxsubinterpreters import (
      12      interpreters,
      13      _run_output,
      14      clean_up_interpreters,
      15  )
      16  
      17  
      18  channels = import_helper.import_module('_xxinterpchannels')
      19  
      20  
      21  ##################################
      22  # helpers
      23  
      24  #@contextmanager
      25  #def run_threaded(id, source, **shared):
      26  #    def run():
      27  #        run_interp(id, source, **shared)
      28  #    t = threading.Thread(target=run)
      29  #    t.start()
      30  #    yield
      31  #    t.join()
      32  
      33  
      34  def run_interp(id, source, **shared):
      35      _run_interp(id, source, shared)
      36  
      37  
      38  def _run_interp(id, source, shared, _mainns={}):
      39      source = dedent(source)
      40      main = interpreters.get_main()
      41      if main == id:
      42          if interpreters.get_current() != main:
      43              raise RuntimeError
      44          # XXX Run a func?
      45          exec(source, _mainns)
      46      else:
      47          interpreters.run_string(id, source, shared)
      48  
      49  
      50  class ESC[4;38;5;81mInterpreter(ESC[4;38;5;149mnamedtuple('Interpreter', 'name id')):
      51  
      52      @classmethod
      53      def from_raw(cls, raw):
      54          if isinstance(raw, cls):
      55              return raw
      56          elif isinstance(raw, str):
      57              return cls(raw)
      58          else:
      59              raise NotImplementedError
      60  
      61      def __new__(cls, name=None, id=None):
      62          main = interpreters.get_main()
      63          if id == main:
      64              if not name:
      65                  name = 'main'
      66              elif name != 'main':
      67                  raise ValueError(
      68                      'name mismatch (expected "main", got "{}")'.format(name))
      69              id = main
      70          elif id is not None:
      71              if not name:
      72                  name = 'interp'
      73              elif name == 'main':
      74                  raise ValueError('name mismatch (unexpected "main")')
      75              if not isinstance(id, interpreters.InterpreterID):
      76                  id = interpreters.InterpreterID(id)
      77          elif not name or name == 'main':
      78              name = 'main'
      79              id = main
      80          else:
      81              id = interpreters.create()
      82          self = super().__new__(cls, name, id)
      83          return self
      84  
      85  
      86  # XXX expect_channel_closed() is unnecessary once we improve exc propagation.
      87  
      88  @contextlib.contextmanager
      89  def expect_channel_closed():
      90      try:
      91          yield
      92      except channels.ChannelClosedError:
      93          pass
      94      else:
      95          assert False, 'channel not closed'
      96  
      97  
      98  class ESC[4;38;5;81mChannelAction(ESC[4;38;5;149mnamedtuple('ChannelAction', 'action end interp')):
      99  
     100      def __new__(cls, action, end=None, interp=None):
     101          if not end:
     102              end = 'both'
     103          if not interp:
     104              interp = 'main'
     105          self = super().__new__(cls, action, end, interp)
     106          return self
     107  
     108      def __init__(self, *args, **kwargs):
     109          if self.action == 'use':
     110              if self.end not in ('same', 'opposite', 'send', 'recv'):
     111                  raise ValueError(self.end)
     112          elif self.action in ('close', 'force-close'):
     113              if self.end not in ('both', 'same', 'opposite', 'send', 'recv'):
     114                  raise ValueError(self.end)
     115          else:
     116              raise ValueError(self.action)
     117          if self.interp not in ('main', 'same', 'other', 'extra'):
     118              raise ValueError(self.interp)
     119  
     120      def resolve_end(self, end):
     121          if self.end == 'same':
     122              return end
     123          elif self.end == 'opposite':
     124              return 'recv' if end == 'send' else 'send'
     125          else:
     126              return self.end
     127  
     128      def resolve_interp(self, interp, other, extra):
     129          if self.interp == 'same':
     130              return interp
     131          elif self.interp == 'other':
     132              if other is None:
     133                  raise RuntimeError
     134              return other
     135          elif self.interp == 'extra':
     136              if extra is None:
     137                  raise RuntimeError
     138              return extra
     139          elif self.interp == 'main':
     140              if interp.name == 'main':
     141                  return interp
     142              elif other and other.name == 'main':
     143                  return other
     144              else:
     145                  raise RuntimeError
     146          # Per __init__(), there aren't any others.
     147  
     148  
     149  class ESC[4;38;5;81mChannelState(ESC[4;38;5;149mnamedtuple('ChannelState', 'pending closed')):
     150  
     151      def __new__(cls, pending=0, *, closed=False):
     152          self = super().__new__(cls, pending, closed)
     153          return self
     154  
     155      def incr(self):
     156          return type(self)(self.pending + 1, closed=self.closed)
     157  
     158      def decr(self):
     159          return type(self)(self.pending - 1, closed=self.closed)
     160  
     161      def close(self, *, force=True):
     162          if self.closed:
     163              if not force or self.pending == 0:
     164                  return self
     165          return type(self)(0 if force else self.pending, closed=True)
     166  
     167  
     168  def run_action(cid, action, end, state, *, hideclosed=True):
     169      if state.closed:
     170          if action == 'use' and end == 'recv' and state.pending:
     171              expectfail = False
     172          else:
     173              expectfail = True
     174      else:
     175          expectfail = False
     176  
     177      try:
     178          result = _run_action(cid, action, end, state)
     179      except channels.ChannelClosedError:
     180          if not hideclosed and not expectfail:
     181              raise
     182          result = state.close()
     183      else:
     184          if expectfail:
     185              raise ...  # XXX
     186      return result
     187  
     188  
     189  def _run_action(cid, action, end, state):
     190      if action == 'use':
     191          if end == 'send':
     192              channels.send(cid, b'spam')
     193              return state.incr()
     194          elif end == 'recv':
     195              if not state.pending:
     196                  try:
     197                      channels.recv(cid)
     198                  except channels.ChannelEmptyError:
     199                      return state
     200                  else:
     201                      raise Exception('expected ChannelEmptyError')
     202              else:
     203                  channels.recv(cid)
     204                  return state.decr()
     205          else:
     206              raise ValueError(end)
     207      elif action == 'close':
     208          kwargs = {}
     209          if end in ('recv', 'send'):
     210              kwargs[end] = True
     211          channels.close(cid, **kwargs)
     212          return state.close()
     213      elif action == 'force-close':
     214          kwargs = {
     215              'force': True,
     216              }
     217          if end in ('recv', 'send'):
     218              kwargs[end] = True
     219          channels.close(cid, **kwargs)
     220          return state.close(force=True)
     221      else:
     222          raise ValueError(action)
     223  
     224  
     225  def clean_up_channels():
     226      for cid in channels.list_all():
     227          try:
     228              channels.destroy(cid)
     229          except channels.ChannelNotFoundError:
     230              pass  # already destroyed
     231  
     232  
     233  class ESC[4;38;5;81mTestBase(ESC[4;38;5;149munittestESC[4;38;5;149m.ESC[4;38;5;149mTestCase):
     234  
     235      def tearDown(self):
     236          clean_up_channels()
     237          clean_up_interpreters()
     238  
     239  
     240  ##################################
     241  # channel tests
     242  
     243  class ESC[4;38;5;81mChannelIDTests(ESC[4;38;5;149mTestBase):
     244  
     245      def test_default_kwargs(self):
     246          cid = channels._channel_id(10, force=True)
     247  
     248          self.assertEqual(int(cid), 10)
     249          self.assertEqual(cid.end, 'both')
     250  
     251      def test_with_kwargs(self):
     252          cid = channels._channel_id(10, send=True, force=True)
     253          self.assertEqual(cid.end, 'send')
     254  
     255          cid = channels._channel_id(10, send=True, recv=False, force=True)
     256          self.assertEqual(cid.end, 'send')
     257  
     258          cid = channels._channel_id(10, recv=True, force=True)
     259          self.assertEqual(cid.end, 'recv')
     260  
     261          cid = channels._channel_id(10, recv=True, send=False, force=True)
     262          self.assertEqual(cid.end, 'recv')
     263  
     264          cid = channels._channel_id(10, send=True, recv=True, force=True)
     265          self.assertEqual(cid.end, 'both')
     266  
     267      def test_coerce_id(self):
     268          class ESC[4;38;5;81mInt(ESC[4;38;5;149mstr):
     269              def __index__(self):
     270                  return 10
     271  
     272          cid = channels._channel_id(Int(), force=True)
     273          self.assertEqual(int(cid), 10)
     274  
     275      def test_bad_id(self):
     276          self.assertRaises(TypeError, channels._channel_id, object())
     277          self.assertRaises(TypeError, channels._channel_id, 10.0)
     278          self.assertRaises(TypeError, channels._channel_id, '10')
     279          self.assertRaises(TypeError, channels._channel_id, b'10')
     280          self.assertRaises(ValueError, channels._channel_id, -1)
     281          self.assertRaises(OverflowError, channels._channel_id, 2**64)
     282  
     283      def test_bad_kwargs(self):
     284          with self.assertRaises(ValueError):
     285              channels._channel_id(10, send=False, recv=False)
     286  
     287      def test_does_not_exist(self):
     288          cid = channels.create()
     289          with self.assertRaises(channels.ChannelNotFoundError):
     290              channels._channel_id(int(cid) + 1)  # unforced
     291  
     292      def test_str(self):
     293          cid = channels._channel_id(10, force=True)
     294          self.assertEqual(str(cid), '10')
     295  
     296      def test_repr(self):
     297          cid = channels._channel_id(10, force=True)
     298          self.assertEqual(repr(cid), 'ChannelID(10)')
     299  
     300          cid = channels._channel_id(10, send=True, force=True)
     301          self.assertEqual(repr(cid), 'ChannelID(10, send=True)')
     302  
     303          cid = channels._channel_id(10, recv=True, force=True)
     304          self.assertEqual(repr(cid), 'ChannelID(10, recv=True)')
     305  
     306          cid = channels._channel_id(10, send=True, recv=True, force=True)
     307          self.assertEqual(repr(cid), 'ChannelID(10)')
     308  
     309      def test_equality(self):
     310          cid1 = channels.create()
     311          cid2 = channels._channel_id(int(cid1))
     312          cid3 = channels.create()
     313  
     314          self.assertTrue(cid1 == cid1)
     315          self.assertTrue(cid1 == cid2)
     316          self.assertTrue(cid1 == int(cid1))
     317          self.assertTrue(int(cid1) == cid1)
     318          self.assertTrue(cid1 == float(int(cid1)))
     319          self.assertTrue(float(int(cid1)) == cid1)
     320          self.assertFalse(cid1 == float(int(cid1)) + 0.1)
     321          self.assertFalse(cid1 == str(int(cid1)))
     322          self.assertFalse(cid1 == 2**1000)
     323          self.assertFalse(cid1 == float('inf'))
     324          self.assertFalse(cid1 == 'spam')
     325          self.assertFalse(cid1 == cid3)
     326  
     327          self.assertFalse(cid1 != cid1)
     328          self.assertFalse(cid1 != cid2)
     329          self.assertTrue(cid1 != cid3)
     330  
     331      def test_shareable(self):
     332          chan = channels.create()
     333  
     334          obj = channels.create()
     335          channels.send(chan, obj)
     336          got = channels.recv(chan)
     337  
     338          self.assertEqual(got, obj)
     339          self.assertIs(type(got), type(obj))
     340          # XXX Check the following in the channel tests?
     341          #self.assertIsNot(got, obj)
     342  
     343  
     344  class ESC[4;38;5;81mChannelTests(ESC[4;38;5;149mTestBase):
     345  
     346      def test_create_cid(self):
     347          cid = channels.create()
     348          self.assertIsInstance(cid, channels.ChannelID)
     349  
     350      def test_sequential_ids(self):
     351          before = channels.list_all()
     352          id1 = channels.create()
     353          id2 = channels.create()
     354          id3 = channels.create()
     355          after = channels.list_all()
     356  
     357          self.assertEqual(id2, int(id1) + 1)
     358          self.assertEqual(id3, int(id2) + 1)
     359          self.assertEqual(set(after) - set(before), {id1, id2, id3})
     360  
     361      def test_ids_global(self):
     362          id1 = interpreters.create()
     363          out = _run_output(id1, dedent("""
     364              import _xxinterpchannels as _channels
     365              cid = _channels.create()
     366              print(cid)
     367              """))
     368          cid1 = int(out.strip())
     369  
     370          id2 = interpreters.create()
     371          out = _run_output(id2, dedent("""
     372              import _xxinterpchannels as _channels
     373              cid = _channels.create()
     374              print(cid)
     375              """))
     376          cid2 = int(out.strip())
     377  
     378          self.assertEqual(cid2, int(cid1) + 1)
     379  
     380      def test_channel_list_interpreters_none(self):
     381          """Test listing interpreters for a channel with no associations."""
     382          # Test for channel with no associated interpreters.
     383          cid = channels.create()
     384          send_interps = channels.list_interpreters(cid, send=True)
     385          recv_interps = channels.list_interpreters(cid, send=False)
     386          self.assertEqual(send_interps, [])
     387          self.assertEqual(recv_interps, [])
     388  
     389      def test_channel_list_interpreters_basic(self):
     390          """Test basic listing channel interpreters."""
     391          interp0 = interpreters.get_main()
     392          cid = channels.create()
     393          channels.send(cid, "send")
     394          # Test for a channel that has one end associated to an interpreter.
     395          send_interps = channels.list_interpreters(cid, send=True)
     396          recv_interps = channels.list_interpreters(cid, send=False)
     397          self.assertEqual(send_interps, [interp0])
     398          self.assertEqual(recv_interps, [])
     399  
     400          interp1 = interpreters.create()
     401          _run_output(interp1, dedent(f"""
     402              import _xxinterpchannels as _channels
     403              obj = _channels.recv({cid})
     404              """))
     405          # Test for channel that has both ends associated to an interpreter.
     406          send_interps = channels.list_interpreters(cid, send=True)
     407          recv_interps = channels.list_interpreters(cid, send=False)
     408          self.assertEqual(send_interps, [interp0])
     409          self.assertEqual(recv_interps, [interp1])
     410  
     411      def test_channel_list_interpreters_multiple(self):
     412          """Test listing interpreters for a channel with many associations."""
     413          interp0 = interpreters.get_main()
     414          interp1 = interpreters.create()
     415          interp2 = interpreters.create()
     416          interp3 = interpreters.create()
     417          cid = channels.create()
     418  
     419          channels.send(cid, "send")
     420          _run_output(interp1, dedent(f"""
     421              import _xxinterpchannels as _channels
     422              _channels.send({cid}, "send")
     423              """))
     424          _run_output(interp2, dedent(f"""
     425              import _xxinterpchannels as _channels
     426              obj = _channels.recv({cid})
     427              """))
     428          _run_output(interp3, dedent(f"""
     429              import _xxinterpchannels as _channels
     430              obj = _channels.recv({cid})
     431              """))
     432          send_interps = channels.list_interpreters(cid, send=True)
     433          recv_interps = channels.list_interpreters(cid, send=False)
     434          self.assertEqual(set(send_interps), {interp0, interp1})
     435          self.assertEqual(set(recv_interps), {interp2, interp3})
     436  
     437      def test_channel_list_interpreters_destroyed(self):
     438          """Test listing channel interpreters with a destroyed interpreter."""
     439          interp0 = interpreters.get_main()
     440          interp1 = interpreters.create()
     441          cid = channels.create()
     442          channels.send(cid, "send")
     443          _run_output(interp1, dedent(f"""
     444              import _xxinterpchannels as _channels
     445              obj = _channels.recv({cid})
     446              """))
     447          # Should be one interpreter associated with each end.
     448          send_interps = channels.list_interpreters(cid, send=True)
     449          recv_interps = channels.list_interpreters(cid, send=False)
     450          self.assertEqual(send_interps, [interp0])
     451          self.assertEqual(recv_interps, [interp1])
     452  
     453          interpreters.destroy(interp1)
     454          # Destroyed interpreter should not be listed.
     455          send_interps = channels.list_interpreters(cid, send=True)
     456          recv_interps = channels.list_interpreters(cid, send=False)
     457          self.assertEqual(send_interps, [interp0])
     458          self.assertEqual(recv_interps, [])
     459  
     460      def test_channel_list_interpreters_released(self):
     461          """Test listing channel interpreters with a released channel."""
     462          # Set up one channel with main interpreter on the send end and two
     463          # subinterpreters on the receive end.
     464          interp0 = interpreters.get_main()
     465          interp1 = interpreters.create()
     466          interp2 = interpreters.create()
     467          cid = channels.create()
     468          channels.send(cid, "data")
     469          _run_output(interp1, dedent(f"""
     470              import _xxinterpchannels as _channels
     471              obj = _channels.recv({cid})
     472              """))
     473          channels.send(cid, "data")
     474          _run_output(interp2, dedent(f"""
     475              import _xxinterpchannels as _channels
     476              obj = _channels.recv({cid})
     477              """))
     478          # Check the setup.
     479          send_interps = channels.list_interpreters(cid, send=True)
     480          recv_interps = channels.list_interpreters(cid, send=False)
     481          self.assertEqual(len(send_interps), 1)
     482          self.assertEqual(len(recv_interps), 2)
     483  
     484          # Release the main interpreter from the send end.
     485          channels.release(cid, send=True)
     486          # Send end should have no associated interpreters.
     487          send_interps = channels.list_interpreters(cid, send=True)
     488          recv_interps = channels.list_interpreters(cid, send=False)
     489          self.assertEqual(len(send_interps), 0)
     490          self.assertEqual(len(recv_interps), 2)
     491  
     492          # Release one of the subinterpreters from the receive end.
     493          _run_output(interp2, dedent(f"""
     494              import _xxinterpchannels as _channels
     495              _channels.release({cid})
     496              """))
     497          # Receive end should have the released interpreter removed.
     498          send_interps = channels.list_interpreters(cid, send=True)
     499          recv_interps = channels.list_interpreters(cid, send=False)
     500          self.assertEqual(len(send_interps), 0)
     501          self.assertEqual(recv_interps, [interp1])
     502  
     503      def test_channel_list_interpreters_closed(self):
     504          """Test listing channel interpreters with a closed channel."""
     505          interp0 = interpreters.get_main()
     506          interp1 = interpreters.create()
     507          cid = channels.create()
     508          # Put something in the channel so that it's not empty.
     509          channels.send(cid, "send")
     510  
     511          # Check initial state.
     512          send_interps = channels.list_interpreters(cid, send=True)
     513          recv_interps = channels.list_interpreters(cid, send=False)
     514          self.assertEqual(len(send_interps), 1)
     515          self.assertEqual(len(recv_interps), 0)
     516  
     517          # Force close the channel.
     518          channels.close(cid, force=True)
     519          # Both ends should raise an error.
     520          with self.assertRaises(channels.ChannelClosedError):
     521              channels.list_interpreters(cid, send=True)
     522          with self.assertRaises(channels.ChannelClosedError):
     523              channels.list_interpreters(cid, send=False)
     524  
     525      def test_channel_list_interpreters_closed_send_end(self):
     526          """Test listing channel interpreters with a channel's send end closed."""
     527          interp0 = interpreters.get_main()
     528          interp1 = interpreters.create()
     529          cid = channels.create()
     530          # Put something in the channel so that it's not empty.
     531          channels.send(cid, "send")
     532  
     533          # Check initial state.
     534          send_interps = channels.list_interpreters(cid, send=True)
     535          recv_interps = channels.list_interpreters(cid, send=False)
     536          self.assertEqual(len(send_interps), 1)
     537          self.assertEqual(len(recv_interps), 0)
     538  
     539          # Close the send end of the channel.
     540          channels.close(cid, send=True)
     541          # Send end should raise an error.
     542          with self.assertRaises(channels.ChannelClosedError):
     543              channels.list_interpreters(cid, send=True)
     544          # Receive end should not be closed (since channel is not empty).
     545          recv_interps = channels.list_interpreters(cid, send=False)
     546          self.assertEqual(len(recv_interps), 0)
     547  
     548          # Close the receive end of the channel from a subinterpreter.
     549          _run_output(interp1, dedent(f"""
     550              import _xxinterpchannels as _channels
     551              _channels.close({cid}, force=True)
     552              """))
     553          return
     554          # Both ends should raise an error.
     555          with self.assertRaises(channels.ChannelClosedError):
     556              channels.list_interpreters(cid, send=True)
     557          with self.assertRaises(channels.ChannelClosedError):
     558              channels.list_interpreters(cid, send=False)
     559  
     560      ####################
     561  
     562      def test_send_recv_main(self):
     563          cid = channels.create()
     564          orig = b'spam'
     565          channels.send(cid, orig)
     566          obj = channels.recv(cid)
     567  
     568          self.assertEqual(obj, orig)
     569          self.assertIsNot(obj, orig)
     570  
     571      def test_send_recv_same_interpreter(self):
     572          id1 = interpreters.create()
     573          out = _run_output(id1, dedent("""
     574              import _xxinterpchannels as _channels
     575              cid = _channels.create()
     576              orig = b'spam'
     577              _channels.send(cid, orig)
     578              obj = _channels.recv(cid)
     579              assert obj is not orig
     580              assert obj == orig
     581              """))
     582  
     583      def test_send_recv_different_interpreters(self):
     584          cid = channels.create()
     585          id1 = interpreters.create()
     586          out = _run_output(id1, dedent(f"""
     587              import _xxinterpchannels as _channels
     588              _channels.send({cid}, b'spam')
     589              """))
     590          obj = channels.recv(cid)
     591  
     592          self.assertEqual(obj, b'spam')
     593  
     594      def test_send_recv_different_threads(self):
     595          cid = channels.create()
     596  
     597          def f():
     598              while True:
     599                  try:
     600                      obj = channels.recv(cid)
     601                      break
     602                  except channels.ChannelEmptyError:
     603                      time.sleep(0.1)
     604              channels.send(cid, obj)
     605          t = threading.Thread(target=f)
     606          t.start()
     607  
     608          channels.send(cid, b'spam')
     609          t.join()
     610          obj = channels.recv(cid)
     611  
     612          self.assertEqual(obj, b'spam')
     613  
     614      def test_send_recv_different_interpreters_and_threads(self):
     615          cid = channels.create()
     616          id1 = interpreters.create()
     617          out = None
     618  
     619          def f():
     620              nonlocal out
     621              out = _run_output(id1, dedent(f"""
     622                  import time
     623                  import _xxinterpchannels as _channels
     624                  while True:
     625                      try:
     626                          obj = _channels.recv({cid})
     627                          break
     628                      except _channels.ChannelEmptyError:
     629                          time.sleep(0.1)
     630                  assert(obj == b'spam')
     631                  _channels.send({cid}, b'eggs')
     632                  """))
     633          t = threading.Thread(target=f)
     634          t.start()
     635  
     636          channels.send(cid, b'spam')
     637          t.join()
     638          obj = channels.recv(cid)
     639  
     640          self.assertEqual(obj, b'eggs')
     641  
     642      def test_send_not_found(self):
     643          with self.assertRaises(channels.ChannelNotFoundError):
     644              channels.send(10, b'spam')
     645  
     646      def test_recv_not_found(self):
     647          with self.assertRaises(channels.ChannelNotFoundError):
     648              channels.recv(10)
     649  
     650      def test_recv_empty(self):
     651          cid = channels.create()
     652          with self.assertRaises(channels.ChannelEmptyError):
     653              channels.recv(cid)
     654  
     655      def test_recv_default(self):
     656          default = object()
     657          cid = channels.create()
     658          obj1 = channels.recv(cid, default)
     659          channels.send(cid, None)
     660          channels.send(cid, 1)
     661          channels.send(cid, b'spam')
     662          channels.send(cid, b'eggs')
     663          obj2 = channels.recv(cid, default)
     664          obj3 = channels.recv(cid, default)
     665          obj4 = channels.recv(cid)
     666          obj5 = channels.recv(cid, default)
     667          obj6 = channels.recv(cid, default)
     668  
     669          self.assertIs(obj1, default)
     670          self.assertIs(obj2, None)
     671          self.assertEqual(obj3, 1)
     672          self.assertEqual(obj4, b'spam')
     673          self.assertEqual(obj5, b'eggs')
     674          self.assertIs(obj6, default)
     675  
     676      def test_recv_sending_interp_destroyed(self):
     677          with self.subTest('closed'):
     678              cid1 = channels.create()
     679              interp = interpreters.create()
     680              interpreters.run_string(interp, dedent(f"""
     681                  import _xxinterpchannels as _channels
     682                  _channels.send({cid1}, b'spam')
     683                  """))
     684              interpreters.destroy(interp)
     685  
     686              with self.assertRaisesRegex(RuntimeError,
     687                                          f'channel {cid1} is closed'):
     688                  channels.recv(cid1)
     689              del cid1
     690          with self.subTest('still open'):
     691              cid2 = channels.create()
     692              interp = interpreters.create()
     693              interpreters.run_string(interp, dedent(f"""
     694                  import _xxinterpchannels as _channels
     695                  _channels.send({cid2}, b'spam')
     696                  """))
     697              channels.send(cid2, b'eggs')
     698              interpreters.destroy(interp)
     699  
     700              channels.recv(cid2)
     701              with self.assertRaisesRegex(RuntimeError,
     702                                          f'channel {cid2} is empty'):
     703                  channels.recv(cid2)
     704              del cid2
     705  
     706      def test_allowed_types(self):
     707          cid = channels.create()
     708          objects = [
     709              None,
     710              'spam',
     711              b'spam',
     712              42,
     713          ]
     714          for obj in objects:
     715              with self.subTest(obj):
     716                  channels.send(cid, obj)
     717                  got = channels.recv(cid)
     718  
     719                  self.assertEqual(got, obj)
     720                  self.assertIs(type(got), type(obj))
     721                  # XXX Check the following?
     722                  #self.assertIsNot(got, obj)
     723                  # XXX What about between interpreters?
     724  
     725      def test_run_string_arg_unresolved(self):
     726          cid = channels.create()
     727          interp = interpreters.create()
     728  
     729          out = _run_output(interp, dedent("""
     730              import _xxinterpchannels as _channels
     731              print(cid.end)
     732              _channels.send(cid, b'spam')
     733              """),
     734              dict(cid=cid.send))
     735          obj = channels.recv(cid)
     736  
     737          self.assertEqual(obj, b'spam')
     738          self.assertEqual(out.strip(), 'send')
     739  
     740      # XXX For now there is no high-level channel into which the
     741      # sent channel ID can be converted...
     742      # Note: this test caused crashes on some buildbots (bpo-33615).
     743      @unittest.skip('disabled until high-level channels exist')
     744      def test_run_string_arg_resolved(self):
     745          cid = channels.create()
     746          cid = channels._channel_id(cid, _resolve=True)
     747          interp = interpreters.create()
     748  
     749          out = _run_output(interp, dedent("""
     750              import _xxinterpchannels as _channels
     751              print(chan.id.end)
     752              _channels.send(chan.id, b'spam')
     753              """),
     754              dict(chan=cid.send))
     755          obj = channels.recv(cid)
     756  
     757          self.assertEqual(obj, b'spam')
     758          self.assertEqual(out.strip(), 'send')
     759  
     760      # close
     761  
     762      def test_close_single_user(self):
     763          cid = channels.create()
     764          channels.send(cid, b'spam')
     765          channels.recv(cid)
     766          channels.close(cid)
     767  
     768          with self.assertRaises(channels.ChannelClosedError):
     769              channels.send(cid, b'eggs')
     770          with self.assertRaises(channels.ChannelClosedError):
     771              channels.recv(cid)
     772  
     773      def test_close_multiple_users(self):
     774          cid = channels.create()
     775          id1 = interpreters.create()
     776          id2 = interpreters.create()
     777          interpreters.run_string(id1, dedent(f"""
     778              import _xxinterpchannels as _channels
     779              _channels.send({cid}, b'spam')
     780              """))
     781          interpreters.run_string(id2, dedent(f"""
     782              import _xxinterpchannels as _channels
     783              _channels.recv({cid})
     784              """))
     785          channels.close(cid)
     786          with self.assertRaises(interpreters.RunFailedError) as cm:
     787              interpreters.run_string(id1, dedent(f"""
     788                  _channels.send({cid}, b'spam')
     789                  """))
     790          self.assertIn('ChannelClosedError', str(cm.exception))
     791          with self.assertRaises(interpreters.RunFailedError) as cm:
     792              interpreters.run_string(id2, dedent(f"""
     793                  _channels.send({cid}, b'spam')
     794                  """))
     795          self.assertIn('ChannelClosedError', str(cm.exception))
     796  
     797      def test_close_multiple_times(self):
     798          cid = channels.create()
     799          channels.send(cid, b'spam')
     800          channels.recv(cid)
     801          channels.close(cid)
     802  
     803          with self.assertRaises(channels.ChannelClosedError):
     804              channels.close(cid)
     805  
     806      def test_close_empty(self):
     807          tests = [
     808              (False, False),
     809              (True, False),
     810              (False, True),
     811              (True, True),
     812              ]
     813          for send, recv in tests:
     814              with self.subTest((send, recv)):
     815                  cid = channels.create()
     816                  channels.send(cid, b'spam')
     817                  channels.recv(cid)
     818                  channels.close(cid, send=send, recv=recv)
     819  
     820                  with self.assertRaises(channels.ChannelClosedError):
     821                      channels.send(cid, b'eggs')
     822                  with self.assertRaises(channels.ChannelClosedError):
     823                      channels.recv(cid)
     824  
     825      def test_close_defaults_with_unused_items(self):
     826          cid = channels.create()
     827          channels.send(cid, b'spam')
     828          channels.send(cid, b'ham')
     829  
     830          with self.assertRaises(channels.ChannelNotEmptyError):
     831              channels.close(cid)
     832          channels.recv(cid)
     833          channels.send(cid, b'eggs')
     834  
     835      def test_close_recv_with_unused_items_unforced(self):
     836          cid = channels.create()
     837          channels.send(cid, b'spam')
     838          channels.send(cid, b'ham')
     839  
     840          with self.assertRaises(channels.ChannelNotEmptyError):
     841              channels.close(cid, recv=True)
     842          channels.recv(cid)
     843          channels.send(cid, b'eggs')
     844          channels.recv(cid)
     845          channels.recv(cid)
     846          channels.close(cid, recv=True)
     847  
     848      def test_close_send_with_unused_items_unforced(self):
     849          cid = channels.create()
     850          channels.send(cid, b'spam')
     851          channels.send(cid, b'ham')
     852          channels.close(cid, send=True)
     853  
     854          with self.assertRaises(channels.ChannelClosedError):
     855              channels.send(cid, b'eggs')
     856          channels.recv(cid)
     857          channels.recv(cid)
     858          with self.assertRaises(channels.ChannelClosedError):
     859              channels.recv(cid)
     860  
     861      def test_close_both_with_unused_items_unforced(self):
     862          cid = channels.create()
     863          channels.send(cid, b'spam')
     864          channels.send(cid, b'ham')
     865  
     866          with self.assertRaises(channels.ChannelNotEmptyError):
     867              channels.close(cid, recv=True, send=True)
     868          channels.recv(cid)
     869          channels.send(cid, b'eggs')
     870          channels.recv(cid)
     871          channels.recv(cid)
     872          channels.close(cid, recv=True)
     873  
     874      def test_close_recv_with_unused_items_forced(self):
     875          cid = channels.create()
     876          channels.send(cid, b'spam')
     877          channels.send(cid, b'ham')
     878          channels.close(cid, recv=True, force=True)
     879  
     880          with self.assertRaises(channels.ChannelClosedError):
     881              channels.send(cid, b'eggs')
     882          with self.assertRaises(channels.ChannelClosedError):
     883              channels.recv(cid)
     884  
     885      def test_close_send_with_unused_items_forced(self):
     886          cid = channels.create()
     887          channels.send(cid, b'spam')
     888          channels.send(cid, b'ham')
     889          channels.close(cid, send=True, force=True)
     890  
     891          with self.assertRaises(channels.ChannelClosedError):
     892              channels.send(cid, b'eggs')
     893          with self.assertRaises(channels.ChannelClosedError):
     894              channels.recv(cid)
     895  
     896      def test_close_both_with_unused_items_forced(self):
     897          cid = channels.create()
     898          channels.send(cid, b'spam')
     899          channels.send(cid, b'ham')
     900          channels.close(cid, send=True, recv=True, force=True)
     901  
     902          with self.assertRaises(channels.ChannelClosedError):
     903              channels.send(cid, b'eggs')
     904          with self.assertRaises(channels.ChannelClosedError):
     905              channels.recv(cid)
     906  
     907      def test_close_never_used(self):
     908          cid = channels.create()
     909          channels.close(cid)
     910  
     911          with self.assertRaises(channels.ChannelClosedError):
     912              channels.send(cid, b'spam')
     913          with self.assertRaises(channels.ChannelClosedError):
     914              channels.recv(cid)
     915  
     916      def test_close_by_unassociated_interp(self):
     917          cid = channels.create()
     918          channels.send(cid, b'spam')
     919          interp = interpreters.create()
     920          interpreters.run_string(interp, dedent(f"""
     921              import _xxinterpchannels as _channels
     922              _channels.close({cid}, force=True)
     923              """))
     924          with self.assertRaises(channels.ChannelClosedError):
     925              channels.recv(cid)
     926          with self.assertRaises(channels.ChannelClosedError):
     927              channels.close(cid)
     928  
     929      def test_close_used_multiple_times_by_single_user(self):
     930          cid = channels.create()
     931          channels.send(cid, b'spam')
     932          channels.send(cid, b'spam')
     933          channels.send(cid, b'spam')
     934          channels.recv(cid)
     935          channels.close(cid, force=True)
     936  
     937          with self.assertRaises(channels.ChannelClosedError):
     938              channels.send(cid, b'eggs')
     939          with self.assertRaises(channels.ChannelClosedError):
     940              channels.recv(cid)
     941  
     942      def test_channel_list_interpreters_invalid_channel(self):
     943          cid = channels.create()
     944          # Test for invalid channel ID.
     945          with self.assertRaises(channels.ChannelNotFoundError):
     946              channels.list_interpreters(1000, send=True)
     947  
     948          channels.close(cid)
     949          # Test for a channel that has been closed.
     950          with self.assertRaises(channels.ChannelClosedError):
     951              channels.list_interpreters(cid, send=True)
     952  
     953      def test_channel_list_interpreters_invalid_args(self):
     954          # Tests for invalid arguments passed to the API.
     955          cid = channels.create()
     956          with self.assertRaises(TypeError):
     957              channels.list_interpreters(cid)
     958  
     959  
     960  class ESC[4;38;5;81mChannelReleaseTests(ESC[4;38;5;149mTestBase):
     961  
     962      # XXX Add more test coverage a la the tests for close().
     963  
     964      """
     965      - main / interp / other
     966      - run in: current thread / new thread / other thread / different threads
     967      - end / opposite
     968      - force / no force
     969      - used / not used  (associated / not associated)
     970      - empty / emptied / never emptied / partly emptied
     971      - closed / not closed
     972      - released / not released
     973      - creator (interp) / other
     974      - associated interpreter not running
     975      - associated interpreter destroyed
     976      """
     977  
     978      """
     979      use
     980      pre-release
     981      release
     982      after
     983      check
     984      """
     985  
     986      """
     987      release in:         main, interp1
     988      creator:            same, other (incl. interp2)
     989  
     990      use:                None,send,recv,send/recv in None,same,other(incl. interp2),same+other(incl. interp2),all
     991      pre-release:        None,send,recv,both in None,same,other(incl. interp2),same+other(incl. interp2),all
     992      pre-release forced: None,send,recv,both in None,same,other(incl. interp2),same+other(incl. interp2),all
     993  
     994      release:            same
     995      release forced:     same
     996  
     997      use after:          None,send,recv,send/recv in None,same,other(incl. interp2),same+other(incl. interp2),all
     998      release after:      None,send,recv,send/recv in None,same,other(incl. interp2),same+other(incl. interp2),all
     999      check released:     send/recv for same/other(incl. interp2)
    1000      check closed:       send/recv for same/other(incl. interp2)
    1001      """
    1002  
    1003      def test_single_user(self):
    1004          cid = channels.create()
    1005          channels.send(cid, b'spam')
    1006          channels.recv(cid)
    1007          channels.release(cid, send=True, recv=True)
    1008  
    1009          with self.assertRaises(channels.ChannelClosedError):
    1010              channels.send(cid, b'eggs')
    1011          with self.assertRaises(channels.ChannelClosedError):
    1012              channels.recv(cid)
    1013  
    1014      def test_multiple_users(self):
    1015          cid = channels.create()
    1016          id1 = interpreters.create()
    1017          id2 = interpreters.create()
    1018          interpreters.run_string(id1, dedent(f"""
    1019              import _xxinterpchannels as _channels
    1020              _channels.send({cid}, b'spam')
    1021              """))
    1022          out = _run_output(id2, dedent(f"""
    1023              import _xxinterpchannels as _channels
    1024              obj = _channels.recv({cid})
    1025              _channels.release({cid})
    1026              print(repr(obj))
    1027              """))
    1028          interpreters.run_string(id1, dedent(f"""
    1029              _channels.release({cid})
    1030              """))
    1031  
    1032          self.assertEqual(out.strip(), "b'spam'")
    1033  
    1034      def test_no_kwargs(self):
    1035          cid = channels.create()
    1036          channels.send(cid, b'spam')
    1037          channels.recv(cid)
    1038          channels.release(cid)
    1039  
    1040          with self.assertRaises(channels.ChannelClosedError):
    1041              channels.send(cid, b'eggs')
    1042          with self.assertRaises(channels.ChannelClosedError):
    1043              channels.recv(cid)
    1044  
    1045      def test_multiple_times(self):
    1046          cid = channels.create()
    1047          channels.send(cid, b'spam')
    1048          channels.recv(cid)
    1049          channels.release(cid, send=True, recv=True)
    1050  
    1051          with self.assertRaises(channels.ChannelClosedError):
    1052              channels.release(cid, send=True, recv=True)
    1053  
    1054      def test_with_unused_items(self):
    1055          cid = channels.create()
    1056          channels.send(cid, b'spam')
    1057          channels.send(cid, b'ham')
    1058          channels.release(cid, send=True, recv=True)
    1059  
    1060          with self.assertRaises(channels.ChannelClosedError):
    1061              channels.recv(cid)
    1062  
    1063      def test_never_used(self):
    1064          cid = channels.create()
    1065          channels.release(cid)
    1066  
    1067          with self.assertRaises(channels.ChannelClosedError):
    1068              channels.send(cid, b'spam')
    1069          with self.assertRaises(channels.ChannelClosedError):
    1070              channels.recv(cid)
    1071  
    1072      def test_by_unassociated_interp(self):
    1073          cid = channels.create()
    1074          channels.send(cid, b'spam')
    1075          interp = interpreters.create()
    1076          interpreters.run_string(interp, dedent(f"""
    1077              import _xxinterpchannels as _channels
    1078              _channels.release({cid})
    1079              """))
    1080          obj = channels.recv(cid)
    1081          channels.release(cid)
    1082  
    1083          with self.assertRaises(channels.ChannelClosedError):
    1084              channels.send(cid, b'eggs')
    1085          self.assertEqual(obj, b'spam')
    1086  
    1087      def test_close_if_unassociated(self):
    1088          # XXX Something's not right with this test...
    1089          cid = channels.create()
    1090          interp = interpreters.create()
    1091          interpreters.run_string(interp, dedent(f"""
    1092              import _xxinterpchannels as _channels
    1093              obj = _channels.send({cid}, b'spam')
    1094              _channels.release({cid})
    1095              """))
    1096  
    1097          with self.assertRaises(channels.ChannelClosedError):
    1098              channels.recv(cid)
    1099  
    1100      def test_partially(self):
    1101          # XXX Is partial close too weird/confusing?
    1102          cid = channels.create()
    1103          channels.send(cid, None)
    1104          channels.recv(cid)
    1105          channels.send(cid, b'spam')
    1106          channels.release(cid, send=True)
    1107          obj = channels.recv(cid)
    1108  
    1109          self.assertEqual(obj, b'spam')
    1110  
    1111      def test_used_multiple_times_by_single_user(self):
    1112          cid = channels.create()
    1113          channels.send(cid, b'spam')
    1114          channels.send(cid, b'spam')
    1115          channels.send(cid, b'spam')
    1116          channels.recv(cid)
    1117          channels.release(cid, send=True, recv=True)
    1118  
    1119          with self.assertRaises(channels.ChannelClosedError):
    1120              channels.send(cid, b'eggs')
    1121          with self.assertRaises(channels.ChannelClosedError):
    1122              channels.recv(cid)
    1123  
    1124  
    1125  class ESC[4;38;5;81mChannelCloseFixture(ESC[4;38;5;149mnamedtuple('ChannelCloseFixture',
    1126                                       'end interp other extra creator')):
    1127  
    1128      # Set this to True to avoid creating interpreters, e.g. when
    1129      # scanning through test permutations without running them.
    1130      QUICK = False
    1131  
    1132      def __new__(cls, end, interp, other, extra, creator):
    1133          assert end in ('send', 'recv')
    1134          if cls.QUICK:
    1135              known = {}
    1136          else:
    1137              interp = Interpreter.from_raw(interp)
    1138              other = Interpreter.from_raw(other)
    1139              extra = Interpreter.from_raw(extra)
    1140              known = {
    1141                  interp.name: interp,
    1142                  other.name: other,
    1143                  extra.name: extra,
    1144                  }
    1145          if not creator:
    1146              creator = 'same'
    1147          self = super().__new__(cls, end, interp, other, extra, creator)
    1148          self._prepped = set()
    1149          self._state = ChannelState()
    1150          self._known = known
    1151          return self
    1152  
    1153      @property
    1154      def state(self):
    1155          return self._state
    1156  
    1157      @property
    1158      def cid(self):
    1159          try:
    1160              return self._cid
    1161          except AttributeError:
    1162              creator = self._get_interpreter(self.creator)
    1163              self._cid = self._new_channel(creator)
    1164              return self._cid
    1165  
    1166      def get_interpreter(self, interp):
    1167          interp = self._get_interpreter(interp)
    1168          self._prep_interpreter(interp)
    1169          return interp
    1170  
    1171      def expect_closed_error(self, end=None):
    1172          if end is None:
    1173              end = self.end
    1174          if end == 'recv' and self.state.closed == 'send':
    1175              return False
    1176          return bool(self.state.closed)
    1177  
    1178      def prep_interpreter(self, interp):
    1179          self._prep_interpreter(interp)
    1180  
    1181      def record_action(self, action, result):
    1182          self._state = result
    1183  
    1184      def clean_up(self):
    1185          clean_up_interpreters()
    1186          clean_up_channels()
    1187  
    1188      # internal methods
    1189  
    1190      def _new_channel(self, creator):
    1191          if creator.name == 'main':
    1192              return channels.create()
    1193          else:
    1194              ch = channels.create()
    1195              run_interp(creator.id, f"""
    1196                  import _xxsubinterpreters
    1197                  cid = _xxsubchannels.create()
    1198                  # We purposefully send back an int to avoid tying the
    1199                  # channel to the other interpreter.
    1200                  _xxsubchannels.send({ch}, int(cid))
    1201                  del _xxsubinterpreters
    1202                  """)
    1203              self._cid = channels.recv(ch)
    1204          return self._cid
    1205  
    1206      def _get_interpreter(self, interp):
    1207          if interp in ('same', 'interp'):
    1208              return self.interp
    1209          elif interp == 'other':
    1210              return self.other
    1211          elif interp == 'extra':
    1212              return self.extra
    1213          else:
    1214              name = interp
    1215              try:
    1216                  interp = self._known[name]
    1217              except KeyError:
    1218                  interp = self._known[name] = Interpreter(name)
    1219              return interp
    1220  
    1221      def _prep_interpreter(self, interp):
    1222          if interp.id in self._prepped:
    1223              return
    1224          self._prepped.add(interp.id)
    1225          if interp.name == 'main':
    1226              return
    1227          run_interp(interp.id, f"""
    1228              import _xxinterpchannels as channels
    1229              import test.test__xxinterpchannels as helpers
    1230              ChannelState = helpers.ChannelState
    1231              try:
    1232                  cid
    1233              except NameError:
    1234                  cid = channels._channel_id({self.cid})
    1235              """)
    1236  
    1237  
    1238  @unittest.skip('these tests take several hours to run')
    1239  class ESC[4;38;5;81mExhaustiveChannelTests(ESC[4;38;5;149mTestBase):
    1240  
    1241      """
    1242      - main / interp / other
    1243      - run in: current thread / new thread / other thread / different threads
    1244      - end / opposite
    1245      - force / no force
    1246      - used / not used  (associated / not associated)
    1247      - empty / emptied / never emptied / partly emptied
    1248      - closed / not closed
    1249      - released / not released
    1250      - creator (interp) / other
    1251      - associated interpreter not running
    1252      - associated interpreter destroyed
    1253  
    1254      - close after unbound
    1255      """
    1256  
    1257      """
    1258      use
    1259      pre-close
    1260      close
    1261      after
    1262      check
    1263      """
    1264  
    1265      """
    1266      close in:         main, interp1
    1267      creator:          same, other, extra
    1268  
    1269      use:              None,send,recv,send/recv in None,same,other,same+other,all
    1270      pre-close:        None,send,recv in None,same,other,same+other,all
    1271      pre-close forced: None,send,recv in None,same,other,same+other,all
    1272  
    1273      close:            same
    1274      close forced:     same
    1275  
    1276      use after:        None,send,recv,send/recv in None,same,other,extra,same+other,all
    1277      close after:      None,send,recv,send/recv in None,same,other,extra,same+other,all
    1278      check closed:     send/recv for same/other(incl. interp2)
    1279      """
    1280  
    1281      def iter_action_sets(self):
    1282          # - used / not used  (associated / not associated)
    1283          # - empty / emptied / never emptied / partly emptied
    1284          # - closed / not closed
    1285          # - released / not released
    1286  
    1287          # never used
    1288          yield []
    1289  
    1290          # only pre-closed (and possible used after)
    1291          for closeactions in self._iter_close_action_sets('same', 'other'):
    1292              yield closeactions
    1293              for postactions in self._iter_post_close_action_sets():
    1294                  yield closeactions + postactions
    1295          for closeactions in self._iter_close_action_sets('other', 'extra'):
    1296              yield closeactions
    1297              for postactions in self._iter_post_close_action_sets():
    1298                  yield closeactions + postactions
    1299  
    1300          # used
    1301          for useactions in self._iter_use_action_sets('same', 'other'):
    1302              yield useactions
    1303              for closeactions in self._iter_close_action_sets('same', 'other'):
    1304                  actions = useactions + closeactions
    1305                  yield actions
    1306                  for postactions in self._iter_post_close_action_sets():
    1307                      yield actions + postactions
    1308              for closeactions in self._iter_close_action_sets('other', 'extra'):
    1309                  actions = useactions + closeactions
    1310                  yield actions
    1311                  for postactions in self._iter_post_close_action_sets():
    1312                      yield actions + postactions
    1313          for useactions in self._iter_use_action_sets('other', 'extra'):
    1314              yield useactions
    1315              for closeactions in self._iter_close_action_sets('same', 'other'):
    1316                  actions = useactions + closeactions
    1317                  yield actions
    1318                  for postactions in self._iter_post_close_action_sets():
    1319                      yield actions + postactions
    1320              for closeactions in self._iter_close_action_sets('other', 'extra'):
    1321                  actions = useactions + closeactions
    1322                  yield actions
    1323                  for postactions in self._iter_post_close_action_sets():
    1324                      yield actions + postactions
    1325  
    1326      def _iter_use_action_sets(self, interp1, interp2):
    1327          interps = (interp1, interp2)
    1328  
    1329          # only recv end used
    1330          yield [
    1331              ChannelAction('use', 'recv', interp1),
    1332              ]
    1333          yield [
    1334              ChannelAction('use', 'recv', interp2),
    1335              ]
    1336          yield [
    1337              ChannelAction('use', 'recv', interp1),
    1338              ChannelAction('use', 'recv', interp2),
    1339              ]
    1340  
    1341          # never emptied
    1342          yield [
    1343              ChannelAction('use', 'send', interp1),
    1344              ]
    1345          yield [
    1346              ChannelAction('use', 'send', interp2),
    1347              ]
    1348          yield [
    1349              ChannelAction('use', 'send', interp1),
    1350              ChannelAction('use', 'send', interp2),
    1351              ]
    1352  
    1353          # partially emptied
    1354          for interp1 in interps:
    1355              for interp2 in interps:
    1356                  for interp3 in interps:
    1357                      yield [
    1358                          ChannelAction('use', 'send', interp1),
    1359                          ChannelAction('use', 'send', interp2),
    1360                          ChannelAction('use', 'recv', interp3),
    1361                          ]
    1362  
    1363          # fully emptied
    1364          for interp1 in interps:
    1365              for interp2 in interps:
    1366                  for interp3 in interps:
    1367                      for interp4 in interps:
    1368                          yield [
    1369                              ChannelAction('use', 'send', interp1),
    1370                              ChannelAction('use', 'send', interp2),
    1371                              ChannelAction('use', 'recv', interp3),
    1372                              ChannelAction('use', 'recv', interp4),
    1373                              ]
    1374  
    1375      def _iter_close_action_sets(self, interp1, interp2):
    1376          ends = ('recv', 'send')
    1377          interps = (interp1, interp2)
    1378          for force in (True, False):
    1379              op = 'force-close' if force else 'close'
    1380              for interp in interps:
    1381                  for end in ends:
    1382                      yield [
    1383                          ChannelAction(op, end, interp),
    1384                          ]
    1385          for recvop in ('close', 'force-close'):
    1386              for sendop in ('close', 'force-close'):
    1387                  for recv in interps:
    1388                      for send in interps:
    1389                          yield [
    1390                              ChannelAction(recvop, 'recv', recv),
    1391                              ChannelAction(sendop, 'send', send),
    1392                              ]
    1393  
    1394      def _iter_post_close_action_sets(self):
    1395          for interp in ('same', 'extra', 'other'):
    1396              yield [
    1397                  ChannelAction('use', 'recv', interp),
    1398                  ]
    1399              yield [
    1400                  ChannelAction('use', 'send', interp),
    1401                  ]
    1402  
    1403      def run_actions(self, fix, actions):
    1404          for action in actions:
    1405              self.run_action(fix, action)
    1406  
    1407      def run_action(self, fix, action, *, hideclosed=True):
    1408          end = action.resolve_end(fix.end)
    1409          interp = action.resolve_interp(fix.interp, fix.other, fix.extra)
    1410          fix.prep_interpreter(interp)
    1411          if interp.name == 'main':
    1412              result = run_action(
    1413                  fix.cid,
    1414                  action.action,
    1415                  end,
    1416                  fix.state,
    1417                  hideclosed=hideclosed,
    1418                  )
    1419              fix.record_action(action, result)
    1420          else:
    1421              _cid = channels.create()
    1422              run_interp(interp.id, f"""
    1423                  result = helpers.run_action(
    1424                      {fix.cid},
    1425                      {repr(action.action)},
    1426                      {repr(end)},
    1427                      {repr(fix.state)},
    1428                      hideclosed={hideclosed},
    1429                      )
    1430                  channels.send({_cid}, result.pending.to_bytes(1, 'little'))
    1431                  channels.send({_cid}, b'X' if result.closed else b'')
    1432                  """)
    1433              result = ChannelState(
    1434                  pending=int.from_bytes(channels.recv(_cid), 'little'),
    1435                  closed=bool(channels.recv(_cid)),
    1436                  )
    1437              fix.record_action(action, result)
    1438  
    1439      def iter_fixtures(self):
    1440          # XXX threads?
    1441          interpreters = [
    1442              ('main', 'interp', 'extra'),
    1443              ('interp', 'main', 'extra'),
    1444              ('interp1', 'interp2', 'extra'),
    1445              ('interp1', 'interp2', 'main'),
    1446          ]
    1447          for interp, other, extra in interpreters:
    1448              for creator in ('same', 'other', 'creator'):
    1449                  for end in ('send', 'recv'):
    1450                      yield ChannelCloseFixture(end, interp, other, extra, creator)
    1451  
    1452      def _close(self, fix, *, force):
    1453          op = 'force-close' if force else 'close'
    1454          close = ChannelAction(op, fix.end, 'same')
    1455          if not fix.expect_closed_error():
    1456              self.run_action(fix, close, hideclosed=False)
    1457          else:
    1458              with self.assertRaises(channels.ChannelClosedError):
    1459                  self.run_action(fix, close, hideclosed=False)
    1460  
    1461      def _assert_closed_in_interp(self, fix, interp=None):
    1462          if interp is None or interp.name == 'main':
    1463              with self.assertRaises(channels.ChannelClosedError):
    1464                  channels.recv(fix.cid)
    1465              with self.assertRaises(channels.ChannelClosedError):
    1466                  channels.send(fix.cid, b'spam')
    1467              with self.assertRaises(channels.ChannelClosedError):
    1468                  channels.close(fix.cid)
    1469              with self.assertRaises(channels.ChannelClosedError):
    1470                  channels.close(fix.cid, force=True)
    1471          else:
    1472              run_interp(interp.id, """
    1473                  with helpers.expect_channel_closed():
    1474                      channels.recv(cid)
    1475                  """)
    1476              run_interp(interp.id, """
    1477                  with helpers.expect_channel_closed():
    1478                      channels.send(cid, b'spam')
    1479                  """)
    1480              run_interp(interp.id, """
    1481                  with helpers.expect_channel_closed():
    1482                      channels.close(cid)
    1483                  """)
    1484              run_interp(interp.id, """
    1485                  with helpers.expect_channel_closed():
    1486                      channels.close(cid, force=True)
    1487                  """)
    1488  
    1489      def _assert_closed(self, fix):
    1490          self.assertTrue(fix.state.closed)
    1491  
    1492          for _ in range(fix.state.pending):
    1493              channels.recv(fix.cid)
    1494          self._assert_closed_in_interp(fix)
    1495  
    1496          for interp in ('same', 'other'):
    1497              interp = fix.get_interpreter(interp)
    1498              if interp.name == 'main':
    1499                  continue
    1500              self._assert_closed_in_interp(fix, interp)
    1501  
    1502          interp = fix.get_interpreter('fresh')
    1503          self._assert_closed_in_interp(fix, interp)
    1504  
    1505      def _iter_close_tests(self, verbose=False):
    1506          i = 0
    1507          for actions in self.iter_action_sets():
    1508              print()
    1509              for fix in self.iter_fixtures():
    1510                  i += 1
    1511                  if i > 1000:
    1512                      return
    1513                  if verbose:
    1514                      if (i - 1) % 6 == 0:
    1515                          print()
    1516                      print(i, fix, '({} actions)'.format(len(actions)))
    1517                  else:
    1518                      if (i - 1) % 6 == 0:
    1519                          print(' ', end='')
    1520                      print('.', end=''); sys.stdout.flush()
    1521                  yield i, fix, actions
    1522              if verbose:
    1523                  print('---')
    1524          print()
    1525  
    1526      # This is useful for scanning through the possible tests.
    1527      def _skim_close_tests(self):
    1528          ChannelCloseFixture.QUICK = True
    1529          for i, fix, actions in self._iter_close_tests():
    1530              pass
    1531  
    1532      def test_close(self):
    1533          for i, fix, actions in self._iter_close_tests():
    1534              with self.subTest('{} {}  {}'.format(i, fix, actions)):
    1535                  fix.prep_interpreter(fix.interp)
    1536                  self.run_actions(fix, actions)
    1537  
    1538                  self._close(fix, force=False)
    1539  
    1540                  self._assert_closed(fix)
    1541              # XXX Things slow down if we have too many interpreters.
    1542              fix.clean_up()
    1543  
    1544      def test_force_close(self):
    1545          for i, fix, actions in self._iter_close_tests():
    1546              with self.subTest('{} {}  {}'.format(i, fix, actions)):
    1547                  fix.prep_interpreter(fix.interp)
    1548                  self.run_actions(fix, actions)
    1549  
    1550                  self._close(fix, force=True)
    1551  
    1552                  self._assert_closed(fix)
    1553              # XXX Things slow down if we have too many interpreters.
    1554              fix.clean_up()
    1555  
    1556  
    1557  if __name__ == '__main__':
    1558      unittest.main()