1 /* GStreamer
2 * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3 * 2005 Wim Taymans <wim@fluendo.com>
4 * 2006 Thomas Vander Stichele <thomas at apestaart dot org>
5 * 2014 Tim-Philipp Müller <tim centricular com>
6 * 2015-2017 YouView TV Ltd, Vincent Penquerc'h <vincent.penquerch@collabora.co.uk>
7 *
8 * gstipcpipelinesink.c:
9 *
10 * This library is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Library General Public
12 * License as published by the Free Software Foundation; either
13 * version 2 of the License, or (at your option) any later version.
14 *
15 * This library is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 * Library General Public License for more details.
19 *
20 * You should have received a copy of the GNU Library General Public
21 * License along with this library; if not, write to the
22 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
23 * Boston, MA 02110-1301, USA.
24 */
25 /**
26 * SECTION:element-ipcpipelinesink
27 * @see_also: #GstIpcPipelineSrc, #GstIpcSlavePipeline
28 *
29 * Communicates with an ipcpipelinesrc element in another process via a socket.
30 *
31 * This element, together with ipcpipelinesrc and ipcslavepipeline form a
32 * mechanism that allows splitting a single pipeline in different processes.
33 * The main use-case for it is a playback pipeline split in two parts, where the
34 * first part contains the networking, parsing and demuxing and the second part
35 * contains the decoding and display. The intention of this split is to improve
36 * security of an application, by letting the networking, parsing and demuxing
37 * parts run in a less privileged process than the process that accesses the
38 * decoder and display.
39 *
40 * Once the pipelines in those different processes have been created, the
41 * playback can be controlled entirely from the first pipeline, which is the
42 * one that contains ipcpipelinesink. We call this pipeline the “master”.
43 * All relevant events and queries sent from the application are sent to
44 * the master pipeline and messages to the application are sent from the master
45 * pipeline. The second pipeline, in the other process, is transparently slaved.
46 *
47 * ipcpipelinesink can work only in push mode and does not synchronize buffers
48 * to the clock. Synchronization is meant to happen either at the real sink at
49 * the end of the remote slave pipeline, or not to happen at all, if the
50 * pipeline is live.
51 *
52 * A master pipeline may contain more than one ipcpipelinesink elements, which
53 * can be connected either to the same slave pipeline or to different ones.
54 *
55 * Communication with ipcpipelinesrc on the slave happens via a socket, using a
56 * custom protocol. Each buffer, event, query, message or state change is
57 * serialized in a "packet" and sent over the socket. The sender then
58 * performs a blocking wait for a reply, if a return code is needed.
59 *
60 * All objects that contain a GstStructure (messages, queries, events) are
61 * serialized by serializing the GstStructure to a string
62 * (gst_structure_to_string). This implies some limitations, of course.
63 * All fields of this structures that are not serializable to strings (ex.
64 * object pointers) are ignored, except for some cases where custom
65 * serialization may occur (ex error/warning/info messages that contain a
66 * GError are serialized differently).
67 *
68 * Buffers are transported by writing their content directly on the socket.
69 * More efficient ways for memory sharing could be implemented in the future.
70 */
71
72 #ifdef HAVE_CONFIG_H
73 # include "config.h"
74 #endif
75
76 #include "gstipcpipelineelements.h"
77 #include "gstipcpipelinesink.h"
78
79 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
80 GST_PAD_SINK,
81 GST_PAD_ALWAYS,
82 GST_STATIC_CAPS_ANY);
83
84 GST_DEBUG_CATEGORY_STATIC (gst_ipc_pipeline_sink_debug);
85 #define GST_CAT_DEFAULT gst_ipc_pipeline_sink_debug
86
87 enum
88 {
89 SIGNAL_DISCONNECT,
90 /* FILL ME */
91 LAST_SIGNAL
92 };
93 static guint gst_ipc_pipeline_sink_signals[LAST_SIGNAL] = { 0 };
94
95 enum
96 {
97 PROP_0,
98 PROP_FDIN,
99 PROP_FDOUT,
100 PROP_READ_CHUNK_SIZE,
101 PROP_ACK_TIME,
102 };
103
104
105 #define DEFAULT_READ_CHUNK_SIZE 4096
106 #define DEFAULT_ACK_TIME (10 * G_TIME_SPAN_SECOND)
107
108 #define _do_init \
109 GST_DEBUG_CATEGORY_INIT (gst_ipc_pipeline_sink_debug, "ipcpipelinesink", 0, "ipcpipelinesink element");
110 #define gst_ipc_pipeline_sink_parent_class parent_class
111 G_DEFINE_TYPE_WITH_CODE (GstIpcPipelineSink, gst_ipc_pipeline_sink,
112 GST_TYPE_ELEMENT, _do_init);
113 GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (ipcpipelinesink, "ipcpipelinesink",
114 GST_RANK_NONE, GST_TYPE_IPC_PIPELINE_SINK,
115 icepipeline_element_init (plugin));
116
117 static void gst_ipc_pipeline_sink_set_property (GObject * object, guint prop_id,
118 const GValue * value, GParamSpec * pspec);
119 static void gst_ipc_pipeline_sink_get_property (GObject * object, guint prop_id,
120 GValue * value, GParamSpec * pspec);
121 static void gst_ipc_pipeline_sink_dispose (GObject * obj);
122 static void gst_ipc_pipeline_sink_finalize (GObject * obj);
123 static gboolean gst_ipc_pipeline_sink_start_reader_thread (GstIpcPipelineSink *
124 sink);
125 static void gst_ipc_pipeline_sink_stop_reader_thread (GstIpcPipelineSink *
126 sink);
127
128 static GstStateChangeReturn gst_ipc_pipeline_sink_change_state (GstElement *
129 element, GstStateChange transition);
130
131 static GstFlowReturn gst_ipc_pipeline_sink_chain (GstPad * pad,
132 GstObject * parent, GstBuffer * buffer);
133 static gboolean gst_ipc_pipeline_sink_event (GstPad * pad, GstObject * parent,
134 GstEvent * event);
135 static gboolean gst_ipc_pipeline_sink_element_query (GstElement * element,
136 GstQuery * query);
137 static gboolean gst_ipc_pipeline_sink_send_event (GstElement * element,
138 GstEvent * event);
139 static gboolean gst_ipc_pipeline_sink_query (GstPad * pad, GstObject * parent,
140 GstQuery * query);
141 static gboolean gst_ipc_pipeline_sink_pad_activate_mode (GstPad * pad,
142 GstObject * parent, GstPadMode mode, gboolean active);
143
144
145 static void gst_ipc_pipeline_sink_disconnect (GstIpcPipelineSink * sink);
146 static void pusher (gpointer data, gpointer user_data);
147
148
149 static void
gst_ipc_pipeline_sink_class_init(GstIpcPipelineSinkClass * klass)150 gst_ipc_pipeline_sink_class_init (GstIpcPipelineSinkClass * klass)
151 {
152 GObjectClass *gobject_class;
153 GstElementClass *gstelement_class;
154
155 gobject_class = G_OBJECT_CLASS (klass);
156 gstelement_class = GST_ELEMENT_CLASS (klass);
157
158 gobject_class->set_property = gst_ipc_pipeline_sink_set_property;
159 gobject_class->get_property = gst_ipc_pipeline_sink_get_property;
160 gobject_class->dispose = gst_ipc_pipeline_sink_dispose;
161 gobject_class->finalize = gst_ipc_pipeline_sink_finalize;
162
163 g_object_class_install_property (gobject_class, PROP_FDIN,
164 g_param_spec_int ("fdin", "Input file descriptor",
165 "File descriptor to received data from",
166 -1, 0xffff, -1, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
167 g_object_class_install_property (gobject_class, PROP_FDOUT,
168 g_param_spec_int ("fdout", "Output file descriptor",
169 "File descriptor to send data through",
170 -1, 0xffff, -1, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
171 g_object_class_install_property (gobject_class, PROP_READ_CHUNK_SIZE,
172 g_param_spec_uint ("read-chunk-size", "Read chunk size",
173 "Read chunk size",
174 1, 1 << 24, DEFAULT_READ_CHUNK_SIZE,
175 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
176 g_object_class_install_property (gobject_class, PROP_ACK_TIME,
177 g_param_spec_uint64 ("ack-time", "Ack time",
178 "Maximum time to wait for a response to a message",
179 0, G_MAXUINT64, DEFAULT_ACK_TIME,
180 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
181
182 gst_ipc_pipeline_sink_signals[SIGNAL_DISCONNECT] =
183 g_signal_new ("disconnect",
184 G_TYPE_FROM_CLASS (klass),
185 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
186 G_STRUCT_OFFSET (GstIpcPipelineSinkClass, disconnect),
187 NULL, NULL, NULL, G_TYPE_NONE, 0);
188
189 gst_element_class_set_static_metadata (gstelement_class,
190 "Inter-process Pipeline Sink",
191 "Sink",
192 "Allows splitting and continuing a pipeline in another process",
193 "Vincent Penquerc'h <vincent.penquerch@collabora.co.uk>");
194 gst_element_class_add_pad_template (gstelement_class,
195 gst_static_pad_template_get (&sinktemplate));
196
197 gstelement_class->change_state =
198 GST_DEBUG_FUNCPTR (gst_ipc_pipeline_sink_change_state);
199 gstelement_class->query =
200 GST_DEBUG_FUNCPTR (gst_ipc_pipeline_sink_element_query);
201 gstelement_class->send_event =
202 GST_DEBUG_FUNCPTR (gst_ipc_pipeline_sink_send_event);
203
204 klass->disconnect = GST_DEBUG_FUNCPTR (gst_ipc_pipeline_sink_disconnect);
205 }
206
207 static void
gst_ipc_pipeline_sink_init(GstIpcPipelineSink * sink)208 gst_ipc_pipeline_sink_init (GstIpcPipelineSink * sink)
209 {
210 GstPadTemplate *pad_template;
211
212 GST_OBJECT_FLAG_SET (sink, GST_ELEMENT_FLAG_SINK);
213
214 gst_ipc_pipeline_comm_init (&sink->comm, GST_ELEMENT (sink));
215 sink->comm.read_chunk_size = DEFAULT_READ_CHUNK_SIZE;
216 sink->comm.ack_time = DEFAULT_ACK_TIME;
217 sink->comm.fdin = -1;
218 sink->comm.fdout = -1;
219 sink->threads = g_thread_pool_new (pusher, sink, -1, FALSE, NULL);
220 gst_ipc_pipeline_sink_start_reader_thread (sink);
221
222 pad_template =
223 gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS (sink), "sink");
224 g_return_if_fail (pad_template != NULL);
225
226 sink->sinkpad = gst_pad_new_from_template (pad_template, "sink");
227
228 gst_pad_set_activatemode_function (sink->sinkpad,
229 gst_ipc_pipeline_sink_pad_activate_mode);
230 gst_pad_set_query_function (sink->sinkpad, gst_ipc_pipeline_sink_query);
231 gst_pad_set_event_function (sink->sinkpad, gst_ipc_pipeline_sink_event);
232 gst_pad_set_chain_function (sink->sinkpad, gst_ipc_pipeline_sink_chain);
233 gst_element_add_pad (GST_ELEMENT_CAST (sink), sink->sinkpad);
234
235 }
236
237 static void
gst_ipc_pipeline_sink_dispose(GObject * obj)238 gst_ipc_pipeline_sink_dispose (GObject * obj)
239 {
240 GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (obj);
241
242 gst_ipc_pipeline_sink_stop_reader_thread (sink);
243 gst_ipc_pipeline_comm_cancel (&sink->comm, TRUE);
244
245 G_OBJECT_CLASS (parent_class)->dispose (obj);
246 }
247
248 static void
gst_ipc_pipeline_sink_finalize(GObject * obj)249 gst_ipc_pipeline_sink_finalize (GObject * obj)
250 {
251 GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (obj);
252
253 gst_ipc_pipeline_comm_clear (&sink->comm);
254 g_thread_pool_free (sink->threads, TRUE, TRUE);
255
256 G_OBJECT_CLASS (parent_class)->finalize (obj);
257 }
258
259 static void
gst_ipc_pipeline_sink_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)260 gst_ipc_pipeline_sink_set_property (GObject * object, guint prop_id,
261 const GValue * value, GParamSpec * pspec)
262 {
263 GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (object);
264
265 switch (prop_id) {
266 case PROP_FDIN:
267 sink->comm.fdin = g_value_get_int (value);
268 break;
269 case PROP_FDOUT:
270 sink->comm.fdout = g_value_get_int (value);
271 break;
272 case PROP_READ_CHUNK_SIZE:
273 sink->comm.read_chunk_size = g_value_get_uint (value);
274 break;
275 case PROP_ACK_TIME:
276 sink->comm.ack_time = g_value_get_uint64 (value);
277 break;
278 default:
279 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
280 break;
281 }
282 }
283
284 static void
gst_ipc_pipeline_sink_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)285 gst_ipc_pipeline_sink_get_property (GObject * object, guint prop_id,
286 GValue * value, GParamSpec * pspec)
287 {
288 GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (object);
289
290 switch (prop_id) {
291 case PROP_FDIN:
292 g_value_set_int (value, sink->comm.fdin);
293 break;
294 case PROP_FDOUT:
295 g_value_set_int (value, sink->comm.fdout);
296 break;
297 case PROP_READ_CHUNK_SIZE:
298 g_value_set_uint (value, sink->comm.read_chunk_size);
299 break;
300 case PROP_ACK_TIME:
301 g_value_set_uint64 (value, sink->comm.ack_time);
302 break;
303 default:
304 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
305 break;
306 }
307 }
308
309 static gboolean
gst_ipc_pipeline_sink_event(GstPad * pad,GstObject * parent,GstEvent * event)310 gst_ipc_pipeline_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
311 {
312 GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (parent);
313 gboolean ret;
314
315 GST_DEBUG_OBJECT (sink, "received event %p of type %s (%d)",
316 event, gst_event_type_get_name (event->type), event->type);
317
318 ret = gst_ipc_pipeline_comm_write_event_to_fd (&sink->comm, FALSE, event);
319 gst_event_unref (event);
320 return ret;
321 }
322
323 static GstFlowReturn
gst_ipc_pipeline_sink_chain(GstPad * pad,GstObject * parent,GstBuffer * buffer)324 gst_ipc_pipeline_sink_chain (GstPad * pad, GstObject * parent,
325 GstBuffer * buffer)
326 {
327 GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (parent);
328 GstFlowReturn ret;
329
330 GST_DEBUG_OBJECT (sink, "Rendering buffer %" GST_PTR_FORMAT, buffer);
331
332 ret = gst_ipc_pipeline_comm_write_buffer_to_fd (&sink->comm, buffer);
333 if (ret != GST_FLOW_OK)
334 GST_DEBUG_OBJECT (sink, "Peer result was %s", gst_flow_get_name (ret));
335
336 gst_buffer_unref (buffer);
337 return ret;
338 }
339
340 static gboolean
gst_ipc_pipeline_sink_query(GstPad * pad,GstObject * parent,GstQuery * query)341 gst_ipc_pipeline_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
342 {
343 GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (parent);
344 gboolean ret;
345
346 GST_DEBUG_OBJECT (sink, "Got query %s: %" GST_PTR_FORMAT,
347 GST_QUERY_TYPE_NAME (query), query);
348
349 switch (GST_QUERY_TYPE (query)) {
350 case GST_QUERY_ALLOCATION:
351 GST_DEBUG_OBJECT (sink, "Rejecting ALLOCATION query");
352 return FALSE;
353 case GST_QUERY_CAPS:
354 {
355 /* caps queries occur even while linking the pipeline.
356 * It is possible that the ipcpipelinesrc may not be connected at this
357 * point, so let's avoid a couple of errors... */
358 GstState state;
359 GST_OBJECT_LOCK (sink);
360 state = GST_STATE (sink);
361 GST_OBJECT_UNLOCK (sink);
362 if (state == GST_STATE_NULL)
363 return FALSE;
364 }
365 default:
366 break;
367 }
368 ret = gst_ipc_pipeline_comm_write_query_to_fd (&sink->comm, FALSE, query);
369
370 return ret;
371 }
372
373 static gboolean
gst_ipc_pipeline_sink_element_query(GstElement * element,GstQuery * query)374 gst_ipc_pipeline_sink_element_query (GstElement * element, GstQuery * query)
375 {
376 GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (element);
377 gboolean ret;
378
379 GST_DEBUG_OBJECT (sink, "Got element query %s: %" GST_PTR_FORMAT,
380 GST_QUERY_TYPE_NAME (query), query);
381
382 ret = gst_ipc_pipeline_comm_write_query_to_fd (&sink->comm, TRUE, query);
383 GST_DEBUG_OBJECT (sink, "Got query reply: %d: %" GST_PTR_FORMAT, ret, query);
384 return ret;
385 }
386
387 static gboolean
gst_ipc_pipeline_sink_send_event(GstElement * element,GstEvent * event)388 gst_ipc_pipeline_sink_send_event (GstElement * element, GstEvent * event)
389 {
390 GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (element);
391 gboolean ret;
392
393 GST_DEBUG_OBJECT (sink, "Got element event %s: %" GST_PTR_FORMAT,
394 GST_EVENT_TYPE_NAME (event), event);
395
396 ret = gst_ipc_pipeline_comm_write_event_to_fd (&sink->comm, TRUE, event);
397 GST_DEBUG_OBJECT (sink, "Got event reply: %d: %" GST_PTR_FORMAT, ret, event);
398
399 gst_event_unref (event);
400 return ret;
401 }
402
403
404 static gboolean
gst_ipc_pipeline_sink_pad_activate_mode(GstPad * pad,GstObject * parent,GstPadMode mode,gboolean active)405 gst_ipc_pipeline_sink_pad_activate_mode (GstPad * pad,
406 GstObject * parent, GstPadMode mode, gboolean active)
407 {
408 if (mode == GST_PAD_MODE_PULL)
409 return FALSE;
410 return TRUE;
411 }
412
413 static void
on_buffer(guint32 id,GstBuffer * buffer,gpointer user_data)414 on_buffer (guint32 id, GstBuffer * buffer, gpointer user_data)
415 {
416 GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data);
417 GST_ERROR_OBJECT (sink,
418 "Got buffer id %u! I never knew buffers could go upstream...", id);
419 gst_buffer_unref (buffer);
420 }
421
422 static void
pusher(gpointer data,gpointer user_data)423 pusher (gpointer data, gpointer user_data)
424 {
425 GstIpcPipelineSink *sink = user_data;
426 gboolean ret;
427 guint32 id;
428
429 id = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT (data),
430 QUARK_ID));
431
432 if (GST_IS_EVENT (data)) {
433 GstEvent *event = GST_EVENT (data);
434 GST_DEBUG_OBJECT (sink, "Pushing event async: %" GST_PTR_FORMAT, event);
435 ret = gst_pad_push_event (sink->sinkpad, event);
436 GST_DEBUG_OBJECT (sink, "Event pushed, return %d", ret);
437 gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&sink->comm, id, ret);
438 } else if (GST_IS_QUERY (data)) {
439 GstQuery *query = GST_QUERY (data);
440 GST_DEBUG_OBJECT (sink, "Pushing query async: %" GST_PTR_FORMAT, query);
441 ret = gst_pad_peer_query (sink->sinkpad, query);
442 GST_DEBUG_OBJECT (sink, "Query pushed, return %d", ret);
443 gst_ipc_pipeline_comm_write_query_result_to_fd (&sink->comm, id, ret,
444 query);
445 gst_query_unref (query);
446 } else {
447 GST_ERROR_OBJECT (sink, "Unsupported object type");
448 }
449 gst_object_unref (sink);
450 }
451
452 static void
on_event(guint32 id,GstEvent * event,gboolean upstream,gpointer user_data)453 on_event (guint32 id, GstEvent * event, gboolean upstream, gpointer user_data)
454 {
455 GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data);
456
457 if (!upstream) {
458 GST_ERROR_OBJECT (sink, "Got downstream event id %u! Not supposed to...",
459 id);
460 gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&sink->comm, id, FALSE);
461 gst_event_unref (event);
462 return;
463 }
464
465 GST_DEBUG_OBJECT (sink, "Got event id %u: %" GST_PTR_FORMAT, id, event);
466 gst_object_ref (sink);
467 g_thread_pool_push (sink->threads, event, NULL);
468 }
469
470 static void
on_query(guint32 id,GstQuery * query,gboolean upstream,gpointer user_data)471 on_query (guint32 id, GstQuery * query, gboolean upstream, gpointer user_data)
472 {
473 GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data);
474
475 if (!upstream) {
476 GST_ERROR_OBJECT (sink, "Got downstream query id %u! Not supposed to...",
477 id);
478 gst_ipc_pipeline_comm_write_query_result_to_fd (&sink->comm, id, FALSE,
479 query);
480 gst_query_unref (query);
481 return;
482 }
483
484 GST_DEBUG_OBJECT (sink, "Got query id %u: %" GST_PTR_FORMAT, id, query);
485 gst_object_ref (sink);
486 g_thread_pool_push (sink->threads, query, NULL);
487 }
488
489 static void
on_state_change(guint32 id,GstStateChange transition,gpointer user_data)490 on_state_change (guint32 id, GstStateChange transition, gpointer user_data)
491 {
492 GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data);
493 GST_ERROR_OBJECT (sink, "Got state change id %u! Not supposed to...", id);
494 }
495
496 static void
on_state_lost(gpointer user_data)497 on_state_lost (gpointer user_data)
498 {
499 GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data);
500
501 GST_DEBUG_OBJECT (sink, "Got state lost notification, losing state");
502
503 GST_OBJECT_LOCK (sink);
504 sink->pass_next_async_done = TRUE;
505 GST_OBJECT_UNLOCK (sink);
506
507 gst_element_lost_state (GST_ELEMENT (sink));
508 }
509
510 static void
do_async_done(GstElement * element,gpointer user_data)511 do_async_done (GstElement * element, gpointer user_data)
512 {
513 GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (element);
514 GstMessage *message = user_data;
515
516 GST_STATE_LOCK (sink);
517 GST_OBJECT_LOCK (sink);
518 if (sink->pass_next_async_done) {
519 sink->pass_next_async_done = FALSE;
520 GST_OBJECT_UNLOCK (sink);
521 gst_element_continue_state (element, GST_STATE_CHANGE_SUCCESS);
522 GST_STATE_UNLOCK (sink);
523 gst_element_post_message (element, gst_message_ref (message));
524
525 } else {
526 GST_OBJECT_UNLOCK (sink);
527 GST_STATE_UNLOCK (sink);
528 }
529 }
530
531 static void
on_message(guint32 id,GstMessage * message,gpointer user_data)532 on_message (guint32 id, GstMessage * message, gpointer user_data)
533 {
534 GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (user_data);
535
536 GST_DEBUG_OBJECT (sink, "Got message id %u: %" GST_PTR_FORMAT, id, message);
537
538 switch (GST_MESSAGE_TYPE (message)) {
539 case GST_MESSAGE_ASYNC_DONE:
540 GST_OBJECT_LOCK (sink);
541 if (sink->pass_next_async_done) {
542 GST_OBJECT_UNLOCK (sink);
543 gst_element_call_async (GST_ELEMENT (sink), do_async_done,
544 message, (GDestroyNotify) gst_message_unref);
545 } else {
546 GST_OBJECT_UNLOCK (sink);
547 gst_message_unref (message);
548 }
549 return;
550 default:
551 break;
552 }
553
554 gst_element_post_message (GST_ELEMENT (sink), message);
555 }
556
557 static gboolean
gst_ipc_pipeline_sink_start_reader_thread(GstIpcPipelineSink * sink)558 gst_ipc_pipeline_sink_start_reader_thread (GstIpcPipelineSink * sink)
559 {
560 if (!gst_ipc_pipeline_comm_start_reader_thread (&sink->comm, on_buffer,
561 on_event, on_query, on_state_change, on_state_lost, on_message,
562 sink)) {
563 GST_ERROR_OBJECT (sink, "Failed to start reader thread");
564 return FALSE;
565 }
566 return TRUE;
567 }
568
569 static void
gst_ipc_pipeline_sink_stop_reader_thread(GstIpcPipelineSink * sink)570 gst_ipc_pipeline_sink_stop_reader_thread (GstIpcPipelineSink * sink)
571 {
572 gst_ipc_pipeline_comm_stop_reader_thread (&sink->comm);
573 }
574
575
576 static void
gst_ipc_pipeline_sink_disconnect(GstIpcPipelineSink * sink)577 gst_ipc_pipeline_sink_disconnect (GstIpcPipelineSink * sink)
578 {
579 GST_DEBUG_OBJECT (sink, "Disconnecting");
580 gst_ipc_pipeline_sink_stop_reader_thread (sink);
581 sink->comm.fdin = -1;
582 sink->comm.fdout = -1;
583 gst_ipc_pipeline_comm_cancel (&sink->comm, FALSE);
584 gst_ipc_pipeline_sink_start_reader_thread (sink);
585 }
586
587 static GstStateChangeReturn
gst_ipc_pipeline_sink_change_state(GstElement * element,GstStateChange transition)588 gst_ipc_pipeline_sink_change_state (GstElement * element,
589 GstStateChange transition)
590 {
591 GstIpcPipelineSink *sink = GST_IPC_PIPELINE_SINK (element);
592 GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
593 GstStateChangeReturn peer_ret = GST_STATE_CHANGE_SUCCESS;
594 gboolean async = FALSE;
595 gboolean down = FALSE;
596
597 GST_DEBUG_OBJECT (sink, "Got state change request: %s -> %s",
598 gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)),
599 gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
600
601 switch (transition) {
602 case GST_STATE_CHANGE_NULL_TO_READY:
603 if (sink->comm.fdin < 0) {
604 GST_ERROR_OBJECT (element, "Invalid fdin: %d", sink->comm.fdin);
605 return GST_STATE_CHANGE_FAILURE;
606 }
607 if (sink->comm.fdout < 0) {
608 GST_ERROR_OBJECT (element, "Invalid fdout: %d", sink->comm.fdout);
609 return GST_STATE_CHANGE_FAILURE;
610 }
611 if (!sink->comm.reader_thread) {
612 GST_ERROR_OBJECT (element, "Failed to start reader thread");
613 return GST_STATE_CHANGE_FAILURE;
614 }
615 break;
616 case GST_STATE_CHANGE_READY_TO_PAUSED:
617 case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
618 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
619 /* In these transitions, it is possible that the peer returns ASYNC.
620 * We don't know that in advance, but we post async-start anyway because
621 * it needs to be delivered *before* async-done, and async-done may
622 * arrive at any point in time after we've set the state of the peer.
623 * In case the peer doesn't return ASYNC, we just post async-done
624 * ourselves and the parent GstBin takes care of matching and deleting
625 * them, so the app never gets any of these. */
626 async = TRUE;
627 break;
628 default:
629 break;
630 }
631
632 /* downwards state change */
633 down = (GST_STATE_TRANSITION_CURRENT (transition) >=
634 GST_STATE_TRANSITION_NEXT (transition));
635
636 if (async) {
637 GST_DEBUG_OBJECT (sink,
638 "Posting async-start for %s, will need state-change-done",
639 gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
640
641 gst_element_post_message (GST_ELEMENT (sink),
642 gst_message_new_async_start (GST_OBJECT (sink)));
643
644 GST_OBJECT_LOCK (sink);
645 sink->pass_next_async_done = TRUE;
646 GST_OBJECT_UNLOCK (sink);
647 }
648
649 /* change the state of the peer first */
650 /* If the fd out is -1, we do not actually call the peer. This will happen
651 when we explicitly disconnected, and in that case we want to be able
652 to bring the element down to NULL, so it can be restarted with a new
653 slave pipeline. */
654 if (sink->comm.fdout >= 0) {
655 GST_DEBUG_OBJECT (sink, "Calling peer with state change");
656 peer_ret = gst_ipc_pipeline_comm_write_state_change_to_fd (&sink->comm,
657 transition);
658 if (peer_ret == GST_STATE_CHANGE_FAILURE && down) {
659 GST_WARNING_OBJECT (sink, "Peer returned state change failure, "
660 "but ignoring because we are going down");
661 peer_ret = GST_STATE_CHANGE_SUCCESS;
662 }
663 } else {
664 if (down) {
665 GST_WARNING_OBJECT (sink, "Not calling peer (fdout %d)",
666 sink->comm.fdout);
667 peer_ret = GST_STATE_CHANGE_SUCCESS;
668 } else {
669 GST_ERROR_OBJECT (sink, "Not calling peer (fdout %d) and failing",
670 sink->comm.fdout);
671 peer_ret = GST_STATE_CHANGE_FAILURE;
672 }
673 }
674
675 /* chain up to the parent class to change our state, if the peer succeeded */
676 if (peer_ret != GST_STATE_CHANGE_FAILURE) {
677 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
678
679 if (G_UNLIKELY (ret == GST_STATE_CHANGE_FAILURE && down)) {
680 GST_WARNING_OBJECT (sink, "Parent returned state change failure, "
681 "but ignoring because we are going down");
682 ret = GST_STATE_CHANGE_SUCCESS;
683 }
684 }
685
686 GST_DEBUG_OBJECT (sink, "For %s -> %s: Peer ret: %s, parent ret: %s",
687 gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)),
688 gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)),
689 gst_element_state_change_return_get_name (peer_ret),
690 gst_element_state_change_return_get_name (ret));
691
692 /* now interpret the return codes */
693 if (async && peer_ret != GST_STATE_CHANGE_ASYNC) {
694 GST_DEBUG_OBJECT (sink, "Posting async-done for %s; peer wasn't ASYNC",
695 gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
696
697 GST_OBJECT_LOCK (sink);
698 sink->pass_next_async_done = FALSE;
699 GST_OBJECT_UNLOCK (sink);
700
701 gst_element_post_message (GST_ELEMENT (sink),
702 gst_message_new_async_done (GST_OBJECT (sink), GST_CLOCK_TIME_NONE));
703 } else if (G_UNLIKELY (!async && peer_ret == GST_STATE_CHANGE_ASYNC)) {
704 GST_WARNING_OBJECT (sink, "Transition not async but peer returned ASYNC");
705 peer_ret = GST_STATE_CHANGE_SUCCESS;
706 }
707
708 if (peer_ret == GST_STATE_CHANGE_FAILURE || ret == GST_STATE_CHANGE_FAILURE) {
709 if (peer_ret != GST_STATE_CHANGE_FAILURE && sink->comm.fdout >= 0) {
710 /* only the parent's ret was FAILURE - revert remote changes */
711 GST_DEBUG_OBJECT (sink, "Reverting remote state change because parent "
712 "returned failure");
713 gst_ipc_pipeline_comm_write_state_change_to_fd (&sink->comm,
714 GST_STATE_TRANSITION (GST_STATE_TRANSITION_NEXT (transition),
715 GST_STATE_TRANSITION_CURRENT (transition)));
716 }
717 return GST_STATE_CHANGE_FAILURE;
718 }
719
720 /* the parent's (GstElement) state change func won't return ASYNC or
721 * NO_PREROLL, so unless it has returned FAILURE, which we have caught above,
722 * we are not interested in its return code... just return the peer's */
723 return peer_ret;
724 }
725