• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2018 Centricular Ltd.
3  *   Author: Sebastian Dröge <sebastian@centricular.com>
4  *   Author: Nirbheek Chauhan <nirbheek@centricular.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 
22 /**
23  * SECTION:element-proxysink
24  * @title: proxysink
25  *
26  * Proxysink is a sink element that proxies events, queries, and buffers to
27  * another pipeline that contains a matching proxysrc element. The purpose is
28  * to allow two decoupled pipelines to function as though they are one without
29  * having to manually shuttle buffers, events, queries, etc between the two.
30  *
31  * This element also copies sticky events onto the matching proxysrc element.
32  *
33  * For example usage, see proxysrc.
34  */
35 
36 #ifdef HAVE_CONFIG_H
37 #include "config.h"
38 #endif
39 #include "gstproxysink.h"
40 #include "gstproxysrc.h"
41 #include "gstproxy-priv.h"
42 
43 #define GST_CAT_DEFAULT gst_proxy_sink_debug
44 GST_DEBUG_CATEGORY_STATIC (GST_CAT_DEFAULT);
45 
46 static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
47     GST_PAD_SINK,
48     GST_PAD_ALWAYS,
49     GST_STATIC_CAPS_ANY);
50 
51 /* We're not subclassing from basesink because we don't want any of the special
52  * handling it has for events/queries/etc. We just pass-through everything. */
53 
/* Unlike proxysrc, we don't contain any elements so our parent is GstElement */
55 #define parent_class gst_proxy_sink_parent_class
56 G_DEFINE_TYPE (GstProxySink, gst_proxy_sink, GST_TYPE_ELEMENT);
57 GST_ELEMENT_REGISTER_DEFINE (proxysink, "proxysink", GST_RANK_NONE,
58     GST_TYPE_PROXY_SINK);
59 
60 static gboolean gst_proxy_sink_sink_query (GstPad * pad, GstObject * parent,
61     GstQuery * query);
62 static GstFlowReturn gst_proxy_sink_sink_chain (GstPad * pad,
63     GstObject * parent, GstBuffer * buffer);
64 static GstFlowReturn gst_proxy_sink_sink_chain_list (GstPad * pad,
65     GstObject * parent, GstBufferList * list);
66 static gboolean gst_proxy_sink_sink_event (GstPad * pad, GstObject * parent,
67     GstEvent * event);
68 
69 static GstStateChangeReturn gst_proxy_sink_change_state (GstElement * element,
70     GstStateChange transition);
71 
72 static gboolean gst_proxy_sink_send_event (GstElement * element,
73     GstEvent * event);
74 static gboolean gst_proxy_sink_query (GstElement * element, GstQuery * query);
75 
76 static void
gst_proxy_sink_class_init(GstProxySinkClass * klass)77 gst_proxy_sink_class_init (GstProxySinkClass * klass)
78 {
79   GstElementClass *gstelement_class = (GstElementClass *) klass;
80 
81   GST_DEBUG_CATEGORY_INIT (gst_proxy_sink_debug, "proxysink", 0, "proxy sink");
82 
83   gstelement_class->change_state = gst_proxy_sink_change_state;
84   gstelement_class->send_event = gst_proxy_sink_send_event;
85   gstelement_class->query = gst_proxy_sink_query;
86 
87   gst_element_class_add_pad_template (gstelement_class,
88       gst_static_pad_template_get (&sink_template));
89 
90   gst_element_class_set_static_metadata (gstelement_class, "Proxy Sink",
91       "Sink", "Proxy source for internal process communication",
92       "Sebastian Dröge <sebastian@centricular.com>");
93 }
94 
95 static void
gst_proxy_sink_init(GstProxySink * self)96 gst_proxy_sink_init (GstProxySink * self)
97 {
98   self->sinkpad = gst_pad_new_from_static_template (&sink_template, "sink");
99   gst_pad_set_chain_function (self->sinkpad,
100       GST_DEBUG_FUNCPTR (gst_proxy_sink_sink_chain));
101   gst_pad_set_chain_list_function (self->sinkpad,
102       GST_DEBUG_FUNCPTR (gst_proxy_sink_sink_chain_list));
103   gst_pad_set_event_function (self->sinkpad,
104       GST_DEBUG_FUNCPTR (gst_proxy_sink_sink_event));
105   gst_pad_set_query_function (self->sinkpad,
106       GST_DEBUG_FUNCPTR (gst_proxy_sink_sink_query));
107   gst_element_add_pad (GST_ELEMENT (self), self->sinkpad);
108 
109   GST_OBJECT_FLAG_SET (self, GST_ELEMENT_FLAG_SINK);
110 }
111 
112 static GstStateChangeReturn
gst_proxy_sink_change_state(GstElement * element,GstStateChange transition)113 gst_proxy_sink_change_state (GstElement * element, GstStateChange transition)
114 {
115   GstElementClass *gstelement_class =
116       GST_ELEMENT_CLASS (gst_proxy_sink_parent_class);
117   GstProxySink *self = GST_PROXY_SINK (element);
118   GstStateChangeReturn ret;
119 
120   switch (transition) {
121     case GST_STATE_CHANGE_READY_TO_PAUSED:
122       self->pending_sticky_events = FALSE;
123       break;
124     default:
125       break;
126   }
127 
128   ret = gstelement_class->change_state (element, transition);
129 
130   return ret;
131 }
132 
133 static gboolean
gst_proxy_sink_send_event(GstElement * element,GstEvent * event)134 gst_proxy_sink_send_event (GstElement * element, GstEvent * event)
135 {
136   GstProxySink *self = GST_PROXY_SINK (element);
137 
138   if (GST_EVENT_IS_UPSTREAM (event)) {
139     return gst_pad_push_event (self->sinkpad, event);
140   } else {
141     gst_event_unref (event);
142     return FALSE;
143   }
144 }
145 
146 static gboolean
gst_proxy_sink_query(GstElement * element,GstQuery * query)147 gst_proxy_sink_query (GstElement * element, GstQuery * query)
148 {
149   GstProxySink *self = GST_PROXY_SINK (element);
150 
151   if (GST_QUERY_IS_UPSTREAM (query)) {
152     return gst_pad_peer_query (self->sinkpad, query);
153   } else {
154     return FALSE;
155   }
156 }
157 
158 static gboolean
gst_proxy_sink_sink_query(GstPad * pad,GstObject * parent,GstQuery * query)159 gst_proxy_sink_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
160 {
161   GstProxySink *self = GST_PROXY_SINK (parent);
162   GstProxySrc *src;
163   gboolean ret = FALSE;
164 
165   GST_LOG_OBJECT (pad, "Handling query of type '%s'",
166       gst_query_type_get_name (GST_QUERY_TYPE (query)));
167 
168   src = g_weak_ref_get (&self->proxysrc);
169   if (src) {
170     GstPad *srcpad;
171     srcpad = gst_proxy_src_get_internal_srcpad (src);
172 
173     ret = gst_pad_peer_query (srcpad, query);
174     gst_object_unref (srcpad);
175     gst_object_unref (src);
176   }
177 
178   return ret;
179 }
180 
181 typedef struct
182 {
183   GstPad *otherpad;
184   GstFlowReturn ret;
185 } CopyStickyEventsData;
186 
187 static gboolean
copy_sticky_events(G_GNUC_UNUSED GstPad * pad,GstEvent ** event,gpointer user_data)188 copy_sticky_events (G_GNUC_UNUSED GstPad * pad, GstEvent ** event,
189     gpointer user_data)
190 {
191   CopyStickyEventsData *data = user_data;
192 
193   data->ret = gst_pad_store_sticky_event (data->otherpad, *event);
194 
195   return data->ret == GST_FLOW_OK;
196 }
197 
198 static gboolean
gst_proxy_sink_sink_event(GstPad * pad,GstObject * parent,GstEvent * event)199 gst_proxy_sink_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
200 {
201   GstProxySink *self = GST_PROXY_SINK (parent);
202   GstProxySrc *src;
203   gboolean ret = FALSE;
204   gboolean sticky = GST_EVENT_IS_STICKY (event);
205 
206   GST_LOG_OBJECT (pad, "Got %s event", GST_EVENT_TYPE_NAME (event));
207 
208   if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_STOP)
209     self->pending_sticky_events = FALSE;
210 
211   src = g_weak_ref_get (&self->proxysrc);
212   if (src) {
213     GstPad *srcpad;
214     srcpad = gst_proxy_src_get_internal_srcpad (src);
215 
216     if (sticky && self->pending_sticky_events) {
217       CopyStickyEventsData data = { srcpad, GST_FLOW_OK };
218 
219       gst_pad_sticky_events_foreach (pad, copy_sticky_events, &data);
220       self->pending_sticky_events = data.ret != GST_FLOW_OK;
221     }
222 
223     ret = gst_pad_push_event (srcpad, event);
224     gst_object_unref (srcpad);
225     gst_object_unref (src);
226 
227     if (!ret && sticky) {
228       self->pending_sticky_events = TRUE;
229       ret = TRUE;
230     }
231   } else {
232     gst_event_unref (event);
233     ret = TRUE;
234   }
235 
236   return ret;
237 }
238 
239 static GstFlowReturn
gst_proxy_sink_sink_chain(GstPad * pad,GstObject * parent,GstBuffer * buffer)240 gst_proxy_sink_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
241 {
242   GstProxySink *self = GST_PROXY_SINK (parent);
243   GstProxySrc *src;
244   GstFlowReturn ret = GST_FLOW_OK;
245 
246   GST_LOG_OBJECT (pad, "Chaining buffer %p", buffer);
247 
248   src = g_weak_ref_get (&self->proxysrc);
249   if (src) {
250     GstPad *srcpad;
251     srcpad = gst_proxy_src_get_internal_srcpad (src);
252 
253     if (self->pending_sticky_events) {
254       CopyStickyEventsData data = { srcpad, GST_FLOW_OK };
255 
256       gst_pad_sticky_events_foreach (pad, copy_sticky_events, &data);
257       self->pending_sticky_events = data.ret != GST_FLOW_OK;
258     }
259 
260     ret = gst_pad_push (srcpad, buffer);
261     gst_object_unref (srcpad);
262     gst_object_unref (src);
263 
264     GST_LOG_OBJECT (pad, "Chained buffer %p: %s", buffer,
265         gst_flow_get_name (ret));
266   } else {
267     gst_buffer_unref (buffer);
268     GST_LOG_OBJECT (pad, "Dropped buffer %p: no otherpad", buffer);
269   }
270 
271   return GST_FLOW_OK;
272 }
273 
274 static GstFlowReturn
gst_proxy_sink_sink_chain_list(GstPad * pad,GstObject * parent,GstBufferList * list)275 gst_proxy_sink_sink_chain_list (GstPad * pad, GstObject * parent,
276     GstBufferList * list)
277 {
278   GstProxySink *self = GST_PROXY_SINK (parent);
279   GstProxySrc *src;
280   GstFlowReturn ret = GST_FLOW_OK;
281 
282   GST_LOG_OBJECT (pad, "Chaining buffer list %p", list);
283 
284   src = g_weak_ref_get (&self->proxysrc);
285   if (src) {
286     GstPad *srcpad;
287     srcpad = gst_proxy_src_get_internal_srcpad (src);
288 
289     if (self->pending_sticky_events) {
290       CopyStickyEventsData data = { srcpad, GST_FLOW_OK };
291 
292       gst_pad_sticky_events_foreach (pad, copy_sticky_events, &data);
293       self->pending_sticky_events = data.ret != GST_FLOW_OK;
294     }
295 
296     ret = gst_pad_push_list (srcpad, list);
297     gst_object_unref (srcpad);
298     gst_object_unref (src);
299     GST_LOG_OBJECT (pad, "Chained buffer list %p: %s", list,
300         gst_flow_get_name (ret));
301   } else {
302     gst_buffer_list_unref (list);
303     GST_LOG_OBJECT (pad, "Dropped buffer list %p: no otherpad", list);
304   }
305 
306   return GST_FLOW_OK;
307 }
308 
309 /* Wrapper function for accessing private member
310  * This can also be retrieved with gst_element_get_static_pad, but that depends
311  * on the implementation of GstProxySink */
312 GstPad *
gst_proxy_sink_get_internal_sinkpad(GstProxySink * self)313 gst_proxy_sink_get_internal_sinkpad (GstProxySink * self)
314 {
315   g_return_val_if_fail (self, NULL);
316   return gst_object_ref (self->sinkpad);
317 }
318 
319 void
gst_proxy_sink_set_proxysrc(GstProxySink * self,GstProxySrc * src)320 gst_proxy_sink_set_proxysrc (GstProxySink * self, GstProxySrc * src)
321 {
322   g_return_if_fail (self);
323   g_weak_ref_set (&self->proxysrc, src);
324 }
325