1 /* GStreamer
2 *
3 * Copyright (C) <2015> Centricular Ltd
4 * @author: Edward Hervey <edward@centricular.com>
5 * @author: Jan Schmidt <jan@centricular.com>
6 *
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Library General Public
9 * License as published by the Free Software Foundation; either
10 * version 2 of the License, or (at your option) any later version.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Library General Public License for more details.
16 *
17 * You should have received a copy of the GNU Library General Public
18 * License along with this library; if not, write to the
19 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20 * Boston, MA 02110-1301, USA.
21 */
22
23 #ifdef HAVE_CONFIG_H
24 #include "config.h"
25 #endif
26
27 #include <glib.h>
28 #include <glib-object.h>
29 #include <glib/gprintf.h>
30 #include <gst/gst.h>
31 #include <gst/pbutils/pbutils.h>
32
33 #include "gstplaybackelements.h"
34 #include "gstplay-enum.h"
35 #include "gstrawcaps.h"
36
37 /**
38 * SECTION:element-decodebin3
39 * @title: decodebin3
40 *
41 * #GstBin that auto-magically constructs a decoding pipeline using available
42 * decoders and demuxers via auto-plugging. The output is raw audio, video
43 * or subtitle streams.
44 *
45 * decodebin3 differs from the previous decodebin (decodebin2) in important ways:
46 *
47 * * supports publication and selection of stream information via
48 * GstStreamCollection messages and #GST_EVENT_SELECT_STREAMS events.
49 *
50 * * dynamically switches stream connections internally, and
51 * reuses decoder elements when stream selections change, so that in
52 * the normal case it maintains 1 decoder of each type (video/audio/subtitle)
53 * and only creates new elements when streams change and an existing decoder
54 * is not capable of handling the new format.
55 *
56 * * supports multiple input pads for the parallel decoding of auxiliary streams
57 * not muxed with the primary stream.
58 *
59 * * does not handle network stream buffering. decodebin3 expects that network stream
60 * buffering is handled upstream, before data is passed to it.
61 *
 * > decodebin3 is still experimental API and a technology preview.
 * > Its behaviour and exposed API are subject to change.
64 *
65 */
66
67 /*
68 * Global design
69 *
70 * 1) From sink pad to elementary streams (GstParseBin)
71 *
72 * The input sink pads are fed to GstParseBin. GstParseBin will feed them
73 * through typefind. When the caps are detected (or changed) we recursively
74 * figure out which demuxer, parser or depayloader is needed until we get to
75 * elementary streams.
76 *
77 * All elementary streams (whether decoded or not, whether exposed or not) are
78 * fed through multiqueue. There is only *one* multiqueue in decodebin3.
79 *
80 * => MultiQueue is the cornerstone.
81 * => No buffering before multiqueue
82 *
83 * 2) Elementary streams
84 *
85 * After GstParseBin, there are 3 main components:
86 * 1) Input Streams (provided by GstParseBin)
87 * 2) Multiqueue slots
88 * 3) Output Streams
89 *
90 * Input Streams correspond to the stream coming from GstParseBin and that gets
91 * fed into a multiqueue slot.
92 *
93 * Output Streams correspond to the combination of a (optional) decoder and an
94 * output ghostpad. Output Streams can be moved from one multiqueue slot to
95 * another, can reconfigure itself (different decoders), and can be
96 * added/removed depending on the configuration (all streams outputted, only one
97 * of each type, ...).
98 *
99 * Multiqueue slots correspond to a pair of sink/src pad from multiqueue. For
100 * each 'active' Input Stream there is a corresponding slot.
101 * Slots might have different streams on input and output (due to internal
102 * buffering).
103 *
104 * Due to internal queuing/buffering/..., all those components (might) behave
105 * asynchronously. Therefore probes will be used on each component source pad to
106 * detect various key-points:
107 * * EOS :
108 * the stream is done => Mark that component as done, optionally freeing/removing it
109 * * STREAM_START :
110 * a new stream is starting => link it further if needed
111 *
112 * 3) Gradual replacement
113 *
114 * If the caps change at any point in decodebin (input sink pad, demuxer output,
115 * multiqueue output, ..), we gradually replace (if needed) the following elements.
116 *
117 * This is handled by the probes in various locations:
118 * a) typefind output
119 * b) multiqueue input (source pad of Input Streams)
120 * c) multiqueue output (source pad of Multiqueue Slots)
121 * d) final output (target of source ghostpads)
122 *
 * When a CAPS event arrives at one of those points, one of three things can happen:
 *  a) There are no elements downstream yet, just create/link-to following elements
 *  b) There are downstream elements, do an ACCEPT_CAPS query
126 * b.1) The new CAPS are accepted, keep current configuration
127 * b.2) The new CAPS are not accepted, remove following elements then do a)
128 *
129 * Components:
130 *
131 * MultiQ Output
132 * Input(s) Slots Streams
133 * /-------------------------------------------\ /-----\ /------------- \
134 *
135 * +-------------------------------------------------------------------------+
136 * | |
137 * | +---------------------------------------------+ |
138 * | | GstParseBin(s) | |
139 * | | +--------------+ | +-----+ |
140 * | | | |---[parser]-[|--| Mul |---[ decoder ]-[|
141 * |]--[ typefind ]---| demuxer(s) |------------[| | ti | |
142 * | | | (if needed) |---[parser]-[|--| qu | |
143 * | | | |---[parser]-[|--| eu |---[ decoder ]-[|
144 * | | +--------------+ | +------ ^ |
145 * | +---------------------------------------------+ ^ | |
146 * | ^ | | |
147 * +-----------------------------------------------+--------+-------------+--+
148 * | | |
149 * | | |
150 * Probes --/--------/-------------/
151 *
152 * ATOMIC SWITCHING
153 *
154 * We want to ensure we re-use decoders when switching streams. This takes place
155 * at the multiqueue output level.
156 *
157 * MAIN CONCEPTS
 * 1) Activating a stream (i.e. linking a slot to an output) is only done within
 *    the streaming thread in the multiqueue_src_probe() and only if the
 *    stream is in the REQUESTED selection.
161 * 2) Deactivating a stream (i.e. unlinking a slot from an output) is also done
162 * within the stream thread, but only in a purposefully called IDLE probe
163 * that calls reassign_slot().
164 *
165 * Based on those two principles, 3 "selection" of streams (stream-id) are used:
166 * 1) requested_selection
167 * All streams within that list should be activated
168 * 2) active_selection
169 * List of streams that are exposed by decodebin
 * 3) to_activate
 *    List of streams that will be moved to requested_selection in the
 *    reassign_slot() method (i.e. once a stream was deactivated, and the output
 *    was retargeted)
174 */
175
176
/* Debug category of this element; GST_CAT_DEFAULT makes it the category used
 * by the logging macros in this file. */
GST_DEBUG_CATEGORY_STATIC (decodebin3_debug);
#define GST_CAT_DEFAULT decodebin3_debug

#define GST_TYPE_DECODEBIN3	 (gst_decodebin3_get_type ())

/* NOTE(review): presumably gates additional debug output; its users are not
 * visible in this part of the file. */
#define EXTRA_DEBUG 1

/* Quark (and companion string) identifying decodebin3's custom final EOS.
 * NOTE(review): where the marker is attached and consumed is not visible in
 * this part of the file. */
#define CUSTOM_FINAL_EOS_QUARK _custom_final_eos_quark_get ()
#define CUSTOM_FINAL_EOS_QUARK_DATA "custom-final-eos"
186 static GQuark
_custom_final_eos_quark_get(void)187 _custom_final_eos_quark_get (void)
188 {
189 static gsize g_quark;
190
191 if (g_once_init_enter (&g_quark)) {
192 gsize quark =
193 (gsize) g_quark_from_static_string ("decodebin3-custom-final-eos");
194 g_once_init_leave (&g_quark, quark);
195 }
196 return g_quark;
197 }
198
/* Element instance and class types */
typedef struct _GstDecodebin3 GstDecodebin3;
typedef struct _GstDecodebin3Class GstDecodebin3Class;

/* Internal helper structures; DecodebinInput and DecodebinOutputStream are
 * defined further down in this file */
typedef struct _DecodebinInputStream DecodebinInputStream;
typedef struct _DecodebinInput DecodebinInput;
typedef struct _DecodebinOutputStream DecodebinOutputStream;
205
/* Instance structure of the decodebin3 element.
 *
 * Fields are grouped by the mutex that guards them (input_lock,
 * selection_lock, factories_lock), as marked by the comments below. */
struct _GstDecodebin3
{
  GstBin bin;

  /* input_lock protects the following variables */
  GMutex input_lock;
  /* Main input (static sink pad) */
  DecodebinInput *main_input;
  /* Supplementary input (request sink pads) */
  GList *other_inputs;
  /* counter for input */
  guint32 input_counter;
  /* Current stream group_id (default : GST_GROUP_ID_INVALID) */
  /* FIXME : Needs to be reset appropriately (when upstream changes ?) */
  guint32 current_group_id;
  /* End of variables protected by input_lock */

  /* The single multiqueue of this bin, created in the instance init */
  GstElement *multiqueue;
  /* multiqueue's "min-interleave-time" as read right after creation */
  GstClockTime default_mq_min_interleave;
  GstClockTime current_mq_min_interleave;

  /* selection_lock protects access to following variables */
  GMutex selection_lock;
  GList *input_streams;         /* List of DecodebinInputStream for active collection */
  GList *output_streams;        /* List of DecodebinOutputStream used for output */
  GList *slots;                 /* List of MultiQueueSlot */
  guint slot_id;

  /* Active collection */
  GstStreamCollection *collection;
  /* requested selection of stream-id to activate post-multiqueue */
  GList *requested_selection;
  /* list of stream-id currently activated in output */
  GList *active_selection;
  /* List of stream-id that need to be activated (after a stream switch for ex) */
  GList *to_activate;
  /* Pending select streams event */
  guint32 select_streams_seqnum;
  /* pending list of streams to select (from downstream) */
  GList *pending_select_streams;
  /* TRUE if requested_selection was updated, will become FALSE once
   * it has fully transitioned to active */
  gboolean selection_updated;
  /* End of variables protected by selection_lock */

  /* List of pending collections.
   * FIXME : Is this really needed ? */
  GList *pending_collection;

  /* Factories */
  GMutex factories_lock;
  guint32 factories_cookie;
  /* All DECODABLE factories */
  GList *factories;
  /* Only DECODER factories */
  GList *decoder_factories;
  /* DECODABLE but not DECODER factories */
  GList *decodable_factories;

  /* counters for pads */
  guint32 apadcount, vpadcount, tpadcount, opadcount;

  /* Properties */
  /* Value of the "caps" property; the property accessors read and write it
   * while holding the object lock */
  GstCaps *caps;
};
271
/* Class structure of the decodebin3 element */
struct _GstDecodebin3Class
{
  GstBinClass class;

  /* Class handler slot of the "select-stream" signal */
    gint (*select_stream) (GstDecodebin3 * dbin,
      GstStreamCollection * collection, GstStream * stream);
};
279
/* Input of decodebin, controls input pad and parsebin */
struct _DecodebinInput
{
  /* Owning element */
  GstDecodebin3 *dbin;

  gboolean is_main;

  /* Ghost sink pad exposed on decodebin3 */
  GstPad *ghost_sink;
  /* Static "sink" pad of parsebin, used as target of ghost_sink */
  GstPad *parsebin_sink;

  GstStreamCollection *collection;      /* Active collection */

  /* Last group-id recorded for this input by set_input_group_id() */
  guint group_id;

  /* Created on demand by ensure_input_parsebin() */
  GstElement *parsebin;

  /* Handler ids of parsebin's "pad-added", "pad-removed" and "drained"
   * signal connections */
  gulong pad_added_sigid;
  gulong pad_removed_sigid;
  gulong drained_sigid;

  /* TRUE if the input got drained
   * FIXME : When do we reset it if re-used ?
   */
  gboolean drained;

  /* HACK : Remove these fields */
  /* List of PendingPad structures */
  GList *pending_pads;
};
309
/* Multiqueue Slots: one sink/src pad pair of the multiqueue */
typedef struct _MultiQueueSlot
{
  guint id;

  /* Owning element */
  GstDecodebin3 *dbin;
  /* Type of stream handled by this slot */
  GstStreamType type;

  /* Linked input and output */
  DecodebinInputStream *input;

  /* pending => last stream received on sink pad */
  GstStream *pending_stream;
  /* active => last stream outputted on source pad */
  GstStream *active_stream;

  GstPad *sink_pad, *src_pad;

  /* id of the MQ src_pad event probe */
  gulong probe_id;

  gboolean is_drained;

  /* Output stream linked to this slot, see "Linked input and output" above */
  DecodebinOutputStream *output;
} MultiQueueSlot;
336
/* Streams that are exposed downstream (i.e. output) */
struct _DecodebinOutputStream
{
  /* Owning element */
  GstDecodebin3 *dbin;
  /* The type of stream handled by this output stream */
  GstStreamType type;

  /* The slot to which this output stream is currently connected to */
  MultiQueueSlot *slot;

  GstElement *decoder;          /* Optional */
  GstPad *decoder_sink, *decoder_src;
  gboolean linked;

  /* ghostpad */
  GstPad *src_pad;
  /* Flag if ghost pad is exposed */
  gboolean src_exposed;

  /* Reported decoder latency */
  GstClockTime decoder_latency;

  /* keyframe dropping probe */
  gulong drop_probe_id;
};
362
/* Pending pads from parsebin (stored in DecodebinInput.pending_pads) */
typedef struct _PendingPad
{
  GstDecodebin3 *dbin;
  DecodebinInput *input;
  GstPad *pad;

  /* Probe ids.
   * NOTE(review): presumably installed on @pad; the code doing so is not
   * visible in this part of the file. */
  gulong buffer_probe;
  gulong event_probe;
  gboolean saw_eos;
} PendingPad;
374
/* properties */
enum
{
  PROP_0,
  PROP_CAPS                     /* "caps", installed in class_init */
};
381
/* signals */
enum
{
  SIGNAL_SELECT_STREAM,         /* "select-stream" */
  SIGNAL_ABOUT_TO_FINISH,       /* "about-to-finish" */
  LAST_SIGNAL
};
/* Signal ids, indexed by the enum above and filled in by class_init */
static guint gst_decodebin3_signals[LAST_SIGNAL] = { 0 };
390
/* Helpers taking/releasing dbin->selection_lock and dbin->input_lock.
 * Each one logs the calling thread at LOG level around the mutex operation,
 * which allows tracing lock contention from the debug output. */
#define SELECTION_LOCK(dbin) G_STMT_START {				\
    GST_LOG_OBJECT (dbin,						\
		    "selection locking from thread %p",			\
		    g_thread_self ());					\
    g_mutex_lock (&dbin->selection_lock);				\
    GST_LOG_OBJECT (dbin,						\
		    "selection locked from thread %p",			\
		    g_thread_self ());					\
  } G_STMT_END

#define SELECTION_UNLOCK(dbin) G_STMT_START {				\
    GST_LOG_OBJECT (dbin,						\
		    "selection unlocking from thread %p",		\
		    g_thread_self ());					\
    g_mutex_unlock (&dbin->selection_lock);				\
  } G_STMT_END

#define INPUT_LOCK(dbin) G_STMT_START {				\
    GST_LOG_OBJECT (dbin,						\
		    "input locking from thread %p",			\
		    g_thread_self ());					\
    g_mutex_lock (&dbin->input_lock);				\
    GST_LOG_OBJECT (dbin,						\
		    "input locked from thread %p",			\
		    g_thread_self ());					\
  } G_STMT_END

#define INPUT_UNLOCK(dbin) G_STMT_START {				\
    GST_LOG_OBJECT (dbin,						\
		    "input unlocking from thread %p",		\
		    g_thread_self ());					\
    g_mutex_unlock (&dbin->input_lock);				\
  } G_STMT_END
424
/* GObject type definition (parent type: GstBin) and element registration.
 * _do_init initialises the debug category and calls playback_element_init()
 * as part of the registration code. */
GType gst_decodebin3_get_type (void);
#define gst_decodebin3_parent_class parent_class
G_DEFINE_TYPE (GstDecodebin3, gst_decodebin3, GST_TYPE_BIN);
#define _do_init \
    GST_DEBUG_CATEGORY_INIT (decodebin3_debug, "decodebin3", 0, "decoder bin");\
    playback_element_init (plugin);
GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (decodebin3, "decodebin3", GST_RANK_NONE,
    GST_TYPE_DECODEBIN3, _do_init);
433
/* Initial value of the "caps" property (set in the instance init) */
static GstStaticCaps default_raw_caps = GST_STATIC_CAPS (DEFAULT_RAW_CAPS);

/* Always-present main sink pad */
static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
    GST_PAD_SINK,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS_ANY);

/* Additional sink pads, available on request */
static GstStaticPadTemplate request_sink_template =
GST_STATIC_PAD_TEMPLATE ("sink_%u",
    GST_PAD_SINK,
    GST_PAD_REQUEST,
    GST_STATIC_CAPS_ANY);

/* Source pads: "sometimes" pads named after the kind of stream, plus a
 * generic "src_%u" template */
static GstStaticPadTemplate video_src_template =
GST_STATIC_PAD_TEMPLATE ("video_%u",
    GST_PAD_SRC,
    GST_PAD_SOMETIMES,
    GST_STATIC_CAPS_ANY);

static GstStaticPadTemplate audio_src_template =
GST_STATIC_PAD_TEMPLATE ("audio_%u",
    GST_PAD_SRC,
    GST_PAD_SOMETIMES,
    GST_STATIC_CAPS_ANY);

static GstStaticPadTemplate text_src_template =
GST_STATIC_PAD_TEMPLATE ("text_%u",
    GST_PAD_SRC,
    GST_PAD_SOMETIMES,
    GST_STATIC_CAPS_ANY);

static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src_%u",
    GST_PAD_SRC,
    GST_PAD_SOMETIMES,
    GST_STATIC_CAPS_ANY);
469
470
/* GObject virtual methods */
static void gst_decodebin3_dispose (GObject * object);
static void gst_decodebin3_set_property (GObject * object, guint prop_id,
    const GValue * value, GParamSpec * pspec);
static void gst_decodebin3_get_property (GObject * object, guint prop_id,
    GValue * value, GParamSpec * pspec);

/* Handler for parsebin's "autoplug-continue" signal */
static gboolean parsebin_autoplug_continue_cb (GstElement *
    parsebin, GstPad * pad, GstCaps * caps, GstDecodebin3 * dbin);
479
480 static gint
gst_decodebin3_select_stream(GstDecodebin3 * dbin,GstStreamCollection * collection,GstStream * stream)481 gst_decodebin3_select_stream (GstDecodebin3 * dbin,
482 GstStreamCollection * collection, GstStream * stream)
483 {
484 GST_LOG_OBJECT (dbin, "default select-stream, returning -1");
485
486 return -1;
487 }
488
/* GstElement / GstBin virtual methods (installed in class_init) */
static GstPad *gst_decodebin3_request_new_pad (GstElement * element,
    GstPadTemplate * temp, const gchar * name, const GstCaps * caps);
static void gst_decodebin3_handle_message (GstBin * bin, GstMessage * message);
static GstStateChangeReturn gst_decodebin3_change_state (GstElement * element,
    GstStateChange transition);
static gboolean gst_decodebin3_send_event (GstElement * element,
    GstEvent * event);

/* Factories */
static void gst_decode_bin_update_factories_list (GstDecodebin3 * dbin);
#if 0
static gboolean have_factory (GstDecodebin3 * dbin, GstCaps * caps,
    GstElementFactoryListType ftype);
#endif

/* Inputs */
static void free_input (GstDecodebin3 * dbin, DecodebinInput * input);
static void free_input_async (GstDecodebin3 * dbin, DecodebinInput * input);
static DecodebinInput *create_new_input (GstDecodebin3 * dbin, gboolean main);
static gboolean set_input_group_id (DecodebinInput * input, guint32 * group_id);

/* Output streams */
static void reconfigure_output_stream (DecodebinOutputStream * output,
    MultiQueueSlot * slot);
static void free_output_stream (GstDecodebin3 * dbin,
    DecodebinOutputStream * output);
static DecodebinOutputStream *create_output_stream (GstDecodebin3 * dbin,
    GstStreamType type);

/* Multiqueue slots */
static GstPadProbeReturn slot_unassign_probe (GstPad * pad,
    GstPadProbeInfo * info, MultiQueueSlot * slot);
static gboolean reassign_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot);
static MultiQueueSlot *get_slot_for_input (GstDecodebin3 * dbin,
    DecodebinInputStream * input);
static void link_input_to_slot (DecodebinInputStream * input,
    MultiQueueSlot * slot);
static void free_multiqueue_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot);
static void free_multiqueue_slot_async (GstDecodebin3 * dbin,
    MultiQueueSlot * slot);

/* Stream collection and selection */
static GstStreamCollection *get_merged_collection (GstDecodebin3 * dbin);
static void update_requested_selection (GstDecodebin3 * dbin);

/* FIXME: Really make all the parser stuff a self-contained helper object */
/* The parse helpers are compiled as part of this translation unit, which is
 * why the declarations above must precede the include */
#include "gstdecodebin3-parse.c"
531
532 static gboolean
_gst_int_accumulator(GSignalInvocationHint * ihint,GValue * return_accu,const GValue * handler_return,gpointer dummy)533 _gst_int_accumulator (GSignalInvocationHint * ihint,
534 GValue * return_accu, const GValue * handler_return, gpointer dummy)
535 {
536 gint res = g_value_get_int (handler_return);
537
538 g_value_set_int (return_accu, res);
539
540 if (res == -1)
541 return TRUE;
542
543 return FALSE;
544 }
545
546 static void
gst_decodebin3_class_init(GstDecodebin3Class * klass)547 gst_decodebin3_class_init (GstDecodebin3Class * klass)
548 {
549 GObjectClass *gobject_klass = (GObjectClass *) klass;
550 GstElementClass *element_class = (GstElementClass *) klass;
551 GstBinClass *bin_klass = (GstBinClass *) klass;
552
553 gobject_klass->dispose = gst_decodebin3_dispose;
554 gobject_klass->set_property = gst_decodebin3_set_property;
555 gobject_klass->get_property = gst_decodebin3_get_property;
556
557 /* FIXME : ADD PROPERTIES ! */
558 g_object_class_install_property (gobject_klass, PROP_CAPS,
559 g_param_spec_boxed ("caps", "Caps",
560 "The caps on which to stop decoding. (NULL = default)",
561 GST_TYPE_CAPS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
562
563 /* FIXME : ADD SIGNALS ! */
564 /**
565 * GstDecodebin3::select-stream
566 * @decodebin: a #GstDecodebin3
567 * @collection: a #GstStreamCollection
568 * @stream: a #GstStream
569 *
570 * This signal is emitted whenever @decodebin needs to decide whether
571 * to expose a @stream of a given @collection.
572 *
573 * Note that the prefered way to select streams is to listen to
574 * GST_MESSAGE_STREAM_COLLECTION on the bus and send a
575 * GST_EVENT_SELECT_STREAMS with the streams the user wants.
576 *
577 * Returns: 1 if the stream should be selected, 0 if it shouldn't be selected.
578 * A value of -1 (default) lets @decodebin decide what to do with the stream.
579 * */
580 gst_decodebin3_signals[SIGNAL_SELECT_STREAM] =
581 g_signal_new ("select-stream", G_TYPE_FROM_CLASS (klass),
582 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstDecodebin3Class, select_stream),
583 _gst_int_accumulator, NULL, NULL,
584 G_TYPE_INT, 2, GST_TYPE_STREAM_COLLECTION, GST_TYPE_STREAM);
585
586 /**
587 * GstDecodebin3::about-to-finish:
588 *
589 * This signal is emitted when the data for the selected URI is
590 * entirely buffered and it is safe to specify another URI.
591 */
592 gst_decodebin3_signals[SIGNAL_ABOUT_TO_FINISH] =
593 g_signal_new ("about-to-finish", G_TYPE_FROM_CLASS (klass),
594 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 0, G_TYPE_NONE);
595
596
597 element_class->request_new_pad =
598 GST_DEBUG_FUNCPTR (gst_decodebin3_request_new_pad);
599 element_class->change_state = GST_DEBUG_FUNCPTR (gst_decodebin3_change_state);
600 element_class->send_event = GST_DEBUG_FUNCPTR (gst_decodebin3_send_event);
601
602 gst_element_class_add_pad_template (element_class,
603 gst_static_pad_template_get (&sink_template));
604 gst_element_class_add_pad_template (element_class,
605 gst_static_pad_template_get (&request_sink_template));
606 gst_element_class_add_pad_template (element_class,
607 gst_static_pad_template_get (&video_src_template));
608 gst_element_class_add_pad_template (element_class,
609 gst_static_pad_template_get (&audio_src_template));
610 gst_element_class_add_pad_template (element_class,
611 gst_static_pad_template_get (&text_src_template));
612 gst_element_class_add_pad_template (element_class,
613 gst_static_pad_template_get (&src_template));
614
615 gst_element_class_set_static_metadata (element_class,
616 "Decoder Bin 3", "Generic/Bin/Decoder",
617 "Autoplug and decode to raw media",
618 "Edward Hervey <edward@centricular.com>");
619
620 bin_klass->handle_message = gst_decodebin3_handle_message;
621
622 klass->select_stream = gst_decodebin3_select_stream;
623 }
624
625 static void
gst_decodebin3_init(GstDecodebin3 * dbin)626 gst_decodebin3_init (GstDecodebin3 * dbin)
627 {
628 /* Create main input */
629 dbin->main_input = create_new_input (dbin, TRUE);
630
631 dbin->multiqueue = gst_element_factory_make ("multiqueue", NULL);
632 g_object_get (dbin->multiqueue, "min-interleave-time",
633 &dbin->default_mq_min_interleave, NULL);
634 dbin->current_mq_min_interleave = dbin->default_mq_min_interleave;
635 g_object_set (dbin->multiqueue, "sync-by-running-time", TRUE,
636 "max-size-buffers", 0, "use-interleave", TRUE, NULL);
637 gst_bin_add ((GstBin *) dbin, dbin->multiqueue);
638
639 dbin->current_group_id = GST_GROUP_ID_INVALID;
640
641 g_mutex_init (&dbin->factories_lock);
642 g_mutex_init (&dbin->selection_lock);
643 g_mutex_init (&dbin->input_lock);
644
645 dbin->caps = gst_static_caps_get (&default_raw_caps);
646
647 GST_OBJECT_FLAG_SET (dbin, GST_BIN_FLAG_STREAMS_AWARE);
648 }
649
650 static void
gst_decodebin3_dispose(GObject * object)651 gst_decodebin3_dispose (GObject * object)
652 {
653 GstDecodebin3 *dbin = (GstDecodebin3 *) object;
654 GList *walk, *next;
655
656 if (dbin->factories)
657 gst_plugin_feature_list_free (dbin->factories);
658 if (dbin->decoder_factories)
659 g_list_free (dbin->decoder_factories);
660 if (dbin->decodable_factories)
661 g_list_free (dbin->decodable_factories);
662 g_list_free_full (dbin->requested_selection, g_free);
663 g_list_free (dbin->active_selection);
664 g_list_free (dbin->to_activate);
665 g_list_free (dbin->pending_select_streams);
666 g_clear_object (&dbin->collection);
667
668 free_input (dbin, dbin->main_input);
669
670 for (walk = dbin->other_inputs; walk; walk = next) {
671 DecodebinInput *input = walk->data;
672
673 next = g_list_next (walk);
674
675 free_input (dbin, input);
676 dbin->other_inputs = g_list_delete_link (dbin->other_inputs, walk);
677 }
678
679 G_OBJECT_CLASS (parent_class)->dispose (object);
680 }
681
682 static void
gst_decodebin3_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)683 gst_decodebin3_set_property (GObject * object, guint prop_id,
684 const GValue * value, GParamSpec * pspec)
685 {
686 GstDecodebin3 *dbin = (GstDecodebin3 *) object;
687
688 /* FIXME : IMPLEMENT */
689 switch (prop_id) {
690 case PROP_CAPS:
691 GST_OBJECT_LOCK (dbin);
692 if (dbin->caps)
693 gst_caps_unref (dbin->caps);
694 dbin->caps = g_value_dup_boxed (value);
695 GST_OBJECT_UNLOCK (dbin);
696 break;
697 default:
698 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
699 break;
700 }
701 }
702
703 static void
gst_decodebin3_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)704 gst_decodebin3_get_property (GObject * object, guint prop_id, GValue * value,
705 GParamSpec * pspec)
706 {
707 GstDecodebin3 *dbin = (GstDecodebin3 *) object;
708
709 /* FIXME : IMPLEMENT */
710 switch (prop_id) {
711 case PROP_CAPS:
712 GST_OBJECT_LOCK (dbin);
713 g_value_set_boxed (value, dbin->caps);
714 GST_OBJECT_UNLOCK (dbin);
715 break;
716 default:
717 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
718 break;
719 }
720 }
721
722 static gboolean
parsebin_autoplug_continue_cb(GstElement * parsebin,GstPad * pad,GstCaps * caps,GstDecodebin3 * dbin)723 parsebin_autoplug_continue_cb (GstElement * parsebin, GstPad * pad,
724 GstCaps * caps, GstDecodebin3 * dbin)
725 {
726 GST_DEBUG_OBJECT (pad, "caps %" GST_PTR_FORMAT, caps);
727
728 /* If it matches our target caps, expose it */
729 if (gst_caps_can_intersect (caps, dbin->caps))
730 return FALSE;
731
732 return TRUE;
733 }
734
735 /* This method should be called whenever a STREAM_START event
736 * comes out of a given parsebin.
737 * The caller shall replace the group_id if the function returns TRUE */
738 static gboolean
set_input_group_id(DecodebinInput * input,guint32 * group_id)739 set_input_group_id (DecodebinInput * input, guint32 * group_id)
740 {
741 GstDecodebin3 *dbin = input->dbin;
742
743 if (input->group_id != *group_id) {
744 if (input->group_id != GST_GROUP_ID_INVALID)
745 GST_WARNING_OBJECT (dbin,
746 "Group id changed (%" G_GUINT32_FORMAT " -> %" G_GUINT32_FORMAT
747 ") on input %p ", input->group_id, *group_id, input);
748 input->group_id = *group_id;
749 }
750
751 if (*group_id != dbin->current_group_id) {
752 if (dbin->current_group_id == GST_GROUP_ID_INVALID) {
753 GST_DEBUG_OBJECT (dbin, "Setting current group id to %" G_GUINT32_FORMAT,
754 *group_id);
755 dbin->current_group_id = *group_id;
756 }
757 *group_id = dbin->current_group_id;
758 return TRUE;
759 }
760
761 return FALSE;
762 }
763
764 static void
parsebin_drained_cb(GstElement * parsebin,DecodebinInput * input)765 parsebin_drained_cb (GstElement * parsebin, DecodebinInput * input)
766 {
767 GstDecodebin3 *dbin = input->dbin;
768 gboolean all_drained;
769 GList *tmp;
770
771 GST_INFO_OBJECT (dbin, "input %p drained", input);
772 input->drained = TRUE;
773
774 all_drained = dbin->main_input->drained;
775 for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
776 DecodebinInput *data = (DecodebinInput *) tmp->data;
777
778 all_drained &= data->drained;
779 }
780
781 if (all_drained) {
782 GST_INFO_OBJECT (dbin, "All inputs drained. Posting about-to-finish");
783 g_signal_emit (dbin, gst_decodebin3_signals[SIGNAL_ABOUT_TO_FINISH], 0,
784 NULL);
785 }
786 }
787
788 /* Call with INPUT_LOCK taken */
789 static gboolean
ensure_input_parsebin(GstDecodebin3 * dbin,DecodebinInput * input)790 ensure_input_parsebin (GstDecodebin3 * dbin, DecodebinInput * input)
791 {
792 gboolean set_state = FALSE;
793
794 if (input->parsebin == NULL) {
795 input->parsebin = gst_element_factory_make ("parsebin", NULL);
796 if (input->parsebin == NULL)
797 goto no_parsebin;
798 input->parsebin = gst_object_ref (input->parsebin);
799 input->parsebin_sink = gst_element_get_static_pad (input->parsebin, "sink");
800 input->pad_added_sigid =
801 g_signal_connect (input->parsebin, "pad-added",
802 (GCallback) parsebin_pad_added_cb, input);
803 input->pad_removed_sigid =
804 g_signal_connect (input->parsebin, "pad-removed",
805 (GCallback) parsebin_pad_removed_cb, input);
806 input->drained_sigid =
807 g_signal_connect (input->parsebin, "drained",
808 (GCallback) parsebin_drained_cb, input);
809 g_signal_connect (input->parsebin, "autoplug-continue",
810 (GCallback) parsebin_autoplug_continue_cb, dbin);
811 }
812
813 if (GST_OBJECT_PARENT (GST_OBJECT (input->parsebin)) != GST_OBJECT (dbin)) {
814 /* The state lock is taken so that we ensure we are the one (de)activating
815 * parsebin. We need to do this to ensure any activation taking place in
816 * parsebin (including by elements doing upstream activation) are done
817 * within the same thread. */
818 GST_STATE_LOCK (input->parsebin);
819 gst_bin_add (GST_BIN (dbin), input->parsebin);
820 set_state = TRUE;
821 }
822
823 gst_ghost_pad_set_target (GST_GHOST_PAD (input->ghost_sink),
824 input->parsebin_sink);
825
826 if (set_state) {
827 gst_element_sync_state_with_parent (input->parsebin);
828 GST_STATE_UNLOCK (input->parsebin);
829 }
830
831 return TRUE;
832
833 /* ERRORS */
834 no_parsebin:
835 {
836 gst_element_post_message ((GstElement *) dbin,
837 gst_missing_element_message_new ((GstElement *) dbin, "parsebin"));
838 return FALSE;
839 }
840 }
841
842 static GstPadLinkReturn
gst_decodebin3_input_pad_link(GstPad * pad,GstObject * parent,GstPad * peer)843 gst_decodebin3_input_pad_link (GstPad * pad, GstObject * parent, GstPad * peer)
844 {
845 GstDecodebin3 *dbin = (GstDecodebin3 *) parent;
846 GstPadLinkReturn res = GST_PAD_LINK_OK;
847 DecodebinInput *input;
848
849 GST_LOG_OBJECT (parent, "Got link on input pad %" GST_PTR_FORMAT
850 ". Creating parsebin if needed", pad);
851
852 if ((input = g_object_get_data (G_OBJECT (pad), "decodebin.input")) == NULL)
853 goto fail;
854
855 INPUT_LOCK (dbin);
856 if (!ensure_input_parsebin (dbin, input))
857 res = GST_PAD_LINK_REFUSED;
858 INPUT_UNLOCK (dbin);
859
860 return res;
861 fail:
862 GST_ERROR_OBJECT (parent, "Failed to retrieve input state from ghost pad");
863 return GST_PAD_LINK_REFUSED;
864 }
865
866 /* Drop duration query during _input_pad_unlink */
867 static GstPadProbeReturn
query_duration_drop_probe(GstPad * pad,GstPadProbeInfo * info,DecodebinInput * input)868 query_duration_drop_probe (GstPad * pad, GstPadProbeInfo * info,
869 DecodebinInput * input)
870 {
871 GstPadProbeReturn ret = GST_PAD_PROBE_OK;
872
873 if (GST_IS_QUERY (GST_PAD_PROBE_INFO_DATA (info))) {
874 GstQuery *query = GST_PAD_PROBE_INFO_QUERY (info);
875 if (GST_QUERY_TYPE (query) == GST_QUERY_DURATION) {
876 GST_LOG_OBJECT (pad, "stop forwarding query duration");
877 ret = GST_PAD_PROBE_HANDLED;
878 }
879 }
880
881 return ret;
882 }
883
884 static void
gst_decodebin3_input_pad_unlink(GstPad * pad,GstObject * parent)885 gst_decodebin3_input_pad_unlink (GstPad * pad, GstObject * parent)
886 {
887 GstDecodebin3 *dbin = (GstDecodebin3 *) parent;
888 DecodebinInput *input;
889
890 GST_LOG_OBJECT (parent, "Got unlink on input pad %" GST_PTR_FORMAT
891 ". Removing parsebin.", pad);
892
893 if ((input = g_object_get_data (G_OBJECT (pad), "decodebin.input")) == NULL)
894 goto fail;
895
896 INPUT_LOCK (dbin);
897 if (input->parsebin == NULL) {
898 INPUT_UNLOCK (dbin);
899 return;
900 }
901
902 if (GST_OBJECT_PARENT (GST_OBJECT (input->parsebin)) == GST_OBJECT (dbin)) {
903 GstStreamCollection *collection = NULL;
904 gulong probe_id = gst_pad_add_probe (input->parsebin_sink,
905 GST_PAD_PROBE_TYPE_QUERY_UPSTREAM,
906 (GstPadProbeCallback) query_duration_drop_probe, input, NULL);
907
908 /* Clear stream-collection corresponding to current INPUT and post new
909 * stream-collection message, if needed */
910 if (input->collection) {
911 gst_object_unref (input->collection);
912 input->collection = NULL;
913 }
914
915 SELECTION_LOCK (dbin);
916 collection = get_merged_collection (dbin);
917 if (collection && collection != dbin->collection) {
918 GstMessage *msg;
919 GST_DEBUG_OBJECT (dbin, "Update Stream Collection");
920
921 if (dbin->collection)
922 gst_object_unref (dbin->collection);
923 dbin->collection = collection;
924 dbin->select_streams_seqnum = GST_SEQNUM_INVALID;
925
926 msg =
927 gst_message_new_stream_collection ((GstObject *) dbin,
928 dbin->collection);
929
930 SELECTION_UNLOCK (dbin);
931 gst_element_post_message (GST_ELEMENT_CAST (dbin), msg);
932 update_requested_selection (dbin);
933 } else {
934 if (collection)
935 gst_object_unref (collection);
936 SELECTION_UNLOCK (dbin);
937 }
938
939 gst_bin_remove (GST_BIN (dbin), input->parsebin);
940 gst_element_set_state (input->parsebin, GST_STATE_NULL);
941 g_signal_handler_disconnect (input->parsebin, input->pad_removed_sigid);
942 g_signal_handler_disconnect (input->parsebin, input->pad_added_sigid);
943 g_signal_handler_disconnect (input->parsebin, input->drained_sigid);
944 gst_pad_remove_probe (input->parsebin_sink, probe_id);
945 gst_object_unref (input->parsebin);
946 gst_object_unref (input->parsebin_sink);
947
948 input->parsebin = NULL;
949 input->parsebin_sink = NULL;
950
951 if (!input->is_main) {
952 dbin->other_inputs = g_list_remove (dbin->other_inputs, input);
953 free_input_async (dbin, input);
954 }
955 }
956 INPUT_UNLOCK (dbin);
957 return;
958
959 fail:
960 GST_ERROR_OBJECT (parent, "Failed to retrieve input state from ghost pad");
961 return;
962 }
963
964 static void
free_input(GstDecodebin3 * dbin,DecodebinInput * input)965 free_input (GstDecodebin3 * dbin, DecodebinInput * input)
966 {
967 GST_DEBUG ("Freeing input %p", input);
968 gst_ghost_pad_set_target (GST_GHOST_PAD (input->ghost_sink), NULL);
969 gst_element_remove_pad (GST_ELEMENT (dbin), input->ghost_sink);
970 if (input->parsebin) {
971 g_signal_handler_disconnect (input->parsebin, input->pad_removed_sigid);
972 g_signal_handler_disconnect (input->parsebin, input->pad_added_sigid);
973 g_signal_handler_disconnect (input->parsebin, input->drained_sigid);
974 gst_element_set_state (input->parsebin, GST_STATE_NULL);
975 gst_object_unref (input->parsebin);
976 gst_object_unref (input->parsebin_sink);
977 }
978 if (input->collection)
979 gst_object_unref (input->collection);
980 g_free (input);
981 }
982
983 static void
free_input_async(GstDecodebin3 * dbin,DecodebinInput * input)984 free_input_async (GstDecodebin3 * dbin, DecodebinInput * input)
985 {
986 GST_LOG_OBJECT (dbin, "pushing input %p on thread pool to free", input);
987 gst_element_call_async (GST_ELEMENT_CAST (dbin),
988 (GstElementCallAsyncFunc) free_input, input, NULL);
989 }
990
991 /* Call with INPUT_LOCK taken */
992 static DecodebinInput *
create_new_input(GstDecodebin3 * dbin,gboolean main)993 create_new_input (GstDecodebin3 * dbin, gboolean main)
994 {
995 DecodebinInput *input;
996
997 input = g_new0 (DecodebinInput, 1);
998 input->dbin = dbin;
999 input->is_main = main;
1000 input->group_id = GST_GROUP_ID_INVALID;
1001 if (main)
1002 input->ghost_sink = gst_ghost_pad_new_no_target ("sink", GST_PAD_SINK);
1003 else {
1004 gchar *pad_name = g_strdup_printf ("sink_%u", dbin->input_counter++);
1005 input->ghost_sink = gst_ghost_pad_new_no_target (pad_name, GST_PAD_SINK);
1006 g_free (pad_name);
1007 }
1008 g_object_set_data (G_OBJECT (input->ghost_sink), "decodebin.input", input);
1009 gst_pad_set_link_function (input->ghost_sink, gst_decodebin3_input_pad_link);
1010 gst_pad_set_unlink_function (input->ghost_sink,
1011 gst_decodebin3_input_pad_unlink);
1012
1013 gst_pad_set_active (input->ghost_sink, TRUE);
1014 gst_element_add_pad ((GstElement *) dbin, input->ghost_sink);
1015
1016 return input;
1017
1018 }
1019
1020 static GstPad *
gst_decodebin3_request_new_pad(GstElement * element,GstPadTemplate * temp,const gchar * name,const GstCaps * caps)1021 gst_decodebin3_request_new_pad (GstElement * element, GstPadTemplate * temp,
1022 const gchar * name, const GstCaps * caps)
1023 {
1024 GstDecodebin3 *dbin = (GstDecodebin3 *) element;
1025 DecodebinInput *input;
1026 GstPad *res = NULL;
1027
1028 /* We are ignoring names for the time being, not sure it makes any sense
1029 * within the context of decodebin3 ... */
1030 input = create_new_input (dbin, FALSE);
1031 if (input) {
1032 INPUT_LOCK (dbin);
1033 dbin->other_inputs = g_list_append (dbin->other_inputs, input);
1034 res = input->ghost_sink;
1035 INPUT_UNLOCK (dbin);
1036 }
1037
1038 return res;
1039 }
1040
1041 /* Must be called with factories lock! */
1042 static void
gst_decode_bin_update_factories_list(GstDecodebin3 * dbin)1043 gst_decode_bin_update_factories_list (GstDecodebin3 * dbin)
1044 {
1045 guint cookie;
1046
1047 cookie = gst_registry_get_feature_list_cookie (gst_registry_get ());
1048 if (!dbin->factories || dbin->factories_cookie != cookie) {
1049 GList *tmp;
1050 if (dbin->factories)
1051 gst_plugin_feature_list_free (dbin->factories);
1052 if (dbin->decoder_factories)
1053 g_list_free (dbin->decoder_factories);
1054 if (dbin->decodable_factories)
1055 g_list_free (dbin->decodable_factories);
1056 dbin->factories =
1057 gst_element_factory_list_get_elements
1058 (GST_ELEMENT_FACTORY_TYPE_DECODABLE, GST_RANK_MARGINAL);
1059 dbin->factories =
1060 g_list_sort (dbin->factories, gst_plugin_feature_rank_compare_func);
1061 dbin->factories_cookie = cookie;
1062
1063 /* Filter decoder and other decodables */
1064 dbin->decoder_factories = NULL;
1065 dbin->decodable_factories = NULL;
1066 for (tmp = dbin->factories; tmp; tmp = tmp->next) {
1067 GstElementFactory *fact = (GstElementFactory *) tmp->data;
1068 if (gst_element_factory_list_is_type (fact,
1069 GST_ELEMENT_FACTORY_TYPE_DECODER))
1070 dbin->decoder_factories = g_list_append (dbin->decoder_factories, fact);
1071 else
1072 dbin->decodable_factories =
1073 g_list_append (dbin->decodable_factories, fact);
1074 }
1075 }
1076 }
1077
1078 /* Must be called with appropriate lock if list is a protected variable */
1079 static const gchar *
stream_in_list(GList * list,const gchar * sid)1080 stream_in_list (GList * list, const gchar * sid)
1081 {
1082 GList *tmp;
1083
1084 #if EXTRA_DEBUG
1085 for (tmp = list; tmp; tmp = tmp->next) {
1086 gchar *osid = (gchar *) tmp->data;
1087 GST_DEBUG ("Checking %s against %s", sid, osid);
1088 }
1089 #endif
1090
1091 for (tmp = list; tmp; tmp = tmp->next) {
1092 const gchar *osid = (gchar *) tmp->data;
1093 if (!g_strcmp0 (sid, osid))
1094 return osid;
1095 }
1096
1097 return NULL;
1098 }
1099
1100 static gboolean
stream_list_equal(GList * lista,GList * listb)1101 stream_list_equal (GList * lista, GList * listb)
1102 {
1103 GList *tmp;
1104
1105 if (g_list_length (lista) != g_list_length (listb))
1106 return FALSE;
1107
1108 for (tmp = lista; tmp; tmp = tmp->next) {
1109 gchar *osid = tmp->data;
1110 if (!stream_in_list (listb, osid))
1111 return FALSE;
1112 }
1113
1114 return TRUE;
1115 }
1116
1117 static void
update_requested_selection(GstDecodebin3 * dbin)1118 update_requested_selection (GstDecodebin3 * dbin)
1119 {
1120 guint i, nb;
1121 GList *tmp = NULL;
1122 gboolean all_user_selected = TRUE;
1123 GstStreamType used_types = 0;
1124 GstStreamCollection *collection;
1125
1126 /* 1. Is there a pending SELECT_STREAMS we can return straight away since
1127 * the switch handler will take care of the pending selection */
1128 SELECTION_LOCK (dbin);
1129 if (dbin->pending_select_streams) {
1130 GST_DEBUG_OBJECT (dbin,
1131 "No need to create pending selection, SELECT_STREAMS underway");
1132 goto beach;
1133 }
1134
1135 collection = dbin->collection;
1136 if (G_UNLIKELY (collection == NULL)) {
1137 GST_DEBUG_OBJECT (dbin, "No current GstStreamCollection");
1138 goto beach;
1139 }
1140 nb = gst_stream_collection_get_size (collection);
1141
1142 /* 2. If not, are we in EXPOSE_ALL_MODE ? If so, match everything */
1143 GST_FIXME_OBJECT (dbin, "Implement EXPOSE_ALL_MODE");
1144
1145 /* 3. If not, check if we already have some of the streams in the
1146 * existing active/requested selection */
1147 for (i = 0; i < nb; i++) {
1148 GstStream *stream = gst_stream_collection_get_stream (collection, i);
1149 const gchar *sid = gst_stream_get_stream_id (stream);
1150 gint request = -1;
1151 /* Fire select-stream signal to see if outside components want to
1152 * hint at which streams should be selected */
1153 g_signal_emit (G_OBJECT (dbin),
1154 gst_decodebin3_signals[SIGNAL_SELECT_STREAM], 0, collection, stream,
1155 &request);
1156 GST_DEBUG_OBJECT (dbin, "stream %s , request:%d", sid, request);
1157
1158 if (request == -1)
1159 all_user_selected = FALSE;
1160 if (request == 1 || (request == -1
1161 && (stream_in_list (dbin->requested_selection, sid)
1162 || stream_in_list (dbin->active_selection, sid)))) {
1163 GstStreamType curtype = gst_stream_get_stream_type (stream);
1164 if (request == 1)
1165 GST_DEBUG_OBJECT (dbin,
1166 "Using stream requested by 'select-stream' signal : %s", sid);
1167 else
1168 GST_DEBUG_OBJECT (dbin,
1169 "Re-using stream already present in requested or active selection : %s",
1170 sid);
1171 tmp = g_list_append (tmp, (gchar *) sid);
1172 used_types |= curtype;
1173 }
1174 }
1175
1176 /* 4. If the user didn't explicitly selected all streams, match one stream of each type */
1177 if (!all_user_selected && dbin->select_streams_seqnum == GST_SEQNUM_INVALID) {
1178 for (i = 0; i < nb; i++) {
1179 GstStream *stream = gst_stream_collection_get_stream (collection, i);
1180 GstStreamType curtype = gst_stream_get_stream_type (stream);
1181 if (curtype != GST_STREAM_TYPE_UNKNOWN && !(used_types & curtype)) {
1182 const gchar *sid = gst_stream_get_stream_id (stream);
1183 GST_DEBUG_OBJECT (dbin,
1184 "Automatically selecting stream '%s' of type %s", sid,
1185 gst_stream_type_get_name (curtype));
1186 tmp = g_list_append (tmp, (gchar *) sid);
1187 used_types |= curtype;
1188 }
1189 }
1190 }
1191
1192 beach:
1193 if (stream_list_equal (tmp, dbin->requested_selection)) {
1194 /* If the selection is equal, there is nothign to do */
1195 GST_DEBUG_OBJECT (dbin, "Dropping duplicate selection");
1196 g_list_free (tmp);
1197 tmp = NULL;
1198 }
1199
1200 if (tmp) {
1201 /* Finally set the requested selection */
1202 if (dbin->requested_selection) {
1203 GST_FIXME_OBJECT (dbin,
1204 "Replacing non-NULL requested_selection, what should we do ??");
1205 g_list_free_full (dbin->requested_selection, g_free);
1206 }
1207 dbin->requested_selection =
1208 g_list_copy_deep (tmp, (GCopyFunc) g_strdup, NULL);
1209 dbin->selection_updated = TRUE;
1210 g_list_free (tmp);
1211 }
1212 SELECTION_UNLOCK (dbin);
1213 }
1214
1215 /* sort_streams:
1216 * GCompareFunc to use with lists of GstStream.
1217 * Sorts GstStreams by stream type and SELECT flag and stream-id
1218 * First video, then audio, then others.
1219 *
1220 * Return: negative if a<b, 0 if a==b, positive if a>b
1221 */
1222 static gint
sort_streams(GstStream * sa,GstStream * sb)1223 sort_streams (GstStream * sa, GstStream * sb)
1224 {
1225 GstStreamType typea, typeb;
1226 GstStreamFlags flaga, flagb;
1227 const gchar *ida, *idb;
1228 gint ret = 0;
1229
1230 typea = gst_stream_get_stream_type (sa);
1231 typeb = gst_stream_get_stream_type (sb);
1232
1233 GST_LOG ("sa(%s), sb(%s)", gst_stream_get_stream_id (sa),
1234 gst_stream_get_stream_id (sb));
1235
1236 /* Sort by stream type. First video, then audio, then others(text, container, unknown) */
1237 if (typea != typeb) {
1238 if (typea & GST_STREAM_TYPE_VIDEO)
1239 ret = -1;
1240 else if (typea & GST_STREAM_TYPE_AUDIO)
1241 ret = (!(typeb & GST_STREAM_TYPE_VIDEO)) ? -1 : 1;
1242 else if (typea & GST_STREAM_TYPE_TEXT)
1243 ret = (!(typeb & GST_STREAM_TYPE_VIDEO)
1244 && !(typeb & GST_STREAM_TYPE_AUDIO)) ? -1 : 1;
1245 else if (typea & GST_STREAM_TYPE_CONTAINER)
1246 ret = (typeb & GST_STREAM_TYPE_UNKNOWN) ? -1 : 1;
1247 else
1248 ret = 1;
1249
1250 if (ret != 0) {
1251 GST_LOG ("Sort by stream-type: %d", ret);
1252 return ret;
1253 }
1254 }
1255
1256 /* Sort by SELECT flag, if stream type is same. */
1257 flaga = gst_stream_get_stream_flags (sa);
1258 flagb = gst_stream_get_stream_flags (sb);
1259
1260 ret =
1261 (flaga & GST_STREAM_FLAG_SELECT) ? ((flagb & GST_STREAM_FLAG_SELECT) ? 0 :
1262 -1) : ((flagb & GST_STREAM_FLAG_SELECT) ? 1 : 0);
1263
1264 if (ret != 0) {
1265 GST_LOG ("Sort by SELECT flag: %d", ret);
1266 return ret;
1267 }
1268
1269 /* Sort by stream-id, if otherwise the same. */
1270 ida = gst_stream_get_stream_id (sa);
1271 idb = gst_stream_get_stream_id (sb);
1272 ret = g_strcmp0 (ida, idb);
1273
1274 GST_LOG ("Sort by stream-id: %d", ret);
1275
1276 return ret;
1277 }
1278
1279 /* Call with INPUT_LOCK taken */
1280 static GstStreamCollection *
get_merged_collection(GstDecodebin3 * dbin)1281 get_merged_collection (GstDecodebin3 * dbin)
1282 {
1283 gboolean needs_merge = FALSE;
1284 GstStreamCollection *res = NULL;
1285 GList *tmp;
1286 GList *unsorted_streams = NULL;
1287 guint i, nb_stream;
1288
1289 /* First check if we need to do a merge or just return the only collection */
1290 res = dbin->main_input->collection;
1291
1292 for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
1293 DecodebinInput *input = (DecodebinInput *) tmp->data;
1294 if (input->collection) {
1295 if (res) {
1296 needs_merge = TRUE;
1297 break;
1298 }
1299 res = input->collection;
1300 }
1301 }
1302
1303 if (!needs_merge) {
1304 GST_DEBUG_OBJECT (dbin, "No need to merge, returning %p", res);
1305 return res ? gst_object_ref (res) : NULL;
1306 }
1307
1308 /* We really need to create a new collection */
1309 /* FIXME : Some numbering scheme maybe ?? */
1310 res = gst_stream_collection_new ("decodebin3");
1311 if (dbin->main_input->collection) {
1312 nb_stream = gst_stream_collection_get_size (dbin->main_input->collection);
1313 GST_DEBUG_OBJECT (dbin, "main input %p %d", dbin->main_input, nb_stream);
1314 for (i = 0; i < nb_stream; i++) {
1315 GstStream *stream =
1316 gst_stream_collection_get_stream (dbin->main_input->collection, i);
1317 unsorted_streams = g_list_append (unsorted_streams, stream);
1318 }
1319 }
1320
1321 for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
1322 DecodebinInput *input = (DecodebinInput *) tmp->data;
1323 GST_DEBUG_OBJECT (dbin, "input %p , collection %p", input,
1324 input->collection);
1325 if (input->collection) {
1326 nb_stream = gst_stream_collection_get_size (input->collection);
1327 GST_DEBUG_OBJECT (dbin, "nb_stream : %d", nb_stream);
1328 for (i = 0; i < nb_stream; i++) {
1329 GstStream *stream =
1330 gst_stream_collection_get_stream (input->collection, i);
1331 /* Only add if not already present in the list */
1332 if (!g_list_find (unsorted_streams, stream))
1333 unsorted_streams = g_list_append (unsorted_streams, stream);
1334 }
1335 }
1336 }
1337
1338 /* re-order streams : video, then audio, then others */
1339 unsorted_streams =
1340 g_list_sort (unsorted_streams, (GCompareFunc) sort_streams);
1341 for (tmp = unsorted_streams; tmp; tmp = tmp->next) {
1342 GstStream *stream = (GstStream *) tmp->data;
1343 GST_DEBUG_OBJECT (dbin, "Adding #stream(%s) to collection",
1344 gst_stream_get_stream_id (stream));
1345 gst_stream_collection_add_stream (res, gst_object_ref (stream));
1346 }
1347
1348 if (unsorted_streams)
1349 g_list_free (unsorted_streams);
1350
1351 return res;
1352 }
1353
1354 /* Call with INPUT_LOCK taken */
1355 static DecodebinInput *
find_message_parsebin(GstDecodebin3 * dbin,GstElement * child)1356 find_message_parsebin (GstDecodebin3 * dbin, GstElement * child)
1357 {
1358 DecodebinInput *input = NULL;
1359 GstElement *parent = gst_object_ref (child);
1360 GList *tmp;
1361
1362 do {
1363 GstElement *next_parent;
1364
1365 GST_DEBUG_OBJECT (dbin, "parent %s",
1366 parent ? GST_ELEMENT_NAME (parent) : "<NONE>");
1367
1368 if (parent == dbin->main_input->parsebin) {
1369 input = dbin->main_input;
1370 break;
1371 }
1372 for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
1373 DecodebinInput *cur = (DecodebinInput *) tmp->data;
1374 if (parent == cur->parsebin) {
1375 input = cur;
1376 break;
1377 }
1378 }
1379 next_parent = (GstElement *) gst_element_get_parent (parent);
1380 gst_object_unref (parent);
1381 parent = next_parent;
1382
1383 } while (parent && parent != (GstElement *) dbin);
1384
1385 if (parent)
1386 gst_object_unref (parent);
1387
1388 return input;
1389 }
1390
1391 static const gchar *
stream_in_collection(GstDecodebin3 * dbin,gchar * sid)1392 stream_in_collection (GstDecodebin3 * dbin, gchar * sid)
1393 {
1394 guint i, len;
1395
1396 if (dbin->collection == NULL)
1397 return NULL;
1398 len = gst_stream_collection_get_size (dbin->collection);
1399 for (i = 0; i < len; i++) {
1400 GstStream *stream = gst_stream_collection_get_stream (dbin->collection, i);
1401 const gchar *osid = gst_stream_get_stream_id (stream);
1402 if (!g_strcmp0 (sid, osid))
1403 return osid;
1404 }
1405
1406 return NULL;
1407 }
1408
1409 /* Call with INPUT_LOCK taken */
1410 static void
handle_stream_collection(GstDecodebin3 * dbin,GstStreamCollection * collection,GstElement * child)1411 handle_stream_collection (GstDecodebin3 * dbin,
1412 GstStreamCollection * collection, GstElement * child)
1413 {
1414 #ifndef GST_DISABLE_GST_DEBUG
1415 const gchar *upstream_id;
1416 guint i;
1417 #endif
1418 DecodebinInput *input = find_message_parsebin (dbin, child);
1419
1420 if (!input) {
1421 GST_DEBUG_OBJECT (dbin,
1422 "Couldn't find corresponding input, most likely shutting down");
1423 return;
1424 }
1425
1426 /* Replace collection in input */
1427 if (input->collection)
1428 gst_object_unref (input->collection);
1429 input->collection = gst_object_ref (collection);
1430 GST_DEBUG_OBJECT (dbin, "Setting collection %p on input %p", collection,
1431 input);
1432
1433 /* Merge collection if needed */
1434 collection = get_merged_collection (dbin);
1435
1436 #ifndef GST_DISABLE_GST_DEBUG
1437 /* Just some debugging */
1438 upstream_id = gst_stream_collection_get_upstream_id (collection);
1439 GST_DEBUG ("Received Stream Collection. Upstream_id : %s", upstream_id);
1440 GST_DEBUG ("From input %p", input);
1441 GST_DEBUG (" %d streams", gst_stream_collection_get_size (collection));
1442 for (i = 0; i < gst_stream_collection_get_size (collection); i++) {
1443 GstStream *stream = gst_stream_collection_get_stream (collection, i);
1444 GstTagList *taglist;
1445 GstCaps *caps;
1446
1447 GST_DEBUG (" Stream '%s'", gst_stream_get_stream_id (stream));
1448 GST_DEBUG (" type : %s",
1449 gst_stream_type_get_name (gst_stream_get_stream_type (stream)));
1450 GST_DEBUG (" flags : 0x%x", gst_stream_get_stream_flags (stream));
1451 taglist = gst_stream_get_tags (stream);
1452 GST_DEBUG (" tags : %" GST_PTR_FORMAT, taglist);
1453 caps = gst_stream_get_caps (stream);
1454 GST_DEBUG (" caps : %" GST_PTR_FORMAT, caps);
1455 if (taglist)
1456 gst_tag_list_unref (taglist);
1457 if (caps)
1458 gst_caps_unref (caps);
1459 }
1460 #endif
1461
1462 /* Store collection for later usage */
1463 SELECTION_LOCK (dbin);
1464 if (dbin->collection == NULL) {
1465 dbin->collection = collection;
1466 } else {
1467 /* We need to check who emitted this collection (the owner).
1468 * If we already had a collection from that user, this one is an update,
1469 * that is to say that we need to figure out how we are going to re-use
1470 * the streams/slot */
1471 GST_FIXME_OBJECT (dbin, "New collection but already had one ...");
1472 /* FIXME : When do we switch from pending collection to active collection ?
1473 * When all streams from active collection are drained in multiqueue output ? */
1474 gst_object_unref (dbin->collection);
1475 dbin->collection = collection;
1476 /* dbin->pending_collection = */
1477 /* g_list_append (dbin->pending_collection, collection); */
1478 }
1479 dbin->select_streams_seqnum = GST_SEQNUM_INVALID;
1480 SELECTION_UNLOCK (dbin);
1481 }
1482
1483 /* Must be called with the selection lock taken */
1484 static void
gst_decodebin3_update_min_interleave(GstDecodebin3 * dbin)1485 gst_decodebin3_update_min_interleave (GstDecodebin3 * dbin)
1486 {
1487 GstClockTime max_latency = GST_CLOCK_TIME_NONE;
1488 GList *tmp;
1489
1490 GST_DEBUG_OBJECT (dbin, "Recalculating max latency of decoders");
1491 for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
1492 DecodebinOutputStream *out = (DecodebinOutputStream *) tmp->data;
1493 if (GST_CLOCK_TIME_IS_VALID (out->decoder_latency)) {
1494 if (max_latency == GST_CLOCK_TIME_NONE
1495 || out->decoder_latency > max_latency)
1496 max_latency = out->decoder_latency;
1497 }
1498 }
1499 GST_DEBUG_OBJECT (dbin, "max latency of all decoders: %" GST_TIME_FORMAT,
1500 GST_TIME_ARGS (max_latency));
1501
1502 if (!GST_CLOCK_TIME_IS_VALID (max_latency))
1503 return;
1504
1505 /* Make sure we keep an extra overhead */
1506 max_latency += 100 * GST_MSECOND;
1507 if (max_latency == dbin->current_mq_min_interleave)
1508 return;
1509
1510 dbin->current_mq_min_interleave = max_latency;
1511 GST_DEBUG_OBJECT (dbin, "Setting mq min-interleave to %" GST_TIME_FORMAT,
1512 GST_TIME_ARGS (dbin->current_mq_min_interleave));
1513 g_object_set (dbin->multiqueue, "min-interleave-time",
1514 dbin->current_mq_min_interleave, NULL);
1515 }
1516
1517 static void
gst_decodebin3_handle_message(GstBin * bin,GstMessage * message)1518 gst_decodebin3_handle_message (GstBin * bin, GstMessage * message)
1519 {
1520 GstDecodebin3 *dbin = (GstDecodebin3 *) bin;
1521 gboolean posting_collection = FALSE;
1522
1523 GST_DEBUG_OBJECT (bin, "Got Message %s", GST_MESSAGE_TYPE_NAME (message));
1524
1525 switch (GST_MESSAGE_TYPE (message)) {
1526 case GST_MESSAGE_STREAM_COLLECTION:
1527 {
1528 GstStreamCollection *collection = NULL;
1529 gst_message_parse_stream_collection (message, &collection);
1530 if (collection) {
1531 INPUT_LOCK (dbin);
1532 handle_stream_collection (dbin, collection,
1533 (GstElement *) GST_MESSAGE_SRC (message));
1534 posting_collection = TRUE;
1535 INPUT_UNLOCK (dbin);
1536 }
1537
1538 SELECTION_LOCK (dbin);
1539 if (dbin->collection) {
1540 /* Replace collection message, we most likely aggregated it */
1541 GstMessage *new_msg;
1542 new_msg =
1543 gst_message_new_stream_collection ((GstObject *) dbin,
1544 dbin->collection);
1545 gst_message_unref (message);
1546 message = new_msg;
1547 }
1548 SELECTION_UNLOCK (dbin);
1549
1550 if (collection)
1551 gst_object_unref (collection);
1552 break;
1553 }
1554 case GST_MESSAGE_LATENCY:
1555 {
1556 GList *tmp;
1557 /* Check if this is from one of our decoders */
1558 SELECTION_LOCK (dbin);
1559 for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
1560 DecodebinOutputStream *out = (DecodebinOutputStream *) tmp->data;
1561 if (out->decoder == (GstElement *) GST_MESSAGE_SRC (message)) {
1562 GstClockTime min, max;
1563 if (GST_IS_VIDEO_DECODER (out->decoder)) {
1564 gst_video_decoder_get_latency (GST_VIDEO_DECODER (out->decoder),
1565 &min, &max);
1566 GST_DEBUG_OBJECT (dbin,
1567 "Got latency update from one of our decoders. min: %"
1568 GST_TIME_FORMAT " max: %" GST_TIME_FORMAT, GST_TIME_ARGS (min),
1569 GST_TIME_ARGS (max));
1570 out->decoder_latency = min;
1571 /* Trigger recalculation */
1572 gst_decodebin3_update_min_interleave (dbin);
1573 }
1574 break;
1575 }
1576 }
1577 SELECTION_UNLOCK (dbin);
1578 }
1579 default:
1580 break;
1581 }
1582
1583 GST_BIN_CLASS (parent_class)->handle_message (bin, message);
1584
1585 if (posting_collection) {
1586 /* Figure out a selection for that collection */
1587 update_requested_selection (dbin);
1588 }
1589 }
1590
1591 static DecodebinOutputStream *
find_free_compatible_output(GstDecodebin3 * dbin,GstStream * stream)1592 find_free_compatible_output (GstDecodebin3 * dbin, GstStream * stream)
1593 {
1594 GList *tmp;
1595 GstStreamType stype = gst_stream_get_stream_type (stream);
1596
1597 for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
1598 DecodebinOutputStream *output = (DecodebinOutputStream *) tmp->data;
1599 if (output->type == stype && output->slot && output->slot->active_stream) {
1600 GstStream *tstream = output->slot->active_stream;
1601 if (!stream_in_list (dbin->requested_selection,
1602 (gchar *) gst_stream_get_stream_id (tstream))) {
1603 return output;
1604 }
1605 }
1606 }
1607
1608 return NULL;
1609 }
1610
1611 /* Give a certain slot, figure out if it should be linked to an
1612 * output stream
1613 * CALL WITH SELECTION LOCK TAKEN !*/
1614 static DecodebinOutputStream *
get_output_for_slot(MultiQueueSlot * slot)1615 get_output_for_slot (MultiQueueSlot * slot)
1616 {
1617 GstDecodebin3 *dbin = slot->dbin;
1618 DecodebinOutputStream *output = NULL;
1619 const gchar *stream_id;
1620 GstCaps *caps;
1621 gchar *id_in_list = NULL;
1622
1623 /* If we already have a configured output, just use it */
1624 if (slot->output != NULL)
1625 return slot->output;
1626
1627 /*
1628 * FIXME
1629 *
1630 * This method needs to be split into multiple parts
1631 *
1632 * 1) Figure out whether stream should be exposed or not
1633 * This is based on autoplug-continue, EXPOSE_ALL_MODE, or presence
1634 * in the default stream attribution
1635 *
1636 * 2) Figure out whether an output stream should be created, whether
1637 * we can re-use the output stream already linked to the slot, or
1638 * whether we need to get re-assigned another (currently used) output
1639 * stream.
1640 */
1641
1642 stream_id = gst_stream_get_stream_id (slot->active_stream);
1643 caps = gst_stream_get_caps (slot->active_stream);
1644 GST_DEBUG_OBJECT (dbin, "stream %s , %" GST_PTR_FORMAT, stream_id, caps);
1645 gst_caps_unref (caps);
1646
1647 /* 0. Emit autoplug-continue signal for pending caps ? */
1648 GST_FIXME_OBJECT (dbin, "emit autoplug-continue");
1649
1650 /* 1. if in EXPOSE_ALL_MODE, just accept */
1651 GST_FIXME_OBJECT (dbin, "Handle EXPOSE_ALL_MODE");
1652
1653 #if 0
1654 /* FIXME : The idea around this was to avoid activating a stream for
1655 * which we have no decoder. Unfortunately it is way too
1656 * expensive. Need to figure out a better solution */
1657 /* 2. Is there a potential decoder (if one is required) */
1658 if (!gst_caps_can_intersect (caps, dbin->caps)
1659 && !have_factory (dbin, (GstCaps *) caps,
1660 GST_ELEMENT_FACTORY_TYPE_DECODER)) {
1661 GST_WARNING_OBJECT (dbin, "Don't have a decoder for %" GST_PTR_FORMAT,
1662 caps);
1663 SELECTION_UNLOCK (dbin);
1664 gst_element_post_message (GST_ELEMENT_CAST (dbin),
1665 gst_missing_decoder_message_new (GST_ELEMENT_CAST (dbin), caps));
1666 SELECTION_LOCK (dbin);
1667 return NULL;
1668 }
1669 #endif
1670
1671 /* 3. In default mode check if we should expose */
1672 id_in_list = (gchar *) stream_in_list (dbin->requested_selection, stream_id);
1673 if (id_in_list) {
1674 /* Check if we can steal an existing output stream we could re-use.
1675 * that is:
1676 * * an output stream whose slot->stream is not in requested
1677 * * and is of the same type as this stream
1678 */
1679 output = find_free_compatible_output (dbin, slot->active_stream);
1680 if (output) {
1681 /* Move this output from its current slot to this slot */
1682 dbin->to_activate =
1683 g_list_append (dbin->to_activate, (gchar *) stream_id);
1684 dbin->requested_selection =
1685 g_list_remove (dbin->requested_selection, id_in_list);
1686 g_free (id_in_list);
1687 SELECTION_UNLOCK (dbin);
1688 gst_pad_add_probe (output->slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
1689 (GstPadProbeCallback) slot_unassign_probe, output->slot, NULL);
1690 SELECTION_LOCK (dbin);
1691 return NULL;
1692 }
1693
1694 output = create_output_stream (dbin, slot->type);
1695 output->slot = slot;
1696 GST_DEBUG ("Linking slot %p to new output %p", slot, output);
1697 slot->output = output;
1698 dbin->active_selection =
1699 g_list_append (dbin->active_selection, (gchar *) stream_id);
1700 } else
1701 GST_DEBUG ("Not creating any output for slot %p", slot);
1702
1703 return output;
1704 }
1705
1706 /* Returns SELECTED_STREAMS message if active_selection is equal to
1707 * requested_selection, else NULL.
1708 * Must be called with LOCK taken */
1709 static GstMessage *
is_selection_done(GstDecodebin3 * dbin)1710 is_selection_done (GstDecodebin3 * dbin)
1711 {
1712 GList *tmp;
1713 GstMessage *msg;
1714
1715 if (!dbin->selection_updated)
1716 return NULL;
1717
1718 GST_LOG_OBJECT (dbin, "Checking");
1719
1720 if (dbin->to_activate != NULL) {
1721 GST_DEBUG ("Still have streams to activate");
1722 return NULL;
1723 }
1724 for (tmp = dbin->requested_selection; tmp; tmp = tmp->next) {
1725 GST_DEBUG ("Checking requested stream %s", (gchar *) tmp->data);
1726 if (!stream_in_list (dbin->active_selection, (gchar *) tmp->data)) {
1727 GST_DEBUG ("Not in active selection, returning");
1728 return NULL;
1729 }
1730 }
1731
1732 GST_DEBUG_OBJECT (dbin, "Selection active, creating message");
1733
1734 /* We are completely active */
1735 msg = gst_message_new_streams_selected ((GstObject *) dbin, dbin->collection);
1736 if (dbin->select_streams_seqnum != GST_SEQNUM_INVALID) {
1737 gst_message_set_seqnum (msg, dbin->select_streams_seqnum);
1738 }
1739 for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
1740 DecodebinOutputStream *output = (DecodebinOutputStream *) tmp->data;
1741 if (output->slot) {
1742 GST_DEBUG_OBJECT (dbin, "Adding stream %s",
1743 gst_stream_get_stream_id (output->slot->active_stream));
1744
1745 gst_message_streams_selected_add (msg, output->slot->active_stream);
1746 } else
1747 GST_WARNING_OBJECT (dbin, "No valid slot for output %p", output);
1748 }
1749 dbin->selection_updated = FALSE;
1750 return msg;
1751 }
1752
1753 /* Must be called with SELECTION_LOCK taken */
1754 static void
check_all_slot_for_eos(GstDecodebin3 * dbin)1755 check_all_slot_for_eos (GstDecodebin3 * dbin)
1756 {
1757 gboolean all_drained = TRUE;
1758 GList *iter;
1759
1760 GST_DEBUG_OBJECT (dbin, "check slot for eos");
1761
1762 for (iter = dbin->slots; iter; iter = iter->next) {
1763 MultiQueueSlot *slot = iter->data;
1764
1765 if (!slot->output)
1766 continue;
1767
1768 if (slot->is_drained) {
1769 GST_LOG_OBJECT (slot->sink_pad, "slot %p is drained", slot);
1770 continue;
1771 }
1772
1773 all_drained = FALSE;
1774 break;
1775 }
1776
1777 if (all_drained) {
1778 INPUT_LOCK (dbin);
1779 if (!pending_pads_are_eos (dbin->main_input))
1780 all_drained = FALSE;
1781
1782 if (all_drained) {
1783 for (iter = dbin->other_inputs; iter; iter = iter->next) {
1784 if (!pending_pads_are_eos ((DecodebinInput *) iter->data)) {
1785 all_drained = FALSE;
1786 break;
1787 }
1788 }
1789 }
1790 INPUT_UNLOCK (dbin);
1791 }
1792
1793 if (all_drained) {
1794 GST_DEBUG_OBJECT (dbin,
1795 "All active slots are drained, and no pending input, push EOS");
1796
1797 for (iter = dbin->input_streams; iter; iter = iter->next) {
1798 DecodebinInputStream *input = (DecodebinInputStream *) iter->data;
1799 GstPad *peer = gst_pad_get_peer (input->srcpad);
1800
1801 /* Send EOS to all slots */
1802 if (peer) {
1803 GstEvent *stream_start, *eos;
1804
1805 stream_start =
1806 gst_pad_get_sticky_event (input->srcpad, GST_EVENT_STREAM_START, 0);
1807
1808 /* First forward a custom STREAM_START event to reset the EOS status (if any) */
1809 if (stream_start) {
1810 GstStructure *s;
1811 GstEvent *custom_stream_start = gst_event_copy (stream_start);
1812 gst_event_unref (stream_start);
1813 s = (GstStructure *) gst_event_get_structure (custom_stream_start);
1814 gst_structure_set (s, "decodebin3-flushing-stream-start",
1815 G_TYPE_BOOLEAN, TRUE, NULL);
1816 gst_pad_send_event (peer, custom_stream_start);
1817 }
1818
1819 eos = gst_event_new_eos ();
1820 gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (eos),
1821 CUSTOM_FINAL_EOS_QUARK, (gchar *) CUSTOM_FINAL_EOS_QUARK_DATA,
1822 NULL);
1823 gst_pad_send_event (peer, eos);
1824 gst_object_unref (peer);
1825 } else
1826 GST_DEBUG_OBJECT (dbin, "no output");
1827 }
1828 }
1829 }
1830
1831 static GstPadProbeReturn
multiqueue_src_probe(GstPad * pad,GstPadProbeInfo * info,MultiQueueSlot * slot)1832 multiqueue_src_probe (GstPad * pad, GstPadProbeInfo * info,
1833 MultiQueueSlot * slot)
1834 {
1835 GstPadProbeReturn ret = GST_PAD_PROBE_OK;
1836 GstDecodebin3 *dbin = slot->dbin;
1837
1838 if (GST_IS_EVENT (GST_PAD_PROBE_INFO_DATA (info))) {
1839 GstEvent *ev = GST_PAD_PROBE_INFO_EVENT (info);
1840
1841 GST_DEBUG_OBJECT (pad, "Got event %p %s", ev, GST_EVENT_TYPE_NAME (ev));
1842 switch (GST_EVENT_TYPE (ev)) {
1843 case GST_EVENT_STREAM_START:
1844 {
1845 GstStream *stream = NULL;
1846 const GstStructure *s = gst_event_get_structure (ev);
1847
1848 /* Drop STREAM_START events used to cleanup multiqueue */
1849 if (s
1850 && gst_structure_has_field (s,
1851 "decodebin3-flushing-stream-start")) {
1852 ret = GST_PAD_PROBE_HANDLED;
1853 gst_event_unref (ev);
1854 break;
1855 }
1856
1857 gst_event_parse_stream (ev, &stream);
1858 if (stream == NULL) {
1859 GST_ERROR_OBJECT (pad,
1860 "Got a STREAM_START event without a GstStream");
1861 break;
1862 }
1863 slot->is_drained = FALSE;
1864 GST_DEBUG_OBJECT (pad, "Stream Start '%s'",
1865 gst_stream_get_stream_id (stream));
1866 if (slot->active_stream == NULL) {
1867 slot->active_stream = stream;
1868 } else if (slot->active_stream != stream) {
1869 GST_FIXME_OBJECT (pad, "Handle stream changes (%s => %s) !",
1870 gst_stream_get_stream_id (slot->active_stream),
1871 gst_stream_get_stream_id (stream));
1872 gst_object_unref (slot->active_stream);
1873 slot->active_stream = stream;
1874 } else
1875 gst_object_unref (stream);
1876 #if 0 /* Disabled because stream-start is pushed for every buffer on every unlinked pad */
1877 {
1878 gboolean is_active, is_requested;
1879 /* Quick check to see if we're in the current selection */
1880 /* FIXME : Re-check all slot<=>output mappings based on requested_selection */
1881 SELECTION_LOCK (dbin);
1882 GST_DEBUG_OBJECT (dbin, "Checking active selection");
1883 is_active = stream_in_list (dbin->active_selection, stream_id);
1884 GST_DEBUG_OBJECT (dbin, "Checking requested selection");
1885 is_requested = stream_in_list (dbin->requested_selection, stream_id);
1886 SELECTION_UNLOCK (dbin);
1887 if (is_active)
1888 GST_DEBUG_OBJECT (pad, "Slot in ACTIVE selection (output:%p)",
1889 slot->output);
1890 if (is_requested)
1891 GST_DEBUG_OBJECT (pad, "Slot in REQUESTED selection (output:%p)",
1892 slot->output);
1893 else if (slot->output) {
1894 GST_DEBUG_OBJECT (pad,
1895 "Slot needs to be deactivated ? It's no longer in requested selection");
1896 } else if (!is_active)
1897 GST_DEBUG_OBJECT (pad,
1898 "Slot in neither active nor requested selection");
1899 }
1900 #endif
1901 }
1902 break;
1903 case GST_EVENT_CAPS:
1904 {
1905 /* Configure the output slot if needed */
1906 DecodebinOutputStream *output;
1907 GstMessage *msg = NULL;
1908 SELECTION_LOCK (dbin);
1909 output = get_output_for_slot (slot);
1910 if (output) {
1911 reconfigure_output_stream (output, slot);
1912 msg = is_selection_done (dbin);
1913 }
1914 SELECTION_UNLOCK (dbin);
1915 if (msg)
1916 gst_element_post_message ((GstElement *) slot->dbin, msg);
1917 }
1918 break;
1919 case GST_EVENT_EOS:
1920 {
1921 gboolean was_drained = slot->is_drained;
1922 slot->is_drained = TRUE;
1923
1924 /* Custom EOS handling first */
1925 if (gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (ev),
1926 CUSTOM_EOS_QUARK)) {
1927 /* remove custom-eos */
1928 gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (ev),
1929 CUSTOM_EOS_QUARK, NULL, NULL);
1930 GST_LOG_OBJECT (pad, "Received custom EOS");
1931 ret = GST_PAD_PROBE_HANDLED;
1932 SELECTION_LOCK (dbin);
1933 if (slot->input == NULL) {
1934 GST_DEBUG_OBJECT (pad,
1935 "Got custom-eos from null input stream, remove output stream");
1936 /* Remove the output */
1937 if (slot->output) {
1938 DecodebinOutputStream *output = slot->output;
1939 dbin->output_streams =
1940 g_list_remove (dbin->output_streams, output);
1941 free_output_stream (dbin, output);
1942 /* Reacalculate min interleave */
1943 gst_decodebin3_update_min_interleave (dbin);
1944 }
1945 slot->probe_id = 0;
1946 dbin->slots = g_list_remove (dbin->slots, slot);
1947 free_multiqueue_slot_async (dbin, slot);
1948 ret = GST_PAD_PROBE_REMOVE;
1949 } else if (!was_drained) {
1950 check_all_slot_for_eos (dbin);
1951 }
1952 if (ret == GST_PAD_PROBE_HANDLED)
1953 gst_event_unref (ev);
1954 SELECTION_UNLOCK (dbin);
1955 break;
1956 }
1957
1958 GST_FIXME_OBJECT (pad, "EOS on multiqueue source pad. input:%p",
1959 slot->input);
1960 if (slot->input == NULL) {
1961 GstPad *peer;
1962 GST_DEBUG_OBJECT (pad,
1963 "last EOS for input, forwarding and removing slot");
1964 peer = gst_pad_get_peer (pad);
1965 if (peer) {
1966 gst_pad_send_event (peer, ev);
1967 gst_object_unref (peer);
1968 } else {
1969 gst_event_unref (ev);
1970 }
1971 SELECTION_LOCK (dbin);
1972 /* FIXME : Shouldn't we try to re-assign the output instead of just
1973 * removing it ? */
1974 /* Remove the output */
1975 if (slot->output) {
1976 DecodebinOutputStream *output = slot->output;
1977 dbin->output_streams = g_list_remove (dbin->output_streams, output);
1978 free_output_stream (dbin, output);
1979 }
1980 slot->probe_id = 0;
1981 dbin->slots = g_list_remove (dbin->slots, slot);
1982 SELECTION_UNLOCK (dbin);
1983
1984 free_multiqueue_slot_async (dbin, slot);
1985 ret = GST_PAD_PROBE_REMOVE;
1986 } else if (gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (ev),
1987 CUSTOM_FINAL_EOS_QUARK)) {
1988 GST_DEBUG_OBJECT (pad, "Got final eos, propagating downstream");
1989 } else {
1990 GST_DEBUG_OBJECT (pad, "Got regular eos (all_inputs_are_eos)");
1991 /* drop current event as eos will be sent in check_all_slot_for_eos
1992 * when all output streams are also eos */
1993 ret = GST_PAD_PROBE_DROP;
1994 SELECTION_LOCK (dbin);
1995 check_all_slot_for_eos (dbin);
1996 SELECTION_UNLOCK (dbin);
1997 }
1998 }
1999 break;
2000 default:
2001 break;
2002 }
2003 } else if (GST_IS_QUERY (GST_PAD_PROBE_INFO_DATA (info))) {
2004 GstQuery *query = GST_PAD_PROBE_INFO_QUERY (info);
2005 switch (GST_QUERY_TYPE (query)) {
2006 case GST_QUERY_CAPS:
2007 {
2008 GST_DEBUG_OBJECT (pad, "Intercepting CAPS query");
2009 gst_query_set_caps_result (query, GST_CAPS_ANY);
2010 ret = GST_PAD_PROBE_HANDLED;
2011 }
2012 break;
2013
2014 case GST_QUERY_ACCEPT_CAPS:
2015 {
2016 GST_DEBUG_OBJECT (pad, "Intercepting Accept Caps query");
2017 /* If the current decoder doesn't accept caps, we'll reconfigure
2018 * on the actual caps event. So accept any caps. */
2019 gst_query_set_accept_caps_result (query, TRUE);
2020 ret = GST_PAD_PROBE_HANDLED;
2021 }
2022 default:
2023 break;
2024 }
2025 }
2026
2027 return ret;
2028 }
2029
2030 /* Create a new multiqueue slot for the given type
2031 *
2032 * It is up to the caller to know whether that slot is needed or not
2033 * (and release it when no longer needed) */
2034 static MultiQueueSlot *
create_new_slot(GstDecodebin3 * dbin,GstStreamType type)2035 create_new_slot (GstDecodebin3 * dbin, GstStreamType type)
2036 {
2037 MultiQueueSlot *slot;
2038 GstIterator *it = NULL;
2039 GValue item = { 0, };
2040
2041 GST_DEBUG_OBJECT (dbin, "Creating new slot for type %s",
2042 gst_stream_type_get_name (type));
2043 slot = g_new0 (MultiQueueSlot, 1);
2044 slot->dbin = dbin;
2045
2046 slot->id = dbin->slot_id++;
2047
2048 slot->type = type;
2049 slot->sink_pad = gst_element_request_pad_simple (dbin->multiqueue, "sink_%u");
2050 if (slot->sink_pad == NULL)
2051 goto fail;
2052
2053 it = gst_pad_iterate_internal_links (slot->sink_pad);
2054 if (!it || (gst_iterator_next (it, &item)) != GST_ITERATOR_OK
2055 || ((slot->src_pad = g_value_dup_object (&item)) == NULL)) {
2056 GST_ERROR ("Couldn't get srcpad from multiqueue for sink pad %s:%s",
2057 GST_DEBUG_PAD_NAME (slot->src_pad));
2058 goto fail;
2059 }
2060 gst_iterator_free (it);
2061 g_value_reset (&item);
2062
2063 g_object_set (slot->sink_pad, "group-id", (guint) type, NULL);
2064
2065 /* Add event probe */
2066 slot->probe_id =
2067 gst_pad_add_probe (slot->src_pad,
2068 GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM | GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM,
2069 (GstPadProbeCallback) multiqueue_src_probe, slot, NULL);
2070
2071 GST_DEBUG ("Created new slot %u (%p) (%s:%s)", slot->id, slot,
2072 GST_DEBUG_PAD_NAME (slot->src_pad));
2073
2074 dbin->slots = g_list_append (dbin->slots, slot);
2075
2076 return slot;
2077
2078 /* ERRORS */
2079 fail:
2080 {
2081 if (slot->sink_pad)
2082 gst_element_release_request_pad (dbin->multiqueue, slot->sink_pad);
2083 g_free (slot);
2084 return NULL;
2085 }
2086 }
2087
2088 /* Must be called with SELECTION_LOCK */
2089 static MultiQueueSlot *
get_slot_for_input(GstDecodebin3 * dbin,DecodebinInputStream * input)2090 get_slot_for_input (GstDecodebin3 * dbin, DecodebinInputStream * input)
2091 {
2092 GList *tmp;
2093 MultiQueueSlot *empty_slot = NULL;
2094 GstStreamType input_type = 0;
2095 gchar *stream_id = NULL;
2096
2097 GST_DEBUG_OBJECT (dbin, "input %p (stream %p %s)",
2098 input, input->active_stream,
2099 input->
2100 active_stream ? gst_stream_get_stream_id (input->active_stream) : "");
2101
2102 if (input->active_stream) {
2103 input_type = gst_stream_get_stream_type (input->active_stream);
2104 stream_id = (gchar *) gst_stream_get_stream_id (input->active_stream);
2105 }
2106
2107 /* Go over existing slots and check if there is already one for it */
2108 for (tmp = dbin->slots; tmp; tmp = tmp->next) {
2109 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2110 /* Already used input, return that one */
2111 if (slot->input == input) {
2112 GST_DEBUG_OBJECT (dbin, "Returning already specified slot %d", slot->id);
2113 return slot;
2114 }
2115 }
2116
2117 /* Go amongst all unused slots of the right type and try to find a candidate */
2118 for (tmp = dbin->slots; tmp; tmp = tmp->next) {
2119 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2120 if (slot->input == NULL && input_type == slot->type) {
2121 /* Remember this empty slot for later */
2122 empty_slot = slot;
2123 /* Check if available slot is of the same stream_id */
2124 GST_LOG_OBJECT (dbin, "Checking candidate slot %d (active_stream:%p)",
2125 slot->id, slot->active_stream);
2126 if (stream_id && slot->active_stream) {
2127 gchar *ostream_id =
2128 (gchar *) gst_stream_get_stream_id (slot->active_stream);
2129 GST_DEBUG_OBJECT (dbin, "Checking slot %d %s against %s", slot->id,
2130 ostream_id, stream_id);
2131 if (!g_strcmp0 (stream_id, ostream_id))
2132 break;
2133 }
2134 }
2135 }
2136
2137 if (empty_slot) {
2138 GST_DEBUG_OBJECT (dbin, "Re-using existing unused slot %d", empty_slot->id);
2139 empty_slot->input = input;
2140 return empty_slot;
2141 }
2142
2143 if (input_type)
2144 return create_new_slot (dbin, input_type);
2145
2146 return NULL;
2147 }
2148
2149 static void
link_input_to_slot(DecodebinInputStream * input,MultiQueueSlot * slot)2150 link_input_to_slot (DecodebinInputStream * input, MultiQueueSlot * slot)
2151 {
2152 if (slot->input != NULL && slot->input != input) {
2153 GST_ERROR_OBJECT (slot->dbin,
2154 "Trying to link input to an already used slot");
2155 return;
2156 }
2157 gst_pad_link_full (input->srcpad, slot->sink_pad, GST_PAD_LINK_CHECK_NOTHING);
2158 slot->pending_stream = input->active_stream;
2159 slot->input = input;
2160 }
2161
#if 0
/* Currently compiled out.
 *
 * Tell whether at least one known factory has a sink pad compatible with
 * @caps. The decoder factories are searched when @ftype is
 * GST_ELEMENT_FACTORY_TYPE_DECODER, the decodable factories otherwise. */
static gboolean
have_factory (GstDecodebin3 * dbin, GstCaps * caps,
    GstElementFactoryListType ftype)
{
  GList *matches;

  g_mutex_lock (&dbin->factories_lock);
  gst_decode_bin_update_factories_list (dbin);
  if (ftype != GST_ELEMENT_FACTORY_TYPE_DECODER) {
    matches = gst_element_factory_list_filter (dbin->decodable_factories,
        caps, GST_PAD_SINK, TRUE);
  } else {
    matches = gst_element_factory_list_filter (dbin->decoder_factories,
        caps, GST_PAD_SINK, TRUE);
  }
  g_mutex_unlock (&dbin->factories_lock);

  if (matches == NULL)
    return FALSE;

  /* Only the existence of a match matters, not the list itself */
  gst_plugin_feature_list_free (matches);
  return TRUE;
}
#endif
2190
2191 static GList *
create_decoder_factory_list(GstDecodebin3 * dbin,GstCaps * caps)2192 create_decoder_factory_list (GstDecodebin3 * dbin, GstCaps * caps)
2193 {
2194 GList *res;
2195
2196 g_mutex_lock (&dbin->factories_lock);
2197 gst_decode_bin_update_factories_list (dbin);
2198 res = gst_element_factory_list_filter (dbin->decoder_factories,
2199 caps, GST_PAD_SINK, TRUE);
2200 g_mutex_unlock (&dbin->factories_lock);
2201 return res;
2202 }
2203
2204 static GstPadProbeReturn
keyframe_waiter_probe(GstPad * pad,GstPadProbeInfo * info,DecodebinOutputStream * output)2205 keyframe_waiter_probe (GstPad * pad, GstPadProbeInfo * info,
2206 DecodebinOutputStream * output)
2207 {
2208 GstBuffer *buf = GST_PAD_PROBE_INFO_BUFFER (info);
2209 /* If we have a keyframe, remove the probe and let all data through */
2210 /* FIXME : HANDLE HEADER BUFFER ?? */
2211 if (!GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_DELTA_UNIT) ||
2212 GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_HEADER)) {
2213 GST_DEBUG_OBJECT (pad,
2214 "Buffer is keyframe or header, letting through and removing probe");
2215 output->drop_probe_id = 0;
2216 return GST_PAD_PROBE_REMOVE;
2217 }
2218 GST_DEBUG_OBJECT (pad, "Buffer is not a keyframe, dropping");
2219 return GST_PAD_PROBE_DROP;
2220 }
2221
2222 static void
reconfigure_output_stream(DecodebinOutputStream * output,MultiQueueSlot * slot)2223 reconfigure_output_stream (DecodebinOutputStream * output,
2224 MultiQueueSlot * slot)
2225 {
2226 GstDecodebin3 *dbin = output->dbin;
2227 GstCaps *new_caps = (GstCaps *) gst_stream_get_caps (slot->active_stream);
2228 gboolean needs_decoder;
2229
2230 needs_decoder = gst_caps_can_intersect (new_caps, dbin->caps) != TRUE;
2231
2232 GST_DEBUG_OBJECT (dbin,
2233 "Reconfiguring output %p to slot %p, needs_decoder:%d", output, slot,
2234 needs_decoder);
2235
2236 /* FIXME : Maybe make the output un-hook itself automatically ? */
2237 if (output->slot != NULL && output->slot != slot) {
2238 GST_WARNING_OBJECT (dbin,
2239 "Output still linked to another slot (%p)", output->slot);
2240 gst_caps_unref (new_caps);
2241 return;
2242 }
2243
2244 /* Check if existing config is reusable as-is by checking if
2245 * the existing decoder accepts the new caps, if not delete
2246 * it and create a new one */
2247 if (output->decoder) {
2248 gboolean can_reuse_decoder;
2249
2250 if (needs_decoder) {
2251 can_reuse_decoder =
2252 gst_pad_query_accept_caps (output->decoder_sink, new_caps);
2253 } else
2254 can_reuse_decoder = FALSE;
2255
2256 if (can_reuse_decoder) {
2257 if (output->type & GST_STREAM_TYPE_VIDEO && output->drop_probe_id == 0) {
2258 GST_DEBUG_OBJECT (dbin, "Adding keyframe-waiter probe");
2259 output->drop_probe_id =
2260 gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_BUFFER,
2261 (GstPadProbeCallback) keyframe_waiter_probe, output, NULL);
2262 }
2263 GST_DEBUG_OBJECT (dbin, "Reusing existing decoder for slot %p", slot);
2264 if (output->linked == FALSE) {
2265 gst_pad_link_full (slot->src_pad, output->decoder_sink,
2266 GST_PAD_LINK_CHECK_NOTHING);
2267 output->linked = TRUE;
2268 }
2269 gst_caps_unref (new_caps);
2270 return;
2271 }
2272
2273 GST_DEBUG_OBJECT (dbin, "Removing old decoder for slot %p", slot);
2274
2275 if (output->linked)
2276 gst_pad_unlink (slot->src_pad, output->decoder_sink);
2277 output->linked = FALSE;
2278 if (output->drop_probe_id) {
2279 gst_pad_remove_probe (slot->src_pad, output->drop_probe_id);
2280 output->drop_probe_id = 0;
2281 }
2282
2283 if (!gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad, NULL)) {
2284 GST_ERROR_OBJECT (dbin, "Could not release decoder pad");
2285 gst_caps_unref (new_caps);
2286 goto cleanup;
2287 }
2288
2289 gst_element_set_locked_state (output->decoder, TRUE);
2290 gst_element_set_state (output->decoder, GST_STATE_NULL);
2291
2292 gst_bin_remove ((GstBin *) dbin, output->decoder);
2293 output->decoder = NULL;
2294 output->decoder_latency = GST_CLOCK_TIME_NONE;
2295 } else if (output->linked) {
2296 /* Otherwise if we have no decoder yet but the output is linked make
2297 * sure that the ghost pad is really unlinked in case no decoder was
2298 * needed previously */
2299 if (!gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad, NULL)) {
2300 GST_ERROR_OBJECT (dbin, "Could not release ghost pad");
2301 gst_caps_unref (new_caps);
2302 goto cleanup;
2303 }
2304 }
2305
2306 gst_object_replace ((GstObject **) & output->decoder_sink, NULL);
2307 gst_object_replace ((GstObject **) & output->decoder_src, NULL);
2308
2309 /* If a decoder is required, create one */
2310 if (needs_decoder) {
2311 GList *factories, *next_factory;
2312
2313 factories = next_factory = create_decoder_factory_list (dbin, new_caps);
2314 while (!output->decoder) {
2315 gboolean decoder_failed = FALSE;
2316
2317 /* If we don't have a decoder yet, instantiate one */
2318 if (next_factory) {
2319 output->decoder = gst_element_factory_create ((GstElementFactory *)
2320 next_factory->data, NULL);
2321 GST_DEBUG ("Created decoder '%s'", GST_ELEMENT_NAME (output->decoder));
2322 } else
2323 GST_DEBUG ("Could not find an element for caps %" GST_PTR_FORMAT,
2324 new_caps);
2325
2326 if (output->decoder == NULL) {
2327 GstCaps *caps;
2328
2329 SELECTION_UNLOCK (dbin);
2330 /* FIXME : Should we be smarter if there's a missing decoder ?
2331 * Should we deactivate that stream ? */
2332 caps = gst_stream_get_caps (slot->active_stream);
2333 gst_element_post_message (GST_ELEMENT_CAST (dbin),
2334 gst_missing_decoder_message_new (GST_ELEMENT_CAST (dbin), caps));
2335 gst_caps_unref (caps);
2336 SELECTION_LOCK (dbin);
2337 goto cleanup;
2338 }
2339 if (!gst_bin_add ((GstBin *) dbin, output->decoder)) {
2340 GST_ERROR_OBJECT (dbin, "could not add decoder to pipeline");
2341 goto cleanup;
2342 }
2343 output->decoder_sink =
2344 gst_element_get_static_pad (output->decoder, "sink");
2345 output->decoder_src = gst_element_get_static_pad (output->decoder, "src");
2346 if (output->type & GST_STREAM_TYPE_VIDEO) {
2347 GST_DEBUG_OBJECT (dbin, "Adding keyframe-waiter probe");
2348 output->drop_probe_id =
2349 gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_BUFFER,
2350 (GstPadProbeCallback) keyframe_waiter_probe, output, NULL);
2351 }
2352 if (gst_pad_link_full (slot->src_pad, output->decoder_sink,
2353 GST_PAD_LINK_CHECK_NOTHING) != GST_PAD_LINK_OK) {
2354 GST_ERROR_OBJECT (dbin, "could not link to %s:%s",
2355 GST_DEBUG_PAD_NAME (output->decoder_sink));
2356 goto cleanup;
2357 }
2358 if (gst_element_set_state (output->decoder,
2359 GST_STATE_READY) == GST_STATE_CHANGE_FAILURE) {
2360 GST_DEBUG_OBJECT (dbin,
2361 "Decoder '%s' failed to reach READY state, trying the next type",
2362 GST_ELEMENT_NAME (output->decoder));
2363 decoder_failed = TRUE;
2364 }
2365 if (!gst_pad_query_accept_caps (output->decoder_sink, new_caps)) {
2366 GST_DEBUG_OBJECT (dbin,
2367 "Decoder '%s' did not accept the caps, trying the next type",
2368 GST_ELEMENT_NAME (output->decoder));
2369 decoder_failed = TRUE;
2370 }
2371 if (decoder_failed) {
2372 gst_pad_unlink (slot->src_pad, output->decoder_sink);
2373 if (output->drop_probe_id) {
2374 gst_pad_remove_probe (slot->src_pad, output->drop_probe_id);
2375 output->drop_probe_id = 0;
2376 }
2377
2378 gst_element_set_locked_state (output->decoder, TRUE);
2379 gst_element_set_state (output->decoder, GST_STATE_NULL);
2380
2381 gst_bin_remove ((GstBin *) dbin, output->decoder);
2382 output->decoder = NULL;
2383 }
2384 next_factory = next_factory->next;
2385 }
2386 gst_plugin_feature_list_free (factories);
2387 } else {
2388 output->decoder_src = gst_object_ref (slot->src_pad);
2389 output->decoder_sink = NULL;
2390 }
2391 gst_caps_unref (new_caps);
2392
2393 output->linked = TRUE;
2394 if (!gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad,
2395 output->decoder_src)) {
2396 GST_ERROR_OBJECT (dbin, "Could not expose decoder pad");
2397 goto cleanup;
2398 }
2399 if (output->src_exposed == FALSE) {
2400 GstEvent *stream_start;
2401
2402 stream_start = gst_pad_get_sticky_event (slot->src_pad,
2403 GST_EVENT_STREAM_START, 0);
2404
2405 /* Ensure GstStream is accesiable from pad-added callback */
2406 if (stream_start) {
2407 gst_pad_store_sticky_event (output->src_pad, stream_start);
2408 gst_event_unref (stream_start);
2409 } else {
2410 GST_WARNING_OBJECT (slot->src_pad,
2411 "Pad has no stored stream-start event");
2412 }
2413
2414 output->src_exposed = TRUE;
2415 gst_element_add_pad (GST_ELEMENT_CAST (dbin), output->src_pad);
2416 }
2417
2418 if (output->decoder)
2419 gst_element_sync_state_with_parent (output->decoder);
2420
2421 output->slot = slot;
2422 return;
2423
2424 cleanup:
2425 {
2426 GST_DEBUG_OBJECT (dbin, "Cleanup");
2427 if (output->decoder_sink) {
2428 gst_object_unref (output->decoder_sink);
2429 output->decoder_sink = NULL;
2430 }
2431 if (output->decoder_src) {
2432 gst_object_unref (output->decoder_src);
2433 output->decoder_src = NULL;
2434 }
2435 if (output->decoder) {
2436 gst_element_set_state (output->decoder, GST_STATE_NULL);
2437 gst_bin_remove ((GstBin *) dbin, output->decoder);
2438 output->decoder = NULL;
2439 }
2440 }
2441 }
2442
2443 static GstPadProbeReturn
idle_reconfigure(GstPad * pad,GstPadProbeInfo * info,MultiQueueSlot * slot)2444 idle_reconfigure (GstPad * pad, GstPadProbeInfo * info, MultiQueueSlot * slot)
2445 {
2446 GstMessage *msg = NULL;
2447 DecodebinOutputStream *output;
2448
2449 SELECTION_LOCK (slot->dbin);
2450 output = get_output_for_slot (slot);
2451
2452 GST_DEBUG_OBJECT (pad, "output : %p", output);
2453
2454 if (output) {
2455 reconfigure_output_stream (output, slot);
2456 msg = is_selection_done (slot->dbin);
2457 }
2458 SELECTION_UNLOCK (slot->dbin);
2459 if (msg)
2460 gst_element_post_message ((GstElement *) slot->dbin, msg);
2461
2462 return GST_PAD_PROBE_REMOVE;
2463 }
2464
2465 static MultiQueueSlot *
find_slot_for_stream_id(GstDecodebin3 * dbin,const gchar * sid)2466 find_slot_for_stream_id (GstDecodebin3 * dbin, const gchar * sid)
2467 {
2468 GList *tmp;
2469
2470 for (tmp = dbin->slots; tmp; tmp = tmp->next) {
2471 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2472 const gchar *stream_id;
2473 if (slot->active_stream) {
2474 stream_id = gst_stream_get_stream_id (slot->active_stream);
2475 if (!g_strcmp0 (sid, stream_id))
2476 return slot;
2477 }
2478 if (slot->pending_stream && slot->pending_stream != slot->active_stream) {
2479 stream_id = gst_stream_get_stream_id (slot->pending_stream);
2480 if (!g_strcmp0 (sid, stream_id))
2481 return slot;
2482 }
2483 }
2484
2485 return NULL;
2486 }
2487
2488 /* This function handles the reassignment of a slot. Call this from
2489 * the streaming thread of a slot. */
2490 static gboolean
reassign_slot(GstDecodebin3 * dbin,MultiQueueSlot * slot)2491 reassign_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot)
2492 {
2493 DecodebinOutputStream *output;
2494 MultiQueueSlot *target_slot = NULL;
2495 GList *tmp;
2496 const gchar *sid, *tsid;
2497
2498 SELECTION_LOCK (dbin);
2499 output = slot->output;
2500
2501 if (G_UNLIKELY (slot->active_stream == NULL)) {
2502 GST_DEBUG_OBJECT (slot->src_pad,
2503 "Called on inactive slot (active_stream == NULL)");
2504 SELECTION_UNLOCK (dbin);
2505 return FALSE;
2506 }
2507
2508 if (G_UNLIKELY (output == NULL)) {
2509 GST_DEBUG_OBJECT (slot->src_pad,
2510 "Slot doesn't have any output to be removed");
2511 SELECTION_UNLOCK (dbin);
2512 return FALSE;
2513 }
2514
2515 sid = gst_stream_get_stream_id (slot->active_stream);
2516 GST_DEBUG_OBJECT (slot->src_pad, "slot %s %p", sid, slot);
2517
2518 /* Recheck whether this stream is still in the list of streams to deactivate */
2519 if (stream_in_list (dbin->requested_selection, sid)) {
2520 /* Stream is in the list of requested streams, don't remove */
2521 SELECTION_UNLOCK (dbin);
2522 GST_DEBUG_OBJECT (slot->src_pad,
2523 "Stream '%s' doesn't need to be deactivated", sid);
2524 return FALSE;
2525 }
2526
2527 /* Unlink slot from output */
2528 /* FIXME : Handle flushing ? */
2529 /* FIXME : Handle outputs without decoders */
2530 GST_DEBUG_OBJECT (slot->src_pad, "Unlinking from decoder %p",
2531 output->decoder_sink);
2532 if (output->decoder_sink)
2533 gst_pad_unlink (slot->src_pad, output->decoder_sink);
2534 output->linked = FALSE;
2535 slot->output = NULL;
2536 output->slot = NULL;
2537 /* Remove sid from active selection */
2538 for (tmp = dbin->active_selection; tmp; tmp = tmp->next)
2539 if (!g_strcmp0 (sid, tmp->data)) {
2540 dbin->active_selection = g_list_delete_link (dbin->active_selection, tmp);
2541 break;
2542 }
2543
2544 /* Can we re-assign this output to a requested stream ? */
2545 GST_DEBUG_OBJECT (slot->src_pad, "Attempting to re-assing output stream");
2546 for (tmp = dbin->to_activate; tmp; tmp = tmp->next) {
2547 MultiQueueSlot *tslot = find_slot_for_stream_id (dbin, tmp->data);
2548 GST_LOG_OBJECT (tslot->src_pad, "Checking slot %p (output:%p , stream:%s)",
2549 tslot, tslot->output, gst_stream_get_stream_id (tslot->active_stream));
2550 if (tslot && tslot->type == output->type && tslot->output == NULL) {
2551 GST_DEBUG_OBJECT (tslot->src_pad, "Using as reassigned slot");
2552 target_slot = tslot;
2553 tsid = tmp->data;
2554 /* Pass target stream id to requested selection */
2555 dbin->requested_selection =
2556 g_list_append (dbin->requested_selection, g_strdup (tmp->data));
2557 dbin->to_activate = g_list_remove (dbin->to_activate, tmp->data);
2558 break;
2559 }
2560 }
2561
2562 if (target_slot) {
2563 GST_DEBUG_OBJECT (slot->src_pad, "Assigning output to slot %p '%s'",
2564 target_slot, tsid);
2565 target_slot->output = output;
2566 output->slot = target_slot;
2567 dbin->active_selection =
2568 g_list_append (dbin->active_selection, (gchar *) tsid);
2569 SELECTION_UNLOCK (dbin);
2570
2571 /* Wakeup the target slot so that it retries to send events/buffers
2572 * thereby triggering the output reconfiguration codepath */
2573 gst_pad_add_probe (target_slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
2574 (GstPadProbeCallback) idle_reconfigure, target_slot, NULL);
2575 /* gst_pad_send_event (target_slot->src_pad, gst_event_new_reconfigure ()); */
2576 } else {
2577 GstMessage *msg;
2578
2579 dbin->output_streams = g_list_remove (dbin->output_streams, output);
2580 free_output_stream (dbin, output);
2581 msg = is_selection_done (slot->dbin);
2582 SELECTION_UNLOCK (dbin);
2583
2584 if (msg)
2585 gst_element_post_message ((GstElement *) slot->dbin, msg);
2586 }
2587
2588 return TRUE;
2589 }
2590
2591 /* Idle probe called when a slot should be unassigned from its output stream.
2592 * This is needed to ensure nothing is flowing when unlinking the slot.
2593 *
2594 * Also, this method will search for a pending stream which could re-use
2595 * the output stream. */
2596 static GstPadProbeReturn
slot_unassign_probe(GstPad * pad,GstPadProbeInfo * info,MultiQueueSlot * slot)2597 slot_unassign_probe (GstPad * pad, GstPadProbeInfo * info,
2598 MultiQueueSlot * slot)
2599 {
2600 GstDecodebin3 *dbin = slot->dbin;
2601
2602 reassign_slot (dbin, slot);
2603
2604 return GST_PAD_PROBE_REMOVE;
2605 }
2606
2607 static gboolean
handle_stream_switch(GstDecodebin3 * dbin,GList * select_streams,guint32 seqnum)2608 handle_stream_switch (GstDecodebin3 * dbin, GList * select_streams,
2609 guint32 seqnum)
2610 {
2611 gboolean ret = TRUE;
2612 GList *tmp;
2613 /* List of slots to (de)activate. */
2614 GList *to_deactivate = NULL;
2615 GList *to_activate = NULL;
2616 /* List of unknown stream id, most likely means the event
2617 * should be sent upstream so that elements can expose the requested stream */
2618 GList *unknown = NULL;
2619 GList *to_reassign = NULL;
2620 GList *future_request_streams = NULL;
2621 GList *pending_streams = NULL;
2622 GList *slots_to_reassign = NULL;
2623
2624 SELECTION_LOCK (dbin);
2625 if (G_UNLIKELY (seqnum != dbin->select_streams_seqnum)) {
2626 GST_DEBUG_OBJECT (dbin, "New SELECT_STREAMS has arrived in the meantime");
2627 SELECTION_UNLOCK (dbin);
2628 return TRUE;
2629 }
2630 /* Remove pending select_streams */
2631 g_list_free (dbin->pending_select_streams);
2632 dbin->pending_select_streams = NULL;
2633
2634 /* COMPARE the requested streams to the active and requested streams
2635 * on multiqueue. */
2636
2637 /* First check the slots to activate and which ones are unknown */
2638 for (tmp = select_streams; tmp; tmp = tmp->next) {
2639 const gchar *sid = (const gchar *) tmp->data;
2640 MultiQueueSlot *slot;
2641 GST_DEBUG_OBJECT (dbin, "Checking stream '%s'", sid);
2642 slot = find_slot_for_stream_id (dbin, sid);
2643 /* Find the corresponding slot */
2644 if (slot == NULL) {
2645 if (stream_in_collection (dbin, (gchar *) sid)) {
2646 pending_streams = g_list_append (pending_streams, (gchar *) sid);
2647 } else {
2648 GST_DEBUG_OBJECT (dbin, "We don't have a slot for stream '%s'", sid);
2649 unknown = g_list_append (unknown, (gchar *) sid);
2650 }
2651 } else if (slot->output == NULL) {
2652 GST_DEBUG_OBJECT (dbin, "We need to activate slot %p for stream '%s')",
2653 slot, sid);
2654 to_activate = g_list_append (to_activate, slot);
2655 } else {
2656 GST_DEBUG_OBJECT (dbin,
2657 "Stream '%s' from slot %p is already active on output %p", sid, slot,
2658 slot->output);
2659 future_request_streams =
2660 g_list_append (future_request_streams, (gchar *) sid);
2661 }
2662 }
2663
2664 for (tmp = dbin->slots; tmp; tmp = tmp->next) {
2665 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2666 /* For slots that have an output, check if it's part of the streams to
2667 * be active */
2668 if (slot->output) {
2669 gboolean slot_to_deactivate = TRUE;
2670
2671 if (slot->active_stream) {
2672 if (stream_in_list (select_streams,
2673 gst_stream_get_stream_id (slot->active_stream)))
2674 slot_to_deactivate = FALSE;
2675 }
2676 if (slot_to_deactivate && slot->pending_stream
2677 && slot->pending_stream != slot->active_stream) {
2678 if (stream_in_list (select_streams,
2679 gst_stream_get_stream_id (slot->pending_stream)))
2680 slot_to_deactivate = FALSE;
2681 }
2682 if (slot_to_deactivate) {
2683 GST_DEBUG_OBJECT (dbin,
2684 "Slot %p (%s) should be deactivated, no longer used", slot,
2685 slot->
2686 active_stream ? gst_stream_get_stream_id (slot->active_stream) :
2687 "NULL");
2688 to_deactivate = g_list_append (to_deactivate, slot);
2689 }
2690 }
2691 }
2692
2693 if (to_deactivate != NULL) {
2694 GST_DEBUG_OBJECT (dbin, "Check if we can reassign slots");
2695 /* We need to compare what needs to be activated and deactivated in order
2696 * to determine whether there are outputs that can be transferred */
2697 /* Take the stream-id of the slots that are to be activated, for which there
2698 * is a slot of the same type that needs to be deactivated */
2699 tmp = to_deactivate;
2700 while (tmp) {
2701 MultiQueueSlot *slot_to_deactivate = (MultiQueueSlot *) tmp->data;
2702 gboolean removeit = FALSE;
2703 GList *tmp2, *next;
2704 GST_DEBUG_OBJECT (dbin,
2705 "Checking if slot to deactivate (%p) has a candidate slot to activate",
2706 slot_to_deactivate);
2707 for (tmp2 = to_activate; tmp2; tmp2 = tmp2->next) {
2708 MultiQueueSlot *slot_to_activate = (MultiQueueSlot *) tmp2->data;
2709 GST_DEBUG_OBJECT (dbin, "Comparing to slot %p", slot_to_activate);
2710 if (slot_to_activate->type == slot_to_deactivate->type) {
2711 GST_DEBUG_OBJECT (dbin, "Re-using");
2712 to_reassign = g_list_append (to_reassign, (gchar *)
2713 gst_stream_get_stream_id (slot_to_activate->active_stream));
2714 slots_to_reassign =
2715 g_list_append (slots_to_reassign, slot_to_deactivate);
2716 to_activate = g_list_remove (to_activate, slot_to_activate);
2717 removeit = TRUE;
2718 break;
2719 }
2720 }
2721 next = tmp->next;
2722 if (removeit)
2723 to_deactivate = g_list_delete_link (to_deactivate, tmp);
2724 tmp = next;
2725 }
2726 }
2727
2728 for (tmp = to_deactivate; tmp; tmp = tmp->next) {
2729 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2730 GST_DEBUG_OBJECT (dbin,
2731 "Really need to deactivate slot %p, but no available alternative",
2732 slot);
2733
2734 slots_to_reassign = g_list_append (slots_to_reassign, slot);
2735 }
2736
2737 /* The only slots left to activate are the ones that won't be reassigned and
2738 * therefore really need to have a new output created */
2739 for (tmp = to_activate; tmp; tmp = tmp->next) {
2740 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2741 if (slot->active_stream)
2742 future_request_streams =
2743 g_list_append (future_request_streams,
2744 (gchar *) gst_stream_get_stream_id (slot->active_stream));
2745 else if (slot->pending_stream)
2746 future_request_streams =
2747 g_list_append (future_request_streams,
2748 (gchar *) gst_stream_get_stream_id (slot->pending_stream));
2749 else
2750 GST_ERROR_OBJECT (dbin, "No stream for slot %p !!", slot);
2751 }
2752
2753 if (to_activate == NULL && pending_streams != NULL) {
2754 GST_DEBUG_OBJECT (dbin, "Stream switch requested for future collection");
2755 if (dbin->requested_selection)
2756 g_list_free_full (dbin->requested_selection, g_free);
2757 dbin->requested_selection =
2758 g_list_copy_deep (select_streams, (GCopyFunc) g_strdup, NULL);
2759 g_list_free (to_deactivate);
2760 g_list_free (pending_streams);
2761 to_deactivate = NULL;
2762 pending_streams = NULL;
2763 } else {
2764 if (dbin->requested_selection)
2765 g_list_free_full (dbin->requested_selection, g_free);
2766 dbin->requested_selection =
2767 g_list_copy_deep (future_request_streams, (GCopyFunc) g_strdup, NULL);
2768 dbin->requested_selection =
2769 g_list_concat (dbin->requested_selection,
2770 g_list_copy_deep (pending_streams, (GCopyFunc) g_strdup, NULL));
2771 if (dbin->to_activate)
2772 g_list_free (dbin->to_activate);
2773 dbin->to_activate = g_list_copy (to_reassign);
2774 }
2775
2776 dbin->selection_updated = TRUE;
2777 SELECTION_UNLOCK (dbin);
2778
2779 if (unknown) {
2780 GST_FIXME_OBJECT (dbin, "Got request for an unknown stream");
2781 g_list_free (unknown);
2782 }
2783
2784 if (to_activate && !slots_to_reassign) {
2785 for (tmp = to_activate; tmp; tmp = tmp->next) {
2786 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2787 gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
2788 (GstPadProbeCallback) idle_reconfigure, slot, NULL);
2789 }
2790 }
2791
2792 /* For all streams to deactivate, add an idle probe where we will do
2793 * the unassignment and switch over */
2794 for (tmp = slots_to_reassign; tmp; tmp = tmp->next) {
2795 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
2796 gst_pad_add_probe (slot->src_pad, GST_PAD_PROBE_TYPE_IDLE,
2797 (GstPadProbeCallback) slot_unassign_probe, slot, NULL);
2798 }
2799
2800 if (to_deactivate)
2801 g_list_free (to_deactivate);
2802 if (to_activate)
2803 g_list_free (to_activate);
2804 if (to_reassign)
2805 g_list_free (to_reassign);
2806 if (future_request_streams)
2807 g_list_free (future_request_streams);
2808 if (pending_streams)
2809 g_list_free (pending_streams);
2810 if (slots_to_reassign)
2811 g_list_free (slots_to_reassign);
2812
2813 return ret;
2814 }
2815
2816 static GstPadProbeReturn
ghost_pad_event_probe(GstPad * pad,GstPadProbeInfo * info,DecodebinOutputStream * output)2817 ghost_pad_event_probe (GstPad * pad, GstPadProbeInfo * info,
2818 DecodebinOutputStream * output)
2819 {
2820 GstPadProbeReturn ret = GST_PAD_PROBE_OK;
2821 GstDecodebin3 *dbin = output->dbin;
2822 GstEvent *event = GST_PAD_PROBE_INFO_EVENT (info);
2823
2824 GST_DEBUG_OBJECT (pad, "Got event %p %s", event, GST_EVENT_TYPE_NAME (event));
2825
2826 switch (GST_EVENT_TYPE (event)) {
2827 case GST_EVENT_SELECT_STREAMS:
2828 {
2829 GstPad *peer;
2830 GList *streams = NULL;
2831 guint32 seqnum = gst_event_get_seqnum (event);
2832
2833 SELECTION_LOCK (dbin);
2834 if (seqnum == dbin->select_streams_seqnum) {
2835 SELECTION_UNLOCK (dbin);
2836 GST_DEBUG_OBJECT (pad,
2837 "Already handled/handling that SELECT_STREAMS event");
2838 gst_event_unref (event);
2839 ret = GST_PAD_PROBE_HANDLED;
2840 break;
2841 }
2842 dbin->select_streams_seqnum = seqnum;
2843 if (dbin->pending_select_streams != NULL) {
2844 GST_LOG_OBJECT (dbin, "Replacing pending select streams");
2845 g_list_free (dbin->pending_select_streams);
2846 dbin->pending_select_streams = NULL;
2847 }
2848 gst_event_parse_select_streams (event, &streams);
2849 dbin->pending_select_streams = g_list_copy (streams);
2850 SELECTION_UNLOCK (dbin);
2851
2852 /* Send event upstream */
2853 if ((peer = gst_pad_get_peer (pad))) {
2854 gst_pad_send_event (peer, event);
2855 gst_object_unref (peer);
2856 } else {
2857 gst_event_unref (event);
2858 }
2859 /* Finally handle the switch */
2860 if (streams) {
2861 handle_stream_switch (dbin, streams, seqnum);
2862 g_list_free_full (streams, g_free);
2863 }
2864 ret = GST_PAD_PROBE_HANDLED;
2865 }
2866 break;
2867 default:
2868 break;
2869 }
2870
2871 return ret;
2872 }
2873
2874 static gboolean
gst_decodebin3_send_event(GstElement * element,GstEvent * event)2875 gst_decodebin3_send_event (GstElement * element, GstEvent * event)
2876 {
2877 GST_DEBUG_OBJECT (element, "event %s", GST_EVENT_TYPE_NAME (event));
2878 if (GST_EVENT_TYPE (event) == GST_EVENT_SELECT_STREAMS) {
2879 GstDecodebin3 *dbin = (GstDecodebin3 *) element;
2880 GList *streams = NULL;
2881 guint32 seqnum = gst_event_get_seqnum (event);
2882
2883 SELECTION_LOCK (dbin);
2884 if (seqnum == dbin->select_streams_seqnum) {
2885 SELECTION_UNLOCK (dbin);
2886 GST_DEBUG_OBJECT (dbin,
2887 "Already handled/handling that SELECT_STREAMS event");
2888 return TRUE;
2889 }
2890 dbin->select_streams_seqnum = seqnum;
2891 if (dbin->pending_select_streams != NULL) {
2892 GST_LOG_OBJECT (dbin, "Replacing pending select streams");
2893 g_list_free (dbin->pending_select_streams);
2894 dbin->pending_select_streams = NULL;
2895 }
2896 gst_event_parse_select_streams (event, &streams);
2897 dbin->pending_select_streams = g_list_copy (streams);
2898 SELECTION_UNLOCK (dbin);
2899
2900 /* FIXME : We don't have an upstream ?? */
2901 #if 0
2902 /* Send event upstream */
2903 if ((peer = gst_pad_get_peer (pad))) {
2904 gst_pad_send_event (peer, event);
2905 gst_object_unref (peer);
2906 }
2907 #endif
2908 /* Finally handle the switch */
2909 if (streams) {
2910 handle_stream_switch (dbin, streams, seqnum);
2911 g_list_free_full (streams, g_free);
2912 }
2913
2914 gst_event_unref (event);
2915 return TRUE;
2916 }
2917 return GST_ELEMENT_CLASS (parent_class)->send_event (element, event);
2918 }
2919
2920
2921 static void
free_multiqueue_slot(GstDecodebin3 * dbin,MultiQueueSlot * slot)2922 free_multiqueue_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot)
2923 {
2924 if (slot->probe_id)
2925 gst_pad_remove_probe (slot->src_pad, slot->probe_id);
2926 if (slot->input) {
2927 if (slot->input->srcpad)
2928 gst_pad_unlink (slot->input->srcpad, slot->sink_pad);
2929 }
2930
2931 gst_element_release_request_pad (dbin->multiqueue, slot->sink_pad);
2932 gst_object_replace ((GstObject **) & slot->sink_pad, NULL);
2933 gst_object_replace ((GstObject **) & slot->src_pad, NULL);
2934 gst_object_replace ((GstObject **) & slot->active_stream, NULL);
2935 g_free (slot);
2936 }
2937
2938 static void
free_multiqueue_slot_async(GstDecodebin3 * dbin,MultiQueueSlot * slot)2939 free_multiqueue_slot_async (GstDecodebin3 * dbin, MultiQueueSlot * slot)
2940 {
2941 GST_LOG_OBJECT (dbin, "pushing multiqueue slot on thread pool to free");
2942 gst_element_call_async (GST_ELEMENT_CAST (dbin),
2943 (GstElementCallAsyncFunc) free_multiqueue_slot, slot, NULL);
2944 }
2945
2946 /* Create a DecodebinOutputStream for a given type
2947 * Note: It will be empty initially, it needs to be configured
2948 * afterwards */
2949 static DecodebinOutputStream *
create_output_stream(GstDecodebin3 * dbin,GstStreamType type)2950 create_output_stream (GstDecodebin3 * dbin, GstStreamType type)
2951 {
2952 DecodebinOutputStream *res = g_new0 (DecodebinOutputStream, 1);
2953 gchar *pad_name;
2954 const gchar *prefix;
2955 GstStaticPadTemplate *templ;
2956 GstPadTemplate *ptmpl;
2957 guint32 *counter;
2958 GstPad *internal_pad;
2959
2960 GST_DEBUG_OBJECT (dbin, "Created new output stream %p for type %s",
2961 res, gst_stream_type_get_name (type));
2962
2963 res->type = type;
2964 res->dbin = dbin;
2965 res->decoder_latency = GST_CLOCK_TIME_NONE;
2966
2967 if (type & GST_STREAM_TYPE_VIDEO) {
2968 templ = &video_src_template;
2969 counter = &dbin->vpadcount;
2970 prefix = "video";
2971 } else if (type & GST_STREAM_TYPE_AUDIO) {
2972 templ = &audio_src_template;
2973 counter = &dbin->apadcount;
2974 prefix = "audio";
2975 } else if (type & GST_STREAM_TYPE_TEXT) {
2976 templ = &text_src_template;
2977 counter = &dbin->tpadcount;
2978 prefix = "text";
2979 } else {
2980 templ = &src_template;
2981 counter = &dbin->opadcount;
2982 prefix = "src";
2983 }
2984
2985 pad_name = g_strdup_printf ("%s_%u", prefix, *counter);
2986 *counter += 1;
2987 ptmpl = gst_static_pad_template_get (templ);
2988 res->src_pad = gst_ghost_pad_new_no_target_from_template (pad_name, ptmpl);
2989 gst_object_unref (ptmpl);
2990 g_free (pad_name);
2991 gst_pad_set_active (res->src_pad, TRUE);
2992 /* Put an event probe on the internal proxy pad to detect upstream
2993 * events */
2994 internal_pad =
2995 (GstPad *) gst_proxy_pad_get_internal ((GstProxyPad *) res->src_pad);
2996 gst_pad_add_probe (internal_pad, GST_PAD_PROBE_TYPE_EVENT_UPSTREAM,
2997 (GstPadProbeCallback) ghost_pad_event_probe, res, NULL);
2998 gst_object_unref (internal_pad);
2999
3000 dbin->output_streams = g_list_append (dbin->output_streams, res);
3001
3002 return res;
3003 }
3004
3005 static void
free_output_stream(GstDecodebin3 * dbin,DecodebinOutputStream * output)3006 free_output_stream (GstDecodebin3 * dbin, DecodebinOutputStream * output)
3007 {
3008 if (output->slot) {
3009 if (output->decoder_sink && output->decoder)
3010 gst_pad_unlink (output->slot->src_pad, output->decoder_sink);
3011
3012 output->slot->output = NULL;
3013 output->slot = NULL;
3014 }
3015 gst_object_replace ((GstObject **) & output->decoder_sink, NULL);
3016 gst_ghost_pad_set_target ((GstGhostPad *) output->src_pad, NULL);
3017 gst_object_replace ((GstObject **) & output->decoder_src, NULL);
3018 if (output->src_exposed) {
3019 gst_element_remove_pad ((GstElement *) dbin, output->src_pad);
3020 }
3021 if (output->decoder) {
3022 gst_element_set_locked_state (output->decoder, TRUE);
3023 gst_element_set_state (output->decoder, GST_STATE_NULL);
3024 gst_bin_remove ((GstBin *) dbin, output->decoder);
3025 }
3026 g_free (output);
3027 }
3028
3029 static GstStateChangeReturn
gst_decodebin3_change_state(GstElement * element,GstStateChange transition)3030 gst_decodebin3_change_state (GstElement * element, GstStateChange transition)
3031 {
3032 GstDecodebin3 *dbin = (GstDecodebin3 *) element;
3033 GstStateChangeReturn ret;
3034
3035 /* Upwards */
3036 switch (transition) {
3037 default:
3038 break;
3039 }
3040 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
3041 if (ret == GST_STATE_CHANGE_FAILURE)
3042 goto beach;
3043
3044 switch (transition) {
3045 case GST_STATE_CHANGE_PAUSED_TO_READY:
3046 {
3047 GList *tmp;
3048
3049 /* Free output streams */
3050 for (tmp = dbin->output_streams; tmp; tmp = tmp->next) {
3051 DecodebinOutputStream *output = (DecodebinOutputStream *) tmp->data;
3052 free_output_stream (dbin, output);
3053 }
3054 g_list_free (dbin->output_streams);
3055 dbin->output_streams = NULL;
3056 /* Free multiqueue slots */
3057 for (tmp = dbin->slots; tmp; tmp = tmp->next) {
3058 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
3059 free_multiqueue_slot (dbin, slot);
3060 }
3061 g_list_free (dbin->slots);
3062 dbin->slots = NULL;
3063 dbin->current_group_id = GST_GROUP_ID_INVALID;
3064 /* Free inputs */
3065 /* Reset the main input group id since it will get a new id on a new stream */
3066 dbin->main_input->group_id = GST_GROUP_ID_INVALID;
3067 /* Reset multiqueue to default interleave */
3068 g_object_set (dbin->multiqueue, "min-interleave-time",
3069 dbin->default_mq_min_interleave, NULL);
3070 dbin->current_mq_min_interleave = dbin->default_mq_min_interleave;
3071 }
3072 break;
3073 default:
3074 break;
3075 }
3076 beach:
3077 return ret;
3078 }
3079