1 /* RTP Retransmission sender element for GStreamer
2 *
3 * gstrtprtxsend.c:
4 *
5 * Copyright (C) 2013 Collabora Ltd.
6 * @author Julien Isorce <julien.isorce@collabora.co.uk>
7 *
8 * This library is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Library General Public
10 * License as published by the Free Software Foundation; either
11 * version 2 of the License, or (at your option) any later version.
12 *
13 * This library is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 * Library General Public License for more details.
17 *
18 * You should have received a copy of the GNU Library General Public
19 * License along with this library; if not, write to the
20 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
21 * Boston, MA 02110-1301, USA.
22 */
23
24 /**
25 * SECTION:element-rtprtxsend
26 * @title: rtprtxsend
27 *
28 * See #GstRtpRtxReceive for examples
29 *
 * The purpose of the sender RTX object is to keep a history of RTP packets up
 * to a configurable limit (max-size-time or max-size-packets). It listens
 * for custom upstream retransmission events (GstRTPRetransmissionRequest) that
 * come from downstream (#GstRtpSession). When receiving a request it will
 * look up the requested seqnum in its list of stored packets. If the packet
 * is available, it will create an RTX packet according to RFC 4588 and send
 * this as an auxiliary stream. RTX is SSRC-multiplexed.
37 */
38
39 #ifdef HAVE_CONFIG_H
40 #include "config.h"
41 #endif
42
43 #include <gst/gst.h>
44 #include <gst/rtp/gstrtpbuffer.h>
45 #include <string.h>
46 #include <stdlib.h>
47
48 #include "gstrtprtxsend.h"
49
/* Debug category of this element; GST_CAT_DEFAULT makes it the category used
 * by the GST_*_OBJECT logging macros in this file. */
GST_DEBUG_CATEGORY_STATIC (gst_rtp_rtx_send_debug);
#define GST_CAT_DEFAULT gst_rtp_rtx_send_debug

/* Defaults. DEFAULT_MAX_SIZE_TIME and DEFAULT_MAX_SIZE_PACKETS are used both
 * as property defaults and as the initial instance values;
 * DEFAULT_RTX_PAYLOAD_TYPE is not referenced in the code visible here. */
#define DEFAULT_RTX_PAYLOAD_TYPE 0
#define DEFAULT_MAX_SIZE_TIME 0
#define DEFAULT_MAX_SIZE_PACKETS 100
56
/* GObject property ids installed in class_init. */
enum
{
  PROP_0,
  PROP_SSRC_MAP,                /* "ssrc-map", write-only */
  PROP_PAYLOAD_TYPE_MAP,        /* "payload-type-map" */
  PROP_MAX_SIZE_TIME,           /* "max-size-time", in ms */
  PROP_MAX_SIZE_PACKETS,        /* "max-size-packets" */
  PROP_NUM_RTX_REQUESTS,        /* "num-rtx-requests", read-only counter */
  PROP_NUM_RTX_PACKETS,         /* "num-rtx-packets", read-only counter */
  PROP_CLOCK_RATE_MAP,          /* "clock-rate-map" */
};
68
/* Always-present source pad template; carries application/x-rtp. */
static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src",
    GST_PAD_SRC,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS ("application/x-rtp")
    );

/* Always-present sink pad template; carries application/x-rtp. */
static GstStaticPadTemplate sink_factory = GST_STATIC_PAD_TEMPLATE ("sink",
    GST_PAD_SINK,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS ("application/x-rtp")
    );
80
/* Forward declarations */

/* "full" callback handed to gst_data_queue_new() */
static gboolean gst_rtp_rtx_send_queue_check_full (GstDataQueue * queue,
    guint visible, guint bytes, guint64 time, gpointer checkdata);

/* pad event and data-flow handlers */
static gboolean gst_rtp_rtx_send_src_event (GstPad * pad, GstObject * parent,
    GstEvent * event);
static gboolean gst_rtp_rtx_send_sink_event (GstPad * pad, GstObject * parent,
    GstEvent * event);
static GstFlowReturn gst_rtp_rtx_send_chain (GstPad * pad, GstObject * parent,
    GstBuffer * buffer);
static GstFlowReturn gst_rtp_rtx_send_chain_list (GstPad * pad,
    GstObject * parent, GstBufferList * list);

/* src pad task function and its activation hook */
static void gst_rtp_rtx_send_src_loop (GstRtpRtxSend * rtx);
static gboolean gst_rtp_rtx_send_activate_mode (GstPad * pad,
    GstObject * parent, GstPadMode mode, gboolean active);

/* GstElement vmethod */
static GstStateChangeReturn gst_rtp_rtx_send_change_state (GstElement *
    element, GstStateChange transition);

/* GObject vmethods */
static void gst_rtp_rtx_send_set_property (GObject * object, guint prop_id,
    const GValue * value, GParamSpec * pspec);
static void gst_rtp_rtx_send_get_property (GObject * object, guint prop_id,
    GValue * value, GParamSpec * pspec);
static void gst_rtp_rtx_send_finalize (GObject * object);
105
/* Define the GstRtpRtxSend type as a GstElement subclass; the extra code
 * initializes the "rtprtxsend" debug category. */
G_DEFINE_TYPE_WITH_CODE (GstRtpRtxSend, gst_rtp_rtx_send, GST_TYPE_ELEMENT,
    GST_DEBUG_CATEGORY_INIT (gst_rtp_rtx_send_debug, "rtprtxsend", 0,
        "rtp retransmission sender"));
/* Element registration under the name "rtprtxsend" with GST_RANK_NONE. */
GST_ELEMENT_REGISTER_DEFINE (rtprtxsend, "rtprtxsend", GST_RANK_NONE,
    GST_TYPE_RTP_RTX_SEND);
111
/* One entry of the per-SSRC packet history. */
typedef struct
{
  guint16 seqnum;               /* RTP sequence number of the stored packet */
  guint32 timestamp;            /* RTP timestamp of the stored packet */
  GstBuffer *buffer;            /* stored packet; the entry holds one reference */
} BufferQueueItem;
118
119 static void
buffer_queue_item_free(BufferQueueItem * item)120 buffer_queue_item_free (BufferQueueItem * item)
121 {
122 gst_buffer_unref (item->buffer);
123 g_slice_free (BufferQueueItem, item);
124 }
125
/* Retransmission state kept for one master (original stream) SSRC. */
typedef struct
{
  guint32 rtx_ssrc;             /* SSRC used for the retransmission stream */
  /* seqnum_base: random initial rtx seqnum, advertised in caps as
   * "rtx-seqnum-offset"; next_seqnum: seqnum given to the next rtx packet */
  guint16 seqnum_base, next_seqnum;
  gint clock_rate;              /* from caps or the clock-rate-map; 0 until known */

  /* history of rtp packets */
  GSequence *queue;
} SSRCRtxData;
135
136 static SSRCRtxData *
ssrc_rtx_data_new(guint32 rtx_ssrc)137 ssrc_rtx_data_new (guint32 rtx_ssrc)
138 {
139 SSRCRtxData *data = g_slice_new0 (SSRCRtxData);
140
141 data->rtx_ssrc = rtx_ssrc;
142 data->next_seqnum = data->seqnum_base = g_random_int_range (0, G_MAXUINT16);
143 data->queue = g_sequence_new ((GDestroyNotify) buffer_queue_item_free);
144
145 return data;
146 }
147
148 static void
ssrc_rtx_data_free(SSRCRtxData * data)149 ssrc_rtx_data_free (SSRCRtxData * data)
150 {
151 g_sequence_free (data->queue);
152 g_slice_free (SSRCRtxData, data);
153 }
154
155 static void
gst_rtp_rtx_send_class_init(GstRtpRtxSendClass * klass)156 gst_rtp_rtx_send_class_init (GstRtpRtxSendClass * klass)
157 {
158 GObjectClass *gobject_class;
159 GstElementClass *gstelement_class;
160
161 gobject_class = (GObjectClass *) klass;
162 gstelement_class = (GstElementClass *) klass;
163
164 gobject_class->get_property = gst_rtp_rtx_send_get_property;
165 gobject_class->set_property = gst_rtp_rtx_send_set_property;
166 gobject_class->finalize = gst_rtp_rtx_send_finalize;
167
168 g_object_class_install_property (gobject_class, PROP_SSRC_MAP,
169 g_param_spec_boxed ("ssrc-map", "SSRC Map",
170 "Map of SSRCs to their retransmission SSRCs for SSRC-multiplexed mode"
171 " (default = random)", GST_TYPE_STRUCTURE,
172 G_PARAM_WRITABLE | G_PARAM_STATIC_STRINGS));
173
174 g_object_class_install_property (gobject_class, PROP_PAYLOAD_TYPE_MAP,
175 g_param_spec_boxed ("payload-type-map", "Payload Type Map",
176 "Map of original payload types to their retransmission payload types",
177 GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
178
179 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
180 g_param_spec_uint ("max-size-time", "Max Size Time",
181 "Amount of ms to queue (0 = unlimited)", 0, G_MAXUINT,
182 DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
183
184 g_object_class_install_property (gobject_class, PROP_MAX_SIZE_PACKETS,
185 g_param_spec_uint ("max-size-packets", "Max Size Packets",
186 "Amount of packets to queue (0 = unlimited)", 0, G_MAXINT16,
187 DEFAULT_MAX_SIZE_PACKETS,
188 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
189
190 g_object_class_install_property (gobject_class, PROP_NUM_RTX_REQUESTS,
191 g_param_spec_uint ("num-rtx-requests", "Num RTX Requests",
192 "Number of retransmission events received", 0, G_MAXUINT,
193 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
194
195 g_object_class_install_property (gobject_class, PROP_NUM_RTX_PACKETS,
196 g_param_spec_uint ("num-rtx-packets", "Num RTX Packets",
197 " Number of retransmission packets sent", 0, G_MAXUINT,
198 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
199
200 g_object_class_install_property (gobject_class, PROP_CLOCK_RATE_MAP,
201 g_param_spec_boxed ("clock-rate-map", "Clock Rate Map",
202 "Map of payload types to their clock rates",
203 GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
204
205 gst_element_class_add_static_pad_template (gstelement_class, &src_factory);
206 gst_element_class_add_static_pad_template (gstelement_class, &sink_factory);
207
208 gst_element_class_set_static_metadata (gstelement_class,
209 "RTP Retransmission Sender", "Codec",
210 "Retransmit RTP packets when needed, according to RFC4588",
211 "Julien Isorce <julien.isorce@collabora.co.uk>");
212
213 gstelement_class->change_state =
214 GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_change_state);
215 }
216
217 static void
gst_rtp_rtx_send_reset(GstRtpRtxSend * rtx)218 gst_rtp_rtx_send_reset (GstRtpRtxSend * rtx)
219 {
220 GST_OBJECT_LOCK (rtx);
221 gst_data_queue_flush (rtx->queue);
222 g_hash_table_remove_all (rtx->ssrc_data);
223 g_hash_table_remove_all (rtx->rtx_ssrcs);
224 rtx->num_rtx_requests = 0;
225 rtx->num_rtx_packets = 0;
226 GST_OBJECT_UNLOCK (rtx);
227 }
228
229 static void
gst_rtp_rtx_send_finalize(GObject * object)230 gst_rtp_rtx_send_finalize (GObject * object)
231 {
232 GstRtpRtxSend *rtx = GST_RTP_RTX_SEND_CAST (object);
233
234 g_hash_table_unref (rtx->ssrc_data);
235 g_hash_table_unref (rtx->rtx_ssrcs);
236 if (rtx->external_ssrc_map)
237 gst_structure_free (rtx->external_ssrc_map);
238 g_hash_table_unref (rtx->rtx_pt_map);
239 if (rtx->rtx_pt_map_structure)
240 gst_structure_free (rtx->rtx_pt_map_structure);
241 g_hash_table_unref (rtx->clock_rate_map);
242 if (rtx->clock_rate_map_structure)
243 gst_structure_free (rtx->clock_rate_map_structure);
244 g_object_unref (rtx->queue);
245
246 G_OBJECT_CLASS (gst_rtp_rtx_send_parent_class)->finalize (object);
247 }
248
249 static void
gst_rtp_rtx_send_init(GstRtpRtxSend * rtx)250 gst_rtp_rtx_send_init (GstRtpRtxSend * rtx)
251 {
252 GstElementClass *klass = GST_ELEMENT_GET_CLASS (rtx);
253
254 rtx->srcpad =
255 gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
256 "src"), "src");
257 GST_PAD_SET_PROXY_CAPS (rtx->srcpad);
258 GST_PAD_SET_PROXY_ALLOCATION (rtx->srcpad);
259 gst_pad_set_event_function (rtx->srcpad,
260 GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_src_event));
261 gst_pad_set_activatemode_function (rtx->srcpad,
262 GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_activate_mode));
263 gst_element_add_pad (GST_ELEMENT (rtx), rtx->srcpad);
264
265 rtx->sinkpad =
266 gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
267 "sink"), "sink");
268 GST_PAD_SET_PROXY_CAPS (rtx->sinkpad);
269 GST_PAD_SET_PROXY_ALLOCATION (rtx->sinkpad);
270 gst_pad_set_event_function (rtx->sinkpad,
271 GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_sink_event));
272 gst_pad_set_chain_function (rtx->sinkpad,
273 GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_chain));
274 gst_pad_set_chain_list_function (rtx->sinkpad,
275 GST_DEBUG_FUNCPTR (gst_rtp_rtx_send_chain_list));
276 gst_element_add_pad (GST_ELEMENT (rtx), rtx->sinkpad);
277
278 rtx->queue = gst_data_queue_new (gst_rtp_rtx_send_queue_check_full, NULL,
279 NULL, rtx);
280 rtx->ssrc_data = g_hash_table_new_full (g_direct_hash, g_direct_equal,
281 NULL, (GDestroyNotify) ssrc_rtx_data_free);
282 rtx->rtx_ssrcs = g_hash_table_new (g_direct_hash, g_direct_equal);
283 rtx->rtx_pt_map = g_hash_table_new (g_direct_hash, g_direct_equal);
284 rtx->clock_rate_map = g_hash_table_new (g_direct_hash, g_direct_equal);
285
286 rtx->max_size_time = DEFAULT_MAX_SIZE_TIME;
287 rtx->max_size_packets = DEFAULT_MAX_SIZE_PACKETS;
288 }
289
290 static void
gst_rtp_rtx_send_set_flushing(GstRtpRtxSend * rtx,gboolean flush)291 gst_rtp_rtx_send_set_flushing (GstRtpRtxSend * rtx, gboolean flush)
292 {
293 GST_OBJECT_LOCK (rtx);
294 gst_data_queue_set_flushing (rtx->queue, flush);
295 gst_data_queue_flush (rtx->queue);
296 GST_OBJECT_UNLOCK (rtx);
297 }
298
299 static gboolean
gst_rtp_rtx_send_queue_check_full(GstDataQueue * queue,guint visible,guint bytes,guint64 time,gpointer checkdata)300 gst_rtp_rtx_send_queue_check_full (GstDataQueue * queue,
301 guint visible, guint bytes, guint64 time, gpointer checkdata)
302 {
303 return FALSE;
304 }
305
306 static void
gst_rtp_rtx_data_queue_item_free(gpointer item)307 gst_rtp_rtx_data_queue_item_free (gpointer item)
308 {
309 GstDataQueueItem *data = item;
310 if (data->object)
311 gst_mini_object_unref (data->object);
312 g_slice_free (GstDataQueueItem, data);
313 }
314
315 static gboolean
gst_rtp_rtx_send_push_out(GstRtpRtxSend * rtx,gpointer object)316 gst_rtp_rtx_send_push_out (GstRtpRtxSend * rtx, gpointer object)
317 {
318 GstDataQueueItem *data;
319 gboolean success;
320
321 data = g_slice_new0 (GstDataQueueItem);
322 data->object = GST_MINI_OBJECT (object);
323 data->size = 1;
324 data->duration = 1;
325 data->visible = TRUE;
326 data->destroy = gst_rtp_rtx_data_queue_item_free;
327
328 success = gst_data_queue_push (rtx->queue, data);
329
330 if (!success)
331 data->destroy (data);
332
333 return success;
334 }
335
336 static guint32
gst_rtp_rtx_send_choose_ssrc(GstRtpRtxSend * rtx,guint32 choice,gboolean consider_choice)337 gst_rtp_rtx_send_choose_ssrc (GstRtpRtxSend * rtx, guint32 choice,
338 gboolean consider_choice)
339 {
340 guint32 ssrc = consider_choice ? choice : g_random_int ();
341
342 /* make sure to be different than any other */
343 while (g_hash_table_contains (rtx->ssrc_data, GUINT_TO_POINTER (ssrc)) ||
344 g_hash_table_contains (rtx->rtx_ssrcs, GUINT_TO_POINTER (ssrc))) {
345 ssrc = g_random_int ();
346 }
347
348 return ssrc;
349 }
350
351 static SSRCRtxData *
gst_rtp_rtx_send_get_ssrc_data(GstRtpRtxSend * rtx,guint32 ssrc)352 gst_rtp_rtx_send_get_ssrc_data (GstRtpRtxSend * rtx, guint32 ssrc)
353 {
354 SSRCRtxData *data;
355 guint32 rtx_ssrc = 0;
356 gboolean consider = FALSE;
357
358 if (G_UNLIKELY (!g_hash_table_contains (rtx->ssrc_data,
359 GUINT_TO_POINTER (ssrc)))) {
360 if (rtx->external_ssrc_map) {
361 gchar *ssrc_str;
362 ssrc_str = g_strdup_printf ("%" G_GUINT32_FORMAT, ssrc);
363 consider = gst_structure_get_uint (rtx->external_ssrc_map, ssrc_str,
364 &rtx_ssrc);
365 g_free (ssrc_str);
366 }
367 rtx_ssrc = gst_rtp_rtx_send_choose_ssrc (rtx, rtx_ssrc, consider);
368 data = ssrc_rtx_data_new (rtx_ssrc);
369 g_hash_table_insert (rtx->ssrc_data, GUINT_TO_POINTER (ssrc), data);
370 g_hash_table_insert (rtx->rtx_ssrcs, GUINT_TO_POINTER (rtx_ssrc),
371 GUINT_TO_POINTER (ssrc));
372 } else {
373 data = g_hash_table_lookup (rtx->ssrc_data, GUINT_TO_POINTER (ssrc));
374 }
375 return data;
376 }
377
378 /* Copy fixed header and extension. Add OSN before to copy payload
379 * Copy memory to avoid to manually copy each rtp buffer field.
380 */
381 static GstBuffer *
gst_rtp_rtx_buffer_new(GstRtpRtxSend * rtx,GstBuffer * buffer)382 gst_rtp_rtx_buffer_new (GstRtpRtxSend * rtx, GstBuffer * buffer)
383 {
384 GstMemory *mem = NULL;
385 GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
386 GstRTPBuffer new_rtp = GST_RTP_BUFFER_INIT;
387 GstBuffer *new_buffer = gst_buffer_new ();
388 GstMapInfo map;
389 guint payload_len = 0;
390 SSRCRtxData *data;
391 guint32 ssrc;
392 guint16 seqnum;
393 guint8 fmtp;
394
395 gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp);
396
397 /* get needed data from GstRtpRtxSend */
398 ssrc = gst_rtp_buffer_get_ssrc (&rtp);
399 data = gst_rtp_rtx_send_get_ssrc_data (rtx, ssrc);
400 ssrc = data->rtx_ssrc;
401 seqnum = data->next_seqnum++;
402 fmtp = GPOINTER_TO_UINT (g_hash_table_lookup (rtx->rtx_pt_map,
403 GUINT_TO_POINTER (gst_rtp_buffer_get_payload_type (&rtp))));
404
405 GST_DEBUG_OBJECT (rtx, "creating rtx buffer, orig seqnum: %u, "
406 "rtx seqnum: %u, rtx ssrc: %X", gst_rtp_buffer_get_seq (&rtp),
407 seqnum, ssrc);
408
409 /* gst_rtp_buffer_map does not map the payload so do it now */
410 gst_rtp_buffer_get_payload (&rtp);
411
412 /* copy fixed header */
413 mem = gst_memory_copy (rtp.map[0].memory, 0, rtp.size[0]);
414 gst_buffer_append_memory (new_buffer, mem);
415
416 /* copy extension if any */
417 if (rtp.size[1]) {
418 mem = gst_allocator_alloc (NULL, rtp.size[1], NULL);
419 gst_memory_map (mem, &map, GST_MAP_WRITE);
420 memcpy (map.data, rtp.data[1], rtp.size[1]);
421 gst_memory_unmap (mem, &map);
422 gst_buffer_append_memory (new_buffer, mem);
423 }
424
425 /* copy payload and add OSN just before */
426 payload_len = 2 + rtp.size[2];
427 mem = gst_allocator_alloc (NULL, payload_len, NULL);
428
429 gst_memory_map (mem, &map, GST_MAP_WRITE);
430 GST_WRITE_UINT16_BE (map.data, gst_rtp_buffer_get_seq (&rtp));
431 if (rtp.size[2])
432 memcpy (map.data + 2, rtp.data[2], rtp.size[2]);
433 gst_memory_unmap (mem, &map);
434 gst_buffer_append_memory (new_buffer, mem);
435
436 /* everything needed is copied */
437 gst_rtp_buffer_unmap (&rtp);
438
439 /* set ssrc, seqnum and fmtp */
440 gst_rtp_buffer_map (new_buffer, GST_MAP_WRITE, &new_rtp);
441 gst_rtp_buffer_set_ssrc (&new_rtp, ssrc);
442 gst_rtp_buffer_set_seq (&new_rtp, seqnum);
443 gst_rtp_buffer_set_payload_type (&new_rtp, fmtp);
444 /* RFC 4588: let other elements do the padding, as normal */
445 gst_rtp_buffer_set_padding (&new_rtp, FALSE);
446 gst_rtp_buffer_unmap (&new_rtp);
447
448 /* Copy over timestamps */
449 gst_buffer_copy_into (new_buffer, buffer, GST_BUFFER_COPY_TIMESTAMPS, 0, -1);
450
451 return new_buffer;
452 }
453
454 static gint
buffer_queue_items_cmp(BufferQueueItem * a,BufferQueueItem * b,gpointer user_data)455 buffer_queue_items_cmp (BufferQueueItem * a, BufferQueueItem * b,
456 gpointer user_data)
457 {
458 /* gst_rtp_buffer_compare_seqnum returns the opposite of what we want,
459 * it returns negative when seqnum1 > seqnum2 and we want negative
460 * when b > a, i.e. a is smaller, so it comes first in the sequence */
461 return gst_rtp_buffer_compare_seqnum (b->seqnum, a->seqnum);
462 }
463
464 static gboolean
gst_rtp_rtx_send_src_event(GstPad * pad,GstObject * parent,GstEvent * event)465 gst_rtp_rtx_send_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
466 {
467 GstRtpRtxSend *rtx = GST_RTP_RTX_SEND_CAST (parent);
468 gboolean res;
469
470 switch (GST_EVENT_TYPE (event)) {
471 case GST_EVENT_CUSTOM_UPSTREAM:
472 {
473 const GstStructure *s = gst_event_get_structure (event);
474
475 /* This event usually comes from the downstream gstrtpsession */
476 if (gst_structure_has_name (s, "GstRTPRetransmissionRequest")) {
477 guint seqnum = 0;
478 guint ssrc = 0;
479 GstBuffer *rtx_buf = NULL;
480
481 /* retrieve seqnum of the packet that need to be retransmitted */
482 if (!gst_structure_get_uint (s, "seqnum", &seqnum))
483 seqnum = -1;
484
485 /* retrieve ssrc of the packet that need to be retransmitted */
486 if (!gst_structure_get_uint (s, "ssrc", &ssrc))
487 ssrc = -1;
488
489 GST_DEBUG_OBJECT (rtx, "got rtx request for seqnum: %u, ssrc: %X",
490 seqnum, ssrc);
491
492 GST_OBJECT_LOCK (rtx);
493 /* check if request is for us */
494 if (g_hash_table_contains (rtx->ssrc_data, GUINT_TO_POINTER (ssrc))) {
495 SSRCRtxData *data;
496 GSequenceIter *iter;
497 BufferQueueItem search_item;
498
499 /* update statistics */
500 ++rtx->num_rtx_requests;
501
502 data = gst_rtp_rtx_send_get_ssrc_data (rtx, ssrc);
503
504 search_item.seqnum = seqnum;
505 iter = g_sequence_lookup (data->queue, &search_item,
506 (GCompareDataFunc) buffer_queue_items_cmp, NULL);
507 if (iter) {
508 BufferQueueItem *item = g_sequence_get (iter);
509 GST_LOG_OBJECT (rtx, "found %u", item->seqnum);
510 rtx_buf = gst_rtp_rtx_buffer_new (rtx, item->buffer);
511 }
512 #ifndef GST_DISABLE_DEBUG
513 else {
514 BufferQueueItem *item = NULL;
515
516 iter = g_sequence_get_begin_iter (data->queue);
517 if (!g_sequence_iter_is_end (iter))
518 item = g_sequence_get (iter);
519
520 if (item && seqnum < item->seqnum) {
521 GST_DEBUG_OBJECT (rtx, "requested seqnum %u has already been "
522 "removed from the rtx queue; the first available is %u",
523 seqnum, item->seqnum);
524 } else {
525 GST_WARNING_OBJECT (rtx, "requested seqnum %u has not been "
526 "transmitted yet in the original stream; either the remote end "
527 "is not configured correctly, or the source is too slow",
528 seqnum);
529 }
530 }
531 #endif
532 }
533 GST_OBJECT_UNLOCK (rtx);
534
535 if (rtx_buf)
536 gst_rtp_rtx_send_push_out (rtx, rtx_buf);
537
538 gst_event_unref (event);
539 res = TRUE;
540
541 /* This event usually comes from the downstream gstrtpsession */
542 } else if (gst_structure_has_name (s, "GstRTPCollision")) {
543 guint ssrc = 0;
544
545 if (!gst_structure_get_uint (s, "ssrc", &ssrc))
546 ssrc = -1;
547
548 GST_DEBUG_OBJECT (rtx, "got ssrc collision, ssrc: %X", ssrc);
549
550 GST_OBJECT_LOCK (rtx);
551
552 /* choose another ssrc for our retransmitted stream */
553 if (g_hash_table_contains (rtx->rtx_ssrcs, GUINT_TO_POINTER (ssrc))) {
554 guint master_ssrc;
555 SSRCRtxData *data;
556
557 master_ssrc = GPOINTER_TO_UINT (g_hash_table_lookup (rtx->rtx_ssrcs,
558 GUINT_TO_POINTER (ssrc)));
559 data = gst_rtp_rtx_send_get_ssrc_data (rtx, master_ssrc);
560
561 /* change rtx_ssrc and update the reverse map */
562 data->rtx_ssrc = gst_rtp_rtx_send_choose_ssrc (rtx, 0, FALSE);
563 g_hash_table_remove (rtx->rtx_ssrcs, GUINT_TO_POINTER (ssrc));
564 g_hash_table_insert (rtx->rtx_ssrcs,
565 GUINT_TO_POINTER (data->rtx_ssrc),
566 GUINT_TO_POINTER (master_ssrc));
567
568 GST_OBJECT_UNLOCK (rtx);
569
570 /* no need to forward to payloader because we make sure to have
571 * a different ssrc
572 */
573 gst_event_unref (event);
574 res = TRUE;
575 } else {
576 /* if master ssrc has collided, remove it from our data, as it
577 * is not going to be used any longer */
578 if (g_hash_table_contains (rtx->ssrc_data, GUINT_TO_POINTER (ssrc))) {
579 SSRCRtxData *data;
580 data = gst_rtp_rtx_send_get_ssrc_data (rtx, ssrc);
581 g_hash_table_remove (rtx->rtx_ssrcs,
582 GUINT_TO_POINTER (data->rtx_ssrc));
583 g_hash_table_remove (rtx->ssrc_data, GUINT_TO_POINTER (ssrc));
584 }
585
586 GST_OBJECT_UNLOCK (rtx);
587
588 /* forward event to payloader in case collided ssrc is
589 * master stream */
590 res = gst_pad_event_default (pad, parent, event);
591 }
592 } else {
593 res = gst_pad_event_default (pad, parent, event);
594 }
595 break;
596 }
597 default:
598 res = gst_pad_event_default (pad, parent, event);
599 break;
600 }
601 return res;
602 }
603
604 static gboolean
gst_rtp_rtx_send_sink_event(GstPad * pad,GstObject * parent,GstEvent * event)605 gst_rtp_rtx_send_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
606 {
607 GstRtpRtxSend *rtx = GST_RTP_RTX_SEND_CAST (parent);
608
609 switch (GST_EVENT_TYPE (event)) {
610 case GST_EVENT_FLUSH_START:
611 gst_pad_push_event (rtx->srcpad, event);
612 gst_rtp_rtx_send_set_flushing (rtx, TRUE);
613 gst_pad_pause_task (rtx->srcpad);
614 return TRUE;
615 case GST_EVENT_FLUSH_STOP:
616 gst_pad_push_event (rtx->srcpad, event);
617 gst_rtp_rtx_send_set_flushing (rtx, FALSE);
618 gst_pad_start_task (rtx->srcpad,
619 (GstTaskFunction) gst_rtp_rtx_send_src_loop, rtx, NULL);
620 return TRUE;
621 case GST_EVENT_EOS:
622 GST_INFO_OBJECT (rtx, "Got EOS - enqueueing it");
623 gst_rtp_rtx_send_push_out (rtx, event);
624 return TRUE;
625 case GST_EVENT_CAPS:
626 {
627 GstCaps *caps;
628 GstStructure *s;
629 guint ssrc;
630 gint payload;
631 gpointer rtx_payload;
632 SSRCRtxData *data;
633
634 gst_event_parse_caps (event, &caps);
635
636 s = gst_caps_get_structure (caps, 0);
637 if (!gst_structure_get_uint (s, "ssrc", &ssrc))
638 ssrc = -1;
639 if (!gst_structure_get_int (s, "payload", &payload))
640 payload = -1;
641
642 if (payload == -1 || ssrc == G_MAXUINT)
643 break;
644
645 if (payload == -1)
646 GST_WARNING_OBJECT (rtx, "No payload in caps");
647
648 GST_OBJECT_LOCK (rtx);
649 data = gst_rtp_rtx_send_get_ssrc_data (rtx, ssrc);
650 if (!g_hash_table_lookup_extended (rtx->rtx_pt_map,
651 GUINT_TO_POINTER (payload), NULL, &rtx_payload))
652 rtx_payload = GINT_TO_POINTER (-1);
653
654 if (GPOINTER_TO_INT (rtx_payload) == -1 && payload != -1)
655 GST_WARNING_OBJECT (rtx, "Payload %d not in rtx-pt-map", payload);
656
657 GST_DEBUG_OBJECT (rtx,
658 "got caps for payload: %d->%d, ssrc: %u->%u : %" GST_PTR_FORMAT,
659 payload, GPOINTER_TO_INT (rtx_payload), ssrc, data->rtx_ssrc, caps);
660
661 gst_structure_get_int (s, "clock-rate", &data->clock_rate);
662
663 /* The session might need to know the RTX ssrc */
664 caps = gst_caps_copy (caps);
665 gst_caps_set_simple (caps, "rtx-ssrc", G_TYPE_UINT, data->rtx_ssrc,
666 "rtx-seqnum-offset", G_TYPE_UINT, data->seqnum_base, NULL);
667
668 if (GPOINTER_TO_INT (rtx_payload) != -1)
669 gst_caps_set_simple (caps, "rtx-payload", G_TYPE_INT,
670 GPOINTER_TO_INT (rtx_payload), NULL);
671
672 GST_DEBUG_OBJECT (rtx, "got clock-rate from caps: %d for ssrc: %u",
673 data->clock_rate, ssrc);
674 GST_OBJECT_UNLOCK (rtx);
675
676 gst_event_unref (event);
677 event = gst_event_new_caps (caps);
678 gst_caps_unref (caps);
679 break;
680 }
681 default:
682 break;
683 }
684 return gst_pad_event_default (pad, parent, event);
685 }
686
687 /* like rtp_jitter_buffer_get_ts_diff() */
688 static guint32
gst_rtp_rtx_send_get_ts_diff(SSRCRtxData * data)689 gst_rtp_rtx_send_get_ts_diff (SSRCRtxData * data)
690 {
691 guint64 high_ts, low_ts;
692 BufferQueueItem *high_buf, *low_buf;
693 guint32 result;
694
695 high_buf =
696 g_sequence_get (g_sequence_iter_prev (g_sequence_get_end_iter
697 (data->queue)));
698 low_buf = g_sequence_get (g_sequence_get_begin_iter (data->queue));
699
700 if (!high_buf || !low_buf || high_buf == low_buf)
701 return 0;
702
703 high_ts = high_buf->timestamp;
704 low_ts = low_buf->timestamp;
705
706 /* it needs to work if ts wraps */
707 if (high_ts >= low_ts) {
708 result = (guint32) (high_ts - low_ts);
709 } else {
710 result = (guint32) (high_ts + G_MAXUINT32 + 1 - low_ts);
711 }
712
713 /* return value in ms instead of clock ticks */
714 return (guint32) gst_util_uint64_scale_int (result, 1000, data->clock_rate);
715 }
716
717 /* Must be called with lock */
718 static void
process_buffer(GstRtpRtxSend * rtx,GstBuffer * buffer)719 process_buffer (GstRtpRtxSend * rtx, GstBuffer * buffer)
720 {
721 GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
722 BufferQueueItem *item;
723 SSRCRtxData *data;
724 guint16 seqnum;
725 guint8 payload_type;
726 guint32 ssrc, rtptime;
727
728 /* read the information we want from the buffer */
729 gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp);
730 seqnum = gst_rtp_buffer_get_seq (&rtp);
731 payload_type = gst_rtp_buffer_get_payload_type (&rtp);
732 ssrc = gst_rtp_buffer_get_ssrc (&rtp);
733 rtptime = gst_rtp_buffer_get_timestamp (&rtp);
734 gst_rtp_buffer_unmap (&rtp);
735
736 GST_TRACE_OBJECT (rtx, "Processing buffer seqnum: %u, ssrc: %X", seqnum,
737 ssrc);
738
739 /* do not store the buffer if it's payload type is unknown */
740 if (g_hash_table_contains (rtx->rtx_pt_map, GUINT_TO_POINTER (payload_type))) {
741 data = gst_rtp_rtx_send_get_ssrc_data (rtx, ssrc);
742
743 if (data->clock_rate == 0 && rtx->clock_rate_map_structure) {
744 data->clock_rate =
745 GPOINTER_TO_INT (g_hash_table_lookup (rtx->clock_rate_map,
746 GUINT_TO_POINTER (payload_type)));
747 }
748
749 /* add current rtp buffer to queue history */
750 item = g_slice_new0 (BufferQueueItem);
751 item->seqnum = seqnum;
752 item->timestamp = rtptime;
753 item->buffer = gst_buffer_ref (buffer);
754 g_sequence_append (data->queue, item);
755
756 /* remove oldest packets from history if they are too many */
757 if (rtx->max_size_packets) {
758 while (g_sequence_get_length (data->queue) > rtx->max_size_packets)
759 g_sequence_remove (g_sequence_get_begin_iter (data->queue));
760 }
761 if (rtx->max_size_time) {
762 while (gst_rtp_rtx_send_get_ts_diff (data) > rtx->max_size_time)
763 g_sequence_remove (g_sequence_get_begin_iter (data->queue));
764 }
765 }
766 }
767
768 static GstFlowReturn
gst_rtp_rtx_send_chain(GstPad * pad,GstObject * parent,GstBuffer * buffer)769 gst_rtp_rtx_send_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
770 {
771 GstRtpRtxSend *rtx = GST_RTP_RTX_SEND_CAST (parent);
772 GstFlowReturn ret;
773
774 GST_OBJECT_LOCK (rtx);
775 process_buffer (rtx, buffer);
776 GST_OBJECT_UNLOCK (rtx);
777 ret = gst_pad_push (rtx->srcpad, buffer);
778
779 return ret;
780 }
781
782 static gboolean
process_buffer_from_list(GstBuffer ** buffer,guint idx,gpointer user_data)783 process_buffer_from_list (GstBuffer ** buffer, guint idx, gpointer user_data)
784 {
785 process_buffer (user_data, *buffer);
786 return TRUE;
787 }
788
789 static GstFlowReturn
gst_rtp_rtx_send_chain_list(GstPad * pad,GstObject * parent,GstBufferList * list)790 gst_rtp_rtx_send_chain_list (GstPad * pad, GstObject * parent,
791 GstBufferList * list)
792 {
793 GstRtpRtxSend *rtx = GST_RTP_RTX_SEND_CAST (parent);
794 GstFlowReturn ret;
795
796 GST_OBJECT_LOCK (rtx);
797 gst_buffer_list_foreach (list, process_buffer_from_list, rtx);
798 GST_OBJECT_UNLOCK (rtx);
799
800 ret = gst_pad_push_list (rtx->srcpad, list);
801
802 return ret;
803 }
804
805 static void
gst_rtp_rtx_send_src_loop(GstRtpRtxSend * rtx)806 gst_rtp_rtx_send_src_loop (GstRtpRtxSend * rtx)
807 {
808 GstDataQueueItem *data;
809
810 if (gst_data_queue_pop (rtx->queue, &data)) {
811 GST_LOG_OBJECT (rtx, "pushing rtx buffer %p", data->object);
812
813 if (G_LIKELY (GST_IS_BUFFER (data->object))) {
814 GST_OBJECT_LOCK (rtx);
815 /* Update statistics just before pushing. */
816 rtx->num_rtx_packets++;
817 GST_OBJECT_UNLOCK (rtx);
818
819 gst_pad_push (rtx->srcpad, GST_BUFFER (data->object));
820 } else if (GST_IS_EVENT (data->object)) {
821 gst_pad_push_event (rtx->srcpad, GST_EVENT (data->object));
822
823 /* after EOS, we should not send any more buffers,
824 * even if there are more requests coming in */
825 if (GST_EVENT_TYPE (data->object) == GST_EVENT_EOS) {
826 gst_rtp_rtx_send_set_flushing (rtx, TRUE);
827 }
828 } else {
829 g_assert_not_reached ();
830 }
831
832 data->object = NULL; /* we no longer own that object */
833 data->destroy (data);
834 } else {
835 GST_LOG_OBJECT (rtx, "flushing");
836 gst_pad_pause_task (rtx->srcpad);
837 }
838 }
839
840 static gboolean
gst_rtp_rtx_send_activate_mode(GstPad * pad,GstObject * parent,GstPadMode mode,gboolean active)841 gst_rtp_rtx_send_activate_mode (GstPad * pad, GstObject * parent,
842 GstPadMode mode, gboolean active)
843 {
844 GstRtpRtxSend *rtx = GST_RTP_RTX_SEND_CAST (parent);
845 gboolean ret = FALSE;
846
847 switch (mode) {
848 case GST_PAD_MODE_PUSH:
849 if (active) {
850 gst_rtp_rtx_send_set_flushing (rtx, FALSE);
851 ret = gst_pad_start_task (rtx->srcpad,
852 (GstTaskFunction) gst_rtp_rtx_send_src_loop, rtx, NULL);
853 } else {
854 gst_rtp_rtx_send_set_flushing (rtx, TRUE);
855 ret = gst_pad_stop_task (rtx->srcpad);
856 }
857 GST_INFO_OBJECT (rtx, "activate_mode: active %d, ret %d", active, ret);
858 break;
859 default:
860 break;
861 }
862 return ret;
863 }
864
865 static void
gst_rtp_rtx_send_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)866 gst_rtp_rtx_send_get_property (GObject * object,
867 guint prop_id, GValue * value, GParamSpec * pspec)
868 {
869 GstRtpRtxSend *rtx = GST_RTP_RTX_SEND_CAST (object);
870
871 switch (prop_id) {
872 case PROP_PAYLOAD_TYPE_MAP:
873 GST_OBJECT_LOCK (rtx);
874 g_value_set_boxed (value, rtx->rtx_pt_map_structure);
875 GST_OBJECT_UNLOCK (rtx);
876 break;
877 case PROP_MAX_SIZE_TIME:
878 GST_OBJECT_LOCK (rtx);
879 g_value_set_uint (value, rtx->max_size_time);
880 GST_OBJECT_UNLOCK (rtx);
881 break;
882 case PROP_MAX_SIZE_PACKETS:
883 GST_OBJECT_LOCK (rtx);
884 g_value_set_uint (value, rtx->max_size_packets);
885 GST_OBJECT_UNLOCK (rtx);
886 break;
887 case PROP_NUM_RTX_REQUESTS:
888 GST_OBJECT_LOCK (rtx);
889 g_value_set_uint (value, rtx->num_rtx_requests);
890 GST_OBJECT_UNLOCK (rtx);
891 break;
892 case PROP_NUM_RTX_PACKETS:
893 GST_OBJECT_LOCK (rtx);
894 g_value_set_uint (value, rtx->num_rtx_packets);
895 GST_OBJECT_UNLOCK (rtx);
896 break;
897 case PROP_CLOCK_RATE_MAP:
898 GST_OBJECT_LOCK (rtx);
899 g_value_set_boxed (value, rtx->clock_rate_map_structure);
900 GST_OBJECT_UNLOCK (rtx);
901 break;
902 default:
903 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
904 break;
905 }
906 }
907
908 static gboolean
structure_to_hash_table(GQuark field_id,const GValue * value,gpointer hash)909 structure_to_hash_table (GQuark field_id, const GValue * value, gpointer hash)
910 {
911 const gchar *field_str;
912 guint field_uint;
913 guint value_uint;
914
915 field_str = g_quark_to_string (field_id);
916 field_uint = atoi (field_str);
917 value_uint = g_value_get_uint (value);
918 g_hash_table_insert ((GHashTable *) hash, GUINT_TO_POINTER (field_uint),
919 GUINT_TO_POINTER (value_uint));
920
921 return TRUE;
922 }
923
924 static void
gst_rtp_rtx_send_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)925 gst_rtp_rtx_send_set_property (GObject * object,
926 guint prop_id, const GValue * value, GParamSpec * pspec)
927 {
928 GstRtpRtxSend *rtx = GST_RTP_RTX_SEND_CAST (object);
929
930 switch (prop_id) {
931 case PROP_SSRC_MAP:
932 GST_OBJECT_LOCK (rtx);
933 if (rtx->external_ssrc_map)
934 gst_structure_free (rtx->external_ssrc_map);
935 rtx->external_ssrc_map = g_value_dup_boxed (value);
936 GST_OBJECT_UNLOCK (rtx);
937 break;
938 case PROP_PAYLOAD_TYPE_MAP:
939 GST_OBJECT_LOCK (rtx);
940 if (rtx->rtx_pt_map_structure)
941 gst_structure_free (rtx->rtx_pt_map_structure);
942 rtx->rtx_pt_map_structure = g_value_dup_boxed (value);
943 g_hash_table_remove_all (rtx->rtx_pt_map);
944 gst_structure_foreach (rtx->rtx_pt_map_structure, structure_to_hash_table,
945 rtx->rtx_pt_map);
946 GST_OBJECT_UNLOCK (rtx);
947 break;
948 case PROP_MAX_SIZE_TIME:
949 GST_OBJECT_LOCK (rtx);
950 rtx->max_size_time = g_value_get_uint (value);
951 GST_OBJECT_UNLOCK (rtx);
952 break;
953 case PROP_MAX_SIZE_PACKETS:
954 GST_OBJECT_LOCK (rtx);
955 rtx->max_size_packets = g_value_get_uint (value);
956 GST_OBJECT_UNLOCK (rtx);
957 break;
958 case PROP_CLOCK_RATE_MAP:
959 GST_OBJECT_LOCK (rtx);
960 if (rtx->clock_rate_map_structure)
961 gst_structure_free (rtx->clock_rate_map_structure);
962 rtx->clock_rate_map_structure = g_value_dup_boxed (value);
963 g_hash_table_remove_all (rtx->clock_rate_map);
964 gst_structure_foreach (rtx->clock_rate_map_structure,
965 structure_to_hash_table, rtx->clock_rate_map);
966 GST_OBJECT_UNLOCK (rtx);
967 break;
968 default:
969 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
970 break;
971 }
972 }
973
974 static GstStateChangeReturn
gst_rtp_rtx_send_change_state(GstElement * element,GstStateChange transition)975 gst_rtp_rtx_send_change_state (GstElement * element, GstStateChange transition)
976 {
977 GstStateChangeReturn ret;
978 GstRtpRtxSend *rtx;
979
980 rtx = GST_RTP_RTX_SEND_CAST (element);
981
982 switch (transition) {
983 default:
984 break;
985 }
986
987 ret =
988 GST_ELEMENT_CLASS (gst_rtp_rtx_send_parent_class)->change_state (element,
989 transition);
990
991 switch (transition) {
992 case GST_STATE_CHANGE_PAUSED_TO_READY:
993 gst_rtp_rtx_send_reset (rtx);
994 break;
995 default:
996 break;
997 }
998
999 return ret;
1000 }
1001