1 /* GStreamer
2 * Copyright (C) 2006 Edward Hervey <edward@fluendo.com>
3 *
4 * gstdataqueue.c:
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Library General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Library General Public License for more details.
15 *
16 * You should have received a copy of the GNU Library General Public
17 * License along with this library; if not, write to the
18 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19 * Boston, MA 02110-1301, USA.
20 */
21
22 /**
23 * SECTION:gstdataqueue
24 * @title: GstDataQueue
25 * @short_description: Threadsafe queueing object
26 *
27 * #GstDataQueue is an object that handles threadsafe queueing of objects. It
28 * also provides size-related functionality. This object should be used for
29 * any #GstElement that wishes to provide some sort of queueing functionality.
30 */
31 #ifdef HAVE_CONFIG_H
32 #include "config.h"
33 #endif
34
35 #include <gst/gst.h>
36 #include "string.h"
37 #include "gstdataqueue.h"
38 #include "gstqueuearray.h"
39 #include "gst/glib-compat-private.h"
40
41 GST_DEBUG_CATEGORY_STATIC (data_queue_debug);
42 #define GST_CAT_DEFAULT (data_queue_debug)
43 GST_DEBUG_CATEGORY_STATIC (data_queue_dataflow);
44
45
/* Signal identifiers; each value is an index into gst_data_queue_signals[]. */
enum
{
  SIGNAL_EMPTY = 0,
  SIGNAL_FULL,
  LAST_SIGNAL                   /* number of signals, sizes the id array */
};
53
/* Property identifiers. PROP_0 occupies id 0 and is never installed as a
 * property by class_init. */
enum
{
  PROP_0 = 0,
  PROP_CUR_LEVEL_VISIBLE,
  PROP_CUR_LEVEL_BYTES,
  PROP_CUR_LEVEL_TIME
};
62
63 struct _GstDataQueuePrivate
64 {
65 /* the array of data we're keeping our grubby hands on */
66 GstQueueArray *queue;
67
68 GstDataQueueSize cur_level; /* size of the queue */
69 GstDataQueueCheckFullFunction checkfull; /* Callback to check if the queue is full */
70 gpointer *checkdata;
71
72 GMutex qlock; /* lock for queue (vs object lock) */
73 gboolean waiting_add;
74 GCond item_add; /* signals buffers now available for reading */
75 gboolean waiting_del;
76 GCond item_del; /* signals space now available for writing */
77 gboolean flushing; /* indicates whether conditions where signalled because
78 * of external flushing */
79 GstDataQueueFullCallback fullcallback;
80 GstDataQueueEmptyCallback emptycallback;
81 };
82
/* Takes the qlock of @q, tracing both the attempt and the acquisition.
 * @q is parenthesized so that any pointer expression can be passed. */
#define GST_DATA_QUEUE_MUTEX_LOCK(q) G_STMT_START {                     \
  GST_CAT_TRACE (data_queue_dataflow,                                   \
      "locking qlock from thread %p",                                   \
      g_thread_self ());                                                \
  g_mutex_lock (&(q)->priv->qlock);                                     \
  GST_CAT_TRACE (data_queue_dataflow,                                   \
      "locked qlock from thread %p",                                    \
      g_thread_self ());                                                \
} G_STMT_END
92
/* Takes the qlock of @q and jumps to @label, with the lock still held, when
 * the queue is flushing. The code at @label must release the lock. */
#define GST_DATA_QUEUE_MUTEX_LOCK_CHECK(q, label) G_STMT_START {        \
  GST_DATA_QUEUE_MUTEX_LOCK (q);                                        \
  if ((q)->priv->flushing)                                              \
    goto label;                                                         \
} G_STMT_END
98
/* Releases the qlock of @q, tracing the release beforehand. */
#define GST_DATA_QUEUE_MUTEX_UNLOCK(q) G_STMT_START {                   \
  GST_CAT_TRACE (data_queue_dataflow,                                   \
      "unlocking qlock from thread %p",                                 \
      g_thread_self ());                                                \
  g_mutex_unlock (&(q)->priv->qlock);                                   \
} G_STMT_END
105
/* Logs the current fill level of @q, prefixed by the string literal @msg.
 * The pointer that gets logged is the macro argument itself rather than
 * whatever variable named "queue" happens to be in the caller's scope. */
#define STATUS(q, msg)                                                  \
  GST_CAT_LOG (data_queue_dataflow,                                     \
      "queue:%p " msg ": %u visible items, %u "                         \
      "bytes, %" G_GUINT64_FORMAT                                       \
      " ns, %u elements",                                               \
      (q),                                                              \
      (q)->priv->cur_level.visible,                                     \
      (q)->priv->cur_level.bytes,                                       \
      (q)->priv->cur_level.time,                                        \
      gst_queue_array_get_length ((q)->priv->queue))
116
117 static void gst_data_queue_finalize (GObject * object);
118
119 static void gst_data_queue_set_property (GObject * object,
120 guint prop_id, const GValue * value, GParamSpec * pspec);
121 static void gst_data_queue_get_property (GObject * object,
122 guint prop_id, GValue * value, GParamSpec * pspec);
123
124 static guint gst_data_queue_signals[LAST_SIGNAL] = { 0 };
125
126 #define _do_init \
127 { \
128 GST_DEBUG_CATEGORY_INIT (data_queue_debug, "dataqueue", 0, \
129 "data queue object"); \
130 GST_DEBUG_CATEGORY_INIT (data_queue_dataflow, "data_queue_dataflow", 0, \
131 "dataflow inside the data queue object"); \
132 }
133
134 #define parent_class gst_data_queue_parent_class
135 G_DEFINE_TYPE_WITH_CODE (GstDataQueue, gst_data_queue, G_TYPE_OBJECT,
136 G_ADD_PRIVATE (GstDataQueue) _do_init);
137
138 static void
gst_data_queue_class_init(GstDataQueueClass * klass)139 gst_data_queue_class_init (GstDataQueueClass * klass)
140 {
141 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
142
143 gobject_class->set_property = gst_data_queue_set_property;
144 gobject_class->get_property = gst_data_queue_get_property;
145
146 /* signals */
147 /**
148 * GstDataQueue::empty: (skip)
149 * @queue: the queue instance
150 *
151 * Reports that the queue became empty (empty).
152 * A queue is empty if the total amount of visible items inside it (num-visible, time,
153 * size) is lower than the boundary values which can be set through the GObject
154 * properties.
155 */
156 gst_data_queue_signals[SIGNAL_EMPTY] =
157 g_signal_new ("empty", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
158 G_STRUCT_OFFSET (GstDataQueueClass, empty), NULL, NULL,
159 NULL, G_TYPE_NONE, 0);
160
161 /**
162 * GstDataQueue::full: (skip)
163 * @queue: the queue instance
164 *
165 * Reports that the queue became full (full).
166 * A queue is full if the total amount of data inside it (num-visible, time,
167 * size) is higher than the boundary values which can be set through the GObject
168 * properties.
169 */
170 gst_data_queue_signals[SIGNAL_FULL] =
171 g_signal_new ("full", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
172 G_STRUCT_OFFSET (GstDataQueueClass, full), NULL, NULL,
173 NULL, G_TYPE_NONE, 0);
174
175 /* properties */
176 g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BYTES,
177 g_param_spec_uint ("current-level-bytes", "Current level (kB)",
178 "Current amount of data in the queue (bytes)",
179 0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
180 g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_VISIBLE,
181 g_param_spec_uint ("current-level-visible",
182 "Current level (visible items)",
183 "Current number of visible items in the queue", 0, G_MAXUINT, 0,
184 G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
185 g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_TIME,
186 g_param_spec_uint64 ("current-level-time", "Current level (ns)",
187 "Current amount of data in the queue (in ns)", 0, G_MAXUINT64, 0,
188 G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
189
190 gobject_class->finalize = gst_data_queue_finalize;
191 }
192
193 static void
gst_data_queue_init(GstDataQueue * queue)194 gst_data_queue_init (GstDataQueue * queue)
195 {
196 queue->priv = gst_data_queue_get_instance_private (queue);
197
198 queue->priv->cur_level.visible = 0; /* no content */
199 queue->priv->cur_level.bytes = 0; /* no content */
200 queue->priv->cur_level.time = 0; /* no content */
201
202 queue->priv->checkfull = NULL;
203
204 g_mutex_init (&queue->priv->qlock);
205 g_cond_init (&queue->priv->item_add);
206 g_cond_init (&queue->priv->item_del);
207 queue->priv->queue = gst_queue_array_new (50);
208
209 GST_DEBUG ("initialized queue's not_empty & not_full conditions");
210 }
211
212 /**
213 * gst_data_queue_new: (skip)
214 * @checkfull: the callback used to tell if the element considers the queue full
215 * or not.
216 * @fullcallback: the callback which will be called when the queue is considered full.
217 * @emptycallback: the callback which will be called when the queue is considered empty.
218 * @checkdata: a #gpointer that will be passed to the @checkfull, @fullcallback,
219 * and @emptycallback callbacks.
220 *
221 * Creates a new #GstDataQueue. If @fullcallback or @emptycallback are supplied, then
222 * the #GstDataQueue will call the respective callback to signal full or empty condition.
223 * If the callbacks are NULL the #GstDataQueue will instead emit 'full' and 'empty'
224 * signals.
225 *
226 * Returns: a new #GstDataQueue.
227 *
228 * Since: 1.2
229 */
230 GstDataQueue *
gst_data_queue_new(GstDataQueueCheckFullFunction checkfull,GstDataQueueFullCallback fullcallback,GstDataQueueEmptyCallback emptycallback,gpointer checkdata)231 gst_data_queue_new (GstDataQueueCheckFullFunction checkfull,
232 GstDataQueueFullCallback fullcallback,
233 GstDataQueueEmptyCallback emptycallback, gpointer checkdata)
234 {
235 GstDataQueue *ret;
236
237 g_return_val_if_fail (checkfull != NULL, NULL);
238
239 ret = g_object_new (GST_TYPE_DATA_QUEUE, NULL);
240 ret->priv->checkfull = checkfull;
241 ret->priv->checkdata = checkdata;
242 ret->priv->fullcallback = fullcallback;
243 ret->priv->emptycallback = emptycallback;
244
245 return ret;
246 }
247
248 static void
gst_data_queue_cleanup(GstDataQueue * queue)249 gst_data_queue_cleanup (GstDataQueue * queue)
250 {
251 GstDataQueuePrivate *priv = queue->priv;
252
253 while (!gst_queue_array_is_empty (priv->queue)) {
254 GstDataQueueItem *item = gst_queue_array_pop_head (priv->queue);
255
256 /* Just call the destroy notify on the item */
257 item->destroy (item);
258 }
259 priv->cur_level.visible = 0;
260 priv->cur_level.bytes = 0;
261 priv->cur_level.time = 0;
262 }
263
264 /* called only once, as opposed to dispose */
265 static void
gst_data_queue_finalize(GObject * object)266 gst_data_queue_finalize (GObject * object)
267 {
268 GstDataQueue *queue = GST_DATA_QUEUE (object);
269 GstDataQueuePrivate *priv = queue->priv;
270
271 GST_DEBUG ("finalizing queue");
272
273 gst_data_queue_cleanup (queue);
274 gst_queue_array_free (priv->queue);
275
276 GST_DEBUG ("free mutex");
277 g_mutex_clear (&priv->qlock);
278 GST_DEBUG ("done free mutex");
279
280 g_cond_clear (&priv->item_add);
281 g_cond_clear (&priv->item_del);
282
283 G_OBJECT_CLASS (parent_class)->finalize (object);
284 }
285
286 static inline void
gst_data_queue_locked_flush(GstDataQueue * queue)287 gst_data_queue_locked_flush (GstDataQueue * queue)
288 {
289 GstDataQueuePrivate *priv = queue->priv;
290
291 STATUS (queue, "before flushing");
292 gst_data_queue_cleanup (queue);
293 STATUS (queue, "after flushing");
294 /* we deleted something... */
295 if (priv->waiting_del)
296 g_cond_signal (&priv->item_del);
297 }
298
299 static inline gboolean
gst_data_queue_locked_is_empty(GstDataQueue * queue)300 gst_data_queue_locked_is_empty (GstDataQueue * queue)
301 {
302 GstDataQueuePrivate *priv = queue->priv;
303
304 return (gst_queue_array_get_length (priv->queue) == 0);
305 }
306
307 static inline gboolean
gst_data_queue_locked_is_full(GstDataQueue * queue)308 gst_data_queue_locked_is_full (GstDataQueue * queue)
309 {
310 GstDataQueuePrivate *priv = queue->priv;
311
312 return priv->checkfull (queue, priv->cur_level.visible,
313 priv->cur_level.bytes, priv->cur_level.time, priv->checkdata);
314 }
315
316 /**
317 * gst_data_queue_flush: (skip)
318 * @queue: a #GstDataQueue.
319 *
320 * Flushes all the contents of the @queue. Any call to #gst_data_queue_push and
321 * #gst_data_queue_pop will be released.
322 * MT safe.
323 *
324 * Since: 1.2
325 */
326 void
gst_data_queue_flush(GstDataQueue * queue)327 gst_data_queue_flush (GstDataQueue * queue)
328 {
329 GST_DEBUG ("queue:%p", queue);
330 GST_DATA_QUEUE_MUTEX_LOCK (queue);
331 gst_data_queue_locked_flush (queue);
332 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
333 }
334
335 /**
336 * gst_data_queue_is_empty: (skip)
337 * @queue: a #GstDataQueue.
338 *
339 * Queries if there are any items in the @queue.
340 * MT safe.
341 *
342 * Returns: %TRUE if @queue is empty.
343 *
344 * Since: 1.2
345 */
346 gboolean
gst_data_queue_is_empty(GstDataQueue * queue)347 gst_data_queue_is_empty (GstDataQueue * queue)
348 {
349 gboolean res;
350
351 GST_DATA_QUEUE_MUTEX_LOCK (queue);
352 res = gst_data_queue_locked_is_empty (queue);
353 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
354
355 return res;
356 }
357
358 /**
359 * gst_data_queue_is_full: (skip)
360 * @queue: a #GstDataQueue.
361 *
362 * Queries if @queue is full. This check will be done using the
363 * #GstDataQueueCheckFullFunction registered with @queue.
364 * MT safe.
365 *
366 * Returns: %TRUE if @queue is full.
367 *
368 * Since: 1.2
369 */
370 gboolean
gst_data_queue_is_full(GstDataQueue * queue)371 gst_data_queue_is_full (GstDataQueue * queue)
372 {
373 gboolean res;
374
375 GST_DATA_QUEUE_MUTEX_LOCK (queue);
376 res = gst_data_queue_locked_is_full (queue);
377 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
378
379 return res;
380 }
381
382 /**
383 * gst_data_queue_set_flushing: (skip)
384 * @queue: a #GstDataQueue.
385 * @flushing: a #gboolean stating if the queue will be flushing or not.
386 *
387 * Sets the queue to flushing state if @flushing is %TRUE. If set to flushing
388 * state, any incoming data on the @queue will be discarded. Any call currently
389 * blocking on #gst_data_queue_push or #gst_data_queue_pop will return straight
390 * away with a return value of %FALSE. While the @queue is in flushing state,
391 * all calls to those two functions will return %FALSE.
392 *
393 * MT Safe.
394 *
395 * Since: 1.2
396 */
397 void
gst_data_queue_set_flushing(GstDataQueue * queue,gboolean flushing)398 gst_data_queue_set_flushing (GstDataQueue * queue, gboolean flushing)
399 {
400 GstDataQueuePrivate *priv = queue->priv;
401
402 GST_DEBUG ("queue:%p , flushing:%d", queue, flushing);
403
404 GST_DATA_QUEUE_MUTEX_LOCK (queue);
405 priv->flushing = flushing;
406 if (flushing) {
407 /* release push/pop functions */
408 if (priv->waiting_add)
409 g_cond_signal (&priv->item_add);
410 if (priv->waiting_del)
411 g_cond_signal (&priv->item_del);
412 }
413 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
414 }
415
416 static void
gst_data_queue_push_force_unlocked(GstDataQueue * queue,GstDataQueueItem * item)417 gst_data_queue_push_force_unlocked (GstDataQueue * queue,
418 GstDataQueueItem * item)
419 {
420 GstDataQueuePrivate *priv = queue->priv;
421
422 gst_queue_array_push_tail (priv->queue, item);
423
424 if (item->visible)
425 priv->cur_level.visible++;
426 priv->cur_level.bytes += item->size;
427 priv->cur_level.time += item->duration;
428 }
429
430 /**
431 * gst_data_queue_push_force: (skip)
432 * @queue: a #GstDataQueue.
433 * @item: a #GstDataQueueItem.
434 *
435 * Pushes a #GstDataQueueItem (or a structure that begins with the same fields)
436 * on the @queue. It ignores if the @queue is full or not and forces the @item
437 * to be pushed anyway.
438 * MT safe.
439 *
440 * Note that this function has slightly different semantics than gst_pad_push()
441 * and gst_pad_push_event(): this function only takes ownership of @item and
442 * the #GstMiniObject contained in @item if the push was successful. If %FALSE
443 * is returned, the caller is responsible for freeing @item and its contents.
444 *
445 * Returns: %TRUE if the @item was successfully pushed on the @queue.
446 *
447 * Since: 1.2
448 */
449 gboolean
gst_data_queue_push_force(GstDataQueue * queue,GstDataQueueItem * item)450 gst_data_queue_push_force (GstDataQueue * queue, GstDataQueueItem * item)
451 {
452 GstDataQueuePrivate *priv = queue->priv;
453
454 g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
455 g_return_val_if_fail (item != NULL, FALSE);
456
457 GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
458
459 STATUS (queue, "before pushing");
460 gst_data_queue_push_force_unlocked (queue, item);
461 STATUS (queue, "after pushing");
462 if (priv->waiting_add)
463 g_cond_signal (&priv->item_add);
464
465 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
466
467 return TRUE;
468
469 /* ERRORS */
470 flushing:
471 {
472 GST_DEBUG ("queue:%p, we are flushing", queue);
473 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
474 return FALSE;
475 }
476 }
477
478 /**
479 * gst_data_queue_push: (skip)
480 * @queue: a #GstDataQueue.
481 * @item: a #GstDataQueueItem.
482 *
483 * Pushes a #GstDataQueueItem (or a structure that begins with the same fields)
484 * on the @queue. If the @queue is full, the call will block until space is
485 * available, OR the @queue is set to flushing state.
486 * MT safe.
487 *
488 * Note that this function has slightly different semantics than gst_pad_push()
489 * and gst_pad_push_event(): this function only takes ownership of @item and
490 * the #GstMiniObject contained in @item if the push was successful. If %FALSE
491 * is returned, the caller is responsible for freeing @item and its contents.
492 *
493 * Returns: %TRUE if the @item was successfully pushed on the @queue.
494 *
495 * Since: 1.2
496 */
497 gboolean
gst_data_queue_push(GstDataQueue * queue,GstDataQueueItem * item)498 gst_data_queue_push (GstDataQueue * queue, GstDataQueueItem * item)
499 {
500 GstDataQueuePrivate *priv = queue->priv;
501
502 g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
503 g_return_val_if_fail (item != NULL, FALSE);
504
505 GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
506
507 STATUS (queue, "before pushing");
508
509 /* We ALWAYS need to check for queue fillness */
510 if (gst_data_queue_locked_is_full (queue)) {
511 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
512 if (G_LIKELY (priv->fullcallback))
513 priv->fullcallback (queue, priv->checkdata);
514 else
515 g_signal_emit (queue, gst_data_queue_signals[SIGNAL_FULL], 0);
516 GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
517
518 /* signal might have removed some items */
519 while (gst_data_queue_locked_is_full (queue)) {
520 priv->waiting_del = TRUE;
521 g_cond_wait (&priv->item_del, &priv->qlock);
522 priv->waiting_del = FALSE;
523 if (priv->flushing)
524 goto flushing;
525 }
526 }
527
528 gst_data_queue_push_force_unlocked (queue, item);
529
530 STATUS (queue, "after pushing");
531 if (priv->waiting_add)
532 g_cond_signal (&priv->item_add);
533
534 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
535
536 return TRUE;
537
538 /* ERRORS */
539 flushing:
540 {
541 GST_DEBUG ("queue:%p, we are flushing", queue);
542 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
543 return FALSE;
544 }
545 }
546
547 static gboolean
_gst_data_queue_wait_non_empty(GstDataQueue * queue)548 _gst_data_queue_wait_non_empty (GstDataQueue * queue)
549 {
550 GstDataQueuePrivate *priv = queue->priv;
551
552 while (gst_data_queue_locked_is_empty (queue)) {
553 priv->waiting_add = TRUE;
554 g_cond_wait (&priv->item_add, &priv->qlock);
555 priv->waiting_add = FALSE;
556 if (priv->flushing)
557 return FALSE;
558 }
559 return TRUE;
560 }
561
562 /**
563 * gst_data_queue_pop: (skip)
564 * @queue: a #GstDataQueue.
565 * @item: (out): pointer to store the returned #GstDataQueueItem.
566 *
567 * Retrieves the first @item available on the @queue. If the queue is currently
568 * empty, the call will block until at least one item is available, OR the
569 * @queue is set to the flushing state.
570 * MT safe.
571 *
572 * Returns: %TRUE if an @item was successfully retrieved from the @queue.
573 *
574 * Since: 1.2
575 */
576 gboolean
gst_data_queue_pop(GstDataQueue * queue,GstDataQueueItem ** item)577 gst_data_queue_pop (GstDataQueue * queue, GstDataQueueItem ** item)
578 {
579 GstDataQueuePrivate *priv = queue->priv;
580
581 g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
582 g_return_val_if_fail (item != NULL, FALSE);
583
584 GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
585
586 STATUS (queue, "before popping");
587
588 if (gst_data_queue_locked_is_empty (queue)) {
589 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
590 if (G_LIKELY (priv->emptycallback))
591 priv->emptycallback (queue, priv->checkdata);
592 else
593 g_signal_emit (queue, gst_data_queue_signals[SIGNAL_EMPTY], 0);
594 GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
595
596 if (!_gst_data_queue_wait_non_empty (queue))
597 goto flushing;
598 }
599
600 /* Get the item from the GQueue */
601 *item = gst_queue_array_pop_head (priv->queue);
602
603 /* update current level counter */
604 if ((*item)->visible)
605 priv->cur_level.visible--;
606 priv->cur_level.bytes -= (*item)->size;
607 priv->cur_level.time -= (*item)->duration;
608
609 STATUS (queue, "after popping");
610 if (priv->waiting_del)
611 g_cond_signal (&priv->item_del);
612
613 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
614
615 return TRUE;
616
617 /* ERRORS */
618 flushing:
619 {
620 GST_DEBUG ("queue:%p, we are flushing", queue);
621 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
622 return FALSE;
623 }
624 }
625
626 static gint
is_of_type(gconstpointer a,gconstpointer b)627 is_of_type (gconstpointer a, gconstpointer b)
628 {
629 return !G_TYPE_CHECK_INSTANCE_TYPE (a, GPOINTER_TO_SIZE (b));
630 }
631
632 /**
633 * gst_data_queue_peek: (skip)
634 * @queue: a #GstDataQueue.
635 * @item: (out): pointer to store the returned #GstDataQueueItem.
636 *
637 * Retrieves the first @item available on the @queue without removing it.
638 * If the queue is currently empty, the call will block until at least
639 * one item is available, OR the @queue is set to the flushing state.
640 * MT safe.
641 *
642 * Returns: %TRUE if an @item was successfully retrieved from the @queue.
643 *
644 * Since: 1.2
645 */
646 gboolean
gst_data_queue_peek(GstDataQueue * queue,GstDataQueueItem ** item)647 gst_data_queue_peek (GstDataQueue * queue, GstDataQueueItem ** item)
648 {
649 GstDataQueuePrivate *priv = queue->priv;
650
651 g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
652 g_return_val_if_fail (item != NULL, FALSE);
653
654 GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
655
656 STATUS (queue, "before peeking");
657
658 if (gst_data_queue_locked_is_empty (queue)) {
659 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
660 if (G_LIKELY (priv->emptycallback))
661 priv->emptycallback (queue, priv->checkdata);
662 else
663 g_signal_emit (queue, gst_data_queue_signals[SIGNAL_EMPTY], 0);
664 GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
665
666 if (!_gst_data_queue_wait_non_empty (queue))
667 goto flushing;
668 }
669
670 /* Get the item from the GQueue */
671 *item = gst_queue_array_peek_head (priv->queue);
672
673 STATUS (queue, "after peeking");
674 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
675
676 return TRUE;
677
678 /* ERRORS */
679 flushing:
680 {
681 GST_DEBUG ("queue:%p, we are flushing", queue);
682 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
683 return FALSE;
684 }
685 }
686
687 /**
688 * gst_data_queue_drop_head: (skip)
689 * @queue: The #GstDataQueue to drop an item from.
690 * @type: The #GType of the item to drop.
691 *
692 * Pop and unref the head-most #GstMiniObject with the given #GType.
693 *
694 * Returns: %TRUE if an element was removed.
695 *
696 * Since: 1.2
697 */
698 gboolean
gst_data_queue_drop_head(GstDataQueue * queue,GType type)699 gst_data_queue_drop_head (GstDataQueue * queue, GType type)
700 {
701 gboolean res = FALSE;
702 GstDataQueueItem *leak = NULL;
703 guint idx;
704 GstDataQueuePrivate *priv = queue->priv;
705
706 g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
707
708 GST_DEBUG ("queue:%p", queue);
709
710 GST_DATA_QUEUE_MUTEX_LOCK (queue);
711 idx = gst_queue_array_find (priv->queue, is_of_type, GSIZE_TO_POINTER (type));
712
713 if (idx == -1)
714 goto done;
715
716 leak = gst_queue_array_drop_element (priv->queue, idx);
717
718 if (leak->visible)
719 priv->cur_level.visible--;
720 priv->cur_level.bytes -= leak->size;
721 priv->cur_level.time -= leak->duration;
722
723 leak->destroy (leak);
724
725 res = TRUE;
726
727 done:
728 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
729
730 GST_DEBUG ("queue:%p , res:%d", queue, res);
731
732 return res;
733 }
734
735 /**
736 * gst_data_queue_limits_changed: (skip)
737 * @queue: The #GstDataQueue
738 *
739 * Inform the queue that the limits for the fullness check have changed and that
740 * any blocking gst_data_queue_push() should be unblocked to recheck the limits.
741 *
742 * Since: 1.2
743 */
744 void
gst_data_queue_limits_changed(GstDataQueue * queue)745 gst_data_queue_limits_changed (GstDataQueue * queue)
746 {
747 GstDataQueuePrivate *priv = queue->priv;
748
749 g_return_if_fail (GST_IS_DATA_QUEUE (queue));
750
751 GST_DATA_QUEUE_MUTEX_LOCK (queue);
752 if (priv->waiting_del) {
753 GST_DEBUG ("signal del");
754 g_cond_signal (&priv->item_del);
755 }
756 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
757 }
758
759 /**
760 * gst_data_queue_get_level: (skip)
761 * @queue: The #GstDataQueue
762 * @level: (out): the location to store the result
763 *
764 * Get the current level of the queue.
765 *
766 * Since: 1.2
767 */
768 void
gst_data_queue_get_level(GstDataQueue * queue,GstDataQueueSize * level)769 gst_data_queue_get_level (GstDataQueue * queue, GstDataQueueSize * level)
770 {
771 GstDataQueuePrivate *priv = queue->priv;
772
773 memcpy (level, (&priv->cur_level), sizeof (GstDataQueueSize));
774 }
775
776 static void
gst_data_queue_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)777 gst_data_queue_set_property (GObject * object,
778 guint prop_id, const GValue * value, GParamSpec * pspec)
779 {
780 switch (prop_id) {
781 default:
782 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
783 break;
784 }
785 }
786
787 static void
gst_data_queue_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)788 gst_data_queue_get_property (GObject * object,
789 guint prop_id, GValue * value, GParamSpec * pspec)
790 {
791 GstDataQueue *queue = GST_DATA_QUEUE (object);
792 GstDataQueuePrivate *priv = queue->priv;
793
794 GST_DATA_QUEUE_MUTEX_LOCK (queue);
795
796 switch (prop_id) {
797 case PROP_CUR_LEVEL_BYTES:
798 g_value_set_uint (value, priv->cur_level.bytes);
799 break;
800 case PROP_CUR_LEVEL_VISIBLE:
801 g_value_set_uint (value, priv->cur_level.visible);
802 break;
803 case PROP_CUR_LEVEL_TIME:
804 g_value_set_uint64 (value, priv->cur_level.time);
805 break;
806 default:
807 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
808 break;
809 }
810
811 GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
812 }
813