Lines Matching refs:queues
846 _queues_init(_queues *queues, PyThread_type_lock mutex) in _queues_init() argument
849 assert(queues->mutex == NULL); in _queues_init()
850 *queues = (_queues){ in _queues_init()
859 _queues_fini(_queues *queues, PyThread_type_lock *p_mutex) in _queues_fini() argument
861 PyThread_type_lock mutex = queues->mutex; in _queues_fini()
865 if (queues->count > 0) { in _queues_fini()
866 assert(queues->head != NULL); in _queues_fini()
867 _queuerefs_clear(queues->head); in _queues_fini()
869 *queues = (_queues){0}; in _queues_fini()
876 _queues_next_id(_queues *queues) // needs lock in _queues_next_id() argument
878 int64_t qid = queues->next_id; in _queues_next_id()
883 queues->next_id += 1; in _queues_next_id()
888 _queues_lookup(_queues *queues, int64_t qid, _queue **res) in _queues_lookup() argument
890 PyThread_acquire_lock(queues->mutex, WAIT_LOCK); in _queues_lookup()
892 _queueref *ref = _queuerefs_find(queues->head, qid, NULL); in _queues_lookup()
894 PyThread_release_lock(queues->mutex); in _queues_lookup()
902 PyThread_release_lock(queues->mutex); in _queues_lookup()
909 _queues_add(_queues *queues, _queue *queue) in _queues_add() argument
912 PyThread_acquire_lock(queues->mutex, WAIT_LOCK); in _queues_add()
915 int64_t _qid = _queues_next_id(queues); in _queues_add()
931 ref->next = queues->head; in _queues_add()
932 queues->head = ref; in _queues_add()
933 queues->count += 1; in _queues_add()
937 PyThread_release_lock(queues->mutex); in _queues_add()
942 _queues_remove_ref(_queues *queues, _queueref *ref, _queueref *prev, in _queues_remove_ref() argument
947 if (ref == queues->head) { in _queues_remove_ref()
948 queues->head = ref->next; in _queues_remove_ref()
954 queues->count -= 1; in _queues_remove_ref()
962 _queues_remove(_queues *queues, int64_t qid, _queue **p_queue) in _queues_remove() argument
964 PyThread_acquire_lock(queues->mutex, WAIT_LOCK); in _queues_remove()
967 _queueref *ref = _queuerefs_find(queues->head, qid, &prev); in _queues_remove()
969 PyThread_release_lock(queues->mutex); in _queues_remove()
973 _queues_remove_ref(queues, ref, prev, p_queue); in _queues_remove()
974 PyThread_release_lock(queues->mutex); in _queues_remove()
980 _queues_incref(_queues *queues, int64_t qid) in _queues_incref() argument
984 PyThread_acquire_lock(queues->mutex, WAIT_LOCK); in _queues_incref()
986 _queueref *ref = _queuerefs_find(queues->head, qid, NULL); in _queues_incref()
996 PyThread_release_lock(queues->mutex); in _queues_incref()
1001 _queues_decref(_queues *queues, int64_t qid) in _queues_decref() argument
1004 PyThread_acquire_lock(queues->mutex, WAIT_LOCK); in _queues_decref()
1007 _queueref *ref = _queuerefs_find(queues->head, qid, &prev); in _queues_decref()
1024 _queues_remove_ref(queues, ref, prev, &queue); in _queues_decref()
1025 PyThread_release_lock(queues->mutex); in _queues_decref()
1034 PyThread_release_lock(queues->mutex); in _queues_decref()
1045 _queues_list_all(_queues *queues, int64_t *p_count) in _queues_list_all() argument
1048 PyThread_acquire_lock(queues->mutex, WAIT_LOCK); in _queues_list_all()
1050 (Py_ssize_t)(queues->count)); in _queues_list_all()
1054 _queueref *ref = queues->head; in _queues_list_all()
1061 *p_count = queues->count; in _queues_list_all()
1065 PyThread_release_lock(queues->mutex); in _queues_list_all()
1070 _queues_clear_interpreter(_queues *queues, int64_t interpid) in _queues_clear_interpreter() argument
1072 PyThread_acquire_lock(queues->mutex, WAIT_LOCK); in _queues_clear_interpreter()
1074 _queueref *ref = queues->head; in _queues_clear_interpreter()
1080 PyThread_release_lock(queues->mutex); in _queues_clear_interpreter()
1095 queue_create(_queues *queues, Py_ssize_t maxsize, int fmt, int unboundop) in queue_create() argument
1106 int64_t qid = _queues_add(queues, queue); in queue_create()
1116 queue_destroy(_queues *queues, int64_t qid) in queue_destroy() argument
1119 int err = _queues_remove(queues, qid, &queue); in queue_destroy()
1130 queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt, int unboundop) in queue_put() argument
1134 int err = _queues_lookup(queues, qid, &queue); in queue_put()
1143 _queue_unmark_waiter(queue, queues->mutex); in queue_put()
1147 _queue_unmark_waiter(queue, queues->mutex); in queue_put()
1157 _queue_unmark_waiter(queue, queues->mutex); in queue_put()
1171 queue_get(_queues *queues, int64_t qid, in queue_get() argument
1179 err = _queues_lookup(queues, qid, &queue); in queue_get()
1189 _queue_unmark_waiter(queue, queues->mutex); in queue_get()
1220 queue_get_maxsize(_queues *queues, int64_t qid, Py_ssize_t *p_maxsize) in queue_get_maxsize() argument
1223 int err = _queues_lookup(queues, qid, &queue); in queue_get_maxsize()
1228 _queue_unmark_waiter(queue, queues->mutex); in queue_get_maxsize()
1233 queue_is_full(_queues *queues, int64_t qid, int *p_is_full) in queue_is_full() argument
1236 int err = _queues_lookup(queues, qid, &queue); in queue_is_full()
1241 _queue_unmark_waiter(queue, queues->mutex); in queue_is_full()
1246 queue_get_count(_queues *queues, int64_t qid, Py_ssize_t *p_count) in queue_get_count() argument
1249 int err = _queues_lookup(queues, qid, &queue); in queue_get_count()
1254 _queue_unmark_waiter(queue, queues->mutex); in queue_get_count()
1311 _queues *queues = _get_global_queues(); in _queueid_xid_new() local
1312 if (_queues_incref(queues, qid) < 0) { in _queueid_xid_new()
1318 _queues_decref(queues, qid); in _queueid_xid_new()
1330 _queues *queues = _get_global_queues(); in _queueid_xid_free() local
1331 int res = _queues_decref(queues, qid); in _queueid_xid_free()
1406 _queues queues; member
1423 _queues_init(&_globals.queues, mutex); in _globals_init()
1437 _queues_fini(&_globals.queues, &mutex); in _globals_fini()
1447 return &_globals.queues; in _get_global_queues()
1460 _queues_clear_interpreter(&_globals.queues, interpid); in clear_interpreter()
1495 int64_t qid = queue_create(&_globals.queues, maxsize, fmt, unboundop); in queuesmod_create()
1504 int err = queue_destroy(&_globals.queues, qid); in queuesmod_create()
1536 int err = queue_destroy(&_globals.queues, qid); in queuesmod_destroy()
1553 struct queue_id_and_info *qids = _queues_list_all(&_globals.queues, &count); in queuesmod_list_all()
1608 int err = queue_put(&_globals.queues, qid, obj, fmt, unboundop); in queuesmod_put()
1636 int err = queue_get(&_globals.queues, qid, &obj, &fmt, &unboundop); in queuesmod_get()
1671 int err = _queues_incref(&_globals.queues, qid); in queuesmod_bind()
1703 int err = _queues_decref(&_globals.queues, qid); in queuesmod_release()
1730 int err = queue_get_maxsize(&_globals.queues, qid, &maxsize); in queuesmod_get_maxsize()
1755 int err = _queues_lookup(&_globals.queues, qid, &queue); in queuesmod_get_queue_defaults()
1761 _queue_unmark_waiter(queue, _globals.queues.mutex); in queuesmod_get_queue_defaults()
1785 int err = queue_is_full(&_globals.queues, qid, &is_full); in queuesmod_is_full()
1813 int err = queue_get_count(&_globals.queues, qid, &count); in queuesmod_get_count()