• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2014, Ericsson AB. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without modification,
5  * are permitted provided that the following conditions are met:
6  *
7  * 1. Redistributions of source code must retain the above copyright notice, this
8  * list of conditions and the following disclaimer.
9  *
10  * 2. Redistributions in binary form must reproduce the above copyright notice, this
11  * list of conditions and the following disclaimer in the documentation and/or other
12  * materials provided with the distribution.
13  *
14  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
15  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
16  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
17  * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
18  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
19  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
20  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
21  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
22  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
23  * OF SUCH DAMAGE.
24  */
25 
26 #ifdef HAVE_CONFIG_H
27 #include "config.h"
28 #endif
29 
30 #include "gstdtlselements.h"
31 #include "gstdtlsenc.h"
32 
33 #include "gstdtlsdec.h"
34 
35 static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
36     GST_PAD_SINK,
37     GST_PAD_REQUEST,
38     GST_STATIC_CAPS_ANY);
39 
40 static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src",
41     GST_PAD_SRC,
42     GST_PAD_ALWAYS,
43     GST_STATIC_CAPS ("application/x-dtls")
44     );
45 
46 GST_DEBUG_CATEGORY_STATIC (gst_dtls_enc_debug);
47 #define GST_CAT_DEFAULT gst_dtls_enc_debug
48 
49 #define gst_dtls_enc_parent_class parent_class
50 G_DEFINE_TYPE_WITH_CODE (GstDtlsEnc, gst_dtls_enc, GST_TYPE_ELEMENT,
51     GST_DEBUG_CATEGORY_INIT (gst_dtls_enc_debug, "dtlsenc", 0, "DTLS Encoder"));
52 GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (dtlsenc, "dtlsenc", GST_RANK_NONE,
53     GST_TYPE_DTLS_ENC, dtls_element_init (plugin));
54 
55 enum
56 {
57   SIGNAL_ON_KEY_RECEIVED,
58   NUM_SIGNALS
59 };
60 
61 static guint signals[NUM_SIGNALS];
62 
63 enum
64 {
65   PROP_0,
66   PROP_CONNECTION_ID,
67   PROP_IS_CLIENT,
68   PROP_ENCODER_KEY,
69   PROP_SRTP_CIPHER,
70   PROP_SRTP_AUTH,
71   PROP_CONNECTION_STATE,
72   NUM_PROPERTIES
73 };
74 
75 static GParamSpec *properties[NUM_PROPERTIES];
76 
77 #define DEFAULT_CONNECTION_ID NULL
78 #define DEFAULT_IS_CLIENT FALSE
79 
80 #define DEFAULT_ENCODER_KEY NULL
81 #define DEFAULT_SRTP_CIPHER 0
82 #define DEFAULT_SRTP_AUTH 0
83 
84 #define INITIAL_QUEUE_SIZE 64
85 
86 static void gst_dtls_enc_finalize (GObject *);
87 static void gst_dtls_enc_set_property (GObject *, guint prop_id,
88     const GValue *, GParamSpec *);
89 static void gst_dtls_enc_get_property (GObject *, guint prop_id, GValue *,
90     GParamSpec *);
91 
92 static GstStateChangeReturn gst_dtls_enc_change_state (GstElement *,
93     GstStateChange);
94 static GstPad *gst_dtls_enc_request_new_pad (GstElement *, GstPadTemplate *,
95     const gchar * name, const GstCaps *);
96 
97 static gboolean src_activate_mode (GstPad *, GstObject *, GstPadMode,
98     gboolean active);
99 static void src_task_loop (GstPad *);
100 
101 static GstFlowReturn sink_chain (GstPad *, GstObject *, GstBuffer *);
102 static gboolean sink_event (GstPad * pad, GstObject * parent, GstEvent * event);
103 
104 static void on_key_received (GstDtlsConnection *, gpointer key, guint cipher,
105     guint auth, GstDtlsEnc *);
106 static gboolean on_send_data (GstDtlsConnection *, gconstpointer data,
107     gsize length, GstDtlsEnc *);
108 
109 static void
gst_dtls_enc_class_init(GstDtlsEncClass * klass)110 gst_dtls_enc_class_init (GstDtlsEncClass * klass)
111 {
112   GObjectClass *gobject_class;
113   GstElementClass *element_class;
114 
115   gobject_class = (GObjectClass *) klass;
116   element_class = (GstElementClass *) klass;
117 
118   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_dtls_enc_finalize);
119   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_dtls_enc_set_property);
120   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_dtls_enc_get_property);
121 
122   element_class->change_state = GST_DEBUG_FUNCPTR (gst_dtls_enc_change_state);
123   element_class->request_new_pad =
124       GST_DEBUG_FUNCPTR (gst_dtls_enc_request_new_pad);
125 
126   signals[SIGNAL_ON_KEY_RECEIVED] =
127       g_signal_new ("on-key-received", G_TYPE_FROM_CLASS (klass),
128       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 0);
129 
130   properties[PROP_CONNECTION_ID] =
131       g_param_spec_string ("connection-id",
132       "Connection id",
133       "Every encoder/decoder pair should have the same, unique, connection-id",
134       DEFAULT_CONNECTION_ID, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
135 
136   properties[PROP_IS_CLIENT] =
137       g_param_spec_boolean ("is-client",
138       "Is client",
139       "Set to true if the decoder should act as "
140       "client and initiate the handshake",
141       DEFAULT_IS_CLIENT,
142       GST_PARAM_MUTABLE_READY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
143 
144   properties[PROP_ENCODER_KEY] =
145       g_param_spec_boxed ("encoder-key",
146       "Encoder key",
147       "Master key that should be used by the SRTP encoder",
148       GST_TYPE_BUFFER, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
149 
150   properties[PROP_SRTP_CIPHER] =
151       g_param_spec_uint ("srtp-cipher",
152       "SRTP cipher",
153       "The SRTP cipher selected in the DTLS handshake. "
154       "The value will be set to an GstDtlsSrtpCipher.",
155       0, GST_DTLS_SRTP_CIPHER_AES_128_ICM, DEFAULT_SRTP_CIPHER,
156       G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
157 
158   properties[PROP_SRTP_AUTH] =
159       g_param_spec_uint ("srtp-auth",
160       "SRTP authentication",
161       "The SRTP authentication selected in the DTLS handshake. "
162       "The value will be set to an GstDtlsSrtpAuth.",
163       0, GST_DTLS_SRTP_AUTH_HMAC_SHA1_80, DEFAULT_SRTP_AUTH,
164       G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
165 
166   properties[PROP_CONNECTION_STATE] =
167       g_param_spec_enum ("connection-state",
168       "Connection State",
169       "Current connection state",
170       GST_DTLS_TYPE_CONNECTION_STATE,
171       GST_DTLS_CONNECTION_STATE_NEW, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS);
172 
173   g_object_class_install_properties (gobject_class, NUM_PROPERTIES, properties);
174 
175   gst_element_class_add_static_pad_template (element_class, &src_template);
176   gst_element_class_add_static_pad_template (element_class, &sink_template);
177 
178   gst_element_class_set_static_metadata (element_class,
179       "DTLS Encoder",
180       "Encoder/Network/DTLS",
181       "Encodes packets with DTLS",
182       "Patrik Oldsberg patrik.oldsberg@ericsson.com");
183 }
184 
185 static void
gst_dtls_enc_init(GstDtlsEnc * self)186 gst_dtls_enc_init (GstDtlsEnc * self)
187 {
188   self->connection_id = NULL;
189   self->connection = NULL;
190 
191   self->is_client = DEFAULT_IS_CLIENT;
192 
193   self->encoder_key = NULL;
194   self->srtp_cipher = DEFAULT_SRTP_CIPHER;
195   self->srtp_auth = DEFAULT_SRTP_AUTH;
196 
197   g_queue_init (&self->queue);
198   g_mutex_init (&self->queue_lock);
199   g_cond_init (&self->queue_cond_add);
200 
201   self->src = gst_pad_new_from_static_template (&src_template, "src");
202   g_return_if_fail (self->src);
203 
204   gst_pad_set_activatemode_function (self->src,
205       GST_DEBUG_FUNCPTR (src_activate_mode));
206 
207   gst_element_add_pad (GST_ELEMENT (self), self->src);
208 }
209 
210 static void
gst_dtls_enc_finalize(GObject * object)211 gst_dtls_enc_finalize (GObject * object)
212 {
213   GstDtlsEnc *self = GST_DTLS_ENC (object);
214 
215   if (self->encoder_key) {
216     gst_buffer_unref (self->encoder_key);
217     self->encoder_key = NULL;
218   }
219 
220   if (self->connection_id) {
221     g_free (self->connection_id);
222     self->connection_id = NULL;
223   }
224 
225   g_mutex_lock (&self->queue_lock);
226   g_queue_foreach (&self->queue, (GFunc) gst_buffer_unref, NULL);
227   g_queue_clear (&self->queue);
228   g_mutex_unlock (&self->queue_lock);
229 
230   g_mutex_clear (&self->queue_lock);
231   g_cond_clear (&self->queue_cond_add);
232 
233   GST_LOG_OBJECT (self, "finalized");
234 
235   G_OBJECT_CLASS (parent_class)->finalize (object);
236 }
237 
238 static void
gst_dtls_enc_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)239 gst_dtls_enc_set_property (GObject * object, guint prop_id,
240     const GValue * value, GParamSpec * pspec)
241 {
242   GstDtlsEnc *self = GST_DTLS_ENC (object);
243 
244   switch (prop_id) {
245     case PROP_CONNECTION_ID:
246       if (self->connection_id != NULL) {
247         g_free (self->connection_id);
248         self->connection_id = NULL;
249       }
250       self->connection_id = g_value_dup_string (value);
251       break;
252     case PROP_IS_CLIENT:
253       self->is_client = g_value_get_boolean (value);
254       break;
255     default:
256       G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
257   }
258 }
259 
260 static void
gst_dtls_enc_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)261 gst_dtls_enc_get_property (GObject * object, guint prop_id, GValue * value,
262     GParamSpec * pspec)
263 {
264   GstDtlsEnc *self = GST_DTLS_ENC (object);
265 
266   switch (prop_id) {
267     case PROP_CONNECTION_ID:
268       g_value_set_string (value, self->connection_id);
269       break;
270     case PROP_IS_CLIENT:
271       g_value_set_boolean (value, self->is_client);
272       break;
273     case PROP_ENCODER_KEY:
274       g_value_set_boxed (value, self->encoder_key);
275       break;
276     case PROP_SRTP_CIPHER:
277       g_value_set_uint (value, self->srtp_cipher);
278       break;
279     case PROP_SRTP_AUTH:
280       g_value_set_uint (value, self->srtp_auth);
281       break;
282     case PROP_CONNECTION_STATE:
283       if (self->connection)
284         g_object_get_property (G_OBJECT (self->connection), "connection-state",
285             value);
286       else
287         g_value_set_enum (value, GST_DTLS_CONNECTION_STATE_CLOSED);
288       break;
289     default:
290       G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
291   }
292 }
293 
294 static void
on_connection_state_changed(GObject * object,GParamSpec * pspec,gpointer user_data)295 on_connection_state_changed (GObject * object, GParamSpec * pspec,
296     gpointer user_data)
297 {
298   GstDtlsEnc *self = GST_DTLS_ENC (user_data);
299 
300   g_object_notify_by_pspec (G_OBJECT (self), properties[PROP_CONNECTION_STATE]);
301 }
302 
303 static GstStateChangeReturn
gst_dtls_enc_change_state(GstElement * element,GstStateChange transition)304 gst_dtls_enc_change_state (GstElement * element, GstStateChange transition)
305 {
306   GstDtlsEnc *self = GST_DTLS_ENC (element);
307   GstStateChangeReturn ret;
308 
309   switch (transition) {
310     case GST_STATE_CHANGE_NULL_TO_READY:
311       if (self->connection_id) {
312         self->connection = gst_dtls_dec_fetch_connection (self->connection_id);
313 
314         if (!self->connection) {
315           GST_WARNING_OBJECT (self,
316               "invalid connection id: '%s', connection not found or already in use",
317               self->connection_id);
318           return GST_STATE_CHANGE_FAILURE;
319         }
320 
321         g_signal_connect_object (self->connection,
322             "on-encoder-key", G_CALLBACK (on_key_received), self, 0);
323         g_signal_connect_object (self->connection,
324             "notify::connection-state",
325             G_CALLBACK (on_connection_state_changed), self, 0);
326         on_connection_state_changed (NULL, NULL, self);
327 
328         gst_dtls_connection_set_send_callback (self->connection,
329             (GstDtlsConnectionSendCallback) on_send_data, self, NULL);
330       } else {
331         GST_WARNING_OBJECT (self,
332             "trying to change state to ready without connection id");
333         return GST_STATE_CHANGE_FAILURE;
334       }
335       break;
336     case GST_STATE_CHANGE_PAUSED_TO_READY:
337       GST_DEBUG_OBJECT (self, "stopping connection %s", self->connection_id);
338 
339       gst_dtls_connection_stop (self->connection);
340       break;
341     case GST_STATE_CHANGE_READY_TO_NULL:
342       GST_DEBUG_OBJECT (self, "closing connection %s", self->connection_id);
343 
344       if (self->connection) {
345         gst_dtls_connection_close (self->connection);
346         gst_dtls_connection_set_send_callback (self->connection, NULL, NULL,
347             NULL);
348         g_object_unref (self->connection);
349         self->connection = NULL;
350       }
351       break;
352     default:
353       break;
354   }
355 
356   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
357 
358   switch (transition) {
359     case GST_STATE_CHANGE_READY_TO_PAUSED:{
360       GError *err = NULL;
361 
362       GST_DEBUG_OBJECT (self, "starting connection %s", self->connection_id);
363       if (!gst_dtls_connection_start (self->connection, self->is_client, &err)) {
364         GST_ELEMENT_ERROR (self, RESOURCE, OPEN_WRITE, (NULL), ("%s",
365                 err->message));
366         g_clear_error (&err);
367       }
368       break;
369     }
370     default:
371       break;
372   }
373 
374   return ret;
375 }
376 
377 static GstPad *
gst_dtls_enc_request_new_pad(GstElement * element,GstPadTemplate * templ,const gchar * name,const GstCaps * caps)378 gst_dtls_enc_request_new_pad (GstElement * element,
379     GstPadTemplate * templ, const gchar * name, const GstCaps * caps)
380 {
381   GstPad *sink;
382   gboolean ret;
383 
384   GST_DEBUG_OBJECT (element, "sink pad requested");
385 
386   g_return_val_if_fail (templ->direction == GST_PAD_SINK, NULL);
387 
388   sink = gst_pad_new_from_template (templ, name);
389   g_return_val_if_fail (sink, NULL);
390 
391   if (caps) {
392     g_object_set (sink, "caps", caps, NULL);
393   }
394 
395   gst_pad_set_chain_function (sink, GST_DEBUG_FUNCPTR (sink_chain));
396   gst_pad_set_event_function (sink, GST_DEBUG_FUNCPTR (sink_event));
397 
398   ret = gst_pad_set_active (sink, TRUE);
399   g_warn_if_fail (ret);
400 
401   gst_element_add_pad (element, sink);
402 
403   return sink;
404 }
405 
406 static gboolean
src_activate_mode(GstPad * pad,GstObject * parent,GstPadMode mode,gboolean active)407 src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
408     gboolean active)
409 {
410   GstDtlsEnc *self = GST_DTLS_ENC (parent);
411   gboolean success = TRUE;
412   g_return_val_if_fail (mode == GST_PAD_MODE_PUSH, FALSE);
413 
414   if (active) {
415     GST_DEBUG_OBJECT (self, "src pad activating in push mode");
416 
417     self->flushing = FALSE;
418     self->src_ret = GST_FLOW_OK;
419     self->send_initial_events = TRUE;
420     success =
421         gst_pad_start_task (pad, (GstTaskFunction) src_task_loop, self->src,
422         NULL);
423     if (!success) {
424       GST_WARNING_OBJECT (self, "failed to activate pad task");
425     }
426   } else {
427     GST_DEBUG_OBJECT (self, "deactivating src pad");
428 
429     g_mutex_lock (&self->queue_lock);
430     g_queue_foreach (&self->queue, (GFunc) gst_buffer_unref, NULL);
431     g_queue_clear (&self->queue);
432     self->flushing = TRUE;
433     self->src_ret = GST_FLOW_FLUSHING;
434     g_cond_signal (&self->queue_cond_add);
435     g_mutex_unlock (&self->queue_lock);
436     success = gst_pad_stop_task (pad);
437     if (!success) {
438       GST_WARNING_OBJECT (self, "failed to deactivate pad task");
439     }
440   }
441 
442   return success;
443 }
444 
445 static void
src_task_loop(GstPad * pad)446 src_task_loop (GstPad * pad)
447 {
448   GstDtlsEnc *self = GST_DTLS_ENC (GST_PAD_PARENT (pad));
449   GstFlowReturn ret;
450   GstBuffer *buffer;
451   gboolean check_connection_timeout = FALSE;
452 
453   GST_TRACE_OBJECT (self, "src loop: acquiring lock");
454   g_mutex_lock (&self->queue_lock);
455   GST_TRACE_OBJECT (self, "src loop: acquired lock");
456 
457   if (self->flushing) {
458     GST_LOG_OBJECT (self, "src task loop entered on inactive pad");
459     GST_TRACE_OBJECT (self, "src loop: releasing lock");
460     g_mutex_unlock (&self->queue_lock);
461     return;
462   }
463 
464   while (g_queue_is_empty (&self->queue)) {
465     GST_TRACE_OBJECT (self, "src loop: queue empty, waiting for add");
466     g_cond_wait (&self->queue_cond_add, &self->queue_lock);
467     GST_TRACE_OBJECT (self, "src loop: add signaled");
468 
469     if (self->flushing) {
470       GST_LOG_OBJECT (self, "pad inactive, task returning");
471       GST_TRACE_OBJECT (self, "src loop: releasing lock");
472       g_mutex_unlock (&self->queue_lock);
473       return;
474     }
475   }
476   GST_TRACE_OBJECT (self, "src loop: queue has element");
477 
478   buffer = g_queue_pop_head (&self->queue);
479   g_mutex_unlock (&self->queue_lock);
480 
481   if (self->send_initial_events) {
482     GstSegment segment;
483     gchar s_id[32];
484     GstCaps *caps;
485 
486     self->send_initial_events = FALSE;
487 
488     g_snprintf (s_id, sizeof (s_id), "dtlsenc-%08x", g_random_int ());
489     gst_pad_push_event (self->src, gst_event_new_stream_start (s_id));
490     caps = gst_caps_new_empty_simple ("application/x-dtls");
491     gst_pad_push_event (self->src, gst_event_new_caps (caps));
492     gst_caps_unref (caps);
493     gst_segment_init (&segment, GST_FORMAT_BYTES);
494     gst_pad_push_event (self->src, gst_event_new_segment (&segment));
495     check_connection_timeout = TRUE;
496   }
497 
498   GST_TRACE_OBJECT (self, "src loop: releasing lock");
499 
500   if (buffer) {
501     ret = gst_pad_push (self->src, buffer);
502     if (check_connection_timeout)
503       gst_dtls_connection_check_timeout (self->connection);
504 
505     if (G_UNLIKELY (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) {
506       GST_WARNING_OBJECT (self, "failed to push buffer on src pad: %s",
507           gst_flow_get_name (ret));
508     }
509     g_mutex_lock (&self->queue_lock);
510     self->src_ret = ret;
511     g_mutex_unlock (&self->queue_lock);
512   } else {
513     GST_DEBUG_OBJECT (self, "Peer and us closed the connection, sending EOS");
514     gst_pad_push_event (self->src, gst_event_new_eos ());
515     g_mutex_lock (&self->queue_lock);
516     self->src_ret = GST_FLOW_EOS;
517     g_mutex_unlock (&self->queue_lock);
518   }
519 }
520 
521 static GstFlowReturn
sink_chain(GstPad * pad,GstObject * parent,GstBuffer * buffer)522 sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
523 {
524   GstDtlsEnc *self = GST_DTLS_ENC (parent);
525   GstMapInfo map_info;
526   GError *err = NULL;
527   gsize to_write, written = 0;
528   GstFlowReturn ret = GST_FLOW_OK;
529 
530   g_mutex_lock (&self->queue_lock);
531   if (self->src_ret != GST_FLOW_OK) {
532     if (G_UNLIKELY (self->src_ret == GST_FLOW_NOT_LINKED
533             || self->src_ret < GST_FLOW_EOS))
534       GST_ERROR_OBJECT (self, "Pushing previous data returned an error: %s",
535           gst_flow_get_name (self->src_ret));
536 
537     gst_buffer_unref (buffer);
538     g_mutex_unlock (&self->queue_lock);
539     return self->src_ret;
540   }
541   g_mutex_unlock (&self->queue_lock);
542 
543   gst_buffer_map (buffer, &map_info, GST_MAP_READ);
544 
545   to_write = map_info.size;
546 
547   while (to_write > 0 && ret == GST_FLOW_OK) {
548     ret =
549         gst_dtls_connection_send (self->connection, map_info.data,
550         map_info.size, &written, &err);
551 
552     switch (ret) {
553       case GST_FLOW_OK:
554         GST_DEBUG_OBJECT (self,
555             "Wrote %" G_GSIZE_FORMAT " B of %" G_GSIZE_FORMAT " B", written,
556             map_info.size);
557         g_assert (written <= to_write);
558         to_write -= written;
559         break;
560       case GST_FLOW_EOS:
561         GST_INFO_OBJECT (self, "Received data after the connection was closed");
562         break;
563       case GST_FLOW_ERROR:
564         GST_WARNING_OBJECT (self, "error sending data: %s", err->message);
565         GST_ELEMENT_ERROR (self, RESOURCE, WRITE, (NULL), ("%s", err->message));
566         g_clear_error (&err);
567         break;
568       case GST_FLOW_FLUSHING:
569         GST_INFO_OBJECT (self, "Flushing");
570         break;
571       default:
572         g_assert_not_reached ();
573         break;
574     }
575 
576     g_assert (err == NULL);
577   }
578 
579   gst_buffer_unmap (buffer, &map_info);
580   gst_buffer_unref (buffer);
581 
582   return ret;
583 }
584 
585 
586 static gboolean
sink_event(GstPad * pad,GstObject * parent,GstEvent * event)587 sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
588 {
589   GstDtlsEnc *self = GST_DTLS_ENC (parent);
590   gboolean ret = FALSE;
591 
592   switch (GST_EVENT_TYPE (event)) {
593       /* Drop segment, stream-start as we will push our own from the src pad
594        * task.
595        * FIXME: do we need any information from upstream for pushing our own? */
596     case GST_EVENT_SEGMENT:
597     case GST_EVENT_STREAM_START:
598       gst_event_unref (event);
599       ret = TRUE;
600       break;
601     case GST_EVENT_EOS:{
602       GstFlowReturn flow_ret;
603 
604       /* Close the write side of the connection now */
605       flow_ret =
606           gst_dtls_connection_send (self->connection, NULL, 0, NULL, NULL);
607 
608       if (flow_ret != GST_FLOW_OK)
609         GST_ERROR_OBJECT (self, "Failed to send close_notify");
610 
611       /* Do not forward the EOS event unless the peer already closed to the
612        * connection itself. If it didn't yet then we'll later get the send
613        * callback called with no data and send EOS from there */
614       if (flow_ret == GST_FLOW_EOS) {
615         ret = gst_pad_event_default (pad, parent, event);
616       } else {
617         gst_event_unref (event);
618         ret = TRUE;
619       }
620 
621       break;
622     }
623     default:
624       ret = gst_pad_event_default (pad, parent, event);
625       break;
626   }
627 
628   return ret;
629 }
630 
631 static void
on_key_received(GstDtlsConnection * connection,gpointer key,guint cipher,guint auth,GstDtlsEnc * self)632 on_key_received (GstDtlsConnection * connection, gpointer key, guint cipher,
633     guint auth, GstDtlsEnc * self)
634 {
635   GstBuffer *new_encoder_key;
636   gchar *key_str;
637 
638   g_return_if_fail (GST_IS_DTLS_ENC (self));
639   g_return_if_fail (GST_IS_DTLS_CONNECTION (connection));
640 
641   self->srtp_cipher = cipher;
642   self->srtp_auth = auth;
643 
644   new_encoder_key =
645       gst_buffer_new_memdup (key, GST_DTLS_SRTP_MASTER_KEY_LENGTH);
646 
647   if (self->encoder_key)
648     gst_buffer_unref (self->encoder_key);
649 
650   self->encoder_key = new_encoder_key;
651 
652   key_str = g_base64_encode (key, GST_DTLS_SRTP_MASTER_KEY_LENGTH);
653   GST_INFO_OBJECT (self, "received key: %s", key_str);
654   g_free (key_str);
655 
656   g_signal_emit (self, signals[SIGNAL_ON_KEY_RECEIVED], 0);
657 }
658 
659 static gboolean
on_send_data(GstDtlsConnection * connection,gconstpointer data,gsize length,GstDtlsEnc * self)660 on_send_data (GstDtlsConnection * connection, gconstpointer data, gsize length,
661     GstDtlsEnc * self)
662 {
663   GstBuffer *buffer;
664   gboolean ret;
665 
666   GST_DEBUG_OBJECT (self, "sending data from %s with length %" G_GSIZE_FORMAT,
667       self->connection_id, length);
668 
669   buffer = data ? gst_buffer_new_memdup (data, length) : NULL;
670 
671   GST_TRACE_OBJECT (self, "send data: acquiring lock");
672   g_mutex_lock (&self->queue_lock);
673   GST_TRACE_OBJECT (self, "send data: acquired lock");
674 
675   g_queue_push_tail (&self->queue, buffer);
676 
677   GST_TRACE_OBJECT (self, "send data: signaling add");
678   g_cond_signal (&self->queue_cond_add);
679 
680   GST_TRACE_OBJECT (self, "send data: releasing lock");
681 
682   ret = self->src_ret == GST_FLOW_OK;
683   if (self->src_ret == GST_FLOW_FLUSHING)
684     gst_dtls_connection_set_flow_return (connection, self->src_ret);
685   g_mutex_unlock (&self->queue_lock);
686 
687   return ret;
688 }
689