• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer
2  *
3  * Copyright (C) <2015> Centricular Ltd
4  *  @author: Edward Hervey <edward@centricular.com>
5  *  @author: Jan Schmidt <jan@centricular.com>
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22 
23 #if 0
24 /* Not needed for now - we're including gstdecodebin3-parse.c into gstdecodebin3.c */
25 #ifdef HAVE_CONFIG_H
26 #include "config.h"
27 #endif
28 
29 #include <glib.h>
30 #include <glib-object.h>
31 #include <glib/gprintf.h>
32 #include <gst/gst.h>
33 #include <gst/pbutils/pbutils.h>
34 
35 #include "gstplayback.h"
36 #endif
37 
38 #define CUSTOM_EOS_QUARK _custom_eos_quark_get ()
39 #define CUSTOM_EOS_QUARK_DATA "custom-eos"
40 static GQuark
_custom_eos_quark_get(void)41 _custom_eos_quark_get (void)
42 {
43   static gsize g_quark;
44 
45   if (g_once_init_enter (&g_quark)) {
46     gsize quark = (gsize) g_quark_from_static_string ("decodebin3-custom-eos");
47     g_once_init_leave (&g_quark, quark);
48   }
49   return g_quark;
50 }
51 
52 /* Streams that come from demuxers (input/upstream) */
53 /* FIXME : All this is hardcoded. Switch to tree of chains */
54 struct _DecodebinInputStream
55 {
56   GstDecodebin3 *dbin;
57   GstStream *pending_stream;    /* Extra ref */
58   GstStream *active_stream;
59 
60   DecodebinInput *input;
61 
62   GstPad *srcpad;               /* From demuxer */
63 
64   /* id of the pad event probe */
65   gulong output_event_probe_id;
66 
67   /* id of the buffer blocking probe on the input (demuxer src) pad */
68   gulong input_buffer_probe_id;
69 
70   /* Whether we saw an EOS on input. This should be treated accordingly
71    * when the stream is no longer used */
72   gboolean saw_eos;
73 };
74 
75 static void parsebin_pad_added_cb (GstElement * demux, GstPad * pad,
76     DecodebinInput * input);
77 static void parsebin_pad_removed_cb (GstElement * demux, GstPad * pad,
78     DecodebinInput * input);
79 
80 /* WITH SELECTION_LOCK TAKEN! */
81 static gboolean
pending_pads_are_eos(DecodebinInput * input)82 pending_pads_are_eos (DecodebinInput * input)
83 {
84   GList *tmp;
85 
86   for (tmp = input->pending_pads; tmp; tmp = tmp->next) {
87     PendingPad *ppad = (PendingPad *) tmp->data;
88     if (ppad->saw_eos == FALSE)
89       return FALSE;
90   }
91 
92   return TRUE;
93 }
94 
95 /* WITH SELECTION_LOCK TAKEN! */
96 static gboolean
all_inputs_are_eos(GstDecodebin3 * dbin)97 all_inputs_are_eos (GstDecodebin3 * dbin)
98 {
99   GList *tmp;
100   /* First check input streams */
101   for (tmp = dbin->input_streams; tmp; tmp = tmp->next) {
102     DecodebinInputStream *input = (DecodebinInputStream *) tmp->data;
103     if (input->saw_eos == FALSE)
104       return FALSE;
105   }
106 
107   /* Check pending pads */
108   if (!pending_pads_are_eos (dbin->main_input))
109     return FALSE;
110   for (tmp = dbin->other_inputs; tmp; tmp = tmp->next)
111     if (!pending_pads_are_eos ((DecodebinInput *) tmp->data))
112       return FALSE;
113 
114   GST_DEBUG_OBJECT (dbin, "All streams are EOS");
115   return TRUE;
116 }
117 
118 /* WITH SELECTION_LOCK TAKEN! */
119 static void
check_all_streams_for_eos(GstDecodebin3 * dbin)120 check_all_streams_for_eos (GstDecodebin3 * dbin)
121 {
122   GList *tmp;
123   GList *outputpads = NULL;
124 
125   if (!all_inputs_are_eos (dbin))
126     return;
127 
128   /* We know all streams are EOS, properly clean up everything */
129 
130   /* We grab all peer pads *while* the selection lock is taken and then we will
131      push EOS downstream with the selection lock released */
132   for (tmp = dbin->input_streams; tmp; tmp = tmp->next) {
133     DecodebinInputStream *input = (DecodebinInputStream *) tmp->data;
134     GstPad *peer = gst_pad_get_peer (input->srcpad);
135 
136     /* Keep a reference to the peer pad */
137     if (peer)
138       outputpads = g_list_append (outputpads, peer);
139   }
140 
141   SELECTION_UNLOCK (dbin);
142   /*  */
143   for (tmp = outputpads; tmp; tmp = tmp->next) {
144     GstPad *peer = (GstPad *) tmp->data;
145 
146     /* Send EOS and then remove elements */
147     gst_pad_send_event (peer, gst_event_new_eos ());
148     GST_FIXME_OBJECT (peer, "Remove input stream");
149     gst_object_unref (peer);
150   }
151   SELECTION_LOCK (dbin);
152 
153   g_list_free (outputpads);
154 }
155 
156 /* Get the intersection of parser caps and available (sorted) decoders */
157 static GstCaps *
get_parser_caps_filter(GstDecodebin3 * dbin,GstCaps * caps)158 get_parser_caps_filter (GstDecodebin3 * dbin, GstCaps * caps)
159 {
160   GList *tmp;
161   GstCaps *filter_caps;
162 
163   /* If no filter was provided, it can handle anything */
164   if (!caps || gst_caps_is_any (caps))
165     return gst_caps_new_any ();
166 
167   filter_caps = gst_caps_new_empty ();
168 
169   g_mutex_lock (&dbin->factories_lock);
170   gst_decode_bin_update_factories_list (dbin);
171   for (tmp = dbin->decoder_factories; tmp; tmp = tmp->next) {
172     GstElementFactory *factory = (GstElementFactory *) tmp->data;
173     GstCaps *tcaps, *intersection;
174     const GList *tmps;
175 
176     GST_LOG ("Trying factory %s",
177         gst_plugin_feature_get_name (GST_PLUGIN_FEATURE (factory)));
178     for (tmps = gst_element_factory_get_static_pad_templates (factory); tmps;
179         tmps = tmps->next) {
180       GstStaticPadTemplate *st = (GstStaticPadTemplate *) tmps->data;
181       if (st->direction != GST_PAD_SINK || st->presence != GST_PAD_ALWAYS)
182         continue;
183       tcaps = gst_static_pad_template_get_caps (st);
184       intersection =
185           gst_caps_intersect_full (tcaps, caps, GST_CAPS_INTERSECT_FIRST);
186       filter_caps = gst_caps_merge (filter_caps, intersection);
187       gst_caps_unref (tcaps);
188     }
189   }
190   g_mutex_unlock (&dbin->factories_lock);
191   GST_DEBUG_OBJECT (dbin, "Got filter caps %" GST_PTR_FORMAT, filter_caps);
192   return filter_caps;
193 }
194 
195 static gboolean
check_parser_caps_filter(GstDecodebin3 * dbin,GstCaps * caps)196 check_parser_caps_filter (GstDecodebin3 * dbin, GstCaps * caps)
197 {
198   GList *tmp;
199   gboolean res = FALSE;
200 
201   g_mutex_lock (&dbin->factories_lock);
202   gst_decode_bin_update_factories_list (dbin);
203   for (tmp = dbin->decoder_factories; tmp; tmp = tmp->next) {
204     GstElementFactory *factory = (GstElementFactory *) tmp->data;
205     GstCaps *tcaps;
206     const GList *tmps;
207 
208     GST_LOG ("Trying factory %s",
209         gst_plugin_feature_get_name (GST_PLUGIN_FEATURE (factory)));
210     for (tmps = gst_element_factory_get_static_pad_templates (factory); tmps;
211         tmps = tmps->next) {
212       GstStaticPadTemplate *st = (GstStaticPadTemplate *) tmps->data;
213       if (st->direction != GST_PAD_SINK || st->presence != GST_PAD_ALWAYS)
214         continue;
215       tcaps = gst_static_pad_template_get_caps (st);
216       if (gst_caps_can_intersect (tcaps, caps)) {
217         res = TRUE;
218         gst_caps_unref (tcaps);
219         goto beach;
220       }
221       gst_caps_unref (tcaps);
222     }
223   }
224 beach:
225   g_mutex_unlock (&dbin->factories_lock);
226   GST_DEBUG_OBJECT (dbin, "Can intersect : %d", res);
227   return res;
228 }
229 
230 /* Probe on the output of a parser chain (the last
231  * src pad) */
232 static GstPadProbeReturn
parse_chain_output_probe(GstPad * pad,GstPadProbeInfo * info,DecodebinInputStream * input)233 parse_chain_output_probe (GstPad * pad, GstPadProbeInfo * info,
234     DecodebinInputStream * input)
235 {
236   GstPadProbeReturn ret = GST_PAD_PROBE_OK;
237 
238   if (GST_IS_EVENT (GST_PAD_PROBE_INFO_DATA (info))) {
239     GstEvent *ev = GST_PAD_PROBE_INFO_EVENT (info);
240 
241     GST_DEBUG_OBJECT (pad, "Got event %s", GST_EVENT_TYPE_NAME (ev));
242     switch (GST_EVENT_TYPE (ev)) {
243       case GST_EVENT_STREAM_START:
244       {
245         GstStream *stream = NULL;
246         guint group_id = GST_GROUP_ID_INVALID;
247 
248         if (!gst_event_parse_group_id (ev, &group_id)) {
249           GST_FIXME_OBJECT (pad,
250               "Consider implementing group-id handling on stream-start event");
251           group_id = gst_util_group_id_next ();
252         }
253 
254         GST_DEBUG_OBJECT (pad, "Got stream-start, group_id:%d, input %p",
255             group_id, input->input);
256         if (set_input_group_id (input->input, &group_id)) {
257           ev = gst_event_make_writable (ev);
258           gst_event_set_group_id (ev, group_id);
259           GST_PAD_PROBE_INFO_DATA (info) = ev;
260         }
261         input->saw_eos = FALSE;
262 
263         gst_event_parse_stream (ev, &stream);
264         /* FIXME : Would we ever end up with a stream already set on the input ?? */
265         if (stream) {
266           if (input->active_stream != stream) {
267             MultiQueueSlot *slot;
268             if (input->active_stream)
269               gst_object_unref (input->active_stream);
270             input->active_stream = stream;
271             /* We have the beginning of a stream, get a multiqueue slot and link to it */
272             SELECTION_LOCK (input->dbin);
273             slot = get_slot_for_input (input->dbin, input);
274             link_input_to_slot (input, slot);
275             SELECTION_UNLOCK (input->dbin);
276           } else
277             gst_object_unref (stream);
278         }
279       }
280         break;
281       case GST_EVENT_CAPS:
282       {
283         GstCaps *caps = NULL;
284         gst_event_parse_caps (ev, &caps);
285         GST_DEBUG_OBJECT (pad, "caps %" GST_PTR_FORMAT, caps);
286         if (caps && input->active_stream)
287           gst_stream_set_caps (input->active_stream, caps);
288       }
289         break;
290       case GST_EVENT_EOS:
291         input->saw_eos = TRUE;
292         if (all_inputs_are_eos (input->dbin)) {
293           GST_DEBUG_OBJECT (pad, "real input pad, marking as EOS");
294           SELECTION_LOCK (input->dbin);
295           check_all_streams_for_eos (input->dbin);
296           SELECTION_UNLOCK (input->dbin);
297         } else {
298           GstPad *peer = gst_pad_get_peer (input->srcpad);
299           if (peer) {
300             /* Send custom-eos event to multiqueue slot */
301             GstEvent *event;
302 
303             GST_DEBUG_OBJECT (pad,
304                 "Got EOS end of input stream, post custom-eos");
305             event = gst_event_new_eos ();
306             gst_event_set_seqnum (event, gst_event_get_seqnum (ev));
307             gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (event),
308                 CUSTOM_EOS_QUARK, (gchar *) CUSTOM_EOS_QUARK_DATA, NULL);
309             gst_pad_send_event (peer, event);
310             gst_object_unref (peer);
311           } else {
312             GST_FIXME_OBJECT (pad, "No peer, what should we do ?");
313           }
314         }
315         ret = GST_PAD_PROBE_DROP;
316         break;
317       case GST_EVENT_FLUSH_STOP:
318         GST_DEBUG_OBJECT (pad, "Clear saw_eos flag");
319         input->saw_eos = FALSE;
320       default:
321         break;
322     }
323   } else if (GST_IS_QUERY (GST_PAD_PROBE_INFO_DATA (info))) {
324     GstQuery *q = GST_PAD_PROBE_INFO_QUERY (info);
325     GST_DEBUG_OBJECT (pad, "Seeing query %s", GST_QUERY_TYPE_NAME (q));
326     /* If we have a parser, we want to reply to the caps query */
327     /* FIXME: Set a flag when the input stream is created for
328      * streams where we shouldn't reply to these queries */
329     if (GST_QUERY_TYPE (q) == GST_QUERY_CAPS
330         && (info->type & GST_PAD_PROBE_TYPE_PULL)) {
331       GstCaps *filter = NULL;
332       GstCaps *allowed;
333       gst_query_parse_caps (q, &filter);
334       allowed = get_parser_caps_filter (input->dbin, filter);
335       GST_DEBUG_OBJECT (pad,
336           "Intercepting caps query, setting %" GST_PTR_FORMAT, allowed);
337       gst_query_set_caps_result (q, allowed);
338       gst_caps_unref (allowed);
339       ret = GST_PAD_PROBE_HANDLED;
340     } else if (GST_QUERY_TYPE (q) == GST_QUERY_ACCEPT_CAPS) {
341       GstCaps *prop = NULL;
342       gst_query_parse_accept_caps (q, &prop);
343       /* Fast check against target caps */
344       if (gst_caps_can_intersect (prop, input->dbin->caps))
345         gst_query_set_accept_caps_result (q, TRUE);
346       else {
347         gboolean accepted = check_parser_caps_filter (input->dbin, prop);
348         /* check against caps filter */
349         gst_query_set_accept_caps_result (q, accepted);
350         GST_DEBUG_OBJECT (pad, "ACCEPT_CAPS query, returning %d", accepted);
351       }
352       ret = GST_PAD_PROBE_HANDLED;
353     }
354   }
355 
356   return ret;
357 }
358 
359 static DecodebinInputStream *
create_input_stream(GstDecodebin3 * dbin,GstStream * stream,GstPad * pad,DecodebinInput * input)360 create_input_stream (GstDecodebin3 * dbin, GstStream * stream, GstPad * pad,
361     DecodebinInput * input)
362 {
363   DecodebinInputStream *res = g_new0 (DecodebinInputStream, 1);
364 
365   GST_DEBUG_OBJECT (pad, "Creating input stream for stream %p %s (input:%p)",
366       stream, gst_stream_get_stream_id (stream), input);
367 
368   res->dbin = dbin;
369   res->input = input;
370   res->pending_stream = gst_object_ref (stream);
371   res->srcpad = pad;
372 
373   /* Put probe on output source pad (for detecting EOS/STREAM_START/FLUSH) */
374   res->output_event_probe_id =
375       gst_pad_add_probe (pad,
376       GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM | GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM
377       | GST_PAD_PROBE_TYPE_EVENT_FLUSH,
378       (GstPadProbeCallback) parse_chain_output_probe, res, NULL);
379 
380   /* Add to list of current input streams */
381   SELECTION_LOCK (dbin);
382   dbin->input_streams = g_list_append (dbin->input_streams, res);
383   SELECTION_UNLOCK (dbin);
384   GST_DEBUG_OBJECT (pad, "Done creating input stream");
385 
386   return res;
387 }
388 
389 /* WITH SELECTION_LOCK TAKEN! */
390 static void
remove_input_stream(GstDecodebin3 * dbin,DecodebinInputStream * stream)391 remove_input_stream (GstDecodebin3 * dbin, DecodebinInputStream * stream)
392 {
393   MultiQueueSlot *slot;
394 
395   GST_DEBUG_OBJECT (dbin, "Removing input stream %p (%s)", stream,
396       stream->active_stream ? gst_stream_get_stream_id (stream->active_stream) :
397       "<NONE>");
398 
399   /* Unlink from slot */
400   if (stream->srcpad) {
401     GstPad *peer;
402     peer = gst_pad_get_peer (stream->srcpad);
403     if (peer) {
404       gst_pad_unlink (stream->srcpad, peer);
405       gst_object_unref (peer);
406     }
407   }
408 
409   slot = get_slot_for_input (dbin, stream);
410   if (slot) {
411     slot->pending_stream = NULL;
412     slot->input = NULL;
413     GST_DEBUG_OBJECT (dbin, "slot %p cleared", slot);
414   }
415 
416   if (stream->active_stream)
417     gst_object_unref (stream->active_stream);
418   if (stream->pending_stream)
419     gst_object_unref (stream->pending_stream);
420 
421   dbin->input_streams = g_list_remove (dbin->input_streams, stream);
422 
423   g_free (stream);
424 }
425 
426 static void
unblock_pending_input(DecodebinInput * input)427 unblock_pending_input (DecodebinInput * input)
428 {
429   GstDecodebin3 *dbin = input->dbin;
430   GList *tmp, *unused_slot = NULL;
431 
432   /* 1. Re-use existing streams if/when possible */
433   GST_FIXME_OBJECT (dbin, "Re-use existing input streams if/when possible");
434 
435   /* 2. Remove unused streams (push EOS) */
436   GST_DEBUG_OBJECT (dbin, "Removing unused streams");
437   SELECTION_LOCK (dbin);
438   tmp = dbin->input_streams;
439   while (tmp != NULL) {
440     DecodebinInputStream *input_stream = (DecodebinInputStream *) tmp->data;
441     GList *next = tmp->next;
442 
443     if (input_stream->input != input) {
444       tmp = next;
445       continue;
446     }
447 
448     GST_DEBUG_OBJECT (dbin, "Checking input stream %p", input_stream);
449     if (input_stream->input_buffer_probe_id) {
450       GST_DEBUG_OBJECT (dbin,
451           "Removing pad block on input %p pad %" GST_PTR_FORMAT, input_stream,
452           input_stream->srcpad);
453       gst_pad_remove_probe (input_stream->srcpad,
454           input_stream->input_buffer_probe_id);
455     }
456     input_stream->input_buffer_probe_id = 0;
457 
458     if (input_stream->saw_eos) {
459       remove_input_stream (dbin, input_stream);
460       tmp = dbin->input_streams;
461     } else
462       tmp = next;
463   }
464   SELECTION_UNLOCK (dbin);
465 
466   GST_DEBUG_OBJECT (dbin, "Creating new streams (if needed)");
467   /* 3. Create new streams */
468   for (tmp = input->pending_pads; tmp; tmp = tmp->next) {
469     GstStream *stream;
470     PendingPad *ppad = (PendingPad *) tmp->data;
471 
472     stream = gst_pad_get_stream (ppad->pad);
473     if (stream == NULL) {
474       GST_ERROR_OBJECT (dbin, "No stream for pad ????");
475     } else {
476       MultiQueueSlot *slot;
477       DecodebinInputStream *input_stream;
478       /* The remaining pads in pending_pads are the ones that require a new
479        * input stream */
480       input_stream = create_input_stream (dbin, stream, ppad->pad, ppad->input);
481       /* See if we can link it straight away */
482       input_stream->active_stream = stream;
483 
484       SELECTION_LOCK (dbin);
485       slot = get_slot_for_input (dbin, input_stream);
486       link_input_to_slot (input_stream, slot);
487       SELECTION_UNLOCK (dbin);
488 
489       /* Remove the buffer and event probe */
490       gst_pad_remove_probe (ppad->pad, ppad->buffer_probe);
491       gst_pad_remove_probe (ppad->pad, ppad->event_probe);
492       g_free (ppad);
493     }
494   }
495 
496   g_list_free (input->pending_pads);
497   input->pending_pads = NULL;
498 
499   /* Weed out unused multiqueue slots */
500   SELECTION_LOCK (dbin);
501   for (tmp = dbin->slots; tmp; tmp = tmp->next) {
502     MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
503     GST_LOG_OBJECT (dbin, "Slot %d input:%p", slot->id, slot->input);
504     if (slot->input == NULL) {
505       unused_slot =
506           g_list_append (unused_slot, gst_object_ref (slot->sink_pad));
507     }
508   }
509   SELECTION_UNLOCK (dbin);
510 
511   for (tmp = unused_slot; tmp; tmp = tmp->next) {
512     GstPad *sink_pad = (GstPad *) tmp->data;
513     GST_DEBUG_OBJECT (sink_pad, "Sending EOS to unused slot");
514     gst_pad_send_event (sink_pad, gst_event_new_eos ());
515   }
516 
517   if (unused_slot)
518     g_list_free_full (unused_slot, (GDestroyNotify) gst_object_unref);
519 
520 }
521 
522 /* FIXME : HACK, REMOVE, USE INPUT CHAINS */
523 static GstPadProbeReturn
parsebin_buffer_probe(GstPad * pad,GstPadProbeInfo * info,DecodebinInput * input)524 parsebin_buffer_probe (GstPad * pad, GstPadProbeInfo * info,
525     DecodebinInput * input)
526 {
527   /* Any data out the demuxer means it's not creating pads
528    * any more right now */
529   GST_DEBUG_OBJECT (pad, "Got a buffer ! UNBLOCK !");
530   unblock_pending_input (input);
531 
532   return GST_PAD_PROBE_OK;
533 }
534 
535 static GstPadProbeReturn
parsebin_pending_event_probe(GstPad * pad,GstPadProbeInfo * info,PendingPad * ppad)536 parsebin_pending_event_probe (GstPad * pad, GstPadProbeInfo * info,
537     PendingPad * ppad)
538 {
539   GstDecodebin3 *dbin = ppad->dbin;
540   /* We drop all events by default */
541   GstPadProbeReturn ret = GST_PAD_PROBE_DROP;
542   GstEvent *ev = GST_PAD_PROBE_INFO_EVENT (info);
543 
544   GST_DEBUG_OBJECT (pad, "Got event %p %s", ev, GST_EVENT_TYPE_NAME (ev));
545   switch (GST_EVENT_TYPE (ev)) {
546     case GST_EVENT_EOS:
547     {
548       GST_DEBUG_OBJECT (pad, "Pending pad marked as EOS, removing");
549       ppad->input->pending_pads =
550           g_list_remove (ppad->input->pending_pads, ppad);
551       gst_pad_remove_probe (ppad->pad, ppad->buffer_probe);
552       gst_pad_remove_probe (ppad->pad, ppad->event_probe);
553       g_free (ppad);
554 
555       SELECTION_LOCK (dbin);
556       check_all_streams_for_eos (dbin);
557       SELECTION_UNLOCK (dbin);
558     }
559       break;
560     case GST_EVENT_GAP:
561       GST_DEBUG_OBJECT (pad, "Got a gap event! UNBLOCK !");
562       unblock_pending_input (ppad->input);
563       ret = GST_PAD_PROBE_OK;
564       break;
565     default:
566       break;
567   }
568 
569   return ret;
570 }
571 
572 static void
parsebin_pad_added_cb(GstElement * demux,GstPad * pad,DecodebinInput * input)573 parsebin_pad_added_cb (GstElement * demux, GstPad * pad, DecodebinInput * input)
574 {
575   GstDecodebin3 *dbin = input->dbin;
576   PendingPad *ppad;
577   GList *tmp;
578 
579   GST_DEBUG_OBJECT (dbin, "New pad %s:%s (input:%p)", GST_DEBUG_PAD_NAME (pad),
580       input);
581 
582   ppad = g_new0 (PendingPad, 1);
583   ppad->dbin = dbin;
584   ppad->input = input;
585   ppad->pad = pad;
586 
587   ppad->event_probe =
588       gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM,
589       (GstPadProbeCallback) parsebin_pending_event_probe, ppad, NULL);
590   ppad->buffer_probe =
591       gst_pad_add_probe (pad,
592       GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER,
593       (GstPadProbeCallback) parsebin_buffer_probe, input, NULL);
594 
595   input->pending_pads = g_list_append (input->pending_pads, ppad);
596 
597   /* Check if all existing input streams have a buffer probe set */
598   for (tmp = dbin->input_streams; tmp; tmp = tmp->next) {
599     DecodebinInputStream *input_stream = (DecodebinInputStream *) tmp->data;
600     if (input_stream->input == input &&
601         input_stream->input_buffer_probe_id == 0) {
602       GST_DEBUG_OBJECT (input_stream->srcpad, "Adding blocking buffer probe");
603       input_stream->input_buffer_probe_id =
604           gst_pad_add_probe (input_stream->srcpad,
605           GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER,
606           (GstPadProbeCallback) parsebin_buffer_probe, input_stream->input,
607           NULL);
608     }
609   }
610 }
611 
612 static void
parsebin_pad_removed_cb(GstElement * demux,GstPad * pad,DecodebinInput * inp)613 parsebin_pad_removed_cb (GstElement * demux, GstPad * pad, DecodebinInput * inp)
614 {
615   GstDecodebin3 *dbin = inp->dbin;
616   DecodebinInputStream *input = NULL;
617   GList *tmp;
618   GST_DEBUG_OBJECT (pad, "removed");
619 
620   for (tmp = dbin->input_streams; tmp; tmp = tmp->next) {
621     DecodebinInputStream *cand = (DecodebinInputStream *) tmp->data;
622     if (cand->srcpad == pad)
623       input = cand;
624   }
625   /* If there are no pending pads, this means we will definitely not need this
626    * stream anymore */
627   if (input) {
628     GST_DEBUG_OBJECT (pad, "stream %p", input);
629     if (inp->pending_pads == NULL) {
630       MultiQueueSlot *slot;
631 
632       GST_DEBUG_OBJECT (pad, "Remove input stream %p", input);
633 
634       SELECTION_LOCK (dbin);
635       slot = get_slot_for_input (dbin, input);
636 
637       remove_input_stream (dbin, input);
638       if (slot && g_list_find (dbin->slots, slot) && slot->is_drained) {
639         /* if slot is still there and already drained, remove it in here */
640         if (slot->output) {
641           DecodebinOutputStream *output = slot->output;
642           GST_DEBUG_OBJECT (pad,
643               "Multiqueue was drained, Remove output stream");
644 
645           dbin->output_streams = g_list_remove (dbin->output_streams, output);
646           free_output_stream (dbin, output);
647         }
648         GST_DEBUG_OBJECT (pad, "No pending pad, Remove multiqueue slot");
649         if (slot->probe_id)
650           gst_pad_remove_probe (slot->src_pad, slot->probe_id);
651         slot->probe_id = 0;
652         dbin->slots = g_list_remove (dbin->slots, slot);
653         free_multiqueue_slot_async (dbin, slot);
654       }
655       SELECTION_UNLOCK (dbin);
656     } else {
657       input->srcpad = NULL;
658       if (input->input_buffer_probe_id)
659         gst_pad_remove_probe (pad, input->input_buffer_probe_id);
660       input->input_buffer_probe_id = 0;
661     }
662   }
663 }
664