• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2015, Collabora Ltd.
3  *
4  * Redistribution and use in source and binary forms, with or without modification,
5  * are permitted provided that the following conditions are met:
6  *
7  * 1. Redistributions of source code must retain the above copyright notice, this
8  * list of conditions and the following disclaimer.
9  *
10  * 2. Redistributions in binary form must reproduce the above copyright notice, this
11  * list of conditions and the following disclaimer in the documentation and/or other
12  * materials provided with the distribution.
13  *
14  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
15  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
16  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
17  * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
18  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
19  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
20  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
21  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
22  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
23  * OF SUCH DAMAGE.
24  */
25 
26 #ifdef HAVE_CONFIG_H
27 #include "config.h"
28 #endif
29 
30 #include "sctpassociation.h"
31 
32 #include <gst/gst.h>
33 #include <string.h>
34 #include <errno.h>
35 #include <stdlib.h>
36 
37 GST_DEBUG_CATEGORY_STATIC (gst_sctp_association_debug_category);
38 #define GST_CAT_DEFAULT gst_sctp_association_debug_category
39 GST_DEBUG_CATEGORY_STATIC (gst_sctp_debug_category);
40 
41 #define GST_SCTP_ASSOCIATION_STATE_TYPE (gst_sctp_association_state_get_type())
42 static GType
gst_sctp_association_state_get_type(void)43 gst_sctp_association_state_get_type (void)
44 {
45   static const GEnumValue values[] = {
46     {GST_SCTP_ASSOCIATION_STATE_NEW, "state-new", "state-new"},
47     {GST_SCTP_ASSOCIATION_STATE_READY, "state-ready", "state-ready"},
48     {GST_SCTP_ASSOCIATION_STATE_CONNECTING, "state-connecting",
49         "state-connecting"},
50     {GST_SCTP_ASSOCIATION_STATE_CONNECTED, "state-connected",
51         "state-connected"},
52     {GST_SCTP_ASSOCIATION_STATE_DISCONNECTING, "state-disconnecting",
53         "state-disconnecting"},
54     {GST_SCTP_ASSOCIATION_STATE_DISCONNECTED, "state-disconnected",
55         "state-disconnected"},
56     {GST_SCTP_ASSOCIATION_STATE_ERROR, "state-error", "state-error"},
57     {0, NULL, NULL}
58   };
59   static GType id = 0;
60 
61   if (g_once_init_enter ((gsize *) & id)) {
62     GType _id;
63     _id = g_enum_register_static ("GstSctpAssociationState", values);
64     g_once_init_leave ((gsize *) & id, _id);
65   }
66 
67   return id;
68 }
69 
70 G_DEFINE_TYPE (GstSctpAssociation, gst_sctp_association, G_TYPE_OBJECT);
71 
72 enum
73 {
74   SIGNAL_STREAM_RESET,
75   LAST_SIGNAL
76 };
77 
78 
79 enum
80 {
81   PROP_0,
82 
83   PROP_ASSOCIATION_ID,
84   PROP_LOCAL_PORT,
85   PROP_REMOTE_PORT,
86   PROP_STATE,
87   PROP_USE_SOCK_STREAM,
88 
89   NUM_PROPERTIES
90 };
91 
92 static guint signals[LAST_SIGNAL] = { 0 };
93 
94 static GParamSpec *properties[NUM_PROPERTIES];
95 
96 #define DEFAULT_NUMBER_OF_SCTP_STREAMS 1024
97 #define DEFAULT_LOCAL_SCTP_PORT 0
98 #define DEFAULT_REMOTE_SCTP_PORT 0
99 
100 static GHashTable *associations = NULL;
101 G_LOCK_DEFINE_STATIC (associations_lock);
102 static guint32 number_of_associations = 0;
103 
104 /* Interface implementations */
105 static void gst_sctp_association_finalize (GObject * object);
106 static void gst_sctp_association_set_property (GObject * object, guint prop_id,
107     const GValue * value, GParamSpec * pspec);
108 static void gst_sctp_association_get_property (GObject * object, guint prop_id,
109     GValue * value, GParamSpec * pspec);
110 
111 static struct socket *create_sctp_socket (GstSctpAssociation *
112     gst_sctp_association);
113 static struct sockaddr_conn get_sctp_socket_address (GstSctpAssociation *
114     gst_sctp_association, guint16 port);
115 static gboolean client_role_connect (GstSctpAssociation * self);
116 static int sctp_packet_out (void *addr, void *buffer, size_t length, guint8 tos,
117     guint8 set_df);
118 static int receive_cb (struct socket *sock, union sctp_sockstore addr,
119     void *data, size_t datalen, struct sctp_rcvinfo rcv_info, gint flags,
120     void *ulp_info);
121 static void handle_notification (GstSctpAssociation * self,
122     const union sctp_notification *notification, size_t length);
123 static void handle_association_changed (GstSctpAssociation * self,
124     const struct sctp_assoc_change *sac);
125 static void handle_stream_reset_event (GstSctpAssociation * self,
126     const struct sctp_stream_reset_event *ssr);
127 static void handle_message (GstSctpAssociation * self, guint8 * data,
128     guint32 datalen, guint16 stream_id, guint32 ppid);
129 
130 static void maybe_set_state_to_ready (GstSctpAssociation * self);
131 static gboolean gst_sctp_association_change_state (GstSctpAssociation * self,
132     GstSctpAssociationState new_state, gboolean lock);
133 
134 static void
gst_sctp_association_class_init(GstSctpAssociationClass * klass)135 gst_sctp_association_class_init (GstSctpAssociationClass * klass)
136 {
137   GObjectClass *gobject_class;
138 
139   gobject_class = (GObjectClass *) klass;
140 
141   gobject_class->finalize = gst_sctp_association_finalize;
142   gobject_class->set_property = gst_sctp_association_set_property;
143   gobject_class->get_property = gst_sctp_association_get_property;
144 
145   signals[SIGNAL_STREAM_RESET] =
146       g_signal_new ("stream-reset", G_OBJECT_CLASS_TYPE (klass),
147       G_SIGNAL_RUN_FIRST, G_STRUCT_OFFSET (GstSctpAssociationClass,
148           on_sctp_stream_reset), NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_UINT);
149 
150   properties[PROP_ASSOCIATION_ID] = g_param_spec_uint ("association-id",
151       "The SCTP association-id", "The SCTP association-id.", 0, G_MAXUSHORT,
152       DEFAULT_LOCAL_SCTP_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
153 
154   properties[PROP_LOCAL_PORT] = g_param_spec_uint ("local-port", "Local SCTP",
155       "The local SCTP port for this association", 0, G_MAXUSHORT,
156       DEFAULT_LOCAL_SCTP_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
157 
158   properties[PROP_REMOTE_PORT] =
159       g_param_spec_uint ("remote-port", "Remote SCTP",
160       "The remote SCTP port for this association", 0, G_MAXUSHORT,
161       DEFAULT_LOCAL_SCTP_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
162 
163   properties[PROP_STATE] = g_param_spec_enum ("state", "SCTP Association state",
164       "The state of the SCTP association", GST_SCTP_ASSOCIATION_STATE_TYPE,
165       GST_SCTP_ASSOCIATION_STATE_NEW,
166       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
167 
168   properties[PROP_USE_SOCK_STREAM] =
169       g_param_spec_boolean ("use-sock-stream", "Use sock-stream",
170       "When set to TRUE, a sequenced, reliable, connection-based connection is used."
171       "When TRUE the partial reliability parameters of the channel is ignored.",
172       FALSE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
173 
174   g_object_class_install_properties (gobject_class, NUM_PROPERTIES, properties);
175 }
176 
177 #if defined(SCTP_DEBUG) && !defined(GST_DISABLE_GST_DEBUG)
178 #define USRSCTP_GST_DEBUG_LEVEL GST_LEVEL_DEBUG
179 static void
gst_usrsctp_debug(const gchar * format,...)180 gst_usrsctp_debug (const gchar * format, ...)
181 {
182   va_list varargs;
183 
184   va_start (varargs, format);
185   gst_debug_log_valist (gst_sctp_debug_category, USRSCTP_GST_DEBUG_LEVEL,
186       __FILE__, GST_FUNCTION, __LINE__, NULL, format, varargs);
187   va_end (varargs);
188 }
189 #endif
190 
191 static void
gst_sctp_association_init(GstSctpAssociation * self)192 gst_sctp_association_init (GstSctpAssociation * self)
193 {
194   /* No need to lock mutex here as long as the function is only called from gst_sctp_association_get */
195   if (number_of_associations == 0) {
196 #if defined(SCTP_DEBUG) && !defined(GST_DISABLE_GST_DEBUG)
197     usrsctp_init (0, sctp_packet_out, gst_usrsctp_debug);
198 #else
199     usrsctp_init (0, sctp_packet_out, NULL);
200 #endif
201 
202     /* Explicit Congestion Notification */
203     usrsctp_sysctl_set_sctp_ecn_enable (0);
204 
205     /* Do not send ABORTs in response to INITs (1).
206      * Do not send ABORTs for received Out of the Blue packets (2).
207      */
208     usrsctp_sysctl_set_sctp_blackhole (2);
209 
210     /* Enable interleaving messages for different streams (incoming)
211      * See: https://tools.ietf.org/html/rfc6458#section-8.1.20
212      */
213     usrsctp_sysctl_set_sctp_default_frag_interleave (2);
214 
215     usrsctp_sysctl_set_sctp_nr_outgoing_streams_default
216         (DEFAULT_NUMBER_OF_SCTP_STREAMS);
217 
218 #if defined(SCTP_DEBUG) && !defined(GST_DISABLE_GST_DEBUG)
219     if (USRSCTP_GST_DEBUG_LEVEL <= GST_LEVEL_MAX
220         && USRSCTP_GST_DEBUG_LEVEL <= _gst_debug_min
221         && USRSCTP_GST_DEBUG_LEVEL <=
222         gst_debug_category_get_threshold (gst_sctp_debug_category)) {
223       usrsctp_sysctl_set_sctp_debug_on (SCTP_DEBUG_ALL);
224     }
225 #endif
226   }
227   number_of_associations++;
228 
229   self->local_port = DEFAULT_LOCAL_SCTP_PORT;
230   self->remote_port = DEFAULT_REMOTE_SCTP_PORT;
231   self->sctp_ass_sock = NULL;
232 
233   g_mutex_init (&self->association_mutex);
234 
235   self->state = GST_SCTP_ASSOCIATION_STATE_NEW;
236 
237   self->use_sock_stream = TRUE;
238 
239   usrsctp_register_address ((void *) self);
240 }
241 
242 static void
gst_sctp_association_finalize(GObject * object)243 gst_sctp_association_finalize (GObject * object)
244 {
245   GstSctpAssociation *self = GST_SCTP_ASSOCIATION (object);
246 
247   G_LOCK (associations_lock);
248 
249   g_hash_table_remove (associations, GUINT_TO_POINTER (self->association_id));
250 
251   usrsctp_deregister_address ((void *) self);
252   number_of_associations--;
253   if (number_of_associations == 0) {
254     usrsctp_finish ();
255   }
256   G_UNLOCK (associations_lock);
257 
258   G_OBJECT_CLASS (gst_sctp_association_parent_class)->finalize (object);
259 }
260 
261 static void
gst_sctp_association_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)262 gst_sctp_association_set_property (GObject * object, guint prop_id,
263     const GValue * value, GParamSpec * pspec)
264 {
265   GstSctpAssociation *self = GST_SCTP_ASSOCIATION (object);
266 
267   g_mutex_lock (&self->association_mutex);
268   if (self->state != GST_SCTP_ASSOCIATION_STATE_NEW) {
269     switch (prop_id) {
270       case PROP_LOCAL_PORT:
271       case PROP_REMOTE_PORT:
272         GST_ERROR_OBJECT (self, "These properties cannot be set in this state");
273         goto error;
274     }
275   }
276 
277   switch (prop_id) {
278     case PROP_ASSOCIATION_ID:
279       self->association_id = g_value_get_uint (value);
280       break;
281     case PROP_LOCAL_PORT:
282       self->local_port = g_value_get_uint (value);
283       break;
284     case PROP_REMOTE_PORT:
285       self->remote_port = g_value_get_uint (value);
286       break;
287     case PROP_STATE:
288       self->state = g_value_get_enum (value);
289       break;
290     case PROP_USE_SOCK_STREAM:
291       self->use_sock_stream = g_value_get_boolean (value);
292       break;
293     default:
294       G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
295       break;
296   }
297 
298   g_mutex_unlock (&self->association_mutex);
299   if (prop_id == PROP_LOCAL_PORT || prop_id == PROP_REMOTE_PORT)
300     maybe_set_state_to_ready (self);
301 
302   return;
303 
304 error:
305   g_mutex_unlock (&self->association_mutex);
306 }
307 
308 static void
maybe_set_state_to_ready(GstSctpAssociation * self)309 maybe_set_state_to_ready (GstSctpAssociation * self)
310 {
311   gboolean signal_ready_state = FALSE;
312 
313   g_mutex_lock (&self->association_mutex);
314   if ((self->state == GST_SCTP_ASSOCIATION_STATE_NEW) &&
315       (self->local_port != 0 && self->remote_port != 0)
316       && (self->packet_out_cb != NULL) && (self->packet_received_cb != NULL)) {
317     signal_ready_state =
318         gst_sctp_association_change_state (self,
319         GST_SCTP_ASSOCIATION_STATE_READY, FALSE);
320   }
321   g_mutex_unlock (&self->association_mutex);
322 
323   if (signal_ready_state)
324     g_object_notify_by_pspec (G_OBJECT (self), properties[PROP_STATE]);
325 
326 }
327 
328 static void
gst_sctp_association_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)329 gst_sctp_association_get_property (GObject * object, guint prop_id,
330     GValue * value, GParamSpec * pspec)
331 {
332   GstSctpAssociation *self = GST_SCTP_ASSOCIATION (object);
333 
334   switch (prop_id) {
335     case PROP_ASSOCIATION_ID:
336       g_value_set_uint (value, self->association_id);
337       break;
338     case PROP_LOCAL_PORT:
339       g_value_set_uint (value, self->local_port);
340       break;
341     case PROP_REMOTE_PORT:
342       g_value_set_uint (value, self->remote_port);
343       break;
344     case PROP_STATE:
345       g_value_set_enum (value, self->state);
346       break;
347     case PROP_USE_SOCK_STREAM:
348       g_value_set_boolean (value, self->use_sock_stream);
349       break;
350     default:
351       G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
352       break;
353   }
354 }
355 
356 /* Public functions */
357 
358 GstSctpAssociation *
gst_sctp_association_get(guint32 association_id)359 gst_sctp_association_get (guint32 association_id)
360 {
361   GstSctpAssociation *association;
362 
363   G_LOCK (associations_lock);
364   GST_DEBUG_CATEGORY_INIT (gst_sctp_association_debug_category,
365       "sctpassociation", 0, "debug category for sctpassociation");
366   GST_DEBUG_CATEGORY_INIT (gst_sctp_debug_category,
367       "sctplib", 0, "debug category for messages from usrsctp");
368 
369   if (!associations) {
370     associations =
371         g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, NULL);
372   }
373 
374   association =
375       g_hash_table_lookup (associations, GUINT_TO_POINTER (association_id));
376   if (!association) {
377     association =
378         g_object_new (GST_SCTP_TYPE_ASSOCIATION, "association-id",
379         association_id, NULL);
380     g_hash_table_insert (associations, GUINT_TO_POINTER (association_id),
381         association);
382   } else {
383     g_object_ref (association);
384   }
385   G_UNLOCK (associations_lock);
386   return association;
387 }
388 
389 gboolean
gst_sctp_association_start(GstSctpAssociation * self)390 gst_sctp_association_start (GstSctpAssociation * self)
391 {
392   if (self->state != GST_SCTP_ASSOCIATION_STATE_READY) {
393     GST_WARNING_OBJECT (self,
394         "SCTP association is in wrong state and cannot be started");
395     goto configure_required;
396   }
397 
398   if ((self->sctp_ass_sock = create_sctp_socket (self)) == NULL)
399     goto error;
400 
401   /* TODO: Support both server and client role */
402   if (!client_role_connect (self)) {
403     gst_sctp_association_change_state (self, GST_SCTP_ASSOCIATION_STATE_ERROR,
404         TRUE);
405     goto error;
406   }
407 
408   gst_sctp_association_change_state (self,
409       GST_SCTP_ASSOCIATION_STATE_CONNECTING, TRUE);
410 
411   return TRUE;
412 error:
413   gst_sctp_association_change_state (self, GST_SCTP_ASSOCIATION_STATE_ERROR,
414       TRUE);
415   return FALSE;
416 configure_required:
417   return FALSE;
418 }
419 
420 void
gst_sctp_association_set_on_packet_out(GstSctpAssociation * self,GstSctpAssociationPacketOutCb packet_out_cb,gpointer user_data,GDestroyNotify destroy_notify)421 gst_sctp_association_set_on_packet_out (GstSctpAssociation * self,
422     GstSctpAssociationPacketOutCb packet_out_cb, gpointer user_data,
423     GDestroyNotify destroy_notify)
424 {
425   g_return_if_fail (GST_SCTP_IS_ASSOCIATION (self));
426 
427   g_mutex_lock (&self->association_mutex);
428   if (self->packet_out_destroy_notify)
429     self->packet_out_destroy_notify (self->packet_out_user_data);
430   self->packet_out_cb = packet_out_cb;
431   self->packet_out_user_data = user_data;
432   self->packet_out_destroy_notify = destroy_notify;
433   g_mutex_unlock (&self->association_mutex);
434 
435   maybe_set_state_to_ready (self);
436 }
437 
438 void
gst_sctp_association_set_on_packet_received(GstSctpAssociation * self,GstSctpAssociationPacketReceivedCb packet_received_cb,gpointer user_data,GDestroyNotify destroy_notify)439 gst_sctp_association_set_on_packet_received (GstSctpAssociation * self,
440     GstSctpAssociationPacketReceivedCb packet_received_cb, gpointer user_data,
441     GDestroyNotify destroy_notify)
442 {
443   g_return_if_fail (GST_SCTP_IS_ASSOCIATION (self));
444 
445   g_mutex_lock (&self->association_mutex);
446   if (self->packet_received_destroy_notify)
447     self->packet_received_destroy_notify (self->packet_received_user_data);
448   self->packet_received_cb = packet_received_cb;
449   self->packet_received_user_data = user_data;
450   self->packet_received_destroy_notify = destroy_notify;
451   g_mutex_unlock (&self->association_mutex);
452 
453   maybe_set_state_to_ready (self);
454 }
455 
456 void
gst_sctp_association_incoming_packet(GstSctpAssociation * self,const guint8 * buf,guint32 length)457 gst_sctp_association_incoming_packet (GstSctpAssociation * self,
458     const guint8 * buf, guint32 length)
459 {
460   usrsctp_conninput ((void *) self, (const void *) buf, (size_t) length, 0);
461 }
462 
463 GstFlowReturn
gst_sctp_association_send_data(GstSctpAssociation * self,const guint8 * buf,guint32 length,guint16 stream_id,guint32 ppid,gboolean ordered,GstSctpAssociationPartialReliability pr,guint32 reliability_param,guint32 * bytes_sent_)464 gst_sctp_association_send_data (GstSctpAssociation * self, const guint8 * buf,
465     guint32 length, guint16 stream_id, guint32 ppid, gboolean ordered,
466     GstSctpAssociationPartialReliability pr, guint32 reliability_param,
467     guint32 * bytes_sent_)
468 {
469   GstFlowReturn flow_ret;
470   struct sctp_sendv_spa spa;
471   gint32 bytes_sent = 0;
472   struct sockaddr_conn remote_addr;
473 
474   g_mutex_lock (&self->association_mutex);
475   if (self->state != GST_SCTP_ASSOCIATION_STATE_CONNECTED) {
476     if (self->state == GST_SCTP_ASSOCIATION_STATE_DISCONNECTED ||
477         self->state == GST_SCTP_ASSOCIATION_STATE_DISCONNECTING) {
478       GST_ERROR_OBJECT (self, "Disconnected");
479       flow_ret = GST_FLOW_EOS;
480       g_mutex_unlock (&self->association_mutex);
481       goto end;
482     } else {
483       GST_ERROR_OBJECT (self, "Association not connected yet");
484       flow_ret = GST_FLOW_ERROR;
485       g_mutex_unlock (&self->association_mutex);
486       goto end;
487     }
488   }
489   remote_addr = get_sctp_socket_address (self, self->remote_port);
490   g_mutex_unlock (&self->association_mutex);
491 
492   /* TODO: We probably want to split too large chunks into multiple packets
493    * and only set the SCTP_EOR flag on the last one. Firefox is using 0x4000
494    * as the maximum packet size
495    */
496   memset (&spa, 0, sizeof (spa));
497 
498   spa.sendv_sndinfo.snd_ppid = g_htonl (ppid);
499   spa.sendv_sndinfo.snd_sid = stream_id;
500   spa.sendv_sndinfo.snd_flags = SCTP_EOR | (ordered ? 0 : SCTP_UNORDERED);
501   spa.sendv_sndinfo.snd_context = 0;
502   spa.sendv_sndinfo.snd_assoc_id = 0;
503   spa.sendv_flags = SCTP_SEND_SNDINFO_VALID;
504   if (pr != GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_NONE) {
505     spa.sendv_flags |= SCTP_SEND_PRINFO_VALID;
506     spa.sendv_prinfo.pr_value = g_htonl (reliability_param);
507     if (pr == GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_TTL)
508       spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_TTL;
509     else if (pr == GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_RTX)
510       spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_RTX;
511     else if (pr == GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_BUF)
512       spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_BUF;
513   }
514 
515   bytes_sent =
516       usrsctp_sendv (self->sctp_ass_sock, buf, length,
517       (struct sockaddr *) &remote_addr, 1, (void *) &spa,
518       (socklen_t) sizeof (struct sctp_sendv_spa), SCTP_SENDV_SPA, 0);
519   if (bytes_sent < 0) {
520     if (errno == EAGAIN || errno == EWOULDBLOCK) {
521       bytes_sent = 0;
522       /* Resending this buffer is taken care of by the gstsctpenc */
523       flow_ret = GST_FLOW_OK;
524       goto end;
525     } else {
526       GST_ERROR_OBJECT (self, "Error sending data on stream %u: (%u) %s",
527           stream_id, errno, g_strerror (errno));
528       flow_ret = GST_FLOW_ERROR;
529       goto end;
530     }
531   }
532   flow_ret = GST_FLOW_OK;
533 
534 end:
535   if (bytes_sent_)
536     *bytes_sent_ = bytes_sent;
537 
538   return flow_ret;
539 }
540 
541 void
gst_sctp_association_reset_stream(GstSctpAssociation * self,guint16 stream_id)542 gst_sctp_association_reset_stream (GstSctpAssociation * self, guint16 stream_id)
543 {
544   struct sctp_reset_streams *srs;
545   socklen_t length;
546 
547   length = (socklen_t) (sizeof (struct sctp_reset_streams) + sizeof (guint16));
548   srs = (struct sctp_reset_streams *) g_malloc0 (length);
549   srs->srs_assoc_id = SCTP_ALL_ASSOC;
550   srs->srs_flags = SCTP_STREAM_RESET_OUTGOING;
551   srs->srs_number_streams = 1;
552   srs->srs_stream_list[0] = stream_id;
553 
554   usrsctp_setsockopt (self->sctp_ass_sock, IPPROTO_SCTP, SCTP_RESET_STREAMS,
555       srs, length);
556 
557   g_free (srs);
558 }
559 
560 void
gst_sctp_association_force_close(GstSctpAssociation * self)561 gst_sctp_association_force_close (GstSctpAssociation * self)
562 {
563   if (self->sctp_ass_sock) {
564     struct socket *s = self->sctp_ass_sock;
565     self->sctp_ass_sock = NULL;
566     usrsctp_close (s);
567   }
568 
569   gst_sctp_association_change_state (self,
570       GST_SCTP_ASSOCIATION_STATE_DISCONNECTED, TRUE);
571 }
572 
573 static struct socket *
create_sctp_socket(GstSctpAssociation * self)574 create_sctp_socket (GstSctpAssociation * self)
575 {
576   struct socket *sock;
577   struct linger l;
578   struct sctp_event event;
579   struct sctp_assoc_value stream_reset;
580   int buf_size = 1024 * 1024;
581   int value = 1;
582   guint16 event_types[] = {
583     SCTP_ASSOC_CHANGE,
584     SCTP_PEER_ADDR_CHANGE,
585     SCTP_REMOTE_ERROR,
586     SCTP_SEND_FAILED,
587     SCTP_SEND_FAILED_EVENT,
588     SCTP_SHUTDOWN_EVENT,
589     SCTP_ADAPTATION_INDICATION,
590     SCTP_PARTIAL_DELIVERY_EVENT,
591     /*SCTP_AUTHENTICATION_EVENT, */
592     SCTP_STREAM_RESET_EVENT,
593     /*SCTP_SENDER_DRY_EVENT, */
594     /*SCTP_NOTIFICATIONS_STOPPED_EVENT, */
595     /*SCTP_ASSOC_RESET_EVENT, */
596     SCTP_STREAM_CHANGE_EVENT
597   };
598   guint32 i;
599   guint sock_type = self->use_sock_stream ? SOCK_STREAM : SOCK_SEQPACKET;
600 
601   if ((sock =
602           usrsctp_socket (AF_CONN, sock_type, IPPROTO_SCTP, receive_cb, NULL, 0,
603               (void *) self)) == NULL) {
604     GST_ERROR_OBJECT (self, "Could not open SCTP socket: (%u) %s", errno,
605         g_strerror (errno));
606     goto error;
607   }
608 
609   if (usrsctp_setsockopt (sock, SOL_SOCKET, SO_RCVBUF,
610           (const void *) &buf_size, sizeof (buf_size)) < 0) {
611     GST_ERROR_OBJECT (self, "Could not change receive buffer size: (%u) %s",
612         errno, g_strerror (errno));
613     goto error;
614   }
615   if (usrsctp_setsockopt (sock, SOL_SOCKET, SO_SNDBUF,
616           (const void *) &buf_size, sizeof (buf_size)) < 0) {
617     GST_ERROR_OBJECT (self, "Could not change send buffer size: (%u) %s",
618         errno, g_strerror (errno));
619     goto error;
620   }
621 
622   /* Properly return errors */
623   if (usrsctp_set_non_blocking (sock, 1) < 0) {
624     GST_ERROR_OBJECT (self,
625         "Could not set non-blocking mode on SCTP socket: (%u) %s", errno,
626         g_strerror (errno));
627     goto error;
628   }
629 
630   memset (&l, 0, sizeof (l));
631   l.l_onoff = 1;
632   l.l_linger = 0;
633   if (usrsctp_setsockopt (sock, SOL_SOCKET, SO_LINGER, (const void *) &l,
634           (socklen_t) sizeof (struct linger)) < 0) {
635     GST_ERROR_OBJECT (self, "Could not set SO_LINGER on SCTP socket: (%u) %s",
636         errno, g_strerror (errno));
637     goto error;
638   }
639 
640   if (usrsctp_setsockopt (sock, IPPROTO_SCTP, SCTP_REUSE_PORT, &value,
641           sizeof (int))) {
642     GST_DEBUG_OBJECT (self, "Could not set SCTP_REUSE_PORT: (%u) %s", errno,
643         g_strerror (errno));
644   }
645 
646   if (usrsctp_setsockopt (sock, IPPROTO_SCTP, SCTP_NODELAY, &value,
647           sizeof (int))) {
648     GST_DEBUG_OBJECT (self, "Could not set SCTP_NODELAY: (%u) %s", errno,
649         g_strerror (errno));
650     goto error;
651   }
652 
653   if (usrsctp_setsockopt (sock, IPPROTO_SCTP, SCTP_EXPLICIT_EOR, &value,
654           sizeof (int))) {
655     GST_ERROR_OBJECT (self, "Could not set SCTP_EXPLICIT_EOR: (%u) %s", errno,
656         g_strerror (errno));
657     goto error;
658   }
659 
660   memset (&stream_reset, 0, sizeof (stream_reset));
661   stream_reset.assoc_id = SCTP_ALL_ASSOC;
662   stream_reset.assoc_value =
663       SCTP_ENABLE_RESET_STREAM_REQ | SCTP_ENABLE_CHANGE_ASSOC_REQ;
664   if (usrsctp_setsockopt (sock, IPPROTO_SCTP, SCTP_ENABLE_STREAM_RESET,
665           &stream_reset, sizeof (stream_reset))) {
666     GST_ERROR_OBJECT (self,
667         "Could not set SCTP_ENABLE_STREAM_RESET | SCTP_ENABLE_CHANGE_ASSOC_REQ: (%u) %s",
668         errno, g_strerror (errno));
669     goto error;
670   }
671 
672   memset (&event, 0, sizeof (event));
673   event.se_assoc_id = SCTP_ALL_ASSOC;
674   event.se_on = 1;
675   for (i = 0; i < sizeof (event_types) / sizeof (event_types[0]); i++) {
676     event.se_type = event_types[i];
677     if (usrsctp_setsockopt (sock, IPPROTO_SCTP, SCTP_EVENT,
678             &event, sizeof (event)) < 0) {
679       GST_ERROR_OBJECT (self, "Failed to register event %u: (%u) %s",
680           event_types[i], errno, g_strerror (errno));
681     }
682   }
683 
684   return sock;
685 error:
686   if (sock)
687     usrsctp_close (sock);
688   return NULL;
689 }
690 
691 static struct sockaddr_conn
get_sctp_socket_address(GstSctpAssociation * gst_sctp_association,guint16 port)692 get_sctp_socket_address (GstSctpAssociation * gst_sctp_association,
693     guint16 port)
694 {
695   struct sockaddr_conn addr;
696 
697   memset ((void *) &addr, 0, sizeof (struct sockaddr_conn));
698 #ifdef __APPLE__
699   addr.sconn_len = sizeof (struct sockaddr_conn);
700 #endif
701   addr.sconn_family = AF_CONN;
702   addr.sconn_port = g_htons (port);
703   addr.sconn_addr = (void *) gst_sctp_association;
704 
705   return addr;
706 }
707 
708 static gboolean
client_role_connect(GstSctpAssociation * self)709 client_role_connect (GstSctpAssociation * self)
710 {
711   struct sockaddr_conn local_addr, remote_addr;
712   struct sctp_paddrparams paddrparams;
713   socklen_t opt_len;
714   gint ret;
715 
716   g_mutex_lock (&self->association_mutex);
717   local_addr = get_sctp_socket_address (self, self->local_port);
718   remote_addr = get_sctp_socket_address (self, self->remote_port);
719   g_mutex_unlock (&self->association_mutex);
720 
721   ret =
722       usrsctp_bind (self->sctp_ass_sock, (struct sockaddr *) &local_addr,
723       sizeof (struct sockaddr_conn));
724   if (ret < 0) {
725     GST_ERROR_OBJECT (self, "usrsctp_bind() error: (%u) %s", errno,
726         g_strerror (errno));
727     goto error;
728   }
729 
730   ret =
731       usrsctp_connect (self->sctp_ass_sock, (struct sockaddr *) &remote_addr,
732       sizeof (struct sockaddr_conn));
733   if (ret < 0 && errno != EINPROGRESS) {
734     GST_ERROR_OBJECT (self, "usrsctp_connect() error: (%u) %s", errno,
735         g_strerror (errno));
736     goto error;
737   }
738 
739   memset (&paddrparams, 0, sizeof (struct sctp_paddrparams));
740   memcpy (&paddrparams.spp_address, &remote_addr,
741       sizeof (struct sockaddr_conn));
742   opt_len = (socklen_t) sizeof (struct sctp_paddrparams);
743   ret =
744       usrsctp_getsockopt (self->sctp_ass_sock, IPPROTO_SCTP,
745       SCTP_PEER_ADDR_PARAMS, &paddrparams, &opt_len);
746   if (ret < 0) {
747     GST_WARNING_OBJECT (self,
748         "usrsctp_getsockopt(SCTP_PEER_ADDR_PARAMS) error: (%u) %s", errno,
749         g_strerror (errno));
750   } else {
751     /* draft-ietf-rtcweb-data-channel-13 section 5: max initial MTU IPV4 1200, IPV6 1280 */
752     paddrparams.spp_pathmtu = 1200;
753     paddrparams.spp_flags &= ~SPP_PMTUD_ENABLE;
754     paddrparams.spp_flags |= SPP_PMTUD_DISABLE;
755     opt_len = (socklen_t) sizeof (struct sctp_paddrparams);
756     ret = usrsctp_setsockopt (self->sctp_ass_sock, IPPROTO_SCTP,
757         SCTP_PEER_ADDR_PARAMS, &paddrparams, opt_len);
758     if (ret < 0) {
759       GST_WARNING_OBJECT (self,
760           "usrsctp_setsockopt(SCTP_PEER_ADDR_PARAMS) error: (%u) %s", errno,
761           g_strerror (errno));
762     } else {
763       GST_DEBUG_OBJECT (self, "PMTUD disabled, MTU set to %u",
764           paddrparams.spp_pathmtu);
765     }
766   }
767 
768   return TRUE;
769 error:
770   return FALSE;
771 }
772 
773 static int
sctp_packet_out(void * addr,void * buffer,size_t length,guint8 tos,guint8 set_df)774 sctp_packet_out (void *addr, void *buffer, size_t length, guint8 tos,
775     guint8 set_df)
776 {
777   GstSctpAssociation *self = GST_SCTP_ASSOCIATION (addr);
778 
779   g_mutex_lock (&self->association_mutex);
780   if (self->packet_out_cb) {
781     self->packet_out_cb (self, buffer, length, self->packet_out_user_data);
782   }
783   g_mutex_unlock (&self->association_mutex);
784 
785   return 0;
786 }
787 
788 static int
receive_cb(struct socket * sock,union sctp_sockstore addr,void * data,size_t datalen,struct sctp_rcvinfo rcv_info,gint flags,void * ulp_info)789 receive_cb (struct socket *sock, union sctp_sockstore addr, void *data,
790     size_t datalen, struct sctp_rcvinfo rcv_info, gint flags, void *ulp_info)
791 {
792   GstSctpAssociation *self = GST_SCTP_ASSOCIATION (ulp_info);
793 
794   if (!data) {
795     /* Not sure if this can happend. */
796     GST_WARNING_OBJECT (self, "Received empty data buffer");
797   } else {
798     if (flags & MSG_NOTIFICATION) {
799       handle_notification (self, (const union sctp_notification *) data,
800           datalen);
801 
802       /* We use this instead of a bare `free()` so that we use the `free` from
803        * the C runtime that usrsctp was built with. This makes a difference on
804        * Windows where libusrstcp and GStreamer can be linked to two different
805        * CRTs. */
806       usrsctp_freedumpbuffer (data);
807     } else {
808       handle_message (self, data, datalen, rcv_info.rcv_sid,
809           ntohl (rcv_info.rcv_ppid));
810     }
811   }
812 
813   return 1;
814 }
815 
816 static void
handle_notification(GstSctpAssociation * self,const union sctp_notification * notification,size_t length)817 handle_notification (GstSctpAssociation * self,
818     const union sctp_notification *notification, size_t length)
819 {
820   g_assert (notification->sn_header.sn_length == length);
821 
822   switch (notification->sn_header.sn_type) {
823     case SCTP_ASSOC_CHANGE:
824       GST_DEBUG_OBJECT (self, "Event: SCTP_ASSOC_CHANGE");
825       handle_association_changed (self, &notification->sn_assoc_change);
826       break;
827     case SCTP_PEER_ADDR_CHANGE:
828       GST_DEBUG_OBJECT (self, "Event: SCTP_PEER_ADDR_CHANGE");
829       break;
830     case SCTP_REMOTE_ERROR:
831       GST_ERROR_OBJECT (self, "Event: SCTP_REMOTE_ERROR (%u)",
832           notification->sn_remote_error.sre_error);
833       break;
834     case SCTP_SEND_FAILED:
835       GST_ERROR_OBJECT (self, "Event: SCTP_SEND_FAILED");
836       break;
837     case SCTP_SHUTDOWN_EVENT:
838       GST_DEBUG_OBJECT (self, "Event: SCTP_SHUTDOWN_EVENT");
839       gst_sctp_association_change_state (self,
840           GST_SCTP_ASSOCIATION_STATE_DISCONNECTING, TRUE);
841       break;
842     case SCTP_ADAPTATION_INDICATION:
843       GST_DEBUG_OBJECT (self, "Event: SCTP_ADAPTATION_INDICATION");
844       break;
845     case SCTP_PARTIAL_DELIVERY_EVENT:
846       GST_DEBUG_OBJECT (self, "Event: SCTP_PARTIAL_DELIVERY_EVENT");
847       break;
848     case SCTP_AUTHENTICATION_EVENT:
849       GST_DEBUG_OBJECT (self, "Event: SCTP_AUTHENTICATION_EVENT");
850       break;
851     case SCTP_STREAM_RESET_EVENT:
852       GST_DEBUG_OBJECT (self, "Event: SCTP_STREAM_RESET_EVENT");
853       handle_stream_reset_event (self, &notification->sn_strreset_event);
854       break;
855     case SCTP_SENDER_DRY_EVENT:
856       GST_DEBUG_OBJECT (self, "Event: SCTP_SENDER_DRY_EVENT");
857       break;
858     case SCTP_NOTIFICATIONS_STOPPED_EVENT:
859       GST_DEBUG_OBJECT (self, "Event: SCTP_NOTIFICATIONS_STOPPED_EVENT");
860       break;
861     case SCTP_ASSOC_RESET_EVENT:
862       GST_DEBUG_OBJECT (self, "Event: SCTP_ASSOC_RESET_EVENT");
863       break;
864     case SCTP_STREAM_CHANGE_EVENT:
865       GST_DEBUG_OBJECT (self, "Event: SCTP_STREAM_CHANGE_EVENT");
866       break;
867     case SCTP_SEND_FAILED_EVENT:
868       GST_ERROR_OBJECT (self, "Event: SCTP_SEND_FAILED_EVENT (%u)",
869           notification->sn_send_failed_event.ssfe_error);
870       break;
871     default:
872       break;
873   }
874 }
875 
876 static void
handle_association_changed(GstSctpAssociation * self,const struct sctp_assoc_change * sac)877 handle_association_changed (GstSctpAssociation * self,
878     const struct sctp_assoc_change *sac)
879 {
880   gboolean change_state = FALSE;
881   GstSctpAssociationState new_state;
882 
883   switch (sac->sac_state) {
884     case SCTP_COMM_UP:
885       GST_DEBUG_OBJECT (self, "SCTP_COMM_UP");
886       g_mutex_lock (&self->association_mutex);
887       if (self->state == GST_SCTP_ASSOCIATION_STATE_CONNECTING) {
888         change_state = TRUE;
889         new_state = GST_SCTP_ASSOCIATION_STATE_CONNECTED;
890         GST_DEBUG_OBJECT (self, "SCTP association connected!");
891       } else if (self->state == GST_SCTP_ASSOCIATION_STATE_CONNECTED) {
892         GST_FIXME_OBJECT (self, "SCTP association already open");
893       } else {
894         GST_WARNING_OBJECT (self, "SCTP association in unexpected state");
895       }
896       g_mutex_unlock (&self->association_mutex);
897       break;
898     case SCTP_COMM_LOST:
899       GST_WARNING_OBJECT (self, "SCTP event SCTP_COMM_LOST received");
900       change_state = TRUE;
901       new_state = GST_SCTP_ASSOCIATION_STATE_ERROR;
902       break;
903     case SCTP_RESTART:
904       GST_DEBUG_OBJECT (self, "SCTP event SCTP_RESTART received");
905       break;
906     case SCTP_SHUTDOWN_COMP:
907       GST_DEBUG_OBJECT (self, "SCTP event SCTP_SHUTDOWN_COMP received");
908       change_state = TRUE;
909       new_state = GST_SCTP_ASSOCIATION_STATE_DISCONNECTED;
910       break;
911     case SCTP_CANT_STR_ASSOC:
912       GST_WARNING_OBJECT (self, "SCTP event SCTP_CANT_STR_ASSOC received");
913       change_state = TRUE;
914       new_state = GST_SCTP_ASSOCIATION_STATE_ERROR;
915       break;
916   }
917 
918   if (change_state)
919     gst_sctp_association_change_state (self, new_state, TRUE);
920 }
921 
922 static void
handle_stream_reset_event(GstSctpAssociation * self,const struct sctp_stream_reset_event * sr)923 handle_stream_reset_event (GstSctpAssociation * self,
924     const struct sctp_stream_reset_event *sr)
925 {
926   guint32 i, n;
927   if (!(sr->strreset_flags & SCTP_STREAM_RESET_DENIED) &&
928       !(sr->strreset_flags & SCTP_STREAM_RESET_DENIED)) {
929     n = (sr->strreset_length -
930         sizeof (struct sctp_stream_reset_event)) / sizeof (uint16_t);
931     for (i = 0; i < n; i++) {
932       if (sr->strreset_flags & SCTP_STREAM_RESET_INCOMING_SSN) {
933         g_signal_emit (self, signals[SIGNAL_STREAM_RESET], 0,
934             sr->strreset_stream_list[i]);
935       }
936     }
937   }
938 }
939 
940 static void
handle_message(GstSctpAssociation * self,guint8 * data,guint32 datalen,guint16 stream_id,guint32 ppid)941 handle_message (GstSctpAssociation * self, guint8 * data, guint32 datalen,
942     guint16 stream_id, guint32 ppid)
943 {
944   g_mutex_lock (&self->association_mutex);
945   if (self->packet_received_cb) {
946     /* It's the callbacks job to free the data correctly */
947     self->packet_received_cb (self, data, datalen, stream_id, ppid,
948         self->packet_received_user_data);
949   } else {
950     /* We use this instead of a bare `free()` so that we use the `free` from
951      * the C runtime that usrsctp was built with. This makes a difference on
952      * Windows where libusrstcp and GStreamer can be linked to two different
953      * CRTs. */
954     usrsctp_freedumpbuffer ((gchar *) data);
955   }
956   g_mutex_unlock (&self->association_mutex);
957 }
958 
959 /* Returns TRUE if lock==FALSE and notification is needed later.
960  * Takes the mutex shortly if lock==TRUE! */
961 static gboolean
gst_sctp_association_change_state(GstSctpAssociation * self,GstSctpAssociationState new_state,gboolean lock)962 gst_sctp_association_change_state (GstSctpAssociation * self,
963     GstSctpAssociationState new_state, gboolean lock)
964 {
965   if (lock)
966     g_mutex_lock (&self->association_mutex);
967   if (self->state != new_state
968       && self->state != GST_SCTP_ASSOCIATION_STATE_ERROR) {
969     self->state = new_state;
970     if (lock) {
971       g_mutex_unlock (&self->association_mutex);
972       g_object_notify_by_pspec (G_OBJECT (self), properties[PROP_STATE]);
973       return FALSE;
974     } else {
975       return TRUE;
976     }
977   } else {
978     if (lock)
979       g_mutex_unlock (&self->association_mutex);
980     return FALSE;
981   }
982 }
983