1 /* GStreamer
2 * Copyright (C) 2018 Matthew Waters <matthew@centricular.com>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
8 *
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
13 *
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17 * Boston, MA 02110-1301, USA.
18 */
19
20 /**
21 * SECTION:gstwebrtc-datachannel
22 * @short_description: RTCDataChannel object
23 * @title: GstWebRTCDataChannel
24 * @see_also: #GstWebRTCRTPTransceiver
25 *
26 * <http://w3c.github.io/webrtc-pc/#dom-rtcsctptransport>
27 */
28
29 #ifdef HAVE_CONFIG_H
30 # include "config.h"
31 #endif
32
33 #include "webrtcdatachannel.h"
34 #include <gst/app/gstappsink.h>
35 #include <gst/app/gstappsrc.h>
36 #include <gst/base/gstbytereader.h>
37 #include <gst/base/gstbytewriter.h>
38 #include <gst/sctp/sctpreceivemeta.h>
39 #include <gst/sctp/sctpsendmeta.h>
40
41 #include "gstwebrtcbin.h"
42 #include "utils.h"
43
44 #define GST_CAT_DEFAULT webrtc_data_channel_debug
45 GST_DEBUG_CATEGORY_STATIC (GST_CAT_DEFAULT);
46
47 #define webrtc_data_channel_parent_class parent_class
48 G_DEFINE_TYPE_WITH_CODE (WebRTCDataChannel, webrtc_data_channel,
49 GST_TYPE_WEBRTC_DATA_CHANNEL,
50 GST_DEBUG_CATEGORY_INIT (webrtc_data_channel_debug, "webrtcdatachannel", 0,
51 "webrtcdatachannel"););
52
/* SCTP payload protocol identifiers (PPID) carried by data channel buffers. */
typedef enum
{
  /* open/ack control packets */
  DATA_CHANNEL_PPID_WEBRTC_CONTROL = 50,
  /* string message */
  DATA_CHANNEL_PPID_WEBRTC_STRING = 51,
  /* deprecated */
  DATA_CHANNEL_PPID_WEBRTC_BINARY_PARTIAL = 52,
  /* binary message */
  DATA_CHANNEL_PPID_WEBRTC_BINARY = 53,
  /* deprecated */
  DATA_CHANNEL_PPID_WEBRTC_STRING_PARTIAL = 54,
  /* binary message without payload */
  DATA_CHANNEL_PPID_WEBRTC_BINARY_EMPTY = 56,
  /* string message without payload */
  DATA_CHANNEL_PPID_WEBRTC_STRING_EMPTY = 57,
} DataChannelPPID;
63
/* "Channel Type" byte of the open packet.
 *
 * Bit 0x80 marks the channel as unordered, 0x01 selects a retransmission
 * limit and 0x02 selects a packet lifetime limit. */
typedef enum
{
  CHANNEL_TYPE_RELIABLE = 0x00,
  CHANNEL_TYPE_RELIABLE_UNORDERED = 0x80,
  CHANNEL_TYPE_PARTIAL_RELIABLE_REXMIT = 0x01,
  CHANNEL_TYPE_PARTIAL_RELIABLE_REXMIT_UNORDERED =
      CHANNEL_TYPE_PARTIAL_RELIABLE_REXMIT | CHANNEL_TYPE_RELIABLE_UNORDERED,
  CHANNEL_TYPE_PARTIAL_RELIABLE_TIMED = 0x02,
  CHANNEL_TYPE_PARTIAL_RELIABLE_TIMED_UNORDERED =
      CHANNEL_TYPE_PARTIAL_RELIABLE_TIMED | CHANNEL_TYPE_RELIABLE_UNORDERED,
} DataChannelReliabilityType;
73
/* "Message Type" byte that starts every control packet. */
typedef enum
{
  /* acknowledges a received open request */
  CHANNEL_MESSAGE_ACK = 0x02,
  /* requests that a channel be opened */
  CHANNEL_MESSAGE_OPEN = 0x03,
} DataChannelMessage;
79
80 static guint16
priority_type_to_uint(GstWebRTCPriorityType pri)81 priority_type_to_uint (GstWebRTCPriorityType pri)
82 {
83 switch (pri) {
84 case GST_WEBRTC_PRIORITY_TYPE_VERY_LOW:
85 return 64;
86 case GST_WEBRTC_PRIORITY_TYPE_LOW:
87 return 192;
88 case GST_WEBRTC_PRIORITY_TYPE_MEDIUM:
89 return 384;
90 case GST_WEBRTC_PRIORITY_TYPE_HIGH:
91 return 768;
92 }
93 g_assert_not_reached ();
94 return 0;
95 }
96
97 static GstWebRTCPriorityType
priority_uint_to_type(guint16 val)98 priority_uint_to_type (guint16 val)
99 {
100 if (val <= 128)
101 return GST_WEBRTC_PRIORITY_TYPE_VERY_LOW;
102 if (val <= 256)
103 return GST_WEBRTC_PRIORITY_TYPE_LOW;
104 if (val <= 512)
105 return GST_WEBRTC_PRIORITY_TYPE_MEDIUM;
106 return GST_WEBRTC_PRIORITY_TYPE_HIGH;
107 }
108
109 static GstBuffer *
construct_open_packet(WebRTCDataChannel * channel)110 construct_open_packet (WebRTCDataChannel * channel)
111 {
112 GstByteWriter w;
113 gsize label_len = strlen (channel->parent.label);
114 gsize proto_len = strlen (channel->parent.protocol);
115 gsize size = 12 + label_len + proto_len;
116 DataChannelReliabilityType reliability = 0;
117 guint32 reliability_param = 0;
118 guint16 priority;
119 GstBuffer *buf;
120
121 /*
122 * 0 1 2 3
123 * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
124 * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
125 * | Message Type | Channel Type | Priority |
126 * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
127 * | Reliability Parameter |
128 * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
129 * | Label Length | Protocol Length |
130 * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
131 * \ /
132 * | Label |
133 * / \
134 * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
135 * \ /
136 * | Protocol |
137 * / \
138 * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
139 */
140
141 gst_byte_writer_init_with_size (&w, size, FALSE);
142
143 if (!gst_byte_writer_put_uint8 (&w, (guint8) CHANNEL_MESSAGE_OPEN))
144 g_return_val_if_reached (NULL);
145
146 if (!channel->parent.ordered)
147 reliability |= 0x80;
148 if (channel->parent.max_retransmits != -1) {
149 reliability |= 0x01;
150 reliability_param = channel->parent.max_retransmits;
151 }
152 if (channel->parent.max_packet_lifetime != -1) {
153 reliability |= 0x02;
154 reliability_param = channel->parent.max_packet_lifetime;
155 }
156
157 priority = priority_type_to_uint (channel->parent.priority);
158
159 if (!gst_byte_writer_put_uint8 (&w, (guint8) reliability))
160 g_return_val_if_reached (NULL);
161 if (!gst_byte_writer_put_uint16_be (&w, (guint16) priority))
162 g_return_val_if_reached (NULL);
163 if (!gst_byte_writer_put_uint32_be (&w, (guint32) reliability_param))
164 g_return_val_if_reached (NULL);
165 if (!gst_byte_writer_put_uint16_be (&w, (guint16) label_len))
166 g_return_val_if_reached (NULL);
167 if (!gst_byte_writer_put_uint16_be (&w, (guint16) proto_len))
168 g_return_val_if_reached (NULL);
169 if (!gst_byte_writer_put_data (&w, (guint8 *) channel->parent.label,
170 label_len))
171 g_return_val_if_reached (NULL);
172 if (!gst_byte_writer_put_data (&w, (guint8 *) channel->parent.protocol,
173 proto_len))
174 g_return_val_if_reached (NULL);
175
176 buf = gst_byte_writer_reset_and_get_buffer (&w);
177
178 /* send reliable and ordered */
179 gst_sctp_buffer_add_send_meta (buf, DATA_CHANNEL_PPID_WEBRTC_CONTROL, TRUE,
180 GST_SCTP_SEND_META_PARTIAL_RELIABILITY_NONE, 0);
181
182 return buf;
183 }
184
185 static GstBuffer *
construct_ack_packet(WebRTCDataChannel * channel)186 construct_ack_packet (WebRTCDataChannel * channel)
187 {
188 GstByteWriter w;
189 GstBuffer *buf;
190
191 /*
192 * 0 1 2 3
193 * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
194 * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
195 * | Message Type |
196 * +-+-+-+-+-+-+-+-+
197 */
198
199 gst_byte_writer_init_with_size (&w, 1, FALSE);
200
201 if (!gst_byte_writer_put_uint8 (&w, (guint8) CHANNEL_MESSAGE_ACK))
202 g_return_val_if_reached (NULL);
203
204 buf = gst_byte_writer_reset_and_get_buffer (&w);
205
206 /* send reliable and ordered */
207 gst_sctp_buffer_add_send_meta (buf, DATA_CHANNEL_PPID_WEBRTC_CONTROL, TRUE,
208 GST_SCTP_SEND_META_PARTIAL_RELIABILITY_NONE, 0);
209
210 return buf;
211 }
212
213 typedef void (*ChannelTask) (GstWebRTCDataChannel * channel,
214 gpointer user_data);
215
216 struct task
217 {
218 GstWebRTCDataChannel *channel;
219 ChannelTask func;
220 gpointer user_data;
221 GDestroyNotify notify;
222 };
223
224 static GstStructure *
_execute_task(GstWebRTCBin * webrtc,struct task * task)225 _execute_task (GstWebRTCBin * webrtc, struct task *task)
226 {
227 if (task->func)
228 task->func (task->channel, task->user_data);
229
230 return NULL;
231 }
232
233 static void
_free_task(struct task * task)234 _free_task (struct task *task)
235 {
236 gst_object_unref (task->channel);
237
238 if (task->notify)
239 task->notify (task->user_data);
240 g_free (task);
241 }
242
243 static void
_channel_enqueue_task(WebRTCDataChannel * channel,ChannelTask func,gpointer user_data,GDestroyNotify notify)244 _channel_enqueue_task (WebRTCDataChannel * channel, ChannelTask func,
245 gpointer user_data, GDestroyNotify notify)
246 {
247 struct task *task = g_new0 (struct task, 1);
248
249 task->channel = gst_object_ref (channel);
250 task->func = func;
251 task->user_data = user_data;
252 task->notify = notify;
253
254 gst_webrtc_bin_enqueue_task (channel->webrtcbin,
255 (GstWebRTCBinFunc) _execute_task, task, (GDestroyNotify) _free_task,
256 NULL);
257 }
258
259 static void
_channel_store_error(WebRTCDataChannel * channel,GError * error)260 _channel_store_error (WebRTCDataChannel * channel, GError * error)
261 {
262 GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
263 if (error) {
264 GST_WARNING_OBJECT (channel, "Error: %s",
265 error ? error->message : "Unknown");
266 if (!channel->stored_error)
267 channel->stored_error = error;
268 else
269 g_clear_error (&error);
270 }
271 GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
272 }
273
274 static void
_emit_on_open(WebRTCDataChannel * channel,gpointer user_data)275 _emit_on_open (WebRTCDataChannel * channel, gpointer user_data)
276 {
277 gst_webrtc_data_channel_on_open (GST_WEBRTC_DATA_CHANNEL (channel));
278 }
279
280 static void
_transport_closed(WebRTCDataChannel * channel)281 _transport_closed (WebRTCDataChannel * channel)
282 {
283 GError *error;
284 gboolean both_sides_closed;
285
286 GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
287 error = channel->stored_error;
288 channel->stored_error = NULL;
289
290 both_sides_closed =
291 channel->peer_closed && channel->parent.buffered_amount <= 0;
292 if (both_sides_closed || error) {
293 channel->peer_closed = FALSE;
294 }
295 GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
296
297 if (error) {
298 gst_webrtc_data_channel_on_error (GST_WEBRTC_DATA_CHANNEL (channel), error);
299 g_clear_error (&error);
300 }
301 if (both_sides_closed || error) {
302 gst_webrtc_data_channel_on_close (GST_WEBRTC_DATA_CHANNEL (channel));
303 }
304 }
305
306 static void
_close_sctp_stream(WebRTCDataChannel * channel,gpointer user_data)307 _close_sctp_stream (WebRTCDataChannel * channel, gpointer user_data)
308 {
309 GstPad *pad, *peer;
310
311 GST_INFO_OBJECT (channel, "Closing outgoing SCTP stream %i label \"%s\"",
312 channel->parent.id, channel->parent.label);
313
314 pad = gst_element_get_static_pad (channel->appsrc, "src");
315 peer = gst_pad_get_peer (pad);
316 gst_object_unref (pad);
317
318 if (peer) {
319 GstElement *sctpenc = gst_pad_get_parent_element (peer);
320
321 if (sctpenc) {
322 gst_element_release_request_pad (sctpenc, peer);
323 gst_object_unref (sctpenc);
324 }
325 gst_object_unref (peer);
326 }
327
328 _transport_closed (channel);
329 }
330
331 static void
_close_procedure(WebRTCDataChannel * channel,gpointer user_data)332 _close_procedure (WebRTCDataChannel * channel, gpointer user_data)
333 {
334 /* https://www.w3.org/TR/webrtc/#data-transport-closing-procedure */
335 GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
336 if (channel->parent.ready_state == GST_WEBRTC_DATA_CHANNEL_STATE_CLOSED) {
337 GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
338 return;
339 } else if (channel->parent.ready_state ==
340 GST_WEBRTC_DATA_CHANNEL_STATE_CLOSING) {
341 _channel_enqueue_task (channel, (ChannelTask) _transport_closed, NULL,
342 NULL);
343 } else if (channel->parent.ready_state == GST_WEBRTC_DATA_CHANNEL_STATE_OPEN) {
344 channel->parent.ready_state = GST_WEBRTC_DATA_CHANNEL_STATE_CLOSING;
345 GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
346 g_object_notify (G_OBJECT (channel), "ready-state");
347
348 GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
349 if (channel->parent.buffered_amount <= 0) {
350 _channel_enqueue_task (channel, (ChannelTask) _close_sctp_stream,
351 NULL, NULL);
352 }
353 }
354
355 GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
356 }
357
358 static void
_on_sctp_stream_reset(WebRTCSCTPTransport * sctp,guint stream_id,WebRTCDataChannel * channel)359 _on_sctp_stream_reset (WebRTCSCTPTransport * sctp, guint stream_id,
360 WebRTCDataChannel * channel)
361 {
362 if (channel->parent.id == stream_id) {
363 GST_INFO_OBJECT (channel,
364 "Received channel close for SCTP stream %i label \"%s\"",
365 channel->parent.id, channel->parent.label);
366
367 GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
368 channel->peer_closed = TRUE;
369 GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
370
371 _channel_enqueue_task (channel, (ChannelTask) _close_procedure,
372 GUINT_TO_POINTER (stream_id), NULL);
373 }
374 }
375
376 static void
webrtc_data_channel_close(GstWebRTCDataChannel * channel)377 webrtc_data_channel_close (GstWebRTCDataChannel * channel)
378 {
379 _close_procedure (WEBRTC_DATA_CHANNEL (channel), NULL);
380 }
381
382 static GstFlowReturn
_parse_control_packet(WebRTCDataChannel * channel,guint8 * data,gsize size,GError ** error)383 _parse_control_packet (WebRTCDataChannel * channel, guint8 * data,
384 gsize size, GError ** error)
385 {
386 GstByteReader r;
387 guint8 message_type;
388 gchar *label = NULL;
389 gchar *proto = NULL;
390
391 if (!data)
392 g_return_val_if_reached (GST_FLOW_ERROR);
393 if (size < 1)
394 g_return_val_if_reached (GST_FLOW_ERROR);
395
396 gst_byte_reader_init (&r, data, size);
397
398 if (!gst_byte_reader_get_uint8 (&r, &message_type))
399 g_return_val_if_reached (GST_FLOW_ERROR);
400
401 if (message_type == CHANNEL_MESSAGE_ACK) {
402 /* all good */
403 GST_INFO_OBJECT (channel, "Received channel ack");
404 return GST_FLOW_OK;
405 } else if (message_type == CHANNEL_MESSAGE_OPEN) {
406 guint8 reliability;
407 guint32 reliability_param;
408 guint16 priority, label_len, proto_len;
409 const guint8 *src;
410 GstBuffer *buffer;
411 GstFlowReturn ret;
412
413 GST_INFO_OBJECT (channel, "Received channel open");
414
415 if (channel->parent.negotiated) {
416 g_set_error (error, GST_WEBRTC_ERROR,
417 GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
418 "Data channel was signalled as negotiated already");
419 g_return_val_if_reached (GST_FLOW_ERROR);
420 }
421
422 if (channel->opened)
423 return GST_FLOW_OK;
424
425 if (!gst_byte_reader_get_uint8 (&r, &reliability))
426 goto parse_error;
427 if (!gst_byte_reader_get_uint16_be (&r, &priority))
428 goto parse_error;
429 if (!gst_byte_reader_get_uint32_be (&r, &reliability_param))
430 goto parse_error;
431 if (!gst_byte_reader_get_uint16_be (&r, &label_len))
432 goto parse_error;
433 if (!gst_byte_reader_get_uint16_be (&r, &proto_len))
434 goto parse_error;
435
436 label = g_new0 (gchar, (gsize) label_len + 1);
437 proto = g_new0 (gchar, (gsize) proto_len + 1);
438
439 if (!gst_byte_reader_get_data (&r, label_len, &src))
440 goto parse_error;
441 memcpy (label, src, label_len);
442 label[label_len] = '\0';
443 if (!gst_byte_reader_get_data (&r, proto_len, &src))
444 goto parse_error;
445 memcpy (proto, src, proto_len);
446 proto[proto_len] = '\0';
447
448 g_free (channel->parent.label);
449 channel->parent.label = label;
450 g_free (channel->parent.protocol);
451 channel->parent.protocol = proto;
452 channel->parent.priority = priority_uint_to_type (priority);
453 channel->parent.ordered = !(reliability & 0x80);
454 if (reliability & 0x01) {
455 channel->parent.max_retransmits = reliability_param;
456 channel->parent.max_packet_lifetime = -1;
457 } else if (reliability & 0x02) {
458 channel->parent.max_retransmits = -1;
459 channel->parent.max_packet_lifetime = reliability_param;
460 } else {
461 channel->parent.max_retransmits = -1;
462 channel->parent.max_packet_lifetime = -1;
463 }
464 channel->opened = TRUE;
465
466 GST_INFO_OBJECT (channel, "Received channel open for SCTP stream %i "
467 "label \"%s\" protocol %s ordered %s", channel->parent.id,
468 channel->parent.label, channel->parent.protocol,
469 channel->parent.ordered ? "true" : "false");
470
471 _channel_enqueue_task (channel, (ChannelTask) _emit_on_open, NULL, NULL);
472
473 GST_INFO_OBJECT (channel, "Sending channel ack");
474 buffer = construct_ack_packet (channel);
475
476 GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
477 channel->parent.buffered_amount += gst_buffer_get_size (buffer);
478 GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
479
480 ret = gst_app_src_push_buffer (GST_APP_SRC (channel->appsrc), buffer);
481 if (ret != GST_FLOW_OK) {
482 g_set_error (error, GST_WEBRTC_ERROR,
483 GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE, "Could not send ack packet");
484 return ret;
485 }
486
487 return ret;
488 } else {
489 g_set_error (error, GST_WEBRTC_ERROR,
490 GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
491 "Unknown message type in control protocol");
492 return GST_FLOW_ERROR;
493 }
494
495 parse_error:
496 {
497 g_free (label);
498 g_free (proto);
499 g_set_error (error, GST_WEBRTC_ERROR,
500 GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE, "Failed to parse packet");
501 g_return_val_if_reached (GST_FLOW_ERROR);
502 }
503 }
504
505 static void
on_sink_eos(GstAppSink * sink,gpointer user_data)506 on_sink_eos (GstAppSink * sink, gpointer user_data)
507 {
508 }
509
510 struct map_info
511 {
512 GstBuffer *buffer;
513 GstMapInfo map_info;
514 };
515
516 static void
buffer_unmap_and_unref(struct map_info * info)517 buffer_unmap_and_unref (struct map_info *info)
518 {
519 gst_buffer_unmap (info->buffer, &info->map_info);
520 gst_buffer_unref (info->buffer);
521 g_free (info);
522 }
523
524 static void
_emit_have_data(WebRTCDataChannel * channel,GBytes * data)525 _emit_have_data (WebRTCDataChannel * channel, GBytes * data)
526 {
527 gst_webrtc_data_channel_on_message_data (GST_WEBRTC_DATA_CHANNEL (channel),
528 data);
529 }
530
531 static void
_emit_have_string(GstWebRTCDataChannel * channel,gchar * str)532 _emit_have_string (GstWebRTCDataChannel * channel, gchar * str)
533 {
534 gst_webrtc_data_channel_on_message_string (GST_WEBRTC_DATA_CHANNEL (channel),
535 str);
536 }
537
538 static GstFlowReturn
_data_channel_have_sample(WebRTCDataChannel * channel,GstSample * sample,GError ** error)539 _data_channel_have_sample (WebRTCDataChannel * channel, GstSample * sample,
540 GError ** error)
541 {
542 GstSctpReceiveMeta *receive;
543 GstBuffer *buffer;
544 GstFlowReturn ret = GST_FLOW_OK;
545
546 GST_LOG_OBJECT (channel, "Received sample %" GST_PTR_FORMAT, sample);
547
548 g_return_val_if_fail (channel->sctp_transport != NULL, GST_FLOW_ERROR);
549
550 buffer = gst_sample_get_buffer (sample);
551 if (!buffer) {
552 g_set_error (error, GST_WEBRTC_ERROR,
553 GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE, "No buffer to handle");
554 return GST_FLOW_ERROR;
555 }
556 receive = gst_sctp_buffer_get_receive_meta (buffer);
557 if (!receive) {
558 g_set_error (error, GST_WEBRTC_ERROR,
559 GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
560 "No SCTP Receive meta on the buffer");
561 return GST_FLOW_ERROR;
562 }
563
564 switch (receive->ppid) {
565 case DATA_CHANNEL_PPID_WEBRTC_CONTROL:{
566 GstMapInfo info = GST_MAP_INFO_INIT;
567 if (!gst_buffer_map (buffer, &info, GST_MAP_READ)) {
568 g_set_error (error, GST_WEBRTC_ERROR,
569 GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
570 "Failed to map received buffer");
571 ret = GST_FLOW_ERROR;
572 } else {
573 ret = _parse_control_packet (channel, info.data, info.size, error);
574 gst_buffer_unmap (buffer, &info);
575 }
576 break;
577 }
578 case DATA_CHANNEL_PPID_WEBRTC_STRING:
579 case DATA_CHANNEL_PPID_WEBRTC_STRING_PARTIAL:{
580 GstMapInfo info = GST_MAP_INFO_INIT;
581 if (!gst_buffer_map (buffer, &info, GST_MAP_READ)) {
582 g_set_error (error, GST_WEBRTC_ERROR,
583 GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
584 "Failed to map received buffer");
585 ret = GST_FLOW_ERROR;
586 } else {
587 gchar *str = g_strndup ((gchar *) info.data, info.size);
588 _channel_enqueue_task (channel, (ChannelTask) _emit_have_string, str,
589 g_free);
590 gst_buffer_unmap (buffer, &info);
591 }
592 break;
593 }
594 case DATA_CHANNEL_PPID_WEBRTC_BINARY:
595 case DATA_CHANNEL_PPID_WEBRTC_BINARY_PARTIAL:{
596 struct map_info *info = g_new0 (struct map_info, 1);
597 if (!gst_buffer_map (buffer, &info->map_info, GST_MAP_READ)) {
598 g_set_error (error, GST_WEBRTC_ERROR,
599 GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
600 "Failed to map received buffer");
601 ret = GST_FLOW_ERROR;
602 } else {
603 GBytes *data = g_bytes_new_with_free_func (info->map_info.data,
604 info->map_info.size, (GDestroyNotify) buffer_unmap_and_unref, info);
605 info->buffer = gst_buffer_ref (buffer);
606 _channel_enqueue_task (channel, (ChannelTask) _emit_have_data, data,
607 (GDestroyNotify) g_bytes_unref);
608 }
609 break;
610 }
611 case DATA_CHANNEL_PPID_WEBRTC_BINARY_EMPTY:
612 _channel_enqueue_task (channel, (ChannelTask) _emit_have_data, NULL,
613 NULL);
614 break;
615 case DATA_CHANNEL_PPID_WEBRTC_STRING_EMPTY:
616 _channel_enqueue_task (channel, (ChannelTask) _emit_have_string, NULL,
617 NULL);
618 break;
619 default:
620 g_set_error (error, GST_WEBRTC_ERROR,
621 GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
622 "Unknown SCTP PPID %u received", receive->ppid);
623 ret = GST_FLOW_ERROR;
624 break;
625 }
626
627 return ret;
628 }
629
630 static GstFlowReturn
on_sink_preroll(GstAppSink * sink,gpointer user_data)631 on_sink_preroll (GstAppSink * sink, gpointer user_data)
632 {
633 WebRTCDataChannel *channel = user_data;
634 GstSample *sample = gst_app_sink_pull_preroll (sink);
635 GstFlowReturn ret;
636
637 if (sample) {
638 /* This sample also seems to be provided by the sample callback
639 ret = _data_channel_have_sample (channel, sample); */
640 ret = GST_FLOW_OK;
641 gst_sample_unref (sample);
642 } else if (gst_app_sink_is_eos (sink)) {
643 ret = GST_FLOW_EOS;
644 } else {
645 ret = GST_FLOW_ERROR;
646 }
647
648 if (ret != GST_FLOW_OK) {
649 _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL);
650 }
651
652 return ret;
653 }
654
655 static GstFlowReturn
on_sink_sample(GstAppSink * sink,gpointer user_data)656 on_sink_sample (GstAppSink * sink, gpointer user_data)
657 {
658 WebRTCDataChannel *channel = user_data;
659 GstSample *sample = gst_app_sink_pull_sample (sink);
660 GstFlowReturn ret;
661 GError *error = NULL;
662
663 if (sample) {
664 ret = _data_channel_have_sample (channel, sample, &error);
665 gst_sample_unref (sample);
666 } else if (gst_app_sink_is_eos (sink)) {
667 ret = GST_FLOW_EOS;
668 } else {
669 ret = GST_FLOW_ERROR;
670 }
671
672 if (error)
673 _channel_store_error (channel, error);
674
675 if (ret != GST_FLOW_OK) {
676 _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL);
677 }
678
679 return ret;
680 }
681
682 static GstAppSinkCallbacks sink_callbacks = {
683 on_sink_eos,
684 on_sink_preroll,
685 on_sink_sample,
686 };
687
688 void
webrtc_data_channel_start_negotiation(WebRTCDataChannel * channel)689 webrtc_data_channel_start_negotiation (WebRTCDataChannel * channel)
690 {
691 GstBuffer *buffer;
692
693 g_return_if_fail (!channel->parent.negotiated);
694 g_return_if_fail (channel->parent.id != -1);
695 g_return_if_fail (channel->sctp_transport != NULL);
696
697 buffer = construct_open_packet (channel);
698
699 GST_INFO_OBJECT (channel, "Sending channel open for SCTP stream %i "
700 "label \"%s\" protocol %s ordered %s", channel->parent.id,
701 channel->parent.label, channel->parent.protocol,
702 channel->parent.ordered ? "true" : "false");
703
704 GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
705 channel->parent.buffered_amount += gst_buffer_get_size (buffer);
706 GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
707 g_object_notify (G_OBJECT (&channel->parent), "buffered-amount");
708
709 if (gst_app_src_push_buffer (GST_APP_SRC (channel->appsrc),
710 buffer) == GST_FLOW_OK) {
711 channel->opened = TRUE;
712 _channel_enqueue_task (channel, (ChannelTask) _emit_on_open, NULL, NULL);
713 } else {
714 GError *error = NULL;
715 g_set_error (&error, GST_WEBRTC_ERROR,
716 GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
717 "Failed to send DCEP open packet");
718 _channel_store_error (channel, error);
719 _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL);
720 }
721 }
722
723 static void
_get_sctp_reliability(WebRTCDataChannel * channel,GstSctpSendMetaPartiallyReliability * reliability,guint * rel_param)724 _get_sctp_reliability (WebRTCDataChannel * channel,
725 GstSctpSendMetaPartiallyReliability * reliability, guint * rel_param)
726 {
727 if (channel->parent.max_retransmits != -1) {
728 *reliability = GST_SCTP_SEND_META_PARTIAL_RELIABILITY_RTX;
729 *rel_param = channel->parent.max_retransmits;
730 } else if (channel->parent.max_packet_lifetime != -1) {
731 *reliability = GST_SCTP_SEND_META_PARTIAL_RELIABILITY_TTL;
732 *rel_param = channel->parent.max_packet_lifetime;
733 } else {
734 *reliability = GST_SCTP_SEND_META_PARTIAL_RELIABILITY_NONE;
735 *rel_param = 0;
736 }
737 }
738
739 static gboolean
_is_within_max_message_size(WebRTCDataChannel * channel,gsize size)740 _is_within_max_message_size (WebRTCDataChannel * channel, gsize size)
741 {
742 return size <= channel->sctp_transport->max_message_size;
743 }
744
745 static void
webrtc_data_channel_send_data(GstWebRTCDataChannel * base_channel,GBytes * bytes)746 webrtc_data_channel_send_data (GstWebRTCDataChannel * base_channel,
747 GBytes * bytes)
748 {
749 WebRTCDataChannel *channel = WEBRTC_DATA_CHANNEL (base_channel);
750 GstSctpSendMetaPartiallyReliability reliability;
751 guint rel_param;
752 guint32 ppid;
753 GstBuffer *buffer;
754 GstFlowReturn ret;
755
756 if (!bytes) {
757 buffer = gst_buffer_new ();
758 ppid = DATA_CHANNEL_PPID_WEBRTC_BINARY_EMPTY;
759 } else {
760 gsize size;
761 guint8 *data;
762
763 data = (guint8 *) g_bytes_get_data (bytes, &size);
764 g_return_if_fail (data != NULL);
765 if (!_is_within_max_message_size (channel, size)) {
766 GError *error = NULL;
767 g_set_error (&error, GST_WEBRTC_ERROR,
768 GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
769 "Requested to send data that is too large");
770 _channel_store_error (channel, error);
771 _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL,
772 NULL);
773 return;
774 }
775
776 buffer = gst_buffer_new_wrapped_full (GST_MEMORY_FLAG_READONLY, data, size,
777 0, size, g_bytes_ref (bytes), (GDestroyNotify) g_bytes_unref);
778 ppid = DATA_CHANNEL_PPID_WEBRTC_BINARY;
779 }
780
781 _get_sctp_reliability (channel, &reliability, &rel_param);
782 gst_sctp_buffer_add_send_meta (buffer, ppid, channel->parent.ordered,
783 reliability, rel_param);
784
785 GST_LOG_OBJECT (channel, "Sending data using buffer %" GST_PTR_FORMAT,
786 buffer);
787
788 GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
789 channel->parent.buffered_amount += gst_buffer_get_size (buffer);
790 GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
791 g_object_notify (G_OBJECT (&channel->parent), "buffered-amount");
792
793 ret = gst_app_src_push_buffer (GST_APP_SRC (channel->appsrc), buffer);
794
795 if (ret != GST_FLOW_OK) {
796 GError *error = NULL;
797 g_set_error (&error, GST_WEBRTC_ERROR,
798 GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE, "Failed to send data");
799 _channel_store_error (channel, error);
800 _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL);
801 }
802 }
803
804 static void
webrtc_data_channel_send_string(GstWebRTCDataChannel * base_channel,const gchar * str)805 webrtc_data_channel_send_string (GstWebRTCDataChannel * base_channel,
806 const gchar * str)
807 {
808 WebRTCDataChannel *channel = WEBRTC_DATA_CHANNEL (base_channel);
809 GstSctpSendMetaPartiallyReliability reliability;
810 guint rel_param;
811 guint32 ppid;
812 GstBuffer *buffer;
813 GstFlowReturn ret;
814
815 if (!channel->parent.negotiated)
816 g_return_if_fail (channel->opened);
817 g_return_if_fail (channel->sctp_transport != NULL);
818
819 if (!str) {
820 buffer = gst_buffer_new ();
821 ppid = DATA_CHANNEL_PPID_WEBRTC_STRING_EMPTY;
822 } else {
823 gsize size = strlen (str);
824 gchar *str_copy;
825
826 if (!_is_within_max_message_size (channel, size)) {
827 GError *error = NULL;
828 g_set_error (&error, GST_WEBRTC_ERROR,
829 GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE,
830 "Requested to send a string that is too large");
831 _channel_store_error (channel, error);
832 _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL,
833 NULL);
834 return;
835 }
836
837 str_copy = g_strdup (str);
838 buffer =
839 gst_buffer_new_wrapped_full (GST_MEMORY_FLAG_READONLY, str_copy,
840 size, 0, size, str_copy, g_free);
841 ppid = DATA_CHANNEL_PPID_WEBRTC_STRING;
842 }
843
844 _get_sctp_reliability (channel, &reliability, &rel_param);
845 gst_sctp_buffer_add_send_meta (buffer, ppid, channel->parent.ordered,
846 reliability, rel_param);
847
848 GST_TRACE_OBJECT (channel, "Sending string using buffer %" GST_PTR_FORMAT,
849 buffer);
850
851 GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
852 channel->parent.buffered_amount += gst_buffer_get_size (buffer);
853 GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
854 g_object_notify (G_OBJECT (&channel->parent), "buffered-amount");
855
856 ret = gst_app_src_push_buffer (GST_APP_SRC (channel->appsrc), buffer);
857
858 if (ret != GST_FLOW_OK) {
859 GError *error = NULL;
860 g_set_error (&error, GST_WEBRTC_ERROR,
861 GST_WEBRTC_ERROR_DATA_CHANNEL_FAILURE, "Failed to send string");
862 _channel_store_error (channel, error);
863 _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL);
864 }
865 }
866
867 static void
_on_sctp_notify_state_unlocked(GObject * sctp_transport,WebRTCDataChannel * channel)868 _on_sctp_notify_state_unlocked (GObject * sctp_transport,
869 WebRTCDataChannel * channel)
870 {
871 GstWebRTCSCTPTransportState state;
872
873 g_object_get (sctp_transport, "state", &state, NULL);
874 if (state == GST_WEBRTC_SCTP_TRANSPORT_STATE_CONNECTED) {
875 if (channel->parent.negotiated)
876 _channel_enqueue_task (channel, (ChannelTask) _emit_on_open, NULL, NULL);
877 }
878 }
879
880 static void
_on_sctp_notify_state(GObject * sctp_transport,GParamSpec * pspec,WebRTCDataChannel * channel)881 _on_sctp_notify_state (GObject * sctp_transport, GParamSpec * pspec,
882 WebRTCDataChannel * channel)
883 {
884 GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
885 _on_sctp_notify_state_unlocked (sctp_transport, channel);
886 GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
887 }
888
889 static void
_emit_low_threshold(WebRTCDataChannel * channel,gpointer user_data)890 _emit_low_threshold (WebRTCDataChannel * channel, gpointer user_data)
891 {
892 gst_webrtc_data_channel_on_buffered_amount_low (GST_WEBRTC_DATA_CHANNEL
893 (channel));
894 }
895
896 static GstPadProbeReturn
on_appsrc_data(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)897 on_appsrc_data (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
898 {
899 WebRTCDataChannel *channel = user_data;
900 guint64 prev_amount;
901 guint64 size = 0;
902
903 if (GST_PAD_PROBE_INFO_TYPE (info) & (GST_PAD_PROBE_TYPE_BUFFER)) {
904 GstBuffer *buffer = GST_PAD_PROBE_INFO_BUFFER (info);
905 size = gst_buffer_get_size (buffer);
906 } else if (GST_PAD_PROBE_INFO_TYPE (info) & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
907 GstBufferList *list = GST_PAD_PROBE_INFO_BUFFER_LIST (info);
908 size = gst_buffer_list_calculate_size (list);
909 }
910
911 if (size > 0) {
912 GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
913 prev_amount = channel->parent.buffered_amount;
914 channel->parent.buffered_amount -= size;
915 GST_TRACE_OBJECT (channel, "checking low-threshold: prev %"
916 G_GUINT64_FORMAT " low-threshold %" G_GUINT64_FORMAT " buffered %"
917 G_GUINT64_FORMAT, prev_amount,
918 channel->parent.buffered_amount_low_threshold,
919 channel->parent.buffered_amount);
920 if (prev_amount >= channel->parent.buffered_amount_low_threshold
921 && channel->parent.buffered_amount <=
922 channel->parent.buffered_amount_low_threshold) {
923 _channel_enqueue_task (channel, (ChannelTask) _emit_low_threshold, NULL,
924 NULL);
925 }
926
927 if (channel->parent.ready_state == GST_WEBRTC_DATA_CHANNEL_STATE_CLOSING
928 && channel->parent.buffered_amount <= 0) {
929 _channel_enqueue_task (channel, (ChannelTask) _close_sctp_stream, NULL,
930 NULL);
931 }
932 GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
933 g_object_notify (G_OBJECT (&channel->parent), "buffered-amount");
934 }
935
936 return GST_PAD_PROBE_OK;
937 }
938
939 static void
gst_webrtc_data_channel_constructed(GObject * object)940 gst_webrtc_data_channel_constructed (GObject * object)
941 {
942 WebRTCDataChannel *channel = WEBRTC_DATA_CHANNEL (object);
943 GstPad *pad;
944 GstCaps *caps;
945
946 caps = gst_caps_new_any ();
947
948 channel->appsrc = gst_element_factory_make ("appsrc", NULL);
949 gst_object_ref_sink (channel->appsrc);
950 pad = gst_element_get_static_pad (channel->appsrc, "src");
951
952 channel->src_probe = gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_DATA_BOTH,
953 (GstPadProbeCallback) on_appsrc_data, channel, NULL);
954
955 channel->appsink = gst_element_factory_make ("appsink", NULL);
956 gst_object_ref_sink (channel->appsink);
957 g_object_set (channel->appsink, "sync", FALSE, "async", FALSE, "caps", caps,
958 NULL);
959 gst_app_sink_set_callbacks (GST_APP_SINK (channel->appsink), &sink_callbacks,
960 channel, NULL);
961
962 gst_object_unref (pad);
963 gst_caps_unref (caps);
964 }
965
966 static void
gst_webrtc_data_channel_finalize(GObject * object)967 gst_webrtc_data_channel_finalize (GObject * object)
968 {
969 WebRTCDataChannel *channel = WEBRTC_DATA_CHANNEL (object);
970
971 if (channel->src_probe) {
972 GstPad *pad = gst_element_get_static_pad (channel->appsrc, "src");
973 gst_pad_remove_probe (pad, channel->src_probe);
974 gst_object_unref (pad);
975 channel->src_probe = 0;
976 }
977
978 if (channel->sctp_transport)
979 g_signal_handlers_disconnect_by_data (channel->sctp_transport, channel);
980 g_clear_object (&channel->sctp_transport);
981
982 g_clear_object (&channel->appsrc);
983 g_clear_object (&channel->appsink);
984
985 G_OBJECT_CLASS (parent_class)->finalize (object);
986 }
987
988 static void
webrtc_data_channel_class_init(WebRTCDataChannelClass * klass)989 webrtc_data_channel_class_init (WebRTCDataChannelClass * klass)
990 {
991 GObjectClass *gobject_class = (GObjectClass *) klass;
992 GstWebRTCDataChannelClass *channel_class =
993 (GstWebRTCDataChannelClass *) klass;
994
995 gobject_class->constructed = gst_webrtc_data_channel_constructed;
996 gobject_class->finalize = gst_webrtc_data_channel_finalize;
997
998 channel_class->send_data = webrtc_data_channel_send_data;
999 channel_class->send_string = webrtc_data_channel_send_string;
1000 channel_class->close = webrtc_data_channel_close;
1001 }
1002
1003 static void
webrtc_data_channel_init(WebRTCDataChannel * channel)1004 webrtc_data_channel_init (WebRTCDataChannel * channel)
1005 {
1006 }
1007
1008 static void
_data_channel_set_sctp_transport(WebRTCDataChannel * channel,WebRTCSCTPTransport * sctp)1009 _data_channel_set_sctp_transport (WebRTCDataChannel * channel,
1010 WebRTCSCTPTransport * sctp)
1011 {
1012 g_return_if_fail (GST_IS_WEBRTC_DATA_CHANNEL (channel));
1013 g_return_if_fail (GST_IS_WEBRTC_SCTP_TRANSPORT (sctp));
1014
1015 GST_WEBRTC_DATA_CHANNEL_LOCK (channel);
1016 if (channel->sctp_transport)
1017 g_signal_handlers_disconnect_by_data (channel->sctp_transport, channel);
1018
1019 gst_object_replace ((GstObject **) & channel->sctp_transport,
1020 GST_OBJECT (sctp));
1021
1022 if (sctp) {
1023 g_signal_connect (sctp, "stream-reset", G_CALLBACK (_on_sctp_stream_reset),
1024 channel);
1025 g_signal_connect (sctp, "notify::state", G_CALLBACK (_on_sctp_notify_state),
1026 channel);
1027 }
1028 GST_WEBRTC_DATA_CHANNEL_UNLOCK (channel);
1029 }
1030
1031 void
webrtc_data_channel_link_to_sctp(WebRTCDataChannel * channel,WebRTCSCTPTransport * sctp_transport)1032 webrtc_data_channel_link_to_sctp (WebRTCDataChannel * channel,
1033 WebRTCSCTPTransport * sctp_transport)
1034 {
1035 if (sctp_transport && !channel->sctp_transport) {
1036 gint id;
1037
1038 g_object_get (channel, "id", &id, NULL);
1039
1040 if (sctp_transport->association_established && id != -1) {
1041 gchar *pad_name;
1042
1043 _data_channel_set_sctp_transport (channel, sctp_transport);
1044 pad_name = g_strdup_printf ("sink_%u", id);
1045 if (!gst_element_link_pads (channel->appsrc, "src",
1046 channel->sctp_transport->sctpenc, pad_name))
1047 g_warn_if_reached ();
1048 g_free (pad_name);
1049
1050 _on_sctp_notify_state_unlocked (G_OBJECT (sctp_transport), channel);
1051 }
1052 }
1053 }
1054