• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2005 Wim Taymans <wim@fluendo.com>
4  *                    2006 Thomas Vander Stichele <thomas at apestaart dot org>
5  *                    2014 Tim-Philipp Müller <tim centricular com>
6  *               2015-2017 YouView TV Ltd, Vincent Penquerc'h <vincent.penquerch@collabora.co.uk>
7  *
8  * gstipcpipelinesink.c:
9  *
10  * This library is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU Library General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This library is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * Library General Public License for more details.
19  *
20  * You should have received a copy of the GNU Library General Public
21  * License along with this library; if not, write to the
22  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
23  * Boston, MA 02110-1301, USA.
24  */
25 /**
26  * SECTION:element-ipcpipelinesink
27  * @see_also: #GstIpcPipelineSrc, #GstIpcSlavePipeline
28  *
29  * Communicates with an ipcpipelinesrc element in another process via a socket.
30  *
 * This element, together with ipcpipelinesrc and ipcslavepipeline, forms a
 * mechanism that allows splitting a single pipeline across different processes.
33  * The main use-case for it is a playback pipeline split in two parts, where the
34  * first part contains the networking, parsing and demuxing and the second part
35  * contains the decoding and display. The intention of this split is to improve
36  * security of an application, by letting the networking, parsing and demuxing
37  * parts run in a less privileged process than the process that accesses the
38  * decoder and display.
39  *
40  * Once the pipelines in those different processes have been created, the
41  * playback can be controlled entirely from the first pipeline, which is the
42  * one that contains ipcpipelinesink. We call this pipeline the “master”.
43  * All relevant events and queries sent from the application are sent to
44  * the master pipeline and messages to the application are sent from the master
45  * pipeline. The second pipeline, in the other process, is transparently slaved.
46  *
47  * ipcpipelinesink can work only in push mode and does not synchronize buffers
48  * to the clock. Synchronization is meant to happen either at the real sink at
49  * the end of the remote slave pipeline, or not to happen at all, if the
50  * pipeline is live.
51  *
 * A master pipeline may contain more than one ipcpipelinesink element; these
 * can be connected either to the same slave pipeline or to different ones.
54  *
55  * Communication with ipcpipelinesrc on the slave happens via a socket, using a
56  * custom protocol. Each buffer, event, query, message or state change is
57  * serialized in a "packet" and sent over the socket. The sender then
58  * performs a blocking wait for a reply, if a return code is needed.
59  *
 * All objects that contain a GstStructure (messages, queries, events) are
 * serialized by serializing the GstStructure to a string
 * (gst_structure_to_string). This implies some limitations, of course.
 * All fields of these structures that are not serializable to strings (e.g.
 * object pointers) are ignored, except for some cases where custom
 * serialization may occur (e.g. error/warning/info messages that contain a
 * GError are serialized differently).
67  *
68  * Buffers are transported by writing their content directly on the socket.
69  * More efficient ways for memory sharing could be implemented in the future.
70  */
71 
72 #ifdef HAVE_CONFIG_H
73 #  include "config.h"
74 #endif
75 
76 #include "gstipcpipelineelements.h"
77 #include "gstipcpipelinesink.h"
78 
79 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
80     GST_PAD_SINK,
81     GST_PAD_ALWAYS,
82     GST_STATIC_CAPS_ANY);
83 
84 GST_DEBUG_CATEGORY_STATIC (gst_ipc_pipeline_sink_debug);
85 #define GST_CAT_DEFAULT gst_ipc_pipeline_sink_debug
86 
87 enum
88 {
89   SIGNAL_DISCONNECT,
90   /* FILL ME */
91   LAST_SIGNAL
92 };
93 static guint gst_ipc_pipeline_sink_signals[LAST_SIGNAL] = { 0 };
94 
/* Property ids handled by set_property/get_property. */
enum
{
  PROP_0 = 0,
  PROP_FDIN,                    /* "fdin" */
  PROP_FDOUT,                   /* "fdout" */
  PROP_READ_CHUNK_SIZE,         /* "read-chunk-size" */
  PROP_ACK_TIME,                /* "ack-time" */
};
103 
104 
/* Default value of the "read-chunk-size" property. */
#define DEFAULT_READ_CHUNK_SIZE 4096
/* Default value of the "ack-time" property. */
#define DEFAULT_ACK_TIME (10 * G_TIME_SPAN_SECOND)
107 
108 #define _do_init \
109     GST_DEBUG_CATEGORY_INIT (gst_ipc_pipeline_sink_debug, "ipcpipelinesink", 0, "ipcpipelinesink element");
110 #define gst_ipc_pipeline_sink_parent_class parent_class
111 G_DEFINE_TYPE_WITH_CODE (GstIpcPipelineSink, gst_ipc_pipeline_sink,
112     GST_TYPE_ELEMENT, _do_init);
113 GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (ipcpipelinesink, "ipcpipelinesink",
114     GST_RANK_NONE, GST_TYPE_IPC_PIPELINE_SINK,
115     icepipeline_element_init (plugin));
116 
117 static void gst_ipc_pipeline_sink_set_property (GObject * object, guint prop_id,
118     const GValue * value, GParamSpec * pspec);
119 static void gst_ipc_pipeline_sink_get_property (GObject * object, guint prop_id,
120     GValue * value, GParamSpec * pspec);
121 static void gst_ipc_pipeline_sink_dispose (GObject * obj);
122 static void gst_ipc_pipeline_sink_finalize (GObject * obj);
123 static gboolean gst_ipc_pipeline_sink_start_reader_thread (GstIpcPipelineSink *
124     sink);
125 static void gst_ipc_pipeline_sink_stop_reader_thread (GstIpcPipelineSink *
126     sink);
127 
128 static GstStateChangeReturn gst_ipc_pipeline_sink_change_state (GstElement *
129     element, GstStateChange transition);
130 
131 static GstFlowReturn gst_ipc_pipeline_sink_chain (GstPad * pad,
132     GstObject * parent, GstBuffer * buffer);
133 static gboolean gst_ipc_pipeline_sink_event (GstPad * pad, GstObject * parent,
134     GstEvent * event);
135 static gboolean gst_ipc_pipeline_sink_element_query (GstElement * element,
136     GstQuery * query);
137 static gboolean gst_ipc_pipeline_sink_send_event (GstElement * element,
138     GstEvent * event);
139 static gboolean gst_ipc_pipeline_sink_query (GstPad * pad, GstObject * parent,
140     GstQuery * query);
141 static gboolean gst_ipc_pipeline_sink_pad_activate_mode (GstPad * pad,
142     GstObject * parent, GstPadMode mode, gboolean active);
143 
144 
145 static void gst_ipc_pipeline_sink_disconnect (GstIpcPipelineSink * sink);
146 static void pusher (gpointer data, gpointer user_data);
147 
148 
149 static void
gst_ipc_pipeline_sink_class_init(GstIpcPipelineSinkClass * klass)150 gst_ipc_pipeline_sink_class_init (GstIpcPipelineSinkClass * klass)
151 {
152   GObjectClass *gobject_class;
153   GstElementClass *gstelement_class;
154 
155   gobject_class = G_OBJECT_CLASS (klass);
156   gstelement_class = GST_ELEMENT_CLASS (klass);
157 
158   gobject_class->set_property = gst_ipc_pipeline_sink_set_property;
159   gobject_class->get_property = gst_ipc_pipeline_sink_get_property;
160   gobject_class->dispose = gst_ipc_pipeline_sink_dispose;
161   gobject_class->finalize = gst_ipc_pipeline_sink_finalize;
162 
163   g_object_class_install_property (gobject_class, PROP_FDIN,
164       g_param_spec_int ("fdin", "Input file descriptor",
165           "File descriptor to received data from",
166           -1, 0xffff, -1, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
167   g_object_class_install_property (gobject_class, PROP_FDOUT,
168       g_param_spec_int ("fdout", "Output file descriptor",
169           "File descriptor to send data through",
170           -1, 0xffff, -1, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
171   g_object_class_install_property (gobject_class, PROP_READ_CHUNK_SIZE,
172       g_param_spec_uint ("read-chunk-size", "Read chunk size",
173           "Read chunk size",
174           1, 1 << 24, DEFAULT_READ_CHUNK_SIZE,
175           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
176   g_object_class_install_property (gobject_class, PROP_ACK_TIME,
177       g_param_spec_uint64 ("ack-time", "Ack time",
178           "Maximum time to wait for a response to a message",
179           0, G_MAXUINT64, DEFAULT_ACK_TIME,
180           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
181 
182   gst_ipc_pipeline_sink_signals[SIGNAL_DISCONNECT] =
183       g_signal_new ("disconnect",
184       G_TYPE_FROM_CLASS (klass),
185       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
186       G_STRUCT_OFFSET (GstIpcPipelineSinkClass, disconnect),
187       NULL, NULL, NULL, G_TYPE_NONE, 0);
188 
189   gst_element_class_set_static_metadata (gstelement_class,
190       "Inter-process Pipeline Sink",
191       "Sink",
192       "Allows splitting and continuing a pipeline in another process",
193       "Vincent Penquerc'h <vincent.penquerch@collabora.co.uk>");
194   gst_element_class_add_pad_template (gstelement_class,
195       gst_static_pad_template_get (&sinktemplate));
196 
197   gstelement_class->change_state =
198       GST_DEBUG_FUNCPTR (gst_ipc_pipeline_sink_change_state);
199   gstelement_class->query =
200       GST_DEBUG_FUNCPTR (gst_ipc_pipeline_sink_element_query);
201   gstelement_class->send_event =
202       GST_DEBUG_FUNCPTR (gst_ipc_pipeline_sink_send_event);
203 
204   klass->disconnect = GST_DEBUG_FUNCPTR (gst_ipc_pipeline_sink_disconnect);
205 }
206 
207 static void
gst_ipc_pipeline_sink_init(GstIpcPipelineSink * sink)208 gst_ipc_pipeline_sink_init (GstIpcPipelineSink * sink)
209 {
210   GstPadTemplate *pad_template;
211 
212   GST_OBJECT_FLAG_SET (sink, GST_ELEMENT_FLAG_SINK);
213 
214   gst_ipc_pipeline_comm_init (&sink->comm, GST_ELEMENT (sink));
215   sink->comm.read_chunk_size = DEFAULT_READ_CHUNK_SIZE;
216   sink->comm.ack_time = DEFAULT_ACK_TIME;
217   sink->comm.fdin = -1;
218   sink->comm.fdout = -1;
219   sink->threads = g_thread_pool_new (pusher, sink, -1, FALSE, NULL);
220   gst_ipc_pipeline_sink_start_reader_thread (sink);
221 
222   pad_template =
223       gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS (sink), "sink");
224   g_return_if_fail (pad_template != NULL);
225 
226   sink->sinkpad = gst_pad_new_from_template (pad_template, "sink");
227 
228   gst_pad_set_activatemode_function (sink->sinkpad,
229       gst_ipc_pipeline_sink_pad_activate_mode);
230   gst_pad_set_query_function (sink->sinkpad, gst_ipc_pipeline_sink_query);
231   gst_pad_set_event_function (sink->sinkpad, gst_ipc_pipeline_sink_event);
232   gst_pad_set_chain_function (sink->sinkpad, gst_ipc_pipeline_sink_chain);
233   gst_element_add_pad (GST_ELEMENT_CAST (sink), sink->sinkpad);
234 
235 }
236 
237 static void
gst_ipc_pipeline_sink_dispose(GObject * obj)238 gst_ipc_pipeline_sink_dispose (GObject * obj)
239 {
240   GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (obj);
241 
242   gst_ipc_pipeline_sink_stop_reader_thread (sink);
243   gst_ipc_pipeline_comm_cancel (&sink->comm, TRUE);
244 
245   G_OBJECT_CLASS (parent_class)->dispose (obj);
246 }
247 
248 static void
gst_ipc_pipeline_sink_finalize(GObject * obj)249 gst_ipc_pipeline_sink_finalize (GObject * obj)
250 {
251   GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (obj);
252 
253   gst_ipc_pipeline_comm_clear (&sink->comm);
254   g_thread_pool_free (sink->threads, TRUE, TRUE);
255 
256   G_OBJECT_CLASS (parent_class)->finalize (obj);
257 }
258 
259 static void
gst_ipc_pipeline_sink_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)260 gst_ipc_pipeline_sink_set_property (GObject * object, guint prop_id,
261     const GValue * value, GParamSpec * pspec)
262 {
263   GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (object);
264 
265   switch (prop_id) {
266     case PROP_FDIN:
267       sink->comm.fdin = g_value_get_int (value);
268       break;
269     case PROP_FDOUT:
270       sink->comm.fdout = g_value_get_int (value);
271       break;
272     case PROP_READ_CHUNK_SIZE:
273       sink->comm.read_chunk_size = g_value_get_uint (value);
274       break;
275     case PROP_ACK_TIME:
276       sink->comm.ack_time = g_value_get_uint64 (value);
277       break;
278     default:
279       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
280       break;
281   }
282 }
283 
284 static void
gst_ipc_pipeline_sink_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)285 gst_ipc_pipeline_sink_get_property (GObject * object, guint prop_id,
286     GValue * value, GParamSpec * pspec)
287 {
288   GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (object);
289 
290   switch (prop_id) {
291     case PROP_FDIN:
292       g_value_set_int (value, sink->comm.fdin);
293       break;
294     case PROP_FDOUT:
295       g_value_set_int (value, sink->comm.fdout);
296       break;
297     case PROP_READ_CHUNK_SIZE:
298       g_value_set_uint (value, sink->comm.read_chunk_size);
299       break;
300     case PROP_ACK_TIME:
301       g_value_set_uint64 (value, sink->comm.ack_time);
302       break;
303     default:
304       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
305       break;
306   }
307 }
308 
309 static gboolean
gst_ipc_pipeline_sink_event(GstPad * pad,GstObject * parent,GstEvent * event)310 gst_ipc_pipeline_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
311 {
312   GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (parent);
313   gboolean ret;
314 
315   GST_DEBUG_OBJECT (sink, "received event %p of type %s (%d)",
316       event, gst_event_type_get_name (event->type), event->type);
317 
318   ret = gst_ipc_pipeline_comm_write_event_to_fd (&sink->comm, FALSE, event);
319   gst_event_unref (event);
320   return ret;
321 }
322 
323 static GstFlowReturn
gst_ipc_pipeline_sink_chain(GstPad * pad,GstObject * parent,GstBuffer * buffer)324 gst_ipc_pipeline_sink_chain (GstPad * pad, GstObject * parent,
325     GstBuffer * buffer)
326 {
327   GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (parent);
328   GstFlowReturn ret;
329 
330   GST_DEBUG_OBJECT (sink, "Rendering buffer %" GST_PTR_FORMAT, buffer);
331 
332   ret = gst_ipc_pipeline_comm_write_buffer_to_fd (&sink->comm, buffer);
333   if (ret != GST_FLOW_OK)
334     GST_DEBUG_OBJECT (sink, "Peer result was %s", gst_flow_get_name (ret));
335 
336   gst_buffer_unref (buffer);
337   return ret;
338 }
339 
340 static gboolean
gst_ipc_pipeline_sink_query(GstPad * pad,GstObject * parent,GstQuery * query)341 gst_ipc_pipeline_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
342 {
343   GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (parent);
344   gboolean ret;
345 
346   GST_DEBUG_OBJECT (sink, "Got query %s: %" GST_PTR_FORMAT,
347       GST_QUERY_TYPE_NAME (query), query);
348 
349   switch (GST_QUERY_TYPE (query)) {
350     case GST_QUERY_ALLOCATION:
351       GST_DEBUG_OBJECT (sink, "Rejecting ALLOCATION query");
352       return FALSE;
353     case GST_QUERY_CAPS:
354     {
355       /* caps queries occur even while linking the pipeline.
356        * It is possible that the ipcpipelinesrc may not be connected at this
357        * point, so let's avoid a couple of errors... */
358       GstState state;
359       GST_OBJECT_LOCK (sink);
360       state = GST_STATE (sink);
361       GST_OBJECT_UNLOCK (sink);
362       if (state == GST_STATE_NULL)
363         return FALSE;
364     }
365     default:
366       break;
367   }
368   ret = gst_ipc_pipeline_comm_write_query_to_fd (&sink->comm, FALSE, query);
369 
370   return ret;
371 }
372 
373 static gboolean
gst_ipc_pipeline_sink_element_query(GstElement * element,GstQuery * query)374 gst_ipc_pipeline_sink_element_query (GstElement * element, GstQuery * query)
375 {
376   GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (element);
377   gboolean ret;
378 
379   GST_DEBUG_OBJECT (sink, "Got element query %s: %" GST_PTR_FORMAT,
380       GST_QUERY_TYPE_NAME (query), query);
381 
382   ret = gst_ipc_pipeline_comm_write_query_to_fd (&sink->comm, TRUE, query);
383   GST_DEBUG_OBJECT (sink, "Got query reply: %d: %" GST_PTR_FORMAT, ret, query);
384   return ret;
385 }
386 
387 static gboolean
gst_ipc_pipeline_sink_send_event(GstElement * element,GstEvent * event)388 gst_ipc_pipeline_sink_send_event (GstElement * element, GstEvent * event)
389 {
390   GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (element);
391   gboolean ret;
392 
393   GST_DEBUG_OBJECT (sink, "Got element event %s: %" GST_PTR_FORMAT,
394       GST_EVENT_TYPE_NAME (event), event);
395 
396   ret = gst_ipc_pipeline_comm_write_event_to_fd (&sink->comm, TRUE, event);
397   GST_DEBUG_OBJECT (sink, "Got event reply: %d: %" GST_PTR_FORMAT, ret, event);
398 
399   gst_event_unref (event);
400   return ret;
401 }
402 
403 
404 static gboolean
gst_ipc_pipeline_sink_pad_activate_mode(GstPad * pad,GstObject * parent,GstPadMode mode,gboolean active)405 gst_ipc_pipeline_sink_pad_activate_mode (GstPad * pad,
406     GstObject * parent, GstPadMode mode, gboolean active)
407 {
408   if (mode == GST_PAD_MODE_PULL)
409     return FALSE;
410   return TRUE;
411 }
412 
413 static void
on_buffer(guint32 id,GstBuffer * buffer,gpointer user_data)414 on_buffer (guint32 id, GstBuffer * buffer, gpointer user_data)
415 {
416   GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data);
417   GST_ERROR_OBJECT (sink,
418       "Got buffer id %u! I never knew buffers could go upstream...", id);
419   gst_buffer_unref (buffer);
420 }
421 
422 static void
pusher(gpointer data,gpointer user_data)423 pusher (gpointer data, gpointer user_data)
424 {
425   GstIpcPipelineSink *sink = user_data;
426   gboolean ret;
427   guint32 id;
428 
429   id = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT (data),
430           QUARK_ID));
431 
432   if (GST_IS_EVENT (data)) {
433     GstEvent *event = GST_EVENT (data);
434     GST_DEBUG_OBJECT (sink, "Pushing event async: %" GST_PTR_FORMAT, event);
435     ret = gst_pad_push_event (sink->sinkpad, event);
436     GST_DEBUG_OBJECT (sink, "Event pushed, return %d", ret);
437     gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&sink->comm, id, ret);
438   } else if (GST_IS_QUERY (data)) {
439     GstQuery *query = GST_QUERY (data);
440     GST_DEBUG_OBJECT (sink, "Pushing query async: %" GST_PTR_FORMAT, query);
441     ret = gst_pad_peer_query (sink->sinkpad, query);
442     GST_DEBUG_OBJECT (sink, "Query pushed, return %d", ret);
443     gst_ipc_pipeline_comm_write_query_result_to_fd (&sink->comm, id, ret,
444         query);
445     gst_query_unref (query);
446   } else {
447     GST_ERROR_OBJECT (sink, "Unsupported object type");
448   }
449   gst_object_unref (sink);
450 }
451 
452 static void
on_event(guint32 id,GstEvent * event,gboolean upstream,gpointer user_data)453 on_event (guint32 id, GstEvent * event, gboolean upstream, gpointer user_data)
454 {
455   GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data);
456 
457   if (!upstream) {
458     GST_ERROR_OBJECT (sink, "Got downstream event id %u! Not supposed to...",
459         id);
460     gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&sink->comm, id, FALSE);
461     gst_event_unref (event);
462     return;
463   }
464 
465   GST_DEBUG_OBJECT (sink, "Got event id %u: %" GST_PTR_FORMAT, id, event);
466   gst_object_ref (sink);
467   g_thread_pool_push (sink->threads, event, NULL);
468 }
469 
470 static void
on_query(guint32 id,GstQuery * query,gboolean upstream,gpointer user_data)471 on_query (guint32 id, GstQuery * query, gboolean upstream, gpointer user_data)
472 {
473   GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data);
474 
475   if (!upstream) {
476     GST_ERROR_OBJECT (sink, "Got downstream query id %u! Not supposed to...",
477         id);
478     gst_ipc_pipeline_comm_write_query_result_to_fd (&sink->comm, id, FALSE,
479         query);
480     gst_query_unref (query);
481     return;
482   }
483 
484   GST_DEBUG_OBJECT (sink, "Got query id %u: %" GST_PTR_FORMAT, id, query);
485   gst_object_ref (sink);
486   g_thread_pool_push (sink->threads, query, NULL);
487 }
488 
489 static void
on_state_change(guint32 id,GstStateChange transition,gpointer user_data)490 on_state_change (guint32 id, GstStateChange transition, gpointer user_data)
491 {
492   GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data);
493   GST_ERROR_OBJECT (sink, "Got state change id %u! Not supposed to...", id);
494 }
495 
496 static void
on_state_lost(gpointer user_data)497 on_state_lost (gpointer user_data)
498 {
499   GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data);
500 
501   GST_DEBUG_OBJECT (sink, "Got state lost notification, losing state");
502 
503   GST_OBJECT_LOCK (sink);
504   sink->pass_next_async_done = TRUE;
505   GST_OBJECT_UNLOCK (sink);
506 
507   gst_element_lost_state (GST_ELEMENT (sink));
508 }
509 
510 static void
do_async_done(GstElement * element,gpointer user_data)511 do_async_done (GstElement * element, gpointer user_data)
512 {
513   GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (element);
514   GstMessage *message = user_data;
515 
516   GST_STATE_LOCK (sink);
517   GST_OBJECT_LOCK (sink);
518   if (sink->pass_next_async_done) {
519     sink->pass_next_async_done = FALSE;
520     GST_OBJECT_UNLOCK (sink);
521     gst_element_continue_state (element, GST_STATE_CHANGE_SUCCESS);
522     GST_STATE_UNLOCK (sink);
523     gst_element_post_message (element, gst_message_ref (message));
524 
525   } else {
526     GST_OBJECT_UNLOCK (sink);
527     GST_STATE_UNLOCK (sink);
528   }
529 }
530 
531 static void
on_message(guint32 id,GstMessage * message,gpointer user_data)532 on_message (guint32 id, GstMessage * message, gpointer user_data)
533 {
534   GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data);
535 
536   GST_DEBUG_OBJECT (sink, "Got message id %u: %" GST_PTR_FORMAT, id, message);
537 
538   switch (GST_MESSAGE_TYPE (message)) {
539     case GST_MESSAGE_ASYNC_DONE:
540       GST_OBJECT_LOCK (sink);
541       if (sink->pass_next_async_done) {
542         GST_OBJECT_UNLOCK (sink);
543         gst_element_call_async (GST_ELEMENT (sink), do_async_done,
544             message, (GDestroyNotify) gst_message_unref);
545       } else {
546         GST_OBJECT_UNLOCK (sink);
547         gst_message_unref (message);
548       }
549       return;
550     default:
551       break;
552   }
553 
554   gst_element_post_message (GST_ELEMENT (sink), message);
555 }
556 
557 static gboolean
gst_ipc_pipeline_sink_start_reader_thread(GstIpcPipelineSink * sink)558 gst_ipc_pipeline_sink_start_reader_thread (GstIpcPipelineSink * sink)
559 {
560   if (!gst_ipc_pipeline_comm_start_reader_thread (&sink->comm, on_buffer,
561           on_event, on_query, on_state_change, on_state_lost, on_message,
562           sink)) {
563     GST_ERROR_OBJECT (sink, "Failed to start reader thread");
564     return FALSE;
565   }
566   return TRUE;
567 }
568 
569 static void
gst_ipc_pipeline_sink_stop_reader_thread(GstIpcPipelineSink * sink)570 gst_ipc_pipeline_sink_stop_reader_thread (GstIpcPipelineSink * sink)
571 {
572   gst_ipc_pipeline_comm_stop_reader_thread (&sink->comm);
573 }
574 
575 
576 static void
gst_ipc_pipeline_sink_disconnect(GstIpcPipelineSink * sink)577 gst_ipc_pipeline_sink_disconnect (GstIpcPipelineSink * sink)
578 {
579   GST_DEBUG_OBJECT (sink, "Disconnecting");
580   gst_ipc_pipeline_sink_stop_reader_thread (sink);
581   sink->comm.fdin = -1;
582   sink->comm.fdout = -1;
583   gst_ipc_pipeline_comm_cancel (&sink->comm, FALSE);
584   gst_ipc_pipeline_sink_start_reader_thread (sink);
585 }
586 
587 static GstStateChangeReturn
gst_ipc_pipeline_sink_change_state(GstElement * element,GstStateChange transition)588 gst_ipc_pipeline_sink_change_state (GstElement * element,
589     GstStateChange transition)
590 {
591   GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (element);
592   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
593   GstStateChangeReturn peer_ret = GST_STATE_CHANGE_SUCCESS;
594   gboolean async = FALSE;
595   gboolean down = FALSE;
596 
597   GST_DEBUG_OBJECT (sink, "Got state change request: %s -> %s",
598       gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)),
599       gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
600 
601   switch (transition) {
602     case GST_STATE_CHANGE_NULL_TO_READY:
603       if (sink->comm.fdin < 0) {
604         GST_ERROR_OBJECT (element, "Invalid fdin: %d", sink->comm.fdin);
605         return GST_STATE_CHANGE_FAILURE;
606       }
607       if (sink->comm.fdout < 0) {
608         GST_ERROR_OBJECT (element, "Invalid fdout: %d", sink->comm.fdout);
609         return GST_STATE_CHANGE_FAILURE;
610       }
611       if (!sink->comm.reader_thread) {
612         GST_ERROR_OBJECT (element, "Failed to start reader thread");
613         return GST_STATE_CHANGE_FAILURE;
614       }
615       break;
616     case GST_STATE_CHANGE_READY_TO_PAUSED:
617     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
618     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
619       /* In these transitions, it is possible that the peer returns ASYNC.
620        * We don't know that in advance, but we post async-start anyway because
621        * it needs to be delivered *before* async-done, and async-done may
622        * arrive at any point in time after we've set the state of the peer.
623        * In case the peer doesn't return ASYNC, we just post async-done
624        * ourselves and the parent GstBin takes care of matching and deleting
625        * them, so the app never gets any of these. */
626       async = TRUE;
627       break;
628     default:
629       break;
630   }
631 
632   /* downwards state change */
633   down = (GST_STATE_TRANSITION_CURRENT (transition) >=
634       GST_STATE_TRANSITION_NEXT (transition));
635 
636   if (async) {
637     GST_DEBUG_OBJECT (sink,
638         "Posting async-start for %s, will need state-change-done",
639         gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
640 
641     gst_element_post_message (GST_ELEMENT (sink),
642         gst_message_new_async_start (GST_OBJECT (sink)));
643 
644     GST_OBJECT_LOCK (sink);
645     sink->pass_next_async_done = TRUE;
646     GST_OBJECT_UNLOCK (sink);
647   }
648 
649   /* change the state of the peer first */
650   /* If the fd out is -1, we do not actually call the peer. This will happen
651      when we explicitly disconnected, and in that case we want to be able
652      to bring the element down to NULL, so it can be restarted with a new
653      slave pipeline. */
654   if (sink->comm.fdout >= 0) {
655     GST_DEBUG_OBJECT (sink, "Calling peer with state change");
656     peer_ret = gst_ipc_pipeline_comm_write_state_change_to_fd (&sink->comm,
657         transition);
658     if (peer_ret == GST_STATE_CHANGE_FAILURE && down) {
659       GST_WARNING_OBJECT (sink, "Peer returned state change failure, "
660           "but ignoring because we are going down");
661       peer_ret = GST_STATE_CHANGE_SUCCESS;
662     }
663   } else {
664     if (down) {
665       GST_WARNING_OBJECT (sink, "Not calling peer (fdout %d)",
666           sink->comm.fdout);
667       peer_ret = GST_STATE_CHANGE_SUCCESS;
668     } else {
669       GST_ERROR_OBJECT (sink, "Not calling peer (fdout %d) and failing",
670           sink->comm.fdout);
671       peer_ret = GST_STATE_CHANGE_FAILURE;
672     }
673   }
674 
675   /* chain up to the parent class to change our state, if the peer succeeded */
676   if (peer_ret != GST_STATE_CHANGE_FAILURE) {
677     ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
678 
679     if (G_UNLIKELY (ret == GST_STATE_CHANGE_FAILURE && down)) {
680       GST_WARNING_OBJECT (sink, "Parent returned state change failure, "
681           "but ignoring because we are going down");
682       ret = GST_STATE_CHANGE_SUCCESS;
683     }
684   }
685 
686   GST_DEBUG_OBJECT (sink, "For %s -> %s: Peer ret: %s, parent ret: %s",
687       gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)),
688       gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)),
689       gst_element_state_change_return_get_name (peer_ret),
690       gst_element_state_change_return_get_name (ret));
691 
692   /* now interpret the return codes */
693   if (async && peer_ret != GST_STATE_CHANGE_ASYNC) {
694     GST_DEBUG_OBJECT (sink, "Posting async-done for %s; peer wasn't ASYNC",
695         gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
696 
697     GST_OBJECT_LOCK (sink);
698     sink->pass_next_async_done = FALSE;
699     GST_OBJECT_UNLOCK (sink);
700 
701     gst_element_post_message (GST_ELEMENT (sink),
702         gst_message_new_async_done (GST_OBJECT (sink), GST_CLOCK_TIME_NONE));
703   } else if (G_UNLIKELY (!async && peer_ret == GST_STATE_CHANGE_ASYNC)) {
704     GST_WARNING_OBJECT (sink, "Transition not async but peer returned ASYNC");
705     peer_ret = GST_STATE_CHANGE_SUCCESS;
706   }
707 
708   if (peer_ret == GST_STATE_CHANGE_FAILURE || ret == GST_STATE_CHANGE_FAILURE) {
709     if (peer_ret != GST_STATE_CHANGE_FAILURE && sink->comm.fdout >= 0) {
710       /* only the parent's ret was FAILURE - revert remote changes */
711       GST_DEBUG_OBJECT (sink, "Reverting remote state change because parent "
712           "returned failure");
713       gst_ipc_pipeline_comm_write_state_change_to_fd (&sink->comm,
714           GST_STATE_TRANSITION (GST_STATE_TRANSITION_NEXT (transition),
715               GST_STATE_TRANSITION_CURRENT (transition)));
716     }
717     return GST_STATE_CHANGE_FAILURE;
718   }
719 
720   /* the parent's (GstElement) state change func won't return ASYNC or
721    * NO_PREROLL, so unless it has returned FAILURE, which we have caught above,
722    * we are not interested in its return code... just return the peer's */
723   return peer_ret;
724 }
725