• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer plugin for forward error correction
2  * Copyright (C) 2017 Pexip
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Lesser General Public
6  * License as published by the Free Software Foundation; either
7  * version 2.1 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Lesser General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public
15  * License along with this library; if not, write to the Free Software
16  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
17  *
18  * Author: Mikhail Fludkov <misha@pexip.com>
19  */
20 
/**
 * SECTION:element-rtpreddec
 * @short_description: RTP Redundant Audio Data (RED) decoder
 * @title: rtpreddec
 *
 * Decode Redundant Audio Data (RED) as per RFC 2198.
 *
 * This element is mostly provided for Chrome WebRTC compatibility:
 * Chrome will wrap ulpfec-protected streams in RED packets, and such
 * streams need to be unwrapped by this element before being passed on
 * to #GstRtpUlpFecDec.
 *
 * The #GstRtpRedDec:pt property should be set to the expected payload
 * type of the RED packets.
 *
 * When using #GstRtpBin, this element should be inserted through the
 * #GstRtpBin::request-aux-receiver signal.
 *
 * ## Example pipeline
 *
 * |[
 * gst-launch-1.0 udpsrc port=8888 caps="application/x-rtp, payload=96, clock-rate=90000" ! rtpreddec pt=122 ! rtpstorage size-time=220000000 ! rtpssrcdemux ! application/x-rtp, payload=96, clock-rate=90000, media=video, encoding-name=H264 ! rtpjitterbuffer do-lost=1 latency=200 !  rtpulpfecdec pt=122 ! rtph264depay ! avdec_h264 ! videoconvert ! autovideosink
 * ]| This example will receive a stream with RED and ULP FEC and try to reconstruct the packets.
 *
 * See also: #GstRtpRedEnc, #GstWebRTCBin, #GstRtpBin
 * Since: 1.14
 */
48 
49 #include <gst/rtp/gstrtpbuffer.h>
50 
51 #include "gstrtpelements.h"
52 #include "rtpredcommon.h"
53 #include "gstrtpreddec.h"
54 #include "rtpulpfeccommon.h"
55 
56 #define RTP_HISTORY_MAX_SIZE (16)
57 
58 typedef struct
59 {
60   guint32 timestamp;
61   guint16 seq;
62 } RTPHistItem;
63 
/* Field accessors for an untyped pointer to an RTPHistItem (for example the
 * gpointer stored in a GList node).  Both the argument and the complete
 * expansion are parenthesised so that any pointer expression may be passed
 * and the result may be embedded in any larger expression. */
#define RTP_HIST_ITEM_TIMESTAMP(p) (((RTPHistItem *) (p))->timestamp)
#define RTP_HIST_ITEM_SEQ(p) (((RTPHistItem *) (p))->seq)
66 
67 static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
68     GST_PAD_SINK,
69     GST_PAD_ALWAYS,
70     GST_STATIC_CAPS ("application/x-rtp"));
71 
72 static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src",
73     GST_PAD_SRC,
74     GST_PAD_ALWAYS,
75     GST_STATIC_CAPS ("application/x-rtp"));
76 
77 #define UNDEF_PT                -1
78 #define MIN_PT                  UNDEF_PT
79 #define MAX_PT                  127
80 #define DEFAULT_PT              UNDEF_PT
81 
82 GST_DEBUG_CATEGORY_STATIC (gst_rtp_red_dec_debug);
83 #define GST_CAT_DEFAULT gst_rtp_red_dec_debug
84 
85 G_DEFINE_TYPE (GstRtpRedDec, gst_rtp_red_dec, GST_TYPE_ELEMENT);
86 GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (rtpreddec, "rtpreddec", GST_RANK_NONE,
87     GST_TYPE_RTP_RED_DEC, rtp_element_init (plugin));
88 
89 enum
90 {
91   PROP_0,
92   PROP_PT,
93   PROP_RECEIVED,
94   PROP_PAYLOADS,
95 };
96 
97 static RTPHistItem *
rtp_hist_item_alloc(void)98 rtp_hist_item_alloc (void)
99 {
100   return g_slice_new (RTPHistItem);
101 }
102 
103 static void
rtp_hist_item_free(gpointer item)104 rtp_hist_item_free (gpointer item)
105 {
106   g_slice_free (RTPHistItem, item);
107 }
108 
109 static gint
gst_rtp_red_history_find_less_or_equal(gconstpointer item,gconstpointer timestamp)110 gst_rtp_red_history_find_less_or_equal (gconstpointer item,
111     gconstpointer timestamp)
112 {
113   guint32 t = GPOINTER_TO_UINT (timestamp);
114   gint32 diff = t - RTP_HIST_ITEM_TIMESTAMP (item);
115   return diff < 0;
116 }
117 
118 static gint
gst_rtp_red_history_find_less(gconstpointer item,gconstpointer timestamp)119 gst_rtp_red_history_find_less (gconstpointer item, gconstpointer timestamp)
120 {
121   guint32 t = GPOINTER_TO_UINT (timestamp);
122   gint32 diff = t - RTP_HIST_ITEM_TIMESTAMP (item);
123   return diff <= 0;
124 }
125 
126 static void
gst_rtp_red_history_update(GstRtpRedDec * self,GQueue * rtp_history,GstRTPBuffer * rtp)127 gst_rtp_red_history_update (GstRtpRedDec * self, GQueue * rtp_history,
128     GstRTPBuffer * rtp)
129 {
130   RTPHistItem *item;
131   GList *link, *sibling;
132 
133   /* If we have not reached MAX number of elements in the history,
134    * allocate a new link and a new item,
135    * otherwise reuse the tail (the oldest data) without any reallocations
136    */
137   if (rtp_history->length < RTP_HISTORY_MAX_SIZE) {
138     item = rtp_hist_item_alloc ();
139     link = g_list_alloc ();
140     link->data = item;
141   } else {
142     link = g_queue_pop_tail_link (rtp_history);
143     item = link->data;
144   }
145 
146   item->timestamp = gst_rtp_buffer_get_timestamp (rtp);
147   item->seq = gst_rtp_buffer_get_seq (rtp);
148 
149   /* Looking for a place to insert new link.
150    * The queue has newest to oldest rtp timestamps, so in 99% cases
151    * it is inserted before the head of the queue */
152   sibling = g_list_find_custom (rtp_history->head,
153       GUINT_TO_POINTER (item->timestamp),
154       gst_rtp_red_history_find_less_or_equal);
155   g_queue_push_nth_link (rtp_history,
156       g_list_position (rtp_history->head, sibling), link);
157 }
158 
159 static gboolean
rtp_red_buffer_is_valid(GstRtpRedDec * self,GstRTPBuffer * red_rtp,gsize * dst_first_red_payload_offset)160 rtp_red_buffer_is_valid (GstRtpRedDec * self, GstRTPBuffer * red_rtp,
161     gsize * dst_first_red_payload_offset)
162 {
163   guint8 *payload = gst_rtp_buffer_get_payload (red_rtp);
164   gsize payload_len = gst_rtp_buffer_get_payload_len (red_rtp);
165   gsize red_hdrs_offset = 0;
166   guint red_hdrs_checked = 0;
167   guint redundant_payload_len = 0;
168 
169   while (TRUE) {
170     gpointer red_hdr = payload + red_hdrs_offset;
171     gsize red_hdr_len;
172     gboolean is_redundant;
173 
174     ++red_hdrs_checked;
175 
176     /* Can we address the first byte where F bit is located ? */
177     if (red_hdrs_offset + 1 > payload_len)
178       goto red_buffer_invalid;
179 
180     is_redundant = rtp_red_block_is_redundant (red_hdr);
181 
182     /* Is it the last block? */
183     if (is_redundant) {
184       red_hdr_len = rtp_red_block_header_get_length (TRUE);
185 
186       /* Can we address all the other bytes in RED block header? */
187       if (red_hdrs_offset + red_hdr_len > payload_len)
188         goto red_buffer_invalid;
189 
190       redundant_payload_len += rtp_red_block_get_payload_length (red_hdr);
191       red_hdrs_offset += red_hdr_len;
192     } else {
193       red_hdr_len = rtp_red_block_header_get_length (FALSE);
194       red_hdrs_offset += red_hdr_len;
195       break;
196     }
197   }
198 
199   /* Do we have enough data to create redundant packets & main packet. Keep in
200    * mind that redundant_payload_len contains the length of redundant packets only.
201    */
202   if (red_hdrs_offset + redundant_payload_len >= payload_len)
203     goto red_buffer_invalid;
204 
205   *dst_first_red_payload_offset = red_hdrs_offset;
206 
207   GST_LOG_OBJECT (self, "RED packet has %u blocks", red_hdrs_checked);
208   return TRUE;
209 
210 red_buffer_invalid:
211   GST_WARNING_OBJECT (self, "Received invalid RED packet "
212       "ssrc=0x%08x pt=%u tstamp=%u seq=%u size=%u, "
213       "checked %u blocks",
214       gst_rtp_buffer_get_ssrc (red_rtp),
215       gst_rtp_buffer_get_payload_type (red_rtp),
216       gst_rtp_buffer_get_timestamp (red_rtp),
217       gst_rtp_buffer_get_seq (red_rtp),
218       gst_rtp_buffer_get_packet_len (red_rtp), red_hdrs_checked);
219   return FALSE;
220 }
221 
222 static gboolean
gst_red_history_lost_seq_num_for_timestamp(GstRtpRedDec * self,GQueue * rtp_history,guint32 timestamp,guint16 * dst_seq_num)223 gst_red_history_lost_seq_num_for_timestamp (GstRtpRedDec * self,
224     GQueue * rtp_history, guint32 timestamp, guint16 * dst_seq_num)
225 {
226   GList *older_sibling = g_list_find_custom (rtp_history->head,
227       GUINT_TO_POINTER (timestamp),
228       gst_rtp_red_history_find_less);
229   RTPHistItem *older;
230   RTPHistItem *newer;
231   guint32 timestamp_diff;
232   gint seq_diff, lost_packet_idx;
233 
234   if (NULL == older_sibling) {
235     if (rtp_history->length == RTP_HISTORY_MAX_SIZE)
236       GST_WARNING_OBJECT (self, "History is too short. "
237           "Oldest rtp timestamp %u, looking for %u, size %u",
238           RTP_HIST_ITEM_TIMESTAMP (rtp_history->tail->data),
239           timestamp, rtp_history->length);
240     return FALSE;
241   }
242 
243   if (NULL == older_sibling->prev) {
244     GST_WARNING_OBJECT (self, "RED block timestamp offset probably wrong. "
245         "Latest rtp timestamp %u, looking for %u, size %u",
246         RTP_HIST_ITEM_TIMESTAMP (rtp_history->head->data),
247         timestamp, rtp_history->length);
248     return FALSE;
249   }
250 
251   older = older_sibling->data;
252   newer = older_sibling->prev->data;
253   /* We know for sure @older has lower timestamp than we are looking for,
254    * if @newer has the same timestamp, there is no packet loss and we
255    * don't need to use redundant data */
256   if (newer->timestamp == timestamp)
257     return FALSE;
258 
259   seq_diff = gst_rtp_buffer_compare_seqnum (older->seq, newer->seq);
260   if (seq_diff <= 1) {
261     if (seq_diff == 1)
262       GST_WARNING_OBJECT (self, "RED block timestamp offset is wrong: "
263           "#%u,%u #%u,%u looking for %u",
264           older->seq, older->timestamp,
265           newer->seq, newer->timestamp, timestamp);
266     else
267       GST_WARNING_OBJECT (self, "RTP timestamps increasing while "
268           "sequence numbers decreasing: #%u,%u #%u,%u",
269           older->seq, older->timestamp, newer->seq, newer->timestamp);
270     return FALSE;
271   }
272 
273   timestamp_diff = newer->timestamp - older->timestamp;
274   for (lost_packet_idx = 1; lost_packet_idx < seq_diff; ++lost_packet_idx) {
275     guint32 lost_timestamp = older->timestamp +
276         lost_packet_idx * timestamp_diff / seq_diff;
277     if (lost_timestamp == timestamp) {
278       *dst_seq_num = older->seq + lost_packet_idx;
279       return TRUE;
280     }
281   }
282 
283   GST_WARNING_OBJECT (self, "Can't find RED block timestamp "
284       "#%u,%u #%u,%u looking for %u",
285       older->seq, older->timestamp, newer->seq, newer->timestamp, timestamp);
286   return FALSE;
287 }
288 
289 static GstBuffer *
gst_rtp_red_create_packet(GstRtpRedDec * self,GstRTPBuffer * red_rtp,gboolean marker,guint8 pt,guint16 seq_num,guint32 timestamp,gsize red_payload_subbuffer_start,gsize red_payload_subbuffer_len)290 gst_rtp_red_create_packet (GstRtpRedDec * self, GstRTPBuffer * red_rtp,
291     gboolean marker, guint8 pt, guint16 seq_num, guint32 timestamp,
292     gsize red_payload_subbuffer_start, gsize red_payload_subbuffer_len)
293 {
294   guint csrc_count = gst_rtp_buffer_get_csrc_count (red_rtp);
295   GstBuffer *ret = gst_rtp_buffer_new_allocate (0, 0, csrc_count);
296   GstRTPBuffer ret_rtp = GST_RTP_BUFFER_INIT;
297   guint i;
298   if (!gst_rtp_buffer_map (ret, GST_MAP_WRITE, &ret_rtp))
299     g_assert_not_reached ();
300 
301   gst_rtp_buffer_set_marker (&ret_rtp, marker);
302   gst_rtp_buffer_set_payload_type (&ret_rtp, pt);
303   gst_rtp_buffer_set_seq (&ret_rtp, seq_num);
304   gst_rtp_buffer_set_timestamp (&ret_rtp, timestamp);
305   gst_rtp_buffer_set_ssrc (&ret_rtp, gst_rtp_buffer_get_ssrc (red_rtp));
306   for (i = 0; i < csrc_count; ++i)
307     gst_rtp_buffer_set_csrc (&ret_rtp, i, gst_rtp_buffer_get_csrc (red_rtp, i));
308   gst_rtp_buffer_unmap (&ret_rtp);
309 
310   ret = gst_buffer_append (ret,
311       gst_rtp_buffer_get_payload_subbuffer (red_rtp,
312           red_payload_subbuffer_start, red_payload_subbuffer_len));
313 
314   /* Timestamps, meta, flags from the RED packet should go to main block packet */
315   gst_buffer_copy_into (ret, red_rtp->buffer, GST_BUFFER_COPY_METADATA, 0, -1);
316   return ret;
317 }
318 
319 static GstBuffer *
gst_rtp_red_create_from_redundant_block(GstRtpRedDec * self,GQueue * rtp_history,GstRTPBuffer * red_rtp,gsize * red_hdr_offset,gsize * red_payload_offset)320 gst_rtp_red_create_from_redundant_block (GstRtpRedDec * self,
321     GQueue * rtp_history, GstRTPBuffer * red_rtp, gsize * red_hdr_offset,
322     gsize * red_payload_offset)
323 {
324   guint8 *payload = gst_rtp_buffer_get_payload (red_rtp);
325   guint8 *red_hdr = payload + *red_hdr_offset;
326   guint32 lost_timestamp = gst_rtp_buffer_get_timestamp (red_rtp) -
327       rtp_red_block_get_timestamp_offset (red_hdr);
328 
329   GstBuffer *ret = NULL;
330   guint16 lost_seq = 0;
331   if (gst_red_history_lost_seq_num_for_timestamp (self, rtp_history,
332           lost_timestamp, &lost_seq)) {
333     GST_LOG_OBJECT (self,
334         "Recovering from RED packet pt=%u ts=%u seq=%u" " len=%u present",
335         rtp_red_block_get_payload_type (red_hdr), lost_timestamp, lost_seq,
336         rtp_red_block_get_payload_length (red_hdr));
337     ret =
338         gst_rtp_red_create_packet (self, red_rtp, FALSE,
339         rtp_red_block_get_payload_type (red_hdr), lost_seq, lost_timestamp,
340         *red_payload_offset, rtp_red_block_get_payload_length (red_hdr));
341     GST_BUFFER_FLAG_SET (ret, GST_RTP_BUFFER_FLAG_REDUNDANT);
342   } else {
343     GST_LOG_OBJECT (self, "Ignore RED packet pt=%u ts=%u len=%u because already"
344         " present", rtp_red_block_get_payload_type (red_hdr), lost_timestamp,
345         rtp_red_block_get_payload_length (red_hdr));
346   }
347 
348   *red_hdr_offset += rtp_red_block_header_get_length (TRUE);
349   *red_payload_offset += rtp_red_block_get_payload_length (red_hdr);
350   return ret;
351 }
352 
353 static GstBuffer *
gst_rtp_red_create_from_main_block(GstRtpRedDec * self,GstRTPBuffer * red_rtp,gsize red_hdr_offset,gsize * red_payload_offset)354 gst_rtp_red_create_from_main_block (GstRtpRedDec * self,
355     GstRTPBuffer * red_rtp, gsize red_hdr_offset, gsize * red_payload_offset)
356 {
357   guint8 *payload = gst_rtp_buffer_get_payload (red_rtp);
358   GstBuffer *ret = gst_rtp_red_create_packet (self, red_rtp,
359       gst_rtp_buffer_get_marker (red_rtp),
360       rtp_red_block_get_payload_type (payload + red_hdr_offset),
361       gst_rtp_buffer_get_seq (red_rtp),
362       gst_rtp_buffer_get_timestamp (red_rtp),
363       *red_payload_offset, -1);
364   *red_payload_offset = gst_rtp_buffer_get_payload_len (red_rtp);
365   GST_LOG_OBJECT (self, "Extracting main payload from RED pt=%u seq=%u ts=%u"
366       " marker=%u", rtp_red_block_get_payload_type (payload + red_hdr_offset),
367       gst_rtp_buffer_get_seq (red_rtp), gst_rtp_buffer_get_timestamp (red_rtp),
368       gst_rtp_buffer_get_marker (red_rtp));
369 
370   return ret;
371 }
372 
373 static GstBuffer *
gst_rtp_red_create_from_block(GstRtpRedDec * self,GQueue * rtp_history,GstRTPBuffer * red_rtp,gsize * red_hdr_offset,gsize * red_payload_offset)374 gst_rtp_red_create_from_block (GstRtpRedDec * self, GQueue * rtp_history,
375     GstRTPBuffer * red_rtp, gsize * red_hdr_offset, gsize * red_payload_offset)
376 {
377   guint8 *payload = gst_rtp_buffer_get_payload (red_rtp);
378 
379   if (rtp_red_block_is_redundant (payload + (*red_hdr_offset)))
380     return gst_rtp_red_create_from_redundant_block (self, rtp_history, red_rtp,
381         red_hdr_offset, red_payload_offset);
382 
383   return gst_rtp_red_create_from_main_block (self, red_rtp, *red_hdr_offset,
384       red_payload_offset);
385 }
386 
387 static GstFlowReturn
gst_rtp_red_process(GstRtpRedDec * self,GQueue * rtp_history,GstRTPBuffer * red_rtp,gsize first_red_payload_offset)388 gst_rtp_red_process (GstRtpRedDec * self, GQueue * rtp_history,
389     GstRTPBuffer * red_rtp, gsize first_red_payload_offset)
390 {
391   gsize red_hdr_offset = 0;
392   gsize red_payload_offset = first_red_payload_offset;
393   gsize payload_len = gst_rtp_buffer_get_payload_len (red_rtp);
394   GstFlowReturn ret = GST_FLOW_OK;
395 
396   do {
397     GstBuffer *buf = gst_rtp_red_create_from_block (self, rtp_history, red_rtp,
398         &red_hdr_offset,
399         &red_payload_offset);
400     if (buf)
401       ret = gst_pad_push (self->srcpad, buf);
402   } while (GST_FLOW_OK == ret && red_payload_offset < payload_len);
403 
404   return ret;
405 }
406 
407 static gboolean
is_red_pt(GstRtpRedDec * self,guint8 pt)408 is_red_pt (GstRtpRedDec * self, guint8 pt)
409 {
410   gboolean ret;
411 
412   g_mutex_lock (&self->lock);
413   if (pt == self->pt) {
414     ret = TRUE;
415     goto done;
416   }
417 
418   ret = self->payloads
419       && g_hash_table_contains (self->payloads, GINT_TO_POINTER (pt));
420 
421 done:
422   g_mutex_unlock (&self->lock);
423   return ret;
424 }
425 
426 static GstFlowReturn
gst_rtp_red_dec_chain(GstPad * pad,GstObject * parent,GstBuffer * buffer)427 gst_rtp_red_dec_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
428 {
429   GstRtpRedDec *self = GST_RTP_RED_DEC (parent);
430   GstRTPBuffer irtp = GST_RTP_BUFFER_INIT;
431   GstFlowReturn ret = GST_FLOW_OK;
432   gsize first_red_payload_offset = 0;
433   GQueue *rtp_history;
434   guint32 ssrc;
435 
436   if (self->pt == UNDEF_PT && self->payloads == NULL)
437     return gst_pad_push (self->srcpad, buffer);
438 
439   if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &irtp))
440     return gst_pad_push (self->srcpad, buffer);
441 
442   ssrc = gst_rtp_buffer_get_ssrc (&irtp);
443 
444   if (!(rtp_history =
445           g_hash_table_lookup (self->rtp_histories, GUINT_TO_POINTER (ssrc)))) {
446     rtp_history = g_queue_new ();
447     g_hash_table_insert (self->rtp_histories, GUINT_TO_POINTER (ssrc),
448         rtp_history);
449   }
450 
451   gst_rtp_red_history_update (self, rtp_history, &irtp);
452 
453   if (!is_red_pt (self, gst_rtp_buffer_get_payload_type (&irtp))) {
454     GST_LOG_RTP_PACKET (self, "rtp header (incoming)", &irtp);
455 
456     gst_rtp_buffer_unmap (&irtp);
457     return gst_pad_push (self->srcpad, buffer);
458   }
459 
460   self->num_received++;
461 
462   if (rtp_red_buffer_is_valid (self, &irtp, &first_red_payload_offset)) {
463     GST_DEBUG_RTP_PACKET (self, "rtp header (red)", &irtp);
464     ret =
465         gst_rtp_red_process (self, rtp_history, &irtp,
466         first_red_payload_offset);
467   }
468 
469   gst_rtp_buffer_unmap (&irtp);
470   gst_buffer_unref (buffer);
471   return ret;
472 }
473 
474 static void
gst_rtp_red_dec_dispose(GObject * obj)475 gst_rtp_red_dec_dispose (GObject * obj)
476 {
477   GstRtpRedDec *self = GST_RTP_RED_DEC (obj);
478 
479   g_hash_table_unref (self->rtp_histories);
480 
481   if (self->payloads) {
482     g_hash_table_unref (self->payloads);
483   }
484 
485   g_mutex_clear (&self->lock);
486 
487   G_OBJECT_CLASS (gst_rtp_red_dec_parent_class)->dispose (obj);
488 }
489 
490 static void
free_rtp_history(GQueue * rtp_history)491 free_rtp_history (GQueue * rtp_history)
492 {
493   g_queue_free_full (rtp_history, rtp_hist_item_free);
494 }
495 
496 static void
gst_rtp_red_dec_init(GstRtpRedDec * self)497 gst_rtp_red_dec_init (GstRtpRedDec * self)
498 {
499   GstPadTemplate *pad_template;
500 
501   pad_template =
502       gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS (self), "src");
503   self->srcpad = gst_pad_new_from_template (pad_template, "src");
504   gst_element_add_pad (GST_ELEMENT_CAST (self), self->srcpad);
505 
506   pad_template =
507       gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS (self), "sink");
508   self->sinkpad = gst_pad_new_from_template (pad_template, "sink");
509   gst_pad_set_chain_function (self->sinkpad,
510       GST_DEBUG_FUNCPTR (gst_rtp_red_dec_chain));
511   GST_PAD_SET_PROXY_CAPS (self->sinkpad);
512   GST_PAD_SET_PROXY_ALLOCATION (self->sinkpad);
513   gst_element_add_pad (GST_ELEMENT (self), self->sinkpad);
514 
515   self->pt = DEFAULT_PT;
516   self->num_received = 0;
517   self->rtp_histories =
518       g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL,
519       (GDestroyNotify) free_rtp_history);
520   self->payloads = NULL;
521   g_mutex_init (&self->lock);
522 }
523 
524 static void
gst_rtp_red_dec_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)525 gst_rtp_red_dec_set_property (GObject * object, guint prop_id,
526     const GValue * value, GParamSpec * pspec)
527 {
528   GstRtpRedDec *self = GST_RTP_RED_DEC (object);
529 
530   switch (prop_id) {
531     case PROP_PT:
532       g_mutex_lock (&self->lock);
533       self->pt = g_value_get_int (value);
534       g_mutex_unlock (&self->lock);
535       break;
536     case PROP_PAYLOADS:
537     {
538       guint i, n_vals;
539 
540       g_mutex_lock (&self->lock);
541       if (self->payloads) {
542         g_hash_table_unref (self->payloads);
543         self->payloads = NULL;
544       }
545 
546       n_vals = gst_value_array_get_size (value);
547 
548       if (n_vals > 0) {
549         self->payloads = g_hash_table_new (g_direct_hash, g_direct_equal);
550 
551         for (i = 0; i < gst_value_array_get_size (value); i++) {
552           const GValue *val = gst_value_array_get_value (value, i);
553 
554           g_hash_table_insert (self->payloads,
555               GINT_TO_POINTER (g_value_get_int (val)), NULL);
556         }
557       }
558       g_mutex_unlock (&self->lock);
559       break;
560     }
561     default:
562       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
563       break;
564   }
565 }
566 
567 static void
append_payload(gpointer key,gpointer value,GValue * array)568 append_payload (gpointer key, gpointer value, GValue * array)
569 {
570   GValue v = { 0, };
571   g_value_init (&v, G_TYPE_INT);
572   g_value_set_int (&v, GPOINTER_TO_INT (key));
573   gst_value_array_append_value (array, &v);
574   g_value_unset (&v);
575 }
576 
577 static void
gst_rtp_red_dec_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)578 gst_rtp_red_dec_get_property (GObject * object, guint prop_id,
579     GValue * value, GParamSpec * pspec)
580 {
581   GstRtpRedDec *self = GST_RTP_RED_DEC (object);
582   switch (prop_id) {
583     case PROP_PT:
584       g_mutex_lock (&self->lock);
585       g_value_set_int (value, self->pt);
586       g_mutex_unlock (&self->lock);
587       break;
588     case PROP_RECEIVED:
589       g_value_set_uint (value, self->num_received);
590       break;
591     case PROP_PAYLOADS:
592     {
593       g_mutex_lock (&self->lock);
594       if (self->payloads) {
595         g_hash_table_foreach (self->payloads, (GHFunc) append_payload, value);
596       }
597       g_mutex_unlock (&self->lock);
598       break;
599     }
600     default:
601       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
602       break;
603   }
604 }
605 
606 static void
gst_rtp_red_dec_class_init(GstRtpRedDecClass * klass)607 gst_rtp_red_dec_class_init (GstRtpRedDecClass * klass)
608 {
609   GObjectClass *gobject_class;
610   GstElementClass *element_class;
611 
612   gobject_class = G_OBJECT_CLASS (klass);
613   element_class = GST_ELEMENT_CLASS (klass);
614 
615   gst_element_class_add_pad_template (element_class,
616       gst_static_pad_template_get (&src_template));
617   gst_element_class_add_pad_template (element_class,
618       gst_static_pad_template_get (&sink_template));
619 
620   gst_element_class_set_metadata (element_class,
621       "Redundant Audio Data (RED) Decoder",
622       "Codec/Depayloader/Network/RTP",
623       "Decode Redundant Audio Data (RED)",
624       "Hani Mustafa <hani@pexip.com>, Mikhail Fludkov <misha@pexip.com>");
625 
626   gobject_class->set_property =
627       GST_DEBUG_FUNCPTR (gst_rtp_red_dec_set_property);
628   gobject_class->get_property =
629       GST_DEBUG_FUNCPTR (gst_rtp_red_dec_get_property);
630   gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_rtp_red_dec_dispose);
631 
632   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_PT,
633       g_param_spec_int ("pt", "payload type",
634           "Payload type FEC packets",
635           MIN_PT, MAX_PT, DEFAULT_PT,
636           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
637 
638   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_RECEIVED,
639       g_param_spec_uint ("received", "Received",
640           "Count of received packets",
641           0, G_MAXUINT32, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
642 
643   /**
644    * rtpreddec:payloads:
645    *
646    * All the RED payloads this decoder may encounter
647    *
648    * Since: 1.20
649    */
650   g_object_class_install_property (G_OBJECT_CLASS (klass),
651       PROP_PAYLOADS,
652       gst_param_spec_array ("payloads",
653           "RED payloads",
654           "All the RED payloads this decoder may encounter",
655           g_param_spec_int ("pt",
656               "payload type",
657               "A RED payload type",
658               MIN_PT, MAX_PT,
659               DEFAULT_PT,
660               G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS),
661           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)
662       );
663 
664   GST_DEBUG_CATEGORY_INIT (gst_rtp_red_dec_debug, "rtpreddec", 0,
665       "RTP RED Decoder");
666 }
667