• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer plugin for forward error correction
2  * Copyright (C) 2017 Pexip
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Lesser General Public
6  * License as published by the Free Software Foundation; either
7  * version 2.1 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Lesser General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public
15  * License along with this library; if not, write to the Free Software
16  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
17  *
18  * Author: Mikhail Fludkov <misha@pexip.com>
19  */
20 
21 #include <gst/rtp/gstrtpbuffer.h>
22 
23 #include "rtpstorage.h"
24 #include "rtpstoragestream.h"
25 
26 #define GST_CAT_DEFAULT (gst_rtp_storage_debug)
27 
28 enum
29 {
30   SIGNAL_PACKET_RECOVERED,
31   LAST_SIGNAL,
32 };
33 
34 static guint rtp_storage_signals[LAST_SIGNAL] = { 0 };
35 
36 G_DEFINE_TYPE (RtpStorage, rtp_storage, G_TYPE_OBJECT);
37 
38 #define STORAGE_LOCK(s)   g_mutex_lock   (&(s)->streams_lock)
39 #define STORAGE_UNLOCK(s) g_mutex_unlock (&(s)->streams_lock)
40 #define DEFAULT_SIZE_TIME (0)
41 
42 static void
rtp_storage_init(RtpStorage * self)43 rtp_storage_init (RtpStorage * self)
44 {
45   self->size_time = DEFAULT_SIZE_TIME;
46   self->streams = g_hash_table_new_full (NULL, NULL, NULL,
47       (GDestroyNotify) rtp_storage_stream_free);
48   g_mutex_init (&self->streams_lock);
49 }
50 
51 static void
rtp_storage_dispose(GObject * obj)52 rtp_storage_dispose (GObject * obj)
53 {
54   RtpStorage *self = RTP_STORAGE (obj);
55   STORAGE_LOCK (self);
56   g_hash_table_unref (self->streams);
57   self->streams = NULL;
58   STORAGE_UNLOCK (self);
59   g_mutex_clear (&self->streams_lock);
60   G_OBJECT_CLASS (rtp_storage_parent_class)->dispose (obj);
61 }
62 
63 static void
rtp_storage_class_init(RtpStorageClass * klass)64 rtp_storage_class_init (RtpStorageClass * klass)
65 {
66   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
67 
68   rtp_storage_signals[SIGNAL_PACKET_RECOVERED] =
69       g_signal_new ("packet-recovered", G_TYPE_FROM_CLASS (klass),
70       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, GST_TYPE_BUFFER);
71 
72   gobject_class->dispose = rtp_storage_dispose;
73 }
74 
75 GstBufferList *
rtp_storage_get_packets_for_recovery(RtpStorage * self,gint fec_pt,guint32 ssrc,guint16 lost_seq)76 rtp_storage_get_packets_for_recovery (RtpStorage * self, gint fec_pt,
77     guint32 ssrc, guint16 lost_seq)
78 {
79   GstBufferList *ret = NULL;
80   RtpStorageStream *stream;
81 
82   if (0 == self->size_time) {
83     GST_WARNING_OBJECT (self, "Received request for recovery RTP packets"
84         " around lost_seqnum=%u fec_pt=%u for ssrc=%08x, but size is 0",
85         lost_seq, fec_pt, ssrc);
86     return NULL;
87   }
88 
89   STORAGE_LOCK (self);
90   stream = g_hash_table_lookup (self->streams, GUINT_TO_POINTER (ssrc));
91   STORAGE_UNLOCK (self);
92 
93   if (NULL == stream) {
94     GST_ERROR_OBJECT (self, "Can't find ssrc = 0x08%x", ssrc);
95   } else {
96     STREAM_LOCK (stream);
97     if (stream->queue.length > 0) {
98       GST_LOG_OBJECT (self, "Looking for recovery packets for fec_pt=%u around"
99           " lost_seq=%u for ssrc=%08x", fec_pt, lost_seq, ssrc);
100       ret =
101           rtp_storage_stream_get_packets_for_recovery (stream, fec_pt,
102           lost_seq);
103     } else {
104       GST_DEBUG_OBJECT (self, "Empty RTP storage for ssrc=%08x", ssrc);
105     }
106     STREAM_UNLOCK (stream);
107   }
108 
109   return ret;
110 }
111 
112 GstBuffer *
rtp_storage_get_redundant_packet(RtpStorage * self,guint32 ssrc,guint16 lost_seq)113 rtp_storage_get_redundant_packet (RtpStorage * self, guint32 ssrc,
114     guint16 lost_seq)
115 {
116   GstBuffer *ret = NULL;
117   RtpStorageStream *stream;
118 
119   if (0 == self->size_time) {
120     GST_WARNING_OBJECT (self, "Received request for redundant RTP packet with"
121         " seq=%u for ssrc=%08x, but size is 0", lost_seq, ssrc);
122     return NULL;
123   }
124 
125   STORAGE_LOCK (self);
126   stream = g_hash_table_lookup (self->streams, GUINT_TO_POINTER (ssrc));
127   STORAGE_UNLOCK (self);
128 
129   if (NULL == stream) {
130     GST_ERROR_OBJECT (self, "Can't find ssrc = 0x%x", ssrc);
131   } else {
132     STREAM_LOCK (stream);
133     if (stream->queue.length > 0) {
134       ret = rtp_storage_stream_get_redundant_packet (stream, lost_seq);
135     } else {
136       GST_DEBUG_OBJECT (self, "Empty RTP storage for ssrc=%08x", ssrc);
137     }
138     STREAM_UNLOCK (stream);
139   }
140 
141   return ret;
142 }
143 
144 static void
rtp_storage_do_put_recovered_packet(RtpStorage * self,GstBuffer * buffer,guint8 pt,guint32 ssrc,guint16 seq)145 rtp_storage_do_put_recovered_packet (RtpStorage * self,
146     GstBuffer * buffer, guint8 pt, guint32 ssrc, guint16 seq)
147 {
148   RtpStorageStream *stream;
149 
150   STORAGE_LOCK (self);
151   stream = g_hash_table_lookup (self->streams, GUINT_TO_POINTER (ssrc));
152   STORAGE_UNLOCK (self);
153 
154   g_assert (stream);
155 
156   GST_LOG_OBJECT (self,
157       "Storing recovered RTP packet with ssrc=%08x pt=%u seq=%u %"
158       GST_PTR_FORMAT, ssrc, pt, seq, buffer);
159 
160   STREAM_LOCK (stream);
161   rtp_storage_stream_add_item (stream, buffer, pt, seq);
162   STREAM_UNLOCK (stream);
163 }
164 
165 void
rtp_storage_put_recovered_packet(RtpStorage * self,GstBuffer * buffer,guint8 pt,guint32 ssrc,guint16 seq)166 rtp_storage_put_recovered_packet (RtpStorage * self,
167     GstBuffer * buffer, guint8 pt, guint32 ssrc, guint16 seq)
168 {
169   rtp_storage_do_put_recovered_packet (self, buffer, pt, ssrc, seq);
170   g_signal_emit (self, rtp_storage_signals[SIGNAL_PACKET_RECOVERED], 0, buffer);
171 }
172 
173 gboolean
rtp_storage_append_buffer(RtpStorage * self,GstBuffer * buf)174 rtp_storage_append_buffer (RtpStorage * self, GstBuffer * buf)
175 {
176   GstRTPBuffer rtpbuf = GST_RTP_BUFFER_INIT;
177   RtpStorageStream *stream;
178   guint32 ssrc;
179   guint8 pt;
180   guint16 seq;
181 
182   if (0 == self->size_time)
183     return TRUE;
184 
185   /* We are about to save it in the queue, it so it is better take a ref before
186    * mapping the buffer */
187   gst_buffer_ref (buf);
188 
189   if (!gst_rtp_buffer_map (buf, GST_MAP_READ |
190           GST_RTP_BUFFER_MAP_FLAG_SKIP_PADDING, &rtpbuf)) {
191     gst_buffer_unref (buf);
192     return TRUE;
193   }
194 
195   ssrc = gst_rtp_buffer_get_ssrc (&rtpbuf);
196   pt = gst_rtp_buffer_get_payload_type (&rtpbuf);
197   seq = gst_rtp_buffer_get_seq (&rtpbuf);
198 
199   STORAGE_LOCK (self);
200 
201   stream = g_hash_table_lookup (self->streams, GUINT_TO_POINTER (ssrc));
202   if (NULL == stream) {
203     GST_DEBUG_OBJECT (self,
204         "New media stream (ssrc=0x%08x, pt=%u) detected", ssrc, pt);
205     stream = rtp_storage_stream_new (ssrc);
206     g_hash_table_insert (self->streams, GUINT_TO_POINTER (ssrc), stream);
207   }
208 
209   STORAGE_UNLOCK (self);
210 
211   GST_LOG_OBJECT (self,
212       "Storing RTP packet with ssrc=%08x pt=%u seq=%u %" GST_PTR_FORMAT,
213       ssrc, pt, seq, buf);
214 
215   STREAM_LOCK (stream);
216 
217   /* Saving the buffer, now the storage owns it */
218   rtp_storage_stream_resize_and_add_item (stream, self->size_time, buf, pt,
219       seq);
220 
221   STREAM_UNLOCK (stream);
222 
223   gst_rtp_buffer_unmap (&rtpbuf);
224 
225   if (GST_BUFFER_FLAG_IS_SET (buf, GST_RTP_BUFFER_FLAG_REDUNDANT)) {
226     gst_buffer_unref (buf);
227     return FALSE;
228   }
229 
230   return TRUE;
231 }
232 
233 void
rtp_storage_clear(RtpStorage * self)234 rtp_storage_clear (RtpStorage * self)
235 {
236   STORAGE_LOCK (self);
237   g_hash_table_remove_all (self->streams);
238   STORAGE_UNLOCK (self);
239 }
240 
241 void
rtp_storage_set_size(RtpStorage * self,GstClockTime size)242 rtp_storage_set_size (RtpStorage * self, GstClockTime size)
243 {
244   self->size_time = size;
245   if (0 == self->size_time)
246     rtp_storage_clear (self);
247 }
248 
249 GstClockTime
rtp_storage_get_size(RtpStorage * self)250 rtp_storage_get_size (RtpStorage * self)
251 {
252   return self->size_time;
253 }
254 
255 RtpStorage *
rtp_storage_new(void)256 rtp_storage_new (void)
257 {
258   return g_object_new (RTP_TYPE_STORAGE, NULL);
259 }
260