1 /* GStreamer plugin for forward error correction
2 * Copyright (C) 2017 Pexip
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) any later version.
8 *
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Lesser General Public License for more details.
13 *
14 * You should have received a copy of the GNU Lesser General Public
15 * License along with this library; if not, write to the Free Software
16 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 *
18 * Author: Mikhail Fludkov <misha@pexip.com>
19 */
20
21 #include "rtpstoragestream.h"
22
23 #define GST_CAT_DEFAULT (gst_rtp_storage_debug)
24
25 static RtpStorageItem *
rtp_storage_item_new(GstBuffer * buffer,guint8 pt,guint16 seq)26 rtp_storage_item_new (GstBuffer * buffer, guint8 pt, guint16 seq)
27 {
28 RtpStorageItem *ret = g_slice_new0 (RtpStorageItem);
29 ret->buffer = buffer;
30 ret->pt = pt;
31 ret->seq = seq;
32 return ret;
33 }
34
35 static void
rtp_storage_item_free(RtpStorageItem * item)36 rtp_storage_item_free (RtpStorageItem * item)
37 {
38 g_assert (item->buffer != NULL);
39 gst_buffer_unref (item->buffer);
40 g_slice_free (RtpStorageItem, item);
41 }
42
43 static gint
rtp_storage_item_compare(gconstpointer a,gconstpointer b,gpointer userdata)44 rtp_storage_item_compare (gconstpointer a, gconstpointer b, gpointer userdata)
45 {
46 gint seq_diff = gst_rtp_buffer_compare_seqnum (
47 ((RtpStorageItem const *) a)->seq, ((RtpStorageItem const *) b)->seq);
48
49 if (seq_diff >= 0)
50 return 0;
51
52 return 1;
53 }
54
55 static void
rtp_storage_stream_resize(RtpStorageStream * stream,GstClockTime size_time)56 rtp_storage_stream_resize (RtpStorageStream * stream, GstClockTime size_time)
57 {
58 GList *it;
59 guint i, too_old_buffers_num = 0;
60
61 g_assert (GST_CLOCK_TIME_IS_VALID (stream->max_arrival_time));
62 g_assert (GST_CLOCK_TIME_IS_VALID (size_time));
63 g_assert_cmpint (size_time, >, 0);
64
65 /* Iterating from oldest sequence numbers to newest */
66 for (i = 0, it = stream->queue.tail; it; it = it->prev, ++i) {
67 RtpStorageItem *item = it->data;
68 GstClockTime arrival_time = GST_BUFFER_DTS_OR_PTS (item->buffer);
69 if (GST_CLOCK_TIME_IS_VALID (arrival_time)) {
70 if (stream->max_arrival_time - arrival_time > size_time) {
71 too_old_buffers_num = i + 1;
72 } else
73 break;
74 }
75 }
76
77 for (i = 0; i < too_old_buffers_num; ++i) {
78 RtpStorageItem *item = g_queue_pop_tail (&stream->queue);
79
80 GST_TRACE ("Removing %u/%u buffers, pt=%d seq=%d for ssrc=%08x",
81 i, too_old_buffers_num, item->pt, item->seq, stream->ssrc);
82
83 rtp_storage_item_free (item);
84 }
85 }
86
87 /* This algorithm corresponds to rtp_jitter_buffer_get_seqnum_diff(),
88 * we want to keep the same number of packets in the worse case.
89 */
90
91 static guint16
rtp_storage_stream_get_seqnum_diff(RtpStorageStream * stream)92 rtp_storage_stream_get_seqnum_diff (RtpStorageStream * stream)
93 {
94 guint32 high_seqnum, low_seqnum;
95 RtpStorageItem *high_item, *low_item;
96 guint16 result;
97
98
99 high_item = (RtpStorageItem *) g_queue_peek_head (&stream->queue);
100 low_item = (RtpStorageItem *) g_queue_peek_tail (&stream->queue);
101
102 if (!high_item || !low_item || high_item == low_item)
103 return 0;
104
105 high_seqnum = high_item->seq;
106 low_seqnum = low_item->seq;
107
108 /* it needs to work if seqnum wraps */
109 if (high_seqnum >= low_seqnum) {
110 result = (guint32) (high_seqnum - low_seqnum);
111 } else {
112 result = (guint32) (high_seqnum + G_MAXUINT16 + 1 - low_seqnum);
113 }
114 return result;
115 }
116
117 void
rtp_storage_stream_resize_and_add_item(RtpStorageStream * stream,GstClockTime size_time,GstBuffer * buffer,guint8 pt,guint16 seq)118 rtp_storage_stream_resize_and_add_item (RtpStorageStream * stream,
119 GstClockTime size_time, GstBuffer * buffer, guint8 pt, guint16 seq)
120 {
121 GstClockTime arrival_time = GST_BUFFER_DTS_OR_PTS (buffer);
122
123 /* These limits match those of the jittebuffer, we keep a couple more
124 * packets to avoid races as it can be queried after the output of the
125 * jitterbuffer.
126 */
127 if (rtp_storage_stream_get_seqnum_diff (stream) >= 32765 ||
128 stream->queue.length > 10100) {
129 RtpStorageItem *item = g_queue_pop_tail (&stream->queue);
130
131 GST_WARNING ("Queue too big, removing pt=%d seq=%d for ssrc=%08x",
132 item->pt, item->seq, stream->ssrc);
133
134 rtp_storage_item_free (item);
135 }
136
137 if (G_LIKELY (GST_CLOCK_TIME_IS_VALID (arrival_time))) {
138 if (G_LIKELY (GST_CLOCK_TIME_IS_VALID (stream->max_arrival_time)))
139 stream->max_arrival_time = MAX (stream->max_arrival_time, arrival_time);
140 else
141 stream->max_arrival_time = arrival_time;
142
143 rtp_storage_stream_resize (stream, size_time);
144 rtp_storage_stream_add_item (stream, buffer, pt, seq);
145 } else {
146 rtp_storage_stream_add_item (stream, buffer, pt, seq);
147 }
148 }
149
150 RtpStorageStream *
rtp_storage_stream_new(guint32 ssrc)151 rtp_storage_stream_new (guint32 ssrc)
152 {
153 RtpStorageStream *ret = g_slice_new0 (RtpStorageStream);
154 ret->max_arrival_time = GST_CLOCK_TIME_NONE;
155 ret->ssrc = ssrc;
156 g_mutex_init (&ret->stream_lock);
157 return ret;
158 }
159
160 void
rtp_storage_stream_free(RtpStorageStream * stream)161 rtp_storage_stream_free (RtpStorageStream * stream)
162 {
163 STREAM_LOCK (stream);
164 while (stream->queue.length)
165 rtp_storage_item_free (g_queue_pop_tail (&stream->queue));
166 STREAM_UNLOCK (stream);
167 g_mutex_clear (&stream->stream_lock);
168 g_slice_free (RtpStorageStream, stream);
169 }
170
171 void
rtp_storage_stream_add_item(RtpStorageStream * stream,GstBuffer * buffer,guint8 pt,guint16 seq)172 rtp_storage_stream_add_item (RtpStorageStream * stream, GstBuffer * buffer,
173 guint8 pt, guint16 seq)
174 {
175 RtpStorageItem *item = rtp_storage_item_new (buffer, pt, seq);
176 GList *sibling = g_queue_find_custom (&stream->queue, item,
177 (GCompareFunc) rtp_storage_item_compare);
178
179 g_queue_insert_before (&stream->queue, sibling, item);
180 }
181
182 GstBufferList *
rtp_storage_stream_get_packets_for_recovery(RtpStorageStream * stream,guint8 pt_fec,guint16 lost_seq)183 rtp_storage_stream_get_packets_for_recovery (RtpStorageStream * stream,
184 guint8 pt_fec, guint16 lost_seq)
185 {
186 guint ret_length = 0;
187 GList *end = NULL;
188 GList *start = NULL;
189 gboolean saw_fec = TRUE; /* To initialize the start pointer in the loop below */
190 GList *it;
191
192 /* Looking for media stream chunk with FEC packets at the end, which could
193 * can have the lost packet. For example:
194 *
195 * |#10 FEC| |#9 FEC| |#8| ... |#6| |#5 FEC| |#4 FEC| |#3 FEC| |#2| |#1| |#0|
196 *
197 * Say @lost_seq = 7. Want to return bufferlist with packets [#6 : #10]. Other
198 * packets are not relevant for recovery of packet 7.
199 *
200 * Or the lost packet can be in the storage. In that case single packet is returned.
201 * It can happen if:
202 * - it could have arrived right after it was considered lost (more of a corner case)
203 * - it was recovered together with the other lost packet (most likely)
204 */
205 for (it = stream->queue.tail; it; it = it->prev) {
206 RtpStorageItem *item = it->data;
207 gboolean found_end = FALSE;
208
209 /* Is the buffer we lost in the storage? */
210 if (item->seq == lost_seq) {
211 start = it;
212 end = it;
213 ret_length = 1;
214 break;
215 }
216
217 if (pt_fec == item->pt) {
218 gint seq_diff = gst_rtp_buffer_compare_seqnum (lost_seq, item->seq);
219
220 if (seq_diff >= 0) {
221 if (it->prev) {
222 gboolean media_next =
223 pt_fec != ((RtpStorageItem *) it->prev->data)->pt;
224 found_end = media_next;
225 } else
226 found_end = TRUE;
227 }
228 saw_fec = TRUE;
229 } else if (saw_fec) {
230 saw_fec = FALSE;
231 start = it;
232 ret_length = 0;
233 }
234
235 ++ret_length;
236 if (found_end) {
237 end = it;
238 break;
239 }
240 }
241
242 if (end && !start)
243 start = end;
244
245 if (start && end) {
246 GstBufferList *ret = gst_buffer_list_new_sized (ret_length);
247 GList *it;
248
249 GST_LOG ("Found %u buffers with lost seq=%d for ssrc=%08x, creating %"
250 GST_PTR_FORMAT, ret_length, lost_seq, stream->ssrc, ret);
251
252 for (it = start; it != end->prev; it = it->prev)
253 gst_buffer_list_add (ret,
254 gst_buffer_ref (((RtpStorageItem *) it->data)->buffer));
255 return ret;
256 }
257
258 return NULL;
259 }
260
261 GstBuffer *
rtp_storage_stream_get_redundant_packet(RtpStorageStream * stream,guint16 lost_seq)262 rtp_storage_stream_get_redundant_packet (RtpStorageStream * stream,
263 guint16 lost_seq)
264 {
265 GList *it;
266 for (it = stream->queue.head; it; it = it->next) {
267 RtpStorageItem *item = it->data;
268 if (item->seq == lost_seq) {
269 GST_LOG ("Found buffer pt=%u seq=%u for ssrc=%08x %" GST_PTR_FORMAT,
270 item->pt, item->seq, stream->ssrc, item->buffer);
271 return gst_buffer_ref (item->buffer);
272 }
273 }
274 GST_DEBUG ("Could not find packet with seq=%u for ssrc=%08x",
275 lost_seq, stream->ssrc);
276 return NULL;
277 }
278