1 /* GStreamer
2 * Copyright (C) 2017 Matthew Waters <matthew@centricular.com>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
8 *
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
13 *
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17 * Boston, MA 02110-1301, USA.
18 */
19
20 #ifdef HAVE_CONFIG_H
21 # include "config.h"
22 #endif
23
24 #include "transportsendbin.h"
25 #include "utils.h"
26 #include "gst/webrtc/webrtc-priv.h"
27
28 /*
29 * ,--------------transport_send_%u-------- ---,
30 * ; ,-----dtlssrtpenc---, ;
31 * data_sink o---o data_sink ; ;
32 * ; ; ; ,---nicesink---, ;
33 * rtp_sink o---o rtp_sink_0 src o--o sink ; ;
34 * ; ; ; '--------------' ;
35 * rtcp_sink o---o rtcp_sink_0 ; ;
36 * ; '-------------------'
37 * '-------------------------------------------'
38 *
39 *
40 * FIXME: Do we need a valve drop=TRUE for the no RTCP case?
41 */
42
43 #define GST_CAT_DEFAULT gst_webrtc_transport_send_bin_debug
44 GST_DEBUG_CATEGORY_STATIC (GST_CAT_DEFAULT);
45
46 #define transport_send_bin_parent_class parent_class
47 G_DEFINE_TYPE_WITH_CODE (TransportSendBin, transport_send_bin, GST_TYPE_BIN,
48 GST_DEBUG_CATEGORY_INIT (gst_webrtc_transport_send_bin_debug,
49 "webrtctransportsendbin", 0, "webrtctransportsendbin"););
50
51 static GstStaticPadTemplate rtp_sink_template =
52 GST_STATIC_PAD_TEMPLATE ("rtp_sink",
53 GST_PAD_SINK,
54 GST_PAD_ALWAYS,
55 GST_STATIC_CAPS ("application/x-rtp"));
56
57 static GstStaticPadTemplate rtcp_sink_template =
58 GST_STATIC_PAD_TEMPLATE ("rtcp_sink",
59 GST_PAD_SINK,
60 GST_PAD_ALWAYS,
61 GST_STATIC_CAPS ("application/x-rtp"));
62
63 static GstStaticPadTemplate data_sink_template =
64 GST_STATIC_PAD_TEMPLATE ("data_sink",
65 GST_PAD_SINK,
66 GST_PAD_ALWAYS,
67 GST_STATIC_CAPS_ANY);
68
69 enum
70 {
71 PROP_0,
72 PROP_STREAM,
73 };
74
75 #define TSB_GET_LOCK(tsb) (&tsb->lock)
76 #define TSB_LOCK(tsb) (g_mutex_lock (TSB_GET_LOCK(tsb)))
77 #define TSB_UNLOCK(tsb) (g_mutex_unlock (TSB_GET_LOCK(tsb)))
78
79 static void cleanup_blocks (TransportSendBin * send);
80
81 static void
transport_send_bin_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)82 transport_send_bin_set_property (GObject * object, guint prop_id,
83 const GValue * value, GParamSpec * pspec)
84 {
85 TransportSendBin *send = TRANSPORT_SEND_BIN (object);
86
87 GST_OBJECT_LOCK (send);
88 switch (prop_id) {
89 case PROP_STREAM:
90 /* XXX: weak-ref this? Note, it's construct-only so can't be changed later */
91 send->stream = TRANSPORT_STREAM (g_value_get_object (value));
92 break;
93 default:
94 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
95 break;
96 }
97 GST_OBJECT_UNLOCK (send);
98 }
99
100 static void
transport_send_bin_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)101 transport_send_bin_get_property (GObject * object, guint prop_id,
102 GValue * value, GParamSpec * pspec)
103 {
104 TransportSendBin *send = TRANSPORT_SEND_BIN (object);
105
106 GST_OBJECT_LOCK (send);
107 switch (prop_id) {
108 case PROP_STREAM:
109 g_value_set_object (value, send->stream);
110 break;
111 default:
112 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
113 break;
114 }
115 GST_OBJECT_UNLOCK (send);
116 }
117
118 static GstPadProbeReturn
pad_block(GstPad * pad,GstPadProbeInfo * info,gpointer unused)119 pad_block (GstPad * pad, GstPadProbeInfo * info, gpointer unused)
120 {
121 /* Drop all events: we don't care about them and don't want to block on
122 * them. Sticky events would be forwarded again later once we unblock
123 * and we don't want to forward them here already because that might
124 * cause a spurious GST_FLOW_FLUSHING */
125 if (GST_IS_EVENT (info->data))
126 return GST_PAD_PROBE_DROP;
127
128 /* But block on any actual data-flow so we don't accidentally send that
129 * to a pad that is not ready yet, causing GST_FLOW_FLUSHING and everything
130 * to silently stop.
131 */
132 GST_LOG_OBJECT (pad, "blocking pad with data %" GST_PTR_FORMAT, info->data);
133
134 return GST_PAD_PROBE_OK;
135 }
136
137 /* We block RTP/RTCP dataflow until the relevant DTLS key
138 * nego is done, but we need to block the *peer* src pad
139 * because the dtlssrtpenc state changes are done manually,
140 * and otherwise we can get state change problems trying to shut down */
141 static struct pad_block *
block_peer_pad(GstElement * elem,const gchar * pad_name)142 block_peer_pad (GstElement * elem, const gchar * pad_name)
143 {
144 GstPad *pad, *peer;
145 struct pad_block *block;
146
147 pad = gst_element_get_static_pad (elem, pad_name);
148 peer = gst_pad_get_peer (pad);
149 block = _create_pad_block (elem, peer, 0, NULL, NULL);
150 block->block_id = gst_pad_add_probe (peer,
151 GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
152 (GstPadProbeCallback) pad_block, NULL, NULL);
153 gst_object_unref (pad);
154 gst_object_unref (peer);
155 return block;
156 }
157
158 static GstStateChangeReturn
transport_send_bin_change_state(GstElement * element,GstStateChange transition)159 transport_send_bin_change_state (GstElement * element,
160 GstStateChange transition)
161 {
162 TransportSendBin *send = TRANSPORT_SEND_BIN (element);
163 GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
164
165 GST_DEBUG_OBJECT (element, "changing state: %s => %s",
166 gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)),
167 gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
168
169 switch (transition) {
170 case GST_STATE_CHANGE_NULL_TO_READY:{
171 /* XXX: don't change state until the client-ness has been chosen
172 * arguably the element should be able to deal with this itself or
173 * we should only add it once/if we get the encoding keys */
174 TSB_LOCK (send);
175 gst_element_set_locked_state (send->dtlssrtpenc, TRUE);
176 send->active = TRUE;
177 send->has_clientness = FALSE;
178 TSB_UNLOCK (send);
179 break;
180 }
181 case GST_STATE_CHANGE_READY_TO_PAUSED:{
182 GstElement *elem;
183
184 TSB_LOCK (send);
185 /* RTP */
186 /* unblock the encoder once the key is set, this should also be automatic */
187 elem = send->stream->transport->dtlssrtpenc;
188 send->rtp_block = block_peer_pad (elem, "rtp_sink_0");
189 /* Also block the RTCP pad on the RTP encoder, in case we mux RTCP */
190 send->rtcp_block = block_peer_pad (elem, "rtcp_sink_0");
191 /* unblock ice sink once a connection is made, this should also be automatic */
192 elem = send->stream->transport->transport->sink;
193
194 TSB_UNLOCK (send);
195 break;
196 }
197 default:
198 break;
199 }
200
201 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
202 if (ret == GST_STATE_CHANGE_FAILURE) {
203 GST_WARNING_OBJECT (element, "Parent state change handler failed");
204 return ret;
205 }
206
207 switch (transition) {
208 case GST_STATE_CHANGE_PAUSED_TO_READY:
209 {
210 /* Now that everything is stopped, we can remove the pad blocks
211 * if they still exist, without accidentally feeding data to the
212 * dtlssrtpenc elements */
213 TSB_LOCK (send);
214 cleanup_blocks (send);
215 TSB_UNLOCK (send);
216 break;
217 }
218 case GST_STATE_CHANGE_READY_TO_NULL:{
219 TSB_LOCK (send);
220 send->active = FALSE;
221 cleanup_blocks (send);
222
223 gst_element_set_locked_state (send->dtlssrtpenc, FALSE);
224 TSB_UNLOCK (send);
225
226 break;
227 }
228 default:
229 break;
230 }
231
232 return ret;
233 }
234
235 static void
_on_dtls_enc_key_set(GstElement * dtlssrtpenc,TransportSendBin * send)236 _on_dtls_enc_key_set (GstElement * dtlssrtpenc, TransportSendBin * send)
237 {
238 if (dtlssrtpenc != send->dtlssrtpenc) {
239 GST_WARNING_OBJECT (send,
240 "Received dtls-enc key info for unknown element %" GST_PTR_FORMAT,
241 dtlssrtpenc);
242 return;
243 }
244
245 TSB_LOCK (send);
246 if (!send->active) {
247 GST_INFO_OBJECT (send, "Received dtls-enc key info from %" GST_PTR_FORMAT
248 "when not active", dtlssrtpenc);
249 goto done;
250 }
251
252 GST_LOG_OBJECT (send, "Unblocking %" GST_PTR_FORMAT " pads", dtlssrtpenc);
253 _free_pad_block (send->rtp_block);
254 _free_pad_block (send->rtcp_block);
255 send->rtp_block = send->rtcp_block = NULL;
256
257 done:
258 TSB_UNLOCK (send);
259 }
260
261 static void
maybe_start_enc(TransportSendBin * send)262 maybe_start_enc (TransportSendBin * send)
263 {
264 GstWebRTCICEConnectionState state;
265
266 if (!send->has_clientness) {
267 GST_LOG_OBJECT (send, "Can't start DTLS because doesn't know client-ness");
268 return;
269 }
270
271 g_object_get (send->stream->transport->transport, "state", &state, NULL);
272 if (state != GST_WEBRTC_ICE_CONNECTION_STATE_CONNECTED &&
273 state != GST_WEBRTC_ICE_CONNECTION_STATE_COMPLETED) {
274 GST_LOG_OBJECT (send, "Can't start DTLS yet because ICE is not connected.");
275 return;
276 }
277
278 gst_element_set_locked_state (send->dtlssrtpenc, FALSE);
279 gst_element_sync_state_with_parent (send->dtlssrtpenc);
280 }
281
282 static void
_on_notify_dtls_client_status(GstElement * dtlssrtpenc,GParamSpec * pspec,TransportSendBin * send)283 _on_notify_dtls_client_status (GstElement * dtlssrtpenc,
284 GParamSpec * pspec, TransportSendBin * send)
285 {
286 if (dtlssrtpenc != send->dtlssrtpenc) {
287 GST_WARNING_OBJECT (send,
288 "Received dtls-enc client mode for unknown element %" GST_PTR_FORMAT,
289 dtlssrtpenc);
290 return;
291 }
292
293 TSB_LOCK (send);
294 if (!send->active) {
295 GST_DEBUG_OBJECT (send,
296 "DTLS-SRTP encoder ready after we're already stopping");
297 goto done;
298 }
299
300 send->has_clientness = TRUE;
301 GST_DEBUG_OBJECT (send,
302 "DTLS-SRTP encoder configured. Unlocking it and maybe changing state %"
303 GST_PTR_FORMAT, dtlssrtpenc);
304 maybe_start_enc (send);
305
306 done:
307 TSB_UNLOCK (send);
308 }
309
310 static void
_on_notify_ice_connection_state(GstWebRTCICETransport * transport,GParamSpec * pspec,TransportSendBin * send)311 _on_notify_ice_connection_state (GstWebRTCICETransport * transport,
312 GParamSpec * pspec, TransportSendBin * send)
313 {
314 TSB_LOCK (send);
315 maybe_start_enc (send);
316 TSB_UNLOCK (send);
317 }
318
319 static void
transport_send_bin_constructed(GObject * object)320 transport_send_bin_constructed (GObject * object)
321 {
322 TransportSendBin *send = TRANSPORT_SEND_BIN (object);
323 GstPadTemplate *templ;
324 GstPad *ghost, *pad;
325
326 g_return_if_fail (send->stream);
327
328 send->dtlssrtpenc = send->stream->transport->dtlssrtpenc;
329 send->nicesink = send->stream->transport->transport->sink;
330
331 /* unblock the encoder once the key is set */
332 g_signal_connect (send->dtlssrtpenc, "on-key-set",
333 G_CALLBACK (_on_dtls_enc_key_set), send);
334 /* Bring the encoder up to current state only once the is-client prop is set */
335 g_signal_connect (send->dtlssrtpenc, "notify::is-client",
336 G_CALLBACK (_on_notify_dtls_client_status), send);
337 /* unblock ice sink once it signals a connection */
338 g_signal_connect (send->stream->transport->transport, "notify::state",
339 G_CALLBACK (_on_notify_ice_connection_state), send);
340
341 gst_bin_add (GST_BIN (send), GST_ELEMENT (send->dtlssrtpenc));
342 gst_bin_add (GST_BIN (send), GST_ELEMENT (send->nicesink));
343
344 if (!gst_element_link_pads (GST_ELEMENT (send->dtlssrtpenc), "src",
345 send->nicesink, "sink"))
346 g_warn_if_reached ();
347
348 templ = _find_pad_template (send->dtlssrtpenc, GST_PAD_SINK, GST_PAD_REQUEST,
349 "rtp_sink_%d");
350 pad = gst_element_request_pad (send->dtlssrtpenc, templ, "rtp_sink_0", NULL);
351
352 ghost = gst_ghost_pad_new ("rtp_sink", pad);
353 gst_element_add_pad (GST_ELEMENT (send), ghost);
354 gst_object_unref (pad);
355
356 /* push the data stream onto the RTP dtls element */
357 templ = _find_pad_template (send->dtlssrtpenc, GST_PAD_SINK, GST_PAD_REQUEST,
358 "data_sink");
359 pad = gst_element_request_pad (send->dtlssrtpenc, templ, "data_sink", NULL);
360
361 ghost = gst_ghost_pad_new ("data_sink", pad);
362 gst_element_add_pad (GST_ELEMENT (send), ghost);
363 gst_object_unref (pad);
364
365 /* RTCP */
366 /* Do the common init for the context struct */
367 templ = _find_pad_template (send->dtlssrtpenc, GST_PAD_SINK, GST_PAD_REQUEST,
368 "rtcp_sink_%d");
369 pad = gst_element_request_pad (send->dtlssrtpenc, templ, "rtcp_sink_0", NULL);
370
371 ghost = gst_ghost_pad_new ("rtcp_sink", pad);
372 gst_element_add_pad (GST_ELEMENT (send), ghost);
373 gst_object_unref (pad);
374
375 G_OBJECT_CLASS (parent_class)->constructed (object);
376 }
377
378 static void
cleanup_blocks(TransportSendBin * send)379 cleanup_blocks (TransportSendBin * send)
380 {
381 if (send->rtp_block) {
382 _free_pad_block (send->rtp_block);
383 send->rtp_block = NULL;
384 }
385
386 if (send->rtcp_block) {
387 _free_pad_block (send->rtcp_block);
388 send->rtcp_block = NULL;
389 }
390 }
391
392 static void
transport_send_bin_dispose(GObject * object)393 transport_send_bin_dispose (GObject * object)
394 {
395 TransportSendBin *send = TRANSPORT_SEND_BIN (object);
396
397 TSB_LOCK (send);
398 if (send->nicesink) {
399 g_signal_handlers_disconnect_by_data (send->nicesink, send);
400 send->nicesink = NULL;
401 }
402
403 cleanup_blocks (send);
404
405 TSB_UNLOCK (send);
406
407 G_OBJECT_CLASS (parent_class)->dispose (object);
408 }
409
410 static void
transport_send_bin_finalize(GObject * object)411 transport_send_bin_finalize (GObject * object)
412 {
413 TransportSendBin *send = TRANSPORT_SEND_BIN (object);
414
415 g_mutex_clear (TSB_GET_LOCK (send));
416 G_OBJECT_CLASS (parent_class)->finalize (object);
417 }
418
419 static gboolean
gst_transport_send_bin_element_query(GstElement * element,GstQuery * query)420 gst_transport_send_bin_element_query (GstElement * element, GstQuery * query)
421 {
422 gboolean ret = TRUE;
423 GstClockTime min_latency;
424
425 GST_LOG_OBJECT (element, "got query %s", GST_QUERY_TYPE_NAME (query));
426
427 switch (GST_QUERY_TYPE (query)) {
428 case GST_QUERY_LATENCY:
429 /* when latency is queried, use the result to configure our
430 * own latency internally, piggybacking off the global
431 * latency configuration sequence. */
432 GST_DEBUG_OBJECT (element, "handling latency query");
433
434 /* Call the parent query handler to actually get the query
435 * sent upstream */
436 ret =
437 GST_ELEMENT_CLASS (transport_send_bin_parent_class)->query
438 (GST_ELEMENT (element), query);
439 if (!ret)
440 break;
441
442 gst_query_parse_latency (query, NULL, &min_latency, NULL);
443
444 GST_DEBUG_OBJECT (element,
445 "got min latency %" GST_TIME_FORMAT, GST_TIME_ARGS (min_latency));
446
447 /* configure latency on elements */
448 /* Call the parent event handler, because our sub-class handler
449 * will drop the LATENCY event. We also don't need to that
450 * the latency configuration is valid (min < max), because
451 * the pipeline will do it when checking the query results */
452 if (GST_ELEMENT_CLASS (transport_send_bin_parent_class)->send_event
453 (GST_ELEMENT (element), gst_event_new_latency (min_latency))) {
454 GST_INFO_OBJECT (element, "configured latency of %" GST_TIME_FORMAT,
455 GST_TIME_ARGS (min_latency));
456 } else {
457 GST_WARNING_OBJECT (element,
458 "did not really configure latency of %" GST_TIME_FORMAT,
459 GST_TIME_ARGS (min_latency));
460 }
461
462 break;
463 default:
464 ret =
465 GST_ELEMENT_CLASS (transport_send_bin_parent_class)->query
466 (GST_ELEMENT (element), query);
467 break;
468 }
469
470 return ret;
471 }
472
473 static gboolean
gst_transport_send_bin_element_event(GstElement * element,GstEvent * event)474 gst_transport_send_bin_element_event (GstElement * element, GstEvent * event)
475 {
476 gboolean ret = TRUE;
477
478 GST_LOG_OBJECT (element, "got event %s", GST_EVENT_TYPE_NAME (event));
479
480 switch (GST_EVENT_TYPE (event)) {
481 case GST_EVENT_LATENCY:
482 /* Ignore the pipeline configured latency, we choose our own
483 * instead when the latency query happens, so that sending
484 * isn't affected by other parts of the pipeline */
485 GST_DEBUG_OBJECT (element, "Ignoring latency event from parent");
486 gst_event_unref (event);
487 break;
488 default:
489 ret =
490 GST_ELEMENT_CLASS (transport_send_bin_parent_class)->send_event
491 (GST_ELEMENT (element), event);
492 break;
493 }
494
495 return ret;
496 }
497
498 static void
transport_send_bin_class_init(TransportSendBinClass * klass)499 transport_send_bin_class_init (TransportSendBinClass * klass)
500 {
501 GObjectClass *gobject_class = (GObjectClass *) klass;
502 GstElementClass *element_class = (GstElementClass *) klass;
503
504 element_class->change_state = transport_send_bin_change_state;
505
506 gst_element_class_add_static_pad_template (element_class, &rtp_sink_template);
507 gst_element_class_add_static_pad_template (element_class,
508 &rtcp_sink_template);
509 gst_element_class_add_static_pad_template (element_class,
510 &data_sink_template);
511
512 gst_element_class_set_metadata (element_class, "WebRTC Transport Send Bin",
513 "Filter/Network/WebRTC", "A bin for webrtc connections",
514 "Matthew Waters <matthew@centricular.com>");
515
516 gobject_class->constructed = transport_send_bin_constructed;
517 gobject_class->dispose = transport_send_bin_dispose;
518 gobject_class->get_property = transport_send_bin_get_property;
519 gobject_class->set_property = transport_send_bin_set_property;
520 gobject_class->finalize = transport_send_bin_finalize;
521
522 element_class->send_event = gst_transport_send_bin_element_event;
523 element_class->query = gst_transport_send_bin_element_query;
524
525 g_object_class_install_property (gobject_class,
526 PROP_STREAM,
527 g_param_spec_object ("stream", "Stream",
528 "The TransportStream for this sending bin",
529 transport_stream_get_type (),
530 G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));
531 }
532
533 static void
transport_send_bin_init(TransportSendBin * send)534 transport_send_bin_init (TransportSendBin * send)
535 {
536 g_mutex_init (TSB_GET_LOCK (send));
537 }
538