• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #ifndef Py_BUILD_CORE_BUILTIN
2 #  define Py_BUILD_CORE_MODULE 1
3 #endif
4 
5 #include "Python.h"
6 #include "pycore_ceval.h"         // Py_MakePendingCalls()
7 #include "pycore_moduleobject.h"  // _PyModule_GetState()
8 #include "pycore_parking_lot.h"
9 #include "pycore_time.h"          // _PyTime_FromSecondsObject()
10 
11 #include <stdbool.h>
12 #include <stddef.h>               // offsetof()
13 
14 typedef struct {
15     PyTypeObject *SimpleQueueType;
16     PyObject *EmptyError;
17 } simplequeue_state;
18 
19 static simplequeue_state *
simplequeue_get_state(PyObject * module)20 simplequeue_get_state(PyObject *module)
21 {
22     simplequeue_state *state = _PyModule_GetState(module);
23     assert(state);
24     return state;
25 }
26 static struct PyModuleDef queuemodule;
27 #define simplequeue_get_state_by_type(type) \
28     (simplequeue_get_state(PyType_GetModuleByDef(type, &queuemodule)))
29 
30 static const Py_ssize_t INITIAL_RING_BUF_CAPACITY = 8;
31 
32 typedef struct {
33     // Where to place the next item
34     Py_ssize_t put_idx;
35 
36     // Where to get the next item
37     Py_ssize_t get_idx;
38 
39     PyObject **items;
40 
41     // Total number of items that may be stored
42     Py_ssize_t items_cap;
43 
44     // Number of items stored
45     Py_ssize_t num_items;
46 } RingBuf;
47 
48 static int
RingBuf_Init(RingBuf * buf)49 RingBuf_Init(RingBuf *buf)
50 {
51     buf->put_idx = 0;
52     buf->get_idx = 0;
53     buf->items_cap = INITIAL_RING_BUF_CAPACITY;
54     buf->num_items = 0;
55     buf->items = PyMem_Calloc(buf->items_cap, sizeof(PyObject *));
56     if (buf->items == NULL) {
57         PyErr_NoMemory();
58         return -1;
59     }
60     return 0;
61 }
62 
63 static PyObject *
RingBuf_At(RingBuf * buf,Py_ssize_t idx)64 RingBuf_At(RingBuf *buf, Py_ssize_t idx)
65 {
66     assert(idx >= 0 && idx < buf->num_items);
67     return buf->items[(buf->get_idx + idx) % buf->items_cap];
68 }
69 
70 static void
RingBuf_Fini(RingBuf * buf)71 RingBuf_Fini(RingBuf *buf)
72 {
73     PyObject **items = buf->items;
74     Py_ssize_t num_items = buf->num_items;
75     Py_ssize_t cap = buf->items_cap;
76     Py_ssize_t idx = buf->get_idx;
77     buf->items = NULL;
78     buf->put_idx = 0;
79     buf->get_idx = 0;
80     buf->num_items = 0;
81     buf->items_cap = 0;
82     for (Py_ssize_t n = num_items; n > 0; idx = (idx + 1) % cap, n--) {
83         Py_DECREF(items[idx]);
84     }
85     PyMem_Free(items);
86 }
87 
88 // Resize the underlying items array of buf to the new capacity and arrange
89 // the items contiguously in the new items array.
90 //
91 // Returns -1 on allocation failure or 0 on success.
92 static int
resize_ringbuf(RingBuf * buf,Py_ssize_t capacity)93 resize_ringbuf(RingBuf *buf, Py_ssize_t capacity)
94 {
95     Py_ssize_t new_capacity = Py_MAX(INITIAL_RING_BUF_CAPACITY, capacity);
96     if (new_capacity == buf->items_cap) {
97         return 0;
98     }
99     assert(buf->num_items <= new_capacity);
100 
101     PyObject **new_items = PyMem_Calloc(new_capacity, sizeof(PyObject *));
102     if (new_items == NULL) {
103         return -1;
104     }
105 
106     // Copy the "tail" of the old items array. This corresponds to "head" of
107     // the abstract ring buffer.
108     Py_ssize_t tail_size =
109         Py_MIN(buf->num_items, buf->items_cap - buf->get_idx);
110     if (tail_size > 0) {
111         memcpy(new_items, buf->items + buf->get_idx,
112                tail_size * sizeof(PyObject *));
113     }
114 
115     // Copy the "head" of the old items array, if any. This corresponds to the
116     // "tail" of the abstract ring buffer.
117     Py_ssize_t head_size = buf->num_items - tail_size;
118     if (head_size > 0) {
119         memcpy(new_items + tail_size, buf->items,
120                head_size * sizeof(PyObject *));
121     }
122 
123     PyMem_Free(buf->items);
124     buf->items = new_items;
125     buf->items_cap = new_capacity;
126     buf->get_idx = 0;
127     buf->put_idx = buf->num_items;
128 
129     return 0;
130 }
131 
132 // Returns a strong reference from the head of the buffer.
133 static PyObject *
RingBuf_Get(RingBuf * buf)134 RingBuf_Get(RingBuf *buf)
135 {
136     assert(buf->num_items > 0);
137 
138     if (buf->num_items < (buf->items_cap / 4)) {
139         // Items is less than 25% occupied, shrink it by 50%. This allows for
140         // growth without immediately needing to resize the underlying items
141         // array.
142         //
143         // It's safe it ignore allocation failures here; shrinking is an
144         // optimization that isn't required for correctness.
145         (void)resize_ringbuf(buf, buf->items_cap / 2);
146     }
147 
148     PyObject *item = buf->items[buf->get_idx];
149     buf->items[buf->get_idx] = NULL;
150     buf->get_idx = (buf->get_idx + 1) % buf->items_cap;
151     buf->num_items--;
152     return item;
153 }
154 
155 // Returns 0 on success or -1 if the buffer failed to grow.
156 //
157 // Steals a reference to item.
158 static int
RingBuf_Put(RingBuf * buf,PyObject * item)159 RingBuf_Put(RingBuf *buf, PyObject *item)
160 {
161     assert(buf->num_items <= buf->items_cap);
162 
163     if (buf->num_items == buf->items_cap) {
164         // Buffer is full, grow it.
165         if (resize_ringbuf(buf, buf->items_cap * 2) < 0) {
166             PyErr_NoMemory();
167             return -1;
168         }
169     }
170     buf->items[buf->put_idx] = item;
171     buf->put_idx = (buf->put_idx + 1) % buf->items_cap;
172     buf->num_items++;
173     return 0;
174 }
175 
176 static Py_ssize_t
RingBuf_Len(RingBuf * buf)177 RingBuf_Len(RingBuf *buf)
178 {
179     return buf->num_items;
180 }
181 
182 static bool
RingBuf_IsEmpty(RingBuf * buf)183 RingBuf_IsEmpty(RingBuf *buf)
184 {
185     return buf->num_items == 0;
186 }
187 
188 typedef struct {
189     PyObject_HEAD
190 
191     // Are there threads waiting for items
192     bool has_threads_waiting;
193 
194     // Items in the queue
195     RingBuf buf;
196 
197     PyObject *weakreflist;
198 } simplequeueobject;
199 
200 /*[clinic input]
201 module _queue
202 class _queue.SimpleQueue "simplequeueobject *" "simplequeue_get_state_by_type(type)->SimpleQueueType"
203 [clinic start generated code]*/
204 /*[clinic end generated code: output=da39a3ee5e6b4b0d input=0a4023fe4d198c8d]*/
205 
206 static int
simplequeue_clear(simplequeueobject * self)207 simplequeue_clear(simplequeueobject *self)
208 {
209     RingBuf_Fini(&self->buf);
210     return 0;
211 }
212 
213 static void
simplequeue_dealloc(simplequeueobject * self)214 simplequeue_dealloc(simplequeueobject *self)
215 {
216     PyTypeObject *tp = Py_TYPE(self);
217 
218     PyObject_GC_UnTrack(self);
219     (void)simplequeue_clear(self);
220     if (self->weakreflist != NULL)
221         PyObject_ClearWeakRefs((PyObject *) self);
222     Py_TYPE(self)->tp_free(self);
223     Py_DECREF(tp);
224 }
225 
226 static int
simplequeue_traverse(simplequeueobject * self,visitproc visit,void * arg)227 simplequeue_traverse(simplequeueobject *self, visitproc visit, void *arg)
228 {
229     RingBuf *buf = &self->buf;
230     for (Py_ssize_t i = 0, num_items = buf->num_items; i < num_items; i++) {
231         Py_VISIT(RingBuf_At(buf, i));
232     }
233     Py_VISIT(Py_TYPE(self));
234     return 0;
235 }
236 
237 /*[clinic input]
238 @classmethod
239 _queue.SimpleQueue.__new__ as simplequeue_new
240 
241 Simple, unbounded, reentrant FIFO queue.
242 [clinic start generated code]*/
243 
244 static PyObject *
simplequeue_new_impl(PyTypeObject * type)245 simplequeue_new_impl(PyTypeObject *type)
246 /*[clinic end generated code: output=ba97740608ba31cd input=a0674a1643e3e2fb]*/
247 {
248     simplequeueobject *self;
249 
250     self = (simplequeueobject *) type->tp_alloc(type, 0);
251     if (self != NULL) {
252         self->weakreflist = NULL;
253         if (RingBuf_Init(&self->buf) < 0) {
254             Py_DECREF(self);
255             return NULL;
256         }
257     }
258 
259     return (PyObject *) self;
260 }
261 
262 typedef struct {
263     bool handed_off;
264     simplequeueobject *queue;
265     PyObject *item;
266 } HandoffData;
267 
268 static void
maybe_handoff_item(HandoffData * data,PyObject ** item,int has_more_waiters)269 maybe_handoff_item(HandoffData *data, PyObject **item, int has_more_waiters)
270 {
271     if (item == NULL) {
272         // No threads were waiting
273         data->handed_off = false;
274     }
275     else {
276         // There was at least one waiting thread, hand off the item
277         *item = data->item;
278         data->handed_off = true;
279     }
280     data->queue->has_threads_waiting = has_more_waiters;
281 }
282 
283 /*[clinic input]
284 @critical_section
285 _queue.SimpleQueue.put
286     item: object
287     block: bool = True
288     timeout: object = None
289 
290 Put the item on the queue.
291 
292 The optional 'block' and 'timeout' arguments are ignored, as this method
293 never blocks.  They are provided for compatibility with the Queue class.
294 
295 [clinic start generated code]*/
296 
297 static PyObject *
_queue_SimpleQueue_put_impl(simplequeueobject * self,PyObject * item,int block,PyObject * timeout)298 _queue_SimpleQueue_put_impl(simplequeueobject *self, PyObject *item,
299                             int block, PyObject *timeout)
300 /*[clinic end generated code: output=4333136e88f90d8b input=a16dbb33363c0fa8]*/
301 {
302     HandoffData data = {
303         .handed_off = 0,
304         .item = Py_NewRef(item),
305         .queue = self,
306     };
307     if (self->has_threads_waiting) {
308         // Try to hand the item off directly if there are threads waiting
309         _PyParkingLot_Unpark(&self->has_threads_waiting,
310                              (_Py_unpark_fn_t *)maybe_handoff_item, &data);
311     }
312     if (!data.handed_off) {
313         if (RingBuf_Put(&self->buf, item) < 0) {
314             return NULL;
315         }
316     }
317     Py_RETURN_NONE;
318 }
319 
320 /*[clinic input]
321 @critical_section
322 _queue.SimpleQueue.put_nowait
323     item: object
324 
325 Put an item into the queue without blocking.
326 
327 This is exactly equivalent to `put(item)` and is only provided
328 for compatibility with the Queue class.
329 
330 [clinic start generated code]*/
331 
332 static PyObject *
_queue_SimpleQueue_put_nowait_impl(simplequeueobject * self,PyObject * item)333 _queue_SimpleQueue_put_nowait_impl(simplequeueobject *self, PyObject *item)
334 /*[clinic end generated code: output=0990536715efb1f1 input=ce949cc2cd8a4119]*/
335 {
336     return _queue_SimpleQueue_put_impl(self, item, 0, Py_None);
337 }
338 
339 static PyObject *
empty_error(PyTypeObject * cls)340 empty_error(PyTypeObject *cls)
341 {
342     PyObject *module = PyType_GetModule(cls);
343     assert(module != NULL);
344     simplequeue_state *state = simplequeue_get_state(module);
345     PyErr_SetNone(state->EmptyError);
346     return NULL;
347 }
348 
349 /*[clinic input]
350 @critical_section
351 _queue.SimpleQueue.get
352 
353     cls: defining_class
354     /
355     block: bool = True
356     timeout as timeout_obj: object = None
357 
358 Remove and return an item from the queue.
359 
360 If optional args 'block' is true and 'timeout' is None (the default),
361 block if necessary until an item is available. If 'timeout' is
362 a non-negative number, it blocks at most 'timeout' seconds and raises
363 the Empty exception if no item was available within that time.
364 Otherwise ('block' is false), return an item if one is immediately
365 available, else raise the Empty exception ('timeout' is ignored
366 in that case).
367 
368 [clinic start generated code]*/
369 
370 static PyObject *
_queue_SimpleQueue_get_impl(simplequeueobject * self,PyTypeObject * cls,int block,PyObject * timeout_obj)371 _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls,
372                             int block, PyObject *timeout_obj)
373 /*[clinic end generated code: output=5c2cca914cd1e55b input=f7836c65e5839c51]*/
374 {
375     PyTime_t endtime = 0;
376 
377     // XXX Use PyThread_ParseTimeoutArg().
378 
379     if (block != 0 && !Py_IsNone(timeout_obj)) {
380         /* With timeout */
381         PyTime_t timeout;
382         if (_PyTime_FromSecondsObject(&timeout,
383                                       timeout_obj, _PyTime_ROUND_CEILING) < 0) {
384             return NULL;
385         }
386         if (timeout < 0) {
387             PyErr_SetString(PyExc_ValueError,
388                             "'timeout' must be a non-negative number");
389             return NULL;
390         }
391         endtime = _PyDeadline_Init(timeout);
392     }
393 
394     for (;;) {
395         if (!RingBuf_IsEmpty(&self->buf)) {
396             return RingBuf_Get(&self->buf);
397         }
398 
399         if (!block) {
400             return empty_error(cls);
401         }
402 
403         int64_t timeout_ns = -1;
404         if (endtime != 0) {
405             timeout_ns = _PyDeadline_Get(endtime);
406             if (timeout_ns < 0) {
407                 return empty_error(cls);
408             }
409         }
410 
411         bool waiting = 1;
412         self->has_threads_waiting = waiting;
413 
414         PyObject *item = NULL;
415         int st = _PyParkingLot_Park(&self->has_threads_waiting, &waiting,
416                                     sizeof(bool), timeout_ns, &item,
417                                     /* detach */ 1);
418         switch (st) {
419             case Py_PARK_OK: {
420                 assert(item != NULL);
421                 return item;
422             }
423             case Py_PARK_TIMEOUT: {
424                 return empty_error(cls);
425             }
426             case Py_PARK_INTR: {
427                 // Interrupted
428                 if (Py_MakePendingCalls() < 0) {
429                     return NULL;
430                 }
431                 break;
432             }
433             case Py_PARK_AGAIN: {
434                 // This should be impossible with the current implementation of
435                 // PyParkingLot, but would be possible if critical sections /
436                 // the GIL were released before the thread was added to the
437                 // internal thread queue in the parking lot.
438                 break;
439             }
440             default: {
441                 Py_UNREACHABLE();
442             }
443         }
444     }
445 }
446 
447 /*[clinic input]
448 @critical_section
449 _queue.SimpleQueue.get_nowait
450 
451     cls: defining_class
452     /
453 
454 Remove and return an item from the queue without blocking.
455 
456 Only get an item if one is immediately available. Otherwise
457 raise the Empty exception.
458 [clinic start generated code]*/
459 
460 static PyObject *
_queue_SimpleQueue_get_nowait_impl(simplequeueobject * self,PyTypeObject * cls)461 _queue_SimpleQueue_get_nowait_impl(simplequeueobject *self,
462                                    PyTypeObject *cls)
463 /*[clinic end generated code: output=620c58e2750f8b8a input=d48be63633fefae9]*/
464 {
465     return _queue_SimpleQueue_get_impl(self, cls, 0, Py_None);
466 }
467 
468 /*[clinic input]
469 @critical_section
470 _queue.SimpleQueue.empty -> bool
471 
472 Return True if the queue is empty, False otherwise (not reliable!).
473 [clinic start generated code]*/
474 
475 static int
_queue_SimpleQueue_empty_impl(simplequeueobject * self)476 _queue_SimpleQueue_empty_impl(simplequeueobject *self)
477 /*[clinic end generated code: output=1a02a1b87c0ef838 input=96cb22df5a67d831]*/
478 {
479     return RingBuf_IsEmpty(&self->buf);
480 }
481 
482 /*[clinic input]
483 @critical_section
484 _queue.SimpleQueue.qsize -> Py_ssize_t
485 
486 Return the approximate size of the queue (not reliable!).
487 [clinic start generated code]*/
488 
489 static Py_ssize_t
_queue_SimpleQueue_qsize_impl(simplequeueobject * self)490 _queue_SimpleQueue_qsize_impl(simplequeueobject *self)
491 /*[clinic end generated code: output=f9dcd9d0a90e121e input=e218623cb8c16a79]*/
492 {
493     return RingBuf_Len(&self->buf);
494 }
495 
496 static int
queue_traverse(PyObject * m,visitproc visit,void * arg)497 queue_traverse(PyObject *m, visitproc visit, void *arg)
498 {
499     simplequeue_state *state = simplequeue_get_state(m);
500     Py_VISIT(state->SimpleQueueType);
501     Py_VISIT(state->EmptyError);
502     return 0;
503 }
504 
505 static int
queue_clear(PyObject * m)506 queue_clear(PyObject *m)
507 {
508     simplequeue_state *state = simplequeue_get_state(m);
509     Py_CLEAR(state->SimpleQueueType);
510     Py_CLEAR(state->EmptyError);
511     return 0;
512 }
513 
514 static void
queue_free(void * m)515 queue_free(void *m)
516 {
517     queue_clear((PyObject *)m);
518 }
519 
520 #include "clinic/_queuemodule.c.h"
521 
522 
523 static PyMethodDef simplequeue_methods[] = {
524     _QUEUE_SIMPLEQUEUE_EMPTY_METHODDEF
525     _QUEUE_SIMPLEQUEUE_GET_METHODDEF
526     _QUEUE_SIMPLEQUEUE_GET_NOWAIT_METHODDEF
527     _QUEUE_SIMPLEQUEUE_PUT_METHODDEF
528     _QUEUE_SIMPLEQUEUE_PUT_NOWAIT_METHODDEF
529     _QUEUE_SIMPLEQUEUE_QSIZE_METHODDEF
530     {"__class_getitem__",    Py_GenericAlias,
531     METH_O|METH_CLASS,       PyDoc_STR("See PEP 585")},
532     {NULL,           NULL}              /* sentinel */
533 };
534 
535 static struct PyMemberDef simplequeue_members[] = {
536     {"__weaklistoffset__", Py_T_PYSSIZET, offsetof(simplequeueobject, weakreflist), Py_READONLY},
537     {NULL},
538 };
539 
540 static PyType_Slot simplequeue_slots[] = {
541     {Py_tp_dealloc, simplequeue_dealloc},
542     {Py_tp_doc, (void *)simplequeue_new__doc__},
543     {Py_tp_traverse, simplequeue_traverse},
544     {Py_tp_clear, simplequeue_clear},
545     {Py_tp_members, simplequeue_members},
546     {Py_tp_methods, simplequeue_methods},
547     {Py_tp_new, simplequeue_new},
548     {0, NULL},
549 };
550 
551 static PyType_Spec simplequeue_spec = {
552     .name = "_queue.SimpleQueue",
553     .basicsize = sizeof(simplequeueobject),
554     .flags = (Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | Py_TPFLAGS_HAVE_GC |
555               Py_TPFLAGS_IMMUTABLETYPE),
556     .slots = simplequeue_slots,
557 };
558 
559 
560 /* Initialization function */
561 
562 PyDoc_STRVAR(queue_module_doc,
563 "C implementation of the Python queue module.\n\
564 This module is an implementation detail, please do not use it directly.");
565 
566 static int
queuemodule_exec(PyObject * module)567 queuemodule_exec(PyObject *module)
568 {
569     simplequeue_state *state = simplequeue_get_state(module);
570 
571     state->EmptyError = PyErr_NewExceptionWithDoc(
572         "_queue.Empty",
573         "Exception raised by Queue.get(block=0)/get_nowait().",
574         NULL, NULL);
575     if (state->EmptyError == NULL) {
576         return -1;
577     }
578     if (PyModule_AddObjectRef(module, "Empty", state->EmptyError) < 0) {
579         return -1;
580     }
581 
582     state->SimpleQueueType = (PyTypeObject *)PyType_FromModuleAndSpec(
583         module, &simplequeue_spec, NULL);
584     if (state->SimpleQueueType == NULL) {
585         return -1;
586     }
587     if (PyModule_AddType(module, state->SimpleQueueType) < 0) {
588         return -1;
589     }
590 
591     return 0;
592 }
593 
594 static PyModuleDef_Slot queuemodule_slots[] = {
595     {Py_mod_exec, queuemodule_exec},
596     {Py_mod_multiple_interpreters, Py_MOD_PER_INTERPRETER_GIL_SUPPORTED},
597     {Py_mod_gil, Py_MOD_GIL_NOT_USED},
598     {0, NULL}
599 };
600 
601 
602 static struct PyModuleDef queuemodule = {
603     .m_base = PyModuleDef_HEAD_INIT,
604     .m_name = "_queue",
605     .m_doc = queue_module_doc,
606     .m_size = sizeof(simplequeue_state),
607     .m_slots = queuemodule_slots,
608     .m_traverse = queue_traverse,
609     .m_clear = queue_clear,
610     .m_free = queue_free,
611 };
612 
613 
614 PyMODINIT_FUNC
PyInit__queue(void)615 PyInit__queue(void)
616 {
617    return PyModuleDef_Init(&queuemodule);
618 }
619