1
2 /* interpreters module */
3 /* low-level access to interpreter primitives */
4
5 #include "Python.h"
6 #include "interpreteridobject.h"
7
8
9 /*
10 This module has the following process-global state:
11
12 _globals (static struct globals):
13 module_count (int)
14 channels (struct _channels):
15 numopen (int64_t)
16 next_id; (int64_t)
17 mutex (PyThread_type_lock)
18 head (linked list of struct _channelref *):
19 id (int64_t)
20 objcount (Py_ssize_t)
21 next (struct _channelref *):
22 ...
23 chan (struct _channel *):
24 open (int)
25 mutex (PyThread_type_lock)
26 closing (struct _channel_closing *):
27 ref (struct _channelref *):
28 ...
29 ends (struct _channelends *):
30 numsendopen (int64_t)
31 numrecvopen (int64_t)
32 send (struct _channelend *):
33 interp (int64_t)
34 open (int)
35 next (struct _channelend *)
36 recv (struct _channelend *):
37 ...
38 queue (struct _channelqueue *):
39 count (int64_t)
40 first (struct _channelitem *):
41 next (struct _channelitem *):
42 ...
43 data (_PyCrossInterpreterData *):
44 data (void *)
45 obj (PyObject *)
46 interp (int64_t)
47 new_object (xid_newobjectfunc)
48 free (xid_freefunc)
49 last (struct _channelitem *):
50 ...
51
52 The above state includes the following allocations by the module:
53
54 * 1 top-level mutex (to protect the rest of the state)
55 * for each channel:
56 * 1 struct _channelref
57 * 1 struct _channel
58 * 0-1 struct _channel_closing
59 * 1 struct _channelends
60 * 2 struct _channelend
61 * 1 struct _channelqueue
62 * for each item in each channel:
63 * 1 struct _channelitem
64 * 1 _PyCrossInterpreterData
65
66 The only objects in that global state are the references held by each
67 channel's queue, which are safely managed via the _PyCrossInterpreterData_*()
68 API.. The module does not create any objects that are shared globally.
69 */
70
71 #define MODULE_NAME "_xxinterpchannels"
72
73
74 #define GLOBAL_MALLOC(TYPE) \
75 PyMem_RawMalloc(sizeof(TYPE))
76 #define GLOBAL_FREE(VAR) \
77 PyMem_RawFree(VAR)
78
79
80 static PyInterpreterState *
81 _get_current_interp(void)
82 {
83 // PyInterpreterState_Get() aborts if lookup fails, so don't need
84 // to check the result for NULL.
85 return PyInterpreterState_Get();
86 }
87
88 static PyObject *
89 _get_current_module(void)
90 {
91 PyObject *name = PyUnicode_FromString(MODULE_NAME);
92 if (name == NULL) {
93 return NULL;
94 }
95 PyObject *mod = PyImport_GetModule(name);
96 Py_DECREF(name);
97 if (mod == NULL) {
98 return NULL;
99 }
100 assert(mod != Py_None);
101 return mod;
102 }
103
104 static PyObject *
105 get_module_from_owned_type(PyTypeObject *cls)
106 {
107 assert(cls != NULL);
108 return _get_current_module();
109 // XXX Use the more efficient API now that we use heap types:
110 //return PyType_GetModule(cls);
111 }
112
113 static struct PyModuleDef moduledef;
114
115 static PyObject *
116 get_module_from_type(PyTypeObject *cls)
117 {
118 assert(cls != NULL);
119 return _get_current_module();
120 // XXX Use the more efficient API now that we use heap types:
121 //return PyType_GetModuleByDef(cls, &moduledef);
122 }
123
124 static PyObject *
125 add_new_exception(PyObject *mod, const char *name, PyObject *base)
126 {
127 assert(!PyObject_HasAttrString(mod, name));
128 PyObject *exctype = PyErr_NewException(name, base, NULL);
129 if (exctype == NULL) {
130 return NULL;
131 }
132 int res = PyModule_AddType(mod, (PyTypeObject *)exctype);
133 if (res < 0) {
134 Py_DECREF(exctype);
135 return NULL;
136 }
137 return exctype;
138 }
139
140 #define ADD_NEW_EXCEPTION(MOD, NAME, BASE) \
141 add_new_exception(MOD, MODULE_NAME "." Py_STRINGIFY(NAME), BASE)
142
143 static PyTypeObject *
144 add_new_type(PyObject *mod, PyType_Spec *spec, crossinterpdatafunc shared)
145 {
146 PyTypeObject *cls = (PyTypeObject *)PyType_FromMetaclass(
147 NULL, mod, spec, NULL);
148 if (cls == NULL) {
149 return NULL;
150 }
151 if (PyModule_AddType(mod, cls) < 0) {
152 Py_DECREF(cls);
153 return NULL;
154 }
155 if (shared != NULL) {
156 if (_PyCrossInterpreterData_RegisterClass(cls, shared)) {
157 Py_DECREF(cls);
158 return NULL;
159 }
160 }
161 return cls;
162 }
163
164 static int
165 _release_xid_data(_PyCrossInterpreterData *data, int ignoreexc)
166 {
167 PyObject *exc;
168 if (ignoreexc) {
169 exc = PyErr_GetRaisedException();
170 }
171 int res = _PyCrossInterpreterData_Release(data);
172 if (res < 0) {
173 /* The owning interpreter is already destroyed. */
174 if (ignoreexc) {
175 // XXX Emit a warning?
176 PyErr_Clear();
177 }
178 }
179 if (ignoreexc) {
180 PyErr_SetRaisedException(exc);
181 }
182 return res;
183 }
184
185
186 /* module state *************************************************************/
187
188 typedef struct {
189 /* heap types */
190 PyTypeObject *ChannelIDType;
191
192 /* exceptions */
193 PyObject *ChannelError;
194 PyObject *ChannelNotFoundError;
195 PyObject *ChannelClosedError;
196 PyObject *ChannelEmptyError;
197 PyObject *ChannelNotEmptyError;
198 } module_state;
199
200 static inline module_state *
201 get_module_state(PyObject *mod)
202 {
203 assert(mod != NULL);
204 module_state *state = PyModule_GetState(mod);
205 assert(state != NULL);
206 return state;
207 }
208
209 static int
210 traverse_module_state(module_state *state, visitproc visit, void *arg)
211 {
212 /* heap types */
213 Py_VISIT(state->ChannelIDType);
214
215 /* exceptions */
216 Py_VISIT(state->ChannelError);
217 Py_VISIT(state->ChannelNotFoundError);
218 Py_VISIT(state->ChannelClosedError);
219 Py_VISIT(state->ChannelEmptyError);
220 Py_VISIT(state->ChannelNotEmptyError);
221
222 return 0;
223 }
224
225 static int
226 clear_module_state(module_state *state)
227 {
228 /* heap types */
229 if (state->ChannelIDType != NULL) {
230 (void)_PyCrossInterpreterData_UnregisterClass(state->ChannelIDType);
231 }
232 Py_CLEAR(state->ChannelIDType);
233
234 /* exceptions */
235 Py_CLEAR(state->ChannelError);
236 Py_CLEAR(state->ChannelNotFoundError);
237 Py_CLEAR(state->ChannelClosedError);
238 Py_CLEAR(state->ChannelEmptyError);
239 Py_CLEAR(state->ChannelNotEmptyError);
240
241 return 0;
242 }
243
244
245 /* channel-specific code ****************************************************/
246
247 #define CHANNEL_SEND 1
248 #define CHANNEL_BOTH 0
249 #define CHANNEL_RECV -1
250
251 /* channel errors */
252
253 #define ERR_CHANNEL_NOT_FOUND -2
254 #define ERR_CHANNEL_CLOSED -3
255 #define ERR_CHANNEL_INTERP_CLOSED -4
256 #define ERR_CHANNEL_EMPTY -5
257 #define ERR_CHANNEL_NOT_EMPTY -6
258 #define ERR_CHANNEL_MUTEX_INIT -7
259 #define ERR_CHANNELS_MUTEX_INIT -8
260 #define ERR_NO_NEXT_CHANNEL_ID -9
261
262 static int
263 exceptions_init(PyObject *mod)
264 {
265 module_state *state = get_module_state(mod);
266 if (state == NULL) {
267 return -1;
268 }
269
270 #define ADD(NAME, BASE) \
271 do { \
272 assert(state->NAME == NULL); \
273 state->NAME = ADD_NEW_EXCEPTION(mod, NAME, BASE); \
274 if (state->NAME == NULL) { \
275 return -1; \
276 } \
277 } while (0)
278
279 // A channel-related operation failed.
280 ADD(ChannelError, PyExc_RuntimeError);
281 // An operation tried to use a channel that doesn't exist.
282 ADD(ChannelNotFoundError, state->ChannelError);
283 // An operation tried to use a closed channel.
284 ADD(ChannelClosedError, state->ChannelError);
285 // An operation tried to pop from an empty channel.
286 ADD(ChannelEmptyError, state->ChannelError);
287 // An operation tried to close a non-empty channel.
288 ADD(ChannelNotEmptyError, state->ChannelError);
289 #undef ADD
290
291 return 0;
292 }
293
294 static int
295 handle_channel_error(int err, PyObject *mod, int64_t cid)
296 {
297 if (err == 0) {
298 assert(!PyErr_Occurred());
299 return 0;
300 }
301 assert(err < 0);
302 module_state *state = get_module_state(mod);
303 assert(state != NULL);
304 if (err == ERR_CHANNEL_NOT_FOUND) {
305 PyErr_Format(state->ChannelNotFoundError,
306 "channel %" PRId64 " not found", cid);
307 }
308 else if (err == ERR_CHANNEL_CLOSED) {
309 PyErr_Format(state->ChannelClosedError,
310 "channel %" PRId64 " is closed", cid);
311 }
312 else if (err == ERR_CHANNEL_INTERP_CLOSED) {
313 PyErr_Format(state->ChannelClosedError,
314 "channel %" PRId64 " is already closed", cid);
315 }
316 else if (err == ERR_CHANNEL_EMPTY) {
317 PyErr_Format(state->ChannelEmptyError,
318 "channel %" PRId64 " is empty", cid);
319 }
320 else if (err == ERR_CHANNEL_NOT_EMPTY) {
321 PyErr_Format(state->ChannelNotEmptyError,
322 "channel %" PRId64 " may not be closed "
323 "if not empty (try force=True)",
324 cid);
325 }
326 else if (err == ERR_CHANNEL_MUTEX_INIT) {
327 PyErr_SetString(state->ChannelError,
328 "can't initialize mutex for new channel");
329 }
330 else if (err == ERR_CHANNELS_MUTEX_INIT) {
331 PyErr_SetString(state->ChannelError,
332 "can't initialize mutex for channel management");
333 }
334 else if (err == ERR_NO_NEXT_CHANNEL_ID) {
335 PyErr_SetString(state->ChannelError,
336 "failed to get a channel ID");
337 }
338 else {
339 assert(PyErr_Occurred());
340 }
341 return 1;
342 }
343
344 /* the channel queue */
345
346 struct _channelitem;
347
348 typedef struct _channelitem {
349 _PyCrossInterpreterData *data;
350 struct _channelitem *next;
351 } _channelitem;
352
353 static _channelitem *
354 _channelitem_new(void)
355 {
356 _channelitem *item = GLOBAL_MALLOC(_channelitem);
357 if (item == NULL) {
358 PyErr_NoMemory();
359 return NULL;
360 }
361 item->data = NULL;
362 item->next = NULL;
363 return item;
364 }
365
366 static void
367 _channelitem_clear(_channelitem *item)
368 {
369 if (item->data != NULL) {
370 (void)_release_xid_data(item->data, 1);
371 // It was allocated in _channel_send().
372 GLOBAL_FREE(item->data);
373 item->data = NULL;
374 }
375 item->next = NULL;
376 }
377
378 static void
379 _channelitem_free(_channelitem *item)
380 {
381 _channelitem_clear(item);
382 GLOBAL_FREE(item);
383 }
384
385 static void
386 _channelitem_free_all(_channelitem *item)
387 {
388 while (item != NULL) {
389 _channelitem *last = item;
390 item = item->next;
391 _channelitem_free(last);
392 }
393 }
394
395 static _PyCrossInterpreterData *
396 _channelitem_popped(_channelitem *item)
397 {
398 _PyCrossInterpreterData *data = item->data;
399 item->data = NULL;
400 _channelitem_free(item);
401 return data;
402 }
403
404 typedef struct _channelqueue {
405 int64_t count;
406 _channelitem *first;
407 _channelitem *last;
408 } _channelqueue;
409
410 static _channelqueue *
411 _channelqueue_new(void)
412 {
413 _channelqueue *queue = GLOBAL_MALLOC(_channelqueue);
414 if (queue == NULL) {
415 PyErr_NoMemory();
416 return NULL;
417 }
418 queue->count = 0;
419 queue->first = NULL;
420 queue->last = NULL;
421 return queue;
422 }
423
424 static void
425 _channelqueue_clear(_channelqueue *queue)
426 {
427 _channelitem_free_all(queue->first);
428 queue->count = 0;
429 queue->first = NULL;
430 queue->last = NULL;
431 }
432
433 static void
434 _channelqueue_free(_channelqueue *queue)
435 {
436 _channelqueue_clear(queue);
437 GLOBAL_FREE(queue);
438 }
439
440 static int
441 _channelqueue_put(_channelqueue *queue, _PyCrossInterpreterData *data)
442 {
443 _channelitem *item = _channelitem_new();
444 if (item == NULL) {
445 return -1;
446 }
447 item->data = data;
448
449 queue->count += 1;
450 if (queue->first == NULL) {
451 queue->first = item;
452 }
453 else {
454 queue->last->next = item;
455 }
456 queue->last = item;
457 return 0;
458 }
459
460 static _PyCrossInterpreterData *
461 _channelqueue_get(_channelqueue *queue)
462 {
463 _channelitem *item = queue->first;
464 if (item == NULL) {
465 return NULL;
466 }
467 queue->first = item->next;
468 if (queue->last == item) {
469 queue->last = NULL;
470 }
471 queue->count -= 1;
472
473 return _channelitem_popped(item);
474 }
475
476 static void
477 _channelqueue_drop_interpreter(_channelqueue *queue, int64_t interp)
478 {
479 _channelitem *prev = NULL;
480 _channelitem *next = queue->first;
481 while (next != NULL) {
482 _channelitem *item = next;
483 next = item->next;
484 if (item->data->interp == interp) {
485 if (prev == NULL) {
486 queue->first = item->next;
487 }
488 else {
489 prev->next = item->next;
490 }
491 _channelitem_free(item);
492 queue->count -= 1;
493 }
494 else {
495 prev = item;
496 }
497 }
498 }
499
500 /* channel-interpreter associations */
501
502 struct _channelend;
503
504 typedef struct _channelend {
505 struct _channelend *next;
506 int64_t interp;
507 int open;
508 } _channelend;
509
510 static _channelend *
511 _channelend_new(int64_t interp)
512 {
513 _channelend *end = GLOBAL_MALLOC(_channelend);
514 if (end == NULL) {
515 PyErr_NoMemory();
516 return NULL;
517 }
518 end->next = NULL;
519 end->interp = interp;
520 end->open = 1;
521 return end;
522 }
523
524 static void
525 _channelend_free(_channelend *end)
526 {
527 GLOBAL_FREE(end);
528 }
529
530 static void
531 _channelend_free_all(_channelend *end)
532 {
533 while (end != NULL) {
534 _channelend *last = end;
535 end = end->next;
536 _channelend_free(last);
537 }
538 }
539
540 static _channelend *
541 _channelend_find(_channelend *first, int64_t interp, _channelend **pprev)
542 {
543 _channelend *prev = NULL;
544 _channelend *end = first;
545 while (end != NULL) {
546 if (end->interp == interp) {
547 break;
548 }
549 prev = end;
550 end = end->next;
551 }
552 if (pprev != NULL) {
553 *pprev = prev;
554 }
555 return end;
556 }
557
558 typedef struct _channelassociations {
559 // Note that the list entries are never removed for interpreter
560 // for which the channel is closed. This should not be a problem in
561 // practice. Also, a channel isn't automatically closed when an
562 // interpreter is destroyed.
563 int64_t numsendopen;
564 int64_t numrecvopen;
565 _channelend *send;
566 _channelend *recv;
567 } _channelends;
568
569 static _channelends *
570 _channelends_new(void)
571 {
572 _channelends *ends = GLOBAL_MALLOC(_channelends);
573 if (ends== NULL) {
574 return NULL;
575 }
576 ends->numsendopen = 0;
577 ends->numrecvopen = 0;
578 ends->send = NULL;
579 ends->recv = NULL;
580 return ends;
581 }
582
583 static void
584 _channelends_clear(_channelends *ends)
585 {
586 _channelend_free_all(ends->send);
587 ends->send = NULL;
588 ends->numsendopen = 0;
589
590 _channelend_free_all(ends->recv);
591 ends->recv = NULL;
592 ends->numrecvopen = 0;
593 }
594
595 static void
596 _channelends_free(_channelends *ends)
597 {
598 _channelends_clear(ends);
599 GLOBAL_FREE(ends);
600 }
601
602 static _channelend *
603 _channelends_add(_channelends *ends, _channelend *prev, int64_t interp,
604 int send)
605 {
606 _channelend *end = _channelend_new(interp);
607 if (end == NULL) {
608 return NULL;
609 }
610
611 if (prev == NULL) {
612 if (send) {
613 ends->send = end;
614 }
615 else {
616 ends->recv = end;
617 }
618 }
619 else {
620 prev->next = end;
621 }
622 if (send) {
623 ends->numsendopen += 1;
624 }
625 else {
626 ends->numrecvopen += 1;
627 }
628 return end;
629 }
630
631 static int
632 _channelends_associate(_channelends *ends, int64_t interp, int send)
633 {
634 _channelend *prev;
635 _channelend *end = _channelend_find(send ? ends->send : ends->recv,
636 interp, &prev);
637 if (end != NULL) {
638 if (!end->open) {
639 return ERR_CHANNEL_CLOSED;
640 }
641 // already associated
642 return 0;
643 }
644 if (_channelends_add(ends, prev, interp, send) == NULL) {
645 return -1;
646 }
647 return 0;
648 }
649
650 static int
651 _channelends_is_open(_channelends *ends)
652 {
653 if (ends->numsendopen != 0 || ends->numrecvopen != 0) {
654 return 1;
655 }
656 if (ends->send == NULL && ends->recv == NULL) {
657 return 1;
658 }
659 return 0;
660 }
661
662 static void
663 _channelends_close_end(_channelends *ends, _channelend *end, int send)
664 {
665 end->open = 0;
666 if (send) {
667 ends->numsendopen -= 1;
668 }
669 else {
670 ends->numrecvopen -= 1;
671 }
672 }
673
674 static int
675 _channelends_close_interpreter(_channelends *ends, int64_t interp, int which)
676 {
677 _channelend *prev;
678 _channelend *end;
679 if (which >= 0) { // send/both
680 end = _channelend_find(ends->send, interp, &prev);
681 if (end == NULL) {
682 // never associated so add it
683 end = _channelends_add(ends, prev, interp, 1);
684 if (end == NULL) {
685 return -1;
686 }
687 }
688 _channelends_close_end(ends, end, 1);
689 }
690 if (which <= 0) { // recv/both
691 end = _channelend_find(ends->recv, interp, &prev);
692 if (end == NULL) {
693 // never associated so add it
694 end = _channelends_add(ends, prev, interp, 0);
695 if (end == NULL) {
696 return -1;
697 }
698 }
699 _channelends_close_end(ends, end, 0);
700 }
701 return 0;
702 }
703
704 static void
705 _channelends_drop_interpreter(_channelends *ends, int64_t interp)
706 {
707 _channelend *end;
708 end = _channelend_find(ends->send, interp, NULL);
709 if (end != NULL) {
710 _channelends_close_end(ends, end, 1);
711 }
712 end = _channelend_find(ends->recv, interp, NULL);
713 if (end != NULL) {
714 _channelends_close_end(ends, end, 0);
715 }
716 }
717
718 static void
719 _channelends_close_all(_channelends *ends, int which, int force)
720 {
721 // XXX Handle the ends.
722 // XXX Handle force is True.
723
724 // Ensure all the "send"-associated interpreters are closed.
725 _channelend *end;
726 for (end = ends->send; end != NULL; end = end->next) {
727 _channelends_close_end(ends, end, 1);
728 }
729
730 // Ensure all the "recv"-associated interpreters are closed.
731 for (end = ends->recv; end != NULL; end = end->next) {
732 _channelends_close_end(ends, end, 0);
733 }
734 }
735
736 /* channels */
737
738 struct _channel;
739 struct _channel_closing;
740 static void _channel_clear_closing(struct _channel *);
741 static void _channel_finish_closing(struct _channel *);
742
743 typedef struct _channel {
744 PyThread_type_lock mutex;
745 _channelqueue *queue;
746 _channelends *ends;
747 int open;
748 struct _channel_closing *closing;
749 } _PyChannelState;
750
751 static _PyChannelState *
752 _channel_new(PyThread_type_lock mutex)
753 {
754 _PyChannelState *chan = GLOBAL_MALLOC(_PyChannelState);
755 if (chan == NULL) {
756 return NULL;
757 }
758 chan->mutex = mutex;
759 chan->queue = _channelqueue_new();
760 if (chan->queue == NULL) {
761 GLOBAL_FREE(chan);
762 return NULL;
763 }
764 chan->ends = _channelends_new();
765 if (chan->ends == NULL) {
766 _channelqueue_free(chan->queue);
767 GLOBAL_FREE(chan);
768 return NULL;
769 }
770 chan->open = 1;
771 chan->closing = NULL;
772 return chan;
773 }
774
775 static void
776 _channel_free(_PyChannelState *chan)
777 {
778 _channel_clear_closing(chan);
779 PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
780 _channelqueue_free(chan->queue);
781 _channelends_free(chan->ends);
782 PyThread_release_lock(chan->mutex);
783
784 PyThread_free_lock(chan->mutex);
785 GLOBAL_FREE(chan);
786 }
787
788 static int
789 _channel_add(_PyChannelState *chan, int64_t interp,
790 _PyCrossInterpreterData *data)
791 {
792 int res = -1;
793 PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
794
795 if (!chan->open) {
796 res = ERR_CHANNEL_CLOSED;
797 goto done;
798 }
799 if (_channelends_associate(chan->ends, interp, 1) != 0) {
800 res = ERR_CHANNEL_INTERP_CLOSED;
801 goto done;
802 }
803
804 if (_channelqueue_put(chan->queue, data) != 0) {
805 goto done;
806 }
807
808 res = 0;
809 done:
810 PyThread_release_lock(chan->mutex);
811 return res;
812 }
813
814 static int
815 _channel_next(_PyChannelState *chan, int64_t interp,
816 _PyCrossInterpreterData **res)
817 {
818 int err = 0;
819 PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
820
821 if (!chan->open) {
822 err = ERR_CHANNEL_CLOSED;
823 goto done;
824 }
825 if (_channelends_associate(chan->ends, interp, 0) != 0) {
826 err = ERR_CHANNEL_INTERP_CLOSED;
827 goto done;
828 }
829
830 _PyCrossInterpreterData *data = _channelqueue_get(chan->queue);
831 if (data == NULL && !PyErr_Occurred() && chan->closing != NULL) {
832 chan->open = 0;
833 }
834 *res = data;
835
836 done:
837 PyThread_release_lock(chan->mutex);
838 if (chan->queue->count == 0) {
839 _channel_finish_closing(chan);
840 }
841 return err;
842 }
843
844 static int
845 _channel_close_interpreter(_PyChannelState *chan, int64_t interp, int end)
846 {
847 PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
848
849 int res = -1;
850 if (!chan->open) {
851 res = ERR_CHANNEL_CLOSED;
852 goto done;
853 }
854
855 if (_channelends_close_interpreter(chan->ends, interp, end) != 0) {
856 goto done;
857 }
858 chan->open = _channelends_is_open(chan->ends);
859
860 res = 0;
861 done:
862 PyThread_release_lock(chan->mutex);
863 return res;
864 }
865
866 static void
867 _channel_drop_interpreter(_PyChannelState *chan, int64_t interp)
868 {
869 PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
870
871 _channelqueue_drop_interpreter(chan->queue, interp);
872 _channelends_drop_interpreter(chan->ends, interp);
873 chan->open = _channelends_is_open(chan->ends);
874
875 PyThread_release_lock(chan->mutex);
876 }
877
878 static int
879 _channel_close_all(_PyChannelState *chan, int end, int force)
880 {
881 int res = -1;
882 PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
883
884 if (!chan->open) {
885 res = ERR_CHANNEL_CLOSED;
886 goto done;
887 }
888
889 if (!force && chan->queue->count > 0) {
890 res = ERR_CHANNEL_NOT_EMPTY;
891 goto done;
892 }
893
894 chan->open = 0;
895
896 // We *could* also just leave these in place, since we've marked
897 // the channel as closed already.
898 _channelends_close_all(chan->ends, end, force);
899
900 res = 0;
901 done:
902 PyThread_release_lock(chan->mutex);
903 return res;
904 }
905
906 /* the set of channels */
907
908 struct _channelref;
909
910 typedef struct _channelref {
911 int64_t id;
912 _PyChannelState *chan;
913 struct _channelref *next;
914 Py_ssize_t objcount;
915 } _channelref;
916
917 static _channelref *
918 _channelref_new(int64_t id, _PyChannelState *chan)
919 {
920 _channelref *ref = GLOBAL_MALLOC(_channelref);
921 if (ref == NULL) {
922 return NULL;
923 }
924 ref->id = id;
925 ref->chan = chan;
926 ref->next = NULL;
927 ref->objcount = 0;
928 return ref;
929 }
930
931 //static void
932 //_channelref_clear(_channelref *ref)
933 //{
934 // ref->id = -1;
935 // ref->chan = NULL;
936 // ref->next = NULL;
937 // ref->objcount = 0;
938 //}
939
940 static void
941 _channelref_free(_channelref *ref)
942 {
943 if (ref->chan != NULL) {
944 _channel_clear_closing(ref->chan);
945 }
946 //_channelref_clear(ref);
947 GLOBAL_FREE(ref);
948 }
949
950 static _channelref *
951 _channelref_find(_channelref *first, int64_t id, _channelref **pprev)
952 {
953 _channelref *prev = NULL;
954 _channelref *ref = first;
955 while (ref != NULL) {
956 if (ref->id == id) {
957 break;
958 }
959 prev = ref;
960 ref = ref->next;
961 }
962 if (pprev != NULL) {
963 *pprev = prev;
964 }
965 return ref;
966 }
967
968 typedef struct _channels {
969 PyThread_type_lock mutex;
970 _channelref *head;
971 int64_t numopen;
972 int64_t next_id;
973 } _channels;
974
975 static void
976 _channels_init(_channels *channels, PyThread_type_lock mutex)
977 {
978 channels->mutex = mutex;
979 channels->head = NULL;
980 channels->numopen = 0;
981 channels->next_id = 0;
982 }
983
984 static void
985 _channels_fini(_channels *channels)
986 {
987 assert(channels->numopen == 0);
988 assert(channels->head == NULL);
989 if (channels->mutex != NULL) {
990 PyThread_free_lock(channels->mutex);
991 channels->mutex = NULL;
992 }
993 }
994
995 static int64_t
996 _channels_next_id(_channels *channels) // needs lock
997 {
998 int64_t id = channels->next_id;
999 if (id < 0) {
1000 /* overflow */
1001 return -1;
1002 }
1003 channels->next_id += 1;
1004 return id;
1005 }
1006
1007 static int
1008 _channels_lookup(_channels *channels, int64_t id, PyThread_type_lock *pmutex,
1009 _PyChannelState **res)
1010 {
1011 int err = -1;
1012 _PyChannelState *chan = NULL;
1013 PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
1014 if (pmutex != NULL) {
1015 *pmutex = NULL;
1016 }
1017
1018 _channelref *ref = _channelref_find(channels->head, id, NULL);
1019 if (ref == NULL) {
1020 err = ERR_CHANNEL_NOT_FOUND;
1021 goto done;
1022 }
1023 if (ref->chan == NULL || !ref->chan->open) {
1024 err = ERR_CHANNEL_CLOSED;
1025 goto done;
1026 }
1027
1028 if (pmutex != NULL) {
1029 // The mutex will be closed by the caller.
1030 *pmutex = channels->mutex;
1031 }
1032
1033 chan = ref->chan;
1034 err = 0;
1035
1036 done:
1037 if (pmutex == NULL || *pmutex == NULL) {
1038 PyThread_release_lock(channels->mutex);
1039 }
1040 *res = chan;
1041 return err;
1042 }
1043
1044 static int64_t
1045 _channels_add(_channels *channels, _PyChannelState *chan)
1046 {
1047 int64_t cid = -1;
1048 PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
1049
1050 // Create a new ref.
1051 int64_t id = _channels_next_id(channels);
1052 if (id < 0) {
1053 cid = ERR_NO_NEXT_CHANNEL_ID;
1054 goto done;
1055 }
1056 _channelref *ref = _channelref_new(id, chan);
1057 if (ref == NULL) {
1058 goto done;
1059 }
1060
1061 // Add it to the list.
1062 // We assume that the channel is a new one (not already in the list).
1063 ref->next = channels->head;
1064 channels->head = ref;
1065 channels->numopen += 1;
1066
1067 cid = id;
1068 done:
1069 PyThread_release_lock(channels->mutex);
1070 return cid;
1071 }
1072
1073 /* forward */
1074 static int _channel_set_closing(struct _channelref *, PyThread_type_lock);
1075
1076 static int
1077 _channels_close(_channels *channels, int64_t cid, _PyChannelState **pchan,
1078 int end, int force)
1079 {
1080 int res = -1;
1081 PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
1082 if (pchan != NULL) {
1083 *pchan = NULL;
1084 }
1085
1086 _channelref *ref = _channelref_find(channels->head, cid, NULL);
1087 if (ref == NULL) {
1088 res = ERR_CHANNEL_NOT_FOUND;
1089 goto done;
1090 }
1091
1092 if (ref->chan == NULL) {
1093 res = ERR_CHANNEL_CLOSED;
1094 goto done;
1095 }
1096 else if (!force && end == CHANNEL_SEND && ref->chan->closing != NULL) {
1097 res = ERR_CHANNEL_CLOSED;
1098 goto done;
1099 }
1100 else {
1101 int err = _channel_close_all(ref->chan, end, force);
1102 if (err != 0) {
1103 if (end == CHANNEL_SEND && err == ERR_CHANNEL_NOT_EMPTY) {
1104 if (ref->chan->closing != NULL) {
1105 res = ERR_CHANNEL_CLOSED;
1106 goto done;
1107 }
1108 // Mark the channel as closing and return. The channel
1109 // will be cleaned up in _channel_next().
1110 PyErr_Clear();
1111 int err = _channel_set_closing(ref, channels->mutex);
1112 if (err != 0) {
1113 res = err;
1114 goto done;
1115 }
1116 if (pchan != NULL) {
1117 *pchan = ref->chan;
1118 }
1119 res = 0;
1120 }
1121 else {
1122 res = err;
1123 }
1124 goto done;
1125 }
1126 if (pchan != NULL) {
1127 *pchan = ref->chan;
1128 }
1129 else {
1130 _channel_free(ref->chan);
1131 }
1132 ref->chan = NULL;
1133 }
1134
1135 res = 0;
1136 done:
1137 PyThread_release_lock(channels->mutex);
1138 return res;
1139 }
1140
1141 static void
1142 _channels_remove_ref(_channels *channels, _channelref *ref, _channelref *prev,
1143 _PyChannelState **pchan)
1144 {
1145 if (ref == channels->head) {
1146 channels->head = ref->next;
1147 }
1148 else {
1149 prev->next = ref->next;
1150 }
1151 channels->numopen -= 1;
1152
1153 if (pchan != NULL) {
1154 *pchan = ref->chan;
1155 }
1156 _channelref_free(ref);
1157 }
1158
1159 static int
1160 _channels_remove(_channels *channels, int64_t id, _PyChannelState **pchan)
1161 {
1162 int res = -1;
1163 PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
1164
1165 if (pchan != NULL) {
1166 *pchan = NULL;
1167 }
1168
1169 _channelref *prev = NULL;
1170 _channelref *ref = _channelref_find(channels->head, id, &prev);
1171 if (ref == NULL) {
1172 res = ERR_CHANNEL_NOT_FOUND;
1173 goto done;
1174 }
1175
1176 _channels_remove_ref(channels, ref, prev, pchan);
1177
1178 res = 0;
1179 done:
1180 PyThread_release_lock(channels->mutex);
1181 return res;
1182 }
1183
1184 static int
1185 _channels_add_id_object(_channels *channels, int64_t id)
1186 {
1187 int res = -1;
1188 PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
1189
1190 _channelref *ref = _channelref_find(channels->head, id, NULL);
1191 if (ref == NULL) {
1192 res = ERR_CHANNEL_NOT_FOUND;
1193 goto done;
1194 }
1195 ref->objcount += 1;
1196
1197 res = 0;
1198 done:
1199 PyThread_release_lock(channels->mutex);
1200 return res;
1201 }
1202
1203 static void
1204 _channels_drop_id_object(_channels *channels, int64_t id)
1205 {
1206 PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
1207
1208 _channelref *prev = NULL;
1209 _channelref *ref = _channelref_find(channels->head, id, &prev);
1210 if (ref == NULL) {
1211 // Already destroyed.
1212 goto done;
1213 }
1214 ref->objcount -= 1;
1215
1216 // Destroy if no longer used.
1217 if (ref->objcount == 0) {
1218 _PyChannelState *chan = NULL;
1219 _channels_remove_ref(channels, ref, prev, &chan);
1220 if (chan != NULL) {
1221 _channel_free(chan);
1222 }
1223 }
1224
1225 done:
1226 PyThread_release_lock(channels->mutex);
1227 }
1228
1229 static int64_t *
1230 _channels_list_all(_channels *channels, int64_t *count)
1231 {
1232 int64_t *cids = NULL;
1233 PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
1234 int64_t *ids = PyMem_NEW(int64_t, (Py_ssize_t)(channels->numopen));
1235 if (ids == NULL) {
1236 goto done;
1237 }
1238 _channelref *ref = channels->head;
1239 for (int64_t i=0; ref != NULL; ref = ref->next, i++) {
1240 ids[i] = ref->id;
1241 }
1242 *count = channels->numopen;
1243
1244 cids = ids;
1245 done:
1246 PyThread_release_lock(channels->mutex);
1247 return cids;
1248 }
1249
1250 static void
1251 _channels_drop_interpreter(_channels *channels, int64_t interp)
1252 {
1253 PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
1254
1255 _channelref *ref = channels->head;
1256 for (; ref != NULL; ref = ref->next) {
1257 if (ref->chan != NULL) {
1258 _channel_drop_interpreter(ref->chan, interp);
1259 }
1260 }
1261
1262 PyThread_release_lock(channels->mutex);
1263 }
1264
1265 /* support for closing non-empty channels */
1266
1267 struct _channel_closing {
1268 struct _channelref *ref;
1269 };
1270
1271 static int
1272 _channel_set_closing(struct _channelref *ref, PyThread_type_lock mutex) {
1273 struct _channel *chan = ref->chan;
1274 if (chan == NULL) {
1275 // already closed
1276 return 0;
1277 }
1278 int res = -1;
1279 PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
1280 if (chan->closing != NULL) {
1281 res = ERR_CHANNEL_CLOSED;
1282 goto done;
1283 }
1284 chan->closing = GLOBAL_MALLOC(struct _channel_closing);
1285 if (chan->closing == NULL) {
1286 goto done;
1287 }
1288 chan->closing->ref = ref;
1289
1290 res = 0;
1291 done:
1292 PyThread_release_lock(chan->mutex);
1293 return res;
1294 }
1295
1296 static void
1297 _channel_clear_closing(struct _channel *chan) {
1298 PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
1299 if (chan->closing != NULL) {
1300 GLOBAL_FREE(chan->closing);
1301 chan->closing = NULL;
1302 }
1303 PyThread_release_lock(chan->mutex);
1304 }
1305
1306 static void
1307 _channel_finish_closing(struct _channel *chan) {
1308 struct _channel_closing *closing = chan->closing;
1309 if (closing == NULL) {
1310 return;
1311 }
1312 _channelref *ref = closing->ref;
1313 _channel_clear_closing(chan);
1314 // Do the things that would have been done in _channels_close().
1315 ref->chan = NULL;
1316 _channel_free(chan);
1317 }
1318
1319 /* "high"-level channel-related functions */
1320
1321 static int64_t
1322 _channel_create(_channels *channels)
1323 {
1324 PyThread_type_lock mutex = PyThread_allocate_lock();
1325 if (mutex == NULL) {
1326 return ERR_CHANNEL_MUTEX_INIT;
1327 }
1328 _PyChannelState *chan = _channel_new(mutex);
1329 if (chan == NULL) {
1330 PyThread_free_lock(mutex);
1331 return -1;
1332 }
1333 int64_t id = _channels_add(channels, chan);
1334 if (id < 0) {
1335 _channel_free(chan);
1336 }
1337 return id;
1338 }
1339
1340 static int
1341 _channel_destroy(_channels *channels, int64_t id)
1342 {
1343 _PyChannelState *chan = NULL;
1344 int err = _channels_remove(channels, id, &chan);
1345 if (err != 0) {
1346 return err;
1347 }
1348 if (chan != NULL) {
1349 _channel_free(chan);
1350 }
1351 return 0;
1352 }
1353
1354 static int
1355 _channel_send(_channels *channels, int64_t id, PyObject *obj)
1356 {
1357 PyInterpreterState *interp = _get_current_interp();
1358 if (interp == NULL) {
1359 return -1;
1360 }
1361
1362 // Look up the channel.
1363 PyThread_type_lock mutex = NULL;
1364 _PyChannelState *chan = NULL;
1365 int err = _channels_lookup(channels, id, &mutex, &chan);
1366 if (err != 0) {
1367 return err;
1368 }
1369 assert(chan != NULL);
1370 // Past this point we are responsible for releasing the mutex.
1371
1372 if (chan->closing != NULL) {
1373 PyThread_release_lock(mutex);
1374 return ERR_CHANNEL_CLOSED;
1375 }
1376
1377 // Convert the object to cross-interpreter data.
1378 _PyCrossInterpreterData *data = GLOBAL_MALLOC(_PyCrossInterpreterData);
1379 if (data == NULL) {
1380 PyThread_release_lock(mutex);
1381 return -1;
1382 }
1383 if (_PyObject_GetCrossInterpreterData(obj, data) != 0) {
1384 PyThread_release_lock(mutex);
1385 GLOBAL_FREE(data);
1386 return -1;
1387 }
1388
1389 // Add the data to the channel.
1390 int res = _channel_add(chan, PyInterpreterState_GetID(interp), data);
1391 PyThread_release_lock(mutex);
1392 if (res != 0) {
1393 // We may chain an exception here:
1394 (void)_release_xid_data(data, 0);
1395 GLOBAL_FREE(data);
1396 return res;
1397 }
1398
1399 return 0;
1400 }
1401
1402 static int
1403 _channel_recv(_channels *channels, int64_t id, PyObject **res)
1404 {
1405 int err;
1406 *res = NULL;
1407
1408 PyInterpreterState *interp = _get_current_interp();
1409 if (interp == NULL) {
1410 // XXX Is this always an error?
1411 if (PyErr_Occurred()) {
1412 return -1;
1413 }
1414 return 0;
1415 }
1416
1417 // Look up the channel.
1418 PyThread_type_lock mutex = NULL;
1419 _PyChannelState *chan = NULL;
1420 err = _channels_lookup(channels, id, &mutex, &chan);
1421 if (err != 0) {
1422 return err;
1423 }
1424 assert(chan != NULL);
1425 // Past this point we are responsible for releasing the mutex.
1426
1427 // Pop off the next item from the channel.
1428 _PyCrossInterpreterData *data = NULL;
1429 err = _channel_next(chan, PyInterpreterState_GetID(interp), &data);
1430 PyThread_release_lock(mutex);
1431 if (err != 0) {
1432 return err;
1433 }
1434 else if (data == NULL) {
1435 assert(!PyErr_Occurred());
1436 return 0;
1437 }
1438
1439 // Convert the data back to an object.
1440 PyObject *obj = _PyCrossInterpreterData_NewObject(data);
1441 if (obj == NULL) {
1442 assert(PyErr_Occurred());
1443 (void)_release_xid_data(data, 1);
1444 // It was allocated in _channel_send().
1445 GLOBAL_FREE(data);
1446 return -1;
1447 }
1448 int release_res = _release_xid_data(data, 0);
1449 // It was allocated in _channel_send().
1450 GLOBAL_FREE(data);
1451 if (release_res < 0) {
1452 // The source interpreter has been destroyed already.
1453 assert(PyErr_Occurred());
1454 Py_DECREF(obj);
1455 return -1;
1456 }
1457
1458 *res = obj;
1459 return 0;
1460 }
1461
1462 static int
1463 _channel_drop(_channels *channels, int64_t id, int send, int recv)
1464 {
1465 PyInterpreterState *interp = _get_current_interp();
1466 if (interp == NULL) {
1467 return -1;
1468 }
1469
1470 // Look up the channel.
1471 PyThread_type_lock mutex = NULL;
1472 _PyChannelState *chan = NULL;
1473 int err = _channels_lookup(channels, id, &mutex, &chan);
1474 if (err != 0) {
1475 return err;
1476 }
1477 // Past this point we are responsible for releasing the mutex.
1478
1479 // Close one or both of the two ends.
1480 int res = _channel_close_interpreter(chan, PyInterpreterState_GetID(interp), send-recv);
1481 PyThread_release_lock(mutex);
1482 return res;
1483 }
1484
1485 static int
1486 _channel_close(_channels *channels, int64_t id, int end, int force)
1487 {
1488 return _channels_close(channels, id, NULL, end, force);
1489 }
1490
1491 static int
1492 _channel_is_associated(_channels *channels, int64_t cid, int64_t interp,
1493 int send)
1494 {
1495 _PyChannelState *chan = NULL;
1496 int err = _channels_lookup(channels, cid, NULL, &chan);
1497 if (err != 0) {
1498 return err;
1499 }
1500 else if (send && chan->closing != NULL) {
1501 return ERR_CHANNEL_CLOSED;
1502 }
1503
1504 _channelend *end = _channelend_find(send ? chan->ends->send : chan->ends->recv,
1505 interp, NULL);
1506
1507 return (end != NULL && end->open);
1508 }
1509
1510 /* ChannelID class */
1511
1512 typedef struct channelid {
1513 PyObject_HEAD
1514 int64_t id;
1515 int end;
1516 int resolve;
1517 _channels *channels;
1518 } channelid;
1519
1520 struct channel_id_converter_data {
1521 PyObject *module;
1522 int64_t cid;
1523 };
1524
1525 static int
1526 channel_id_converter(PyObject *arg, void *ptr)
1527 {
1528 int64_t cid;
1529 struct channel_id_converter_data *data = ptr;
1530 module_state *state = get_module_state(data->module);
1531 assert(state != NULL);
1532 if (PyObject_TypeCheck(arg, state->ChannelIDType)) {
1533 cid = ((channelid *)arg)->id;
1534 }
1535 else if (PyIndex_Check(arg)) {
1536 cid = PyLong_AsLongLong(arg);
1537 if (cid == -1 && PyErr_Occurred()) {
1538 return 0;
1539 }
1540 if (cid < 0) {
1541 PyErr_Format(PyExc_ValueError,
1542 "channel ID must be a non-negative int, got %R", arg);
1543 return 0;
1544 }
1545 }
1546 else {
1547 PyErr_Format(PyExc_TypeError,
1548 "channel ID must be an int, got %.100s",
1549 Py_TYPE(arg)->tp_name);
1550 return 0;
1551 }
1552 data->cid = cid;
1553 return 1;
1554 }
1555
1556 static int
1557 newchannelid(PyTypeObject *cls, int64_t cid, int end, _channels *channels,
1558 int force, int resolve, channelid **res)
1559 {
1560 *res = NULL;
1561
1562 channelid *self = PyObject_New(channelid, cls);
1563 if (self == NULL) {
1564 return -1;
1565 }
1566 self->id = cid;
1567 self->end = end;
1568 self->resolve = resolve;
1569 self->channels = channels;
1570
1571 int err = _channels_add_id_object(channels, cid);
1572 if (err != 0) {
1573 if (force && err == ERR_CHANNEL_NOT_FOUND) {
1574 assert(!PyErr_Occurred());
1575 }
1576 else {
1577 Py_DECREF((PyObject *)self);
1578 return err;
1579 }
1580 }
1581
1582 *res = self;
1583 return 0;
1584 }
1585
1586 static _channels * _global_channels(void);
1587
1588 static PyObject *
1589 _channelid_new(PyObject *mod, PyTypeObject *cls,
1590 PyObject *args, PyObject *kwds)
1591 {
1592 static char *kwlist[] = {"id", "send", "recv", "force", "_resolve", NULL};
1593 int64_t cid;
1594 struct channel_id_converter_data cid_data = {
1595 .module = mod,
1596 };
1597 int send = -1;
1598 int recv = -1;
1599 int force = 0;
1600 int resolve = 0;
1601 if (!PyArg_ParseTupleAndKeywords(args, kwds,
1602 "O&|$pppp:ChannelID.__new__", kwlist,
1603 channel_id_converter, &cid_data,
1604 &send, &recv, &force, &resolve)) {
1605 return NULL;
1606 }
1607 cid = cid_data.cid;
1608
1609 // Handle "send" and "recv".
1610 if (send == 0 && recv == 0) {
1611 PyErr_SetString(PyExc_ValueError,
1612 "'send' and 'recv' cannot both be False");
1613 return NULL;
1614 }
1615
1616 int end = 0;
1617 if (send == 1) {
1618 if (recv == 0 || recv == -1) {
1619 end = CHANNEL_SEND;
1620 }
1621 }
1622 else if (recv == 1) {
1623 end = CHANNEL_RECV;
1624 }
1625
1626 PyObject *id = NULL;
1627 int err = newchannelid(cls, cid, end, _global_channels(),
1628 force, resolve,
1629 (channelid **)&id);
1630 if (handle_channel_error(err, mod, cid)) {
1631 assert(id == NULL);
1632 return NULL;
1633 }
1634 assert(id != NULL);
1635 return id;
1636 }
1637
1638 static void
1639 channelid_dealloc(PyObject *self)
1640 {
1641 int64_t cid = ((channelid *)self)->id;
1642 _channels *channels = ((channelid *)self)->channels;
1643
1644 PyTypeObject *tp = Py_TYPE(self);
1645 tp->tp_free(self);
1646 /* "Instances of heap-allocated types hold a reference to their type."
1647 * See: https://docs.python.org/3.11/howto/isolating-extensions.html#garbage-collection-protocol
1648 * See: https://docs.python.org/3.11/c-api/typeobj.html#c.PyTypeObject.tp_traverse
1649 */
1650 // XXX Why don't we implement Py_TPFLAGS_HAVE_GC, e.g. Py_tp_traverse,
1651 // like we do for _abc._abc_data?
1652 Py_DECREF(tp);
1653
1654 _channels_drop_id_object(channels, cid);
1655 }
1656
1657 static PyObject *
1658 channelid_repr(PyObject *self)
1659 {
1660 PyTypeObject *type = Py_TYPE(self);
1661 const char *name = _PyType_Name(type);
1662
1663 channelid *cid = (channelid *)self;
1664 const char *fmt;
1665 if (cid->end == CHANNEL_SEND) {
1666 fmt = "%s(%" PRId64 ", send=True)";
1667 }
1668 else if (cid->end == CHANNEL_RECV) {
1669 fmt = "%s(%" PRId64 ", recv=True)";
1670 }
1671 else {
1672 fmt = "%s(%" PRId64 ")";
1673 }
1674 return PyUnicode_FromFormat(fmt, name, cid->id);
1675 }
1676
1677 static PyObject *
1678 channelid_str(PyObject *self)
1679 {
1680 channelid *cid = (channelid *)self;
1681 return PyUnicode_FromFormat("%" PRId64 "", cid->id);
1682 }
1683
1684 static PyObject *
1685 channelid_int(PyObject *self)
1686 {
1687 channelid *cid = (channelid *)self;
1688 return PyLong_FromLongLong(cid->id);
1689 }
1690
1691 static Py_hash_t
1692 channelid_hash(PyObject *self)
1693 {
1694 channelid *cid = (channelid *)self;
1695 PyObject *id = PyLong_FromLongLong(cid->id);
1696 if (id == NULL) {
1697 return -1;
1698 }
1699 Py_hash_t hash = PyObject_Hash(id);
1700 Py_DECREF(id);
1701 return hash;
1702 }
1703
1704 static PyObject *
1705 channelid_richcompare(PyObject *self, PyObject *other, int op)
1706 {
1707 PyObject *res = NULL;
1708 if (op != Py_EQ && op != Py_NE) {
1709 Py_RETURN_NOTIMPLEMENTED;
1710 }
1711
1712 PyObject *mod = get_module_from_type(Py_TYPE(self));
1713 if (mod == NULL) {
1714 return NULL;
1715 }
1716 module_state *state = get_module_state(mod);
1717 if (state == NULL) {
1718 goto done;
1719 }
1720
1721 if (!PyObject_TypeCheck(self, state->ChannelIDType)) {
1722 res = Py_NewRef(Py_NotImplemented);
1723 goto done;
1724 }
1725
1726 channelid *cid = (channelid *)self;
1727 int equal;
1728 if (PyObject_TypeCheck(other, state->ChannelIDType)) {
1729 channelid *othercid = (channelid *)other;
1730 equal = (cid->end == othercid->end) && (cid->id == othercid->id);
1731 }
1732 else if (PyLong_Check(other)) {
1733 /* Fast path */
1734 int overflow;
1735 long long othercid = PyLong_AsLongLongAndOverflow(other, &overflow);
1736 if (othercid == -1 && PyErr_Occurred()) {
1737 goto done;
1738 }
1739 equal = !overflow && (othercid >= 0) && (cid->id == othercid);
1740 }
1741 else if (PyNumber_Check(other)) {
1742 PyObject *pyid = PyLong_FromLongLong(cid->id);
1743 if (pyid == NULL) {
1744 goto done;
1745 }
1746 res = PyObject_RichCompare(pyid, other, op);
1747 Py_DECREF(pyid);
1748 goto done;
1749 }
1750 else {
1751 res = Py_NewRef(Py_NotImplemented);
1752 goto done;
1753 }
1754
1755 if ((op == Py_EQ && equal) || (op == Py_NE && !equal)) {
1756 res = Py_NewRef(Py_True);
1757 }
1758 else {
1759 res = Py_NewRef(Py_False);
1760 }
1761
1762 done:
1763 Py_DECREF(mod);
1764 return res;
1765 }
1766
1767 static PyObject *
1768 _channel_from_cid(PyObject *cid, int end)
1769 {
1770 PyObject *highlevel = PyImport_ImportModule("interpreters");
1771 if (highlevel == NULL) {
1772 PyErr_Clear();
1773 highlevel = PyImport_ImportModule("test.support.interpreters");
1774 if (highlevel == NULL) {
1775 return NULL;
1776 }
1777 }
1778 const char *clsname = (end == CHANNEL_RECV) ? "RecvChannel" :
1779 "SendChannel";
1780 PyObject *cls = PyObject_GetAttrString(highlevel, clsname);
1781 Py_DECREF(highlevel);
1782 if (cls == NULL) {
1783 return NULL;
1784 }
1785 PyObject *chan = PyObject_CallFunctionObjArgs(cls, cid, NULL);
1786 Py_DECREF(cls);
1787 if (chan == NULL) {
1788 return NULL;
1789 }
1790 return chan;
1791 }
1792
1793 struct _channelid_xid {
1794 int64_t id;
1795 int end;
1796 int resolve;
1797 };
1798
1799 static PyObject *
1800 _channelid_from_xid(_PyCrossInterpreterData *data)
1801 {
1802 struct _channelid_xid *xid = (struct _channelid_xid *)data->data;
1803
1804 // It might not be imported yet, so we can't use _get_current_module().
1805 PyObject *mod = PyImport_ImportModule(MODULE_NAME);
1806 if (mod == NULL) {
1807 return NULL;
1808 }
1809 assert(mod != Py_None);
1810 module_state *state = get_module_state(mod);
1811 if (state == NULL) {
1812 return NULL;
1813 }
1814
1815 // Note that we do not preserve the "resolve" flag.
1816 PyObject *cid = NULL;
1817 int err = newchannelid(state->ChannelIDType, xid->id, xid->end,
1818 _global_channels(), 0, 0,
1819 (channelid **)&cid);
1820 if (err != 0) {
1821 assert(cid == NULL);
1822 (void)handle_channel_error(err, mod, xid->id);
1823 goto done;
1824 }
1825 assert(cid != NULL);
1826 if (xid->end == 0) {
1827 goto done;
1828 }
1829 if (!xid->resolve) {
1830 goto done;
1831 }
1832
1833 /* Try returning a high-level channel end but fall back to the ID. */
1834 PyObject *chan = _channel_from_cid(cid, xid->end);
1835 if (chan == NULL) {
1836 PyErr_Clear();
1837 goto done;
1838 }
1839 Py_DECREF(cid);
1840 cid = chan;
1841
1842 done:
1843 Py_DECREF(mod);
1844 return cid;
1845 }
1846
1847 static int
1848 _channelid_shared(PyThreadState *tstate, PyObject *obj,
1849 _PyCrossInterpreterData *data)
1850 {
1851 if (_PyCrossInterpreterData_InitWithSize(
1852 data, tstate->interp, sizeof(struct _channelid_xid), obj,
1853 _channelid_from_xid
1854 ) < 0)
1855 {
1856 return -1;
1857 }
1858 struct _channelid_xid *xid = (struct _channelid_xid *)data->data;
1859 xid->id = ((channelid *)obj)->id;
1860 xid->end = ((channelid *)obj)->end;
1861 xid->resolve = ((channelid *)obj)->resolve;
1862 return 0;
1863 }
1864
1865 static PyObject *
1866 channelid_end(PyObject *self, void *end)
1867 {
1868 int force = 1;
1869 channelid *cid = (channelid *)self;
1870 if (end != NULL) {
1871 PyObject *id = NULL;
1872 int err = newchannelid(Py_TYPE(self), cid->id, *(int *)end,
1873 cid->channels, force, cid->resolve,
1874 (channelid **)&id);
1875 if (err != 0) {
1876 assert(id == NULL);
1877 PyObject *mod = get_module_from_type(Py_TYPE(self));
1878 if (mod == NULL) {
1879 return NULL;
1880 }
1881 (void)handle_channel_error(err, mod, cid->id);
1882 Py_DECREF(mod);
1883 return NULL;
1884 }
1885 assert(id != NULL);
1886 return id;
1887 }
1888
1889 if (cid->end == CHANNEL_SEND) {
1890 return PyUnicode_InternFromString("send");
1891 }
1892 if (cid->end == CHANNEL_RECV) {
1893 return PyUnicode_InternFromString("recv");
1894 }
1895 return PyUnicode_InternFromString("both");
1896 }
1897
1898 static int _channelid_end_send = CHANNEL_SEND;
1899 static int _channelid_end_recv = CHANNEL_RECV;
1900
1901 static PyGetSetDef channelid_getsets[] = {
1902 {"end", (getter)channelid_end, NULL,
1903 PyDoc_STR("'send', 'recv', or 'both'")},
1904 {"send", (getter)channelid_end, NULL,
1905 PyDoc_STR("the 'send' end of the channel"), &_channelid_end_send},
1906 {"recv", (getter)channelid_end, NULL,
1907 PyDoc_STR("the 'recv' end of the channel"), &_channelid_end_recv},
1908 {NULL}
1909 };
1910
1911 PyDoc_STRVAR(channelid_doc,
1912 "A channel ID identifies a channel and may be used as an int.");
1913
1914 static PyType_Slot ChannelIDType_slots[] = {
1915 {Py_tp_dealloc, (destructor)channelid_dealloc},
1916 {Py_tp_doc, (void *)channelid_doc},
1917 {Py_tp_repr, (reprfunc)channelid_repr},
1918 {Py_tp_str, (reprfunc)channelid_str},
1919 {Py_tp_hash, channelid_hash},
1920 {Py_tp_richcompare, channelid_richcompare},
1921 {Py_tp_getset, channelid_getsets},
1922 // number slots
1923 {Py_nb_int, (unaryfunc)channelid_int},
1924 {Py_nb_index, (unaryfunc)channelid_int},
1925 {0, NULL},
1926 };
1927
1928 static PyType_Spec ChannelIDType_spec = {
1929 .name = MODULE_NAME ".ChannelID",
1930 .basicsize = sizeof(channelid),
1931 .flags = (Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE |
1932 Py_TPFLAGS_DISALLOW_INSTANTIATION | Py_TPFLAGS_IMMUTABLETYPE),
1933 .slots = ChannelIDType_slots,
1934 };
1935
1936
1937 /* module level code ********************************************************/
1938
1939 /* globals is the process-global state for the module. It holds all
1940 the data that we need to share between interpreters, so it cannot
1941 hold PyObject values. */
1942 static struct globals {
1943 int module_count;
1944 _channels channels;
1945 } _globals = {0};
1946
1947 static int
1948 _globals_init(void)
1949 {
1950 // XXX This isn't thread-safe.
1951 _globals.module_count++;
1952 if (_globals.module_count > 1) {
1953 // Already initialized.
1954 return 0;
1955 }
1956
1957 assert(_globals.channels.mutex == NULL);
1958 PyThread_type_lock mutex = PyThread_allocate_lock();
1959 if (mutex == NULL) {
1960 return ERR_CHANNELS_MUTEX_INIT;
1961 }
1962 _channels_init(&_globals.channels, mutex);
1963 return 0;
1964 }
1965
1966 static void
1967 _globals_fini(void)
1968 {
1969 // XXX This isn't thread-safe.
1970 _globals.module_count--;
1971 if (_globals.module_count > 0) {
1972 return;
1973 }
1974
1975 _channels_fini(&_globals.channels);
1976 }
1977
1978 static _channels *
1979 _global_channels(void) {
1980 return &_globals.channels;
1981 }
1982
1983
1984 static void
1985 clear_interpreter(void *data)
1986 {
1987 if (_globals.module_count == 0) {
1988 return;
1989 }
1990 PyInterpreterState *interp = (PyInterpreterState *)data;
1991 assert(interp == _get_current_interp());
1992 int64_t id = PyInterpreterState_GetID(interp);
1993 _channels_drop_interpreter(&_globals.channels, id);
1994 }
1995
1996
1997 static PyObject *
1998 channel_create(PyObject *self, PyObject *Py_UNUSED(ignored))
1999 {
2000 int64_t cid = _channel_create(&_globals.channels);
2001 if (cid < 0) {
2002 (void)handle_channel_error(-1, self, cid);
2003 return NULL;
2004 }
2005 module_state *state = get_module_state(self);
2006 if (state == NULL) {
2007 return NULL;
2008 }
2009 PyObject *id = NULL;
2010 int err = newchannelid(state->ChannelIDType, cid, 0,
2011 &_globals.channels, 0, 0,
2012 (channelid **)&id);
2013 if (handle_channel_error(err, self, cid)) {
2014 assert(id == NULL);
2015 err = _channel_destroy(&_globals.channels, cid);
2016 if (handle_channel_error(err, self, cid)) {
2017 // XXX issue a warning?
2018 }
2019 return NULL;
2020 }
2021 assert(id != NULL);
2022 assert(((channelid *)id)->channels != NULL);
2023 return id;
2024 }
2025
2026 PyDoc_STRVAR(channel_create_doc,
2027 "channel_create() -> cid\n\
2028 \n\
2029 Create a new cross-interpreter channel and return a unique generated ID.");
2030
2031 static PyObject *
2032 channel_destroy(PyObject *self, PyObject *args, PyObject *kwds)
2033 {
2034 static char *kwlist[] = {"cid", NULL};
2035 int64_t cid;
2036 struct channel_id_converter_data cid_data = {
2037 .module = self,
2038 };
2039 if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&:channel_destroy", kwlist,
2040 channel_id_converter, &cid_data)) {
2041 return NULL;
2042 }
2043 cid = cid_data.cid;
2044
2045 int err = _channel_destroy(&_globals.channels, cid);
2046 if (handle_channel_error(err, self, cid)) {
2047 return NULL;
2048 }
2049 Py_RETURN_NONE;
2050 }
2051
2052 PyDoc_STRVAR(channel_destroy_doc,
2053 "channel_destroy(cid)\n\
2054 \n\
2055 Close and finalize the channel. Afterward attempts to use the channel\n\
2056 will behave as though it never existed.");
2057
2058 static PyObject *
2059 channel_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
2060 {
2061 int64_t count = 0;
2062 int64_t *cids = _channels_list_all(&_globals.channels, &count);
2063 if (cids == NULL) {
2064 if (count == 0) {
2065 return PyList_New(0);
2066 }
2067 return NULL;
2068 }
2069 PyObject *ids = PyList_New((Py_ssize_t)count);
2070 if (ids == NULL) {
2071 goto finally;
2072 }
2073 module_state *state = get_module_state(self);
2074 if (state == NULL) {
2075 Py_DECREF(ids);
2076 ids = NULL;
2077 goto finally;
2078 }
2079 int64_t *cur = cids;
2080 for (int64_t i=0; i < count; cur++, i++) {
2081 PyObject *id = NULL;
2082 int err = newchannelid(state->ChannelIDType, *cur, 0,
2083 &_globals.channels, 0, 0,
2084 (channelid **)&id);
2085 if (handle_channel_error(err, self, *cur)) {
2086 assert(id == NULL);
2087 Py_SETREF(ids, NULL);
2088 break;
2089 }
2090 assert(id != NULL);
2091 PyList_SET_ITEM(ids, (Py_ssize_t)i, id);
2092 }
2093
2094 finally:
2095 PyMem_Free(cids);
2096 return ids;
2097 }
2098
2099 PyDoc_STRVAR(channel_list_all_doc,
2100 "channel_list_all() -> [cid]\n\
2101 \n\
2102 Return the list of all IDs for active channels.");
2103
2104 static PyObject *
2105 channel_list_interpreters(PyObject *self, PyObject *args, PyObject *kwds)
2106 {
2107 static char *kwlist[] = {"cid", "send", NULL};
2108 int64_t cid; /* Channel ID */
2109 struct channel_id_converter_data cid_data = {
2110 .module = self,
2111 };
2112 int send = 0; /* Send or receive end? */
2113 int64_t id;
2114 PyObject *ids, *id_obj;
2115 PyInterpreterState *interp;
2116
2117 if (!PyArg_ParseTupleAndKeywords(
2118 args, kwds, "O&$p:channel_list_interpreters",
2119 kwlist, channel_id_converter, &cid_data, &send)) {
2120 return NULL;
2121 }
2122 cid = cid_data.cid;
2123
2124 ids = PyList_New(0);
2125 if (ids == NULL) {
2126 goto except;
2127 }
2128
2129 interp = PyInterpreterState_Head();
2130 while (interp != NULL) {
2131 id = PyInterpreterState_GetID(interp);
2132 assert(id >= 0);
2133 int res = _channel_is_associated(&_globals.channels, cid, id, send);
2134 if (res < 0) {
2135 (void)handle_channel_error(res, self, cid);
2136 goto except;
2137 }
2138 if (res) {
2139 id_obj = _PyInterpreterState_GetIDObject(interp);
2140 if (id_obj == NULL) {
2141 goto except;
2142 }
2143 res = PyList_Insert(ids, 0, id_obj);
2144 Py_DECREF(id_obj);
2145 if (res < 0) {
2146 goto except;
2147 }
2148 }
2149 interp = PyInterpreterState_Next(interp);
2150 }
2151
2152 goto finally;
2153
2154 except:
2155 Py_CLEAR(ids);
2156
2157 finally:
2158 return ids;
2159 }
2160
2161 PyDoc_STRVAR(channel_list_interpreters_doc,
2162 "channel_list_interpreters(cid, *, send) -> [id]\n\
2163 \n\
2164 Return the list of all interpreter IDs associated with an end of the channel.\n\
2165 \n\
2166 The 'send' argument should be a boolean indicating whether to use the send or\n\
2167 receive end.");
2168
2169
2170 static PyObject *
2171 channel_send(PyObject *self, PyObject *args, PyObject *kwds)
2172 {
2173 static char *kwlist[] = {"cid", "obj", NULL};
2174 int64_t cid;
2175 struct channel_id_converter_data cid_data = {
2176 .module = self,
2177 };
2178 PyObject *obj;
2179 if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O:channel_send", kwlist,
2180 channel_id_converter, &cid_data, &obj)) {
2181 return NULL;
2182 }
2183 cid = cid_data.cid;
2184
2185 int err = _channel_send(&_globals.channels, cid, obj);
2186 if (handle_channel_error(err, self, cid)) {
2187 return NULL;
2188 }
2189 Py_RETURN_NONE;
2190 }
2191
2192 PyDoc_STRVAR(channel_send_doc,
2193 "channel_send(cid, obj)\n\
2194 \n\
2195 Add the object's data to the channel's queue.");
2196
2197 static PyObject *
2198 channel_recv(PyObject *self, PyObject *args, PyObject *kwds)
2199 {
2200 static char *kwlist[] = {"cid", "default", NULL};
2201 int64_t cid;
2202 struct channel_id_converter_data cid_data = {
2203 .module = self,
2204 };
2205 PyObject *dflt = NULL;
2206 if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&|O:channel_recv", kwlist,
2207 channel_id_converter, &cid_data, &dflt)) {
2208 return NULL;
2209 }
2210 cid = cid_data.cid;
2211
2212 PyObject *obj = NULL;
2213 int err = _channel_recv(&_globals.channels, cid, &obj);
2214 if (handle_channel_error(err, self, cid)) {
2215 return NULL;
2216 }
2217 Py_XINCREF(dflt);
2218 if (obj == NULL) {
2219 // Use the default.
2220 if (dflt == NULL) {
2221 (void)handle_channel_error(ERR_CHANNEL_EMPTY, self, cid);
2222 return NULL;
2223 }
2224 obj = Py_NewRef(dflt);
2225 }
2226 Py_XDECREF(dflt);
2227 return obj;
2228 }
2229
2230 PyDoc_STRVAR(channel_recv_doc,
2231 "channel_recv(cid, [default]) -> obj\n\
2232 \n\
2233 Return a new object from the data at the front of the channel's queue.\n\
2234 \n\
2235 If there is nothing to receive then raise ChannelEmptyError, unless\n\
2236 a default value is provided. In that case return it.");
2237
2238 static PyObject *
2239 channel_close(PyObject *self, PyObject *args, PyObject *kwds)
2240 {
2241 static char *kwlist[] = {"cid", "send", "recv", "force", NULL};
2242 int64_t cid;
2243 struct channel_id_converter_data cid_data = {
2244 .module = self,
2245 };
2246 int send = 0;
2247 int recv = 0;
2248 int force = 0;
2249 if (!PyArg_ParseTupleAndKeywords(args, kwds,
2250 "O&|$ppp:channel_close", kwlist,
2251 channel_id_converter, &cid_data,
2252 &send, &recv, &force)) {
2253 return NULL;
2254 }
2255 cid = cid_data.cid;
2256
2257 int err = _channel_close(&_globals.channels, cid, send-recv, force);
2258 if (handle_channel_error(err, self, cid)) {
2259 return NULL;
2260 }
2261 Py_RETURN_NONE;
2262 }
2263
2264 PyDoc_STRVAR(channel_close_doc,
2265 "channel_close(cid, *, send=None, recv=None, force=False)\n\
2266 \n\
2267 Close the channel for all interpreters.\n\
2268 \n\
2269 If the channel is empty then the keyword args are ignored and both\n\
2270 ends are immediately closed. Otherwise, if 'force' is True then\n\
2271 all queued items are released and both ends are immediately\n\
2272 closed.\n\
2273 \n\
2274 If the channel is not empty *and* 'force' is False then following\n\
2275 happens:\n\
2276 \n\
2277 * recv is True (regardless of send):\n\
2278 - raise ChannelNotEmptyError\n\
2279 * recv is None and send is None:\n\
2280 - raise ChannelNotEmptyError\n\
2281 * send is True and recv is not True:\n\
2282 - fully close the 'send' end\n\
2283 - close the 'recv' end to interpreters not already receiving\n\
2284 - fully close it once empty\n\
2285 \n\
2286 Closing an already closed channel results in a ChannelClosedError.\n\
2287 \n\
2288 Once the channel's ID has no more ref counts in any interpreter\n\
2289 the channel will be destroyed.");
2290
2291 static PyObject *
2292 channel_release(PyObject *self, PyObject *args, PyObject *kwds)
2293 {
2294 // Note that only the current interpreter is affected.
2295 static char *kwlist[] = {"cid", "send", "recv", "force", NULL};
2296 int64_t cid;
2297 struct channel_id_converter_data cid_data = {
2298 .module = self,
2299 };
2300 int send = 0;
2301 int recv = 0;
2302 int force = 0;
2303 if (!PyArg_ParseTupleAndKeywords(args, kwds,
2304 "O&|$ppp:channel_release", kwlist,
2305 channel_id_converter, &cid_data,
2306 &send, &recv, &force)) {
2307 return NULL;
2308 }
2309 cid = cid_data.cid;
2310 if (send == 0 && recv == 0) {
2311 send = 1;
2312 recv = 1;
2313 }
2314
2315 // XXX Handle force is True.
2316 // XXX Fix implicit release.
2317
2318 int err = _channel_drop(&_globals.channels, cid, send, recv);
2319 if (handle_channel_error(err, self, cid)) {
2320 return NULL;
2321 }
2322 Py_RETURN_NONE;
2323 }
2324
2325 PyDoc_STRVAR(channel_release_doc,
2326 "channel_release(cid, *, send=None, recv=None, force=True)\n\
2327 \n\
2328 Close the channel for the current interpreter. 'send' and 'recv'\n\
2329 (bool) may be used to indicate the ends to close. By default both\n\
2330 ends are closed. Closing an already closed end is a noop.");
2331
2332 static PyObject *
2333 channel__channel_id(PyObject *self, PyObject *args, PyObject *kwds)
2334 {
2335 module_state *state = get_module_state(self);
2336 if (state == NULL) {
2337 return NULL;
2338 }
2339 PyTypeObject *cls = state->ChannelIDType;
2340 PyObject *mod = get_module_from_owned_type(cls);
2341 if (mod == NULL) {
2342 return NULL;
2343 }
2344 PyObject *cid = _channelid_new(mod, cls, args, kwds);
2345 Py_DECREF(mod);
2346 return cid;
2347 }
2348
2349 static PyMethodDef module_functions[] = {
2350 {"create", channel_create,
2351 METH_NOARGS, channel_create_doc},
2352 {"destroy", _PyCFunction_CAST(channel_destroy),
2353 METH_VARARGS | METH_KEYWORDS, channel_destroy_doc},
2354 {"list_all", channel_list_all,
2355 METH_NOARGS, channel_list_all_doc},
2356 {"list_interpreters", _PyCFunction_CAST(channel_list_interpreters),
2357 METH_VARARGS | METH_KEYWORDS, channel_list_interpreters_doc},
2358 {"send", _PyCFunction_CAST(channel_send),
2359 METH_VARARGS | METH_KEYWORDS, channel_send_doc},
2360 {"recv", _PyCFunction_CAST(channel_recv),
2361 METH_VARARGS | METH_KEYWORDS, channel_recv_doc},
2362 {"close", _PyCFunction_CAST(channel_close),
2363 METH_VARARGS | METH_KEYWORDS, channel_close_doc},
2364 {"release", _PyCFunction_CAST(channel_release),
2365 METH_VARARGS | METH_KEYWORDS, channel_release_doc},
2366 {"_channel_id", _PyCFunction_CAST(channel__channel_id),
2367 METH_VARARGS | METH_KEYWORDS, NULL},
2368
2369 {NULL, NULL} /* sentinel */
2370 };
2371
2372
2373 /* initialization function */
2374
2375 PyDoc_STRVAR(module_doc,
2376 "This module provides primitive operations to manage Python interpreters.\n\
2377 The 'interpreters' module provides a more convenient interface.");
2378
2379 static int
2380 module_exec(PyObject *mod)
2381 {
2382 if (_globals_init() != 0) {
2383 return -1;
2384 }
2385
2386 /* Add exception types */
2387 if (exceptions_init(mod) != 0) {
2388 goto error;
2389 }
2390
2391 /* Add other types */
2392 module_state *state = get_module_state(mod);
2393 if (state == NULL) {
2394 goto error;
2395 }
2396
2397 // ChannelID
2398 state->ChannelIDType = add_new_type(
2399 mod, &ChannelIDType_spec, _channelid_shared);
2400 if (state->ChannelIDType == NULL) {
2401 goto error;
2402 }
2403
2404 // Make sure chnnels drop objects owned by this interpreter
2405 PyInterpreterState *interp = _get_current_interp();
2406 _Py_AtExit(interp, clear_interpreter, (void *)interp);
2407
2408 return 0;
2409
2410 error:
2411 _globals_fini();
2412 return -1;
2413 }
2414
2415 static struct PyModuleDef_Slot module_slots[] = {
2416 {Py_mod_exec, module_exec},
2417 {Py_mod_multiple_interpreters, Py_MOD_PER_INTERPRETER_GIL_SUPPORTED},
2418 {0, NULL},
2419 };
2420
2421 static int
2422 module_traverse(PyObject *mod, visitproc visit, void *arg)
2423 {
2424 module_state *state = get_module_state(mod);
2425 assert(state != NULL);
2426 traverse_module_state(state, visit, arg);
2427 return 0;
2428 }
2429
2430 static int
2431 module_clear(PyObject *mod)
2432 {
2433 module_state *state = get_module_state(mod);
2434 assert(state != NULL);
2435 clear_module_state(state);
2436 return 0;
2437 }
2438
2439 static void
2440 module_free(void *mod)
2441 {
2442 module_state *state = get_module_state(mod);
2443 assert(state != NULL);
2444 clear_module_state(state);
2445 _globals_fini();
2446 }
2447
2448 static struct PyModuleDef moduledef = {
2449 .m_base = PyModuleDef_HEAD_INIT,
2450 .m_name = MODULE_NAME,
2451 .m_doc = module_doc,
2452 .m_size = sizeof(module_state),
2453 .m_methods = module_functions,
2454 .m_slots = module_slots,
2455 .m_traverse = module_traverse,
2456 .m_clear = module_clear,
2457 .m_free = (freefunc)module_free,
2458 };
2459
2460 PyMODINIT_FUNC
2461 PyInit__xxinterpchannels(void)
2462 {
2463 return PyModuleDef_Init(&moduledef);
2464 }