1 /* GStreamer plugin for forward error correction
2 * Copyright (C) 2017 Pexip
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) any later version.
8 *
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Lesser General Public License for more details.
13 *
14 * You should have received a copy of the GNU Lesser General Public
15 * License along with this library; if not, write to the Free Software
16 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 *
18 * Author: Mikhail Fludkov <misha@pexip.com>
19 */
20
21 #include <gst/rtp/gstrtpbuffer.h>
22
23 #include "rtpstorage.h"
24 #include "rtpstoragestream.h"
25
26 #define GST_CAT_DEFAULT (gst_rtp_storage_debug)
27
28 enum
29 {
30 SIGNAL_PACKET_RECOVERED,
31 LAST_SIGNAL,
32 };
33
34 static guint rtp_storage_signals[LAST_SIGNAL] = { 0 };
35
36 G_DEFINE_TYPE (RtpStorage, rtp_storage, G_TYPE_OBJECT);
37
38 #define STORAGE_LOCK(s) g_mutex_lock (&(s)->streams_lock)
39 #define STORAGE_UNLOCK(s) g_mutex_unlock (&(s)->streams_lock)
40 #define DEFAULT_SIZE_TIME (0)
41
42 static void
rtp_storage_init(RtpStorage * self)43 rtp_storage_init (RtpStorage * self)
44 {
45 self->size_time = DEFAULT_SIZE_TIME;
46 self->streams = g_hash_table_new_full (NULL, NULL, NULL,
47 (GDestroyNotify) rtp_storage_stream_free);
48 g_mutex_init (&self->streams_lock);
49 }
50
51 static void
rtp_storage_dispose(GObject * obj)52 rtp_storage_dispose (GObject * obj)
53 {
54 RtpStorage *self = RTP_STORAGE (obj);
55 STORAGE_LOCK (self);
56 g_hash_table_unref (self->streams);
57 self->streams = NULL;
58 STORAGE_UNLOCK (self);
59 g_mutex_clear (&self->streams_lock);
60 G_OBJECT_CLASS (rtp_storage_parent_class)->dispose (obj);
61 }
62
63 static void
rtp_storage_class_init(RtpStorageClass * klass)64 rtp_storage_class_init (RtpStorageClass * klass)
65 {
66 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
67
68 rtp_storage_signals[SIGNAL_PACKET_RECOVERED] =
69 g_signal_new ("packet-recovered", G_TYPE_FROM_CLASS (klass),
70 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_BUFFER);
71
72 gobject_class->dispose = rtp_storage_dispose;
73 }
74
75 GstBufferList *
rtp_storage_get_packets_for_recovery(RtpStorage * self,gint fec_pt,guint32 ssrc,guint16 lost_seq)76 rtp_storage_get_packets_for_recovery (RtpStorage * self, gint fec_pt,
77 guint32 ssrc, guint16 lost_seq)
78 {
79 GstBufferList *ret = NULL;
80 RtpStorageStream *stream;
81
82 if (0 == self->size_time) {
83 GST_WARNING_OBJECT (self, "Received request for recovery RTP packets"
84 " around lost_seqnum=%u fec_pt=%u for ssrc=%08x, but size is 0",
85 lost_seq, fec_pt, ssrc);
86 return NULL;
87 }
88
89 STORAGE_LOCK (self);
90 stream = g_hash_table_lookup (self->streams, GUINT_TO_POINTER (ssrc));
91 STORAGE_UNLOCK (self);
92
93 if (NULL == stream) {
94 GST_ERROR_OBJECT (self, "Can't find ssrc = 0x08%x", ssrc);
95 } else {
96 STREAM_LOCK (stream);
97 if (stream->queue.length > 0) {
98 GST_LOG_OBJECT (self, "Looking for recovery packets for fec_pt=%u around"
99 " lost_seq=%u for ssrc=%08x", fec_pt, lost_seq, ssrc);
100 ret =
101 rtp_storage_stream_get_packets_for_recovery (stream, fec_pt,
102 lost_seq);
103 } else {
104 GST_DEBUG_OBJECT (self, "Empty RTP storage for ssrc=%08x", ssrc);
105 }
106 STREAM_UNLOCK (stream);
107 }
108
109 return ret;
110 }
111
112 GstBuffer *
rtp_storage_get_redundant_packet(RtpStorage * self,guint32 ssrc,guint16 lost_seq)113 rtp_storage_get_redundant_packet (RtpStorage * self, guint32 ssrc,
114 guint16 lost_seq)
115 {
116 GstBuffer *ret = NULL;
117 RtpStorageStream *stream;
118
119 if (0 == self->size_time) {
120 GST_WARNING_OBJECT (self, "Received request for redundant RTP packet with"
121 " seq=%u for ssrc=%08x, but size is 0", lost_seq, ssrc);
122 return NULL;
123 }
124
125 STORAGE_LOCK (self);
126 stream = g_hash_table_lookup (self->streams, GUINT_TO_POINTER (ssrc));
127 STORAGE_UNLOCK (self);
128
129 if (NULL == stream) {
130 GST_ERROR_OBJECT (self, "Can't find ssrc = 0x%x", ssrc);
131 } else {
132 STREAM_LOCK (stream);
133 if (stream->queue.length > 0) {
134 ret = rtp_storage_stream_get_redundant_packet (stream, lost_seq);
135 } else {
136 GST_DEBUG_OBJECT (self, "Empty RTP storage for ssrc=%08x", ssrc);
137 }
138 STREAM_UNLOCK (stream);
139 }
140
141 return ret;
142 }
143
144 static void
rtp_storage_do_put_recovered_packet(RtpStorage * self,GstBuffer * buffer,guint8 pt,guint32 ssrc,guint16 seq)145 rtp_storage_do_put_recovered_packet (RtpStorage * self,
146 GstBuffer * buffer, guint8 pt, guint32 ssrc, guint16 seq)
147 {
148 RtpStorageStream *stream;
149
150 STORAGE_LOCK (self);
151 stream = g_hash_table_lookup (self->streams, GUINT_TO_POINTER (ssrc));
152 STORAGE_UNLOCK (self);
153
154 g_assert (stream);
155
156 GST_LOG_OBJECT (self,
157 "Storing recovered RTP packet with ssrc=%08x pt=%u seq=%u %"
158 GST_PTR_FORMAT, ssrc, pt, seq, buffer);
159
160 STREAM_LOCK (stream);
161 rtp_storage_stream_add_item (stream, buffer, pt, seq);
162 STREAM_UNLOCK (stream);
163 }
164
165 void
rtp_storage_put_recovered_packet(RtpStorage * self,GstBuffer * buffer,guint8 pt,guint32 ssrc,guint16 seq)166 rtp_storage_put_recovered_packet (RtpStorage * self,
167 GstBuffer * buffer, guint8 pt, guint32 ssrc, guint16 seq)
168 {
169 rtp_storage_do_put_recovered_packet (self, buffer, pt, ssrc, seq);
170 g_signal_emit (self, rtp_storage_signals[SIGNAL_PACKET_RECOVERED], 0, buffer);
171 }
172
173 gboolean
rtp_storage_append_buffer(RtpStorage * self,GstBuffer * buf)174 rtp_storage_append_buffer (RtpStorage * self, GstBuffer * buf)
175 {
176 GstRTPBuffer rtpbuf = GST_RTP_BUFFER_INIT;
177 RtpStorageStream *stream;
178 guint32 ssrc;
179 guint8 pt;
180 guint16 seq;
181
182 if (0 == self->size_time)
183 return TRUE;
184
185 /* We are about to save it in the queue, it so it is better take a ref before
186 * mapping the buffer */
187 gst_buffer_ref (buf);
188
189 if (!gst_rtp_buffer_map (buf, GST_MAP_READ |
190 GST_RTP_BUFFER_MAP_FLAG_SKIP_PADDING, &rtpbuf)) {
191 gst_buffer_unref (buf);
192 return TRUE;
193 }
194
195 ssrc = gst_rtp_buffer_get_ssrc (&rtpbuf);
196 pt = gst_rtp_buffer_get_payload_type (&rtpbuf);
197 seq = gst_rtp_buffer_get_seq (&rtpbuf);
198
199 STORAGE_LOCK (self);
200
201 stream = g_hash_table_lookup (self->streams, GUINT_TO_POINTER (ssrc));
202 if (NULL == stream) {
203 GST_DEBUG_OBJECT (self,
204 "New media stream (ssrc=0x%08x, pt=%u) detected", ssrc, pt);
205 stream = rtp_storage_stream_new (ssrc);
206 g_hash_table_insert (self->streams, GUINT_TO_POINTER (ssrc), stream);
207 }
208
209 STORAGE_UNLOCK (self);
210
211 GST_LOG_OBJECT (self,
212 "Storing RTP packet with ssrc=%08x pt=%u seq=%u %" GST_PTR_FORMAT,
213 ssrc, pt, seq, buf);
214
215 STREAM_LOCK (stream);
216
217 /* Saving the buffer, now the storage owns it */
218 rtp_storage_stream_resize_and_add_item (stream, self->size_time, buf, pt,
219 seq);
220
221 STREAM_UNLOCK (stream);
222
223 gst_rtp_buffer_unmap (&rtpbuf);
224
225 if (GST_BUFFER_FLAG_IS_SET (buf, GST_RTP_BUFFER_FLAG_REDUNDANT)) {
226 gst_buffer_unref (buf);
227 return FALSE;
228 }
229
230 return TRUE;
231 }
232
233 void
rtp_storage_clear(RtpStorage * self)234 rtp_storage_clear (RtpStorage * self)
235 {
236 STORAGE_LOCK (self);
237 g_hash_table_remove_all (self->streams);
238 STORAGE_UNLOCK (self);
239 }
240
241 void
rtp_storage_set_size(RtpStorage * self,GstClockTime size)242 rtp_storage_set_size (RtpStorage * self, GstClockTime size)
243 {
244 self->size_time = size;
245 if (0 == self->size_time)
246 rtp_storage_clear (self);
247 }
248
249 GstClockTime
rtp_storage_get_size(RtpStorage * self)250 rtp_storage_get_size (RtpStorage * self)
251 {
252 return self->size_time;
253 }
254
255 RtpStorage *
rtp_storage_new(void)256 rtp_storage_new (void)
257 {
258 return g_object_new (RTP_TYPE_STORAGE, NULL);
259 }
260