• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2014, Ericsson AB. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without modification,
5  * are permitted provided that the following conditions are met:
6  *
7  * 1. Redistributions of source code must retain the above copyright notice, this
8  * list of conditions and the following disclaimer.
9  *
10  * 2. Redistributions in binary form must reproduce the above copyright notice, this
11  * list of conditions and the following disclaimer in the documentation and/or other
12  * materials provided with the distribution.
13  *
14  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
15  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
16  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
17  * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
18  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
19  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
20  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
21  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
22  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
23  * OF SUCH DAMAGE.
24  */
25 
26 #ifdef HAVE_CONFIG_H
27 #include "config.h"
28 #endif
29 
30 #include "gstdtlselements.h"
31 #include "gstdtlsdec.h"
32 
33 #include "gstdtlscertificate.h"
34 
35 static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
36     GST_PAD_SINK,
37     GST_PAD_ALWAYS,
38     GST_STATIC_CAPS ("application/x-dtls")
39     );
40 
41 static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src",
42     GST_PAD_SRC,
43     GST_PAD_REQUEST,
44     GST_STATIC_CAPS_ANY);
45 
46 GST_DEBUG_CATEGORY_STATIC (gst_dtls_dec_debug);
47 #define GST_CAT_DEFAULT gst_dtls_dec_debug
48 
49 #define gst_dtls_dec_parent_class parent_class
50 G_DEFINE_TYPE_WITH_CODE (GstDtlsDec, gst_dtls_dec, GST_TYPE_ELEMENT,
51     GST_DEBUG_CATEGORY_INIT (gst_dtls_dec_debug, "dtlsdec", 0, "DTLS Decoder"));
52 GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (dtlsdec, "dtlsdec", GST_RANK_NONE,
53     GST_TYPE_DTLS_DEC, dtls_element_init (plugin));
54 
55 enum
56 {
57   SIGNAL_ON_KEY_RECEIVED,
58   NUM_SIGNALS
59 };
60 
61 static guint signals[NUM_SIGNALS];
62 
63 enum
64 {
65   PROP_0,
66   PROP_CONNECTION_ID,
67   PROP_PEM,
68   PROP_PEER_PEM,
69   PROP_DECODER_KEY,
70   PROP_SRTP_CIPHER,
71   PROP_SRTP_AUTH,
72   PROP_CONNECTION_STATE,
73   NUM_PROPERTIES
74 };
75 
76 static GParamSpec *properties[NUM_PROPERTIES];
77 
78 #define DEFAULT_CONNECTION_ID NULL
79 #define DEFAULT_PEM NULL
80 #define DEFAULT_PEER_PEM NULL
81 
82 #define DEFAULT_DECODER_KEY NULL
83 #define DEFAULT_SRTP_CIPHER 0
84 #define DEFAULT_SRTP_AUTH 0
85 
86 
87 static void gst_dtls_dec_finalize (GObject *);
88 static void gst_dtls_dec_dispose (GObject *);
89 static void gst_dtls_dec_set_property (GObject *, guint prop_id,
90     const GValue *, GParamSpec *);
91 static void gst_dtls_dec_get_property (GObject *, guint prop_id, GValue *,
92     GParamSpec *);
93 
94 static GstStateChangeReturn gst_dtls_dec_change_state (GstElement *,
95     GstStateChange);
96 static GstPad *gst_dtls_dec_request_new_pad (GstElement *, GstPadTemplate *,
97     const gchar * name, const GstCaps *);
98 static void gst_dtls_dec_release_pad (GstElement *, GstPad *);
99 
100 static void on_key_received (GstDtlsConnection *, gpointer key, guint cipher,
101     guint auth, GstDtlsDec *);
102 static gboolean on_peer_certificate_received (GstDtlsConnection *, gchar * pem,
103     GstDtlsDec *);
104 static GstFlowReturn sink_chain (GstPad *, GstObject * parent, GstBuffer *);
105 static GstFlowReturn sink_chain_list (GstPad *, GstObject * parent,
106     GstBufferList *);
107 
108 static GstDtlsAgent *get_agent_by_pem (const gchar * pem);
109 static void agent_weak_ref_notify (gchar * pem, GstDtlsAgent *);
110 static void create_connection (GstDtlsDec *, gchar * id);
111 static void connection_weak_ref_notify (gchar * id, GstDtlsConnection *);
112 
113 static void
gst_dtls_dec_class_init(GstDtlsDecClass * klass)114 gst_dtls_dec_class_init (GstDtlsDecClass * klass)
115 {
116   GObjectClass *gobject_class;
117   GstElementClass *element_class;
118 
119   gobject_class = (GObjectClass *) klass;
120   element_class = (GstElementClass *) klass;
121 
122   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_dtls_dec_finalize);
123   gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_dtls_dec_dispose);
124   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_dtls_dec_set_property);
125   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_dtls_dec_get_property);
126 
127   element_class->change_state = GST_DEBUG_FUNCPTR (gst_dtls_dec_change_state);
128   element_class->request_new_pad =
129       GST_DEBUG_FUNCPTR (gst_dtls_dec_request_new_pad);
130   element_class->release_pad = GST_DEBUG_FUNCPTR (gst_dtls_dec_release_pad);
131 
132   signals[SIGNAL_ON_KEY_RECEIVED] =
133       g_signal_new ("on-key-received", G_TYPE_FROM_CLASS (klass),
134       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 0);
135 
136   properties[PROP_CONNECTION_ID] =
137       g_param_spec_string ("connection-id",
138       "Connection id",
139       "Every encoder/decoder pair should have the same, unique, connection-id",
140       DEFAULT_CONNECTION_ID, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
141 
142   properties[PROP_PEM] =
143       g_param_spec_string ("pem",
144       "PEM string",
145       "A string containing a X509 certificate and RSA private key in PEM format",
146       DEFAULT_PEM,
147       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | GST_PARAM_DOC_SHOW_DEFAULT);
148 
149   properties[PROP_PEER_PEM] =
150       g_param_spec_string ("peer-pem",
151       "Peer PEM string",
152       "The X509 certificate received in the DTLS handshake, in PEM format",
153       DEFAULT_PEER_PEM, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
154 
155   properties[PROP_DECODER_KEY] =
156       g_param_spec_boxed ("decoder-key",
157       "Decoder key",
158       "SRTP key that should be used by the decoder",
159       GST_TYPE_CAPS, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
160 
161   properties[PROP_SRTP_CIPHER] =
162       g_param_spec_uint ("srtp-cipher",
163       "SRTP cipher",
164       "The SRTP cipher selected in the DTLS handshake. "
165       "The value will be set to an GstDtlsSrtpCipher.",
166       0, GST_DTLS_SRTP_CIPHER_AES_128_ICM, DEFAULT_SRTP_CIPHER,
167       G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
168 
169   properties[PROP_SRTP_AUTH] =
170       g_param_spec_uint ("srtp-auth",
171       "SRTP authentication",
172       "The SRTP authentication selected in the DTLS handshake. "
173       "The value will be set to an GstDtlsSrtpAuth.",
174       0, GST_DTLS_SRTP_AUTH_HMAC_SHA1_80, DEFAULT_SRTP_AUTH,
175       G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
176 
177   properties[PROP_CONNECTION_STATE] =
178       g_param_spec_enum ("connection-state",
179       "Connection State",
180       "Current connection state",
181       GST_DTLS_TYPE_CONNECTION_STATE,
182       GST_DTLS_CONNECTION_STATE_NEW, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
183 
184   g_object_class_install_properties (gobject_class, NUM_PROPERTIES, properties);
185 
186   gst_element_class_add_static_pad_template (element_class, &src_template);
187   gst_element_class_add_static_pad_template (element_class, &sink_template);
188 
189   gst_element_class_set_static_metadata (element_class,
190       "DTLS Decoder",
191       "Decoder/Network/DTLS",
192       "Decodes DTLS packets", "Patrik Oldsberg patrik.oldsberg@ericsson.com");
193 }
194 
195 static void
gst_dtls_dec_init(GstDtlsDec * self)196 gst_dtls_dec_init (GstDtlsDec * self)
197 {
198   self->agent = get_agent_by_pem (NULL);
199   self->connection_id = NULL;
200   self->connection = NULL;
201   self->peer_pem = NULL;
202 
203   self->decoder_key = NULL;
204   self->srtp_cipher = DEFAULT_SRTP_CIPHER;
205   self->srtp_auth = DEFAULT_SRTP_AUTH;
206 
207   g_mutex_init (&self->src_mutex);
208 
209   self->src = NULL;
210   self->sink = gst_pad_new_from_static_template (&sink_template, "sink");
211   g_return_if_fail (self->sink);
212 
213   gst_pad_set_chain_function (self->sink, GST_DEBUG_FUNCPTR (sink_chain));
214   gst_pad_set_chain_list_function (self->sink,
215       GST_DEBUG_FUNCPTR (sink_chain_list));
216 
217   gst_element_add_pad (GST_ELEMENT (self), self->sink);
218 }
219 
220 static void
gst_dtls_dec_finalize(GObject * object)221 gst_dtls_dec_finalize (GObject * object)
222 {
223   GstDtlsDec *self = GST_DTLS_DEC (object);
224 
225   if (self->decoder_key) {
226     gst_buffer_unref (self->decoder_key);
227     self->decoder_key = NULL;
228   }
229 
230   g_free (self->connection_id);
231   self->connection_id = NULL;
232 
233   g_free (self->peer_pem);
234   self->peer_pem = NULL;
235 
236   g_mutex_clear (&self->src_mutex);
237 
238   GST_LOG_OBJECT (self, "finalized");
239 
240   G_OBJECT_CLASS (parent_class)->finalize (object);
241 }
242 
243 static void
gst_dtls_dec_dispose(GObject * object)244 gst_dtls_dec_dispose (GObject * object)
245 {
246   GstDtlsDec *self = GST_DTLS_DEC (object);
247 
248   if (self->agent) {
249     g_object_unref (self->agent);
250     self->agent = NULL;
251   }
252 
253   if (self->connection) {
254     g_object_unref (self->connection);
255     self->connection = NULL;
256   }
257 
258   G_OBJECT_CLASS (parent_class)->dispose (object);
259 }
260 
261 static void
gst_dtls_dec_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)262 gst_dtls_dec_set_property (GObject * object, guint prop_id,
263     const GValue * value, GParamSpec * pspec)
264 {
265   GstDtlsDec *self = GST_DTLS_DEC (object);
266 
267   switch (prop_id) {
268     case PROP_CONNECTION_ID:
269       g_free (self->connection_id);
270       self->connection_id = g_value_dup_string (value);
271       g_return_if_fail (self->agent);
272       create_connection (self, self->connection_id);
273       break;
274     case PROP_PEM:
275       if (self->agent) {
276         g_object_unref (self->agent);
277       }
278       self->agent = get_agent_by_pem (g_value_get_string (value));
279       if (self->connection_id) {
280         create_connection (self, self->connection_id);
281       }
282       break;
283     default:
284       G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
285   }
286 }
287 
288 static void
gst_dtls_dec_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)289 gst_dtls_dec_get_property (GObject * object, guint prop_id, GValue * value,
290     GParamSpec * pspec)
291 {
292   GstDtlsDec *self = GST_DTLS_DEC (object);
293 
294   switch (prop_id) {
295     case PROP_CONNECTION_ID:
296       g_value_set_string (value, self->connection_id);
297       break;
298     case PROP_PEM:
299       g_value_take_string (value,
300           gst_dtls_agent_get_certificate_pem (self->agent));
301       break;
302     case PROP_PEER_PEM:
303       g_value_set_string (value, self->peer_pem);
304       break;
305     case PROP_DECODER_KEY:
306       g_value_set_boxed (value, self->decoder_key);
307       break;
308     case PROP_SRTP_CIPHER:
309       g_value_set_uint (value, self->srtp_cipher);
310       break;
311     case PROP_SRTP_AUTH:
312       g_value_set_uint (value, self->srtp_auth);
313       break;
314     case PROP_CONNECTION_STATE:
315       if (self->connection)
316         g_object_get_property (G_OBJECT (self->connection), "connection-state",
317             value);
318       else
319         g_value_set_enum (value, GST_DTLS_CONNECTION_STATE_CLOSED);
320       break;
321     default:
322       G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
323   }
324 }
325 
326 static GstStateChangeReturn
gst_dtls_dec_change_state(GstElement * element,GstStateChange transition)327 gst_dtls_dec_change_state (GstElement * element, GstStateChange transition)
328 {
329   GstDtlsDec *self = GST_DTLS_DEC (element);
330   GstStateChangeReturn ret;
331 
332   switch (transition) {
333     case GST_STATE_CHANGE_NULL_TO_READY:
334       if (self->connection) {
335         g_signal_connect_object (self->connection,
336             "on-decoder-key", G_CALLBACK (on_key_received), self, 0);
337         g_signal_connect_object (self->connection,
338             "on-peer-certificate", G_CALLBACK (on_peer_certificate_received),
339             self, 0);
340       } else {
341         GST_WARNING_OBJECT (self,
342             "trying to change state to ready without connection id and pem");
343         return GST_STATE_CHANGE_FAILURE;
344       }
345       break;
346     default:
347       break;
348   }
349 
350   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
351 
352   return ret;
353 }
354 
355 static gboolean
forward_sticky_events(GstPad * pad,GstEvent ** event,gpointer user_data)356 forward_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data)
357 {
358   GstPad *srcpad = GST_PAD_CAST (user_data);
359   GstFlowReturn ret;
360 
361   ret = gst_pad_store_sticky_event (srcpad, *event);
362   if (ret != GST_FLOW_OK) {
363     GST_DEBUG_OBJECT (srcpad, "storing sticky event %p (%s) failed: %s", *event,
364         GST_EVENT_TYPE_NAME (*event), gst_flow_get_name (ret));
365   }
366 
367   return TRUE;
368 }
369 
370 static GstPad *
gst_dtls_dec_request_new_pad(GstElement * element,GstPadTemplate * tmpl,const gchar * name,const GstCaps * caps)371 gst_dtls_dec_request_new_pad (GstElement * element,
372     GstPadTemplate * tmpl, const gchar * name, const GstCaps * caps)
373 {
374   GstDtlsDec *self = GST_DTLS_DEC (element);
375   GstPad *pad;
376 
377   GST_DEBUG_OBJECT (element, "requesting pad");
378 
379   g_return_val_if_fail (!self->src, NULL);
380   g_return_val_if_fail (tmpl->direction == GST_PAD_SRC, NULL);
381 
382   g_mutex_lock (&self->src_mutex);
383   if (self->src) {
384     GST_ERROR_OBJECT (self, "Pad %s:%s exists already",
385         GST_DEBUG_PAD_NAME (self->src));
386     g_mutex_unlock (&self->src_mutex);
387     return NULL;
388   }
389 
390   self->src = pad = gst_pad_new_from_template (tmpl, name);
391 
392   g_mutex_unlock (&self->src_mutex);
393 
394   gst_pad_set_active (pad, TRUE);
395 
396   if (caps)
397     gst_pad_set_caps (pad, (GstCaps *) caps);
398 
399   /* Forward sticky events to the new srcpad */
400   gst_pad_sticky_events_foreach (self->sink, forward_sticky_events, self->src);
401 
402   gst_element_add_pad (element, pad);
403 
404   return pad;
405 }
406 
407 static void
gst_dtls_dec_release_pad(GstElement * element,GstPad * pad)408 gst_dtls_dec_release_pad (GstElement * element, GstPad * pad)
409 {
410   GstDtlsDec *self = GST_DTLS_DEC (element);
411 
412   g_return_if_fail (self->src == pad);
413 
414   g_mutex_lock (&self->src_mutex);
415 
416   self->src = NULL;
417   g_mutex_unlock (&self->src_mutex);
418 
419   GST_DEBUG_OBJECT (self, "releasing src pad");
420 
421   gst_element_remove_pad (element, pad);
422 }
423 
424 static void
on_key_received(GstDtlsConnection * connection,gpointer key,guint cipher,guint auth,GstDtlsDec * self)425 on_key_received (GstDtlsConnection * connection, gpointer key, guint cipher,
426     guint auth, GstDtlsDec * self)
427 {
428   GstBuffer *new_decoder_key;
429   gchar *key_str;
430 
431   g_return_if_fail (GST_IS_DTLS_DEC (self));
432 
433   self->srtp_cipher = cipher;
434   self->srtp_auth = auth;
435 
436   new_decoder_key =
437       gst_buffer_new_memdup (key, GST_DTLS_SRTP_MASTER_KEY_LENGTH);
438 
439   if (self->decoder_key)
440     gst_buffer_unref (self->decoder_key);
441 
442   self->decoder_key = new_decoder_key;
443 
444   key_str = g_base64_encode (key, GST_DTLS_SRTP_MASTER_KEY_LENGTH);
445   GST_INFO_OBJECT (self, "received key: %s", key_str);
446   g_free (key_str);
447 
448   g_signal_emit (self, signals[SIGNAL_ON_KEY_RECEIVED], 0);
449 }
450 
451 static gboolean
on_peer_certificate_received(GstDtlsConnection * connection,gchar * pem,GstDtlsDec * self)452 on_peer_certificate_received (GstDtlsConnection * connection, gchar * pem,
453     GstDtlsDec * self)
454 {
455   g_return_val_if_fail (GST_IS_DTLS_DEC (self), TRUE);
456 
457   GST_DEBUG_OBJECT (self, "Received peer certificate PEM: \n%s", pem);
458 
459   if (self->peer_pem != NULL) {
460     g_free (self->peer_pem);
461     self->peer_pem = NULL;
462   }
463   self->peer_pem = g_strdup (pem);
464 
465   g_object_notify_by_pspec (G_OBJECT (self), properties[PROP_PEER_PEM]);
466 
467   return TRUE;
468 }
469 
470 static GstFlowReturn
process_buffer(GstDtlsDec * self,GstBuffer * buffer)471 process_buffer (GstDtlsDec * self, GstBuffer * buffer)
472 {
473   GstFlowReturn flow_ret;
474   GstMapInfo map_info;
475   GError *err = NULL;
476   gsize written = 0;
477 
478   if (!gst_buffer_map (buffer, &map_info, GST_MAP_READWRITE))
479     return GST_FLOW_ERROR;
480 
481   if (!map_info.size) {
482     gst_buffer_unmap (buffer, &map_info);
483     return GST_FLOW_ERROR;
484   }
485 
486   flow_ret =
487       gst_dtls_connection_process (self->connection, map_info.data,
488       map_info.size, &written, &err);
489   gst_buffer_unmap (buffer, &map_info);
490 
491   switch (flow_ret) {
492     case GST_FLOW_OK:
493       GST_LOG_OBJECT (self,
494           "Decoded buffer of size %" G_GSIZE_FORMAT " B to %" G_GSIZE_FORMAT,
495           map_info.size, written);
496       gst_buffer_set_size (buffer, written);
497       break;
498     case GST_FLOW_EOS:
499       gst_buffer_set_size (buffer, written);
500       GST_DEBUG_OBJECT (self, "Peer closed the connection");
501       break;
502     case GST_FLOW_ERROR:
503       GST_ERROR_OBJECT (self, "Error processing buffer: %s", err->message);
504       GST_ELEMENT_ERROR (self, RESOURCE, READ, (NULL), ("%s", err->message));
505       g_clear_error (&err);
506       break;
507     default:
508       g_assert_not_reached ();
509   }
510   g_assert (err == NULL);
511 
512   return flow_ret;
513 }
514 
515 typedef struct
516 {
517   GstDtlsDec *self;
518   GstFlowReturn flow_ret;
519   guint processed;
520 } ProcessListData;
521 
522 static gboolean
process_buffer_from_list(GstBuffer ** buffer,guint idx,gpointer user_data)523 process_buffer_from_list (GstBuffer ** buffer, guint idx, gpointer user_data)
524 {
525   ProcessListData *process_list_data = user_data;
526   GstDtlsDec *self = GST_DTLS_DEC (process_list_data->self);
527   GstFlowReturn flow_ret;
528 
529   *buffer = gst_buffer_make_writable (*buffer);
530   flow_ret = process_buffer (self, *buffer);
531 
532   process_list_data->flow_ret = flow_ret;
533   if (gst_buffer_get_size (*buffer) == 0)
534     gst_buffer_replace (buffer, NULL);
535   else if (flow_ret != GST_FLOW_ERROR)
536     process_list_data->processed++;
537 
538   return flow_ret == GST_FLOW_OK;
539 }
540 
541 static GstFlowReturn
sink_chain_list(GstPad * pad,GstObject * parent,GstBufferList * list)542 sink_chain_list (GstPad * pad, GstObject * parent, GstBufferList * list)
543 {
544   GstDtlsDec *self = GST_DTLS_DEC (parent);
545   GstPad *other_pad;
546   ProcessListData process_list_data = { self, GST_FLOW_OK, 0 };
547 
548   list = gst_buffer_list_make_writable (list);
549   gst_buffer_list_foreach (list, process_buffer_from_list, &process_list_data);
550 
551   /* If we successfully processed at least some buffers then forward those */
552   if (process_list_data.flow_ret != GST_FLOW_OK
553       && process_list_data.processed == 0) {
554     GST_ERROR_OBJECT (self, "Failed to process buffer list: %s",
555         gst_flow_get_name (process_list_data.flow_ret));
556     gst_buffer_list_unref (list);
557     return process_list_data.flow_ret;
558   }
559 
560   /* Remove all buffers after the first one that failed to be processed */
561   gst_buffer_list_remove (list, process_list_data.processed,
562       gst_buffer_list_length (list) - process_list_data.processed);
563 
564   if (gst_buffer_list_length (list) == 0) {
565     GST_DEBUG_OBJECT (self, "Not produced any buffers");
566     gst_buffer_list_unref (list);
567 
568     return process_list_data.flow_ret;
569   }
570 
571   g_mutex_lock (&self->src_mutex);
572   other_pad = self->src;
573   if (other_pad)
574     gst_object_ref (other_pad);
575   g_mutex_unlock (&self->src_mutex);
576 
577   if (other_pad) {
578     gboolean was_eos = process_list_data.flow_ret == GST_FLOW_EOS;
579 
580     GST_LOG_OBJECT (self, "pushing buffer list with length %u",
581         gst_buffer_list_length (list));
582     process_list_data.flow_ret = gst_pad_push_list (other_pad, list);
583 
584     /* If the peer closed the connection, signal that we're done here now */
585     if (was_eos)
586       gst_pad_push_event (other_pad, gst_event_new_eos ());
587 
588     gst_object_unref (other_pad);
589   } else {
590     GST_LOG_OBJECT (self,
591         "dropping buffer list with length %d, have no source pad",
592         gst_buffer_list_length (list));
593     gst_buffer_list_unref (list);
594   }
595 
596   return process_list_data.flow_ret;
597 }
598 
599 static GstFlowReturn
sink_chain(GstPad * pad,GstObject * parent,GstBuffer * buffer)600 sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
601 {
602   GstDtlsDec *self = GST_DTLS_DEC (parent);
603   GstFlowReturn ret = GST_FLOW_OK;
604   GstPad *other_pad;
605 
606   if (!self->agent) {
607     gst_buffer_unref (buffer);
608     return GST_FLOW_OK;
609   }
610 
611   GST_DEBUG_OBJECT (self,
612       "received buffer from %s with length %" G_GSIZE_FORMAT,
613       self->connection_id, gst_buffer_get_size (buffer));
614 
615   buffer = gst_buffer_make_writable (buffer);
616   ret = process_buffer (self, buffer);
617   if (ret == GST_FLOW_ERROR) {
618     GST_ERROR_OBJECT (self, "Failed to process buffer: %s",
619         gst_flow_get_name (ret));
620     gst_buffer_unref (buffer);
621     return ret;
622   }
623 
624   g_mutex_lock (&self->src_mutex);
625   other_pad = self->src;
626   if (other_pad)
627     gst_object_ref (other_pad);
628   g_mutex_unlock (&self->src_mutex);
629 
630   if (other_pad) {
631     gboolean was_eos = (ret == GST_FLOW_EOS);
632 
633     if (gst_buffer_get_size (buffer) > 0) {
634       GST_LOG_OBJECT (self, "pushing buffer");
635       ret = gst_pad_push (other_pad, buffer);
636     } else {
637       gst_buffer_unref (buffer);
638     }
639 
640     /* If the peer closed the connection, signal that we're done here now */
641     if (was_eos) {
642       gst_pad_push_event (other_pad, gst_event_new_eos ());
643       if (ret == GST_FLOW_OK)
644         ret = GST_FLOW_EOS;
645     }
646 
647     gst_object_unref (other_pad);
648   } else {
649     GST_LOG_OBJECT (self, "dropping buffer, have no source pad");
650     gst_buffer_unref (buffer);
651   }
652 
653   return ret;
654 }
655 
656 static GHashTable *agent_table = NULL;
657 G_LOCK_DEFINE_STATIC (agent_table);
658 
659 static GstDtlsAgent *generated_cert_agent = NULL;
660 
661 static GstDtlsAgent *
get_agent_by_pem(const gchar * pem)662 get_agent_by_pem (const gchar * pem)
663 {
664   GstDtlsAgent *agent;
665 
666   if (!pem) {
667     if (g_once_init_enter (&generated_cert_agent)) {
668       GstDtlsAgent *new_agent;
669       GObject *certificate;
670 
671       certificate = g_object_new (GST_TYPE_DTLS_CERTIFICATE, NULL);
672       new_agent = g_object_new (GST_TYPE_DTLS_AGENT, "certificate",
673           certificate, NULL);
674       g_object_unref (certificate);
675 
676       GST_DEBUG_OBJECT (generated_cert_agent,
677           "no agent with generated cert found, creating new");
678       g_once_init_leave (&generated_cert_agent, new_agent);
679     } else {
680       GST_DEBUG_OBJECT (generated_cert_agent,
681           "using agent with generated cert");
682     }
683 
684     agent = generated_cert_agent;
685     g_object_ref (agent);
686   } else {
687     G_LOCK (agent_table);
688 
689     if (!agent_table) {
690       agent_table =
691           g_hash_table_new_full (g_str_hash, g_str_equal, g_free, NULL);
692     }
693 
694     agent = GST_DTLS_AGENT (g_hash_table_lookup (agent_table, pem));
695 
696     if (!agent) {
697       GObject *certificate;
698 
699       certificate = g_object_new (GST_TYPE_DTLS_CERTIFICATE, "pem", pem, NULL);
700       agent = g_object_new (GST_TYPE_DTLS_AGENT, "certificate", certificate,
701           NULL);
702       g_object_unref (certificate);
703 
704       g_object_weak_ref (G_OBJECT (agent), (GWeakNotify) agent_weak_ref_notify,
705           (gpointer) g_strdup (pem));
706 
707       g_hash_table_insert (agent_table, g_strdup (pem), agent);
708 
709       GST_DEBUG_OBJECT (agent, "no agent found, created new");
710     } else {
711       g_object_ref (agent);
712       GST_DEBUG_OBJECT (agent, "agent found");
713     }
714 
715     G_UNLOCK (agent_table);
716   }
717 
718 
719   return agent;
720 }
721 
722 static void
agent_weak_ref_notify(gchar * pem,GstDtlsAgent * agent)723 agent_weak_ref_notify (gchar * pem, GstDtlsAgent * agent)
724 {
725   G_LOCK (agent_table);
726   g_hash_table_remove (agent_table, pem);
727   G_UNLOCK (agent_table);
728 
729   g_free (pem);
730   pem = NULL;
731 }
732 
733 static GHashTable *connection_table = NULL;
734 G_LOCK_DEFINE_STATIC (connection_table);
735 
736 GstDtlsConnection *
gst_dtls_dec_fetch_connection(gchar * id)737 gst_dtls_dec_fetch_connection (gchar * id)
738 {
739   GstDtlsConnection *connection;
740   g_return_val_if_fail (id, NULL);
741 
742   GST_DEBUG ("fetching '%s' from connection table, size is %d",
743       id, g_hash_table_size (connection_table));
744 
745   G_LOCK (connection_table);
746 
747   connection = g_hash_table_lookup (connection_table, id);
748 
749   if (connection) {
750     g_object_ref (connection);
751     g_hash_table_remove (connection_table, id);
752   } else {
753     GST_WARNING ("no connection with id '%s' found", id);
754   }
755 
756   G_UNLOCK (connection_table);
757 
758   return connection;
759 }
760 
761 static void
on_connection_state_changed(GObject * object,GParamSpec * pspec,gpointer user_data)762 on_connection_state_changed (GObject * object, GParamSpec * pspec,
763     gpointer user_data)
764 {
765   GstDtlsDec *self = GST_DTLS_DEC (user_data);
766 
767   g_object_notify_by_pspec (G_OBJECT (self), properties[PROP_CONNECTION_STATE]);
768 }
769 
770 static void
create_connection(GstDtlsDec * self,gchar * id)771 create_connection (GstDtlsDec * self, gchar * id)
772 {
773   g_return_if_fail (GST_IS_DTLS_DEC (self));
774   g_return_if_fail (GST_IS_DTLS_AGENT (self->agent));
775 
776   if (self->connection) {
777     g_signal_handlers_disconnect_by_func (self->connection,
778         on_connection_state_changed, self);
779     g_object_unref (self->connection);
780     self->connection = NULL;
781   }
782 
783   G_LOCK (connection_table);
784 
785   if (!connection_table) {
786     connection_table =
787         g_hash_table_new_full (g_str_hash, g_str_equal, g_free, NULL);
788   }
789 
790   if (g_hash_table_contains (connection_table, id)) {
791     G_UNLOCK (connection_table);
792 
793     g_return_if_reached ();
794   }
795 
796   self->connection =
797       g_object_new (GST_TYPE_DTLS_CONNECTION, "agent", self->agent, NULL);
798   g_signal_connect_object (self->connection,
799       "notify::connection-state", G_CALLBACK (on_connection_state_changed),
800       self, 0);
801   on_connection_state_changed (NULL, NULL, self);
802 
803   g_object_weak_ref (G_OBJECT (self->connection),
804       (GWeakNotify) connection_weak_ref_notify, g_strdup (id));
805 
806   g_hash_table_insert (connection_table, g_strdup (id), self->connection);
807 
808   G_UNLOCK (connection_table);
809 }
810 
811 static void
connection_weak_ref_notify(gchar * id,GstDtlsConnection * connection)812 connection_weak_ref_notify (gchar * id, GstDtlsConnection * connection)
813 {
814   G_LOCK (connection_table);
815   g_hash_table_remove (connection_table, id);
816   G_UNLOCK (connection_table);
817 
818   g_free (id);
819   id = NULL;
820 }
821