• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer
2  * Copyright (C) <2015> Jan Schmidt <jan@centricular.com>
3  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20 
21 /**
22  * SECTION:element-urisourcebin
23  * @title: urisourcebin
24  *
25  * urisourcebin is an element for accessing URIs in a uniform manner.
26  *
27  * It handles selecting a URI source element and potentially download
28  * buffering for network sources. It produces one or more source pads,
29  * depending on the input source, for feeding to decoding chains or decodebin.
30  *
31  * The main configuration is via the #GstURISourceBin:uri property.
32  *
33  * > urisourcebin is still experimental API and a technology preview.
34  * > Its behaviour and exposed API are subject to change.
35  */
36 
37 /* FIXME 0.11: suppress warnings for deprecated API such as GValueArray
38  * with newer GLib versions (>= 2.31.0) */
39 #define GLIB_DISABLE_DEPRECATION_WARNINGS
40 
41 #ifdef HAVE_CONFIG_H
42 #  include "config.h"
43 #endif
44 
45 #include <string.h>
46 
47 #include <gst/gst.h>
48 #include <gst/gst-i18n-plugin.h>
49 #include <gst/pbutils/missing-plugins.h>
50 
51 #include "gstplay-enum.h"
52 #include "gstrawcaps.h"
53 #include "gstplaybackelements.h"
54 #include "gstplaybackutils.h"
55 
56 #define GST_TYPE_URI_SOURCE_BIN \
57   (gst_uri_source_bin_get_type())
58 #define GST_URI_SOURCE_BIN(obj) \
59   (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_URI_SOURCE_BIN,GstURISourceBin))
60 #define GST_URI_SOURCE_BIN_CLASS(klass) \
61   (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_URI_SOURCE_BIN,GstURISourceBinClass))
62 #define GST_IS_URI_SOURCE_BIN(obj) \
63   (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_URI_SOURCE_BIN))
64 #define GST_IS_URI_SOURCE_BIN_CLASS(klass) \
65   (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_URI_SOURCE_BIN))
66 #define GST_URI_SOURCE_BIN_CAST(obj) ((GstURISourceBin *) (obj))
67 
68 typedef struct _GstURISourceBin GstURISourceBin;
69 typedef struct _GstURISourceBinClass GstURISourceBinClass;
70 typedef struct _ChildSrcPadInfo ChildSrcPadInfo;
71 typedef struct _OutputSlotInfo OutputSlotInfo;
72 
73 #define GST_URI_SOURCE_BIN_LOCK(urisrc) (g_mutex_lock(&((GstURISourceBin*)(urisrc))->lock))
74 #define GST_URI_SOURCE_BIN_UNLOCK(urisrc) (g_mutex_unlock(&((GstURISourceBin*)(urisrc))->lock))
75 
/* Acquire the element's buffering_lock mutex. A log line is emitted before
 * the lock attempt and again once the mutex is held, each with the calling
 * thread pointer. */
76 #define BUFFERING_LOCK(ubin) G_STMT_START {				\
77     GST_LOG_OBJECT (ubin,						\
78 		    "buffering locking from thread %p",			\
79 		    g_thread_self ());					\
80     g_mutex_lock (&GST_URI_SOURCE_BIN_CAST(ubin)->buffering_lock);		\
81     GST_LOG_OBJECT (ubin,						\
82 		    "buffering lock from thread %p",			\
83 		    g_thread_self ());					\
84 } G_STMT_END
85
/* Release the element's buffering_lock mutex, logging the calling thread
 * pointer first. */
86 #define BUFFERING_UNLOCK(ubin) G_STMT_START {				\
87     GST_LOG_OBJECT (ubin,						\
88 		    "buffering unlocking from thread %p",		\
89 		    g_thread_self ());					\
90     g_mutex_unlock (&GST_URI_SOURCE_BIN_CAST(ubin)->buffering_lock);		\
91 } G_STMT_END
92 
93 /* Track a source pad from a child that
 * is linked or needs linking to an output
 * slot, or source pads that are directly
 * exposed as ghost pads.
 *
 * An instance is attached to the pad under the "urisourcebin.srcpadinfo"
 * data key and released through free_child_src_pad_info(). */
97 struct _ChildSrcPadInfo
98 {
99   guint blocking_probe_id;      /* id of the blocking probe (pending_pad_blocked) */
100   guint event_probe_id;         /* id of the event probe (demux_pad_events) */
101
102   /* Source pad this info is attached to (not reffed, since
   * the pad owns the ChildSrcPadInfo as qdata) */
104   GstPad *src_pad;
105   GstCaps *cur_caps;            /* holds ref */
106
107   /* Configured output slot, if any */
108   OutputSlotInfo *output_slot;
109
110   /* If this info is for a directly exposed pad,
   * rather than linked through a slot it's here
   * (unreffed when the info is freed): */
112   GstPad *output_pad;
113 };
114 
/* One output of urisourcebin: a buffering element together with the pad that
 * feeds it and the pad that is exposed to the outside. */
115 struct _OutputSlotInfo
116 {
117   ChildSrcPadInfo *linked_info; /* demux source pad info feeding this slot, if any */
118   GstElement *queue;            /* queue2 or downloadbuffer */
119   GstPad *sinkpad;              /* Sink pad of the queue element */
120   GstPad *srcpad;               /* Output ghost pad */
121   gboolean is_eos;              /* Did EOS get fed into the buffering element
                                   (written under BUFFERING_LOCK) */
122
123   gulong bitrate_changed_id;    /* queue bitrate changed notification */
124 };
125 
126 /**
 * GstURISourceBin
 *
 * urisourcebin element struct. Holds the configured property values, the
 * source/demuxer children and the bookkeeping for output slots, pending
 * pads and buffering state.
 */
131 struct _GstURISourceBin
132 {
133   GstBin parent_instance;
134
135   GMutex lock;                  /* lock for constructing; taken through
                                   GST_URI_SOURCE_BIN_LOCK around pending_pads
                                   and slot (re)linking */
136
137   GMutex factories_lock;
138   guint32 factories_cookie;
139   GList *factories;             /* factories we can use for selecting elements */
140
141   gchar *uri;                   /* "uri" property value, owned (freed in finalize) */
142   guint64 connection_speed;     /* "connection-speed" property value * 1000 */
143
144   gboolean is_stream;
145   gboolean is_adaptive;
146   gboolean need_queue;
147   guint64 buffer_duration;      /* When buffering, buffer duration (ns);
                                   -1 selects ACTUAL_DEFAULT_BUFFER_DURATION */
148   guint buffer_size;            /* When buffering, buffer size (bytes);
                                   -1 selects ACTUAL_DEFAULT_BUFFER_SIZE */
149   gboolean download;            /* "download" property */
150   gboolean use_buffering;       /* "use-buffering" property */
151   gdouble low_watermark;        /* "low-watermark" property */
152   gdouble high_watermark;       /* "high-watermark" property */
153
154   GstElement *source;           /* exposed read-only as the "source" property */
155   GList *typefinds;             /* list of typefind element */
156
157   GstElement *demuxer;          /* Adaptive demuxer if any */
158   GSList *out_slots;            /* list of OutputSlotInfo */
159
160   guint numpads;
161
162   /* for dynamic sources */
163   guint src_np_sig_id;          /* new-pad signal id */
164
165   guint64 ring_buffer_max_size; /* 0 means disabled */
166
167   GList *pending_pads;          /* Pads we have blocked pending assignment
                                   to an output source pad */
169
170   GList *buffering_status;      /* element currently buffering messages */
171   gint last_buffering_pct;      /* Avoid sending buffering over and over */
172   GMutex buffering_lock;        /* taken through BUFFERING_LOCK, e.g. around
                                   OutputSlotInfo.is_eos updates */
173   GMutex buffering_post_lock;
174 };
175 
/* Class structure: the GstBin class plus the default-handler slots referenced
 * by the "drained" and "about-to-finish" signal registrations. */
176 struct _GstURISourceBinClass
177 {
178   GstBinClass parent_class;
179
180   /* emitted when all data has been drained out
   * FIXME : What do we need this for ?? */
182   void (*drained) (GstElement * element);
183   /* emitted when all data has been fed into buffering slots (i.e. the
   * actual sources are done) */
185   void (*about_to_finish) (GstElement * element);
186 };
187 
188 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src_%u",
189     GST_PAD_SRC,
190     GST_PAD_SOMETIMES,
191     GST_STATIC_CAPS_ANY);
192 
193 static GstStaticCaps default_raw_caps = GST_STATIC_CAPS (DEFAULT_RAW_CAPS);
194 
195 GST_DEBUG_CATEGORY_STATIC (gst_uri_source_bin_debug);
196 #define GST_CAT_DEFAULT gst_uri_source_bin_debug
197 
198 /* Signal indices into gst_uri_source_bin_signals[]. LAST_SIGNAL is the
 * array size, not a signal. */
199 enum
200 {
201   SIGNAL_DRAINED,
202   SIGNAL_ABOUT_TO_FINISH,
203   SIGNAL_SOURCE_SETUP,
204   LAST_SIGNAL
205 };
206 
/* Default values of the element properties */
#define DEFAULT_PROP_URI            NULL
#define DEFAULT_PROP_SOURCE         NULL
#define DEFAULT_CONNECTION_SPEED    0
#define DEFAULT_BUFFER_DURATION     (-1)
#define DEFAULT_BUFFER_SIZE         (-1)
#define DEFAULT_DOWNLOAD            FALSE
#define DEFAULT_USE_BUFFERING       TRUE
#define DEFAULT_RING_BUFFER_MAX_SIZE 0
#define DEFAULT_LOW_WATERMARK       0.01
#define DEFAULT_HIGH_WATERMARK      0.99

/* Effective limits used when the corresponding property is left at -1.
 * The expansions are parenthesized so the macros stay a single operand when
 * used inside a larger expression (e.g. as a divisor). */
#define ACTUAL_DEFAULT_BUFFER_SIZE  (10 * 1024 * 1024)  /* The value used for byte limits when buffer-size == -1 */
#define ACTUAL_DEFAULT_BUFFER_DURATION  (5 * GST_SECOND)        /* The value used for time limits when buffer-duration == -1 */

/* Resolve the configured limit of element @u, substituting the actual default
 * for the -1 sentinel. The struct fields are unsigned, so the comparison with
 * -1 matches the all-ones value that storing -1 produces. */
#define GET_BUFFER_SIZE(u) ((u)->buffer_size == -1 ? ACTUAL_DEFAULT_BUFFER_SIZE : (u)->buffer_size)
#define GET_BUFFER_DURATION(u) ((u)->buffer_duration == -1 ? ACTUAL_DEFAULT_BUFFER_DURATION : (u)->buffer_duration)

#define DEFAULT_CAPS (gst_static_caps_get (&default_raw_caps))
/* GObject property ids. PROP_URI through PROP_STATISTICS are installed in
 * class_init and dispatched on in the set/get property handlers; PROP_0 is
 * never installed. */
226 enum
227 {
228   PROP_0,
229   PROP_URI,
230   PROP_SOURCE,
231   PROP_CONNECTION_SPEED,
232   PROP_BUFFER_SIZE,
233   PROP_BUFFER_DURATION,
234   PROP_DOWNLOAD,
235   PROP_USE_BUFFERING,
236   PROP_RING_BUFFER_MAX_SIZE,
237   PROP_LOW_WATERMARK,
238   PROP_HIGH_WATERMARK,
239   PROP_STATISTICS,
240 };
241 
242 #define CUSTOM_EOS_QUARK _custom_eos_quark_get ()
243 #define CUSTOM_EOS_QUARK_DATA "custom-eos"
244 static GQuark
_custom_eos_quark_get(void)245 _custom_eos_quark_get (void)
246 {
247   static gsize g_quark;
248 
249   if (g_once_init_enter (&g_quark)) {
250     gsize quark =
251         (gsize) g_quark_from_static_string ("urisourcebin-custom-eos");
252     g_once_init_leave (&g_quark, quark);
253   }
254   return g_quark;
255 }
256 
257 static void post_missing_plugin_error (GstElement * urisrc,
258     const gchar * element_name);
259 
260 static guint gst_uri_source_bin_signals[LAST_SIGNAL] = { 0 };
261 
262 GType gst_uri_source_bin_get_type (void);
263 #define gst_uri_source_bin_parent_class parent_class
264 G_DEFINE_TYPE (GstURISourceBin, gst_uri_source_bin, GST_TYPE_BIN);
265 
266 #define _do_init \
267     GST_DEBUG_CATEGORY_INIT (gst_uri_source_bin_debug, "urisourcebin", 0, "URI source element"); \
268     playback_element_init (plugin);
269 GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (urisourcebin, "urisourcebin",
270     GST_RANK_NONE, GST_TYPE_URI_SOURCE_BIN, _do_init);
271 
272 static void gst_uri_source_bin_set_property (GObject * object, guint prop_id,
273     const GValue * value, GParamSpec * pspec);
274 static void gst_uri_source_bin_get_property (GObject * object, guint prop_id,
275     GValue * value, GParamSpec * pspec);
276 static void gst_uri_source_bin_finalize (GObject * obj);
277 
278 static void handle_message (GstBin * bin, GstMessage * msg);
279 
280 static gboolean gst_uri_source_bin_query (GstElement * element,
281     GstQuery * query);
282 static GstStateChangeReturn gst_uri_source_bin_change_state (GstElement *
283     element, GstStateChange transition);
284 
285 static void remove_demuxer (GstURISourceBin * bin);
286 static void expose_output_pad (GstURISourceBin * urisrc, GstPad * pad);
287 static OutputSlotInfo *get_output_slot (GstURISourceBin * urisrc,
288     gboolean do_download, gboolean is_adaptive, GstCaps * caps);
289 static void free_output_slot (OutputSlotInfo * slot, GstURISourceBin * urisrc);
290 static void free_output_slot_async (GstURISourceBin * urisrc,
291     OutputSlotInfo * slot);
292 static GstPad *create_output_pad (GstURISourceBin * urisrc, GstPad * pad);
293 static void remove_buffering_msgs (GstURISourceBin * bin, GstObject * src);
294 
295 static void update_queue_values (GstURISourceBin * urisrc);
296 static GstStructure *get_queue_statistics (GstURISourceBin * urisrc);
297 
298 static void
gst_uri_source_bin_class_init(GstURISourceBinClass * klass)299 gst_uri_source_bin_class_init (GstURISourceBinClass * klass)
300 {
301   GObjectClass *gobject_class;
302   GstElementClass *gstelement_class;
303   GstBinClass *gstbin_class;
304 
305   gobject_class = G_OBJECT_CLASS (klass);
306   gstelement_class = GST_ELEMENT_CLASS (klass);
307   gstbin_class = GST_BIN_CLASS (klass);
308 
309   gobject_class->set_property = gst_uri_source_bin_set_property;
310   gobject_class->get_property = gst_uri_source_bin_get_property;
311   gobject_class->finalize = gst_uri_source_bin_finalize;
312 
313   g_object_class_install_property (gobject_class, PROP_URI,
314       g_param_spec_string ("uri", "URI", "URI to decode",
315           DEFAULT_PROP_URI, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
316 
317   g_object_class_install_property (gobject_class, PROP_SOURCE,
318       g_param_spec_object ("source", "Source", "Source object used",
319           GST_TYPE_ELEMENT, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
320 
321   g_object_class_install_property (gobject_class, PROP_CONNECTION_SPEED,
322       g_param_spec_uint64 ("connection-speed", "Connection Speed",
323           "Network connection speed in kbps (0 = unknown)",
324           0, G_MAXUINT64 / 1000, DEFAULT_CONNECTION_SPEED,
325           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
326 
327   g_object_class_install_property (gobject_class, PROP_BUFFER_SIZE,
328       g_param_spec_int ("buffer-size", "Buffer size (bytes)",
329           "Buffer size when buffering streams (-1 default value)",
330           -1, G_MAXINT, DEFAULT_BUFFER_SIZE,
331           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
332   g_object_class_install_property (gobject_class, PROP_BUFFER_DURATION,
333       g_param_spec_int64 ("buffer-duration", "Buffer duration (ns)",
334           "Buffer duration when buffering streams (-1 default value)",
335           -1, G_MAXINT64, DEFAULT_BUFFER_DURATION,
336           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
337 
338   /**
339    * GstURISourceBin::download:
340    *
341    * For certain media type, enable download buffering.
342    */
343   g_object_class_install_property (gobject_class, PROP_DOWNLOAD,
344       g_param_spec_boolean ("download", "Download",
345           "Attempt download buffering when buffering network streams",
346           DEFAULT_DOWNLOAD, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
347 
348   /**
349    * GstURISourceBin::use-buffering:
350    *
351    * Perform buffering using a queue2 element, and emit BUFFERING
352    * messages based on low-/high-percent thresholds of streaming data,
353    * such as adaptive-demuxer streams.
354    *
355    * When download buffering is activated and used for the current media
356    * type, this property does nothing.
357    *
358    */
359   g_object_class_install_property (gobject_class, PROP_USE_BUFFERING,
360       g_param_spec_boolean ("use-buffering", "Use Buffering",
361           "Perform buffering on demuxed/parsed media",
362           DEFAULT_USE_BUFFERING, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
363 
364   /**
365    * GstURISourceBin::ring-buffer-max-size
366    *
367    * The maximum size of the ring buffer in kilobytes. If set to 0, the ring
368    * buffer is disabled. Default is 0.
369    *
370    */
371   g_object_class_install_property (gobject_class, PROP_RING_BUFFER_MAX_SIZE,
372       g_param_spec_uint64 ("ring-buffer-max-size",
373           "Max. ring buffer size (bytes)",
374           "Max. amount of data in the ring buffer (bytes, 0 = ring buffer disabled)",
375           0, G_MAXUINT, DEFAULT_RING_BUFFER_MAX_SIZE,
376           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
377 
378   /**
379    * GstURISourceBin::low-watermark
380    *
381    * Proportion of the queue size (either in bytes or time) for buffering
382    * to restart when crossed from above.  Only used if use-buffering is TRUE.
383    */
384   g_object_class_install_property (gobject_class, PROP_LOW_WATERMARK,
385       g_param_spec_double ("low-watermark", "Low watermark",
386           "Low threshold for buffering to start. Only used if use-buffering is True",
387           0.0, 1.0, DEFAULT_LOW_WATERMARK,
388           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
389 
390   /**
391    * GstURISourceBin::high-watermark
392    *
393    * Proportion of the queue size (either in bytes or time) to complete
394    * buffering.  Only used if use-buffering is TRUE.
395    */
396   g_object_class_install_property (gobject_class, PROP_HIGH_WATERMARK,
397       g_param_spec_double ("high-watermark", "High watermark",
398           "High threshold for buffering to finish. Only used if use-buffering is True",
399           0.0, 1.0, DEFAULT_HIGH_WATERMARK,
400           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
401 
402   /**
403    * GstURISourceBin::statistics
404    *
405    * A GStructure containing the following values based on the values from
406    * all the queue's contained in this urisourcebin.
407    *
408    *  "minimum-byte-level"  G_TYPE_UINT               Minimum of the current byte levels
409    *  "maximum-byte-level"  G_TYPE_UINT               Maximum of the current byte levels
410    *  "average-byte-level"  G_TYPE_UINT               Average of the current byte levels
411    *  "minimum-time-level"  G_TYPE_UINT64             Minimum of the current time levels
412    *  "maximum-time-level"  G_TYPE_UINT64             Maximum of the current time levels
413    *  "average-time-level"  G_TYPE_UINT64             Average of the current time levels
414    */
415   g_object_class_install_property (gobject_class, PROP_STATISTICS,
416       g_param_spec_boxed ("statistics", "Queue Statistics",
417           "A set of statistics over all the queue-like elements contained in "
418           "this element", GST_TYPE_STRUCTURE,
419           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
420 
421   /**
422    * GstURISourceBin::drained:
423    *
424    * This signal is emitted when the data for the current uri is played.
425    */
426   gst_uri_source_bin_signals[SIGNAL_DRAINED] =
427       g_signal_new ("drained", G_TYPE_FROM_CLASS (klass),
428       G_SIGNAL_RUN_LAST,
429       G_STRUCT_OFFSET (GstURISourceBinClass, drained), NULL, NULL, NULL,
430       G_TYPE_NONE, 0, G_TYPE_NONE);
431 
432     /**
433    * GstURISourceBin::about-to-finish:
434    *
435    * This signal is emitted when the data for the current uri is played.
436    */
437   gst_uri_source_bin_signals[SIGNAL_ABOUT_TO_FINISH] =
438       g_signal_new ("about-to-finish", G_TYPE_FROM_CLASS (klass),
439       G_SIGNAL_RUN_LAST,
440       G_STRUCT_OFFSET (GstURISourceBinClass, about_to_finish), NULL, NULL, NULL,
441       G_TYPE_NONE, 0, G_TYPE_NONE);
442 
443   /**
444    * GstURISourceBin::source-setup:
445    * @bin: the urisourcebin.
446    * @source: source element
447    *
448    * This signal is emitted after the source element has been created, so
449    * it can be configured by setting additional properties (e.g. set a
450    * proxy server for an http source, or set the device and read speed for
451    * an audio cd source). This is functionally equivalent to connecting to
452    * the notify::source signal, but more convenient.
453    *
454    * Since: 1.6.1
455    */
456   gst_uri_source_bin_signals[SIGNAL_SOURCE_SETUP] =
457       g_signal_new ("source-setup", G_TYPE_FROM_CLASS (klass),
458       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_ELEMENT);
459 
460   gst_element_class_add_pad_template (gstelement_class,
461       gst_static_pad_template_get (&srctemplate));
462   gst_element_class_set_static_metadata (gstelement_class,
463       "URI reader", "Generic/Bin/Source",
464       "Download and buffer a URI as needed",
465       "Jan Schmidt <jan@centricular.com>");
466 
467   gstelement_class->query = GST_DEBUG_FUNCPTR (gst_uri_source_bin_query);
468   gstelement_class->change_state =
469       GST_DEBUG_FUNCPTR (gst_uri_source_bin_change_state);
470 
471   gstbin_class->handle_message = GST_DEBUG_FUNCPTR (handle_message);
472 }
473 
474 static void
gst_uri_source_bin_init(GstURISourceBin * urisrc)475 gst_uri_source_bin_init (GstURISourceBin * urisrc)
476 {
477   /* first filter out the interesting element factories */
478   g_mutex_init (&urisrc->factories_lock);
479 
480   g_mutex_init (&urisrc->lock);
481 
482   g_mutex_init (&urisrc->buffering_lock);
483   g_mutex_init (&urisrc->buffering_post_lock);
484 
485   urisrc->uri = g_strdup (DEFAULT_PROP_URI);
486   urisrc->connection_speed = DEFAULT_CONNECTION_SPEED;
487 
488   urisrc->buffer_duration = DEFAULT_BUFFER_DURATION;
489   urisrc->buffer_size = DEFAULT_BUFFER_SIZE;
490   urisrc->download = DEFAULT_DOWNLOAD;
491   urisrc->use_buffering = DEFAULT_USE_BUFFERING;
492   urisrc->ring_buffer_max_size = DEFAULT_RING_BUFFER_MAX_SIZE;
493   urisrc->last_buffering_pct = -1;
494   urisrc->low_watermark = DEFAULT_LOW_WATERMARK;
495   urisrc->high_watermark = DEFAULT_HIGH_WATERMARK;
496 
497   GST_OBJECT_FLAG_SET (urisrc,
498       GST_ELEMENT_FLAG_SOURCE | GST_BIN_FLAG_STREAMS_AWARE);
499   gst_bin_set_suppressed_flags (GST_BIN (urisrc),
500       GST_ELEMENT_FLAG_SOURCE | GST_ELEMENT_FLAG_SINK);
501 }
502 
503 static void
gst_uri_source_bin_finalize(GObject * obj)504 gst_uri_source_bin_finalize (GObject * obj)
505 {
506   GstURISourceBin *urisrc = GST_URI_SOURCE_BIN (obj);
507 
508   remove_demuxer (urisrc);
509   g_mutex_clear (&urisrc->lock);
510   g_mutex_clear (&urisrc->factories_lock);
511   g_mutex_clear (&urisrc->buffering_lock);
512   g_mutex_clear (&urisrc->buffering_post_lock);
513   g_free (urisrc->uri);
514   if (urisrc->factories)
515     gst_plugin_feature_list_free (urisrc->factories);
516 
517   G_OBJECT_CLASS (parent_class)->finalize (obj);
518 }
519 
520 static void
gst_uri_source_bin_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)521 gst_uri_source_bin_set_property (GObject * object, guint prop_id,
522     const GValue * value, GParamSpec * pspec)
523 {
524   GstURISourceBin *urisrc = GST_URI_SOURCE_BIN (object);
525 
526   switch (prop_id) {
527     case PROP_URI:
528       GST_OBJECT_LOCK (urisrc);
529       g_free (urisrc->uri);
530       urisrc->uri = g_value_dup_string (value);
531       GST_OBJECT_UNLOCK (urisrc);
532       break;
533     case PROP_CONNECTION_SPEED:
534       GST_OBJECT_LOCK (urisrc);
535       urisrc->connection_speed = g_value_get_uint64 (value) * 1000;
536       GST_OBJECT_UNLOCK (urisrc);
537       break;
538     case PROP_BUFFER_SIZE:
539       urisrc->buffer_size = g_value_get_int (value);
540       update_queue_values (urisrc);
541       break;
542     case PROP_BUFFER_DURATION:
543       urisrc->buffer_duration = g_value_get_int64 (value);
544       update_queue_values (urisrc);
545       break;
546     case PROP_DOWNLOAD:
547       urisrc->download = g_value_get_boolean (value);
548       break;
549     case PROP_USE_BUFFERING:
550       urisrc->use_buffering = g_value_get_boolean (value);
551       break;
552     case PROP_RING_BUFFER_MAX_SIZE:
553       urisrc->ring_buffer_max_size = g_value_get_uint64 (value);
554       break;
555     case PROP_LOW_WATERMARK:
556       urisrc->low_watermark = g_value_get_double (value);
557       update_queue_values (urisrc);
558       break;
559     case PROP_HIGH_WATERMARK:
560       urisrc->high_watermark = g_value_get_double (value);
561       update_queue_values (urisrc);
562       break;
563     default:
564       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
565       break;
566   }
567 }
568 
569 static void
gst_uri_source_bin_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)570 gst_uri_source_bin_get_property (GObject * object, guint prop_id,
571     GValue * value, GParamSpec * pspec)
572 {
573   GstURISourceBin *urisrc = GST_URI_SOURCE_BIN (object);
574 
575   switch (prop_id) {
576     case PROP_URI:
577       GST_OBJECT_LOCK (urisrc);
578       g_value_set_string (value, urisrc->uri);
579       GST_OBJECT_UNLOCK (urisrc);
580       break;
581     case PROP_SOURCE:
582       GST_OBJECT_LOCK (urisrc);
583       g_value_set_object (value, urisrc->source);
584       GST_OBJECT_UNLOCK (urisrc);
585       break;
586     case PROP_CONNECTION_SPEED:
587       GST_OBJECT_LOCK (urisrc);
588       g_value_set_uint64 (value, urisrc->connection_speed / 1000);
589       GST_OBJECT_UNLOCK (urisrc);
590       break;
591     case PROP_BUFFER_SIZE:
592       GST_OBJECT_LOCK (urisrc);
593       g_value_set_int (value, urisrc->buffer_size);
594       GST_OBJECT_UNLOCK (urisrc);
595       break;
596     case PROP_BUFFER_DURATION:
597       GST_OBJECT_LOCK (urisrc);
598       g_value_set_int64 (value, urisrc->buffer_duration);
599       GST_OBJECT_UNLOCK (urisrc);
600       break;
601     case PROP_DOWNLOAD:
602       g_value_set_boolean (value, urisrc->download);
603       break;
604     case PROP_USE_BUFFERING:
605       g_value_set_boolean (value, urisrc->use_buffering);
606       break;
607     case PROP_RING_BUFFER_MAX_SIZE:
608       g_value_set_uint64 (value, urisrc->ring_buffer_max_size);
609       break;
610     case PROP_LOW_WATERMARK:
611       g_value_set_double (value, urisrc->low_watermark);
612       break;
613     case PROP_HIGH_WATERMARK:
614       g_value_set_double (value, urisrc->high_watermark);
615       break;
616     case PROP_STATISTICS:
617       g_value_take_boxed (value, get_queue_statistics (urisrc));
618       break;
619     default:
620       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
621       break;
622   }
623 }
624 
625 static gboolean
copy_sticky_events(GstPad * pad,GstEvent ** event,gpointer user_data)626 copy_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
627 {
628   GstPad *gpad = GST_PAD_CAST (user_data);
629 
630   GST_DEBUG_OBJECT (gpad, "store sticky event %" GST_PTR_FORMAT, *event);
631   gst_pad_store_sticky_event (gpad, *event);
632 
633   return TRUE;
634 }
635 
636 static GstPadProbeReturn
637 pending_pad_blocked (GstPad * pad, GstPadProbeInfo * info, gpointer user_data);
638 
639 static GstPadProbeReturn
640 demux_pad_events (GstPad * pad, GstPadProbeInfo * info, gpointer user_data);
641 
642 static void
free_child_src_pad_info(ChildSrcPadInfo * info)643 free_child_src_pad_info (ChildSrcPadInfo * info)
644 {
645   if (info->cur_caps)
646     gst_caps_unref (info->cur_caps);
647   if (info->output_pad)
648     gst_object_unref (info->output_pad);
649   g_free (info);
650 }
651 
652 /* Called by the signal handlers when a demuxer has produced a new stream */
653 static void
new_demuxer_pad_added_cb(GstElement * element,GstPad * pad,GstURISourceBin * urisrc)654 new_demuxer_pad_added_cb (GstElement * element, GstPad * pad,
655     GstURISourceBin * urisrc)
656 {
657   ChildSrcPadInfo *info;
658 
659   info = g_new0 (ChildSrcPadInfo, 1);
660   info->src_pad = pad;
661   info->cur_caps = gst_pad_get_current_caps (pad);
662   if (info->cur_caps == NULL)
663     info->cur_caps = gst_pad_query_caps (pad, NULL);
664 
665   g_object_set_data_full (G_OBJECT (pad), "urisourcebin.srcpadinfo",
666       info, (GDestroyNotify) free_child_src_pad_info);
667 
668   GST_DEBUG_OBJECT (element, "new demuxer pad, name: <%s>. "
669       "Added as pending pad with caps %" GST_PTR_FORMAT,
670       GST_PAD_NAME (pad), info->cur_caps);
671 
672   GST_URI_SOURCE_BIN_LOCK (urisrc);
673   urisrc->pending_pads = g_list_prepend (urisrc->pending_pads, pad);
674   GST_URI_SOURCE_BIN_UNLOCK (urisrc);
675 
676   /* Block the pad. On the first data on that pad if it hasn't
677    * been linked to an output slot, we'll create one */
678   info->blocking_probe_id =
679       gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM,
680       pending_pad_blocked, urisrc, NULL);
681   info->event_probe_id =
682       gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM |
683       GST_PAD_PROBE_TYPE_EVENT_FLUSH, demux_pad_events, urisrc, NULL);
684 }
685 
686 static GstPadProbeReturn
pending_pad_blocked(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)687 pending_pad_blocked (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
688 {
689   ChildSrcPadInfo *child_info;
690   OutputSlotInfo *slot;
691   GstURISourceBin *urisrc = GST_URI_SOURCE_BIN (user_data);
692   GstCaps *caps;
693   GstPad *output_pad;
694 
695   if (!(child_info =
696           g_object_get_data (G_OBJECT (pad), "urisourcebin.srcpadinfo")))
697     goto done;
698 
699   GST_LOG_OBJECT (urisrc, "Removing pad %" GST_PTR_FORMAT " from pending list",
700       pad);
701 
702   GST_URI_SOURCE_BIN_LOCK (urisrc);
703 
704   /* Once blocked, this pad is no longer pending, one way or another */
705   urisrc->pending_pads = g_list_remove (urisrc->pending_pads, pad);
706 
707   /* If already linked to a slot, nothing more to do */
708   if (child_info->output_slot) {
709     GST_LOG_OBJECT (urisrc, "Pad %" GST_PTR_FORMAT " is linked to queue %"
710         GST_PTR_FORMAT " on slot %p", pad, child_info->output_slot->queue,
711         child_info->output_slot);
712     GST_URI_SOURCE_BIN_UNLOCK (urisrc);
713     goto done;
714   }
715 
716   caps = gst_pad_get_current_caps (pad);
717   if (caps == NULL)
718     caps = gst_pad_query_caps (pad, NULL);
719 
720   slot = get_output_slot (urisrc, FALSE, TRUE, caps);
721 
722   gst_caps_unref (caps);
723 
724   if (slot == NULL) {
725     GST_URI_SOURCE_BIN_UNLOCK (urisrc);
726     goto done;
727   }
728 
729   GST_LOG_OBJECT (urisrc, "Pad %" GST_PTR_FORMAT " linked to slot %p", pad,
730       slot);
731 
732   child_info->output_slot = slot;
733   slot->linked_info = child_info;
734   gst_pad_link (pad, slot->sinkpad);
735 
736   output_pad = gst_object_ref (slot->srcpad);
737 
738   GST_URI_SOURCE_BIN_UNLOCK (urisrc);
739 
740   expose_output_pad (urisrc, output_pad);
741   gst_object_unref (output_pad);
742 
743 done:
744   return GST_PAD_PROBE_REMOVE;
745 }
746 
747 /* Called with LOCK held */
748 /* Looks for a suitable pending pad to connect onto this
749  * finishing output slot that's about to EOS */
750 static gboolean
link_pending_pad_to_output(GstURISourceBin * urisrc,OutputSlotInfo * slot)751 link_pending_pad_to_output (GstURISourceBin * urisrc, OutputSlotInfo * slot)
752 {
753   GList *cur;
754   ChildSrcPadInfo *in_info = slot->linked_info;
755   ChildSrcPadInfo *out_info = NULL;
756   gboolean res = FALSE;
757   GstCaps *cur_caps;
758 
759   /* Look for a suitable pending pad */
760   cur_caps = gst_pad_get_current_caps (slot->sinkpad);
761 
762   GST_DEBUG_OBJECT (urisrc,
763       "Looking for a pending pad with caps %" GST_PTR_FORMAT, cur_caps);
764 
765   for (cur = urisrc->pending_pads; cur != NULL; cur = g_list_next (cur)) {
766     GstPad *pending = (GstPad *) (cur->data);
767     ChildSrcPadInfo *cur_info = NULL;
768     if ((cur_info =
769             g_object_get_data (G_OBJECT (pending),
770                 "urisourcebin.srcpadinfo"))) {
771       /* Don't re-link to the same pad in case of EOS while still pending */
772       if (in_info == cur_info)
773         continue;
774       if (cur_caps == NULL || gst_caps_is_equal (cur_caps, cur_info->cur_caps)) {
775         GST_DEBUG_OBJECT (urisrc, "Found suitable pending pad %" GST_PTR_FORMAT
776             " with caps %" GST_PTR_FORMAT " to link to this output slot",
777             cur_info->src_pad, cur_info->cur_caps);
778         out_info = cur_info;
779         break;
780       }
781     }
782   }
783 
784   if (cur_caps)
785     gst_caps_unref (cur_caps);
786 
787   if (out_info) {
788     /* Block any upstream stuff while we switch out the pad */
789     guint block_id =
790         gst_pad_add_probe (slot->sinkpad, GST_PAD_PROBE_TYPE_BLOCK_UPSTREAM,
791         NULL, NULL, NULL);
792     GST_DEBUG_OBJECT (urisrc, "Linking pending pad %" GST_PTR_FORMAT
793         " to existing output slot %p", out_info->src_pad, slot);
794 
795     if (in_info) {
796       gst_pad_unlink (in_info->src_pad, slot->sinkpad);
797       in_info->output_slot = NULL;
798       slot->linked_info = NULL;
799     }
800 
801     if (gst_pad_link (out_info->src_pad, slot->sinkpad) == GST_PAD_LINK_OK) {
802       out_info->output_slot = slot;
803       slot->linked_info = out_info;
804 
805       BUFFERING_LOCK (urisrc);
806       /* A re-linked slot is no longer EOS */
807       slot->is_eos = FALSE;
808       BUFFERING_UNLOCK (urisrc);
809       res = TRUE;
810       slot->is_eos = FALSE;
811       urisrc->pending_pads =
812           g_list_remove (urisrc->pending_pads, out_info->src_pad);
813     } else {
814       GST_ERROR_OBJECT (urisrc,
815           "Failed to link new demuxer pad to the output slot we tried");
816     }
817     gst_pad_remove_probe (slot->sinkpad, block_id);
818   }
819 
820   return res;
821 }
822 
823 /* Called with lock held */
824 static gboolean
all_slots_are_eos(GstURISourceBin * urisrc)825 all_slots_are_eos (GstURISourceBin * urisrc)
826 {
827   GSList *tmp;
828 
829   for (tmp = urisrc->out_slots; tmp; tmp = tmp->next) {
830     OutputSlotInfo *slot = (OutputSlotInfo *) tmp->data;
831     if (slot->is_eos == FALSE)
832       return FALSE;
833   }
834   return TRUE;
835 }
836 
837 static GstPadProbeReturn
demux_pad_events(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)838 demux_pad_events (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
839 {
840   GstURISourceBin *urisrc = GST_URI_SOURCE_BIN (user_data);
841   ChildSrcPadInfo *child_info;
842   GstPadProbeReturn ret = GST_PAD_PROBE_OK;
843   GstEvent *ev = GST_PAD_PROBE_INFO_EVENT (info);
844 
845   if (!(child_info =
846           g_object_get_data (G_OBJECT (pad), "urisourcebin.srcpadinfo")))
847     goto done;
848 
849   GST_URI_SOURCE_BIN_LOCK (urisrc);
850   /* If not linked to a slot, nothing more to do */
851   if (child_info->output_slot == NULL) {
852     GST_URI_SOURCE_BIN_UNLOCK (urisrc);
853     goto done;
854   }
855 
856   switch (GST_EVENT_TYPE (ev)) {
857     case GST_EVENT_EOS:
858     {
859       gboolean all_streams_eos;
860 
861       GST_LOG_OBJECT (urisrc, "EOS on pad %" GST_PTR_FORMAT, pad);
862 
863       if ((urisrc->pending_pads &&
864               link_pending_pad_to_output (urisrc, child_info->output_slot))) {
865         /* Found a new source pad to give this slot data - no need to send EOS */
866         GST_URI_SOURCE_BIN_UNLOCK (urisrc);
867         ret = GST_PAD_PROBE_DROP;
868         goto done;
869       }
870 
871       BUFFERING_LOCK (urisrc);
872       /* Mark that we fed an EOS to this slot */
873       child_info->output_slot->is_eos = TRUE;
874       all_streams_eos = all_slots_are_eos (urisrc);
875       BUFFERING_UNLOCK (urisrc);
876 
877       /* EOS means this element is no longer buffering */
878       remove_buffering_msgs (urisrc,
879           GST_OBJECT_CAST (child_info->output_slot->queue));
880 
881       /* Mark this custom EOS */
882       gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (ev), CUSTOM_EOS_QUARK,
883           (gchar *) CUSTOM_EOS_QUARK_DATA, NULL);
884       if (all_streams_eos) {
885         GST_DEBUG_OBJECT (urisrc, "POSTING ABOUT TO FINISH");
886         g_signal_emit (urisrc,
887             gst_uri_source_bin_signals[SIGNAL_ABOUT_TO_FINISH], 0, NULL);
888       }
889     }
890       break;
891     case GST_EVENT_CAPS:
892     {
893       GstCaps *caps;
894       gst_event_parse_caps (ev, &caps);
895       gst_caps_replace (&child_info->cur_caps, caps);
896     }
897       break;
898     case GST_EVENT_STREAM_START:
899     case GST_EVENT_FLUSH_STOP:
900       BUFFERING_LOCK (urisrc);
901       child_info->output_slot->is_eos = FALSE;
902       BUFFERING_UNLOCK (urisrc);
903       break;
904     default:
905       break;
906   }
907 
908   GST_URI_SOURCE_BIN_UNLOCK (urisrc);
909 
910 done:
911   return ret;
912 }
913 
914 static GstPadProbeReturn
pre_queue_event_probe(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)915 pre_queue_event_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
916 {
917   GstURISourceBin *urisrc = GST_URI_SOURCE_BIN (user_data);
918   GstPadProbeReturn ret = GST_PAD_PROBE_OK;
919   GstEvent *ev = GST_PAD_PROBE_INFO_EVENT (info);
920 
921   switch (GST_EVENT_TYPE (ev)) {
922     case GST_EVENT_EOS:
923     {
924       GST_LOG_OBJECT (urisrc, "EOS on pad %" GST_PTR_FORMAT, pad);
925       GST_DEBUG_OBJECT (urisrc, "POSTING ABOUT TO FINISH");
926       g_signal_emit (urisrc,
927           gst_uri_source_bin_signals[SIGNAL_ABOUT_TO_FINISH], 0, NULL);
928     }
929       break;
930     default:
931       break;
932   }
933   return ret;
934 }
935 
936 static GstStructure *
get_queue_statistics(GstURISourceBin * urisrc)937 get_queue_statistics (GstURISourceBin * urisrc)
938 {
939   GstStructure *ret = NULL;
940   guint min_byte_level = 0, max_byte_level = 0;
941   guint64 min_time_level = 0, max_time_level = 0;
942   gdouble avg_byte_level = 0., avg_time_level = 0.;
943   guint i = 0;
944   GSList *cur;
945 
946   GST_URI_SOURCE_BIN_LOCK (urisrc);
947 
948   for (cur = urisrc->out_slots; cur != NULL; cur = g_slist_next (cur)) {
949     OutputSlotInfo *slot = (OutputSlotInfo *) (cur->data);
950     guint byte_limit = 0;
951     guint64 time_limit = 0;
952 
953     g_object_get (slot->queue, "current-level-bytes", &byte_limit,
954         "current-level-time", &time_limit, NULL);
955 
956     if (byte_limit < min_byte_level)
957       min_byte_level = byte_limit;
958     if (byte_limit > max_byte_level)
959       max_byte_level = byte_limit;
960     avg_byte_level = (avg_byte_level * i + byte_limit) / (gdouble) (i + 1);
961 
962     if (time_limit < min_time_level)
963       min_time_level = time_limit;
964     if (time_limit > max_time_level)
965       max_time_level = time_limit;
966     avg_time_level = (avg_time_level * i + time_limit) / (gdouble) (i + 1);
967 
968     i++;
969   }
970   GST_URI_SOURCE_BIN_UNLOCK (urisrc);
971 
972   ret = gst_structure_new ("application/x-urisourcebin-stats",
973       "minimum-byte-level", G_TYPE_UINT, (guint) min_byte_level,
974       "maximum-byte-level", G_TYPE_UINT, (guint) max_byte_level,
975       "average-byte-level", G_TYPE_UINT, (guint) avg_byte_level,
976       "minimum-time-level", G_TYPE_UINT64, (guint64) min_time_level,
977       "maximum-time-level", G_TYPE_UINT64, (guint64) max_time_level,
978       "average-time-level", G_TYPE_UINT64, (guint64) avg_time_level, NULL);
979 
980   return ret;
981 }
982 
983 static void
update_queue_values(GstURISourceBin * urisrc)984 update_queue_values (GstURISourceBin * urisrc)
985 {
986   gint64 duration;
987   guint buffer_size;
988   gdouble low_watermark, high_watermark;
989   guint64 cumulative_bitrate = 0;
990   GSList *cur;
991 
992   GST_URI_SOURCE_BIN_LOCK (urisrc);
993   duration = GET_BUFFER_DURATION (urisrc);
994   buffer_size = GET_BUFFER_SIZE (urisrc);
995   low_watermark = urisrc->low_watermark;
996   high_watermark = urisrc->high_watermark;
997 
998   for (cur = urisrc->out_slots; cur != NULL; cur = g_slist_next (cur)) {
999     OutputSlotInfo *slot = (OutputSlotInfo *) (cur->data);
1000     guint64 bitrate = 0;
1001 
1002     if (g_object_class_find_property (G_OBJECT_GET_CLASS (slot->queue),
1003             "bitrate")) {
1004       g_object_get (G_OBJECT (slot->queue), "bitrate", &bitrate, NULL);
1005     }
1006 
1007     if (bitrate > 0)
1008       cumulative_bitrate += bitrate;
1009     else {
1010       GST_TRACE_OBJECT (urisrc, "Unknown bitrate detected from %" GST_PTR_FORMAT
1011           ", resetting all bitrates", slot->queue);
1012       cumulative_bitrate = 0;
1013       break;
1014     }
1015   }
1016 
1017   GST_DEBUG_OBJECT (urisrc, "recalculating queue limits with cumulative "
1018       "bitrate %" G_GUINT64_FORMAT ", buffer size %u, buffer duration %"
1019       G_GINT64_FORMAT, cumulative_bitrate, buffer_size, duration);
1020 
1021   for (cur = urisrc->out_slots; cur != NULL; cur = g_slist_next (cur)) {
1022     OutputSlotInfo *slot = (OutputSlotInfo *) (cur->data);
1023     guint byte_limit;
1024 
1025     if (cumulative_bitrate > 0
1026         && g_object_class_find_property (G_OBJECT_GET_CLASS (slot->queue),
1027             "bitrate")) {
1028       guint64 bitrate;
1029       g_object_get (G_OBJECT (slot->queue), "bitrate", &bitrate, NULL);
1030       byte_limit =
1031           gst_util_uint64_scale (buffer_size, bitrate, cumulative_bitrate);
1032     } else {
1033       /* if not all queue's have valid bitrates, use the buffer-size as the
1034        * limit */
1035       byte_limit = buffer_size;
1036     }
1037 
1038     GST_DEBUG_OBJECT (urisrc,
1039         "calculated new limits for queue-like element %" GST_PTR_FORMAT
1040         ", bytes:%u, time:%" G_GUINT64_FORMAT
1041         ", low-watermark:%f, high-watermark:%f",
1042         slot->queue, byte_limit, (guint64) duration, low_watermark,
1043         high_watermark);
1044     g_object_set (G_OBJECT (slot->queue), "max-size-bytes", byte_limit,
1045         "max-size-time", (guint64) duration, "low-watermark", low_watermark,
1046         "high-watermark", high_watermark, NULL);
1047   }
1048   GST_URI_SOURCE_BIN_UNLOCK (urisrc);
1049 }
1050 
1051 static void
on_queue_bitrate_changed(GstElement * queue,GParamSpec * pspec,gpointer user_data)1052 on_queue_bitrate_changed (GstElement * queue, GParamSpec * pspec,
1053     gpointer user_data)
1054 {
1055   GstURISourceBin *urisrc = GST_URI_SOURCE_BIN (user_data);
1056 
1057   gst_element_call_async (GST_ELEMENT (urisrc),
1058       (GstElementCallAsyncFunc) update_queue_values, NULL, NULL);
1059 }
1060 
1061 /* Called with lock held */
1062 static OutputSlotInfo *
get_output_slot(GstURISourceBin * urisrc,gboolean do_download,gboolean is_adaptive,GstCaps * caps)1063 get_output_slot (GstURISourceBin * urisrc, gboolean do_download,
1064     gboolean is_adaptive, GstCaps * caps)
1065 {
1066   OutputSlotInfo *slot;
1067   GstPad *srcpad;
1068   GstElement *queue;
1069   const gchar *elem_name;
1070 
1071   /* If we have caps, iterate the existing slots and look for an
1072    * unlinked one that can be used */
1073   if (caps && gst_caps_is_fixed (caps)) {
1074     GSList *cur;
1075     GstCaps *cur_caps;
1076 
1077     for (cur = urisrc->out_slots; cur != NULL; cur = g_slist_next (cur)) {
1078       slot = (OutputSlotInfo *) (cur->data);
1079       if (slot->linked_info == NULL) {
1080         cur_caps = gst_pad_get_current_caps (slot->sinkpad);
1081         if (cur_caps == NULL || gst_caps_is_equal (caps, cur_caps)) {
1082           GST_LOG_OBJECT (urisrc, "Found existing slot %p to link to", slot);
1083           gst_caps_unref (cur_caps);
1084           slot->is_eos = FALSE;
1085           return slot;
1086         }
1087         gst_caps_unref (cur_caps);
1088       }
1089     }
1090   }
1091 
1092   /* Otherwise create the new slot */
1093   if (do_download)
1094     elem_name = "downloadbuffer";
1095   else
1096     elem_name = "queue2";
1097 
1098   queue = gst_element_factory_make (elem_name, NULL);
1099   if (!queue)
1100     goto no_buffer_element;
1101 
1102   slot = g_new0 (OutputSlotInfo, 1);
1103   slot->queue = queue;
1104 
1105   /* Set the slot onto the queue (needed in buffering msg handling) */
1106   g_object_set_data (G_OBJECT (queue), "urisourcebin.slotinfo", slot);
1107 
1108   slot->bitrate_changed_id =
1109       g_signal_connect (G_OBJECT (queue), "notify::bitrate",
1110       (GCallback) on_queue_bitrate_changed, urisrc);
1111 
1112   if (do_download) {
1113     gchar *temp_template, *filename;
1114     const gchar *tmp_dir, *prgname;
1115 
1116     tmp_dir = g_get_user_cache_dir ();
1117     prgname = g_get_prgname ();
1118     if (prgname == NULL)
1119       prgname = "GStreamer";
1120 
1121     filename = g_strdup_printf ("%s-XXXXXX", prgname);
1122 
1123     /* build our filename */
1124     temp_template = g_build_filename (tmp_dir, filename, NULL);
1125 
1126     GST_DEBUG_OBJECT (urisrc, "enable download buffering in %s (%s, %s, %s)",
1127         temp_template, tmp_dir, prgname, filename);
1128 
1129     /* configure progressive download for selected media types */
1130     g_object_set (queue, "temp-template", temp_template, NULL);
1131 
1132     g_free (filename);
1133     g_free (temp_template);
1134   } else {
1135     if (is_adaptive) {
1136       GST_LOG_OBJECT (urisrc, "Adding queue for adaptive streaming stream");
1137       g_object_set (queue, "use-buffering", urisrc->use_buffering,
1138           "use-tags-bitrate", TRUE, "use-rate-estimate", FALSE, NULL);
1139     } else {
1140       GST_LOG_OBJECT (urisrc, "Adding queue for buffering");
1141       g_object_set (queue, "use-buffering", urisrc->use_buffering, NULL);
1142     }
1143 
1144     g_object_set (queue, "ring-buffer-max-size",
1145         urisrc->ring_buffer_max_size, NULL);
1146     /* Disable max-size-buffers - queue based on data rate to the default time limit */
1147     g_object_set (queue, "max-size-buffers", 0, NULL);
1148 
1149     /* Don't start buffering until the queue is empty (< 1%).
1150      * Start playback when the queue is 60% full, leaving a bit more room
1151      * for upstream to push more without getting bursty */
1152     g_object_set (queue, "low-percent", 1, "high-percent", 60, NULL);
1153 
1154     g_object_set (queue, "low-watermark", urisrc->low_watermark,
1155         "high-watermark", urisrc->high_watermark, NULL);
1156   }
1157 
1158   /* set the necessary limits on the queue-like elements */
1159   g_object_set (queue, "max-size-bytes", GET_BUFFER_SIZE (urisrc),
1160       "max-size-time", (guint64) GET_BUFFER_DURATION (urisrc), NULL);
1161 #if 0
1162   /* Disabled because this makes initial startup slower for radio streams */
1163   else {
1164     /* Buffer 4 seconds by default - some extra headroom over the
1165      * core default, because we trigger playback sooner */
1166     //g_object_set (queue, "max-size-time", 4 * GST_SECOND, NULL);
1167   }
1168 #endif
1169 
1170   /* save queue pointer so we can remove it later */
1171   urisrc->out_slots = g_slist_prepend (urisrc->out_slots, slot);
1172 
1173   gst_bin_add (GST_BIN_CAST (urisrc), queue);
1174   gst_element_sync_state_with_parent (queue);
1175 
1176   slot->sinkpad = gst_element_get_static_pad (queue, "sink");
1177 
1178   /* get the new raw srcpad */
1179   srcpad = gst_element_get_static_pad (queue, "src");
1180   g_object_set_data (G_OBJECT (srcpad), "urisourcebin.slotinfo", slot);
1181 
1182   slot->srcpad = create_output_pad (urisrc, srcpad);
1183 
1184   gst_object_unref (srcpad);
1185 
1186   return slot;
1187 
1188 no_buffer_element:
1189   {
1190     post_missing_plugin_error (GST_ELEMENT_CAST (urisrc), elem_name);
1191     return NULL;
1192   }
1193 }
1194 
1195 static GstPadProbeReturn
source_pad_event_probe(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)1196 source_pad_event_probe (GstPad * pad, GstPadProbeInfo * info,
1197     gpointer user_data)
1198 {
1199   GstEvent *event = GST_PAD_PROBE_INFO_EVENT (info);
1200   GstURISourceBin *urisrc = user_data;
1201 
1202   GST_LOG_OBJECT (pad, "%s, urisrc %p", GST_EVENT_TYPE_NAME (event), event);
1203 
1204   if (GST_EVENT_TYPE (event) == GST_EVENT_EOS &&
1205       gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (event),
1206           CUSTOM_EOS_QUARK)) {
1207     OutputSlotInfo *slot;
1208     GST_DEBUG_OBJECT (pad, "we received EOS");
1209 
1210     /* remove custom-eos */
1211     gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (event), CUSTOM_EOS_QUARK,
1212         NULL, NULL);
1213 
1214     GST_URI_SOURCE_BIN_LOCK (urisrc);
1215 
1216     slot = g_object_get_data (G_OBJECT (pad), "urisourcebin.slotinfo");
1217 
1218     if (slot) {
1219       GstEvent *eos;
1220       guint32 seqnum;
1221 
1222       if (slot->linked_info) {
1223         if (slot->is_eos) {
1224           /* linked_info is old input which is still linked without removal */
1225           GST_DEBUG_OBJECT (pad, "push actual EOS");
1226           seqnum = gst_event_get_seqnum (event);
1227           eos = gst_event_new_eos ();
1228           gst_event_set_seqnum (eos, seqnum);
1229           gst_pad_push_event (slot->srcpad, eos);
1230         } else {
1231           /* Do not clear output slot yet. A new input was
1232            * connected. We should just drop this EOS */
1233         }
1234         GST_URI_SOURCE_BIN_UNLOCK (urisrc);
1235         return GST_PAD_PROBE_DROP;
1236       }
1237 
1238       seqnum = gst_event_get_seqnum (event);
1239       eos = gst_event_new_eos ();
1240       gst_event_set_seqnum (eos, seqnum);
1241       gst_pad_push_event (slot->srcpad, eos);
1242       free_output_slot_async (urisrc, slot);
1243     }
1244 
1245     /* FIXME: Only emit drained if all output pads are done and there's no
1246      * pending pads */
1247     g_signal_emit (urisrc, gst_uri_source_bin_signals[SIGNAL_DRAINED], 0, NULL);
1248 
1249     GST_URI_SOURCE_BIN_UNLOCK (urisrc);
1250     return GST_PAD_PROBE_DROP;
1251   }
1252   /* never drop events */
1253   return GST_PAD_PROBE_OK;
1254 }
1255 
1256 /* called when we found a raw pad to expose. We set up a
1257  * padprobe to detect EOS before exposing the pad.
1258  * Called with LOCK held. */
1259 static GstPad *
create_output_pad(GstURISourceBin * urisrc,GstPad * pad)1260 create_output_pad (GstURISourceBin * urisrc, GstPad * pad)
1261 {
1262   GstPad *newpad;
1263   GstPadTemplate *pad_tmpl;
1264   gchar *padname;
1265 
1266   gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM,
1267       source_pad_event_probe, urisrc, NULL);
1268 
1269   pad_tmpl = gst_static_pad_template_get (&srctemplate);
1270 
1271   padname = g_strdup_printf ("src_%u", urisrc->numpads);
1272   urisrc->numpads++;
1273 
1274   newpad = gst_ghost_pad_new_from_template (padname, pad, pad_tmpl);
1275   gst_object_unref (pad_tmpl);
1276   g_free (padname);
1277 
1278   GST_DEBUG_OBJECT (urisrc, "Created output pad %s:%s for pad %s:%s",
1279       GST_DEBUG_PAD_NAME (newpad), GST_DEBUG_PAD_NAME (pad));
1280 
1281   return newpad;
1282 }
1283 
1284 static void
expose_output_pad(GstURISourceBin * urisrc,GstPad * pad)1285 expose_output_pad (GstURISourceBin * urisrc, GstPad * pad)
1286 {
1287   GstPad *target;
1288 
1289   if (gst_object_has_as_parent (GST_OBJECT (pad), GST_OBJECT (urisrc)))
1290     return;                     /* Pad is already exposed */
1291 
1292   target = gst_ghost_pad_get_target (GST_GHOST_PAD (pad));
1293 
1294   gst_pad_sticky_events_foreach (target, copy_sticky_events, pad);
1295   gst_object_unref (target);
1296 
1297   GST_DEBUG_OBJECT (urisrc, "Exposing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1298 
1299   gst_pad_set_active (pad, TRUE);
1300   gst_element_add_pad (GST_ELEMENT_CAST (urisrc), pad);
1301 }
1302 
1303 static void
expose_raw_output_pad(GstURISourceBin * urisrc,GstPad * srcpad,GstPad * output_pad)1304 expose_raw_output_pad (GstURISourceBin * urisrc, GstPad * srcpad,
1305     GstPad * output_pad)
1306 {
1307   ChildSrcPadInfo *info = g_new0 (ChildSrcPadInfo, 1);
1308   info->src_pad = srcpad;
1309   info->output_pad = gst_object_ref (output_pad);
1310 
1311   g_assert (g_object_get_data (G_OBJECT (srcpad),
1312           "urisourcebin.srcpadinfo") == NULL);
1313 
1314   g_object_set_data_full (G_OBJECT (srcpad), "urisourcebin.srcpadinfo",
1315       info, (GDestroyNotify) free_child_src_pad_info);
1316 
1317   expose_output_pad (urisrc, output_pad);
1318 }
1319 
1320 static void
remove_output_pad(GstURISourceBin * urisrc,GstPad * pad)1321 remove_output_pad (GstURISourceBin * urisrc, GstPad * pad)
1322 {
1323   if (!gst_object_has_as_parent (GST_OBJECT (pad), GST_OBJECT (urisrc)))
1324     return;                     /* Pad is not exposed */
1325 
1326   GST_DEBUG_OBJECT (urisrc, "Removing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1327 
1328   gst_pad_set_active (pad, FALSE);
1329   gst_element_remove_pad (GST_ELEMENT_CAST (urisrc), pad);
1330 }
1331 
1332 static void
pad_removed_cb(GstElement * element,GstPad * pad,GstURISourceBin * urisrc)1333 pad_removed_cb (GstElement * element, GstPad * pad, GstURISourceBin * urisrc)
1334 {
1335   ChildSrcPadInfo *info;
1336 
1337   GST_DEBUG_OBJECT (element, "pad removed name: <%s:%s>",
1338       GST_DEBUG_PAD_NAME (pad));
1339 
1340   /* we only care about srcpads */
1341   if (!GST_PAD_IS_SRC (pad))
1342     return;
1343 
1344   if (!(info = g_object_get_data (G_OBJECT (pad), "urisourcebin.srcpadinfo")))
1345     goto no_info;
1346 
1347   GST_URI_SOURCE_BIN_LOCK (urisrc);
1348   /* Make sure this isn't in the pending pads list */
1349   urisrc->pending_pads = g_list_remove (urisrc->pending_pads, pad);
1350 
1351   /* Send EOS to the output slot if the demuxer didn't already */
1352   if (info->output_slot) {
1353     GstStructure *s;
1354     GstEvent *event;
1355     OutputSlotInfo *slot;
1356 
1357     slot = info->output_slot;
1358 
1359     if (!slot->is_eos && urisrc->pending_pads &&
1360         link_pending_pad_to_output (urisrc, slot)) {
1361       /* Found a new source pad to give this slot data - no need to send EOS */
1362       GST_URI_SOURCE_BIN_UNLOCK (urisrc);
1363       return;
1364     }
1365 
1366     BUFFERING_LOCK (urisrc);
1367     /* Unlink this pad from its output slot and send a fake EOS event
1368      * to drain the queue */
1369     slot->is_eos = TRUE;
1370     BUFFERING_UNLOCK (urisrc);
1371 
1372     remove_buffering_msgs (urisrc, GST_OBJECT_CAST (slot->queue));
1373 
1374     slot->linked_info = NULL;
1375 
1376     info->output_slot = NULL;
1377 
1378     GST_LOG_OBJECT (element,
1379         "Pad %" GST_PTR_FORMAT " was removed without EOS. Sending.", pad);
1380 
1381     event = gst_event_new_eos ();
1382     s = gst_event_writable_structure (event);
1383     gst_structure_set (s, "urisourcebin-custom-eos", G_TYPE_BOOLEAN, TRUE,
1384         NULL);
1385     gst_pad_send_event (slot->sinkpad, event);
1386   } else if (info->output_pad != NULL) {
1387     GST_LOG_OBJECT (element,
1388         "Pad %" GST_PTR_FORMAT " was removed. Unexposing %" GST_PTR_FORMAT,
1389         pad, info->output_pad);
1390     remove_output_pad (urisrc, info->output_pad);
1391   } else {
1392     GST_LOG_OBJECT (urisrc, "Removed pad has no output slot or pad");
1393   }
1394   GST_URI_SOURCE_BIN_UNLOCK (urisrc);
1395 
1396   return;
1397 
1398   /* ERRORS */
1399 no_info:
1400   {
1401     GST_WARNING_OBJECT (element, "no info found for pad");
1402     return;
1403   }
1404 }
1405 
1406 /* helper function to lookup stuff in lists */
1407 static gboolean
array_has_value(const gchar * values[],const gchar * value)1408 array_has_value (const gchar * values[], const gchar * value)
1409 {
1410   gint i;
1411 
1412   for (i = 0; values[i]; i++) {
1413     if (g_str_has_prefix (value, values[i]))
1414       return TRUE;
1415   }
1416   return FALSE;
1417 }
1418 
1419 static gboolean
array_has_uri_value(const gchar * values[],const gchar * value)1420 array_has_uri_value (const gchar * values[], const gchar * value)
1421 {
1422   gint i;
1423 
1424   for (i = 0; values[i]; i++) {
1425     if (!g_ascii_strncasecmp (value, values[i], strlen (values[i])))
1426       return TRUE;
1427   }
1428   return FALSE;
1429 }
1430 
1431 /* list of URIs that we consider to be streams and that need buffering.
1432  * We have no mechanism yet to figure this out with a query. */
1433 static const gchar *stream_uris[] = { "http://", "https://", "mms://",
1434   "mmsh://", "mmsu://", "mmst://", "fd://", "myth://", "ssh://",
1435   "ftp://", "sftp://",
1436   NULL
1437 };
1438 
1439 /* list of URIs that need a queue because they are pretty bursty */
1440 static const gchar *queue_uris[] = { "cdda://", NULL };
1441 
1442 /* blacklisted URIs, we know they will always fail. */
1443 static const gchar *blacklisted_uris[] = { NULL };
1444 
1445 /* media types that use adaptive streaming */
1446 static const gchar *adaptive_media[] = {
1447   "application/x-hls", "application/vnd.ms-sstr+xml",
1448   "application/dash+xml", NULL
1449 };
1450 
1451 #define IS_STREAM_URI(uri)          (array_has_uri_value (stream_uris, uri))
1452 #define IS_QUEUE_URI(uri)           (array_has_uri_value (queue_uris, uri))
1453 #define IS_BLACKLISTED_URI(uri)     (array_has_uri_value (blacklisted_uris, uri))
1454 #define IS_ADAPTIVE_MEDIA(media)    (array_has_value (adaptive_media, media))
1455 
1456 /*
1457  * Generate and configure a source element.
1458  */
1459 static GstElement *
gen_source_element(GstURISourceBin * urisrc)1460 gen_source_element (GstURISourceBin * urisrc)
1461 {
1462   GObjectClass *source_class;
1463   GstElement *source;
1464   GParamSpec *pspec;
1465   GstQuery *query;
1466   GstSchedulingFlags flags;
1467   GError *err = NULL;
1468 
1469   if (!urisrc->uri)
1470     goto no_uri;
1471 
1472   GST_LOG_OBJECT (urisrc, "finding source for %s", urisrc->uri);
1473 
1474   if (!gst_uri_is_valid (urisrc->uri))
1475     goto invalid_uri;
1476 
1477   if (IS_BLACKLISTED_URI (urisrc->uri))
1478     goto uri_blacklisted;
1479 
1480   source = gst_element_make_from_uri (GST_URI_SRC, urisrc->uri, NULL, &err);
1481   if (!source)
1482     goto no_source;
1483 
1484   GST_LOG_OBJECT (urisrc, "found source type %s", G_OBJECT_TYPE_NAME (source));
1485 
1486   urisrc->is_stream = IS_STREAM_URI (urisrc->uri);
1487 
1488   query = gst_query_new_scheduling ();
1489   if (gst_element_query (source, query)) {
1490     gst_query_parse_scheduling (query, &flags, NULL, NULL, NULL);
1491     if ((flags & GST_SCHEDULING_FLAG_BANDWIDTH_LIMITED))
1492       urisrc->is_stream = TRUE;
1493   }
1494   gst_query_unref (query);
1495 
1496   GST_LOG_OBJECT (urisrc, "source is stream: %d", urisrc->is_stream);
1497 
1498   urisrc->need_queue = IS_QUEUE_URI (urisrc->uri);
1499   GST_LOG_OBJECT (urisrc, "source needs queue: %d", urisrc->need_queue);
1500 
1501   source_class = G_OBJECT_GET_CLASS (source);
1502 
1503   pspec = g_object_class_find_property (source_class, "connection-speed");
1504   if (pspec != NULL) {
1505     guint64 speed = urisrc->connection_speed / 1000;
1506     gboolean wrong_type = FALSE;
1507 
1508     if (G_PARAM_SPEC_TYPE (pspec) == G_TYPE_PARAM_UINT) {
1509       GParamSpecUInt *pspecuint = G_PARAM_SPEC_UINT (pspec);
1510 
1511       speed = CLAMP (speed, pspecuint->minimum, pspecuint->maximum);
1512     } else if (G_PARAM_SPEC_TYPE (pspec) == G_TYPE_PARAM_INT) {
1513       GParamSpecInt *pspecint = G_PARAM_SPEC_INT (pspec);
1514 
1515       speed = CLAMP (speed, pspecint->minimum, pspecint->maximum);
1516     } else if (G_PARAM_SPEC_TYPE (pspec) == G_TYPE_PARAM_UINT64) {
1517       GParamSpecUInt64 *pspecuint = G_PARAM_SPEC_UINT64 (pspec);
1518 
1519       speed = CLAMP (speed, pspecuint->minimum, pspecuint->maximum);
1520     } else if (G_PARAM_SPEC_TYPE (pspec) == G_TYPE_PARAM_INT64) {
1521       GParamSpecInt64 *pspecint = G_PARAM_SPEC_INT64 (pspec);
1522 
1523       speed = CLAMP (speed, pspecint->minimum, pspecint->maximum);
1524     } else {
1525       GST_WARNING_OBJECT (urisrc,
1526           "The connection speed property %" G_GUINT64_FORMAT
1527           " of type %s is not useful. Not setting it", speed,
1528           g_type_name (G_PARAM_SPEC_TYPE (pspec)));
1529       wrong_type = TRUE;
1530     }
1531 
1532     if (!wrong_type) {
1533       g_object_set (source, "connection-speed", speed, NULL);
1534 
1535       GST_DEBUG_OBJECT (urisrc,
1536           "setting connection-speed=%" G_GUINT64_FORMAT " to source element",
1537           speed);
1538     }
1539   }
1540 
1541   return source;
1542 
1543   /* ERRORS */
1544 no_uri:
1545   {
1546     GST_ELEMENT_ERROR (urisrc, RESOURCE, NOT_FOUND,
1547         (_("No URI specified to play from.")), (NULL));
1548     return NULL;
1549   }
1550 invalid_uri:
1551   {
1552     GST_ELEMENT_ERROR (urisrc, RESOURCE, NOT_FOUND,
1553         (_("Invalid URI \"%s\"."), urisrc->uri), (NULL));
1554     g_clear_error (&err);
1555     return NULL;
1556   }
1557 uri_blacklisted:
1558   {
1559     GST_ELEMENT_ERROR (urisrc, RESOURCE, FAILED,
1560         (_("This stream type cannot be played yet.")), (NULL));
1561     return NULL;
1562   }
1563 no_source:
1564   {
1565     /* whoops, could not create the source element, dig a little deeper to
1566      * figure out what might be wrong. */
1567     if (err != NULL && err->code == GST_URI_ERROR_UNSUPPORTED_PROTOCOL) {
1568       gchar *prot;
1569 
1570       prot = gst_uri_get_protocol (urisrc->uri);
1571       if (prot == NULL)
1572         goto invalid_uri;
1573 
1574       gst_element_post_message (GST_ELEMENT_CAST (urisrc),
1575           gst_missing_uri_source_message_new (GST_ELEMENT (urisrc), prot));
1576 
1577       GST_ELEMENT_ERROR (urisrc, CORE, MISSING_PLUGIN,
1578           (_("No URI handler implemented for \"%s\"."), prot), (NULL));
1579 
1580       g_free (prot);
1581     } else {
1582       GST_ELEMENT_ERROR (urisrc, RESOURCE, NOT_FOUND,
1583           ("%s", (err) ? err->message : "URI was not accepted by any element"),
1584           ("No element accepted URI '%s'", urisrc->uri));
1585     }
1586 
1587     g_clear_error (&err);
1588     return NULL;
1589   }
1590 }
1591 
1592 static gboolean
is_all_raw_caps(GstCaps * caps,GstCaps * rawcaps,gboolean * all_raw)1593 is_all_raw_caps (GstCaps * caps, GstCaps * rawcaps, gboolean * all_raw)
1594 {
1595   GstCaps *intersection;
1596   gint capssize;
1597   gboolean res = FALSE;
1598 
1599   if (caps == NULL)
1600     return FALSE;
1601 
1602   capssize = gst_caps_get_size (caps);
1603   /* no caps, skip and move to the next pad */
1604   if (capssize == 0 || gst_caps_is_empty (caps) || gst_caps_is_any (caps))
1605     goto done;
1606 
1607   intersection = gst_caps_intersect (caps, rawcaps);
1608   *all_raw = !gst_caps_is_empty (intersection)
1609       && (gst_caps_get_size (intersection) == capssize);
1610   gst_caps_unref (intersection);
1611 
1612   res = TRUE;
1613 
1614 done:
1615   return res;
1616 }
1617 
1618 /**
1619  * has_all_raw_caps:
1620  * @pad: a #GstPad
1621  * @all_raw: pointer to hold the result
1622  *
1623  * check if the caps of the pad are all raw. The caps are all raw if
1624  * all of its structures contain audio/x-raw or video/x-raw.
1625  *
1626  * Returns: %FALSE @pad has no caps. Else TRUE and @all_raw set t the result.
1627  */
1628 static gboolean
has_all_raw_caps(GstPad * pad,GstCaps * rawcaps,gboolean * all_raw)1629 has_all_raw_caps (GstPad * pad, GstCaps * rawcaps, gboolean * all_raw)
1630 {
1631   GstCaps *caps;
1632   gboolean res = FALSE;
1633 
1634   caps = gst_pad_query_caps (pad, NULL);
1635 
1636   GST_DEBUG_OBJECT (pad, "have caps %" GST_PTR_FORMAT, caps);
1637 
1638   res = is_all_raw_caps (caps, rawcaps, all_raw);
1639 
1640   gst_caps_unref (caps);
1641   return res;
1642 }
1643 
1644 static void
post_missing_plugin_error(GstElement * urisrc,const gchar * element_name)1645 post_missing_plugin_error (GstElement * urisrc, const gchar * element_name)
1646 {
1647   GstMessage *msg;
1648 
1649   msg = gst_missing_element_message_new (urisrc, element_name);
1650   gst_element_post_message (urisrc, msg);
1651 
1652   GST_ELEMENT_ERROR (urisrc, CORE, MISSING_PLUGIN,
1653       (_("Missing element '%s' - check your GStreamer installation."),
1654           element_name), (NULL));
1655 }
1656 
1657 /**
1658  * analyse_source:
1659  * @urisrc: a #GstURISourceBin
1660  * @is_raw: are all pads raw data
1661  * @have_out: does the source have output
1662  * @is_dynamic: is this a dynamic source
1663  * @use_queue: put a queue before raw output pads
1664  *
1665  * Check the source of @urisrc and collect information about it.
1666  *
 * @is_raw will be set to TRUE if the source only produces raw pads. When this
 * function returns, all of the raw pads of the source will have been added
 * to @urisrc.
1670  *
1671  * @have_out: will be set to TRUE if the source has output pads.
1672  *
1673  * @is_dynamic: TRUE if the element will create (more) pads dynamically later
1674  * on.
1675  *
1676  * Returns: FALSE if a fatal error occurred while scanning.
1677  */
1678 static gboolean
analyse_source(GstURISourceBin * urisrc,gboolean * is_raw,gboolean * have_out,gboolean * is_dynamic,gboolean use_queue)1679 analyse_source (GstURISourceBin * urisrc, gboolean * is_raw,
1680     gboolean * have_out, gboolean * is_dynamic, gboolean use_queue)
1681 {
1682   GstElementClass *elemclass;
1683   GList *walk;
1684   GstIterator *pads_iter;
1685   gboolean done = FALSE;
1686   gboolean res = TRUE;
1687   GstPad *pad;
1688   GValue item = { 0, };
1689   GstCaps *rawcaps = DEFAULT_CAPS;
1690 
1691   *have_out = FALSE;
1692   *is_raw = FALSE;
1693   *is_dynamic = FALSE;
1694 
1695   pads_iter = gst_element_iterate_src_pads (urisrc->source);
1696   while (!done) {
1697     switch (gst_iterator_next (pads_iter, &item)) {
1698       case GST_ITERATOR_ERROR:
1699         res = FALSE;
1700         /* FALLTHROUGH */
1701       case GST_ITERATOR_DONE:
1702         done = TRUE;
1703         break;
1704       case GST_ITERATOR_RESYNC:
1705         /* reset results and resync */
1706         *have_out = FALSE;
1707         *is_raw = FALSE;
1708         *is_dynamic = FALSE;
1709         gst_iterator_resync (pads_iter);
1710         break;
1711       case GST_ITERATOR_OK:
1712         pad = g_value_dup_object (&item);
1713         /* we now officially have an output pad */
1714         *have_out = TRUE;
1715 
1716         /* if FALSE, this pad has no caps and we continue with the next pad. */
1717         if (!has_all_raw_caps (pad, rawcaps, is_raw)) {
1718           gst_object_unref (pad);
1719           g_value_reset (&item);
1720           break;
1721         }
1722 
1723         /* caps on source pad are all raw, we can add the pad */
1724         if (*is_raw) {
1725           GstPad *output_pad;
1726 
1727           GST_URI_SOURCE_BIN_LOCK (urisrc);
1728           if (use_queue) {
1729             OutputSlotInfo *slot = get_output_slot (urisrc, FALSE, FALSE, NULL);
1730             if (!slot)
1731               goto no_slot;
1732 
1733             gst_pad_link (pad, slot->sinkpad);
1734 
1735             /* get the new raw srcpad */
1736             output_pad = gst_object_ref (slot->srcpad);
1737 
1738             GST_URI_SOURCE_BIN_UNLOCK (urisrc);
1739 
1740             expose_output_pad (urisrc, output_pad);
1741             gst_object_unref (output_pad);
1742           } else {
1743             output_pad = create_output_pad (urisrc, pad);
1744 
1745             GST_URI_SOURCE_BIN_UNLOCK (urisrc);
1746 
1747             expose_raw_output_pad (urisrc, pad, output_pad);
1748           }
1749           gst_object_unref (pad);
1750         } else {
1751           gst_object_unref (pad);
1752         }
1753         g_value_reset (&item);
1754         break;
1755     }
1756   }
1757   g_value_unset (&item);
1758   gst_iterator_free (pads_iter);
1759   gst_caps_unref (rawcaps);
1760 
1761   /* check for padtemplates that list SOMETIMES pads to
1762    * determine if the element is dynamic. */
1763   elemclass = GST_ELEMENT_GET_CLASS (urisrc->source);
1764   walk = gst_element_class_get_pad_template_list (elemclass);
1765   while (walk != NULL) {
1766     GstPadTemplate *templ;
1767 
1768     templ = (GstPadTemplate *) walk->data;
1769     if (GST_PAD_TEMPLATE_DIRECTION (templ) == GST_PAD_SRC) {
1770       if (GST_PAD_TEMPLATE_PRESENCE (templ) == GST_PAD_SOMETIMES)
1771         *is_dynamic = TRUE;
1772       break;
1773     }
1774     walk = g_list_next (walk);
1775   }
1776 
1777   return res;
1778 no_slot:
1779   {
1780     GST_URI_SOURCE_BIN_UNLOCK (urisrc);
1781     gst_object_unref (pad);
1782     g_value_unset (&item);
1783     gst_iterator_free (pads_iter);
1784     gst_caps_unref (rawcaps);
1785 
1786     return FALSE;
1787   }
1788 }
1789 
1790 /* Remove any adaptive demuxer element */
1791 static void
remove_demuxer(GstURISourceBin * bin)1792 remove_demuxer (GstURISourceBin * bin)
1793 {
1794   if (bin->demuxer) {
1795     GST_DEBUG_OBJECT (bin, "removing old demuxer element");
1796     gst_element_set_state (bin->demuxer, GST_STATE_NULL);
1797     gst_bin_remove (GST_BIN_CAST (bin), bin->demuxer);
1798     bin->demuxer = NULL;
1799   }
1800 }
1801 
1802 /* make a demuxer and connect to all the signals */
1803 static GstElement *
make_demuxer(GstURISourceBin * urisrc,GstCaps * caps)1804 make_demuxer (GstURISourceBin * urisrc, GstCaps * caps)
1805 {
1806   GList *factories, *eligible, *cur;
1807   GstElement *demuxer = NULL;
1808   GParamSpec *pspec;
1809 
1810   GST_LOG_OBJECT (urisrc, "making new adaptive demuxer");
1811 
1812   /* now create the demuxer element */
1813 
1814   /* FIXME: Fire a signal to get the demuxer? */
1815   factories = gst_element_factory_list_get_elements
1816       (GST_ELEMENT_FACTORY_TYPE_DEMUXER, GST_RANK_MARGINAL);
1817   eligible =
1818       gst_element_factory_list_filter (factories, caps, GST_PAD_SINK,
1819       gst_caps_is_fixed (caps));
1820   gst_plugin_feature_list_free (factories);
1821 
1822   if (eligible == NULL)
1823     goto no_demuxer;
1824 
1825   eligible = g_list_sort (eligible, gst_plugin_feature_rank_compare_func);
1826 
1827   for (cur = eligible; cur != NULL; cur = g_list_next (cur)) {
1828     GstElementFactory *factory = (GstElementFactory *) (cur->data);
1829     const gchar *klass =
1830         gst_element_factory_get_metadata (factory, GST_ELEMENT_METADATA_KLASS);
1831 
1832     /* Can't be a demuxer unless it has Demux in the klass name */
1833     if (!strstr (klass, "Demux") || !strstr (klass, "Adaptive"))
1834       continue;
1835 
1836     demuxer = gst_element_factory_create (factory, NULL);
1837     break;
1838   }
1839   gst_plugin_feature_list_free (eligible);
1840 
1841   if (!demuxer)
1842     goto no_demuxer;
1843 
1844   GST_DEBUG_OBJECT (urisrc, "Created adaptive demuxer %" GST_PTR_FORMAT,
1845       demuxer);
1846 
1847   /* set up callbacks to create the links between
1848    * demuxer streams and output */
1849   g_signal_connect (demuxer,
1850       "pad-added", G_CALLBACK (new_demuxer_pad_added_cb), urisrc);
1851   g_signal_connect (demuxer,
1852       "pad-removed", G_CALLBACK (pad_removed_cb), urisrc);
1853 
1854   /* Propagate connection-speed property */
1855   pspec = g_object_class_find_property (G_OBJECT_GET_CLASS (demuxer),
1856       "connection-speed");
1857   if (pspec != NULL)
1858     g_object_set (demuxer,
1859         "connection-speed", urisrc->connection_speed / 1000, NULL);
1860 
1861   return demuxer;
1862 
1863   /* ERRORS */
1864 no_demuxer:
1865   {
1866     /* FIXME: Fire the right error */
1867     GST_ELEMENT_ERROR (urisrc, CORE, MISSING_PLUGIN, (NULL),
1868         ("No demuxer element, check your installation"));
1869     return NULL;
1870   }
1871 }
1872 
1873 static void
handle_new_pad(GstURISourceBin * urisrc,GstPad * srcpad,GstCaps * caps)1874 handle_new_pad (GstURISourceBin * urisrc, GstPad * srcpad, GstCaps * caps)
1875 {
1876   gboolean is_raw;
1877   GstStructure *s;
1878   const gchar *media_type;
1879   gboolean do_download = FALSE;
1880 
1881   GST_URI_SOURCE_BIN_LOCK (urisrc);
1882 
1883   /* if this is a pad with all raw caps, we can expose it */
1884   if (is_all_raw_caps (caps, DEFAULT_CAPS, &is_raw) && is_raw) {
1885     GstPad *output_pad;
1886 
1887     GST_DEBUG_OBJECT (urisrc, "Found pad with raw caps %" GST_PTR_FORMAT
1888         ", exposing", caps);
1889     output_pad = create_output_pad (urisrc, srcpad);
1890     GST_URI_SOURCE_BIN_UNLOCK (urisrc);
1891 
1892     expose_raw_output_pad (urisrc, srcpad, output_pad);
1893     return;
1894   }
1895   GST_URI_SOURCE_BIN_UNLOCK (urisrc);
1896 
1897   s = gst_caps_get_structure (caps, 0);
1898   media_type = gst_structure_get_name (s);
1899 
1900   urisrc->is_adaptive = IS_ADAPTIVE_MEDIA (media_type);
1901 
1902   if (urisrc->is_adaptive) {
1903     GstPad *sinkpad;
1904     GstPadLinkReturn link_res;
1905 
1906     urisrc->demuxer = make_demuxer (urisrc, caps);
1907     if (!urisrc->demuxer)
1908       goto no_demuxer;
1909     gst_bin_add (GST_BIN_CAST (urisrc), urisrc->demuxer);
1910 
1911     sinkpad = gst_element_get_static_pad (urisrc->demuxer, "sink");
1912     if (sinkpad == NULL)
1913       goto no_demuxer_sink;
1914 
1915     link_res = gst_pad_link (srcpad, sinkpad);
1916 
1917     gst_object_unref (sinkpad);
1918     if (link_res != GST_PAD_LINK_OK)
1919       goto could_not_link;
1920 
1921     gst_element_sync_state_with_parent (urisrc->demuxer);
1922   } else if (!urisrc->is_stream) {
1923     GstPad *output_pad;
1924     /* We don't need slot here, expose immediately */
1925     GST_URI_SOURCE_BIN_LOCK (urisrc);
1926     output_pad = create_output_pad (urisrc, srcpad);
1927     expose_raw_output_pad (urisrc, srcpad, output_pad);
1928     GST_URI_SOURCE_BIN_UNLOCK (urisrc);
1929   } else {
1930     OutputSlotInfo *slot;
1931     GstPad *output_pad;
1932 
1933     /* only enable download buffering if the upstream duration is known */
1934     if (urisrc->download) {
1935       GstQuery *query = gst_query_new_duration (GST_FORMAT_BYTES);
1936       if (gst_pad_query (srcpad, query)) {
1937         gint64 dur;
1938         gst_query_parse_duration (query, NULL, &dur);
1939         do_download = (dur != -1);
1940       }
1941       gst_query_unref (query);
1942     }
1943 
1944     GST_DEBUG_OBJECT (urisrc, "check media-type %s, do_download:%d", media_type,
1945         do_download);
1946 
1947     GST_URI_SOURCE_BIN_LOCK (urisrc);
1948     slot = get_output_slot (urisrc, do_download, FALSE, NULL);
1949 
1950     if (slot == NULL || gst_pad_link (srcpad, slot->sinkpad) != GST_PAD_LINK_OK)
1951       goto could_not_link;
1952 
1953     gst_pad_add_probe (srcpad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM,
1954         pre_queue_event_probe, urisrc, NULL);
1955 
1956     output_pad = gst_object_ref (slot->srcpad);
1957     GST_URI_SOURCE_BIN_UNLOCK (urisrc);
1958 
1959     expose_output_pad (urisrc, output_pad);
1960     gst_object_unref (output_pad);
1961   }
1962 
1963   return;
1964 
1965   /* ERRORS */
1966 no_demuxer:
1967   {
1968     /* error was posted */
1969     return;
1970   }
1971 no_demuxer_sink:
1972   {
1973     GST_ELEMENT_ERROR (urisrc, CORE, NEGOTIATION,
1974         (NULL), ("Adaptive demuxer element has no 'sink' pad"));
1975     return;
1976   }
1977 could_not_link:
1978   {
1979     GST_URI_SOURCE_BIN_UNLOCK (urisrc);
1980     GST_ELEMENT_ERROR (urisrc, CORE, NEGOTIATION,
1981         (NULL), ("Can't link typefind to adaptive demuxer element"));
1982     return;
1983   }
1984 }
1985 
1986 /* signaled when we have a stream and we need to configure the download
1987  * buffering or regular buffering */
1988 static void
type_found(GstElement * typefind,guint probability,GstCaps * caps,GstURISourceBin * urisrc)1989 type_found (GstElement * typefind, guint probability,
1990     GstCaps * caps, GstURISourceBin * urisrc)
1991 {
1992   GstPad *srcpad = gst_element_get_static_pad (typefind, "src");
1993 
1994   GST_DEBUG_OBJECT (urisrc, "typefind found caps %" GST_PTR_FORMAT
1995       " on pad %" GST_PTR_FORMAT, caps, srcpad);
1996   handle_new_pad (urisrc, srcpad, caps);
1997 
1998   gst_object_unref (GST_OBJECT (srcpad));
1999 }
2000 
2001 /* setup typefind for any source. This will first plug a typefind element to the
2002  * source. After we find the type, we decide to whether to plug an adaptive
2003  * demuxer, or just link through queue2 (if needed) and expose the data */
2004 static gboolean
setup_typefind(GstURISourceBin * urisrc,GstPad * srcpad)2005 setup_typefind (GstURISourceBin * urisrc, GstPad * srcpad)
2006 {
2007   GstElement *typefind;
2008 
2009   /* now create the typefind element */
2010   typefind = gst_element_factory_make ("typefind", NULL);
2011   if (!typefind)
2012     goto no_typefind;
2013 
2014   /* Make sure the bin doesn't set the typefind running yet */
2015   gst_element_set_locked_state (typefind, TRUE);
2016 
2017   gst_bin_add (GST_BIN_CAST (urisrc), typefind);
2018 
2019   if (!srcpad) {
2020     if (!gst_element_link_pads (urisrc->source, NULL, typefind, "sink"))
2021       goto could_not_link;
2022   } else {
2023     GstPad *sinkpad = gst_element_get_static_pad (typefind, "sink");
2024     GstPadLinkReturn ret;
2025 
2026     ret = gst_pad_link (srcpad, sinkpad);
2027     gst_object_unref (sinkpad);
2028     if (ret != GST_PAD_LINK_OK)
2029       goto could_not_link;
2030   }
2031 
2032   urisrc->typefinds = g_list_append (urisrc->typefinds, typefind);
2033 
2034   /* connect a signal to find out when the typefind element found
2035    * a type */
2036   g_signal_connect (typefind, "have-type", G_CALLBACK (type_found), urisrc);
2037 
2038   /* Now it can start */
2039   gst_element_set_locked_state (typefind, FALSE);
2040   gst_element_sync_state_with_parent (typefind);
2041 
2042   return TRUE;
2043 
2044   /* ERRORS */
2045 no_typefind:
2046   {
2047     post_missing_plugin_error (GST_ELEMENT_CAST (urisrc), "typefind");
2048     GST_ELEMENT_ERROR (urisrc, CORE, MISSING_PLUGIN, (NULL),
2049         ("No typefind element, check your installation"));
2050     return FALSE;
2051   }
2052 could_not_link:
2053   {
2054     GST_ELEMENT_ERROR (urisrc, CORE, NEGOTIATION,
2055         (NULL), ("Can't link source to typefind element"));
2056     gst_bin_remove (GST_BIN_CAST (urisrc), typefind);
2057     return FALSE;
2058   }
2059 }
2060 
2061 static void
free_output_slot(OutputSlotInfo * slot,GstURISourceBin * urisrc)2062 free_output_slot (OutputSlotInfo * slot, GstURISourceBin * urisrc)
2063 {
2064   GST_DEBUG_OBJECT (urisrc, "removing old queue element and freeing slot %p",
2065       slot);
2066   if (slot->bitrate_changed_id > 0)
2067     g_signal_handler_disconnect (slot->queue, slot->bitrate_changed_id);
2068   slot->bitrate_changed_id = 0;
2069 
2070   gst_element_set_locked_state (slot->queue, TRUE);
2071   gst_element_set_state (slot->queue, GST_STATE_NULL);
2072   remove_buffering_msgs (urisrc, GST_OBJECT_CAST (slot->queue));
2073   gst_bin_remove (GST_BIN_CAST (urisrc), slot->queue);
2074 
2075   gst_object_unref (slot->sinkpad);
2076 
2077   /* deactivate and remove the srcpad */
2078   gst_pad_set_active (slot->srcpad, FALSE);
2079   gst_element_remove_pad (GST_ELEMENT_CAST (urisrc), slot->srcpad);
2080 
2081   g_free (slot);
2082 }
2083 
2084 static void
call_free_output_slot(GstURISourceBin * urisrc,OutputSlotInfo * slot)2085 call_free_output_slot (GstURISourceBin * urisrc, OutputSlotInfo * slot)
2086 {
2087   GST_LOG_OBJECT (urisrc, "free output slot in thread pool");
2088   free_output_slot (slot, urisrc);
2089 }
2090 
2091 /* must be called with GST_URI_SOURCE_BIN_LOCK */
2092 static void
free_output_slot_async(GstURISourceBin * urisrc,OutputSlotInfo * slot)2093 free_output_slot_async (GstURISourceBin * urisrc, OutputSlotInfo * slot)
2094 {
2095   GST_LOG_OBJECT (urisrc, "pushing output slot on thread pool to free");
2096   urisrc->out_slots = g_slist_remove (urisrc->out_slots, slot);
2097   gst_element_call_async (GST_ELEMENT_CAST (urisrc),
2098       (GstElementCallAsyncFunc) call_free_output_slot, slot, NULL);
2099 }
2100 
2101 static void
unexpose_src_pads(GstURISourceBin * urisrc,GstElement * element)2102 unexpose_src_pads (GstURISourceBin * urisrc, GstElement * element)
2103 {
2104   GstIterator *pads_iter;
2105   GValue item = { 0, };
2106   gboolean done = FALSE;
2107 
2108   pads_iter = gst_element_iterate_src_pads (element);
2109   while (!done) {
2110     switch (gst_iterator_next (pads_iter, &item)) {
2111       case GST_ITERATOR_ERROR:
2112         /* FALLTHROUGH */
2113       case GST_ITERATOR_DONE:
2114         done = TRUE;
2115         break;
2116       case GST_ITERATOR_RESYNC:
2117         gst_iterator_resync (pads_iter);
2118         break;
2119       case GST_ITERATOR_OK:
2120       {
2121         ChildSrcPadInfo *info;
2122         GstPad *pad = g_value_get_object (&item);
2123 
2124         if (!(info =
2125                 g_object_get_data (G_OBJECT (pad), "urisourcebin.srcpadinfo")))
2126           break;
2127 
2128         if (info->output_pad != NULL)
2129           remove_output_pad (urisrc, info->output_pad);
2130 
2131         g_value_reset (&item);
2132         break;
2133       }
2134     }
2135   }
2136   g_value_unset (&item);
2137   gst_iterator_free (pads_iter);
2138 }
2139 
2140 /* remove source and all related elements */
2141 static void
remove_source(GstURISourceBin * urisrc)2142 remove_source (GstURISourceBin * urisrc)
2143 {
2144 
2145   if (urisrc->source) {
2146     GstElement *source = urisrc->source;
2147 
2148     GST_DEBUG_OBJECT (urisrc, "removing old src element");
2149     unexpose_src_pads (urisrc, source);
2150     gst_element_set_state (source, GST_STATE_NULL);
2151 
2152     if (urisrc->src_np_sig_id) {
2153       g_signal_handler_disconnect (source, urisrc->src_np_sig_id);
2154       urisrc->src_np_sig_id = 0;
2155     }
2156     gst_bin_remove (GST_BIN_CAST (urisrc), source);
2157     urisrc->source = NULL;
2158   }
2159 
2160   if (urisrc->typefinds) {
2161     GList *iter, *next;
2162     GST_DEBUG_OBJECT (urisrc, "removing old typefind element");
2163     for (iter = urisrc->typefinds; iter; iter = next) {
2164       GstElement *typefind = iter->data;
2165 
2166       next = g_list_next (iter);
2167 
2168       unexpose_src_pads (urisrc, typefind);
2169       gst_element_set_state (typefind, GST_STATE_NULL);
2170       gst_bin_remove (GST_BIN_CAST (urisrc), typefind);
2171     }
2172     g_list_free (urisrc->typefinds);
2173     urisrc->typefinds = NULL;
2174   }
2175 
2176   GST_URI_SOURCE_BIN_LOCK (urisrc);
2177   g_slist_foreach (urisrc->out_slots, (GFunc) free_output_slot, urisrc);
2178   g_slist_free (urisrc->out_slots);
2179   urisrc->out_slots = NULL;
2180   GST_URI_SOURCE_BIN_UNLOCK (urisrc);
2181 
2182   if (urisrc->demuxer) {
2183     GST_DEBUG_OBJECT (urisrc, "removing old adaptive demux element");
2184     gst_element_set_state (urisrc->demuxer, GST_STATE_NULL);
2185     gst_bin_remove (GST_BIN_CAST (urisrc), urisrc->demuxer);
2186     urisrc->demuxer = NULL;
2187   }
2188 }
2189 
2190 /* is called when a dynamic source element created a new pad. */
2191 static void
source_new_pad(GstElement * element,GstPad * pad,GstURISourceBin * urisrc)2192 source_new_pad (GstElement * element, GstPad * pad, GstURISourceBin * urisrc)
2193 {
2194   GstCaps *caps;
2195 
2196   GST_DEBUG_OBJECT (urisrc, "Found new pad %s.%s in source element %s",
2197       GST_DEBUG_PAD_NAME (pad), GST_ELEMENT_NAME (element));
2198   caps = gst_pad_get_current_caps (pad);
2199   if (caps == NULL)
2200     setup_typefind (urisrc, pad);
2201   else {
2202     handle_new_pad (urisrc, pad, caps);
2203     gst_caps_unref (caps);
2204   }
2205 }
2206 
2207 static gboolean
is_live_source(GstElement * source)2208 is_live_source (GstElement * source)
2209 {
2210   GObjectClass *source_class = NULL;
2211   gboolean is_live = FALSE;
2212   GParamSpec *pspec;
2213 
2214   source_class = G_OBJECT_GET_CLASS (source);
2215   pspec = g_object_class_find_property (source_class, "is-live");
2216   if (!pspec || G_PARAM_SPEC_VALUE_TYPE (pspec) != G_TYPE_BOOLEAN)
2217     return FALSE;
2218 
2219   g_object_get (G_OBJECT (source), "is-live", &is_live, NULL);
2220 
2221   return is_live;
2222 }
2223 
2224 /* construct and run the source and demuxer elements until we found
2225  * all the streams or until a preroll queue has been filled.
2226 */
2227 static gboolean
setup_source(GstURISourceBin * urisrc)2228 setup_source (GstURISourceBin * urisrc)
2229 {
2230   gboolean is_raw, have_out, is_dynamic;
2231 
2232   GST_DEBUG_OBJECT (urisrc, "setup source");
2233 
2234   /* delete old src */
2235   remove_source (urisrc);
2236 
2237   /* create and configure an element that can handle the uri */
2238   if (!(urisrc->source = gen_source_element (urisrc)))
2239     goto no_source;
2240 
2241   /* state will be merged later - if file is not found, error will be
2242    * handled by the application right after. */
2243   gst_bin_add (GST_BIN_CAST (urisrc), urisrc->source);
2244 
2245   /* notify of the new source used */
2246   g_object_notify (G_OBJECT (urisrc), "source");
2247 
2248   g_signal_emit (urisrc, gst_uri_source_bin_signals[SIGNAL_SOURCE_SETUP],
2249       0, urisrc->source);
2250 
2251   if (is_live_source (urisrc->source))
2252     urisrc->is_stream = FALSE;
2253 
2254   /* remove the old demuxer now, if any */
2255   remove_demuxer (urisrc);
2256 
2257   /* see if the source element emits raw audio/video all by itself,
2258    * if so, we can create streams for the pads and be done with it.
2259    * Also check that is has source pads, if not, we assume it will
2260    * do everything itself.  */
2261   if (!analyse_source (urisrc, &is_raw, &have_out, &is_dynamic,
2262           urisrc->need_queue && urisrc->use_buffering))
2263     goto invalid_source;
2264 
2265   if (!is_dynamic) {
2266     if (is_raw) {
2267       GST_DEBUG_OBJECT (urisrc, "Source provides all raw data");
2268       /* source provides raw data, we added the pads and we can now signal a
2269        * no_more pads because we are done. */
2270       gst_element_no_more_pads (GST_ELEMENT_CAST (urisrc));
2271       return TRUE;
2272     } else if (!have_out) {
2273       GST_DEBUG_OBJECT (urisrc, "Source has no output pads");
2274 
2275       return TRUE;
2276     }
2277   } else {
2278     GST_DEBUG_OBJECT (urisrc, "Source has dynamic output pads");
2279     /* connect a handler for the new-pad signal */
2280     urisrc->src_np_sig_id =
2281         g_signal_connect (urisrc->source, "pad-added",
2282         G_CALLBACK (source_new_pad), urisrc);
2283   }
2284 
2285   if (is_raw) {
2286     GST_DEBUG_OBJECT (urisrc,
2287         "Got raw srcpads on a dynamic source, using them as is.");
2288 
2289     return TRUE;
2290   } else if (urisrc->is_stream) {
2291     GST_DEBUG_OBJECT (urisrc, "Setting up streaming");
2292     /* do the stream things here */
2293     if (!setup_typefind (urisrc, NULL))
2294       goto streaming_failed;
2295   } else {
2296     GstIterator *pads_iter;
2297     gboolean done = FALSE;
2298 
2299     /* Expose all non-raw srcpads */
2300     pads_iter = gst_element_iterate_src_pads (urisrc->source);
2301     while (!done) {
2302       GValue item = { 0, };
2303       GstPad *pad;
2304 
2305       switch (gst_iterator_next (pads_iter, &item)) {
2306         case GST_ITERATOR_ERROR:
2307           GST_WARNING_OBJECT (urisrc, "Error iterating pads on source element");
2308           /* FALLTHROUGH */
2309         case GST_ITERATOR_DONE:
2310           done = TRUE;
2311           break;
2312         case GST_ITERATOR_RESYNC:
2313           /* reset results and resync */
2314           gst_iterator_resync (pads_iter);
2315           break;
2316         case GST_ITERATOR_OK:
2317           pad = g_value_get_object (&item);
2318           if (!setup_typefind (urisrc, pad)) {
2319             gst_iterator_free (pads_iter);
2320             goto streaming_failed;
2321           }
2322           g_value_reset (&item);
2323           break;
2324       }
2325     }
2326     gst_iterator_free (pads_iter);
2327   }
2328 
2329   return TRUE;
2330 
2331   /* ERRORS */
2332 no_source:
2333   {
2334     /* error message was already posted */
2335     return FALSE;
2336   }
2337 invalid_source:
2338   {
2339     GST_ELEMENT_ERROR (urisrc, CORE, FAILED,
2340         (_("Source element is invalid.")), (NULL));
2341     return FALSE;
2342   }
2343 streaming_failed:
2344   {
2345     /* message was posted */
2346     return FALSE;
2347   }
2348 }
2349 
2350 static void
value_list_append_structure_list(GValue * list_val,GstStructure ** first,GList * structure_list)2351 value_list_append_structure_list (GValue * list_val, GstStructure ** first,
2352     GList * structure_list)
2353 {
2354   GList *l;
2355 
2356   for (l = structure_list; l != NULL; l = l->next) {
2357     GValue val = { 0, };
2358 
2359     if (*first == NULL)
2360       *first = gst_structure_copy ((GstStructure *) l->data);
2361 
2362     g_value_init (&val, GST_TYPE_STRUCTURE);
2363     g_value_take_boxed (&val, gst_structure_copy ((GstStructure *) l->data));
2364     gst_value_list_append_value (list_val, &val);
2365     g_value_unset (&val);
2366   }
2367 }
2368 
2369 /* if it's a redirect message with multiple redirect locations we might
2370  * want to pick a different 'best' location depending on the required
2371  * bitrates and the connection speed */
2372 static GstMessage *
handle_redirect_message(GstURISourceBin * urisrc,GstMessage * msg)2373 handle_redirect_message (GstURISourceBin * urisrc, GstMessage * msg)
2374 {
2375   const GValue *locations_list, *location_val;
2376   GstMessage *new_msg;
2377   GstStructure *new_structure = NULL;
2378   GList *l_good = NULL, *l_neutral = NULL, *l_bad = NULL;
2379   GValue new_list = { 0, };
2380   guint size, i;
2381   const GstStructure *structure;
2382 
2383   GST_DEBUG_OBJECT (urisrc, "redirect message: %" GST_PTR_FORMAT, msg);
2384   GST_DEBUG_OBJECT (urisrc, "connection speed: %" G_GUINT64_FORMAT,
2385       urisrc->connection_speed);
2386 
2387   structure = gst_message_get_structure (msg);
2388   if (urisrc->connection_speed == 0 || structure == NULL)
2389     return msg;
2390 
2391   locations_list = gst_structure_get_value (structure, "locations");
2392   if (locations_list == NULL)
2393     return msg;
2394 
2395   size = gst_value_list_get_size (locations_list);
2396   if (size < 2)
2397     return msg;
2398 
2399   /* maintain existing order as much as possible, just sort references
2400    * with too high a bitrate to the end (the assumption being that if
2401    * bitrates are given they are given for all interesting streams and
2402    * that the you-need-at-least-version-xyz redirect has the same bitrate
2403    * as the lowest referenced redirect alternative) */
2404   for (i = 0; i < size; ++i) {
2405     const GstStructure *s;
2406     gint bitrate = 0;
2407 
2408     location_val = gst_value_list_get_value (locations_list, i);
2409     s = (const GstStructure *) g_value_get_boxed (location_val);
2410     if (!gst_structure_get_int (s, "minimum-bitrate", &bitrate) || bitrate <= 0) {
2411       GST_DEBUG_OBJECT (urisrc, "no bitrate: %" GST_PTR_FORMAT, s);
2412       l_neutral = g_list_append (l_neutral, (gpointer) s);
2413     } else if (bitrate > urisrc->connection_speed) {
2414       GST_DEBUG_OBJECT (urisrc, "bitrate too high: %" GST_PTR_FORMAT, s);
2415       l_bad = g_list_append (l_bad, (gpointer) s);
2416     } else if (bitrate <= urisrc->connection_speed) {
2417       GST_DEBUG_OBJECT (urisrc, "bitrate OK: %" GST_PTR_FORMAT, s);
2418       l_good = g_list_append (l_good, (gpointer) s);
2419     }
2420   }
2421 
2422   g_value_init (&new_list, GST_TYPE_LIST);
2423   value_list_append_structure_list (&new_list, &new_structure, l_good);
2424   value_list_append_structure_list (&new_list, &new_structure, l_neutral);
2425   value_list_append_structure_list (&new_list, &new_structure, l_bad);
2426   gst_structure_take_value (new_structure, "locations", &new_list);
2427 
2428   g_list_free (l_good);
2429   g_list_free (l_neutral);
2430   g_list_free (l_bad);
2431 
2432   new_msg = gst_message_new_element (msg->src, new_structure);
2433   gst_message_unref (msg);
2434 
2435   GST_DEBUG_OBJECT (urisrc, "new redirect message: %" GST_PTR_FORMAT, new_msg);
2436   return new_msg;
2437 }
2438 
2439 static void
handle_buffering_message(GstURISourceBin * urisrc,GstMessage * msg)2440 handle_buffering_message (GstURISourceBin * urisrc, GstMessage * msg)
2441 {
2442   gint perc, msg_perc;
2443   gint smaller_perc = 100;
2444   GstMessage *smaller = NULL;
2445   GList *found = NULL;
2446   GList *iter;
2447   OutputSlotInfo *slot;
2448 
2449   /* buffering messages must be aggregated as there might be multiple
2450    * multiqueue in the pipeline and their independent buffering messages
2451    * will confuse the application
2452    *
2453    * urisourcebin keeps a list of messages received from elements that are
2454    * buffering.
2455    * Rules are:
2456    * 0) Ignore buffering from elements that are draining (is_eos == TRUE)
2457    * 1) Always post the smaller buffering %
2458    * 2) If an element posts a 100% buffering message, remove it from the list
2459    * 3) When there are no more messages on the list, post 100% message
2460    * 4) When an element posts a new buffering message, update the one
2461    *    on the list to this new value
2462    */
2463   gst_message_parse_buffering (msg, &msg_perc);
2464   GST_LOG_OBJECT (urisrc, "Got buffering msg from %" GST_PTR_FORMAT
2465       " with %d%%", GST_MESSAGE_SRC (msg), msg_perc);
2466 
2467   slot = g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (msg)),
2468       "urisourcebin.slotinfo");
2469 
2470   BUFFERING_LOCK (urisrc);
2471   if (slot && slot->is_eos) {
2472     /* Ignore buffering messages from queues we marked as EOS,
2473      * we already removed those from the list of buffering
2474      * objects */
2475     BUFFERING_UNLOCK (urisrc);
2476     gst_message_replace (&msg, NULL);
2477     return;
2478   }
2479 
2480 
2481   g_mutex_lock (&urisrc->buffering_post_lock);
2482 
2483   /*
2484    * Single loop for 2 things:
2485    * 1) Look for a message with the same source
2486    *   1.1) If the received message is 100%, remove it from the list
2487    * 2) Find the minimum buffering from the list from elements that aren't EOS
2488    */
2489   for (iter = urisrc->buffering_status; iter;) {
2490     GstMessage *bufstats = iter->data;
2491     gboolean is_eos = FALSE;
2492 
2493     slot = g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (bufstats)),
2494         "urisourcebin.slotinfo");
2495     if (slot)
2496       is_eos = slot->is_eos;
2497 
2498     if (GST_MESSAGE_SRC (bufstats) == GST_MESSAGE_SRC (msg)) {
2499       found = iter;
2500       if (msg_perc < 100) {
2501         gst_message_unref (iter->data);
2502         bufstats = iter->data = gst_message_ref (msg);
2503       } else {
2504         GList *current = iter;
2505 
2506         /* remove the element here and avoid confusing the loop */
2507         iter = g_list_next (iter);
2508 
2509         gst_message_unref (current->data);
2510         urisrc->buffering_status =
2511             g_list_delete_link (urisrc->buffering_status, current);
2512 
2513         continue;
2514       }
2515     }
2516 
2517     /* only update minimum stat for non-EOS slots */
2518     if (!is_eos) {
2519       gst_message_parse_buffering (bufstats, &perc);
2520       if (perc < smaller_perc) {
2521         smaller_perc = perc;
2522         smaller = bufstats;
2523       }
2524     } else {
2525       GST_LOG_OBJECT (urisrc, "Ignoring buffering from EOS element");
2526     }
2527     iter = g_list_next (iter);
2528   }
2529 
2530   if (found == NULL && msg_perc < 100) {
2531     if (msg_perc < smaller_perc) {
2532       smaller_perc = msg_perc;
2533       smaller = msg;
2534     }
2535     urisrc->buffering_status =
2536         g_list_prepend (urisrc->buffering_status, gst_message_ref (msg));
2537   }
2538 
2539   if (smaller_perc == urisrc->last_buffering_pct) {
2540     /* Don't repeat our last buffering status */
2541     gst_message_replace (&msg, NULL);
2542   } else {
2543     urisrc->last_buffering_pct = smaller_perc;
2544 
2545     /* now compute the buffering message that should be posted */
2546     if (smaller_perc == 100) {
2547       g_assert (urisrc->buffering_status == NULL);
2548       /* we are posting the original received msg */
2549     } else {
2550       gst_message_replace (&msg, smaller);
2551     }
2552   }
2553   BUFFERING_UNLOCK (urisrc);
2554 
2555   if (msg) {
2556     GST_LOG_OBJECT (urisrc, "Sending buffering msg from %" GST_PTR_FORMAT
2557         " with %d%%", GST_MESSAGE_SRC (msg), smaller_perc);
2558     GST_BIN_CLASS (parent_class)->handle_message (GST_BIN (urisrc), msg);
2559   } else {
2560     GST_LOG_OBJECT (urisrc, "Dropped buffering msg as a repeat of %d%%",
2561         smaller_perc);
2562   }
2563   g_mutex_unlock (&urisrc->buffering_post_lock);
2564 }
2565 
2566 /* Remove any buffering message from the given source */
2567 static void
remove_buffering_msgs(GstURISourceBin * urisrc,GstObject * src)2568 remove_buffering_msgs (GstURISourceBin * urisrc, GstObject * src)
2569 {
2570   GList *iter;
2571   gboolean removed = FALSE, post;
2572 
2573   BUFFERING_LOCK (urisrc);
2574   g_mutex_lock (&urisrc->buffering_post_lock);
2575 
2576   GST_DEBUG_OBJECT (urisrc, "Removing %" GST_PTR_FORMAT
2577       " buffering messages", src);
2578 
2579   for (iter = urisrc->buffering_status; iter;) {
2580     GstMessage *bufstats = iter->data;
2581     if (GST_MESSAGE_SRC (bufstats) == src) {
2582       gst_message_unref (bufstats);
2583       urisrc->buffering_status =
2584           g_list_delete_link (urisrc->buffering_status, iter);
2585       removed = TRUE;
2586       break;
2587     }
2588     iter = g_list_next (iter);
2589   }
2590 
2591   post = (removed && urisrc->buffering_status == NULL);
2592   BUFFERING_UNLOCK (urisrc);
2593 
2594   if (post) {
2595     GST_DEBUG_OBJECT (urisrc, "Last buffering element done - posting 100%%");
2596 
2597     /* removed the last buffering element, post 100% */
2598     gst_element_post_message (GST_ELEMENT_CAST (urisrc),
2599         gst_message_new_buffering (GST_OBJECT_CAST (urisrc), 100));
2600   }
2601 
2602   g_mutex_unlock (&urisrc->buffering_post_lock);
2603 }
2604 
2605 static void
handle_message(GstBin * bin,GstMessage * msg)2606 handle_message (GstBin * bin, GstMessage * msg)
2607 {
2608   GstURISourceBin *urisrc = GST_URI_SOURCE_BIN (bin);
2609 
2610   switch (GST_MESSAGE_TYPE (msg)) {
2611     case GST_MESSAGE_ELEMENT:{
2612       if (gst_message_has_name (msg, "redirect")) {
2613         /* sort redirect messages based on the connection speed. This simplifies
2614          * the user of this element as it can in most cases just pick the first item
2615          * of the sorted list as a good redirection candidate. It can of course
2616          * choose something else from the list if it has a better way. */
2617         msg = handle_redirect_message (urisrc, msg);
2618       }
2619       break;
2620     }
2621     case GST_MESSAGE_BUFFERING:
2622       handle_buffering_message (urisrc, msg);
2623       msg = NULL;
2624       break;
2625     default:
2626       break;
2627   }
2628 
2629   if (msg)
2630     GST_BIN_CLASS (parent_class)->handle_message (bin, msg);
2631 }
2632 
2633 /* generic struct passed to all query fold methods
2634  * FIXME, move to core.
2635  */
2636 typedef struct
2637 {
2638   GstQuery *query;
2639   gint64 min;
2640   gint64 max;
2641   gboolean seekable;
2642   gboolean live;
2643 } QueryFold;
2644 
2645 typedef void (*QueryInitFunction) (GstURISourceBin * urisrc, QueryFold * fold);
2646 typedef void (*QueryDoneFunction) (GstURISourceBin * urisrc, QueryFold * fold);
2647 
2648 /* for duration/position we collect all durations/positions and take
2649  * the MAX of all valid results */
2650 static void
uri_source_query_init(GstURISourceBin * urisrc,QueryFold * fold)2651 uri_source_query_init (GstURISourceBin * urisrc, QueryFold * fold)
2652 {
2653   fold->min = 0;
2654   fold->max = -1;
2655   fold->seekable = TRUE;
2656   fold->live = 0;
2657 }
2658 
2659 static gboolean
uri_source_query_duration_fold(const GValue * item,GValue * ret,QueryFold * fold)2660 uri_source_query_duration_fold (const GValue * item, GValue * ret,
2661     QueryFold * fold)
2662 {
2663   GstPad *pad = g_value_get_object (item);
2664 
2665   if (gst_pad_query (pad, fold->query)) {
2666     gint64 duration;
2667 
2668     g_value_set_boolean (ret, TRUE);
2669 
2670     gst_query_parse_duration (fold->query, NULL, &duration);
2671 
2672     GST_DEBUG_OBJECT (item, "got duration %" G_GINT64_FORMAT, duration);
2673 
2674     if (duration > fold->max)
2675       fold->max = duration;
2676   }
2677   return TRUE;
2678 }
2679 
2680 static void
uri_source_query_duration_done(GstURISourceBin * urisrc,QueryFold * fold)2681 uri_source_query_duration_done (GstURISourceBin * urisrc, QueryFold * fold)
2682 {
2683   GstFormat format;
2684 
2685   gst_query_parse_duration (fold->query, &format, NULL);
2686   /* store max in query result */
2687   gst_query_set_duration (fold->query, format, fold->max);
2688 
2689   GST_DEBUG ("max duration %" G_GINT64_FORMAT, fold->max);
2690 }
2691 
2692 static gboolean
uri_source_query_position_fold(const GValue * item,GValue * ret,QueryFold * fold)2693 uri_source_query_position_fold (const GValue * item, GValue * ret,
2694     QueryFold * fold)
2695 {
2696   GstPad *pad = g_value_get_object (item);
2697 
2698   if (gst_pad_query (pad, fold->query)) {
2699     gint64 position;
2700 
2701     g_value_set_boolean (ret, TRUE);
2702 
2703     gst_query_parse_position (fold->query, NULL, &position);
2704 
2705     GST_DEBUG_OBJECT (item, "got position %" G_GINT64_FORMAT, position);
2706 
2707     if (position > fold->max)
2708       fold->max = position;
2709   }
2710 
2711   return TRUE;
2712 }
2713 
2714 static void
uri_source_query_position_done(GstURISourceBin * urisrc,QueryFold * fold)2715 uri_source_query_position_done (GstURISourceBin * urisrc, QueryFold * fold)
2716 {
2717   GstFormat format;
2718 
2719   gst_query_parse_position (fold->query, &format, NULL);
2720   /* store max in query result */
2721   gst_query_set_position (fold->query, format, fold->max);
2722 
2723   GST_DEBUG_OBJECT (urisrc, "max position %" G_GINT64_FORMAT, fold->max);
2724 }
2725 
2726 static gboolean
uri_source_query_latency_fold(const GValue * item,GValue * ret,QueryFold * fold)2727 uri_source_query_latency_fold (const GValue * item, GValue * ret,
2728     QueryFold * fold)
2729 {
2730   GstPad *pad = g_value_get_object (item);
2731 
2732   if (gst_pad_query (pad, fold->query)) {
2733     GstClockTime min, max;
2734     gboolean live;
2735 
2736     gst_query_parse_latency (fold->query, &live, &min, &max);
2737 
2738     GST_DEBUG_OBJECT (pad,
2739         "got latency min %" GST_TIME_FORMAT ", max %" GST_TIME_FORMAT
2740         ", live %d", GST_TIME_ARGS (min), GST_TIME_ARGS (max), live);
2741 
2742     if (live) {
2743       /* for the combined latency we collect the MAX of all min latencies and
2744        * the MIN of all max latencies */
2745       if (min > fold->min)
2746         fold->min = min;
2747       if (fold->max == -1)
2748         fold->max = max;
2749       else if (max < fold->max)
2750         fold->max = max;
2751 
2752       fold->live = TRUE;
2753     }
2754   } else {
2755     GST_LOG_OBJECT (pad, "latency query failed");
2756     g_value_set_boolean (ret, FALSE);
2757   }
2758 
2759   return TRUE;
2760 }
2761 
2762 static void
uri_source_query_latency_done(GstURISourceBin * urisrc,QueryFold * fold)2763 uri_source_query_latency_done (GstURISourceBin * urisrc, QueryFold * fold)
2764 {
2765   /* store max in query result */
2766   gst_query_set_latency (fold->query, fold->live, fold->min, fold->max);
2767 
2768   GST_DEBUG_OBJECT (urisrc,
2769       "latency min %" GST_TIME_FORMAT ", max %" GST_TIME_FORMAT
2770       ", live %d", GST_TIME_ARGS (fold->min), GST_TIME_ARGS (fold->max),
2771       fold->live);
2772 }
2773 
2774 /* we are seekable if all srcpads are seekable */
2775 static gboolean
uri_source_query_seeking_fold(const GValue * item,GValue * ret,QueryFold * fold)2776 uri_source_query_seeking_fold (const GValue * item, GValue * ret,
2777     QueryFold * fold)
2778 {
2779   GstPad *pad = g_value_get_object (item);
2780 
2781   if (gst_pad_query (pad, fold->query)) {
2782     gboolean seekable;
2783 
2784     g_value_set_boolean (ret, TRUE);
2785     gst_query_parse_seeking (fold->query, NULL, &seekable, NULL, NULL);
2786 
2787     GST_DEBUG_OBJECT (item, "got seekable %d", seekable);
2788 
2789     if (fold->seekable)
2790       fold->seekable = seekable;
2791   }
2792 
2793   return TRUE;
2794 }
2795 
2796 static void
uri_source_query_seeking_done(GstURISourceBin * urisrc,QueryFold * fold)2797 uri_source_query_seeking_done (GstURISourceBin * urisrc, QueryFold * fold)
2798 {
2799   GstFormat format;
2800 
2801   gst_query_parse_seeking (fold->query, &format, NULL, NULL, NULL);
2802   gst_query_set_seeking (fold->query, format, fold->seekable, 0, -1);
2803 
2804   GST_DEBUG_OBJECT (urisrc, "seekable %d", fold->seekable);
2805 }
2806 
2807 /* generic fold, return first valid result */
2808 static gboolean
uri_source_query_generic_fold(const GValue * item,GValue * ret,QueryFold * fold)2809 uri_source_query_generic_fold (const GValue * item, GValue * ret,
2810     QueryFold * fold)
2811 {
2812   GstPad *pad = g_value_get_object (item);
2813   gboolean res;
2814 
2815   if ((res = gst_pad_query (pad, fold->query))) {
2816     g_value_set_boolean (ret, TRUE);
2817     GST_DEBUG_OBJECT (item, "answered query %p", fold->query);
2818   }
2819 
2820   /* and stop as soon as we have a valid result */
2821   return !res;
2822 }
2823 
2824 /* we're a bin, the default query handler iterates sink elements, which we don't
2825  * have normally. We should just query all source pads.
2826  */
2827 static gboolean
gst_uri_source_bin_query(GstElement * element,GstQuery * query)2828 gst_uri_source_bin_query (GstElement * element, GstQuery * query)
2829 {
2830   GstURISourceBin *urisrc;
2831   gboolean res = FALSE;
2832   GstIterator *iter;
2833   GstIteratorFoldFunction fold_func;
2834   QueryInitFunction fold_init = NULL;
2835   QueryDoneFunction fold_done = NULL;
2836   QueryFold fold_data;
2837   GValue ret = { 0 };
2838   gboolean default_ret = FALSE;
2839 
2840   urisrc = GST_URI_SOURCE_BIN (element);
2841 
2842   switch (GST_QUERY_TYPE (query)) {
2843     case GST_QUERY_DURATION:
2844       /* iterate and collect durations */
2845       fold_func = (GstIteratorFoldFunction) uri_source_query_duration_fold;
2846       fold_init = uri_source_query_init;
2847       fold_done = uri_source_query_duration_done;
2848       break;
2849     case GST_QUERY_POSITION:
2850       /* iterate and collect durations */
2851       fold_func = (GstIteratorFoldFunction) uri_source_query_position_fold;
2852       fold_init = uri_source_query_init;
2853       fold_done = uri_source_query_position_done;
2854       break;
2855     case GST_QUERY_LATENCY:
2856       /* iterate and collect durations */
2857       fold_func = (GstIteratorFoldFunction) uri_source_query_latency_fold;
2858       fold_init = uri_source_query_init;
2859       fold_done = uri_source_query_latency_done;
2860       default_ret = TRUE;
2861       break;
2862     case GST_QUERY_SEEKING:
2863       /* iterate and collect durations */
2864       fold_func = (GstIteratorFoldFunction) uri_source_query_seeking_fold;
2865       fold_init = uri_source_query_init;
2866       fold_done = uri_source_query_seeking_done;
2867       break;
2868     default:
2869       fold_func = (GstIteratorFoldFunction) uri_source_query_generic_fold;
2870       break;
2871   }
2872 
2873   fold_data.query = query;
2874 
2875   g_value_init (&ret, G_TYPE_BOOLEAN);
2876   g_value_set_boolean (&ret, default_ret);
2877 
2878   iter = gst_element_iterate_src_pads (element);
2879   GST_DEBUG_OBJECT (element, "Sending query %p (type %d) to src pads",
2880       query, GST_QUERY_TYPE (query));
2881 
2882   if (fold_init)
2883     fold_init (urisrc, &fold_data);
2884 
2885   while (TRUE) {
2886     GstIteratorResult ires;
2887 
2888     ires = gst_iterator_fold (iter, fold_func, &ret, &fold_data);
2889 
2890     switch (ires) {
2891       case GST_ITERATOR_RESYNC:
2892         gst_iterator_resync (iter);
2893         if (fold_init)
2894           fold_init (urisrc, &fold_data);
2895         g_value_set_boolean (&ret, default_ret);
2896         break;
2897       case GST_ITERATOR_OK:
2898       case GST_ITERATOR_DONE:
2899         res = g_value_get_boolean (&ret);
2900         if (fold_done != NULL && res)
2901           fold_done (urisrc, &fold_data);
2902         goto done;
2903       default:
2904         res = FALSE;
2905         goto done;
2906     }
2907   }
2908 done:
2909   gst_iterator_free (iter);
2910 
2911   return res;
2912 }
2913 
2914 static GstStateChangeReturn
gst_uri_source_bin_change_state(GstElement * element,GstStateChange transition)2915 gst_uri_source_bin_change_state (GstElement * element,
2916     GstStateChange transition)
2917 {
2918   GstStateChangeReturn ret;
2919   GstURISourceBin *urisrc = GST_URI_SOURCE_BIN (element);
2920 
2921   switch (transition) {
2922     case GST_STATE_CHANGE_READY_TO_PAUSED:
2923       GST_DEBUG ("ready to paused");
2924       if (!setup_source (urisrc))
2925         goto source_failed;
2926       break;
2927     default:
2928       break;
2929   }
2930 
2931   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
2932   if (ret == GST_STATE_CHANGE_FAILURE)
2933     goto setup_failed;
2934 
2935   switch (transition) {
2936     case GST_STATE_CHANGE_READY_TO_PAUSED:
2937       break;
2938     case GST_STATE_CHANGE_PAUSED_TO_READY:
2939       GST_DEBUG ("paused to ready");
2940       remove_demuxer (urisrc);
2941       remove_source (urisrc);
2942       g_list_free_full (urisrc->buffering_status,
2943           (GDestroyNotify) gst_message_unref);
2944       urisrc->buffering_status = NULL;
2945       urisrc->last_buffering_pct = -1;
2946       break;
2947     case GST_STATE_CHANGE_READY_TO_NULL:
2948       GST_DEBUG ("ready to null");
2949       remove_demuxer (urisrc);
2950       remove_source (urisrc);
2951       break;
2952     default:
2953       break;
2954   }
2955   return ret;
2956 
2957   /* ERRORS */
2958 source_failed:
2959   {
2960     return GST_STATE_CHANGE_FAILURE;
2961   }
2962 setup_failed:
2963   {
2964     /* clean up leftover groups */
2965     return GST_STATE_CHANGE_FAILURE;
2966   }
2967 }
2968