1
2 /* interpreters module */
3 /* low-level access to interpreter primitives */
4
5 #include "Python.h"
6 #include "frameobject.h"
7 #include "interpreteridobject.h"
8
9
10 static char *
_copy_raw_string(PyObject * strobj)11 _copy_raw_string(PyObject *strobj)
12 {
13 const char *str = PyUnicode_AsUTF8(strobj);
14 if (str == NULL) {
15 return NULL;
16 }
17 char *copied = PyMem_Malloc(strlen(str)+1);
18 if (copied == NULL) {
19 PyErr_NoMemory();
20 return NULL;
21 }
22 strcpy(copied, str);
23 return copied;
24 }
25
26 static PyInterpreterState *
_get_current(void)27 _get_current(void)
28 {
29 // _PyInterpreterState_Get() aborts if lookup fails, so don't need
30 // to check the result for NULL.
31 return _PyInterpreterState_Get();
32 }
33
34
35 /* data-sharing-specific code ***********************************************/
36
// One entry of a namespace being shared between interpreters.
struct _sharednsitem {
    // Heap copy of the key (made with _copy_raw_string() in
    // _sharednsitem_init(); released with PyMem_Free()).
    char *name;
    // Cross-interpreter representation of the value.
    _PyCrossInterpreterData data;
};

// Declared early because _sharednsitem_init() uses it on its error path.
static void _sharednsitem_clear(struct _sharednsitem *);  // forward
43
44 static int
_sharednsitem_init(struct _sharednsitem * item,PyObject * key,PyObject * value)45 _sharednsitem_init(struct _sharednsitem *item, PyObject *key, PyObject *value)
46 {
47 item->name = _copy_raw_string(key);
48 if (item->name == NULL) {
49 return -1;
50 }
51 if (_PyObject_GetCrossInterpreterData(value, &item->data) != 0) {
52 _sharednsitem_clear(item);
53 return -1;
54 }
55 return 0;
56 }
57
58 static void
_sharednsitem_clear(struct _sharednsitem * item)59 _sharednsitem_clear(struct _sharednsitem *item)
60 {
61 if (item->name != NULL) {
62 PyMem_Free(item->name);
63 item->name = NULL;
64 }
65 _PyCrossInterpreterData_Release(&item->data);
66 }
67
68 static int
_sharednsitem_apply(struct _sharednsitem * item,PyObject * ns)69 _sharednsitem_apply(struct _sharednsitem *item, PyObject *ns)
70 {
71 PyObject *name = PyUnicode_FromString(item->name);
72 if (name == NULL) {
73 return -1;
74 }
75 PyObject *value = _PyCrossInterpreterData_NewObject(&item->data);
76 if (value == NULL) {
77 Py_DECREF(name);
78 return -1;
79 }
80 int res = PyDict_SetItem(ns, name, value);
81 Py_DECREF(name);
82 Py_DECREF(value);
83 return res;
84 }
85
// A namespace prepared for sharing: an array of `len` items.
typedef struct _sharedns {
    Py_ssize_t len;                // number of entries in `items`
    struct _sharednsitem* items;   // array allocated by _sharedns_new()
} _sharedns;
90
91 static _sharedns *
_sharedns_new(Py_ssize_t len)92 _sharedns_new(Py_ssize_t len)
93 {
94 _sharedns *shared = PyMem_NEW(_sharedns, 1);
95 if (shared == NULL) {
96 PyErr_NoMemory();
97 return NULL;
98 }
99 shared->len = len;
100 shared->items = PyMem_NEW(struct _sharednsitem, len);
101 if (shared->items == NULL) {
102 PyErr_NoMemory();
103 PyMem_Free(shared);
104 return NULL;
105 }
106 return shared;
107 }
108
109 static void
_sharedns_free(_sharedns * shared)110 _sharedns_free(_sharedns *shared)
111 {
112 for (Py_ssize_t i=0; i < shared->len; i++) {
113 _sharednsitem_clear(&shared->items[i]);
114 }
115 PyMem_Free(shared->items);
116 PyMem_Free(shared);
117 }
118
119 static _sharedns *
_get_shared_ns(PyObject * shareable)120 _get_shared_ns(PyObject *shareable)
121 {
122 if (shareable == NULL || shareable == Py_None) {
123 return NULL;
124 }
125 Py_ssize_t len = PyDict_Size(shareable);
126 if (len == 0) {
127 return NULL;
128 }
129
130 _sharedns *shared = _sharedns_new(len);
131 if (shared == NULL) {
132 return NULL;
133 }
134 Py_ssize_t pos = 0;
135 for (Py_ssize_t i=0; i < len; i++) {
136 PyObject *key, *value;
137 if (PyDict_Next(shareable, &pos, &key, &value) == 0) {
138 break;
139 }
140 if (_sharednsitem_init(&shared->items[i], key, value) != 0) {
141 break;
142 }
143 }
144 if (PyErr_Occurred()) {
145 _sharedns_free(shared);
146 return NULL;
147 }
148 return shared;
149 }
150
151 static int
_sharedns_apply(_sharedns * shared,PyObject * ns)152 _sharedns_apply(_sharedns *shared, PyObject *ns)
153 {
154 for (Py_ssize_t i=0; i < shared->len; i++) {
155 if (_sharednsitem_apply(&shared->items[i], ns) != 0) {
156 return -1;
157 }
158 }
159 return 0;
160 }
161
162 // Ultimately we'd like to preserve enough information about the
163 // exception and traceback that we could re-constitute (or at least
164 // simulate, a la traceback.TracebackException), and even chain, a copy
165 // of the exception in the calling interpreter.
166
// Minimal, interpreter-independent description of an exception.
typedef struct _sharedexception {
    char *name;  // text of the exception type ("%S" of the type), or NULL
    char *msg;   // text of the exception value ("%S" of it), or NULL
} _sharedexception;
171
172 static _sharedexception *
_sharedexception_new(void)173 _sharedexception_new(void)
174 {
175 _sharedexception *err = PyMem_NEW(_sharedexception, 1);
176 if (err == NULL) {
177 PyErr_NoMemory();
178 return NULL;
179 }
180 err->name = NULL;
181 err->msg = NULL;
182 return err;
183 }
184
185 static void
_sharedexception_clear(_sharedexception * exc)186 _sharedexception_clear(_sharedexception *exc)
187 {
188 if (exc->name != NULL) {
189 PyMem_Free(exc->name);
190 }
191 if (exc->msg != NULL) {
192 PyMem_Free(exc->msg);
193 }
194 }
195
196 static void
_sharedexception_free(_sharedexception * exc)197 _sharedexception_free(_sharedexception *exc)
198 {
199 _sharedexception_clear(exc);
200 PyMem_Free(exc);
201 }
202
203 static _sharedexception *
_sharedexception_bind(PyObject * exctype,PyObject * exc,PyObject * tb)204 _sharedexception_bind(PyObject *exctype, PyObject *exc, PyObject *tb)
205 {
206 assert(exctype != NULL);
207 char *failure = NULL;
208
209 _sharedexception *err = _sharedexception_new();
210 if (err == NULL) {
211 goto finally;
212 }
213
214 PyObject *name = PyUnicode_FromFormat("%S", exctype);
215 if (name == NULL) {
216 failure = "unable to format exception type name";
217 goto finally;
218 }
219 err->name = _copy_raw_string(name);
220 Py_DECREF(name);
221 if (err->name == NULL) {
222 if (PyErr_ExceptionMatches(PyExc_MemoryError)) {
223 failure = "out of memory copying exception type name";
224 }
225 failure = "unable to encode and copy exception type name";
226 goto finally;
227 }
228
229 if (exc != NULL) {
230 PyObject *msg = PyUnicode_FromFormat("%S", exc);
231 if (msg == NULL) {
232 failure = "unable to format exception message";
233 goto finally;
234 }
235 err->msg = _copy_raw_string(msg);
236 Py_DECREF(msg);
237 if (err->msg == NULL) {
238 if (PyErr_ExceptionMatches(PyExc_MemoryError)) {
239 failure = "out of memory copying exception message";
240 }
241 failure = "unable to encode and copy exception message";
242 goto finally;
243 }
244 }
245
246 finally:
247 if (failure != NULL) {
248 PyErr_Clear();
249 if (err->name != NULL) {
250 PyMem_Free(err->name);
251 err->name = NULL;
252 }
253 err->msg = failure;
254 }
255 return err;
256 }
257
258 static void
_sharedexception_apply(_sharedexception * exc,PyObject * wrapperclass)259 _sharedexception_apply(_sharedexception *exc, PyObject *wrapperclass)
260 {
261 if (exc->name != NULL) {
262 if (exc->msg != NULL) {
263 PyErr_Format(wrapperclass, "%s: %s", exc->name, exc->msg);
264 }
265 else {
266 PyErr_SetString(wrapperclass, exc->name);
267 }
268 }
269 else if (exc->msg != NULL) {
270 PyErr_SetString(wrapperclass, exc->msg);
271 }
272 else {
273 PyErr_SetNone(wrapperclass);
274 }
275 }
276
277
278 /* channel-specific code ****************************************************/
279
// Selectors for the end(s) of a channel an operation applies to.
// Code below tests the sign: >= 0 covers "send", <= 0 covers "recv".
#define CHANNEL_SEND 1
#define CHANNEL_BOTH 0
#define CHANNEL_RECV -1

// Exception classes; created and published by channel_exceptions_init().
static PyObject *ChannelError;
static PyObject *ChannelNotFoundError;
static PyObject *ChannelClosedError;
static PyObject *ChannelEmptyError;
static PyObject *ChannelNotEmptyError;
289
290 static int
channel_exceptions_init(PyObject * ns)291 channel_exceptions_init(PyObject *ns)
292 {
293 // XXX Move the exceptions into per-module memory?
294
295 // A channel-related operation failed.
296 ChannelError = PyErr_NewException("_xxsubinterpreters.ChannelError",
297 PyExc_RuntimeError, NULL);
298 if (ChannelError == NULL) {
299 return -1;
300 }
301 if (PyDict_SetItemString(ns, "ChannelError", ChannelError) != 0) {
302 return -1;
303 }
304
305 // An operation tried to use a channel that doesn't exist.
306 ChannelNotFoundError = PyErr_NewException(
307 "_xxsubinterpreters.ChannelNotFoundError", ChannelError, NULL);
308 if (ChannelNotFoundError == NULL) {
309 return -1;
310 }
311 if (PyDict_SetItemString(ns, "ChannelNotFoundError", ChannelNotFoundError) != 0) {
312 return -1;
313 }
314
315 // An operation tried to use a closed channel.
316 ChannelClosedError = PyErr_NewException(
317 "_xxsubinterpreters.ChannelClosedError", ChannelError, NULL);
318 if (ChannelClosedError == NULL) {
319 return -1;
320 }
321 if (PyDict_SetItemString(ns, "ChannelClosedError", ChannelClosedError) != 0) {
322 return -1;
323 }
324
325 // An operation tried to pop from an empty channel.
326 ChannelEmptyError = PyErr_NewException(
327 "_xxsubinterpreters.ChannelEmptyError", ChannelError, NULL);
328 if (ChannelEmptyError == NULL) {
329 return -1;
330 }
331 if (PyDict_SetItemString(ns, "ChannelEmptyError", ChannelEmptyError) != 0) {
332 return -1;
333 }
334
335 // An operation tried to close a non-empty channel.
336 ChannelNotEmptyError = PyErr_NewException(
337 "_xxsubinterpreters.ChannelNotEmptyError", ChannelError, NULL);
338 if (ChannelNotEmptyError == NULL) {
339 return -1;
340 }
341 if (PyDict_SetItemString(ns, "ChannelNotEmptyError", ChannelNotEmptyError) != 0) {
342 return -1;
343 }
344
345 return 0;
346 }
347
348 /* the channel queue */
349
struct _channelitem;

// One node of a channel's singly linked queue.
typedef struct _channelitem {
    // Payload; released and freed by _channelitem_clear() unless it
    // has been detached first (see _channelitem_popped()).
    _PyCrossInterpreterData *data;
    // Next node in the queue, or NULL for the last one.
    struct _channelitem *next;
} _channelitem;
356
357 static _channelitem *
_channelitem_new(void)358 _channelitem_new(void)
359 {
360 _channelitem *item = PyMem_NEW(_channelitem, 1);
361 if (item == NULL) {
362 PyErr_NoMemory();
363 return NULL;
364 }
365 item->data = NULL;
366 item->next = NULL;
367 return item;
368 }
369
370 static void
_channelitem_clear(_channelitem * item)371 _channelitem_clear(_channelitem *item)
372 {
373 if (item->data != NULL) {
374 _PyCrossInterpreterData_Release(item->data);
375 PyMem_Free(item->data);
376 item->data = NULL;
377 }
378 item->next = NULL;
379 }
380
381 static void
_channelitem_free(_channelitem * item)382 _channelitem_free(_channelitem *item)
383 {
384 _channelitem_clear(item);
385 PyMem_Free(item);
386 }
387
388 static void
_channelitem_free_all(_channelitem * item)389 _channelitem_free_all(_channelitem *item)
390 {
391 while (item != NULL) {
392 _channelitem *last = item;
393 item = item->next;
394 _channelitem_free(last);
395 }
396 }
397
398 static _PyCrossInterpreterData *
_channelitem_popped(_channelitem * item)399 _channelitem_popped(_channelitem *item)
400 {
401 _PyCrossInterpreterData *data = item->data;
402 item->data = NULL;
403 _channelitem_free(item);
404 return data;
405 }
406
// FIFO of items waiting in a channel (singly linked list).
typedef struct _channelqueue {
    int64_t count;         // number of queued items
    _channelitem *first;   // next item to be returned, or NULL if empty
    _channelitem *last;    // most recently added item, or NULL if empty
} _channelqueue;
412
413 static _channelqueue *
_channelqueue_new(void)414 _channelqueue_new(void)
415 {
416 _channelqueue *queue = PyMem_NEW(_channelqueue, 1);
417 if (queue == NULL) {
418 PyErr_NoMemory();
419 return NULL;
420 }
421 queue->count = 0;
422 queue->first = NULL;
423 queue->last = NULL;
424 return queue;
425 }
426
427 static void
_channelqueue_clear(_channelqueue * queue)428 _channelqueue_clear(_channelqueue *queue)
429 {
430 _channelitem_free_all(queue->first);
431 queue->count = 0;
432 queue->first = NULL;
433 queue->last = NULL;
434 }
435
436 static void
_channelqueue_free(_channelqueue * queue)437 _channelqueue_free(_channelqueue *queue)
438 {
439 _channelqueue_clear(queue);
440 PyMem_Free(queue);
441 }
442
443 static int
_channelqueue_put(_channelqueue * queue,_PyCrossInterpreterData * data)444 _channelqueue_put(_channelqueue *queue, _PyCrossInterpreterData *data)
445 {
446 _channelitem *item = _channelitem_new();
447 if (item == NULL) {
448 return -1;
449 }
450 item->data = data;
451
452 queue->count += 1;
453 if (queue->first == NULL) {
454 queue->first = item;
455 }
456 else {
457 queue->last->next = item;
458 }
459 queue->last = item;
460 return 0;
461 }
462
463 static _PyCrossInterpreterData *
_channelqueue_get(_channelqueue * queue)464 _channelqueue_get(_channelqueue *queue)
465 {
466 _channelitem *item = queue->first;
467 if (item == NULL) {
468 return NULL;
469 }
470 queue->first = item->next;
471 if (queue->last == item) {
472 queue->last = NULL;
473 }
474 queue->count -= 1;
475
476 return _channelitem_popped(item);
477 }
478
479 /* channel-interpreter associations */
480
struct _channelend;

// Association between one interpreter and one end (send or recv) of a
// channel; kept in a singly linked list per end.
typedef struct _channelend {
    struct _channelend *next;  // next association in the list, or NULL
    int64_t interp;            // ID of the associated interpreter
    int open;                  // 1 while open; 0 once closed
} _channelend;
488
489 static _channelend *
_channelend_new(int64_t interp)490 _channelend_new(int64_t interp)
491 {
492 _channelend *end = PyMem_NEW(_channelend, 1);
493 if (end == NULL) {
494 PyErr_NoMemory();
495 return NULL;
496 }
497 end->next = NULL;
498 end->interp = interp;
499 end->open = 1;
500 return end;
501 }
502
503 static void
_channelend_free(_channelend * end)504 _channelend_free(_channelend *end)
505 {
506 PyMem_Free(end);
507 }
508
509 static void
_channelend_free_all(_channelend * end)510 _channelend_free_all(_channelend *end)
511 {
512 while (end != NULL) {
513 _channelend *last = end;
514 end = end->next;
515 _channelend_free(last);
516 }
517 }
518
519 static _channelend *
_channelend_find(_channelend * first,int64_t interp,_channelend ** pprev)520 _channelend_find(_channelend *first, int64_t interp, _channelend **pprev)
521 {
522 _channelend *prev = NULL;
523 _channelend *end = first;
524 while (end != NULL) {
525 if (end->interp == interp) {
526 break;
527 }
528 prev = end;
529 end = end->next;
530 }
531 if (pprev != NULL) {
532 *pprev = prev;
533 }
534 return end;
535 }
536
// The interpreters associated with each end of one channel.
typedef struct _channelassociations {
    // Note that list entries are never removed for an interpreter for
    // which the channel is closed; they are only marked as closed.
    // NOTE(review): the original comment said this "should be a problem
    // in practice" -- presumably "should not be" was meant; confirm.
    // Also, a channel isn't automatically closed when an interpreter
    // is destroyed.
    int64_t numsendopen;   // count of open entries in `send`
    int64_t numrecvopen;   // count of open entries in `recv`
    _channelend *send;     // list of send-end associations
    _channelend *recv;     // list of recv-end associations
} _channelends;
547
548 static _channelends *
_channelends_new(void)549 _channelends_new(void)
550 {
551 _channelends *ends = PyMem_NEW(_channelends, 1);
552 if (ends== NULL) {
553 return NULL;
554 }
555 ends->numsendopen = 0;
556 ends->numrecvopen = 0;
557 ends->send = NULL;
558 ends->recv = NULL;
559 return ends;
560 }
561
562 static void
_channelends_clear(_channelends * ends)563 _channelends_clear(_channelends *ends)
564 {
565 _channelend_free_all(ends->send);
566 ends->send = NULL;
567 ends->numsendopen = 0;
568
569 _channelend_free_all(ends->recv);
570 ends->recv = NULL;
571 ends->numrecvopen = 0;
572 }
573
574 static void
_channelends_free(_channelends * ends)575 _channelends_free(_channelends *ends)
576 {
577 _channelends_clear(ends);
578 PyMem_Free(ends);
579 }
580
581 static _channelend *
_channelends_add(_channelends * ends,_channelend * prev,int64_t interp,int send)582 _channelends_add(_channelends *ends, _channelend *prev, int64_t interp,
583 int send)
584 {
585 _channelend *end = _channelend_new(interp);
586 if (end == NULL) {
587 return NULL;
588 }
589
590 if (prev == NULL) {
591 if (send) {
592 ends->send = end;
593 }
594 else {
595 ends->recv = end;
596 }
597 }
598 else {
599 prev->next = end;
600 }
601 if (send) {
602 ends->numsendopen += 1;
603 }
604 else {
605 ends->numrecvopen += 1;
606 }
607 return end;
608 }
609
610 static int
_channelends_associate(_channelends * ends,int64_t interp,int send)611 _channelends_associate(_channelends *ends, int64_t interp, int send)
612 {
613 _channelend *prev;
614 _channelend *end = _channelend_find(send ? ends->send : ends->recv,
615 interp, &prev);
616 if (end != NULL) {
617 if (!end->open) {
618 PyErr_SetString(ChannelClosedError, "channel already closed");
619 return -1;
620 }
621 // already associated
622 return 0;
623 }
624 if (_channelends_add(ends, prev, interp, send) == NULL) {
625 return -1;
626 }
627 return 0;
628 }
629
630 static int
_channelends_is_open(_channelends * ends)631 _channelends_is_open(_channelends *ends)
632 {
633 if (ends->numsendopen != 0 || ends->numrecvopen != 0) {
634 return 1;
635 }
636 if (ends->send == NULL && ends->recv == NULL) {
637 return 1;
638 }
639 return 0;
640 }
641
642 static void
_channelends_close_end(_channelends * ends,_channelend * end,int send)643 _channelends_close_end(_channelends *ends, _channelend *end, int send)
644 {
645 end->open = 0;
646 if (send) {
647 ends->numsendopen -= 1;
648 }
649 else {
650 ends->numrecvopen -= 1;
651 }
652 }
653
654 static int
_channelends_close_interpreter(_channelends * ends,int64_t interp,int which)655 _channelends_close_interpreter(_channelends *ends, int64_t interp, int which)
656 {
657 _channelend *prev;
658 _channelend *end;
659 if (which >= 0) { // send/both
660 end = _channelend_find(ends->send, interp, &prev);
661 if (end == NULL) {
662 // never associated so add it
663 end = _channelends_add(ends, prev, interp, 1);
664 if (end == NULL) {
665 return -1;
666 }
667 }
668 _channelends_close_end(ends, end, 1);
669 }
670 if (which <= 0) { // recv/both
671 end = _channelend_find(ends->recv, interp, &prev);
672 if (end == NULL) {
673 // never associated so add it
674 end = _channelends_add(ends, prev, interp, 0);
675 if (end == NULL) {
676 return -1;
677 }
678 }
679 _channelends_close_end(ends, end, 0);
680 }
681 return 0;
682 }
683
684 static void
_channelends_close_all(_channelends * ends,int which,int force)685 _channelends_close_all(_channelends *ends, int which, int force)
686 {
687 // XXX Handle the ends.
688 // XXX Handle force is True.
689
690 // Ensure all the "send"-associated interpreters are closed.
691 _channelend *end;
692 for (end = ends->send; end != NULL; end = end->next) {
693 _channelends_close_end(ends, end, 1);
694 }
695
696 // Ensure all the "recv"-associated interpreters are closed.
697 for (end = ends->recv; end != NULL; end = end->next) {
698 _channelends_close_end(ends, end, 0);
699 }
700 }
701
702 /* channels */
703
struct _channel;
struct _channel_closing;
// Defined further down, with the support for closing non-empty channels.
static void _channel_clear_closing(struct _channel *);
static void _channel_finish_closing(struct _channel *);

// State of a single channel.
typedef struct _channel {
    PyThread_type_lock mutex;          // taken around queue/ends/open access
    _channelqueue *queue;              // items waiting to be received
    _channelends *ends;                // associated interpreters per end
    int open;                          // 1 while open; 0 once closed
    struct _channel_closing *closing;  // non-NULL while a close is pending
} _PyChannelState;
716
717 static _PyChannelState *
_channel_new(void)718 _channel_new(void)
719 {
720 _PyChannelState *chan = PyMem_NEW(_PyChannelState, 1);
721 if (chan == NULL) {
722 return NULL;
723 }
724 chan->mutex = PyThread_allocate_lock();
725 if (chan->mutex == NULL) {
726 PyMem_Free(chan);
727 PyErr_SetString(ChannelError,
728 "can't initialize mutex for new channel");
729 return NULL;
730 }
731 chan->queue = _channelqueue_new();
732 if (chan->queue == NULL) {
733 PyMem_Free(chan);
734 return NULL;
735 }
736 chan->ends = _channelends_new();
737 if (chan->ends == NULL) {
738 _channelqueue_free(chan->queue);
739 PyMem_Free(chan);
740 return NULL;
741 }
742 chan->open = 1;
743 chan->closing = NULL;
744 return chan;
745 }
746
747 static void
_channel_free(_PyChannelState * chan)748 _channel_free(_PyChannelState *chan)
749 {
750 _channel_clear_closing(chan);
751 PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
752 _channelqueue_free(chan->queue);
753 _channelends_free(chan->ends);
754 PyThread_release_lock(chan->mutex);
755
756 PyThread_free_lock(chan->mutex);
757 PyMem_Free(chan);
758 }
759
760 static int
_channel_add(_PyChannelState * chan,int64_t interp,_PyCrossInterpreterData * data)761 _channel_add(_PyChannelState *chan, int64_t interp,
762 _PyCrossInterpreterData *data)
763 {
764 int res = -1;
765 PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
766
767 if (!chan->open) {
768 PyErr_SetString(ChannelClosedError, "channel closed");
769 goto done;
770 }
771 if (_channelends_associate(chan->ends, interp, 1) != 0) {
772 goto done;
773 }
774
775 if (_channelqueue_put(chan->queue, data) != 0) {
776 goto done;
777 }
778
779 res = 0;
780 done:
781 PyThread_release_lock(chan->mutex);
782 return res;
783 }
784
/* Take the next queued item from the channel on behalf of interpreter
   `interp`.

   Returns the item's data, or NULL.  NULL comes with ChannelClosedError
   set if the channel is closed, with the error from
   _channelends_associate() if the association fails, and with no
   exception set by this function if the queue is simply empty. */
static _PyCrossInterpreterData *
_channel_next(_PyChannelState *chan, int64_t interp)
{
    _PyCrossInterpreterData *data = NULL;
    PyThread_acquire_lock(chan->mutex, WAIT_LOCK);

    if (!chan->open) {
        PyErr_SetString(ChannelClosedError, "channel closed");
        goto done;
    }
    // Receiving associates the interpreter with the recv end.
    if (_channelends_associate(chan->ends, interp, 0) != 0) {
        goto done;
    }

    data = _channelqueue_get(chan->queue);
    // Nothing left to receive and a close is pending: mark the channel
    // as closed.
    if (data == NULL && !PyErr_Occurred() && chan->closing != NULL) {
        chan->open = 0;
    }

done:
    PyThread_release_lock(chan->mutex);
    // Once the queue is empty, complete any pending close.  This is a
    // no-op when chan->closing is NULL; otherwise it frees the channel.
    // NOTE(review): the count is read after the channel mutex has been
    // released -- confirm that callers serialize access some other way.
    if (chan->queue->count == 0) {
        _channel_finish_closing(chan);
    }
    return data;
}
811
812 static int
_channel_close_interpreter(_PyChannelState * chan,int64_t interp,int end)813 _channel_close_interpreter(_PyChannelState *chan, int64_t interp, int end)
814 {
815 PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
816
817 int res = -1;
818 if (!chan->open) {
819 PyErr_SetString(ChannelClosedError, "channel already closed");
820 goto done;
821 }
822
823 if (_channelends_close_interpreter(chan->ends, interp, end) != 0) {
824 goto done;
825 }
826 chan->open = _channelends_is_open(chan->ends);
827
828 res = 0;
829 done:
830 PyThread_release_lock(chan->mutex);
831 return res;
832 }
833
834 static int
_channel_close_all(_PyChannelState * chan,int end,int force)835 _channel_close_all(_PyChannelState *chan, int end, int force)
836 {
837 int res = -1;
838 PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
839
840 if (!chan->open) {
841 PyErr_SetString(ChannelClosedError, "channel already closed");
842 goto done;
843 }
844
845 if (!force && chan->queue->count > 0) {
846 PyErr_SetString(ChannelNotEmptyError,
847 "may not be closed if not empty (try force=True)");
848 goto done;
849 }
850
851 chan->open = 0;
852
853 // We *could* also just leave these in place, since we've marked
854 // the channel as closed already.
855 _channelends_close_all(chan->ends, end, force);
856
857 res = 0;
858 done:
859 PyThread_release_lock(chan->mutex);
860 return res;
861 }
862
863 /* the set of channels */
864
struct _channelref;

// Entry in the global list of channels: maps a channel ID to its state.
typedef struct _channelref {
    int64_t id;                 // channel ID
    _PyChannelState *chan;      // channel state; set to NULL once closed
    struct _channelref *next;   // next entry in the list, or NULL
    // Count maintained by _channels_add_id_object() and
    // _channels_drop_id_object(); the entry is destroyed when a drop
    // brings it to zero.
    Py_ssize_t objcount;
} _channelref;
873
874 static _channelref *
_channelref_new(int64_t id,_PyChannelState * chan)875 _channelref_new(int64_t id, _PyChannelState *chan)
876 {
877 _channelref *ref = PyMem_NEW(_channelref, 1);
878 if (ref == NULL) {
879 return NULL;
880 }
881 ref->id = id;
882 ref->chan = chan;
883 ref->next = NULL;
884 ref->objcount = 0;
885 return ref;
886 }
887
888 //static void
889 //_channelref_clear(_channelref *ref)
890 //{
891 // ref->id = -1;
892 // ref->chan = NULL;
893 // ref->next = NULL;
894 // ref->objcount = 0;
895 //}
896
897 static void
_channelref_free(_channelref * ref)898 _channelref_free(_channelref *ref)
899 {
900 if (ref->chan != NULL) {
901 _channel_clear_closing(ref->chan);
902 }
903 //_channelref_clear(ref);
904 PyMem_Free(ref);
905 }
906
907 static _channelref *
_channelref_find(_channelref * first,int64_t id,_channelref ** pprev)908 _channelref_find(_channelref *first, int64_t id, _channelref **pprev)
909 {
910 _channelref *prev = NULL;
911 _channelref *ref = first;
912 while (ref != NULL) {
913 if (ref->id == id) {
914 break;
915 }
916 prev = ref;
917 ref = ref->next;
918 }
919 if (pprev != NULL) {
920 *pprev = prev;
921 }
922 return ref;
923 }
924
// The set of all channels.
typedef struct _channels {
    PyThread_type_lock mutex;  // taken around access to the fields below
    _channelref *head;         // first entry of the channel list, or NULL
    int64_t numopen;           // number of entries in the list
    int64_t next_id;           // ID to hand out to the next new channel
} _channels;
931
932 static int
_channels_init(_channels * channels)933 _channels_init(_channels *channels)
934 {
935 if (channels->mutex == NULL) {
936 channels->mutex = PyThread_allocate_lock();
937 if (channels->mutex == NULL) {
938 PyErr_SetString(ChannelError,
939 "can't initialize mutex for channel management");
940 return -1;
941 }
942 }
943 channels->head = NULL;
944 channels->numopen = 0;
945 channels->next_id = 0;
946 return 0;
947 }
948
949 static int64_t
_channels_next_id(_channels * channels)950 _channels_next_id(_channels *channels) // needs lock
951 {
952 int64_t id = channels->next_id;
953 if (id < 0) {
954 /* overflow */
955 PyErr_SetString(ChannelError,
956 "failed to get a channel ID");
957 return -1;
958 }
959 channels->next_id += 1;
960 return id;
961 }
962
963 static _PyChannelState *
_channels_lookup(_channels * channels,int64_t id,PyThread_type_lock * pmutex)964 _channels_lookup(_channels *channels, int64_t id, PyThread_type_lock *pmutex)
965 {
966 _PyChannelState *chan = NULL;
967 PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
968 if (pmutex != NULL) {
969 *pmutex = NULL;
970 }
971
972 _channelref *ref = _channelref_find(channels->head, id, NULL);
973 if (ref == NULL) {
974 PyErr_Format(ChannelNotFoundError, "channel %" PRId64 " not found", id);
975 goto done;
976 }
977 if (ref->chan == NULL || !ref->chan->open) {
978 PyErr_Format(ChannelClosedError, "channel %" PRId64 " closed", id);
979 goto done;
980 }
981
982 if (pmutex != NULL) {
983 // The mutex will be closed by the caller.
984 *pmutex = channels->mutex;
985 }
986
987 chan = ref->chan;
988 done:
989 if (pmutex == NULL || *pmutex == NULL) {
990 PyThread_release_lock(channels->mutex);
991 }
992 return chan;
993 }
994
995 static int64_t
_channels_add(_channels * channels,_PyChannelState * chan)996 _channels_add(_channels *channels, _PyChannelState *chan)
997 {
998 int64_t cid = -1;
999 PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
1000
1001 // Create a new ref.
1002 int64_t id = _channels_next_id(channels);
1003 if (id < 0) {
1004 goto done;
1005 }
1006 _channelref *ref = _channelref_new(id, chan);
1007 if (ref == NULL) {
1008 goto done;
1009 }
1010
1011 // Add it to the list.
1012 // We assume that the channel is a new one (not already in the list).
1013 ref->next = channels->head;
1014 channels->head = ref;
1015 channels->numopen += 1;
1016
1017 cid = id;
1018 done:
1019 PyThread_release_lock(channels->mutex);
1020 return cid;
1021 }
1022
/* forward: defined with the support for closing non-empty channels;
   used by _channels_close() below. */
static int _channel_set_closing(struct _channelref *, PyThread_type_lock);
1025
/* Close the channel with ID `cid` for all interpreters.

   Runs under the channel-set mutex.  `end` selects the end(s) and
   `force` permits closing a channel that still has queued items.

   Returns 0 on success and -1 with an exception set on failure:
   ChannelNotFoundError for an unknown ID, ChannelClosedError if the
   channel is already closed or already marked as closing, or whatever
   _channel_close_all() / _channel_set_closing() raised.

   `*pchan` (if `pchan` is not NULL) starts out as NULL and receives the
   channel state on success.  After an immediate close the list entry no
   longer refers to the channel; with `pchan` NULL the channel is freed
   here instead.  After a deferred close (see below) the list entry
   still refers to the channel. */
static int
_channels_close(_channels *channels, int64_t cid, _PyChannelState **pchan,
                int end, int force)
{
    int res = -1;
    PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
    if (pchan != NULL) {
        *pchan = NULL;
    }

    _channelref *ref = _channelref_find(channels->head, cid, NULL);
    if (ref == NULL) {
        PyErr_Format(ChannelNotFoundError, "channel %" PRId64 " not found", cid);
        goto done;
    }

    if (ref->chan == NULL) {
        // The entry outlived its channel: it was closed earlier.
        PyErr_Format(ChannelClosedError, "channel %" PRId64 " closed", cid);
        goto done;
    }
    else if (!force && end == CHANNEL_SEND && ref->chan->closing != NULL) {
        // A deferred close of the send end is already pending.
        PyErr_Format(ChannelClosedError, "channel %" PRId64 " closed", cid);
        goto done;
    }
    else {
        if (_channel_close_all(ref->chan, end, force) != 0) {
            // Only a non-empty channel whose send end is being closed
            // can be handled here; any other failure is passed on
            // unchanged (res is still -1).
            if (end == CHANNEL_SEND &&
                    PyErr_ExceptionMatches(ChannelNotEmptyError)) {
                if (ref->chan->closing != NULL) {
                    PyErr_Format(ChannelClosedError,
                                 "channel %" PRId64 " closed", cid);
                    goto done;
                }
                // Mark the channel as closing and return.  The channel
                // will be cleaned up in _channel_next().
                PyErr_Clear();
                if (_channel_set_closing(ref, channels->mutex) != 0) {
                    goto done;
                }
                if (pchan != NULL) {
                    *pchan = ref->chan;
                }
                res = 0;
            }
            goto done;
        }
        // Closed right away: hand the channel to the caller, or free it
        // if the caller doesn't want it.
        if (pchan != NULL) {
            *pchan = ref->chan;
        }
        else {
            _channel_free(ref->chan);
        }
        ref->chan = NULL;
    }

    res = 0;
done:
    PyThread_release_lock(channels->mutex);
    return res;
}
1086
1087 static void
_channels_remove_ref(_channels * channels,_channelref * ref,_channelref * prev,_PyChannelState ** pchan)1088 _channels_remove_ref(_channels *channels, _channelref *ref, _channelref *prev,
1089 _PyChannelState **pchan)
1090 {
1091 if (ref == channels->head) {
1092 channels->head = ref->next;
1093 }
1094 else {
1095 prev->next = ref->next;
1096 }
1097 channels->numopen -= 1;
1098
1099 if (pchan != NULL) {
1100 *pchan = ref->chan;
1101 }
1102 _channelref_free(ref);
1103 }
1104
1105 static int
_channels_remove(_channels * channels,int64_t id,_PyChannelState ** pchan)1106 _channels_remove(_channels *channels, int64_t id, _PyChannelState **pchan)
1107 {
1108 int res = -1;
1109 PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
1110
1111 if (pchan != NULL) {
1112 *pchan = NULL;
1113 }
1114
1115 _channelref *prev = NULL;
1116 _channelref *ref = _channelref_find(channels->head, id, &prev);
1117 if (ref == NULL) {
1118 PyErr_Format(ChannelNotFoundError, "channel %" PRId64 " not found", id);
1119 goto done;
1120 }
1121
1122 _channels_remove_ref(channels, ref, prev, pchan);
1123
1124 res = 0;
1125 done:
1126 PyThread_release_lock(channels->mutex);
1127 return res;
1128 }
1129
1130 static int
_channels_add_id_object(_channels * channels,int64_t id)1131 _channels_add_id_object(_channels *channels, int64_t id)
1132 {
1133 int res = -1;
1134 PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
1135
1136 _channelref *ref = _channelref_find(channels->head, id, NULL);
1137 if (ref == NULL) {
1138 PyErr_Format(ChannelNotFoundError, "channel %" PRId64 " not found", id);
1139 goto done;
1140 }
1141 ref->objcount += 1;
1142
1143 res = 0;
1144 done:
1145 PyThread_release_lock(channels->mutex);
1146 return res;
1147 }
1148
1149 static void
_channels_drop_id_object(_channels * channels,int64_t id)1150 _channels_drop_id_object(_channels *channels, int64_t id)
1151 {
1152 PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
1153
1154 _channelref *prev = NULL;
1155 _channelref *ref = _channelref_find(channels->head, id, &prev);
1156 if (ref == NULL) {
1157 // Already destroyed.
1158 goto done;
1159 }
1160 ref->objcount -= 1;
1161
1162 // Destroy if no longer used.
1163 if (ref->objcount == 0) {
1164 _PyChannelState *chan = NULL;
1165 _channels_remove_ref(channels, ref, prev, &chan);
1166 if (chan != NULL) {
1167 _channel_free(chan);
1168 }
1169 }
1170
1171 done:
1172 PyThread_release_lock(channels->mutex);
1173 }
1174
1175 static int64_t *
_channels_list_all(_channels * channels,int64_t * count)1176 _channels_list_all(_channels *channels, int64_t *count)
1177 {
1178 int64_t *cids = NULL;
1179 PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
1180 int64_t numopen = channels->numopen;
1181 if (numopen >= PY_SSIZE_T_MAX) {
1182 PyErr_SetString(PyExc_RuntimeError, "too many channels open");
1183 goto done;
1184 }
1185 int64_t *ids = PyMem_NEW(int64_t, (Py_ssize_t)(channels->numopen));
1186 if (ids == NULL) {
1187 goto done;
1188 }
1189 _channelref *ref = channels->head;
1190 for (int64_t i=0; ref != NULL; ref = ref->next, i++) {
1191 ids[i] = ref->id;
1192 }
1193 *count = channels->numopen;
1194
1195 cids = ids;
1196 done:
1197 PyThread_release_lock(channels->mutex);
1198 return cids;
1199 }
1200
1201 /* support for closing non-empty channels */
1202
// Record attached to a channel while its close is deferred because it
// still has queued items.
struct _channel_closing {
    // The channel's entry in the channel list; used by
    // _channel_finish_closing() to detach the channel from it.
    struct _channelref *ref;
};
1206
1207 static int
_channel_set_closing(struct _channelref * ref,PyThread_type_lock mutex)1208 _channel_set_closing(struct _channelref *ref, PyThread_type_lock mutex) {
1209 struct _channel *chan = ref->chan;
1210 if (chan == NULL) {
1211 // already closed
1212 return 0;
1213 }
1214 int res = -1;
1215 PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
1216 if (chan->closing != NULL) {
1217 PyErr_SetString(ChannelClosedError, "channel closed");
1218 goto done;
1219 }
1220 chan->closing = PyMem_NEW(struct _channel_closing, 1);
1221 if (chan->closing == NULL) {
1222 goto done;
1223 }
1224 chan->closing->ref = ref;
1225
1226 res = 0;
1227 done:
1228 PyThread_release_lock(chan->mutex);
1229 return res;
1230 }
1231
1232 static void
_channel_clear_closing(struct _channel * chan)1233 _channel_clear_closing(struct _channel *chan) {
1234 PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
1235 if (chan->closing != NULL) {
1236 PyMem_Free(chan->closing);
1237 chan->closing = NULL;
1238 }
1239 PyThread_release_lock(chan->mutex);
1240 }
1241
1242 static void
_channel_finish_closing(struct _channel * chan)1243 _channel_finish_closing(struct _channel *chan) {
1244 struct _channel_closing *closing = chan->closing;
1245 if (closing == NULL) {
1246 return;
1247 }
1248 _channelref *ref = closing->ref;
1249 _channel_clear_closing(chan);
1250 // Do the things that would have been done in _channels_close().
1251 ref->chan = NULL;
1252 _channel_free(chan);
1253 }
1254
1255 /* "high"-level channel-related functions */
1256
1257 static int64_t
_channel_create(_channels * channels)1258 _channel_create(_channels *channels)
1259 {
1260 _PyChannelState *chan = _channel_new();
1261 if (chan == NULL) {
1262 return -1;
1263 }
1264 int64_t id = _channels_add(channels, chan);
1265 if (id < 0) {
1266 _channel_free(chan);
1267 return -1;
1268 }
1269 return id;
1270 }
1271
1272 static int
_channel_destroy(_channels * channels,int64_t id)1273 _channel_destroy(_channels *channels, int64_t id)
1274 {
1275 _PyChannelState *chan = NULL;
1276 if (_channels_remove(channels, id, &chan) != 0) {
1277 return -1;
1278 }
1279 if (chan != NULL) {
1280 _channel_free(chan);
1281 }
1282 return 0;
1283 }
1284
1285 static int
_channel_send(_channels * channels,int64_t id,PyObject * obj)1286 _channel_send(_channels *channels, int64_t id, PyObject *obj)
1287 {
1288 PyInterpreterState *interp = _get_current();
1289 if (interp == NULL) {
1290 return -1;
1291 }
1292
1293 // Look up the channel.
1294 PyThread_type_lock mutex = NULL;
1295 _PyChannelState *chan = _channels_lookup(channels, id, &mutex);
1296 if (chan == NULL) {
1297 return -1;
1298 }
1299 // Past this point we are responsible for releasing the mutex.
1300
1301 if (chan->closing != NULL) {
1302 PyErr_Format(ChannelClosedError, "channel %" PRId64 " closed", id);
1303 PyThread_release_lock(mutex);
1304 return -1;
1305 }
1306
1307 // Convert the object to cross-interpreter data.
1308 _PyCrossInterpreterData *data = PyMem_NEW(_PyCrossInterpreterData, 1);
1309 if (data == NULL) {
1310 PyThread_release_lock(mutex);
1311 return -1;
1312 }
1313 if (_PyObject_GetCrossInterpreterData(obj, data) != 0) {
1314 PyThread_release_lock(mutex);
1315 PyMem_Free(data);
1316 return -1;
1317 }
1318
1319 // Add the data to the channel.
1320 int res = _channel_add(chan, PyInterpreterState_GetID(interp), data);
1321 PyThread_release_lock(mutex);
1322 if (res != 0) {
1323 _PyCrossInterpreterData_Release(data);
1324 PyMem_Free(data);
1325 return -1;
1326 }
1327
1328 return 0;
1329 }
1330
1331 static PyObject *
_channel_recv(_channels * channels,int64_t id)1332 _channel_recv(_channels *channels, int64_t id)
1333 {
1334 PyInterpreterState *interp = _get_current();
1335 if (interp == NULL) {
1336 return NULL;
1337 }
1338
1339 // Look up the channel.
1340 PyThread_type_lock mutex = NULL;
1341 _PyChannelState *chan = _channels_lookup(channels, id, &mutex);
1342 if (chan == NULL) {
1343 return NULL;
1344 }
1345 // Past this point we are responsible for releasing the mutex.
1346
1347 // Pop off the next item from the channel.
1348 _PyCrossInterpreterData *data = _channel_next(chan, PyInterpreterState_GetID(interp));
1349 PyThread_release_lock(mutex);
1350 if (data == NULL) {
1351 if (!PyErr_Occurred()) {
1352 PyErr_Format(ChannelEmptyError, "channel %" PRId64 " is empty", id);
1353 }
1354 return NULL;
1355 }
1356
1357 // Convert the data back to an object.
1358 PyObject *obj = _PyCrossInterpreterData_NewObject(data);
1359 if (obj == NULL) {
1360 return NULL;
1361 }
1362 _PyCrossInterpreterData_Release(data);
1363 PyMem_Free(data);
1364
1365 return obj;
1366 }
1367
1368 static int
_channel_drop(_channels * channels,int64_t id,int send,int recv)1369 _channel_drop(_channels *channels, int64_t id, int send, int recv)
1370 {
1371 PyInterpreterState *interp = _get_current();
1372 if (interp == NULL) {
1373 return -1;
1374 }
1375
1376 // Look up the channel.
1377 PyThread_type_lock mutex = NULL;
1378 _PyChannelState *chan = _channels_lookup(channels, id, &mutex);
1379 if (chan == NULL) {
1380 return -1;
1381 }
1382 // Past this point we are responsible for releasing the mutex.
1383
1384 // Close one or both of the two ends.
1385 int res = _channel_close_interpreter(chan, PyInterpreterState_GetID(interp), send-recv);
1386 PyThread_release_lock(mutex);
1387 return res;
1388 }
1389
1390 static int
_channel_close(_channels * channels,int64_t id,int end,int force)1391 _channel_close(_channels *channels, int64_t id, int end, int force)
1392 {
1393 return _channels_close(channels, id, NULL, end, force);
1394 }
1395
1396 /* ChannelID class */
1397
1398 static PyTypeObject ChannelIDtype;
1399
/* Instance layout of ChannelID objects. */
typedef struct channelid {
    PyObject_HEAD
    int64_t id;           // numeric channel ID
    int end;              // CHANNEL_SEND, CHANNEL_RECV, or 0 (reported as "both")
    int resolve;          // copied into the cross-interpreter data by _channelid_shared()
    _channels *channels;  // registry that newchannelid() registered this object with
} channelid;
1407
1408 static int
channel_id_converter(PyObject * arg,void * ptr)1409 channel_id_converter(PyObject *arg, void *ptr)
1410 {
1411 int64_t cid;
1412 if (PyObject_TypeCheck(arg, &ChannelIDtype)) {
1413 cid = ((channelid *)arg)->id;
1414 }
1415 else if (PyIndex_Check(arg)) {
1416 cid = PyLong_AsLongLong(arg);
1417 if (cid == -1 && PyErr_Occurred()) {
1418 return 0;
1419 }
1420 if (cid < 0) {
1421 PyErr_Format(PyExc_ValueError,
1422 "channel ID must be a non-negative int, got %R", arg);
1423 return 0;
1424 }
1425 }
1426 else {
1427 PyErr_Format(PyExc_TypeError,
1428 "channel ID must be an int, got %.100s",
1429 arg->ob_type->tp_name);
1430 return 0;
1431 }
1432 *(int64_t *)ptr = cid;
1433 return 1;
1434 }
1435
1436 static channelid *
newchannelid(PyTypeObject * cls,int64_t cid,int end,_channels * channels,int force,int resolve)1437 newchannelid(PyTypeObject *cls, int64_t cid, int end, _channels *channels,
1438 int force, int resolve)
1439 {
1440 channelid *self = PyObject_New(channelid, cls);
1441 if (self == NULL) {
1442 return NULL;
1443 }
1444 self->id = cid;
1445 self->end = end;
1446 self->resolve = resolve;
1447 self->channels = channels;
1448
1449 if (_channels_add_id_object(channels, cid) != 0) {
1450 if (force && PyErr_ExceptionMatches(ChannelNotFoundError)) {
1451 PyErr_Clear();
1452 }
1453 else {
1454 Py_DECREF((PyObject *)self);
1455 return NULL;
1456 }
1457 }
1458
1459 return self;
1460 }
1461
1462 static _channels * _global_channels(void);
1463
1464 static PyObject *
channelid_new(PyTypeObject * cls,PyObject * args,PyObject * kwds)1465 channelid_new(PyTypeObject *cls, PyObject *args, PyObject *kwds)
1466 {
1467 static char *kwlist[] = {"id", "send", "recv", "force", "_resolve", NULL};
1468 int64_t cid;
1469 int send = -1;
1470 int recv = -1;
1471 int force = 0;
1472 int resolve = 0;
1473 if (!PyArg_ParseTupleAndKeywords(args, kwds,
1474 "O&|$pppp:ChannelID.__new__", kwlist,
1475 channel_id_converter, &cid, &send, &recv, &force, &resolve))
1476 return NULL;
1477
1478 // Handle "send" and "recv".
1479 if (send == 0 && recv == 0) {
1480 PyErr_SetString(PyExc_ValueError,
1481 "'send' and 'recv' cannot both be False");
1482 return NULL;
1483 }
1484
1485 int end = 0;
1486 if (send == 1) {
1487 if (recv == 0 || recv == -1) {
1488 end = CHANNEL_SEND;
1489 }
1490 }
1491 else if (recv == 1) {
1492 end = CHANNEL_RECV;
1493 }
1494
1495 return (PyObject *)newchannelid(cls, cid, end, _global_channels(),
1496 force, resolve);
1497 }
1498
1499 static void
channelid_dealloc(PyObject * v)1500 channelid_dealloc(PyObject *v)
1501 {
1502 int64_t cid = ((channelid *)v)->id;
1503 _channels *channels = ((channelid *)v)->channels;
1504 Py_TYPE(v)->tp_free(v);
1505
1506 _channels_drop_id_object(channels, cid);
1507 }
1508
1509 static PyObject *
channelid_repr(PyObject * self)1510 channelid_repr(PyObject *self)
1511 {
1512 PyTypeObject *type = Py_TYPE(self);
1513 const char *name = _PyType_Name(type);
1514
1515 channelid *cid = (channelid *)self;
1516 const char *fmt;
1517 if (cid->end == CHANNEL_SEND) {
1518 fmt = "%s(%" PRId64 ", send=True)";
1519 }
1520 else if (cid->end == CHANNEL_RECV) {
1521 fmt = "%s(%" PRId64 ", recv=True)";
1522 }
1523 else {
1524 fmt = "%s(%" PRId64 ")";
1525 }
1526 return PyUnicode_FromFormat(fmt, name, cid->id);
1527 }
1528
1529 static PyObject *
channelid_str(PyObject * self)1530 channelid_str(PyObject *self)
1531 {
1532 channelid *cid = (channelid *)self;
1533 return PyUnicode_FromFormat("%" PRId64 "", cid->id);
1534 }
1535
1536 static PyObject *
channelid_int(PyObject * self)1537 channelid_int(PyObject *self)
1538 {
1539 channelid *cid = (channelid *)self;
1540 return PyLong_FromLongLong(cid->id);
1541 }
1542
1543 static PyNumberMethods channelid_as_number = {
1544 0, /* nb_add */
1545 0, /* nb_subtract */
1546 0, /* nb_multiply */
1547 0, /* nb_remainder */
1548 0, /* nb_divmod */
1549 0, /* nb_power */
1550 0, /* nb_negative */
1551 0, /* nb_positive */
1552 0, /* nb_absolute */
1553 0, /* nb_bool */
1554 0, /* nb_invert */
1555 0, /* nb_lshift */
1556 0, /* nb_rshift */
1557 0, /* nb_and */
1558 0, /* nb_xor */
1559 0, /* nb_or */
1560 (unaryfunc)channelid_int, /* nb_int */
1561 0, /* nb_reserved */
1562 0, /* nb_float */
1563
1564 0, /* nb_inplace_add */
1565 0, /* nb_inplace_subtract */
1566 0, /* nb_inplace_multiply */
1567 0, /* nb_inplace_remainder */
1568 0, /* nb_inplace_power */
1569 0, /* nb_inplace_lshift */
1570 0, /* nb_inplace_rshift */
1571 0, /* nb_inplace_and */
1572 0, /* nb_inplace_xor */
1573 0, /* nb_inplace_or */
1574
1575 0, /* nb_floor_divide */
1576 0, /* nb_true_divide */
1577 0, /* nb_inplace_floor_divide */
1578 0, /* nb_inplace_true_divide */
1579
1580 (unaryfunc)channelid_int, /* nb_index */
1581 };
1582
1583 static Py_hash_t
channelid_hash(PyObject * self)1584 channelid_hash(PyObject *self)
1585 {
1586 channelid *cid = (channelid *)self;
1587 PyObject *id = PyLong_FromLongLong(cid->id);
1588 if (id == NULL) {
1589 return -1;
1590 }
1591 Py_hash_t hash = PyObject_Hash(id);
1592 Py_DECREF(id);
1593 return hash;
1594 }
1595
1596 static PyObject *
channelid_richcompare(PyObject * self,PyObject * other,int op)1597 channelid_richcompare(PyObject *self, PyObject *other, int op)
1598 {
1599 if (op != Py_EQ && op != Py_NE) {
1600 Py_RETURN_NOTIMPLEMENTED;
1601 }
1602
1603 if (!PyObject_TypeCheck(self, &ChannelIDtype)) {
1604 Py_RETURN_NOTIMPLEMENTED;
1605 }
1606
1607 channelid *cid = (channelid *)self;
1608 int equal;
1609 if (PyObject_TypeCheck(other, &ChannelIDtype)) {
1610 channelid *othercid = (channelid *)other;
1611 equal = (cid->end == othercid->end) && (cid->id == othercid->id);
1612 }
1613 else if (PyLong_Check(other)) {
1614 /* Fast path */
1615 int overflow;
1616 long long othercid = PyLong_AsLongLongAndOverflow(other, &overflow);
1617 if (othercid == -1 && PyErr_Occurred()) {
1618 return NULL;
1619 }
1620 equal = !overflow && (othercid >= 0) && (cid->id == othercid);
1621 }
1622 else if (PyNumber_Check(other)) {
1623 PyObject *pyid = PyLong_FromLongLong(cid->id);
1624 if (pyid == NULL) {
1625 return NULL;
1626 }
1627 PyObject *res = PyObject_RichCompare(pyid, other, op);
1628 Py_DECREF(pyid);
1629 return res;
1630 }
1631 else {
1632 Py_RETURN_NOTIMPLEMENTED;
1633 }
1634
1635 if ((op == Py_EQ && equal) || (op == Py_NE && !equal)) {
1636 Py_RETURN_TRUE;
1637 }
1638 Py_RETURN_FALSE;
1639 }
1640
1641 static PyObject *
_channel_from_cid(PyObject * cid,int end)1642 _channel_from_cid(PyObject *cid, int end)
1643 {
1644 PyObject *highlevel = PyImport_ImportModule("interpreters");
1645 if (highlevel == NULL) {
1646 PyErr_Clear();
1647 highlevel = PyImport_ImportModule("test.support.interpreters");
1648 if (highlevel == NULL) {
1649 return NULL;
1650 }
1651 }
1652 const char *clsname = (end == CHANNEL_RECV) ? "RecvChannel" :
1653 "SendChannel";
1654 PyObject *cls = PyObject_GetAttrString(highlevel, clsname);
1655 Py_DECREF(highlevel);
1656 if (cls == NULL) {
1657 return NULL;
1658 }
1659 PyObject *chan = PyObject_CallFunctionObjArgs(cls, cid, NULL);
1660 Py_DECREF(cls);
1661 if (chan == NULL) {
1662 return NULL;
1663 }
1664 return chan;
1665 }
1666
/* Cross-interpreter payload for a ChannelID: filled in by
   _channelid_shared() and read back by _channelid_from_xid(). */
struct _channelid_xid {
    int64_t id;   // copy of channelid.id
    int end;      // copy of channelid.end
    int resolve;  // copy of channelid.resolve
};
1672
1673 static PyObject *
_channelid_from_xid(_PyCrossInterpreterData * data)1674 _channelid_from_xid(_PyCrossInterpreterData *data)
1675 {
1676 struct _channelid_xid *xid = (struct _channelid_xid *)data->data;
1677 // Note that we do not preserve the "resolve" flag.
1678 PyObject *cid = (PyObject *)newchannelid(&ChannelIDtype, xid->id, xid->end,
1679 _global_channels(), 0, 0);
1680 if (xid->end == 0) {
1681 return cid;
1682 }
1683 if (!xid->resolve) {
1684 return cid;
1685 }
1686
1687 /* Try returning a high-level channel end but fall back to the ID. */
1688 PyObject *chan = _channel_from_cid(cid, xid->end);
1689 if (chan == NULL) {
1690 PyErr_Clear();
1691 return cid;
1692 }
1693 Py_DECREF(cid);
1694 return chan;
1695 }
1696
1697 static int
_channelid_shared(PyObject * obj,_PyCrossInterpreterData * data)1698 _channelid_shared(PyObject *obj, _PyCrossInterpreterData *data)
1699 {
1700 struct _channelid_xid *xid = PyMem_NEW(struct _channelid_xid, 1);
1701 if (xid == NULL) {
1702 return -1;
1703 }
1704 xid->id = ((channelid *)obj)->id;
1705 xid->end = ((channelid *)obj)->end;
1706 xid->resolve = ((channelid *)obj)->resolve;
1707
1708 data->data = xid;
1709 Py_INCREF(obj);
1710 data->obj = obj;
1711 data->new_object = _channelid_from_xid;
1712 data->free = PyMem_Free;
1713 return 0;
1714 }
1715
1716 static PyObject *
channelid_end(PyObject * self,void * end)1717 channelid_end(PyObject *self, void *end)
1718 {
1719 int force = 1;
1720 channelid *cid = (channelid *)self;
1721 if (end != NULL) {
1722 return (PyObject *)newchannelid(Py_TYPE(self), cid->id, *(int *)end,
1723 cid->channels, force, cid->resolve);
1724 }
1725
1726 if (cid->end == CHANNEL_SEND) {
1727 return PyUnicode_InternFromString("send");
1728 }
1729 if (cid->end == CHANNEL_RECV) {
1730 return PyUnicode_InternFromString("recv");
1731 }
1732 return PyUnicode_InternFromString("both");
1733 }
1734
/* Closure values for the "send" and "recv" getters below; channelid_end()
   dereferences the closure as an int to learn which end was requested. */
static int _channelid_end_send = CHANNEL_SEND;
static int _channelid_end_recv = CHANNEL_RECV;

/* Attributes of ChannelID.  All three share channelid_end(); "end" passes
   no closure, "send" and "recv" pass a pointer to the matching end. */
static PyGetSetDef channelid_getsets[] = {
    {"end", (getter)channelid_end, NULL,
     PyDoc_STR("'send', 'recv', or 'both'")},
    {"send", (getter)channelid_end, NULL,
     PyDoc_STR("the 'send' end of the channel"), &_channelid_end_send},
    {"recv", (getter)channelid_end, NULL,
     PyDoc_STR("the 'recv' end of the channel"), &_channelid_end_recv},
    {NULL}
};
1747
/* Docstring installed as tp_doc of ChannelIDtype. */
PyDoc_STRVAR(channelid_doc,
"A channel ID identifies a channel and may be used as an int.");
1750
/* Type object for ChannelID.  The slots are given positionally, so their
   order must follow the PyTypeObject layout of the targeted Python
   version.  Filled-in slots: dealloc, repr, number protocol, hash, str,
   doc, richcompare and getset; subclassing is allowed (BASETYPE). */
static PyTypeObject ChannelIDtype = {
    PyVarObject_HEAD_INIT(&PyType_Type, 0)
    "_xxsubinterpreters.ChannelID", /* tp_name */
    sizeof(channelid),              /* tp_basicsize */
    0,                              /* tp_itemsize */
    (destructor)channelid_dealloc,  /* tp_dealloc */
    0,                              /* tp_vectorcall_offset */
    0,                              /* tp_getattr */
    0,                              /* tp_setattr */
    0,                              /* tp_as_async */
    (reprfunc)channelid_repr,       /* tp_repr */
    &channelid_as_number,           /* tp_as_number */
    0,                              /* tp_as_sequence */
    0,                              /* tp_as_mapping */
    channelid_hash,                 /* tp_hash */
    0,                              /* tp_call */
    (reprfunc)channelid_str,        /* tp_str */
    0,                              /* tp_getattro */
    0,                              /* tp_setattro */
    0,                              /* tp_as_buffer */
    Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */
    channelid_doc,                  /* tp_doc */
    0,                              /* tp_traverse */
    0,                              /* tp_clear */
    channelid_richcompare,          /* tp_richcompare */
    0,                              /* tp_weaklistoffset */
    0,                              /* tp_iter */
    0,                              /* tp_iternext */
    0,                              /* tp_methods */
    0,                              /* tp_members */
    channelid_getsets,              /* tp_getset */
    0,                              /* tp_base */
    0,                              /* tp_dict */
    0,                              /* tp_descr_get */
    0,                              /* tp_descr_set */
    0,                              /* tp_dictoffset */
    0,                              /* tp_init */
    0,                              /* tp_alloc */
    // Note that we do not set tp_new to channelid_new.  Instead we
    // set it to NULL, meaning it cannot be instantiated from Python
    // code.  We do this because there is a strong relationship between
    // channel IDs and the channel lifecycle, so this limitation avoids
    // related complications.
    NULL,                           /* tp_new */
};
1796
1797
1798 /* interpreter-specific code ************************************************/
1799
1800 static PyObject * RunFailedError = NULL;
1801
1802 static int
interp_exceptions_init(PyObject * ns)1803 interp_exceptions_init(PyObject *ns)
1804 {
1805 // XXX Move the exceptions into per-module memory?
1806
1807 if (RunFailedError == NULL) {
1808 // An uncaught exception came out of interp_run_string().
1809 RunFailedError = PyErr_NewException("_xxsubinterpreters.RunFailedError",
1810 PyExc_RuntimeError, NULL);
1811 if (RunFailedError == NULL) {
1812 return -1;
1813 }
1814 if (PyDict_SetItemString(ns, "RunFailedError", RunFailedError) != 0) {
1815 return -1;
1816 }
1817 }
1818
1819 return 0;
1820 }
1821
1822 static int
_is_running(PyInterpreterState * interp)1823 _is_running(PyInterpreterState *interp)
1824 {
1825 PyThreadState *tstate = PyInterpreterState_ThreadHead(interp);
1826 if (PyThreadState_Next(tstate) != NULL) {
1827 PyErr_SetString(PyExc_RuntimeError,
1828 "interpreter has more than one thread");
1829 return -1;
1830 }
1831 PyFrameObject *frame = tstate->frame;
1832 if (frame == NULL) {
1833 if (PyErr_Occurred() != NULL) {
1834 return -1;
1835 }
1836 return 0;
1837 }
1838 return (int)(frame->f_executing);
1839 }
1840
1841 static int
_ensure_not_running(PyInterpreterState * interp)1842 _ensure_not_running(PyInterpreterState *interp)
1843 {
1844 int is_running = _is_running(interp);
1845 if (is_running < 0) {
1846 return -1;
1847 }
1848 if (is_running) {
1849 PyErr_Format(PyExc_RuntimeError, "interpreter already running");
1850 return -1;
1851 }
1852 return 0;
1853 }
1854
1855 static int
_run_script(PyInterpreterState * interp,const char * codestr,_sharedns * shared,_sharedexception ** exc)1856 _run_script(PyInterpreterState *interp, const char *codestr,
1857 _sharedns *shared, _sharedexception **exc)
1858 {
1859 PyObject *exctype = NULL;
1860 PyObject *excval = NULL;
1861 PyObject *tb = NULL;
1862
1863 PyObject *main_mod = _PyInterpreterState_GetMainModule(interp);
1864 if (main_mod == NULL) {
1865 goto error;
1866 }
1867 PyObject *ns = PyModule_GetDict(main_mod); // borrowed
1868 Py_DECREF(main_mod);
1869 if (ns == NULL) {
1870 goto error;
1871 }
1872 Py_INCREF(ns);
1873
1874 // Apply the cross-interpreter data.
1875 if (shared != NULL) {
1876 if (_sharedns_apply(shared, ns) != 0) {
1877 Py_DECREF(ns);
1878 goto error;
1879 }
1880 }
1881
1882 // Run the string (see PyRun_SimpleStringFlags).
1883 PyObject *result = PyRun_StringFlags(codestr, Py_file_input, ns, ns, NULL);
1884 Py_DECREF(ns);
1885 if (result == NULL) {
1886 goto error;
1887 }
1888 else {
1889 Py_DECREF(result); // We throw away the result.
1890 }
1891
1892 *exc = NULL;
1893 return 0;
1894
1895 error:
1896 PyErr_Fetch(&exctype, &excval, &tb);
1897
1898 _sharedexception *sharedexc = _sharedexception_bind(exctype, excval, tb);
1899 Py_XDECREF(exctype);
1900 Py_XDECREF(excval);
1901 Py_XDECREF(tb);
1902 if (sharedexc == NULL) {
1903 fprintf(stderr, "RunFailedError: script raised an uncaught exception");
1904 PyErr_Clear();
1905 sharedexc = NULL;
1906 }
1907 else {
1908 assert(!PyErr_Occurred());
1909 }
1910 *exc = sharedexc;
1911 return -1;
1912 }
1913
1914 static int
_run_script_in_interpreter(PyInterpreterState * interp,const char * codestr,PyObject * shareables)1915 _run_script_in_interpreter(PyInterpreterState *interp, const char *codestr,
1916 PyObject *shareables)
1917 {
1918 if (_ensure_not_running(interp) < 0) {
1919 return -1;
1920 }
1921
1922 _sharedns *shared = _get_shared_ns(shareables);
1923 if (shared == NULL && PyErr_Occurred()) {
1924 return -1;
1925 }
1926
1927 // Switch to interpreter.
1928 PyThreadState *save_tstate = NULL;
1929 if (interp != _PyInterpreterState_Get()) {
1930 // XXX Using the "head" thread isn't strictly correct.
1931 PyThreadState *tstate = PyInterpreterState_ThreadHead(interp);
1932 // XXX Possible GILState issues?
1933 save_tstate = PyThreadState_Swap(tstate);
1934 }
1935
1936 // Run the script.
1937 _sharedexception *exc = NULL;
1938 int result = _run_script(interp, codestr, shared, &exc);
1939
1940 // Switch back.
1941 if (save_tstate != NULL) {
1942 PyThreadState_Swap(save_tstate);
1943 }
1944
1945 // Propagate any exception out to the caller.
1946 if (exc != NULL) {
1947 _sharedexception_apply(exc, RunFailedError);
1948 _sharedexception_free(exc);
1949 }
1950 else if (result != 0) {
1951 // We were unable to allocate a shared exception.
1952 PyErr_NoMemory();
1953 }
1954
1955 if (shared != NULL) {
1956 _sharedns_free(shared);
1957 }
1958
1959 return result;
1960 }
1961
1962
1963 /* module level code ********************************************************/
1964
/* globals is the process-global state for the module.  It holds all
   the data that we need to share between interpreters, so it cannot
   hold PyObject values.  It is zero-initialized here and set up by
   _init_globals(). */
static struct globals {
    _channels channels;  // the channel registry returned by _global_channels()
} _globals = {{0}};
1971
1972 static int
_init_globals(void)1973 _init_globals(void)
1974 {
1975 if (_channels_init(&_globals.channels) != 0) {
1976 return -1;
1977 }
1978 return 0;
1979 }
1980
1981 static _channels *
_global_channels(void)1982 _global_channels(void) {
1983 return &_globals.channels;
1984 }
1985
1986 static PyObject *
interp_create(PyObject * self,PyObject * args)1987 interp_create(PyObject *self, PyObject *args)
1988 {
1989 if (!PyArg_UnpackTuple(args, "create", 0, 0)) {
1990 return NULL;
1991 }
1992
1993 // Create and initialize the new interpreter.
1994 PyThreadState *save_tstate = PyThreadState_Swap(NULL);
1995 // XXX Possible GILState issues?
1996 PyThreadState *tstate = Py_NewInterpreter();
1997 PyThreadState_Swap(save_tstate);
1998 if (tstate == NULL) {
1999 /* Since no new thread state was created, there is no exception to
2000 propagate; raise a fresh one after swapping in the old thread
2001 state. */
2002 PyErr_SetString(PyExc_RuntimeError, "interpreter creation failed");
2003 return NULL;
2004 }
2005 PyObject *idobj = _PyInterpreterState_GetIDObject(tstate->interp);
2006 if (idobj == NULL) {
2007 // XXX Possible GILState issues?
2008 save_tstate = PyThreadState_Swap(tstate);
2009 Py_EndInterpreter(tstate);
2010 PyThreadState_Swap(save_tstate);
2011 return NULL;
2012 }
2013 _PyInterpreterState_RequireIDRef(tstate->interp, 1);
2014 return idobj;
2015 }
2016
/* Docstring for create(), implemented by interp_create(). */
PyDoc_STRVAR(create_doc,
"create() -> ID\n\
\n\
Create a new interpreter and return a unique generated ID.");
2021
2022
2023 static PyObject *
interp_destroy(PyObject * self,PyObject * args,PyObject * kwds)2024 interp_destroy(PyObject *self, PyObject *args, PyObject *kwds)
2025 {
2026 static char *kwlist[] = {"id", NULL};
2027 PyObject *id;
2028 // XXX Use "L" for id?
2029 if (!PyArg_ParseTupleAndKeywords(args, kwds,
2030 "O:destroy", kwlist, &id)) {
2031 return NULL;
2032 }
2033
2034 // Look up the interpreter.
2035 PyInterpreterState *interp = _PyInterpreterID_LookUp(id);
2036 if (interp == NULL) {
2037 return NULL;
2038 }
2039
2040 // Ensure we don't try to destroy the current interpreter.
2041 PyInterpreterState *current = _get_current();
2042 if (current == NULL) {
2043 return NULL;
2044 }
2045 if (interp == current) {
2046 PyErr_SetString(PyExc_RuntimeError,
2047 "cannot destroy the current interpreter");
2048 return NULL;
2049 }
2050
2051 // Ensure the interpreter isn't running.
2052 /* XXX We *could* support destroying a running interpreter but
2053 aren't going to worry about it for now. */
2054 if (_ensure_not_running(interp) < 0) {
2055 return NULL;
2056 }
2057
2058 // Destroy the interpreter.
2059 //PyInterpreterState_Delete(interp);
2060 PyThreadState *tstate = PyInterpreterState_ThreadHead(interp);
2061 // XXX Possible GILState issues?
2062 PyThreadState *save_tstate = PyThreadState_Swap(tstate);
2063 Py_EndInterpreter(tstate);
2064 PyThreadState_Swap(save_tstate);
2065
2066 Py_RETURN_NONE;
2067 }
2068
/* Docstring for destroy(), implemented by interp_destroy(). */
PyDoc_STRVAR(destroy_doc,
"destroy(id)\n\
\n\
Destroy the identified interpreter.\n\
\n\
Attempting to destroy the current interpreter results in a RuntimeError.\n\
So does an unrecognized ID.");
2076
2077
2078 static PyObject *
interp_list_all(PyObject * self,PyObject * Py_UNUSED (ignored))2079 interp_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
2080 {
2081 PyObject *ids, *id;
2082 PyInterpreterState *interp;
2083
2084 ids = PyList_New(0);
2085 if (ids == NULL) {
2086 return NULL;
2087 }
2088
2089 interp = PyInterpreterState_Head();
2090 while (interp != NULL) {
2091 id = _PyInterpreterState_GetIDObject(interp);
2092 if (id == NULL) {
2093 Py_DECREF(ids);
2094 return NULL;
2095 }
2096 // insert at front of list
2097 int res = PyList_Insert(ids, 0, id);
2098 Py_DECREF(id);
2099 if (res < 0) {
2100 Py_DECREF(ids);
2101 return NULL;
2102 }
2103
2104 interp = PyInterpreterState_Next(interp);
2105 }
2106
2107 return ids;
2108 }
2109
/* Docstring for list_all(), implemented by interp_list_all(). */
PyDoc_STRVAR(list_all_doc,
"list_all() -> [ID]\n\
\n\
Return a list containing the ID of every existing interpreter.");
2114
2115
2116 static PyObject *
interp_get_current(PyObject * self,PyObject * Py_UNUSED (ignored))2117 interp_get_current(PyObject *self, PyObject *Py_UNUSED(ignored))
2118 {
2119 PyInterpreterState *interp =_get_current();
2120 if (interp == NULL) {
2121 return NULL;
2122 }
2123 return _PyInterpreterState_GetIDObject(interp);
2124 }
2125
/* Docstring for get_current(), implemented by interp_get_current(). */
PyDoc_STRVAR(get_current_doc,
"get_current() -> ID\n\
\n\
Return the ID of current interpreter.");
2130
2131
2132 static PyObject *
interp_get_main(PyObject * self,PyObject * Py_UNUSED (ignored))2133 interp_get_main(PyObject *self, PyObject *Py_UNUSED(ignored))
2134 {
2135 // Currently, 0 is always the main interpreter.
2136 PY_INT64_T id = 0;
2137 return _PyInterpreterID_New(id);
2138 }
2139
/* Docstring for get_main(), implemented by interp_get_main(). */
PyDoc_STRVAR(get_main_doc,
"get_main() -> ID\n\
\n\
Return the ID of main interpreter.");
2144
2145
2146 static PyObject *
interp_run_string(PyObject * self,PyObject * args,PyObject * kwds)2147 interp_run_string(PyObject *self, PyObject *args, PyObject *kwds)
2148 {
2149 static char *kwlist[] = {"id", "script", "shared", NULL};
2150 PyObject *id, *code;
2151 PyObject *shared = NULL;
2152 if (!PyArg_ParseTupleAndKeywords(args, kwds,
2153 "OU|O:run_string", kwlist,
2154 &id, &code, &shared)) {
2155 return NULL;
2156 }
2157
2158 // Look up the interpreter.
2159 PyInterpreterState *interp = _PyInterpreterID_LookUp(id);
2160 if (interp == NULL) {
2161 return NULL;
2162 }
2163
2164 // Extract code.
2165 Py_ssize_t size;
2166 const char *codestr = PyUnicode_AsUTF8AndSize(code, &size);
2167 if (codestr == NULL) {
2168 return NULL;
2169 }
2170 if (strlen(codestr) != (size_t)size) {
2171 PyErr_SetString(PyExc_ValueError,
2172 "source code string cannot contain null bytes");
2173 return NULL;
2174 }
2175
2176 // Run the code in the interpreter.
2177 if (_run_script_in_interpreter(interp, codestr, shared) != 0) {
2178 return NULL;
2179 }
2180 Py_RETURN_NONE;
2181 }
2182
2183 PyDoc_STRVAR(run_string_doc,
2184 "run_string(id, script, shared)\n\
2185 \n\
2186 Execute the provided string in the identified interpreter.\n\
2187 \n\
2188 See PyRun_SimpleStrings.");
2189
2190
2191 static PyObject *
object_is_shareable(PyObject * self,PyObject * args,PyObject * kwds)2192 object_is_shareable(PyObject *self, PyObject *args, PyObject *kwds)
2193 {
2194 static char *kwlist[] = {"obj", NULL};
2195 PyObject *obj;
2196 if (!PyArg_ParseTupleAndKeywords(args, kwds,
2197 "O:is_shareable", kwlist, &obj)) {
2198 return NULL;
2199 }
2200
2201 if (_PyObject_CheckCrossInterpreterData(obj) == 0) {
2202 Py_RETURN_TRUE;
2203 }
2204 PyErr_Clear();
2205 Py_RETURN_FALSE;
2206 }
2207
/* Docstring for is_shareable(), implemented by object_is_shareable(). */
PyDoc_STRVAR(is_shareable_doc,
"is_shareable(obj) -> bool\n\
\n\
Return True if the object's data may be shared between interpreters and\n\
False otherwise.");
2213
2214
2215 static PyObject *
interp_is_running(PyObject * self,PyObject * args,PyObject * kwds)2216 interp_is_running(PyObject *self, PyObject *args, PyObject *kwds)
2217 {
2218 static char *kwlist[] = {"id", NULL};
2219 PyObject *id;
2220 if (!PyArg_ParseTupleAndKeywords(args, kwds,
2221 "O:is_running", kwlist, &id)) {
2222 return NULL;
2223 }
2224
2225 PyInterpreterState *interp = _PyInterpreterID_LookUp(id);
2226 if (interp == NULL) {
2227 return NULL;
2228 }
2229 int is_running = _is_running(interp);
2230 if (is_running < 0) {
2231 return NULL;
2232 }
2233 if (is_running) {
2234 Py_RETURN_TRUE;
2235 }
2236 Py_RETURN_FALSE;
2237 }
2238
/* Docstring for is_running(), implemented by interp_is_running(). */
PyDoc_STRVAR(is_running_doc,
"is_running(id) -> bool\n\
\n\
Return whether or not the identified interpreter is running.");
2243
2244 static PyObject *
channel_create(PyObject * self,PyObject * Py_UNUSED (ignored))2245 channel_create(PyObject *self, PyObject *Py_UNUSED(ignored))
2246 {
2247 int64_t cid = _channel_create(&_globals.channels);
2248 if (cid < 0) {
2249 return NULL;
2250 }
2251 PyObject *id = (PyObject *)newchannelid(&ChannelIDtype, cid, 0,
2252 &_globals.channels, 0, 0);
2253 if (id == NULL) {
2254 if (_channel_destroy(&_globals.channels, cid) != 0) {
2255 // XXX issue a warning?
2256 }
2257 return NULL;
2258 }
2259 assert(((channelid *)id)->channels != NULL);
2260 return id;
2261 }
2262
/* Docstring for channel_create(). */
PyDoc_STRVAR(channel_create_doc,
"channel_create() -> cid\n\
\n\
Create a new cross-interpreter channel and return a unique generated ID.");
2267
2268 static PyObject *
channel_destroy(PyObject * self,PyObject * args,PyObject * kwds)2269 channel_destroy(PyObject *self, PyObject *args, PyObject *kwds)
2270 {
2271 static char *kwlist[] = {"cid", NULL};
2272 int64_t cid;
2273 if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&:channel_destroy", kwlist,
2274 channel_id_converter, &cid)) {
2275 return NULL;
2276 }
2277
2278 if (_channel_destroy(&_globals.channels, cid) != 0) {
2279 return NULL;
2280 }
2281 Py_RETURN_NONE;
2282 }
2283
/* Docstring for channel_destroy(). */
PyDoc_STRVAR(channel_destroy_doc,
"channel_destroy(cid)\n\
\n\
Close and finalize the channel.  Afterward attempts to use the channel\n\
will behave as though it never existed.");
2289
2290 static PyObject *
channel_list_all(PyObject * self,PyObject * Py_UNUSED (ignored))2291 channel_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
2292 {
2293 int64_t count = 0;
2294 int64_t *cids = _channels_list_all(&_globals.channels, &count);
2295 if (cids == NULL) {
2296 if (count == 0) {
2297 return PyList_New(0);
2298 }
2299 return NULL;
2300 }
2301 PyObject *ids = PyList_New((Py_ssize_t)count);
2302 if (ids == NULL) {
2303 goto finally;
2304 }
2305 int64_t *cur = cids;
2306 for (int64_t i=0; i < count; cur++, i++) {
2307 PyObject *id = (PyObject *)newchannelid(&ChannelIDtype, *cur, 0,
2308 &_globals.channels, 0, 0);
2309 if (id == NULL) {
2310 Py_DECREF(ids);
2311 ids = NULL;
2312 break;
2313 }
2314 PyList_SET_ITEM(ids, i, id);
2315 }
2316
2317 finally:
2318 PyMem_Free(cids);
2319 return ids;
2320 }
2321
/* Docstring for channel_list_all(). */
PyDoc_STRVAR(channel_list_all_doc,
"channel_list_all() -> [cid]\n\
\n\
Return the list of all IDs for active channels.");
2326
2327 static PyObject *
channel_send(PyObject * self,PyObject * args,PyObject * kwds)2328 channel_send(PyObject *self, PyObject *args, PyObject *kwds)
2329 {
2330 static char *kwlist[] = {"cid", "obj", NULL};
2331 int64_t cid;
2332 PyObject *obj;
2333 if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O:channel_send", kwlist,
2334 channel_id_converter, &cid, &obj)) {
2335 return NULL;
2336 }
2337
2338 if (_channel_send(&_globals.channels, cid, obj) != 0) {
2339 return NULL;
2340 }
2341 Py_RETURN_NONE;
2342 }
2343
/* Docstring for channel_send(). */
PyDoc_STRVAR(channel_send_doc,
"channel_send(cid, obj)\n\
\n\
Add the object's data to the channel's queue.");
2348
2349 static PyObject *
channel_recv(PyObject * self,PyObject * args,PyObject * kwds)2350 channel_recv(PyObject *self, PyObject *args, PyObject *kwds)
2351 {
2352 static char *kwlist[] = {"cid", NULL};
2353 int64_t cid;
2354 if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&:channel_recv", kwlist,
2355 channel_id_converter, &cid)) {
2356 return NULL;
2357 }
2358
2359 return _channel_recv(&_globals.channels, cid);
2360 }
2361
2362 PyDoc_STRVAR(channel_recv_doc,
2363 "channel_recv(cid) -> obj\n\
2364 \n\
2365 Return a new object from the data at the from of the channel's queue.");
2366
2367 static PyObject *
channel_close(PyObject * self,PyObject * args,PyObject * kwds)2368 channel_close(PyObject *self, PyObject *args, PyObject *kwds)
2369 {
2370 static char *kwlist[] = {"cid", "send", "recv", "force", NULL};
2371 int64_t cid;
2372 int send = 0;
2373 int recv = 0;
2374 int force = 0;
2375 if (!PyArg_ParseTupleAndKeywords(args, kwds,
2376 "O&|$ppp:channel_close", kwlist,
2377 channel_id_converter, &cid, &send, &recv, &force)) {
2378 return NULL;
2379 }
2380
2381 if (_channel_close(&_globals.channels, cid, send-recv, force) != 0) {
2382 return NULL;
2383 }
2384 Py_RETURN_NONE;
2385 }
2386
2387 PyDoc_STRVAR(channel_close_doc,
2388 "channel_close(cid, *, send=None, recv=None, force=False)\n\
2389 \n\
2390 Close the channel for all interpreters.\n\
2391 \n\
2392 If the channel is empty then the keyword args are ignored and both\n\
2393 ends are immediately closed. Otherwise, if 'force' is True then\n\
2394 all queued items are released and both ends are immediately\n\
2395 closed.\n\
2396 \n\
2397 If the channel is not empty *and* 'force' is False then following\n\
2398 happens:\n\
2399 \n\
2400 * recv is True (regardless of send):\n\
2401 - raise ChannelNotEmptyError\n\
2402 * recv is None and send is None:\n\
2403 - raise ChannelNotEmptyError\n\
2404 * send is True and recv is not True:\n\
2405 - fully close the 'send' end\n\
2406 - close the 'recv' end to interpreters not already receiving\n\
2407 - fully close it once empty\n\
2408 \n\
2409 Closing an already closed channel results in a ChannelClosedError.\n\
2410 \n\
2411 Once the channel's ID has no more ref counts in any interpreter\n\
2412 the channel will be destroyed.");
2413
2414 static PyObject *
channel_release(PyObject * self,PyObject * args,PyObject * kwds)2415 channel_release(PyObject *self, PyObject *args, PyObject *kwds)
2416 {
2417 // Note that only the current interpreter is affected.
2418 static char *kwlist[] = {"cid", "send", "recv", "force", NULL};
2419 int64_t cid;
2420 int send = 0;
2421 int recv = 0;
2422 int force = 0;
2423 if (!PyArg_ParseTupleAndKeywords(args, kwds,
2424 "O&|$ppp:channel_release", kwlist,
2425 channel_id_converter, &cid, &send, &recv, &force)) {
2426 return NULL;
2427 }
2428 if (send == 0 && recv == 0) {
2429 send = 1;
2430 recv = 1;
2431 }
2432
2433 // XXX Handle force is True.
2434 // XXX Fix implicit release.
2435
2436 if (_channel_drop(&_globals.channels, cid, send, recv) != 0) {
2437 return NULL;
2438 }
2439 Py_RETURN_NONE;
2440 }
2441
2442 PyDoc_STRVAR(channel_release_doc,
2443 "channel_release(cid, *, send=None, recv=None, force=True)\n\
2444 \n\
2445 Close the channel for the current interpreter. 'send' and 'recv'\n\
2446 (bool) may be used to indicate the ends to close. By default both\n\
2447 ends are closed. Closing an already closed end is a noop.");
2448
2449 static PyObject *
channel__channel_id(PyObject * self,PyObject * args,PyObject * kwds)2450 channel__channel_id(PyObject *self, PyObject *args, PyObject *kwds)
2451 {
2452 return channelid_new(&ChannelIDtype, args, kwds);
2453 }
2454
2455 static PyMethodDef module_functions[] = {
2456 {"create", (PyCFunction)interp_create,
2457 METH_VARARGS, create_doc},
2458 {"destroy", (PyCFunction)(void(*)(void))interp_destroy,
2459 METH_VARARGS | METH_KEYWORDS, destroy_doc},
2460 {"list_all", interp_list_all,
2461 METH_NOARGS, list_all_doc},
2462 {"get_current", interp_get_current,
2463 METH_NOARGS, get_current_doc},
2464 {"get_main", interp_get_main,
2465 METH_NOARGS, get_main_doc},
2466 {"is_running", (PyCFunction)(void(*)(void))interp_is_running,
2467 METH_VARARGS | METH_KEYWORDS, is_running_doc},
2468 {"run_string", (PyCFunction)(void(*)(void))interp_run_string,
2469 METH_VARARGS | METH_KEYWORDS, run_string_doc},
2470
2471 {"is_shareable", (PyCFunction)(void(*)(void))object_is_shareable,
2472 METH_VARARGS | METH_KEYWORDS, is_shareable_doc},
2473
2474 {"channel_create", channel_create,
2475 METH_NOARGS, channel_create_doc},
2476 {"channel_destroy", (PyCFunction)(void(*)(void))channel_destroy,
2477 METH_VARARGS | METH_KEYWORDS, channel_destroy_doc},
2478 {"channel_list_all", channel_list_all,
2479 METH_NOARGS, channel_list_all_doc},
2480 {"channel_send", (PyCFunction)(void(*)(void))channel_send,
2481 METH_VARARGS | METH_KEYWORDS, channel_send_doc},
2482 {"channel_recv", (PyCFunction)(void(*)(void))channel_recv,
2483 METH_VARARGS | METH_KEYWORDS, channel_recv_doc},
2484 {"channel_close", (PyCFunction)(void(*)(void))channel_close,
2485 METH_VARARGS | METH_KEYWORDS, channel_close_doc},
2486 {"channel_release", (PyCFunction)(void(*)(void))channel_release,
2487 METH_VARARGS | METH_KEYWORDS, channel_release_doc},
2488 {"_channel_id", (PyCFunction)(void(*)(void))channel__channel_id,
2489 METH_VARARGS | METH_KEYWORDS, NULL},
2490
2491 {NULL, NULL} /* sentinel */
2492 };
2493
2494
2495 /* initialization function */
2496
2497 PyDoc_STRVAR(module_doc,
2498 "This module provides primitive operations to manage Python interpreters.\n\
2499 The 'interpreters' module provides a more convenient interface.");
2500
2501 static struct PyModuleDef interpretersmodule = {
2502 PyModuleDef_HEAD_INIT,
2503 "_xxsubinterpreters", /* m_name */
2504 module_doc, /* m_doc */
2505 -1, /* m_size */
2506 module_functions, /* m_methods */
2507 NULL, /* m_slots */
2508 NULL, /* m_traverse */
2509 NULL, /* m_clear */
2510 NULL /* m_free */
2511 };
2512
2513
2514 PyMODINIT_FUNC
PyInit__xxsubinterpreters(void)2515 PyInit__xxsubinterpreters(void)
2516 {
2517 if (_init_globals() != 0) {
2518 return NULL;
2519 }
2520
2521 /* Initialize types */
2522 if (PyType_Ready(&ChannelIDtype) != 0) {
2523 return NULL;
2524 }
2525
2526 /* Create the module */
2527 PyObject *module = PyModule_Create(&interpretersmodule);
2528 if (module == NULL) {
2529 return NULL;
2530 }
2531
2532 /* Add exception types */
2533 PyObject *ns = PyModule_GetDict(module); // borrowed
2534 if (interp_exceptions_init(ns) != 0) {
2535 return NULL;
2536 }
2537 if (channel_exceptions_init(ns) != 0) {
2538 return NULL;
2539 }
2540
2541 /* Add other types */
2542 Py_INCREF(&ChannelIDtype);
2543 if (PyDict_SetItemString(ns, "ChannelID", (PyObject *)&ChannelIDtype) != 0) {
2544 return NULL;
2545 }
2546 Py_INCREF(&_PyInterpreterID_Type);
2547 if (PyDict_SetItemString(ns, "InterpreterID", (PyObject *)&_PyInterpreterID_Type) != 0) {
2548 return NULL;
2549 }
2550
2551 if (_PyCrossInterpreterData_RegisterClass(&ChannelIDtype, _channelid_shared)) {
2552 return NULL;
2553 }
2554
2555 return module;
2556 }
2557