• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer
2  * Copyright (C) 2006 Edward Hervey <edward@fluendo.com>
3  *
4  * gstdataqueue.c:
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 
22 /**
23  * SECTION:gstdataqueue
24  * @title: GstDataQueue
25  * @short_description: Threadsafe queueing object
26  *
27  * #GstDataQueue is an object that handles threadsafe queueing of objects. It
28  * also provides size-related functionality. This object should be used for
29  * any #GstElement that wishes to provide some sort of queueing functionality.
30  */
31 #ifdef HAVE_CONFIG_H
32 #include "config.h"
33 #endif
34 
35 #include <gst/gst.h>
36 #include "string.h"
37 #include "gstdataqueue.h"
38 #include "gstqueuearray.h"
39 #include "gst/glib-compat-private.h"
40 
/* Two debug categories are used in this file: data_queue_debug is the default
 * one picked up by the plain GST_DEBUG() calls, data_queue_dataflow is named
 * explicitly by the locking and STATUS tracing macros. */
GST_DEBUG_CATEGORY_STATIC (data_queue_debug);
GST_DEBUG_CATEGORY_STATIC (data_queue_dataflow);
#define GST_CAT_DEFAULT (data_queue_debug)
44 
45 
/* Indices into gst_data_queue_signals[]; LAST_SIGNAL is the table size. */
enum
{
  SIGNAL_EMPTY = 0,
  SIGNAL_FULL,
  LAST_SIGNAL
};

/* GObject property ids; id 0 is reserved (PROP_0). */
enum
{
  PROP_0 = 0,
  PROP_CUR_LEVEL_VISIBLE,
  PROP_CUR_LEVEL_BYTES,
  PROP_CUR_LEVEL_TIME
      /* FILL ME */
};
62 
63 struct _GstDataQueuePrivate
64 {
65   /* the array of data we're keeping our grubby hands on */
66   GstQueueArray *queue;
67 
68   GstDataQueueSize cur_level;   /* size of the queue */
69   GstDataQueueCheckFullFunction checkfull;      /* Callback to check if the queue is full */
70   gpointer *checkdata;
71 
72   GMutex qlock;                 /* lock for queue (vs object lock) */
73   gboolean waiting_add;
74   GCond item_add;               /* signals buffers now available for reading */
75   gboolean waiting_del;
76   GCond item_del;               /* signals space now available for writing */
77   gboolean flushing;            /* indicates whether conditions where signalled because
78                                  * of external flushing */
79   GstDataQueueFullCallback fullcallback;
80   GstDataQueueEmptyCallback emptycallback;
81 };
82 
/* Take the queue lock of @q, tracing before and after the acquisition.
 * The macro argument is parenthesised so any pointer expression is accepted. */
#define GST_DATA_QUEUE_MUTEX_LOCK(q) G_STMT_START {                     \
    GST_CAT_TRACE (data_queue_dataflow,                                 \
      "locking qlock from thread %p",                                   \
      g_thread_self ());                                                \
    g_mutex_lock (&(q)->priv->qlock);                                   \
    GST_CAT_TRACE (data_queue_dataflow,                                 \
      "locked qlock from thread %p",                                    \
      g_thread_self ());                                                \
} G_STMT_END

/* Take the queue lock of @q and jump to @label, with the lock still held,
 * if the queue is in flushing state. */
#define GST_DATA_QUEUE_MUTEX_LOCK_CHECK(q, label) G_STMT_START {        \
    GST_DATA_QUEUE_MUTEX_LOCK (q);                                      \
    if ((q)->priv->flushing)                                            \
      goto label;                                                       \
  } G_STMT_END

/* Release the queue lock of @q, tracing the release. */
#define GST_DATA_QUEUE_MUTEX_UNLOCK(q) G_STMT_START {                   \
    GST_CAT_TRACE (data_queue_dataflow,                                 \
      "unlocking qlock from thread %p",                                 \
      g_thread_self ());                                                \
    g_mutex_unlock (&(q)->priv->qlock);                                 \
} G_STMT_END
105 
/* Log the current fill level of @q on the dataflow category, prefixed by the
 * string literal @msg. Every value, including the printed pointer, is taken
 * from the macro argument rather than from a caller variable that happens to
 * be named "queue". */
#define STATUS(q, msg)                                                  \
  GST_CAT_LOG (data_queue_dataflow,                                     \
               "queue:%p " msg ": %u visible items, %u "                \
               "bytes, %" G_GUINT64_FORMAT                              \
               " ns, %u elements",                                      \
               (q),                                                     \
               (q)->priv->cur_level.visible,                            \
               (q)->priv->cur_level.bytes,                              \
               (q)->priv->cur_level.time,                               \
               gst_queue_array_get_length ((q)->priv->queue))
116 
117 static void gst_data_queue_finalize (GObject * object);
118 
119 static void gst_data_queue_set_property (GObject * object,
120     guint prop_id, const GValue * value, GParamSpec * pspec);
121 static void gst_data_queue_get_property (GObject * object,
122     guint prop_id, GValue * value, GParamSpec * pspec);
123 
124 static guint gst_data_queue_signals[LAST_SIGNAL] = { 0 };
125 
126 #define _do_init \
127 { \
128   GST_DEBUG_CATEGORY_INIT (data_queue_debug, "dataqueue", 0, \
129       "data queue object"); \
130   GST_DEBUG_CATEGORY_INIT (data_queue_dataflow, "data_queue_dataflow", 0, \
131       "dataflow inside the data queue object"); \
132 }
133 
134 #define parent_class gst_data_queue_parent_class
135 G_DEFINE_TYPE_WITH_CODE (GstDataQueue, gst_data_queue, G_TYPE_OBJECT,
136     G_ADD_PRIVATE (GstDataQueue) _do_init);
137 
138 static void
gst_data_queue_class_init(GstDataQueueClass * klass)139 gst_data_queue_class_init (GstDataQueueClass * klass)
140 {
141   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
142 
143   gobject_class->set_property = gst_data_queue_set_property;
144   gobject_class->get_property = gst_data_queue_get_property;
145 
146   /* signals */
147   /**
148    * GstDataQueue::empty: (skip)
149    * @queue: the queue instance
150    *
151    * Reports that the queue became empty (empty).
152    * A queue is empty if the total amount of visible items inside it (num-visible, time,
153    * size) is lower than the boundary values which can be set through the GObject
154    * properties.
155    */
156   gst_data_queue_signals[SIGNAL_EMPTY] =
157       g_signal_new ("empty", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
158       G_STRUCT_OFFSET (GstDataQueueClass, empty), NULL, NULL,
159       NULL, G_TYPE_NONE, 0);
160 
161   /**
162    * GstDataQueue::full: (skip)
163    * @queue: the queue instance
164    *
165    * Reports that the queue became full (full).
166    * A queue is full if the total amount of data inside it (num-visible, time,
167    * size) is higher than the boundary values which can be set through the GObject
168    * properties.
169    */
170   gst_data_queue_signals[SIGNAL_FULL] =
171       g_signal_new ("full", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
172       G_STRUCT_OFFSET (GstDataQueueClass, full), NULL, NULL,
173       NULL, G_TYPE_NONE, 0);
174 
175   /* properties */
176   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BYTES,
177       g_param_spec_uint ("current-level-bytes", "Current level (kB)",
178           "Current amount of data in the queue (bytes)",
179           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
180   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_VISIBLE,
181       g_param_spec_uint ("current-level-visible",
182           "Current level (visible items)",
183           "Current number of visible items in the queue", 0, G_MAXUINT, 0,
184           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
185   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_TIME,
186       g_param_spec_uint64 ("current-level-time", "Current level (ns)",
187           "Current amount of data in the queue (in ns)", 0, G_MAXUINT64, 0,
188           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
189 
190   gobject_class->finalize = gst_data_queue_finalize;
191 }
192 
193 static void
gst_data_queue_init(GstDataQueue * queue)194 gst_data_queue_init (GstDataQueue * queue)
195 {
196   queue->priv = gst_data_queue_get_instance_private (queue);
197 
198   queue->priv->cur_level.visible = 0;   /* no content */
199   queue->priv->cur_level.bytes = 0;     /* no content */
200   queue->priv->cur_level.time = 0;      /* no content */
201 
202   queue->priv->checkfull = NULL;
203 
204   g_mutex_init (&queue->priv->qlock);
205   g_cond_init (&queue->priv->item_add);
206   g_cond_init (&queue->priv->item_del);
207   queue->priv->queue = gst_queue_array_new (50);
208 
209   GST_DEBUG ("initialized queue's not_empty & not_full conditions");
210 }
211 
212 /**
213  * gst_data_queue_new: (skip)
214  * @checkfull: the callback used to tell if the element considers the queue full
215  * or not.
216  * @fullcallback: the callback which will be called when the queue is considered full.
217  * @emptycallback: the callback which will be called when the queue is considered empty.
218  * @checkdata: a #gpointer that will be passed to the @checkfull, @fullcallback,
219  *   and @emptycallback callbacks.
220  *
221  * Creates a new #GstDataQueue. If @fullcallback or @emptycallback are supplied, then
222  * the #GstDataQueue will call the respective callback to signal full or empty condition.
223  * If the callbacks are NULL the #GstDataQueue will instead emit 'full' and 'empty'
224  * signals.
225  *
226  * Returns: a new #GstDataQueue.
227  *
228  * Since: 1.2
229  */
230 GstDataQueue *
gst_data_queue_new(GstDataQueueCheckFullFunction checkfull,GstDataQueueFullCallback fullcallback,GstDataQueueEmptyCallback emptycallback,gpointer checkdata)231 gst_data_queue_new (GstDataQueueCheckFullFunction checkfull,
232     GstDataQueueFullCallback fullcallback,
233     GstDataQueueEmptyCallback emptycallback, gpointer checkdata)
234 {
235   GstDataQueue *ret;
236 
237   g_return_val_if_fail (checkfull != NULL, NULL);
238 
239   ret = g_object_new (GST_TYPE_DATA_QUEUE, NULL);
240   ret->priv->checkfull = checkfull;
241   ret->priv->checkdata = checkdata;
242   ret->priv->fullcallback = fullcallback;
243   ret->priv->emptycallback = emptycallback;
244 
245   return ret;
246 }
247 
248 static void
gst_data_queue_cleanup(GstDataQueue * queue)249 gst_data_queue_cleanup (GstDataQueue * queue)
250 {
251   GstDataQueuePrivate *priv = queue->priv;
252 
253   while (!gst_queue_array_is_empty (priv->queue)) {
254     GstDataQueueItem *item = gst_queue_array_pop_head (priv->queue);
255 
256     /* Just call the destroy notify on the item */
257     item->destroy (item);
258   }
259   priv->cur_level.visible = 0;
260   priv->cur_level.bytes = 0;
261   priv->cur_level.time = 0;
262 }
263 
264 /* called only once, as opposed to dispose */
265 static void
gst_data_queue_finalize(GObject * object)266 gst_data_queue_finalize (GObject * object)
267 {
268   GstDataQueue *queue = GST_DATA_QUEUE (object);
269   GstDataQueuePrivate *priv = queue->priv;
270 
271   GST_DEBUG ("finalizing queue");
272 
273   gst_data_queue_cleanup (queue);
274   gst_queue_array_free (priv->queue);
275 
276   GST_DEBUG ("free mutex");
277   g_mutex_clear (&priv->qlock);
278   GST_DEBUG ("done free mutex");
279 
280   g_cond_clear (&priv->item_add);
281   g_cond_clear (&priv->item_del);
282 
283   G_OBJECT_CLASS (parent_class)->finalize (object);
284 }
285 
286 static inline void
gst_data_queue_locked_flush(GstDataQueue * queue)287 gst_data_queue_locked_flush (GstDataQueue * queue)
288 {
289   GstDataQueuePrivate *priv = queue->priv;
290 
291   STATUS (queue, "before flushing");
292   gst_data_queue_cleanup (queue);
293   STATUS (queue, "after flushing");
294   /* we deleted something... */
295   if (priv->waiting_del)
296     g_cond_signal (&priv->item_del);
297 }
298 
299 static inline gboolean
gst_data_queue_locked_is_empty(GstDataQueue * queue)300 gst_data_queue_locked_is_empty (GstDataQueue * queue)
301 {
302   GstDataQueuePrivate *priv = queue->priv;
303 
304   return (gst_queue_array_get_length (priv->queue) == 0);
305 }
306 
307 static inline gboolean
gst_data_queue_locked_is_full(GstDataQueue * queue)308 gst_data_queue_locked_is_full (GstDataQueue * queue)
309 {
310   GstDataQueuePrivate *priv = queue->priv;
311 
312   return priv->checkfull (queue, priv->cur_level.visible,
313       priv->cur_level.bytes, priv->cur_level.time, priv->checkdata);
314 }
315 
316 /**
317  * gst_data_queue_flush: (skip)
318  * @queue: a #GstDataQueue.
319  *
320  * Flushes all the contents of the @queue. Any call to #gst_data_queue_push and
321  * #gst_data_queue_pop will be released.
322  * MT safe.
323  *
324  * Since: 1.2
325  */
326 void
gst_data_queue_flush(GstDataQueue * queue)327 gst_data_queue_flush (GstDataQueue * queue)
328 {
329   GST_DEBUG ("queue:%p", queue);
330   GST_DATA_QUEUE_MUTEX_LOCK (queue);
331   gst_data_queue_locked_flush (queue);
332   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
333 }
334 
335 /**
336  * gst_data_queue_is_empty: (skip)
337  * @queue: a #GstDataQueue.
338  *
339  * Queries if there are any items in the @queue.
340  * MT safe.
341  *
342  * Returns: %TRUE if @queue is empty.
343  *
344  * Since: 1.2
345  */
346 gboolean
gst_data_queue_is_empty(GstDataQueue * queue)347 gst_data_queue_is_empty (GstDataQueue * queue)
348 {
349   gboolean res;
350 
351   GST_DATA_QUEUE_MUTEX_LOCK (queue);
352   res = gst_data_queue_locked_is_empty (queue);
353   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
354 
355   return res;
356 }
357 
358 /**
359  * gst_data_queue_is_full: (skip)
360  * @queue: a #GstDataQueue.
361  *
362  * Queries if @queue is full. This check will be done using the
363  * #GstDataQueueCheckFullFunction registered with @queue.
364  * MT safe.
365  *
366  * Returns: %TRUE if @queue is full.
367  *
368  * Since: 1.2
369  */
370 gboolean
gst_data_queue_is_full(GstDataQueue * queue)371 gst_data_queue_is_full (GstDataQueue * queue)
372 {
373   gboolean res;
374 
375   GST_DATA_QUEUE_MUTEX_LOCK (queue);
376   res = gst_data_queue_locked_is_full (queue);
377   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
378 
379   return res;
380 }
381 
382 /**
383  * gst_data_queue_set_flushing: (skip)
384  * @queue: a #GstDataQueue.
385  * @flushing: a #gboolean stating if the queue will be flushing or not.
386  *
387  * Sets the queue to flushing state if @flushing is %TRUE. If set to flushing
388  * state, any incoming data on the @queue will be discarded. Any call currently
389  * blocking on #gst_data_queue_push or #gst_data_queue_pop will return straight
390  * away with a return value of %FALSE. While the @queue is in flushing state,
391  * all calls to those two functions will return %FALSE.
392  *
393  * MT Safe.
394  *
395  * Since: 1.2
396  */
397 void
gst_data_queue_set_flushing(GstDataQueue * queue,gboolean flushing)398 gst_data_queue_set_flushing (GstDataQueue * queue, gboolean flushing)
399 {
400   GstDataQueuePrivate *priv = queue->priv;
401 
402   GST_DEBUG ("queue:%p , flushing:%d", queue, flushing);
403 
404   GST_DATA_QUEUE_MUTEX_LOCK (queue);
405   priv->flushing = flushing;
406   if (flushing) {
407     /* release push/pop functions */
408     if (priv->waiting_add)
409       g_cond_signal (&priv->item_add);
410     if (priv->waiting_del)
411       g_cond_signal (&priv->item_del);
412   }
413   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
414 }
415 
416 static void
gst_data_queue_push_force_unlocked(GstDataQueue * queue,GstDataQueueItem * item)417 gst_data_queue_push_force_unlocked (GstDataQueue * queue,
418     GstDataQueueItem * item)
419 {
420   GstDataQueuePrivate *priv = queue->priv;
421 
422   gst_queue_array_push_tail (priv->queue, item);
423 
424   if (item->visible)
425     priv->cur_level.visible++;
426   priv->cur_level.bytes += item->size;
427   priv->cur_level.time += item->duration;
428 }
429 
430 /**
431  * gst_data_queue_push_force: (skip)
432  * @queue: a #GstDataQueue.
433  * @item: a #GstDataQueueItem.
434  *
435  * Pushes a #GstDataQueueItem (or a structure that begins with the same fields)
436  * on the @queue. It ignores if the @queue is full or not and forces the @item
437  * to be pushed anyway.
438  * MT safe.
439  *
440  * Note that this function has slightly different semantics than gst_pad_push()
441  * and gst_pad_push_event(): this function only takes ownership of @item and
442  * the #GstMiniObject contained in @item if the push was successful. If %FALSE
443  * is returned, the caller is responsible for freeing @item and its contents.
444  *
445  * Returns: %TRUE if the @item was successfully pushed on the @queue.
446  *
447  * Since: 1.2
448  */
449 gboolean
gst_data_queue_push_force(GstDataQueue * queue,GstDataQueueItem * item)450 gst_data_queue_push_force (GstDataQueue * queue, GstDataQueueItem * item)
451 {
452   GstDataQueuePrivate *priv = queue->priv;
453 
454   g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
455   g_return_val_if_fail (item != NULL, FALSE);
456 
457   GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
458 
459   STATUS (queue, "before pushing");
460   gst_data_queue_push_force_unlocked (queue, item);
461   STATUS (queue, "after pushing");
462   if (priv->waiting_add)
463     g_cond_signal (&priv->item_add);
464 
465   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
466 
467   return TRUE;
468 
469   /* ERRORS */
470 flushing:
471   {
472     GST_DEBUG ("queue:%p, we are flushing", queue);
473     GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
474     return FALSE;
475   }
476 }
477 
478 /**
479  * gst_data_queue_push: (skip)
480  * @queue: a #GstDataQueue.
481  * @item: a #GstDataQueueItem.
482  *
483  * Pushes a #GstDataQueueItem (or a structure that begins with the same fields)
484  * on the @queue. If the @queue is full, the call will block until space is
485  * available, OR the @queue is set to flushing state.
486  * MT safe.
487  *
488  * Note that this function has slightly different semantics than gst_pad_push()
489  * and gst_pad_push_event(): this function only takes ownership of @item and
490  * the #GstMiniObject contained in @item if the push was successful. If %FALSE
491  * is returned, the caller is responsible for freeing @item and its contents.
492  *
493  * Returns: %TRUE if the @item was successfully pushed on the @queue.
494  *
495  * Since: 1.2
496  */
497 gboolean
gst_data_queue_push(GstDataQueue * queue,GstDataQueueItem * item)498 gst_data_queue_push (GstDataQueue * queue, GstDataQueueItem * item)
499 {
500   GstDataQueuePrivate *priv = queue->priv;
501 
502   g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
503   g_return_val_if_fail (item != NULL, FALSE);
504 
505   GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
506 
507   STATUS (queue, "before pushing");
508 
509   /* We ALWAYS need to check for queue fillness */
510   if (gst_data_queue_locked_is_full (queue)) {
511     GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
512     if (G_LIKELY (priv->fullcallback))
513       priv->fullcallback (queue, priv->checkdata);
514     else
515       g_signal_emit (queue, gst_data_queue_signals[SIGNAL_FULL], 0);
516     GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
517 
518     /* signal might have removed some items */
519     while (gst_data_queue_locked_is_full (queue)) {
520       priv->waiting_del = TRUE;
521       g_cond_wait (&priv->item_del, &priv->qlock);
522       priv->waiting_del = FALSE;
523       if (priv->flushing)
524         goto flushing;
525     }
526   }
527 
528   gst_data_queue_push_force_unlocked (queue, item);
529 
530   STATUS (queue, "after pushing");
531   if (priv->waiting_add)
532     g_cond_signal (&priv->item_add);
533 
534   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
535 
536   return TRUE;
537 
538   /* ERRORS */
539 flushing:
540   {
541     GST_DEBUG ("queue:%p, we are flushing", queue);
542     GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
543     return FALSE;
544   }
545 }
546 
547 static gboolean
_gst_data_queue_wait_non_empty(GstDataQueue * queue)548 _gst_data_queue_wait_non_empty (GstDataQueue * queue)
549 {
550   GstDataQueuePrivate *priv = queue->priv;
551 
552   while (gst_data_queue_locked_is_empty (queue)) {
553     priv->waiting_add = TRUE;
554     g_cond_wait (&priv->item_add, &priv->qlock);
555     priv->waiting_add = FALSE;
556     if (priv->flushing)
557       return FALSE;
558   }
559   return TRUE;
560 }
561 
562 /**
563  * gst_data_queue_pop: (skip)
564  * @queue: a #GstDataQueue.
565  * @item: (out): pointer to store the returned #GstDataQueueItem.
566  *
567  * Retrieves the first @item available on the @queue. If the queue is currently
568  * empty, the call will block until at least one item is available, OR the
569  * @queue is set to the flushing state.
570  * MT safe.
571  *
572  * Returns: %TRUE if an @item was successfully retrieved from the @queue.
573  *
574  * Since: 1.2
575  */
576 gboolean
gst_data_queue_pop(GstDataQueue * queue,GstDataQueueItem ** item)577 gst_data_queue_pop (GstDataQueue * queue, GstDataQueueItem ** item)
578 {
579   GstDataQueuePrivate *priv = queue->priv;
580 
581   g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
582   g_return_val_if_fail (item != NULL, FALSE);
583 
584   GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
585 
586   STATUS (queue, "before popping");
587 
588   if (gst_data_queue_locked_is_empty (queue)) {
589     GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
590     if (G_LIKELY (priv->emptycallback))
591       priv->emptycallback (queue, priv->checkdata);
592     else
593       g_signal_emit (queue, gst_data_queue_signals[SIGNAL_EMPTY], 0);
594     GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
595 
596     if (!_gst_data_queue_wait_non_empty (queue))
597       goto flushing;
598   }
599 
600   /* Get the item from the GQueue */
601   *item = gst_queue_array_pop_head (priv->queue);
602 
603   /* update current level counter */
604   if ((*item)->visible)
605     priv->cur_level.visible--;
606   priv->cur_level.bytes -= (*item)->size;
607   priv->cur_level.time -= (*item)->duration;
608 
609   STATUS (queue, "after popping");
610   if (priv->waiting_del)
611     g_cond_signal (&priv->item_del);
612 
613   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
614 
615   return TRUE;
616 
617   /* ERRORS */
618 flushing:
619   {
620     GST_DEBUG ("queue:%p, we are flushing", queue);
621     GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
622     return FALSE;
623   }
624 }
625 
626 static gint
is_of_type(gconstpointer a,gconstpointer b)627 is_of_type (gconstpointer a, gconstpointer b)
628 {
629   return !G_TYPE_CHECK_INSTANCE_TYPE (a, GPOINTER_TO_SIZE (b));
630 }
631 
632 /**
633  * gst_data_queue_peek: (skip)
634  * @queue: a #GstDataQueue.
635  * @item: (out): pointer to store the returned #GstDataQueueItem.
636  *
637  * Retrieves the first @item available on the @queue without removing it.
638  * If the queue is currently empty, the call will block until at least
639  * one item is available, OR the @queue is set to the flushing state.
640  * MT safe.
641  *
642  * Returns: %TRUE if an @item was successfully retrieved from the @queue.
643  *
644  * Since: 1.2
645  */
646 gboolean
gst_data_queue_peek(GstDataQueue * queue,GstDataQueueItem ** item)647 gst_data_queue_peek (GstDataQueue * queue, GstDataQueueItem ** item)
648 {
649   GstDataQueuePrivate *priv = queue->priv;
650 
651   g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
652   g_return_val_if_fail (item != NULL, FALSE);
653 
654   GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
655 
656   STATUS (queue, "before peeking");
657 
658   if (gst_data_queue_locked_is_empty (queue)) {
659     GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
660     if (G_LIKELY (priv->emptycallback))
661       priv->emptycallback (queue, priv->checkdata);
662     else
663       g_signal_emit (queue, gst_data_queue_signals[SIGNAL_EMPTY], 0);
664     GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
665 
666     if (!_gst_data_queue_wait_non_empty (queue))
667       goto flushing;
668   }
669 
670   /* Get the item from the GQueue */
671   *item = gst_queue_array_peek_head (priv->queue);
672 
673   STATUS (queue, "after peeking");
674   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
675 
676   return TRUE;
677 
678   /* ERRORS */
679 flushing:
680   {
681     GST_DEBUG ("queue:%p, we are flushing", queue);
682     GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
683     return FALSE;
684   }
685 }
686 
687 /**
688  * gst_data_queue_drop_head: (skip)
689  * @queue: The #GstDataQueue to drop an item from.
690  * @type: The #GType of the item to drop.
691  *
692  * Pop and unref the head-most #GstMiniObject with the given #GType.
693  *
694  * Returns: %TRUE if an element was removed.
695  *
696  * Since: 1.2
697  */
698 gboolean
gst_data_queue_drop_head(GstDataQueue * queue,GType type)699 gst_data_queue_drop_head (GstDataQueue * queue, GType type)
700 {
701   gboolean res = FALSE;
702   GstDataQueueItem *leak = NULL;
703   guint idx;
704   GstDataQueuePrivate *priv = queue->priv;
705 
706   g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
707 
708   GST_DEBUG ("queue:%p", queue);
709 
710   GST_DATA_QUEUE_MUTEX_LOCK (queue);
711   idx = gst_queue_array_find (priv->queue, is_of_type, GSIZE_TO_POINTER (type));
712 
713   if (idx == -1)
714     goto done;
715 
716   leak = gst_queue_array_drop_element (priv->queue, idx);
717 
718   if (leak->visible)
719     priv->cur_level.visible--;
720   priv->cur_level.bytes -= leak->size;
721   priv->cur_level.time -= leak->duration;
722 
723   leak->destroy (leak);
724 
725   res = TRUE;
726 
727 done:
728   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
729 
730   GST_DEBUG ("queue:%p , res:%d", queue, res);
731 
732   return res;
733 }
734 
735 /**
736  * gst_data_queue_limits_changed: (skip)
737  * @queue: The #GstDataQueue
738  *
739  * Inform the queue that the limits for the fullness check have changed and that
740  * any blocking gst_data_queue_push() should be unblocked to recheck the limits.
741  *
742  * Since: 1.2
743  */
744 void
gst_data_queue_limits_changed(GstDataQueue * queue)745 gst_data_queue_limits_changed (GstDataQueue * queue)
746 {
747   GstDataQueuePrivate *priv = queue->priv;
748 
749   g_return_if_fail (GST_IS_DATA_QUEUE (queue));
750 
751   GST_DATA_QUEUE_MUTEX_LOCK (queue);
752   if (priv->waiting_del) {
753     GST_DEBUG ("signal del");
754     g_cond_signal (&priv->item_del);
755   }
756   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
757 }
758 
759 /**
760  * gst_data_queue_get_level: (skip)
761  * @queue: The #GstDataQueue
762  * @level: (out): the location to store the result
763  *
764  * Get the current level of the queue.
765  *
766  * Since: 1.2
767  */
768 void
gst_data_queue_get_level(GstDataQueue * queue,GstDataQueueSize * level)769 gst_data_queue_get_level (GstDataQueue * queue, GstDataQueueSize * level)
770 {
771   GstDataQueuePrivate *priv = queue->priv;
772 
773   memcpy (level, (&priv->cur_level), sizeof (GstDataQueueSize));
774 }
775 
776 static void
gst_data_queue_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)777 gst_data_queue_set_property (GObject * object,
778     guint prop_id, const GValue * value, GParamSpec * pspec)
779 {
780   switch (prop_id) {
781     default:
782       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
783       break;
784   }
785 }
786 
787 static void
gst_data_queue_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)788 gst_data_queue_get_property (GObject * object,
789     guint prop_id, GValue * value, GParamSpec * pspec)
790 {
791   GstDataQueue *queue = GST_DATA_QUEUE (object);
792   GstDataQueuePrivate *priv = queue->priv;
793 
794   GST_DATA_QUEUE_MUTEX_LOCK (queue);
795 
796   switch (prop_id) {
797     case PROP_CUR_LEVEL_BYTES:
798       g_value_set_uint (value, priv->cur_level.bytes);
799       break;
800     case PROP_CUR_LEVEL_VISIBLE:
801       g_value_set_uint (value, priv->cur_level.visible);
802       break;
803     case PROP_CUR_LEVEL_TIME:
804       g_value_set_uint64 (value, priv->cur_level.time);
805       break;
806     default:
807       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
808       break;
809   }
810 
811   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
812 }
813