• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer
2  *
3  * Copyright (C) <2015> Centricular Ltd
4  *  @author: Edward Hervey <edward@centricular.com>
5  *  @author: Jan Schmidt <jan@centricular.com>
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22 
23 #ifdef HAVE_CONFIG_H
24 #include "config.h"
25 #endif
26 
27 #include <glib.h>
28 #include <glib-object.h>
29 #include <glib/gprintf.h>
30 #include <gst/gst.h>
31 #include <gst/pbutils/pbutils.h>
32 
33 #include "gstplaybackelements.h"
34 #include "gstplay-enum.h"
35 #include "gstrawcaps.h"
36 
37 /**
38  * SECTION:element-decodebin3
39  * @title: decodebin3
40  *
41  * #GstBin that auto-magically constructs a decoding pipeline using available
42  * decoders and demuxers via auto-plugging. The output is raw audio, video
43  * or subtitle streams.
44  *
45  * decodebin3 differs from the previous decodebin (decodebin2) in important ways:
46  *
47  * * supports publication and selection of stream information via
48  * GstStreamCollection messages and #GST_EVENT_SELECT_STREAMS events.
49  *
50  * * dynamically switches stream connections internally, and
51  * reuses decoder elements when stream selections change, so that in
52  * the normal case it maintains 1 decoder of each type (video/audio/subtitle)
53  * and only creates new elements when streams change and an existing decoder
54  * is not capable of handling the new format.
55  *
56  * * supports multiple input pads for the parallel decoding of auxiliary streams
57  * not muxed with the primary stream.
58  *
59  * * does not handle network stream buffering. decodebin3 expects that network stream
60  * buffering is handled upstream, before data is passed to it.
61  *
62  * > decodebin3 is still experimental API and a technology preview.
63  * > Its behaviour and exposed API is subject to change.
64  *
65  */
66 
67 /*
68  * Global design
69  *
70  * 1) From sink pad to elementary streams (GstParseBin)
71  *
72  * The input sink pads are fed to GstParseBin. GstParseBin will feed them
73  * through typefind. When the caps are detected (or changed) we recursively
74  * figure out which demuxer, parser or depayloader is needed until we get to
75  * elementary streams.
76  *
77  * All elementary streams (whether decoded or not, whether exposed or not) are
78  * fed through multiqueue. There is only *one* multiqueue in decodebin3.
79  *
80  * => MultiQueue is the cornerstone.
81  * => No buffering before multiqueue
82  *
83  * 2) Elementary streams
84  *
85  * After GstParseBin, there are 3 main components:
86  *  1) Input Streams (provided by GstParseBin)
87  *  2) Multiqueue slots
88  *  3) Output Streams
89  *
90  * Input Streams correspond to the stream coming from GstParseBin and that gets
91  * fed into a multiqueue slot.
92  *
93  * Output Streams correspond to the combination of a (optional) decoder and an
94  * output ghostpad. Output Streams can be moved from one multiqueue slot to
95  * another, can reconfigure itself (different decoders), and can be
96  * added/removed depending on the configuration (all streams outputted, only one
97  * of each type, ...).
98  *
99  * Multiqueue slots correspond to a pair of sink/src pad from multiqueue. For
100  * each 'active' Input Stream there is a corresponding slot.
101  * Slots might have different streams on input and output (due to internal
102  * buffering).
103  *
104  * Due to internal queuing/buffering/..., all those components (might) behave
105  * asynchronously. Therefore probes will be used on each component source pad to
106  * detect various key-points:
107  *  * EOS :
108  *     the stream is done => Mark that component as done, optionally freeing/removing it
109  *  * STREAM_START :
110  *     a new stream is starting => link it further if needed
111  *
112  * 3) Gradual replacement
113  *
114  * If the caps change at any point in decodebin (input sink pad, demuxer output,
115  * multiqueue output, ..), we gradually replace (if needed) the following elements.
116  *
117  * This is handled by the probes in various locations:
118  *  a) typefind output
119  *  b) multiqueue input (source pad of Input Streams)
120  *  c) multiqueue output (source pad of Multiqueue Slots)
121  *  d) final output (target of source ghostpads)
122  *
123  * When CAPS event arrive at those points, one of three things can happen:
124  * a) There is no elements downstream yet, just create/link-to following elements
125  * b) There are downstream elements, do a ACCEPT_CAPS query
126  *  b.1) The new CAPS are accepted, keep current configuration
127  *  b.2) The new CAPS are not accepted, remove following elements then do a)
128  *
129  *    Components:
130  *
131  *                                                   MultiQ     Output
132  *                     Input(s)                      Slots      Streams
133  *  /-------------------------------------------\   /-----\  /------------- \
134  *
135  * +-------------------------------------------------------------------------+
136  * |                                                                         |
137  * | +---------------------------------------------+                         |
138  * | |   GstParseBin(s)                            |                         |
139  * | |                +--------------+             |  +-----+                |
140  * | |                |              |---[parser]-[|--| Mul |---[ decoder ]-[|
141  * |]--[ typefind ]---|  demuxer(s)  |------------[|  | ti  |                |
142  * | |                |  (if needed) |---[parser]-[|--| qu  |                |
143  * | |                |              |---[parser]-[|--| eu  |---[ decoder ]-[|
144  * | |                +--------------+             |  +------             ^  |
145  * | +---------------------------------------------+        ^             |  |
146  * |                                               ^        |             |  |
147  * +-----------------------------------------------+--------+-------------+--+
148  *                                                 |        |             |
149  *                                                 |        |             |
150  *                                       Probes  --/--------/-------------/
151  *
152  * ATOMIC SWITCHING
153  *
154  * We want to ensure we re-use decoders when switching streams. This takes place
155  * at the multiqueue output level.
156  *
157  * MAIN CONCEPTS
158  *  1) Activating a stream (i.e. linking a slot to an output) is only done within
159  *    the streaming thread in the multiqueue_src_probe() and only if the
160       stream is in the REQUESTED selection.
161  *  2) Deactivating a stream (i.e. unlinking a slot from an output) is also done
162  *    within the stream thread, but only in a purposefully called IDLE probe
163  *    that calls reassign_slot().
164  *
165  * Based on those two principles, 3 "selection" of streams (stream-id) are used:
166  * 1) requested_selection
167  *    All streams within that list should be activated
168  * 2) active_selection
169  *    List of streams that are exposed by decodebin
170  * 3) to_activate
171  *    List of streams that will be moved to requested_selection in the
172  *    reassign_slot() method (i.e. once a stream was deactivated, and the output
173  *    was retargetted)
174  */
175 
176 
177 GST_DEBUG_CATEGORY_STATIC (decodebin3_debug);
178 #define GST_CAT_DEFAULT decodebin3_debug
179 
180 #define GST_TYPE_DECODEBIN3	 (gst_decodebin3_get_type ())
181 
182 #define EXTRA_DEBUG 1
183 
184 #define CUSTOM_FINAL_EOS_QUARK _custom_final_eos_quark_get ()
185 #define CUSTOM_FINAL_EOS_QUARK_DATA "custom-final-eos"
186 static GQuark
_custom_final_eos_quark_get(void)187 _custom_final_eos_quark_get (void)
188 {
189   static gsize g_quark;
190 
191   if (g_once_init_enter (&g_quark)) {
192     gsize quark =
193         (gsize) g_quark_from_static_string ("decodebin3-custom-final-eos");
194     g_once_init_leave (&g_quark, quark);
195   }
196   return g_quark;
197 }
198 
199 typedef struct _GstDecodebin3 GstDecodebin3;
200 typedef struct _GstDecodebin3Class GstDecodebin3Class;
201 
202 typedef struct _DecodebinInputStream DecodebinInputStream;
203 typedef struct _DecodebinInput DecodebinInput;
204 typedef struct _DecodebinOutputStream DecodebinOutputStream;
205 
206 struct _GstDecodebin3
207 {
208   GstBin bin;
209 
210   /* input_lock protects the following variables */
211   GMutex input_lock;
212   /* Main input (static sink pad) */
213   DecodebinInput *main_input;
214   /* Supplementary input (request sink pads) */
215   GList *other_inputs;
216   /* counter for input */
217   guint32 input_counter;
218   /* Current stream group_id (default : GST_GROUP_ID_INVALID) */
219   /* FIXME : Needs to be reset appropriately (when upstream changes ?) */
220   guint32 current_group_id;
221   /* End of variables protected by input_lock */
222 
223   GstElement *multiqueue;
224   GstClockTime default_mq_min_interleave;
225   GstClockTime current_mq_min_interleave;
226 
227   /* selection_lock protects access to following variables */
228   GMutex selection_lock;
229   GList *input_streams;         /* List of DecodebinInputStream for active collection */
230   GList *output_streams;        /* List of DecodebinOutputStream used for output */
231   GList *slots;                 /* List of MultiQueueSlot */
232   guint slot_id;
233 
234   /* Active collection */
235   GstStreamCollection *collection;
236   /* requested selection of stream-id to activate post-multiqueue */
237   GList *requested_selection;
238   /* list of stream-id currently activated in output */
239   GList *active_selection;
240   /* List of stream-id that need to be activated (after a stream switch for ex) */
241   GList *to_activate;
242   /* Pending select streams event */
243   guint32 select_streams_seqnum;
244   /* pending list of streams to select (from downstream) */
245   GList *pending_select_streams;
246   /* TRUE if requested_selection was updated, will become FALSE once
247    * it has fully transitioned to active */
248   gboolean selection_updated;
249   /* End of variables protected by selection_lock */
250 
251   /* List of pending collections.
252    * FIXME : Is this really needed ? */
253   GList *pending_collection;
254 
255   /* Factories */
256   GMutex factories_lock;
257   guint32 factories_cookie;
258   /* All DECODABLE factories */
259   GList *factories;
260   /* Only DECODER factories */
261   GList *decoder_factories;
262   /* DECODABLE but not DECODER factories */
263   GList *decodable_factories;
264 
265   /* counters for pads */
266   guint32 apadcount, vpadcount, tpadcount, opadcount;
267 
268   /* Properties */
269   GstCaps *caps;
270 };
271 
272 struct _GstDecodebin3Class
273 {
274   GstBinClass class;
275 
276     gint (*select_stream) (GstDecodebin3 * dbin,
277       GstStreamCollection * collection, GstStream * stream);
278 };
279 
280 /* Input of decodebin, controls input pad and parsebin */
281 struct _DecodebinInput
282 {
283   GstDecodebin3 *dbin;
284 
285   gboolean is_main;
286 
287   GstPad *ghost_sink;
288   GstPad *parsebin_sink;
289 
290   GstStreamCollection *collection;      /* Active collection */
291 
292   guint group_id;
293 
294   GstElement *parsebin;
295 
296   gulong pad_added_sigid;
297   gulong pad_removed_sigid;
298   gulong drained_sigid;
299 
300   /* TRUE if the input got drained
301    * FIXME : When do we reset it if re-used ?
302    */
303   gboolean drained;
304 
305   /* HACK : Remove these fields */
306   /* List of PendingPad structures */
307   GList *pending_pads;
308 };
309 
310 /* Multiqueue Slots */
311 typedef struct _MultiQueueSlot
312 {
313   guint id;
314 
315   GstDecodebin3 *dbin;
316   /* Type of stream handled by this slot */
317   GstStreamType type;
318 
319   /* Linked input and output */
320   DecodebinInputStream *input;
321 
322   /* pending => last stream received on sink pad */
323   GstStream *pending_stream;
324   /* active => last stream outputted on source pad */
325   GstStream *active_stream;
326 
327   GstPad *sink_pad, *src_pad;
328 
329   /* id of the MQ src_pad event probe */
330   gulong probe_id;
331 
332   gboolean is_drained;
333 
334   DecodebinOutputStream *output;
335 } MultiQueueSlot;
336 
337 /* Streams that are exposed downstream (i.e. output) */
338 struct _DecodebinOutputStream
339 {
340   GstDecodebin3 *dbin;
341   /* The type of stream handled by this output stream */
342   GstStreamType type;
343 
344   /* The slot to which this output stream is currently connected to */
345   MultiQueueSlot *slot;
346 
347   GstElement *decoder;          /* Optional */
348   GstPad *decoder_sink, *decoder_src;
349   gboolean linked;
350 
351   /* ghostpad */
352   GstPad *src_pad;
353   /* Flag if ghost pad is exposed */
354   gboolean src_exposed;
355 
356   /* Reported decoder latency */
357   GstClockTime decoder_latency;
358 
359   /* keyframe dropping probe */
360   gulong drop_probe_id;
361 };
362 
363 /* Pending pads from parsebin */
364 typedef struct _PendingPad
365 {
366   GstDecodebin3 *dbin;
367   DecodebinInput *input;
368   GstPad *pad;
369 
370   gulong buffer_probe;
371   gulong event_probe;
372   gboolean saw_eos;
373 } PendingPad;
374 
375 /* properties */
376 enum
377 {
378   PROP_0,
379   PROP_CAPS
380 };
381 
382 /* signals */
383 enum
384 {
385   SIGNAL_SELECT_STREAM,
386   SIGNAL_ABOUT_TO_FINISH,
387   LAST_SIGNAL
388 };
389 static guint gst_decodebin3_signals[LAST_SIGNAL] = { 0 };
390 
391 #define SELECTION_LOCK(dbin) G_STMT_START {				\
392     GST_LOG_OBJECT (dbin,						\
393 		    "selection locking from thread %p",			\
394 		    g_thread_self ());					\
395     g_mutex_lock (&dbin->selection_lock);				\
396     GST_LOG_OBJECT (dbin,						\
397 		    "selection locked from thread %p",			\
398 		    g_thread_self ());					\
399   } G_STMT_END
400 
401 #define SELECTION_UNLOCK(dbin) G_STMT_START {				\
402     GST_LOG_OBJECT (dbin,						\
403 		    "selection unlocking from thread %p",		\
404 		    g_thread_self ());					\
405     g_mutex_unlock (&dbin->selection_lock);				\
406   } G_STMT_END
407 
408 #define INPUT_LOCK(dbin) G_STMT_START {				\
409     GST_LOG_OBJECT (dbin,						\
410 		    "input locking from thread %p",			\
411 		    g_thread_self ());					\
412     g_mutex_lock (&dbin->input_lock);				\
413     GST_LOG_OBJECT (dbin,						\
414 		    "input locked from thread %p",			\
415 		    g_thread_self ());					\
416   } G_STMT_END
417 
418 #define INPUT_UNLOCK(dbin) G_STMT_START {				\
419     GST_LOG_OBJECT (dbin,						\
420 		    "input unlocking from thread %p",		\
421 		    g_thread_self ());					\
422     g_mutex_unlock (&dbin->input_lock);				\
423   } G_STMT_END
424 
425 GType gst_decodebin3_get_type (void);
426 #define gst_decodebin3_parent_class parent_class
427 G_DEFINE_TYPE (GstDecodebin3, gst_decodebin3, GST_TYPE_BIN);
428 #define _do_init \
429     GST_DEBUG_CATEGORY_INIT (decodebin3_debug, "decodebin3", 0, "decoder bin");\
430     playback_element_init (plugin);
431 GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (decodebin3, "decodebin3", GST_RANK_NONE,
432     GST_TYPE_DECODEBIN3, _do_init);
433 
434 static GstStaticCaps default_raw_caps = GST_STATIC_CAPS (DEFAULT_RAW_CAPS);
435 
436 static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
437     GST_PAD_SINK,
438     GST_PAD_ALWAYS,
439     GST_STATIC_CAPS_ANY);
440 
441 static GstStaticPadTemplate request_sink_template =
442 GST_STATIC_PAD_TEMPLATE ("sink_%u",
443     GST_PAD_SINK,
444     GST_PAD_REQUEST,
445     GST_STATIC_CAPS_ANY);
446 
447 static GstStaticPadTemplate video_src_template =
448 GST_STATIC_PAD_TEMPLATE ("video_%u",
449     GST_PAD_SRC,
450     GST_PAD_SOMETIMES,
451     GST_STATIC_CAPS_ANY);
452 
453 static GstStaticPadTemplate audio_src_template =
454 GST_STATIC_PAD_TEMPLATE ("audio_%u",
455     GST_PAD_SRC,
456     GST_PAD_SOMETIMES,
457     GST_STATIC_CAPS_ANY);
458 
459 static GstStaticPadTemplate text_src_template =
460 GST_STATIC_PAD_TEMPLATE ("text_%u",
461     GST_PAD_SRC,
462     GST_PAD_SOMETIMES,
463     GST_STATIC_CAPS_ANY);
464 
465 static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src_%u",
466     GST_PAD_SRC,
467     GST_PAD_SOMETIMES,
468     GST_STATIC_CAPS_ANY);
469 
470 
471 static void gst_decodebin3_dispose (GObject * object);
472 static void gst_decodebin3_set_property (GObject * object, guint prop_id,
473     const GValue * value, GParamSpec * pspec);
474 static void gst_decodebin3_get_property (GObject * object, guint prop_id,
475     GValue * value, GParamSpec * pspec);
476 
477 static gboolean parsebin_autoplug_continue_cb (GstElement *
478     parsebin, GstPad * pad, GstCaps * caps, GstDecodebin3 * dbin);
479 
480 static gint
gst_decodebin3_select_stream(GstDecodebin3 * dbin,GstStreamCollection * collection,GstStream * stream)481 gst_decodebin3_select_stream (GstDecodebin3 * dbin,
482     GstStreamCollection * collection, GstStream * stream)
483 {
484   GST_LOG_OBJECT (dbin, "default select-stream, returning -1");
485 
486   return -1;
487 }
488 
489 static GstPad *gst_decodebin3_request_new_pad (GstElement * element,
490     GstPadTemplate * temp, const gchar * name, const GstCaps * caps);
491 static void gst_decodebin3_handle_message (GstBin * bin, GstMessage * message);
492 static GstStateChangeReturn gst_decodebin3_change_state (GstElement * element,
493     GstStateChange transition);
494 static gboolean gst_decodebin3_send_event (GstElement * element,
495     GstEvent * event);
496 
497 static void gst_decode_bin_update_factories_list (GstDecodebin3 * dbin);
498 #if 0
499 static gboolean have_factory (GstDecodebin3 * dbin, GstCaps * caps,
500     GstElementFactoryListType ftype);
501 #endif
502 
503 static void free_input (GstDecodebin3 * dbin, DecodebinInput * input);
504 static void free_input_async (GstDecodebin3 * dbin, DecodebinInput * input);
505 static DecodebinInput *create_new_input (GstDecodebin3 * dbin, gboolean main);
506 static gboolean set_input_group_id (DecodebinInput * input, guint32 * group_id);
507 
508 static void reconfigure_output_stream (DecodebinOutputStream * output,
509     MultiQueueSlot * slot);
510 static void free_output_stream (GstDecodebin3 * dbin,
511     DecodebinOutputStream * output);
512 static DecodebinOutputStream *create_output_stream (GstDecodebin3 * dbin,
513     GstStreamType type);
514 
515 static GstPadProbeReturn slot_unassign_probe (GstPad * pad,
516     GstPadProbeInfo * info, MultiQueueSlot * slot);
517 static gboolean reassign_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot);
518 static MultiQueueSlot *get_slot_for_input (GstDecodebin3 * dbin,
519     DecodebinInputStream * input);
520 static void link_input_to_slot (DecodebinInputStream * input,
521     MultiQueueSlot * slot);
522 static void free_multiqueue_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot);
523 static void free_multiqueue_slot_async (GstDecodebin3 * dbin,
524     MultiQueueSlot * slot);
525 
526 static GstStreamCollection *get_merged_collection (GstDecodebin3 * dbin);
527 static void update_requested_selection (GstDecodebin3 * dbin);
528 
529 /* FIXME: Really make all the parser stuff a self-contained helper object */
530 #include "gstdecodebin3-parse.c"
531 
532 static gboolean
_gst_int_accumulator(GSignalInvocationHint * ihint,GValue * return_accu,const GValue * handler_return,gpointer dummy)533 _gst_int_accumulator (GSignalInvocationHint * ihint,
534     GValue * return_accu, const GValue * handler_return, gpointer dummy)
535 {
536   gint res = g_value_get_int (handler_return);
537 
538   g_value_set_int (return_accu, res);
539 
540   if (res == -1)
541     return TRUE;
542 
543   return FALSE;
544 }
545 
546 static void
gst_decodebin3_class_init(GstDecodebin3Class * klass)547 gst_decodebin3_class_init (GstDecodebin3Class * klass)
548 {
549   GObjectClass *gobject_klass = (GObjectClass *) klass;
550   GstElementClass *element_class = (GstElementClass *) klass;
551   GstBinClass *bin_klass = (GstBinClass *) klass;
552 
553   gobject_klass->dispose = gst_decodebin3_dispose;
554   gobject_klass->set_property = gst_decodebin3_set_property;
555   gobject_klass->get_property = gst_decodebin3_get_property;
556 
557   /* FIXME : ADD PROPERTIES ! */
558   g_object_class_install_property (gobject_klass, PROP_CAPS,
559       g_param_spec_boxed ("caps", "Caps",
560           "The caps on which to stop decoding. (NULL = default)",
561           GST_TYPE_CAPS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
562 
563   /* FIXME : ADD SIGNALS ! */
564   /**
565    * GstDecodebin3::select-stream
566    * @decodebin: a #GstDecodebin3
567    * @collection: a #GstStreamCollection
568    * @stream: a #GstStream
569    *
570    * This signal is emitted whenever @decodebin needs to decide whether
571    * to expose a @stream of a given @collection.
572    *
573    * Note that the prefered way to select streams is to listen to
574    * GST_MESSAGE_STREAM_COLLECTION on the bus and send a
575    * GST_EVENT_SELECT_STREAMS with the streams the user wants.
576    *
577    * Returns: 1 if the stream should be selected, 0 if it shouldn't be selected.
578    * A value of -1 (default) lets @decodebin decide what to do with the stream.
579    * */
580   gst_decodebin3_signals[SIGNAL_SELECT_STREAM] =
581       g_signal_new ("select-stream", G_TYPE_FROM_CLASS (klass),
582       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstDecodebin3Class, select_stream),
583       _gst_int_accumulator, NULL, NULL,
584       G_TYPE_INT, 2, GST_TYPE_STREAM_COLLECTION, GST_TYPE_STREAM);
585 
586   /**
587    * GstDecodebin3::about-to-finish:
588    *
589    * This signal is emitted when the data for the selected URI is
590    * entirely buffered and it is safe to specify another URI.
591    */
592   gst_decodebin3_signals[SIGNAL_ABOUT_TO_FINISH] =
593       g_signal_new ("about-to-finish", G_TYPE_FROM_CLASS (klass),
594       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 0, G_TYPE_NONE);
595 
596 
597   element_class->request_new_pad =
598       GST_DEBUG_FUNCPTR (gst_decodebin3_request_new_pad);
599   element_class->change_state = GST_DEBUG_FUNCPTR (gst_decodebin3_change_state);
600   element_class->send_event = GST_DEBUG_FUNCPTR (gst_decodebin3_send_event);
601 
602   gst_element_class_add_pad_template (element_class,
603       gst_static_pad_template_get (&sink_template));
604   gst_element_class_add_pad_template (element_class,
605       gst_static_pad_template_get (&request_sink_template));
606   gst_element_class_add_pad_template (element_class,
607       gst_static_pad_template_get (&video_src_template));
608   gst_element_class_add_pad_template (element_class,
609       gst_static_pad_template_get (&audio_src_template));
610   gst_element_class_add_pad_template (element_class,
611       gst_static_pad_template_get (&text_src_template));
612   gst_element_class_add_pad_template (element_class,
613       gst_static_pad_template_get (&src_template));
614 
615   gst_element_class_set_static_metadata (element_class,
616       "Decoder Bin 3", "Generic/Bin/Decoder",
617       "Autoplug and decode to raw media",
618       "Edward Hervey <edward@centricular.com>");
619 
620   bin_klass->handle_message = gst_decodebin3_handle_message;
621 
622   klass->select_stream = gst_decodebin3_select_stream;
623 }
624 
625 static void
gst_decodebin3_init(GstDecodebin3 * dbin)626 gst_decodebin3_init (GstDecodebin3 * dbin)
627 {
628   /* Create main input */
629   dbin->main_input = create_new_input (dbin, TRUE);
630 
631   dbin->multiqueue = gst_element_factory_make ("multiqueue", NULL);
632   g_object_get (dbin->multiqueue, "min-interleave-time",
633       &dbin->default_mq_min_interleave, NULL);
634   dbin->current_mq_min_interleave = dbin->default_mq_min_interleave;
635   g_object_set (dbin->multiqueue, "sync-by-running-time", TRUE,
636       "max-size-buffers", 0, "use-interleave", TRUE, NULL);
637   gst_bin_add ((GstBin *) dbin, dbin->multiqueue);
638 
639   dbin->current_group_id = GST_GROUP_ID_INVALID;
640 
641   g_mutex_init (&dbin->factories_lock);
642   g_mutex_init (&dbin->selection_lock);
643   g_mutex_init (&dbin->input_lock);
644 
645   dbin->caps = gst_static_caps_get (&default_raw_caps);
646 
647   GST_OBJECT_FLAG_SET (dbin, GST_BIN_FLAG_STREAMS_AWARE);
648 }
649 
650 static void
gst_decodebin3_dispose(GObject * object)651 gst_decodebin3_dispose (GObject * object)
652 {
653   GstDecodebin3 *dbin = (GstDecodebin3 *) object;
654   GList *walk, *next;
655 
656   if (dbin->factories)
657     gst_plugin_feature_list_free (dbin->factories);
658   if (dbin->decoder_factories)
659     g_list_free (dbin->decoder_factories);
660   if (dbin->decodable_factories)
661     g_list_free (dbin->decodable_factories);
662   g_list_free_full (dbin->requested_selection, g_free);
663   g_list_free (dbin->active_selection);
664   g_list_free (dbin->to_activate);
665   g_list_free (dbin->pending_select_streams);
666   g_clear_object (&dbin->collection);
667 
668   free_input (dbin, dbin->main_input);
669 
670   for (walk = dbin->other_inputs; walk; walk = next) {
671     DecodebinInput *input = walk->data;
672 
673     next = g_list_next (walk);
674 
675     free_input (dbin, input);
676     dbin->other_inputs = g_list_delete_link (dbin->other_inputs, walk);
677   }
678 
679   G_OBJECT_CLASS (parent_class)->dispose (object);
680 }
681 
682 static void
gst_decodebin3_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)683 gst_decodebin3_set_property (GObject * object, guint prop_id,
684     const GValue * value, GParamSpec * pspec)
685 {
686   GstDecodebin3 *dbin = (GstDecodebin3 *) object;
687 
688   /* FIXME : IMPLEMENT */
689   switch (prop_id) {
690     case PROP_CAPS:
691       GST_OBJECT_LOCK (dbin);
692       if (dbin->caps)
693         gst_caps_unref (dbin->caps);
694       dbin->caps = g_value_dup_boxed (value);
695       GST_OBJECT_UNLOCK (dbin);
696       break;
697     default:
698       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
699       break;
700   }
701 }
702 
703 static void
gst_decodebin3_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)704 gst_decodebin3_get_property (GObject * object, guint prop_id, GValue * value,
705     GParamSpec * pspec)
706 {
707   GstDecodebin3 *dbin = (GstDecodebin3 *) object;
708 
709   /* FIXME : IMPLEMENT */
710   switch (prop_id) {
711     case PROP_CAPS:
712       GST_OBJECT_LOCK (dbin);
713       g_value_set_boxed (value, dbin->caps);
714       GST_OBJECT_UNLOCK (dbin);
715       break;
716     default:
717       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
718       break;
719   }
720 }
721 
722 static gboolean
parsebin_autoplug_continue_cb(GstElement * parsebin,GstPad * pad,GstCaps * caps,GstDecodebin3 * dbin)723 parsebin_autoplug_continue_cb (GstElement * parsebin, GstPad * pad,
724     GstCaps * caps, GstDecodebin3 * dbin)
725 {
726   GST_DEBUG_OBJECT (pad, "caps %" GST_PTR_FORMAT, caps);
727 
728   /* If it matches our target caps, expose it */
729   if (gst_caps_can_intersect (caps, dbin->caps))
730     return FALSE;
731 
732   return TRUE;
733 }
734 
735 /* This method should be called whenever a STREAM_START event
736  * comes out of a given parsebin.
737  * The caller shall replace the group_id if the function returns TRUE */
738 static gboolean
set_input_group_id(DecodebinInput * input,guint32 * group_id)739 set_input_group_id (DecodebinInput * input, guint32 * group_id)
740 {
741   GstDecodebin3 *dbin = input->dbin;
742 
743   if (input->group_id != *group_id) {
744     if (input->group_id != GST_GROUP_ID_INVALID)
745       GST_WARNING_OBJECT (dbin,
746           "Group id changed (%" G_GUINT32_FORMAT " -> %" G_GUINT32_FORMAT
747           ") on input %p ", input->group_id, *group_id, input);
748     input->group_id = *group_id;
749   }
750 
751   if (*group_id != dbin->current_group_id) {
752     if (dbin->current_group_id == GST_GROUP_ID_INVALID) {
753       GST_DEBUG_OBJECT (dbin, "Setting current group id to %" G_GUINT32_FORMAT,
754           *group_id);
755       dbin->current_group_id = *group_id;
756     }
757     *group_id = dbin->current_group_id;
758     return TRUE;
759   }
760 
761   return FALSE;
762 }
763 
764 static void
parsebin_drained_cb(GstElement * parsebin,DecodebinInput * input)765 parsebin_drained_cb (GstElement * parsebin, DecodebinInput * input)
766 {
767   GstDecodebin3 *dbin = input->dbin;
768   gboolean all_drained;
769   GList *tmp;
770 
771   GST_INFO_OBJECT (dbin, "input %p drained", input);
772   input->drained = TRUE;
773 
774   all_drained = dbin->main_input->drained;
775   for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
776     DecodebinInput *data = (DecodebinInput *) tmp->data;
777 
778     all_drained &= data->drained;
779   }
780 
781   if (all_drained) {
782     GST_INFO_OBJECT (dbin, "All inputs drained. Posting about-to-finish");
783     g_signal_emit (dbin, gst_decodebin3_signals[SIGNAL_ABOUT_TO_FINISH], 0,
784         NULL);
785   }
786 }
787 
788 /* Call with INPUT_LOCK taken */
789 static gboolean
ensure_input_parsebin(GstDecodebin3 * dbin,DecodebinInput * input)790 ensure_input_parsebin (GstDecodebin3 * dbin, DecodebinInput * input)
791 {
792   gboolean set_state = FALSE;
793 
794   if (input->parsebin == NULL) {
795     input->parsebin = gst_element_factory_make ("parsebin", NULL);
796     if (input->parsebin == NULL)
797       goto no_parsebin;
798     input->parsebin = gst_object_ref (input->parsebin);
799     input->parsebin_sink = gst_element_get_static_pad (input->parsebin, "sink");
800     input->pad_added_sigid =
801         g_signal_connect (input->parsebin, "pad-added",
802         (GCallback) parsebin_pad_added_cb, input);
803     input->pad_removed_sigid =
804         g_signal_connect (input->parsebin, "pad-removed",
805         (GCallback) parsebin_pad_removed_cb, input);
806     input->drained_sigid =
807         g_signal_connect (input->parsebin, "drained",
808         (GCallback) parsebin_drained_cb, input);
809     g_signal_connect (input->parsebin, "autoplug-continue",
810         (GCallback) parsebin_autoplug_continue_cb, dbin);
811   }
812 
813   if (GST_OBJECT_PARENT (GST_OBJECT (input->parsebin)) != GST_OBJECT (dbin)) {
814     /* The state lock is taken so that we ensure we are the one (de)activating
815      * parsebin. We need to do this to ensure any activation taking place in
816      * parsebin (including by elements doing upstream activation) are done
817      * within the same thread. */
818     GST_STATE_LOCK (input->parsebin);
819     gst_bin_add (GST_BIN (dbin), input->parsebin);
820     set_state = TRUE;
821   }
822 
823   gst_ghost_pad_set_target (GST_GHOST_PAD (input->ghost_sink),
824       input->parsebin_sink);
825 
826   if (set_state) {
827     gst_element_sync_state_with_parent (input->parsebin);
828     GST_STATE_UNLOCK (input->parsebin);
829   }
830 
831   return TRUE;
832 
833   /* ERRORS */
834 no_parsebin:
835   {
836     gst_element_post_message ((GstElement *) dbin,
837         gst_missing_element_message_new ((GstElement *) dbin, "parsebin"));
838     return FALSE;
839   }
840 }
841 
842 static GstPadLinkReturn
gst_decodebin3_input_pad_link(GstPad * pad,GstObject * parent,GstPad * peer)843 gst_decodebin3_input_pad_link (GstPad * pad, GstObject * parent, GstPad * peer)
844 {
845   GstDecodebin3 *dbin = (GstDecodebin3 *) parent;
846   GstPadLinkReturn res = GST_PAD_LINK_OK;
847   DecodebinInput *input;
848 
849   GST_LOG_OBJECT (parent, "Got link on input pad %" GST_PTR_FORMAT
850       ". Creating parsebin if needed", pad);
851 
852   if ((input = g_object_get_data (G_OBJECT (pad), "decodebin.input")) == NULL)
853     goto fail;
854 
855   INPUT_LOCK (dbin);
856   if (!ensure_input_parsebin (dbin, input))
857     res = GST_PAD_LINK_REFUSED;
858   INPUT_UNLOCK (dbin);
859 
860   return res;
861 fail:
862   GST_ERROR_OBJECT (parent, "Failed to retrieve input state from ghost pad");
863   return GST_PAD_LINK_REFUSED;
864 }
865 
866 /* Drop duration query during _input_pad_unlink */
867 static GstPadProbeReturn
query_duration_drop_probe(GstPad * pad,GstPadProbeInfo * info,DecodebinInput * input)868 query_duration_drop_probe (GstPad * pad, GstPadProbeInfo * info,
869     DecodebinInput * input)
870 {
871   GstPadProbeReturn ret = GST_PAD_PROBE_OK;
872 
873   if (GST_IS_QUERY (GST_PAD_PROBE_INFO_DATA (info))) {
874     GstQuery *query = GST_PAD_PROBE_INFO_QUERY (info);
875     if (GST_QUERY_TYPE (query) == GST_QUERY_DURATION) {
876       GST_LOG_OBJECT (pad, "stop forwarding query duration");
877       ret = GST_PAD_PROBE_HANDLED;
878     }
879   }
880 
881   return ret;
882 }
883 
884 static void
gst_decodebin3_input_pad_unlink(GstPad * pad,GstObject * parent)885 gst_decodebin3_input_pad_unlink (GstPad * pad, GstObject * parent)
886 {
887   GstDecodebin3 *dbin = (GstDecodebin3 *) parent;
888   DecodebinInput *input;
889 
890   GST_LOG_OBJECT (parent, "Got unlink on input pad %" GST_PTR_FORMAT
891       ". Removing parsebin.", pad);
892 
893   if ((input = g_object_get_data (G_OBJECT (pad), "decodebin.input")) == NULL)
894     goto fail;
895 
896   INPUT_LOCK (dbin);
897   if (input->parsebin == NULL) {
898     INPUT_UNLOCK (dbin);
899     return;
900   }
901 
902   if (GST_OBJECT_PARENT (GST_OBJECT (input->parsebin)) == GST_OBJECT (dbin)) {
903     GstStreamCollection *collection = NULL;
904     gulong probe_id = gst_pad_add_probe (input->parsebin_sink,
905         GST_PAD_PROBE_TYPE_QUERY_UPSTREAM,
906         (GstPadProbeCallback) query_duration_drop_probe, input, NULL);
907 
908     /* Clear stream-collection corresponding to current INPUT and post new
909      * stream-collection message, if needed */
910     if (input->collection) {
911       gst_object_unref (input->collection);
912       input->collection = NULL;
913     }
914 
915     SELECTION_LOCK (dbin);
916     collection = get_merged_collection (dbin);
917     if (collection && collection != dbin->collection) {
918       GstMessage *msg;
919       GST_DEBUG_OBJECT (dbin, "Update Stream Collection");
920 
921       if (dbin->collection)
922         gst_object_unref (dbin->collection);
923       dbin->collection = collection;
924       dbin->select_streams_seqnum = GST_SEQNUM_INVALID;
925 
926       msg =
927           gst_message_new_stream_collection ((GstObject *) dbin,
928           dbin->collection);
929 
930       SELECTION_UNLOCK (dbin);
931       gst_element_post_message (GST_ELEMENT_CAST (dbin), msg);
932       update_requested_selection (dbin);
933     } else {
934       if (collection)
935         gst_object_unref (collection);
936       SELECTION_UNLOCK (dbin);
937     }
938 
939     gst_bin_remove (GST_BIN (dbin), input->parsebin);
940     gst_element_set_state (input->parsebin, GST_STATE_NULL);
941     g_signal_handler_disconnect (input->parsebin, input->pad_removed_sigid);
942     g_signal_handler_disconnect (input->parsebin, input->pad_added_sigid);
943     g_signal_handler_disconnect (input->parsebin, input->drained_sigid);
944     gst_pad_remove_probe (input->parsebin_sink, probe_id);
945     gst_object_unref (input->parsebin);
946     gst_object_unref (input->parsebin_sink);
947 
948     input->parsebin = NULL;
949     input->parsebin_sink = NULL;
950 
951     if (!input->is_main) {
952       dbin->other_inputs = g_list_remove (dbin->other_inputs, input);
953       free_input_async (dbin, input);
954     }
955   }
956   INPUT_UNLOCK (dbin);
957   return;
958 
959 fail:
960   GST_ERROR_OBJECT (parent, "Failed to retrieve input state from ghost pad");
961   return;
962 }
963 
964 static void
free_input(GstDecodebin3 * dbin,DecodebinInput * input)965 free_input (GstDecodebin3 * dbin, DecodebinInput * input)
966 {
967   GST_DEBUG ("Freeing input %p", input);
968   gst_ghost_pad_set_target (GST_GHOST_PAD (input->ghost_sink), NULL);
969   gst_element_remove_pad (GST_ELEMENT (dbin), input->ghost_sink);
970   if (input->parsebin) {
971     g_signal_handler_disconnect (input->parsebin, input->pad_removed_sigid);
972     g_signal_handler_disconnect (input->parsebin, input->pad_added_sigid);
973     g_signal_handler_disconnect (input->parsebin, input->drained_sigid);
974     gst_element_set_state (input->parsebin, GST_STATE_NULL);
975     gst_object_unref (input->parsebin);
976     gst_object_unref (input->parsebin_sink);
977   }
978   if (input->collection)
979     gst_object_unref (input->collection);
980   g_free (input);
981 }
982 
983 static void
free_input_async(GstDecodebin3 * dbin,DecodebinInput * input)984 free_input_async (GstDecodebin3 * dbin, DecodebinInput * input)
985 {
986   GST_LOG_OBJECT (dbin, "pushing input %p on thread pool to free", input);
987   gst_element_call_async (GST_ELEMENT_CAST (dbin),
988       (GstElementCallAsyncFunc) free_input, input, NULL);
989 }
990 
991 /* Call with INPUT_LOCK taken */
992 static DecodebinInput *
create_new_input(GstDecodebin3 * dbin,gboolean main)993 create_new_input (GstDecodebin3 * dbin, gboolean main)
994 {
995   DecodebinInput *input;
996 
997   input = g_new0 (DecodebinInput, 1);
998   input->dbin = dbin;
999   input->is_main = main;
1000   input->group_id = GST_GROUP_ID_INVALID;
1001   if (main)
1002     input->ghost_sink = gst_ghost_pad_new_no_target ("sink", GST_PAD_SINK);
1003   else {
1004     gchar *pad_name = g_strdup_printf ("sink_%u", dbin->input_counter++);
1005     input->ghost_sink = gst_ghost_pad_new_no_target (pad_name, GST_PAD_SINK);
1006     g_free (pad_name);
1007   }
1008   g_object_set_data (G_OBJECT (input->ghost_sink), "decodebin.input", input);
1009   gst_pad_set_link_function (input->ghost_sink, gst_decodebin3_input_pad_link);
1010   gst_pad_set_unlink_function (input->ghost_sink,
1011       gst_decodebin3_input_pad_unlink);
1012 
1013   gst_pad_set_active (input->ghost_sink, TRUE);
1014   gst_element_add_pad ((GstElement *) dbin, input->ghost_sink);
1015 
1016   return input;
1017 
1018 }
1019 
1020 static GstPad *
gst_decodebin3_request_new_pad(GstElement * element,GstPadTemplate * temp,const gchar * name,const GstCaps * caps)1021 gst_decodebin3_request_new_pad (GstElement * element, GstPadTemplate * temp,
1022     const gchar * name, const GstCaps * caps)
1023 {
1024   GstDecodebin3 *dbin = (GstDecodebin3 *) element;
1025   DecodebinInput *input;
1026   GstPad *res = NULL;
1027 
1028   /* We are ignoring names for the time being, not sure it makes any sense
1029    * within the context of decodebin3 ... */
1030   input = create_new_input (dbin, FALSE);
1031   if (input) {
1032     INPUT_LOCK (dbin);
1033     dbin->other_inputs = g_list_append (dbin->other_inputs, input);
1034     res = input->ghost_sink;
1035     INPUT_UNLOCK (dbin);
1036   }
1037 
1038   return res;
1039 }
1040 
1041 /* Must be called with factories lock! */
1042 static void
gst_decode_bin_update_factories_list(GstDecodebin3 * dbin)1043 gst_decode_bin_update_factories_list (GstDecodebin3 * dbin)
1044 {
1045   guint cookie;
1046 
1047   cookie = gst_registry_get_feature_list_cookie (gst_registry_get ());
1048   if (!dbin->factories || dbin->factories_cookie != cookie) {
1049     GList *tmp;
1050     if (dbin->factories)
1051       gst_plugin_feature_list_free (dbin->factories);
1052     if (dbin->decoder_factories)
1053       g_list_free (dbin->decoder_factories);
1054     if (dbin->decodable_factories)
1055       g_list_free (dbin->decodable_factories);
1056     dbin->factories =
1057         gst_element_factory_list_get_elements
1058         (GST_ELEMENT_FACTORY_TYPE_DECODABLE, GST_RANK_MARGINAL);
1059     dbin->factories =
1060         g_list_sort (dbin->factories, gst_plugin_feature_rank_compare_func);
1061     dbin->factories_cookie = cookie;
1062 
1063     /* Filter decoder and other decodables */
1064     dbin->decoder_factories = NULL;
1065     dbin->decodable_factories = NULL;
1066     for (tmp = dbin->factories; tmp; tmp = tmp->next) {
1067       GstElementFactory *fact = (GstElementFactory *) tmp->data;
1068       if (gst_element_factory_list_is_type (fact,
1069               GST_ELEMENT_FACTORY_TYPE_DECODER))
1070         dbin->decoder_factories = g_list_append (dbin->decoder_factories, fact);
1071       else
1072         dbin->decodable_factories =
1073             g_list_append (dbin->decodable_factories, fact);
1074     }
1075   }
1076 }
1077 
1078 /* Must be called with appropriate lock if list is a protected variable */
1079 static const gchar *
stream_in_list(GList * list,const gchar * sid)1080 stream_in_list (GList * list, const gchar * sid)
1081 {
1082   GList *tmp;
1083 
1084 #if EXTRA_DEBUG
1085   for (tmp = list; tmp; tmp = tmp->next) {
1086     gchar *osid = (gchar *) tmp->data;
1087     GST_DEBUG ("Checking %s against %s", sid, osid);
1088   }
1089 #endif
1090 
1091   for (tmp = list; tmp; tmp = tmp->next) {
1092     const gchar *osid = (gchar *) tmp->data;
1093     if (!g_strcmp0 (sid, osid))
1094       return osid;
1095   }
1096 
1097   return NULL;
1098 }
1099 
1100 static gboolean
stream_list_equal(GList * lista,GList * listb)1101 stream_list_equal (GList * lista, GList * listb)
1102 {
1103   GList *tmp;
1104 
1105   if (g_list_length (lista) != g_list_length (listb))
1106     return FALSE;
1107 
1108   for (tmp = lista; tmp; tmp = tmp->next) {
1109     gchar *osid = tmp->data;
1110     if (!stream_in_list (listb, osid))
1111       return FALSE;
1112   }
1113 
1114   return TRUE;
1115 }
1116 
1117 static void
update_requested_selection(GstDecodebin3 * dbin)1118 update_requested_selection (GstDecodebin3 * dbin)
1119 {
1120   guint i, nb;
1121   GList *tmp = NULL;
1122   gboolean all_user_selected = TRUE;
1123   GstStreamType used_types = 0;
1124   GstStreamCollection *collection;
1125 
1126   /* 1. Is there a pending SELECT_STREAMS we can return straight away since
1127    *  the switch handler will take care of the pending selection */
1128   SELECTION_LOCK (dbin);
1129   if (dbin->pending_select_streams) {
1130     GST_DEBUG_OBJECT (dbin,
1131         "No need to create pending selection, SELECT_STREAMS underway");
1132     goto beach;
1133   }
1134 
1135   collection = dbin->collection;
1136   if (G_UNLIKELY (collection == NULL)) {
1137     GST_DEBUG_OBJECT (dbin, "No current GstStreamCollection");
1138     goto beach;
1139   }
1140   nb = gst_stream_collection_get_size (collection);
1141 
1142   /* 2. If not, are we in EXPOSE_ALL_MODE ? If so, match everything */
1143   GST_FIXME_OBJECT (dbin, "Implement EXPOSE_ALL_MODE");
1144 
1145   /* 3. If not, check if we already have some of the streams in the
1146    * existing active/requested selection */
1147   for (i = 0; i < nb; i++) {
1148     GstStream *stream = gst_stream_collection_get_stream (collection, i);
1149     const gchar *sid = gst_stream_get_stream_id (stream);
1150     gint request = -1;
1151     /* Fire select-stream signal to see if outside components want to
1152      * hint at which streams should be selected */
1153     g_signal_emit (G_OBJECT (dbin),
1154         gst_decodebin3_signals[SIGNAL_SELECT_STREAM], 0, collection, stream,
1155         &request);
1156     GST_DEBUG_OBJECT (dbin, "stream %s , request:%d", sid, request);
1157 
1158     if (request == -1)
1159       all_user_selected = FALSE;
1160     if (request == 1 || (request == -1
1161             && (stream_in_list (dbin->requested_selection, sid)
1162                 || stream_in_list (dbin->active_selection, sid)))) {
1163       GstStreamType curtype = gst_stream_get_stream_type (stream);
1164       if (request == 1)
1165         GST_DEBUG_OBJECT (dbin,
1166             "Using stream requested by 'select-stream' signal : %s", sid);
1167       else
1168         GST_DEBUG_OBJECT (dbin,
1169             "Re-using stream already present in requested or active selection : %s",
1170             sid);
1171       tmp = g_list_append (tmp, (gchar *) sid);
1172       used_types |= curtype;
1173     }
1174   }
1175 
1176   /* 4. If the user didn't explicitly selected all streams, match one stream of each type */
1177   if (!all_user_selected && dbin->select_streams_seqnum == GST_SEQNUM_INVALID) {
1178     for (i = 0; i < nb; i++) {
1179       GstStream *stream = gst_stream_collection_get_stream (collection, i);
1180       GstStreamType curtype = gst_stream_get_stream_type (stream);
1181       if (curtype != GST_STREAM_TYPE_UNKNOWN && !(used_types & curtype)) {
1182         const gchar *sid = gst_stream_get_stream_id (stream);
1183         GST_DEBUG_OBJECT (dbin,
1184             "Automatically selecting stream '%s' of type %s", sid,
1185             gst_stream_type_get_name (curtype));
1186         tmp = g_list_append (tmp, (gchar *) sid);
1187         used_types |= curtype;
1188       }
1189     }
1190   }
1191 
1192 beach:
1193   if (stream_list_equal (tmp, dbin->requested_selection)) {
1194     /* If the selection is equal, there is nothign to do */
1195     GST_DEBUG_OBJECT (dbin, "Dropping duplicate selection");
1196     g_list_free (tmp);
1197     tmp = NULL;
1198   }
1199 
1200   if (tmp) {
1201     /* Finally set the requested selection */
1202     if (dbin->requested_selection) {
1203       GST_FIXME_OBJECT (dbin,
1204           "Replacing non-NULL requested_selection, what should we do ??");
1205       g_list_free_full (dbin->requested_selection, g_free);
1206     }
1207     dbin->requested_selection =
1208         g_list_copy_deep (tmp, (GCopyFunc) g_strdup, NULL);
1209     dbin->selection_updated = TRUE;
1210     g_list_free (tmp);
1211   }
1212   SELECTION_UNLOCK (dbin);
1213 }
1214 
1215 /* sort_streams:
1216  * GCompareFunc to use with lists of GstStream.
1217  * Sorts GstStreams by stream type and SELECT flag and stream-id
1218  * First video, then audio, then others.
1219  *
1220  * Return: negative if a<b, 0 if a==b, positive if a>b
1221  */
1222 static gint
sort_streams(GstStream * sa,GstStream * sb)1223 sort_streams (GstStream * sa, GstStream * sb)
1224 {
1225   GstStreamType typea, typeb;
1226   GstStreamFlags flaga, flagb;
1227   const gchar *ida, *idb;
1228   gint ret = 0;
1229 
1230   typea = gst_stream_get_stream_type (sa);
1231   typeb = gst_stream_get_stream_type (sb);
1232 
1233   GST_LOG ("sa(%s), sb(%s)", gst_stream_get_stream_id (sa),
1234       gst_stream_get_stream_id (sb));
1235 
1236   /* Sort by stream type. First video, then audio, then others(text, container, unknown) */
1237   if (typea != typeb) {
1238     if (typea & GST_STREAM_TYPE_VIDEO)
1239       ret = -1;
1240     else if (typea & GST_STREAM_TYPE_AUDIO)
1241       ret = (!(typeb & GST_STREAM_TYPE_VIDEO)) ? -1 : 1;
1242     else if (typea & GST_STREAM_TYPE_TEXT)
1243       ret = (!(typeb & GST_STREAM_TYPE_VIDEO)
1244           && !(typeb & GST_STREAM_TYPE_AUDIO)) ? -1 : 1;
1245     else if (typea & GST_STREAM_TYPE_CONTAINER)
1246       ret = (typeb & GST_STREAM_TYPE_UNKNOWN) ? -1 : 1;
1247     else
1248       ret = 1;
1249 
1250     if (ret != 0) {
1251       GST_LOG ("Sort by stream-type: %d", ret);
1252       return ret;
1253     }
1254   }
1255 
1256   /* Sort by SELECT flag, if stream type is same. */
1257   flaga = gst_stream_get_stream_flags (sa);
1258   flagb = gst_stream_get_stream_flags (sb);
1259 
1260   ret =
1261       (flaga & GST_STREAM_FLAG_SELECT) ? ((flagb & GST_STREAM_FLAG_SELECT) ? 0 :
1262       -1) : ((flagb & GST_STREAM_FLAG_SELECT) ? 1 : 0);
1263 
1264   if (ret != 0) {
1265     GST_LOG ("Sort by SELECT flag: %d", ret);
1266     return ret;
1267   }
1268 
1269   /* Sort by stream-id, if otherwise the same. */
1270   ida = gst_stream_get_stream_id (sa);
1271   idb = gst_stream_get_stream_id (sb);
1272   ret = g_strcmp0 (ida, idb);
1273 
1274   GST_LOG ("Sort by stream-id: %d", ret);
1275 
1276   return ret;
1277 }
1278 
1279 /* Call with INPUT_LOCK taken */
1280 static GstStreamCollection *
get_merged_collection(GstDecodebin3 * dbin)1281 get_merged_collection (GstDecodebin3 * dbin)
1282 {
1283   gboolean needs_merge = FALSE;
1284   GstStreamCollection *res = NULL;
1285   GList *tmp;
1286   GList *unsorted_streams = NULL;
1287   guint i, nb_stream;
1288 
1289   /* First check if we need to do a merge or just return the only collection */
1290   res = dbin->main_input->collection;
1291 
1292   for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
1293     DecodebinInput *input = (DecodebinInput *) tmp->data;
1294     if (input->collection) {
1295       if (res) {
1296         needs_merge = TRUE;
1297         break;
1298       }
1299       res = input->collection;
1300     }
1301   }
1302 
1303   if (!needs_merge) {
1304     GST_DEBUG_OBJECT (dbin, "No need to merge, returning %p", res);
1305     return res ? gst_object_ref (res) : NULL;
1306   }
1307 
1308   /* We really need to create a new collection */
1309   /* FIXME : Some numbering scheme maybe ?? */
1310   res = gst_stream_collection_new ("decodebin3");
1311   if (dbin->main_input->collection) {
1312     nb_stream = gst_stream_collection_get_size (dbin->main_input->collection);
1313     GST_DEBUG_OBJECT (dbin, "main input %p %d", dbin->main_input, nb_stream);
1314     for (i = 0; i < nb_stream; i++) {
1315       GstStream *stream =
1316           gst_stream_collection_get_stream (dbin->main_input->collection, i);
1317       unsorted_streams = g_list_append (unsorted_streams, stream);
1318     }
1319   }
1320 
1321   for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
1322     DecodebinInput *input = (DecodebinInput *) tmp->data;
1323     GST_DEBUG_OBJECT (dbin, "input %p , collection %p", input,
1324         input->collection);
1325     if (input->collection) {
1326       nb_stream = gst_stream_collection_get_size (input->collection);
1327       GST_DEBUG_OBJECT (dbin, "nb_stream : %d", nb_stream);
1328       for (i = 0; i < nb_stream; i++) {
1329         GstStream *stream =
1330             gst_stream_collection_get_stream (input->collection, i);
1331         /* Only add if not already present in the list */
1332         if (!g_list_find (unsorted_streams, stream))
1333           unsorted_streams = g_list_append (unsorted_streams, stream);
1334       }
1335     }
1336   }
1337 
1338   /* re-order streams : video, then audio, then others */
1339   unsorted_streams =
1340       g_list_sort (unsorted_streams, (GCompareFunc) sort_streams);
1341   for (tmp = unsorted_streams; tmp; tmp = tmp->next) {
1342     GstStream *stream = (GstStream *) tmp->data;
1343     GST_DEBUG_OBJECT (dbin, "Adding #stream(%s) to collection",
1344         gst_stream_get_stream_id (stream));
1345     gst_stream_collection_add_stream (res, gst_object_ref (stream));
1346   }
1347 
1348   if (unsorted_streams)
1349     g_list_free (unsorted_streams);
1350 
1351   return res;
1352 }
1353 
1354 /* Call with INPUT_LOCK taken */
1355 static DecodebinInput *
find_message_parsebin(GstDecodebin3 * dbin,GstElement * child)1356 find_message_parsebin (GstDecodebin3 * dbin, GstElement * child)
1357 {
1358   DecodebinInput *input = NULL;
1359   GstElement *parent = gst_object_ref (child);
1360   GList *tmp;
1361 
1362   do {
1363     GstElement *next_parent;
1364 
1365     GST_DEBUG_OBJECT (dbin, "parent %s",
1366         parent ? GST_ELEMENT_NAME (parent) : "<NONE>");
1367 
1368     if (parent == dbin->main_input->parsebin) {
1369       input = dbin->main_input;
1370       break;
1371     }
1372     for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
1373       DecodebinInput *cur = (DecodebinInput *) tmp->data;
1374       if (parent == cur->parsebin) {
1375         input = cur;
1376         break;
1377       }
1378     }
1379     next_parent = (GstElement *) gst_element_get_parent (parent);
1380     gst_object_unref (parent);
1381     parent = next_parent;
1382 
1383   } while (parent && parent != (GstElement *) dbin);
1384 
1385   if (parent)
1386     gst_object_unref (parent);
1387 
1388   return input;
1389 }
1390 
1391 static const gchar *
stream_in_collection(GstDecodebin3 * dbin,gchar * sid)1392 stream_in_collection (GstDecodebin3 * dbin, gchar * sid)
1393 {
1394   guint i, len;
1395 
1396   if (dbin->collection == NULL)
1397     return NULL;
1398   len = gst_stream_collection_get_size (dbin->collection);
1399   for (i = 0; i < len; i++) {
1400     GstStream *stream = gst_stream_collection_get_stream (dbin->collection, i);
1401     const gchar *osid = gst_stream_get_stream_id (stream);
1402     if (!g_strcmp0 (sid, osid))
1403       return osid;
1404   }
1405 
1406   return NULL;
1407 }
1408 
1409 /* Call with INPUT_LOCK taken */
1410 static void
handle_stream_collection(GstDecodebin3 * dbin,GstStreamCollection * collection,GstElement * child)1411 handle_stream_collection (GstDecodebin3 * dbin,
1412     GstStreamCollection * collection, GstElement * child)
1413 {
1414 #ifndef GST_DISABLE_GST_DEBUG
1415   const gchar *upstream_id;
1416   guint i;
1417 #endif
1418   DecodebinInput *input = find_message_parsebin (dbin, child);
1419 
1420   if (!input) {
1421     GST_DEBUG_OBJECT (dbin,
1422         "Couldn't find corresponding input, most likely shutting down");
1423     return;
1424   }
1425 
1426   /* Replace collection in input */
1427   if (input->collection)
1428     gst_object_unref (input->collection);
1429   input->collection = gst_object_ref (collection);
1430   GST_DEBUG_OBJECT (dbin, "Setting collection %p on input %p", collection,
1431       input);
1432 
1433   /* Merge collection if needed */
1434   collection = get_merged_collection (dbin);
1435 
1436 #ifndef GST_DISABLE_GST_DEBUG
1437   /* Just some debugging */
1438   upstream_id = gst_stream_collection_get_upstream_id (collection);
1439   GST_DEBUG ("Received Stream Collection. Upstream_id : %s", upstream_id);
1440   GST_DEBUG ("From input %p", input);
1441   GST_DEBUG ("  %d streams", gst_stream_collection_get_size (collection));
1442   for (i = 0; i < gst_stream_collection_get_size (collection); i++) {
1443     GstStream *stream = gst_stream_collection_get_stream (collection, i);
1444     GstTagList *taglist;
1445     GstCaps *caps;
1446 
1447     GST_DEBUG ("   Stream '%s'", gst_stream_get_stream_id (stream));
1448     GST_DEBUG ("     type  : %s",
1449         gst_stream_type_get_name (gst_stream_get_stream_type (stream)));
1450     GST_DEBUG ("     flags : 0x%x", gst_stream_get_stream_flags (stream));
1451     taglist = gst_stream_get_tags (stream);
1452     GST_DEBUG ("     tags  : %" GST_PTR_FORMAT, taglist);
1453     caps = gst_stream_get_caps (stream);
1454     GST_DEBUG ("     caps  : %" GST_PTR_FORMAT, caps);
1455     if (taglist)
1456       gst_tag_list_unref (taglist);
1457     if (caps)
1458       gst_caps_unref (caps);
1459   }
1460 #endif
1461 
1462   /* Store collection for later usage */
1463   SELECTION_LOCK (dbin);
1464   if (dbin->collection == NULL) {
1465     dbin->collection = collection;
1466   } else {
1467     /* We need to check who emitted this collection (the owner).
1468      * If we already had a collection from that user, this one is an update,
1469      * that is to say that we need to figure out how we are going to re-use
1470      * the streams/slot */
1471     GST_FIXME_OBJECT (dbin, "New collection but already had one ...");
1472     /* FIXME : When do we switch from pending collection to active collection ?
1473      * When all streams from active collection are drained in multiqueue output ? */
1474     gst_object_unref (dbin->collection);
1475     dbin->collection = collection;
1476     /* dbin->pending_collection = */
1477     /*     g_list_append (dbin->pending_collection, collection); */
1478   }
1479   dbin->select_streams_seqnum = GST_SEQNUM_INVALID;
1480   SELECTION_UNLOCK (dbin);
1481 }
1482 
1483 /* Must be called with the selection lock taken */
1484 static void
gst_decodebin3_update_min_interleave(GstDecodebin3 * dbin)1485 gst_decodebin3_update_min_interleave (GstDecodebin3 * dbin)
1486 {
1487   GstClockTime max_latency = GST_CLOCK_TIME_NONE;
1488   GList *tmp;
1489 
1490   GST_DEBUG_OBJECT (dbin, "Recalculating max latency of decoders");
1491   for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
1492     DecodebinOutputStream *out = (DecodebinOutputStream *) tmp->data;
1493     if (GST_CLOCK_TIME_IS_VALID (out->decoder_latency)) {
1494       if (max_latency == GST_CLOCK_TIME_NONE
1495           || out->decoder_latency > max_latency)
1496         max_latency = out->decoder_latency;
1497     }
1498   }
1499   GST_DEBUG_OBJECT (dbin, "max latency of all decoders: %" GST_TIME_FORMAT,
1500       GST_TIME_ARGS (max_latency));
1501 
1502   if (!GST_CLOCK_TIME_IS_VALID (max_latency))
1503     return;
1504 
1505   /* Make sure we keep an extra overhead */
1506   max_latency += 100 * GST_MSECOND;
1507   if (max_latency == dbin->current_mq_min_interleave)
1508     return;
1509 
1510   dbin->current_mq_min_interleave = max_latency;
1511   GST_DEBUG_OBJECT (dbin, "Setting mq min-interleave to %" GST_TIME_FORMAT,
1512       GST_TIME_ARGS (dbin->current_mq_min_interleave));
1513   g_object_set (dbin->multiqueue, "min-interleave-time",
1514       dbin->current_mq_min_interleave, NULL);
1515 }
1516 
1517 static void
gst_decodebin3_handle_message(GstBin * bin,GstMessage * message)1518 gst_decodebin3_handle_message (GstBin * bin, GstMessage * message)
1519 {
1520   GstDecodebin3 *dbin = (GstDecodebin3 *) bin;
1521   gboolean posting_collection = FALSE;
1522 
1523   GST_DEBUG_OBJECT (bin, "Got Message %s", GST_MESSAGE_TYPE_NAME (message));
1524 
1525   switch (GST_MESSAGE_TYPE (message)) {
1526     case GST_MESSAGE_STREAM_COLLECTION:
1527     {
1528       GstStreamCollection *collection = NULL;
1529       gst_message_parse_stream_collection (message, &collection);
1530       if (collection) {
1531         INPUT_LOCK (dbin);
1532         handle_stream_collection (dbin, collection,
1533             (GstElement *) GST_MESSAGE_SRC (message));
1534         posting_collection = TRUE;
1535         INPUT_UNLOCK (dbin);
1536       }
1537 
1538       SELECTION_LOCK (dbin);
1539       if (dbin->collection) {
1540         /* Replace collection message, we most likely aggregated it */
1541         GstMessage *new_msg;
1542         new_msg =
1543             gst_message_new_stream_collection ((GstObject *) dbin,
1544             dbin->collection);
1545         gst_message_unref (message);
1546         message = new_msg;
1547       }
1548       SELECTION_UNLOCK (dbin);
1549 
1550       if (collection)
1551         gst_object_unref (collection);
1552       break;
1553     }
1554     case GST_MESSAGE_LATENCY:
1555     {
1556       GList *tmp;
1557       /* Check if this is from one of our decoders */
1558       SELECTION_LOCK (dbin);
1559       for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
1560         DecodebinOutputStream *out = (DecodebinOutputStream *) tmp->data;
1561         if (out->decoder == (GstElement *) GST_MESSAGE_SRC (message)) {
1562           GstClockTime min, max;
1563           if (GST_IS_VIDEO_DECODER (out->decoder)) {
1564             gst_video_decoder_get_latency (GST_VIDEO_DECODER (out->decoder),
1565                 &min, &max);
1566             GST_DEBUG_OBJECT (dbin,
1567                 "Got latency update from one of our decoders. min: %"
1568                 GST_TIME_FORMAT " max: %" GST_TIME_FORMAT, GST_TIME_ARGS (min),
1569                 GST_TIME_ARGS (max));
1570             out->decoder_latency = min;
1571             /* Trigger recalculation */
1572             gst_decodebin3_update_min_interleave (dbin);
1573           }
1574           break;
1575         }
1576       }
1577       SELECTION_UNLOCK (dbin);
1578     }
1579     default:
1580       break;
1581   }
1582 
1583   GST_BIN_CLASS (parent_class)->handle_message (bin, message);
1584 
1585   if (posting_collection) {
1586     /* Figure out a selection for that collection */
1587     update_requested_selection (dbin);
1588   }
1589 }
1590 
1591 static DecodebinOutputStream *
find_free_compatible_output(GstDecodebin3 * dbin,GstStream * stream)1592 find_free_compatible_output (GstDecodebin3 * dbin, GstStream * stream)
1593 {
1594   GList *tmp;
1595   GstStreamType stype = gst_stream_get_stream_type (stream);
1596 
1597   for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
1598     DecodebinOutputStream *output = (DecodebinOutputStream *) tmp->data;
1599     if (output->type == stype && output->slot && output->slot->active_stream) {
1600       GstStream *tstream = output->slot->active_stream;
1601       if (!stream_in_list (dbin->requested_selection,
1602               (gchar *) gst_stream_get_stream_id (tstream))) {
1603         return output;
1604       }
1605     }
1606   }
1607 
1608   return NULL;
1609 }
1610 
1611 /* Give a certain slot, figure out if it should be linked to an
1612  * output stream
1613  * CALL WITH SELECTION LOCK TAKEN !*/
1614 static DecodebinOutputStream *
get_output_for_slot(MultiQueueSlot * slot)1615 get_output_for_slot (MultiQueueSlot * slot)
1616 {
1617   GstDecodebin3 *dbin = slot->dbin;
1618   DecodebinOutputStream *output = NULL;
1619   const gchar *stream_id;
1620   GstCaps *caps;
1621   gchar *id_in_list = NULL;
1622 
1623   /* If we already have a configured output, just use it */
1624   if (slot->output != NULL)
1625     return slot->output;
1626 
1627   /*
1628    * FIXME
1629    *
1630    * This method needs to be split into multiple parts
1631    *
1632    * 1) Figure out whether stream should be exposed or not
1633    *   This is based on autoplug-continue, EXPOSE_ALL_MODE, or presence
1634    *   in the default stream attribution
1635    *
1636    * 2) Figure out whether an output stream should be created, whether
1637    *   we can re-use the output stream already linked to the slot, or
1638    *   whether we need to get re-assigned another (currently used) output
1639    *   stream.
1640    */
1641 
1642   stream_id = gst_stream_get_stream_id (slot->active_stream);
1643   caps = gst_stream_get_caps (slot->active_stream);
1644   GST_DEBUG_OBJECT (dbin, "stream %s , %" GST_PTR_FORMAT, stream_id, caps);
1645   gst_caps_unref (caps);
1646 
1647   /* 0. Emit autoplug-continue signal for pending caps ? */
1648   GST_FIXME_OBJECT (dbin, "emit autoplug-continue");
1649 
1650   /* 1. if in EXPOSE_ALL_MODE, just accept */
1651   GST_FIXME_OBJECT (dbin, "Handle EXPOSE_ALL_MODE");
1652 
1653 #if 0
1654   /* FIXME : The idea around this was to avoid activating a stream for
1655    *     which we have no decoder. Unfortunately it is way too
1656    *     expensive. Need to figure out a better solution */
1657   /* 2. Is there a potential decoder (if one is required) */
1658   if (!gst_caps_can_intersect (caps, dbin->caps)
1659       && !have_factory (dbin, (GstCaps *) caps,
1660           GST_ELEMENT_FACTORY_TYPE_DECODER)) {
1661     GST_WARNING_OBJECT (dbin, "Don't have a decoder for %" GST_PTR_FORMAT,
1662         caps);
1663     SELECTION_UNLOCK (dbin);
1664     gst_element_post_message (GST_ELEMENT_CAST (dbin),
1665         gst_missing_decoder_message_new (GST_ELEMENT_CAST (dbin), caps));
1666     SELECTION_LOCK (dbin);
1667     return NULL;
1668   }
1669 #endif
1670 
1671   /* 3. In default mode check if we should expose */
1672   id_in_list = (gchar *) stream_in_list (dbin->requested_selection, stream_id);
1673   if (id_in_list) {
1674     /* Check if we can steal an existing output stream we could re-use.
1675      * that is:
1676      * * an output stream whose slot->stream is not in requested
1677      * * and is of the same type as this stream
1678      */
1679     output = find_free_compatible_output (dbin, slot->active_stream);
1680     if (output) {
1681       /* Move this output from its current slot to this slot */
1682       dbin->to_activate =
1683           g_list_append (dbin->to_activate, (gchar *) stream_id);
1684       dbin->requested_selection =
1685           g_list_remove (dbin->requested_selection, id_in_list);
1686       g_free (id_in_list);
1687       SELECTION_UNLOCK (dbin);
1688       gst_pad_add_probe (output->slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
1689           (GstPadProbeCallback) slot_unassign_probe, output->slot, NULL);
1690       SELECTION_LOCK (dbin);
1691       return NULL;
1692     }
1693 
1694     output = create_output_stream (dbin, slot->type);
1695     output->slot = slot;
1696     GST_DEBUG ("Linking slot %p to new output %p", slot, output);
1697     slot->output = output;
1698     dbin->active_selection =
1699         g_list_append (dbin->active_selection, (gchar *) stream_id);
1700   } else
1701     GST_DEBUG ("Not creating any output for slot %p", slot);
1702 
1703   return output;
1704 }
1705 
1706 /* Returns SELECTED_STREAMS message if active_selection is equal to
1707  * requested_selection, else NULL.
1708  * Must be called with LOCK taken */
1709 static GstMessage *
is_selection_done(GstDecodebin3 * dbin)1710 is_selection_done (GstDecodebin3 * dbin)
1711 {
1712   GList *tmp;
1713   GstMessage *msg;
1714 
1715   if (!dbin->selection_updated)
1716     return NULL;
1717 
1718   GST_LOG_OBJECT (dbin, "Checking");
1719 
1720   if (dbin->to_activate != NULL) {
1721     GST_DEBUG ("Still have streams to activate");
1722     return NULL;
1723   }
1724   for (tmp = dbin->requested_selection; tmp; tmp = tmp->next) {
1725     GST_DEBUG ("Checking requested stream %s", (gchar *) tmp->data);
1726     if (!stream_in_list (dbin->active_selection, (gchar *) tmp->data)) {
1727       GST_DEBUG ("Not in active selection, returning");
1728       return NULL;
1729     }
1730   }
1731 
1732   GST_DEBUG_OBJECT (dbin, "Selection active, creating message");
1733 
1734   /* We are completely active */
1735   msg = gst_message_new_streams_selected ((GstObject *) dbin, dbin->collection);
1736   if (dbin->select_streams_seqnum != GST_SEQNUM_INVALID) {
1737     gst_message_set_seqnum (msg, dbin->select_streams_seqnum);
1738   }
1739   for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
1740     DecodebinOutputStream *output = (DecodebinOutputStream *) tmp->data;
1741     if (output->slot) {
1742       GST_DEBUG_OBJECT (dbin, "Adding stream %s",
1743           gst_stream_get_stream_id (output->slot->active_stream));
1744 
1745       gst_message_streams_selected_add (msg, output->slot->active_stream);
1746     } else
1747       GST_WARNING_OBJECT (dbin, "No valid slot for output %p", output);
1748   }
1749   dbin->selection_updated = FALSE;
1750   return msg;
1751 }
1752 
1753 /* Must be called with SELECTION_LOCK taken */
1754 static void
check_all_slot_for_eos(GstDecodebin3 * dbin)1755 check_all_slot_for_eos (GstDecodebin3 * dbin)
1756 {
1757   gboolean all_drained = TRUE;
1758   GList *iter;
1759 
1760   GST_DEBUG_OBJECT (dbin, "check slot for eos");
1761 
1762   for (iter = dbin->slots; iter; iter = iter->next) {
1763     MultiQueueSlot *slot = iter->data;
1764 
1765     if (!slot->output)
1766       continue;
1767 
1768     if (slot->is_drained) {
1769       GST_LOG_OBJECT (slot->sink_pad, "slot %p is drained", slot);
1770       continue;
1771     }
1772 
1773     all_drained = FALSE;
1774     break;
1775   }
1776 
1777   if (all_drained) {
1778     INPUT_LOCK (dbin);
1779     if (!pending_pads_are_eos (dbin->main_input))
1780       all_drained = FALSE;
1781 
1782     if (all_drained) {
1783       for (iter = dbin->other_inputs; iter; iter = iter->next) {
1784         if (!pending_pads_are_eos ((DecodebinInput *) iter->data)) {
1785           all_drained = FALSE;
1786           break;
1787         }
1788       }
1789     }
1790     INPUT_UNLOCK (dbin);
1791   }
1792 
1793   if (all_drained) {
1794     GST_DEBUG_OBJECT (dbin,
1795         "All active slots are drained, and no pending input, push EOS");
1796 
1797     for (iter = dbin->input_streams; iter; iter = iter->next) {
1798       DecodebinInputStream *input = (DecodebinInputStream *) iter->data;
1799       GstPad *peer = gst_pad_get_peer (input->srcpad);
1800 
1801       /* Send EOS to all slots */
1802       if (peer) {
1803         GstEvent *stream_start, *eos;
1804 
1805         stream_start =
1806             gst_pad_get_sticky_event (input->srcpad, GST_EVENT_STREAM_START, 0);
1807 
1808         /* First forward a custom STREAM_START event to reset the EOS status (if any) */
1809         if (stream_start) {
1810           GstStructure *s;
1811           GstEvent *custom_stream_start = gst_event_copy (stream_start);
1812           gst_event_unref (stream_start);
1813           s = (GstStructure *) gst_event_get_structure (custom_stream_start);
1814           gst_structure_set (s, "decodebin3-flushing-stream-start",
1815               G_TYPE_BOOLEAN, TRUE, NULL);
1816           gst_pad_send_event (peer, custom_stream_start);
1817         }
1818 
1819         eos = gst_event_new_eos ();
1820         gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (eos),
1821             CUSTOM_FINAL_EOS_QUARK, (gchar *) CUSTOM_FINAL_EOS_QUARK_DATA,
1822             NULL);
1823         gst_pad_send_event (peer, eos);
1824         gst_object_unref (peer);
1825       } else
1826         GST_DEBUG_OBJECT (dbin, "no output");
1827     }
1828   }
1829 }
1830 
1831 static GstPadProbeReturn
multiqueue_src_probe(GstPad * pad,GstPadProbeInfo * info,MultiQueueSlot * slot)1832 multiqueue_src_probe (GstPad * pad, GstPadProbeInfo * info,
1833     MultiQueueSlot * slot)
1834 {
1835   GstPadProbeReturn ret = GST_PAD_PROBE_OK;
1836   GstDecodebin3 *dbin = slot->dbin;
1837 
1838   if (GST_IS_EVENT (GST_PAD_PROBE_INFO_DATA (info))) {
1839     GstEvent *ev = GST_PAD_PROBE_INFO_EVENT (info);
1840 
1841     GST_DEBUG_OBJECT (pad, "Got event %p %s", ev, GST_EVENT_TYPE_NAME (ev));
1842     switch (GST_EVENT_TYPE (ev)) {
1843       case GST_EVENT_STREAM_START:
1844       {
1845         GstStream *stream = NULL;
1846         const GstStructure *s = gst_event_get_structure (ev);
1847 
1848         /* Drop STREAM_START events used to cleanup multiqueue */
1849         if (s
1850             && gst_structure_has_field (s,
1851                 "decodebin3-flushing-stream-start")) {
1852           ret = GST_PAD_PROBE_HANDLED;
1853           gst_event_unref (ev);
1854           break;
1855         }
1856 
1857         gst_event_parse_stream (ev, &stream);
1858         if (stream == NULL) {
1859           GST_ERROR_OBJECT (pad,
1860               "Got a STREAM_START event without a GstStream");
1861           break;
1862         }
1863         slot->is_drained = FALSE;
1864         GST_DEBUG_OBJECT (pad, "Stream Start '%s'",
1865             gst_stream_get_stream_id (stream));
1866         if (slot->active_stream == NULL) {
1867           slot->active_stream = stream;
1868         } else if (slot->active_stream != stream) {
1869           GST_FIXME_OBJECT (pad, "Handle stream changes (%s => %s) !",
1870               gst_stream_get_stream_id (slot->active_stream),
1871               gst_stream_get_stream_id (stream));
1872           gst_object_unref (slot->active_stream);
1873           slot->active_stream = stream;
1874         } else
1875           gst_object_unref (stream);
1876 #if 0                           /* Disabled because stream-start is pushed for every buffer on every unlinked pad */
1877         {
1878           gboolean is_active, is_requested;
1879           /* Quick check to see if we're in the current selection */
1880           /* FIXME : Re-check all slot<=>output mappings based on requested_selection */
1881           SELECTION_LOCK (dbin);
1882           GST_DEBUG_OBJECT (dbin, "Checking active selection");
1883           is_active = stream_in_list (dbin->active_selection, stream_id);
1884           GST_DEBUG_OBJECT (dbin, "Checking requested selection");
1885           is_requested = stream_in_list (dbin->requested_selection, stream_id);
1886           SELECTION_UNLOCK (dbin);
1887           if (is_active)
1888             GST_DEBUG_OBJECT (pad, "Slot in ACTIVE selection (output:%p)",
1889                 slot->output);
1890           if (is_requested)
1891             GST_DEBUG_OBJECT (pad, "Slot in REQUESTED selection (output:%p)",
1892                 slot->output);
1893           else if (slot->output) {
1894             GST_DEBUG_OBJECT (pad,
1895                 "Slot needs to be deactivated ? It's no longer in requested selection");
1896           } else if (!is_active)
1897             GST_DEBUG_OBJECT (pad,
1898                 "Slot in neither active nor requested selection");
1899         }
1900 #endif
1901       }
1902         break;
1903       case GST_EVENT_CAPS:
1904       {
1905         /* Configure the output slot if needed */
1906         DecodebinOutputStream *output;
1907         GstMessage *msg = NULL;
1908         SELECTION_LOCK (dbin);
1909         output = get_output_for_slot (slot);
1910         if (output) {
1911           reconfigure_output_stream (output, slot);
1912           msg = is_selection_done (dbin);
1913         }
1914         SELECTION_UNLOCK (dbin);
1915         if (msg)
1916           gst_element_post_message ((GstElement *) slot->dbin, msg);
1917       }
1918         break;
1919       case GST_EVENT_EOS:
1920       {
1921         gboolean was_drained = slot->is_drained;
1922         slot->is_drained = TRUE;
1923 
1924         /* Custom EOS handling first */
1925         if (gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (ev),
1926                 CUSTOM_EOS_QUARK)) {
1927           /* remove custom-eos */
1928           gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (ev),
1929               CUSTOM_EOS_QUARK, NULL, NULL);
1930           GST_LOG_OBJECT (pad, "Received custom EOS");
1931           ret = GST_PAD_PROBE_HANDLED;
1932           SELECTION_LOCK (dbin);
1933           if (slot->input == NULL) {
1934             GST_DEBUG_OBJECT (pad,
1935                 "Got custom-eos from null input stream, remove output stream");
1936             /* Remove the output */
1937             if (slot->output) {
1938               DecodebinOutputStream *output = slot->output;
1939               dbin->output_streams =
1940                   g_list_remove (dbin->output_streams, output);
1941               free_output_stream (dbin, output);
1942               /* Reacalculate min interleave */
1943               gst_decodebin3_update_min_interleave (dbin);
1944             }
1945             slot->probe_id = 0;
1946             dbin->slots = g_list_remove (dbin->slots, slot);
1947             free_multiqueue_slot_async (dbin, slot);
1948             ret = GST_PAD_PROBE_REMOVE;
1949           } else if (!was_drained) {
1950             check_all_slot_for_eos (dbin);
1951           }
1952           if (ret == GST_PAD_PROBE_HANDLED)
1953             gst_event_unref (ev);
1954           SELECTION_UNLOCK (dbin);
1955           break;
1956         }
1957 
1958         GST_FIXME_OBJECT (pad, "EOS on multiqueue source pad. input:%p",
1959             slot->input);
1960         if (slot->input == NULL) {
1961           GstPad *peer;
1962           GST_DEBUG_OBJECT (pad,
1963               "last EOS for input, forwarding and removing slot");
1964           peer = gst_pad_get_peer (pad);
1965           if (peer) {
1966             gst_pad_send_event (peer, ev);
1967             gst_object_unref (peer);
1968           } else {
1969             gst_event_unref (ev);
1970           }
1971           SELECTION_LOCK (dbin);
1972           /* FIXME : Shouldn't we try to re-assign the output instead of just
1973            * removing it ? */
1974           /* Remove the output */
1975           if (slot->output) {
1976             DecodebinOutputStream *output = slot->output;
1977             dbin->output_streams = g_list_remove (dbin->output_streams, output);
1978             free_output_stream (dbin, output);
1979           }
1980           slot->probe_id = 0;
1981           dbin->slots = g_list_remove (dbin->slots, slot);
1982           SELECTION_UNLOCK (dbin);
1983 
1984           free_multiqueue_slot_async (dbin, slot);
1985           ret = GST_PAD_PROBE_REMOVE;
1986         } else if (gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (ev),
1987                 CUSTOM_FINAL_EOS_QUARK)) {
1988           GST_DEBUG_OBJECT (pad, "Got final eos, propagating downstream");
1989         } else {
1990           GST_DEBUG_OBJECT (pad, "Got regular eos (all_inputs_are_eos)");
1991           /* drop current event as eos will be sent in check_all_slot_for_eos
1992            * when all output streams are also eos */
1993           ret = GST_PAD_PROBE_DROP;
1994           SELECTION_LOCK (dbin);
1995           check_all_slot_for_eos (dbin);
1996           SELECTION_UNLOCK (dbin);
1997         }
1998       }
1999         break;
2000       default:
2001         break;
2002     }
2003   } else if (GST_IS_QUERY (GST_PAD_PROBE_INFO_DATA (info))) {
2004     GstQuery *query = GST_PAD_PROBE_INFO_QUERY (info);
2005     switch (GST_QUERY_TYPE (query)) {
2006       case GST_QUERY_CAPS:
2007       {
2008         GST_DEBUG_OBJECT (pad, "Intercepting CAPS query");
2009         gst_query_set_caps_result (query, GST_CAPS_ANY);
2010         ret = GST_PAD_PROBE_HANDLED;
2011       }
2012         break;
2013 
2014       case GST_QUERY_ACCEPT_CAPS:
2015       {
2016         GST_DEBUG_OBJECT (pad, "Intercepting Accept Caps query");
2017         /* If the current decoder doesn't accept caps, we'll reconfigure
2018          * on the actual caps event. So accept any caps. */
2019         gst_query_set_accept_caps_result (query, TRUE);
2020         ret = GST_PAD_PROBE_HANDLED;
2021       }
2022       default:
2023         break;
2024     }
2025   }
2026 
2027   return ret;
2028 }
2029 
2030 /* Create a new multiqueue slot for the given type
2031  *
2032  * It is up to the caller to know whether that slot is needed or not
2033  * (and release it when no longer needed) */
2034 static MultiQueueSlot *
create_new_slot(GstDecodebin3 * dbin,GstStreamType type)2035 create_new_slot (GstDecodebin3 * dbin, GstStreamType type)
2036 {
2037   MultiQueueSlot *slot;
2038   GstIterator *it = NULL;
2039   GValue item = { 0, };
2040 
2041   GST_DEBUG_OBJECT (dbin, "Creating new slot for type %s",
2042       gst_stream_type_get_name (type));
2043   slot = g_new0 (MultiQueueSlot, 1);
2044   slot->dbin = dbin;
2045 
2046   slot->id = dbin->slot_id++;
2047 
2048   slot->type = type;
2049   slot->sink_pad = gst_element_request_pad_simple (dbin->multiqueue, "sink_%u");
2050   if (slot->sink_pad == NULL)
2051     goto fail;
2052 
2053   it = gst_pad_iterate_internal_links (slot->sink_pad);
2054   if (!it || (gst_iterator_next (it, &item)) != GST_ITERATOR_OK
2055       || ((slot->src_pad = g_value_dup_object (&item)) == NULL)) {
2056     GST_ERROR ("Couldn't get srcpad from multiqueue for sink pad %s:%s",
2057         GST_DEBUG_PAD_NAME (slot->src_pad));
2058     goto fail;
2059   }
2060   gst_iterator_free (it);
2061   g_value_reset (&item);
2062 
2063   g_object_set (slot->sink_pad, "group-id", (guint) type, NULL);
2064 
2065   /* Add event probe */
2066   slot->probe_id =
2067       gst_pad_add_probe (slot->src_pad,
2068       GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM | GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,
2069       (GstPadProbeCallback) multiqueue_src_probe, slot, NULL);
2070 
2071   GST_DEBUG ("Created new slot %u (%p) (%s:%s)", slot->id, slot,
2072       GST_DEBUG_PAD_NAME (slot->src_pad));
2073 
2074   dbin->slots = g_list_append (dbin->slots, slot);
2075 
2076   return slot;
2077 
2078   /* ERRORS */
2079 fail:
2080   {
2081     if (slot->sink_pad)
2082       gst_element_release_request_pad (dbin->multiqueue, slot->sink_pad);
2083     g_free (slot);
2084     return NULL;
2085   }
2086 }
2087 
2088 /* Must be called with SELECTION_LOCK */
2089 static MultiQueueSlot *
get_slot_for_input(GstDecodebin3 * dbin,DecodebinInputStream * input)2090 get_slot_for_input (GstDecodebin3 * dbin, DecodebinInputStream * input)
2091 {
2092   GList *tmp;
2093   MultiQueueSlot *empty_slot = NULL;
2094   GstStreamType input_type = 0;
2095   gchar *stream_id = NULL;
2096 
2097   GST_DEBUG_OBJECT (dbin, "input %p (stream %p %s)",
2098       input, input->active_stream,
2099       input->
2100       active_stream ? gst_stream_get_stream_id (input->active_stream) : "");
2101 
2102   if (input->active_stream) {
2103     input_type = gst_stream_get_stream_type (input->active_stream);
2104     stream_id = (gchar *) gst_stream_get_stream_id (input->active_stream);
2105   }
2106 
2107   /* Go over existing slots and check if there is already one for it */
2108   for (tmp = dbin->slots; tmp; tmp = tmp->next) {
2109     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2110     /* Already used input, return that one */
2111     if (slot->input == input) {
2112       GST_DEBUG_OBJECT (dbin, "Returning already specified slot %d", slot->id);
2113       return slot;
2114     }
2115   }
2116 
2117   /* Go amongst all unused slots of the right type and try to find a candidate */
2118   for (tmp = dbin->slots; tmp; tmp = tmp->next) {
2119     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2120     if (slot->input == NULL && input_type == slot->type) {
2121       /* Remember this empty slot for later */
2122       empty_slot = slot;
2123       /* Check if available slot is of the same stream_id */
2124       GST_LOG_OBJECT (dbin, "Checking candidate slot %d (active_stream:%p)",
2125           slot->id, slot->active_stream);
2126       if (stream_id && slot->active_stream) {
2127         gchar *ostream_id =
2128             (gchar *) gst_stream_get_stream_id (slot->active_stream);
2129         GST_DEBUG_OBJECT (dbin, "Checking slot %d %s against %s", slot->id,
2130             ostream_id, stream_id);
2131         if (!g_strcmp0 (stream_id, ostream_id))
2132           break;
2133       }
2134     }
2135   }
2136 
2137   if (empty_slot) {
2138     GST_DEBUG_OBJECT (dbin, "Re-using existing unused slot %d", empty_slot->id);
2139     empty_slot->input = input;
2140     return empty_slot;
2141   }
2142 
2143   if (input_type)
2144     return create_new_slot (dbin, input_type);
2145 
2146   return NULL;
2147 }
2148 
2149 static void
link_input_to_slot(DecodebinInputStream * input,MultiQueueSlot * slot)2150 link_input_to_slot (DecodebinInputStream * input, MultiQueueSlot * slot)
2151 {
2152   if (slot->input != NULL && slot->input != input) {
2153     GST_ERROR_OBJECT (slot->dbin,
2154         "Trying to link input to an already used slot");
2155     return;
2156   }
2157   gst_pad_link_full (input->srcpad, slot->sink_pad, GST_PAD_LINK_CHECK_NOTHING);
2158   slot->pending_stream = input->active_stream;
2159   slot->input = input;
2160 }
2161 
2162 #if 0
2163 static gboolean
2164 have_factory (GstDecodebin3 * dbin, GstCaps * caps,
2165     GstElementFactoryListType ftype)
2166 {
2167   gboolean ret = FALSE;
2168   GList *res;
2169 
2170   g_mutex_lock (&dbin->factories_lock);
2171   gst_decode_bin_update_factories_list (dbin);
2172   if (ftype == GST_ELEMENT_FACTORY_TYPE_DECODER)
2173     res =
2174         gst_element_factory_list_filter (dbin->decoder_factories,
2175         caps, GST_PAD_SINK, TRUE);
2176   else
2177     res =
2178         gst_element_factory_list_filter (dbin->decodable_factories,
2179         caps, GST_PAD_SINK, TRUE);
2180   g_mutex_unlock (&dbin->factories_lock);
2181 
2182   if (res) {
2183     ret = TRUE;
2184     gst_plugin_feature_list_free (res);
2185   }
2186 
2187   return ret;
2188 }
2189 #endif
2190 
2191 static GList *
create_decoder_factory_list(GstDecodebin3 * dbin,GstCaps * caps)2192 create_decoder_factory_list (GstDecodebin3 * dbin, GstCaps * caps)
2193 {
2194   GList *res;
2195 
2196   g_mutex_lock (&dbin->factories_lock);
2197   gst_decode_bin_update_factories_list (dbin);
2198   res = gst_element_factory_list_filter (dbin->decoder_factories,
2199       caps, GST_PAD_SINK, TRUE);
2200   g_mutex_unlock (&dbin->factories_lock);
2201   return res;
2202 }
2203 
2204 static GstPadProbeReturn
keyframe_waiter_probe(GstPad * pad,GstPadProbeInfo * info,DecodebinOutputStream * output)2205 keyframe_waiter_probe (GstPad * pad, GstPadProbeInfo * info,
2206     DecodebinOutputStream * output)
2207 {
2208   GstBuffer *buf = GST_PAD_PROBE_INFO_BUFFER (info);
2209   /* If we have a keyframe, remove the probe and let all data through */
2210   /* FIXME : HANDLE HEADER BUFFER ?? */
2211   if (!GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) ||
2212       GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_HEADER)) {
2213     GST_DEBUG_OBJECT (pad,
2214         "Buffer is keyframe or header, letting through and removing probe");
2215     output->drop_probe_id = 0;
2216     return GST_PAD_PROBE_REMOVE;
2217   }
2218   GST_DEBUG_OBJECT (pad, "Buffer is not a keyframe, dropping");
2219   return GST_PAD_PROBE_DROP;
2220 }
2221 
2222 static void
reconfigure_output_stream(DecodebinOutputStream * output,MultiQueueSlot * slot)2223 reconfigure_output_stream (DecodebinOutputStream * output,
2224     MultiQueueSlot * slot)
2225 {
2226   GstDecodebin3 *dbin = output->dbin;
2227   GstCaps *new_caps = (GstCaps *) gst_stream_get_caps (slot->active_stream);
2228   gboolean needs_decoder;
2229 
2230   needs_decoder = gst_caps_can_intersect (new_caps, dbin->caps) != TRUE;
2231 
2232   GST_DEBUG_OBJECT (dbin,
2233       "Reconfiguring output %p to slot %p, needs_decoder:%d", output, slot,
2234       needs_decoder);
2235 
2236   /* FIXME : Maybe make the output un-hook itself automatically ? */
2237   if (output->slot != NULL && output->slot != slot) {
2238     GST_WARNING_OBJECT (dbin,
2239         "Output still linked to another slot (%p)", output->slot);
2240     gst_caps_unref (new_caps);
2241     return;
2242   }
2243 
2244   /* Check if existing config is reusable as-is by checking if
2245    * the existing decoder accepts the new caps, if not delete
2246    * it and create a new one */
2247   if (output->decoder) {
2248     gboolean can_reuse_decoder;
2249 
2250     if (needs_decoder) {
2251       can_reuse_decoder =
2252           gst_pad_query_accept_caps (output->decoder_sink, new_caps);
2253     } else
2254       can_reuse_decoder = FALSE;
2255 
2256     if (can_reuse_decoder) {
2257       if (output->type & GST_STREAM_TYPE_VIDEO && output->drop_probe_id == 0) {
2258         GST_DEBUG_OBJECT (dbin, "Adding keyframe-waiter probe");
2259         output->drop_probe_id =
2260             gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_BUFFER,
2261             (GstPadProbeCallback) keyframe_waiter_probe, output, NULL);
2262       }
2263       GST_DEBUG_OBJECT (dbin, "Reusing existing decoder for slot %p", slot);
2264       if (output->linked == FALSE) {
2265         gst_pad_link_full (slot->src_pad, output->decoder_sink,
2266             GST_PAD_LINK_CHECK_NOTHING);
2267         output->linked = TRUE;
2268       }
2269       gst_caps_unref (new_caps);
2270       return;
2271     }
2272 
2273     GST_DEBUG_OBJECT (dbin, "Removing old decoder for slot %p", slot);
2274 
2275     if (output->linked)
2276       gst_pad_unlink (slot->src_pad, output->decoder_sink);
2277     output->linked = FALSE;
2278     if (output->drop_probe_id) {
2279       gst_pad_remove_probe (slot->src_pad, output->drop_probe_id);
2280       output->drop_probe_id = 0;
2281     }
2282 
2283     if (!gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad, NULL)) {
2284       GST_ERROR_OBJECT (dbin, "Could not release decoder pad");
2285       gst_caps_unref (new_caps);
2286       goto cleanup;
2287     }
2288 
2289     gst_element_set_locked_state (output->decoder, TRUE);
2290     gst_element_set_state (output->decoder, GST_STATE_NULL);
2291 
2292     gst_bin_remove ((GstBin *) dbin, output->decoder);
2293     output->decoder = NULL;
2294     output->decoder_latency = GST_CLOCK_TIME_NONE;
2295   } else if (output->linked) {
2296     /* Otherwise if we have no decoder yet but the output is linked make
2297      * sure that the ghost pad is really unlinked in case no decoder was
2298      * needed previously */
2299     if (!gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad, NULL)) {
2300       GST_ERROR_OBJECT (dbin, "Could not release ghost pad");
2301       gst_caps_unref (new_caps);
2302       goto cleanup;
2303     }
2304   }
2305 
2306   gst_object_replace ((GstObject **) & output->decoder_sink, NULL);
2307   gst_object_replace ((GstObject **) & output->decoder_src, NULL);
2308 
2309   /* If a decoder is required, create one */
2310   if (needs_decoder) {
2311     GList *factories, *next_factory;
2312 
2313     factories = next_factory = create_decoder_factory_list (dbin, new_caps);
2314     while (!output->decoder) {
2315       gboolean decoder_failed = FALSE;
2316 
2317       /* If we don't have a decoder yet, instantiate one */
2318       if (next_factory) {
2319         output->decoder = gst_element_factory_create ((GstElementFactory *)
2320             next_factory->data, NULL);
2321         GST_DEBUG ("Created decoder '%s'", GST_ELEMENT_NAME (output->decoder));
2322       } else
2323         GST_DEBUG ("Could not find an element for caps %" GST_PTR_FORMAT,
2324             new_caps);
2325 
2326       if (output->decoder == NULL) {
2327         GstCaps *caps;
2328 
2329         SELECTION_UNLOCK (dbin);
2330         /* FIXME : Should we be smarter if there's a missing decoder ?
2331          * Should we deactivate that stream ? */
2332         caps = gst_stream_get_caps (slot->active_stream);
2333         gst_element_post_message (GST_ELEMENT_CAST (dbin),
2334             gst_missing_decoder_message_new (GST_ELEMENT_CAST (dbin), caps));
2335         gst_caps_unref (caps);
2336         SELECTION_LOCK (dbin);
2337         goto cleanup;
2338       }
2339       if (!gst_bin_add ((GstBin *) dbin, output->decoder)) {
2340         GST_ERROR_OBJECT (dbin, "could not add decoder to pipeline");
2341         goto cleanup;
2342       }
2343       output->decoder_sink =
2344           gst_element_get_static_pad (output->decoder, "sink");
2345       output->decoder_src = gst_element_get_static_pad (output->decoder, "src");
2346       if (output->type & GST_STREAM_TYPE_VIDEO) {
2347         GST_DEBUG_OBJECT (dbin, "Adding keyframe-waiter probe");
2348         output->drop_probe_id =
2349             gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_BUFFER,
2350             (GstPadProbeCallback) keyframe_waiter_probe, output, NULL);
2351       }
2352       if (gst_pad_link_full (slot->src_pad, output->decoder_sink,
2353               GST_PAD_LINK_CHECK_NOTHING) != GST_PAD_LINK_OK) {
2354         GST_ERROR_OBJECT (dbin, "could not link to %s:%s",
2355             GST_DEBUG_PAD_NAME (output->decoder_sink));
2356         goto cleanup;
2357       }
2358       if (gst_element_set_state (output->decoder,
2359               GST_STATE_READY) == GST_STATE_CHANGE_FAILURE) {
2360         GST_DEBUG_OBJECT (dbin,
2361             "Decoder '%s' failed to reach READY state, trying the next type",
2362             GST_ELEMENT_NAME (output->decoder));
2363         decoder_failed = TRUE;
2364       }
2365       if (!gst_pad_query_accept_caps (output->decoder_sink, new_caps)) {
2366         GST_DEBUG_OBJECT (dbin,
2367             "Decoder '%s' did not accept the caps, trying the next type",
2368             GST_ELEMENT_NAME (output->decoder));
2369         decoder_failed = TRUE;
2370       }
2371       if (decoder_failed) {
2372         gst_pad_unlink (slot->src_pad, output->decoder_sink);
2373         if (output->drop_probe_id) {
2374           gst_pad_remove_probe (slot->src_pad, output->drop_probe_id);
2375           output->drop_probe_id = 0;
2376         }
2377 
2378         gst_element_set_locked_state (output->decoder, TRUE);
2379         gst_element_set_state (output->decoder, GST_STATE_NULL);
2380 
2381         gst_bin_remove ((GstBin *) dbin, output->decoder);
2382         output->decoder = NULL;
2383       }
2384       next_factory = next_factory->next;
2385     }
2386     gst_plugin_feature_list_free (factories);
2387   } else {
2388     output->decoder_src = gst_object_ref (slot->src_pad);
2389     output->decoder_sink = NULL;
2390   }
2391   gst_caps_unref (new_caps);
2392 
2393   output->linked = TRUE;
2394   if (!gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad,
2395           output->decoder_src)) {
2396     GST_ERROR_OBJECT (dbin, "Could not expose decoder pad");
2397     goto cleanup;
2398   }
2399   if (output->src_exposed == FALSE) {
2400     GstEvent *stream_start;
2401 
2402     stream_start = gst_pad_get_sticky_event (slot->src_pad,
2403         GST_EVENT_STREAM_START, 0);
2404 
2405     /* Ensure GstStream is accesiable from pad-added callback */
2406     if (stream_start) {
2407       gst_pad_store_sticky_event (output->src_pad, stream_start);
2408       gst_event_unref (stream_start);
2409     } else {
2410       GST_WARNING_OBJECT (slot->src_pad,
2411           "Pad has no stored stream-start event");
2412     }
2413 
2414     output->src_exposed = TRUE;
2415     gst_element_add_pad (GST_ELEMENT_CAST (dbin), output->src_pad);
2416   }
2417 
2418   if (output->decoder)
2419     gst_element_sync_state_with_parent (output->decoder);
2420 
2421   output->slot = slot;
2422   return;
2423 
2424 cleanup:
2425   {
2426     GST_DEBUG_OBJECT (dbin, "Cleanup");
2427     if (output->decoder_sink) {
2428       gst_object_unref (output->decoder_sink);
2429       output->decoder_sink = NULL;
2430     }
2431     if (output->decoder_src) {
2432       gst_object_unref (output->decoder_src);
2433       output->decoder_src = NULL;
2434     }
2435     if (output->decoder) {
2436       gst_element_set_state (output->decoder, GST_STATE_NULL);
2437       gst_bin_remove ((GstBin *) dbin, output->decoder);
2438       output->decoder = NULL;
2439     }
2440   }
2441 }
2442 
2443 static GstPadProbeReturn
idle_reconfigure(GstPad * pad,GstPadProbeInfo * info,MultiQueueSlot * slot)2444 idle_reconfigure (GstPad * pad, GstPadProbeInfo * info, MultiQueueSlot * slot)
2445 {
2446   GstMessage *msg = NULL;
2447   DecodebinOutputStream *output;
2448 
2449   SELECTION_LOCK (slot->dbin);
2450   output = get_output_for_slot (slot);
2451 
2452   GST_DEBUG_OBJECT (pad, "output : %p", output);
2453 
2454   if (output) {
2455     reconfigure_output_stream (output, slot);
2456     msg = is_selection_done (slot->dbin);
2457   }
2458   SELECTION_UNLOCK (slot->dbin);
2459   if (msg)
2460     gst_element_post_message ((GstElement *) slot->dbin, msg);
2461 
2462   return GST_PAD_PROBE_REMOVE;
2463 }
2464 
2465 static MultiQueueSlot *
find_slot_for_stream_id(GstDecodebin3 * dbin,const gchar * sid)2466 find_slot_for_stream_id (GstDecodebin3 * dbin, const gchar * sid)
2467 {
2468   GList *tmp;
2469 
2470   for (tmp = dbin->slots; tmp; tmp = tmp->next) {
2471     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2472     const gchar *stream_id;
2473     if (slot->active_stream) {
2474       stream_id = gst_stream_get_stream_id (slot->active_stream);
2475       if (!g_strcmp0 (sid, stream_id))
2476         return slot;
2477     }
2478     if (slot->pending_stream && slot->pending_stream != slot->active_stream) {
2479       stream_id = gst_stream_get_stream_id (slot->pending_stream);
2480       if (!g_strcmp0 (sid, stream_id))
2481         return slot;
2482     }
2483   }
2484 
2485   return NULL;
2486 }
2487 
2488 /* This function handles the reassignment of a slot. Call this from
2489  * the streaming thread of a slot. */
2490 static gboolean
reassign_slot(GstDecodebin3 * dbin,MultiQueueSlot * slot)2491 reassign_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot)
2492 {
2493   DecodebinOutputStream *output;
2494   MultiQueueSlot *target_slot = NULL;
2495   GList *tmp;
2496   const gchar *sid, *tsid;
2497 
2498   SELECTION_LOCK (dbin);
2499   output = slot->output;
2500 
2501   if (G_UNLIKELY (slot->active_stream == NULL)) {
2502     GST_DEBUG_OBJECT (slot->src_pad,
2503         "Called on inactive slot (active_stream == NULL)");
2504     SELECTION_UNLOCK (dbin);
2505     return FALSE;
2506   }
2507 
2508   if (G_UNLIKELY (output == NULL)) {
2509     GST_DEBUG_OBJECT (slot->src_pad,
2510         "Slot doesn't have any output to be removed");
2511     SELECTION_UNLOCK (dbin);
2512     return FALSE;
2513   }
2514 
2515   sid = gst_stream_get_stream_id (slot->active_stream);
2516   GST_DEBUG_OBJECT (slot->src_pad, "slot %s %p", sid, slot);
2517 
2518   /* Recheck whether this stream is still in the list of streams to deactivate */
2519   if (stream_in_list (dbin->requested_selection, sid)) {
2520     /* Stream is in the list of requested streams, don't remove */
2521     SELECTION_UNLOCK (dbin);
2522     GST_DEBUG_OBJECT (slot->src_pad,
2523         "Stream '%s' doesn't need to be deactivated", sid);
2524     return FALSE;
2525   }
2526 
2527   /* Unlink slot from output */
2528   /* FIXME : Handle flushing ? */
2529   /* FIXME : Handle outputs without decoders */
2530   GST_DEBUG_OBJECT (slot->src_pad, "Unlinking from decoder %p",
2531       output->decoder_sink);
2532   if (output->decoder_sink)
2533     gst_pad_unlink (slot->src_pad, output->decoder_sink);
2534   output->linked = FALSE;
2535   slot->output = NULL;
2536   output->slot = NULL;
2537   /* Remove sid from active selection */
2538   for (tmp = dbin->active_selection; tmp; tmp = tmp->next)
2539     if (!g_strcmp0 (sid, tmp->data)) {
2540       dbin->active_selection = g_list_delete_link (dbin->active_selection, tmp);
2541       break;
2542     }
2543 
2544   /* Can we re-assign this output to a requested stream ? */
2545   GST_DEBUG_OBJECT (slot->src_pad, "Attempting to re-assing output stream");
2546   for (tmp = dbin->to_activate; tmp; tmp = tmp->next) {
2547     MultiQueueSlot *tslot = find_slot_for_stream_id (dbin, tmp->data);
2548     GST_LOG_OBJECT (tslot->src_pad, "Checking slot %p (output:%p , stream:%s)",
2549         tslot, tslot->output, gst_stream_get_stream_id (tslot->active_stream));
2550     if (tslot && tslot->type == output->type && tslot->output == NULL) {
2551       GST_DEBUG_OBJECT (tslot->src_pad, "Using as reassigned slot");
2552       target_slot = tslot;
2553       tsid = tmp->data;
2554       /* Pass target stream id to requested selection */
2555       dbin->requested_selection =
2556           g_list_append (dbin->requested_selection, g_strdup (tmp->data));
2557       dbin->to_activate = g_list_remove (dbin->to_activate, tmp->data);
2558       break;
2559     }
2560   }
2561 
2562   if (target_slot) {
2563     GST_DEBUG_OBJECT (slot->src_pad, "Assigning output to slot %p '%s'",
2564         target_slot, tsid);
2565     target_slot->output = output;
2566     output->slot = target_slot;
2567     dbin->active_selection =
2568         g_list_append (dbin->active_selection, (gchar *) tsid);
2569     SELECTION_UNLOCK (dbin);
2570 
2571     /* Wakeup the target slot so that it retries to send events/buffers
2572      * thereby triggering the output reconfiguration codepath */
2573     gst_pad_add_probe (target_slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
2574         (GstPadProbeCallback) idle_reconfigure, target_slot, NULL);
2575     /* gst_pad_send_event (target_slot->src_pad, gst_event_new_reconfigure ()); */
2576   } else {
2577     GstMessage *msg;
2578 
2579     dbin->output_streams = g_list_remove (dbin->output_streams, output);
2580     free_output_stream (dbin, output);
2581     msg = is_selection_done (slot->dbin);
2582     SELECTION_UNLOCK (dbin);
2583 
2584     if (msg)
2585       gst_element_post_message ((GstElement *) slot->dbin, msg);
2586   }
2587 
2588   return TRUE;
2589 }
2590 
2591 /* Idle probe called when a slot should be unassigned from its output stream.
2592  * This is needed to ensure nothing is flowing when unlinking the slot.
2593  *
2594  * Also, this method will search for a pending stream which could re-use
2595  * the output stream. */
2596 static GstPadProbeReturn
slot_unassign_probe(GstPad * pad,GstPadProbeInfo * info,MultiQueueSlot * slot)2597 slot_unassign_probe (GstPad * pad, GstPadProbeInfo * info,
2598     MultiQueueSlot * slot)
2599 {
2600   GstDecodebin3 *dbin = slot->dbin;
2601 
2602   reassign_slot (dbin, slot);
2603 
2604   return GST_PAD_PROBE_REMOVE;
2605 }
2606 
2607 static gboolean
handle_stream_switch(GstDecodebin3 * dbin,GList * select_streams,guint32 seqnum)2608 handle_stream_switch (GstDecodebin3 * dbin, GList * select_streams,
2609     guint32 seqnum)
2610 {
2611   gboolean ret = TRUE;
2612   GList *tmp;
2613   /* List of slots to (de)activate. */
2614   GList *to_deactivate = NULL;
2615   GList *to_activate = NULL;
2616   /* List of unknown stream id, most likely means the event
2617    * should be sent upstream so that elements can expose the requested stream */
2618   GList *unknown = NULL;
2619   GList *to_reassign = NULL;
2620   GList *future_request_streams = NULL;
2621   GList *pending_streams = NULL;
2622   GList *slots_to_reassign = NULL;
2623 
2624   SELECTION_LOCK (dbin);
2625   if (G_UNLIKELY (seqnum != dbin->select_streams_seqnum)) {
2626     GST_DEBUG_OBJECT (dbin, "New SELECT_STREAMS has arrived in the meantime");
2627     SELECTION_UNLOCK (dbin);
2628     return TRUE;
2629   }
2630   /* Remove pending select_streams */
2631   g_list_free (dbin->pending_select_streams);
2632   dbin->pending_select_streams = NULL;
2633 
2634   /* COMPARE the requested streams to the active and requested streams
2635    * on multiqueue. */
2636 
2637   /* First check the slots to activate and which ones are unknown */
2638   for (tmp = select_streams; tmp; tmp = tmp->next) {
2639     const gchar *sid = (const gchar *) tmp->data;
2640     MultiQueueSlot *slot;
2641     GST_DEBUG_OBJECT (dbin, "Checking stream '%s'", sid);
2642     slot = find_slot_for_stream_id (dbin, sid);
2643     /* Find the corresponding slot */
2644     if (slot == NULL) {
2645       if (stream_in_collection (dbin, (gchar *) sid)) {
2646         pending_streams = g_list_append (pending_streams, (gchar *) sid);
2647       } else {
2648         GST_DEBUG_OBJECT (dbin, "We don't have a slot for stream '%s'", sid);
2649         unknown = g_list_append (unknown, (gchar *) sid);
2650       }
2651     } else if (slot->output == NULL) {
2652       GST_DEBUG_OBJECT (dbin, "We need to activate slot %p for stream '%s')",
2653           slot, sid);
2654       to_activate = g_list_append (to_activate, slot);
2655     } else {
2656       GST_DEBUG_OBJECT (dbin,
2657           "Stream '%s' from slot %p is already active on output %p", sid, slot,
2658           slot->output);
2659       future_request_streams =
2660           g_list_append (future_request_streams, (gchar *) sid);
2661     }
2662   }
2663 
2664   for (tmp = dbin->slots; tmp; tmp = tmp->next) {
2665     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2666     /* For slots that have an output, check if it's part of the streams to
2667      * be active */
2668     if (slot->output) {
2669       gboolean slot_to_deactivate = TRUE;
2670 
2671       if (slot->active_stream) {
2672         if (stream_in_list (select_streams,
2673                 gst_stream_get_stream_id (slot->active_stream)))
2674           slot_to_deactivate = FALSE;
2675       }
2676       if (slot_to_deactivate && slot->pending_stream
2677           && slot->pending_stream != slot->active_stream) {
2678         if (stream_in_list (select_streams,
2679                 gst_stream_get_stream_id (slot->pending_stream)))
2680           slot_to_deactivate = FALSE;
2681       }
2682       if (slot_to_deactivate) {
2683         GST_DEBUG_OBJECT (dbin,
2684             "Slot %p (%s) should be deactivated, no longer used", slot,
2685             slot->
2686             active_stream ? gst_stream_get_stream_id (slot->active_stream) :
2687             "NULL");
2688         to_deactivate = g_list_append (to_deactivate, slot);
2689       }
2690     }
2691   }
2692 
2693   if (to_deactivate != NULL) {
2694     GST_DEBUG_OBJECT (dbin, "Check if we can reassign slots");
2695     /* We need to compare what needs to be activated and deactivated in order
2696      * to determine whether there are outputs that can be transferred */
2697     /* Take the stream-id of the slots that are to be activated, for which there
2698      * is a slot of the same type that needs to be deactivated */
2699     tmp = to_deactivate;
2700     while (tmp) {
2701       MultiQueueSlot *slot_to_deactivate = (MultiQueueSlot *) tmp->data;
2702       gboolean removeit = FALSE;
2703       GList *tmp2, *next;
2704       GST_DEBUG_OBJECT (dbin,
2705           "Checking if slot to deactivate (%p) has a candidate slot to activate",
2706           slot_to_deactivate);
2707       for (tmp2 = to_activate; tmp2; tmp2 = tmp2->next) {
2708         MultiQueueSlot *slot_to_activate = (MultiQueueSlot *) tmp2->data;
2709         GST_DEBUG_OBJECT (dbin, "Comparing to slot %p", slot_to_activate);
2710         if (slot_to_activate->type == slot_to_deactivate->type) {
2711           GST_DEBUG_OBJECT (dbin, "Re-using");
2712           to_reassign = g_list_append (to_reassign, (gchar *)
2713               gst_stream_get_stream_id (slot_to_activate->active_stream));
2714           slots_to_reassign =
2715               g_list_append (slots_to_reassign, slot_to_deactivate);
2716           to_activate = g_list_remove (to_activate, slot_to_activate);
2717           removeit = TRUE;
2718           break;
2719         }
2720       }
2721       next = tmp->next;
2722       if (removeit)
2723         to_deactivate = g_list_delete_link (to_deactivate, tmp);
2724       tmp = next;
2725     }
2726   }
2727 
2728   for (tmp = to_deactivate; tmp; tmp = tmp->next) {
2729     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2730     GST_DEBUG_OBJECT (dbin,
2731         "Really need to deactivate slot %p, but no available alternative",
2732         slot);
2733 
2734     slots_to_reassign = g_list_append (slots_to_reassign, slot);
2735   }
2736 
2737   /* The only slots left to activate are the ones that won't be reassigned and
2738    * therefore really need to have a new output created */
2739   for (tmp = to_activate; tmp; tmp = tmp->next) {
2740     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2741     if (slot->active_stream)
2742       future_request_streams =
2743           g_list_append (future_request_streams,
2744           (gchar *) gst_stream_get_stream_id (slot->active_stream));
2745     else if (slot->pending_stream)
2746       future_request_streams =
2747           g_list_append (future_request_streams,
2748           (gchar *) gst_stream_get_stream_id (slot->pending_stream));
2749     else
2750       GST_ERROR_OBJECT (dbin, "No stream for slot %p !!", slot);
2751   }
2752 
2753   if (to_activate == NULL && pending_streams != NULL) {
2754     GST_DEBUG_OBJECT (dbin, "Stream switch requested for future collection");
2755     if (dbin->requested_selection)
2756       g_list_free_full (dbin->requested_selection, g_free);
2757     dbin->requested_selection =
2758         g_list_copy_deep (select_streams, (GCopyFunc) g_strdup, NULL);
2759     g_list_free (to_deactivate);
2760     g_list_free (pending_streams);
2761     to_deactivate = NULL;
2762     pending_streams = NULL;
2763   } else {
2764     if (dbin->requested_selection)
2765       g_list_free_full (dbin->requested_selection, g_free);
2766     dbin->requested_selection =
2767         g_list_copy_deep (future_request_streams, (GCopyFunc) g_strdup, NULL);
2768     dbin->requested_selection =
2769         g_list_concat (dbin->requested_selection,
2770         g_list_copy_deep (pending_streams, (GCopyFunc) g_strdup, NULL));
2771     if (dbin->to_activate)
2772       g_list_free (dbin->to_activate);
2773     dbin->to_activate = g_list_copy (to_reassign);
2774   }
2775 
2776   dbin->selection_updated = TRUE;
2777   SELECTION_UNLOCK (dbin);
2778 
2779   if (unknown) {
2780     GST_FIXME_OBJECT (dbin, "Got request for an unknown stream");
2781     g_list_free (unknown);
2782   }
2783 
2784   if (to_activate && !slots_to_reassign) {
2785     for (tmp = to_activate; tmp; tmp = tmp->next) {
2786       MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2787       gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
2788           (GstPadProbeCallback) idle_reconfigure, slot, NULL);
2789     }
2790   }
2791 
2792   /* For all streams to deactivate, add an idle probe where we will do
2793    * the unassignment and switch over */
2794   for (tmp = slots_to_reassign; tmp; tmp = tmp->next) {
2795     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2796     gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
2797         (GstPadProbeCallback) slot_unassign_probe, slot, NULL);
2798   }
2799 
2800   if (to_deactivate)
2801     g_list_free (to_deactivate);
2802   if (to_activate)
2803     g_list_free (to_activate);
2804   if (to_reassign)
2805     g_list_free (to_reassign);
2806   if (future_request_streams)
2807     g_list_free (future_request_streams);
2808   if (pending_streams)
2809     g_list_free (pending_streams);
2810   if (slots_to_reassign)
2811     g_list_free (slots_to_reassign);
2812 
2813   return ret;
2814 }
2815 
2816 static GstPadProbeReturn
ghost_pad_event_probe(GstPad * pad,GstPadProbeInfo * info,DecodebinOutputStream * output)2817 ghost_pad_event_probe (GstPad * pad, GstPadProbeInfo * info,
2818     DecodebinOutputStream * output)
2819 {
2820   GstPadProbeReturn ret = GST_PAD_PROBE_OK;
2821   GstDecodebin3 *dbin = output->dbin;
2822   GstEvent *event = GST_PAD_PROBE_INFO_EVENT (info);
2823 
2824   GST_DEBUG_OBJECT (pad, "Got event %p %s", event, GST_EVENT_TYPE_NAME (event));
2825 
2826   switch (GST_EVENT_TYPE (event)) {
2827     case GST_EVENT_SELECT_STREAMS:
2828     {
2829       GstPad *peer;
2830       GList *streams = NULL;
2831       guint32 seqnum = gst_event_get_seqnum (event);
2832 
2833       SELECTION_LOCK (dbin);
2834       if (seqnum == dbin->select_streams_seqnum) {
2835         SELECTION_UNLOCK (dbin);
2836         GST_DEBUG_OBJECT (pad,
2837             "Already handled/handling that SELECT_STREAMS event");
2838         gst_event_unref (event);
2839         ret = GST_PAD_PROBE_HANDLED;
2840         break;
2841       }
2842       dbin->select_streams_seqnum = seqnum;
2843       if (dbin->pending_select_streams != NULL) {
2844         GST_LOG_OBJECT (dbin, "Replacing pending select streams");
2845         g_list_free (dbin->pending_select_streams);
2846         dbin->pending_select_streams = NULL;
2847       }
2848       gst_event_parse_select_streams (event, &streams);
2849       dbin->pending_select_streams = g_list_copy (streams);
2850       SELECTION_UNLOCK (dbin);
2851 
2852       /* Send event upstream */
2853       if ((peer = gst_pad_get_peer (pad))) {
2854         gst_pad_send_event (peer, event);
2855         gst_object_unref (peer);
2856       } else {
2857         gst_event_unref (event);
2858       }
2859       /* Finally handle the switch */
2860       if (streams) {
2861         handle_stream_switch (dbin, streams, seqnum);
2862         g_list_free_full (streams, g_free);
2863       }
2864       ret = GST_PAD_PROBE_HANDLED;
2865     }
2866       break;
2867     default:
2868       break;
2869   }
2870 
2871   return ret;
2872 }
2873 
2874 static gboolean
gst_decodebin3_send_event(GstElement * element,GstEvent * event)2875 gst_decodebin3_send_event (GstElement * element, GstEvent * event)
2876 {
2877   GST_DEBUG_OBJECT (element, "event %s", GST_EVENT_TYPE_NAME (event));
2878   if (GST_EVENT_TYPE (event) == GST_EVENT_SELECT_STREAMS) {
2879     GstDecodebin3 *dbin = (GstDecodebin3 *) element;
2880     GList *streams = NULL;
2881     guint32 seqnum = gst_event_get_seqnum (event);
2882 
2883     SELECTION_LOCK (dbin);
2884     if (seqnum == dbin->select_streams_seqnum) {
2885       SELECTION_UNLOCK (dbin);
2886       GST_DEBUG_OBJECT (dbin,
2887           "Already handled/handling that SELECT_STREAMS event");
2888       return TRUE;
2889     }
2890     dbin->select_streams_seqnum = seqnum;
2891     if (dbin->pending_select_streams != NULL) {
2892       GST_LOG_OBJECT (dbin, "Replacing pending select streams");
2893       g_list_free (dbin->pending_select_streams);
2894       dbin->pending_select_streams = NULL;
2895     }
2896     gst_event_parse_select_streams (event, &streams);
2897     dbin->pending_select_streams = g_list_copy (streams);
2898     SELECTION_UNLOCK (dbin);
2899 
2900     /* FIXME : We don't have an upstream ?? */
2901 #if 0
2902     /* Send event upstream */
2903     if ((peer = gst_pad_get_peer (pad))) {
2904       gst_pad_send_event (peer, event);
2905       gst_object_unref (peer);
2906     }
2907 #endif
2908     /* Finally handle the switch */
2909     if (streams) {
2910       handle_stream_switch (dbin, streams, seqnum);
2911       g_list_free_full (streams, g_free);
2912     }
2913 
2914     gst_event_unref (event);
2915     return TRUE;
2916   }
2917   return GST_ELEMENT_CLASS (parent_class)->send_event (element, event);
2918 }
2919 
2920 
2921 static void
free_multiqueue_slot(GstDecodebin3 * dbin,MultiQueueSlot * slot)2922 free_multiqueue_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot)
2923 {
2924   if (slot->probe_id)
2925     gst_pad_remove_probe (slot->src_pad, slot->probe_id);
2926   if (slot->input) {
2927     if (slot->input->srcpad)
2928       gst_pad_unlink (slot->input->srcpad, slot->sink_pad);
2929   }
2930 
2931   gst_element_release_request_pad (dbin->multiqueue, slot->sink_pad);
2932   gst_object_replace ((GstObject **) & slot->sink_pad, NULL);
2933   gst_object_replace ((GstObject **) & slot->src_pad, NULL);
2934   gst_object_replace ((GstObject **) & slot->active_stream, NULL);
2935   g_free (slot);
2936 }
2937 
2938 static void
free_multiqueue_slot_async(GstDecodebin3 * dbin,MultiQueueSlot * slot)2939 free_multiqueue_slot_async (GstDecodebin3 * dbin, MultiQueueSlot * slot)
2940 {
2941   GST_LOG_OBJECT (dbin, "pushing multiqueue slot on thread pool to free");
2942   gst_element_call_async (GST_ELEMENT_CAST (dbin),
2943       (GstElementCallAsyncFunc) free_multiqueue_slot, slot, NULL);
2944 }
2945 
2946 /* Create a DecodebinOutputStream for a given type
2947  * Note: It will be empty initially, it needs to be configured
2948  * afterwards */
2949 static DecodebinOutputStream *
create_output_stream(GstDecodebin3 * dbin,GstStreamType type)2950 create_output_stream (GstDecodebin3 * dbin, GstStreamType type)
2951 {
2952   DecodebinOutputStream *res = g_new0 (DecodebinOutputStream, 1);
2953   gchar *pad_name;
2954   const gchar *prefix;
2955   GstStaticPadTemplate *templ;
2956   GstPadTemplate *ptmpl;
2957   guint32 *counter;
2958   GstPad *internal_pad;
2959 
2960   GST_DEBUG_OBJECT (dbin, "Created new output stream %p for type %s",
2961       res, gst_stream_type_get_name (type));
2962 
2963   res->type = type;
2964   res->dbin = dbin;
2965   res->decoder_latency = GST_CLOCK_TIME_NONE;
2966 
2967   if (type & GST_STREAM_TYPE_VIDEO) {
2968     templ = &video_src_template;
2969     counter = &dbin->vpadcount;
2970     prefix = "video";
2971   } else if (type & GST_STREAM_TYPE_AUDIO) {
2972     templ = &audio_src_template;
2973     counter = &dbin->apadcount;
2974     prefix = "audio";
2975   } else if (type & GST_STREAM_TYPE_TEXT) {
2976     templ = &text_src_template;
2977     counter = &dbin->tpadcount;
2978     prefix = "text";
2979   } else {
2980     templ = &src_template;
2981     counter = &dbin->opadcount;
2982     prefix = "src";
2983   }
2984 
2985   pad_name = g_strdup_printf ("%s_%u", prefix, *counter);
2986   *counter += 1;
2987   ptmpl = gst_static_pad_template_get (templ);
2988   res->src_pad = gst_ghost_pad_new_no_target_from_template (pad_name, ptmpl);
2989   gst_object_unref (ptmpl);
2990   g_free (pad_name);
2991   gst_pad_set_active (res->src_pad, TRUE);
2992   /* Put an event probe on the internal proxy pad to detect upstream
2993    * events */
2994   internal_pad =
2995       (GstPad *) gst_proxy_pad_get_internal ((GstProxyPad *) res->src_pad);
2996   gst_pad_add_probe (internal_pad, GST_PAD_PROBE_TYPE_EVENT_UPSTREAM,
2997       (GstPadProbeCallback) ghost_pad_event_probe, res, NULL);
2998   gst_object_unref (internal_pad);
2999 
3000   dbin->output_streams = g_list_append (dbin->output_streams, res);
3001 
3002   return res;
3003 }
3004 
3005 static void
free_output_stream(GstDecodebin3 * dbin,DecodebinOutputStream * output)3006 free_output_stream (GstDecodebin3 * dbin, DecodebinOutputStream * output)
3007 {
3008   if (output->slot) {
3009     if (output->decoder_sink && output->decoder)
3010       gst_pad_unlink (output->slot->src_pad, output->decoder_sink);
3011 
3012     output->slot->output = NULL;
3013     output->slot = NULL;
3014   }
3015   gst_object_replace ((GstObject **) & output->decoder_sink, NULL);
3016   gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad, NULL);
3017   gst_object_replace ((GstObject **) & output->decoder_src, NULL);
3018   if (output->src_exposed) {
3019     gst_element_remove_pad ((GstElement *) dbin, output->src_pad);
3020   }
3021   if (output->decoder) {
3022     gst_element_set_locked_state (output->decoder, TRUE);
3023     gst_element_set_state (output->decoder, GST_STATE_NULL);
3024     gst_bin_remove ((GstBin *) dbin, output->decoder);
3025   }
3026   g_free (output);
3027 }
3028 
3029 static GstStateChangeReturn
gst_decodebin3_change_state(GstElement * element,GstStateChange transition)3030 gst_decodebin3_change_state (GstElement * element, GstStateChange transition)
3031 {
3032   GstDecodebin3 *dbin = (GstDecodebin3 *) element;
3033   GstStateChangeReturn ret;
3034 
3035   /* Upwards */
3036   switch (transition) {
3037     default:
3038       break;
3039   }
3040   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
3041   if (ret == GST_STATE_CHANGE_FAILURE)
3042     goto beach;
3043 
3044   switch (transition) {
3045     case GST_STATE_CHANGE_PAUSED_TO_READY:
3046     {
3047       GList *tmp;
3048 
3049       /* Free output streams */
3050       for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
3051         DecodebinOutputStream *output = (DecodebinOutputStream *) tmp->data;
3052         free_output_stream (dbin, output);
3053       }
3054       g_list_free (dbin->output_streams);
3055       dbin->output_streams = NULL;
3056       /* Free multiqueue slots */
3057       for (tmp = dbin->slots; tmp; tmp = tmp->next) {
3058         MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
3059         free_multiqueue_slot (dbin, slot);
3060       }
3061       g_list_free (dbin->slots);
3062       dbin->slots = NULL;
3063       dbin->current_group_id = GST_GROUP_ID_INVALID;
3064       /* Free inputs */
3065       /* Reset the main input group id since it will get a new id on a new stream */
3066       dbin->main_input->group_id = GST_GROUP_ID_INVALID;
3067       /* Reset multiqueue to default interleave */
3068       g_object_set (dbin->multiqueue, "min-interleave-time",
3069           dbin->default_mq_min_interleave, NULL);
3070       dbin->current_mq_min_interleave = dbin->default_mq_min_interleave;
3071     }
3072       break;
3073     default:
3074       break;
3075   }
3076 beach:
3077   return ret;
3078 }
3079