• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer RIST plugin
2  * Copyright (C) 2019 Net Insight AB
3  *     Author: Nicolas Dufresne <nicolas.dufresne@collabora.com>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20 
21 /**
22  * SECTION:element-ristsrc
23  * @title: ristsrc
24  * @see_also: ristsink
25  *
26  * This element implements RIST TR-06-1 Simple Profile receiver. The stream
27  * produced by this element will be RTP payloaded. This element also implements
28  * the URI scheme `rist://` allowing to render RIST streams in GStreamer based
29  * media players. The RIST URI handler also allows setting properties through
30  * the URI query.
31  *
32  * It also implements part of the RIST TR-06-2 Main Profile receiver. The
33  * tunneling, multiplexing and encryption parts of the specification are not
34  * included. This element will accept the RIST RTP header extension and restore
35  * the null MPEG-TS packets if the extension is included. It will not currently
36  * use the sequence number extension when sending RTCP NACK requests.
37  *
38  * ## Example gst-launch line
39  * |[
40  * gst-launch-1.0 ristsrc address=0.0.0.0 port=5004 ! rtpmp2tdepay ! udpsink
41  * gst-play-1.0 "rist://0.0.0.0:5004?receiver-buffer=700"
42  * ]|
43  *
44  * Additionally, this element supports link bonding, which means it
45  * can receive the same stream from multiple addresses. Each address
46  * will be mapped to its own RTP session. In order to enable bonding
47  * support, one need to configure the list of addresses through
48  * "bonding-addresses" properties.
49  *
50  * ## Example gst-launch line for bonding
51  * |[
52  * gst-launch-1.0 ristsrc bonding-addresses="10.0.0.1:5004,11.0.0.1:5006" ! rtpmp2tdepay ! udpsink
53  * gst-play-1.0 "rist://0.0.0.0:5004?bonding-addresses=10.0.0.1:5004,11.0.0.1:5006"
54  * ]|
55  */
56 
57 /* using GValueArray, which has not replacement */
58 #define GLIB_DISABLE_DEPRECATION_WARNINGS
59 
60 #ifdef HAVE_CONFIG_H
61 #include "config.h"
62 #endif
63 
64 #include <gio/gio.h>
65 #include <gst/net/net.h>
66 #include <gst/rtp/rtp.h>
67 
68 /* for strtol() */
69 #include <stdlib.h>
70 
71 #ifdef HAVE_SYS_SOCKET_H
72 #include <sys/socket.h>
73 #endif
74 
75 #include "gstrist.h"
76 
77 GST_DEBUG_CATEGORY_STATIC (gst_rist_src_debug);
78 #define GST_CAT_DEFAULT gst_rist_src_debug
79 
80 enum
81 {
82   PROP_ADDRESS = 1,
83   PROP_PORT,
84   PROP_RECEIVER_BUFFER,
85   PROP_REORDER_SECTION,
86   PROP_MAX_RTX_RETRIES,
87   PROP_MIN_RTCP_INTERVAL,
88   PROP_MAX_RTCP_BANDWIDTH,
89   PROP_STATS_UPDATE_INTERVAL,
90   PROP_STATS,
91   PROP_CNAME,
92   PROP_MULTICAST_LOOPBACK,
93   PROP_MULTICAST_IFACE,
94   PROP_MULTICAST_TTL,
95   PROP_BONDING_ADDRESSES
96 };
97 
98 static GstStaticPadTemplate src_templ = GST_STATIC_PAD_TEMPLATE ("src",
99     GST_PAD_SRC,
100     GST_PAD_ALWAYS,
101     GST_STATIC_CAPS ("application/x-rtp"));
102 
103 
104 typedef struct
105 {
106   guint session;
107   gchar *address;
108   gchar *multicast_iface;
109   guint port;
110 
111   GstElement *rtcp_src;
112   GstElement *rtp_src;
113   GstElement *rtcp_sink;
114   GstElement *rtx_receive;
115   gulong rtcp_recv_probe;
116   gulong rtcp_send_probe;
117   GSocketAddress *rtcp_send_addr;
118 
119 } RistReceiverBond;
120 
121 struct _GstRistSrc
122 {
123   GstBin parent;
124 
125   GstUri *uri;
126 
127   /* Common elements in the pipeline */
128   GstElement *rtpbin;
129   GstPad *srcpad;
130   GstElement *rtxbin;
131   GstElement *rtx_funnel;
132   GstElement *rtpdeext;
133 
134   /* Common properties, protected by bonds_lock */
135   guint reorder_section;
136   guint max_rtx_retries;
137   GstClockTime min_rtcp_interval;
138   gdouble max_rtcp_bandwidth;
139   gint multicast_loopback;
140   gint multicast_ttl;
141 
142   /* Bonds */
143   GPtrArray *bonds;
144   /* this is needed as setting sibling properties will try to take the object
145    * lock. Thus, any properties that affects the bonds will be protected with
146    * that lock instead of the object lock. */
147   GMutex bonds_lock;
148 
149   /* For stats */
150   guint stats_interval;
151   guint32 rtp_ssrc;
152   GstClockID stats_cid;
153   GstElement *jitterbuffer;
154 
155   /* This is set whenever there is a pipeline construction failure, and used
156    * to fail state changes later */
157   gboolean construct_failed;
158   const gchar *missing_plugin;
159 };
160 
161 static void gst_rist_src_uri_init (gpointer g_iface, gpointer iface_data);
162 
163 G_DEFINE_TYPE_WITH_CODE (GstRistSrc, gst_rist_src, GST_TYPE_BIN,
164     G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER, gst_rist_src_uri_init);
165     GST_DEBUG_CATEGORY_INIT (gst_rist_src_debug, "ristsrc", 0, "RIST Source"));
166 GST_ELEMENT_REGISTER_DEFINE (ristsrc, "ristsrc", GST_RANK_PRIMARY,
167     GST_TYPE_RIST_SRC);
168 
169 /* called with bonds lock */
170 static RistReceiverBond *
gst_rist_src_add_bond(GstRistSrc * src)171 gst_rist_src_add_bond (GstRistSrc * src)
172 {
173   RistReceiverBond *bond = g_slice_new0 (RistReceiverBond);
174   GstPad *pad, *gpad;
175   gchar name[32];
176 
177   bond->session = src->bonds->len;
178   bond->address = g_strdup ("0.0.0.0");
179 
180   g_snprintf (name, 32, "rist_rtx_receive%u", bond->session);
181   bond->rtx_receive = gst_element_factory_make ("ristrtxreceive", name);
182   gst_bin_add (GST_BIN (src->rtxbin), bond->rtx_receive);
183 
184   g_snprintf (name, 32, "sink_%u", bond->session);
185   gst_element_link_pads (bond->rtx_receive, "src", src->rtx_funnel, name);
186 
187   g_snprintf (name, 32, "sink_%u", bond->session);
188   pad = gst_element_get_static_pad (bond->rtx_receive, "sink");
189   gpad = gst_ghost_pad_new (name, pad);
190   gst_object_unref (pad);
191   gst_element_add_pad (src->rtxbin, gpad);
192 
193   g_snprintf (name, 32, "rist_rtp_udpsrc%u", bond->session);
194   bond->rtp_src = gst_element_factory_make ("udpsrc", name);
195   g_snprintf (name, 32, "rist_rtcp_udpsrc%u", bond->session);
196   bond->rtcp_src = gst_element_factory_make ("udpsrc", name);
197   g_snprintf (name, 32, "rist_rtcp_dynudpsink%u", bond->session);
198   bond->rtcp_sink = gst_element_factory_make ("dynudpsink", name);
199   if (!bond->rtp_src || !bond->rtcp_src || !bond->rtcp_sink) {
200     g_clear_object (&bond->rtp_src);
201     g_clear_object (&bond->rtcp_src);
202     g_clear_object (&bond->rtcp_sink);
203     g_slice_free (RistReceiverBond, bond);
204     src->missing_plugin = "udp";
205     return NULL;
206   }
207   gst_bin_add_many (GST_BIN (src), bond->rtp_src, bond->rtcp_src,
208       bond->rtcp_sink, NULL);
209   g_object_set (bond->rtcp_sink, "sync", FALSE, "async", FALSE, NULL);
210   gst_element_set_locked_state (bond->rtcp_sink, TRUE);
211 
212   g_snprintf (name, 32, "recv_rtp_sink_%u", bond->session);
213   gst_element_link_pads (bond->rtp_src, "src", src->rtpbin, name);
214   g_snprintf (name, 32, "recv_rtcp_sink_%u", bond->session);
215   gst_element_link_pads (bond->rtcp_src, "src", src->rtpbin, name);
216   g_snprintf (name, 32, "send_rtcp_src_%u", bond->session);
217   gst_element_link_pads (src->rtpbin, name, bond->rtcp_sink, "sink");
218 
219   g_ptr_array_add (src->bonds, bond);
220   return bond;
221 }
222 
223 static void
gst_rist_src_pad_added(GstRistSrc * src,GstPad * new_pad,GstElement * rtpbin)224 gst_rist_src_pad_added (GstRistSrc * src, GstPad * new_pad, GstElement * rtpbin)
225 {
226   GST_TRACE_OBJECT (src, "New pad '%s'.", GST_PAD_NAME (new_pad));
227 
228   if (g_str_has_prefix (GST_PAD_NAME (new_pad), "recv_rtp_src_0_")) {
229     GST_DEBUG_OBJECT (src, "Using new pad '%s' as ghost pad target.",
230         GST_PAD_NAME (new_pad));
231     gst_ghost_pad_set_target (GST_GHOST_PAD (src->srcpad), new_pad);
232   }
233 }
234 
235 static GstCaps *
gst_rist_src_request_pt_map(GstRistSrc * src,guint session_id,guint pt,GstElement * rtpbin)236 gst_rist_src_request_pt_map (GstRistSrc * src, guint session_id, guint pt,
237     GstElement * rtpbin)
238 {
239   const GstRTPPayloadInfo *pt_info;
240   GstCaps *ret;
241 
242   pt_info = gst_rtp_payload_info_for_pt (pt);
243   if (!pt_info || !pt_info->clock_rate)
244     return NULL;
245 
246   ret = gst_caps_new_simple ("application/x-rtp",
247       "media", G_TYPE_STRING, pt_info->media,
248       "encoding_name", G_TYPE_STRING, pt_info->encoding_name,
249       "clock-rate", G_TYPE_INT, (gint) pt_info->clock_rate, NULL);
250 
251   /* FIXME add sprop-parameter-set if any */
252   g_warn_if_fail (pt_info->encoding_parameters == NULL);
253 
254   return ret;
255 }
256 
257 static GstElement *
gst_rist_src_request_aux_receiver(GstRistSrc * src,guint session_id,GstElement * rtpbin)258 gst_rist_src_request_aux_receiver (GstRistSrc * src, guint session_id,
259     GstElement * rtpbin)
260 {
261   return gst_object_ref (src->rtxbin);
262 }
263 
264 /* Overrides the nack creation. Right now we don't send mixed NACKS type, we
265  * simply send a set of range NACK if it takes less space, or allow adding
266  * more seqnum. */
267 static guint
gst_rist_src_on_sending_nacks(GObject * session,guint sender_ssrc,guint media_ssrc,GArray * nacks,GstBuffer * buffer,gpointer user_data)268 gst_rist_src_on_sending_nacks (GObject * session, guint sender_ssrc,
269     guint media_ssrc, GArray * nacks, GstBuffer * buffer, gpointer user_data)
270 {
271   GstRTCPBuffer rtcp = GST_RTCP_BUFFER_INIT;
272   GstRTCPPacket packet;
273   guint8 *app_data;
274   guint nacked_seqnums = 0;
275   guint range_size = 0;
276   guint n_rg_nacks = 0;
277   guint n_fb_nacks = 0;
278   guint16 seqnum;
279   guint i;
280   gint diff;
281 
282   /* We'll assume that range will be best, and find how many generic NACK
283    * would have been created. If this number ends up being smaller, we will
284    * just remove the APP packet and return 0, leaving it to RTPSession to
285    * create the generic NACK.*/
286 
287   gst_rtcp_buffer_map (buffer, GST_MAP_READWRITE, &rtcp);
288   if (!gst_rtcp_buffer_add_packet (&rtcp, GST_RTCP_TYPE_APP, &packet))
289     /* exit because the packet is full, will put next request in a
290      * further packet */
291     goto done;
292 
293   gst_rtcp_packet_app_set_ssrc (&packet, media_ssrc);
294   gst_rtcp_packet_app_set_name (&packet, "RIST");
295 
296   if (!gst_rtcp_packet_app_set_data_length (&packet, 1)) {
297     gst_rtcp_packet_remove (&packet);
298     GST_WARNING ("no range nacks fit in the packet");
299     goto done;
300   }
301 
302   app_data = gst_rtcp_packet_app_get_data (&packet);
303   for (i = 0; i < nacks->len; i = nacked_seqnums) {
304     guint j;
305     seqnum = g_array_index (nacks, guint16, i);
306 
307     if (!gst_rtcp_packet_app_set_data_length (&packet, n_rg_nacks + 1))
308       break;
309 
310     n_rg_nacks++;
311     nacked_seqnums++;
312 
313     for (j = i + 1; j < nacks->len; j++) {
314       guint16 next_seqnum = g_array_index (nacks, guint16, j);
315       diff = gst_rtp_buffer_compare_seqnum (seqnum, next_seqnum);
316       GST_TRACE ("[%u][%u] %u %u diff %i", i, j, seqnum, next_seqnum, diff);
317       if (diff > (j - i))
318         break;
319 
320       nacked_seqnums++;
321     }
322 
323     range_size = j - i - 1;
324     GST_WRITE_UINT32_BE (app_data, seqnum << 16 | range_size);
325     app_data += 4;
326   }
327 
328   /* count how many FB NACK it would take to wrap nacked_seqnums */
329   seqnum = g_array_index (nacks, guint16, 0);
330   n_fb_nacks = 1;
331   for (i = 1; i < nacked_seqnums; i++) {
332     guint16 next_seqnum = g_array_index (nacks, guint16, i);
333     diff = gst_rtp_buffer_compare_seqnum (seqnum, next_seqnum);
334     if (diff > 16) {
335       n_fb_nacks++;
336       seqnum = next_seqnum;
337     }
338   }
339 
340   if (n_fb_nacks <= n_rg_nacks) {
341     GST_DEBUG ("Not sending %u range nacks, as %u FB nacks will be smaller",
342         n_rg_nacks, n_fb_nacks);
343     gst_rtcp_packet_remove (&packet);
344     nacked_seqnums = 0;
345     goto done;
346   }
347 
348   GST_DEBUG ("Sent %u seqnums into %u Range NACKs", nacked_seqnums, n_rg_nacks);
349 
350 done:
351   gst_rtcp_buffer_unmap (&rtcp);
352   return nacked_seqnums;
353 }
354 
355 static void
gst_rist_src_on_new_ssrc(GstRistSrc * src,guint session_id,guint ssrc,GstElement * rtpbin)356 gst_rist_src_on_new_ssrc (GstRistSrc * src, guint session_id, guint ssrc,
357     GstElement * rtpbin)
358 {
359   GObject *session = NULL;
360   GObject *source = NULL;
361 
362   g_signal_emit_by_name (rtpbin, "get-internal-session", session_id, &session);
363   g_signal_emit_by_name (session, "get-source-by-ssrc", ssrc, &source);
364 
365   if (ssrc & 1) {
366     GST_DEBUG ("Disabling RTCP and probation on RTX stream "
367         "(SSRC %u on session %u)", ssrc, session_id);
368     g_object_set (source, "disable-rtcp", TRUE, "probation", 0, NULL);
369   } else {
370     g_signal_connect (session, "on-sending-nacks",
371         (GCallback) gst_rist_src_on_sending_nacks, NULL);
372   }
373 
374   g_object_unref (source);
375   g_object_unref (session);
376 }
377 
378 static void
gst_rist_src_new_jitterbuffer(GstRistSrc * src,GstElement * jitterbuffer,guint session,guint ssrc,GstElement * rtpbin)379 gst_rist_src_new_jitterbuffer (GstRistSrc * src, GstElement * jitterbuffer,
380     guint session, guint ssrc, GstElement * rtpbin)
381 {
382   if (session != 0) {
383     GST_WARNING_OBJECT (rtpbin, "Unexpected jitterbuffer created.");
384     return;
385   }
386 
387   g_object_set (jitterbuffer, "rtx-delay", src->reorder_section,
388       "rtx-max-retries", src->max_rtx_retries, NULL);
389 
390   if ((ssrc & 1) == 0) {
391     GST_INFO_OBJECT (src, "Saving jitterbuffer for session %u ssrc %u",
392         session, ssrc);
393 
394     g_clear_object (&src->jitterbuffer);
395     src->jitterbuffer = gst_object_ref (jitterbuffer);
396     src->rtp_ssrc = ssrc;
397   }
398 }
399 
400 static void
gst_rist_src_init(GstRistSrc * src)401 gst_rist_src_init (GstRistSrc * src)
402 {
403   GstPad *pad, *gpad;
404   GstStructure *sdes = NULL;
405   RistReceiverBond *bond;
406 
407   g_mutex_init (&src->bonds_lock);
408   src->bonds = g_ptr_array_new ();
409 
410   /* Construct the RIST RTP receiver pipeline.
411    *
412    * udpsrc -> [recv_rtp_sink_%u]  --------  [recv_rtp_src_%u_%u_%u]
413    *                              | rtpbin |
414    * udpsrc -> [recv_rtcp_sink_%u] --------  [send_rtcp_src_%u] -> udpsink
415    *
416    * This pipeline is fixed for now, note that optionally an FEC stream could
417    * be added later.
418    */
419   src->srcpad = gst_ghost_pad_new_no_target_from_template ("src",
420       gst_static_pad_template_get (&src_templ));
421   gst_element_add_pad (GST_ELEMENT (src), src->srcpad);
422 
423   src->rtpbin = gst_element_factory_make ("rtpbin", "rist_recv_rtpbin");
424   if (!src->rtpbin) {
425     src->missing_plugin = "rtpmanager";
426     goto missing_plugin;
427   }
428 
429   /* RIST specification says the SDES should only contain the CNAME */
430   g_object_get (src->rtpbin, "sdes", &sdes, NULL);
431   gst_structure_remove_field (sdes, "tool");
432 
433   gst_bin_add (GST_BIN (src), src->rtpbin);
434   g_object_set (src->rtpbin, "do-retransmission", TRUE,
435       "rtp-profile", 3 /* AVPF */ ,
436       "sdes", sdes, NULL);
437   gst_structure_free (sdes);
438 
439   g_signal_connect_object (src->rtpbin, "request-pt-map",
440       G_CALLBACK (gst_rist_src_request_pt_map), src, G_CONNECT_SWAPPED);
441   g_signal_connect_object (src->rtpbin, "request-aux-receiver",
442       G_CALLBACK (gst_rist_src_request_aux_receiver), src, G_CONNECT_SWAPPED);
443 
444   src->rtxbin = gst_bin_new ("rist_recv_rtxbin");
445   g_object_ref_sink (src->rtxbin);
446 
447   src->rtx_funnel = gst_element_factory_make ("funnel", "rist_rtx_funnel");
448   gst_bin_add (GST_BIN (src->rtxbin), src->rtx_funnel);
449 
450   src->rtpdeext = gst_element_factory_make ("ristrtpdeext", "rist_rtp_de_ext");
451   gst_bin_add (GST_BIN (src->rtxbin), src->rtpdeext);
452   gst_element_link (src->rtx_funnel, src->rtpdeext);
453 
454   pad = gst_element_get_static_pad (src->rtpdeext, "src");
455   gpad = gst_ghost_pad_new ("src_0", pad);
456   gst_object_unref (pad);
457   gst_element_add_pad (src->rtxbin, gpad);
458 
459   g_signal_connect_object (src->rtpbin, "pad-added",
460       G_CALLBACK (gst_rist_src_pad_added), src, G_CONNECT_SWAPPED);
461   g_signal_connect_object (src->rtpbin, "on-new-ssrc",
462       G_CALLBACK (gst_rist_src_on_new_ssrc), src, G_CONNECT_SWAPPED);
463   g_signal_connect_object (src->rtpbin, "new-jitterbuffer",
464       G_CALLBACK (gst_rist_src_new_jitterbuffer), src, G_CONNECT_SWAPPED);
465 
466   bond = gst_rist_src_add_bond (src);
467   if (!bond)
468     goto missing_plugin;
469 
470   return;
471 
472 missing_plugin:
473   {
474     GST_ERROR_OBJECT (src, "'%s' plugin is missing.", src->missing_plugin);
475     src->construct_failed = TRUE;
476   }
477 }
478 
479 static void
gst_rist_src_handle_message(GstBin * bin,GstMessage * message)480 gst_rist_src_handle_message (GstBin * bin, GstMessage * message)
481 {
482   switch (GST_MESSAGE_TYPE (message)) {
483     case GST_MESSAGE_STREAM_START:
484     case GST_MESSAGE_EOS:
485       /* drop stream-start & eos from our internal udp sink(s);
486          https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/-/issues/1368 */
487       gst_message_unref (message);
488       break;
489     default:
490       GST_BIN_CLASS (gst_rist_src_parent_class)->handle_message (bin, message);
491       break;
492   }
493 }
494 
495 static GstPadProbeReturn
gst_rist_src_on_recv_rtcp(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)496 gst_rist_src_on_recv_rtcp (GstPad * pad, GstPadProbeInfo * info,
497     gpointer user_data)
498 {
499   GstRistSrc *src = GST_RIST_SRC (user_data);
500   GstBuffer *buffer;
501   GstNetAddressMeta *meta;
502   GstElement *rtcp_src;
503   RistReceiverBond *bond = NULL;
504   gint i;
505 
506   rtcp_src = GST_ELEMENT (gst_pad_get_parent (pad));
507 
508   g_mutex_lock (&src->bonds_lock);
509 
510   for (i = 0; i < src->bonds->len; i++) {
511     RistReceiverBond *b = g_ptr_array_index (src->bonds, i);
512     if (b->rtcp_src == rtcp_src) {
513       bond = b;
514       break;
515     }
516   }
517   gst_object_unref (rtcp_src);
518 
519   if (!bond) {
520     GST_WARNING_OBJECT (src, "Unexpected RTCP source.");
521     g_mutex_unlock (&src->bonds_lock);
522     return GST_PAD_PROBE_OK;
523   }
524 
525   if (info->type == GST_PAD_PROBE_TYPE_BUFFER_LIST) {
526     GstBufferList *buffer_list = info->data;
527     buffer = gst_buffer_list_get (buffer_list, 0);
528   } else {
529     buffer = info->data;
530   }
531 
532   meta = gst_buffer_get_net_address_meta (buffer);
533 
534   g_clear_object (&bond->rtcp_send_addr);
535   bond->rtcp_send_addr = g_object_ref (meta->addr);
536 
537   g_mutex_unlock (&src->bonds_lock);
538 
539   return GST_PAD_PROBE_OK;
540 }
541 
542 /* called with bonds lock */
543 static inline void
gst_rist_src_attach_net_address_meta(RistReceiverBond * bond,GstBuffer * buffer)544 gst_rist_src_attach_net_address_meta (RistReceiverBond * bond,
545     GstBuffer * buffer)
546 {
547   if (bond->rtcp_send_addr)
548     gst_buffer_add_net_address_meta (buffer, bond->rtcp_send_addr);
549 }
550 
551 static GstPadProbeReturn
gst_rist_src_on_send_rtcp(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)552 gst_rist_src_on_send_rtcp (GstPad * pad, GstPadProbeInfo * info,
553     gpointer user_data)
554 {
555   GstRistSrc *src = GST_RIST_SRC (user_data);
556   GstElement *rtcp_sink;
557   RistReceiverBond *bond = NULL;
558   gint i;
559 
560   rtcp_sink = GST_ELEMENT (gst_pad_get_parent (pad));
561 
562   g_mutex_lock (&src->bonds_lock);
563 
564   for (i = 0; i < src->bonds->len; i++) {
565     RistReceiverBond *b = g_ptr_array_index (src->bonds, i);
566     if (b->rtcp_sink == rtcp_sink) {
567       bond = b;
568       break;
569     }
570   }
571   gst_object_unref (rtcp_sink);
572 
573   if (!bond) {
574     GST_WARNING_OBJECT (src, "Unexpected RTCP sink.");
575     g_mutex_unlock (&src->bonds_lock);
576     return GST_PAD_PROBE_OK;
577   }
578 
579   if (info->type == GST_PAD_PROBE_TYPE_BUFFER_LIST) {
580     GstBufferList *buffer_list = info->data;
581     GstBuffer *buffer;
582     gint i;
583 
584     info->data = buffer_list = gst_buffer_list_make_writable (buffer_list);
585     for (i = 0; i < gst_buffer_list_length (buffer_list); i++) {
586       buffer = gst_buffer_list_get (buffer_list, i);
587       gst_rist_src_attach_net_address_meta (bond, buffer);
588     }
589   } else {
590     GstBuffer *buffer = info->data;
591     info->data = buffer = gst_buffer_make_writable (buffer);
592     gst_rist_src_attach_net_address_meta (bond, buffer);
593   }
594 
595   g_mutex_unlock (&src->bonds_lock);
596 
597   return GST_PAD_PROBE_OK;
598 }
599 
600 static gboolean
gst_rist_src_setup_rtcp_socket(GstRistSrc * src,RistReceiverBond * bond)601 gst_rist_src_setup_rtcp_socket (GstRistSrc * src, RistReceiverBond * bond)
602 {
603   GstPad *pad;
604   GSocket *socket = NULL;
605   GInetAddress *iaddr = NULL;
606   guint port = bond->port + 1;
607   GError *error = NULL;
608 
609   g_object_get (bond->rtcp_src, "used-socket", &socket, NULL);
610   if (!socket)
611     return GST_STATE_CHANGE_FAILURE;
612 
613   iaddr = g_inet_address_new_from_string (bond->address);
614   if (!iaddr) {
615     GList *results;
616     GResolver *resolver = NULL;
617 
618     resolver = g_resolver_get_default ();
619     results = g_resolver_lookup_by_name (resolver, bond->address, NULL, &error);
620 
621     if (!results) {
622       g_object_unref (resolver);
623       goto dns_resolve_failed;
624     }
625 
626     iaddr = G_INET_ADDRESS (g_object_ref (results->data));
627 
628     g_resolver_free_addresses (results);
629     g_object_unref (resolver);
630   }
631 
632   if (g_inet_address_get_is_multicast (iaddr)) {
633     /* mc-ttl is not supported by dynudpsink */
634     g_socket_set_multicast_ttl (socket, src->multicast_ttl);
635     /* In multicast, send RTCP to the multicast group */
636     bond->rtcp_send_addr = g_inet_socket_address_new (iaddr, port);
637   } else {
638     /* In unicast, send RTCP to the detected sender address */
639     pad = gst_element_get_static_pad (bond->rtcp_src, "src");
640     bond->rtcp_recv_probe = gst_pad_add_probe (pad,
641         GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST,
642         gst_rist_src_on_recv_rtcp, src, NULL);
643     gst_object_unref (pad);
644   }
645   g_object_unref (iaddr);
646 
647   pad = gst_element_get_static_pad (bond->rtcp_sink, "sink");
648   bond->rtcp_send_probe = gst_pad_add_probe (pad,
649       GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST,
650       gst_rist_src_on_send_rtcp, src, NULL);
651   gst_object_unref (pad);
652 
653   if (bond->multicast_iface) {
654 #ifdef SO_BINDTODEVICE
655     if (setsockopt (g_socket_get_fd (socket), SOL_SOCKET,
656             SO_BINDTODEVICE, bond->multicast_iface,
657             strlen (bond->multicast_iface)) < 0)
658       GST_WARNING_OBJECT (src, "setsockopt SO_BINDTODEVICE failed: %s",
659           strerror (errno));
660 #else
661     GST_WARNING_OBJECT (src, "Tried to set a multicast interface while"
662         " GStreamer was compiled on a platform without SO_BINDTODEVICE");
663 #endif
664   }
665 
666 
667   /* share the socket created by the source */
668   g_object_set (bond->rtcp_sink, "socket", socket, "close-socket", FALSE, NULL);
669   g_object_unref (socket);
670 
671   gst_element_set_locked_state (bond->rtcp_sink, FALSE);
672   gst_element_sync_state_with_parent (bond->rtcp_sink);
673 
674   return GST_STATE_CHANGE_SUCCESS;
675 
676 dns_resolve_failed:
677   GST_ELEMENT_ERROR (src, RESOURCE, NOT_FOUND,
678       ("Could not resolve hostname '%s'", GST_STR_NULL (bond->address)),
679       ("DNS resolver reported: %s", error->message));
680   g_error_free (error);
681   return GST_STATE_CHANGE_FAILURE;
682 
683 }
684 
685 static GstStateChangeReturn
gst_rist_src_start(GstRistSrc * src)686 gst_rist_src_start (GstRistSrc * src)
687 {
688   gint i;
689 
690   if (src->construct_failed) {
691     GST_ELEMENT_ERROR (src, CORE, MISSING_PLUGIN,
692         ("Your GStreamer installation is missing plugin '%s'",
693             src->missing_plugin), (NULL));
694     return GST_STATE_CHANGE_FAILURE;
695   }
696 
697   for (i = 0; i < src->bonds->len; i++) {
698     RistReceiverBond *bond = g_ptr_array_index (src->bonds, i);
699     GObject *session = NULL;
700 
701     g_signal_emit_by_name (src->rtpbin, "get-session", i, &session);
702     g_object_set (session, "rtcp-min-interval", src->min_rtcp_interval,
703         "rtcp-fraction", src->max_rtcp_bandwidth, NULL);
704     g_object_unref (session);
705 
706     if (!gst_rist_src_setup_rtcp_socket (src, bond))
707       return GST_STATE_CHANGE_FAILURE;
708   }
709 
710   return GST_STATE_CHANGE_SUCCESS;
711 }
712 
713 static GstStructure *
gst_rist_src_create_stats(GstRistSrc * src)714 gst_rist_src_create_stats (GstRistSrc * src)
715 {
716   GstStructure *ret;
717   GValueArray *session_stats;
718   guint64 total_dropped = 0, total_received = 0, recovered = 0, lost = 0;
719   guint64 duplicates = 0, rtx_sent = 0, rtt = 0;
720   gint i;
721 
722   ret = gst_structure_new_empty ("rist/x-receiver-stats");
723   session_stats = g_value_array_new (src->bonds->len);
724 
725   for (i = 0; i < src->bonds->len; i++) {
726     GObject *session = NULL, *source = NULL;
727     GstStructure *sstats = NULL, *stats;
728     const gchar *rtp_from = NULL, *rtcp_from = NULL;
729     guint64 dropped = 0, received = 0;
730     GValue value = G_VALUE_INIT;
731 
732     g_signal_emit_by_name (src->rtpbin, "get-internal-session", i, &session);
733     if (!session)
734       continue;
735 
736     stats = gst_structure_new_empty ("rist/x-receiver-session-stats");
737 
738     g_signal_emit_by_name (session, "get-source-by-ssrc", src->rtp_ssrc,
739         &source);
740     if (source) {
741       gint packet_lost;
742       g_object_get (source, "stats", &sstats, NULL);
743       gst_structure_get_int (sstats, "packets-lost", &packet_lost);
744       dropped = MAX (packet_lost, 0);
745       gst_structure_get_uint64 (sstats, "packets-received", &received);
746       rtp_from = gst_structure_get_string (sstats, "rtp-from");
747       rtcp_from = gst_structure_get_string (sstats, "rtcp-from");
748     }
749     g_object_unref (session);
750 
751     gst_structure_set (stats, "session-id", G_TYPE_INT, i,
752         "rtp-from", G_TYPE_STRING, rtp_from ? rtp_from : "",
753         "rtcp-from", G_TYPE_STRING, rtcp_from ? rtcp_from : "",
754         "dropped", G_TYPE_UINT64, MAX (dropped, 0),
755         "received", G_TYPE_UINT64, received, NULL);
756 
757     if (sstats)
758       gst_structure_free (sstats);
759     g_clear_object (&source);
760 
761     g_value_init (&value, GST_TYPE_STRUCTURE);
762     g_value_take_boxed (&value, stats);
763     g_value_array_append (session_stats, &value);
764     g_value_unset (&value);
765 
766     total_dropped += dropped;
767   }
768 
769   if (src->jitterbuffer) {
770     GstStructure *stats;
771     g_object_get (src->jitterbuffer, "stats", &stats, NULL);
772     gst_structure_get (stats,
773         "num-pushed", G_TYPE_UINT64, &total_received,
774         "num-lost", G_TYPE_UINT64, &lost,
775         "rtx-count", G_TYPE_UINT64, &rtx_sent,
776         "num-duplicates", G_TYPE_UINT64, &duplicates,
777         "rtx-success-count", G_TYPE_UINT64, &recovered,
778         "rtx-rtt", G_TYPE_UINT64, &rtt, NULL);
779     gst_structure_free (stats);
780   }
781 
782   gst_structure_set (ret, "dropped", G_TYPE_UINT64, total_dropped,
783       "received", G_TYPE_UINT64, total_received,
784       "recovered", G_TYPE_UINT64, recovered,
785       "permanently-lost", G_TYPE_UINT64, lost,
786       "duplicates", G_TYPE_UINT64, duplicates,
787       "retransmission-requests-sent", G_TYPE_UINT64, rtx_sent,
788       "rtx-roundtrip-time", G_TYPE_UINT64, rtt,
789       "session-stats", G_TYPE_VALUE_ARRAY, session_stats, NULL);
790   g_value_array_free (session_stats);
791 
792   return ret;
793 }
794 
795 static gboolean
gst_rist_src_dump_stats(GstClock * clock,GstClockTime time,GstClockID id,gpointer user_data)796 gst_rist_src_dump_stats (GstClock * clock, GstClockTime time, GstClockID id,
797     gpointer user_data)
798 {
799   GstRistSrc *src = GST_RIST_SRC (user_data);
800   GstStructure *stats = gst_rist_src_create_stats (src);
801 
802   gst_println ("%s: %" GST_PTR_FORMAT, GST_OBJECT_NAME (src), stats);
803 
804   gst_structure_free (stats);
805   return TRUE;
806 }
807 
808 static void
gst_rist_src_enable_stats_interval(GstRistSrc * src)809 gst_rist_src_enable_stats_interval (GstRistSrc * src)
810 {
811   GstClock *clock;
812   GstClockTime start, interval;
813 
814   if (src->stats_interval == 0)
815     return;
816 
817   interval = src->stats_interval * GST_MSECOND;
818   clock = gst_system_clock_obtain ();
819   start = gst_clock_get_time (clock) + interval;
820 
821   src->stats_cid = gst_clock_new_periodic_id (clock, start, interval);
822   gst_clock_id_wait_async (src->stats_cid, gst_rist_src_dump_stats,
823       gst_object_ref (src), (GDestroyNotify) gst_object_unref);
824 
825   gst_object_unref (clock);
826 }
827 
828 static void
gst_rist_src_disable_stats_interval(GstRistSrc * src)829 gst_rist_src_disable_stats_interval (GstRistSrc * src)
830 {
831   if (src->stats_cid) {
832     gst_clock_id_unschedule (src->stats_cid);
833     gst_clock_id_unref (src->stats_cid);
834     src->stats_cid = NULL;
835   }
836 }
837 
838 static void
gst_rist_src_stop(GstRistSrc * src)839 gst_rist_src_stop (GstRistSrc * src)
840 {
841   GstPad *pad;
842   gint i;
843 
844   for (i = 0; i < src->bonds->len; i++) {
845     RistReceiverBond *bond = g_ptr_array_index (src->bonds, i);
846 
847     if (bond->rtcp_recv_probe) {
848       pad = gst_element_get_static_pad (bond->rtcp_src, "src");
849       gst_pad_remove_probe (pad, bond->rtcp_recv_probe);
850       bond->rtcp_recv_probe = 0;
851       gst_object_unref (pad);
852     }
853 
854     if (bond->rtcp_send_probe) {
855       pad = gst_element_get_static_pad (bond->rtcp_sink, "sink");
856       gst_pad_remove_probe (pad, bond->rtcp_send_probe);
857       bond->rtcp_send_probe = 0;
858       gst_object_unref (pad);
859     }
860   }
861 }
862 
863 static GstStateChangeReturn
gst_rist_src_change_state(GstElement * element,GstStateChange transition)864 gst_rist_src_change_state (GstElement * element, GstStateChange transition)
865 {
866   GstRistSrc *src = GST_RIST_SRC (element);
867   GstStateChangeReturn ret;
868 
869   switch (transition) {
870     case GST_STATE_CHANGE_PAUSED_TO_READY:
871       gst_rist_src_disable_stats_interval (src);
872       break;
873     default:
874       break;
875   }
876 
877   ret = GST_ELEMENT_CLASS (gst_rist_src_parent_class)->change_state (element,
878       transition);
879 
880   switch (transition) {
881     case GST_STATE_CHANGE_NULL_TO_READY:
882       gst_rist_src_start (src);
883       break;
884     case GST_STATE_CHANGE_READY_TO_PAUSED:
885       gst_rist_src_enable_stats_interval (src);
886       break;
887     case GST_STATE_CHANGE_READY_TO_NULL:
888       gst_rist_src_stop (src);
889       break;
890     default:
891       break;
892   }
893 
894   return ret;
895 }
896 
897 /* called with bonds lock */
898 static void
gst_rist_src_update_bond_address(GstRistSrc * src,RistReceiverBond * bond,const gchar * address,guint port,const gchar * multicast_iface)899 gst_rist_src_update_bond_address (GstRistSrc * src, RistReceiverBond * bond,
900     const gchar * address, guint port, const gchar * multicast_iface)
901 {
902   g_free (bond->address);
903   g_free (bond->multicast_iface);
904   bond->address = g_strdup (address);
905   bond->multicast_iface = multicast_iface ? g_strdup (multicast_iface) : NULL;
906   bond->port = port;
907 
908   g_object_set (G_OBJECT (bond->rtp_src), "address", address, "port", port,
909       "multicast-iface", bond->multicast_iface, NULL);
910   g_object_set (G_OBJECT (bond->rtcp_src), "address", address,
911       "port", port + 1, "multicast-iface", bond->multicast_iface, NULL);
912 
913   /* TODO add runtime support
914    *  - add blocking the pad probe
915    *  - update RTCP socket
916    *  - cycle elements through NULL state
917    */
918 }
919 
920 /* called with bonds lock */
921 static gchar *
gst_rist_src_get_bonds(GstRistSrc * src)922 gst_rist_src_get_bonds (GstRistSrc * src)
923 {
924   GString *bonds = g_string_new ("");
925   gint i;
926 
927   for (i = 0; i < src->bonds->len; i++) {
928     RistReceiverBond *bond = g_ptr_array_index (src->bonds, i);
929     if (bonds->len > 0)
930       g_string_append_c (bonds, ':');
931 
932     g_string_append_printf (bonds, "%s:%u", bond->address, bond->port);
933 
934     if (bond->multicast_iface)
935       g_string_append_printf (bonds, "/%s", bond->multicast_iface);
936   }
937 
938   return g_string_free (bonds, FALSE);
939 }
940 
941 struct RistAddress
942 {
943   gchar *address;
944   char *multicast_iface;
945   guint port;
946 };
947 
948 /* called with bonds lock */
949 static void
gst_rist_src_set_bonds(GstRistSrc * src,const gchar * bonds)950 gst_rist_src_set_bonds (GstRistSrc * src, const gchar * bonds)
951 {
952   GStrv tokens = NULL;
953   struct RistAddress *addrs;
954   gint i;
955 
956   if (bonds == NULL)
957     goto missing_address;
958 
959   tokens = g_strsplit (bonds, ",", 0);
960   if (tokens[0] == NULL)
961     goto missing_address;
962 
963   addrs = g_new0 (struct RistAddress, g_strv_length (tokens));
964 
965   /* parse the address list */
966   for (i = 0; tokens[i]; i++) {
967     gchar *address = tokens[i];
968     char *port_ptr, *iface_ptr, *endptr;
969     guint port;
970 
971     port_ptr = g_utf8_strrchr (address, -1, ':');
972     iface_ptr = g_utf8_strrchr (address, -1, '/');
973 
974     if (!port_ptr)
975       goto bad_parameter;
976     if (!g_ascii_isdigit (port_ptr[1]))
977       goto bad_parameter;
978 
979     if (iface_ptr) {
980       if (iface_ptr < port_ptr)
981         goto bad_parameter;
982       iface_ptr[0] = '\0';
983     }
984 
985     port = strtol (port_ptr + 1, &endptr, 0);
986     if (endptr[0] != '\0')
987       goto bad_parameter;
988 
989     /* port must be a multiple of 2 between 2 and 65534 */
990     if (port < 2 || (port & 1) || port > G_MAXUINT16)
991       goto invalid_port;
992 
993     port_ptr[0] = '\0';
994     addrs[i].port = port;
995     addrs[i].address = g_strstrip (address);
996     if (iface_ptr)
997       addrs[i].multicast_iface = g_strstrip (iface_ptr + 1);
998   }
999 
1000   /* configure the bonds */
1001   for (i = 0; tokens[i]; i++) {
1002     RistReceiverBond *bond = NULL;
1003 
1004     if (i < src->bonds->len)
1005       bond = g_ptr_array_index (src->bonds, i);
1006     else
1007       bond = gst_rist_src_add_bond (src);
1008 
1009     gst_rist_src_update_bond_address (src, bond, addrs[i].address,
1010         addrs[i].port, addrs[i].multicast_iface);
1011   }
1012 
1013   g_strfreev (tokens);
1014   return;
1015 
1016 missing_address:
1017   g_warning ("'bonding-addresses' cannot be empty");
1018   g_strfreev (tokens);
1019   return;
1020 
1021 bad_parameter:
1022   g_warning ("Failed to parse address '%s", tokens[i]);
1023   g_strfreev (tokens);
1024   g_free (addrs);
1025   return;
1026 
1027 invalid_port:
1028   g_warning ("RIST port must valid UDP port and a multiple of 2.");
1029   g_strfreev (tokens);
1030   g_free (addrs);
1031   return;
1032 }
1033 
1034 static void
gst_rist_src_set_multicast_loopback(GstRistSrc * src,gboolean loop)1035 gst_rist_src_set_multicast_loopback (GstRistSrc * src, gboolean loop)
1036 {
1037   gint i;
1038 
1039   src->multicast_loopback = loop;
1040   for (i = 0; i < src->bonds->len; i++) {
1041     RistReceiverBond *bond = g_ptr_array_index (src->bonds, i);
1042     g_object_set (G_OBJECT (bond->rtp_src), "loop", loop, NULL);
1043     g_object_set (G_OBJECT (bond->rtcp_src), "loop", loop, NULL);
1044   }
1045 }
1046 
1047 static void
gst_rist_src_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)1048 gst_rist_src_get_property (GObject * object, guint prop_id,
1049     GValue * value, GParamSpec * pspec)
1050 {
1051   GstRistSrc *src = GST_RIST_SRC (object);
1052   GstStructure *sdes;
1053   RistReceiverBond *bond;
1054 
1055   if (src->construct_failed)
1056     return;
1057 
1058   g_mutex_lock (&src->bonds_lock);
1059 
1060   bond = g_ptr_array_index (src->bonds, 0);
1061 
1062   switch (prop_id) {
1063     case PROP_ADDRESS:
1064       g_value_set_string (value, bond->address);
1065       break;
1066 
1067     case PROP_PORT:
1068       g_value_set_uint (value, bond->port);
1069       break;
1070 
1071     case PROP_RECEIVER_BUFFER:
1072       g_object_get_property (G_OBJECT (src->rtpbin), "latency", value);
1073       break;
1074 
1075     case PROP_REORDER_SECTION:
1076       g_value_set_uint (value, src->reorder_section);
1077       break;
1078 
1079     case PROP_MAX_RTX_RETRIES:
1080       g_value_set_uint (value, src->max_rtx_retries);
1081       break;
1082 
1083     case PROP_MIN_RTCP_INTERVAL:
1084       g_value_set_uint (value, (guint) (src->min_rtcp_interval / GST_MSECOND));
1085       break;
1086 
1087     case PROP_MAX_RTCP_BANDWIDTH:
1088       g_value_set_double (value, src->max_rtcp_bandwidth);
1089       break;
1090 
1091     case PROP_STATS_UPDATE_INTERVAL:
1092       g_value_set_uint (value, src->stats_interval);
1093       break;
1094 
1095     case PROP_STATS:
1096       g_value_take_boxed (value, gst_rist_src_create_stats (src));
1097       break;
1098 
1099     case PROP_CNAME:
1100       g_object_get (src->rtpbin, "sdes", &sdes, NULL);
1101       g_value_set_string (value, gst_structure_get_string (sdes, "cname"));
1102       gst_structure_free (sdes);
1103       break;
1104 
1105     case PROP_MULTICAST_LOOPBACK:
1106       g_value_set_boolean (value, src->multicast_loopback);
1107       break;
1108 
1109     case PROP_MULTICAST_IFACE:
1110       g_value_set_string (value, bond->multicast_iface);
1111       break;
1112 
1113     case PROP_MULTICAST_TTL:
1114       g_value_set_int (value, src->multicast_ttl);
1115       break;
1116 
1117     case PROP_BONDING_ADDRESSES:
1118       g_value_take_string (value, gst_rist_src_get_bonds (src));
1119       break;
1120 
1121     default:
1122       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1123       break;
1124   }
1125 
1126   g_mutex_unlock (&src->bonds_lock);
1127 }
1128 
1129 static void
gst_rist_src_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)1130 gst_rist_src_set_property (GObject * object, guint prop_id,
1131     const GValue * value, GParamSpec * pspec)
1132 {
1133   GstRistSrc *src = GST_RIST_SRC (object);
1134   GstStructure *sdes;
1135   RistReceiverBond *bond;
1136 
1137   if (src->construct_failed)
1138     return;
1139 
1140   g_mutex_lock (&src->bonds_lock);
1141 
1142   bond = g_ptr_array_index (src->bonds, 0);
1143 
1144   switch (prop_id) {
1145     case PROP_ADDRESS:
1146       g_free (bond->address);
1147       bond->address = g_value_dup_string (value);
1148       g_object_set_property (G_OBJECT (bond->rtp_src), "address", value);
1149       g_object_set_property (G_OBJECT (bond->rtcp_src), "address", value);
1150       break;
1151 
1152     case PROP_PORT:{
1153       guint port = g_value_get_uint (value);
1154 
1155       /* According to 5.1.1, RTP receiver port most be even number and RTCP
1156        * port should be the RTP port + 1 */
1157 
1158       if (port & 0x1) {
1159         g_warning ("Invalid RIST port %u, should be an even number.", port);
1160         return;
1161       }
1162 
1163       bond->port = port;
1164       g_object_set (bond->rtp_src, "port", port, NULL);
1165       g_object_set (bond->rtcp_src, "port", port + 1, NULL);
1166       break;
1167     }
1168 
1169     case PROP_RECEIVER_BUFFER:
1170       g_object_set (src->rtpbin, "latency", g_value_get_uint (value), NULL);
1171       break;
1172 
1173     case PROP_REORDER_SECTION:
1174       src->reorder_section = g_value_get_uint (value);
1175       break;
1176 
1177     case PROP_MAX_RTX_RETRIES:
1178       src->max_rtx_retries = g_value_get_uint (value);
1179       break;
1180 
1181     case PROP_MIN_RTCP_INTERVAL:
1182       src->min_rtcp_interval = g_value_get_uint (value) * GST_MSECOND;
1183       break;
1184 
1185     case PROP_MAX_RTCP_BANDWIDTH:
1186       src->max_rtcp_bandwidth = g_value_get_double (value);
1187       break;
1188 
1189     case PROP_STATS_UPDATE_INTERVAL:
1190       src->stats_interval = g_value_get_uint (value);
1191       break;
1192 
1193     case PROP_CNAME:
1194       g_object_get (src->rtpbin, "sdes", &sdes, NULL);
1195       gst_structure_set_value (sdes, "cname", value);
1196       g_object_set (src->rtpbin, "sdes", sdes, NULL);
1197       gst_structure_free (sdes);
1198       break;
1199 
1200     case PROP_MULTICAST_LOOPBACK:
1201       gst_rist_src_set_multicast_loopback (src, g_value_get_boolean (value));
1202       break;
1203 
1204     case PROP_MULTICAST_IFACE:
1205       g_free (bond->multicast_iface);
1206       bond->multicast_iface = g_value_dup_string (value);
1207       g_object_set_property (G_OBJECT (bond->rtp_src),
1208           "multicast-iface", value);
1209       g_object_set_property (G_OBJECT (bond->rtcp_src),
1210           "multicast-iface", value);
1211       break;
1212 
1213     case PROP_MULTICAST_TTL:
1214       src->multicast_ttl = g_value_get_int (value);
1215       break;
1216 
1217     case PROP_BONDING_ADDRESSES:
1218       gst_rist_src_set_bonds (src, g_value_get_string (value));
1219       break;
1220 
1221     default:
1222       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1223       break;
1224   }
1225 
1226   g_mutex_unlock (&src->bonds_lock);
1227 }
1228 
1229 static void
gst_rist_src_finalize(GObject * object)1230 gst_rist_src_finalize (GObject * object)
1231 {
1232   GstRistSrc *src = GST_RIST_SRC (object);
1233   gint i;
1234 
1235   g_mutex_lock (&src->bonds_lock);
1236 
1237   for (i = 0; i < src->bonds->len; i++) {
1238     RistReceiverBond *bond = g_ptr_array_index (src->bonds, i);
1239     g_free (bond->address);
1240     g_free (bond->multicast_iface);
1241     g_clear_object (&bond->rtcp_send_addr);
1242     g_slice_free (RistReceiverBond, bond);
1243   }
1244   g_ptr_array_free (src->bonds, TRUE);
1245 
1246   g_clear_object (&src->jitterbuffer);
1247   g_clear_object (&src->rtxbin);
1248 
1249   g_mutex_unlock (&src->bonds_lock);
1250   g_mutex_clear (&src->bonds_lock);
1251 
1252   G_OBJECT_CLASS (gst_rist_src_parent_class)->finalize (object);
1253 }
1254 
1255 static void
gst_rist_src_class_init(GstRistSrcClass * klass)1256 gst_rist_src_class_init (GstRistSrcClass * klass)
1257 {
1258   GstBinClass *bin_class = (GstBinClass *) klass;
1259   GstElementClass *element_class = (GstElementClass *) klass;
1260   GObjectClass *object_class = (GObjectClass *) klass;
1261 
1262   gst_element_class_set_metadata (element_class,
1263       "RIST Source", "Source/Network",
1264       "Source that implements RIST TR-06-1 streaming specification",
1265       "Nicolas Dufresne <nicolas.dufresne@collabora.com");
1266   gst_element_class_add_static_pad_template (element_class, &src_templ);
1267 
1268   bin_class->handle_message = gst_rist_src_handle_message;
1269 
1270   element_class->change_state = gst_rist_src_change_state;
1271 
1272   object_class->get_property = gst_rist_src_get_property;
1273   object_class->set_property = gst_rist_src_set_property;
1274   object_class->finalize = gst_rist_src_finalize;
1275 
1276   g_object_class_install_property (object_class, PROP_ADDRESS,
1277       g_param_spec_string ("address", "Address",
1278           "Address to receive packets from (can be IPv4 or IPv6).", "0.0.0.0",
1279           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1280 
1281   g_object_class_install_property (object_class, PROP_PORT,
1282       g_param_spec_uint ("port", "Port", "The port to listen for RTP packets, "
1283           "the RTCP port is this value + 1. This port must be an even number.",
1284           2, 65534, 5004,
1285           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
1286 
1287   g_object_class_install_property (object_class, PROP_RECEIVER_BUFFER,
1288       g_param_spec_uint ("receiver-buffer", "Receiver Buffer",
1289           "Buffering duration (in ms)", 0, G_MAXUINT, 1000,
1290           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
1291 
1292   g_object_class_install_property (object_class, PROP_REORDER_SECTION,
1293       g_param_spec_uint ("reorder-section", "Recorder Section",
1294           "Time to wait before sending retransmission request (in ms)",
1295           0, G_MAXUINT, 70,
1296           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
1297 
1298   g_object_class_install_property (object_class, PROP_MAX_RTX_RETRIES,
1299       g_param_spec_uint ("max-rtx-retries", "Maximum Retransmission Retries",
1300           "The maximum number of retransmission requests for a lost packet.",
1301           0, G_MAXUINT, 7,
1302           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
1303 
1304   g_object_class_install_property (object_class, PROP_MIN_RTCP_INTERVAL,
1305       g_param_spec_uint ("min-rtcp-interval", "Minimum RTCP Intercal",
1306           "The minimum interval (in ms) between two successive RTCP packets",
1307           0, 100, 100,
1308           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
1309 
1310   g_object_class_install_property (object_class, PROP_MAX_RTCP_BANDWIDTH,
1311       g_param_spec_double ("max-rtcp-bandwidth", "Maximum RTCP Bandwidth",
1312           "The maximum bandwidth used for RTCP as a fraction of RTP bandwdith",
1313           0.0, 0.05, 0.05,
1314           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
1315 
1316   g_object_class_install_property (object_class, PROP_STATS_UPDATE_INTERVAL,
1317       g_param_spec_uint ("stats-update-interval", "Statistics Update Interval",
1318           "The interval between 'stats' update notification (in ms) (0 disabled)",
1319           0, G_MAXUINT, 0,
1320           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
1321 
1322   g_object_class_install_property (object_class, PROP_STATS,
1323       g_param_spec_boxed ("stats", "Statistics",
1324           "Statistic in a GstStructure named 'rist/x-receiver-stats'",
1325           GST_TYPE_STRUCTURE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
1326 
1327   g_object_class_install_property (object_class, PROP_CNAME,
1328       g_param_spec_string ("cname", "CName",
1329           "Set the CNAME in the SDES block of the receiver report.", NULL,
1330           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
1331           GST_PARAM_DOC_SHOW_DEFAULT));
1332 
1333   g_object_class_install_property (object_class, PROP_MULTICAST_LOOPBACK,
1334       g_param_spec_boolean ("multicast-loopback", "Multicast Loopback",
1335           "When enabled, the packets will be received locally.", FALSE,
1336           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
1337 
1338   g_object_class_install_property (object_class, PROP_MULTICAST_IFACE,
1339       g_param_spec_string ("multicast-iface", "multicast-iface",
1340           "The multicast interface to use to send packets.", NULL,
1341           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1342 
1343   g_object_class_install_property (object_class, PROP_MULTICAST_TTL,
1344       g_param_spec_int ("multicast-ttl", "Multicast TTL",
1345           "The multicast time-to-live parameter.", 0, 255, 1,
1346           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
1347 
1348   g_object_class_install_property (object_class, PROP_BONDING_ADDRESSES,
1349       g_param_spec_string ("bonding-addresses", "Bonding Addresses",
1350           "Comma (,) separated list of <address>:<port> to receive from. "
1351           "Only used if 'enable-bonding' is set.", NULL,
1352           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1353 }
1354 
1355 static GstURIType
gst_rist_src_uri_get_type(GType type)1356 gst_rist_src_uri_get_type (GType type)
1357 {
1358   return GST_URI_SRC;
1359 }
1360 
1361 static const gchar *const *
gst_rist_src_uri_get_protocols(GType type)1362 gst_rist_src_uri_get_protocols (GType type)
1363 {
1364   static const char *protocols[] = { "rist", NULL };
1365   return protocols;
1366 }
1367 
1368 static gchar *
gst_rist_src_uri_get_uri(GstURIHandler * handler)1369 gst_rist_src_uri_get_uri (GstURIHandler * handler)
1370 {
1371   GstRistSrc *src = GST_RIST_SRC (handler);
1372   gchar *uri = NULL;
1373 
1374   GST_OBJECT_LOCK (src);
1375   if (src->uri)
1376     uri = gst_uri_to_string (src->uri);
1377   GST_OBJECT_UNLOCK (src);
1378 
1379   return uri;
1380 }
1381 
1382 static void
gst_rist_src_uri_query_foreach(const gchar * key,const gchar * value,GObject * src)1383 gst_rist_src_uri_query_foreach (const gchar * key, const gchar * value,
1384     GObject * src)
1385 {
1386   if (g_str_equal (key, "async-handling")) {
1387     GST_WARNING_OBJECT (src, "Setting '%s' property from URI is not allowed.",
1388         key);
1389     return;
1390   }
1391 
1392   GST_DEBUG_OBJECT (src, "Setting property '%s' to '%s'", key, value);
1393   gst_util_set_object_arg (src, key, value);
1394 }
1395 
1396 static gboolean
gst_rist_src_uri_set_uri(GstURIHandler * handler,const gchar * uri,GError ** error)1397 gst_rist_src_uri_set_uri (GstURIHandler * handler, const gchar * uri,
1398     GError ** error)
1399 {
1400   GstRistSrc *src = GST_RIST_SRC (handler);
1401   GstUri *gsturi;
1402   GHashTable *query_table;
1403 
1404   if (GST_STATE (src) >= GST_STATE_PAUSED) {
1405     g_set_error (error, GST_URI_ERROR, GST_URI_ERROR_BAD_STATE,
1406         "Changing the URI on ristsrc when it is running is not supported");
1407     GST_ERROR_OBJECT (src, "%s", (*error)->message);
1408     return FALSE;
1409   }
1410 
1411   if (!(gsturi = gst_uri_from_string (uri))) {
1412     g_set_error (error, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
1413         "Could not parse URI");
1414     GST_ERROR_OBJECT (src, "%s", (*error)->message);
1415     gst_uri_unref (gsturi);
1416     return FALSE;
1417   }
1418 
1419   GST_OBJECT_LOCK (src);
1420   if (src->uri)
1421     gst_uri_unref (src->uri);
1422   src->uri = gst_uri_ref (gsturi);
1423   GST_OBJECT_UNLOCK (src);
1424 
1425   g_object_set (src, "address", gst_uri_get_host (gsturi), NULL);
1426   if (gst_uri_get_port (gsturi))
1427     g_object_set (src, "port", gst_uri_get_port (gsturi), NULL);
1428 
1429   query_table = gst_uri_get_query_table (gsturi);
1430   if (query_table)
1431     g_hash_table_foreach (query_table,
1432         (GHFunc) gst_rist_src_uri_query_foreach, src);
1433 
1434   gst_uri_unref (gsturi);
1435   return TRUE;
1436 }
1437 
1438 static void
gst_rist_src_uri_init(gpointer g_iface,gpointer iface_data)1439 gst_rist_src_uri_init (gpointer g_iface, gpointer iface_data)
1440 {
1441   GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
1442 
1443   iface->get_type = gst_rist_src_uri_get_type;
1444   iface->get_protocols = gst_rist_src_uri_get_protocols;
1445   iface->get_uri = gst_rist_src_uri_get_uri;
1446   iface->set_uri = gst_rist_src_uri_set_uri;
1447 }
1448