1 /* GStreamer
2 *
3 * example program for the ipcpipelinesrc/ipcpipelinesink elements
4 *
5 * Copyright (C) 2013-2014 Tim-Philipp Müller <tim centricular net>
6 * Copyright (C) 2013 Collabora Ltd.
7 * Copyright (C) 2015 Centricular Ltd
8 * Copyright (C) 2015-2017 YouView TV Ltd
9 * Author: George Kiagiadakis <george.kiagiadakis@collabora.com>
10 *
11 * This library is free software; you can redistribute it and/or
12 * modify it under the terms of the GNU Library General Public
13 * License as published by the Free Software Foundation; either
14 * version 2 of the License, or (at your option) any later version.
15 *
16 * This library is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19 * Library General Public License for more details.
20 *
21 * You should have received a copy of the GNU Library General Public
22 * License along with this library; if not, write to the
23 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
24 * Boston, MA 02110-1301, USA.
25 */
26
27 /*
28 * Based on gst-play and ipcpipeline1. This program will play any URI
29 * while splitting the pipeline in two processes, running the source & demuxer
30 * on the master process and the decoders & sinks on the slave.
31 * See keyboard_cb() for the various keyboard shortcuts you can use to
32 * interact with it while the video window is focused.
33 */
34
35 #define _GNU_SOURCE
36 #include <unistd.h>
37 #include <fcntl.h>
38 #include <stdio.h>
39 #include <stdlib.h>
40 #include <string.h>
41 #include <errno.h>
42 #include <sys/types.h>
43 #include <sys/socket.h>
44 #include <gst/gst.h>
45 #include <gst/video/navigation.h>
46
47 static GMainLoop *loop;
48 static int pipes[2] = { -1, -1 };
49
50 static const char *arg_video_sink = "autovideosink";
51 static const char *arg_audio_sink = "autoaudiosink";
52
53 /******* MASTER *******/
54
/* Terminal escape sequences used as the internal spelling of the arrow keys.
 * master_bus_msg() maps the navigation key names onto them before calling
 * keyboard_cb(). */
#define GST_PLAY_KB_ARROW_LEFT "\033[D"
#define GST_PLAY_KB_ARROW_RIGHT "\033[C"
#define GST_PLAY_KB_ARROW_UP "\033[A"
#define GST_PLAY_KB_ARROW_DOWN "\033[B"
59
/* Trick-mode variants that play_switch_trick_mode() cycles through, in this
 * order.  play_do_seek() translates each of them into seek flags. */
typedef enum
{
  GST_PLAY_TRICK_MODE_NONE = 0,
  GST_PLAY_TRICK_MODE_DEFAULT = 1,
  GST_PLAY_TRICK_MODE_DEFAULT_NO_AUDIO = 2,
  GST_PLAY_TRICK_MODE_KEY_UNITS = 3,
  GST_PLAY_TRICK_MODE_KEY_UNITS_NO_AUDIO = 4,
  /* sentinel: one past the last real mode */
  GST_PLAY_TRICK_MODE_LAST = 5
} GstPlayTrickMode;
69
70 static GstPlayTrickMode trick_mode = GST_PLAY_TRICK_MODE_NONE;
71 static gdouble cur_rate = 1.0;
72 static gboolean buffering = FALSE;
73 static GstState desired_state = GST_STATE_PLAYING;
74
75 static gboolean play_do_seek (GstElement * pipeline, gint64 pos, gdouble rate,
76 GstPlayTrickMode mode);
77
78 static void
toggle_paused(GstElement * pipeline)79 toggle_paused (GstElement * pipeline)
80 {
81 if (desired_state == GST_STATE_PLAYING)
82 desired_state = GST_STATE_PAUSED;
83 else
84 desired_state = GST_STATE_PLAYING;
85
86 if (!buffering) {
87 gst_element_set_state (pipeline, desired_state);
88 } else if (desired_state == GST_STATE_PLAYING) {
89 g_print ("\nWill play as soon as buffering finishes)\n");
90 }
91 }
92
93 static void
relative_seek(GstElement * pipeline,gdouble percent)94 relative_seek (GstElement * pipeline, gdouble percent)
95 {
96 GstQuery *query;
97 gboolean seekable = FALSE;
98 gint64 dur = -1, pos = -1, step;
99
100 g_return_if_fail (percent >= -1.0 && percent <= 1.0);
101
102 if (!gst_element_query_position (pipeline, GST_FORMAT_TIME, &pos))
103 goto seek_failed;
104
105 query = gst_query_new_seeking (GST_FORMAT_TIME);
106 if (!gst_element_query (pipeline, query)) {
107 gst_query_unref (query);
108 goto seek_failed;
109 }
110
111 gst_query_parse_seeking (query, NULL, &seekable, NULL, &dur);
112 gst_query_unref (query);
113
114 if (!seekable || dur <= 0)
115 goto seek_failed;
116
117 step = dur * percent;
118 if (ABS (step) < GST_SECOND)
119 step = (percent < 0) ? -GST_SECOND : GST_SECOND;
120
121 pos = pos + step;
122 if (pos > dur) {
123 g_print ("\nReached end of play list.\n");
124 g_main_loop_quit (loop);
125 } else {
126 if (pos < 0)
127 pos = 0;
128
129 play_do_seek (pipeline, pos, cur_rate, trick_mode);
130 }
131
132 return;
133
134 seek_failed:
135 {
136 g_print ("\nCould not seek.\n");
137 }
138 }
139
140 static gboolean
play_set_rate_and_trick_mode(GstElement * pipeline,gdouble rate,GstPlayTrickMode mode)141 play_set_rate_and_trick_mode (GstElement * pipeline, gdouble rate,
142 GstPlayTrickMode mode)
143 {
144 gint64 pos = -1;
145
146 g_return_val_if_fail (rate != 0, FALSE);
147
148 if (!gst_element_query_position (pipeline, GST_FORMAT_TIME, &pos))
149 return FALSE;
150
151 return play_do_seek (pipeline, pos, rate, mode);
152 }
153
154 static gboolean
play_do_seek(GstElement * pipeline,gint64 pos,gdouble rate,GstPlayTrickMode mode)155 play_do_seek (GstElement * pipeline, gint64 pos, gdouble rate,
156 GstPlayTrickMode mode)
157 {
158 GstSeekFlags seek_flags;
159 GstQuery *query;
160 GstEvent *seek;
161 gboolean seekable = FALSE;
162
163 query = gst_query_new_seeking (GST_FORMAT_TIME);
164 if (!gst_element_query (pipeline, query)) {
165 gst_query_unref (query);
166 return FALSE;
167 }
168
169 gst_query_parse_seeking (query, NULL, &seekable, NULL, NULL);
170 gst_query_unref (query);
171
172 if (!seekable)
173 return FALSE;
174
175 seek_flags = GST_SEEK_FLAG_FLUSH;
176
177 switch (mode) {
178 case GST_PLAY_TRICK_MODE_DEFAULT:
179 seek_flags |= GST_SEEK_FLAG_TRICKMODE;
180 break;
181 case GST_PLAY_TRICK_MODE_DEFAULT_NO_AUDIO:
182 seek_flags |= GST_SEEK_FLAG_TRICKMODE | GST_SEEK_FLAG_TRICKMODE_NO_AUDIO;
183 break;
184 case GST_PLAY_TRICK_MODE_KEY_UNITS:
185 seek_flags |= GST_SEEK_FLAG_TRICKMODE_KEY_UNITS;
186 break;
187 case GST_PLAY_TRICK_MODE_KEY_UNITS_NO_AUDIO:
188 seek_flags |=
189 GST_SEEK_FLAG_TRICKMODE_KEY_UNITS | GST_SEEK_FLAG_TRICKMODE_NO_AUDIO;
190 break;
191 case GST_PLAY_TRICK_MODE_NONE:
192 default:
193 break;
194 }
195
196 if (rate >= 0)
197 seek = gst_event_new_seek (rate, GST_FORMAT_TIME,
198 seek_flags | GST_SEEK_FLAG_ACCURATE,
199 /* start */ GST_SEEK_TYPE_SET, pos,
200 /* stop */ GST_SEEK_TYPE_SET, GST_CLOCK_TIME_NONE);
201 else
202 seek = gst_event_new_seek (rate, GST_FORMAT_TIME,
203 seek_flags | GST_SEEK_FLAG_ACCURATE,
204 /* start */ GST_SEEK_TYPE_SET, 0,
205 /* stop */ GST_SEEK_TYPE_SET, pos);
206
207 if (!gst_element_send_event (pipeline, seek))
208 return FALSE;
209
210 cur_rate = rate;
211 trick_mode = mode;
212 return TRUE;
213 }
214
215 static void
play_set_playback_rate(GstElement * pipeline,gdouble rate)216 play_set_playback_rate (GstElement * pipeline, gdouble rate)
217 {
218 if (play_set_rate_and_trick_mode (pipeline, rate, trick_mode)) {
219 g_print ("Playback rate: %.2f", rate);
220 g_print (" \n");
221 } else {
222 g_print ("\n");
223 g_print ("Could not change playback rate to %.2f", rate);
224 g_print (".\n");
225 }
226 }
227
228 static void
play_set_relative_playback_rate(GstElement * pipeline,gdouble rate_step,gboolean reverse_direction)229 play_set_relative_playback_rate (GstElement * pipeline, gdouble rate_step,
230 gboolean reverse_direction)
231 {
232 gdouble new_rate = cur_rate + rate_step;
233
234 if (reverse_direction)
235 new_rate *= -1.0;
236
237 play_set_playback_rate (pipeline, new_rate);
238 }
239
240 static const gchar *
trick_mode_get_description(GstPlayTrickMode mode)241 trick_mode_get_description (GstPlayTrickMode mode)
242 {
243 switch (mode) {
244 case GST_PLAY_TRICK_MODE_NONE:
245 return "normal playback, trick modes disabled";
246 case GST_PLAY_TRICK_MODE_DEFAULT:
247 return "trick mode: default";
248 case GST_PLAY_TRICK_MODE_DEFAULT_NO_AUDIO:
249 return "trick mode: default, no audio";
250 case GST_PLAY_TRICK_MODE_KEY_UNITS:
251 return "trick mode: key frames only";
252 case GST_PLAY_TRICK_MODE_KEY_UNITS_NO_AUDIO:
253 return "trick mode: key frames only, no audio";
254 default:
255 break;
256 }
257 return "unknown trick mode";
258 }
259
260 static void
play_switch_trick_mode(GstElement * pipeline)261 play_switch_trick_mode (GstElement * pipeline)
262 {
263 GstPlayTrickMode new_mode = ++trick_mode;
264 const gchar *mode_desc;
265
266 if (new_mode == GST_PLAY_TRICK_MODE_LAST)
267 new_mode = GST_PLAY_TRICK_MODE_NONE;
268
269 mode_desc = trick_mode_get_description (new_mode);
270
271 if (play_set_rate_and_trick_mode (pipeline, cur_rate, new_mode)) {
272 g_print ("Rate: %.2f (%s) \n", cur_rate, mode_desc);
273 } else {
274 g_print ("\nCould not change trick mode to %s.\n", mode_desc);
275 }
276 }
277
278 static void
keyboard_cb(const gchar * key_input,GstElement * pipeline)279 keyboard_cb (const gchar * key_input, GstElement * pipeline)
280 {
281 gchar key = '\0';
282
283 /* only want to switch/case on single char, not first char of string */
284 if (key_input[0] != '\0' && key_input[1] == '\0')
285 key = g_ascii_tolower (key_input[0]);
286
287 switch (key) {
288 case ' ':
289 toggle_paused (pipeline);
290 break;
291 case 'q':
292 case 'Q':
293 g_main_loop_quit (loop);
294 break;
295 case 'p':
296 if (cur_rate > -0.2 && cur_rate < 0.0)
297 play_set_relative_playback_rate (pipeline, 0.0, TRUE);
298 else if (ABS (cur_rate) < 2.0)
299 play_set_relative_playback_rate (pipeline, 0.1, FALSE);
300 else if (ABS (cur_rate) < 4.0)
301 play_set_relative_playback_rate (pipeline, 0.5, FALSE);
302 else
303 play_set_relative_playback_rate (pipeline, 1.0, FALSE);
304 break;
305 case 'o':
306 if (cur_rate > 0.0 && cur_rate < 0.20)
307 play_set_relative_playback_rate (pipeline, 0.0, TRUE);
308 else if (ABS (cur_rate) <= 2.0)
309 play_set_relative_playback_rate (pipeline, -0.1, FALSE);
310 else if (ABS (cur_rate) <= 4.0)
311 play_set_relative_playback_rate (pipeline, -0.5, FALSE);
312 else
313 play_set_relative_playback_rate (pipeline, -1.0, FALSE);
314 break;
315 case 'd':
316 play_set_relative_playback_rate (pipeline, 0.0, TRUE);
317 break;
318 case 't':
319 play_switch_trick_mode (pipeline);
320 break;
321 case 27: /* ESC */
322 if (key_input[1] == '\0') {
323 g_main_loop_quit (loop);
324 break;
325 }
326 case '0':
327 play_do_seek (pipeline, 0, cur_rate, trick_mode);
328 break;
329 case 'r':
330 GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (pipeline),
331 GST_DEBUG_GRAPH_SHOW_ALL, "ipc.master.requested");
332 break;
333 default:
334 if (strcmp (key_input, GST_PLAY_KB_ARROW_RIGHT) == 0) {
335 relative_seek (pipeline, +0.08);
336 } else if (strcmp (key_input, GST_PLAY_KB_ARROW_LEFT) == 0) {
337 relative_seek (pipeline, -0.01);
338 } else {
339 GST_INFO ("keyboard input:");
340 for (; *key_input != '\0'; ++key_input)
341 GST_INFO (" code %3d", *key_input);
342 }
343 break;
344 }
345 }
346
347 static gboolean
master_bus_msg(GstBus * bus,GstMessage * msg,gpointer data)348 master_bus_msg (GstBus * bus, GstMessage * msg, gpointer data)
349 {
350 GstPipeline *pipeline = data;
351
352 switch (GST_MESSAGE_TYPE (msg)) {
353 case GST_MESSAGE_ERROR:{
354 GError *err;
355 gchar *dbg;
356
357 gst_message_parse_error (msg, &err, &dbg);
358 g_printerr ("MASTER: ERROR: %s\n", err->message);
359 if (dbg != NULL)
360 g_printerr ("MASTER: ERROR debug information: %s\n", dbg);
361 g_error_free (err);
362 g_free (dbg);
363
364 GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (pipeline),
365 GST_DEBUG_GRAPH_SHOW_ALL, "ipc.master.error");
366
367 g_main_loop_quit (loop);
368 break;
369 }
370 case GST_MESSAGE_WARNING:{
371 GError *err;
372 gchar *dbg;
373
374 gst_message_parse_warning (msg, &err, &dbg);
375 g_printerr ("MASTER: WARNING: %s\n", err->message);
376 if (dbg != NULL)
377 g_printerr ("MASTER: WARNING debug information: %s\n", dbg);
378 g_error_free (err);
379 g_free (dbg);
380
381 GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (pipeline),
382 GST_DEBUG_GRAPH_SHOW_ALL, "ipc.master.warning");
383 break;
384 }
385 case GST_MESSAGE_ASYNC_DONE:
386 GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (pipeline),
387 GST_DEBUG_GRAPH_SHOW_ALL, "ipc.master.async-done");
388 break;
389 case GST_MESSAGE_EOS:
390 g_print ("EOS on master\n");
391 gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_NULL);
392 g_main_loop_quit (loop);
393 break;
394 case GST_MESSAGE_BUFFERING:{
395 gint percent;
396 GstBufferingMode bufmode;
397
398 if (!buffering)
399 g_print ("\n");
400
401 gst_message_parse_buffering (msg, &percent);
402 g_print ("%s %d%% \r", "Buffering...", percent);
403
404 gst_message_parse_buffering_stats (msg, &bufmode, NULL, NULL, NULL);
405
406 /* no state management needed for live pipelines */
407 if (bufmode != GST_BUFFERING_LIVE) {
408 if (percent == 100) {
409 /* a 100% message means buffering is done */
410 if (buffering) {
411 buffering = FALSE;
412 gst_element_set_state (GST_ELEMENT (pipeline), desired_state);
413 g_print ("\n%s\n", gst_element_state_get_name (desired_state));
414 }
415 } else {
416 /* buffering... */
417 if (!buffering) {
418 gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_PAUSED);
419 buffering = TRUE;
420 }
421 }
422 }
423 break;
424 }
425 case GST_MESSAGE_CLOCK_LOST:{
426 g_print ("Clock lost, selecting a new one\n");
427 gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_PAUSED);
428 gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_PLAYING);
429 break;
430 }
431 case GST_MESSAGE_LATENCY:
432 {
433 gst_bin_recalculate_latency (GST_BIN (pipeline));
434 break;
435 }
436 case GST_MESSAGE_REQUEST_STATE:{
437 GstState state;
438 gchar *name;
439
440 name = gst_object_get_path_string (GST_MESSAGE_SRC (msg));
441
442 gst_message_parse_request_state (msg, &state);
443
444 GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (pipeline),
445 GST_DEBUG_GRAPH_SHOW_VERBOSE, "ipc.slave.reqstate");
446
447 g_print ("Setting state to %s as requested by %s...\n",
448 gst_element_state_get_name (state), name);
449
450 gst_element_set_state (GST_ELEMENT (pipeline), state);
451 g_free (name);
452 break;
453 }
454 case GST_MESSAGE_ELEMENT:
455 {
456 GstNavigationMessageType mtype = gst_navigation_message_get_type (msg);
457 if (mtype == GST_NAVIGATION_MESSAGE_EVENT) {
458 GstEvent *ev = NULL;
459
460 if (gst_navigation_message_parse_event (msg, &ev)) {
461 GstNavigationEventType e_type = gst_navigation_event_get_type (ev);
462 switch (e_type) {
463 case GST_NAVIGATION_EVENT_KEY_PRESS:
464 {
465 const gchar *key;
466
467 if (gst_navigation_event_parse_key_event (ev, &key)) {
468 GST_INFO ("Key press: %s", key);
469
470 if (strcmp (key, "Left") == 0)
471 key = GST_PLAY_KB_ARROW_LEFT;
472 else if (strcmp (key, "Right") == 0)
473 key = GST_PLAY_KB_ARROW_RIGHT;
474 else if (strcmp (key, "Up") == 0)
475 key = GST_PLAY_KB_ARROW_UP;
476 else if (strcmp (key, "Down") == 0)
477 key = GST_PLAY_KB_ARROW_DOWN;
478 else if (strcmp (key, "space") == 0)
479 key = " ";
480 else if (strlen (key) > 1)
481 break;
482
483 keyboard_cb (key, GST_ELEMENT (pipeline));
484 }
485 break;
486 }
487 case GST_NAVIGATION_EVENT_MOUSE_BUTTON_PRESS:
488 {
489 gint button;
490 if (gst_navigation_event_parse_mouse_button_event (ev, &button,
491 NULL, NULL)) {
492 if (button == 4) {
493 /* wheel up */
494 relative_seek (GST_ELEMENT (pipeline), +0.08);
495 } else if (button == 5) {
496 /* wheel down */
497 relative_seek (GST_ELEMENT (pipeline), -0.01);
498 }
499 }
500 break;
501 }
502 default:
503 break;
504 }
505 }
506 if (ev)
507 gst_event_unref (ev);
508 }
509 break;
510 }
511 default:
512 break;
513 }
514 return TRUE;
515 }
516
/* Pass the open file descriptor @fd to the peer of the UNIX socket @s.
 *
 * The descriptor travels as SCM_RIGHTS ancillary data attached to a single
 * dummy payload byte.
 *
 * Returns 0 on success and -1 on failure (errno is left as set by
 * sendmsg()). */
static int
sendfd (int s, int fd)
{
  char payload = 0;
  struct iovec iov = {.iov_base = &payload,.iov_len = sizeof payload };
  /* the union gives the control buffer the alignment struct cmsghdr needs */
  union
  {
    char buf[CMSG_SPACE (sizeof (int))];
    struct cmsghdr align;
  } control;
  struct msghdr msg;
  struct cmsghdr *cmsg;
  ssize_t n;

  /* zero everything, including the padding bytes that are sent along */
  memset (&control, 0, sizeof control);
  memset (&msg, 0, sizeof msg);
  msg.msg_iov = &iov;
  msg.msg_iovlen = 1;
  msg.msg_control = control.buf;
  msg.msg_controllen = sizeof control.buf;

  cmsg = CMSG_FIRSTHDR (&msg);
  cmsg->cmsg_len = CMSG_LEN (sizeof (int));
  cmsg->cmsg_level = SOL_SOCKET;
  cmsg->cmsg_type = SCM_RIGHTS;
  memcpy (CMSG_DATA (cmsg), &fd, sizeof (int));

  do {
    n = sendmsg (s, &msg, 0);
  } while (n < 0 && errno == EINTR);

  /* compare as signed values so that -1 is never converted to size_t */
  return (n == (ssize_t) sizeof payload) ? 0 : -1;
}
547
548 static gint
find_ipcpipelinesink(gconstpointer e,gconstpointer c)549 find_ipcpipelinesink (gconstpointer e, gconstpointer c)
550 {
551 const GValue *elem = e;
552 const gchar *caps_name = c;
553 const gchar *n = g_object_get_data (g_value_get_object (elem),
554 "ipcpipelinesink-caps-name");
555 return g_strcmp0 (caps_name, n);
556 }
557
558 /* in HLS the decodebin pads are destroyed and re-created every time
559 * the stream changes bitrate. This trick here ensures that the new
560 * pads that will appear will go and link to the same ipcpipelinesinks,
561 * avoiding the creation of new pipelines in the slave. */
562 static void
on_pad_unlinked(GstPad * pad,GstPad * peer,GstElement * pipeline)563 on_pad_unlinked (GstPad * pad, GstPad * peer, GstElement * pipeline)
564 {
565 GstCaps *caps;
566 const GstStructure *structure;
567
568 caps = gst_pad_get_current_caps (pad);
569 structure = gst_caps_get_structure (caps, 0);
570
571 g_object_set_data_full (G_OBJECT (GST_OBJECT_PARENT (peer)),
572 "ipcpipelinesink-caps-name",
573 g_strdup (gst_structure_get_name (structure)), g_free);
574 }
575
576 static void
on_pad_added(GstElement * element,GstPad * pad,GstElement * pipeline)577 on_pad_added (GstElement * element, GstPad * pad, GstElement * pipeline)
578 {
579 GstElement *ipcpipelinesink;
580 GstPad *sinkpad;
581 GstCaps *caps;
582 const GstStructure *structure;
583 GstIterator *it;
584 GValue elem = G_VALUE_INIT;
585 int sockets[2];
586 gboolean create_sockets;
587
588 caps = gst_pad_get_current_caps (pad);
589 structure = gst_caps_get_structure (caps, 0);
590
591 it = gst_bin_iterate_sinks (GST_BIN (pipeline));
592 if (gst_iterator_find_custom (it, find_ipcpipelinesink, &elem,
593 (gpointer) gst_structure_get_name (structure))) {
594 ipcpipelinesink = g_value_get_object (&elem);
595 create_sockets = FALSE;
596 g_value_reset (&elem);
597 } else {
598 ipcpipelinesink = gst_element_factory_make ("ipcpipelinesink", NULL);
599 gst_bin_add (GST_BIN (pipeline), ipcpipelinesink);
600 create_sockets = TRUE;
601 }
602
603 sinkpad = gst_element_get_static_pad (ipcpipelinesink, "sink");
604 if (gst_pad_link (pad, sinkpad) != GST_PAD_LINK_OK) {
605 fprintf (stderr, "Failed to link ipcpipelinesink\n");
606 exit (1);
607 }
608 gst_object_unref (sinkpad);
609
610 g_signal_connect (pad, "unlinked", (GCallback) on_pad_unlinked, pipeline);
611
612 if (create_sockets) {
613 if (socketpair (AF_UNIX, SOCK_STREAM, 0, sockets)) {
614 fprintf (stderr, "Error creating sockets: %s\n", strerror (errno));
615 exit (1);
616 }
617 if (fcntl (sockets[0], F_SETFL, O_NONBLOCK) < 0 ||
618 fcntl (sockets[1], F_SETFL, O_NONBLOCK) < 0) {
619 fprintf (stderr, "Error setting O_NONBLOCK on sockets: %s\n",
620 strerror (errno));
621 exit (1);
622 }
623 g_object_set (ipcpipelinesink, "fdin", sockets[0], "fdout", sockets[0],
624 NULL);
625
626 printf ("new socket %d\n", sockets[1]);
627 sendfd (pipes[1], sockets[1]);
628 }
629
630 gst_element_set_state (ipcpipelinesink, GST_STATE_PLAYING);
631
632 GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (pipeline),
633 GST_DEBUG_GRAPH_SHOW_ALL, "pad.added");
634 }
635
/* Return values of the "autoplug-select" handler below.
 * NOTE(review): presumably these must keep the same numeric values as the
 * enum decodebin uses internally for this signal -- confirm before
 * reordering. */
typedef enum
{
  GST_AUTOPLUG_SELECT_TRY = 0,
  GST_AUTOPLUG_SELECT_EXPOSE = 1,
  GST_AUTOPLUG_SELECT_SKIP = 2
} GstAutoplugSelectResult;
642
643 static GstAutoplugSelectResult
on_autoplug_select(GstElement * uridecodebin,GstPad * pad,GstCaps * caps,GstElementFactory * factory,GstElement * pipeline)644 on_autoplug_select (GstElement * uridecodebin, GstPad * pad, GstCaps * caps,
645 GstElementFactory * factory, GstElement * pipeline)
646 {
647 /* if decodebin is about to plug a decoder,
648 * stop it right there and expose the pad;
649 * the slave's decodebin will take it from there... */
650 if (gst_element_factory_list_is_type (factory,
651 GST_ELEMENT_FACTORY_TYPE_DECODER)) {
652 gchar *capsstr = gst_caps_to_string (caps);
653 g_print (" exposing to slave: %s\n", capsstr);
654 g_free (capsstr);
655 return GST_AUTOPLUG_SELECT_EXPOSE;
656 }
657 return GST_AUTOPLUG_SELECT_TRY;
658 }
659
660 static void
start_source(const gchar * uri)661 start_source (const gchar * uri)
662 {
663 GstElement *pipeline;
664 GstElement *uridecodebin;
665 GstBus *bus;
666
667 pipeline = gst_pipeline_new (NULL);
668
669 bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
670 gst_bus_add_watch (bus, master_bus_msg, pipeline);
671 gst_object_unref (bus);
672
673 uridecodebin = gst_element_factory_make ("uridecodebin", NULL);
674 g_object_set (uridecodebin, "uri", uri, NULL);
675 g_signal_connect (uridecodebin, "pad-added", G_CALLBACK (on_pad_added),
676 pipeline);
677 g_signal_connect (uridecodebin, "autoplug-select",
678 G_CALLBACK (on_autoplug_select), pipeline);
679
680 gst_bin_add (GST_BIN (pipeline), uridecodebin);
681 gst_element_set_state (pipeline, GST_STATE_PLAYING);
682 }
683
684 /*********** SLAVE ***********/
685
686 static gboolean
slave_bus_msg(GstBus * bus,GstMessage * msg,gpointer data)687 slave_bus_msg (GstBus * bus, GstMessage * msg, gpointer data)
688 {
689 GstPipeline *pipeline = data;
690
691 switch (GST_MESSAGE_TYPE (msg)) {
692 case GST_MESSAGE_ERROR:{
693 GError *err;
694 gchar *dbg;
695
696 gst_message_parse_error (msg, &err, &dbg);
697 g_printerr ("SLAVE: ERROR: %s\n", err->message);
698 if (dbg != NULL)
699 g_printerr ("SLAVE: ERROR debug information: %s\n", dbg);
700 g_error_free (err);
701 g_free (dbg);
702
703 GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (pipeline),
704 GST_DEBUG_GRAPH_SHOW_ALL, "ipc.slave.error");
705 break;
706 }
707 case GST_MESSAGE_WARNING:{
708 GError *err;
709 gchar *dbg;
710
711 gst_message_parse_warning (msg, &err, &dbg);
712 g_printerr ("SLAVE: WARNING: %s\n", err->message);
713 if (dbg != NULL)
714 g_printerr ("SLAVE: WARNING debug information: %s\n", dbg);
715 g_error_free (err);
716 g_free (dbg);
717
718 GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (pipeline),
719 GST_DEBUG_GRAPH_SHOW_ALL, "ipc.slave.warning");
720 break;
721 }
722 case GST_MESSAGE_ASYNC_START:
723 GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (pipeline),
724 GST_DEBUG_GRAPH_SHOW_VERBOSE, "ipc.slave.async-start");
725 break;
726 case GST_MESSAGE_ASYNC_DONE:
727 GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (pipeline),
728 GST_DEBUG_GRAPH_SHOW_ALL, "ipc.slave.async-done");
729 break;
730 default:
731 break;
732 }
733 return TRUE;
734 }
735
736 static void
on_decoded_pad_added(GstElement * element,GstPad * pad,gpointer data)737 on_decoded_pad_added (GstElement * element, GstPad * pad, gpointer data)
738 {
739 GstBin *pipeline = data;
740 GstCaps *caps;
741 GstPad *cpad;
742 const gchar *type;
743 gchar *capsstr;
744
745 caps = gst_pad_get_current_caps (pad);
746 capsstr = gst_caps_to_string (caps);
747 printf (" caps: %s\n", capsstr);
748 g_free (capsstr);
749
750 type = gst_structure_get_name (gst_caps_get_structure (caps, 0));
751 if (!strcmp (type, "video/x-raw")) {
752 GstElement *c, *s;
753 c = gst_element_factory_make ("videoconvert", NULL);
754 s = gst_element_factory_make (arg_video_sink, NULL);
755 gst_bin_add_many (GST_BIN (pipeline), c, s, NULL);
756 gst_element_link_many (c, s, NULL);
757 cpad = gst_element_get_static_pad (c, "sink");
758 gst_pad_link (pad, cpad);
759 gst_object_unref (cpad);
760 gst_element_set_state (s, GST_STATE_PLAYING);
761 gst_element_set_state (c, GST_STATE_PLAYING);
762 } else if (!strcmp (type, "audio/x-raw")) {
763 GstElement *c, *s;
764 c = gst_element_factory_make ("audioconvert", NULL);
765 s = gst_element_factory_make (arg_audio_sink, NULL);
766 gst_bin_add_many (GST_BIN (pipeline), c, s, NULL);
767 gst_element_link_many (c, s, NULL);
768 cpad = gst_element_get_static_pad (c, "sink");
769 gst_pad_link (pad, cpad);
770 gst_object_unref (cpad);
771 gst_element_set_state (s, GST_STATE_PLAYING);
772 gst_element_set_state (c, GST_STATE_PLAYING);
773 } else {
774 GstElement *s;
775 s = gst_element_factory_make ("fakesink", NULL);
776 g_object_set (s, "sync", TRUE, "async", TRUE, NULL);
777 gst_bin_add_many (GST_BIN (pipeline), s, NULL);
778 cpad = gst_element_get_static_pad (s, "sink");
779 gst_pad_link (pad, cpad);
780 gst_object_unref (cpad);
781 gst_element_set_state (s, GST_STATE_PLAYING);
782 }
783
784 gst_caps_unref (caps);
785
786 GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (pipeline),
787 GST_DEBUG_GRAPH_SHOW_ALL, "decoded.pad.added");
788 }
789
/* Receive a file descriptor that the peer of the UNIX socket @s sent as
 * SCM_RIGHTS ancillary data (see sendfd()).
 *
 * Returns the received descriptor, or -1 on a receive error, on EOF, or
 * when the message carries no valid SCM_RIGHTS control message. */
static int
recvfd (int s)
{
  char payload;
  struct iovec iov = {.iov_base = &payload,.iov_len = sizeof payload };
  /* the union gives the control buffer the alignment struct cmsghdr needs */
  union
  {
    char buf[CMSG_SPACE (sizeof (int))];
    struct cmsghdr align;
  } control;
  struct msghdr msg;
  struct cmsghdr *cmsg;
  ssize_t n;
  int fd;

  memset (&control, 0, sizeof control);
  memset (&msg, 0, sizeof msg);
  msg.msg_iov = &iov;
  msg.msg_iovlen = 1;
  msg.msg_control = control.buf;
  msg.msg_controllen = sizeof control.buf;

  do {
    n = recvmsg (s, &msg, 0);
  } while (n < 0 && errno == EINTR);

  if (n < 0)
    return -1;
  if (n == 0) {
    /* EOF is not an errno condition, so perror() would print a stale or
     * meaningless error string here */
    fprintf (stderr, "unexpected EOF\n");
    return -1;
  }

  /* a payload byte without (valid) ancillary data carries no descriptor;
   * CMSG_FIRSTHDR() returns NULL in that case */
  cmsg = CMSG_FIRSTHDR (&msg);
  if (cmsg == NULL || cmsg->cmsg_level != SOL_SOCKET ||
      cmsg->cmsg_type != SCM_RIGHTS ||
      cmsg->cmsg_len < CMSG_LEN (sizeof (int)))
    return -1;

  memcpy (&fd, CMSG_DATA (cmsg), sizeof fd);
  return fd;
}
823
824 static gboolean
pipe_reader(gpointer data)825 pipe_reader (gpointer data)
826 {
827 GstElement *pipeline = data;
828 GstElement *ipcpipelinesrc, *mq, *decodebin;
829 GstPad *rpad, *sink_pad;
830 int fd;
831 fd_set set;
832 struct timeval tv;
833 int ret;
834 static int idx = 0;
835 char name[32];
836
837 FD_ZERO (&set);
838 FD_SET (pipes[0], &set);
839 tv.tv_sec = tv.tv_usec = 0;
840 ret = select (pipes[0] + 1, &set, NULL, NULL, &tv);
841 if (ret < 0) {
842 fprintf (stderr, "Failed to select: %s\n", strerror (errno));
843 return TRUE;
844 }
845 if (!FD_ISSET (pipes[0], &set))
846 return TRUE;
847
848 fd = recvfd (pipes[0]);
849 ipcpipelinesrc = gst_element_factory_make ("ipcpipelinesrc", NULL);
850 gst_bin_add (GST_BIN (pipeline), ipcpipelinesrc);
851 g_object_set (ipcpipelinesrc, "fdin", fd, "fdout", fd, NULL);
852
853
854 mq = gst_bin_get_by_name (GST_BIN (pipeline), "mq");
855 if (!mq) {
856 fprintf (stderr, "Failed to get mq\n");
857 return TRUE;
858 }
859 if (!gst_element_link (ipcpipelinesrc, mq)) {
860 fprintf (stderr, "Failed to link ipcpipelinesrc and mq\n");
861 return TRUE;
862 }
863
864 snprintf (name, sizeof (name), "src_%u", idx++);
865 rpad = gst_element_get_static_pad (mq, name);
866 if (!rpad) {
867 fprintf (stderr, "Failed to get mq request pad\n");
868 return TRUE;
869 }
870
871 decodebin = gst_element_factory_make ("decodebin", NULL);
872 gst_bin_add (GST_BIN (pipeline), decodebin);
873 sink_pad = gst_element_get_static_pad (decodebin, "sink");
874 gst_pad_link (rpad, sink_pad);
875 gst_object_unref (sink_pad);
876
877 g_signal_connect (decodebin, "pad-added", G_CALLBACK (on_decoded_pad_added),
878 pipeline);
879
880 /* dynamically added elements should be synced manually
881 * to the state of the slave pipeline */
882 gst_element_sync_state_with_parent (ipcpipelinesrc);
883 gst_element_sync_state_with_parent (decodebin);
884
885 GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (pipeline),
886 GST_DEBUG_GRAPH_SHOW_ALL, "ipc.slave.added");
887 gst_object_unref (mq);
888
889 return TRUE;
890 }
891
892 static void
start_sink(void)893 start_sink (void)
894 {
895 GstElement *pipeline;
896 GstElement *multiqueue;
897
898 pipeline = gst_element_factory_make ("ipcslavepipeline", NULL);
899 gst_bus_add_watch (GST_ELEMENT_BUS (pipeline), slave_bus_msg, pipeline);
900
901 multiqueue = gst_element_factory_make ("multiqueue", "mq");
902 gst_bin_add (GST_BIN (pipeline), multiqueue);
903
904 g_timeout_add (10, &pipe_reader, gst_object_ref (pipeline));
905
906 GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (pipeline),
907 GST_DEBUG_GRAPH_SHOW_ALL, "ipc.sink");
908 /* The state of the slave pipeline will change together with the state
909 * of the master, there is no need to call gst_element_set_state() here */
910 }
911
912
913 /********** COMMON ***********/
914
915 static void
init(int * argc,char *** argv)916 init (int *argc, char ***argv)
917 {
918 GOptionEntry options[] = {
919 {"audio-sink", 0, 0, G_OPTION_ARG_STRING, &arg_audio_sink,
920 "Audio sink element to use (default autoaudiosink)", NULL},
921 {"video-sink", 0, 0, G_OPTION_ARG_STRING, &arg_video_sink,
922 "Video sink element to use (default autovideosink)", NULL},
923 {NULL}
924 };
925 GOptionContext *ctx;
926 GError *err = NULL;
927
928 ctx = g_option_context_new ("");
929 g_option_context_add_main_entries (ctx, options, "");
930 if (!g_option_context_parse (ctx, argc, argv, &err)) {
931 fprintf (stderr, "Error initializing: %s\n", err->message);
932 exit (1);
933 }
934 g_option_context_free (ctx);
935 }
936
937 static void
run(pid_t pid)938 run (pid_t pid)
939 {
940 loop = g_main_loop_new (NULL, FALSE);
941 g_main_loop_run (loop);
942 if (pid > 0)
943 kill (pid, SIGTERM);
944 }
945
946 gint
main(gint argc,gchar ** argv)947 main (gint argc, gchar ** argv)
948 {
949 GError *error = NULL;
950 gchar *uri = NULL;
951 pid_t pid;
952
953 init (&argc, &argv);
954
955 if (argc < 2) {
956 fprintf (stderr, "usage: %s [av-filename-or-url]\n", argv[0]);
957 return 1;
958 }
959
960 if (!g_strstr_len (argv[1], -1, "://")) {
961 uri = gst_filename_to_uri (argv[1], &error);
962 } else {
963 uri = g_strdup (argv[1]);
964 }
965
966 if (error) {
967 fprintf (stderr, "usage: %s [av-filename-or-url]\n", argv[0]);
968 g_clear_error (&error);
969 return 1;
970 }
971
972 if (socketpair (AF_UNIX, SOCK_STREAM, 0, pipes)) {
973 fprintf (stderr, "Error creating pipes: %s\n", strerror (errno));
974 return 2;
975 }
976 if (fcntl (pipes[0], F_SETFL, O_NONBLOCK) < 0 ||
977 fcntl (pipes[1], F_SETFL, O_NONBLOCK) < 0) {
978 fprintf (stderr, "Error setting O_NONBLOCK on pipes: %s\n",
979 strerror (errno));
980 return 2;
981 }
982
983 pid = fork ();
984 if (pid < 0) {
985 fprintf (stderr, "Error forking: %s\n", strerror (errno));
986 return 1;
987 } else if (pid > 0) {
988 setenv ("GST_DEBUG_FILE", "gstsrc.log", 1);
989 gst_init (&argc, &argv);
990 start_source (uri);
991 } else {
992 setenv ("GST_DEBUG_FILE", "gstsink.log", 1);
993 gst_init (&argc, &argv);
994 start_sink ();
995 }
996
997 g_free (uri);
998 run (pid);
999
1000 return 0;
1001 }
1002