1 /*
2 * Copyright (c) 2015, Collabora Ltd.
3 *
4 * Redistribution and use in source and binary forms, with or without modification,
5 * are permitted provided that the following conditions are met:
6 *
7 * 1. Redistributions of source code must retain the above copyright notice, this
8 * list of conditions and the following disclaimer.
9 *
10 * 2. Redistributions in binary form must reproduce the above copyright notice, this
11 * list of conditions and the following disclaimer in the documentation and/or other
12 * materials provided with the distribution.
13 *
14 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
16 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
17 * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
18 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
19 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
20 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
21 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
22 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
23 * OF SUCH DAMAGE.
24 */
25
26 #ifdef HAVE_CONFIG_H
27 #include "config.h"
28 #endif
29 #include "gstsctpenc.h"
30
31 #include <gst/sctp/sctpsendmeta.h>
32 #include <stdio.h>
33
34 GST_DEBUG_CATEGORY_STATIC (gst_sctp_enc_debug_category);
35 #define GST_CAT_DEFAULT gst_sctp_enc_debug_category
36
37 #define gst_sctp_enc_parent_class parent_class
38 G_DEFINE_TYPE (GstSctpEnc, gst_sctp_enc, GST_TYPE_ELEMENT);
39 GST_ELEMENT_REGISTER_DEFINE (sctpenc, "sctpenc", GST_RANK_NONE,
40 GST_TYPE_SCTP_ENC);
41
42 static GstStaticPadTemplate sink_template =
43 GST_STATIC_PAD_TEMPLATE ("sink_%u", GST_PAD_SINK,
44 GST_PAD_REQUEST, GST_STATIC_CAPS_ANY);
45
46 static GstStaticPadTemplate src_template =
47 GST_STATIC_PAD_TEMPLATE ("src", GST_PAD_SRC,
48 GST_PAD_ALWAYS, GST_STATIC_CAPS ("application/x-sctp"));
49
50 enum
51 {
52 SIGNAL_SCTP_ASSOCIATION_ESTABLISHED,
53 SIGNAL_GET_STREAM_BYTES_SENT,
54 NUM_SIGNALS
55 };
56
57 static guint signals[NUM_SIGNALS];
58
59 enum
60 {
61 PROP_0,
62
63 PROP_GST_SCTP_ASSOCIATION_ID,
64 PROP_REMOTE_SCTP_PORT,
65 PROP_USE_SOCK_STREAM,
66
67 NUM_PROPERTIES
68 };
69
70 static GParamSpec *properties[NUM_PROPERTIES];
71
/* Default values for the element properties and per-pad settings. */
#define DEFAULT_GST_SCTP_ASSOCIATION_ID 1
#define DEFAULT_REMOTE_SCTP_PORT 0
/* NOTE(review): not referenced in this file as shown - confirm it is needed */
#define DEFAULT_GST_SCTP_ORDERED TRUE
#define DEFAULT_SCTP_PPID 1
#define DEFAULT_USE_SOCK_STREAM FALSE

/* Upper bound, in microseconds (added to g_get_monotonic_time()), on how long
 * the chain function waits before retrying a partial send. */
#define BUFFER_FULL_SLEEP_TIME 100000
79
80 GType gst_sctp_enc_pad_get_type (void);
81
82 #define GST_TYPE_SCTP_ENC_PAD (gst_sctp_enc_pad_get_type())
83 #define GST_SCTP_ENC_PAD(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj), GST_TYPE_SCTP_ENC_PAD, GstSctpEncPad))
84 #define GST_SCTP_ENC_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass), GST_TYPE_SCTP_ENC_PAD, GstSctpEncPadClass))
85 #define GST_IS_SCTP_ENC_PAD(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj), GST_TYPE_SCTP_ENC_PAD))
86 #define GST_IS_SCTP_ENC_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass), GST_TYPE_SCTP_ENC_PAD))
87
88 typedef struct _GstSctpEncPad GstSctpEncPad;
89 typedef GstPadClass GstSctpEncPadClass;
90
91 struct _GstSctpEncPad
92 {
93 GstPad parent;
94
95 guint16 stream_id;
96 gboolean ordered;
97 guint32 ppid;
98 GstSctpAssociationPartialReliability reliability;
99 guint32 reliability_param;
100
101 guint64 bytes_sent;
102
103 GMutex lock;
104 GCond cond;
105 gboolean flushing;
106 };
107
108 G_DEFINE_TYPE (GstSctpEncPad, gst_sctp_enc_pad, GST_TYPE_PAD);
109
110 static void
gst_sctp_enc_pad_finalize(GObject * object)111 gst_sctp_enc_pad_finalize (GObject * object)
112 {
113 GstSctpEncPad *self = GST_SCTP_ENC_PAD (object);
114
115 g_cond_clear (&self->cond);
116 g_mutex_clear (&self->lock);
117
118 G_OBJECT_CLASS (gst_sctp_enc_pad_parent_class)->finalize (object);
119 }
120
121 static void
gst_sctp_enc_pad_class_init(GstSctpEncPadClass * klass)122 gst_sctp_enc_pad_class_init (GstSctpEncPadClass * klass)
123 {
124 GObjectClass *gobject_class = (GObjectClass *) klass;
125
126 gobject_class->finalize = gst_sctp_enc_pad_finalize;
127 }
128
129 static void
gst_sctp_enc_pad_init(GstSctpEncPad * self)130 gst_sctp_enc_pad_init (GstSctpEncPad * self)
131 {
132 g_mutex_init (&self->lock);
133 g_cond_init (&self->cond);
134 self->flushing = FALSE;
135 }
136
137 static void gst_sctp_enc_finalize (GObject * object);
138 static void gst_sctp_enc_set_property (GObject * object, guint prop_id,
139 const GValue * value, GParamSpec * pspec);
140 static void gst_sctp_enc_get_property (GObject * object, guint prop_id,
141 GValue * value, GParamSpec * pspec);
142 static GstStateChangeReturn gst_sctp_enc_change_state (GstElement * element,
143 GstStateChange transition);
144 static GstPad *gst_sctp_enc_request_new_pad (GstElement * element,
145 GstPadTemplate * template, const gchar * name, const GstCaps * caps);
146 static void gst_sctp_enc_release_pad (GstElement * element, GstPad * pad);
147 static void gst_sctp_enc_srcpad_loop (GstPad * pad);
148 static GstFlowReturn gst_sctp_enc_sink_chain (GstPad * pad, GstObject * parent,
149 GstBuffer * buffer);
150 static gboolean gst_sctp_enc_sink_event (GstPad * pad, GstObject * parent,
151 GstEvent * event);
152 static gboolean gst_sctp_enc_src_event (GstPad * pad, GstObject * parent,
153 GstEvent * event);
154 static void on_sctp_association_state_changed (GstSctpAssociation *
155 sctp_association, GParamSpec * pspec, GstSctpEnc * self);
156
157 static gboolean configure_association (GstSctpEnc * self);
158 static void on_sctp_packet_out (GstSctpAssociation * sctp_association,
159 const guint8 * buf, gsize length, gpointer user_data);
160 static void stop_srcpad_task (GstPad * pad, GstSctpEnc * self);
161 static void sctpenc_cleanup (GstSctpEnc * self);
162 static void get_config_from_caps (const GstCaps * caps, gboolean * ordered,
163 GstSctpAssociationPartialReliability * reliability,
164 guint32 * reliability_param, guint32 * ppid, gboolean * ppid_available);
165 static guint64 on_get_stream_bytes_sent (GstSctpEnc * self, guint stream_id);
166
167 static void
gst_sctp_enc_class_init(GstSctpEncClass * klass)168 gst_sctp_enc_class_init (GstSctpEncClass * klass)
169 {
170 GObjectClass *gobject_class;
171 GstElementClass *element_class;
172
173 gobject_class = (GObjectClass *) klass;
174 element_class = (GstElementClass *) klass;
175
176 GST_DEBUG_CATEGORY_INIT (gst_sctp_enc_debug_category,
177 "sctpenc", 0, "debug category for sctpenc element");
178
179 gst_element_class_add_pad_template (GST_ELEMENT_CLASS (klass),
180 gst_static_pad_template_get (&src_template));
181 gst_element_class_add_pad_template (GST_ELEMENT_CLASS (klass),
182 gst_static_pad_template_get (&sink_template));
183
184 gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_sctp_enc_finalize);
185 gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_sctp_enc_set_property);
186 gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_sctp_enc_get_property);
187
188 element_class->change_state = GST_DEBUG_FUNCPTR (gst_sctp_enc_change_state);
189 element_class->request_new_pad =
190 GST_DEBUG_FUNCPTR (gst_sctp_enc_request_new_pad);
191 element_class->release_pad = GST_DEBUG_FUNCPTR (gst_sctp_enc_release_pad);
192
193 properties[PROP_GST_SCTP_ASSOCIATION_ID] =
194 g_param_spec_uint ("sctp-association-id",
195 "SCTP Association ID",
196 "Every encoder/decoder pair should have the same, unique, sctp-association-id. "
197 "This value must be set before any pads are requested.",
198 0, G_MAXUINT, DEFAULT_GST_SCTP_ASSOCIATION_ID,
199 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
200
201 properties[PROP_REMOTE_SCTP_PORT] =
202 g_param_spec_uint ("remote-sctp-port",
203 "Remote SCTP port",
204 "Sctp remote sctp port for the sctp association. The local port is configured via the "
205 "GstSctpDec element.",
206 0, G_MAXUSHORT, DEFAULT_REMOTE_SCTP_PORT,
207 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
208
209 properties[PROP_USE_SOCK_STREAM] =
210 g_param_spec_boolean ("use-sock-stream",
211 "Use sock-stream",
212 "When set to TRUE, a sequenced, reliable, connection-based connection is used."
213 "When TRUE the partial reliability parameters of the channel are ignored.",
214 DEFAULT_USE_SOCK_STREAM, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
215
216 g_object_class_install_properties (gobject_class, NUM_PROPERTIES, properties);
217
218 signals[SIGNAL_SCTP_ASSOCIATION_ESTABLISHED] =
219 g_signal_new ("sctp-association-established",
220 G_TYPE_FROM_CLASS (gobject_class), G_SIGNAL_RUN_LAST,
221 G_STRUCT_OFFSET (GstSctpEncClass, on_sctp_association_is_established),
222 NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_BOOLEAN);
223
224 signals[SIGNAL_GET_STREAM_BYTES_SENT] = g_signal_new ("bytes-sent",
225 G_TYPE_FROM_CLASS (gobject_class), G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
226 G_STRUCT_OFFSET (GstSctpEncClass, on_get_stream_bytes_sent), NULL, NULL,
227 NULL, G_TYPE_UINT64, 1, G_TYPE_UINT);
228
229 klass->on_get_stream_bytes_sent =
230 GST_DEBUG_FUNCPTR (on_get_stream_bytes_sent);
231
232 gst_element_class_set_static_metadata (element_class,
233 "SCTP Encoder",
234 "Encoder/Network/SCTP",
235 "Encodes packets with SCTP",
236 "George Kiagiadakis <george.kiagiadakis@collabora.com>");
237 }
238
239 static gboolean
data_queue_check_full_cb(GstDataQueue * queue,guint visible,guint bytes,guint64 time,gpointer user_data)240 data_queue_check_full_cb (GstDataQueue * queue, guint visible, guint bytes,
241 guint64 time, gpointer user_data)
242 {
243 /* TODO: When are we considered full? */
244 return FALSE;
245 }
246
247 static void
data_queue_empty_cb(GstDataQueue * queue,gpointer user_data)248 data_queue_empty_cb (GstDataQueue * queue, gpointer user_data)
249 {
250 }
251
252 static void
data_queue_full_cb(GstDataQueue * queue,gpointer user_data)253 data_queue_full_cb (GstDataQueue * queue, gpointer user_data)
254 {
255 }
256
257 static void
gst_sctp_enc_init(GstSctpEnc * self)258 gst_sctp_enc_init (GstSctpEnc * self)
259 {
260 self->sctp_association_id = DEFAULT_GST_SCTP_ASSOCIATION_ID;
261 self->remote_sctp_port = DEFAULT_REMOTE_SCTP_PORT;
262
263 self->sctp_association = NULL;
264 self->outbound_sctp_packet_queue =
265 gst_data_queue_new (data_queue_check_full_cb, data_queue_full_cb,
266 data_queue_empty_cb, NULL);
267
268 self->src_pad = gst_pad_new_from_static_template (&src_template, "src");
269 gst_pad_set_event_function (self->src_pad,
270 GST_DEBUG_FUNCPTR ((GstPadEventFunction) gst_sctp_enc_src_event));
271 gst_element_add_pad (GST_ELEMENT (self), self->src_pad);
272
273 g_queue_init (&self->pending_pads);
274 self->src_ret = GST_FLOW_FLUSHING;
275 }
276
277 static void
gst_sctp_enc_finalize(GObject * object)278 gst_sctp_enc_finalize (GObject * object)
279 {
280 GstSctpEnc *self = GST_SCTP_ENC (object);
281
282 g_queue_clear (&self->pending_pads);
283 gst_object_unref (self->outbound_sctp_packet_queue);
284
285 G_OBJECT_CLASS (parent_class)->finalize (object);
286 }
287
288 static void
gst_sctp_enc_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)289 gst_sctp_enc_set_property (GObject * object, guint prop_id,
290 const GValue * value, GParamSpec * pspec)
291 {
292 GstSctpEnc *self = GST_SCTP_ENC (object);
293
294 switch (prop_id) {
295 case PROP_GST_SCTP_ASSOCIATION_ID:
296 self->sctp_association_id = g_value_get_uint (value);
297 break;
298 case PROP_REMOTE_SCTP_PORT:
299 self->remote_sctp_port = g_value_get_uint (value);
300 break;
301 case PROP_USE_SOCK_STREAM:
302 self->use_sock_stream = g_value_get_boolean (value);
303 break;
304 default:
305 G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
306 break;
307 }
308 }
309
310 static void
gst_sctp_enc_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)311 gst_sctp_enc_get_property (GObject * object, guint prop_id, GValue * value,
312 GParamSpec * pspec)
313 {
314 GstSctpEnc *self = GST_SCTP_ENC (object);
315
316 switch (prop_id) {
317 case PROP_GST_SCTP_ASSOCIATION_ID:
318 g_value_set_uint (value, self->sctp_association_id);
319 break;
320 case PROP_REMOTE_SCTP_PORT:
321 g_value_set_uint (value, self->remote_sctp_port);
322 break;
323 case PROP_USE_SOCK_STREAM:
324 g_value_set_boolean (value, self->use_sock_stream);
325 break;
326 default:
327 G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
328 break;
329 }
330 }
331
332 static GstStateChangeReturn
gst_sctp_enc_change_state(GstElement * element,GstStateChange transition)333 gst_sctp_enc_change_state (GstElement * element, GstStateChange transition)
334 {
335 GstSctpEnc *self = GST_SCTP_ENC (element);
336 GstStateChangeReturn ret = GST_STATE_CHANGE_FAILURE;
337 gboolean res = TRUE;
338
339 switch (transition) {
340 case GST_STATE_CHANGE_NULL_TO_READY:
341 break;
342 case GST_STATE_CHANGE_READY_TO_PAUSED:
343 self->need_segment = self->need_stream_start_caps = TRUE;
344 self->src_ret = GST_FLOW_OK;
345 gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, FALSE);
346 res = configure_association (self);
347 break;
348 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
349 break;
350 case GST_STATE_CHANGE_PAUSED_TO_READY:
351 stop_srcpad_task (self->src_pad, self);
352 self->src_ret = GST_FLOW_FLUSHING;
353 break;
354 case GST_STATE_CHANGE_READY_TO_NULL:
355 break;
356 default:
357 break;
358 }
359
360 if (res)
361 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
362
363 switch (transition) {
364 case GST_STATE_CHANGE_NULL_TO_READY:
365 break;
366 case GST_STATE_CHANGE_READY_TO_PAUSED:
367 gst_pad_start_task (self->src_pad,
368 (GstTaskFunction) gst_sctp_enc_srcpad_loop, self->src_pad, NULL);
369 break;
370 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
371 break;
372 case GST_STATE_CHANGE_PAUSED_TO_READY:
373 sctpenc_cleanup (self);
374 break;
375 case GST_STATE_CHANGE_READY_TO_NULL:
376 break;
377 default:
378 break;
379 }
380
381 return ret;
382 }
383
384 static GstPad *
gst_sctp_enc_request_new_pad(GstElement * element,GstPadTemplate * template,const gchar * new_pad_name,const GstCaps * caps)385 gst_sctp_enc_request_new_pad (GstElement * element, GstPadTemplate * template,
386 const gchar * new_pad_name, const GstCaps * caps)
387 {
388 GstSctpEnc *self = GST_SCTP_ENC (element);
389 GstPad *new_pad = NULL;
390 GstSctpEncPad *sctpenc_pad;
391 guint32 stream_id;
392 gint state;
393 guint32 new_ppid;
394 gboolean is_new_ppid;
395
396 g_object_get (self->sctp_association, "state", &state, NULL);
397
398 if (state != GST_SCTP_ASSOCIATION_STATE_CONNECTED) {
399 GST_ERROR_OBJECT
400 (self,
401 "The SCTP association must be established before a new stream can be created");
402 goto invalid_state;
403 }
404
405 if (!template)
406 goto invalid_parameter;
407
408 /* 65535 is not a valid stream id */
409 if (!new_pad_name || (sscanf (new_pad_name, "sink_%u", &stream_id) != 1)
410 || stream_id > 65534) {
411 GST_ERROR_OBJECT
412 (self, "Invalid sink pad name %s", GST_STR_NULL (new_pad_name));
413 goto invalid_parameter;
414 }
415
416 new_pad = gst_element_get_static_pad (element, new_pad_name);
417 if (new_pad) {
418 gst_object_unref (new_pad);
419 new_pad = NULL;
420 GST_ERROR_OBJECT (self, "Pad %s already exists", new_pad_name);
421 goto invalid_parameter;
422 }
423
424 GST_DEBUG_OBJECT (self, "Creating new pad %s", new_pad_name);
425 new_pad =
426 g_object_new (GST_TYPE_SCTP_ENC_PAD, "name", new_pad_name, "direction",
427 template->direction, "template", template, NULL);
428 gst_pad_set_chain_function (new_pad,
429 GST_DEBUG_FUNCPTR (gst_sctp_enc_sink_chain));
430 gst_pad_set_event_function (new_pad,
431 GST_DEBUG_FUNCPTR (gst_sctp_enc_sink_event));
432
433 sctpenc_pad = GST_SCTP_ENC_PAD (new_pad);
434 sctpenc_pad->stream_id = stream_id;
435 sctpenc_pad->ppid = DEFAULT_SCTP_PPID;
436
437 if (caps) {
438 GST_DEBUG_OBJECT (self, "Pad %s requested with caps %" GST_PTR_FORMAT,
439 new_pad_name, caps);
440 get_config_from_caps (caps, &sctpenc_pad->ordered,
441 &sctpenc_pad->reliability, &sctpenc_pad->reliability_param, &new_ppid,
442 &is_new_ppid);
443
444 if (is_new_ppid)
445 sctpenc_pad->ppid = new_ppid;
446 }
447
448 sctpenc_pad->flushing = FALSE;
449
450 if (!gst_pad_set_active (new_pad, TRUE))
451 goto error_cleanup;
452
453 if (!gst_element_add_pad (element, new_pad))
454 goto error_add_pad;
455
456 invalid_state:
457 invalid_parameter:
458 return new_pad;
459 error_add_pad:
460 gst_pad_set_active (new_pad, FALSE);
461 error_cleanup:
462 gst_object_unref (new_pad);
463 return NULL;
464 }
465
466 static void
gst_sctp_enc_release_pad(GstElement * element,GstPad * pad)467 gst_sctp_enc_release_pad (GstElement * element, GstPad * pad)
468 {
469 GstSctpEncPad *sctpenc_pad = GST_SCTP_ENC_PAD (pad);
470 GstSctpEnc *self;
471 guint stream_id = 0;
472
473 self = GST_SCTP_ENC (element);
474
475 g_mutex_lock (&sctpenc_pad->lock);
476 sctpenc_pad->flushing = TRUE;
477 g_cond_signal (&sctpenc_pad->cond);
478 g_mutex_unlock (&sctpenc_pad->lock);
479
480 stream_id = sctpenc_pad->stream_id;
481 gst_pad_set_active (pad, FALSE);
482
483 if (self->sctp_association)
484 gst_sctp_association_reset_stream (self->sctp_association, stream_id);
485
486 GST_PAD_STREAM_LOCK (pad);
487 if (gst_object_has_as_parent (GST_OBJECT (pad), GST_OBJECT (element)))
488 gst_element_remove_pad (element, pad);
489 GST_PAD_STREAM_UNLOCK (pad);
490 }
491
492 static void
gst_sctp_enc_srcpad_loop(GstPad * pad)493 gst_sctp_enc_srcpad_loop (GstPad * pad)
494 {
495 GstSctpEnc *self = GST_SCTP_ENC (GST_PAD_PARENT (pad));
496 GstFlowReturn flow_ret;
497 GstDataQueueItem *item;
498
499 if (self->need_stream_start_caps) {
500 gchar s_id[32];
501 GstCaps *caps;
502
503 g_snprintf (s_id, sizeof (s_id), "sctpenc-%08x", g_random_int ());
504 gst_pad_push_event (self->src_pad, gst_event_new_stream_start (s_id));
505
506 caps = gst_caps_new_empty_simple ("application/x-sctp");
507 gst_pad_set_caps (self->src_pad, caps);
508 gst_caps_unref (caps);
509
510 self->need_stream_start_caps = FALSE;
511 }
512
513 if (self->need_segment) {
514 GstSegment segment;
515
516 gst_segment_init (&segment, GST_FORMAT_BYTES);
517 gst_pad_push_event (self->src_pad, gst_event_new_segment (&segment));
518
519 self->need_segment = FALSE;
520 }
521
522 if (gst_data_queue_pop (self->outbound_sctp_packet_queue, &item)) {
523 GstBuffer *buffer = GST_BUFFER (item->object);
524
525 GST_DEBUG_OBJECT (self, "Forwarding buffer %" GST_PTR_FORMAT, buffer);
526
527 flow_ret = gst_pad_push (self->src_pad, buffer);
528 item->object = NULL;
529
530 GST_OBJECT_LOCK (self);
531 self->src_ret = flow_ret;
532 GST_OBJECT_UNLOCK (self);
533
534 if (G_UNLIKELY (flow_ret == GST_FLOW_FLUSHING
535 || flow_ret == GST_FLOW_NOT_LINKED)) {
536 GST_DEBUG_OBJECT (pad, "Push failed on packet source pad. Error: %s",
537 gst_flow_get_name (flow_ret));
538 } else if (G_UNLIKELY (flow_ret != GST_FLOW_OK)) {
539 GST_ERROR_OBJECT (pad, "Push failed on packet source pad. Error: %s",
540 gst_flow_get_name (flow_ret));
541 }
542
543 if (G_UNLIKELY (flow_ret != GST_FLOW_OK)) {
544 GST_DEBUG_OBJECT (pad, "Pausing task because of an error");
545 gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, TRUE);
546 gst_data_queue_flush (self->outbound_sctp_packet_queue);
547 gst_pad_pause_task (pad);
548 }
549
550 item->destroy (item);
551 } else {
552 GST_OBJECT_LOCK (self);
553 self->src_ret = GST_FLOW_FLUSHING;
554 GST_OBJECT_UNLOCK (self);
555
556 GST_DEBUG_OBJECT (pad, "Pausing task because we're flushing");
557 gst_pad_pause_task (pad);
558 }
559 }
560
561 static GstFlowReturn
gst_sctp_enc_sink_chain(GstPad * pad,GstObject * parent,GstBuffer * buffer)562 gst_sctp_enc_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
563 {
564 GstSctpEnc *self = GST_SCTP_ENC (parent);
565 GstSctpEncPad *sctpenc_pad = GST_SCTP_ENC_PAD (pad);
566 GstMapInfo map;
567 guint32 ppid;
568 gboolean ordered;
569 GstSctpAssociationPartialReliability pr;
570 guint32 pr_param;
571 gpointer state = NULL;
572 GstMeta *meta;
573 const GstMetaInfo *meta_info = GST_SCTP_SEND_META_INFO;
574 GstFlowReturn flow_ret = GST_FLOW_ERROR;
575 const guint8 *data;
576 guint32 length;
577
578 GST_OBJECT_LOCK (self);
579 if (self->src_ret != GST_FLOW_OK) {
580 GST_ERROR_OBJECT (pad, "Pushing on source pad failed before: %s",
581 gst_flow_get_name (self->src_ret));
582 flow_ret = self->src_ret;
583 GST_OBJECT_UNLOCK (self);
584 gst_buffer_unref (buffer);
585 return flow_ret;
586 }
587 GST_OBJECT_UNLOCK (self);
588
589 ppid = sctpenc_pad->ppid;
590 ordered = sctpenc_pad->ordered;
591 pr = sctpenc_pad->reliability;
592 pr_param = sctpenc_pad->reliability_param;
593
594 while ((meta = gst_buffer_iterate_meta (buffer, &state))) {
595 if (meta->info->api == meta_info->api) {
596 GstSctpSendMeta *sctp_send_meta = (GstSctpSendMeta *) meta;
597
598 ppid = sctp_send_meta->ppid;
599 ordered = sctp_send_meta->ordered;
600 pr_param = sctp_send_meta->pr_param;
601 switch (sctp_send_meta->pr) {
602 case GST_SCTP_SEND_META_PARTIAL_RELIABILITY_NONE:
603 pr = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_NONE;
604 break;
605 case GST_SCTP_SEND_META_PARTIAL_RELIABILITY_RTX:
606 pr = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_RTX;
607 break;
608 case GST_SCTP_SEND_META_PARTIAL_RELIABILITY_BUF:
609 pr = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_BUF;
610 break;
611 case GST_SCTP_SEND_META_PARTIAL_RELIABILITY_TTL:
612 pr = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_TTL;
613 break;
614 }
615 break;
616 }
617 }
618
619 GST_DEBUG_OBJECT (pad,
620 "Sending buffer %" GST_PTR_FORMAT
621 " with ppid %u ordered %d pr %d pr_param %u", buffer, ppid, ordered, pr,
622 pr_param);
623
624 if (!gst_buffer_map (buffer, &map, GST_MAP_READ)) {
625 GST_ERROR_OBJECT (pad, "Could not map GstBuffer");
626 goto error;
627 }
628
629 data = map.data;
630 length = map.size;
631
632 g_mutex_lock (&sctpenc_pad->lock);
633 while (!sctpenc_pad->flushing) {
634 guint32 bytes_sent;
635
636 g_mutex_unlock (&sctpenc_pad->lock);
637
638 flow_ret =
639 gst_sctp_association_send_data (self->sctp_association, data,
640 length, sctpenc_pad->stream_id, ppid, ordered, pr, pr_param,
641 &bytes_sent);
642
643 g_mutex_lock (&sctpenc_pad->lock);
644 if (flow_ret != GST_FLOW_OK) {
645 if (flow_ret != GST_FLOW_EOS) {
646 GST_ELEMENT_ERROR (self, RESOURCE, WRITE, (NULL),
647 ("Failed to send data"));
648 }
649 goto out;
650 } else if (bytes_sent < length && !sctpenc_pad->flushing) {
651 gint64 end_time = g_get_monotonic_time () + BUFFER_FULL_SLEEP_TIME;
652
653 GST_TRACE_OBJECT (pad, "Sent only %u of %u remaining bytes, waiting",
654 bytes_sent, length);
655
656 sctpenc_pad->bytes_sent += bytes_sent;
657 data += bytes_sent;
658 length -= bytes_sent;
659
660 /* The buffer was probably full. Retry in a while */
661 GST_OBJECT_LOCK (self);
662 g_queue_push_tail (&self->pending_pads, sctpenc_pad);
663 GST_OBJECT_UNLOCK (self);
664
665 g_cond_wait_until (&sctpenc_pad->cond, &sctpenc_pad->lock, end_time);
666
667 GST_OBJECT_LOCK (self);
668 g_queue_remove (&self->pending_pads, sctpenc_pad);
669 GST_OBJECT_UNLOCK (self);
670 } else if (bytes_sent == length) {
671 GST_DEBUG_OBJECT (pad, "Successfully sent buffer");
672 sctpenc_pad->bytes_sent += bytes_sent;
673 break;
674 }
675 }
676 flow_ret = sctpenc_pad->flushing ? GST_FLOW_FLUSHING : GST_FLOW_OK;
677
678 out:
679 g_mutex_unlock (&sctpenc_pad->lock);
680
681 gst_buffer_unmap (buffer, &map);
682 error:
683 gst_buffer_unref (buffer);
684 return flow_ret;
685 }
686
687 static gboolean
gst_sctp_enc_sink_event(GstPad * pad,GstObject * parent,GstEvent * event)688 gst_sctp_enc_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
689 {
690 GstSctpEnc *self = GST_SCTP_ENC (parent);
691 GstSctpEncPad *sctpenc_pad = GST_SCTP_ENC_PAD (pad);
692 gboolean ret, is_new_ppid;
693 guint32 new_ppid;
694
695 switch (GST_EVENT_TYPE (event)) {
696 case GST_EVENT_CAPS:{
697 GstCaps *caps;
698
699 gst_event_parse_caps (event, &caps);
700 GST_DEBUG_OBJECT (pad, "Received new caps %" GST_PTR_FORMAT, caps);
701 get_config_from_caps (caps, &sctpenc_pad->ordered,
702 &sctpenc_pad->reliability, &sctpenc_pad->reliability_param, &new_ppid,
703 &is_new_ppid);
704 if (is_new_ppid)
705 sctpenc_pad->ppid = new_ppid;
706 gst_event_unref (event);
707 ret = TRUE;
708 break;
709 }
710 case GST_EVENT_STREAM_START:
711 case GST_EVENT_SEGMENT:
712 /* Drop these, we create our own */
713 ret = TRUE;
714 gst_event_unref (event);
715 break;
716 case GST_EVENT_EOS:
717 /* Drop this, we're never EOS until shut down */
718 ret = TRUE;
719 gst_event_unref (event);
720 break;
721 case GST_EVENT_FLUSH_START:
722 g_mutex_lock (&sctpenc_pad->lock);
723 sctpenc_pad->flushing = TRUE;
724 g_cond_signal (&sctpenc_pad->cond);
725 g_mutex_unlock (&sctpenc_pad->lock);
726
727 ret = gst_pad_event_default (pad, parent, event);
728 break;
729 case GST_EVENT_FLUSH_STOP:
730 sctpenc_pad->flushing = FALSE;
731 GST_OBJECT_LOCK (self);
732 self->src_ret = GST_FLOW_OK;
733 GST_OBJECT_UNLOCK (self);
734 ret = gst_pad_event_default (pad, parent, event);
735 break;
736 default:
737 ret = gst_pad_event_default (pad, parent, event);
738 break;
739 }
740 return ret;
741 }
742
743 static void
flush_sinkpad(const GValue * item,gpointer user_data)744 flush_sinkpad (const GValue * item, gpointer user_data)
745 {
746 GstSctpEncPad *sctpenc_pad = g_value_get_object (item);
747 gboolean flush = GPOINTER_TO_INT (user_data);
748
749 if (flush) {
750 g_mutex_lock (&sctpenc_pad->lock);
751 sctpenc_pad->flushing = TRUE;
752 g_cond_signal (&sctpenc_pad->cond);
753 g_mutex_unlock (&sctpenc_pad->lock);
754 } else {
755 sctpenc_pad->flushing = FALSE;
756 }
757 }
758
759 static gboolean
gst_sctp_enc_src_event(GstPad * pad,GstObject * parent,GstEvent * event)760 gst_sctp_enc_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
761 {
762 GstSctpEnc *self = GST_SCTP_ENC (parent);
763 gboolean ret;
764
765 switch (GST_EVENT_TYPE (event)) {
766 case GST_EVENT_FLUSH_START:{
767 GstIterator *it;
768
769 gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, TRUE);
770 gst_data_queue_flush (self->outbound_sctp_packet_queue);
771
772 it = gst_element_iterate_sink_pads (GST_ELEMENT (self));
773 while (gst_iterator_foreach (it, flush_sinkpad,
774 GINT_TO_POINTER (TRUE)) == GST_ITERATOR_RESYNC)
775 gst_iterator_resync (it);
776 gst_iterator_free (it);
777
778 ret = gst_pad_event_default (pad, parent, event);
779 break;
780 }
781 case GST_EVENT_RECONFIGURE:
782 case GST_EVENT_FLUSH_STOP:{
783 GstIterator *it;
784
785 it = gst_element_iterate_sink_pads (GST_ELEMENT (self));
786 while (gst_iterator_foreach (it, flush_sinkpad,
787 GINT_TO_POINTER (FALSE)) == GST_ITERATOR_RESYNC)
788 gst_iterator_resync (it);
789 gst_iterator_free (it);
790
791 gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, FALSE);
792 self->need_segment = TRUE;
793 GST_OBJECT_LOCK (self);
794 self->src_ret = GST_FLOW_OK;
795 GST_OBJECT_UNLOCK (self);
796 gst_pad_start_task (self->src_pad,
797 (GstTaskFunction) gst_sctp_enc_srcpad_loop, self->src_pad, NULL);
798
799 ret = gst_pad_event_default (pad, parent, event);
800 break;
801 }
802 default:
803 ret = gst_pad_event_default (pad, parent, event);
804 break;
805 }
806 return ret;
807 }
808
809 static gboolean
configure_association(GstSctpEnc * self)810 configure_association (GstSctpEnc * self)
811 {
812 gint state;
813
814 self->sctp_association = gst_sctp_association_get (self->sctp_association_id);
815
816 g_object_get (self->sctp_association, "state", &state, NULL);
817
818 if (state != GST_SCTP_ASSOCIATION_STATE_NEW) {
819 GST_WARNING_OBJECT (self,
820 "Could not configure SCTP association. Association already in use!");
821 g_object_unref (self->sctp_association);
822 self->sctp_association = NULL;
823 goto error;
824 }
825
826 self->signal_handler_state_changed =
827 g_signal_connect_object (self->sctp_association, "notify::state",
828 G_CALLBACK (on_sctp_association_state_changed), self, 0);
829
830 g_object_bind_property (self, "remote-sctp-port", self->sctp_association,
831 "remote-port", G_BINDING_SYNC_CREATE);
832
833 g_object_bind_property (self, "use-sock-stream", self->sctp_association,
834 "use-sock-stream", G_BINDING_SYNC_CREATE);
835
836 gst_sctp_association_set_on_packet_out (self->sctp_association,
837 on_sctp_packet_out, gst_object_ref (self), gst_object_unref);
838
839 return TRUE;
840 error:
841 return FALSE;
842 }
843
844 static void
on_sctp_association_state_changed(GstSctpAssociation * sctp_association,GParamSpec * pspec,GstSctpEnc * self)845 on_sctp_association_state_changed (GstSctpAssociation * sctp_association,
846 GParamSpec * pspec, GstSctpEnc * self)
847 {
848 gint state;
849
850 g_object_get (sctp_association, "state", &state, NULL);
851
852 GST_DEBUG_OBJECT (self, "Association state changed to %d", state);
853
854 switch (state) {
855 case GST_SCTP_ASSOCIATION_STATE_NEW:
856 break;
857 case GST_SCTP_ASSOCIATION_STATE_READY:
858 gst_sctp_association_start (sctp_association);
859 break;
860 case GST_SCTP_ASSOCIATION_STATE_CONNECTING:
861 break;
862 case GST_SCTP_ASSOCIATION_STATE_CONNECTED:
863 g_signal_emit (self, signals[SIGNAL_SCTP_ASSOCIATION_ESTABLISHED], 0,
864 TRUE);
865 break;
866 case GST_SCTP_ASSOCIATION_STATE_DISCONNECTING:
867 case GST_SCTP_ASSOCIATION_STATE_DISCONNECTED:
868 g_signal_emit (self, signals[SIGNAL_SCTP_ASSOCIATION_ESTABLISHED], 0,
869 FALSE);
870 break;
871 case GST_SCTP_ASSOCIATION_STATE_ERROR:
872 GST_ELEMENT_ERROR (self, RESOURCE, WRITE, (NULL),
873 ("SCTP association went into error state"));
874 break;
875 }
876 }
877
878 static void
data_queue_item_free(GstDataQueueItem * item)879 data_queue_item_free (GstDataQueueItem * item)
880 {
881 if (item->object)
882 gst_mini_object_unref (item->object);
883 g_free (item);
884 }
885
886 static void
on_sctp_packet_out(GstSctpAssociation * _association,const guint8 * buf,gsize length,gpointer user_data)887 on_sctp_packet_out (GstSctpAssociation * _association, const guint8 * buf,
888 gsize length, gpointer user_data)
889 {
890 GstSctpEnc *self = user_data;
891 GstBuffer *gstbuf;
892 GstDataQueueItem *item;
893 GList *pending_pads, *l;
894 GstSctpEncPad *sctpenc_pad;
895
896 GST_DEBUG_OBJECT (self, "Received output packet of size %" G_GSIZE_FORMAT,
897 length);
898
899 gstbuf = gst_buffer_new_memdup (buf, length);
900
901 item = g_new0 (GstDataQueueItem, 1);
902 item->object = GST_MINI_OBJECT (gstbuf);
903 item->size = length;
904 item->visible = TRUE;
905 item->destroy = (GDestroyNotify) data_queue_item_free;
906
907 if (!gst_data_queue_push (self->outbound_sctp_packet_queue, item)) {
908 item->destroy (item);
909 GST_DEBUG_OBJECT (self, "Failed to push item because we're flushing");
910 }
911
912 /* Wake up pads in the order they waited, oldest pad first */
913 GST_OBJECT_LOCK (self);
914 pending_pads = NULL;
915 while ((sctpenc_pad = g_queue_pop_tail (&self->pending_pads))) {
916 pending_pads = g_list_prepend (pending_pads, sctpenc_pad);
917 }
918 GST_OBJECT_UNLOCK (self);
919
920 for (l = pending_pads; l; l = l->next) {
921 sctpenc_pad = l->data;
922 g_mutex_lock (&sctpenc_pad->lock);
923 g_cond_signal (&sctpenc_pad->cond);
924 g_mutex_unlock (&sctpenc_pad->lock);
925 }
926 g_list_free (pending_pads);
927 }
928
929 static void
stop_srcpad_task(GstPad * pad,GstSctpEnc * self)930 stop_srcpad_task (GstPad * pad, GstSctpEnc * self)
931 {
932 gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, TRUE);
933 gst_data_queue_flush (self->outbound_sctp_packet_queue);
934 gst_pad_stop_task (pad);
935 }
936
937 static void
remove_sinkpad(const GValue * item,gpointer user_data)938 remove_sinkpad (const GValue * item, gpointer user_data)
939 {
940 GstSctpEncPad *sctpenc_pad = g_value_get_object (item);
941 GstSctpEnc *self = user_data;
942
943 gst_sctp_enc_release_pad (GST_ELEMENT (self), GST_PAD (sctpenc_pad));
944 }
945
946 static void
sctpenc_cleanup(GstSctpEnc * self)947 sctpenc_cleanup (GstSctpEnc * self)
948 {
949 GstIterator *it;
950
951 gst_sctp_association_set_on_packet_out (self->sctp_association, NULL, NULL,
952 NULL);
953
954 g_signal_handler_disconnect (self->sctp_association,
955 self->signal_handler_state_changed);
956 stop_srcpad_task (self->src_pad, self);
957 gst_sctp_association_force_close (self->sctp_association);
958 g_object_unref (self->sctp_association);
959 self->sctp_association = NULL;
960
961 it = gst_element_iterate_sink_pads (GST_ELEMENT (self));
962 while (gst_iterator_foreach (it, remove_sinkpad, self) == GST_ITERATOR_RESYNC)
963 gst_iterator_resync (it);
964 gst_iterator_free (it);
965 g_queue_clear (&self->pending_pads);
966 }
967
968 static void
get_config_from_caps(const GstCaps * caps,gboolean * ordered,GstSctpAssociationPartialReliability * reliability,guint32 * reliability_param,guint32 * ppid,gboolean * ppid_available)969 get_config_from_caps (const GstCaps * caps, gboolean * ordered,
970 GstSctpAssociationPartialReliability * reliability,
971 guint32 * reliability_param, guint32 * ppid, gboolean * ppid_available)
972 {
973 GstStructure *s;
974 guint i, n;
975
976 *ordered = TRUE;
977 *reliability = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_NONE;
978 *reliability_param = 0;
979 *ppid_available = FALSE;
980
981 n = gst_caps_get_size (caps);
982 for (i = 0; i < n; i++) {
983 s = gst_caps_get_structure (caps, i);
984 if (gst_structure_has_field (s, "ordered")) {
985 const GValue *v = gst_structure_get_value (s, "ordered");
986 *ordered = g_value_get_boolean (v);
987 }
988 if (gst_structure_has_field (s, "partially-reliability")) {
989 const GValue *v = gst_structure_get_value (s, "partially-reliability");
990 const gchar *reliability_string = g_value_get_string (v);
991
992 if (!g_strcmp0 (reliability_string, "none"))
993 *reliability = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_NONE;
994 else if (!g_strcmp0 (reliability_string, "ttl"))
995 *reliability = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_TTL;
996 else if (!g_strcmp0 (reliability_string, "buf"))
997 *reliability = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_BUF;
998 else if (!g_strcmp0 (reliability_string, "rtx"))
999 *reliability = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_RTX;
1000 }
1001 if (gst_structure_has_field (s, "reliability-parameter")) {
1002 const GValue *v = gst_structure_get_value (s, "reliability-parameter");
1003 *reliability_param = g_value_get_uint (v);
1004 }
1005 if (gst_structure_has_field (s, "ppid")) {
1006 const GValue *v = gst_structure_get_value (s, "ppid");
1007 *ppid = g_value_get_uint (v);
1008 *ppid_available = TRUE;
1009 }
1010 }
1011 }
1012
1013 static guint64
on_get_stream_bytes_sent(GstSctpEnc * self,guint stream_id)1014 on_get_stream_bytes_sent (GstSctpEnc * self, guint stream_id)
1015 {
1016 gchar *pad_name;
1017 GstPad *pad;
1018 GstSctpEncPad *sctpenc_pad;
1019 guint64 bytes_sent;
1020
1021 pad_name = g_strdup_printf ("sink_%u", stream_id);
1022 pad = gst_element_get_static_pad (GST_ELEMENT (self), pad_name);
1023 g_free (pad_name);
1024
1025 if (!pad) {
1026 GST_DEBUG_OBJECT (self,
1027 "Buffered amount requested on a stream that does not exist!");
1028 return 0;
1029 }
1030
1031 sctpenc_pad = GST_SCTP_ENC_PAD (pad);
1032
1033 g_mutex_lock (&sctpenc_pad->lock);
1034 bytes_sent = sctpenc_pad->bytes_sent;
1035 g_mutex_unlock (&sctpenc_pad->lock);
1036
1037 gst_object_unref (sctpenc_pad);
1038
1039 return bytes_sent;
1040 }
1041