1 /* interpreters module */
2 /* low-level access to interpreter primitives */
3
4 #ifndef Py_BUILD_CORE_BUILTIN
5 # define Py_BUILD_CORE_MODULE 1
6 #endif
7
8 #include "Python.h"
9 #include "pycore_crossinterp.h" // struct _xid
10
11 #define REGISTERS_HEAP_TYPES
12 #define HAS_UNBOUND_ITEMS
13 #include "_interpreters_common.h"
14 #undef HAS_UNBOUND_ITEMS
15 #undef REGISTERS_HEAP_TYPES
16
17
18 #define MODULE_NAME _interpqueues
19 #define MODULE_NAME_STR Py_STRINGIFY(MODULE_NAME)
20 #define MODINIT_FUNC_NAME RESOLVE_MODINIT_FUNC_NAME(MODULE_NAME)
21
22
23 #define GLOBAL_MALLOC(TYPE) \
24 PyMem_RawMalloc(sizeof(TYPE))
25 #define GLOBAL_FREE(VAR) \
26 PyMem_RawFree(VAR)
27
28
29 #define XID_IGNORE_EXC 1
30 #define XID_FREE 2
31
32 static int
_release_xid_data(_PyCrossInterpreterData * data,int flags)33 _release_xid_data(_PyCrossInterpreterData *data, int flags)
34 {
35 int ignoreexc = flags & XID_IGNORE_EXC;
36 PyObject *exc;
37 if (ignoreexc) {
38 exc = PyErr_GetRaisedException();
39 }
40 int res;
41 if (flags & XID_FREE) {
42 res = _PyCrossInterpreterData_ReleaseAndRawFree(data);
43 }
44 else {
45 res = _PyCrossInterpreterData_Release(data);
46 }
47 if (res < 0) {
48 /* The owning interpreter is already destroyed. */
49 if (ignoreexc) {
50 // XXX Emit a warning?
51 PyErr_Clear();
52 }
53 }
54 if (flags & XID_FREE) {
55 /* Either way, we free the data. */
56 }
57 if (ignoreexc) {
58 PyErr_SetRaisedException(exc);
59 }
60 return res;
61 }
62
63 static PyInterpreterState *
_get_current_interp(void)64 _get_current_interp(void)
65 {
66 // PyInterpreterState_Get() aborts if lookup fails, so don't need
67 // to check the result for NULL.
68 return PyInterpreterState_Get();
69 }
70
71 static PyObject *
_get_current_module(void)72 _get_current_module(void)
73 {
74 PyObject *name = PyUnicode_FromString(MODULE_NAME_STR);
75 if (name == NULL) {
76 return NULL;
77 }
78 PyObject *mod = PyImport_GetModule(name);
79 Py_DECREF(name);
80 if (mod == NULL) {
81 return NULL;
82 }
83 assert(mod != Py_None);
84 return mod;
85 }
86
87
88 struct idarg_int64_converter_data {
89 // input:
90 const char *label;
91 // output:
92 int64_t id;
93 };
94
95 static int
idarg_int64_converter(PyObject * arg,void * ptr)96 idarg_int64_converter(PyObject *arg, void *ptr)
97 {
98 int64_t id;
99 struct idarg_int64_converter_data *data = ptr;
100
101 const char *label = data->label;
102 if (label == NULL) {
103 label = "ID";
104 }
105
106 if (PyIndex_Check(arg)) {
107 int overflow = 0;
108 id = PyLong_AsLongLongAndOverflow(arg, &overflow);
109 if (id == -1 && PyErr_Occurred()) {
110 return 0;
111 }
112 else if (id == -1 && overflow == 1) {
113 PyErr_Format(PyExc_OverflowError,
114 "max %s is %lld, got %R", label, INT64_MAX, arg);
115 return 0;
116 }
117 else if (id < 0) {
118 PyErr_Format(PyExc_ValueError,
119 "%s must be a non-negative int, got %R", label, arg);
120 return 0;
121 }
122 }
123 else {
124 PyErr_Format(PyExc_TypeError,
125 "%s must be an int, got %.100s",
126 label, Py_TYPE(arg)->tp_name);
127 return 0;
128 }
129 data->id = id;
130 return 1;
131 }
132
133
134 static int
ensure_highlevel_module_loaded(void)135 ensure_highlevel_module_loaded(void)
136 {
137 PyObject *highlevel = PyImport_ImportModule("interpreters.queues");
138 if (highlevel == NULL) {
139 PyErr_Clear();
140 highlevel = PyImport_ImportModule("test.support.interpreters.queues");
141 if (highlevel == NULL) {
142 return -1;
143 }
144 }
145 Py_DECREF(highlevel);
146 return 0;
147 }
148
149
150 /* module state *************************************************************/
151
152 typedef struct {
153 /* external types (added at runtime by interpreters module) */
154 PyTypeObject *queue_type;
155
156 /* QueueError (and its subclasses) */
157 PyObject *QueueError;
158 PyObject *QueueNotFoundError;
159 PyObject *QueueEmpty;
160 PyObject *QueueFull;
161 } module_state;
162
163 static inline module_state *
get_module_state(PyObject * mod)164 get_module_state(PyObject *mod)
165 {
166 assert(mod != NULL);
167 module_state *state = PyModule_GetState(mod);
168 assert(state != NULL);
169 return state;
170 }
171
172 static int
traverse_module_state(module_state * state,visitproc visit,void * arg)173 traverse_module_state(module_state *state, visitproc visit, void *arg)
174 {
175 /* external types */
176 Py_VISIT(state->queue_type);
177
178 /* QueueError */
179 Py_VISIT(state->QueueError);
180 Py_VISIT(state->QueueNotFoundError);
181 Py_VISIT(state->QueueEmpty);
182 Py_VISIT(state->QueueFull);
183
184 return 0;
185 }
186
187 static int
clear_module_state(module_state * state)188 clear_module_state(module_state *state)
189 {
190 /* external types */
191 if (state->queue_type != NULL) {
192 (void)clear_xid_class(state->queue_type);
193 }
194 Py_CLEAR(state->queue_type);
195
196 /* QueueError */
197 Py_CLEAR(state->QueueError);
198 Py_CLEAR(state->QueueNotFoundError);
199 Py_CLEAR(state->QueueEmpty);
200 Py_CLEAR(state->QueueFull);
201
202 return 0;
203 }
204
205
206 /* error codes **************************************************************/
207
208 #define ERR_EXCEPTION_RAISED (-1)
209 // multi-queue errors
210 #define ERR_QUEUES_ALLOC (-11)
211 #define ERR_QUEUE_ALLOC (-12)
212 #define ERR_NO_NEXT_QUEUE_ID (-13)
213 #define ERR_QUEUE_NOT_FOUND (-14)
214 // single-queue errors
215 #define ERR_QUEUE_EMPTY (-21)
216 #define ERR_QUEUE_FULL (-22)
217 #define ERR_QUEUE_NEVER_BOUND (-23)
218
219 static int ensure_external_exc_types(module_state *);
220
221 static int
resolve_module_errcode(module_state * state,int errcode,int64_t qid,PyObject ** p_exctype,PyObject ** p_msgobj)222 resolve_module_errcode(module_state *state, int errcode, int64_t qid,
223 PyObject **p_exctype, PyObject **p_msgobj)
224 {
225 PyObject *exctype = NULL;
226 PyObject *msg = NULL;
227 switch (errcode) {
228 case ERR_NO_NEXT_QUEUE_ID:
229 exctype = state->QueueError;
230 msg = PyUnicode_FromString("ran out of queue IDs");
231 break;
232 case ERR_QUEUE_NOT_FOUND:
233 exctype = state->QueueNotFoundError;
234 msg = PyUnicode_FromFormat("queue %" PRId64 " not found", qid);
235 break;
236 case ERR_QUEUE_EMPTY:
237 if (ensure_external_exc_types(state) < 0) {
238 return -1;
239 }
240 exctype = state->QueueEmpty;
241 msg = PyUnicode_FromFormat("queue %" PRId64 " is empty", qid);
242 break;
243 case ERR_QUEUE_FULL:
244 if (ensure_external_exc_types(state) < 0) {
245 return -1;
246 }
247 exctype = state->QueueFull;
248 msg = PyUnicode_FromFormat("queue %" PRId64 " is full", qid);
249 break;
250 case ERR_QUEUE_NEVER_BOUND:
251 exctype = state->QueueError;
252 msg = PyUnicode_FromFormat("queue %" PRId64 " never bound", qid);
253 break;
254 default:
255 PyErr_Format(PyExc_ValueError,
256 "unsupported error code %d", errcode);
257 return -1;
258 }
259
260 if (msg == NULL) {
261 assert(PyErr_Occurred());
262 return -1;
263 }
264 *p_exctype = exctype;
265 *p_msgobj = msg;
266 return 0;
267 }
268
269
270 /* QueueError ***************************************************************/
271
272 static int
add_exctype(PyObject * mod,PyObject ** p_state_field,const char * qualname,const char * doc,PyObject * base)273 add_exctype(PyObject *mod, PyObject **p_state_field,
274 const char *qualname, const char *doc, PyObject *base)
275 {
276 #ifndef NDEBUG
277 const char *dot = strrchr(qualname, '.');
278 assert(dot != NULL);
279 const char *name = dot+1;
280 assert(*p_state_field == NULL);
281 assert(!PyObject_HasAttrStringWithError(mod, name));
282 #endif
283 PyObject *exctype = PyErr_NewExceptionWithDoc(qualname, doc, base, NULL);
284 if (exctype == NULL) {
285 return -1;
286 }
287 if (PyModule_AddType(mod, (PyTypeObject *)exctype) < 0) {
288 Py_DECREF(exctype);
289 return -1;
290 }
291 *p_state_field = exctype;
292 return 0;
293 }
294
295 static int
add_QueueError(PyObject * mod)296 add_QueueError(PyObject *mod)
297 {
298 module_state *state = get_module_state(mod);
299
300 #define PREFIX "test.support.interpreters."
301 #define ADD_EXCTYPE(NAME, BASE, DOC) \
302 assert(state->NAME == NULL); \
303 if (add_exctype(mod, &state->NAME, PREFIX #NAME, DOC, BASE) < 0) { \
304 return -1; \
305 }
306 ADD_EXCTYPE(QueueError, PyExc_RuntimeError,
307 "Indicates that a queue-related error happened.")
308 ADD_EXCTYPE(QueueNotFoundError, state->QueueError, NULL)
309 // QueueEmpty and QueueFull are set by set_external_exc_types().
310 state->QueueEmpty = NULL;
311 state->QueueFull = NULL;
312 #undef ADD_EXCTYPE
313 #undef PREFIX
314
315 return 0;
316 }
317
318 static int
set_external_exc_types(module_state * state,PyObject * emptyerror,PyObject * fullerror)319 set_external_exc_types(module_state *state,
320 PyObject *emptyerror, PyObject *fullerror)
321 {
322 if (state->QueueEmpty != NULL) {
323 assert(state->QueueFull != NULL);
324 Py_CLEAR(state->QueueEmpty);
325 Py_CLEAR(state->QueueFull);
326 }
327 else {
328 assert(state->QueueFull == NULL);
329 }
330 assert(PyObject_IsSubclass(emptyerror, state->QueueError));
331 assert(PyObject_IsSubclass(fullerror, state->QueueError));
332 state->QueueEmpty = Py_NewRef(emptyerror);
333 state->QueueFull = Py_NewRef(fullerror);
334 return 0;
335 }
336
337 static int
ensure_external_exc_types(module_state * state)338 ensure_external_exc_types(module_state *state)
339 {
340 if (state->QueueEmpty != NULL) {
341 assert(state->QueueFull != NULL);
342 return 0;
343 }
344 assert(state->QueueFull == NULL);
345
346 // Force the module to be loaded, to register the type.
347 if (ensure_highlevel_module_loaded() < 0) {
348 return -1;
349 }
350 assert(state->QueueEmpty != NULL);
351 assert(state->QueueFull != NULL);
352 return 0;
353 }
354
355 static int
handle_queue_error(int err,PyObject * mod,int64_t qid)356 handle_queue_error(int err, PyObject *mod, int64_t qid)
357 {
358 if (err == 0) {
359 assert(!PyErr_Occurred());
360 return 0;
361 }
362 assert(err < 0);
363 assert((err == -1) == (PyErr_Occurred() != NULL));
364
365 module_state *state;
366 switch (err) {
367 case ERR_QUEUE_ALLOC: // fall through
368 case ERR_QUEUES_ALLOC:
369 PyErr_NoMemory();
370 break;
371 case -1:
372 return -1;
373 default:
374 state = get_module_state(mod);
375 assert(state->QueueError != NULL);
376 PyObject *exctype = NULL;
377 PyObject *msg = NULL;
378 if (resolve_module_errcode(state, err, qid, &exctype, &msg) < 0) {
379 return -1;
380 }
381 PyObject *exc = PyObject_CallOneArg(exctype, msg);
382 Py_DECREF(msg);
383 if (exc == NULL) {
384 return -1;
385 }
386 PyErr_SetObject(exctype, exc);
387 Py_DECREF(exc);
388 }
389 return 1;
390 }
391
392
393 /* the basic queue **********************************************************/
394
395 struct _queueitem;
396
397 typedef struct _queueitem {
398 /* The interpreter that added the item to the queue.
399 The actual bound interpid is found in item->data.
400 This is necessary because item->data might be NULL,
401 meaning the interpreter has been destroyed. */
402 int64_t interpid;
403 _PyCrossInterpreterData *data;
404 int fmt;
405 int unboundop;
406 struct _queueitem *next;
407 } _queueitem;
408
409 static void
_queueitem_init(_queueitem * item,int64_t interpid,_PyCrossInterpreterData * data,int fmt,int unboundop)410 _queueitem_init(_queueitem *item,
411 int64_t interpid, _PyCrossInterpreterData *data,
412 int fmt, int unboundop)
413 {
414 if (interpid < 0) {
415 interpid = _get_interpid(data);
416 }
417 else {
418 assert(data == NULL
419 || _PyCrossInterpreterData_INTERPID(data) < 0
420 || interpid == _PyCrossInterpreterData_INTERPID(data));
421 }
422 assert(check_unbound(unboundop));
423 *item = (_queueitem){
424 .interpid = interpid,
425 .data = data,
426 .fmt = fmt,
427 .unboundop = unboundop,
428 };
429 }
430
431 static void
_queueitem_clear_data(_queueitem * item)432 _queueitem_clear_data(_queueitem *item)
433 {
434 if (item->data == NULL) {
435 return;
436 }
437 // It was allocated in queue_put().
438 (void)_release_xid_data(item->data, XID_IGNORE_EXC & XID_FREE);
439 item->data = NULL;
440 }
441
442 static void
_queueitem_clear(_queueitem * item)443 _queueitem_clear(_queueitem *item)
444 {
445 item->next = NULL;
446 _queueitem_clear_data(item);
447 }
448
449 static _queueitem *
_queueitem_new(int64_t interpid,_PyCrossInterpreterData * data,int fmt,int unboundop)450 _queueitem_new(int64_t interpid, _PyCrossInterpreterData *data,
451 int fmt, int unboundop)
452 {
453 _queueitem *item = GLOBAL_MALLOC(_queueitem);
454 if (item == NULL) {
455 PyErr_NoMemory();
456 return NULL;
457 }
458 _queueitem_init(item, interpid, data, fmt, unboundop);
459 return item;
460 }
461
462 static void
_queueitem_free(_queueitem * item)463 _queueitem_free(_queueitem *item)
464 {
465 _queueitem_clear(item);
466 GLOBAL_FREE(item);
467 }
468
469 static void
_queueitem_free_all(_queueitem * item)470 _queueitem_free_all(_queueitem *item)
471 {
472 while (item != NULL) {
473 _queueitem *last = item;
474 item = item->next;
475 _queueitem_free(last);
476 }
477 }
478
479 static void
_queueitem_popped(_queueitem * item,_PyCrossInterpreterData ** p_data,int * p_fmt,int * p_unboundop)480 _queueitem_popped(_queueitem *item,
481 _PyCrossInterpreterData **p_data, int *p_fmt, int *p_unboundop)
482 {
483 *p_data = item->data;
484 *p_fmt = item->fmt;
485 *p_unboundop = item->unboundop;
486 // We clear them here, so they won't be released in _queueitem_clear().
487 item->data = NULL;
488 _queueitem_free(item);
489 }
490
491 static int
_queueitem_clear_interpreter(_queueitem * item)492 _queueitem_clear_interpreter(_queueitem *item)
493 {
494 assert(item->interpid >= 0);
495 if (item->data == NULL) {
496 // Its interpreter was already cleared (or it was never bound).
497 // For UNBOUND_REMOVE it should have been freed at that time.
498 assert(item->unboundop != UNBOUND_REMOVE);
499 return 0;
500 }
501 assert(_PyCrossInterpreterData_INTERPID(item->data) == item->interpid);
502
503 switch (item->unboundop) {
504 case UNBOUND_REMOVE:
505 // The caller must free/clear it.
506 return 1;
507 case UNBOUND_ERROR:
508 case UNBOUND_REPLACE:
509 // We won't need the cross-interpreter data later
510 // so we completely throw it away.
511 _queueitem_clear_data(item);
512 return 0;
513 default:
514 Py_FatalError("not reachable");
515 return -1;
516 }
517 }
518
519
520 /* the queue */
521
522 typedef struct _queue {
523 Py_ssize_t num_waiters; // protected by global lock
524 PyThread_type_lock mutex;
525 int alive;
526 struct _queueitems {
527 Py_ssize_t maxsize;
528 Py_ssize_t count;
529 _queueitem *first;
530 _queueitem *last;
531 } items;
532 struct {
533 int fmt;
534 int unboundop;
535 } defaults;
536 } _queue;
537
538 static int
_queue_init(_queue * queue,Py_ssize_t maxsize,int fmt,int unboundop)539 _queue_init(_queue *queue, Py_ssize_t maxsize, int fmt, int unboundop)
540 {
541 assert(check_unbound(unboundop));
542 PyThread_type_lock mutex = PyThread_allocate_lock();
543 if (mutex == NULL) {
544 return ERR_QUEUE_ALLOC;
545 }
546 *queue = (_queue){
547 .mutex = mutex,
548 .alive = 1,
549 .items = {
550 .maxsize = maxsize,
551 },
552 .defaults = {
553 .fmt = fmt,
554 .unboundop = unboundop,
555 },
556 };
557 return 0;
558 }
559
560 static void
_queue_clear(_queue * queue)561 _queue_clear(_queue *queue)
562 {
563 assert(!queue->alive);
564 assert(queue->num_waiters == 0);
565 _queueitem_free_all(queue->items.first);
566 assert(queue->mutex != NULL);
567 PyThread_free_lock(queue->mutex);
568 *queue = (_queue){0};
569 }
570
571 static void _queue_free(_queue *);
572
573 static void
_queue_kill_and_wait(_queue * queue)574 _queue_kill_and_wait(_queue *queue)
575 {
576 // Mark it as dead.
577 PyThread_acquire_lock(queue->mutex, WAIT_LOCK);
578 assert(queue->alive);
579 queue->alive = 0;
580 PyThread_release_lock(queue->mutex);
581
582 // Wait for all waiters to fail.
583 while (queue->num_waiters > 0) {
584 PyThread_acquire_lock(queue->mutex, WAIT_LOCK);
585 PyThread_release_lock(queue->mutex);
586 };
587 }
588
589 static void
_queue_mark_waiter(_queue * queue,PyThread_type_lock parent_mutex)590 _queue_mark_waiter(_queue *queue, PyThread_type_lock parent_mutex)
591 {
592 if (parent_mutex != NULL) {
593 PyThread_acquire_lock(parent_mutex, WAIT_LOCK);
594 queue->num_waiters += 1;
595 PyThread_release_lock(parent_mutex);
596 }
597 else {
598 // The caller must be holding the parent lock already.
599 queue->num_waiters += 1;
600 }
601 }
602
603 static void
_queue_unmark_waiter(_queue * queue,PyThread_type_lock parent_mutex)604 _queue_unmark_waiter(_queue *queue, PyThread_type_lock parent_mutex)
605 {
606 if (parent_mutex != NULL) {
607 PyThread_acquire_lock(parent_mutex, WAIT_LOCK);
608 queue->num_waiters -= 1;
609 PyThread_release_lock(parent_mutex);
610 }
611 else {
612 // The caller must be holding the parent lock already.
613 queue->num_waiters -= 1;
614 }
615 }
616
617 static int
_queue_lock(_queue * queue)618 _queue_lock(_queue *queue)
619 {
620 // The queue must be marked as a waiter already.
621 PyThread_acquire_lock(queue->mutex, WAIT_LOCK);
622 if (!queue->alive) {
623 PyThread_release_lock(queue->mutex);
624 return ERR_QUEUE_NOT_FOUND;
625 }
626 return 0;
627 }
628
629 static void
_queue_unlock(_queue * queue)630 _queue_unlock(_queue *queue)
631 {
632 PyThread_release_lock(queue->mutex);
633 }
634
635 static int
_queue_add(_queue * queue,int64_t interpid,_PyCrossInterpreterData * data,int fmt,int unboundop)636 _queue_add(_queue *queue, int64_t interpid, _PyCrossInterpreterData *data,
637 int fmt, int unboundop)
638 {
639 int err = _queue_lock(queue);
640 if (err < 0) {
641 return err;
642 }
643
644 Py_ssize_t maxsize = queue->items.maxsize;
645 if (maxsize <= 0) {
646 maxsize = PY_SSIZE_T_MAX;
647 }
648 if (queue->items.count >= maxsize) {
649 _queue_unlock(queue);
650 return ERR_QUEUE_FULL;
651 }
652
653 _queueitem *item = _queueitem_new(interpid, data, fmt, unboundop);
654 if (item == NULL) {
655 _queue_unlock(queue);
656 return -1;
657 }
658
659 queue->items.count += 1;
660 if (queue->items.first == NULL) {
661 queue->items.first = item;
662 }
663 else {
664 queue->items.last->next = item;
665 }
666 queue->items.last = item;
667
668 _queue_unlock(queue);
669 return 0;
670 }
671
672 static int
_queue_next(_queue * queue,_PyCrossInterpreterData ** p_data,int * p_fmt,int * p_unboundop)673 _queue_next(_queue *queue,
674 _PyCrossInterpreterData **p_data, int *p_fmt, int *p_unboundop)
675 {
676 int err = _queue_lock(queue);
677 if (err < 0) {
678 return err;
679 }
680
681 assert(queue->items.count >= 0);
682 _queueitem *item = queue->items.first;
683 if (item == NULL) {
684 _queue_unlock(queue);
685 return ERR_QUEUE_EMPTY;
686 }
687 queue->items.first = item->next;
688 if (queue->items.last == item) {
689 queue->items.last = NULL;
690 }
691 queue->items.count -= 1;
692
693 _queueitem_popped(item, p_data, p_fmt, p_unboundop);
694
695 _queue_unlock(queue);
696 return 0;
697 }
698
699 static int
_queue_get_maxsize(_queue * queue,Py_ssize_t * p_maxsize)700 _queue_get_maxsize(_queue *queue, Py_ssize_t *p_maxsize)
701 {
702 int err = _queue_lock(queue);
703 if (err < 0) {
704 return err;
705 }
706
707 *p_maxsize = queue->items.maxsize;
708
709 _queue_unlock(queue);
710 return 0;
711 }
712
713 static int
_queue_is_full(_queue * queue,int * p_is_full)714 _queue_is_full(_queue *queue, int *p_is_full)
715 {
716 int err = _queue_lock(queue);
717 if (err < 0) {
718 return err;
719 }
720
721 assert(queue->items.count <= queue->items.maxsize);
722 *p_is_full = queue->items.count == queue->items.maxsize;
723
724 _queue_unlock(queue);
725 return 0;
726 }
727
728 static int
_queue_get_count(_queue * queue,Py_ssize_t * p_count)729 _queue_get_count(_queue *queue, Py_ssize_t *p_count)
730 {
731 int err = _queue_lock(queue);
732 if (err < 0) {
733 return err;
734 }
735
736 *p_count = queue->items.count;
737
738 _queue_unlock(queue);
739 return 0;
740 }
741
742 static void
_queue_clear_interpreter(_queue * queue,int64_t interpid)743 _queue_clear_interpreter(_queue *queue, int64_t interpid)
744 {
745 int err = _queue_lock(queue);
746 if (err == ERR_QUEUE_NOT_FOUND) {
747 // The queue is already destroyed, so there's nothing to clear.
748 assert(!PyErr_Occurred());
749 return;
750 }
751 assert(err == 0); // There should be no other errors.
752
753 _queueitem *prev = NULL;
754 _queueitem *next = queue->items.first;
755 while (next != NULL) {
756 _queueitem *item = next;
757 next = item->next;
758 int remove = (item->interpid == interpid)
759 ? _queueitem_clear_interpreter(item)
760 : 0;
761 if (remove) {
762 _queueitem_free(item);
763 if (prev == NULL) {
764 queue->items.first = next;
765 }
766 else {
767 prev->next = next;
768 }
769 queue->items.count -= 1;
770 }
771 else {
772 prev = item;
773 }
774 }
775
776 _queue_unlock(queue);
777 }
778
779
780 /* external queue references ************************************************/
781
782 struct _queueref;
783
784 typedef struct _queueref {
785 struct _queueref *next;
786 int64_t qid;
787 Py_ssize_t refcount;
788 _queue *queue;
789 } _queueref;
790
791 static _queueref *
_queuerefs_find(_queueref * first,int64_t qid,_queueref ** pprev)792 _queuerefs_find(_queueref *first, int64_t qid, _queueref **pprev)
793 {
794 _queueref *prev = NULL;
795 _queueref *ref = first;
796 while (ref != NULL) {
797 if (ref->qid == qid) {
798 break;
799 }
800 prev = ref;
801 ref = ref->next;
802 }
803 if (pprev != NULL) {
804 *pprev = prev;
805 }
806 return ref;
807 }
808
809 static void
_queuerefs_clear(_queueref * head)810 _queuerefs_clear(_queueref *head)
811 {
812 _queueref *next = head;
813 while (next != NULL) {
814 _queueref *ref = next;
815 next = ref->next;
816
817 #ifdef Py_DEBUG
818 int64_t qid = ref->qid;
819 fprintf(stderr, "queue %" PRId64 " still exists\n", qid);
820 #endif
821 _queue *queue = ref->queue;
822 GLOBAL_FREE(ref);
823
824 _queue_kill_and_wait(queue);
825 #ifdef Py_DEBUG
826 if (queue->items.count > 0) {
827 fprintf(stderr, "queue %" PRId64 " still holds %zd items\n",
828 qid, queue->items.count);
829 }
830 #endif
831 _queue_free(queue);
832 }
833 }
834
835
836 /* a collection of queues ***************************************************/
837
838 typedef struct _queues {
839 PyThread_type_lock mutex;
840 _queueref *head;
841 int64_t count;
842 int64_t next_id;
843 } _queues;
844
845 static void
_queues_init(_queues * queues,PyThread_type_lock mutex)846 _queues_init(_queues *queues, PyThread_type_lock mutex)
847 {
848 assert(mutex != NULL);
849 assert(queues->mutex == NULL);
850 *queues = (_queues){
851 .mutex = mutex,
852 .head = NULL,
853 .count = 0,
854 .next_id = 1,
855 };
856 }
857
858 static void
_queues_fini(_queues * queues,PyThread_type_lock * p_mutex)859 _queues_fini(_queues *queues, PyThread_type_lock *p_mutex)
860 {
861 PyThread_type_lock mutex = queues->mutex;
862 assert(mutex != NULL);
863
864 PyThread_acquire_lock(mutex, WAIT_LOCK);
865 if (queues->count > 0) {
866 assert(queues->head != NULL);
867 _queuerefs_clear(queues->head);
868 }
869 *queues = (_queues){0};
870 PyThread_release_lock(mutex);
871
872 *p_mutex = mutex;
873 }
874
875 static int64_t
_queues_next_id(_queues * queues)876 _queues_next_id(_queues *queues) // needs lock
877 {
878 int64_t qid = queues->next_id;
879 if (qid < 0) {
880 /* overflow */
881 return ERR_NO_NEXT_QUEUE_ID;
882 }
883 queues->next_id += 1;
884 return qid;
885 }
886
887 static int
_queues_lookup(_queues * queues,int64_t qid,_queue ** res)888 _queues_lookup(_queues *queues, int64_t qid, _queue **res)
889 {
890 PyThread_acquire_lock(queues->mutex, WAIT_LOCK);
891
892 _queueref *ref = _queuerefs_find(queues->head, qid, NULL);
893 if (ref == NULL) {
894 PyThread_release_lock(queues->mutex);
895 return ERR_QUEUE_NOT_FOUND;
896 }
897 assert(ref->queue != NULL);
898 _queue *queue = ref->queue;
899 _queue_mark_waiter(queue, NULL);
900 // The caller must unmark it.
901
902 PyThread_release_lock(queues->mutex);
903
904 *res = queue;
905 return 0;
906 }
907
908 static int64_t
_queues_add(_queues * queues,_queue * queue)909 _queues_add(_queues *queues, _queue *queue)
910 {
911 int64_t qid = -1;
912 PyThread_acquire_lock(queues->mutex, WAIT_LOCK);
913
914 // Create a new ref.
915 int64_t _qid = _queues_next_id(queues);
916 if (_qid < 0) {
917 goto done;
918 }
919 _queueref *ref = GLOBAL_MALLOC(_queueref);
920 if (ref == NULL) {
921 qid = ERR_QUEUE_ALLOC;
922 goto done;
923 }
924 *ref = (_queueref){
925 .qid = _qid,
926 .queue = queue,
927 };
928
929 // Add it to the list.
930 // We assume that the queue is a new one (not already in the list).
931 ref->next = queues->head;
932 queues->head = ref;
933 queues->count += 1;
934
935 qid = _qid;
936 done:
937 PyThread_release_lock(queues->mutex);
938 return qid;
939 }
940
941 static void
_queues_remove_ref(_queues * queues,_queueref * ref,_queueref * prev,_queue ** p_queue)942 _queues_remove_ref(_queues *queues, _queueref *ref, _queueref *prev,
943 _queue **p_queue)
944 {
945 assert(ref->queue != NULL);
946
947 if (ref == queues->head) {
948 queues->head = ref->next;
949 }
950 else {
951 prev->next = ref->next;
952 }
953 ref->next = NULL;
954 queues->count -= 1;
955
956 *p_queue = ref->queue;
957 ref->queue = NULL;
958 GLOBAL_FREE(ref);
959 }
960
961 static int
_queues_remove(_queues * queues,int64_t qid,_queue ** p_queue)962 _queues_remove(_queues *queues, int64_t qid, _queue **p_queue)
963 {
964 PyThread_acquire_lock(queues->mutex, WAIT_LOCK);
965
966 _queueref *prev = NULL;
967 _queueref *ref = _queuerefs_find(queues->head, qid, &prev);
968 if (ref == NULL) {
969 PyThread_release_lock(queues->mutex);
970 return ERR_QUEUE_NOT_FOUND;
971 }
972
973 _queues_remove_ref(queues, ref, prev, p_queue);
974 PyThread_release_lock(queues->mutex);
975
976 return 0;
977 }
978
979 static int
_queues_incref(_queues * queues,int64_t qid)980 _queues_incref(_queues *queues, int64_t qid)
981 {
982 // XXX Track interpreter IDs?
983 int res = -1;
984 PyThread_acquire_lock(queues->mutex, WAIT_LOCK);
985
986 _queueref *ref = _queuerefs_find(queues->head, qid, NULL);
987 if (ref == NULL) {
988 assert(!PyErr_Occurred());
989 res = ERR_QUEUE_NOT_FOUND;
990 goto done;
991 }
992 ref->refcount += 1;
993
994 res = 0;
995 done:
996 PyThread_release_lock(queues->mutex);
997 return res;
998 }
999
1000 static int
_queues_decref(_queues * queues,int64_t qid)1001 _queues_decref(_queues *queues, int64_t qid)
1002 {
1003 int res = -1;
1004 PyThread_acquire_lock(queues->mutex, WAIT_LOCK);
1005
1006 _queueref *prev = NULL;
1007 _queueref *ref = _queuerefs_find(queues->head, qid, &prev);
1008 if (ref == NULL) {
1009 assert(!PyErr_Occurred());
1010 res = ERR_QUEUE_NOT_FOUND;
1011 goto finally;
1012 }
1013 if (ref->refcount == 0) {
1014 res = ERR_QUEUE_NEVER_BOUND;
1015 goto finally;
1016 }
1017 assert(ref->refcount > 0);
1018 ref->refcount -= 1;
1019
1020 // Destroy if no longer used.
1021 assert(ref->queue != NULL);
1022 if (ref->refcount == 0) {
1023 _queue *queue = NULL;
1024 _queues_remove_ref(queues, ref, prev, &queue);
1025 PyThread_release_lock(queues->mutex);
1026
1027 _queue_kill_and_wait(queue);
1028 _queue_free(queue);
1029 return 0;
1030 }
1031
1032 res = 0;
1033 finally:
1034 PyThread_release_lock(queues->mutex);
1035 return res;
1036 }
1037
1038 struct queue_id_and_info {
1039 int64_t id;
1040 int fmt;
1041 int unboundop;
1042 };
1043
1044 static struct queue_id_and_info *
_queues_list_all(_queues * queues,int64_t * p_count)1045 _queues_list_all(_queues *queues, int64_t *p_count)
1046 {
1047 struct queue_id_and_info *qids = NULL;
1048 PyThread_acquire_lock(queues->mutex, WAIT_LOCK);
1049 struct queue_id_and_info *ids = PyMem_NEW(struct queue_id_and_info,
1050 (Py_ssize_t)(queues->count));
1051 if (ids == NULL) {
1052 goto done;
1053 }
1054 _queueref *ref = queues->head;
1055 for (int64_t i=0; ref != NULL; ref = ref->next, i++) {
1056 ids[i].id = ref->qid;
1057 assert(ref->queue != NULL);
1058 ids[i].fmt = ref->queue->defaults.fmt;
1059 ids[i].unboundop = ref->queue->defaults.unboundop;
1060 }
1061 *p_count = queues->count;
1062
1063 qids = ids;
1064 done:
1065 PyThread_release_lock(queues->mutex);
1066 return qids;
1067 }
1068
1069 static void
_queues_clear_interpreter(_queues * queues,int64_t interpid)1070 _queues_clear_interpreter(_queues *queues, int64_t interpid)
1071 {
1072 PyThread_acquire_lock(queues->mutex, WAIT_LOCK);
1073
1074 _queueref *ref = queues->head;
1075 for (; ref != NULL; ref = ref->next) {
1076 assert(ref->queue != NULL);
1077 _queue_clear_interpreter(ref->queue, interpid);
1078 }
1079
1080 PyThread_release_lock(queues->mutex);
1081 }
1082
1083
1084 /* "high"-level queue-related functions *************************************/
1085
1086 static void
_queue_free(_queue * queue)1087 _queue_free(_queue *queue)
1088 {
1089 _queue_clear(queue);
1090 GLOBAL_FREE(queue);
1091 }
1092
1093 // Create a new queue.
1094 static int64_t
queue_create(_queues * queues,Py_ssize_t maxsize,int fmt,int unboundop)1095 queue_create(_queues *queues, Py_ssize_t maxsize, int fmt, int unboundop)
1096 {
1097 _queue *queue = GLOBAL_MALLOC(_queue);
1098 if (queue == NULL) {
1099 return ERR_QUEUE_ALLOC;
1100 }
1101 int err = _queue_init(queue, maxsize, fmt, unboundop);
1102 if (err < 0) {
1103 GLOBAL_FREE(queue);
1104 return (int64_t)err;
1105 }
1106 int64_t qid = _queues_add(queues, queue);
1107 if (qid < 0) {
1108 _queue_clear(queue);
1109 GLOBAL_FREE(queue);
1110 }
1111 return qid;
1112 }
1113
1114 // Completely destroy the queue.
1115 static int
queue_destroy(_queues * queues,int64_t qid)1116 queue_destroy(_queues *queues, int64_t qid)
1117 {
1118 _queue *queue = NULL;
1119 int err = _queues_remove(queues, qid, &queue);
1120 if (err < 0) {
1121 return err;
1122 }
1123 _queue_kill_and_wait(queue);
1124 _queue_free(queue);
1125 return 0;
1126 }
1127
1128 // Push an object onto the queue.
1129 static int
queue_put(_queues * queues,int64_t qid,PyObject * obj,int fmt,int unboundop)1130 queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt, int unboundop)
1131 {
1132 // Look up the queue.
1133 _queue *queue = NULL;
1134 int err = _queues_lookup(queues, qid, &queue);
1135 if (err != 0) {
1136 return err;
1137 }
1138 assert(queue != NULL);
1139
1140 // Convert the object to cross-interpreter data.
1141 _PyCrossInterpreterData *data = GLOBAL_MALLOC(_PyCrossInterpreterData);
1142 if (data == NULL) {
1143 _queue_unmark_waiter(queue, queues->mutex);
1144 return -1;
1145 }
1146 if (_PyObject_GetCrossInterpreterData(obj, data) != 0) {
1147 _queue_unmark_waiter(queue, queues->mutex);
1148 GLOBAL_FREE(data);
1149 return -1;
1150 }
1151 assert(_PyCrossInterpreterData_INTERPID(data) == \
1152 PyInterpreterState_GetID(PyInterpreterState_Get()));
1153
1154 // Add the data to the queue.
1155 int64_t interpid = -1; // _queueitem_init() will set it.
1156 int res = _queue_add(queue, interpid, data, fmt, unboundop);
1157 _queue_unmark_waiter(queue, queues->mutex);
1158 if (res != 0) {
1159 // We may chain an exception here:
1160 (void)_release_xid_data(data, 0);
1161 GLOBAL_FREE(data);
1162 return res;
1163 }
1164
1165 return 0;
1166 }
1167
1168 // Pop the next object off the queue. Fail if empty.
1169 // XXX Support a "wait" mutex?
1170 static int
queue_get(_queues * queues,int64_t qid,PyObject ** res,int * p_fmt,int * p_unboundop)1171 queue_get(_queues *queues, int64_t qid,
1172 PyObject **res, int *p_fmt, int *p_unboundop)
1173 {
1174 int err;
1175 *res = NULL;
1176
1177 // Look up the queue.
1178 _queue *queue = NULL;
1179 err = _queues_lookup(queues, qid, &queue);
1180 if (err != 0) {
1181 return err;
1182 }
1183 // Past this point we are responsible for releasing the mutex.
1184 assert(queue != NULL);
1185
1186 // Pop off the next item from the queue.
1187 _PyCrossInterpreterData *data = NULL;
1188 err = _queue_next(queue, &data, p_fmt, p_unboundop);
1189 _queue_unmark_waiter(queue, queues->mutex);
1190 if (err != 0) {
1191 return err;
1192 }
1193 else if (data == NULL) {
1194 assert(!PyErr_Occurred());
1195 return 0;
1196 }
1197
1198 // Convert the data back to an object.
1199 PyObject *obj = _PyCrossInterpreterData_NewObject(data);
1200 if (obj == NULL) {
1201 assert(PyErr_Occurred());
1202 // It was allocated in queue_put(), so we free it.
1203 (void)_release_xid_data(data, XID_IGNORE_EXC | XID_FREE);
1204 return -1;
1205 }
1206 // It was allocated in queue_put(), so we free it.
1207 int release_res = _release_xid_data(data, XID_FREE);
1208 if (release_res < 0) {
1209 // The source interpreter has been destroyed already.
1210 assert(PyErr_Occurred());
1211 Py_DECREF(obj);
1212 return -1;
1213 }
1214
1215 *res = obj;
1216 return 0;
1217 }
1218
1219 static int
queue_get_maxsize(_queues * queues,int64_t qid,Py_ssize_t * p_maxsize)1220 queue_get_maxsize(_queues *queues, int64_t qid, Py_ssize_t *p_maxsize)
1221 {
1222 _queue *queue = NULL;
1223 int err = _queues_lookup(queues, qid, &queue);
1224 if (err < 0) {
1225 return err;
1226 }
1227 err = _queue_get_maxsize(queue, p_maxsize);
1228 _queue_unmark_waiter(queue, queues->mutex);
1229 return err;
1230 }
1231
1232 static int
queue_is_full(_queues * queues,int64_t qid,int * p_is_full)1233 queue_is_full(_queues *queues, int64_t qid, int *p_is_full)
1234 {
1235 _queue *queue = NULL;
1236 int err = _queues_lookup(queues, qid, &queue);
1237 if (err < 0) {
1238 return err;
1239 }
1240 err = _queue_is_full(queue, p_is_full);
1241 _queue_unmark_waiter(queue, queues->mutex);
1242 return err;
1243 }
1244
1245 static int
queue_get_count(_queues * queues,int64_t qid,Py_ssize_t * p_count)1246 queue_get_count(_queues *queues, int64_t qid, Py_ssize_t *p_count)
1247 {
1248 _queue *queue = NULL;
1249 int err = _queues_lookup(queues, qid, &queue);
1250 if (err < 0) {
1251 return err;
1252 }
1253 err = _queue_get_count(queue, p_count);
1254 _queue_unmark_waiter(queue, queues->mutex);
1255 return err;
1256 }
1257
1258
1259 /* external Queue objects ***************************************************/
1260
1261 static int _queueobj_shared(PyThreadState *,
1262 PyObject *, _PyCrossInterpreterData *);
1263
1264 static int
set_external_queue_type(module_state * state,PyTypeObject * queue_type)1265 set_external_queue_type(module_state *state, PyTypeObject *queue_type)
1266 {
1267 // Clear the old value if the .py module was reloaded.
1268 if (state->queue_type != NULL) {
1269 (void)clear_xid_class(state->queue_type);
1270 Py_CLEAR(state->queue_type);
1271 }
1272
1273 // Add and register the new type.
1274 if (ensure_xid_class(queue_type, _queueobj_shared) < 0) {
1275 return -1;
1276 }
1277 state->queue_type = (PyTypeObject *)Py_NewRef(queue_type);
1278
1279 return 0;
1280 }
1281
1282 static PyTypeObject *
get_external_queue_type(PyObject * module)1283 get_external_queue_type(PyObject *module)
1284 {
1285 module_state *state = get_module_state(module);
1286
1287 PyTypeObject *cls = state->queue_type;
1288 if (cls == NULL) {
1289 // Force the module to be loaded, to register the type.
1290 if (ensure_highlevel_module_loaded() < 0) {
1291 return NULL;
1292 }
1293 cls = state->queue_type;
1294 assert(cls != NULL);
1295 }
1296 return cls;
1297 }
1298
1299
1300 // XXX Use a new __xid__ protocol instead?
1301
1302 struct _queueid_xid {
1303 int64_t qid;
1304 };
1305
1306 static _queues * _get_global_queues(void);
1307
1308 static void *
_queueid_xid_new(int64_t qid)1309 _queueid_xid_new(int64_t qid)
1310 {
1311 _queues *queues = _get_global_queues();
1312 if (_queues_incref(queues, qid) < 0) {
1313 return NULL;
1314 }
1315
1316 struct _queueid_xid *data = PyMem_RawMalloc(sizeof(struct _queueid_xid));
1317 if (data == NULL) {
1318 _queues_decref(queues, qid);
1319 return NULL;
1320 }
1321 data->qid = qid;
1322 return (void *)data;
1323 }
1324
1325 static void
_queueid_xid_free(void * data)1326 _queueid_xid_free(void *data)
1327 {
1328 int64_t qid = ((struct _queueid_xid *)data)->qid;
1329 PyMem_RawFree(data);
1330 _queues *queues = _get_global_queues();
1331 int res = _queues_decref(queues, qid);
1332 if (res == ERR_QUEUE_NOT_FOUND) {
1333 // Already destroyed.
1334 // XXX Warn?
1335 }
1336 else {
1337 assert(res == 0);
1338 }
1339 }
1340
1341 static PyObject *
_queueobj_from_xid(_PyCrossInterpreterData * data)1342 _queueobj_from_xid(_PyCrossInterpreterData *data)
1343 {
1344 int64_t qid = *(int64_t *)_PyCrossInterpreterData_DATA(data);
1345 PyObject *qidobj = PyLong_FromLongLong(qid);
1346 if (qidobj == NULL) {
1347 return NULL;
1348 }
1349
1350 PyObject *mod = _get_current_module();
1351 if (mod == NULL) {
1352 // XXX import it?
1353 PyErr_SetString(PyExc_RuntimeError,
1354 MODULE_NAME_STR " module not imported yet");
1355 return NULL;
1356 }
1357
1358 PyTypeObject *cls = get_external_queue_type(mod);
1359 Py_DECREF(mod);
1360 if (cls == NULL) {
1361 Py_DECREF(qidobj);
1362 return NULL;
1363 }
1364 PyObject *obj = PyObject_CallOneArg((PyObject *)cls, (PyObject *)qidobj);
1365 Py_DECREF(qidobj);
1366 return obj;
1367 }
1368
1369 static int
_queueobj_shared(PyThreadState * tstate,PyObject * queueobj,_PyCrossInterpreterData * data)1370 _queueobj_shared(PyThreadState *tstate, PyObject *queueobj,
1371 _PyCrossInterpreterData *data)
1372 {
1373 PyObject *qidobj = PyObject_GetAttrString(queueobj, "_id");
1374 if (qidobj == NULL) {
1375 return -1;
1376 }
1377 struct idarg_int64_converter_data converted = {
1378 .label = "queue ID",
1379 };
1380 int res = idarg_int64_converter(qidobj, &converted);
1381 Py_CLEAR(qidobj);
1382 if (!res) {
1383 assert(PyErr_Occurred());
1384 return -1;
1385 }
1386
1387 void *raw = _queueid_xid_new(converted.id);
1388 if (raw == NULL) {
1389 return -1;
1390 }
1391 _PyCrossInterpreterData_Init(data, tstate->interp, raw, NULL,
1392 _queueobj_from_xid);
1393 _PyCrossInterpreterData_SET_FREE(data, _queueid_xid_free);
1394 return 0;
1395 }
1396
1397
1398 /* module level code ********************************************************/
1399
1400 /* globals is the process-global state for the module. It holds all
1401 the data that we need to share between interpreters, so it cannot
1402 hold PyObject values. */
1403 static struct globals {
1404 PyMutex mutex;
1405 int module_count;
1406 _queues queues;
1407 } _globals = {0};
1408
1409 static int
_globals_init(void)1410 _globals_init(void)
1411 {
1412 PyMutex_Lock(&_globals.mutex);
1413 assert(_globals.module_count >= 0);
1414 _globals.module_count++;
1415 if (_globals.module_count == 1) {
1416 // Called for the first time.
1417 PyThread_type_lock mutex = PyThread_allocate_lock();
1418 if (mutex == NULL) {
1419 _globals.module_count--;
1420 PyMutex_Unlock(&_globals.mutex);
1421 return ERR_QUEUES_ALLOC;
1422 }
1423 _queues_init(&_globals.queues, mutex);
1424 }
1425 PyMutex_Unlock(&_globals.mutex);
1426 return 0;
1427 }
1428
1429 static void
_globals_fini(void)1430 _globals_fini(void)
1431 {
1432 PyMutex_Lock(&_globals.mutex);
1433 assert(_globals.module_count > 0);
1434 _globals.module_count--;
1435 if (_globals.module_count == 0) {
1436 PyThread_type_lock mutex;
1437 _queues_fini(&_globals.queues, &mutex);
1438 assert(mutex != NULL);
1439 PyThread_free_lock(mutex);
1440 }
1441 PyMutex_Unlock(&_globals.mutex);
1442 }
1443
1444 static _queues *
_get_global_queues(void)1445 _get_global_queues(void)
1446 {
1447 return &_globals.queues;
1448 }
1449
1450
1451 static void
clear_interpreter(void * data)1452 clear_interpreter(void *data)
1453 {
1454 if (_globals.module_count == 0) {
1455 return;
1456 }
1457 PyInterpreterState *interp = (PyInterpreterState *)data;
1458 assert(interp == _get_current_interp());
1459 int64_t interpid = PyInterpreterState_GetID(interp);
1460 _queues_clear_interpreter(&_globals.queues, interpid);
1461 }
1462
1463
1464 typedef struct idarg_int64_converter_data qidarg_converter_data;
1465
1466 static int
qidarg_converter(PyObject * arg,void * ptr)1467 qidarg_converter(PyObject *arg, void *ptr)
1468 {
1469 qidarg_converter_data *data = ptr;
1470 if (data->label == NULL) {
1471 data->label = "queue ID";
1472 }
1473 return idarg_int64_converter(arg, ptr);
1474 }
1475
1476
1477 static PyObject *
queuesmod_create(PyObject * self,PyObject * args,PyObject * kwds)1478 queuesmod_create(PyObject *self, PyObject *args, PyObject *kwds)
1479 {
1480 static char *kwlist[] = {"maxsize", "fmt", "unboundop", NULL};
1481 Py_ssize_t maxsize;
1482 int fmt;
1483 int unboundop;
1484 if (!PyArg_ParseTupleAndKeywords(args, kwds, "nii:create", kwlist,
1485 &maxsize, &fmt, &unboundop))
1486 {
1487 return NULL;
1488 }
1489 if (!check_unbound(unboundop)) {
1490 PyErr_Format(PyExc_ValueError,
1491 "unsupported unboundop %d", unboundop);
1492 return NULL;
1493 }
1494
1495 int64_t qid = queue_create(&_globals.queues, maxsize, fmt, unboundop);
1496 if (qid < 0) {
1497 (void)handle_queue_error((int)qid, self, qid);
1498 return NULL;
1499 }
1500
1501 PyObject *qidobj = PyLong_FromLongLong(qid);
1502 if (qidobj == NULL) {
1503 PyObject *exc = PyErr_GetRaisedException();
1504 int err = queue_destroy(&_globals.queues, qid);
1505 if (handle_queue_error(err, self, qid)) {
1506 // XXX issue a warning?
1507 PyErr_Clear();
1508 }
1509 PyErr_SetRaisedException(exc);
1510 return NULL;
1511 }
1512
1513 return qidobj;
1514 }
1515
1516 PyDoc_STRVAR(queuesmod_create_doc,
1517 "create(maxsize, fmt, unboundop) -> qid\n\
1518 \n\
1519 Create a new cross-interpreter queue and return its unique generated ID.\n\
1520 It is a new reference as though bind() had been called on the queue.\n\
1521 \n\
1522 The caller is responsible for calling destroy() for the new queue\n\
1523 before the runtime is finalized.");
1524
1525 static PyObject *
queuesmod_destroy(PyObject * self,PyObject * args,PyObject * kwds)1526 queuesmod_destroy(PyObject *self, PyObject *args, PyObject *kwds)
1527 {
1528 static char *kwlist[] = {"qid", NULL};
1529 qidarg_converter_data qidarg = {0};
1530 if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&:destroy", kwlist,
1531 qidarg_converter, &qidarg)) {
1532 return NULL;
1533 }
1534 int64_t qid = qidarg.id;
1535
1536 int err = queue_destroy(&_globals.queues, qid);
1537 if (handle_queue_error(err, self, qid)) {
1538 return NULL;
1539 }
1540 Py_RETURN_NONE;
1541 }
1542
1543 PyDoc_STRVAR(queuesmod_destroy_doc,
1544 "destroy(qid)\n\
1545 \n\
1546 Clear and destroy the queue. Afterward attempts to use the queue\n\
1547 will behave as though it never existed.");
1548
1549 static PyObject *
queuesmod_list_all(PyObject * self,PyObject * Py_UNUSED (ignored))1550 queuesmod_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
1551 {
1552 int64_t count = 0;
1553 struct queue_id_and_info *qids = _queues_list_all(&_globals.queues, &count);
1554 if (qids == NULL) {
1555 if (!PyErr_Occurred() && count == 0) {
1556 return PyList_New(0);
1557 }
1558 return NULL;
1559 }
1560 PyObject *ids = PyList_New((Py_ssize_t)count);
1561 if (ids == NULL) {
1562 goto finally;
1563 }
1564 struct queue_id_and_info *cur = qids;
1565 for (int64_t i=0; i < count; cur++, i++) {
1566 PyObject *item = Py_BuildValue("Lii", cur->id, cur->fmt,
1567 cur->unboundop);
1568 if (item == NULL) {
1569 Py_SETREF(ids, NULL);
1570 break;
1571 }
1572 PyList_SET_ITEM(ids, (Py_ssize_t)i, item);
1573 }
1574
1575 finally:
1576 PyMem_Free(qids);
1577 return ids;
1578 }
1579
1580 PyDoc_STRVAR(queuesmod_list_all_doc,
1581 "list_all() -> [(qid, fmt)]\n\
1582 \n\
1583 Return the list of IDs for all queues.\n\
1584 Each corresponding default format is also included.");
1585
1586 static PyObject *
queuesmod_put(PyObject * self,PyObject * args,PyObject * kwds)1587 queuesmod_put(PyObject *self, PyObject *args, PyObject *kwds)
1588 {
1589 static char *kwlist[] = {"qid", "obj", "fmt", "unboundop", NULL};
1590 qidarg_converter_data qidarg = {0};
1591 PyObject *obj;
1592 int fmt;
1593 int unboundop;
1594 if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&Oii:put", kwlist,
1595 qidarg_converter, &qidarg, &obj, &fmt,
1596 &unboundop))
1597 {
1598 return NULL;
1599 }
1600 int64_t qid = qidarg.id;
1601 if (!check_unbound(unboundop)) {
1602 PyErr_Format(PyExc_ValueError,
1603 "unsupported unboundop %d", unboundop);
1604 return NULL;
1605 }
1606
1607 /* Queue up the object. */
1608 int err = queue_put(&_globals.queues, qid, obj, fmt, unboundop);
1609 // This is the only place that raises QueueFull.
1610 if (handle_queue_error(err, self, qid)) {
1611 return NULL;
1612 }
1613
1614 Py_RETURN_NONE;
1615 }
1616
1617 PyDoc_STRVAR(queuesmod_put_doc,
1618 "put(qid, obj, fmt)\n\
1619 \n\
1620 Add the object's data to the queue.");
1621
1622 static PyObject *
queuesmod_get(PyObject * self,PyObject * args,PyObject * kwds)1623 queuesmod_get(PyObject *self, PyObject *args, PyObject *kwds)
1624 {
1625 static char *kwlist[] = {"qid", NULL};
1626 qidarg_converter_data qidarg = {0};
1627 if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&:get", kwlist,
1628 qidarg_converter, &qidarg)) {
1629 return NULL;
1630 }
1631 int64_t qid = qidarg.id;
1632
1633 PyObject *obj = NULL;
1634 int fmt = 0;
1635 int unboundop = 0;
1636 int err = queue_get(&_globals.queues, qid, &obj, &fmt, &unboundop);
1637 // This is the only place that raises QueueEmpty.
1638 if (handle_queue_error(err, self, qid)) {
1639 return NULL;
1640 }
1641
1642 if (obj == NULL) {
1643 return Py_BuildValue("Oii", Py_None, fmt, unboundop);
1644 }
1645 PyObject *res = Py_BuildValue("OiO", obj, fmt, Py_None);
1646 Py_DECREF(obj);
1647 return res;
1648 }
1649
1650 PyDoc_STRVAR(queuesmod_get_doc,
1651 "get(qid) -> (obj, fmt)\n\
1652 \n\
1653 Return a new object from the data at the front of the queue.\n\
1654 The object's format is also returned.\n\
1655 \n\
1656 If there is nothing to receive then raise QueueEmpty.");
1657
1658 static PyObject *
queuesmod_bind(PyObject * self,PyObject * args,PyObject * kwds)1659 queuesmod_bind(PyObject *self, PyObject *args, PyObject *kwds)
1660 {
1661 static char *kwlist[] = {"qid", NULL};
1662 qidarg_converter_data qidarg = {0};
1663 if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&:bind", kwlist,
1664 qidarg_converter, &qidarg)) {
1665 return NULL;
1666 }
1667 int64_t qid = qidarg.id;
1668
1669 // XXX Check module state if bound already.
1670
1671 int err = _queues_incref(&_globals.queues, qid);
1672 if (handle_queue_error(err, self, qid)) {
1673 return NULL;
1674 }
1675
1676 // XXX Update module state.
1677
1678 Py_RETURN_NONE;
1679 }
1680
1681 PyDoc_STRVAR(queuesmod_bind_doc,
1682 "bind(qid)\n\
1683 \n\
1684 Take a reference to the identified queue.\n\
1685 The queue is not destroyed until there are no references left.");
1686
1687 static PyObject *
queuesmod_release(PyObject * self,PyObject * args,PyObject * kwds)1688 queuesmod_release(PyObject *self, PyObject *args, PyObject *kwds)
1689 {
1690 // Note that only the current interpreter is affected.
1691 static char *kwlist[] = {"qid", NULL};
1692 qidarg_converter_data qidarg = {0};
1693 if (!PyArg_ParseTupleAndKeywords(args, kwds,
1694 "O&:release", kwlist,
1695 qidarg_converter, &qidarg)) {
1696 return NULL;
1697 }
1698 int64_t qid = qidarg.id;
1699
1700 // XXX Check module state if bound already.
1701 // XXX Update module state.
1702
1703 int err = _queues_decref(&_globals.queues, qid);
1704 if (handle_queue_error(err, self, qid)) {
1705 return NULL;
1706 }
1707
1708 Py_RETURN_NONE;
1709 }
1710
1711 PyDoc_STRVAR(queuesmod_release_doc,
1712 "release(qid)\n\
1713 \n\
1714 Release a reference to the queue.\n\
1715 The queue is destroyed once there are no references left.");
1716
1717 static PyObject *
queuesmod_get_maxsize(PyObject * self,PyObject * args,PyObject * kwds)1718 queuesmod_get_maxsize(PyObject *self, PyObject *args, PyObject *kwds)
1719 {
1720 static char *kwlist[] = {"qid", NULL};
1721 qidarg_converter_data qidarg = {0};
1722 if (!PyArg_ParseTupleAndKeywords(args, kwds,
1723 "O&:get_maxsize", kwlist,
1724 qidarg_converter, &qidarg)) {
1725 return NULL;
1726 }
1727 int64_t qid = qidarg.id;
1728
1729 Py_ssize_t maxsize = -1;
1730 int err = queue_get_maxsize(&_globals.queues, qid, &maxsize);
1731 if (handle_queue_error(err, self, qid)) {
1732 return NULL;
1733 }
1734 return PyLong_FromLongLong(maxsize);
1735 }
1736
1737 PyDoc_STRVAR(queuesmod_get_maxsize_doc,
1738 "get_maxsize(qid)\n\
1739 \n\
1740 Return the maximum number of items in the queue.");
1741
1742 static PyObject *
queuesmod_get_queue_defaults(PyObject * self,PyObject * args,PyObject * kwds)1743 queuesmod_get_queue_defaults(PyObject *self, PyObject *args, PyObject *kwds)
1744 {
1745 static char *kwlist[] = {"qid", NULL};
1746 qidarg_converter_data qidarg = {0};
1747 if (!PyArg_ParseTupleAndKeywords(args, kwds,
1748 "O&:get_queue_defaults", kwlist,
1749 qidarg_converter, &qidarg)) {
1750 return NULL;
1751 }
1752 int64_t qid = qidarg.id;
1753
1754 _queue *queue = NULL;
1755 int err = _queues_lookup(&_globals.queues, qid, &queue);
1756 if (handle_queue_error(err, self, qid)) {
1757 return NULL;
1758 }
1759 int fmt = queue->defaults.fmt;
1760 int unboundop = queue->defaults.unboundop;
1761 _queue_unmark_waiter(queue, _globals.queues.mutex);
1762
1763 PyObject *defaults = Py_BuildValue("ii", fmt, unboundop);
1764 return defaults;
1765 }
1766
1767 PyDoc_STRVAR(queuesmod_get_queue_defaults_doc,
1768 "get_queue_defaults(qid)\n\
1769 \n\
1770 Return the queue's default values, set when it was created.");
1771
1772 static PyObject *
queuesmod_is_full(PyObject * self,PyObject * args,PyObject * kwds)1773 queuesmod_is_full(PyObject *self, PyObject *args, PyObject *kwds)
1774 {
1775 static char *kwlist[] = {"qid", NULL};
1776 qidarg_converter_data qidarg = {0};
1777 if (!PyArg_ParseTupleAndKeywords(args, kwds,
1778 "O&:is_full", kwlist,
1779 qidarg_converter, &qidarg)) {
1780 return NULL;
1781 }
1782 int64_t qid = qidarg.id;
1783
1784 int is_full = 0;
1785 int err = queue_is_full(&_globals.queues, qid, &is_full);
1786 if (handle_queue_error(err, self, qid)) {
1787 return NULL;
1788 }
1789 if (is_full) {
1790 Py_RETURN_TRUE;
1791 }
1792 Py_RETURN_FALSE;
1793 }
1794
1795 PyDoc_STRVAR(queuesmod_is_full_doc,
1796 "is_full(qid)\n\
1797 \n\
1798 Return true if the queue has a maxsize and has reached it.");
1799
1800 static PyObject *
queuesmod_get_count(PyObject * self,PyObject * args,PyObject * kwds)1801 queuesmod_get_count(PyObject *self, PyObject *args, PyObject *kwds)
1802 {
1803 static char *kwlist[] = {"qid", NULL};
1804 qidarg_converter_data qidarg = {0};
1805 if (!PyArg_ParseTupleAndKeywords(args, kwds,
1806 "O&:get_count", kwlist,
1807 qidarg_converter, &qidarg)) {
1808 return NULL;
1809 }
1810 int64_t qid = qidarg.id;
1811
1812 Py_ssize_t count = -1;
1813 int err = queue_get_count(&_globals.queues, qid, &count);
1814 if (handle_queue_error(err, self, qid)) {
1815 return NULL;
1816 }
1817 assert(count >= 0);
1818 return PyLong_FromSsize_t(count);
1819 }
1820
1821 PyDoc_STRVAR(queuesmod_get_count_doc,
1822 "get_count(qid)\n\
1823 \n\
1824 Return the number of items in the queue.");
1825
1826 static PyObject *
queuesmod__register_heap_types(PyObject * self,PyObject * args,PyObject * kwds)1827 queuesmod__register_heap_types(PyObject *self, PyObject *args, PyObject *kwds)
1828 {
1829 static char *kwlist[] = {"queuetype", "emptyerror", "fullerror", NULL};
1830 PyObject *queuetype;
1831 PyObject *emptyerror;
1832 PyObject *fullerror;
1833 if (!PyArg_ParseTupleAndKeywords(args, kwds,
1834 "OOO:_register_heap_types", kwlist,
1835 &queuetype, &emptyerror, &fullerror)) {
1836 return NULL;
1837 }
1838 if (!PyType_Check(queuetype)) {
1839 PyErr_SetString(PyExc_TypeError,
1840 "expected a type for 'queuetype'");
1841 return NULL;
1842 }
1843 if (!PyExceptionClass_Check(emptyerror)) {
1844 PyErr_SetString(PyExc_TypeError,
1845 "expected an exception type for 'emptyerror'");
1846 return NULL;
1847 }
1848 if (!PyExceptionClass_Check(fullerror)) {
1849 PyErr_SetString(PyExc_TypeError,
1850 "expected an exception type for 'fullerror'");
1851 return NULL;
1852 }
1853
1854 module_state *state = get_module_state(self);
1855
1856 if (set_external_queue_type(state, (PyTypeObject *)queuetype) < 0) {
1857 return NULL;
1858 }
1859 if (set_external_exc_types(state, emptyerror, fullerror) < 0) {
1860 return NULL;
1861 }
1862
1863 Py_RETURN_NONE;
1864 }
1865
1866 static PyMethodDef module_functions[] = {
1867 {"create", _PyCFunction_CAST(queuesmod_create),
1868 METH_VARARGS | METH_KEYWORDS, queuesmod_create_doc},
1869 {"destroy", _PyCFunction_CAST(queuesmod_destroy),
1870 METH_VARARGS | METH_KEYWORDS, queuesmod_destroy_doc},
1871 {"list_all", queuesmod_list_all,
1872 METH_NOARGS, queuesmod_list_all_doc},
1873 {"put", _PyCFunction_CAST(queuesmod_put),
1874 METH_VARARGS | METH_KEYWORDS, queuesmod_put_doc},
1875 {"get", _PyCFunction_CAST(queuesmod_get),
1876 METH_VARARGS | METH_KEYWORDS, queuesmod_get_doc},
1877 {"bind", _PyCFunction_CAST(queuesmod_bind),
1878 METH_VARARGS | METH_KEYWORDS, queuesmod_bind_doc},
1879 {"release", _PyCFunction_CAST(queuesmod_release),
1880 METH_VARARGS | METH_KEYWORDS, queuesmod_release_doc},
1881 {"get_maxsize", _PyCFunction_CAST(queuesmod_get_maxsize),
1882 METH_VARARGS | METH_KEYWORDS, queuesmod_get_maxsize_doc},
1883 {"get_queue_defaults", _PyCFunction_CAST(queuesmod_get_queue_defaults),
1884 METH_VARARGS | METH_KEYWORDS, queuesmod_get_queue_defaults_doc},
1885 {"is_full", _PyCFunction_CAST(queuesmod_is_full),
1886 METH_VARARGS | METH_KEYWORDS, queuesmod_is_full_doc},
1887 {"get_count", _PyCFunction_CAST(queuesmod_get_count),
1888 METH_VARARGS | METH_KEYWORDS, queuesmod_get_count_doc},
1889 {"_register_heap_types", _PyCFunction_CAST(queuesmod__register_heap_types),
1890 METH_VARARGS | METH_KEYWORDS, NULL},
1891
1892 {NULL, NULL} /* sentinel */
1893 };
1894
1895
1896 /* initialization function */
1897
1898 PyDoc_STRVAR(module_doc,
1899 "This module provides primitive operations to manage Python interpreters.\n\
1900 The 'interpreters' module provides a more convenient interface.");
1901
1902 static int
module_exec(PyObject * mod)1903 module_exec(PyObject *mod)
1904 {
1905 int err = _globals_init();
1906 if (handle_queue_error(err, mod, -1)) {
1907 return -1;
1908 }
1909
1910 /* Add exception types */
1911 if (add_QueueError(mod) < 0) {
1912 goto error;
1913 }
1914
1915 /* Make sure queues drop objects owned by this interpreter. */
1916 PyInterpreterState *interp = _get_current_interp();
1917 PyUnstable_AtExit(interp, clear_interpreter, (void *)interp);
1918
1919 return 0;
1920
1921 error:
1922 _globals_fini();
1923 return -1;
1924 }
1925
1926 static struct PyModuleDef_Slot module_slots[] = {
1927 {Py_mod_exec, module_exec},
1928 {Py_mod_multiple_interpreters, Py_MOD_PER_INTERPRETER_GIL_SUPPORTED},
1929 {Py_mod_gil, Py_MOD_GIL_NOT_USED},
1930 {0, NULL},
1931 };
1932
1933 static int
module_traverse(PyObject * mod,visitproc visit,void * arg)1934 module_traverse(PyObject *mod, visitproc visit, void *arg)
1935 {
1936 module_state *state = get_module_state(mod);
1937 traverse_module_state(state, visit, arg);
1938 return 0;
1939 }
1940
1941 static int
module_clear(PyObject * mod)1942 module_clear(PyObject *mod)
1943 {
1944 module_state *state = get_module_state(mod);
1945
1946 // Now we clear the module state.
1947 clear_module_state(state);
1948 return 0;
1949 }
1950
1951 static void
module_free(void * mod)1952 module_free(void *mod)
1953 {
1954 module_state *state = get_module_state(mod);
1955
1956 // Now we clear the module state.
1957 clear_module_state(state);
1958
1959 _globals_fini();
1960 }
1961
1962 static struct PyModuleDef moduledef = {
1963 .m_base = PyModuleDef_HEAD_INIT,
1964 .m_name = MODULE_NAME_STR,
1965 .m_doc = module_doc,
1966 .m_size = sizeof(module_state),
1967 .m_methods = module_functions,
1968 .m_slots = module_slots,
1969 .m_traverse = module_traverse,
1970 .m_clear = module_clear,
1971 .m_free = (freefunc)module_free,
1972 };
1973
1974 PyMODINIT_FUNC
MODINIT_FUNC_NAME(void)1975 MODINIT_FUNC_NAME(void)
1976 {
1977 return PyModuleDef_Init(&moduledef);
1978 }
1979