• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2000 Wim Taymans <wtay@chello.be>
4  *                    2003 Colin Walters <cwalters@gnome.org>
5  *                    2005 Wim Taymans <wim@fluendo.com>
6  *
7  * gstqueue.c:
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22  * Boston, MA 02110-1301, USA.
23  */
24 
25 /**
26  * SECTION:element-queue
27  * @title: queue
28  *
29  * Data is queued until one of the limits specified by the
30  * #GstQueue:max-size-buffers, #GstQueue:max-size-bytes and/or
31  * #GstQueue:max-size-time properties has been reached. Any attempt to push
32  * more buffers into the queue will block the pushing thread until more space
33  * becomes available.
34  *
35  * The queue will create a new thread on the source pad to decouple the
36  * processing on sink and source pad.
37  *
38  * You can query how many buffers are queued by reading the
39  * #GstQueue:current-level-buffers property. You can track changes
40  * by connecting to the notify::current-level-buffers signal (which
41  * like all signals will be emitted from the streaming thread). The same
42  * applies to the #GstQueue:current-level-time and
43  * #GstQueue:current-level-bytes properties.
44  *
45  * The default queue size limits are 200 buffers, 10MB of data, or
46  * one second worth of data, whichever is reached first.
47  *
48  * As said earlier, the queue blocks by default when one of the specified
49  * maximums (bytes, time, buffers) has been reached. You can set the
50  * #GstQueue:leaky property to specify that instead of blocking it should
51  * leak (drop) new or old buffers.
52  *
53  * The #GstQueue::underrun signal is emitted when the queue has less data than
54  * the specified minimum thresholds require (by default: when the queue is
55  * empty). The #GstQueue::overrun signal is emitted when the queue is filled
56  * up. Both signals are emitted from the context of the streaming thread.
57  */
58 
59 #include "gst/gst_private.h"
60 
61 #include <gst/gst.h>
62 #include "gstqueue.h"
63 #include "gstcoreelementselements.h"
64 
65 #include "../../gst/gst-i18n-lib.h"
66 #include "../../gst/glib-compat-private.h"
67 
68 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
69     GST_PAD_SINK,
70     GST_PAD_ALWAYS,
71     GST_STATIC_CAPS_ANY);
72 
73 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
74     GST_PAD_SRC,
75     GST_PAD_ALWAYS,
76     GST_STATIC_CAPS_ANY);
77 
78 GST_DEBUG_CATEGORY_STATIC (queue_debug);
79 #define GST_CAT_DEFAULT (queue_debug)
80 GST_DEBUG_CATEGORY_STATIC (queue_dataflow);
81 
82 #define STATUS(queue, pad, msg) \
83   GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
84                       "(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \
85                       "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
86                       "-%" G_GUINT64_FORMAT " ns, %u items", \
87                       GST_DEBUG_PAD_NAME (pad), \
88                       queue->cur_level.buffers, \
89                       queue->min_threshold.buffers, \
90                       queue->max_size.buffers, \
91                       queue->cur_level.bytes, \
92                       queue->min_threshold.bytes, \
93                       queue->max_size.bytes, \
94                       queue->cur_level.time, \
95                       queue->min_threshold.time, \
96                       queue->max_size.time, \
97                       gst_queue_array_get_length (queue->queue))
98 
99 /* Queue signals and args */
100 enum
101 {
102   SIGNAL_UNDERRUN,
103   SIGNAL_RUNNING,
104   SIGNAL_OVERRUN,
105   SIGNAL_PUSHING,
106   LAST_SIGNAL
107 };
108 
109 enum
110 {
111   PROP_0,
112   /* FIXME: don't we have another way of doing this
113    * "Gstreamer format" (frame/byte/time) queries? */
114   PROP_CUR_LEVEL_BUFFERS,
115   PROP_CUR_LEVEL_BYTES,
116   PROP_CUR_LEVEL_TIME,
117   PROP_MAX_SIZE_BUFFERS,
118   PROP_MAX_SIZE_BYTES,
119   PROP_MAX_SIZE_TIME,
120   PROP_MIN_THRESHOLD_BUFFERS,
121   PROP_MIN_THRESHOLD_BYTES,
122   PROP_MIN_THRESHOLD_TIME,
123   PROP_LEAKY,
124   PROP_SILENT,
125   PROP_FLUSH_ON_EOS
126 };
127 
128 /* default property values */
129 #define DEFAULT_MAX_SIZE_BUFFERS  200   /* 200 buffers */
130 #define DEFAULT_MAX_SIZE_BYTES    (10 * 1024 * 1024)    /* 10 MB       */
131 #define DEFAULT_MAX_SIZE_TIME     GST_SECOND    /* 1 second    */
132 
133 #define GST_QUEUE_MUTEX_LOCK(q) G_STMT_START {                          \
134   g_mutex_lock (&q->qlock);                                              \
135 } G_STMT_END
136 
137 #define GST_QUEUE_MUTEX_LOCK_CHECK(q,label) G_STMT_START {              \
138   GST_QUEUE_MUTEX_LOCK (q);                                             \
139   if (q->srcresult != GST_FLOW_OK)                                      \
140     goto label;                                                         \
141 } G_STMT_END
142 
143 #define GST_QUEUE_MUTEX_UNLOCK(q) G_STMT_START {                        \
144   g_mutex_unlock (&q->qlock);                                            \
145 } G_STMT_END
146 
147 #define GST_QUEUE_WAIT_DEL_CHECK(q, label) G_STMT_START {               \
148   STATUS (q, q->sinkpad, "wait for DEL");                               \
149   q->waiting_del = TRUE;                                                \
150   g_cond_wait (&q->item_del, &q->qlock);                                  \
151   q->waiting_del = FALSE;                                               \
152   if (q->srcresult != GST_FLOW_OK) {                                    \
153     STATUS (q, q->srcpad, "received DEL wakeup");                       \
154     goto label;                                                         \
155   }                                                                     \
156   STATUS (q, q->sinkpad, "received DEL");                               \
157 } G_STMT_END
158 
159 #define GST_QUEUE_WAIT_ADD_CHECK(q, label) G_STMT_START {               \
160   STATUS (q, q->srcpad, "wait for ADD");                                \
161   q->waiting_add = TRUE;                                                \
162   g_cond_wait (&q->item_add, &q->qlock);                                  \
163   q->waiting_add = FALSE;                                               \
164   if (q->srcresult != GST_FLOW_OK) {                                    \
165     STATUS (q, q->srcpad, "received ADD wakeup");                       \
166     goto label;                                                         \
167   }                                                                     \
168   STATUS (q, q->srcpad, "received ADD");                                \
169 } G_STMT_END
170 
171 #define GST_QUEUE_SIGNAL_DEL(q) G_STMT_START {                          \
172   if (q->waiting_del) {                                                 \
173     STATUS (q, q->srcpad, "signal DEL");                                \
174     g_cond_signal (&q->item_del);                                        \
175   }                                                                     \
176 } G_STMT_END
177 
178 #define GST_QUEUE_SIGNAL_ADD(q) G_STMT_START {                          \
179   if (q->waiting_add) {                                                 \
180     STATUS (q, q->sinkpad, "signal ADD");                               \
181     g_cond_signal (&q->item_add);                                        \
182   }                                                                     \
183 } G_STMT_END
184 
185 #define _do_init \
186     GST_DEBUG_CATEGORY_INIT (queue_debug, "queue", 0, "queue element"); \
187     GST_DEBUG_CATEGORY_INIT (queue_dataflow, "queue_dataflow", 0, \
188         "dataflow inside the queue element");
189 #define gst_queue_parent_class parent_class
190 G_DEFINE_TYPE_WITH_CODE (GstQueue, gst_queue, GST_TYPE_ELEMENT, _do_init);
191 GST_ELEMENT_REGISTER_DEFINE (queue, "queue", GST_RANK_NONE, GST_TYPE_QUEUE);
192 
193 static void gst_queue_finalize (GObject * object);
194 static void gst_queue_set_property (GObject * object,
195     guint prop_id, const GValue * value, GParamSpec * pspec);
196 static void gst_queue_get_property (GObject * object,
197     guint prop_id, GValue * value, GParamSpec * pspec);
198 
199 static GstFlowReturn gst_queue_chain (GstPad * pad, GstObject * parent,
200     GstBuffer * buffer);
201 static GstFlowReturn gst_queue_chain_list (GstPad * pad, GstObject * parent,
202     GstBufferList * buffer_list);
203 static GstFlowReturn gst_queue_push_one (GstQueue * queue);
204 static void gst_queue_loop (GstPad * pad);
205 
206 static GstFlowReturn gst_queue_handle_sink_event (GstPad * pad,
207     GstObject * parent, GstEvent * event);
208 static gboolean gst_queue_handle_sink_query (GstPad * pad, GstObject * parent,
209     GstQuery * query);
210 
211 static gboolean gst_queue_handle_src_event (GstPad * pad, GstObject * parent,
212     GstEvent * event);
213 static gboolean gst_queue_handle_src_query (GstPad * pad, GstObject * parent,
214     GstQuery * query);
215 
216 static void gst_queue_locked_flush (GstQueue * queue, gboolean full);
217 
218 static gboolean gst_queue_src_activate_mode (GstPad * pad, GstObject * parent,
219     GstPadMode mode, gboolean active);
220 static gboolean gst_queue_sink_activate_mode (GstPad * pad, GstObject * parent,
221     GstPadMode mode, gboolean active);
222 
223 static gboolean gst_queue_is_empty (GstQueue * queue);
224 static gboolean gst_queue_is_filled (GstQueue * queue);
225 
226 
227 typedef struct
228 {
229   GstMiniObject *item;
230   gsize size;
231   gboolean is_query;
232 } GstQueueItem;
233 
234 #define GST_TYPE_QUEUE_LEAKY (queue_leaky_get_type ())
235 
236 static GType
queue_leaky_get_type(void)237 queue_leaky_get_type (void)
238 {
239   static GType queue_leaky_type = 0;
240   static const GEnumValue queue_leaky[] = {
241     {GST_QUEUE_NO_LEAK, "Not Leaky", "no"},
242     {GST_QUEUE_LEAK_UPSTREAM, "Leaky on upstream (new buffers)", "upstream"},
243     {GST_QUEUE_LEAK_DOWNSTREAM, "Leaky on downstream (old buffers)",
244         "downstream"},
245     {0, NULL, NULL},
246   };
247 
248   if (!queue_leaky_type) {
249     queue_leaky_type = g_enum_register_static ("GstQueueLeaky", queue_leaky);
250   }
251   return queue_leaky_type;
252 }
253 
254 static guint gst_queue_signals[LAST_SIGNAL] = { 0 };
255 
256 static void
gst_queue_class_init(GstQueueClass * klass)257 gst_queue_class_init (GstQueueClass * klass)
258 {
259   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
260   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
261 
262   gobject_class->set_property = gst_queue_set_property;
263   gobject_class->get_property = gst_queue_get_property;
264 
265   /* signals */
266   /**
267    * GstQueue::underrun:
268    * @queue: the queue instance
269    *
270    * Reports that the buffer became empty (underrun).
271    * A buffer is empty if the total amount of data inside it (num-buffers, time,
272    * size) is lower than the boundary values which can be set through the
273    * GObject properties.
274    */
275   gst_queue_signals[SIGNAL_UNDERRUN] =
276       g_signal_new ("underrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
277       G_STRUCT_OFFSET (GstQueueClass, underrun), NULL, NULL,
278       NULL, G_TYPE_NONE, 0);
279   /**
280    * GstQueue::running:
281    * @queue: the queue instance
282    *
283    * Reports that enough (min-threshold) data is in the queue. Use this signal
284    * together with the underrun signal to pause the pipeline on underrun and
285    * wait for the queue to fill-up before resume playback.
286    */
287   gst_queue_signals[SIGNAL_RUNNING] =
288       g_signal_new ("running", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
289       G_STRUCT_OFFSET (GstQueueClass, running), NULL, NULL,
290       NULL, G_TYPE_NONE, 0);
291   /**
292    * GstQueue::overrun:
293    * @queue: the queue instance
294    *
295    * Reports that the buffer became full (overrun).
296    * A buffer is full if the total amount of data inside it (num-buffers, time,
297    * size) is higher than the boundary values which can be set through the
298    * GObject properties.
299    */
300   gst_queue_signals[SIGNAL_OVERRUN] =
301       g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
302       G_STRUCT_OFFSET (GstQueueClass, overrun), NULL, NULL,
303       NULL, G_TYPE_NONE, 0);
304   /**
305    * GstQueue::pushing:
306    * @queue: the queue instance
307    *
308    * Reports when the queue has enough data to start pushing data again on the
309    * source pad.
310    */
311   gst_queue_signals[SIGNAL_PUSHING] =
312       g_signal_new ("pushing", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
313       G_STRUCT_OFFSET (GstQueueClass, pushing), NULL, NULL,
314       NULL, G_TYPE_NONE, 0);
315 
316   /* properties */
317   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BYTES,
318       g_param_spec_uint ("current-level-bytes", "Current level (kB)",
319           "Current amount of data in the queue (bytes)",
320           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
321   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BUFFERS,
322       g_param_spec_uint ("current-level-buffers", "Current level (buffers)",
323           "Current number of buffers in the queue",
324           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
325   g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_TIME,
326       g_param_spec_uint64 ("current-level-time", "Current level (ns)",
327           "Current amount of data in the queue (in ns)",
328           0, G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
329 
330   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
331       g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
332           "Max. amount of data in the queue (bytes, 0=disable)",
333           0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES,
334           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
335           G_PARAM_STATIC_STRINGS));
336   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BUFFERS,
337       g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
338           "Max. number of buffers in the queue (0=disable)", 0, G_MAXUINT,
339           DEFAULT_MAX_SIZE_BUFFERS,
340           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
341           G_PARAM_STATIC_STRINGS));
342   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
343       g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
344           "Max. amount of data in the queue (in ns, 0=disable)", 0, G_MAXUINT64,
345           DEFAULT_MAX_SIZE_TIME,
346           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
347           G_PARAM_STATIC_STRINGS));
348 
349   g_object_class_install_property (gobject_class, PROP_MIN_THRESHOLD_BYTES,
350       g_param_spec_uint ("min-threshold-bytes", "Min. threshold (kB)",
351           "Min. amount of data in the queue to allow reading (bytes, 0=disable)",
352           0, G_MAXUINT, 0,
353           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
354           G_PARAM_STATIC_STRINGS));
355   g_object_class_install_property (gobject_class, PROP_MIN_THRESHOLD_BUFFERS,
356       g_param_spec_uint ("min-threshold-buffers", "Min. threshold (buffers)",
357           "Min. number of buffers in the queue to allow reading (0=disable)", 0,
358           G_MAXUINT, 0,
359           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
360           G_PARAM_STATIC_STRINGS));
361   g_object_class_install_property (gobject_class, PROP_MIN_THRESHOLD_TIME,
362       g_param_spec_uint64 ("min-threshold-time", "Min. threshold (ns)",
363           "Min. amount of data in the queue to allow reading (in ns, 0=disable)",
364           0, G_MAXUINT64, 0,
365           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
366           G_PARAM_STATIC_STRINGS));
367 
368   g_object_class_install_property (gobject_class, PROP_LEAKY,
369       g_param_spec_enum ("leaky", "Leaky",
370           "Where the queue leaks, if at all",
371           GST_TYPE_QUEUE_LEAKY, GST_QUEUE_NO_LEAK,
372           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
373           G_PARAM_STATIC_STRINGS));
374 
375   /**
376    * GstQueue:silent
377    *
378    * Don't emit queue signals. Makes queues more lightweight if no signals are
379    * needed.
380    */
381   g_object_class_install_property (gobject_class, PROP_SILENT,
382       g_param_spec_boolean ("silent", "Silent",
383           "Don't emit queue signals", FALSE,
384           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
385           G_PARAM_STATIC_STRINGS));
386 
387   /**
388    * queue:flush-on-eos:
389    *
390    * Discard all data in the queue when an EOS event is received, and pass
391    * on the EOS event as soon as possible (instead of waiting until all
392    * buffers in the queue have been processed, which is the default behaviour).
393    *
394    * Flushing the queue on EOS might be useful when capturing and encoding
395    * from a live source, to finish up the recording quickly in cases when
396    * the encoder is slow. Note that this might mean some data from the end of
397    * the recording data might be lost though (never more than the configured
398    * max. sizes though).
399    *
400    * Since: 1.2
401    */
402   g_object_class_install_property (gobject_class, PROP_FLUSH_ON_EOS,
403       g_param_spec_boolean ("flush-on-eos", "Flush on EOS",
404           "Discard all data in the queue when an EOS event is received", FALSE,
405           G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING |
406           G_PARAM_STATIC_STRINGS));
407 
408   gobject_class->finalize = gst_queue_finalize;
409 
410   gst_element_class_set_static_metadata (gstelement_class,
411       "Queue",
412       "Generic", "Simple data queue", "Erik Walthinsen <omega@cse.ogi.edu>");
413   gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
414   gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate);
415 
416   /* Registering debug symbols for function pointers */
417   GST_DEBUG_REGISTER_FUNCPTR (gst_queue_src_activate_mode);
418   GST_DEBUG_REGISTER_FUNCPTR (gst_queue_handle_sink_event);
419   GST_DEBUG_REGISTER_FUNCPTR (gst_queue_handle_sink_query);
420   GST_DEBUG_REGISTER_FUNCPTR (gst_queue_handle_src_event);
421   GST_DEBUG_REGISTER_FUNCPTR (gst_queue_handle_src_query);
422   GST_DEBUG_REGISTER_FUNCPTR (gst_queue_chain);
423   GST_DEBUG_REGISTER_FUNCPTR (gst_queue_chain_list);
424 
425   gst_type_mark_as_plugin_api (GST_TYPE_QUEUE_LEAKY, 0);
426 }
427 
428 static void
gst_queue_init(GstQueue * queue)429 gst_queue_init (GstQueue * queue)
430 {
431   queue->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
432 
433   gst_pad_set_chain_function (queue->sinkpad, gst_queue_chain);
434   gst_pad_set_chain_list_function (queue->sinkpad, gst_queue_chain_list);
435   gst_pad_set_activatemode_function (queue->sinkpad,
436       gst_queue_sink_activate_mode);
437   gst_pad_set_event_full_function (queue->sinkpad, gst_queue_handle_sink_event);
438   gst_pad_set_query_function (queue->sinkpad, gst_queue_handle_sink_query);
439   GST_PAD_SET_PROXY_CAPS (queue->sinkpad);
440   gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
441 
442   queue->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
443 
444   gst_pad_set_activatemode_function (queue->srcpad,
445       gst_queue_src_activate_mode);
446   gst_pad_set_event_function (queue->srcpad, gst_queue_handle_src_event);
447   gst_pad_set_query_function (queue->srcpad, gst_queue_handle_src_query);
448   GST_PAD_SET_PROXY_CAPS (queue->srcpad);
449   gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
450 
451   GST_QUEUE_CLEAR_LEVEL (queue->cur_level);
452   queue->max_size.buffers = DEFAULT_MAX_SIZE_BUFFERS;
453   queue->max_size.bytes = DEFAULT_MAX_SIZE_BYTES;
454   queue->max_size.time = DEFAULT_MAX_SIZE_TIME;
455   GST_QUEUE_CLEAR_LEVEL (queue->min_threshold);
456   GST_QUEUE_CLEAR_LEVEL (queue->orig_min_threshold);
457   gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
458   gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
459   queue->head_needs_discont = queue->tail_needs_discont = FALSE;
460 
461   queue->leaky = GST_QUEUE_NO_LEAK;
462   queue->srcresult = GST_FLOW_FLUSHING;
463 
464   g_mutex_init (&queue->qlock);
465   g_cond_init (&queue->item_add);
466   g_cond_init (&queue->item_del);
467   g_cond_init (&queue->query_handled);
468 
469   queue->queue =
470       gst_queue_array_new_for_struct (sizeof (GstQueueItem),
471       DEFAULT_MAX_SIZE_BUFFERS * 3 / 2);
472 
473   queue->sinktime = GST_CLOCK_STIME_NONE;
474   queue->srctime = GST_CLOCK_STIME_NONE;
475 
476   queue->sink_tainted = TRUE;
477   queue->src_tainted = TRUE;
478 
479   queue->newseg_applied_to_src = FALSE;
480 
481   GST_DEBUG_OBJECT (queue,
482       "initialized queue's not_empty & not_full conditions");
483 }
484 
485 /* called only once, as opposed to dispose */
486 static void
gst_queue_finalize(GObject * object)487 gst_queue_finalize (GObject * object)
488 {
489   GstQueue *queue = GST_QUEUE (object);
490   GstQueueItem *qitem;
491 
492   GST_DEBUG_OBJECT (queue, "finalizing queue");
493 
494   while ((qitem = gst_queue_array_pop_head_struct (queue->queue))) {
495     /* FIXME: if it's a query, shouldn't we unref that too? */
496     if (!qitem->is_query)
497       gst_mini_object_unref (qitem->item);
498   }
499   gst_queue_array_free (queue->queue);
500 
501   g_mutex_clear (&queue->qlock);
502   g_cond_clear (&queue->item_add);
503   g_cond_clear (&queue->item_del);
504   g_cond_clear (&queue->query_handled);
505 
506   G_OBJECT_CLASS (parent_class)->finalize (object);
507 }
508 
509 /* Convenience function */
510 static inline GstClockTimeDiff
my_segment_to_running_time(GstSegment * segment,GstClockTime val)511 my_segment_to_running_time (GstSegment * segment, GstClockTime val)
512 {
513   GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
514 
515   if (GST_CLOCK_TIME_IS_VALID (val)) {
516     gboolean sign =
517         gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
518     if (sign > 0)
519       res = val;
520     else if (sign < 0)
521       res = -val;
522   }
523   return res;
524 }
525 
526 /* calculate the diff between running time on the sink and src of the queue.
527  * This is the total amount of time in the queue. */
528 static void
update_time_level(GstQueue * queue)529 update_time_level (GstQueue * queue)
530 {
531   gint64 sink_time, src_time;
532 
533   if (queue->sink_tainted) {
534     GST_LOG_OBJECT (queue, "update sink time");
535     queue->sinktime =
536         my_segment_to_running_time (&queue->sink_segment,
537         queue->sink_segment.position);
538     queue->sink_tainted = FALSE;
539   }
540   sink_time = queue->sinktime;
541 
542   if (queue->src_tainted) {
543     GST_LOG_OBJECT (queue, "update src time");
544     queue->srctime =
545         my_segment_to_running_time (&queue->src_segment,
546         queue->src_segment.position);
547     queue->src_tainted = FALSE;
548   }
549   src_time = queue->srctime;
550 
551   GST_LOG_OBJECT (queue, "sink %" GST_STIME_FORMAT ", src %" GST_STIME_FORMAT,
552       GST_STIME_ARGS (sink_time), GST_STIME_ARGS (src_time));
553 
554   if (GST_CLOCK_STIME_IS_VALID (src_time)
555       && GST_CLOCK_STIME_IS_VALID (sink_time) && sink_time >= src_time)
556     queue->cur_level.time = sink_time - src_time;
557   else
558     queue->cur_level.time = 0;
559 }
560 
561 /* take a SEGMENT event and apply the values to segment, updating the time
562  * level of queue. */
563 static void
apply_segment(GstQueue * queue,GstEvent * event,GstSegment * segment,gboolean sink)564 apply_segment (GstQueue * queue, GstEvent * event, GstSegment * segment,
565     gboolean sink)
566 {
567   gst_event_copy_segment (event, segment);
568 
569   /* now configure the values, we use these to track timestamps on the
570    * sinkpad. */
571   if (segment->format != GST_FORMAT_TIME) {
572     /* non-time format, pretent the current time segment is closed with a
573      * 0 start and unknown stop time. */
574     segment->format = GST_FORMAT_TIME;
575     segment->start = 0;
576     segment->stop = -1;
577     segment->time = 0;
578   }
579   if (sink)
580     queue->sink_tainted = TRUE;
581   else
582     queue->src_tainted = TRUE;
583 
584   GST_DEBUG_OBJECT (queue, "configured SEGMENT %" GST_SEGMENT_FORMAT, segment);
585 
586   /* segment can update the time level of the queue */
587   update_time_level (queue);
588 }
589 
590 static void
apply_gap(GstQueue * queue,GstEvent * event,GstSegment * segment,gboolean is_sink)591 apply_gap (GstQueue * queue, GstEvent * event,
592     GstSegment * segment, gboolean is_sink)
593 {
594   GstClockTime timestamp;
595   GstClockTime duration;
596 
597   gst_event_parse_gap (event, &timestamp, &duration);
598 
599   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
600 
601     if (GST_CLOCK_TIME_IS_VALID (duration)) {
602       timestamp += duration;
603     }
604 
605     segment->position = timestamp;
606 
607     if (is_sink)
608       queue->sink_tainted = TRUE;
609     else
610       queue->src_tainted = TRUE;
611 
612     /* calc diff with other end */
613     update_time_level (queue);
614   }
615 }
616 
617 
618 /* take a buffer and update segment, updating the time level of the queue. */
619 static void
apply_buffer(GstQueue * queue,GstBuffer * buffer,GstSegment * segment,gboolean sink)620 apply_buffer (GstQueue * queue, GstBuffer * buffer, GstSegment * segment,
621     gboolean sink)
622 {
623   GstClockTime duration, timestamp;
624 
625   timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
626   duration = GST_BUFFER_DURATION (buffer);
627 
628   /* if no timestamp is set, assume it's continuous with the previous
629    * time */
630   if (timestamp == GST_CLOCK_TIME_NONE)
631     timestamp = segment->position;
632 
633   /* add duration */
634   if (duration != GST_CLOCK_TIME_NONE)
635     timestamp += duration;
636 
637   GST_LOG_OBJECT (queue, "%s position updated to %" GST_TIME_FORMAT,
638       segment == &queue->sink_segment ? "sink" : "src",
639       GST_TIME_ARGS (timestamp));
640 
641   segment->position = timestamp;
642   if (sink)
643     queue->sink_tainted = TRUE;
644   else
645     queue->src_tainted = TRUE;
646 
647 
648   /* calc diff with other end */
649   update_time_level (queue);
650 }
651 
652 static gboolean
buffer_list_apply_time(GstBuffer ** buf,guint idx,gpointer user_data)653 buffer_list_apply_time (GstBuffer ** buf, guint idx, gpointer user_data)
654 {
655   GstClockTime *timestamp = user_data;
656   GstClockTime btime;
657 
658   GST_TRACE ("buffer %u has pts %" GST_TIME_FORMAT " dts %" GST_TIME_FORMAT
659       " duration %" GST_TIME_FORMAT, idx, GST_TIME_ARGS (GST_BUFFER_DTS (*buf)),
660       GST_TIME_ARGS (GST_BUFFER_PTS (*buf)),
661       GST_TIME_ARGS (GST_BUFFER_DURATION (*buf)));
662 
663   btime = GST_BUFFER_DTS_OR_PTS (*buf);
664   if (GST_CLOCK_TIME_IS_VALID (btime))
665     *timestamp = btime;
666 
667   if (GST_BUFFER_DURATION_IS_VALID (*buf))
668     *timestamp += GST_BUFFER_DURATION (*buf);
669 
670   GST_TRACE ("ts now %" GST_TIME_FORMAT, GST_TIME_ARGS (*timestamp));
671 
672   return TRUE;
673 }
674 
675 /* take a buffer list and update segment, updating the time level of the queue */
676 static void
apply_buffer_list(GstQueue * queue,GstBufferList * buffer_list,GstSegment * segment,gboolean sink)677 apply_buffer_list (GstQueue * queue, GstBufferList * buffer_list,
678     GstSegment * segment, gboolean sink)
679 {
680   GstClockTime timestamp;
681 
682   /* if no timestamp is set, assume it's continuous with the previous time */
683   timestamp = segment->position;
684 
685   gst_buffer_list_foreach (buffer_list, buffer_list_apply_time, &timestamp);
686 
687   GST_DEBUG_OBJECT (queue, "position updated to %" GST_TIME_FORMAT,
688       GST_TIME_ARGS (timestamp));
689 
690   segment->position = timestamp;
691 
692   if (sink)
693     queue->sink_tainted = TRUE;
694   else
695     queue->src_tainted = TRUE;
696 
697   /* calc diff with other end */
698   update_time_level (queue);
699 }
700 
701 static void
gst_queue_locked_flush(GstQueue * queue,gboolean full)702 gst_queue_locked_flush (GstQueue * queue, gboolean full)
703 {
704   GstQueueItem *qitem;
705 
706   while ((qitem = gst_queue_array_pop_head_struct (queue->queue))) {
707     /* Then lose another reference because we are supposed to destroy that
708        data when flushing */
709     if (!full && !qitem->is_query && GST_IS_EVENT (qitem->item)
710         && GST_EVENT_IS_STICKY (qitem->item)
711         && GST_EVENT_TYPE (qitem->item) != GST_EVENT_SEGMENT
712         && GST_EVENT_TYPE (qitem->item) != GST_EVENT_EOS) {
713       gst_pad_store_sticky_event (queue->srcpad, GST_EVENT_CAST (qitem->item));
714     }
715     if (!qitem->is_query)
716       gst_mini_object_unref (qitem->item);
717     memset (qitem, 0, sizeof (GstQueueItem));
718   }
719   queue->last_query = FALSE;
720   g_cond_signal (&queue->query_handled);
721   GST_QUEUE_CLEAR_LEVEL (queue->cur_level);
722   queue->min_threshold.buffers = queue->orig_min_threshold.buffers;
723   queue->min_threshold.bytes = queue->orig_min_threshold.bytes;
724   queue->min_threshold.time = queue->orig_min_threshold.time;
725   gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
726   gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
727   queue->head_needs_discont = queue->tail_needs_discont = FALSE;
728 
729   queue->sinktime = queue->srctime = GST_CLOCK_STIME_NONE;
730   queue->sink_tainted = queue->src_tainted = TRUE;
731 
732   /* we deleted a lot of something */
733   GST_QUEUE_SIGNAL_DEL (queue);
734 }
735 
736 /* enqueue an item an update the level stats, with QUEUE_LOCK */
737 static inline void
gst_queue_locked_enqueue_buffer(GstQueue * queue,gpointer item)738 gst_queue_locked_enqueue_buffer (GstQueue * queue, gpointer item)
739 {
740   GstQueueItem qitem;
741   GstBuffer *buffer = GST_BUFFER_CAST (item);
742   gsize bsize = gst_buffer_get_size (buffer);
743 
744   /* add buffer to the statistics */
745   queue->cur_level.buffers++;
746   queue->cur_level.bytes += bsize;
747   apply_buffer (queue, buffer, &queue->sink_segment, TRUE);
748 
749   qitem.item = item;
750   qitem.is_query = FALSE;
751   qitem.size = bsize;
752   gst_queue_array_push_tail_struct (queue->queue, &qitem);
753   GST_QUEUE_SIGNAL_ADD (queue);
754 }
755 
756 static inline void
gst_queue_locked_enqueue_buffer_list(GstQueue * queue,gpointer item)757 gst_queue_locked_enqueue_buffer_list (GstQueue * queue, gpointer item)
758 {
759   GstQueueItem qitem;
760   GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (item);
761   gsize bsize;
762 
763   bsize = gst_buffer_list_calculate_size (buffer_list);
764 
765   /* add buffer to the statistics */
766   queue->cur_level.buffers += gst_buffer_list_length (buffer_list);
767   queue->cur_level.bytes += bsize;
768   apply_buffer_list (queue, buffer_list, &queue->sink_segment, TRUE);
769 
770   qitem.item = item;
771   qitem.is_query = FALSE;
772   qitem.size = bsize;
773   gst_queue_array_push_tail_struct (queue->queue, &qitem);
774   GST_QUEUE_SIGNAL_ADD (queue);
775 }
776 
777 static inline void
gst_queue_locked_enqueue_event(GstQueue * queue,gpointer item)778 gst_queue_locked_enqueue_event (GstQueue * queue, gpointer item)
779 {
780   GstQueueItem qitem;
781   GstEvent *event = GST_EVENT_CAST (item);
782 
783   switch (GST_EVENT_TYPE (event)) {
784     case GST_EVENT_EOS:
785       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "got EOS from upstream");
786       /* Zero the thresholds, this makes sure the queue is completely
787        * filled and we can read all data from the queue. */
788       if (queue->flush_on_eos)
789         gst_queue_locked_flush (queue, FALSE);
790       else
791         GST_QUEUE_CLEAR_LEVEL (queue->min_threshold);
792       /* mark the queue as EOS. This prevents us from accepting more data. */
793       queue->eos = TRUE;
794       break;
795     case GST_EVENT_SEGMENT:
796       apply_segment (queue, event, &queue->sink_segment, TRUE);
797       /* if the queue is empty, apply sink segment on the source */
798       if (gst_queue_array_is_empty (queue->queue)) {
799         GST_CAT_LOG_OBJECT (queue_dataflow, queue, "Apply segment on srcpad");
800         apply_segment (queue, event, &queue->src_segment, FALSE);
801         queue->newseg_applied_to_src = TRUE;
802       }
803       /* a new segment allows us to accept more buffers if we got EOS
804        * from downstream */
805       queue->unexpected = FALSE;
806       break;
807     case GST_EVENT_GAP:
808       apply_gap (queue, event, &queue->sink_segment, TRUE);
809       break;
810     default:
811       break;
812   }
813 
814   qitem.item = item;
815   qitem.is_query = FALSE;
816   qitem.size = 0;
817   gst_queue_array_push_tail_struct (queue->queue, &qitem);
818   GST_QUEUE_SIGNAL_ADD (queue);
819 }
820 
821 /* dequeue an item from the queue and update level stats, with QUEUE_LOCK */
822 static GstMiniObject *
gst_queue_locked_dequeue(GstQueue * queue)823 gst_queue_locked_dequeue (GstQueue * queue)
824 {
825   GstQueueItem *qitem;
826   GstMiniObject *item;
827   gsize bufsize;
828 
829   qitem = gst_queue_array_pop_head_struct (queue->queue);
830   if (qitem == NULL)
831     goto no_item;
832 
833   item = qitem->item;
834   bufsize = qitem->size;
835 
836   if (GST_IS_BUFFER (item)) {
837     GstBuffer *buffer = GST_BUFFER_CAST (item);
838 
839     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
840         "retrieved buffer %p from queue", buffer);
841 
842     queue->cur_level.buffers--;
843     queue->cur_level.bytes -= bufsize;
844     apply_buffer (queue, buffer, &queue->src_segment, FALSE);
845 
846     /* if the queue is empty now, update the other side */
847     if (queue->cur_level.buffers == 0)
848       queue->cur_level.time = 0;
849   } else if (GST_IS_BUFFER_LIST (item)) {
850     GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (item);
851 
852     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
853         "retrieved buffer list %p from queue", buffer_list);
854 
855     queue->cur_level.buffers -= gst_buffer_list_length (buffer_list);
856     queue->cur_level.bytes -= bufsize;
857     apply_buffer_list (queue, buffer_list, &queue->src_segment, FALSE);
858 
859     /* if the queue is empty now, update the other side */
860     if (queue->cur_level.buffers == 0)
861       queue->cur_level.time = 0;
862   } else if (GST_IS_EVENT (item)) {
863     GstEvent *event = GST_EVENT_CAST (item);
864 
865     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
866         "retrieved event %p from queue", event);
867 
868     switch (GST_EVENT_TYPE (event)) {
869       case GST_EVENT_EOS:
870         /* queue is empty now that we dequeued the EOS */
871         GST_QUEUE_CLEAR_LEVEL (queue->cur_level);
872         break;
873       case GST_EVENT_SEGMENT:
874         /* apply newsegment if it has not already been applied */
875         if (G_LIKELY (!queue->newseg_applied_to_src)) {
876           apply_segment (queue, event, &queue->src_segment, FALSE);
877         } else {
878           queue->newseg_applied_to_src = FALSE;
879         }
880         break;
881       case GST_EVENT_GAP:
882         apply_gap (queue, event, &queue->src_segment, FALSE);
883         break;
884       default:
885         break;
886     }
887   } else if (GST_IS_QUERY (item)) {
888     GstQuery *query = GST_QUERY_CAST (item);
889 
890     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
891         "retrieved query %p from queue", query);
892   } else {
893     g_warning
894         ("Unexpected item %p dequeued from queue %s (refcounting problem?)",
895         item, GST_OBJECT_NAME (queue));
896     item = NULL;
897   }
898   GST_QUEUE_SIGNAL_DEL (queue);
899 
900   return item;
901 
902   /* ERRORS */
903 no_item:
904   {
905     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "the queue is empty");
906     return NULL;
907   }
908 }
909 
910 static GstFlowReturn
gst_queue_handle_sink_event(GstPad * pad,GstObject * parent,GstEvent * event)911 gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
912 {
913   gboolean ret = TRUE;
914   GstQueue *queue;
915 
916   queue = GST_QUEUE (parent);
917 
918   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "Received event '%s'",
919       GST_EVENT_TYPE_NAME (event));
920 
921   switch (GST_EVENT_TYPE (event)) {
922     case GST_EVENT_FLUSH_START:
923       /* forward event */
924       ret = gst_pad_push_event (queue->srcpad, event);
925 
926       /* now unblock the chain function */
927       GST_QUEUE_MUTEX_LOCK (queue);
928       queue->srcresult = GST_FLOW_FLUSHING;
929       /* unblock the loop and chain functions */
930       GST_QUEUE_SIGNAL_ADD (queue);
931       GST_QUEUE_SIGNAL_DEL (queue);
932       GST_QUEUE_MUTEX_UNLOCK (queue);
933 
934       /* make sure it pauses, this should happen since we sent
935        * flush_start downstream. */
936       gst_pad_pause_task (queue->srcpad);
937       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
938 
939       /* unblock query handler after the streaming thread is shut down.
940        * Otherwise downstream might have a query that is already unreffed
941        * upstream */
942       GST_QUEUE_MUTEX_LOCK (queue);
943       queue->last_query = FALSE;
944       g_cond_signal (&queue->query_handled);
945       GST_QUEUE_MUTEX_UNLOCK (queue);
946       break;
947     case GST_EVENT_FLUSH_STOP:
948       /* forward event */
949       ret = gst_pad_push_event (queue->srcpad, event);
950 
951       GST_QUEUE_MUTEX_LOCK (queue);
952       gst_queue_locked_flush (queue, FALSE);
953       queue->srcresult = GST_FLOW_OK;
954       queue->eos = FALSE;
955       queue->unexpected = FALSE;
956       if (gst_pad_is_active (queue->srcpad)) {
957         gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop,
958             queue->srcpad, NULL);
959       } else {
960         GST_INFO_OBJECT (queue->srcpad, "not re-starting task on srcpad, "
961             "pad not active any longer");
962       }
963       GST_QUEUE_MUTEX_UNLOCK (queue);
964 
965       STATUS (queue, pad, "after flush");
966       break;
967     default:
968       if (GST_EVENT_IS_SERIALIZED (event)) {
969         /* serialized events go in the queue */
970         GST_QUEUE_MUTEX_LOCK (queue);
971 
972         /* STREAM_START and SEGMENT reset the EOS status of a
973          * pad. Change the cached sinkpad flow result accordingly */
974         if (queue->srcresult == GST_FLOW_EOS
975             && (GST_EVENT_TYPE (event) == GST_EVENT_STREAM_START
976                 || GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT))
977           queue->srcresult = GST_FLOW_OK;
978 
979         if (queue->srcresult != GST_FLOW_OK) {
980           /* Errors in sticky event pushing are no problem and ignored here
981            * as they will cause more meaningful errors during data flow.
982            * For EOS events, that are not followed by data flow, we still
983            * return FALSE here though and report an error.
984            */
985           if (!GST_EVENT_IS_STICKY (event)) {
986             GST_QUEUE_MUTEX_UNLOCK (queue);
987             goto out_flow_error;
988           } else if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
989             if (queue->srcresult == GST_FLOW_NOT_LINKED
990                 || queue->srcresult < GST_FLOW_EOS) {
991               GST_QUEUE_MUTEX_UNLOCK (queue);
992               GST_ELEMENT_FLOW_ERROR (queue, queue->srcresult);
993             } else {
994               GST_QUEUE_MUTEX_UNLOCK (queue);
995             }
996             goto out_flow_error;
997           }
998         }
999 
1000         /* refuse more events on EOS unless they unset the EOS status */
1001         if (queue->eos) {
1002           switch (GST_EVENT_TYPE (event)) {
1003             case GST_EVENT_STREAM_START:
1004             case GST_EVENT_SEGMENT:
1005               /* Restart the loop */
1006               if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
1007                 queue->srcresult = GST_FLOW_OK;
1008                 queue->eos = FALSE;
1009                 queue->unexpected = FALSE;
1010                 gst_pad_start_task (queue->srcpad,
1011                     (GstTaskFunction) gst_queue_loop, queue->srcpad, NULL);
1012               } else {
1013                 queue->eos = FALSE;
1014                 queue->unexpected = FALSE;
1015               }
1016 
1017               break;
1018             default:
1019               goto out_eos;
1020           }
1021         }
1022 
1023         gst_queue_locked_enqueue_event (queue, event);
1024         GST_QUEUE_MUTEX_UNLOCK (queue);
1025       } else {
1026         /* non-serialized events are forwarded downstream immediately */
1027         ret = gst_pad_push_event (queue->srcpad, event);
1028       }
1029       break;
1030   }
1031   if (ret == FALSE) {
1032     GST_ERROR_OBJECT (queue, "Failed to push event");
1033     return GST_FLOW_ERROR;
1034   }
1035   return GST_FLOW_OK;
1036 
1037   /* ERRORS */
1038 out_eos:
1039   {
1040     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "refusing event, we are EOS");
1041     GST_QUEUE_MUTEX_UNLOCK (queue);
1042     gst_event_unref (event);
1043     return GST_FLOW_EOS;
1044   }
1045 out_flow_error:
1046   {
1047     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1048         "refusing event, we have a downstream flow error: %s",
1049         gst_flow_get_name (queue->srcresult));
1050     gst_event_unref (event);
1051     return queue->srcresult;
1052   }
1053 }
1054 
1055 static gboolean
gst_queue_handle_sink_query(GstPad * pad,GstObject * parent,GstQuery * query)1056 gst_queue_handle_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
1057 {
1058   GstQueue *queue = GST_QUEUE_CAST (parent);
1059   gboolean res;
1060 
1061   switch (GST_QUERY_TYPE (query)) {
1062     default:
1063       if (G_UNLIKELY (GST_QUERY_IS_SERIALIZED (query))) {
1064         GstQueueItem qitem;
1065 
1066         GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
1067         GST_LOG_OBJECT (queue, "queuing query %p (%s)", query,
1068             GST_QUERY_TYPE_NAME (query));
1069         qitem.item = GST_MINI_OBJECT_CAST (query);
1070         qitem.is_query = TRUE;
1071         qitem.size = 0;
1072         gst_queue_array_push_tail_struct (queue->queue, &qitem);
1073         GST_QUEUE_SIGNAL_ADD (queue);
1074         while (queue->srcresult == GST_FLOW_OK &&
1075             queue->last_handled_query != query)
1076           g_cond_wait (&queue->query_handled, &queue->qlock);
1077         queue->last_handled_query = NULL;
1078         if (queue->srcresult != GST_FLOW_OK)
1079           goto out_flushing;
1080         res = queue->last_query;
1081         GST_QUEUE_MUTEX_UNLOCK (queue);
1082       } else {
1083         res = gst_pad_query_default (pad, parent, query);
1084       }
1085       break;
1086   }
1087   return res;
1088 
1089   /* ERRORS */
1090 out_flushing:
1091   {
1092     GST_DEBUG_OBJECT (queue, "we are flushing");
1093     GST_QUEUE_MUTEX_UNLOCK (queue);
1094     return FALSE;
1095   }
1096 }
1097 
1098 static gboolean
gst_queue_is_empty(GstQueue * queue)1099 gst_queue_is_empty (GstQueue * queue)
1100 {
1101   GstQueueItem *tail;
1102 
1103   tail = gst_queue_array_peek_tail_struct (queue->queue);
1104 
1105   if (tail == NULL)
1106     return TRUE;
1107 
1108   /* Only consider the queue empty if the minimum thresholds
1109    * are not reached and data is at the queue tail. Otherwise
1110    * we would block forever on serialized queries.
1111    */
1112   if (!GST_IS_BUFFER (tail->item) && !GST_IS_BUFFER_LIST (tail->item))
1113     return FALSE;
1114 
1115   /* It is possible that a max size is reached before all min thresholds are.
1116    * Therefore, only consider it empty if it is not filled. */
1117   return ((queue->min_threshold.buffers > 0 &&
1118           queue->cur_level.buffers < queue->min_threshold.buffers) ||
1119       (queue->min_threshold.bytes > 0 &&
1120           queue->cur_level.bytes < queue->min_threshold.bytes) ||
1121       (queue->min_threshold.time > 0 &&
1122           queue->cur_level.time < queue->min_threshold.time)) &&
1123       !gst_queue_is_filled (queue);
1124 }
1125 
1126 static gboolean
gst_queue_is_filled(GstQueue * queue)1127 gst_queue_is_filled (GstQueue * queue)
1128 {
1129   return (((queue->max_size.buffers > 0 &&
1130               queue->cur_level.buffers >= queue->max_size.buffers) ||
1131           (queue->max_size.bytes > 0 &&
1132               queue->cur_level.bytes >= queue->max_size.bytes) ||
1133           (queue->max_size.time > 0 &&
1134               queue->cur_level.time >= queue->max_size.time)));
1135 }
1136 
1137 static void
gst_queue_leak_downstream(GstQueue * queue)1138 gst_queue_leak_downstream (GstQueue * queue)
1139 {
1140   /* for as long as the queue is filled, dequeue an item and discard it */
1141   while (gst_queue_is_filled (queue)) {
1142     GstMiniObject *leak;
1143 
1144     leak = gst_queue_locked_dequeue (queue);
1145     /* there is nothing to dequeue and the queue is still filled.. This should
1146      * not happen */
1147     g_assert (leak != NULL);
1148 
1149     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
1150         "queue is full, leaking item %p on downstream end", leak);
1151     if (GST_IS_EVENT (leak) && GST_EVENT_IS_STICKY (leak)) {
1152       GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
1153           "Storing sticky event %s on srcpad", GST_EVENT_TYPE_NAME (leak));
1154       gst_pad_store_sticky_event (queue->srcpad, GST_EVENT_CAST (leak));
1155     }
1156 
1157     if (!GST_IS_QUERY (leak))
1158       gst_mini_object_unref (leak);
1159 
1160     /* last buffer needs to get a DISCONT flag */
1161     queue->head_needs_discont = TRUE;
1162   }
1163 }
1164 
1165 static gboolean
discont_first_buffer(GstBuffer ** buffer,guint i,gpointer user_data)1166 discont_first_buffer (GstBuffer ** buffer, guint i, gpointer user_data)
1167 {
1168   GstQueue *queue = user_data;
1169   GstBuffer *subbuffer = gst_buffer_make_writable (*buffer);
1170 
1171   if (subbuffer) {
1172     *buffer = subbuffer;
1173     GST_BUFFER_FLAG_SET (*buffer, GST_BUFFER_FLAG_DISCONT);
1174   } else {
1175     GST_DEBUG_OBJECT (queue, "Could not mark buffer as DISCONT");
1176   }
1177 
1178   return FALSE;
1179 }
1180 
1181 static GstFlowReturn
gst_queue_chain_buffer_or_list(GstPad * pad,GstObject * parent,GstMiniObject * obj,gboolean is_list)1182 gst_queue_chain_buffer_or_list (GstPad * pad, GstObject * parent,
1183     GstMiniObject * obj, gboolean is_list)
1184 {
1185   GstQueue *queue;
1186 
1187   queue = GST_QUEUE_CAST (parent);
1188 
1189   /* we have to lock the queue since we span threads */
1190   GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
1191   /* when we received EOS, we refuse any more data */
1192   if (queue->eos)
1193     goto out_eos;
1194   if (queue->unexpected)
1195     goto out_unexpected;
1196 
1197   if (!is_list) {
1198     GstClockTime duration, timestamp;
1199     GstBuffer *buffer = GST_BUFFER_CAST (obj);
1200 
1201     timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
1202     duration = GST_BUFFER_DURATION (buffer);
1203 
1204     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received buffer %p of size %"
1205         G_GSIZE_FORMAT ", time %" GST_TIME_FORMAT ", duration %"
1206         GST_TIME_FORMAT, buffer, gst_buffer_get_size (buffer),
1207         GST_TIME_ARGS (timestamp), GST_TIME_ARGS (duration));
1208   } else {
1209     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1210         "received buffer list %p with %u buffers", obj,
1211         gst_buffer_list_length (GST_BUFFER_LIST_CAST (obj)));
1212   }
1213 
1214   /* We make space available if we're "full" according to whatever
1215    * the user defined as "full". Note that this only applies to buffers.
1216    * We always handle events and they don't count in our statistics. */
1217   while (gst_queue_is_filled (queue)) {
1218     if (!queue->silent) {
1219       GST_QUEUE_MUTEX_UNLOCK (queue);
1220       g_signal_emit (queue, gst_queue_signals[SIGNAL_OVERRUN], 0);
1221       GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
1222       /* we recheck, the signal could have changed the thresholds */
1223       if (!gst_queue_is_filled (queue))
1224         break;
1225     }
1226 
1227     /* how are we going to make space for this buffer? */
1228     switch (queue->leaky) {
1229       case GST_QUEUE_LEAK_UPSTREAM:
1230         /* next buffer needs to get a DISCONT flag */
1231         queue->tail_needs_discont = TRUE;
1232         /* leak current buffer */
1233         GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
1234             "queue is full, leaking buffer on upstream end");
1235         /* now we can clean up and exit right away */
1236         goto out_unref;
1237       case GST_QUEUE_LEAK_DOWNSTREAM:
1238         gst_queue_leak_downstream (queue);
1239         break;
1240       default:
1241         g_warning ("Unknown leaky type, using default");
1242         /* fall-through */
1243       case GST_QUEUE_NO_LEAK:
1244       {
1245         GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
1246             "queue is full, waiting for free space");
1247 
1248         /* don't leak. Instead, wait for space to be available */
1249         /* for as long as the queue is filled, wait till an item was deleted. */
1250         while (gst_queue_is_filled (queue)) {
1251           GST_QUEUE_WAIT_DEL_CHECK (queue, out_flushing);
1252         };
1253 
1254         GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "queue is not full");
1255 
1256         if (!queue->silent) {
1257           GST_QUEUE_MUTEX_UNLOCK (queue);
1258           g_signal_emit (queue, gst_queue_signals[SIGNAL_RUNNING], 0);
1259           GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
1260         }
1261         break;
1262       }
1263     }
1264   }
1265 
1266   if (queue->tail_needs_discont) {
1267     if (!is_list) {
1268       GstBuffer *buffer = GST_BUFFER_CAST (obj);
1269       GstBuffer *subbuffer = gst_buffer_make_writable (buffer);
1270 
1271       if (subbuffer) {
1272         buffer = subbuffer;
1273         GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
1274       } else {
1275         GST_DEBUG_OBJECT (queue, "Could not mark buffer as DISCONT");
1276       }
1277 
1278       obj = GST_MINI_OBJECT_CAST (buffer);
1279     } else {
1280       GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (obj);
1281 
1282       buffer_list = gst_buffer_list_make_writable (buffer_list);
1283       gst_buffer_list_foreach (buffer_list, discont_first_buffer, queue);
1284       obj = GST_MINI_OBJECT_CAST (buffer_list);
1285     }
1286     queue->tail_needs_discont = FALSE;
1287   }
1288 
1289   /* put buffer in queue now */
1290   if (is_list)
1291     gst_queue_locked_enqueue_buffer_list (queue, obj);
1292   else
1293     gst_queue_locked_enqueue_buffer (queue, obj);
1294   GST_QUEUE_MUTEX_UNLOCK (queue);
1295 
1296   return GST_FLOW_OK;
1297 
1298   /* special conditions */
1299 out_unref:
1300   {
1301     GST_QUEUE_MUTEX_UNLOCK (queue);
1302 
1303     gst_mini_object_unref (obj);
1304 
1305     return GST_FLOW_OK;
1306   }
1307 out_flushing:
1308   {
1309     GstFlowReturn ret = queue->srcresult;
1310 
1311     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1312         "exit because task paused, reason: %s", gst_flow_get_name (ret));
1313     GST_QUEUE_MUTEX_UNLOCK (queue);
1314     gst_mini_object_unref (obj);
1315 
1316     return ret;
1317   }
1318 out_eos:
1319   {
1320     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
1321     GST_QUEUE_MUTEX_UNLOCK (queue);
1322 
1323     gst_mini_object_unref (obj);
1324 
1325     return GST_FLOW_EOS;
1326   }
1327 out_unexpected:
1328   {
1329     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
1330     GST_QUEUE_MUTEX_UNLOCK (queue);
1331 
1332     gst_mini_object_unref (obj);
1333 
1334     return GST_FLOW_EOS;
1335   }
1336 }
1337 
1338 static GstFlowReturn
gst_queue_chain_list(GstPad * pad,GstObject * parent,GstBufferList * buffer_list)1339 gst_queue_chain_list (GstPad * pad, GstObject * parent,
1340     GstBufferList * buffer_list)
1341 {
1342   return gst_queue_chain_buffer_or_list (pad, parent,
1343       GST_MINI_OBJECT_CAST (buffer_list), TRUE);
1344 }
1345 
1346 static GstFlowReturn
gst_queue_chain(GstPad * pad,GstObject * parent,GstBuffer * buffer)1347 gst_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
1348 {
1349   return gst_queue_chain_buffer_or_list (pad, parent,
1350       GST_MINI_OBJECT_CAST (buffer), FALSE);
1351 }
1352 
1353 /* dequeue an item from the queue an push it downstream. This functions returns
1354  * the result of the push. */
1355 static GstFlowReturn
gst_queue_push_one(GstQueue * queue)1356 gst_queue_push_one (GstQueue * queue)
1357 {
1358   GstFlowReturn result = queue->srcresult;
1359   GstMiniObject *data;
1360   gboolean is_list;
1361 
1362   data = gst_queue_locked_dequeue (queue);
1363   if (data == NULL)
1364     goto no_item;
1365 
1366 next:
1367   is_list = GST_IS_BUFFER_LIST (data);
1368 
1369   if (GST_IS_BUFFER (data) || is_list) {
1370     if (!is_list) {
1371       GstBuffer *buffer;
1372 
1373       buffer = GST_BUFFER_CAST (data);
1374 
1375       if (queue->head_needs_discont) {
1376         GstBuffer *subbuffer = gst_buffer_make_writable (buffer);
1377 
1378         if (subbuffer) {
1379           buffer = subbuffer;
1380           GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
1381         } else {
1382           GST_DEBUG_OBJECT (queue, "Could not mark buffer as DISCONT");
1383         }
1384         queue->head_needs_discont = FALSE;
1385       }
1386 
1387       GST_QUEUE_MUTEX_UNLOCK (queue);
1388       result = gst_pad_push (queue->srcpad, buffer);
1389     } else {
1390       GstBufferList *buffer_list;
1391 
1392       buffer_list = GST_BUFFER_LIST_CAST (data);
1393 
1394       if (queue->head_needs_discont) {
1395         buffer_list = gst_buffer_list_make_writable (buffer_list);
1396         gst_buffer_list_foreach (buffer_list, discont_first_buffer, queue);
1397         queue->head_needs_discont = FALSE;
1398       }
1399 
1400       GST_QUEUE_MUTEX_UNLOCK (queue);
1401       result = gst_pad_push_list (queue->srcpad, buffer_list);
1402     }
1403 
1404     /* need to check for srcresult here as well */
1405     GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
1406 
1407     if (result == GST_FLOW_EOS) {
1408       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "got EOS from downstream");
1409       /* stop pushing buffers, we dequeue all items until we see an item that we
1410        * can push again, which is EOS or SEGMENT. If there is nothing in the
1411        * queue we can push, we set a flag to make the sinkpad refuse more
1412        * buffers with an EOS return value. */
1413       while ((data = gst_queue_locked_dequeue (queue))) {
1414         if (GST_IS_BUFFER (data)) {
1415           GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1416               "dropping EOS buffer %p", data);
1417           gst_buffer_unref (GST_BUFFER_CAST (data));
1418         } else if (GST_IS_BUFFER_LIST (data)) {
1419           GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1420               "dropping EOS buffer list %p", data);
1421           gst_buffer_list_unref (GST_BUFFER_LIST_CAST (data));
1422         } else if (GST_IS_EVENT (data)) {
1423           GstEvent *event = GST_EVENT_CAST (data);
1424           GstEventType type = GST_EVENT_TYPE (event);
1425 
1426           if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT
1427               || type == GST_EVENT_STREAM_START) {
1428             /* we found a pushable item in the queue, push it out */
1429             GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1430                 "pushing pushable event %s after EOS",
1431                 GST_EVENT_TYPE_NAME (event));
1432             goto next;
1433           }
1434           GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1435               "dropping EOS event %p", event);
1436           gst_event_unref (event);
1437         } else if (GST_IS_QUERY (data)) {
1438           GstQuery *query = GST_QUERY_CAST (data);
1439 
1440           GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1441               "dropping query %p because of EOS", query);
1442           queue->last_query = FALSE;
1443           g_cond_signal (&queue->query_handled);
1444         }
1445       }
1446       /* no more items in the queue. Set the unexpected flag so that upstream
1447        * make us refuse any more buffers on the sinkpad. Since we will still
1448        * accept EOS and SEGMENT we return _FLOW_OK to the caller so that the
1449        * task function does not shut down. */
1450       queue->unexpected = TRUE;
1451       result = GST_FLOW_OK;
1452     }
1453   } else if (GST_IS_EVENT (data)) {
1454     GstEvent *event = GST_EVENT_CAST (data);
1455     GstEventType type = GST_EVENT_TYPE (event);
1456 
1457     GST_QUEUE_MUTEX_UNLOCK (queue);
1458 
1459     gst_pad_push_event (queue->srcpad, event);
1460 
1461     GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
1462     /* if we're EOS, return EOS so that the task pauses. */
1463     if (type == GST_EVENT_EOS) {
1464       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1465           "pushed EOS event %p, return EOS", event);
1466       result = GST_FLOW_EOS;
1467     }
1468   } else if (GST_IS_QUERY (data)) {
1469     GstQuery *query = GST_QUERY_CAST (data);
1470     gboolean ret;
1471 
1472     GST_QUEUE_MUTEX_UNLOCK (queue);
1473     ret = gst_pad_peer_query (queue->srcpad, query);
1474     GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing_query);
1475     queue->last_query = ret;
1476     queue->last_handled_query = query;
1477     g_cond_signal (&queue->query_handled);
1478     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1479         "did query %p, return %d", query, queue->last_query);
1480   }
1481   return result;
1482 
1483   /* ERRORS */
1484 no_item:
1485   {
1486     GST_CAT_ERROR_OBJECT (queue_dataflow, queue,
1487         "exit because we have no item in the queue");
1488     return GST_FLOW_ERROR;
1489   }
1490 out_flushing:
1491   {
1492     GstFlowReturn ret = queue->srcresult;
1493     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1494         "exit because task paused, reason: %s", gst_flow_get_name (ret));
1495     return ret;
1496   }
1497 out_flushing_query:
1498   {
1499     GstFlowReturn ret = queue->srcresult;
1500     queue->last_query = FALSE;
1501     g_cond_signal (&queue->query_handled);
1502     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1503         "exit because task paused, reason: %s", gst_flow_get_name (ret));
1504     return ret;
1505   }
1506 }
1507 
1508 static void
gst_queue_loop(GstPad * pad)1509 gst_queue_loop (GstPad * pad)
1510 {
1511   GstQueue *queue;
1512   GstFlowReturn ret;
1513 
1514   queue = (GstQueue *) GST_PAD_PARENT (pad);
1515 
1516   /* have to lock for thread-safety */
1517   GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
1518 
1519   while (gst_queue_is_empty (queue)) {
1520     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "queue is empty");
1521     if (!queue->silent) {
1522       GST_QUEUE_MUTEX_UNLOCK (queue);
1523       g_signal_emit (queue, gst_queue_signals[SIGNAL_UNDERRUN], 0);
1524       GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
1525     }
1526 
1527     /* we recheck, the signal could have changed the thresholds */
1528     while (gst_queue_is_empty (queue)) {
1529       GST_QUEUE_WAIT_ADD_CHECK (queue, out_flushing);
1530     }
1531 
1532     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "queue is not empty");
1533     if (!queue->silent) {
1534       GST_QUEUE_MUTEX_UNLOCK (queue);
1535       g_signal_emit (queue, gst_queue_signals[SIGNAL_RUNNING], 0);
1536       g_signal_emit (queue, gst_queue_signals[SIGNAL_PUSHING], 0);
1537       GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
1538     }
1539   }
1540 
1541   ret = gst_queue_push_one (queue);
1542   queue->srcresult = ret;
1543   if (ret != GST_FLOW_OK)
1544     goto out_flushing;
1545 
1546   GST_QUEUE_MUTEX_UNLOCK (queue);
1547 
1548   return;
1549 
1550   /* ERRORS */
1551 out_flushing:
1552   {
1553     gboolean eos = queue->eos;
1554     GstFlowReturn ret = queue->srcresult;
1555 
1556     gst_pad_pause_task (queue->srcpad);
1557     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1558         "pause task, reason:  %s", gst_flow_get_name (ret));
1559     if (ret == GST_FLOW_FLUSHING) {
1560       gst_queue_locked_flush (queue, FALSE);
1561     } else {
1562       GST_QUEUE_SIGNAL_DEL (queue);
1563       queue->last_query = FALSE;
1564       g_cond_signal (&queue->query_handled);
1565     }
1566     GST_QUEUE_MUTEX_UNLOCK (queue);
1567     /* let app know about us giving up if upstream is not expected to do so */
1568     /* EOS is already taken care of elsewhere */
1569     if (eos && (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) {
1570       GST_ELEMENT_FLOW_ERROR (queue, ret);
1571       gst_pad_push_event (queue->srcpad, gst_event_new_eos ());
1572     }
1573     return;
1574   }
1575 }
1576 
1577 static gboolean
gst_queue_handle_src_event(GstPad * pad,GstObject * parent,GstEvent * event)1578 gst_queue_handle_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
1579 {
1580   gboolean res = TRUE;
1581   GstQueue *queue = GST_QUEUE (parent);
1582 
1583 #ifndef GST_DISABLE_GST_DEBUG
1584   GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%d)",
1585       event, GST_EVENT_TYPE (event));
1586 #endif
1587 
1588   switch (GST_EVENT_TYPE (event)) {
1589     case GST_EVENT_RECONFIGURE:
1590       GST_QUEUE_MUTEX_LOCK (queue);
1591       if (queue->srcresult == GST_FLOW_NOT_LINKED) {
1592         /* when we got not linked, assume downstream is linked again now and we
1593          * can try to start pushing again */
1594         queue->srcresult = GST_FLOW_OK;
1595         gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad, NULL);
1596       }
1597       GST_QUEUE_MUTEX_UNLOCK (queue);
1598 
1599       res = gst_pad_push_event (queue->sinkpad, event);
1600       break;
1601     default:
1602       res = gst_pad_event_default (pad, parent, event);
1603       break;
1604   }
1605 
1606 
1607   return res;
1608 }
1609 
1610 static gboolean
gst_queue_handle_src_query(GstPad * pad,GstObject * parent,GstQuery * query)1611 gst_queue_handle_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
1612 {
1613   GstQueue *queue = GST_QUEUE (parent);
1614   gboolean res;
1615 
1616   switch (GST_QUERY_TYPE (query)) {
1617     case GST_QUERY_SCHEDULING:{
1618       gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
1619       res = TRUE;
1620       break;
1621     }
1622     default:
1623       res = gst_pad_query_default (pad, parent, query);
1624       break;
1625   }
1626 
1627   if (!res)
1628     return FALSE;
1629 
1630   /* Adjust peer response for data contained in queue */
1631   switch (GST_QUERY_TYPE (query)) {
1632     case GST_QUERY_POSITION:
1633     {
1634       gint64 peer_pos;
1635       GstFormat format;
1636 
1637       /* get peer position */
1638       gst_query_parse_position (query, &format, &peer_pos);
1639 
1640       /* FIXME: this code assumes that there's no discont in the queue */
1641       switch (format) {
1642         case GST_FORMAT_BYTES:
1643           peer_pos -= queue->cur_level.bytes;
1644           if (peer_pos < 0)     /* Clamp result to 0 */
1645             peer_pos = 0;
1646           break;
1647         case GST_FORMAT_TIME:
1648           peer_pos -= queue->cur_level.time;
1649           if (peer_pos < 0)     /* Clamp result to 0 */
1650             peer_pos = 0;
1651           break;
1652         default:
1653           GST_DEBUG_OBJECT (queue, "Can't adjust query in %s format, don't "
1654               "know how to adjust value", gst_format_get_name (format));
1655           return TRUE;
1656       }
1657       /* set updated position */
1658       gst_query_set_position (query, format, peer_pos);
1659       break;
1660     }
1661     case GST_QUERY_LATENCY:
1662     {
1663       gboolean live;
1664       GstClockTime min, max;
1665 
1666       gst_query_parse_latency (query, &live, &min, &max);
1667 
1668       /* we can delay up to the limit of the queue in time. If we have no time
1669        * limit, the best thing we can do is to return an infinite delay. In
1670        * reality a better estimate would be the byte/buffer rate but that is not
1671        * possible right now. */
1672       /* TODO: Use CONVERT query? */
1673       if (queue->max_size.time > 0 && max != -1
1674           && queue->leaky == GST_QUEUE_NO_LEAK)
1675         max += queue->max_size.time;
1676       else if (queue->max_size.time > 0 && queue->leaky != GST_QUEUE_NO_LEAK)
1677         max = MAX (queue->max_size.time, max);
1678       else
1679         max = -1;
1680 
1681       /* adjust for min-threshold */
1682       if (queue->min_threshold.time > 0)
1683         min += queue->min_threshold.time;
1684 
1685       gst_query_set_latency (query, live, min, max);
1686       break;
1687     }
1688     default:
1689       /* peer handled other queries */
1690       break;
1691   }
1692 
1693   return TRUE;
1694 }
1695 
1696 static gboolean
gst_queue_sink_activate_mode(GstPad * pad,GstObject * parent,GstPadMode mode,gboolean active)1697 gst_queue_sink_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
1698     gboolean active)
1699 {
1700   gboolean result;
1701   GstQueue *queue;
1702 
1703   queue = GST_QUEUE (parent);
1704 
1705   switch (mode) {
1706     case GST_PAD_MODE_PUSH:
1707       if (active) {
1708         GST_QUEUE_MUTEX_LOCK (queue);
1709         queue->srcresult = GST_FLOW_OK;
1710         queue->eos = FALSE;
1711         queue->unexpected = FALSE;
1712         GST_QUEUE_MUTEX_UNLOCK (queue);
1713       } else {
1714         /* step 1, unblock chain function */
1715         GST_QUEUE_MUTEX_LOCK (queue);
1716         queue->srcresult = GST_FLOW_FLUSHING;
1717         /* the item del signal will unblock */
1718         GST_QUEUE_SIGNAL_DEL (queue);
1719         GST_QUEUE_MUTEX_UNLOCK (queue);
1720 
1721         /* step 2, wait until streaming thread stopped and flush queue */
1722         GST_PAD_STREAM_LOCK (pad);
1723         GST_QUEUE_MUTEX_LOCK (queue);
1724         gst_queue_locked_flush (queue, TRUE);
1725         GST_QUEUE_MUTEX_UNLOCK (queue);
1726         GST_PAD_STREAM_UNLOCK (pad);
1727       }
1728       result = TRUE;
1729       break;
1730     default:
1731       result = FALSE;
1732       break;
1733   }
1734   return result;
1735 }
1736 
1737 static gboolean
gst_queue_src_activate_mode(GstPad * pad,GstObject * parent,GstPadMode mode,gboolean active)1738 gst_queue_src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
1739     gboolean active)
1740 {
1741   gboolean result;
1742   GstQueue *queue;
1743 
1744   queue = GST_QUEUE (parent);
1745 
1746   switch (mode) {
1747     case GST_PAD_MODE_PUSH:
1748       if (active) {
1749         GST_QUEUE_MUTEX_LOCK (queue);
1750         queue->srcresult = GST_FLOW_OK;
1751         queue->eos = FALSE;
1752         queue->unexpected = FALSE;
1753         result =
1754             gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad,
1755             NULL);
1756         GST_QUEUE_MUTEX_UNLOCK (queue);
1757       } else {
1758         /* step 1, unblock loop function */
1759         GST_QUEUE_MUTEX_LOCK (queue);
1760         queue->srcresult = GST_FLOW_FLUSHING;
1761         /* the item add signal will unblock */
1762         g_cond_signal (&queue->item_add);
1763         GST_QUEUE_MUTEX_UNLOCK (queue);
1764 
1765         /* step 2, make sure streaming finishes */
1766         result = gst_pad_stop_task (pad);
1767 
1768         GST_QUEUE_MUTEX_LOCK (queue);
1769         gst_queue_locked_flush (queue, FALSE);
1770         GST_QUEUE_MUTEX_UNLOCK (queue);
1771       }
1772       break;
1773     default:
1774       result = FALSE;
1775       break;
1776   }
1777   return result;
1778 }
1779 
1780 static void
queue_capacity_change(GstQueue * queue)1781 queue_capacity_change (GstQueue * queue)
1782 {
1783   if (queue->leaky == GST_QUEUE_LEAK_DOWNSTREAM) {
1784     gst_queue_leak_downstream (queue);
1785   }
1786 
1787   /* changing the capacity of the queue must wake up
1788    * the _chain function, it might have more room now
1789    * to store the buffer/event in the queue */
1790   GST_QUEUE_SIGNAL_DEL (queue);
1791 }
1792 
1793 /* Changing the minimum required fill level must
1794  * wake up the _loop function as it might now
1795  * be able to preceed.
1796  */
1797 #define QUEUE_THRESHOLD_CHANGE(q)\
1798   GST_QUEUE_SIGNAL_ADD (q);
1799 
1800 static void
gst_queue_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)1801 gst_queue_set_property (GObject * object,
1802     guint prop_id, const GValue * value, GParamSpec * pspec)
1803 {
1804   GstQueue *queue = GST_QUEUE (object);
1805 
1806   /* someone could change levels here, and since this
1807    * affects the get/put funcs, we need to lock for safety. */
1808   GST_QUEUE_MUTEX_LOCK (queue);
1809 
1810   switch (prop_id) {
1811     case PROP_MAX_SIZE_BYTES:
1812       queue->max_size.bytes = g_value_get_uint (value);
1813       queue_capacity_change (queue);
1814       break;
1815     case PROP_MAX_SIZE_BUFFERS:
1816       queue->max_size.buffers = g_value_get_uint (value);
1817       queue_capacity_change (queue);
1818       break;
1819     case PROP_MAX_SIZE_TIME:
1820       queue->max_size.time = g_value_get_uint64 (value);
1821       queue_capacity_change (queue);
1822       break;
1823     case PROP_MIN_THRESHOLD_BYTES:
1824       queue->min_threshold.bytes = g_value_get_uint (value);
1825       queue->orig_min_threshold.bytes = queue->min_threshold.bytes;
1826       QUEUE_THRESHOLD_CHANGE (queue);
1827       break;
1828     case PROP_MIN_THRESHOLD_BUFFERS:
1829       queue->min_threshold.buffers = g_value_get_uint (value);
1830       queue->orig_min_threshold.buffers = queue->min_threshold.buffers;
1831       QUEUE_THRESHOLD_CHANGE (queue);
1832       break;
1833     case PROP_MIN_THRESHOLD_TIME:
1834       queue->min_threshold.time = g_value_get_uint64 (value);
1835       queue->orig_min_threshold.time = queue->min_threshold.time;
1836       QUEUE_THRESHOLD_CHANGE (queue);
1837       break;
1838     case PROP_LEAKY:
1839       queue->leaky = g_value_get_enum (value);
1840       break;
1841     case PROP_SILENT:
1842       queue->silent = g_value_get_boolean (value);
1843       break;
1844     case PROP_FLUSH_ON_EOS:
1845       queue->flush_on_eos = g_value_get_boolean (value);
1846       break;
1847     default:
1848       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1849       break;
1850   }
1851 
1852   GST_QUEUE_MUTEX_UNLOCK (queue);
1853 }
1854 
1855 static void
gst_queue_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)1856 gst_queue_get_property (GObject * object,
1857     guint prop_id, GValue * value, GParamSpec * pspec)
1858 {
1859   GstQueue *queue = GST_QUEUE (object);
1860 
1861   GST_QUEUE_MUTEX_LOCK (queue);
1862 
1863   switch (prop_id) {
1864     case PROP_CUR_LEVEL_BYTES:
1865       g_value_set_uint (value, queue->cur_level.bytes);
1866       break;
1867     case PROP_CUR_LEVEL_BUFFERS:
1868       g_value_set_uint (value, queue->cur_level.buffers);
1869       break;
1870     case PROP_CUR_LEVEL_TIME:
1871       g_value_set_uint64 (value, queue->cur_level.time);
1872       break;
1873     case PROP_MAX_SIZE_BYTES:
1874       g_value_set_uint (value, queue->max_size.bytes);
1875       break;
1876     case PROP_MAX_SIZE_BUFFERS:
1877       g_value_set_uint (value, queue->max_size.buffers);
1878       break;
1879     case PROP_MAX_SIZE_TIME:
1880       g_value_set_uint64 (value, queue->max_size.time);
1881       break;
1882     case PROP_MIN_THRESHOLD_BYTES:
1883       g_value_set_uint (value, queue->min_threshold.bytes);
1884       break;
1885     case PROP_MIN_THRESHOLD_BUFFERS:
1886       g_value_set_uint (value, queue->min_threshold.buffers);
1887       break;
1888     case PROP_MIN_THRESHOLD_TIME:
1889       g_value_set_uint64 (value, queue->min_threshold.time);
1890       break;
1891     case PROP_LEAKY:
1892       g_value_set_enum (value, queue->leaky);
1893       break;
1894     case PROP_SILENT:
1895       g_value_set_boolean (value, queue->silent);
1896       break;
1897     case PROP_FLUSH_ON_EOS:
1898       g_value_set_boolean (value, queue->flush_on_eos);
1899       break;
1900     default:
1901       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1902       break;
1903   }
1904 
1905   GST_QUEUE_MUTEX_UNLOCK (queue);
1906 }
1907