1 /* GStreamer
2 *
3 * Copyright (C) <2015> Centricular Ltd
4 * @author: Edward Hervey <edward@centricular.com>
5 * @author: Jan Schmidt <jan@centricular.com>
6 *
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Library General Public
9 * License as published by the Free Software Foundation; either
10 * version 2 of the License, or (at your option) any later version.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Library General Public License for more details.
16 *
17 * You should have received a copy of the GNU Library General Public
18 * License along with this library; if not, write to the
19 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20 * Boston, MA 02110-1301, USA.
21 */
22
23 #if 0
24 /* Not needed for now - we're including gstdecodebin3-parse.c into gstdecodebin3.c */
25 #ifdef HAVE_CONFIG_H
26 #include "config.h"
27 #endif
28
29 #include <glib.h>
30 #include <glib-object.h>
31 #include <glib/gprintf.h>
32 #include <gst/gst.h>
33 #include <gst/pbutils/pbutils.h>
34
35 #include "gstplayback.h"
36 #endif
37
38 #define CUSTOM_EOS_QUARK _custom_eos_quark_get ()
39 #define CUSTOM_EOS_QUARK_DATA "custom-eos"
40 static GQuark
_custom_eos_quark_get(void)41 _custom_eos_quark_get (void)
42 {
43 static gsize g_quark;
44
45 if (g_once_init_enter (&g_quark)) {
46 gsize quark = (gsize) g_quark_from_static_string ("decodebin3-custom-eos");
47 g_once_init_leave (&g_quark, quark);
48 }
49 return g_quark;
50 }
51
/* Streams that come from demuxers (input/upstream).
 *
 * One instance is allocated per parsebin source pad by
 * create_input_stream() and released by remove_input_stream(). */
/* FIXME : All this is hardcoded. Switch to tree of chains */
struct _DecodebinInputStream
{
  /* The decodebin3 instance this stream belongs to */
  GstDecodebin3 *dbin;
  /* Stream passed to create_input_stream(); we hold an extra ref on it,
   * dropped in remove_input_stream() */
  GstStream *pending_stream;    /* Extra ref */
  /* Stream currently flowing on srcpad (taken from the STREAM_START event
   * or from the pad itself); we own one reference when non-NULL */
  GstStream *active_stream;

  /* The input (parsebin) this stream originates from */
  DecodebinInput *input;

  /* Stored without taking a reference. Reset to NULL by
   * parsebin_pad_removed_cb() when the pad goes away while the stream is
   * kept around, so it must be NULL-checked before use */
  GstPad *srcpad;               /* From demuxer */

  /* id of the pad event probe (parse_chain_output_probe, which also sees
   * downstream queries) */
  gulong output_event_probe_id;

  /* id of the buffer blocking probe on the input (demuxer src) pad,
   * 0 when no such probe is installed */
  gulong input_buffer_probe_id;

  /* Whether we saw an EOS on input. This should be treated accordingly
   * when the stream is no longer used */
  gboolean saw_eos;
};
74
/* Forward declarations of the parsebin pad callbacks defined at the end of
 * this file.
 * NOTE(review): presumably connected to parsebin's "pad-added" and
 * "pad-removed" signals by the file that includes this one - confirm. */
static void parsebin_pad_added_cb (GstElement * demux, GstPad * pad,
    DecodebinInput * input);
static void parsebin_pad_removed_cb (GstElement * demux, GstPad * pad,
    DecodebinInput * input);
79
80 /* WITH SELECTION_LOCK TAKEN! */
81 static gboolean
pending_pads_are_eos(DecodebinInput * input)82 pending_pads_are_eos (DecodebinInput * input)
83 {
84 GList *tmp;
85
86 for (tmp = input->pending_pads; tmp; tmp = tmp->next) {
87 PendingPad *ppad = (PendingPad *) tmp->data;
88 if (ppad->saw_eos == FALSE)
89 return FALSE;
90 }
91
92 return TRUE;
93 }
94
95 /* WITH SELECTION_LOCK TAKEN! */
96 static gboolean
all_inputs_are_eos(GstDecodebin3 * dbin)97 all_inputs_are_eos (GstDecodebin3 * dbin)
98 {
99 GList *tmp;
100 /* First check input streams */
101 for (tmp = dbin->input_streams; tmp; tmp = tmp->next) {
102 DecodebinInputStream *input = (DecodebinInputStream *) tmp->data;
103 if (input->saw_eos == FALSE)
104 return FALSE;
105 }
106
107 /* Check pending pads */
108 if (!pending_pads_are_eos (dbin->main_input))
109 return FALSE;
110 for (tmp = dbin->other_inputs; tmp; tmp = tmp->next)
111 if (!pending_pads_are_eos ((DecodebinInput *) tmp->data))
112 return FALSE;
113
114 GST_DEBUG_OBJECT (dbin, "All streams are EOS");
115 return TRUE;
116 }
117
118 /* WITH SELECTION_LOCK TAKEN! */
119 static void
check_all_streams_for_eos(GstDecodebin3 * dbin)120 check_all_streams_for_eos (GstDecodebin3 * dbin)
121 {
122 GList *tmp;
123 GList *outputpads = NULL;
124
125 if (!all_inputs_are_eos (dbin))
126 return;
127
128 /* We know all streams are EOS, properly clean up everything */
129
130 /* We grab all peer pads *while* the selection lock is taken and then we will
131 push EOS downstream with the selection lock released */
132 for (tmp = dbin->input_streams; tmp; tmp = tmp->next) {
133 DecodebinInputStream *input = (DecodebinInputStream *) tmp->data;
134 GstPad *peer = gst_pad_get_peer (input->srcpad);
135
136 /* Keep a reference to the peer pad */
137 if (peer)
138 outputpads = g_list_append (outputpads, peer);
139 }
140
141 SELECTION_UNLOCK (dbin);
142 /* */
143 for (tmp = outputpads; tmp; tmp = tmp->next) {
144 GstPad *peer = (GstPad *) tmp->data;
145
146 /* Send EOS and then remove elements */
147 gst_pad_send_event (peer, gst_event_new_eos ());
148 GST_FIXME_OBJECT (peer, "Remove input stream");
149 gst_object_unref (peer);
150 }
151 SELECTION_LOCK (dbin);
152
153 g_list_free (outputpads);
154 }
155
156 /* Get the intersection of parser caps and available (sorted) decoders */
157 static GstCaps *
get_parser_caps_filter(GstDecodebin3 * dbin,GstCaps * caps)158 get_parser_caps_filter (GstDecodebin3 * dbin, GstCaps * caps)
159 {
160 GList *tmp;
161 GstCaps *filter_caps;
162
163 /* If no filter was provided, it can handle anything */
164 if (!caps || gst_caps_is_any (caps))
165 return gst_caps_new_any ();
166
167 filter_caps = gst_caps_new_empty ();
168
169 g_mutex_lock (&dbin->factories_lock);
170 gst_decode_bin_update_factories_list (dbin);
171 for (tmp = dbin->decoder_factories; tmp; tmp = tmp->next) {
172 GstElementFactory *factory = (GstElementFactory *) tmp->data;
173 GstCaps *tcaps, *intersection;
174 const GList *tmps;
175
176 GST_LOG ("Trying factory %s",
177 gst_plugin_feature_get_name (GST_PLUGIN_FEATURE (factory)));
178 for (tmps = gst_element_factory_get_static_pad_templates (factory); tmps;
179 tmps = tmps->next) {
180 GstStaticPadTemplate *st = (GstStaticPadTemplate *) tmps->data;
181 if (st->direction != GST_PAD_SINK || st->presence != GST_PAD_ALWAYS)
182 continue;
183 tcaps = gst_static_pad_template_get_caps (st);
184 intersection =
185 gst_caps_intersect_full (tcaps, caps, GST_CAPS_INTERSECT_FIRST);
186 filter_caps = gst_caps_merge (filter_caps, intersection);
187 gst_caps_unref (tcaps);
188 }
189 }
190 g_mutex_unlock (&dbin->factories_lock);
191 GST_DEBUG_OBJECT (dbin, "Got filter caps %" GST_PTR_FORMAT, filter_caps);
192 return filter_caps;
193 }
194
195 static gboolean
check_parser_caps_filter(GstDecodebin3 * dbin,GstCaps * caps)196 check_parser_caps_filter (GstDecodebin3 * dbin, GstCaps * caps)
197 {
198 GList *tmp;
199 gboolean res = FALSE;
200
201 g_mutex_lock (&dbin->factories_lock);
202 gst_decode_bin_update_factories_list (dbin);
203 for (tmp = dbin->decoder_factories; tmp; tmp = tmp->next) {
204 GstElementFactory *factory = (GstElementFactory *) tmp->data;
205 GstCaps *tcaps;
206 const GList *tmps;
207
208 GST_LOG ("Trying factory %s",
209 gst_plugin_feature_get_name (GST_PLUGIN_FEATURE (factory)));
210 for (tmps = gst_element_factory_get_static_pad_templates (factory); tmps;
211 tmps = tmps->next) {
212 GstStaticPadTemplate *st = (GstStaticPadTemplate *) tmps->data;
213 if (st->direction != GST_PAD_SINK || st->presence != GST_PAD_ALWAYS)
214 continue;
215 tcaps = gst_static_pad_template_get_caps (st);
216 if (gst_caps_can_intersect (tcaps, caps)) {
217 res = TRUE;
218 gst_caps_unref (tcaps);
219 goto beach;
220 }
221 gst_caps_unref (tcaps);
222 }
223 }
224 beach:
225 g_mutex_unlock (&dbin->factories_lock);
226 GST_DEBUG_OBJECT (dbin, "Can intersect : %d", res);
227 return res;
228 }
229
230 /* Probe on the output of a parser chain (the last
231 * src pad) */
232 static GstPadProbeReturn
parse_chain_output_probe(GstPad * pad,GstPadProbeInfo * info,DecodebinInputStream * input)233 parse_chain_output_probe (GstPad * pad, GstPadProbeInfo * info,
234 DecodebinInputStream * input)
235 {
236 GstPadProbeReturn ret = GST_PAD_PROBE_OK;
237
238 if (GST_IS_EVENT (GST_PAD_PROBE_INFO_DATA (info))) {
239 GstEvent *ev = GST_PAD_PROBE_INFO_EVENT (info);
240
241 GST_DEBUG_OBJECT (pad, "Got event %s", GST_EVENT_TYPE_NAME (ev));
242 switch (GST_EVENT_TYPE (ev)) {
243 case GST_EVENT_STREAM_START:
244 {
245 GstStream *stream = NULL;
246 guint group_id = GST_GROUP_ID_INVALID;
247
248 if (!gst_event_parse_group_id (ev, &group_id)) {
249 GST_FIXME_OBJECT (pad,
250 "Consider implementing group-id handling on stream-start event");
251 group_id = gst_util_group_id_next ();
252 }
253
254 GST_DEBUG_OBJECT (pad, "Got stream-start, group_id:%d, input %p",
255 group_id, input->input);
256 if (set_input_group_id (input->input, &group_id)) {
257 ev = gst_event_make_writable (ev);
258 gst_event_set_group_id (ev, group_id);
259 GST_PAD_PROBE_INFO_DATA (info) = ev;
260 }
261 input->saw_eos = FALSE;
262
263 gst_event_parse_stream (ev, &stream);
264 /* FIXME : Would we ever end up with a stream already set on the input ?? */
265 if (stream) {
266 if (input->active_stream != stream) {
267 MultiQueueSlot *slot;
268 if (input->active_stream)
269 gst_object_unref (input->active_stream);
270 input->active_stream = stream;
271 /* We have the beginning of a stream, get a multiqueue slot and link to it */
272 SELECTION_LOCK (input->dbin);
273 slot = get_slot_for_input (input->dbin, input);
274 link_input_to_slot (input, slot);
275 SELECTION_UNLOCK (input->dbin);
276 } else
277 gst_object_unref (stream);
278 }
279 }
280 break;
281 case GST_EVENT_CAPS:
282 {
283 GstCaps *caps = NULL;
284 gst_event_parse_caps (ev, &caps);
285 GST_DEBUG_OBJECT (pad, "caps %" GST_PTR_FORMAT, caps);
286 if (caps && input->active_stream)
287 gst_stream_set_caps (input->active_stream, caps);
288 }
289 break;
290 case GST_EVENT_EOS:
291 input->saw_eos = TRUE;
292 if (all_inputs_are_eos (input->dbin)) {
293 GST_DEBUG_OBJECT (pad, "real input pad, marking as EOS");
294 SELECTION_LOCK (input->dbin);
295 check_all_streams_for_eos (input->dbin);
296 SELECTION_UNLOCK (input->dbin);
297 } else {
298 GstPad *peer = gst_pad_get_peer (input->srcpad);
299 if (peer) {
300 /* Send custom-eos event to multiqueue slot */
301 GstEvent *event;
302
303 GST_DEBUG_OBJECT (pad,
304 "Got EOS end of input stream, post custom-eos");
305 event = gst_event_new_eos ();
306 gst_event_set_seqnum (event, gst_event_get_seqnum (ev));
307 gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (event),
308 CUSTOM_EOS_QUARK, (gchar *) CUSTOM_EOS_QUARK_DATA, NULL);
309 gst_pad_send_event (peer, event);
310 gst_object_unref (peer);
311 } else {
312 GST_FIXME_OBJECT (pad, "No peer, what should we do ?");
313 }
314 }
315 ret = GST_PAD_PROBE_DROP;
316 break;
317 case GST_EVENT_FLUSH_STOP:
318 GST_DEBUG_OBJECT (pad, "Clear saw_eos flag");
319 input->saw_eos = FALSE;
320 default:
321 break;
322 }
323 } else if (GST_IS_QUERY (GST_PAD_PROBE_INFO_DATA (info))) {
324 GstQuery *q = GST_PAD_PROBE_INFO_QUERY (info);
325 GST_DEBUG_OBJECT (pad, "Seeing query %s", GST_QUERY_TYPE_NAME (q));
326 /* If we have a parser, we want to reply to the caps query */
327 /* FIXME: Set a flag when the input stream is created for
328 * streams where we shouldn't reply to these queries */
329 if (GST_QUERY_TYPE (q) == GST_QUERY_CAPS
330 && (info->type & GST_PAD_PROBE_TYPE_PULL)) {
331 GstCaps *filter = NULL;
332 GstCaps *allowed;
333 gst_query_parse_caps (q, &filter);
334 allowed = get_parser_caps_filter (input->dbin, filter);
335 GST_DEBUG_OBJECT (pad,
336 "Intercepting caps query, setting %" GST_PTR_FORMAT, allowed);
337 gst_query_set_caps_result (q, allowed);
338 gst_caps_unref (allowed);
339 ret = GST_PAD_PROBE_HANDLED;
340 } else if (GST_QUERY_TYPE (q) == GST_QUERY_ACCEPT_CAPS) {
341 GstCaps *prop = NULL;
342 gst_query_parse_accept_caps (q, &prop);
343 /* Fast check against target caps */
344 if (gst_caps_can_intersect (prop, input->dbin->caps))
345 gst_query_set_accept_caps_result (q, TRUE);
346 else {
347 gboolean accepted = check_parser_caps_filter (input->dbin, prop);
348 /* check against caps filter */
349 gst_query_set_accept_caps_result (q, accepted);
350 GST_DEBUG_OBJECT (pad, "ACCEPT_CAPS query, returning %d", accepted);
351 }
352 ret = GST_PAD_PROBE_HANDLED;
353 }
354 }
355
356 return ret;
357 }
358
359 static DecodebinInputStream *
create_input_stream(GstDecodebin3 * dbin,GstStream * stream,GstPad * pad,DecodebinInput * input)360 create_input_stream (GstDecodebin3 * dbin, GstStream * stream, GstPad * pad,
361 DecodebinInput * input)
362 {
363 DecodebinInputStream *res = g_new0 (DecodebinInputStream, 1);
364
365 GST_DEBUG_OBJECT (pad, "Creating input stream for stream %p %s (input:%p)",
366 stream, gst_stream_get_stream_id (stream), input);
367
368 res->dbin = dbin;
369 res->input = input;
370 res->pending_stream = gst_object_ref (stream);
371 res->srcpad = pad;
372
373 /* Put probe on output source pad (for detecting EOS/STREAM_START/FLUSH) */
374 res->output_event_probe_id =
375 gst_pad_add_probe (pad,
376 GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM | GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM
377 | GST_PAD_PROBE_TYPE_EVENT_FLUSH,
378 (GstPadProbeCallback) parse_chain_output_probe, res, NULL);
379
380 /* Add to list of current input streams */
381 SELECTION_LOCK (dbin);
382 dbin->input_streams = g_list_append (dbin->input_streams, res);
383 SELECTION_UNLOCK (dbin);
384 GST_DEBUG_OBJECT (pad, "Done creating input stream");
385
386 return res;
387 }
388
389 /* WITH SELECTION_LOCK TAKEN! */
390 static void
remove_input_stream(GstDecodebin3 * dbin,DecodebinInputStream * stream)391 remove_input_stream (GstDecodebin3 * dbin, DecodebinInputStream * stream)
392 {
393 MultiQueueSlot *slot;
394
395 GST_DEBUG_OBJECT (dbin, "Removing input stream %p (%s)", stream,
396 stream->active_stream ? gst_stream_get_stream_id (stream->active_stream) :
397 "<NONE>");
398
399 /* Unlink from slot */
400 if (stream->srcpad) {
401 GstPad *peer;
402 peer = gst_pad_get_peer (stream->srcpad);
403 if (peer) {
404 gst_pad_unlink (stream->srcpad, peer);
405 gst_object_unref (peer);
406 }
407 }
408
409 slot = get_slot_for_input (dbin, stream);
410 if (slot) {
411 slot->pending_stream = NULL;
412 slot->input = NULL;
413 GST_DEBUG_OBJECT (dbin, "slot %p cleared", slot);
414 }
415
416 if (stream->active_stream)
417 gst_object_unref (stream->active_stream);
418 if (stream->pending_stream)
419 gst_object_unref (stream->pending_stream);
420
421 dbin->input_streams = g_list_remove (dbin->input_streams, stream);
422
423 g_free (stream);
424 }
425
426 static void
unblock_pending_input(DecodebinInput * input)427 unblock_pending_input (DecodebinInput * input)
428 {
429 GstDecodebin3 *dbin = input->dbin;
430 GList *tmp, *unused_slot = NULL;
431
432 /* 1. Re-use existing streams if/when possible */
433 GST_FIXME_OBJECT (dbin, "Re-use existing input streams if/when possible");
434
435 /* 2. Remove unused streams (push EOS) */
436 GST_DEBUG_OBJECT (dbin, "Removing unused streams");
437 SELECTION_LOCK (dbin);
438 tmp = dbin->input_streams;
439 while (tmp != NULL) {
440 DecodebinInputStream *input_stream = (DecodebinInputStream *) tmp->data;
441 GList *next = tmp->next;
442
443 if (input_stream->input != input) {
444 tmp = next;
445 continue;
446 }
447
448 GST_DEBUG_OBJECT (dbin, "Checking input stream %p", input_stream);
449 if (input_stream->input_buffer_probe_id) {
450 GST_DEBUG_OBJECT (dbin,
451 "Removing pad block on input %p pad %" GST_PTR_FORMAT, input_stream,
452 input_stream->srcpad);
453 gst_pad_remove_probe (input_stream->srcpad,
454 input_stream->input_buffer_probe_id);
455 }
456 input_stream->input_buffer_probe_id = 0;
457
458 if (input_stream->saw_eos) {
459 remove_input_stream (dbin, input_stream);
460 tmp = dbin->input_streams;
461 } else
462 tmp = next;
463 }
464 SELECTION_UNLOCK (dbin);
465
466 GST_DEBUG_OBJECT (dbin, "Creating new streams (if needed)");
467 /* 3. Create new streams */
468 for (tmp = input->pending_pads; tmp; tmp = tmp->next) {
469 GstStream *stream;
470 PendingPad *ppad = (PendingPad *) tmp->data;
471
472 stream = gst_pad_get_stream (ppad->pad);
473 if (stream == NULL) {
474 GST_ERROR_OBJECT (dbin, "No stream for pad ????");
475 } else {
476 MultiQueueSlot *slot;
477 DecodebinInputStream *input_stream;
478 /* The remaining pads in pending_pads are the ones that require a new
479 * input stream */
480 input_stream = create_input_stream (dbin, stream, ppad->pad, ppad->input);
481 /* See if we can link it straight away */
482 input_stream->active_stream = stream;
483
484 SELECTION_LOCK (dbin);
485 slot = get_slot_for_input (dbin, input_stream);
486 link_input_to_slot (input_stream, slot);
487 SELECTION_UNLOCK (dbin);
488
489 /* Remove the buffer and event probe */
490 gst_pad_remove_probe (ppad->pad, ppad->buffer_probe);
491 gst_pad_remove_probe (ppad->pad, ppad->event_probe);
492 g_free (ppad);
493 }
494 }
495
496 g_list_free (input->pending_pads);
497 input->pending_pads = NULL;
498
499 /* Weed out unused multiqueue slots */
500 SELECTION_LOCK (dbin);
501 for (tmp = dbin->slots; tmp; tmp = tmp->next) {
502 MultiQueueSlot *slot = (MultiQueueSlot *) tmp->data;
503 GST_LOG_OBJECT (dbin, "Slot %d input:%p", slot->id, slot->input);
504 if (slot->input == NULL) {
505 unused_slot =
506 g_list_append (unused_slot, gst_object_ref (slot->sink_pad));
507 }
508 }
509 SELECTION_UNLOCK (dbin);
510
511 for (tmp = unused_slot; tmp; tmp = tmp->next) {
512 GstPad *sink_pad = (GstPad *) tmp->data;
513 GST_DEBUG_OBJECT (sink_pad, "Sending EOS to unused slot");
514 gst_pad_send_event (sink_pad, gst_event_new_eos ());
515 }
516
517 if (unused_slot)
518 g_list_free_full (unused_slot, (GDestroyNotify) gst_object_unref);
519
520 }
521
522 /* FIXME : HACK, REMOVE, USE INPUT CHAINS */
523 static GstPadProbeReturn
parsebin_buffer_probe(GstPad * pad,GstPadProbeInfo * info,DecodebinInput * input)524 parsebin_buffer_probe (GstPad * pad, GstPadProbeInfo * info,
525 DecodebinInput * input)
526 {
527 /* Any data out the demuxer means it's not creating pads
528 * any more right now */
529 GST_DEBUG_OBJECT (pad, "Got a buffer ! UNBLOCK !");
530 unblock_pending_input (input);
531
532 return GST_PAD_PROBE_OK;
533 }
534
535 static GstPadProbeReturn
parsebin_pending_event_probe(GstPad * pad,GstPadProbeInfo * info,PendingPad * ppad)536 parsebin_pending_event_probe (GstPad * pad, GstPadProbeInfo * info,
537 PendingPad * ppad)
538 {
539 GstDecodebin3 *dbin = ppad->dbin;
540 /* We drop all events by default */
541 GstPadProbeReturn ret = GST_PAD_PROBE_DROP;
542 GstEvent *ev = GST_PAD_PROBE_INFO_EVENT (info);
543
544 GST_DEBUG_OBJECT (pad, "Got event %p %s", ev, GST_EVENT_TYPE_NAME (ev));
545 switch (GST_EVENT_TYPE (ev)) {
546 case GST_EVENT_EOS:
547 {
548 GST_DEBUG_OBJECT (pad, "Pending pad marked as EOS, removing");
549 ppad->input->pending_pads =
550 g_list_remove (ppad->input->pending_pads, ppad);
551 gst_pad_remove_probe (ppad->pad, ppad->buffer_probe);
552 gst_pad_remove_probe (ppad->pad, ppad->event_probe);
553 g_free (ppad);
554
555 SELECTION_LOCK (dbin);
556 check_all_streams_for_eos (dbin);
557 SELECTION_UNLOCK (dbin);
558 }
559 break;
560 case GST_EVENT_GAP:
561 GST_DEBUG_OBJECT (pad, "Got a gap event! UNBLOCK !");
562 unblock_pending_input (ppad->input);
563 ret = GST_PAD_PROBE_OK;
564 break;
565 default:
566 break;
567 }
568
569 return ret;
570 }
571
572 static void
parsebin_pad_added_cb(GstElement * demux,GstPad * pad,DecodebinInput * input)573 parsebin_pad_added_cb (GstElement * demux, GstPad * pad, DecodebinInput * input)
574 {
575 GstDecodebin3 *dbin = input->dbin;
576 PendingPad *ppad;
577 GList *tmp;
578
579 GST_DEBUG_OBJECT (dbin, "New pad %s:%s (input:%p)", GST_DEBUG_PAD_NAME (pad),
580 input);
581
582 ppad = g_new0 (PendingPad, 1);
583 ppad->dbin = dbin;
584 ppad->input = input;
585 ppad->pad = pad;
586
587 ppad->event_probe =
588 gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM,
589 (GstPadProbeCallback) parsebin_pending_event_probe, ppad, NULL);
590 ppad->buffer_probe =
591 gst_pad_add_probe (pad,
592 GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER,
593 (GstPadProbeCallback) parsebin_buffer_probe, input, NULL);
594
595 input->pending_pads = g_list_append (input->pending_pads, ppad);
596
597 /* Check if all existing input streams have a buffer probe set */
598 for (tmp = dbin->input_streams; tmp; tmp = tmp->next) {
599 DecodebinInputStream *input_stream = (DecodebinInputStream *) tmp->data;
600 if (input_stream->input == input &&
601 input_stream->input_buffer_probe_id == 0) {
602 GST_DEBUG_OBJECT (input_stream->srcpad, "Adding blocking buffer probe");
603 input_stream->input_buffer_probe_id =
604 gst_pad_add_probe (input_stream->srcpad,
605 GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER,
606 (GstPadProbeCallback) parsebin_buffer_probe, input_stream->input,
607 NULL);
608 }
609 }
610 }
611
612 static void
parsebin_pad_removed_cb(GstElement * demux,GstPad * pad,DecodebinInput * inp)613 parsebin_pad_removed_cb (GstElement * demux, GstPad * pad, DecodebinInput * inp)
614 {
615 GstDecodebin3 *dbin = inp->dbin;
616 DecodebinInputStream *input = NULL;
617 GList *tmp;
618 GST_DEBUG_OBJECT (pad, "removed");
619
620 for (tmp = dbin->input_streams; tmp; tmp = tmp->next) {
621 DecodebinInputStream *cand = (DecodebinInputStream *) tmp->data;
622 if (cand->srcpad == pad)
623 input = cand;
624 }
625 /* If there are no pending pads, this means we will definitely not need this
626 * stream anymore */
627 if (input) {
628 GST_DEBUG_OBJECT (pad, "stream %p", input);
629 if (inp->pending_pads == NULL) {
630 MultiQueueSlot *slot;
631
632 GST_DEBUG_OBJECT (pad, "Remove input stream %p", input);
633
634 SELECTION_LOCK (dbin);
635 slot = get_slot_for_input (dbin, input);
636
637 remove_input_stream (dbin, input);
638 if (slot && g_list_find (dbin->slots, slot) && slot->is_drained) {
639 /* if slot is still there and already drained, remove it in here */
640 if (slot->output) {
641 DecodebinOutputStream *output = slot->output;
642 GST_DEBUG_OBJECT (pad,
643 "Multiqueue was drained, Remove output stream");
644
645 dbin->output_streams = g_list_remove (dbin->output_streams, output);
646 free_output_stream (dbin, output);
647 }
648 GST_DEBUG_OBJECT (pad, "No pending pad, Remove multiqueue slot");
649 if (slot->probe_id)
650 gst_pad_remove_probe (slot->src_pad, slot->probe_id);
651 slot->probe_id = 0;
652 dbin->slots = g_list_remove (dbin->slots, slot);
653 free_multiqueue_slot_async (dbin, slot);
654 }
655 SELECTION_UNLOCK (dbin);
656 } else {
657 input->srcpad = NULL;
658 if (input->input_buffer_probe_id)
659 gst_pad_remove_probe (pad, input->input_buffer_probe_id);
660 input->input_buffer_probe_id = 0;
661 }
662 }
663 }
664