• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer RIST plugin
2  * Copyright (C) 2019 Net Insight AB
3  *     Author: Nicolas Dufresne <nicolas.dufresne@collabora.com>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20 
21 /**
22  * SECTION:element-ristsink
23  * @title: ristsink
24  * @see_also: ristsrc
25  *
26  * This element implements RIST TR-06-1 Simple Profile transmitter. It
27  * currently supports any registered RTP static payload types such as
28  * MPEG TS. The stream passed to this element must be already RTP
29  * payloaded.  Even though RTP SSRC collision are rare in
30  * unidirectional streaming, this element expects the upstream elements
31  * to obey to collision events and change the SSRC in use. Collisions
32  * will occur when transmitting and receiving over multicast on the
33  * same host, and will be properly ignored.
34  *
35  * It also implements part of the RIST TR-06-2 Main Profile transmitter. The
36  * tunneling, multiplexing and encryption parts of the specification are not
37  * included. This element will include the RIST header extension if either of
38  * the "sequence-number-extension" or "drop-null-ts-packets" properties are set.
39  *
40  * ## Example gst-launch line
41  * |[
42  * gst-launch-1.0 udpsrc ! tsparse set-timestamps=1 smoothing-latency=40000 ! \
43  * rtpmp2tpay ! ristsink address=10.0.0.1 port=5004
44  * ]|
45  *
46  * Additionally, this element supports bonding, which consist of using
47  * multiple links in order to transmit the streams. The address of
48  * each link is configured through the "bonding-addresses"
49  * property. When set, this will replace the value that might have
50  * been set on the "address" and "port" properties. Each link will be
51  * mapped to its own RTP session. RTX request are only replied to on the
52  * link the NACK was received from.
53  *
54  * There are currently two bonding methods in place: "broadcast" and "round-robin".
55  * In "broadcast" mode, all the packets are duplicated over all sessions.
56  * While in "round-robin" mode, packets are evenly distributed over the links. One
57  * can also implement its own dispatcher element and configure it using the
58  * "dispatcher" property. As a reference, "broadcast" mode is implemented with
59  * the "tee" element, while "round-robin" mode is implemented with the
60  * "round-robin" element.
61  *
62  * ## Example gst-launch line for bonding
63  * |[
64  * gst-launch-1.0 udpsrc ! tsparse set-timestamps=1 smoothing-latency=40000 ! \
65  *  rtpmp2tpay ! ristsink bonding-addresses="10.0.0.1:5004,11.0.0.1:5006"
66  * ]|
67  */
68 
69 /* using GValueArray, which has not replacement */
70 #define GLIB_DISABLE_DEPRECATION_WARNINGS
71 
72 #ifdef HAVE_CONFIG_H
73 #include "config.h"
74 #endif
75 
76 #include <gio/gio.h>
77 #include <gst/rtp/rtp.h>
78 
79 /* for strtol() */
80 #include <stdlib.h>
81 
82 #include "gstrist.h"
83 
84 GST_DEBUG_CATEGORY_STATIC (gst_rist_sink_debug);
85 #define GST_CAT_DEFAULT gst_rist_sink_debug
86 
87 enum
88 {
89   PROP_ADDRESS = 1,
90   PROP_PORT,
91   PROP_SENDER_BUFFER,
92   PROP_MIN_RTCP_INTERVAL,
93   PROP_MAX_RTCP_BANDWIDTH,
94   PROP_STATS_UPDATE_INTERVAL,
95   PROP_STATS,
96   PROP_CNAME,
97   PROP_MULTICAST_LOOPBACK,
98   PROP_MULTICAST_IFACE,
99   PROP_MULTICAST_TTL,
100   PROP_BONDING_ADDRESSES,
101   PROP_BONDING_METHOD,
102   PROP_DISPATCHER,
103   PROP_DROP_NULL_TS_PACKETS,
104   PROP_SEQUENCE_NUMBER_EXTENSION
105 };
106 
107 typedef enum
108 {
109   GST_RIST_BONDING_METHOD_BROADCAST,
110   GST_RIST_BONDING_METHOD_ROUND_ROBIN,
111 } GstRistBondingMethod;
112 
113 static GstStaticPadTemplate sink_templ = GST_STATIC_PAD_TEMPLATE ("sink",
114     GST_PAD_SINK,
115     GST_PAD_ALWAYS,
116     GST_STATIC_CAPS ("application/x-rtp"));
117 
118 typedef struct
119 {
120   guint session;
121   gchar *address;
122   gchar *multicast_iface;
123   guint port;
124   GstElement *rtcp_src;
125   GstElement *rtp_sink;
126   GstElement *rtcp_sink;
127   GstElement *rtx_send;
128   GstElement *rtx_queue;
129   guint32 rtcp_ssrc;
130 } RistSenderBond;
131 
132 struct _GstRistSink
133 {
134   GstBin parent;
135 
136   /* Common elements in the pipeline */
137   GstElement *rtpbin;
138   GstElement *ssrc_filter;
139   GstPad *sinkpad;
140   GstElement *rtxbin;
141   GstElement *dispatcher;
142   GstElement *rtpext;
143 
144   /* Common properties, protected by bonds_lock */
145   gint multicast_ttl;
146   gboolean multicast_loopback;
147   GstClockTime min_rtcp_interval;
148   gdouble max_rtcp_bandwidth;
149   GstRistBondingMethod bonding_method;
150 
151   /* Bonds */
152   GPtrArray *bonds;
153   /* this is needed as setting sibling properties will try to take the object
154    * lock. Thus, any properties that affects the bonds will be protected with
155    * that lock instead of the object lock. */
156   GMutex bonds_lock;
157 
158   /* For stats */
159   guint stats_interval;
160   guint32 rtp_ssrc;
161   GstClockID stats_cid;
162 
163   /* This is set whenever there is a pipeline construction failure, and used
164    * to fail state changes later */
165   gboolean construct_failed;
166   const gchar *missing_plugin;
167 };
168 
169 static GType
gst_rist_bonding_method_get_type(void)170 gst_rist_bonding_method_get_type (void)
171 {
172   static gsize id = 0;
173   static const GEnumValue values[] = {
174     {GST_RIST_BONDING_METHOD_BROADCAST,
175         "GST_RIST_BONDING_METHOD_BROADCAST", "broadcast"},
176     {GST_RIST_BONDING_METHOD_ROUND_ROBIN,
177         "GST_RIST_BONDING_METHOD_ROUND_ROBIN", "round-robin"},
178     {0, NULL, NULL}
179   };
180 
181   if (g_once_init_enter (&id)) {
182     GType tmp = g_enum_register_static ("GstRistBondingMethodType", values);
183     g_once_init_leave (&id, tmp);
184   }
185 
186   return (GType) id;
187 }
188 
189 G_DEFINE_TYPE_WITH_CODE (GstRistSink, gst_rist_sink, GST_TYPE_BIN,
190     GST_DEBUG_CATEGORY_INIT (gst_rist_sink_debug, "ristsink", 0, "RIST Sink"));
191 GST_ELEMENT_REGISTER_DEFINE (ristsink, "ristsink", GST_RANK_PRIMARY,
192     GST_TYPE_RIST_SINK);
193 
194 GQuark session_id_quark = 0;
195 
196 static RistSenderBond *
gst_rist_sink_add_bond(GstRistSink * sink)197 gst_rist_sink_add_bond (GstRistSink * sink)
198 {
199   RistSenderBond *bond = g_slice_new0 (RistSenderBond);
200   GstPad *pad, *gpad;
201   gchar name[32];
202 
203   bond->session = sink->bonds->len;
204   bond->address = g_strdup ("localhost");
205 
206   g_snprintf (name, 32, "rist_rtp_udpsink%u", bond->session);
207   bond->rtp_sink = gst_element_factory_make ("udpsink", name);
208   if (!bond->rtp_sink) {
209     g_slice_free (RistSenderBond, bond);
210     sink->missing_plugin = "udp";
211     return NULL;
212   }
213 
214   /* these are all from UDP plugin, so they cannot fail */
215   g_snprintf (name, 32, "rist_rtcp_udpsrc%u", bond->session);
216   bond->rtcp_src = gst_element_factory_make ("udpsrc", name);
217   g_snprintf (name, 32, "rist_rtcp_udpsink%u", bond->session);
218   bond->rtcp_sink = gst_element_factory_make ("udpsink", name);
219   g_object_set (bond->rtcp_sink, "async", FALSE, NULL);
220 
221   gst_bin_add_many (GST_BIN (sink), bond->rtp_sink, bond->rtcp_src,
222       bond->rtcp_sink, NULL);
223   gst_element_set_locked_state (bond->rtcp_src, TRUE);
224   gst_element_set_locked_state (bond->rtcp_sink, TRUE);
225 
226   g_snprintf (name, 32, "rist_rtx_queue%u", bond->session);
227   bond->rtx_queue = gst_element_factory_make ("queue", name);
228   gst_bin_add (GST_BIN (sink->rtxbin), bond->rtx_queue);
229 
230   g_snprintf (name, 32, "rist_rtx_send%u", bond->session);
231   bond->rtx_send = gst_element_factory_make ("ristrtxsend", name);
232   if (!bond->rtx_send) {
233     sink->missing_plugin = "rtpmanager";
234     g_slice_free (RistSenderBond, bond);
235     return NULL;
236   }
237   gst_bin_add (GST_BIN (sink->rtxbin), bond->rtx_send);
238 
239   gst_element_link (bond->rtx_queue, bond->rtx_send);
240 
241   pad = gst_element_get_static_pad (bond->rtx_send, "src");
242   g_snprintf (name, 32, "src_%u", bond->session);
243   gpad = gst_ghost_pad_new (name, pad);
244   gst_object_unref (pad);
245   gst_element_add_pad (sink->rtxbin, gpad);
246 
247   g_object_set (bond->rtx_send, "max-size-packets", 0, NULL);
248 
249   g_snprintf (name, 32, "send_rtp_sink_%u", bond->session);
250   if (bond->session == 0) {
251     gst_element_link_pads (sink->ssrc_filter, "src", sink->rtpbin, name);
252   } else {
253     GstPad *pad;
254 
255     /* to make a sender, we need to create an unused pad on rtpbin, which will
256      * require an unused pad on the rtxbin */
257     g_snprintf (name, 32, "sink_%u", bond->session);
258     pad = gst_ghost_pad_new_no_target (name, GST_PAD_SINK);
259     gst_element_add_pad (sink->rtxbin, pad);
260 
261     g_snprintf (name, 32, "send_rtp_sink_%u", bond->session);
262     pad = gst_element_request_pad_simple (sink->rtpbin, name);
263     gst_object_unref (pad);
264   }
265 
266   g_snprintf (name, 32, "send_rtp_src_%u", bond->session);
267   gst_element_link_pads (sink->rtpbin, name, bond->rtp_sink, "sink");
268 
269   g_snprintf (name, 32, "recv_rtcp_sink_%u", bond->session);
270   gst_element_link_pads (bond->rtcp_src, "src", sink->rtpbin, name);
271 
272   g_snprintf (name, 32, "send_rtcp_src_%u", bond->session);
273   gst_element_link_pads (sink->rtpbin, name, bond->rtcp_sink, "sink");
274 
275   g_ptr_array_add (sink->bonds, bond);
276   return bond;
277 }
278 
279 static GstCaps *
gst_rist_sink_request_pt_map(GstRistSrc * sink,guint session_id,guint pt,GstElement * rtpbin)280 gst_rist_sink_request_pt_map (GstRistSrc * sink, guint session_id, guint pt,
281     GstElement * rtpbin)
282 {
283   const GstRTPPayloadInfo *pt_info;
284   GstCaps *ret;
285 
286   pt_info = gst_rtp_payload_info_for_pt (pt);
287   if (!pt_info || !pt_info->clock_rate)
288     return NULL;
289 
290   ret = gst_caps_new_simple ("application/x-rtp",
291       "media", G_TYPE_STRING, pt_info->media,
292       "encoding_name", G_TYPE_STRING, pt_info->encoding_name,
293       "clock-rate", G_TYPE_INT, (gint) pt_info->clock_rate, NULL);
294 
295   /* FIXME add sprop-parameter-set if any */
296   g_warn_if_fail (pt_info->encoding_parameters == NULL);
297 
298   return ret;
299 }
300 
301 static GstElement *
gst_rist_sink_request_aux_sender(GstRistSink * sink,guint session_id,GstElement * rtpbin)302 gst_rist_sink_request_aux_sender (GstRistSink * sink, guint session_id,
303     GstElement * rtpbin)
304 {
305   return gst_object_ref (sink->rtxbin);
306 }
307 
308 static void
on_receiving_rtcp(GObject * session,GstBuffer * buffer,GstRistSink * sink)309 on_receiving_rtcp (GObject * session, GstBuffer * buffer, GstRistSink * sink)
310 {
311   RistSenderBond *bond = NULL;
312   GstRTCPBuffer rtcp = GST_RTCP_BUFFER_INIT;
313 
314   if (gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp)) {
315     GstRTCPPacket packet;
316 
317     if (gst_rtcp_buffer_get_first_packet (&rtcp, &packet)) {
318       /* Always skip the first one as it's never a FB or APP packet */
319 
320       while (gst_rtcp_packet_move_to_next (&packet)) {
321         guint32 ssrc;
322 
323         switch (gst_rtcp_packet_get_type (&packet)) {
324           case GST_RTCP_TYPE_APP:
325             if (memcmp (gst_rtcp_packet_app_get_name (&packet), "RIST", 4) == 0)
326               ssrc = gst_rtcp_packet_app_get_ssrc (&packet);
327             else
328               continue;
329             break;
330           case GST_RTCP_TYPE_RTPFB:
331             if (gst_rtcp_packet_fb_get_type (&packet) ==
332                 GST_RTCP_RTPFB_TYPE_NACK)
333               ssrc = gst_rtcp_packet_fb_get_media_ssrc (&packet);
334             else
335               continue;
336             break;
337           default:
338             continue;
339         }
340 
341         /* The SSRC could be that of the original data or the
342          * retransmission. So for the last bit to 0.
343          */
344         ssrc &= 0xFFFFFFFE;
345 
346         if (bond == NULL) {
347           guint session_id =
348               GPOINTER_TO_UINT (g_object_get_qdata (session, session_id_quark));
349 
350           bond = g_ptr_array_index (sink->bonds, session_id);
351           if (bond == NULL) {
352             g_critical ("Can't find session id %u", session_id);
353             goto done;
354           }
355         }
356 
357         gst_rist_rtx_send_clear_extseqnum (GST_RIST_RTX_SEND (bond->rtx_send),
358             ssrc);
359       }
360     }
361   done:
362     gst_rtcp_buffer_unmap (&rtcp);
363   }
364 }
365 
366 static void
on_app_rtcp(GObject * session,guint32 subtype,guint32 ssrc,const gchar * name,GstBuffer * data,GstRistSink * sink)367 on_app_rtcp (GObject * session, guint32 subtype, guint32 ssrc,
368     const gchar * name, GstBuffer * data, GstRistSink * sink)
369 {
370   if (g_str_equal (name, "RIST")) {
371     guint session_id =
372         GPOINTER_TO_UINT (g_object_get_qdata (session, session_id_quark));
373 
374     if (subtype == 0) {
375       GstEvent *event;
376       GstPad *send_rtp_sink;
377       GstMapInfo map;
378       gint i;
379       GstElement *gstsession;
380 
381       g_signal_emit_by_name (sink->rtpbin, "get-session", session_id,
382           &gstsession);
383 
384       send_rtp_sink = gst_element_get_static_pad (gstsession, "send_rtp_sink");
385       if (send_rtp_sink) {
386         gst_buffer_map (data, &map, GST_MAP_READ);
387 
388         for (i = 0; i < map.size; i += sizeof (guint32)) {
389           guint32 dword = GST_READ_UINT32_BE (map.data + i);
390           guint16 seqnum = dword >> 16;
391           guint16 num = dword & 0x0000FFFF;
392           guint16 j;
393 
394           GST_DEBUG ("got RIST nack packet, #%u %u", seqnum, num);
395 
396           /* num is inclusive, i.e. it can be 0, which means exactly 1 seqnum */
397           for (j = 0; j <= num; j++) {
398             event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
399                 gst_structure_new ("GstRTPRetransmissionRequest",
400                     "seqnum", G_TYPE_UINT, (guint) seqnum + j,
401                     "ssrc", G_TYPE_UINT, (guint) ssrc, NULL));
402             gst_pad_push_event (send_rtp_sink, event);
403           }
404         }
405 
406         gst_buffer_unmap (data, &map);
407         gst_object_unref (send_rtp_sink);
408       }
409     } else if (subtype == 1) {
410       GstMapInfo map;
411       RistSenderBond *bond;
412       guint16 seqnum_ext;
413 
414       bond = g_ptr_array_index (sink->bonds, session_id);
415 
416       if (gst_buffer_get_size (data) < 4) {
417         if (bond)
418           gst_rist_rtx_send_clear_extseqnum (GST_RIST_RTX_SEND (bond->rtx_send),
419               ssrc);
420 
421         GST_WARNING_OBJECT (sink, "RIST APP RTCP packet is too small,"
422             " it's %zu bytes, less than the expected 4 bytes",
423             gst_buffer_get_size (data));
424         return;
425       }
426 
427       gst_buffer_map (data, &map, GST_MAP_READ);
428       seqnum_ext = GST_READ_UINT16_BE (map.data);
429       gst_buffer_unmap (data, &map);
430 
431       if (bond)
432         gst_rist_rtx_send_set_extseqnum (GST_RIST_RTX_SEND (bond->rtx_send),
433             ssrc, seqnum_ext);
434     }
435   }
436 }
437 
438 static void
gst_rist_sink_on_new_sender_ssrc(GstRistSink * sink,guint session_id,guint ssrc,GstElement * rtpbin)439 gst_rist_sink_on_new_sender_ssrc (GstRistSink * sink, guint session_id,
440     guint ssrc, GstElement * rtpbin)
441 {
442   GObject *gstsession = NULL;
443   GObject *session = NULL;
444   GObject *source = NULL;
445 
446   if (session_id != 0)
447     return;
448 
449   g_signal_emit_by_name (rtpbin, "get-session", session_id, &gstsession);
450   g_signal_emit_by_name (rtpbin, "get-internal-session", session_id, &session);
451   g_signal_emit_by_name (session, "get-source-by-ssrc", ssrc, &source);
452   g_object_set_qdata (session, session_id_quark, GUINT_TO_POINTER (session_id));
453 
454   if (ssrc & 1) {
455     g_object_set (source, "disable-rtcp", TRUE, NULL);
456   } else {
457     g_signal_connect_object (session, "on-app-rtcp",
458         (GCallback) on_app_rtcp, sink, 0);
459     g_signal_connect_object (session, "on-receiving-rtcp",
460         (GCallback) on_receiving_rtcp, sink, 0);
461   }
462 
463   g_object_unref (source);
464   g_object_unref (session);
465 }
466 
467 static void
gst_rist_sink_on_new_receiver_ssrc(GstRistSink * sink,guint session_id,guint ssrc,GstElement * rtpbin)468 gst_rist_sink_on_new_receiver_ssrc (GstRistSink * sink, guint session_id,
469     guint ssrc, GstElement * rtpbin)
470 {
471   RistSenderBond *bond;
472 
473   if (session_id != 0)
474     return;
475 
476   GST_INFO_OBJECT (sink, "Got RTCP remote SSRC %u", ssrc);
477   bond = g_ptr_array_index (sink->bonds, session_id);
478   bond->rtcp_ssrc = ssrc;
479 }
480 
481 static GstPadProbeReturn
gst_rist_sink_fix_collision(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)482 gst_rist_sink_fix_collision (GstPad * pad, GstPadProbeInfo * info,
483     gpointer user_data)
484 {
485   GstEvent *event = info->data;
486   const GstStructure *cs;
487   GstStructure *s;
488   guint ssrc;
489 
490   /* We simply ignore collisions */
491   if (GST_EVENT_TYPE (event) != GST_EVENT_CUSTOM_UPSTREAM)
492     return GST_PAD_PROBE_OK;
493 
494   cs = gst_event_get_structure (event);
495   if (!gst_structure_has_name (cs, "GstRTPCollision"))
496     return GST_PAD_PROBE_OK;
497 
498   gst_structure_get_uint (cs, "suggested-ssrc", &ssrc);
499   if ((ssrc & 1) == 0)
500     return GST_PAD_PROBE_OK;
501 
502   event = info->data = gst_event_make_writable (event);
503   /* we can drop the const qualifier as we ensured writability */
504   s = (GstStructure *) gst_event_get_structure (event);
505   gst_structure_set (s, "suggested-ssrc", G_TYPE_UINT, ssrc - 1, NULL);
506 
507   return GST_PAD_PROBE_OK;
508 }
509 
510 static gboolean
gst_rist_sink_set_caps(GstRistSink * sink,GstCaps * caps)511 gst_rist_sink_set_caps (GstRistSink * sink, GstCaps * caps)
512 {
513   const GstStructure *s = gst_caps_get_structure (caps, 0);
514 
515   if (!gst_structure_get_uint (s, "ssrc", &sink->rtp_ssrc)) {
516     GST_ELEMENT_ERROR (sink, CORE, NEGOTIATION, ("No 'ssrc' field in caps."),
517         (NULL));
518     return FALSE;
519   }
520 
521   if (sink->rtp_ssrc & 1) {
522     GST_ELEMENT_ERROR (sink, CORE, NEGOTIATION,
523         ("Invalid RIST SSRC, LSB must be zero."), (NULL));
524     return FALSE;
525   }
526 
527   return TRUE;
528 }
529 
530 static gboolean
gst_rist_sink_event(GstPad * pad,GstObject * parent,GstEvent * event)531 gst_rist_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
532 {
533   GstRistSink *sink = GST_RIST_SINK (parent);
534   GstCaps *caps;
535   gboolean ret = TRUE;
536 
537   switch (GST_EVENT_TYPE (event)) {
538     case GST_EVENT_CAPS:
539       gst_event_parse_caps (event, &caps);
540       ret = gst_rist_sink_set_caps (sink, caps);
541       break;
542     default:
543       break;
544   }
545 
546   if (ret)
547     ret = gst_pad_event_default (pad, parent, event);
548   else
549     gst_event_unref (event);
550 
551   return ret;
552 }
553 
554 static void
gst_rist_sink_init(GstRistSink * sink)555 gst_rist_sink_init (GstRistSink * sink)
556 {
557   GstPad *ssrc_filter_sinkpad, *rtxbin_gpad;
558   GstCaps *ssrc_caps;
559   GstStructure *sdes = NULL;
560   RistSenderBond *bond;
561 
562   sink->rtpext = gst_element_factory_make ("ristrtpext", "ristrtpext");
563 
564   g_mutex_init (&sink->bonds_lock);
565   sink->bonds = g_ptr_array_new ();
566 
567   /* Construct the RIST RTP sender pipeline.
568    *
569    * capsfilter*-> [send_rtp_sink_%u]   --------  [send_rtp_src_%u]  -> udpsink
570    *                                   | rtpbin |
571    * udpsrc     -> [recv_rtcp_sink_%u]  --------  [send_rtcp_src_%u] -> * udpsink
572    *
573    * * To select RIST compatible SSRC
574    */
575   sink->rtpbin = gst_element_factory_make ("rtpbin", "rist_send_rtpbin");
576   if (!sink->rtpbin) {
577     sink->missing_plugin = "rtpmanager";
578     goto missing_plugin;
579   }
580 
581   /* RIST specification says the SDES should only contain the CNAME */
582   g_object_get (sink->rtpbin, "sdes", &sdes, NULL);
583   gst_structure_remove_field (sdes, "tool");
584 
585   gst_bin_add (GST_BIN (sink), sink->rtpbin);
586   g_object_set (sink->rtpbin, "do-retransmission", TRUE,
587       "rtp-profile", 3 /* AVFP */ ,
588       "sdes", sdes, NULL);
589   gst_structure_free (sdes);
590 
591   g_signal_connect_object (sink->rtpbin, "request-pt-map",
592       G_CALLBACK (gst_rist_sink_request_pt_map), sink, G_CONNECT_SWAPPED);
593   g_signal_connect_object (sink->rtpbin, "request-aux-sender",
594       G_CALLBACK (gst_rist_sink_request_aux_sender), sink, G_CONNECT_SWAPPED);
595   g_signal_connect_object (sink->rtpbin, "on-new-sender-ssrc",
596       G_CALLBACK (gst_rist_sink_on_new_sender_ssrc), sink, G_CONNECT_SWAPPED);
597   g_signal_connect_object (sink->rtpbin, "on-new-ssrc",
598       G_CALLBACK (gst_rist_sink_on_new_receiver_ssrc), sink, G_CONNECT_SWAPPED);
599 
600   sink->rtxbin = gst_bin_new ("rist_send_rtxbin");
601   g_object_ref_sink (sink->rtxbin);
602 
603   rtxbin_gpad = gst_ghost_pad_new_no_target ("sink_0", GST_PAD_SINK);
604   gst_element_add_pad (sink->rtxbin, rtxbin_gpad);
605 
606   sink->ssrc_filter = gst_element_factory_make ("capsfilter",
607       "rist_ssrc_filter");
608   gst_bin_add (GST_BIN (sink), sink->ssrc_filter);
609 
610   /* RIST RTP SSRC should have LSB set to 0 */
611   sink->rtp_ssrc = g_random_int () & ~1;
612   ssrc_caps = gst_caps_new_simple ("application/x-rtp",
613       "ssrc", G_TYPE_UINT, sink->rtp_ssrc, NULL);
614   gst_caps_append_structure (ssrc_caps,
615       gst_structure_new_empty ("application/x-rtp"));
616   g_object_set (sink->ssrc_filter, "caps", ssrc_caps, NULL);
617   gst_caps_unref (ssrc_caps);
618 
619   ssrc_filter_sinkpad = gst_element_get_static_pad (sink->ssrc_filter, "sink");
620   sink->sinkpad = gst_ghost_pad_new_from_template ("sink", ssrc_filter_sinkpad,
621       gst_static_pad_template_get (&sink_templ));
622   gst_pad_set_event_function (sink->sinkpad, gst_rist_sink_event);
623   gst_element_add_pad (GST_ELEMENT (sink), sink->sinkpad);
624   gst_object_unref (ssrc_filter_sinkpad);
625 
626   gst_pad_add_probe (sink->sinkpad, GST_PAD_PROBE_TYPE_EVENT_UPSTREAM,
627       gst_rist_sink_fix_collision, sink, NULL);
628 
629   bond = gst_rist_sink_add_bond (sink);
630   if (!bond)
631     goto missing_plugin;
632 
633   return;
634 
635 missing_plugin:
636   {
637     GST_ERROR_OBJECT (sink, "'%s' plugin is missing.", sink->missing_plugin);
638     sink->construct_failed = TRUE;
639     /* Just make our element valid, so we fail cleanly */
640     gst_element_add_pad (GST_ELEMENT (sink),
641         gst_pad_new_from_static_template (&sink_templ, "sink"));
642   }
643 }
644 
645 static gboolean
gst_rist_sink_setup_rtcp_socket(GstRistSink * sink,RistSenderBond * bond)646 gst_rist_sink_setup_rtcp_socket (GstRistSink * sink, RistSenderBond * bond)
647 {
648   GSocket *socket = NULL;
649   GInetAddress *iaddr = NULL;
650   gchar *remote_addr = NULL;
651   guint port = bond->port + 1;
652   GError *error = NULL;
653 
654   iaddr = g_inet_address_new_from_string (bond->address);
655   if (!iaddr) {
656     GList *results;
657     GResolver *resolver = NULL;
658 
659     resolver = g_resolver_get_default ();
660     results = g_resolver_lookup_by_name (resolver, bond->address, NULL, &error);
661 
662     if (!results) {
663       g_object_unref (resolver);
664       goto dns_resolve_failed;
665     }
666 
667     iaddr = G_INET_ADDRESS (g_object_ref (results->data));
668 
669     g_resolver_free_addresses (results);
670     g_object_unref (resolver);
671   }
672   remote_addr = g_inet_address_to_string (iaddr);
673 
674   if (g_inet_address_get_is_multicast (iaddr)) {
675     g_object_set (bond->rtcp_src, "address", remote_addr, "port", port, NULL);
676   } else {
677     const gchar *any_addr;
678 
679     if (g_inet_address_get_family (iaddr) == G_SOCKET_FAMILY_IPV6)
680       any_addr = "::";
681     else
682       any_addr = "0.0.0.0";
683 
684     g_object_set (bond->rtcp_src, "address", any_addr, "port", 0, NULL);
685   }
686   g_free (remote_addr);
687   g_object_unref (iaddr);
688 
689   gst_element_set_locked_state (bond->rtcp_src, FALSE);
690   gst_element_sync_state_with_parent (bond->rtcp_src);
691 
692   /* share the socket created by the sink */
693   g_object_get (bond->rtcp_src, "used-socket", &socket, NULL);
694   g_object_set (bond->rtcp_sink, "socket", socket, "auto-multicast", FALSE,
695       "close-socket", FALSE, NULL);
696   g_object_unref (socket);
697 
698   g_object_set (bond->rtcp_sink, "sync", FALSE, "async", FALSE, NULL);
699   gst_element_set_locked_state (bond->rtcp_sink, FALSE);
700   gst_element_sync_state_with_parent (bond->rtcp_sink);
701 
702   return GST_STATE_CHANGE_SUCCESS;
703 
704 dns_resolve_failed:
705   GST_ELEMENT_ERROR (sink, RESOURCE, NOT_FOUND,
706       ("Could not resolve hostname '%s'", GST_STR_NULL (bond->address)),
707       ("DNS resolver reported: %s", error->message));
708   g_error_free (error);
709   return GST_STATE_CHANGE_FAILURE;
710 }
711 
712 static GstStateChangeReturn
gst_rist_sink_reuse_socket(GstRistSink * sink)713 gst_rist_sink_reuse_socket (GstRistSink * sink)
714 {
715   gint i;
716 
717   for (i = 0; i < sink->bonds->len; i++) {
718     RistSenderBond *bond = g_ptr_array_index (sink->bonds, i);
719     GObject *session = NULL;
720     GstPad *pad;
721     gchar name[32];
722 
723     g_signal_emit_by_name (sink->rtpbin, "get-session", i, &session);
724     g_object_set (session, "rtcp-min-interval", sink->min_rtcp_interval,
725         "rtcp-fraction", sink->max_rtcp_bandwidth, NULL);
726     g_object_unref (session);
727 
728     g_snprintf (name, 32, "src_%u", bond->session);
729     pad = gst_element_request_pad_simple (sink->dispatcher, name);
730     gst_element_link_pads (sink->dispatcher, name, bond->rtx_queue, "sink");
731     gst_object_unref (pad);
732 
733     if (!gst_rist_sink_setup_rtcp_socket (sink, bond))
734       return GST_STATE_CHANGE_FAILURE;
735   }
736 
737   return GST_STATE_CHANGE_SUCCESS;
738 }
739 
740 static GstStateChangeReturn
gst_rist_sink_start(GstRistSink * sink)741 gst_rist_sink_start (GstRistSink * sink)
742 {
743   GstPad *rtxbin_gpad, *rtpext_sinkpad;
744 
745   /* Unless a custom dispatcher was provided, use the specified bonding method
746    * to create one */
747   if (!sink->dispatcher) {
748     switch (sink->bonding_method) {
749       case GST_RIST_BONDING_METHOD_BROADCAST:
750         sink->dispatcher = gst_element_factory_make ("tee", "rist_dispatcher");
751         if (!sink->dispatcher) {
752           sink->missing_plugin = "coreelements";
753           sink->construct_failed = TRUE;
754         }
755         break;
756       case GST_RIST_BONDING_METHOD_ROUND_ROBIN:
757         sink->dispatcher = gst_element_factory_make ("roundrobin",
758             "rist_dispatcher");
759         g_assert (sink->dispatcher);
760         break;
761     }
762   }
763 
764   if (sink->construct_failed) {
765     GST_ELEMENT_ERROR (sink, CORE, MISSING_PLUGIN,
766         ("Your GStreamer installation is missing plugin '%s'",
767             sink->missing_plugin), (NULL));
768     return GST_STATE_CHANGE_FAILURE;
769   }
770 
771   gst_bin_add (GST_BIN (sink), sink->rtpext);
772   rtxbin_gpad = gst_element_get_static_pad (sink->rtxbin, "sink_0");
773   rtpext_sinkpad = gst_element_get_static_pad (sink->rtpext, "sink");
774   gst_ghost_pad_set_target (GST_GHOST_PAD (rtxbin_gpad), rtpext_sinkpad);
775   gst_object_unref (rtpext_sinkpad);
776 
777   gst_bin_add (GST_BIN (sink->rtxbin), sink->dispatcher);
778   gst_element_link (sink->rtpext, sink->dispatcher);
779 
780   return GST_STATE_CHANGE_SUCCESS;
781 }
782 
783 
784 static GstStructure *
gst_rist_sink_create_stats(GstRistSink * sink)785 gst_rist_sink_create_stats (GstRistSink * sink)
786 {
787   RistSenderBond *bond;
788   GstStructure *ret;
789   GValueArray *session_stats;
790   guint64 total_pkt_sent = 0, total_rtx_sent = 0;
791   gint i;
792 
793   ret = gst_structure_new_empty ("rist/x-sender-stats");
794   session_stats = g_value_array_new (sink->bonds->len);
795 
796   for (i = 0; i < sink->bonds->len; i++) {
797     GObject *session = NULL, *source = NULL;
798     GstStructure *sstats = NULL, *stats;
799     guint64 pkt_sent = 0, rtx_sent = 0, rtt;
800     guint rb_rtt = 0;
801     GValue value = G_VALUE_INIT;
802 
803     g_signal_emit_by_name (sink->rtpbin, "get-internal-session", i, &session);
804     if (!session)
805       continue;
806 
807     stats = gst_structure_new_empty ("rist/x-sender-session-stats");
808     bond = g_ptr_array_index (sink->bonds, i);
809 
810     g_signal_emit_by_name (session, "get-source-by-ssrc", sink->rtp_ssrc,
811         &source);
812     if (source) {
813       g_object_get (source, "stats", &sstats, NULL);
814       gst_structure_get_uint64 (sstats, "packets-sent", &pkt_sent);
815       gst_structure_free (sstats);
816       g_clear_object (&source);
817     }
818 
819     g_signal_emit_by_name (session, "get-source-by-ssrc", bond->rtcp_ssrc,
820         &source);
821     if (source) {
822       g_object_get (source, "stats", &sstats, NULL);
823       gst_structure_get_uint (sstats, "rb-round-trip", &rb_rtt);
824       gst_structure_free (sstats);
825       g_clear_object (&source);
826     }
827     g_object_unref (session);
828 
829     g_object_get (bond->rtx_send, "num-rtx-packets", &rtx_sent, NULL);
830 
831     /* rb_rtt is in Q16 in NTP time */
832     rtt = gst_util_uint64_scale (rb_rtt, GST_SECOND, 65536);
833 
834     gst_structure_set (stats, "session-id", G_TYPE_INT, i,
835         "sent-original-packets", G_TYPE_UINT64, pkt_sent,
836         "sent-retransmitted-packets", G_TYPE_UINT64, rtx_sent,
837         "round-trip-time", G_TYPE_UINT64, rtt, NULL);
838 
839     g_value_init (&value, GST_TYPE_STRUCTURE);
840     g_value_take_boxed (&value, stats);
841     g_value_array_append (session_stats, &value);
842     g_value_unset (&value);
843 
844     total_pkt_sent += pkt_sent;
845     total_rtx_sent += rtx_sent;
846   }
847 
848   gst_structure_set (ret,
849       "sent-original-packets", G_TYPE_UINT64, total_pkt_sent,
850       "sent-retransmitted-packets", G_TYPE_UINT64, total_rtx_sent,
851       "session-stats", G_TYPE_VALUE_ARRAY, session_stats, NULL);
852   g_value_array_free (session_stats);
853 
854   return ret;
855 }
856 
857 static gboolean
gst_rist_sink_dump_stats(GstClock * clock,GstClockTime time,GstClockID id,gpointer user_data)858 gst_rist_sink_dump_stats (GstClock * clock, GstClockTime time, GstClockID id,
859     gpointer user_data)
860 {
861   GstRistSink *sink = GST_RIST_SINK (user_data);
862   GstStructure *stats = gst_rist_sink_create_stats (sink);
863 
864   gst_println ("%s: %" GST_PTR_FORMAT, GST_OBJECT_NAME (sink), stats);
865 
866   gst_structure_free (stats);
867   return TRUE;
868 }
869 
870 static void
gst_rist_sink_enable_stats_interval(GstRistSink * sink)871 gst_rist_sink_enable_stats_interval (GstRistSink * sink)
872 {
873   GstClock *clock;
874   GstClockTime start, interval;
875 
876   if (sink->stats_interval == 0)
877     return;
878 
879   interval = sink->stats_interval * GST_MSECOND;
880   clock = gst_system_clock_obtain ();
881   start = gst_clock_get_time (clock) + interval;
882 
883   sink->stats_cid = gst_clock_new_periodic_id (clock, start, interval);
884   gst_clock_id_wait_async (sink->stats_cid, gst_rist_sink_dump_stats,
885       gst_object_ref (sink), (GDestroyNotify) gst_object_unref);
886 
887   gst_object_unref (clock);
888 }
889 
890 static void
gst_rist_sink_disable_stats_interval(GstRistSink * sink)891 gst_rist_sink_disable_stats_interval (GstRistSink * sink)
892 {
893   if (sink->stats_cid) {
894     gst_clock_id_unschedule (sink->stats_cid);
895     gst_clock_id_unref (sink->stats_cid);
896     sink->stats_cid = NULL;
897   }
898 }
899 
900 static GstStateChangeReturn
gst_rist_sink_change_state(GstElement * element,GstStateChange transition)901 gst_rist_sink_change_state (GstElement * element, GstStateChange transition)
902 {
903   GstRistSink *sink = GST_RIST_SINK (element);
904   GstStateChangeReturn ret;
905 
906   switch (transition) {
907     case GST_STATE_CHANGE_NULL_TO_READY:
908       /* Set the properties to the child elements to avoid binding to
909        * a NULL interface on a network without a default gateway */
910       if (gst_rist_sink_start (sink) == GST_STATE_CHANGE_FAILURE)
911         return GST_STATE_CHANGE_FAILURE;
912     case GST_STATE_CHANGE_PAUSED_TO_READY:
913       gst_rist_sink_disable_stats_interval (sink);
914       break;
915     default:
916       break;
917   }
918 
919   ret = GST_ELEMENT_CLASS (gst_rist_sink_parent_class)->change_state (element,
920       transition);
921 
922   switch (transition) {
923     case GST_STATE_CHANGE_NULL_TO_READY:
924       ret = gst_rist_sink_reuse_socket (sink);
925       break;
926     case GST_STATE_CHANGE_READY_TO_PAUSED:
927       gst_rist_sink_enable_stats_interval (sink);
928       break;
929     default:
930       break;
931   }
932 
933   return ret;
934 }
935 
936 /* called with bonds lock */
937 static void
gst_rist_sink_update_bond_address(GstRistSink * sink,RistSenderBond * bond,const gchar * address,guint port,const gchar * multicast_iface)938 gst_rist_sink_update_bond_address (GstRistSink * sink, RistSenderBond * bond,
939     const gchar * address, guint port, const gchar * multicast_iface)
940 {
941   g_free (bond->address);
942   g_free (bond->multicast_iface);
943   bond->address = g_strdup (address);
944   bond->multicast_iface = multicast_iface ? g_strdup (multicast_iface) : NULL;
945   bond->port = port;
946 
947   g_object_set (G_OBJECT (bond->rtp_sink), "host", address, "port", port,
948       "multicast-iface", bond->multicast_iface, NULL);
949   g_object_set (G_OBJECT (bond->rtcp_sink), "host", address,
950       "port", port + 1, "multicast-iface", bond->multicast_iface, NULL);
951 
952   /* TODO add runtime support
953    *  - add blocking the pad probe
954    *  - update RTCP socket
955    *  - cycle elements through NULL state
956    */
957 }
958 
959 /* called with bonds lock */
960 static gchar *
gst_rist_sink_get_bonds(GstRistSink * sink)961 gst_rist_sink_get_bonds (GstRistSink * sink)
962 {
963   GString *bonds = g_string_new ("");
964   gint i;
965 
966   for (i = 0; i < sink->bonds->len; i++) {
967     RistSenderBond *bond = g_ptr_array_index (sink->bonds, i);
968     if (bonds->len > 0)
969       g_string_append_c (bonds, ':');
970 
971     g_string_append_printf (bonds, "%s:%u", bond->address, bond->port);
972 
973     if (bond->multicast_iface)
974       g_string_append_printf (bonds, "/%s", bond->multicast_iface);
975   }
976 
977   return g_string_free (bonds, FALSE);
978 }
979 
980 struct RistAddress
981 {
982   gchar *address;
983   char *multicast_iface;
984   guint port;
985 };
986 
987 /* called with bonds lock */
988 static void
gst_rist_sink_set_bonds(GstRistSink * sink,const gchar * bonds)989 gst_rist_sink_set_bonds (GstRistSink * sink, const gchar * bonds)
990 {
991   GStrv tokens = NULL;
992   struct RistAddress *addrs;
993   gint i;
994 
995   if (bonds == NULL)
996     goto missing_address;
997 
998   tokens = g_strsplit (bonds, ",", 0);
999   if (tokens[0] == NULL)
1000     goto missing_address;
1001 
1002   addrs = g_new0 (struct RistAddress, g_strv_length (tokens));
1003 
1004   /* parse the address list */
1005   for (i = 0; tokens[i]; i++) {
1006     gchar *address = tokens[i];
1007     char *port_ptr, *iface_ptr, *endptr;
1008     guint port;
1009 
1010     port_ptr = g_utf8_strrchr (address, -1, ':');
1011     iface_ptr = g_utf8_strrchr (address, -1, '/');
1012 
1013     if (!port_ptr)
1014       goto bad_parameter;
1015     if (!g_ascii_isdigit (port_ptr[1]))
1016       goto bad_parameter;
1017 
1018     if (iface_ptr) {
1019       if (iface_ptr < port_ptr)
1020         goto bad_parameter;
1021       iface_ptr[0] = '\0';
1022     }
1023 
1024     port = strtol (port_ptr + 1, &endptr, 0);
1025     if (endptr[0] != '\0')
1026       goto bad_parameter;
1027 
1028     /* port must be a multiple of 2 between 2 and 65534 */
1029     if (port < 2 || (port & 1) || port > G_MAXUINT16)
1030       goto invalid_port;
1031 
1032     port_ptr[0] = '\0';
1033     addrs[i].port = port;
1034     addrs[i].address = g_strstrip (address);
1035     if (iface_ptr)
1036       addrs[i].multicast_iface = g_strstrip (iface_ptr + 1);
1037   }
1038 
1039   /* configure the bonds */
1040   for (i = 0; tokens[i]; i++) {
1041     RistSenderBond *bond = NULL;
1042 
1043     if (i < sink->bonds->len)
1044       bond = g_ptr_array_index (sink->bonds, i);
1045     else
1046       bond = gst_rist_sink_add_bond (sink);
1047 
1048     gst_rist_sink_update_bond_address (sink, bond, addrs[i].address,
1049         addrs[i].port, addrs[i].multicast_iface);
1050   }
1051 
1052   g_strfreev (tokens);
1053   return;
1054 
1055 missing_address:
1056   g_warning ("'bonding-addresses' cannot be empty");
1057   g_strfreev (tokens);
1058   return;
1059 
1060 bad_parameter:
1061   g_warning ("Failed to parse address '%s", tokens[i]);
1062   g_strfreev (tokens);
1063   g_free (addrs);
1064   return;
1065 
1066 invalid_port:
1067   g_warning ("RIST port must valid UDP port and a multiple of 2.");
1068   g_strfreev (tokens);
1069   g_free (addrs);
1070   return;
1071 }
1072 
1073 static void
gst_rist_sink_set_multicast_loopback(GstRistSink * sink,gboolean loop)1074 gst_rist_sink_set_multicast_loopback (GstRistSink * sink, gboolean loop)
1075 {
1076   gint i;
1077 
1078   sink->multicast_loopback = loop;
1079   for (i = 0; i < sink->bonds->len; i++) {
1080     RistSenderBond *bond = g_ptr_array_index (sink->bonds, i);
1081     g_object_set (G_OBJECT (bond->rtp_sink), "loop", loop, NULL);
1082     g_object_set (G_OBJECT (bond->rtcp_sink), "loop", loop, NULL);
1083   }
1084 }
1085 
1086 /* called with bonds lock */
1087 static void
gst_rist_sink_set_multicast_ttl(GstRistSink * sink,gint ttl)1088 gst_rist_sink_set_multicast_ttl (GstRistSink * sink, gint ttl)
1089 {
1090   gint i;
1091 
1092   sink->multicast_ttl = ttl;
1093   for (i = 0; i < sink->bonds->len; i++) {
1094     RistSenderBond *bond = g_ptr_array_index (sink->bonds, i);
1095     g_object_set (G_OBJECT (bond->rtp_sink), "ttl-mc", ttl, NULL);
1096     g_object_set (G_OBJECT (bond->rtcp_sink), "ttl-mc", ttl, NULL);
1097   }
1098 }
1099 
1100 static void
gst_rist_sink_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)1101 gst_rist_sink_get_property (GObject * object, guint prop_id,
1102     GValue * value, GParamSpec * pspec)
1103 {
1104   GstRistSink *sink = GST_RIST_SINK (object);
1105   GstStructure *sdes;
1106   RistSenderBond *bond;
1107 
1108   if (sink->construct_failed)
1109     return;
1110 
1111   g_mutex_lock (&sink->bonds_lock);
1112 
1113   bond = g_ptr_array_index (sink->bonds, 0);
1114 
1115   switch (prop_id) {
1116     case PROP_ADDRESS:
1117       g_value_set_string (value, bond->address);
1118       break;
1119 
1120     case PROP_PORT:
1121       g_value_set_uint (value, bond->port);
1122       break;
1123 
1124     case PROP_SENDER_BUFFER:
1125       g_object_get_property (G_OBJECT (bond->rtx_send), "max-size-time", value);
1126       break;
1127 
1128     case PROP_MIN_RTCP_INTERVAL:
1129       g_value_set_uint (value, (guint) (sink->min_rtcp_interval / GST_MSECOND));
1130       break;
1131 
1132     case PROP_MAX_RTCP_BANDWIDTH:
1133       g_value_set_double (value, sink->max_rtcp_bandwidth);
1134       break;
1135 
1136     case PROP_STATS_UPDATE_INTERVAL:
1137       g_value_set_uint (value, sink->stats_interval);
1138       break;
1139 
1140     case PROP_STATS:
1141       g_value_take_boxed (value, gst_rist_sink_create_stats (sink));
1142       break;
1143 
1144     case PROP_CNAME:
1145       g_object_get (sink->rtpbin, "sdes", &sdes, NULL);
1146       g_value_set_string (value, gst_structure_get_string (sdes, "cname"));
1147       gst_structure_free (sdes);
1148       break;
1149 
1150     case PROP_MULTICAST_LOOPBACK:
1151       g_value_set_boolean (value, sink->multicast_loopback);
1152       break;
1153 
1154     case PROP_MULTICAST_IFACE:
1155       g_value_set_string (value, bond->multicast_iface);
1156       break;
1157 
1158     case PROP_MULTICAST_TTL:
1159       g_value_set_int (value, sink->multicast_ttl);
1160       break;
1161 
1162     case PROP_BONDING_ADDRESSES:
1163       g_value_take_string (value, gst_rist_sink_get_bonds (sink));
1164       break;
1165 
1166     case PROP_BONDING_METHOD:
1167       g_value_set_enum (value, sink->bonding_method);
1168       break;
1169 
1170     case PROP_DISPATCHER:
1171       g_value_set_object (value, sink->dispatcher);
1172       break;
1173 
1174     case PROP_DROP_NULL_TS_PACKETS:
1175       g_object_get_property (G_OBJECT (sink->rtpext), "drop-null-ts-packets",
1176           value);
1177       break;
1178 
1179     case PROP_SEQUENCE_NUMBER_EXTENSION:
1180       g_object_get_property (G_OBJECT (sink->rtpext),
1181           "sequence-number-extension", value);
1182       break;
1183 
1184     default:
1185       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1186       break;
1187   }
1188 
1189   g_mutex_unlock (&sink->bonds_lock);
1190 }
1191 
1192 static void
gst_rist_sink_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)1193 gst_rist_sink_set_property (GObject * object, guint prop_id,
1194     const GValue * value, GParamSpec * pspec)
1195 {
1196   GstRistSink *sink = GST_RIST_SINK (object);
1197   GstStructure *sdes;
1198   RistSenderBond *bond;
1199 
1200   if (sink->construct_failed)
1201     return;
1202 
1203   g_mutex_lock (&sink->bonds_lock);
1204 
1205   bond = g_ptr_array_index (sink->bonds, 0);
1206 
1207   switch (prop_id) {
1208     case PROP_ADDRESS:
1209       g_free (bond->address);
1210       bond->address = g_value_dup_string (value);
1211       g_object_set_property (G_OBJECT (bond->rtp_sink), "host", value);
1212       g_object_set_property (G_OBJECT (bond->rtcp_sink), "host", value);
1213       break;
1214 
1215     case PROP_PORT:{
1216       guint port = g_value_get_uint (value);
1217 
1218       /* According to 5.1.1, RTCP receiver port most be event number and RTCP
1219        * port should be the RTP port + 1 */
1220 
1221       if (port & 0x1) {
1222         g_warning ("Invalid RIST port %u, should be an even number.", port);
1223         return;
1224       }
1225 
1226       bond->port = port;
1227       g_object_set (bond->rtp_sink, "port", port, NULL);
1228       g_object_set (bond->rtcp_sink, "port", port + 1, NULL);
1229       break;
1230     }
1231 
1232     case PROP_SENDER_BUFFER:
1233       g_object_set (bond->rtx_send,
1234           "max-size-time", g_value_get_uint (value), NULL);
1235       break;
1236 
1237     case PROP_MIN_RTCP_INTERVAL:
1238       sink->min_rtcp_interval = g_value_get_uint (value) * GST_MSECOND;
1239       break;
1240 
1241     case PROP_MAX_RTCP_BANDWIDTH:
1242       sink->max_rtcp_bandwidth = g_value_get_double (value);
1243       break;
1244 
1245     case PROP_STATS_UPDATE_INTERVAL:
1246       sink->stats_interval = g_value_get_uint (value);
1247       break;
1248 
1249     case PROP_CNAME:
1250       g_object_get (sink->rtpbin, "sdes", &sdes, NULL);
1251       gst_structure_set_value (sdes, "cname", value);
1252       g_object_set (sink->rtpbin, "sdes", sdes, NULL);
1253       gst_structure_free (sdes);
1254       break;
1255 
1256     case PROP_MULTICAST_LOOPBACK:
1257       gst_rist_sink_set_multicast_loopback (sink, g_value_get_boolean (value));
1258       break;
1259 
1260     case PROP_MULTICAST_IFACE:
1261       g_free (bond->multicast_iface);
1262       bond->multicast_iface = g_value_dup_string (value);
1263       g_object_set_property (G_OBJECT (bond->rtp_sink),
1264           "multicast-iface", value);
1265       g_object_set_property (G_OBJECT (bond->rtcp_sink),
1266           "multicast-iface", value);
1267       break;
1268 
1269     case PROP_MULTICAST_TTL:
1270       gst_rist_sink_set_multicast_ttl (sink, g_value_get_int (value));
1271       break;
1272 
1273     case PROP_BONDING_ADDRESSES:
1274       gst_rist_sink_set_bonds (sink, g_value_get_string (value));
1275       break;
1276 
1277     case PROP_BONDING_METHOD:
1278       sink->bonding_method = g_value_get_enum (value);
1279       break;
1280 
1281     case PROP_DISPATCHER:
1282       if (sink->dispatcher)
1283         g_object_unref (sink->dispatcher);
1284       sink->dispatcher = g_object_ref_sink (g_value_get_object (value));
1285       break;
1286 
1287     case PROP_DROP_NULL_TS_PACKETS:
1288       g_object_set_property (G_OBJECT (sink->rtpext), "drop-null-ts-packets",
1289           value);
1290       break;
1291 
1292     case PROP_SEQUENCE_NUMBER_EXTENSION:
1293       g_object_set_property (G_OBJECT (sink->rtpext),
1294           "sequence-number-extension", value);
1295       break;
1296 
1297     default:
1298       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1299       break;
1300   }
1301 
1302   g_mutex_unlock (&sink->bonds_lock);
1303 }
1304 
1305 static void
gst_rist_sink_finalize(GObject * object)1306 gst_rist_sink_finalize (GObject * object)
1307 {
1308   GstRistSink *sink = GST_RIST_SINK (object);
1309   gint i;
1310 
1311   g_mutex_lock (&sink->bonds_lock);
1312 
1313   for (i = 0; i < sink->bonds->len; i++) {
1314     RistSenderBond *bond = g_ptr_array_index (sink->bonds, i);
1315     g_free (bond->address);
1316     g_free (bond->multicast_iface);
1317     g_slice_free (RistSenderBond, bond);
1318   }
1319   g_ptr_array_free (sink->bonds, TRUE);
1320 
1321   g_clear_object (&sink->rtxbin);
1322 
1323   g_mutex_unlock (&sink->bonds_lock);
1324   g_mutex_clear (&sink->bonds_lock);
1325 
1326   G_OBJECT_CLASS (gst_rist_sink_parent_class)->finalize (object);
1327 }
1328 
1329 static void
gst_rist_sink_class_init(GstRistSinkClass * klass)1330 gst_rist_sink_class_init (GstRistSinkClass * klass)
1331 {
1332   GstElementClass *element_class = (GstElementClass *) klass;
1333   GObjectClass *object_class = (GObjectClass *) klass;
1334 
1335 
1336   session_id_quark = g_quark_from_static_string ("gst-rist-sink-session-id");
1337 
1338   gst_element_class_set_metadata (element_class,
1339       "RIST Sink", "Source/Network",
1340       "Sink that implements RIST TR-06-1 streaming specification",
1341       "Nicolas Dufresne <nicolas.dufresne@collabora.com");
1342   gst_element_class_add_static_pad_template (element_class, &sink_templ);
1343 
1344   element_class->change_state = gst_rist_sink_change_state;
1345 
1346   object_class->get_property = gst_rist_sink_get_property;
1347   object_class->set_property = gst_rist_sink_set_property;
1348   object_class->finalize = gst_rist_sink_finalize;
1349 
1350   g_object_class_install_property (object_class, PROP_ADDRESS,
1351       g_param_spec_string ("address", "Address",
1352           "Address to send packets to (can be IPv4 or IPv6).", "0.0.0.0",
1353           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1354 
1355   g_object_class_install_property (object_class, PROP_PORT,
1356       g_param_spec_uint ("port", "Port", "The port RTP packets will be sent, "
1357           "the RTCP port is this value + 1. This port must be an even number.",
1358           2, 65534, 5004,
1359           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
1360 
1361   g_object_class_install_property (object_class, PROP_SENDER_BUFFER,
1362       g_param_spec_uint ("sender-buffer", "Sender Buffer",
1363           "Size of the retransmission queue (in ms)", 0, G_MAXUINT, 1200,
1364           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
1365 
1366   g_object_class_install_property (object_class, PROP_MIN_RTCP_INTERVAL,
1367       g_param_spec_uint ("min-rtcp-interval", "Minimum RTCP Intercal",
1368           "The minimum interval (in ms) between two regular successive RTCP "
1369           "packets.", 0, 100, 100,
1370           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
1371 
1372   g_object_class_install_property (object_class, PROP_MAX_RTCP_BANDWIDTH,
1373       g_param_spec_double ("max-rtcp-bandwidth", "Maximum RTCP Bandwidth",
1374           "The maximum bandwidth used for RTCP as a fraction of RTP bandwdith",
1375           0.0, 0.05, 0.05,
1376           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
1377 
1378   g_object_class_install_property (object_class, PROP_STATS_UPDATE_INTERVAL,
1379       g_param_spec_uint ("stats-update-interval", "Statistics Update Interval",
1380           "The interval between 'stats' update notification (in ms) (0 disabled)",
1381           0, G_MAXUINT, 0,
1382           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
1383 
1384   g_object_class_install_property (object_class, PROP_STATS,
1385       g_param_spec_boxed ("stats", "Statistics",
1386           "Statistic in a GstStructure named 'rist/x-sender-stats'",
1387           GST_TYPE_STRUCTURE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
1388 
1389   g_object_class_install_property (object_class, PROP_CNAME,
1390       g_param_spec_string ("cname", "CName",
1391           "Set the CNAME in the SDES block of the sender report.", NULL,
1392           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
1393           GST_PARAM_DOC_SHOW_DEFAULT));
1394 
1395   g_object_class_install_property (object_class, PROP_MULTICAST_LOOPBACK,
1396       g_param_spec_boolean ("multicast-loopback", "Multicast Loopback",
1397           "When enabled, the packet will be received locally.", FALSE,
1398           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
1399 
1400   g_object_class_install_property (object_class, PROP_MULTICAST_IFACE,
1401       g_param_spec_string ("multicast-iface", "multicast-iface",
1402           "The multicast interface to use to send packets.", NULL,
1403           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1404 
1405   g_object_class_install_property (object_class, PROP_MULTICAST_TTL,
1406       g_param_spec_int ("multicast-ttl", "Multicast TTL",
1407           "The multicast time-to-live parameter.", 0, 255, 1,
1408           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
1409 
1410   g_object_class_install_property (object_class, PROP_BONDING_ADDRESSES,
1411       g_param_spec_string ("bonding-addresses", "Bonding Addresses",
1412           "Comma (,) separated list of <address>:<port> to send to. ", NULL,
1413           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1414 
1415   g_object_class_install_property (object_class, PROP_BONDING_METHOD,
1416       g_param_spec_enum ("bonding-method", "Bonding Method",
1417           "Defines the bonding method to use.",
1418           gst_rist_bonding_method_get_type (),
1419           GST_RIST_BONDING_METHOD_BROADCAST,
1420           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
1421 
1422   g_object_class_install_property (object_class, PROP_DISPATCHER,
1423       g_param_spec_object ("dispatcher", "Bonding Dispatcher",
1424           "An element that takes care of multi-plexing bonded links. When set "
1425           "\"bonding-method\" is ignored.",
1426           GST_TYPE_ELEMENT,
1427           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS |
1428           GST_PARAM_MUTABLE_READY));
1429   g_object_class_install_property (object_class, PROP_DROP_NULL_TS_PACKETS,
1430       g_param_spec_boolean ("drop-null-ts-packets", "Drop null TS packets",
1431           "Drop null MPEG-TS packet and replace them with a custom header"
1432           " extension.", FALSE,
1433           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
1434   g_object_class_install_property (object_class, PROP_SEQUENCE_NUMBER_EXTENSION,
1435       g_param_spec_boolean ("sequence-number-extension",
1436           "Sequence Number Extension",
1437           "Add sequence number extension to packets.", FALSE,
1438           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
1439 
1440   gst_type_mark_as_plugin_api (gst_rist_bonding_method_get_type (), 0);
1441 }
1442