1 #ifndef Py_BUILD_CORE_BUILTIN
2 # define Py_BUILD_CORE_MODULE 1
3 #endif
4
5 #include "Python.h"
6 #include "pycore_ceval.h" // Py_MakePendingCalls()
7 #include "pycore_moduleobject.h" // _PyModule_GetState()
8 #include "pycore_parking_lot.h"
9 #include "pycore_time.h" // _PyTime_FromSecondsObject()
10
11 #include <stdbool.h>
12 #include <stddef.h> // offsetof()
13
14 typedef struct {
15 PyTypeObject *SimpleQueueType;
16 PyObject *EmptyError;
17 } simplequeue_state;
18
19 static simplequeue_state *
simplequeue_get_state(PyObject * module)20 simplequeue_get_state(PyObject *module)
21 {
22 simplequeue_state *state = _PyModule_GetState(module);
23 assert(state);
24 return state;
25 }
26 static struct PyModuleDef queuemodule;
27 #define simplequeue_get_state_by_type(type) \
28 (simplequeue_get_state(PyType_GetModuleByDef(type, &queuemodule)))
29
30 static const Py_ssize_t INITIAL_RING_BUF_CAPACITY = 8;
31
32 typedef struct {
33 // Where to place the next item
34 Py_ssize_t put_idx;
35
36 // Where to get the next item
37 Py_ssize_t get_idx;
38
39 PyObject **items;
40
41 // Total number of items that may be stored
42 Py_ssize_t items_cap;
43
44 // Number of items stored
45 Py_ssize_t num_items;
46 } RingBuf;
47
48 static int
RingBuf_Init(RingBuf * buf)49 RingBuf_Init(RingBuf *buf)
50 {
51 buf->put_idx = 0;
52 buf->get_idx = 0;
53 buf->items_cap = INITIAL_RING_BUF_CAPACITY;
54 buf->num_items = 0;
55 buf->items = PyMem_Calloc(buf->items_cap, sizeof(PyObject *));
56 if (buf->items == NULL) {
57 PyErr_NoMemory();
58 return -1;
59 }
60 return 0;
61 }
62
63 static PyObject *
RingBuf_At(RingBuf * buf,Py_ssize_t idx)64 RingBuf_At(RingBuf *buf, Py_ssize_t idx)
65 {
66 assert(idx >= 0 && idx < buf->num_items);
67 return buf->items[(buf->get_idx + idx) % buf->items_cap];
68 }
69
70 static void
RingBuf_Fini(RingBuf * buf)71 RingBuf_Fini(RingBuf *buf)
72 {
73 PyObject **items = buf->items;
74 Py_ssize_t num_items = buf->num_items;
75 Py_ssize_t cap = buf->items_cap;
76 Py_ssize_t idx = buf->get_idx;
77 buf->items = NULL;
78 buf->put_idx = 0;
79 buf->get_idx = 0;
80 buf->num_items = 0;
81 buf->items_cap = 0;
82 for (Py_ssize_t n = num_items; n > 0; idx = (idx + 1) % cap, n--) {
83 Py_DECREF(items[idx]);
84 }
85 PyMem_Free(items);
86 }
87
88 // Resize the underlying items array of buf to the new capacity and arrange
89 // the items contiguously in the new items array.
90 //
91 // Returns -1 on allocation failure or 0 on success.
92 static int
resize_ringbuf(RingBuf * buf,Py_ssize_t capacity)93 resize_ringbuf(RingBuf *buf, Py_ssize_t capacity)
94 {
95 Py_ssize_t new_capacity = Py_MAX(INITIAL_RING_BUF_CAPACITY, capacity);
96 if (new_capacity == buf->items_cap) {
97 return 0;
98 }
99 assert(buf->num_items <= new_capacity);
100
101 PyObject **new_items = PyMem_Calloc(new_capacity, sizeof(PyObject *));
102 if (new_items == NULL) {
103 return -1;
104 }
105
106 // Copy the "tail" of the old items array. This corresponds to "head" of
107 // the abstract ring buffer.
108 Py_ssize_t tail_size =
109 Py_MIN(buf->num_items, buf->items_cap - buf->get_idx);
110 if (tail_size > 0) {
111 memcpy(new_items, buf->items + buf->get_idx,
112 tail_size * sizeof(PyObject *));
113 }
114
115 // Copy the "head" of the old items array, if any. This corresponds to the
116 // "tail" of the abstract ring buffer.
117 Py_ssize_t head_size = buf->num_items - tail_size;
118 if (head_size > 0) {
119 memcpy(new_items + tail_size, buf->items,
120 head_size * sizeof(PyObject *));
121 }
122
123 PyMem_Free(buf->items);
124 buf->items = new_items;
125 buf->items_cap = new_capacity;
126 buf->get_idx = 0;
127 buf->put_idx = buf->num_items;
128
129 return 0;
130 }
131
132 // Returns a strong reference from the head of the buffer.
133 static PyObject *
RingBuf_Get(RingBuf * buf)134 RingBuf_Get(RingBuf *buf)
135 {
136 assert(buf->num_items > 0);
137
138 if (buf->num_items < (buf->items_cap / 4)) {
139 // Items is less than 25% occupied, shrink it by 50%. This allows for
140 // growth without immediately needing to resize the underlying items
141 // array.
142 //
143 // It's safe it ignore allocation failures here; shrinking is an
144 // optimization that isn't required for correctness.
145 (void)resize_ringbuf(buf, buf->items_cap / 2);
146 }
147
148 PyObject *item = buf->items[buf->get_idx];
149 buf->items[buf->get_idx] = NULL;
150 buf->get_idx = (buf->get_idx + 1) % buf->items_cap;
151 buf->num_items--;
152 return item;
153 }
154
155 // Returns 0 on success or -1 if the buffer failed to grow.
156 //
157 // Steals a reference to item.
158 static int
RingBuf_Put(RingBuf * buf,PyObject * item)159 RingBuf_Put(RingBuf *buf, PyObject *item)
160 {
161 assert(buf->num_items <= buf->items_cap);
162
163 if (buf->num_items == buf->items_cap) {
164 // Buffer is full, grow it.
165 if (resize_ringbuf(buf, buf->items_cap * 2) < 0) {
166 PyErr_NoMemory();
167 return -1;
168 }
169 }
170 buf->items[buf->put_idx] = item;
171 buf->put_idx = (buf->put_idx + 1) % buf->items_cap;
172 buf->num_items++;
173 return 0;
174 }
175
176 static Py_ssize_t
RingBuf_Len(RingBuf * buf)177 RingBuf_Len(RingBuf *buf)
178 {
179 return buf->num_items;
180 }
181
182 static bool
RingBuf_IsEmpty(RingBuf * buf)183 RingBuf_IsEmpty(RingBuf *buf)
184 {
185 return buf->num_items == 0;
186 }
187
188 typedef struct {
189 PyObject_HEAD
190
191 // Are there threads waiting for items
192 bool has_threads_waiting;
193
194 // Items in the queue
195 RingBuf buf;
196
197 PyObject *weakreflist;
198 } simplequeueobject;
199
200 /*[clinic input]
201 module _queue
202 class _queue.SimpleQueue "simplequeueobject *" "simplequeue_get_state_by_type(type)->SimpleQueueType"
203 [clinic start generated code]*/
204 /*[clinic end generated code: output=da39a3ee5e6b4b0d input=0a4023fe4d198c8d]*/
205
206 static int
simplequeue_clear(simplequeueobject * self)207 simplequeue_clear(simplequeueobject *self)
208 {
209 RingBuf_Fini(&self->buf);
210 return 0;
211 }
212
213 static void
simplequeue_dealloc(simplequeueobject * self)214 simplequeue_dealloc(simplequeueobject *self)
215 {
216 PyTypeObject *tp = Py_TYPE(self);
217
218 PyObject_GC_UnTrack(self);
219 (void)simplequeue_clear(self);
220 if (self->weakreflist != NULL)
221 PyObject_ClearWeakRefs((PyObject *) self);
222 Py_TYPE(self)->tp_free(self);
223 Py_DECREF(tp);
224 }
225
226 static int
simplequeue_traverse(simplequeueobject * self,visitproc visit,void * arg)227 simplequeue_traverse(simplequeueobject *self, visitproc visit, void *arg)
228 {
229 RingBuf *buf = &self->buf;
230 for (Py_ssize_t i = 0, num_items = buf->num_items; i < num_items; i++) {
231 Py_VISIT(RingBuf_At(buf, i));
232 }
233 Py_VISIT(Py_TYPE(self));
234 return 0;
235 }
236
237 /*[clinic input]
238 @classmethod
239 _queue.SimpleQueue.__new__ as simplequeue_new
240
241 Simple, unbounded, reentrant FIFO queue.
242 [clinic start generated code]*/
243
244 static PyObject *
simplequeue_new_impl(PyTypeObject * type)245 simplequeue_new_impl(PyTypeObject *type)
246 /*[clinic end generated code: output=ba97740608ba31cd input=a0674a1643e3e2fb]*/
247 {
248 simplequeueobject *self;
249
250 self = (simplequeueobject *) type->tp_alloc(type, 0);
251 if (self != NULL) {
252 self->weakreflist = NULL;
253 if (RingBuf_Init(&self->buf) < 0) {
254 Py_DECREF(self);
255 return NULL;
256 }
257 }
258
259 return (PyObject *) self;
260 }
261
262 typedef struct {
263 bool handed_off;
264 simplequeueobject *queue;
265 PyObject *item;
266 } HandoffData;
267
268 static void
maybe_handoff_item(HandoffData * data,PyObject ** item,int has_more_waiters)269 maybe_handoff_item(HandoffData *data, PyObject **item, int has_more_waiters)
270 {
271 if (item == NULL) {
272 // No threads were waiting
273 data->handed_off = false;
274 }
275 else {
276 // There was at least one waiting thread, hand off the item
277 *item = data->item;
278 data->handed_off = true;
279 }
280 data->queue->has_threads_waiting = has_more_waiters;
281 }
282
283 /*[clinic input]
284 @critical_section
285 _queue.SimpleQueue.put
286 item: object
287 block: bool = True
288 timeout: object = None
289
290 Put the item on the queue.
291
292 The optional 'block' and 'timeout' arguments are ignored, as this method
293 never blocks. They are provided for compatibility with the Queue class.
294
295 [clinic start generated code]*/
296
297 static PyObject *
_queue_SimpleQueue_put_impl(simplequeueobject * self,PyObject * item,int block,PyObject * timeout)298 _queue_SimpleQueue_put_impl(simplequeueobject *self, PyObject *item,
299 int block, PyObject *timeout)
300 /*[clinic end generated code: output=4333136e88f90d8b input=a16dbb33363c0fa8]*/
301 {
302 HandoffData data = {
303 .handed_off = 0,
304 .item = Py_NewRef(item),
305 .queue = self,
306 };
307 if (self->has_threads_waiting) {
308 // Try to hand the item off directly if there are threads waiting
309 _PyParkingLot_Unpark(&self->has_threads_waiting,
310 (_Py_unpark_fn_t *)maybe_handoff_item, &data);
311 }
312 if (!data.handed_off) {
313 if (RingBuf_Put(&self->buf, item) < 0) {
314 return NULL;
315 }
316 }
317 Py_RETURN_NONE;
318 }
319
320 /*[clinic input]
321 @critical_section
322 _queue.SimpleQueue.put_nowait
323 item: object
324
325 Put an item into the queue without blocking.
326
327 This is exactly equivalent to `put(item)` and is only provided
328 for compatibility with the Queue class.
329
330 [clinic start generated code]*/
331
332 static PyObject *
_queue_SimpleQueue_put_nowait_impl(simplequeueobject * self,PyObject * item)333 _queue_SimpleQueue_put_nowait_impl(simplequeueobject *self, PyObject *item)
334 /*[clinic end generated code: output=0990536715efb1f1 input=ce949cc2cd8a4119]*/
335 {
336 return _queue_SimpleQueue_put_impl(self, item, 0, Py_None);
337 }
338
339 static PyObject *
empty_error(PyTypeObject * cls)340 empty_error(PyTypeObject *cls)
341 {
342 PyObject *module = PyType_GetModule(cls);
343 assert(module != NULL);
344 simplequeue_state *state = simplequeue_get_state(module);
345 PyErr_SetNone(state->EmptyError);
346 return NULL;
347 }
348
349 /*[clinic input]
350 @critical_section
351 _queue.SimpleQueue.get
352
353 cls: defining_class
354 /
355 block: bool = True
356 timeout as timeout_obj: object = None
357
358 Remove and return an item from the queue.
359
360 If optional args 'block' is true and 'timeout' is None (the default),
361 block if necessary until an item is available. If 'timeout' is
362 a non-negative number, it blocks at most 'timeout' seconds and raises
363 the Empty exception if no item was available within that time.
364 Otherwise ('block' is false), return an item if one is immediately
365 available, else raise the Empty exception ('timeout' is ignored
366 in that case).
367
368 [clinic start generated code]*/
369
370 static PyObject *
_queue_SimpleQueue_get_impl(simplequeueobject * self,PyTypeObject * cls,int block,PyObject * timeout_obj)371 _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls,
372 int block, PyObject *timeout_obj)
373 /*[clinic end generated code: output=5c2cca914cd1e55b input=f7836c65e5839c51]*/
374 {
375 PyTime_t endtime = 0;
376
377 // XXX Use PyThread_ParseTimeoutArg().
378
379 if (block != 0 && !Py_IsNone(timeout_obj)) {
380 /* With timeout */
381 PyTime_t timeout;
382 if (_PyTime_FromSecondsObject(&timeout,
383 timeout_obj, _PyTime_ROUND_CEILING) < 0) {
384 return NULL;
385 }
386 if (timeout < 0) {
387 PyErr_SetString(PyExc_ValueError,
388 "'timeout' must be a non-negative number");
389 return NULL;
390 }
391 endtime = _PyDeadline_Init(timeout);
392 }
393
394 for (;;) {
395 if (!RingBuf_IsEmpty(&self->buf)) {
396 return RingBuf_Get(&self->buf);
397 }
398
399 if (!block) {
400 return empty_error(cls);
401 }
402
403 int64_t timeout_ns = -1;
404 if (endtime != 0) {
405 timeout_ns = _PyDeadline_Get(endtime);
406 if (timeout_ns < 0) {
407 return empty_error(cls);
408 }
409 }
410
411 bool waiting = 1;
412 self->has_threads_waiting = waiting;
413
414 PyObject *item = NULL;
415 int st = _PyParkingLot_Park(&self->has_threads_waiting, &waiting,
416 sizeof(bool), timeout_ns, &item,
417 /* detach */ 1);
418 switch (st) {
419 case Py_PARK_OK: {
420 assert(item != NULL);
421 return item;
422 }
423 case Py_PARK_TIMEOUT: {
424 return empty_error(cls);
425 }
426 case Py_PARK_INTR: {
427 // Interrupted
428 if (Py_MakePendingCalls() < 0) {
429 return NULL;
430 }
431 break;
432 }
433 case Py_PARK_AGAIN: {
434 // This should be impossible with the current implementation of
435 // PyParkingLot, but would be possible if critical sections /
436 // the GIL were released before the thread was added to the
437 // internal thread queue in the parking lot.
438 break;
439 }
440 default: {
441 Py_UNREACHABLE();
442 }
443 }
444 }
445 }
446
447 /*[clinic input]
448 @critical_section
449 _queue.SimpleQueue.get_nowait
450
451 cls: defining_class
452 /
453
454 Remove and return an item from the queue without blocking.
455
456 Only get an item if one is immediately available. Otherwise
457 raise the Empty exception.
458 [clinic start generated code]*/
459
460 static PyObject *
_queue_SimpleQueue_get_nowait_impl(simplequeueobject * self,PyTypeObject * cls)461 _queue_SimpleQueue_get_nowait_impl(simplequeueobject *self,
462 PyTypeObject *cls)
463 /*[clinic end generated code: output=620c58e2750f8b8a input=d48be63633fefae9]*/
464 {
465 return _queue_SimpleQueue_get_impl(self, cls, 0, Py_None);
466 }
467
468 /*[clinic input]
469 @critical_section
470 _queue.SimpleQueue.empty -> bool
471
472 Return True if the queue is empty, False otherwise (not reliable!).
473 [clinic start generated code]*/
474
475 static int
_queue_SimpleQueue_empty_impl(simplequeueobject * self)476 _queue_SimpleQueue_empty_impl(simplequeueobject *self)
477 /*[clinic end generated code: output=1a02a1b87c0ef838 input=96cb22df5a67d831]*/
478 {
479 return RingBuf_IsEmpty(&self->buf);
480 }
481
482 /*[clinic input]
483 @critical_section
484 _queue.SimpleQueue.qsize -> Py_ssize_t
485
486 Return the approximate size of the queue (not reliable!).
487 [clinic start generated code]*/
488
489 static Py_ssize_t
_queue_SimpleQueue_qsize_impl(simplequeueobject * self)490 _queue_SimpleQueue_qsize_impl(simplequeueobject *self)
491 /*[clinic end generated code: output=f9dcd9d0a90e121e input=e218623cb8c16a79]*/
492 {
493 return RingBuf_Len(&self->buf);
494 }
495
496 static int
queue_traverse(PyObject * m,visitproc visit,void * arg)497 queue_traverse(PyObject *m, visitproc visit, void *arg)
498 {
499 simplequeue_state *state = simplequeue_get_state(m);
500 Py_VISIT(state->SimpleQueueType);
501 Py_VISIT(state->EmptyError);
502 return 0;
503 }
504
505 static int
queue_clear(PyObject * m)506 queue_clear(PyObject *m)
507 {
508 simplequeue_state *state = simplequeue_get_state(m);
509 Py_CLEAR(state->SimpleQueueType);
510 Py_CLEAR(state->EmptyError);
511 return 0;
512 }
513
514 static void
queue_free(void * m)515 queue_free(void *m)
516 {
517 queue_clear((PyObject *)m);
518 }
519
520 #include "clinic/_queuemodule.c.h"
521
522
523 static PyMethodDef simplequeue_methods[] = {
524 _QUEUE_SIMPLEQUEUE_EMPTY_METHODDEF
525 _QUEUE_SIMPLEQUEUE_GET_METHODDEF
526 _QUEUE_SIMPLEQUEUE_GET_NOWAIT_METHODDEF
527 _QUEUE_SIMPLEQUEUE_PUT_METHODDEF
528 _QUEUE_SIMPLEQUEUE_PUT_NOWAIT_METHODDEF
529 _QUEUE_SIMPLEQUEUE_QSIZE_METHODDEF
530 {"__class_getitem__", Py_GenericAlias,
531 METH_O|METH_CLASS, PyDoc_STR("See PEP 585")},
532 {NULL, NULL} /* sentinel */
533 };
534
535 static struct PyMemberDef simplequeue_members[] = {
536 {"__weaklistoffset__", Py_T_PYSSIZET, offsetof(simplequeueobject, weakreflist), Py_READONLY},
537 {NULL},
538 };
539
540 static PyType_Slot simplequeue_slots[] = {
541 {Py_tp_dealloc, simplequeue_dealloc},
542 {Py_tp_doc, (void *)simplequeue_new__doc__},
543 {Py_tp_traverse, simplequeue_traverse},
544 {Py_tp_clear, simplequeue_clear},
545 {Py_tp_members, simplequeue_members},
546 {Py_tp_methods, simplequeue_methods},
547 {Py_tp_new, simplequeue_new},
548 {0, NULL},
549 };
550
551 static PyType_Spec simplequeue_spec = {
552 .name = "_queue.SimpleQueue",
553 .basicsize = sizeof(simplequeueobject),
554 .flags = (Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | Py_TPFLAGS_HAVE_GC |
555 Py_TPFLAGS_IMMUTABLETYPE),
556 .slots = simplequeue_slots,
557 };
558
559
560 /* Initialization function */
561
562 PyDoc_STRVAR(queue_module_doc,
563 "C implementation of the Python queue module.\n\
564 This module is an implementation detail, please do not use it directly.");
565
566 static int
queuemodule_exec(PyObject * module)567 queuemodule_exec(PyObject *module)
568 {
569 simplequeue_state *state = simplequeue_get_state(module);
570
571 state->EmptyError = PyErr_NewExceptionWithDoc(
572 "_queue.Empty",
573 "Exception raised by Queue.get(block=0)/get_nowait().",
574 NULL, NULL);
575 if (state->EmptyError == NULL) {
576 return -1;
577 }
578 if (PyModule_AddObjectRef(module, "Empty", state->EmptyError) < 0) {
579 return -1;
580 }
581
582 state->SimpleQueueType = (PyTypeObject *)PyType_FromModuleAndSpec(
583 module, &simplequeue_spec, NULL);
584 if (state->SimpleQueueType == NULL) {
585 return -1;
586 }
587 if (PyModule_AddType(module, state->SimpleQueueType) < 0) {
588 return -1;
589 }
590
591 return 0;
592 }
593
594 static PyModuleDef_Slot queuemodule_slots[] = {
595 {Py_mod_exec, queuemodule_exec},
596 {Py_mod_multiple_interpreters, Py_MOD_PER_INTERPRETER_GIL_SUPPORTED},
597 {Py_mod_gil, Py_MOD_GIL_NOT_USED},
598 {0, NULL}
599 };
600
601
602 static struct PyModuleDef queuemodule = {
603 .m_base = PyModuleDef_HEAD_INIT,
604 .m_name = "_queue",
605 .m_doc = queue_module_doc,
606 .m_size = sizeof(simplequeue_state),
607 .m_slots = queuemodule_slots,
608 .m_traverse = queue_traverse,
609 .m_clear = queue_clear,
610 .m_free = queue_free,
611 };
612
613
614 PyMODINIT_FUNC
PyInit__queue(void)615 PyInit__queue(void)
616 {
617 return PyModuleDef_Init(&queuemodule);
618 }
619