1 /* GStreamer RIST plugin
2 * Copyright (C) 2019 Net Insight AB
3 * Author: Nicolas Dufresne <nicolas.dufresne@collabora.com>
4 *
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Library General Public
7 * License as published by the Free Software Foundation; either
8 * version 2 of the License, or (at your option) any later version.
9 *
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Library General Public License for more details.
14 *
15 * You should have received a copy of the GNU Library General Public
16 * License along with this library; if not, write to the
17 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18 * Boston, MA 02110-1301, USA.
19 */
20
21 /**
22 * SECTION:element-ristsrc
23 * @title: ristsrc
24 * @see_also: ristsink
25 *
26 * This element implements RIST TR-06-1 Simple Profile receiver. The stream
27 * produced by this element will be RTP payloaded. This element also implements
28 * the URI scheme `rist://` allowing to render RIST streams in GStreamer based
29 * media players. The RIST URI handler also allows setting properties through
30 * the URI query.
31 *
32 * It also implements part of the RIST TR-06-2 Main Profile receiver. The
33 * tunneling, multiplexing and encryption parts of the specification are not
34 * included. This element will accept the RIST RTP header extension and restore
35 * the null MPEG-TS packets if the extension is included. It will not currently
36 * use the sequence number extension when sending RTCP NACK requests.
37 *
38 * ## Example gst-launch line
39 * |[
40 * gst-launch-1.0 ristsrc address=0.0.0.0 port=5004 ! rtpmp2tdepay ! udpsink
41 * gst-play-1.0 "rist://0.0.0.0:5004?receiver-buffer=700"
42 * ]|
43 *
44 * Additionally, this element supports link bonding, which means it
45 * can receive the same stream from multiple addresses. Each address
46 * will be mapped to its own RTP session. In order to enable bonding
 * support, one needs to configure the list of addresses through the
 * "bonding-addresses" property.
49 *
50 * ## Example gst-launch line for bonding
51 * |[
52 * gst-launch-1.0 ristsrc bonding-addresses="10.0.0.1:5004,11.0.0.1:5006" ! rtpmp2tdepay ! udpsink
53 * gst-play-1.0 "rist://0.0.0.0:5004?bonding-addresses=10.0.0.1:5004,11.0.0.1:5006"
54 * ]|
55 */
56
/* using GValueArray, which has no replacement */
58 #define GLIB_DISABLE_DEPRECATION_WARNINGS
59
60 #ifdef HAVE_CONFIG_H
61 #include "config.h"
62 #endif
63
64 #include <gio/gio.h>
65 #include <gst/net/net.h>
66 #include <gst/rtp/rtp.h>
67
68 /* for strtol() */
69 #include <stdlib.h>
70
71 #ifdef HAVE_SYS_SOCKET_H
72 #include <sys/socket.h>
73 #endif
74
75 #include "gstrist.h"
76
77 GST_DEBUG_CATEGORY_STATIC (gst_rist_src_debug);
78 #define GST_CAT_DEFAULT gst_rist_src_debug
79
/* Identifiers named after the element's properties. Numbering starts at 1
 * and every following entry takes the next integer, so the order of this
 * list must not change. */
enum
{
  PROP_ADDRESS = 1,
  PROP_PORT,
  PROP_RECEIVER_BUFFER,
  PROP_REORDER_SECTION,
  PROP_MAX_RTX_RETRIES,
  PROP_MIN_RTCP_INTERVAL,
  PROP_MAX_RTCP_BANDWIDTH,
  PROP_STATS_UPDATE_INTERVAL,
  PROP_STATS,
  PROP_CNAME,
  PROP_MULTICAST_LOOPBACK,
  PROP_MULTICAST_IFACE,
  PROP_MULTICAST_TTL,
  PROP_BONDING_ADDRESSES
};
97
98 static GstStaticPadTemplate src_templ = GST_STATIC_PAD_TEMPLATE ("src",
99 GST_PAD_SRC,
100 GST_PAD_ALWAYS,
101 GST_STATIC_CAPS ("application/x-rtp"));
102
103
104 typedef struct
105 {
106 guint session;
107 gchar *address;
108 gchar *multicast_iface;
109 guint port;
110
111 GstElement *rtcp_src;
112 GstElement *rtp_src;
113 GstElement *rtcp_sink;
114 GstElement *rtx_receive;
115 gulong rtcp_recv_probe;
116 gulong rtcp_send_probe;
117 GSocketAddress *rtcp_send_addr;
118
119 } RistReceiverBond;
120
121 struct _GstRistSrc
122 {
123 GstBin parent;
124
125 GstUri *uri;
126
127 /* Common elements in the pipeline */
128 GstElement *rtpbin;
129 GstPad *srcpad;
130 GstElement *rtxbin;
131 GstElement *rtx_funnel;
132 GstElement *rtpdeext;
133
134 /* Common properties, protected by bonds_lock */
135 guint reorder_section;
136 guint max_rtx_retries;
137 GstClockTime min_rtcp_interval;
138 gdouble max_rtcp_bandwidth;
139 gint multicast_loopback;
140 gint multicast_ttl;
141
142 /* Bonds */
143 GPtrArray *bonds;
144 /* this is needed as setting sibling properties will try to take the object
145 * lock. Thus, any properties that affects the bonds will be protected with
146 * that lock instead of the object lock. */
147 GMutex bonds_lock;
148
149 /* For stats */
150 guint stats_interval;
151 guint32 rtp_ssrc;
152 GstClockID stats_cid;
153 GstElement *jitterbuffer;
154
155 /* This is set whenever there is a pipeline construction failure, and used
156 * to fail state changes later */
157 gboolean construct_failed;
158 const gchar *missing_plugin;
159 };
160
161 static void gst_rist_src_uri_init (gpointer g_iface, gpointer iface_data);
162
163 G_DEFINE_TYPE_WITH_CODE (GstRistSrc, gst_rist_src, GST_TYPE_BIN,
164 G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER, gst_rist_src_uri_init);
165 GST_DEBUG_CATEGORY_INIT (gst_rist_src_debug, "ristsrc", 0, "RIST Source"));
166 GST_ELEMENT_REGISTER_DEFINE (ristsrc, "ristsrc", GST_RANK_PRIMARY,
167 GST_TYPE_RIST_SRC);
168
169 /* called with bonds lock */
170 static RistReceiverBond *
gst_rist_src_add_bond(GstRistSrc * src)171 gst_rist_src_add_bond (GstRistSrc * src)
172 {
173 RistReceiverBond *bond = g_slice_new0 (RistReceiverBond);
174 GstPad *pad, *gpad;
175 gchar name[32];
176
177 bond->session = src->bonds->len;
178 bond->address = g_strdup ("0.0.0.0");
179
180 g_snprintf (name, 32, "rist_rtx_receive%u", bond->session);
181 bond->rtx_receive = gst_element_factory_make ("ristrtxreceive", name);
182 gst_bin_add (GST_BIN (src->rtxbin), bond->rtx_receive);
183
184 g_snprintf (name, 32, "sink_%u", bond->session);
185 gst_element_link_pads (bond->rtx_receive, "src", src->rtx_funnel, name);
186
187 g_snprintf (name, 32, "sink_%u", bond->session);
188 pad = gst_element_get_static_pad (bond->rtx_receive, "sink");
189 gpad = gst_ghost_pad_new (name, pad);
190 gst_object_unref (pad);
191 gst_element_add_pad (src->rtxbin, gpad);
192
193 g_snprintf (name, 32, "rist_rtp_udpsrc%u", bond->session);
194 bond->rtp_src = gst_element_factory_make ("udpsrc", name);
195 g_snprintf (name, 32, "rist_rtcp_udpsrc%u", bond->session);
196 bond->rtcp_src = gst_element_factory_make ("udpsrc", name);
197 g_snprintf (name, 32, "rist_rtcp_dynudpsink%u", bond->session);
198 bond->rtcp_sink = gst_element_factory_make ("dynudpsink", name);
199 if (!bond->rtp_src || !bond->rtcp_src || !bond->rtcp_sink) {
200 g_clear_object (&bond->rtp_src);
201 g_clear_object (&bond->rtcp_src);
202 g_clear_object (&bond->rtcp_sink);
203 g_slice_free (RistReceiverBond, bond);
204 src->missing_plugin = "udp";
205 return NULL;
206 }
207 gst_bin_add_many (GST_BIN (src), bond->rtp_src, bond->rtcp_src,
208 bond->rtcp_sink, NULL);
209 g_object_set (bond->rtcp_sink, "sync", FALSE, "async", FALSE, NULL);
210 gst_element_set_locked_state (bond->rtcp_sink, TRUE);
211
212 g_snprintf (name, 32, "recv_rtp_sink_%u", bond->session);
213 gst_element_link_pads (bond->rtp_src, "src", src->rtpbin, name);
214 g_snprintf (name, 32, "recv_rtcp_sink_%u", bond->session);
215 gst_element_link_pads (bond->rtcp_src, "src", src->rtpbin, name);
216 g_snprintf (name, 32, "send_rtcp_src_%u", bond->session);
217 gst_element_link_pads (src->rtpbin, name, bond->rtcp_sink, "sink");
218
219 g_ptr_array_add (src->bonds, bond);
220 return bond;
221 }
222
223 static void
gst_rist_src_pad_added(GstRistSrc * src,GstPad * new_pad,GstElement * rtpbin)224 gst_rist_src_pad_added (GstRistSrc * src, GstPad * new_pad, GstElement * rtpbin)
225 {
226 GST_TRACE_OBJECT (src, "New pad '%s'.", GST_PAD_NAME (new_pad));
227
228 if (g_str_has_prefix (GST_PAD_NAME (new_pad), "recv_rtp_src_0_")) {
229 GST_DEBUG_OBJECT (src, "Using new pad '%s' as ghost pad target.",
230 GST_PAD_NAME (new_pad));
231 gst_ghost_pad_set_target (GST_GHOST_PAD (src->srcpad), new_pad);
232 }
233 }
234
235 static GstCaps *
gst_rist_src_request_pt_map(GstRistSrc * src,guint session_id,guint pt,GstElement * rtpbin)236 gst_rist_src_request_pt_map (GstRistSrc * src, guint session_id, guint pt,
237 GstElement * rtpbin)
238 {
239 const GstRTPPayloadInfo *pt_info;
240 GstCaps *ret;
241
242 pt_info = gst_rtp_payload_info_for_pt (pt);
243 if (!pt_info || !pt_info->clock_rate)
244 return NULL;
245
246 ret = gst_caps_new_simple ("application/x-rtp",
247 "media", G_TYPE_STRING, pt_info->media,
248 "encoding_name", G_TYPE_STRING, pt_info->encoding_name,
249 "clock-rate", G_TYPE_INT, (gint) pt_info->clock_rate, NULL);
250
251 /* FIXME add sprop-parameter-set if any */
252 g_warn_if_fail (pt_info->encoding_parameters == NULL);
253
254 return ret;
255 }
256
257 static GstElement *
gst_rist_src_request_aux_receiver(GstRistSrc * src,guint session_id,GstElement * rtpbin)258 gst_rist_src_request_aux_receiver (GstRistSrc * src, guint session_id,
259 GstElement * rtpbin)
260 {
261 return gst_object_ref (src->rtxbin);
262 }
263
264 /* Overrides the nack creation. Right now we don't send mixed NACKS type, we
265 * simply send a set of range NACK if it takes less space, or allow adding
266 * more seqnum. */
267 static guint
gst_rist_src_on_sending_nacks(GObject * session,guint sender_ssrc,guint media_ssrc,GArray * nacks,GstBuffer * buffer,gpointer user_data)268 gst_rist_src_on_sending_nacks (GObject * session, guint sender_ssrc,
269 guint media_ssrc, GArray * nacks, GstBuffer * buffer, gpointer user_data)
270 {
271 GstRTCPBuffer rtcp = GST_RTCP_BUFFER_INIT;
272 GstRTCPPacket packet;
273 guint8 *app_data;
274 guint nacked_seqnums = 0;
275 guint range_size = 0;
276 guint n_rg_nacks = 0;
277 guint n_fb_nacks = 0;
278 guint16 seqnum;
279 guint i;
280 gint diff;
281
282 /* We'll assume that range will be best, and find how many generic NACK
283 * would have been created. If this number ends up being smaller, we will
284 * just remove the APP packet and return 0, leaving it to RTPSession to
285 * create the generic NACK.*/
286
287 gst_rtcp_buffer_map (buffer, GST_MAP_READWRITE, &rtcp);
288 if (!gst_rtcp_buffer_add_packet (&rtcp, GST_RTCP_TYPE_APP, &packet))
289 /* exit because the packet is full, will put next request in a
290 * further packet */
291 goto done;
292
293 gst_rtcp_packet_app_set_ssrc (&packet, media_ssrc);
294 gst_rtcp_packet_app_set_name (&packet, "RIST");
295
296 if (!gst_rtcp_packet_app_set_data_length (&packet, 1)) {
297 gst_rtcp_packet_remove (&packet);
298 GST_WARNING ("no range nacks fit in the packet");
299 goto done;
300 }
301
302 app_data = gst_rtcp_packet_app_get_data (&packet);
303 for (i = 0; i < nacks->len; i = nacked_seqnums) {
304 guint j;
305 seqnum = g_array_index (nacks, guint16, i);
306
307 if (!gst_rtcp_packet_app_set_data_length (&packet, n_rg_nacks + 1))
308 break;
309
310 n_rg_nacks++;
311 nacked_seqnums++;
312
313 for (j = i + 1; j < nacks->len; j++) {
314 guint16 next_seqnum = g_array_index (nacks, guint16, j);
315 diff = gst_rtp_buffer_compare_seqnum (seqnum, next_seqnum);
316 GST_TRACE ("[%u][%u] %u %u diff %i", i, j, seqnum, next_seqnum, diff);
317 if (diff > (j - i))
318 break;
319
320 nacked_seqnums++;
321 }
322
323 range_size = j - i - 1;
324 GST_WRITE_UINT32_BE (app_data, seqnum << 16 | range_size);
325 app_data += 4;
326 }
327
328 /* count how many FB NACK it would take to wrap nacked_seqnums */
329 seqnum = g_array_index (nacks, guint16, 0);
330 n_fb_nacks = 1;
331 for (i = 1; i < nacked_seqnums; i++) {
332 guint16 next_seqnum = g_array_index (nacks, guint16, i);
333 diff = gst_rtp_buffer_compare_seqnum (seqnum, next_seqnum);
334 if (diff > 16) {
335 n_fb_nacks++;
336 seqnum = next_seqnum;
337 }
338 }
339
340 if (n_fb_nacks <= n_rg_nacks) {
341 GST_DEBUG ("Not sending %u range nacks, as %u FB nacks will be smaller",
342 n_rg_nacks, n_fb_nacks);
343 gst_rtcp_packet_remove (&packet);
344 nacked_seqnums = 0;
345 goto done;
346 }
347
348 GST_DEBUG ("Sent %u seqnums into %u Range NACKs", nacked_seqnums, n_rg_nacks);
349
350 done:
351 gst_rtcp_buffer_unmap (&rtcp);
352 return nacked_seqnums;
353 }
354
355 static void
gst_rist_src_on_new_ssrc(GstRistSrc * src,guint session_id,guint ssrc,GstElement * rtpbin)356 gst_rist_src_on_new_ssrc (GstRistSrc * src, guint session_id, guint ssrc,
357 GstElement * rtpbin)
358 {
359 GObject *session = NULL;
360 GObject *source = NULL;
361
362 g_signal_emit_by_name (rtpbin, "get-internal-session", session_id, &session);
363 g_signal_emit_by_name (session, "get-source-by-ssrc", ssrc, &source);
364
365 if (ssrc & 1) {
366 GST_DEBUG ("Disabling RTCP and probation on RTX stream "
367 "(SSRC %u on session %u)", ssrc, session_id);
368 g_object_set (source, "disable-rtcp", TRUE, "probation", 0, NULL);
369 } else {
370 g_signal_connect (session, "on-sending-nacks",
371 (GCallback) gst_rist_src_on_sending_nacks, NULL);
372 }
373
374 g_object_unref (source);
375 g_object_unref (session);
376 }
377
378 static void
gst_rist_src_new_jitterbuffer(GstRistSrc * src,GstElement * jitterbuffer,guint session,guint ssrc,GstElement * rtpbin)379 gst_rist_src_new_jitterbuffer (GstRistSrc * src, GstElement * jitterbuffer,
380 guint session, guint ssrc, GstElement * rtpbin)
381 {
382 if (session != 0) {
383 GST_WARNING_OBJECT (rtpbin, "Unexpected jitterbuffer created.");
384 return;
385 }
386
387 g_object_set (jitterbuffer, "rtx-delay", src->reorder_section,
388 "rtx-max-retries", src->max_rtx_retries, NULL);
389
390 if ((ssrc & 1) == 0) {
391 GST_INFO_OBJECT (src, "Saving jitterbuffer for session %u ssrc %u",
392 session, ssrc);
393
394 g_clear_object (&src->jitterbuffer);
395 src->jitterbuffer = gst_object_ref (jitterbuffer);
396 src->rtp_ssrc = ssrc;
397 }
398 }
399
400 static void
gst_rist_src_init(GstRistSrc * src)401 gst_rist_src_init (GstRistSrc * src)
402 {
403 GstPad *pad, *gpad;
404 GstStructure *sdes = NULL;
405 RistReceiverBond *bond;
406
407 g_mutex_init (&src->bonds_lock);
408 src->bonds = g_ptr_array_new ();
409
410 /* Construct the RIST RTP receiver pipeline.
411 *
412 * udpsrc -> [recv_rtp_sink_%u] -------- [recv_rtp_src_%u_%u_%u]
413 * | rtpbin |
414 * udpsrc -> [recv_rtcp_sink_%u] -------- [send_rtcp_src_%u] -> udpsink
415 *
416 * This pipeline is fixed for now, note that optionally an FEC stream could
417 * be added later.
418 */
419 src->srcpad = gst_ghost_pad_new_no_target_from_template ("src",
420 gst_static_pad_template_get (&src_templ));
421 gst_element_add_pad (GST_ELEMENT (src), src->srcpad);
422
423 src->rtpbin = gst_element_factory_make ("rtpbin", "rist_recv_rtpbin");
424 if (!src->rtpbin) {
425 src->missing_plugin = "rtpmanager";
426 goto missing_plugin;
427 }
428
429 /* RIST specification says the SDES should only contain the CNAME */
430 g_object_get (src->rtpbin, "sdes", &sdes, NULL);
431 gst_structure_remove_field (sdes, "tool");
432
433 gst_bin_add (GST_BIN (src), src->rtpbin);
434 g_object_set (src->rtpbin, "do-retransmission", TRUE,
435 "rtp-profile", 3 /* AVPF */ ,
436 "sdes", sdes, NULL);
437 gst_structure_free (sdes);
438
439 g_signal_connect_object (src->rtpbin, "request-pt-map",
440 G_CALLBACK (gst_rist_src_request_pt_map), src, G_CONNECT_SWAPPED);
441 g_signal_connect_object (src->rtpbin, "request-aux-receiver",
442 G_CALLBACK (gst_rist_src_request_aux_receiver), src, G_CONNECT_SWAPPED);
443
444 src->rtxbin = gst_bin_new ("rist_recv_rtxbin");
445 g_object_ref_sink (src->rtxbin);
446
447 src->rtx_funnel = gst_element_factory_make ("funnel", "rist_rtx_funnel");
448 gst_bin_add (GST_BIN (src->rtxbin), src->rtx_funnel);
449
450 src->rtpdeext = gst_element_factory_make ("ristrtpdeext", "rist_rtp_de_ext");
451 gst_bin_add (GST_BIN (src->rtxbin), src->rtpdeext);
452 gst_element_link (src->rtx_funnel, src->rtpdeext);
453
454 pad = gst_element_get_static_pad (src->rtpdeext, "src");
455 gpad = gst_ghost_pad_new ("src_0", pad);
456 gst_object_unref (pad);
457 gst_element_add_pad (src->rtxbin, gpad);
458
459 g_signal_connect_object (src->rtpbin, "pad-added",
460 G_CALLBACK (gst_rist_src_pad_added), src, G_CONNECT_SWAPPED);
461 g_signal_connect_object (src->rtpbin, "on-new-ssrc",
462 G_CALLBACK (gst_rist_src_on_new_ssrc), src, G_CONNECT_SWAPPED);
463 g_signal_connect_object (src->rtpbin, "new-jitterbuffer",
464 G_CALLBACK (gst_rist_src_new_jitterbuffer), src, G_CONNECT_SWAPPED);
465
466 bond = gst_rist_src_add_bond (src);
467 if (!bond)
468 goto missing_plugin;
469
470 return;
471
472 missing_plugin:
473 {
474 GST_ERROR_OBJECT (src, "'%s' plugin is missing.", src->missing_plugin);
475 src->construct_failed = TRUE;
476 }
477 }
478
479 static void
gst_rist_src_handle_message(GstBin * bin,GstMessage * message)480 gst_rist_src_handle_message (GstBin * bin, GstMessage * message)
481 {
482 switch (GST_MESSAGE_TYPE (message)) {
483 case GST_MESSAGE_STREAM_START:
484 case GST_MESSAGE_EOS:
485 /* drop stream-start & eos from our internal udp sink(s);
486 https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/-/issues/1368 */
487 gst_message_unref (message);
488 break;
489 default:
490 GST_BIN_CLASS (gst_rist_src_parent_class)->handle_message (bin, message);
491 break;
492 }
493 }
494
495 static GstPadProbeReturn
gst_rist_src_on_recv_rtcp(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)496 gst_rist_src_on_recv_rtcp (GstPad * pad, GstPadProbeInfo * info,
497 gpointer user_data)
498 {
499 GstRistSrc *src = GST_RIST_SRC (user_data);
500 GstBuffer *buffer;
501 GstNetAddressMeta *meta;
502 GstElement *rtcp_src;
503 RistReceiverBond *bond = NULL;
504 gint i;
505
506 rtcp_src = GST_ELEMENT (gst_pad_get_parent (pad));
507
508 g_mutex_lock (&src->bonds_lock);
509
510 for (i = 0; i < src->bonds->len; i++) {
511 RistReceiverBond *b = g_ptr_array_index (src->bonds, i);
512 if (b->rtcp_src == rtcp_src) {
513 bond = b;
514 break;
515 }
516 }
517 gst_object_unref (rtcp_src);
518
519 if (!bond) {
520 GST_WARNING_OBJECT (src, "Unexpected RTCP source.");
521 g_mutex_unlock (&src->bonds_lock);
522 return GST_PAD_PROBE_OK;
523 }
524
525 if (info->type == GST_PAD_PROBE_TYPE_BUFFER_LIST) {
526 GstBufferList *buffer_list = info->data;
527 buffer = gst_buffer_list_get (buffer_list, 0);
528 } else {
529 buffer = info->data;
530 }
531
532 meta = gst_buffer_get_net_address_meta (buffer);
533
534 g_clear_object (&bond->rtcp_send_addr);
535 bond->rtcp_send_addr = g_object_ref (meta->addr);
536
537 g_mutex_unlock (&src->bonds_lock);
538
539 return GST_PAD_PROBE_OK;
540 }
541
542 /* called with bonds lock */
543 static inline void
gst_rist_src_attach_net_address_meta(RistReceiverBond * bond,GstBuffer * buffer)544 gst_rist_src_attach_net_address_meta (RistReceiverBond * bond,
545 GstBuffer * buffer)
546 {
547 if (bond->rtcp_send_addr)
548 gst_buffer_add_net_address_meta (buffer, bond->rtcp_send_addr);
549 }
550
551 static GstPadProbeReturn
gst_rist_src_on_send_rtcp(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)552 gst_rist_src_on_send_rtcp (GstPad * pad, GstPadProbeInfo * info,
553 gpointer user_data)
554 {
555 GstRistSrc *src = GST_RIST_SRC (user_data);
556 GstElement *rtcp_sink;
557 RistReceiverBond *bond = NULL;
558 gint i;
559
560 rtcp_sink = GST_ELEMENT (gst_pad_get_parent (pad));
561
562 g_mutex_lock (&src->bonds_lock);
563
564 for (i = 0; i < src->bonds->len; i++) {
565 RistReceiverBond *b = g_ptr_array_index (src->bonds, i);
566 if (b->rtcp_sink == rtcp_sink) {
567 bond = b;
568 break;
569 }
570 }
571 gst_object_unref (rtcp_sink);
572
573 if (!bond) {
574 GST_WARNING_OBJECT (src, "Unexpected RTCP sink.");
575 g_mutex_unlock (&src->bonds_lock);
576 return GST_PAD_PROBE_OK;
577 }
578
579 if (info->type == GST_PAD_PROBE_TYPE_BUFFER_LIST) {
580 GstBufferList *buffer_list = info->data;
581 GstBuffer *buffer;
582 gint i;
583
584 info->data = buffer_list = gst_buffer_list_make_writable (buffer_list);
585 for (i = 0; i < gst_buffer_list_length (buffer_list); i++) {
586 buffer = gst_buffer_list_get (buffer_list, i);
587 gst_rist_src_attach_net_address_meta (bond, buffer);
588 }
589 } else {
590 GstBuffer *buffer = info->data;
591 info->data = buffer = gst_buffer_make_writable (buffer);
592 gst_rist_src_attach_net_address_meta (bond, buffer);
593 }
594
595 g_mutex_unlock (&src->bonds_lock);
596
597 return GST_PAD_PROBE_OK;
598 }
599
600 static gboolean
gst_rist_src_setup_rtcp_socket(GstRistSrc * src,RistReceiverBond * bond)601 gst_rist_src_setup_rtcp_socket (GstRistSrc * src, RistReceiverBond * bond)
602 {
603 GstPad *pad;
604 GSocket *socket = NULL;
605 GInetAddress *iaddr = NULL;
606 guint port = bond->port + 1;
607 GError *error = NULL;
608
609 g_object_get (bond->rtcp_src, "used-socket", &socket, NULL);
610 if (!socket)
611 return GST_STATE_CHANGE_FAILURE;
612
613 iaddr = g_inet_address_new_from_string (bond->address);
614 if (!iaddr) {
615 GList *results;
616 GResolver *resolver = NULL;
617
618 resolver = g_resolver_get_default ();
619 results = g_resolver_lookup_by_name (resolver, bond->address, NULL, &error);
620
621 if (!results) {
622 g_object_unref (resolver);
623 goto dns_resolve_failed;
624 }
625
626 iaddr = G_INET_ADDRESS (g_object_ref (results->data));
627
628 g_resolver_free_addresses (results);
629 g_object_unref (resolver);
630 }
631
632 if (g_inet_address_get_is_multicast (iaddr)) {
633 /* mc-ttl is not supported by dynudpsink */
634 g_socket_set_multicast_ttl (socket, src->multicast_ttl);
635 /* In multicast, send RTCP to the multicast group */
636 bond->rtcp_send_addr = g_inet_socket_address_new (iaddr, port);
637 } else {
638 /* In unicast, send RTCP to the detected sender address */
639 pad = gst_element_get_static_pad (bond->rtcp_src, "src");
640 bond->rtcp_recv_probe = gst_pad_add_probe (pad,
641 GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST,
642 gst_rist_src_on_recv_rtcp, src, NULL);
643 gst_object_unref (pad);
644 }
645 g_object_unref (iaddr);
646
647 pad = gst_element_get_static_pad (bond->rtcp_sink, "sink");
648 bond->rtcp_send_probe = gst_pad_add_probe (pad,
649 GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST,
650 gst_rist_src_on_send_rtcp, src, NULL);
651 gst_object_unref (pad);
652
653 if (bond->multicast_iface) {
654 #ifdef SO_BINDTODEVICE
655 if (setsockopt (g_socket_get_fd (socket), SOL_SOCKET,
656 SO_BINDTODEVICE, bond->multicast_iface,
657 strlen (bond->multicast_iface)) < 0)
658 GST_WARNING_OBJECT (src, "setsockopt SO_BINDTODEVICE failed: %s",
659 strerror (errno));
660 #else
661 GST_WARNING_OBJECT (src, "Tried to set a multicast interface while"
662 " GStreamer was compiled on a platform without SO_BINDTODEVICE");
663 #endif
664 }
665
666
667 /* share the socket created by the source */
668 g_object_set (bond->rtcp_sink, "socket", socket, "close-socket", FALSE, NULL);
669 g_object_unref (socket);
670
671 gst_element_set_locked_state (bond->rtcp_sink, FALSE);
672 gst_element_sync_state_with_parent (bond->rtcp_sink);
673
674 return GST_STATE_CHANGE_SUCCESS;
675
676 dns_resolve_failed:
677 GST_ELEMENT_ERROR (src, RESOURCE, NOT_FOUND,
678 ("Could not resolve hostname '%s'", GST_STR_NULL (bond->address)),
679 ("DNS resolver reported: %s", error->message));
680 g_error_free (error);
681 return GST_STATE_CHANGE_FAILURE;
682
683 }
684
685 static GstStateChangeReturn
gst_rist_src_start(GstRistSrc * src)686 gst_rist_src_start (GstRistSrc * src)
687 {
688 gint i;
689
690 if (src->construct_failed) {
691 GST_ELEMENT_ERROR (src, CORE, MISSING_PLUGIN,
692 ("Your GStreamer installation is missing plugin '%s'",
693 src->missing_plugin), (NULL));
694 return GST_STATE_CHANGE_FAILURE;
695 }
696
697 for (i = 0; i < src->bonds->len; i++) {
698 RistReceiverBond *bond = g_ptr_array_index (src->bonds, i);
699 GObject *session = NULL;
700
701 g_signal_emit_by_name (src->rtpbin, "get-session", i, &session);
702 g_object_set (session, "rtcp-min-interval", src->min_rtcp_interval,
703 "rtcp-fraction", src->max_rtcp_bandwidth, NULL);
704 g_object_unref (session);
705
706 if (!gst_rist_src_setup_rtcp_socket (src, bond))
707 return GST_STATE_CHANGE_FAILURE;
708 }
709
710 return GST_STATE_CHANGE_SUCCESS;
711 }
712
713 static GstStructure *
gst_rist_src_create_stats(GstRistSrc * src)714 gst_rist_src_create_stats (GstRistSrc * src)
715 {
716 GstStructure *ret;
717 GValueArray *session_stats;
718 guint64 total_dropped = 0, total_received = 0, recovered = 0, lost = 0;
719 guint64 duplicates = 0, rtx_sent = 0, rtt = 0;
720 gint i;
721
722 ret = gst_structure_new_empty ("rist/x-receiver-stats");
723 session_stats = g_value_array_new (src->bonds->len);
724
725 for (i = 0; i < src->bonds->len; i++) {
726 GObject *session = NULL, *source = NULL;
727 GstStructure *sstats = NULL, *stats;
728 const gchar *rtp_from = NULL, *rtcp_from = NULL;
729 guint64 dropped = 0, received = 0;
730 GValue value = G_VALUE_INIT;
731
732 g_signal_emit_by_name (src->rtpbin, "get-internal-session", i, &session);
733 if (!session)
734 continue;
735
736 stats = gst_structure_new_empty ("rist/x-receiver-session-stats");
737
738 g_signal_emit_by_name (session, "get-source-by-ssrc", src->rtp_ssrc,
739 &source);
740 if (source) {
741 gint packet_lost;
742 g_object_get (source, "stats", &sstats, NULL);
743 gst_structure_get_int (sstats, "packets-lost", &packet_lost);
744 dropped = MAX (packet_lost, 0);
745 gst_structure_get_uint64 (sstats, "packets-received", &received);
746 rtp_from = gst_structure_get_string (sstats, "rtp-from");
747 rtcp_from = gst_structure_get_string (sstats, "rtcp-from");
748 }
749 g_object_unref (session);
750
751 gst_structure_set (stats, "session-id", G_TYPE_INT, i,
752 "rtp-from", G_TYPE_STRING, rtp_from ? rtp_from : "",
753 "rtcp-from", G_TYPE_STRING, rtcp_from ? rtcp_from : "",
754 "dropped", G_TYPE_UINT64, MAX (dropped, 0),
755 "received", G_TYPE_UINT64, received, NULL);
756
757 if (sstats)
758 gst_structure_free (sstats);
759 g_clear_object (&source);
760
761 g_value_init (&value, GST_TYPE_STRUCTURE);
762 g_value_take_boxed (&value, stats);
763 g_value_array_append (session_stats, &value);
764 g_value_unset (&value);
765
766 total_dropped += dropped;
767 }
768
769 if (src->jitterbuffer) {
770 GstStructure *stats;
771 g_object_get (src->jitterbuffer, "stats", &stats, NULL);
772 gst_structure_get (stats,
773 "num-pushed", G_TYPE_UINT64, &total_received,
774 "num-lost", G_TYPE_UINT64, &lost,
775 "rtx-count", G_TYPE_UINT64, &rtx_sent,
776 "num-duplicates", G_TYPE_UINT64, &duplicates,
777 "rtx-success-count", G_TYPE_UINT64, &recovered,
778 "rtx-rtt", G_TYPE_UINT64, &rtt, NULL);
779 gst_structure_free (stats);
780 }
781
782 gst_structure_set (ret, "dropped", G_TYPE_UINT64, total_dropped,
783 "received", G_TYPE_UINT64, total_received,
784 "recovered", G_TYPE_UINT64, recovered,
785 "permanently-lost", G_TYPE_UINT64, lost,
786 "duplicates", G_TYPE_UINT64, duplicates,
787 "retransmission-requests-sent", G_TYPE_UINT64, rtx_sent,
788 "rtx-roundtrip-time", G_TYPE_UINT64, rtt,
789 "session-stats", G_TYPE_VALUE_ARRAY, session_stats, NULL);
790 g_value_array_free (session_stats);
791
792 return ret;
793 }
794
795 static gboolean
gst_rist_src_dump_stats(GstClock * clock,GstClockTime time,GstClockID id,gpointer user_data)796 gst_rist_src_dump_stats (GstClock * clock, GstClockTime time, GstClockID id,
797 gpointer user_data)
798 {
799 GstRistSrc *src = GST_RIST_SRC (user_data);
800 GstStructure *stats = gst_rist_src_create_stats (src);
801
802 gst_println ("%s: %" GST_PTR_FORMAT, GST_OBJECT_NAME (src), stats);
803
804 gst_structure_free (stats);
805 return TRUE;
806 }
807
808 static void
gst_rist_src_enable_stats_interval(GstRistSrc * src)809 gst_rist_src_enable_stats_interval (GstRistSrc * src)
810 {
811 GstClock *clock;
812 GstClockTime start, interval;
813
814 if (src->stats_interval == 0)
815 return;
816
817 interval = src->stats_interval * GST_MSECOND;
818 clock = gst_system_clock_obtain ();
819 start = gst_clock_get_time (clock) + interval;
820
821 src->stats_cid = gst_clock_new_periodic_id (clock, start, interval);
822 gst_clock_id_wait_async (src->stats_cid, gst_rist_src_dump_stats,
823 gst_object_ref (src), (GDestroyNotify) gst_object_unref);
824
825 gst_object_unref (clock);
826 }
827
828 static void
gst_rist_src_disable_stats_interval(GstRistSrc * src)829 gst_rist_src_disable_stats_interval (GstRistSrc * src)
830 {
831 if (src->stats_cid) {
832 gst_clock_id_unschedule (src->stats_cid);
833 gst_clock_id_unref (src->stats_cid);
834 src->stats_cid = NULL;
835 }
836 }
837
838 static void
gst_rist_src_stop(GstRistSrc * src)839 gst_rist_src_stop (GstRistSrc * src)
840 {
841 GstPad *pad;
842 gint i;
843
844 for (i = 0; i < src->bonds->len; i++) {
845 RistReceiverBond *bond = g_ptr_array_index (src->bonds, i);
846
847 if (bond->rtcp_recv_probe) {
848 pad = gst_element_get_static_pad (bond->rtcp_src, "src");
849 gst_pad_remove_probe (pad, bond->rtcp_recv_probe);
850 bond->rtcp_recv_probe = 0;
851 gst_object_unref (pad);
852 }
853
854 if (bond->rtcp_send_probe) {
855 pad = gst_element_get_static_pad (bond->rtcp_sink, "sink");
856 gst_pad_remove_probe (pad, bond->rtcp_send_probe);
857 bond->rtcp_send_probe = 0;
858 gst_object_unref (pad);
859 }
860 }
861 }
862
863 static GstStateChangeReturn
gst_rist_src_change_state(GstElement * element,GstStateChange transition)864 gst_rist_src_change_state (GstElement * element, GstStateChange transition)
865 {
866 GstRistSrc *src = GST_RIST_SRC (element);
867 GstStateChangeReturn ret;
868
869 switch (transition) {
870 case GST_STATE_CHANGE_PAUSED_TO_READY:
871 gst_rist_src_disable_stats_interval (src);
872 break;
873 default:
874 break;
875 }
876
877 ret = GST_ELEMENT_CLASS (gst_rist_src_parent_class)->change_state (element,
878 transition);
879
880 switch (transition) {
881 case GST_STATE_CHANGE_NULL_TO_READY:
882 gst_rist_src_start (src);
883 break;
884 case GST_STATE_CHANGE_READY_TO_PAUSED:
885 gst_rist_src_enable_stats_interval (src);
886 break;
887 case GST_STATE_CHANGE_READY_TO_NULL:
888 gst_rist_src_stop (src);
889 break;
890 default:
891 break;
892 }
893
894 return ret;
895 }
896
897 /* called with bonds lock */
898 static void
gst_rist_src_update_bond_address(GstRistSrc * src,RistReceiverBond * bond,const gchar * address,guint port,const gchar * multicast_iface)899 gst_rist_src_update_bond_address (GstRistSrc * src, RistReceiverBond * bond,
900 const gchar * address, guint port, const gchar * multicast_iface)
901 {
902 g_free (bond->address);
903 g_free (bond->multicast_iface);
904 bond->address = g_strdup (address);
905 bond->multicast_iface = multicast_iface ? g_strdup (multicast_iface) : NULL;
906 bond->port = port;
907
908 g_object_set (G_OBJECT (bond->rtp_src), "address", address, "port", port,
909 "multicast-iface", bond->multicast_iface, NULL);
910 g_object_set (G_OBJECT (bond->rtcp_src), "address", address,
911 "port", port + 1, "multicast-iface", bond->multicast_iface, NULL);
912
913 /* TODO add runtime support
914 * - add blocking the pad probe
915 * - update RTCP socket
916 * - cycle elements through NULL state
917 */
918 }
919
920 /* called with bonds lock */
921 static gchar *
gst_rist_src_get_bonds(GstRistSrc * src)922 gst_rist_src_get_bonds (GstRistSrc * src)
923 {
924 GString *bonds = g_string_new ("");
925 gint i;
926
927 for (i = 0; i < src->bonds->len; i++) {
928 RistReceiverBond *bond = g_ptr_array_index (src->bonds, i);
929 if (bonds->len > 0)
930 g_string_append_c (bonds, ':');
931
932 g_string_append_printf (bonds, "%s:%u", bond->address, bond->port);
933
934 if (bond->multicast_iface)
935 g_string_append_printf (bonds, "/%s", bond->multicast_iface);
936 }
937
938 return g_string_free (bonds, FALSE);
939 }
940
941 struct RistAddress
942 {
943 gchar *address;
944 char *multicast_iface;
945 guint port;
946 };
947
948 /* called with bonds lock */
949 static void
gst_rist_src_set_bonds(GstRistSrc * src,const gchar * bonds)950 gst_rist_src_set_bonds (GstRistSrc * src, const gchar * bonds)
951 {
952 GStrv tokens = NULL;
953 struct RistAddress *addrs;
954 gint i;
955
956 if (bonds == NULL)
957 goto missing_address;
958
959 tokens = g_strsplit (bonds, ",", 0);
960 if (tokens[0] == NULL)
961 goto missing_address;
962
963 addrs = g_new0 (struct RistAddress, g_strv_length (tokens));
964
965 /* parse the address list */
966 for (i = 0; tokens[i]; i++) {
967 gchar *address = tokens[i];
968 char *port_ptr, *iface_ptr, *endptr;
969 guint port;
970
971 port_ptr = g_utf8_strrchr (address, -1, ':');
972 iface_ptr = g_utf8_strrchr (address, -1, '/');
973
974 if (!port_ptr)
975 goto bad_parameter;
976 if (!g_ascii_isdigit (port_ptr[1]))
977 goto bad_parameter;
978
979 if (iface_ptr) {
980 if (iface_ptr < port_ptr)
981 goto bad_parameter;
982 iface_ptr[0] = '\0';
983 }
984
985 port = strtol (port_ptr + 1, &endptr, 0);
986 if (endptr[0] != '\0')
987 goto bad_parameter;
988
989 /* port must be a multiple of 2 between 2 and 65534 */
990 if (port < 2 || (port & 1) || port > G_MAXUINT16)
991 goto invalid_port;
992
993 port_ptr[0] = '\0';
994 addrs[i].port = port;
995 addrs[i].address = g_strstrip (address);
996 if (iface_ptr)
997 addrs[i].multicast_iface = g_strstrip (iface_ptr + 1);
998 }
999
1000 /* configure the bonds */
1001 for (i = 0; tokens[i]; i++) {
1002 RistReceiverBond *bond = NULL;
1003
1004 if (i < src->bonds->len)
1005 bond = g_ptr_array_index (src->bonds, i);
1006 else
1007 bond = gst_rist_src_add_bond (src);
1008
1009 gst_rist_src_update_bond_address (src, bond, addrs[i].address,
1010 addrs[i].port, addrs[i].multicast_iface);
1011 }
1012
1013 g_strfreev (tokens);
1014 return;
1015
1016 missing_address:
1017 g_warning ("'bonding-addresses' cannot be empty");
1018 g_strfreev (tokens);
1019 return;
1020
1021 bad_parameter:
1022 g_warning ("Failed to parse address '%s", tokens[i]);
1023 g_strfreev (tokens);
1024 g_free (addrs);
1025 return;
1026
1027 invalid_port:
1028 g_warning ("RIST port must valid UDP port and a multiple of 2.");
1029 g_strfreev (tokens);
1030 g_free (addrs);
1031 return;
1032 }
1033
1034 static void
gst_rist_src_set_multicast_loopback(GstRistSrc * src,gboolean loop)1035 gst_rist_src_set_multicast_loopback (GstRistSrc * src, gboolean loop)
1036 {
1037 gint i;
1038
1039 src->multicast_loopback = loop;
1040 for (i = 0; i < src->bonds->len; i++) {
1041 RistReceiverBond *bond = g_ptr_array_index (src->bonds, i);
1042 g_object_set (G_OBJECT (bond->rtp_src), "loop", loop, NULL);
1043 g_object_set (G_OBJECT (bond->rtcp_src), "loop", loop, NULL);
1044 }
1045 }
1046
1047 static void
gst_rist_src_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)1048 gst_rist_src_get_property (GObject * object, guint prop_id,
1049 GValue * value, GParamSpec * pspec)
1050 {
1051 GstRistSrc *src = GST_RIST_SRC (object);
1052 GstStructure *sdes;
1053 RistReceiverBond *bond;
1054
1055 if (src->construct_failed)
1056 return;
1057
1058 g_mutex_lock (&src->bonds_lock);
1059
1060 bond = g_ptr_array_index (src->bonds, 0);
1061
1062 switch (prop_id) {
1063 case PROP_ADDRESS:
1064 g_value_set_string (value, bond->address);
1065 break;
1066
1067 case PROP_PORT:
1068 g_value_set_uint (value, bond->port);
1069 break;
1070
1071 case PROP_RECEIVER_BUFFER:
1072 g_object_get_property (G_OBJECT (src->rtpbin), "latency", value);
1073 break;
1074
1075 case PROP_REORDER_SECTION:
1076 g_value_set_uint (value, src->reorder_section);
1077 break;
1078
1079 case PROP_MAX_RTX_RETRIES:
1080 g_value_set_uint (value, src->max_rtx_retries);
1081 break;
1082
1083 case PROP_MIN_RTCP_INTERVAL:
1084 g_value_set_uint (value, (guint) (src->min_rtcp_interval / GST_MSECOND));
1085 break;
1086
1087 case PROP_MAX_RTCP_BANDWIDTH:
1088 g_value_set_double (value, src->max_rtcp_bandwidth);
1089 break;
1090
1091 case PROP_STATS_UPDATE_INTERVAL:
1092 g_value_set_uint (value, src->stats_interval);
1093 break;
1094
1095 case PROP_STATS:
1096 g_value_take_boxed (value, gst_rist_src_create_stats (src));
1097 break;
1098
1099 case PROP_CNAME:
1100 g_object_get (src->rtpbin, "sdes", &sdes, NULL);
1101 g_value_set_string (value, gst_structure_get_string (sdes, "cname"));
1102 gst_structure_free (sdes);
1103 break;
1104
1105 case PROP_MULTICAST_LOOPBACK:
1106 g_value_set_boolean (value, src->multicast_loopback);
1107 break;
1108
1109 case PROP_MULTICAST_IFACE:
1110 g_value_set_string (value, bond->multicast_iface);
1111 break;
1112
1113 case PROP_MULTICAST_TTL:
1114 g_value_set_int (value, src->multicast_ttl);
1115 break;
1116
1117 case PROP_BONDING_ADDRESSES:
1118 g_value_take_string (value, gst_rist_src_get_bonds (src));
1119 break;
1120
1121 default:
1122 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1123 break;
1124 }
1125
1126 g_mutex_unlock (&src->bonds_lock);
1127 }
1128
1129 static void
gst_rist_src_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)1130 gst_rist_src_set_property (GObject * object, guint prop_id,
1131 const GValue * value, GParamSpec * pspec)
1132 {
1133 GstRistSrc *src = GST_RIST_SRC (object);
1134 GstStructure *sdes;
1135 RistReceiverBond *bond;
1136
1137 if (src->construct_failed)
1138 return;
1139
1140 g_mutex_lock (&src->bonds_lock);
1141
1142 bond = g_ptr_array_index (src->bonds, 0);
1143
1144 switch (prop_id) {
1145 case PROP_ADDRESS:
1146 g_free (bond->address);
1147 bond->address = g_value_dup_string (value);
1148 g_object_set_property (G_OBJECT (bond->rtp_src), "address", value);
1149 g_object_set_property (G_OBJECT (bond->rtcp_src), "address", value);
1150 break;
1151
1152 case PROP_PORT:{
1153 guint port = g_value_get_uint (value);
1154
1155 /* According to 5.1.1, RTP receiver port most be even number and RTCP
1156 * port should be the RTP port + 1 */
1157
1158 if (port & 0x1) {
1159 g_warning ("Invalid RIST port %u, should be an even number.", port);
1160 return;
1161 }
1162
1163 bond->port = port;
1164 g_object_set (bond->rtp_src, "port", port, NULL);
1165 g_object_set (bond->rtcp_src, "port", port + 1, NULL);
1166 break;
1167 }
1168
1169 case PROP_RECEIVER_BUFFER:
1170 g_object_set (src->rtpbin, "latency", g_value_get_uint (value), NULL);
1171 break;
1172
1173 case PROP_REORDER_SECTION:
1174 src->reorder_section = g_value_get_uint (value);
1175 break;
1176
1177 case PROP_MAX_RTX_RETRIES:
1178 src->max_rtx_retries = g_value_get_uint (value);
1179 break;
1180
1181 case PROP_MIN_RTCP_INTERVAL:
1182 src->min_rtcp_interval = g_value_get_uint (value) * GST_MSECOND;
1183 break;
1184
1185 case PROP_MAX_RTCP_BANDWIDTH:
1186 src->max_rtcp_bandwidth = g_value_get_double (value);
1187 break;
1188
1189 case PROP_STATS_UPDATE_INTERVAL:
1190 src->stats_interval = g_value_get_uint (value);
1191 break;
1192
1193 case PROP_CNAME:
1194 g_object_get (src->rtpbin, "sdes", &sdes, NULL);
1195 gst_structure_set_value (sdes, "cname", value);
1196 g_object_set (src->rtpbin, "sdes", sdes, NULL);
1197 gst_structure_free (sdes);
1198 break;
1199
1200 case PROP_MULTICAST_LOOPBACK:
1201 gst_rist_src_set_multicast_loopback (src, g_value_get_boolean (value));
1202 break;
1203
1204 case PROP_MULTICAST_IFACE:
1205 g_free (bond->multicast_iface);
1206 bond->multicast_iface = g_value_dup_string (value);
1207 g_object_set_property (G_OBJECT (bond->rtp_src),
1208 "multicast-iface", value);
1209 g_object_set_property (G_OBJECT (bond->rtcp_src),
1210 "multicast-iface", value);
1211 break;
1212
1213 case PROP_MULTICAST_TTL:
1214 src->multicast_ttl = g_value_get_int (value);
1215 break;
1216
1217 case PROP_BONDING_ADDRESSES:
1218 gst_rist_src_set_bonds (src, g_value_get_string (value));
1219 break;
1220
1221 default:
1222 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1223 break;
1224 }
1225
1226 g_mutex_unlock (&src->bonds_lock);
1227 }
1228
1229 static void
gst_rist_src_finalize(GObject * object)1230 gst_rist_src_finalize (GObject * object)
1231 {
1232 GstRistSrc *src = GST_RIST_SRC (object);
1233 gint i;
1234
1235 g_mutex_lock (&src->bonds_lock);
1236
1237 for (i = 0; i < src->bonds->len; i++) {
1238 RistReceiverBond *bond = g_ptr_array_index (src->bonds, i);
1239 g_free (bond->address);
1240 g_free (bond->multicast_iface);
1241 g_clear_object (&bond->rtcp_send_addr);
1242 g_slice_free (RistReceiverBond, bond);
1243 }
1244 g_ptr_array_free (src->bonds, TRUE);
1245
1246 g_clear_object (&src->jitterbuffer);
1247 g_clear_object (&src->rtxbin);
1248
1249 g_mutex_unlock (&src->bonds_lock);
1250 g_mutex_clear (&src->bonds_lock);
1251
1252 G_OBJECT_CLASS (gst_rist_src_parent_class)->finalize (object);
1253 }
1254
1255 static void
gst_rist_src_class_init(GstRistSrcClass * klass)1256 gst_rist_src_class_init (GstRistSrcClass * klass)
1257 {
1258 GstBinClass *bin_class = (GstBinClass *) klass;
1259 GstElementClass *element_class = (GstElementClass *) klass;
1260 GObjectClass *object_class = (GObjectClass *) klass;
1261
1262 gst_element_class_set_metadata (element_class,
1263 "RIST Source", "Source/Network",
1264 "Source that implements RIST TR-06-1 streaming specification",
1265 "Nicolas Dufresne <nicolas.dufresne@collabora.com");
1266 gst_element_class_add_static_pad_template (element_class, &src_templ);
1267
1268 bin_class->handle_message = gst_rist_src_handle_message;
1269
1270 element_class->change_state = gst_rist_src_change_state;
1271
1272 object_class->get_property = gst_rist_src_get_property;
1273 object_class->set_property = gst_rist_src_set_property;
1274 object_class->finalize = gst_rist_src_finalize;
1275
1276 g_object_class_install_property (object_class, PROP_ADDRESS,
1277 g_param_spec_string ("address", "Address",
1278 "Address to receive packets from (can be IPv4 or IPv6).", "0.0.0.0",
1279 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1280
1281 g_object_class_install_property (object_class, PROP_PORT,
1282 g_param_spec_uint ("port", "Port", "The port to listen for RTP packets, "
1283 "the RTCP port is this value + 1. This port must be an even number.",
1284 2, 65534, 5004,
1285 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
1286
1287 g_object_class_install_property (object_class, PROP_RECEIVER_BUFFER,
1288 g_param_spec_uint ("receiver-buffer", "Receiver Buffer",
1289 "Buffering duration (in ms)", 0, G_MAXUINT, 1000,
1290 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
1291
1292 g_object_class_install_property (object_class, PROP_REORDER_SECTION,
1293 g_param_spec_uint ("reorder-section", "Recorder Section",
1294 "Time to wait before sending retransmission request (in ms)",
1295 0, G_MAXUINT, 70,
1296 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
1297
1298 g_object_class_install_property (object_class, PROP_MAX_RTX_RETRIES,
1299 g_param_spec_uint ("max-rtx-retries", "Maximum Retransmission Retries",
1300 "The maximum number of retransmission requests for a lost packet.",
1301 0, G_MAXUINT, 7,
1302 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
1303
1304 g_object_class_install_property (object_class, PROP_MIN_RTCP_INTERVAL,
1305 g_param_spec_uint ("min-rtcp-interval", "Minimum RTCP Intercal",
1306 "The minimum interval (in ms) between two successive RTCP packets",
1307 0, 100, 100,
1308 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
1309
1310 g_object_class_install_property (object_class, PROP_MAX_RTCP_BANDWIDTH,
1311 g_param_spec_double ("max-rtcp-bandwidth", "Maximum RTCP Bandwidth",
1312 "The maximum bandwidth used for RTCP as a fraction of RTP bandwdith",
1313 0.0, 0.05, 0.05,
1314 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
1315
1316 g_object_class_install_property (object_class, PROP_STATS_UPDATE_INTERVAL,
1317 g_param_spec_uint ("stats-update-interval", "Statistics Update Interval",
1318 "The interval between 'stats' update notification (in ms) (0 disabled)",
1319 0, G_MAXUINT, 0,
1320 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
1321
1322 g_object_class_install_property (object_class, PROP_STATS,
1323 g_param_spec_boxed ("stats", "Statistics",
1324 "Statistic in a GstStructure named 'rist/x-receiver-stats'",
1325 GST_TYPE_STRUCTURE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
1326
1327 g_object_class_install_property (object_class, PROP_CNAME,
1328 g_param_spec_string ("cname", "CName",
1329 "Set the CNAME in the SDES block of the receiver report.", NULL,
1330 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
1331 GST_PARAM_DOC_SHOW_DEFAULT));
1332
1333 g_object_class_install_property (object_class, PROP_MULTICAST_LOOPBACK,
1334 g_param_spec_boolean ("multicast-loopback", "Multicast Loopback",
1335 "When enabled, the packets will be received locally.", FALSE,
1336 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
1337
1338 g_object_class_install_property (object_class, PROP_MULTICAST_IFACE,
1339 g_param_spec_string ("multicast-iface", "multicast-iface",
1340 "The multicast interface to use to send packets.", NULL,
1341 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1342
1343 g_object_class_install_property (object_class, PROP_MULTICAST_TTL,
1344 g_param_spec_int ("multicast-ttl", "Multicast TTL",
1345 "The multicast time-to-live parameter.", 0, 255, 1,
1346 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
1347
1348 g_object_class_install_property (object_class, PROP_BONDING_ADDRESSES,
1349 g_param_spec_string ("bonding-addresses", "Bonding Addresses",
1350 "Comma (,) separated list of <address>:<port> to receive from. "
1351 "Only used if 'enable-bonding' is set.", NULL,
1352 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1353 }
1354
1355 static GstURIType
gst_rist_src_uri_get_type(GType type)1356 gst_rist_src_uri_get_type (GType type)
1357 {
1358 return GST_URI_SRC;
1359 }
1360
1361 static const gchar *const *
gst_rist_src_uri_get_protocols(GType type)1362 gst_rist_src_uri_get_protocols (GType type)
1363 {
1364 static const char *protocols[] = { "rist", NULL };
1365 return protocols;
1366 }
1367
1368 static gchar *
gst_rist_src_uri_get_uri(GstURIHandler * handler)1369 gst_rist_src_uri_get_uri (GstURIHandler * handler)
1370 {
1371 GstRistSrc *src = GST_RIST_SRC (handler);
1372 gchar *uri = NULL;
1373
1374 GST_OBJECT_LOCK (src);
1375 if (src->uri)
1376 uri = gst_uri_to_string (src->uri);
1377 GST_OBJECT_UNLOCK (src);
1378
1379 return uri;
1380 }
1381
1382 static void
gst_rist_src_uri_query_foreach(const gchar * key,const gchar * value,GObject * src)1383 gst_rist_src_uri_query_foreach (const gchar * key, const gchar * value,
1384 GObject * src)
1385 {
1386 if (g_str_equal (key, "async-handling")) {
1387 GST_WARNING_OBJECT (src, "Setting '%s' property from URI is not allowed.",
1388 key);
1389 return;
1390 }
1391
1392 GST_DEBUG_OBJECT (src, "Setting property '%s' to '%s'", key, value);
1393 gst_util_set_object_arg (src, key, value);
1394 }
1395
1396 static gboolean
gst_rist_src_uri_set_uri(GstURIHandler * handler,const gchar * uri,GError ** error)1397 gst_rist_src_uri_set_uri (GstURIHandler * handler, const gchar * uri,
1398 GError ** error)
1399 {
1400 GstRistSrc *src = GST_RIST_SRC (handler);
1401 GstUri *gsturi;
1402 GHashTable *query_table;
1403
1404 if (GST_STATE (src) >= GST_STATE_PAUSED) {
1405 g_set_error (error, GST_URI_ERROR, GST_URI_ERROR_BAD_STATE,
1406 "Changing the URI on ristsrc when it is running is not supported");
1407 GST_ERROR_OBJECT (src, "%s", (*error)->message);
1408 return FALSE;
1409 }
1410
1411 if (!(gsturi = gst_uri_from_string (uri))) {
1412 g_set_error (error, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
1413 "Could not parse URI");
1414 GST_ERROR_OBJECT (src, "%s", (*error)->message);
1415 gst_uri_unref (gsturi);
1416 return FALSE;
1417 }
1418
1419 GST_OBJECT_LOCK (src);
1420 if (src->uri)
1421 gst_uri_unref (src->uri);
1422 src->uri = gst_uri_ref (gsturi);
1423 GST_OBJECT_UNLOCK (src);
1424
1425 g_object_set (src, "address", gst_uri_get_host (gsturi), NULL);
1426 if (gst_uri_get_port (gsturi))
1427 g_object_set (src, "port", gst_uri_get_port (gsturi), NULL);
1428
1429 query_table = gst_uri_get_query_table (gsturi);
1430 if (query_table)
1431 g_hash_table_foreach (query_table,
1432 (GHFunc) gst_rist_src_uri_query_foreach, src);
1433
1434 gst_uri_unref (gsturi);
1435 return TRUE;
1436 }
1437
1438 static void
gst_rist_src_uri_init(gpointer g_iface,gpointer iface_data)1439 gst_rist_src_uri_init (gpointer g_iface, gpointer iface_data)
1440 {
1441 GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
1442
1443 iface->get_type = gst_rist_src_uri_get_type;
1444 iface->get_protocols = gst_rist_src_uri_get_protocols;
1445 iface->get_uri = gst_rist_src_uri_get_uri;
1446 iface->set_uri = gst_rist_src_uri_set_uri;
1447 }
1448