• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer
2  * Copyright (C) 2017 Matthew Waters <matthew@centricular.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19 
20 #ifdef HAVE_CONFIG_H
21 # include "config.h"
22 #endif
23 
24 #include "transportsendbin.h"
25 #include "utils.h"
26 #include "gst/webrtc/webrtc-priv.h"
27 
28 /*
29  *           ,--------------transport_send_%u-------- ---,
30  *           ;   ,-----dtlssrtpenc---,                   ;
31  * data_sink o---o data_sink         ;                   ;
32  *           ;   ;                   ;  ,---nicesink---, ;
33  *  rtp_sink o---o rtp_sink_0    src o--o sink         ; ;
34  *           ;   ;                   ;  '--------------' ;
35  * rtcp_sink o---o rtcp_sink_0       ;                   ;
36  *           ;   '-------------------'
37  *           '-------------------------------------------'
38  *
39  *
40  * FIXME: Do we need a valve drop=TRUE for the no RTCP case?
41  */
42 
43 #define GST_CAT_DEFAULT gst_webrtc_transport_send_bin_debug
44 GST_DEBUG_CATEGORY_STATIC (GST_CAT_DEFAULT);
45 
46 #define transport_send_bin_parent_class parent_class
47 G_DEFINE_TYPE_WITH_CODE (TransportSendBin, transport_send_bin, GST_TYPE_BIN,
48     GST_DEBUG_CATEGORY_INIT (gst_webrtc_transport_send_bin_debug,
49         "webrtctransportsendbin", 0, "webrtctransportsendbin"););
50 
51 static GstStaticPadTemplate rtp_sink_template =
52 GST_STATIC_PAD_TEMPLATE ("rtp_sink",
53     GST_PAD_SINK,
54     GST_PAD_ALWAYS,
55     GST_STATIC_CAPS ("application/x-rtp"));
56 
57 static GstStaticPadTemplate rtcp_sink_template =
58 GST_STATIC_PAD_TEMPLATE ("rtcp_sink",
59     GST_PAD_SINK,
60     GST_PAD_ALWAYS,
61     GST_STATIC_CAPS ("application/x-rtp"));
62 
63 static GstStaticPadTemplate data_sink_template =
64 GST_STATIC_PAD_TEMPLATE ("data_sink",
65     GST_PAD_SINK,
66     GST_PAD_ALWAYS,
67     GST_STATIC_CAPS_ANY);
68 
69 enum
70 {
71   PROP_0,
72   PROP_STREAM,
73 };
74 
75 #define TSB_GET_LOCK(tsb) (&tsb->lock)
76 #define TSB_LOCK(tsb) (g_mutex_lock (TSB_GET_LOCK(tsb)))
77 #define TSB_UNLOCK(tsb) (g_mutex_unlock (TSB_GET_LOCK(tsb)))
78 
79 static void cleanup_blocks (TransportSendBin * send);
80 
81 static void
transport_send_bin_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)82 transport_send_bin_set_property (GObject * object, guint prop_id,
83     const GValue * value, GParamSpec * pspec)
84 {
85   TransportSendBin *send = TRANSPORT_SEND_BIN (object);
86 
87   GST_OBJECT_LOCK (send);
88   switch (prop_id) {
89     case PROP_STREAM:
90       /* XXX: weak-ref this? Note, it's construct-only so can't be changed later */
91       send->stream = TRANSPORT_STREAM (g_value_get_object (value));
92       break;
93     default:
94       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
95       break;
96   }
97   GST_OBJECT_UNLOCK (send);
98 }
99 
100 static void
transport_send_bin_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)101 transport_send_bin_get_property (GObject * object, guint prop_id,
102     GValue * value, GParamSpec * pspec)
103 {
104   TransportSendBin *send = TRANSPORT_SEND_BIN (object);
105 
106   GST_OBJECT_LOCK (send);
107   switch (prop_id) {
108     case PROP_STREAM:
109       g_value_set_object (value, send->stream);
110       break;
111     default:
112       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
113       break;
114   }
115   GST_OBJECT_UNLOCK (send);
116 }
117 
118 static GstPadProbeReturn
pad_block(GstPad * pad,GstPadProbeInfo * info,gpointer unused)119 pad_block (GstPad * pad, GstPadProbeInfo * info, gpointer unused)
120 {
121   /* Drop all events: we don't care about them and don't want to block on
122    * them. Sticky events would be forwarded again later once we unblock
123    * and we don't want to forward them here already because that might
124    * cause a spurious GST_FLOW_FLUSHING */
125   if (GST_IS_EVENT (info->data))
126     return GST_PAD_PROBE_DROP;
127 
128   /* But block on any actual data-flow so we don't accidentally send that
129    * to a pad that is not ready yet, causing GST_FLOW_FLUSHING and everything
130    * to silently stop.
131    */
132   GST_LOG_OBJECT (pad, "blocking pad with data %" GST_PTR_FORMAT, info->data);
133 
134   return GST_PAD_PROBE_OK;
135 }
136 
137 /* We block RTP/RTCP dataflow until the relevant DTLS key
138  * nego is done, but we need to block the *peer* src pad
139  * because the dtlssrtpenc state changes are done manually,
140  * and otherwise we can get state change problems trying to shut down */
141 static struct pad_block *
block_peer_pad(GstElement * elem,const gchar * pad_name)142 block_peer_pad (GstElement * elem, const gchar * pad_name)
143 {
144   GstPad *pad, *peer;
145   struct pad_block *block;
146 
147   pad = gst_element_get_static_pad (elem, pad_name);
148   peer = gst_pad_get_peer (pad);
149   block = _create_pad_block (elem, peer, 0, NULL, NULL);
150   block->block_id = gst_pad_add_probe (peer,
151       GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
152       (GstPadProbeCallback) pad_block, NULL, NULL);
153   gst_object_unref (pad);
154   gst_object_unref (peer);
155   return block;
156 }
157 
158 static GstStateChangeReturn
transport_send_bin_change_state(GstElement * element,GstStateChange transition)159 transport_send_bin_change_state (GstElement * element,
160     GstStateChange transition)
161 {
162   TransportSendBin *send = TRANSPORT_SEND_BIN (element);
163   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
164 
165   GST_DEBUG_OBJECT (element, "changing state: %s => %s",
166       gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)),
167       gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
168 
169   switch (transition) {
170     case GST_STATE_CHANGE_NULL_TO_READY:{
171       /* XXX: don't change state until the client-ness has been chosen
172        * arguably the element should be able to deal with this itself or
173        * we should only add it once/if we get the encoding keys */
174       TSB_LOCK (send);
175       gst_element_set_locked_state (send->dtlssrtpenc, TRUE);
176       send->active = TRUE;
177       send->has_clientness = FALSE;
178       TSB_UNLOCK (send);
179       break;
180     }
181     case GST_STATE_CHANGE_READY_TO_PAUSED:{
182       GstElement *elem;
183 
184       TSB_LOCK (send);
185       /* RTP */
186       /* unblock the encoder once the key is set, this should also be automatic */
187       elem = send->stream->transport->dtlssrtpenc;
188       send->rtp_block = block_peer_pad (elem, "rtp_sink_0");
189       /* Also block the RTCP pad on the RTP encoder, in case we mux RTCP */
190       send->rtcp_block = block_peer_pad (elem, "rtcp_sink_0");
191       /* unblock ice sink once a connection is made, this should also be automatic */
192       elem = send->stream->transport->transport->sink;
193 
194       TSB_UNLOCK (send);
195       break;
196     }
197     default:
198       break;
199   }
200 
201   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
202   if (ret == GST_STATE_CHANGE_FAILURE) {
203     GST_WARNING_OBJECT (element, "Parent state change handler failed");
204     return ret;
205   }
206 
207   switch (transition) {
208     case GST_STATE_CHANGE_PAUSED_TO_READY:
209     {
210       /* Now that everything is stopped, we can remove the pad blocks
211        * if they still exist, without accidentally feeding data to the
212        * dtlssrtpenc elements */
213       TSB_LOCK (send);
214       cleanup_blocks (send);
215       TSB_UNLOCK (send);
216       break;
217     }
218     case GST_STATE_CHANGE_READY_TO_NULL:{
219       TSB_LOCK (send);
220       send->active = FALSE;
221       cleanup_blocks (send);
222 
223       gst_element_set_locked_state (send->dtlssrtpenc, FALSE);
224       TSB_UNLOCK (send);
225 
226       break;
227     }
228     default:
229       break;
230   }
231 
232   return ret;
233 }
234 
235 static void
_on_dtls_enc_key_set(GstElement * dtlssrtpenc,TransportSendBin * send)236 _on_dtls_enc_key_set (GstElement * dtlssrtpenc, TransportSendBin * send)
237 {
238   if (dtlssrtpenc != send->dtlssrtpenc) {
239     GST_WARNING_OBJECT (send,
240         "Received dtls-enc key info for unknown element %" GST_PTR_FORMAT,
241         dtlssrtpenc);
242     return;
243   }
244 
245   TSB_LOCK (send);
246   if (!send->active) {
247     GST_INFO_OBJECT (send, "Received dtls-enc key info from %" GST_PTR_FORMAT
248         "when not active", dtlssrtpenc);
249     goto done;
250   }
251 
252   GST_LOG_OBJECT (send, "Unblocking %" GST_PTR_FORMAT " pads", dtlssrtpenc);
253   _free_pad_block (send->rtp_block);
254   _free_pad_block (send->rtcp_block);
255   send->rtp_block = send->rtcp_block = NULL;
256 
257 done:
258   TSB_UNLOCK (send);
259 }
260 
261 static void
maybe_start_enc(TransportSendBin * send)262 maybe_start_enc (TransportSendBin * send)
263 {
264   GstWebRTCICEConnectionState state;
265 
266   if (!send->has_clientness) {
267     GST_LOG_OBJECT (send, "Can't start DTLS because doesn't know client-ness");
268     return;
269   }
270 
271   g_object_get (send->stream->transport->transport, "state", &state, NULL);
272   if (state != GST_WEBRTC_ICE_CONNECTION_STATE_CONNECTED &&
273       state != GST_WEBRTC_ICE_CONNECTION_STATE_COMPLETED) {
274     GST_LOG_OBJECT (send, "Can't start DTLS yet because ICE is not connected.");
275     return;
276   }
277 
278   gst_element_set_locked_state (send->dtlssrtpenc, FALSE);
279   gst_element_sync_state_with_parent (send->dtlssrtpenc);
280 }
281 
282 static void
_on_notify_dtls_client_status(GstElement * dtlssrtpenc,GParamSpec * pspec,TransportSendBin * send)283 _on_notify_dtls_client_status (GstElement * dtlssrtpenc,
284     GParamSpec * pspec, TransportSendBin * send)
285 {
286   if (dtlssrtpenc != send->dtlssrtpenc) {
287     GST_WARNING_OBJECT (send,
288         "Received dtls-enc client mode for unknown element %" GST_PTR_FORMAT,
289         dtlssrtpenc);
290     return;
291   }
292 
293   TSB_LOCK (send);
294   if (!send->active) {
295     GST_DEBUG_OBJECT (send,
296         "DTLS-SRTP encoder ready after we're already stopping");
297     goto done;
298   }
299 
300   send->has_clientness = TRUE;
301   GST_DEBUG_OBJECT (send,
302       "DTLS-SRTP encoder configured. Unlocking it and maybe changing state %"
303       GST_PTR_FORMAT, dtlssrtpenc);
304   maybe_start_enc (send);
305 
306 done:
307   TSB_UNLOCK (send);
308 }
309 
310 static void
_on_notify_ice_connection_state(GstWebRTCICETransport * transport,GParamSpec * pspec,TransportSendBin * send)311 _on_notify_ice_connection_state (GstWebRTCICETransport * transport,
312     GParamSpec * pspec, TransportSendBin * send)
313 {
314   TSB_LOCK (send);
315   maybe_start_enc (send);
316   TSB_UNLOCK (send);
317 }
318 
319 static void
transport_send_bin_constructed(GObject * object)320 transport_send_bin_constructed (GObject * object)
321 {
322   TransportSendBin *send = TRANSPORT_SEND_BIN (object);
323   GstPadTemplate *templ;
324   GstPad *ghost, *pad;
325 
326   g_return_if_fail (send->stream);
327 
328   send->dtlssrtpenc = send->stream->transport->dtlssrtpenc;
329   send->nicesink = send->stream->transport->transport->sink;
330 
331   /* unblock the encoder once the key is set */
332   g_signal_connect (send->dtlssrtpenc, "on-key-set",
333       G_CALLBACK (_on_dtls_enc_key_set), send);
334   /* Bring the encoder up to current state only once the is-client prop is set */
335   g_signal_connect (send->dtlssrtpenc, "notify::is-client",
336       G_CALLBACK (_on_notify_dtls_client_status), send);
337   /* unblock ice sink once it signals a connection */
338   g_signal_connect (send->stream->transport->transport, "notify::state",
339       G_CALLBACK (_on_notify_ice_connection_state), send);
340 
341   gst_bin_add (GST_BIN (send), GST_ELEMENT (send->dtlssrtpenc));
342   gst_bin_add (GST_BIN (send), GST_ELEMENT (send->nicesink));
343 
344   if (!gst_element_link_pads (GST_ELEMENT (send->dtlssrtpenc), "src",
345           send->nicesink, "sink"))
346     g_warn_if_reached ();
347 
348   templ = _find_pad_template (send->dtlssrtpenc, GST_PAD_SINK, GST_PAD_REQUEST,
349       "rtp_sink_%d");
350   pad = gst_element_request_pad (send->dtlssrtpenc, templ, "rtp_sink_0", NULL);
351 
352   ghost = gst_ghost_pad_new ("rtp_sink", pad);
353   gst_element_add_pad (GST_ELEMENT (send), ghost);
354   gst_object_unref (pad);
355 
356   /* push the data stream onto the RTP dtls element */
357   templ = _find_pad_template (send->dtlssrtpenc, GST_PAD_SINK, GST_PAD_REQUEST,
358       "data_sink");
359   pad = gst_element_request_pad (send->dtlssrtpenc, templ, "data_sink", NULL);
360 
361   ghost = gst_ghost_pad_new ("data_sink", pad);
362   gst_element_add_pad (GST_ELEMENT (send), ghost);
363   gst_object_unref (pad);
364 
365   /* RTCP */
366   /* Do the common init for the context struct */
367   templ = _find_pad_template (send->dtlssrtpenc, GST_PAD_SINK, GST_PAD_REQUEST,
368       "rtcp_sink_%d");
369   pad = gst_element_request_pad (send->dtlssrtpenc, templ, "rtcp_sink_0", NULL);
370 
371   ghost = gst_ghost_pad_new ("rtcp_sink", pad);
372   gst_element_add_pad (GST_ELEMENT (send), ghost);
373   gst_object_unref (pad);
374 
375   G_OBJECT_CLASS (parent_class)->constructed (object);
376 }
377 
378 static void
cleanup_blocks(TransportSendBin * send)379 cleanup_blocks (TransportSendBin * send)
380 {
381   if (send->rtp_block) {
382     _free_pad_block (send->rtp_block);
383     send->rtp_block = NULL;
384   }
385 
386   if (send->rtcp_block) {
387     _free_pad_block (send->rtcp_block);
388     send->rtcp_block = NULL;
389   }
390 }
391 
392 static void
transport_send_bin_dispose(GObject * object)393 transport_send_bin_dispose (GObject * object)
394 {
395   TransportSendBin *send = TRANSPORT_SEND_BIN (object);
396 
397   TSB_LOCK (send);
398   if (send->nicesink) {
399     g_signal_handlers_disconnect_by_data (send->nicesink, send);
400     send->nicesink = NULL;
401   }
402 
403   cleanup_blocks (send);
404 
405   TSB_UNLOCK (send);
406 
407   G_OBJECT_CLASS (parent_class)->dispose (object);
408 }
409 
410 static void
transport_send_bin_finalize(GObject * object)411 transport_send_bin_finalize (GObject * object)
412 {
413   TransportSendBin *send = TRANSPORT_SEND_BIN (object);
414 
415   g_mutex_clear (TSB_GET_LOCK (send));
416   G_OBJECT_CLASS (parent_class)->finalize (object);
417 }
418 
419 static gboolean
gst_transport_send_bin_element_query(GstElement * element,GstQuery * query)420 gst_transport_send_bin_element_query (GstElement * element, GstQuery * query)
421 {
422   gboolean ret = TRUE;
423   GstClockTime min_latency;
424 
425   GST_LOG_OBJECT (element, "got query %s", GST_QUERY_TYPE_NAME (query));
426 
427   switch (GST_QUERY_TYPE (query)) {
428     case GST_QUERY_LATENCY:
429       /* when latency is queried, use the result to configure our
430        * own latency internally, piggybacking off the global
431        * latency configuration sequence. */
432       GST_DEBUG_OBJECT (element, "handling latency query");
433 
434       /* Call the parent query handler to actually get the query
435        * sent upstream */
436       ret =
437           GST_ELEMENT_CLASS (transport_send_bin_parent_class)->query
438           (GST_ELEMENT (element), query);
439       if (!ret)
440         break;
441 
442       gst_query_parse_latency (query, NULL, &min_latency, NULL);
443 
444       GST_DEBUG_OBJECT (element,
445           "got min latency %" GST_TIME_FORMAT, GST_TIME_ARGS (min_latency));
446 
447       /* configure latency on elements */
448       /* Call the parent event handler, because our sub-class handler
449        * will drop the LATENCY event. We also don't need to that
450        * the latency configuration is valid (min < max), because
451        * the pipeline will do it when checking the query results */
452       if (GST_ELEMENT_CLASS (transport_send_bin_parent_class)->send_event
453           (GST_ELEMENT (element), gst_event_new_latency (min_latency))) {
454         GST_INFO_OBJECT (element, "configured latency of %" GST_TIME_FORMAT,
455             GST_TIME_ARGS (min_latency));
456       } else {
457         GST_WARNING_OBJECT (element,
458             "did not really configure latency of %" GST_TIME_FORMAT,
459             GST_TIME_ARGS (min_latency));
460       }
461 
462       break;
463     default:
464       ret =
465           GST_ELEMENT_CLASS (transport_send_bin_parent_class)->query
466           (GST_ELEMENT (element), query);
467       break;
468   }
469 
470   return ret;
471 }
472 
473 static gboolean
gst_transport_send_bin_element_event(GstElement * element,GstEvent * event)474 gst_transport_send_bin_element_event (GstElement * element, GstEvent * event)
475 {
476   gboolean ret = TRUE;
477 
478   GST_LOG_OBJECT (element, "got event %s", GST_EVENT_TYPE_NAME (event));
479 
480   switch (GST_EVENT_TYPE (event)) {
481     case GST_EVENT_LATENCY:
482       /* Ignore the pipeline configured latency, we choose our own
483        * instead when the latency query happens, so that sending
484        * isn't affected by other parts of the pipeline */
485       GST_DEBUG_OBJECT (element, "Ignoring latency event from parent");
486       gst_event_unref (event);
487       break;
488     default:
489       ret =
490           GST_ELEMENT_CLASS (transport_send_bin_parent_class)->send_event
491           (GST_ELEMENT (element), event);
492       break;
493   }
494 
495   return ret;
496 }
497 
498 static void
transport_send_bin_class_init(TransportSendBinClass * klass)499 transport_send_bin_class_init (TransportSendBinClass * klass)
500 {
501   GObjectClass *gobject_class = (GObjectClass *) klass;
502   GstElementClass *element_class = (GstElementClass *) klass;
503 
504   element_class->change_state = transport_send_bin_change_state;
505 
506   gst_element_class_add_static_pad_template (element_class, &rtp_sink_template);
507   gst_element_class_add_static_pad_template (element_class,
508       &rtcp_sink_template);
509   gst_element_class_add_static_pad_template (element_class,
510       &data_sink_template);
511 
512   gst_element_class_set_metadata (element_class, "WebRTC Transport Send Bin",
513       "Filter/Network/WebRTC", "A bin for webrtc connections",
514       "Matthew Waters <matthew@centricular.com>");
515 
516   gobject_class->constructed = transport_send_bin_constructed;
517   gobject_class->dispose = transport_send_bin_dispose;
518   gobject_class->get_property = transport_send_bin_get_property;
519   gobject_class->set_property = transport_send_bin_set_property;
520   gobject_class->finalize = transport_send_bin_finalize;
521 
522   element_class->send_event = gst_transport_send_bin_element_event;
523   element_class->query = gst_transport_send_bin_element_query;
524 
525   g_object_class_install_property (gobject_class,
526       PROP_STREAM,
527       g_param_spec_object ("stream", "Stream",
528           "The TransportStream for this sending bin",
529           transport_stream_get_type (),
530           G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));
531 }
532 
533 static void
transport_send_bin_init(TransportSendBin * send)534 transport_send_bin_init (TransportSendBin * send)
535 {
536   g_mutex_init (TSB_GET_LOCK (send));
537 }
538