(root)/
Python-3.12.0/
Modules/
_xxinterpchannelsmodule.c
       1  
       2  /* interpreters module */
       3  /* low-level access to interpreter primitives */
       4  
       5  #include "Python.h"
       6  #include "interpreteridobject.h"
       7  
       8  
       9  /*
      10  This module has the following process-global state:
      11  
      12  _globals (static struct globals):
      13      module_count (int)
      14      channels (struct _channels):
      15          numopen (int64_t)
        next_id (int64_t)
      17          mutex (PyThread_type_lock)
      18          head (linked list of struct _channelref *):
      19              id (int64_t)
      20              objcount (Py_ssize_t)
      21              next (struct _channelref *):
      22                  ...
      23              chan (struct _channel *):
      24                  open (int)
      25                  mutex (PyThread_type_lock)
      26                  closing (struct _channel_closing *):
      27                      ref (struct _channelref *):
      28                          ...
      29                  ends (struct _channelends *):
      30                      numsendopen (int64_t)
      31                      numrecvopen (int64_t)
      32                      send (struct _channelend *):
      33                          interp (int64_t)
      34                          open (int)
      35                          next (struct _channelend *)
      36                      recv (struct _channelend *):
      37                          ...
      38                  queue (struct _channelqueue *):
      39                      count (int64_t)
      40                      first (struct _channelitem *):
      41                          next (struct _channelitem *):
      42                              ...
      43                          data (_PyCrossInterpreterData *):
      44                              data (void *)
      45                              obj (PyObject *)
      46                              interp (int64_t)
      47                              new_object (xid_newobjectfunc)
      48                              free (xid_freefunc)
      49                      last (struct _channelitem *):
      50                          ...
      51  
      52  The above state includes the following allocations by the module:
      53  
      54  * 1 top-level mutex (to protect the rest of the state)
      55  * for each channel:
      56     * 1 struct _channelref
      57     * 1 struct _channel
      58     * 0-1 struct _channel_closing
      59     * 1 struct _channelends
      60     * 2 struct _channelend
      61     * 1 struct _channelqueue
      62  * for each item in each channel:
      63     * 1 struct _channelitem
      64     * 1 _PyCrossInterpreterData
      65  
      66  The only objects in that global state are the references held by each
channel's queue, which are safely managed via the _PyCrossInterpreterData_*()
API.  The module does not create any objects that are shared globally.
      69  */
      70  
/* Name of this extension module.  It is used to look the module up among
   the imported modules and as the prefix of the qualified names of the
   exception types created via ADD_NEW_EXCEPTION(). */
#define MODULE_NAME "_xxinterpchannels"
      72  
      73  
/* Allocation helpers for the structures that make up the module's
   process-global state.

   GLOBAL_MALLOC(TYPE) requests sizeof(TYPE) bytes from PyMem_RawMalloc();
   the result is not initialized and may be NULL.
   GLOBAL_FREE(VAR) hands the pointer VAR back to PyMem_RawFree(). */
#define GLOBAL_MALLOC(TYPE) \
    PyMem_RawMalloc(sizeof(TYPE))
#define GLOBAL_FREE(VAR) \
    PyMem_RawFree(VAR)
      78  
      79  
      80  static PyInterpreterState *
      81  _get_current_interp(void)
      82  {
      83      // PyInterpreterState_Get() aborts if lookup fails, so don't need
      84      // to check the result for NULL.
      85      return PyInterpreterState_Get();
      86  }
      87  
      88  static PyObject *
      89  _get_current_module(void)
      90  {
      91      PyObject *name = PyUnicode_FromString(MODULE_NAME);
      92      if (name == NULL) {
      93          return NULL;
      94      }
      95      PyObject *mod = PyImport_GetModule(name);
      96      Py_DECREF(name);
      97      if (mod == NULL) {
      98          return NULL;
      99      }
     100      assert(mod != Py_None);
     101      return mod;
     102  }
     103  
     104  static PyObject *
     105  get_module_from_owned_type(PyTypeObject *cls)
     106  {
     107      assert(cls != NULL);
     108      return _get_current_module();
     109      // XXX Use the more efficient API now that we use heap types:
     110      //return PyType_GetModule(cls);
     111  }
     112  
/* Forward declaration of the module definition (presumably defined in full
   later in the file).  In the visible code it is referenced only by the
   commented-out PyType_GetModuleByDef() call in get_module_from_type(). */
static struct PyModuleDef moduledef;
     114  
     115  static PyObject *
     116  get_module_from_type(PyTypeObject *cls)
     117  {
     118      assert(cls != NULL);
     119      return _get_current_module();
     120      // XXX Use the more efficient API now that we use heap types:
     121      //return PyType_GetModuleByDef(cls, &moduledef);
     122  }
     123  
     124  static PyObject *
     125  add_new_exception(PyObject *mod, const char *name, PyObject *base)
     126  {
     127      assert(!PyObject_HasAttrString(mod, name));
     128      PyObject *exctype = PyErr_NewException(name, base, NULL);
     129      if (exctype == NULL) {
     130          return NULL;
     131      }
     132      int res = PyModule_AddType(mod, (PyTypeObject *)exctype);
     133      if (res < 0) {
     134          Py_DECREF(exctype);
     135          return NULL;
     136      }
     137      return exctype;
     138  }
     139  
/* Convenience wrapper around add_new_exception(): NAME is an unquoted
   identifier that is stringified and prefixed with MODULE_NAME "." to form
   the name passed on.  Evaluates to the result of add_new_exception(). */
#define ADD_NEW_EXCEPTION(MOD, NAME, BASE) \
    add_new_exception(MOD, MODULE_NAME "." Py_STRINGIFY(NAME), BASE)
     142  
     143  static PyTypeObject *
     144  add_new_type(PyObject *mod, PyType_Spec *spec, crossinterpdatafunc shared)
     145  {
     146      PyTypeObject *cls = (PyTypeObject *)PyType_FromMetaclass(
     147                  NULL, mod, spec, NULL);
     148      if (cls == NULL) {
     149          return NULL;
     150      }
     151      if (PyModule_AddType(mod, cls) < 0) {
     152          Py_DECREF(cls);
     153          return NULL;
     154      }
     155      if (shared != NULL) {
     156          if (_PyCrossInterpreterData_RegisterClass(cls, shared)) {
     157              Py_DECREF(cls);
     158              return NULL;
     159          }
     160      }
     161      return cls;
     162  }
     163  
     164  static int
     165  _release_xid_data(_PyCrossInterpreterData *data, int ignoreexc)
     166  {
     167      PyObject *exc;
     168      if (ignoreexc) {
     169          exc = PyErr_GetRaisedException();
     170      }
     171      int res = _PyCrossInterpreterData_Release(data);
     172      if (res < 0) {
     173          /* The owning interpreter is already destroyed. */
     174          if (ignoreexc) {
     175              // XXX Emit a warning?
     176              PyErr_Clear();
     177          }
     178      }
     179      if (ignoreexc) {
     180          PyErr_SetRaisedException(exc);
     181      }
     182      return res;
     183  }
     184  
     185  
     186  /* module state *************************************************************/
     187  
/* Per-module state: the objects this module creates and keeps alive.
   Every member is visited by traverse_module_state() and released by
   clear_module_state(). */
typedef struct {
    /* heap types */
    // Unregistered as a cross-interpreter class in clear_module_state().
    PyTypeObject *ChannelIDType;

    /* exceptions */
    // Created by exceptions_init(); ChannelError is the base class of the
    // other four.
    PyObject *ChannelError;
    PyObject *ChannelNotFoundError;
    PyObject *ChannelClosedError;
    PyObject *ChannelEmptyError;
    PyObject *ChannelNotEmptyError;
} module_state;
     199  
     200  static inline module_state *
     201  get_module_state(PyObject *mod)
     202  {
     203      assert(mod != NULL);
     204      module_state *state = PyModule_GetState(mod);
     205      assert(state != NULL);
     206      return state;
     207  }
     208  
     209  static int
     210  traverse_module_state(module_state *state, visitproc visit, void *arg)
     211  {
     212      /* heap types */
     213      Py_VISIT(state->ChannelIDType);
     214  
     215      /* exceptions */
     216      Py_VISIT(state->ChannelError);
     217      Py_VISIT(state->ChannelNotFoundError);
     218      Py_VISIT(state->ChannelClosedError);
     219      Py_VISIT(state->ChannelEmptyError);
     220      Py_VISIT(state->ChannelNotEmptyError);
     221  
     222      return 0;
     223  }
     224  
     225  static int
     226  clear_module_state(module_state *state)
     227  {
     228      /* heap types */
     229      if (state->ChannelIDType != NULL) {
     230          (void)_PyCrossInterpreterData_UnregisterClass(state->ChannelIDType);
     231      }
     232      Py_CLEAR(state->ChannelIDType);
     233  
     234      /* exceptions */
     235      Py_CLEAR(state->ChannelError);
     236      Py_CLEAR(state->ChannelNotFoundError);
     237      Py_CLEAR(state->ChannelClosedError);
     238      Py_CLEAR(state->ChannelEmptyError);
     239      Py_CLEAR(state->ChannelNotEmptyError);
     240  
     241      return 0;
     242  }
     243  
     244  
     245  /* channel-specific code ****************************************************/
     246  
/* Which end(s) of a channel an operation applies to.  The sign matters:
   _channelends_close_interpreter() treats values >= 0 as including the send
   end and values <= 0 as including the recv end. */
#define CHANNEL_SEND 1
#define CHANNEL_BOTH 0
#define CHANNEL_RECV -1

/* channel errors */

/* Negative status codes returned by the channel functions.
   handle_channel_error() turns each of them into an exception; any other
   negative value (e.g. -1) is expected to come with an exception that has
   already been set. */
#define ERR_CHANNEL_NOT_FOUND -2
#define ERR_CHANNEL_CLOSED -3
#define ERR_CHANNEL_INTERP_CLOSED -4
#define ERR_CHANNEL_EMPTY -5
#define ERR_CHANNEL_NOT_EMPTY -6
#define ERR_CHANNEL_MUTEX_INIT -7
#define ERR_CHANNELS_MUTEX_INIT -8
#define ERR_NO_NEXT_CHANNEL_ID -9
     261  
     262  static int
     263  exceptions_init(PyObject *mod)
     264  {
     265      module_state *state = get_module_state(mod);
     266      if (state == NULL) {
     267          return -1;
     268      }
     269  
     270  #define ADD(NAME, BASE) \
     271      do { \
     272          assert(state->NAME == NULL); \
     273          state->NAME = ADD_NEW_EXCEPTION(mod, NAME, BASE); \
     274          if (state->NAME == NULL) { \
     275              return -1; \
     276          } \
     277      } while (0)
     278  
     279      // A channel-related operation failed.
     280      ADD(ChannelError, PyExc_RuntimeError);
     281      // An operation tried to use a channel that doesn't exist.
     282      ADD(ChannelNotFoundError, state->ChannelError);
     283      // An operation tried to use a closed channel.
     284      ADD(ChannelClosedError, state->ChannelError);
     285      // An operation tried to pop from an empty channel.
     286      ADD(ChannelEmptyError, state->ChannelError);
     287      // An operation tried to close a non-empty channel.
     288      ADD(ChannelNotEmptyError, state->ChannelError);
     289  #undef ADD
     290  
     291      return 0;
     292  }
     293  
     294  static int
     295  handle_channel_error(int err, PyObject *mod, int64_t cid)
     296  {
     297      if (err == 0) {
     298          assert(!PyErr_Occurred());
     299          return 0;
     300      }
     301      assert(err < 0);
     302      module_state *state = get_module_state(mod);
     303      assert(state != NULL);
     304      if (err == ERR_CHANNEL_NOT_FOUND) {
     305          PyErr_Format(state->ChannelNotFoundError,
     306                       "channel %" PRId64 " not found", cid);
     307      }
     308      else if (err == ERR_CHANNEL_CLOSED) {
     309          PyErr_Format(state->ChannelClosedError,
     310                       "channel %" PRId64 " is closed", cid);
     311      }
     312      else if (err == ERR_CHANNEL_INTERP_CLOSED) {
     313          PyErr_Format(state->ChannelClosedError,
     314                       "channel %" PRId64 " is already closed", cid);
     315      }
     316      else if (err == ERR_CHANNEL_EMPTY) {
     317          PyErr_Format(state->ChannelEmptyError,
     318                       "channel %" PRId64 " is empty", cid);
     319      }
     320      else if (err == ERR_CHANNEL_NOT_EMPTY) {
     321          PyErr_Format(state->ChannelNotEmptyError,
     322                       "channel %" PRId64 " may not be closed "
     323                       "if not empty (try force=True)",
     324                       cid);
     325      }
     326      else if (err == ERR_CHANNEL_MUTEX_INIT) {
     327          PyErr_SetString(state->ChannelError,
     328                          "can't initialize mutex for new channel");
     329      }
     330      else if (err == ERR_CHANNELS_MUTEX_INIT) {
     331          PyErr_SetString(state->ChannelError,
     332                          "can't initialize mutex for channel management");
     333      }
     334      else if (err == ERR_NO_NEXT_CHANNEL_ID) {
     335          PyErr_SetString(state->ChannelError,
     336                          "failed to get a channel ID");
     337      }
     338      else {
     339          assert(PyErr_Occurred());
     340      }
     341      return 1;
     342  }
     343  
     344  /* the channel queue */
     345  
struct _channelitem;

/* One entry of a channel's queue (a singly linked list). */
typedef struct _channelitem {
    // The queued cross-interpreter data; released and freed along with the
    // item unless it was detached first (see _channelitem_popped()).
    _PyCrossInterpreterData *data;
    // The following item in the queue, or NULL.
    struct _channelitem *next;
} _channelitem;
     352  
     353  static _channelitem *
     354  _channelitem_new(void)
     355  {
     356      _channelitem *item = GLOBAL_MALLOC(_channelitem);
     357      if (item == NULL) {
     358          PyErr_NoMemory();
     359          return NULL;
     360      }
     361      item->data = NULL;
     362      item->next = NULL;
     363      return item;
     364  }
     365  
     366  static void
     367  _channelitem_clear(_channelitem *item)
     368  {
     369      if (item->data != NULL) {
     370          (void)_release_xid_data(item->data, 1);
     371          // It was allocated in _channel_send().
     372          GLOBAL_FREE(item->data);
     373          item->data = NULL;
     374      }
     375      item->next = NULL;
     376  }
     377  
     378  static void
     379  _channelitem_free(_channelitem *item)
     380  {
     381      _channelitem_clear(item);
     382      GLOBAL_FREE(item);
     383  }
     384  
     385  static void
     386  _channelitem_free_all(_channelitem *item)
     387  {
     388      while (item != NULL) {
     389          _channelitem *last = item;
     390          item = item->next;
     391          _channelitem_free(last);
     392      }
     393  }
     394  
     395  static _PyCrossInterpreterData *
     396  _channelitem_popped(_channelitem *item)
     397  {
     398      _PyCrossInterpreterData *data = item->data;
     399      item->data = NULL;
     400      _channelitem_free(item);
     401      return data;
     402  }
     403  
/* FIFO of the data waiting in a channel: items are appended at "last" and
   taken from "first". */
typedef struct _channelqueue {
    // Number of items currently linked into the queue.
    int64_t count;
    // Oldest item (next to be received), or NULL if the queue is empty.
    _channelitem *first;
    // Newest item (most recently sent), or NULL if the queue is empty.
    _channelitem *last;
} _channelqueue;
     409  
     410  static _channelqueue *
     411  _channelqueue_new(void)
     412  {
     413      _channelqueue *queue = GLOBAL_MALLOC(_channelqueue);
     414      if (queue == NULL) {
     415          PyErr_NoMemory();
     416          return NULL;
     417      }
     418      queue->count = 0;
     419      queue->first = NULL;
     420      queue->last = NULL;
     421      return queue;
     422  }
     423  
     424  static void
     425  _channelqueue_clear(_channelqueue *queue)
     426  {
     427      _channelitem_free_all(queue->first);
     428      queue->count = 0;
     429      queue->first = NULL;
     430      queue->last = NULL;
     431  }
     432  
     433  static void
     434  _channelqueue_free(_channelqueue *queue)
     435  {
     436      _channelqueue_clear(queue);
     437      GLOBAL_FREE(queue);
     438  }
     439  
     440  static int
     441  _channelqueue_put(_channelqueue *queue, _PyCrossInterpreterData *data)
     442  {
     443      _channelitem *item = _channelitem_new();
     444      if (item == NULL) {
     445          return -1;
     446      }
     447      item->data = data;
     448  
     449      queue->count += 1;
     450      if (queue->first == NULL) {
     451          queue->first = item;
     452      }
     453      else {
     454          queue->last->next = item;
     455      }
     456      queue->last = item;
     457      return 0;
     458  }
     459  
     460  static _PyCrossInterpreterData *
     461  _channelqueue_get(_channelqueue *queue)
     462  {
     463      _channelitem *item = queue->first;
     464      if (item == NULL) {
     465          return NULL;
     466      }
     467      queue->first = item->next;
     468      if (queue->last == item) {
     469          queue->last = NULL;
     470      }
     471      queue->count -= 1;
     472  
     473      return _channelitem_popped(item);
     474  }
     475  
     476  static void
     477  _channelqueue_drop_interpreter(_channelqueue *queue, int64_t interp)
     478  {
     479      _channelitem *prev = NULL;
     480      _channelitem *next = queue->first;
     481      while (next != NULL) {
     482          _channelitem *item = next;
     483          next = item->next;
     484          if (item->data->interp == interp) {
     485              if (prev == NULL) {
     486                  queue->first = item->next;
     487              }
     488              else {
     489                  prev->next = item->next;
     490              }
     491              _channelitem_free(item);
     492              queue->count -= 1;
     493          }
     494          else {
     495              prev = item;
     496          }
     497      }
     498  }
     499  
     500  /* channel-interpreter associations */
     501  
struct _channelend;

/* Association of one interpreter with one end (send or recv) of a channel;
   the associations for an end form a singly linked list. */
typedef struct _channelend {
    // Next association in the same list, or NULL.
    struct _channelend *next;
    // ID of the associated interpreter.
    int64_t interp;
    // 1 while the interpreter's end is open (the initial state), 0 once it
    // has been closed.
    int open;
} _channelend;
     509  
     510  static _channelend *
     511  _channelend_new(int64_t interp)
     512  {
     513      _channelend *end = GLOBAL_MALLOC(_channelend);
     514      if (end == NULL) {
     515          PyErr_NoMemory();
     516          return NULL;
     517      }
     518      end->next = NULL;
     519      end->interp = interp;
     520      end->open = 1;
     521      return end;
     522  }
     523  
     524  static void
     525  _channelend_free(_channelend *end)
     526  {
     527      GLOBAL_FREE(end);
     528  }
     529  
     530  static void
     531  _channelend_free_all(_channelend *end)
     532  {
     533      while (end != NULL) {
     534          _channelend *last = end;
     535          end = end->next;
     536          _channelend_free(last);
     537      }
     538  }
     539  
     540  static _channelend *
     541  _channelend_find(_channelend *first, int64_t interp, _channelend **pprev)
     542  {
     543      _channelend *prev = NULL;
     544      _channelend *end = first;
     545      while (end != NULL) {
     546          if (end->interp == interp) {
     547              break;
     548          }
     549          prev = end;
     550          end = end->next;
     551      }
     552      if (pprev != NULL) {
     553          *pprev = prev;
     554      }
     555      return end;
     556  }
     557  
/* The interpreters associated with each end of a channel.  Note that the
   struct tag is _channelassociations while the typedef is _channelends. */
typedef struct _channelassociations {
    // Note that the list entries are never removed for interpreters
    // for which the channel is closed.  This should not be a problem in
    // practice.  Also, a channel isn't automatically closed when an
    // interpreter is destroyed.

    // Number of entries in "send" that are still open.
    int64_t numsendopen;
    // Number of entries in "recv" that are still open.
    int64_t numrecvopen;
    // Interpreters associated with the send end.
    _channelend *send;
    // Interpreters associated with the recv end.
    _channelend *recv;
} _channelends;
     568  
     569  static _channelends *
     570  _channelends_new(void)
     571  {
     572      _channelends *ends = GLOBAL_MALLOC(_channelends);
     573      if (ends== NULL) {
     574          return NULL;
     575      }
     576      ends->numsendopen = 0;
     577      ends->numrecvopen = 0;
     578      ends->send = NULL;
     579      ends->recv = NULL;
     580      return ends;
     581  }
     582  
     583  static void
     584  _channelends_clear(_channelends *ends)
     585  {
     586      _channelend_free_all(ends->send);
     587      ends->send = NULL;
     588      ends->numsendopen = 0;
     589  
     590      _channelend_free_all(ends->recv);
     591      ends->recv = NULL;
     592      ends->numrecvopen = 0;
     593  }
     594  
     595  static void
     596  _channelends_free(_channelends *ends)
     597  {
     598      _channelends_clear(ends);
     599      GLOBAL_FREE(ends);
     600  }
     601  
     602  static _channelend *
     603  _channelends_add(_channelends *ends, _channelend *prev, int64_t interp,
     604                   int send)
     605  {
     606      _channelend *end = _channelend_new(interp);
     607      if (end == NULL) {
     608          return NULL;
     609      }
     610  
     611      if (prev == NULL) {
     612          if (send) {
     613              ends->send = end;
     614          }
     615          else {
     616              ends->recv = end;
     617          }
     618      }
     619      else {
     620          prev->next = end;
     621      }
     622      if (send) {
     623          ends->numsendopen += 1;
     624      }
     625      else {
     626          ends->numrecvopen += 1;
     627      }
     628      return end;
     629  }
     630  
     631  static int
     632  _channelends_associate(_channelends *ends, int64_t interp, int send)
     633  {
     634      _channelend *prev;
     635      _channelend *end = _channelend_find(send ? ends->send : ends->recv,
     636                                          interp, &prev);
     637      if (end != NULL) {
     638          if (!end->open) {
     639              return ERR_CHANNEL_CLOSED;
     640          }
     641          // already associated
     642          return 0;
     643      }
     644      if (_channelends_add(ends, prev, interp, send) == NULL) {
     645          return -1;
     646      }
     647      return 0;
     648  }
     649  
     650  static int
     651  _channelends_is_open(_channelends *ends)
     652  {
     653      if (ends->numsendopen != 0 || ends->numrecvopen != 0) {
     654          return 1;
     655      }
     656      if (ends->send == NULL && ends->recv == NULL) {
     657          return 1;
     658      }
     659      return 0;
     660  }
     661  
     662  static void
     663  _channelends_close_end(_channelends *ends, _channelend *end, int send)
     664  {
     665      end->open = 0;
     666      if (send) {
     667          ends->numsendopen -= 1;
     668      }
     669      else {
     670          ends->numrecvopen -= 1;
     671      }
     672  }
     673  
     674  static int
     675  _channelends_close_interpreter(_channelends *ends, int64_t interp, int which)
     676  {
     677      _channelend *prev;
     678      _channelend *end;
     679      if (which >= 0) {  // send/both
     680          end = _channelend_find(ends->send, interp, &prev);
     681          if (end == NULL) {
     682              // never associated so add it
     683              end = _channelends_add(ends, prev, interp, 1);
     684              if (end == NULL) {
     685                  return -1;
     686              }
     687          }
     688          _channelends_close_end(ends, end, 1);
     689      }
     690      if (which <= 0) {  // recv/both
     691          end = _channelend_find(ends->recv, interp, &prev);
     692          if (end == NULL) {
     693              // never associated so add it
     694              end = _channelends_add(ends, prev, interp, 0);
     695              if (end == NULL) {
     696                  return -1;
     697              }
     698          }
     699          _channelends_close_end(ends, end, 0);
     700      }
     701      return 0;
     702  }
     703  
     704  static void
     705  _channelends_drop_interpreter(_channelends *ends, int64_t interp)
     706  {
     707      _channelend *end;
     708      end = _channelend_find(ends->send, interp, NULL);
     709      if (end != NULL) {
     710          _channelends_close_end(ends, end, 1);
     711      }
     712      end = _channelend_find(ends->recv, interp, NULL);
     713      if (end != NULL) {
     714          _channelends_close_end(ends, end, 0);
     715      }
     716  }
     717  
     718  static void
     719  _channelends_close_all(_channelends *ends, int which, int force)
     720  {
     721      // XXX Handle the ends.
     722      // XXX Handle force is True.
     723  
     724      // Ensure all the "send"-associated interpreters are closed.
     725      _channelend *end;
     726      for (end = ends->send; end != NULL; end = end->next) {
     727          _channelends_close_end(ends, end, 1);
     728      }
     729  
     730      // Ensure all the "recv"-associated interpreters are closed.
     731      for (end = ends->recv; end != NULL; end = end->next) {
     732          _channelends_close_end(ends, end, 0);
     733      }
     734  }
     735  
     736  /* channels */
     737  
struct _channel;
struct _channel_closing;
/* Both are used by the channel functions below; their definitions are not
   part of this section of the file. */
static void _channel_clear_closing(struct _channel *);
static void _channel_finish_closing(struct _channel *);

/* The state of a single channel. */
typedef struct _channel {
    // Held by the _channel_*() functions while they access the members
    // below; freed together with the channel in _channel_free().
    PyThread_type_lock mutex;
    // The data waiting to be received.
    _channelqueue *queue;
    // The interpreters associated with each end.
    _channelends *ends;
    // 1 while the channel is open, 0 once it has been closed.
    int open;
    // NULL initially.  NOTE(review): presumably set while a deferred close
    // is pending -- confirm against the code that assigns it.
    struct _channel_closing *closing;
} _PyChannelState;
     750  
     751  static _PyChannelState *
     752  _channel_new(PyThread_type_lock mutex)
     753  {
     754      _PyChannelState *chan = GLOBAL_MALLOC(_PyChannelState);
     755      if (chan == NULL) {
     756          return NULL;
     757      }
     758      chan->mutex = mutex;
     759      chan->queue = _channelqueue_new();
     760      if (chan->queue == NULL) {
     761          GLOBAL_FREE(chan);
     762          return NULL;
     763      }
     764      chan->ends = _channelends_new();
     765      if (chan->ends == NULL) {
     766          _channelqueue_free(chan->queue);
     767          GLOBAL_FREE(chan);
     768          return NULL;
     769      }
     770      chan->open = 1;
     771      chan->closing = NULL;
     772      return chan;
     773  }
     774  
     775  static void
     776  _channel_free(_PyChannelState *chan)
     777  {
     778      _channel_clear_closing(chan);
     779      PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
     780      _channelqueue_free(chan->queue);
     781      _channelends_free(chan->ends);
     782      PyThread_release_lock(chan->mutex);
     783  
     784      PyThread_free_lock(chan->mutex);
     785      GLOBAL_FREE(chan);
     786  }
     787  
     788  static int
     789  _channel_add(_PyChannelState *chan, int64_t interp,
     790               _PyCrossInterpreterData *data)
     791  {
     792      int res = -1;
     793      PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
     794  
     795      if (!chan->open) {
     796          res = ERR_CHANNEL_CLOSED;
     797          goto done;
     798      }
     799      if (_channelends_associate(chan->ends, interp, 1) != 0) {
     800          res = ERR_CHANNEL_INTERP_CLOSED;
     801          goto done;
     802      }
     803  
     804      if (_channelqueue_put(chan->queue, data) != 0) {
     805          goto done;
     806      }
     807  
     808      res = 0;
     809  done:
     810      PyThread_release_lock(chan->mutex);
     811      return res;
     812  }
     813  
     814  static int
     815  _channel_next(_PyChannelState *chan, int64_t interp,
     816                _PyCrossInterpreterData **res)
     817  {
     818      int err = 0;
     819      PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
     820  
     821      if (!chan->open) {
     822          err = ERR_CHANNEL_CLOSED;
     823          goto done;
     824      }
     825      if (_channelends_associate(chan->ends, interp, 0) != 0) {
     826          err = ERR_CHANNEL_INTERP_CLOSED;
     827          goto done;
     828      }
     829  
     830      _PyCrossInterpreterData *data = _channelqueue_get(chan->queue);
     831      if (data == NULL && !PyErr_Occurred() && chan->closing != NULL) {
     832          chan->open = 0;
     833      }
     834      *res = data;
     835  
     836  done:
     837      PyThread_release_lock(chan->mutex);
     838      if (chan->queue->count == 0) {
     839          _channel_finish_closing(chan);
     840      }
     841      return err;
     842  }
     843  
     844  static int
     845  _channel_close_interpreter(_PyChannelState *chan, int64_t interp, int end)
     846  {
     847      PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
     848  
     849      int res = -1;
     850      if (!chan->open) {
     851          res = ERR_CHANNEL_CLOSED;
     852          goto done;
     853      }
     854  
     855      if (_channelends_close_interpreter(chan->ends, interp, end) != 0) {
     856          goto done;
     857      }
     858      chan->open = _channelends_is_open(chan->ends);
     859  
     860      res = 0;
     861  done:
     862      PyThread_release_lock(chan->mutex);
     863      return res;
     864  }
     865  
     866  static void
     867  _channel_drop_interpreter(_PyChannelState *chan, int64_t interp)
     868  {
     869      PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
     870  
     871      _channelqueue_drop_interpreter(chan->queue, interp);
     872      _channelends_drop_interpreter(chan->ends, interp);
     873      chan->open = _channelends_is_open(chan->ends);
     874  
     875      PyThread_release_lock(chan->mutex);
     876  }
     877  
     878  static int
     879  _channel_close_all(_PyChannelState *chan, int end, int force)
     880  {
     881      int res = -1;
     882      PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
     883  
     884      if (!chan->open) {
     885          res = ERR_CHANNEL_CLOSED;
     886          goto done;
     887      }
     888  
     889      if (!force && chan->queue->count > 0) {
     890          res = ERR_CHANNEL_NOT_EMPTY;
     891          goto done;
     892      }
     893  
     894      chan->open = 0;
     895  
     896      // We *could* also just leave these in place, since we've marked
     897      // the channel as closed already.
     898      _channelends_close_all(chan->ends, end, force);
     899  
     900      res = 0;
     901  done:
     902      PyThread_release_lock(chan->mutex);
     903      return res;
     904  }
     905  
     906  /* the set of channels */
     907  
struct _channelref;

/* Entry of the global list of channels: ties a channel ID to the state of
   that channel. */
typedef struct _channelref {
    // The channel's ID; this is the key _channelref_find() searches for.
    int64_t id;
    // The channel's state; may be NULL (see _channelref_free()).
    _PyChannelState *chan;
    // Next entry of the list, or NULL.
    struct _channelref *next;
    // Starts at 0.  NOTE(review): presumably the number of objects that
    // refer to this channel -- confirm against the code that updates it.
    Py_ssize_t objcount;
} _channelref;
     916  
     917  static _channelref *
     918  _channelref_new(int64_t id, _PyChannelState *chan)
     919  {
     920      _channelref *ref = GLOBAL_MALLOC(_channelref);
     921      if (ref == NULL) {
     922          return NULL;
     923      }
     924      ref->id = id;
     925      ref->chan = chan;
     926      ref->next = NULL;
     927      ref->objcount = 0;
     928      return ref;
     929  }
     930  
     931  //static void
     932  //_channelref_clear(_channelref *ref)
     933  //{
     934  //    ref->id = -1;
     935  //    ref->chan = NULL;
     936  //    ref->next = NULL;
     937  //    ref->objcount = 0;
     938  //}
     939  
     940  static void
     941  _channelref_free(_channelref *ref)
     942  {
     943      if (ref->chan != NULL) {
     944          _channel_clear_closing(ref->chan);
     945      }
     946      //_channelref_clear(ref);
     947      GLOBAL_FREE(ref);
     948  }
     949  
     950  static _channelref *
     951  _channelref_find(_channelref *first, int64_t id, _channelref **pprev)
     952  {
     953      _channelref *prev = NULL;
     954      _channelref *ref = first;
     955      while (ref != NULL) {
     956          if (ref->id == id) {
     957              break;
     958          }
     959          prev = ref;
     960          ref = ref->next;
     961      }
     962      if (pprev != NULL) {
     963          *pprev = prev;
     964      }
     965      return ref;
     966  }
     967  
// The set of all channels: a mutex-protected singly linked list.
typedef struct _channels {
    // Guards the fields below; freed in _channels_fini().
    PyThread_type_lock mutex;
    // First entry of the list (new entries are pushed at the front).
    _channelref *head;
    // Number of entries currently in the list.
    int64_t numopen;
    // The ID handed out by the next call to _channels_next_id().
    int64_t next_id;
} _channels;
     974  
     975  static void
     976  _channels_init(_channels *channels, PyThread_type_lock mutex)
     977  {
     978      channels->mutex = mutex;
     979      channels->head = NULL;
     980      channels->numopen = 0;
     981      channels->next_id = 0;
     982  }
     983  
     984  static void
     985  _channels_fini(_channels *channels)
     986  {
     987      assert(channels->numopen == 0);
     988      assert(channels->head == NULL);
     989      if (channels->mutex != NULL) {
     990          PyThread_free_lock(channels->mutex);
     991          channels->mutex = NULL;
     992      }
     993  }
     994  
     995  static int64_t
     996  _channels_next_id(_channels *channels)  // needs lock
     997  {
     998      int64_t id = channels->next_id;
     999      if (id < 0) {
    1000          /* overflow */
    1001          return -1;
    1002      }
    1003      channels->next_id += 1;
    1004      return id;
    1005  }
    1006  
    1007  static int
    1008  _channels_lookup(_channels *channels, int64_t id, PyThread_type_lock *pmutex,
    1009                   _PyChannelState **res)
    1010  {
    1011      int err = -1;
    1012      _PyChannelState *chan = NULL;
    1013      PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
    1014      if (pmutex != NULL) {
    1015          *pmutex = NULL;
    1016      }
    1017  
    1018      _channelref *ref = _channelref_find(channels->head, id, NULL);
    1019      if (ref == NULL) {
    1020          err = ERR_CHANNEL_NOT_FOUND;
    1021          goto done;
    1022      }
    1023      if (ref->chan == NULL || !ref->chan->open) {
    1024          err = ERR_CHANNEL_CLOSED;
    1025          goto done;
    1026      }
    1027  
    1028      if (pmutex != NULL) {
    1029          // The mutex will be closed by the caller.
    1030          *pmutex = channels->mutex;
    1031      }
    1032  
    1033      chan = ref->chan;
    1034      err = 0;
    1035  
    1036  done:
    1037      if (pmutex == NULL || *pmutex == NULL) {
    1038          PyThread_release_lock(channels->mutex);
    1039      }
    1040      *res = chan;
    1041      return err;
    1042  }
    1043  
    1044  static int64_t
    1045  _channels_add(_channels *channels, _PyChannelState *chan)
    1046  {
    1047      int64_t cid = -1;
    1048      PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
    1049  
    1050      // Create a new ref.
    1051      int64_t id = _channels_next_id(channels);
    1052      if (id < 0) {
    1053          cid = ERR_NO_NEXT_CHANNEL_ID;
    1054          goto done;
    1055      }
    1056      _channelref *ref = _channelref_new(id, chan);
    1057      if (ref == NULL) {
    1058          goto done;
    1059      }
    1060  
    1061      // Add it to the list.
    1062      // We assume that the channel is a new one (not already in the list).
    1063      ref->next = channels->head;
    1064      channels->head = ref;
    1065      channels->numopen += 1;
    1066  
    1067      cid = id;
    1068  done:
    1069      PyThread_release_lock(channels->mutex);
    1070      return cid;
    1071  }
    1072  
    1073  /* forward */
    1074  static int _channel_set_closing(struct _channelref *, PyThread_type_lock);
    1075  
    1076  static int
    1077  _channels_close(_channels *channels, int64_t cid, _PyChannelState **pchan,
    1078                  int end, int force)
    1079  {
    1080      int res = -1;
    1081      PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
    1082      if (pchan != NULL) {
    1083          *pchan = NULL;
    1084      }
    1085  
    1086      _channelref *ref = _channelref_find(channels->head, cid, NULL);
    1087      if (ref == NULL) {
    1088          res = ERR_CHANNEL_NOT_FOUND;
    1089          goto done;
    1090      }
    1091  
    1092      if (ref->chan == NULL) {
    1093          res = ERR_CHANNEL_CLOSED;
    1094          goto done;
    1095      }
    1096      else if (!force && end == CHANNEL_SEND && ref->chan->closing != NULL) {
    1097          res = ERR_CHANNEL_CLOSED;
    1098          goto done;
    1099      }
    1100      else {
    1101          int err = _channel_close_all(ref->chan, end, force);
    1102          if (err != 0) {
    1103              if (end == CHANNEL_SEND && err == ERR_CHANNEL_NOT_EMPTY) {
    1104                  if (ref->chan->closing != NULL) {
    1105                      res = ERR_CHANNEL_CLOSED;
    1106                      goto done;
    1107                  }
    1108                  // Mark the channel as closing and return.  The channel
    1109                  // will be cleaned up in _channel_next().
    1110                  PyErr_Clear();
    1111                  int err = _channel_set_closing(ref, channels->mutex);
    1112                  if (err != 0) {
    1113                      res = err;
    1114                      goto done;
    1115                  }
    1116                  if (pchan != NULL) {
    1117                      *pchan = ref->chan;
    1118                  }
    1119                  res = 0;
    1120              }
    1121              else {
    1122                  res = err;
    1123              }
    1124              goto done;
    1125          }
    1126          if (pchan != NULL) {
    1127              *pchan = ref->chan;
    1128          }
    1129          else  {
    1130              _channel_free(ref->chan);
    1131          }
    1132          ref->chan = NULL;
    1133      }
    1134  
    1135      res = 0;
    1136  done:
    1137      PyThread_release_lock(channels->mutex);
    1138      return res;
    1139  }
    1140  
    1141  static void
    1142  _channels_remove_ref(_channels *channels, _channelref *ref, _channelref *prev,
    1143                       _PyChannelState **pchan)
    1144  {
    1145      if (ref == channels->head) {
    1146          channels->head = ref->next;
    1147      }
    1148      else {
    1149          prev->next = ref->next;
    1150      }
    1151      channels->numopen -= 1;
    1152  
    1153      if (pchan != NULL) {
    1154          *pchan = ref->chan;
    1155      }
    1156      _channelref_free(ref);
    1157  }
    1158  
    1159  static int
    1160  _channels_remove(_channels *channels, int64_t id, _PyChannelState **pchan)
    1161  {
    1162      int res = -1;
    1163      PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
    1164  
    1165      if (pchan != NULL) {
    1166          *pchan = NULL;
    1167      }
    1168  
    1169      _channelref *prev = NULL;
    1170      _channelref *ref = _channelref_find(channels->head, id, &prev);
    1171      if (ref == NULL) {
    1172          res = ERR_CHANNEL_NOT_FOUND;
    1173          goto done;
    1174      }
    1175  
    1176      _channels_remove_ref(channels, ref, prev, pchan);
    1177  
    1178      res = 0;
    1179  done:
    1180      PyThread_release_lock(channels->mutex);
    1181      return res;
    1182  }
    1183  
    1184  static int
    1185  _channels_add_id_object(_channels *channels, int64_t id)
    1186  {
    1187      int res = -1;
    1188      PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
    1189  
    1190      _channelref *ref = _channelref_find(channels->head, id, NULL);
    1191      if (ref == NULL) {
    1192          res = ERR_CHANNEL_NOT_FOUND;
    1193          goto done;
    1194      }
    1195      ref->objcount += 1;
    1196  
    1197      res = 0;
    1198  done:
    1199      PyThread_release_lock(channels->mutex);
    1200      return res;
    1201  }
    1202  
    1203  static void
    1204  _channels_drop_id_object(_channels *channels, int64_t id)
    1205  {
    1206      PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
    1207  
    1208      _channelref *prev = NULL;
    1209      _channelref *ref = _channelref_find(channels->head, id, &prev);
    1210      if (ref == NULL) {
    1211          // Already destroyed.
    1212          goto done;
    1213      }
    1214      ref->objcount -= 1;
    1215  
    1216      // Destroy if no longer used.
    1217      if (ref->objcount == 0) {
    1218          _PyChannelState *chan = NULL;
    1219          _channels_remove_ref(channels, ref, prev, &chan);
    1220          if (chan != NULL) {
    1221              _channel_free(chan);
    1222          }
    1223      }
    1224  
    1225  done:
    1226      PyThread_release_lock(channels->mutex);
    1227  }
    1228  
    1229  static int64_t *
    1230  _channels_list_all(_channels *channels, int64_t *count)
    1231  {
    1232      int64_t *cids = NULL;
    1233      PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
    1234      int64_t *ids = PyMem_NEW(int64_t, (Py_ssize_t)(channels->numopen));
    1235      if (ids == NULL) {
    1236          goto done;
    1237      }
    1238      _channelref *ref = channels->head;
    1239      for (int64_t i=0; ref != NULL; ref = ref->next, i++) {
    1240          ids[i] = ref->id;
    1241      }
    1242      *count = channels->numopen;
    1243  
    1244      cids = ids;
    1245  done:
    1246      PyThread_release_lock(channels->mutex);
    1247      return cids;
    1248  }
    1249  
    1250  static void
    1251  _channels_drop_interpreter(_channels *channels, int64_t interp)
    1252  {
    1253      PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
    1254  
    1255      _channelref *ref = channels->head;
    1256      for (; ref != NULL; ref = ref->next) {
    1257          if (ref->chan != NULL) {
    1258              _channel_drop_interpreter(ref->chan, interp);
    1259          }
    1260      }
    1261  
    1262      PyThread_release_lock(channels->mutex);
    1263  }
    1264  
    1265  /* support for closing non-empty channels */
    1266  
// State attached to a channel (chan->closing) while it is marked as
// closing; allocated in _channel_set_closing() and released in
// _channel_clear_closing().
struct _channel_closing {
    // The list entry that owns the channel; used by
    // _channel_finish_closing() to detach the channel from its entry.
    struct _channelref *ref;
};
    1270  
    1271  static int
    1272  _channel_set_closing(struct _channelref *ref, PyThread_type_lock mutex) {
    1273      struct _channel *chan = ref->chan;
    1274      if (chan == NULL) {
    1275          // already closed
    1276          return 0;
    1277      }
    1278      int res = -1;
    1279      PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
    1280      if (chan->closing != NULL) {
    1281          res = ERR_CHANNEL_CLOSED;
    1282          goto done;
    1283      }
    1284      chan->closing = GLOBAL_MALLOC(struct _channel_closing);
    1285      if (chan->closing == NULL) {
    1286          goto done;
    1287      }
    1288      chan->closing->ref = ref;
    1289  
    1290      res = 0;
    1291  done:
    1292      PyThread_release_lock(chan->mutex);
    1293      return res;
    1294  }
    1295  
    1296  static void
    1297  _channel_clear_closing(struct _channel *chan) {
    1298      PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
    1299      if (chan->closing != NULL) {
    1300          GLOBAL_FREE(chan->closing);
    1301          chan->closing = NULL;
    1302      }
    1303      PyThread_release_lock(chan->mutex);
    1304  }
    1305  
    1306  static void
    1307  _channel_finish_closing(struct _channel *chan) {
    1308      struct _channel_closing *closing = chan->closing;
    1309      if (closing == NULL) {
    1310          return;
    1311      }
    1312      _channelref *ref = closing->ref;
    1313      _channel_clear_closing(chan);
    1314      // Do the things that would have been done in _channels_close().
    1315      ref->chan = NULL;
    1316      _channel_free(chan);
    1317  }
    1318  
    1319  /* "high"-level channel-related functions */
    1320  
    1321  static int64_t
    1322  _channel_create(_channels *channels)
    1323  {
    1324      PyThread_type_lock mutex = PyThread_allocate_lock();
    1325      if (mutex == NULL) {
    1326          return ERR_CHANNEL_MUTEX_INIT;
    1327      }
    1328      _PyChannelState *chan = _channel_new(mutex);
    1329      if (chan == NULL) {
    1330          PyThread_free_lock(mutex);
    1331          return -1;
    1332      }
    1333      int64_t id = _channels_add(channels, chan);
    1334      if (id < 0) {
    1335          _channel_free(chan);
    1336      }
    1337      return id;
    1338  }
    1339  
    1340  static int
    1341  _channel_destroy(_channels *channels, int64_t id)
    1342  {
    1343      _PyChannelState *chan = NULL;
    1344      int err = _channels_remove(channels, id, &chan);
    1345      if (err != 0) {
    1346          return err;
    1347      }
    1348      if (chan != NULL) {
    1349          _channel_free(chan);
    1350      }
    1351      return 0;
    1352  }
    1353  
    1354  static int
    1355  _channel_send(_channels *channels, int64_t id, PyObject *obj)
    1356  {
    1357      PyInterpreterState *interp = _get_current_interp();
    1358      if (interp == NULL) {
    1359          return -1;
    1360      }
    1361  
    1362      // Look up the channel.
    1363      PyThread_type_lock mutex = NULL;
    1364      _PyChannelState *chan = NULL;
    1365      int err = _channels_lookup(channels, id, &mutex, &chan);
    1366      if (err != 0) {
    1367          return err;
    1368      }
    1369      assert(chan != NULL);
    1370      // Past this point we are responsible for releasing the mutex.
    1371  
    1372      if (chan->closing != NULL) {
    1373          PyThread_release_lock(mutex);
    1374          return ERR_CHANNEL_CLOSED;
    1375      }
    1376  
    1377      // Convert the object to cross-interpreter data.
    1378      _PyCrossInterpreterData *data = GLOBAL_MALLOC(_PyCrossInterpreterData);
    1379      if (data == NULL) {
    1380          PyThread_release_lock(mutex);
    1381          return -1;
    1382      }
    1383      if (_PyObject_GetCrossInterpreterData(obj, data) != 0) {
    1384          PyThread_release_lock(mutex);
    1385          GLOBAL_FREE(data);
    1386          return -1;
    1387      }
    1388  
    1389      // Add the data to the channel.
    1390      int res = _channel_add(chan, PyInterpreterState_GetID(interp), data);
    1391      PyThread_release_lock(mutex);
    1392      if (res != 0) {
    1393          // We may chain an exception here:
    1394          (void)_release_xid_data(data, 0);
    1395          GLOBAL_FREE(data);
    1396          return res;
    1397      }
    1398  
    1399      return 0;
    1400  }
    1401  
    1402  static int
    1403  _channel_recv(_channels *channels, int64_t id, PyObject **res)
    1404  {
    1405      int err;
    1406      *res = NULL;
    1407  
    1408      PyInterpreterState *interp = _get_current_interp();
    1409      if (interp == NULL) {
    1410          // XXX Is this always an error?
    1411          if (PyErr_Occurred()) {
    1412              return -1;
    1413          }
    1414          return 0;
    1415      }
    1416  
    1417      // Look up the channel.
    1418      PyThread_type_lock mutex = NULL;
    1419      _PyChannelState *chan = NULL;
    1420      err = _channels_lookup(channels, id, &mutex, &chan);
    1421      if (err != 0) {
    1422          return err;
    1423      }
    1424      assert(chan != NULL);
    1425      // Past this point we are responsible for releasing the mutex.
    1426  
    1427      // Pop off the next item from the channel.
    1428      _PyCrossInterpreterData *data = NULL;
    1429      err = _channel_next(chan, PyInterpreterState_GetID(interp), &data);
    1430      PyThread_release_lock(mutex);
    1431      if (err != 0) {
    1432          return err;
    1433      }
    1434      else if (data == NULL) {
    1435          assert(!PyErr_Occurred());
    1436          return 0;
    1437      }
    1438  
    1439      // Convert the data back to an object.
    1440      PyObject *obj = _PyCrossInterpreterData_NewObject(data);
    1441      if (obj == NULL) {
    1442          assert(PyErr_Occurred());
    1443          (void)_release_xid_data(data, 1);
    1444          // It was allocated in _channel_send().
    1445          GLOBAL_FREE(data);
    1446          return -1;
    1447      }
    1448      int release_res = _release_xid_data(data, 0);
    1449      // It was allocated in _channel_send().
    1450      GLOBAL_FREE(data);
    1451      if (release_res < 0) {
    1452          // The source interpreter has been destroyed already.
    1453          assert(PyErr_Occurred());
    1454          Py_DECREF(obj);
    1455          return -1;
    1456      }
    1457  
    1458      *res = obj;
    1459      return 0;
    1460  }
    1461  
    1462  static int
    1463  _channel_drop(_channels *channels, int64_t id, int send, int recv)
    1464  {
    1465      PyInterpreterState *interp = _get_current_interp();
    1466      if (interp == NULL) {
    1467          return -1;
    1468      }
    1469  
    1470      // Look up the channel.
    1471      PyThread_type_lock mutex = NULL;
    1472      _PyChannelState *chan = NULL;
    1473      int err = _channels_lookup(channels, id, &mutex, &chan);
    1474      if (err != 0) {
    1475          return err;
    1476      }
    1477      // Past this point we are responsible for releasing the mutex.
    1478  
    1479      // Close one or both of the two ends.
    1480      int res = _channel_close_interpreter(chan, PyInterpreterState_GetID(interp), send-recv);
    1481      PyThread_release_lock(mutex);
    1482      return res;
    1483  }
    1484  
// Close the channel with the given ID.
//
// Thin wrapper around _channels_close() that passes NULL for "pchan", so
// a channel whose close completes is freed there rather than handed back.
// Returns whatever _channels_close() returns.
static int
_channel_close(_channels *channels, int64_t id, int end, int force)
{
    return _channels_close(channels, id, NULL, end, force);
}
    1490  
    1491  static int
    1492  _channel_is_associated(_channels *channels, int64_t cid, int64_t interp,
    1493                         int send)
    1494  {
    1495      _PyChannelState *chan = NULL;
    1496      int err = _channels_lookup(channels, cid, NULL, &chan);
    1497      if (err != 0) {
    1498          return err;
    1499      }
    1500      else if (send && chan->closing != NULL) {
    1501          return ERR_CHANNEL_CLOSED;
    1502      }
    1503  
    1504      _channelend *end = _channelend_find(send ? chan->ends->send : chan->ends->recv,
    1505                                          interp, NULL);
    1506  
    1507      return (end != NULL && end->open);
    1508  }
    1509  
    1510  /* ChannelID class */
    1511  
// Instance layout of ChannelID objects.
typedef struct channelid {
    PyObject_HEAD
    // The ID of the channel this object identifies.
    int64_t id;
    // CHANNEL_SEND, CHANNEL_RECV, or 0 for both ends
    // (see channelid_repr() and channelid_end()).
    int end;
    // Copied into the cross-interpreter data by _channelid_shared(); when
    // set, _channelid_from_xid() tries to return a high-level channel
    // object instead of a plain ChannelID.
    int resolve;
    // The set of channels the ID belongs to; used on deallocation.
    _channels *channels;
} channelid;
    1519  
// In/out argument block for channel_id_converter().
struct channel_id_converter_data {
    // Input: the module whose state provides the ChannelID type.
    PyObject *module;
    // Output: the converted channel ID (set only on success).
    int64_t cid;
};
    1524  
    1525  static int
    1526  channel_id_converter(PyObject *arg, void *ptr)
    1527  {
    1528      int64_t cid;
    1529      struct channel_id_converter_data *data = ptr;
    1530      module_state *state = get_module_state(data->module);
    1531      assert(state != NULL);
    1532      if (PyObject_TypeCheck(arg, state->ChannelIDType)) {
    1533          cid = ((channelid *)arg)->id;
    1534      }
    1535      else if (PyIndex_Check(arg)) {
    1536          cid = PyLong_AsLongLong(arg);
    1537          if (cid == -1 && PyErr_Occurred()) {
    1538              return 0;
    1539          }
    1540          if (cid < 0) {
    1541              PyErr_Format(PyExc_ValueError,
    1542                          "channel ID must be a non-negative int, got %R", arg);
    1543              return 0;
    1544          }
    1545      }
    1546      else {
    1547          PyErr_Format(PyExc_TypeError,
    1548                       "channel ID must be an int, got %.100s",
    1549                       Py_TYPE(arg)->tp_name);
    1550          return 0;
    1551      }
    1552      data->cid = cid;
    1553      return 1;
    1554  }
    1555  
    1556  static int
    1557  newchannelid(PyTypeObject *cls, int64_t cid, int end, _channels *channels,
    1558               int force, int resolve, channelid **res)
    1559  {
    1560      *res = NULL;
    1561  
    1562      channelid *self = PyObject_New(channelid, cls);
    1563      if (self == NULL) {
    1564          return -1;
    1565      }
    1566      self->id = cid;
    1567      self->end = end;
    1568      self->resolve = resolve;
    1569      self->channels = channels;
    1570  
    1571      int err = _channels_add_id_object(channels, cid);
    1572      if (err != 0) {
    1573          if (force && err == ERR_CHANNEL_NOT_FOUND) {
    1574              assert(!PyErr_Occurred());
    1575          }
    1576          else {
    1577              Py_DECREF((PyObject *)self);
    1578              return err;
    1579          }
    1580      }
    1581  
    1582      *res = self;
    1583      return 0;
    1584  }
    1585  
    1586  static _channels * _global_channels(void);
    1587  
    1588  static PyObject *
    1589  _channelid_new(PyObject *mod, PyTypeObject *cls,
    1590                 PyObject *args, PyObject *kwds)
    1591  {
    1592      static char *kwlist[] = {"id", "send", "recv", "force", "_resolve", NULL};
    1593      int64_t cid;
    1594      struct channel_id_converter_data cid_data = {
    1595          .module = mod,
    1596      };
    1597      int send = -1;
    1598      int recv = -1;
    1599      int force = 0;
    1600      int resolve = 0;
    1601      if (!PyArg_ParseTupleAndKeywords(args, kwds,
    1602                                       "O&|$pppp:ChannelID.__new__", kwlist,
    1603                                       channel_id_converter, &cid_data,
    1604                                       &send, &recv, &force, &resolve)) {
    1605          return NULL;
    1606      }
    1607      cid = cid_data.cid;
    1608  
    1609      // Handle "send" and "recv".
    1610      if (send == 0 && recv == 0) {
    1611          PyErr_SetString(PyExc_ValueError,
    1612                          "'send' and 'recv' cannot both be False");
    1613          return NULL;
    1614      }
    1615  
    1616      int end = 0;
    1617      if (send == 1) {
    1618          if (recv == 0 || recv == -1) {
    1619              end = CHANNEL_SEND;
    1620          }
    1621      }
    1622      else if (recv == 1) {
    1623          end = CHANNEL_RECV;
    1624      }
    1625  
    1626      PyObject *id = NULL;
    1627      int err = newchannelid(cls, cid, end, _global_channels(),
    1628                             force, resolve,
    1629                             (channelid **)&id);
    1630      if (handle_channel_error(err, mod, cid)) {
    1631          assert(id == NULL);
    1632          return NULL;
    1633      }
    1634      assert(id != NULL);
    1635      return id;
    1636  }
    1637  
    1638  static void
    1639  channelid_dealloc(PyObject *self)
    1640  {
    1641      int64_t cid = ((channelid *)self)->id;
    1642      _channels *channels = ((channelid *)self)->channels;
    1643  
    1644      PyTypeObject *tp = Py_TYPE(self);
    1645      tp->tp_free(self);
    1646      /* "Instances of heap-allocated types hold a reference to their type."
    1647       * See: https://docs.python.org/3.11/howto/isolating-extensions.html#garbage-collection-protocol
    1648       * See: https://docs.python.org/3.11/c-api/typeobj.html#c.PyTypeObject.tp_traverse
    1649      */
    1650      // XXX Why don't we implement Py_TPFLAGS_HAVE_GC, e.g. Py_tp_traverse,
    1651      // like we do for _abc._abc_data?
    1652      Py_DECREF(tp);
    1653  
    1654      _channels_drop_id_object(channels, cid);
    1655  }
    1656  
    1657  static PyObject *
    1658  channelid_repr(PyObject *self)
    1659  {
    1660      PyTypeObject *type = Py_TYPE(self);
    1661      const char *name = _PyType_Name(type);
    1662  
    1663      channelid *cid = (channelid *)self;
    1664      const char *fmt;
    1665      if (cid->end == CHANNEL_SEND) {
    1666          fmt = "%s(%" PRId64 ", send=True)";
    1667      }
    1668      else if (cid->end == CHANNEL_RECV) {
    1669          fmt = "%s(%" PRId64 ", recv=True)";
    1670      }
    1671      else {
    1672          fmt = "%s(%" PRId64 ")";
    1673      }
    1674      return PyUnicode_FromFormat(fmt, name, cid->id);
    1675  }
    1676  
    1677  static PyObject *
    1678  channelid_str(PyObject *self)
    1679  {
    1680      channelid *cid = (channelid *)self;
    1681      return PyUnicode_FromFormat("%" PRId64 "", cid->id);
    1682  }
    1683  
    1684  static PyObject *
    1685  channelid_int(PyObject *self)
    1686  {
    1687      channelid *cid = (channelid *)self;
    1688      return PyLong_FromLongLong(cid->id);
    1689  }
    1690  
    1691  static Py_hash_t
    1692  channelid_hash(PyObject *self)
    1693  {
    1694      channelid *cid = (channelid *)self;
    1695      PyObject *id = PyLong_FromLongLong(cid->id);
    1696      if (id == NULL) {
    1697          return -1;
    1698      }
    1699      Py_hash_t hash = PyObject_Hash(id);
    1700      Py_DECREF(id);
    1701      return hash;
    1702  }
    1703  
    1704  static PyObject *
    1705  channelid_richcompare(PyObject *self, PyObject *other, int op)
    1706  {
    1707      PyObject *res = NULL;
    1708      if (op != Py_EQ && op != Py_NE) {
    1709          Py_RETURN_NOTIMPLEMENTED;
    1710      }
    1711  
    1712      PyObject *mod = get_module_from_type(Py_TYPE(self));
    1713      if (mod == NULL) {
    1714          return NULL;
    1715      }
    1716      module_state *state = get_module_state(mod);
    1717      if (state == NULL) {
    1718          goto done;
    1719      }
    1720  
    1721      if (!PyObject_TypeCheck(self, state->ChannelIDType)) {
    1722          res = Py_NewRef(Py_NotImplemented);
    1723          goto done;
    1724      }
    1725  
    1726      channelid *cid = (channelid *)self;
    1727      int equal;
    1728      if (PyObject_TypeCheck(other, state->ChannelIDType)) {
    1729          channelid *othercid = (channelid *)other;
    1730          equal = (cid->end == othercid->end) && (cid->id == othercid->id);
    1731      }
    1732      else if (PyLong_Check(other)) {
    1733          /* Fast path */
    1734          int overflow;
    1735          long long othercid = PyLong_AsLongLongAndOverflow(other, &overflow);
    1736          if (othercid == -1 && PyErr_Occurred()) {
    1737              goto done;
    1738          }
    1739          equal = !overflow && (othercid >= 0) && (cid->id == othercid);
    1740      }
    1741      else if (PyNumber_Check(other)) {
    1742          PyObject *pyid = PyLong_FromLongLong(cid->id);
    1743          if (pyid == NULL) {
    1744              goto done;
    1745          }
    1746          res = PyObject_RichCompare(pyid, other, op);
    1747          Py_DECREF(pyid);
    1748          goto done;
    1749      }
    1750      else {
    1751          res = Py_NewRef(Py_NotImplemented);
    1752          goto done;
    1753      }
    1754  
    1755      if ((op == Py_EQ && equal) || (op == Py_NE && !equal)) {
    1756          res = Py_NewRef(Py_True);
    1757      }
    1758      else {
    1759          res = Py_NewRef(Py_False);
    1760      }
    1761  
    1762  done:
    1763      Py_DECREF(mod);
    1764      return res;
    1765  }
    1766  
    1767  static PyObject *
    1768  _channel_from_cid(PyObject *cid, int end)
    1769  {
    1770      PyObject *highlevel = PyImport_ImportModule("interpreters");
    1771      if (highlevel == NULL) {
    1772          PyErr_Clear();
    1773          highlevel = PyImport_ImportModule("test.support.interpreters");
    1774          if (highlevel == NULL) {
    1775              return NULL;
    1776          }
    1777      }
    1778      const char *clsname = (end == CHANNEL_RECV) ? "RecvChannel" :
    1779                                                    "SendChannel";
    1780      PyObject *cls = PyObject_GetAttrString(highlevel, clsname);
    1781      Py_DECREF(highlevel);
    1782      if (cls == NULL) {
    1783          return NULL;
    1784      }
    1785      PyObject *chan = PyObject_CallFunctionObjArgs(cls, cid, NULL);
    1786      Py_DECREF(cls);
    1787      if (chan == NULL) {
    1788          return NULL;
    1789      }
    1790      return chan;
    1791  }
    1792  
// Cross-interpreter payload for a ChannelID: a copy of the object's fields
// filled in by _channelid_shared() and read by _channelid_from_xid().
struct _channelid_xid {
    // The channel ID.
    int64_t id;
    // The end the ID stands for (0 means both).
    int end;
    // Whether the receiving side should try to produce a high-level
    // channel object instead of a plain ChannelID.
    int resolve;
};
    1798  
    1799  static PyObject *
    1800  _channelid_from_xid(_PyCrossInterpreterData *data)
    1801  {
    1802      struct _channelid_xid *xid = (struct _channelid_xid *)data->data;
    1803  
    1804      // It might not be imported yet, so we can't use _get_current_module().
    1805      PyObject *mod = PyImport_ImportModule(MODULE_NAME);
    1806      if (mod == NULL) {
    1807          return NULL;
    1808      }
    1809      assert(mod != Py_None);
    1810      module_state *state = get_module_state(mod);
    1811      if (state == NULL) {
    1812          return NULL;
    1813      }
    1814  
    1815      // Note that we do not preserve the "resolve" flag.
    1816      PyObject *cid = NULL;
    1817      int err = newchannelid(state->ChannelIDType, xid->id, xid->end,
    1818                             _global_channels(), 0, 0,
    1819                             (channelid **)&cid);
    1820      if (err != 0) {
    1821          assert(cid == NULL);
    1822          (void)handle_channel_error(err, mod, xid->id);
    1823          goto done;
    1824      }
    1825      assert(cid != NULL);
    1826      if (xid->end == 0) {
    1827          goto done;
    1828      }
    1829      if (!xid->resolve) {
    1830          goto done;
    1831      }
    1832  
    1833      /* Try returning a high-level channel end but fall back to the ID. */
    1834      PyObject *chan = _channel_from_cid(cid, xid->end);
    1835      if (chan == NULL) {
    1836          PyErr_Clear();
    1837          goto done;
    1838      }
    1839      Py_DECREF(cid);
    1840      cid = chan;
    1841  
    1842  done:
    1843      Py_DECREF(mod);
    1844      return cid;
    1845  }
    1846  
    1847  static int
    1848  _channelid_shared(PyThreadState *tstate, PyObject *obj,
    1849                    _PyCrossInterpreterData *data)
    1850  {
    1851      if (_PyCrossInterpreterData_InitWithSize(
    1852              data, tstate->interp, sizeof(struct _channelid_xid), obj,
    1853              _channelid_from_xid
    1854              ) < 0)
    1855      {
    1856          return -1;
    1857      }
    1858      struct _channelid_xid *xid = (struct _channelid_xid *)data->data;
    1859      xid->id = ((channelid *)obj)->id;
    1860      xid->end = ((channelid *)obj)->end;
    1861      xid->resolve = ((channelid *)obj)->resolve;
    1862      return 0;
    1863  }
    1864  
    1865  static PyObject *
    1866  channelid_end(PyObject *self, void *end)
    1867  {
    1868      int force = 1;
    1869      channelid *cid = (channelid *)self;
    1870      if (end != NULL) {
    1871          PyObject *id = NULL;
    1872          int err = newchannelid(Py_TYPE(self), cid->id, *(int *)end,
    1873                                 cid->channels, force, cid->resolve,
    1874                                 (channelid **)&id);
    1875          if (err != 0) {
    1876              assert(id == NULL);
    1877              PyObject *mod = get_module_from_type(Py_TYPE(self));
    1878              if (mod == NULL) {
    1879                  return NULL;
    1880              }
    1881              (void)handle_channel_error(err, mod, cid->id);
    1882              Py_DECREF(mod);
    1883              return NULL;
    1884          }
    1885          assert(id != NULL);
    1886          return id;
    1887      }
    1888  
    1889      if (cid->end == CHANNEL_SEND) {
    1890          return PyUnicode_InternFromString("send");
    1891      }
    1892      if (cid->end == CHANNEL_RECV) {
    1893          return PyUnicode_InternFromString("recv");
    1894      }
    1895      return PyUnicode_InternFromString("both");
    1896  }
    1897  
    1898  static int _channelid_end_send = CHANNEL_SEND;
    1899  static int _channelid_end_recv = CHANNEL_RECV;
    1900  
    1901  static PyGetSetDef channelid_getsets[] = {
    1902      {"end", (getter)channelid_end, NULL,
    1903       PyDoc_STR("'send', 'recv', or 'both'")},
    1904      {"send", (getter)channelid_end, NULL,
    1905       PyDoc_STR("the 'send' end of the channel"), &_channelid_end_send},
    1906      {"recv", (getter)channelid_end, NULL,
    1907       PyDoc_STR("the 'recv' end of the channel"), &_channelid_end_recv},
    1908      {NULL}
    1909  };
    1910  
    1911  PyDoc_STRVAR(channelid_doc,
    1912  "A channel ID identifies a channel and may be used as an int.");
    1913  
    1914  static PyType_Slot ChannelIDType_slots[] = {
    1915      {Py_tp_dealloc, (destructor)channelid_dealloc},
    1916      {Py_tp_doc, (void *)channelid_doc},
    1917      {Py_tp_repr, (reprfunc)channelid_repr},
    1918      {Py_tp_str, (reprfunc)channelid_str},
    1919      {Py_tp_hash, channelid_hash},
    1920      {Py_tp_richcompare, channelid_richcompare},
    1921      {Py_tp_getset, channelid_getsets},
    1922      // number slots
    1923      {Py_nb_int, (unaryfunc)channelid_int},
    1924      {Py_nb_index,  (unaryfunc)channelid_int},
    1925      {0, NULL},
    1926  };
    1927  
    1928  static PyType_Spec ChannelIDType_spec = {
    1929      .name = MODULE_NAME ".ChannelID",
    1930      .basicsize = sizeof(channelid),
    1931      .flags = (Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE |
    1932                Py_TPFLAGS_DISALLOW_INSTANTIATION | Py_TPFLAGS_IMMUTABLETYPE),
    1933      .slots = ChannelIDType_slots,
    1934  };
    1935  
    1936  
    1937  /* module level code ********************************************************/
    1938  
    1939  /* globals is the process-global state for the module.  It holds all
    1940     the data that we need to share between interpreters, so it cannot
    1941     hold PyObject values. */
    1942  static struct globals {
    1943      int module_count;
    1944      _channels channels;
    1945  } _globals = {0};
    1946  
    1947  static int
    1948  _globals_init(void)
    1949  {
    1950      // XXX This isn't thread-safe.
    1951      _globals.module_count++;
    1952      if (_globals.module_count > 1) {
    1953          // Already initialized.
    1954          return 0;
    1955      }
    1956  
    1957      assert(_globals.channels.mutex == NULL);
    1958      PyThread_type_lock mutex = PyThread_allocate_lock();
    1959      if (mutex == NULL) {
    1960          return ERR_CHANNELS_MUTEX_INIT;
    1961      }
    1962      _channels_init(&_globals.channels, mutex);
    1963      return 0;
    1964  }
    1965  
    1966  static void
    1967  _globals_fini(void)
    1968  {
    1969      // XXX This isn't thread-safe.
    1970      _globals.module_count--;
    1971      if (_globals.module_count > 0) {
    1972          return;
    1973      }
    1974  
    1975      _channels_fini(&_globals.channels);
    1976  }
    1977  
    1978  static _channels *
    1979  _global_channels(void) {
    1980      return &_globals.channels;
    1981  }
    1982  
    1983  
    1984  static void
    1985  clear_interpreter(void *data)
    1986  {
    1987      if (_globals.module_count == 0) {
    1988          return;
    1989      }
    1990      PyInterpreterState *interp = (PyInterpreterState *)data;
    1991      assert(interp == _get_current_interp());
    1992      int64_t id = PyInterpreterState_GetID(interp);
    1993      _channels_drop_interpreter(&_globals.channels, id);
    1994  }
    1995  
    1996  
    1997  static PyObject *
    1998  channel_create(PyObject *self, PyObject *Py_UNUSED(ignored))
    1999  {
    2000      int64_t cid = _channel_create(&_globals.channels);
    2001      if (cid < 0) {
    2002          (void)handle_channel_error(-1, self, cid);
    2003          return NULL;
    2004      }
    2005      module_state *state = get_module_state(self);
    2006      if (state == NULL) {
    2007          return NULL;
    2008      }
    2009      PyObject *id = NULL;
    2010      int err = newchannelid(state->ChannelIDType, cid, 0,
    2011                             &_globals.channels, 0, 0,
    2012                             (channelid **)&id);
    2013      if (handle_channel_error(err, self, cid)) {
    2014          assert(id == NULL);
    2015          err = _channel_destroy(&_globals.channels, cid);
    2016          if (handle_channel_error(err, self, cid)) {
    2017              // XXX issue a warning?
    2018          }
    2019          return NULL;
    2020      }
    2021      assert(id != NULL);
    2022      assert(((channelid *)id)->channels != NULL);
    2023      return id;
    2024  }
    2025  
    2026  PyDoc_STRVAR(channel_create_doc,
    2027  "channel_create() -> cid\n\
    2028  \n\
    2029  Create a new cross-interpreter channel and return a unique generated ID.");
    2030  
    2031  static PyObject *
    2032  channel_destroy(PyObject *self, PyObject *args, PyObject *kwds)
    2033  {
    2034      static char *kwlist[] = {"cid", NULL};
    2035      int64_t cid;
    2036      struct channel_id_converter_data cid_data = {
    2037          .module = self,
    2038      };
    2039      if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&:channel_destroy", kwlist,
    2040                                       channel_id_converter, &cid_data)) {
    2041          return NULL;
    2042      }
    2043      cid = cid_data.cid;
    2044  
    2045      int err = _channel_destroy(&_globals.channels, cid);
    2046      if (handle_channel_error(err, self, cid)) {
    2047          return NULL;
    2048      }
    2049      Py_RETURN_NONE;
    2050  }
    2051  
    2052  PyDoc_STRVAR(channel_destroy_doc,
    2053  "channel_destroy(cid)\n\
    2054  \n\
    2055  Close and finalize the channel.  Afterward attempts to use the channel\n\
    2056  will behave as though it never existed.");
    2057  
    2058  static PyObject *
    2059  channel_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
    2060  {
    2061      int64_t count = 0;
    2062      int64_t *cids = _channels_list_all(&_globals.channels, &count);
    2063      if (cids == NULL) {
    2064          if (count == 0) {
    2065              return PyList_New(0);
    2066          }
    2067          return NULL;
    2068      }
    2069      PyObject *ids = PyList_New((Py_ssize_t)count);
    2070      if (ids == NULL) {
    2071          goto finally;
    2072      }
    2073      module_state *state = get_module_state(self);
    2074      if (state == NULL) {
    2075          Py_DECREF(ids);
    2076          ids = NULL;
    2077          goto finally;
    2078      }
    2079      int64_t *cur = cids;
    2080      for (int64_t i=0; i < count; cur++, i++) {
    2081          PyObject *id = NULL;
    2082          int err = newchannelid(state->ChannelIDType, *cur, 0,
    2083                                 &_globals.channels, 0, 0,
    2084                                 (channelid **)&id);
    2085          if (handle_channel_error(err, self, *cur)) {
    2086              assert(id == NULL);
    2087              Py_SETREF(ids, NULL);
    2088              break;
    2089          }
    2090          assert(id != NULL);
    2091          PyList_SET_ITEM(ids, (Py_ssize_t)i, id);
    2092      }
    2093  
    2094  finally:
    2095      PyMem_Free(cids);
    2096      return ids;
    2097  }
    2098  
    2099  PyDoc_STRVAR(channel_list_all_doc,
    2100  "channel_list_all() -> [cid]\n\
    2101  \n\
    2102  Return the list of all IDs for active channels.");
    2103  
    2104  static PyObject *
    2105  channel_list_interpreters(PyObject *self, PyObject *args, PyObject *kwds)
    2106  {
    2107      static char *kwlist[] = {"cid", "send", NULL};
    2108      int64_t cid;            /* Channel ID */
    2109      struct channel_id_converter_data cid_data = {
    2110          .module = self,
    2111      };
    2112      int send = 0;           /* Send or receive end? */
    2113      int64_t id;
    2114      PyObject *ids, *id_obj;
    2115      PyInterpreterState *interp;
    2116  
    2117      if (!PyArg_ParseTupleAndKeywords(
    2118              args, kwds, "O&$p:channel_list_interpreters",
    2119              kwlist, channel_id_converter, &cid_data, &send)) {
    2120          return NULL;
    2121      }
    2122      cid = cid_data.cid;
    2123  
    2124      ids = PyList_New(0);
    2125      if (ids == NULL) {
    2126          goto except;
    2127      }
    2128  
    2129      interp = PyInterpreterState_Head();
    2130      while (interp != NULL) {
    2131          id = PyInterpreterState_GetID(interp);
    2132          assert(id >= 0);
    2133          int res = _channel_is_associated(&_globals.channels, cid, id, send);
    2134          if (res < 0) {
    2135              (void)handle_channel_error(res, self, cid);
    2136              goto except;
    2137          }
    2138          if (res) {
    2139              id_obj = _PyInterpreterState_GetIDObject(interp);
    2140              if (id_obj == NULL) {
    2141                  goto except;
    2142              }
    2143              res = PyList_Insert(ids, 0, id_obj);
    2144              Py_DECREF(id_obj);
    2145              if (res < 0) {
    2146                  goto except;
    2147              }
    2148          }
    2149          interp = PyInterpreterState_Next(interp);
    2150      }
    2151  
    2152      goto finally;
    2153  
    2154  except:
    2155      Py_CLEAR(ids);
    2156  
    2157  finally:
    2158      return ids;
    2159  }
    2160  
    2161  PyDoc_STRVAR(channel_list_interpreters_doc,
    2162  "channel_list_interpreters(cid, *, send) -> [id]\n\
    2163  \n\
    2164  Return the list of all interpreter IDs associated with an end of the channel.\n\
    2165  \n\
    2166  The 'send' argument should be a boolean indicating whether to use the send or\n\
    2167  receive end.");
    2168  
    2169  
    2170  static PyObject *
    2171  channel_send(PyObject *self, PyObject *args, PyObject *kwds)
    2172  {
    2173      static char *kwlist[] = {"cid", "obj", NULL};
    2174      int64_t cid;
    2175      struct channel_id_converter_data cid_data = {
    2176          .module = self,
    2177      };
    2178      PyObject *obj;
    2179      if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O:channel_send", kwlist,
    2180                                       channel_id_converter, &cid_data, &obj)) {
    2181          return NULL;
    2182      }
    2183      cid = cid_data.cid;
    2184  
    2185      int err = _channel_send(&_globals.channels, cid, obj);
    2186      if (handle_channel_error(err, self, cid)) {
    2187          return NULL;
    2188      }
    2189      Py_RETURN_NONE;
    2190  }
    2191  
    2192  PyDoc_STRVAR(channel_send_doc,
    2193  "channel_send(cid, obj)\n\
    2194  \n\
    2195  Add the object's data to the channel's queue.");
    2196  
    2197  static PyObject *
    2198  channel_recv(PyObject *self, PyObject *args, PyObject *kwds)
    2199  {
    2200      static char *kwlist[] = {"cid", "default", NULL};
    2201      int64_t cid;
    2202      struct channel_id_converter_data cid_data = {
    2203          .module = self,
    2204      };
    2205      PyObject *dflt = NULL;
    2206      if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&|O:channel_recv", kwlist,
    2207                                       channel_id_converter, &cid_data, &dflt)) {
    2208          return NULL;
    2209      }
    2210      cid = cid_data.cid;
    2211  
    2212      PyObject *obj = NULL;
    2213      int err = _channel_recv(&_globals.channels, cid, &obj);
    2214      if (handle_channel_error(err, self, cid)) {
    2215          return NULL;
    2216      }
    2217      Py_XINCREF(dflt);
    2218      if (obj == NULL) {
    2219          // Use the default.
    2220          if (dflt == NULL) {
    2221              (void)handle_channel_error(ERR_CHANNEL_EMPTY, self, cid);
    2222              return NULL;
    2223          }
    2224          obj = Py_NewRef(dflt);
    2225      }
    2226      Py_XDECREF(dflt);
    2227      return obj;
    2228  }
    2229  
    2230  PyDoc_STRVAR(channel_recv_doc,
    2231  "channel_recv(cid, [default]) -> obj\n\
    2232  \n\
    2233  Return a new object from the data at the front of the channel's queue.\n\
    2234  \n\
    2235  If there is nothing to receive then raise ChannelEmptyError, unless\n\
    2236  a default value is provided.  In that case return it.");
    2237  
    2238  static PyObject *
    2239  channel_close(PyObject *self, PyObject *args, PyObject *kwds)
    2240  {
    2241      static char *kwlist[] = {"cid", "send", "recv", "force", NULL};
    2242      int64_t cid;
    2243      struct channel_id_converter_data cid_data = {
    2244          .module = self,
    2245      };
    2246      int send = 0;
    2247      int recv = 0;
    2248      int force = 0;
    2249      if (!PyArg_ParseTupleAndKeywords(args, kwds,
    2250                                       "O&|$ppp:channel_close", kwlist,
    2251                                       channel_id_converter, &cid_data,
    2252                                       &send, &recv, &force)) {
    2253          return NULL;
    2254      }
    2255      cid = cid_data.cid;
    2256  
    2257      int err = _channel_close(&_globals.channels, cid, send-recv, force);
    2258      if (handle_channel_error(err, self, cid)) {
    2259          return NULL;
    2260      }
    2261      Py_RETURN_NONE;
    2262  }
    2263  
    2264  PyDoc_STRVAR(channel_close_doc,
    2265  "channel_close(cid, *, send=None, recv=None, force=False)\n\
    2266  \n\
    2267  Close the channel for all interpreters.\n\
    2268  \n\
    2269  If the channel is empty then the keyword args are ignored and both\n\
    2270  ends are immediately closed.  Otherwise, if 'force' is True then\n\
    2271  all queued items are released and both ends are immediately\n\
    2272  closed.\n\
    2273  \n\
    2274  If the channel is not empty *and* 'force' is False then following\n\
    2275  happens:\n\
    2276  \n\
    2277   * recv is True (regardless of send):\n\
    2278     - raise ChannelNotEmptyError\n\
    2279   * recv is None and send is None:\n\
    2280     - raise ChannelNotEmptyError\n\
    2281   * send is True and recv is not True:\n\
    2282     - fully close the 'send' end\n\
    2283     - close the 'recv' end to interpreters not already receiving\n\
    2284     - fully close it once empty\n\
    2285  \n\
    2286  Closing an already closed channel results in a ChannelClosedError.\n\
    2287  \n\
    2288  Once the channel's ID has no more ref counts in any interpreter\n\
    2289  the channel will be destroyed.");
    2290  
    2291  static PyObject *
    2292  channel_release(PyObject *self, PyObject *args, PyObject *kwds)
    2293  {
    2294      // Note that only the current interpreter is affected.
    2295      static char *kwlist[] = {"cid", "send", "recv", "force", NULL};
    2296      int64_t cid;
    2297      struct channel_id_converter_data cid_data = {
    2298          .module = self,
    2299      };
    2300      int send = 0;
    2301      int recv = 0;
    2302      int force = 0;
    2303      if (!PyArg_ParseTupleAndKeywords(args, kwds,
    2304                                       "O&|$ppp:channel_release", kwlist,
    2305                                       channel_id_converter, &cid_data,
    2306                                       &send, &recv, &force)) {
    2307          return NULL;
    2308      }
    2309      cid = cid_data.cid;
    2310      if (send == 0 && recv == 0) {
    2311          send = 1;
    2312          recv = 1;
    2313      }
    2314  
    2315      // XXX Handle force is True.
    2316      // XXX Fix implicit release.
    2317  
    2318      int err = _channel_drop(&_globals.channels, cid, send, recv);
    2319      if (handle_channel_error(err, self, cid)) {
    2320          return NULL;
    2321      }
    2322      Py_RETURN_NONE;
    2323  }
    2324  
    2325  PyDoc_STRVAR(channel_release_doc,
    2326  "channel_release(cid, *, send=None, recv=None, force=True)\n\
    2327  \n\
    2328  Close the channel for the current interpreter.  'send' and 'recv'\n\
    2329  (bool) may be used to indicate the ends to close.  By default both\n\
    2330  ends are closed.  Closing an already closed end is a noop.");
    2331  
    2332  static PyObject *
    2333  channel__channel_id(PyObject *self, PyObject *args, PyObject *kwds)
    2334  {
    2335      module_state *state = get_module_state(self);
    2336      if (state == NULL) {
    2337          return NULL;
    2338      }
    2339      PyTypeObject *cls = state->ChannelIDType;
    2340      PyObject *mod = get_module_from_owned_type(cls);
    2341      if (mod == NULL) {
    2342          return NULL;
    2343      }
    2344      PyObject *cid = _channelid_new(mod, cls, args, kwds);
    2345      Py_DECREF(mod);
    2346      return cid;
    2347  }
    2348  
    2349  static PyMethodDef module_functions[] = {
    2350      {"create",                    channel_create,
    2351       METH_NOARGS, channel_create_doc},
    2352      {"destroy",                   _PyCFunction_CAST(channel_destroy),
    2353       METH_VARARGS | METH_KEYWORDS, channel_destroy_doc},
    2354      {"list_all",                  channel_list_all,
    2355       METH_NOARGS, channel_list_all_doc},
    2356      {"list_interpreters",         _PyCFunction_CAST(channel_list_interpreters),
    2357       METH_VARARGS | METH_KEYWORDS, channel_list_interpreters_doc},
    2358      {"send",                      _PyCFunction_CAST(channel_send),
    2359       METH_VARARGS | METH_KEYWORDS, channel_send_doc},
    2360      {"recv",                      _PyCFunction_CAST(channel_recv),
    2361       METH_VARARGS | METH_KEYWORDS, channel_recv_doc},
    2362      {"close",                     _PyCFunction_CAST(channel_close),
    2363       METH_VARARGS | METH_KEYWORDS, channel_close_doc},
    2364      {"release",                   _PyCFunction_CAST(channel_release),
    2365       METH_VARARGS | METH_KEYWORDS, channel_release_doc},
    2366      {"_channel_id",               _PyCFunction_CAST(channel__channel_id),
    2367       METH_VARARGS | METH_KEYWORDS, NULL},
    2368  
    2369      {NULL,                        NULL}           /* sentinel */
    2370  };
    2371  
    2372  
    2373  /* initialization function */
    2374  
    2375  PyDoc_STRVAR(module_doc,
    2376  "This module provides primitive operations to manage Python interpreters.\n\
    2377  The 'interpreters' module provides a more convenient interface.");
    2378  
    2379  static int
    2380  module_exec(PyObject *mod)
    2381  {
    2382      if (_globals_init() != 0) {
    2383          return -1;
    2384      }
    2385  
    2386      /* Add exception types */
    2387      if (exceptions_init(mod) != 0) {
    2388          goto error;
    2389      }
    2390  
    2391      /* Add other types */
    2392      module_state *state = get_module_state(mod);
    2393      if (state == NULL) {
    2394          goto error;
    2395      }
    2396  
    2397      // ChannelID
    2398      state->ChannelIDType = add_new_type(
    2399              mod, &ChannelIDType_spec, _channelid_shared);
    2400      if (state->ChannelIDType == NULL) {
    2401          goto error;
    2402      }
    2403  
    2404      // Make sure chnnels drop objects owned by this interpreter
    2405      PyInterpreterState *interp = _get_current_interp();
    2406      _Py_AtExit(interp, clear_interpreter, (void *)interp);
    2407  
    2408      return 0;
    2409  
    2410  error:
    2411      _globals_fini();
    2412      return -1;
    2413  }
    2414  
    2415  static struct PyModuleDef_Slot module_slots[] = {
    2416      {Py_mod_exec, module_exec},
    2417      {Py_mod_multiple_interpreters, Py_MOD_PER_INTERPRETER_GIL_SUPPORTED},
    2418      {0, NULL},
    2419  };
    2420  
    2421  static int
    2422  module_traverse(PyObject *mod, visitproc visit, void *arg)
    2423  {
    2424      module_state *state = get_module_state(mod);
    2425      assert(state != NULL);
    2426      traverse_module_state(state, visit, arg);
    2427      return 0;
    2428  }
    2429  
    2430  static int
    2431  module_clear(PyObject *mod)
    2432  {
    2433      module_state *state = get_module_state(mod);
    2434      assert(state != NULL);
    2435      clear_module_state(state);
    2436      return 0;
    2437  }
    2438  
    2439  static void
    2440  module_free(void *mod)
    2441  {
    2442      module_state *state = get_module_state(mod);
    2443      assert(state != NULL);
    2444      clear_module_state(state);
    2445      _globals_fini();
    2446  }
    2447  
    2448  static struct PyModuleDef moduledef = {
    2449      .m_base = PyModuleDef_HEAD_INIT,
    2450      .m_name = MODULE_NAME,
    2451      .m_doc = module_doc,
    2452      .m_size = sizeof(module_state),
    2453      .m_methods = module_functions,
    2454      .m_slots = module_slots,
    2455      .m_traverse = module_traverse,
    2456      .m_clear = module_clear,
    2457      .m_free = (freefunc)module_free,
    2458  };
    2459  
    2460  PyMODINIT_FUNC
    2461  PyInit__xxinterpchannels(void)
    2462  {
    2463      return PyModuleDef_Init(&moduledef);
    2464  }