1 /* GStreamer
2 * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3 * 2000 Wim Taymans <wtay@chello.be>
4 * 2003 Colin Walters <cwalters@gnome.org>
5 * 2005 Wim Taymans <wim@fluendo.com>
6 *
7 * gstqueue.c:
8 *
9 * This library is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Library General Public
11 * License as published by the Free Software Foundation; either
12 * version 2 of the License, or (at your option) any later version.
13 *
14 * This library is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 * Library General Public License for more details.
18 *
19 * You should have received a copy of the GNU Library General Public
20 * License along with this library; if not, write to the
21 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22 * Boston, MA 02110-1301, USA.
23 */
24
25 /**
26 * SECTION:element-queue
27 * @title: queue
28 *
29 * Data is queued until one of the limits specified by the
30 * #GstQueue:max-size-buffers, #GstQueue:max-size-bytes and/or
31 * #GstQueue:max-size-time properties has been reached. Any attempt to push
32 * more buffers into the queue will block the pushing thread until more space
33 * becomes available.
34 *
35 * The queue will create a new thread on the source pad to decouple the
36 * processing on sink and source pad.
37 *
38 * You can query how many buffers are queued by reading the
39 * #GstQueue:current-level-buffers property. You can track changes
40 * by connecting to the notify::current-level-buffers signal (which
41 * like all signals will be emitted from the streaming thread). The same
42 * applies to the #GstQueue:current-level-time and
43 * #GstQueue:current-level-bytes properties.
44 *
45 * The default queue size limits are 200 buffers, 10MB of data, or
46 * one second worth of data, whichever is reached first.
47 *
48 * As said earlier, the queue blocks by default when one of the specified
49 * maximums (bytes, time, buffers) has been reached. You can set the
50 * #GstQueue:leaky property to specify that instead of blocking it should
51 * leak (drop) new or old buffers.
52 *
53 * The #GstQueue::underrun signal is emitted when the queue has less data than
54 * the specified minimum thresholds require (by default: when the queue is
55 * empty). The #GstQueue::overrun signal is emitted when the queue is filled
56 * up. Both signals are emitted from the context of the streaming thread.
57 */
58
59 #include "gst/gst_private.h"
60
61 #include <gst/gst.h>
62 #include "gstqueue.h"
63 #include "gstcoreelementselements.h"
64
65 #include "../../gst/gst-i18n-lib.h"
66 #include "../../gst/glib-compat-private.h"
67
68 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
69 GST_PAD_SINK,
70 GST_PAD_ALWAYS,
71 GST_STATIC_CAPS_ANY);
72
73 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
74 GST_PAD_SRC,
75 GST_PAD_ALWAYS,
76 GST_STATIC_CAPS_ANY);
77
78 GST_DEBUG_CATEGORY_STATIC (queue_debug);
79 #define GST_CAT_DEFAULT (queue_debug)
80 GST_DEBUG_CATEGORY_STATIC (queue_dataflow);
81
82 #define STATUS(queue, pad, msg) \
83 GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
84 "(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \
85 "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
86 "-%" G_GUINT64_FORMAT " ns, %u items", \
87 GST_DEBUG_PAD_NAME (pad), \
88 queue->cur_level.buffers, \
89 queue->min_threshold.buffers, \
90 queue->max_size.buffers, \
91 queue->cur_level.bytes, \
92 queue->min_threshold.bytes, \
93 queue->max_size.bytes, \
94 queue->cur_level.time, \
95 queue->min_threshold.time, \
96 queue->max_size.time, \
97 gst_queue_array_get_length (queue->queue))
98
99 /* Queue signals and args */
100 enum
101 {
102 SIGNAL_UNDERRUN,
103 SIGNAL_RUNNING,
104 SIGNAL_OVERRUN,
105 SIGNAL_PUSHING,
106 LAST_SIGNAL
107 };
108
109 enum
110 {
111 PROP_0,
112 /* FIXME: don't we have another way of doing this
113 * "Gstreamer format" (frame/byte/time) queries? */
114 PROP_CUR_LEVEL_BUFFERS,
115 PROP_CUR_LEVEL_BYTES,
116 PROP_CUR_LEVEL_TIME,
117 PROP_MAX_SIZE_BUFFERS,
118 PROP_MAX_SIZE_BYTES,
119 PROP_MAX_SIZE_TIME,
120 PROP_MIN_THRESHOLD_BUFFERS,
121 PROP_MIN_THRESHOLD_BYTES,
122 PROP_MIN_THRESHOLD_TIME,
123 PROP_LEAKY,
124 PROP_SILENT,
125 PROP_FLUSH_ON_EOS
126 };
127
128 /* default property values */
129 #define DEFAULT_MAX_SIZE_BUFFERS 200 /* 200 buffers */
130 #define DEFAULT_MAX_SIZE_BYTES (10 * 1024 * 1024) /* 10 MB */
131 #define DEFAULT_MAX_SIZE_TIME GST_SECOND /* 1 second */
132
133 #define GST_QUEUE_MUTEX_LOCK(q) G_STMT_START { \
134 g_mutex_lock (&q->qlock); \
135 } G_STMT_END
136
137 #define GST_QUEUE_MUTEX_LOCK_CHECK(q,label) G_STMT_START { \
138 GST_QUEUE_MUTEX_LOCK (q); \
139 if (q->srcresult != GST_FLOW_OK) \
140 goto label; \
141 } G_STMT_END
142
143 #define GST_QUEUE_MUTEX_UNLOCK(q) G_STMT_START { \
144 g_mutex_unlock (&q->qlock); \
145 } G_STMT_END
146
147 #define GST_QUEUE_WAIT_DEL_CHECK(q, label) G_STMT_START { \
148 STATUS (q, q->sinkpad, "wait for DEL"); \
149 q->waiting_del = TRUE; \
150 g_cond_wait (&q->item_del, &q->qlock); \
151 q->waiting_del = FALSE; \
152 if (q->srcresult != GST_FLOW_OK) { \
153 STATUS (q, q->srcpad, "received DEL wakeup"); \
154 goto label; \
155 } \
156 STATUS (q, q->sinkpad, "received DEL"); \
157 } G_STMT_END
158
159 #define GST_QUEUE_WAIT_ADD_CHECK(q, label) G_STMT_START { \
160 STATUS (q, q->srcpad, "wait for ADD"); \
161 q->waiting_add = TRUE; \
162 g_cond_wait (&q->item_add, &q->qlock); \
163 q->waiting_add = FALSE; \
164 if (q->srcresult != GST_FLOW_OK) { \
165 STATUS (q, q->srcpad, "received ADD wakeup"); \
166 goto label; \
167 } \
168 STATUS (q, q->srcpad, "received ADD"); \
169 } G_STMT_END
170
171 #define GST_QUEUE_SIGNAL_DEL(q) G_STMT_START { \
172 if (q->waiting_del) { \
173 STATUS (q, q->srcpad, "signal DEL"); \
174 g_cond_signal (&q->item_del); \
175 } \
176 } G_STMT_END
177
178 #define GST_QUEUE_SIGNAL_ADD(q) G_STMT_START { \
179 if (q->waiting_add) { \
180 STATUS (q, q->sinkpad, "signal ADD"); \
181 g_cond_signal (&q->item_add); \
182 } \
183 } G_STMT_END
184
185 #define _do_init \
186 GST_DEBUG_CATEGORY_INIT (queue_debug, "queue", 0, "queue element"); \
187 GST_DEBUG_CATEGORY_INIT (queue_dataflow, "queue_dataflow", 0, \
188 "dataflow inside the queue element");
189 #define gst_queue_parent_class parent_class
190 G_DEFINE_TYPE_WITH_CODE (GstQueue, gst_queue, GST_TYPE_ELEMENT, _do_init);
191 GST_ELEMENT_REGISTER_DEFINE (queue, "queue", GST_RANK_NONE, GST_TYPE_QUEUE);
192
193 static void gst_queue_finalize (GObject * object);
194 static void gst_queue_set_property (GObject * object,
195 guint prop_id, const GValue * value, GParamSpec * pspec);
196 static void gst_queue_get_property (GObject * object,
197 guint prop_id, GValue * value, GParamSpec * pspec);
198
199 static GstFlowReturn gst_queue_chain (GstPad * pad, GstObject * parent,
200 GstBuffer * buffer);
201 static GstFlowReturn gst_queue_chain_list (GstPad * pad, GstObject * parent,
202 GstBufferList * buffer_list);
203 static GstFlowReturn gst_queue_push_one (GstQueue * queue);
204 static void gst_queue_loop (GstPad * pad);
205
206 static GstFlowReturn gst_queue_handle_sink_event (GstPad * pad,
207 GstObject * parent, GstEvent * event);
208 static gboolean gst_queue_handle_sink_query (GstPad * pad, GstObject * parent,
209 GstQuery * query);
210
211 static gboolean gst_queue_handle_src_event (GstPad * pad, GstObject * parent,
212 GstEvent * event);
213 static gboolean gst_queue_handle_src_query (GstPad * pad, GstObject * parent,
214 GstQuery * query);
215
216 static void gst_queue_locked_flush (GstQueue * queue, gboolean full);
217
218 static gboolean gst_queue_src_activate_mode (GstPad * pad, GstObject * parent,
219 GstPadMode mode, gboolean active);
220 static gboolean gst_queue_sink_activate_mode (GstPad * pad, GstObject * parent,
221 GstPadMode mode, gboolean active);
222
223 static gboolean gst_queue_is_empty (GstQueue * queue);
224 static gboolean gst_queue_is_filled (GstQueue * queue);
225
226
227 typedef struct
228 {
229 GstMiniObject *item;
230 gsize size;
231 gboolean is_query;
232 } GstQueueItem;
233
234 #define GST_TYPE_QUEUE_LEAKY (queue_leaky_get_type ())
235
236 static GType
queue_leaky_get_type(void)237 queue_leaky_get_type (void)
238 {
239 static GType queue_leaky_type = 0;
240 static const GEnumValue queue_leaky[] = {
241 {GST_QUEUE_NO_LEAK, "Not Leaky", "no"},
242 {GST_QUEUE_LEAK_UPSTREAM, "Leaky on upstream (new buffers)", "upstream"},
243 {GST_QUEUE_LEAK_DOWNSTREAM, "Leaky on downstream (old buffers)",
244 "downstream"},
245 {0, NULL, NULL},
246 };
247
248 if (!queue_leaky_type) {
249 queue_leaky_type = g_enum_register_static ("GstQueueLeaky", queue_leaky);
250 }
251 return queue_leaky_type;
252 }
253
254 static guint gst_queue_signals[LAST_SIGNAL] = { 0 };
255
256 static void
gst_queue_class_init(GstQueueClass * klass)257 gst_queue_class_init (GstQueueClass * klass)
258 {
259 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
260 GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
261
262 gobject_class->set_property = gst_queue_set_property;
263 gobject_class->get_property = gst_queue_get_property;
264
265 /* signals */
266 /**
267 * GstQueue::underrun:
268 * @queue: the queue instance
269 *
270 * Reports that the buffer became empty (underrun).
271 * A buffer is empty if the total amount of data inside it (num-buffers, time,
272 * size) is lower than the boundary values which can be set through the
273 * GObject properties.
274 */
275 gst_queue_signals[SIGNAL_UNDERRUN] =
276 g_signal_new ("underrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
277 G_STRUCT_OFFSET (GstQueueClass, underrun), NULL, NULL,
278 NULL, G_TYPE_NONE, 0);
279 /**
280 * GstQueue::running:
281 * @queue: the queue instance
282 *
283 * Reports that enough (min-threshold) data is in the queue. Use this signal
284 * together with the underrun signal to pause the pipeline on underrun and
285 * wait for the queue to fill-up before resume playback.
286 */
287 gst_queue_signals[SIGNAL_RUNNING] =
288 g_signal_new ("running", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
289 G_STRUCT_OFFSET (GstQueueClass, running), NULL, NULL,
290 NULL, G_TYPE_NONE, 0);
291 /**
292 * GstQueue::overrun:
293 * @queue: the queue instance
294 *
295 * Reports that the buffer became full (overrun).
296 * A buffer is full if the total amount of data inside it (num-buffers, time,
297 * size) is higher than the boundary values which can be set through the
298 * GObject properties.
299 */
300 gst_queue_signals[SIGNAL_OVERRUN] =
301 g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
302 G_STRUCT_OFFSET (GstQueueClass, overrun), NULL, NULL,
303 NULL, G_TYPE_NONE, 0);
304 /**
305 * GstQueue::pushing:
306 * @queue: the queue instance
307 *
308 * Reports when the queue has enough data to start pushing data again on the
309 * source pad.
310 */
311 gst_queue_signals[SIGNAL_PUSHING] =
312 g_signal_new ("pushing", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
313 G_STRUCT_OFFSET (GstQueueClass, pushing), NULL, NULL,
314 NULL, G_TYPE_NONE, 0);
315
316 /* properties */
317 g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BYTES,
318 g_param_spec_uint ("current-level-bytes", "Current level (kB)",
319 "Current amount of data in the queue (bytes)",
320 0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
321 g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BUFFERS,
322 g_param_spec_uint ("current-level-buffers", "Current level (buffers)",
323 "Current number of buffers in the queue",
324 0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
325 g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_TIME,
326 g_param_spec_uint64 ("current-level-time", "Current level (ns)",
327 "Current amount of data in the queue (in ns)",
328 0, G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
329
330 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
331 g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
332 "Max. amount of data in the queue (bytes, 0=disable)",
333 0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES,
334 G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
335 G_PARAM_STATIC_STRINGS));
336 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BUFFERS,
337 g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
338 "Max. number of buffers in the queue (0=disable)", 0, G_MAXUINT,
339 DEFAULT_MAX_SIZE_BUFFERS,
340 G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
341 G_PARAM_STATIC_STRINGS));
342 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
343 g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
344 "Max. amount of data in the queue (in ns, 0=disable)", 0, G_MAXUINT64,
345 DEFAULT_MAX_SIZE_TIME,
346 G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
347 G_PARAM_STATIC_STRINGS));
348
349 g_object_class_install_property (gobject_class, PROP_MIN_THRESHOLD_BYTES,
350 g_param_spec_uint ("min-threshold-bytes", "Min. threshold (kB)",
351 "Min. amount of data in the queue to allow reading (bytes, 0=disable)",
352 0, G_MAXUINT, 0,
353 G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
354 G_PARAM_STATIC_STRINGS));
355 g_object_class_install_property (gobject_class, PROP_MIN_THRESHOLD_BUFFERS,
356 g_param_spec_uint ("min-threshold-buffers", "Min. threshold (buffers)",
357 "Min. number of buffers in the queue to allow reading (0=disable)", 0,
358 G_MAXUINT, 0,
359 G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
360 G_PARAM_STATIC_STRINGS));
361 g_object_class_install_property (gobject_class, PROP_MIN_THRESHOLD_TIME,
362 g_param_spec_uint64 ("min-threshold-time", "Min. threshold (ns)",
363 "Min. amount of data in the queue to allow reading (in ns, 0=disable)",
364 0, G_MAXUINT64, 0,
365 G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
366 G_PARAM_STATIC_STRINGS));
367
368 g_object_class_install_property (gobject_class, PROP_LEAKY,
369 g_param_spec_enum ("leaky", "Leaky",
370 "Where the queue leaks, if at all",
371 GST_TYPE_QUEUE_LEAKY, GST_QUEUE_NO_LEAK,
372 G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
373 G_PARAM_STATIC_STRINGS));
374
375 /**
376 * GstQueue:silent
377 *
378 * Don't emit queue signals. Makes queues more lightweight if no signals are
379 * needed.
380 */
381 g_object_class_install_property (gobject_class, PROP_SILENT,
382 g_param_spec_boolean ("silent", "Silent",
383 "Don't emit queue signals", FALSE,
384 G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
385 G_PARAM_STATIC_STRINGS));
386
387 /**
388 * queue:flush-on-eos:
389 *
390 * Discard all data in the queue when an EOS event is received, and pass
391 * on the EOS event as soon as possible (instead of waiting until all
392 * buffers in the queue have been processed, which is the default behaviour).
393 *
394 * Flushing the queue on EOS might be useful when capturing and encoding
395 * from a live source, to finish up the recording quickly in cases when
396 * the encoder is slow. Note that this might mean some data from the end of
397 * the recording data might be lost though (never more than the configured
398 * max. sizes though).
399 *
400 * Since: 1.2
401 */
402 g_object_class_install_property (gobject_class, PROP_FLUSH_ON_EOS,
403 g_param_spec_boolean ("flush-on-eos", "Flush on EOS",
404 "Discard all data in the queue when an EOS event is received", FALSE,
405 G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
406 G_PARAM_STATIC_STRINGS));
407
408 gobject_class->finalize = gst_queue_finalize;
409
410 gst_element_class_set_static_metadata (gstelement_class,
411 "Queue",
412 "Generic", "Simple data queue", "Erik Walthinsen <omega@cse.ogi.edu>");
413 gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
414 gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate);
415
416 /* Registering debug symbols for function pointers */
417 GST_DEBUG_REGISTER_FUNCPTR (gst_queue_src_activate_mode);
418 GST_DEBUG_REGISTER_FUNCPTR (gst_queue_handle_sink_event);
419 GST_DEBUG_REGISTER_FUNCPTR (gst_queue_handle_sink_query);
420 GST_DEBUG_REGISTER_FUNCPTR (gst_queue_handle_src_event);
421 GST_DEBUG_REGISTER_FUNCPTR (gst_queue_handle_src_query);
422 GST_DEBUG_REGISTER_FUNCPTR (gst_queue_chain);
423 GST_DEBUG_REGISTER_FUNCPTR (gst_queue_chain_list);
424
425 gst_type_mark_as_plugin_api (GST_TYPE_QUEUE_LEAKY, 0);
426 }
427
428 static void
gst_queue_init(GstQueue * queue)429 gst_queue_init (GstQueue * queue)
430 {
431 queue->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
432
433 gst_pad_set_chain_function (queue->sinkpad, gst_queue_chain);
434 gst_pad_set_chain_list_function (queue->sinkpad, gst_queue_chain_list);
435 gst_pad_set_activatemode_function (queue->sinkpad,
436 gst_queue_sink_activate_mode);
437 gst_pad_set_event_full_function (queue->sinkpad, gst_queue_handle_sink_event);
438 gst_pad_set_query_function (queue->sinkpad, gst_queue_handle_sink_query);
439 GST_PAD_SET_PROXY_CAPS (queue->sinkpad);
440 gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
441
442 queue->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
443
444 gst_pad_set_activatemode_function (queue->srcpad,
445 gst_queue_src_activate_mode);
446 gst_pad_set_event_function (queue->srcpad, gst_queue_handle_src_event);
447 gst_pad_set_query_function (queue->srcpad, gst_queue_handle_src_query);
448 GST_PAD_SET_PROXY_CAPS (queue->srcpad);
449 gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
450
451 GST_QUEUE_CLEAR_LEVEL (queue->cur_level);
452 queue->max_size.buffers = DEFAULT_MAX_SIZE_BUFFERS;
453 queue->max_size.bytes = DEFAULT_MAX_SIZE_BYTES;
454 queue->max_size.time = DEFAULT_MAX_SIZE_TIME;
455 GST_QUEUE_CLEAR_LEVEL (queue->min_threshold);
456 GST_QUEUE_CLEAR_LEVEL (queue->orig_min_threshold);
457 gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
458 gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
459 queue->head_needs_discont = queue->tail_needs_discont = FALSE;
460
461 queue->leaky = GST_QUEUE_NO_LEAK;
462 queue->srcresult = GST_FLOW_FLUSHING;
463
464 g_mutex_init (&queue->qlock);
465 g_cond_init (&queue->item_add);
466 g_cond_init (&queue->item_del);
467 g_cond_init (&queue->query_handled);
468
469 queue->queue =
470 gst_queue_array_new_for_struct (sizeof (GstQueueItem),
471 DEFAULT_MAX_SIZE_BUFFERS * 3 / 2);
472
473 queue->sinktime = GST_CLOCK_STIME_NONE;
474 queue->srctime = GST_CLOCK_STIME_NONE;
475
476 queue->sink_tainted = TRUE;
477 queue->src_tainted = TRUE;
478
479 queue->newseg_applied_to_src = FALSE;
480
481 GST_DEBUG_OBJECT (queue,
482 "initialized queue's not_empty & not_full conditions");
483 }
484
485 /* called only once, as opposed to dispose */
486 static void
gst_queue_finalize(GObject * object)487 gst_queue_finalize (GObject * object)
488 {
489 GstQueue *queue = GST_QUEUE (object);
490 GstQueueItem *qitem;
491
492 GST_DEBUG_OBJECT (queue, "finalizing queue");
493
494 while ((qitem = gst_queue_array_pop_head_struct (queue->queue))) {
495 /* FIXME: if it's a query, shouldn't we unref that too? */
496 if (!qitem->is_query)
497 gst_mini_object_unref (qitem->item);
498 }
499 gst_queue_array_free (queue->queue);
500
501 g_mutex_clear (&queue->qlock);
502 g_cond_clear (&queue->item_add);
503 g_cond_clear (&queue->item_del);
504 g_cond_clear (&queue->query_handled);
505
506 G_OBJECT_CLASS (parent_class)->finalize (object);
507 }
508
509 /* Convenience function */
510 static inline GstClockTimeDiff
my_segment_to_running_time(GstSegment * segment,GstClockTime val)511 my_segment_to_running_time (GstSegment * segment, GstClockTime val)
512 {
513 GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
514
515 if (GST_CLOCK_TIME_IS_VALID (val)) {
516 gboolean sign =
517 gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
518 if (sign > 0)
519 res = val;
520 else if (sign < 0)
521 res = -val;
522 }
523 return res;
524 }
525
526 /* calculate the diff between running time on the sink and src of the queue.
527 * This is the total amount of time in the queue. */
528 static void
update_time_level(GstQueue * queue)529 update_time_level (GstQueue * queue)
530 {
531 gint64 sink_time, src_time;
532
533 if (queue->sink_tainted) {
534 GST_LOG_OBJECT (queue, "update sink time");
535 queue->sinktime =
536 my_segment_to_running_time (&queue->sink_segment,
537 queue->sink_segment.position);
538 queue->sink_tainted = FALSE;
539 }
540 sink_time = queue->sinktime;
541
542 if (queue->src_tainted) {
543 GST_LOG_OBJECT (queue, "update src time");
544 queue->srctime =
545 my_segment_to_running_time (&queue->src_segment,
546 queue->src_segment.position);
547 queue->src_tainted = FALSE;
548 }
549 src_time = queue->srctime;
550
551 GST_LOG_OBJECT (queue, "sink %" GST_STIME_FORMAT ", src %" GST_STIME_FORMAT,
552 GST_STIME_ARGS (sink_time), GST_STIME_ARGS (src_time));
553
554 if (GST_CLOCK_STIME_IS_VALID (src_time)
555 && GST_CLOCK_STIME_IS_VALID (sink_time) && sink_time >= src_time)
556 queue->cur_level.time = sink_time - src_time;
557 else
558 queue->cur_level.time = 0;
559 }
560
561 /* take a SEGMENT event and apply the values to segment, updating the time
562 * level of queue. */
563 static void
apply_segment(GstQueue * queue,GstEvent * event,GstSegment * segment,gboolean sink)564 apply_segment (GstQueue * queue, GstEvent * event, GstSegment * segment,
565 gboolean sink)
566 {
567 gst_event_copy_segment (event, segment);
568
569 /* now configure the values, we use these to track timestamps on the
570 * sinkpad. */
571 if (segment->format != GST_FORMAT_TIME) {
572 /* non-time format, pretent the current time segment is closed with a
573 * 0 start and unknown stop time. */
574 segment->format = GST_FORMAT_TIME;
575 segment->start = 0;
576 segment->stop = -1;
577 segment->time = 0;
578 }
579 if (sink)
580 queue->sink_tainted = TRUE;
581 else
582 queue->src_tainted = TRUE;
583
584 GST_DEBUG_OBJECT (queue, "configured SEGMENT %" GST_SEGMENT_FORMAT, segment);
585
586 /* segment can update the time level of the queue */
587 update_time_level (queue);
588 }
589
590 static void
apply_gap(GstQueue * queue,GstEvent * event,GstSegment * segment,gboolean is_sink)591 apply_gap (GstQueue * queue, GstEvent * event,
592 GstSegment * segment, gboolean is_sink)
593 {
594 GstClockTime timestamp;
595 GstClockTime duration;
596
597 gst_event_parse_gap (event, ×tamp, &duration);
598
599 if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
600
601 if (GST_CLOCK_TIME_IS_VALID (duration)) {
602 timestamp += duration;
603 }
604
605 segment->position = timestamp;
606
607 if (is_sink)
608 queue->sink_tainted = TRUE;
609 else
610 queue->src_tainted = TRUE;
611
612 /* calc diff with other end */
613 update_time_level (queue);
614 }
615 }
616
617
618 /* take a buffer and update segment, updating the time level of the queue. */
619 static void
apply_buffer(GstQueue * queue,GstBuffer * buffer,GstSegment * segment,gboolean sink)620 apply_buffer (GstQueue * queue, GstBuffer * buffer, GstSegment * segment,
621 gboolean sink)
622 {
623 GstClockTime duration, timestamp;
624
625 timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
626 duration = GST_BUFFER_DURATION (buffer);
627
628 /* if no timestamp is set, assume it's continuous with the previous
629 * time */
630 if (timestamp == GST_CLOCK_TIME_NONE)
631 timestamp = segment->position;
632
633 /* add duration */
634 if (duration != GST_CLOCK_TIME_NONE)
635 timestamp += duration;
636
637 GST_LOG_OBJECT (queue, "%s position updated to %" GST_TIME_FORMAT,
638 segment == &queue->sink_segment ? "sink" : "src",
639 GST_TIME_ARGS (timestamp));
640
641 segment->position = timestamp;
642 if (sink)
643 queue->sink_tainted = TRUE;
644 else
645 queue->src_tainted = TRUE;
646
647
648 /* calc diff with other end */
649 update_time_level (queue);
650 }
651
652 static gboolean
buffer_list_apply_time(GstBuffer ** buf,guint idx,gpointer user_data)653 buffer_list_apply_time (GstBuffer ** buf, guint idx, gpointer user_data)
654 {
655 GstClockTime *timestamp = user_data;
656 GstClockTime btime;
657
658 GST_TRACE ("buffer %u has pts %" GST_TIME_FORMAT " dts %" GST_TIME_FORMAT
659 " duration %" GST_TIME_FORMAT, idx, GST_TIME_ARGS (GST_BUFFER_DTS (*buf)),
660 GST_TIME_ARGS (GST_BUFFER_PTS (*buf)),
661 GST_TIME_ARGS (GST_BUFFER_DURATION (*buf)));
662
663 btime = GST_BUFFER_DTS_OR_PTS (*buf);
664 if (GST_CLOCK_TIME_IS_VALID (btime))
665 *timestamp = btime;
666
667 if (GST_BUFFER_DURATION_IS_VALID (*buf))
668 *timestamp += GST_BUFFER_DURATION (*buf);
669
670 GST_TRACE ("ts now %" GST_TIME_FORMAT, GST_TIME_ARGS (*timestamp));
671
672 return TRUE;
673 }
674
675 /* take a buffer list and update segment, updating the time level of the queue */
676 static void
apply_buffer_list(GstQueue * queue,GstBufferList * buffer_list,GstSegment * segment,gboolean sink)677 apply_buffer_list (GstQueue * queue, GstBufferList * buffer_list,
678 GstSegment * segment, gboolean sink)
679 {
680 GstClockTime timestamp;
681
682 /* if no timestamp is set, assume it's continuous with the previous time */
683 timestamp = segment->position;
684
685 gst_buffer_list_foreach (buffer_list, buffer_list_apply_time, ×tamp);
686
687 GST_DEBUG_OBJECT (queue, "position updated to %" GST_TIME_FORMAT,
688 GST_TIME_ARGS (timestamp));
689
690 segment->position = timestamp;
691
692 if (sink)
693 queue->sink_tainted = TRUE;
694 else
695 queue->src_tainted = TRUE;
696
697 /* calc diff with other end */
698 update_time_level (queue);
699 }
700
701 static void
gst_queue_locked_flush(GstQueue * queue,gboolean full)702 gst_queue_locked_flush (GstQueue * queue, gboolean full)
703 {
704 GstQueueItem *qitem;
705
706 while ((qitem = gst_queue_array_pop_head_struct (queue->queue))) {
707 /* Then lose another reference because we are supposed to destroy that
708 data when flushing */
709 if (!full && !qitem->is_query && GST_IS_EVENT (qitem->item)
710 && GST_EVENT_IS_STICKY (qitem->item)
711 && GST_EVENT_TYPE (qitem->item) != GST_EVENT_SEGMENT
712 && GST_EVENT_TYPE (qitem->item) != GST_EVENT_EOS) {
713 gst_pad_store_sticky_event (queue->srcpad, GST_EVENT_CAST (qitem->item));
714 }
715 if (!qitem->is_query)
716 gst_mini_object_unref (qitem->item);
717 memset (qitem, 0, sizeof (GstQueueItem));
718 }
719 queue->last_query = FALSE;
720 g_cond_signal (&queue->query_handled);
721 GST_QUEUE_CLEAR_LEVEL (queue->cur_level);
722 queue->min_threshold.buffers = queue->orig_min_threshold.buffers;
723 queue->min_threshold.bytes = queue->orig_min_threshold.bytes;
724 queue->min_threshold.time = queue->orig_min_threshold.time;
725 gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
726 gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
727 queue->head_needs_discont = queue->tail_needs_discont = FALSE;
728
729 queue->sinktime = queue->srctime = GST_CLOCK_STIME_NONE;
730 queue->sink_tainted = queue->src_tainted = TRUE;
731
732 /* we deleted a lot of something */
733 GST_QUEUE_SIGNAL_DEL (queue);
734 }
735
736 /* enqueue an item an update the level stats, with QUEUE_LOCK */
737 static inline void
gst_queue_locked_enqueue_buffer(GstQueue * queue,gpointer item)738 gst_queue_locked_enqueue_buffer (GstQueue * queue, gpointer item)
739 {
740 GstQueueItem qitem;
741 GstBuffer *buffer = GST_BUFFER_CAST (item);
742 gsize bsize = gst_buffer_get_size (buffer);
743
744 /* add buffer to the statistics */
745 queue->cur_level.buffers++;
746 queue->cur_level.bytes += bsize;
747 apply_buffer (queue, buffer, &queue->sink_segment, TRUE);
748
749 qitem.item = item;
750 qitem.is_query = FALSE;
751 qitem.size = bsize;
752 gst_queue_array_push_tail_struct (queue->queue, &qitem);
753 GST_QUEUE_SIGNAL_ADD (queue);
754 }
755
756 static inline void
gst_queue_locked_enqueue_buffer_list(GstQueue * queue,gpointer item)757 gst_queue_locked_enqueue_buffer_list (GstQueue * queue, gpointer item)
758 {
759 GstQueueItem qitem;
760 GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (item);
761 gsize bsize;
762
763 bsize = gst_buffer_list_calculate_size (buffer_list);
764
765 /* add buffer to the statistics */
766 queue->cur_level.buffers += gst_buffer_list_length (buffer_list);
767 queue->cur_level.bytes += bsize;
768 apply_buffer_list (queue, buffer_list, &queue->sink_segment, TRUE);
769
770 qitem.item = item;
771 qitem.is_query = FALSE;
772 qitem.size = bsize;
773 gst_queue_array_push_tail_struct (queue->queue, &qitem);
774 GST_QUEUE_SIGNAL_ADD (queue);
775 }
776
777 static inline void
gst_queue_locked_enqueue_event(GstQueue * queue,gpointer item)778 gst_queue_locked_enqueue_event (GstQueue * queue, gpointer item)
779 {
780 GstQueueItem qitem;
781 GstEvent *event = GST_EVENT_CAST (item);
782
783 switch (GST_EVENT_TYPE (event)) {
784 case GST_EVENT_EOS:
785 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "got EOS from upstream");
786 /* Zero the thresholds, this makes sure the queue is completely
787 * filled and we can read all data from the queue. */
788 if (queue->flush_on_eos)
789 gst_queue_locked_flush (queue, FALSE);
790 else
791 GST_QUEUE_CLEAR_LEVEL (queue->min_threshold);
792 /* mark the queue as EOS. This prevents us from accepting more data. */
793 queue->eos = TRUE;
794 break;
795 case GST_EVENT_SEGMENT:
796 apply_segment (queue, event, &queue->sink_segment, TRUE);
797 /* if the queue is empty, apply sink segment on the source */
798 if (gst_queue_array_is_empty (queue->queue)) {
799 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "Apply segment on srcpad");
800 apply_segment (queue, event, &queue->src_segment, FALSE);
801 queue->newseg_applied_to_src = TRUE;
802 }
803 /* a new segment allows us to accept more buffers if we got EOS
804 * from downstream */
805 queue->unexpected = FALSE;
806 break;
807 case GST_EVENT_GAP:
808 apply_gap (queue, event, &queue->sink_segment, TRUE);
809 break;
810 default:
811 break;
812 }
813
814 qitem.item = item;
815 qitem.is_query = FALSE;
816 qitem.size = 0;
817 gst_queue_array_push_tail_struct (queue->queue, &qitem);
818 GST_QUEUE_SIGNAL_ADD (queue);
819 }
820
821 /* dequeue an item from the queue and update level stats, with QUEUE_LOCK */
822 static GstMiniObject *
gst_queue_locked_dequeue(GstQueue * queue)823 gst_queue_locked_dequeue (GstQueue * queue)
824 {
825 GstQueueItem *qitem;
826 GstMiniObject *item;
827 gsize bufsize;
828
829 qitem = gst_queue_array_pop_head_struct (queue->queue);
830 if (qitem == NULL)
831 goto no_item;
832
833 item = qitem->item;
834 bufsize = qitem->size;
835
836 if (GST_IS_BUFFER (item)) {
837 GstBuffer *buffer = GST_BUFFER_CAST (item);
838
839 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
840 "retrieved buffer %p from queue", buffer);
841
842 queue->cur_level.buffers--;
843 queue->cur_level.bytes -= bufsize;
844 apply_buffer (queue, buffer, &queue->src_segment, FALSE);
845
846 /* if the queue is empty now, update the other side */
847 if (queue->cur_level.buffers == 0)
848 queue->cur_level.time = 0;
849 } else if (GST_IS_BUFFER_LIST (item)) {
850 GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (item);
851
852 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
853 "retrieved buffer list %p from queue", buffer_list);
854
855 queue->cur_level.buffers -= gst_buffer_list_length (buffer_list);
856 queue->cur_level.bytes -= bufsize;
857 apply_buffer_list (queue, buffer_list, &queue->src_segment, FALSE);
858
859 /* if the queue is empty now, update the other side */
860 if (queue->cur_level.buffers == 0)
861 queue->cur_level.time = 0;
862 } else if (GST_IS_EVENT (item)) {
863 GstEvent *event = GST_EVENT_CAST (item);
864
865 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
866 "retrieved event %p from queue", event);
867
868 switch (GST_EVENT_TYPE (event)) {
869 case GST_EVENT_EOS:
870 /* queue is empty now that we dequeued the EOS */
871 GST_QUEUE_CLEAR_LEVEL (queue->cur_level);
872 break;
873 case GST_EVENT_SEGMENT:
874 /* apply newsegment if it has not already been applied */
875 if (G_LIKELY (!queue->newseg_applied_to_src)) {
876 apply_segment (queue, event, &queue->src_segment, FALSE);
877 } else {
878 queue->newseg_applied_to_src = FALSE;
879 }
880 break;
881 case GST_EVENT_GAP:
882 apply_gap (queue, event, &queue->src_segment, FALSE);
883 break;
884 default:
885 break;
886 }
887 } else if (GST_IS_QUERY (item)) {
888 GstQuery *query = GST_QUERY_CAST (item);
889
890 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
891 "retrieved query %p from queue", query);
892 } else {
893 g_warning
894 ("Unexpected item %p dequeued from queue %s (refcounting problem?)",
895 item, GST_OBJECT_NAME (queue));
896 item = NULL;
897 }
898 GST_QUEUE_SIGNAL_DEL (queue);
899
900 return item;
901
902 /* ERRORS */
903 no_item:
904 {
905 GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "the queue is empty");
906 return NULL;
907 }
908 }
909
910 static GstFlowReturn
gst_queue_handle_sink_event(GstPad * pad,GstObject * parent,GstEvent * event)911 gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
912 {
913 gboolean ret = TRUE;
914 GstQueue *queue;
915
916 queue = GST_QUEUE (parent);
917
918 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "Received event '%s'",
919 GST_EVENT_TYPE_NAME (event));
920
921 switch (GST_EVENT_TYPE (event)) {
922 case GST_EVENT_FLUSH_START:
923 /* forward event */
924 ret = gst_pad_push_event (queue->srcpad, event);
925
926 /* now unblock the chain function */
927 GST_QUEUE_MUTEX_LOCK (queue);
928 queue->srcresult = GST_FLOW_FLUSHING;
929 /* unblock the loop and chain functions */
930 GST_QUEUE_SIGNAL_ADD (queue);
931 GST_QUEUE_SIGNAL_DEL (queue);
932 GST_QUEUE_MUTEX_UNLOCK (queue);
933
934 /* make sure it pauses, this should happen since we sent
935 * flush_start downstream. */
936 gst_pad_pause_task (queue->srcpad);
937 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
938
939 /* unblock query handler after the streaming thread is shut down.
940 * Otherwise downstream might have a query that is already unreffed
941 * upstream */
942 GST_QUEUE_MUTEX_LOCK (queue);
943 queue->last_query = FALSE;
944 g_cond_signal (&queue->query_handled);
945 GST_QUEUE_MUTEX_UNLOCK (queue);
946 break;
947 case GST_EVENT_FLUSH_STOP:
948 /* forward event */
949 ret = gst_pad_push_event (queue->srcpad, event);
950
951 GST_QUEUE_MUTEX_LOCK (queue);
952 gst_queue_locked_flush (queue, FALSE);
953 queue->srcresult = GST_FLOW_OK;
954 queue->eos = FALSE;
955 queue->unexpected = FALSE;
956 if (gst_pad_is_active (queue->srcpad)) {
957 gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop,
958 queue->srcpad, NULL);
959 } else {
960 GST_INFO_OBJECT (queue->srcpad, "not re-starting task on srcpad, "
961 "pad not active any longer");
962 }
963 GST_QUEUE_MUTEX_UNLOCK (queue);
964
965 STATUS (queue, pad, "after flush");
966 break;
967 default:
968 if (GST_EVENT_IS_SERIALIZED (event)) {
969 /* serialized events go in the queue */
970 GST_QUEUE_MUTEX_LOCK (queue);
971
972 /* STREAM_START and SEGMENT reset the EOS status of a
973 * pad. Change the cached sinkpad flow result accordingly */
974 if (queue->srcresult == GST_FLOW_EOS
975 && (GST_EVENT_TYPE (event) == GST_EVENT_STREAM_START
976 || GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT))
977 queue->srcresult = GST_FLOW_OK;
978
979 if (queue->srcresult != GST_FLOW_OK) {
980 /* Errors in sticky event pushing are no problem and ignored here
981 * as they will cause more meaningful errors during data flow.
982 * For EOS events, that are not followed by data flow, we still
983 * return FALSE here though and report an error.
984 */
985 if (!GST_EVENT_IS_STICKY (event)) {
986 GST_QUEUE_MUTEX_UNLOCK (queue);
987 goto out_flow_error;
988 } else if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
989 if (queue->srcresult == GST_FLOW_NOT_LINKED
990 || queue->srcresult < GST_FLOW_EOS) {
991 GST_QUEUE_MUTEX_UNLOCK (queue);
992 GST_ELEMENT_FLOW_ERROR (queue, queue->srcresult);
993 } else {
994 GST_QUEUE_MUTEX_UNLOCK (queue);
995 }
996 goto out_flow_error;
997 }
998 }
999
1000 /* refuse more events on EOS unless they unset the EOS status */
1001 if (queue->eos) {
1002 switch (GST_EVENT_TYPE (event)) {
1003 case GST_EVENT_STREAM_START:
1004 case GST_EVENT_SEGMENT:
1005 /* Restart the loop */
1006 if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
1007 queue->srcresult = GST_FLOW_OK;
1008 queue->eos = FALSE;
1009 queue->unexpected = FALSE;
1010 gst_pad_start_task (queue->srcpad,
1011 (GstTaskFunction) gst_queue_loop, queue->srcpad, NULL);
1012 } else {
1013 queue->eos = FALSE;
1014 queue->unexpected = FALSE;
1015 }
1016
1017 break;
1018 default:
1019 goto out_eos;
1020 }
1021 }
1022
1023 gst_queue_locked_enqueue_event (queue, event);
1024 GST_QUEUE_MUTEX_UNLOCK (queue);
1025 } else {
1026 /* non-serialized events are forwarded downstream immediately */
1027 ret = gst_pad_push_event (queue->srcpad, event);
1028 }
1029 break;
1030 }
1031 if (ret == FALSE) {
1032 GST_ERROR_OBJECT (queue, "Failed to push event");
1033 return GST_FLOW_ERROR;
1034 }
1035 return GST_FLOW_OK;
1036
1037 /* ERRORS */
1038 out_eos:
1039 {
1040 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "refusing event, we are EOS");
1041 GST_QUEUE_MUTEX_UNLOCK (queue);
1042 gst_event_unref (event);
1043 return GST_FLOW_EOS;
1044 }
1045 out_flow_error:
1046 {
1047 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1048 "refusing event, we have a downstream flow error: %s",
1049 gst_flow_get_name (queue->srcresult));
1050 gst_event_unref (event);
1051 return queue->srcresult;
1052 }
1053 }
1054
1055 static gboolean
gst_queue_handle_sink_query(GstPad * pad,GstObject * parent,GstQuery * query)1056 gst_queue_handle_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
1057 {
1058 GstQueue *queue = GST_QUEUE_CAST (parent);
1059 gboolean res;
1060
1061 switch (GST_QUERY_TYPE (query)) {
1062 default:
1063 if (G_UNLIKELY (GST_QUERY_IS_SERIALIZED (query))) {
1064 GstQueueItem qitem;
1065
1066 GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
1067 GST_LOG_OBJECT (queue, "queuing query %p (%s)", query,
1068 GST_QUERY_TYPE_NAME (query));
1069 qitem.item = GST_MINI_OBJECT_CAST (query);
1070 qitem.is_query = TRUE;
1071 qitem.size = 0;
1072 gst_queue_array_push_tail_struct (queue->queue, &qitem);
1073 GST_QUEUE_SIGNAL_ADD (queue);
1074 while (queue->srcresult == GST_FLOW_OK &&
1075 queue->last_handled_query != query)
1076 g_cond_wait (&queue->query_handled, &queue->qlock);
1077 queue->last_handled_query = NULL;
1078 if (queue->srcresult != GST_FLOW_OK)
1079 goto out_flushing;
1080 res = queue->last_query;
1081 GST_QUEUE_MUTEX_UNLOCK (queue);
1082 } else {
1083 res = gst_pad_query_default (pad, parent, query);
1084 }
1085 break;
1086 }
1087 return res;
1088
1089 /* ERRORS */
1090 out_flushing:
1091 {
1092 GST_DEBUG_OBJECT (queue, "we are flushing");
1093 GST_QUEUE_MUTEX_UNLOCK (queue);
1094 return FALSE;
1095 }
1096 }
1097
1098 static gboolean
gst_queue_is_empty(GstQueue * queue)1099 gst_queue_is_empty (GstQueue * queue)
1100 {
1101 GstQueueItem *tail;
1102
1103 tail = gst_queue_array_peek_tail_struct (queue->queue);
1104
1105 if (tail == NULL)
1106 return TRUE;
1107
1108 /* Only consider the queue empty if the minimum thresholds
1109 * are not reached and data is at the queue tail. Otherwise
1110 * we would block forever on serialized queries.
1111 */
1112 if (!GST_IS_BUFFER (tail->item) && !GST_IS_BUFFER_LIST (tail->item))
1113 return FALSE;
1114
1115 /* It is possible that a max size is reached before all min thresholds are.
1116 * Therefore, only consider it empty if it is not filled. */
1117 return ((queue->min_threshold.buffers > 0 &&
1118 queue->cur_level.buffers < queue->min_threshold.buffers) ||
1119 (queue->min_threshold.bytes > 0 &&
1120 queue->cur_level.bytes < queue->min_threshold.bytes) ||
1121 (queue->min_threshold.time > 0 &&
1122 queue->cur_level.time < queue->min_threshold.time)) &&
1123 !gst_queue_is_filled (queue);
1124 }
1125
1126 static gboolean
gst_queue_is_filled(GstQueue * queue)1127 gst_queue_is_filled (GstQueue * queue)
1128 {
1129 return (((queue->max_size.buffers > 0 &&
1130 queue->cur_level.buffers >= queue->max_size.buffers) ||
1131 (queue->max_size.bytes > 0 &&
1132 queue->cur_level.bytes >= queue->max_size.bytes) ||
1133 (queue->max_size.time > 0 &&
1134 queue->cur_level.time >= queue->max_size.time)));
1135 }
1136
1137 static void
gst_queue_leak_downstream(GstQueue * queue)1138 gst_queue_leak_downstream (GstQueue * queue)
1139 {
1140 /* for as long as the queue is filled, dequeue an item and discard it */
1141 while (gst_queue_is_filled (queue)) {
1142 GstMiniObject *leak;
1143
1144 leak = gst_queue_locked_dequeue (queue);
1145 /* there is nothing to dequeue and the queue is still filled.. This should
1146 * not happen */
1147 g_assert (leak != NULL);
1148
1149 GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
1150 "queue is full, leaking item %p on downstream end", leak);
1151 if (GST_IS_EVENT (leak) && GST_EVENT_IS_STICKY (leak)) {
1152 GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
1153 "Storing sticky event %s on srcpad", GST_EVENT_TYPE_NAME (leak));
1154 gst_pad_store_sticky_event (queue->srcpad, GST_EVENT_CAST (leak));
1155 }
1156
1157 if (!GST_IS_QUERY (leak))
1158 gst_mini_object_unref (leak);
1159
1160 /* last buffer needs to get a DISCONT flag */
1161 queue->head_needs_discont = TRUE;
1162 }
1163 }
1164
1165 static gboolean
discont_first_buffer(GstBuffer ** buffer,guint i,gpointer user_data)1166 discont_first_buffer (GstBuffer ** buffer, guint i, gpointer user_data)
1167 {
1168 GstQueue *queue = user_data;
1169 GstBuffer *subbuffer = gst_buffer_make_writable (*buffer);
1170
1171 if (subbuffer) {
1172 *buffer = subbuffer;
1173 GST_BUFFER_FLAG_SET (*buffer, GST_BUFFER_FLAG_DISCONT);
1174 } else {
1175 GST_DEBUG_OBJECT (queue, "Could not mark buffer as DISCONT");
1176 }
1177
1178 return FALSE;
1179 }
1180
1181 static GstFlowReturn
gst_queue_chain_buffer_or_list(GstPad * pad,GstObject * parent,GstMiniObject * obj,gboolean is_list)1182 gst_queue_chain_buffer_or_list (GstPad * pad, GstObject * parent,
1183 GstMiniObject * obj, gboolean is_list)
1184 {
1185 GstQueue *queue;
1186
1187 queue = GST_QUEUE_CAST (parent);
1188
1189 /* we have to lock the queue since we span threads */
1190 GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
1191 /* when we received EOS, we refuse any more data */
1192 if (queue->eos)
1193 goto out_eos;
1194 if (queue->unexpected)
1195 goto out_unexpected;
1196
1197 if (!is_list) {
1198 GstClockTime duration, timestamp;
1199 GstBuffer *buffer = GST_BUFFER_CAST (obj);
1200
1201 timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
1202 duration = GST_BUFFER_DURATION (buffer);
1203
1204 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received buffer %p of size %"
1205 G_GSIZE_FORMAT ", time %" GST_TIME_FORMAT ", duration %"
1206 GST_TIME_FORMAT, buffer, gst_buffer_get_size (buffer),
1207 GST_TIME_ARGS (timestamp), GST_TIME_ARGS (duration));
1208 } else {
1209 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1210 "received buffer list %p with %u buffers", obj,
1211 gst_buffer_list_length (GST_BUFFER_LIST_CAST (obj)));
1212 }
1213
1214 /* We make space available if we're "full" according to whatever
1215 * the user defined as "full". Note that this only applies to buffers.
1216 * We always handle events and they don't count in our statistics. */
1217 while (gst_queue_is_filled (queue)) {
1218 if (!queue->silent) {
1219 GST_QUEUE_MUTEX_UNLOCK (queue);
1220 g_signal_emit (queue, gst_queue_signals[SIGNAL_OVERRUN], 0);
1221 GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
1222 /* we recheck, the signal could have changed the thresholds */
1223 if (!gst_queue_is_filled (queue))
1224 break;
1225 }
1226
1227 /* how are we going to make space for this buffer? */
1228 switch (queue->leaky) {
1229 case GST_QUEUE_LEAK_UPSTREAM:
1230 /* next buffer needs to get a DISCONT flag */
1231 queue->tail_needs_discont = TRUE;
1232 /* leak current buffer */
1233 GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
1234 "queue is full, leaking buffer on upstream end");
1235 /* now we can clean up and exit right away */
1236 goto out_unref;
1237 case GST_QUEUE_LEAK_DOWNSTREAM:
1238 gst_queue_leak_downstream (queue);
1239 break;
1240 default:
1241 g_warning ("Unknown leaky type, using default");
1242 /* fall-through */
1243 case GST_QUEUE_NO_LEAK:
1244 {
1245 GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
1246 "queue is full, waiting for free space");
1247
1248 /* don't leak. Instead, wait for space to be available */
1249 /* for as long as the queue is filled, wait till an item was deleted. */
1250 while (gst_queue_is_filled (queue)) {
1251 GST_QUEUE_WAIT_DEL_CHECK (queue, out_flushing);
1252 };
1253
1254 GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "queue is not full");
1255
1256 if (!queue->silent) {
1257 GST_QUEUE_MUTEX_UNLOCK (queue);
1258 g_signal_emit (queue, gst_queue_signals[SIGNAL_RUNNING], 0);
1259 GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
1260 }
1261 break;
1262 }
1263 }
1264 }
1265
1266 if (queue->tail_needs_discont) {
1267 if (!is_list) {
1268 GstBuffer *buffer = GST_BUFFER_CAST (obj);
1269 GstBuffer *subbuffer = gst_buffer_make_writable (buffer);
1270
1271 if (subbuffer) {
1272 buffer = subbuffer;
1273 GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
1274 } else {
1275 GST_DEBUG_OBJECT (queue, "Could not mark buffer as DISCONT");
1276 }
1277
1278 obj = GST_MINI_OBJECT_CAST (buffer);
1279 } else {
1280 GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (obj);
1281
1282 buffer_list = gst_buffer_list_make_writable (buffer_list);
1283 gst_buffer_list_foreach (buffer_list, discont_first_buffer, queue);
1284 obj = GST_MINI_OBJECT_CAST (buffer_list);
1285 }
1286 queue->tail_needs_discont = FALSE;
1287 }
1288
1289 /* put buffer in queue now */
1290 if (is_list)
1291 gst_queue_locked_enqueue_buffer_list (queue, obj);
1292 else
1293 gst_queue_locked_enqueue_buffer (queue, obj);
1294 GST_QUEUE_MUTEX_UNLOCK (queue);
1295
1296 return GST_FLOW_OK;
1297
1298 /* special conditions */
1299 out_unref:
1300 {
1301 GST_QUEUE_MUTEX_UNLOCK (queue);
1302
1303 gst_mini_object_unref (obj);
1304
1305 return GST_FLOW_OK;
1306 }
1307 out_flushing:
1308 {
1309 GstFlowReturn ret = queue->srcresult;
1310
1311 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1312 "exit because task paused, reason: %s", gst_flow_get_name (ret));
1313 GST_QUEUE_MUTEX_UNLOCK (queue);
1314 gst_mini_object_unref (obj);
1315
1316 return ret;
1317 }
1318 out_eos:
1319 {
1320 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
1321 GST_QUEUE_MUTEX_UNLOCK (queue);
1322
1323 gst_mini_object_unref (obj);
1324
1325 return GST_FLOW_EOS;
1326 }
1327 out_unexpected:
1328 {
1329 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
1330 GST_QUEUE_MUTEX_UNLOCK (queue);
1331
1332 gst_mini_object_unref (obj);
1333
1334 return GST_FLOW_EOS;
1335 }
1336 }
1337
1338 static GstFlowReturn
gst_queue_chain_list(GstPad * pad,GstObject * parent,GstBufferList * buffer_list)1339 gst_queue_chain_list (GstPad * pad, GstObject * parent,
1340 GstBufferList * buffer_list)
1341 {
1342 return gst_queue_chain_buffer_or_list (pad, parent,
1343 GST_MINI_OBJECT_CAST (buffer_list), TRUE);
1344 }
1345
1346 static GstFlowReturn
gst_queue_chain(GstPad * pad,GstObject * parent,GstBuffer * buffer)1347 gst_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
1348 {
1349 return gst_queue_chain_buffer_or_list (pad, parent,
1350 GST_MINI_OBJECT_CAST (buffer), FALSE);
1351 }
1352
1353 /* dequeue an item from the queue an push it downstream. This functions returns
1354 * the result of the push. */
1355 static GstFlowReturn
gst_queue_push_one(GstQueue * queue)1356 gst_queue_push_one (GstQueue * queue)
1357 {
1358 GstFlowReturn result = queue->srcresult;
1359 GstMiniObject *data;
1360 gboolean is_list;
1361
1362 data = gst_queue_locked_dequeue (queue);
1363 if (data == NULL)
1364 goto no_item;
1365
1366 next:
1367 is_list = GST_IS_BUFFER_LIST (data);
1368
1369 if (GST_IS_BUFFER (data) || is_list) {
1370 if (!is_list) {
1371 GstBuffer *buffer;
1372
1373 buffer = GST_BUFFER_CAST (data);
1374
1375 if (queue->head_needs_discont) {
1376 GstBuffer *subbuffer = gst_buffer_make_writable (buffer);
1377
1378 if (subbuffer) {
1379 buffer = subbuffer;
1380 GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
1381 } else {
1382 GST_DEBUG_OBJECT (queue, "Could not mark buffer as DISCONT");
1383 }
1384 queue->head_needs_discont = FALSE;
1385 }
1386
1387 GST_QUEUE_MUTEX_UNLOCK (queue);
1388 result = gst_pad_push (queue->srcpad, buffer);
1389 } else {
1390 GstBufferList *buffer_list;
1391
1392 buffer_list = GST_BUFFER_LIST_CAST (data);
1393
1394 if (queue->head_needs_discont) {
1395 buffer_list = gst_buffer_list_make_writable (buffer_list);
1396 gst_buffer_list_foreach (buffer_list, discont_first_buffer, queue);
1397 queue->head_needs_discont = FALSE;
1398 }
1399
1400 GST_QUEUE_MUTEX_UNLOCK (queue);
1401 result = gst_pad_push_list (queue->srcpad, buffer_list);
1402 }
1403
1404 /* need to check for srcresult here as well */
1405 GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
1406
1407 if (result == GST_FLOW_EOS) {
1408 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "got EOS from downstream");
1409 /* stop pushing buffers, we dequeue all items until we see an item that we
1410 * can push again, which is EOS or SEGMENT. If there is nothing in the
1411 * queue we can push, we set a flag to make the sinkpad refuse more
1412 * buffers with an EOS return value. */
1413 while ((data = gst_queue_locked_dequeue (queue))) {
1414 if (GST_IS_BUFFER (data)) {
1415 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1416 "dropping EOS buffer %p", data);
1417 gst_buffer_unref (GST_BUFFER_CAST (data));
1418 } else if (GST_IS_BUFFER_LIST (data)) {
1419 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1420 "dropping EOS buffer list %p", data);
1421 gst_buffer_list_unref (GST_BUFFER_LIST_CAST (data));
1422 } else if (GST_IS_EVENT (data)) {
1423 GstEvent *event = GST_EVENT_CAST (data);
1424 GstEventType type = GST_EVENT_TYPE (event);
1425
1426 if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT
1427 || type == GST_EVENT_STREAM_START) {
1428 /* we found a pushable item in the queue, push it out */
1429 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1430 "pushing pushable event %s after EOS",
1431 GST_EVENT_TYPE_NAME (event));
1432 goto next;
1433 }
1434 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1435 "dropping EOS event %p", event);
1436 gst_event_unref (event);
1437 } else if (GST_IS_QUERY (data)) {
1438 GstQuery *query = GST_QUERY_CAST (data);
1439
1440 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1441 "dropping query %p because of EOS", query);
1442 queue->last_query = FALSE;
1443 g_cond_signal (&queue->query_handled);
1444 }
1445 }
1446 /* no more items in the queue. Set the unexpected flag so that upstream
1447 * make us refuse any more buffers on the sinkpad. Since we will still
1448 * accept EOS and SEGMENT we return _FLOW_OK to the caller so that the
1449 * task function does not shut down. */
1450 queue->unexpected = TRUE;
1451 result = GST_FLOW_OK;
1452 }
1453 } else if (GST_IS_EVENT (data)) {
1454 GstEvent *event = GST_EVENT_CAST (data);
1455 GstEventType type = GST_EVENT_TYPE (event);
1456
1457 GST_QUEUE_MUTEX_UNLOCK (queue);
1458
1459 gst_pad_push_event (queue->srcpad, event);
1460
1461 GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
1462 /* if we're EOS, return EOS so that the task pauses. */
1463 if (type == GST_EVENT_EOS) {
1464 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1465 "pushed EOS event %p, return EOS", event);
1466 result = GST_FLOW_EOS;
1467 }
1468 } else if (GST_IS_QUERY (data)) {
1469 GstQuery *query = GST_QUERY_CAST (data);
1470 gboolean ret;
1471
1472 GST_QUEUE_MUTEX_UNLOCK (queue);
1473 ret = gst_pad_peer_query (queue->srcpad, query);
1474 GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing_query);
1475 queue->last_query = ret;
1476 queue->last_handled_query = query;
1477 g_cond_signal (&queue->query_handled);
1478 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1479 "did query %p, return %d", query, queue->last_query);
1480 }
1481 return result;
1482
1483 /* ERRORS */
1484 no_item:
1485 {
1486 GST_CAT_ERROR_OBJECT (queue_dataflow, queue,
1487 "exit because we have no item in the queue");
1488 return GST_FLOW_ERROR;
1489 }
1490 out_flushing:
1491 {
1492 GstFlowReturn ret = queue->srcresult;
1493 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1494 "exit because task paused, reason: %s", gst_flow_get_name (ret));
1495 return ret;
1496 }
1497 out_flushing_query:
1498 {
1499 GstFlowReturn ret = queue->srcresult;
1500 queue->last_query = FALSE;
1501 g_cond_signal (&queue->query_handled);
1502 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1503 "exit because task paused, reason: %s", gst_flow_get_name (ret));
1504 return ret;
1505 }
1506 }
1507
1508 static void
gst_queue_loop(GstPad * pad)1509 gst_queue_loop (GstPad * pad)
1510 {
1511 GstQueue *queue;
1512 GstFlowReturn ret;
1513
1514 queue = (GstQueue *) GST_PAD_PARENT (pad);
1515
1516 /* have to lock for thread-safety */
1517 GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
1518
1519 while (gst_queue_is_empty (queue)) {
1520 GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "queue is empty");
1521 if (!queue->silent) {
1522 GST_QUEUE_MUTEX_UNLOCK (queue);
1523 g_signal_emit (queue, gst_queue_signals[SIGNAL_UNDERRUN], 0);
1524 GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
1525 }
1526
1527 /* we recheck, the signal could have changed the thresholds */
1528 while (gst_queue_is_empty (queue)) {
1529 GST_QUEUE_WAIT_ADD_CHECK (queue, out_flushing);
1530 }
1531
1532 GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "queue is not empty");
1533 if (!queue->silent) {
1534 GST_QUEUE_MUTEX_UNLOCK (queue);
1535 g_signal_emit (queue, gst_queue_signals[SIGNAL_RUNNING], 0);
1536 g_signal_emit (queue, gst_queue_signals[SIGNAL_PUSHING], 0);
1537 GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
1538 }
1539 }
1540
1541 ret = gst_queue_push_one (queue);
1542 queue->srcresult = ret;
1543 if (ret != GST_FLOW_OK)
1544 goto out_flushing;
1545
1546 GST_QUEUE_MUTEX_UNLOCK (queue);
1547
1548 return;
1549
1550 /* ERRORS */
1551 out_flushing:
1552 {
1553 gboolean eos = queue->eos;
1554 GstFlowReturn ret = queue->srcresult;
1555
1556 gst_pad_pause_task (queue->srcpad);
1557 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1558 "pause task, reason: %s", gst_flow_get_name (ret));
1559 if (ret == GST_FLOW_FLUSHING) {
1560 gst_queue_locked_flush (queue, FALSE);
1561 } else {
1562 GST_QUEUE_SIGNAL_DEL (queue);
1563 queue->last_query = FALSE;
1564 g_cond_signal (&queue->query_handled);
1565 }
1566 GST_QUEUE_MUTEX_UNLOCK (queue);
1567 /* let app know about us giving up if upstream is not expected to do so */
1568 /* EOS is already taken care of elsewhere */
1569 if (eos && (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) {
1570 GST_ELEMENT_FLOW_ERROR (queue, ret);
1571 gst_pad_push_event (queue->srcpad, gst_event_new_eos ());
1572 }
1573 return;
1574 }
1575 }
1576
1577 static gboolean
gst_queue_handle_src_event(GstPad * pad,GstObject * parent,GstEvent * event)1578 gst_queue_handle_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
1579 {
1580 gboolean res = TRUE;
1581 GstQueue *queue = GST_QUEUE (parent);
1582
1583 #ifndef GST_DISABLE_GST_DEBUG
1584 GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%d)",
1585 event, GST_EVENT_TYPE (event));
1586 #endif
1587
1588 switch (GST_EVENT_TYPE (event)) {
1589 case GST_EVENT_RECONFIGURE:
1590 GST_QUEUE_MUTEX_LOCK (queue);
1591 if (queue->srcresult == GST_FLOW_NOT_LINKED) {
1592 /* when we got not linked, assume downstream is linked again now and we
1593 * can try to start pushing again */
1594 queue->srcresult = GST_FLOW_OK;
1595 gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad, NULL);
1596 }
1597 GST_QUEUE_MUTEX_UNLOCK (queue);
1598
1599 res = gst_pad_push_event (queue->sinkpad, event);
1600 break;
1601 default:
1602 res = gst_pad_event_default (pad, parent, event);
1603 break;
1604 }
1605
1606
1607 return res;
1608 }
1609
1610 static gboolean
gst_queue_handle_src_query(GstPad * pad,GstObject * parent,GstQuery * query)1611 gst_queue_handle_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
1612 {
1613 GstQueue *queue = GST_QUEUE (parent);
1614 gboolean res;
1615
1616 switch (GST_QUERY_TYPE (query)) {
1617 case GST_QUERY_SCHEDULING:{
1618 gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
1619 res = TRUE;
1620 break;
1621 }
1622 default:
1623 res = gst_pad_query_default (pad, parent, query);
1624 break;
1625 }
1626
1627 if (!res)
1628 return FALSE;
1629
1630 /* Adjust peer response for data contained in queue */
1631 switch (GST_QUERY_TYPE (query)) {
1632 case GST_QUERY_POSITION:
1633 {
1634 gint64 peer_pos;
1635 GstFormat format;
1636
1637 /* get peer position */
1638 gst_query_parse_position (query, &format, &peer_pos);
1639
1640 /* FIXME: this code assumes that there's no discont in the queue */
1641 switch (format) {
1642 case GST_FORMAT_BYTES:
1643 peer_pos -= queue->cur_level.bytes;
1644 if (peer_pos < 0) /* Clamp result to 0 */
1645 peer_pos = 0;
1646 break;
1647 case GST_FORMAT_TIME:
1648 peer_pos -= queue->cur_level.time;
1649 if (peer_pos < 0) /* Clamp result to 0 */
1650 peer_pos = 0;
1651 break;
1652 default:
1653 GST_DEBUG_OBJECT (queue, "Can't adjust query in %s format, don't "
1654 "know how to adjust value", gst_format_get_name (format));
1655 return TRUE;
1656 }
1657 /* set updated position */
1658 gst_query_set_position (query, format, peer_pos);
1659 break;
1660 }
1661 case GST_QUERY_LATENCY:
1662 {
1663 gboolean live;
1664 GstClockTime min, max;
1665
1666 gst_query_parse_latency (query, &live, &min, &max);
1667
1668 /* we can delay up to the limit of the queue in time. If we have no time
1669 * limit, the best thing we can do is to return an infinite delay. In
1670 * reality a better estimate would be the byte/buffer rate but that is not
1671 * possible right now. */
1672 /* TODO: Use CONVERT query? */
1673 if (queue->max_size.time > 0 && max != -1
1674 && queue->leaky == GST_QUEUE_NO_LEAK)
1675 max += queue->max_size.time;
1676 else if (queue->max_size.time > 0 && queue->leaky != GST_QUEUE_NO_LEAK)
1677 max = MAX (queue->max_size.time, max);
1678 else
1679 max = -1;
1680
1681 /* adjust for min-threshold */
1682 if (queue->min_threshold.time > 0)
1683 min += queue->min_threshold.time;
1684
1685 gst_query_set_latency (query, live, min, max);
1686 break;
1687 }
1688 default:
1689 /* peer handled other queries */
1690 break;
1691 }
1692
1693 return TRUE;
1694 }
1695
1696 static gboolean
gst_queue_sink_activate_mode(GstPad * pad,GstObject * parent,GstPadMode mode,gboolean active)1697 gst_queue_sink_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
1698 gboolean active)
1699 {
1700 gboolean result;
1701 GstQueue *queue;
1702
1703 queue = GST_QUEUE (parent);
1704
1705 switch (mode) {
1706 case GST_PAD_MODE_PUSH:
1707 if (active) {
1708 GST_QUEUE_MUTEX_LOCK (queue);
1709 queue->srcresult = GST_FLOW_OK;
1710 queue->eos = FALSE;
1711 queue->unexpected = FALSE;
1712 GST_QUEUE_MUTEX_UNLOCK (queue);
1713 } else {
1714 /* step 1, unblock chain function */
1715 GST_QUEUE_MUTEX_LOCK (queue);
1716 queue->srcresult = GST_FLOW_FLUSHING;
1717 /* the item del signal will unblock */
1718 GST_QUEUE_SIGNAL_DEL (queue);
1719 GST_QUEUE_MUTEX_UNLOCK (queue);
1720
1721 /* step 2, wait until streaming thread stopped and flush queue */
1722 GST_PAD_STREAM_LOCK (pad);
1723 GST_QUEUE_MUTEX_LOCK (queue);
1724 gst_queue_locked_flush (queue, TRUE);
1725 GST_QUEUE_MUTEX_UNLOCK (queue);
1726 GST_PAD_STREAM_UNLOCK (pad);
1727 }
1728 result = TRUE;
1729 break;
1730 default:
1731 result = FALSE;
1732 break;
1733 }
1734 return result;
1735 }
1736
1737 static gboolean
gst_queue_src_activate_mode(GstPad * pad,GstObject * parent,GstPadMode mode,gboolean active)1738 gst_queue_src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
1739 gboolean active)
1740 {
1741 gboolean result;
1742 GstQueue *queue;
1743
1744 queue = GST_QUEUE (parent);
1745
1746 switch (mode) {
1747 case GST_PAD_MODE_PUSH:
1748 if (active) {
1749 GST_QUEUE_MUTEX_LOCK (queue);
1750 queue->srcresult = GST_FLOW_OK;
1751 queue->eos = FALSE;
1752 queue->unexpected = FALSE;
1753 result =
1754 gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad,
1755 NULL);
1756 GST_QUEUE_MUTEX_UNLOCK (queue);
1757 } else {
1758 /* step 1, unblock loop function */
1759 GST_QUEUE_MUTEX_LOCK (queue);
1760 queue->srcresult = GST_FLOW_FLUSHING;
1761 /* the item add signal will unblock */
1762 g_cond_signal (&queue->item_add);
1763 GST_QUEUE_MUTEX_UNLOCK (queue);
1764
1765 /* step 2, make sure streaming finishes */
1766 result = gst_pad_stop_task (pad);
1767
1768 GST_QUEUE_MUTEX_LOCK (queue);
1769 gst_queue_locked_flush (queue, FALSE);
1770 GST_QUEUE_MUTEX_UNLOCK (queue);
1771 }
1772 break;
1773 default:
1774 result = FALSE;
1775 break;
1776 }
1777 return result;
1778 }
1779
1780 static void
queue_capacity_change(GstQueue * queue)1781 queue_capacity_change (GstQueue * queue)
1782 {
1783 if (queue->leaky == GST_QUEUE_LEAK_DOWNSTREAM) {
1784 gst_queue_leak_downstream (queue);
1785 }
1786
1787 /* changing the capacity of the queue must wake up
1788 * the _chain function, it might have more room now
1789 * to store the buffer/event in the queue */
1790 GST_QUEUE_SIGNAL_DEL (queue);
1791 }
1792
1793 /* Changing the minimum required fill level must
1794 * wake up the _loop function as it might now
1795 * be able to preceed.
1796 */
1797 #define QUEUE_THRESHOLD_CHANGE(q)\
1798 GST_QUEUE_SIGNAL_ADD (q);
1799
1800 static void
gst_queue_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)1801 gst_queue_set_property (GObject * object,
1802 guint prop_id, const GValue * value, GParamSpec * pspec)
1803 {
1804 GstQueue *queue = GST_QUEUE (object);
1805
1806 /* someone could change levels here, and since this
1807 * affects the get/put funcs, we need to lock for safety. */
1808 GST_QUEUE_MUTEX_LOCK (queue);
1809
1810 switch (prop_id) {
1811 case PROP_MAX_SIZE_BYTES:
1812 queue->max_size.bytes = g_value_get_uint (value);
1813 queue_capacity_change (queue);
1814 break;
1815 case PROP_MAX_SIZE_BUFFERS:
1816 queue->max_size.buffers = g_value_get_uint (value);
1817 queue_capacity_change (queue);
1818 break;
1819 case PROP_MAX_SIZE_TIME:
1820 queue->max_size.time = g_value_get_uint64 (value);
1821 queue_capacity_change (queue);
1822 break;
1823 case PROP_MIN_THRESHOLD_BYTES:
1824 queue->min_threshold.bytes = g_value_get_uint (value);
1825 queue->orig_min_threshold.bytes = queue->min_threshold.bytes;
1826 QUEUE_THRESHOLD_CHANGE (queue);
1827 break;
1828 case PROP_MIN_THRESHOLD_BUFFERS:
1829 queue->min_threshold.buffers = g_value_get_uint (value);
1830 queue->orig_min_threshold.buffers = queue->min_threshold.buffers;
1831 QUEUE_THRESHOLD_CHANGE (queue);
1832 break;
1833 case PROP_MIN_THRESHOLD_TIME:
1834 queue->min_threshold.time = g_value_get_uint64 (value);
1835 queue->orig_min_threshold.time = queue->min_threshold.time;
1836 QUEUE_THRESHOLD_CHANGE (queue);
1837 break;
1838 case PROP_LEAKY:
1839 queue->leaky = g_value_get_enum (value);
1840 break;
1841 case PROP_SILENT:
1842 queue->silent = g_value_get_boolean (value);
1843 break;
1844 case PROP_FLUSH_ON_EOS:
1845 queue->flush_on_eos = g_value_get_boolean (value);
1846 break;
1847 default:
1848 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1849 break;
1850 }
1851
1852 GST_QUEUE_MUTEX_UNLOCK (queue);
1853 }
1854
1855 static void
gst_queue_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)1856 gst_queue_get_property (GObject * object,
1857 guint prop_id, GValue * value, GParamSpec * pspec)
1858 {
1859 GstQueue *queue = GST_QUEUE (object);
1860
1861 GST_QUEUE_MUTEX_LOCK (queue);
1862
1863 switch (prop_id) {
1864 case PROP_CUR_LEVEL_BYTES:
1865 g_value_set_uint (value, queue->cur_level.bytes);
1866 break;
1867 case PROP_CUR_LEVEL_BUFFERS:
1868 g_value_set_uint (value, queue->cur_level.buffers);
1869 break;
1870 case PROP_CUR_LEVEL_TIME:
1871 g_value_set_uint64 (value, queue->cur_level.time);
1872 break;
1873 case PROP_MAX_SIZE_BYTES:
1874 g_value_set_uint (value, queue->max_size.bytes);
1875 break;
1876 case PROP_MAX_SIZE_BUFFERS:
1877 g_value_set_uint (value, queue->max_size.buffers);
1878 break;
1879 case PROP_MAX_SIZE_TIME:
1880 g_value_set_uint64 (value, queue->max_size.time);
1881 break;
1882 case PROP_MIN_THRESHOLD_BYTES:
1883 g_value_set_uint (value, queue->min_threshold.bytes);
1884 break;
1885 case PROP_MIN_THRESHOLD_BUFFERS:
1886 g_value_set_uint (value, queue->min_threshold.buffers);
1887 break;
1888 case PROP_MIN_THRESHOLD_TIME:
1889 g_value_set_uint64 (value, queue->min_threshold.time);
1890 break;
1891 case PROP_LEAKY:
1892 g_value_set_enum (value, queue->leaky);
1893 break;
1894 case PROP_SILENT:
1895 g_value_set_boolean (value, queue->silent);
1896 break;
1897 case PROP_FLUSH_ON_EOS:
1898 g_value_set_boolean (value, queue->flush_on_eos);
1899 break;
1900 default:
1901 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1902 break;
1903 }
1904
1905 GST_QUEUE_MUTEX_UNLOCK (queue);
1906 }
1907