• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* interpreters module */
2 /* low-level access to interpreter primitives */
3 
4 #ifndef Py_BUILD_CORE_BUILTIN
5 #  define Py_BUILD_CORE_MODULE 1
6 #endif
7 
8 #include "Python.h"
9 #include "pycore_crossinterp.h"   // struct _xid
10 
11 #define REGISTERS_HEAP_TYPES
12 #define HAS_UNBOUND_ITEMS
13 #include "_interpreters_common.h"
14 #undef HAS_UNBOUND_ITEMS
15 #undef REGISTERS_HEAP_TYPES
16 
17 
18 #define MODULE_NAME _interpqueues
19 #define MODULE_NAME_STR Py_STRINGIFY(MODULE_NAME)
20 #define MODINIT_FUNC_NAME RESOLVE_MODINIT_FUNC_NAME(MODULE_NAME)
21 
22 
23 #define GLOBAL_MALLOC(TYPE) \
24     PyMem_RawMalloc(sizeof(TYPE))
25 #define GLOBAL_FREE(VAR) \
26     PyMem_RawFree(VAR)
27 
28 
29 #define XID_IGNORE_EXC 1
30 #define XID_FREE 2
31 
32 static int
_release_xid_data(_PyCrossInterpreterData * data,int flags)33 _release_xid_data(_PyCrossInterpreterData *data, int flags)
34 {
35     int ignoreexc = flags & XID_IGNORE_EXC;
36     PyObject *exc;
37     if (ignoreexc) {
38         exc = PyErr_GetRaisedException();
39     }
40     int res;
41     if (flags & XID_FREE) {
42         res = _PyCrossInterpreterData_ReleaseAndRawFree(data);
43     }
44     else {
45         res = _PyCrossInterpreterData_Release(data);
46     }
47     if (res < 0) {
48         /* The owning interpreter is already destroyed. */
49         if (ignoreexc) {
50             // XXX Emit a warning?
51             PyErr_Clear();
52         }
53     }
54     if (flags & XID_FREE) {
55         /* Either way, we free the data. */
56     }
57     if (ignoreexc) {
58         PyErr_SetRaisedException(exc);
59     }
60     return res;
61 }
62 
63 static PyInterpreterState *
_get_current_interp(void)64 _get_current_interp(void)
65 {
66     // PyInterpreterState_Get() aborts if lookup fails, so don't need
67     // to check the result for NULL.
68     return PyInterpreterState_Get();
69 }
70 
71 static PyObject *
_get_current_module(void)72 _get_current_module(void)
73 {
74     PyObject *name = PyUnicode_FromString(MODULE_NAME_STR);
75     if (name == NULL) {
76         return NULL;
77     }
78     PyObject *mod = PyImport_GetModule(name);
79     Py_DECREF(name);
80     if (mod == NULL) {
81         return NULL;
82     }
83     assert(mod != Py_None);
84     return mod;
85 }
86 
87 
88 struct idarg_int64_converter_data {
89     // input:
90     const char *label;
91     // output:
92     int64_t id;
93 };
94 
95 static int
idarg_int64_converter(PyObject * arg,void * ptr)96 idarg_int64_converter(PyObject *arg, void *ptr)
97 {
98     int64_t id;
99     struct idarg_int64_converter_data *data = ptr;
100 
101     const char *label = data->label;
102     if (label == NULL) {
103         label = "ID";
104     }
105 
106     if (PyIndex_Check(arg)) {
107         int overflow = 0;
108         id = PyLong_AsLongLongAndOverflow(arg, &overflow);
109         if (id == -1 && PyErr_Occurred()) {
110             return 0;
111         }
112         else if (id == -1 && overflow == 1) {
113             PyErr_Format(PyExc_OverflowError,
114                          "max %s is %lld, got %R", label, INT64_MAX, arg);
115             return 0;
116         }
117         else if (id < 0) {
118             PyErr_Format(PyExc_ValueError,
119                          "%s must be a non-negative int, got %R", label, arg);
120             return 0;
121         }
122     }
123     else {
124         PyErr_Format(PyExc_TypeError,
125                      "%s must be an int, got %.100s",
126                      label, Py_TYPE(arg)->tp_name);
127         return 0;
128     }
129     data->id = id;
130     return 1;
131 }
132 
133 
134 static int
ensure_highlevel_module_loaded(void)135 ensure_highlevel_module_loaded(void)
136 {
137     PyObject *highlevel = PyImport_ImportModule("interpreters.queues");
138     if (highlevel == NULL) {
139         PyErr_Clear();
140         highlevel = PyImport_ImportModule("test.support.interpreters.queues");
141         if (highlevel == NULL) {
142             return -1;
143         }
144     }
145     Py_DECREF(highlevel);
146     return 0;
147 }
148 
149 
150 /* module state *************************************************************/
151 
152 typedef struct {
153     /* external types (added at runtime by interpreters module) */
154     PyTypeObject *queue_type;
155 
156     /* QueueError (and its subclasses) */
157     PyObject *QueueError;
158     PyObject *QueueNotFoundError;
159     PyObject *QueueEmpty;
160     PyObject *QueueFull;
161 } module_state;
162 
163 static inline module_state *
get_module_state(PyObject * mod)164 get_module_state(PyObject *mod)
165 {
166     assert(mod != NULL);
167     module_state *state = PyModule_GetState(mod);
168     assert(state != NULL);
169     return state;
170 }
171 
172 static int
traverse_module_state(module_state * state,visitproc visit,void * arg)173 traverse_module_state(module_state *state, visitproc visit, void *arg)
174 {
175     /* external types */
176     Py_VISIT(state->queue_type);
177 
178     /* QueueError */
179     Py_VISIT(state->QueueError);
180     Py_VISIT(state->QueueNotFoundError);
181     Py_VISIT(state->QueueEmpty);
182     Py_VISIT(state->QueueFull);
183 
184     return 0;
185 }
186 
187 static int
clear_module_state(module_state * state)188 clear_module_state(module_state *state)
189 {
190     /* external types */
191     if (state->queue_type != NULL) {
192         (void)clear_xid_class(state->queue_type);
193     }
194     Py_CLEAR(state->queue_type);
195 
196     /* QueueError */
197     Py_CLEAR(state->QueueError);
198     Py_CLEAR(state->QueueNotFoundError);
199     Py_CLEAR(state->QueueEmpty);
200     Py_CLEAR(state->QueueFull);
201 
202     return 0;
203 }
204 
205 
206 /* error codes **************************************************************/
207 
208 #define ERR_EXCEPTION_RAISED (-1)
209 // multi-queue errors
210 #define ERR_QUEUES_ALLOC (-11)
211 #define ERR_QUEUE_ALLOC (-12)
212 #define ERR_NO_NEXT_QUEUE_ID (-13)
213 #define ERR_QUEUE_NOT_FOUND (-14)
214 // single-queue errors
215 #define ERR_QUEUE_EMPTY (-21)
216 #define ERR_QUEUE_FULL (-22)
217 #define ERR_QUEUE_NEVER_BOUND (-23)
218 
219 static int ensure_external_exc_types(module_state *);
220 
221 static int
resolve_module_errcode(module_state * state,int errcode,int64_t qid,PyObject ** p_exctype,PyObject ** p_msgobj)222 resolve_module_errcode(module_state *state, int errcode, int64_t qid,
223                        PyObject **p_exctype, PyObject **p_msgobj)
224 {
225     PyObject *exctype = NULL;
226     PyObject *msg = NULL;
227     switch (errcode) {
228     case ERR_NO_NEXT_QUEUE_ID:
229         exctype = state->QueueError;
230         msg = PyUnicode_FromString("ran out of queue IDs");
231         break;
232     case ERR_QUEUE_NOT_FOUND:
233         exctype = state->QueueNotFoundError;
234         msg = PyUnicode_FromFormat("queue %" PRId64 " not found", qid);
235         break;
236     case ERR_QUEUE_EMPTY:
237         if (ensure_external_exc_types(state) < 0) {
238             return -1;
239         }
240         exctype = state->QueueEmpty;
241         msg = PyUnicode_FromFormat("queue %" PRId64 " is empty", qid);
242         break;
243     case ERR_QUEUE_FULL:
244         if (ensure_external_exc_types(state) < 0) {
245             return -1;
246         }
247         exctype = state->QueueFull;
248         msg = PyUnicode_FromFormat("queue %" PRId64 " is full", qid);
249         break;
250     case ERR_QUEUE_NEVER_BOUND:
251         exctype = state->QueueError;
252         msg = PyUnicode_FromFormat("queue %" PRId64 " never bound", qid);
253         break;
254     default:
255         PyErr_Format(PyExc_ValueError,
256                      "unsupported error code %d", errcode);
257         return -1;
258     }
259 
260     if (msg == NULL) {
261         assert(PyErr_Occurred());
262         return -1;
263     }
264     *p_exctype = exctype;
265     *p_msgobj = msg;
266     return 0;
267 }
268 
269 
270 /* QueueError ***************************************************************/
271 
272 static int
add_exctype(PyObject * mod,PyObject ** p_state_field,const char * qualname,const char * doc,PyObject * base)273 add_exctype(PyObject *mod, PyObject **p_state_field,
274             const char *qualname, const char *doc, PyObject *base)
275 {
276 #ifndef NDEBUG
277     const char *dot = strrchr(qualname, '.');
278     assert(dot != NULL);
279     const char *name = dot+1;
280     assert(*p_state_field == NULL);
281     assert(!PyObject_HasAttrStringWithError(mod, name));
282 #endif
283     PyObject *exctype = PyErr_NewExceptionWithDoc(qualname, doc, base, NULL);
284     if (exctype == NULL) {
285         return -1;
286     }
287     if (PyModule_AddType(mod, (PyTypeObject *)exctype) < 0) {
288         Py_DECREF(exctype);
289         return -1;
290     }
291     *p_state_field = exctype;
292     return 0;
293 }
294 
295 static int
add_QueueError(PyObject * mod)296 add_QueueError(PyObject *mod)
297 {
298     module_state *state = get_module_state(mod);
299 
300 #define PREFIX "test.support.interpreters."
301 #define ADD_EXCTYPE(NAME, BASE, DOC)                                    \
302     assert(state->NAME == NULL);                                        \
303     if (add_exctype(mod, &state->NAME, PREFIX #NAME, DOC, BASE) < 0) {  \
304         return -1;                                                      \
305     }
306     ADD_EXCTYPE(QueueError, PyExc_RuntimeError,
307                 "Indicates that a queue-related error happened.")
308     ADD_EXCTYPE(QueueNotFoundError, state->QueueError, NULL)
309     // QueueEmpty and QueueFull are set by set_external_exc_types().
310     state->QueueEmpty = NULL;
311     state->QueueFull = NULL;
312 #undef ADD_EXCTYPE
313 #undef PREFIX
314 
315     return 0;
316 }
317 
318 static int
set_external_exc_types(module_state * state,PyObject * emptyerror,PyObject * fullerror)319 set_external_exc_types(module_state *state,
320                        PyObject *emptyerror, PyObject *fullerror)
321 {
322     if (state->QueueEmpty != NULL) {
323         assert(state->QueueFull != NULL);
324         Py_CLEAR(state->QueueEmpty);
325         Py_CLEAR(state->QueueFull);
326     }
327     else {
328         assert(state->QueueFull == NULL);
329     }
330     assert(PyObject_IsSubclass(emptyerror, state->QueueError));
331     assert(PyObject_IsSubclass(fullerror, state->QueueError));
332     state->QueueEmpty = Py_NewRef(emptyerror);
333     state->QueueFull = Py_NewRef(fullerror);
334     return 0;
335 }
336 
337 static int
ensure_external_exc_types(module_state * state)338 ensure_external_exc_types(module_state *state)
339 {
340     if (state->QueueEmpty != NULL) {
341         assert(state->QueueFull != NULL);
342         return 0;
343     }
344     assert(state->QueueFull == NULL);
345 
346     // Force the module to be loaded, to register the type.
347     if (ensure_highlevel_module_loaded() < 0) {
348         return -1;
349     }
350     assert(state->QueueEmpty != NULL);
351     assert(state->QueueFull != NULL);
352     return 0;
353 }
354 
355 static int
handle_queue_error(int err,PyObject * mod,int64_t qid)356 handle_queue_error(int err, PyObject *mod, int64_t qid)
357 {
358     if (err == 0) {
359         assert(!PyErr_Occurred());
360         return 0;
361     }
362     assert(err < 0);
363     assert((err == -1) == (PyErr_Occurred() != NULL));
364 
365     module_state *state;
366     switch (err) {
367     case ERR_QUEUE_ALLOC:  // fall through
368     case ERR_QUEUES_ALLOC:
369         PyErr_NoMemory();
370         break;
371     case -1:
372         return -1;
373     default:
374         state = get_module_state(mod);
375         assert(state->QueueError != NULL);
376         PyObject *exctype = NULL;
377         PyObject *msg = NULL;
378         if (resolve_module_errcode(state, err, qid, &exctype, &msg) < 0) {
379             return -1;
380         }
381         PyObject *exc = PyObject_CallOneArg(exctype, msg);
382         Py_DECREF(msg);
383         if (exc == NULL) {
384             return -1;
385         }
386         PyErr_SetObject(exctype, exc);
387         Py_DECREF(exc);
388     }
389     return 1;
390 }
391 
392 
393 /* the basic queue **********************************************************/
394 
395 struct _queueitem;
396 
397 typedef struct _queueitem {
398     /* The interpreter that added the item to the queue.
399        The actual bound interpid is found in item->data.
400        This is necessary because item->data might be NULL,
401        meaning the interpreter has been destroyed. */
402     int64_t interpid;
403     _PyCrossInterpreterData *data;
404     int fmt;
405     int unboundop;
406     struct _queueitem *next;
407 } _queueitem;
408 
409 static void
_queueitem_init(_queueitem * item,int64_t interpid,_PyCrossInterpreterData * data,int fmt,int unboundop)410 _queueitem_init(_queueitem *item,
411                 int64_t interpid, _PyCrossInterpreterData *data,
412                 int fmt, int unboundop)
413 {
414     if (interpid < 0) {
415         interpid = _get_interpid(data);
416     }
417     else {
418         assert(data == NULL
419                || _PyCrossInterpreterData_INTERPID(data) < 0
420                || interpid == _PyCrossInterpreterData_INTERPID(data));
421     }
422     assert(check_unbound(unboundop));
423     *item = (_queueitem){
424         .interpid = interpid,
425         .data = data,
426         .fmt = fmt,
427         .unboundop = unboundop,
428     };
429 }
430 
431 static void
_queueitem_clear_data(_queueitem * item)432 _queueitem_clear_data(_queueitem *item)
433 {
434     if (item->data == NULL) {
435         return;
436     }
437     // It was allocated in queue_put().
438     (void)_release_xid_data(item->data, XID_IGNORE_EXC & XID_FREE);
439     item->data = NULL;
440 }
441 
442 static void
_queueitem_clear(_queueitem * item)443 _queueitem_clear(_queueitem *item)
444 {
445     item->next = NULL;
446     _queueitem_clear_data(item);
447 }
448 
449 static _queueitem *
_queueitem_new(int64_t interpid,_PyCrossInterpreterData * data,int fmt,int unboundop)450 _queueitem_new(int64_t interpid, _PyCrossInterpreterData *data,
451                int fmt, int unboundop)
452 {
453     _queueitem *item = GLOBAL_MALLOC(_queueitem);
454     if (item == NULL) {
455         PyErr_NoMemory();
456         return NULL;
457     }
458     _queueitem_init(item, interpid, data, fmt, unboundop);
459     return item;
460 }
461 
462 static void
_queueitem_free(_queueitem * item)463 _queueitem_free(_queueitem *item)
464 {
465     _queueitem_clear(item);
466     GLOBAL_FREE(item);
467 }
468 
469 static void
_queueitem_free_all(_queueitem * item)470 _queueitem_free_all(_queueitem *item)
471 {
472     while (item != NULL) {
473         _queueitem *last = item;
474         item = item->next;
475         _queueitem_free(last);
476     }
477 }
478 
479 static void
_queueitem_popped(_queueitem * item,_PyCrossInterpreterData ** p_data,int * p_fmt,int * p_unboundop)480 _queueitem_popped(_queueitem *item,
481                   _PyCrossInterpreterData **p_data, int *p_fmt, int *p_unboundop)
482 {
483     *p_data = item->data;
484     *p_fmt = item->fmt;
485     *p_unboundop = item->unboundop;
486     // We clear them here, so they won't be released in _queueitem_clear().
487     item->data = NULL;
488     _queueitem_free(item);
489 }
490 
491 static int
_queueitem_clear_interpreter(_queueitem * item)492 _queueitem_clear_interpreter(_queueitem *item)
493 {
494     assert(item->interpid >= 0);
495     if (item->data == NULL) {
496         // Its interpreter was already cleared (or it was never bound).
497         // For UNBOUND_REMOVE it should have been freed at that time.
498         assert(item->unboundop != UNBOUND_REMOVE);
499         return 0;
500     }
501     assert(_PyCrossInterpreterData_INTERPID(item->data) == item->interpid);
502 
503     switch (item->unboundop) {
504     case UNBOUND_REMOVE:
505         // The caller must free/clear it.
506         return 1;
507     case UNBOUND_ERROR:
508     case UNBOUND_REPLACE:
509         // We won't need the cross-interpreter data later
510         // so we completely throw it away.
511         _queueitem_clear_data(item);
512         return 0;
513     default:
514         Py_FatalError("not reachable");
515         return -1;
516     }
517 }
518 
519 
520 /* the queue */
521 
522 typedef struct _queue {
523     Py_ssize_t num_waiters;  // protected by global lock
524     PyThread_type_lock mutex;
525     int alive;
526     struct _queueitems {
527         Py_ssize_t maxsize;
528         Py_ssize_t count;
529         _queueitem *first;
530         _queueitem *last;
531     } items;
532     struct {
533         int fmt;
534         int unboundop;
535     } defaults;
536 } _queue;
537 
538 static int
_queue_init(_queue * queue,Py_ssize_t maxsize,int fmt,int unboundop)539 _queue_init(_queue *queue, Py_ssize_t maxsize, int fmt, int unboundop)
540 {
541     assert(check_unbound(unboundop));
542     PyThread_type_lock mutex = PyThread_allocate_lock();
543     if (mutex == NULL) {
544         return ERR_QUEUE_ALLOC;
545     }
546     *queue = (_queue){
547         .mutex = mutex,
548         .alive = 1,
549         .items = {
550             .maxsize = maxsize,
551         },
552         .defaults = {
553             .fmt = fmt,
554             .unboundop = unboundop,
555         },
556     };
557     return 0;
558 }
559 
560 static void
_queue_clear(_queue * queue)561 _queue_clear(_queue *queue)
562 {
563     assert(!queue->alive);
564     assert(queue->num_waiters == 0);
565     _queueitem_free_all(queue->items.first);
566     assert(queue->mutex != NULL);
567     PyThread_free_lock(queue->mutex);
568     *queue = (_queue){0};
569 }
570 
571 static void _queue_free(_queue *);
572 
573 static void
_queue_kill_and_wait(_queue * queue)574 _queue_kill_and_wait(_queue *queue)
575 {
576     // Mark it as dead.
577     PyThread_acquire_lock(queue->mutex, WAIT_LOCK);
578     assert(queue->alive);
579     queue->alive = 0;
580     PyThread_release_lock(queue->mutex);
581 
582     // Wait for all waiters to fail.
583     while (queue->num_waiters > 0) {
584         PyThread_acquire_lock(queue->mutex, WAIT_LOCK);
585         PyThread_release_lock(queue->mutex);
586     };
587 }
588 
589 static void
_queue_mark_waiter(_queue * queue,PyThread_type_lock parent_mutex)590 _queue_mark_waiter(_queue *queue, PyThread_type_lock parent_mutex)
591 {
592     if (parent_mutex != NULL) {
593         PyThread_acquire_lock(parent_mutex, WAIT_LOCK);
594         queue->num_waiters += 1;
595         PyThread_release_lock(parent_mutex);
596     }
597     else {
598         // The caller must be holding the parent lock already.
599         queue->num_waiters += 1;
600     }
601 }
602 
603 static void
_queue_unmark_waiter(_queue * queue,PyThread_type_lock parent_mutex)604 _queue_unmark_waiter(_queue *queue, PyThread_type_lock parent_mutex)
605 {
606     if (parent_mutex != NULL) {
607         PyThread_acquire_lock(parent_mutex, WAIT_LOCK);
608         queue->num_waiters -= 1;
609         PyThread_release_lock(parent_mutex);
610     }
611     else {
612         // The caller must be holding the parent lock already.
613         queue->num_waiters -= 1;
614     }
615 }
616 
617 static int
_queue_lock(_queue * queue)618 _queue_lock(_queue *queue)
619 {
620     // The queue must be marked as a waiter already.
621     PyThread_acquire_lock(queue->mutex, WAIT_LOCK);
622     if (!queue->alive) {
623         PyThread_release_lock(queue->mutex);
624         return ERR_QUEUE_NOT_FOUND;
625     }
626     return 0;
627 }
628 
629 static void
_queue_unlock(_queue * queue)630 _queue_unlock(_queue *queue)
631 {
632     PyThread_release_lock(queue->mutex);
633 }
634 
635 static int
_queue_add(_queue * queue,int64_t interpid,_PyCrossInterpreterData * data,int fmt,int unboundop)636 _queue_add(_queue *queue, int64_t interpid, _PyCrossInterpreterData *data,
637            int fmt, int unboundop)
638 {
639     int err = _queue_lock(queue);
640     if (err < 0) {
641         return err;
642     }
643 
644     Py_ssize_t maxsize = queue->items.maxsize;
645     if (maxsize <= 0) {
646         maxsize = PY_SSIZE_T_MAX;
647     }
648     if (queue->items.count >= maxsize) {
649         _queue_unlock(queue);
650         return ERR_QUEUE_FULL;
651     }
652 
653     _queueitem *item = _queueitem_new(interpid, data, fmt, unboundop);
654     if (item == NULL) {
655         _queue_unlock(queue);
656         return -1;
657     }
658 
659     queue->items.count += 1;
660     if (queue->items.first == NULL) {
661         queue->items.first = item;
662     }
663     else {
664         queue->items.last->next = item;
665     }
666     queue->items.last = item;
667 
668     _queue_unlock(queue);
669     return 0;
670 }
671 
672 static int
_queue_next(_queue * queue,_PyCrossInterpreterData ** p_data,int * p_fmt,int * p_unboundop)673 _queue_next(_queue *queue,
674             _PyCrossInterpreterData **p_data, int *p_fmt, int *p_unboundop)
675 {
676     int err = _queue_lock(queue);
677     if (err < 0) {
678         return err;
679     }
680 
681     assert(queue->items.count >= 0);
682     _queueitem *item = queue->items.first;
683     if (item == NULL) {
684         _queue_unlock(queue);
685         return ERR_QUEUE_EMPTY;
686     }
687     queue->items.first = item->next;
688     if (queue->items.last == item) {
689         queue->items.last = NULL;
690     }
691     queue->items.count -= 1;
692 
693     _queueitem_popped(item, p_data, p_fmt, p_unboundop);
694 
695     _queue_unlock(queue);
696     return 0;
697 }
698 
699 static int
_queue_get_maxsize(_queue * queue,Py_ssize_t * p_maxsize)700 _queue_get_maxsize(_queue *queue, Py_ssize_t *p_maxsize)
701 {
702     int err = _queue_lock(queue);
703     if (err < 0) {
704         return err;
705     }
706 
707     *p_maxsize = queue->items.maxsize;
708 
709     _queue_unlock(queue);
710     return 0;
711 }
712 
713 static int
_queue_is_full(_queue * queue,int * p_is_full)714 _queue_is_full(_queue *queue, int *p_is_full)
715 {
716     int err = _queue_lock(queue);
717     if (err < 0) {
718         return err;
719     }
720 
721     assert(queue->items.count <= queue->items.maxsize);
722     *p_is_full = queue->items.count == queue->items.maxsize;
723 
724     _queue_unlock(queue);
725     return 0;
726 }
727 
728 static int
_queue_get_count(_queue * queue,Py_ssize_t * p_count)729 _queue_get_count(_queue *queue, Py_ssize_t *p_count)
730 {
731     int err = _queue_lock(queue);
732     if (err < 0) {
733         return err;
734     }
735 
736     *p_count = queue->items.count;
737 
738     _queue_unlock(queue);
739     return 0;
740 }
741 
742 static void
_queue_clear_interpreter(_queue * queue,int64_t interpid)743 _queue_clear_interpreter(_queue *queue, int64_t interpid)
744 {
745     int err = _queue_lock(queue);
746     if (err == ERR_QUEUE_NOT_FOUND) {
747         // The queue is already destroyed, so there's nothing to clear.
748         assert(!PyErr_Occurred());
749         return;
750     }
751     assert(err == 0);  // There should be no other errors.
752 
753     _queueitem *prev = NULL;
754     _queueitem *next = queue->items.first;
755     while (next != NULL) {
756         _queueitem *item = next;
757         next = item->next;
758         int remove = (item->interpid == interpid)
759             ? _queueitem_clear_interpreter(item)
760             : 0;
761         if (remove) {
762             _queueitem_free(item);
763             if (prev == NULL) {
764                 queue->items.first = next;
765             }
766             else {
767                 prev->next = next;
768             }
769             queue->items.count -= 1;
770         }
771         else {
772             prev = item;
773         }
774     }
775 
776     _queue_unlock(queue);
777 }
778 
779 
780 /* external queue references ************************************************/
781 
782 struct _queueref;
783 
784 typedef struct _queueref {
785     struct _queueref *next;
786     int64_t qid;
787     Py_ssize_t refcount;
788     _queue *queue;
789 } _queueref;
790 
791 static _queueref *
_queuerefs_find(_queueref * first,int64_t qid,_queueref ** pprev)792 _queuerefs_find(_queueref *first, int64_t qid, _queueref **pprev)
793 {
794     _queueref *prev = NULL;
795     _queueref *ref = first;
796     while (ref != NULL) {
797         if (ref->qid == qid) {
798             break;
799         }
800         prev = ref;
801         ref = ref->next;
802     }
803     if (pprev != NULL) {
804         *pprev = prev;
805     }
806     return ref;
807 }
808 
809 static void
_queuerefs_clear(_queueref * head)810 _queuerefs_clear(_queueref *head)
811 {
812     _queueref *next = head;
813     while (next != NULL) {
814         _queueref *ref = next;
815         next = ref->next;
816 
817 #ifdef Py_DEBUG
818         int64_t qid = ref->qid;
819         fprintf(stderr, "queue %" PRId64 " still exists\n", qid);
820 #endif
821         _queue *queue = ref->queue;
822         GLOBAL_FREE(ref);
823 
824         _queue_kill_and_wait(queue);
825 #ifdef Py_DEBUG
826     if (queue->items.count > 0) {
827         fprintf(stderr, "queue %" PRId64 " still holds %zd items\n",
828                 qid, queue->items.count);
829     }
830 #endif
831         _queue_free(queue);
832     }
833 }
834 
835 
836 /* a collection of queues ***************************************************/
837 
838 typedef struct _queues {
839     PyThread_type_lock mutex;
840     _queueref *head;
841     int64_t count;
842     int64_t next_id;
843 } _queues;
844 
845 static void
_queues_init(_queues * queues,PyThread_type_lock mutex)846 _queues_init(_queues *queues, PyThread_type_lock mutex)
847 {
848     assert(mutex != NULL);
849     assert(queues->mutex == NULL);
850     *queues = (_queues){
851         .mutex = mutex,
852         .head = NULL,
853         .count = 0,
854         .next_id = 1,
855     };
856 }
857 
858 static void
_queues_fini(_queues * queues,PyThread_type_lock * p_mutex)859 _queues_fini(_queues *queues, PyThread_type_lock *p_mutex)
860 {
861     PyThread_type_lock mutex = queues->mutex;
862     assert(mutex != NULL);
863 
864     PyThread_acquire_lock(mutex, WAIT_LOCK);
865     if (queues->count > 0) {
866         assert(queues->head != NULL);
867         _queuerefs_clear(queues->head);
868     }
869     *queues = (_queues){0};
870     PyThread_release_lock(mutex);
871 
872     *p_mutex = mutex;
873 }
874 
875 static int64_t
_queues_next_id(_queues * queues)876 _queues_next_id(_queues *queues)  // needs lock
877 {
878     int64_t qid = queues->next_id;
879     if (qid < 0) {
880         /* overflow */
881         return ERR_NO_NEXT_QUEUE_ID;
882     }
883     queues->next_id += 1;
884     return qid;
885 }
886 
887 static int
_queues_lookup(_queues * queues,int64_t qid,_queue ** res)888 _queues_lookup(_queues *queues, int64_t qid, _queue **res)
889 {
890     PyThread_acquire_lock(queues->mutex, WAIT_LOCK);
891 
892     _queueref *ref = _queuerefs_find(queues->head, qid, NULL);
893     if (ref == NULL) {
894         PyThread_release_lock(queues->mutex);
895         return ERR_QUEUE_NOT_FOUND;
896     }
897     assert(ref->queue != NULL);
898     _queue *queue = ref->queue;
899     _queue_mark_waiter(queue, NULL);
900     // The caller must unmark it.
901 
902     PyThread_release_lock(queues->mutex);
903 
904     *res = queue;
905     return 0;
906 }
907 
908 static int64_t
_queues_add(_queues * queues,_queue * queue)909 _queues_add(_queues *queues, _queue *queue)
910 {
911     int64_t qid = -1;
912     PyThread_acquire_lock(queues->mutex, WAIT_LOCK);
913 
914     // Create a new ref.
915     int64_t _qid = _queues_next_id(queues);
916     if (_qid < 0) {
917         goto done;
918     }
919     _queueref *ref = GLOBAL_MALLOC(_queueref);
920     if (ref == NULL) {
921         qid = ERR_QUEUE_ALLOC;
922         goto done;
923     }
924     *ref = (_queueref){
925         .qid = _qid,
926         .queue = queue,
927     };
928 
929     // Add it to the list.
930     // We assume that the queue is a new one (not already in the list).
931     ref->next = queues->head;
932     queues->head = ref;
933     queues->count += 1;
934 
935     qid = _qid;
936 done:
937     PyThread_release_lock(queues->mutex);
938     return qid;
939 }
940 
941 static void
_queues_remove_ref(_queues * queues,_queueref * ref,_queueref * prev,_queue ** p_queue)942 _queues_remove_ref(_queues *queues, _queueref *ref, _queueref *prev,
943                    _queue **p_queue)
944 {
945     assert(ref->queue != NULL);
946 
947     if (ref == queues->head) {
948         queues->head = ref->next;
949     }
950     else {
951         prev->next = ref->next;
952     }
953     ref->next = NULL;
954     queues->count -= 1;
955 
956     *p_queue = ref->queue;
957     ref->queue = NULL;
958     GLOBAL_FREE(ref);
959 }
960 
961 static int
_queues_remove(_queues * queues,int64_t qid,_queue ** p_queue)962 _queues_remove(_queues *queues, int64_t qid, _queue **p_queue)
963 {
964     PyThread_acquire_lock(queues->mutex, WAIT_LOCK);
965 
966     _queueref *prev = NULL;
967     _queueref *ref = _queuerefs_find(queues->head, qid, &prev);
968     if (ref == NULL) {
969         PyThread_release_lock(queues->mutex);
970         return ERR_QUEUE_NOT_FOUND;
971     }
972 
973     _queues_remove_ref(queues, ref, prev, p_queue);
974     PyThread_release_lock(queues->mutex);
975 
976     return 0;
977 }
978 
979 static int
_queues_incref(_queues * queues,int64_t qid)980 _queues_incref(_queues *queues, int64_t qid)
981 {
982     // XXX Track interpreter IDs?
983     int res = -1;
984     PyThread_acquire_lock(queues->mutex, WAIT_LOCK);
985 
986     _queueref *ref = _queuerefs_find(queues->head, qid, NULL);
987     if (ref == NULL) {
988         assert(!PyErr_Occurred());
989         res = ERR_QUEUE_NOT_FOUND;
990         goto done;
991     }
992     ref->refcount += 1;
993 
994     res = 0;
995 done:
996     PyThread_release_lock(queues->mutex);
997     return res;
998 }
999 
1000 static int
_queues_decref(_queues * queues,int64_t qid)1001 _queues_decref(_queues *queues, int64_t qid)
1002 {
1003     int res = -1;
1004     PyThread_acquire_lock(queues->mutex, WAIT_LOCK);
1005 
1006     _queueref *prev = NULL;
1007     _queueref *ref = _queuerefs_find(queues->head, qid, &prev);
1008     if (ref == NULL) {
1009         assert(!PyErr_Occurred());
1010         res = ERR_QUEUE_NOT_FOUND;
1011         goto finally;
1012     }
1013     if (ref->refcount == 0) {
1014         res = ERR_QUEUE_NEVER_BOUND;
1015         goto finally;
1016     }
1017     assert(ref->refcount > 0);
1018     ref->refcount -= 1;
1019 
1020     // Destroy if no longer used.
1021     assert(ref->queue != NULL);
1022     if (ref->refcount == 0) {
1023         _queue *queue = NULL;
1024         _queues_remove_ref(queues, ref, prev, &queue);
1025         PyThread_release_lock(queues->mutex);
1026 
1027         _queue_kill_and_wait(queue);
1028         _queue_free(queue);
1029         return 0;
1030     }
1031 
1032     res = 0;
1033 finally:
1034     PyThread_release_lock(queues->mutex);
1035     return res;
1036 }
1037 
1038 struct queue_id_and_info {
1039     int64_t id;
1040     int fmt;
1041     int unboundop;
1042 };
1043 
1044 static struct queue_id_and_info *
_queues_list_all(_queues * queues,int64_t * p_count)1045 _queues_list_all(_queues *queues, int64_t *p_count)
1046 {
1047     struct queue_id_and_info *qids = NULL;
1048     PyThread_acquire_lock(queues->mutex, WAIT_LOCK);
1049     struct queue_id_and_info *ids = PyMem_NEW(struct queue_id_and_info,
1050                                               (Py_ssize_t)(queues->count));
1051     if (ids == NULL) {
1052         goto done;
1053     }
1054     _queueref *ref = queues->head;
1055     for (int64_t i=0; ref != NULL; ref = ref->next, i++) {
1056         ids[i].id = ref->qid;
1057         assert(ref->queue != NULL);
1058         ids[i].fmt = ref->queue->defaults.fmt;
1059         ids[i].unboundop = ref->queue->defaults.unboundop;
1060     }
1061     *p_count = queues->count;
1062 
1063     qids = ids;
1064 done:
1065     PyThread_release_lock(queues->mutex);
1066     return qids;
1067 }
1068 
1069 static void
_queues_clear_interpreter(_queues * queues,int64_t interpid)1070 _queues_clear_interpreter(_queues *queues, int64_t interpid)
1071 {
1072     PyThread_acquire_lock(queues->mutex, WAIT_LOCK);
1073 
1074     _queueref *ref = queues->head;
1075     for (; ref != NULL; ref = ref->next) {
1076         assert(ref->queue != NULL);
1077         _queue_clear_interpreter(ref->queue, interpid);
1078     }
1079 
1080     PyThread_release_lock(queues->mutex);
1081 }
1082 
1083 
1084 /* "high"-level queue-related functions *************************************/
1085 
1086 static void
_queue_free(_queue * queue)1087 _queue_free(_queue *queue)
1088 {
1089     _queue_clear(queue);
1090     GLOBAL_FREE(queue);
1091 }
1092 
1093 // Create a new queue.
1094 static int64_t
queue_create(_queues * queues,Py_ssize_t maxsize,int fmt,int unboundop)1095 queue_create(_queues *queues, Py_ssize_t maxsize, int fmt, int unboundop)
1096 {
1097     _queue *queue = GLOBAL_MALLOC(_queue);
1098     if (queue == NULL) {
1099         return ERR_QUEUE_ALLOC;
1100     }
1101     int err = _queue_init(queue, maxsize, fmt, unboundop);
1102     if (err < 0) {
1103         GLOBAL_FREE(queue);
1104         return (int64_t)err;
1105     }
1106     int64_t qid = _queues_add(queues, queue);
1107     if (qid < 0) {
1108         _queue_clear(queue);
1109         GLOBAL_FREE(queue);
1110     }
1111     return qid;
1112 }
1113 
1114 // Completely destroy the queue.
1115 static int
queue_destroy(_queues * queues,int64_t qid)1116 queue_destroy(_queues *queues, int64_t qid)
1117 {
1118     _queue *queue = NULL;
1119     int err = _queues_remove(queues, qid, &queue);
1120     if (err < 0) {
1121         return err;
1122     }
1123     _queue_kill_and_wait(queue);
1124     _queue_free(queue);
1125     return 0;
1126 }
1127 
1128 // Push an object onto the queue.
1129 static int
queue_put(_queues * queues,int64_t qid,PyObject * obj,int fmt,int unboundop)1130 queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt, int unboundop)
1131 {
1132     // Look up the queue.
1133     _queue *queue = NULL;
1134     int err = _queues_lookup(queues, qid, &queue);
1135     if (err != 0) {
1136         return err;
1137     }
1138     assert(queue != NULL);
1139 
1140     // Convert the object to cross-interpreter data.
1141     _PyCrossInterpreterData *data = GLOBAL_MALLOC(_PyCrossInterpreterData);
1142     if (data == NULL) {
1143         _queue_unmark_waiter(queue, queues->mutex);
1144         return -1;
1145     }
1146     if (_PyObject_GetCrossInterpreterData(obj, data) != 0) {
1147         _queue_unmark_waiter(queue, queues->mutex);
1148         GLOBAL_FREE(data);
1149         return -1;
1150     }
1151     assert(_PyCrossInterpreterData_INTERPID(data) == \
1152            PyInterpreterState_GetID(PyInterpreterState_Get()));
1153 
1154     // Add the data to the queue.
1155     int64_t interpid = -1;  // _queueitem_init() will set it.
1156     int res = _queue_add(queue, interpid, data, fmt, unboundop);
1157     _queue_unmark_waiter(queue, queues->mutex);
1158     if (res != 0) {
1159         // We may chain an exception here:
1160         (void)_release_xid_data(data, 0);
1161         GLOBAL_FREE(data);
1162         return res;
1163     }
1164 
1165     return 0;
1166 }
1167 
1168 // Pop the next object off the queue.  Fail if empty.
1169 // XXX Support a "wait" mutex?
1170 static int
queue_get(_queues * queues,int64_t qid,PyObject ** res,int * p_fmt,int * p_unboundop)1171 queue_get(_queues *queues, int64_t qid,
1172           PyObject **res, int *p_fmt, int *p_unboundop)
1173 {
1174     int err;
1175     *res = NULL;
1176 
1177     // Look up the queue.
1178     _queue *queue = NULL;
1179     err = _queues_lookup(queues, qid, &queue);
1180     if (err != 0) {
1181         return err;
1182     }
1183     // Past this point we are responsible for releasing the mutex.
1184     assert(queue != NULL);
1185 
1186     // Pop off the next item from the queue.
1187     _PyCrossInterpreterData *data = NULL;
1188     err = _queue_next(queue, &data, p_fmt, p_unboundop);
1189     _queue_unmark_waiter(queue, queues->mutex);
1190     if (err != 0) {
1191         return err;
1192     }
1193     else if (data == NULL) {
1194         assert(!PyErr_Occurred());
1195         return 0;
1196     }
1197 
1198     // Convert the data back to an object.
1199     PyObject *obj = _PyCrossInterpreterData_NewObject(data);
1200     if (obj == NULL) {
1201         assert(PyErr_Occurred());
1202         // It was allocated in queue_put(), so we free it.
1203         (void)_release_xid_data(data, XID_IGNORE_EXC | XID_FREE);
1204         return -1;
1205     }
1206     // It was allocated in queue_put(), so we free it.
1207     int release_res = _release_xid_data(data, XID_FREE);
1208     if (release_res < 0) {
1209         // The source interpreter has been destroyed already.
1210         assert(PyErr_Occurred());
1211         Py_DECREF(obj);
1212         return -1;
1213     }
1214 
1215     *res = obj;
1216     return 0;
1217 }
1218 
1219 static int
queue_get_maxsize(_queues * queues,int64_t qid,Py_ssize_t * p_maxsize)1220 queue_get_maxsize(_queues *queues, int64_t qid, Py_ssize_t *p_maxsize)
1221 {
1222     _queue *queue = NULL;
1223     int err = _queues_lookup(queues, qid, &queue);
1224     if (err < 0) {
1225         return err;
1226     }
1227     err = _queue_get_maxsize(queue, p_maxsize);
1228     _queue_unmark_waiter(queue, queues->mutex);
1229     return err;
1230 }
1231 
1232 static int
queue_is_full(_queues * queues,int64_t qid,int * p_is_full)1233 queue_is_full(_queues *queues, int64_t qid, int *p_is_full)
1234 {
1235     _queue *queue = NULL;
1236     int err = _queues_lookup(queues, qid, &queue);
1237     if (err < 0) {
1238         return err;
1239     }
1240     err = _queue_is_full(queue, p_is_full);
1241     _queue_unmark_waiter(queue, queues->mutex);
1242     return err;
1243 }
1244 
1245 static int
queue_get_count(_queues * queues,int64_t qid,Py_ssize_t * p_count)1246 queue_get_count(_queues *queues, int64_t qid, Py_ssize_t *p_count)
1247 {
1248     _queue *queue = NULL;
1249     int err = _queues_lookup(queues, qid, &queue);
1250     if (err < 0) {
1251         return err;
1252     }
1253     err = _queue_get_count(queue, p_count);
1254     _queue_unmark_waiter(queue, queues->mutex);
1255     return err;
1256 }
1257 
1258 
1259 /* external Queue objects ***************************************************/
1260 
1261 static int _queueobj_shared(PyThreadState *,
1262                             PyObject *, _PyCrossInterpreterData *);
1263 
1264 static int
set_external_queue_type(module_state * state,PyTypeObject * queue_type)1265 set_external_queue_type(module_state *state, PyTypeObject *queue_type)
1266 {
1267     // Clear the old value if the .py module was reloaded.
1268     if (state->queue_type != NULL) {
1269         (void)clear_xid_class(state->queue_type);
1270         Py_CLEAR(state->queue_type);
1271     }
1272 
1273     // Add and register the new type.
1274     if (ensure_xid_class(queue_type, _queueobj_shared) < 0) {
1275         return -1;
1276     }
1277     state->queue_type = (PyTypeObject *)Py_NewRef(queue_type);
1278 
1279     return 0;
1280 }
1281 
1282 static PyTypeObject *
get_external_queue_type(PyObject * module)1283 get_external_queue_type(PyObject *module)
1284 {
1285     module_state *state = get_module_state(module);
1286 
1287     PyTypeObject *cls = state->queue_type;
1288     if (cls == NULL) {
1289         // Force the module to be loaded, to register the type.
1290         if (ensure_highlevel_module_loaded() < 0) {
1291             return NULL;
1292         }
1293         cls = state->queue_type;
1294         assert(cls != NULL);
1295     }
1296     return cls;
1297 }
1298 
1299 
1300 // XXX Use a new __xid__ protocol instead?
1301 
1302 struct _queueid_xid {
1303     int64_t qid;
1304 };
1305 
1306 static _queues * _get_global_queues(void);
1307 
1308 static void *
_queueid_xid_new(int64_t qid)1309 _queueid_xid_new(int64_t qid)
1310 {
1311     _queues *queues = _get_global_queues();
1312     if (_queues_incref(queues, qid) < 0) {
1313         return NULL;
1314     }
1315 
1316     struct _queueid_xid *data = PyMem_RawMalloc(sizeof(struct _queueid_xid));
1317     if (data == NULL) {
1318         _queues_decref(queues, qid);
1319         return NULL;
1320     }
1321     data->qid = qid;
1322     return (void *)data;
1323 }
1324 
1325 static void
_queueid_xid_free(void * data)1326 _queueid_xid_free(void *data)
1327 {
1328     int64_t qid = ((struct _queueid_xid *)data)->qid;
1329     PyMem_RawFree(data);
1330     _queues *queues = _get_global_queues();
1331     int res = _queues_decref(queues, qid);
1332     if (res == ERR_QUEUE_NOT_FOUND) {
1333         // Already destroyed.
1334         // XXX Warn?
1335     }
1336     else {
1337         assert(res == 0);
1338     }
1339 }
1340 
1341 static PyObject *
_queueobj_from_xid(_PyCrossInterpreterData * data)1342 _queueobj_from_xid(_PyCrossInterpreterData *data)
1343 {
1344     int64_t qid = *(int64_t *)_PyCrossInterpreterData_DATA(data);
1345     PyObject *qidobj = PyLong_FromLongLong(qid);
1346     if (qidobj == NULL) {
1347         return NULL;
1348     }
1349 
1350     PyObject *mod = _get_current_module();
1351     if (mod == NULL) {
1352         // XXX import it?
1353         PyErr_SetString(PyExc_RuntimeError,
1354                         MODULE_NAME_STR " module not imported yet");
1355         return NULL;
1356     }
1357 
1358     PyTypeObject *cls = get_external_queue_type(mod);
1359     Py_DECREF(mod);
1360     if (cls == NULL) {
1361         Py_DECREF(qidobj);
1362         return NULL;
1363     }
1364     PyObject *obj = PyObject_CallOneArg((PyObject *)cls, (PyObject *)qidobj);
1365     Py_DECREF(qidobj);
1366     return obj;
1367 }
1368 
1369 static int
_queueobj_shared(PyThreadState * tstate,PyObject * queueobj,_PyCrossInterpreterData * data)1370 _queueobj_shared(PyThreadState *tstate, PyObject *queueobj,
1371                  _PyCrossInterpreterData *data)
1372 {
1373     PyObject *qidobj = PyObject_GetAttrString(queueobj, "_id");
1374     if (qidobj == NULL) {
1375         return -1;
1376     }
1377     struct idarg_int64_converter_data converted = {
1378         .label = "queue ID",
1379     };
1380     int res = idarg_int64_converter(qidobj, &converted);
1381     Py_CLEAR(qidobj);
1382     if (!res) {
1383         assert(PyErr_Occurred());
1384         return -1;
1385     }
1386 
1387     void *raw = _queueid_xid_new(converted.id);
1388     if (raw == NULL) {
1389         return -1;
1390     }
1391     _PyCrossInterpreterData_Init(data, tstate->interp, raw, NULL,
1392                                  _queueobj_from_xid);
1393     _PyCrossInterpreterData_SET_FREE(data, _queueid_xid_free);
1394     return 0;
1395 }
1396 
1397 
1398 /* module level code ********************************************************/
1399 
1400 /* globals is the process-global state for the module.  It holds all
1401    the data that we need to share between interpreters, so it cannot
1402    hold PyObject values. */
1403 static struct globals {
1404     PyMutex mutex;
1405     int module_count;
1406     _queues queues;
1407 } _globals = {0};
1408 
1409 static int
_globals_init(void)1410 _globals_init(void)
1411 {
1412     PyMutex_Lock(&_globals.mutex);
1413     assert(_globals.module_count >= 0);
1414     _globals.module_count++;
1415     if (_globals.module_count == 1) {
1416         // Called for the first time.
1417         PyThread_type_lock mutex = PyThread_allocate_lock();
1418         if (mutex == NULL) {
1419             _globals.module_count--;
1420             PyMutex_Unlock(&_globals.mutex);
1421             return ERR_QUEUES_ALLOC;
1422         }
1423         _queues_init(&_globals.queues, mutex);
1424     }
1425     PyMutex_Unlock(&_globals.mutex);
1426     return 0;
1427 }
1428 
1429 static void
_globals_fini(void)1430 _globals_fini(void)
1431 {
1432     PyMutex_Lock(&_globals.mutex);
1433     assert(_globals.module_count > 0);
1434     _globals.module_count--;
1435     if (_globals.module_count == 0) {
1436         PyThread_type_lock mutex;
1437         _queues_fini(&_globals.queues, &mutex);
1438         assert(mutex != NULL);
1439         PyThread_free_lock(mutex);
1440     }
1441     PyMutex_Unlock(&_globals.mutex);
1442 }
1443 
1444 static _queues *
_get_global_queues(void)1445 _get_global_queues(void)
1446 {
1447     return &_globals.queues;
1448 }
1449 
1450 
1451 static void
clear_interpreter(void * data)1452 clear_interpreter(void *data)
1453 {
1454     if (_globals.module_count == 0) {
1455         return;
1456     }
1457     PyInterpreterState *interp = (PyInterpreterState *)data;
1458     assert(interp == _get_current_interp());
1459     int64_t interpid = PyInterpreterState_GetID(interp);
1460     _queues_clear_interpreter(&_globals.queues, interpid);
1461 }
1462 
1463 
1464 typedef struct idarg_int64_converter_data qidarg_converter_data;
1465 
1466 static int
qidarg_converter(PyObject * arg,void * ptr)1467 qidarg_converter(PyObject *arg, void *ptr)
1468 {
1469     qidarg_converter_data *data = ptr;
1470     if (data->label == NULL) {
1471         data->label = "queue ID";
1472     }
1473     return idarg_int64_converter(arg, ptr);
1474 }
1475 
1476 
1477 static PyObject *
queuesmod_create(PyObject * self,PyObject * args,PyObject * kwds)1478 queuesmod_create(PyObject *self, PyObject *args, PyObject *kwds)
1479 {
1480     static char *kwlist[] = {"maxsize", "fmt", "unboundop", NULL};
1481     Py_ssize_t maxsize;
1482     int fmt;
1483     int unboundop;
1484     if (!PyArg_ParseTupleAndKeywords(args, kwds, "nii:create", kwlist,
1485                                      &maxsize, &fmt, &unboundop))
1486     {
1487         return NULL;
1488     }
1489     if (!check_unbound(unboundop)) {
1490         PyErr_Format(PyExc_ValueError,
1491                      "unsupported unboundop %d", unboundop);
1492         return NULL;
1493     }
1494 
1495     int64_t qid = queue_create(&_globals.queues, maxsize, fmt, unboundop);
1496     if (qid < 0) {
1497         (void)handle_queue_error((int)qid, self, qid);
1498         return NULL;
1499     }
1500 
1501     PyObject *qidobj = PyLong_FromLongLong(qid);
1502     if (qidobj == NULL) {
1503         PyObject *exc = PyErr_GetRaisedException();
1504         int err = queue_destroy(&_globals.queues, qid);
1505         if (handle_queue_error(err, self, qid)) {
1506             // XXX issue a warning?
1507             PyErr_Clear();
1508         }
1509         PyErr_SetRaisedException(exc);
1510         return NULL;
1511     }
1512 
1513     return qidobj;
1514 }
1515 
1516 PyDoc_STRVAR(queuesmod_create_doc,
1517 "create(maxsize, fmt, unboundop) -> qid\n\
1518 \n\
1519 Create a new cross-interpreter queue and return its unique generated ID.\n\
1520 It is a new reference as though bind() had been called on the queue.\n\
1521 \n\
1522 The caller is responsible for calling destroy() for the new queue\n\
1523 before the runtime is finalized.");
1524 
1525 static PyObject *
queuesmod_destroy(PyObject * self,PyObject * args,PyObject * kwds)1526 queuesmod_destroy(PyObject *self, PyObject *args, PyObject *kwds)
1527 {
1528     static char *kwlist[] = {"qid", NULL};
1529     qidarg_converter_data qidarg = {0};
1530     if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&:destroy", kwlist,
1531                                      qidarg_converter, &qidarg)) {
1532         return NULL;
1533     }
1534     int64_t qid = qidarg.id;
1535 
1536     int err = queue_destroy(&_globals.queues, qid);
1537     if (handle_queue_error(err, self, qid)) {
1538         return NULL;
1539     }
1540     Py_RETURN_NONE;
1541 }
1542 
1543 PyDoc_STRVAR(queuesmod_destroy_doc,
1544 "destroy(qid)\n\
1545 \n\
1546 Clear and destroy the queue.  Afterward attempts to use the queue\n\
1547 will behave as though it never existed.");
1548 
1549 static PyObject *
queuesmod_list_all(PyObject * self,PyObject * Py_UNUSED (ignored))1550 queuesmod_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
1551 {
1552     int64_t count = 0;
1553     struct queue_id_and_info *qids = _queues_list_all(&_globals.queues, &count);
1554     if (qids == NULL) {
1555         if (!PyErr_Occurred() && count == 0) {
1556             return PyList_New(0);
1557         }
1558         return NULL;
1559     }
1560     PyObject *ids = PyList_New((Py_ssize_t)count);
1561     if (ids == NULL) {
1562         goto finally;
1563     }
1564     struct queue_id_and_info *cur = qids;
1565     for (int64_t i=0; i < count; cur++, i++) {
1566         PyObject *item = Py_BuildValue("Lii", cur->id, cur->fmt,
1567                                        cur->unboundop);
1568         if (item == NULL) {
1569             Py_SETREF(ids, NULL);
1570             break;
1571         }
1572         PyList_SET_ITEM(ids, (Py_ssize_t)i, item);
1573     }
1574 
1575 finally:
1576     PyMem_Free(qids);
1577     return ids;
1578 }
1579 
1580 PyDoc_STRVAR(queuesmod_list_all_doc,
1581 "list_all() -> [(qid, fmt)]\n\
1582 \n\
1583 Return the list of IDs for all queues.\n\
1584 Each corresponding default format is also included.");
1585 
1586 static PyObject *
queuesmod_put(PyObject * self,PyObject * args,PyObject * kwds)1587 queuesmod_put(PyObject *self, PyObject *args, PyObject *kwds)
1588 {
1589     static char *kwlist[] = {"qid", "obj", "fmt", "unboundop", NULL};
1590     qidarg_converter_data qidarg = {0};
1591     PyObject *obj;
1592     int fmt;
1593     int unboundop;
1594     if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&Oii:put", kwlist,
1595                                      qidarg_converter, &qidarg, &obj, &fmt,
1596                                      &unboundop))
1597     {
1598         return NULL;
1599     }
1600     int64_t qid = qidarg.id;
1601     if (!check_unbound(unboundop)) {
1602         PyErr_Format(PyExc_ValueError,
1603                      "unsupported unboundop %d", unboundop);
1604         return NULL;
1605     }
1606 
1607     /* Queue up the object. */
1608     int err = queue_put(&_globals.queues, qid, obj, fmt, unboundop);
1609     // This is the only place that raises QueueFull.
1610     if (handle_queue_error(err, self, qid)) {
1611         return NULL;
1612     }
1613 
1614     Py_RETURN_NONE;
1615 }
1616 
1617 PyDoc_STRVAR(queuesmod_put_doc,
1618 "put(qid, obj, fmt)\n\
1619 \n\
1620 Add the object's data to the queue.");
1621 
1622 static PyObject *
queuesmod_get(PyObject * self,PyObject * args,PyObject * kwds)1623 queuesmod_get(PyObject *self, PyObject *args, PyObject *kwds)
1624 {
1625     static char *kwlist[] = {"qid", NULL};
1626     qidarg_converter_data qidarg = {0};
1627     if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&:get", kwlist,
1628                                      qidarg_converter, &qidarg)) {
1629         return NULL;
1630     }
1631     int64_t qid = qidarg.id;
1632 
1633     PyObject *obj = NULL;
1634     int fmt = 0;
1635     int unboundop = 0;
1636     int err = queue_get(&_globals.queues, qid, &obj, &fmt, &unboundop);
1637     // This is the only place that raises QueueEmpty.
1638     if (handle_queue_error(err, self, qid)) {
1639         return NULL;
1640     }
1641 
1642     if (obj == NULL) {
1643         return Py_BuildValue("Oii", Py_None, fmt, unboundop);
1644     }
1645     PyObject *res = Py_BuildValue("OiO", obj, fmt, Py_None);
1646     Py_DECREF(obj);
1647     return res;
1648 }
1649 
1650 PyDoc_STRVAR(queuesmod_get_doc,
1651 "get(qid) -> (obj, fmt)\n\
1652 \n\
1653 Return a new object from the data at the front of the queue.\n\
1654 The object's format is also returned.\n\
1655 \n\
1656 If there is nothing to receive then raise QueueEmpty.");
1657 
1658 static PyObject *
queuesmod_bind(PyObject * self,PyObject * args,PyObject * kwds)1659 queuesmod_bind(PyObject *self, PyObject *args, PyObject *kwds)
1660 {
1661     static char *kwlist[] = {"qid", NULL};
1662     qidarg_converter_data qidarg = {0};
1663     if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&:bind", kwlist,
1664                                      qidarg_converter, &qidarg)) {
1665         return NULL;
1666     }
1667     int64_t qid = qidarg.id;
1668 
1669     // XXX Check module state if bound already.
1670 
1671     int err = _queues_incref(&_globals.queues, qid);
1672     if (handle_queue_error(err, self, qid)) {
1673         return NULL;
1674     }
1675 
1676     // XXX Update module state.
1677 
1678     Py_RETURN_NONE;
1679 }
1680 
1681 PyDoc_STRVAR(queuesmod_bind_doc,
1682 "bind(qid)\n\
1683 \n\
1684 Take a reference to the identified queue.\n\
1685 The queue is not destroyed until there are no references left.");
1686 
1687 static PyObject *
queuesmod_release(PyObject * self,PyObject * args,PyObject * kwds)1688 queuesmod_release(PyObject *self, PyObject *args, PyObject *kwds)
1689 {
1690     // Note that only the current interpreter is affected.
1691     static char *kwlist[] = {"qid", NULL};
1692     qidarg_converter_data qidarg = {0};
1693     if (!PyArg_ParseTupleAndKeywords(args, kwds,
1694                                      "O&:release", kwlist,
1695                                      qidarg_converter, &qidarg)) {
1696         return NULL;
1697     }
1698     int64_t qid = qidarg.id;
1699 
1700     // XXX Check module state if bound already.
1701     // XXX Update module state.
1702 
1703     int err = _queues_decref(&_globals.queues, qid);
1704     if (handle_queue_error(err, self, qid)) {
1705         return NULL;
1706     }
1707 
1708     Py_RETURN_NONE;
1709 }
1710 
1711 PyDoc_STRVAR(queuesmod_release_doc,
1712 "release(qid)\n\
1713 \n\
1714 Release a reference to the queue.\n\
1715 The queue is destroyed once there are no references left.");
1716 
1717 static PyObject *
queuesmod_get_maxsize(PyObject * self,PyObject * args,PyObject * kwds)1718 queuesmod_get_maxsize(PyObject *self, PyObject *args, PyObject *kwds)
1719 {
1720     static char *kwlist[] = {"qid", NULL};
1721     qidarg_converter_data qidarg = {0};
1722     if (!PyArg_ParseTupleAndKeywords(args, kwds,
1723                                      "O&:get_maxsize", kwlist,
1724                                      qidarg_converter, &qidarg)) {
1725         return NULL;
1726     }
1727     int64_t qid = qidarg.id;
1728 
1729     Py_ssize_t maxsize = -1;
1730     int err = queue_get_maxsize(&_globals.queues, qid, &maxsize);
1731     if (handle_queue_error(err, self, qid)) {
1732         return NULL;
1733     }
1734     return PyLong_FromLongLong(maxsize);
1735 }
1736 
1737 PyDoc_STRVAR(queuesmod_get_maxsize_doc,
1738 "get_maxsize(qid)\n\
1739 \n\
1740 Return the maximum number of items in the queue.");
1741 
1742 static PyObject *
queuesmod_get_queue_defaults(PyObject * self,PyObject * args,PyObject * kwds)1743 queuesmod_get_queue_defaults(PyObject *self, PyObject *args, PyObject *kwds)
1744 {
1745     static char *kwlist[] = {"qid", NULL};
1746     qidarg_converter_data qidarg = {0};
1747     if (!PyArg_ParseTupleAndKeywords(args, kwds,
1748                                      "O&:get_queue_defaults", kwlist,
1749                                      qidarg_converter, &qidarg)) {
1750         return NULL;
1751     }
1752     int64_t qid = qidarg.id;
1753 
1754     _queue *queue = NULL;
1755     int err = _queues_lookup(&_globals.queues, qid, &queue);
1756     if (handle_queue_error(err, self, qid)) {
1757         return NULL;
1758     }
1759     int fmt = queue->defaults.fmt;
1760     int unboundop = queue->defaults.unboundop;
1761     _queue_unmark_waiter(queue, _globals.queues.mutex);
1762 
1763     PyObject *defaults = Py_BuildValue("ii", fmt, unboundop);
1764     return defaults;
1765 }
1766 
1767 PyDoc_STRVAR(queuesmod_get_queue_defaults_doc,
1768 "get_queue_defaults(qid)\n\
1769 \n\
1770 Return the queue's default values, set when it was created.");
1771 
1772 static PyObject *
queuesmod_is_full(PyObject * self,PyObject * args,PyObject * kwds)1773 queuesmod_is_full(PyObject *self, PyObject *args, PyObject *kwds)
1774 {
1775     static char *kwlist[] = {"qid", NULL};
1776     qidarg_converter_data qidarg = {0};
1777     if (!PyArg_ParseTupleAndKeywords(args, kwds,
1778                                      "O&:is_full", kwlist,
1779                                      qidarg_converter, &qidarg)) {
1780         return NULL;
1781     }
1782     int64_t qid = qidarg.id;
1783 
1784     int is_full = 0;
1785     int err = queue_is_full(&_globals.queues, qid, &is_full);
1786     if (handle_queue_error(err, self, qid)) {
1787         return NULL;
1788     }
1789     if (is_full) {
1790         Py_RETURN_TRUE;
1791     }
1792     Py_RETURN_FALSE;
1793 }
1794 
1795 PyDoc_STRVAR(queuesmod_is_full_doc,
1796 "is_full(qid)\n\
1797 \n\
1798 Return true if the queue has a maxsize and has reached it.");
1799 
1800 static PyObject *
queuesmod_get_count(PyObject * self,PyObject * args,PyObject * kwds)1801 queuesmod_get_count(PyObject *self, PyObject *args, PyObject *kwds)
1802 {
1803     static char *kwlist[] = {"qid", NULL};
1804     qidarg_converter_data qidarg = {0};
1805     if (!PyArg_ParseTupleAndKeywords(args, kwds,
1806                                      "O&:get_count", kwlist,
1807                                      qidarg_converter, &qidarg)) {
1808         return NULL;
1809     }
1810     int64_t qid = qidarg.id;
1811 
1812     Py_ssize_t count = -1;
1813     int err = queue_get_count(&_globals.queues, qid, &count);
1814     if (handle_queue_error(err, self, qid)) {
1815         return NULL;
1816     }
1817     assert(count >= 0);
1818     return PyLong_FromSsize_t(count);
1819 }
1820 
1821 PyDoc_STRVAR(queuesmod_get_count_doc,
1822 "get_count(qid)\n\
1823 \n\
1824 Return the number of items in the queue.");
1825 
1826 static PyObject *
queuesmod__register_heap_types(PyObject * self,PyObject * args,PyObject * kwds)1827 queuesmod__register_heap_types(PyObject *self, PyObject *args, PyObject *kwds)
1828 {
1829     static char *kwlist[] = {"queuetype", "emptyerror", "fullerror", NULL};
1830     PyObject *queuetype;
1831     PyObject *emptyerror;
1832     PyObject *fullerror;
1833     if (!PyArg_ParseTupleAndKeywords(args, kwds,
1834                                      "OOO:_register_heap_types", kwlist,
1835                                      &queuetype, &emptyerror, &fullerror)) {
1836         return NULL;
1837     }
1838     if (!PyType_Check(queuetype)) {
1839         PyErr_SetString(PyExc_TypeError,
1840                         "expected a type for 'queuetype'");
1841         return NULL;
1842     }
1843     if (!PyExceptionClass_Check(emptyerror)) {
1844         PyErr_SetString(PyExc_TypeError,
1845                         "expected an exception type for 'emptyerror'");
1846         return NULL;
1847     }
1848     if (!PyExceptionClass_Check(fullerror)) {
1849         PyErr_SetString(PyExc_TypeError,
1850                         "expected an exception type for 'fullerror'");
1851         return NULL;
1852     }
1853 
1854     module_state *state = get_module_state(self);
1855 
1856     if (set_external_queue_type(state, (PyTypeObject *)queuetype) < 0) {
1857         return NULL;
1858     }
1859     if (set_external_exc_types(state, emptyerror, fullerror) < 0) {
1860         return NULL;
1861     }
1862 
1863     Py_RETURN_NONE;
1864 }
1865 
1866 static PyMethodDef module_functions[] = {
1867     {"create",                     _PyCFunction_CAST(queuesmod_create),
1868      METH_VARARGS | METH_KEYWORDS, queuesmod_create_doc},
1869     {"destroy",                    _PyCFunction_CAST(queuesmod_destroy),
1870      METH_VARARGS | METH_KEYWORDS, queuesmod_destroy_doc},
1871     {"list_all",                   queuesmod_list_all,
1872      METH_NOARGS,                  queuesmod_list_all_doc},
1873     {"put",                        _PyCFunction_CAST(queuesmod_put),
1874      METH_VARARGS | METH_KEYWORDS, queuesmod_put_doc},
1875     {"get",                        _PyCFunction_CAST(queuesmod_get),
1876      METH_VARARGS | METH_KEYWORDS, queuesmod_get_doc},
1877     {"bind",                       _PyCFunction_CAST(queuesmod_bind),
1878      METH_VARARGS | METH_KEYWORDS, queuesmod_bind_doc},
1879     {"release",                    _PyCFunction_CAST(queuesmod_release),
1880      METH_VARARGS | METH_KEYWORDS, queuesmod_release_doc},
1881     {"get_maxsize",                _PyCFunction_CAST(queuesmod_get_maxsize),
1882      METH_VARARGS | METH_KEYWORDS, queuesmod_get_maxsize_doc},
1883     {"get_queue_defaults",         _PyCFunction_CAST(queuesmod_get_queue_defaults),
1884      METH_VARARGS | METH_KEYWORDS, queuesmod_get_queue_defaults_doc},
1885     {"is_full",                    _PyCFunction_CAST(queuesmod_is_full),
1886      METH_VARARGS | METH_KEYWORDS, queuesmod_is_full_doc},
1887     {"get_count",                  _PyCFunction_CAST(queuesmod_get_count),
1888      METH_VARARGS | METH_KEYWORDS, queuesmod_get_count_doc},
1889     {"_register_heap_types",       _PyCFunction_CAST(queuesmod__register_heap_types),
1890      METH_VARARGS | METH_KEYWORDS, NULL},
1891 
1892     {NULL,                        NULL}           /* sentinel */
1893 };
1894 
1895 
1896 /* initialization function */
1897 
1898 PyDoc_STRVAR(module_doc,
1899 "This module provides primitive operations to manage Python interpreters.\n\
1900 The 'interpreters' module provides a more convenient interface.");
1901 
1902 static int
module_exec(PyObject * mod)1903 module_exec(PyObject *mod)
1904 {
1905     int err = _globals_init();
1906     if (handle_queue_error(err, mod, -1)) {
1907         return -1;
1908     }
1909 
1910     /* Add exception types */
1911     if (add_QueueError(mod) < 0) {
1912         goto error;
1913     }
1914 
1915     /* Make sure queues drop objects owned by this interpreter. */
1916     PyInterpreterState *interp = _get_current_interp();
1917     PyUnstable_AtExit(interp, clear_interpreter, (void *)interp);
1918 
1919     return 0;
1920 
1921 error:
1922     _globals_fini();
1923     return -1;
1924 }
1925 
1926 static struct PyModuleDef_Slot module_slots[] = {
1927     {Py_mod_exec, module_exec},
1928     {Py_mod_multiple_interpreters, Py_MOD_PER_INTERPRETER_GIL_SUPPORTED},
1929     {Py_mod_gil, Py_MOD_GIL_NOT_USED},
1930     {0, NULL},
1931 };
1932 
1933 static int
module_traverse(PyObject * mod,visitproc visit,void * arg)1934 module_traverse(PyObject *mod, visitproc visit, void *arg)
1935 {
1936     module_state *state = get_module_state(mod);
1937     traverse_module_state(state, visit, arg);
1938     return 0;
1939 }
1940 
1941 static int
module_clear(PyObject * mod)1942 module_clear(PyObject *mod)
1943 {
1944     module_state *state = get_module_state(mod);
1945 
1946     // Now we clear the module state.
1947     clear_module_state(state);
1948     return 0;
1949 }
1950 
1951 static void
module_free(void * mod)1952 module_free(void *mod)
1953 {
1954     module_state *state = get_module_state(mod);
1955 
1956     // Now we clear the module state.
1957     clear_module_state(state);
1958 
1959     _globals_fini();
1960 }
1961 
1962 static struct PyModuleDef moduledef = {
1963     .m_base = PyModuleDef_HEAD_INIT,
1964     .m_name = MODULE_NAME_STR,
1965     .m_doc = module_doc,
1966     .m_size = sizeof(module_state),
1967     .m_methods = module_functions,
1968     .m_slots = module_slots,
1969     .m_traverse = module_traverse,
1970     .m_clear = module_clear,
1971     .m_free = (freefunc)module_free,
1972 };
1973 
1974 PyMODINIT_FUNC
MODINIT_FUNC_NAME(void)1975 MODINIT_FUNC_NAME(void)
1976 {
1977     return PyModuleDef_Init(&moduledef);
1978 }
1979