• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer
2  * Copyright (C) <2020> Mathieu Duponchelle <mathieu@centricular.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19 
20 /**
21  * SECTION:element-rtpst2022-1-fecdec
22  * @see_also: #element-rtpst2022-1-fecenc
23  *
24  * This element takes as input a media stream and up to two FEC
25  * streams as described in SMPTE 2022-1: Forward Error Correction
26  * for Real-Time Video/Audio Transport Over IP Networks, and makes
27  * use of the FEC packets to recover media packets that may have
28  * gotten lost.
29  *
30  * ## Design
31  *
32  * The approach picked for this element is to proactively reconstruct missing
33  * packets as soon as possible. When a FEC packet arrives, the element
34  * immediately checks whether a media packet in the row / column it protects
35  * can be reconstructed.
36  *
37  * Similarly, when a media packet comes in, the element checks whether it has
38  * already stored a FEC packet covering the row and / or the column the media
39  * packet belongs to, and if so goes through the first step listed above.
40  *
41  * This process is repeated recursively, allowing for recoveries over one
42  * dimension to unblock recoveries over the other.
43  *
44  * In perfect networking conditions, this incurs next to no overhead as FEC
45  * packets will arrive after the media packets, causing no reconstruction to
46  * take place, just a few checks upon chaining.
47  *
48  * ## sender / receiver example
49  *
50  * ``` shell
51  * gst-launch-1.0 \
52  *   rtpbin name=rtp fec-encoders='fec,0="rtpst2022-1-fecenc\ rows\=5\ columns\=5";' \
53  *   uridecodebin uri=file:///path/to/video/file ! x264enc key-int-max=60 tune=zerolatency ! \
54  *     queue ! mpegtsmux ! rtpmp2tpay ssrc=0 ! rtp.send_rtp_sink_0 \
55  *   rtp.send_rtp_src_0 ! udpsink host=127.0.0.1 port=5000 \
56  *   rtp.send_fec_src_0_0 ! udpsink host=127.0.0.1 port=5002 async=false \
57  *   rtp.send_fec_src_0_1 ! udpsink host=127.0.0.1 port=5004 async=false
58  * ```
59  *
60  * ``` shell
61  * gst-launch-1.0 \
62  *   rtpbin latency=500 fec-decoders='fec,0="rtpst2022-1-fecdec\ size-time\=1000000000";' name=rtp \
63  *   udpsrc address=127.0.0.1 port=5002 caps="application/x-rtp, payload=96" ! queue ! rtp.recv_fec_sink_0_0 \
64  *   udpsrc address=127.0.0.1 port=5004 caps="application/x-rtp, payload=96" ! queue ! rtp.recv_fec_sink_0_1 \
65  *   udpsrc address=127.0.0.1 port=5000 caps="application/x-rtp, media=video, clock-rate=90000, encoding-name=mp2t, payload=33" ! \
66  *     queue ! netsim drop-probability=0.05 ! rtp.recv_rtp_sink_0 \
67  *   rtp. ! decodebin ! videoconvert ! queue ! autovideosink
68  * ```
69  *
70  * With the above command line, as the media packet size is constant,
71  * the fec overhead can be approximated to the number of fec packets
72  * per 2-d matrix of media packets, here 10 fec packets for each 25
73  * media packets.
74  *
75  * Increasing the number of rows and columns will decrease the overhead,
76  * but obviously increase the likelihood of recovery failure for lost
77  * packets on the receiver side.
78  *
79  * Since: 1.20
80  */
81 
82 #ifdef HAVE_CONFIG_H
83 #include "config.h"
84 #endif
85 
86 #include <gst/base/base.h>
87 #include <gst/rtp/gstrtpbuffer.h>
88 
89 #include "gstrtpst2022-1-fecdec.h"
90 
91 GST_DEBUG_CATEGORY_STATIC (gst_rtpst_2022_1_fecdec_debug);
92 #define GST_CAT_DEFAULT gst_rtpst_2022_1_fecdec_debug
93 
94 #define DEFAULT_SIZE_TIME (GST_SECOND)
95 
/* One entry of the element's packet stores (dec->packets for media,
 * dec->fec_packets[] for FEC). free_item() releases the buffer reference
 * together with the entry. */
96 typedef struct
97 {
/* Sort key: RTP seqnum for media items, SNBase (fec.seq) for FEC items */
98   guint16 seq;
/* Reference owned by the item, dropped in free_item() */
99   GstBuffer *buffer;
100 } Item;
101 
/* Forward declaration: xor_items() stores the packets it recovers through
 * store_media_item(), which is defined further down because it can in turn
 * reach xor_items() again via check_fec_item() / check_fec(). */
102 static GstFlowReturn store_media_item (GstRTPST_2022_1_FecDec * dec,
103     GstRTPBuffer * rtp, Item * item);
104 
105 static void
free_item(Item * item)106 free_item (Item * item)
107 {
108   gst_buffer_unref (item->buffer);
109   item->buffer = NULL;
110   g_free (item);
111 }
112 
113 static gint
cmp_items(Item * a,Item * b,gpointer unused)114 cmp_items (Item * a, Item * b, gpointer unused)
115 {
116   return gst_rtp_buffer_compare_seqnum (b->seq, a->seq);
117 }
118 
/* GObject property identifiers; PROP_SIZE_TIME backs the "size-time"
 * property installed in class_init. */
119 enum
120 {
121   PROP_0,
122   PROP_SIZE_TIME,
123 };
124 
/* Class structure: adds nothing on top of GstElementClass. */
125 struct _GstRTPST_2022_1_FecDecClass
126 {
127   GstElementClass class;
128 };
129 
/* Instance structure of the decoder. */
130 struct _GstRTPST_2022_1_FecDec
131 {
132   GstElement element;
133 
/* Always pads created in the instance init function */
134   GstPad *srcpad;
135   GstPad *sinkpad;
/* Bookkeeping list for the requested "fec_%u" sink pads */
136   GList *fec_sinkpads;
137 
138   /* All the following fields are protected by the OBJECT_LOCK */
/* Stored media items, kept sorted with cmp_items() */
139   GSequence *packets;
/* Maps each media seqnum protected by a stored column FEC packet to that
 * packet's Item; the items themselves live in fec_packets[0] */
140   GHashTable *column_fec_packets;
/* Stored FEC items indexed by the header's D bit: [0] column, [1] row */
141   GSequence *fec_packets[2];
/* l and d stay at G_MAXUINT until learned from an incoming FEC packet */
142   /* N columns */
143   guint l;
144   /* N rows */
145   guint d;
146 
/* Value of the "size-time" property: how long packets are retained */
147   GstClockTime size_time;
/* Highest DTS-or-PTS seen on the media sink pad */
148   GstClockTime max_arrival_time;
/* DTS-or-PTS of the last accepted FEC buffer, indexed by the D bit */
149   GstClockTime max_fec_arrival_time[2];
150 };
151 
152 #define RTP_CAPS "application/x-rtp"
153 
/* Parsed form of the 16-byte FEC header found at the start of a FEC packet's
 * RTP payload, plus the RTP header flags of the FEC packet itself. Filled in
 * by parse_header(); the bit widths below are the ones read there. */
154 typedef struct
155 {
/* 16 bits: base sequence number of the protected media packets */
156   guint16 seq;
/* 16 bits: XORed with the media payload lengths to get the recovered length */
157   guint16 len;
/* 1 bit */
158   guint8 E;
/* 7 bits: XORed with the media payload types during recovery */
159   guint8 pt;
/* 24 bits */
160   guint32 mask;
/* 32 bits: XORed with the media RTP timestamps during recovery */
161   guint32 timestamp;
/* 1 bit */
162   guint8 N;
/* 1 bit: non-zero for row FEC, zero for column FEC */
163   guint8 D;
/* 3 bits */
164   guint8 type;
/* 3 bits */
165   guint8 index;
/* 8 bits: seqnum step between protected packets (1 for row FEC) */
166   guint8 offset;
/* 8 bits: number of media packets protected by this FEC packet */
167   guint8 NA;
/* 8 bits */
168   guint8 seq_ext;
/* Points 16 bytes into the mapped RTP payload; only valid while mapped */
169   guint8 *payload;
/* RTP payload length minus the 16 header bytes */
170   guint payload_len;
/* Marker / padding / extension flags from the FEC packet's RTP header */
171   gboolean marker;
172   gboolean padding;
173   gboolean extension;
174 } Rtp2DFecHeader;
175 
/* Request sink pads ("fec_%u") on which the FEC streams are received. */
176 static GstStaticPadTemplate fec_sink_template =
177 GST_STATIC_PAD_TEMPLATE ("fec_%u",
178     GST_PAD_SINK,
179     GST_PAD_REQUEST,
180     GST_STATIC_CAPS (RTP_CAPS));
181 
/* Always sink pad on which the media stream is received. */
182 static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
183     GST_PAD_SINK,
184     GST_PAD_ALWAYS,
185     GST_STATIC_CAPS (RTP_CAPS));
186 
/* Always source pad on which received and recovered media packets are
 * pushed. */
187 static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src",
188     GST_PAD_SRC,
189     GST_PAD_ALWAYS,
190     GST_STATIC_CAPS (RTP_CAPS));
191 
/* GObject type boilerplate (GstElement subclass, parent class reachable as
 * `parent_class`) and registration of the "rtpst2022-1-fecdec" element
 * with rank NONE. */
192 #define gst_rtpst_2022_1_fecdec_parent_class parent_class
193 G_DEFINE_TYPE (GstRTPST_2022_1_FecDec, gst_rtpst_2022_1_fecdec,
194     GST_TYPE_ELEMENT);
195 GST_ELEMENT_REGISTER_DEFINE (rtpst2022_1_fecdec, "rtpst2022-1-fecdec",
196     GST_RANK_NONE, GST_TYPE_RTPST_2022_1_FECDEC);
197 
198 static void
trim_items(GstRTPST_2022_1_FecDec * dec)199 trim_items (GstRTPST_2022_1_FecDec * dec)
200 {
201   GSequenceIter *tmp_iter, *iter = NULL;
202 
203   for (tmp_iter = g_sequence_get_begin_iter (dec->packets);
204       tmp_iter; tmp_iter = g_sequence_iter_next (tmp_iter)) {
205     Item *item;
206 
207     if (g_sequence_iter_is_end (tmp_iter))
208       break;
209 
210     item = g_sequence_get (tmp_iter);
211 
212     if (dec->max_arrival_time - GST_BUFFER_DTS_OR_PTS (item->buffer) <
213         dec->size_time)
214       break;
215 
216     iter = tmp_iter;
217   }
218 
219   if (iter) {
220     Item *item = g_sequence_get (iter);
221     GST_TRACE_OBJECT (dec,
222         "Trimming packets up to %" GST_TIME_FORMAT " (seq: %u)",
223         GST_TIME_ARGS (GST_BUFFER_DTS_OR_PTS (item->buffer)), item->seq);
224     g_sequence_remove_range (g_sequence_get_begin_iter (dec->packets),
225         g_sequence_iter_next (iter));
226   }
227 }
228 
229 static void
trim_fec_items(GstRTPST_2022_1_FecDec * dec,guint D)230 trim_fec_items (GstRTPST_2022_1_FecDec * dec, guint D)
231 {
232   GSequenceIter *tmp_iter, *iter = NULL;
233 
234   for (tmp_iter = g_sequence_get_begin_iter (dec->fec_packets[D]);
235       tmp_iter; tmp_iter = g_sequence_iter_next (tmp_iter)) {
236     Item *item;
237 
238     if (g_sequence_iter_is_end (tmp_iter))
239       break;
240 
241     item = g_sequence_get (tmp_iter);
242 
243     if (dec->max_fec_arrival_time[D] - GST_BUFFER_DTS_OR_PTS (item->buffer) <
244         dec->size_time)
245       break;
246 
247     if (!D) {
248       guint i;
249       guint16 seq;
250 
251       for (i = 0; i < dec->d; i++) {
252         seq = item->seq + i * dec->l;
253         g_hash_table_remove (dec->column_fec_packets, GUINT_TO_POINTER (seq));
254       }
255     }
256 
257     iter = tmp_iter;
258   }
259 
260   if (iter) {
261     Item *item = g_sequence_get (iter);
262     GST_TRACE_OBJECT (dec,
263         "Trimming %s FEC packets up to %" GST_TIME_FORMAT " (seq: %u)",
264         D ? "row" : "column",
265         GST_TIME_ARGS (GST_BUFFER_DTS_OR_PTS (item->buffer)), item->seq);
266     g_sequence_remove_range (g_sequence_get_begin_iter (dec->fec_packets[D]),
267         g_sequence_iter_next (iter));
268   }
269 }
270 
271 static Item *
lookup_media_packet(GstRTPST_2022_1_FecDec * dec,guint16 seqnum)272 lookup_media_packet (GstRTPST_2022_1_FecDec * dec, guint16 seqnum)
273 {
274   GSequenceIter *iter;
275   Item *ret = NULL;
276   Item dummy = { seqnum, NULL };
277 
278   iter =
279       g_sequence_lookup (dec->packets, &dummy, (GCompareDataFunc) cmp_items,
280       NULL);
281 
282   if (iter)
283     ret = g_sequence_get (iter);
284 
285   return ret;
286 }
287 
288 static gboolean
parse_header(GstRTPBuffer * rtp,Rtp2DFecHeader * fec)289 parse_header (GstRTPBuffer * rtp, Rtp2DFecHeader * fec)
290 {
291   gboolean ret = FALSE;
292   GstBitReader bits;
293   guint8 *data = gst_rtp_buffer_get_payload (rtp);
294   guint len = gst_rtp_buffer_get_payload_len (rtp);
295 
296   if (len < 16)
297     goto done;
298 
299   gst_bit_reader_init (&bits, data, len);
300 
301   fec->marker = gst_rtp_buffer_get_marker (rtp);
302   fec->padding = gst_rtp_buffer_get_padding (rtp);
303   fec->extension = gst_rtp_buffer_get_extension (rtp);
304   fec->seq = gst_bit_reader_get_bits_uint16_unchecked (&bits, 16);
305   fec->len = gst_bit_reader_get_bits_uint16_unchecked (&bits, 16);
306   fec->E = gst_bit_reader_get_bits_uint8_unchecked (&bits, 1);
307   fec->pt = gst_bit_reader_get_bits_uint8_unchecked (&bits, 7);
308   fec->mask = gst_bit_reader_get_bits_uint32_unchecked (&bits, 24);
309   fec->timestamp = gst_bit_reader_get_bits_uint32_unchecked (&bits, 32);
310   fec->N = gst_bit_reader_get_bits_uint8_unchecked (&bits, 1);
311   fec->D = gst_bit_reader_get_bits_uint8_unchecked (&bits, 1);
312   fec->type = gst_bit_reader_get_bits_uint8_unchecked (&bits, 3);
313   fec->index = gst_bit_reader_get_bits_uint8_unchecked (&bits, 3);
314   fec->offset = gst_bit_reader_get_bits_uint8_unchecked (&bits, 8);
315   fec->NA = gst_bit_reader_get_bits_uint8_unchecked (&bits, 8);
316   fec->seq_ext = gst_bit_reader_get_bits_uint8_unchecked (&bits, 8);
317   fec->payload = data + 16;
318   fec->payload_len = len - 16;
319 
320   ret = TRUE;
321 
322 done:
323   return ret;
324 }
325 
326 static Item *
get_row_fec(GstRTPST_2022_1_FecDec * dec,guint16 seqnum)327 get_row_fec (GstRTPST_2022_1_FecDec * dec, guint16 seqnum)
328 {
329   GSequenceIter *iter;
330   Item *ret = NULL;
331   Item dummy = { 0, };
332 
333   if (dec->l == G_MAXUINT)
334     goto done;
335 
336   /* Potential underflow is intended */
337   dummy.seq = seqnum - dec->l;
338 
339   iter =
340       g_sequence_search (dec->fec_packets[1], &dummy,
341       (GCompareDataFunc) cmp_items, NULL);
342 
343   if (!g_sequence_iter_is_end (iter)) {
344     gint seqdiff;
345     ret = g_sequence_get (iter);
346 
347     seqdiff = gst_rtp_buffer_compare_seqnum (ret->seq, seqnum);
348 
349     /* Now check whether the fec packet does apply */
350     if (seqdiff < 0 || seqdiff >= dec->l)
351       ret = NULL;
352   }
353 
354 done:
355   return ret;
356 }
357 
358 static Item *
get_column_fec(GstRTPST_2022_1_FecDec * dec,guint16 seqnum)359 get_column_fec (GstRTPST_2022_1_FecDec * dec, guint16 seqnum)
360 {
361   Item *ret = NULL;
362 
363   if (dec->l == G_MAXUINT || dec->d == G_MAXUINT)
364     goto done;
365 
366   ret =
367       g_hash_table_lookup (dec->column_fec_packets, GUINT_TO_POINTER (seqnum));
368 
369 done:
370   return ret;
371 }
372 
373 static void
_xor_mem(guint8 * restrict dst,const guint8 * restrict src,gsize length)374 _xor_mem (guint8 * restrict dst, const guint8 * restrict src, gsize length)
375 {
376   guint i;
377 
378   for (i = 0; i < (length / sizeof (guint64)); ++i) {
379 #if G_BYTE_ORDER == G_LITTLE_ENDIAN
380     GST_WRITE_UINT64_LE (dst,
381         GST_READ_UINT64_LE (dst) ^ GST_READ_UINT64_LE (src));
382 #else
383     GST_WRITE_UINT64_BE (dst,
384         GST_READ_UINT64_BE (dst) ^ GST_READ_UINT64_BE (src));
385 #endif
386     dst += sizeof (guint64);
387     src += sizeof (guint64);
388   }
389   for (i = 0; i < (length % sizeof (guint64)); ++i)
390     dst[i] ^= src[i];
391 }
392 
393 static GstFlowReturn
xor_items(GstRTPST_2022_1_FecDec * dec,Rtp2DFecHeader * fec,GList * packets,guint16 seqnum)394 xor_items (GstRTPST_2022_1_FecDec * dec, Rtp2DFecHeader * fec, GList * packets,
395     guint16 seqnum)
396 {
397   guint8 *xored;
398   guint32 xored_timestamp;
399   guint8 xored_pt;
400   guint16 xored_payload_len;
401   Item *item;
402   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
403   GList *tmp;
404   GstFlowReturn ret = GST_FLOW_OK;
405   GstBuffer *buffer;
406   gboolean xored_marker;
407   gboolean xored_padding;
408   gboolean xored_extension;
409 
410   /* Figure out the recovered packet length first */
411   xored_payload_len = fec->len;
412   for (tmp = packets; tmp; tmp = tmp->next) {
413     GstRTPBuffer media_rtp = GST_RTP_BUFFER_INIT;
414     Item *item = (Item *) tmp->data;
415 
416     gst_rtp_buffer_map (item->buffer, GST_MAP_READ, &media_rtp);
417     xored_payload_len ^= gst_rtp_buffer_get_payload_len (&media_rtp);
418     gst_rtp_buffer_unmap (&media_rtp);
419   }
420 
421   if (xored_payload_len > fec->payload_len) {
422     GST_WARNING_OBJECT (dec, "FEC payload len %u < length recovery %u",
423         fec->payload_len, xored_payload_len);
424     goto done;
425   }
426 
427   item = g_malloc0 (sizeof (Item));
428   item->seq = seqnum;
429   item->buffer = gst_rtp_buffer_new_allocate (xored_payload_len, 0, 0);
430   gst_rtp_buffer_map (item->buffer, GST_MAP_WRITE, &rtp);
431 
432   xored = gst_rtp_buffer_get_payload (&rtp);
433   memcpy (xored, fec->payload, xored_payload_len);
434   xored_timestamp = fec->timestamp;
435   xored_pt = fec->pt;
436   xored_marker = fec->marker;
437   xored_padding = fec->padding;
438   xored_extension = fec->extension;
439 
440   for (tmp = packets; tmp; tmp = tmp->next) {
441     GstRTPBuffer media_rtp = GST_RTP_BUFFER_INIT;
442     Item *item = (Item *) tmp->data;
443 
444     gst_rtp_buffer_map (item->buffer, GST_MAP_READ, &media_rtp);
445     _xor_mem (xored, gst_rtp_buffer_get_payload (&media_rtp),
446         MIN (gst_rtp_buffer_get_payload_len (&media_rtp), xored_payload_len));
447     xored_timestamp ^= gst_rtp_buffer_get_timestamp (&media_rtp);
448     xored_pt ^= gst_rtp_buffer_get_payload_type (&media_rtp);
449     xored_marker ^= gst_rtp_buffer_get_marker (&media_rtp);
450     xored_padding ^= gst_rtp_buffer_get_padding (&media_rtp);
451     xored_extension ^= gst_rtp_buffer_get_extension (&media_rtp);
452 
453     gst_rtp_buffer_unmap (&media_rtp);
454   }
455 
456   GST_DEBUG_OBJECT (dec,
457       "Recovered buffer through %s FEC with seqnum %u, payload len %u and timestamp %u",
458       fec->D ? "row" : "column", seqnum, xored_payload_len, xored_timestamp);
459 
460   GST_BUFFER_DTS (item->buffer) = dec->max_arrival_time;
461 
462   gst_rtp_buffer_set_timestamp (&rtp, xored_timestamp);
463   gst_rtp_buffer_set_seq (&rtp, seqnum);
464   gst_rtp_buffer_set_payload_type (&rtp, xored_pt);
465   gst_rtp_buffer_set_marker (&rtp, xored_marker);
466   gst_rtp_buffer_set_padding (&rtp, xored_padding);
467   gst_rtp_buffer_set_extension (&rtp, xored_extension);
468 
469   gst_rtp_buffer_unmap (&rtp);
470 
471   /* Store a ref on item->buffer as store_media_item may
472    * recurse and call this method again, potentially releasing
473    * the object lock and leaving our item unprotected in
474    * dec->packets
475    */
476   buffer = gst_buffer_ref (item->buffer);
477 
478   /* It is right that we should celebrate,
479    * for your brother was dead, and is alive again */
480   gst_rtp_buffer_map (item->buffer, GST_MAP_READ, &rtp);
481   ret = store_media_item (dec, &rtp, item);
482   gst_rtp_buffer_unmap (&rtp);
483 
484   if (ret == GST_FLOW_OK) {
485     /* Unlocking here is safe */
486     GST_OBJECT_UNLOCK (dec);
487     ret = gst_pad_push (dec->srcpad, buffer);
488     GST_OBJECT_LOCK (dec);
489   } else {
490     gst_buffer_unref (buffer);
491   }
492 
493 done:
494   return ret;
495 }
496 
497 /* Returns a flow value if we should discard the packet, GST_FLOW_CUSTOM_SUCCESS otherwise */
498 static GstFlowReturn
check_fec(GstRTPST_2022_1_FecDec * dec,Rtp2DFecHeader * fec)499 check_fec (GstRTPST_2022_1_FecDec * dec, Rtp2DFecHeader * fec)
500 {
501   GList *packets = NULL;
502   gint missing_seq = -1;
503   guint n_packets = 0;
504   guint required_n_packets;
505   GstFlowReturn ret = GST_FLOW_OK;
506 
507   if (fec->D) {
508     guint i = 0;
509 
510     required_n_packets = dec->l;
511 
512     for (i = 0; i < dec->l; i++) {
513       Item *item = lookup_media_packet (dec, fec->seq + i);
514 
515       if (item) {
516         packets = g_list_prepend (packets, item);
517         n_packets += 1;
518       } else {
519         missing_seq = fec->seq + i;
520       }
521     }
522   } else {
523     guint i = 0;
524 
525     required_n_packets = dec->d;
526 
527     for (i = 0; i < dec->d; i++) {
528       Item *item = lookup_media_packet (dec, fec->seq + i * dec->l);
529 
530       if (item) {
531         packets = g_list_prepend (packets, item);
532         n_packets += 1;
533       } else {
534         missing_seq = fec->seq + i * dec->l;
535       }
536     }
537   }
538 
539   if (n_packets == required_n_packets) {
540     g_assert (missing_seq == -1);
541     GST_LOG_OBJECT (dec,
542         "All media packets present, we can discard that FEC packet");
543   } else if (n_packets + 1 == required_n_packets) {
544     g_assert (missing_seq != -1);
545     ret = xor_items (dec, fec, packets, missing_seq);
546     GST_LOG_OBJECT (dec, "We have enough info to reconstruct %u", missing_seq);
547   } else {
548     ret = GST_FLOW_CUSTOM_SUCCESS;
549     GST_LOG_OBJECT (dec, "Too many media packets missing, storing FEC packet");
550   }
551   g_list_free (packets);
552 
553   return ret;
554 }
555 
556 static GstFlowReturn
check_fec_item(GstRTPST_2022_1_FecDec * dec,Item * item)557 check_fec_item (GstRTPST_2022_1_FecDec * dec, Item * item)
558 {
559   Rtp2DFecHeader fec;
560   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
561   GstFlowReturn ret;
562 
563   gst_rtp_buffer_map (item->buffer, GST_MAP_READ, &rtp);
564 
565   parse_header (&rtp, &fec);
566 
567   ret = check_fec (dec, &fec);
568 
569   gst_rtp_buffer_unmap (&rtp);
570 
571   return ret;
572 }
573 
574 static GstFlowReturn
store_media_item(GstRTPST_2022_1_FecDec * dec,GstRTPBuffer * rtp,Item * item)575 store_media_item (GstRTPST_2022_1_FecDec * dec, GstRTPBuffer * rtp, Item * item)
576 {
577   GstFlowReturn ret = GST_FLOW_OK;
578   Item *fec_item;
579   guint16 seq;
580 
581   seq = gst_rtp_buffer_get_seq (rtp);
582 
583   g_sequence_insert_sorted (dec->packets, item, (GCompareDataFunc) cmp_items,
584       NULL);
585 
586   if ((fec_item = get_row_fec (dec, seq))) {
587     ret = check_fec_item (dec, fec_item);
588     if (ret == GST_FLOW_CUSTOM_SUCCESS)
589       ret = GST_FLOW_OK;
590   }
591 
592   if (ret == GST_FLOW_OK && (fec_item = get_column_fec (dec, seq))) {
593     ret = check_fec_item (dec, fec_item);
594     if (ret == GST_FLOW_CUSTOM_SUCCESS)
595       ret = GST_FLOW_OK;
596   }
597 
598   return ret;
599 }
600 
601 static GstFlowReturn
store_media(GstRTPST_2022_1_FecDec * dec,GstRTPBuffer * rtp,GstBuffer * buffer)602 store_media (GstRTPST_2022_1_FecDec * dec, GstRTPBuffer * rtp,
603     GstBuffer * buffer)
604 {
605   Item *item;
606   guint16 seq;
607 
608   seq = gst_rtp_buffer_get_seq (rtp);
609   item = g_malloc0 (sizeof (Item));
610   item->buffer = gst_buffer_ref (buffer);
611   item->seq = seq;
612 
613   return store_media_item (dec, rtp, item);
614 }
615 
616 static GstFlowReturn
gst_rtpst_2022_1_fecdec_sink_chain_fec(GstPad * pad,GstObject * parent,GstBuffer * buffer)617 gst_rtpst_2022_1_fecdec_sink_chain_fec (GstPad * pad, GstObject * parent,
618     GstBuffer * buffer)
619 {
620   GstRTPST_2022_1_FecDec *dec = GST_RTPST_2022_1_FECDEC_CAST (parent);
621   Rtp2DFecHeader fec = { 0, };
622   guint payload_len;
623   guint8 *payload;
624   GstFlowReturn ret = GST_FLOW_OK;
625   Item *item;
626   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
627 
628   GST_OBJECT_LOCK (dec);
629 
630   if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp)) {
631     GST_WARNING_OBJECT (pad, "Chained FEC buffer isn't valid RTP");
632     goto discard;
633   }
634 
635   payload_len = gst_rtp_buffer_get_payload_len (&rtp);
636   payload = gst_rtp_buffer_get_payload (&rtp);
637 
638   if (!parse_header (&rtp, &fec)) {
639     GST_WARNING_OBJECT (pad, "Failed to parse FEC header (payload len: %d)",
640         payload_len);
641     GST_MEMDUMP_OBJECT (pad, "Invalid payload", payload, payload_len);
642     goto discard;
643   }
644 
645   GST_TRACE_OBJECT
646       (pad,
647       "Handling FEC buffer with SNBase / N / D / NA / offset %u / %u / %u / %u / %u",
648       fec.seq, fec.N, fec.D, fec.NA, fec.offset);
649 
650   if (fec.D) {
651     if (dec->l == G_MAXUINT) {
652       dec->l = fec.NA;
653     } else if (fec.NA != dec->l) {
654       GST_WARNING_OBJECT (dec, "2D FEC dimensionality cannot change");
655       goto discard;
656     }
657 
658     if (fec.offset != 1) {
659       GST_WARNING_OBJECT (pad, "offset must be 1 for row FEC packets");
660       goto discard;
661     }
662   } else {
663     if (dec->d == G_MAXUINT) {
664       dec->d = fec.NA;
665     } else if (fec.NA != dec->d) {
666       GST_WARNING_OBJECT (dec, "2D FEC dimensionality cannot change");
667       goto discard;
668     }
669 
670     if (dec->l == G_MAXUINT) {
671       dec->l = fec.offset;
672     } else if (fec.offset != dec->l) {
673       GST_WARNING_OBJECT (dec, "2D FEC dimensionality cannot change");
674       goto discard;
675     }
676   }
677 
678   dec->max_fec_arrival_time[fec.D] = GST_BUFFER_DTS_OR_PTS (buffer);
679   trim_fec_items (dec, fec.D);
680 
681   ret = check_fec (dec, &fec);
682 
683   if (ret == GST_FLOW_CUSTOM_SUCCESS) {
684     item = g_malloc0 (sizeof (Item));
685     item->buffer = buffer;
686     item->seq = fec.seq;
687 
688     if (!fec.D) {
689       guint i;
690       guint16 seq;
691 
692       for (i = 0; i < dec->d; i++) {
693         seq = fec.seq + i * dec->l;
694         g_hash_table_insert (dec->column_fec_packets, GUINT_TO_POINTER (seq),
695             item);
696       }
697     }
698     g_sequence_insert_sorted (dec->fec_packets[fec.D], item,
699         (GCompareDataFunc) cmp_items, NULL);
700     ret = GST_FLOW_OK;
701   } else {
702     goto discard;
703   }
704 
705   gst_rtp_buffer_unmap (&rtp);
706 
707 done:
708   GST_OBJECT_UNLOCK (dec);
709   return ret;
710 
711 discard:
712   if (rtp.buffer != NULL)
713     gst_rtp_buffer_unmap (&rtp);
714 
715   gst_buffer_unref (buffer);
716 
717   goto done;
718 }
719 
720 static GstFlowReturn
gst_rtpst_2022_1_fecdec_sink_chain(GstPad * pad,GstObject * parent,GstBuffer * buffer)721 gst_rtpst_2022_1_fecdec_sink_chain (GstPad * pad, GstObject * parent,
722     GstBuffer * buffer)
723 {
724   GstRTPST_2022_1_FecDec *dec = GST_RTPST_2022_1_FECDEC_CAST (parent);
725   GstFlowReturn ret = GST_FLOW_OK;
726   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
727 
728   if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp)) {
729     GST_WARNING_OBJECT (pad, "Chained buffer isn't valid RTP");
730     goto error;
731   }
732 
733   GST_OBJECT_LOCK (dec);
734   dec->max_arrival_time =
735       MAX (dec->max_arrival_time, GST_BUFFER_DTS_OR_PTS (buffer));
736   trim_items (dec);
737   ret = store_media (dec, &rtp, buffer);
738   GST_OBJECT_UNLOCK (dec);
739 
740   gst_rtp_buffer_unmap (&rtp);
741 
742   if (ret == GST_FLOW_OK)
743     ret = gst_pad_push (dec->srcpad, buffer);
744 
745 done:
746   return ret;
747 
748 error:
749   gst_buffer_unref (buffer);
750   goto done;
751 }
752 
753 static gboolean
gst_rtpst_2022_1_fecdec_src_event(GstPad * pad,GstObject * parent,GstEvent * event)754 gst_rtpst_2022_1_fecdec_src_event (GstPad * pad, GstObject * parent,
755     GstEvent * event)
756 {
757   gboolean handled = FALSE;
758   gboolean ret = TRUE;
759 
760   if (!handled) {
761     gst_pad_event_default (pad, parent, event);
762   }
763 
764   return ret;
765 }
766 
767 /* Takes the object lock */
768 static void
gst_rtpst_2022_1_fecdec_reset(GstRTPST_2022_1_FecDec * dec,gboolean allocate)769 gst_rtpst_2022_1_fecdec_reset (GstRTPST_2022_1_FecDec * dec, gboolean allocate)
770 {
771   guint i;
772 
773   GST_OBJECT_LOCK (dec);
774 
775   if (dec->packets) {
776     g_sequence_free (dec->packets);
777     dec->packets = NULL;
778   }
779 
780   if (dec->column_fec_packets) {
781     g_hash_table_unref (dec->column_fec_packets);
782     dec->column_fec_packets = NULL;
783   }
784 
785   if (allocate) {
786     dec->packets = g_sequence_new ((GDestroyNotify) free_item);
787     dec->column_fec_packets = g_hash_table_new (g_direct_hash, g_direct_equal);
788   }
789 
790   for (i = 0; i < 2; i++) {
791     if (dec->fec_packets[i]) {
792       g_sequence_free (dec->fec_packets[i]);
793       dec->fec_packets[i] = NULL;
794     }
795 
796     if (allocate)
797       dec->fec_packets[i] = g_sequence_new ((GDestroyNotify) free_item);
798   }
799 
800   dec->d = G_MAXUINT;
801   dec->l = G_MAXUINT;
802 
803   GST_OBJECT_UNLOCK (dec);
804 }
805 
806 static GstStateChangeReturn
gst_rtpst_2022_1_fecdec_change_state(GstElement * element,GstStateChange transition)807 gst_rtpst_2022_1_fecdec_change_state (GstElement * element,
808     GstStateChange transition)
809 {
810   GstStateChangeReturn ret;
811   GstRTPST_2022_1_FecDec *dec = GST_RTPST_2022_1_FECDEC_CAST (element);
812 
813   switch (transition) {
814     case GST_STATE_CHANGE_READY_TO_PAUSED:
815       gst_rtpst_2022_1_fecdec_reset (dec, TRUE);
816       break;
817     case GST_STATE_CHANGE_PAUSED_TO_READY:
818       gst_rtpst_2022_1_fecdec_reset (dec, FALSE);
819       break;
820     default:
821       break;
822   }
823 
824   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
825 
826   return ret;
827 }
828 
829 static void
gst_rtpst_2022_1_fecdec_finalize(GObject * object)830 gst_rtpst_2022_1_fecdec_finalize (GObject * object)
831 {
832   GstRTPST_2022_1_FecDec *dec = GST_RTPST_2022_1_FECDEC_CAST (object);
833 
834   gst_rtpst_2022_1_fecdec_reset (dec, FALSE);
835 
836   G_OBJECT_CLASS (parent_class)->finalize (object);
837 }
838 
839 static void
gst_rtpst_2022_1_fecdec_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)840 gst_rtpst_2022_1_fecdec_set_property (GObject * object, guint prop_id,
841     const GValue * value, GParamSpec * pspec)
842 {
843   GstRTPST_2022_1_FecDec *dec = GST_RTPST_2022_1_FECDEC_CAST (object);
844 
845   switch (prop_id) {
846     case PROP_SIZE_TIME:
847       dec->size_time = g_value_get_uint64 (value);
848       break;
849     default:
850       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
851       break;
852   }
853 }
854 
855 static void
gst_rtpst_2022_1_fecdec_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)856 gst_rtpst_2022_1_fecdec_get_property (GObject * object, guint prop_id,
857     GValue * value, GParamSpec * pspec)
858 {
859   GstRTPST_2022_1_FecDec *dec = GST_RTPST_2022_1_FECDEC_CAST (object);
860 
861   switch (prop_id) {
862     case PROP_SIZE_TIME:
863       g_value_set_uint64 (value, dec->size_time);
864       break;
865     default:
866       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
867       break;
868   }
869 }
870 
871 static gboolean
gst_2d_fec_sink_event(GstPad * pad,GstObject * parent,GstEvent * event)872 gst_2d_fec_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
873 {
874   GstRTPST_2022_1_FecDec *dec = GST_RTPST_2022_1_FECDEC_CAST (parent);
875   gboolean ret;
876 
877   if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_STOP)
878     gst_rtpst_2022_1_fecdec_reset (dec, TRUE);
879 
880   ret = gst_pad_event_default (pad, parent, event);
881 
882   return ret;
883 }
884 
885 static GstIterator *
gst_rtpst_2022_1_fecdec_iterate_linked_pads(GstPad * pad,GstObject * parent)886 gst_rtpst_2022_1_fecdec_iterate_linked_pads (GstPad * pad, GstObject * parent)
887 {
888   GstRTPST_2022_1_FecDec *dec = GST_RTPST_2022_1_FECDEC_CAST (parent);
889   GstPad *otherpad = NULL;
890   GstIterator *it = NULL;
891   GValue val = { 0, };
892 
893   if (pad == dec->srcpad)
894     otherpad = dec->sinkpad;
895   else if (pad == dec->sinkpad)
896     otherpad = dec->srcpad;
897 
898   if (otherpad) {
899     g_value_init (&val, GST_TYPE_PAD);
900     g_value_set_object (&val, otherpad);
901     it = gst_iterator_new_single (GST_TYPE_PAD, &val);
902     g_value_unset (&val);
903   }
904 
905   return it;
906 }
907 
908 static GstPad *
gst_rtpst_2022_1_fecdec_request_new_pad(GstElement * element,GstPadTemplate * templ,const gchar * name,const GstCaps * caps)909 gst_rtpst_2022_1_fecdec_request_new_pad (GstElement * element,
910     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
911 {
912   GstRTPST_2022_1_FecDec *dec = GST_RTPST_2022_1_FECDEC_CAST (element);
913   GstPad *sinkpad = NULL;
914 
915   GST_DEBUG_OBJECT (element, "requesting pad");
916 
917   if (g_list_length (dec->fec_sinkpads) > 1) {
918     GST_ERROR_OBJECT (dec, "not accepting more than two fec streams");
919     goto done;
920   }
921 
922   sinkpad = gst_pad_new_from_template (templ, name);
923   gst_pad_set_chain_function (sinkpad, gst_rtpst_2022_1_fecdec_sink_chain_fec);
924   gst_element_add_pad (GST_ELEMENT (dec), sinkpad);
925   gst_pad_set_iterate_internal_links_function (sinkpad,
926       GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecdec_iterate_linked_pads));
927 
928   gst_pad_set_active (sinkpad, TRUE);
929 
930   GST_DEBUG_OBJECT (element, "requested pad %s:%s",
931       GST_DEBUG_PAD_NAME (sinkpad));
932 
933 done:
934   return sinkpad;
935 }
936 
937 static void
gst_rtpst_2022_1_fecdec_release_pad(GstElement * element,GstPad * pad)938 gst_rtpst_2022_1_fecdec_release_pad (GstElement * element, GstPad * pad)
939 {
940   GstRTPST_2022_1_FecDec *dec = GST_RTPST_2022_1_FECDEC_CAST (element);
941 
942   GST_DEBUG_OBJECT (element, "releasing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
943 
944   dec->fec_sinkpads = g_list_remove (dec->fec_sinkpads, pad);
945 
946   gst_pad_set_active (pad, FALSE);
947   gst_element_remove_pad (GST_ELEMENT_CAST (dec), pad);
948 }
949 
950 static void
gst_rtpst_2022_1_fecdec_class_init(GstRTPST_2022_1_FecDecClass * klass)951 gst_rtpst_2022_1_fecdec_class_init (GstRTPST_2022_1_FecDecClass * klass)
952 {
953   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
954   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
955 
956   gobject_class->set_property =
957       GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecdec_set_property);
958   gobject_class->get_property =
959       GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecdec_get_property);
960   gobject_class->finalize =
961       GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecdec_finalize);
962 
963   g_object_class_install_property (gobject_class, PROP_SIZE_TIME,
964       g_param_spec_uint64 ("size-time", "Storage size (in ns)",
965           "The amount of data to store (in ns, 0-disable)", 0,
966           G_MAXUINT64, DEFAULT_SIZE_TIME,
967           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
968 
969   gstelement_class->change_state =
970       GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecdec_change_state);
971   gstelement_class->request_new_pad =
972       GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecdec_request_new_pad);
973   gstelement_class->release_pad =
974       GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecdec_release_pad);
975 
976   gst_element_class_set_static_metadata (gstelement_class,
977       "SMPTE 2022-1 FEC decoder", "SMPTE 2022-1 FEC decoding",
978       "performs FEC as described by SMPTE 2022-1",
979       "Mathieu Duponchelle <mathieu@centricular.com>");
980 
981   gst_element_class_add_static_pad_template (gstelement_class, &sink_template);
982   gst_element_class_add_static_pad_template (gstelement_class,
983       &fec_sink_template);
984   gst_element_class_add_static_pad_template (gstelement_class, &src_template);
985 
986   GST_DEBUG_CATEGORY_INIT (gst_rtpst_2022_1_fecdec_debug,
987       "rtpst2022-1-fecdec", 0, "SMPTE 2022-1 FEC decoder element");
988 }
989 
990 static void
gst_rtpst_2022_1_fecdec_init(GstRTPST_2022_1_FecDec * dec)991 gst_rtpst_2022_1_fecdec_init (GstRTPST_2022_1_FecDec * dec)
992 {
993   dec->srcpad = gst_pad_new_from_static_template (&src_template, "src");
994   GST_PAD_SET_PROXY_CAPS (dec->srcpad);
995   gst_pad_use_fixed_caps (dec->srcpad);
996   gst_pad_set_event_function (dec->srcpad,
997       GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecdec_src_event));
998   gst_pad_set_iterate_internal_links_function (dec->srcpad,
999       GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecdec_iterate_linked_pads));
1000   gst_element_add_pad (GST_ELEMENT (dec), dec->srcpad);
1001 
1002   dec->sinkpad = gst_pad_new_from_static_template (&sink_template, "sink");
1003   GST_PAD_SET_PROXY_CAPS (dec->sinkpad);
1004   gst_pad_set_chain_function (dec->sinkpad, gst_rtpst_2022_1_fecdec_sink_chain);
1005   gst_pad_set_event_function (dec->sinkpad,
1006       GST_DEBUG_FUNCPTR (gst_2d_fec_sink_event));
1007   gst_pad_set_iterate_internal_links_function (dec->sinkpad,
1008       GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecdec_iterate_linked_pads));
1009   gst_element_add_pad (GST_ELEMENT (dec), dec->sinkpad);
1010 
1011   dec->d = G_MAXUINT;
1012   dec->l = G_MAXUINT;
1013 }
1014