1 /******************************************************************************
2 *
3 * Copyright 2014 Google, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at:
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 ******************************************************************************/
18
19 #include "osi/include/fixed_queue.h"
20
21 #include <bluetooth/log.h>
22 #include <string.h>
23
24 #include <mutex>
25
26 #include "osi/include/allocator.h"
27 #include "osi/include/list.h"
28 #include "osi/include/osi.h"
29 #include "osi/include/reactor.h"
30 #include "osi/semaphore.h"
31
32 using namespace bluetooth;
33
34 typedef struct fixed_queue_t {
35 list_t* list;
36 semaphore_t* enqueue_sem;
37 semaphore_t* dequeue_sem;
38 std::mutex* mutex;
39 size_t capacity;
40
41 reactor_object_t* dequeue_object;
42 fixed_queue_cb dequeue_ready;
43 void* dequeue_context;
44 } fixed_queue_t;
45
46 static void internal_dequeue_ready(void* context);
47
fixed_queue_new(size_t capacity)48 fixed_queue_t* fixed_queue_new(size_t capacity) {
49 fixed_queue_t* ret =
50 static_cast<fixed_queue_t*>(osi_calloc(sizeof(fixed_queue_t)));
51
52 ret->mutex = new std::mutex;
53 ret->capacity = capacity;
54
55 ret->list = list_new(NULL);
56 if (!ret->list) goto error;
57
58 ret->enqueue_sem = semaphore_new(capacity);
59 if (!ret->enqueue_sem) goto error;
60
61 ret->dequeue_sem = semaphore_new(0);
62 if (!ret->dequeue_sem) goto error;
63
64 return ret;
65
66 error:
67 fixed_queue_free(ret, NULL);
68 return NULL;
69 }
70
fixed_queue_free(fixed_queue_t * queue,fixed_queue_free_cb free_cb)71 void fixed_queue_free(fixed_queue_t* queue, fixed_queue_free_cb free_cb) {
72 if (!queue) return;
73
74 fixed_queue_unregister_dequeue(queue);
75
76 if (free_cb)
77 for (const list_node_t* node = list_begin(queue->list);
78 node != list_end(queue->list); node = list_next(node))
79 free_cb(list_node(node));
80
81 list_free(queue->list);
82 semaphore_free(queue->enqueue_sem);
83 semaphore_free(queue->dequeue_sem);
84 delete queue->mutex;
85 osi_free(queue);
86 }
87
fixed_queue_flush(fixed_queue_t * queue,fixed_queue_free_cb free_cb)88 void fixed_queue_flush(fixed_queue_t* queue, fixed_queue_free_cb free_cb) {
89 if (!queue) return;
90
91 while (!fixed_queue_is_empty(queue)) {
92 void* data = fixed_queue_try_dequeue(queue);
93 if (free_cb != NULL) {
94 free_cb(data);
95 }
96 }
97 }
98
fixed_queue_is_empty(fixed_queue_t * queue)99 bool fixed_queue_is_empty(fixed_queue_t* queue) {
100 if (queue == NULL) return true;
101
102 std::lock_guard<std::mutex> lock(*queue->mutex);
103 return list_is_empty(queue->list);
104 }
105
fixed_queue_length(fixed_queue_t * queue)106 size_t fixed_queue_length(fixed_queue_t* queue) {
107 if (queue == NULL) return 0;
108
109 std::lock_guard<std::mutex> lock(*queue->mutex);
110 return list_length(queue->list);
111 }
112
fixed_queue_capacity(fixed_queue_t * queue)113 size_t fixed_queue_capacity(fixed_queue_t* queue) {
114 log::assert_that(queue != NULL, "assert failed: queue != NULL");
115
116 return queue->capacity;
117 }
118
fixed_queue_enqueue(fixed_queue_t * queue,void * data)119 void fixed_queue_enqueue(fixed_queue_t* queue, void* data) {
120 log::assert_that(queue != NULL, "assert failed: queue != NULL");
121 log::assert_that(data != NULL, "assert failed: data != NULL");
122
123 semaphore_wait(queue->enqueue_sem);
124
125 {
126 std::lock_guard<std::mutex> lock(*queue->mutex);
127 list_append(queue->list, data);
128 }
129
130 semaphore_post(queue->dequeue_sem);
131 }
132
fixed_queue_dequeue(fixed_queue_t * queue)133 void* fixed_queue_dequeue(fixed_queue_t* queue) {
134 log::assert_that(queue != NULL, "assert failed: queue != NULL");
135
136 semaphore_wait(queue->dequeue_sem);
137
138 void* ret = NULL;
139 {
140 std::lock_guard<std::mutex> lock(*queue->mutex);
141 ret = list_front(queue->list);
142 list_remove(queue->list, ret);
143 }
144
145 semaphore_post(queue->enqueue_sem);
146
147 return ret;
148 }
149
fixed_queue_try_enqueue(fixed_queue_t * queue,void * data)150 bool fixed_queue_try_enqueue(fixed_queue_t* queue, void* data) {
151 log::assert_that(queue != NULL, "assert failed: queue != NULL");
152 log::assert_that(data != NULL, "assert failed: data != NULL");
153
154 if (!semaphore_try_wait(queue->enqueue_sem)) return false;
155
156 {
157 std::lock_guard<std::mutex> lock(*queue->mutex);
158 list_append(queue->list, data);
159 }
160
161 semaphore_post(queue->dequeue_sem);
162 return true;
163 }
164
fixed_queue_try_dequeue(fixed_queue_t * queue)165 void* fixed_queue_try_dequeue(fixed_queue_t* queue) {
166 if (queue == NULL) return NULL;
167
168 if (!semaphore_try_wait(queue->dequeue_sem)) return NULL;
169
170 void* ret = NULL;
171 {
172 std::lock_guard<std::mutex> lock(*queue->mutex);
173 ret = list_front(queue->list);
174 list_remove(queue->list, ret);
175 }
176
177 semaphore_post(queue->enqueue_sem);
178
179 return ret;
180 }
181
fixed_queue_try_peek_first(fixed_queue_t * queue)182 void* fixed_queue_try_peek_first(fixed_queue_t* queue) {
183 if (queue == NULL) return NULL;
184
185 std::lock_guard<std::mutex> lock(*queue->mutex);
186 return list_is_empty(queue->list) ? NULL : list_front(queue->list);
187 }
188
fixed_queue_try_peek_last(fixed_queue_t * queue)189 void* fixed_queue_try_peek_last(fixed_queue_t* queue) {
190 if (queue == NULL) return NULL;
191
192 std::lock_guard<std::mutex> lock(*queue->mutex);
193 return list_is_empty(queue->list) ? NULL : list_back(queue->list);
194 }
195
fixed_queue_try_remove_from_queue(fixed_queue_t * queue,void * data)196 void* fixed_queue_try_remove_from_queue(fixed_queue_t* queue, void* data) {
197 if (queue == NULL) return NULL;
198
199 bool removed = false;
200 {
201 std::lock_guard<std::mutex> lock(*queue->mutex);
202 if (list_contains(queue->list, data) &&
203 semaphore_try_wait(queue->dequeue_sem)) {
204 removed = list_remove(queue->list, data);
205 log::assert_that(removed, "assert failed: removed");
206 }
207 }
208
209 if (removed) {
210 semaphore_post(queue->enqueue_sem);
211 return data;
212 }
213 return NULL;
214 }
215
fixed_queue_get_list(fixed_queue_t * queue)216 list_t* fixed_queue_get_list(fixed_queue_t* queue) {
217 log::assert_that(queue != NULL, "assert failed: queue != NULL");
218
219 // NOTE: Using the list in this way is not thread-safe.
220 // Using this list in any context where threads can call other functions
221 // to the queue can break our assumptions and the queue in general.
222 return queue->list;
223 }
224
fixed_queue_get_dequeue_fd(const fixed_queue_t * queue)225 int fixed_queue_get_dequeue_fd(const fixed_queue_t* queue) {
226 log::assert_that(queue != NULL, "assert failed: queue != NULL");
227 return semaphore_get_fd(queue->dequeue_sem);
228 }
229
fixed_queue_get_enqueue_fd(const fixed_queue_t * queue)230 int fixed_queue_get_enqueue_fd(const fixed_queue_t* queue) {
231 log::assert_that(queue != NULL, "assert failed: queue != NULL");
232 return semaphore_get_fd(queue->enqueue_sem);
233 }
234
fixed_queue_register_dequeue(fixed_queue_t * queue,reactor_t * reactor,fixed_queue_cb ready_cb,void * context)235 void fixed_queue_register_dequeue(fixed_queue_t* queue, reactor_t* reactor,
236 fixed_queue_cb ready_cb, void* context) {
237 log::assert_that(queue != NULL, "assert failed: queue != NULL");
238 log::assert_that(reactor != NULL, "assert failed: reactor != NULL");
239 log::assert_that(ready_cb != NULL, "assert failed: ready_cb != NULL");
240
241 // Make sure we're not already registered
242 fixed_queue_unregister_dequeue(queue);
243
244 queue->dequeue_ready = ready_cb;
245 queue->dequeue_context = context;
246 queue->dequeue_object =
247 reactor_register(reactor, fixed_queue_get_dequeue_fd(queue), queue,
248 internal_dequeue_ready, NULL);
249 }
250
fixed_queue_unregister_dequeue(fixed_queue_t * queue)251 void fixed_queue_unregister_dequeue(fixed_queue_t* queue) {
252 log::assert_that(queue != NULL, "assert failed: queue != NULL");
253
254 if (queue->dequeue_object) {
255 reactor_unregister(queue->dequeue_object);
256 queue->dequeue_object = NULL;
257 }
258 }
259
internal_dequeue_ready(void * context)260 static void internal_dequeue_ready(void* context) {
261 log::assert_that(context != NULL, "assert failed: context != NULL");
262
263 fixed_queue_t* queue = static_cast<fixed_queue_t*>(context);
264 queue->dequeue_ready(queue, queue->dequeue_context);
265 }
266