• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer
2  * Copyright (C) 2018, Collabora Ltd.
3  * Copyright (C) 2018, SK Telecom, Co., Ltd.
4  *   Author: Jeongseok Kim <jeongseok.kim@sk.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 
22 #ifdef HAVE_CONFIG_H
23 #include "config.h"
24 #endif
25 
26 /* Needed for GValueArray */
27 #define GLIB_DISABLE_DEPRECATION_WARNINGS
28 
29 #include "gstsrtobject.h"
30 
31 #include <gst/base/gstbasesink.h>
32 #include <gio/gnetworking.h>
33 #include <stdlib.h>
34 #include <stdbool.h>
35 #include <stdint.h>
36 
37 GST_DEBUG_CATEGORY_EXTERN (gst_debug_srtobject);
38 #define GST_CAT_DEFAULT gst_debug_srtobject
39 
40 #if SRT_VERSION_VALUE > 0x10402
41 #define SRTSOCK_ERROR_DEBUG ("libsrt reported: %s", srt_rejectreason_str (reason))
42 #else
43 /* srt_rejectreason_str() is unavailable in libsrt 1.4.2 and prior due to
44  * unexported symbol. See https://github.com/Haivision/srt/pull/1728. */
45 #define SRTSOCK_ERROR_DEBUG ("libsrt reported reject reason code %d", reason)
46 #endif
47 
48 /* Define options added in later revisions */
49 #if SRT_VERSION_VALUE < 0x10402
50 #define SRTO_DRIFTTRACER 37
51 /* We can't define SRTO_BINDTODEVICE since it depends on configuration flags *sigh* */
52 #define SRTO_RETRANSMITALGO 61
53 #endif
54 
55 #define ELEMENT_WARNING_SRTSOCK_ERROR(code, reason) \
56   GST_ELEMENT_WARNING (srtobject->element, RESOURCE, code, \
57   ("Error on SRT socket. Trying to reconnect."), SRTSOCK_ERROR_DEBUG)
58 
59 enum
60 {
61   PROP_URI = 1,
62   PROP_MODE,
63   PROP_LOCALADDRESS,
64   PROP_LOCALPORT,
65   PROP_PASSPHRASE,
66   PROP_PBKEYLEN,
67   PROP_POLL_TIMEOUT,
68   PROP_LATENCY,
69   PROP_MSG_SIZE,
70   PROP_STATS,
71   PROP_WAIT_FOR_CONNECTION,
72   PROP_STREAMID,
73   PROP_AUTHENTICATION,
74   PROP_LAST
75 };
76 
77 typedef struct
78 {
79   SRTSOCKET sock;
80   gint poll_id;
81   GSocketAddress *sockaddr;
82   gboolean sent_headers;
83 } SRTCaller;
84 
85 static GstStructure *gst_srt_object_accumulate_stats (GstSRTObject * srtobject,
86     SRTSOCKET srtsock);
87 
88 static SRTCaller *
srt_caller_new(void)89 srt_caller_new (void)
90 {
91   SRTCaller *caller = g_new0 (SRTCaller, 1);
92   caller->sock = SRT_INVALID_SOCK;
93   caller->poll_id = SRT_ERROR;
94   caller->sent_headers = FALSE;
95 
96   return caller;
97 }
98 
99 static void
srt_caller_free(SRTCaller * caller)100 srt_caller_free (SRTCaller * caller)
101 {
102   g_return_if_fail (caller != NULL);
103 
104   g_clear_object (&caller->sockaddr);
105 
106   if (caller->sock != SRT_INVALID_SOCK) {
107     srt_close (caller->sock);
108   }
109 
110   if (caller->poll_id != SRT_ERROR) {
111     srt_epoll_release (caller->poll_id);
112   }
113 
114   g_free (caller);
115 }
116 
117 /* called with sock_lock */
118 static void
srt_caller_signal_removed(SRTCaller * caller,GstSRTObject * srtobject)119 srt_caller_signal_removed (SRTCaller * caller, GstSRTObject * srtobject)
120 {
121   GstStructure *stats;
122 
123   stats = gst_srt_object_accumulate_stats (srtobject, caller->sock);
124 
125   /* FIXME: These are the final statistics for the caller before we close its
126    * socket. Deliver the stats to the app before we throw them away. */
127   gst_structure_free (stats);
128 
129   g_signal_emit_by_name (srtobject->element, "caller-removed", 0,
130       caller->sockaddr);
131 }
132 
133 struct srt_constant_params
134 {
135   const gchar *name;
136   SRT_SOCKOPT param;
137   const void *val;
138   int val_len;
139 };
140 
141 static const bool bool_false = false;
142 static const bool bool_true = true;
143 static const struct linger no_linger = { 0, 0 };
144 
145 /* *INDENT-OFF* */
146 static const struct srt_constant_params srt_params[] = {
147   {"SRTO_SNDSYN",    SRTO_SNDSYN,    &bool_false, sizeof bool_false}, /* non-blocking */
148   {"SRTO_RCVSYN",    SRTO_RCVSYN,    &bool_false, sizeof bool_false}, /* non-blocking */
149   {"SRTO_LINGER",    SRTO_LINGER,    &no_linger,  sizeof no_linger},  /* no linger time */
150   {"SRTO_TSBPDMODE", SRTO_TSBPDMODE, &bool_true,  sizeof bool_true},  /* Timestamp-based Packet Delivery mode must be enabled */
151   {NULL, -1, NULL, 0},
152 };
153 /* *INDENT-ON* */
154 
155 typedef struct
156 {
157   const gchar *name;
158   SRT_SOCKOPT opt;
159   GType gtype;
160 } SrtOption;
161 
162 SrtOption srt_options[] = {
163   {"mss", SRTO_MSS, G_TYPE_INT},
164   {"fc", SRTO_FC, G_TYPE_INT},
165   {"sndbuf", SRTO_SNDBUF, G_TYPE_INT},
166   {"rcvbuf", SRTO_RCVBUF, G_TYPE_INT},
167   {"maxbw", SRTO_MAXBW, G_TYPE_INT64},
168   {"tsbpdmode", SRTO_TSBPDMODE, G_TYPE_BOOLEAN},
169   {"latency", SRTO_LATENCY, G_TYPE_INT},
170   {"inputbw", SRTO_INPUTBW, G_TYPE_INT64},
171   {"oheadbw", SRTO_OHEADBW, G_TYPE_INT},
172   {"passphrase", SRTO_PASSPHRASE, G_TYPE_STRING},
173   {"pbkeylen", SRTO_PBKEYLEN, G_TYPE_INT},
174   {"ipttl", SRTO_IPTTL, G_TYPE_INT},
175   {"iptos", SRTO_IPTOS, G_TYPE_INT},
176   {"tlpktdrop", SRTO_TLPKTDROP, G_TYPE_BOOLEAN},
177   {"snddropdelay", SRTO_SNDDROPDELAY, G_TYPE_INT},
178   {"nakreport", SRTO_NAKREPORT, G_TYPE_BOOLEAN},
179   {"conntimeo", SRTO_CONNTIMEO, G_TYPE_INT},
180   {"drifttracer", SRTO_DRIFTTRACER, G_TYPE_BOOLEAN},
181   {"lossmaxttl", SRTO_LOSSMAXTTL, G_TYPE_INT},
182   {"rcvlatency", SRTO_RCVLATENCY, G_TYPE_INT},
183   {"peerlatency", SRTO_PEERLATENCY, G_TYPE_INT},
184   {"minversion", SRTO_MINVERSION, G_TYPE_INT},
185   {"streamid", SRTO_STREAMID, G_TYPE_STRING},
186   {"congestion", SRTO_CONGESTION, G_TYPE_STRING},
187   {"messageapi", SRTO_MESSAGEAPI, G_TYPE_BOOLEAN},
188   {"payloadsize", SRTO_PAYLOADSIZE, G_TYPE_INT},
189   {"transtype", SRTO_TRANSTYPE, G_TYPE_INT},
190   {"kmrefreshrate", SRTO_KMREFRESHRATE, G_TYPE_INT},
191   {"kmpreannounce", SRTO_KMPREANNOUNCE, G_TYPE_INT},
192   {"enforcedencryption", SRTO_ENFORCEDENCRYPTION, G_TYPE_BOOLEAN},
193   {"ipv6only", SRTO_IPV6ONLY, G_TYPE_INT},
194   {"peeridletimeo", SRTO_PEERIDLETIMEO, G_TYPE_INT},
195 #if SRT_VERSION_VALUE >= 0x10402
196   {"bindtodevice", SRTO_BINDTODEVICE, G_TYPE_STRING},
197 #endif
198   {"packetfilter", SRTO_PACKETFILTER, G_TYPE_STRING},
199   {"retransmitalgo", SRTO_RETRANSMITALGO, G_TYPE_INT},
200   {NULL}
201 };
202 
203 static gint srt_init_refcount = 0;
204 
205 static GSocketAddress *
gst_srt_object_resolve(GstSRTObject * srtobject,const gchar * address,guint port,GCancellable * cancellable,GError ** err_out)206 gst_srt_object_resolve (GstSRTObject * srtobject, const gchar * address,
207     guint port, GCancellable * cancellable, GError ** err_out)
208 {
209   GError *err = NULL;
210   GSocketAddress *saddr;
211   GResolver *resolver;
212 
213   saddr = g_inet_socket_address_new_from_string (address, port);
214   if (!saddr) {
215     GList *results;
216 
217     GST_DEBUG_OBJECT (srtobject->element, "resolving IP address for host %s",
218         address);
219     resolver = g_resolver_get_default ();
220     results = g_resolver_lookup_by_name (resolver, address, cancellable, &err);
221     if (!results)
222       goto name_resolve;
223 
224     saddr = g_inet_socket_address_new (G_INET_ADDRESS (results->data), port);
225 
226     g_resolver_free_addresses (results);
227     g_object_unref (resolver);
228   }
229 #ifndef GST_DISABLE_GST_DEBUG
230   {
231     gchar *ip =
232         g_inet_address_to_string (g_inet_socket_address_get_address
233         (G_INET_SOCKET_ADDRESS (saddr)));
234 
235     GST_DEBUG_OBJECT (srtobject->element, "IP address for host %s is %s",
236         address, ip);
237     g_free (ip);
238   }
239 #endif
240 
241   return saddr;
242 
243 name_resolve:
244   {
245     GST_WARNING_OBJECT (srtobject->element, "Failed to resolve %s: %s", address,
246         err->message);
247     g_set_error (err_out, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_OPEN_READ,
248         "Failed to resolve host '%s': %s", address, err->message);
249     g_clear_error (&err);
250     g_object_unref (resolver);
251     return NULL;
252   }
253 }
254 
255 static gboolean
gst_srt_object_apply_socket_option(SRTSOCKET sock,SrtOption * option,const GValue * value,GError ** error)256 gst_srt_object_apply_socket_option (SRTSOCKET sock, SrtOption * option,
257     const GValue * value, GError ** error)
258 {
259   union
260   {
261     int32_t i;
262     int64_t i64;
263     gboolean b;
264     const gchar *c;
265   } u;
266   const void *optval = &u;
267   gint optlen;
268 
269   if (!G_VALUE_HOLDS (value, option->gtype)) {
270     goto bad_type;
271   }
272 
273   switch (option->gtype) {
274     case G_TYPE_INT:
275       u.i = g_value_get_int (value);
276       optlen = sizeof u.i;
277       break;
278     case G_TYPE_INT64:
279       u.i64 = g_value_get_int64 (value);
280       optlen = sizeof u.i64;
281       break;
282     case G_TYPE_BOOLEAN:
283       u.b = g_value_get_boolean (value);
284       optlen = sizeof u.b;
285       break;
286     case G_TYPE_STRING:
287       u.c = g_value_get_string (value);
288       optval = u.c;
289       optlen = u.c ? strlen (u.c) : 0;
290       if (optlen == 0) {
291         return TRUE;
292       }
293       break;
294     default:
295       goto bad_type;
296   }
297 
298   if (srt_setsockopt (sock, 0, option->opt, optval, optlen)) {
299     g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS,
300         "failed to set %s (reason: %s)", option->name, srt_getlasterror_str ());
301     return FALSE;
302   }
303 
304   return TRUE;
305 
306 bad_type:
307   g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS,
308       "option %s has unsupported type", option->name);
309   return FALSE;
310 }
311 
312 static gboolean
gst_srt_object_set_common_params(SRTSOCKET sock,GstSRTObject * srtobject,GError ** error)313 gst_srt_object_set_common_params (SRTSOCKET sock, GstSRTObject * srtobject,
314     GError ** error)
315 {
316   const struct srt_constant_params *params = srt_params;
317   SrtOption *option = srt_options;
318 
319   GST_OBJECT_LOCK (srtobject->element);
320 
321   for (; params->name != NULL; params++) {
322     if (srt_setsockopt (sock, 0, params->param, params->val, params->val_len)) {
323       g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS,
324           "failed to set %s (reason: %s)", params->name,
325           srt_getlasterror_str ());
326       goto err;
327     }
328   }
329 
330   for (; option->name; ++option) {
331     const GValue *val;
332 
333     val = gst_structure_get_value (srtobject->parameters, option->name);
334     if (val && !gst_srt_object_apply_socket_option (sock, option, val, error)) {
335       goto err;
336     }
337   }
338 
339   GST_OBJECT_UNLOCK (srtobject->element);
340   return TRUE;
341 
342 err:
343   GST_OBJECT_UNLOCK (srtobject->element);
344   return FALSE;
345 }
346 
347 GstSRTObject *
gst_srt_object_new(GstElement * element)348 gst_srt_object_new (GstElement * element)
349 {
350   GstSRTObject *srtobject;
351 
352   if (g_atomic_int_add (&srt_init_refcount, 1) == 0) {
353     GST_DEBUG_OBJECT (element, "Starting up SRT");
354     if (srt_startup () < 0) {
355       g_warning ("Failed to initialize SRT (reason: %s)",
356           srt_getlasterror_str ());
357     }
358   }
359 
360   srtobject = g_new0 (GstSRTObject, 1);
361   srtobject->element = element;
362   srtobject->parameters = gst_structure_new_empty ("application/x-srt-params");
363   srtobject->sock = SRT_INVALID_SOCK;
364   srtobject->poll_id = srt_epoll_create ();
365   srtobject->listener_sock = SRT_INVALID_SOCK;
366   srtobject->listener_poll_id = SRT_ERROR;
367   srtobject->sent_headers = FALSE;
368   srtobject->wait_for_connection = GST_SRT_DEFAULT_WAIT_FOR_CONNECTION;
369 
370   g_cond_init (&srtobject->sock_cond);
371   return srtobject;
372 }
373 
374 void
gst_srt_object_destroy(GstSRTObject * srtobject)375 gst_srt_object_destroy (GstSRTObject * srtobject)
376 {
377   g_return_if_fail (srtobject != NULL);
378 
379   if (srtobject->poll_id != SRT_ERROR) {
380     srt_epoll_release (srtobject->poll_id);
381     srtobject->poll_id = SRT_ERROR;
382   }
383 
384   g_cond_clear (&srtobject->sock_cond);
385 
386   GST_DEBUG_OBJECT (srtobject->element, "Destroying srtobject");
387   gst_structure_free (srtobject->parameters);
388 
389   if (g_atomic_int_dec_and_test (&srt_init_refcount)) {
390     srt_cleanup ();
391     GST_DEBUG_OBJECT (srtobject->element, "Cleaning up SRT");
392   }
393 
394   g_clear_pointer (&srtobject->uri, gst_uri_unref);
395 
396   g_free (srtobject);
397 }
398 
399 gboolean
gst_srt_object_set_property_helper(GstSRTObject * srtobject,guint prop_id,const GValue * value,GParamSpec * pspec)400 gst_srt_object_set_property_helper (GstSRTObject * srtobject,
401     guint prop_id, const GValue * value, GParamSpec * pspec)
402 {
403   GST_OBJECT_LOCK (srtobject->element);
404 
405   switch (prop_id) {
406     case PROP_URI:
407       gst_srt_object_set_uri (srtobject, g_value_get_string (value), NULL);
408       break;
409     case PROP_MODE:
410       gst_structure_set_value (srtobject->parameters, "mode", value);
411       break;
412     case PROP_POLL_TIMEOUT:
413       gst_structure_set_value (srtobject->parameters, "poll-timeout", value);
414       break;
415     case PROP_LATENCY:
416       gst_structure_set_value (srtobject->parameters, "latency", value);
417       break;
418     case PROP_LOCALADDRESS:
419       gst_structure_set_value (srtobject->parameters, "localaddress", value);
420       break;
421     case PROP_LOCALPORT:
422       gst_structure_set_value (srtobject->parameters, "localport", value);
423       break;
424     case PROP_PASSPHRASE:
425       gst_structure_set_value (srtobject->parameters, "passphrase", value);
426       break;
427     case PROP_PBKEYLEN:
428       gst_structure_set (srtobject->parameters, "pbkeylen", G_TYPE_INT,
429           g_value_get_enum (value), NULL);
430       break;
431     case PROP_WAIT_FOR_CONNECTION:
432       srtobject->wait_for_connection = g_value_get_boolean (value);
433       break;
434     case PROP_STREAMID:
435       gst_structure_set_value (srtobject->parameters, "streamid", value);
436       break;
437     case PROP_AUTHENTICATION:
438       srtobject->authentication = g_value_get_boolean (value);
439       break;
440     default:
441       goto err;
442   }
443 
444   GST_OBJECT_UNLOCK (srtobject->element);
445   return TRUE;
446 
447 err:
448   GST_OBJECT_UNLOCK (srtobject->element);
449   return FALSE;
450 }
451 
452 gboolean
gst_srt_object_get_property_helper(GstSRTObject * srtobject,guint prop_id,GValue * value,GParamSpec * pspec)453 gst_srt_object_get_property_helper (GstSRTObject * srtobject,
454     guint prop_id, GValue * value, GParamSpec * pspec)
455 {
456   switch (prop_id) {
457     case PROP_URI:
458       GST_OBJECT_LOCK (srtobject->element);
459       g_value_take_string (value, gst_uri_to_string (srtobject->uri));
460       GST_OBJECT_UNLOCK (srtobject->element);
461       break;
462     case PROP_MODE:{
463       GstSRTConnectionMode v;
464 
465       GST_OBJECT_LOCK (srtobject->element);
466       if (!gst_structure_get_enum (srtobject->parameters, "mode",
467               GST_TYPE_SRT_CONNECTION_MODE, (gint *) & v)) {
468         GST_WARNING_OBJECT (srtobject->element, "Failed to get 'mode'");
469         v = GST_SRT_CONNECTION_MODE_NONE;
470       }
471       g_value_set_enum (value, v);
472       GST_OBJECT_UNLOCK (srtobject->element);
473       break;
474     }
475     case PROP_LOCALADDRESS:
476       GST_OBJECT_LOCK (srtobject->element);
477       g_value_set_string (value,
478           gst_structure_get_string (srtobject->parameters, "localaddress"));
479       GST_OBJECT_UNLOCK (srtobject->element);
480       break;
481     case PROP_LOCALPORT:{
482       guint v;
483 
484       GST_OBJECT_LOCK (srtobject->element);
485       if (!gst_structure_get_uint (srtobject->parameters, "localport", &v)) {
486         GST_WARNING_OBJECT (srtobject->element, "Failed to get 'localport'");
487         v = GST_SRT_DEFAULT_PORT;
488       }
489       g_value_set_uint (value, v);
490       GST_OBJECT_UNLOCK (srtobject->element);
491       break;
492     }
493     case PROP_PBKEYLEN:{
494       GstSRTKeyLength v;
495 
496       GST_OBJECT_LOCK (srtobject->element);
497       if (!gst_structure_get_int (srtobject->parameters, "pbkeylen",
498               (gint *) & v)) {
499         GST_WARNING_OBJECT (srtobject->element, "Failed to get 'pbkeylen'");
500         v = GST_SRT_KEY_LENGTH_NO_KEY;
501       }
502       g_value_set_enum (value, v);
503       GST_OBJECT_UNLOCK (srtobject->element);
504       break;
505     }
506     case PROP_POLL_TIMEOUT:{
507       gint v;
508 
509       GST_OBJECT_LOCK (srtobject->element);
510       if (!gst_structure_get_int (srtobject->parameters, "poll-timeout", &v)) {
511         GST_WARNING_OBJECT (srtobject->element, "Failed to get 'poll-timeout'");
512         v = GST_SRT_DEFAULT_POLL_TIMEOUT;
513       }
514       g_value_set_int (value, v);
515       GST_OBJECT_UNLOCK (srtobject->element);
516       break;
517     }
518     case PROP_LATENCY:{
519       gint v;
520 
521       GST_OBJECT_LOCK (srtobject->element);
522       if (!gst_structure_get_int (srtobject->parameters, "latency", &v)) {
523         GST_WARNING_OBJECT (srtobject->element, "Failed to get 'latency'");
524         v = GST_SRT_DEFAULT_LATENCY;
525       }
526       g_value_set_int (value, v);
527       GST_OBJECT_UNLOCK (srtobject->element);
528       break;
529     }
530     case PROP_STATS:
531       g_value_take_boxed (value, gst_srt_object_get_stats (srtobject));
532       break;
533     case PROP_WAIT_FOR_CONNECTION:
534       GST_OBJECT_LOCK (srtobject->element);
535       g_value_set_boolean (value, srtobject->wait_for_connection);
536       GST_OBJECT_UNLOCK (srtobject->element);
537       break;
538     case PROP_STREAMID:
539       GST_OBJECT_LOCK (srtobject->element);
540       g_value_set_string (value,
541           gst_structure_get_string (srtobject->parameters, "streamid"));
542       GST_OBJECT_UNLOCK (srtobject->element);
543       break;
544     case PROP_AUTHENTICATION:
545       g_value_set_boolean (value, srtobject->authentication);
546       break;
547     default:
548       return FALSE;
549   }
550 
551   return TRUE;
552 }
553 
554 void
gst_srt_object_install_properties_helper(GObjectClass * gobject_class)555 gst_srt_object_install_properties_helper (GObjectClass * gobject_class)
556 {
557   /**
558    * GstSRTSrc:uri:
559    *
560    * The URI used by SRT connection. User can specify SRT specific options by URI parameters.
561    * Refer to <a href="https://github.com/Haivision/srt/blob/master/docs/stransmit.md#medium-srt">Mediun: SRT</a>
562    */
563   g_object_class_install_property (gobject_class, PROP_URI,
564       g_param_spec_string ("uri", "URI",
565           "URI in the form of srt://address:port", GST_SRT_DEFAULT_URI,
566           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
567           G_PARAM_STATIC_STRINGS));
568 
569   /**
570    * GstSRTSrc:mode:
571    *
572    * The SRT connection mode.
573    * This property can be set by URI parameters.
574    */
575   g_object_class_install_property (gobject_class, PROP_MODE,
576       g_param_spec_enum ("mode", "Connection mode",
577           "SRT connection mode", GST_TYPE_SRT_CONNECTION_MODE,
578           GST_SRT_CONNECTION_MODE_CALLER,
579           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
580           G_PARAM_STATIC_STRINGS));
581   gst_type_mark_as_plugin_api (GST_TYPE_SRT_CONNECTION_MODE, 0);
582 
583   /**
584    * GstSRTSrc:localaddress:
585    *
586    * The address to bind when #GstSRTSrc:mode is listener or rendezvous.
587    * This property can be set by URI parameters.
588    */
589   g_object_class_install_property (gobject_class, PROP_LOCALADDRESS,
590       g_param_spec_string ("localaddress", "Local address",
591           "Local address to bind", GST_SRT_DEFAULT_LOCALADDRESS,
592           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
593           G_PARAM_STATIC_STRINGS));
594 
595   /**
596    * GstSRTSrc:localport:
597    *
598    * The local port to bind when #GstSRTSrc:mode is listener or rendezvous.
599    * This property can be set by URI parameters.
600    */
601   g_object_class_install_property (gobject_class, PROP_LOCALPORT,
602       g_param_spec_uint ("localport", "Local port",
603           "Local port to bind", 0,
604           65535, GST_SRT_DEFAULT_PORT,
605           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
606           G_PARAM_STATIC_STRINGS));
607 
608   /**
609    * GstSRTSrc:passphrase:
610    *
611    * The password for the encrypted transmission.
612    * This property can be set by URI parameters.
613    */
614   g_object_class_install_property (gobject_class, PROP_PASSPHRASE,
615       g_param_spec_string ("passphrase", "Passphrase",
616           "Password for the encrypted transmission", "",
617           G_PARAM_WRITABLE | GST_PARAM_MUTABLE_READY | G_PARAM_STATIC_STRINGS));
618 
619   /**
620    * GstSRTSrc:pbkeylen:
621    *
622    * The crypto key length.
623    * This property can be set by URI parameters.
624    */
625   g_object_class_install_property (gobject_class, PROP_PBKEYLEN,
626       g_param_spec_enum ("pbkeylen", "Crypto key length",
627           "Crypto key length in bytes", GST_TYPE_SRT_KEY_LENGTH,
628           GST_SRT_DEFAULT_PBKEYLEN,
629           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
630           G_PARAM_STATIC_STRINGS));
631   gst_type_mark_as_plugin_api (GST_TYPE_SRT_KEY_LENGTH, 0);
632 
633   /**
634    * GstSRTSrc:poll-timeout:
635    *
636    * The polling timeout used when srt poll is started.
637    * Even if the default value indicates infinite waiting, it can be cancellable according to #GstState
638    * This property can be set by URI parameters.
639    */
640   g_object_class_install_property (gobject_class, PROP_POLL_TIMEOUT,
641       g_param_spec_int ("poll-timeout", "Poll timeout",
642           "Return poll wait after timeout milliseconds (-1 = infinite)", -1,
643           G_MAXINT32, GST_SRT_DEFAULT_POLL_TIMEOUT,
644           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
645           G_PARAM_STATIC_STRINGS));
646 
647   /**
648    * GstSRTSrc:latency:
649    *
650    * The maximum accepted transmission latency.
651    */
652   g_object_class_install_property (gobject_class, PROP_LATENCY,
653       g_param_spec_int ("latency", "latency",
654           "Minimum latency (milliseconds)", 0,
655           G_MAXINT32, GST_SRT_DEFAULT_LATENCY,
656           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
657 
658   /**
659    * GstSRTSrc:stats:
660    *
661    * The statistics from SRT.
662    */
663   g_object_class_install_property (gobject_class, PROP_STATS,
664       g_param_spec_boxed ("stats", "Statistics",
665           "SRT Statistics", GST_TYPE_STRUCTURE,
666           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
667 
668   /**
669    * GstSRTSink:wait-for-connection:
670    *
671    * Boolean to block streaming until a client connects.  If TRUE,
672    * `srtsink' will stream only when a client is connected.
673    */
674   g_object_class_install_property (gobject_class, PROP_WAIT_FOR_CONNECTION,
675       g_param_spec_boolean ("wait-for-connection",
676           "Wait for a connection",
677           "Block the stream until a client connects",
678           GST_SRT_DEFAULT_WAIT_FOR_CONNECTION,
679           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
680 
681   /**
682    * GstSRTSrc:streamid:
683    *
684    * The stream id for the SRT access control.
685    */
686   g_object_class_install_property (gobject_class, PROP_STREAMID,
687       g_param_spec_string ("streamid", "Stream ID",
688           "Stream ID for the SRT access control", "",
689           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
690           G_PARAM_STATIC_STRINGS));
691 
692   /**
693    * GstSRTSink:authentication:
694    *
695    * Boolean to authenticate a connection.  If TRUE,
696    * the incoming connection is authenticated. Else,
697    * all the connections are accepted.
698    *
699    * Since: 1.20
700    *
701    */
702   g_object_class_install_property (gobject_class, PROP_AUTHENTICATION,
703       g_param_spec_boolean ("authentication",
704           "Authentication",
705           "Authenticate a connection",
706           FALSE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
707 }
708 
709 static void
gst_srt_object_set_enum_value(GstStructure * s,GType enum_type,gconstpointer key,gconstpointer value)710 gst_srt_object_set_enum_value (GstStructure * s, GType enum_type,
711     gconstpointer key, gconstpointer value)
712 {
713   GEnumClass *enum_class;
714   GEnumValue *enum_value;
715 
716   enum_class = g_type_class_ref (enum_type);
717   enum_value = g_enum_get_value_by_nick (enum_class, value);
718 
719   if (enum_value) {
720     gst_structure_set (s, key, enum_type, enum_value->value, NULL);
721   }
722 
723   g_type_class_unref (enum_class);
724 }
725 
726 static void
gst_srt_object_set_string_value(GstStructure * s,const gchar * key,const gchar * value)727 gst_srt_object_set_string_value (GstStructure * s, const gchar * key,
728     const gchar * value)
729 {
730   gst_structure_set (s, key, G_TYPE_STRING, value, NULL);
731 }
732 
733 static void
gst_srt_object_set_uint_value(GstStructure * s,const gchar * key,const gchar * value)734 gst_srt_object_set_uint_value (GstStructure * s, const gchar * key,
735     const gchar * value)
736 {
737   gst_structure_set (s, key, G_TYPE_UINT,
738       (guint) g_ascii_strtoll (value, NULL, 10), NULL);
739 }
740 
741 static void
gst_srt_object_set_int_value(GstStructure * s,const gchar * key,const gchar * value)742 gst_srt_object_set_int_value (GstStructure * s, const gchar * key,
743     const gchar * value)
744 {
745   gst_structure_set (s, key, G_TYPE_INT,
746       (gint) g_ascii_strtoll (value, NULL, 10), NULL);
747 }
748 
749 static void
gst_srt_object_set_int64_value(GstStructure * s,const gchar * key,const gchar * value)750 gst_srt_object_set_int64_value (GstStructure * s, const gchar * key,
751     const gchar * value)
752 {
753   gst_structure_set (s, key, G_TYPE_INT64,
754       g_ascii_strtoll (value, NULL, 10), NULL);
755 }
756 
757 static void
gst_srt_object_set_boolean_value(GstStructure * s,const gchar * key,const gchar * value)758 gst_srt_object_set_boolean_value (GstStructure * s, const gchar * key,
759     const gchar * value)
760 {
761   gboolean bool_val;
762   static const gchar *true_names[] = {
763     "1", "yes", "on", "true", NULL
764   };
765   static const gchar *false_names[] = {
766     "0", "no", "off", "false", NULL
767   };
768 
769   if (g_strv_contains (true_names, value)) {
770     bool_val = TRUE;
771   } else if (g_strv_contains (false_names, value)) {
772     bool_val = FALSE;
773   } else {
774     return;
775   }
776 
777   gst_structure_set (s, key, G_TYPE_BOOLEAN, bool_val, NULL);
778 }
779 
780 static void
gst_srt_object_set_socket_option(GstStructure * s,const gchar * key,const gchar * value)781 gst_srt_object_set_socket_option (GstStructure * s, const gchar * key,
782     const gchar * value)
783 {
784   SrtOption *option = srt_options;
785 
786   for (; option; ++option) {
787     if (g_str_equal (key, option->name)) {
788       switch (option->gtype) {
789         case G_TYPE_INT:
790           gst_srt_object_set_int_value (s, key, value);
791           break;
792         case G_TYPE_INT64:
793           gst_srt_object_set_int64_value (s, key, value);
794           break;
795         case G_TYPE_STRING:
796           gst_srt_object_set_string_value (s, key, value);
797           break;
798         case G_TYPE_BOOLEAN:
799           gst_srt_object_set_boolean_value (s, key, value);
800           break;
801       }
802 
803       break;
804     }
805   }
806 }
807 
808 static void
gst_srt_object_validate_parameters(GstStructure * s,GstUri * uri)809 gst_srt_object_validate_parameters (GstStructure * s, GstUri * uri)
810 {
811   GstSRTConnectionMode connection_mode = GST_SRT_CONNECTION_MODE_NONE;
812 
813   gst_structure_get_enum (s, "mode", GST_TYPE_SRT_CONNECTION_MODE,
814       (gint *) & connection_mode);
815 
816   if (connection_mode == GST_SRT_CONNECTION_MODE_RENDEZVOUS ||
817       connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
818     guint local_port;
819     const gchar *local_address = gst_structure_get_string (s, "localaddress");
820 
821     if (local_address == NULL) {
822       local_address =
823           gst_uri_get_host (uri) ==
824           NULL ? GST_SRT_DEFAULT_LOCALADDRESS : gst_uri_get_host (uri);
825       gst_srt_object_set_string_value (s, "localaddress", local_address);
826     }
827 
828     if (!gst_structure_get_uint (s, "localport", &local_port)) {
829       local_port =
830           gst_uri_get_port (uri) ==
831           GST_URI_NO_PORT ? GST_SRT_DEFAULT_PORT : gst_uri_get_port (uri);
832       gst_structure_set (s, "localport", G_TYPE_UINT, local_port, NULL);
833     }
834   }
835 }
836 
837 /* called with GST_OBJECT_LOCK (srtobject->element) held */
838 gboolean
gst_srt_object_set_uri(GstSRTObject * srtobject,const gchar * uri,GError ** err)839 gst_srt_object_set_uri (GstSRTObject * srtobject, const gchar * uri,
840     GError ** err)
841 {
842   GHashTable *query_table = NULL;
843   GHashTableIter iter;
844   gpointer key, value;
845   const char *addr_str;
846 
847   if (srtobject->opened) {
848     g_warning
849         ("It's not supported to change the 'uri' property when SRT socket is opened.");
850     g_set_error (err, GST_URI_ERROR, GST_URI_ERROR_BAD_STATE,
851         "It's not supported to change the 'uri' property when SRT socket is opened");
852 
853     return FALSE;
854   }
855 
856   if (!g_str_has_prefix (uri, GST_SRT_DEFAULT_URI_SCHEME)) {
857     g_warning ("Given uri cannot be used for SRT connection.");
858     g_set_error (err, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
859         "Invalid SRT URI scheme");
860     return FALSE;
861   }
862 
863   g_clear_pointer (&srtobject->uri, gst_uri_unref);
864   srtobject->uri = gst_uri_from_string (uri);
865 
866   g_clear_pointer (&srtobject->parameters, gst_structure_free);
867   srtobject->parameters = gst_structure_new ("application/x-srt-params",
868       "poll-timeout", G_TYPE_INT, GST_SRT_DEFAULT_POLL_TIMEOUT,
869       "latency", G_TYPE_INT, GST_SRT_DEFAULT_LATENCY, NULL);
870 
871   query_table = gst_uri_get_query_table (srtobject->uri);
872 
873   GST_DEBUG_OBJECT (srtobject->element,
874       "set uri to (host: %s, port: %d) with %d query strings",
875       gst_uri_get_host (srtobject->uri), gst_uri_get_port (srtobject->uri),
876       query_table == NULL ? 0 : g_hash_table_size (query_table));
877 
878   addr_str = gst_uri_get_host (srtobject->uri);
879   if (addr_str)
880     gst_srt_object_set_enum_value (srtobject->parameters,
881         GST_TYPE_SRT_CONNECTION_MODE, "mode", "caller");
882   else
883     gst_srt_object_set_enum_value (srtobject->parameters,
884         GST_TYPE_SRT_CONNECTION_MODE, "mode", "listener");
885 
886   if (query_table) {
887     g_hash_table_iter_init (&iter, query_table);
888     while (g_hash_table_iter_next (&iter, &key, &value)) {
889       if (!g_strcmp0 ("mode", key)) {
890         gst_srt_object_set_enum_value (srtobject->parameters,
891             GST_TYPE_SRT_CONNECTION_MODE, key, value);
892       } else if (!g_strcmp0 ("localaddress", key)) {
893         gst_srt_object_set_string_value (srtobject->parameters, key, value);
894       } else if (!g_strcmp0 ("localport", key)) {
895         gst_srt_object_set_uint_value (srtobject->parameters, key, value);
896       } else if (!g_strcmp0 ("poll-timeout", key)) {
897         gst_srt_object_set_int_value (srtobject->parameters, key, value);
898       } else {
899         gst_srt_object_set_socket_option (srtobject->parameters, key, value);
900       }
901     }
902 
903     g_hash_table_unref (query_table);
904   }
905 
906   gst_srt_object_validate_parameters (srtobject->parameters, srtobject->uri);
907 
908   return TRUE;
909 }
910 
911 static gpointer
thread_func(gpointer data)912 thread_func (gpointer data)
913 {
914   GstSRTObject *srtobject = data;
915   SRTSOCKET caller_sock;
916   union
917   {
918     struct sockaddr_storage ss;
919     struct sockaddr sa;
920   } caller_sa;
921   int caller_sa_len = sizeof (caller_sa);
922 
923   gint poll_timeout;
924 
925   SRTSOCKET rsock;
926   gint rsocklen = 1;
927 
928   for (;;) {
929     GST_OBJECT_LOCK (srtobject->element);
930     if (!gst_structure_get_int (srtobject->parameters, "poll-timeout",
931             &poll_timeout)) {
932       poll_timeout = GST_SRT_DEFAULT_POLL_TIMEOUT;
933     }
934     GST_OBJECT_UNLOCK (srtobject->element);
935 
936     GST_DEBUG_OBJECT (srtobject->element, "Waiting a request from caller");
937 
938     if (srt_epoll_wait (srtobject->listener_poll_id, &rsock,
939             &rsocklen, 0, 0, poll_timeout, NULL, 0, NULL, 0) < 0) {
940       gint srt_errno = srt_getlasterror (NULL);
941 
942       if (srtobject->listener_poll_id == SRT_ERROR)
943         return NULL;
944       if (srt_errno == SRT_ETIMEOUT) {
945         continue;
946       } else {
947         GST_ELEMENT_ERROR (srtobject->element, RESOURCE, FAILED,
948             ("abort polling: %s", srt_getlasterror_str ()), (NULL));
949         return NULL;
950       }
951     }
952 
953     caller_sock =
954         srt_accept (srtobject->listener_sock, &caller_sa.sa, &caller_sa_len);
955 
956     if (caller_sock != SRT_INVALID_SOCK) {
957       SRTCaller *caller;
958       gint flag = SRT_EPOLL_ERR;
959 
960       caller = srt_caller_new ();
961       caller->sockaddr =
962           g_socket_address_new_from_native (&caller_sa.sa, caller_sa_len);
963       caller->poll_id = srt_epoll_create ();
964       caller->sock = caller_sock;
965 
966       if (gst_uri_handler_get_uri_type (GST_URI_HANDLER
967               (srtobject->element)) == GST_URI_SRC) {
968         flag |= SRT_EPOLL_IN;
969       } else {
970         flag |= SRT_EPOLL_OUT;
971       }
972 
973       if (srt_epoll_add_usock (caller->poll_id, caller_sock, &flag)) {
974 
975         GST_ELEMENT_ERROR (srtobject->element, RESOURCE, SETTINGS,
976             ("%s", srt_getlasterror_str ()), (NULL));
977 
978         srt_caller_free (caller);
979 
980         /* try-again */
981         continue;
982       }
983 
984       GST_DEBUG_OBJECT (srtobject->element, "Accept to connect %d",
985           caller->sock);
986 
987       g_mutex_lock (&srtobject->sock_lock);
988       srtobject->callers = g_list_append (srtobject->callers, caller);
989       g_cond_signal (&srtobject->sock_cond);
990       g_mutex_unlock (&srtobject->sock_lock);
991 
992       /* notifying caller-added */
993       g_signal_emit_by_name (srtobject->element, "caller-added", 0,
994           caller->sockaddr);
995 
996       if (gst_uri_handler_get_uri_type (GST_URI_HANDLER (srtobject->element)) ==
997           GST_URI_SRC)
998         return NULL;
999     }
1000   }
1001 }
1002 
1003 static GSocketAddress *
peeraddr_to_g_socket_address(const struct sockaddr * peeraddr)1004 peeraddr_to_g_socket_address (const struct sockaddr *peeraddr)
1005 {
1006   gsize peeraddr_len;
1007 
1008   switch (peeraddr->sa_family) {
1009     case AF_INET:
1010       peeraddr_len = sizeof (struct sockaddr_in);
1011       break;
1012     case AF_INET6:
1013       peeraddr_len = sizeof (struct sockaddr_in6);
1014       break;
1015     default:
1016       g_warning ("Unsupported address family %d", peeraddr->sa_family);
1017       return NULL;
1018   }
1019   return g_socket_address_new_from_native ((gpointer) peeraddr, peeraddr_len);
1020 }
1021 
1022 static gint
srt_listen_callback_func(GstSRTObject * self,SRTSOCKET sock,int hs_version,const struct sockaddr * peeraddr,const char * stream_id)1023 srt_listen_callback_func (GstSRTObject * self, SRTSOCKET sock, int hs_version,
1024     const struct sockaddr *peeraddr, const char *stream_id)
1025 {
1026   GSocketAddress *addr = peeraddr_to_g_socket_address (peeraddr);
1027 
1028   if (!addr) {
1029     GST_WARNING_OBJECT (self->element,
1030         "Invalid peer address. Rejecting sink %d streamid: %s", sock,
1031         stream_id);
1032     return -1;
1033   }
1034 
1035   if (self->authentication) {
1036     gboolean authenticated = FALSE;
1037 
1038     /* notifying caller-connecting */
1039     g_signal_emit_by_name (self->element, "caller-connecting", addr,
1040         stream_id, &authenticated);
1041 
1042     if (!authenticated)
1043       goto reject;
1044   }
1045 
1046   GST_DEBUG_OBJECT (self->element,
1047       "Accepting sink %d streamid: %s", sock, stream_id);
1048   g_object_unref (addr);
1049   return 0;
1050 reject:
1051   /* notifying caller-rejected */
1052   GST_WARNING_OBJECT (self->element,
1053       "Rejecting sink %d streamid: %s", sock, stream_id);
1054   g_signal_emit_by_name (self->element, "caller-rejected", addr, stream_id);
1055   g_object_unref (addr);
1056   return -1;
1057 }
1058 
1059 static gboolean
gst_srt_object_wait_connect(GstSRTObject * srtobject,GCancellable * cancellable,gpointer sa,size_t sa_len,GError ** error)1060 gst_srt_object_wait_connect (GstSRTObject * srtobject,
1061     GCancellable * cancellable, gpointer sa, size_t sa_len, GError ** error)
1062 {
1063   SRTSOCKET sock = SRT_INVALID_SOCK;
1064   const gchar *local_address = NULL;
1065   guint local_port = 0;
1066   gint sock_flags = SRT_EPOLL_ERR | SRT_EPOLL_IN;
1067 
1068   gpointer bind_sa;
1069   gsize bind_sa_len;
1070   GSocketAddress *bind_addr = NULL;
1071 
1072   GST_OBJECT_LOCK (srtobject->element);
1073 
1074   gst_structure_get_uint (srtobject->parameters, "localport", &local_port);
1075 
1076   local_address =
1077       gst_structure_get_string (srtobject->parameters, "localaddress");
1078   if (local_address == NULL)
1079     local_address = GST_SRT_DEFAULT_LOCALADDRESS;
1080 
1081   GST_OBJECT_UNLOCK (srtobject->element);
1082 
1083   bind_addr =
1084       gst_srt_object_resolve (srtobject, local_address, local_port, cancellable,
1085       error);
1086   if (!bind_addr) {
1087     goto failed;
1088   }
1089 
1090   bind_sa_len = g_socket_address_get_native_size (bind_addr);
1091   bind_sa = g_alloca (bind_sa_len);
1092 
1093   if (!g_socket_address_to_native (bind_addr, bind_sa, bind_sa_len, error)) {
1094     goto failed;
1095   }
1096 
1097   g_clear_object (&bind_addr);
1098 
1099   sock = srt_create_socket ();
1100   if (sock == SRT_INVALID_SOCK) {
1101     g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_INIT, "%s",
1102         srt_getlasterror_str ());
1103     goto failed;
1104   }
1105 
1106   if (!gst_srt_object_set_common_params (sock, srtobject, error)) {
1107     goto failed;
1108   }
1109 
1110   GST_DEBUG_OBJECT (srtobject->element, "Binding to %s (port: %d)",
1111       local_address, local_port);
1112 
1113   if (srt_bind (sock, bind_sa, bind_sa_len) == SRT_ERROR) {
1114     g_set_error (error, GST_RESOURCE_ERROR,
1115         GST_RESOURCE_ERROR_OPEN_READ_WRITE, "Cannot bind to %s:%d - %s",
1116         local_address, local_port, srt_getlasterror_str ());
1117     goto failed;
1118   }
1119 
1120   if (srt_epoll_add_usock (srtobject->listener_poll_id, sock, &sock_flags)) {
1121     g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS, "%s",
1122         srt_getlasterror_str ());
1123     goto failed;
1124   }
1125 
1126   GST_DEBUG_OBJECT (srtobject->element, "Starting to listen on bind socket");
1127   if (srt_listen (sock, 1) == SRT_ERROR) {
1128     g_set_error (error, GST_RESOURCE_ERROR,
1129         GST_RESOURCE_ERROR_OPEN_READ_WRITE, "Cannot listen on bind socket: %s",
1130         srt_getlasterror_str ());
1131 
1132     goto failed;
1133   }
1134 
1135   srtobject->listener_sock = sock;
1136 
1137   /* Register the SRT listen callback */
1138   if (srt_listen_callback (srtobject->listener_sock,
1139           (srt_listen_callback_fn *) srt_listen_callback_func, srtobject)) {
1140     goto failed;
1141   }
1142 
1143   srtobject->thread =
1144       g_thread_try_new ("GstSRTObjectListener", thread_func, srtobject, error);
1145   if (srtobject->thread == NULL) {
1146     GST_ERROR_OBJECT (srtobject->element, "Failed to start thread");
1147     goto failed;
1148   }
1149 
1150   return TRUE;
1151 
1152 failed:
1153 
1154   if (srtobject->listener_poll_id != SRT_ERROR) {
1155     srt_epoll_release (srtobject->listener_poll_id);
1156   }
1157 
1158   if (sock != SRT_INVALID_SOCK) {
1159     srt_close (sock);
1160   }
1161 
1162   g_clear_object (&bind_addr);
1163 
1164   srtobject->listener_poll_id = SRT_ERROR;
1165   srtobject->listener_sock = SRT_INVALID_SOCK;
1166 
1167   return FALSE;
1168 }
1169 
1170 static gboolean
gst_srt_object_connect(GstSRTObject * srtobject,GCancellable * cancellable,GstSRTConnectionMode connection_mode,gpointer sa,size_t sa_len,GError ** error)1171 gst_srt_object_connect (GstSRTObject * srtobject, GCancellable * cancellable,
1172     GstSRTConnectionMode connection_mode, gpointer sa, size_t sa_len,
1173     GError ** error)
1174 {
1175   SRTSOCKET sock;
1176   gint sock_flags = SRT_EPOLL_ERR;
1177   guint local_port = 0;
1178   const gchar *local_address = NULL;
1179   bool sender;
1180   bool rendezvous;
1181 
1182   sock = srt_create_socket ();
1183   if (sock == SRT_INVALID_SOCK) {
1184     g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_INIT, "%s",
1185         srt_getlasterror_str ());
1186     goto failed;
1187   }
1188 
1189   if (!gst_srt_object_set_common_params (sock, srtobject, error)) {
1190     goto failed;
1191   }
1192 
1193   switch (gst_uri_handler_get_uri_type (GST_URI_HANDLER (srtobject->element))) {
1194     case GST_URI_SRC:
1195       sender = false;
1196       sock_flags |= SRT_EPOLL_IN;
1197       break;
1198     case GST_URI_SINK:
1199       sender = true;
1200       sock_flags |= SRT_EPOLL_OUT;
1201       break;
1202     default:
1203       g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS,
1204           "Cannot determine stream direction");
1205       goto failed;
1206   }
1207 
1208   if (srt_setsockopt (sock, 0, SRTO_SENDER, &sender, sizeof sender)) {
1209     g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS, "%s",
1210         srt_getlasterror_str ());
1211     goto failed;
1212   }
1213 
1214   rendezvous = (connection_mode == GST_SRT_CONNECTION_MODE_RENDEZVOUS);
1215   if (srt_setsockopt (sock, 0, SRTO_RENDEZVOUS, &rendezvous, sizeof rendezvous)) {
1216     g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS, "%s",
1217         srt_getlasterror_str ());
1218     goto failed;
1219   }
1220 
1221   GST_OBJECT_LOCK (srtobject->element);
1222   gst_structure_get_uint (srtobject->parameters, "localport", &local_port);
1223   local_address =
1224       gst_structure_get_string (srtobject->parameters, "localaddress");
1225   GST_OBJECT_UNLOCK (srtobject->element);
1226 
1227   /* According to SRT norm, bind local address and port if specified */
1228   if (local_address != NULL && local_port != 0) {
1229     gpointer bind_sa;
1230     gsize bind_sa_len;
1231 
1232     GSocketAddress *bind_addr =
1233         gst_srt_object_resolve (srtobject, local_address,
1234         local_port, cancellable, error);
1235 
1236     if (!bind_addr) {
1237       goto failed;
1238     }
1239 
1240     bind_sa_len = g_socket_address_get_native_size (bind_addr);
1241     bind_sa = g_alloca (bind_sa_len);
1242 
1243     if (!g_socket_address_to_native (bind_addr, bind_sa, bind_sa_len, error)) {
1244       g_clear_object (&bind_addr);
1245       goto failed;
1246     }
1247 
1248     g_clear_object (&bind_addr);
1249 
1250     GST_DEBUG_OBJECT (srtobject->element, "Binding to %s (port: %d)",
1251         local_address, local_port);
1252 
1253     if (srt_bind (sock, bind_sa, bind_sa_len) == SRT_ERROR) {
1254       g_set_error (error, GST_RESOURCE_ERROR,
1255           GST_RESOURCE_ERROR_OPEN_READ_WRITE, "Cannot bind to %s:%d - %s",
1256           local_address, local_port, srt_getlasterror_str ());
1257       goto failed;
1258     }
1259   }
1260 
1261   if (srt_epoll_add_usock (srtobject->poll_id, sock, &sock_flags)) {
1262     g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS, "%s",
1263         srt_getlasterror_str ());
1264     goto failed;
1265   }
1266 
1267   if (srt_connect (sock, sa, sa_len) == SRT_ERROR) {
1268     g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_OPEN_READ, "%s",
1269         srt_getlasterror_str ());
1270     goto failed;
1271   }
1272 
1273   srtobject->sock = sock;
1274 
1275   return TRUE;
1276 
1277 failed:
1278 
1279   if (srtobject->poll_id != SRT_ERROR) {
1280     srt_epoll_release (srtobject->poll_id);
1281   }
1282 
1283   if (sock != SRT_INVALID_SOCK) {
1284     srt_close (sock);
1285   }
1286 
1287   srtobject->poll_id = SRT_ERROR;
1288   srtobject->sock = SRT_INVALID_SOCK;
1289 
1290   return FALSE;
1291 }
1292 
1293 static gboolean
gst_srt_object_open_connection(GstSRTObject * srtobject,GCancellable * cancellable,GstSRTConnectionMode connection_mode,gpointer sa,size_t sa_len,GError ** error)1294 gst_srt_object_open_connection (GstSRTObject * srtobject,
1295     GCancellable * cancellable, GstSRTConnectionMode connection_mode,
1296     gpointer sa, size_t sa_len, GError ** error)
1297 {
1298   gboolean ret = FALSE;
1299 
1300   if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
1301     ret =
1302         gst_srt_object_wait_connect (srtobject, cancellable, sa, sa_len, error);
1303   } else {
1304     ret =
1305         gst_srt_object_connect (srtobject, cancellable, connection_mode, sa,
1306         sa_len, error);
1307   }
1308 
1309   return ret;
1310 }
1311 
1312 static gboolean
gst_srt_object_open_internal(GstSRTObject * srtobject,GCancellable * cancellable,GError ** error)1313 gst_srt_object_open_internal (GstSRTObject * srtobject,
1314     GCancellable * cancellable, GError ** error)
1315 {
1316   GSocketAddress *socket_address = NULL;
1317   GstSRTConnectionMode connection_mode;
1318 
1319   gpointer sa;
1320   size_t sa_len;
1321   const gchar *addr_str;
1322   guint port;
1323   gboolean ret = FALSE;
1324 
1325   GST_OBJECT_LOCK (srtobject->element);
1326 
1327   srtobject->opened = FALSE;
1328 
1329   if (!gst_structure_get_enum (srtobject->parameters,
1330           "mode", GST_TYPE_SRT_CONNECTION_MODE, (gint *) & connection_mode)) {
1331     GST_WARNING_OBJECT (srtobject->element,
1332         "Cannot get connection mode information." " Use default mode");
1333     connection_mode = GST_SRT_DEFAULT_MODE;
1334   }
1335 
1336   addr_str = gst_uri_get_host (srtobject->uri);
1337   if (addr_str == NULL) {
1338     connection_mode = GST_SRT_CONNECTION_MODE_LISTENER;
1339     addr_str = GST_SRT_DEFAULT_LOCALADDRESS;
1340     GST_DEBUG_OBJECT (srtobject->element,
1341         "Given uri doesn't have hostname or address. Use any (%s) and"
1342         " setting listener mode", addr_str);
1343   }
1344 
1345   port = gst_uri_get_port (srtobject->uri);
1346 
1347   GST_DEBUG_OBJECT (srtobject->element,
1348       "Opening SRT socket with parameters: %" GST_PTR_FORMAT,
1349       srtobject->parameters);
1350 
1351   GST_OBJECT_UNLOCK (srtobject->element);
1352 
1353   socket_address =
1354       gst_srt_object_resolve (srtobject, addr_str, port, cancellable, error);
1355   if (socket_address == NULL) {
1356     goto out;
1357   }
1358 
1359   sa_len = g_socket_address_get_native_size (socket_address);
1360   sa = g_alloca (sa_len);
1361 
1362   if (!g_socket_address_to_native (socket_address, sa, sa_len, error)) {
1363     goto out;
1364   }
1365 
1366   srtobject->listener_poll_id = srt_epoll_create ();
1367 
1368   ret =
1369       gst_srt_object_open_connection
1370       (srtobject, cancellable, connection_mode, sa, sa_len, error);
1371 
1372   GST_OBJECT_LOCK (srtobject->element);
1373   srtobject->opened = ret;
1374   GST_OBJECT_UNLOCK (srtobject->element);
1375 
1376 out:
1377   g_clear_object (&socket_address);
1378 
1379   return ret;
1380 }
1381 
1382 gboolean
gst_srt_object_open(GstSRTObject * srtobject,GCancellable * cancellable,GError ** error)1383 gst_srt_object_open (GstSRTObject * srtobject, GCancellable * cancellable,
1384     GError ** error)
1385 {
1386   srtobject->previous_bytes = 0;
1387 
1388   return gst_srt_object_open_internal (srtobject, cancellable, error);
1389 }
1390 
1391 void
gst_srt_object_close(GstSRTObject * srtobject)1392 gst_srt_object_close (GstSRTObject * srtobject)
1393 {
1394   g_mutex_lock (&srtobject->sock_lock);
1395 
1396   if (srtobject->sock != SRT_INVALID_SOCK) {
1397     GstStructure *stats;
1398 
1399     if (srtobject->poll_id != SRT_ERROR) {
1400       srt_epoll_remove_usock (srtobject->poll_id, srtobject->sock);
1401     }
1402 
1403     stats = gst_srt_object_accumulate_stats (srtobject, srtobject->sock);
1404 
1405     /* FIXME: These are the final statistics for the socket before we close it.
1406      * Deliver the stats to the app before we throw them away. */
1407     gst_structure_free (stats);
1408 
1409     GST_DEBUG_OBJECT (srtobject->element, "Closing SRT socket (0x%x)",
1410         srtobject->sock);
1411 
1412     srt_close (srtobject->sock);
1413     srtobject->sock = SRT_INVALID_SOCK;
1414   }
1415 
1416   if (srtobject->listener_poll_id != SRT_ERROR) {
1417     if (srtobject->listener_sock != SRT_INVALID_SOCK) {
1418       srt_epoll_remove_usock (srtobject->listener_poll_id,
1419           srtobject->listener_sock);
1420     }
1421     srt_epoll_release (srtobject->listener_poll_id);
1422     srtobject->listener_poll_id = SRT_ERROR;
1423   }
1424 
1425   if (srtobject->thread) {
1426     GThread *thread = g_steal_pointer (&srtobject->thread);
1427     g_mutex_unlock (&srtobject->sock_lock);
1428     g_thread_join (thread);
1429     g_mutex_lock (&srtobject->sock_lock);
1430   }
1431 
1432   if (srtobject->listener_sock != SRT_INVALID_SOCK) {
1433     GST_DEBUG_OBJECT (srtobject->element, "Closing SRT listener socket (0x%x)",
1434         srtobject->listener_sock);
1435 
1436     srt_close (srtobject->listener_sock);
1437     srtobject->listener_sock = SRT_INVALID_SOCK;
1438   }
1439 
1440   if (srtobject->callers) {
1441     GList *callers = g_steal_pointer (&srtobject->callers);
1442     g_list_foreach (callers, (GFunc) srt_caller_signal_removed, srtobject);
1443     g_list_free_full (callers, (GDestroyNotify) srt_caller_free);
1444   }
1445 
1446   g_mutex_unlock (&srtobject->sock_lock);
1447 
1448   GST_OBJECT_LOCK (srtobject->element);
1449   srtobject->opened = FALSE;
1450   GST_OBJECT_UNLOCK (srtobject->element);
1451 }
1452 
1453 static gboolean
gst_srt_object_wait_caller(GstSRTObject * srtobject,GCancellable * cancellable,GError ** error)1454 gst_srt_object_wait_caller (GstSRTObject * srtobject,
1455     GCancellable * cancellable, GError ** error)
1456 {
1457   gboolean ret;
1458 
1459   g_mutex_lock (&srtobject->sock_lock);
1460 
1461   if (srtobject->callers == NULL) {
1462     GST_INFO_OBJECT (srtobject->element, "Waiting for connection");
1463 
1464     while (!g_cancellable_is_cancelled (cancellable)) {
1465       ret = (srtobject->callers != NULL);
1466       if (ret) {
1467         GST_DEBUG_OBJECT (srtobject->element, "Got a connection");
1468         break;
1469       }
1470 
1471       g_cond_wait (&srtobject->sock_cond, &srtobject->sock_lock);
1472     }
1473   } else {
1474     ret = TRUE;
1475   }
1476 
1477   g_mutex_unlock (&srtobject->sock_lock);
1478 
1479   if (!ret) {
1480     g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_FAILED,
1481         "Canceled waiting for a connection.");
1482   }
1483 
1484   return ret;
1485 }
1486 
1487 gssize
gst_srt_object_read(GstSRTObject * srtobject,guint8 * data,gsize size,GCancellable * cancellable,GError ** error,SRT_MSGCTRL * mctrl)1488 gst_srt_object_read (GstSRTObject * srtobject,
1489     guint8 * data, gsize size, GCancellable * cancellable, GError ** error,
1490     SRT_MSGCTRL * mctrl)
1491 {
1492   gssize len = 0;
1493   gint poll_timeout;
1494   GstSRTConnectionMode connection_mode = GST_SRT_CONNECTION_MODE_NONE;
1495   gint poll_id = SRT_ERROR;
1496 
1497   /* Only source element can read data */
1498   g_return_val_if_fail (gst_uri_handler_get_uri_type (GST_URI_HANDLER
1499           (srtobject->element)) == GST_URI_SRC, -1);
1500 
1501   GST_OBJECT_LOCK (srtobject->element);
1502 
1503   gst_structure_get_enum (srtobject->parameters, "mode",
1504       GST_TYPE_SRT_CONNECTION_MODE, (gint *) & connection_mode);
1505 
1506   if (!gst_structure_get_int (srtobject->parameters, "poll-timeout",
1507           &poll_timeout)) {
1508     poll_timeout = GST_SRT_DEFAULT_POLL_TIMEOUT;
1509   }
1510 
1511   GST_OBJECT_UNLOCK (srtobject->element);
1512 
1513   if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
1514     if (!gst_srt_object_wait_caller (srtobject, cancellable, error))
1515       return -1;
1516 
1517     g_mutex_lock (&srtobject->sock_lock);
1518     if (srtobject->callers) {
1519       SRTCaller *caller = srtobject->callers->data;
1520       poll_id = caller->poll_id;
1521     }
1522     g_mutex_unlock (&srtobject->sock_lock);
1523 
1524     if (poll_id == SRT_ERROR)
1525       return 0;
1526   } else {
1527     poll_id = srtobject->poll_id;
1528   }
1529 
1530   while (!g_cancellable_is_cancelled (cancellable)) {
1531 
1532     SRTSOCKET rsock;
1533     gint rsocklen = 1;
1534     SRTSOCKET wsock;
1535     gint wsocklen = 1;
1536 
1537     if (srt_epoll_wait (poll_id, &rsock, &rsocklen, &wsock, &wsocklen,
1538             poll_timeout, NULL, 0, NULL, 0) < 0) {
1539       gint srt_errno = srt_getlasterror (NULL);
1540 
1541       if (srt_errno != SRT_ETIMEOUT) {
1542         return 0;
1543       }
1544       continue;
1545     }
1546 
1547     if (wsocklen == 1 && rsocklen == 1) {
1548       /* Socket reported in wsock AND rsock signifies an error. */
1549       gint reason = srt_getrejectreason (wsock);
1550       gboolean is_auth_error = (reason == SRT_REJ_BADSECRET
1551           || reason == SRT_REJ_UNSECURE);
1552 
1553       if (is_auth_error) {
1554         ELEMENT_WARNING_SRTSOCK_ERROR (NOT_AUTHORIZED, reason);
1555       }
1556 
1557       if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
1558         /* Caller has disappeared. */
1559         return 0;
1560       } else {
1561         if (!is_auth_error) {
1562           ELEMENT_WARNING_SRTSOCK_ERROR (READ, reason);
1563         }
1564 
1565         gst_srt_object_close (srtobject);
1566         if (!gst_srt_object_open_internal (srtobject, cancellable, error)) {
1567           return -1;
1568         }
1569       }
1570       continue;
1571     }
1572 
1573 
1574     srt_msgctrl_init (mctrl);
1575     len = srt_recvmsg2 (rsock, (char *) (data), size, mctrl);
1576 
1577     if (len == SRT_ERROR) {
1578       gint srt_errno = srt_getlasterror (NULL);
1579       if (srt_errno == SRT_EASYNCRCV) {
1580         continue;
1581       } else {
1582         g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_READ,
1583             "Failed to receive from SRT socket: %s", srt_getlasterror_str ());
1584         return -1;
1585       }
1586     }
1587     break;
1588   }
1589 
1590   return len;
1591 }
1592 
1593 void
gst_srt_object_wakeup(GstSRTObject * srtobject,GCancellable * cancellable)1594 gst_srt_object_wakeup (GstSRTObject * srtobject, GCancellable * cancellable)
1595 {
1596   GST_DEBUG_OBJECT (srtobject->element, "waking up SRT");
1597 
1598   /* Removing all socket descriptors from the monitoring list
1599    * wakes up SRT's threads. We only have one to remove. */
1600   if (srtobject->sock != SRT_INVALID_SOCK && srtobject->poll_id != SRT_ERROR) {
1601     srt_epoll_remove_usock (srtobject->poll_id, srtobject->sock);
1602   }
1603 
1604   /* connection is only waited for in listener mode,
1605    * but there is no harm in raising signal in any case */
1606   g_mutex_lock (&srtobject->sock_lock);
1607   /* however, a race might be harmful ...
1608    * the cancellation is used as 'flushing' flag here,
1609    * so make sure it is so detected by the intended part at proper time */
1610   g_cancellable_cancel (cancellable);
1611   g_cond_signal (&srtobject->sock_cond);
1612   g_mutex_unlock (&srtobject->sock_lock);
1613 }
1614 
1615 static gboolean
gst_srt_object_send_headers(GstSRTObject * srtobject,SRTSOCKET sock,gint poll_id,gint poll_timeout,GstBufferList * headers,GCancellable * cancellable)1616 gst_srt_object_send_headers (GstSRTObject * srtobject, SRTSOCKET sock,
1617     gint poll_id, gint poll_timeout, GstBufferList * headers,
1618     GCancellable * cancellable)
1619 {
1620   guint size, i;
1621 
1622   if (!headers)
1623     return TRUE;
1624 
1625   size = gst_buffer_list_length (headers);
1626 
1627   GST_DEBUG_OBJECT (srtobject->element, "Sending %u stream headers", size);
1628 
1629   for (i = 0; i < size; i++) {
1630     SRTSOCKET wsock = sock;
1631     gint wsocklen = 1;
1632 
1633     GstBuffer *buffer = gst_buffer_list_get (headers, i);
1634     GstMapInfo mapinfo;
1635 
1636     if (g_cancellable_is_cancelled (cancellable)) {
1637       return FALSE;
1638     }
1639 
1640     if (poll_id > 0 && srt_epoll_wait (poll_id, 0, 0, &wsock,
1641             &wsocklen, poll_timeout, NULL, 0, NULL, 0) < 0) {
1642       continue;
1643     }
1644 
1645     GST_TRACE_OBJECT (srtobject->element, "sending header %u %" GST_PTR_FORMAT,
1646         i, buffer);
1647 
1648     if (!gst_buffer_map (buffer, &mapinfo, GST_MAP_READ)) {
1649       GST_ELEMENT_ERROR (srtobject->element, RESOURCE, READ,
1650           ("Could not map the input stream"), (NULL));
1651       return FALSE;
1652     }
1653 
1654     if (srt_sendmsg2 (wsock, (char *) mapinfo.data, mapinfo.size,
1655             0) == SRT_ERROR) {
1656       GST_ELEMENT_ERROR (srtobject->element, RESOURCE, WRITE, NULL,
1657           ("%s", srt_getlasterror_str ()));
1658       gst_buffer_unmap (buffer, &mapinfo);
1659       return FALSE;
1660     }
1661 
1662     gst_buffer_unmap (buffer, &mapinfo);
1663   }
1664 
1665   return TRUE;
1666 }
1667 
1668 static gssize
gst_srt_object_write_to_callers(GstSRTObject * srtobject,GstBufferList * headers,const GstMapInfo * mapinfo,GCancellable * cancellable,GError ** error)1669 gst_srt_object_write_to_callers (GstSRTObject * srtobject,
1670     GstBufferList * headers,
1671     const GstMapInfo * mapinfo, GCancellable * cancellable, GError ** error)
1672 {
1673   GList *callers;
1674 
1675   g_mutex_lock (&srtobject->sock_lock);
1676   callers = srtobject->callers;
1677   while (callers != NULL) {
1678     gssize len = 0;
1679     const guint8 *msg = mapinfo->data;
1680     gint sent;
1681     gint payload_size, optlen = sizeof (payload_size);
1682 
1683     SRTCaller *caller = callers->data;
1684     callers = callers->next;
1685 
1686     if (g_cancellable_is_cancelled (cancellable)) {
1687       goto cancelled;
1688     }
1689 
1690     if (!caller->sent_headers) {
1691       if (!gst_srt_object_send_headers (srtobject, caller->sock, -1,
1692               -1, headers, cancellable)) {
1693         goto err;
1694       }
1695       caller->sent_headers = TRUE;
1696     }
1697 
1698     if (srt_getsockflag (caller->sock, SRTO_PAYLOADSIZE, &payload_size,
1699             &optlen)) {
1700       GST_WARNING_OBJECT (srtobject->element, "%s", srt_getlasterror_str ());
1701       goto err;
1702     }
1703 
1704     while (len < mapinfo->size) {
1705       gint rest = MIN (mapinfo->size - len, payload_size);
1706       sent = srt_sendmsg2 (caller->sock, (char *) (msg + len), rest, 0);
1707       if (sent < 0) {
1708         GST_WARNING_OBJECT (srtobject->element, "Dropping caller %d: %s",
1709             caller->sock, srt_getlasterror_str ());
1710         goto err;
1711       }
1712       len += sent;
1713     }
1714 
1715     continue;
1716 
1717   err:
1718     srtobject->callers = g_list_remove (srtobject->callers, caller);
1719     srt_caller_signal_removed (caller, srtobject);
1720     srt_caller_free (caller);
1721   }
1722 
1723   g_mutex_unlock (&srtobject->sock_lock);
1724   return mapinfo->size;
1725 
1726 cancelled:
1727   g_mutex_unlock (&srtobject->sock_lock);
1728   return -1;
1729 }
1730 
1731 static gssize
gst_srt_object_write_one(GstSRTObject * srtobject,GstBufferList * headers,const GstMapInfo * mapinfo,GCancellable * cancellable,GError ** error)1732 gst_srt_object_write_one (GstSRTObject * srtobject,
1733     GstBufferList * headers,
1734     const GstMapInfo * mapinfo, GCancellable * cancellable, GError ** error)
1735 {
1736   gssize len = 0;
1737   gint poll_timeout;
1738   const guint8 *msg = mapinfo->data;
1739   gint payload_size, optlen = sizeof (payload_size);
1740   gboolean wait_for_connection;
1741 
1742   GST_OBJECT_LOCK (srtobject->element);
1743   wait_for_connection = srtobject->wait_for_connection;
1744   if (!gst_structure_get_int (srtobject->parameters, "poll-timeout",
1745           &poll_timeout)) {
1746     poll_timeout = GST_SRT_DEFAULT_POLL_TIMEOUT;
1747   }
1748   GST_OBJECT_UNLOCK (srtobject->element);
1749 
1750   if (!srtobject->sent_headers) {
1751     if (!gst_srt_object_send_headers (srtobject, srtobject->sock,
1752             srtobject->poll_id, poll_timeout, headers, cancellable)) {
1753       return -1;
1754     }
1755     srtobject->sent_headers = TRUE;
1756   }
1757 
1758   while (len < mapinfo->size) {
1759     SRTSOCKET rsock;
1760     gint rsocklen = 1;
1761     SRTSOCKET wsock;
1762     gint wsocklen = 1;
1763 
1764     gint sent;
1765     gint rest;
1766 
1767     if (g_cancellable_is_cancelled (cancellable)) {
1768       break;
1769     }
1770 
1771     if (!wait_for_connection &&
1772         srt_getsockstate (srtobject->sock) == SRTS_CONNECTING) {
1773       GST_LOG_OBJECT (srtobject->element,
1774           "Not connected yet. Dropping the buffer.");
1775       break;
1776     }
1777 
1778     if (srt_epoll_wait (srtobject->poll_id, &rsock, &rsocklen, &wsock,
1779             &wsocklen, poll_timeout, NULL, 0, NULL, 0) < 0) {
1780       continue;
1781     }
1782 
1783     if (wsocklen == 1 && rsocklen == 1) {
1784       /* Socket reported in wsock AND rsock signifies an error. */
1785       gint reason = srt_getrejectreason (wsock);
1786 
1787       if (reason == SRT_REJ_BADSECRET || reason == SRT_REJ_UNSECURE) {
1788         ELEMENT_WARNING_SRTSOCK_ERROR (NOT_AUTHORIZED, reason);
1789       } else {
1790         ELEMENT_WARNING_SRTSOCK_ERROR (WRITE, reason);
1791       }
1792 
1793       gst_srt_object_close (srtobject);
1794       if (!gst_srt_object_open_internal (srtobject, cancellable, error)) {
1795         return -1;
1796       }
1797       continue;
1798     }
1799 
1800     if (srt_getsockflag (wsock, SRTO_PAYLOADSIZE, &payload_size, &optlen)) {
1801       GST_ELEMENT_ERROR (srtobject->element, RESOURCE, WRITE, NULL,
1802           ("%s", srt_getlasterror_str ()));
1803       break;
1804     }
1805 
1806     rest = MIN (mapinfo->size - len, payload_size);
1807 
1808     sent = srt_sendmsg2 (wsock, (char *) (msg + len), rest, 0);
1809     if (sent < 0) {
1810       GST_ELEMENT_ERROR (srtobject->element, RESOURCE, WRITE, NULL,
1811           ("%s", srt_getlasterror_str ()));
1812       break;
1813     }
1814     len += sent;
1815   }
1816 
1817   return len;
1818 }
1819 
1820 gssize
gst_srt_object_write(GstSRTObject * srtobject,GstBufferList * headers,const GstMapInfo * mapinfo,GCancellable * cancellable,GError ** error)1821 gst_srt_object_write (GstSRTObject * srtobject,
1822     GstBufferList * headers,
1823     const GstMapInfo * mapinfo, GCancellable * cancellable, GError ** error)
1824 {
1825   gssize len = 0;
1826   GstSRTConnectionMode connection_mode = GST_SRT_CONNECTION_MODE_NONE;
1827   gboolean wait_for_connection;
1828 
1829   /* Only sink element can write data */
1830   g_return_val_if_fail (gst_uri_handler_get_uri_type (GST_URI_HANDLER
1831           (srtobject->element)) == GST_URI_SINK, -1);
1832 
1833   GST_OBJECT_LOCK (srtobject->element);
1834   gst_structure_get_enum (srtobject->parameters, "mode",
1835       GST_TYPE_SRT_CONNECTION_MODE, (gint *) & connection_mode);
1836   wait_for_connection = srtobject->wait_for_connection;
1837   GST_OBJECT_UNLOCK (srtobject->element);
1838 
1839   if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
1840     if (wait_for_connection) {
1841       if (!gst_srt_object_wait_caller (srtobject, cancellable, error))
1842         return -1;
1843     }
1844     len =
1845         gst_srt_object_write_to_callers (srtobject, headers, mapinfo,
1846         cancellable, error);
1847   } else {
1848     len =
1849         gst_srt_object_write_one (srtobject, headers, mapinfo, cancellable,
1850         error);
1851   }
1852 
1853   return len;
1854 }
1855 
1856 static GstStructure *
get_stats_for_srtsock(SRTSOCKET srtsock,gboolean is_sender,guint64 * bytes)1857 get_stats_for_srtsock (SRTSOCKET srtsock, gboolean is_sender, guint64 * bytes)
1858 {
1859   GstStructure *s = gst_structure_new_empty ("application/x-srt-statistics");
1860   int ret;
1861   SRT_TRACEBSTATS stats;
1862 
1863   ret = srt_bstats (srtsock, &stats, 0);
1864 
1865   if (ret >= 0) {
1866     if (is_sender) {
1867       gst_structure_set (s,
1868           /* number of sent data packets, including retransmissions */
1869           "packets-sent", G_TYPE_INT64, stats.pktSent,
1870           /* number of lost packets (sender side) */
1871           "packets-sent-lost", G_TYPE_INT, stats.pktSndLoss,
1872           /* number of retransmitted packets */
1873           "packets-retransmitted", G_TYPE_INT, stats.pktRetrans,
1874           /* number of received ACK packets */
1875           "packet-ack-received", G_TYPE_INT, stats.pktRecvACK,
1876           /* number of received NAK packets */
1877           "packet-nack-received", G_TYPE_INT, stats.pktRecvNAK,
1878           /* time duration when UDT is sending data (idle time exclusive) */
1879           "send-duration-us", G_TYPE_INT64, stats.usSndDuration,
1880           /* number of sent data bytes, including retransmissions */
1881           "bytes-sent", G_TYPE_UINT64, stats.byteSent,
1882           /* number of retransmitted bytes */
1883           "bytes-retransmitted", G_TYPE_UINT64, stats.byteRetrans,
1884           /* number of too-late-to-send dropped bytes */
1885           "bytes-sent-dropped", G_TYPE_UINT64, stats.byteSndDrop,
1886           /* number of too-late-to-send dropped packets */
1887           "packets-sent-dropped", G_TYPE_INT, stats.pktSndDrop,
1888           /* sending rate in Mb/s */
1889           "send-rate-mbps", G_TYPE_DOUBLE, stats.mbpsSendRate,
1890           /* busy sending time (i.e., idle time exclusive) */
1891           "send-duration-us", G_TYPE_UINT64, stats.usSndDuration,
1892           "negotiated-latency-ms", G_TYPE_INT, stats.msSndTsbPdDelay, NULL);
1893       *bytes += stats.byteSent;
1894     } else {
1895       gst_structure_set (s,
1896           "packets-received", G_TYPE_INT64, stats.pktRecvTotal,
1897           "packets-received-lost", G_TYPE_INT, stats.pktRcvLossTotal,
1898           /* number of sent ACK packets */
1899           "packet-ack-sent", G_TYPE_INT, stats.pktSentACK,
1900           /* number of sent NAK packets */
1901           "packet-nack-sent", G_TYPE_INT, stats.pktSentNAK,
1902           "bytes-received", G_TYPE_UINT64, stats.byteRecvTotal,
1903           "bytes-received-lost", G_TYPE_UINT64, stats.byteRcvLossTotal,
1904           "receive-rate-mbps", G_TYPE_DOUBLE, stats.mbpsRecvRate,
1905           "negotiated-latency-ms", G_TYPE_INT, stats.msRcvTsbPdDelay, NULL);
1906       *bytes += stats.byteRecvTotal;
1907     }
1908 
1909     gst_structure_set (s,
1910         /* estimated bandwidth, in Mb/s */
1911         "bandwidth-mbps", G_TYPE_DOUBLE, stats.mbpsBandwidth,
1912         "rtt-ms", G_TYPE_DOUBLE, stats.msRTT, NULL);
1913 
1914   }
1915 
1916   return s;
1917 }
1918 
1919 GstStructure *
gst_srt_object_get_stats(GstSRTObject * srtobject)1920 gst_srt_object_get_stats (GstSRTObject * srtobject)
1921 {
1922   GstStructure *s = NULL;
1923   gboolean is_sender = GST_IS_BASE_SINK (srtobject->element);
1924   guint64 bytes;
1925 
1926   g_mutex_lock (&srtobject->sock_lock);
1927 
1928   bytes = srtobject->previous_bytes;
1929 
1930   if (srtobject->sock != SRT_INVALID_SOCK) {
1931     s = get_stats_for_srtsock (srtobject->sock, is_sender, &bytes);
1932     goto done;
1933   }
1934 
1935   s = gst_structure_new_empty ("application/x-srt-statistics");
1936 
1937   if (srtobject->callers) {
1938     GValueArray *callers_stats = g_value_array_new (1);
1939     GValue callers_stats_v = G_VALUE_INIT;
1940     GList *item;
1941 
1942     for (item = srtobject->callers; item; item = item->next) {
1943       SRTCaller *caller = item->data;
1944       GstStructure *tmp;
1945       GValue *v;
1946 
1947       tmp = get_stats_for_srtsock (caller->sock, is_sender, &bytes);
1948 
1949       gst_structure_set (tmp, "caller-address", G_TYPE_SOCKET_ADDRESS,
1950           caller->sockaddr, NULL);
1951 
1952       g_value_array_append (callers_stats, NULL);
1953       v = g_value_array_get_nth (callers_stats, callers_stats->n_values - 1);
1954       g_value_init (v, GST_TYPE_STRUCTURE);
1955       g_value_take_boxed (v, tmp);
1956     }
1957 
1958     g_value_init (&callers_stats_v, G_TYPE_VALUE_ARRAY);
1959     g_value_take_boxed (&callers_stats_v, callers_stats);
1960     gst_structure_take_value (s, "callers", &callers_stats_v);
1961   }
1962 
1963 done:
1964   gst_structure_set (s, is_sender ? "bytes-sent-total" : "bytes-received-total",
1965       G_TYPE_UINT64, bytes, NULL);
1966 
1967   g_mutex_unlock (&srtobject->sock_lock);
1968 
1969   return s;
1970 }
1971 
1972 static GstStructure *
gst_srt_object_accumulate_stats(GstSRTObject * srtobject,SRTSOCKET srtsock)1973 gst_srt_object_accumulate_stats (GstSRTObject * srtobject, SRTSOCKET srtsock)
1974 {
1975   gboolean is_sender = GST_IS_BASE_SINK (srtobject->element);
1976   GstStructure *stats;
1977   guint64 bytes = 0;
1978 
1979   stats = get_stats_for_srtsock (srtsock, is_sender, &bytes);
1980   srtobject->previous_bytes += bytes;
1981 
1982   return stats;
1983 }
1984