• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer Stream Combiner
2  * Copyright (C) 2010 Edward Hervey <edward.hervey@collabora.co.uk>
3  *           (C) 2009 Nokia Corporation
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20 
21 #ifdef HAVE_CONFIG_H
22 #include "config.h"
23 #endif
24 
25 #include <gst/video/video.h>
26 #include "gststreamcombiner.h"
27 #include "gststreamcombinerpad.h"
28 
29 static GstStaticPadTemplate src_template =
30 GST_STATIC_PAD_TEMPLATE ("src", GST_PAD_SRC, GST_PAD_ALWAYS,
31     GST_STATIC_CAPS_ANY);
32 
33 static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink_%u",
34     GST_PAD_SINK,
35     GST_PAD_REQUEST,
36     GST_STATIC_CAPS_ANY);
37 
38 GST_DEBUG_CATEGORY_STATIC (gst_stream_combiner_debug);
39 #define GST_CAT_DEFAULT gst_stream_combiner_debug
40 
41 G_DEFINE_TYPE (GstStreamCombiner, gst_stream_combiner, GST_TYPE_ELEMENT);
42 
43 G_DEFINE_TYPE (GstStreamCombinerPad, gst_stream_combiner_pad, GST_TYPE_PAD);
44 
/* Take / release the element's stream lock (the `lock` mutex member).
 * The argument is parenthesized so that any pointer-valued expression can be
 * passed without operator-precedence surprises. */
#define STREAMS_LOCK(obj) (g_mutex_lock(&(obj)->lock))
#define STREAMS_UNLOCK(obj) (g_mutex_unlock(&(obj)->lock))
47 
48 static void gst_stream_combiner_finalize (GObject * object);
49 
50 static GstPad *gst_stream_combiner_request_new_pad (GstElement * element,
51     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
52 static void gst_stream_combiner_release_pad (GstElement * element,
53     GstPad * pad);
54 
55 static void
gst_stream_combiner_pad_class_init(GstStreamCombinerPadClass * klass)56 gst_stream_combiner_pad_class_init (GstStreamCombinerPadClass * klass)
57 {
58   return;
59 }
60 
61 static void
gst_stream_combiner_pad_init(GstStreamCombinerPad * mixerpad)62 gst_stream_combiner_pad_init (GstStreamCombinerPad * mixerpad)
63 {
64   mixerpad->is_eos = FALSE;
65 }
66 
67 static void
gst_stream_combiner_class_init(GstStreamCombinerClass * klass)68 gst_stream_combiner_class_init (GstStreamCombinerClass * klass)
69 {
70   GObjectClass *gobject_klass;
71   GstElementClass *gstelement_klass;
72 
73   gobject_klass = (GObjectClass *) klass;
74   gstelement_klass = (GstElementClass *) klass;
75 
76   gobject_klass->finalize = gst_stream_combiner_finalize;
77 
78   GST_DEBUG_CATEGORY_INIT (gst_stream_combiner_debug, "streamcombiner", 0,
79       "Stream Combiner");
80 
81   gst_element_class_add_static_pad_template (gstelement_klass, &src_template);
82   gst_element_class_add_static_pad_template (gstelement_klass, &sink_template);
83 
84   gstelement_klass->request_new_pad =
85       GST_DEBUG_FUNCPTR (gst_stream_combiner_request_new_pad);
86   gstelement_klass->release_pad =
87       GST_DEBUG_FUNCPTR (gst_stream_combiner_release_pad);
88 
89   gst_element_class_set_static_metadata (gstelement_klass,
90       "streamcombiner", "Generic",
91       "Recombines streams split by the streamsplitter element",
92       "Edward Hervey <edward.hervey@collabora.co.uk>");
93 }
94 
95 static void
gst_stream_combiner_finalize(GObject * object)96 gst_stream_combiner_finalize (GObject * object)
97 {
98   GstStreamCombiner *stream_combiner = (GstStreamCombiner *) object;
99 
100   g_mutex_clear (&stream_combiner->lock);
101 
102   G_OBJECT_CLASS (gst_stream_combiner_parent_class)->finalize (object);
103 }
104 
105 static GstFlowReturn
gst_stream_combiner_chain(GstPad * pad,GstObject * parent,GstBuffer * buf)106 gst_stream_combiner_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
107 {
108   GstStreamCombiner *stream_combiner = (GstStreamCombiner *) parent;
109   /* FIXME : IMPLEMENT */
110 
111   /* with lock taken, check if we're the active stream, if not drop */
112   return gst_pad_push (stream_combiner->srcpad, buf);
113 }
114 
115 static gboolean
_all_sink_pads_eos(GstStreamCombiner * combiner)116 _all_sink_pads_eos (GstStreamCombiner * combiner)
117 {
118   GList *tmp;
119 
120   for (tmp = combiner->sinkpads; tmp; tmp = tmp->next) {
121     if (!(GST_STREAM_COMBINER_PAD (tmp->data))->is_eos)
122       return FALSE;
123   }
124 
125   return TRUE;
126 }
127 
128 static gboolean
gst_stream_combiner_sink_event(GstPad * pad,GstObject * parent,GstEvent * event)129 gst_stream_combiner_sink_event (GstPad * pad, GstObject * parent,
130     GstEvent * event)
131 {
132   GstStreamCombiner *stream_combiner = (GstStreamCombiner *) parent;
133   GstStreamCombinerPad *combiner_pad = GST_STREAM_COMBINER_PAD (pad);
134   /* FIXME : IMPLEMENT */
135 
136   GST_DEBUG_OBJECT (pad, "Got event %s", GST_EVENT_TYPE_NAME (event));
137 
138   switch (GST_EVENT_TYPE (event)) {
139     case GST_EVENT_CUSTOM_DOWNSTREAM:
140       STREAMS_LOCK (stream_combiner);
141       if (gst_structure_has_name (gst_event_get_structure (event),
142               "start-draining-encoder")) {
143         GST_INFO_OBJECT (pad, "Starting to drain the encoder");
144         stream_combiner->draining_encoder = TRUE;
145       }
146       STREAMS_UNLOCK (stream_combiner);
147       break;
148     case GST_EVENT_FLUSH_START:
149       STREAMS_LOCK (stream_combiner);
150       if (stream_combiner->draining_encoder) {
151         GST_INFO_OBJECT (pad, "Discarding FLUSH_START as draining encoder");
152         gst_clear_event (&event);
153       }
154       STREAMS_UNLOCK (stream_combiner);
155       break;
156     case GST_EVENT_FLUSH_STOP:
157       STREAMS_LOCK (stream_combiner);
158       if (stream_combiner->draining_encoder) {
159         gst_clear_event (&event);
160         GST_INFO_OBJECT (stream_combiner, "Done draining the encoder.");
161       }
162       stream_combiner->draining_encoder = FALSE;
163       STREAMS_UNLOCK (stream_combiner);
164       break;
165     case GST_EVENT_EOS:
166       STREAMS_LOCK (stream_combiner);
167       if (stream_combiner->draining_encoder) {
168         STREAMS_UNLOCK (stream_combiner);
169         GST_INFO_OBJECT (stream_combiner, "Discarding EOS as draining encoder");
170         gst_clear_event (&event);
171         break;
172       }
173       combiner_pad->is_eos = TRUE;
174       if (!_all_sink_pads_eos (stream_combiner)) {
175         gst_event_unref (event);
176         event = NULL;
177       } else {
178         GST_DEBUG_OBJECT (stream_combiner, "All sink pads eos, pushing eos");
179       }
180       STREAMS_UNLOCK (stream_combiner);
181       break;
182     default:
183       break;
184   }
185 
186   /* SEGMENT : lock, wait for other stream to EOS, select stream, unlock, push */
187   /* FLUSH_START : lock, mark as flushing, unlock. if wasn't flushing forward */
188   /* FLUSH_STOP : lock, unmark as flushing, unlock, if was flushing forward */
189   /* OTHER : if selected pad forward */
190   if (event)
191     return gst_pad_push_event (stream_combiner->srcpad, event);
192   return FALSE;
193 }
194 
195 static gboolean
gst_stream_combiner_sink_query(GstPad * pad,GstObject * parent,GstQuery * query)196 gst_stream_combiner_sink_query (GstPad * pad, GstObject * parent,
197     GstQuery * query)
198 {
199   GstStreamCombiner *stream_combiner = (GstStreamCombiner *) parent;
200 
201   return gst_pad_peer_query (stream_combiner->srcpad, query);
202 }
203 
204 static gboolean
gst_stream_combiner_src_event(GstPad * pad,GstObject * parent,GstEvent * event)205 gst_stream_combiner_src_event (GstPad * pad, GstObject * parent,
206     GstEvent * event)
207 {
208   GstStreamCombiner *stream_combiner = (GstStreamCombiner *) parent;
209   GstPad *sinkpad = NULL;
210 
211   /* Forward force-key-unit event to all sinkpads */
212   if (gst_video_event_is_force_key_unit (event))
213     return gst_pad_event_default (pad, parent, event);
214 
215   STREAMS_LOCK (stream_combiner);
216   if (stream_combiner->current)
217     sinkpad = stream_combiner->current;
218   else if (stream_combiner->sinkpads)
219     sinkpad = (GstPad *) stream_combiner->sinkpads->data;
220   STREAMS_UNLOCK (stream_combiner);
221 
222   if (sinkpad)
223     /* Forward upstream as is */
224     return gst_pad_push_event (sinkpad, event);
225 
226   return FALSE;
227 }
228 
229 static gboolean
gst_stream_combiner_src_query(GstPad * pad,GstObject * parent,GstQuery * query)230 gst_stream_combiner_src_query (GstPad * pad, GstObject * parent,
231     GstQuery * query)
232 {
233   GstStreamCombiner *stream_combiner = (GstStreamCombiner *) parent;
234   GstPad *sinkpad = NULL;
235   gboolean ret = FALSE;
236 
237   switch (GST_QUERY_TYPE (query)) {
238     case GST_QUERY_CAPS:
239       ret = gst_pad_query_default (pad, parent, query);
240       break;
241     default:
242       STREAMS_LOCK (stream_combiner);
243       if (stream_combiner->current)
244         sinkpad = stream_combiner->current;
245       else if (stream_combiner->sinkpads)
246         sinkpad = (GstPad *) stream_combiner->sinkpads->data;
247       STREAMS_UNLOCK (stream_combiner);
248 
249       if (sinkpad)
250         /* Forward upstream as is */
251         ret = gst_pad_peer_query (sinkpad, query);
252       break;
253   }
254   return ret;
255 }
256 
257 static void
gst_stream_combiner_init(GstStreamCombiner * stream_combiner)258 gst_stream_combiner_init (GstStreamCombiner * stream_combiner)
259 {
260   stream_combiner->srcpad =
261       gst_pad_new_from_static_template (&src_template, "src");
262   gst_pad_set_event_function (stream_combiner->srcpad,
263       gst_stream_combiner_src_event);
264   gst_pad_set_query_function (stream_combiner->srcpad,
265       gst_stream_combiner_src_query);
266   gst_element_add_pad (GST_ELEMENT (stream_combiner), stream_combiner->srcpad);
267 
268   g_mutex_init (&stream_combiner->lock);
269 }
270 
271 static GstPad *
gst_stream_combiner_request_new_pad(GstElement * element,GstPadTemplate * templ,const gchar * name,const GstCaps * caps)272 gst_stream_combiner_request_new_pad (GstElement * element,
273     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
274 {
275   GstStreamCombiner *stream_combiner = (GstStreamCombiner *) element;
276   GstStreamCombinerPad *combiner_pad;
277   GstPad *sinkpad;
278   GstElementClass *klass = GST_ELEMENT_GET_CLASS (element);
279   GstPadTemplate *template =
280       gst_element_class_get_pad_template (klass, "sink_%u");
281 
282   GST_DEBUG_OBJECT (element, "templ:%p, name:%s", templ, name);
283 
284   combiner_pad = g_object_new (GST_TYPE_STREAM_COMBINER_PAD, "name", name,
285       "template", template, "direction", template->direction, NULL);
286 
287   sinkpad = GST_PAD_CAST (combiner_pad);
288   gst_pad_set_chain_function (sinkpad, gst_stream_combiner_chain);
289   gst_pad_set_event_function (sinkpad, gst_stream_combiner_sink_event);
290   gst_pad_set_query_function (sinkpad, gst_stream_combiner_sink_query);
291 
292   STREAMS_LOCK (stream_combiner);
293   stream_combiner->sinkpads =
294       g_list_append (stream_combiner->sinkpads, sinkpad);
295   gst_pad_set_active (sinkpad, TRUE);
296   gst_element_add_pad (element, sinkpad);
297   stream_combiner->cookie++;
298   STREAMS_UNLOCK (stream_combiner);
299 
300   GST_DEBUG_OBJECT (element, "Returning pad %p", sinkpad);
301 
302   return sinkpad;
303 }
304 
305 static void
gst_stream_combiner_release_pad(GstElement * element,GstPad * pad)306 gst_stream_combiner_release_pad (GstElement * element, GstPad * pad)
307 {
308   GstStreamCombiner *stream_combiner = (GstStreamCombiner *) element;
309   GList *tmp;
310 
311   GST_DEBUG_OBJECT (element, "pad %s:%s", GST_DEBUG_PAD_NAME (pad));
312 
313   STREAMS_LOCK (stream_combiner);
314   tmp = g_list_find (stream_combiner->sinkpads, pad);
315   if (tmp) {
316     GstPad *pad = (GstPad *) tmp->data;
317 
318     stream_combiner->sinkpads =
319         g_list_delete_link (stream_combiner->sinkpads, tmp);
320     stream_combiner->cookie++;
321 
322     if (pad == stream_combiner->current) {
323       /* Deactivate current flow */
324       GST_DEBUG_OBJECT (element, "Removed pad was the current one");
325       stream_combiner->current = NULL;
326     }
327     GST_DEBUG_OBJECT (element, "Removing pad from ourself");
328     gst_element_remove_pad (element, pad);
329   }
330   STREAMS_UNLOCK (stream_combiner);
331 
332   return;
333 }
334