• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2003 Colin Walters <cwalters@gnome.org>
4  *                    2000,2005,2007 Wim Taymans <wim.taymans@gmail.com>
5  *                    2007 Thiago Sousa Santos <thiagoss@lcc.ufcg.edu.br>
6  *                 SA 2010 ST-Ericsson <benjamin.gaignard@stericsson.com>
7  *
8  * gstqueue2.c:
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Library General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Library General Public License for more details.
19  *
20  * You should have received a copy of the GNU Library General Public
21  * License along with this library; if not, write to the
22  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
23  * Boston, MA 02110-1301, USA.
24  */
25 
26 /**
27  * SECTION:element-queue2
28  * @title: queue2
29  *
30  * Data is queued until one of the limits specified by the
31  * #GstQueue2:max-size-buffers, #GstQueue2:max-size-bytes and/or
32  * #GstQueue2:max-size-time properties has been reached. Any attempt to push
33  * more buffers into the queue will block the pushing thread until more space
34  * becomes available.
35  *
36  * The queue will create a new thread on the source pad to decouple the
37  * processing on sink and source pad.
38  *
39  * You can query how many buffers are queued by reading the
40  * #GstQueue2:current-level-buffers property.
41  *
42  * The default queue size limits are 100 buffers, 2MB of data, or
43  * two seconds worth of data, whichever is reached first.
44  *
45  * If you set temp-template to a value such as /tmp/gstreamer-XXXXXX, the element
46  * will allocate a random free filename and buffer data in the file.
47  * By using this, it will buffer the entire stream data on the file independently
48  * of the queue size limits, they will only be used for buffering statistics.
49  *
50  * The temp-location property will be used to notify the application of the
51  * allocated filename.
52  *
53  * If the #GstQueue2:use-buffering property is set to TRUE, and any writable
54  * property is modified, #GstQueue2 will attempt to post a buffering message
55  * if the changes to the properties also cause the buffering percentage to be
56  * changed (for example, because the queue's capacity was changed and it already
57  * contains some data).
58  */
59 
60 #ifdef HAVE_CONFIG_H
61 #include "config.h"
62 #endif
63 
64 #include "gstqueue2.h"
65 #include "gstcoreelementselements.h"
66 
67 #include <glib/gstdio.h>
68 
69 #include "gst/gst-i18n-lib.h"
70 #include "gst/glib-compat-private.h"
71 
72 #include <string.h>
73 
74 #ifdef G_OS_WIN32
75 #include <io.h>                 /* lseek, open, close, read */
76 #undef lseek
77 #define lseek _lseeki64
78 #undef off_t
79 #define off_t guint64
80 #else
81 #include <unistd.h>
82 #endif
83 
84 #ifdef __BIONIC__               /* Android */
85 #include <fcntl.h>
86 #endif
87 
/* Always-present sink pad template; accepts any caps. */
static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
    GST_PAD_SINK,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS_ANY);

/* Always-present source pad template; produces any caps. */
static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
    GST_PAD_SRC,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS_ANY);
97 
/* General debug category of the element; it is the default category for
 * the GST_DEBUG_* macros in this file. */
GST_DEBUG_CATEGORY_STATIC (queue_debug);
#define GST_CAT_DEFAULT (queue_debug)
/* Separate category used by the STATUS() dataflow logging. */
GST_DEBUG_CATEGORY_STATIC (queue_dataflow);

/* The element defines no signals; only the terminator is present. */
enum
{
  LAST_SIGNAL
};
106 
/* other defines */
#define DEFAULT_BUFFER_SIZE 4096
/* Storage selection: a temp file is used when a temp-template is set, the
 * ring buffer when a non-zero ring buffer size is set, and the plain
 * in-memory queue when neither is the case. */
#define QUEUE_IS_USING_TEMP_FILE(queue) ((queue)->temp_template != NULL)
#define QUEUE_IS_USING_RING_BUFFER(queue) ((queue)->ring_buffer_max_size != 0)  /* for consistency with the above macro */
#define QUEUE_IS_USING_QUEUE(queue) (!QUEUE_IS_USING_TEMP_FILE(queue) && !QUEUE_IS_USING_RING_BUFFER (queue))

/* Smaller of the configured byte limit and the ring buffer size */
#define QUEUE_MAX_BYTES(queue) MIN((queue)->max_level.bytes, (queue)->ring_buffer_max_size)

/* default property values */
#define DEFAULT_MAX_SIZE_BUFFERS   100  /* 100 buffers */
#define DEFAULT_MAX_SIZE_BYTES     (2 * 1024 * 1024)    /* 2 MB */
/* Parenthesized so the macro stays a single operand when it is used inside
 * a larger expression (e.g. as a divisor). */
#define DEFAULT_MAX_SIZE_TIME      (2 * GST_SECOND)     /* 2 seconds */
#define DEFAULT_USE_BUFFERING      FALSE
#define DEFAULT_USE_TAGS_BITRATE   FALSE
#define DEFAULT_USE_RATE_ESTIMATE  TRUE
#define DEFAULT_LOW_PERCENT        10
#define DEFAULT_HIGH_PERCENT       99
#define DEFAULT_LOW_WATERMARK      0.01
#define DEFAULT_HIGH_WATERMARK     0.99
#define DEFAULT_TEMP_REMOVE        TRUE
#define DEFAULT_RING_BUFFER_MAX_SIZE 0
#define DEFAULT_USE_BITRATE_QUERY  TRUE
129 
/* Property identifiers. PROP_0 is the unused id 0 and PROP_LAST is the
 * number of entries, used to size obj_props below. */
enum
{
  PROP_0,
  PROP_CUR_LEVEL_BUFFERS,
  PROP_CUR_LEVEL_BYTES,
  PROP_CUR_LEVEL_TIME,
  PROP_MAX_SIZE_BUFFERS,
  PROP_MAX_SIZE_BYTES,
  PROP_MAX_SIZE_TIME,
  PROP_USE_BUFFERING,
  PROP_USE_TAGS_BITRATE,
  PROP_USE_RATE_ESTIMATE,
  PROP_LOW_PERCENT,
  PROP_HIGH_PERCENT,
  PROP_LOW_WATERMARK,
  PROP_HIGH_WATERMARK,
  PROP_TEMP_TEMPLATE,
  PROP_TEMP_LOCATION,
  PROP_TEMP_REMOVE,
  PROP_RING_BUFFER_MAX_SIZE,
  PROP_AVG_IN_RATE,
  PROP_USE_BITRATE_QUERY,
  PROP_BITRATE,
  PROP_LAST
};
/* Param specs indexed by property id; filled in by the class initializer. */
static GParamSpec *obj_props[PROP_LAST] = { NULL, };
156 
157 /* Explanation for buffer levels and percentages:
158  *
159  * The buffering_level functions here return a value in a normalized range
160  * that specifies the queue's current fill level. The range goes from 0 to
161  * MAX_BUFFERING_LEVEL. The low/high watermarks also use this same range.
162  *
163  * This is not to be confused with the buffering_percent value, which is
164  * a *relative* quantity - relative to the low/high watermarks.
165  * buffering_percent = 0% means buffering_level is at the low watermark.
166  * buffering_percent = 100% means buffering_level is at the high watermark.
167  * buffering_percent is used for determining if the fill level has reached
168  * the high watermark, and for producing BUFFERING messages. This value
169  * always uses a 0..100 range (since it is a percentage).
170  *
171  * To avoid future confusions, whenever "buffering level" is mentioned, it
172  * refers to the absolute level which is in the 0..MAX_BUFFERING_LEVEL
173  * range. Whenever "buffering_percent" is mentioned, it refers to the
174  * percentage value that is relative to the low/high watermark. */
175 
176 /* Using a buffering level range of 0..1000000 to allow for a
177  * resolution in ppm (1 ppm = 0.0001%) */
178 #define MAX_BUFFERING_LEVEL 1000000
179 
180 /* How much 1% makes up in the buffer level range */
181 #define BUF_LEVEL_PERCENT_FACTOR ((MAX_BUFFERING_LEVEL) / 100)
182 
/* Reset all counters (buffers, bytes, time, rate_time) of a level struct. */
#define GST_QUEUE2_CLEAR_LEVEL(l) G_STMT_START {         \
  (l).buffers = 0;                                      \
  (l).bytes = 0;                                        \
  (l).time = 0;                                         \
  (l).rate_time = 0;                                    \
} G_STMT_END

/* Log the current and maximum fill levels of @q in the dataflow category,
 * prefixed with the name of @pad and the literal string @msg.
 * The parameter is deliberately not called "queue": the body accesses the
 * member of that name, which would otherwise be substituted as well and
 * break the macro for any argument not spelled exactly "queue". */
#define STATUS(q, pad, msg) \
  GST_CAT_LOG_OBJECT (queue_dataflow, q, \
                      "(%s:%s) " msg ": %u of %u buffers, %u of %u " \
                      "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
                      " ns, %"G_GUINT64_FORMAT" items", \
                      GST_DEBUG_PAD_NAME (pad), \
                      (q)->cur_level.buffers, \
                      (q)->max_level.buffers, \
                      (q)->cur_level.bytes, \
                      (q)->max_level.bytes, \
                      (q)->cur_level.time, \
                      (q)->max_level.time, \
                      (guint64) (!QUEUE_IS_USING_QUEUE(q) ? \
                        (q)->current->writing_pos - (q)->current->max_reading_pos : \
                        gst_queue_array_get_length((q)->queue)))

/* Take the queue lock. */
#define GST_QUEUE2_MUTEX_LOCK(q) G_STMT_START {                          \
  g_mutex_lock (&(q)->qlock);                                            \
} G_STMT_END

/* Take the queue lock, then jump to @label (with the lock still held) when
 * @res is not GST_FLOW_OK. */
#define GST_QUEUE2_MUTEX_LOCK_CHECK(q,res,label) G_STMT_START {         \
  GST_QUEUE2_MUTEX_LOCK (q);                                            \
  if ((res) != GST_FLOW_OK)                                             \
    goto label;                                                         \
} G_STMT_END

/* Release the queue lock. */
#define GST_QUEUE2_MUTEX_UNLOCK(q) G_STMT_START {                        \
  g_mutex_unlock (&(q)->qlock);                                          \
} G_STMT_END

/* Wait on the item_del condition of @q using the queue's own lock. @res is
 * evaluated after waking up; when it is not GST_FLOW_OK control jumps to
 * @label. Only the macro argument is referenced, never a variable that
 * merely happens to be named "queue" at the call site. */
#define GST_QUEUE2_WAIT_DEL_CHECK(q, res, label) G_STMT_START {         \
  STATUS (q, (q)->sinkpad, "wait for DEL");                             \
  (q)->waiting_del = TRUE;                                              \
  g_cond_wait (&(q)->item_del, &(q)->qlock);                            \
  (q)->waiting_del = FALSE;                                             \
  if ((res) != GST_FLOW_OK) {                                           \
    STATUS (q, (q)->srcpad, "received DEL wakeup");                     \
    goto label;                                                         \
  }                                                                     \
  STATUS (q, (q)->sinkpad, "received DEL");                             \
} G_STMT_END

/* Counterpart of GST_QUEUE2_WAIT_DEL_CHECK for the item_add condition. */
#define GST_QUEUE2_WAIT_ADD_CHECK(q, res, label) G_STMT_START {         \
  STATUS (q, (q)->srcpad, "wait for ADD");                              \
  (q)->waiting_add = TRUE;                                              \
  g_cond_wait (&(q)->item_add, &(q)->qlock);                            \
  (q)->waiting_add = FALSE;                                             \
  if ((res) != GST_FLOW_OK) {                                           \
    STATUS (q, (q)->srcpad, "received ADD wakeup");                     \
    goto label;                                                         \
  }                                                                     \
  STATUS (q, (q)->srcpad, "received ADD");                              \
} G_STMT_END

/* Signal item_del, but only when a waiter has flagged itself. */
#define GST_QUEUE2_SIGNAL_DEL(q) G_STMT_START {                          \
  if ((q)->waiting_del) {                                               \
    STATUS (q, (q)->srcpad, "signal DEL");                              \
    g_cond_signal (&(q)->item_del);                                      \
  }                                                                     \
} G_STMT_END

/* Signal item_add, but only when a waiter has flagged itself. */
#define GST_QUEUE2_SIGNAL_ADD(q) G_STMT_START {                          \
  if ((q)->waiting_add) {                                               \
    STATUS (q, (q)->sinkpad, "signal ADD");                             \
    g_cond_signal (&(q)->item_add);                                      \
  }                                                                     \
} G_STMT_END

/* Store a new buffering percentage when it differs from the current one,
 * mark it as changed and refresh the cached buffering statistics.
 * Note: @perc is evaluated more than once, so pass a side-effect free
 * expression. */
#define SET_PERCENT(q, perc) G_STMT_START {                              \
  if ((perc) != (q)->buffering_percent) {                                \
    (q)->buffering_percent = (perc);                                     \
    (q)->percent_changed = TRUE;                                         \
    GST_DEBUG_OBJECT (q, "buffering %d percent", (perc));                \
    get_buffering_stats (q, (perc), &(q)->mode, &(q)->avg_in,            \
        &(q)->avg_out, &(q)->buffering_left);                            \
  }                                                                      \
} G_STMT_END
267 
/* Extra code handed to G_DEFINE_TYPE_WITH_CODE below: initializes the two
 * debug categories used in this file. */
#define _do_init \
    GST_DEBUG_CATEGORY_INIT (queue_debug, "queue2", 0, "queue element"); \
    GST_DEBUG_CATEGORY_INIT (queue_dataflow, "queue2_dataflow", 0, \
        "dataflow inside the queue element");
/* Lets the rest of the file refer to the parent class as parent_class. */
#define gst_queue2_parent_class parent_class
G_DEFINE_TYPE_WITH_CODE (GstQueue2, gst_queue2, GST_TYPE_ELEMENT, _do_init);
GST_ELEMENT_REGISTER_DEFINE (queue2, "queue2", GST_RANK_NONE, GST_TYPE_QUEUE2);
275 
/* GObject vfuncs */
static void gst_queue2_finalize (GObject * object);

static void gst_queue2_set_property (GObject * object,
    guint prop_id, const GValue * value, GParamSpec * pspec);
static void gst_queue2_get_property (GObject * object,
    guint prop_id, GValue * value, GParamSpec * pspec);

/* data flow: chain functions on the sink pad, pushing/looping on the src pad */
static GstFlowReturn gst_queue2_chain (GstPad * pad, GstObject * parent,
    GstBuffer * buffer);
static GstFlowReturn gst_queue2_chain_list (GstPad * pad, GstObject * parent,
    GstBufferList * buffer_list);
static GstFlowReturn gst_queue2_push_one (GstQueue2 * queue);
static void gst_queue2_loop (GstPad * pad);

/* sink pad event and query handlers */
static GstFlowReturn gst_queue2_handle_sink_event (GstPad * pad,
    GstObject * parent, GstEvent * event);
static gboolean gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
    GstQuery * query);

/* src pad event and query handlers, plus the element-level query handler */
static gboolean gst_queue2_handle_src_event (GstPad * pad, GstObject * parent,
    GstEvent * event);
static gboolean gst_queue2_handle_src_query (GstPad * pad, GstObject * parent,
    GstQuery * query);
static gboolean gst_queue2_handle_query (GstElement * element,
    GstQuery * query);

/* getrange function installed on the src pad */
static GstFlowReturn gst_queue2_get_range (GstPad * pad, GstObject * parent,
    guint64 offset, guint length, GstBuffer ** buffer);

/* pad activation and element state handling */
static gboolean gst_queue2_src_activate_mode (GstPad * pad, GstObject * parent,
    GstPadMode mode, gboolean active);
static gboolean gst_queue2_sink_activate_mode (GstPad * pad, GstObject * parent,
    GstPadMode mode, gboolean active);
static GstStateChangeReturn gst_queue2_change_state (GstElement * element,
    GstStateChange transition);

/* fill-level predicates */
static gboolean gst_queue2_is_empty (GstQueue2 * queue);
static gboolean gst_queue2_is_filled (GstQueue2 * queue);

/* level, rate and buffering bookkeeping */
static void update_cur_level (GstQueue2 * queue, GstQueue2Range * range);
static void update_in_rates (GstQueue2 * queue, gboolean force);
static GstMessage *gst_queue2_get_buffering_message (GstQueue2 * queue,
    gint * percent);
static void update_buffering (GstQueue2 * queue);
static void gst_queue2_post_buffering (GstQueue2 * queue);
321 
/* Kind of mini object carried by a GstQueue2Item. */
typedef enum
{
  GST_QUEUE2_ITEM_TYPE_UNKNOWN = 0,
  GST_QUEUE2_ITEM_TYPE_BUFFER,
  GST_QUEUE2_ITEM_TYPE_BUFFER_LIST,
  GST_QUEUE2_ITEM_TYPE_EVENT,
  GST_QUEUE2_ITEM_TYPE_QUERY
} GstQueue2ItemType;

/* Element type stored by value in the queue array: the mini object together
 * with a tag telling which kind of object it is. */
typedef struct
{
  GstQueue2ItemType type;
  GstMiniObject *item;
} GstQueue2Item;
336 
337 /* static guint gst_queue2_signals[LAST_SIGNAL] = { 0 }; */
338 
339 static void
gst_queue2_class_init(GstQueue2Class * klass)340 gst_queue2_class_init (GstQueue2Class * klass)
341 {
342   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
343   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
344 
345   gobject_class->set_property = gst_queue2_set_property;
346   gobject_class->get_property = gst_queue2_get_property;
347 
348   /* properties */
349   obj_props[PROP_CUR_LEVEL_BYTES] = g_param_spec_uint ("current-level-bytes",
350       "Current level (kB)", "Current amount of data in the queue (bytes)",
351       0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
352   obj_props[PROP_CUR_LEVEL_BUFFERS] =
353       g_param_spec_uint ("current-level-buffers", "Current level (buffers)",
354       "Current number of buffers in the queue",
355       0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
356   obj_props[PROP_CUR_LEVEL_TIME] = g_param_spec_uint64 ("current-level-time",
357       "Current level (ns)", "Current amount of data in the queue (in ns)",
358       0, G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
359 
360   obj_props[PROP_MAX_SIZE_BYTES] = g_param_spec_uint ("max-size-bytes",
361       "Max. size (kB)", "Max. amount of data in the queue (bytes, 0=disable)",
362       0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES,
363       G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | G_PARAM_STATIC_STRINGS);
364   obj_props[PROP_MAX_SIZE_BUFFERS] = g_param_spec_uint ("max-size-buffers",
365       "Max. size (buffers)", "Max. number of buffers in the queue (0=disable)",
366       0, G_MAXUINT, DEFAULT_MAX_SIZE_BUFFERS,
367       G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | G_PARAM_STATIC_STRINGS);
368   obj_props[PROP_MAX_SIZE_TIME] = g_param_spec_uint64 ("max-size-time",
369       "Max. size (ns)", "Max. amount of data in the queue (in ns, 0=disable)",
370       0, G_MAXUINT64, DEFAULT_MAX_SIZE_TIME,
371       G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | G_PARAM_STATIC_STRINGS);
372 
373   obj_props[PROP_USE_BUFFERING] = g_param_spec_boolean ("use-buffering",
374       "Use buffering",
375       "Emit GST_MESSAGE_BUFFERING based on low-/high-percent thresholds "
376       "(0% = low-watermark, 100% = high-watermark)",
377       DEFAULT_USE_BUFFERING,
378       G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | G_PARAM_STATIC_STRINGS);
379   obj_props[PROP_USE_TAGS_BITRATE] = g_param_spec_boolean ("use-tags-bitrate",
380       "Use bitrate from tags",
381       "Use a bitrate from upstream tags to estimate buffer duration if not provided",
382       DEFAULT_USE_TAGS_BITRATE,
383       G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | G_PARAM_STATIC_STRINGS);
384   obj_props[PROP_USE_RATE_ESTIMATE] = g_param_spec_boolean ("use-rate-estimate",
385       "Use Rate Estimate",
386       "Estimate the bitrate of the stream to calculate time level",
387       DEFAULT_USE_RATE_ESTIMATE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
388   obj_props[PROP_LOW_PERCENT] = g_param_spec_int ("low-percent", "Low percent",
389       "Low threshold for buffering to start. Only used if use-buffering is True "
390       "(Deprecated: use low-watermark instead)",
391       0, 100, DEFAULT_LOW_WATERMARK * 100,
392       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
393   obj_props[PROP_HIGH_PERCENT] = g_param_spec_int ("high-percent",
394       "High percent",
395       "High threshold for buffering to finish. Only used if use-buffering is True "
396       "(Deprecated: use high-watermark instead)",
397       0, 100, DEFAULT_HIGH_WATERMARK * 100,
398       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
399   obj_props[PROP_LOW_WATERMARK] = g_param_spec_double ("low-watermark",
400       "Low watermark",
401       "Low threshold for buffering to start. Only used if use-buffering is True",
402       0.0, 1.0, DEFAULT_LOW_WATERMARK,
403       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
404   obj_props[PROP_HIGH_WATERMARK] = g_param_spec_double ("high-watermark",
405       "High watermark",
406       "High threshold for buffering to finish. Only used if use-buffering is True",
407       0.0, 1.0, DEFAULT_HIGH_WATERMARK,
408       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
409 
410   obj_props[PROP_TEMP_TEMPLATE] = g_param_spec_string ("temp-template",
411       "Temporary File Template",
412       "File template to store temporary files in, should contain directory "
413       "and XXXXXX. (NULL == disabled)",
414       NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
415 
416   obj_props[PROP_TEMP_LOCATION] = g_param_spec_string ("temp-location",
417       "Temporary File Location",
418       "Location to store temporary files in (Only read this property, "
419       "use temp-template to configure the name template)",
420       NULL, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
421 
422   obj_props[PROP_USE_BITRATE_QUERY] = g_param_spec_boolean ("use-bitrate-query",
423       "Use bitrate from downstream query",
424       "Use a bitrate from a downstream query to estimate buffer duration if not provided",
425       DEFAULT_USE_BITRATE_QUERY,
426       G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | G_PARAM_STATIC_STRINGS);
427 
428   /**
429    * GstQueue2:temp-remove
430    *
431    * When temp-template is set, remove the temporary file when going to READY.
432    */
433   obj_props[PROP_TEMP_REMOVE] = g_param_spec_boolean ("temp-remove",
434       "Remove the Temporary File", "Remove the temp-location after use",
435       DEFAULT_TEMP_REMOVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
436 
437   /**
438    * GstQueue2:ring-buffer-max-size
439    *
440    * The maximum size of the ring buffer in bytes. If set to 0, the ring
441    * buffer is disabled. Default 0.
442    */
443   obj_props[PROP_RING_BUFFER_MAX_SIZE] =
444       g_param_spec_uint64 ("ring-buffer-max-size",
445       "Max. ring buffer size (bytes)",
446       "Max. amount of data in the ring buffer (bytes, 0 = disabled)",
447       0, G_MAXUINT64, DEFAULT_RING_BUFFER_MAX_SIZE,
448       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
449 
450   /**
451    * GstQueue2:avg-in-rate
452    *
453    * The average input data rate.
454    */
455   obj_props[PROP_AVG_IN_RATE] = g_param_spec_int64 ("avg-in-rate",
456       "Input data rate (bytes/s)", "Average input data rate (bytes/s)",
457       0, G_MAXINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
458 
459   /**
460    * GstQueue2:bitrate
461    *
462    * The value used to convert between byte and time values for limiting
463    * the size of the queue.  Values are taken from either the upstream tags
464    * or from the downstream bitrate query.
465    */
466   obj_props[PROP_BITRATE] = g_param_spec_uint64 ("bitrate", "Bitrate (bits/s)",
467       "Conversion value between data size and time",
468       0, G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
469 
470   g_object_class_install_properties (gobject_class, PROP_LAST, obj_props);
471 
472   /* set several parent class virtual functions */
473   gobject_class->finalize = gst_queue2_finalize;
474 
475   gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
476   gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate);
477 
478   gst_element_class_set_static_metadata (gstelement_class, "Queue 2",
479       "Generic",
480       "Simple data queue",
481       "Erik Walthinsen <omega@cse.ogi.edu>, "
482       "Wim Taymans <wim.taymans@gmail.com>");
483 
484   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_queue2_change_state);
485   gstelement_class->query = GST_DEBUG_FUNCPTR (gst_queue2_handle_query);
486 }
487 
488 static void
gst_queue2_init(GstQueue2 * queue)489 gst_queue2_init (GstQueue2 * queue)
490 {
491   queue->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
492 
493   gst_pad_set_chain_function (queue->sinkpad,
494       GST_DEBUG_FUNCPTR (gst_queue2_chain));
495   gst_pad_set_chain_list_function (queue->sinkpad,
496       GST_DEBUG_FUNCPTR (gst_queue2_chain_list));
497   gst_pad_set_activatemode_function (queue->sinkpad,
498       GST_DEBUG_FUNCPTR (gst_queue2_sink_activate_mode));
499   gst_pad_set_event_full_function (queue->sinkpad,
500       GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_event));
501   gst_pad_set_query_function (queue->sinkpad,
502       GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_query));
503   GST_PAD_SET_PROXY_CAPS (queue->sinkpad);
504   gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
505 
506   queue->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
507 
508   gst_pad_set_activatemode_function (queue->srcpad,
509       GST_DEBUG_FUNCPTR (gst_queue2_src_activate_mode));
510   gst_pad_set_getrange_function (queue->srcpad,
511       GST_DEBUG_FUNCPTR (gst_queue2_get_range));
512   gst_pad_set_event_function (queue->srcpad,
513       GST_DEBUG_FUNCPTR (gst_queue2_handle_src_event));
514   gst_pad_set_query_function (queue->srcpad,
515       GST_DEBUG_FUNCPTR (gst_queue2_handle_src_query));
516   GST_PAD_SET_PROXY_CAPS (queue->srcpad);
517   gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
518 
519   /* levels */
520   GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
521   queue->max_level.buffers = DEFAULT_MAX_SIZE_BUFFERS;
522   queue->max_level.bytes = DEFAULT_MAX_SIZE_BYTES;
523   queue->max_level.time = DEFAULT_MAX_SIZE_TIME;
524   queue->max_level.rate_time = DEFAULT_MAX_SIZE_TIME;
525   queue->use_buffering = DEFAULT_USE_BUFFERING;
526   queue->use_rate_estimate = DEFAULT_USE_RATE_ESTIMATE;
527   queue->low_watermark = DEFAULT_LOW_WATERMARK * MAX_BUFFERING_LEVEL;
528   queue->high_watermark = DEFAULT_HIGH_WATERMARK * MAX_BUFFERING_LEVEL;
529 
530   gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
531   gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
532 
533   queue->sinktime = GST_CLOCK_TIME_NONE;
534   queue->srctime = GST_CLOCK_TIME_NONE;
535   queue->sink_tainted = TRUE;
536   queue->src_tainted = TRUE;
537 
538   queue->srcresult = GST_FLOW_FLUSHING;
539   queue->sinkresult = GST_FLOW_FLUSHING;
540   queue->is_eos = FALSE;
541   queue->in_timer = g_timer_new ();
542   queue->out_timer = g_timer_new ();
543 
544   g_mutex_init (&queue->qlock);
545   queue->waiting_add = FALSE;
546   g_cond_init (&queue->item_add);
547   queue->waiting_del = FALSE;
548   g_cond_init (&queue->item_del);
549   queue->queue = gst_queue_array_new_for_struct (sizeof (GstQueue2Item), 32);
550 
551   g_cond_init (&queue->query_handled);
552   queue->last_query = FALSE;
553 
554   g_mutex_init (&queue->buffering_post_lock);
555   queue->buffering_percent = 100;
556   queue->last_posted_buffering_percent = -1;
557 
558   /* tempfile related */
559   queue->temp_template = NULL;
560   queue->temp_location = NULL;
561   queue->temp_remove = DEFAULT_TEMP_REMOVE;
562 
563   queue->ring_buffer = NULL;
564   queue->ring_buffer_max_size = DEFAULT_RING_BUFFER_MAX_SIZE;
565 
566   queue->use_bitrate_query = DEFAULT_USE_BITRATE_QUERY;
567 
568   GST_DEBUG_OBJECT (queue,
569       "initialized queue's not_empty & not_full conditions");
570 }
571 
572 /* called only once, as opposed to dispose */
573 static void
gst_queue2_finalize(GObject * object)574 gst_queue2_finalize (GObject * object)
575 {
576   GstQueue2 *queue = GST_QUEUE2 (object);
577   GstQueue2Item *qitem;
578 
579   GST_DEBUG_OBJECT (queue, "finalizing queue");
580 
581   while ((qitem = gst_queue_array_pop_head_struct (queue->queue))) {
582     if (qitem->type != GST_QUEUE2_ITEM_TYPE_QUERY)
583       gst_mini_object_unref (qitem->item);
584   }
585   gst_queue_array_free (queue->queue);
586 
587   queue->last_query = FALSE;
588   g_mutex_clear (&queue->qlock);
589   g_mutex_clear (&queue->buffering_post_lock);
590   g_cond_clear (&queue->item_add);
591   g_cond_clear (&queue->item_del);
592   g_cond_clear (&queue->query_handled);
593   g_timer_destroy (queue->in_timer);
594   g_timer_destroy (queue->out_timer);
595 
596   /* temp_file path cleanup  */
597   g_free (queue->temp_template);
598   g_free (queue->temp_location);
599 
600   G_OBJECT_CLASS (parent_class)->finalize (object);
601 }
602 
603 static void
debug_ranges(GstQueue2 * queue)604 debug_ranges (GstQueue2 * queue)
605 {
606   GstQueue2Range *walk;
607 
608   for (walk = queue->ranges; walk; walk = walk->next) {
609     GST_DEBUG_OBJECT (queue,
610         "range [%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "] (rb [%"
611         G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "]), reading %" G_GUINT64_FORMAT
612         " current range? %s", walk->offset, walk->writing_pos, walk->rb_offset,
613         walk->rb_writing_pos, walk->reading_pos,
614         walk == queue->current ? "**y**" : "  n  ");
615   }
616 }
617 
618 /* clear all the downloaded ranges */
619 static void
clean_ranges(GstQueue2 * queue)620 clean_ranges (GstQueue2 * queue)
621 {
622   GST_DEBUG_OBJECT (queue, "clean queue ranges");
623 
624   g_slice_free_chain (GstQueue2Range, queue->ranges, next);
625   queue->ranges = NULL;
626   queue->current = NULL;
627 }
628 
629 /* find a range that contains @offset or NULL when nothing does */
630 static GstQueue2Range *
find_range(GstQueue2 * queue,guint64 offset)631 find_range (GstQueue2 * queue, guint64 offset)
632 {
633   GstQueue2Range *range = NULL;
634   GstQueue2Range *walk;
635 
636   /* first do a quick check for the current range */
637   for (walk = queue->ranges; walk; walk = walk->next) {
638     if (offset >= walk->offset && offset <= walk->writing_pos) {
639       /* we can reuse an existing range */
640       range = walk;
641       break;
642     }
643   }
644   if (range) {
645     GST_DEBUG_OBJECT (queue,
646         "found range for %" G_GUINT64_FORMAT ": [%" G_GUINT64_FORMAT "-%"
647         G_GUINT64_FORMAT "]", offset, range->offset, range->writing_pos);
648   } else {
649     GST_DEBUG_OBJECT (queue, "no range for %" G_GUINT64_FORMAT, offset);
650   }
651   return range;
652 }
653 
654 static void
update_cur_level(GstQueue2 * queue,GstQueue2Range * range)655 update_cur_level (GstQueue2 * queue, GstQueue2Range * range)
656 {
657   guint64 max_reading_pos, writing_pos;
658 
659   writing_pos = range->writing_pos;
660   max_reading_pos = range->max_reading_pos;
661 
662   if (writing_pos > max_reading_pos)
663     queue->cur_level.bytes = writing_pos - max_reading_pos;
664   else
665     queue->cur_level.bytes = 0;
666 }
667 
668 /* make a new range for @offset or reuse an existing range */
669 static GstQueue2Range *
add_range(GstQueue2 * queue,guint64 offset,gboolean update_existing)670 add_range (GstQueue2 * queue, guint64 offset, gboolean update_existing)
671 {
672   GstQueue2Range *range, *prev, *next;
673 
674   GST_DEBUG_OBJECT (queue, "find range for %" G_GUINT64_FORMAT, offset);
675 
676   if ((range = find_range (queue, offset))) {
677     GST_DEBUG_OBJECT (queue,
678         "reusing range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, range->offset,
679         range->writing_pos);
680     if (update_existing && range->writing_pos != offset) {
681       GST_DEBUG_OBJECT (queue, "updating range writing position to "
682           "%" G_GUINT64_FORMAT, offset);
683       range->writing_pos = offset;
684     }
685   } else {
686     GST_DEBUG_OBJECT (queue,
687         "new range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, offset, offset);
688 
689     range = g_slice_new0 (GstQueue2Range);
690     range->offset = offset;
691     /* we want to write to the next location in the ring buffer */
692     range->rb_offset = queue->current ? queue->current->rb_writing_pos : 0;
693     range->writing_pos = offset;
694     range->rb_writing_pos = range->rb_offset;
695     range->reading_pos = offset;
696     range->max_reading_pos = offset;
697 
698     /* insert sorted */
699     prev = NULL;
700     next = queue->ranges;
701     while (next) {
702       if (next->offset > offset) {
703         /* insert before next */
704         GST_DEBUG_OBJECT (queue,
705             "insert before range %p, offset %" G_GUINT64_FORMAT, next,
706             next->offset);
707         break;
708       }
709       /* try next */
710       prev = next;
711       next = next->next;
712     }
713     range->next = next;
714     if (prev)
715       prev->next = range;
716     else
717       queue->ranges = range;
718   }
719   debug_ranges (queue);
720 
721   /* update the stats for this range */
722   update_cur_level (queue, range);
723 
724   return range;
725 }
726 
727 
728 /* clear and init the download ranges for offset 0 */
729 static void
init_ranges(GstQueue2 * queue)730 init_ranges (GstQueue2 * queue)
731 {
732   GST_DEBUG_OBJECT (queue, "init queue ranges");
733 
734   /* get rid of all the current ranges */
735   clean_ranges (queue);
736   /* make a range for offset 0 */
737   queue->current = add_range (queue, 0, TRUE);
738 }
739 
740 /* calculate the diff between running time on the sink and src of the queue.
741  * This is the total amount of time in the queue. */
742 static void
update_time_level(GstQueue2 * queue)743 update_time_level (GstQueue2 * queue)
744 {
745   if (queue->sink_tainted) {
746     queue->sinktime =
747         gst_segment_to_running_time (&queue->sink_segment, GST_FORMAT_TIME,
748         queue->sink_segment.position);
749     queue->sink_tainted = FALSE;
750   }
751 
752   if (queue->src_tainted) {
753     queue->srctime =
754         gst_segment_to_running_time (&queue->src_segment, GST_FORMAT_TIME,
755         queue->src_segment.position);
756     queue->src_tainted = FALSE;
757   }
758 
759   GST_DEBUG_OBJECT (queue, "sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT,
760       GST_TIME_ARGS (queue->sinktime), GST_TIME_ARGS (queue->srctime));
761 
762   if (queue->sinktime != GST_CLOCK_TIME_NONE
763       && queue->srctime != GST_CLOCK_TIME_NONE
764       && queue->sinktime >= queue->srctime)
765     queue->cur_level.time = queue->sinktime - queue->srctime;
766   else
767     queue->cur_level.time = 0;
768 }
769 
770 /* take a SEGMENT event and apply the values to segment, updating the time
771  * level of queue. */
772 static void
apply_segment(GstQueue2 * queue,GstEvent * event,GstSegment * segment,gboolean is_sink)773 apply_segment (GstQueue2 * queue, GstEvent * event, GstSegment * segment,
774     gboolean is_sink)
775 {
776   gst_event_copy_segment (event, segment);
777 
778   if (segment->format == GST_FORMAT_BYTES) {
779     if (!QUEUE_IS_USING_QUEUE (queue) && is_sink) {
780       /* start is where we'll be getting from and as such writing next */
781       queue->current = add_range (queue, segment->start, TRUE);
782     }
783   }
784 
785   /* now configure the values, we use these to track timestamps on the
786    * sinkpad. */
787   if (segment->format != GST_FORMAT_TIME) {
788     /* non-time format, pretend the current time segment is closed with a
789      * 0 start and unknown stop time. */
790     segment->format = GST_FORMAT_TIME;
791     segment->start = 0;
792     segment->stop = -1;
793     segment->time = 0;
794   }
795 
796   GST_DEBUG_OBJECT (queue, "configured SEGMENT %" GST_SEGMENT_FORMAT, segment);
797 
798   if (is_sink)
799     queue->sink_tainted = TRUE;
800   else
801     queue->src_tainted = TRUE;
802 
803   /* segment can update the time level of the queue */
804   update_time_level (queue);
805 }
806 
807 static void
apply_gap(GstQueue2 * queue,GstEvent * event,GstSegment * segment,gboolean is_sink)808 apply_gap (GstQueue2 * queue, GstEvent * event,
809     GstSegment * segment, gboolean is_sink)
810 {
811   GstClockTime timestamp;
812   GstClockTime duration;
813 
814   gst_event_parse_gap (event, &timestamp, &duration);
815 
816   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
817 
818     if (GST_CLOCK_TIME_IS_VALID (duration)) {
819       timestamp += duration;
820     }
821 
822     segment->position = timestamp;
823 
824     if (is_sink)
825       queue->sink_tainted = TRUE;
826     else
827       queue->src_tainted = TRUE;
828 
829     /* calc diff with other end */
830     update_time_level (queue);
831   }
832 }
833 
834 static void
query_downstream_bitrate(GstQueue2 * queue)835 query_downstream_bitrate (GstQueue2 * queue)
836 {
837   GstQuery *query = gst_query_new_bitrate ();
838   guint downstream_bitrate = 0;
839   gboolean changed;
840 
841   if (gst_pad_peer_query (queue->srcpad, query)) {
842     gst_query_parse_bitrate (query, &downstream_bitrate);
843     GST_DEBUG_OBJECT (queue, "Got bitrate of %u from downstream",
844         downstream_bitrate);
845   } else {
846     GST_DEBUG_OBJECT (queue, "Failed to query bitrate from downstream");
847   }
848 
849   gst_query_unref (query);
850 
851   GST_QUEUE2_MUTEX_LOCK (queue);
852   changed = queue->downstream_bitrate != downstream_bitrate;
853   queue->downstream_bitrate = downstream_bitrate;
854   GST_QUEUE2_MUTEX_UNLOCK (queue);
855 
856   if (changed) {
857     if (queue->use_buffering)
858       update_buffering (queue);
859     gst_queue2_post_buffering (queue);
860 
861     g_object_notify_by_pspec (G_OBJECT (queue), obj_props[PROP_BITRATE]);
862   }
863 }
864 
865 /* take a buffer and update segment, updating the time level of the queue. */
866 static void
apply_buffer(GstQueue2 * queue,GstBuffer * buffer,GstSegment * segment,guint64 size,gboolean is_sink)867 apply_buffer (GstQueue2 * queue, GstBuffer * buffer, GstSegment * segment,
868     guint64 size, gboolean is_sink)
869 {
870   GstClockTime duration, timestamp;
871 
872   timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
873   duration = GST_BUFFER_DURATION (buffer);
874 
875   /* If we have no duration, pick one from the bitrate if we can */
876   if (duration == GST_CLOCK_TIME_NONE) {
877     if (queue->use_tags_bitrate) {
878       guint bitrate =
879           is_sink ? queue->sink_tags_bitrate : queue->src_tags_bitrate;
880       if (bitrate)
881         duration = gst_util_uint64_scale (size, 8 * GST_SECOND, bitrate);
882     }
883     if (duration == GST_CLOCK_TIME_NONE && !is_sink && queue->use_bitrate_query) {
884       if (queue->downstream_bitrate > 0) {
885         duration =
886             gst_util_uint64_scale (size, 8 * GST_SECOND,
887             queue->downstream_bitrate);
888 
889         GST_LOG_OBJECT (queue, "got bitrate %u resulting in estimated "
890             "duration %" GST_TIME_FORMAT, queue->downstream_bitrate,
891             GST_TIME_ARGS (duration));
892       }
893     }
894   }
895 
896   /* if no timestamp is set, assume it's continuous with the previous
897    * time */
898   if (timestamp == GST_CLOCK_TIME_NONE)
899     timestamp = segment->position;
900 
901   /* add duration */
902   if (duration != GST_CLOCK_TIME_NONE)
903     timestamp += duration;
904 
905   GST_DEBUG_OBJECT (queue, "position updated to %" GST_TIME_FORMAT,
906       GST_TIME_ARGS (timestamp));
907 
908   segment->position = timestamp;
909 
910   if (is_sink)
911     queue->sink_tainted = TRUE;
912   else
913     queue->src_tainted = TRUE;
914 
915   /* calc diff with other end */
916   update_time_level (queue);
917 }
918 
919 struct BufListData
920 {
921   GstClockTime timestamp;
922   guint bitrate;
923 };
924 
925 static gboolean
buffer_list_apply_time(GstBuffer ** buf,guint idx,gpointer data)926 buffer_list_apply_time (GstBuffer ** buf, guint idx, gpointer data)
927 {
928   struct BufListData *bld = data;
929   GstClockTime *timestamp = &bld->timestamp;
930   GstClockTime btime;
931 
932   GST_TRACE ("buffer %u has pts %" GST_TIME_FORMAT " dts %" GST_TIME_FORMAT
933       " duration %" GST_TIME_FORMAT, idx,
934       GST_TIME_ARGS (GST_BUFFER_PTS (*buf)),
935       GST_TIME_ARGS (GST_BUFFER_DTS (*buf)),
936       GST_TIME_ARGS (GST_BUFFER_DURATION (*buf)));
937 
938   btime = GST_BUFFER_DTS_OR_PTS (*buf);
939   if (GST_CLOCK_TIME_IS_VALID (btime))
940     *timestamp = btime;
941 
942   if (GST_BUFFER_DURATION_IS_VALID (*buf))
943     *timestamp += GST_BUFFER_DURATION (*buf);
944   else if (bld->bitrate != 0) {
945     guint64 size = gst_buffer_get_size (*buf);
946 
947     /* If we have no duration, pick one from the bitrate if we can */
948     *timestamp += gst_util_uint64_scale (bld->bitrate, 8 * GST_SECOND, size);
949   }
950 
951 
952   GST_TRACE ("ts now %" GST_TIME_FORMAT, GST_TIME_ARGS (*timestamp));
953   return TRUE;
954 }
955 
956 /* take a buffer list and update segment, updating the time level of the queue */
957 static void
apply_buffer_list(GstQueue2 * queue,GstBufferList * buffer_list,GstSegment * segment,gboolean is_sink)958 apply_buffer_list (GstQueue2 * queue, GstBufferList * buffer_list,
959     GstSegment * segment, gboolean is_sink)
960 {
961   struct BufListData bld;
962 
963   /* if no timestamp is set, assume it's continuous with the previous time */
964   bld.timestamp = segment->position;
965 
966   bld.bitrate = 0;
967   if (queue->use_tags_bitrate) {
968     if (is_sink)
969       bld.bitrate = queue->sink_tags_bitrate;
970     else
971       bld.bitrate = queue->src_tags_bitrate;
972   }
973   if (!is_sink && bld.bitrate == 0 && queue->use_bitrate_query) {
974     bld.bitrate = queue->downstream_bitrate;
975   }
976 
977   gst_buffer_list_foreach (buffer_list, buffer_list_apply_time, &bld);
978 
979   GST_DEBUG_OBJECT (queue, "last_stop updated to %" GST_TIME_FORMAT,
980       GST_TIME_ARGS (bld.timestamp));
981 
982   segment->position = bld.timestamp;
983 
984   if (is_sink)
985     queue->sink_tainted = TRUE;
986   else
987     queue->src_tainted = TRUE;
988 
989   /* calc diff with other end */
990   update_time_level (queue);
991 }
992 
993 static inline gint
normalize_to_buffering_level(guint64 cur_level,guint64 max_level,guint64 alt_max)994 normalize_to_buffering_level (guint64 cur_level, guint64 max_level,
995     guint64 alt_max)
996 {
997   guint64 p;
998 
999   if (max_level == 0)
1000     return 0;
1001 
1002   if (alt_max > 0)
1003     p = gst_util_uint64_scale (cur_level, MAX_BUFFERING_LEVEL,
1004         MIN (max_level, alt_max));
1005   else
1006     p = gst_util_uint64_scale (cur_level, MAX_BUFFERING_LEVEL, max_level);
1007 
1008   return MIN (p, MAX_BUFFERING_LEVEL);
1009 }
1010 
1011 static gboolean
get_buffering_level(GstQueue2 * queue,gboolean * is_buffering,gint * buffering_level)1012 get_buffering_level (GstQueue2 * queue, gboolean * is_buffering,
1013     gint * buffering_level)
1014 {
1015   gint buflevel, buflevel2;
1016 
1017   if (queue->high_watermark <= 0) {
1018     if (buffering_level)
1019       *buffering_level = MAX_BUFFERING_LEVEL;
1020     if (is_buffering)
1021       *is_buffering = FALSE;
1022     return FALSE;
1023   }
1024 #define GET_BUFFER_LEVEL_FOR_QUANTITY(format,alt_max) \
1025     normalize_to_buffering_level (queue->cur_level.format,queue->max_level.format,(alt_max))
1026 
1027   if (queue->is_eos || queue->srcresult == GST_FLOW_NOT_LINKED) {
1028     /* on EOS and NOT_LINKED we are always 100% full, we set the var
1029      * here so that we can reuse the logic below to stop buffering */
1030     buflevel = MAX_BUFFERING_LEVEL;
1031     GST_LOG_OBJECT (queue, "we are %s", queue->is_eos ? "EOS" : "NOT_LINKED");
1032   } else {
1033     GST_LOG_OBJECT (queue,
1034         "Cur level bytes/time/rate-time/buffers %u/%" GST_TIME_FORMAT "/%"
1035         GST_TIME_FORMAT "/%u", queue->cur_level.bytes,
1036         GST_TIME_ARGS (queue->cur_level.time),
1037         GST_TIME_ARGS (queue->cur_level.rate_time), queue->cur_level.buffers);
1038 
1039     /* figure out the buffering level we are filled, we take the max of all formats. */
1040     if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
1041       buflevel = GET_BUFFER_LEVEL_FOR_QUANTITY (bytes, 0);
1042     } else {
1043       guint64 rb_size = queue->ring_buffer_max_size;
1044       buflevel = GET_BUFFER_LEVEL_FOR_QUANTITY (bytes, rb_size);
1045     }
1046 
1047     buflevel2 = GET_BUFFER_LEVEL_FOR_QUANTITY (time, 0);
1048     buflevel = MAX (buflevel, buflevel2);
1049 
1050     buflevel2 = GET_BUFFER_LEVEL_FOR_QUANTITY (buffers, 0);
1051     buflevel = MAX (buflevel, buflevel2);
1052 
1053     /* also apply the rate estimate when we need to */
1054     if (queue->use_rate_estimate) {
1055       buflevel2 = GET_BUFFER_LEVEL_FOR_QUANTITY (rate_time, 0);
1056       buflevel = MAX (buflevel, buflevel2);
1057     }
1058 
1059     /* Don't get to 0% unless we're really empty */
1060     if (queue->cur_level.bytes > 0)
1061       buflevel = MAX (1, buflevel);
1062   }
1063 #undef GET_BUFFER_LEVEL_FOR_QUANTITY
1064 
1065   if (is_buffering)
1066     *is_buffering = queue->is_buffering;
1067 
1068   if (buffering_level)
1069     *buffering_level = buflevel;
1070 
1071   GST_DEBUG_OBJECT (queue, "buffering %d, level %d", queue->is_buffering,
1072       buflevel);
1073 
1074   return TRUE;
1075 }
1076 
1077 static gint
convert_to_buffering_percent(GstQueue2 * queue,gint buffering_level)1078 convert_to_buffering_percent (GstQueue2 * queue, gint buffering_level)
1079 {
1080   int percent;
1081 
1082   /* scale so that if buffering_level equals the high watermark,
1083    * the percentage is 100% */
1084   percent = buffering_level * 100 / queue->high_watermark;
1085   /* clip */
1086   if (percent > 100)
1087     percent = 100;
1088 
1089   return percent;
1090 }
1091 
1092 static void
get_buffering_stats(GstQueue2 * queue,gint percent,GstBufferingMode * mode,gint * avg_in,gint * avg_out,gint64 * buffering_left)1093 get_buffering_stats (GstQueue2 * queue, gint percent, GstBufferingMode * mode,
1094     gint * avg_in, gint * avg_out, gint64 * buffering_left)
1095 {
1096   if (mode) {
1097     if (!QUEUE_IS_USING_QUEUE (queue)) {
1098       if (QUEUE_IS_USING_RING_BUFFER (queue))
1099         *mode = GST_BUFFERING_TIMESHIFT;
1100       else
1101         *mode = GST_BUFFERING_DOWNLOAD;
1102     } else {
1103       *mode = GST_BUFFERING_STREAM;
1104     }
1105   }
1106 
1107   if (avg_in)
1108     *avg_in = queue->byte_in_rate;
1109   if (avg_out)
1110     *avg_out = queue->byte_out_rate;
1111 
1112   if (buffering_left) {
1113     *buffering_left = (percent == 100 ? 0 : -1);
1114 
1115     if (queue->use_rate_estimate) {
1116       guint64 max, cur;
1117 
1118       max = queue->max_level.rate_time;
1119       cur = queue->cur_level.rate_time;
1120 
1121       if (percent != 100 && max > cur)
1122         *buffering_left = (max - cur) / 1000000;
1123     }
1124   }
1125 }
1126 
1127 /* Called with the lock taken */
1128 static GstMessage *
gst_queue2_get_buffering_message(GstQueue2 * queue,gint * percent)1129 gst_queue2_get_buffering_message (GstQueue2 * queue, gint * percent)
1130 {
1131   GstMessage *msg = NULL;
1132   if (queue->percent_changed) {
1133     /* Don't change the buffering level if the sinkpad is waiting for
1134      * space to become available.  This prevents the situation where,
1135      * upstream is pushing buffers larger than our limits so only 1 buffer
1136      * is ever in the queue at a time.
1137      * Changing the level causes a buffering message to be posted saying that
1138      * we are buffering which the application may pause to wait for another
1139      * 100% buffering message which would be posted very soon after the
1140      * waiting sink thread adds it's buffer to the queue */
1141     /* FIXME: This situation above can still occur later if
1142      * the sink pad is waiting to push a serialized event into the queue and
1143      * the queue becomes empty for a short period of time. */
1144     if (!queue->waiting_del
1145         && queue->last_posted_buffering_percent != queue->buffering_percent) {
1146       *percent = queue->buffering_percent;
1147 
1148       GST_DEBUG_OBJECT (queue, "Going to post buffering: %d%%", *percent);
1149       msg = gst_message_new_buffering (GST_OBJECT_CAST (queue), *percent);
1150 
1151       gst_message_set_buffering_stats (msg, queue->mode, queue->avg_in,
1152           queue->avg_out, queue->buffering_left);
1153     }
1154   }
1155 
1156   return msg;
1157 }
1158 
1159 static void
gst_queue2_post_buffering(GstQueue2 * queue)1160 gst_queue2_post_buffering (GstQueue2 * queue)
1161 {
1162   GstMessage *msg = NULL;
1163   gint percent = -1;
1164 
1165   g_mutex_lock (&queue->buffering_post_lock);
1166   GST_QUEUE2_MUTEX_LOCK (queue);
1167   msg = gst_queue2_get_buffering_message (queue, &percent);
1168   GST_QUEUE2_MUTEX_UNLOCK (queue);
1169 
1170   if (msg != NULL) {
1171     if (gst_element_post_message (GST_ELEMENT_CAST (queue), msg)) {
1172       GST_QUEUE2_MUTEX_LOCK (queue);
1173       /* Set these states only if posting the message succeeded. Otherwise,
1174        * this post attempt failed, and the next one won't be done, because
1175        * gst_queue2_get_buffering_message() checks these states and decides
1176        * based on their values that it won't produce a message. */
1177       queue->last_posted_buffering_percent = percent;
1178       if (percent == queue->buffering_percent)
1179         queue->percent_changed = FALSE;
1180       GST_QUEUE2_MUTEX_UNLOCK (queue);
1181       GST_DEBUG_OBJECT (queue, "successfully posted %d%% buffering message",
1182           percent);
1183     } else
1184       GST_DEBUG_OBJECT (queue, "could not post buffering message");
1185   }
1186 
1187   g_mutex_unlock (&queue->buffering_post_lock);
1188 }
1189 
1190 static void
update_buffering(GstQueue2 * queue)1191 update_buffering (GstQueue2 * queue)
1192 {
1193   gint buffering_level, percent;
1194 
1195   /* Ensure the variables used to calculate buffering state are up-to-date. */
1196   if (queue->current)
1197     update_cur_level (queue, queue->current);
1198   update_in_rates (queue, FALSE);
1199 
1200   if (!get_buffering_level (queue, NULL, &buffering_level))
1201     return;
1202 
1203   percent = convert_to_buffering_percent (queue, buffering_level);
1204 
1205   if (queue->is_buffering) {
1206     /* if we were buffering see if we reached the high watermark */
1207     if (percent >= 100)
1208       queue->is_buffering = FALSE;
1209 
1210     SET_PERCENT (queue, percent);
1211   } else {
1212     /* we were not buffering, check if we need to start buffering if we drop
1213      * below the low threshold */
1214     if (buffering_level < queue->low_watermark) {
1215       queue->is_buffering = TRUE;
1216       SET_PERCENT (queue, percent);
1217     }
1218   }
1219 }
1220 
1221 static void
reset_rate_timer(GstQueue2 * queue)1222 reset_rate_timer (GstQueue2 * queue)
1223 {
1224   queue->bytes_in = 0;
1225   queue->bytes_out = 0;
1226   queue->byte_in_rate = 0.0;
1227   queue->byte_in_period = 0;
1228   queue->byte_out_rate = 0.0;
1229   queue->last_update_in_rates_elapsed = 0.0;
1230   queue->last_in_elapsed = 0.0;
1231   queue->last_out_elapsed = 0.0;
1232   queue->in_timer_started = FALSE;
1233   queue->out_timer_started = FALSE;
1234 }
1235 
/* the interval in seconds to recalculate the rate */
#define RATE_INTERVAL    0.2
/* Tuning for rate estimation. We use a large window for the input rate because
 * it should be stable when connected to a network. The output rate is less
 * stable (the elements preroll, queues behind a demuxer fill, ...) and should
 * therefore adapt more quickly.
 * However, initial input rate may be subject to a burst, and should therefore
 * initially also adapt more quickly to changes, and only later on give higher
 * weight to previous values.
 *
 * AVG_IN is the average of avg and val weighted by w1 and w2; AVG_OUT gives
 * the previous average three times the weight of the new value. Both
 * expansions are fully parenthesized so they can be used safely as operands
 * of a larger expression. */
#define AVG_IN(avg,val,w1,w2)  (((avg) * (w1) + (val) * (w2)) / ((w1) + (w2)))
#define AVG_OUT(avg,val) (((avg) * 3.0 + (val)) / 4.0)
1247 
1248 static void
update_in_rates(GstQueue2 * queue,gboolean force)1249 update_in_rates (GstQueue2 * queue, gboolean force)
1250 {
1251   gdouble elapsed, period;
1252   gdouble byte_in_rate;
1253 
1254   if (!queue->in_timer_started) {
1255     queue->in_timer_started = TRUE;
1256     g_timer_start (queue->in_timer);
1257     return;
1258   }
1259 
1260   queue->last_update_in_rates_elapsed = elapsed =
1261       g_timer_elapsed (queue->in_timer, NULL);
1262 
1263   /* recalc after each interval. */
1264   if (force || queue->last_in_elapsed + RATE_INTERVAL < elapsed) {
1265     period = elapsed - queue->last_in_elapsed;
1266 
1267     GST_DEBUG_OBJECT (queue,
1268         "rates: period %f, in %" G_GUINT64_FORMAT ", global period %f",
1269         period, queue->bytes_in, queue->byte_in_period);
1270 
1271     byte_in_rate = queue->bytes_in / period;
1272 
1273     if (queue->byte_in_rate == 0.0)
1274       queue->byte_in_rate = byte_in_rate;
1275     else
1276       queue->byte_in_rate = AVG_IN (queue->byte_in_rate, byte_in_rate,
1277           (double) queue->byte_in_period, period);
1278 
1279     /* another data point, cap at 16 for long time running average */
1280     if (queue->byte_in_period < 16 * RATE_INTERVAL)
1281       queue->byte_in_period += period;
1282 
1283     /* reset the values to calculate rate over the next interval */
1284     queue->last_in_elapsed = elapsed;
1285     queue->bytes_in = 0;
1286   }
1287 
1288   if (queue->use_bitrate_query && queue->downstream_bitrate > 0) {
1289     queue->cur_level.rate_time =
1290         gst_util_uint64_scale (8 * queue->cur_level.bytes, GST_SECOND,
1291         queue->downstream_bitrate);
1292     GST_LOG_OBJECT (queue,
1293         "got bitrate %u with byte level %u resulting in time %"
1294         GST_TIME_FORMAT, queue->downstream_bitrate, queue->cur_level.bytes,
1295         GST_TIME_ARGS (queue->cur_level.rate_time));
1296   } else if (queue->byte_in_rate > 0.0) {
1297     queue->cur_level.rate_time =
1298         queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
1299   }
1300   GST_DEBUG_OBJECT (queue, "rates: in %f, time %" GST_TIME_FORMAT,
1301       queue->byte_in_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
1302 }
1303 
1304 static void
update_out_rates(GstQueue2 * queue)1305 update_out_rates (GstQueue2 * queue)
1306 {
1307   gdouble elapsed, period;
1308   gdouble byte_out_rate;
1309 
1310   if (!queue->out_timer_started) {
1311     queue->out_timer_started = TRUE;
1312     g_timer_start (queue->out_timer);
1313     return;
1314   }
1315 
1316   elapsed = g_timer_elapsed (queue->out_timer, NULL);
1317 
1318   /* recalc after each interval. */
1319   if (queue->last_out_elapsed + RATE_INTERVAL < elapsed) {
1320     period = elapsed - queue->last_out_elapsed;
1321 
1322     GST_DEBUG_OBJECT (queue,
1323         "rates: period %f, out %" G_GUINT64_FORMAT, period, queue->bytes_out);
1324 
1325     byte_out_rate = queue->bytes_out / period;
1326 
1327     if (queue->byte_out_rate == 0.0)
1328       queue->byte_out_rate = byte_out_rate;
1329     else
1330       queue->byte_out_rate = AVG_OUT (queue->byte_out_rate, byte_out_rate);
1331 
1332     /* reset the values to calculate rate over the next interval */
1333     queue->last_out_elapsed = elapsed;
1334     queue->bytes_out = 0;
1335   }
1336   if (queue->byte_in_rate > 0.0) {
1337     queue->cur_level.rate_time =
1338         queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
1339   }
1340   GST_DEBUG_OBJECT (queue, "rates: out %f, time %" GST_TIME_FORMAT,
1341       queue->byte_out_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
1342 }
1343 
1344 static void
update_cur_pos(GstQueue2 * queue,GstQueue2Range * range,guint64 pos)1345 update_cur_pos (GstQueue2 * queue, GstQueue2Range * range, guint64 pos)
1346 {
1347   guint64 reading_pos, max_reading_pos;
1348 
1349   reading_pos = pos;
1350   max_reading_pos = range->max_reading_pos;
1351 
1352   max_reading_pos = MAX (max_reading_pos, reading_pos);
1353 
1354   GST_DEBUG_OBJECT (queue,
1355       "updating max_reading_pos from %" G_GUINT64_FORMAT " to %"
1356       G_GUINT64_FORMAT, range->max_reading_pos, max_reading_pos);
1357   range->max_reading_pos = max_reading_pos;
1358 
1359   update_cur_level (queue, range);
1360 }
1361 
1362 static gboolean
perform_seek_to_offset(GstQueue2 * queue,guint64 offset)1363 perform_seek_to_offset (GstQueue2 * queue, guint64 offset)
1364 {
1365   GstEvent *event;
1366   gboolean res;
1367 
1368   /* until we receive the FLUSH_STOP from this seek, we skip data */
1369   queue->seeking = TRUE;
1370   GST_QUEUE2_MUTEX_UNLOCK (queue);
1371 
1372   debug_ranges (queue);
1373 
1374   GST_DEBUG_OBJECT (queue, "Seeking to %" G_GUINT64_FORMAT, offset);
1375 
1376   event =
1377       gst_event_new_seek (1.0, GST_FORMAT_BYTES,
1378       GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, GST_SEEK_TYPE_SET, offset,
1379       GST_SEEK_TYPE_NONE, -1);
1380 
1381   res = gst_pad_push_event (queue->sinkpad, event);
1382   GST_QUEUE2_MUTEX_LOCK (queue);
1383 
1384   if (res) {
1385     /* Between us sending the seek event and re-acquiring the lock, the source
1386      * thread might already have pushed data and moved along the range's
1387      * writing_pos beyond the seek offset. In that case we don't want to set
1388      * the writing position back to the requested seek position, as it would
1389      * cause data to be written to the wrong offset in the file or ring buffer.
1390      * We still do the add_range call to switch the current range to the
1391      * requested range, or create one if one doesn't exist yet. */
1392     queue->current = add_range (queue, offset, FALSE);
1393   }
1394 
1395   return res;
1396 }
1397 
1398 /* get the threshold for when we decide to seek rather than wait */
1399 static guint64
get_seek_threshold(GstQueue2 * queue)1400 get_seek_threshold (GstQueue2 * queue)
1401 {
1402   guint64 threshold;
1403 
1404   /* FIXME, find a good threshold based on the incoming rate. */
1405   threshold = 1024 * 512;
1406 
1407   if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1408     threshold = MIN (threshold,
1409         QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes);
1410   }
1411   return threshold;
1412 }
1413 
1414 /* see if there is enough data in the file to read a full buffer */
1415 static gboolean
gst_queue2_have_data(GstQueue2 * queue,guint64 offset,guint length)1416 gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
1417 {
1418   GstQueue2Range *range;
1419 
1420   GST_DEBUG_OBJECT (queue, "looking for offset %" G_GUINT64_FORMAT ", len %u",
1421       offset, length);
1422 
1423   if ((range = find_range (queue, offset))) {
1424     if (queue->current != range) {
1425       GST_DEBUG_OBJECT (queue, "switching ranges, do seek to range position");
1426       perform_seek_to_offset (queue, range->writing_pos);
1427     }
1428 
1429     GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %" G_GUINT64_FORMAT ")",
1430         queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
1431 
1432     /* we have a range for offset */
1433     GST_DEBUG_OBJECT (queue,
1434         "we have a range %p, offset %" G_GUINT64_FORMAT ", writing_pos %"
1435         G_GUINT64_FORMAT, range, range->offset, range->writing_pos);
1436 
1437     if (!QUEUE_IS_USING_RING_BUFFER (queue) && queue->is_eos)
1438       return TRUE;
1439 
1440     if (offset + length <= range->writing_pos)
1441       return TRUE;
1442     else
1443       GST_DEBUG_OBJECT (queue,
1444           "Need more data (%" G_GUINT64_FORMAT " bytes more)",
1445           (offset + length) - range->writing_pos);
1446 
1447   } else {
1448     GST_INFO_OBJECT (queue, "not found in any range off %" G_GUINT64_FORMAT
1449         " len %u", offset, length);
1450     /* we don't have the range, see how far away we are */
1451     if (!queue->is_eos && queue->current) {
1452       guint64 threshold = get_seek_threshold (queue);
1453 
1454       if (offset >= queue->current->offset && offset <=
1455           queue->current->writing_pos + threshold) {
1456         GST_INFO_OBJECT (queue,
1457             "requested data is within range, wait for data");
1458         return FALSE;
1459       }
1460     }
1461 
1462     /* too far away, do a seek */
1463     perform_seek_to_offset (queue, offset);
1464   }
1465 
1466   return FALSE;
1467 }
1468 
/* FSEEK_FILE(file, offset): position @file at the absolute byte @offset.
 * Evaluates to non-zero when the seek FAILED. The arguments are
 * parenthesized so that an expression passed as @offset is cast as a
 * whole. */
#ifdef HAVE_FSEEKO
#define FSEEK_FILE(file,offset)  (fseeko ((file), (off_t) (offset), SEEK_SET) != 0)
#elif defined (G_OS_UNIX) || defined (G_OS_WIN32)
#define FSEEK_FILE(file,offset)  (lseek (fileno (file), (off_t) (offset), SEEK_SET) == (off_t) -1)
#else
#define FSEEK_FILE(file,offset)  (fseek ((file), (offset), SEEK_SET) != 0)
#endif
1476 
1477 static GstFlowReturn
gst_queue2_read_data_at_offset(GstQueue2 * queue,guint64 offset,guint length,guint8 * dst,gint64 * read_return)1478 gst_queue2_read_data_at_offset (GstQueue2 * queue, guint64 offset, guint length,
1479     guint8 * dst, gint64 * read_return)
1480 {
1481   guint8 *ring_buffer;
1482   size_t res;
1483 
1484   ring_buffer = queue->ring_buffer;
1485 
1486   if (QUEUE_IS_USING_TEMP_FILE (queue) && FSEEK_FILE (queue->temp_file, offset))
1487     goto seek_failed;
1488 
1489   /* this should not block */
1490   GST_LOG_OBJECT (queue, "Reading %d bytes from offset %" G_GUINT64_FORMAT,
1491       length, offset);
1492   if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1493     res = fread (dst, 1, length, queue->temp_file);
1494   } else {
1495     memcpy (dst, ring_buffer + offset, length);
1496     res = length;
1497   }
1498 
1499   GST_LOG_OBJECT (queue, "read %" G_GSIZE_FORMAT " bytes", res);
1500 
1501   if (G_UNLIKELY (res < length)) {
1502     if (!QUEUE_IS_USING_TEMP_FILE (queue))
1503       goto could_not_read;
1504     /* check for errors or EOF */
1505     if (ferror (queue->temp_file))
1506       goto could_not_read;
1507     if (feof (queue->temp_file) && length > 0)
1508       goto eos;
1509   }
1510 
1511   *read_return = res;
1512 
1513   return GST_FLOW_OK;
1514 
1515 seek_failed:
1516   {
1517     GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
1518     return GST_FLOW_ERROR;
1519   }
1520 could_not_read:
1521   {
1522     GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
1523     return GST_FLOW_ERROR;
1524   }
1525 eos:
1526   {
1527     GST_DEBUG ("non-regular file hits EOS");
1528     return GST_FLOW_EOS;
1529   }
1530 }
1531 
1532 static GstFlowReturn
gst_queue2_create_read(GstQueue2 * queue,guint64 offset,guint length,GstBuffer ** buffer)1533 gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
1534     GstBuffer ** buffer)
1535 {
1536   GstBuffer *buf;
1537   GstMapInfo info;
1538   guint8 *data;
1539   guint64 file_offset;
1540   guint block_length, remaining, read_length;
1541   guint64 rb_size;
1542   guint64 max_size;
1543   guint64 rpos;
1544   GstFlowReturn ret = GST_FLOW_OK;
1545 
1546   /* allocate the output buffer of the requested size */
1547   if (*buffer == NULL)
1548     buf = gst_buffer_new_allocate (NULL, length, NULL);
1549   else
1550     buf = *buffer;
1551 
1552   if (!gst_buffer_map (buf, &info, GST_MAP_WRITE))
1553     goto buffer_write_fail;
1554   data = info.data;
1555 
1556   GST_DEBUG_OBJECT (queue, "Reading %u bytes from %" G_GUINT64_FORMAT, length,
1557       offset);
1558 
1559   rpos = offset;
1560   rb_size = queue->ring_buffer_max_size;
1561   max_size = QUEUE_MAX_BYTES (queue);
1562 
1563   remaining = length;
1564   while (remaining > 0) {
1565     /* configure how much/whether to read */
1566     if (!gst_queue2_have_data (queue, rpos, remaining)) {
1567       read_length = 0;
1568 
1569       if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1570         guint64 level;
1571 
1572         /* calculate how far away the offset is */
1573         if (queue->current->writing_pos > rpos)
1574           level = queue->current->writing_pos - rpos;
1575         else
1576           level = 0;
1577 
1578         GST_DEBUG_OBJECT (queue,
1579             "reading %" G_GUINT64_FORMAT ", writing %" G_GUINT64_FORMAT
1580             ", level %" G_GUINT64_FORMAT ", max %" G_GUINT64_FORMAT,
1581             rpos, queue->current->writing_pos, level, max_size);
1582 
1583         if (level >= max_size) {
1584           /* we don't have the data but if we have a ring buffer that is full, we
1585            * need to read */
1586           GST_DEBUG_OBJECT (queue,
1587               "ring buffer full, reading QUEUE_MAX_BYTES %"
1588               G_GUINT64_FORMAT " bytes", max_size);
1589           read_length = max_size;
1590         } else if (queue->is_eos) {
1591           /* won't get any more data so read any data we have */
1592           if (level) {
1593             GST_DEBUG_OBJECT (queue,
1594                 "EOS hit but read %" G_GUINT64_FORMAT " bytes that we have",
1595                 level);
1596 #ifdef OHOS_OPT_STABLE
1597             // ohos.opt.stable.0002
1598             // Fix the heap overflow issue. After that gst_queue2_have_data
1599             // is executed, the queue may be filled with enough data to fill the output buffer,
1600             // or even much larger than the output buffer, causing heap overflow problems in
1601             // the subsequent copy process.
1602             //
1603             // The root cause is that the lock is released before the seek operation and then the
1604             // lock is reacquired after the seek operation, which causes the judgment before the lock
1605             // release operation to no longer be valid: there is not enough data in the queue to
1606             // fill the output buffer.
1607             //
1608             // Therefore, the length of the data to be copied is protected to avoid heap overflow
1609             // problems.
1610 
1611             read_length = MIN (level, remaining);
1612             remaining = read_length;
1613             length = read_length;
1614 #else
1615             read_length = level;
1616             remaining = level;
1617             length = level;
1618 #endif
1619           } else
1620             goto hit_eos;
1621         }
1622       }
1623 
1624       if (read_length == 0) {
1625         if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1626           GST_DEBUG_OBJECT (queue,
1627               "update current position [%" G_GUINT64_FORMAT "-%"
1628               G_GUINT64_FORMAT "]", rpos, queue->current->max_reading_pos);
1629           update_cur_pos (queue, queue->current, rpos);
1630           GST_QUEUE2_SIGNAL_DEL (queue);
1631         }
1632 
1633         if (queue->use_buffering)
1634           update_buffering (queue);
1635 
1636         GST_DEBUG_OBJECT (queue, "waiting for add");
1637         GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
1638         continue;
1639       }
1640     } else {
1641       /* we have the requested data so read it */
1642       read_length = remaining;
1643     }
1644 
1645     /* set range reading_pos to actual reading position for this read */
1646     queue->current->reading_pos = rpos;
1647 
1648     /* configure how much and from where to read */
1649     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1650       file_offset =
1651           (queue->current->rb_offset + (rpos -
1652               queue->current->offset)) % rb_size;
1653       if (file_offset + read_length > rb_size) {
1654         block_length = rb_size - file_offset;
1655       } else {
1656         block_length = read_length;
1657       }
1658     } else {
1659       file_offset = rpos;
1660       block_length = read_length;
1661     }
1662 
1663     /* while we still have data to read, we loop */
1664     while (read_length > 0) {
1665       gint64 read_return;
1666 
1667       ret =
1668           gst_queue2_read_data_at_offset (queue, file_offset, block_length,
1669           data, &read_return);
1670       if (ret != GST_FLOW_OK)
1671         goto read_error;
1672 
1673       file_offset += read_return;
1674       if (QUEUE_IS_USING_RING_BUFFER (queue))
1675         file_offset %= rb_size;
1676 
1677       data += read_return;
1678       read_length -= read_return;
1679       block_length = read_length;
1680       remaining -= read_return;
1681 
1682       rpos = (queue->current->reading_pos += read_return);
1683       update_cur_pos (queue, queue->current, queue->current->reading_pos);
1684     }
1685     GST_QUEUE2_SIGNAL_DEL (queue);
1686     GST_DEBUG_OBJECT (queue, "%u bytes left to read", remaining);
1687   }
1688 
1689   gst_buffer_unmap (buf, &info);
1690   gst_buffer_resize (buf, 0, length);
1691 
1692   GST_BUFFER_OFFSET (buf) = offset;
1693   GST_BUFFER_OFFSET_END (buf) = offset + length;
1694 
1695   *buffer = buf;
1696 
1697   return ret;
1698 
1699   /* ERRORS */
1700 hit_eos:
1701   {
1702     GST_DEBUG_OBJECT (queue, "EOS hit and we don't have any requested data");
1703     gst_buffer_unmap (buf, &info);
1704     if (*buffer == NULL)
1705       gst_buffer_unref (buf);
1706     return GST_FLOW_EOS;
1707   }
1708 out_flushing:
1709   {
1710     GST_DEBUG_OBJECT (queue, "we are flushing");
1711     gst_buffer_unmap (buf, &info);
1712     if (*buffer == NULL)
1713       gst_buffer_unref (buf);
1714     return GST_FLOW_FLUSHING;
1715   }
1716 read_error:
1717   {
1718     GST_DEBUG_OBJECT (queue, "we have a read error");
1719     gst_buffer_unmap (buf, &info);
1720     if (*buffer == NULL)
1721       gst_buffer_unref (buf);
1722     return ret;
1723   }
1724 buffer_write_fail:
1725   {
1726     GST_ELEMENT_ERROR (queue, RESOURCE, WRITE, (NULL),
1727         ("Can't write to buffer"));
1728     if (*buffer == NULL)
1729       gst_buffer_unref (buf);
1730     return GST_FLOW_ERROR;
1731   }
1732 }
1733 
1734 /* should be called with QUEUE_LOCK */
1735 static GstMiniObject *
gst_queue2_read_item_from_file(GstQueue2 * queue)1736 gst_queue2_read_item_from_file (GstQueue2 * queue)
1737 {
1738   GstMiniObject *item;
1739 
1740   if (queue->stream_start_event != NULL) {
1741     item = GST_MINI_OBJECT_CAST (queue->stream_start_event);
1742     queue->stream_start_event = NULL;
1743   } else if (queue->starting_segment != NULL) {
1744     item = GST_MINI_OBJECT_CAST (queue->starting_segment);
1745     queue->starting_segment = NULL;
1746   } else {
1747     GstFlowReturn ret;
1748     GstBuffer *buffer = NULL;
1749     guint64 reading_pos;
1750 
1751     reading_pos = queue->current->reading_pos;
1752 
1753     ret =
1754         gst_queue2_create_read (queue, reading_pos, DEFAULT_BUFFER_SIZE,
1755         &buffer);
1756 
1757     switch (ret) {
1758       case GST_FLOW_OK:
1759         item = GST_MINI_OBJECT_CAST (buffer);
1760         break;
1761       case GST_FLOW_EOS:
1762         item = GST_MINI_OBJECT_CAST (gst_event_new_eos ());
1763         break;
1764       default:
1765         item = NULL;
1766         break;
1767     }
1768   }
1769   return item;
1770 }
1771 
1772 /* must be called with MUTEX_LOCK. Will briefly release the lock when notifying
1773  * the temp filename. */
1774 static gboolean
gst_queue2_open_temp_location_file(GstQueue2 * queue)1775 gst_queue2_open_temp_location_file (GstQueue2 * queue)
1776 {
1777   gint fd = -1;
1778   gchar *name = NULL;
1779 
1780   if (queue->temp_file)
1781     goto already_opened;
1782 
1783   GST_DEBUG_OBJECT (queue, "opening temp file %s", queue->temp_template);
1784 
1785   /* If temp_template was set, allocate a filename and open that file */
1786 
1787   /* nothing to do */
1788   if (queue->temp_template == NULL)
1789     goto no_directory;
1790 
1791   /* make copy of the template, we don't want to change this */
1792   name = g_strdup (queue->temp_template);
1793 
1794 #ifdef __BIONIC__
1795   fd = g_mkstemp_full (name, O_RDWR | O_LARGEFILE, S_IRUSR | S_IWUSR);
1796 #else
1797   fd = g_mkstemp (name);
1798 #endif
1799 
1800   if (fd == -1)
1801     goto mkstemp_failed;
1802 
1803   /* open the file for update/writing */
1804   queue->temp_file = fdopen (fd, "wb+");
1805   /* error creating file */
1806   if (queue->temp_file == NULL)
1807     goto open_failed;
1808 
1809   g_free (queue->temp_location);
1810   queue->temp_location = name;
1811 
1812   GST_QUEUE2_MUTEX_UNLOCK (queue);
1813 
1814   /* we can't emit the notify with the lock */
1815   g_object_notify_by_pspec (G_OBJECT (queue), obj_props[PROP_TEMP_LOCATION]);
1816 
1817   GST_QUEUE2_MUTEX_LOCK (queue);
1818 
1819   GST_DEBUG_OBJECT (queue, "opened temp file %s", queue->temp_template);
1820 
1821   return TRUE;
1822 
1823   /* ERRORS */
1824 already_opened:
1825   {
1826     GST_DEBUG_OBJECT (queue, "temp file was already open");
1827     return TRUE;
1828   }
1829 no_directory:
1830   {
1831     GST_ELEMENT_ERROR (queue, RESOURCE, NOT_FOUND,
1832         (_("No Temp directory specified.")), (NULL));
1833     return FALSE;
1834   }
1835 mkstemp_failed:
1836   {
1837     GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ,
1838         (_("Could not create temp file \"%s\"."), queue->temp_template),
1839         GST_ERROR_SYSTEM);
1840     g_free (name);
1841     return FALSE;
1842   }
1843 open_failed:
1844   {
1845     GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ,
1846         (_("Could not open file \"%s\" for reading."), name), GST_ERROR_SYSTEM);
1847     g_free (name);
1848     if (fd != -1)
1849       close (fd);
1850     return FALSE;
1851   }
1852 }
1853 
1854 static void
gst_queue2_close_temp_location_file(GstQueue2 * queue)1855 gst_queue2_close_temp_location_file (GstQueue2 * queue)
1856 {
1857   /* nothing to do */
1858   if (queue->temp_file == NULL)
1859     return;
1860 
1861   GST_DEBUG_OBJECT (queue, "closing temp file");
1862 
1863   fflush (queue->temp_file);
1864   fclose (queue->temp_file);
1865 
1866   if (queue->temp_remove) {
1867     if (remove (queue->temp_location) < 0) {
1868       GST_WARNING_OBJECT (queue, "Failed to remove temporary file %s: %s",
1869           queue->temp_location, g_strerror (errno));
1870     }
1871   }
1872 
1873   queue->temp_file = NULL;
1874   clean_ranges (queue);
1875 }
1876 
1877 static void
gst_queue2_flush_temp_file(GstQueue2 * queue)1878 gst_queue2_flush_temp_file (GstQueue2 * queue)
1879 {
1880   if (queue->temp_file == NULL)
1881     return;
1882 
1883   GST_DEBUG_OBJECT (queue, "flushing temp file");
1884 
1885   queue->temp_file = g_freopen (queue->temp_location, "wb+", queue->temp_file);
1886 }
1887 
1888 static void
gst_queue2_locked_flush(GstQueue2 * queue,gboolean full,gboolean clear_temp)1889 gst_queue2_locked_flush (GstQueue2 * queue, gboolean full, gboolean clear_temp)
1890 {
1891   if (!QUEUE_IS_USING_QUEUE (queue)) {
1892     if (QUEUE_IS_USING_TEMP_FILE (queue) && clear_temp)
1893       gst_queue2_flush_temp_file (queue);
1894     init_ranges (queue);
1895   } else {
1896     GstQueue2Item *qitem;
1897 
1898     while ((qitem = gst_queue_array_pop_head_struct (queue->queue))) {
1899       if (!full && qitem->type == GST_QUEUE2_ITEM_TYPE_EVENT
1900           && GST_EVENT_IS_STICKY (qitem->item)
1901           && GST_EVENT_TYPE (qitem->item) != GST_EVENT_SEGMENT
1902           && GST_EVENT_TYPE (qitem->item) != GST_EVENT_EOS) {
1903         gst_pad_store_sticky_event (queue->srcpad,
1904             GST_EVENT_CAST (qitem->item));
1905       }
1906 
1907       /* Then lose another reference because we are supposed to destroy that
1908          data when flushing */
1909       if (qitem->type != GST_QUEUE2_ITEM_TYPE_QUERY)
1910         gst_mini_object_unref (qitem->item);
1911     }
1912   }
1913   queue->last_query = FALSE;
1914   g_cond_signal (&queue->query_handled);
1915   GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
1916   gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
1917   gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
1918   queue->sinktime = queue->srctime = GST_CLOCK_TIME_NONE;
1919   queue->sink_tainted = queue->src_tainted = TRUE;
1920   if (queue->starting_segment != NULL)
1921     gst_event_unref (queue->starting_segment);
1922   queue->starting_segment = NULL;
1923   queue->segment_event_received = FALSE;
1924   gst_event_replace (&queue->stream_start_event, NULL);
1925 
1926   /* we deleted a lot of something */
1927   GST_QUEUE2_SIGNAL_DEL (queue);
1928 }
1929 
1930 static gboolean
gst_queue2_wait_free_space(GstQueue2 * queue)1931 gst_queue2_wait_free_space (GstQueue2 * queue)
1932 {
1933   /* We make space available if we're "full" according to whatever
1934    * the user defined as "full". */
1935   if (gst_queue2_is_filled (queue)) {
1936     gboolean started;
1937 
1938     /* pause the timer while we wait. The fact that we are waiting does not mean
1939      * the byterate on the input pad is lower */
1940     if ((started = queue->in_timer_started))
1941       g_timer_stop (queue->in_timer);
1942 
1943     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
1944         "queue is full, waiting for free space");
1945     do {
1946       /* Wait for space to be available, we could be unlocked because of a flush. */
1947       GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
1948     }
1949     while (gst_queue2_is_filled (queue));
1950 
1951     /* and continue if we were running before */
1952     if (started)
1953       g_timer_continue (queue->in_timer);
1954   }
1955   return TRUE;
1956 
1957   /* ERRORS */
1958 out_flushing:
1959   {
1960     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "queue is flushing");
1961     return FALSE;
1962   }
1963 }
1964 
1965 static gboolean
gst_queue2_create_write(GstQueue2 * queue,GstBuffer * buffer)1966 gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer)
1967 {
1968   GstMapInfo info;
1969   guint8 *data, *ring_buffer;
1970   guint size, rb_size;
1971   guint64 writing_pos, new_writing_pos;
1972   GstQueue2Range *range, *prev, *next;
1973   gboolean do_seek = FALSE;
1974 
1975   if (QUEUE_IS_USING_RING_BUFFER (queue))
1976     writing_pos = queue->current->rb_writing_pos;
1977   else
1978     writing_pos = queue->current->writing_pos;
1979   ring_buffer = queue->ring_buffer;
1980   rb_size = queue->ring_buffer_max_size;
1981 
1982   if (!gst_buffer_map (buffer, &info, GST_MAP_READ))
1983     goto buffer_read_error;
1984 
1985   size = info.size;
1986   data = info.data;
1987 
1988   GST_DEBUG_OBJECT (queue, "Writing %u bytes to %" G_GUINT64_FORMAT, size,
1989       writing_pos);
1990 
1991   /* sanity check */
1992   if (GST_BUFFER_OFFSET_IS_VALID (buffer) &&
1993       GST_BUFFER_OFFSET (buffer) != queue->current->writing_pos) {
1994     GST_WARNING_OBJECT (queue, "buffer offset does not match current writing "
1995         "position! %" G_GINT64_FORMAT " != %" G_GINT64_FORMAT,
1996         GST_BUFFER_OFFSET (buffer), queue->current->writing_pos);
1997   }
1998 
1999   while (size > 0) {
2000     guint to_write;
2001 
2002     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
2003       gint64 space;
2004 
2005       /* calculate the space in the ring buffer not used by data from
2006        * the current range */
2007       while (QUEUE_MAX_BYTES (queue) <= queue->cur_level.bytes) {
2008         /* wait until there is some free space */
2009         GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
2010       }
2011       /* get the amount of space we have */
2012       space = QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes;
2013 
2014       /* calculate if we need to split or if we can write the entire
2015        * buffer now */
2016       to_write = MIN (size, space);
2017 
2018       /* the writing position in the ring buffer after writing (part
2019        * or all of) the buffer */
2020       new_writing_pos = (writing_pos + to_write) % rb_size;
2021 
2022       prev = NULL;
2023       range = queue->ranges;
2024 
2025       /* if we need to overwrite data in the ring buffer, we need to
2026        * update the ranges
2027        *
2028        * warning: this code is complicated and includes some
2029        * simplifications - pen, paper and diagrams for the cases
2030        * recommended! */
2031       while (range) {
2032         guint64 range_data_start, range_data_end;
2033         GstQueue2Range *range_to_destroy = NULL;
2034 
2035 #ifndef OHOS_OPT_COMPAT
2036 /* ohos.opt.compat.0009
2037  * Resolve the issue of replaying the page gray screen after the video playback is over */
2038         if (range == queue->current)
2039           goto next_range;
2040 #endif
2041         range_data_start = range->rb_offset;
2042         range_data_end = range->rb_writing_pos;
2043 
2044         /* handle the special case where the range has no data in it */
2045         if (range->writing_pos == range->offset) {
2046           if (range != queue->current) {
2047             GST_DEBUG_OBJECT (queue,
2048                 "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
2049                 G_GUINT64_FORMAT, range->offset, range->writing_pos);
2050             /* remove range */
2051             range_to_destroy = range;
2052             if (prev)
2053               prev->next = range->next;
2054           }
2055           goto next_range;
2056         }
2057 
2058         if (range_data_end > range_data_start) {
2059           if (writing_pos >= range_data_end && new_writing_pos >= writing_pos)
2060             goto next_range;
2061 
2062           if (new_writing_pos > range_data_start) {
2063             if (new_writing_pos >= range_data_end) {
2064               GST_DEBUG_OBJECT (queue,
2065                   "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
2066                   G_GUINT64_FORMAT, range->offset, range->writing_pos);
2067               /* remove range */
2068               range_to_destroy = range;
2069               if (prev)
2070                 prev->next = range->next;
2071             } else {
2072               GST_DEBUG_OBJECT (queue,
2073                   "advancing offsets from %" G_GUINT64_FORMAT " (%"
2074                   G_GUINT64_FORMAT ") to %" G_GUINT64_FORMAT " (%"
2075                   G_GUINT64_FORMAT ")", range->offset, range->rb_offset,
2076                   range->offset + new_writing_pos - range_data_start,
2077                   new_writing_pos);
2078               range->offset += (new_writing_pos - range_data_start);
2079               range->rb_offset = new_writing_pos;
2080             }
2081           }
2082         } else {
2083           guint64 new_wpos_virt = writing_pos + to_write;
2084 
2085           if (new_wpos_virt <= range_data_start)
2086             goto next_range;
2087 
2088           if (new_wpos_virt > rb_size && new_writing_pos >= range_data_end) {
2089             GST_DEBUG_OBJECT (queue,
2090                 "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
2091                 G_GUINT64_FORMAT, range->offset, range->writing_pos);
2092             /* remove range */
2093             range_to_destroy = range;
2094             if (prev)
2095               prev->next = range->next;
2096           } else {
2097             GST_DEBUG_OBJECT (queue,
2098                 "advancing offsets from %" G_GUINT64_FORMAT " (%"
2099                 G_GUINT64_FORMAT ") to %" G_GUINT64_FORMAT " (%"
2100                 G_GUINT64_FORMAT ")", range->offset, range->rb_offset,
2101                 range->offset + new_writing_pos - range_data_start,
2102                 new_writing_pos);
2103             range->offset += (new_wpos_virt - range_data_start);
2104             range->rb_offset = new_writing_pos;
2105           }
2106         }
2107 
2108       next_range:
2109         if (!range_to_destroy)
2110           prev = range;
2111 
2112         range = range->next;
2113         if (range_to_destroy) {
2114           if (range_to_destroy == queue->ranges)
2115             queue->ranges = range;
2116           g_slice_free (GstQueue2Range, range_to_destroy);
2117           range_to_destroy = NULL;
2118         }
2119       }
2120     } else {
2121       to_write = size;
2122       new_writing_pos = writing_pos + to_write;
2123     }
2124 
2125     if (QUEUE_IS_USING_TEMP_FILE (queue)
2126         && FSEEK_FILE (queue->temp_file, writing_pos))
2127       goto seek_failed;
2128 
2129     if (new_writing_pos > writing_pos) {
2130       GST_INFO_OBJECT (queue,
2131           "writing %u bytes to range [%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT
2132           "] (rb wpos %" G_GUINT64_FORMAT ")", to_write, queue->current->offset,
2133           queue->current->writing_pos, queue->current->rb_writing_pos);
2134       /* either not using ring buffer or no wrapping, just write */
2135       if (QUEUE_IS_USING_TEMP_FILE (queue)) {
2136         if (fwrite (data, to_write, 1, queue->temp_file) != 1)
2137           goto handle_error;
2138       } else {
2139         memcpy (ring_buffer + writing_pos, data, to_write);
2140       }
2141 
2142       if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
2143         /* try to merge with next range */
2144         while ((next = queue->current->next)) {
2145           GST_INFO_OBJECT (queue,
2146               "checking merge with next range %" G_GUINT64_FORMAT " < %"
2147               G_GUINT64_FORMAT, new_writing_pos, next->offset);
2148           if (new_writing_pos < next->offset)
2149             break;
2150 
2151           GST_DEBUG_OBJECT (queue, "merging ranges %" G_GUINT64_FORMAT,
2152               next->writing_pos);
2153 
2154           /* remove the group */
2155           queue->current->next = next->next;
2156 
2157           /* We use the threshold to decide if we want to do a seek or simply
2158            * read the data again. If there is not so much data in the range we
2159            * prefer to avoid to seek and read it again. */
2160           if (next->writing_pos > new_writing_pos + get_seek_threshold (queue)) {
2161             /* the new range had more data than the threshold, it's worth keeping
2162              * it and doing a seek. */
2163             new_writing_pos = next->writing_pos;
2164             do_seek = TRUE;
2165           }
2166           g_slice_free (GstQueue2Range, next);
2167         }
2168         goto update_and_signal;
2169       }
2170     } else {
2171       /* wrapping */
2172       guint block_one, block_two;
2173 
2174       block_one = rb_size - writing_pos;
2175       block_two = to_write - block_one;
2176 
2177       if (block_one > 0) {
2178         GST_INFO_OBJECT (queue, "writing %u bytes", block_one);
2179         /* write data to end of ring buffer */
2180         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
2181           if (fwrite (data, block_one, 1, queue->temp_file) != 1)
2182             goto handle_error;
2183         } else {
2184           memcpy (ring_buffer + writing_pos, data, block_one);
2185         }
2186       }
2187 
2188       if (QUEUE_IS_USING_TEMP_FILE (queue) && FSEEK_FILE (queue->temp_file, 0))
2189         goto seek_failed;
2190 
2191       if (block_two > 0) {
2192         GST_INFO_OBJECT (queue, "writing %u bytes", block_two);
2193         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
2194           if (fwrite (data + block_one, block_two, 1, queue->temp_file) != 1)
2195             goto handle_error;
2196         } else {
2197           memcpy (ring_buffer, data + block_one, block_two);
2198         }
2199       }
2200     }
2201 
2202   update_and_signal:
2203     /* update the writing positions */
2204     size -= to_write;
2205     GST_INFO_OBJECT (queue,
2206         "wrote %u bytes to %" G_GUINT64_FORMAT " (%u bytes remaining to write)",
2207         to_write, writing_pos, size);
2208 
2209     if (QUEUE_IS_USING_RING_BUFFER (queue)) {
2210       data += to_write;
2211       queue->current->writing_pos += to_write;
2212       queue->current->rb_writing_pos = writing_pos = new_writing_pos;
2213     } else {
2214       queue->current->writing_pos = writing_pos = new_writing_pos;
2215     }
2216     if (do_seek)
2217       perform_seek_to_offset (queue, new_writing_pos);
2218 
2219     update_cur_level (queue, queue->current);
2220 
2221     /* update the buffering status */
2222     if (queue->use_buffering) {
2223       GstMessage *msg;
2224       gint percent = -1;
2225       update_buffering (queue);
2226       msg = gst_queue2_get_buffering_message (queue, &percent);
2227       if (msg) {
2228         gboolean post_ok;
2229 
2230         GST_QUEUE2_MUTEX_UNLOCK (queue);
2231 
2232         g_mutex_lock (&queue->buffering_post_lock);
2233         post_ok = gst_element_post_message (GST_ELEMENT_CAST (queue), msg);
2234 
2235         GST_QUEUE2_MUTEX_LOCK (queue);
2236 
2237         if (post_ok) {
2238           /* Set these states only if posting the message succeeded. Otherwise,
2239            * this post attempt failed, and the next one won't be done, because
2240            * gst_queue2_get_buffering_message() checks these states and decides
2241            * based on their values that it won't produce a message. */
2242           queue->last_posted_buffering_percent = percent;
2243           if (percent == queue->buffering_percent)
2244             queue->percent_changed = FALSE;
2245           GST_DEBUG_OBJECT (queue, "successfully posted %d%% buffering message",
2246               percent);
2247         } else {
2248           GST_DEBUG_OBJECT (queue, "could not post buffering message");
2249         }
2250         g_mutex_unlock (&queue->buffering_post_lock);
2251       }
2252     }
2253 
2254     GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %" G_GUINT64_FORMAT ")",
2255         queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
2256 
2257     GST_QUEUE2_SIGNAL_ADD (queue);
2258   }
2259 
2260   gst_buffer_unmap (buffer, &info);
2261 
2262   return TRUE;
2263 
2264   /* ERRORS */
2265 out_flushing:
2266   {
2267     GST_DEBUG_OBJECT (queue, "we are flushing");
2268     gst_buffer_unmap (buffer, &info);
2269     /* FIXME - GST_FLOW_EOS ? */
2270     return FALSE;
2271   }
2272 seek_failed:
2273   {
2274     GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
2275     gst_buffer_unmap (buffer, &info);
2276     return FALSE;
2277   }
2278 handle_error:
2279   {
2280     switch (errno) {
2281       case ENOSPC:{
2282         GST_ELEMENT_ERROR (queue, RESOURCE, NO_SPACE_LEFT, (NULL), (NULL));
2283         break;
2284       }
2285       default:{
2286         GST_ELEMENT_ERROR (queue, RESOURCE, WRITE,
2287             (_("Error while writing to download file.")),
2288             ("%s", g_strerror (errno)));
2289       }
2290     }
2291     gst_buffer_unmap (buffer, &info);
2292     return FALSE;
2293   }
2294 buffer_read_error:
2295   {
2296     GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL),
2297         ("Can't read from buffer"));
2298     return FALSE;
2299   }
2300 }
2301 
2302 static gboolean
buffer_list_create_write(GstBuffer ** buf,guint idx,gpointer q)2303 buffer_list_create_write (GstBuffer ** buf, guint idx, gpointer q)
2304 {
2305   GstQueue2 *queue = q;
2306 
2307   GST_TRACE_OBJECT (queue,
2308       "writing buffer %u of size %" G_GSIZE_FORMAT " bytes", idx,
2309       gst_buffer_get_size (*buf));
2310 
2311   if (!gst_queue2_create_write (queue, *buf)) {
2312     GST_INFO_OBJECT (queue, "create_write() returned FALSE, bailing out");
2313     return FALSE;
2314   }
2315   return TRUE;
2316 }
2317 
2318 /* enqueue an item an update the level stats */
2319 static void
gst_queue2_locked_enqueue(GstQueue2 * queue,gpointer item,GstQueue2ItemType item_type)2320 gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item,
2321     GstQueue2ItemType item_type)
2322 {
2323   if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
2324     GstBuffer *buffer;
2325     guint size;
2326 
2327     buffer = GST_BUFFER_CAST (item);
2328     size = gst_buffer_get_size (buffer);
2329 
2330     /* add buffer to the statistics */
2331     if (QUEUE_IS_USING_QUEUE (queue)) {
2332       queue->cur_level.buffers++;
2333       queue->cur_level.bytes += size;
2334     }
2335     queue->bytes_in += size;
2336 
2337     /* apply new buffer to segment stats */
2338     apply_buffer (queue, buffer, &queue->sink_segment, size, TRUE);
2339     /* update the byterate stats */
2340     update_in_rates (queue, FALSE);
2341 
2342     if (!QUEUE_IS_USING_QUEUE (queue)) {
2343       /* FIXME - check return value? */
2344       gst_queue2_create_write (queue, buffer);
2345     }
2346   } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
2347     GstBufferList *buffer_list;
2348     guint size;
2349 
2350     buffer_list = GST_BUFFER_LIST_CAST (item);
2351 
2352     size = gst_buffer_list_calculate_size (buffer_list);
2353     GST_LOG_OBJECT (queue, "total size of buffer list: %u bytes", size);
2354 
2355     /* add buffer to the statistics */
2356     if (QUEUE_IS_USING_QUEUE (queue)) {
2357       queue->cur_level.buffers += gst_buffer_list_length (buffer_list);
2358       queue->cur_level.bytes += size;
2359     }
2360     queue->bytes_in += size;
2361 
2362     /* apply new buffer to segment stats */
2363     apply_buffer_list (queue, buffer_list, &queue->sink_segment, TRUE);
2364 
2365     /* update the byterate stats */
2366     update_in_rates (queue, FALSE);
2367 
2368     if (!QUEUE_IS_USING_QUEUE (queue)) {
2369       gst_buffer_list_foreach (buffer_list, buffer_list_create_write, queue);
2370     }
2371   } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
2372     GstEvent *event;
2373 
2374     event = GST_EVENT_CAST (item);
2375 
2376     switch (GST_EVENT_TYPE (event)) {
2377       case GST_EVENT_EOS:
2378         /* Zero the thresholds, this makes sure the queue is completely
2379          * filled and we can read all data from the queue. */
2380         GST_DEBUG_OBJECT (queue, "we have EOS");
2381         queue->is_eos = TRUE;
2382         /* Force updating the input bitrate */
2383         update_in_rates (queue, TRUE);
2384         break;
2385       case GST_EVENT_SEGMENT:
2386         apply_segment (queue, event, &queue->sink_segment, TRUE);
2387         /* This is our first new segment, we hold it
2388          * as we can't save it on the temp file */
2389         if (!QUEUE_IS_USING_QUEUE (queue)) {
2390           if (queue->segment_event_received)
2391             goto unexpected_event;
2392 
2393           queue->segment_event_received = TRUE;
2394           if (queue->starting_segment != NULL)
2395             gst_event_unref (queue->starting_segment);
2396           queue->starting_segment = event;
2397           item = NULL;
2398         }
2399         /* a new segment allows us to accept more buffers if we got EOS
2400          * from downstream */
2401         queue->unexpected = FALSE;
2402         break;
2403       case GST_EVENT_GAP:
2404         apply_gap (queue, event, &queue->sink_segment, TRUE);
2405         break;
2406       case GST_EVENT_STREAM_START:
2407         if (!QUEUE_IS_USING_QUEUE (queue)) {
2408           gst_event_replace (&queue->stream_start_event, event);
2409           gst_event_unref (event);
2410           item = NULL;
2411         }
2412         break;
2413       case GST_EVENT_CAPS:{
2414         GstCaps *caps;
2415 
2416         gst_event_parse_caps (event, &caps);
2417         GST_INFO ("got caps: %" GST_PTR_FORMAT, caps);
2418 
2419         if (!QUEUE_IS_USING_QUEUE (queue)) {
2420           GST_LOG ("Dropping caps event, not using queue");
2421           gst_event_unref (event);
2422           item = NULL;
2423         }
2424         break;
2425       }
2426       default:
2427         if (!QUEUE_IS_USING_QUEUE (queue))
2428           goto unexpected_event;
2429         break;
2430     }
2431   } else if (GST_IS_QUERY (item)) {
2432     /* Can't happen as we check that in the caller */
2433     if (!QUEUE_IS_USING_QUEUE (queue))
2434       g_assert_not_reached ();
2435   } else {
2436     g_warning ("Unexpected item %p added in queue %s (refcounting problem?)",
2437         item, GST_OBJECT_NAME (queue));
2438     /* we can't really unref since we don't know what it is */
2439     item = NULL;
2440   }
2441 
2442   if (item) {
2443     /* update the buffering status */
2444     if (queue->use_buffering)
2445       update_buffering (queue);
2446 
2447     if (QUEUE_IS_USING_QUEUE (queue)) {
2448       GstQueue2Item qitem;
2449 
2450       qitem.type = item_type;
2451       qitem.item = item;
2452       gst_queue_array_push_tail_struct (queue->queue, &qitem);
2453     } else {
2454       gst_mini_object_unref (GST_MINI_OBJECT_CAST (item));
2455     }
2456 
2457     GST_QUEUE2_SIGNAL_ADD (queue);
2458   }
2459 
2460   return;
2461 
2462   /* ERRORS */
2463 unexpected_event:
2464   {
2465     gboolean is_custom = GST_EVENT_TYPE (item) < GST_EVENT_CUSTOM_UPSTREAM;
2466 
2467     GST_WARNING_OBJECT (queue, "%s%s event can't be added to temp file: "
2468         "%" GST_PTR_FORMAT, is_custom ? "Unexpected " : "",
2469         GST_EVENT_TYPE_NAME (item), GST_EVENT_CAST (item));
2470     gst_event_unref (GST_EVENT_CAST (item));
2471     return;
2472   }
2473 }
2474 
2475 /* dequeue an item from the queue and update level stats */
2476 static GstMiniObject *
gst_queue2_locked_dequeue(GstQueue2 * queue,GstQueue2ItemType * item_type)2477 gst_queue2_locked_dequeue (GstQueue2 * queue, GstQueue2ItemType * item_type)
2478 {
2479   GstMiniObject *item;
2480 
2481   if (!QUEUE_IS_USING_QUEUE (queue)) {
2482     item = gst_queue2_read_item_from_file (queue);
2483   } else {
2484     GstQueue2Item *qitem = gst_queue_array_pop_head_struct (queue->queue);
2485 
2486     if (qitem == NULL)
2487       goto no_item;
2488 
2489     item = qitem->item;
2490   }
2491 
2492   if (item == NULL)
2493     goto no_item;
2494 
2495   if (GST_IS_BUFFER (item)) {
2496     GstBuffer *buffer;
2497     guint size;
2498 
2499     buffer = GST_BUFFER_CAST (item);
2500     size = gst_buffer_get_size (buffer);
2501     *item_type = GST_QUEUE2_ITEM_TYPE_BUFFER;
2502 
2503     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2504         "retrieved buffer %p from queue", buffer);
2505 
2506     if (QUEUE_IS_USING_QUEUE (queue)) {
2507       queue->cur_level.buffers--;
2508       queue->cur_level.bytes -= size;
2509     }
2510     queue->bytes_out += size;
2511 
2512     apply_buffer (queue, buffer, &queue->src_segment, size, FALSE);
2513     /* update the byterate stats */
2514     update_out_rates (queue);
2515     /* update the buffering */
2516     if (queue->use_buffering)
2517       update_buffering (queue);
2518 
2519   } else if (GST_IS_EVENT (item)) {
2520     GstEvent *event = GST_EVENT_CAST (item);
2521 
2522     *item_type = GST_QUEUE2_ITEM_TYPE_EVENT;
2523 
2524     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2525         "retrieved event %p from queue", event);
2526 
2527     switch (GST_EVENT_TYPE (event)) {
2528       case GST_EVENT_EOS:
2529         /* queue is empty now that we dequeued the EOS */
2530         GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
2531         break;
2532       case GST_EVENT_SEGMENT:
2533         apply_segment (queue, event, &queue->src_segment, FALSE);
2534         break;
2535       case GST_EVENT_GAP:
2536         apply_gap (queue, event, &queue->src_segment, FALSE);
2537         break;
2538       default:
2539         break;
2540     }
2541   } else if (GST_IS_BUFFER_LIST (item)) {
2542     GstBufferList *buffer_list;
2543     guint size;
2544 
2545     buffer_list = GST_BUFFER_LIST_CAST (item);
2546     size = gst_buffer_list_calculate_size (buffer_list);
2547     *item_type = GST_QUEUE2_ITEM_TYPE_BUFFER_LIST;
2548 
2549     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2550         "retrieved buffer list %p from queue", buffer_list);
2551 
2552     if (QUEUE_IS_USING_QUEUE (queue)) {
2553       queue->cur_level.buffers -= gst_buffer_list_length (buffer_list);
2554       queue->cur_level.bytes -= size;
2555     }
2556     queue->bytes_out += size;
2557 
2558     apply_buffer_list (queue, buffer_list, &queue->src_segment, FALSE);
2559     /* update the byterate stats */
2560     update_out_rates (queue);
2561     /* update the buffering */
2562     if (queue->use_buffering)
2563       update_buffering (queue);
2564   } else if (GST_IS_QUERY (item)) {
2565     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2566         "retrieved query %p from queue", item);
2567     *item_type = GST_QUEUE2_ITEM_TYPE_QUERY;
2568   } else {
2569     g_warning
2570         ("Unexpected item %p dequeued from queue %s (refcounting problem?)",
2571         item, GST_OBJECT_NAME (queue));
2572     item = NULL;
2573     *item_type = GST_QUEUE2_ITEM_TYPE_UNKNOWN;
2574   }
2575   GST_QUEUE2_SIGNAL_DEL (queue);
2576 
2577   return item;
2578 
2579   /* ERRORS */
2580 no_item:
2581   {
2582     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "the queue is empty");
2583     return NULL;
2584   }
2585 }
2586 
2587 static GstFlowReturn
gst_queue2_handle_sink_event(GstPad * pad,GstObject * parent,GstEvent * event)2588 gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
2589     GstEvent * event)
2590 {
2591   gboolean ret = TRUE;
2592   GstQueue2 *queue;
2593 
2594   queue = GST_QUEUE2 (parent);
2595 
2596   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "Received event '%s'",
2597       GST_EVENT_TYPE_NAME (event));
2598 
2599   switch (GST_EVENT_TYPE (event)) {
2600     case GST_EVENT_FLUSH_START:
2601     {
2602       if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
2603         /* forward event */
2604         ret = gst_pad_push_event (queue->srcpad, event);
2605 
2606         /* now unblock the chain function */
2607         GST_QUEUE2_MUTEX_LOCK (queue);
2608         queue->srcresult = GST_FLOW_FLUSHING;
2609         queue->sinkresult = GST_FLOW_FLUSHING;
2610         /* unblock the loop and chain functions */
2611         GST_QUEUE2_SIGNAL_ADD (queue);
2612         GST_QUEUE2_SIGNAL_DEL (queue);
2613         GST_QUEUE2_MUTEX_UNLOCK (queue);
2614 
2615         /* make sure it pauses, this should happen since we sent
2616          * flush_start downstream. */
2617         gst_pad_pause_task (queue->srcpad);
2618         GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
2619 
2620         GST_QUEUE2_MUTEX_LOCK (queue);
2621         queue->last_query = FALSE;
2622         g_cond_signal (&queue->query_handled);
2623         GST_QUEUE2_MUTEX_UNLOCK (queue);
2624       } else {
2625         GST_QUEUE2_MUTEX_LOCK (queue);
2626         /* flush the sink pad */
2627         queue->sinkresult = GST_FLOW_FLUSHING;
2628         GST_QUEUE2_SIGNAL_DEL (queue);
2629         queue->last_query = FALSE;
2630         g_cond_signal (&queue->query_handled);
2631         GST_QUEUE2_MUTEX_UNLOCK (queue);
2632 
2633         gst_event_unref (event);
2634       }
2635       break;
2636     }
2637     case GST_EVENT_FLUSH_STOP:
2638     {
2639       if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
2640         /* forward event */
2641         ret = gst_pad_push_event (queue->srcpad, event);
2642 
2643         GST_QUEUE2_MUTEX_LOCK (queue);
2644         gst_queue2_locked_flush (queue, FALSE, TRUE);
2645         queue->srcresult = GST_FLOW_OK;
2646         queue->sinkresult = GST_FLOW_OK;
2647         queue->is_eos = FALSE;
2648         queue->unexpected = FALSE;
2649         queue->seeking = FALSE;
2650         queue->src_tags_bitrate = queue->sink_tags_bitrate = 0;
2651         /* reset rate counters */
2652         reset_rate_timer (queue);
2653         gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue2_loop,
2654             queue->srcpad, NULL);
2655         GST_QUEUE2_MUTEX_UNLOCK (queue);
2656       } else {
2657         GST_QUEUE2_MUTEX_LOCK (queue);
2658         queue->segment_event_received = FALSE;
2659         queue->is_eos = FALSE;
2660         queue->unexpected = FALSE;
2661         queue->sinkresult = GST_FLOW_OK;
2662         queue->seeking = FALSE;
2663         queue->src_tags_bitrate = queue->sink_tags_bitrate = 0;
2664         GST_QUEUE2_MUTEX_UNLOCK (queue);
2665 
2666         gst_event_unref (event);
2667       }
2668       g_object_notify_by_pspec (G_OBJECT (queue), obj_props[PROP_BITRATE]);
2669       break;
2670     }
2671     case GST_EVENT_TAG:{
2672       if (queue->use_tags_bitrate) {
2673         GstTagList *tags;
2674         guint bitrate;
2675 
2676         gst_event_parse_tag (event, &tags);
2677         if (gst_tag_list_get_uint (tags, GST_TAG_BITRATE, &bitrate) ||
2678             gst_tag_list_get_uint (tags, GST_TAG_NOMINAL_BITRATE, &bitrate)) {
2679           GST_QUEUE2_MUTEX_LOCK (queue);
2680           queue->sink_tags_bitrate = bitrate;
2681           GST_QUEUE2_MUTEX_UNLOCK (queue);
2682           GST_LOG_OBJECT (queue, "Sink pad bitrate from tags now %u", bitrate);
2683           g_object_notify_by_pspec (G_OBJECT (queue), obj_props[PROP_BITRATE]);
2684         }
2685       }
2686       /* Fall-through */
2687     }
2688     default:
2689       if (GST_EVENT_IS_SERIALIZED (event)) {
2690         gboolean bitrate_changed = TRUE;
2691         /* serialized events go in the queue */
2692 
2693         /* STREAM_START and SEGMENT reset the EOS status of a
2694          * pad. Change the cached sinkpad flow result accordingly */
2695         if (queue->sinkresult == GST_FLOW_EOS
2696             && (GST_EVENT_TYPE (event) == GST_EVENT_STREAM_START
2697                 || GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT))
2698           queue->sinkresult = GST_FLOW_OK;
2699 
2700         GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2701         if (queue->srcresult != GST_FLOW_OK) {
2702           /* Errors in sticky event pushing are no problem and ignored here
2703            * as they will cause more meaningful errors during data flow.
2704            * For EOS events, that are not followed by data flow, we still
2705            * return FALSE here though and report an error.
2706            */
2707           if (!GST_EVENT_IS_STICKY (event)) {
2708             goto out_flow_error;
2709           } else if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
2710             if (queue->srcresult == GST_FLOW_NOT_LINKED
2711                 || queue->srcresult < GST_FLOW_EOS) {
2712               GST_ELEMENT_FLOW_ERROR (queue, queue->srcresult);
2713             }
2714             goto out_flow_error;
2715           }
2716         }
2717 
2718         /* refuse more events on EOS unless they unset the EOS status */
2719         if (queue->is_eos) {
2720           switch (GST_EVENT_TYPE (event)) {
2721             case GST_EVENT_STREAM_START:
2722             case GST_EVENT_SEGMENT:
2723               /* Restart the loop */
2724               if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
2725                 queue->srcresult = GST_FLOW_OK;
2726                 queue->is_eos = FALSE;
2727                 queue->unexpected = FALSE;
2728                 queue->seeking = FALSE;
2729                 queue->src_tags_bitrate = queue->sink_tags_bitrate = 0;
2730                 /* reset rate counters */
2731                 reset_rate_timer (queue);
2732                 gst_pad_start_task (queue->srcpad,
2733                     (GstTaskFunction) gst_queue2_loop, queue->srcpad, NULL);
2734               } else {
2735                 queue->is_eos = FALSE;
2736                 queue->unexpected = FALSE;
2737                 queue->seeking = FALSE;
2738                 queue->src_tags_bitrate = queue->sink_tags_bitrate = 0;
2739               }
2740               bitrate_changed = TRUE;
2741 
2742               break;
2743             default:
2744               goto out_eos;
2745           }
2746         }
2747 
2748         gst_queue2_locked_enqueue (queue, event, GST_QUEUE2_ITEM_TYPE_EVENT);
2749         GST_QUEUE2_MUTEX_UNLOCK (queue);
2750         gst_queue2_post_buffering (queue);
2751         if (bitrate_changed)
2752           g_object_notify_by_pspec (G_OBJECT (queue), obj_props[PROP_BITRATE]);
2753       } else {
2754         /* non-serialized events are passed downstream. */
2755         ret = gst_pad_push_event (queue->srcpad, event);
2756       }
2757       break;
2758   }
2759   if (ret == FALSE)
2760     return GST_FLOW_ERROR;
2761   return GST_FLOW_OK;
2762 
2763   /* ERRORS */
2764 out_flushing:
2765   {
2766     GstFlowReturn ret = queue->sinkresult;
2767     GST_DEBUG_OBJECT (queue, "refusing event, we are %s",
2768         gst_flow_get_name (ret));
2769     GST_QUEUE2_MUTEX_UNLOCK (queue);
2770     gst_event_unref (event);
2771     return ret;
2772   }
2773 out_eos:
2774   {
2775     GST_DEBUG_OBJECT (queue, "refusing event, we are EOS");
2776     GST_QUEUE2_MUTEX_UNLOCK (queue);
2777     gst_event_unref (event);
2778     return GST_FLOW_EOS;
2779   }
2780 out_flow_error:
2781   {
2782     GST_LOG_OBJECT (queue,
2783         "refusing event, we have a downstream flow error: %s",
2784         gst_flow_get_name (queue->srcresult));
2785     GST_QUEUE2_MUTEX_UNLOCK (queue);
2786     gst_event_unref (event);
2787     return queue->srcresult;
2788   }
2789 }
2790 
2791 static gboolean
gst_queue2_handle_sink_query(GstPad * pad,GstObject * parent,GstQuery * query)2792 gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
2793     GstQuery * query)
2794 {
2795   GstQueue2 *queue;
2796   gboolean res;
2797 
2798   queue = GST_QUEUE2 (parent);
2799 
2800   switch (GST_QUERY_TYPE (query)) {
2801     default:
2802       if (GST_QUERY_IS_SERIALIZED (query)) {
2803         GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2804             "received query %" GST_PTR_FORMAT, query);
2805         /* serialized events go in the queue. We need to be certain that we
2806          * don't cause deadlocks waiting for the query return value. We check if
2807          * the queue is empty (nothing is blocking downstream and the query can
2808          * be pushed for sure) or we are not buffering. If we are buffering,
2809          * the pipeline waits to unblock downstream until our queue fills up
2810          * completely, which can not happen if we block on the query..
2811          * Therefore we only potentially block when we are not buffering.
2812          *
2813          * Update: Edward Hervey 2021: Realistically when posting buffering
2814          * messages there are no safe places where we can block and forward a
2815          * serialized query due to the potential of causing deadlocks. We
2816          * therefore refuse any serialized queries in such cases. */
2817         GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2818         if (QUEUE_IS_USING_QUEUE (queue) && !queue->use_buffering) {
2819           if (!g_atomic_int_get (&queue->downstream_may_block)) {
2820             gst_queue2_locked_enqueue (queue, query,
2821                 GST_QUEUE2_ITEM_TYPE_QUERY);
2822 
2823             STATUS (queue, queue->sinkpad, "wait for QUERY");
2824             while (queue->sinkresult == GST_FLOW_OK &&
2825                 queue->last_handled_query != query)
2826               g_cond_wait (&queue->query_handled, &queue->qlock);
2827             queue->last_handled_query = NULL;
2828             if (queue->sinkresult != GST_FLOW_OK)
2829               goto out_flushing;
2830             res = queue->last_query;
2831           } else {
2832             GST_DEBUG_OBJECT (queue, "refusing query, downstream might block");
2833             res = FALSE;
2834           }
2835         } else {
2836           GST_DEBUG_OBJECT (queue,
2837               "refusing query, we are not using the queue or we are posting buffering messages");
2838           res = FALSE;
2839         }
2840         GST_QUEUE2_MUTEX_UNLOCK (queue);
2841         gst_queue2_post_buffering (queue);
2842       } else {
2843         res = gst_pad_query_default (pad, parent, query);
2844       }
2845       break;
2846   }
2847   return res;
2848 
2849   /* ERRORS */
2850 out_flushing:
2851   {
2852     GST_DEBUG_OBJECT (queue, "refusing query, we are %s",
2853         gst_flow_get_name (queue->sinkresult));
2854     GST_QUEUE2_MUTEX_UNLOCK (queue);
2855     return FALSE;
2856   }
2857 }
2858 
2859 static gboolean
gst_queue2_is_empty(GstQueue2 * queue)2860 gst_queue2_is_empty (GstQueue2 * queue)
2861 {
2862   /* never empty on EOS */
2863   if (queue->is_eos)
2864     return FALSE;
2865 
2866   if (!QUEUE_IS_USING_QUEUE (queue) && queue->current) {
2867     return queue->current->writing_pos <= queue->current->max_reading_pos;
2868   } else {
2869     if (gst_queue_array_get_length (queue->queue) == 0)
2870       return TRUE;
2871   }
2872 
2873   return FALSE;
2874 }
2875 
2876 static gboolean
gst_queue2_is_filled(GstQueue2 * queue)2877 gst_queue2_is_filled (GstQueue2 * queue)
2878 {
2879   gboolean res;
2880 
2881   /* always filled on EOS */
2882   if (queue->is_eos)
2883     return TRUE;
2884 
2885   /* Check the levels if non-null */
2886 #define CHECK_FILLED_REAL(format) \
2887   ((queue->max_level.format) > 0 && (queue->cur_level.format) >= ((queue->max_level.format)))
2888   /* Check the levels if non-null (use the alternative max if non-zero) */
2889 #define CHECK_FILLED_ALT(format,alt_max) ((queue->max_level.format) > 0 && \
2890     (queue->cur_level.format) >= ((alt_max) ? \
2891       MIN ((queue->max_level.format), (alt_max)) : (queue->max_level.format)))
2892 
2893   /* if using a ring buffer we're filled if all ring buffer space is used
2894    * _by the current range_ */
2895   if (QUEUE_IS_USING_RING_BUFFER (queue)) {
2896     guint64 rb_size = queue->ring_buffer_max_size;
2897     GST_DEBUG_OBJECT (queue,
2898         "max bytes %u, rb size %" G_GUINT64_FORMAT ", cur bytes %u",
2899         queue->max_level.bytes, rb_size, queue->cur_level.bytes);
2900     return CHECK_FILLED_ALT (bytes, rb_size);
2901   }
2902 
2903   /* if using file, we're never filled if we don't have EOS */
2904   if (QUEUE_IS_USING_TEMP_FILE (queue))
2905     return FALSE;
2906 
2907   /* we are never filled when we have no buffers at all */
2908   if (queue->cur_level.buffers == 0)
2909     return FALSE;
2910 
2911   /* we are filled if one of the current levels exceeds the max */
2912   res = CHECK_FILLED_REAL (buffers) || CHECK_FILLED_REAL (bytes)
2913       || CHECK_FILLED_REAL (time);
2914 
2915   /* if we need to, use the rate estimate to check against the max time we are
2916    * allowed to queue */
2917   if (queue->use_rate_estimate)
2918     res |= CHECK_FILLED_REAL (rate_time);
2919 
2920 #undef CHECK_FILLED_REAL
2921 #undef CHECK_FILLED_ALT
2922   return res;
2923 }
2924 
2925 static GstFlowReturn
gst_queue2_chain_buffer_or_buffer_list(GstQueue2 * queue,GstMiniObject * item,GstQueue2ItemType item_type)2926 gst_queue2_chain_buffer_or_buffer_list (GstQueue2 * queue,
2927     GstMiniObject * item, GstQueue2ItemType item_type)
2928 {
2929   /* we have to lock the queue since we span threads */
2930   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2931   /* when we received EOS, we refuse more data */
2932   if (queue->is_eos)
2933     goto out_eos;
2934   /* when we received unexpected from downstream, refuse more buffers */
2935   if (queue->unexpected)
2936     goto out_unexpected;
2937 
2938   /* while we didn't receive the newsegment, we're seeking and we skip data */
2939   if (queue->seeking)
2940     goto out_seeking;
2941 
2942   if (!gst_queue2_wait_free_space (queue))
2943     goto out_flushing;
2944 
2945   /* put buffer in queue now */
2946   gst_queue2_locked_enqueue (queue, item, item_type);
2947   GST_QUEUE2_MUTEX_UNLOCK (queue);
2948   gst_queue2_post_buffering (queue);
2949 
2950   return GST_FLOW_OK;
2951 
2952   /* special conditions */
2953 out_flushing:
2954   {
2955     GstFlowReturn ret = queue->sinkresult;
2956 
2957     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2958         "exit because task paused, reason: %s", gst_flow_get_name (ret));
2959     GST_QUEUE2_MUTEX_UNLOCK (queue);
2960     gst_mini_object_unref (item);
2961 
2962     return ret;
2963   }
2964 out_eos:
2965   {
2966     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
2967     GST_QUEUE2_MUTEX_UNLOCK (queue);
2968     gst_mini_object_unref (item);
2969 
2970     return GST_FLOW_EOS;
2971   }
2972 out_seeking:
2973   {
2974     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are seeking");
2975     GST_QUEUE2_MUTEX_UNLOCK (queue);
2976     gst_mini_object_unref (item);
2977 
2978     return GST_FLOW_OK;
2979   }
2980 out_unexpected:
2981   {
2982     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
2983     GST_QUEUE2_MUTEX_UNLOCK (queue);
2984     gst_mini_object_unref (item);
2985 
2986     return GST_FLOW_EOS;
2987   }
2988 }
2989 
2990 static GstFlowReturn
gst_queue2_chain(GstPad * pad,GstObject * parent,GstBuffer * buffer)2991 gst_queue2_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
2992 {
2993   GstQueue2 *queue;
2994 
2995   queue = GST_QUEUE2 (parent);
2996 
2997   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received buffer %p of "
2998       "size %" G_GSIZE_FORMAT ", time %" GST_TIME_FORMAT ", duration %"
2999       GST_TIME_FORMAT, buffer, gst_buffer_get_size (buffer),
3000       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
3001       GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
3002 
3003   return gst_queue2_chain_buffer_or_buffer_list (queue,
3004       GST_MINI_OBJECT_CAST (buffer), GST_QUEUE2_ITEM_TYPE_BUFFER);
3005 }
3006 
3007 static GstFlowReturn
gst_queue2_chain_list(GstPad * pad,GstObject * parent,GstBufferList * buffer_list)3008 gst_queue2_chain_list (GstPad * pad, GstObject * parent,
3009     GstBufferList * buffer_list)
3010 {
3011   GstQueue2 *queue;
3012 
3013   queue = GST_QUEUE2 (parent);
3014 
3015   GST_CAT_LOG_OBJECT (queue_dataflow, queue,
3016       "received buffer list %p", buffer_list);
3017 
3018   return gst_queue2_chain_buffer_or_buffer_list (queue,
3019       GST_MINI_OBJECT_CAST (buffer_list), GST_QUEUE2_ITEM_TYPE_BUFFER_LIST);
3020 }
3021 
3022 static GstMiniObject *
gst_queue2_dequeue_on_eos(GstQueue2 * queue,GstQueue2ItemType * item_type)3023 gst_queue2_dequeue_on_eos (GstQueue2 * queue, GstQueue2ItemType * item_type)
3024 {
3025   GstMiniObject *data;
3026 
3027   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "got EOS from downstream");
3028 
3029   /* stop pushing buffers, we dequeue all items until we see an item that we
3030    * can push again, which is EOS or SEGMENT. If there is nothing in the
3031    * queue we can push, we set a flag to make the sinkpad refuse more
3032    * buffers with an EOS return value until we receive something
3033    * pushable again or we get flushed. */
3034   while ((data = gst_queue2_locked_dequeue (queue, item_type))) {
3035     if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
3036       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
3037           "dropping EOS buffer %p", data);
3038       gst_buffer_unref (GST_BUFFER_CAST (data));
3039     } else if (*item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
3040       GstEvent *event = GST_EVENT_CAST (data);
3041       GstEventType type = GST_EVENT_TYPE (event);
3042 
3043       if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT
3044           || type == GST_EVENT_STREAM_START) {
3045         /* we found a pushable item in the queue, push it out */
3046         GST_CAT_LOG_OBJECT (queue_dataflow, queue,
3047             "pushing pushable event %s after EOS", GST_EVENT_TYPE_NAME (event));
3048         return data;
3049       }
3050       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
3051           "dropping EOS event %p", event);
3052       gst_event_unref (event);
3053     } else if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
3054       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
3055           "dropping EOS buffer list %p", data);
3056       gst_buffer_list_unref (GST_BUFFER_LIST_CAST (data));
3057     } else if (*item_type == GST_QUEUE2_ITEM_TYPE_QUERY) {
3058       queue->last_query = FALSE;
3059       g_cond_signal (&queue->query_handled);
3060       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "dropping EOS query %p", data);
3061     }
3062   }
3063   /* no more items in the queue. Set the unexpected flag so that upstream
3064    * make us refuse any more buffers on the sinkpad. Since we will still
3065    * accept EOS and SEGMENT we return _FLOW_OK to the caller so that the
3066    * task function does not shut down. */
3067   queue->unexpected = TRUE;
3068   return NULL;
3069 }
3070 
3071 /* dequeue an item from the queue an push it downstream. This functions returns
3072  * the result of the push. */
3073 static GstFlowReturn
gst_queue2_push_one(GstQueue2 * queue)3074 gst_queue2_push_one (GstQueue2 * queue)
3075 {
3076   GstFlowReturn result;
3077   GstMiniObject *data;
3078   GstQueue2ItemType item_type;
3079 
3080   data = gst_queue2_locked_dequeue (queue, &item_type);
3081   if (data == NULL)
3082     goto no_item;
3083 
3084 next:
3085   result = queue->srcresult;
3086   STATUS (queue, queue->srcpad, "We have something dequeud");
3087   g_atomic_int_set (&queue->downstream_may_block,
3088       item_type == GST_QUEUE2_ITEM_TYPE_BUFFER ||
3089       item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST);
3090   GST_QUEUE2_MUTEX_UNLOCK (queue);
3091   gst_queue2_post_buffering (queue);
3092 
3093   if (gst_pad_check_reconfigure (queue->srcpad)) {
3094     /* If the pad was reconfigured, do a new bitrate query */
3095     query_downstream_bitrate (queue);
3096   }
3097 
3098   if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
3099     GstBuffer *buffer;
3100 
3101     buffer = GST_BUFFER_CAST (data);
3102 
3103     result = gst_pad_push (queue->srcpad, buffer);
3104     g_atomic_int_set (&queue->downstream_may_block, 0);
3105 
3106     /* need to check for srcresult here as well */
3107     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
3108     if (result == GST_FLOW_EOS) {
3109       data = gst_queue2_dequeue_on_eos (queue, &item_type);
3110       if (data != NULL)
3111         goto next;
3112       /* Since we will still accept EOS and SEGMENT we return _FLOW_OK
3113        * to the caller so that the task function does not shut down */
3114       result = GST_FLOW_OK;
3115     }
3116   } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
3117     GstEvent *event = GST_EVENT_CAST (data);
3118     GstEventType type = GST_EVENT_TYPE (event);
3119 
3120     if (type == GST_EVENT_TAG) {
3121       if (queue->use_tags_bitrate) {
3122         GstTagList *tags;
3123         guint bitrate;
3124 
3125         gst_event_parse_tag (event, &tags);
3126         if (gst_tag_list_get_uint (tags, GST_TAG_BITRATE, &bitrate) ||
3127             gst_tag_list_get_uint (tags, GST_TAG_NOMINAL_BITRATE, &bitrate)) {
3128           GST_QUEUE2_MUTEX_LOCK (queue);
3129           queue->src_tags_bitrate = bitrate;
3130           GST_QUEUE2_MUTEX_UNLOCK (queue);
3131           GST_LOG_OBJECT (queue, "src pad bitrate from tags now %u", bitrate);
3132           g_object_notify_by_pspec (G_OBJECT (queue), obj_props[PROP_BITRATE]);
3133         }
3134       }
3135     }
3136 
3137     gst_pad_push_event (queue->srcpad, event);
3138 
3139     /* if we're EOS, return EOS so that the task pauses. */
3140     if (type == GST_EVENT_EOS) {
3141       GST_CAT_LOG_OBJECT (queue_dataflow, queue,
3142           "pushed EOS event %p, return EOS", event);
3143       result = GST_FLOW_EOS;
3144     }
3145 
3146     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
3147   } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
3148     GstBufferList *buffer_list;
3149 
3150     buffer_list = GST_BUFFER_LIST_CAST (data);
3151 
3152     result = gst_pad_push_list (queue->srcpad, buffer_list);
3153     g_atomic_int_set (&queue->downstream_may_block, 0);
3154 
3155     /* need to check for srcresult here as well */
3156     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
3157     if (result == GST_FLOW_EOS) {
3158       data = gst_queue2_dequeue_on_eos (queue, &item_type);
3159       if (data != NULL)
3160         goto next;
3161       /* Since we will still accept EOS and SEGMENT we return _FLOW_OK
3162        * to the caller so that the task function does not shut down */
3163       result = GST_FLOW_OK;
3164     }
3165   } else if (item_type == GST_QUEUE2_ITEM_TYPE_QUERY) {
3166     GstQuery *query = GST_QUERY_CAST (data);
3167 
3168     GST_LOG_OBJECT (queue->srcpad, "Peering query %p", query);
3169     queue->last_handled_query = query;
3170     queue->last_query = gst_pad_peer_query (queue->srcpad, query);
3171     GST_LOG_OBJECT (queue->srcpad, "Peered query");
3172     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
3173         "did query %p, return %d", query, queue->last_query);
3174     g_cond_signal (&queue->query_handled);
3175     GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
3176     result = GST_FLOW_OK;
3177   }
3178   return result;
3179 
3180   /* ERRORS */
3181 no_item:
3182   {
3183     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
3184         "exit because we have no item in the queue");
3185     return GST_FLOW_ERROR;
3186   }
3187 out_flushing:
3188   {
3189     GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are %s",
3190         gst_flow_get_name (queue->srcresult));
3191     return queue->srcresult;
3192   }
3193 }
3194 
3195 /* called repeatedly with @pad as the source pad. This function should push out
3196  * data to the peer element. */
3197 static void
gst_queue2_loop(GstPad * pad)3198 gst_queue2_loop (GstPad * pad)
3199 {
3200   GstQueue2 *queue;
3201   GstFlowReturn ret;
3202 
3203   queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
3204 
3205   /* have to lock for thread-safety */
3206   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
3207 
3208   if (gst_queue2_is_empty (queue)) {
3209     gboolean started;
3210 
3211     /* pause the timer while we wait. The fact that we are waiting does not mean
3212      * the byterate on the output pad is lower */
3213     if ((started = queue->out_timer_started))
3214       g_timer_stop (queue->out_timer);
3215 
3216     GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
3217         "queue is empty, waiting for new data");
3218     do {
3219       /* Wait for data to be available, we could be unlocked because of a flush. */
3220       GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
3221     }
3222     while (gst_queue2_is_empty (queue));
3223 
3224     /* and continue if we were running before */
3225     if (started)
3226       g_timer_continue (queue->out_timer);
3227   }
3228   ret = gst_queue2_push_one (queue);
3229   queue->srcresult = ret;
3230   queue->sinkresult = ret;
3231   if (ret != GST_FLOW_OK)
3232     goto out_flushing;
3233 
3234   GST_QUEUE2_MUTEX_UNLOCK (queue);
3235   gst_queue2_post_buffering (queue);
3236 
3237   return;
3238 
3239   /* ERRORS */
3240 out_flushing:
3241   {
3242     gboolean eos = queue->is_eos;
3243     GstFlowReturn ret = queue->srcresult;
3244 
3245     gst_pad_pause_task (queue->srcpad);
3246     if (ret == GST_FLOW_FLUSHING) {
3247       gst_queue2_locked_flush (queue, FALSE, FALSE);
3248     } else {
3249       GST_QUEUE2_SIGNAL_DEL (queue);
3250       queue->last_query = FALSE;
3251       g_cond_signal (&queue->query_handled);
3252     }
3253     GST_QUEUE2_MUTEX_UNLOCK (queue);
3254     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
3255         "pause task, reason:  %s", gst_flow_get_name (queue->srcresult));
3256     /* Recalculate buffering levels before stopping since the source flow
3257      * might cause a different buffering level (like NOT_LINKED making
3258      * the queue appear as full) */
3259     if (queue->use_buffering)
3260       update_buffering (queue);
3261     gst_queue2_post_buffering (queue);
3262     /* let app know about us giving up if upstream is not expected to do so */
3263     /* EOS is already taken care of elsewhere */
3264     if (eos && (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) {
3265       GST_ELEMENT_FLOW_ERROR (queue, ret);
3266       gst_pad_push_event (queue->srcpad, gst_event_new_eos ());
3267     }
3268     return;
3269   }
3270 }
3271 
3272 static gboolean
gst_queue2_handle_src_event(GstPad * pad,GstObject * parent,GstEvent * event)3273 gst_queue2_handle_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
3274 {
3275   gboolean res = TRUE;
3276   GstQueue2 *queue = GST_QUEUE2 (parent);
3277 
3278 #ifndef GST_DISABLE_GST_DEBUG
3279   GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%s)",
3280       event, GST_EVENT_TYPE_NAME (event));
3281 #endif
3282 
3283   switch (GST_EVENT_TYPE (event)) {
3284     case GST_EVENT_FLUSH_START:
3285       if (QUEUE_IS_USING_QUEUE (queue)) {
3286         /* just forward upstream */
3287         res = gst_pad_push_event (queue->sinkpad, event);
3288       } else {
3289         /* now unblock the getrange function */
3290         GST_QUEUE2_MUTEX_LOCK (queue);
3291         GST_DEBUG_OBJECT (queue, "flushing");
3292         queue->srcresult = GST_FLOW_FLUSHING;
3293         GST_QUEUE2_SIGNAL_ADD (queue);
3294         GST_QUEUE2_MUTEX_UNLOCK (queue);
3295 
3296         /* when using a temp file, we eat the event */
3297         res = TRUE;
3298         gst_event_unref (event);
3299       }
3300       break;
3301     case GST_EVENT_FLUSH_STOP:
3302       if (QUEUE_IS_USING_QUEUE (queue)) {
3303         /* just forward upstream */
3304         res = gst_pad_push_event (queue->sinkpad, event);
3305       } else {
3306         /* now unblock the getrange function */
3307         GST_QUEUE2_MUTEX_LOCK (queue);
3308         queue->srcresult = GST_FLOW_OK;
3309         GST_QUEUE2_MUTEX_UNLOCK (queue);
3310 
3311         /* when using a temp file, we eat the event */
3312         res = TRUE;
3313         gst_event_unref (event);
3314       }
3315       break;
3316     case GST_EVENT_RECONFIGURE:
3317       GST_QUEUE2_MUTEX_LOCK (queue);
3318       /* assume downstream is linked now and try to push again */
3319       if (queue->srcresult == GST_FLOW_NOT_LINKED) {
3320         /* Mark the pad as needing reconfiguration, and
3321          * the loop will re-query downstream bitrate
3322          */
3323         gst_pad_mark_reconfigure (pad);
3324         queue->srcresult = GST_FLOW_OK;
3325         queue->sinkresult = GST_FLOW_OK;
3326         if (GST_PAD_MODE (pad) == GST_PAD_MODE_PUSH) {
3327           gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad,
3328               NULL);
3329         }
3330 
3331       }
3332       GST_QUEUE2_MUTEX_UNLOCK (queue);
3333 
3334       res = gst_pad_push_event (queue->sinkpad, event);
3335       break;
3336     default:
3337       res = gst_pad_push_event (queue->sinkpad, event);
3338       break;
3339   }
3340 
3341   return res;
3342 }
3343 
3344 static gboolean
gst_queue2_handle_src_query(GstPad * pad,GstObject * parent,GstQuery * query)3345 gst_queue2_handle_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
3346 {
3347   GstQueue2 *queue;
3348 
3349   queue = GST_QUEUE2 (parent);
3350 
3351   switch (GST_QUERY_TYPE (query)) {
3352     case GST_QUERY_POSITION:
3353     {
3354       gint64 peer_pos;
3355       GstFormat format;
3356 
3357       if (!gst_pad_peer_query (queue->sinkpad, query))
3358         goto peer_failed;
3359 
3360       /* get peer position */
3361       gst_query_parse_position (query, &format, &peer_pos);
3362 
3363       /* FIXME: this code assumes that there's no discont in the queue */
3364       switch (format) {
3365         case GST_FORMAT_BYTES:
3366           peer_pos -= queue->cur_level.bytes;
3367           if (peer_pos < 0)     /* Clamp result to 0 */
3368             peer_pos = 0;
3369           break;
3370         case GST_FORMAT_TIME:
3371           peer_pos -= queue->cur_level.time;
3372           if (peer_pos < 0)     /* Clamp result to 0 */
3373             peer_pos = 0;
3374           break;
3375         default:
3376           GST_WARNING_OBJECT (queue, "dropping query in %s format, don't "
3377               "know how to adjust value", gst_format_get_name (format));
3378           return FALSE;
3379       }
3380       /* set updated position */
3381       gst_query_set_position (query, format, peer_pos);
3382       break;
3383     }
3384     case GST_QUERY_DURATION:
3385     {
3386       GST_DEBUG_OBJECT (queue, "doing peer query");
3387 
3388       if (!gst_pad_peer_query (queue->sinkpad, query))
3389         goto peer_failed;
3390 
3391       GST_DEBUG_OBJECT (queue, "peer query success");
3392       break;
3393     }
3394     case GST_QUERY_BUFFERING:
3395     {
3396       gint percent;
3397       gboolean is_buffering;
3398       GstBufferingMode mode;
3399       gint avg_in, avg_out;
3400       gint64 buffering_left;
3401 
3402       GST_DEBUG_OBJECT (queue, "query buffering");
3403 
3404       get_buffering_level (queue, &is_buffering, &percent);
3405       percent = convert_to_buffering_percent (queue, percent);
3406       gst_query_set_buffering_percent (query, is_buffering, percent);
3407 
3408       get_buffering_stats (queue, percent, &mode, &avg_in, &avg_out,
3409           &buffering_left);
3410       gst_query_set_buffering_stats (query, mode, avg_in, avg_out,
3411           buffering_left);
3412 
3413       if (!QUEUE_IS_USING_QUEUE (queue)) {
3414         /* add ranges for download and ringbuffer buffering */
3415         GstFormat format;
3416         gint64 start, stop, range_start, range_stop;
3417         guint64 writing_pos;
3418         gint64 estimated_total;
3419         gint64 duration;
3420         gboolean peer_res, is_eos;
3421         GstQueue2Range *queued_ranges;
3422 
3423         /* we need a current download region */
3424         if (queue->current == NULL)
3425           return FALSE;
3426 
3427         writing_pos = queue->current->writing_pos;
3428         is_eos = queue->is_eos;
3429 
3430         if (is_eos) {
3431           /* we're EOS, we know the duration in bytes now */
3432           peer_res = TRUE;
3433           duration = writing_pos;
3434         } else {
3435           /* get duration of upstream in bytes */
3436           peer_res = gst_pad_peer_query_duration (queue->sinkpad,
3437               GST_FORMAT_BYTES, &duration);
3438         }
3439 
3440         GST_DEBUG_OBJECT (queue, "percent %d, duration %" G_GINT64_FORMAT
3441             ", writing %" G_GINT64_FORMAT, percent, duration, writing_pos);
3442 
3443         /* calculate remaining and total download time */
3444         if (peer_res && avg_in > 0.0)
3445           estimated_total = ((duration - writing_pos) * 1000) / avg_in;
3446         else
3447           estimated_total = -1;
3448 
3449         GST_DEBUG_OBJECT (queue, "estimated-total %" G_GINT64_FORMAT,
3450             estimated_total);
3451 
3452         gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
3453 
3454         switch (format) {
3455           case GST_FORMAT_PERCENT:
3456             /* we need duration */
3457             if (!peer_res)
3458               goto peer_failed;
3459 
3460             start = 0;
3461             /* get our available data relative to the duration */
3462             if (duration != -1)
3463               stop =
3464                   gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, writing_pos,
3465                   duration);
3466             else
3467               stop = -1;
3468             break;
3469           case GST_FORMAT_BYTES:
3470             start = 0;
3471             stop = writing_pos;
3472             break;
3473           default:
3474             start = -1;
3475             stop = -1;
3476             break;
3477         }
3478 
3479         /* fill out the buffered ranges */
3480         for (queued_ranges = queue->ranges; queued_ranges;
3481             queued_ranges = queued_ranges->next) {
3482           switch (format) {
3483             case GST_FORMAT_PERCENT:
3484               if (duration == -1) {
3485                 range_start = 0;
3486                 range_stop = 0;
3487                 break;
3488               }
3489               range_start =
3490                   gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
3491                   queued_ranges->offset, duration);
3492               range_stop =
3493                   gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
3494                   queued_ranges->writing_pos, duration);
3495               break;
3496             case GST_FORMAT_BYTES:
3497               range_start = queued_ranges->offset;
3498               range_stop = queued_ranges->writing_pos;
3499               break;
3500             default:
3501               range_start = -1;
3502               range_stop = -1;
3503               break;
3504           }
3505           if (range_start == range_stop)
3506             continue;
3507           GST_DEBUG_OBJECT (queue,
3508               "range starting at %" G_GINT64_FORMAT " and finishing at %"
3509               G_GINT64_FORMAT, range_start, range_stop);
3510           gst_query_add_buffering_range (query, range_start, range_stop);
3511         }
3512 
3513         gst_query_set_buffering_range (query, format, start, stop,
3514             estimated_total);
3515       }
3516       break;
3517     }
3518     case GST_QUERY_SCHEDULING:
3519     {
3520       gboolean pull_mode;
3521       GstSchedulingFlags flags = 0;
3522       GstQuery *upstream;
3523 
3524       upstream = gst_query_new_scheduling ();
3525       if (!gst_pad_peer_query (queue->sinkpad, upstream)) {
3526         gst_query_unref (upstream);
3527         goto peer_failed;
3528       }
3529 
3530       gst_query_parse_scheduling (upstream, &flags, NULL, NULL, NULL);
3531       gst_query_unref (upstream);
3532 
3533       /* we can operate in pull mode when we are using a tempfile */
3534       pull_mode = !QUEUE_IS_USING_QUEUE (queue);
3535 
3536       if (pull_mode)
3537         flags |= GST_SCHEDULING_FLAG_SEEKABLE;
3538       gst_query_set_scheduling (query, flags, 0, -1, 0);
3539       if (pull_mode)
3540         gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL);
3541       gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
3542       break;
3543     }
3544     default:
3545       /* peer handled other queries */
3546       if (!gst_pad_query_default (pad, parent, query))
3547         goto peer_failed;
3548       break;
3549   }
3550 
3551   return TRUE;
3552 
3553   /* ERRORS */
3554 peer_failed:
3555   {
3556     GST_DEBUG_OBJECT (queue, "failed peer query");
3557     return FALSE;
3558   }
3559 }
3560 
3561 static gboolean
gst_queue2_handle_query(GstElement * element,GstQuery * query)3562 gst_queue2_handle_query (GstElement * element, GstQuery * query)
3563 {
3564   GstQueue2 *queue = GST_QUEUE2 (element);
3565 
3566   /* simply forward to the srcpad query function */
3567   return gst_queue2_handle_src_query (queue->srcpad, GST_OBJECT_CAST (element),
3568       query);
3569 }
3570 
3571 static void
gst_queue2_update_upstream_size(GstQueue2 * queue)3572 gst_queue2_update_upstream_size (GstQueue2 * queue)
3573 {
3574   gint64 upstream_size = -1;
3575 
3576   if (gst_pad_peer_query_duration (queue->sinkpad, GST_FORMAT_BYTES,
3577           &upstream_size)) {
3578     GST_INFO_OBJECT (queue, "upstream size: %" G_GINT64_FORMAT, upstream_size);
3579 
3580     /* upstream_size can be negative but queue->upstream_size is unsigned.
3581      * Prevent setting negative values to it (the query can return -1) */
3582     if (upstream_size >= 0)
3583       queue->upstream_size = upstream_size;
3584     else
3585       queue->upstream_size = 0;
3586   }
3587 }
3588 
3589 static GstFlowReturn
gst_queue2_get_range(GstPad * pad,GstObject * parent,guint64 offset,guint length,GstBuffer ** buffer)3590 gst_queue2_get_range (GstPad * pad, GstObject * parent, guint64 offset,
3591     guint length, GstBuffer ** buffer)
3592 {
3593   GstQueue2 *queue;
3594   GstFlowReturn ret;
3595 
3596   queue = GST_QUEUE2_CAST (parent);
3597 
3598   length = (length == -1) ? DEFAULT_BUFFER_SIZE : length;
3599   GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
3600   offset = (offset == -1) ? queue->current->reading_pos : offset;
3601 
3602   GST_DEBUG_OBJECT (queue,
3603       "Getting range: offset %" G_GUINT64_FORMAT ", length %u", offset, length);
3604 
3605   /* catch any reads beyond the size of the file here to make sure queue2
3606    * doesn't send seek events beyond the size of the file upstream, since
3607    * that would confuse elements such as souphttpsrc and/or http servers.
3608    * Demuxers often just loop until EOS at the end of the file to figure out
3609    * when they've read all the end-headers or index chunks. */
3610   if (G_UNLIKELY (offset >= queue->upstream_size)) {
3611     gst_queue2_update_upstream_size (queue);
3612     if (queue->upstream_size > 0 && offset >= queue->upstream_size)
3613       goto out_unexpected;
3614   }
3615 
3616   if (G_UNLIKELY (offset + length > queue->upstream_size)) {
3617     gst_queue2_update_upstream_size (queue);
3618     if (queue->upstream_size > 0 && offset + length >= queue->upstream_size) {
3619       length = queue->upstream_size - offset;
3620       GST_DEBUG_OBJECT (queue, "adjusting length downto %d", length);
3621     }
3622   }
3623 
3624   /* FIXME - function will block when the range is not yet available */
3625   ret = gst_queue2_create_read (queue, offset, length, buffer);
3626   GST_QUEUE2_MUTEX_UNLOCK (queue);
3627   gst_queue2_post_buffering (queue);
3628 
3629   return ret;
3630 
3631   /* ERRORS */
3632 out_flushing:
3633   {
3634     ret = queue->srcresult;
3635 
3636     GST_DEBUG_OBJECT (queue, "we are %s", gst_flow_get_name (ret));
3637     GST_QUEUE2_MUTEX_UNLOCK (queue);
3638     return ret;
3639   }
3640 out_unexpected:
3641   {
3642     GST_DEBUG_OBJECT (queue, "read beyond end of file");
3643     GST_QUEUE2_MUTEX_UNLOCK (queue);
3644     return GST_FLOW_EOS;
3645   }
3646 }
3647 
3648 /* sink currently only operates in push mode */
3649 static gboolean
gst_queue2_sink_activate_mode(GstPad * pad,GstObject * parent,GstPadMode mode,gboolean active)3650 gst_queue2_sink_activate_mode (GstPad * pad, GstObject * parent,
3651     GstPadMode mode, gboolean active)
3652 {
3653   gboolean result;
3654   GstQueue2 *queue;
3655 
3656   queue = GST_QUEUE2 (parent);
3657 
3658   switch (mode) {
3659     case GST_PAD_MODE_PUSH:
3660       if (active) {
3661         GST_QUEUE2_MUTEX_LOCK (queue);
3662         GST_DEBUG_OBJECT (queue, "activating push mode");
3663         queue->srcresult = GST_FLOW_OK;
3664         queue->sinkresult = GST_FLOW_OK;
3665         queue->is_eos = FALSE;
3666         queue->unexpected = FALSE;
3667         reset_rate_timer (queue);
3668         GST_QUEUE2_MUTEX_UNLOCK (queue);
3669       } else {
3670         /* unblock chain function */
3671         GST_QUEUE2_MUTEX_LOCK (queue);
3672         GST_DEBUG_OBJECT (queue, "deactivating push mode");
3673         queue->srcresult = GST_FLOW_FLUSHING;
3674         queue->sinkresult = GST_FLOW_FLUSHING;
3675         GST_QUEUE2_SIGNAL_DEL (queue);
3676         GST_QUEUE2_MUTEX_UNLOCK (queue);
3677 
3678         /* wait until it is unblocked and clean up */
3679         GST_PAD_STREAM_LOCK (pad);
3680         GST_QUEUE2_MUTEX_LOCK (queue);
3681         gst_queue2_locked_flush (queue, TRUE, FALSE);
3682         GST_QUEUE2_MUTEX_UNLOCK (queue);
3683         GST_PAD_STREAM_UNLOCK (pad);
3684       }
3685       result = TRUE;
3686       break;
3687     default:
3688       result = FALSE;
3689       break;
3690   }
3691   return result;
3692 }
3693 
3694 /* src operating in push mode, we start a task on the source pad that pushes out
3695  * buffers from the queue */
3696 static gboolean
gst_queue2_src_activate_push(GstPad * pad,GstObject * parent,gboolean active)3697 gst_queue2_src_activate_push (GstPad * pad, GstObject * parent, gboolean active)
3698 {
3699   gboolean result = FALSE;
3700   GstQueue2 *queue;
3701 
3702   queue = GST_QUEUE2 (parent);
3703 
3704   if (active) {
3705     GST_QUEUE2_MUTEX_LOCK (queue);
3706     GST_DEBUG_OBJECT (queue, "activating push mode");
3707     queue->srcresult = GST_FLOW_OK;
3708     queue->sinkresult = GST_FLOW_OK;
3709     queue->is_eos = FALSE;
3710     queue->unexpected = FALSE;
3711     result =
3712         gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad, NULL);
3713     GST_QUEUE2_MUTEX_UNLOCK (queue);
3714   } else {
3715     /* unblock loop function */
3716     GST_QUEUE2_MUTEX_LOCK (queue);
3717     GST_DEBUG_OBJECT (queue, "deactivating push mode");
3718     queue->srcresult = GST_FLOW_FLUSHING;
3719     queue->sinkresult = GST_FLOW_FLUSHING;
3720     /* the item add signal will unblock */
3721     GST_QUEUE2_SIGNAL_ADD (queue);
3722     GST_QUEUE2_MUTEX_UNLOCK (queue);
3723 
3724     /* step 2, make sure streaming finishes */
3725     result = gst_pad_stop_task (pad);
3726 
3727     GST_QUEUE2_MUTEX_LOCK (queue);
3728     gst_queue2_locked_flush (queue, FALSE, FALSE);
3729     GST_QUEUE2_MUTEX_UNLOCK (queue);
3730   }
3731 
3732   return result;
3733 }
3734 
3735 /* pull mode, downstream will call our getrange function */
3736 static gboolean
gst_queue2_src_activate_pull(GstPad * pad,GstObject * parent,gboolean active)3737 gst_queue2_src_activate_pull (GstPad * pad, GstObject * parent, gboolean active)
3738 {
3739   gboolean result;
3740   GstQueue2 *queue;
3741 
3742   queue = GST_QUEUE2 (parent);
3743 
3744   if (active) {
3745     GST_QUEUE2_MUTEX_LOCK (queue);
3746     if (!QUEUE_IS_USING_QUEUE (queue)) {
3747       if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3748         /* open the temp file now */
3749         result = gst_queue2_open_temp_location_file (queue);
3750       } else if (!queue->ring_buffer) {
3751         queue->ring_buffer = g_malloc (queue->ring_buffer_max_size);
3752         result = ! !queue->ring_buffer;
3753       } else {
3754         result = TRUE;
3755       }
3756 
3757       GST_DEBUG_OBJECT (queue, "activating pull mode");
3758       init_ranges (queue);
3759       queue->srcresult = GST_FLOW_OK;
3760       queue->sinkresult = GST_FLOW_OK;
3761       queue->is_eos = FALSE;
3762       queue->unexpected = FALSE;
3763       queue->upstream_size = 0;
3764     } else {
3765       GST_DEBUG_OBJECT (queue, "no temp file, cannot activate pull mode");
3766       /* this is not allowed, we cannot operate in pull mode without a temp
3767        * file. */
3768       queue->srcresult = GST_FLOW_FLUSHING;
3769       queue->sinkresult = GST_FLOW_FLUSHING;
3770       result = FALSE;
3771     }
3772     GST_QUEUE2_MUTEX_UNLOCK (queue);
3773   } else {
3774     GST_QUEUE2_MUTEX_LOCK (queue);
3775     GST_DEBUG_OBJECT (queue, "deactivating pull mode");
3776     queue->srcresult = GST_FLOW_FLUSHING;
3777     queue->sinkresult = GST_FLOW_FLUSHING;
3778     /* this will unlock getrange */
3779     GST_QUEUE2_SIGNAL_ADD (queue);
3780     result = TRUE;
3781     GST_QUEUE2_MUTEX_UNLOCK (queue);
3782   }
3783 
3784   return result;
3785 }
3786 
3787 static gboolean
gst_queue2_src_activate_mode(GstPad * pad,GstObject * parent,GstPadMode mode,gboolean active)3788 gst_queue2_src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
3789     gboolean active)
3790 {
3791   gboolean res;
3792 
3793   switch (mode) {
3794     case GST_PAD_MODE_PULL:
3795       res = gst_queue2_src_activate_pull (pad, parent, active);
3796       break;
3797     case GST_PAD_MODE_PUSH:
3798       res = gst_queue2_src_activate_push (pad, parent, active);
3799       break;
3800     default:
3801       GST_LOG_OBJECT (pad, "unknown activation mode %d", mode);
3802       res = FALSE;
3803       break;
3804   }
3805   return res;
3806 }
3807 
3808 static GstStateChangeReturn
gst_queue2_change_state(GstElement * element,GstStateChange transition)3809 gst_queue2_change_state (GstElement * element, GstStateChange transition)
3810 {
3811   GstQueue2 *queue;
3812   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
3813 
3814   queue = GST_QUEUE2 (element);
3815 
3816   switch (transition) {
3817     case GST_STATE_CHANGE_NULL_TO_READY:
3818       break;
3819     case GST_STATE_CHANGE_READY_TO_PAUSED:
3820       GST_QUEUE2_MUTEX_LOCK (queue);
3821       if (!QUEUE_IS_USING_QUEUE (queue)) {
3822         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3823           if (!gst_queue2_open_temp_location_file (queue))
3824             ret = GST_STATE_CHANGE_FAILURE;
3825         } else {
3826           if (queue->ring_buffer) {
3827             g_free (queue->ring_buffer);
3828             queue->ring_buffer = NULL;
3829           }
3830           if (!(queue->ring_buffer = g_malloc (queue->ring_buffer_max_size)))
3831             ret = GST_STATE_CHANGE_FAILURE;
3832         }
3833         init_ranges (queue);
3834       }
3835       queue->segment_event_received = FALSE;
3836       queue->starting_segment = NULL;
3837       gst_event_replace (&queue->stream_start_event, NULL);
3838       GST_QUEUE2_MUTEX_UNLOCK (queue);
3839 
3840       /* Mark the srcpad as reconfigured to trigger querying
3841        * the downstream bitrate next time it tries to push */
3842       gst_pad_mark_reconfigure (queue->srcpad);
3843       break;
3844     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
3845       break;
3846     default:
3847       break;
3848   }
3849 
3850   if (ret == GST_STATE_CHANGE_FAILURE)
3851     return ret;
3852 
3853   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
3854 
3855   if (ret == GST_STATE_CHANGE_FAILURE)
3856     return ret;
3857 
3858   switch (transition) {
3859     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3860       break;
3861     case GST_STATE_CHANGE_PAUSED_TO_READY:
3862       GST_QUEUE2_MUTEX_LOCK (queue);
3863       if (!QUEUE_IS_USING_QUEUE (queue)) {
3864         if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3865           gst_queue2_close_temp_location_file (queue);
3866         } else if (queue->ring_buffer) {
3867           g_free (queue->ring_buffer);
3868           queue->ring_buffer = NULL;
3869         }
3870         clean_ranges (queue);
3871       }
3872       if (queue->starting_segment != NULL) {
3873         gst_event_unref (queue->starting_segment);
3874         queue->starting_segment = NULL;
3875       }
3876       gst_event_replace (&queue->stream_start_event, NULL);
3877       GST_QUEUE2_MUTEX_UNLOCK (queue);
3878       break;
3879     case GST_STATE_CHANGE_READY_TO_NULL:
3880       break;
3881     default:
3882       break;
3883   }
3884 
3885   return ret;
3886 }
3887 
/* changing the capacity of the queue must wake up
 * the _chain function, it might have more room now
 * to store the buffer/event in the queue.
 *
 * The macro acts on its argument and expands to a single statement, so it
 * is safe inside an unbraced if/else. */
#define QUEUE_CAPACITY_CHANGE(q) G_STMT_START { \
  GST_QUEUE2_SIGNAL_DEL (q);                    \
  if ((q)->use_buffering)                       \
    update_buffering (q);                       \
} G_STMT_END
3895 
/* Changing the minimum required fill level must
 * wake up the _loop function as it might now
 * be able to proceed.
 *
 * The macro acts on its argument; the caller supplies the terminating
 * semicolon.
 */
#define QUEUE_THRESHOLD_CHANGE(q) \
  GST_QUEUE2_SIGNAL_ADD (q)
3902 
3903 static void
gst_queue2_set_temp_template(GstQueue2 * queue,const gchar * template)3904 gst_queue2_set_temp_template (GstQueue2 * queue, const gchar * template)
3905 {
3906   GstState state;
3907 
3908   /* the element must be stopped in order to do this */
3909   GST_OBJECT_LOCK (queue);
3910   state = GST_STATE (queue);
3911   if (state != GST_STATE_READY && state != GST_STATE_NULL)
3912     goto wrong_state;
3913   GST_OBJECT_UNLOCK (queue);
3914 
3915   /* set new location */
3916   g_free (queue->temp_template);
3917   queue->temp_template = g_strdup (template);
3918 
3919   return;
3920 
3921 /* ERROR */
3922 wrong_state:
3923   {
3924     GST_WARNING_OBJECT (queue, "setting temp-template property in wrong state");
3925     GST_OBJECT_UNLOCK (queue);
3926   }
3927 }
3928 
3929 static void
gst_queue2_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)3930 gst_queue2_set_property (GObject * object,
3931     guint prop_id, const GValue * value, GParamSpec * pspec)
3932 {
3933   GstQueue2 *queue = GST_QUEUE2 (object);
3934 
3935   /* someone could change levels here, and since this
3936    * affects the get/put funcs, we need to lock for safety. */
3937   GST_QUEUE2_MUTEX_LOCK (queue);
3938 
3939   switch (prop_id) {
3940     case PROP_MAX_SIZE_BYTES:
3941       queue->max_level.bytes = g_value_get_uint (value);
3942       QUEUE_CAPACITY_CHANGE (queue);
3943       break;
3944     case PROP_MAX_SIZE_BUFFERS:
3945       queue->max_level.buffers = g_value_get_uint (value);
3946       QUEUE_CAPACITY_CHANGE (queue);
3947       break;
3948     case PROP_MAX_SIZE_TIME:
3949       queue->max_level.time = g_value_get_uint64 (value);
3950       /* set rate_time to the same value. We use an extra field in the level
3951        * structure so that we can easily access and compare it */
3952       queue->max_level.rate_time = queue->max_level.time;
3953       QUEUE_CAPACITY_CHANGE (queue);
3954       break;
3955     case PROP_USE_BUFFERING:
3956       queue->use_buffering = g_value_get_boolean (value);
3957       if (!queue->use_buffering && queue->is_buffering) {
3958         GST_DEBUG_OBJECT (queue, "Disabled buffering while buffering, "
3959             "posting 100%% message");
3960         SET_PERCENT (queue, 100);
3961         queue->is_buffering = FALSE;
3962       }
3963 
3964       if (queue->use_buffering) {
3965         queue->is_buffering = TRUE;
3966         update_buffering (queue);
3967       }
3968       break;
3969     case PROP_USE_TAGS_BITRATE:
3970       queue->use_tags_bitrate = g_value_get_boolean (value);
3971       break;
3972     case PROP_USE_RATE_ESTIMATE:
3973       queue->use_rate_estimate = g_value_get_boolean (value);
3974       break;
3975     case PROP_LOW_PERCENT:
3976       queue->low_watermark = g_value_get_int (value) * BUF_LEVEL_PERCENT_FACTOR;
3977       if (queue->is_buffering)
3978         update_buffering (queue);
3979       break;
3980     case PROP_HIGH_PERCENT:
3981       queue->high_watermark =
3982           g_value_get_int (value) * BUF_LEVEL_PERCENT_FACTOR;
3983       if (queue->is_buffering)
3984         update_buffering (queue);
3985       break;
3986     case PROP_LOW_WATERMARK:
3987       queue->low_watermark = g_value_get_double (value) * MAX_BUFFERING_LEVEL;
3988       if (queue->is_buffering)
3989         update_buffering (queue);
3990       break;
3991     case PROP_HIGH_WATERMARK:
3992       queue->high_watermark = g_value_get_double (value) * MAX_BUFFERING_LEVEL;
3993       if (queue->is_buffering)
3994         update_buffering (queue);
3995       break;
3996     case PROP_TEMP_TEMPLATE:
3997       gst_queue2_set_temp_template (queue, g_value_get_string (value));
3998       break;
3999     case PROP_TEMP_REMOVE:
4000       queue->temp_remove = g_value_get_boolean (value);
4001       break;
4002     case PROP_RING_BUFFER_MAX_SIZE:
4003       queue->ring_buffer_max_size = g_value_get_uint64 (value);
4004       break;
4005     case PROP_USE_BITRATE_QUERY:
4006       queue->use_bitrate_query = g_value_get_boolean (value);
4007       break;
4008     default:
4009       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
4010       break;
4011   }
4012 
4013   GST_QUEUE2_MUTEX_UNLOCK (queue);
4014   gst_queue2_post_buffering (queue);
4015 }
4016 
4017 static void
gst_queue2_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)4018 gst_queue2_get_property (GObject * object,
4019     guint prop_id, GValue * value, GParamSpec * pspec)
4020 {
4021   GstQueue2 *queue = GST_QUEUE2 (object);
4022 
4023   GST_QUEUE2_MUTEX_LOCK (queue);
4024 
4025   switch (prop_id) {
4026     case PROP_CUR_LEVEL_BYTES:
4027       g_value_set_uint (value, queue->cur_level.bytes);
4028       break;
4029     case PROP_CUR_LEVEL_BUFFERS:
4030       g_value_set_uint (value, queue->cur_level.buffers);
4031       break;
4032     case PROP_CUR_LEVEL_TIME:
4033       g_value_set_uint64 (value, queue->cur_level.time);
4034       break;
4035     case PROP_MAX_SIZE_BYTES:
4036       g_value_set_uint (value, queue->max_level.bytes);
4037       break;
4038     case PROP_MAX_SIZE_BUFFERS:
4039       g_value_set_uint (value, queue->max_level.buffers);
4040       break;
4041     case PROP_MAX_SIZE_TIME:
4042       g_value_set_uint64 (value, queue->max_level.time);
4043       break;
4044     case PROP_USE_BUFFERING:
4045       g_value_set_boolean (value, queue->use_buffering);
4046       break;
4047     case PROP_USE_TAGS_BITRATE:
4048       g_value_set_boolean (value, queue->use_tags_bitrate);
4049       break;
4050     case PROP_USE_RATE_ESTIMATE:
4051       g_value_set_boolean (value, queue->use_rate_estimate);
4052       break;
4053     case PROP_LOW_PERCENT:
4054       g_value_set_int (value, queue->low_watermark / BUF_LEVEL_PERCENT_FACTOR);
4055       break;
4056     case PROP_HIGH_PERCENT:
4057       g_value_set_int (value, queue->high_watermark / BUF_LEVEL_PERCENT_FACTOR);
4058       break;
4059     case PROP_LOW_WATERMARK:
4060       g_value_set_double (value, queue->low_watermark /
4061           (gdouble) MAX_BUFFERING_LEVEL);
4062       break;
4063     case PROP_HIGH_WATERMARK:
4064       g_value_set_double (value, queue->high_watermark /
4065           (gdouble) MAX_BUFFERING_LEVEL);
4066       break;
4067     case PROP_TEMP_TEMPLATE:
4068       g_value_set_string (value, queue->temp_template);
4069       break;
4070     case PROP_TEMP_LOCATION:
4071       g_value_set_string (value, queue->temp_location);
4072       break;
4073     case PROP_TEMP_REMOVE:
4074       g_value_set_boolean (value, queue->temp_remove);
4075       break;
4076     case PROP_RING_BUFFER_MAX_SIZE:
4077       g_value_set_uint64 (value, queue->ring_buffer_max_size);
4078       break;
4079     case PROP_AVG_IN_RATE:
4080     {
4081       gdouble in_rate = queue->byte_in_rate;
4082 
4083       /* During the first RATE_INTERVAL, byte_in_rate will not have been
4084        * calculated, so calculate it here. */
4085       if (in_rate == 0.0 && queue->bytes_in
4086           && queue->last_update_in_rates_elapsed > 0.0)
4087         in_rate = queue->bytes_in / queue->last_update_in_rates_elapsed;
4088 
4089       g_value_set_int64 (value, (gint64) in_rate);
4090       break;
4091     }
4092     case PROP_USE_BITRATE_QUERY:
4093       g_value_set_boolean (value, queue->use_bitrate_query);
4094       break;
4095     case PROP_BITRATE:{
4096       guint64 bitrate = 0;
4097       if (bitrate == 0 && queue->use_tags_bitrate) {
4098         if (queue->sink_tags_bitrate > 0)
4099           bitrate = queue->sink_tags_bitrate;
4100         else if (queue->src_tags_bitrate)
4101           bitrate = queue->src_tags_bitrate;
4102       }
4103       if (bitrate == 0 && queue->use_bitrate_query) {
4104         bitrate = queue->downstream_bitrate;
4105       }
4106       g_value_set_uint64 (value, (guint64) bitrate);
4107       break;
4108     }
4109     default:
4110       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
4111       break;
4112   }
4113 
4114   GST_QUEUE2_MUTEX_UNLOCK (queue);
4115 }
4116