/* GStreamer
 * Copyright (C) 2009 Edward Hervey <bilboed@bilboed.com>
 *               2011 Wim Taymans <wim.taymans@gmail.com>
 *
 * gstatomicqueue.c:
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
 * Boston, MA 02110-1301, USA.
 */
22 
23 #include "gst_private.h"
24 
25 #include <string.h>
26 
27 #include <gst/gst.h>
28 #include "gstatomicqueue.h"
29 #include "glib-compat-private.h"
30 
/**
 * SECTION:gstatomicqueue
 * @title: GstAtomicQueue
 * @short_description: An atomic queue implementation
 *
 * The #GstAtomicQueue object implements a queue that can be used from multiple
 * threads without performing any blocking operations.
 */
39 
40 G_DEFINE_BOXED_TYPE (GstAtomicQueue, gst_atomic_queue,
41     (GBoxedCopyFunc) gst_atomic_queue_ref,
42     (GBoxedFreeFunc) gst_atomic_queue_unref);
43 
44 /* By default the queue uses 2 * sizeof(gpointer) * clp2 (max_items) of
45  * memory. clp2(x) is the next power of two >= than x.
46  *
47  * The queue can operate in low memory mode, in which it consumes almost
48  * half the memory at the expense of extra overhead in the readers. This
49  * is disabled by default because even without LOW_MEM mode, the memory
50  * consumption is still lower than a plain GList.
51  */
52 #undef LOW_MEM
53 
54 typedef struct _GstAQueueMem GstAQueueMem;
55 
56 struct _GstAQueueMem
57 {
58   gint size;
59   gpointer *array;
60   gint head;
61   gint tail_write;
62   gint tail_read;
63   GstAQueueMem *next;
64   GstAQueueMem *free;
65 };
66 
/* Round @n up to a power of two.
 *
 * Returns the smallest power of two that is >= @n (1 for @n == 0). When @n is
 * larger than the highest power of two a guint can hold, that highest power
 * of two is returned instead: shifting any further would wrap the result to 0
 * and the loop would never terminate. */
static guint
clp2 (guint n)
{
  /* only the most significant bit set */
  const guint top_bit = ~((guint) ~ 0 >> 1);
  guint res = 1;

  while (res < n && res != top_bit)
    res <<= 1;

  return res;
}
77 
78 static GstAQueueMem *
new_queue_mem(guint size,gint pos)79 new_queue_mem (guint size, gint pos)
80 {
81   GstAQueueMem *mem;
82 
83   mem = g_new (GstAQueueMem, 1);
84 
85   /* we keep the size as a mask for performance */
86   mem->size = clp2 (MAX (size, 16)) - 1;
87   mem->array = g_new0 (gpointer, mem->size + 1);
88   mem->head = pos;
89   mem->tail_write = pos;
90   mem->tail_read = pos;
91   mem->next = NULL;
92   mem->free = NULL;
93 
94   return mem;
95 }
96 
97 static void
free_queue_mem(GstAQueueMem * mem)98 free_queue_mem (GstAQueueMem * mem)
99 {
100   g_free (mem->array);
101   g_free (mem);
102 }
103 
104 struct _GstAtomicQueue
105 {
106   gint refcount;
107 #ifdef LOW_MEM
108   gint num_readers;
109 #endif
110   GstAQueueMem *head_mem;
111   GstAQueueMem *tail_mem;
112   GstAQueueMem *free_list;
113 };
114 
115 static void
add_to_free_list(GstAtomicQueue * queue,GstAQueueMem * mem)116 add_to_free_list (GstAtomicQueue * queue, GstAQueueMem * mem)
117 {
118   do {
119     mem->free = g_atomic_pointer_get (&queue->free_list);
120   } while (!g_atomic_pointer_compare_and_exchange (&queue->free_list,
121           mem->free, mem));
122 }
123 
124 static void
clear_free_list(GstAtomicQueue * queue)125 clear_free_list (GstAtomicQueue * queue)
126 {
127   GstAQueueMem *free_list;
128 
129   /* take the free list and replace with NULL */
130   do {
131     free_list = g_atomic_pointer_get (&queue->free_list);
132     if (free_list == NULL)
133       return;
134   } while (!g_atomic_pointer_compare_and_exchange (&queue->free_list, free_list,
135           NULL));
136 
137   while (free_list) {
138     GstAQueueMem *next = free_list->free;
139 
140     free_queue_mem (free_list);
141 
142     free_list = next;
143   }
144 }
145 
146 /**
147  * gst_atomic_queue_new:
148  * @initial_size: initial queue size
149  *
150  * Create a new atomic queue instance. @initial_size will be rounded up to the
151  * nearest power of 2 and used as the initial size of the queue.
152  *
153  * Returns: a new #GstAtomicQueue
154  */
155 GstAtomicQueue *
gst_atomic_queue_new(guint initial_size)156 gst_atomic_queue_new (guint initial_size)
157 {
158   GstAtomicQueue *queue;
159 
160   queue = g_new (GstAtomicQueue, 1);
161 
162   queue->refcount = 1;
163 #ifdef LOW_MEM
164   queue->num_readers = 0;
165 #endif
166   queue->head_mem = queue->tail_mem = new_queue_mem (initial_size, 0);
167   queue->free_list = NULL;
168 
169   return queue;
170 }
171 
172 /**
173  * gst_atomic_queue_ref:
174  * @queue: a #GstAtomicQueue
175  *
176  * Increase the refcount of @queue.
177  */
178 void
gst_atomic_queue_ref(GstAtomicQueue * queue)179 gst_atomic_queue_ref (GstAtomicQueue * queue)
180 {
181   g_return_if_fail (queue != NULL);
182 
183   g_atomic_int_inc (&queue->refcount);
184 }
185 
186 static void
gst_atomic_queue_free(GstAtomicQueue * queue)187 gst_atomic_queue_free (GstAtomicQueue * queue)
188 {
189   free_queue_mem (queue->head_mem);
190   if (queue->head_mem != queue->tail_mem)
191     free_queue_mem (queue->tail_mem);
192   clear_free_list (queue);
193   g_free (queue);
194 }
195 
196 /**
197  * gst_atomic_queue_unref:
198  * @queue: a #GstAtomicQueue
199  *
200  * Unref @queue and free the memory when the refcount reaches 0.
201  */
202 void
gst_atomic_queue_unref(GstAtomicQueue * queue)203 gst_atomic_queue_unref (GstAtomicQueue * queue)
204 {
205   g_return_if_fail (queue != NULL);
206 
207   if (g_atomic_int_dec_and_test (&queue->refcount))
208     gst_atomic_queue_free (queue);
209 }
210 
211 /**
212  * gst_atomic_queue_peek:
213  * @queue: a #GstAtomicQueue
214  *
215  * Peek the head element of the queue without removing it from the queue.
216  *
217  * Returns: (transfer none) (nullable): the head element of @queue or
218  * %NULL when the queue is empty.
219  */
220 gpointer
gst_atomic_queue_peek(GstAtomicQueue * queue)221 gst_atomic_queue_peek (GstAtomicQueue * queue)
222 {
223   GstAQueueMem *head_mem;
224   gint head, tail, size;
225 
226   g_return_val_if_fail (queue != NULL, NULL);
227 
228   while (TRUE) {
229     GstAQueueMem *next;
230 
231     head_mem = g_atomic_pointer_get (&queue->head_mem);
232 
233     head = g_atomic_int_get (&head_mem->head);
234     tail = g_atomic_int_get (&head_mem->tail_read);
235     size = head_mem->size;
236 
237     /* when we are not empty, we can continue */
238     if (G_LIKELY (head != tail))
239       break;
240 
241     /* else array empty, try to take next */
242     next = g_atomic_pointer_get (&head_mem->next);
243     if (next == NULL)
244       return NULL;
245 
246     /* now we try to move the next array as the head memory. If we fail to do that,
247      * some other reader managed to do it first and we retry */
248     if (!g_atomic_pointer_compare_and_exchange (&queue->head_mem, head_mem,
249             next))
250       continue;
251 
252     /* when we managed to swing the head pointer the old head is now
253      * useless and we add it to the freelist. We can't free the memory yet
254      * because we first need to make sure no reader is accessing it anymore. */
255     add_to_free_list (queue, head_mem);
256   }
257 
258   return head_mem->array[head & size];
259 }
260 
261 /**
262  * gst_atomic_queue_pop:
263  * @queue: a #GstAtomicQueue
264  *
265  * Get the head element of the queue.
266  *
267  * Returns: (transfer full): the head element of @queue or %NULL when
268  * the queue is empty.
269  */
270 gpointer
gst_atomic_queue_pop(GstAtomicQueue * queue)271 gst_atomic_queue_pop (GstAtomicQueue * queue)
272 {
273   gpointer ret;
274   GstAQueueMem *head_mem;
275   gint head, tail, size;
276 
277   g_return_val_if_fail (queue != NULL, NULL);
278 
279 #ifdef LOW_MEM
280   g_atomic_int_inc (&queue->num_readers);
281 #endif
282 
283   do {
284     while (TRUE) {
285       GstAQueueMem *next;
286 
287       head_mem = g_atomic_pointer_get (&queue->head_mem);
288 
289       head = g_atomic_int_get (&head_mem->head);
290       tail = g_atomic_int_get (&head_mem->tail_read);
291       size = head_mem->size;
292 
293       /* when we are not empty, we can continue */
294       if G_LIKELY
295         (head != tail)
296             break;
297 
298       /* else array empty, try to take next */
299       next = g_atomic_pointer_get (&head_mem->next);
300       if (next == NULL)
301         return NULL;
302 
303       /* now we try to move the next array as the head memory. If we fail to do that,
304        * some other reader managed to do it first and we retry */
305       if G_UNLIKELY
306         (!g_atomic_pointer_compare_and_exchange (&queue->head_mem, head_mem,
307                 next))
308             continue;
309 
310       /* when we managed to swing the head pointer the old head is now
311        * useless and we add it to the freelist. We can't free the memory yet
312        * because we first need to make sure no reader is accessing it anymore. */
313       add_to_free_list (queue, head_mem);
314     }
315 
316     ret = head_mem->array[head & size];
317   } while G_UNLIKELY
318   (!g_atomic_int_compare_and_exchange (&head_mem->head, head, head + 1));
319 
320 #ifdef LOW_MEM
321   /* decrement number of readers, when we reach 0 readers we can be sure that
322    * none is accessing the memory in the free list and we can try to clean up */
323   if (g_atomic_int_dec_and_test (&queue->num_readers))
324     clear_free_list (queue);
325 #endif
326 
327   return ret;
328 }
329 
330 /**
331  * gst_atomic_queue_push:
332  * @queue: a #GstAtomicQueue
333  * @data: the data
334  *
335  * Append @data to the tail of the queue.
336  */
337 void
gst_atomic_queue_push(GstAtomicQueue * queue,gpointer data)338 gst_atomic_queue_push (GstAtomicQueue * queue, gpointer data)
339 {
340   GstAQueueMem *tail_mem;
341   gint head, tail, size;
342 
343   g_return_if_fail (queue != NULL);
344 
345   do {
346     while (TRUE) {
347       GstAQueueMem *mem;
348 
349       tail_mem = g_atomic_pointer_get (&queue->tail_mem);
350       head = g_atomic_int_get (&tail_mem->head);
351       tail = g_atomic_int_get (&tail_mem->tail_write);
352       size = tail_mem->size;
353 
354       /* we're not full, continue */
355       if G_LIKELY
356         (tail - head <= size)
357             break;
358 
359       /* else we need to grow the array, we store a mask so we have to add 1 */
360       mem = new_queue_mem ((size << 1) + 1, tail);
361 
362       /* try to make our new array visible to other writers */
363       if G_UNLIKELY
364         (!g_atomic_pointer_compare_and_exchange (&queue->tail_mem, tail_mem,
365                 mem)) {
366         /* we tried to swap the new writer array but something changed. This is
367          * because some other writer beat us to it, we free our memory and try
368          * again */
369         free_queue_mem (mem);
370         continue;
371         }
372       /* make sure that readers can find our new array as well. The one who
373        * manages to swap the pointer is the only one who can set the next
374        * pointer to the new array */
375       g_atomic_pointer_set (&tail_mem->next, mem);
376     }
377   } while G_UNLIKELY
378   (!g_atomic_int_compare_and_exchange (&tail_mem->tail_write, tail, tail + 1));
379 
380   tail_mem->array[tail & size] = data;
381 
382   /* now wait until all writers have completed their write before we move the
383    * tail_read to this new item. It is possible that other writers are still
384    * updating the previous array slots and we don't want to reveal their changes
385    * before they are done. FIXME, it would be nice if we didn't have to busy
386    * wait here. */
387   while G_UNLIKELY
388     (!g_atomic_int_compare_and_exchange (&tail_mem->tail_read, tail, tail + 1));
389 }
390 
391 /**
392  * gst_atomic_queue_length:
393  * @queue: a #GstAtomicQueue
394  *
395  * Get the amount of items in the queue.
396  *
397  * Returns: the number of elements in the queue.
398  */
399 guint
gst_atomic_queue_length(GstAtomicQueue * queue)400 gst_atomic_queue_length (GstAtomicQueue * queue)
401 {
402   GstAQueueMem *head_mem, *tail_mem;
403   gint head, tail;
404 
405   g_return_val_if_fail (queue != NULL, 0);
406 
407 #ifdef LOW_MEM
408   g_atomic_int_inc (&queue->num_readers);
409 #endif
410 
411   head_mem = g_atomic_pointer_get (&queue->head_mem);
412   head = g_atomic_int_get (&head_mem->head);
413 
414   tail_mem = g_atomic_pointer_get (&queue->tail_mem);
415   tail = g_atomic_int_get (&tail_mem->tail_read);
416 
417 #ifdef LOW_MEM
418   if (g_atomic_int_dec_and_test (&queue->num_readers))
419     clear_free_list (queue);
420 #endif
421 
422   return tail - head;
423 }
424