• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2000 Wim Taymans <wtay@chello.be>
4  *                    2003 Colin Walters <cwalters@gnome.org>
5  *                    2005 Wim Taymans <wim@fluendo.com>
6  *
7  * gstqueue.c:
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22  * Boston, MA 02110-1301, USA.
23  */
24 
25 /**
26  * SECTION:element-queue
27  * @title: queue
28  *
29  * Data is queued until one of the limits specified by the
30  * #GstQueue:max-size-buffers, #GstQueue:max-size-bytes and/or
31  * #GstQueue:max-size-time properties has been reached. Any attempt to push
32  * more buffers into the queue will block the pushing thread until more space
33  * becomes available.
34  *
35  * The queue will create a new thread on the source pad to decouple the
36  * processing on sink and source pad.
37  *
38  * You can query how many buffers are queued by reading the
39  * #GstQueue:current-level-buffers property. You can track changes
40  * by connecting to the notify::current-level-buffers signal (which
41  * like all signals will be emitted from the streaming thread). The same
42  * applies to the #GstQueue:current-level-time and
43  * #GstQueue:current-level-bytes properties.
44  *
45  * The default queue size limits are 200 buffers, 10MB of data, or
46  * one second worth of data, whichever is reached first.
47  *
48  * As said earlier, the queue blocks by default when one of the specified
49  * maximums (bytes, time, buffers) has been reached. You can set the
50  * #GstQueue:leaky property to specify that instead of blocking it should
51  * leak (drop) new or old buffers.
52  *
53  * The #GstQueue::underrun signal is emitted when the queue has less data than
54  * the specified minimum thresholds require (by default: when the queue is
55  * empty). The #GstQueue::overrun signal is emitted when the queue is filled
56  * up. Both signals are emitted from the context of the streaming thread.
57  */
58 
59 #include "gst/gst_private.h"
60 
61 #include <gst/gst.h>
62 #include "gstqueue.h"
63 #include "gstcoreelementselements.h"
64 
65 #include "../../gst/gst-i18n-lib.h"
66 #include "../../gst/glib-compat-private.h"
67 
68 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
69     GST_PAD_SINK,
70     GST_PAD_ALWAYS,
71     GST_STATIC_CAPS_ANY);
72 
73 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
74     GST_PAD_SRC,
75     GST_PAD_ALWAYS,
76     GST_STATIC_CAPS_ANY);
77 
78 GST_DEBUG_CATEGORY_STATIC (queue_debug);
79 #define GST_CAT_DEFAULT (queue_debug)
80 GST_DEBUG_CATEGORY_STATIC (queue_dataflow);
81 
82 #define STATUS(queue, pad, msg) \
83   GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
84                       "(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \
85                       "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
86                       "-%" G_GUINT64_FORMAT " ns, %u items", \
87                       GST_DEBUG_PAD_NAME (pad), \
88                       queue->cur_level.buffers, \
89                       queue->min_threshold.buffers, \
90                       queue->max_size.buffers, \
91                       queue->cur_level.bytes, \
92                       queue->min_threshold.bytes, \
93                       queue->max_size.bytes, \
94                       queue->cur_level.time, \
95                       queue->min_threshold.time, \
96                       queue->max_size.time, \
97                       gst_queue_array_get_length (queue->queue))
98 
99 /* Queue signals and args */
100 enum
101 {
102   SIGNAL_UNDERRUN,
103   SIGNAL_RUNNING,
104   SIGNAL_OVERRUN,
105   SIGNAL_PUSHING,
106   LAST_SIGNAL
107 };
108 
109 enum
110 {
111   PROP_0,
112   /* FIXME: don't we have another way of doing this
113    * "Gstreamer format" (frame/byte/time) queries? */
114   PROP_CUR_LEVEL_BUFFERS,
115   PROP_CUR_LEVEL_BYTES,
116   PROP_CUR_LEVEL_TIME,
117   PROP_MAX_SIZE_BUFFERS,
118   PROP_MAX_SIZE_BYTES,
119   PROP_MAX_SIZE_TIME,
120   PROP_MIN_THRESHOLD_BUFFERS,
121   PROP_MIN_THRESHOLD_BYTES,
122   PROP_MIN_THRESHOLD_TIME,
123   PROP_LEAKY,
124   PROP_SILENT,
125   PROP_FLUSH_ON_EOS
126 };
127 
128 /* default property values */
129 #define DEFAULT_MAX_SIZE_BUFFERS  200   /* 200 buffers */
130 #define DEFAULT_MAX_SIZE_BYTES    (10 * 1024 * 1024)    /* 10 MB       */
131 #define DEFAULT_MAX_SIZE_TIME     GST_SECOND    /* 1 second    */
132 
133 #define GST_QUEUE_MUTEX_LOCK(q) G_STMT_START {                          \
134   g_mutex_lock (&q->qlock);                                              \
135 } G_STMT_END
136 
137 #define GST_QUEUE_MUTEX_LOCK_CHECK(q,label) G_STMT_START {              \
138   GST_QUEUE_MUTEX_LOCK (q);                                             \
139   if (q->srcresult != GST_FLOW_OK)                                      \
140     goto label;                                                         \
141 } G_STMT_END
142 
143 #define GST_QUEUE_MUTEX_UNLOCK(q) G_STMT_START {                        \
144   g_mutex_unlock (&q->qlock);                                            \
145 } G_STMT_END
146 
147 #define GST_QUEUE_WAIT_DEL_CHECK(q, label) G_STMT_START {               \
148   STATUS (q, q->sinkpad, "wait for DEL");                               \
149   q->waiting_del = TRUE;                                                \
150   g_cond_wait (&q->item_del, &q->qlock);                                  \
151   q->waiting_del = FALSE;                                               \
152   if (q->srcresult != GST_FLOW_OK) {                                    \
153     STATUS (q, q->srcpad, "received DEL wakeup");                       \
154     goto label;                                                         \
155   }                                                                     \
156   STATUS (q, q->sinkpad, "received DEL");                               \
157 } G_STMT_END
158 
159 #define GST_QUEUE_WAIT_ADD_CHECK(q, label) G_STMT_START {               \
160   STATUS (q, q->srcpad, "wait for ADD");                                \
161   q->waiting_add = TRUE;                                                \
162   g_cond_wait (&q->item_add, &q->qlock);                                  \
163   q->waiting_add = FALSE;                                               \
164   if (q->srcresult != GST_FLOW_OK) {                                    \
165     STATUS (q, q->srcpad, "received ADD wakeup");                       \
166     goto label;                                                         \
167   }                                                                     \
168   STATUS (q, q->srcpad, "received ADD");                                \
169 } G_STMT_END
170 
171 #define GST_QUEUE_SIGNAL_DEL(q) G_STMT_START {                          \
172   if (q->waiting_del) {                                                 \
173     STATUS (q, q->srcpad, "signal DEL");                                \
174     g_cond_signal (&q->item_del);                                        \
175   }                                                                     \
176 } G_STMT_END
177 
178 #define GST_QUEUE_SIGNAL_ADD(q) G_STMT_START {                          \
179   if (q->waiting_add) {                                                 \
180     STATUS (q, q->sinkpad, "signal ADD");                               \
181     g_cond_signal (&q->item_add);                                        \
182   }                                                                     \
183 } G_STMT_END
184 
185 #define _do_init \
186     GST_DEBUG_CATEGORY_INIT (queue_debug, "queue", 0, "queue element"); \
187     GST_DEBUG_CATEGORY_INIT (queue_dataflow, "queue_dataflow", 0, \
188         "dataflow inside the queue element");
189 #define gst_queue_parent_class parent_class
190 G_DEFINE_TYPE_WITH_CODE (GstQueue, gst_queue, GST_TYPE_ELEMENT, _do_init);
191 GST_ELEMENT_REGISTER_DEFINE (queue, "queue", GST_RANK_NONE, GST_TYPE_QUEUE);
192 
193 static void gst_queue_finalize (GObject * object);
194 static void gst_queue_set_property (GObject * object,
195     guint prop_id, const GValue * value, GParamSpec * pspec);
196 static void gst_queue_get_property (GObject * object,
197     guint prop_id, GValue * value, GParamSpec * pspec);
198 
199 static GstFlowReturn gst_queue_chain (GstPad * pad, GstObject * parent,
200     GstBuffer * buffer);
201 static GstFlowReturn gst_queue_chain_list (GstPad * pad, GstObject * parent,
202     GstBufferList * buffer_list);
203 static GstFlowReturn gst_queue_push_one (GstQueue * queue);
204 static void gst_queue_loop (GstPad * pad);
205 
206 static GstFlowReturn gst_queue_handle_sink_event (GstPad * pad,
207     GstObject * parent, GstEvent * event);
208 static gboolean gst_queue_handle_sink_query (GstPad * pad, GstObject * parent,
209     GstQuery * query);
210 
211 static gboolean gst_queue_handle_src_event (GstPad * pad, GstObject * parent,
212     GstEvent * event);
213 static gboolean gst_queue_handle_src_query (GstPad * pad, GstObject * parent,
214     GstQuery * query);
215 
216 static void gst_queue_locked_flush (GstQueue * queue, gboolean full);
217 
218 static gboolean gst_queue_src_activate_mode (GstPad * pad, GstObject * parent,
219     GstPadMode mode, gboolean active);
220 static gboolean gst_queue_sink_activate_mode (GstPad * pad, GstObject * parent,
221     GstPadMode mode, gboolean active);
222 
223 static gboolean gst_queue_is_empty (GstQueue * queue);
224 static gboolean gst_queue_is_filled (GstQueue * queue);
225 
226 
227 typedef struct
228 {
229   GstMiniObject *item;
230   gsize size;
231   gboolean is_query;
232 } GstQueueItem;
233 
234 #define GST_TYPE_QUEUE_LEAKY (queue_leaky_get_type ())
235 
236 static GType
queue_leaky_get_type(void)237 queue_leaky_get_type (void)
238 {
239   static GType queue_leaky_type = 0;
240   static const GEnumValue queue_leaky[] = {
241     {GST_QUEUE_NO_LEAK, "Not Leaky", "no"},
242     {GST_QUEUE_LEAK_UPSTREAM, "Leaky on upstream (new buffers)", "upstream"},
243     {GST_QUEUE_LEAK_DOWNSTREAM, "Leaky on downstream (old buffers)",
244         "downstream"},
245     {0, NULL, NULL},
246   };
247 
248   if (!queue_leaky_type) {
249     queue_leaky_type = g_enum_register_static ("GstQueueLeaky", queue_leaky);
250   }
251   return queue_leaky_type;
252 }
253 
254 static guint gst_queue_signals[LAST_SIGNAL] = { 0 };
255 
256 static void
gst_queue_class_init(GstQueueClass * klass)257 gst_queue_class_init (GstQueueClass * klass)
258 {
259   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
260   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
261 
262   gobject_class->set_property = gst_queue_set_property;
263   gobject_class->get_property = gst_queue_get_property;
264 
265   /* signals */
266   /**
267    * GstQueue::underrun:
268    * @queue: the queue instance
269    *
270    * Reports that the buffer became empty (underrun).
271    * A buffer is empty if the total amount of data inside it (num-buffers, time,
272    * size) is lower than the boundary values which can be set through the
273    * GObject properties.
274    */
275   gst_queue_signals[SIGNAL_UNDERRUN] =
276       g_signal_new ("underrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
277       G_STRUCT_OFFSET (GstQueueClass, underrun), NULL, NULL,
278       NULL, G_TYPE_NONE, 0);
279   /**
280    * GstQueue::running:
281    * @queue: the queue instance
282    *
283    * Reports that enough (min-threshold) data is in the queue. Use this signal
284    * together with the underrun signal to pause the pipeline on underrun and
285    * wait for the queue to fill-up before resume playback.
286    */
287   gst_queue_signals[SIGNAL_RUNNING] =
288       g_signal_new ("running", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
289       G_STRUCT_OFFSET (GstQueueClass, running), NULL, NULL,
290       NULL, G_TYPE_NONE, 0);
291   /**
292    * GstQueue::overrun:
293    * @queue: the queue instance
294    *
295    * Reports that the buffer became full (overrun).
296    * A buffer is full if the total amount of data inside it (num-buffers, time,
297    * size) is higher than the boundary values which can be set through the
298    * GObject properties.
299    */
300   gst_queue_signals[SIGNAL_OVERRUN] =
301       g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
302       G_STRUCT_OFFSET (GstQueueClass, overrun), NULL, NULL,
303       NULL, G_TYPE_NONE, 0);
304   /**
305    * GstQueue::pushing:
306    * @queue: the queue instance
307    *
308    * Reports when the queue has enough data to start pushing data again on the
309    * source pad.
310    */
311   gst_queue_signals[SIGNAL_PUSHING] =
312       g_signal_new ("pushing", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
313       G_STRUCT_OFFSET (GstQueueClass, pushing), NULL, NULL,
314       NULL, G_TYPE_NONE, 0);
315 
316   /* properties */
317   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BYTES,
318       g_param_spec_uint ("current-level-bytes", "Current level (kB)",
319           "Current amount of data in the queue (bytes)",
320           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
321   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BUFFERS,
322       g_param_spec_uint ("current-level-buffers", "Current level (buffers)",
323           "Current number of buffers in the queue",
324           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
325   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_TIME,
326       g_param_spec_uint64 ("current-level-time", "Current level (ns)",
327           "Current amount of data in the queue (in ns)",
328           0, G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
329 
330   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
331       g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
332           "Max. amount of data in the queue (bytes, 0=disable)",
333           0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES,
334           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
335           G_PARAM_STATIC_STRINGS));
336   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BUFFERS,
337       g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
338           "Max. number of buffers in the queue (0=disable)", 0, G_MAXUINT,
339           DEFAULT_MAX_SIZE_BUFFERS,
340           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
341           G_PARAM_STATIC_STRINGS));
342   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
343       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
344           "Max. amount of data in the queue (in ns, 0=disable)", 0, G_MAXUINT64,
345           DEFAULT_MAX_SIZE_TIME,
346           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
347           G_PARAM_STATIC_STRINGS));
348 
349   g_object_class_install_property (gobject_class, PROP_MIN_THRESHOLD_BYTES,
350       g_param_spec_uint ("min-threshold-bytes", "Min. threshold (kB)",
351           "Min. amount of data in the queue to allow reading (bytes, 0=disable)",
352           0, G_MAXUINT, 0,
353           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
354           G_PARAM_STATIC_STRINGS));
355   g_object_class_install_property (gobject_class, PROP_MIN_THRESHOLD_BUFFERS,
356       g_param_spec_uint ("min-threshold-buffers", "Min. threshold (buffers)",
357           "Min. number of buffers in the queue to allow reading (0=disable)", 0,
358           G_MAXUINT, 0,
359           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
360           G_PARAM_STATIC_STRINGS));
361   g_object_class_install_property (gobject_class, PROP_MIN_THRESHOLD_TIME,
362       g_param_spec_uint64 ("min-threshold-time", "Min. threshold (ns)",
363           "Min. amount of data in the queue to allow reading (in ns, 0=disable)",
364           0, G_MAXUINT64, 0,
365           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
366           G_PARAM_STATIC_STRINGS));
367 
368   g_object_class_install_property (gobject_class, PROP_LEAKY,
369       g_param_spec_enum ("leaky", "Leaky",
370           "Where the queue leaks, if at all",
371           GST_TYPE_QUEUE_LEAKY, GST_QUEUE_NO_LEAK,
372           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
373           G_PARAM_STATIC_STRINGS));
374 
375   /**
376    * GstQueue:silent
377    *
378    * Don't emit queue signals. Makes queues more lightweight if no signals are
379    * needed.
380    */
381   g_object_class_install_property (gobject_class, PROP_SILENT,
382       g_param_spec_boolean ("silent", "Silent",
383           "Don't emit queue signals", FALSE,
384           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
385           G_PARAM_STATIC_STRINGS));
386 
387   /**
388    * queue:flush-on-eos:
389    *
390    * Discard all data in the queue when an EOS event is received, and pass
391    * on the EOS event as soon as possible (instead of waiting until all
392    * buffers in the queue have been processed, which is the default behaviour).
393    *
394    * Flushing the queue on EOS might be useful when capturing and encoding
395    * from a live source, to finish up the recording quickly in cases when
396    * the encoder is slow. Note that this might mean some data from the end of
397    * the recording data might be lost though (never more than the configured
398    * max. sizes though).
399    *
400    * Since: 1.2
401    */
402   g_object_class_install_property (gobject_class, PROP_FLUSH_ON_EOS,
403       g_param_spec_boolean ("flush-on-eos", "Flush on EOS",
404           "Discard all data in the queue when an EOS event is received", FALSE,
405           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
406           G_PARAM_STATIC_STRINGS));
407 
408   gobject_class->finalize = gst_queue_finalize;
409 
410   gst_element_class_set_static_metadata (gstelement_class,
411       "Queue",
412       "Generic", "Simple data queue", "Erik Walthinsen <omega@cse.ogi.edu>");
413   gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
414   gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate);
415 
416   /* Registering debug symbols for function pointers */
417   GST_DEBUG_REGISTER_FUNCPTR (gst_queue_src_activate_mode);
418   GST_DEBUG_REGISTER_FUNCPTR (gst_queue_handle_sink_event);
419   GST_DEBUG_REGISTER_FUNCPTR (gst_queue_handle_sink_query);
420   GST_DEBUG_REGISTER_FUNCPTR (gst_queue_handle_src_event);
421   GST_DEBUG_REGISTER_FUNCPTR (gst_queue_handle_src_query);
422   GST_DEBUG_REGISTER_FUNCPTR (gst_queue_chain);
423   GST_DEBUG_REGISTER_FUNCPTR (gst_queue_chain_list);
424 
425   gst_type_mark_as_plugin_api (GST_TYPE_QUEUE_LEAKY, 0);
426 }
427 
428 static void
gst_queue_init(GstQueue * queue)429 gst_queue_init (GstQueue * queue)
430 {
431   queue->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
432 
433   gst_pad_set_chain_function (queue->sinkpad, gst_queue_chain);
434   gst_pad_set_chain_list_function (queue->sinkpad, gst_queue_chain_list);
435   gst_pad_set_activatemode_function (queue->sinkpad,
436       gst_queue_sink_activate_mode);
437   gst_pad_set_event_full_function (queue->sinkpad, gst_queue_handle_sink_event);
438   gst_pad_set_query_function (queue->sinkpad, gst_queue_handle_sink_query);
439   GST_PAD_SET_PROXY_CAPS (queue->sinkpad);
440   gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
441 
442   queue->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
443 
444   gst_pad_set_activatemode_function (queue->srcpad,
445       gst_queue_src_activate_mode);
446   gst_pad_set_event_function (queue->srcpad, gst_queue_handle_src_event);
447   gst_pad_set_query_function (queue->srcpad, gst_queue_handle_src_query);
448   GST_PAD_SET_PROXY_CAPS (queue->srcpad);
449   gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
450 
451   GST_QUEUE_CLEAR_LEVEL (queue->cur_level);
452   queue->max_size.buffers = DEFAULT_MAX_SIZE_BUFFERS;
453   queue->max_size.bytes = DEFAULT_MAX_SIZE_BYTES;
454   queue->max_size.time = DEFAULT_MAX_SIZE_TIME;
455   GST_QUEUE_CLEAR_LEVEL (queue->min_threshold);
456   GST_QUEUE_CLEAR_LEVEL (queue->orig_min_threshold);
457   gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
458   gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
459   queue->head_needs_discont = queue->tail_needs_discont = FALSE;
460 
461   queue->leaky = GST_QUEUE_NO_LEAK;
462   queue->srcresult = GST_FLOW_FLUSHING;
463 
464   g_mutex_init (&queue->qlock);
465   g_cond_init (&queue->item_add);
466   g_cond_init (&queue->item_del);
467   g_cond_init (&queue->query_handled);
468 
469   queue->queue =
470       gst_queue_array_new_for_struct (sizeof (GstQueueItem),
471       DEFAULT_MAX_SIZE_BUFFERS * 3 / 2);
472 
473   queue->sinktime = GST_CLOCK_STIME_NONE;
474   queue->srctime = GST_CLOCK_STIME_NONE;
475 
476   queue->sink_tainted = TRUE;
477   queue->src_tainted = TRUE;
478 
479   queue->newseg_applied_to_src = FALSE;
480 
481   GST_DEBUG_OBJECT (queue,
482       "initialized queue's not_empty & not_full conditions");
483 }
484 
485 /* called only once, as opposed to dispose */
486 static void
gst_queue_finalize(GObject * object)487 gst_queue_finalize (GObject * object)
488 {
489   GstQueue *queue = GST_QUEUE (object);
490   GstQueueItem *qitem;
491 
492   GST_DEBUG_OBJECT (queue, "finalizing queue");
493 
494   while ((qitem = gst_queue_array_pop_head_struct (queue->queue))) {
495     /* FIXME: if it's a query, shouldn't we unref that too? */
496     if (!qitem->is_query)
497       gst_mini_object_unref (qitem->item);
498   }
499   gst_queue_array_free (queue->queue);
500 
501   g_mutex_clear (&queue->qlock);
502   g_cond_clear (&queue->item_add);
503   g_cond_clear (&queue->item_del);
504   g_cond_clear (&queue->query_handled);
505 
506   G_OBJECT_CLASS (parent_class)->finalize (object);
507 }
508 
509 /* Convenience function */
510 static inline GstClockTimeDiff
my_segment_to_running_time(GstSegment * segment,GstClockTime val)511 my_segment_to_running_time (GstSegment * segment, GstClockTime val)
512 {
513   GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
514 
515   if (GST_CLOCK_TIME_IS_VALID (val)) {
516     gboolean sign =
517         gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
518     if (sign > 0)
519       res = val;
520     else if (sign < 0)
521       res = -val;
522   }
523   return res;
524 }
525 
526 /* calculate the diff between running time on the sink and src of the queue.
527  * This is the total amount of time in the queue. */
528 static void
update_time_level(GstQueue * queue)529 update_time_level (GstQueue * queue)
530 {
531   gint64 sink_time, src_time;
532 
533   if (queue->sink_tainted) {
534     GST_LOG_OBJECT (queue, "update sink time");
535     queue->sinktime =
536         my_segment_to_running_time (&queue->sink_segment,
537         queue->sink_segment.position);
538     queue->sink_tainted = FALSE;
539   }
540   sink_time = queue->sinktime;
541 
542   if (queue->src_tainted) {
543     GST_LOG_OBJECT (queue, "update src time");
544     queue->srctime =
545         my_segment_to_running_time (&queue->src_segment,
546         queue->src_segment.position);
547     queue->src_tainted = FALSE;
548   }
549   src_time = queue->srctime;
550 
551   GST_LOG_OBJECT (queue, "sink %" GST_STIME_FORMAT ", src %" GST_STIME_FORMAT,
552       GST_STIME_ARGS (sink_time), GST_STIME_ARGS (src_time));
553 
554   if (GST_CLOCK_STIME_IS_VALID (src_time)
555       && GST_CLOCK_STIME_IS_VALID (sink_time) && sink_time >= src_time)
556     queue->cur_level.time = sink_time - src_time;
557   else
558     queue->cur_level.time = 0;
559 }
560 
561 /* take a SEGMENT event and apply the values to segment, updating the time
562  * level of queue. */
563 static void
apply_segment(GstQueue * queue,GstEvent * event,GstSegment * segment,gboolean sink)564 apply_segment (GstQueue * queue, GstEvent * event, GstSegment * segment,
565     gboolean sink)
566 {
567   gst_event_copy_segment (event, segment);
568 
569   /* now configure the values, we use these to track timestamps on the
570    * sinkpad. */
571   if (segment->format != GST_FORMAT_TIME) {
572     /* non-time format, pretent the current time segment is closed with a
573      * 0 start and unknown stop time. */
574     segment->format = GST_FORMAT_TIME;
575     segment->start = 0;
576     segment->stop = -1;
577     segment->time = 0;
578   }
579   if (sink)
580     queue->sink_tainted = TRUE;
581   else
582     queue->src_tainted = TRUE;
583 
584   GST_DEBUG_OBJECT (queue, "configured SEGMENT %" GST_SEGMENT_FORMAT, segment);
585 
586   /* segment can update the time level of the queue */
587   update_time_level (queue);
588 }
589 
590 static void
apply_gap(GstQueue * queue,GstEvent * event,GstSegment * segment,gboolean is_sink)591 apply_gap (GstQueue * queue, GstEvent * event,
592     GstSegment * segment, gboolean is_sink)
593 {
594   GstClockTime timestamp;
595   GstClockTime duration;
596 
597   gst_event_parse_gap (event, &timestamp, &duration);
598 
599   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
600 
601     if (GST_CLOCK_TIME_IS_VALID (duration)) {
602       timestamp += duration;
603     }
604 
605     segment->position = timestamp;
606 
607     if (is_sink)
608       queue->sink_tainted = TRUE;
609     else
610       queue->src_tainted = TRUE;
611 
612     /* calc diff with other end */
613     update_time_level (queue);
614   }
615 }
616 
617 
618 /* take a buffer and update segment, updating the time level of the queue. */
619 static void
apply_buffer(GstQueue * queue,GstBuffer * buffer,GstSegment * segment,gboolean sink)620 apply_buffer (GstQueue * queue, GstBuffer * buffer, GstSegment * segment,
621     gboolean sink)
622 {
623   GstClockTime duration, timestamp;
624 
625   timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
626   duration = GST_BUFFER_DURATION (buffer);
627 
628   /* if no timestamp is set, assume it's continuous with the previous
629    * time */
630   if (timestamp == GST_CLOCK_TIME_NONE)
631     timestamp = segment->position;
632 
633   /* add duration */
634   if (duration != GST_CLOCK_TIME_NONE)
635     timestamp += duration;
636 
637   GST_LOG_OBJECT (queue, "%s position updated to %" GST_TIME_FORMAT,
638       segment == &queue->sink_segment ? "sink" : "src",
639       GST_TIME_ARGS (timestamp));
640 
641   segment->position = timestamp;
642   if (sink)
643     queue->sink_tainted = TRUE;
644   else
645     queue->src_tainted = TRUE;
646 
647 
648   /* calc diff with other end */
649   update_time_level (queue);
650 }
651 
652 static gboolean
buffer_list_apply_time(GstBuffer ** buf,guint idx,gpointer user_data)653 buffer_list_apply_time (GstBuffer ** buf, guint idx, gpointer user_data)
654 {
655   GstClockTime *timestamp = user_data;
656   GstClockTime btime;
657 
658   GST_TRACE ("buffer %u has pts %" GST_TIME_FORMAT " dts %" GST_TIME_FORMAT
659       " duration %" GST_TIME_FORMAT, idx, GST_TIME_ARGS (GST_BUFFER_DTS (*buf)),
660       GST_TIME_ARGS (GST_BUFFER_PTS (*buf)),
661       GST_TIME_ARGS (GST_BUFFER_DURATION (*buf)));
662 
663   btime = GST_BUFFER_DTS_OR_PTS (*buf);
664   if (GST_CLOCK_TIME_IS_VALID (btime))
665     *timestamp = btime;
666 
667   if (GST_BUFFER_DURATION_IS_VALID (*buf))
668     *timestamp += GST_BUFFER_DURATION (*buf);
669 
670   GST_TRACE ("ts now %" GST_TIME_FORMAT, GST_TIME_ARGS (*timestamp));
671 
672   return TRUE;
673 }
674 
675 /* take a buffer list and update segment, updating the time level of the queue */
676 static void
apply_buffer_list(GstQueue * queue,GstBufferList * buffer_list,GstSegment * segment,gboolean sink)677 apply_buffer_list (GstQueue * queue, GstBufferList * buffer_list,
678     GstSegment * segment, gboolean sink)
679 {
680   GstClockTime timestamp;
681 
682   /* if no timestamp is set, assume it's continuous with the previous time */
683   timestamp = segment->position;
684 
685   gst_buffer_list_foreach (buffer_list, buffer_list_apply_time, &timestamp);
686 
687   GST_DEBUG_OBJECT (queue, "position updated to %" GST_TIME_FORMAT,
688       GST_TIME_ARGS (timestamp));
689 
690   segment->position = timestamp;
691 
692   if (sink)
693     queue->sink_tainted = TRUE;
694   else
695     queue->src_tainted = TRUE;
696 
697   /* calc diff with other end */
698   update_time_level (queue);
699 }
700 
701 static void
gst_queue_locked_flush(GstQueue * queue,gboolean full)702 gst_queue_locked_flush (GstQueue * queue, gboolean full)
703 {
704   GstQueueItem *qitem;
705 
706   while ((qitem = gst_queue_array_pop_head_struct (queue->queue))) {
707     /* Then lose another reference because we are supposed to destroy that
708        data when flushing */
709     if (!full && !qitem->is_query && GST_IS_EVENT (qitem->item)
710         && GST_EVENT_IS_STICKY (qitem->item)
711         && GST_EVENT_TYPE (qitem->item) != GST_EVENT_SEGMENT
712         && GST_EVENT_TYPE (qitem->item) != GST_EVENT_EOS) {
713       gst_pad_store_sticky_event (queue->srcpad, GST_EVENT_CAST (qitem->item));
714     }
715     if (!qitem->is_query)
716       gst_mini_object_unref (qitem->item);
717     memset (qitem, 0, sizeof (GstQueueItem));
718   }
719   queue->last_query = FALSE;
720   g_cond_signal (&queue->query_handled);
721   GST_QUEUE_CLEAR_LEVEL (queue->cur_level);
722   queue->min_threshold.buffers = queue->orig_min_threshold.buffers;
723   queue->min_threshold.bytes = queue->orig_min_threshold.bytes;
724   queue->min_threshold.time = queue->orig_min_threshold.time;
725   gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
726   gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
727   queue->head_needs_discont = queue->tail_needs_discont = FALSE;
728 
729   queue->sinktime = queue->srctime = GST_CLOCK_STIME_NONE;
730   queue->sink_tainted = queue->src_tainted = TRUE;
731 
732   /* we deleted a lot of something */
733   GST_QUEUE_SIGNAL_DEL (queue);
734 }
735 
736 /* enqueue an item an update the level stats, with QUEUE_LOCK */
737 static inline void
gst_queue_locked_enqueue_buffer(GstQueue * queue,gpointer item)738 gst_queue_locked_enqueue_buffer (GstQueue * queue, gpointer item)
739 {
740   GstQueueItem qitem;
741   GstBuffer *buffer = GST_BUFFER_CAST (item);
742   gsize bsize = gst_buffer_get_size (buffer);
743 
744   /* add buffer to the statistics */
745   queue->cur_level.buffers++;
746   queue->cur_level.bytes += bsize;
747   apply_buffer (queue, buffer, &queue->sink_segment, TRUE);
748 
749   qitem.item = item;
750   qitem.is_query = FALSE;
751   qitem.size = bsize;
752   gst_queue_array_push_tail_struct (queue->queue, &qitem);
753   GST_QUEUE_SIGNAL_ADD (queue);
754 }
755 
756 static inline void
gst_queue_locked_enqueue_buffer_list(GstQueue * queue,gpointer item)757 gst_queue_locked_enqueue_buffer_list (GstQueue * queue, gpointer item)
758 {
759   GstQueueItem qitem;
760   GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (item);
761   gsize bsize;
762 
763   bsize = gst_buffer_list_calculate_size (buffer_list);
764 
765   /* add buffer to the statistics */
766   queue->cur_level.buffers += gst_buffer_list_length (buffer_list);
767   queue->cur_level.bytes += bsize;
768   apply_buffer_list (queue, buffer_list, &queue->sink_segment, TRUE);
769 
770   qitem.item = item;
771   qitem.is_query = FALSE;
772   qitem.size = bsize;
773   gst_queue_array_push_tail_struct (queue->queue, &qitem);
774   GST_QUEUE_SIGNAL_ADD (queue);
775 }
776 
777 static inline void
gst_queue_locked_enqueue_event(GstQueue * queue,gpointer item)778 gst_queue_locked_enqueue_event (GstQueue * queue, gpointer item)
779 {
780   GstQueueItem qitem;
781   GstEvent *event = GST_EVENT_CAST (item);
782 
783   switch (GST_EVENT_TYPE (event)) {
784     case GST_EVENT_EOS:
785       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "got EOS from upstream");
786       /* Zero the thresholds, this makes sure the queue is completely
787        * filled and we can read all data from the queue. */
788       if (queue->flush_on_eos)
789         gst_queue_locked_flush (queue, FALSE);
790       else
791         GST_QUEUE_CLEAR_LEVEL (queue->min_threshold);
792       /* mark the queue as EOS. This prevents us from accepting more data. */
793       queue->eos = TRUE;
794       break;
795     case GST_EVENT_SEGMENT:
796       apply_segment (queue, event, &queue->sink_segment, TRUE);
797       /* if the queue is empty, apply sink segment on the source */
798       if (gst_queue_array_is_empty (queue->queue)) {
799         GST_CAT_LOG_OBJECT (queue_dataflow, queue, "Apply segment on srcpad");
800         apply_segment (queue, event, &queue->src_segment, FALSE);
801         queue->newseg_applied_to_src = TRUE;
802       }
803       /* a new segment allows us to accept more buffers if we got EOS
804        * from downstream */
805       queue->unexpected = FALSE;
806       break;
807     case GST_EVENT_GAP:
808       apply_gap (queue, event, &queue->sink_segment, TRUE);
809       break;
810     default:
811       break;
812   }
813 
814   qitem.item = item;
815   qitem.is_query = FALSE;
816   qitem.size = 0;
817   gst_queue_array_push_tail_struct (queue->queue, &qitem);
818   GST_QUEUE_SIGNAL_ADD (queue);
819 }
820 
821 /* dequeue an item from the queue and update level stats, with QUEUE_LOCK */
822 static GstMiniObject *
gst_queue_locked_dequeue(GstQueue * queue)823 gst_queue_locked_dequeue (GstQueue * queue)
824 {
825   GstQueueItem *qitem;
826   GstMiniObject *item;
827   gsize bufsize;
828 
829   qitem = gst_queue_array_pop_head_struct (queue->queue);
830   if (qitem == NULL)
831     goto no_item;
832 
833   item = qitem->item;
834   bufsize = qitem->size;
835 
836   if (GST_IS_BUFFER (item)) {
837     GstBuffer *buffer = GST_BUFFER_CAST (item);
838 
839     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
840         "retrieved buffer %p from queue", buffer);
841 
842     queue->cur_level.buffers--;
843     queue->cur_level.bytes -= bufsize;
844     apply_buffer (queue, buffer, &queue->src_segment, FALSE);
845 
846     /* if the queue is empty now, update the other side */
847     if (queue->cur_level.buffers == 0)
848       queue->cur_level.time = 0;
849   } else if (GST_IS_BUFFER_LIST (item)) {
850     GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (item);
851 
852     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
853         "retrieved buffer list %p from queue", buffer_list);
854 
855     queue->cur_level.buffers -= gst_buffer_list_length (buffer_list);
856     queue->cur_level.bytes -= bufsize;
857     apply_buffer_list (queue, buffer_list, &queue->src_segment, FALSE);
858 
859     /* if the queue is empty now, update the other side */
860     if (queue->cur_level.buffers == 0)
861       queue->cur_level.time = 0;
862   } else if (GST_IS_EVENT (item)) {
863     GstEvent *event = GST_EVENT_CAST (item);
864 
865     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
866         "retrieved event %p from queue", event);
867 
868     switch (GST_EVENT_TYPE (event)) {
869       case GST_EVENT_EOS:
870         /* queue is empty now that we dequeued the EOS */
871         GST_QUEUE_CLEAR_LEVEL (queue->cur_level);
872         break;
873       case GST_EVENT_SEGMENT:
874         /* apply newsegment if it has not already been applied */
875         if (G_LIKELY (!queue->newseg_applied_to_src)) {
876           apply_segment (queue, event, &queue->src_segment, FALSE);
877         } else {
878           queue->newseg_applied_to_src = FALSE;
879         }
880         break;
881       case GST_EVENT_GAP:
882         apply_gap (queue, event, &queue->src_segment, FALSE);
883         break;
884       default:
885         break;
886     }
887   } else if (GST_IS_QUERY (item)) {
888     GstQuery *query = GST_QUERY_CAST (item);
889 
890     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
891         "retrieved query %p from queue", query);
892   } else {
893     g_warning
894         ("Unexpected item %p dequeued from queue %s (refcounting problem?)",
895         item, GST_OBJECT_NAME (queue));
896     item = NULL;
897   }
898   GST_QUEUE_SIGNAL_DEL (queue);
899 
900   return item;
901 
902   /* ERRORS */
903 no_item:
904   {
905     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "the queue is empty");
906     return NULL;
907   }
908 }
909 
910 static GstFlowReturn
gst_queue_handle_sink_event(GstPad * pad,GstObject * parent,GstEvent * event)911 gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
912 {
913   gboolean ret = TRUE;
914   GstQueue *queue;
915 
916   queue = GST_QUEUE (parent);
917 
918   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "Received event '%s'",
919       GST_EVENT_TYPE_NAME (event));
920 
921   switch (GST_EVENT_TYPE (event)) {
922     case GST_EVENT_FLUSH_START:
923       /* forward event */
924       ret = gst_pad_push_event (queue->srcpad, event);
925 
926       /* now unblock the chain function */
927       GST_QUEUE_MUTEX_LOCK (queue);
928       queue->srcresult = GST_FLOW_FLUSHING;
929       /* unblock the loop and chain functions */
930       GST_QUEUE_SIGNAL_ADD (queue);
931       GST_QUEUE_SIGNAL_DEL (queue);
932       GST_QUEUE_MUTEX_UNLOCK (queue);
933 
934       /* make sure it pauses, this should happen since we sent
935        * flush_start downstream. */
936       gst_pad_pause_task (queue->srcpad);
937       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
938 
939       /* unblock query handler after the streaming thread is shut down.
940        * Otherwise downstream might have a query that is already unreffed
941        * upstream */
942       GST_QUEUE_MUTEX_LOCK (queue);
943       queue->last_query = FALSE;
944       g_cond_signal (&queue->query_handled);
945       GST_QUEUE_MUTEX_UNLOCK (queue);
946       break;
947     case GST_EVENT_FLUSH_STOP:
948       /* forward event */
949       ret = gst_pad_push_event (queue->srcpad, event);
950 
951       GST_QUEUE_MUTEX_LOCK (queue);
952       gst_queue_locked_flush (queue, FALSE);
953       queue->srcresult = GST_FLOW_OK;
954       queue->eos = FALSE;
955       queue->unexpected = FALSE;
956       if (gst_pad_is_active (queue->srcpad)) {
957         gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop,
958             queue->srcpad, NULL);
959       } else {
960         GST_INFO_OBJECT (queue->srcpad, "not re-starting task on srcpad, "
961             "pad not active any longer");
962       }
963       GST_QUEUE_MUTEX_UNLOCK (queue);
964 
965       STATUS (queue, pad, "after flush");
966       break;
967     default:
968       if (GST_EVENT_IS_SERIALIZED (event)) {
969         /* serialized events go in the queue */
970         GST_QUEUE_MUTEX_LOCK (queue);
971 
972         /* STREAM_START and SEGMENT reset the EOS status of a
973          * pad. Change the cached sinkpad flow result accordingly */
974         if (queue->srcresult == GST_FLOW_EOS
975             && (GST_EVENT_TYPE (event) == GST_EVENT_STREAM_START
976                 || GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT))
977           queue->srcresult = GST_FLOW_OK;
978 
979         if (queue->srcresult != GST_FLOW_OK) {
980           /* Errors in sticky event pushing are no problem and ignored here
981            * as they will cause more meaningful errors during data flow.
982            * For EOS events, that are not followed by data flow, we still
983            * return FALSE here though and report an error.
984            */
985           if (!GST_EVENT_IS_STICKY (event)) {
986             GST_QUEUE_MUTEX_UNLOCK (queue);
987             goto out_flow_error;
988           } else if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
989             if (queue->srcresult == GST_FLOW_NOT_LINKED
990                 || queue->srcresult < GST_FLOW_EOS) {
991               GST_QUEUE_MUTEX_UNLOCK (queue);
992               GST_ELEMENT_FLOW_ERROR (queue, queue->srcresult);
993             } else {
994               GST_QUEUE_MUTEX_UNLOCK (queue);
995             }
996             goto out_flow_error;
997           }
998         }
999 
1000         /* refuse more events on EOS unless they unset the EOS status */
1001         if (queue->eos) {
1002           switch (GST_EVENT_TYPE (event)) {
1003             case GST_EVENT_STREAM_START:
1004             case GST_EVENT_SEGMENT:
1005               /* Restart the loop */
1006               if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
1007                 queue->srcresult = GST_FLOW_OK;
1008                 queue->eos = FALSE;
1009                 queue->unexpected = FALSE;
1010                 gst_pad_start_task (queue->srcpad,
1011                     (GstTaskFunction) gst_queue_loop, queue->srcpad, NULL);
1012               } else {
1013                 queue->eos = FALSE;
1014                 queue->unexpected = FALSE;
1015               }
1016 
1017               break;
1018             default:
1019               goto out_eos;
1020           }
1021         }
1022 
1023         gst_queue_locked_enqueue_event (queue, event);
1024         GST_QUEUE_MUTEX_UNLOCK (queue);
1025       } else {
1026         /* non-serialized events are forwarded downstream immediately */
1027         ret = gst_pad_push_event (queue->srcpad, event);
1028       }
1029       break;
1030   }
1031   if (ret == FALSE) {
1032     GST_ERROR_OBJECT (queue, "Failed to push event");
1033     return GST_FLOW_ERROR;
1034   }
1035   return GST_FLOW_OK;
1036 
1037   /* ERRORS */
1038 out_eos:
1039   {
1040     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "refusing event, we are EOS");
1041     GST_QUEUE_MUTEX_UNLOCK (queue);
1042     gst_event_unref (event);
1043     return GST_FLOW_EOS;
1044   }
1045 out_flow_error:
1046   {
1047     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1048         "refusing event, we have a downstream flow error: %s",
1049         gst_flow_get_name (queue->srcresult));
1050     gst_event_unref (event);
1051     return queue->srcresult;
1052   }
1053 }
1054 
1055 static gboolean
gst_queue_handle_sink_query(GstPad * pad,GstObject * parent,GstQuery * query)1056 gst_queue_handle_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
1057 {
1058   GstQueue *queue = GST_QUEUE_CAST (parent);
1059   gboolean res;
1060 
1061   switch (GST_QUERY_TYPE (query)) {
1062     default:
1063       if (G_UNLIKELY (GST_QUERY_IS_SERIALIZED (query))) {
1064         GstQueueItem qitem;
1065 
1066         GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
1067         GST_LOG_OBJECT (queue, "queuing query %p (%s)", query,
1068             GST_QUERY_TYPE_NAME (query));
1069         qitem.item = GST_MINI_OBJECT_CAST (query);
1070         qitem.is_query = TRUE;
1071         qitem.size = 0;
1072         gst_queue_array_push_tail_struct (queue->queue, &qitem);
1073         GST_QUEUE_SIGNAL_ADD (queue);
1074         while (queue->srcresult == GST_FLOW_OK &&
1075             queue->last_handled_query != query)
1076           g_cond_wait (&queue->query_handled, &queue->qlock);
1077         queue->last_handled_query = NULL;
1078         if (queue->srcresult != GST_FLOW_OK)
1079           goto out_flushing;
1080         res = queue->last_query;
1081         GST_QUEUE_MUTEX_UNLOCK (queue);
1082       } else {
1083         res = gst_pad_query_default (pad, parent, query);
1084       }
1085       break;
1086   }
1087   return res;
1088 
1089   /* ERRORS */
1090 out_flushing:
1091   {
1092     GST_DEBUG_OBJECT (queue, "we are flushing");
1093     GST_QUEUE_MUTEX_UNLOCK (queue);
1094     return FALSE;
1095   }
1096 }
1097 
1098 static gboolean
gst_queue_is_empty(GstQueue * queue)1099 gst_queue_is_empty (GstQueue * queue)
1100 {
1101   GstQueueItem *tail;
1102 
1103   tail = gst_queue_array_peek_tail_struct (queue->queue);
1104 
1105   if (tail == NULL)
1106     return TRUE;
1107 
1108   /* Only consider the queue empty if the minimum thresholds
1109    * are not reached and data is at the queue tail. Otherwise
1110    * we would block forever on serialized queries.
1111    */
1112   if (!GST_IS_BUFFER (tail->item) && !GST_IS_BUFFER_LIST (tail->item))
1113     return FALSE;
1114 
1115   /* It is possible that a max size is reached before all min thresholds are.
1116    * Therefore, only consider it empty if it is not filled. */
1117   return ((queue->min_threshold.buffers > 0 &&
1118           queue->cur_level.buffers < queue->min_threshold.buffers) ||
1119       (queue->min_threshold.bytes > 0 &&
1120           queue->cur_level.bytes < queue->min_threshold.bytes) ||
1121       (queue->min_threshold.time > 0 &&
1122           queue->cur_level.time < queue->min_threshold.time)) &&
1123       !gst_queue_is_filled (queue);
1124 }
1125 
1126 static gboolean
gst_queue_is_filled(GstQueue * queue)1127 gst_queue_is_filled (GstQueue * queue)
1128 {
1129   return (((queue->max_size.buffers > 0 &&
1130               queue->cur_level.buffers >= queue->max_size.buffers) ||
1131           (queue->max_size.bytes > 0 &&
1132               queue->cur_level.bytes >= queue->max_size.bytes) ||
1133           (queue->max_size.time > 0 &&
1134               queue->cur_level.time >= queue->max_size.time)));
1135 }
1136 
1137 static void
gst_queue_leak_downstream(GstQueue * queue)1138 gst_queue_leak_downstream (GstQueue * queue)
1139 {
1140   /* for as long as the queue is filled, dequeue an item and discard it */
1141   while (gst_queue_is_filled (queue)) {
1142     GstMiniObject *leak;
1143 
1144     leak = gst_queue_locked_dequeue (queue);
1145     /* there is nothing to dequeue and the queue is still filled.. This should
1146      * not happen */
1147     g_assert (leak != NULL);
1148 
1149     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
1150         "queue is full, leaking item %p on downstream end", leak);
1151     if (GST_IS_EVENT (leak) && GST_EVENT_IS_STICKY (leak)) {
1152       GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
1153           "Storing sticky event %s on srcpad", GST_EVENT_TYPE_NAME (leak));
1154       gst_pad_store_sticky_event (queue->srcpad, GST_EVENT_CAST (leak));
1155     }
1156 
1157     if (!GST_IS_QUERY (leak))
1158       gst_mini_object_unref (leak);
1159 
1160     /* last buffer needs to get a DISCONT flag */
1161     queue->head_needs_discont = TRUE;
1162   }
1163 }
1164 
1165 static gboolean
discont_first_buffer(GstBuffer ** buffer,guint i,gpointer user_data)1166 discont_first_buffer (GstBuffer ** buffer, guint i, gpointer user_data)
1167 {
1168   GstQueue *queue = user_data;
1169   GstBuffer *subbuffer = gst_buffer_make_writable (*buffer);
1170 
1171   if (subbuffer) {
1172     *buffer = subbuffer;
1173     GST_BUFFER_FLAG_SET (*buffer, GST_BUFFER_FLAG_DISCONT);
1174   } else {
1175     GST_DEBUG_OBJECT (queue, "Could not mark buffer as DISCONT");
1176   }
1177 
1178   return FALSE;
1179 }
1180 
1181 static GstFlowReturn
gst_queue_chain_buffer_or_list(GstPad * pad,GstObject * parent,GstMiniObject * obj,gboolean is_list)1182 gst_queue_chain_buffer_or_list (GstPad * pad, GstObject * parent,
1183     GstMiniObject * obj, gboolean is_list)
1184 {
1185   GstQueue *queue;
1186 
1187   queue = GST_QUEUE_CAST (parent);
1188 
1189   /* we have to lock the queue since we span threads */
1190   GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
1191   /* when we received EOS, we refuse any more data */
1192   if (queue->eos)
1193     goto out_eos;
1194   if (queue->unexpected)
1195     goto out_unexpected;
1196 
1197   if (!is_list) {
1198     GstClockTime duration, timestamp;
1199     GstBuffer *buffer = GST_BUFFER_CAST (obj);
1200 
1201     timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
1202     duration = GST_BUFFER_DURATION (buffer);
1203 
1204     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received buffer %p of size %"
1205         G_GSIZE_FORMAT ", time %" GST_TIME_FORMAT ", duration %"
1206         GST_TIME_FORMAT, buffer, gst_buffer_get_size (buffer),
1207         GST_TIME_ARGS (timestamp), GST_TIME_ARGS (duration));
1208   } else {
1209     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1210         "received buffer list %p with %u buffers", obj,
1211         gst_buffer_list_length (GST_BUFFER_LIST_CAST (obj)));
1212   }
1213 
1214   /* We make space available if we're "full" according to whatever
1215    * the user defined as "full". Note that this only applies to buffers.
1216    * We always handle events and they don't count in our statistics. */
1217   while (gst_queue_is_filled (queue)) {
1218     if (!queue->silent) {
1219       GST_QUEUE_MUTEX_UNLOCK (queue);
1220       g_signal_emit (queue, gst_queue_signals[SIGNAL_OVERRUN], 0);
1221       GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
1222       /* we recheck, the signal could have changed the thresholds */
1223       if (!gst_queue_is_filled (queue))
1224         break;
1225     }
1226 
1227     /* how are we going to make space for this buffer? */
1228     switch (queue->leaky) {
1229       case GST_QUEUE_LEAK_UPSTREAM:
1230         /* next buffer needs to get a DISCONT flag */
1231         queue->tail_needs_discont = TRUE;
1232         /* leak current buffer */
1233         GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
1234             "queue is full, leaking buffer on upstream end");
1235         /* now we can clean up and exit right away */
1236         goto out_unref;
1237       case GST_QUEUE_LEAK_DOWNSTREAM:
1238         gst_queue_leak_downstream (queue);
1239         break;
1240       default:
1241         g_warning ("Unknown leaky type, using default");
1242         /* fall-through */
1243       case GST_QUEUE_NO_LEAK:
1244       {
1245         GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
1246             "queue is full, waiting for free space");
1247 
1248         /* don't leak. Instead, wait for space to be available */
1249         /* for as long as the queue is filled, wait till an item was deleted. */
1250         while (gst_queue_is_filled (queue)) {
1251           GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "queue is full, waiting for item was deleted");
1252           GST_QUEUE_WAIT_DEL_CHECK (queue, out_flushing);
1253         };
1254 
1255         GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "queue is not full");
1256 
1257         if (!queue->silent) {
1258           GST_QUEUE_MUTEX_UNLOCK (queue);
1259           g_signal_emit (queue, gst_queue_signals[SIGNAL_RUNNING], 0);
1260           GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
1261         }
1262         break;
1263       }
1264     }
1265   }
1266 
1267   if (queue->tail_needs_discont) {
1268     if (!is_list) {
1269       GstBuffer *buffer = GST_BUFFER_CAST (obj);
1270       GstBuffer *subbuffer = gst_buffer_make_writable (buffer);
1271 
1272       if (subbuffer) {
1273         buffer = subbuffer;
1274         GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
1275       } else {
1276         GST_DEBUG_OBJECT (queue, "Could not mark buffer as DISCONT");
1277       }
1278 
1279       obj = GST_MINI_OBJECT_CAST (buffer);
1280     } else {
1281       GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (obj);
1282 
1283       buffer_list = gst_buffer_list_make_writable (buffer_list);
1284       gst_buffer_list_foreach (buffer_list, discont_first_buffer, queue);
1285       obj = GST_MINI_OBJECT_CAST (buffer_list);
1286     }
1287     queue->tail_needs_discont = FALSE;
1288   }
1289 
1290   /* put buffer in queue now */
1291   if (is_list)
1292     gst_queue_locked_enqueue_buffer_list (queue, obj);
1293   else
1294     gst_queue_locked_enqueue_buffer (queue, obj);
1295   GST_QUEUE_MUTEX_UNLOCK (queue);
1296 
1297   return GST_FLOW_OK;
1298 
1299   /* special conditions */
1300 out_unref:
1301   {
1302     GST_QUEUE_MUTEX_UNLOCK (queue);
1303 
1304     gst_mini_object_unref (obj);
1305 
1306     return GST_FLOW_OK;
1307   }
1308 out_flushing:
1309   {
1310     GstFlowReturn ret = queue->srcresult;
1311 
1312     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1313         "exit because task paused, reason: %s", gst_flow_get_name (ret));
1314     GST_QUEUE_MUTEX_UNLOCK (queue);
1315     gst_mini_object_unref (obj);
1316 
1317     return ret;
1318   }
1319 out_eos:
1320   {
1321     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
1322     GST_QUEUE_MUTEX_UNLOCK (queue);
1323 
1324     gst_mini_object_unref (obj);
1325 
1326     return GST_FLOW_EOS;
1327   }
1328 out_unexpected:
1329   {
1330     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
1331     GST_QUEUE_MUTEX_UNLOCK (queue);
1332 
1333     gst_mini_object_unref (obj);
1334 
1335     return GST_FLOW_EOS;
1336   }
1337 }
1338 
1339 static GstFlowReturn
gst_queue_chain_list(GstPad * pad,GstObject * parent,GstBufferList * buffer_list)1340 gst_queue_chain_list (GstPad * pad, GstObject * parent,
1341     GstBufferList * buffer_list)
1342 {
1343   return gst_queue_chain_buffer_or_list (pad, parent,
1344       GST_MINI_OBJECT_CAST (buffer_list), TRUE);
1345 }
1346 
1347 static GstFlowReturn
gst_queue_chain(GstPad * pad,GstObject * parent,GstBuffer * buffer)1348 gst_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
1349 {
1350   return gst_queue_chain_buffer_or_list (pad, parent,
1351       GST_MINI_OBJECT_CAST (buffer), FALSE);
1352 }
1353 
1354 /* dequeue an item from the queue an push it downstream. This functions returns
1355  * the result of the push. */
1356 static GstFlowReturn
gst_queue_push_one(GstQueue * queue)1357 gst_queue_push_one (GstQueue * queue)
1358 {
1359   GstFlowReturn result = queue->srcresult;
1360   GstMiniObject *data;
1361   gboolean is_list;
1362 
1363   data = gst_queue_locked_dequeue (queue);
1364   if (data == NULL)
1365     goto no_item;
1366 
1367 next:
1368   is_list = GST_IS_BUFFER_LIST (data);
1369 
1370   if (GST_IS_BUFFER (data) || is_list) {
1371     if (!is_list) {
1372       GstBuffer *buffer;
1373 
1374       buffer = GST_BUFFER_CAST (data);
1375 
1376       if (queue->head_needs_discont) {
1377         GstBuffer *subbuffer = gst_buffer_make_writable (buffer);
1378 
1379         if (subbuffer) {
1380           buffer = subbuffer;
1381           GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
1382         } else {
1383           GST_DEBUG_OBJECT (queue, "Could not mark buffer as DISCONT");
1384         }
1385         queue->head_needs_discont = FALSE;
1386       }
1387 
1388       GST_QUEUE_MUTEX_UNLOCK (queue);
1389       result = gst_pad_push (queue->srcpad, buffer);
1390     } else {
1391       GstBufferList *buffer_list;
1392 
1393       buffer_list = GST_BUFFER_LIST_CAST (data);
1394 
1395       if (queue->head_needs_discont) {
1396         buffer_list = gst_buffer_list_make_writable (buffer_list);
1397         gst_buffer_list_foreach (buffer_list, discont_first_buffer, queue);
1398         queue->head_needs_discont = FALSE;
1399       }
1400 
1401       GST_QUEUE_MUTEX_UNLOCK (queue);
1402       result = gst_pad_push_list (queue->srcpad, buffer_list);
1403     }
1404 
1405     /* need to check for srcresult here as well */
1406     GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
1407 
1408     if (result == GST_FLOW_EOS) {
1409       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "got EOS from downstream");
1410       /* stop pushing buffers, we dequeue all items until we see an item that we
1411        * can push again, which is EOS or SEGMENT. If there is nothing in the
1412        * queue we can push, we set a flag to make the sinkpad refuse more
1413        * buffers with an EOS return value. */
1414       while ((data = gst_queue_locked_dequeue (queue))) {
1415         if (GST_IS_BUFFER (data)) {
1416           GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1417               "dropping EOS buffer %p", data);
1418           gst_buffer_unref (GST_BUFFER_CAST (data));
1419         } else if (GST_IS_BUFFER_LIST (data)) {
1420           GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1421               "dropping EOS buffer list %p", data);
1422           gst_buffer_list_unref (GST_BUFFER_LIST_CAST (data));
1423         } else if (GST_IS_EVENT (data)) {
1424           GstEvent *event = GST_EVENT_CAST (data);
1425           GstEventType type = GST_EVENT_TYPE (event);
1426 
1427           if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT
1428               || type == GST_EVENT_STREAM_START) {
1429             /* we found a pushable item in the queue, push it out */
1430             GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1431                 "pushing pushable event %s after EOS",
1432                 GST_EVENT_TYPE_NAME (event));
1433             goto next;
1434           }
1435           GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1436               "dropping EOS event %p", event);
1437           gst_event_unref (event);
1438         } else if (GST_IS_QUERY (data)) {
1439           GstQuery *query = GST_QUERY_CAST (data);
1440 
1441           GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1442               "dropping query %p because of EOS", query);
1443           queue->last_query = FALSE;
1444           g_cond_signal (&queue->query_handled);
1445         }
1446       }
1447       /* no more items in the queue. Set the unexpected flag so that upstream
1448        * make us refuse any more buffers on the sinkpad. Since we will still
1449        * accept EOS and SEGMENT we return _FLOW_OK to the caller so that the
1450        * task function does not shut down. */
1451       queue->unexpected = TRUE;
1452       result = GST_FLOW_OK;
1453     }
1454   } else if (GST_IS_EVENT (data)) {
1455     GstEvent *event = GST_EVENT_CAST (data);
1456     GstEventType type = GST_EVENT_TYPE (event);
1457 
1458     GST_QUEUE_MUTEX_UNLOCK (queue);
1459 
1460     gst_pad_push_event (queue->srcpad, event);
1461 
1462     GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
1463     /* if we're EOS, return EOS so that the task pauses. */
1464     if (type == GST_EVENT_EOS) {
1465       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1466           "pushed EOS event %p, return EOS", event);
1467       result = GST_FLOW_EOS;
1468     }
1469   } else if (GST_IS_QUERY (data)) {
1470     GstQuery *query = GST_QUERY_CAST (data);
1471     gboolean ret;
1472 
1473     GST_QUEUE_MUTEX_UNLOCK (queue);
1474     ret = gst_pad_peer_query (queue->srcpad, query);
1475     GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing_query);
1476     queue->last_query = ret;
1477     queue->last_handled_query = query;
1478     g_cond_signal (&queue->query_handled);
1479     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1480         "did query %p, return %d", query, queue->last_query);
1481   }
1482   return result;
1483 
1484   /* ERRORS */
1485 no_item:
1486   {
1487     GST_CAT_ERROR_OBJECT (queue_dataflow, queue,
1488         "exit because we have no item in the queue");
1489     return GST_FLOW_ERROR;
1490   }
1491 out_flushing:
1492   {
1493     GstFlowReturn ret = queue->srcresult;
1494     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1495         "exit because task paused, reason: %s", gst_flow_get_name (ret));
1496     return ret;
1497   }
1498 out_flushing_query:
1499   {
1500     GstFlowReturn ret = queue->srcresult;
1501     queue->last_query = FALSE;
1502     g_cond_signal (&queue->query_handled);
1503     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1504         "exit because task paused, reason: %s", gst_flow_get_name (ret));
1505     return ret;
1506   }
1507 }
1508 
1509 static void
gst_queue_loop(GstPad * pad)1510 gst_queue_loop (GstPad * pad)
1511 {
1512   GstQueue *queue;
1513   GstFlowReturn ret;
1514 
1515   queue = (GstQueue *) GST_PAD_PARENT (pad);
1516 
1517   /* have to lock for thread-safety */
1518   GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
1519 
1520   while (gst_queue_is_empty (queue)) {
1521     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "queue is empty");
1522     if (!queue->silent) {
1523       GST_QUEUE_MUTEX_UNLOCK (queue);
1524       g_signal_emit (queue, gst_queue_signals[SIGNAL_UNDERRUN], 0);
1525       GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
1526     }
1527 
1528     /* we recheck, the signal could have changed the thresholds */
1529     while (gst_queue_is_empty (queue)) {
1530       GST_QUEUE_WAIT_ADD_CHECK (queue, out_flushing);
1531     }
1532 
1533     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "queue is not empty");
1534     if (!queue->silent) {
1535       GST_QUEUE_MUTEX_UNLOCK (queue);
1536       g_signal_emit (queue, gst_queue_signals[SIGNAL_RUNNING], 0);
1537       g_signal_emit (queue, gst_queue_signals[SIGNAL_PUSHING], 0);
1538       GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
1539     }
1540   }
1541 
1542   ret = gst_queue_push_one (queue);
1543   queue->srcresult = ret;
1544   if (ret != GST_FLOW_OK)
1545     goto out_flushing;
1546 
1547   GST_QUEUE_MUTEX_UNLOCK (queue);
1548 
1549   return;
1550 
1551   /* ERRORS */
1552 out_flushing:
1553   {
1554     gboolean eos = queue->eos;
1555     GstFlowReturn ret = queue->srcresult;
1556 
1557     gst_pad_pause_task (queue->srcpad);
1558     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1559         "pause task, reason:  %s", gst_flow_get_name (ret));
1560     if (ret == GST_FLOW_FLUSHING) {
1561       gst_queue_locked_flush (queue, FALSE);
1562     } else {
1563       GST_QUEUE_SIGNAL_DEL (queue);
1564       queue->last_query = FALSE;
1565       g_cond_signal (&queue->query_handled);
1566     }
1567     GST_QUEUE_MUTEX_UNLOCK (queue);
1568     /* let app know about us giving up if upstream is not expected to do so */
1569     /* EOS is already taken care of elsewhere */
1570     if (eos && (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) {
1571       GST_ELEMENT_FLOW_ERROR (queue, ret);
1572       gst_pad_push_event (queue->srcpad, gst_event_new_eos ());
1573     }
1574     return;
1575   }
1576 }
1577 
1578 static gboolean
gst_queue_handle_src_event(GstPad * pad,GstObject * parent,GstEvent * event)1579 gst_queue_handle_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
1580 {
1581   gboolean res = TRUE;
1582   GstQueue *queue = GST_QUEUE (parent);
1583 
1584 #ifndef GST_DISABLE_GST_DEBUG
1585   GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%d)",
1586       event, GST_EVENT_TYPE (event));
1587 #endif
1588 
1589   switch (GST_EVENT_TYPE (event)) {
1590     case GST_EVENT_RECONFIGURE:
1591       GST_QUEUE_MUTEX_LOCK (queue);
1592       if (queue->srcresult == GST_FLOW_NOT_LINKED) {
1593         /* when we got not linked, assume downstream is linked again now and we
1594          * can try to start pushing again */
1595         queue->srcresult = GST_FLOW_OK;
1596         gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad, NULL);
1597       }
1598       GST_QUEUE_MUTEX_UNLOCK (queue);
1599 
1600       res = gst_pad_push_event (queue->sinkpad, event);
1601       break;
1602     default:
1603       res = gst_pad_event_default (pad, parent, event);
1604       break;
1605   }
1606 
1607 
1608   return res;
1609 }
1610 
1611 static gboolean
gst_queue_handle_src_query(GstPad * pad,GstObject * parent,GstQuery * query)1612 gst_queue_handle_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
1613 {
1614   GstQueue *queue = GST_QUEUE (parent);
1615   gboolean res;
1616 
1617   switch (GST_QUERY_TYPE (query)) {
1618     case GST_QUERY_SCHEDULING:{
1619       gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
1620       res = TRUE;
1621       break;
1622     }
1623     default:
1624       res = gst_pad_query_default (pad, parent, query);
1625       break;
1626   }
1627 
1628   if (!res)
1629     return FALSE;
1630 
1631   /* Adjust peer response for data contained in queue */
1632   switch (GST_QUERY_TYPE (query)) {
1633     case GST_QUERY_POSITION:
1634     {
1635       gint64 peer_pos;
1636       GstFormat format;
1637 
1638       /* get peer position */
1639       gst_query_parse_position (query, &format, &peer_pos);
1640 
1641       /* FIXME: this code assumes that there's no discont in the queue */
1642       switch (format) {
1643         case GST_FORMAT_BYTES:
1644           peer_pos -= queue->cur_level.bytes;
1645           if (peer_pos < 0)     /* Clamp result to 0 */
1646             peer_pos = 0;
1647           break;
1648         case GST_FORMAT_TIME:
1649           peer_pos -= queue->cur_level.time;
1650           if (peer_pos < 0)     /* Clamp result to 0 */
1651             peer_pos = 0;
1652           break;
1653         default:
1654           GST_DEBUG_OBJECT (queue, "Can't adjust query in %s format, don't "
1655               "know how to adjust value", gst_format_get_name (format));
1656           return TRUE;
1657       }
1658       /* set updated position */
1659       gst_query_set_position (query, format, peer_pos);
1660       break;
1661     }
1662     case GST_QUERY_LATENCY:
1663     {
1664       gboolean live;
1665       GstClockTime min, max;
1666 
1667       gst_query_parse_latency (query, &live, &min, &max);
1668 
1669       /* we can delay up to the limit of the queue in time. If we have no time
1670        * limit, the best thing we can do is to return an infinite delay. In
1671        * reality a better estimate would be the byte/buffer rate but that is not
1672        * possible right now. */
1673       /* TODO: Use CONVERT query? */
1674       if (queue->max_size.time > 0 && max != -1
1675           && queue->leaky == GST_QUEUE_NO_LEAK)
1676         max += queue->max_size.time;
1677       else if (queue->max_size.time > 0 && queue->leaky != GST_QUEUE_NO_LEAK)
1678         max = MAX (queue->max_size.time, max);
1679       else
1680         max = -1;
1681 
1682       /* adjust for min-threshold */
1683       if (queue->min_threshold.time > 0)
1684         min += queue->min_threshold.time;
1685 
1686       gst_query_set_latency (query, live, min, max);
1687       break;
1688     }
1689     default:
1690       /* peer handled other queries */
1691       break;
1692   }
1693 
1694   return TRUE;
1695 }
1696 
1697 static gboolean
gst_queue_sink_activate_mode(GstPad * pad,GstObject * parent,GstPadMode mode,gboolean active)1698 gst_queue_sink_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
1699     gboolean active)
1700 {
1701   gboolean result;
1702   GstQueue *queue;
1703 
1704   queue = GST_QUEUE (parent);
1705 
1706   switch (mode) {
1707     case GST_PAD_MODE_PUSH:
1708       if (active) {
1709         GST_QUEUE_MUTEX_LOCK (queue);
1710         queue->srcresult = GST_FLOW_OK;
1711         queue->eos = FALSE;
1712         queue->unexpected = FALSE;
1713         GST_QUEUE_MUTEX_UNLOCK (queue);
1714       } else {
1715         /* step 1, unblock chain function */
1716         GST_QUEUE_MUTEX_LOCK (queue);
1717         queue->srcresult = GST_FLOW_FLUSHING;
1718         /* the item del signal will unblock */
1719         GST_QUEUE_SIGNAL_DEL (queue);
1720         GST_QUEUE_MUTEX_UNLOCK (queue);
1721 
1722         /* step 2, wait until streaming thread stopped and flush queue */
1723         GST_PAD_STREAM_LOCK (pad);
1724         GST_QUEUE_MUTEX_LOCK (queue);
1725         gst_queue_locked_flush (queue, TRUE);
1726         GST_QUEUE_MUTEX_UNLOCK (queue);
1727         GST_PAD_STREAM_UNLOCK (pad);
1728       }
1729       result = TRUE;
1730       break;
1731     default:
1732       result = FALSE;
1733       break;
1734   }
1735   return result;
1736 }
1737 
1738 static gboolean
gst_queue_src_activate_mode(GstPad * pad,GstObject * parent,GstPadMode mode,gboolean active)1739 gst_queue_src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
1740     gboolean active)
1741 {
1742   gboolean result;
1743   GstQueue *queue;
1744 
1745   queue = GST_QUEUE (parent);
1746 
1747   switch (mode) {
1748     case GST_PAD_MODE_PUSH:
1749       if (active) {
1750         GST_QUEUE_MUTEX_LOCK (queue);
1751         queue->srcresult = GST_FLOW_OK;
1752         queue->eos = FALSE;
1753         queue->unexpected = FALSE;
1754         result =
1755             gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad,
1756             NULL);
1757         GST_QUEUE_MUTEX_UNLOCK (queue);
1758       } else {
1759         /* step 1, unblock loop function */
1760         GST_QUEUE_MUTEX_LOCK (queue);
1761         queue->srcresult = GST_FLOW_FLUSHING;
1762         /* the item add signal will unblock */
1763         g_cond_signal (&queue->item_add);
1764         GST_QUEUE_MUTEX_UNLOCK (queue);
1765 
1766         /* step 2, make sure streaming finishes */
1767         result = gst_pad_stop_task (pad);
1768 
1769         GST_QUEUE_MUTEX_LOCK (queue);
1770         gst_queue_locked_flush (queue, FALSE);
1771         GST_QUEUE_MUTEX_UNLOCK (queue);
1772       }
1773       break;
1774     default:
1775       result = FALSE;
1776       break;
1777   }
1778   return result;
1779 }
1780 
1781 static void
queue_capacity_change(GstQueue * queue)1782 queue_capacity_change (GstQueue * queue)
1783 {
1784   if (queue->leaky == GST_QUEUE_LEAK_DOWNSTREAM) {
1785     gst_queue_leak_downstream (queue);
1786   }
1787 
1788   /* changing the capacity of the queue must wake up
1789    * the _chain function, it might have more room now
1790    * to store the buffer/event in the queue */
1791   GST_QUEUE_SIGNAL_DEL (queue);
1792 }
1793 
1794 /* Changing the minimum required fill level must
1795  * wake up the _loop function as it might now
1796  * be able to preceed.
1797  */
1798 #define QUEUE_THRESHOLD_CHANGE(q)\
1799   GST_QUEUE_SIGNAL_ADD (q);
1800 
1801 static void
gst_queue_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)1802 gst_queue_set_property (GObject * object,
1803     guint prop_id, const GValue * value, GParamSpec * pspec)
1804 {
1805   GstQueue *queue = GST_QUEUE (object);
1806 
1807   /* someone could change levels here, and since this
1808    * affects the get/put funcs, we need to lock for safety. */
1809   GST_QUEUE_MUTEX_LOCK (queue);
1810 
1811   switch (prop_id) {
1812     case PROP_MAX_SIZE_BYTES:
1813       queue->max_size.bytes = g_value_get_uint (value);
1814       queue_capacity_change (queue);
1815       break;
1816     case PROP_MAX_SIZE_BUFFERS:
1817       queue->max_size.buffers = g_value_get_uint (value);
1818       queue_capacity_change (queue);
1819       break;
1820     case PROP_MAX_SIZE_TIME:
1821       queue->max_size.time = g_value_get_uint64 (value);
1822       queue_capacity_change (queue);
1823       break;
1824     case PROP_MIN_THRESHOLD_BYTES:
1825       queue->min_threshold.bytes = g_value_get_uint (value);
1826       queue->orig_min_threshold.bytes = queue->min_threshold.bytes;
1827       QUEUE_THRESHOLD_CHANGE (queue);
1828       break;
1829     case PROP_MIN_THRESHOLD_BUFFERS:
1830       queue->min_threshold.buffers = g_value_get_uint (value);
1831       queue->orig_min_threshold.buffers = queue->min_threshold.buffers;
1832       QUEUE_THRESHOLD_CHANGE (queue);
1833       break;
1834     case PROP_MIN_THRESHOLD_TIME:
1835       queue->min_threshold.time = g_value_get_uint64 (value);
1836       queue->orig_min_threshold.time = queue->min_threshold.time;
1837       QUEUE_THRESHOLD_CHANGE (queue);
1838       break;
1839     case PROP_LEAKY:
1840       queue->leaky = g_value_get_enum (value);
1841       break;
1842     case PROP_SILENT:
1843       queue->silent = g_value_get_boolean (value);
1844       break;
1845     case PROP_FLUSH_ON_EOS:
1846       queue->flush_on_eos = g_value_get_boolean (value);
1847       break;
1848     default:
1849       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1850       break;
1851   }
1852 
1853   GST_QUEUE_MUTEX_UNLOCK (queue);
1854 }
1855 
1856 static void
gst_queue_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)1857 gst_queue_get_property (GObject * object,
1858     guint prop_id, GValue * value, GParamSpec * pspec)
1859 {
1860   GstQueue *queue = GST_QUEUE (object);
1861 
1862   GST_QUEUE_MUTEX_LOCK (queue);
1863 
1864   switch (prop_id) {
1865     case PROP_CUR_LEVEL_BYTES:
1866       g_value_set_uint (value, queue->cur_level.bytes);
1867       break;
1868     case PROP_CUR_LEVEL_BUFFERS:
1869       g_value_set_uint (value, queue->cur_level.buffers);
1870       break;
1871     case PROP_CUR_LEVEL_TIME:
1872       g_value_set_uint64 (value, queue->cur_level.time);
1873       break;
1874     case PROP_MAX_SIZE_BYTES:
1875       g_value_set_uint (value, queue->max_size.bytes);
1876       break;
1877     case PROP_MAX_SIZE_BUFFERS:
1878       g_value_set_uint (value, queue->max_size.buffers);
1879       break;
1880     case PROP_MAX_SIZE_TIME:
1881       g_value_set_uint64 (value, queue->max_size.time);
1882       break;
1883     case PROP_MIN_THRESHOLD_BYTES:
1884       g_value_set_uint (value, queue->min_threshold.bytes);
1885       break;
1886     case PROP_MIN_THRESHOLD_BUFFERS:
1887       g_value_set_uint (value, queue->min_threshold.buffers);
1888       break;
1889     case PROP_MIN_THRESHOLD_TIME:
1890       g_value_set_uint64 (value, queue->min_threshold.time);
1891       break;
1892     case PROP_LEAKY:
1893       g_value_set_enum (value, queue->leaky);
1894       break;
1895     case PROP_SILENT:
1896       g_value_set_boolean (value, queue->silent);
1897       break;
1898     case PROP_FLUSH_ON_EOS:
1899       g_value_set_boolean (value, queue->flush_on_eos);
1900       break;
1901     default:
1902       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1903       break;
1904   }
1905 
1906   GST_QUEUE_MUTEX_UNLOCK (queue);
1907 }
1908