/* GStreamer
 * Copyright (C) 2009 Edward Hervey <bilboed@bilboed.com>
 *               2011 Wim Taymans <wim.taymans@gmail.com>
 *
 * gstatomicqueue.c:
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
 * Boston, MA 02110-1301, USA.
 */
22
23 #include "gst_private.h"
24
25 #include <string.h>
26
27 #include <gst/gst.h>
28 #include "gstatomicqueue.h"
29 #include "glib-compat-private.h"
30
/**
 * SECTION:gstatomicqueue
 * @title: GstAtomicQueue
 * @short_description: An atomic queue implementation
 *
 * The #GstAtomicQueue object implements a queue that can be used from multiple
 * threads without performing any blocking operations.
 */
39
/* Register GstAtomicQueue as a boxed GType so it can be stored in a
 * GValue / used as a GObject property type; copy and free map onto the
 * queue's own ref/unref. */
G_DEFINE_BOXED_TYPE (GstAtomicQueue, gst_atomic_queue,
    (GBoxedCopyFunc) gst_atomic_queue_ref,
    (GBoxedFreeFunc) gst_atomic_queue_unref);
43
/* By default the queue uses 2 * sizeof(gpointer) * clp2 (max_items) of
 * memory. clp2(x) is the next power of two >= x.
 *
 * The queue can operate in low memory mode, in which it consumes almost
 * half the memory at the expense of extra overhead in the readers. This
 * is disabled by default because even without LOW_MEM mode, the memory
 * consumption is still lower than a plain GList.
 */
52 #undef LOW_MEM
53
typedef struct _GstAQueueMem GstAQueueMem;

/* One contiguous ring-buffer segment of the queue. When a segment fills
 * up a larger one is chained via @next; retired segments are parked on
 * the queue's free list via @free until it is safe to release them. */
struct _GstAQueueMem
{
  gint size;                    /* slot count minus one; a power-of-two mask
                                 * applied to the unmasked positions below */
  gpointer *array;              /* the slot array, size + 1 entries */
  gint head;                    /* unmasked position of the next item to pop */
  gint tail_write;              /* unmasked position claimed by the next writer */
  gint tail_read;               /* unmasked position up to which written items
                                 * are visible to readers */
  GstAQueueMem *next;           /* next (larger) segment, set once by the
                                 * writer that grew the queue */
  GstAQueueMem *free;           /* link in the queue-wide free list */
};
66
67 static guint
clp2(guint n)68 clp2 (guint n)
69 {
70 guint res = 1;
71
72 while (res < n)
73 res <<= 1;
74
75 return res;
76 }
77
78 static GstAQueueMem *
new_queue_mem(guint size,gint pos)79 new_queue_mem (guint size, gint pos)
80 {
81 GstAQueueMem *mem;
82
83 mem = g_new (GstAQueueMem, 1);
84
85 /* we keep the size as a mask for performance */
86 mem->size = clp2 (MAX (size, 16)) - 1;
87 mem->array = g_new0 (gpointer, mem->size + 1);
88 mem->head = pos;
89 mem->tail_write = pos;
90 mem->tail_read = pos;
91 mem->next = NULL;
92 mem->free = NULL;
93
94 return mem;
95 }
96
/* Release a segment: first its slot array, then the segment itself. */
static void
free_queue_mem (GstAQueueMem * mem)
{
  g_free (mem->array);
  g_free (mem);
}
103
/* The queue: a refcounted pair of segment pointers. Readers pop from
 * @head_mem, writers push to @tail_mem; the two may point at different
 * segments after the queue has grown. */
struct _GstAtomicQueue
{
  gint refcount;                /* atomic reference count */
#ifdef LOW_MEM
  gint num_readers;             /* readers currently inside pop/length; the
                                 * free list is only reclaimed at 0 */
#endif
  GstAQueueMem *head_mem;       /* segment items are popped from */
  GstAQueueMem *tail_mem;       /* segment items are pushed to */
  GstAQueueMem *free_list;      /* retired segments awaiting reclamation */
};
114
/* Lock-free push of @mem onto the queue's free list: link @mem to the
 * current list head and CAS it in, retrying if another thread changed
 * the head in between. */
static void
add_to_free_list (GstAtomicQueue * queue, GstAQueueMem * mem)
{
  do {
    mem->free = g_atomic_pointer_get (&queue->free_list);
  } while (!g_atomic_pointer_compare_and_exchange (&queue->free_list,
          mem->free, mem));
}
123
/* Atomically detach the entire free list and free every segment on it.
 * Callers must guarantee that no reader can still be touching these
 * segments (see the num_readers accounting under LOW_MEM, and the
 * final unref path). */
static void
clear_free_list (GstAtomicQueue * queue)
{
  GstAQueueMem *free_list;

  /* take the free list and replace with NULL */
  do {
    free_list = g_atomic_pointer_get (&queue->free_list);
    if (free_list == NULL)
      return;
  } while (!g_atomic_pointer_compare_and_exchange (&queue->free_list, free_list,
          NULL));

  /* the detached list is now private to us; walk and free it */
  while (free_list) {
    GstAQueueMem *next = free_list->free;

    free_queue_mem (free_list);

    free_list = next;
  }
}
145
146 /**
147 * gst_atomic_queue_new:
148 * @initial_size: initial queue size
149 *
150 * Create a new atomic queue instance. @initial_size will be rounded up to the
151 * nearest power of 2 and used as the initial size of the queue.
152 *
153 * Returns: a new #GstAtomicQueue
154 */
155 GstAtomicQueue *
gst_atomic_queue_new(guint initial_size)156 gst_atomic_queue_new (guint initial_size)
157 {
158 GstAtomicQueue *queue;
159
160 queue = g_new (GstAtomicQueue, 1);
161
162 queue->refcount = 1;
163 #ifdef LOW_MEM
164 queue->num_readers = 0;
165 #endif
166 queue->head_mem = queue->tail_mem = new_queue_mem (initial_size, 0);
167 queue->free_list = NULL;
168
169 return queue;
170 }
171
/**
 * gst_atomic_queue_ref:
 * @queue: a #GstAtomicQueue
 *
 * Increase the refcount of @queue.
 */
void
gst_atomic_queue_ref (GstAtomicQueue * queue)
{
  g_return_if_fail (queue != NULL);

  g_atomic_int_inc (&queue->refcount);
}
185
186 static void
gst_atomic_queue_free(GstAtomicQueue * queue)187 gst_atomic_queue_free (GstAtomicQueue * queue)
188 {
189 free_queue_mem (queue->head_mem);
190 if (queue->head_mem != queue->tail_mem)
191 free_queue_mem (queue->tail_mem);
192 clear_free_list (queue);
193 g_free (queue);
194 }
195
/**
 * gst_atomic_queue_unref:
 * @queue: a #GstAtomicQueue
 *
 * Unref @queue and free the memory when the refcount reaches 0.
 */
void
gst_atomic_queue_unref (GstAtomicQueue * queue)
{
  g_return_if_fail (queue != NULL);

  /* last reference dropped: reclaim all queue memory */
  if (g_atomic_int_dec_and_test (&queue->refcount))
    gst_atomic_queue_free (queue);
}
210
/**
 * gst_atomic_queue_peek:
 * @queue: a #GstAtomicQueue
 *
 * Peek the head element of the queue without removing it from the queue.
 *
 * Returns: (transfer none) (nullable): the head element of @queue or
 * %NULL when the queue is empty.
 */
gpointer
gst_atomic_queue_peek (GstAtomicQueue * queue)
{
  GstAQueueMem *head_mem;
  gint head, tail, size;

  g_return_val_if_fail (queue != NULL, NULL);

  /* NOTE(review): unlike pop(), peek() does not register itself in
   * num_readers under LOW_MEM — presumably relying on callers, but
   * worth confirming against the LOW_MEM reclamation logic. */
  while (TRUE) {
    GstAQueueMem *next;

    head_mem = g_atomic_pointer_get (&queue->head_mem);

    head = g_atomic_int_get (&head_mem->head);
    tail = g_atomic_int_get (&head_mem->tail_read);
    size = head_mem->size;

    /* when we are not empty, we can continue */
    if (G_LIKELY (head != tail))
      break;

    /* else array empty, try to take next */
    next = g_atomic_pointer_get (&head_mem->next);
    if (next == NULL)
      return NULL;

    /* now we try to move the next array as the head memory. If we fail to do that,
     * some other reader managed to do it first and we retry */
    if (!g_atomic_pointer_compare_and_exchange (&queue->head_mem, head_mem,
            next))
      continue;

    /* when we managed to swing the head pointer the old head is now
     * useless and we add it to the freelist. We can't free the memory yet
     * because we first need to make sure no reader is accessing it anymore. */
    add_to_free_list (queue, head_mem);
  }

  /* size is a power-of-two-minus-one mask wrapping the unmasked counter */
  return head_mem->array[head & size];
}
260
261 /**
262 * gst_atomic_queue_pop:
263 * @queue: a #GstAtomicQueue
264 *
265 * Get the head element of the queue.
266 *
267 * Returns: (transfer full): the head element of @queue or %NULL when
268 * the queue is empty.
269 */
270 gpointer
gst_atomic_queue_pop(GstAtomicQueue * queue)271 gst_atomic_queue_pop (GstAtomicQueue * queue)
272 {
273 gpointer ret;
274 GstAQueueMem *head_mem;
275 gint head, tail, size;
276
277 g_return_val_if_fail (queue != NULL, NULL);
278
279 #ifdef LOW_MEM
280 g_atomic_int_inc (&queue->num_readers);
281 #endif
282
283 do {
284 while (TRUE) {
285 GstAQueueMem *next;
286
287 head_mem = g_atomic_pointer_get (&queue->head_mem);
288
289 head = g_atomic_int_get (&head_mem->head);
290 tail = g_atomic_int_get (&head_mem->tail_read);
291 size = head_mem->size;
292
293 /* when we are not empty, we can continue */
294 if G_LIKELY
295 (head != tail)
296 break;
297
298 /* else array empty, try to take next */
299 next = g_atomic_pointer_get (&head_mem->next);
300 if (next == NULL)
301 return NULL;
302
303 /* now we try to move the next array as the head memory. If we fail to do that,
304 * some other reader managed to do it first and we retry */
305 if G_UNLIKELY
306 (!g_atomic_pointer_compare_and_exchange (&queue->head_mem, head_mem,
307 next))
308 continue;
309
310 /* when we managed to swing the head pointer the old head is now
311 * useless and we add it to the freelist. We can't free the memory yet
312 * because we first need to make sure no reader is accessing it anymore. */
313 add_to_free_list (queue, head_mem);
314 }
315
316 ret = head_mem->array[head & size];
317 } while G_UNLIKELY
318 (!g_atomic_int_compare_and_exchange (&head_mem->head, head, head + 1));
319
320 #ifdef LOW_MEM
321 /* decrement number of readers, when we reach 0 readers we can be sure that
322 * none is accessing the memory in the free list and we can try to clean up */
323 if (g_atomic_int_dec_and_test (&queue->num_readers))
324 clear_free_list (queue);
325 #endif
326
327 return ret;
328 }
329
/**
 * gst_atomic_queue_push:
 * @queue: a #GstAtomicQueue
 * @data: the data
 *
 * Append @data to the tail of the queue.
 */
void
gst_atomic_queue_push (GstAtomicQueue * queue, gpointer data)
{
  GstAQueueMem *tail_mem;
  gint head, tail, size;

  g_return_if_fail (queue != NULL);

  do {
    while (TRUE) {
      GstAQueueMem *mem;

      tail_mem = g_atomic_pointer_get (&queue->tail_mem);
      head = g_atomic_int_get (&tail_mem->head);
      tail = g_atomic_int_get (&tail_mem->tail_write);
      size = tail_mem->size;

      /* we're not full, continue */
      if G_LIKELY
        (tail - head <= size)
        break;

      /* else we need to grow the array, we store a mask so we have to add 1 */
      mem = new_queue_mem ((size << 1) + 1, tail);

      /* try to make our new array visible to other writers */
      if G_UNLIKELY
        (!g_atomic_pointer_compare_and_exchange (&queue->tail_mem, tail_mem,
              mem)) {
        /* we tried to swap the new writer array but something changed. This is
         * because some other writer beat us to it, we free our memory and try
         * again */
        free_queue_mem (mem);
        continue;
      }
      /* make sure that readers can find our new array as well. The one who
       * manages to swap the pointer is the only one who can set the next
       * pointer to the new array */
      g_atomic_pointer_set (&tail_mem->next, mem);
    }
    /* claim slot @tail by advancing tail_write; if another writer claimed
     * it first, restart from the top */
  } while G_UNLIKELY
      (!g_atomic_int_compare_and_exchange (&tail_mem->tail_write, tail, tail + 1));

  tail_mem->array[tail & size] = data;

  /* now wait until all writers have completed their write before we move the
   * tail_read to this new item. It is possible that other writers are still
   * updating the previous array slots and we don't want to reveal their changes
   * before they are done. FIXME, it would be nice if we didn't have to busy
   * wait here. */
  while G_UNLIKELY
      (!g_atomic_int_compare_and_exchange (&tail_mem->tail_read, tail, tail + 1));
}
390
391 /**
392 * gst_atomic_queue_length:
393 * @queue: a #GstAtomicQueue
394 *
395 * Get the amount of items in the queue.
396 *
397 * Returns: the number of elements in the queue.
398 */
399 guint
gst_atomic_queue_length(GstAtomicQueue * queue)400 gst_atomic_queue_length (GstAtomicQueue * queue)
401 {
402 GstAQueueMem *head_mem, *tail_mem;
403 gint head, tail;
404
405 g_return_val_if_fail (queue != NULL, 0);
406
407 #ifdef LOW_MEM
408 g_atomic_int_inc (&queue->num_readers);
409 #endif
410
411 head_mem = g_atomic_pointer_get (&queue->head_mem);
412 head = g_atomic_int_get (&head_mem->head);
413
414 tail_mem = g_atomic_pointer_get (&queue->tail_mem);
415 tail = g_atomic_int_get (&tail_mem->tail_read);
416
417 #ifdef LOW_MEM
418 if (g_atomic_int_dec_and_test (&queue->num_readers))
419 clear_free_list (queue);
420 #endif
421
422 return tail - head;
423 }
424