1 /* GStreamer
2 * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3 * 2003 Colin Walters <cwalters@gnome.org>
4 * 2000,2005,2007 Wim Taymans <wim.taymans@gmail.com>
5 * 2007 Thiago Sousa Santos <thiagoss@lcc.ufcg.edu.br>
6 * SA 2010 ST-Ericsson <benjamin.gaignard@stericsson.com>
7 *
8 * gstqueue2.c:
9 *
10 * This library is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Library General Public
12 * License as published by the Free Software Foundation; either
13 * version 2 of the License, or (at your option) any later version.
14 *
15 * This library is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 * Library General Public License for more details.
19 *
20 * You should have received a copy of the GNU Library General Public
21 * License along with this library; if not, write to the
22 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
23 * Boston, MA 02110-1301, USA.
24 */
25
26 /**
27 * SECTION:element-queue2
28 * @title: queue2
29 *
30 * Data is queued until one of the limits specified by the
31 * #GstQueue2:max-size-buffers, #GstQueue2:max-size-bytes and/or
32 * #GstQueue2:max-size-time properties has been reached. Any attempt to push
33 * more buffers into the queue will block the pushing thread until more space
34 * becomes available.
35 *
36 * The queue will create a new thread on the source pad to decouple the
37 * processing on sink and source pad.
38 *
39 * You can query how many buffers are queued by reading the
40 * #GstQueue2:current-level-buffers property.
41 *
42 * The default queue size limits are 100 buffers, 2MB of data, or
43 * two seconds worth of data, whichever is reached first.
44 *
45 * If you set temp-template to a value such as /tmp/gstreamer-XXXXXX, the element
46 * will allocate a random free filename and buffer data in the file.
 * When this is used, the entire stream is buffered in the file independently
 * of the queue size limits; the limits are then only used for buffering
 * statistics.
49 *
50 * The temp-location property will be used to notify the application of the
51 * allocated filename.
52 *
53 * If the #GstQueue2:use-buffering property is set to TRUE, and any writable
54 * property is modified, #GstQueue2 will attempt to post a buffering message
55 * if the changes to the properties also cause the buffering percentage to be
56 * changed (for example, because the queue's capacity was changed and it already
57 * contains some data).
58 */
59
60 #ifdef HAVE_CONFIG_H
61 #include "config.h"
62 #endif
63
64 #include "gstqueue2.h"
65 #include "gstcoreelementselements.h"
66
67 #include <glib/gstdio.h>
68
69 #include "gst/gst-i18n-lib.h"
70 #include "gst/glib-compat-private.h"
71
72 #include <string.h>
73
74 #ifdef G_OS_WIN32
75 #include <io.h> /* lseek, open, close, read */
76 #undef lseek
77 #define lseek _lseeki64
78 #undef off_t
79 #define off_t guint64
80 #else
81 #include <unistd.h>
82 #endif
83
84 #ifdef __BIONIC__ /* Android */
85 #include <fcntl.h>
86 #endif
87
/* Always-present sink pad template, any caps. */
static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
    GST_PAD_SINK,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS_ANY);

/* Always-present source pad template, any caps. */
static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
    GST_PAD_SRC,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS_ANY);

/* queue_debug is the default category for the GST_* log macros in this file;
 * queue_dataflow is the category the STATUS() macro logs to. */
GST_DEBUG_CATEGORY_STATIC (queue_debug);
#define GST_CAT_DEFAULT (queue_debug)
GST_DEBUG_CATEGORY_STATIC (queue_dataflow);

/* Signal ids: only the LAST_SIGNAL sentinel is declared here. */
enum
{
  LAST_SIGNAL
};
106
/* other defines */
#define DEFAULT_BUFFER_SIZE 4096

/* Storage-mode predicates: a temp file is in use when a template is set, the
 * ring buffer when its maximum size is non-zero, and the plain in-memory
 * queue array when neither of the two applies. */
#define QUEUE_IS_USING_TEMP_FILE(queue) ((queue)->temp_template != NULL)
#define QUEUE_IS_USING_RING_BUFFER(queue) ((queue)->ring_buffer_max_size != 0)  /* for consistency with the above macro */
#define QUEUE_IS_USING_QUEUE(queue) (!QUEUE_IS_USING_TEMP_FILE(queue) && !QUEUE_IS_USING_RING_BUFFER (queue))

/* Smaller of the configured byte limit and the ring buffer size */
#define QUEUE_MAX_BYTES(queue) MIN((queue)->max_level.bytes, (queue)->ring_buffer_max_size)

/* default property values */
#define DEFAULT_MAX_SIZE_BUFFERS   100  /* 100 buffers */
#define DEFAULT_MAX_SIZE_BYTES     (2 * 1024 * 1024)    /* 2 MB */
/* Parenthesized so the expansion stays one operand when the macro is used
 * inside a larger expression (e.g. `x % DEFAULT_MAX_SIZE_TIME`). */
#define DEFAULT_MAX_SIZE_TIME      (2 * GST_SECOND)     /* 2 seconds */
#define DEFAULT_USE_BUFFERING      FALSE
#define DEFAULT_USE_TAGS_BITRATE   FALSE
#define DEFAULT_USE_RATE_ESTIMATE  TRUE
#define DEFAULT_LOW_PERCENT        10
#define DEFAULT_HIGH_PERCENT       99
/* Watermarks are fractions in the 0.0 .. 1.0 range */
#define DEFAULT_LOW_WATERMARK      0.01
#define DEFAULT_HIGH_WATERMARK     0.99
#define DEFAULT_TEMP_REMOVE        TRUE
#define DEFAULT_RING_BUFFER_MAX_SIZE 0
#define DEFAULT_USE_BITRATE_QUERY  TRUE
129
/* Property ids. Each id from PROP_CUR_LEVEL_BUFFERS onwards indexes
 * obj_props[]; PROP_LAST is the size of that table. */
enum
{
  PROP_0,
  PROP_CUR_LEVEL_BUFFERS,
  PROP_CUR_LEVEL_BYTES,
  PROP_CUR_LEVEL_TIME,
  PROP_MAX_SIZE_BUFFERS,
  PROP_MAX_SIZE_BYTES,
  PROP_MAX_SIZE_TIME,
  PROP_USE_BUFFERING,
  PROP_USE_TAGS_BITRATE,
  PROP_USE_RATE_ESTIMATE,
  PROP_LOW_PERCENT,
  PROP_HIGH_PERCENT,
  PROP_LOW_WATERMARK,
  PROP_HIGH_WATERMARK,
  PROP_TEMP_TEMPLATE,
  PROP_TEMP_LOCATION,
  PROP_TEMP_REMOVE,
  PROP_RING_BUFFER_MAX_SIZE,
  PROP_AVG_IN_RATE,
  PROP_USE_BITRATE_QUERY,
  PROP_BITRATE,
  PROP_LAST
};
/* Param spec table, filled in and installed by gst_queue2_class_init() */
static GParamSpec *obj_props[PROP_LAST] = { NULL, };
156
157 /* Explanation for buffer levels and percentages:
158 *
159 * The buffering_level functions here return a value in a normalized range
160 * that specifies the queue's current fill level. The range goes from 0 to
161 * MAX_BUFFERING_LEVEL. The low/high watermarks also use this same range.
162 *
163 * This is not to be confused with the buffering_percent value, which is
164 * a *relative* quantity - relative to the low/high watermarks.
165 * buffering_percent = 0% means buffering_level is at the low watermark.
166 * buffering_percent = 100% means buffering_level is at the high watermark.
167 * buffering_percent is used for determining if the fill level has reached
168 * the high watermark, and for producing BUFFERING messages. This value
169 * always uses a 0..100 range (since it is a percentage).
170 *
171 * To avoid future confusions, whenever "buffering level" is mentioned, it
172 * refers to the absolute level which is in the 0..MAX_BUFFERING_LEVEL
173 * range. Whenever "buffering_percent" is mentioned, it refers to the
174 * percentage value that is relative to the low/high watermark. */
175
/* Using a buffering level range of 0..1000000 to allow for a
 * resolution in ppm (1 ppm = 0.0001%) */
#define MAX_BUFFERING_LEVEL 1000000

/* How much 1% makes up in the buffer level range
 * (MAX_BUFFERING_LEVEL / 100 level units per percent) */
#define BUF_LEVEL_PERCENT_FACTOR ((MAX_BUFFERING_LEVEL) / 100)
182
/* Reset the buffers, bytes, time and rate_time counters of the level
 * struct @l to 0. @l is a struct lvalue (not a pointer); it is parenthesized
 * so that expressions such as `*ptr` expand correctly. */
#define GST_QUEUE2_CLEAR_LEVEL(l) G_STMT_START {         \
  (l).buffers = 0;                                       \
  (l).bytes = 0;                                         \
  (l).time = 0;                                          \
  (l).rate_time = 0;                                     \
} G_STMT_END
189
/* Log the current and maximum fill levels of @q on the queue_dataflow
 * category, prefixed with the name of @pad and the literal string @msg.
 * The last value logged is the number of queued items: the distance between
 * writing_pos and max_reading_pos of the current range when a temp file or
 * ring buffer is in use, the length of the queue array otherwise.
 *
 * The parameter is deliberately not called `queue`: the body accesses the
 * `->queue` struct member, which the preprocessor would otherwise replace
 * with the macro argument. */
#define STATUS(q, pad, msg) \
  GST_CAT_LOG_OBJECT (queue_dataflow, q, \
                      "(%s:%s) " msg ": %u of %u buffers, %u of %u " \
                      "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
                      " ns, %"G_GUINT64_FORMAT" items", \
                      GST_DEBUG_PAD_NAME (pad), \
                      (q)->cur_level.buffers, \
                      (q)->max_level.buffers, \
                      (q)->cur_level.bytes, \
                      (q)->max_level.bytes, \
                      (q)->cur_level.time, \
                      (q)->max_level.time, \
                      (guint64) (!QUEUE_IS_USING_QUEUE(q) ? \
                        (q)->current->writing_pos - (q)->current->max_reading_pos : \
                        gst_queue_array_get_length((q)->queue)))
205
/* Take the queue lock of @q. */
#define GST_QUEUE2_MUTEX_LOCK(q) G_STMT_START {                          \
  g_mutex_lock (&(q)->qlock);                                            \
} G_STMT_END

/* Take the queue lock of @q, then jump to @label if @res is not
 * GST_FLOW_OK. @res is an expression and is evaluated after the lock has
 * been taken; the lock is still held when jumping to @label. */
#define GST_QUEUE2_MUTEX_LOCK_CHECK(q,res,label) G_STMT_START {         \
  GST_QUEUE2_MUTEX_LOCK (q);                                            \
  if ((res) != GST_FLOW_OK)                                             \
    goto label;                                                         \
} G_STMT_END

/* Release the queue lock of @q. */
#define GST_QUEUE2_MUTEX_UNLOCK(q) G_STMT_START {                        \
  g_mutex_unlock (&(q)->qlock);                                          \
} G_STMT_END
219
/* Block on the item_del condition of @q until signalled, with waiting_del
 * set for the duration of the wait. After waking up, @res is evaluated and
 * if it is not GST_FLOW_OK control jumps to @label.
 *
 * g_cond_wait() releases and re-acquires q->qlock, so the queue lock must be
 * held by the caller. Everything is derived from the macro argument @q; the
 * macro does not rely on a variable named `queue` in the calling scope. */
#define GST_QUEUE2_WAIT_DEL_CHECK(q, res, label) G_STMT_START {         \
  STATUS (q, (q)->sinkpad, "wait for DEL");                             \
  (q)->waiting_del = TRUE;                                              \
  g_cond_wait (&(q)->item_del, &(q)->qlock);                            \
  (q)->waiting_del = FALSE;                                             \
  if ((res) != GST_FLOW_OK) {                                           \
    STATUS (q, (q)->srcpad, "received DEL wakeup");                     \
    goto label;                                                         \
  }                                                                     \
  STATUS (q, (q)->sinkpad, "received DEL");                             \
} G_STMT_END

/* Counterpart of GST_QUEUE2_WAIT_DEL_CHECK for the item_add condition:
 * waits with waiting_add set, then jumps to @label if @res is not
 * GST_FLOW_OK. The queue lock of @q must be held by the caller. */
#define GST_QUEUE2_WAIT_ADD_CHECK(q, res, label) G_STMT_START {         \
  STATUS (q, (q)->srcpad, "wait for ADD");                              \
  (q)->waiting_add = TRUE;                                              \
  g_cond_wait (&(q)->item_add, &(q)->qlock);                            \
  (q)->waiting_add = FALSE;                                             \
  if ((res) != GST_FLOW_OK) {                                           \
    STATUS (q, (q)->srcpad, "received ADD wakeup");                     \
    goto label;                                                         \
  }                                                                     \
  STATUS (q, (q)->srcpad, "received ADD");                              \
} G_STMT_END
243
/* Wake up a thread waiting on item_del, but only if one has flagged itself
 * as waiting (waiting_del is set by GST_QUEUE2_WAIT_DEL_CHECK). */
#define GST_QUEUE2_SIGNAL_DEL(q) G_STMT_START {                         \
  if ((q)->waiting_del) {                                               \
    STATUS (q, (q)->srcpad, "signal DEL");                              \
    g_cond_signal (&(q)->item_del);                                     \
  }                                                                     \
} G_STMT_END

/* Wake up a thread waiting on item_add, but only if one has flagged itself
 * as waiting (waiting_add is set by GST_QUEUE2_WAIT_ADD_CHECK). */
#define GST_QUEUE2_SIGNAL_ADD(q) G_STMT_START {                         \
  if ((q)->waiting_add) {                                               \
    STATUS (q, (q)->sinkpad, "signal ADD");                             \
    g_cond_signal (&(q)->item_add);                                     \
  }                                                                     \
} G_STMT_END
257
/* Store @perc as the buffering percentage of @q if it differs from the
 * current value. On a change, percent_changed is flagged and the cached
 * mode, avg_in, avg_out and buffering_left fields are refreshed through
 * get_buffering_stats().
 *
 * @perc is expanded more than once: pass an expression without side
 * effects. */
#define SET_PERCENT(q, perc) G_STMT_START {                              \
  if ((perc) != (q)->buffering_percent) {                                \
    (q)->buffering_percent = (perc);                                     \
    (q)->percent_changed = TRUE;                                         \
    GST_DEBUG_OBJECT (q, "buffering %d percent", perc);                  \
    get_buffering_stats (q, perc, &(q)->mode, &(q)->avg_in,              \
        &(q)->avg_out, &(q)->buffering_left);                            \
  }                                                                      \
} G_STMT_END
267
/* Extra code run at type registration: sets up the two debug categories. */
#define _do_init \
    GST_DEBUG_CATEGORY_INIT (queue_debug, "queue2", 0, "queue element"); \
    GST_DEBUG_CATEGORY_INIT (queue_dataflow, "queue2_dataflow", 0, \
        "dataflow inside the queue element");
#define gst_queue2_parent_class parent_class
/* GstQueue2 derives from GstElement and is registered as element "queue2"
 * with rank GST_RANK_NONE. */
G_DEFINE_TYPE_WITH_CODE (GstQueue2, gst_queue2, GST_TYPE_ELEMENT, _do_init);
GST_ELEMENT_REGISTER_DEFINE (queue2, "queue2", GST_RANK_NONE, GST_TYPE_QUEUE2);
275
/* GObject virtual methods */
static void gst_queue2_finalize (GObject * object);

static void gst_queue2_set_property (GObject * object,
    guint prop_id, const GValue * value, GParamSpec * pspec);
static void gst_queue2_get_property (GObject * object,
    guint prop_id, GValue * value, GParamSpec * pspec);

/* data flow */
static GstFlowReturn gst_queue2_chain (GstPad * pad, GstObject * parent,
    GstBuffer * buffer);
static GstFlowReturn gst_queue2_chain_list (GstPad * pad, GstObject * parent,
    GstBufferList * buffer_list);
static GstFlowReturn gst_queue2_push_one (GstQueue2 * queue);
static void gst_queue2_loop (GstPad * pad);

/* sink pad event and query handlers */
static GstFlowReturn gst_queue2_handle_sink_event (GstPad * pad,
    GstObject * parent, GstEvent * event);
static gboolean gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
    GstQuery * query);

/* source pad and element event/query handlers */
static gboolean gst_queue2_handle_src_event (GstPad * pad, GstObject * parent,
    GstEvent * event);
static gboolean gst_queue2_handle_src_query (GstPad * pad, GstObject * parent,
    GstQuery * query);
static gboolean gst_queue2_handle_query (GstElement * element,
    GstQuery * query);

/* getrange function of the source pad */
static GstFlowReturn gst_queue2_get_range (GstPad * pad, GstObject * parent,
    guint64 offset, guint length, GstBuffer ** buffer);

/* pad activation and element state handling */
static gboolean gst_queue2_src_activate_mode (GstPad * pad, GstObject * parent,
    GstPadMode mode, gboolean active);
static gboolean gst_queue2_sink_activate_mode (GstPad * pad, GstObject * parent,
    GstPadMode mode, gboolean active);
static GstStateChangeReturn gst_queue2_change_state (GstElement * element,
    GstStateChange transition);

/* fill level predicates */
static gboolean gst_queue2_is_empty (GstQueue2 * queue);
static gboolean gst_queue2_is_filled (GstQueue2 * queue);

/* level, rate and buffering bookkeeping */
static void update_cur_level (GstQueue2 * queue, GstQueue2Range * range);
static void update_in_rates (GstQueue2 * queue, gboolean force);
static GstMessage *gst_queue2_get_buffering_message (GstQueue2 * queue,
    gint * percent);
static void update_buffering (GstQueue2 * queue);
static void gst_queue2_post_buffering (GstQueue2 * queue);
321
/* Kind of mini object stored in a GstQueue2Item */
typedef enum
{
  GST_QUEUE2_ITEM_TYPE_UNKNOWN = 0,
  GST_QUEUE2_ITEM_TYPE_BUFFER,
  GST_QUEUE2_ITEM_TYPE_BUFFER_LIST,
  GST_QUEUE2_ITEM_TYPE_EVENT,
  GST_QUEUE2_ITEM_TYPE_QUERY
} GstQueue2ItemType;

/* One entry of the queue array: a mini object tagged with its kind.
 * The array is created with sizeof (GstQueue2Item) as its element size. */
typedef struct
{
  GstQueue2ItemType type;
  GstMiniObject *item;
} GstQueue2Item;
336
337 /* static guint gst_queue2_signals[LAST_SIGNAL] = { 0 }; */
338
339 static void
gst_queue2_class_init(GstQueue2Class * klass)340 gst_queue2_class_init (GstQueue2Class * klass)
341 {
342 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
343 GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
344
345 gobject_class->set_property = gst_queue2_set_property;
346 gobject_class->get_property = gst_queue2_get_property;
347
348 /* properties */
349 obj_props[PROP_CUR_LEVEL_BYTES] = g_param_spec_uint ("current-level-bytes",
350 "Current level (kB)", "Current amount of data in the queue (bytes)",
351 0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
352 obj_props[PROP_CUR_LEVEL_BUFFERS] =
353 g_param_spec_uint ("current-level-buffers", "Current level (buffers)",
354 "Current number of buffers in the queue",
355 0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
356 obj_props[PROP_CUR_LEVEL_TIME] = g_param_spec_uint64 ("current-level-time",
357 "Current level (ns)", "Current amount of data in the queue (in ns)",
358 0, G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
359
360 obj_props[PROP_MAX_SIZE_BYTES] = g_param_spec_uint ("max-size-bytes",
361 "Max. size (kB)", "Max. amount of data in the queue (bytes, 0=disable)",
362 0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES,
363 G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | G_PARAM_STATIC_STRINGS);
364 obj_props[PROP_MAX_SIZE_BUFFERS] = g_param_spec_uint ("max-size-buffers",
365 "Max. size (buffers)", "Max. number of buffers in the queue (0=disable)",
366 0, G_MAXUINT, DEFAULT_MAX_SIZE_BUFFERS,
367 G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | G_PARAM_STATIC_STRINGS);
368 obj_props[PROP_MAX_SIZE_TIME] = g_param_spec_uint64 ("max-size-time",
369 "Max. size (ns)", "Max. amount of data in the queue (in ns, 0=disable)",
370 0, G_MAXUINT64, DEFAULT_MAX_SIZE_TIME,
371 G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | G_PARAM_STATIC_STRINGS);
372
373 obj_props[PROP_USE_BUFFERING] = g_param_spec_boolean ("use-buffering",
374 "Use buffering",
375 "Emit GST_MESSAGE_BUFFERING based on low-/high-percent thresholds "
376 "(0% = low-watermark, 100% = high-watermark)",
377 DEFAULT_USE_BUFFERING,
378 G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | G_PARAM_STATIC_STRINGS);
379 obj_props[PROP_USE_TAGS_BITRATE] = g_param_spec_boolean ("use-tags-bitrate",
380 "Use bitrate from tags",
381 "Use a bitrate from upstream tags to estimate buffer duration if not provided",
382 DEFAULT_USE_TAGS_BITRATE,
383 G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | G_PARAM_STATIC_STRINGS);
384 obj_props[PROP_USE_RATE_ESTIMATE] = g_param_spec_boolean ("use-rate-estimate",
385 "Use Rate Estimate",
386 "Estimate the bitrate of the stream to calculate time level",
387 DEFAULT_USE_RATE_ESTIMATE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
388 obj_props[PROP_LOW_PERCENT] = g_param_spec_int ("low-percent", "Low percent",
389 "Low threshold for buffering to start. Only used if use-buffering is True "
390 "(Deprecated: use low-watermark instead)",
391 0, 100, DEFAULT_LOW_WATERMARK * 100,
392 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
393 obj_props[PROP_HIGH_PERCENT] = g_param_spec_int ("high-percent",
394 "High percent",
395 "High threshold for buffering to finish. Only used if use-buffering is True "
396 "(Deprecated: use high-watermark instead)",
397 0, 100, DEFAULT_HIGH_WATERMARK * 100,
398 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
399 obj_props[PROP_LOW_WATERMARK] = g_param_spec_double ("low-watermark",
400 "Low watermark",
401 "Low threshold for buffering to start. Only used if use-buffering is True",
402 0.0, 1.0, DEFAULT_LOW_WATERMARK,
403 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
404 obj_props[PROP_HIGH_WATERMARK] = g_param_spec_double ("high-watermark",
405 "High watermark",
406 "High threshold for buffering to finish. Only used if use-buffering is True",
407 0.0, 1.0, DEFAULT_HIGH_WATERMARK,
408 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
409
410 obj_props[PROP_TEMP_TEMPLATE] = g_param_spec_string ("temp-template",
411 "Temporary File Template",
412 "File template to store temporary files in, should contain directory "
413 "and XXXXXX. (NULL == disabled)",
414 NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
415
416 obj_props[PROP_TEMP_LOCATION] = g_param_spec_string ("temp-location",
417 "Temporary File Location",
418 "Location to store temporary files in (Only read this property, "
419 "use temp-template to configure the name template)",
420 NULL, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
421
422 obj_props[PROP_USE_BITRATE_QUERY] = g_param_spec_boolean ("use-bitrate-query",
423 "Use bitrate from downstream query",
424 "Use a bitrate from a downstream query to estimate buffer duration if not provided",
425 DEFAULT_USE_BITRATE_QUERY,
426 G_PARAM_READWRITE | GST_PARAM_MUTABLE_PLAYING | G_PARAM_STATIC_STRINGS);
427
428 /**
429 * GstQueue2:temp-remove
430 *
431 * When temp-template is set, remove the temporary file when going to READY.
432 */
433 obj_props[PROP_TEMP_REMOVE] = g_param_spec_boolean ("temp-remove",
434 "Remove the Temporary File", "Remove the temp-location after use",
435 DEFAULT_TEMP_REMOVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
436
437 /**
438 * GstQueue2:ring-buffer-max-size
439 *
440 * The maximum size of the ring buffer in bytes. If set to 0, the ring
441 * buffer is disabled. Default 0.
442 */
443 obj_props[PROP_RING_BUFFER_MAX_SIZE] =
444 g_param_spec_uint64 ("ring-buffer-max-size",
445 "Max. ring buffer size (bytes)",
446 "Max. amount of data in the ring buffer (bytes, 0 = disabled)",
447 0, G_MAXUINT64, DEFAULT_RING_BUFFER_MAX_SIZE,
448 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
449
450 /**
451 * GstQueue2:avg-in-rate
452 *
453 * The average input data rate.
454 */
455 obj_props[PROP_AVG_IN_RATE] = g_param_spec_int64 ("avg-in-rate",
456 "Input data rate (bytes/s)", "Average input data rate (bytes/s)",
457 0, G_MAXINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
458
459 /**
460 * GstQueue2:bitrate
461 *
462 * The value used to convert between byte and time values for limiting
463 * the size of the queue. Values are taken from either the upstream tags
464 * or from the downstream bitrate query.
465 */
466 obj_props[PROP_BITRATE] = g_param_spec_uint64 ("bitrate", "Bitrate (bits/s)",
467 "Conversion value between data size and time",
468 0, G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
469
470 g_object_class_install_properties (gobject_class, PROP_LAST, obj_props);
471
472 /* set several parent class virtual functions */
473 gobject_class->finalize = gst_queue2_finalize;
474
475 gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
476 gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate);
477
478 gst_element_class_set_static_metadata (gstelement_class, "Queue 2",
479 "Generic",
480 "Simple data queue",
481 "Erik Walthinsen <omega@cse.ogi.edu>, "
482 "Wim Taymans <wim.taymans@gmail.com>");
483
484 gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_queue2_change_state);
485 gstelement_class->query = GST_DEBUG_FUNCPTR (gst_queue2_handle_query);
486 }
487
488 static void
gst_queue2_init(GstQueue2 * queue)489 gst_queue2_init (GstQueue2 * queue)
490 {
491 queue->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
492
493 gst_pad_set_chain_function (queue->sinkpad,
494 GST_DEBUG_FUNCPTR (gst_queue2_chain));
495 gst_pad_set_chain_list_function (queue->sinkpad,
496 GST_DEBUG_FUNCPTR (gst_queue2_chain_list));
497 gst_pad_set_activatemode_function (queue->sinkpad,
498 GST_DEBUG_FUNCPTR (gst_queue2_sink_activate_mode));
499 gst_pad_set_event_full_function (queue->sinkpad,
500 GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_event));
501 gst_pad_set_query_function (queue->sinkpad,
502 GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_query));
503 GST_PAD_SET_PROXY_CAPS (queue->sinkpad);
504 gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
505
506 queue->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
507
508 gst_pad_set_activatemode_function (queue->srcpad,
509 GST_DEBUG_FUNCPTR (gst_queue2_src_activate_mode));
510 gst_pad_set_getrange_function (queue->srcpad,
511 GST_DEBUG_FUNCPTR (gst_queue2_get_range));
512 gst_pad_set_event_function (queue->srcpad,
513 GST_DEBUG_FUNCPTR (gst_queue2_handle_src_event));
514 gst_pad_set_query_function (queue->srcpad,
515 GST_DEBUG_FUNCPTR (gst_queue2_handle_src_query));
516 GST_PAD_SET_PROXY_CAPS (queue->srcpad);
517 gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
518
519 /* levels */
520 GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
521 queue->max_level.buffers = DEFAULT_MAX_SIZE_BUFFERS;
522 queue->max_level.bytes = DEFAULT_MAX_SIZE_BYTES;
523 queue->max_level.time = DEFAULT_MAX_SIZE_TIME;
524 queue->max_level.rate_time = DEFAULT_MAX_SIZE_TIME;
525 queue->use_buffering = DEFAULT_USE_BUFFERING;
526 queue->use_rate_estimate = DEFAULT_USE_RATE_ESTIMATE;
527 queue->low_watermark = DEFAULT_LOW_WATERMARK * MAX_BUFFERING_LEVEL;
528 queue->high_watermark = DEFAULT_HIGH_WATERMARK * MAX_BUFFERING_LEVEL;
529
530 gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
531 gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
532
533 queue->sinktime = GST_CLOCK_TIME_NONE;
534 queue->srctime = GST_CLOCK_TIME_NONE;
535 queue->sink_tainted = TRUE;
536 queue->src_tainted = TRUE;
537
538 queue->srcresult = GST_FLOW_FLUSHING;
539 queue->sinkresult = GST_FLOW_FLUSHING;
540 queue->is_eos = FALSE;
541 queue->in_timer = g_timer_new ();
542 queue->out_timer = g_timer_new ();
543
544 g_mutex_init (&queue->qlock);
545 queue->waiting_add = FALSE;
546 g_cond_init (&queue->item_add);
547 queue->waiting_del = FALSE;
548 g_cond_init (&queue->item_del);
549 queue->queue = gst_queue_array_new_for_struct (sizeof (GstQueue2Item), 32);
550
551 g_cond_init (&queue->query_handled);
552 queue->last_query = FALSE;
553
554 g_mutex_init (&queue->buffering_post_lock);
555 queue->buffering_percent = 100;
556 queue->last_posted_buffering_percent = -1;
557
558 /* tempfile related */
559 queue->temp_template = NULL;
560 queue->temp_location = NULL;
561 queue->temp_remove = DEFAULT_TEMP_REMOVE;
562
563 queue->ring_buffer = NULL;
564 queue->ring_buffer_max_size = DEFAULT_RING_BUFFER_MAX_SIZE;
565
566 queue->use_bitrate_query = DEFAULT_USE_BITRATE_QUERY;
567
568 GST_DEBUG_OBJECT (queue,
569 "initialized queue's not_empty & not_full conditions");
570 }
571
572 /* called only once, as opposed to dispose */
573 static void
gst_queue2_finalize(GObject * object)574 gst_queue2_finalize (GObject * object)
575 {
576 GstQueue2 *queue = GST_QUEUE2 (object);
577 GstQueue2Item *qitem;
578
579 GST_DEBUG_OBJECT (queue, "finalizing queue");
580
581 while ((qitem = gst_queue_array_pop_head_struct (queue->queue))) {
582 if (qitem->type != GST_QUEUE2_ITEM_TYPE_QUERY)
583 gst_mini_object_unref (qitem->item);
584 }
585 gst_queue_array_free (queue->queue);
586
587 queue->last_query = FALSE;
588 g_mutex_clear (&queue->qlock);
589 g_mutex_clear (&queue->buffering_post_lock);
590 g_cond_clear (&queue->item_add);
591 g_cond_clear (&queue->item_del);
592 g_cond_clear (&queue->query_handled);
593 g_timer_destroy (queue->in_timer);
594 g_timer_destroy (queue->out_timer);
595
596 /* temp_file path cleanup */
597 g_free (queue->temp_template);
598 g_free (queue->temp_location);
599
600 G_OBJECT_CLASS (parent_class)->finalize (object);
601 }
602
603 static void
debug_ranges(GstQueue2 * queue)604 debug_ranges (GstQueue2 * queue)
605 {
606 GstQueue2Range *walk;
607
608 for (walk = queue->ranges; walk; walk = walk->next) {
609 GST_DEBUG_OBJECT (queue,
610 "range [%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "] (rb [%"
611 G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "]), reading %" G_GUINT64_FORMAT
612 " current range? %s", walk->offset, walk->writing_pos, walk->rb_offset,
613 walk->rb_writing_pos, walk->reading_pos,
614 walk == queue->current ? "**y**" : " n ");
615 }
616 }
617
618 /* clear all the downloaded ranges */
619 static void
clean_ranges(GstQueue2 * queue)620 clean_ranges (GstQueue2 * queue)
621 {
622 GST_DEBUG_OBJECT (queue, "clean queue ranges");
623
624 g_slice_free_chain (GstQueue2Range, queue->ranges, next);
625 queue->ranges = NULL;
626 queue->current = NULL;
627 }
628
629 /* find a range that contains @offset or NULL when nothing does */
630 static GstQueue2Range *
find_range(GstQueue2 * queue,guint64 offset)631 find_range (GstQueue2 * queue, guint64 offset)
632 {
633 GstQueue2Range *range = NULL;
634 GstQueue2Range *walk;
635
636 /* first do a quick check for the current range */
637 for (walk = queue->ranges; walk; walk = walk->next) {
638 if (offset >= walk->offset && offset <= walk->writing_pos) {
639 /* we can reuse an existing range */
640 range = walk;
641 break;
642 }
643 }
644 if (range) {
645 GST_DEBUG_OBJECT (queue,
646 "found range for %" G_GUINT64_FORMAT ": [%" G_GUINT64_FORMAT "-%"
647 G_GUINT64_FORMAT "]", offset, range->offset, range->writing_pos);
648 } else {
649 GST_DEBUG_OBJECT (queue, "no range for %" G_GUINT64_FORMAT, offset);
650 }
651 return range;
652 }
653
654 static void
update_cur_level(GstQueue2 * queue,GstQueue2Range * range)655 update_cur_level (GstQueue2 * queue, GstQueue2Range * range)
656 {
657 guint64 max_reading_pos, writing_pos;
658
659 writing_pos = range->writing_pos;
660 max_reading_pos = range->max_reading_pos;
661
662 if (writing_pos > max_reading_pos)
663 queue->cur_level.bytes = writing_pos - max_reading_pos;
664 else
665 queue->cur_level.bytes = 0;
666 }
667
668 /* make a new range for @offset or reuse an existing range */
669 static GstQueue2Range *
add_range(GstQueue2 * queue,guint64 offset,gboolean update_existing)670 add_range (GstQueue2 * queue, guint64 offset, gboolean update_existing)
671 {
672 GstQueue2Range *range, *prev, *next;
673
674 GST_DEBUG_OBJECT (queue, "find range for %" G_GUINT64_FORMAT, offset);
675
676 if ((range = find_range (queue, offset))) {
677 GST_DEBUG_OBJECT (queue,
678 "reusing range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, range->offset,
679 range->writing_pos);
680 if (update_existing && range->writing_pos != offset) {
681 GST_DEBUG_OBJECT (queue, "updating range writing position to "
682 "%" G_GUINT64_FORMAT, offset);
683 range->writing_pos = offset;
684 }
685 } else {
686 GST_DEBUG_OBJECT (queue,
687 "new range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, offset, offset);
688
689 range = g_slice_new0 (GstQueue2Range);
690 range->offset = offset;
691 /* we want to write to the next location in the ring buffer */
692 range->rb_offset = queue->current ? queue->current->rb_writing_pos : 0;
693 range->writing_pos = offset;
694 range->rb_writing_pos = range->rb_offset;
695 range->reading_pos = offset;
696 range->max_reading_pos = offset;
697
698 /* insert sorted */
699 prev = NULL;
700 next = queue->ranges;
701 while (next) {
702 if (next->offset > offset) {
703 /* insert before next */
704 GST_DEBUG_OBJECT (queue,
705 "insert before range %p, offset %" G_GUINT64_FORMAT, next,
706 next->offset);
707 break;
708 }
709 /* try next */
710 prev = next;
711 next = next->next;
712 }
713 range->next = next;
714 if (prev)
715 prev->next = range;
716 else
717 queue->ranges = range;
718 }
719 debug_ranges (queue);
720
721 /* update the stats for this range */
722 update_cur_level (queue, range);
723
724 return range;
725 }
726
727
728 /* clear and init the download ranges for offset 0 */
729 static void
init_ranges(GstQueue2 * queue)730 init_ranges (GstQueue2 * queue)
731 {
732 GST_DEBUG_OBJECT (queue, "init queue ranges");
733
734 /* get rid of all the current ranges */
735 clean_ranges (queue);
736 /* make a range for offset 0 */
737 queue->current = add_range (queue, 0, TRUE);
738 }
739
740 /* calculate the diff between running time on the sink and src of the queue.
741 * This is the total amount of time in the queue. */
742 static void
update_time_level(GstQueue2 * queue)743 update_time_level (GstQueue2 * queue)
744 {
745 if (queue->sink_tainted) {
746 queue->sinktime =
747 gst_segment_to_running_time (&queue->sink_segment, GST_FORMAT_TIME,
748 queue->sink_segment.position);
749 queue->sink_tainted = FALSE;
750 }
751
752 if (queue->src_tainted) {
753 queue->srctime =
754 gst_segment_to_running_time (&queue->src_segment, GST_FORMAT_TIME,
755 queue->src_segment.position);
756 queue->src_tainted = FALSE;
757 }
758
759 GST_DEBUG_OBJECT (queue, "sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT,
760 GST_TIME_ARGS (queue->sinktime), GST_TIME_ARGS (queue->srctime));
761
762 if (queue->sinktime != GST_CLOCK_TIME_NONE
763 && queue->srctime != GST_CLOCK_TIME_NONE
764 && queue->sinktime >= queue->srctime)
765 queue->cur_level.time = queue->sinktime - queue->srctime;
766 else
767 queue->cur_level.time = 0;
768 }
769
770 /* take a SEGMENT event and apply the values to segment, updating the time
771 * level of queue. */
772 static void
apply_segment(GstQueue2 * queue,GstEvent * event,GstSegment * segment,gboolean is_sink)773 apply_segment (GstQueue2 * queue, GstEvent * event, GstSegment * segment,
774 gboolean is_sink)
775 {
776 gst_event_copy_segment (event, segment);
777
778 if (segment->format == GST_FORMAT_BYTES) {
779 if (!QUEUE_IS_USING_QUEUE (queue) && is_sink) {
780 /* start is where we'll be getting from and as such writing next */
781 queue->current = add_range (queue, segment->start, TRUE);
782 }
783 }
784
785 /* now configure the values, we use these to track timestamps on the
786 * sinkpad. */
787 if (segment->format != GST_FORMAT_TIME) {
788 /* non-time format, pretend the current time segment is closed with a
789 * 0 start and unknown stop time. */
790 segment->format = GST_FORMAT_TIME;
791 segment->start = 0;
792 segment->stop = -1;
793 segment->time = 0;
794 }
795
796 GST_DEBUG_OBJECT (queue, "configured SEGMENT %" GST_SEGMENT_FORMAT, segment);
797
798 if (is_sink)
799 queue->sink_tainted = TRUE;
800 else
801 queue->src_tainted = TRUE;
802
803 /* segment can update the time level of the queue */
804 update_time_level (queue);
805 }
806
807 static void
apply_gap(GstQueue2 * queue,GstEvent * event,GstSegment * segment,gboolean is_sink)808 apply_gap (GstQueue2 * queue, GstEvent * event,
809 GstSegment * segment, gboolean is_sink)
810 {
811 GstClockTime timestamp;
812 GstClockTime duration;
813
814 gst_event_parse_gap (event, ×tamp, &duration);
815
816 if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
817
818 if (GST_CLOCK_TIME_IS_VALID (duration)) {
819 timestamp += duration;
820 }
821
822 segment->position = timestamp;
823
824 if (is_sink)
825 queue->sink_tainted = TRUE;
826 else
827 queue->src_tainted = TRUE;
828
829 /* calc diff with other end */
830 update_time_level (queue);
831 }
832 }
833
834 static void
query_downstream_bitrate(GstQueue2 * queue)835 query_downstream_bitrate (GstQueue2 * queue)
836 {
837 GstQuery *query = gst_query_new_bitrate ();
838 guint downstream_bitrate = 0;
839 gboolean changed;
840
841 if (gst_pad_peer_query (queue->srcpad, query)) {
842 gst_query_parse_bitrate (query, &downstream_bitrate);
843 GST_DEBUG_OBJECT (queue, "Got bitrate of %u from downstream",
844 downstream_bitrate);
845 } else {
846 GST_DEBUG_OBJECT (queue, "Failed to query bitrate from downstream");
847 }
848
849 gst_query_unref (query);
850
851 GST_QUEUE2_MUTEX_LOCK (queue);
852 changed = queue->downstream_bitrate != downstream_bitrate;
853 queue->downstream_bitrate = downstream_bitrate;
854 GST_QUEUE2_MUTEX_UNLOCK (queue);
855
856 if (changed) {
857 if (queue->use_buffering)
858 update_buffering (queue);
859 gst_queue2_post_buffering (queue);
860
861 g_object_notify_by_pspec (G_OBJECT (queue), obj_props[PROP_BITRATE]);
862 }
863 }
864
865 /* take a buffer and update segment, updating the time level of the queue. */
866 static void
apply_buffer(GstQueue2 * queue,GstBuffer * buffer,GstSegment * segment,guint64 size,gboolean is_sink)867 apply_buffer (GstQueue2 * queue, GstBuffer * buffer, GstSegment * segment,
868 guint64 size, gboolean is_sink)
869 {
870 GstClockTime duration, timestamp;
871
872 timestamp = GST_BUFFER_DTS_OR_PTS (buffer);
873 duration = GST_BUFFER_DURATION (buffer);
874
875 /* If we have no duration, pick one from the bitrate if we can */
876 if (duration == GST_CLOCK_TIME_NONE) {
877 if (queue->use_tags_bitrate) {
878 guint bitrate =
879 is_sink ? queue->sink_tags_bitrate : queue->src_tags_bitrate;
880 if (bitrate)
881 duration = gst_util_uint64_scale (size, 8 * GST_SECOND, bitrate);
882 }
883 if (duration == GST_CLOCK_TIME_NONE && !is_sink && queue->use_bitrate_query) {
884 if (queue->downstream_bitrate > 0) {
885 duration =
886 gst_util_uint64_scale (size, 8 * GST_SECOND,
887 queue->downstream_bitrate);
888
889 GST_LOG_OBJECT (queue, "got bitrate %u resulting in estimated "
890 "duration %" GST_TIME_FORMAT, queue->downstream_bitrate,
891 GST_TIME_ARGS (duration));
892 }
893 }
894 }
895
896 /* if no timestamp is set, assume it's continuous with the previous
897 * time */
898 if (timestamp == GST_CLOCK_TIME_NONE)
899 timestamp = segment->position;
900
901 /* add duration */
902 if (duration != GST_CLOCK_TIME_NONE)
903 timestamp += duration;
904
905 GST_DEBUG_OBJECT (queue, "position updated to %" GST_TIME_FORMAT,
906 GST_TIME_ARGS (timestamp));
907
908 segment->position = timestamp;
909
910 if (is_sink)
911 queue->sink_tainted = TRUE;
912 else
913 queue->src_tainted = TRUE;
914
915 /* calc diff with other end */
916 update_time_level (queue);
917 }
918
/* State shared between apply_buffer_list() and its per-buffer callback
 * buffer_list_apply_time(). */
struct BufListData
{
  /* running position; starts at the segment position and is advanced by
   * each buffer of the list */
  GstClockTime timestamp;
  /* bitrate used to estimate the duration of buffers that have none;
   * 0 when no bitrate is known */
  guint bitrate;
};
924
925 static gboolean
buffer_list_apply_time(GstBuffer ** buf,guint idx,gpointer data)926 buffer_list_apply_time (GstBuffer ** buf, guint idx, gpointer data)
927 {
928 struct BufListData *bld = data;
929 GstClockTime *timestamp = &bld->timestamp;
930 GstClockTime btime;
931
932 GST_TRACE ("buffer %u has pts %" GST_TIME_FORMAT " dts %" GST_TIME_FORMAT
933 " duration %" GST_TIME_FORMAT, idx,
934 GST_TIME_ARGS (GST_BUFFER_PTS (*buf)),
935 GST_TIME_ARGS (GST_BUFFER_DTS (*buf)),
936 GST_TIME_ARGS (GST_BUFFER_DURATION (*buf)));
937
938 btime = GST_BUFFER_DTS_OR_PTS (*buf);
939 if (GST_CLOCK_TIME_IS_VALID (btime))
940 *timestamp = btime;
941
942 if (GST_BUFFER_DURATION_IS_VALID (*buf))
943 *timestamp += GST_BUFFER_DURATION (*buf);
944 else if (bld->bitrate != 0) {
945 guint64 size = gst_buffer_get_size (*buf);
946
947 /* If we have no duration, pick one from the bitrate if we can */
948 *timestamp += gst_util_uint64_scale (bld->bitrate, 8 * GST_SECOND, size);
949 }
950
951
952 GST_TRACE ("ts now %" GST_TIME_FORMAT, GST_TIME_ARGS (*timestamp));
953 return TRUE;
954 }
955
956 /* take a buffer list and update segment, updating the time level of the queue */
957 static void
apply_buffer_list(GstQueue2 * queue,GstBufferList * buffer_list,GstSegment * segment,gboolean is_sink)958 apply_buffer_list (GstQueue2 * queue, GstBufferList * buffer_list,
959 GstSegment * segment, gboolean is_sink)
960 {
961 struct BufListData bld;
962
963 /* if no timestamp is set, assume it's continuous with the previous time */
964 bld.timestamp = segment->position;
965
966 bld.bitrate = 0;
967 if (queue->use_tags_bitrate) {
968 if (is_sink)
969 bld.bitrate = queue->sink_tags_bitrate;
970 else
971 bld.bitrate = queue->src_tags_bitrate;
972 }
973 if (!is_sink && bld.bitrate == 0 && queue->use_bitrate_query) {
974 bld.bitrate = queue->downstream_bitrate;
975 }
976
977 gst_buffer_list_foreach (buffer_list, buffer_list_apply_time, &bld);
978
979 GST_DEBUG_OBJECT (queue, "last_stop updated to %" GST_TIME_FORMAT,
980 GST_TIME_ARGS (bld.timestamp));
981
982 segment->position = bld.timestamp;
983
984 if (is_sink)
985 queue->sink_tainted = TRUE;
986 else
987 queue->src_tainted = TRUE;
988
989 /* calc diff with other end */
990 update_time_level (queue);
991 }
992
993 static inline gint
normalize_to_buffering_level(guint64 cur_level,guint64 max_level,guint64 alt_max)994 normalize_to_buffering_level (guint64 cur_level, guint64 max_level,
995 guint64 alt_max)
996 {
997 guint64 p;
998
999 if (max_level == 0)
1000 return 0;
1001
1002 if (alt_max > 0)
1003 p = gst_util_uint64_scale (cur_level, MAX_BUFFERING_LEVEL,
1004 MIN (max_level, alt_max));
1005 else
1006 p = gst_util_uint64_scale (cur_level, MAX_BUFFERING_LEVEL, max_level);
1007
1008 return MIN (p, MAX_BUFFERING_LEVEL);
1009 }
1010
1011 static gboolean
get_buffering_level(GstQueue2 * queue,gboolean * is_buffering,gint * buffering_level)1012 get_buffering_level (GstQueue2 * queue, gboolean * is_buffering,
1013 gint * buffering_level)
1014 {
1015 gint buflevel, buflevel2;
1016
1017 if (queue->high_watermark <= 0) {
1018 if (buffering_level)
1019 *buffering_level = MAX_BUFFERING_LEVEL;
1020 if (is_buffering)
1021 *is_buffering = FALSE;
1022 return FALSE;
1023 }
1024 #define GET_BUFFER_LEVEL_FOR_QUANTITY(format,alt_max) \
1025 normalize_to_buffering_level (queue->cur_level.format,queue->max_level.format,(alt_max))
1026
1027 if (queue->is_eos || queue->srcresult == GST_FLOW_NOT_LINKED) {
1028 /* on EOS and NOT_LINKED we are always 100% full, we set the var
1029 * here so that we can reuse the logic below to stop buffering */
1030 buflevel = MAX_BUFFERING_LEVEL;
1031 GST_LOG_OBJECT (queue, "we are %s", queue->is_eos ? "EOS" : "NOT_LINKED");
1032 } else {
1033 GST_LOG_OBJECT (queue,
1034 "Cur level bytes/time/rate-time/buffers %u/%" GST_TIME_FORMAT "/%"
1035 GST_TIME_FORMAT "/%u", queue->cur_level.bytes,
1036 GST_TIME_ARGS (queue->cur_level.time),
1037 GST_TIME_ARGS (queue->cur_level.rate_time), queue->cur_level.buffers);
1038
1039 /* figure out the buffering level we are filled, we take the max of all formats. */
1040 if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
1041 buflevel = GET_BUFFER_LEVEL_FOR_QUANTITY (bytes, 0);
1042 } else {
1043 guint64 rb_size = queue->ring_buffer_max_size;
1044 buflevel = GET_BUFFER_LEVEL_FOR_QUANTITY (bytes, rb_size);
1045 }
1046
1047 buflevel2 = GET_BUFFER_LEVEL_FOR_QUANTITY (time, 0);
1048 buflevel = MAX (buflevel, buflevel2);
1049
1050 buflevel2 = GET_BUFFER_LEVEL_FOR_QUANTITY (buffers, 0);
1051 buflevel = MAX (buflevel, buflevel2);
1052
1053 /* also apply the rate estimate when we need to */
1054 if (queue->use_rate_estimate) {
1055 buflevel2 = GET_BUFFER_LEVEL_FOR_QUANTITY (rate_time, 0);
1056 buflevel = MAX (buflevel, buflevel2);
1057 }
1058
1059 /* Don't get to 0% unless we're really empty */
1060 if (queue->cur_level.bytes > 0)
1061 buflevel = MAX (1, buflevel);
1062 }
1063 #undef GET_BUFFER_LEVEL_FOR_QUANTITY
1064
1065 if (is_buffering)
1066 *is_buffering = queue->is_buffering;
1067
1068 if (buffering_level)
1069 *buffering_level = buflevel;
1070
1071 GST_DEBUG_OBJECT (queue, "buffering %d, level %d", queue->is_buffering,
1072 buflevel);
1073
1074 return TRUE;
1075 }
1076
1077 static gint
convert_to_buffering_percent(GstQueue2 * queue,gint buffering_level)1078 convert_to_buffering_percent (GstQueue2 * queue, gint buffering_level)
1079 {
1080 int percent;
1081
1082 /* scale so that if buffering_level equals the high watermark,
1083 * the percentage is 100% */
1084 percent = buffering_level * 100 / queue->high_watermark;
1085 /* clip */
1086 if (percent > 100)
1087 percent = 100;
1088
1089 return percent;
1090 }
1091
1092 static void
get_buffering_stats(GstQueue2 * queue,gint percent,GstBufferingMode * mode,gint * avg_in,gint * avg_out,gint64 * buffering_left)1093 get_buffering_stats (GstQueue2 * queue, gint percent, GstBufferingMode * mode,
1094 gint * avg_in, gint * avg_out, gint64 * buffering_left)
1095 {
1096 if (mode) {
1097 if (!QUEUE_IS_USING_QUEUE (queue)) {
1098 if (QUEUE_IS_USING_RING_BUFFER (queue))
1099 *mode = GST_BUFFERING_TIMESHIFT;
1100 else
1101 *mode = GST_BUFFERING_DOWNLOAD;
1102 } else {
1103 *mode = GST_BUFFERING_STREAM;
1104 }
1105 }
1106
1107 if (avg_in)
1108 *avg_in = queue->byte_in_rate;
1109 if (avg_out)
1110 *avg_out = queue->byte_out_rate;
1111
1112 if (buffering_left) {
1113 *buffering_left = (percent == 100 ? 0 : -1);
1114
1115 if (queue->use_rate_estimate) {
1116 guint64 max, cur;
1117
1118 max = queue->max_level.rate_time;
1119 cur = queue->cur_level.rate_time;
1120
1121 if (percent != 100 && max > cur)
1122 *buffering_left = (max - cur) / 1000000;
1123 }
1124 }
1125 }
1126
1127 /* Called with the lock taken */
1128 static GstMessage *
gst_queue2_get_buffering_message(GstQueue2 * queue,gint * percent)1129 gst_queue2_get_buffering_message (GstQueue2 * queue, gint * percent)
1130 {
1131 GstMessage *msg = NULL;
1132 if (queue->percent_changed) {
1133 /* Don't change the buffering level if the sinkpad is waiting for
1134 * space to become available. This prevents the situation where,
1135 * upstream is pushing buffers larger than our limits so only 1 buffer
1136 * is ever in the queue at a time.
1137 * Changing the level causes a buffering message to be posted saying that
1138 * we are buffering which the application may pause to wait for another
1139 * 100% buffering message which would be posted very soon after the
1140 * waiting sink thread adds it's buffer to the queue */
1141 /* FIXME: This situation above can still occur later if
1142 * the sink pad is waiting to push a serialized event into the queue and
1143 * the queue becomes empty for a short period of time. */
1144 if (!queue->waiting_del
1145 && queue->last_posted_buffering_percent != queue->buffering_percent) {
1146 *percent = queue->buffering_percent;
1147
1148 GST_DEBUG_OBJECT (queue, "Going to post buffering: %d%%", *percent);
1149 msg = gst_message_new_buffering (GST_OBJECT_CAST (queue), *percent);
1150
1151 gst_message_set_buffering_stats (msg, queue->mode, queue->avg_in,
1152 queue->avg_out, queue->buffering_left);
1153 }
1154 }
1155
1156 return msg;
1157 }
1158
1159 static void
gst_queue2_post_buffering(GstQueue2 * queue)1160 gst_queue2_post_buffering (GstQueue2 * queue)
1161 {
1162 GstMessage *msg = NULL;
1163 gint percent = -1;
1164
1165 g_mutex_lock (&queue->buffering_post_lock);
1166 GST_QUEUE2_MUTEX_LOCK (queue);
1167 msg = gst_queue2_get_buffering_message (queue, &percent);
1168 GST_QUEUE2_MUTEX_UNLOCK (queue);
1169
1170 if (msg != NULL) {
1171 if (gst_element_post_message (GST_ELEMENT_CAST (queue), msg)) {
1172 GST_QUEUE2_MUTEX_LOCK (queue);
1173 /* Set these states only if posting the message succeeded. Otherwise,
1174 * this post attempt failed, and the next one won't be done, because
1175 * gst_queue2_get_buffering_message() checks these states and decides
1176 * based on their values that it won't produce a message. */
1177 queue->last_posted_buffering_percent = percent;
1178 if (percent == queue->buffering_percent)
1179 queue->percent_changed = FALSE;
1180 GST_QUEUE2_MUTEX_UNLOCK (queue);
1181 GST_DEBUG_OBJECT (queue, "successfully posted %d%% buffering message",
1182 percent);
1183 } else
1184 GST_DEBUG_OBJECT (queue, "could not post buffering message");
1185 }
1186
1187 g_mutex_unlock (&queue->buffering_post_lock);
1188 }
1189
1190 static void
update_buffering(GstQueue2 * queue)1191 update_buffering (GstQueue2 * queue)
1192 {
1193 gint buffering_level, percent;
1194
1195 /* Ensure the variables used to calculate buffering state are up-to-date. */
1196 if (queue->current)
1197 update_cur_level (queue, queue->current);
1198 update_in_rates (queue, FALSE);
1199
1200 if (!get_buffering_level (queue, NULL, &buffering_level))
1201 return;
1202
1203 percent = convert_to_buffering_percent (queue, buffering_level);
1204
1205 if (queue->is_buffering) {
1206 /* if we were buffering see if we reached the high watermark */
1207 if (percent >= 100)
1208 queue->is_buffering = FALSE;
1209
1210 SET_PERCENT (queue, percent);
1211 } else {
1212 /* we were not buffering, check if we need to start buffering if we drop
1213 * below the low threshold */
1214 if (buffering_level < queue->low_watermark) {
1215 queue->is_buffering = TRUE;
1216 SET_PERCENT (queue, percent);
1217 }
1218 }
1219 }
1220
1221 static void
reset_rate_timer(GstQueue2 * queue)1222 reset_rate_timer (GstQueue2 * queue)
1223 {
1224 queue->bytes_in = 0;
1225 queue->bytes_out = 0;
1226 queue->byte_in_rate = 0.0;
1227 queue->byte_in_period = 0;
1228 queue->byte_out_rate = 0.0;
1229 queue->last_update_in_rates_elapsed = 0.0;
1230 queue->last_in_elapsed = 0.0;
1231 queue->last_out_elapsed = 0.0;
1232 queue->in_timer_started = FALSE;
1233 queue->out_timer_started = FALSE;
1234 }
1235
/* the interval in seconds to recalculate the rate */
#define RATE_INTERVAL    0.2
/* Tuning for rate estimation. We use a large window for the input rate because
 * it should be stable when connected to a network. The output rate is less
 * stable (the elements preroll, queues behind a demuxer fill, ...) and should
 * therefore adapt more quickly.
 * However, initial input rate may be subject to a burst, and should therefore
 * initially also adapt more quickly to changes, and only later on give higher
 * weight to previous values.
 *
 * AVG_IN is the average of avg and val weighted by w1 and w2, AVG_OUT gives
 * the previous average three times the weight of the new value. Both
 * expansions are fully parenthesized so that they can be used safely inside
 * larger expressions. */
#define AVG_IN(avg,val,w1,w2)  (((avg) * (w1) + (val) * (w2)) / ((w1) + (w2)))
#define AVG_OUT(avg,val)       (((avg) * 3.0 + (val)) / 4.0)
1247
1248 static void
update_in_rates(GstQueue2 * queue,gboolean force)1249 update_in_rates (GstQueue2 * queue, gboolean force)
1250 {
1251 gdouble elapsed, period;
1252 gdouble byte_in_rate;
1253
1254 if (!queue->in_timer_started) {
1255 queue->in_timer_started = TRUE;
1256 g_timer_start (queue->in_timer);
1257 return;
1258 }
1259
1260 queue->last_update_in_rates_elapsed = elapsed =
1261 g_timer_elapsed (queue->in_timer, NULL);
1262
1263 /* recalc after each interval. */
1264 if (force || queue->last_in_elapsed + RATE_INTERVAL < elapsed) {
1265 period = elapsed - queue->last_in_elapsed;
1266
1267 GST_DEBUG_OBJECT (queue,
1268 "rates: period %f, in %" G_GUINT64_FORMAT ", global period %f",
1269 period, queue->bytes_in, queue->byte_in_period);
1270
1271 byte_in_rate = queue->bytes_in / period;
1272
1273 if (queue->byte_in_rate == 0.0)
1274 queue->byte_in_rate = byte_in_rate;
1275 else
1276 queue->byte_in_rate = AVG_IN (queue->byte_in_rate, byte_in_rate,
1277 (double) queue->byte_in_period, period);
1278
1279 /* another data point, cap at 16 for long time running average */
1280 if (queue->byte_in_period < 16 * RATE_INTERVAL)
1281 queue->byte_in_period += period;
1282
1283 /* reset the values to calculate rate over the next interval */
1284 queue->last_in_elapsed = elapsed;
1285 queue->bytes_in = 0;
1286 }
1287
1288 if (queue->use_bitrate_query && queue->downstream_bitrate > 0) {
1289 queue->cur_level.rate_time =
1290 gst_util_uint64_scale (8 * queue->cur_level.bytes, GST_SECOND,
1291 queue->downstream_bitrate);
1292 GST_LOG_OBJECT (queue,
1293 "got bitrate %u with byte level %u resulting in time %"
1294 GST_TIME_FORMAT, queue->downstream_bitrate, queue->cur_level.bytes,
1295 GST_TIME_ARGS (queue->cur_level.rate_time));
1296 } else if (queue->byte_in_rate > 0.0) {
1297 queue->cur_level.rate_time =
1298 queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
1299 }
1300 GST_DEBUG_OBJECT (queue, "rates: in %f, time %" GST_TIME_FORMAT,
1301 queue->byte_in_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
1302 }
1303
1304 static void
update_out_rates(GstQueue2 * queue)1305 update_out_rates (GstQueue2 * queue)
1306 {
1307 gdouble elapsed, period;
1308 gdouble byte_out_rate;
1309
1310 if (!queue->out_timer_started) {
1311 queue->out_timer_started = TRUE;
1312 g_timer_start (queue->out_timer);
1313 return;
1314 }
1315
1316 elapsed = g_timer_elapsed (queue->out_timer, NULL);
1317
1318 /* recalc after each interval. */
1319 if (queue->last_out_elapsed + RATE_INTERVAL < elapsed) {
1320 period = elapsed - queue->last_out_elapsed;
1321
1322 GST_DEBUG_OBJECT (queue,
1323 "rates: period %f, out %" G_GUINT64_FORMAT, period, queue->bytes_out);
1324
1325 byte_out_rate = queue->bytes_out / period;
1326
1327 if (queue->byte_out_rate == 0.0)
1328 queue->byte_out_rate = byte_out_rate;
1329 else
1330 queue->byte_out_rate = AVG_OUT (queue->byte_out_rate, byte_out_rate);
1331
1332 /* reset the values to calculate rate over the next interval */
1333 queue->last_out_elapsed = elapsed;
1334 queue->bytes_out = 0;
1335 }
1336 if (queue->byte_in_rate > 0.0) {
1337 queue->cur_level.rate_time =
1338 queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
1339 }
1340 GST_DEBUG_OBJECT (queue, "rates: out %f, time %" GST_TIME_FORMAT,
1341 queue->byte_out_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
1342 }
1343
1344 static void
update_cur_pos(GstQueue2 * queue,GstQueue2Range * range,guint64 pos)1345 update_cur_pos (GstQueue2 * queue, GstQueue2Range * range, guint64 pos)
1346 {
1347 guint64 reading_pos, max_reading_pos;
1348
1349 reading_pos = pos;
1350 max_reading_pos = range->max_reading_pos;
1351
1352 max_reading_pos = MAX (max_reading_pos, reading_pos);
1353
1354 GST_DEBUG_OBJECT (queue,
1355 "updating max_reading_pos from %" G_GUINT64_FORMAT " to %"
1356 G_GUINT64_FORMAT, range->max_reading_pos, max_reading_pos);
1357 range->max_reading_pos = max_reading_pos;
1358
1359 update_cur_level (queue, range);
1360 }
1361
1362 static gboolean
perform_seek_to_offset(GstQueue2 * queue,guint64 offset)1363 perform_seek_to_offset (GstQueue2 * queue, guint64 offset)
1364 {
1365 GstEvent *event;
1366 gboolean res;
1367
1368 /* until we receive the FLUSH_STOP from this seek, we skip data */
1369 queue->seeking = TRUE;
1370 GST_QUEUE2_MUTEX_UNLOCK (queue);
1371
1372 debug_ranges (queue);
1373
1374 GST_DEBUG_OBJECT (queue, "Seeking to %" G_GUINT64_FORMAT, offset);
1375
1376 event =
1377 gst_event_new_seek (1.0, GST_FORMAT_BYTES,
1378 GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, GST_SEEK_TYPE_SET, offset,
1379 GST_SEEK_TYPE_NONE, -1);
1380
1381 res = gst_pad_push_event (queue->sinkpad, event);
1382 GST_QUEUE2_MUTEX_LOCK (queue);
1383
1384 if (res) {
1385 /* Between us sending the seek event and re-acquiring the lock, the source
1386 * thread might already have pushed data and moved along the range's
1387 * writing_pos beyond the seek offset. In that case we don't want to set
1388 * the writing position back to the requested seek position, as it would
1389 * cause data to be written to the wrong offset in the file or ring buffer.
1390 * We still do the add_range call to switch the current range to the
1391 * requested range, or create one if one doesn't exist yet. */
1392 queue->current = add_range (queue, offset, FALSE);
1393 }
1394
1395 return res;
1396 }
1397
1398 /* get the threshold for when we decide to seek rather than wait */
1399 static guint64
get_seek_threshold(GstQueue2 * queue)1400 get_seek_threshold (GstQueue2 * queue)
1401 {
1402 guint64 threshold;
1403
1404 /* FIXME, find a good threshold based on the incoming rate. */
1405 threshold = 1024 * 512;
1406
1407 if (QUEUE_IS_USING_RING_BUFFER (queue)) {
1408 threshold = MIN (threshold,
1409 QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes);
1410 }
1411 return threshold;
1412 }
1413
1414 /* see if there is enough data in the file to read a full buffer */
1415 static gboolean
gst_queue2_have_data(GstQueue2 * queue,guint64 offset,guint length)1416 gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
1417 {
1418 GstQueue2Range *range;
1419
1420 GST_DEBUG_OBJECT (queue, "looking for offset %" G_GUINT64_FORMAT ", len %u",
1421 offset, length);
1422
1423 if ((range = find_range (queue, offset))) {
1424 if (queue->current != range) {
1425 GST_DEBUG_OBJECT (queue, "switching ranges, do seek to range position");
1426 perform_seek_to_offset (queue, range->writing_pos);
1427 }
1428
1429 GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %" G_GUINT64_FORMAT ")",
1430 queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
1431
1432 /* we have a range for offset */
1433 GST_DEBUG_OBJECT (queue,
1434 "we have a range %p, offset %" G_GUINT64_FORMAT ", writing_pos %"
1435 G_GUINT64_FORMAT, range, range->offset, range->writing_pos);
1436
1437 if (!QUEUE_IS_USING_RING_BUFFER (queue) && queue->is_eos)
1438 return TRUE;
1439
1440 if (offset + length <= range->writing_pos)
1441 return TRUE;
1442 else
1443 GST_DEBUG_OBJECT (queue,
1444 "Need more data (%" G_GUINT64_FORMAT " bytes more)",
1445 (offset + length) - range->writing_pos);
1446
1447 } else {
1448 GST_INFO_OBJECT (queue, "not found in any range off %" G_GUINT64_FORMAT
1449 " len %u", offset, length);
1450 /* we don't have the range, see how far away we are */
1451 if (!queue->is_eos && queue->current) {
1452 guint64 threshold = get_seek_threshold (queue);
1453
1454 if (offset >= queue->current->offset && offset <=
1455 queue->current->writing_pos + threshold) {
1456 GST_INFO_OBJECT (queue,
1457 "requested data is within range, wait for data");
1458 return FALSE;
1459 }
1460 }
1461
1462 /* too far away, do a seek */
1463 perform_seek_to_offset (queue, offset);
1464 }
1465
1466 return FALSE;
1467 }
1468
/* FSEEK_FILE (file, offset): position @file at the absolute byte @offset.
 * Evaluates to non-zero when the seek FAILED, whichever of the underlying
 * calls is used. The arguments are parenthesized so that expressions can be
 * passed safely (the off_t cast applies to the whole offset expression). */
#ifdef HAVE_FSEEKO
#define FSEEK_FILE(file,offset)  (fseeko ((file), (off_t) (offset), SEEK_SET) != 0)
#elif defined (G_OS_UNIX) || defined (G_OS_WIN32)
#define FSEEK_FILE(file,offset)  (lseek (fileno (file), (off_t) (offset), SEEK_SET) == (off_t) -1)
#else
#define FSEEK_FILE(file,offset)  (fseek ((file), (offset), SEEK_SET) != 0)
#endif
1476
1477 static GstFlowReturn
gst_queue2_read_data_at_offset(GstQueue2 * queue,guint64 offset,guint length,guint8 * dst,gint64 * read_return)1478 gst_queue2_read_data_at_offset (GstQueue2 * queue, guint64 offset, guint length,
1479 guint8 * dst, gint64 * read_return)
1480 {
1481 guint8 *ring_buffer;
1482 size_t res;
1483
1484 ring_buffer = queue->ring_buffer;
1485
1486 if (QUEUE_IS_USING_TEMP_FILE (queue) && FSEEK_FILE (queue->temp_file, offset))
1487 goto seek_failed;
1488
1489 /* this should not block */
1490 GST_LOG_OBJECT (queue, "Reading %d bytes from offset %" G_GUINT64_FORMAT,
1491 length, offset);
1492 if (QUEUE_IS_USING_TEMP_FILE (queue)) {
1493 res = fread (dst, 1, length, queue->temp_file);
1494 } else {
1495 memcpy (dst, ring_buffer + offset, length);
1496 res = length;
1497 }
1498
1499 GST_LOG_OBJECT (queue, "read %" G_GSIZE_FORMAT " bytes", res);
1500
1501 if (G_UNLIKELY (res < length)) {
1502 if (!QUEUE_IS_USING_TEMP_FILE (queue))
1503 goto could_not_read;
1504 /* check for errors or EOF */
1505 if (ferror (queue->temp_file))
1506 goto could_not_read;
1507 if (feof (queue->temp_file) && length > 0)
1508 goto eos;
1509 }
1510
1511 *read_return = res;
1512
1513 return GST_FLOW_OK;
1514
1515 seek_failed:
1516 {
1517 GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
1518 return GST_FLOW_ERROR;
1519 }
1520 could_not_read:
1521 {
1522 GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL), GST_ERROR_SYSTEM);
1523 return GST_FLOW_ERROR;
1524 }
1525 eos:
1526 {
1527 GST_DEBUG ("non-regular file hits EOS");
1528 return GST_FLOW_EOS;
1529 }
1530 }
1531
/* Read @length bytes starting at byte position @offset from the temp file
 * or ring buffer into a buffer.
 *
 * When *@buffer is NULL a new buffer of @length bytes is allocated,
 * otherwise the data is written into the buffer provided in *@buffer. As
 * long as the requested data is not available, the function waits for more
 * data to be added. On success the buffer is resized to the number of bytes
 * that are returned (which is less than requested when EOS cuts the read
 * short with a ring buffer), its offsets are set and it is stored in
 * *@buffer.
 *
 * Returns GST_FLOW_OK on success, GST_FLOW_EOS when EOS was reached with no
 * data left to read, GST_FLOW_FLUSHING when the out_flushing label is
 * reached from the wait, the flow return of the failing read, or
 * GST_FLOW_ERROR when the buffer can't be mapped. A buffer allocated here is
 * released again on every error path.
 *
 * NOTE(review): presumably has to be called with the queue lock held, as
 * gst_queue2_have_data() can end up in perform_seek_to_offset(), which
 * releases and re-takes that lock - confirm against the callers. */
static GstFlowReturn
gst_queue2_create_read (GstQueue2 * queue, guint64 offset, guint length,
    GstBuffer ** buffer)
{
  GstBuffer *buf;
  GstMapInfo info;
  guint8 *data;
  guint64 file_offset;
  guint block_length, remaining, read_length;
  guint64 rb_size;
  guint64 max_size;
  guint64 rpos;
  GstFlowReturn ret = GST_FLOW_OK;

  /* allocate the output buffer of the requested size */
  if (*buffer == NULL)
    buf = gst_buffer_new_allocate (NULL, length, NULL);
  else
    buf = *buffer;

  if (!gst_buffer_map (buf, &info, GST_MAP_WRITE))
    goto buffer_write_fail;
  /* data is the write cursor into the mapped output buffer */
  data = info.data;

  GST_DEBUG_OBJECT (queue, "Reading %u bytes from %" G_GUINT64_FORMAT, length,
      offset);

  /* rpos is the stream position of the next byte to read */
  rpos = offset;
  rb_size = queue->ring_buffer_max_size;
  max_size = QUEUE_MAX_BYTES (queue);

  /* one iteration per chunk of data that is (or becomes) available */
  remaining = length;
  while (remaining > 0) {
    /* configure how much/whether to read */
    if (!gst_queue2_have_data (queue, rpos, remaining)) {
      read_length = 0;

      if (QUEUE_IS_USING_RING_BUFFER (queue)) {
        guint64 level;

        /* calculate how far away the offset is */
        if (queue->current->writing_pos > rpos)
          level = queue->current->writing_pos - rpos;
        else
          level = 0;

        GST_DEBUG_OBJECT (queue,
            "reading %" G_GUINT64_FORMAT ", writing %" G_GUINT64_FORMAT
            ", level %" G_GUINT64_FORMAT ", max %" G_GUINT64_FORMAT,
            rpos, queue->current->writing_pos, level, max_size);

        if (level >= max_size) {
          /* we don't have the data but if we have a ring buffer that is full, we
           * need to read */
          GST_DEBUG_OBJECT (queue,
              "ring buffer full, reading QUEUE_MAX_BYTES %"
              G_GUINT64_FORMAT " bytes", max_size);
          read_length = max_size;
        } else if (queue->is_eos) {
          /* won't get any more data so read any data we have */
          if (level) {
            GST_DEBUG_OBJECT (queue,
                "EOS hit but read %" G_GUINT64_FORMAT " bytes that we have",
                level);
#ifdef OHOS_OPT_STABLE
            // ohos.opt.stable.0002
            // Fix a heap overflow. By the time gst_queue2_have_data() has
            // returned, the queue may hold enough data to fill the output
            // buffer, or even much more than that, which would overflow the
            // heap in the copy below.
            //
            // The root cause is that the lock is released before the seek and
            // re-acquired after it, so the conclusion reached before the lock
            // was released (there is not enough data in the queue to fill the
            // output buffer) no longer has to hold.
            //
            // Therefore the amount of data to copy is limited to what is
            // still missing in the output buffer.

            read_length = MIN (level, remaining);
            remaining = read_length;
            length = read_length;
#else
            /* return only what is left; length is shrunk as well so that
             * the buffer is resized to it at the end */
            read_length = level;
            remaining = level;
            length = level;
#endif
          } else
            goto hit_eos;
        }
      }

      /* nothing can be read yet: publish our position (which may free up
       * ring buffer space for the writer) and wait for data to be added */
      if (read_length == 0) {
        if (QUEUE_IS_USING_RING_BUFFER (queue)) {
          GST_DEBUG_OBJECT (queue,
              "update current position [%" G_GUINT64_FORMAT "-%"
              G_GUINT64_FORMAT "]", rpos, queue->current->max_reading_pos);
          update_cur_pos (queue, queue->current, rpos);
          GST_QUEUE2_SIGNAL_DEL (queue);
        }

        if (queue->use_buffering)
          update_buffering (queue);

        GST_DEBUG_OBJECT (queue, "waiting for add");
        /* presumably jumps to out_flushing when srcresult indicates that we
         * are flushing - see the macro definition */
        GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
        continue;
      }
    } else {
      /* we have the requested data so read it */
      read_length = remaining;
    }

    /* set range reading_pos to actual reading position for this read */
    queue->current->reading_pos = rpos;

    /* configure how much and from where to read */
    if (QUEUE_IS_USING_RING_BUFFER (queue)) {
      /* translate the stream position into an index in the ring buffer */
      file_offset =
          (queue->current->rb_offset + (rpos -
              queue->current->offset)) % rb_size;
      /* a read that crosses the end of the ring buffer is split: the first
       * block runs up to the end, the rest starts again at index 0 */
      if (file_offset + read_length > rb_size) {
        block_length = rb_size - file_offset;
      } else {
        block_length = read_length;
      }
    } else {
      file_offset = rpos;
      block_length = read_length;
    }

    /* while we still have data to read, we loop */
    while (read_length > 0) {
      gint64 read_return;

      ret =
          gst_queue2_read_data_at_offset (queue, file_offset, block_length,
          data, &read_return);
      if (ret != GST_FLOW_OK)
        goto read_error;

      /* advance the source position, wrapping around in the ring buffer */
      file_offset += read_return;
      if (QUEUE_IS_USING_RING_BUFFER (queue))
        file_offset %= rb_size;

      data += read_return;
      read_length -= read_return;
      /* whatever is left of this chunk is requested in the next block */
      block_length = read_length;
      remaining -= read_return;

      rpos = (queue->current->reading_pos += read_return);
      update_cur_pos (queue, queue->current, queue->current->reading_pos);
    }
    GST_QUEUE2_SIGNAL_DEL (queue);
    GST_DEBUG_OBJECT (queue, "%u bytes left to read", remaining);
  }

  gst_buffer_unmap (buf, &info);
  /* length may have been reduced above when EOS cut the read short */
  gst_buffer_resize (buf, 0, length);

  GST_BUFFER_OFFSET (buf) = offset;
  GST_BUFFER_OFFSET_END (buf) = offset + length;

  *buffer = buf;

  return ret;

  /* ERRORS */
  /* *buffer is only written on success, so "*buffer == NULL" below means
   * that buf was allocated by us and has to be released again */
hit_eos:
  {
    GST_DEBUG_OBJECT (queue, "EOS hit and we don't have any requested data");
    gst_buffer_unmap (buf, &info);
    if (*buffer == NULL)
      gst_buffer_unref (buf);
    return GST_FLOW_EOS;
  }
out_flushing:
  {
    GST_DEBUG_OBJECT (queue, "we are flushing");
    gst_buffer_unmap (buf, &info);
    if (*buffer == NULL)
      gst_buffer_unref (buf);
    return GST_FLOW_FLUSHING;
  }
read_error:
  {
    GST_DEBUG_OBJECT (queue, "we have a read error");
    gst_buffer_unmap (buf, &info);
    if (*buffer == NULL)
      gst_buffer_unref (buf);
    return ret;
  }
buffer_write_fail:
  {
    /* mapping failed, so there is nothing to unmap here */
    GST_ELEMENT_ERROR (queue, RESOURCE, WRITE, (NULL),
        ("Can't write to buffer"));
    if (*buffer == NULL)
      gst_buffer_unref (buf);
    return GST_FLOW_ERROR;
  }
}
1733
1734 /* should be called with QUEUE_LOCK */
1735 static GstMiniObject *
gst_queue2_read_item_from_file(GstQueue2 * queue)1736 gst_queue2_read_item_from_file (GstQueue2 * queue)
1737 {
1738 GstMiniObject *item;
1739
1740 if (queue->stream_start_event != NULL) {
1741 item = GST_MINI_OBJECT_CAST (queue->stream_start_event);
1742 queue->stream_start_event = NULL;
1743 } else if (queue->starting_segment != NULL) {
1744 item = GST_MINI_OBJECT_CAST (queue->starting_segment);
1745 queue->starting_segment = NULL;
1746 } else {
1747 GstFlowReturn ret;
1748 GstBuffer *buffer = NULL;
1749 guint64 reading_pos;
1750
1751 reading_pos = queue->current->reading_pos;
1752
1753 ret =
1754 gst_queue2_create_read (queue, reading_pos, DEFAULT_BUFFER_SIZE,
1755 &buffer);
1756
1757 switch (ret) {
1758 case GST_FLOW_OK:
1759 item = GST_MINI_OBJECT_CAST (buffer);
1760 break;
1761 case GST_FLOW_EOS:
1762 item = GST_MINI_OBJECT_CAST (gst_event_new_eos ());
1763 break;
1764 default:
1765 item = NULL;
1766 break;
1767 }
1768 }
1769 return item;
1770 }
1771
1772 /* must be called with MUTEX_LOCK. Will briefly release the lock when notifying
1773 * the temp filename. */
1774 static gboolean
gst_queue2_open_temp_location_file(GstQueue2 * queue)1775 gst_queue2_open_temp_location_file (GstQueue2 * queue)
1776 {
1777 gint fd = -1;
1778 gchar *name = NULL;
1779
1780 if (queue->temp_file)
1781 goto already_opened;
1782
1783 GST_DEBUG_OBJECT (queue, "opening temp file %s", queue->temp_template);
1784
1785 /* If temp_template was set, allocate a filename and open that file */
1786
1787 /* nothing to do */
1788 if (queue->temp_template == NULL)
1789 goto no_directory;
1790
1791 /* make copy of the template, we don't want to change this */
1792 name = g_strdup (queue->temp_template);
1793
1794 #ifdef __BIONIC__
1795 fd = g_mkstemp_full (name, O_RDWR | O_LARGEFILE, S_IRUSR | S_IWUSR);
1796 #else
1797 fd = g_mkstemp (name);
1798 #endif
1799
1800 if (fd == -1)
1801 goto mkstemp_failed;
1802
1803 /* open the file for update/writing */
1804 queue->temp_file = fdopen (fd, "wb+");
1805 /* error creating file */
1806 if (queue->temp_file == NULL)
1807 goto open_failed;
1808
1809 g_free (queue->temp_location);
1810 queue->temp_location = name;
1811
1812 GST_QUEUE2_MUTEX_UNLOCK (queue);
1813
1814 /* we can't emit the notify with the lock */
1815 g_object_notify_by_pspec (G_OBJECT (queue), obj_props[PROP_TEMP_LOCATION]);
1816
1817 GST_QUEUE2_MUTEX_LOCK (queue);
1818
1819 GST_DEBUG_OBJECT (queue, "opened temp file %s", queue->temp_template);
1820
1821 return TRUE;
1822
1823 /* ERRORS */
1824 already_opened:
1825 {
1826 GST_DEBUG_OBJECT (queue, "temp file was already open");
1827 return TRUE;
1828 }
1829 no_directory:
1830 {
1831 GST_ELEMENT_ERROR (queue, RESOURCE, NOT_FOUND,
1832 (_("No Temp directory specified.")), (NULL));
1833 return FALSE;
1834 }
1835 mkstemp_failed:
1836 {
1837 GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ,
1838 (_("Could not create temp file \"%s\"."), queue->temp_template),
1839 GST_ERROR_SYSTEM);
1840 g_free (name);
1841 return FALSE;
1842 }
1843 open_failed:
1844 {
1845 GST_ELEMENT_ERROR (queue, RESOURCE, OPEN_READ,
1846 (_("Could not open file \"%s\" for reading."), name), GST_ERROR_SYSTEM);
1847 g_free (name);
1848 if (fd != -1)
1849 close (fd);
1850 return FALSE;
1851 }
1852 }
1853
1854 static void
gst_queue2_close_temp_location_file(GstQueue2 * queue)1855 gst_queue2_close_temp_location_file (GstQueue2 * queue)
1856 {
1857 /* nothing to do */
1858 if (queue->temp_file == NULL)
1859 return;
1860
1861 GST_DEBUG_OBJECT (queue, "closing temp file");
1862
1863 fflush (queue->temp_file);
1864 fclose (queue->temp_file);
1865
1866 if (queue->temp_remove) {
1867 if (remove (queue->temp_location) < 0) {
1868 GST_WARNING_OBJECT (queue, "Failed to remove temporary file %s: %s",
1869 queue->temp_location, g_strerror (errno));
1870 }
1871 }
1872
1873 queue->temp_file = NULL;
1874 clean_ranges (queue);
1875 }
1876
1877 static void
gst_queue2_flush_temp_file(GstQueue2 * queue)1878 gst_queue2_flush_temp_file (GstQueue2 * queue)
1879 {
1880 if (queue->temp_file == NULL)
1881 return;
1882
1883 GST_DEBUG_OBJECT (queue, "flushing temp file");
1884
1885 queue->temp_file = g_freopen (queue->temp_location, "wb+", queue->temp_file);
1886 }
1887
1888 static void
gst_queue2_locked_flush(GstQueue2 * queue,gboolean full,gboolean clear_temp)1889 gst_queue2_locked_flush (GstQueue2 * queue, gboolean full, gboolean clear_temp)
1890 {
1891 if (!QUEUE_IS_USING_QUEUE (queue)) {
1892 if (QUEUE_IS_USING_TEMP_FILE (queue) && clear_temp)
1893 gst_queue2_flush_temp_file (queue);
1894 init_ranges (queue);
1895 } else {
1896 GstQueue2Item *qitem;
1897
1898 while ((qitem = gst_queue_array_pop_head_struct (queue->queue))) {
1899 if (!full && qitem->type == GST_QUEUE2_ITEM_TYPE_EVENT
1900 && GST_EVENT_IS_STICKY (qitem->item)
1901 && GST_EVENT_TYPE (qitem->item) != GST_EVENT_SEGMENT
1902 && GST_EVENT_TYPE (qitem->item) != GST_EVENT_EOS) {
1903 gst_pad_store_sticky_event (queue->srcpad,
1904 GST_EVENT_CAST (qitem->item));
1905 }
1906
1907 /* Then lose another reference because we are supposed to destroy that
1908 data when flushing */
1909 if (qitem->type != GST_QUEUE2_ITEM_TYPE_QUERY)
1910 gst_mini_object_unref (qitem->item);
1911 }
1912 }
1913 queue->last_query = FALSE;
1914 g_cond_signal (&queue->query_handled);
1915 GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
1916 gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
1917 gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
1918 queue->sinktime = queue->srctime = GST_CLOCK_TIME_NONE;
1919 queue->sink_tainted = queue->src_tainted = TRUE;
1920 if (queue->starting_segment != NULL)
1921 gst_event_unref (queue->starting_segment);
1922 queue->starting_segment = NULL;
1923 queue->segment_event_received = FALSE;
1924 gst_event_replace (&queue->stream_start_event, NULL);
1925
1926 /* we deleted a lot of something */
1927 GST_QUEUE2_SIGNAL_DEL (queue);
1928 }
1929
1930 static gboolean
gst_queue2_wait_free_space(GstQueue2 * queue)1931 gst_queue2_wait_free_space (GstQueue2 * queue)
1932 {
1933 /* We make space available if we're "full" according to whatever
1934 * the user defined as "full". */
1935 if (gst_queue2_is_filled (queue)) {
1936 gboolean started;
1937
1938 /* pause the timer while we wait. The fact that we are waiting does not mean
1939 * the byterate on the input pad is lower */
1940 if ((started = queue->in_timer_started))
1941 g_timer_stop (queue->in_timer);
1942
1943 GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
1944 "queue is full, waiting for free space");
1945 do {
1946 /* Wait for space to be available, we could be unlocked because of a flush. */
1947 GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
1948 }
1949 while (gst_queue2_is_filled (queue));
1950
1951 /* and continue if we were running before */
1952 if (started)
1953 g_timer_continue (queue->in_timer);
1954 }
1955 return TRUE;
1956
1957 /* ERRORS */
1958 out_flushing:
1959 {
1960 GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "queue is flushing");
1961 return FALSE;
1962 }
1963 }
1964
1965 static gboolean
gst_queue2_create_write(GstQueue2 * queue,GstBuffer * buffer)1966 gst_queue2_create_write (GstQueue2 * queue, GstBuffer * buffer)
1967 {
1968 GstMapInfo info;
1969 guint8 *data, *ring_buffer;
1970 guint size, rb_size;
1971 guint64 writing_pos, new_writing_pos;
1972 GstQueue2Range *range, *prev, *next;
1973 gboolean do_seek = FALSE;
1974
1975 if (QUEUE_IS_USING_RING_BUFFER (queue))
1976 writing_pos = queue->current->rb_writing_pos;
1977 else
1978 writing_pos = queue->current->writing_pos;
1979 ring_buffer = queue->ring_buffer;
1980 rb_size = queue->ring_buffer_max_size;
1981
1982 if (!gst_buffer_map (buffer, &info, GST_MAP_READ))
1983 goto buffer_read_error;
1984
1985 size = info.size;
1986 data = info.data;
1987
1988 GST_DEBUG_OBJECT (queue, "Writing %u bytes to %" G_GUINT64_FORMAT, size,
1989 writing_pos);
1990
1991 /* sanity check */
1992 if (GST_BUFFER_OFFSET_IS_VALID (buffer) &&
1993 GST_BUFFER_OFFSET (buffer) != queue->current->writing_pos) {
1994 GST_WARNING_OBJECT (queue, "buffer offset does not match current writing "
1995 "position! %" G_GINT64_FORMAT " != %" G_GINT64_FORMAT,
1996 GST_BUFFER_OFFSET (buffer), queue->current->writing_pos);
1997 }
1998
1999 while (size > 0) {
2000 guint to_write;
2001
2002 if (QUEUE_IS_USING_RING_BUFFER (queue)) {
2003 gint64 space;
2004
2005 /* calculate the space in the ring buffer not used by data from
2006 * the current range */
2007 while (QUEUE_MAX_BYTES (queue) <= queue->cur_level.bytes) {
2008 /* wait until there is some free space */
2009 GST_QUEUE2_WAIT_DEL_CHECK (queue, queue->sinkresult, out_flushing);
2010 }
2011 /* get the amount of space we have */
2012 space = QUEUE_MAX_BYTES (queue) - queue->cur_level.bytes;
2013
2014 /* calculate if we need to split or if we can write the entire
2015 * buffer now */
2016 to_write = MIN (size, space);
2017
2018 /* the writing position in the ring buffer after writing (part
2019 * or all of) the buffer */
2020 new_writing_pos = (writing_pos + to_write) % rb_size;
2021
2022 prev = NULL;
2023 range = queue->ranges;
2024
2025 /* if we need to overwrite data in the ring buffer, we need to
2026 * update the ranges
2027 *
2028 * warning: this code is complicated and includes some
2029 * simplifications - pen, paper and diagrams for the cases
2030 * recommended! */
2031 while (range) {
2032 guint64 range_data_start, range_data_end;
2033 GstQueue2Range *range_to_destroy = NULL;
2034
2035 #ifndef OHOS_OPT_COMPAT
2036 /* ohos.opt.compat.0009
2037 * Resolve the issue of replaying the page gray screen after the video playback is over */
2038 if (range == queue->current)
2039 goto next_range;
2040 #endif
2041 range_data_start = range->rb_offset;
2042 range_data_end = range->rb_writing_pos;
2043
2044 /* handle the special case where the range has no data in it */
2045 if (range->writing_pos == range->offset) {
2046 if (range != queue->current) {
2047 GST_DEBUG_OBJECT (queue,
2048 "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
2049 G_GUINT64_FORMAT, range->offset, range->writing_pos);
2050 /* remove range */
2051 range_to_destroy = range;
2052 if (prev)
2053 prev->next = range->next;
2054 }
2055 goto next_range;
2056 }
2057
2058 if (range_data_end > range_data_start) {
2059 if (writing_pos >= range_data_end && new_writing_pos >= writing_pos)
2060 goto next_range;
2061
2062 if (new_writing_pos > range_data_start) {
2063 if (new_writing_pos >= range_data_end) {
2064 GST_DEBUG_OBJECT (queue,
2065 "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
2066 G_GUINT64_FORMAT, range->offset, range->writing_pos);
2067 /* remove range */
2068 range_to_destroy = range;
2069 if (prev)
2070 prev->next = range->next;
2071 } else {
2072 GST_DEBUG_OBJECT (queue,
2073 "advancing offsets from %" G_GUINT64_FORMAT " (%"
2074 G_GUINT64_FORMAT ") to %" G_GUINT64_FORMAT " (%"
2075 G_GUINT64_FORMAT ")", range->offset, range->rb_offset,
2076 range->offset + new_writing_pos - range_data_start,
2077 new_writing_pos);
2078 range->offset += (new_writing_pos - range_data_start);
2079 range->rb_offset = new_writing_pos;
2080 }
2081 }
2082 } else {
2083 guint64 new_wpos_virt = writing_pos + to_write;
2084
2085 if (new_wpos_virt <= range_data_start)
2086 goto next_range;
2087
2088 if (new_wpos_virt > rb_size && new_writing_pos >= range_data_end) {
2089 GST_DEBUG_OBJECT (queue,
2090 "Removing range: offset %" G_GUINT64_FORMAT ", wpos %"
2091 G_GUINT64_FORMAT, range->offset, range->writing_pos);
2092 /* remove range */
2093 range_to_destroy = range;
2094 if (prev)
2095 prev->next = range->next;
2096 } else {
2097 GST_DEBUG_OBJECT (queue,
2098 "advancing offsets from %" G_GUINT64_FORMAT " (%"
2099 G_GUINT64_FORMAT ") to %" G_GUINT64_FORMAT " (%"
2100 G_GUINT64_FORMAT ")", range->offset, range->rb_offset,
2101 range->offset + new_writing_pos - range_data_start,
2102 new_writing_pos);
2103 range->offset += (new_wpos_virt - range_data_start);
2104 range->rb_offset = new_writing_pos;
2105 }
2106 }
2107
2108 next_range:
2109 if (!range_to_destroy)
2110 prev = range;
2111
2112 range = range->next;
2113 if (range_to_destroy) {
2114 if (range_to_destroy == queue->ranges)
2115 queue->ranges = range;
2116 g_slice_free (GstQueue2Range, range_to_destroy);
2117 range_to_destroy = NULL;
2118 }
2119 }
2120 } else {
2121 to_write = size;
2122 new_writing_pos = writing_pos + to_write;
2123 }
2124
2125 if (QUEUE_IS_USING_TEMP_FILE (queue)
2126 && FSEEK_FILE (queue->temp_file, writing_pos))
2127 goto seek_failed;
2128
2129 if (new_writing_pos > writing_pos) {
2130 GST_INFO_OBJECT (queue,
2131 "writing %u bytes to range [%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT
2132 "] (rb wpos %" G_GUINT64_FORMAT ")", to_write, queue->current->offset,
2133 queue->current->writing_pos, queue->current->rb_writing_pos);
2134 /* either not using ring buffer or no wrapping, just write */
2135 if (QUEUE_IS_USING_TEMP_FILE (queue)) {
2136 if (fwrite (data, to_write, 1, queue->temp_file) != 1)
2137 goto handle_error;
2138 } else {
2139 memcpy (ring_buffer + writing_pos, data, to_write);
2140 }
2141
2142 if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
2143 /* try to merge with next range */
2144 while ((next = queue->current->next)) {
2145 GST_INFO_OBJECT (queue,
2146 "checking merge with next range %" G_GUINT64_FORMAT " < %"
2147 G_GUINT64_FORMAT, new_writing_pos, next->offset);
2148 if (new_writing_pos < next->offset)
2149 break;
2150
2151 GST_DEBUG_OBJECT (queue, "merging ranges %" G_GUINT64_FORMAT,
2152 next->writing_pos);
2153
2154 /* remove the group */
2155 queue->current->next = next->next;
2156
2157 /* We use the threshold to decide if we want to do a seek or simply
2158 * read the data again. If there is not so much data in the range we
2159 * prefer to avoid to seek and read it again. */
2160 if (next->writing_pos > new_writing_pos + get_seek_threshold (queue)) {
2161 /* the new range had more data than the threshold, it's worth keeping
2162 * it and doing a seek. */
2163 new_writing_pos = next->writing_pos;
2164 do_seek = TRUE;
2165 }
2166 g_slice_free (GstQueue2Range, next);
2167 }
2168 goto update_and_signal;
2169 }
2170 } else {
2171 /* wrapping */
2172 guint block_one, block_two;
2173
2174 block_one = rb_size - writing_pos;
2175 block_two = to_write - block_one;
2176
2177 if (block_one > 0) {
2178 GST_INFO_OBJECT (queue, "writing %u bytes", block_one);
2179 /* write data to end of ring buffer */
2180 if (QUEUE_IS_USING_TEMP_FILE (queue)) {
2181 if (fwrite (data, block_one, 1, queue->temp_file) != 1)
2182 goto handle_error;
2183 } else {
2184 memcpy (ring_buffer + writing_pos, data, block_one);
2185 }
2186 }
2187
2188 if (QUEUE_IS_USING_TEMP_FILE (queue) && FSEEK_FILE (queue->temp_file, 0))
2189 goto seek_failed;
2190
2191 if (block_two > 0) {
2192 GST_INFO_OBJECT (queue, "writing %u bytes", block_two);
2193 if (QUEUE_IS_USING_TEMP_FILE (queue)) {
2194 if (fwrite (data + block_one, block_two, 1, queue->temp_file) != 1)
2195 goto handle_error;
2196 } else {
2197 memcpy (ring_buffer, data + block_one, block_two);
2198 }
2199 }
2200 }
2201
2202 update_and_signal:
2203 /* update the writing positions */
2204 size -= to_write;
2205 GST_INFO_OBJECT (queue,
2206 "wrote %u bytes to %" G_GUINT64_FORMAT " (%u bytes remaining to write)",
2207 to_write, writing_pos, size);
2208
2209 if (QUEUE_IS_USING_RING_BUFFER (queue)) {
2210 data += to_write;
2211 queue->current->writing_pos += to_write;
2212 queue->current->rb_writing_pos = writing_pos = new_writing_pos;
2213 } else {
2214 queue->current->writing_pos = writing_pos = new_writing_pos;
2215 }
2216 if (do_seek)
2217 perform_seek_to_offset (queue, new_writing_pos);
2218
2219 update_cur_level (queue, queue->current);
2220
2221 /* update the buffering status */
2222 if (queue->use_buffering) {
2223 GstMessage *msg;
2224 gint percent = -1;
2225 update_buffering (queue);
2226 msg = gst_queue2_get_buffering_message (queue, &percent);
2227 if (msg) {
2228 gboolean post_ok;
2229
2230 GST_QUEUE2_MUTEX_UNLOCK (queue);
2231
2232 g_mutex_lock (&queue->buffering_post_lock);
2233 post_ok = gst_element_post_message (GST_ELEMENT_CAST (queue), msg);
2234
2235 GST_QUEUE2_MUTEX_LOCK (queue);
2236
2237 if (post_ok) {
2238 /* Set these states only if posting the message succeeded. Otherwise,
2239 * this post attempt failed, and the next one won't be done, because
2240 * gst_queue2_get_buffering_message() checks these states and decides
2241 * based on their values that it won't produce a message. */
2242 queue->last_posted_buffering_percent = percent;
2243 if (percent == queue->buffering_percent)
2244 queue->percent_changed = FALSE;
2245 GST_DEBUG_OBJECT (queue, "successfully posted %d%% buffering message",
2246 percent);
2247 } else {
2248 GST_DEBUG_OBJECT (queue, "could not post buffering message");
2249 }
2250 g_mutex_unlock (&queue->buffering_post_lock);
2251 }
2252 }
2253
2254 GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %" G_GUINT64_FORMAT ")",
2255 queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
2256
2257 GST_QUEUE2_SIGNAL_ADD (queue);
2258 }
2259
2260 gst_buffer_unmap (buffer, &info);
2261
2262 return TRUE;
2263
2264 /* ERRORS */
2265 out_flushing:
2266 {
2267 GST_DEBUG_OBJECT (queue, "we are flushing");
2268 gst_buffer_unmap (buffer, &info);
2269 /* FIXME - GST_FLOW_EOS ? */
2270 return FALSE;
2271 }
2272 seek_failed:
2273 {
2274 GST_ELEMENT_ERROR (queue, RESOURCE, SEEK, (NULL), GST_ERROR_SYSTEM);
2275 gst_buffer_unmap (buffer, &info);
2276 return FALSE;
2277 }
2278 handle_error:
2279 {
2280 switch (errno) {
2281 case ENOSPC:{
2282 GST_ELEMENT_ERROR (queue, RESOURCE, NO_SPACE_LEFT, (NULL), (NULL));
2283 break;
2284 }
2285 default:{
2286 GST_ELEMENT_ERROR (queue, RESOURCE, WRITE,
2287 (_("Error while writing to download file.")),
2288 ("%s", g_strerror (errno)));
2289 }
2290 }
2291 gst_buffer_unmap (buffer, &info);
2292 return FALSE;
2293 }
2294 buffer_read_error:
2295 {
2296 GST_ELEMENT_ERROR (queue, RESOURCE, READ, (NULL),
2297 ("Can't read from buffer"));
2298 return FALSE;
2299 }
2300 }
2301
2302 static gboolean
buffer_list_create_write(GstBuffer ** buf,guint idx,gpointer q)2303 buffer_list_create_write (GstBuffer ** buf, guint idx, gpointer q)
2304 {
2305 GstQueue2 *queue = q;
2306
2307 GST_TRACE_OBJECT (queue,
2308 "writing buffer %u of size %" G_GSIZE_FORMAT " bytes", idx,
2309 gst_buffer_get_size (*buf));
2310
2311 if (!gst_queue2_create_write (queue, *buf)) {
2312 GST_INFO_OBJECT (queue, "create_write() returned FALSE, bailing out");
2313 return FALSE;
2314 }
2315 return TRUE;
2316 }
2317
2318 /* enqueue an item an update the level stats */
2319 static void
gst_queue2_locked_enqueue(GstQueue2 * queue,gpointer item,GstQueue2ItemType item_type)2320 gst_queue2_locked_enqueue (GstQueue2 * queue, gpointer item,
2321 GstQueue2ItemType item_type)
2322 {
2323 if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
2324 GstBuffer *buffer;
2325 guint size;
2326
2327 buffer = GST_BUFFER_CAST (item);
2328 size = gst_buffer_get_size (buffer);
2329
2330 /* add buffer to the statistics */
2331 if (QUEUE_IS_USING_QUEUE (queue)) {
2332 queue->cur_level.buffers++;
2333 queue->cur_level.bytes += size;
2334 }
2335 queue->bytes_in += size;
2336
2337 /* apply new buffer to segment stats */
2338 apply_buffer (queue, buffer, &queue->sink_segment, size, TRUE);
2339 /* update the byterate stats */
2340 update_in_rates (queue, FALSE);
2341
2342 if (!QUEUE_IS_USING_QUEUE (queue)) {
2343 /* FIXME - check return value? */
2344 gst_queue2_create_write (queue, buffer);
2345 }
2346 } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
2347 GstBufferList *buffer_list;
2348 guint size;
2349
2350 buffer_list = GST_BUFFER_LIST_CAST (item);
2351
2352 size = gst_buffer_list_calculate_size (buffer_list);
2353 GST_LOG_OBJECT (queue, "total size of buffer list: %u bytes", size);
2354
2355 /* add buffer to the statistics */
2356 if (QUEUE_IS_USING_QUEUE (queue)) {
2357 queue->cur_level.buffers += gst_buffer_list_length (buffer_list);
2358 queue->cur_level.bytes += size;
2359 }
2360 queue->bytes_in += size;
2361
2362 /* apply new buffer to segment stats */
2363 apply_buffer_list (queue, buffer_list, &queue->sink_segment, TRUE);
2364
2365 /* update the byterate stats */
2366 update_in_rates (queue, FALSE);
2367
2368 if (!QUEUE_IS_USING_QUEUE (queue)) {
2369 gst_buffer_list_foreach (buffer_list, buffer_list_create_write, queue);
2370 }
2371 } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
2372 GstEvent *event;
2373
2374 event = GST_EVENT_CAST (item);
2375
2376 switch (GST_EVENT_TYPE (event)) {
2377 case GST_EVENT_EOS:
2378 /* Zero the thresholds, this makes sure the queue is completely
2379 * filled and we can read all data from the queue. */
2380 GST_DEBUG_OBJECT (queue, "we have EOS");
2381 queue->is_eos = TRUE;
2382 /* Force updating the input bitrate */
2383 update_in_rates (queue, TRUE);
2384 break;
2385 case GST_EVENT_SEGMENT:
2386 apply_segment (queue, event, &queue->sink_segment, TRUE);
2387 /* This is our first new segment, we hold it
2388 * as we can't save it on the temp file */
2389 if (!QUEUE_IS_USING_QUEUE (queue)) {
2390 if (queue->segment_event_received)
2391 goto unexpected_event;
2392
2393 queue->segment_event_received = TRUE;
2394 if (queue->starting_segment != NULL)
2395 gst_event_unref (queue->starting_segment);
2396 queue->starting_segment = event;
2397 item = NULL;
2398 }
2399 /* a new segment allows us to accept more buffers if we got EOS
2400 * from downstream */
2401 queue->unexpected = FALSE;
2402 break;
2403 case GST_EVENT_GAP:
2404 apply_gap (queue, event, &queue->sink_segment, TRUE);
2405 break;
2406 case GST_EVENT_STREAM_START:
2407 if (!QUEUE_IS_USING_QUEUE (queue)) {
2408 gst_event_replace (&queue->stream_start_event, event);
2409 gst_event_unref (event);
2410 item = NULL;
2411 }
2412 break;
2413 case GST_EVENT_CAPS:{
2414 GstCaps *caps;
2415
2416 gst_event_parse_caps (event, &caps);
2417 GST_INFO ("got caps: %" GST_PTR_FORMAT, caps);
2418
2419 if (!QUEUE_IS_USING_QUEUE (queue)) {
2420 GST_LOG ("Dropping caps event, not using queue");
2421 gst_event_unref (event);
2422 item = NULL;
2423 }
2424 break;
2425 }
2426 default:
2427 if (!QUEUE_IS_USING_QUEUE (queue))
2428 goto unexpected_event;
2429 break;
2430 }
2431 } else if (GST_IS_QUERY (item)) {
2432 /* Can't happen as we check that in the caller */
2433 if (!QUEUE_IS_USING_QUEUE (queue))
2434 g_assert_not_reached ();
2435 } else {
2436 g_warning ("Unexpected item %p added in queue %s (refcounting problem?)",
2437 item, GST_OBJECT_NAME (queue));
2438 /* we can't really unref since we don't know what it is */
2439 item = NULL;
2440 }
2441
2442 if (item) {
2443 /* update the buffering status */
2444 if (queue->use_buffering)
2445 update_buffering (queue);
2446
2447 if (QUEUE_IS_USING_QUEUE (queue)) {
2448 GstQueue2Item qitem;
2449
2450 qitem.type = item_type;
2451 qitem.item = item;
2452 gst_queue_array_push_tail_struct (queue->queue, &qitem);
2453 } else {
2454 gst_mini_object_unref (GST_MINI_OBJECT_CAST (item));
2455 }
2456
2457 GST_QUEUE2_SIGNAL_ADD (queue);
2458 }
2459
2460 return;
2461
2462 /* ERRORS */
2463 unexpected_event:
2464 {
2465 gboolean is_custom = GST_EVENT_TYPE (item) < GST_EVENT_CUSTOM_UPSTREAM;
2466
2467 GST_WARNING_OBJECT (queue, "%s%s event can't be added to temp file: "
2468 "%" GST_PTR_FORMAT, is_custom ? "Unexpected " : "",
2469 GST_EVENT_TYPE_NAME (item), GST_EVENT_CAST (item));
2470 gst_event_unref (GST_EVENT_CAST (item));
2471 return;
2472 }
2473 }
2474
2475 /* dequeue an item from the queue and update level stats */
2476 static GstMiniObject *
gst_queue2_locked_dequeue(GstQueue2 * queue,GstQueue2ItemType * item_type)2477 gst_queue2_locked_dequeue (GstQueue2 * queue, GstQueue2ItemType * item_type)
2478 {
2479 GstMiniObject *item;
2480
2481 if (!QUEUE_IS_USING_QUEUE (queue)) {
2482 item = gst_queue2_read_item_from_file (queue);
2483 } else {
2484 GstQueue2Item *qitem = gst_queue_array_pop_head_struct (queue->queue);
2485
2486 if (qitem == NULL)
2487 goto no_item;
2488
2489 item = qitem->item;
2490 }
2491
2492 if (item == NULL)
2493 goto no_item;
2494
2495 if (GST_IS_BUFFER (item)) {
2496 GstBuffer *buffer;
2497 guint size;
2498
2499 buffer = GST_BUFFER_CAST (item);
2500 size = gst_buffer_get_size (buffer);
2501 *item_type = GST_QUEUE2_ITEM_TYPE_BUFFER;
2502
2503 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2504 "retrieved buffer %p from queue", buffer);
2505
2506 if (QUEUE_IS_USING_QUEUE (queue)) {
2507 queue->cur_level.buffers--;
2508 queue->cur_level.bytes -= size;
2509 }
2510 queue->bytes_out += size;
2511
2512 apply_buffer (queue, buffer, &queue->src_segment, size, FALSE);
2513 /* update the byterate stats */
2514 update_out_rates (queue);
2515 /* update the buffering */
2516 if (queue->use_buffering)
2517 update_buffering (queue);
2518
2519 } else if (GST_IS_EVENT (item)) {
2520 GstEvent *event = GST_EVENT_CAST (item);
2521
2522 *item_type = GST_QUEUE2_ITEM_TYPE_EVENT;
2523
2524 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2525 "retrieved event %p from queue", event);
2526
2527 switch (GST_EVENT_TYPE (event)) {
2528 case GST_EVENT_EOS:
2529 /* queue is empty now that we dequeued the EOS */
2530 GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
2531 break;
2532 case GST_EVENT_SEGMENT:
2533 apply_segment (queue, event, &queue->src_segment, FALSE);
2534 break;
2535 case GST_EVENT_GAP:
2536 apply_gap (queue, event, &queue->src_segment, FALSE);
2537 break;
2538 default:
2539 break;
2540 }
2541 } else if (GST_IS_BUFFER_LIST (item)) {
2542 GstBufferList *buffer_list;
2543 guint size;
2544
2545 buffer_list = GST_BUFFER_LIST_CAST (item);
2546 size = gst_buffer_list_calculate_size (buffer_list);
2547 *item_type = GST_QUEUE2_ITEM_TYPE_BUFFER_LIST;
2548
2549 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2550 "retrieved buffer list %p from queue", buffer_list);
2551
2552 if (QUEUE_IS_USING_QUEUE (queue)) {
2553 queue->cur_level.buffers -= gst_buffer_list_length (buffer_list);
2554 queue->cur_level.bytes -= size;
2555 }
2556 queue->bytes_out += size;
2557
2558 apply_buffer_list (queue, buffer_list, &queue->src_segment, FALSE);
2559 /* update the byterate stats */
2560 update_out_rates (queue);
2561 /* update the buffering */
2562 if (queue->use_buffering)
2563 update_buffering (queue);
2564 } else if (GST_IS_QUERY (item)) {
2565 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2566 "retrieved query %p from queue", item);
2567 *item_type = GST_QUEUE2_ITEM_TYPE_QUERY;
2568 } else {
2569 g_warning
2570 ("Unexpected item %p dequeued from queue %s (refcounting problem?)",
2571 item, GST_OBJECT_NAME (queue));
2572 item = NULL;
2573 *item_type = GST_QUEUE2_ITEM_TYPE_UNKNOWN;
2574 }
2575 GST_QUEUE2_SIGNAL_DEL (queue);
2576
2577 return item;
2578
2579 /* ERRORS */
2580 no_item:
2581 {
2582 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "the queue is empty");
2583 return NULL;
2584 }
2585 }
2586
2587 static GstFlowReturn
gst_queue2_handle_sink_event(GstPad * pad,GstObject * parent,GstEvent * event)2588 gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
2589 GstEvent * event)
2590 {
2591 gboolean ret = TRUE;
2592 GstQueue2 *queue;
2593
2594 queue = GST_QUEUE2 (parent);
2595
2596 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "Received event '%s'",
2597 GST_EVENT_TYPE_NAME (event));
2598
2599 switch (GST_EVENT_TYPE (event)) {
2600 case GST_EVENT_FLUSH_START:
2601 {
2602 if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
2603 /* forward event */
2604 ret = gst_pad_push_event (queue->srcpad, event);
2605
2606 /* now unblock the chain function */
2607 GST_QUEUE2_MUTEX_LOCK (queue);
2608 queue->srcresult = GST_FLOW_FLUSHING;
2609 queue->sinkresult = GST_FLOW_FLUSHING;
2610 /* unblock the loop and chain functions */
2611 GST_QUEUE2_SIGNAL_ADD (queue);
2612 GST_QUEUE2_SIGNAL_DEL (queue);
2613 GST_QUEUE2_MUTEX_UNLOCK (queue);
2614
2615 /* make sure it pauses, this should happen since we sent
2616 * flush_start downstream. */
2617 gst_pad_pause_task (queue->srcpad);
2618 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
2619
2620 GST_QUEUE2_MUTEX_LOCK (queue);
2621 queue->last_query = FALSE;
2622 g_cond_signal (&queue->query_handled);
2623 GST_QUEUE2_MUTEX_UNLOCK (queue);
2624 } else {
2625 GST_QUEUE2_MUTEX_LOCK (queue);
2626 /* flush the sink pad */
2627 queue->sinkresult = GST_FLOW_FLUSHING;
2628 GST_QUEUE2_SIGNAL_DEL (queue);
2629 queue->last_query = FALSE;
2630 g_cond_signal (&queue->query_handled);
2631 GST_QUEUE2_MUTEX_UNLOCK (queue);
2632
2633 gst_event_unref (event);
2634 }
2635 break;
2636 }
2637 case GST_EVENT_FLUSH_STOP:
2638 {
2639 if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
2640 /* forward event */
2641 ret = gst_pad_push_event (queue->srcpad, event);
2642
2643 GST_QUEUE2_MUTEX_LOCK (queue);
2644 gst_queue2_locked_flush (queue, FALSE, TRUE);
2645 queue->srcresult = GST_FLOW_OK;
2646 queue->sinkresult = GST_FLOW_OK;
2647 queue->is_eos = FALSE;
2648 queue->unexpected = FALSE;
2649 queue->seeking = FALSE;
2650 queue->src_tags_bitrate = queue->sink_tags_bitrate = 0;
2651 /* reset rate counters */
2652 reset_rate_timer (queue);
2653 gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue2_loop,
2654 queue->srcpad, NULL);
2655 GST_QUEUE2_MUTEX_UNLOCK (queue);
2656 } else {
2657 GST_QUEUE2_MUTEX_LOCK (queue);
2658 queue->segment_event_received = FALSE;
2659 queue->is_eos = FALSE;
2660 queue->unexpected = FALSE;
2661 queue->sinkresult = GST_FLOW_OK;
2662 queue->seeking = FALSE;
2663 queue->src_tags_bitrate = queue->sink_tags_bitrate = 0;
2664 GST_QUEUE2_MUTEX_UNLOCK (queue);
2665
2666 gst_event_unref (event);
2667 }
2668 g_object_notify_by_pspec (G_OBJECT (queue), obj_props[PROP_BITRATE]);
2669 break;
2670 }
2671 case GST_EVENT_TAG:{
2672 if (queue->use_tags_bitrate) {
2673 GstTagList *tags;
2674 guint bitrate;
2675
2676 gst_event_parse_tag (event, &tags);
2677 if (gst_tag_list_get_uint (tags, GST_TAG_BITRATE, &bitrate) ||
2678 gst_tag_list_get_uint (tags, GST_TAG_NOMINAL_BITRATE, &bitrate)) {
2679 GST_QUEUE2_MUTEX_LOCK (queue);
2680 queue->sink_tags_bitrate = bitrate;
2681 GST_QUEUE2_MUTEX_UNLOCK (queue);
2682 GST_LOG_OBJECT (queue, "Sink pad bitrate from tags now %u", bitrate);
2683 g_object_notify_by_pspec (G_OBJECT (queue), obj_props[PROP_BITRATE]);
2684 }
2685 }
2686 /* Fall-through */
2687 }
2688 default:
2689 if (GST_EVENT_IS_SERIALIZED (event)) {
2690 gboolean bitrate_changed = TRUE;
2691 /* serialized events go in the queue */
2692
2693 /* STREAM_START and SEGMENT reset the EOS status of a
2694 * pad. Change the cached sinkpad flow result accordingly */
2695 if (queue->sinkresult == GST_FLOW_EOS
2696 && (GST_EVENT_TYPE (event) == GST_EVENT_STREAM_START
2697 || GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT))
2698 queue->sinkresult = GST_FLOW_OK;
2699
2700 GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2701 if (queue->srcresult != GST_FLOW_OK) {
2702 /* Errors in sticky event pushing are no problem and ignored here
2703 * as they will cause more meaningful errors during data flow.
2704 * For EOS events, that are not followed by data flow, we still
2705 * return FALSE here though and report an error.
2706 */
2707 if (!GST_EVENT_IS_STICKY (event)) {
2708 goto out_flow_error;
2709 } else if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
2710 if (queue->srcresult == GST_FLOW_NOT_LINKED
2711 || queue->srcresult < GST_FLOW_EOS) {
2712 GST_ELEMENT_FLOW_ERROR (queue, queue->srcresult);
2713 }
2714 goto out_flow_error;
2715 }
2716 }
2717
2718 /* refuse more events on EOS unless they unset the EOS status */
2719 if (queue->is_eos) {
2720 switch (GST_EVENT_TYPE (event)) {
2721 case GST_EVENT_STREAM_START:
2722 case GST_EVENT_SEGMENT:
2723 /* Restart the loop */
2724 if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
2725 queue->srcresult = GST_FLOW_OK;
2726 queue->is_eos = FALSE;
2727 queue->unexpected = FALSE;
2728 queue->seeking = FALSE;
2729 queue->src_tags_bitrate = queue->sink_tags_bitrate = 0;
2730 /* reset rate counters */
2731 reset_rate_timer (queue);
2732 gst_pad_start_task (queue->srcpad,
2733 (GstTaskFunction) gst_queue2_loop, queue->srcpad, NULL);
2734 } else {
2735 queue->is_eos = FALSE;
2736 queue->unexpected = FALSE;
2737 queue->seeking = FALSE;
2738 queue->src_tags_bitrate = queue->sink_tags_bitrate = 0;
2739 }
2740 bitrate_changed = TRUE;
2741
2742 break;
2743 default:
2744 goto out_eos;
2745 }
2746 }
2747
2748 gst_queue2_locked_enqueue (queue, event, GST_QUEUE2_ITEM_TYPE_EVENT);
2749 GST_QUEUE2_MUTEX_UNLOCK (queue);
2750 gst_queue2_post_buffering (queue);
2751 if (bitrate_changed)
2752 g_object_notify_by_pspec (G_OBJECT (queue), obj_props[PROP_BITRATE]);
2753 } else {
2754 /* non-serialized events are passed downstream. */
2755 ret = gst_pad_push_event (queue->srcpad, event);
2756 }
2757 break;
2758 }
2759 if (ret == FALSE)
2760 return GST_FLOW_ERROR;
2761 return GST_FLOW_OK;
2762
2763 /* ERRORS */
2764 out_flushing:
2765 {
2766 GstFlowReturn ret = queue->sinkresult;
2767 GST_DEBUG_OBJECT (queue, "refusing event, we are %s",
2768 gst_flow_get_name (ret));
2769 GST_QUEUE2_MUTEX_UNLOCK (queue);
2770 gst_event_unref (event);
2771 return ret;
2772 }
2773 out_eos:
2774 {
2775 GST_DEBUG_OBJECT (queue, "refusing event, we are EOS");
2776 GST_QUEUE2_MUTEX_UNLOCK (queue);
2777 gst_event_unref (event);
2778 return GST_FLOW_EOS;
2779 }
2780 out_flow_error:
2781 {
2782 GST_LOG_OBJECT (queue,
2783 "refusing event, we have a downstream flow error: %s",
2784 gst_flow_get_name (queue->srcresult));
2785 GST_QUEUE2_MUTEX_UNLOCK (queue);
2786 gst_event_unref (event);
2787 return queue->srcresult;
2788 }
2789 }
2790
2791 static gboolean
gst_queue2_handle_sink_query(GstPad * pad,GstObject * parent,GstQuery * query)2792 gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
2793 GstQuery * query)
2794 {
2795 GstQueue2 *queue;
2796 gboolean res;
2797
2798 queue = GST_QUEUE2 (parent);
2799
2800 switch (GST_QUERY_TYPE (query)) {
2801 default:
2802 if (GST_QUERY_IS_SERIALIZED (query)) {
2803 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2804 "received query %" GST_PTR_FORMAT, query);
2805 /* serialized events go in the queue. We need to be certain that we
2806 * don't cause deadlocks waiting for the query return value. We check if
2807 * the queue is empty (nothing is blocking downstream and the query can
2808 * be pushed for sure) or we are not buffering. If we are buffering,
2809 * the pipeline waits to unblock downstream until our queue fills up
2810 * completely, which can not happen if we block on the query..
2811 * Therefore we only potentially block when we are not buffering.
2812 *
2813 * Update: Edward Hervey 2021: Realistically when posting buffering
2814 * messages there are no safe places where we can block and forward a
2815 * serialized query due to the potential of causing deadlocks. We
2816 * therefore refuse any serialized queries in such cases. */
2817 GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2818 if (QUEUE_IS_USING_QUEUE (queue) && !queue->use_buffering) {
2819 if (!g_atomic_int_get (&queue->downstream_may_block)) {
2820 gst_queue2_locked_enqueue (queue, query,
2821 GST_QUEUE2_ITEM_TYPE_QUERY);
2822
2823 STATUS (queue, queue->sinkpad, "wait for QUERY");
2824 while (queue->sinkresult == GST_FLOW_OK &&
2825 queue->last_handled_query != query)
2826 g_cond_wait (&queue->query_handled, &queue->qlock);
2827 queue->last_handled_query = NULL;
2828 if (queue->sinkresult != GST_FLOW_OK)
2829 goto out_flushing;
2830 res = queue->last_query;
2831 } else {
2832 GST_DEBUG_OBJECT (queue, "refusing query, downstream might block");
2833 res = FALSE;
2834 }
2835 } else {
2836 GST_DEBUG_OBJECT (queue,
2837 "refusing query, we are not using the queue or we are posting buffering messages");
2838 res = FALSE;
2839 }
2840 GST_QUEUE2_MUTEX_UNLOCK (queue);
2841 gst_queue2_post_buffering (queue);
2842 } else {
2843 res = gst_pad_query_default (pad, parent, query);
2844 }
2845 break;
2846 }
2847 return res;
2848
2849 /* ERRORS */
2850 out_flushing:
2851 {
2852 GST_DEBUG_OBJECT (queue, "refusing query, we are %s",
2853 gst_flow_get_name (queue->sinkresult));
2854 GST_QUEUE2_MUTEX_UNLOCK (queue);
2855 return FALSE;
2856 }
2857 }
2858
2859 static gboolean
gst_queue2_is_empty(GstQueue2 * queue)2860 gst_queue2_is_empty (GstQueue2 * queue)
2861 {
2862 /* never empty on EOS */
2863 if (queue->is_eos)
2864 return FALSE;
2865
2866 if (!QUEUE_IS_USING_QUEUE (queue) && queue->current) {
2867 return queue->current->writing_pos <= queue->current->max_reading_pos;
2868 } else {
2869 if (gst_queue_array_get_length (queue->queue) == 0)
2870 return TRUE;
2871 }
2872
2873 return FALSE;
2874 }
2875
2876 static gboolean
gst_queue2_is_filled(GstQueue2 * queue)2877 gst_queue2_is_filled (GstQueue2 * queue)
2878 {
2879 gboolean res;
2880
2881 /* always filled on EOS */
2882 if (queue->is_eos)
2883 return TRUE;
2884
2885 /* Check the levels if non-null */
2886 #define CHECK_FILLED_REAL(format) \
2887 ((queue->max_level.format) > 0 && (queue->cur_level.format) >= ((queue->max_level.format)))
2888 /* Check the levels if non-null (use the alternative max if non-zero) */
2889 #define CHECK_FILLED_ALT(format,alt_max) ((queue->max_level.format) > 0 && \
2890 (queue->cur_level.format) >= ((alt_max) ? \
2891 MIN ((queue->max_level.format), (alt_max)) : (queue->max_level.format)))
2892
2893 /* if using a ring buffer we're filled if all ring buffer space is used
2894 * _by the current range_ */
2895 if (QUEUE_IS_USING_RING_BUFFER (queue)) {
2896 guint64 rb_size = queue->ring_buffer_max_size;
2897 GST_DEBUG_OBJECT (queue,
2898 "max bytes %u, rb size %" G_GUINT64_FORMAT ", cur bytes %u",
2899 queue->max_level.bytes, rb_size, queue->cur_level.bytes);
2900 return CHECK_FILLED_ALT (bytes, rb_size);
2901 }
2902
2903 /* if using file, we're never filled if we don't have EOS */
2904 if (QUEUE_IS_USING_TEMP_FILE (queue))
2905 return FALSE;
2906
2907 /* we are never filled when we have no buffers at all */
2908 if (queue->cur_level.buffers == 0)
2909 return FALSE;
2910
2911 /* we are filled if one of the current levels exceeds the max */
2912 res = CHECK_FILLED_REAL (buffers) || CHECK_FILLED_REAL (bytes)
2913 || CHECK_FILLED_REAL (time);
2914
2915 /* if we need to, use the rate estimate to check against the max time we are
2916 * allowed to queue */
2917 if (queue->use_rate_estimate)
2918 res |= CHECK_FILLED_REAL (rate_time);
2919
2920 #undef CHECK_FILLED_REAL
2921 #undef CHECK_FILLED_ALT
2922 return res;
2923 }
2924
2925 static GstFlowReturn
gst_queue2_chain_buffer_or_buffer_list(GstQueue2 * queue,GstMiniObject * item,GstQueue2ItemType item_type)2926 gst_queue2_chain_buffer_or_buffer_list (GstQueue2 * queue,
2927 GstMiniObject * item, GstQueue2ItemType item_type)
2928 {
2929 /* we have to lock the queue since we span threads */
2930 GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->sinkresult, out_flushing);
2931 /* when we received EOS, we refuse more data */
2932 if (queue->is_eos)
2933 goto out_eos;
2934 /* when we received unexpected from downstream, refuse more buffers */
2935 if (queue->unexpected)
2936 goto out_unexpected;
2937
2938 /* while we didn't receive the newsegment, we're seeking and we skip data */
2939 if (queue->seeking)
2940 goto out_seeking;
2941
2942 if (!gst_queue2_wait_free_space (queue))
2943 goto out_flushing;
2944
2945 /* put buffer in queue now */
2946 gst_queue2_locked_enqueue (queue, item, item_type);
2947 GST_QUEUE2_MUTEX_UNLOCK (queue);
2948 gst_queue2_post_buffering (queue);
2949
2950 return GST_FLOW_OK;
2951
2952 /* special conditions */
2953 out_flushing:
2954 {
2955 GstFlowReturn ret = queue->sinkresult;
2956
2957 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
2958 "exit because task paused, reason: %s", gst_flow_get_name (ret));
2959 GST_QUEUE2_MUTEX_UNLOCK (queue);
2960 gst_mini_object_unref (item);
2961
2962 return ret;
2963 }
2964 out_eos:
2965 {
2966 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
2967 GST_QUEUE2_MUTEX_UNLOCK (queue);
2968 gst_mini_object_unref (item);
2969
2970 return GST_FLOW_EOS;
2971 }
2972 out_seeking:
2973 {
2974 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are seeking");
2975 GST_QUEUE2_MUTEX_UNLOCK (queue);
2976 gst_mini_object_unref (item);
2977
2978 return GST_FLOW_OK;
2979 }
2980 out_unexpected:
2981 {
2982 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
2983 GST_QUEUE2_MUTEX_UNLOCK (queue);
2984 gst_mini_object_unref (item);
2985
2986 return GST_FLOW_EOS;
2987 }
2988 }
2989
2990 static GstFlowReturn
gst_queue2_chain(GstPad * pad,GstObject * parent,GstBuffer * buffer)2991 gst_queue2_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
2992 {
2993 GstQueue2 *queue;
2994
2995 queue = GST_QUEUE2 (parent);
2996
2997 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "received buffer %p of "
2998 "size %" G_GSIZE_FORMAT ", time %" GST_TIME_FORMAT ", duration %"
2999 GST_TIME_FORMAT, buffer, gst_buffer_get_size (buffer),
3000 GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
3001 GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));
3002
3003 return gst_queue2_chain_buffer_or_buffer_list (queue,
3004 GST_MINI_OBJECT_CAST (buffer), GST_QUEUE2_ITEM_TYPE_BUFFER);
3005 }
3006
3007 static GstFlowReturn
gst_queue2_chain_list(GstPad * pad,GstObject * parent,GstBufferList * buffer_list)3008 gst_queue2_chain_list (GstPad * pad, GstObject * parent,
3009 GstBufferList * buffer_list)
3010 {
3011 GstQueue2 *queue;
3012
3013 queue = GST_QUEUE2 (parent);
3014
3015 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
3016 "received buffer list %p", buffer_list);
3017
3018 return gst_queue2_chain_buffer_or_buffer_list (queue,
3019 GST_MINI_OBJECT_CAST (buffer_list), GST_QUEUE2_ITEM_TYPE_BUFFER_LIST);
3020 }
3021
3022 static GstMiniObject *
gst_queue2_dequeue_on_eos(GstQueue2 * queue,GstQueue2ItemType * item_type)3023 gst_queue2_dequeue_on_eos (GstQueue2 * queue, GstQueue2ItemType * item_type)
3024 {
3025 GstMiniObject *data;
3026
3027 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "got EOS from downstream");
3028
3029 /* stop pushing buffers, we dequeue all items until we see an item that we
3030 * can push again, which is EOS or SEGMENT. If there is nothing in the
3031 * queue we can push, we set a flag to make the sinkpad refuse more
3032 * buffers with an EOS return value until we receive something
3033 * pushable again or we get flushed. */
3034 while ((data = gst_queue2_locked_dequeue (queue, item_type))) {
3035 if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
3036 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
3037 "dropping EOS buffer %p", data);
3038 gst_buffer_unref (GST_BUFFER_CAST (data));
3039 } else if (*item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
3040 GstEvent *event = GST_EVENT_CAST (data);
3041 GstEventType type = GST_EVENT_TYPE (event);
3042
3043 if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT
3044 || type == GST_EVENT_STREAM_START) {
3045 /* we found a pushable item in the queue, push it out */
3046 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
3047 "pushing pushable event %s after EOS", GST_EVENT_TYPE_NAME (event));
3048 return data;
3049 }
3050 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
3051 "dropping EOS event %p", event);
3052 gst_event_unref (event);
3053 } else if (*item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
3054 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
3055 "dropping EOS buffer list %p", data);
3056 gst_buffer_list_unref (GST_BUFFER_LIST_CAST (data));
3057 } else if (*item_type == GST_QUEUE2_ITEM_TYPE_QUERY) {
3058 queue->last_query = FALSE;
3059 g_cond_signal (&queue->query_handled);
3060 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "dropping EOS query %p", data);
3061 }
3062 }
3063 /* no more items in the queue. Set the unexpected flag so that upstream
3064 * make us refuse any more buffers on the sinkpad. Since we will still
3065 * accept EOS and SEGMENT we return _FLOW_OK to the caller so that the
3066 * task function does not shut down. */
3067 queue->unexpected = TRUE;
3068 return NULL;
3069 }
3070
3071 /* dequeue an item from the queue an push it downstream. This functions returns
3072 * the result of the push. */
3073 static GstFlowReturn
gst_queue2_push_one(GstQueue2 * queue)3074 gst_queue2_push_one (GstQueue2 * queue)
3075 {
3076 GstFlowReturn result;
3077 GstMiniObject *data;
3078 GstQueue2ItemType item_type;
3079
3080 data = gst_queue2_locked_dequeue (queue, &item_type);
3081 if (data == NULL)
3082 goto no_item;
3083
3084 next:
3085 result = queue->srcresult;
3086 STATUS (queue, queue->srcpad, "We have something dequeud");
3087 g_atomic_int_set (&queue->downstream_may_block,
3088 item_type == GST_QUEUE2_ITEM_TYPE_BUFFER ||
3089 item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST);
3090 GST_QUEUE2_MUTEX_UNLOCK (queue);
3091 gst_queue2_post_buffering (queue);
3092
3093 if (gst_pad_check_reconfigure (queue->srcpad)) {
3094 /* If the pad was reconfigured, do a new bitrate query */
3095 query_downstream_bitrate (queue);
3096 }
3097
3098 if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER) {
3099 GstBuffer *buffer;
3100
3101 buffer = GST_BUFFER_CAST (data);
3102
3103 result = gst_pad_push (queue->srcpad, buffer);
3104 g_atomic_int_set (&queue->downstream_may_block, 0);
3105
3106 /* need to check for srcresult here as well */
3107 GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
3108 if (result == GST_FLOW_EOS) {
3109 data = gst_queue2_dequeue_on_eos (queue, &item_type);
3110 if (data != NULL)
3111 goto next;
3112 /* Since we will still accept EOS and SEGMENT we return _FLOW_OK
3113 * to the caller so that the task function does not shut down */
3114 result = GST_FLOW_OK;
3115 }
3116 } else if (item_type == GST_QUEUE2_ITEM_TYPE_EVENT) {
3117 GstEvent *event = GST_EVENT_CAST (data);
3118 GstEventType type = GST_EVENT_TYPE (event);
3119
3120 if (type == GST_EVENT_TAG) {
3121 if (queue->use_tags_bitrate) {
3122 GstTagList *tags;
3123 guint bitrate;
3124
3125 gst_event_parse_tag (event, &tags);
3126 if (gst_tag_list_get_uint (tags, GST_TAG_BITRATE, &bitrate) ||
3127 gst_tag_list_get_uint (tags, GST_TAG_NOMINAL_BITRATE, &bitrate)) {
3128 GST_QUEUE2_MUTEX_LOCK (queue);
3129 queue->src_tags_bitrate = bitrate;
3130 GST_QUEUE2_MUTEX_UNLOCK (queue);
3131 GST_LOG_OBJECT (queue, "src pad bitrate from tags now %u", bitrate);
3132 g_object_notify_by_pspec (G_OBJECT (queue), obj_props[PROP_BITRATE]);
3133 }
3134 }
3135 }
3136
3137 gst_pad_push_event (queue->srcpad, event);
3138
3139 /* if we're EOS, return EOS so that the task pauses. */
3140 if (type == GST_EVENT_EOS) {
3141 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
3142 "pushed EOS event %p, return EOS", event);
3143 result = GST_FLOW_EOS;
3144 }
3145
3146 GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
3147 } else if (item_type == GST_QUEUE2_ITEM_TYPE_BUFFER_LIST) {
3148 GstBufferList *buffer_list;
3149
3150 buffer_list = GST_BUFFER_LIST_CAST (data);
3151
3152 result = gst_pad_push_list (queue->srcpad, buffer_list);
3153 g_atomic_int_set (&queue->downstream_may_block, 0);
3154
3155 /* need to check for srcresult here as well */
3156 GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
3157 if (result == GST_FLOW_EOS) {
3158 data = gst_queue2_dequeue_on_eos (queue, &item_type);
3159 if (data != NULL)
3160 goto next;
3161 /* Since we will still accept EOS and SEGMENT we return _FLOW_OK
3162 * to the caller so that the task function does not shut down */
3163 result = GST_FLOW_OK;
3164 }
3165 } else if (item_type == GST_QUEUE2_ITEM_TYPE_QUERY) {
3166 GstQuery *query = GST_QUERY_CAST (data);
3167
3168 GST_LOG_OBJECT (queue->srcpad, "Peering query %p", query);
3169 queue->last_handled_query = query;
3170 queue->last_query = gst_pad_peer_query (queue->srcpad, query);
3171 GST_LOG_OBJECT (queue->srcpad, "Peered query");
3172 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
3173 "did query %p, return %d", query, queue->last_query);
3174 g_cond_signal (&queue->query_handled);
3175 GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
3176 result = GST_FLOW_OK;
3177 }
3178 return result;
3179
3180 /* ERRORS */
3181 no_item:
3182 {
3183 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
3184 "exit because we have no item in the queue");
3185 return GST_FLOW_ERROR;
3186 }
3187 out_flushing:
3188 {
3189 GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are %s",
3190 gst_flow_get_name (queue->srcresult));
3191 return queue->srcresult;
3192 }
3193 }
3194
3195 /* called repeatedly with @pad as the source pad. This function should push out
3196 * data to the peer element. */
3197 static void
gst_queue2_loop(GstPad * pad)3198 gst_queue2_loop (GstPad * pad)
3199 {
3200 GstQueue2 *queue;
3201 GstFlowReturn ret;
3202
3203 queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
3204
3205 /* have to lock for thread-safety */
3206 GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
3207
3208 if (gst_queue2_is_empty (queue)) {
3209 gboolean started;
3210
3211 /* pause the timer while we wait. The fact that we are waiting does not mean
3212 * the byterate on the output pad is lower */
3213 if ((started = queue->out_timer_started))
3214 g_timer_stop (queue->out_timer);
3215
3216 GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
3217 "queue is empty, waiting for new data");
3218 do {
3219 /* Wait for data to be available, we could be unlocked because of a flush. */
3220 GST_QUEUE2_WAIT_ADD_CHECK (queue, queue->srcresult, out_flushing);
3221 }
3222 while (gst_queue2_is_empty (queue));
3223
3224 /* and continue if we were running before */
3225 if (started)
3226 g_timer_continue (queue->out_timer);
3227 }
3228 ret = gst_queue2_push_one (queue);
3229 queue->srcresult = ret;
3230 queue->sinkresult = ret;
3231 if (ret != GST_FLOW_OK)
3232 goto out_flushing;
3233
3234 GST_QUEUE2_MUTEX_UNLOCK (queue);
3235 gst_queue2_post_buffering (queue);
3236
3237 return;
3238
3239 /* ERRORS */
3240 out_flushing:
3241 {
3242 gboolean eos = queue->is_eos;
3243 GstFlowReturn ret = queue->srcresult;
3244
3245 gst_pad_pause_task (queue->srcpad);
3246 if (ret == GST_FLOW_FLUSHING) {
3247 gst_queue2_locked_flush (queue, FALSE, FALSE);
3248 } else {
3249 GST_QUEUE2_SIGNAL_DEL (queue);
3250 queue->last_query = FALSE;
3251 g_cond_signal (&queue->query_handled);
3252 }
3253 GST_QUEUE2_MUTEX_UNLOCK (queue);
3254 GST_CAT_LOG_OBJECT (queue_dataflow, queue,
3255 "pause task, reason: %s", gst_flow_get_name (queue->srcresult));
3256 /* Recalculate buffering levels before stopping since the source flow
3257 * might cause a different buffering level (like NOT_LINKED making
3258 * the queue appear as full) */
3259 if (queue->use_buffering)
3260 update_buffering (queue);
3261 gst_queue2_post_buffering (queue);
3262 /* let app know about us giving up if upstream is not expected to do so */
3263 /* EOS is already taken care of elsewhere */
3264 if (eos && (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) {
3265 GST_ELEMENT_FLOW_ERROR (queue, ret);
3266 gst_pad_push_event (queue->srcpad, gst_event_new_eos ());
3267 }
3268 return;
3269 }
3270 }
3271
3272 static gboolean
gst_queue2_handle_src_event(GstPad * pad,GstObject * parent,GstEvent * event)3273 gst_queue2_handle_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
3274 {
3275 gboolean res = TRUE;
3276 GstQueue2 *queue = GST_QUEUE2 (parent);
3277
3278 #ifndef GST_DISABLE_GST_DEBUG
3279 GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%s)",
3280 event, GST_EVENT_TYPE_NAME (event));
3281 #endif
3282
3283 switch (GST_EVENT_TYPE (event)) {
3284 case GST_EVENT_FLUSH_START:
3285 if (QUEUE_IS_USING_QUEUE (queue)) {
3286 /* just forward upstream */
3287 res = gst_pad_push_event (queue->sinkpad, event);
3288 } else {
3289 /* now unblock the getrange function */
3290 GST_QUEUE2_MUTEX_LOCK (queue);
3291 GST_DEBUG_OBJECT (queue, "flushing");
3292 queue->srcresult = GST_FLOW_FLUSHING;
3293 GST_QUEUE2_SIGNAL_ADD (queue);
3294 GST_QUEUE2_MUTEX_UNLOCK (queue);
3295
3296 /* when using a temp file, we eat the event */
3297 res = TRUE;
3298 gst_event_unref (event);
3299 }
3300 break;
3301 case GST_EVENT_FLUSH_STOP:
3302 if (QUEUE_IS_USING_QUEUE (queue)) {
3303 /* just forward upstream */
3304 res = gst_pad_push_event (queue->sinkpad, event);
3305 } else {
3306 /* now unblock the getrange function */
3307 GST_QUEUE2_MUTEX_LOCK (queue);
3308 queue->srcresult = GST_FLOW_OK;
3309 GST_QUEUE2_MUTEX_UNLOCK (queue);
3310
3311 /* when using a temp file, we eat the event */
3312 res = TRUE;
3313 gst_event_unref (event);
3314 }
3315 break;
3316 case GST_EVENT_RECONFIGURE:
3317 GST_QUEUE2_MUTEX_LOCK (queue);
3318 /* assume downstream is linked now and try to push again */
3319 if (queue->srcresult == GST_FLOW_NOT_LINKED) {
3320 /* Mark the pad as needing reconfiguration, and
3321 * the loop will re-query downstream bitrate
3322 */
3323 gst_pad_mark_reconfigure (pad);
3324 queue->srcresult = GST_FLOW_OK;
3325 queue->sinkresult = GST_FLOW_OK;
3326 if (GST_PAD_MODE (pad) == GST_PAD_MODE_PUSH) {
3327 gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad,
3328 NULL);
3329 }
3330
3331 }
3332 GST_QUEUE2_MUTEX_UNLOCK (queue);
3333
3334 res = gst_pad_push_event (queue->sinkpad, event);
3335 break;
3336 default:
3337 res = gst_pad_push_event (queue->sinkpad, event);
3338 break;
3339 }
3340
3341 return res;
3342 }
3343
3344 static gboolean
gst_queue2_handle_src_query(GstPad * pad,GstObject * parent,GstQuery * query)3345 gst_queue2_handle_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
3346 {
3347 GstQueue2 *queue;
3348
3349 queue = GST_QUEUE2 (parent);
3350
3351 switch (GST_QUERY_TYPE (query)) {
3352 case GST_QUERY_POSITION:
3353 {
3354 gint64 peer_pos;
3355 GstFormat format;
3356
3357 if (!gst_pad_peer_query (queue->sinkpad, query))
3358 goto peer_failed;
3359
3360 /* get peer position */
3361 gst_query_parse_position (query, &format, &peer_pos);
3362
3363 /* FIXME: this code assumes that there's no discont in the queue */
3364 switch (format) {
3365 case GST_FORMAT_BYTES:
3366 peer_pos -= queue->cur_level.bytes;
3367 if (peer_pos < 0) /* Clamp result to 0 */
3368 peer_pos = 0;
3369 break;
3370 case GST_FORMAT_TIME:
3371 peer_pos -= queue->cur_level.time;
3372 if (peer_pos < 0) /* Clamp result to 0 */
3373 peer_pos = 0;
3374 break;
3375 default:
3376 GST_WARNING_OBJECT (queue, "dropping query in %s format, don't "
3377 "know how to adjust value", gst_format_get_name (format));
3378 return FALSE;
3379 }
3380 /* set updated position */
3381 gst_query_set_position (query, format, peer_pos);
3382 break;
3383 }
3384 case GST_QUERY_DURATION:
3385 {
3386 GST_DEBUG_OBJECT (queue, "doing peer query");
3387
3388 if (!gst_pad_peer_query (queue->sinkpad, query))
3389 goto peer_failed;
3390
3391 GST_DEBUG_OBJECT (queue, "peer query success");
3392 break;
3393 }
3394 case GST_QUERY_BUFFERING:
3395 {
3396 gint percent;
3397 gboolean is_buffering;
3398 GstBufferingMode mode;
3399 gint avg_in, avg_out;
3400 gint64 buffering_left;
3401
3402 GST_DEBUG_OBJECT (queue, "query buffering");
3403
3404 get_buffering_level (queue, &is_buffering, &percent);
3405 percent = convert_to_buffering_percent (queue, percent);
3406 gst_query_set_buffering_percent (query, is_buffering, percent);
3407
3408 get_buffering_stats (queue, percent, &mode, &avg_in, &avg_out,
3409 &buffering_left);
3410 gst_query_set_buffering_stats (query, mode, avg_in, avg_out,
3411 buffering_left);
3412
3413 if (!QUEUE_IS_USING_QUEUE (queue)) {
3414 /* add ranges for download and ringbuffer buffering */
3415 GstFormat format;
3416 gint64 start, stop, range_start, range_stop;
3417 guint64 writing_pos;
3418 gint64 estimated_total;
3419 gint64 duration;
3420 gboolean peer_res, is_eos;
3421 GstQueue2Range *queued_ranges;
3422
3423 /* we need a current download region */
3424 if (queue->current == NULL)
3425 return FALSE;
3426
3427 writing_pos = queue->current->writing_pos;
3428 is_eos = queue->is_eos;
3429
3430 if (is_eos) {
3431 /* we're EOS, we know the duration in bytes now */
3432 peer_res = TRUE;
3433 duration = writing_pos;
3434 } else {
3435 /* get duration of upstream in bytes */
3436 peer_res = gst_pad_peer_query_duration (queue->sinkpad,
3437 GST_FORMAT_BYTES, &duration);
3438 }
3439
3440 GST_DEBUG_OBJECT (queue, "percent %d, duration %" G_GINT64_FORMAT
3441 ", writing %" G_GINT64_FORMAT, percent, duration, writing_pos);
3442
3443 /* calculate remaining and total download time */
3444 if (peer_res && avg_in > 0.0)
3445 estimated_total = ((duration - writing_pos) * 1000) / avg_in;
3446 else
3447 estimated_total = -1;
3448
3449 GST_DEBUG_OBJECT (queue, "estimated-total %" G_GINT64_FORMAT,
3450 estimated_total);
3451
3452 gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
3453
3454 switch (format) {
3455 case GST_FORMAT_PERCENT:
3456 /* we need duration */
3457 if (!peer_res)
3458 goto peer_failed;
3459
3460 start = 0;
3461 /* get our available data relative to the duration */
3462 if (duration != -1)
3463 stop =
3464 gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, writing_pos,
3465 duration);
3466 else
3467 stop = -1;
3468 break;
3469 case GST_FORMAT_BYTES:
3470 start = 0;
3471 stop = writing_pos;
3472 break;
3473 default:
3474 start = -1;
3475 stop = -1;
3476 break;
3477 }
3478
3479 /* fill out the buffered ranges */
3480 for (queued_ranges = queue->ranges; queued_ranges;
3481 queued_ranges = queued_ranges->next) {
3482 switch (format) {
3483 case GST_FORMAT_PERCENT:
3484 if (duration == -1) {
3485 range_start = 0;
3486 range_stop = 0;
3487 break;
3488 }
3489 range_start =
3490 gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
3491 queued_ranges->offset, duration);
3492 range_stop =
3493 gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX,
3494 queued_ranges->writing_pos, duration);
3495 break;
3496 case GST_FORMAT_BYTES:
3497 range_start = queued_ranges->offset;
3498 range_stop = queued_ranges->writing_pos;
3499 break;
3500 default:
3501 range_start = -1;
3502 range_stop = -1;
3503 break;
3504 }
3505 if (range_start == range_stop)
3506 continue;
3507 GST_DEBUG_OBJECT (queue,
3508 "range starting at %" G_GINT64_FORMAT " and finishing at %"
3509 G_GINT64_FORMAT, range_start, range_stop);
3510 gst_query_add_buffering_range (query, range_start, range_stop);
3511 }
3512
3513 gst_query_set_buffering_range (query, format, start, stop,
3514 estimated_total);
3515 }
3516 break;
3517 }
3518 case GST_QUERY_SCHEDULING:
3519 {
3520 gboolean pull_mode;
3521 GstSchedulingFlags flags = 0;
3522 GstQuery *upstream;
3523
3524 upstream = gst_query_new_scheduling ();
3525 if (!gst_pad_peer_query (queue->sinkpad, upstream)) {
3526 gst_query_unref (upstream);
3527 goto peer_failed;
3528 }
3529
3530 gst_query_parse_scheduling (upstream, &flags, NULL, NULL, NULL);
3531 gst_query_unref (upstream);
3532
3533 /* we can operate in pull mode when we are using a tempfile */
3534 pull_mode = !QUEUE_IS_USING_QUEUE (queue);
3535
3536 if (pull_mode)
3537 flags |= GST_SCHEDULING_FLAG_SEEKABLE;
3538 gst_query_set_scheduling (query, flags, 0, -1, 0);
3539 if (pull_mode)
3540 gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL);
3541 gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
3542 break;
3543 }
3544 default:
3545 /* peer handled other queries */
3546 if (!gst_pad_query_default (pad, parent, query))
3547 goto peer_failed;
3548 break;
3549 }
3550
3551 return TRUE;
3552
3553 /* ERRORS */
3554 peer_failed:
3555 {
3556 GST_DEBUG_OBJECT (queue, "failed peer query");
3557 return FALSE;
3558 }
3559 }
3560
3561 static gboolean
gst_queue2_handle_query(GstElement * element,GstQuery * query)3562 gst_queue2_handle_query (GstElement * element, GstQuery * query)
3563 {
3564 GstQueue2 *queue = GST_QUEUE2 (element);
3565
3566 /* simply forward to the srcpad query function */
3567 return gst_queue2_handle_src_query (queue->srcpad, GST_OBJECT_CAST (element),
3568 query);
3569 }
3570
3571 static void
gst_queue2_update_upstream_size(GstQueue2 * queue)3572 gst_queue2_update_upstream_size (GstQueue2 * queue)
3573 {
3574 gint64 upstream_size = -1;
3575
3576 if (gst_pad_peer_query_duration (queue->sinkpad, GST_FORMAT_BYTES,
3577 &upstream_size)) {
3578 GST_INFO_OBJECT (queue, "upstream size: %" G_GINT64_FORMAT, upstream_size);
3579
3580 /* upstream_size can be negative but queue->upstream_size is unsigned.
3581 * Prevent setting negative values to it (the query can return -1) */
3582 if (upstream_size >= 0)
3583 queue->upstream_size = upstream_size;
3584 else
3585 queue->upstream_size = 0;
3586 }
3587 }
3588
3589 static GstFlowReturn
gst_queue2_get_range(GstPad * pad,GstObject * parent,guint64 offset,guint length,GstBuffer ** buffer)3590 gst_queue2_get_range (GstPad * pad, GstObject * parent, guint64 offset,
3591 guint length, GstBuffer ** buffer)
3592 {
3593 GstQueue2 *queue;
3594 GstFlowReturn ret;
3595
3596 queue = GST_QUEUE2_CAST (parent);
3597
3598 length = (length == -1) ? DEFAULT_BUFFER_SIZE : length;
3599 GST_QUEUE2_MUTEX_LOCK_CHECK (queue, queue->srcresult, out_flushing);
3600 offset = (offset == -1) ? queue->current->reading_pos : offset;
3601
3602 GST_DEBUG_OBJECT (queue,
3603 "Getting range: offset %" G_GUINT64_FORMAT ", length %u", offset, length);
3604
3605 /* catch any reads beyond the size of the file here to make sure queue2
3606 * doesn't send seek events beyond the size of the file upstream, since
3607 * that would confuse elements such as souphttpsrc and/or http servers.
3608 * Demuxers often just loop until EOS at the end of the file to figure out
3609 * when they've read all the end-headers or index chunks. */
3610 if (G_UNLIKELY (offset >= queue->upstream_size)) {
3611 gst_queue2_update_upstream_size (queue);
3612 if (queue->upstream_size > 0 && offset >= queue->upstream_size)
3613 goto out_unexpected;
3614 }
3615
3616 if (G_UNLIKELY (offset + length > queue->upstream_size)) {
3617 gst_queue2_update_upstream_size (queue);
3618 if (queue->upstream_size > 0 && offset + length >= queue->upstream_size) {
3619 length = queue->upstream_size - offset;
3620 GST_DEBUG_OBJECT (queue, "adjusting length downto %d", length);
3621 }
3622 }
3623
3624 /* FIXME - function will block when the range is not yet available */
3625 ret = gst_queue2_create_read (queue, offset, length, buffer);
3626 GST_QUEUE2_MUTEX_UNLOCK (queue);
3627 gst_queue2_post_buffering (queue);
3628
3629 return ret;
3630
3631 /* ERRORS */
3632 out_flushing:
3633 {
3634 ret = queue->srcresult;
3635
3636 GST_DEBUG_OBJECT (queue, "we are %s", gst_flow_get_name (ret));
3637 GST_QUEUE2_MUTEX_UNLOCK (queue);
3638 return ret;
3639 }
3640 out_unexpected:
3641 {
3642 GST_DEBUG_OBJECT (queue, "read beyond end of file");
3643 GST_QUEUE2_MUTEX_UNLOCK (queue);
3644 return GST_FLOW_EOS;
3645 }
3646 }
3647
3648 /* sink currently only operates in push mode */
3649 static gboolean
gst_queue2_sink_activate_mode(GstPad * pad,GstObject * parent,GstPadMode mode,gboolean active)3650 gst_queue2_sink_activate_mode (GstPad * pad, GstObject * parent,
3651 GstPadMode mode, gboolean active)
3652 {
3653 gboolean result;
3654 GstQueue2 *queue;
3655
3656 queue = GST_QUEUE2 (parent);
3657
3658 switch (mode) {
3659 case GST_PAD_MODE_PUSH:
3660 if (active) {
3661 GST_QUEUE2_MUTEX_LOCK (queue);
3662 GST_DEBUG_OBJECT (queue, "activating push mode");
3663 queue->srcresult = GST_FLOW_OK;
3664 queue->sinkresult = GST_FLOW_OK;
3665 queue->is_eos = FALSE;
3666 queue->unexpected = FALSE;
3667 reset_rate_timer (queue);
3668 GST_QUEUE2_MUTEX_UNLOCK (queue);
3669 } else {
3670 /* unblock chain function */
3671 GST_QUEUE2_MUTEX_LOCK (queue);
3672 GST_DEBUG_OBJECT (queue, "deactivating push mode");
3673 queue->srcresult = GST_FLOW_FLUSHING;
3674 queue->sinkresult = GST_FLOW_FLUSHING;
3675 GST_QUEUE2_SIGNAL_DEL (queue);
3676 GST_QUEUE2_MUTEX_UNLOCK (queue);
3677
3678 /* wait until it is unblocked and clean up */
3679 GST_PAD_STREAM_LOCK (pad);
3680 GST_QUEUE2_MUTEX_LOCK (queue);
3681 gst_queue2_locked_flush (queue, TRUE, FALSE);
3682 GST_QUEUE2_MUTEX_UNLOCK (queue);
3683 GST_PAD_STREAM_UNLOCK (pad);
3684 }
3685 result = TRUE;
3686 break;
3687 default:
3688 result = FALSE;
3689 break;
3690 }
3691 return result;
3692 }
3693
3694 /* src operating in push mode, we start a task on the source pad that pushes out
3695 * buffers from the queue */
3696 static gboolean
gst_queue2_src_activate_push(GstPad * pad,GstObject * parent,gboolean active)3697 gst_queue2_src_activate_push (GstPad * pad, GstObject * parent, gboolean active)
3698 {
3699 gboolean result = FALSE;
3700 GstQueue2 *queue;
3701
3702 queue = GST_QUEUE2 (parent);
3703
3704 if (active) {
3705 GST_QUEUE2_MUTEX_LOCK (queue);
3706 GST_DEBUG_OBJECT (queue, "activating push mode");
3707 queue->srcresult = GST_FLOW_OK;
3708 queue->sinkresult = GST_FLOW_OK;
3709 queue->is_eos = FALSE;
3710 queue->unexpected = FALSE;
3711 result =
3712 gst_pad_start_task (pad, (GstTaskFunction) gst_queue2_loop, pad, NULL);
3713 GST_QUEUE2_MUTEX_UNLOCK (queue);
3714 } else {
3715 /* unblock loop function */
3716 GST_QUEUE2_MUTEX_LOCK (queue);
3717 GST_DEBUG_OBJECT (queue, "deactivating push mode");
3718 queue->srcresult = GST_FLOW_FLUSHING;
3719 queue->sinkresult = GST_FLOW_FLUSHING;
3720 /* the item add signal will unblock */
3721 GST_QUEUE2_SIGNAL_ADD (queue);
3722 GST_QUEUE2_MUTEX_UNLOCK (queue);
3723
3724 /* step 2, make sure streaming finishes */
3725 result = gst_pad_stop_task (pad);
3726
3727 GST_QUEUE2_MUTEX_LOCK (queue);
3728 gst_queue2_locked_flush (queue, FALSE, FALSE);
3729 GST_QUEUE2_MUTEX_UNLOCK (queue);
3730 }
3731
3732 return result;
3733 }
3734
3735 /* pull mode, downstream will call our getrange function */
3736 static gboolean
gst_queue2_src_activate_pull(GstPad * pad,GstObject * parent,gboolean active)3737 gst_queue2_src_activate_pull (GstPad * pad, GstObject * parent, gboolean active)
3738 {
3739 gboolean result;
3740 GstQueue2 *queue;
3741
3742 queue = GST_QUEUE2 (parent);
3743
3744 if (active) {
3745 GST_QUEUE2_MUTEX_LOCK (queue);
3746 if (!QUEUE_IS_USING_QUEUE (queue)) {
3747 if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3748 /* open the temp file now */
3749 result = gst_queue2_open_temp_location_file (queue);
3750 } else if (!queue->ring_buffer) {
3751 queue->ring_buffer = g_malloc (queue->ring_buffer_max_size);
3752 result = ! !queue->ring_buffer;
3753 } else {
3754 result = TRUE;
3755 }
3756
3757 GST_DEBUG_OBJECT (queue, "activating pull mode");
3758 init_ranges (queue);
3759 queue->srcresult = GST_FLOW_OK;
3760 queue->sinkresult = GST_FLOW_OK;
3761 queue->is_eos = FALSE;
3762 queue->unexpected = FALSE;
3763 queue->upstream_size = 0;
3764 } else {
3765 GST_DEBUG_OBJECT (queue, "no temp file, cannot activate pull mode");
3766 /* this is not allowed, we cannot operate in pull mode without a temp
3767 * file. */
3768 queue->srcresult = GST_FLOW_FLUSHING;
3769 queue->sinkresult = GST_FLOW_FLUSHING;
3770 result = FALSE;
3771 }
3772 GST_QUEUE2_MUTEX_UNLOCK (queue);
3773 } else {
3774 GST_QUEUE2_MUTEX_LOCK (queue);
3775 GST_DEBUG_OBJECT (queue, "deactivating pull mode");
3776 queue->srcresult = GST_FLOW_FLUSHING;
3777 queue->sinkresult = GST_FLOW_FLUSHING;
3778 /* this will unlock getrange */
3779 GST_QUEUE2_SIGNAL_ADD (queue);
3780 result = TRUE;
3781 GST_QUEUE2_MUTEX_UNLOCK (queue);
3782 }
3783
3784 return result;
3785 }
3786
3787 static gboolean
gst_queue2_src_activate_mode(GstPad * pad,GstObject * parent,GstPadMode mode,gboolean active)3788 gst_queue2_src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
3789 gboolean active)
3790 {
3791 gboolean res;
3792
3793 switch (mode) {
3794 case GST_PAD_MODE_PULL:
3795 res = gst_queue2_src_activate_pull (pad, parent, active);
3796 break;
3797 case GST_PAD_MODE_PUSH:
3798 res = gst_queue2_src_activate_push (pad, parent, active);
3799 break;
3800 default:
3801 GST_LOG_OBJECT (pad, "unknown activation mode %d", mode);
3802 res = FALSE;
3803 break;
3804 }
3805 return res;
3806 }
3807
3808 static GstStateChangeReturn
gst_queue2_change_state(GstElement * element,GstStateChange transition)3809 gst_queue2_change_state (GstElement * element, GstStateChange transition)
3810 {
3811 GstQueue2 *queue;
3812 GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
3813
3814 queue = GST_QUEUE2 (element);
3815
3816 switch (transition) {
3817 case GST_STATE_CHANGE_NULL_TO_READY:
3818 break;
3819 case GST_STATE_CHANGE_READY_TO_PAUSED:
3820 GST_QUEUE2_MUTEX_LOCK (queue);
3821 if (!QUEUE_IS_USING_QUEUE (queue)) {
3822 if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3823 if (!gst_queue2_open_temp_location_file (queue))
3824 ret = GST_STATE_CHANGE_FAILURE;
3825 } else {
3826 if (queue->ring_buffer) {
3827 g_free (queue->ring_buffer);
3828 queue->ring_buffer = NULL;
3829 }
3830 if (!(queue->ring_buffer = g_malloc (queue->ring_buffer_max_size)))
3831 ret = GST_STATE_CHANGE_FAILURE;
3832 }
3833 init_ranges (queue);
3834 }
3835 queue->segment_event_received = FALSE;
3836 queue->starting_segment = NULL;
3837 gst_event_replace (&queue->stream_start_event, NULL);
3838 GST_QUEUE2_MUTEX_UNLOCK (queue);
3839
3840 /* Mark the srcpad as reconfigured to trigger querying
3841 * the downstream bitrate next time it tries to push */
3842 gst_pad_mark_reconfigure (queue->srcpad);
3843 break;
3844 case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
3845 break;
3846 default:
3847 break;
3848 }
3849
3850 if (ret == GST_STATE_CHANGE_FAILURE)
3851 return ret;
3852
3853 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
3854
3855 if (ret == GST_STATE_CHANGE_FAILURE)
3856 return ret;
3857
3858 switch (transition) {
3859 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3860 break;
3861 case GST_STATE_CHANGE_PAUSED_TO_READY:
3862 GST_QUEUE2_MUTEX_LOCK (queue);
3863 if (!QUEUE_IS_USING_QUEUE (queue)) {
3864 if (QUEUE_IS_USING_TEMP_FILE (queue)) {
3865 gst_queue2_close_temp_location_file (queue);
3866 } else if (queue->ring_buffer) {
3867 g_free (queue->ring_buffer);
3868 queue->ring_buffer = NULL;
3869 }
3870 clean_ranges (queue);
3871 }
3872 if (queue->starting_segment != NULL) {
3873 gst_event_unref (queue->starting_segment);
3874 queue->starting_segment = NULL;
3875 }
3876 gst_event_replace (&queue->stream_start_event, NULL);
3877 GST_QUEUE2_MUTEX_UNLOCK (queue);
3878 break;
3879 case GST_STATE_CHANGE_READY_TO_NULL:
3880 break;
3881 default:
3882 break;
3883 }
3884
3885 return ret;
3886 }
3887
/* Changing the capacity of the queue must wake up
 * the _chain function, it might have more room now
 * to store the buffer/event in the queue.
 *
 * @q: the queue whose limits changed. The macro operates on its argument
 * (it used to silently operate on a variable named `queue`) and expands to
 * a single statement, so it is safe inside an unbraced if/else. */
#define QUEUE_CAPACITY_CHANGE(q) do {           \
  GST_QUEUE2_SIGNAL_DEL (q);                    \
  if ((q)->use_buffering)                       \
    update_buffering (q);                       \
} while (0)
3895
/* Changing the minimum required fill level must
 * wake up the _loop function as it might now
 * be able to proceed.
 *
 * @q: the queue whose threshold changed. The macro operates on its
 * argument and carries no trailing semicolon; the caller supplies it.
 */
#define QUEUE_THRESHOLD_CHANGE(q) \
  GST_QUEUE2_SIGNAL_ADD (q)
3902
3903 static void
gst_queue2_set_temp_template(GstQueue2 * queue,const gchar * template)3904 gst_queue2_set_temp_template (GstQueue2 * queue, const gchar * template)
3905 {
3906 GstState state;
3907
3908 /* the element must be stopped in order to do this */
3909 GST_OBJECT_LOCK (queue);
3910 state = GST_STATE (queue);
3911 if (state != GST_STATE_READY && state != GST_STATE_NULL)
3912 goto wrong_state;
3913 GST_OBJECT_UNLOCK (queue);
3914
3915 /* set new location */
3916 g_free (queue->temp_template);
3917 queue->temp_template = g_strdup (template);
3918
3919 return;
3920
3921 /* ERROR */
3922 wrong_state:
3923 {
3924 GST_WARNING_OBJECT (queue, "setting temp-template property in wrong state");
3925 GST_OBJECT_UNLOCK (queue);
3926 }
3927 }
3928
3929 static void
gst_queue2_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)3930 gst_queue2_set_property (GObject * object,
3931 guint prop_id, const GValue * value, GParamSpec * pspec)
3932 {
3933 GstQueue2 *queue = GST_QUEUE2 (object);
3934
3935 /* someone could change levels here, and since this
3936 * affects the get/put funcs, we need to lock for safety. */
3937 GST_QUEUE2_MUTEX_LOCK (queue);
3938
3939 switch (prop_id) {
3940 case PROP_MAX_SIZE_BYTES:
3941 queue->max_level.bytes = g_value_get_uint (value);
3942 QUEUE_CAPACITY_CHANGE (queue);
3943 break;
3944 case PROP_MAX_SIZE_BUFFERS:
3945 queue->max_level.buffers = g_value_get_uint (value);
3946 QUEUE_CAPACITY_CHANGE (queue);
3947 break;
3948 case PROP_MAX_SIZE_TIME:
3949 queue->max_level.time = g_value_get_uint64 (value);
3950 /* set rate_time to the same value. We use an extra field in the level
3951 * structure so that we can easily access and compare it */
3952 queue->max_level.rate_time = queue->max_level.time;
3953 QUEUE_CAPACITY_CHANGE (queue);
3954 break;
3955 case PROP_USE_BUFFERING:
3956 queue->use_buffering = g_value_get_boolean (value);
3957 if (!queue->use_buffering && queue->is_buffering) {
3958 GST_DEBUG_OBJECT (queue, "Disabled buffering while buffering, "
3959 "posting 100%% message");
3960 SET_PERCENT (queue, 100);
3961 queue->is_buffering = FALSE;
3962 }
3963
3964 if (queue->use_buffering) {
3965 queue->is_buffering = TRUE;
3966 update_buffering (queue);
3967 }
3968 break;
3969 case PROP_USE_TAGS_BITRATE:
3970 queue->use_tags_bitrate = g_value_get_boolean (value);
3971 break;
3972 case PROP_USE_RATE_ESTIMATE:
3973 queue->use_rate_estimate = g_value_get_boolean (value);
3974 break;
3975 case PROP_LOW_PERCENT:
3976 queue->low_watermark = g_value_get_int (value) * BUF_LEVEL_PERCENT_FACTOR;
3977 if (queue->is_buffering)
3978 update_buffering (queue);
3979 break;
3980 case PROP_HIGH_PERCENT:
3981 queue->high_watermark =
3982 g_value_get_int (value) * BUF_LEVEL_PERCENT_FACTOR;
3983 if (queue->is_buffering)
3984 update_buffering (queue);
3985 break;
3986 case PROP_LOW_WATERMARK:
3987 queue->low_watermark = g_value_get_double (value) * MAX_BUFFERING_LEVEL;
3988 if (queue->is_buffering)
3989 update_buffering (queue);
3990 break;
3991 case PROP_HIGH_WATERMARK:
3992 queue->high_watermark = g_value_get_double (value) * MAX_BUFFERING_LEVEL;
3993 if (queue->is_buffering)
3994 update_buffering (queue);
3995 break;
3996 case PROP_TEMP_TEMPLATE:
3997 gst_queue2_set_temp_template (queue, g_value_get_string (value));
3998 break;
3999 case PROP_TEMP_REMOVE:
4000 queue->temp_remove = g_value_get_boolean (value);
4001 break;
4002 case PROP_RING_BUFFER_MAX_SIZE:
4003 queue->ring_buffer_max_size = g_value_get_uint64 (value);
4004 break;
4005 case PROP_USE_BITRATE_QUERY:
4006 queue->use_bitrate_query = g_value_get_boolean (value);
4007 break;
4008 default:
4009 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
4010 break;
4011 }
4012
4013 GST_QUEUE2_MUTEX_UNLOCK (queue);
4014 gst_queue2_post_buffering (queue);
4015 }
4016
4017 static void
gst_queue2_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)4018 gst_queue2_get_property (GObject * object,
4019 guint prop_id, GValue * value, GParamSpec * pspec)
4020 {
4021 GstQueue2 *queue = GST_QUEUE2 (object);
4022
4023 GST_QUEUE2_MUTEX_LOCK (queue);
4024
4025 switch (prop_id) {
4026 case PROP_CUR_LEVEL_BYTES:
4027 g_value_set_uint (value, queue->cur_level.bytes);
4028 break;
4029 case PROP_CUR_LEVEL_BUFFERS:
4030 g_value_set_uint (value, queue->cur_level.buffers);
4031 break;
4032 case PROP_CUR_LEVEL_TIME:
4033 g_value_set_uint64 (value, queue->cur_level.time);
4034 break;
4035 case PROP_MAX_SIZE_BYTES:
4036 g_value_set_uint (value, queue->max_level.bytes);
4037 break;
4038 case PROP_MAX_SIZE_BUFFERS:
4039 g_value_set_uint (value, queue->max_level.buffers);
4040 break;
4041 case PROP_MAX_SIZE_TIME:
4042 g_value_set_uint64 (value, queue->max_level.time);
4043 break;
4044 case PROP_USE_BUFFERING:
4045 g_value_set_boolean (value, queue->use_buffering);
4046 break;
4047 case PROP_USE_TAGS_BITRATE:
4048 g_value_set_boolean (value, queue->use_tags_bitrate);
4049 break;
4050 case PROP_USE_RATE_ESTIMATE:
4051 g_value_set_boolean (value, queue->use_rate_estimate);
4052 break;
4053 case PROP_LOW_PERCENT:
4054 g_value_set_int (value, queue->low_watermark / BUF_LEVEL_PERCENT_FACTOR);
4055 break;
4056 case PROP_HIGH_PERCENT:
4057 g_value_set_int (value, queue->high_watermark / BUF_LEVEL_PERCENT_FACTOR);
4058 break;
4059 case PROP_LOW_WATERMARK:
4060 g_value_set_double (value, queue->low_watermark /
4061 (gdouble) MAX_BUFFERING_LEVEL);
4062 break;
4063 case PROP_HIGH_WATERMARK:
4064 g_value_set_double (value, queue->high_watermark /
4065 (gdouble) MAX_BUFFERING_LEVEL);
4066 break;
4067 case PROP_TEMP_TEMPLATE:
4068 g_value_set_string (value, queue->temp_template);
4069 break;
4070 case PROP_TEMP_LOCATION:
4071 g_value_set_string (value, queue->temp_location);
4072 break;
4073 case PROP_TEMP_REMOVE:
4074 g_value_set_boolean (value, queue->temp_remove);
4075 break;
4076 case PROP_RING_BUFFER_MAX_SIZE:
4077 g_value_set_uint64 (value, queue->ring_buffer_max_size);
4078 break;
4079 case PROP_AVG_IN_RATE:
4080 {
4081 gdouble in_rate = queue->byte_in_rate;
4082
4083 /* During the first RATE_INTERVAL, byte_in_rate will not have been
4084 * calculated, so calculate it here. */
4085 if (in_rate == 0.0 && queue->bytes_in
4086 && queue->last_update_in_rates_elapsed > 0.0)
4087 in_rate = queue->bytes_in / queue->last_update_in_rates_elapsed;
4088
4089 g_value_set_int64 (value, (gint64) in_rate);
4090 break;
4091 }
4092 case PROP_USE_BITRATE_QUERY:
4093 g_value_set_boolean (value, queue->use_bitrate_query);
4094 break;
4095 case PROP_BITRATE:{
4096 guint64 bitrate = 0;
4097 if (bitrate == 0 && queue->use_tags_bitrate) {
4098 if (queue->sink_tags_bitrate > 0)
4099 bitrate = queue->sink_tags_bitrate;
4100 else if (queue->src_tags_bitrate)
4101 bitrate = queue->src_tags_bitrate;
4102 }
4103 if (bitrate == 0 && queue->use_bitrate_query) {
4104 bitrate = queue->downstream_bitrate;
4105 }
4106 g_value_set_uint64 (value, (guint64) bitrate);
4107 break;
4108 }
4109 default:
4110 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
4111 break;
4112 }
4113
4114 GST_QUEUE2_MUTEX_UNLOCK (queue);
4115 }
4116