• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer
2  *
3  * example program for the ipcpipelinesrc/ipcpipelinesink elements
4  *
5  * Copyright (C) 2013-2014 Tim-Philipp Müller <tim centricular net>
6  * Copyright (C) 2013 Collabora Ltd.
7  * Copyright (C) 2015 Centricular Ltd
8  * Copyright (C) 2015-2017 YouView TV Ltd
9  *   Author: George Kiagiadakis <george.kiagiadakis@collabora.com>
10  *
11  * This library is free software; you can redistribute it and/or
12  * modify it under the terms of the GNU Library General Public
13  * License as published by the Free Software Foundation; either
14  * version 2 of the License, or (at your option) any later version.
15  *
16  * This library is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19  * Library General Public License for more details.
20  *
21  * You should have received a copy of the GNU Library General Public
22  * License along with this library; if not, write to the
23  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
24  * Boston, MA 02110-1301, USA.
25  */
26 
27 /*
28  * Based on gst-play and ipcpipeline1. This program will play any URI
29  * while splitting the pipeline in two processes, running the source & demuxer
30  * on the master process and the decoders & sinks on the slave.
31  * See keyboard_cb() for the various keyboard shortcuts you can use to
32  * interact with it while the video window is focused.
33  */
34 
35 #define _GNU_SOURCE
36 #include <unistd.h>
37 #include <fcntl.h>
38 #include <stdio.h>
39 #include <stdlib.h>
40 #include <string.h>
41 #include <errno.h>
42 #include <sys/types.h>
43 #include <sys/socket.h>
44 #include <gst/gst.h>
45 #include <gst/video/navigation.h>
46 
/* Main loop of the current process; created in run() and quit from the
 * keyboard and bus handlers. */
static GMainLoop *loop;
/* Socket pair created in main() before forking: on_pad_added() sends
 * descriptors on pipes[1] and pipe_reader() receives them from pipes[0]. */
static int pipes[2] = { -1, -1 };

/* Sink element factory names; init() lets --video-sink / --audio-sink
 * override them. */
static const char *arg_video_sink = "autovideosink";
static const char *arg_audio_sink = "autoaudiosink";
52 
53 /******* MASTER *******/
54 
/* Escape sequences used as the internal spelling of the arrow keys:
 * master_bus_msg() maps navigation key names onto them and keyboard_cb()
 * compares its input against the LEFT/RIGHT ones. */
#define GST_PLAY_KB_ARROW_UP    "\033[A"
#define GST_PLAY_KB_ARROW_DOWN  "\033[B"
#define GST_PLAY_KB_ARROW_RIGHT "\033[C"
#define GST_PLAY_KB_ARROW_LEFT  "\033[D"

/* Trick modes selectable at runtime; play_do_seek() translates each one into
 * seek flags. GST_PLAY_TRICK_MODE_LAST is only an end marker used when
 * cycling through the modes. */
typedef enum
{
  GST_PLAY_TRICK_MODE_NONE = 0,
  GST_PLAY_TRICK_MODE_DEFAULT,
  GST_PLAY_TRICK_MODE_DEFAULT_NO_AUDIO,
  GST_PLAY_TRICK_MODE_KEY_UNITS,
  GST_PLAY_TRICK_MODE_KEY_UNITS_NO_AUDIO,
  GST_PLAY_TRICK_MODE_LAST
} GstPlayTrickMode;

/* Trick mode and rate of the last successful seek (see play_do_seek()). */
static GstPlayTrickMode trick_mode = GST_PLAY_TRICK_MODE_NONE;
static gdouble cur_rate = 1.0;
/* TRUE while master_bus_msg() has paused the pipeline for buffering. */
static gboolean buffering = FALSE;
/* State the user wants; flipped by toggle_paused() and applied once
 * buffering is over. */
static GstState desired_state = GST_STATE_PLAYING;

/* Forward declaration: relative_seek() and play_set_rate_and_trick_mode()
 * call play_do_seek() before its definition. */
static gboolean play_do_seek (GstElement * pipeline, gint64 pos, gdouble rate,
    GstPlayTrickMode mode);
77 
78 static void
toggle_paused(GstElement * pipeline)79 toggle_paused (GstElement * pipeline)
80 {
81   if (desired_state == GST_STATE_PLAYING)
82     desired_state = GST_STATE_PAUSED;
83   else
84     desired_state = GST_STATE_PLAYING;
85 
86   if (!buffering) {
87     gst_element_set_state (pipeline, desired_state);
88   } else if (desired_state == GST_STATE_PLAYING) {
89     g_print ("\nWill play as soon as buffering finishes)\n");
90   }
91 }
92 
93 static void
relative_seek(GstElement * pipeline,gdouble percent)94 relative_seek (GstElement * pipeline, gdouble percent)
95 {
96   GstQuery *query;
97   gboolean seekable = FALSE;
98   gint64 dur = -1, pos = -1, step;
99 
100   g_return_if_fail (percent >= -1.0 && percent <= 1.0);
101 
102   if (!gst_element_query_position (pipeline, GST_FORMAT_TIME, &pos))
103     goto seek_failed;
104 
105   query = gst_query_new_seeking (GST_FORMAT_TIME);
106   if (!gst_element_query (pipeline, query)) {
107     gst_query_unref (query);
108     goto seek_failed;
109   }
110 
111   gst_query_parse_seeking (query, NULL, &seekable, NULL, &dur);
112   gst_query_unref (query);
113 
114   if (!seekable || dur <= 0)
115     goto seek_failed;
116 
117   step = dur * percent;
118   if (ABS (step) < GST_SECOND)
119     step = (percent < 0) ? -GST_SECOND : GST_SECOND;
120 
121   pos = pos + step;
122   if (pos > dur) {
123     g_print ("\nReached end of play list.\n");
124     g_main_loop_quit (loop);
125   } else {
126     if (pos < 0)
127       pos = 0;
128 
129     play_do_seek (pipeline, pos, cur_rate, trick_mode);
130   }
131 
132   return;
133 
134 seek_failed:
135   {
136     g_print ("\nCould not seek.\n");
137   }
138 }
139 
140 static gboolean
play_set_rate_and_trick_mode(GstElement * pipeline,gdouble rate,GstPlayTrickMode mode)141 play_set_rate_and_trick_mode (GstElement * pipeline, gdouble rate,
142     GstPlayTrickMode mode)
143 {
144   gint64 pos = -1;
145 
146   g_return_val_if_fail (rate != 0, FALSE);
147 
148   if (!gst_element_query_position (pipeline, GST_FORMAT_TIME, &pos))
149     return FALSE;
150 
151   return play_do_seek (pipeline, pos, rate, mode);
152 }
153 
154 static gboolean
play_do_seek(GstElement * pipeline,gint64 pos,gdouble rate,GstPlayTrickMode mode)155 play_do_seek (GstElement * pipeline, gint64 pos, gdouble rate,
156     GstPlayTrickMode mode)
157 {
158   GstSeekFlags seek_flags;
159   GstQuery *query;
160   GstEvent *seek;
161   gboolean seekable = FALSE;
162 
163   query = gst_query_new_seeking (GST_FORMAT_TIME);
164   if (!gst_element_query (pipeline, query)) {
165     gst_query_unref (query);
166     return FALSE;
167   }
168 
169   gst_query_parse_seeking (query, NULL, &seekable, NULL, NULL);
170   gst_query_unref (query);
171 
172   if (!seekable)
173     return FALSE;
174 
175   seek_flags = GST_SEEK_FLAG_FLUSH;
176 
177   switch (mode) {
178     case GST_PLAY_TRICK_MODE_DEFAULT:
179       seek_flags |= GST_SEEK_FLAG_TRICKMODE;
180       break;
181     case GST_PLAY_TRICK_MODE_DEFAULT_NO_AUDIO:
182       seek_flags |= GST_SEEK_FLAG_TRICKMODE | GST_SEEK_FLAG_TRICKMODE_NO_AUDIO;
183       break;
184     case GST_PLAY_TRICK_MODE_KEY_UNITS:
185       seek_flags |= GST_SEEK_FLAG_TRICKMODE_KEY_UNITS;
186       break;
187     case GST_PLAY_TRICK_MODE_KEY_UNITS_NO_AUDIO:
188       seek_flags |=
189           GST_SEEK_FLAG_TRICKMODE_KEY_UNITS | GST_SEEK_FLAG_TRICKMODE_NO_AUDIO;
190       break;
191     case GST_PLAY_TRICK_MODE_NONE:
192     default:
193       break;
194   }
195 
196   if (rate >= 0)
197     seek = gst_event_new_seek (rate, GST_FORMAT_TIME,
198         seek_flags | GST_SEEK_FLAG_ACCURATE,
199         /* start */ GST_SEEK_TYPE_SET, pos,
200         /* stop */ GST_SEEK_TYPE_SET, GST_CLOCK_TIME_NONE);
201   else
202     seek = gst_event_new_seek (rate, GST_FORMAT_TIME,
203         seek_flags | GST_SEEK_FLAG_ACCURATE,
204         /* start */ GST_SEEK_TYPE_SET, 0,
205         /* stop */ GST_SEEK_TYPE_SET, pos);
206 
207   if (!gst_element_send_event (pipeline, seek))
208     return FALSE;
209 
210   cur_rate = rate;
211   trick_mode = mode;
212   return TRUE;
213 }
214 
215 static void
play_set_playback_rate(GstElement * pipeline,gdouble rate)216 play_set_playback_rate (GstElement * pipeline, gdouble rate)
217 {
218   if (play_set_rate_and_trick_mode (pipeline, rate, trick_mode)) {
219     g_print ("Playback rate: %.2f", rate);
220     g_print ("                               \n");
221   } else {
222     g_print ("\n");
223     g_print ("Could not change playback rate to %.2f", rate);
224     g_print (".\n");
225   }
226 }
227 
228 static void
play_set_relative_playback_rate(GstElement * pipeline,gdouble rate_step,gboolean reverse_direction)229 play_set_relative_playback_rate (GstElement * pipeline, gdouble rate_step,
230     gboolean reverse_direction)
231 {
232   gdouble new_rate = cur_rate + rate_step;
233 
234   if (reverse_direction)
235     new_rate *= -1.0;
236 
237   play_set_playback_rate (pipeline, new_rate);
238 }
239 
240 static const gchar *
trick_mode_get_description(GstPlayTrickMode mode)241 trick_mode_get_description (GstPlayTrickMode mode)
242 {
243   switch (mode) {
244     case GST_PLAY_TRICK_MODE_NONE:
245       return "normal playback, trick modes disabled";
246     case GST_PLAY_TRICK_MODE_DEFAULT:
247       return "trick mode: default";
248     case GST_PLAY_TRICK_MODE_DEFAULT_NO_AUDIO:
249       return "trick mode: default, no audio";
250     case GST_PLAY_TRICK_MODE_KEY_UNITS:
251       return "trick mode: key frames only";
252     case GST_PLAY_TRICK_MODE_KEY_UNITS_NO_AUDIO:
253       return "trick mode: key frames only, no audio";
254     default:
255       break;
256   }
257   return "unknown trick mode";
258 }
259 
260 static void
play_switch_trick_mode(GstElement * pipeline)261 play_switch_trick_mode (GstElement * pipeline)
262 {
263   GstPlayTrickMode new_mode = ++trick_mode;
264   const gchar *mode_desc;
265 
266   if (new_mode == GST_PLAY_TRICK_MODE_LAST)
267     new_mode = GST_PLAY_TRICK_MODE_NONE;
268 
269   mode_desc = trick_mode_get_description (new_mode);
270 
271   if (play_set_rate_and_trick_mode (pipeline, cur_rate, new_mode)) {
272     g_print ("Rate: %.2f (%s)                      \n", cur_rate, mode_desc);
273   } else {
274     g_print ("\nCould not change trick mode to %s.\n", mode_desc);
275   }
276 }
277 
278 static void
keyboard_cb(const gchar * key_input,GstElement * pipeline)279 keyboard_cb (const gchar * key_input, GstElement * pipeline)
280 {
281   gchar key = '\0';
282 
283   /* only want to switch/case on single char, not first char of string */
284   if (key_input[0] != '\0' && key_input[1] == '\0')
285     key = g_ascii_tolower (key_input[0]);
286 
287   switch (key) {
288     case ' ':
289       toggle_paused (pipeline);
290       break;
291     case 'q':
292     case 'Q':
293       g_main_loop_quit (loop);
294       break;
295     case 'p':
296       if (cur_rate > -0.2 && cur_rate < 0.0)
297         play_set_relative_playback_rate (pipeline, 0.0, TRUE);
298       else if (ABS (cur_rate) < 2.0)
299         play_set_relative_playback_rate (pipeline, 0.1, FALSE);
300       else if (ABS (cur_rate) < 4.0)
301         play_set_relative_playback_rate (pipeline, 0.5, FALSE);
302       else
303         play_set_relative_playback_rate (pipeline, 1.0, FALSE);
304       break;
305     case 'o':
306       if (cur_rate > 0.0 && cur_rate < 0.20)
307         play_set_relative_playback_rate (pipeline, 0.0, TRUE);
308       else if (ABS (cur_rate) <= 2.0)
309         play_set_relative_playback_rate (pipeline, -0.1, FALSE);
310       else if (ABS (cur_rate) <= 4.0)
311         play_set_relative_playback_rate (pipeline, -0.5, FALSE);
312       else
313         play_set_relative_playback_rate (pipeline, -1.0, FALSE);
314       break;
315     case 'd':
316       play_set_relative_playback_rate (pipeline, 0.0, TRUE);
317       break;
318     case 't':
319       play_switch_trick_mode (pipeline);
320       break;
321     case 27:                   /* ESC */
322       if (key_input[1] == '\0') {
323         g_main_loop_quit (loop);
324         break;
325       }
326     case '0':
327       play_do_seek (pipeline, 0, cur_rate, trick_mode);
328       break;
329     case 'r':
330       GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (pipeline),
331           GST_DEBUG_GRAPH_SHOW_ALL, "ipc.master.requested");
332       break;
333     default:
334       if (strcmp (key_input, GST_PLAY_KB_ARROW_RIGHT) == 0) {
335         relative_seek (pipeline, +0.08);
336       } else if (strcmp (key_input, GST_PLAY_KB_ARROW_LEFT) == 0) {
337         relative_seek (pipeline, -0.01);
338       } else {
339         GST_INFO ("keyboard input:");
340         for (; *key_input != '\0'; ++key_input)
341           GST_INFO ("  code %3d", *key_input);
342       }
343       break;
344   }
345 }
346 
347 static gboolean
master_bus_msg(GstBus * bus,GstMessage * msg,gpointer data)348 master_bus_msg (GstBus * bus, GstMessage * msg, gpointer data)
349 {
350   GstPipeline *pipeline = data;
351 
352   switch (GST_MESSAGE_TYPE (msg)) {
353     case GST_MESSAGE_ERROR:{
354       GError *err;
355       gchar *dbg;
356 
357       gst_message_parse_error (msg, &err, &dbg);
358       g_printerr ("MASTER: ERROR: %s\n", err->message);
359       if (dbg != NULL)
360         g_printerr ("MASTER: ERROR debug information: %s\n", dbg);
361       g_error_free (err);
362       g_free (dbg);
363 
364       GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (pipeline),
365           GST_DEBUG_GRAPH_SHOW_ALL, "ipc.master.error");
366 
367       g_main_loop_quit (loop);
368       break;
369     }
370     case GST_MESSAGE_WARNING:{
371       GError *err;
372       gchar *dbg;
373 
374       gst_message_parse_warning (msg, &err, &dbg);
375       g_printerr ("MASTER: WARNING: %s\n", err->message);
376       if (dbg != NULL)
377         g_printerr ("MASTER: WARNING debug information: %s\n", dbg);
378       g_error_free (err);
379       g_free (dbg);
380 
381       GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (pipeline),
382           GST_DEBUG_GRAPH_SHOW_ALL, "ipc.master.warning");
383       break;
384     }
385     case GST_MESSAGE_ASYNC_DONE:
386       GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (pipeline),
387           GST_DEBUG_GRAPH_SHOW_ALL, "ipc.master.async-done");
388       break;
389     case GST_MESSAGE_EOS:
390       g_print ("EOS on master\n");
391       gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_NULL);
392       g_main_loop_quit (loop);
393       break;
394     case GST_MESSAGE_BUFFERING:{
395       gint percent;
396       GstBufferingMode bufmode;
397 
398       if (!buffering)
399         g_print ("\n");
400 
401       gst_message_parse_buffering (msg, &percent);
402       g_print ("%s %d%%  \r", "Buffering...", percent);
403 
404       gst_message_parse_buffering_stats (msg, &bufmode, NULL, NULL, NULL);
405 
406       /* no state management needed for live pipelines */
407       if (bufmode != GST_BUFFERING_LIVE) {
408         if (percent == 100) {
409           /* a 100% message means buffering is done */
410           if (buffering) {
411             buffering = FALSE;
412             gst_element_set_state (GST_ELEMENT (pipeline), desired_state);
413             g_print ("\n%s\n", gst_element_state_get_name (desired_state));
414           }
415         } else {
416           /* buffering... */
417           if (!buffering) {
418             gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_PAUSED);
419             buffering = TRUE;
420           }
421         }
422       }
423       break;
424     }
425     case GST_MESSAGE_CLOCK_LOST:{
426       g_print ("Clock lost, selecting a new one\n");
427       gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_PAUSED);
428       gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_PLAYING);
429       break;
430     }
431     case GST_MESSAGE_LATENCY:
432     {
433       gst_bin_recalculate_latency (GST_BIN (pipeline));
434       break;
435     }
436     case GST_MESSAGE_REQUEST_STATE:{
437       GstState state;
438       gchar *name;
439 
440       name = gst_object_get_path_string (GST_MESSAGE_SRC (msg));
441 
442       gst_message_parse_request_state (msg, &state);
443 
444       GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (pipeline),
445           GST_DEBUG_GRAPH_SHOW_VERBOSE, "ipc.slave.reqstate");
446 
447       g_print ("Setting state to %s as requested by %s...\n",
448           gst_element_state_get_name (state), name);
449 
450       gst_element_set_state (GST_ELEMENT (pipeline), state);
451       g_free (name);
452       break;
453     }
454     case GST_MESSAGE_ELEMENT:
455     {
456       GstNavigationMessageType mtype = gst_navigation_message_get_type (msg);
457       if (mtype == GST_NAVIGATION_MESSAGE_EVENT) {
458         GstEvent *ev = NULL;
459 
460         if (gst_navigation_message_parse_event (msg, &ev)) {
461           GstNavigationEventType e_type = gst_navigation_event_get_type (ev);
462           switch (e_type) {
463             case GST_NAVIGATION_EVENT_KEY_PRESS:
464             {
465               const gchar *key;
466 
467               if (gst_navigation_event_parse_key_event (ev, &key)) {
468                 GST_INFO ("Key press: %s", key);
469 
470                 if (strcmp (key, "Left") == 0)
471                   key = GST_PLAY_KB_ARROW_LEFT;
472                 else if (strcmp (key, "Right") == 0)
473                   key = GST_PLAY_KB_ARROW_RIGHT;
474                 else if (strcmp (key, "Up") == 0)
475                   key = GST_PLAY_KB_ARROW_UP;
476                 else if (strcmp (key, "Down") == 0)
477                   key = GST_PLAY_KB_ARROW_DOWN;
478                 else if (strcmp (key, "space") == 0)
479                   key = " ";
480                 else if (strlen (key) > 1)
481                   break;
482 
483                 keyboard_cb (key, GST_ELEMENT (pipeline));
484               }
485               break;
486             }
487             case GST_NAVIGATION_EVENT_MOUSE_BUTTON_PRESS:
488             {
489               gint button;
490               if (gst_navigation_event_parse_mouse_button_event (ev, &button,
491                       NULL, NULL)) {
492                 if (button == 4) {
493                   /* wheel up */
494                   relative_seek (GST_ELEMENT (pipeline), +0.08);
495                 } else if (button == 5) {
496                   /* wheel down */
497                   relative_seek (GST_ELEMENT (pipeline), -0.01);
498                 }
499               }
500               break;
501             }
502             default:
503               break;
504           }
505         }
506         if (ev)
507           gst_event_unref (ev);
508       }
509       break;
510     }
511     default:
512       break;
513   }
514   return TRUE;
515 }
516 
/* Pass the file descriptor @fd to the peer of the UNIX socket @s.
 *
 * One dummy payload byte is sent together with an SCM_RIGHTS control message
 * carrying @fd.  The control buffer lives in a union with a struct cmsghdr
 * so that it is suitably aligned for the CMSG_* macros.
 *
 * Returns 0 on success and -1 when the message could not be sent. */
static int
sendfd (int s, int fd)
{
  char payload = 0;
  struct iovec iov;
  struct msghdr msg;
  struct cmsghdr *cmsg;
  ssize_t sent;
  union
  {
    char buf[CMSG_SPACE (sizeof (int))];
    struct cmsghdr align;
  } control;

  iov.iov_base = &payload;
  iov.iov_len = sizeof payload;

  /* zero everything so that no uninitialized padding is handed to the
   * kernel */
  memset (&control, 0, sizeof control);
  memset (&msg, 0, sizeof msg);
  msg.msg_iov = &iov;
  msg.msg_iovlen = 1;
  msg.msg_control = control.buf;
  msg.msg_controllen = sizeof control.buf;

  cmsg = CMSG_FIRSTHDR (&msg);
  cmsg->cmsg_len = CMSG_LEN (sizeof (int));
  cmsg->cmsg_level = SOL_SOCKET;
  cmsg->cmsg_type = SCM_RIGHTS;
  memcpy (CMSG_DATA (cmsg), &fd, sizeof (int));

  /* a signal may interrupt the call before anything was sent */
  do {
    sent = sendmsg (s, &msg, 0);
  } while (sent < 0 && errno == EINTR);

  return (sent == (ssize_t) iov.iov_len) ? 0 : -1;
}
547 
548 static gint
find_ipcpipelinesink(gconstpointer e,gconstpointer c)549 find_ipcpipelinesink (gconstpointer e, gconstpointer c)
550 {
551   const GValue *elem = e;
552   const gchar *caps_name = c;
553   const gchar *n = g_object_get_data (g_value_get_object (elem),
554       "ipcpipelinesink-caps-name");
555   return g_strcmp0 (caps_name, n);
556 }
557 
558 /* in HLS the decodebin pads are destroyed and re-created every time
559  * the stream changes bitrate. This trick here ensures that the new
560  * pads that will appear will go and link to the same ipcpipelinesinks,
561  * avoiding the creation of new pipelines in the slave. */
562 static void
on_pad_unlinked(GstPad * pad,GstPad * peer,GstElement * pipeline)563 on_pad_unlinked (GstPad * pad, GstPad * peer, GstElement * pipeline)
564 {
565   GstCaps *caps;
566   const GstStructure *structure;
567 
568   caps = gst_pad_get_current_caps (pad);
569   structure = gst_caps_get_structure (caps, 0);
570 
571   g_object_set_data_full (G_OBJECT (GST_OBJECT_PARENT (peer)),
572       "ipcpipelinesink-caps-name",
573       g_strdup (gst_structure_get_name (structure)), g_free);
574 }
575 
576 static void
on_pad_added(GstElement * element,GstPad * pad,GstElement * pipeline)577 on_pad_added (GstElement * element, GstPad * pad, GstElement * pipeline)
578 {
579   GstElement *ipcpipelinesink;
580   GstPad *sinkpad;
581   GstCaps *caps;
582   const GstStructure *structure;
583   GstIterator *it;
584   GValue elem = G_VALUE_INIT;
585   int sockets[2];
586   gboolean create_sockets;
587 
588   caps = gst_pad_get_current_caps (pad);
589   structure = gst_caps_get_structure (caps, 0);
590 
591   it = gst_bin_iterate_sinks (GST_BIN (pipeline));
592   if (gst_iterator_find_custom (it, find_ipcpipelinesink, &elem,
593           (gpointer) gst_structure_get_name (structure))) {
594     ipcpipelinesink = g_value_get_object (&elem);
595     create_sockets = FALSE;
596     g_value_reset (&elem);
597   } else {
598     ipcpipelinesink = gst_element_factory_make ("ipcpipelinesink", NULL);
599     gst_bin_add (GST_BIN (pipeline), ipcpipelinesink);
600     create_sockets = TRUE;
601   }
602 
603   sinkpad = gst_element_get_static_pad (ipcpipelinesink, "sink");
604   if (gst_pad_link (pad, sinkpad) != GST_PAD_LINK_OK) {
605     fprintf (stderr, "Failed to link ipcpipelinesink\n");
606     exit (1);
607   }
608   gst_object_unref (sinkpad);
609 
610   g_signal_connect (pad, "unlinked", (GCallback) on_pad_unlinked, pipeline);
611 
612   if (create_sockets) {
613     if (socketpair (AF_UNIX, SOCK_STREAM, 0, sockets)) {
614       fprintf (stderr, "Error creating sockets: %s\n", strerror (errno));
615       exit (1);
616     }
617     if (fcntl (sockets[0], F_SETFL, O_NONBLOCK) < 0 ||
618         fcntl (sockets[1], F_SETFL, O_NONBLOCK) < 0) {
619       fprintf (stderr, "Error setting O_NONBLOCK on sockets: %s\n",
620           strerror (errno));
621       exit (1);
622     }
623     g_object_set (ipcpipelinesink, "fdin", sockets[0], "fdout", sockets[0],
624         NULL);
625 
626     printf ("new socket %d\n", sockets[1]);
627     sendfd (pipes[1], sockets[1]);
628   }
629 
630   gst_element_set_state (ipcpipelinesink, GST_STATE_PLAYING);
631 
632   GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (pipeline),
633       GST_DEBUG_GRAPH_SHOW_ALL, "pad.added");
634 }
635 
/* Return values of the "autoplug-select" handler on_autoplug_select().
 * NOTE(review): declared locally; presumably the numeric values have to
 * match the enum used by decodebin itself - confirm before reordering. */
typedef enum
{
  GST_AUTOPLUG_SELECT_TRY,
  GST_AUTOPLUG_SELECT_EXPOSE,
  GST_AUTOPLUG_SELECT_SKIP
} GstAutoplugSelectResult;
642 
643 static GstAutoplugSelectResult
on_autoplug_select(GstElement * uridecodebin,GstPad * pad,GstCaps * caps,GstElementFactory * factory,GstElement * pipeline)644 on_autoplug_select (GstElement * uridecodebin, GstPad * pad, GstCaps * caps,
645     GstElementFactory * factory, GstElement * pipeline)
646 {
647   /* if decodebin is about to plug a decoder,
648    * stop it right there and expose the pad;
649    * the slave's decodebin will take it from there... */
650   if (gst_element_factory_list_is_type (factory,
651           GST_ELEMENT_FACTORY_TYPE_DECODER)) {
652     gchar *capsstr = gst_caps_to_string (caps);
653     g_print (" exposing to slave: %s\n", capsstr);
654     g_free (capsstr);
655     return GST_AUTOPLUG_SELECT_EXPOSE;
656   }
657   return GST_AUTOPLUG_SELECT_TRY;
658 }
659 
660 static void
start_source(const gchar * uri)661 start_source (const gchar * uri)
662 {
663   GstElement *pipeline;
664   GstElement *uridecodebin;
665   GstBus *bus;
666 
667   pipeline = gst_pipeline_new (NULL);
668 
669   bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
670   gst_bus_add_watch (bus, master_bus_msg, pipeline);
671   gst_object_unref (bus);
672 
673   uridecodebin = gst_element_factory_make ("uridecodebin", NULL);
674   g_object_set (uridecodebin, "uri", uri, NULL);
675   g_signal_connect (uridecodebin, "pad-added", G_CALLBACK (on_pad_added),
676       pipeline);
677   g_signal_connect (uridecodebin, "autoplug-select",
678       G_CALLBACK (on_autoplug_select), pipeline);
679 
680   gst_bin_add (GST_BIN (pipeline), uridecodebin);
681   gst_element_set_state (pipeline, GST_STATE_PLAYING);
682 }
683 
684 /*********** SLAVE ***********/
685 
686 static gboolean
slave_bus_msg(GstBus * bus,GstMessage * msg,gpointer data)687 slave_bus_msg (GstBus * bus, GstMessage * msg, gpointer data)
688 {
689   GstPipeline *pipeline = data;
690 
691   switch (GST_MESSAGE_TYPE (msg)) {
692     case GST_MESSAGE_ERROR:{
693       GError *err;
694       gchar *dbg;
695 
696       gst_message_parse_error (msg, &err, &dbg);
697       g_printerr ("SLAVE: ERROR: %s\n", err->message);
698       if (dbg != NULL)
699         g_printerr ("SLAVE: ERROR debug information: %s\n", dbg);
700       g_error_free (err);
701       g_free (dbg);
702 
703       GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (pipeline),
704           GST_DEBUG_GRAPH_SHOW_ALL, "ipc.slave.error");
705       break;
706     }
707     case GST_MESSAGE_WARNING:{
708       GError *err;
709       gchar *dbg;
710 
711       gst_message_parse_warning (msg, &err, &dbg);
712       g_printerr ("SLAVE: WARNING: %s\n", err->message);
713       if (dbg != NULL)
714         g_printerr ("SLAVE: WARNING debug information: %s\n", dbg);
715       g_error_free (err);
716       g_free (dbg);
717 
718       GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (pipeline),
719           GST_DEBUG_GRAPH_SHOW_ALL, "ipc.slave.warning");
720       break;
721     }
722     case GST_MESSAGE_ASYNC_START:
723       GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (pipeline),
724           GST_DEBUG_GRAPH_SHOW_VERBOSE, "ipc.slave.async-start");
725       break;
726     case GST_MESSAGE_ASYNC_DONE:
727       GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (pipeline),
728           GST_DEBUG_GRAPH_SHOW_ALL, "ipc.slave.async-done");
729       break;
730     default:
731       break;
732   }
733   return TRUE;
734 }
735 
736 static void
on_decoded_pad_added(GstElement * element,GstPad * pad,gpointer data)737 on_decoded_pad_added (GstElement * element, GstPad * pad, gpointer data)
738 {
739   GstBin *pipeline = data;
740   GstCaps *caps;
741   GstPad *cpad;
742   const gchar *type;
743   gchar *capsstr;
744 
745   caps = gst_pad_get_current_caps (pad);
746   capsstr = gst_caps_to_string (caps);
747   printf (" caps: %s\n", capsstr);
748   g_free (capsstr);
749 
750   type = gst_structure_get_name (gst_caps_get_structure (caps, 0));
751   if (!strcmp (type, "video/x-raw")) {
752     GstElement *c, *s;
753     c = gst_element_factory_make ("videoconvert", NULL);
754     s = gst_element_factory_make (arg_video_sink, NULL);
755     gst_bin_add_many (GST_BIN (pipeline), c, s, NULL);
756     gst_element_link_many (c, s, NULL);
757     cpad = gst_element_get_static_pad (c, "sink");
758     gst_pad_link (pad, cpad);
759     gst_object_unref (cpad);
760     gst_element_set_state (s, GST_STATE_PLAYING);
761     gst_element_set_state (c, GST_STATE_PLAYING);
762   } else if (!strcmp (type, "audio/x-raw")) {
763     GstElement *c, *s;
764     c = gst_element_factory_make ("audioconvert", NULL);
765     s = gst_element_factory_make (arg_audio_sink, NULL);
766     gst_bin_add_many (GST_BIN (pipeline), c, s, NULL);
767     gst_element_link_many (c, s, NULL);
768     cpad = gst_element_get_static_pad (c, "sink");
769     gst_pad_link (pad, cpad);
770     gst_object_unref (cpad);
771     gst_element_set_state (s, GST_STATE_PLAYING);
772     gst_element_set_state (c, GST_STATE_PLAYING);
773   } else {
774     GstElement *s;
775     s = gst_element_factory_make ("fakesink", NULL);
776     g_object_set (s, "sync", TRUE, "async", TRUE, NULL);
777     gst_bin_add_many (GST_BIN (pipeline), s, NULL);
778     cpad = gst_element_get_static_pad (s, "sink");
779     gst_pad_link (pad, cpad);
780     gst_object_unref (cpad);
781     gst_element_set_state (s, GST_STATE_PLAYING);
782   }
783 
784   gst_caps_unref (caps);
785 
786   GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (pipeline),
787       GST_DEBUG_GRAPH_SHOW_ALL, "decoded.pad.added");
788 }
789 
790 static int
recvfd(int s)791 recvfd (int s)
792 {
793   int n;
794   int fd;
795   char buf[1];
796   struct iovec iov;
797   struct msghdr msg;
798   struct cmsghdr *cmsg;
799   char cms[CMSG_SPACE (sizeof (int))];
800 
801   iov.iov_base = buf;
802   iov.iov_len = 1;
803 
804   memset (&msg, 0, sizeof msg);
805   msg.msg_name = 0;
806   msg.msg_namelen = 0;
807   msg.msg_iov = &iov;
808   msg.msg_iovlen = 1;
809 
810   msg.msg_control = (caddr_t) cms;
811   msg.msg_controllen = sizeof cms;
812 
813   if ((n = recvmsg (s, &msg, 0)) < 0)
814     return -1;
815   if (n == 0) {
816     perror ("unexpected EOF");
817     return -1;
818   }
819   cmsg = CMSG_FIRSTHDR (&msg);
820   memmove (&fd, CMSG_DATA (cmsg), sizeof (int));
821   return fd;
822 }
823 
824 static gboolean
pipe_reader(gpointer data)825 pipe_reader (gpointer data)
826 {
827   GstElement *pipeline = data;
828   GstElement *ipcpipelinesrc, *mq, *decodebin;
829   GstPad *rpad, *sink_pad;
830   int fd;
831   fd_set set;
832   struct timeval tv;
833   int ret;
834   static int idx = 0;
835   char name[32];
836 
837   FD_ZERO (&set);
838   FD_SET (pipes[0], &set);
839   tv.tv_sec = tv.tv_usec = 0;
840   ret = select (pipes[0] + 1, &set, NULL, NULL, &tv);
841   if (ret < 0) {
842     fprintf (stderr, "Failed to select: %s\n", strerror (errno));
843     return TRUE;
844   }
845   if (!FD_ISSET (pipes[0], &set))
846     return TRUE;
847 
848   fd = recvfd (pipes[0]);
849   ipcpipelinesrc = gst_element_factory_make ("ipcpipelinesrc", NULL);
850   gst_bin_add (GST_BIN (pipeline), ipcpipelinesrc);
851   g_object_set (ipcpipelinesrc, "fdin", fd, "fdout", fd, NULL);
852 
853 
854   mq = gst_bin_get_by_name (GST_BIN (pipeline), "mq");
855   if (!mq) {
856     fprintf (stderr, "Failed to get mq\n");
857     return TRUE;
858   }
859   if (!gst_element_link (ipcpipelinesrc, mq)) {
860     fprintf (stderr, "Failed to link ipcpipelinesrc and mq\n");
861     return TRUE;
862   }
863 
864   snprintf (name, sizeof (name), "src_%u", idx++);
865   rpad = gst_element_get_static_pad (mq, name);
866   if (!rpad) {
867     fprintf (stderr, "Failed to get mq request pad\n");
868     return TRUE;
869   }
870 
871   decodebin = gst_element_factory_make ("decodebin", NULL);
872   gst_bin_add (GST_BIN (pipeline), decodebin);
873   sink_pad = gst_element_get_static_pad (decodebin, "sink");
874   gst_pad_link (rpad, sink_pad);
875   gst_object_unref (sink_pad);
876 
877   g_signal_connect (decodebin, "pad-added", G_CALLBACK (on_decoded_pad_added),
878       pipeline);
879 
880   /* dynamically added elements should be synced manually
881    * to the state of the slave pipeline */
882   gst_element_sync_state_with_parent (ipcpipelinesrc);
883   gst_element_sync_state_with_parent (decodebin);
884 
885   GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (pipeline),
886       GST_DEBUG_GRAPH_SHOW_ALL, "ipc.slave.added");
887   gst_object_unref (mq);
888 
889   return TRUE;
890 }
891 
892 static void
start_sink(void)893 start_sink (void)
894 {
895   GstElement *pipeline;
896   GstElement *multiqueue;
897 
898   pipeline = gst_element_factory_make ("ipcslavepipeline", NULL);
899   gst_bus_add_watch (GST_ELEMENT_BUS (pipeline), slave_bus_msg, pipeline);
900 
901   multiqueue = gst_element_factory_make ("multiqueue", "mq");
902   gst_bin_add (GST_BIN (pipeline), multiqueue);
903 
904   g_timeout_add (10, &pipe_reader, gst_object_ref (pipeline));
905 
906   GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (pipeline),
907       GST_DEBUG_GRAPH_SHOW_ALL, "ipc.sink");
908   /* The state of the slave pipeline will change together with the state
909    * of the master, there is no need to call gst_element_set_state() here */
910 }
911 
912 
913 /********** COMMON ***********/
914 
915 static void
init(int * argc,char *** argv)916 init (int *argc, char ***argv)
917 {
918   GOptionEntry options[] = {
919     {"audio-sink", 0, 0, G_OPTION_ARG_STRING, &arg_audio_sink,
920         "Audio sink element to use (default autoaudiosink)", NULL},
921     {"video-sink", 0, 0, G_OPTION_ARG_STRING, &arg_video_sink,
922         "Video sink element to use (default autovideosink)", NULL},
923     {NULL}
924   };
925   GOptionContext *ctx;
926   GError *err = NULL;
927 
928   ctx = g_option_context_new ("");
929   g_option_context_add_main_entries (ctx, options, "");
930   if (!g_option_context_parse (ctx, argc, argv, &err)) {
931     fprintf (stderr, "Error initializing: %s\n", err->message);
932     exit (1);
933   }
934   g_option_context_free (ctx);
935 }
936 
937 static void
run(pid_t pid)938 run (pid_t pid)
939 {
940   loop = g_main_loop_new (NULL, FALSE);
941   g_main_loop_run (loop);
942   if (pid > 0)
943     kill (pid, SIGTERM);
944 }
945 
946 gint
main(gint argc,gchar ** argv)947 main (gint argc, gchar ** argv)
948 {
949   GError *error = NULL;
950   gchar *uri = NULL;
951   pid_t pid;
952 
953   init (&argc, &argv);
954 
955   if (argc < 2) {
956     fprintf (stderr, "usage: %s [av-filename-or-url]\n", argv[0]);
957     return 1;
958   }
959 
960   if (!g_strstr_len (argv[1], -1, "://")) {
961     uri = gst_filename_to_uri (argv[1], &error);
962   } else {
963     uri = g_strdup (argv[1]);
964   }
965 
966   if (error) {
967     fprintf (stderr, "usage: %s [av-filename-or-url]\n", argv[0]);
968     g_clear_error (&error);
969     return 1;
970   }
971 
972   if (socketpair (AF_UNIX, SOCK_STREAM, 0, pipes)) {
973     fprintf (stderr, "Error creating pipes: %s\n", strerror (errno));
974     return 2;
975   }
976   if (fcntl (pipes[0], F_SETFL, O_NONBLOCK) < 0 ||
977       fcntl (pipes[1], F_SETFL, O_NONBLOCK) < 0) {
978     fprintf (stderr, "Error setting O_NONBLOCK on pipes: %s\n",
979         strerror (errno));
980     return 2;
981   }
982 
983   pid = fork ();
984   if (pid < 0) {
985     fprintf (stderr, "Error forking: %s\n", strerror (errno));
986     return 1;
987   } else if (pid > 0) {
988     setenv ("GST_DEBUG_FILE", "gstsrc.log", 1);
989     gst_init (&argc, &argv);
990     start_source (uri);
991   } else {
992     setenv ("GST_DEBUG_FILE", "gstsink.log", 1);
993     gst_init (&argc, &argv);
994     start_sink ();
995   }
996 
997   g_free (uri);
998   run (pid);
999 
1000   return 0;
1001 }
1002