• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2015, Collabora Ltd.
3  *
4  * Redistribution and use in source and binary forms, with or without modification,
5  * are permitted provided that the following conditions are met:
6  *
7  * 1. Redistributions of source code must retain the above copyright notice, this
8  * list of conditions and the following disclaimer.
9  *
10  * 2. Redistributions in binary form must reproduce the above copyright notice, this
11  * list of conditions and the following disclaimer in the documentation and/or other
12  * materials provided with the distribution.
13  *
14  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
15  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
16  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
17  * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
18  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
19  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
20  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
21  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
22  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
23  * OF SUCH DAMAGE.
24  */
25 
26 #ifdef HAVE_CONFIG_H
27 #include "config.h"
28 #endif
29 #include "gstsctpenc.h"
30 
31 #include <gst/sctp/sctpsendmeta.h>
32 #include <stdio.h>
33 
34 GST_DEBUG_CATEGORY_STATIC (gst_sctp_enc_debug_category);
35 #define GST_CAT_DEFAULT gst_sctp_enc_debug_category
36 
37 #define gst_sctp_enc_parent_class parent_class
38 G_DEFINE_TYPE (GstSctpEnc, gst_sctp_enc, GST_TYPE_ELEMENT);
39 GST_ELEMENT_REGISTER_DEFINE (sctpenc, "sctpenc", GST_RANK_NONE,
40     GST_TYPE_SCTP_ENC);
41 
42 static GstStaticPadTemplate sink_template =
43 GST_STATIC_PAD_TEMPLATE ("sink_%u", GST_PAD_SINK,
44     GST_PAD_REQUEST, GST_STATIC_CAPS_ANY);
45 
46 static GstStaticPadTemplate src_template =
47 GST_STATIC_PAD_TEMPLATE ("src", GST_PAD_SRC,
48     GST_PAD_ALWAYS, GST_STATIC_CAPS ("application/x-sctp"));
49 
50 enum
51 {
52   SIGNAL_SCTP_ASSOCIATION_ESTABLISHED,
53   SIGNAL_GET_STREAM_BYTES_SENT,
54   NUM_SIGNALS
55 };
56 
57 static guint signals[NUM_SIGNALS];
58 
59 enum
60 {
61   PROP_0,
62 
63   PROP_GST_SCTP_ASSOCIATION_ID,
64   PROP_REMOTE_SCTP_PORT,
65   PROP_USE_SOCK_STREAM,
66 
67   NUM_PROPERTIES
68 };
69 
70 static GParamSpec *properties[NUM_PROPERTIES];
71 
72 #define DEFAULT_GST_SCTP_ASSOCIATION_ID 1
73 #define DEFAULT_REMOTE_SCTP_PORT 0
74 #define DEFAULT_GST_SCTP_ORDERED TRUE
75 #define DEFAULT_SCTP_PPID 1
76 #define DEFAULT_USE_SOCK_STREAM FALSE
77 
78 #define BUFFER_FULL_SLEEP_TIME 100000
79 
80 GType gst_sctp_enc_pad_get_type (void);
81 
82 #define GST_TYPE_SCTP_ENC_PAD (gst_sctp_enc_pad_get_type())
83 #define GST_SCTP_ENC_PAD(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj), GST_TYPE_SCTP_ENC_PAD, GstSctpEncPad))
84 #define GST_SCTP_ENC_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass), GST_TYPE_SCTP_ENC_PAD, GstSctpEncPadClass))
85 #define GST_IS_SCTP_ENC_PAD(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj), GST_TYPE_SCTP_ENC_PAD))
86 #define GST_IS_SCTP_ENC_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass), GST_TYPE_SCTP_ENC_PAD))
87 
88 typedef struct _GstSctpEncPad GstSctpEncPad;
89 typedef GstPadClass GstSctpEncPadClass;
90 
91 struct _GstSctpEncPad
92 {
93   GstPad parent;
94 
95   guint16 stream_id;
96   gboolean ordered;
97   guint32 ppid;
98   GstSctpAssociationPartialReliability reliability;
99   guint32 reliability_param;
100 
101   guint64 bytes_sent;
102 
103   GMutex lock;
104   GCond cond;
105   gboolean flushing;
106 };
107 
108 G_DEFINE_TYPE (GstSctpEncPad, gst_sctp_enc_pad, GST_TYPE_PAD);
109 
110 static void
gst_sctp_enc_pad_finalize(GObject * object)111 gst_sctp_enc_pad_finalize (GObject * object)
112 {
113   GstSctpEncPad *self = GST_SCTP_ENC_PAD (object);
114 
115   g_cond_clear (&self->cond);
116   g_mutex_clear (&self->lock);
117 
118   G_OBJECT_CLASS (gst_sctp_enc_pad_parent_class)->finalize (object);
119 }
120 
121 static void
gst_sctp_enc_pad_class_init(GstSctpEncPadClass * klass)122 gst_sctp_enc_pad_class_init (GstSctpEncPadClass * klass)
123 {
124   GObjectClass *gobject_class = (GObjectClass *) klass;
125 
126   gobject_class->finalize = gst_sctp_enc_pad_finalize;
127 }
128 
129 static void
gst_sctp_enc_pad_init(GstSctpEncPad * self)130 gst_sctp_enc_pad_init (GstSctpEncPad * self)
131 {
132   g_mutex_init (&self->lock);
133   g_cond_init (&self->cond);
134   self->flushing = FALSE;
135 }
136 
137 static void gst_sctp_enc_finalize (GObject * object);
138 static void gst_sctp_enc_set_property (GObject * object, guint prop_id,
139     const GValue * value, GParamSpec * pspec);
140 static void gst_sctp_enc_get_property (GObject * object, guint prop_id,
141     GValue * value, GParamSpec * pspec);
142 static GstStateChangeReturn gst_sctp_enc_change_state (GstElement * element,
143     GstStateChange transition);
144 static GstPad *gst_sctp_enc_request_new_pad (GstElement * element,
145     GstPadTemplate * template, const gchar * name, const GstCaps * caps);
146 static void gst_sctp_enc_release_pad (GstElement * element, GstPad * pad);
147 static void gst_sctp_enc_srcpad_loop (GstPad * pad);
148 static GstFlowReturn gst_sctp_enc_sink_chain (GstPad * pad, GstObject * parent,
149     GstBuffer * buffer);
150 static gboolean gst_sctp_enc_sink_event (GstPad * pad, GstObject * parent,
151     GstEvent * event);
152 static gboolean gst_sctp_enc_src_event (GstPad * pad, GstObject * parent,
153     GstEvent * event);
154 static void on_sctp_association_state_changed (GstSctpAssociation *
155     sctp_association, GParamSpec * pspec, GstSctpEnc * self);
156 
157 static gboolean configure_association (GstSctpEnc * self);
158 static void on_sctp_packet_out (GstSctpAssociation * sctp_association,
159     const guint8 * buf, gsize length, gpointer user_data);
160 static void stop_srcpad_task (GstPad * pad, GstSctpEnc * self);
161 static void sctpenc_cleanup (GstSctpEnc * self);
162 static void get_config_from_caps (const GstCaps * caps, gboolean * ordered,
163     GstSctpAssociationPartialReliability * reliability,
164     guint32 * reliability_param, guint32 * ppid, gboolean * ppid_available);
165 static guint64 on_get_stream_bytes_sent (GstSctpEnc * self, guint stream_id);
166 
167 static void
gst_sctp_enc_class_init(GstSctpEncClass * klass)168 gst_sctp_enc_class_init (GstSctpEncClass * klass)
169 {
170   GObjectClass *gobject_class;
171   GstElementClass *element_class;
172 
173   gobject_class = (GObjectClass *) klass;
174   element_class = (GstElementClass *) klass;
175 
176   GST_DEBUG_CATEGORY_INIT (gst_sctp_enc_debug_category,
177       "sctpenc", 0, "debug category for sctpenc element");
178 
179   gst_element_class_add_pad_template (GST_ELEMENT_CLASS (klass),
180       gst_static_pad_template_get (&src_template));
181   gst_element_class_add_pad_template (GST_ELEMENT_CLASS (klass),
182       gst_static_pad_template_get (&sink_template));
183 
184   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_sctp_enc_finalize);
185   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_sctp_enc_set_property);
186   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_sctp_enc_get_property);
187 
188   element_class->change_state = GST_DEBUG_FUNCPTR (gst_sctp_enc_change_state);
189   element_class->request_new_pad =
190       GST_DEBUG_FUNCPTR (gst_sctp_enc_request_new_pad);
191   element_class->release_pad = GST_DEBUG_FUNCPTR (gst_sctp_enc_release_pad);
192 
193   properties[PROP_GST_SCTP_ASSOCIATION_ID] =
194       g_param_spec_uint ("sctp-association-id",
195       "SCTP Association ID",
196       "Every encoder/decoder pair should have the same, unique, sctp-association-id. "
197       "This value must be set before any pads are requested.",
198       0, G_MAXUINT, DEFAULT_GST_SCTP_ASSOCIATION_ID,
199       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
200 
201   properties[PROP_REMOTE_SCTP_PORT] =
202       g_param_spec_uint ("remote-sctp-port",
203       "Remote SCTP port",
204       "Sctp remote sctp port for the sctp association. The local port is configured via the "
205       "GstSctpDec element.",
206       0, G_MAXUSHORT, DEFAULT_REMOTE_SCTP_PORT,
207       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
208 
209   properties[PROP_USE_SOCK_STREAM] =
210       g_param_spec_boolean ("use-sock-stream",
211       "Use sock-stream",
212       "When set to TRUE, a sequenced, reliable, connection-based connection is used."
213       "When TRUE the partial reliability parameters of the channel are ignored.",
214       DEFAULT_USE_SOCK_STREAM, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
215 
216   g_object_class_install_properties (gobject_class, NUM_PROPERTIES, properties);
217 
218   signals[SIGNAL_SCTP_ASSOCIATION_ESTABLISHED] =
219       g_signal_new ("sctp-association-established",
220       G_TYPE_FROM_CLASS (gobject_class), G_SIGNAL_RUN_LAST,
221       G_STRUCT_OFFSET (GstSctpEncClass, on_sctp_association_is_established),
222       NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_BOOLEAN);
223 
224   signals[SIGNAL_GET_STREAM_BYTES_SENT] = g_signal_new ("bytes-sent",
225       G_TYPE_FROM_CLASS (gobject_class), G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
226       G_STRUCT_OFFSET (GstSctpEncClass, on_get_stream_bytes_sent), NULL, NULL,
227       NULL, G_TYPE_UINT64, 1, G_TYPE_UINT);
228 
229   klass->on_get_stream_bytes_sent =
230       GST_DEBUG_FUNCPTR (on_get_stream_bytes_sent);
231 
232   gst_element_class_set_static_metadata (element_class,
233       "SCTP Encoder",
234       "Encoder/Network/SCTP",
235       "Encodes packets with SCTP",
236       "George Kiagiadakis <george.kiagiadakis@collabora.com>");
237 }
238 
239 static gboolean
data_queue_check_full_cb(GstDataQueue * queue,guint visible,guint bytes,guint64 time,gpointer user_data)240 data_queue_check_full_cb (GstDataQueue * queue, guint visible, guint bytes,
241     guint64 time, gpointer user_data)
242 {
243   /* TODO: When are we considered full? */
244   return FALSE;
245 }
246 
247 static void
data_queue_empty_cb(GstDataQueue * queue,gpointer user_data)248 data_queue_empty_cb (GstDataQueue * queue, gpointer user_data)
249 {
250 }
251 
252 static void
data_queue_full_cb(GstDataQueue * queue,gpointer user_data)253 data_queue_full_cb (GstDataQueue * queue, gpointer user_data)
254 {
255 }
256 
257 static void
gst_sctp_enc_init(GstSctpEnc * self)258 gst_sctp_enc_init (GstSctpEnc * self)
259 {
260   self->sctp_association_id = DEFAULT_GST_SCTP_ASSOCIATION_ID;
261   self->remote_sctp_port = DEFAULT_REMOTE_SCTP_PORT;
262 
263   self->sctp_association = NULL;
264   self->outbound_sctp_packet_queue =
265       gst_data_queue_new (data_queue_check_full_cb, data_queue_full_cb,
266       data_queue_empty_cb, NULL);
267 
268   self->src_pad = gst_pad_new_from_static_template (&src_template, "src");
269   gst_pad_set_event_function (self->src_pad,
270       GST_DEBUG_FUNCPTR ((GstPadEventFunction) gst_sctp_enc_src_event));
271   gst_element_add_pad (GST_ELEMENT (self), self->src_pad);
272 
273   g_queue_init (&self->pending_pads);
274   self->src_ret = GST_FLOW_FLUSHING;
275 }
276 
277 static void
gst_sctp_enc_finalize(GObject * object)278 gst_sctp_enc_finalize (GObject * object)
279 {
280   GstSctpEnc *self = GST_SCTP_ENC (object);
281 
282   g_queue_clear (&self->pending_pads);
283   gst_object_unref (self->outbound_sctp_packet_queue);
284 
285   G_OBJECT_CLASS (parent_class)->finalize (object);
286 }
287 
288 static void
gst_sctp_enc_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)289 gst_sctp_enc_set_property (GObject * object, guint prop_id,
290     const GValue * value, GParamSpec * pspec)
291 {
292   GstSctpEnc *self = GST_SCTP_ENC (object);
293 
294   switch (prop_id) {
295     case PROP_GST_SCTP_ASSOCIATION_ID:
296       self->sctp_association_id = g_value_get_uint (value);
297       break;
298     case PROP_REMOTE_SCTP_PORT:
299       self->remote_sctp_port = g_value_get_uint (value);
300       break;
301     case PROP_USE_SOCK_STREAM:
302       self->use_sock_stream = g_value_get_boolean (value);
303       break;
304     default:
305       G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
306       break;
307   }
308 }
309 
310 static void
gst_sctp_enc_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)311 gst_sctp_enc_get_property (GObject * object, guint prop_id, GValue * value,
312     GParamSpec * pspec)
313 {
314   GstSctpEnc *self = GST_SCTP_ENC (object);
315 
316   switch (prop_id) {
317     case PROP_GST_SCTP_ASSOCIATION_ID:
318       g_value_set_uint (value, self->sctp_association_id);
319       break;
320     case PROP_REMOTE_SCTP_PORT:
321       g_value_set_uint (value, self->remote_sctp_port);
322       break;
323     case PROP_USE_SOCK_STREAM:
324       g_value_set_boolean (value, self->use_sock_stream);
325       break;
326     default:
327       G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
328       break;
329   }
330 }
331 
332 static GstStateChangeReturn
gst_sctp_enc_change_state(GstElement * element,GstStateChange transition)333 gst_sctp_enc_change_state (GstElement * element, GstStateChange transition)
334 {
335   GstSctpEnc *self = GST_SCTP_ENC (element);
336   GstStateChangeReturn ret = GST_STATE_CHANGE_FAILURE;
337   gboolean res = TRUE;
338 
339   switch (transition) {
340     case GST_STATE_CHANGE_NULL_TO_READY:
341       break;
342     case GST_STATE_CHANGE_READY_TO_PAUSED:
343       self->need_segment = self->need_stream_start_caps = TRUE;
344       self->src_ret = GST_FLOW_OK;
345       gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, FALSE);
346       res = configure_association (self);
347       break;
348     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
349       break;
350     case GST_STATE_CHANGE_PAUSED_TO_READY:
351       stop_srcpad_task (self->src_pad, self);
352       self->src_ret = GST_FLOW_FLUSHING;
353       break;
354     case GST_STATE_CHANGE_READY_TO_NULL:
355       break;
356     default:
357       break;
358   }
359 
360   if (res)
361     ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
362 
363   switch (transition) {
364     case GST_STATE_CHANGE_NULL_TO_READY:
365       break;
366     case GST_STATE_CHANGE_READY_TO_PAUSED:
367       gst_pad_start_task (self->src_pad,
368           (GstTaskFunction) gst_sctp_enc_srcpad_loop, self->src_pad, NULL);
369       break;
370     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
371       break;
372     case GST_STATE_CHANGE_PAUSED_TO_READY:
373       sctpenc_cleanup (self);
374       break;
375     case GST_STATE_CHANGE_READY_TO_NULL:
376       break;
377     default:
378       break;
379   }
380 
381   return ret;
382 }
383 
384 static GstPad *
gst_sctp_enc_request_new_pad(GstElement * element,GstPadTemplate * template,const gchar * new_pad_name,const GstCaps * caps)385 gst_sctp_enc_request_new_pad (GstElement * element, GstPadTemplate * template,
386     const gchar * new_pad_name, const GstCaps * caps)
387 {
388   GstSctpEnc *self = GST_SCTP_ENC (element);
389   GstPad *new_pad = NULL;
390   GstSctpEncPad *sctpenc_pad;
391   guint32 stream_id;
392   gint state;
393   guint32 new_ppid;
394   gboolean is_new_ppid;
395 
396   g_object_get (self->sctp_association, "state", &state, NULL);
397 
398   if (state != GST_SCTP_ASSOCIATION_STATE_CONNECTED) {
399     GST_ERROR_OBJECT
400         (self,
401         "The SCTP association must be established before a new stream can be created");
402     goto invalid_state;
403   }
404 
405   if (!template)
406     goto invalid_parameter;
407 
408   /* 65535 is not a valid stream id */
409   if (!new_pad_name || (sscanf (new_pad_name, "sink_%u", &stream_id) != 1)
410       || stream_id > 65534) {
411     GST_ERROR_OBJECT
412         (self, "Invalid sink pad name %s", GST_STR_NULL (new_pad_name));
413     goto invalid_parameter;
414   }
415 
416   new_pad = gst_element_get_static_pad (element, new_pad_name);
417   if (new_pad) {
418     gst_object_unref (new_pad);
419     new_pad = NULL;
420     GST_ERROR_OBJECT (self, "Pad %s already exists", new_pad_name);
421     goto invalid_parameter;
422   }
423 
424   GST_DEBUG_OBJECT (self, "Creating new pad %s", new_pad_name);
425   new_pad =
426       g_object_new (GST_TYPE_SCTP_ENC_PAD, "name", new_pad_name, "direction",
427       template->direction, "template", template, NULL);
428   gst_pad_set_chain_function (new_pad,
429       GST_DEBUG_FUNCPTR (gst_sctp_enc_sink_chain));
430   gst_pad_set_event_function (new_pad,
431       GST_DEBUG_FUNCPTR (gst_sctp_enc_sink_event));
432 
433   sctpenc_pad = GST_SCTP_ENC_PAD (new_pad);
434   sctpenc_pad->stream_id = stream_id;
435   sctpenc_pad->ppid = DEFAULT_SCTP_PPID;
436 
437   if (caps) {
438     GST_DEBUG_OBJECT (self, "Pad %s requested with caps %" GST_PTR_FORMAT,
439         new_pad_name, caps);
440     get_config_from_caps (caps, &sctpenc_pad->ordered,
441         &sctpenc_pad->reliability, &sctpenc_pad->reliability_param, &new_ppid,
442         &is_new_ppid);
443 
444     if (is_new_ppid)
445       sctpenc_pad->ppid = new_ppid;
446   }
447 
448   sctpenc_pad->flushing = FALSE;
449 
450   if (!gst_pad_set_active (new_pad, TRUE))
451     goto error_cleanup;
452 
453   if (!gst_element_add_pad (element, new_pad))
454     goto error_add_pad;
455 
456 invalid_state:
457 invalid_parameter:
458   return new_pad;
459 error_add_pad:
460   gst_pad_set_active (new_pad, FALSE);
461 error_cleanup:
462   gst_object_unref (new_pad);
463   return NULL;
464 }
465 
466 static void
gst_sctp_enc_release_pad(GstElement * element,GstPad * pad)467 gst_sctp_enc_release_pad (GstElement * element, GstPad * pad)
468 {
469   GstSctpEncPad *sctpenc_pad = GST_SCTP_ENC_PAD (pad);
470   GstSctpEnc *self;
471   guint stream_id = 0;
472 
473   self = GST_SCTP_ENC (element);
474 
475   g_mutex_lock (&sctpenc_pad->lock);
476   sctpenc_pad->flushing = TRUE;
477   g_cond_signal (&sctpenc_pad->cond);
478   g_mutex_unlock (&sctpenc_pad->lock);
479 
480   stream_id = sctpenc_pad->stream_id;
481   gst_pad_set_active (pad, FALSE);
482 
483   if (self->sctp_association)
484     gst_sctp_association_reset_stream (self->sctp_association, stream_id);
485 
486   GST_PAD_STREAM_LOCK (pad);
487   if (gst_object_has_as_parent (GST_OBJECT (pad), GST_OBJECT (element)))
488     gst_element_remove_pad (element, pad);
489   GST_PAD_STREAM_UNLOCK (pad);
490 }
491 
492 static void
gst_sctp_enc_srcpad_loop(GstPad * pad)493 gst_sctp_enc_srcpad_loop (GstPad * pad)
494 {
495   GstSctpEnc *self = GST_SCTP_ENC (GST_PAD_PARENT (pad));
496   GstFlowReturn flow_ret;
497   GstDataQueueItem *item;
498 
499   if (self->need_stream_start_caps) {
500     gchar s_id[32];
501     GstCaps *caps;
502 
503     g_snprintf (s_id, sizeof (s_id), "sctpenc-%08x", g_random_int ());
504     gst_pad_push_event (self->src_pad, gst_event_new_stream_start (s_id));
505 
506     caps = gst_caps_new_empty_simple ("application/x-sctp");
507     gst_pad_set_caps (self->src_pad, caps);
508     gst_caps_unref (caps);
509 
510     self->need_stream_start_caps = FALSE;
511   }
512 
513   if (self->need_segment) {
514     GstSegment segment;
515 
516     gst_segment_init (&segment, GST_FORMAT_BYTES);
517     gst_pad_push_event (self->src_pad, gst_event_new_segment (&segment));
518 
519     self->need_segment = FALSE;
520   }
521 
522   if (gst_data_queue_pop (self->outbound_sctp_packet_queue, &item)) {
523     GstBuffer *buffer = GST_BUFFER (item->object);
524 
525     GST_DEBUG_OBJECT (self, "Forwarding buffer %" GST_PTR_FORMAT, buffer);
526 
527     flow_ret = gst_pad_push (self->src_pad, buffer);
528     item->object = NULL;
529 
530     GST_OBJECT_LOCK (self);
531     self->src_ret = flow_ret;
532     GST_OBJECT_UNLOCK (self);
533 
534     if (G_UNLIKELY (flow_ret == GST_FLOW_FLUSHING
535             || flow_ret == GST_FLOW_NOT_LINKED)) {
536       GST_DEBUG_OBJECT (pad, "Push failed on packet source pad. Error: %s",
537           gst_flow_get_name (flow_ret));
538     } else if (G_UNLIKELY (flow_ret != GST_FLOW_OK)) {
539       GST_ERROR_OBJECT (pad, "Push failed on packet source pad. Error: %s",
540           gst_flow_get_name (flow_ret));
541     }
542 
543     if (G_UNLIKELY (flow_ret != GST_FLOW_OK)) {
544       GST_DEBUG_OBJECT (pad, "Pausing task because of an error");
545       gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, TRUE);
546       gst_data_queue_flush (self->outbound_sctp_packet_queue);
547       gst_pad_pause_task (pad);
548     }
549 
550     item->destroy (item);
551   } else {
552     GST_OBJECT_LOCK (self);
553     self->src_ret = GST_FLOW_FLUSHING;
554     GST_OBJECT_UNLOCK (self);
555 
556     GST_DEBUG_OBJECT (pad, "Pausing task because we're flushing");
557     gst_pad_pause_task (pad);
558   }
559 }
560 
561 static GstFlowReturn
gst_sctp_enc_sink_chain(GstPad * pad,GstObject * parent,GstBuffer * buffer)562 gst_sctp_enc_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
563 {
564   GstSctpEnc *self = GST_SCTP_ENC (parent);
565   GstSctpEncPad *sctpenc_pad = GST_SCTP_ENC_PAD (pad);
566   GstMapInfo map;
567   guint32 ppid;
568   gboolean ordered;
569   GstSctpAssociationPartialReliability pr;
570   guint32 pr_param;
571   gpointer state = NULL;
572   GstMeta *meta;
573   const GstMetaInfo *meta_info = GST_SCTP_SEND_META_INFO;
574   GstFlowReturn flow_ret = GST_FLOW_ERROR;
575   const guint8 *data;
576   guint32 length;
577 
578   GST_OBJECT_LOCK (self);
579   if (self->src_ret != GST_FLOW_OK) {
580     GST_ERROR_OBJECT (pad, "Pushing on source pad failed before: %s",
581         gst_flow_get_name (self->src_ret));
582     flow_ret = self->src_ret;
583     GST_OBJECT_UNLOCK (self);
584     gst_buffer_unref (buffer);
585     return flow_ret;
586   }
587   GST_OBJECT_UNLOCK (self);
588 
589   ppid = sctpenc_pad->ppid;
590   ordered = sctpenc_pad->ordered;
591   pr = sctpenc_pad->reliability;
592   pr_param = sctpenc_pad->reliability_param;
593 
594   while ((meta = gst_buffer_iterate_meta (buffer, &state))) {
595     if (meta->info->api == meta_info->api) {
596       GstSctpSendMeta *sctp_send_meta = (GstSctpSendMeta *) meta;
597 
598       ppid = sctp_send_meta->ppid;
599       ordered = sctp_send_meta->ordered;
600       pr_param = sctp_send_meta->pr_param;
601       switch (sctp_send_meta->pr) {
602         case GST_SCTP_SEND_META_PARTIAL_RELIABILITY_NONE:
603           pr = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_NONE;
604           break;
605         case GST_SCTP_SEND_META_PARTIAL_RELIABILITY_RTX:
606           pr = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_RTX;
607           break;
608         case GST_SCTP_SEND_META_PARTIAL_RELIABILITY_BUF:
609           pr = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_BUF;
610           break;
611         case GST_SCTP_SEND_META_PARTIAL_RELIABILITY_TTL:
612           pr = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_TTL;
613           break;
614       }
615       break;
616     }
617   }
618 
619   GST_DEBUG_OBJECT (pad,
620       "Sending buffer %" GST_PTR_FORMAT
621       " with ppid %u ordered %d pr %d pr_param %u", buffer, ppid, ordered, pr,
622       pr_param);
623 
624   if (!gst_buffer_map (buffer, &map, GST_MAP_READ)) {
625     GST_ERROR_OBJECT (pad, "Could not map GstBuffer");
626     goto error;
627   }
628 
629   data = map.data;
630   length = map.size;
631 
632   g_mutex_lock (&sctpenc_pad->lock);
633   while (!sctpenc_pad->flushing) {
634     guint32 bytes_sent;
635 
636     g_mutex_unlock (&sctpenc_pad->lock);
637 
638     flow_ret =
639         gst_sctp_association_send_data (self->sctp_association, data,
640         length, sctpenc_pad->stream_id, ppid, ordered, pr, pr_param,
641         &bytes_sent);
642 
643     g_mutex_lock (&sctpenc_pad->lock);
644     if (flow_ret != GST_FLOW_OK) {
645       if (flow_ret != GST_FLOW_EOS) {
646         GST_ELEMENT_ERROR (self, RESOURCE, WRITE, (NULL),
647             ("Failed to send data"));
648       }
649       goto out;
650     } else if (bytes_sent < length && !sctpenc_pad->flushing) {
651       gint64 end_time = g_get_monotonic_time () + BUFFER_FULL_SLEEP_TIME;
652 
653       GST_TRACE_OBJECT (pad, "Sent only %u of %u remaining bytes, waiting",
654           bytes_sent, length);
655 
656       sctpenc_pad->bytes_sent += bytes_sent;
657       data += bytes_sent;
658       length -= bytes_sent;
659 
660       /* The buffer was probably full. Retry in a while */
661       GST_OBJECT_LOCK (self);
662       g_queue_push_tail (&self->pending_pads, sctpenc_pad);
663       GST_OBJECT_UNLOCK (self);
664 
665       g_cond_wait_until (&sctpenc_pad->cond, &sctpenc_pad->lock, end_time);
666 
667       GST_OBJECT_LOCK (self);
668       g_queue_remove (&self->pending_pads, sctpenc_pad);
669       GST_OBJECT_UNLOCK (self);
670     } else if (bytes_sent == length) {
671       GST_DEBUG_OBJECT (pad, "Successfully sent buffer");
672       sctpenc_pad->bytes_sent += bytes_sent;
673       break;
674     }
675   }
676   flow_ret = sctpenc_pad->flushing ? GST_FLOW_FLUSHING : GST_FLOW_OK;
677 
678 out:
679   g_mutex_unlock (&sctpenc_pad->lock);
680 
681   gst_buffer_unmap (buffer, &map);
682 error:
683   gst_buffer_unref (buffer);
684   return flow_ret;
685 }
686 
687 static gboolean
gst_sctp_enc_sink_event(GstPad * pad,GstObject * parent,GstEvent * event)688 gst_sctp_enc_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
689 {
690   GstSctpEnc *self = GST_SCTP_ENC (parent);
691   GstSctpEncPad *sctpenc_pad = GST_SCTP_ENC_PAD (pad);
692   gboolean ret, is_new_ppid;
693   guint32 new_ppid;
694 
695   switch (GST_EVENT_TYPE (event)) {
696     case GST_EVENT_CAPS:{
697       GstCaps *caps;
698 
699       gst_event_parse_caps (event, &caps);
700       GST_DEBUG_OBJECT (pad, "Received new caps %" GST_PTR_FORMAT, caps);
701       get_config_from_caps (caps, &sctpenc_pad->ordered,
702           &sctpenc_pad->reliability, &sctpenc_pad->reliability_param, &new_ppid,
703           &is_new_ppid);
704       if (is_new_ppid)
705         sctpenc_pad->ppid = new_ppid;
706       gst_event_unref (event);
707       ret = TRUE;
708       break;
709     }
710     case GST_EVENT_STREAM_START:
711     case GST_EVENT_SEGMENT:
712       /* Drop these, we create our own */
713       ret = TRUE;
714       gst_event_unref (event);
715       break;
716     case GST_EVENT_EOS:
717       /* Drop this, we're never EOS until shut down */
718       ret = TRUE;
719       gst_event_unref (event);
720       break;
721     case GST_EVENT_FLUSH_START:
722       g_mutex_lock (&sctpenc_pad->lock);
723       sctpenc_pad->flushing = TRUE;
724       g_cond_signal (&sctpenc_pad->cond);
725       g_mutex_unlock (&sctpenc_pad->lock);
726 
727       ret = gst_pad_event_default (pad, parent, event);
728       break;
729     case GST_EVENT_FLUSH_STOP:
730       sctpenc_pad->flushing = FALSE;
731       GST_OBJECT_LOCK (self);
732       self->src_ret = GST_FLOW_OK;
733       GST_OBJECT_UNLOCK (self);
734       ret = gst_pad_event_default (pad, parent, event);
735       break;
736     default:
737       ret = gst_pad_event_default (pad, parent, event);
738       break;
739   }
740   return ret;
741 }
742 
743 static void
flush_sinkpad(const GValue * item,gpointer user_data)744 flush_sinkpad (const GValue * item, gpointer user_data)
745 {
746   GstSctpEncPad *sctpenc_pad = g_value_get_object (item);
747   gboolean flush = GPOINTER_TO_INT (user_data);
748 
749   if (flush) {
750     g_mutex_lock (&sctpenc_pad->lock);
751     sctpenc_pad->flushing = TRUE;
752     g_cond_signal (&sctpenc_pad->cond);
753     g_mutex_unlock (&sctpenc_pad->lock);
754   } else {
755     sctpenc_pad->flushing = FALSE;
756   }
757 }
758 
759 static gboolean
gst_sctp_enc_src_event(GstPad * pad,GstObject * parent,GstEvent * event)760 gst_sctp_enc_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
761 {
762   GstSctpEnc *self = GST_SCTP_ENC (parent);
763   gboolean ret;
764 
765   switch (GST_EVENT_TYPE (event)) {
766     case GST_EVENT_FLUSH_START:{
767       GstIterator *it;
768 
769       gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, TRUE);
770       gst_data_queue_flush (self->outbound_sctp_packet_queue);
771 
772       it = gst_element_iterate_sink_pads (GST_ELEMENT (self));
773       while (gst_iterator_foreach (it, flush_sinkpad,
774               GINT_TO_POINTER (TRUE)) == GST_ITERATOR_RESYNC)
775         gst_iterator_resync (it);
776       gst_iterator_free (it);
777 
778       ret = gst_pad_event_default (pad, parent, event);
779       break;
780     }
781     case GST_EVENT_RECONFIGURE:
782     case GST_EVENT_FLUSH_STOP:{
783       GstIterator *it;
784 
785       it = gst_element_iterate_sink_pads (GST_ELEMENT (self));
786       while (gst_iterator_foreach (it, flush_sinkpad,
787               GINT_TO_POINTER (FALSE)) == GST_ITERATOR_RESYNC)
788         gst_iterator_resync (it);
789       gst_iterator_free (it);
790 
791       gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, FALSE);
792       self->need_segment = TRUE;
793       GST_OBJECT_LOCK (self);
794       self->src_ret = GST_FLOW_OK;
795       GST_OBJECT_UNLOCK (self);
796       gst_pad_start_task (self->src_pad,
797           (GstTaskFunction) gst_sctp_enc_srcpad_loop, self->src_pad, NULL);
798 
799       ret = gst_pad_event_default (pad, parent, event);
800       break;
801     }
802     default:
803       ret = gst_pad_event_default (pad, parent, event);
804       break;
805   }
806   return ret;
807 }
808 
809 static gboolean
configure_association(GstSctpEnc * self)810 configure_association (GstSctpEnc * self)
811 {
812   gint state;
813 
814   self->sctp_association = gst_sctp_association_get (self->sctp_association_id);
815 
816   g_object_get (self->sctp_association, "state", &state, NULL);
817 
818   if (state != GST_SCTP_ASSOCIATION_STATE_NEW) {
819     GST_WARNING_OBJECT (self,
820         "Could not configure SCTP association. Association already in use!");
821     g_object_unref (self->sctp_association);
822     self->sctp_association = NULL;
823     goto error;
824   }
825 
826   self->signal_handler_state_changed =
827       g_signal_connect_object (self->sctp_association, "notify::state",
828       G_CALLBACK (on_sctp_association_state_changed), self, 0);
829 
830   g_object_bind_property (self, "remote-sctp-port", self->sctp_association,
831       "remote-port", G_BINDING_SYNC_CREATE);
832 
833   g_object_bind_property (self, "use-sock-stream", self->sctp_association,
834       "use-sock-stream", G_BINDING_SYNC_CREATE);
835 
836   gst_sctp_association_set_on_packet_out (self->sctp_association,
837       on_sctp_packet_out, gst_object_ref (self), gst_object_unref);
838 
839   return TRUE;
840 error:
841   return FALSE;
842 }
843 
844 static void
on_sctp_association_state_changed(GstSctpAssociation * sctp_association,GParamSpec * pspec,GstSctpEnc * self)845 on_sctp_association_state_changed (GstSctpAssociation * sctp_association,
846     GParamSpec * pspec, GstSctpEnc * self)
847 {
848   gint state;
849 
850   g_object_get (sctp_association, "state", &state, NULL);
851 
852   GST_DEBUG_OBJECT (self, "Association state changed to %d", state);
853 
854   switch (state) {
855     case GST_SCTP_ASSOCIATION_STATE_NEW:
856       break;
857     case GST_SCTP_ASSOCIATION_STATE_READY:
858       gst_sctp_association_start (sctp_association);
859       break;
860     case GST_SCTP_ASSOCIATION_STATE_CONNECTING:
861       break;
862     case GST_SCTP_ASSOCIATION_STATE_CONNECTED:
863       g_signal_emit (self, signals[SIGNAL_SCTP_ASSOCIATION_ESTABLISHED], 0,
864           TRUE);
865       break;
866     case GST_SCTP_ASSOCIATION_STATE_DISCONNECTING:
867     case GST_SCTP_ASSOCIATION_STATE_DISCONNECTED:
868       g_signal_emit (self, signals[SIGNAL_SCTP_ASSOCIATION_ESTABLISHED], 0,
869           FALSE);
870       break;
871     case GST_SCTP_ASSOCIATION_STATE_ERROR:
872       GST_ELEMENT_ERROR (self, RESOURCE, WRITE, (NULL),
873           ("SCTP association went into error state"));
874       break;
875   }
876 }
877 
878 static void
data_queue_item_free(GstDataQueueItem * item)879 data_queue_item_free (GstDataQueueItem * item)
880 {
881   if (item->object)
882     gst_mini_object_unref (item->object);
883   g_free (item);
884 }
885 
886 static void
on_sctp_packet_out(GstSctpAssociation * _association,const guint8 * buf,gsize length,gpointer user_data)887 on_sctp_packet_out (GstSctpAssociation * _association, const guint8 * buf,
888     gsize length, gpointer user_data)
889 {
890   GstSctpEnc *self = user_data;
891   GstBuffer *gstbuf;
892   GstDataQueueItem *item;
893   GList *pending_pads, *l;
894   GstSctpEncPad *sctpenc_pad;
895 
896   GST_DEBUG_OBJECT (self, "Received output packet of size %" G_GSIZE_FORMAT,
897       length);
898 
899   gstbuf = gst_buffer_new_memdup (buf, length);
900 
901   item = g_new0 (GstDataQueueItem, 1);
902   item->object = GST_MINI_OBJECT (gstbuf);
903   item->size = length;
904   item->visible = TRUE;
905   item->destroy = (GDestroyNotify) data_queue_item_free;
906 
907   if (!gst_data_queue_push (self->outbound_sctp_packet_queue, item)) {
908     item->destroy (item);
909     GST_DEBUG_OBJECT (self, "Failed to push item because we're flushing");
910   }
911 
912   /* Wake up pads in the order they waited, oldest pad first */
913   GST_OBJECT_LOCK (self);
914   pending_pads = NULL;
915   while ((sctpenc_pad = g_queue_pop_tail (&self->pending_pads))) {
916     pending_pads = g_list_prepend (pending_pads, sctpenc_pad);
917   }
918   GST_OBJECT_UNLOCK (self);
919 
920   for (l = pending_pads; l; l = l->next) {
921     sctpenc_pad = l->data;
922     g_mutex_lock (&sctpenc_pad->lock);
923     g_cond_signal (&sctpenc_pad->cond);
924     g_mutex_unlock (&sctpenc_pad->lock);
925   }
926   g_list_free (pending_pads);
927 }
928 
929 static void
stop_srcpad_task(GstPad * pad,GstSctpEnc * self)930 stop_srcpad_task (GstPad * pad, GstSctpEnc * self)
931 {
932   gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, TRUE);
933   gst_data_queue_flush (self->outbound_sctp_packet_queue);
934   gst_pad_stop_task (pad);
935 }
936 
937 static void
remove_sinkpad(const GValue * item,gpointer user_data)938 remove_sinkpad (const GValue * item, gpointer user_data)
939 {
940   GstSctpEncPad *sctpenc_pad = g_value_get_object (item);
941   GstSctpEnc *self = user_data;
942 
943   gst_sctp_enc_release_pad (GST_ELEMENT (self), GST_PAD (sctpenc_pad));
944 }
945 
946 static void
sctpenc_cleanup(GstSctpEnc * self)947 sctpenc_cleanup (GstSctpEnc * self)
948 {
949   GstIterator *it;
950 
951   gst_sctp_association_set_on_packet_out (self->sctp_association, NULL, NULL,
952       NULL);
953 
954   g_signal_handler_disconnect (self->sctp_association,
955       self->signal_handler_state_changed);
956   stop_srcpad_task (self->src_pad, self);
957   gst_sctp_association_force_close (self->sctp_association);
958   g_object_unref (self->sctp_association);
959   self->sctp_association = NULL;
960 
961   it = gst_element_iterate_sink_pads (GST_ELEMENT (self));
962   while (gst_iterator_foreach (it, remove_sinkpad, self) == GST_ITERATOR_RESYNC)
963     gst_iterator_resync (it);
964   gst_iterator_free (it);
965   g_queue_clear (&self->pending_pads);
966 }
967 
968 static void
get_config_from_caps(const GstCaps * caps,gboolean * ordered,GstSctpAssociationPartialReliability * reliability,guint32 * reliability_param,guint32 * ppid,gboolean * ppid_available)969 get_config_from_caps (const GstCaps * caps, gboolean * ordered,
970     GstSctpAssociationPartialReliability * reliability,
971     guint32 * reliability_param, guint32 * ppid, gboolean * ppid_available)
972 {
973   GstStructure *s;
974   guint i, n;
975 
976   *ordered = TRUE;
977   *reliability = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_NONE;
978   *reliability_param = 0;
979   *ppid_available = FALSE;
980 
981   n = gst_caps_get_size (caps);
982   for (i = 0; i < n; i++) {
983     s = gst_caps_get_structure (caps, i);
984     if (gst_structure_has_field (s, "ordered")) {
985       const GValue *v = gst_structure_get_value (s, "ordered");
986       *ordered = g_value_get_boolean (v);
987     }
988     if (gst_structure_has_field (s, "partially-reliability")) {
989       const GValue *v = gst_structure_get_value (s, "partially-reliability");
990       const gchar *reliability_string = g_value_get_string (v);
991 
992       if (!g_strcmp0 (reliability_string, "none"))
993         *reliability = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_NONE;
994       else if (!g_strcmp0 (reliability_string, "ttl"))
995         *reliability = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_TTL;
996       else if (!g_strcmp0 (reliability_string, "buf"))
997         *reliability = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_BUF;
998       else if (!g_strcmp0 (reliability_string, "rtx"))
999         *reliability = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_RTX;
1000     }
1001     if (gst_structure_has_field (s, "reliability-parameter")) {
1002       const GValue *v = gst_structure_get_value (s, "reliability-parameter");
1003       *reliability_param = g_value_get_uint (v);
1004     }
1005     if (gst_structure_has_field (s, "ppid")) {
1006       const GValue *v = gst_structure_get_value (s, "ppid");
1007       *ppid = g_value_get_uint (v);
1008       *ppid_available = TRUE;
1009     }
1010   }
1011 }
1012 
1013 static guint64
on_get_stream_bytes_sent(GstSctpEnc * self,guint stream_id)1014 on_get_stream_bytes_sent (GstSctpEnc * self, guint stream_id)
1015 {
1016   gchar *pad_name;
1017   GstPad *pad;
1018   GstSctpEncPad *sctpenc_pad;
1019   guint64 bytes_sent;
1020 
1021   pad_name = g_strdup_printf ("sink_%u", stream_id);
1022   pad = gst_element_get_static_pad (GST_ELEMENT (self), pad_name);
1023   g_free (pad_name);
1024 
1025   if (!pad) {
1026     GST_DEBUG_OBJECT (self,
1027         "Buffered amount requested on a stream that does not exist!");
1028     return 0;
1029   }
1030 
1031   sctpenc_pad = GST_SCTP_ENC_PAD (pad);
1032 
1033   g_mutex_lock (&sctpenc_pad->lock);
1034   bytes_sent = sctpenc_pad->bytes_sent;
1035   g_mutex_unlock (&sctpenc_pad->lock);
1036 
1037   gst_object_unref (sctpenc_pad);
1038 
1039   return bytes_sent;
1040 }
1041