• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 
2 /* interpreters module */
3 /* low-level access to interpreter primitives */
4 
5 #include "Python.h"
6 #include "frameobject.h"
7 #include "interpreteridobject.h"
8 
9 
10 static char *
_copy_raw_string(PyObject * strobj)11 _copy_raw_string(PyObject *strobj)
12 {
13     const char *str = PyUnicode_AsUTF8(strobj);
14     if (str == NULL) {
15         return NULL;
16     }
17     char *copied = PyMem_Malloc(strlen(str)+1);
18     if (copied == NULL) {
19         PyErr_NoMemory();
20         return NULL;
21     }
22     strcpy(copied, str);
23     return copied;
24 }
25 
26 static PyInterpreterState *
_get_current(void)27 _get_current(void)
28 {
29     // _PyInterpreterState_Get() aborts if lookup fails, so don't need
30     // to check the result for NULL.
31     return _PyInterpreterState_Get();
32 }
33 
34 
35 /* data-sharing-specific code ***********************************************/
36 
37 struct _sharednsitem {
38     char *name;
39     _PyCrossInterpreterData data;
40 };
41 
42 static void _sharednsitem_clear(struct _sharednsitem *);  // forward
43 
44 static int
_sharednsitem_init(struct _sharednsitem * item,PyObject * key,PyObject * value)45 _sharednsitem_init(struct _sharednsitem *item, PyObject *key, PyObject *value)
46 {
47     item->name = _copy_raw_string(key);
48     if (item->name == NULL) {
49         return -1;
50     }
51     if (_PyObject_GetCrossInterpreterData(value, &item->data) != 0) {
52         _sharednsitem_clear(item);
53         return -1;
54     }
55     return 0;
56 }
57 
58 static void
_sharednsitem_clear(struct _sharednsitem * item)59 _sharednsitem_clear(struct _sharednsitem *item)
60 {
61     if (item->name != NULL) {
62         PyMem_Free(item->name);
63         item->name = NULL;
64     }
65     _PyCrossInterpreterData_Release(&item->data);
66 }
67 
68 static int
_sharednsitem_apply(struct _sharednsitem * item,PyObject * ns)69 _sharednsitem_apply(struct _sharednsitem *item, PyObject *ns)
70 {
71     PyObject *name = PyUnicode_FromString(item->name);
72     if (name == NULL) {
73         return -1;
74     }
75     PyObject *value = _PyCrossInterpreterData_NewObject(&item->data);
76     if (value == NULL) {
77         Py_DECREF(name);
78         return -1;
79     }
80     int res = PyDict_SetItem(ns, name, value);
81     Py_DECREF(name);
82     Py_DECREF(value);
83     return res;
84 }
85 
86 typedef struct _sharedns {
87     Py_ssize_t len;
88     struct _sharednsitem* items;
89 } _sharedns;
90 
91 static _sharedns *
_sharedns_new(Py_ssize_t len)92 _sharedns_new(Py_ssize_t len)
93 {
94     _sharedns *shared = PyMem_NEW(_sharedns, 1);
95     if (shared == NULL) {
96         PyErr_NoMemory();
97         return NULL;
98     }
99     shared->len = len;
100     shared->items = PyMem_NEW(struct _sharednsitem, len);
101     if (shared->items == NULL) {
102         PyErr_NoMemory();
103         PyMem_Free(shared);
104         return NULL;
105     }
106     return shared;
107 }
108 
109 static void
_sharedns_free(_sharedns * shared)110 _sharedns_free(_sharedns *shared)
111 {
112     for (Py_ssize_t i=0; i < shared->len; i++) {
113         _sharednsitem_clear(&shared->items[i]);
114     }
115     PyMem_Free(shared->items);
116     PyMem_Free(shared);
117 }
118 
119 static _sharedns *
_get_shared_ns(PyObject * shareable)120 _get_shared_ns(PyObject *shareable)
121 {
122     if (shareable == NULL || shareable == Py_None) {
123         return NULL;
124     }
125     Py_ssize_t len = PyDict_Size(shareable);
126     if (len == 0) {
127         return NULL;
128     }
129 
130     _sharedns *shared = _sharedns_new(len);
131     if (shared == NULL) {
132         return NULL;
133     }
134     Py_ssize_t pos = 0;
135     for (Py_ssize_t i=0; i < len; i++) {
136         PyObject *key, *value;
137         if (PyDict_Next(shareable, &pos, &key, &value) == 0) {
138             break;
139         }
140         if (_sharednsitem_init(&shared->items[i], key, value) != 0) {
141             break;
142         }
143     }
144     if (PyErr_Occurred()) {
145         _sharedns_free(shared);
146         return NULL;
147     }
148     return shared;
149 }
150 
151 static int
_sharedns_apply(_sharedns * shared,PyObject * ns)152 _sharedns_apply(_sharedns *shared, PyObject *ns)
153 {
154     for (Py_ssize_t i=0; i < shared->len; i++) {
155         if (_sharednsitem_apply(&shared->items[i], ns) != 0) {
156             return -1;
157         }
158     }
159     return 0;
160 }
161 
162 // Ultimately we'd like to preserve enough information about the
163 // exception and traceback that we could re-constitute (or at least
164 // simulate, a la traceback.TracebackException), and even chain, a copy
165 // of the exception in the calling interpreter.
166 
/* A plain-C snapshot of an exception: its type formatted as text and its
   message.  Either field may be NULL. */
typedef struct _sharedexception {
    char *name;  /* formatted exception type, or NULL */
    char *msg;   /* formatted exception value (or a failure note), or NULL */
} _sharedexception;
171 
172 static _sharedexception *
_sharedexception_new(void)173 _sharedexception_new(void)
174 {
175     _sharedexception *err = PyMem_NEW(_sharedexception, 1);
176     if (err == NULL) {
177         PyErr_NoMemory();
178         return NULL;
179     }
180     err->name = NULL;
181     err->msg = NULL;
182     return err;
183 }
184 
185 static void
_sharedexception_clear(_sharedexception * exc)186 _sharedexception_clear(_sharedexception *exc)
187 {
188     if (exc->name != NULL) {
189         PyMem_Free(exc->name);
190     }
191     if (exc->msg != NULL) {
192         PyMem_Free(exc->msg);
193     }
194 }
195 
196 static void
_sharedexception_free(_sharedexception * exc)197 _sharedexception_free(_sharedexception *exc)
198 {
199     _sharedexception_clear(exc);
200     PyMem_Free(exc);
201 }
202 
203 static _sharedexception *
_sharedexception_bind(PyObject * exctype,PyObject * exc,PyObject * tb)204 _sharedexception_bind(PyObject *exctype, PyObject *exc, PyObject *tb)
205 {
206     assert(exctype != NULL);
207     char *failure = NULL;
208 
209     _sharedexception *err = _sharedexception_new();
210     if (err == NULL) {
211         goto finally;
212     }
213 
214     PyObject *name = PyUnicode_FromFormat("%S", exctype);
215     if (name == NULL) {
216         failure = "unable to format exception type name";
217         goto finally;
218     }
219     err->name = _copy_raw_string(name);
220     Py_DECREF(name);
221     if (err->name == NULL) {
222         if (PyErr_ExceptionMatches(PyExc_MemoryError)) {
223             failure = "out of memory copying exception type name";
224         }
225         failure = "unable to encode and copy exception type name";
226         goto finally;
227     }
228 
229     if (exc != NULL) {
230         PyObject *msg = PyUnicode_FromFormat("%S", exc);
231         if (msg == NULL) {
232             failure = "unable to format exception message";
233             goto finally;
234         }
235         err->msg = _copy_raw_string(msg);
236         Py_DECREF(msg);
237         if (err->msg == NULL) {
238             if (PyErr_ExceptionMatches(PyExc_MemoryError)) {
239                 failure = "out of memory copying exception message";
240             }
241             failure = "unable to encode and copy exception message";
242             goto finally;
243         }
244     }
245 
246 finally:
247     if (failure != NULL) {
248         PyErr_Clear();
249         if (err->name != NULL) {
250             PyMem_Free(err->name);
251             err->name = NULL;
252         }
253         err->msg = failure;
254     }
255     return err;
256 }
257 
258 static void
_sharedexception_apply(_sharedexception * exc,PyObject * wrapperclass)259 _sharedexception_apply(_sharedexception *exc, PyObject *wrapperclass)
260 {
261     if (exc->name != NULL) {
262         if (exc->msg != NULL) {
263             PyErr_Format(wrapperclass, "%s: %s",  exc->name, exc->msg);
264         }
265         else {
266             PyErr_SetString(wrapperclass, exc->name);
267         }
268     }
269     else if (exc->msg != NULL) {
270         PyErr_SetString(wrapperclass, exc->msg);
271     }
272     else {
273         PyErr_SetNone(wrapperclass);
274     }
275 }
276 
277 
278 /* channel-specific code ****************************************************/
279 
/* Selects which end(s) of a channel an operation applies to.  Code in this
   file tests the sign: ">= 0" means send or both, "<= 0" means recv or
   both. */
#define CHANNEL_SEND (1)
#define CHANNEL_BOTH (0)
#define CHANNEL_RECV (-1)
283 
284 static PyObject *ChannelError;
285 static PyObject *ChannelNotFoundError;
286 static PyObject *ChannelClosedError;
287 static PyObject *ChannelEmptyError;
288 static PyObject *ChannelNotEmptyError;
289 
290 static int
channel_exceptions_init(PyObject * ns)291 channel_exceptions_init(PyObject *ns)
292 {
293     // XXX Move the exceptions into per-module memory?
294 
295     // A channel-related operation failed.
296     ChannelError = PyErr_NewException("_xxsubinterpreters.ChannelError",
297                                       PyExc_RuntimeError, NULL);
298     if (ChannelError == NULL) {
299         return -1;
300     }
301     if (PyDict_SetItemString(ns, "ChannelError", ChannelError) != 0) {
302         return -1;
303     }
304 
305     // An operation tried to use a channel that doesn't exist.
306     ChannelNotFoundError = PyErr_NewException(
307             "_xxsubinterpreters.ChannelNotFoundError", ChannelError, NULL);
308     if (ChannelNotFoundError == NULL) {
309         return -1;
310     }
311     if (PyDict_SetItemString(ns, "ChannelNotFoundError", ChannelNotFoundError) != 0) {
312         return -1;
313     }
314 
315     // An operation tried to use a closed channel.
316     ChannelClosedError = PyErr_NewException(
317             "_xxsubinterpreters.ChannelClosedError", ChannelError, NULL);
318     if (ChannelClosedError == NULL) {
319         return -1;
320     }
321     if (PyDict_SetItemString(ns, "ChannelClosedError", ChannelClosedError) != 0) {
322         return -1;
323     }
324 
325     // An operation tried to pop from an empty channel.
326     ChannelEmptyError = PyErr_NewException(
327             "_xxsubinterpreters.ChannelEmptyError", ChannelError, NULL);
328     if (ChannelEmptyError == NULL) {
329         return -1;
330     }
331     if (PyDict_SetItemString(ns, "ChannelEmptyError", ChannelEmptyError) != 0) {
332         return -1;
333     }
334 
335     // An operation tried to close a non-empty channel.
336     ChannelNotEmptyError = PyErr_NewException(
337             "_xxsubinterpreters.ChannelNotEmptyError", ChannelError, NULL);
338     if (ChannelNotEmptyError == NULL) {
339         return -1;
340     }
341     if (PyDict_SetItemString(ns, "ChannelNotEmptyError", ChannelNotEmptyError) != 0) {
342         return -1;
343     }
344 
345     return 0;
346 }
347 
348 /* the channel queue */
349 
350 struct _channelitem;
351 
352 typedef struct _channelitem {
353     _PyCrossInterpreterData *data;
354     struct _channelitem *next;
355 } _channelitem;
356 
357 static _channelitem *
_channelitem_new(void)358 _channelitem_new(void)
359 {
360     _channelitem *item = PyMem_NEW(_channelitem, 1);
361     if (item == NULL) {
362         PyErr_NoMemory();
363         return NULL;
364     }
365     item->data = NULL;
366     item->next = NULL;
367     return item;
368 }
369 
370 static void
_channelitem_clear(_channelitem * item)371 _channelitem_clear(_channelitem *item)
372 {
373     if (item->data != NULL) {
374         _PyCrossInterpreterData_Release(item->data);
375         PyMem_Free(item->data);
376         item->data = NULL;
377     }
378     item->next = NULL;
379 }
380 
381 static void
_channelitem_free(_channelitem * item)382 _channelitem_free(_channelitem *item)
383 {
384     _channelitem_clear(item);
385     PyMem_Free(item);
386 }
387 
388 static void
_channelitem_free_all(_channelitem * item)389 _channelitem_free_all(_channelitem *item)
390 {
391     while (item != NULL) {
392         _channelitem *last = item;
393         item = item->next;
394         _channelitem_free(last);
395     }
396 }
397 
398 static _PyCrossInterpreterData *
_channelitem_popped(_channelitem * item)399 _channelitem_popped(_channelitem *item)
400 {
401     _PyCrossInterpreterData *data = item->data;
402     item->data = NULL;
403     _channelitem_free(item);
404     return data;
405 }
406 
407 typedef struct _channelqueue {
408     int64_t count;
409     _channelitem *first;
410     _channelitem *last;
411 } _channelqueue;
412 
413 static _channelqueue *
_channelqueue_new(void)414 _channelqueue_new(void)
415 {
416     _channelqueue *queue = PyMem_NEW(_channelqueue, 1);
417     if (queue == NULL) {
418         PyErr_NoMemory();
419         return NULL;
420     }
421     queue->count = 0;
422     queue->first = NULL;
423     queue->last = NULL;
424     return queue;
425 }
426 
427 static void
_channelqueue_clear(_channelqueue * queue)428 _channelqueue_clear(_channelqueue *queue)
429 {
430     _channelitem_free_all(queue->first);
431     queue->count = 0;
432     queue->first = NULL;
433     queue->last = NULL;
434 }
435 
436 static void
_channelqueue_free(_channelqueue * queue)437 _channelqueue_free(_channelqueue *queue)
438 {
439     _channelqueue_clear(queue);
440     PyMem_Free(queue);
441 }
442 
443 static int
_channelqueue_put(_channelqueue * queue,_PyCrossInterpreterData * data)444 _channelqueue_put(_channelqueue *queue, _PyCrossInterpreterData *data)
445 {
446     _channelitem *item = _channelitem_new();
447     if (item == NULL) {
448         return -1;
449     }
450     item->data = data;
451 
452     queue->count += 1;
453     if (queue->first == NULL) {
454         queue->first = item;
455     }
456     else {
457         queue->last->next = item;
458     }
459     queue->last = item;
460     return 0;
461 }
462 
463 static _PyCrossInterpreterData *
_channelqueue_get(_channelqueue * queue)464 _channelqueue_get(_channelqueue *queue)
465 {
466     _channelitem *item = queue->first;
467     if (item == NULL) {
468         return NULL;
469     }
470     queue->first = item->next;
471     if (queue->last == item) {
472         queue->last = NULL;
473     }
474     queue->count -= 1;
475 
476     return _channelitem_popped(item);
477 }
478 
479 /* channel-interpreter associations */
480 
struct _channelend;

/* Records that one interpreter is associated with one end (send or recv)
   of a channel.  Entries form a singly linked list. */
typedef struct _channelend {
    struct _channelend *next;  /* following entry, or NULL */
    int64_t interp;            /* ID of the associated interpreter */
    int open;                  /* 1 until this end is closed, then 0 */
} _channelend;
488 
489 static _channelend *
_channelend_new(int64_t interp)490 _channelend_new(int64_t interp)
491 {
492     _channelend *end = PyMem_NEW(_channelend, 1);
493     if (end == NULL) {
494         PyErr_NoMemory();
495         return NULL;
496     }
497     end->next = NULL;
498     end->interp = interp;
499     end->open = 1;
500     return end;
501 }
502 
503 static void
_channelend_free(_channelend * end)504 _channelend_free(_channelend *end)
505 {
506     PyMem_Free(end);
507 }
508 
509 static void
_channelend_free_all(_channelend * end)510 _channelend_free_all(_channelend *end)
511 {
512     while (end != NULL) {
513         _channelend *last = end;
514         end = end->next;
515         _channelend_free(last);
516     }
517 }
518 
519 static _channelend *
_channelend_find(_channelend * first,int64_t interp,_channelend ** pprev)520 _channelend_find(_channelend *first, int64_t interp, _channelend **pprev)
521 {
522     _channelend *prev = NULL;
523     _channelend *end = first;
524     while (end != NULL) {
525         if (end->interp == interp) {
526             break;
527         }
528         prev = end;
529         end = end->next;
530     }
531     if (pprev != NULL) {
532         *pprev = prev;
533     }
534     return end;
535 }
536 
537 typedef struct _channelassociations {
538     // Note that the list entries are never removed for interpreter
539     // for which the channel is closed.  This should be a problem in
540     // practice.  Also, a channel isn't automatically closed when an
541     // interpreter is destroyed.
542     int64_t numsendopen;
543     int64_t numrecvopen;
544     _channelend *send;
545     _channelend *recv;
546 } _channelends;
547 
548 static _channelends *
_channelends_new(void)549 _channelends_new(void)
550 {
551     _channelends *ends = PyMem_NEW(_channelends, 1);
552     if (ends== NULL) {
553         return NULL;
554     }
555     ends->numsendopen = 0;
556     ends->numrecvopen = 0;
557     ends->send = NULL;
558     ends->recv = NULL;
559     return ends;
560 }
561 
562 static void
_channelends_clear(_channelends * ends)563 _channelends_clear(_channelends *ends)
564 {
565     _channelend_free_all(ends->send);
566     ends->send = NULL;
567     ends->numsendopen = 0;
568 
569     _channelend_free_all(ends->recv);
570     ends->recv = NULL;
571     ends->numrecvopen = 0;
572 }
573 
574 static void
_channelends_free(_channelends * ends)575 _channelends_free(_channelends *ends)
576 {
577     _channelends_clear(ends);
578     PyMem_Free(ends);
579 }
580 
581 static _channelend *
_channelends_add(_channelends * ends,_channelend * prev,int64_t interp,int send)582 _channelends_add(_channelends *ends, _channelend *prev, int64_t interp,
583                  int send)
584 {
585     _channelend *end = _channelend_new(interp);
586     if (end == NULL) {
587         return NULL;
588     }
589 
590     if (prev == NULL) {
591         if (send) {
592             ends->send = end;
593         }
594         else {
595             ends->recv = end;
596         }
597     }
598     else {
599         prev->next = end;
600     }
601     if (send) {
602         ends->numsendopen += 1;
603     }
604     else {
605         ends->numrecvopen += 1;
606     }
607     return end;
608 }
609 
610 static int
_channelends_associate(_channelends * ends,int64_t interp,int send)611 _channelends_associate(_channelends *ends, int64_t interp, int send)
612 {
613     _channelend *prev;
614     _channelend *end = _channelend_find(send ? ends->send : ends->recv,
615                                         interp, &prev);
616     if (end != NULL) {
617         if (!end->open) {
618             PyErr_SetString(ChannelClosedError, "channel already closed");
619             return -1;
620         }
621         // already associated
622         return 0;
623     }
624     if (_channelends_add(ends, prev, interp, send) == NULL) {
625         return -1;
626     }
627     return 0;
628 }
629 
630 static int
_channelends_is_open(_channelends * ends)631 _channelends_is_open(_channelends *ends)
632 {
633     if (ends->numsendopen != 0 || ends->numrecvopen != 0) {
634         return 1;
635     }
636     if (ends->send == NULL && ends->recv == NULL) {
637         return 1;
638     }
639     return 0;
640 }
641 
642 static void
_channelends_close_end(_channelends * ends,_channelend * end,int send)643 _channelends_close_end(_channelends *ends, _channelend *end, int send)
644 {
645     end->open = 0;
646     if (send) {
647         ends->numsendopen -= 1;
648     }
649     else {
650         ends->numrecvopen -= 1;
651     }
652 }
653 
654 static int
_channelends_close_interpreter(_channelends * ends,int64_t interp,int which)655 _channelends_close_interpreter(_channelends *ends, int64_t interp, int which)
656 {
657     _channelend *prev;
658     _channelend *end;
659     if (which >= 0) {  // send/both
660         end = _channelend_find(ends->send, interp, &prev);
661         if (end == NULL) {
662             // never associated so add it
663             end = _channelends_add(ends, prev, interp, 1);
664             if (end == NULL) {
665                 return -1;
666             }
667         }
668         _channelends_close_end(ends, end, 1);
669     }
670     if (which <= 0) {  // recv/both
671         end = _channelend_find(ends->recv, interp, &prev);
672         if (end == NULL) {
673             // never associated so add it
674             end = _channelends_add(ends, prev, interp, 0);
675             if (end == NULL) {
676                 return -1;
677             }
678         }
679         _channelends_close_end(ends, end, 0);
680     }
681     return 0;
682 }
683 
684 static void
_channelends_close_all(_channelends * ends,int which,int force)685 _channelends_close_all(_channelends *ends, int which, int force)
686 {
687     // XXX Handle the ends.
688     // XXX Handle force is True.
689 
690     // Ensure all the "send"-associated interpreters are closed.
691     _channelend *end;
692     for (end = ends->send; end != NULL; end = end->next) {
693         _channelends_close_end(ends, end, 1);
694     }
695 
696     // Ensure all the "recv"-associated interpreters are closed.
697     for (end = ends->recv; end != NULL; end = end->next) {
698         _channelends_close_end(ends, end, 0);
699     }
700 }
701 
702 /* channels */
703 
704 struct _channel;
705 struct _channel_closing;
706 static void _channel_clear_closing(struct _channel *);
707 static void _channel_finish_closing(struct _channel *);
708 
709 typedef struct _channel {
710     PyThread_type_lock mutex;
711     _channelqueue *queue;
712     _channelends *ends;
713     int open;
714     struct _channel_closing *closing;
715 } _PyChannelState;
716 
717 static _PyChannelState *
_channel_new(void)718 _channel_new(void)
719 {
720     _PyChannelState *chan = PyMem_NEW(_PyChannelState, 1);
721     if (chan == NULL) {
722         return NULL;
723     }
724     chan->mutex = PyThread_allocate_lock();
725     if (chan->mutex == NULL) {
726         PyMem_Free(chan);
727         PyErr_SetString(ChannelError,
728                         "can't initialize mutex for new channel");
729         return NULL;
730     }
731     chan->queue = _channelqueue_new();
732     if (chan->queue == NULL) {
733         PyMem_Free(chan);
734         return NULL;
735     }
736     chan->ends = _channelends_new();
737     if (chan->ends == NULL) {
738         _channelqueue_free(chan->queue);
739         PyMem_Free(chan);
740         return NULL;
741     }
742     chan->open = 1;
743     chan->closing = NULL;
744     return chan;
745 }
746 
747 static void
_channel_free(_PyChannelState * chan)748 _channel_free(_PyChannelState *chan)
749 {
750     _channel_clear_closing(chan);
751     PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
752     _channelqueue_free(chan->queue);
753     _channelends_free(chan->ends);
754     PyThread_release_lock(chan->mutex);
755 
756     PyThread_free_lock(chan->mutex);
757     PyMem_Free(chan);
758 }
759 
760 static int
_channel_add(_PyChannelState * chan,int64_t interp,_PyCrossInterpreterData * data)761 _channel_add(_PyChannelState *chan, int64_t interp,
762              _PyCrossInterpreterData *data)
763 {
764     int res = -1;
765     PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
766 
767     if (!chan->open) {
768         PyErr_SetString(ChannelClosedError, "channel closed");
769         goto done;
770     }
771     if (_channelends_associate(chan->ends, interp, 1) != 0) {
772         goto done;
773     }
774 
775     if (_channelqueue_put(chan->queue, data) != 0) {
776         goto done;
777     }
778 
779     res = 0;
780 done:
781     PyThread_release_lock(chan->mutex);
782     return res;
783 }
784 
785 static _PyCrossInterpreterData *
_channel_next(_PyChannelState * chan,int64_t interp)786 _channel_next(_PyChannelState *chan, int64_t interp)
787 {
788     _PyCrossInterpreterData *data = NULL;
789     PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
790 
791     if (!chan->open) {
792         PyErr_SetString(ChannelClosedError, "channel closed");
793         goto done;
794     }
795     if (_channelends_associate(chan->ends, interp, 0) != 0) {
796         goto done;
797     }
798 
799     data = _channelqueue_get(chan->queue);
800     if (data == NULL && !PyErr_Occurred() && chan->closing != NULL) {
801         chan->open = 0;
802     }
803 
804 done:
805     PyThread_release_lock(chan->mutex);
806     if (chan->queue->count == 0) {
807         _channel_finish_closing(chan);
808     }
809     return data;
810 }
811 
812 static int
_channel_close_interpreter(_PyChannelState * chan,int64_t interp,int end)813 _channel_close_interpreter(_PyChannelState *chan, int64_t interp, int end)
814 {
815     PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
816 
817     int res = -1;
818     if (!chan->open) {
819         PyErr_SetString(ChannelClosedError, "channel already closed");
820         goto done;
821     }
822 
823     if (_channelends_close_interpreter(chan->ends, interp, end) != 0) {
824         goto done;
825     }
826     chan->open = _channelends_is_open(chan->ends);
827 
828     res = 0;
829 done:
830     PyThread_release_lock(chan->mutex);
831     return res;
832 }
833 
834 static int
_channel_close_all(_PyChannelState * chan,int end,int force)835 _channel_close_all(_PyChannelState *chan, int end, int force)
836 {
837     int res = -1;
838     PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
839 
840     if (!chan->open) {
841         PyErr_SetString(ChannelClosedError, "channel already closed");
842         goto done;
843     }
844 
845     if (!force && chan->queue->count > 0) {
846         PyErr_SetString(ChannelNotEmptyError,
847                         "may not be closed if not empty (try force=True)");
848         goto done;
849     }
850 
851     chan->open = 0;
852 
853     // We *could* also just leave these in place, since we've marked
854     // the channel as closed already.
855     _channelends_close_all(chan->ends, end, force);
856 
857     res = 0;
858 done:
859     PyThread_release_lock(chan->mutex);
860     return res;
861 }
862 
863 /* the set of channels */
864 
865 struct _channelref;
866 
867 typedef struct _channelref {
868     int64_t id;
869     _PyChannelState *chan;
870     struct _channelref *next;
871     Py_ssize_t objcount;
872 } _channelref;
873 
874 static _channelref *
_channelref_new(int64_t id,_PyChannelState * chan)875 _channelref_new(int64_t id, _PyChannelState *chan)
876 {
877     _channelref *ref = PyMem_NEW(_channelref, 1);
878     if (ref == NULL) {
879         return NULL;
880     }
881     ref->id = id;
882     ref->chan = chan;
883     ref->next = NULL;
884     ref->objcount = 0;
885     return ref;
886 }
887 
888 //static void
889 //_channelref_clear(_channelref *ref)
890 //{
891 //    ref->id = -1;
892 //    ref->chan = NULL;
893 //    ref->next = NULL;
894 //    ref->objcount = 0;
895 //}
896 
897 static void
_channelref_free(_channelref * ref)898 _channelref_free(_channelref *ref)
899 {
900     if (ref->chan != NULL) {
901         _channel_clear_closing(ref->chan);
902     }
903     //_channelref_clear(ref);
904     PyMem_Free(ref);
905 }
906 
907 static _channelref *
_channelref_find(_channelref * first,int64_t id,_channelref ** pprev)908 _channelref_find(_channelref *first, int64_t id, _channelref **pprev)
909 {
910     _channelref *prev = NULL;
911     _channelref *ref = first;
912     while (ref != NULL) {
913         if (ref->id == id) {
914             break;
915         }
916         prev = ref;
917         ref = ref->next;
918     }
919     if (pprev != NULL) {
920         *pprev = prev;
921     }
922     return ref;
923 }
924 
925 typedef struct _channels {
926     PyThread_type_lock mutex;
927     _channelref *head;
928     int64_t numopen;
929     int64_t next_id;
930 } _channels;
931 
932 static int
_channels_init(_channels * channels)933 _channels_init(_channels *channels)
934 {
935     if (channels->mutex == NULL) {
936         channels->mutex = PyThread_allocate_lock();
937         if (channels->mutex == NULL) {
938             PyErr_SetString(ChannelError,
939                             "can't initialize mutex for channel management");
940             return -1;
941         }
942     }
943     channels->head = NULL;
944     channels->numopen = 0;
945     channels->next_id = 0;
946     return 0;
947 }
948 
949 static int64_t
_channels_next_id(_channels * channels)950 _channels_next_id(_channels *channels)  // needs lock
951 {
952     int64_t id = channels->next_id;
953     if (id < 0) {
954         /* overflow */
955         PyErr_SetString(ChannelError,
956                         "failed to get a channel ID");
957         return -1;
958     }
959     channels->next_id += 1;
960     return id;
961 }
962 
963 static _PyChannelState *
_channels_lookup(_channels * channels,int64_t id,PyThread_type_lock * pmutex)964 _channels_lookup(_channels *channels, int64_t id, PyThread_type_lock *pmutex)
965 {
966     _PyChannelState *chan = NULL;
967     PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
968     if (pmutex != NULL) {
969         *pmutex = NULL;
970     }
971 
972     _channelref *ref = _channelref_find(channels->head, id, NULL);
973     if (ref == NULL) {
974         PyErr_Format(ChannelNotFoundError, "channel %" PRId64 " not found", id);
975         goto done;
976     }
977     if (ref->chan == NULL || !ref->chan->open) {
978         PyErr_Format(ChannelClosedError, "channel %" PRId64 " closed", id);
979         goto done;
980     }
981 
982     if (pmutex != NULL) {
983         // The mutex will be closed by the caller.
984         *pmutex = channels->mutex;
985     }
986 
987     chan = ref->chan;
988 done:
989     if (pmutex == NULL || *pmutex == NULL) {
990         PyThread_release_lock(channels->mutex);
991     }
992     return chan;
993 }
994 
995 static int64_t
_channels_add(_channels * channels,_PyChannelState * chan)996 _channels_add(_channels *channels, _PyChannelState *chan)
997 {
998     int64_t cid = -1;
999     PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
1000 
1001     // Create a new ref.
1002     int64_t id = _channels_next_id(channels);
1003     if (id < 0) {
1004         goto done;
1005     }
1006     _channelref *ref = _channelref_new(id, chan);
1007     if (ref == NULL) {
1008         goto done;
1009     }
1010 
1011     // Add it to the list.
1012     // We assume that the channel is a new one (not already in the list).
1013     ref->next = channels->head;
1014     channels->head = ref;
1015     channels->numopen += 1;
1016 
1017     cid = id;
1018 done:
1019     PyThread_release_lock(channels->mutex);
1020     return cid;
1021 }
1022 
1023 /* forward */
1024 static int _channel_set_closing(struct _channelref *, PyThread_type_lock);
1025 
/* Close the channel with the given ID for the given end.

   All of the work is done while holding channels->mutex.  Returns 0 on
   success and -1 on failure.

   If pchan is not NULL then *pchan is reset to NULL up front.  When the
   channel is closed right away, the channel state is handed over through
   *pchan instead of being freed here (with pchan == NULL it is freed
   here).  When the close has to be deferred (a send-end close that hit
   ChannelNotEmptyError), *pchan is set to the channel state, which stays
   attached to its ref.

   Failure cases visible in this function:
   - there is no ref for cid: ChannelNotFoundError
   - the ref no longer has a channel state, or a close is already pending
     for a non-forced send-end close: ChannelClosedError
   - _channel_close_all() failed for some other reason: its error is left
     in place
   - _channel_set_closing() failed */
static int
_channels_close(_channels *channels, int64_t cid, _PyChannelState **pchan,
                int end, int force)
{
    int res = -1;
    PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
    if (pchan != NULL) {
        *pchan = NULL;
    }

    _channelref *ref = _channelref_find(channels->head, cid, NULL);
    if (ref == NULL) {
        PyErr_Format(ChannelNotFoundError, "channel %" PRId64 " not found", cid);
        goto done;
    }

    if (ref->chan == NULL) {
        // The ref is still registered but its channel state is gone.
        PyErr_Format(ChannelClosedError, "channel %" PRId64 " closed", cid);
        goto done;
    }
    else if (!force && end == CHANNEL_SEND && ref->chan->closing != NULL) {
        // A deferred close is already pending for this channel.
        PyErr_Format(ChannelClosedError, "channel %" PRId64 " closed", cid);
        goto done;
    }
    else {
        if (_channel_close_all(ref->chan, end, force) != 0) {
            // Only a "not empty" failure on the send end is recoverable;
            // anything else falls through to "done" with res == -1.
            if (end == CHANNEL_SEND &&
                    PyErr_ExceptionMatches(ChannelNotEmptyError)) {
                if (ref->chan->closing != NULL) {
                    PyErr_Format(ChannelClosedError,
                                 "channel %" PRId64 " closed", cid);
                    goto done;
                }
                // Mark the channel as closing and return.  The channel
                // will be cleaned up in _channel_next().
                PyErr_Clear();
                if (_channel_set_closing(ref, channels->mutex) != 0) {
                    goto done;
                }
                if (pchan != NULL) {
                    *pchan = ref->chan;
                }
                res = 0;
            }
            goto done;
        }
        // Closed right away: either hand the state to the caller or
        // free it here, then detach it from the ref.
        if (pchan != NULL) {
            *pchan = ref->chan;
        }
        else  {
            _channel_free(ref->chan);
        }
        ref->chan = NULL;
    }

    res = 0;
done:
    PyThread_release_lock(channels->mutex);
    return res;
}
1086 
1087 static void
_channels_remove_ref(_channels * channels,_channelref * ref,_channelref * prev,_PyChannelState ** pchan)1088 _channels_remove_ref(_channels *channels, _channelref *ref, _channelref *prev,
1089                      _PyChannelState **pchan)
1090 {
1091     if (ref == channels->head) {
1092         channels->head = ref->next;
1093     }
1094     else {
1095         prev->next = ref->next;
1096     }
1097     channels->numopen -= 1;
1098 
1099     if (pchan != NULL) {
1100         *pchan = ref->chan;
1101     }
1102     _channelref_free(ref);
1103 }
1104 
1105 static int
_channels_remove(_channels * channels,int64_t id,_PyChannelState ** pchan)1106 _channels_remove(_channels *channels, int64_t id, _PyChannelState **pchan)
1107 {
1108     int res = -1;
1109     PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
1110 
1111     if (pchan != NULL) {
1112         *pchan = NULL;
1113     }
1114 
1115     _channelref *prev = NULL;
1116     _channelref *ref = _channelref_find(channels->head, id, &prev);
1117     if (ref == NULL) {
1118         PyErr_Format(ChannelNotFoundError, "channel %" PRId64 " not found", id);
1119         goto done;
1120     }
1121 
1122     _channels_remove_ref(channels, ref, prev, pchan);
1123 
1124     res = 0;
1125 done:
1126     PyThread_release_lock(channels->mutex);
1127     return res;
1128 }
1129 
1130 static int
_channels_add_id_object(_channels * channels,int64_t id)1131 _channels_add_id_object(_channels *channels, int64_t id)
1132 {
1133     int res = -1;
1134     PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
1135 
1136     _channelref *ref = _channelref_find(channels->head, id, NULL);
1137     if (ref == NULL) {
1138         PyErr_Format(ChannelNotFoundError, "channel %" PRId64 " not found", id);
1139         goto done;
1140     }
1141     ref->objcount += 1;
1142 
1143     res = 0;
1144 done:
1145     PyThread_release_lock(channels->mutex);
1146     return res;
1147 }
1148 
1149 static void
_channels_drop_id_object(_channels * channels,int64_t id)1150 _channels_drop_id_object(_channels *channels, int64_t id)
1151 {
1152     PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
1153 
1154     _channelref *prev = NULL;
1155     _channelref *ref = _channelref_find(channels->head, id, &prev);
1156     if (ref == NULL) {
1157         // Already destroyed.
1158         goto done;
1159     }
1160     ref->objcount -= 1;
1161 
1162     // Destroy if no longer used.
1163     if (ref->objcount == 0) {
1164         _PyChannelState *chan = NULL;
1165         _channels_remove_ref(channels, ref, prev, &chan);
1166         if (chan != NULL) {
1167             _channel_free(chan);
1168         }
1169     }
1170 
1171 done:
1172     PyThread_release_lock(channels->mutex);
1173 }
1174 
1175 static int64_t *
_channels_list_all(_channels * channels,int64_t * count)1176 _channels_list_all(_channels *channels, int64_t *count)
1177 {
1178     int64_t *cids = NULL;
1179     PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
1180     int64_t numopen = channels->numopen;
1181     if (numopen >= PY_SSIZE_T_MAX) {
1182         PyErr_SetString(PyExc_RuntimeError, "too many channels open");
1183         goto done;
1184     }
1185     int64_t *ids = PyMem_NEW(int64_t, (Py_ssize_t)(channels->numopen));
1186     if (ids == NULL) {
1187         goto done;
1188     }
1189     _channelref *ref = channels->head;
1190     for (int64_t i=0; ref != NULL; ref = ref->next, i++) {
1191         ids[i] = ref->id;
1192     }
1193     *count = channels->numopen;
1194 
1195     cids = ids;
1196 done:
1197     PyThread_release_lock(channels->mutex);
1198     return cids;
1199 }
1200 
1201 /* support for closing non-empty channels */
1202 
/* State attached to a channel (as chan->closing) by _channel_set_closing()
   while a close is pending; released by _channel_clear_closing(). */
struct _channel_closing {
    // The ref that owns the channel; _channel_finish_closing() clears
    // its "chan" field once the close is completed.
    struct _channelref *ref;
};
1206 
1207 static int
_channel_set_closing(struct _channelref * ref,PyThread_type_lock mutex)1208 _channel_set_closing(struct _channelref *ref, PyThread_type_lock mutex) {
1209     struct _channel *chan = ref->chan;
1210     if (chan == NULL) {
1211         // already closed
1212         return 0;
1213     }
1214     int res = -1;
1215     PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
1216     if (chan->closing != NULL) {
1217         PyErr_SetString(ChannelClosedError, "channel closed");
1218         goto done;
1219     }
1220     chan->closing = PyMem_NEW(struct _channel_closing, 1);
1221     if (chan->closing == NULL) {
1222         goto done;
1223     }
1224     chan->closing->ref = ref;
1225 
1226     res = 0;
1227 done:
1228     PyThread_release_lock(chan->mutex);
1229     return res;
1230 }
1231 
1232 static void
_channel_clear_closing(struct _channel * chan)1233 _channel_clear_closing(struct _channel *chan) {
1234     PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
1235     if (chan->closing != NULL) {
1236         PyMem_Free(chan->closing);
1237         chan->closing = NULL;
1238     }
1239     PyThread_release_lock(chan->mutex);
1240 }
1241 
1242 static void
_channel_finish_closing(struct _channel * chan)1243 _channel_finish_closing(struct _channel *chan) {
1244     struct _channel_closing *closing = chan->closing;
1245     if (closing == NULL) {
1246         return;
1247     }
1248     _channelref *ref = closing->ref;
1249     _channel_clear_closing(chan);
1250     // Do the things that would have been done in _channels_close().
1251     ref->chan = NULL;
1252     _channel_free(chan);
1253 }
1254 
1255 /* "high"-level channel-related functions */
1256 
1257 static int64_t
_channel_create(_channels * channels)1258 _channel_create(_channels *channels)
1259 {
1260     _PyChannelState *chan = _channel_new();
1261     if (chan == NULL) {
1262         return -1;
1263     }
1264     int64_t id = _channels_add(channels, chan);
1265     if (id < 0) {
1266         _channel_free(chan);
1267         return -1;
1268     }
1269     return id;
1270 }
1271 
1272 static int
_channel_destroy(_channels * channels,int64_t id)1273 _channel_destroy(_channels *channels, int64_t id)
1274 {
1275     _PyChannelState *chan = NULL;
1276     if (_channels_remove(channels, id, &chan) != 0) {
1277         return -1;
1278     }
1279     if (chan != NULL) {
1280         _channel_free(chan);
1281     }
1282     return 0;
1283 }
1284 
1285 static int
_channel_send(_channels * channels,int64_t id,PyObject * obj)1286 _channel_send(_channels *channels, int64_t id, PyObject *obj)
1287 {
1288     PyInterpreterState *interp = _get_current();
1289     if (interp == NULL) {
1290         return -1;
1291     }
1292 
1293     // Look up the channel.
1294     PyThread_type_lock mutex = NULL;
1295     _PyChannelState *chan = _channels_lookup(channels, id, &mutex);
1296     if (chan == NULL) {
1297         return -1;
1298     }
1299     // Past this point we are responsible for releasing the mutex.
1300 
1301     if (chan->closing != NULL) {
1302         PyErr_Format(ChannelClosedError, "channel %" PRId64 " closed", id);
1303         PyThread_release_lock(mutex);
1304         return -1;
1305     }
1306 
1307     // Convert the object to cross-interpreter data.
1308     _PyCrossInterpreterData *data = PyMem_NEW(_PyCrossInterpreterData, 1);
1309     if (data == NULL) {
1310         PyThread_release_lock(mutex);
1311         return -1;
1312     }
1313     if (_PyObject_GetCrossInterpreterData(obj, data) != 0) {
1314         PyThread_release_lock(mutex);
1315         PyMem_Free(data);
1316         return -1;
1317     }
1318 
1319     // Add the data to the channel.
1320     int res = _channel_add(chan, PyInterpreterState_GetID(interp), data);
1321     PyThread_release_lock(mutex);
1322     if (res != 0) {
1323         _PyCrossInterpreterData_Release(data);
1324         PyMem_Free(data);
1325         return -1;
1326     }
1327 
1328     return 0;
1329 }
1330 
1331 static PyObject *
_channel_recv(_channels * channels,int64_t id)1332 _channel_recv(_channels *channels, int64_t id)
1333 {
1334     PyInterpreterState *interp = _get_current();
1335     if (interp == NULL) {
1336         return NULL;
1337     }
1338 
1339     // Look up the channel.
1340     PyThread_type_lock mutex = NULL;
1341     _PyChannelState *chan = _channels_lookup(channels, id, &mutex);
1342     if (chan == NULL) {
1343         return NULL;
1344     }
1345     // Past this point we are responsible for releasing the mutex.
1346 
1347     // Pop off the next item from the channel.
1348     _PyCrossInterpreterData *data = _channel_next(chan, PyInterpreterState_GetID(interp));
1349     PyThread_release_lock(mutex);
1350     if (data == NULL) {
1351         if (!PyErr_Occurred()) {
1352             PyErr_Format(ChannelEmptyError, "channel %" PRId64 " is empty", id);
1353         }
1354         return NULL;
1355     }
1356 
1357     // Convert the data back to an object.
1358     PyObject *obj = _PyCrossInterpreterData_NewObject(data);
1359     if (obj == NULL) {
1360         return NULL;
1361     }
1362     _PyCrossInterpreterData_Release(data);
1363     PyMem_Free(data);
1364 
1365     return obj;
1366 }
1367 
1368 static int
_channel_drop(_channels * channels,int64_t id,int send,int recv)1369 _channel_drop(_channels *channels, int64_t id, int send, int recv)
1370 {
1371     PyInterpreterState *interp = _get_current();
1372     if (interp == NULL) {
1373         return -1;
1374     }
1375 
1376     // Look up the channel.
1377     PyThread_type_lock mutex = NULL;
1378     _PyChannelState *chan = _channels_lookup(channels, id, &mutex);
1379     if (chan == NULL) {
1380         return -1;
1381     }
1382     // Past this point we are responsible for releasing the mutex.
1383 
1384     // Close one or both of the two ends.
1385     int res = _channel_close_interpreter(chan, PyInterpreterState_GetID(interp), send-recv);
1386     PyThread_release_lock(mutex);
1387     return res;
1388 }
1389 
1390 static int
_channel_close(_channels * channels,int64_t id,int end,int force)1391 _channel_close(_channels *channels, int64_t id, int end, int force)
1392 {
1393     return _channels_close(channels, id, NULL, end, force);
1394 }
1395 
1396 /* ChannelID class */
1397 
// Forward declaration; the type object is defined after its slot functions.
static PyTypeObject ChannelIDtype;

/* Instance layout of ChannelID objects. */
typedef struct channelid {
    PyObject_HEAD
    int64_t id;           // the ID of the channel this object refers to
    int end;              // CHANNEL_SEND, CHANNEL_RECV, or 0 for both ends
    int resolve;          // copied into the cross-interpreter data; see
                          // _channelid_from_xid() for how it is used
    _channels *channels;  // the list in which this object is counted
} channelid;
1407 
1408 static int
channel_id_converter(PyObject * arg,void * ptr)1409 channel_id_converter(PyObject *arg, void *ptr)
1410 {
1411     int64_t cid;
1412     if (PyObject_TypeCheck(arg, &ChannelIDtype)) {
1413         cid = ((channelid *)arg)->id;
1414     }
1415     else if (PyIndex_Check(arg)) {
1416         cid = PyLong_AsLongLong(arg);
1417         if (cid == -1 && PyErr_Occurred()) {
1418             return 0;
1419         }
1420         if (cid < 0) {
1421             PyErr_Format(PyExc_ValueError,
1422                         "channel ID must be a non-negative int, got %R", arg);
1423             return 0;
1424         }
1425     }
1426     else {
1427         PyErr_Format(PyExc_TypeError,
1428                      "channel ID must be an int, got %.100s",
1429                      arg->ob_type->tp_name);
1430         return 0;
1431     }
1432     *(int64_t *)ptr = cid;
1433     return 1;
1434 }
1435 
1436 static channelid *
newchannelid(PyTypeObject * cls,int64_t cid,int end,_channels * channels,int force,int resolve)1437 newchannelid(PyTypeObject *cls, int64_t cid, int end, _channels *channels,
1438              int force, int resolve)
1439 {
1440     channelid *self = PyObject_New(channelid, cls);
1441     if (self == NULL) {
1442         return NULL;
1443     }
1444     self->id = cid;
1445     self->end = end;
1446     self->resolve = resolve;
1447     self->channels = channels;
1448 
1449     if (_channels_add_id_object(channels, cid) != 0) {
1450         if (force && PyErr_ExceptionMatches(ChannelNotFoundError)) {
1451             PyErr_Clear();
1452         }
1453         else {
1454             Py_DECREF((PyObject *)self);
1455             return NULL;
1456         }
1457     }
1458 
1459     return self;
1460 }
1461 
// Forward declaration; defined with the module-level globals further down.
static _channels * _global_channels(void);
1463 
1464 static PyObject *
channelid_new(PyTypeObject * cls,PyObject * args,PyObject * kwds)1465 channelid_new(PyTypeObject *cls, PyObject *args, PyObject *kwds)
1466 {
1467     static char *kwlist[] = {"id", "send", "recv", "force", "_resolve", NULL};
1468     int64_t cid;
1469     int send = -1;
1470     int recv = -1;
1471     int force = 0;
1472     int resolve = 0;
1473     if (!PyArg_ParseTupleAndKeywords(args, kwds,
1474                                      "O&|$pppp:ChannelID.__new__", kwlist,
1475                                      channel_id_converter, &cid, &send, &recv, &force, &resolve))
1476         return NULL;
1477 
1478     // Handle "send" and "recv".
1479     if (send == 0 && recv == 0) {
1480         PyErr_SetString(PyExc_ValueError,
1481                         "'send' and 'recv' cannot both be False");
1482         return NULL;
1483     }
1484 
1485     int end = 0;
1486     if (send == 1) {
1487         if (recv == 0 || recv == -1) {
1488             end = CHANNEL_SEND;
1489         }
1490     }
1491     else if (recv == 1) {
1492         end = CHANNEL_RECV;
1493     }
1494 
1495     return (PyObject *)newchannelid(cls, cid, end, _global_channels(),
1496                                     force, resolve);
1497 }
1498 
1499 static void
channelid_dealloc(PyObject * v)1500 channelid_dealloc(PyObject *v)
1501 {
1502     int64_t cid = ((channelid *)v)->id;
1503     _channels *channels = ((channelid *)v)->channels;
1504     Py_TYPE(v)->tp_free(v);
1505 
1506     _channels_drop_id_object(channels, cid);
1507 }
1508 
1509 static PyObject *
channelid_repr(PyObject * self)1510 channelid_repr(PyObject *self)
1511 {
1512     PyTypeObject *type = Py_TYPE(self);
1513     const char *name = _PyType_Name(type);
1514 
1515     channelid *cid = (channelid *)self;
1516     const char *fmt;
1517     if (cid->end == CHANNEL_SEND) {
1518         fmt = "%s(%" PRId64 ", send=True)";
1519     }
1520     else if (cid->end == CHANNEL_RECV) {
1521         fmt = "%s(%" PRId64 ", recv=True)";
1522     }
1523     else {
1524         fmt = "%s(%" PRId64 ")";
1525     }
1526     return PyUnicode_FromFormat(fmt, name, cid->id);
1527 }
1528 
1529 static PyObject *
channelid_str(PyObject * self)1530 channelid_str(PyObject *self)
1531 {
1532     channelid *cid = (channelid *)self;
1533     return PyUnicode_FromFormat("%" PRId64 "", cid->id);
1534 }
1535 
1536 static PyObject *
channelid_int(PyObject * self)1537 channelid_int(PyObject *self)
1538 {
1539     channelid *cid = (channelid *)self;
1540     return PyLong_FromLongLong(cid->id);
1541 }
1542 
1543 static PyNumberMethods channelid_as_number = {
1544      0,                        /* nb_add */
1545      0,                        /* nb_subtract */
1546      0,                        /* nb_multiply */
1547      0,                        /* nb_remainder */
1548      0,                        /* nb_divmod */
1549      0,                        /* nb_power */
1550      0,                        /* nb_negative */
1551      0,                        /* nb_positive */
1552      0,                        /* nb_absolute */
1553      0,                        /* nb_bool */
1554      0,                        /* nb_invert */
1555      0,                        /* nb_lshift */
1556      0,                        /* nb_rshift */
1557      0,                        /* nb_and */
1558      0,                        /* nb_xor */
1559      0,                        /* nb_or */
1560      (unaryfunc)channelid_int, /* nb_int */
1561      0,                        /* nb_reserved */
1562      0,                        /* nb_float */
1563 
1564      0,                        /* nb_inplace_add */
1565      0,                        /* nb_inplace_subtract */
1566      0,                        /* nb_inplace_multiply */
1567      0,                        /* nb_inplace_remainder */
1568      0,                        /* nb_inplace_power */
1569      0,                        /* nb_inplace_lshift */
1570      0,                        /* nb_inplace_rshift */
1571      0,                        /* nb_inplace_and */
1572      0,                        /* nb_inplace_xor */
1573      0,                        /* nb_inplace_or */
1574 
1575      0,                        /* nb_floor_divide */
1576      0,                        /* nb_true_divide */
1577      0,                        /* nb_inplace_floor_divide */
1578      0,                        /* nb_inplace_true_divide */
1579 
1580      (unaryfunc)channelid_int, /* nb_index */
1581 };
1582 
1583 static Py_hash_t
channelid_hash(PyObject * self)1584 channelid_hash(PyObject *self)
1585 {
1586     channelid *cid = (channelid *)self;
1587     PyObject *id = PyLong_FromLongLong(cid->id);
1588     if (id == NULL) {
1589         return -1;
1590     }
1591     Py_hash_t hash = PyObject_Hash(id);
1592     Py_DECREF(id);
1593     return hash;
1594 }
1595 
1596 static PyObject *
channelid_richcompare(PyObject * self,PyObject * other,int op)1597 channelid_richcompare(PyObject *self, PyObject *other, int op)
1598 {
1599     if (op != Py_EQ && op != Py_NE) {
1600         Py_RETURN_NOTIMPLEMENTED;
1601     }
1602 
1603     if (!PyObject_TypeCheck(self, &ChannelIDtype)) {
1604         Py_RETURN_NOTIMPLEMENTED;
1605     }
1606 
1607     channelid *cid = (channelid *)self;
1608     int equal;
1609     if (PyObject_TypeCheck(other, &ChannelIDtype)) {
1610         channelid *othercid = (channelid *)other;
1611         equal = (cid->end == othercid->end) && (cid->id == othercid->id);
1612     }
1613     else if (PyLong_Check(other)) {
1614         /* Fast path */
1615         int overflow;
1616         long long othercid = PyLong_AsLongLongAndOverflow(other, &overflow);
1617         if (othercid == -1 && PyErr_Occurred()) {
1618             return NULL;
1619         }
1620         equal = !overflow && (othercid >= 0) && (cid->id == othercid);
1621     }
1622     else if (PyNumber_Check(other)) {
1623         PyObject *pyid = PyLong_FromLongLong(cid->id);
1624         if (pyid == NULL) {
1625             return NULL;
1626         }
1627         PyObject *res = PyObject_RichCompare(pyid, other, op);
1628         Py_DECREF(pyid);
1629         return res;
1630     }
1631     else {
1632         Py_RETURN_NOTIMPLEMENTED;
1633     }
1634 
1635     if ((op == Py_EQ && equal) || (op == Py_NE && !equal)) {
1636         Py_RETURN_TRUE;
1637     }
1638     Py_RETURN_FALSE;
1639 }
1640 
1641 static PyObject *
_channel_from_cid(PyObject * cid,int end)1642 _channel_from_cid(PyObject *cid, int end)
1643 {
1644     PyObject *highlevel = PyImport_ImportModule("interpreters");
1645     if (highlevel == NULL) {
1646         PyErr_Clear();
1647         highlevel = PyImport_ImportModule("test.support.interpreters");
1648         if (highlevel == NULL) {
1649             return NULL;
1650         }
1651     }
1652     const char *clsname = (end == CHANNEL_RECV) ? "RecvChannel" :
1653                                                   "SendChannel";
1654     PyObject *cls = PyObject_GetAttrString(highlevel, clsname);
1655     Py_DECREF(highlevel);
1656     if (cls == NULL) {
1657         return NULL;
1658     }
1659     PyObject *chan = PyObject_CallFunctionObjArgs(cls, cid, NULL);
1660     Py_DECREF(cls);
1661     if (chan == NULL) {
1662         return NULL;
1663     }
1664     return chan;
1665 }
1666 
/* The cross-interpreter representation of a ChannelID: a plain copy of
   the corresponding channelid fields, filled in by _channelid_shared()
   and consumed by _channelid_from_xid(). */
struct _channelid_xid {
    int64_t id;
    int end;
    int resolve;
};
1672 
1673 static PyObject *
_channelid_from_xid(_PyCrossInterpreterData * data)1674 _channelid_from_xid(_PyCrossInterpreterData *data)
1675 {
1676     struct _channelid_xid *xid = (struct _channelid_xid *)data->data;
1677     // Note that we do not preserve the "resolve" flag.
1678     PyObject *cid = (PyObject *)newchannelid(&ChannelIDtype, xid->id, xid->end,
1679                                              _global_channels(), 0, 0);
1680     if (xid->end == 0) {
1681         return cid;
1682     }
1683     if (!xid->resolve) {
1684         return cid;
1685     }
1686 
1687     /* Try returning a high-level channel end but fall back to the ID. */
1688     PyObject *chan = _channel_from_cid(cid, xid->end);
1689     if (chan == NULL) {
1690         PyErr_Clear();
1691         return cid;
1692     }
1693     Py_DECREF(cid);
1694     return chan;
1695 }
1696 
1697 static int
_channelid_shared(PyObject * obj,_PyCrossInterpreterData * data)1698 _channelid_shared(PyObject *obj, _PyCrossInterpreterData *data)
1699 {
1700     struct _channelid_xid *xid = PyMem_NEW(struct _channelid_xid, 1);
1701     if (xid == NULL) {
1702         return -1;
1703     }
1704     xid->id = ((channelid *)obj)->id;
1705     xid->end = ((channelid *)obj)->end;
1706     xid->resolve = ((channelid *)obj)->resolve;
1707 
1708     data->data = xid;
1709     Py_INCREF(obj);
1710     data->obj = obj;
1711     data->new_object = _channelid_from_xid;
1712     data->free = PyMem_Free;
1713     return 0;
1714 }
1715 
1716 static PyObject *
channelid_end(PyObject * self,void * end)1717 channelid_end(PyObject *self, void *end)
1718 {
1719     int force = 1;
1720     channelid *cid = (channelid *)self;
1721     if (end != NULL) {
1722         return (PyObject *)newchannelid(Py_TYPE(self), cid->id, *(int *)end,
1723                                         cid->channels, force, cid->resolve);
1724     }
1725 
1726     if (cid->end == CHANNEL_SEND) {
1727         return PyUnicode_InternFromString("send");
1728     }
1729     if (cid->end == CHANNEL_RECV) {
1730         return PyUnicode_InternFromString("recv");
1731     }
1732     return PyUnicode_InternFromString("both");
1733 }
1734 
// Closure values for the "send" and "recv" getters below; channelid_end()
// reads the requested end through the closure pointer.
static int _channelid_end_send = CHANNEL_SEND;
static int _channelid_end_recv = CHANNEL_RECV;

/* Attributes of ChannelID objects.  All three share channelid_end() as
   their getter and have no setter.  "end" passes no closure and so gets
   the name of the end; "send" and "recv" pass the end to switch to and
   so get a new ChannelID for that end. */
static PyGetSetDef channelid_getsets[] = {
    {"end", (getter)channelid_end, NULL,
     PyDoc_STR("'send', 'recv', or 'both'")},
    {"send", (getter)channelid_end, NULL,
     PyDoc_STR("the 'send' end of the channel"), &_channelid_end_send},
    {"recv", (getter)channelid_end, NULL,
     PyDoc_STR("the 'recv' end of the channel"), &_channelid_end_recv},
    {NULL}
};
1747 
1748 PyDoc_STRVAR(channelid_doc,
1749 "A channel ID identifies a channel and may be used as an int.");
1750 
1751 static PyTypeObject ChannelIDtype = {
1752     PyVarObject_HEAD_INIT(&PyType_Type, 0)
1753     "_xxsubinterpreters.ChannelID", /* tp_name */
1754     sizeof(channelid),              /* tp_basicsize */
1755     0,                              /* tp_itemsize */
1756     (destructor)channelid_dealloc,  /* tp_dealloc */
1757     0,                              /* tp_vectorcall_offset */
1758     0,                              /* tp_getattr */
1759     0,                              /* tp_setattr */
1760     0,                              /* tp_as_async */
1761     (reprfunc)channelid_repr,       /* tp_repr */
1762     &channelid_as_number,           /* tp_as_number */
1763     0,                              /* tp_as_sequence */
1764     0,                              /* tp_as_mapping */
1765     channelid_hash,                 /* tp_hash */
1766     0,                              /* tp_call */
1767     (reprfunc)channelid_str,        /* tp_str */
1768     0,                              /* tp_getattro */
1769     0,                              /* tp_setattro */
1770     0,                              /* tp_as_buffer */
1771     Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */
1772     channelid_doc,                  /* tp_doc */
1773     0,                              /* tp_traverse */
1774     0,                              /* tp_clear */
1775     channelid_richcompare,          /* tp_richcompare */
1776     0,                              /* tp_weaklistoffset */
1777     0,                              /* tp_iter */
1778     0,                              /* tp_iternext */
1779     0,                              /* tp_methods */
1780     0,                              /* tp_members */
1781     channelid_getsets,              /* tp_getset */
1782     0,                              /* tp_base */
1783     0,                              /* tp_dict */
1784     0,                              /* tp_descr_get */
1785     0,                              /* tp_descr_set */
1786     0,                              /* tp_dictoffset */
1787     0,                              /* tp_init */
1788     0,                              /* tp_alloc */
1789     // Note that we do not set tp_new to channelid_new.  Instead we
1790     // set it to NULL, meaning it cannot be instantiated from Python
1791     // code.  We do this because there is a strong relationship between
1792     // channel IDs and the channel lifecycle, so this limitation avoids
1793     // related complications.
1794     NULL,                           /* tp_new */
1795 };
1796 
1797 
1798 /* interpreter-specific code ************************************************/
1799 
1800 static PyObject * RunFailedError = NULL;
1801 
1802 static int
interp_exceptions_init(PyObject * ns)1803 interp_exceptions_init(PyObject *ns)
1804 {
1805     // XXX Move the exceptions into per-module memory?
1806 
1807     if (RunFailedError == NULL) {
1808         // An uncaught exception came out of interp_run_string().
1809         RunFailedError = PyErr_NewException("_xxsubinterpreters.RunFailedError",
1810                                             PyExc_RuntimeError, NULL);
1811         if (RunFailedError == NULL) {
1812             return -1;
1813         }
1814         if (PyDict_SetItemString(ns, "RunFailedError", RunFailedError) != 0) {
1815             return -1;
1816         }
1817     }
1818 
1819     return 0;
1820 }
1821 
1822 static int
_is_running(PyInterpreterState * interp)1823 _is_running(PyInterpreterState *interp)
1824 {
1825     PyThreadState *tstate = PyInterpreterState_ThreadHead(interp);
1826     if (PyThreadState_Next(tstate) != NULL) {
1827         PyErr_SetString(PyExc_RuntimeError,
1828                         "interpreter has more than one thread");
1829         return -1;
1830     }
1831     PyFrameObject *frame = tstate->frame;
1832     if (frame == NULL) {
1833         if (PyErr_Occurred() != NULL) {
1834             return -1;
1835         }
1836         return 0;
1837     }
1838     return (int)(frame->f_executing);
1839 }
1840 
1841 static int
_ensure_not_running(PyInterpreterState * interp)1842 _ensure_not_running(PyInterpreterState *interp)
1843 {
1844     int is_running = _is_running(interp);
1845     if (is_running < 0) {
1846         return -1;
1847     }
1848     if (is_running) {
1849         PyErr_Format(PyExc_RuntimeError, "interpreter already running");
1850         return -1;
1851     }
1852     return 0;
1853 }
1854 
/* Run codestr as a script in the namespace of the main module of interp.

   If shared is not NULL it is first applied to that namespace with
   _sharedns_apply().

   On success 0 is returned and *exc is set to NULL.  On failure -1 is
   returned and the pending exception is consumed: it is turned into a
   _sharedexception stored in *exc, or, if that conversion fails, a
   message is written to stderr and *exc is set to NULL.  Either way the
   exception is fetched or cleared here rather than left pending. */
static int
_run_script(PyInterpreterState *interp, const char *codestr,
            _sharedns *shared, _sharedexception **exc)
{
    PyObject *exctype = NULL;
    PyObject *excval = NULL;
    PyObject *tb = NULL;

    PyObject *main_mod = _PyInterpreterState_GetMainModule(interp);
    if (main_mod == NULL) {
        goto error;
    }
    PyObject *ns = PyModule_GetDict(main_mod);  // borrowed
    Py_DECREF(main_mod);
    if (ns == NULL) {
        goto error;
    }
    // Take our own reference to the namespace for as long as we use it.
    Py_INCREF(ns);

    // Apply the cross-interpreter data.
    if (shared != NULL) {
        if (_sharedns_apply(shared, ns) != 0) {
            Py_DECREF(ns);
            goto error;
        }
    }

    // Run the string (see PyRun_SimpleStringFlags).
    PyObject *result = PyRun_StringFlags(codestr, Py_file_input, ns, ns, NULL);
    Py_DECREF(ns);
    if (result == NULL) {
        goto error;
    }
    else {
        Py_DECREF(result);  // We throw away the result.
    }

    *exc = NULL;
    return 0;

error:
    // Take the pending exception off the thread state and convert it
    // into a form that can be handed to the caller through *exc.
    PyErr_Fetch(&exctype, &excval, &tb);

    _sharedexception *sharedexc = _sharedexception_bind(exctype, excval, tb);
    Py_XDECREF(exctype);
    Py_XDECREF(excval);
    Py_XDECREF(tb);
    if (sharedexc == NULL) {
        // The conversion failed; report on stderr and drop whatever
        // error it may have left behind.
        fprintf(stderr, "RunFailedError: script raised an uncaught exception");
        PyErr_Clear();
        sharedexc = NULL;
    }
    else {
        assert(!PyErr_Occurred());
    }
    *exc = sharedexc;
    return -1;
}
1913 
/* Run codestr in the given interpreter, optionally sharing objects.

   The interpreter must not be running already.  shareables is converted
   with _get_shared_ns(); the result may be NULL without an error, in
   which case nothing is shared.  If interp is not the current
   interpreter, its head thread state is swapped in for the duration of
   the run and the previous thread state is restored afterwards.

   Returns the result of _run_script(): 0 on success, -1 on failure.  On
   failure the script's exception is re-raised here through
   _sharedexception_apply() with RunFailedError, or MemoryError is set if
   no shared exception is available.  -1 is also returned if the
   interpreter is running or the shareables could not be converted. */
static int
_run_script_in_interpreter(PyInterpreterState *interp, const char *codestr,
                           PyObject *shareables)
{
    if (_ensure_not_running(interp) < 0) {
        return -1;
    }

    _sharedns *shared = _get_shared_ns(shareables);
    if (shared == NULL && PyErr_Occurred()) {
        return -1;
    }

    // Switch to interpreter.
    PyThreadState *save_tstate = NULL;
    if (interp != _PyInterpreterState_Get()) {
        // XXX Using the "head" thread isn't strictly correct.
        PyThreadState *tstate = PyInterpreterState_ThreadHead(interp);
        // XXX Possible GILState issues?
        save_tstate = PyThreadState_Swap(tstate);
    }

    // Run the script.
    _sharedexception *exc = NULL;
    int result = _run_script(interp, codestr, shared, &exc);

    // Switch back.
    if (save_tstate != NULL) {
        PyThreadState_Swap(save_tstate);
    }

    // Propagate any exception out to the caller.
    if (exc != NULL) {
        _sharedexception_apply(exc, RunFailedError);
        _sharedexception_free(exc);
    }
    else if (result != 0) {
        // We were unable to allocate a shared exception.
        PyErr_NoMemory();
    }

    if (shared != NULL) {
        _sharedns_free(shared);
    }

    return result;
}
1961 
1962 
1963 /* module level code ********************************************************/
1964 
/* globals is the process-global state for the module.  It holds all
   the data that we need to share between interpreters, so it cannot
   hold PyObject values.  It starts out zero-initialized; the channels
   member is set up by _init_globals(). */
static struct globals {
    _channels channels;  // the channel list handed out by _global_channels()
} _globals = {{0}};
1971 
1972 static int
_init_globals(void)1973 _init_globals(void)
1974 {
1975     if (_channels_init(&_globals.channels) != 0) {
1976         return -1;
1977     }
1978     return 0;
1979 }
1980 
1981 static _channels *
_global_channels(void)1982 _global_channels(void) {
1983     return &_globals.channels;
1984 }
1985 
/* create() -> ID

   Create a new interpreter with Py_NewInterpreter() and return its ID
   object.  The calling thread state is swapped out while the interpreter
   is created and swapped back in afterwards.

   Returns NULL with RuntimeError set if the interpreter could not be
   created.  If the ID object cannot be created, the new interpreter is
   ended again and NULL is returned. */
static PyObject *
interp_create(PyObject *self, PyObject *args)
{
    if (!PyArg_UnpackTuple(args, "create", 0, 0)) {
        return NULL;
    }

    // Create and initialize the new interpreter.
    PyThreadState *save_tstate = PyThreadState_Swap(NULL);
    // XXX Possible GILState issues?
    PyThreadState *tstate = Py_NewInterpreter();
    PyThreadState_Swap(save_tstate);
    if (tstate == NULL) {
        /* Since no new thread state was created, there is no exception to
           propagate; raise a fresh one after swapping in the old thread
           state. */
        PyErr_SetString(PyExc_RuntimeError, "interpreter creation failed");
        return NULL;
    }
    PyObject *idobj = _PyInterpreterState_GetIDObject(tstate->interp);
    if (idobj == NULL) {
        // Tear the new interpreter down again; its thread state has to
        // be the current one while Py_EndInterpreter() runs.
        // XXX Possible GILState issues?
        save_tstate = PyThreadState_Swap(tstate);
        Py_EndInterpreter(tstate);
        PyThreadState_Swap(save_tstate);
        return NULL;
    }
    _PyInterpreterState_RequireIDRef(tstate->interp, 1);
    return idobj;
}

PyDoc_STRVAR(create_doc,
"create() -> ID\n\
\n\
Create a new interpreter and return a unique generated ID.");
2021 
2022 
2023 static PyObject *
interp_destroy(PyObject * self,PyObject * args,PyObject * kwds)2024 interp_destroy(PyObject *self, PyObject *args, PyObject *kwds)
2025 {
2026     static char *kwlist[] = {"id", NULL};
2027     PyObject *id;
2028     // XXX Use "L" for id?
2029     if (!PyArg_ParseTupleAndKeywords(args, kwds,
2030                                      "O:destroy", kwlist, &id)) {
2031         return NULL;
2032     }
2033 
2034     // Look up the interpreter.
2035     PyInterpreterState *interp = _PyInterpreterID_LookUp(id);
2036     if (interp == NULL) {
2037         return NULL;
2038     }
2039 
2040     // Ensure we don't try to destroy the current interpreter.
2041     PyInterpreterState *current = _get_current();
2042     if (current == NULL) {
2043         return NULL;
2044     }
2045     if (interp == current) {
2046         PyErr_SetString(PyExc_RuntimeError,
2047                         "cannot destroy the current interpreter");
2048         return NULL;
2049     }
2050 
2051     // Ensure the interpreter isn't running.
2052     /* XXX We *could* support destroying a running interpreter but
2053        aren't going to worry about it for now. */
2054     if (_ensure_not_running(interp) < 0) {
2055         return NULL;
2056     }
2057 
2058     // Destroy the interpreter.
2059     //PyInterpreterState_Delete(interp);
2060     PyThreadState *tstate = PyInterpreterState_ThreadHead(interp);
2061     // XXX Possible GILState issues?
2062     PyThreadState *save_tstate = PyThreadState_Swap(tstate);
2063     Py_EndInterpreter(tstate);
2064     PyThreadState_Swap(save_tstate);
2065 
2066     Py_RETURN_NONE;
2067 }
2068 
2069 PyDoc_STRVAR(destroy_doc,
2070 "destroy(id)\n\
2071 \n\
2072 Destroy the identified interpreter.\n\
2073 \n\
2074 Attempting to destroy the current interpreter results in a RuntimeError.\n\
2075 So does an unrecognized ID.");
2076 
2077 
2078 static PyObject *
interp_list_all(PyObject * self,PyObject * Py_UNUSED (ignored))2079 interp_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
2080 {
2081     PyObject *ids, *id;
2082     PyInterpreterState *interp;
2083 
2084     ids = PyList_New(0);
2085     if (ids == NULL) {
2086         return NULL;
2087     }
2088 
2089     interp = PyInterpreterState_Head();
2090     while (interp != NULL) {
2091         id = _PyInterpreterState_GetIDObject(interp);
2092         if (id == NULL) {
2093             Py_DECREF(ids);
2094             return NULL;
2095         }
2096         // insert at front of list
2097         int res = PyList_Insert(ids, 0, id);
2098         Py_DECREF(id);
2099         if (res < 0) {
2100             Py_DECREF(ids);
2101             return NULL;
2102         }
2103 
2104         interp = PyInterpreterState_Next(interp);
2105     }
2106 
2107     return ids;
2108 }
2109 
2110 PyDoc_STRVAR(list_all_doc,
2111 "list_all() -> [ID]\n\
2112 \n\
2113 Return a list containing the ID of every existing interpreter.");
2114 
2115 
2116 static PyObject *
interp_get_current(PyObject * self,PyObject * Py_UNUSED (ignored))2117 interp_get_current(PyObject *self, PyObject *Py_UNUSED(ignored))
2118 {
2119     PyInterpreterState *interp =_get_current();
2120     if (interp == NULL) {
2121         return NULL;
2122     }
2123     return _PyInterpreterState_GetIDObject(interp);
2124 }
2125 
2126 PyDoc_STRVAR(get_current_doc,
2127 "get_current() -> ID\n\
2128 \n\
2129 Return the ID of current interpreter.");
2130 
2131 
2132 static PyObject *
interp_get_main(PyObject * self,PyObject * Py_UNUSED (ignored))2133 interp_get_main(PyObject *self, PyObject *Py_UNUSED(ignored))
2134 {
2135     // Currently, 0 is always the main interpreter.
2136     PY_INT64_T id = 0;
2137     return _PyInterpreterID_New(id);
2138 }
2139 
2140 PyDoc_STRVAR(get_main_doc,
2141 "get_main() -> ID\n\
2142 \n\
2143 Return the ID of main interpreter.");
2144 
2145 
2146 static PyObject *
interp_run_string(PyObject * self,PyObject * args,PyObject * kwds)2147 interp_run_string(PyObject *self, PyObject *args, PyObject *kwds)
2148 {
2149     static char *kwlist[] = {"id", "script", "shared", NULL};
2150     PyObject *id, *code;
2151     PyObject *shared = NULL;
2152     if (!PyArg_ParseTupleAndKeywords(args, kwds,
2153                                      "OU|O:run_string", kwlist,
2154                                      &id, &code, &shared)) {
2155         return NULL;
2156     }
2157 
2158     // Look up the interpreter.
2159     PyInterpreterState *interp = _PyInterpreterID_LookUp(id);
2160     if (interp == NULL) {
2161         return NULL;
2162     }
2163 
2164     // Extract code.
2165     Py_ssize_t size;
2166     const char *codestr = PyUnicode_AsUTF8AndSize(code, &size);
2167     if (codestr == NULL) {
2168         return NULL;
2169     }
2170     if (strlen(codestr) != (size_t)size) {
2171         PyErr_SetString(PyExc_ValueError,
2172                         "source code string cannot contain null bytes");
2173         return NULL;
2174     }
2175 
2176     // Run the code in the interpreter.
2177     if (_run_script_in_interpreter(interp, codestr, shared) != 0) {
2178         return NULL;
2179     }
2180     Py_RETURN_NONE;
2181 }
2182 
2183 PyDoc_STRVAR(run_string_doc,
2184 "run_string(id, script, shared)\n\
2185 \n\
2186 Execute the provided string in the identified interpreter.\n\
2187 \n\
2188 See PyRun_SimpleStrings.");
2189 
2190 
2191 static PyObject *
object_is_shareable(PyObject * self,PyObject * args,PyObject * kwds)2192 object_is_shareable(PyObject *self, PyObject *args, PyObject *kwds)
2193 {
2194     static char *kwlist[] = {"obj", NULL};
2195     PyObject *obj;
2196     if (!PyArg_ParseTupleAndKeywords(args, kwds,
2197                                      "O:is_shareable", kwlist, &obj)) {
2198         return NULL;
2199     }
2200 
2201     if (_PyObject_CheckCrossInterpreterData(obj) == 0) {
2202         Py_RETURN_TRUE;
2203     }
2204     PyErr_Clear();
2205     Py_RETURN_FALSE;
2206 }
2207 
2208 PyDoc_STRVAR(is_shareable_doc,
2209 "is_shareable(obj) -> bool\n\
2210 \n\
2211 Return True if the object's data may be shared between interpreters and\n\
2212 False otherwise.");
2213 
2214 
2215 static PyObject *
interp_is_running(PyObject * self,PyObject * args,PyObject * kwds)2216 interp_is_running(PyObject *self, PyObject *args, PyObject *kwds)
2217 {
2218     static char *kwlist[] = {"id", NULL};
2219     PyObject *id;
2220     if (!PyArg_ParseTupleAndKeywords(args, kwds,
2221                                      "O:is_running", kwlist, &id)) {
2222         return NULL;
2223     }
2224 
2225     PyInterpreterState *interp = _PyInterpreterID_LookUp(id);
2226     if (interp == NULL) {
2227         return NULL;
2228     }
2229     int is_running = _is_running(interp);
2230     if (is_running < 0) {
2231         return NULL;
2232     }
2233     if (is_running) {
2234         Py_RETURN_TRUE;
2235     }
2236     Py_RETURN_FALSE;
2237 }
2238 
2239 PyDoc_STRVAR(is_running_doc,
2240 "is_running(id) -> bool\n\
2241 \n\
2242 Return whether or not the identified interpreter is running.");
2243 
2244 static PyObject *
channel_create(PyObject * self,PyObject * Py_UNUSED (ignored))2245 channel_create(PyObject *self, PyObject *Py_UNUSED(ignored))
2246 {
2247     int64_t cid = _channel_create(&_globals.channels);
2248     if (cid < 0) {
2249         return NULL;
2250     }
2251     PyObject *id = (PyObject *)newchannelid(&ChannelIDtype, cid, 0,
2252                                             &_globals.channels, 0, 0);
2253     if (id == NULL) {
2254         if (_channel_destroy(&_globals.channels, cid) != 0) {
2255             // XXX issue a warning?
2256         }
2257         return NULL;
2258     }
2259     assert(((channelid *)id)->channels != NULL);
2260     return id;
2261 }
2262 
2263 PyDoc_STRVAR(channel_create_doc,
2264 "channel_create() -> cid\n\
2265 \n\
2266 Create a new cross-interpreter channel and return a unique generated ID.");
2267 
2268 static PyObject *
channel_destroy(PyObject * self,PyObject * args,PyObject * kwds)2269 channel_destroy(PyObject *self, PyObject *args, PyObject *kwds)
2270 {
2271     static char *kwlist[] = {"cid", NULL};
2272     int64_t cid;
2273     if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&:channel_destroy", kwlist,
2274                                      channel_id_converter, &cid)) {
2275         return NULL;
2276     }
2277 
2278     if (_channel_destroy(&_globals.channels, cid) != 0) {
2279         return NULL;
2280     }
2281     Py_RETURN_NONE;
2282 }
2283 
2284 PyDoc_STRVAR(channel_destroy_doc,
2285 "channel_destroy(cid)\n\
2286 \n\
2287 Close and finalize the channel.  Afterward attempts to use the channel\n\
2288 will behave as though it never existed.");
2289 
2290 static PyObject *
channel_list_all(PyObject * self,PyObject * Py_UNUSED (ignored))2291 channel_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
2292 {
2293     int64_t count = 0;
2294     int64_t *cids = _channels_list_all(&_globals.channels, &count);
2295     if (cids == NULL) {
2296         if (count == 0) {
2297             return PyList_New(0);
2298         }
2299         return NULL;
2300     }
2301     PyObject *ids = PyList_New((Py_ssize_t)count);
2302     if (ids == NULL) {
2303         goto finally;
2304     }
2305     int64_t *cur = cids;
2306     for (int64_t i=0; i < count; cur++, i++) {
2307         PyObject *id = (PyObject *)newchannelid(&ChannelIDtype, *cur, 0,
2308                                                 &_globals.channels, 0, 0);
2309         if (id == NULL) {
2310             Py_DECREF(ids);
2311             ids = NULL;
2312             break;
2313         }
2314         PyList_SET_ITEM(ids, i, id);
2315     }
2316 
2317 finally:
2318     PyMem_Free(cids);
2319     return ids;
2320 }
2321 
2322 PyDoc_STRVAR(channel_list_all_doc,
2323 "channel_list_all() -> [cid]\n\
2324 \n\
2325 Return the list of all IDs for active channels.");
2326 
2327 static PyObject *
channel_send(PyObject * self,PyObject * args,PyObject * kwds)2328 channel_send(PyObject *self, PyObject *args, PyObject *kwds)
2329 {
2330     static char *kwlist[] = {"cid", "obj", NULL};
2331     int64_t cid;
2332     PyObject *obj;
2333     if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O:channel_send", kwlist,
2334                                      channel_id_converter, &cid, &obj)) {
2335         return NULL;
2336     }
2337 
2338     if (_channel_send(&_globals.channels, cid, obj) != 0) {
2339         return NULL;
2340     }
2341     Py_RETURN_NONE;
2342 }
2343 
2344 PyDoc_STRVAR(channel_send_doc,
2345 "channel_send(cid, obj)\n\
2346 \n\
2347 Add the object's data to the channel's queue.");
2348 
2349 static PyObject *
channel_recv(PyObject * self,PyObject * args,PyObject * kwds)2350 channel_recv(PyObject *self, PyObject *args, PyObject *kwds)
2351 {
2352     static char *kwlist[] = {"cid", NULL};
2353     int64_t cid;
2354     if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&:channel_recv", kwlist,
2355                                      channel_id_converter, &cid)) {
2356         return NULL;
2357     }
2358 
2359     return _channel_recv(&_globals.channels, cid);
2360 }
2361 
2362 PyDoc_STRVAR(channel_recv_doc,
2363 "channel_recv(cid) -> obj\n\
2364 \n\
2365 Return a new object from the data at the from of the channel's queue.");
2366 
2367 static PyObject *
channel_close(PyObject * self,PyObject * args,PyObject * kwds)2368 channel_close(PyObject *self, PyObject *args, PyObject *kwds)
2369 {
2370     static char *kwlist[] = {"cid", "send", "recv", "force", NULL};
2371     int64_t cid;
2372     int send = 0;
2373     int recv = 0;
2374     int force = 0;
2375     if (!PyArg_ParseTupleAndKeywords(args, kwds,
2376                                      "O&|$ppp:channel_close", kwlist,
2377                                      channel_id_converter, &cid, &send, &recv, &force)) {
2378         return NULL;
2379     }
2380 
2381     if (_channel_close(&_globals.channels, cid, send-recv, force) != 0) {
2382         return NULL;
2383     }
2384     Py_RETURN_NONE;
2385 }
2386 
2387 PyDoc_STRVAR(channel_close_doc,
2388 "channel_close(cid, *, send=None, recv=None, force=False)\n\
2389 \n\
2390 Close the channel for all interpreters.\n\
2391 \n\
2392 If the channel is empty then the keyword args are ignored and both\n\
2393 ends are immediately closed.  Otherwise, if 'force' is True then\n\
2394 all queued items are released and both ends are immediately\n\
2395 closed.\n\
2396 \n\
2397 If the channel is not empty *and* 'force' is False then following\n\
2398 happens:\n\
2399 \n\
2400  * recv is True (regardless of send):\n\
2401    - raise ChannelNotEmptyError\n\
2402  * recv is None and send is None:\n\
2403    - raise ChannelNotEmptyError\n\
2404  * send is True and recv is not True:\n\
2405    - fully close the 'send' end\n\
2406    - close the 'recv' end to interpreters not already receiving\n\
2407    - fully close it once empty\n\
2408 \n\
2409 Closing an already closed channel results in a ChannelClosedError.\n\
2410 \n\
2411 Once the channel's ID has no more ref counts in any interpreter\n\
2412 the channel will be destroyed.");
2413 
2414 static PyObject *
channel_release(PyObject * self,PyObject * args,PyObject * kwds)2415 channel_release(PyObject *self, PyObject *args, PyObject *kwds)
2416 {
2417     // Note that only the current interpreter is affected.
2418     static char *kwlist[] = {"cid", "send", "recv", "force", NULL};
2419     int64_t cid;
2420     int send = 0;
2421     int recv = 0;
2422     int force = 0;
2423     if (!PyArg_ParseTupleAndKeywords(args, kwds,
2424                                      "O&|$ppp:channel_release", kwlist,
2425                                      channel_id_converter, &cid, &send, &recv, &force)) {
2426         return NULL;
2427     }
2428     if (send == 0 && recv == 0) {
2429         send = 1;
2430         recv = 1;
2431     }
2432 
2433     // XXX Handle force is True.
2434     // XXX Fix implicit release.
2435 
2436     if (_channel_drop(&_globals.channels, cid, send, recv) != 0) {
2437         return NULL;
2438     }
2439     Py_RETURN_NONE;
2440 }
2441 
2442 PyDoc_STRVAR(channel_release_doc,
2443 "channel_release(cid, *, send=None, recv=None, force=True)\n\
2444 \n\
2445 Close the channel for the current interpreter.  'send' and 'recv'\n\
2446 (bool) may be used to indicate the ends to close.  By default both\n\
2447 ends are closed.  Closing an already closed end is a noop.");
2448 
2449 static PyObject *
channel__channel_id(PyObject * self,PyObject * args,PyObject * kwds)2450 channel__channel_id(PyObject *self, PyObject *args, PyObject *kwds)
2451 {
2452     return channelid_new(&ChannelIDtype, args, kwds);
2453 }
2454 
2455 static PyMethodDef module_functions[] = {
2456     {"create",                    (PyCFunction)interp_create,
2457      METH_VARARGS, create_doc},
2458     {"destroy",                   (PyCFunction)(void(*)(void))interp_destroy,
2459      METH_VARARGS | METH_KEYWORDS, destroy_doc},
2460     {"list_all",                  interp_list_all,
2461      METH_NOARGS, list_all_doc},
2462     {"get_current",               interp_get_current,
2463      METH_NOARGS, get_current_doc},
2464     {"get_main",                  interp_get_main,
2465      METH_NOARGS, get_main_doc},
2466     {"is_running",                (PyCFunction)(void(*)(void))interp_is_running,
2467      METH_VARARGS | METH_KEYWORDS, is_running_doc},
2468     {"run_string",                (PyCFunction)(void(*)(void))interp_run_string,
2469      METH_VARARGS | METH_KEYWORDS, run_string_doc},
2470 
2471     {"is_shareable",              (PyCFunction)(void(*)(void))object_is_shareable,
2472      METH_VARARGS | METH_KEYWORDS, is_shareable_doc},
2473 
2474     {"channel_create",            channel_create,
2475      METH_NOARGS, channel_create_doc},
2476     {"channel_destroy",           (PyCFunction)(void(*)(void))channel_destroy,
2477      METH_VARARGS | METH_KEYWORDS, channel_destroy_doc},
2478     {"channel_list_all",          channel_list_all,
2479      METH_NOARGS, channel_list_all_doc},
2480     {"channel_send",              (PyCFunction)(void(*)(void))channel_send,
2481      METH_VARARGS | METH_KEYWORDS, channel_send_doc},
2482     {"channel_recv",              (PyCFunction)(void(*)(void))channel_recv,
2483      METH_VARARGS | METH_KEYWORDS, channel_recv_doc},
2484     {"channel_close",             (PyCFunction)(void(*)(void))channel_close,
2485      METH_VARARGS | METH_KEYWORDS, channel_close_doc},
2486     {"channel_release",           (PyCFunction)(void(*)(void))channel_release,
2487      METH_VARARGS | METH_KEYWORDS, channel_release_doc},
2488     {"_channel_id",               (PyCFunction)(void(*)(void))channel__channel_id,
2489      METH_VARARGS | METH_KEYWORDS, NULL},
2490 
2491     {NULL,                        NULL}           /* sentinel */
2492 };
2493 
2494 
2495 /* initialization function */
2496 
2497 PyDoc_STRVAR(module_doc,
2498 "This module provides primitive operations to manage Python interpreters.\n\
2499 The 'interpreters' module provides a more convenient interface.");
2500 
2501 static struct PyModuleDef interpretersmodule = {
2502     PyModuleDef_HEAD_INIT,
2503     "_xxsubinterpreters",  /* m_name */
2504     module_doc,            /* m_doc */
2505     -1,                    /* m_size */
2506     module_functions,      /* m_methods */
2507     NULL,                  /* m_slots */
2508     NULL,                  /* m_traverse */
2509     NULL,                  /* m_clear */
2510     NULL                   /* m_free */
2511 };
2512 
2513 
2514 PyMODINIT_FUNC
PyInit__xxsubinterpreters(void)2515 PyInit__xxsubinterpreters(void)
2516 {
2517     if (_init_globals() != 0) {
2518         return NULL;
2519     }
2520 
2521     /* Initialize types */
2522     if (PyType_Ready(&ChannelIDtype) != 0) {
2523         return NULL;
2524     }
2525 
2526     /* Create the module */
2527     PyObject *module = PyModule_Create(&interpretersmodule);
2528     if (module == NULL) {
2529         return NULL;
2530     }
2531 
2532     /* Add exception types */
2533     PyObject *ns = PyModule_GetDict(module);  // borrowed
2534     if (interp_exceptions_init(ns) != 0) {
2535         return NULL;
2536     }
2537     if (channel_exceptions_init(ns) != 0) {
2538         return NULL;
2539     }
2540 
2541     /* Add other types */
2542     Py_INCREF(&ChannelIDtype);
2543     if (PyDict_SetItemString(ns, "ChannelID", (PyObject *)&ChannelIDtype) != 0) {
2544         return NULL;
2545     }
2546     Py_INCREF(&_PyInterpreterID_Type);
2547     if (PyDict_SetItemString(ns, "InterpreterID", (PyObject *)&_PyInterpreterID_Type) != 0) {
2548         return NULL;
2549     }
2550 
2551     if (_PyCrossInterpreterData_RegisterClass(&ChannelIDtype, _channelid_shared)) {
2552         return NULL;
2553     }
2554 
2555     return module;
2556 }
2557