• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* RTP Retransmission receiver element for GStreamer
2  *
3  * gstrtprtxreceive.c:
4  *
5  * Copyright (C) 2013 Collabora Ltd.
6  *   @author Julien Isorce <julien.isorce@collabora.co.uk>
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
21  * Boston, MA 02110-1301, USA.
22  */
23 
24 /**
25  * SECTION:element-rtprtxreceive
26  * @title: rtprtxreceive
27  * @see_also: rtprtxsend, rtpsession, rtpjitterbuffer
28  *
29  * rtprtxreceive listens to the retransmission events from the
30  * downstream rtpjitterbuffer and remembers the SSRC (ssrc1) of the stream and
31  * the sequence number that was requested. When it receives a packet with
32  * a sequence number equal to one of the ones stored and with a different SSRC,
33  * it identifies the new SSRC (ssrc2) as the retransmission stream of ssrc1.
34  * From this point on, it replaces ssrc2 with ssrc1 in all packets of the
35  * ssrc2 stream and flags them as retransmissions, so that rtpjitterbuffer
36  * can reconstruct the original stream.
37  *
38  * This algorithm is implemented as specified in RFC 4588.
39  *
40  * This element is meant to be used with rtprtxsend on the sender side.
41  * See #GstRtpRtxSend
42  *
43  * Below you can see some examples that illustrate how rtprtxreceive and
44  * rtprtxsend fit among the other rtp elements and how they work internally.
45  * Normally, however, you should avoid using such pipelines and use
46  * rtpbin instead, with its #GstRtpBin::request-aux-sender and
47  * #GstRtpBin::request-aux-receiver signals. See #GstRtpBin.
48  *
49  * ## Example pipelines
50  *
51  * |[
52  * gst-launch-1.0 rtpsession name=rtpsession rtp-profile=avpf \
53  *     audiotestsrc is-live=true ! opusenc ! rtpopuspay pt=96 ! \
54  *         rtprtxsend payload-type-map="application/x-rtp-pt-map,96=(uint)97" ! \
55  *         rtpsession.send_rtp_sink \
56  *     rtpsession.send_rtp_src ! identity drop-probability=0.01 ! \
57  *         udpsink host="127.0.0.1" port=5000 \
58  *     udpsrc port=5001 ! rtpsession.recv_rtcp_sink \
59  *     rtpsession.send_rtcp_src ! udpsink host="127.0.0.1" port=5002 \
60  *         sync=false async=false
61  * ]| Send audio stream through port 5000 (5001 and 5002 are just the rtcp
62  * link with the receiver)
63  *
64  * |[
65  * gst-launch-1.0 rtpsession name=rtpsession rtp-profile=avpf \
66  *     udpsrc port=5000 caps="application/x-rtp,media=(string)audio,clock-rate=(int)48000,encoding-name=(string)OPUS,payload=(int)96" ! \
67  *         rtpsession.recv_rtp_sink \
68  *     rtpsession.recv_rtp_src ! \
69  *         rtprtxreceive payload-type-map="application/x-rtp-pt-map,96=(uint)97" ! \
70  *         rtpssrcdemux ! rtpjitterbuffer do-retransmission=true ! \
71  *         rtpopusdepay ! opusdec ! audioconvert ! audioresample ! autoaudiosink \
72  *     rtpsession.send_rtcp_src ! \
73  *         udpsink host="127.0.0.1" port=5001 sync=false async=false \
74  *     udpsrc port=5002 ! rtpsession.recv_rtcp_sink
75  * ]|
76  * Receive audio stream from port 5000 (5001 and 5002 are just the rtcp
77  * link with the sender)
78  *
79  * In this example we can see a simple streaming of an OPUS stream with some
80  * of the packets being artificially dropped by the identity element.
81  * Thanks to retransmission, you should still hear a clear sound when setting
82  * drop-probability to something greater than 0.
83  *
84  * Internally, the rtpjitterbuffer will generate a custom upstream event,
85  * GstRTPRetransmissionRequest, when it detects that one packet is missing.
86  * Then this request is translated to a FB NACK in the rtcp link by rtpsession.
87  * Finally the rtpsession of the sender side will re-convert it in a
88  * GstRTPRetransmissionRequest that will be handled by rtprtxsend. rtprtxsend
89  * will then re-send the missing packet with a new ssrc and a different payload
90  * type (here, 97), but with the same original sequence number. On the receiver
91  * side, rtprtxreceive will associate this new stream with the original and
92  * forward the retransmission packets to rtpjitterbuffer with the original
93  * ssrc and payload type.
94  *
95  * |[
96  * gst-launch-1.0 rtpsession name=rtpsession rtp-profile=avpf \
97  *     audiotestsrc is-live=true ! opusenc ! rtpopuspay pt=97 seqnum-offset=1 ! \
98  *         rtprtxsend payload-type-map="application/x-rtp-pt-map,97=(uint)99" ! \
99  *         funnel name=f ! rtpsession.send_rtp_sink \
100  *     audiotestsrc freq=660.0 is-live=true ! opusenc ! \
101  *         rtpopuspay pt=97 seqnum-offset=100 ! \
102  *         rtprtxsend payload-type-map="application/x-rtp-pt-map,97=(uint)99" ! \
103  *         f. \
104  *     rtpsession.send_rtp_src ! identity drop-probability=0.01 ! \
105  *         udpsink host="127.0.0.1" port=5000 \
106  *     udpsrc port=5001 ! rtpsession.recv_rtcp_sink \
107  *     rtpsession.send_rtcp_src ! udpsink host="127.0.0.1" port=5002 \
108  *         sync=false async=false
109  * ]|
110  * Send two audio streams to port 5000.
111  * |[
112  * gst-launch-1.0 rtpsession name=rtpsession rtp-profile=avpf \
113  *     udpsrc port=5000 caps="application/x-rtp,media=(string)audio,clock-rate=(int)48000,encoding-name=(string)OPUS,payload=(int)97" ! \
114  *         rtpsession.recv_rtp_sink \
115  *     rtpsession.recv_rtp_src ! \
116  *         rtprtxreceive payload-type-map="application/x-rtp-pt-map,97=(uint)99" ! \
117  *         rtpssrcdemux name=demux \
118  *     demux. ! queue ! rtpjitterbuffer do-retransmission=true ! rtpopusdepay ! \
119  *         opusdec ! audioconvert ! autoaudiosink \
120  *     demux. ! queue ! rtpjitterbuffer do-retransmission=true ! rtpopusdepay ! \
121  *         opusdec ! audioconvert ! autoaudiosink \
122  *     udpsrc port=5002 ! rtpsession.recv_rtcp_sink \
123  *     rtpsession.send_rtcp_src ! udpsink host="127.0.0.1" port=5001 \
124  *         sync=false async=false
125  * ]|
126  * Receive two audio streams from port 5000.
127  *
128  * In this example we are streaming two streams of the same type through the
129  * same port. They, however, are using a different SSRC (ssrc is randomly
130  * generated on each payloader - rtpopuspay in this example), so they can be
131  * identified and demultiplexed by rtpssrcdemux on the receiver side. This is
132  * an example of SSRC-multiplexing.
133  *
134  * It is important here to use a different starting sequence number
135  * (seqnum-offset), since this is the only means of identification that
136  * rtprtxreceive uses the very first time to identify retransmission streams.
137  * It is an error, according to RFC4588 to have two retransmission requests for
138  * packets belonging to two different streams but with the same sequence number.
139  * Note that the default seqnum-offset value (-1, which means random) would
140  * work just fine, but it is overridden here for illustration purposes.
141  */
142 
143 #ifdef HAVE_CONFIG_H
144 #include "config.h"
145 #endif
146 
147 #include <gst/gst.h>
148 #include <gst/rtp/gstrtpbuffer.h>
149 #include <string.h>
150 #include <stdlib.h>
151 
152 #include "gstrtprtxreceive.h"
153 
154 #define ASSOC_TIMEOUT (GST_SECOND)
155 
156 GST_DEBUG_CATEGORY_STATIC (gst_rtp_rtx_receive_debug);
157 #define GST_CAT_DEFAULT gst_rtp_rtx_receive_debug
158 
159 enum
160 {
161   PROP_0,
162   PROP_PAYLOAD_TYPE_MAP,
163   PROP_NUM_RTX_REQUESTS,
164   PROP_NUM_RTX_PACKETS,
165   PROP_NUM_RTX_ASSOC_PACKETS
166 };
167 
168 static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src",
169     GST_PAD_SRC,
170     GST_PAD_ALWAYS,
171     GST_STATIC_CAPS ("application/x-rtp")
172     );
173 
174 static GstStaticPadTemplate sink_factory = GST_STATIC_PAD_TEMPLATE ("sink",
175     GST_PAD_SINK,
176     GST_PAD_ALWAYS,
177     GST_STATIC_CAPS ("application/x-rtp")
178     );
179 
180 static gboolean gst_rtp_rtx_receive_src_event (GstPad * pad, GstObject * parent,
181     GstEvent * event);
182 static GstFlowReturn gst_rtp_rtx_receive_chain (GstPad * pad,
183     GstObject * parent, GstBuffer * buffer);
184 
185 static GstStateChangeReturn gst_rtp_rtx_receive_change_state (GstElement *
186     element, GstStateChange transition);
187 
188 static void gst_rtp_rtx_receive_set_property (GObject * object, guint prop_id,
189     const GValue * value, GParamSpec * pspec);
190 static void gst_rtp_rtx_receive_get_property (GObject * object, guint prop_id,
191     GValue * value, GParamSpec * pspec);
192 static void gst_rtp_rtx_receive_finalize (GObject * object);
193 
194 G_DEFINE_TYPE_WITH_CODE (GstRtpRtxReceive, gst_rtp_rtx_receive,
195     GST_TYPE_ELEMENT, GST_DEBUG_CATEGORY_INIT (gst_rtp_rtx_receive_debug,
196         "rtprtxreceive", 0, "rtp retransmission receiver"));
197 GST_ELEMENT_REGISTER_DEFINE (rtprtxreceive, "rtprtxreceive", GST_RANK_NONE,
198     GST_TYPE_RTP_RTX_RECEIVE);
199 
200 static void
gst_rtp_rtx_receive_class_init(GstRtpRtxReceiveClass * klass)201 gst_rtp_rtx_receive_class_init (GstRtpRtxReceiveClass * klass)
202 {
203   GObjectClass *gobject_class;
204   GstElementClass *gstelement_class;
205 
206   gobject_class = (GObjectClass *) klass;
207   gstelement_class = (GstElementClass *) klass;
208 
209   gobject_class->get_property = gst_rtp_rtx_receive_get_property;
210   gobject_class->set_property = gst_rtp_rtx_receive_set_property;
211   gobject_class->finalize = gst_rtp_rtx_receive_finalize;
212 
213   g_object_class_install_property (gobject_class, PROP_PAYLOAD_TYPE_MAP,
214       g_param_spec_boxed ("payload-type-map", "Payload Type Map",
215           "Map of original payload types to their retransmission payload types",
216           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
217 
218   g_object_class_install_property (gobject_class, PROP_NUM_RTX_REQUESTS,
219       g_param_spec_uint ("num-rtx-requests", "Num RTX Requests",
220           "Number of retransmission events received", 0, G_MAXUINT,
221           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
222 
223   g_object_class_install_property (gobject_class, PROP_NUM_RTX_PACKETS,
224       g_param_spec_uint ("num-rtx-packets", "Num RTX Packets",
225           " Number of retransmission packets received", 0, G_MAXUINT,
226           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
227 
228   g_object_class_install_property (gobject_class, PROP_NUM_RTX_ASSOC_PACKETS,
229       g_param_spec_uint ("num-rtx-assoc-packets",
230           "Num RTX Associated Packets", "Number of retransmission packets "
231           "correctly associated with retransmission requests", 0, G_MAXUINT,
232           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
233 
234   gst_element_class_add_static_pad_template (gstelement_class, &src_factory);
235   gst_element_class_add_static_pad_template (gstelement_class, &sink_factory);
236 
237   gst_element_class_set_static_metadata (gstelement_class,
238       "RTP Retransmission receiver", "Codec",
239       "Receive retransmitted RTP packets according to RFC4588",
240       "Julien Isorce <julien.isorce@collabora.co.uk>");
241 
242   gstelement_class->change_state =
243       GST_DEBUG_FUNCPTR (gst_rtp_rtx_receive_change_state);
244 }
245 
246 static void
gst_rtp_rtx_receive_reset(GstRtpRtxReceive * rtx)247 gst_rtp_rtx_receive_reset (GstRtpRtxReceive * rtx)
248 {
249   GST_OBJECT_LOCK (rtx);
250   g_hash_table_remove_all (rtx->ssrc2_ssrc1_map);
251   g_hash_table_remove_all (rtx->seqnum_ssrc1_map);
252   rtx->num_rtx_requests = 0;
253   rtx->num_rtx_packets = 0;
254   rtx->num_rtx_assoc_packets = 0;
255   GST_OBJECT_UNLOCK (rtx);
256 }
257 
258 static void
gst_rtp_rtx_receive_finalize(GObject * object)259 gst_rtp_rtx_receive_finalize (GObject * object)
260 {
261   GstRtpRtxReceive *rtx = GST_RTP_RTX_RECEIVE_CAST (object);
262 
263   g_hash_table_unref (rtx->ssrc2_ssrc1_map);
264   g_hash_table_unref (rtx->seqnum_ssrc1_map);
265   g_hash_table_unref (rtx->rtx_pt_map);
266   if (rtx->rtx_pt_map_structure)
267     gst_structure_free (rtx->rtx_pt_map_structure);
268 
269   G_OBJECT_CLASS (gst_rtp_rtx_receive_parent_class)->finalize (object);
270 }
271 
272 typedef struct
273 {
274   guint32 ssrc;
275   GstClockTime time;
276 } SsrcAssoc;
277 
278 static SsrcAssoc *
ssrc_assoc_new(guint32 ssrc,GstClockTime time)279 ssrc_assoc_new (guint32 ssrc, GstClockTime time)
280 {
281   SsrcAssoc *assoc = g_slice_new (SsrcAssoc);
282 
283   assoc->ssrc = ssrc;
284   assoc->time = time;
285 
286   return assoc;
287 }
288 
289 static void
ssrc_assoc_free(SsrcAssoc * assoc)290 ssrc_assoc_free (SsrcAssoc * assoc)
291 {
292   g_slice_free (SsrcAssoc, assoc);
293 }
294 
295 static void
gst_rtp_rtx_receive_init(GstRtpRtxReceive * rtx)296 gst_rtp_rtx_receive_init (GstRtpRtxReceive * rtx)
297 {
298   GstElementClass *klass = GST_ELEMENT_GET_CLASS (rtx);
299 
300   rtx->srcpad =
301       gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
302           "src"), "src");
303   GST_PAD_SET_PROXY_CAPS (rtx->srcpad);
304   GST_PAD_SET_PROXY_ALLOCATION (rtx->srcpad);
305   gst_pad_set_event_function (rtx->srcpad,
306       GST_DEBUG_FUNCPTR (gst_rtp_rtx_receive_src_event));
307   gst_element_add_pad (GST_ELEMENT (rtx), rtx->srcpad);
308 
309   rtx->sinkpad =
310       gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
311           "sink"), "sink");
312   GST_PAD_SET_PROXY_CAPS (rtx->sinkpad);
313   GST_PAD_SET_PROXY_ALLOCATION (rtx->sinkpad);
314   gst_pad_set_chain_function (rtx->sinkpad,
315       GST_DEBUG_FUNCPTR (gst_rtp_rtx_receive_chain));
316   gst_element_add_pad (GST_ELEMENT (rtx), rtx->sinkpad);
317 
318   rtx->ssrc2_ssrc1_map = g_hash_table_new (g_direct_hash, g_direct_equal);
319   rtx->seqnum_ssrc1_map = g_hash_table_new_full (g_direct_hash, g_direct_equal,
320       NULL, (GDestroyNotify) ssrc_assoc_free);
321 
322   rtx->rtx_pt_map = g_hash_table_new (g_direct_hash, g_direct_equal);
323 }
324 
325 static gboolean
gst_rtp_rtx_receive_src_event(GstPad * pad,GstObject * parent,GstEvent * event)326 gst_rtp_rtx_receive_src_event (GstPad * pad, GstObject * parent,
327     GstEvent * event)
328 {
329   GstRtpRtxReceive *rtx = GST_RTP_RTX_RECEIVE_CAST (parent);
330   gboolean res;
331 
332   switch (GST_EVENT_TYPE (event)) {
333     case GST_EVENT_CUSTOM_UPSTREAM:
334     {
335       const GstStructure *s = gst_event_get_structure (event);
336 
337       /* This event usually comes from the downstream gstrtpjitterbuffer */
338       if (gst_structure_has_name (s, "GstRTPRetransmissionRequest")) {
339         guint seqnum = 0;
340         guint ssrc = 0;
341         gpointer ssrc2 = 0;
342 
343         /* retrieve seqnum of the packet that need to be retransmitted */
344         if (!gst_structure_get_uint (s, "seqnum", &seqnum))
345           seqnum = -1;
346 
347         /* retrieve ssrc of the packet that need to be retransmitted
348          * it's useful when reconstructing the original packet from the rtx packet */
349         if (!gst_structure_get_uint (s, "ssrc", &ssrc))
350           ssrc = -1;
351 
352         GST_DEBUG_OBJECT (rtx, "got rtx request for seqnum: %u, ssrc: %X",
353             seqnum, ssrc);
354 
355         GST_OBJECT_LOCK (rtx);
356 
357         /* increase number of seen requests for our statistics */
358         ++rtx->num_rtx_requests;
359 
360         /* First, we lookup in our map to see if we have already associate this
361          * master stream ssrc with its retransmitted stream.
362          * Every ssrc are unique so we can use the same hash table
363          * for both retrieving the ssrc1 from ssrc2 and also ssrc2 from ssrc1
364          */
365         if (g_hash_table_lookup_extended (rtx->ssrc2_ssrc1_map,
366                 GUINT_TO_POINTER (ssrc), NULL, &ssrc2)
367             && GPOINTER_TO_UINT (ssrc2) != GPOINTER_TO_UINT (ssrc)) {
368           GST_TRACE_OBJECT (rtx, "Retransmitted stream %X already associated "
369               "to its master, %X", GPOINTER_TO_UINT (ssrc2), ssrc);
370         } else {
371           SsrcAssoc *assoc;
372 
373           /* not already associated but also we have to check that we have not
374            * already considered this request.
375            */
376           if (g_hash_table_lookup_extended (rtx->seqnum_ssrc1_map,
377                   GUINT_TO_POINTER (seqnum), NULL, (gpointer *) & assoc)) {
378             if (assoc->ssrc == ssrc) {
379               /* same seqnum, same ssrc */
380 
381               /* do nothing because we have already considered this request
382                * The jitter may be too impatient of the rtx packet has been
383                * lost too.
384                * It does not mean we reject the event, we still want to forward
385                * the request to the gstrtpsession to be translator into a FB NACK
386                */
387               GST_LOG_OBJECT (rtx, "Duplicate request: seqnum: %u, ssrc: %X",
388                   seqnum, ssrc);
389             } else {
390               /* same seqnum, different ssrc */
391 
392               /* If the association attempt is larger than ASSOC_TIMEOUT,
393                * then we give up on it, and try this one.
394                */
395               if (!GST_CLOCK_TIME_IS_VALID (rtx->last_time) ||
396                   !GST_CLOCK_TIME_IS_VALID (assoc->time) ||
397                   assoc->time + ASSOC_TIMEOUT < rtx->last_time) {
398                 /* From RFC 4588:
399                  * the receiver MUST NOT have two outstanding requests for the
400                  * same packet sequence number in two different original streams
401                  * before the association is resolved. Otherwise it's impossible
402                  * to associate a rtx stream and its master stream
403                  */
404 
405                 /* remove seqnum in order to reuse the spot */
406                 g_hash_table_remove (rtx->seqnum_ssrc1_map,
407                     GUINT_TO_POINTER (seqnum));
408                 goto retransmit;
409               } else {
410                 GST_INFO_OBJECT (rtx, "rejecting request for seqnum %u"
411                     " of master stream %X; there is already a pending request "
412                     "for the same seqnum on ssrc %X that has not expired",
413                     seqnum, ssrc, assoc->ssrc);
414 
415                 /* do not forward the event as we are rejecting this request */
416                 GST_OBJECT_UNLOCK (rtx);
417                 gst_event_unref (event);
418                 return TRUE;
419               }
420             }
421           } else {
422           retransmit:
423             /* the request has not been already considered
424              * insert it for the first time */
425             g_hash_table_insert (rtx->seqnum_ssrc1_map,
426                 GUINT_TO_POINTER (seqnum),
427                 ssrc_assoc_new (ssrc, rtx->last_time));
428           }
429         }
430 
431         GST_DEBUG_OBJECT (rtx, "packet number %u of master stream %X"
432             " needs to be retransmitted", seqnum, ssrc);
433 
434         GST_OBJECT_UNLOCK (rtx);
435       }
436 
437       /* Transfer event upstream so that the request can actually by translated
438        * through gstrtpsession through the network */
439       res = gst_pad_event_default (pad, parent, event);
440       break;
441     }
442     default:
443       res = gst_pad_event_default (pad, parent, event);
444       break;
445   }
446   return res;
447 }
448 
449 /* Copy fixed header and extension. Replace current ssrc by ssrc1,
450  * remove OSN and replace current seq num by OSN.
451  * Copy memory to avoid to manually copy each rtp buffer field.
452  */
453 static GstBuffer *
_gst_rtp_buffer_new_from_rtx(GstRTPBuffer * rtp,guint32 ssrc1,guint16 orign_seqnum,guint8 origin_payload_type)454 _gst_rtp_buffer_new_from_rtx (GstRTPBuffer * rtp, guint32 ssrc1,
455     guint16 orign_seqnum, guint8 origin_payload_type)
456 {
457   GstMemory *mem = NULL;
458   GstRTPBuffer new_rtp = GST_RTP_BUFFER_INIT;
459   GstBuffer *new_buffer = gst_buffer_new ();
460   GstMapInfo map;
461   guint payload_len = 0;
462 
463   /* copy fixed header */
464   mem = gst_memory_copy (rtp->map[0].memory,
465       (guint8 *) rtp->data[0] - rtp->map[0].data, rtp->size[0]);
466   gst_buffer_append_memory (new_buffer, mem);
467 
468   /* copy extension if any */
469   if (rtp->size[1]) {
470     mem = gst_memory_copy (rtp->map[1].memory,
471         (guint8 *) rtp->data[1] - rtp->map[1].data, rtp->size[1]);
472     gst_buffer_append_memory (new_buffer, mem);
473   }
474 
475   /* copy payload and remove OSN */
476   payload_len = rtp->size[2] - 2;
477   mem = gst_allocator_alloc (NULL, payload_len, NULL);
478 
479   gst_memory_map (mem, &map, GST_MAP_WRITE);
480   if (rtp->size[2])
481     memcpy (map.data, (guint8 *) rtp->data[2] + 2, payload_len);
482   gst_memory_unmap (mem, &map);
483   gst_buffer_append_memory (new_buffer, mem);
484 
485   /* the sender always constructs rtx packets without padding,
486    * But the receiver can still receive rtx packets with padding.
487    * So just copy it.
488    */
489   if (rtp->size[3]) {
490     guint pad_len = rtp->size[3];
491 
492     mem = gst_allocator_alloc (NULL, pad_len, NULL);
493 
494     gst_memory_map (mem, &map, GST_MAP_WRITE);
495     map.data[pad_len - 1] = pad_len;
496     gst_memory_unmap (mem, &map);
497 
498     gst_buffer_append_memory (new_buffer, mem);
499   }
500 
501   /* set ssrc and seq num */
502   gst_rtp_buffer_map (new_buffer, GST_MAP_WRITE, &new_rtp);
503   gst_rtp_buffer_set_ssrc (&new_rtp, ssrc1);
504   gst_rtp_buffer_set_seq (&new_rtp, orign_seqnum);
505   gst_rtp_buffer_set_payload_type (&new_rtp, origin_payload_type);
506   gst_rtp_buffer_unmap (&new_rtp);
507 
508   gst_buffer_copy_into (new_buffer, rtp->buffer,
509       GST_BUFFER_COPY_FLAGS | GST_BUFFER_COPY_TIMESTAMPS, 0, -1);
510   GST_BUFFER_FLAG_SET (new_buffer, GST_RTP_BUFFER_FLAG_RETRANSMISSION);
511 
512   return new_buffer;
513 }
514 
515 static GstFlowReturn
gst_rtp_rtx_receive_chain(GstPad * pad,GstObject * parent,GstBuffer * buffer)516 gst_rtp_rtx_receive_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
517 {
518   GstRtpRtxReceive *rtx = GST_RTP_RTX_RECEIVE_CAST (parent);
519   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
520   GstFlowReturn ret = GST_FLOW_OK;
521   GstBuffer *new_buffer = NULL;
522   guint32 ssrc = 0;
523   gpointer ssrc1 = 0;
524   guint32 ssrc2 = 0;
525   guint16 seqnum = 0;
526   guint16 orign_seqnum = 0;
527   guint8 payload_type = 0;
528   gpointer payload = NULL;
529   guint8 origin_payload_type = 0;
530   gboolean is_rtx;
531   gboolean drop = FALSE;
532 
533   /* map current rtp packet to parse its header */
534   if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp))
535     goto invalid_buffer;
536 
537   ssrc = gst_rtp_buffer_get_ssrc (&rtp);
538   seqnum = gst_rtp_buffer_get_seq (&rtp);
539   payload_type = gst_rtp_buffer_get_payload_type (&rtp);
540 
541   /* check if we have a retransmission packet (this information comes from SDP) */
542   GST_OBJECT_LOCK (rtx);
543 
544   is_rtx =
545       g_hash_table_lookup_extended (rtx->rtx_pt_map,
546       GUINT_TO_POINTER (payload_type), NULL, NULL);
547 
548   if (is_rtx) {
549     payload = gst_rtp_buffer_get_payload (&rtp);
550 
551     if (!payload || gst_rtp_buffer_get_payload_len (&rtp) < 2) {
552       GST_OBJECT_UNLOCK (rtx);
553       gst_rtp_buffer_unmap (&rtp);
554       goto invalid_buffer;
555     }
556   }
557 
558   rtx->last_time = GST_BUFFER_PTS (buffer);
559 
560   if (g_hash_table_size (rtx->seqnum_ssrc1_map) > 0) {
561     GHashTableIter iter;
562     gpointer key, value;
563 
564     g_hash_table_iter_init (&iter, rtx->seqnum_ssrc1_map);
565     while (g_hash_table_iter_next (&iter, &key, &value)) {
566       SsrcAssoc *assoc = value;
567 
568       /* remove association request if it is too old */
569       if (GST_CLOCK_TIME_IS_VALID (rtx->last_time) &&
570           GST_CLOCK_TIME_IS_VALID (assoc->time) &&
571           assoc->time + ASSOC_TIMEOUT < rtx->last_time) {
572         g_hash_table_iter_remove (&iter);
573       }
574     }
575   }
576 
577   /* if the current packet is from a retransmission stream */
578   if (is_rtx) {
579     /* increase our statistic */
580     ++rtx->num_rtx_packets;
581 
582     /* read OSN in the rtx payload */
583     orign_seqnum = GST_READ_UINT16_BE (gst_rtp_buffer_get_payload (&rtp));
584     origin_payload_type =
585         GPOINTER_TO_UINT (g_hash_table_lookup (rtx->rtx_pt_map,
586             GUINT_TO_POINTER (payload_type)));
587 
588     GST_DEBUG_OBJECT (rtx, "Got rtx packet: rtx seqnum %u, rtx ssrc %X, "
589         "rtx pt %u, orig seqnum %u, orig pt %u", seqnum, ssrc, payload_type,
590         orign_seqnum, origin_payload_type);
591 
592     /* first we check if we already have associated this retransmission stream
593      * to a master stream */
594     if (g_hash_table_lookup_extended (rtx->ssrc2_ssrc1_map,
595             GUINT_TO_POINTER (ssrc), NULL, &ssrc1)) {
596       GST_TRACE_OBJECT (rtx,
597           "packet is from retransmission stream %X already associated to "
598           "master stream %X", ssrc, GPOINTER_TO_UINT (ssrc1));
599       ssrc2 = ssrc;
600     } else {
601       SsrcAssoc *assoc;
602 
603       /* the current retransmitted packet has its rtx stream not already
604        * associated to a master stream, so retrieve it from our request
605        * history */
606       if (g_hash_table_lookup_extended (rtx->seqnum_ssrc1_map,
607               GUINT_TO_POINTER (orign_seqnum), NULL, (gpointer *) & assoc)) {
608         GST_LOG_OBJECT (rtx,
609             "associating retransmitted stream %X to master stream %X thanks "
610             "to rtx packet %u (orig seqnum %u)", ssrc, assoc->ssrc, seqnum,
611             orign_seqnum);
612         ssrc1 = GUINT_TO_POINTER (assoc->ssrc);
613         ssrc2 = ssrc;
614 
615         /* just put a guard */
616         if (GPOINTER_TO_UINT (ssrc1) == ssrc2)
617           GST_WARNING_OBJECT (rtx, "RTX receiver ssrc2_ssrc1_map bad state, "
618               "master and rtx SSRCs are the same (%X)\n", ssrc);
619 
620         /* free the spot so that this seqnum can be used to do another
621          * association */
622         g_hash_table_remove (rtx->seqnum_ssrc1_map,
623             GUINT_TO_POINTER (orign_seqnum));
624 
625         /* actually do the association between rtx stream and master stream */
626         g_hash_table_insert (rtx->ssrc2_ssrc1_map, GUINT_TO_POINTER (ssrc2),
627             ssrc1);
628 
629         /* also do the association between master stream and rtx stream
630          * every ssrc are unique so we can use the same hash table
631          * for both retrieving the ssrc1 from ssrc2 and also ssrc2 from ssrc1
632          */
633         g_hash_table_insert (rtx->ssrc2_ssrc1_map, ssrc1,
634             GUINT_TO_POINTER (ssrc2));
635 
636       } else {
637         /* we are not able to associate this rtx packet with a master stream */
638         GST_INFO_OBJECT (rtx,
639             "dropping rtx packet %u because its orig seqnum (%u) is not in our"
640             " pending retransmission requests", seqnum, orign_seqnum);
641         drop = TRUE;
642       }
643     }
644   }
645 
646   /* if not dropped the packet was successfully associated */
647   if (is_rtx && !drop)
648     ++rtx->num_rtx_assoc_packets;
649 
650   GST_OBJECT_UNLOCK (rtx);
651 
652   /* just drop the packet if the association could not have been made */
653   if (drop) {
654     gst_rtp_buffer_unmap (&rtp);
655     gst_buffer_unref (buffer);
656     return GST_FLOW_OK;
657   }
658 
659   /* create the retransmission packet */
660   if (is_rtx)
661     new_buffer =
662         _gst_rtp_buffer_new_from_rtx (&rtp, GPOINTER_TO_UINT (ssrc1),
663         orign_seqnum, origin_payload_type);
664 
665   gst_rtp_buffer_unmap (&rtp);
666 
667   /* push the packet */
668   if (is_rtx) {
669     gst_buffer_unref (buffer);
670     GST_LOG_OBJECT (rtx, "pushing packet seqnum:%u from restransmission "
671         "stream ssrc: %X (master ssrc %X)", orign_seqnum, ssrc2,
672         GPOINTER_TO_UINT (ssrc1));
673     ret = gst_pad_push (rtx->srcpad, new_buffer);
674   } else {
675     GST_TRACE_OBJECT (rtx, "pushing packet seqnum:%u from master stream "
676         "ssrc: %X", seqnum, ssrc);
677     ret = gst_pad_push (rtx->srcpad, buffer);
678   }
679 
680   return ret;
681 
682 invalid_buffer:
683   {
684     GST_ELEMENT_WARNING (rtx, STREAM, DECODE, (NULL),
685         ("Received invalid RTP payload, dropping"));
686     gst_buffer_unref (buffer);
687     return GST_FLOW_OK;
688   }
689 }
690 
691 static void
gst_rtp_rtx_receive_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)692 gst_rtp_rtx_receive_get_property (GObject * object,
693     guint prop_id, GValue * value, GParamSpec * pspec)
694 {
695   GstRtpRtxReceive *rtx = GST_RTP_RTX_RECEIVE_CAST (object);
696 
697   switch (prop_id) {
698     case PROP_PAYLOAD_TYPE_MAP:
699       GST_OBJECT_LOCK (rtx);
700       g_value_set_boxed (value, rtx->rtx_pt_map_structure);
701       GST_OBJECT_UNLOCK (rtx);
702       break;
703     case PROP_NUM_RTX_REQUESTS:
704       GST_OBJECT_LOCK (rtx);
705       g_value_set_uint (value, rtx->num_rtx_requests);
706       GST_OBJECT_UNLOCK (rtx);
707       break;
708     case PROP_NUM_RTX_PACKETS:
709       GST_OBJECT_LOCK (rtx);
710       g_value_set_uint (value, rtx->num_rtx_packets);
711       GST_OBJECT_UNLOCK (rtx);
712       break;
713     case PROP_NUM_RTX_ASSOC_PACKETS:
714       GST_OBJECT_LOCK (rtx);
715       g_value_set_uint (value, rtx->num_rtx_assoc_packets);
716       GST_OBJECT_UNLOCK (rtx);
717       break;
718     default:
719       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
720       break;
721   }
722 }
723 
724 static gboolean
structure_to_hash_table_inv(GQuark field_id,const GValue * value,gpointer hash)725 structure_to_hash_table_inv (GQuark field_id, const GValue * value,
726     gpointer hash)
727 {
728   const gchar *field_str;
729   guint field_uint;
730   guint value_uint;
731 
732   field_str = g_quark_to_string (field_id);
733   field_uint = atoi (field_str);
734   value_uint = g_value_get_uint (value);
735   g_hash_table_insert ((GHashTable *) hash, GUINT_TO_POINTER (value_uint),
736       GUINT_TO_POINTER (field_uint));
737 
738   return TRUE;
739 }
740 
741 static void
gst_rtp_rtx_receive_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)742 gst_rtp_rtx_receive_set_property (GObject * object,
743     guint prop_id, const GValue * value, GParamSpec * pspec)
744 {
745   GstRtpRtxReceive *rtx = GST_RTP_RTX_RECEIVE_CAST (object);
746 
747   switch (prop_id) {
748     case PROP_PAYLOAD_TYPE_MAP:
749       GST_OBJECT_LOCK (rtx);
750       if (rtx->rtx_pt_map_structure)
751         gst_structure_free (rtx->rtx_pt_map_structure);
752       rtx->rtx_pt_map_structure = g_value_dup_boxed (value);
753       g_hash_table_remove_all (rtx->rtx_pt_map);
754       gst_structure_foreach (rtx->rtx_pt_map_structure,
755           structure_to_hash_table_inv, rtx->rtx_pt_map);
756       GST_OBJECT_UNLOCK (rtx);
757       break;
758     default:
759       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
760       break;
761   }
762 }
763 
764 static GstStateChangeReturn
gst_rtp_rtx_receive_change_state(GstElement * element,GstStateChange transition)765 gst_rtp_rtx_receive_change_state (GstElement * element,
766     GstStateChange transition)
767 {
768   GstStateChangeReturn ret;
769   GstRtpRtxReceive *rtx;
770 
771   rtx = GST_RTP_RTX_RECEIVE_CAST (element);
772 
773   switch (transition) {
774     default:
775       break;
776   }
777 
778   ret =
779       GST_ELEMENT_CLASS (gst_rtp_rtx_receive_parent_class)->change_state
780       (element, transition);
781 
782   switch (transition) {
783     case GST_STATE_CHANGE_PAUSED_TO_READY:
784       gst_rtp_rtx_receive_reset (rtx);
785       break;
786     default:
787       break;
788   }
789 
790   return ret;
791 }
792