• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* RTP Retransmission sender element for GStreamer
2  *
3  * gstrtprtxsend.c:
4  *
5  * Copyright (C) 2013 Collabora Ltd.
6  *   @author Julien Isorce <julien.isorce@collabora.co.uk>
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
21  * Boston, MA 02110-1301, USA.
22  */
23 
24 /**
25  * SECTION:element-rtprtxsend
26  * @title: rtprtxsend
27  *
28  * See #GstRtpRtxReceive for examples
29  *
30  * The purpose of the sender RTX object is to keep a history of RTP packets up
31  * to a configurable limit (max-size-time or max-size-packets). It will listen
32  * for upstream custom retransmission events (GstRTPRetransmissionRequest) that
33  * comes from downstream (#GstRtpSession). When receiving a request it will
34  * look up the requested seqnum in its list of stored packets. If the packet
35  * is available, it will create a RTX packet according to RFC 4588 and send
36  * this as an auxiliary stream. RTX is SSRC-multiplexed
37  */
38 
39 #ifdef HAVE_CONFIG_H
40 #include "config.h"
41 #endif
42 
43 #include <gst/gst.h>
44 #include <gst/rtp/gstrtpbuffer.h>
45 #include <string.h>
46 #include <stdlib.h>
47 
48 #include "gstrtprtxsend.h"
49 
50 GST_DEBUG_CATEGORY_STATIC (gst_rtp_rtx_send_debug);
51 #define GST_CAT_DEFAULT gst_rtp_rtx_send_debug
52 
53 #define DEFAULT_RTX_PAYLOAD_TYPE 0
54 #define DEFAULT_MAX_SIZE_TIME    0
55 #define DEFAULT_MAX_SIZE_PACKETS 100
56 
57 enum
58 {
59   PROP_0,
60   PROP_SSRC_MAP,
61   PROP_PAYLOAD_TYPE_MAP,
62   PROP_MAX_SIZE_TIME,
63   PROP_MAX_SIZE_PACKETS,
64   PROP_NUM_RTX_REQUESTS,
65   PROP_NUM_RTX_PACKETS,
66   PROP_CLOCK_RATE_MAP,
67 };
68 
69 static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src",
70     GST_PAD_SRC,
71     GST_PAD_ALWAYS,
72     GST_STATIC_CAPS ("application/x-rtp")
73     );
74 
75 static GstStaticPadTemplate sink_factory = GST_STATIC_PAD_TEMPLATE ("sink",
76     GST_PAD_SINK,
77     GST_PAD_ALWAYS,
78     GST_STATIC_CAPS ("application/x-rtp")
79     );
80 
81 static gboolean gst_rtp_rtx_send_queue_check_full (GstDataQueue * queue,
82     guint visible, guint bytes, guint64 time, gpointer checkdata);
83 
84 static gboolean gst_rtp_rtx_send_src_event (GstPad * pad, GstObject * parent,
85     GstEvent * event);
86 static gboolean gst_rtp_rtx_send_sink_event (GstPad * pad, GstObject * parent,
87     GstEvent * event);
88 static GstFlowReturn gst_rtp_rtx_send_chain (GstPad * pad, GstObject * parent,
89     GstBuffer * buffer);
90 static GstFlowReturn gst_rtp_rtx_send_chain_list (GstPad * pad,
91     GstObject * parent, GstBufferList * list);
92 
93 static void gst_rtp_rtx_send_src_loop (GstRtpRtxSend * rtx);
94 static gboolean gst_rtp_rtx_send_activate_mode (GstPad * pad,
95     GstObject * parent, GstPadMode mode, gboolean active);
96 
97 static GstStateChangeReturn gst_rtp_rtx_send_change_state (GstElement *
98     element, GstStateChange transition);
99 
100 static void gst_rtp_rtx_send_set_property (GObject * object, guint prop_id,
101     const GValue * value, GParamSpec * pspec);
102 static void gst_rtp_rtx_send_get_property (GObject * object, guint prop_id,
103     GValue * value, GParamSpec * pspec);
104 static void gst_rtp_rtx_send_finalize (GObject * object);
105 
106 G_DEFINE_TYPE_WITH_CODE (GstRtpRtxSend, gst_rtp_rtx_send, GST_TYPE_ELEMENT,
107     GST_DEBUG_CATEGORY_INIT (gst_rtp_rtx_send_debug, "rtprtxsend", 0,
108         "rtp retransmission sender"));
109 GST_ELEMENT_REGISTER_DEFINE (rtprtxsend, "rtprtxsend", GST_RANK_NONE,
110     GST_TYPE_RTP_RTX_SEND);
111 
112 typedef struct
113 {
114   guint16 seqnum;
115   guint32 timestamp;
116   GstBuffer *buffer;
117 } BufferQueueItem;
118 
119 static void
buffer_queue_item_free(BufferQueueItem * item)120 buffer_queue_item_free (BufferQueueItem * item)
121 {
122   gst_buffer_unref (item->buffer);
123   g_slice_free (BufferQueueItem, item);
124 }
125 
126 typedef struct
127 {
128   guint32 rtx_ssrc;
129   guint16 seqnum_base, next_seqnum;
130   gint clock_rate;
131 
132   /* history of rtp packets */
133   GSequence *queue;
134 } SSRCRtxData;
135 
136 static SSRCRtxData *
ssrc_rtx_data_new(guint32 rtx_ssrc)137 ssrc_rtx_data_new (guint32 rtx_ssrc)
138 {
139   SSRCRtxData *data = g_slice_new0 (SSRCRtxData);
140 
141   data->rtx_ssrc = rtx_ssrc;
142   data->next_seqnum = data->seqnum_base = g_random_int_range (0, G_MAXUINT16);
143   data->queue = g_sequence_new ((GDestroyNotify) buffer_queue_item_free);
144 
145   return data;
146 }
147 
148 static void
ssrc_rtx_data_free(SSRCRtxData * data)149 ssrc_rtx_data_free (SSRCRtxData * data)
150 {
151   g_sequence_free (data->queue);
152   g_slice_free (SSRCRtxData, data);
153 }
154 
155 static void
gst_rtp_rtx_send_class_init(GstRtpRtxSendClass * klass)156 gst_rtp_rtx_send_class_init (GstRtpRtxSendClass * klass)
157 {
158   GObjectClass *gobject_class;
159   GstElementClass *gstelement_class;
160 
161   gobject_class = (GObjectClass *) klass;
162   gstelement_class = (GstElementClass *) klass;
163 
164   gobject_class->get_property = gst_rtp_rtx_send_get_property;
165   gobject_class->set_property = gst_rtp_rtx_send_set_property;
166   gobject_class->finalize = gst_rtp_rtx_send_finalize;
167 
168   g_object_class_install_property (gobject_class, PROP_SSRC_MAP,
169       g_param_spec_boxed ("ssrc-map", "SSRC Map",
170           "Map of SSRCs to their retransmission SSRCs for SSRC-multiplexed mode"
171           " (default = random)", GST_TYPE_STRUCTURE,
172           G_PARAM_WRITABLE | G_PARAM_STATIC_STRINGS));
173 
174   g_object_class_install_property (gobject_class, PROP_PAYLOAD_TYPE_MAP,
175       g_param_spec_boxed ("payload-type-map", "Payload Type Map",
176           "Map of original payload types to their retransmission payload types",
177           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
178 
179   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
180       g_param_spec_uint ("max-size-time", "Max Size Time",
181           "Amount of ms to queue (0 = unlimited)", 0, G_MAXUINT,
182           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
183 
184   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_PACKETS,
185       g_param_spec_uint ("max-size-packets", "Max Size Packets",
186           "Amount of packets to queue (0 = unlimited)", 0, G_MAXINT16,
187           DEFAULT_MAX_SIZE_PACKETS,
188           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
189 
190   g_object_class_install_property (gobject_class, PROP_NUM_RTX_REQUESTS,
191       g_param_spec_uint ("num-rtx-requests", "Num RTX Requests",
192           "Number of retransmission events received", 0, G_MAXUINT,
193           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
194 
195   g_object_class_install_property (gobject_class, PROP_NUM_RTX_PACKETS,
196       g_param_spec_uint ("num-rtx-packets", "Num RTX Packets",
197           " Number of retransmission packets sent", 0, G_MAXUINT,
198           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
199 
200   g_object_class_install_property (gobject_class, PROP_CLOCK_RATE_MAP,
201       g_param_spec_boxed ("clock-rate-map", "Clock Rate Map",
202           "Map of payload types to their clock rates",
203           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
204 
205   gst_element_class_add_static_pad_template (gstelement_class, &src_factory);
206   gst_element_class_add_static_pad_template (gstelement_class, &sink_factory);
207 
208   gst_element_class_set_static_metadata (gstelement_class,
209       "RTP Retransmission Sender", "Codec",
210       "Retransmit RTP packets when needed, according to RFC4588",
211       "Julien Isorce <julien.isorce@collabora.co.uk>");
212 
213   gstelement_class->change_state =
214       GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_change_state);
215 }
216 
217 static void
gst_rtp_rtx_send_reset(GstRtpRtxSend * rtx)218 gst_rtp_rtx_send_reset (GstRtpRtxSend * rtx)
219 {
220   GST_OBJECT_LOCK (rtx);
221   gst_data_queue_flush (rtx->queue);
222   g_hash_table_remove_all (rtx->ssrc_data);
223   g_hash_table_remove_all (rtx->rtx_ssrcs);
224   rtx->num_rtx_requests = 0;
225   rtx->num_rtx_packets = 0;
226   GST_OBJECT_UNLOCK (rtx);
227 }
228 
229 static void
gst_rtp_rtx_send_finalize(GObject * object)230 gst_rtp_rtx_send_finalize (GObject * object)
231 {
232   GstRtpRtxSend *rtx = GST_RTP_RTX_SEND_CAST (object);
233 
234   g_hash_table_unref (rtx->ssrc_data);
235   g_hash_table_unref (rtx->rtx_ssrcs);
236   if (rtx->external_ssrc_map)
237     gst_structure_free (rtx->external_ssrc_map);
238   g_hash_table_unref (rtx->rtx_pt_map);
239   if (rtx->rtx_pt_map_structure)
240     gst_structure_free (rtx->rtx_pt_map_structure);
241   g_hash_table_unref (rtx->clock_rate_map);
242   if (rtx->clock_rate_map_structure)
243     gst_structure_free (rtx->clock_rate_map_structure);
244   g_object_unref (rtx->queue);
245 
246   G_OBJECT_CLASS (gst_rtp_rtx_send_parent_class)->finalize (object);
247 }
248 
249 static void
gst_rtp_rtx_send_init(GstRtpRtxSend * rtx)250 gst_rtp_rtx_send_init (GstRtpRtxSend * rtx)
251 {
252   GstElementClass *klass = GST_ELEMENT_GET_CLASS (rtx);
253 
254   rtx->srcpad =
255       gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
256           "src"), "src");
257   GST_PAD_SET_PROXY_CAPS (rtx->srcpad);
258   GST_PAD_SET_PROXY_ALLOCATION (rtx->srcpad);
259   gst_pad_set_event_function (rtx->srcpad,
260       GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_src_event));
261   gst_pad_set_activatemode_function (rtx->srcpad,
262       GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_activate_mode));
263   gst_element_add_pad (GST_ELEMENT (rtx), rtx->srcpad);
264 
265   rtx->sinkpad =
266       gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
267           "sink"), "sink");
268   GST_PAD_SET_PROXY_CAPS (rtx->sinkpad);
269   GST_PAD_SET_PROXY_ALLOCATION (rtx->sinkpad);
270   gst_pad_set_event_function (rtx->sinkpad,
271       GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_sink_event));
272   gst_pad_set_chain_function (rtx->sinkpad,
273       GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_chain));
274   gst_pad_set_chain_list_function (rtx->sinkpad,
275       GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_chain_list));
276   gst_element_add_pad (GST_ELEMENT (rtx), rtx->sinkpad);
277 
278   rtx->queue = gst_data_queue_new (gst_rtp_rtx_send_queue_check_full, NULL,
279       NULL, rtx);
280   rtx->ssrc_data = g_hash_table_new_full (g_direct_hash, g_direct_equal,
281       NULL, (GDestroyNotify) ssrc_rtx_data_free);
282   rtx->rtx_ssrcs = g_hash_table_new (g_direct_hash, g_direct_equal);
283   rtx->rtx_pt_map = g_hash_table_new (g_direct_hash, g_direct_equal);
284   rtx->clock_rate_map = g_hash_table_new (g_direct_hash, g_direct_equal);
285 
286   rtx->max_size_time = DEFAULT_MAX_SIZE_TIME;
287   rtx->max_size_packets = DEFAULT_MAX_SIZE_PACKETS;
288 }
289 
290 static void
gst_rtp_rtx_send_set_flushing(GstRtpRtxSend * rtx,gboolean flush)291 gst_rtp_rtx_send_set_flushing (GstRtpRtxSend * rtx, gboolean flush)
292 {
293   GST_OBJECT_LOCK (rtx);
294   gst_data_queue_set_flushing (rtx->queue, flush);
295   gst_data_queue_flush (rtx->queue);
296   GST_OBJECT_UNLOCK (rtx);
297 }
298 
299 static gboolean
gst_rtp_rtx_send_queue_check_full(GstDataQueue * queue,guint visible,guint bytes,guint64 time,gpointer checkdata)300 gst_rtp_rtx_send_queue_check_full (GstDataQueue * queue,
301     guint visible, guint bytes, guint64 time, gpointer checkdata)
302 {
303   return FALSE;
304 }
305 
306 static void
gst_rtp_rtx_data_queue_item_free(gpointer item)307 gst_rtp_rtx_data_queue_item_free (gpointer item)
308 {
309   GstDataQueueItem *data = item;
310   if (data->object)
311     gst_mini_object_unref (data->object);
312   g_slice_free (GstDataQueueItem, data);
313 }
314 
315 static gboolean
gst_rtp_rtx_send_push_out(GstRtpRtxSend * rtx,gpointer object)316 gst_rtp_rtx_send_push_out (GstRtpRtxSend * rtx, gpointer object)
317 {
318   GstDataQueueItem *data;
319   gboolean success;
320 
321   data = g_slice_new0 (GstDataQueueItem);
322   data->object = GST_MINI_OBJECT (object);
323   data->size = 1;
324   data->duration = 1;
325   data->visible = TRUE;
326   data->destroy = gst_rtp_rtx_data_queue_item_free;
327 
328   success = gst_data_queue_push (rtx->queue, data);
329 
330   if (!success)
331     data->destroy (data);
332 
333   return success;
334 }
335 
336 static guint32
gst_rtp_rtx_send_choose_ssrc(GstRtpRtxSend * rtx,guint32 choice,gboolean consider_choice)337 gst_rtp_rtx_send_choose_ssrc (GstRtpRtxSend * rtx, guint32 choice,
338     gboolean consider_choice)
339 {
340   guint32 ssrc = consider_choice ? choice : g_random_int ();
341 
342   /* make sure to be different than any other */
343   while (g_hash_table_contains (rtx->ssrc_data, GUINT_TO_POINTER (ssrc)) ||
344       g_hash_table_contains (rtx->rtx_ssrcs, GUINT_TO_POINTER (ssrc))) {
345     ssrc = g_random_int ();
346   }
347 
348   return ssrc;
349 }
350 
351 static SSRCRtxData *
gst_rtp_rtx_send_get_ssrc_data(GstRtpRtxSend * rtx,guint32 ssrc)352 gst_rtp_rtx_send_get_ssrc_data (GstRtpRtxSend * rtx, guint32 ssrc)
353 {
354   SSRCRtxData *data;
355   guint32 rtx_ssrc = 0;
356   gboolean consider = FALSE;
357 
358   if (G_UNLIKELY (!g_hash_table_contains (rtx->ssrc_data,
359               GUINT_TO_POINTER (ssrc)))) {
360     if (rtx->external_ssrc_map) {
361       gchar *ssrc_str;
362       ssrc_str = g_strdup_printf ("%" G_GUINT32_FORMAT, ssrc);
363       consider = gst_structure_get_uint (rtx->external_ssrc_map, ssrc_str,
364           &rtx_ssrc);
365       g_free (ssrc_str);
366     }
367     rtx_ssrc = gst_rtp_rtx_send_choose_ssrc (rtx, rtx_ssrc, consider);
368     data = ssrc_rtx_data_new (rtx_ssrc);
369     g_hash_table_insert (rtx->ssrc_data, GUINT_TO_POINTER (ssrc), data);
370     g_hash_table_insert (rtx->rtx_ssrcs, GUINT_TO_POINTER (rtx_ssrc),
371         GUINT_TO_POINTER (ssrc));
372   } else {
373     data = g_hash_table_lookup (rtx->ssrc_data, GUINT_TO_POINTER (ssrc));
374   }
375   return data;
376 }
377 
378 /* Copy fixed header and extension. Add OSN before to copy payload
379  * Copy memory to avoid to manually copy each rtp buffer field.
380  */
381 static GstBuffer *
gst_rtp_rtx_buffer_new(GstRtpRtxSend * rtx,GstBuffer * buffer)382 gst_rtp_rtx_buffer_new (GstRtpRtxSend * rtx, GstBuffer * buffer)
383 {
384   GstMemory *mem = NULL;
385   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
386   GstRTPBuffer new_rtp = GST_RTP_BUFFER_INIT;
387   GstBuffer *new_buffer = gst_buffer_new ();
388   GstMapInfo map;
389   guint payload_len = 0;
390   SSRCRtxData *data;
391   guint32 ssrc;
392   guint16 seqnum;
393   guint8 fmtp;
394 
395   gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp);
396 
397   /* get needed data from GstRtpRtxSend */
398   ssrc = gst_rtp_buffer_get_ssrc (&rtp);
399   data = gst_rtp_rtx_send_get_ssrc_data (rtx, ssrc);
400   ssrc = data->rtx_ssrc;
401   seqnum = data->next_seqnum++;
402   fmtp = GPOINTER_TO_UINT (g_hash_table_lookup (rtx->rtx_pt_map,
403           GUINT_TO_POINTER (gst_rtp_buffer_get_payload_type (&rtp))));
404 
405   GST_DEBUG_OBJECT (rtx, "creating rtx buffer, orig seqnum: %u, "
406       "rtx seqnum: %u, rtx ssrc: %X", gst_rtp_buffer_get_seq (&rtp),
407       seqnum, ssrc);
408 
409   /* gst_rtp_buffer_map does not map the payload so do it now */
410   gst_rtp_buffer_get_payload (&rtp);
411 
412   /* copy fixed header */
413   mem = gst_memory_copy (rtp.map[0].memory, 0, rtp.size[0]);
414   gst_buffer_append_memory (new_buffer, mem);
415 
416   /* copy extension if any */
417   if (rtp.size[1]) {
418     mem = gst_allocator_alloc (NULL, rtp.size[1], NULL);
419     gst_memory_map (mem, &map, GST_MAP_WRITE);
420     memcpy (map.data, rtp.data[1], rtp.size[1]);
421     gst_memory_unmap (mem, &map);
422     gst_buffer_append_memory (new_buffer, mem);
423   }
424 
425   /* copy payload and add OSN just before */
426   payload_len = 2 + rtp.size[2];
427   mem = gst_allocator_alloc (NULL, payload_len, NULL);
428 
429   gst_memory_map (mem, &map, GST_MAP_WRITE);
430   GST_WRITE_UINT16_BE (map.data, gst_rtp_buffer_get_seq (&rtp));
431   if (rtp.size[2])
432     memcpy (map.data + 2, rtp.data[2], rtp.size[2]);
433   gst_memory_unmap (mem, &map);
434   gst_buffer_append_memory (new_buffer, mem);
435 
436   /* everything needed is copied */
437   gst_rtp_buffer_unmap (&rtp);
438 
439   /* set ssrc, seqnum and fmtp */
440   gst_rtp_buffer_map (new_buffer, GST_MAP_WRITE, &new_rtp);
441   gst_rtp_buffer_set_ssrc (&new_rtp, ssrc);
442   gst_rtp_buffer_set_seq (&new_rtp, seqnum);
443   gst_rtp_buffer_set_payload_type (&new_rtp, fmtp);
444   /* RFC 4588: let other elements do the padding, as normal */
445   gst_rtp_buffer_set_padding (&new_rtp, FALSE);
446   gst_rtp_buffer_unmap (&new_rtp);
447 
448   /* Copy over timestamps */
449   gst_buffer_copy_into (new_buffer, buffer, GST_BUFFER_COPY_TIMESTAMPS, 0, -1);
450 
451   return new_buffer;
452 }
453 
454 static gint
buffer_queue_items_cmp(BufferQueueItem * a,BufferQueueItem * b,gpointer user_data)455 buffer_queue_items_cmp (BufferQueueItem * a, BufferQueueItem * b,
456     gpointer user_data)
457 {
458   /* gst_rtp_buffer_compare_seqnum returns the opposite of what we want,
459    * it returns negative when seqnum1 > seqnum2 and we want negative
460    * when b > a, i.e. a is smaller, so it comes first in the sequence */
461   return gst_rtp_buffer_compare_seqnum (b->seqnum, a->seqnum);
462 }
463 
464 static gboolean
gst_rtp_rtx_send_src_event(GstPad * pad,GstObject * parent,GstEvent * event)465 gst_rtp_rtx_send_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
466 {
467   GstRtpRtxSend *rtx = GST_RTP_RTX_SEND_CAST (parent);
468   gboolean res;
469 
470   switch (GST_EVENT_TYPE (event)) {
471     case GST_EVENT_CUSTOM_UPSTREAM:
472     {
473       const GstStructure *s = gst_event_get_structure (event);
474 
475       /* This event usually comes from the downstream gstrtpsession */
476       if (gst_structure_has_name (s, "GstRTPRetransmissionRequest")) {
477         guint seqnum = 0;
478         guint ssrc = 0;
479         GstBuffer *rtx_buf = NULL;
480 
481         /* retrieve seqnum of the packet that need to be retransmitted */
482         if (!gst_structure_get_uint (s, "seqnum", &seqnum))
483           seqnum = -1;
484 
485         /* retrieve ssrc of the packet that need to be retransmitted */
486         if (!gst_structure_get_uint (s, "ssrc", &ssrc))
487           ssrc = -1;
488 
489         GST_DEBUG_OBJECT (rtx, "got rtx request for seqnum: %u, ssrc: %X",
490             seqnum, ssrc);
491 
492         GST_OBJECT_LOCK (rtx);
493         /* check if request is for us */
494         if (g_hash_table_contains (rtx->ssrc_data, GUINT_TO_POINTER (ssrc))) {
495           SSRCRtxData *data;
496           GSequenceIter *iter;
497           BufferQueueItem search_item;
498 
499           /* update statistics */
500           ++rtx->num_rtx_requests;
501 
502           data = gst_rtp_rtx_send_get_ssrc_data (rtx, ssrc);
503 
504           search_item.seqnum = seqnum;
505           iter = g_sequence_lookup (data->queue, &search_item,
506               (GCompareDataFunc) buffer_queue_items_cmp, NULL);
507           if (iter) {
508             BufferQueueItem *item = g_sequence_get (iter);
509             GST_LOG_OBJECT (rtx, "found %u", item->seqnum);
510             rtx_buf = gst_rtp_rtx_buffer_new (rtx, item->buffer);
511           }
512 #ifndef GST_DISABLE_DEBUG
513           else {
514             BufferQueueItem *item = NULL;
515 
516             iter = g_sequence_get_begin_iter (data->queue);
517             if (!g_sequence_iter_is_end (iter))
518               item = g_sequence_get (iter);
519 
520             if (item && seqnum < item->seqnum) {
521               GST_DEBUG_OBJECT (rtx, "requested seqnum %u has already been "
522                   "removed from the rtx queue; the first available is %u",
523                   seqnum, item->seqnum);
524             } else {
525               GST_WARNING_OBJECT (rtx, "requested seqnum %u has not been "
526                   "transmitted yet in the original stream; either the remote end "
527                   "is not configured correctly, or the source is too slow",
528                   seqnum);
529             }
530           }
531 #endif
532         }
533         GST_OBJECT_UNLOCK (rtx);
534 
535         if (rtx_buf)
536           gst_rtp_rtx_send_push_out (rtx, rtx_buf);
537 
538         gst_event_unref (event);
539         res = TRUE;
540 
541         /* This event usually comes from the downstream gstrtpsession */
542       } else if (gst_structure_has_name (s, "GstRTPCollision")) {
543         guint ssrc = 0;
544 
545         if (!gst_structure_get_uint (s, "ssrc", &ssrc))
546           ssrc = -1;
547 
548         GST_DEBUG_OBJECT (rtx, "got ssrc collision, ssrc: %X", ssrc);
549 
550         GST_OBJECT_LOCK (rtx);
551 
552         /* choose another ssrc for our retransmitted stream */
553         if (g_hash_table_contains (rtx->rtx_ssrcs, GUINT_TO_POINTER (ssrc))) {
554           guint master_ssrc;
555           SSRCRtxData *data;
556 
557           master_ssrc = GPOINTER_TO_UINT (g_hash_table_lookup (rtx->rtx_ssrcs,
558                   GUINT_TO_POINTER (ssrc)));
559           data = gst_rtp_rtx_send_get_ssrc_data (rtx, master_ssrc);
560 
561           /* change rtx_ssrc and update the reverse map */
562           data->rtx_ssrc = gst_rtp_rtx_send_choose_ssrc (rtx, 0, FALSE);
563           g_hash_table_remove (rtx->rtx_ssrcs, GUINT_TO_POINTER (ssrc));
564           g_hash_table_insert (rtx->rtx_ssrcs,
565               GUINT_TO_POINTER (data->rtx_ssrc),
566               GUINT_TO_POINTER (master_ssrc));
567 
568           GST_OBJECT_UNLOCK (rtx);
569 
570           /* no need to forward to payloader because we make sure to have
571            * a different ssrc
572            */
573           gst_event_unref (event);
574           res = TRUE;
575         } else {
576           /* if master ssrc has collided, remove it from our data, as it
577            * is not going to be used any longer */
578           if (g_hash_table_contains (rtx->ssrc_data, GUINT_TO_POINTER (ssrc))) {
579             SSRCRtxData *data;
580             data = gst_rtp_rtx_send_get_ssrc_data (rtx, ssrc);
581             g_hash_table_remove (rtx->rtx_ssrcs,
582                 GUINT_TO_POINTER (data->rtx_ssrc));
583             g_hash_table_remove (rtx->ssrc_data, GUINT_TO_POINTER (ssrc));
584           }
585 
586           GST_OBJECT_UNLOCK (rtx);
587 
588           /* forward event to payloader in case collided ssrc is
589            * master stream */
590           res = gst_pad_event_default (pad, parent, event);
591         }
592       } else {
593         res = gst_pad_event_default (pad, parent, event);
594       }
595       break;
596     }
597     default:
598       res = gst_pad_event_default (pad, parent, event);
599       break;
600   }
601   return res;
602 }
603 
604 static gboolean
gst_rtp_rtx_send_sink_event(GstPad * pad,GstObject * parent,GstEvent * event)605 gst_rtp_rtx_send_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
606 {
607   GstRtpRtxSend *rtx = GST_RTP_RTX_SEND_CAST (parent);
608 
609   switch (GST_EVENT_TYPE (event)) {
610     case GST_EVENT_FLUSH_START:
611       gst_pad_push_event (rtx->srcpad, event);
612       gst_rtp_rtx_send_set_flushing (rtx, TRUE);
613       gst_pad_pause_task (rtx->srcpad);
614       return TRUE;
615     case GST_EVENT_FLUSH_STOP:
616       gst_pad_push_event (rtx->srcpad, event);
617       gst_rtp_rtx_send_set_flushing (rtx, FALSE);
618       gst_pad_start_task (rtx->srcpad,
619           (GstTaskFunction) gst_rtp_rtx_send_src_loop, rtx, NULL);
620       return TRUE;
621     case GST_EVENT_EOS:
622       GST_INFO_OBJECT (rtx, "Got EOS - enqueueing it");
623       gst_rtp_rtx_send_push_out (rtx, event);
624       return TRUE;
625     case GST_EVENT_CAPS:
626     {
627       GstCaps *caps;
628       GstStructure *s;
629       guint ssrc;
630       gint payload;
631       gpointer rtx_payload;
632       SSRCRtxData *data;
633 
634       gst_event_parse_caps (event, &caps);
635 
636       s = gst_caps_get_structure (caps, 0);
637       if (!gst_structure_get_uint (s, "ssrc", &ssrc))
638         ssrc = -1;
639       if (!gst_structure_get_int (s, "payload", &payload))
640         payload = -1;
641 
642       if (payload == -1 || ssrc == G_MAXUINT)
643         break;
644 
645       if (payload == -1)
646         GST_WARNING_OBJECT (rtx, "No payload in caps");
647 
648       GST_OBJECT_LOCK (rtx);
649       data = gst_rtp_rtx_send_get_ssrc_data (rtx, ssrc);
650       if (!g_hash_table_lookup_extended (rtx->rtx_pt_map,
651               GUINT_TO_POINTER (payload), NULL, &rtx_payload))
652         rtx_payload = GINT_TO_POINTER (-1);
653 
654       if (GPOINTER_TO_INT (rtx_payload) == -1 && payload != -1)
655         GST_WARNING_OBJECT (rtx, "Payload %d not in rtx-pt-map", payload);
656 
657       GST_DEBUG_OBJECT (rtx,
658           "got caps for payload: %d->%d, ssrc: %u->%u : %" GST_PTR_FORMAT,
659           payload, GPOINTER_TO_INT (rtx_payload), ssrc, data->rtx_ssrc, caps);
660 
661       gst_structure_get_int (s, "clock-rate", &data->clock_rate);
662 
663       /* The session might need to know the RTX ssrc */
664       caps = gst_caps_copy (caps);
665       gst_caps_set_simple (caps, "rtx-ssrc", G_TYPE_UINT, data->rtx_ssrc,
666           "rtx-seqnum-offset", G_TYPE_UINT, data->seqnum_base, NULL);
667 
668       if (GPOINTER_TO_INT (rtx_payload) != -1)
669         gst_caps_set_simple (caps, "rtx-payload", G_TYPE_INT,
670             GPOINTER_TO_INT (rtx_payload), NULL);
671 
672       GST_DEBUG_OBJECT (rtx, "got clock-rate from caps: %d for ssrc: %u",
673           data->clock_rate, ssrc);
674       GST_OBJECT_UNLOCK (rtx);
675 
676       gst_event_unref (event);
677       event = gst_event_new_caps (caps);
678       gst_caps_unref (caps);
679       break;
680     }
681     default:
682       break;
683   }
684   return gst_pad_event_default (pad, parent, event);
685 }
686 
687 /* like rtp_jitter_buffer_get_ts_diff() */
688 static guint32
gst_rtp_rtx_send_get_ts_diff(SSRCRtxData * data)689 gst_rtp_rtx_send_get_ts_diff (SSRCRtxData * data)
690 {
691   guint64 high_ts, low_ts;
692   BufferQueueItem *high_buf, *low_buf;
693   guint32 result;
694 
695   high_buf =
696       g_sequence_get (g_sequence_iter_prev (g_sequence_get_end_iter
697           (data->queue)));
698   low_buf = g_sequence_get (g_sequence_get_begin_iter (data->queue));
699 
700   if (!high_buf || !low_buf || high_buf == low_buf)
701     return 0;
702 
703   high_ts = high_buf->timestamp;
704   low_ts = low_buf->timestamp;
705 
706   /* it needs to work if ts wraps */
707   if (high_ts >= low_ts) {
708     result = (guint32) (high_ts - low_ts);
709   } else {
710     result = (guint32) (high_ts + G_MAXUINT32 + 1 - low_ts);
711   }
712 
713   /* return value in ms instead of clock ticks */
714   return (guint32) gst_util_uint64_scale_int (result, 1000, data->clock_rate);
715 }
716 
717 /* Must be called with lock */
718 static void
process_buffer(GstRtpRtxSend * rtx,GstBuffer * buffer)719 process_buffer (GstRtpRtxSend * rtx, GstBuffer * buffer)
720 {
721   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
722   BufferQueueItem *item;
723   SSRCRtxData *data;
724   guint16 seqnum;
725   guint8 payload_type;
726   guint32 ssrc, rtptime;
727 
728   /* read the information we want from the buffer */
729   gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp);
730   seqnum = gst_rtp_buffer_get_seq (&rtp);
731   payload_type = gst_rtp_buffer_get_payload_type (&rtp);
732   ssrc = gst_rtp_buffer_get_ssrc (&rtp);
733   rtptime = gst_rtp_buffer_get_timestamp (&rtp);
734   gst_rtp_buffer_unmap (&rtp);
735 
736   GST_TRACE_OBJECT (rtx, "Processing buffer seqnum: %u, ssrc: %X", seqnum,
737       ssrc);
738 
739   /* do not store the buffer if it's payload type is unknown */
740   if (g_hash_table_contains (rtx->rtx_pt_map, GUINT_TO_POINTER (payload_type))) {
741     data = gst_rtp_rtx_send_get_ssrc_data (rtx, ssrc);
742 
743     if (data->clock_rate == 0 && rtx->clock_rate_map_structure) {
744       data->clock_rate =
745           GPOINTER_TO_INT (g_hash_table_lookup (rtx->clock_rate_map,
746               GUINT_TO_POINTER (payload_type)));
747     }
748 
749     /* add current rtp buffer to queue history */
750     item = g_slice_new0 (BufferQueueItem);
751     item->seqnum = seqnum;
752     item->timestamp = rtptime;
753     item->buffer = gst_buffer_ref (buffer);
754     g_sequence_append (data->queue, item);
755 
756     /* remove oldest packets from history if they are too many */
757     if (rtx->max_size_packets) {
758       while (g_sequence_get_length (data->queue) > rtx->max_size_packets)
759         g_sequence_remove (g_sequence_get_begin_iter (data->queue));
760     }
761     if (rtx->max_size_time) {
762       while (gst_rtp_rtx_send_get_ts_diff (data) > rtx->max_size_time)
763         g_sequence_remove (g_sequence_get_begin_iter (data->queue));
764     }
765   }
766 }
767 
768 static GstFlowReturn
gst_rtp_rtx_send_chain(GstPad * pad,GstObject * parent,GstBuffer * buffer)769 gst_rtp_rtx_send_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
770 {
771   GstRtpRtxSend *rtx = GST_RTP_RTX_SEND_CAST (parent);
772   GstFlowReturn ret;
773 
774   GST_OBJECT_LOCK (rtx);
775   process_buffer (rtx, buffer);
776   GST_OBJECT_UNLOCK (rtx);
777   ret = gst_pad_push (rtx->srcpad, buffer);
778 
779   return ret;
780 }
781 
782 static gboolean
process_buffer_from_list(GstBuffer ** buffer,guint idx,gpointer user_data)783 process_buffer_from_list (GstBuffer ** buffer, guint idx, gpointer user_data)
784 {
785   process_buffer (user_data, *buffer);
786   return TRUE;
787 }
788 
789 static GstFlowReturn
gst_rtp_rtx_send_chain_list(GstPad * pad,GstObject * parent,GstBufferList * list)790 gst_rtp_rtx_send_chain_list (GstPad * pad, GstObject * parent,
791     GstBufferList * list)
792 {
793   GstRtpRtxSend *rtx = GST_RTP_RTX_SEND_CAST (parent);
794   GstFlowReturn ret;
795 
796   GST_OBJECT_LOCK (rtx);
797   gst_buffer_list_foreach (list, process_buffer_from_list, rtx);
798   GST_OBJECT_UNLOCK (rtx);
799 
800   ret = gst_pad_push_list (rtx->srcpad, list);
801 
802   return ret;
803 }
804 
805 static void
gst_rtp_rtx_send_src_loop(GstRtpRtxSend * rtx)806 gst_rtp_rtx_send_src_loop (GstRtpRtxSend * rtx)
807 {
808   GstDataQueueItem *data;
809 
810   if (gst_data_queue_pop (rtx->queue, &data)) {
811     GST_LOG_OBJECT (rtx, "pushing rtx buffer %p", data->object);
812 
813     if (G_LIKELY (GST_IS_BUFFER (data->object))) {
814       GST_OBJECT_LOCK (rtx);
815       /* Update statistics just before pushing. */
816       rtx->num_rtx_packets++;
817       GST_OBJECT_UNLOCK (rtx);
818 
819       gst_pad_push (rtx->srcpad, GST_BUFFER (data->object));
820     } else if (GST_IS_EVENT (data->object)) {
821       gst_pad_push_event (rtx->srcpad, GST_EVENT (data->object));
822 
823       /* after EOS, we should not send any more buffers,
824        * even if there are more requests coming in */
825       if (GST_EVENT_TYPE (data->object) == GST_EVENT_EOS) {
826         gst_rtp_rtx_send_set_flushing (rtx, TRUE);
827       }
828     } else {
829       g_assert_not_reached ();
830     }
831 
832     data->object = NULL;        /* we no longer own that object */
833     data->destroy (data);
834   } else {
835     GST_LOG_OBJECT (rtx, "flushing");
836     gst_pad_pause_task (rtx->srcpad);
837   }
838 }
839 
840 static gboolean
gst_rtp_rtx_send_activate_mode(GstPad * pad,GstObject * parent,GstPadMode mode,gboolean active)841 gst_rtp_rtx_send_activate_mode (GstPad * pad, GstObject * parent,
842     GstPadMode mode, gboolean active)
843 {
844   GstRtpRtxSend *rtx = GST_RTP_RTX_SEND_CAST (parent);
845   gboolean ret = FALSE;
846 
847   switch (mode) {
848     case GST_PAD_MODE_PUSH:
849       if (active) {
850         gst_rtp_rtx_send_set_flushing (rtx, FALSE);
851         ret = gst_pad_start_task (rtx->srcpad,
852             (GstTaskFunction) gst_rtp_rtx_send_src_loop, rtx, NULL);
853       } else {
854         gst_rtp_rtx_send_set_flushing (rtx, TRUE);
855         ret = gst_pad_stop_task (rtx->srcpad);
856       }
857       GST_INFO_OBJECT (rtx, "activate_mode: active %d, ret %d", active, ret);
858       break;
859     default:
860       break;
861   }
862   return ret;
863 }
864 
865 static void
gst_rtp_rtx_send_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)866 gst_rtp_rtx_send_get_property (GObject * object,
867     guint prop_id, GValue * value, GParamSpec * pspec)
868 {
869   GstRtpRtxSend *rtx = GST_RTP_RTX_SEND_CAST (object);
870 
871   switch (prop_id) {
872     case PROP_PAYLOAD_TYPE_MAP:
873       GST_OBJECT_LOCK (rtx);
874       g_value_set_boxed (value, rtx->rtx_pt_map_structure);
875       GST_OBJECT_UNLOCK (rtx);
876       break;
877     case PROP_MAX_SIZE_TIME:
878       GST_OBJECT_LOCK (rtx);
879       g_value_set_uint (value, rtx->max_size_time);
880       GST_OBJECT_UNLOCK (rtx);
881       break;
882     case PROP_MAX_SIZE_PACKETS:
883       GST_OBJECT_LOCK (rtx);
884       g_value_set_uint (value, rtx->max_size_packets);
885       GST_OBJECT_UNLOCK (rtx);
886       break;
887     case PROP_NUM_RTX_REQUESTS:
888       GST_OBJECT_LOCK (rtx);
889       g_value_set_uint (value, rtx->num_rtx_requests);
890       GST_OBJECT_UNLOCK (rtx);
891       break;
892     case PROP_NUM_RTX_PACKETS:
893       GST_OBJECT_LOCK (rtx);
894       g_value_set_uint (value, rtx->num_rtx_packets);
895       GST_OBJECT_UNLOCK (rtx);
896       break;
897     case PROP_CLOCK_RATE_MAP:
898       GST_OBJECT_LOCK (rtx);
899       g_value_set_boxed (value, rtx->clock_rate_map_structure);
900       GST_OBJECT_UNLOCK (rtx);
901       break;
902     default:
903       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
904       break;
905   }
906 }
907 
908 static gboolean
structure_to_hash_table(GQuark field_id,const GValue * value,gpointer hash)909 structure_to_hash_table (GQuark field_id, const GValue * value, gpointer hash)
910 {
911   const gchar *field_str;
912   guint field_uint;
913   guint value_uint;
914 
915   field_str = g_quark_to_string (field_id);
916   field_uint = atoi (field_str);
917   value_uint = g_value_get_uint (value);
918   g_hash_table_insert ((GHashTable *) hash, GUINT_TO_POINTER (field_uint),
919       GUINT_TO_POINTER (value_uint));
920 
921   return TRUE;
922 }
923 
924 static void
gst_rtp_rtx_send_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)925 gst_rtp_rtx_send_set_property (GObject * object,
926     guint prop_id, const GValue * value, GParamSpec * pspec)
927 {
928   GstRtpRtxSend *rtx = GST_RTP_RTX_SEND_CAST (object);
929 
930   switch (prop_id) {
931     case PROP_SSRC_MAP:
932       GST_OBJECT_LOCK (rtx);
933       if (rtx->external_ssrc_map)
934         gst_structure_free (rtx->external_ssrc_map);
935       rtx->external_ssrc_map = g_value_dup_boxed (value);
936       GST_OBJECT_UNLOCK (rtx);
937       break;
938     case PROP_PAYLOAD_TYPE_MAP:
939       GST_OBJECT_LOCK (rtx);
940       if (rtx->rtx_pt_map_structure)
941         gst_structure_free (rtx->rtx_pt_map_structure);
942       rtx->rtx_pt_map_structure = g_value_dup_boxed (value);
943       g_hash_table_remove_all (rtx->rtx_pt_map);
944       gst_structure_foreach (rtx->rtx_pt_map_structure, structure_to_hash_table,
945           rtx->rtx_pt_map);
946       GST_OBJECT_UNLOCK (rtx);
947       break;
948     case PROP_MAX_SIZE_TIME:
949       GST_OBJECT_LOCK (rtx);
950       rtx->max_size_time = g_value_get_uint (value);
951       GST_OBJECT_UNLOCK (rtx);
952       break;
953     case PROP_MAX_SIZE_PACKETS:
954       GST_OBJECT_LOCK (rtx);
955       rtx->max_size_packets = g_value_get_uint (value);
956       GST_OBJECT_UNLOCK (rtx);
957       break;
958     case PROP_CLOCK_RATE_MAP:
959       GST_OBJECT_LOCK (rtx);
960       if (rtx->clock_rate_map_structure)
961         gst_structure_free (rtx->clock_rate_map_structure);
962       rtx->clock_rate_map_structure = g_value_dup_boxed (value);
963       g_hash_table_remove_all (rtx->clock_rate_map);
964       gst_structure_foreach (rtx->clock_rate_map_structure,
965           structure_to_hash_table, rtx->clock_rate_map);
966       GST_OBJECT_UNLOCK (rtx);
967       break;
968     default:
969       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
970       break;
971   }
972 }
973 
974 static GstStateChangeReturn
gst_rtp_rtx_send_change_state(GstElement * element,GstStateChange transition)975 gst_rtp_rtx_send_change_state (GstElement * element, GstStateChange transition)
976 {
977   GstStateChangeReturn ret;
978   GstRtpRtxSend *rtx;
979 
980   rtx = GST_RTP_RTX_SEND_CAST (element);
981 
982   switch (transition) {
983     default:
984       break;
985   }
986 
987   ret =
988       GST_ELEMENT_CLASS (gst_rtp_rtx_send_parent_class)->change_state (element,
989       transition);
990 
991   switch (transition) {
992     case GST_STATE_CHANGE_PAUSED_TO_READY:
993       gst_rtp_rtx_send_reset (rtx);
994       break;
995     default:
996       break;
997   }
998 
999   return ret;
1000 }
1001