• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /******************************************************************************
2  *
3  *  Copyright 2014 Google, Inc.
4  *
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at:
8  *
9  *  http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  *
17  ******************************************************************************/
18 
19 #include <base/logging.h>
20 #include <string.h>
21 
22 #include <mutex>
23 
24 #include "check.h"
25 #include "osi/include/allocator.h"
26 #include "osi/include/fixed_queue.h"
27 #include "osi/include/list.h"
28 #include "osi/include/osi.h"
29 #include "osi/include/reactor.h"
30 #include "osi/semaphore.h"
31 
32 typedef struct fixed_queue_t {
33   list_t* list;
34   semaphore_t* enqueue_sem;
35   semaphore_t* dequeue_sem;
36   std::mutex* mutex;
37   size_t capacity;
38 
39   reactor_object_t* dequeue_object;
40   fixed_queue_cb dequeue_ready;
41   void* dequeue_context;
42 } fixed_queue_t;
43 
44 static void internal_dequeue_ready(void* context);
45 
fixed_queue_new(size_t capacity)46 fixed_queue_t* fixed_queue_new(size_t capacity) {
47   fixed_queue_t* ret =
48       static_cast<fixed_queue_t*>(osi_calloc(sizeof(fixed_queue_t)));
49 
50   ret->mutex = new std::mutex;
51   ret->capacity = capacity;
52 
53   ret->list = list_new(NULL);
54   if (!ret->list) goto error;
55 
56   ret->enqueue_sem = semaphore_new(capacity);
57   if (!ret->enqueue_sem) goto error;
58 
59   ret->dequeue_sem = semaphore_new(0);
60   if (!ret->dequeue_sem) goto error;
61 
62   return ret;
63 
64 error:
65   fixed_queue_free(ret, NULL);
66   return NULL;
67 }
68 
fixed_queue_free(fixed_queue_t * queue,fixed_queue_free_cb free_cb)69 void fixed_queue_free(fixed_queue_t* queue, fixed_queue_free_cb free_cb) {
70   if (!queue) return;
71 
72   fixed_queue_unregister_dequeue(queue);
73 
74   if (free_cb)
75     for (const list_node_t* node = list_begin(queue->list);
76          node != list_end(queue->list); node = list_next(node))
77       free_cb(list_node(node));
78 
79   list_free(queue->list);
80   semaphore_free(queue->enqueue_sem);
81   semaphore_free(queue->dequeue_sem);
82   delete queue->mutex;
83   osi_free(queue);
84 }
85 
fixed_queue_flush(fixed_queue_t * queue,fixed_queue_free_cb free_cb)86 void fixed_queue_flush(fixed_queue_t* queue, fixed_queue_free_cb free_cb) {
87   if (!queue) return;
88 
89   while (!fixed_queue_is_empty(queue)) {
90     void* data = fixed_queue_try_dequeue(queue);
91     if (free_cb != NULL) {
92       free_cb(data);
93     }
94   }
95 }
96 
fixed_queue_is_empty(fixed_queue_t * queue)97 bool fixed_queue_is_empty(fixed_queue_t* queue) {
98   if (queue == NULL) return true;
99 
100   std::lock_guard<std::mutex> lock(*queue->mutex);
101   return list_is_empty(queue->list);
102 }
103 
fixed_queue_length(fixed_queue_t * queue)104 size_t fixed_queue_length(fixed_queue_t* queue) {
105   if (queue == NULL) return 0;
106 
107   std::lock_guard<std::mutex> lock(*queue->mutex);
108   return list_length(queue->list);
109 }
110 
fixed_queue_capacity(fixed_queue_t * queue)111 size_t fixed_queue_capacity(fixed_queue_t* queue) {
112   CHECK(queue != NULL);
113 
114   return queue->capacity;
115 }
116 
fixed_queue_enqueue(fixed_queue_t * queue,void * data)117 void fixed_queue_enqueue(fixed_queue_t* queue, void* data) {
118   CHECK(queue != NULL);
119   CHECK(data != NULL);
120 
121   semaphore_wait(queue->enqueue_sem);
122 
123   {
124     std::lock_guard<std::mutex> lock(*queue->mutex);
125     list_append(queue->list, data);
126   }
127 
128   semaphore_post(queue->dequeue_sem);
129 }
130 
fixed_queue_dequeue(fixed_queue_t * queue)131 void* fixed_queue_dequeue(fixed_queue_t* queue) {
132   CHECK(queue != NULL);
133 
134   semaphore_wait(queue->dequeue_sem);
135 
136   void* ret = NULL;
137   {
138     std::lock_guard<std::mutex> lock(*queue->mutex);
139     ret = list_front(queue->list);
140     list_remove(queue->list, ret);
141   }
142 
143   semaphore_post(queue->enqueue_sem);
144 
145   return ret;
146 }
147 
fixed_queue_try_enqueue(fixed_queue_t * queue,void * data)148 bool fixed_queue_try_enqueue(fixed_queue_t* queue, void* data) {
149   CHECK(queue != NULL);
150   CHECK(data != NULL);
151 
152   if (!semaphore_try_wait(queue->enqueue_sem)) return false;
153 
154   {
155     std::lock_guard<std::mutex> lock(*queue->mutex);
156     list_append(queue->list, data);
157   }
158 
159   semaphore_post(queue->dequeue_sem);
160   return true;
161 }
162 
fixed_queue_try_dequeue(fixed_queue_t * queue)163 void* fixed_queue_try_dequeue(fixed_queue_t* queue) {
164   if (queue == NULL) return NULL;
165 
166   if (!semaphore_try_wait(queue->dequeue_sem)) return NULL;
167 
168   void* ret = NULL;
169   {
170     std::lock_guard<std::mutex> lock(*queue->mutex);
171     ret = list_front(queue->list);
172     list_remove(queue->list, ret);
173   }
174 
175   semaphore_post(queue->enqueue_sem);
176 
177   return ret;
178 }
179 
fixed_queue_try_peek_first(fixed_queue_t * queue)180 void* fixed_queue_try_peek_first(fixed_queue_t* queue) {
181   if (queue == NULL) return NULL;
182 
183   std::lock_guard<std::mutex> lock(*queue->mutex);
184   return list_is_empty(queue->list) ? NULL : list_front(queue->list);
185 }
186 
fixed_queue_try_peek_last(fixed_queue_t * queue)187 void* fixed_queue_try_peek_last(fixed_queue_t* queue) {
188   if (queue == NULL) return NULL;
189 
190   std::lock_guard<std::mutex> lock(*queue->mutex);
191   return list_is_empty(queue->list) ? NULL : list_back(queue->list);
192 }
193 
fixed_queue_try_remove_from_queue(fixed_queue_t * queue,void * data)194 void* fixed_queue_try_remove_from_queue(fixed_queue_t* queue, void* data) {
195   if (queue == NULL) return NULL;
196 
197   bool removed = false;
198   {
199     std::lock_guard<std::mutex> lock(*queue->mutex);
200     if (list_contains(queue->list, data) &&
201         semaphore_try_wait(queue->dequeue_sem)) {
202       removed = list_remove(queue->list, data);
203       CHECK(removed);
204     }
205   }
206 
207   if (removed) {
208     semaphore_post(queue->enqueue_sem);
209     return data;
210   }
211   return NULL;
212 }
213 
fixed_queue_get_list(fixed_queue_t * queue)214 list_t* fixed_queue_get_list(fixed_queue_t* queue) {
215   CHECK(queue != NULL);
216 
217   // NOTE: Using the list in this way is not thread-safe.
218   // Using this list in any context where threads can call other functions
219   // to the queue can break our assumptions and the queue in general.
220   return queue->list;
221 }
222 
fixed_queue_get_dequeue_fd(const fixed_queue_t * queue)223 int fixed_queue_get_dequeue_fd(const fixed_queue_t* queue) {
224   CHECK(queue != NULL);
225   return semaphore_get_fd(queue->dequeue_sem);
226 }
227 
fixed_queue_get_enqueue_fd(const fixed_queue_t * queue)228 int fixed_queue_get_enqueue_fd(const fixed_queue_t* queue) {
229   CHECK(queue != NULL);
230   return semaphore_get_fd(queue->enqueue_sem);
231 }
232 
fixed_queue_register_dequeue(fixed_queue_t * queue,reactor_t * reactor,fixed_queue_cb ready_cb,void * context)233 void fixed_queue_register_dequeue(fixed_queue_t* queue, reactor_t* reactor,
234                                   fixed_queue_cb ready_cb, void* context) {
235   CHECK(queue != NULL);
236   CHECK(reactor != NULL);
237   CHECK(ready_cb != NULL);
238 
239   // Make sure we're not already registered
240   fixed_queue_unregister_dequeue(queue);
241 
242   queue->dequeue_ready = ready_cb;
243   queue->dequeue_context = context;
244   queue->dequeue_object =
245       reactor_register(reactor, fixed_queue_get_dequeue_fd(queue), queue,
246                        internal_dequeue_ready, NULL);
247 }
248 
fixed_queue_unregister_dequeue(fixed_queue_t * queue)249 void fixed_queue_unregister_dequeue(fixed_queue_t* queue) {
250   CHECK(queue != NULL);
251 
252   if (queue->dequeue_object) {
253     reactor_unregister(queue->dequeue_object);
254     queue->dequeue_object = NULL;
255   }
256 }
257 
internal_dequeue_ready(void * context)258 static void internal_dequeue_ready(void* context) {
259   CHECK(context != NULL);
260 
261   fixed_queue_t* queue = static_cast<fixed_queue_t*>(context);
262   queue->dequeue_ready(queue, queue->dequeue_context);
263 }
264