• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer
2  * Copyright (C) <2018> Marc Leeman <marc.leeman@gmail.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19 
20 /**
21  * SECTION: gstrtpsrc
22  * @title: GstRtpSrc
23  * @short description: element with Uri interface to get RTP data from
24  * the network.
25  *
26  * RTP (RFC 3550) is a protocol to stream media over the network while
27  * retaining the timing information and providing enough information to
28  * reconstruct the correct timing domain by the receiver.
29  *
30  * The RTP data port should be even, while the RTCP port should be
31  * odd. The URI that is entered defines the data port, the RTCP port will
32  * be allocated to the next port.
33  *
34  * This element hooks up the correct sockets to support both RTP as the
35  * accompanying RTCP layer.
36  *
37  * This Bin handles taking in of data from the network and provides the
38  * RTP payloaded data.
39  *
40  * This element also implements the URI scheme `rtp://` allowing to render
41  * RTP streams in GStreamer based media players. The RTP URI handler also
42  * allows setting properties through the URI query.
43  */
44 #ifdef HAVE_CONFIG_H
45 #include <config.h>
46 #endif
47 
48 #include <stdio.h>
49 
50 #include <gst/net/net.h>
51 #include <gst/rtp/gstrtppayloads.h>
52 
53 #include "gstrtpsrc.h"
54 #include "gstrtp-utils.h"
55 
56 GST_DEBUG_CATEGORY_STATIC (gst_rtp_src_debug);
57 #define GST_CAT_DEFAULT gst_rtp_src_debug
58 
59 #define DEFAULT_PROP_TTL              64
60 #define DEFAULT_PROP_TTL_MC           1
61 #define DEFAULT_PROP_ENCODING_NAME    NULL
62 #define DEFAULT_PROP_CAPS             NULL
63 #define DEFAULT_PROP_LATENCY          200
64 
65 #define DEFAULT_PROP_ADDRESS          "0.0.0.0"
66 #define DEFAULT_PROP_PORT             5004
67 #define DEFAULT_PROP_URI              "rtp://"DEFAULT_PROP_ADDRESS":"G_STRINGIFY(DEFAULT_PROP_PORT)
68 #define DEFAULT_PROP_MULTICAST_IFACE  NULL
69 
70 enum
71 {
72   PROP_0,
73 
74   PROP_URI,
75   PROP_ADDRESS,
76   PROP_PORT,
77   PROP_TTL,
78   PROP_TTL_MC,
79   PROP_ENCODING_NAME,
80   PROP_LATENCY,
81   PROP_MULTICAST_IFACE,
82   PROP_CAPS,
83 
84   PROP_LAST
85 };
86 
87 static void gst_rtp_src_uri_handler_init (gpointer g_iface,
88     gpointer iface_data);
89 
90 #define gst_rtp_src_parent_class parent_class
91 G_DEFINE_TYPE_WITH_CODE (GstRtpSrc, gst_rtp_src, GST_TYPE_BIN,
92     G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER, gst_rtp_src_uri_handler_init);
93     GST_DEBUG_CATEGORY_INIT (gst_rtp_src_debug, "rtpsrc", 0, "RTP Source"));
94 GST_ELEMENT_REGISTER_DEFINE (rtpsrc, "rtpsrc", GST_RANK_PRIMARY + 1,
95     GST_TYPE_RTP_SRC);
96 
97 #define GST_RTP_SRC_GET_LOCK(obj) (&((GstRtpSrc*)(obj))->lock)
98 #define GST_RTP_SRC_LOCK(obj) (g_mutex_lock (GST_RTP_SRC_GET_LOCK(obj)))
99 #define GST_RTP_SRC_UNLOCK(obj) (g_mutex_unlock (GST_RTP_SRC_GET_LOCK(obj)))
100 
101 static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src_%u",
102     GST_PAD_SRC,
103     GST_PAD_SOMETIMES,
104     GST_STATIC_CAPS ("application/x-rtp"));
105 
106 static GstStateChangeReturn
107 gst_rtp_src_change_state (GstElement * element, GstStateChange transition);
108 
109 /**
110  * gst_rtp_src_rtpbin_request_pt_map_cb:
111  * @self: The current #GstRtpSrc object
112  *
113  * #GstRtpBin callback to map a pt on RTP caps.
114  *
115  * Returns: (transfer none): the guess on the RTP caps based on the PT
116  * and caps.
117  */
118 static GstCaps *
gst_rtp_src_rtpbin_request_pt_map_cb(GstElement * rtpbin,guint session_id,guint pt,gpointer data)119 gst_rtp_src_rtpbin_request_pt_map_cb (GstElement * rtpbin, guint session_id,
120     guint pt, gpointer data)
121 {
122   GstRtpSrc *self = GST_RTP_SRC (data);
123   const GstRTPPayloadInfo *p = NULL;
124 
125   GST_DEBUG_OBJECT (self,
126       "Requesting caps for session-id 0x%x and pt %u.", session_id, pt);
127 
128   if (G_UNLIKELY (self->caps)) {
129     GST_DEBUG_OBJECT (self,
130         "Full caps were set, no need for lookup %" GST_PTR_FORMAT, self->caps);
131     return gst_caps_copy (self->caps);
132   }
133 
134   /* the encoding-name has more relevant information */
135   if (self->encoding_name != NULL) {
136     /* Unfortunately, the media needs to be passed in the function. Since
137      * it is not known, try for video if video not found. */
138     p = gst_rtp_payload_info_for_name ("video", self->encoding_name);
139     if (p == NULL)
140       p = gst_rtp_payload_info_for_name ("audio", self->encoding_name);
141 
142   }
143 
144   /* If info has been found before based on the encoding-name, go with
145    * it. If not, try to look it up on with a static one. Needs to be guarded
146    * because some encoders do not use dynamic values for H.264 */
147   if (p == NULL) {
148     /* Static payload types, this is a simple lookup */
149     if (!GST_RTP_PAYLOAD_IS_DYNAMIC (pt)) {
150       p = gst_rtp_payload_info_for_pt (pt);
151     }
152   }
153 
154   if (p != NULL) {
155     GstCaps *ret = gst_caps_new_simple ("application/x-rtp",
156         "encoding-name", G_TYPE_STRING, p->encoding_name,
157         "clock-rate", G_TYPE_INT, p->clock_rate,
158         "media", G_TYPE_STRING, p->media, NULL);
159 
160     GST_DEBUG_OBJECT (self, "Decided on caps %" GST_PTR_FORMAT, ret);
161 
162     return ret;
163   }
164 
165   GST_DEBUG_OBJECT (self, "Could not determine caps based on pt and"
166       " the encoding-name was not set.");
167   return NULL;
168 }
169 
170 static void
gst_rtp_src_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)171 gst_rtp_src_set_property (GObject * object, guint prop_id,
172     const GValue * value, GParamSpec * pspec)
173 {
174   GstRtpSrc *self = GST_RTP_SRC (object);
175   GstCaps *caps;
176 
177   switch (prop_id) {
178     case PROP_URI:{
179       GstUri *uri = NULL;
180       const gchar *str_uri = g_value_get_string (value);
181 
182       GST_RTP_SRC_LOCK (object);
183       uri = gst_uri_from_string (str_uri);
184       if (uri == NULL) {
185         if (str_uri) {
186           GST_RTP_SRC_UNLOCK (object);
187           GST_ERROR_OBJECT (object, "Invalid uri: %s", str_uri);
188 
189           break;
190         }
191       }
192 
193       if (self->uri)
194         gst_uri_unref (self->uri);
195       self->uri = uri;
196 
197       if (!uri) {
198         GST_RTP_SRC_UNLOCK (object);
199 
200         break;
201       }
202 
203       /* Recursive set to self, do not use the same lock in all property
204        * setters. */
205       g_object_set (self, "address", gst_uri_get_host (self->uri), NULL);
206       g_object_set (self, "port", gst_uri_get_port (self->uri), NULL);
207       gst_rtp_utils_set_properties_from_uri_query (G_OBJECT (self), self->uri);
208       GST_RTP_SRC_UNLOCK (object);
209       break;
210     }
211     case PROP_ADDRESS:{
212       gst_uri_set_host (self->uri, g_value_get_string (value));
213       g_object_set_property (G_OBJECT (self->rtp_src), "address", value);
214       g_object_set_property (G_OBJECT (self->rtcp_src), "address", value);
215       break;
216     }
217     case PROP_PORT:{
218       guint port = g_value_get_uint (value);
219 
220       /* According to RFC 3550, 11, RTCP receiver port should be even
221        * number and RTCP port should be the RTP port + 1 */
222       if (port & 0x1)
223         GST_WARNING_OBJECT (self,
224             "Port %u is odd, this is not standard (see RFC 3550).", port);
225 
226       gst_uri_set_port (self->uri, port);
227       g_object_set (self->rtp_src, "port", port, NULL);
228       g_object_set (self->rtcp_src, "port", port + 1, NULL);
229       break;
230     }
231     case PROP_TTL:
232       self->ttl = g_value_get_int (value);
233       break;
234     case PROP_TTL_MC:
235       self->ttl_mc = g_value_get_int (value);
236       break;
237     case PROP_ENCODING_NAME:
238       g_free (self->encoding_name);
239       self->encoding_name = g_value_dup_string (value);
240       if (self->rtp_src) {
241         caps = gst_rtp_src_rtpbin_request_pt_map_cb (NULL, 0, 96, self);
242         g_object_set (G_OBJECT (self->rtp_src), "caps", caps, NULL);
243         gst_caps_unref (caps);
244       }
245       break;
246     case PROP_LATENCY:
247       g_object_set (self->rtpbin, "latency", g_value_get_uint (value), NULL);
248       break;
249     case PROP_MULTICAST_IFACE:
250       g_free (self->multi_iface);
251 
252       if (g_value_get_string (value) == NULL)
253         self->multi_iface = g_strdup (DEFAULT_PROP_MULTICAST_IFACE);
254       else
255         self->multi_iface = g_value_dup_string (value);
256       break;
257     case PROP_CAPS:
258     {
259       const GstCaps *new_caps_val = gst_value_get_caps (value);
260       GstCaps *new_caps = NULL;
261       GstCaps *old_caps = self->caps;
262 
263       if (new_caps_val != NULL) {
264         new_caps = gst_caps_copy (new_caps_val);
265       }
266 
267       self->caps = new_caps;
268 
269       if (old_caps)
270         gst_caps_unref (old_caps);
271       break;
272     }
273     default:
274       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
275       break;
276   }
277 }
278 
279 static void
gst_rtp_src_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)280 gst_rtp_src_get_property (GObject * object, guint prop_id,
281     GValue * value, GParamSpec * pspec)
282 {
283   GstRtpSrc *self = GST_RTP_SRC (object);
284 
285   switch (prop_id) {
286     case PROP_URI:
287       GST_RTP_SRC_LOCK (object);
288       if (self->uri)
289         g_value_take_string (value, gst_uri_to_string (self->uri));
290       else
291         g_value_set_string (value, NULL);
292       GST_RTP_SRC_UNLOCK (object);
293       break;
294     case PROP_ADDRESS:
295       g_value_set_string (value, gst_uri_get_host (self->uri));
296       break;
297     case PROP_PORT:
298       g_value_set_uint (value, gst_uri_get_port (self->uri));
299       break;
300     case PROP_TTL:
301       g_value_set_int (value, self->ttl);
302       break;
303     case PROP_TTL_MC:
304       g_value_set_int (value, self->ttl_mc);
305       break;
306     case PROP_ENCODING_NAME:
307       g_value_set_string (value, self->encoding_name);
308       break;
309     case PROP_LATENCY:
310       g_object_get_property (G_OBJECT (self->rtpbin), "latency", value);
311       break;
312     case PROP_MULTICAST_IFACE:
313       g_value_set_string (value, self->multi_iface);
314       break;
315     case PROP_CAPS:
316       gst_value_set_caps (value, self->caps);
317       break;
318     default:
319       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
320       break;
321   }
322 }
323 
324 static void
gst_rtp_src_finalize(GObject * gobject)325 gst_rtp_src_finalize (GObject * gobject)
326 {
327   GstRtpSrc *self = GST_RTP_SRC (gobject);
328 
329   if (self->uri)
330     gst_uri_unref (self->uri);
331   g_free (self->encoding_name);
332 
333   g_free (self->multi_iface);
334 
335   if (self->caps)
336     gst_caps_unref (self->caps);
337 
338   g_clear_object (&self->rtcp_send_addr);
339 
340   g_mutex_clear (&self->lock);
341   G_OBJECT_CLASS (parent_class)->finalize (gobject);
342 }
343 
344 static void
gst_rtp_src_handle_message(GstBin * bin,GstMessage * message)345 gst_rtp_src_handle_message (GstBin * bin, GstMessage * message)
346 {
347   switch (GST_MESSAGE_TYPE (message)) {
348     case GST_MESSAGE_STREAM_START:
349     case GST_MESSAGE_EOS:
350       /* drop stream-start & eos from our internal udp sink(s);
351          https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/-/issues/1368 */
352       gst_message_unref (message);
353       break;
354     default:
355       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
356       break;
357   }
358 }
359 
360 static void
gst_rtp_src_class_init(GstRtpSrcClass * klass)361 gst_rtp_src_class_init (GstRtpSrcClass * klass)
362 {
363   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
364   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
365   GstBinClass *gstbin_class = GST_BIN_CLASS (klass);
366 
367   gobject_class->set_property = gst_rtp_src_set_property;
368   gobject_class->get_property = gst_rtp_src_get_property;
369   gobject_class->finalize = gst_rtp_src_finalize;
370   gstelement_class->change_state = gst_rtp_src_change_state;
371   gstbin_class->handle_message = gst_rtp_src_handle_message;
372 
373   /**
374    * GstRtpSrc:uri:
375    *
376    * uri to an RTP from. All GStreamer parameters can be
377    * encoded in the URI, this URI format is RFC compliant.
378    */
379   g_object_class_install_property (gobject_class, PROP_URI,
380       g_param_spec_string ("uri", "URI",
381           "URI in the form of rtp://host:port?query", DEFAULT_PROP_URI,
382           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
383 
384   /**
385    * GstRtpSrc:address:
386    *
387    * Address to receive packets from (can be IPv4 or IPv6).
388    */
389   g_object_class_install_property (gobject_class, PROP_ADDRESS,
390       g_param_spec_string ("address", "Address",
391           "Address to receive packets from (can be IPv4 or IPv6).",
392           DEFAULT_PROP_ADDRESS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
393 
394   /**
395    * GstRtpSrc:port:
396    *
397    * The port to listen to RTP packets, the RTCP port is this value
398    * +1. This port must be an even number.
399    */
400   g_object_class_install_property (gobject_class, PROP_PORT,
401       g_param_spec_uint ("port", "Port", "The port to listen for RTP packets, "
402           "the RTCP port is this value + 1. This port must be an even number.",
403           2, 65534, DEFAULT_PROP_PORT,
404           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_CONSTRUCT));
405 
406   /**
407    * GstRtpSrc:ttl:
408    *
409    * Set the unicast TTL parameter. In RTP this of importance for RTCP.
410    */
411   g_object_class_install_property (gobject_class, PROP_TTL,
412       g_param_spec_int ("ttl", "Unicast TTL",
413           "Used for setting the unicast TTL parameter",
414           0, 255, DEFAULT_PROP_TTL,
415           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
416 
417   /**
418    * GstRtpSrc:ttl-mc:
419    *
420    * Set the multicast TTL parameter. In RTP this of importance for RTCP.
421    */
422   g_object_class_install_property (gobject_class, PROP_TTL_MC,
423       g_param_spec_int ("ttl-mc", "Multicast TTL",
424           "Used for setting the multicast TTL parameter", 0, 255,
425           DEFAULT_PROP_TTL_MC, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
426 
427   /**
428    * GstRtpSrc:encoding-name:
429    *
430    * Set the encoding name of the stream to use. This is a short-hand for
431    * the full caps and maps typically to the encoding-name in the RTP caps.
432    */
433   g_object_class_install_property (gobject_class, PROP_ENCODING_NAME,
434       g_param_spec_string ("encoding-name", "Caps encoding name",
435           "Encoding name use to determine caps parameters",
436           DEFAULT_PROP_ENCODING_NAME,
437           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
438 
439   /**
440    * GstRtpSrc:latency:
441    *
442    * Set the size of the latency buffer in the
443    * GstRtpBin/GstRtpJitterBuffer to compensate for network jitter.
444    */
445   g_object_class_install_property (gobject_class, PROP_LATENCY,
446       g_param_spec_uint ("latency", "Buffer latency in ms",
447           "Default amount of ms to buffer in the jitterbuffers", 0,
448           G_MAXUINT, DEFAULT_PROP_LATENCY,
449           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
450 
451   /**
452    * GstRtpSink:multicast-iface:
453    *
454    * The networkinterface on which to join the multicast group
455    */
456   g_object_class_install_property (gobject_class, PROP_MULTICAST_IFACE,
457       g_param_spec_string ("multicast-iface", "Multicast Interface",
458           "The network interface on which to join the multicast group."
459           "This allows multiple interfaces separated by comma. (\"eth0,eth1\")",
460           DEFAULT_PROP_MULTICAST_IFACE,
461           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
462 
463   /**
464   * GstRtpSrc:caps:
465   *
466   * The RTP caps of the incoming stream.
467   *
468   * Since: 1.20
469   */
470   g_object_class_install_property (gobject_class, PROP_CAPS,
471       g_param_spec_boxed ("caps", "Caps",
472           "The caps of the incoming stream", GST_TYPE_CAPS,
473           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
474 
475   gst_element_class_add_pad_template (gstelement_class,
476       gst_static_pad_template_get (&src_template));
477 
478   gst_element_class_set_static_metadata (gstelement_class,
479       "RTP Source element",
480       "Generic/Bin/Src",
481       "Simple RTP src", "Marc Leeman <marc.leeman@gmail.com>");
482 }
483 
484 static void
clear_ssrc(GstElement * rtpbin,GstPad * gpad)485 clear_ssrc (GstElement * rtpbin, GstPad * gpad)
486 {
487   GstPad *pad;
488   gint pt;
489   guint ssrc;
490 
491   pad = gst_ghost_pad_get_target (GST_GHOST_PAD (gpad));
492   if (!pad)
493     return;
494 
495   if (sscanf (GST_PAD_NAME (pad), "recv_rtp_src_0_%u_%d", &ssrc, &pt) != 2) {
496     gst_object_unref (pad);
497     return;
498   }
499   gst_object_unref (pad);
500 
501   g_signal_emit_by_name (rtpbin, "clear-ssrc", 0, ssrc);
502 }
503 
504 static void
gst_rtp_src_rtpbin_pad_added_cb(GstElement * element,GstPad * pad,gpointer data)505 gst_rtp_src_rtpbin_pad_added_cb (GstElement * element, GstPad * pad,
506     gpointer data)
507 {
508   GstRtpSrc *self = GST_RTP_SRC (data);
509   GstCaps *caps = gst_pad_query_caps (pad, NULL);
510   const GstStructure *s;
511   GstPad *upad = NULL;
512   gint pt = -1;
513   gchar name[48];
514 
515   /* Expose RTP data pad only */
516   GST_INFO_OBJECT (self,
517       "Element %" GST_PTR_FORMAT " added pad %" GST_PTR_FORMAT "with caps %"
518       GST_PTR_FORMAT ".", element, pad, caps);
519 
520   /* Sanity checks */
521   if (GST_PAD_DIRECTION (pad) == GST_PAD_SINK) {
522     /* Sink pad, do not expose */
523     gst_caps_unref (caps);
524     return;
525   }
526 
527   if (G_LIKELY (caps)) {
528     GstCaps *ref_caps = gst_caps_new_empty_simple ("application/x-rtcp");
529 
530     if (gst_caps_can_intersect (caps, ref_caps)) {
531       /* SRC RTCP caps, do not expose */
532       gst_caps_unref (ref_caps);
533       gst_caps_unref (caps);
534 
535       return;
536     }
537     gst_caps_unref (ref_caps);
538   } else {
539     GST_ERROR_OBJECT (self, "Pad with no caps detected.");
540     gst_caps_unref (caps);
541 
542     return;
543   }
544 
545   s = gst_caps_get_structure (caps, 0);
546   gst_structure_get_int (s, "payload", &pt);
547   gst_caps_unref (caps);
548 
549   GST_RTP_SRC_LOCK (self);
550   g_snprintf (name, 48, "src_%u", pt);
551   upad = gst_element_get_static_pad (GST_ELEMENT (self), name);
552 
553   if (!upad) {
554     GST_DEBUG_OBJECT (self, "Adding new pad: %s", name);
555 
556     upad = gst_ghost_pad_new (name, pad);
557     gst_pad_set_active (upad, TRUE);
558     gst_element_add_pad (GST_ELEMENT (self), upad);
559   } else {
560     GST_DEBUG_OBJECT (self, "Re-using existing pad: %s", GST_PAD_NAME (upad));
561     clear_ssrc (element, upad);
562     gst_ghost_pad_set_target (GST_GHOST_PAD (upad), pad);
563     gst_object_unref (upad);
564   }
565   GST_RTP_SRC_UNLOCK (self);
566 }
567 
568 static void
gst_rtp_src_rtpbin_pad_removed_cb(GstElement * element,GstPad * pad,gpointer data)569 gst_rtp_src_rtpbin_pad_removed_cb (GstElement * element, GstPad * pad,
570     gpointer data)
571 {
572   GstRtpSrc *self = GST_RTP_SRC (data);
573   GST_INFO_OBJECT (self,
574       "Element %" GST_PTR_FORMAT " removed pad %" GST_PTR_FORMAT ".", element,
575       pad);
576 }
577 
578 static void
gst_rtp_src_rtpbin_on_ssrc_collision_cb(GstElement * rtpbin,guint session_id,guint ssrc,gpointer data)579 gst_rtp_src_rtpbin_on_ssrc_collision_cb (GstElement * rtpbin, guint session_id,
580     guint ssrc, gpointer data)
581 {
582   GstRtpSrc *self = GST_RTP_SRC (data);
583 
584   GST_INFO_OBJECT (self,
585       "Detected an SSRC collision: session-id 0x%x, ssrc 0x%x.", session_id,
586       ssrc);
587 }
588 
589 static void
gst_rtp_src_rtpbin_on_new_ssrc_cb(GstElement * rtpbin,guint session_id,guint ssrc,gpointer data)590 gst_rtp_src_rtpbin_on_new_ssrc_cb (GstElement * rtpbin, guint session_id,
591     guint ssrc, gpointer data)
592 {
593   GstRtpSrc *self = GST_RTP_SRC (data);
594 
595   GST_INFO_OBJECT (self, "Detected a new SSRC: session-id 0x%x, ssrc 0x%x.",
596       session_id, ssrc);
597 }
598 
599 static GstPadProbeReturn
gst_rtp_src_on_recv_rtcp(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)600 gst_rtp_src_on_recv_rtcp (GstPad * pad, GstPadProbeInfo * info,
601     gpointer user_data)
602 {
603   GstRtpSrc *self = GST_RTP_SRC (user_data);
604   GstBuffer *buffer;
605   GstNetAddressMeta *meta;
606 
607   if (info->type == GST_PAD_PROBE_TYPE_BUFFER_LIST) {
608     GstBufferList *buffer_list = info->data;
609     buffer = gst_buffer_list_get (buffer_list, 0);
610   } else {
611     buffer = info->data;
612   }
613 
614   meta = gst_buffer_get_net_address_meta (buffer);
615 
616   GST_OBJECT_LOCK (self);
617   g_clear_object (&self->rtcp_send_addr);
618   self->rtcp_send_addr = g_object_ref (meta->addr);
619   GST_OBJECT_UNLOCK (self);
620 
621   return GST_PAD_PROBE_OK;
622 }
623 
624 static inline void
gst_rtp_src_attach_net_address_meta(GstRtpSrc * self,GstBuffer * buffer)625 gst_rtp_src_attach_net_address_meta (GstRtpSrc * self, GstBuffer * buffer)
626 {
627   GST_OBJECT_LOCK (self);
628   if (self->rtcp_send_addr)
629     gst_buffer_add_net_address_meta (buffer, self->rtcp_send_addr);
630   GST_OBJECT_UNLOCK (self);
631 }
632 
633 static GstPadProbeReturn
gst_rtp_src_on_send_rtcp(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)634 gst_rtp_src_on_send_rtcp (GstPad * pad, GstPadProbeInfo * info,
635     gpointer user_data)
636 {
637   GstRtpSrc *self = GST_RTP_SRC (user_data);
638 
639   if (info->type == GST_PAD_PROBE_TYPE_BUFFER_LIST) {
640     GstBufferList *buffer_list = info->data;
641     GstBuffer *buffer;
642     gint i;
643 
644     info->data = buffer_list = gst_buffer_list_make_writable (buffer_list);
645     for (i = 0; i < gst_buffer_list_length (buffer_list); i++) {
646       buffer = gst_buffer_list_get (buffer_list, i);
647       gst_rtp_src_attach_net_address_meta (self, buffer);
648     }
649   } else {
650     GstBuffer *buffer = info->data;
651     info->data = buffer = gst_buffer_make_writable (buffer);
652     gst_rtp_src_attach_net_address_meta (self, buffer);
653   }
654 
655   return GST_PAD_PROBE_OK;
656 }
657 
658 static gboolean
gst_rtp_src_start(GstRtpSrc * self)659 gst_rtp_src_start (GstRtpSrc * self)
660 {
661   GstPad *pad;
662   GSocket *socket;
663   GInetAddress *iaddr;
664   GstCaps *caps;
665   GError *error = NULL;
666 
667   /* Should not be NULL */
668   g_return_val_if_fail (self->uri != NULL, FALSE);
669 
670   /* share the socket created by the source */
671   g_object_get (G_OBJECT (self->rtcp_src), "used-socket", &socket, NULL);
672   if (!G_IS_SOCKET (socket)) {
673     GST_WARNING_OBJECT (self, "Could not retrieve RTCP src socket.");
674   }
675 
676   iaddr = g_inet_address_new_from_string (gst_uri_get_host (self->uri));
677   if (!iaddr) {
678     GList *results;
679     GResolver *resolver = NULL;
680 
681     resolver = g_resolver_get_default ();
682     results =
683         g_resolver_lookup_by_name (resolver, gst_uri_get_host (self->uri), NULL,
684         &error);
685 
686     if (!results) {
687       g_object_unref (resolver);
688       goto dns_resolve_failed;
689     }
690 
691     iaddr = G_INET_ADDRESS (g_object_ref (results->data));
692 
693     g_resolver_free_addresses (results);
694     g_object_unref (resolver);
695   }
696 
697   if (g_inet_address_get_is_multicast (iaddr)) {
698     /* mc-ttl is not supported by dynudpsink */
699     g_socket_set_multicast_ttl (socket, self->ttl_mc);
700     /* In multicast, send RTCP to the multicast group */
701     self->rtcp_send_addr =
702         g_inet_socket_address_new (iaddr, gst_uri_get_port (self->uri) + 1);
703 
704     /* set multicast-iface on the udpsrc and udpsink elements */
705     g_object_set (self->rtcp_src, "multicast-iface", self->multi_iface, NULL);
706     g_object_set (self->rtp_src, "multicast-iface", self->multi_iface, NULL);
707   } else {
708     /* In unicast, send RTCP to the detected sender address */
709     g_socket_set_ttl (socket, self->ttl);
710     pad = gst_element_get_static_pad (self->rtcp_src, "src");
711     self->rtcp_recv_probe = gst_pad_add_probe (pad,
712         GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST,
713         gst_rtp_src_on_recv_rtcp, self, NULL);
714     gst_object_unref (pad);
715   }
716   g_object_unref (iaddr);
717 
718   /* no need to set address if unicast */
719   caps = gst_caps_new_empty_simple ("application/x-rtcp");
720   g_object_set (self->rtcp_src, "caps", caps, NULL);
721   gst_caps_unref (caps);
722 
723   pad = gst_element_get_static_pad (self->rtcp_sink, "sink");
724   self->rtcp_send_probe = gst_pad_add_probe (pad,
725       GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST,
726       gst_rtp_src_on_send_rtcp, self, NULL);
727   gst_object_unref (pad);
728 
729   g_object_set (self->rtcp_sink, "socket", socket, "close-socket", FALSE, NULL);
730   g_object_unref (socket);
731 
732   gst_element_set_locked_state (self->rtcp_sink, FALSE);
733   gst_element_sync_state_with_parent (self->rtcp_sink);
734 
735   return TRUE;
736 
737 dns_resolve_failed:
738   GST_ELEMENT_ERROR (self, RESOURCE, NOT_FOUND,
739       ("Could not resolve hostname '%s'", gst_uri_get_host (self->uri)),
740       ("DNS resolver reported: %s", error->message));
741   g_error_free (error);
742   return FALSE;
743 }
744 
745 static void
gst_rtp_src_stop(GstRtpSrc * self)746 gst_rtp_src_stop (GstRtpSrc * self)
747 {
748   GstPad *pad;
749 
750   if (self->rtcp_recv_probe) {
751     pad = gst_element_get_static_pad (self->rtcp_src, "src");
752     gst_pad_remove_probe (pad, self->rtcp_recv_probe);
753     self->rtcp_recv_probe = 0;
754     gst_object_unref (pad);
755   }
756 
757   pad = gst_element_get_static_pad (self->rtcp_sink, "sink");
758   gst_pad_remove_probe (pad, self->rtcp_send_probe);
759   self->rtcp_send_probe = 0;
760   gst_object_unref (pad);
761 }
762 
763 static GstStateChangeReturn
gst_rtp_src_change_state(GstElement * element,GstStateChange transition)764 gst_rtp_src_change_state (GstElement * element, GstStateChange transition)
765 {
766   GstRtpSrc *self = GST_RTP_SRC (element);
767   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
768 
769   GST_DEBUG_OBJECT (self, "Changing state: %s => %s",
770       gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)),
771       gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
772 
773   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
774   if (ret == GST_STATE_CHANGE_FAILURE)
775     return ret;
776 
777   switch (transition) {
778     case GST_STATE_CHANGE_NULL_TO_READY:
779       if (gst_rtp_src_start (self) == FALSE)
780         return GST_STATE_CHANGE_FAILURE;
781       break;
782     case GST_STATE_CHANGE_READY_TO_PAUSED:
783       ret = GST_STATE_CHANGE_NO_PREROLL;
784       break;
785     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
786       ret = GST_STATE_CHANGE_NO_PREROLL;
787       break;
788     case GST_STATE_CHANGE_READY_TO_NULL:
789       gst_rtp_src_stop (self);
790       break;
791     default:
792       break;
793   }
794 
795   return ret;
796 }
797 
798 static void
gst_rtp_src_init(GstRtpSrc * self)799 gst_rtp_src_init (GstRtpSrc * self)
800 {
801   gchar name[48];
802   const gchar *missing_plugin = NULL;
803 
804   self->rtpbin = NULL;
805   self->rtp_src = NULL;
806   self->rtcp_src = NULL;
807   self->rtcp_sink = NULL;
808   self->multi_iface = g_strdup (DEFAULT_PROP_MULTICAST_IFACE);
809 
810   self->uri = gst_uri_from_string (DEFAULT_PROP_URI);
811   self->ttl = DEFAULT_PROP_TTL;
812   self->ttl_mc = DEFAULT_PROP_TTL_MC;
813   self->encoding_name = DEFAULT_PROP_ENCODING_NAME;
814   self->caps = DEFAULT_PROP_CAPS;
815 
816   GST_OBJECT_FLAG_SET (GST_OBJECT (self), GST_ELEMENT_FLAG_SOURCE);
817   gst_bin_set_suppressed_flags (GST_BIN (self),
818       GST_ELEMENT_FLAG_SOURCE | GST_ELEMENT_FLAG_SINK);
819 
820   g_mutex_init (&self->lock);
821 
822   /* Construct the RTP receiver pipeline.
823    *
824    * udpsrc -> [recv_rtp_sink_%u]  --------  [recv_rtp_src_%u_%u_%u]
825    *                              | rtpbin |
826    * udpsrc -> [recv_rtcp_sink_%u] --------  [send_rtcp_src_%u] -> udpsink
827    *
828    * This pipeline is fixed for now, note that optionally an FEC stream could
829    * be added later.
830    */
831 
832   self->rtpbin = gst_element_factory_make ("rtpbin", "rtp_recv_rtpbin0");
833   if (self->rtpbin == NULL) {
834     missing_plugin = "rtpmanager";
835     goto missing_plugin;
836   }
837   g_object_set (self->rtpbin, "autoremove", TRUE, NULL);
838 
839   gst_bin_add (GST_BIN (self), self->rtpbin);
840 
841   /* Add rtpbin callbacks to monitor the operation of rtpbin */
842   g_signal_connect_object (self->rtpbin, "pad-added",
843       G_CALLBACK (gst_rtp_src_rtpbin_pad_added_cb), self, 0);
844   g_signal_connect_object (self->rtpbin, "pad-removed",
845       G_CALLBACK (gst_rtp_src_rtpbin_pad_removed_cb), self, 0);
846   g_signal_connect_object (self->rtpbin, "request-pt-map",
847       G_CALLBACK (gst_rtp_src_rtpbin_request_pt_map_cb), self, 0);
848   g_signal_connect_object (self->rtpbin, "on-new-ssrc",
849       G_CALLBACK (gst_rtp_src_rtpbin_on_new_ssrc_cb), self, 0);
850   g_signal_connect_object (self->rtpbin, "on-ssrc-collision",
851       G_CALLBACK (gst_rtp_src_rtpbin_on_ssrc_collision_cb), self, 0);
852 
853   self->rtp_src = gst_element_factory_make ("udpsrc", "rtp_rtp_udpsrc0");
854   if (self->rtp_src == NULL) {
855     missing_plugin = "udp";
856     goto missing_plugin;
857   }
858 
859   self->rtcp_src = gst_element_factory_make ("udpsrc", "rtp_rtcp_udpsrc0");
860   if (self->rtcp_src == NULL) {
861     missing_plugin = "udp";
862     goto missing_plugin;
863   }
864 
865   self->rtcp_sink =
866       gst_element_factory_make ("dynudpsink", "rtp_rtcp_dynudpsink0");
867   if (self->rtcp_sink == NULL) {
868     missing_plugin = "udp";
869     goto missing_plugin;
870   }
871 
872   /* Add elements as needed, since udpsrc/udpsink for RTCP share a socket,
873    * not all at the same moment */
874   gst_bin_add (GST_BIN (self), self->rtp_src);
875   gst_bin_add (GST_BIN (self), self->rtcp_src);
876   gst_bin_add (GST_BIN (self), self->rtcp_sink);
877 
878   g_object_set (self->rtcp_sink, "sync", FALSE, "async", FALSE, NULL);
879   gst_element_set_locked_state (self->rtcp_sink, TRUE);
880 
881   /* pads are all named */
882   g_snprintf (name, 48, "recv_rtp_sink_%u", GST_ELEMENT (self)->numpads);
883   gst_element_link_pads (self->rtp_src, "src", self->rtpbin, name);
884   g_snprintf (name, 48, "recv_rtcp_sink_%u", GST_ELEMENT (self)->numpads);
885   gst_element_link_pads (self->rtcp_src, "src", self->rtpbin, name);
886   g_snprintf (name, 48, "send_rtcp_src_%u", GST_ELEMENT (self)->numpads);
887   gst_element_link_pads (self->rtpbin, name, self->rtcp_sink, "sink");
888 
889   if (missing_plugin == NULL)
890     return;
891 
892 missing_plugin:
893   {
894     GST_ERROR_OBJECT (self, "'%s' plugin is missing.", missing_plugin);
895   }
896 }
897 
898 static GstURIType
gst_rtp_src_uri_get_type(GType type)899 gst_rtp_src_uri_get_type (GType type)
900 {
901   return GST_URI_SRC;
902 }
903 
904 static const gchar *const *
gst_rtp_src_uri_get_protocols(GType type)905 gst_rtp_src_uri_get_protocols (GType type)
906 {
907   static const gchar *protocols[] = { (char *) "rtp", NULL };
908 
909   return protocols;
910 }
911 
912 static gchar *
gst_rtp_src_uri_get_uri(GstURIHandler * handler)913 gst_rtp_src_uri_get_uri (GstURIHandler * handler)
914 {
915   GstRtpSrc *self = (GstRtpSrc *) handler;
916 
917   return gst_uri_to_string (self->uri);
918 }
919 
920 static gboolean
gst_rtp_src_uri_set_uri(GstURIHandler * handler,const gchar * uri,GError ** error)921 gst_rtp_src_uri_set_uri (GstURIHandler * handler, const gchar * uri,
922     GError ** error)
923 {
924   GstRtpSrc *self = (GstRtpSrc *) handler;
925 
926   g_object_set (G_OBJECT (self), "uri", uri, NULL);
927 
928   return TRUE;
929 }
930 
931 static void
gst_rtp_src_uri_handler_init(gpointer g_iface,gpointer iface_data)932 gst_rtp_src_uri_handler_init (gpointer g_iface, gpointer iface_data)
933 {
934   GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
935 
936   iface->get_type = gst_rtp_src_uri_get_type;
937   iface->get_protocols = gst_rtp_src_uri_get_protocols;
938   iface->get_uri = gst_rtp_src_uri_get_uri;
939   iface->set_uri = gst_rtp_src_uri_set_uri;
940 }
941 
942 /* ex: set tabstop=2 shiftwidth=2 expandtab: */
943