• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer
2  * Copyright (C) 2018 Matthew Waters <matthew@centricular.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19 
20 /**
21  * SECTION:gstwebrtc-datachannel
22  * @short_description: RTCDataChannel object
23  * @title: GstWebRTCDataChannel
24  * @see_also: #GstWebRTCRTPTransceiver
25  *
26  * <http://w3c.github.io/webrtc-pc/#dom-rtcsctptransport>
27  */
28 
29 #ifdef HAVE_CONFIG_H
30 # include "config.h"
31 #endif
32 
33 #include "webrtcdatachannel.h"
34 #include <gst/app/gstappsink.h>
35 #include <gst/app/gstappsrc.h>
36 #include <gst/base/gstbytereader.h>
37 #include <gst/base/gstbytewriter.h>
38 #include <gst/sctp/sctpreceivemeta.h>
39 #include <gst/sctp/sctpsendmeta.h>
40 
41 #include "gstwebrtcbin.h"
42 #include "utils.h"
43 
/* Debug category used by the GST_*_OBJECT logging calls in this file. */
#define GST_CAT_DEFAULT webrtc_data_channel_debug
GST_DEBUG_CATEGORY_STATIC (GST_CAT_DEFAULT);

/* Type registration for WebRTCDataChannel, whose parent type is
 * GST_TYPE_WEBRTC_DATA_CHANNEL.  The extra code argument initialises the
 * "webrtcdatachannel" debug category. */
#define webrtc_data_channel_parent_class parent_class
G_DEFINE_TYPE_WITH_CODE (WebRTCDataChannel, webrtc_data_channel,
    GST_TYPE_WEBRTC_DATA_CHANNEL,
    GST_DEBUG_CATEGORY_INIT (webrtc_data_channel_debug, "webrtcdatachannel", 0,
        "webrtcdatachannel"););
52 
/* SCTP payload protocol identifiers carried in the send/receive meta of
 * data channel buffers.  They select how a received buffer is interpreted
 * (control message, string or binary payload). */
typedef enum
{
  /* channel establishment (open / ack) messages */
  DATA_CHANNEL_PPID_WEBRTC_CONTROL = 50,

  /* payloads with content */
  DATA_CHANNEL_PPID_WEBRTC_STRING = 51,
  DATA_CHANNEL_PPID_WEBRTC_BINARY = 53,

  /* payloads without content (sent when the caller passes NULL) */
  DATA_CHANNEL_PPID_WEBRTC_BINARY_EMPTY = 56,
  DATA_CHANNEL_PPID_WEBRTC_STRING_EMPTY = 57,

  /* deprecated identifiers, still accepted on receive */
  DATA_CHANNEL_PPID_WEBRTC_BINARY_PARTIAL = 52,
  DATA_CHANNEL_PPID_WEBRTC_STRING_PARTIAL = 54,
} DataChannelPPID;
63 
/* Values of the "Channel Type" byte of the channel open message.  The low
 * bits select the reliability mode (0x01 retransmit-limited, 0x02
 * lifetime-limited) and bit 0x80 marks the channel as unordered. */
typedef enum
{
  /* ordered delivery */
  CHANNEL_TYPE_RELIABLE = 0x00,
  CHANNEL_TYPE_PARTIAL_RELIABLE_REXMIT = 0x01,
  CHANNEL_TYPE_PARTIAL_RELIABLE_TIMED = 0x02,

  /* unordered delivery: same modes with bit 0x80 set */
  CHANNEL_TYPE_RELIABLE_UNORDERED = 0x80,
  CHANNEL_TYPE_PARTIAL_RELIABLE_REXMIT_UNORDERED = 0x81,
  CHANNEL_TYPE_PARTIAL_RELIABLE_TIMED_UNORDERED = 0x82,
} DataChannelReliabilityType;
73 
/* Message type byte that starts every control-protocol packet. */
typedef enum
{
  /* request to open a channel; carries the channel parameters */
  CHANNEL_MESSAGE_OPEN = 0x03,
  /* one byte acknowledgement sent in response to an open message */
  CHANNEL_MESSAGE_ACK = 0x02,
} DataChannelMessage;
79 
80 static guint16
priority_type_to_uint(GstWebRTCPriorityType pri)81 priority_type_to_uint (GstWebRTCPriorityType pri)
82 {
83   switch (pri) {
84     case GST_WEBRTC_PRIORITY_TYPE_VERY_LOW:
85       return 64;
86     case GST_WEBRTC_PRIORITY_TYPE_LOW:
87       return 192;
88     case GST_WEBRTC_PRIORITY_TYPE_MEDIUM:
89       return 384;
90     case GST_WEBRTC_PRIORITY_TYPE_HIGH:
91       return 768;
92   }
93   g_assert_not_reached ();
94   return 0;
95 }
96 
97 static GstWebRTCPriorityType
priority_uint_to_type(guint16 val)98 priority_uint_to_type (guint16 val)
99 {
100   if (val <= 128)
101     return GST_WEBRTC_PRIORITY_TYPE_VERY_LOW;
102   if (val <= 256)
103     return GST_WEBRTC_PRIORITY_TYPE_LOW;
104   if (val <= 512)
105     return GST_WEBRTC_PRIORITY_TYPE_MEDIUM;
106   return GST_WEBRTC_PRIORITY_TYPE_HIGH;
107 }
108 
109 static GstBuffer *
construct_open_packet(WebRTCDataChannel * channel)110 construct_open_packet (WebRTCDataChannel * channel)
111 {
112   GstByteWriter w;
113   gsize label_len = strlen (channel->parent.label);
114   gsize proto_len = strlen (channel->parent.protocol);
115   gsize size = 12 + label_len + proto_len;
116   DataChannelReliabilityType reliability = 0;
117   guint32 reliability_param = 0;
118   guint16 priority;
119   GstBuffer *buf;
120 
121 /*
122  *    0                   1                   2                   3
123  *    0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
124  *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
125  *   |  Message Type |  Channel Type |            Priority           |
126  *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
127  *   |                    Reliability Parameter                      |
128  *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
129  *   |         Label Length          |       Protocol Length         |
130  *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
131  *   \                                                               /
132  *   |                             Label                             |
133  *   /                                                               \
134  *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
135  *   \                                                               /
136  *   |                            Protocol                           |
137  *   /                                                               \
138  *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
139  */
140 
141   gst_byte_writer_init_with_size (&w, size, FALSE);
142 
143   if (!gst_byte_writer_put_uint8 (&w, (guint8) CHANNEL_MESSAGE_OPEN))
144     g_return_val_if_reached (NULL);
145 
146   if (!channel->parent.ordered)
147     reliability |= 0x80;
148   if (channel->parent.max_retransmits != -1) {
149     reliability |= 0x01;
150     reliability_param = channel->parent.max_retransmits;
151   }
152   if (channel->parent.max_packet_lifetime != -1) {
153     reliability |= 0x02;
154     reliability_param = channel->parent.max_packet_lifetime;
155   }
156 
157   priority = priority_type_to_uint (channel->parent.priority);
158 
159   if (!gst_byte_writer_put_uint8 (&w, (guint8) reliability))
160     g_return_val_if_reached (NULL);
161   if (!gst_byte_writer_put_uint16_be (&w, (guint16) priority))
162     g_return_val_if_reached (NULL);
163   if (!gst_byte_writer_put_uint32_be (&w, (guint32) reliability_param))
164     g_return_val_if_reached (NULL);
165   if (!gst_byte_writer_put_uint16_be (&w, (guint16) label_len))
166     g_return_val_if_reached (NULL);
167   if (!gst_byte_writer_put_uint16_be (&w, (guint16) proto_len))
168     g_return_val_if_reached (NULL);
169   if (!gst_byte_writer_put_data (&w, (guint8 *) channel->parent.label,
170           label_len))
171     g_return_val_if_reached (NULL);
172   if (!gst_byte_writer_put_data (&w, (guint8 *) channel->parent.protocol,
173           proto_len))
174     g_return_val_if_reached (NULL);
175 
176   buf = gst_byte_writer_reset_and_get_buffer (&w);
177 
178   /* send reliable and ordered */
179   gst_sctp_buffer_add_send_meta (buf, DATA_CHANNEL_PPID_WEBRTC_CONTROL, TRUE,
180       GST_SCTP_SEND_META_PARTIAL_RELIABILITY_NONE, 0);
181 
182   return buf;
183 }
184 
185 static GstBuffer *
construct_ack_packet(WebRTCDataChannel * channel)186 construct_ack_packet (WebRTCDataChannel * channel)
187 {
188   GstByteWriter w;
189   GstBuffer *buf;
190 
191 /*
192  *   0                   1                   2                   3
193  *   0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
194  *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
195  *   |  Message Type |
196  *   +-+-+-+-+-+-+-+-+
197  */
198 
199   gst_byte_writer_init_with_size (&w, 1, FALSE);
200 
201   if (!gst_byte_writer_put_uint8 (&w, (guint8) CHANNEL_MESSAGE_ACK))
202     g_return_val_if_reached (NULL);
203 
204   buf = gst_byte_writer_reset_and_get_buffer (&w);
205 
206   /* send reliable and ordered */
207   gst_sctp_buffer_add_send_meta (buf, DATA_CHANNEL_PPID_WEBRTC_CONTROL, TRUE,
208       GST_SCTP_SEND_META_PARTIAL_RELIABILITY_NONE, 0);
209 
210   return buf;
211 }
212 
/* Signature of a function run for a channel through the task queue. */
typedef void (*ChannelTask) (GstWebRTCDataChannel * channel,
    gpointer user_data);

/* One queued unit of work, created by _channel_enqueue_task() and released
 * by _free_task(). */
struct task
{
  /* channel the task runs for; a reference is held while queued */
  GstWebRTCDataChannel *channel;
  /* function to run, may be NULL */
  ChannelTask func;
  /* argument passed to @func */
  gpointer user_data;
  /* called on @user_data when the task is freed, may be NULL */
  GDestroyNotify notify;
};
223 
224 static GstStructure *
_execute_task(GstWebRTCBin * webrtc,struct task * task)225 _execute_task (GstWebRTCBin * webrtc, struct task *task)
226 {
227   if (task->func)
228     task->func (task->channel, task->user_data);
229 
230   return NULL;
231 }
232 
233 static void
_free_task(struct task * task)234 _free_task (struct task *task)
235 {
236   gst_object_unref (task->channel);
237 
238   if (task->notify)
239     task->notify (task->user_data);
240   g_free (task);
241 }
242 
243 static void
_channel_enqueue_task(WebRTCDataChannel * channel,ChannelTask func,gpointer user_data,GDestroyNotify notify)244 _channel_enqueue_task (WebRTCDataChannel * channel, ChannelTask func,
245     gpointer user_data, GDestroyNotify notify)
246 {
247   struct task *task = g_new0 (struct task, 1);
248 
249   task->channel = gst_object_ref (channel);
250   task->func = func;
251   task->user_data = user_data;
252   task->notify = notify;
253 
254   gst_webrtc_bin_enqueue_task (channel->webrtcbin,
255       (GstWebRTCBinFunc) _execute_task, task, (GDestroyNotify) _free_task,
256       NULL);
257 }
258 
259 static void
_channel_store_error(WebRTCDataChannel * channel,GError * error)260 _channel_store_error (WebRTCDataChannel * channel, GError * error)
261 {
262   GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
263   if (error) {
264     GST_WARNING_OBJECT (channel, "Error: %s",
265         error ? error->message : "Unknown");
266     if (!channel->stored_error)
267       channel->stored_error = error;
268     else
269       g_clear_error (&error);
270   }
271   GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
272 }
273 
274 static void
_emit_on_open(WebRTCDataChannel * channel,gpointer user_data)275 _emit_on_open (WebRTCDataChannel * channel, gpointer user_data)
276 {
277   gst_webrtc_data_channel_on_open (GST_WEBRTC_DATA_CHANNEL (channel));
278 }
279 
280 static void
_transport_closed(WebRTCDataChannel * channel)281 _transport_closed (WebRTCDataChannel * channel)
282 {
283   GError *error;
284   gboolean both_sides_closed;
285 
286   GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
287   error = channel->stored_error;
288   channel->stored_error = NULL;
289 
290   both_sides_closed =
291       channel->peer_closed && channel->parent.buffered_amount <= 0;
292   if (both_sides_closed || error) {
293     channel->peer_closed = FALSE;
294   }
295   GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
296 
297   if (error) {
298     gst_webrtc_data_channel_on_error (GST_WEBRTC_DATA_CHANNEL (channel), error);
299     g_clear_error (&error);
300   }
301   if (both_sides_closed || error) {
302     gst_webrtc_data_channel_on_close (GST_WEBRTC_DATA_CHANNEL (channel));
303   }
304 }
305 
306 static void
_close_sctp_stream(WebRTCDataChannel * channel,gpointer user_data)307 _close_sctp_stream (WebRTCDataChannel * channel, gpointer user_data)
308 {
309   GstPad *pad, *peer;
310 
311   GST_INFO_OBJECT (channel, "Closing outgoing SCTP stream %i label \"%s\"",
312       channel->parent.id, channel->parent.label);
313 
314   pad = gst_element_get_static_pad (channel->appsrc, "src");
315   peer = gst_pad_get_peer (pad);
316   gst_object_unref (pad);
317 
318   if (peer) {
319     GstElement *sctpenc = gst_pad_get_parent_element (peer);
320 
321     if (sctpenc) {
322       gst_element_release_request_pad (sctpenc, peer);
323       gst_object_unref (sctpenc);
324     }
325     gst_object_unref (peer);
326   }
327 
328   _transport_closed (channel);
329 }
330 
331 static void
_close_procedure(WebRTCDataChannel * channel,gpointer user_data)332 _close_procedure (WebRTCDataChannel * channel, gpointer user_data)
333 {
334   /* https://www.w3.org/TR/webrtc/#data-transport-closing-procedure */
335   GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
336   if (channel->parent.ready_state == GST_WEBRTC_DATA_CHANNEL_STATE_CLOSED) {
337     GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
338     return;
339   } else if (channel->parent.ready_state ==
340       GST_WEBRTC_DATA_CHANNEL_STATE_CLOSING) {
341     _channel_enqueue_task (channel, (ChannelTask) _transport_closed, NULL,
342         NULL);
343   } else if (channel->parent.ready_state == GST_WEBRTC_DATA_CHANNEL_STATE_OPEN) {
344     channel->parent.ready_state = GST_WEBRTC_DATA_CHANNEL_STATE_CLOSING;
345     GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
346     g_object_notify (G_OBJECT (channel), "ready-state");
347 
348     GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
349     if (channel->parent.buffered_amount <= 0) {
350       _channel_enqueue_task (channel, (ChannelTask) _close_sctp_stream,
351           NULL, NULL);
352     }
353   }
354 
355   GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
356 }
357 
358 static void
_on_sctp_stream_reset(WebRTCSCTPTransport * sctp,guint stream_id,WebRTCDataChannel * channel)359 _on_sctp_stream_reset (WebRTCSCTPTransport * sctp, guint stream_id,
360     WebRTCDataChannel * channel)
361 {
362   if (channel->parent.id == stream_id) {
363     GST_INFO_OBJECT (channel,
364         "Received channel close for SCTP stream %i label \"%s\"",
365         channel->parent.id, channel->parent.label);
366 
367     GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
368     channel->peer_closed = TRUE;
369     GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
370 
371     _channel_enqueue_task (channel, (ChannelTask) _close_procedure,
372         GUINT_TO_POINTER (stream_id), NULL);
373   }
374 }
375 
376 static void
webrtc_data_channel_close(GstWebRTCDataChannel * channel)377 webrtc_data_channel_close (GstWebRTCDataChannel * channel)
378 {
379   _close_procedure (WEBRTC_DATA_CHANNEL (channel), NULL);
380 }
381 
382 static GstFlowReturn
_parse_control_packet(WebRTCDataChannel * channel,guint8 * data,gsize size,GError ** error)383 _parse_control_packet (WebRTCDataChannel * channel, guint8 * data,
384     gsize size, GError ** error)
385 {
386   GstByteReader r;
387   guint8 message_type;
388   gchar *label = NULL;
389   gchar *proto = NULL;
390 
391   if (!data)
392     g_return_val_if_reached (GST_FLOW_ERROR);
393   if (size < 1)
394     g_return_val_if_reached (GST_FLOW_ERROR);
395 
396   gst_byte_reader_init (&r, data, size);
397 
398   if (!gst_byte_reader_get_uint8 (&r, &message_type))
399     g_return_val_if_reached (GST_FLOW_ERROR);
400 
401   if (message_type == CHANNEL_MESSAGE_ACK) {
402     /* all good */
403     GST_INFO_OBJECT (channel, "Received channel ack");
404     return GST_FLOW_OK;
405   } else if (message_type == CHANNEL_MESSAGE_OPEN) {
406     guint8 reliability;
407     guint32 reliability_param;
408     guint16 priority, label_len, proto_len;
409     const guint8 *src;
410     GstBuffer *buffer;
411     GstFlowReturn ret;
412 
413     GST_INFO_OBJECT (channel, "Received channel open");
414 
415     if (channel->parent.negotiated) {
416       g_set_error (error, GST_WEBRTC_ERROR,
417           GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
418           "Data channel was signalled as negotiated already");
419       g_return_val_if_reached (GST_FLOW_ERROR);
420     }
421 
422     if (channel->opened)
423       return GST_FLOW_OK;
424 
425     if (!gst_byte_reader_get_uint8 (&r, &reliability))
426       goto parse_error;
427     if (!gst_byte_reader_get_uint16_be (&r, &priority))
428       goto parse_error;
429     if (!gst_byte_reader_get_uint32_be (&r, &reliability_param))
430       goto parse_error;
431     if (!gst_byte_reader_get_uint16_be (&r, &label_len))
432       goto parse_error;
433     if (!gst_byte_reader_get_uint16_be (&r, &proto_len))
434       goto parse_error;
435 
436     label = g_new0 (gchar, (gsize) label_len + 1);
437     proto = g_new0 (gchar, (gsize) proto_len + 1);
438 
439     if (!gst_byte_reader_get_data (&r, label_len, &src))
440       goto parse_error;
441     memcpy (label, src, label_len);
442     label[label_len] = '\0';
443     if (!gst_byte_reader_get_data (&r, proto_len, &src))
444       goto parse_error;
445     memcpy (proto, src, proto_len);
446     proto[proto_len] = '\0';
447 
448     g_free (channel->parent.label);
449     channel->parent.label = label;
450     g_free (channel->parent.protocol);
451     channel->parent.protocol = proto;
452     channel->parent.priority = priority_uint_to_type (priority);
453     channel->parent.ordered = !(reliability & 0x80);
454     if (reliability & 0x01) {
455       channel->parent.max_retransmits = reliability_param;
456       channel->parent.max_packet_lifetime = -1;
457     } else if (reliability & 0x02) {
458       channel->parent.max_retransmits = -1;
459       channel->parent.max_packet_lifetime = reliability_param;
460     } else {
461       channel->parent.max_retransmits = -1;
462       channel->parent.max_packet_lifetime = -1;
463     }
464     channel->opened = TRUE;
465 
466     GST_INFO_OBJECT (channel, "Received channel open for SCTP stream %i "
467         "label \"%s\" protocol %s ordered %s", channel->parent.id,
468         channel->parent.label, channel->parent.protocol,
469         channel->parent.ordered ? "true" : "false");
470 
471     _channel_enqueue_task (channel, (ChannelTask) _emit_on_open, NULL, NULL);
472 
473     GST_INFO_OBJECT (channel, "Sending channel ack");
474     buffer = construct_ack_packet (channel);
475 
476     GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
477     channel->parent.buffered_amount += gst_buffer_get_size (buffer);
478     GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
479 
480     ret = gst_app_src_push_buffer (GST_APP_SRC (channel->appsrc), buffer);
481     if (ret != GST_FLOW_OK) {
482       g_set_error (error, GST_WEBRTC_ERROR,
483           GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE, "Could not send ack packet");
484       return ret;
485     }
486 
487     return ret;
488   } else {
489     g_set_error (error, GST_WEBRTC_ERROR,
490         GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
491         "Unknown message type in control protocol");
492     return GST_FLOW_ERROR;
493   }
494 
495 parse_error:
496   {
497     g_free (label);
498     g_free (proto);
499     g_set_error (error, GST_WEBRTC_ERROR,
500         GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE, "Failed to parse packet");
501     g_return_val_if_reached (GST_FLOW_ERROR);
502   }
503 }
504 
/* appsink "eos" callback.  Intentionally empty: nothing is done here when
 * the sink reports end of stream. */
static void
on_sink_eos (GstAppSink * sink, gpointer user_data)
{
}
509 
/* Keeps a buffer and its mapping together so that both can be released by
 * buffer_unmap_and_unref() once the mapped data is no longer used. */
struct map_info
{
  /* buffer that @map_info was mapped from; a reference is held */
  GstBuffer *buffer;
  /* mapping of @buffer */
  GstMapInfo map_info;
};
515 
516 static void
buffer_unmap_and_unref(struct map_info * info)517 buffer_unmap_and_unref (struct map_info *info)
518 {
519   gst_buffer_unmap (info->buffer, &info->map_info);
520   gst_buffer_unref (info->buffer);
521   g_free (info);
522 }
523 
524 static void
_emit_have_data(WebRTCDataChannel * channel,GBytes * data)525 _emit_have_data (WebRTCDataChannel * channel, GBytes * data)
526 {
527   gst_webrtc_data_channel_on_message_data (GST_WEBRTC_DATA_CHANNEL (channel),
528       data);
529 }
530 
531 static void
_emit_have_string(GstWebRTCDataChannel * channel,gchar * str)532 _emit_have_string (GstWebRTCDataChannel * channel, gchar * str)
533 {
534   gst_webrtc_data_channel_on_message_string (GST_WEBRTC_DATA_CHANNEL (channel),
535       str);
536 }
537 
538 static GstFlowReturn
_data_channel_have_sample(WebRTCDataChannel * channel,GstSample * sample,GError ** error)539 _data_channel_have_sample (WebRTCDataChannel * channel, GstSample * sample,
540     GError ** error)
541 {
542   GstSctpReceiveMeta *receive;
543   GstBuffer *buffer;
544   GstFlowReturn ret = GST_FLOW_OK;
545 
546   GST_LOG_OBJECT (channel, "Received sample %" GST_PTR_FORMAT, sample);
547 
548   g_return_val_if_fail (channel->sctp_transport != NULL, GST_FLOW_ERROR);
549 
550   buffer = gst_sample_get_buffer (sample);
551   if (!buffer) {
552     g_set_error (error, GST_WEBRTC_ERROR,
553         GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE, "No buffer to handle");
554     return GST_FLOW_ERROR;
555   }
556   receive = gst_sctp_buffer_get_receive_meta (buffer);
557   if (!receive) {
558     g_set_error (error, GST_WEBRTC_ERROR,
559         GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
560         "No SCTP Receive meta on the buffer");
561     return GST_FLOW_ERROR;
562   }
563 
564   switch (receive->ppid) {
565     case DATA_CHANNEL_PPID_WEBRTC_CONTROL:{
566       GstMapInfo info = GST_MAP_INFO_INIT;
567       if (!gst_buffer_map (buffer, &info, GST_MAP_READ)) {
568         g_set_error (error, GST_WEBRTC_ERROR,
569             GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
570             "Failed to map received buffer");
571         ret = GST_FLOW_ERROR;
572       } else {
573         ret = _parse_control_packet (channel, info.data, info.size, error);
574         gst_buffer_unmap (buffer, &info);
575       }
576       break;
577     }
578     case DATA_CHANNEL_PPID_WEBRTC_STRING:
579     case DATA_CHANNEL_PPID_WEBRTC_STRING_PARTIAL:{
580       GstMapInfo info = GST_MAP_INFO_INIT;
581       if (!gst_buffer_map (buffer, &info, GST_MAP_READ)) {
582         g_set_error (error, GST_WEBRTC_ERROR,
583             GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
584             "Failed to map received buffer");
585         ret = GST_FLOW_ERROR;
586       } else {
587         gchar *str = g_strndup ((gchar *) info.data, info.size);
588         _channel_enqueue_task (channel, (ChannelTask) _emit_have_string, str,
589             g_free);
590         gst_buffer_unmap (buffer, &info);
591       }
592       break;
593     }
594     case DATA_CHANNEL_PPID_WEBRTC_BINARY:
595     case DATA_CHANNEL_PPID_WEBRTC_BINARY_PARTIAL:{
596       struct map_info *info = g_new0 (struct map_info, 1);
597       if (!gst_buffer_map (buffer, &info->map_info, GST_MAP_READ)) {
598         g_set_error (error, GST_WEBRTC_ERROR,
599             GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
600             "Failed to map received buffer");
601         ret = GST_FLOW_ERROR;
602       } else {
603         GBytes *data = g_bytes_new_with_free_func (info->map_info.data,
604             info->map_info.size, (GDestroyNotify) buffer_unmap_and_unref, info);
605         info->buffer = gst_buffer_ref (buffer);
606         _channel_enqueue_task (channel, (ChannelTask) _emit_have_data, data,
607             (GDestroyNotify) g_bytes_unref);
608       }
609       break;
610     }
611     case DATA_CHANNEL_PPID_WEBRTC_BINARY_EMPTY:
612       _channel_enqueue_task (channel, (ChannelTask) _emit_have_data, NULL,
613           NULL);
614       break;
615     case DATA_CHANNEL_PPID_WEBRTC_STRING_EMPTY:
616       _channel_enqueue_task (channel, (ChannelTask) _emit_have_string, NULL,
617           NULL);
618       break;
619     default:
620       g_set_error (error, GST_WEBRTC_ERROR,
621           GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
622           "Unknown SCTP PPID %u received", receive->ppid);
623       ret = GST_FLOW_ERROR;
624       break;
625   }
626 
627   return ret;
628 }
629 
630 static GstFlowReturn
on_sink_preroll(GstAppSink * sink,gpointer user_data)631 on_sink_preroll (GstAppSink * sink, gpointer user_data)
632 {
633   WebRTCDataChannel *channel = user_data;
634   GstSample *sample = gst_app_sink_pull_preroll (sink);
635   GstFlowReturn ret;
636 
637   if (sample) {
638     /* This sample also seems to be provided by the sample callback
639        ret = _data_channel_have_sample (channel, sample); */
640     ret = GST_FLOW_OK;
641     gst_sample_unref (sample);
642   } else if (gst_app_sink_is_eos (sink)) {
643     ret = GST_FLOW_EOS;
644   } else {
645     ret = GST_FLOW_ERROR;
646   }
647 
648   if (ret != GST_FLOW_OK) {
649     _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL);
650   }
651 
652   return ret;
653 }
654 
655 static GstFlowReturn
on_sink_sample(GstAppSink * sink,gpointer user_data)656 on_sink_sample (GstAppSink * sink, gpointer user_data)
657 {
658   WebRTCDataChannel *channel = user_data;
659   GstSample *sample = gst_app_sink_pull_sample (sink);
660   GstFlowReturn ret;
661   GError *error = NULL;
662 
663   if (sample) {
664     ret = _data_channel_have_sample (channel, sample, &error);
665     gst_sample_unref (sample);
666   } else if (gst_app_sink_is_eos (sink)) {
667     ret = GST_FLOW_EOS;
668   } else {
669     ret = GST_FLOW_ERROR;
670   }
671 
672   if (error)
673     _channel_store_error (channel, error);
674 
675   if (ret != GST_FLOW_OK) {
676     _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL);
677   }
678 
679   return ret;
680 }
681 
682 static GstAppSinkCallbacks sink_callbacks = {
683   on_sink_eos,
684   on_sink_preroll,
685   on_sink_sample,
686 };
687 
688 void
webrtc_data_channel_start_negotiation(WebRTCDataChannel * channel)689 webrtc_data_channel_start_negotiation (WebRTCDataChannel * channel)
690 {
691   GstBuffer *buffer;
692 
693   g_return_if_fail (!channel->parent.negotiated);
694   g_return_if_fail (channel->parent.id != -1);
695   g_return_if_fail (channel->sctp_transport != NULL);
696 
697   buffer = construct_open_packet (channel);
698 
699   GST_INFO_OBJECT (channel, "Sending channel open for SCTP stream %i "
700       "label \"%s\" protocol %s ordered %s", channel->parent.id,
701       channel->parent.label, channel->parent.protocol,
702       channel->parent.ordered ? "true" : "false");
703 
704   GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
705   channel->parent.buffered_amount += gst_buffer_get_size (buffer);
706   GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
707   g_object_notify (G_OBJECT (&channel->parent), "buffered-amount");
708 
709   if (gst_app_src_push_buffer (GST_APP_SRC (channel->appsrc),
710           buffer) == GST_FLOW_OK) {
711     channel->opened = TRUE;
712     _channel_enqueue_task (channel, (ChannelTask) _emit_on_open, NULL, NULL);
713   } else {
714     GError *error = NULL;
715     g_set_error (&error, GST_WEBRTC_ERROR,
716         GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
717         "Failed to send DCEP open packet");
718     _channel_store_error (channel, error);
719     _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL);
720   }
721 }
722 
723 static void
_get_sctp_reliability(WebRTCDataChannel * channel,GstSctpSendMetaPartiallyReliability * reliability,guint * rel_param)724 _get_sctp_reliability (WebRTCDataChannel * channel,
725     GstSctpSendMetaPartiallyReliability * reliability, guint * rel_param)
726 {
727   if (channel->parent.max_retransmits != -1) {
728     *reliability = GST_SCTP_SEND_META_PARTIAL_RELIABILITY_RTX;
729     *rel_param = channel->parent.max_retransmits;
730   } else if (channel->parent.max_packet_lifetime != -1) {
731     *reliability = GST_SCTP_SEND_META_PARTIAL_RELIABILITY_TTL;
732     *rel_param = channel->parent.max_packet_lifetime;
733   } else {
734     *reliability = GST_SCTP_SEND_META_PARTIAL_RELIABILITY_NONE;
735     *rel_param = 0;
736   }
737 }
738 
739 static gboolean
_is_within_max_message_size(WebRTCDataChannel * channel,gsize size)740 _is_within_max_message_size (WebRTCDataChannel * channel, gsize size)
741 {
742   return size <= channel->sctp_transport->max_message_size;
743 }
744 
745 static void
webrtc_data_channel_send_data(GstWebRTCDataChannel * base_channel,GBytes * bytes)746 webrtc_data_channel_send_data (GstWebRTCDataChannel * base_channel,
747     GBytes * bytes)
748 {
749   WebRTCDataChannel *channel = WEBRTC_DATA_CHANNEL (base_channel);
750   GstSctpSendMetaPartiallyReliability reliability;
751   guint rel_param;
752   guint32 ppid;
753   GstBuffer *buffer;
754   GstFlowReturn ret;
755 
756   if (!bytes) {
757     buffer = gst_buffer_new ();
758     ppid = DATA_CHANNEL_PPID_WEBRTC_BINARY_EMPTY;
759   } else {
760     gsize size;
761     guint8 *data;
762 
763     data = (guint8 *) g_bytes_get_data (bytes, &size);
764     g_return_if_fail (data != NULL);
765     if (!_is_within_max_message_size (channel, size)) {
766       GError *error = NULL;
767       g_set_error (&error, GST_WEBRTC_ERROR,
768           GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
769           "Requested to send data that is too large");
770       _channel_store_error (channel, error);
771       _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL,
772           NULL);
773       return;
774     }
775 
776     buffer = gst_buffer_new_wrapped_full (GST_MEMORY_FLAG_READONLY, data, size,
777         0, size, g_bytes_ref (bytes), (GDestroyNotify) g_bytes_unref);
778     ppid = DATA_CHANNEL_PPID_WEBRTC_BINARY;
779   }
780 
781   _get_sctp_reliability (channel, &reliability, &rel_param);
782   gst_sctp_buffer_add_send_meta (buffer, ppid, channel->parent.ordered,
783       reliability, rel_param);
784 
785   GST_LOG_OBJECT (channel, "Sending data using buffer %" GST_PTR_FORMAT,
786       buffer);
787 
788   GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
789   channel->parent.buffered_amount += gst_buffer_get_size (buffer);
790   GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
791   g_object_notify (G_OBJECT (&channel->parent), "buffered-amount");
792 
793   ret = gst_app_src_push_buffer (GST_APP_SRC (channel->appsrc), buffer);
794 
795   if (ret != GST_FLOW_OK) {
796     GError *error = NULL;
797     g_set_error (&error, GST_WEBRTC_ERROR,
798         GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE, "Failed to send data");
799     _channel_store_error (channel, error);
800     _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL);
801   }
802 }
803 
804 static void
webrtc_data_channel_send_string(GstWebRTCDataChannel * base_channel,const gchar * str)805 webrtc_data_channel_send_string (GstWebRTCDataChannel * base_channel,
806     const gchar * str)
807 {
808   WebRTCDataChannel *channel = WEBRTC_DATA_CHANNEL (base_channel);
809   GstSctpSendMetaPartiallyReliability reliability;
810   guint rel_param;
811   guint32 ppid;
812   GstBuffer *buffer;
813   GstFlowReturn ret;
814 
815   if (!channel->parent.negotiated)
816     g_return_if_fail (channel->opened);
817   g_return_if_fail (channel->sctp_transport != NULL);
818 
819   if (!str) {
820     buffer = gst_buffer_new ();
821     ppid = DATA_CHANNEL_PPID_WEBRTC_STRING_EMPTY;
822   } else {
823     gsize size = strlen (str);
824     gchar *str_copy;
825 
826     if (!_is_within_max_message_size (channel, size)) {
827       GError *error = NULL;
828       g_set_error (&error, GST_WEBRTC_ERROR,
829           GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
830           "Requested to send a string that is too large");
831       _channel_store_error (channel, error);
832       _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL,
833           NULL);
834       return;
835     }
836 
837     str_copy = g_strdup (str);
838     buffer =
839         gst_buffer_new_wrapped_full (GST_MEMORY_FLAG_READONLY, str_copy,
840         size, 0, size, str_copy, g_free);
841     ppid = DATA_CHANNEL_PPID_WEBRTC_STRING;
842   }
843 
844   _get_sctp_reliability (channel, &reliability, &rel_param);
845   gst_sctp_buffer_add_send_meta (buffer, ppid, channel->parent.ordered,
846       reliability, rel_param);
847 
848   GST_TRACE_OBJECT (channel, "Sending string using buffer %" GST_PTR_FORMAT,
849       buffer);
850 
851   GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
852   channel->parent.buffered_amount += gst_buffer_get_size (buffer);
853   GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
854   g_object_notify (G_OBJECT (&channel->parent), "buffered-amount");
855 
856   ret = gst_app_src_push_buffer (GST_APP_SRC (channel->appsrc), buffer);
857 
858   if (ret != GST_FLOW_OK) {
859     GError *error = NULL;
860     g_set_error (&error, GST_WEBRTC_ERROR,
861         GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE, "Failed to send string");
862     _channel_store_error (channel, error);
863     _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL);
864   }
865 }
866 
867 static void
_on_sctp_notify_state_unlocked(GObject * sctp_transport,WebRTCDataChannel * channel)868 _on_sctp_notify_state_unlocked (GObject * sctp_transport,
869     WebRTCDataChannel * channel)
870 {
871   GstWebRTCSCTPTransportState state;
872 
873   g_object_get (sctp_transport, "state", &state, NULL);
874   if (state == GST_WEBRTC_SCTP_TRANSPORT_STATE_CONNECTED) {
875     if (channel->parent.negotiated)
876       _channel_enqueue_task (channel, (ChannelTask) _emit_on_open, NULL, NULL);
877   }
878 }
879 
880 static void
_on_sctp_notify_state(GObject * sctp_transport,GParamSpec * pspec,WebRTCDataChannel * channel)881 _on_sctp_notify_state (GObject * sctp_transport, GParamSpec * pspec,
882     WebRTCDataChannel * channel)
883 {
884   GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
885   _on_sctp_notify_state_unlocked (sctp_transport, channel);
886   GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
887 }
888 
889 static void
_emit_low_threshold(WebRTCDataChannel * channel,gpointer user_data)890 _emit_low_threshold (WebRTCDataChannel * channel, gpointer user_data)
891 {
892   gst_webrtc_data_channel_on_buffered_amount_low (GST_WEBRTC_DATA_CHANNEL
893       (channel));
894 }
895 
896 static GstPadProbeReturn
on_appsrc_data(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)897 on_appsrc_data (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
898 {
899   WebRTCDataChannel *channel = user_data;
900   guint64 prev_amount;
901   guint64 size = 0;
902 
903   if (GST_PAD_PROBE_INFO_TYPE (info) & (GST_PAD_PROBE_TYPE_BUFFER)) {
904     GstBuffer *buffer = GST_PAD_PROBE_INFO_BUFFER (info);
905     size = gst_buffer_get_size (buffer);
906   } else if (GST_PAD_PROBE_INFO_TYPE (info) & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
907     GstBufferList *list = GST_PAD_PROBE_INFO_BUFFER_LIST (info);
908     size = gst_buffer_list_calculate_size (list);
909   }
910 
911   if (size > 0) {
912     GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
913     prev_amount = channel->parent.buffered_amount;
914     channel->parent.buffered_amount -= size;
915     GST_TRACE_OBJECT (channel, "checking low-threshold: prev %"
916         G_GUINT64_FORMAT " low-threshold %" G_GUINT64_FORMAT " buffered %"
917         G_GUINT64_FORMAT, prev_amount,
918         channel->parent.buffered_amount_low_threshold,
919         channel->parent.buffered_amount);
920     if (prev_amount >= channel->parent.buffered_amount_low_threshold
921         && channel->parent.buffered_amount <=
922         channel->parent.buffered_amount_low_threshold) {
923       _channel_enqueue_task (channel, (ChannelTask) _emit_low_threshold, NULL,
924           NULL);
925     }
926 
927     if (channel->parent.ready_state == GST_WEBRTC_DATA_CHANNEL_STATE_CLOSING
928         && channel->parent.buffered_amount <= 0) {
929       _channel_enqueue_task (channel, (ChannelTask) _close_sctp_stream, NULL,
930           NULL);
931     }
932     GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
933     g_object_notify (G_OBJECT (&channel->parent), "buffered-amount");
934   }
935 
936   return GST_PAD_PROBE_OK;
937 }
938 
939 static void
gst_webrtc_data_channel_constructed(GObject * object)940 gst_webrtc_data_channel_constructed (GObject * object)
941 {
942   WebRTCDataChannel *channel = WEBRTC_DATA_CHANNEL (object);
943   GstPad *pad;
944   GstCaps *caps;
945 
946   caps = gst_caps_new_any ();
947 
948   channel->appsrc = gst_element_factory_make ("appsrc", NULL);
949   gst_object_ref_sink (channel->appsrc);
950   pad = gst_element_get_static_pad (channel->appsrc, "src");
951 
952   channel->src_probe = gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_DATA_BOTH,
953       (GstPadProbeCallback) on_appsrc_data, channel, NULL);
954 
955   channel->appsink = gst_element_factory_make ("appsink", NULL);
956   gst_object_ref_sink (channel->appsink);
957   g_object_set (channel->appsink, "sync", FALSE, "async", FALSE, "caps", caps,
958       NULL);
959   gst_app_sink_set_callbacks (GST_APP_SINK (channel->appsink), &sink_callbacks,
960       channel, NULL);
961 
962   gst_object_unref (pad);
963   gst_caps_unref (caps);
964 }
965 
966 static void
gst_webrtc_data_channel_finalize(GObject * object)967 gst_webrtc_data_channel_finalize (GObject * object)
968 {
969   WebRTCDataChannel *channel = WEBRTC_DATA_CHANNEL (object);
970 
971   if (channel->src_probe) {
972     GstPad *pad = gst_element_get_static_pad (channel->appsrc, "src");
973     gst_pad_remove_probe (pad, channel->src_probe);
974     gst_object_unref (pad);
975     channel->src_probe = 0;
976   }
977 
978   if (channel->sctp_transport)
979     g_signal_handlers_disconnect_by_data (channel->sctp_transport, channel);
980   g_clear_object (&channel->sctp_transport);
981 
982   g_clear_object (&channel->appsrc);
983   g_clear_object (&channel->appsink);
984 
985   G_OBJECT_CLASS (parent_class)->finalize (object);
986 }
987 
988 static void
webrtc_data_channel_class_init(WebRTCDataChannelClass * klass)989 webrtc_data_channel_class_init (WebRTCDataChannelClass * klass)
990 {
991   GObjectClass *gobject_class = (GObjectClass *) klass;
992   GstWebRTCDataChannelClass *channel_class =
993       (GstWebRTCDataChannelClass *) klass;
994 
995   gobject_class->constructed = gst_webrtc_data_channel_constructed;
996   gobject_class->finalize = gst_webrtc_data_channel_finalize;
997 
998   channel_class->send_data = webrtc_data_channel_send_data;
999   channel_class->send_string = webrtc_data_channel_send_string;
1000   channel_class->close = webrtc_data_channel_close;
1001 }
1002 
1003 static void
webrtc_data_channel_init(WebRTCDataChannel * channel)1004 webrtc_data_channel_init (WebRTCDataChannel * channel)
1005 {
1006 }
1007 
1008 static void
_data_channel_set_sctp_transport(WebRTCDataChannel * channel,WebRTCSCTPTransport * sctp)1009 _data_channel_set_sctp_transport (WebRTCDataChannel * channel,
1010     WebRTCSCTPTransport * sctp)
1011 {
1012   g_return_if_fail (GST_IS_WEBRTC_DATA_CHANNEL (channel));
1013   g_return_if_fail (GST_IS_WEBRTC_SCTP_TRANSPORT (sctp));
1014 
1015   GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
1016   if (channel->sctp_transport)
1017     g_signal_handlers_disconnect_by_data (channel->sctp_transport, channel);
1018 
1019   gst_object_replace ((GstObject **) & channel->sctp_transport,
1020       GST_OBJECT (sctp));
1021 
1022   if (sctp) {
1023     g_signal_connect (sctp, "stream-reset", G_CALLBACK (_on_sctp_stream_reset),
1024         channel);
1025     g_signal_connect (sctp, "notify::state", G_CALLBACK (_on_sctp_notify_state),
1026         channel);
1027   }
1028   GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
1029 }
1030 
1031 void
webrtc_data_channel_link_to_sctp(WebRTCDataChannel * channel,WebRTCSCTPTransport * sctp_transport)1032 webrtc_data_channel_link_to_sctp (WebRTCDataChannel * channel,
1033     WebRTCSCTPTransport * sctp_transport)
1034 {
1035   if (sctp_transport && !channel->sctp_transport) {
1036     gint id;
1037 
1038     g_object_get (channel, "id", &id, NULL);
1039 
1040     if (sctp_transport->association_established && id != -1) {
1041       gchar *pad_name;
1042 
1043       _data_channel_set_sctp_transport (channel, sctp_transport);
1044       pad_name = g_strdup_printf ("sink_%u", id);
1045       if (!gst_element_link_pads (channel->appsrc, "src",
1046               channel->sctp_transport->sctpenc, pad_name))
1047         g_warn_if_reached ();
1048       g_free (pad_name);
1049 
1050       _on_sctp_notify_state_unlocked (G_OBJECT (sctp_transport), channel);
1051     }
1052   }
1053 }
1054