• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /******************************************************************************
2  *
3  *  Copyright 2014 Google, Inc.
4  *
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at:
8  *
9  *  http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  *
17  ******************************************************************************/
18 
19 #include "osi/include/fixed_queue.h"
20 
21 #include <bluetooth/log.h>
22 #include <string.h>
23 
24 #include <mutex>
25 
26 #include "osi/include/allocator.h"
27 #include "osi/include/list.h"
28 #include "osi/include/osi.h"
29 #include "osi/include/reactor.h"
30 #include "osi/semaphore.h"
31 
32 using namespace bluetooth;
33 
34 typedef struct fixed_queue_t {
35   list_t* list;
36   semaphore_t* enqueue_sem;
37   semaphore_t* dequeue_sem;
38   std::mutex* mutex;
39   size_t capacity;
40 
41   reactor_object_t* dequeue_object;
42   fixed_queue_cb dequeue_ready;
43   void* dequeue_context;
44 } fixed_queue_t;
45 
46 static void internal_dequeue_ready(void* context);
47 
fixed_queue_new(size_t capacity)48 fixed_queue_t* fixed_queue_new(size_t capacity) {
49   fixed_queue_t* ret =
50       static_cast<fixed_queue_t*>(osi_calloc(sizeof(fixed_queue_t)));
51 
52   ret->mutex = new std::mutex;
53   ret->capacity = capacity;
54 
55   ret->list = list_new(NULL);
56   if (!ret->list) goto error;
57 
58   ret->enqueue_sem = semaphore_new(capacity);
59   if (!ret->enqueue_sem) goto error;
60 
61   ret->dequeue_sem = semaphore_new(0);
62   if (!ret->dequeue_sem) goto error;
63 
64   return ret;
65 
66 error:
67   fixed_queue_free(ret, NULL);
68   return NULL;
69 }
70 
fixed_queue_free(fixed_queue_t * queue,fixed_queue_free_cb free_cb)71 void fixed_queue_free(fixed_queue_t* queue, fixed_queue_free_cb free_cb) {
72   if (!queue) return;
73 
74   fixed_queue_unregister_dequeue(queue);
75 
76   if (free_cb)
77     for (const list_node_t* node = list_begin(queue->list);
78          node != list_end(queue->list); node = list_next(node))
79       free_cb(list_node(node));
80 
81   list_free(queue->list);
82   semaphore_free(queue->enqueue_sem);
83   semaphore_free(queue->dequeue_sem);
84   delete queue->mutex;
85   osi_free(queue);
86 }
87 
fixed_queue_flush(fixed_queue_t * queue,fixed_queue_free_cb free_cb)88 void fixed_queue_flush(fixed_queue_t* queue, fixed_queue_free_cb free_cb) {
89   if (!queue) return;
90 
91   while (!fixed_queue_is_empty(queue)) {
92     void* data = fixed_queue_try_dequeue(queue);
93     if (free_cb != NULL) {
94       free_cb(data);
95     }
96   }
97 }
98 
fixed_queue_is_empty(fixed_queue_t * queue)99 bool fixed_queue_is_empty(fixed_queue_t* queue) {
100   if (queue == NULL) return true;
101 
102   std::lock_guard<std::mutex> lock(*queue->mutex);
103   return list_is_empty(queue->list);
104 }
105 
fixed_queue_length(fixed_queue_t * queue)106 size_t fixed_queue_length(fixed_queue_t* queue) {
107   if (queue == NULL) return 0;
108 
109   std::lock_guard<std::mutex> lock(*queue->mutex);
110   return list_length(queue->list);
111 }
112 
fixed_queue_capacity(fixed_queue_t * queue)113 size_t fixed_queue_capacity(fixed_queue_t* queue) {
114   log::assert_that(queue != NULL, "assert failed: queue != NULL");
115 
116   return queue->capacity;
117 }
118 
fixed_queue_enqueue(fixed_queue_t * queue,void * data)119 void fixed_queue_enqueue(fixed_queue_t* queue, void* data) {
120   log::assert_that(queue != NULL, "assert failed: queue != NULL");
121   log::assert_that(data != NULL, "assert failed: data != NULL");
122 
123   semaphore_wait(queue->enqueue_sem);
124 
125   {
126     std::lock_guard<std::mutex> lock(*queue->mutex);
127     list_append(queue->list, data);
128   }
129 
130   semaphore_post(queue->dequeue_sem);
131 }
132 
fixed_queue_dequeue(fixed_queue_t * queue)133 void* fixed_queue_dequeue(fixed_queue_t* queue) {
134   log::assert_that(queue != NULL, "assert failed: queue != NULL");
135 
136   semaphore_wait(queue->dequeue_sem);
137 
138   void* ret = NULL;
139   {
140     std::lock_guard<std::mutex> lock(*queue->mutex);
141     ret = list_front(queue->list);
142     list_remove(queue->list, ret);
143   }
144 
145   semaphore_post(queue->enqueue_sem);
146 
147   return ret;
148 }
149 
fixed_queue_try_enqueue(fixed_queue_t * queue,void * data)150 bool fixed_queue_try_enqueue(fixed_queue_t* queue, void* data) {
151   log::assert_that(queue != NULL, "assert failed: queue != NULL");
152   log::assert_that(data != NULL, "assert failed: data != NULL");
153 
154   if (!semaphore_try_wait(queue->enqueue_sem)) return false;
155 
156   {
157     std::lock_guard<std::mutex> lock(*queue->mutex);
158     list_append(queue->list, data);
159   }
160 
161   semaphore_post(queue->dequeue_sem);
162   return true;
163 }
164 
fixed_queue_try_dequeue(fixed_queue_t * queue)165 void* fixed_queue_try_dequeue(fixed_queue_t* queue) {
166   if (queue == NULL) return NULL;
167 
168   if (!semaphore_try_wait(queue->dequeue_sem)) return NULL;
169 
170   void* ret = NULL;
171   {
172     std::lock_guard<std::mutex> lock(*queue->mutex);
173     ret = list_front(queue->list);
174     list_remove(queue->list, ret);
175   }
176 
177   semaphore_post(queue->enqueue_sem);
178 
179   return ret;
180 }
181 
fixed_queue_try_peek_first(fixed_queue_t * queue)182 void* fixed_queue_try_peek_first(fixed_queue_t* queue) {
183   if (queue == NULL) return NULL;
184 
185   std::lock_guard<std::mutex> lock(*queue->mutex);
186   return list_is_empty(queue->list) ? NULL : list_front(queue->list);
187 }
188 
fixed_queue_try_peek_last(fixed_queue_t * queue)189 void* fixed_queue_try_peek_last(fixed_queue_t* queue) {
190   if (queue == NULL) return NULL;
191 
192   std::lock_guard<std::mutex> lock(*queue->mutex);
193   return list_is_empty(queue->list) ? NULL : list_back(queue->list);
194 }
195 
fixed_queue_try_remove_from_queue(fixed_queue_t * queue,void * data)196 void* fixed_queue_try_remove_from_queue(fixed_queue_t* queue, void* data) {
197   if (queue == NULL) return NULL;
198 
199   bool removed = false;
200   {
201     std::lock_guard<std::mutex> lock(*queue->mutex);
202     if (list_contains(queue->list, data) &&
203         semaphore_try_wait(queue->dequeue_sem)) {
204       removed = list_remove(queue->list, data);
205       log::assert_that(removed, "assert failed: removed");
206     }
207   }
208 
209   if (removed) {
210     semaphore_post(queue->enqueue_sem);
211     return data;
212   }
213   return NULL;
214 }
215 
fixed_queue_get_list(fixed_queue_t * queue)216 list_t* fixed_queue_get_list(fixed_queue_t* queue) {
217   log::assert_that(queue != NULL, "assert failed: queue != NULL");
218 
219   // NOTE: Using the list in this way is not thread-safe.
220   // Using this list in any context where threads can call other functions
221   // to the queue can break our assumptions and the queue in general.
222   return queue->list;
223 }
224 
fixed_queue_get_dequeue_fd(const fixed_queue_t * queue)225 int fixed_queue_get_dequeue_fd(const fixed_queue_t* queue) {
226   log::assert_that(queue != NULL, "assert failed: queue != NULL");
227   return semaphore_get_fd(queue->dequeue_sem);
228 }
229 
fixed_queue_get_enqueue_fd(const fixed_queue_t * queue)230 int fixed_queue_get_enqueue_fd(const fixed_queue_t* queue) {
231   log::assert_that(queue != NULL, "assert failed: queue != NULL");
232   return semaphore_get_fd(queue->enqueue_sem);
233 }
234 
fixed_queue_register_dequeue(fixed_queue_t * queue,reactor_t * reactor,fixed_queue_cb ready_cb,void * context)235 void fixed_queue_register_dequeue(fixed_queue_t* queue, reactor_t* reactor,
236                                   fixed_queue_cb ready_cb, void* context) {
237   log::assert_that(queue != NULL, "assert failed: queue != NULL");
238   log::assert_that(reactor != NULL, "assert failed: reactor != NULL");
239   log::assert_that(ready_cb != NULL, "assert failed: ready_cb != NULL");
240 
241   // Make sure we're not already registered
242   fixed_queue_unregister_dequeue(queue);
243 
244   queue->dequeue_ready = ready_cb;
245   queue->dequeue_context = context;
246   queue->dequeue_object =
247       reactor_register(reactor, fixed_queue_get_dequeue_fd(queue), queue,
248                        internal_dequeue_ready, NULL);
249 }
250 
fixed_queue_unregister_dequeue(fixed_queue_t * queue)251 void fixed_queue_unregister_dequeue(fixed_queue_t* queue) {
252   log::assert_that(queue != NULL, "assert failed: queue != NULL");
253 
254   if (queue->dequeue_object) {
255     reactor_unregister(queue->dequeue_object);
256     queue->dequeue_object = NULL;
257   }
258 }
259 
internal_dequeue_ready(void * context)260 static void internal_dequeue_ready(void* context) {
261   log::assert_that(context != NULL, "assert failed: context != NULL");
262 
263   fixed_queue_t* queue = static_cast<fixed_queue_t*>(context);
264   queue->dequeue_ready(queue, queue->dequeue_context);
265 }
266