1 /* GStreamer
2 * Copyright (C) 2013 Collabora Ltd.
3 * @author Torrie Fischer <torrie.fischer@collabora.co.uk>
4 *
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Library General Public
7 * License as published by the Free Software Foundation; either
8 * version 2 of the License, or (at your option) any later version.
9 *
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Library General Public License for more details.
14 *
15 * You should have received a copy of the GNU Library General Public
16 * License along with this library; if not, write to the
17 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18 * Boston, MA 02110-1301, USA.
19 */
20 #include <gst/gst.h>
21 #include <gst/rtp/rtp.h>
22 #include <stdlib.h>
23
24 /*
25 * RTP receiver with RFC4588 retransmission handling enabled
26 *
27 * In this example we have two RTP sessions, one for video and one for audio.
28 * Video is received on port 5000, with its RTCP stream received on port 5001
 * and sent on port 5005. Audio is received on port 5006, with its RTCP stream
 * received on port 5007 and sent on port 5011.
31 *
32 * In both sessions, we set "rtprtxreceive" as the session's "aux" element
33 * in rtpbin, which enables RFC4588 retransmission handling for that session.
34 *
 *             .-------.      .----------.     .-----------.   .---------.   .-------------.
 *  RTP        |udpsrc |      | rtpbin   |     |theoradepay|   |theoradec|   |autovideosink|
 *  port=5000  |      src->recv_rtp_0 recv_rtp_0->sink     src->sink     src->sink          |
 *             '-------'      |          |     '-----------'   '---------'   '-------------'
 *                            |          |
 *                            |          |     .-------.
 *                            |          |     |udpsink|  RTCP
 *                            |  send_rtcp_0->sink     | port=5005
 *             .-------.      |          |     '-------' sync=false
 *  RTCP       |udpsrc |      |          |               async=false
 *  port=5001  |     src->recv_rtcp_0    |
 *             '-------'      |          |
 *                            |          |
 *             .-------.      |          |     .---------.   .-------.   .-------------.
 *  RTP        |udpsrc |      |          |     |pcmadepay|   |alawdec|   |autoaudiosink|
 *  port=5006  |      src->recv_rtp_1 recv_rtp_1->sink   src->sink   src->sink          |
 *             '-------'      |          |     '---------'   '-------'   '-------------'
 *                            |          |
 *                            |          |     .-------.
 *                            |          |     |udpsink|  RTCP
 *                            |  send_rtcp_1->sink     | port=5011
 *             .-------.      |          |     '-------' sync=false
 *  RTCP       |udpsrc |      |          |               async=false
 *  port=5007  |     src->recv_rtcp_1    |
 *             '-------'      '----------'
60 *
61 */
62
63 GMainLoop *loop = NULL;
64
65 typedef struct _SessionData
66 {
67 int ref;
68 GstElement *rtpbin;
69 guint sessionNum;
70 GstCaps *caps;
71 GstElement *output;
72 } SessionData;
73
74 static SessionData *
session_ref(SessionData * data)75 session_ref (SessionData * data)
76 {
77 g_atomic_int_inc (&data->ref);
78 return data;
79 }
80
81 static void
session_unref(gpointer data)82 session_unref (gpointer data)
83 {
84 SessionData *session = (SessionData *) data;
85 if (g_atomic_int_dec_and_test (&session->ref)) {
86 g_object_unref (session->rtpbin);
87 gst_caps_unref (session->caps);
88 g_free (session);
89 }
90 }
91
92 static SessionData *
session_new(guint sessionNum)93 session_new (guint sessionNum)
94 {
95 SessionData *ret = g_new0 (SessionData, 1);
96 ret->sessionNum = sessionNum;
97 return session_ref (ret);
98 }
99
100 static void
setup_ghost_sink(GstElement * sink,GstBin * bin)101 setup_ghost_sink (GstElement * sink, GstBin * bin)
102 {
103 GstPad *sinkPad = gst_element_get_static_pad (sink, "sink");
104 GstPad *binPad = gst_ghost_pad_new ("sink", sinkPad);
105 gst_element_add_pad (GST_ELEMENT (bin), binPad);
106 }
107
108 static SessionData *
make_audio_session(guint sessionNum)109 make_audio_session (guint sessionNum)
110 {
111 SessionData *ret = session_new (sessionNum);
112 GstBin *bin = GST_BIN (gst_bin_new ("audio"));
113 GstElement *queue = gst_element_factory_make ("queue", NULL);
114 GstElement *sink = gst_element_factory_make ("autoaudiosink", NULL);
115 GstElement *audioconvert = gst_element_factory_make ("audioconvert", NULL);
116 GstElement *audioresample = gst_element_factory_make ("audioresample", NULL);
117 GstElement *depayloader = gst_element_factory_make ("rtppcmadepay", NULL);
118 GstElement *decoder = gst_element_factory_make ("alawdec", NULL);
119
120 gst_bin_add_many (bin, queue, depayloader, decoder, audioconvert,
121 audioresample, sink, NULL);
122 gst_element_link_many (queue, depayloader, decoder, audioconvert,
123 audioresample, sink, NULL);
124
125 setup_ghost_sink (queue, bin);
126
127 ret->output = GST_ELEMENT (bin);
128 ret->caps = gst_caps_new_simple ("application/x-rtp",
129 "media", G_TYPE_STRING, "audio",
130 "clock-rate", G_TYPE_INT, 8000,
131 "encoding-name", G_TYPE_STRING, "PCMA", NULL);
132
133 return ret;
134 }
135
136 static SessionData *
make_video_session(guint sessionNum)137 make_video_session (guint sessionNum)
138 {
139 SessionData *ret = session_new (sessionNum);
140 GstBin *bin = GST_BIN (gst_bin_new ("video"));
141 GstElement *queue = gst_element_factory_make ("queue", NULL);
142 GstElement *depayloader = gst_element_factory_make ("rtptheoradepay", NULL);
143 GstElement *decoder = gst_element_factory_make ("theoradec", NULL);
144 GstElement *converter = gst_element_factory_make ("videoconvert", NULL);
145 GstElement *sink = gst_element_factory_make ("autovideosink", NULL);
146
147 gst_bin_add_many (bin, depayloader, decoder, converter, queue, sink, NULL);
148 gst_element_link_many (queue, depayloader, decoder, converter, sink, NULL);
149
150 setup_ghost_sink (queue, bin);
151
152 ret->output = GST_ELEMENT (bin);
153 ret->caps = gst_caps_new_simple ("application/x-rtp",
154 "media", G_TYPE_STRING, "video",
155 "clock-rate", G_TYPE_INT, 90000,
156 "encoding-name", G_TYPE_STRING, "THEORA", NULL);
157
158 return ret;
159 }
160
161 static GstCaps *
request_pt_map(GstElement * rtpbin,guint session,guint pt,gpointer user_data)162 request_pt_map (GstElement * rtpbin, guint session, guint pt,
163 gpointer user_data)
164 {
165 SessionData *data = (SessionData *) user_data;
166 gchar *caps_str;
167 g_print ("Looking for caps for pt %u in session %u, have %u\n", pt, session,
168 data->sessionNum);
169 if (session == data->sessionNum) {
170 caps_str = gst_caps_to_string (data->caps);
171 g_print ("Returning %s\n", caps_str);
172 g_free (caps_str);
173 return gst_caps_ref (data->caps);
174 }
175 return NULL;
176 }
177
178 static void
cb_eos(GstBus * bus,GstMessage * message,gpointer data)179 cb_eos (GstBus * bus, GstMessage * message, gpointer data)
180 {
181 g_print ("Got EOS\n");
182 g_main_loop_quit (loop);
183 }
184
185 static void
cb_state(GstBus * bus,GstMessage * message,gpointer data)186 cb_state (GstBus * bus, GstMessage * message, gpointer data)
187 {
188 GstObject *pipe = GST_OBJECT (data);
189 GstState old, new, pending;
190 gst_message_parse_state_changed (message, &old, &new, &pending);
191 if (message->src == pipe) {
192 g_print ("Pipeline %s changed state from %s to %s\n",
193 GST_OBJECT_NAME (message->src),
194 gst_element_state_get_name (old), gst_element_state_get_name (new));
195 }
196 }
197
198 static void
cb_warning(GstBus * bus,GstMessage * message,gpointer data)199 cb_warning (GstBus * bus, GstMessage * message, gpointer data)
200 {
201 GError *error = NULL;
202 gst_message_parse_warning (message, &error, NULL);
203 g_printerr ("Got warning from %s: %s\n", GST_OBJECT_NAME (message->src),
204 error->message);
205 g_error_free (error);
206 }
207
208 static void
cb_error(GstBus * bus,GstMessage * message,gpointer data)209 cb_error (GstBus * bus, GstMessage * message, gpointer data)
210 {
211 GError *error = NULL;
212 gst_message_parse_error (message, &error, NULL);
213 g_printerr ("Got error from %s: %s\n", GST_OBJECT_NAME (message->src),
214 error->message);
215 g_error_free (error);
216 g_main_loop_quit (loop);
217 }
218
219 static void
handle_new_stream(GstElement * element,GstPad * newPad,gpointer data)220 handle_new_stream (GstElement * element, GstPad * newPad, gpointer data)
221 {
222 SessionData *session = (SessionData *) data;
223 gchar *padName;
224 gchar *myPrefix;
225
226 padName = gst_pad_get_name (newPad);
227 myPrefix = g_strdup_printf ("recv_rtp_src_%u", session->sessionNum);
228
229 g_print ("New pad: %s, looking for %s_*\n", padName, myPrefix);
230
231 if (g_str_has_prefix (padName, myPrefix)) {
232 GstPad *outputSinkPad;
233 GstElement *parent;
234
235 parent = GST_ELEMENT (gst_element_get_parent (session->rtpbin));
236 gst_bin_add (GST_BIN (parent), session->output);
237 gst_element_sync_state_with_parent (session->output);
238 gst_object_unref (parent);
239
240 outputSinkPad = gst_element_get_static_pad (session->output, "sink");
241 g_assert_cmpint (gst_pad_link (newPad, outputSinkPad), ==, GST_PAD_LINK_OK);
242 gst_object_unref (outputSinkPad);
243
244 g_print ("Linked!\n");
245 }
246 g_free (myPrefix);
247 g_free (padName);
248 }
249
250 static GstElement *
request_aux_receiver(GstElement * rtpbin,guint sessid,SessionData * session)251 request_aux_receiver (GstElement * rtpbin, guint sessid, SessionData * session)
252 {
253 GstElement *rtx, *bin;
254 GstPad *pad;
255 gchar *name;
256 GstStructure *pt_map;
257
258 GST_INFO ("creating AUX receiver");
259 bin = gst_bin_new (NULL);
260 rtx = gst_element_factory_make ("rtprtxreceive", NULL);
261 pt_map = gst_structure_new ("application/x-rtp-pt-map",
262 "8", G_TYPE_UINT, 98, "96", G_TYPE_UINT, 99, NULL);
263 g_object_set (rtx, "payload-type-map", pt_map, NULL);
264 gst_structure_free (pt_map);
265 gst_bin_add (GST_BIN (bin), rtx);
266
267 pad = gst_element_get_static_pad (rtx, "src");
268 name = g_strdup_printf ("src_%u", sessid);
269 gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
270 g_free (name);
271 gst_object_unref (pad);
272
273 pad = gst_element_get_static_pad (rtx, "sink");
274 name = g_strdup_printf ("sink_%u", sessid);
275 gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
276 g_free (name);
277 gst_object_unref (pad);
278
279 return bin;
280 }
281
282 static void
join_session(GstElement * pipeline,GstElement * rtpBin,SessionData * session)283 join_session (GstElement * pipeline, GstElement * rtpBin, SessionData * session)
284 {
285 GstElement *rtpSrc;
286 GstElement *rtcpSrc;
287 GstElement *rtcpSink;
288 gchar *padName;
289 guint basePort;
290
291 g_print ("Joining session %p\n", session);
292
293 session->rtpbin = g_object_ref (rtpBin);
294
295 basePort = 5000 + (session->sessionNum * 6);
296
297 rtpSrc = gst_element_factory_make ("udpsrc", NULL);
298 rtcpSrc = gst_element_factory_make ("udpsrc", NULL);
299 rtcpSink = gst_element_factory_make ("udpsink", NULL);
300 g_object_set (rtpSrc, "port", basePort, "caps", session->caps, NULL);
301 g_object_set (rtcpSink, "port", basePort + 5, "host", "127.0.0.1", "sync",
302 FALSE, "async", FALSE, NULL);
303 g_object_set (rtcpSrc, "port", basePort + 1, NULL);
304
305 g_print ("Connecting to %i/%i/%i\n", basePort, basePort + 1, basePort + 5);
306
307 /* enable RFC4588 retransmission handling by setting rtprtxreceive
308 * as the "aux" element of rtpbin */
309 g_signal_connect (rtpBin, "request-aux-receiver",
310 (GCallback) request_aux_receiver, session);
311
312 gst_bin_add_many (GST_BIN (pipeline), rtpSrc, rtcpSrc, rtcpSink, NULL);
313
314 g_signal_connect_data (rtpBin, "pad-added", G_CALLBACK (handle_new_stream),
315 session_ref (session), (GClosureNotify) session_unref, 0);
316
317 g_signal_connect_data (rtpBin, "request-pt-map", G_CALLBACK (request_pt_map),
318 session_ref (session), (GClosureNotify) session_unref, 0);
319
320 padName = g_strdup_printf ("recv_rtp_sink_%u", session->sessionNum);
321 gst_element_link_pads (rtpSrc, "src", rtpBin, padName);
322 g_free (padName);
323
324 padName = g_strdup_printf ("recv_rtcp_sink_%u", session->sessionNum);
325 gst_element_link_pads (rtcpSrc, "src", rtpBin, padName);
326 g_free (padName);
327
328 padName = g_strdup_printf ("send_rtcp_src_%u", session->sessionNum);
329 gst_element_link_pads (rtpBin, padName, rtcpSink, "sink");
330 g_free (padName);
331
332 session_unref (session);
333 }
334
335 int
main(int argc,char ** argv)336 main (int argc, char **argv)
337 {
338 GstPipeline *pipe;
339 SessionData *videoSession;
340 SessionData *audioSession;
341 GstElement *rtpBin;
342 GstBus *bus;
343
344 gst_init (&argc, &argv);
345
346 loop = g_main_loop_new (NULL, FALSE);
347 pipe = GST_PIPELINE (gst_pipeline_new (NULL));
348
349 bus = gst_element_get_bus (GST_ELEMENT (pipe));
350 g_signal_connect (bus, "message::error", G_CALLBACK (cb_error), pipe);
351 g_signal_connect (bus, "message::warning", G_CALLBACK (cb_warning), pipe);
352 g_signal_connect (bus, "message::state-changed", G_CALLBACK (cb_state), pipe);
353 g_signal_connect (bus, "message::eos", G_CALLBACK (cb_eos), NULL);
354 gst_bus_add_signal_watch (bus);
355 gst_object_unref (bus);
356
357 rtpBin = gst_element_factory_make ("rtpbin", NULL);
358 gst_bin_add (GST_BIN (pipe), rtpBin);
359 g_object_set (rtpBin, "latency", 200, "do-retransmission", TRUE,
360 "rtp-profile", GST_RTP_PROFILE_AVPF, NULL);
361
362 videoSession = make_video_session (0);
363 audioSession = make_audio_session (1);
364
365 join_session (GST_ELEMENT (pipe), rtpBin, videoSession);
366 join_session (GST_ELEMENT (pipe), rtpBin, audioSession);
367
368 g_print ("starting client pipeline\n");
369 gst_element_set_state (GST_ELEMENT (pipe), GST_STATE_PLAYING);
370
371 g_main_loop_run (loop);
372
373 g_print ("stopping client pipeline\n");
374 gst_element_set_state (GST_ELEMENT (pipe), GST_STATE_NULL);
375
376 gst_object_unref (pipe);
377 g_main_loop_unref (loop);
378
379 return 0;
380 }
381