• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2000 Wim Taymans <wim@fluendo.com>
4  *                    2006 Thomas Vander Stichele <thomas at apestaart dot org>
5  *               2015-2017 YouView TV Ltd, Vincent Penquerc'h <vincent.penquerch@collabora.co.uk>
6  *
7  * gstipcpipelinesrc.c:
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22  * Boston, MA 02110-1301, USA.
23  */
24 /**
25  * SECTION:element-ipcpipelinesrc
26  * @see_also: #GstIpcPipelineSink, #GstIpcSlavePipeline
27  *
28  * Communicates with an ipcpipelinesink element in another process via a socket.
29  *
30  * The ipcpipelinesrc element allows 2-way communication with an ipcpipelinesink
31  * element on another process/pipeline. It is meant to run inside an
32  * interslavepipeline and when paired with an ipcpipelinesink, it will slave its
33  * whole parent pipeline to the "master" one, which contains the ipcpipelinesink.
34  *
35  * For more details about this mechanism and its uses, see the documentation
36  * of the ipcpipelinesink element.
37  */
38 
39 #ifdef HAVE_CONFIG_H
40 #  include "config.h"
41 #endif
42 
43 #include "gstipcpipelineelements.h"
44 #include "gstipcpipelinesrc.h"
45 
46 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
47     GST_PAD_SRC,
48     GST_PAD_ALWAYS,
49     GST_STATIC_CAPS_ANY);
50 
51 GST_DEBUG_CATEGORY_STATIC (gst_ipc_pipeline_src_debug);
52 #define GST_CAT_DEFAULT gst_ipc_pipeline_src_debug
53 
54 enum
55 {
56   /* FILL ME */
57   SIGNAL_FORWARD_MESSAGE,
58   SIGNAL_DISCONNECT,
59   LAST_SIGNAL
60 };
61 static guint gst_ipc_pipeline_src_signals[LAST_SIGNAL] = { 0 };
62 
/* GObject property ids; PROP_0 is the reserved invalid id. */
enum
{
  PROP_0,
  PROP_FDIN,                    /* "fdin" */
  PROP_FDOUT,                   /* "fdout" */
  PROP_READ_CHUNK_SIZE,         /* "read-chunk-size" */
  PROP_ACK_TIME,                /* "ack-time" */
  PROP_LAST
};
72 
73 static GQuark QUARK_UPSTREAM;
74 
75 #define DEFAULT_READ_CHUNK_SIZE 65536
76 #define DEFAULT_ACK_TIME (10 * G_TIME_SPAN_SECOND)
77 
78 #define _do_init \
79     GST_DEBUG_CATEGORY_INIT (gst_ipc_pipeline_src_debug, "ipcpipelinesrc", 0, "ipcpipelinesrc element");
80 #define gst_ipc_pipeline_src_parent_class parent_class
81 G_DEFINE_TYPE_WITH_CODE (GstIpcPipelineSrc, gst_ipc_pipeline_src,
82     GST_TYPE_ELEMENT, _do_init);
83 GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (ipcpipelinesrc, "ipcpipelinesrc",
84     GST_RANK_NONE, GST_TYPE_IPC_PIPELINE_SRC,
85     icepipeline_element_init (plugin));
86 
87 static void gst_ipc_pipeline_src_finalize (GObject * object);
88 static void gst_ipc_pipeline_src_dispose (GObject * object);
89 static void gst_ipc_pipeline_src_set_property (GObject * object, guint prop_id,
90     const GValue * value, GParamSpec * pspec);
91 static void gst_ipc_pipeline_src_get_property (GObject * object, guint prop_id,
92     GValue * value, GParamSpec * pspec);
93 
94 static void gst_ipc_pipeline_src_cancel_queued (GstIpcPipelineSrc * src);
95 
96 static gboolean gst_ipc_pipeline_src_start_reader_thread (GstIpcPipelineSrc *
97     src);
98 static void gst_ipc_pipeline_src_stop_reader_thread (GstIpcPipelineSrc * src);
99 
100 static gboolean gst_ipc_pipeline_src_activate_mode (GstPad * pad,
101     GstObject * parent, GstPadMode mode, gboolean active);
102 static gboolean gst_ipc_pipeline_src_srcpad_event (GstPad * pad,
103     GstObject * parent, GstEvent * event);
104 static gboolean gst_ipc_pipeline_src_srcpad_query (GstPad * pad,
105     GstObject * parent, GstQuery * query);
106 static void gst_ipc_pipeline_src_loop (GstIpcPipelineSrc * src);
107 
108 static gboolean gst_ipc_pipeline_src_send_event (GstElement * element,
109     GstEvent * event);
110 static gboolean gst_ipc_pipeline_src_query (GstElement * element,
111     GstQuery * query);
112 static GstStateChangeReturn gst_ipc_pipeline_src_change_state (GstElement *
113     element, GstStateChange transition);
114 
115 static gboolean gst_ipc_pipeline_src_forward_message (GstIpcPipelineSrc * src,
116     GstMessage * msg);
117 static void gst_ipc_pipeline_src_disconnect (GstIpcPipelineSrc * src);
118 
119 static void
gst_ipc_pipeline_src_class_init(GstIpcPipelineSrcClass * klass)120 gst_ipc_pipeline_src_class_init (GstIpcPipelineSrcClass * klass)
121 {
122   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
123   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
124 
125   QUARK_UPSTREAM = g_quark_from_static_string ("ipcpipeline-upstream");
126 
127   gobject_class->dispose = gst_ipc_pipeline_src_dispose;
128   gobject_class->finalize = gst_ipc_pipeline_src_finalize;
129 
130   gobject_class->set_property = gst_ipc_pipeline_src_set_property;
131   gobject_class->get_property = gst_ipc_pipeline_src_get_property;
132 
133   gstelement_class->send_event =
134       GST_DEBUG_FUNCPTR (gst_ipc_pipeline_src_send_event);
135   gstelement_class->query = GST_DEBUG_FUNCPTR (gst_ipc_pipeline_src_query);
136   gstelement_class->change_state =
137       GST_DEBUG_FUNCPTR (gst_ipc_pipeline_src_change_state);
138 
139   klass->forward_message =
140       GST_DEBUG_FUNCPTR (gst_ipc_pipeline_src_forward_message);
141   klass->disconnect = GST_DEBUG_FUNCPTR (gst_ipc_pipeline_src_disconnect);
142 
143   GST_DEBUG_REGISTER_FUNCPTR (gst_ipc_pipeline_src_activate_mode);
144   GST_DEBUG_REGISTER_FUNCPTR (gst_ipc_pipeline_src_srcpad_event);
145   GST_DEBUG_REGISTER_FUNCPTR (gst_ipc_pipeline_src_srcpad_query);
146 
147   g_object_class_install_property (gobject_class, PROP_FDIN,
148       g_param_spec_int ("fdin", "Input file descriptor",
149           "File descriptor to read data from",
150           -1, 0xffff, -1, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
151   g_object_class_install_property (gobject_class, PROP_FDOUT,
152       g_param_spec_int ("fdout", "Output file descriptor",
153           "File descriptor to write data through",
154           -1, 0xffff, -1, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
155   g_object_class_install_property (gobject_class, PROP_READ_CHUNK_SIZE,
156       g_param_spec_uint ("read-chunk-size", "Read chunk size",
157           "Read chunk size",
158           1, 1 << 24, DEFAULT_READ_CHUNK_SIZE,
159           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
160   g_object_class_install_property (gobject_class, PROP_ACK_TIME,
161       g_param_spec_uint64 ("ack-time", "Ack time",
162           "Maximum time to wait for a response to a message",
163           0, G_MAXUINT64, DEFAULT_ACK_TIME,
164           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
165 
166   gst_ipc_pipeline_src_signals[SIGNAL_FORWARD_MESSAGE] =
167       g_signal_new ("forward-message", G_TYPE_FROM_CLASS (klass),
168       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
169       G_STRUCT_OFFSET (GstIpcPipelineSrcClass, forward_message), NULL, NULL,
170       NULL, G_TYPE_BOOLEAN, 1, GST_TYPE_MESSAGE);
171 
172   gst_ipc_pipeline_src_signals[SIGNAL_DISCONNECT] =
173       g_signal_new ("disconnect", G_TYPE_FROM_CLASS (klass),
174       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
175       G_STRUCT_OFFSET (GstIpcPipelineSrcClass, disconnect), NULL, NULL, NULL,
176       G_TYPE_NONE, 0);
177 
178   gst_element_class_set_static_metadata (gstelement_class,
179       "Inter-process Pipeline Source",
180       "Source",
181       "Continues a split pipeline from another process",
182       "Vincent Penquerc'h <vincent.penquerch@collabora.co.uk>");
183   gst_element_class_add_pad_template (gstelement_class,
184       gst_static_pad_template_get (&srctemplate));
185 }
186 
187 static void
gst_ipc_pipeline_src_init(GstIpcPipelineSrc * src)188 gst_ipc_pipeline_src_init (GstIpcPipelineSrc * src)
189 {
190   GST_OBJECT_FLAG_SET (src, GST_ELEMENT_FLAG_SOURCE);
191 
192   gst_ipc_pipeline_comm_init (&src->comm, GST_ELEMENT (src));
193   src->comm.read_chunk_size = DEFAULT_READ_CHUNK_SIZE;
194   src->comm.ack_time = DEFAULT_ACK_TIME;
195   src->flushing = TRUE;
196   src->last_ret = GST_FLOW_FLUSHING;
197   src->queued = NULL;
198   g_cond_init (&src->create_cond);
199 
200   src->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
201   gst_pad_set_activatemode_function (src->srcpad,
202       gst_ipc_pipeline_src_activate_mode);
203   gst_pad_set_event_function (src->srcpad, gst_ipc_pipeline_src_srcpad_event);
204   gst_pad_set_query_function (src->srcpad, gst_ipc_pipeline_src_srcpad_query);
205   gst_element_add_pad (GST_ELEMENT (src), src->srcpad);
206 
207   gst_ipc_pipeline_src_start_reader_thread (src);
208 }
209 
210 static void
gst_ipc_pipeline_src_dispose(GObject * object)211 gst_ipc_pipeline_src_dispose (GObject * object)
212 {
213   GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (object);
214 
215   gst_ipc_pipeline_src_stop_reader_thread (src);
216   gst_ipc_pipeline_src_cancel_queued (src);
217   gst_ipc_pipeline_comm_cancel (&src->comm, TRUE);
218 
219   G_OBJECT_CLASS (parent_class)->dispose (object);
220 }
221 
222 static void
gst_ipc_pipeline_src_finalize(GObject * object)223 gst_ipc_pipeline_src_finalize (GObject * object)
224 {
225   GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (object);
226 
227   gst_ipc_pipeline_comm_clear (&src->comm);
228   g_cond_clear (&src->create_cond);
229 
230   G_OBJECT_CLASS (parent_class)->finalize (object);
231 }
232 
233 static void
gst_ipc_pipeline_src_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)234 gst_ipc_pipeline_src_set_property (GObject * object, guint prop_id,
235     const GValue * value, GParamSpec * pspec)
236 {
237   GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (object);
238 
239   switch (prop_id) {
240     case PROP_FDIN:
241       src->comm.fdin = g_value_get_int (value);
242       break;
243     case PROP_FDOUT:
244       src->comm.fdout = g_value_get_int (value);
245       break;
246     case PROP_READ_CHUNK_SIZE:
247       src->comm.read_chunk_size = g_value_get_uint (value);
248       break;
249     case PROP_ACK_TIME:
250       src->comm.ack_time = g_value_get_uint64 (value);
251       break;
252     default:
253       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
254       break;
255   }
256 }
257 
258 static void
gst_ipc_pipeline_src_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)259 gst_ipc_pipeline_src_get_property (GObject * object, guint prop_id,
260     GValue * value, GParamSpec * pspec)
261 {
262   GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (object);
263 
264   g_return_if_fail (GST_IS_IPC_PIPELINE_SRC (object));
265 
266   switch (prop_id) {
267     case PROP_FDIN:
268       g_value_set_int (value, src->comm.fdin);
269       break;
270     case PROP_FDOUT:
271       g_value_set_int (value, src->comm.fdout);
272       break;
273     case PROP_READ_CHUNK_SIZE:
274       g_value_set_uint (value, src->comm.read_chunk_size);
275       break;
276     case PROP_ACK_TIME:
277       g_value_set_uint64 (value, src->comm.ack_time);
278       break;
279     default:
280       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
281       break;
282   }
283 }
284 
285 static void
gst_ipc_pipeline_src_log_queue(GstIpcPipelineSrc * src)286 gst_ipc_pipeline_src_log_queue (GstIpcPipelineSrc * src)
287 {
288   GList *queued;
289   guint n;
290 
291   queued = src->queued;
292   n = 0;
293   GST_LOG_OBJECT (src, "There are %u objects in the queue",
294       g_list_length (queued));
295   while (queued) {
296     void *object = queued->data;
297     if (GST_IS_EVENT (object)) {
298       GST_LOG_OBJECT (src, "  #%u: %s event", n, GST_EVENT_TYPE_NAME (object));
299     } else if (GST_IS_QUERY (object)) {
300       GST_LOG_OBJECT (src, "  #%u: %s query", n, GST_QUERY_TYPE_NAME (object));
301     } else if (GST_IS_BUFFER (object)) {
302       GST_LOG_OBJECT (src, "  #%u: %" G_GSIZE_FORMAT " bytes buffer", n,
303           gst_buffer_get_size (object));
304     } else {
305       GST_LOG_OBJECT (src, "  #%u: unknown item in queue", n);
306     }
307     queued = queued->next;
308     ++n;
309   }
310 }
311 
312 static void
gst_ipc_pipeline_src_cancel_queued(GstIpcPipelineSrc * src)313 gst_ipc_pipeline_src_cancel_queued (GstIpcPipelineSrc * src)
314 {
315   GList *queued;
316   guint32 id;
317 
318   g_mutex_lock (&src->comm.mutex);
319   queued = src->queued;
320   src->queued = NULL;
321   g_cond_broadcast (&src->create_cond);
322   g_mutex_unlock (&src->comm.mutex);
323 
324   while (queued) {
325     void *object = queued->data;
326 
327     id = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT (object),
328             QUARK_ID));
329 
330     queued = g_list_delete_link (queued, queued);
331     if (GST_IS_EVENT (object)) {
332       GstEvent *event = GST_EVENT (object);
333       GST_DEBUG_OBJECT (src, "Cancelling queued event: %" GST_PTR_FORMAT,
334           event);
335       gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&src->comm, id, FALSE);
336       gst_event_unref (event);
337     } else if (GST_IS_BUFFER (object)) {
338       GstBuffer *buffer = GST_BUFFER (object);
339       GST_DEBUG_OBJECT (src, "Cancelling queued buffer: %" GST_PTR_FORMAT,
340           buffer);
341       gst_ipc_pipeline_comm_write_flow_ack_to_fd (&src->comm, id,
342           GST_FLOW_FLUSHING);
343       gst_buffer_unref (buffer);
344     } else if (GST_IS_QUERY (object)) {
345       GstQuery *query = GST_QUERY (object);
346       GST_DEBUG_OBJECT (src, "Cancelling queued query: %" GST_PTR_FORMAT,
347           query);
348       gst_ipc_pipeline_comm_write_query_result_to_fd (&src->comm, id, FALSE,
349           query);
350       gst_query_unref (query);
351     }
352   }
353 
354 }
355 
356 static void
gst_ipc_pipeline_src_start_loop(GstIpcPipelineSrc * src)357 gst_ipc_pipeline_src_start_loop (GstIpcPipelineSrc * src)
358 {
359   g_mutex_lock (&src->comm.mutex);
360   src->flushing = FALSE;
361   src->last_ret = GST_FLOW_OK;
362   g_mutex_unlock (&src->comm.mutex);
363 
364   gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_ipc_pipeline_src_loop,
365       src, NULL);
366 }
367 
368 static void
gst_ipc_pipeline_src_stop_loop(GstIpcPipelineSrc * src,gboolean stop)369 gst_ipc_pipeline_src_stop_loop (GstIpcPipelineSrc * src, gboolean stop)
370 {
371   g_mutex_lock (&src->comm.mutex);
372   src->flushing = TRUE;
373   g_cond_broadcast (&src->create_cond);
374   g_mutex_unlock (&src->comm.mutex);
375 
376   if (stop)
377     gst_pad_stop_task (src->srcpad);
378 }
379 
380 static gboolean
gst_ipc_pipeline_src_activate_mode(GstPad * pad,GstObject * parent,GstPadMode mode,gboolean active)381 gst_ipc_pipeline_src_activate_mode (GstPad * pad, GstObject * parent,
382     GstPadMode mode, gboolean active)
383 {
384   GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (parent);
385 
386   switch (mode) {
387     case GST_PAD_MODE_PUSH:
388       GST_DEBUG_OBJECT (pad, "%s in push mode", active ? "activating" :
389           "deactivating");
390       if (active) {
391         gst_ipc_pipeline_src_start_loop (src);
392       } else {
393         gst_ipc_pipeline_src_stop_loop (src, TRUE);
394         gst_ipc_pipeline_comm_cancel (&src->comm, FALSE);
395       }
396       return TRUE;
397     default:
398       GST_DEBUG_OBJECT (pad, "unsupported activation mode");
399       return FALSE;
400   }
401 }
402 
403 static gboolean
gst_ipc_pipeline_src_srcpad_event(GstPad * pad,GstObject * parent,GstEvent * event)404 gst_ipc_pipeline_src_srcpad_event (GstPad * pad, GstObject * parent,
405     GstEvent * event)
406 {
407   GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (parent);
408   gboolean ret;
409 
410   GST_DEBUG_OBJECT (src, "Got upstream event %s", GST_EVENT_TYPE_NAME (event));
411 
412   ret = gst_ipc_pipeline_comm_write_event_to_fd (&src->comm, TRUE, event);
413   gst_event_unref (event);
414 
415   GST_DEBUG_OBJECT (src, "Returning event result: %d", ret);
416   return ret;
417 }
418 
419 static gboolean
gst_ipc_pipeline_src_srcpad_query(GstPad * pad,GstObject * parent,GstQuery * query)420 gst_ipc_pipeline_src_srcpad_query (GstPad * pad, GstObject * parent,
421     GstQuery * query)
422 {
423   GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (parent);
424   gboolean ret;
425 
426   /* answer some queries that do not make sense to be forwarded */
427   switch (GST_QUERY_TYPE (query)) {
428     case GST_QUERY_LATENCY:
429       return TRUE;
430     case GST_QUERY_CONTEXT:
431       return FALSE;
432     case GST_QUERY_CAPS:
433     {
434       /* caps queries occur even while linking the pipeline.
435        * It is possible that the ipcpipelinesink may not be connected at this
436        * point, so let's avoid a couple of errors... */
437       GstState state;
438       GST_OBJECT_LOCK (src);
439       state = GST_STATE (src);
440       GST_OBJECT_UNLOCK (src);
441       if (state == GST_STATE_NULL)
442         return FALSE;
443     }
444     default:
445       break;
446   }
447 
448   GST_DEBUG_OBJECT (src, "Got upstream query %s: %" GST_PTR_FORMAT,
449       GST_QUERY_TYPE_NAME (query), query);
450 
451   ret = gst_ipc_pipeline_comm_write_query_to_fd (&src->comm, TRUE, query);
452 
453   GST_DEBUG_OBJECT (src, "Returning query result: %d, %" GST_PTR_FORMAT,
454       ret, query);
455   return ret;
456 }
457 
458 static void
gst_ipc_pipeline_src_loop(GstIpcPipelineSrc * src)459 gst_ipc_pipeline_src_loop (GstIpcPipelineSrc * src)
460 {
461   gpointer object;
462   guint32 id;
463   gboolean ok;
464   GstFlowReturn ret = GST_FLOW_OK;
465 
466   g_mutex_lock (&src->comm.mutex);
467 
468   while (!src->queued && !src->flushing)
469     g_cond_wait (&src->create_cond, &src->comm.mutex);
470 
471   if (src->flushing)
472     goto out;
473 
474   object = src->queued->data;
475   src->queued = g_list_delete_link (src->queued, src->queued);
476   g_mutex_unlock (&src->comm.mutex);
477 
478   id = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT (object),
479           QUARK_ID));
480 
481   if (GST_IS_BUFFER (object)) {
482     GstBuffer *buf = GST_BUFFER (object);
483     GST_DEBUG_OBJECT (src, "Pushing queued buffer: %" GST_PTR_FORMAT, buf);
484     ret = gst_pad_push (src->srcpad, buf);
485     GST_DEBUG_OBJECT (src, "pushed id %u, ret: %s", id,
486         gst_flow_get_name (ret));
487     gst_ipc_pipeline_comm_write_flow_ack_to_fd (&src->comm, id, ret);
488   } else if (GST_IS_EVENT (object)) {
489     GstEvent *event = GST_EVENT (object);
490     GST_DEBUG_OBJECT (src, "Pushing queued event: %" GST_PTR_FORMAT, event);
491     ok = gst_pad_push_event (src->srcpad, event);
492     gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&src->comm, id, ok);
493   } else if (GST_IS_QUERY (object)) {
494     GstQuery *query = GST_QUERY (object);
495     GST_DEBUG_OBJECT (src, "Pushing queued query: %" GST_PTR_FORMAT, query);
496     ok = gst_pad_peer_query (src->srcpad, query);
497     gst_ipc_pipeline_comm_write_query_result_to_fd (&src->comm, id, ok, query);
498     gst_query_unref (query);
499   } else {
500     GST_WARNING_OBJECT (src, "Unknown data type queued");
501   }
502 
503   g_mutex_lock (&src->comm.mutex);
504   if (!src->queued)
505     g_cond_broadcast (&src->create_cond);
506 out:
507   if (src->flushing)
508     ret = GST_FLOW_FLUSHING;
509   if (ret != GST_FLOW_OK)
510     src->last_ret = ret;
511   g_mutex_unlock (&src->comm.mutex);
512 
513   if (ret == GST_FLOW_FLUSHING) {
514     gst_ipc_pipeline_src_cancel_queued (src);
515     gst_pad_pause_task (src->srcpad);
516   }
517 }
518 
519 static gboolean
gst_ipc_pipeline_src_send_event(GstElement * element,GstEvent * event)520 gst_ipc_pipeline_src_send_event (GstElement * element, GstEvent * event)
521 {
522   GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (element);
523   return gst_pad_push_event (src->srcpad, event);
524 }
525 
526 static gboolean
gst_ipc_pipeline_src_query(GstElement * element,GstQuery * query)527 gst_ipc_pipeline_src_query (GstElement * element, GstQuery * query)
528 {
529   GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (element);
530   return gst_pad_query (src->srcpad, query);
531 }
532 
533 static GstElement *
find_pipeline(GstElement * element)534 find_pipeline (GstElement * element)
535 {
536   GstElement *pipeline = element;
537   while (GST_ELEMENT_PARENT (pipeline)) {
538     pipeline = GST_ELEMENT_PARENT (pipeline);
539     if (GST_IS_PIPELINE (pipeline))
540       break;
541   }
542   if (!pipeline || !GST_IS_PIPELINE (pipeline)) {
543     pipeline = NULL;
544   }
545   return pipeline;
546 }
547 
548 static gboolean
gst_ipc_pipeline_src_forward_message(GstIpcPipelineSrc * src,GstMessage * msg)549 gst_ipc_pipeline_src_forward_message (GstIpcPipelineSrc * src, GstMessage * msg)
550 {
551   gboolean skip = FALSE;
552 
553   GST_DEBUG_OBJECT (src, "Message to forward: %" GST_PTR_FORMAT, msg);
554 
555   switch (GST_MESSAGE_TYPE (msg)) {
556     case GST_MESSAGE_STATE_CHANGED:
557     {
558       GstState old, new, pending;
559       GstElement *pipeline = find_pipeline (GST_ELEMENT (src));
560 
561       gst_message_parse_state_changed (msg, &old, &new, &pending);
562 
563       if (GST_MESSAGE_SRC (msg) == GST_OBJECT (pipeline) &&
564           old == new && new == pending) {
565         GST_DEBUG_OBJECT (src, "Detected lost state, notifying master");
566         gst_ipc_pipeline_comm_write_state_lost_to_fd (&src->comm);
567       }
568       /* fall through & skip */
569     }
570     case GST_MESSAGE_ASYNC_START:
571     case GST_MESSAGE_CLOCK_PROVIDE:
572     case GST_MESSAGE_CLOCK_LOST:
573     case GST_MESSAGE_NEW_CLOCK:
574     case GST_MESSAGE_STREAM_STATUS:
575     case GST_MESSAGE_NEED_CONTEXT:
576     case GST_MESSAGE_HAVE_CONTEXT:
577     case GST_MESSAGE_STRUCTURE_CHANGE:
578       skip = TRUE;
579       break;
580     case GST_MESSAGE_RESET_TIME:
581     {
582       GQuark ipcpipelinesrc_posted = g_quark_from_static_string
583           ("gstinterslavepipeline-message-already-posted");
584 
585       skip = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT (msg),
586               ipcpipelinesrc_posted));
587       if (!skip) {
588         gst_mini_object_set_qdata (GST_MINI_OBJECT (msg), ipcpipelinesrc_posted,
589             GUINT_TO_POINTER (1), NULL);
590       }
591       break;
592     }
593     case GST_MESSAGE_ERROR:
594     {
595       GError *error = NULL;
596 
597       /* skip forwarding a RESOURCE/WRITE error message that originated from
598        * ipcpipelinesrc; we post this error when writing to the comm fd fails,
599        * so if we try to forward it here, we will likely get another one posted
600        * immediately and end up doing an endless loop */
601       gst_message_parse_error (msg, &error, NULL);
602       skip = (GST_MESSAGE_SRC (msg) == GST_OBJECT_CAST (src)
603           && error->domain == gst_resource_error_quark ()
604           && error->code == GST_RESOURCE_ERROR_WRITE);
605       g_error_free (error);
606       break;
607     }
608     default:
609       break;
610   }
611 
612   if (skip) {
613     GST_DEBUG_OBJECT (src, "message will not be forwarded");
614     return TRUE;
615   }
616 
617   return gst_ipc_pipeline_comm_write_message_to_fd (&src->comm, msg);
618 }
619 
620 static void
on_buffer(guint32 id,GstBuffer * buffer,gpointer user_data)621 on_buffer (guint32 id, GstBuffer * buffer, gpointer user_data)
622 {
623   GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (user_data);
624   GST_DEBUG_OBJECT (src, "Got buffer id %u, queueing: %" GST_PTR_FORMAT, id,
625       buffer);
626   g_mutex_lock (&src->comm.mutex);
627   if (!GST_PAD_IS_ACTIVE (src->srcpad) || src->flushing) {
628     g_mutex_unlock (&src->comm.mutex);
629     GST_INFO_OBJECT (src, "We're not started or flushing, buffer ignored");
630     gst_ipc_pipeline_comm_write_flow_ack_to_fd (&src->comm, id,
631         GST_FLOW_FLUSHING);
632     gst_buffer_unref (buffer);
633     return;
634   }
635   if (src->last_ret != GST_FLOW_OK) {
636     GstFlowReturn last_ret = src->last_ret;
637     g_mutex_unlock (&src->comm.mutex);
638     GST_DEBUG_OBJECT (src, "Last flow was %s, rejecting buffer",
639         gst_flow_get_name (last_ret));
640     gst_ipc_pipeline_comm_write_flow_ack_to_fd (&src->comm, id, last_ret);
641     gst_buffer_unref (buffer);
642     return;
643   }
644   src->queued = g_list_append (src->queued, buffer);    /* keep the ref */
645   gst_ipc_pipeline_src_log_queue (src);
646   g_cond_broadcast (&src->create_cond);
647   g_mutex_unlock (&src->comm.mutex);
648 }
649 
650 static void
do_oob_event(GstElement * element,gpointer user_data)651 do_oob_event (GstElement * element, gpointer user_data)
652 {
653   GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (element);
654   GstEvent *event = user_data;
655   gboolean ret, upstream;
656   guint32 id;
657 
658   id = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT
659           (event), QUARK_ID));
660   upstream = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT
661           (event), QUARK_UPSTREAM));
662 
663   if (upstream) {
664     GstElement *pipeline;
665     gboolean ok = FALSE;
666 
667     if (!(pipeline = find_pipeline (element))) {
668       GST_ERROR_OBJECT (src, "No pipeline found");
669       gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&src->comm, id, ok);
670     } else {
671       GST_DEBUG_OBJECT (src, "Posting upstream event on pipeline: %"
672           GST_PTR_FORMAT, event);
673       ok = gst_element_send_event (pipeline, gst_event_ref (event));
674       gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&src->comm, id, ok);
675     }
676   } else {
677     GST_DEBUG_OBJECT (src, "Pushing event async: %" GST_PTR_FORMAT, event);
678     ret = gst_element_send_event (element, gst_event_ref (event));
679     GST_DEBUG_OBJECT (src, "Event pushed, return %d", ret);
680     gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&src->comm, id, ret);
681   }
682 }
683 
684 static void
on_event(guint32 id,GstEvent * event,gboolean upstream,gpointer user_data)685 on_event (guint32 id, GstEvent * event, gboolean upstream, gpointer user_data)
686 {
687   GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (user_data);
688   GstFlowReturn last_ret = GST_FLOW_OK;
689 
690   GST_DEBUG_OBJECT (src, "Got event id %u, queueing: %" GST_PTR_FORMAT, id,
691       event);
692 
693   gst_mini_object_set_qdata (GST_MINI_OBJECT (event), QUARK_UPSTREAM,
694       GINT_TO_POINTER (upstream), NULL);
695 
696   switch (GST_EVENT_TYPE (event)) {
697     case GST_EVENT_FLUSH_START:
698       gst_ipc_pipeline_src_stop_loop (src, FALSE);
699       break;
700     case GST_EVENT_FLUSH_STOP:
701       gst_ipc_pipeline_src_start_loop (src);
702       break;
703     default:
704       g_mutex_lock (&src->comm.mutex);
705       last_ret = src->last_ret;
706       g_mutex_unlock (&src->comm.mutex);
707       break;
708   }
709 
710   if (GST_EVENT_IS_SERIALIZED (event) && !upstream) {
711     if (last_ret != GST_FLOW_OK) {
712       GST_DEBUG_OBJECT (src, "Last flow was %s, rejecting event",
713           gst_flow_get_name (last_ret));
714       gst_event_unref (event);
715       gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&src->comm, id, FALSE);
716     } else {
717       GST_DEBUG_OBJECT (src, "This is a serialized event, adding to queue %p",
718           src->queued);
719       g_mutex_lock (&src->comm.mutex);
720       src->queued = g_list_append (src->queued, event); /* keep the ref */
721       gst_ipc_pipeline_src_log_queue (src);
722       g_cond_broadcast (&src->create_cond);
723       g_mutex_unlock (&src->comm.mutex);
724     }
725   } else {
726     if (last_ret != GST_FLOW_OK && !upstream) {
727       GST_DEBUG_OBJECT (src, "Last flow was %s, rejecting event",
728           gst_flow_get_name (last_ret));
729       gst_ipc_pipeline_comm_write_boolean_ack_to_fd (&src->comm, id, FALSE);
730       gst_event_unref (event);
731     } else {
732       GST_DEBUG_OBJECT (src,
733           "This is not a serialized event, pushing in a thread");
734       gst_element_call_async (GST_ELEMENT (src), do_oob_event, event,
735           (GDestroyNotify) gst_event_unref);
736     }
737   }
738 }
739 
740 static void
do_oob_query(GstElement * element,gpointer user_data)741 do_oob_query (GstElement * element, gpointer user_data)
742 {
743   GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (element);
744   GstQuery *query = GST_QUERY (user_data);
745   guint32 id;
746   gboolean upstream;
747   gboolean ret;
748 
749   id = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT
750           (query), QUARK_ID));
751   upstream = GPOINTER_TO_INT (gst_mini_object_get_qdata (GST_MINI_OBJECT
752           (query), QUARK_UPSTREAM));
753 
754   if (upstream) {
755     GstElement *pipeline;
756 
757     if (!(pipeline = find_pipeline (element))) {
758       GST_ERROR_OBJECT (src, "No pipeline found");
759       ret = FALSE;
760     } else {
761       GST_DEBUG_OBJECT (src, "Posting query on pipeline: %" GST_PTR_FORMAT,
762           query);
763       ret = gst_element_query (pipeline, query);
764     }
765   } else {
766     GST_DEBUG_OBJECT (src, "Pushing query async: %" GST_PTR_FORMAT, query);
767     ret = gst_pad_peer_query (src->srcpad, query);
768     GST_DEBUG_OBJECT (src, "Query pushed, return %d", ret);
769   }
770   gst_ipc_pipeline_comm_write_query_result_to_fd (&src->comm, id, ret, query);
771 }
772 
773 static void
on_query(guint32 id,GstQuery * query,gboolean upstream,gpointer user_data)774 on_query (guint32 id, GstQuery * query, gboolean upstream, gpointer user_data)
775 {
776   GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (user_data);
777 
778   GST_DEBUG_OBJECT (src, "Got query id %u, queueing: %" GST_PTR_FORMAT, id,
779       query);
780 
781   if (GST_QUERY_IS_SERIALIZED (query) && !upstream) {
782     g_mutex_lock (&src->comm.mutex);
783     src->queued = g_list_append (src->queued, query);   /* keep the ref */
784     gst_ipc_pipeline_src_log_queue (src);
785     g_cond_broadcast (&src->create_cond);
786     g_mutex_unlock (&src->comm.mutex);
787   } else {
788     gst_mini_object_set_qdata (GST_MINI_OBJECT (query), QUARK_UPSTREAM,
789         GINT_TO_POINTER (upstream), NULL);
790     gst_element_call_async (GST_ELEMENT (src), do_oob_query, query,
791         (GDestroyNotify) gst_query_unref);
792   }
793 }
794 
795 struct StateChangeData
796 {
797   guint32 id;
798   GstStateChange transition;
799 };
800 
801 static void
do_state_change(GstElement * element,gpointer data)802 do_state_change (GstElement * element, gpointer data)
803 {
804   GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (element);
805   GstElement *pipeline;
806   GstStateChangeReturn ret = GST_STATE_CHANGE_FAILURE;
807   GstState state, pending, effective;
808   struct StateChangeData *d = data;
809   guint32 id = d->id;
810   GstStateChange transition = d->transition;
811   gboolean down;
812 
813   GST_DEBUG_OBJECT (src, "Doing state change id %u, %s -> %s", id,
814       gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)),
815       gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
816 
817   if (!(pipeline = find_pipeline (element))) {
818     GST_ERROR_OBJECT (src, "No pipeline found");
819     ret = GST_STATE_CHANGE_FAILURE;
820     goto done_nolock;
821   }
822 
823   down = (GST_STATE_TRANSITION_CURRENT (transition) >=
824       GST_STATE_TRANSITION_NEXT (transition));
825 
826   GST_STATE_LOCK (pipeline);
827   ret = gst_element_get_state (pipeline, &state, &pending, 0);
828 
829   /* if we are pending a state change, count the pending state as
830    * the current one */
831   effective = pending == GST_STATE_VOID_PENDING ? state : pending;
832 
833   GST_DEBUG_OBJECT (src, "Current element state: ret:%s state:%s pending:%s "
834       "effective:%s", gst_element_state_change_return_get_name (ret),
835       gst_element_state_get_name (state),
836       gst_element_state_get_name (pending),
837       gst_element_state_get_name (effective));
838 
839   if ((GST_STATE_TRANSITION_NEXT (transition) <= effective && !down) ||
840       (GST_STATE_TRANSITION_NEXT (transition) > effective && down)) {
841     /* if the request was to transition to a state that we have already
842      * transitioned to in the same direction, then we just silently return */
843     GST_DEBUG_OBJECT (src, "State transition to %s is unnecessary",
844         gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
845     /* make sure we return SUCCESS if the transition is to NULL or READY,
846      * even if our current ret is ASYNC for example; also, make sure not
847      * to return FAILURE, since our state is already committed */
848     if (GST_STATE_TRANSITION_NEXT (transition) <= GST_STATE_READY ||
849         ret == GST_STATE_CHANGE_FAILURE) {
850       ret = GST_STATE_CHANGE_SUCCESS;
851     }
852   } else if (ret != GST_STATE_CHANGE_FAILURE || down) {
853     /* if the request was to transition to a state that we haven't already
854      * transitioned to in the same direction, then we need to request a state
855      * change in the pipeline, *unless* we are going upwards and the last ret
856      * was FAILURE, in which case we should just return FAILURE and stop.
857      * We don't stop a downwards state change though in case of FAILURE, since
858      * we need to be able to bring the pipeline down to NULL. Note that
859      * GST_MESSAGE_ERROR will cause ret to be GST_STATE_CHANGE_FAILURE */
860     ret = gst_element_set_state (pipeline,
861         GST_STATE_TRANSITION_NEXT (transition));
862     GST_DEBUG_OBJECT (src, "gst_element_set_state returned %s",
863         gst_element_state_change_return_get_name (ret));
864   }
865 
866   GST_STATE_UNLOCK (pipeline);
867 
868 done_nolock:
869   GST_DEBUG_OBJECT (src, "sending state change ack, ret = %s",
870       gst_element_state_change_return_get_name (ret));
871   gst_ipc_pipeline_comm_write_state_change_ack_to_fd (&src->comm, id, ret);
872 }
873 
874 static void
on_state_change(guint32 id,GstStateChange transition,gpointer user_data)875 on_state_change (guint32 id, GstStateChange transition, gpointer user_data)
876 {
877   struct StateChangeData *d;
878   GstElement *ipcpipelinesrc = GST_ELEMENT (user_data);
879 
880   GST_DEBUG_OBJECT (ipcpipelinesrc, "Got state change id %u, %s -> %s", id,
881       gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)),
882       gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
883 
884   d = g_new (struct StateChangeData, 1);
885   d->id = id;
886   d->transition = transition;
887 
888   gst_element_call_async (ipcpipelinesrc, do_state_change, d, g_free);
889 }
890 
891 static void
on_message(guint32 id,GstMessage * message,gpointer user_data)892 on_message (guint32 id, GstMessage * message, gpointer user_data)
893 {
894   GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (user_data);
895 
896   GST_ERROR_OBJECT (src, "Got message id %u, not supposed to: %" GST_PTR_FORMAT,
897       id, message);
898   gst_message_unref (message);
899 }
900 
901 static gboolean
gst_ipc_pipeline_src_start_reader_thread(GstIpcPipelineSrc * src)902 gst_ipc_pipeline_src_start_reader_thread (GstIpcPipelineSrc * src)
903 {
904   if (!gst_ipc_pipeline_comm_start_reader_thread (&src->comm, on_buffer,
905           on_event, on_query, on_state_change, NULL, on_message, src)) {
906     GST_ERROR_OBJECT (src, "Failed to start reader thread");
907     return FALSE;
908   }
909   return TRUE;
910 }
911 
912 static void
gst_ipc_pipeline_src_stop_reader_thread(GstIpcPipelineSrc * src)913 gst_ipc_pipeline_src_stop_reader_thread (GstIpcPipelineSrc * src)
914 {
915   gst_ipc_pipeline_comm_stop_reader_thread (&src->comm);
916 }
917 
918 static void
gst_ipc_pipeline_src_disconnect(GstIpcPipelineSrc * src)919 gst_ipc_pipeline_src_disconnect (GstIpcPipelineSrc * src)
920 {
921   GST_DEBUG_OBJECT (src, "Disconnecting");
922   gst_ipc_pipeline_src_stop_reader_thread (src);
923   src->comm.fdin = -1;
924   src->comm.fdout = -1;
925   gst_ipc_pipeline_comm_cancel (&src->comm, FALSE);
926   gst_ipc_pipeline_src_start_reader_thread (src);
927 }
928 
929 static GstStateChangeReturn
gst_ipc_pipeline_src_change_state(GstElement * element,GstStateChange transition)930 gst_ipc_pipeline_src_change_state (GstElement * element,
931     GstStateChange transition)
932 {
933   GstIpcPipelineSrc *src = GST_IPC_PIPELINE_SRC (element);
934 
935   switch (transition) {
936     case GST_STATE_CHANGE_NULL_TO_READY:
937       if (src->comm.fdin < 0) {
938         GST_ERROR_OBJECT (element, "Invalid fdin: %d", src->comm.fdin);
939         return GST_STATE_CHANGE_FAILURE;
940       }
941       if (src->comm.fdout < 0) {
942         GST_ERROR_OBJECT (element, "Invalid fdout: %d", src->comm.fdout);
943         return GST_STATE_CHANGE_FAILURE;
944       }
945       if (!src->comm.reader_thread) {
946         GST_ERROR_OBJECT (element, "Failed to start reader thread");
947         return GST_STATE_CHANGE_FAILURE;
948       }
949       break;
950     default:
951       break;
952   }
953   return GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
954 }
955