• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* RTP Retransmission queue element for GStreamer
2  *
3  * gstrtprtxqueue.c:
4  *
5  * Copyright (C) 2013 Wim Taymans <wim.taymans@gmail.com>
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22 
23 /**
24  * SECTION:element-rtprtxqueue
25  * @title: rtprtxqueue
26  *
27  * rtprtxqueue maintains a queue of transmitted RTP packets, up to a
28  * configurable limit (see #GstRTPRtxQueue:max-size-time,
29  * #GstRTPRtxQueue:max-size-packets), and retransmits them upon request
30  * from the downstream rtpsession (GstRTPRetransmissionRequest event).
31  *
32  * This element is similar to rtprtxsend, but it has differences:
33  * - Retransmission from rtprtxqueue is not RFC 4588 compliant. The
34  * retransmitted packets have the same ssrc and payload type as the original
35  * stream.
36  * - As a side-effect of the above, rtprtxqueue does not require the use of
37  * rtprtxreceive on the receiving end. rtpjitterbuffer alone is able to
38  * reconstruct the stream.
39  * - Retransmission from rtprtxqueue happens as soon as the next regular flow
40  * packet is chained, while rtprtxsend retransmits as soon as the retransmission
41  * event is received, using a helper thread.
42  * - rtprtxqueue can be used with rtpbin without the need of hooking to its
43  * #GstRtpBin::request-aux-sender signal, which means it can be used with
44  * rtpbin using gst-launch.
45  *
46  * See also #GstRtpRtxSend, #GstRtpRtxReceive
47  *
48  * # Example pipelines
49  *
50  * |[
51  * gst-launch-1.0 rtpbin name=b rtp-profile=avpf \
52  *    audiotestsrc is-live=true ! opusenc ! rtpopuspay pt=96 ! rtprtxqueue ! b.send_rtp_sink_0 \
53  *    b.send_rtp_src_0 ! identity drop-probability=0.01 ! udpsink host="127.0.0.1" port=5000 \
54  *    udpsrc port=5001 ! b.recv_rtcp_sink_0 \
55  *    b.send_rtcp_src_0 ! udpsink host="127.0.0.1" port=5002 sync=false async=false
56  * ]|
57  * Sender pipeline
58  *
59  * |[
60  * gst-launch-1.0 rtpbin name=b rtp-profile=avpf do-retransmission=true \
61  *    udpsrc port=5000 caps="application/x-rtp,media=(string)audio,clock-rate=(int)48000,encoding-name=(string)OPUS,payload=(int)96" ! \
62  *        b.recv_rtp_sink_0 \
63  *    b. ! rtpopusdepay ! opusdec ! audioconvert ! audioresample ! autoaudiosink \
64  *    udpsrc port=5002 ! b.recv_rtcp_sink_0 \
65  *    b.send_rtcp_src_0 ! udpsink host="127.0.0.1" port=5001 sync=false async=false
66  * ]|
67  * Receiver pipeline
68  */
69 
70 #ifdef HAVE_CONFIG_H
71 #include "config.h"
72 #endif
73 
74 #include <gst/gst.h>
75 #include <gst/rtp/gstrtpbuffer.h>
76 #include <string.h>
77 
78 #include "gstrtprtxqueue.h"
79 
/* Debug category used by every GST_DEBUG_OBJECT call in this file. */
GST_DEBUG_CATEGORY_STATIC (gst_rtp_rtx_queue_debug);
#define GST_CAT_DEFAULT gst_rtp_rtx_queue_debug

/* Property defaults; for both limits the value 0 means "unlimited". */
#define DEFAULT_MAX_SIZE_TIME    0
#define DEFAULT_MAX_SIZE_PACKETS 100

/* GObject property identifiers. */
enum
{
  PROP_0,
  PROP_MAX_SIZE_TIME,           /* "max-size-time", read/write */
  PROP_MAX_SIZE_PACKETS,        /* "max-size-packets", read/write */
  PROP_REQUESTS,                /* "requests", read-only counter */
  PROP_FULFILLED_REQUESTS,      /* "fulfilled-requests", read-only counter */
};
94 
/* Always-present source pad carrying application/x-rtp. */
static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src",
    GST_PAD_SRC,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS ("application/x-rtp")
    );

/* Always-present sink pad accepting application/x-rtp. */
static GstStaticPadTemplate sink_factory = GST_STATIC_PAD_TEMPLATE ("sink",
    GST_PAD_SINK,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS ("application/x-rtp")
    );
106 
/* Pad event handlers and data-flow (chain) functions. */
static gboolean gst_rtp_rtx_queue_src_event (GstPad * pad, GstObject * parent,
    GstEvent * event);
static gboolean gst_rtp_rtx_queue_sink_event (GstPad * pad, GstObject * parent,
    GstEvent * event);
static GstFlowReturn gst_rtp_rtx_queue_chain (GstPad * pad, GstObject * parent,
    GstBuffer * buffer);
static GstFlowReturn gst_rtp_rtx_queue_chain_list (GstPad * pad,
    GstObject * parent, GstBufferList * list);

/* GstElement state-change handler. */
static GstStateChangeReturn gst_rtp_rtx_queue_change_state (GstElement *
    element, GstStateChange transition);

/* GObject property accessors and finalizer. */
static void gst_rtp_rtx_queue_set_property (GObject * object, guint prop_id,
    const GValue * value, GParamSpec * pspec);
static void gst_rtp_rtx_queue_get_property (GObject * object, guint prop_id,
    GValue * value, GParamSpec * pspec);
static void gst_rtp_rtx_queue_finalize (GObject * object);

/* Defines the GstRTPRtxQueue type as a GST_TYPE_ELEMENT subclass and
 * initializes the "rtprtxqueue" debug category as part of type setup. */
G_DEFINE_TYPE_WITH_CODE (GstRTPRtxQueue, gst_rtp_rtx_queue, GST_TYPE_ELEMENT,
    GST_DEBUG_CATEGORY_INIT (gst_rtp_rtx_queue_debug, "rtprtxqueue", 0,
        "rtp retransmission queue"));
/* Defines the element registration helper for the "rtprtxqueue" factory
 * name with rank GST_RANK_NONE. */
GST_ELEMENT_REGISTER_DEFINE (rtprtxqueue, "rtprtxqueue", GST_RANK_NONE,
    GST_TYPE_RTP_RTX_QUEUE);
130 
131 static void
gst_rtp_rtx_queue_class_init(GstRTPRtxQueueClass * klass)132 gst_rtp_rtx_queue_class_init (GstRTPRtxQueueClass * klass)
133 {
134   GObjectClass *gobject_class;
135   GstElementClass *gstelement_class;
136 
137   gobject_class = (GObjectClass *) klass;
138   gstelement_class = (GstElementClass *) klass;
139 
140   gobject_class->get_property = gst_rtp_rtx_queue_get_property;
141   gobject_class->set_property = gst_rtp_rtx_queue_set_property;
142   gobject_class->finalize = gst_rtp_rtx_queue_finalize;
143 
144   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
145       g_param_spec_uint ("max-size-time", "Max Size Times",
146           "Amount of ms to queue (0 = unlimited)", 0, G_MAXUINT,
147           DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
148 
149   g_object_class_install_property (gobject_class, PROP_MAX_SIZE_PACKETS,
150       g_param_spec_uint ("max-size-packets", "Max Size Packets",
151           "Amount of packets to queue (0 = unlimited)", 0, G_MAXUINT,
152           DEFAULT_MAX_SIZE_PACKETS,
153           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
154 
155   g_object_class_install_property (gobject_class, PROP_REQUESTS,
156       g_param_spec_uint ("requests", "Requests",
157           "Total number of retransmission requests", 0, G_MAXUINT,
158           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
159 
160   g_object_class_install_property (gobject_class, PROP_FULFILLED_REQUESTS,
161       g_param_spec_uint ("fulfilled-requests", "Fulfilled Requests",
162           "Number of fulfilled retransmission requests", 0, G_MAXUINT,
163           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
164 
165   gst_element_class_add_static_pad_template (gstelement_class, &src_factory);
166   gst_element_class_add_static_pad_template (gstelement_class, &sink_factory);
167 
168   gst_element_class_set_static_metadata (gstelement_class,
169       "RTP Retransmission Queue", "Codec",
170       "Keep RTP packets in a queue for retransmission",
171       "Wim Taymans <wim.taymans@gmail.com>");
172 
173   gstelement_class->change_state =
174       GST_DEBUG_FUNCPTR (gst_rtp_rtx_queue_change_state);
175 }
176 
177 static void
gst_rtp_rtx_queue_reset(GstRTPRtxQueue * rtx,gboolean full)178 gst_rtp_rtx_queue_reset (GstRTPRtxQueue * rtx, gboolean full)
179 {
180   g_mutex_lock (&rtx->lock);
181   g_queue_foreach (rtx->queue, (GFunc) gst_buffer_unref, NULL);
182   g_queue_clear (rtx->queue);
183   g_list_foreach (rtx->pending, (GFunc) gst_buffer_unref, NULL);
184   g_list_free (rtx->pending);
185   rtx->pending = NULL;
186   rtx->n_requests = 0;
187   rtx->n_fulfilled_requests = 0;
188   g_mutex_unlock (&rtx->lock);
189 }
190 
191 static void
gst_rtp_rtx_queue_finalize(GObject * object)192 gst_rtp_rtx_queue_finalize (GObject * object)
193 {
194   GstRTPRtxQueue *rtx = GST_RTP_RTX_QUEUE (object);
195 
196   gst_rtp_rtx_queue_reset (rtx, TRUE);
197   g_queue_free (rtx->queue);
198   g_mutex_clear (&rtx->lock);
199 
200   G_OBJECT_CLASS (gst_rtp_rtx_queue_parent_class)->finalize (object);
201 }
202 
203 static void
gst_rtp_rtx_queue_init(GstRTPRtxQueue * rtx)204 gst_rtp_rtx_queue_init (GstRTPRtxQueue * rtx)
205 {
206   GstElementClass *klass = GST_ELEMENT_GET_CLASS (rtx);
207 
208   rtx->srcpad =
209       gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
210           "src"), "src");
211   GST_PAD_SET_PROXY_CAPS (rtx->srcpad);
212   GST_PAD_SET_PROXY_ALLOCATION (rtx->srcpad);
213   gst_pad_set_event_function (rtx->srcpad,
214       GST_DEBUG_FUNCPTR (gst_rtp_rtx_queue_src_event));
215   gst_element_add_pad (GST_ELEMENT (rtx), rtx->srcpad);
216 
217   rtx->sinkpad =
218       gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
219           "sink"), "sink");
220   GST_PAD_SET_PROXY_CAPS (rtx->sinkpad);
221   GST_PAD_SET_PROXY_ALLOCATION (rtx->sinkpad);
222   gst_pad_set_event_function (rtx->sinkpad,
223       GST_DEBUG_FUNCPTR (gst_rtp_rtx_queue_sink_event));
224   gst_pad_set_chain_function (rtx->sinkpad,
225       GST_DEBUG_FUNCPTR (gst_rtp_rtx_queue_chain));
226   gst_pad_set_chain_list_function (rtx->sinkpad,
227       GST_DEBUG_FUNCPTR (gst_rtp_rtx_queue_chain_list));
228   gst_element_add_pad (GST_ELEMENT (rtx), rtx->sinkpad);
229 
230   rtx->queue = g_queue_new ();
231   g_mutex_init (&rtx->lock);
232 
233   rtx->max_size_time = DEFAULT_MAX_SIZE_TIME;
234   rtx->max_size_packets = DEFAULT_MAX_SIZE_PACKETS;
235 }
236 
/* Search context handed to push_seqnum() while walking the queue. */
typedef struct
{
  GstRTPRtxQueue *rtx;          /* element whose pending list receives the match */
  guint seqnum;                 /* requested RTP sequence number */
  gboolean found;               /* set once a matching buffer has been scheduled */
} RTXData;
243 
244 static void
push_seqnum(GstBuffer * buffer,RTXData * data)245 push_seqnum (GstBuffer * buffer, RTXData * data)
246 {
247   GstRTPRtxQueue *rtx = data->rtx;
248   GstRTPBuffer rtpbuffer = GST_RTP_BUFFER_INIT;
249   guint16 seqnum;
250 
251   if (data->found)
252     return;
253 
254   if (!GST_IS_BUFFER (buffer) ||
255       !gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtpbuffer))
256     return;
257 
258   seqnum = gst_rtp_buffer_get_seq (&rtpbuffer);
259   gst_rtp_buffer_unmap (&rtpbuffer);
260 
261   if (seqnum == data->seqnum) {
262     data->found = TRUE;
263     GST_DEBUG_OBJECT (rtx, "found %d", seqnum);
264     rtx->pending = g_list_prepend (rtx->pending, gst_buffer_ref (buffer));
265   }
266 }
267 
268 static gboolean
gst_rtp_rtx_queue_src_event(GstPad * pad,GstObject * parent,GstEvent * event)269 gst_rtp_rtx_queue_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
270 {
271   GstRTPRtxQueue *rtx = GST_RTP_RTX_QUEUE (parent);
272   gboolean res;
273 
274   switch (GST_EVENT_TYPE (event)) {
275     case GST_EVENT_CUSTOM_UPSTREAM:
276     {
277       const GstStructure *s;
278 
279       s = gst_event_get_structure (event);
280       if (gst_structure_has_name (s, "GstRTPRetransmissionRequest")) {
281         guint seqnum;
282         RTXData data;
283 
284         if (!gst_structure_get_uint (s, "seqnum", &seqnum))
285           seqnum = -1;
286 
287         GST_DEBUG_OBJECT (rtx, "request %d", seqnum);
288 
289         g_mutex_lock (&rtx->lock);
290         data.rtx = rtx;
291         data.seqnum = seqnum;
292         data.found = FALSE;
293         rtx->n_requests += 1;
294         g_queue_foreach (rtx->queue, (GFunc) push_seqnum, &data);
295         g_mutex_unlock (&rtx->lock);
296 
297         gst_event_unref (event);
298         res = TRUE;
299       } else {
300         res = gst_pad_event_default (pad, parent, event);
301       }
302       break;
303     }
304     default:
305       res = gst_pad_event_default (pad, parent, event);
306       break;
307   }
308   return res;
309 }
310 
311 static gboolean
gst_rtp_rtx_queue_sink_event(GstPad * pad,GstObject * parent,GstEvent * event)312 gst_rtp_rtx_queue_sink_event (GstPad * pad, GstObject * parent,
313     GstEvent * event)
314 {
315   GstRTPRtxQueue *rtx = GST_RTP_RTX_QUEUE (parent);
316   gboolean res;
317 
318   switch (GST_EVENT_TYPE (event)) {
319     case GST_EVENT_SEGMENT:
320     {
321       g_mutex_lock (&rtx->lock);
322       gst_event_copy_segment (event, &rtx->head_segment);
323       g_queue_push_head (rtx->queue, gst_event_ref (event));
324       g_mutex_unlock (&rtx->lock);
325       /* fall through */
326     }
327     default:
328       res = gst_pad_event_default (pad, parent, event);
329       break;
330   }
331   return res;
332 }
333 
334 static void
do_push(GstBuffer * buffer,GstRTPRtxQueue * rtx)335 do_push (GstBuffer * buffer, GstRTPRtxQueue * rtx)
336 {
337   rtx->n_fulfilled_requests += 1;
338   gst_pad_push (rtx->srcpad, buffer);
339 }
340 
341 static guint32
get_ts_diff(GstRTPRtxQueue * rtx)342 get_ts_diff (GstRTPRtxQueue * rtx)
343 {
344   GstClockTime high_ts, low_ts;
345   GstClockTimeDiff result;
346   GstBuffer *high_buf, *low_buf;
347 
348   high_buf = g_queue_peek_head (rtx->queue);
349 
350   while (GST_IS_EVENT ((low_buf = g_queue_peek_tail (rtx->queue)))) {
351     GstEvent *event = g_queue_pop_tail (rtx->queue);
352     gst_event_copy_segment (event, &rtx->tail_segment);
353     gst_event_unref (event);
354   }
355 
356   if (!high_buf || !low_buf || high_buf == low_buf)
357     return 0;
358 
359   high_ts = GST_BUFFER_TIMESTAMP (high_buf);
360   low_ts = GST_BUFFER_TIMESTAMP (low_buf);
361 
362   high_ts = gst_segment_to_running_time (&rtx->head_segment, GST_FORMAT_TIME,
363       high_ts);
364   low_ts = gst_segment_to_running_time (&rtx->tail_segment, GST_FORMAT_TIME,
365       low_ts);
366 
367   result = high_ts - low_ts;
368 
369   /* return value in ms instead of ns */
370   return (guint32) gst_util_uint64_scale_int (result, 1, GST_MSECOND);
371 }
372 
373 /* Must be called with rtx->lock */
374 static void
shrink_queue(GstRTPRtxQueue * rtx)375 shrink_queue (GstRTPRtxQueue * rtx)
376 {
377   if (rtx->max_size_packets) {
378     while (g_queue_get_length (rtx->queue) > rtx->max_size_packets)
379       gst_buffer_unref (g_queue_pop_tail (rtx->queue));
380   }
381   if (rtx->max_size_time) {
382     while (get_ts_diff (rtx) > rtx->max_size_time)
383       gst_buffer_unref (g_queue_pop_tail (rtx->queue));
384   }
385 }
386 
387 static GstFlowReturn
gst_rtp_rtx_queue_chain(GstPad * pad,GstObject * parent,GstBuffer * buffer)388 gst_rtp_rtx_queue_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
389 {
390   GstRTPRtxQueue *rtx;
391   GstFlowReturn ret;
392   GList *pending;
393 
394   rtx = GST_RTP_RTX_QUEUE (parent);
395 
396   g_mutex_lock (&rtx->lock);
397   g_queue_push_head (rtx->queue, gst_buffer_ref (buffer));
398   shrink_queue (rtx);
399 
400   pending = rtx->pending;
401   rtx->pending = NULL;
402   g_mutex_unlock (&rtx->lock);
403 
404   pending = g_list_reverse (pending);
405   g_list_foreach (pending, (GFunc) do_push, rtx);
406   g_list_free (pending);
407 
408   ret = gst_pad_push (rtx->srcpad, buffer);
409 
410   return ret;
411 }
412 
413 static gboolean
push_to_queue(GstBuffer ** buffer,guint idx,gpointer user_data)414 push_to_queue (GstBuffer ** buffer, guint idx, gpointer user_data)
415 {
416   GQueue *queue = user_data;
417 
418   g_queue_push_head (queue, gst_buffer_ref (*buffer));
419 
420   return TRUE;
421 }
422 
423 static GstFlowReturn
gst_rtp_rtx_queue_chain_list(GstPad * pad,GstObject * parent,GstBufferList * list)424 gst_rtp_rtx_queue_chain_list (GstPad * pad, GstObject * parent,
425     GstBufferList * list)
426 {
427   GstRTPRtxQueue *rtx;
428   GstFlowReturn ret;
429   GList *pending;
430 
431   rtx = GST_RTP_RTX_QUEUE (parent);
432 
433   g_mutex_lock (&rtx->lock);
434   gst_buffer_list_foreach (list, push_to_queue, rtx->queue);
435   shrink_queue (rtx);
436 
437   pending = rtx->pending;
438   rtx->pending = NULL;
439   g_mutex_unlock (&rtx->lock);
440 
441   pending = g_list_reverse (pending);
442   g_list_foreach (pending, (GFunc) do_push, rtx);
443   g_list_free (pending);
444 
445   ret = gst_pad_push_list (rtx->srcpad, list);
446 
447   return ret;
448 }
449 
450 static void
gst_rtp_rtx_queue_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)451 gst_rtp_rtx_queue_get_property (GObject * object,
452     guint prop_id, GValue * value, GParamSpec * pspec)
453 {
454   GstRTPRtxQueue *rtx = GST_RTP_RTX_QUEUE (object);
455 
456   switch (prop_id) {
457     case PROP_MAX_SIZE_TIME:
458       g_value_set_uint (value, rtx->max_size_time);
459       break;
460     case PROP_MAX_SIZE_PACKETS:
461       g_value_set_uint (value, rtx->max_size_packets);
462       break;
463     case PROP_REQUESTS:
464       g_value_set_uint (value, rtx->n_requests);
465       break;
466     case PROP_FULFILLED_REQUESTS:
467       g_value_set_uint (value, rtx->n_fulfilled_requests);
468       break;
469     default:
470       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
471       break;
472   }
473 }
474 
475 static void
gst_rtp_rtx_queue_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)476 gst_rtp_rtx_queue_set_property (GObject * object,
477     guint prop_id, const GValue * value, GParamSpec * pspec)
478 {
479   GstRTPRtxQueue *rtx = GST_RTP_RTX_QUEUE (object);
480 
481   switch (prop_id) {
482     case PROP_MAX_SIZE_TIME:
483       rtx->max_size_time = g_value_get_uint (value);
484       break;
485     case PROP_MAX_SIZE_PACKETS:
486       rtx->max_size_packets = g_value_get_uint (value);
487       break;
488     default:
489       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
490       break;
491   }
492 }
493 
494 static GstStateChangeReturn
gst_rtp_rtx_queue_change_state(GstElement * element,GstStateChange transition)495 gst_rtp_rtx_queue_change_state (GstElement * element, GstStateChange transition)
496 {
497   GstStateChangeReturn ret;
498   GstRTPRtxQueue *rtx;
499 
500   rtx = GST_RTP_RTX_QUEUE (element);
501 
502   switch (transition) {
503     default:
504       break;
505   }
506 
507   ret =
508       GST_ELEMENT_CLASS (gst_rtp_rtx_queue_parent_class)->change_state (element,
509       transition);
510 
511   switch (transition) {
512     case GST_STATE_CHANGE_PAUSED_TO_READY:
513       gst_rtp_rtx_queue_reset (rtx, TRUE);
514       break;
515     default:
516       break;
517   }
518 
519   return ret;
520 }
521