• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 
2 /* interpreters module */
3 /* low-level access to interpreter primitives */
4 
5 #include "Python.h"
6 #include "frameobject.h"
7 #include "interpreteridobject.h"
8 
9 
10 static char *
_copy_raw_string(PyObject * strobj)11 _copy_raw_string(PyObject *strobj)
12 {
13     const char *str = PyUnicode_AsUTF8(strobj);
14     if (str == NULL) {
15         return NULL;
16     }
17     char *copied = PyMem_Malloc(strlen(str)+1);
18     if (copied == NULL) {
19         PyErr_NoMemory();
20         return NULL;
21     }
22     strcpy(copied, str);
23     return copied;
24 }
25 
26 static PyInterpreterState *
_get_current(void)27 _get_current(void)
28 {
29     // PyInterpreterState_Get() aborts if lookup fails, so don't need
30     // to check the result for NULL.
31     return PyInterpreterState_Get();
32 }
33 
34 
35 /* data-sharing-specific code ***********************************************/
36 
37 struct _sharednsitem {
38     char *name;
39     _PyCrossInterpreterData data;
40 };
41 
42 static void _sharednsitem_clear(struct _sharednsitem *);  // forward
43 
44 static int
_sharednsitem_init(struct _sharednsitem * item,PyObject * key,PyObject * value)45 _sharednsitem_init(struct _sharednsitem *item, PyObject *key, PyObject *value)
46 {
47     item->name = _copy_raw_string(key);
48     if (item->name == NULL) {
49         return -1;
50     }
51     if (_PyObject_GetCrossInterpreterData(value, &item->data) != 0) {
52         _sharednsitem_clear(item);
53         return -1;
54     }
55     return 0;
56 }
57 
58 static void
_sharednsitem_clear(struct _sharednsitem * item)59 _sharednsitem_clear(struct _sharednsitem *item)
60 {
61     if (item->name != NULL) {
62         PyMem_Free(item->name);
63         item->name = NULL;
64     }
65     _PyCrossInterpreterData_Release(&item->data);
66 }
67 
68 static int
_sharednsitem_apply(struct _sharednsitem * item,PyObject * ns)69 _sharednsitem_apply(struct _sharednsitem *item, PyObject *ns)
70 {
71     PyObject *name = PyUnicode_FromString(item->name);
72     if (name == NULL) {
73         return -1;
74     }
75     PyObject *value = _PyCrossInterpreterData_NewObject(&item->data);
76     if (value == NULL) {
77         Py_DECREF(name);
78         return -1;
79     }
80     int res = PyDict_SetItem(ns, name, value);
81     Py_DECREF(name);
82     Py_DECREF(value);
83     return res;
84 }
85 
86 typedef struct _sharedns {
87     Py_ssize_t len;
88     struct _sharednsitem* items;
89 } _sharedns;
90 
91 static _sharedns *
_sharedns_new(Py_ssize_t len)92 _sharedns_new(Py_ssize_t len)
93 {
94     _sharedns *shared = PyMem_NEW(_sharedns, 1);
95     if (shared == NULL) {
96         PyErr_NoMemory();
97         return NULL;
98     }
99     shared->len = len;
100     shared->items = PyMem_NEW(struct _sharednsitem, len);
101     if (shared->items == NULL) {
102         PyErr_NoMemory();
103         PyMem_Free(shared);
104         return NULL;
105     }
106     return shared;
107 }
108 
109 static void
_sharedns_free(_sharedns * shared)110 _sharedns_free(_sharedns *shared)
111 {
112     for (Py_ssize_t i=0; i < shared->len; i++) {
113         _sharednsitem_clear(&shared->items[i]);
114     }
115     PyMem_Free(shared->items);
116     PyMem_Free(shared);
117 }
118 
119 static _sharedns *
_get_shared_ns(PyObject * shareable)120 _get_shared_ns(PyObject *shareable)
121 {
122     if (shareable == NULL || shareable == Py_None) {
123         return NULL;
124     }
125     Py_ssize_t len = PyDict_Size(shareable);
126     if (len == 0) {
127         return NULL;
128     }
129 
130     _sharedns *shared = _sharedns_new(len);
131     if (shared == NULL) {
132         return NULL;
133     }
134     Py_ssize_t pos = 0;
135     for (Py_ssize_t i=0; i < len; i++) {
136         PyObject *key, *value;
137         if (PyDict_Next(shareable, &pos, &key, &value) == 0) {
138             break;
139         }
140         if (_sharednsitem_init(&shared->items[i], key, value) != 0) {
141             break;
142         }
143     }
144     if (PyErr_Occurred()) {
145         _sharedns_free(shared);
146         return NULL;
147     }
148     return shared;
149 }
150 
151 static int
_sharedns_apply(_sharedns * shared,PyObject * ns)152 _sharedns_apply(_sharedns *shared, PyObject *ns)
153 {
154     for (Py_ssize_t i=0; i < shared->len; i++) {
155         if (_sharednsitem_apply(&shared->items[i], ns) != 0) {
156             return -1;
157         }
158     }
159     return 0;
160 }
161 
// Ultimately we'd like to preserve enough information about the
// exception and traceback that we could reconstitute (or at least
// simulate, a la traceback.TracebackException), and even chain, a copy
// of the exception in the calling interpreter.
166 
/* Plain-C snapshot of an exception, for passing between interpreters. */
typedef struct _sharedexception {
    char *name;  /* text of the exception type, or NULL */
    char *msg;   /* text of the exception value, or NULL */
} _sharedexception;
171 
172 static _sharedexception *
_sharedexception_new(void)173 _sharedexception_new(void)
174 {
175     _sharedexception *err = PyMem_NEW(_sharedexception, 1);
176     if (err == NULL) {
177         PyErr_NoMemory();
178         return NULL;
179     }
180     err->name = NULL;
181     err->msg = NULL;
182     return err;
183 }
184 
185 static void
_sharedexception_clear(_sharedexception * exc)186 _sharedexception_clear(_sharedexception *exc)
187 {
188     if (exc->name != NULL) {
189         PyMem_Free(exc->name);
190     }
191     if (exc->msg != NULL) {
192         PyMem_Free(exc->msg);
193     }
194 }
195 
196 static void
_sharedexception_free(_sharedexception * exc)197 _sharedexception_free(_sharedexception *exc)
198 {
199     _sharedexception_clear(exc);
200     PyMem_Free(exc);
201 }
202 
203 static _sharedexception *
_sharedexception_bind(PyObject * exctype,PyObject * exc,PyObject * tb)204 _sharedexception_bind(PyObject *exctype, PyObject *exc, PyObject *tb)
205 {
206     assert(exctype != NULL);
207     char *failure = NULL;
208 
209     _sharedexception *err = _sharedexception_new();
210     if (err == NULL) {
211         goto finally;
212     }
213 
214     PyObject *name = PyUnicode_FromFormat("%S", exctype);
215     if (name == NULL) {
216         failure = "unable to format exception type name";
217         goto finally;
218     }
219     err->name = _copy_raw_string(name);
220     Py_DECREF(name);
221     if (err->name == NULL) {
222         if (PyErr_ExceptionMatches(PyExc_MemoryError)) {
223             failure = "out of memory copying exception type name";
224         } else {
225             failure = "unable to encode and copy exception type name";
226         }
227         goto finally;
228     }
229 
230     if (exc != NULL) {
231         PyObject *msg = PyUnicode_FromFormat("%S", exc);
232         if (msg == NULL) {
233             failure = "unable to format exception message";
234             goto finally;
235         }
236         err->msg = _copy_raw_string(msg);
237         Py_DECREF(msg);
238         if (err->msg == NULL) {
239             if (PyErr_ExceptionMatches(PyExc_MemoryError)) {
240                 failure = "out of memory copying exception message";
241             } else {
242                 failure = "unable to encode and copy exception message";
243             }
244             goto finally;
245         }
246     }
247 
248 finally:
249     if (failure != NULL) {
250         PyErr_Clear();
251         if (err->name != NULL) {
252             PyMem_Free(err->name);
253             err->name = NULL;
254         }
255         err->msg = failure;
256     }
257     return err;
258 }
259 
260 static void
_sharedexception_apply(_sharedexception * exc,PyObject * wrapperclass)261 _sharedexception_apply(_sharedexception *exc, PyObject *wrapperclass)
262 {
263     if (exc->name != NULL) {
264         if (exc->msg != NULL) {
265             PyErr_Format(wrapperclass, "%s: %s",  exc->name, exc->msg);
266         }
267         else {
268             PyErr_SetString(wrapperclass, exc->name);
269         }
270     }
271     else if (exc->msg != NULL) {
272         PyErr_SetString(wrapperclass, exc->msg);
273     }
274     else {
275         PyErr_SetNone(wrapperclass);
276     }
277 }
278 
279 
280 /* channel-specific code ****************************************************/
281 
282 #define CHANNEL_SEND 1
283 #define CHANNEL_BOTH 0
284 #define CHANNEL_RECV -1
285 
286 static PyObject *ChannelError;
287 static PyObject *ChannelNotFoundError;
288 static PyObject *ChannelClosedError;
289 static PyObject *ChannelEmptyError;
290 static PyObject *ChannelNotEmptyError;
291 
292 static int
channel_exceptions_init(PyObject * ns)293 channel_exceptions_init(PyObject *ns)
294 {
295     // XXX Move the exceptions into per-module memory?
296 
297     // A channel-related operation failed.
298     ChannelError = PyErr_NewException("_xxsubinterpreters.ChannelError",
299                                       PyExc_RuntimeError, NULL);
300     if (ChannelError == NULL) {
301         return -1;
302     }
303     if (PyDict_SetItemString(ns, "ChannelError", ChannelError) != 0) {
304         return -1;
305     }
306 
307     // An operation tried to use a channel that doesn't exist.
308     ChannelNotFoundError = PyErr_NewException(
309             "_xxsubinterpreters.ChannelNotFoundError", ChannelError, NULL);
310     if (ChannelNotFoundError == NULL) {
311         return -1;
312     }
313     if (PyDict_SetItemString(ns, "ChannelNotFoundError", ChannelNotFoundError) != 0) {
314         return -1;
315     }
316 
317     // An operation tried to use a closed channel.
318     ChannelClosedError = PyErr_NewException(
319             "_xxsubinterpreters.ChannelClosedError", ChannelError, NULL);
320     if (ChannelClosedError == NULL) {
321         return -1;
322     }
323     if (PyDict_SetItemString(ns, "ChannelClosedError", ChannelClosedError) != 0) {
324         return -1;
325     }
326 
327     // An operation tried to pop from an empty channel.
328     ChannelEmptyError = PyErr_NewException(
329             "_xxsubinterpreters.ChannelEmptyError", ChannelError, NULL);
330     if (ChannelEmptyError == NULL) {
331         return -1;
332     }
333     if (PyDict_SetItemString(ns, "ChannelEmptyError", ChannelEmptyError) != 0) {
334         return -1;
335     }
336 
337     // An operation tried to close a non-empty channel.
338     ChannelNotEmptyError = PyErr_NewException(
339             "_xxsubinterpreters.ChannelNotEmptyError", ChannelError, NULL);
340     if (ChannelNotEmptyError == NULL) {
341         return -1;
342     }
343     if (PyDict_SetItemString(ns, "ChannelNotEmptyError", ChannelNotEmptyError) != 0) {
344         return -1;
345     }
346 
347     return 0;
348 }
349 
350 /* the channel queue */
351 
352 struct _channelitem;
353 
354 typedef struct _channelitem {
355     _PyCrossInterpreterData *data;
356     struct _channelitem *next;
357 } _channelitem;
358 
359 static _channelitem *
_channelitem_new(void)360 _channelitem_new(void)
361 {
362     _channelitem *item = PyMem_NEW(_channelitem, 1);
363     if (item == NULL) {
364         PyErr_NoMemory();
365         return NULL;
366     }
367     item->data = NULL;
368     item->next = NULL;
369     return item;
370 }
371 
372 static void
_channelitem_clear(_channelitem * item)373 _channelitem_clear(_channelitem *item)
374 {
375     if (item->data != NULL) {
376         _PyCrossInterpreterData_Release(item->data);
377         PyMem_Free(item->data);
378         item->data = NULL;
379     }
380     item->next = NULL;
381 }
382 
383 static void
_channelitem_free(_channelitem * item)384 _channelitem_free(_channelitem *item)
385 {
386     _channelitem_clear(item);
387     PyMem_Free(item);
388 }
389 
390 static void
_channelitem_free_all(_channelitem * item)391 _channelitem_free_all(_channelitem *item)
392 {
393     while (item != NULL) {
394         _channelitem *last = item;
395         item = item->next;
396         _channelitem_free(last);
397     }
398 }
399 
400 static _PyCrossInterpreterData *
_channelitem_popped(_channelitem * item)401 _channelitem_popped(_channelitem *item)
402 {
403     _PyCrossInterpreterData *data = item->data;
404     item->data = NULL;
405     _channelitem_free(item);
406     return data;
407 }
408 
409 typedef struct _channelqueue {
410     int64_t count;
411     _channelitem *first;
412     _channelitem *last;
413 } _channelqueue;
414 
415 static _channelqueue *
_channelqueue_new(void)416 _channelqueue_new(void)
417 {
418     _channelqueue *queue = PyMem_NEW(_channelqueue, 1);
419     if (queue == NULL) {
420         PyErr_NoMemory();
421         return NULL;
422     }
423     queue->count = 0;
424     queue->first = NULL;
425     queue->last = NULL;
426     return queue;
427 }
428 
429 static void
_channelqueue_clear(_channelqueue * queue)430 _channelqueue_clear(_channelqueue *queue)
431 {
432     _channelitem_free_all(queue->first);
433     queue->count = 0;
434     queue->first = NULL;
435     queue->last = NULL;
436 }
437 
438 static void
_channelqueue_free(_channelqueue * queue)439 _channelqueue_free(_channelqueue *queue)
440 {
441     _channelqueue_clear(queue);
442     PyMem_Free(queue);
443 }
444 
445 static int
_channelqueue_put(_channelqueue * queue,_PyCrossInterpreterData * data)446 _channelqueue_put(_channelqueue *queue, _PyCrossInterpreterData *data)
447 {
448     _channelitem *item = _channelitem_new();
449     if (item == NULL) {
450         return -1;
451     }
452     item->data = data;
453 
454     queue->count += 1;
455     if (queue->first == NULL) {
456         queue->first = item;
457     }
458     else {
459         queue->last->next = item;
460     }
461     queue->last = item;
462     return 0;
463 }
464 
465 static _PyCrossInterpreterData *
_channelqueue_get(_channelqueue * queue)466 _channelqueue_get(_channelqueue *queue)
467 {
468     _channelitem *item = queue->first;
469     if (item == NULL) {
470         return NULL;
471     }
472     queue->first = item->next;
473     if (queue->last == item) {
474         queue->last = NULL;
475     }
476     queue->count -= 1;
477 
478     return _channelitem_popped(item);
479 }
480 
481 /* channel-interpreter associations */
482 
struct _channelend;

/* Association between one interpreter and one end of a channel. */
typedef struct _channelend {
    struct _channelend *next;  /* next association in the list, or NULL */
    int64_t interp;            /* ID of the associated interpreter */
    int open;                  /* 1 while open, 0 once closed */
} _channelend;
490 
491 static _channelend *
_channelend_new(int64_t interp)492 _channelend_new(int64_t interp)
493 {
494     _channelend *end = PyMem_NEW(_channelend, 1);
495     if (end == NULL) {
496         PyErr_NoMemory();
497         return NULL;
498     }
499     end->next = NULL;
500     end->interp = interp;
501     end->open = 1;
502     return end;
503 }
504 
505 static void
_channelend_free(_channelend * end)506 _channelend_free(_channelend *end)
507 {
508     PyMem_Free(end);
509 }
510 
511 static void
_channelend_free_all(_channelend * end)512 _channelend_free_all(_channelend *end)
513 {
514     while (end != NULL) {
515         _channelend *last = end;
516         end = end->next;
517         _channelend_free(last);
518     }
519 }
520 
521 static _channelend *
_channelend_find(_channelend * first,int64_t interp,_channelend ** pprev)522 _channelend_find(_channelend *first, int64_t interp, _channelend **pprev)
523 {
524     _channelend *prev = NULL;
525     _channelend *end = first;
526     while (end != NULL) {
527         if (end->interp == interp) {
528             break;
529         }
530         prev = end;
531         end = end->next;
532     }
533     if (pprev != NULL) {
534         *pprev = prev;
535     }
536     return end;
537 }
538 
539 typedef struct _channelassociations {
540     // Note that the list entries are never removed for interpreter
541     // for which the channel is closed.  This should not be a problem in
542     // practice.  Also, a channel isn't automatically closed when an
543     // interpreter is destroyed.
544     int64_t numsendopen;
545     int64_t numrecvopen;
546     _channelend *send;
547     _channelend *recv;
548 } _channelends;
549 
550 static _channelends *
_channelends_new(void)551 _channelends_new(void)
552 {
553     _channelends *ends = PyMem_NEW(_channelends, 1);
554     if (ends== NULL) {
555         return NULL;
556     }
557     ends->numsendopen = 0;
558     ends->numrecvopen = 0;
559     ends->send = NULL;
560     ends->recv = NULL;
561     return ends;
562 }
563 
564 static void
_channelends_clear(_channelends * ends)565 _channelends_clear(_channelends *ends)
566 {
567     _channelend_free_all(ends->send);
568     ends->send = NULL;
569     ends->numsendopen = 0;
570 
571     _channelend_free_all(ends->recv);
572     ends->recv = NULL;
573     ends->numrecvopen = 0;
574 }
575 
576 static void
_channelends_free(_channelends * ends)577 _channelends_free(_channelends *ends)
578 {
579     _channelends_clear(ends);
580     PyMem_Free(ends);
581 }
582 
583 static _channelend *
_channelends_add(_channelends * ends,_channelend * prev,int64_t interp,int send)584 _channelends_add(_channelends *ends, _channelend *prev, int64_t interp,
585                  int send)
586 {
587     _channelend *end = _channelend_new(interp);
588     if (end == NULL) {
589         return NULL;
590     }
591 
592     if (prev == NULL) {
593         if (send) {
594             ends->send = end;
595         }
596         else {
597             ends->recv = end;
598         }
599     }
600     else {
601         prev->next = end;
602     }
603     if (send) {
604         ends->numsendopen += 1;
605     }
606     else {
607         ends->numrecvopen += 1;
608     }
609     return end;
610 }
611 
612 static int
_channelends_associate(_channelends * ends,int64_t interp,int send)613 _channelends_associate(_channelends *ends, int64_t interp, int send)
614 {
615     _channelend *prev;
616     _channelend *end = _channelend_find(send ? ends->send : ends->recv,
617                                         interp, &prev);
618     if (end != NULL) {
619         if (!end->open) {
620             PyErr_SetString(ChannelClosedError, "channel already closed");
621             return -1;
622         }
623         // already associated
624         return 0;
625     }
626     if (_channelends_add(ends, prev, interp, send) == NULL) {
627         return -1;
628     }
629     return 0;
630 }
631 
632 static int
_channelends_is_open(_channelends * ends)633 _channelends_is_open(_channelends *ends)
634 {
635     if (ends->numsendopen != 0 || ends->numrecvopen != 0) {
636         return 1;
637     }
638     if (ends->send == NULL && ends->recv == NULL) {
639         return 1;
640     }
641     return 0;
642 }
643 
644 static void
_channelends_close_end(_channelends * ends,_channelend * end,int send)645 _channelends_close_end(_channelends *ends, _channelend *end, int send)
646 {
647     end->open = 0;
648     if (send) {
649         ends->numsendopen -= 1;
650     }
651     else {
652         ends->numrecvopen -= 1;
653     }
654 }
655 
656 static int
_channelends_close_interpreter(_channelends * ends,int64_t interp,int which)657 _channelends_close_interpreter(_channelends *ends, int64_t interp, int which)
658 {
659     _channelend *prev;
660     _channelend *end;
661     if (which >= 0) {  // send/both
662         end = _channelend_find(ends->send, interp, &prev);
663         if (end == NULL) {
664             // never associated so add it
665             end = _channelends_add(ends, prev, interp, 1);
666             if (end == NULL) {
667                 return -1;
668             }
669         }
670         _channelends_close_end(ends, end, 1);
671     }
672     if (which <= 0) {  // recv/both
673         end = _channelend_find(ends->recv, interp, &prev);
674         if (end == NULL) {
675             // never associated so add it
676             end = _channelends_add(ends, prev, interp, 0);
677             if (end == NULL) {
678                 return -1;
679             }
680         }
681         _channelends_close_end(ends, end, 0);
682     }
683     return 0;
684 }
685 
686 static void
_channelends_close_all(_channelends * ends,int which,int force)687 _channelends_close_all(_channelends *ends, int which, int force)
688 {
689     // XXX Handle the ends.
690     // XXX Handle force is True.
691 
692     // Ensure all the "send"-associated interpreters are closed.
693     _channelend *end;
694     for (end = ends->send; end != NULL; end = end->next) {
695         _channelends_close_end(ends, end, 1);
696     }
697 
698     // Ensure all the "recv"-associated interpreters are closed.
699     for (end = ends->recv; end != NULL; end = end->next) {
700         _channelends_close_end(ends, end, 0);
701     }
702 }
703 
704 /* channels */
705 
706 struct _channel;
707 struct _channel_closing;
708 static void _channel_clear_closing(struct _channel *);
709 static void _channel_finish_closing(struct _channel *);
710 
711 typedef struct _channel {
712     PyThread_type_lock mutex;
713     _channelqueue *queue;
714     _channelends *ends;
715     int open;
716     struct _channel_closing *closing;
717 } _PyChannelState;
718 
719 static _PyChannelState *
_channel_new(void)720 _channel_new(void)
721 {
722     _PyChannelState *chan = PyMem_NEW(_PyChannelState, 1);
723     if (chan == NULL) {
724         return NULL;
725     }
726     chan->mutex = PyThread_allocate_lock();
727     if (chan->mutex == NULL) {
728         PyMem_Free(chan);
729         PyErr_SetString(ChannelError,
730                         "can't initialize mutex for new channel");
731         return NULL;
732     }
733     chan->queue = _channelqueue_new();
734     if (chan->queue == NULL) {
735         PyMem_Free(chan);
736         return NULL;
737     }
738     chan->ends = _channelends_new();
739     if (chan->ends == NULL) {
740         _channelqueue_free(chan->queue);
741         PyMem_Free(chan);
742         return NULL;
743     }
744     chan->open = 1;
745     chan->closing = NULL;
746     return chan;
747 }
748 
749 static void
_channel_free(_PyChannelState * chan)750 _channel_free(_PyChannelState *chan)
751 {
752     _channel_clear_closing(chan);
753     PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
754     _channelqueue_free(chan->queue);
755     _channelends_free(chan->ends);
756     PyThread_release_lock(chan->mutex);
757 
758     PyThread_free_lock(chan->mutex);
759     PyMem_Free(chan);
760 }
761 
762 static int
_channel_add(_PyChannelState * chan,int64_t interp,_PyCrossInterpreterData * data)763 _channel_add(_PyChannelState *chan, int64_t interp,
764              _PyCrossInterpreterData *data)
765 {
766     int res = -1;
767     PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
768 
769     if (!chan->open) {
770         PyErr_SetString(ChannelClosedError, "channel closed");
771         goto done;
772     }
773     if (_channelends_associate(chan->ends, interp, 1) != 0) {
774         goto done;
775     }
776 
777     if (_channelqueue_put(chan->queue, data) != 0) {
778         goto done;
779     }
780 
781     res = 0;
782 done:
783     PyThread_release_lock(chan->mutex);
784     return res;
785 }
786 
787 static _PyCrossInterpreterData *
_channel_next(_PyChannelState * chan,int64_t interp)788 _channel_next(_PyChannelState *chan, int64_t interp)
789 {
790     _PyCrossInterpreterData *data = NULL;
791     PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
792 
793     if (!chan->open) {
794         PyErr_SetString(ChannelClosedError, "channel closed");
795         goto done;
796     }
797     if (_channelends_associate(chan->ends, interp, 0) != 0) {
798         goto done;
799     }
800 
801     data = _channelqueue_get(chan->queue);
802     if (data == NULL && !PyErr_Occurred() && chan->closing != NULL) {
803         chan->open = 0;
804     }
805 
806 done:
807     PyThread_release_lock(chan->mutex);
808     if (chan->queue->count == 0) {
809         _channel_finish_closing(chan);
810     }
811     return data;
812 }
813 
814 static int
_channel_close_interpreter(_PyChannelState * chan,int64_t interp,int end)815 _channel_close_interpreter(_PyChannelState *chan, int64_t interp, int end)
816 {
817     PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
818 
819     int res = -1;
820     if (!chan->open) {
821         PyErr_SetString(ChannelClosedError, "channel already closed");
822         goto done;
823     }
824 
825     if (_channelends_close_interpreter(chan->ends, interp, end) != 0) {
826         goto done;
827     }
828     chan->open = _channelends_is_open(chan->ends);
829 
830     res = 0;
831 done:
832     PyThread_release_lock(chan->mutex);
833     return res;
834 }
835 
836 static int
_channel_close_all(_PyChannelState * chan,int end,int force)837 _channel_close_all(_PyChannelState *chan, int end, int force)
838 {
839     int res = -1;
840     PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
841 
842     if (!chan->open) {
843         PyErr_SetString(ChannelClosedError, "channel already closed");
844         goto done;
845     }
846 
847     if (!force && chan->queue->count > 0) {
848         PyErr_SetString(ChannelNotEmptyError,
849                         "may not be closed if not empty (try force=True)");
850         goto done;
851     }
852 
853     chan->open = 0;
854 
855     // We *could* also just leave these in place, since we've marked
856     // the channel as closed already.
857     _channelends_close_all(chan->ends, end, force);
858 
859     res = 0;
860 done:
861     PyThread_release_lock(chan->mutex);
862     return res;
863 }
864 
865 /* the set of channels */
866 
867 struct _channelref;
868 
869 typedef struct _channelref {
870     int64_t id;
871     _PyChannelState *chan;
872     struct _channelref *next;
873     Py_ssize_t objcount;
874 } _channelref;
875 
876 static _channelref *
_channelref_new(int64_t id,_PyChannelState * chan)877 _channelref_new(int64_t id, _PyChannelState *chan)
878 {
879     _channelref *ref = PyMem_NEW(_channelref, 1);
880     if (ref == NULL) {
881         return NULL;
882     }
883     ref->id = id;
884     ref->chan = chan;
885     ref->next = NULL;
886     ref->objcount = 0;
887     return ref;
888 }
889 
890 //static void
891 //_channelref_clear(_channelref *ref)
892 //{
893 //    ref->id = -1;
894 //    ref->chan = NULL;
895 //    ref->next = NULL;
896 //    ref->objcount = 0;
897 //}
898 
899 static void
_channelref_free(_channelref * ref)900 _channelref_free(_channelref *ref)
901 {
902     if (ref->chan != NULL) {
903         _channel_clear_closing(ref->chan);
904     }
905     //_channelref_clear(ref);
906     PyMem_Free(ref);
907 }
908 
909 static _channelref *
_channelref_find(_channelref * first,int64_t id,_channelref ** pprev)910 _channelref_find(_channelref *first, int64_t id, _channelref **pprev)
911 {
912     _channelref *prev = NULL;
913     _channelref *ref = first;
914     while (ref != NULL) {
915         if (ref->id == id) {
916             break;
917         }
918         prev = ref;
919         ref = ref->next;
920     }
921     if (pprev != NULL) {
922         *pprev = prev;
923     }
924     return ref;
925 }
926 
927 typedef struct _channels {
928     PyThread_type_lock mutex;
929     _channelref *head;
930     int64_t numopen;
931     int64_t next_id;
932 } _channels;
933 
934 static int
_channels_init(_channels * channels)935 _channels_init(_channels *channels)
936 {
937     if (channels->mutex == NULL) {
938         channels->mutex = PyThread_allocate_lock();
939         if (channels->mutex == NULL) {
940             PyErr_SetString(ChannelError,
941                             "can't initialize mutex for channel management");
942             return -1;
943         }
944     }
945     channels->head = NULL;
946     channels->numopen = 0;
947     channels->next_id = 0;
948     return 0;
949 }
950 
951 static int64_t
_channels_next_id(_channels * channels)952 _channels_next_id(_channels *channels)  // needs lock
953 {
954     int64_t id = channels->next_id;
955     if (id < 0) {
956         /* overflow */
957         PyErr_SetString(ChannelError,
958                         "failed to get a channel ID");
959         return -1;
960     }
961     channels->next_id += 1;
962     return id;
963 }
964 
965 static _PyChannelState *
_channels_lookup(_channels * channels,int64_t id,PyThread_type_lock * pmutex)966 _channels_lookup(_channels *channels, int64_t id, PyThread_type_lock *pmutex)
967 {
968     _PyChannelState *chan = NULL;
969     PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
970     if (pmutex != NULL) {
971         *pmutex = NULL;
972     }
973 
974     _channelref *ref = _channelref_find(channels->head, id, NULL);
975     if (ref == NULL) {
976         PyErr_Format(ChannelNotFoundError, "channel %" PRId64 " not found", id);
977         goto done;
978     }
979     if (ref->chan == NULL || !ref->chan->open) {
980         PyErr_Format(ChannelClosedError, "channel %" PRId64 " closed", id);
981         goto done;
982     }
983 
984     if (pmutex != NULL) {
985         // The mutex will be closed by the caller.
986         *pmutex = channels->mutex;
987     }
988 
989     chan = ref->chan;
990 done:
991     if (pmutex == NULL || *pmutex == NULL) {
992         PyThread_release_lock(channels->mutex);
993     }
994     return chan;
995 }
996 
997 static int64_t
_channels_add(_channels * channels,_PyChannelState * chan)998 _channels_add(_channels *channels, _PyChannelState *chan)
999 {
1000     int64_t cid = -1;
1001     PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
1002 
1003     // Create a new ref.
1004     int64_t id = _channels_next_id(channels);
1005     if (id < 0) {
1006         goto done;
1007     }
1008     _channelref *ref = _channelref_new(id, chan);
1009     if (ref == NULL) {
1010         goto done;
1011     }
1012 
1013     // Add it to the list.
1014     // We assume that the channel is a new one (not already in the list).
1015     ref->next = channels->head;
1016     channels->head = ref;
1017     channels->numopen += 1;
1018 
1019     cid = id;
1020 done:
1021     PyThread_release_lock(channels->mutex);
1022     return cid;
1023 }
1024 
1025 /* forward */
1026 static int _channel_set_closing(struct _channelref *, PyThread_type_lock);
1027 
1028 static int
_channels_close(_channels * channels,int64_t cid,_PyChannelState ** pchan,int end,int force)1029 _channels_close(_channels *channels, int64_t cid, _PyChannelState **pchan,
1030                 int end, int force)
1031 {
1032     int res = -1;
1033     PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
1034     if (pchan != NULL) {
1035         *pchan = NULL;
1036     }
1037 
1038     _channelref *ref = _channelref_find(channels->head, cid, NULL);
1039     if (ref == NULL) {
1040         PyErr_Format(ChannelNotFoundError, "channel %" PRId64 " not found", cid);
1041         goto done;
1042     }
1043 
1044     if (ref->chan == NULL) {
1045         PyErr_Format(ChannelClosedError, "channel %" PRId64 " closed", cid);
1046         goto done;
1047     }
1048     else if (!force && end == CHANNEL_SEND && ref->chan->closing != NULL) {
1049         PyErr_Format(ChannelClosedError, "channel %" PRId64 " closed", cid);
1050         goto done;
1051     }
1052     else {
1053         if (_channel_close_all(ref->chan, end, force) != 0) {
1054             if (end == CHANNEL_SEND &&
1055                     PyErr_ExceptionMatches(ChannelNotEmptyError)) {
1056                 if (ref->chan->closing != NULL) {
1057                     PyErr_Format(ChannelClosedError,
1058                                  "channel %" PRId64 " closed", cid);
1059                     goto done;
1060                 }
1061                 // Mark the channel as closing and return.  The channel
1062                 // will be cleaned up in _channel_next().
1063                 PyErr_Clear();
1064                 if (_channel_set_closing(ref, channels->mutex) != 0) {
1065                     goto done;
1066                 }
1067                 if (pchan != NULL) {
1068                     *pchan = ref->chan;
1069                 }
1070                 res = 0;
1071             }
1072             goto done;
1073         }
1074         if (pchan != NULL) {
1075             *pchan = ref->chan;
1076         }
1077         else  {
1078             _channel_free(ref->chan);
1079         }
1080         ref->chan = NULL;
1081     }
1082 
1083     res = 0;
1084 done:
1085     PyThread_release_lock(channels->mutex);
1086     return res;
1087 }
1088 
1089 static void
_channels_remove_ref(_channels * channels,_channelref * ref,_channelref * prev,_PyChannelState ** pchan)1090 _channels_remove_ref(_channels *channels, _channelref *ref, _channelref *prev,
1091                      _PyChannelState **pchan)
1092 {
1093     if (ref == channels->head) {
1094         channels->head = ref->next;
1095     }
1096     else {
1097         prev->next = ref->next;
1098     }
1099     channels->numopen -= 1;
1100 
1101     if (pchan != NULL) {
1102         *pchan = ref->chan;
1103     }
1104     _channelref_free(ref);
1105 }
1106 
1107 static int
_channels_remove(_channels * channels,int64_t id,_PyChannelState ** pchan)1108 _channels_remove(_channels *channels, int64_t id, _PyChannelState **pchan)
1109 {
1110     int res = -1;
1111     PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
1112 
1113     if (pchan != NULL) {
1114         *pchan = NULL;
1115     }
1116 
1117     _channelref *prev = NULL;
1118     _channelref *ref = _channelref_find(channels->head, id, &prev);
1119     if (ref == NULL) {
1120         PyErr_Format(ChannelNotFoundError, "channel %" PRId64 " not found", id);
1121         goto done;
1122     }
1123 
1124     _channels_remove_ref(channels, ref, prev, pchan);
1125 
1126     res = 0;
1127 done:
1128     PyThread_release_lock(channels->mutex);
1129     return res;
1130 }
1131 
1132 static int
_channels_add_id_object(_channels * channels,int64_t id)1133 _channels_add_id_object(_channels *channels, int64_t id)
1134 {
1135     int res = -1;
1136     PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
1137 
1138     _channelref *ref = _channelref_find(channels->head, id, NULL);
1139     if (ref == NULL) {
1140         PyErr_Format(ChannelNotFoundError, "channel %" PRId64 " not found", id);
1141         goto done;
1142     }
1143     ref->objcount += 1;
1144 
1145     res = 0;
1146 done:
1147     PyThread_release_lock(channels->mutex);
1148     return res;
1149 }
1150 
1151 static void
_channels_drop_id_object(_channels * channels,int64_t id)1152 _channels_drop_id_object(_channels *channels, int64_t id)
1153 {
1154     PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
1155 
1156     _channelref *prev = NULL;
1157     _channelref *ref = _channelref_find(channels->head, id, &prev);
1158     if (ref == NULL) {
1159         // Already destroyed.
1160         goto done;
1161     }
1162     ref->objcount -= 1;
1163 
1164     // Destroy if no longer used.
1165     if (ref->objcount == 0) {
1166         _PyChannelState *chan = NULL;
1167         _channels_remove_ref(channels, ref, prev, &chan);
1168         if (chan != NULL) {
1169             _channel_free(chan);
1170         }
1171     }
1172 
1173 done:
1174     PyThread_release_lock(channels->mutex);
1175 }
1176 
1177 static int64_t *
_channels_list_all(_channels * channels,int64_t * count)1178 _channels_list_all(_channels *channels, int64_t *count)
1179 {
1180     int64_t *cids = NULL;
1181     PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
1182     int64_t *ids = PyMem_NEW(int64_t, (Py_ssize_t)(channels->numopen));
1183     if (ids == NULL) {
1184         goto done;
1185     }
1186     _channelref *ref = channels->head;
1187     for (int64_t i=0; ref != NULL; ref = ref->next, i++) {
1188         ids[i] = ref->id;
1189     }
1190     *count = channels->numopen;
1191 
1192     cids = ids;
1193 done:
1194     PyThread_release_lock(channels->mutex);
1195     return cids;
1196 }
1197 
1198 /* support for closing non-empty channels */
1199 
1200 struct _channel_closing {
1201     struct _channelref *ref;
1202 };
1203 
1204 static int
_channel_set_closing(struct _channelref * ref,PyThread_type_lock mutex)1205 _channel_set_closing(struct _channelref *ref, PyThread_type_lock mutex) {
1206     struct _channel *chan = ref->chan;
1207     if (chan == NULL) {
1208         // already closed
1209         return 0;
1210     }
1211     int res = -1;
1212     PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
1213     if (chan->closing != NULL) {
1214         PyErr_SetString(ChannelClosedError, "channel closed");
1215         goto done;
1216     }
1217     chan->closing = PyMem_NEW(struct _channel_closing, 1);
1218     if (chan->closing == NULL) {
1219         goto done;
1220     }
1221     chan->closing->ref = ref;
1222 
1223     res = 0;
1224 done:
1225     PyThread_release_lock(chan->mutex);
1226     return res;
1227 }
1228 
1229 static void
_channel_clear_closing(struct _channel * chan)1230 _channel_clear_closing(struct _channel *chan) {
1231     PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
1232     if (chan->closing != NULL) {
1233         PyMem_Free(chan->closing);
1234         chan->closing = NULL;
1235     }
1236     PyThread_release_lock(chan->mutex);
1237 }
1238 
1239 static void
_channel_finish_closing(struct _channel * chan)1240 _channel_finish_closing(struct _channel *chan) {
1241     struct _channel_closing *closing = chan->closing;
1242     if (closing == NULL) {
1243         return;
1244     }
1245     _channelref *ref = closing->ref;
1246     _channel_clear_closing(chan);
1247     // Do the things that would have been done in _channels_close().
1248     ref->chan = NULL;
1249     _channel_free(chan);
1250 }
1251 
1252 /* "high"-level channel-related functions */
1253 
1254 static int64_t
_channel_create(_channels * channels)1255 _channel_create(_channels *channels)
1256 {
1257     _PyChannelState *chan = _channel_new();
1258     if (chan == NULL) {
1259         return -1;
1260     }
1261     int64_t id = _channels_add(channels, chan);
1262     if (id < 0) {
1263         _channel_free(chan);
1264         return -1;
1265     }
1266     return id;
1267 }
1268 
1269 static int
_channel_destroy(_channels * channels,int64_t id)1270 _channel_destroy(_channels *channels, int64_t id)
1271 {
1272     _PyChannelState *chan = NULL;
1273     if (_channels_remove(channels, id, &chan) != 0) {
1274         return -1;
1275     }
1276     if (chan != NULL) {
1277         _channel_free(chan);
1278     }
1279     return 0;
1280 }
1281 
1282 static int
_channel_send(_channels * channels,int64_t id,PyObject * obj)1283 _channel_send(_channels *channels, int64_t id, PyObject *obj)
1284 {
1285     PyInterpreterState *interp = _get_current();
1286     if (interp == NULL) {
1287         return -1;
1288     }
1289 
1290     // Look up the channel.
1291     PyThread_type_lock mutex = NULL;
1292     _PyChannelState *chan = _channels_lookup(channels, id, &mutex);
1293     if (chan == NULL) {
1294         return -1;
1295     }
1296     // Past this point we are responsible for releasing the mutex.
1297 
1298     if (chan->closing != NULL) {
1299         PyErr_Format(ChannelClosedError, "channel %" PRId64 " closed", id);
1300         PyThread_release_lock(mutex);
1301         return -1;
1302     }
1303 
1304     // Convert the object to cross-interpreter data.
1305     _PyCrossInterpreterData *data = PyMem_NEW(_PyCrossInterpreterData, 1);
1306     if (data == NULL) {
1307         PyThread_release_lock(mutex);
1308         return -1;
1309     }
1310     if (_PyObject_GetCrossInterpreterData(obj, data) != 0) {
1311         PyThread_release_lock(mutex);
1312         PyMem_Free(data);
1313         return -1;
1314     }
1315 
1316     // Add the data to the channel.
1317     int res = _channel_add(chan, PyInterpreterState_GetID(interp), data);
1318     PyThread_release_lock(mutex);
1319     if (res != 0) {
1320         _PyCrossInterpreterData_Release(data);
1321         PyMem_Free(data);
1322         return -1;
1323     }
1324 
1325     return 0;
1326 }
1327 
1328 static PyObject *
_channel_recv(_channels * channels,int64_t id)1329 _channel_recv(_channels *channels, int64_t id)
1330 {
1331     PyInterpreterState *interp = _get_current();
1332     if (interp == NULL) {
1333         return NULL;
1334     }
1335 
1336     // Look up the channel.
1337     PyThread_type_lock mutex = NULL;
1338     _PyChannelState *chan = _channels_lookup(channels, id, &mutex);
1339     if (chan == NULL) {
1340         return NULL;
1341     }
1342     // Past this point we are responsible for releasing the mutex.
1343 
1344     // Pop off the next item from the channel.
1345     _PyCrossInterpreterData *data = _channel_next(chan, PyInterpreterState_GetID(interp));
1346     PyThread_release_lock(mutex);
1347     if (data == NULL) {
1348         return NULL;
1349     }
1350 
1351     // Convert the data back to an object.
1352     PyObject *obj = _PyCrossInterpreterData_NewObject(data);
1353     _PyCrossInterpreterData_Release(data);
1354     PyMem_Free(data);
1355     if (obj == NULL) {
1356         return NULL;
1357     }
1358 
1359     return obj;
1360 }
1361 
1362 static int
_channel_drop(_channels * channels,int64_t id,int send,int recv)1363 _channel_drop(_channels *channels, int64_t id, int send, int recv)
1364 {
1365     PyInterpreterState *interp = _get_current();
1366     if (interp == NULL) {
1367         return -1;
1368     }
1369 
1370     // Look up the channel.
1371     PyThread_type_lock mutex = NULL;
1372     _PyChannelState *chan = _channels_lookup(channels, id, &mutex);
1373     if (chan == NULL) {
1374         return -1;
1375     }
1376     // Past this point we are responsible for releasing the mutex.
1377 
1378     // Close one or both of the two ends.
1379     int res = _channel_close_interpreter(chan, PyInterpreterState_GetID(interp), send-recv);
1380     PyThread_release_lock(mutex);
1381     return res;
1382 }
1383 
1384 static int
_channel_close(_channels * channels,int64_t id,int end,int force)1385 _channel_close(_channels *channels, int64_t id, int end, int force)
1386 {
1387     return _channels_close(channels, id, NULL, end, force);
1388 }
1389 
1390 static int
_channel_is_associated(_channels * channels,int64_t cid,int64_t interp,int send)1391 _channel_is_associated(_channels *channels, int64_t cid, int64_t interp,
1392                        int send)
1393 {
1394     _PyChannelState *chan = _channels_lookup(channels, cid, NULL);
1395     if (chan == NULL) {
1396         return -1;
1397     } else if (send && chan->closing != NULL) {
1398         PyErr_Format(ChannelClosedError, "channel %" PRId64 " closed", cid);
1399         return -1;
1400     }
1401 
1402     _channelend *end = _channelend_find(send ? chan->ends->send : chan->ends->recv,
1403                                         interp, NULL);
1404 
1405     return (end != NULL && end->open);
1406 }
1407 
1408 /* ChannelID class */
1409 
1410 static PyTypeObject ChannelIDtype;
1411 
1412 typedef struct channelid {
1413     PyObject_HEAD
1414     int64_t id;
1415     int end;
1416     int resolve;
1417     _channels *channels;
1418 } channelid;
1419 
1420 static int
channel_id_converter(PyObject * arg,void * ptr)1421 channel_id_converter(PyObject *arg, void *ptr)
1422 {
1423     int64_t cid;
1424     if (PyObject_TypeCheck(arg, &ChannelIDtype)) {
1425         cid = ((channelid *)arg)->id;
1426     }
1427     else if (PyIndex_Check(arg)) {
1428         cid = PyLong_AsLongLong(arg);
1429         if (cid == -1 && PyErr_Occurred()) {
1430             return 0;
1431         }
1432         if (cid < 0) {
1433             PyErr_Format(PyExc_ValueError,
1434                         "channel ID must be a non-negative int, got %R", arg);
1435             return 0;
1436         }
1437     }
1438     else {
1439         PyErr_Format(PyExc_TypeError,
1440                      "channel ID must be an int, got %.100s",
1441                      Py_TYPE(arg)->tp_name);
1442         return 0;
1443     }
1444     *(int64_t *)ptr = cid;
1445     return 1;
1446 }
1447 
1448 static channelid *
newchannelid(PyTypeObject * cls,int64_t cid,int end,_channels * channels,int force,int resolve)1449 newchannelid(PyTypeObject *cls, int64_t cid, int end, _channels *channels,
1450              int force, int resolve)
1451 {
1452     channelid *self = PyObject_New(channelid, cls);
1453     if (self == NULL) {
1454         return NULL;
1455     }
1456     self->id = cid;
1457     self->end = end;
1458     self->resolve = resolve;
1459     self->channels = channels;
1460 
1461     if (_channels_add_id_object(channels, cid) != 0) {
1462         if (force && PyErr_ExceptionMatches(ChannelNotFoundError)) {
1463             PyErr_Clear();
1464         }
1465         else {
1466             Py_DECREF((PyObject *)self);
1467             return NULL;
1468         }
1469     }
1470 
1471     return self;
1472 }
1473 
1474 static _channels * _global_channels(void);
1475 
1476 static PyObject *
channelid_new(PyTypeObject * cls,PyObject * args,PyObject * kwds)1477 channelid_new(PyTypeObject *cls, PyObject *args, PyObject *kwds)
1478 {
1479     static char *kwlist[] = {"id", "send", "recv", "force", "_resolve", NULL};
1480     int64_t cid;
1481     int send = -1;
1482     int recv = -1;
1483     int force = 0;
1484     int resolve = 0;
1485     if (!PyArg_ParseTupleAndKeywords(args, kwds,
1486                                      "O&|$pppp:ChannelID.__new__", kwlist,
1487                                      channel_id_converter, &cid, &send, &recv, &force, &resolve))
1488         return NULL;
1489 
1490     // Handle "send" and "recv".
1491     if (send == 0 && recv == 0) {
1492         PyErr_SetString(PyExc_ValueError,
1493                         "'send' and 'recv' cannot both be False");
1494         return NULL;
1495     }
1496 
1497     int end = 0;
1498     if (send == 1) {
1499         if (recv == 0 || recv == -1) {
1500             end = CHANNEL_SEND;
1501         }
1502     }
1503     else if (recv == 1) {
1504         end = CHANNEL_RECV;
1505     }
1506 
1507     return (PyObject *)newchannelid(cls, cid, end, _global_channels(),
1508                                     force, resolve);
1509 }
1510 
1511 static void
channelid_dealloc(PyObject * v)1512 channelid_dealloc(PyObject *v)
1513 {
1514     int64_t cid = ((channelid *)v)->id;
1515     _channels *channels = ((channelid *)v)->channels;
1516     Py_TYPE(v)->tp_free(v);
1517 
1518     _channels_drop_id_object(channels, cid);
1519 }
1520 
1521 static PyObject *
channelid_repr(PyObject * self)1522 channelid_repr(PyObject *self)
1523 {
1524     PyTypeObject *type = Py_TYPE(self);
1525     const char *name = _PyType_Name(type);
1526 
1527     channelid *cid = (channelid *)self;
1528     const char *fmt;
1529     if (cid->end == CHANNEL_SEND) {
1530         fmt = "%s(%" PRId64 ", send=True)";
1531     }
1532     else if (cid->end == CHANNEL_RECV) {
1533         fmt = "%s(%" PRId64 ", recv=True)";
1534     }
1535     else {
1536         fmt = "%s(%" PRId64 ")";
1537     }
1538     return PyUnicode_FromFormat(fmt, name, cid->id);
1539 }
1540 
1541 static PyObject *
channelid_str(PyObject * self)1542 channelid_str(PyObject *self)
1543 {
1544     channelid *cid = (channelid *)self;
1545     return PyUnicode_FromFormat("%" PRId64 "", cid->id);
1546 }
1547 
1548 static PyObject *
channelid_int(PyObject * self)1549 channelid_int(PyObject *self)
1550 {
1551     channelid *cid = (channelid *)self;
1552     return PyLong_FromLongLong(cid->id);
1553 }
1554 
1555 static PyNumberMethods channelid_as_number = {
1556      0,                        /* nb_add */
1557      0,                        /* nb_subtract */
1558      0,                        /* nb_multiply */
1559      0,                        /* nb_remainder */
1560      0,                        /* nb_divmod */
1561      0,                        /* nb_power */
1562      0,                        /* nb_negative */
1563      0,                        /* nb_positive */
1564      0,                        /* nb_absolute */
1565      0,                        /* nb_bool */
1566      0,                        /* nb_invert */
1567      0,                        /* nb_lshift */
1568      0,                        /* nb_rshift */
1569      0,                        /* nb_and */
1570      0,                        /* nb_xor */
1571      0,                        /* nb_or */
1572      (unaryfunc)channelid_int, /* nb_int */
1573      0,                        /* nb_reserved */
1574      0,                        /* nb_float */
1575 
1576      0,                        /* nb_inplace_add */
1577      0,                        /* nb_inplace_subtract */
1578      0,                        /* nb_inplace_multiply */
1579      0,                        /* nb_inplace_remainder */
1580      0,                        /* nb_inplace_power */
1581      0,                        /* nb_inplace_lshift */
1582      0,                        /* nb_inplace_rshift */
1583      0,                        /* nb_inplace_and */
1584      0,                        /* nb_inplace_xor */
1585      0,                        /* nb_inplace_or */
1586 
1587      0,                        /* nb_floor_divide */
1588      0,                        /* nb_true_divide */
1589      0,                        /* nb_inplace_floor_divide */
1590      0,                        /* nb_inplace_true_divide */
1591 
1592      (unaryfunc)channelid_int, /* nb_index */
1593 };
1594 
1595 static Py_hash_t
channelid_hash(PyObject * self)1596 channelid_hash(PyObject *self)
1597 {
1598     channelid *cid = (channelid *)self;
1599     PyObject *id = PyLong_FromLongLong(cid->id);
1600     if (id == NULL) {
1601         return -1;
1602     }
1603     Py_hash_t hash = PyObject_Hash(id);
1604     Py_DECREF(id);
1605     return hash;
1606 }
1607 
1608 static PyObject *
channelid_richcompare(PyObject * self,PyObject * other,int op)1609 channelid_richcompare(PyObject *self, PyObject *other, int op)
1610 {
1611     if (op != Py_EQ && op != Py_NE) {
1612         Py_RETURN_NOTIMPLEMENTED;
1613     }
1614 
1615     if (!PyObject_TypeCheck(self, &ChannelIDtype)) {
1616         Py_RETURN_NOTIMPLEMENTED;
1617     }
1618 
1619     channelid *cid = (channelid *)self;
1620     int equal;
1621     if (PyObject_TypeCheck(other, &ChannelIDtype)) {
1622         channelid *othercid = (channelid *)other;
1623         equal = (cid->end == othercid->end) && (cid->id == othercid->id);
1624     }
1625     else if (PyLong_Check(other)) {
1626         /* Fast path */
1627         int overflow;
1628         long long othercid = PyLong_AsLongLongAndOverflow(other, &overflow);
1629         if (othercid == -1 && PyErr_Occurred()) {
1630             return NULL;
1631         }
1632         equal = !overflow && (othercid >= 0) && (cid->id == othercid);
1633     }
1634     else if (PyNumber_Check(other)) {
1635         PyObject *pyid = PyLong_FromLongLong(cid->id);
1636         if (pyid == NULL) {
1637             return NULL;
1638         }
1639         PyObject *res = PyObject_RichCompare(pyid, other, op);
1640         Py_DECREF(pyid);
1641         return res;
1642     }
1643     else {
1644         Py_RETURN_NOTIMPLEMENTED;
1645     }
1646 
1647     if ((op == Py_EQ && equal) || (op == Py_NE && !equal)) {
1648         Py_RETURN_TRUE;
1649     }
1650     Py_RETURN_FALSE;
1651 }
1652 
1653 static PyObject *
_channel_from_cid(PyObject * cid,int end)1654 _channel_from_cid(PyObject *cid, int end)
1655 {
1656     PyObject *highlevel = PyImport_ImportModule("interpreters");
1657     if (highlevel == NULL) {
1658         PyErr_Clear();
1659         highlevel = PyImport_ImportModule("test.support.interpreters");
1660         if (highlevel == NULL) {
1661             return NULL;
1662         }
1663     }
1664     const char *clsname = (end == CHANNEL_RECV) ? "RecvChannel" :
1665                                                   "SendChannel";
1666     PyObject *cls = PyObject_GetAttrString(highlevel, clsname);
1667     Py_DECREF(highlevel);
1668     if (cls == NULL) {
1669         return NULL;
1670     }
1671     PyObject *chan = PyObject_CallFunctionObjArgs(cls, cid, NULL);
1672     Py_DECREF(cls);
1673     if (chan == NULL) {
1674         return NULL;
1675     }
1676     return chan;
1677 }
1678 
/* Cross-interpreter representation of a ChannelID; filled in by
   _channelid_shared() and read back by _channelid_from_xid(). */
struct _channelid_xid {
    int64_t id;   // channel ID
    int end;      // end the ID is bound to (0 for both)
    int resolve;  // non-zero: try to rebuild a high-level channel end
};
1684 
1685 static PyObject *
_channelid_from_xid(_PyCrossInterpreterData * data)1686 _channelid_from_xid(_PyCrossInterpreterData *data)
1687 {
1688     struct _channelid_xid *xid = (struct _channelid_xid *)data->data;
1689     // Note that we do not preserve the "resolve" flag.
1690     PyObject *cid = (PyObject *)newchannelid(&ChannelIDtype, xid->id, xid->end,
1691                                              _global_channels(), 0, 0);
1692     if (xid->end == 0) {
1693         return cid;
1694     }
1695     if (!xid->resolve) {
1696         return cid;
1697     }
1698 
1699     /* Try returning a high-level channel end but fall back to the ID. */
1700     PyObject *chan = _channel_from_cid(cid, xid->end);
1701     if (chan == NULL) {
1702         PyErr_Clear();
1703         return cid;
1704     }
1705     Py_DECREF(cid);
1706     return chan;
1707 }
1708 
1709 static int
_channelid_shared(PyObject * obj,_PyCrossInterpreterData * data)1710 _channelid_shared(PyObject *obj, _PyCrossInterpreterData *data)
1711 {
1712     struct _channelid_xid *xid = PyMem_NEW(struct _channelid_xid, 1);
1713     if (xid == NULL) {
1714         return -1;
1715     }
1716     xid->id = ((channelid *)obj)->id;
1717     xid->end = ((channelid *)obj)->end;
1718     xid->resolve = ((channelid *)obj)->resolve;
1719 
1720     data->data = xid;
1721     Py_INCREF(obj);
1722     data->obj = obj;
1723     data->new_object = _channelid_from_xid;
1724     data->free = PyMem_Free;
1725     return 0;
1726 }
1727 
1728 static PyObject *
channelid_end(PyObject * self,void * end)1729 channelid_end(PyObject *self, void *end)
1730 {
1731     int force = 1;
1732     channelid *cid = (channelid *)self;
1733     if (end != NULL) {
1734         return (PyObject *)newchannelid(Py_TYPE(self), cid->id, *(int *)end,
1735                                         cid->channels, force, cid->resolve);
1736     }
1737 
1738     if (cid->end == CHANNEL_SEND) {
1739         return PyUnicode_InternFromString("send");
1740     }
1741     if (cid->end == CHANNEL_RECV) {
1742         return PyUnicode_InternFromString("recv");
1743     }
1744     return PyUnicode_InternFromString("both");
1745 }
1746 
1747 static int _channelid_end_send = CHANNEL_SEND;
1748 static int _channelid_end_recv = CHANNEL_RECV;
1749 
1750 static PyGetSetDef channelid_getsets[] = {
1751     {"end", (getter)channelid_end, NULL,
1752      PyDoc_STR("'send', 'recv', or 'both'")},
1753     {"send", (getter)channelid_end, NULL,
1754      PyDoc_STR("the 'send' end of the channel"), &_channelid_end_send},
1755     {"recv", (getter)channelid_end, NULL,
1756      PyDoc_STR("the 'recv' end of the channel"), &_channelid_end_recv},
1757     {NULL}
1758 };
1759 
1760 PyDoc_STRVAR(channelid_doc,
1761 "A channel ID identifies a channel and may be used as an int.");
1762 
1763 static PyTypeObject ChannelIDtype = {
1764     PyVarObject_HEAD_INIT(&PyType_Type, 0)
1765     "_xxsubinterpreters.ChannelID", /* tp_name */
1766     sizeof(channelid),              /* tp_basicsize */
1767     0,                              /* tp_itemsize */
1768     (destructor)channelid_dealloc,  /* tp_dealloc */
1769     0,                              /* tp_vectorcall_offset */
1770     0,                              /* tp_getattr */
1771     0,                              /* tp_setattr */
1772     0,                              /* tp_as_async */
1773     (reprfunc)channelid_repr,       /* tp_repr */
1774     &channelid_as_number,           /* tp_as_number */
1775     0,                              /* tp_as_sequence */
1776     0,                              /* tp_as_mapping */
1777     channelid_hash,                 /* tp_hash */
1778     0,                              /* tp_call */
1779     (reprfunc)channelid_str,        /* tp_str */
1780     0,                              /* tp_getattro */
1781     0,                              /* tp_setattro */
1782     0,                              /* tp_as_buffer */
1783     Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */
1784     channelid_doc,                  /* tp_doc */
1785     0,                              /* tp_traverse */
1786     0,                              /* tp_clear */
1787     channelid_richcompare,          /* tp_richcompare */
1788     0,                              /* tp_weaklistoffset */
1789     0,                              /* tp_iter */
1790     0,                              /* tp_iternext */
1791     0,                              /* tp_methods */
1792     0,                              /* tp_members */
1793     channelid_getsets,              /* tp_getset */
1794     0,                              /* tp_base */
1795     0,                              /* tp_dict */
1796     0,                              /* tp_descr_get */
1797     0,                              /* tp_descr_set */
1798     0,                              /* tp_dictoffset */
1799     0,                              /* tp_init */
1800     0,                              /* tp_alloc */
1801     // Note that we do not set tp_new to channelid_new.  Instead we
1802     // set it to NULL, meaning it cannot be instantiated from Python
1803     // code.  We do this because there is a strong relationship between
1804     // channel IDs and the channel lifecycle, so this limitation avoids
1805     // related complications.
1806     NULL,                           /* tp_new */
1807 };
1808 
1809 
1810 /* interpreter-specific code ************************************************/
1811 
1812 static PyObject * RunFailedError = NULL;
1813 
1814 static int
interp_exceptions_init(PyObject * ns)1815 interp_exceptions_init(PyObject *ns)
1816 {
1817     // XXX Move the exceptions into per-module memory?
1818 
1819     if (RunFailedError == NULL) {
1820         // An uncaught exception came out of interp_run_string().
1821         RunFailedError = PyErr_NewException("_xxsubinterpreters.RunFailedError",
1822                                             PyExc_RuntimeError, NULL);
1823         if (RunFailedError == NULL) {
1824             return -1;
1825         }
1826         if (PyDict_SetItemString(ns, "RunFailedError", RunFailedError) != 0) {
1827             return -1;
1828         }
1829     }
1830 
1831     return 0;
1832 }
1833 
1834 static int
_is_running(PyInterpreterState * interp)1835 _is_running(PyInterpreterState *interp)
1836 {
1837     PyThreadState *tstate = PyInterpreterState_ThreadHead(interp);
1838     if (PyThreadState_Next(tstate) != NULL) {
1839         PyErr_SetString(PyExc_RuntimeError,
1840                         "interpreter has more than one thread");
1841         return -1;
1842     }
1843 
1844     assert(!PyErr_Occurred());
1845     PyFrameObject *frame = PyThreadState_GetFrame(tstate);
1846     if (frame == NULL) {
1847         return 0;
1848     }
1849 
1850     int executing = (int)(frame->f_executing);
1851     Py_DECREF(frame);
1852 
1853     return executing;
1854 }
1855 
1856 static int
_ensure_not_running(PyInterpreterState * interp)1857 _ensure_not_running(PyInterpreterState *interp)
1858 {
1859     int is_running = _is_running(interp);
1860     if (is_running < 0) {
1861         return -1;
1862     }
1863     if (is_running) {
1864         PyErr_Format(PyExc_RuntimeError, "interpreter already running");
1865         return -1;
1866     }
1867     return 0;
1868 }
1869 
1870 static int
_run_script(PyInterpreterState * interp,const char * codestr,_sharedns * shared,_sharedexception ** exc)1871 _run_script(PyInterpreterState *interp, const char *codestr,
1872             _sharedns *shared, _sharedexception **exc)
1873 {
1874     PyObject *exctype = NULL;
1875     PyObject *excval = NULL;
1876     PyObject *tb = NULL;
1877 
1878     PyObject *main_mod = _PyInterpreterState_GetMainModule(interp);
1879     if (main_mod == NULL) {
1880         goto error;
1881     }
1882     PyObject *ns = PyModule_GetDict(main_mod);  // borrowed
1883     Py_DECREF(main_mod);
1884     if (ns == NULL) {
1885         goto error;
1886     }
1887     Py_INCREF(ns);
1888 
1889     // Apply the cross-interpreter data.
1890     if (shared != NULL) {
1891         if (_sharedns_apply(shared, ns) != 0) {
1892             Py_DECREF(ns);
1893             goto error;
1894         }
1895     }
1896 
1897     // Run the string (see PyRun_SimpleStringFlags).
1898     PyObject *result = PyRun_StringFlags(codestr, Py_file_input, ns, ns, NULL);
1899     Py_DECREF(ns);
1900     if (result == NULL) {
1901         goto error;
1902     }
1903     else {
1904         Py_DECREF(result);  // We throw away the result.
1905     }
1906 
1907     *exc = NULL;
1908     return 0;
1909 
1910 error:
1911     PyErr_Fetch(&exctype, &excval, &tb);
1912 
1913     _sharedexception *sharedexc = _sharedexception_bind(exctype, excval, tb);
1914     Py_XDECREF(exctype);
1915     Py_XDECREF(excval);
1916     Py_XDECREF(tb);
1917     if (sharedexc == NULL) {
1918         fprintf(stderr, "RunFailedError: script raised an uncaught exception");
1919         PyErr_Clear();
1920         sharedexc = NULL;
1921     }
1922     else {
1923         assert(!PyErr_Occurred());
1924     }
1925     *exc = sharedexc;
1926     return -1;
1927 }
1928 
1929 static int
_run_script_in_interpreter(PyInterpreterState * interp,const char * codestr,PyObject * shareables)1930 _run_script_in_interpreter(PyInterpreterState *interp, const char *codestr,
1931                            PyObject *shareables)
1932 {
1933     if (_ensure_not_running(interp) < 0) {
1934         return -1;
1935     }
1936 
1937     _sharedns *shared = _get_shared_ns(shareables);
1938     if (shared == NULL && PyErr_Occurred()) {
1939         return -1;
1940     }
1941 
1942     // Switch to interpreter.
1943     PyThreadState *save_tstate = NULL;
1944     if (interp != PyInterpreterState_Get()) {
1945         // XXX Using the "head" thread isn't strictly correct.
1946         PyThreadState *tstate = PyInterpreterState_ThreadHead(interp);
1947         // XXX Possible GILState issues?
1948         save_tstate = PyThreadState_Swap(tstate);
1949     }
1950 
1951     // Run the script.
1952     _sharedexception *exc = NULL;
1953     int result = _run_script(interp, codestr, shared, &exc);
1954 
1955     // Switch back.
1956     if (save_tstate != NULL) {
1957         PyThreadState_Swap(save_tstate);
1958     }
1959 
1960     // Propagate any exception out to the caller.
1961     if (exc != NULL) {
1962         _sharedexception_apply(exc, RunFailedError);
1963         _sharedexception_free(exc);
1964     }
1965     else if (result != 0) {
1966         // We were unable to allocate a shared exception.
1967         PyErr_NoMemory();
1968     }
1969 
1970     if (shared != NULL) {
1971         _sharedns_free(shared);
1972     }
1973 
1974     return result;
1975 }
1976 
1977 
1978 /* module level code ********************************************************/
1979 
1980 /* globals is the process-global state for the module.  It holds all
1981    the data that we need to share between interpreters, so it cannot
1982    hold PyObject values. */
1983 static struct globals {
1984     _channels channels;
1985 } _globals = {{0}};
1986 
1987 static int
_init_globals(void)1988 _init_globals(void)
1989 {
1990     if (_channels_init(&_globals.channels) != 0) {
1991         return -1;
1992     }
1993     return 0;
1994 }
1995 
1996 static _channels *
_global_channels(void)1997 _global_channels(void) {
1998     return &_globals.channels;
1999 }
2000 
2001 static PyObject *
interp_create(PyObject * self,PyObject * args,PyObject * kwds)2002 interp_create(PyObject *self, PyObject *args, PyObject *kwds)
2003 {
2004 
2005     static char *kwlist[] = {"isolated", NULL};
2006     int isolated = 1;
2007     if (!PyArg_ParseTupleAndKeywords(args, kwds, "|$i:create", kwlist,
2008                                      &isolated)) {
2009         return NULL;
2010     }
2011 
2012     // Create and initialize the new interpreter.
2013     PyThreadState *save_tstate = PyThreadState_Swap(NULL);
2014     // XXX Possible GILState issues?
2015     PyThreadState *tstate = _Py_NewInterpreter(isolated);
2016     PyThreadState_Swap(save_tstate);
2017     if (tstate == NULL) {
2018         /* Since no new thread state was created, there is no exception to
2019            propagate; raise a fresh one after swapping in the old thread
2020            state. */
2021         PyErr_SetString(PyExc_RuntimeError, "interpreter creation failed");
2022         return NULL;
2023     }
2024     PyInterpreterState *interp = PyThreadState_GetInterpreter(tstate);
2025     PyObject *idobj = _PyInterpreterState_GetIDObject(interp);
2026     if (idobj == NULL) {
2027         // XXX Possible GILState issues?
2028         save_tstate = PyThreadState_Swap(tstate);
2029         Py_EndInterpreter(tstate);
2030         PyThreadState_Swap(save_tstate);
2031         return NULL;
2032     }
2033     _PyInterpreterState_RequireIDRef(interp, 1);
2034     return idobj;
2035 }
2036 
2037 PyDoc_STRVAR(create_doc,
2038 "create() -> ID\n\
2039 \n\
2040 Create a new interpreter and return a unique generated ID.");
2041 
2042 
2043 static PyObject *
interp_destroy(PyObject * self,PyObject * args,PyObject * kwds)2044 interp_destroy(PyObject *self, PyObject *args, PyObject *kwds)
2045 {
2046     static char *kwlist[] = {"id", NULL};
2047     PyObject *id;
2048     // XXX Use "L" for id?
2049     if (!PyArg_ParseTupleAndKeywords(args, kwds,
2050                                      "O:destroy", kwlist, &id)) {
2051         return NULL;
2052     }
2053 
2054     // Look up the interpreter.
2055     PyInterpreterState *interp = _PyInterpreterID_LookUp(id);
2056     if (interp == NULL) {
2057         return NULL;
2058     }
2059 
2060     // Ensure we don't try to destroy the current interpreter.
2061     PyInterpreterState *current = _get_current();
2062     if (current == NULL) {
2063         return NULL;
2064     }
2065     if (interp == current) {
2066         PyErr_SetString(PyExc_RuntimeError,
2067                         "cannot destroy the current interpreter");
2068         return NULL;
2069     }
2070 
2071     // Ensure the interpreter isn't running.
2072     /* XXX We *could* support destroying a running interpreter but
2073        aren't going to worry about it for now. */
2074     if (_ensure_not_running(interp) < 0) {
2075         return NULL;
2076     }
2077 
2078     // Destroy the interpreter.
2079     PyThreadState *tstate = PyInterpreterState_ThreadHead(interp);
2080     // XXX Possible GILState issues?
2081     PyThreadState *save_tstate = PyThreadState_Swap(tstate);
2082     Py_EndInterpreter(tstate);
2083     PyThreadState_Swap(save_tstate);
2084 
2085     Py_RETURN_NONE;
2086 }
2087 
2088 PyDoc_STRVAR(destroy_doc,
2089 "destroy(id)\n\
2090 \n\
2091 Destroy the identified interpreter.\n\
2092 \n\
2093 Attempting to destroy the current interpreter results in a RuntimeError.\n\
2094 So does an unrecognized ID.");
2095 
2096 
2097 static PyObject *
interp_list_all(PyObject * self,PyObject * Py_UNUSED (ignored))2098 interp_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
2099 {
2100     PyObject *ids, *id;
2101     PyInterpreterState *interp;
2102 
2103     ids = PyList_New(0);
2104     if (ids == NULL) {
2105         return NULL;
2106     }
2107 
2108     interp = PyInterpreterState_Head();
2109     while (interp != NULL) {
2110         id = _PyInterpreterState_GetIDObject(interp);
2111         if (id == NULL) {
2112             Py_DECREF(ids);
2113             return NULL;
2114         }
2115         // insert at front of list
2116         int res = PyList_Insert(ids, 0, id);
2117         Py_DECREF(id);
2118         if (res < 0) {
2119             Py_DECREF(ids);
2120             return NULL;
2121         }
2122 
2123         interp = PyInterpreterState_Next(interp);
2124     }
2125 
2126     return ids;
2127 }
2128 
2129 PyDoc_STRVAR(list_all_doc,
2130 "list_all() -> [ID]\n\
2131 \n\
2132 Return a list containing the ID of every existing interpreter.");
2133 
2134 
2135 static PyObject *
interp_get_current(PyObject * self,PyObject * Py_UNUSED (ignored))2136 interp_get_current(PyObject *self, PyObject *Py_UNUSED(ignored))
2137 {
2138     PyInterpreterState *interp =_get_current();
2139     if (interp == NULL) {
2140         return NULL;
2141     }
2142     return _PyInterpreterState_GetIDObject(interp);
2143 }
2144 
2145 PyDoc_STRVAR(get_current_doc,
2146 "get_current() -> ID\n\
2147 \n\
2148 Return the ID of current interpreter.");
2149 
2150 
2151 static PyObject *
interp_get_main(PyObject * self,PyObject * Py_UNUSED (ignored))2152 interp_get_main(PyObject *self, PyObject *Py_UNUSED(ignored))
2153 {
2154     // Currently, 0 is always the main interpreter.
2155     int64_t id = 0;
2156     return _PyInterpreterID_New(id);
2157 }
2158 
2159 PyDoc_STRVAR(get_main_doc,
2160 "get_main() -> ID\n\
2161 \n\
2162 Return the ID of main interpreter.");
2163 
2164 
2165 static PyObject *
interp_run_string(PyObject * self,PyObject * args,PyObject * kwds)2166 interp_run_string(PyObject *self, PyObject *args, PyObject *kwds)
2167 {
2168     static char *kwlist[] = {"id", "script", "shared", NULL};
2169     PyObject *id, *code;
2170     PyObject *shared = NULL;
2171     if (!PyArg_ParseTupleAndKeywords(args, kwds,
2172                                      "OU|O:run_string", kwlist,
2173                                      &id, &code, &shared)) {
2174         return NULL;
2175     }
2176 
2177     // Look up the interpreter.
2178     PyInterpreterState *interp = _PyInterpreterID_LookUp(id);
2179     if (interp == NULL) {
2180         return NULL;
2181     }
2182 
2183     // Extract code.
2184     Py_ssize_t size;
2185     const char *codestr = PyUnicode_AsUTF8AndSize(code, &size);
2186     if (codestr == NULL) {
2187         return NULL;
2188     }
2189     if (strlen(codestr) != (size_t)size) {
2190         PyErr_SetString(PyExc_ValueError,
2191                         "source code string cannot contain null bytes");
2192         return NULL;
2193     }
2194 
2195     // Run the code in the interpreter.
2196     if (_run_script_in_interpreter(interp, codestr, shared) != 0) {
2197         return NULL;
2198     }
2199     Py_RETURN_NONE;
2200 }
2201 
2202 PyDoc_STRVAR(run_string_doc,
2203 "run_string(id, script, shared)\n\
2204 \n\
2205 Execute the provided string in the identified interpreter.\n\
2206 \n\
2207 See PyRun_SimpleStrings.");
2208 
2209 
2210 static PyObject *
object_is_shareable(PyObject * self,PyObject * args,PyObject * kwds)2211 object_is_shareable(PyObject *self, PyObject *args, PyObject *kwds)
2212 {
2213     static char *kwlist[] = {"obj", NULL};
2214     PyObject *obj;
2215     if (!PyArg_ParseTupleAndKeywords(args, kwds,
2216                                      "O:is_shareable", kwlist, &obj)) {
2217         return NULL;
2218     }
2219 
2220     if (_PyObject_CheckCrossInterpreterData(obj) == 0) {
2221         Py_RETURN_TRUE;
2222     }
2223     PyErr_Clear();
2224     Py_RETURN_FALSE;
2225 }
2226 
2227 PyDoc_STRVAR(is_shareable_doc,
2228 "is_shareable(obj) -> bool\n\
2229 \n\
2230 Return True if the object's data may be shared between interpreters and\n\
2231 False otherwise.");
2232 
2233 
2234 static PyObject *
interp_is_running(PyObject * self,PyObject * args,PyObject * kwds)2235 interp_is_running(PyObject *self, PyObject *args, PyObject *kwds)
2236 {
2237     static char *kwlist[] = {"id", NULL};
2238     PyObject *id;
2239     if (!PyArg_ParseTupleAndKeywords(args, kwds,
2240                                      "O:is_running", kwlist, &id)) {
2241         return NULL;
2242     }
2243 
2244     PyInterpreterState *interp = _PyInterpreterID_LookUp(id);
2245     if (interp == NULL) {
2246         return NULL;
2247     }
2248     int is_running = _is_running(interp);
2249     if (is_running < 0) {
2250         return NULL;
2251     }
2252     if (is_running) {
2253         Py_RETURN_TRUE;
2254     }
2255     Py_RETURN_FALSE;
2256 }
2257 
2258 PyDoc_STRVAR(is_running_doc,
2259 "is_running(id) -> bool\n\
2260 \n\
2261 Return whether or not the identified interpreter is running.");
2262 
2263 static PyObject *
channel_create(PyObject * self,PyObject * Py_UNUSED (ignored))2264 channel_create(PyObject *self, PyObject *Py_UNUSED(ignored))
2265 {
2266     int64_t cid = _channel_create(&_globals.channels);
2267     if (cid < 0) {
2268         return NULL;
2269     }
2270     PyObject *id = (PyObject *)newchannelid(&ChannelIDtype, cid, 0,
2271                                             &_globals.channels, 0, 0);
2272     if (id == NULL) {
2273         if (_channel_destroy(&_globals.channels, cid) != 0) {
2274             // XXX issue a warning?
2275         }
2276         return NULL;
2277     }
2278     assert(((channelid *)id)->channels != NULL);
2279     return id;
2280 }
2281 
2282 PyDoc_STRVAR(channel_create_doc,
2283 "channel_create() -> cid\n\
2284 \n\
2285 Create a new cross-interpreter channel and return a unique generated ID.");
2286 
2287 static PyObject *
channel_destroy(PyObject * self,PyObject * args,PyObject * kwds)2288 channel_destroy(PyObject *self, PyObject *args, PyObject *kwds)
2289 {
2290     static char *kwlist[] = {"cid", NULL};
2291     int64_t cid;
2292     if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&:channel_destroy", kwlist,
2293                                      channel_id_converter, &cid)) {
2294         return NULL;
2295     }
2296 
2297     if (_channel_destroy(&_globals.channels, cid) != 0) {
2298         return NULL;
2299     }
2300     Py_RETURN_NONE;
2301 }
2302 
2303 PyDoc_STRVAR(channel_destroy_doc,
2304 "channel_destroy(cid)\n\
2305 \n\
2306 Close and finalize the channel.  Afterward attempts to use the channel\n\
2307 will behave as though it never existed.");
2308 
2309 static PyObject *
channel_list_all(PyObject * self,PyObject * Py_UNUSED (ignored))2310 channel_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
2311 {
2312     int64_t count = 0;
2313     int64_t *cids = _channels_list_all(&_globals.channels, &count);
2314     if (cids == NULL) {
2315         if (count == 0) {
2316             return PyList_New(0);
2317         }
2318         return NULL;
2319     }
2320     PyObject *ids = PyList_New((Py_ssize_t)count);
2321     if (ids == NULL) {
2322         goto finally;
2323     }
2324     int64_t *cur = cids;
2325     for (int64_t i=0; i < count; cur++, i++) {
2326         PyObject *id = (PyObject *)newchannelid(&ChannelIDtype, *cur, 0,
2327                                                 &_globals.channels, 0, 0);
2328         if (id == NULL) {
2329             Py_DECREF(ids);
2330             ids = NULL;
2331             break;
2332         }
2333         PyList_SET_ITEM(ids, i, id);
2334     }
2335 
2336 finally:
2337     PyMem_Free(cids);
2338     return ids;
2339 }
2340 
2341 PyDoc_STRVAR(channel_list_all_doc,
2342 "channel_list_all() -> [cid]\n\
2343 \n\
2344 Return the list of all IDs for active channels.");
2345 
2346 static PyObject *
channel_list_interpreters(PyObject * self,PyObject * args,PyObject * kwds)2347 channel_list_interpreters(PyObject *self, PyObject *args, PyObject *kwds)
2348 {
2349     static char *kwlist[] = {"cid", "send", NULL};
2350     int64_t cid;            /* Channel ID */
2351     int send = 0;           /* Send or receive end? */
2352     int64_t id;
2353     PyObject *ids, *id_obj;
2354     PyInterpreterState *interp;
2355 
2356     if (!PyArg_ParseTupleAndKeywords(
2357             args, kwds, "O&$p:channel_list_interpreters",
2358             kwlist, channel_id_converter, &cid, &send)) {
2359         return NULL;
2360     }
2361 
2362     ids = PyList_New(0);
2363     if (ids == NULL) {
2364         goto except;
2365     }
2366 
2367     interp = PyInterpreterState_Head();
2368     while (interp != NULL) {
2369         id = PyInterpreterState_GetID(interp);
2370         assert(id >= 0);
2371         int res = _channel_is_associated(&_globals.channels, cid, id, send);
2372         if (res < 0) {
2373             goto except;
2374         }
2375         if (res) {
2376             id_obj = _PyInterpreterState_GetIDObject(interp);
2377             if (id_obj == NULL) {
2378                 goto except;
2379             }
2380             res = PyList_Insert(ids, 0, id_obj);
2381             Py_DECREF(id_obj);
2382             if (res < 0) {
2383                 goto except;
2384             }
2385         }
2386         interp = PyInterpreterState_Next(interp);
2387     }
2388 
2389     goto finally;
2390 
2391 except:
2392     Py_XDECREF(ids);
2393     ids = NULL;
2394 
2395 finally:
2396     return ids;
2397 }
2398 
2399 PyDoc_STRVAR(channel_list_interpreters_doc,
2400 "channel_list_interpreters(cid, *, send) -> [id]\n\
2401 \n\
2402 Return the list of all interpreter IDs associated with an end of the channel.\n\
2403 \n\
2404 The 'send' argument should be a boolean indicating whether to use the send or\n\
2405 receive end.");
2406 
2407 
2408 static PyObject *
channel_send(PyObject * self,PyObject * args,PyObject * kwds)2409 channel_send(PyObject *self, PyObject *args, PyObject *kwds)
2410 {
2411     static char *kwlist[] = {"cid", "obj", NULL};
2412     int64_t cid;
2413     PyObject *obj;
2414     if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O:channel_send", kwlist,
2415                                      channel_id_converter, &cid, &obj)) {
2416         return NULL;
2417     }
2418 
2419     if (_channel_send(&_globals.channels, cid, obj) != 0) {
2420         return NULL;
2421     }
2422     Py_RETURN_NONE;
2423 }
2424 
2425 PyDoc_STRVAR(channel_send_doc,
2426 "channel_send(cid, obj)\n\
2427 \n\
2428 Add the object's data to the channel's queue.");
2429 
2430 static PyObject *
channel_recv(PyObject * self,PyObject * args,PyObject * kwds)2431 channel_recv(PyObject *self, PyObject *args, PyObject *kwds)
2432 {
2433     static char *kwlist[] = {"cid", "default", NULL};
2434     int64_t cid;
2435     PyObject *dflt = NULL;
2436     if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&|O:channel_recv", kwlist,
2437                                      channel_id_converter, &cid, &dflt)) {
2438         return NULL;
2439     }
2440     Py_XINCREF(dflt);
2441 
2442     PyObject *obj = _channel_recv(&_globals.channels, cid);
2443     if (obj != NULL) {
2444         Py_XDECREF(dflt);
2445         return obj;
2446     } else if (PyErr_Occurred()) {
2447         Py_XDECREF(dflt);
2448         return NULL;
2449     } else if (dflt != NULL) {
2450         return dflt;
2451     } else {
2452         PyErr_Format(ChannelEmptyError, "channel %" PRId64 " is empty", cid);
2453         return NULL;
2454     }
2455 }
2456 
2457 PyDoc_STRVAR(channel_recv_doc,
2458 "channel_recv(cid, [default]) -> obj\n\
2459 \n\
2460 Return a new object from the data at the front of the channel's queue.\n\
2461 \n\
2462 If there is nothing to receive then raise ChannelEmptyError, unless\n\
2463 a default value is provided.  In that case return it.");
2464 
2465 static PyObject *
channel_close(PyObject * self,PyObject * args,PyObject * kwds)2466 channel_close(PyObject *self, PyObject *args, PyObject *kwds)
2467 {
2468     static char *kwlist[] = {"cid", "send", "recv", "force", NULL};
2469     int64_t cid;
2470     int send = 0;
2471     int recv = 0;
2472     int force = 0;
2473     if (!PyArg_ParseTupleAndKeywords(args, kwds,
2474                                      "O&|$ppp:channel_close", kwlist,
2475                                      channel_id_converter, &cid, &send, &recv, &force)) {
2476         return NULL;
2477     }
2478 
2479     if (_channel_close(&_globals.channels, cid, send-recv, force) != 0) {
2480         return NULL;
2481     }
2482     Py_RETURN_NONE;
2483 }
2484 
2485 PyDoc_STRVAR(channel_close_doc,
2486 "channel_close(cid, *, send=None, recv=None, force=False)\n\
2487 \n\
2488 Close the channel for all interpreters.\n\
2489 \n\
2490 If the channel is empty then the keyword args are ignored and both\n\
2491 ends are immediately closed.  Otherwise, if 'force' is True then\n\
2492 all queued items are released and both ends are immediately\n\
2493 closed.\n\
2494 \n\
2495 If the channel is not empty *and* 'force' is False then following\n\
2496 happens:\n\
2497 \n\
2498  * recv is True (regardless of send):\n\
2499    - raise ChannelNotEmptyError\n\
2500  * recv is None and send is None:\n\
2501    - raise ChannelNotEmptyError\n\
2502  * send is True and recv is not True:\n\
2503    - fully close the 'send' end\n\
2504    - close the 'recv' end to interpreters not already receiving\n\
2505    - fully close it once empty\n\
2506 \n\
2507 Closing an already closed channel results in a ChannelClosedError.\n\
2508 \n\
2509 Once the channel's ID has no more ref counts in any interpreter\n\
2510 the channel will be destroyed.");
2511 
2512 static PyObject *
channel_release(PyObject * self,PyObject * args,PyObject * kwds)2513 channel_release(PyObject *self, PyObject *args, PyObject *kwds)
2514 {
2515     // Note that only the current interpreter is affected.
2516     static char *kwlist[] = {"cid", "send", "recv", "force", NULL};
2517     int64_t cid;
2518     int send = 0;
2519     int recv = 0;
2520     int force = 0;
2521     if (!PyArg_ParseTupleAndKeywords(args, kwds,
2522                                      "O&|$ppp:channel_release", kwlist,
2523                                      channel_id_converter, &cid, &send, &recv, &force)) {
2524         return NULL;
2525     }
2526     if (send == 0 && recv == 0) {
2527         send = 1;
2528         recv = 1;
2529     }
2530 
2531     // XXX Handle force is True.
2532     // XXX Fix implicit release.
2533 
2534     if (_channel_drop(&_globals.channels, cid, send, recv) != 0) {
2535         return NULL;
2536     }
2537     Py_RETURN_NONE;
2538 }
2539 
2540 PyDoc_STRVAR(channel_release_doc,
2541 "channel_release(cid, *, send=None, recv=None, force=True)\n\
2542 \n\
2543 Close the channel for the current interpreter.  'send' and 'recv'\n\
2544 (bool) may be used to indicate the ends to close.  By default both\n\
2545 ends are closed.  Closing an already closed end is a noop.");
2546 
2547 static PyObject *
channel__channel_id(PyObject * self,PyObject * args,PyObject * kwds)2548 channel__channel_id(PyObject *self, PyObject *args, PyObject *kwds)
2549 {
2550     return channelid_new(&ChannelIDtype, args, kwds);
2551 }
2552 
2553 static PyMethodDef module_functions[] = {
2554     {"create",                    (PyCFunction)(void(*)(void))interp_create,
2555      METH_VARARGS | METH_KEYWORDS, create_doc},
2556     {"destroy",                   (PyCFunction)(void(*)(void))interp_destroy,
2557      METH_VARARGS | METH_KEYWORDS, destroy_doc},
2558     {"list_all",                  interp_list_all,
2559      METH_NOARGS, list_all_doc},
2560     {"get_current",               interp_get_current,
2561      METH_NOARGS, get_current_doc},
2562     {"get_main",                  interp_get_main,
2563      METH_NOARGS, get_main_doc},
2564     {"is_running",                (PyCFunction)(void(*)(void))interp_is_running,
2565      METH_VARARGS | METH_KEYWORDS, is_running_doc},
2566     {"run_string",                (PyCFunction)(void(*)(void))interp_run_string,
2567      METH_VARARGS | METH_KEYWORDS, run_string_doc},
2568 
2569     {"is_shareable",              (PyCFunction)(void(*)(void))object_is_shareable,
2570      METH_VARARGS | METH_KEYWORDS, is_shareable_doc},
2571 
2572     {"channel_create",            channel_create,
2573      METH_NOARGS, channel_create_doc},
2574     {"channel_destroy",           (PyCFunction)(void(*)(void))channel_destroy,
2575      METH_VARARGS | METH_KEYWORDS, channel_destroy_doc},
2576     {"channel_list_all",          channel_list_all,
2577      METH_NOARGS, channel_list_all_doc},
2578     {"channel_list_interpreters", (PyCFunction)(void(*)(void))channel_list_interpreters,
2579      METH_VARARGS | METH_KEYWORDS, channel_list_interpreters_doc},
2580     {"channel_send",              (PyCFunction)(void(*)(void))channel_send,
2581      METH_VARARGS | METH_KEYWORDS, channel_send_doc},
2582     {"channel_recv",              (PyCFunction)(void(*)(void))channel_recv,
2583      METH_VARARGS | METH_KEYWORDS, channel_recv_doc},
2584     {"channel_close",             (PyCFunction)(void(*)(void))channel_close,
2585      METH_VARARGS | METH_KEYWORDS, channel_close_doc},
2586     {"channel_release",           (PyCFunction)(void(*)(void))channel_release,
2587      METH_VARARGS | METH_KEYWORDS, channel_release_doc},
2588     {"_channel_id",               (PyCFunction)(void(*)(void))channel__channel_id,
2589      METH_VARARGS | METH_KEYWORDS, NULL},
2590 
2591     {NULL,                        NULL}           /* sentinel */
2592 };
2593 
2594 
2595 /* initialization function */
2596 
2597 PyDoc_STRVAR(module_doc,
2598 "This module provides primitive operations to manage Python interpreters.\n\
2599 The 'interpreters' module provides a more convenient interface.");
2600 
2601 static struct PyModuleDef interpretersmodule = {
2602     PyModuleDef_HEAD_INIT,
2603     "_xxsubinterpreters",  /* m_name */
2604     module_doc,            /* m_doc */
2605     -1,                    /* m_size */
2606     module_functions,      /* m_methods */
2607     NULL,                  /* m_slots */
2608     NULL,                  /* m_traverse */
2609     NULL,                  /* m_clear */
2610     NULL                   /* m_free */
2611 };
2612 
2613 
2614 PyMODINIT_FUNC
PyInit__xxsubinterpreters(void)2615 PyInit__xxsubinterpreters(void)
2616 {
2617     if (_init_globals() != 0) {
2618         return NULL;
2619     }
2620 
2621     /* Initialize types */
2622     if (PyType_Ready(&ChannelIDtype) != 0) {
2623         return NULL;
2624     }
2625 
2626     /* Create the module */
2627     PyObject *module = PyModule_Create(&interpretersmodule);
2628     if (module == NULL) {
2629         return NULL;
2630     }
2631 
2632     /* Add exception types */
2633     PyObject *ns = PyModule_GetDict(module);  // borrowed
2634     if (interp_exceptions_init(ns) != 0) {
2635         return NULL;
2636     }
2637     if (channel_exceptions_init(ns) != 0) {
2638         return NULL;
2639     }
2640 
2641     /* Add other types */
2642     Py_INCREF(&ChannelIDtype);
2643     if (PyDict_SetItemString(ns, "ChannelID", (PyObject *)&ChannelIDtype) != 0) {
2644         return NULL;
2645     }
2646     Py_INCREF(&_PyInterpreterID_Type);
2647     if (PyDict_SetItemString(ns, "InterpreterID", (PyObject *)&_PyInterpreterID_Type) != 0) {
2648         return NULL;
2649     }
2650 
2651     if (_PyCrossInterpreterData_RegisterClass(&ChannelIDtype, _channelid_shared)) {
2652         return NULL;
2653     }
2654 
2655     return module;
2656 }
2657