• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer
2  * Copyright (C) <2020> Mathieu Duponchelle <mathieu@centricular.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19 
20 /**
21  * SECTION:element-rtpst2022-1-fecenc
22  * @see_also: #element-rtpst2022-1-fecdec
23  *
24  * This element takes as input a media stream and up to two FEC
25  * streams as described in SMPTE 2022-1: Forward Error Correction
26  * for Real-Time Video/Audio Transport Over IP Networks, and makes
27  * use of the FEC packets to recover media packets that may have
28  * gotten lost.
29  *
30  * ## sender / receiver example
31  *
32  * ``` shell
33  * gst-launch-1.0 \
34  *   rtpbin name=rtp fec-encoders='fec,0="rtpst2022-1-fecenc\ rows\=5\ columns\=5";' \
35  *   uridecodebin uri=file:///path/to/video/file ! x264enc key-int-max=60 tune=zerolatency ! \
36  *     queue ! mpegtsmux ! rtpmp2tpay ssrc=0 ! rtp.send_rtp_sink_0 \
37  *   rtp.send_rtp_src_0 ! udpsink host=127.0.0.1 port=5000 \
38  *   rtp.send_fec_src_0_0 ! udpsink host=127.0.0.1 port=5002 async=false \
39  *   rtp.send_fec_src_0_1 ! udpsink host=127.0.0.1 port=5004 async=false
40  * ```
41  *
42  * ``` shell
43  * gst-launch-1.0 \
44  *   rtpbin latency=500 fec-decoders='fec,0="rtpst2022-1-fecdec\ size-time\=1000000000";' name=rtp \
45  *   udpsrc address=127.0.0.1 port=5002 caps="application/x-rtp, payload=96" ! queue ! rtp.recv_fec_sink_0_0 \
46  *   udpsrc address=127.0.0.1 port=5004 caps="application/x-rtp, payload=96" ! queue ! rtp.recv_fec_sink_0_1 \
47  *   udpsrc address=127.0.0.1 port=5000 caps="application/x-rtp, media=video, clock-rate=90000, encoding-name=mp2t, payload=33" ! \
48  *     queue ! netsim drop-probability=0.05 ! rtp.recv_rtp_sink_0 \
49  *   rtp. ! decodebin ! videoconvert ! queue ! autovideosink
50  * ```
51  *
52  * With the above command line, as the media packet size is constant,
53  * the fec overhead can be approximated to the number of fec packets
54  * per 2-d matrix of media packet, here 10 fec packets for each 25
55  * media packets.
56  *
57  * Increasing the number of rows and columns will decrease the overhead,
58  * but obviously increase the likelihood of recovery failure for lost
59  * packets on the receiver side.
60  *
61  * Since: 1.20
62  */
63 
64 #ifdef HAVE_CONFIG_H
65 #include "config.h"
66 #endif
67 
68 #include <gst/base/base.h>
69 #include <gst/rtp/gstrtpbuffer.h>
70 
71 #include "gstrtpst2022-1-fecenc.h"
72 
73 #if !GLIB_CHECK_VERSION(2, 60, 0)
74 #define g_queue_clear_full queue_clear_full
75 static void
queue_clear_full(GQueue * queue,GDestroyNotify free_func)76 queue_clear_full (GQueue * queue, GDestroyNotify free_func)
77 {
78   gpointer data;
79 
80   while ((data = g_queue_pop_head (queue)) != NULL)
81     free_func (data);
82 }
83 #endif
84 
85 GST_DEBUG_CATEGORY_STATIC (gst_rtpst_2022_1_fecenc_debug);
86 #define GST_CAT_DEFAULT gst_rtpst_2022_1_fecenc_debug
87 
88 enum
89 {
90   PROP_0,
91   PROP_COLUMNS,
92   PROP_ROWS,
93   PROP_PT,
94   PROP_ENABLE_COLUMN,
95   PROP_ENABLE_ROW,
96 };
97 
98 #define DEFAULT_ROWS 0
99 #define DEFAULT_COLUMNS 0
100 #define DEFAULT_PT 96
101 #define DEFAULT_ENABLE_COLUMN TRUE
102 #define DEFAULT_ENABLE_ROW TRUE
103 
104 typedef struct
105 {
106   guint16 target_media_seq;     /* The media seqnum we want to send that packet alongside */
107   guint16 seq_base;             /* Only used for logging purposes */
108   GstBuffer *buffer;
109 } Item;
110 
111 typedef struct
112 {
113   guint8 *xored_payload;
114   guint32 xored_timestamp;
115   guint8 xored_pt;
116   guint16 xored_payload_len;
117   gboolean xored_marker;
118   gboolean xored_padding;
119   gboolean xored_extension;
120 
121   guint16 seq_base;
122 
123   guint16 payload_len;
124   guint n_packets;
125 } FecPacket;
126 
127 struct _GstRTPST_2022_1_FecEncClass
128 {
129   GstElementClass class;
130 };
131 
132 struct _GstRTPST_2022_1_FecEnc
133 {
134   GstElement element;
135 
136   GstPad *srcpad;
137   GstPad *sinkpad;
138 
139   /* These pads do not participate in the flow return of the element,
140    * which should continue working even if the sending of FEC packets
141    * fails
142    */
143   GstPad *row_fec_srcpad;
144   GstPad *column_fec_srcpad;
145 
146   /* The following fields are only accessed on state change or from the
147    * streaming thread, and only settable in state < PAUSED */
148 
149   /* N columns */
150   guint l;
151   /* N rows */
152   guint d;
153 
154   /* Whether we have pushed initial events on the column FEC source pad */
155   gboolean column_events_pushed;
156 
157   /* The current row FEC packet */
158   FecPacket *row;
159   /* Tracks the row seqnum */
160   guint16 row_seq;
161   /* Whether we have pushed initial events on the row FEC source pad */
162   gboolean row_events_pushed;
163 
164   /* These two fields are used to enforce input seqnum consecutiveness,
165    * and to determine when column FEC packets should be pushed */
166   gboolean last_media_seqnum_set;
167   guint16 last_media_seqnum;
168 
169   /* This field is used to timestamp our FEC packets, we just piggy back */
170   guint32 last_media_timestamp;
171 
172   /* The payload type of the FEC packets */
173   gint pt;
174 
175   /* The following fields can be changed while PLAYING, and are
176    * protected with the OBJECT_LOCK
177    */
178   /* Tracks the property, can be changed while PLAYING */
179   gboolean enable_row;
180   /* Tracks the property, can be changed while PLAYING */
181   gboolean enable_column;
182 
183   /* Array of FecPackets, with size enc->l */
184   GPtrArray *columns;
185   /* Index of the current column in the array above */
186   guint current_column;
187   /* Tracks the column seqnum */
188   guint16 column_seq;
189   /* Column FEC packets must be delayed to make them more resilient
190    * to loss bursts, we store them here */
191   GQueue queued_column_packets;
192 };
193 
194 #define RTP_CAPS "application/x-rtp"
195 
196 static GstStaticPadTemplate fec_src_template =
197 GST_STATIC_PAD_TEMPLATE ("fec_%u",
198     GST_PAD_SRC,
199     GST_PAD_SOMETIMES,
200     GST_STATIC_CAPS (RTP_CAPS));
201 
202 static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
203     GST_PAD_SINK,
204     GST_PAD_ALWAYS,
205     GST_STATIC_CAPS (RTP_CAPS));
206 
207 static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src",
208     GST_PAD_SRC,
209     GST_PAD_ALWAYS,
210     GST_STATIC_CAPS (RTP_CAPS));
211 
212 #define gst_rtpst_2022_1_fecenc_parent_class parent_class
213 G_DEFINE_TYPE (GstRTPST_2022_1_FecEnc, gst_rtpst_2022_1_fecenc,
214     GST_TYPE_ELEMENT);
215 GST_ELEMENT_REGISTER_DEFINE (rtpst2022_1_fecenc, "rtpst2022-1-fecenc",
216     GST_RANK_NONE, GST_TYPE_RTPST_2022_1_FECENC);
217 
218 static void
free_item(Item * item)219 free_item (Item * item)
220 {
221   if (item->buffer)
222     gst_buffer_unref (item->buffer);
223 
224   g_free (item);
225 }
226 
227 static void
free_fec_packet(FecPacket * packet)228 free_fec_packet (FecPacket * packet)
229 {
230   if (packet->xored_payload)
231     g_free (packet->xored_payload);
232   g_free (packet);
233 }
234 
235 static void
_xor_mem(guint8 * restrict dst,const guint8 * restrict src,gsize length)236 _xor_mem (guint8 * restrict dst, const guint8 * restrict src, gsize length)
237 {
238   guint i;
239 
240   for (i = 0; i < (length / sizeof (guint64)); ++i) {
241 #if G_BYTE_ORDER == G_LITTLE_ENDIAN
242     GST_WRITE_UINT64_LE (dst,
243         GST_READ_UINT64_LE (dst) ^ GST_READ_UINT64_LE (src));
244 #else
245     GST_WRITE_UINT64_BE (dst,
246         GST_READ_UINT64_BE (dst) ^ GST_READ_UINT64_BE (src));
247 #endif
248     dst += sizeof (guint64);
249     src += sizeof (guint64);
250   }
251   for (i = 0; i < (length % sizeof (guint64)); ++i)
252     dst[i] ^= src[i];
253 }
254 
255 static void
fec_packet_update(FecPacket * fec,GstRTPBuffer * rtp)256 fec_packet_update (FecPacket * fec, GstRTPBuffer * rtp)
257 {
258   if (fec->n_packets == 0) {
259     fec->seq_base = gst_rtp_buffer_get_seq (rtp);
260     fec->payload_len = gst_rtp_buffer_get_payload_len (rtp);
261     fec->xored_payload_len = gst_rtp_buffer_get_payload_len (rtp);
262     fec->xored_pt = gst_rtp_buffer_get_payload_type (rtp);
263     fec->xored_timestamp = gst_rtp_buffer_get_timestamp (rtp);
264     fec->xored_marker = gst_rtp_buffer_get_marker (rtp);
265     fec->xored_padding = gst_rtp_buffer_get_padding (rtp);
266     fec->xored_extension = gst_rtp_buffer_get_extension (rtp);
267     fec->xored_payload = g_malloc (sizeof (guint8) * fec->payload_len);
268     memcpy (fec->xored_payload, gst_rtp_buffer_get_payload (rtp),
269         fec->payload_len);
270   } else {
271     guint plen = gst_rtp_buffer_get_payload_len (rtp);
272 
273     if (fec->payload_len < plen) {
274       fec->xored_payload =
275           g_realloc (fec->xored_payload, sizeof (guint8) * plen);
276       memset (fec->xored_payload + fec->payload_len, 0,
277           plen - fec->payload_len);
278       fec->payload_len = plen;
279     }
280 
281     fec->xored_payload_len ^= plen;
282     fec->xored_pt ^= gst_rtp_buffer_get_payload_type (rtp);
283     fec->xored_timestamp ^= gst_rtp_buffer_get_timestamp (rtp);
284     fec->xored_marker ^= gst_rtp_buffer_get_marker (rtp);
285     fec->xored_padding ^= gst_rtp_buffer_get_padding (rtp);
286     fec->xored_extension ^= gst_rtp_buffer_get_extension (rtp);
287     _xor_mem (fec->xored_payload, gst_rtp_buffer_get_payload (rtp), plen);
288   }
289 
290   fec->n_packets += 1;
291 }
292 
293 static void
push_initial_events(GstRTPST_2022_1_FecEnc * enc,GstPad * pad,const gchar * id)294 push_initial_events (GstRTPST_2022_1_FecEnc * enc, GstPad * pad,
295     const gchar * id)
296 {
297   gchar *stream_id;
298   GstCaps *caps;
299   GstSegment segment;
300 
301   stream_id = gst_pad_create_stream_id (pad, GST_ELEMENT (enc), id);
302   gst_pad_push_event (pad, gst_event_new_stream_start (stream_id));
303   g_free (stream_id);
304 
305   caps = gst_caps_new_simple ("application/x-rtp",
306       "payload", G_TYPE_UINT, enc->pt, "ssrc", G_TYPE_UINT, 0, NULL);
307   gst_pad_push_event (pad, gst_event_new_caps (caps));
308   gst_caps_unref (caps);
309 
310   gst_segment_init (&segment, GST_FORMAT_TIME);
311   gst_pad_push_event (pad, gst_event_new_segment (&segment));
312 }
313 
314 static void
queue_fec_packet(GstRTPST_2022_1_FecEnc * enc,FecPacket * fec,gboolean row)315 queue_fec_packet (GstRTPST_2022_1_FecEnc * enc, FecPacket * fec, gboolean row)
316 {
317   GstBuffer *buffer = gst_rtp_buffer_new_allocate (fec->payload_len + 16, 0, 0);
318   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
319   GstBitWriter bits;
320   guint8 *data;
321 
322   gst_rtp_buffer_map (buffer, GST_MAP_WRITE, &rtp);
323   data = gst_rtp_buffer_get_payload (&rtp);
324   memset (data, 0x00, 16);
325 
326   gst_bit_writer_init_with_data (&bits, data, 17, FALSE);
327 
328   gst_bit_writer_put_bits_uint16 (&bits, fec->seq_base, 16);    /* SNBase low bits */
329   gst_bit_writer_put_bits_uint16 (&bits, fec->xored_payload_len, 16);   /* Length Recovery */
330   gst_bit_writer_put_bits_uint8 (&bits, 1, 1);  /* E */
331   gst_bit_writer_put_bits_uint8 (&bits, fec->xored_pt, 7);      /* PT recovery */
332   gst_bit_writer_put_bits_uint32 (&bits, 0, 24);        /* Mask */
333   gst_bit_writer_put_bits_uint32 (&bits, fec->xored_timestamp, 32);     /* TS recovery */
334   gst_bit_writer_put_bits_uint8 (&bits, 0, 1);  /* N */
335   gst_bit_writer_put_bits_uint8 (&bits, row ? 1 : 0, 1);        /* D */
336   gst_bit_writer_put_bits_uint8 (&bits, 0, 3);  /* type */
337   gst_bit_writer_put_bits_uint8 (&bits, 0, 3);  /* index */
338   gst_bit_writer_put_bits_uint8 (&bits, row ? 1 : enc->l, 8);   /* Offset */
339   gst_bit_writer_put_bits_uint8 (&bits, fec->n_packets, 8);     /* NA */
340   gst_bit_writer_put_bits_uint8 (&bits, 0, 8);  /* SNBase ext bits */
341 
342   memcpy (data + 16, fec->xored_payload, fec->payload_len);
343 
344   gst_bit_writer_reset (&bits);
345 
346   gst_rtp_buffer_set_payload_type (&rtp, enc->pt);
347   gst_rtp_buffer_set_seq (&rtp, row ? enc->row_seq++ : enc->column_seq++);
348   gst_rtp_buffer_set_marker (&rtp, fec->xored_marker);
349   gst_rtp_buffer_set_padding (&rtp, fec->xored_padding);
350   gst_rtp_buffer_set_extension (&rtp, fec->xored_extension);
351 
352   /* We're sending it out immediately */
353   if (row)
354     gst_rtp_buffer_set_timestamp (&rtp, enc->last_media_timestamp);
355 
356   gst_rtp_buffer_unmap (&rtp);
357 
358   /* We can send row FEC packets immediately, column packets need
359    * delaying by L <= delay < L * D
360    */
361   if (row) {
362     GstFlowReturn ret;
363 
364     GST_LOG_OBJECT (enc,
365         "Pushing row FEC packet, seq base: %u, media seqnum: %u",
366         fec->seq_base, enc->last_media_seqnum);
367 
368     /* Safe to unlock here */
369     GST_OBJECT_UNLOCK (enc);
370     ret = gst_pad_push (enc->row_fec_srcpad, buffer);
371     GST_OBJECT_LOCK (enc);
372 
373     if (ret != GST_FLOW_OK && ret != GST_FLOW_FLUSHING)
374       GST_WARNING_OBJECT (enc->row_fec_srcpad,
375           "Failed to push row FEC packet: %s", gst_flow_get_name (ret));
376   } else {
377     Item *item = g_malloc0 (sizeof (Item));
378 
379     item->buffer = buffer;
380     item->seq_base = fec->seq_base;
381     /* Let's get cute and linearize */
382     item->target_media_seq =
383         enc->last_media_seqnum + enc->l - enc->current_column +
384         enc->d * enc->current_column;
385 
386     g_queue_push_tail (&enc->queued_column_packets, item);
387   }
388 }
389 
390 static GstFlowReturn
gst_rtpst_2022_1_fecenc_sink_chain(GstPad * pad,GstObject * parent,GstBuffer * buffer)391 gst_rtpst_2022_1_fecenc_sink_chain (GstPad * pad, GstObject * parent,
392     GstBuffer * buffer)
393 {
394   GstRTPST_2022_1_FecEnc *enc = GST_RTPST_2022_1_FECENC_CAST (parent);
395   GstFlowReturn ret = GST_FLOW_OK;
396   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
397 
398   if (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp)) {
399     GST_ERROR_OBJECT (enc, "Chained buffer isn't valid RTP");
400     goto error;
401   }
402 
403   if (gst_rtp_buffer_get_ssrc (&rtp) != 0) {
404     GST_ERROR_OBJECT (enc, "Chained buffer must have SSRC == 0");
405     goto error;
406   }
407 
408   if (enc->last_media_seqnum_set
409       && (guint16) (enc->last_media_seqnum + 1) !=
410       gst_rtp_buffer_get_seq (&rtp)) {
411     GST_ERROR_OBJECT (enc, "consecutive sequence numbers are required");
412     goto error;
413   }
414 
415   if (!enc->row_events_pushed) {
416     push_initial_events (enc, enc->row_fec_srcpad, "row-fec");
417     enc->row_events_pushed = TRUE;
418   }
419 
420   if (!enc->column_events_pushed) {
421     push_initial_events (enc, enc->column_fec_srcpad, "column-fec");
422     enc->column_events_pushed = TRUE;
423   }
424 
425   enc->last_media_timestamp = gst_rtp_buffer_get_timestamp (&rtp);
426   enc->last_media_seqnum = gst_rtp_buffer_get_seq (&rtp);
427   enc->last_media_seqnum_set = TRUE;
428 
429   GST_OBJECT_LOCK (enc);
430   if (enc->enable_row && enc->l) {
431     g_assert (enc->row->n_packets < enc->l);
432     fec_packet_update (enc->row, &rtp);
433     if (enc->row->n_packets == enc->l) {
434       queue_fec_packet (enc, enc->row, TRUE);
435       g_free (enc->row->xored_payload);
436       memset (enc->row, 0x00, sizeof (FecPacket));
437     }
438   }
439 
440   if (enc->enable_column && enc->l && enc->d) {
441     FecPacket *column = g_ptr_array_index (enc->columns, enc->current_column);
442 
443     fec_packet_update (column, &rtp);
444     if (column->n_packets == enc->d) {
445       queue_fec_packet (enc, column, FALSE);
446       g_free (column->xored_payload);
447       memset (column, 0x00, sizeof (FecPacket));
448     }
449 
450     enc->current_column++;
451     enc->current_column %= enc->l;
452   }
453 
454   gst_rtp_buffer_unmap (&rtp);
455 
456   if (g_queue_get_length (&enc->queued_column_packets) > 0) {
457     Item *item = g_queue_peek_head (&enc->queued_column_packets);
458 
459     if (item->target_media_seq == enc->last_media_seqnum) {
460       GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
461 
462       g_queue_pop_head (&enc->queued_column_packets);
463       GST_LOG_OBJECT (enc,
464           "Pushing column FEC packet, seq base: %u, media seqnum: %u",
465           item->seq_base, enc->last_media_seqnum);
466       gst_rtp_buffer_map (item->buffer, GST_MAP_WRITE, &rtp);
467       gst_rtp_buffer_set_timestamp (&rtp, enc->last_media_timestamp);
468       gst_rtp_buffer_unmap (&rtp);
469       GST_OBJECT_UNLOCK (enc);
470       ret =
471           gst_pad_push (enc->column_fec_srcpad, gst_buffer_ref (item->buffer));
472       GST_OBJECT_LOCK (enc);
473 
474       if (ret != GST_FLOW_OK && ret != GST_FLOW_FLUSHING)
475         GST_WARNING_OBJECT (enc->column_fec_srcpad,
476             "Failed to push column FEC packet: %s", gst_flow_get_name (ret));
477 
478       free_item (item);
479     }
480   }
481   GST_OBJECT_UNLOCK (enc);
482 
483   ret = gst_pad_push (enc->srcpad, buffer);
484 
485 done:
486   return ret;
487 
488 error:
489   if (rtp.buffer)
490     gst_rtp_buffer_unmap (&rtp);
491   gst_buffer_unref (buffer);
492   ret = GST_FLOW_ERROR;
493   goto done;
494 }
495 
496 static GstIterator *
gst_rtpst_2022_1_fecenc_iterate_linked_pads(GstPad * pad,GstObject * parent)497 gst_rtpst_2022_1_fecenc_iterate_linked_pads (GstPad * pad, GstObject * parent)
498 {
499   GstRTPST_2022_1_FecEnc *enc = GST_RTPST_2022_1_FECENC_CAST (parent);
500   GstPad *otherpad = NULL;
501   GstIterator *it = NULL;
502   GValue val = { 0, };
503 
504   if (pad == enc->srcpad)
505     otherpad = enc->sinkpad;
506   else if (pad == enc->sinkpad)
507     otherpad = enc->srcpad;
508 
509   if (otherpad) {
510     g_value_init (&val, GST_TYPE_PAD);
511     g_value_set_object (&val, otherpad);
512     it = gst_iterator_new_single (GST_TYPE_PAD, &val);
513     g_value_unset (&val);
514   }
515 
516   return it;
517 }
518 
519 static void
gst_rtpst_2022_1_fecenc_reset(GstRTPST_2022_1_FecEnc * enc,gboolean allocate)520 gst_rtpst_2022_1_fecenc_reset (GstRTPST_2022_1_FecEnc * enc, gboolean allocate)
521 {
522   if (enc->row) {
523     free_fec_packet (enc->row);
524     enc->row = NULL;
525   }
526 
527   if (enc->columns) {
528     g_ptr_array_unref (enc->columns);
529     enc->columns = NULL;
530   }
531 
532   if (enc->row_fec_srcpad) {
533     gst_element_remove_pad (GST_ELEMENT (enc), enc->row_fec_srcpad);
534     enc->row_fec_srcpad = NULL;
535   }
536 
537   if (enc->column_fec_srcpad) {
538     gst_element_remove_pad (GST_ELEMENT (enc), enc->column_fec_srcpad);
539     enc->column_fec_srcpad = NULL;
540   }
541 
542   g_queue_clear_full (&enc->queued_column_packets, (GDestroyNotify) free_item);
543 
544   if (allocate) {
545     guint i;
546 
547     enc->row = g_malloc0 (sizeof (FecPacket));
548     enc->columns =
549         g_ptr_array_new_full (enc->l, (GDestroyNotify) free_fec_packet);
550 
551     for (i = 0; i < enc->l; i++) {
552       g_ptr_array_add (enc->columns, g_malloc0 (sizeof (FecPacket)));
553     }
554 
555     g_queue_init (&enc->queued_column_packets);
556 
557     enc->column_fec_srcpad =
558         gst_pad_new_from_static_template (&fec_src_template, "fec_0");
559     gst_pad_set_active (enc->column_fec_srcpad, TRUE);
560     gst_pad_set_iterate_internal_links_function (enc->column_fec_srcpad,
561         GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecenc_iterate_linked_pads));
562     gst_element_add_pad (GST_ELEMENT (enc), enc->column_fec_srcpad);
563 
564     enc->row_fec_srcpad =
565         gst_pad_new_from_static_template (&fec_src_template, "fec_1");
566     gst_pad_set_active (enc->row_fec_srcpad, TRUE);
567     gst_pad_set_iterate_internal_links_function (enc->row_fec_srcpad,
568         GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecenc_iterate_linked_pads));
569     gst_element_add_pad (GST_ELEMENT (enc), enc->row_fec_srcpad);
570 
571     gst_element_no_more_pads (GST_ELEMENT (enc));
572   }
573 
574   enc->current_column = 0;
575   enc->last_media_seqnum_set = FALSE;
576 }
577 
578 static GstStateChangeReturn
gst_rtpst_2022_1_fecenc_change_state(GstElement * element,GstStateChange transition)579 gst_rtpst_2022_1_fecenc_change_state (GstElement * element,
580     GstStateChange transition)
581 {
582   GstStateChangeReturn ret;
583   GstRTPST_2022_1_FecEnc *enc = GST_RTPST_2022_1_FECENC_CAST (element);
584 
585   switch (transition) {
586     case GST_STATE_CHANGE_READY_TO_PAUSED:
587       gst_rtpst_2022_1_fecenc_reset (enc, TRUE);
588       break;
589     case GST_STATE_CHANGE_PAUSED_TO_READY:
590       gst_rtpst_2022_1_fecenc_reset (enc, FALSE);
591       break;
592     default:
593       break;
594   }
595 
596   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
597 
598   return ret;
599 }
600 
601 static void
gst_rtpst_2022_1_fecenc_finalize(GObject * object)602 gst_rtpst_2022_1_fecenc_finalize (GObject * object)
603 {
604   GstRTPST_2022_1_FecEnc *enc = GST_RTPST_2022_1_FECENC_CAST (object);
605 
606   gst_rtpst_2022_1_fecenc_reset (enc, FALSE);
607 
608   G_OBJECT_CLASS (parent_class)->finalize (object);
609 }
610 
611 static void
gst_rtpst_2022_1_fecenc_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)612 gst_rtpst_2022_1_fecenc_set_property (GObject * object, guint prop_id,
613     const GValue * value, GParamSpec * pspec)
614 {
615   GstRTPST_2022_1_FecEnc *enc = GST_RTPST_2022_1_FECENC_CAST (object);
616 
617   if (GST_STATE (enc) > GST_STATE_READY) {
618     GST_ERROR_OBJECT (enc,
619         "rtpst2022-1-fecenc properties can't be changed in PLAYING or PAUSED state");
620     return;
621   }
622 
623   switch (prop_id) {
624     case PROP_COLUMNS:
625       enc->l = g_value_get_uint (value);
626       break;
627     case PROP_ROWS:
628       enc->d = g_value_get_uint (value);
629       break;
630     case PROP_PT:
631       enc->pt = g_value_get_int (value);
632       break;
633     case PROP_ENABLE_COLUMN:
634       GST_OBJECT_LOCK (enc);
635       enc->enable_column = g_value_get_boolean (value);
636       if (!enc->enable_column) {
637         guint i;
638 
639         if (enc->columns) {
640           for (i = 0; i < enc->l; i++) {
641             FecPacket *column = g_ptr_array_index (enc->columns, i);
642             g_free (column->xored_payload);
643             memset (column, 0x00, sizeof (FecPacket));
644           }
645         }
646         enc->current_column = 0;
647         enc->column_seq = 0;
648         g_queue_clear_full (&enc->queued_column_packets,
649             (GDestroyNotify) free_item);
650       }
651       GST_OBJECT_UNLOCK (enc);
652       break;
653     case PROP_ENABLE_ROW:
654       GST_OBJECT_LOCK (enc);
655       enc->enable_row = g_value_get_boolean (value);
656       GST_OBJECT_UNLOCK (enc);
657       break;
658     default:
659       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
660       break;
661   }
662 }
663 
664 static void
gst_rtpst_2022_1_fecenc_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)665 gst_rtpst_2022_1_fecenc_get_property (GObject * object, guint prop_id,
666     GValue * value, GParamSpec * pspec)
667 {
668   GstRTPST_2022_1_FecEnc *enc = GST_RTPST_2022_1_FECENC_CAST (object);
669 
670   switch (prop_id) {
671     case PROP_COLUMNS:
672       g_value_set_uint (value, enc->l);
673       break;
674     case PROP_ROWS:
675       g_value_set_uint (value, enc->d);
676       break;
677     case PROP_PT:
678       g_value_set_int (value, enc->pt);
679       break;
680     case PROP_ENABLE_COLUMN:
681       GST_OBJECT_LOCK (enc);
682       g_value_set_boolean (value, enc->enable_column);
683       GST_OBJECT_UNLOCK (enc);
684       break;
685     case PROP_ENABLE_ROW:
686       GST_OBJECT_LOCK (enc);
687       g_value_set_boolean (value, enc->enable_row);
688       GST_OBJECT_UNLOCK (enc);
689       break;
690     default:
691       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
692       break;
693   }
694 }
695 
696 static gboolean
gst_2d_fec_sink_event(GstPad * pad,GstObject * parent,GstEvent * event)697 gst_2d_fec_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
698 {
699   GstRTPST_2022_1_FecEnc *enc = GST_RTPST_2022_1_FECENC_CAST (parent);
700   gboolean ret;
701 
702   if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_STOP)
703     gst_rtpst_2022_1_fecenc_reset (enc, TRUE);
704 
705   ret = gst_pad_event_default (pad, parent, event);
706 
707   return ret;
708 }
709 
710 static void
gst_rtpst_2022_1_fecenc_class_init(GstRTPST_2022_1_FecEncClass * klass)711 gst_rtpst_2022_1_fecenc_class_init (GstRTPST_2022_1_FecEncClass * klass)
712 {
713   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
714   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
715 
716   gobject_class->set_property =
717       GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecenc_set_property);
718   gobject_class->get_property =
719       GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecenc_get_property);
720   gobject_class->finalize =
721       GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecenc_finalize);
722 
723   g_object_class_install_property (gobject_class, PROP_COLUMNS,
724       g_param_spec_uint ("columns", "Columns",
725           "Number of columns to apply row FEC on, 0=disabled", 0,
726           255, DEFAULT_COLUMNS,
727           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS |
728           GST_PARAM_MUTABLE_READY));
729 
730   g_object_class_install_property (gobject_class, PROP_ROWS,
731       g_param_spec_uint ("rows", "Rows",
732           "Number of rows to apply column FEC on, 0=disabled", 0,
733           255, DEFAULT_ROWS,
734           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS |
735           GST_PARAM_MUTABLE_READY));
736 
737   g_object_class_install_property (gobject_class, PROP_PT,
738       g_param_spec_int ("pt", "Payload Type",
739           "The payload type of FEC packets", 96,
740           255, DEFAULT_PT,
741           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS |
742           GST_PARAM_MUTABLE_READY));
743 
744   g_object_class_install_property (gobject_class, PROP_ENABLE_COLUMN,
745       g_param_spec_boolean ("enable-column-fec", "Enable Column FEC",
746           "Whether the encoder should compute and send column FEC",
747           DEFAULT_ENABLE_COLUMN,
748           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS |
749           GST_PARAM_MUTABLE_PLAYING));
750 
751   g_object_class_install_property (gobject_class, PROP_ENABLE_ROW,
752       g_param_spec_boolean ("enable-row-fec", "Enable Row FEC",
753           "Whether the encoder should compute and send row FEC",
754           DEFAULT_ENABLE_ROW,
755           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS |
756           GST_PARAM_MUTABLE_PLAYING));
757 
758   gstelement_class->change_state =
759       GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecenc_change_state);
760 
761   gst_element_class_set_static_metadata (gstelement_class,
762       "SMPTE 2022-1 FEC encoder", "SMPTE 2022-1 FEC encoding",
763       "performs FEC as described by SMPTE 2022-1",
764       "Mathieu Duponchelle <mathieu@centricular.com>");
765 
766   gst_element_class_add_static_pad_template (gstelement_class, &sink_template);
767   gst_element_class_add_static_pad_template (gstelement_class,
768       &fec_src_template);
769   gst_element_class_add_static_pad_template (gstelement_class, &src_template);
770 
771   GST_DEBUG_CATEGORY_INIT (gst_rtpst_2022_1_fecenc_debug,
772       "rtpst2022-1-fecenc", 0, "SMPTE 2022-1 FEC encoder element");
773 }
774 
775 static void
gst_rtpst_2022_1_fecenc_init(GstRTPST_2022_1_FecEnc * enc)776 gst_rtpst_2022_1_fecenc_init (GstRTPST_2022_1_FecEnc * enc)
777 {
778   enc->srcpad = gst_pad_new_from_static_template (&src_template, "src");
779   gst_pad_use_fixed_caps (enc->srcpad);
780   GST_PAD_SET_PROXY_CAPS (enc->srcpad);
781   gst_pad_set_iterate_internal_links_function (enc->srcpad,
782       GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecenc_iterate_linked_pads));
783   gst_element_add_pad (GST_ELEMENT (enc), enc->srcpad);
784 
785   enc->sinkpad = gst_pad_new_from_static_template (&sink_template, "sink");
786   GST_PAD_SET_PROXY_CAPS (enc->sinkpad);
787   gst_pad_set_chain_function (enc->sinkpad, gst_rtpst_2022_1_fecenc_sink_chain);
788   gst_pad_set_event_function (enc->sinkpad,
789       GST_DEBUG_FUNCPTR (gst_2d_fec_sink_event));
790   gst_pad_set_iterate_internal_links_function (enc->sinkpad,
791       GST_DEBUG_FUNCPTR (gst_rtpst_2022_1_fecenc_iterate_linked_pads));
792   gst_element_add_pad (GST_ELEMENT (enc), enc->sinkpad);
793 
794   enc->d = 0;
795   enc->l = 0;
796 }
797