1 /*
2 * Copyright (c) 2015, Collabora Ltd.
3 *
4 * Redistribution and use in source and binary forms, with or without modification,
5 * are permitted provided that the following conditions are met:
6 *
7 * 1. Redistributions of source code must retain the above copyright notice, this
8 * list of conditions and the following disclaimer.
9 *
10 * 2. Redistributions in binary form must reproduce the above copyright notice, this
11 * list of conditions and the following disclaimer in the documentation and/or other
12 * materials provided with the distribution.
13 *
14 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
16 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
17 * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
18 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
19 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
20 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
21 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
22 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
23 * OF SUCH DAMAGE.
24 */
25
26 #ifdef HAVE_CONFIG_H
27 #include "config.h"
28 #endif
29 #include "gstsctpdec.h"
30
31 #include <gst/sctp/sctpreceivemeta.h>
32 #include <gst/base/gstdataqueue.h>
33
34 #include <stdio.h>
35 #include <stdlib.h>
36
37 GST_DEBUG_CATEGORY_STATIC (gst_sctp_dec_debug_category);
38 #define GST_CAT_DEFAULT gst_sctp_dec_debug_category
39
40 #define gst_sctp_dec_parent_class parent_class
41 G_DEFINE_TYPE (GstSctpDec, gst_sctp_dec, GST_TYPE_ELEMENT);
42 GST_ELEMENT_REGISTER_DEFINE (sctpdec, "sctpdec", GST_RANK_NONE,
43 GST_TYPE_SCTP_DEC);
44
45 static GstStaticPadTemplate sink_template =
46 GST_STATIC_PAD_TEMPLATE ("sink", GST_PAD_SINK,
47 GST_PAD_ALWAYS, GST_STATIC_CAPS ("application/x-sctp"));
48
49 static GstStaticPadTemplate src_template =
50 GST_STATIC_PAD_TEMPLATE ("src_%u", GST_PAD_SRC,
51 GST_PAD_SOMETIMES, GST_STATIC_CAPS_ANY);
52
53 enum
54 {
55 SIGNAL_RESET_STREAM,
56 NUM_SIGNALS
57 };
58
59 static guint signals[NUM_SIGNALS];
60
61 enum
62 {
63 PROP_0,
64
65 PROP_GST_SCTP_ASSOCIATION_ID,
66 PROP_LOCAL_SCTP_PORT,
67
68 NUM_PROPERTIES
69 };
70
71 static GParamSpec *properties[NUM_PROPERTIES];
72
73 #define DEFAULT_GST_SCTP_ASSOCIATION_ID 1
74 #define DEFAULT_LOCAL_SCTP_PORT 0
75 #define MAX_SCTP_PORT 65535
76 #define MAX_GST_SCTP_ASSOCIATION_ID 65535
77 #define MAX_STREAM_ID 65535
78
79 GType gst_sctp_dec_pad_get_type (void);
80
81 #define GST_TYPE_SCTP_DEC_PAD (gst_sctp_dec_pad_get_type())
82 #define GST_SCTP_DEC_PAD(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj), GST_TYPE_SCTP_DEC_PAD, GstSctpDecPad))
83 #define GST_SCTP_DEC_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass), GST_TYPE_SCTP_DEC_PAD, GstSctpDecPadClass))
84 #define GST_IS_SCTP_DEC_PAD(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj), GST_TYPE_SCTP_DEC_PAD))
85 #define GST_IS_SCTP_DEC_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass), GST_TYPE_SCTP_DEC_PAD))
86
87 typedef struct _GstSctpDecPad GstSctpDecPad;
88 typedef GstPadClass GstSctpDecPadClass;
89
90 struct _GstSctpDecPad
91 {
92 GstPad parent;
93
94 GstDataQueue *packet_queue;
95 };
96
97 G_DEFINE_TYPE (GstSctpDecPad, gst_sctp_dec_pad, GST_TYPE_PAD);
98
99 static void
gst_sctp_dec_pad_finalize(GObject * object)100 gst_sctp_dec_pad_finalize (GObject * object)
101 {
102 GstSctpDecPad *self = GST_SCTP_DEC_PAD (object);
103
104 gst_object_unref (self->packet_queue);
105
106 G_OBJECT_CLASS (gst_sctp_dec_pad_parent_class)->finalize (object);
107 }
108
109 static gboolean
data_queue_check_full_cb(GstDataQueue * queue,guint visible,guint bytes,guint64 time,gpointer user_data)110 data_queue_check_full_cb (GstDataQueue * queue, guint visible, guint bytes,
111 guint64 time, gpointer user_data)
112 {
113 /* FIXME: Are we full at some point and block? */
114 return FALSE;
115 }
116
117 static void
data_queue_empty_cb(GstDataQueue * queue,gpointer user_data)118 data_queue_empty_cb (GstDataQueue * queue, gpointer user_data)
119 {
120 }
121
122 static void
data_queue_full_cb(GstDataQueue * queue,gpointer user_data)123 data_queue_full_cb (GstDataQueue * queue, gpointer user_data)
124 {
125 }
126
127 static void
gst_sctp_dec_pad_class_init(GstSctpDecPadClass * klass)128 gst_sctp_dec_pad_class_init (GstSctpDecPadClass * klass)
129 {
130 GObjectClass *gobject_class;
131
132 gobject_class = G_OBJECT_CLASS (klass);
133
134 gobject_class->finalize = gst_sctp_dec_pad_finalize;
135 }
136
137 static void
gst_sctp_dec_pad_init(GstSctpDecPad * self)138 gst_sctp_dec_pad_init (GstSctpDecPad * self)
139 {
140 self->packet_queue = gst_data_queue_new (data_queue_check_full_cb,
141 data_queue_full_cb, data_queue_empty_cb, NULL);
142 }
143
144 static void gst_sctp_dec_set_property (GObject * object, guint prop_id,
145 const GValue * value, GParamSpec * pspec);
146 static void gst_sctp_dec_get_property (GObject * object, guint prop_id,
147 GValue * value, GParamSpec * pspec);
148 static void gst_sctp_dec_finalize (GObject * object);
149 static GstStateChangeReturn gst_sctp_dec_change_state (GstElement * element,
150 GstStateChange transition);
151 static GstFlowReturn gst_sctp_dec_packet_chain (GstPad * pad, GstSctpDec * self,
152 GstBuffer * buf);
153 static gboolean gst_sctp_dec_packet_event (GstPad * pad, GstSctpDec * self,
154 GstEvent * event);
155 static void gst_sctp_data_srcpad_loop (GstPad * pad);
156
157 static gboolean configure_association (GstSctpDec * self);
158 static void on_gst_sctp_association_stream_reset (GstSctpAssociation *
159 gst_sctp_association, guint16 stream_id, GstSctpDec * self);
160 static void on_receive (GstSctpAssociation * gst_sctp_association,
161 guint8 * buf, gsize length, guint16 stream_id, guint ppid,
162 gpointer user_data);
163 static void stop_srcpad_task (GstPad * pad);
164 static void stop_all_srcpad_tasks (GstSctpDec * self);
165 static void sctpdec_cleanup (GstSctpDec * self);
166 static GstPad *get_pad_for_stream_id (GstSctpDec * self, guint16 stream_id);
167 static void remove_pad (GstSctpDec * self, GstPad * pad);
168 static void on_reset_stream (GstSctpDec * self, guint stream_id);
169
170 static void
gst_sctp_dec_class_init(GstSctpDecClass * klass)171 gst_sctp_dec_class_init (GstSctpDecClass * klass)
172 {
173 GObjectClass *gobject_class;
174 GstElementClass *element_class;
175
176 gobject_class = G_OBJECT_CLASS (klass);
177 element_class = GST_ELEMENT_CLASS (klass);
178
179 GST_DEBUG_CATEGORY_INIT (gst_sctp_dec_debug_category,
180 "sctpdec", 0, "debug category for sctpdec element");
181
182 gst_element_class_add_pad_template (element_class,
183 gst_static_pad_template_get (&src_template));
184 gst_element_class_add_pad_template (element_class,
185 gst_static_pad_template_get (&sink_template));
186
187 gobject_class->set_property = gst_sctp_dec_set_property;
188 gobject_class->get_property = gst_sctp_dec_get_property;
189 gobject_class->finalize = gst_sctp_dec_finalize;
190
191 element_class->change_state = GST_DEBUG_FUNCPTR (gst_sctp_dec_change_state);
192
193 klass->on_reset_stream = on_reset_stream;
194
195 properties[PROP_GST_SCTP_ASSOCIATION_ID] =
196 g_param_spec_uint ("sctp-association-id",
197 "SCTP Association ID",
198 "Every encoder/decoder pair should have the same, unique, sctp-association-id. "
199 "This value must be set before any pads are requested.",
200 0, MAX_GST_SCTP_ASSOCIATION_ID, DEFAULT_GST_SCTP_ASSOCIATION_ID,
201 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
202
203 properties[PROP_LOCAL_SCTP_PORT] =
204 g_param_spec_uint ("local-sctp-port",
205 "Local SCTP port",
206 "Local sctp port for the sctp association. The remote port is configured via the "
207 "GstSctpEnc element.",
208 0, MAX_SCTP_PORT, DEFAULT_LOCAL_SCTP_PORT,
209 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
210
211 g_object_class_install_properties (gobject_class, NUM_PROPERTIES, properties);
212
213 signals[SIGNAL_RESET_STREAM] = g_signal_new ("reset-stream",
214 G_TYPE_FROM_CLASS (gobject_class), G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
215 G_STRUCT_OFFSET (GstSctpDecClass, on_reset_stream), NULL, NULL,
216 NULL, G_TYPE_NONE, 1, G_TYPE_UINT);
217
218 gst_element_class_set_static_metadata (element_class,
219 "SCTP Decoder",
220 "Decoder/Network/SCTP",
221 "Decodes packets with SCTP",
222 "George Kiagiadakis <george.kiagiadakis@collabora.com>");
223 }
224
225 static void
gst_sctp_dec_init(GstSctpDec * self)226 gst_sctp_dec_init (GstSctpDec * self)
227 {
228 self->sctp_association_id = DEFAULT_GST_SCTP_ASSOCIATION_ID;
229 self->local_sctp_port = DEFAULT_LOCAL_SCTP_PORT;
230
231 self->flow_combiner = gst_flow_combiner_new ();
232
233 self->sink_pad = gst_pad_new_from_static_template (&sink_template, "sink");
234 gst_pad_set_chain_function (self->sink_pad,
235 GST_DEBUG_FUNCPTR ((GstPadChainFunction) gst_sctp_dec_packet_chain));
236 gst_pad_set_event_function (self->sink_pad,
237 GST_DEBUG_FUNCPTR ((GstPadEventFunction) gst_sctp_dec_packet_event));
238
239 gst_element_add_pad (GST_ELEMENT (self), self->sink_pad);
240 }
241
242 static void
gst_sctp_dec_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)243 gst_sctp_dec_set_property (GObject * object, guint prop_id,
244 const GValue * value, GParamSpec * pspec)
245 {
246 GstSctpDec *self = GST_SCTP_DEC (object);
247
248 switch (prop_id) {
249 case PROP_GST_SCTP_ASSOCIATION_ID:
250 self->sctp_association_id = g_value_get_uint (value);
251 break;
252 case PROP_LOCAL_SCTP_PORT:
253 self->local_sctp_port = g_value_get_uint (value);
254 break;
255 default:
256 G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
257 break;
258 }
259 }
260
261 static void
gst_sctp_dec_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)262 gst_sctp_dec_get_property (GObject * object, guint prop_id, GValue * value,
263 GParamSpec * pspec)
264 {
265 GstSctpDec *self = GST_SCTP_DEC (object);
266
267 switch (prop_id) {
268 case PROP_GST_SCTP_ASSOCIATION_ID:
269 g_value_set_uint (value, self->sctp_association_id);
270 break;
271 case PROP_LOCAL_SCTP_PORT:
272 g_value_set_uint (value, self->local_sctp_port);
273 break;
274 default:
275 G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
276 break;
277 }
278 }
279
280 static void
gst_sctp_dec_finalize(GObject * object)281 gst_sctp_dec_finalize (GObject * object)
282 {
283 GstSctpDec *self = GST_SCTP_DEC (object);
284
285 gst_flow_combiner_free (self->flow_combiner);
286 self->flow_combiner = NULL;
287
288 G_OBJECT_CLASS (parent_class)->finalize (object);
289 }
290
291 static GstStateChangeReturn
gst_sctp_dec_change_state(GstElement * element,GstStateChange transition)292 gst_sctp_dec_change_state (GstElement * element, GstStateChange transition)
293 {
294 GstSctpDec *self = GST_SCTP_DEC (element);
295 GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
296
297 switch (transition) {
298 case GST_STATE_CHANGE_READY_TO_PAUSED:
299 gst_flow_combiner_reset (self->flow_combiner);
300 if (!configure_association (self))
301 ret = GST_STATE_CHANGE_FAILURE;
302 break;
303 case GST_STATE_CHANGE_PAUSED_TO_READY:
304 stop_all_srcpad_tasks (self);
305 break;
306 default:
307 break;
308 }
309
310 if (ret != GST_STATE_CHANGE_FAILURE)
311 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
312
313 switch (transition) {
314 case GST_STATE_CHANGE_PAUSED_TO_READY:
315 sctpdec_cleanup (self);
316 gst_flow_combiner_reset (self->flow_combiner);
317 break;
318 default:
319 break;
320 }
321
322 return ret;
323 }
324
325 static GstFlowReturn
gst_sctp_dec_packet_chain(GstPad * pad,GstSctpDec * self,GstBuffer * buf)326 gst_sctp_dec_packet_chain (GstPad * pad, GstSctpDec * self, GstBuffer * buf)
327 {
328 GstFlowReturn flow_ret;
329 GstMapInfo map;
330
331 GST_DEBUG_OBJECT (self, "Processing received buffer %" GST_PTR_FORMAT, buf);
332
333 if (!gst_buffer_map (buf, &map, GST_MAP_READ)) {
334 GST_ERROR_OBJECT (self, "Could not map GstBuffer");
335 gst_buffer_unref (buf);
336 return GST_FLOW_ERROR;
337 }
338
339 gst_sctp_association_incoming_packet (self->sctp_association,
340 (const guint8 *) map.data, (guint32) map.size);
341 gst_buffer_unmap (buf, &map);
342 gst_buffer_unref (buf);
343
344 GST_OBJECT_LOCK (self);
345 /* This gets the last combined flow return from all source pads */
346 flow_ret = gst_flow_combiner_update_flow (self->flow_combiner, GST_FLOW_OK);
347 GST_OBJECT_UNLOCK (self);
348
349 if (flow_ret != GST_FLOW_OK) {
350 GST_DEBUG_OBJECT (self, "Returning %s", gst_flow_get_name (flow_ret));
351 }
352
353 return flow_ret;
354 }
355
356 static void
flush_srcpad(const GValue * item,gpointer user_data)357 flush_srcpad (const GValue * item, gpointer user_data)
358 {
359 GstSctpDecPad *sctpdec_pad = g_value_get_object (item);
360 gboolean flush = GPOINTER_TO_INT (user_data);
361
362 if (flush) {
363 gst_data_queue_set_flushing (sctpdec_pad->packet_queue, TRUE);
364 gst_data_queue_flush (sctpdec_pad->packet_queue);
365 } else {
366 gst_data_queue_set_flushing (sctpdec_pad->packet_queue, FALSE);
367 gst_pad_start_task (GST_PAD (sctpdec_pad),
368 (GstTaskFunction) gst_sctp_data_srcpad_loop, sctpdec_pad, NULL);
369 }
370 }
371
372 static gboolean
gst_sctp_dec_packet_event(GstPad * pad,GstSctpDec * self,GstEvent * event)373 gst_sctp_dec_packet_event (GstPad * pad, GstSctpDec * self, GstEvent * event)
374 {
375 switch (GST_EVENT_TYPE (event)) {
376 case GST_EVENT_STREAM_START:
377 case GST_EVENT_CAPS:
378 /* We create our own stream-start events and the caps event does not
379 * make sense */
380 gst_event_unref (event);
381 return TRUE;
382 case GST_EVENT_EOS:
383 /* Drop this, we're never EOS until shut down */
384 gst_event_unref (event);
385 return TRUE;
386 case GST_EVENT_FLUSH_START:{
387 GstIterator *it;
388
389 it = gst_element_iterate_src_pads (GST_ELEMENT (self));
390 while (gst_iterator_foreach (it, flush_srcpad,
391 GINT_TO_POINTER (TRUE)) == GST_ITERATOR_RESYNC)
392 gst_iterator_resync (it);
393 gst_iterator_free (it);
394
395 return gst_pad_event_default (pad, GST_OBJECT (self), event);
396 }
397 case GST_EVENT_FLUSH_STOP:{
398 GstIterator *it;
399
400 it = gst_element_iterate_src_pads (GST_ELEMENT (self));
401 while (gst_iterator_foreach (it, flush_srcpad,
402 GINT_TO_POINTER (FALSE)) == GST_ITERATOR_RESYNC)
403 gst_iterator_resync (it);
404 gst_iterator_free (it);
405
406 return gst_pad_event_default (pad, GST_OBJECT (self), event);
407 }
408 default:
409 return gst_pad_event_default (pad, GST_OBJECT (self), event);
410 }
411 }
412
413 static void
gst_sctp_data_srcpad_loop(GstPad * pad)414 gst_sctp_data_srcpad_loop (GstPad * pad)
415 {
416 GstSctpDec *self;
417 GstSctpDecPad *sctpdec_pad = GST_SCTP_DEC_PAD (pad);
418 GstDataQueueItem *item;
419
420 self = GST_SCTP_DEC (gst_pad_get_parent (pad));
421
422 if (gst_data_queue_pop (sctpdec_pad->packet_queue, &item)) {
423 GstBuffer *buffer;
424 GstFlowReturn flow_ret;
425
426 buffer = GST_BUFFER (item->object);
427 GST_DEBUG_OBJECT (pad, "Forwarding buffer %" GST_PTR_FORMAT, buffer);
428
429 flow_ret = gst_pad_push (pad, buffer);
430 item->object = NULL;
431
432 GST_OBJECT_LOCK (self);
433 gst_flow_combiner_update_pad_flow (self->flow_combiner, pad, flow_ret);
434 GST_OBJECT_UNLOCK (self);
435
436 if (G_UNLIKELY (flow_ret == GST_FLOW_FLUSHING
437 || flow_ret == GST_FLOW_NOT_LINKED) || flow_ret == GST_FLOW_EOS) {
438 GST_DEBUG_OBJECT (pad, "Push failed on packet source pad. Error: %s",
439 gst_flow_get_name (flow_ret));
440 } else if (G_UNLIKELY (flow_ret != GST_FLOW_OK)) {
441 GST_ERROR_OBJECT (pad, "Push failed on packet source pad. Error: %s",
442 gst_flow_get_name (flow_ret));
443 }
444
445 if (G_UNLIKELY (flow_ret != GST_FLOW_OK)) {
446 GST_DEBUG_OBJECT (pad, "Pausing task because of an error");
447 gst_data_queue_set_flushing (sctpdec_pad->packet_queue, TRUE);
448 gst_data_queue_flush (sctpdec_pad->packet_queue);
449 gst_pad_pause_task (pad);
450 }
451
452 item->destroy (item);
453 } else {
454 GST_OBJECT_LOCK (self);
455 gst_flow_combiner_update_pad_flow (self->flow_combiner, pad,
456 GST_FLOW_FLUSHING);
457 GST_OBJECT_UNLOCK (self);
458
459 GST_DEBUG_OBJECT (pad, "Pausing task because we're flushing");
460 gst_pad_pause_task (pad);
461 }
462
463 gst_object_unref (self);
464 }
465
466 static gboolean
configure_association(GstSctpDec * self)467 configure_association (GstSctpDec * self)
468 {
469 gint state;
470
471 self->sctp_association = gst_sctp_association_get (self->sctp_association_id);
472
473 g_object_get (self->sctp_association, "state", &state, NULL);
474
475 if (state != GST_SCTP_ASSOCIATION_STATE_NEW) {
476 GST_WARNING_OBJECT (self,
477 "Could not configure SCTP association. Association already in use!");
478 g_object_unref (self->sctp_association);
479 self->sctp_association = NULL;
480 goto error;
481 }
482
483 self->signal_handler_stream_reset =
484 g_signal_connect_object (self->sctp_association, "stream-reset",
485 G_CALLBACK (on_gst_sctp_association_stream_reset), self, 0);
486
487 g_object_bind_property (self, "local-sctp-port", self->sctp_association,
488 "local-port", G_BINDING_SYNC_CREATE);
489
490 gst_sctp_association_set_on_packet_received (self->sctp_association,
491 on_receive, gst_object_ref (self), gst_object_unref);
492
493 return TRUE;
494 error:
495 return FALSE;
496 }
497
498 static gboolean
gst_sctp_dec_src_event(GstPad * pad,GstSctpDec * self,GstEvent * event)499 gst_sctp_dec_src_event (GstPad * pad, GstSctpDec * self, GstEvent * event)
500 {
501 switch (GST_EVENT_TYPE (event)) {
502 case GST_EVENT_RECONFIGURE:
503 case GST_EVENT_FLUSH_STOP:{
504 GstSctpDecPad *sctpdec_pad = GST_SCTP_DEC_PAD (pad);
505
506 /* Unflush and start task again */
507 gst_data_queue_set_flushing (sctpdec_pad->packet_queue, FALSE);
508 gst_pad_start_task (pad, (GstTaskFunction) gst_sctp_data_srcpad_loop, pad,
509 NULL);
510
511 return gst_pad_event_default (pad, GST_OBJECT (self), event);
512 }
513 case GST_EVENT_FLUSH_START:{
514 GstSctpDecPad *sctpdec_pad = GST_SCTP_DEC_PAD (pad);
515
516 gst_data_queue_set_flushing (sctpdec_pad->packet_queue, TRUE);
517 gst_data_queue_flush (sctpdec_pad->packet_queue);
518
519 return gst_pad_event_default (pad, GST_OBJECT (self), event);
520 }
521 default:
522 return gst_pad_event_default (pad, GST_OBJECT (self), event);
523 }
524 }
525
526 static gboolean
copy_sticky_events(GstPad * pad,GstEvent ** event,gpointer user_data)527 copy_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
528 {
529 GstPad *new_pad = user_data;
530
531 if (GST_EVENT_TYPE (*event) != GST_EVENT_CAPS
532 && GST_EVENT_TYPE (*event) != GST_EVENT_STREAM_START)
533 gst_pad_store_sticky_event (new_pad, *event);
534
535 return TRUE;
536 }
537
538 static GstPad *
get_pad_for_stream_id(GstSctpDec * self,guint16 stream_id)539 get_pad_for_stream_id (GstSctpDec * self, guint16 stream_id)
540 {
541 GstPad *new_pad = NULL;
542 gint state;
543 gchar *pad_name, *pad_stream_id;
544 GstPadTemplate *template;
545
546 pad_name = g_strdup_printf ("src_%hu", stream_id);
547 new_pad = gst_element_get_static_pad (GST_ELEMENT (self), pad_name);
548 if (new_pad) {
549 g_free (pad_name);
550 return new_pad;
551 }
552
553 g_object_get (self->sctp_association, "state", &state, NULL);
554
555 if (state != GST_SCTP_ASSOCIATION_STATE_CONNECTED) {
556 GST_ERROR_OBJECT (self,
557 "The SCTP association must be established before a new stream can be created");
558 return NULL;
559 }
560
561 GST_DEBUG_OBJECT (self, "Creating new pad for stream id %u", stream_id);
562
563 if (stream_id > MAX_STREAM_ID)
564 return NULL;
565
566 template = gst_static_pad_template_get (&src_template);
567 new_pad = g_object_new (GST_TYPE_SCTP_DEC_PAD, "name", pad_name,
568 "direction", template->direction, "template", template, NULL);
569 g_free (pad_name);
570 gst_clear_object (&template);
571
572 gst_pad_set_event_function (new_pad,
573 GST_DEBUG_FUNCPTR ((GstPadEventFunction) gst_sctp_dec_src_event));
574
575 if (!gst_pad_set_active (new_pad, TRUE))
576 goto error_cleanup;
577
578 pad_stream_id =
579 gst_pad_create_stream_id_printf (new_pad, GST_ELEMENT (self), "%hu",
580 stream_id);
581 gst_pad_push_event (new_pad, gst_event_new_stream_start (pad_stream_id));
582 g_free (pad_stream_id);
583 gst_pad_sticky_events_foreach (self->sink_pad, copy_sticky_events, new_pad);
584
585 if (!gst_element_add_pad (GST_ELEMENT (self), new_pad))
586 goto error_add;
587
588 GST_OBJECT_LOCK (self);
589 gst_flow_combiner_add_pad (self->flow_combiner, new_pad);
590 GST_OBJECT_UNLOCK (self);
591
592 gst_pad_start_task (new_pad, (GstTaskFunction) gst_sctp_data_srcpad_loop,
593 new_pad, NULL);
594
595 gst_object_ref (new_pad);
596
597 return new_pad;
598 error_add:
599 gst_pad_set_active (new_pad, FALSE);
600 error_cleanup:
601 gst_object_unref (new_pad);
602 return NULL;
603 }
604
605 static void
remove_pad(GstSctpDec * self,GstPad * pad)606 remove_pad (GstSctpDec * self, GstPad * pad)
607 {
608 stop_srcpad_task (pad);
609 GST_PAD_STREAM_LOCK (pad);
610 gst_pad_set_active (pad, FALSE);
611 if (gst_object_has_as_parent (GST_OBJECT (pad), GST_OBJECT (self)))
612 gst_element_remove_pad (GST_ELEMENT (self), pad);
613 GST_PAD_STREAM_UNLOCK (pad);
614 GST_OBJECT_LOCK (self);
615 gst_flow_combiner_remove_pad (self->flow_combiner, pad);
616 GST_OBJECT_UNLOCK (self);
617 }
618
619 static void
on_gst_sctp_association_stream_reset(GstSctpAssociation * gst_sctp_association,guint16 stream_id,GstSctpDec * self)620 on_gst_sctp_association_stream_reset (GstSctpAssociation * gst_sctp_association,
621 guint16 stream_id, GstSctpDec * self)
622 {
623 gchar *pad_name;
624 GstPad *srcpad;
625
626 GST_DEBUG_OBJECT (self, "Stream %u reset", stream_id);
627
628 pad_name = g_strdup_printf ("src_%hu", stream_id);
629 srcpad = gst_element_get_static_pad (GST_ELEMENT (self), pad_name);
630 g_free (pad_name);
631 if (!srcpad) {
632 GST_WARNING_OBJECT (self, "Reset called on stream without a srcpad");
633 return;
634 }
635 remove_pad (self, srcpad);
636 gst_object_unref (srcpad);
637 }
638
639 static void
data_queue_item_free(GstDataQueueItem * item)640 data_queue_item_free (GstDataQueueItem * item)
641 {
642 if (item->object)
643 gst_mini_object_unref (item->object);
644 g_free (item);
645 }
646
647 static void
on_receive(GstSctpAssociation * sctp_association,guint8 * buf,gsize length,guint16 stream_id,guint ppid,gpointer user_data)648 on_receive (GstSctpAssociation * sctp_association, guint8 * buf,
649 gsize length, guint16 stream_id, guint ppid, gpointer user_data)
650 {
651 GstSctpDec *self = user_data;
652 GstSctpDecPad *sctpdec_pad;
653 GstPad *src_pad;
654 GstDataQueueItem *item;
655 GstBuffer *gstbuf;
656
657 src_pad = get_pad_for_stream_id (self, stream_id);
658 g_assert (src_pad);
659
660 GST_DEBUG_OBJECT (src_pad,
661 "Received incoming packet of size %" G_GSIZE_FORMAT
662 " with stream id %u ppid %u", length, stream_id, ppid);
663
664 sctpdec_pad = GST_SCTP_DEC_PAD (src_pad);
665 gstbuf =
666 gst_buffer_new_wrapped_full (0, buf, length, 0, length, buf,
667 (GDestroyNotify) usrsctp_freedumpbuffer);
668 gst_sctp_buffer_add_receive_meta (gstbuf, ppid);
669
670 item = g_new0 (GstDataQueueItem, 1);
671 item->object = GST_MINI_OBJECT (gstbuf);
672 item->size = length;
673 item->visible = TRUE;
674 item->destroy = (GDestroyNotify) data_queue_item_free;
675 if (!gst_data_queue_push (sctpdec_pad->packet_queue, item)) {
676 item->destroy (item);
677 GST_DEBUG_OBJECT (src_pad, "Failed to push item because we're flushing");
678 }
679
680 gst_object_unref (src_pad);
681 }
682
683 static void
stop_srcpad_task(GstPad * pad)684 stop_srcpad_task (GstPad * pad)
685 {
686 GstSctpDecPad *sctpdec_pad = GST_SCTP_DEC_PAD (pad);
687
688 gst_data_queue_set_flushing (sctpdec_pad->packet_queue, TRUE);
689 gst_data_queue_flush (sctpdec_pad->packet_queue);
690 gst_pad_stop_task (pad);
691 }
692
693 static void
remove_pad_it(const GValue * item,gpointer user_data)694 remove_pad_it (const GValue * item, gpointer user_data)
695 {
696 GstPad *pad = g_value_get_object (item);
697 GstSctpDec *self = user_data;
698
699 remove_pad (self, pad);
700 }
701
702 static void
stop_all_srcpad_tasks(GstSctpDec * self)703 stop_all_srcpad_tasks (GstSctpDec * self)
704 {
705 GstIterator *it;
706
707 it = gst_element_iterate_src_pads (GST_ELEMENT (self));
708 while (gst_iterator_foreach (it, remove_pad_it, self) == GST_ITERATOR_RESYNC)
709 gst_iterator_resync (it);
710 gst_iterator_free (it);
711 }
712
713 static void
sctpdec_cleanup(GstSctpDec * self)714 sctpdec_cleanup (GstSctpDec * self)
715 {
716 if (self->sctp_association) {
717 gst_sctp_association_set_on_packet_received (self->sctp_association, NULL,
718 NULL, NULL);
719 g_signal_handler_disconnect (self->sctp_association,
720 self->signal_handler_stream_reset);
721 gst_sctp_association_force_close (self->sctp_association);
722 g_object_unref (self->sctp_association);
723 self->sctp_association = NULL;
724 }
725 }
726
727 static void
on_reset_stream(GstSctpDec * self,guint stream_id)728 on_reset_stream (GstSctpDec * self, guint stream_id)
729 {
730 if (self->sctp_association) {
731 gst_sctp_association_reset_stream (self->sctp_association, stream_id);
732 on_gst_sctp_association_stream_reset (self->sctp_association, stream_id,
733 self);
734 }
735 }
736