1 /******************************************************************************
2 *
3 * Copyright (C) 2014 Google, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at:
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 ******************************************************************************/
18
19 #include <assert.h>
20 #include <pthread.h>
21 #include <stdlib.h>
22
23 #include "fixed_queue.h"
24 #include "list.h"
25 #include "osi.h"
26 #include "semaphore.h"
27
28 typedef struct fixed_queue_t {
29 list_t *list;
30 semaphore_t *enqueue_sem;
31 semaphore_t *dequeue_sem;
32 pthread_mutex_t lock;
33 size_t capacity;
34 } fixed_queue_t;
35
fixed_queue_new(size_t capacity)36 fixed_queue_t *fixed_queue_new(size_t capacity) {
37 fixed_queue_t *ret = calloc(1, sizeof(fixed_queue_t));
38 if (!ret)
39 goto error;
40
41 ret->list = list_new(NULL);
42 if (!ret->list)
43 goto error;
44
45 ret->enqueue_sem = semaphore_new(capacity);
46 if (!ret->enqueue_sem)
47 goto error;
48
49 ret->dequeue_sem = semaphore_new(0);
50 if (!ret->dequeue_sem)
51 goto error;
52
53 pthread_mutex_init(&ret->lock, NULL);
54 ret->capacity = capacity;
55
56 return ret;
57
58 error:;
59 if (ret) {
60 list_free(ret->list);
61 semaphore_free(ret->enqueue_sem);
62 semaphore_free(ret->dequeue_sem);
63 }
64
65 free(ret);
66 return NULL;
67 }
68
fixed_queue_free(fixed_queue_t * queue,fixed_queue_free_cb free_cb)69 void fixed_queue_free(fixed_queue_t *queue, fixed_queue_free_cb free_cb) {
70 if (!queue)
71 return;
72
73 if (free_cb)
74 for (const list_node_t *node = list_begin(queue->list); node != list_end(queue->list); node = list_next(node))
75 free_cb(list_node(node));
76
77 list_free(queue->list);
78 semaphore_free(queue->enqueue_sem);
79 semaphore_free(queue->dequeue_sem);
80 pthread_mutex_destroy(&queue->lock);
81 free(queue);
82 }
83
fixed_queue_enqueue(fixed_queue_t * queue,void * data)84 void fixed_queue_enqueue(fixed_queue_t *queue, void *data) {
85 assert(queue != NULL);
86 assert(data != NULL);
87
88 semaphore_wait(queue->enqueue_sem);
89
90 pthread_mutex_lock(&queue->lock);
91 list_append(queue->list, data);
92 pthread_mutex_unlock(&queue->lock);
93
94 semaphore_post(queue->dequeue_sem);
95 }
96
fixed_queue_dequeue(fixed_queue_t * queue)97 void *fixed_queue_dequeue(fixed_queue_t *queue) {
98 assert(queue != NULL);
99
100 semaphore_wait(queue->dequeue_sem);
101
102 pthread_mutex_lock(&queue->lock);
103 void *ret = list_front(queue->list);
104 list_remove(queue->list, ret);
105 pthread_mutex_unlock(&queue->lock);
106
107 semaphore_post(queue->enqueue_sem);
108
109 return ret;
110 }
111
fixed_queue_try_enqueue(fixed_queue_t * queue,void * data)112 bool fixed_queue_try_enqueue(fixed_queue_t *queue, void *data) {
113 assert(queue != NULL);
114 assert(data != NULL);
115
116 if (!semaphore_try_wait(queue->enqueue_sem))
117 return false;
118
119 pthread_mutex_lock(&queue->lock);
120 list_append(queue->list, data);
121 pthread_mutex_unlock(&queue->lock);
122
123 semaphore_post(queue->dequeue_sem);
124 return true;
125 }
126
fixed_queue_try_dequeue(fixed_queue_t * queue)127 void *fixed_queue_try_dequeue(fixed_queue_t *queue) {
128 assert(queue != NULL);
129
130 if (!semaphore_try_wait(queue->dequeue_sem))
131 return NULL;
132
133 pthread_mutex_lock(&queue->lock);
134 void *ret = list_front(queue->list);
135 list_remove(queue->list, ret);
136 pthread_mutex_unlock(&queue->lock);
137
138 semaphore_post(queue->enqueue_sem);
139
140 return ret;
141 }
142
fixed_queue_get_dequeue_fd(const fixed_queue_t * queue)143 int fixed_queue_get_dequeue_fd(const fixed_queue_t *queue) {
144 assert(queue != NULL);
145 return semaphore_get_fd(queue->dequeue_sem);
146 }
147
fixed_queue_get_enqueue_fd(const fixed_queue_t * queue)148 int fixed_queue_get_enqueue_fd(const fixed_queue_t *queue) {
149 assert(queue != NULL);
150 return semaphore_get_fd(queue->enqueue_sem);
151 }
152