1 /* GStreamer
2 * Copyright (C) 2018, Collabora Ltd.
3 * Copyright (C) 2018, SK Telecom, Co., Ltd.
4 * Author: Jeongseok Kim <jeongseok.kim@sk.com>
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Library General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Library General Public License for more details.
15 *
16 * You should have received a copy of the GNU Library General Public
17 * License along with this library; if not, write to the
18 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19 * Boston, MA 02110-1301, USA.
20 */
21
22 #ifdef HAVE_CONFIG_H
23 #include "config.h"
24 #endif
25
26 /* Needed for GValueArray */
27 #define GLIB_DISABLE_DEPRECATION_WARNINGS
28
29 #include "gstsrtobject.h"
30
31 #include <gst/base/gstbasesink.h>
32 #include <gio/gnetworking.h>
33 #include <stdlib.h>
34 #include <stdbool.h>
35 #include <stdint.h>
36
37 GST_DEBUG_CATEGORY_EXTERN (gst_debug_srtobject);
38 #define GST_CAT_DEFAULT gst_debug_srtobject
39
40 #if SRT_VERSION_VALUE > 0x10402
41 #define SRTSOCK_ERROR_DEBUG ("libsrt reported: %s", srt_rejectreason_str (reason))
42 #else
43 /* srt_rejectreason_str() is unavailable in libsrt 1.4.2 and prior due to
44 * unexported symbol. See https://github.com/Haivision/srt/pull/1728. */
45 #define SRTSOCK_ERROR_DEBUG ("libsrt reported reject reason code %d", reason)
46 #endif
47
48 /* Define options added in later revisions */
49 #if SRT_VERSION_VALUE < 0x10402
50 #define SRTO_DRIFTTRACER 37
51 /* We can't define SRTO_BINDTODEVICE since it depends on configuration flags *sigh* */
52 #define SRTO_RETRANSMITALGO 61
53 #endif
54
/* Posts a RESOURCE-domain element warning about a failing SRT socket.
 *
 * The expansion dereferences a variable named `srtobject`, so one has to be
 * in scope at the call site. SRTSOCK_ERROR_DEBUG is only expanded on rescan,
 * i.e. after argument substitution, so the `reason` identifier it contains is
 * looked up in the caller's scope instead of being bound to this macro's
 * `reason` parameter: callers need a variable literally named `reason`. */
#define ELEMENT_WARNING_SRTSOCK_ERROR(code, reason) \
  GST_ELEMENT_WARNING (srtobject->element, RESOURCE, code, \
  ("Error on SRT socket. Trying to reconnect."), SRTSOCK_ERROR_DEBUG)
58
/* GObject property identifiers shared by the property helpers in this file.
 * Numbering starts at 1. */
enum
{
  PROP_URI = 1,
  PROP_MODE,
  PROP_LOCALADDRESS,
  PROP_LOCALPORT,
  PROP_PASSPHRASE,
  PROP_PBKEYLEN,
  PROP_POLL_TIMEOUT,
  PROP_LATENCY,
  PROP_MSG_SIZE,
  PROP_STATS,
  PROP_WAIT_FOR_CONNECTION,
  PROP_STREAMID,
  PROP_AUTHENTICATION,
  /* NOTE(review): presumably an end marker rather than a real property */
  PROP_LAST
};
76
/* Book-keeping for one remote peer accepted on the listener socket. */
typedef struct
{
  SRTSOCKET sock;               /* accepted socket, SRT_INVALID_SOCK when unset */
  gint poll_id;                 /* per-caller epoll id, SRT_ERROR when unset */
  GSocketAddress *sockaddr;     /* peer address, owned by this record */
  gboolean sent_headers;        /* starts out FALSE; presumably tracks whether
                                 * stream headers were sent to this caller */
} SRTCaller;
84
85 static GstStructure *gst_srt_object_accumulate_stats (GstSRTObject * srtobject,
86 SRTSOCKET srtsock);
87
88 static SRTCaller *
srt_caller_new(void)89 srt_caller_new (void)
90 {
91 SRTCaller *caller = g_new0 (SRTCaller, 1);
92 caller->sock = SRT_INVALID_SOCK;
93 caller->poll_id = SRT_ERROR;
94 caller->sent_headers = FALSE;
95
96 return caller;
97 }
98
99 static void
srt_caller_free(SRTCaller * caller)100 srt_caller_free (SRTCaller * caller)
101 {
102 g_return_if_fail (caller != NULL);
103
104 g_clear_object (&caller->sockaddr);
105
106 if (caller->sock != SRT_INVALID_SOCK) {
107 srt_close (caller->sock);
108 }
109
110 if (caller->poll_id != SRT_ERROR) {
111 srt_epoll_release (caller->poll_id);
112 }
113
114 g_free (caller);
115 }
116
117 /* called with sock_lock */
118 static void
srt_caller_signal_removed(SRTCaller * caller,GstSRTObject * srtobject)119 srt_caller_signal_removed (SRTCaller * caller, GstSRTObject * srtobject)
120 {
121 GstStructure *stats;
122
123 stats = gst_srt_object_accumulate_stats (srtobject, caller->sock);
124
125 /* FIXME: These are the final statistics for the caller before we close its
126 * socket. Deliver the stats to the app before we throw them away. */
127 gst_structure_free (stats);
128
129 g_signal_emit_by_name (srtobject->element, "caller-removed", 0,
130 caller->sockaddr);
131 }
132
133 struct srt_constant_params
134 {
135 const gchar *name;
136 SRT_SOCKOPT param;
137 const void *val;
138 int val_len;
139 };
140
141 static const bool bool_false = false;
142 static const bool bool_true = true;
143 static const struct linger no_linger = { 0, 0 };
144
145 /* *INDENT-OFF* */
146 static const struct srt_constant_params srt_params[] = {
147 {"SRTO_SNDSYN", SRTO_SNDSYN, &bool_false, sizeof bool_false}, /* non-blocking */
148 {"SRTO_RCVSYN", SRTO_RCVSYN, &bool_false, sizeof bool_false}, /* non-blocking */
149 {"SRTO_LINGER", SRTO_LINGER, &no_linger, sizeof no_linger}, /* no linger time */
150 {"SRTO_TSBPDMODE", SRTO_TSBPDMODE, &bool_true, sizeof bool_true}, /* Timestamp-based Packet Delivery mode must be enabled */
151 {NULL, -1, NULL, 0},
152 };
153 /* *INDENT-ON* */
154
/* Associates a parameter name with the SRT socket option it controls and the
 * GType its value is stored as in the parameters structure. */
typedef struct
{
  const gchar *name;            /* key used in the URI query / parameters */
  SRT_SOCKOPT opt;              /* option id handed to srt_setsockopt() */
  GType gtype;                  /* G_TYPE_INT, _INT64, _BOOLEAN or _STRING */
} SrtOption;

/* User-configurable socket options. gst_srt_object_set_socket_option() uses
 * this table to parse URI query values and
 * gst_srt_object_set_common_params() uses it to apply them to a socket.
 * The table ends with an entry whose name is NULL. */
SrtOption srt_options[] = {
  {"mss", SRTO_MSS, G_TYPE_INT},
  {"fc", SRTO_FC, G_TYPE_INT},
  {"sndbuf", SRTO_SNDBUF, G_TYPE_INT},
  {"rcvbuf", SRTO_RCVBUF, G_TYPE_INT},
  {"maxbw", SRTO_MAXBW, G_TYPE_INT64},
  {"tsbpdmode", SRTO_TSBPDMODE, G_TYPE_BOOLEAN},
  {"latency", SRTO_LATENCY, G_TYPE_INT},
  {"inputbw", SRTO_INPUTBW, G_TYPE_INT64},
  {"oheadbw", SRTO_OHEADBW, G_TYPE_INT},
  {"passphrase", SRTO_PASSPHRASE, G_TYPE_STRING},
  {"pbkeylen", SRTO_PBKEYLEN, G_TYPE_INT},
  {"ipttl", SRTO_IPTTL, G_TYPE_INT},
  {"iptos", SRTO_IPTOS, G_TYPE_INT},
  {"tlpktdrop", SRTO_TLPKTDROP, G_TYPE_BOOLEAN},
  {"snddropdelay", SRTO_SNDDROPDELAY, G_TYPE_INT},
  {"nakreport", SRTO_NAKREPORT, G_TYPE_BOOLEAN},
  {"conntimeo", SRTO_CONNTIMEO, G_TYPE_INT},
  {"drifttracer", SRTO_DRIFTTRACER, G_TYPE_BOOLEAN},
  {"lossmaxttl", SRTO_LOSSMAXTTL, G_TYPE_INT},
  {"rcvlatency", SRTO_RCVLATENCY, G_TYPE_INT},
  {"peerlatency", SRTO_PEERLATENCY, G_TYPE_INT},
  {"minversion", SRTO_MINVERSION, G_TYPE_INT},
  {"streamid", SRTO_STREAMID, G_TYPE_STRING},
  {"congestion", SRTO_CONGESTION, G_TYPE_STRING},
  {"messageapi", SRTO_MESSAGEAPI, G_TYPE_BOOLEAN},
  {"payloadsize", SRTO_PAYLOADSIZE, G_TYPE_INT},
  {"transtype", SRTO_TRANSTYPE, G_TYPE_INT},
  {"kmrefreshrate", SRTO_KMREFRESHRATE, G_TYPE_INT},
  {"kmpreannounce", SRTO_KMPREANNOUNCE, G_TYPE_INT},
  {"enforcedencryption", SRTO_ENFORCEDENCRYPTION, G_TYPE_BOOLEAN},
  {"ipv6only", SRTO_IPV6ONLY, G_TYPE_INT},
  {"peeridletimeo", SRTO_PEERIDLETIMEO, G_TYPE_INT},
  /* Only listed for libsrt >= 1.4.2; no fallback id is defined above for
   * older versions. */
#if SRT_VERSION_VALUE >= 0x10402
  {"bindtodevice", SRTO_BINDTODEVICE, G_TYPE_STRING},
#endif
  {"packetfilter", SRTO_PACKETFILTER, G_TYPE_STRING},
  {"retransmitalgo", SRTO_RETRANSMITALGO, G_TYPE_INT},
  {NULL}
};
202
203 static gint srt_init_refcount = 0;
204
205 static GSocketAddress *
gst_srt_object_resolve(GstSRTObject * srtobject,const gchar * address,guint port,GCancellable * cancellable,GError ** err_out)206 gst_srt_object_resolve (GstSRTObject * srtobject, const gchar * address,
207 guint port, GCancellable * cancellable, GError ** err_out)
208 {
209 GError *err = NULL;
210 GSocketAddress *saddr;
211 GResolver *resolver;
212
213 saddr = g_inet_socket_address_new_from_string (address, port);
214 if (!saddr) {
215 GList *results;
216
217 GST_DEBUG_OBJECT (srtobject->element, "resolving IP address for host %s",
218 address);
219 resolver = g_resolver_get_default ();
220 results = g_resolver_lookup_by_name (resolver, address, cancellable, &err);
221 if (!results)
222 goto name_resolve;
223
224 saddr = g_inet_socket_address_new (G_INET_ADDRESS (results->data), port);
225
226 g_resolver_free_addresses (results);
227 g_object_unref (resolver);
228 }
229 #ifndef GST_DISABLE_GST_DEBUG
230 {
231 gchar *ip =
232 g_inet_address_to_string (g_inet_socket_address_get_address
233 (G_INET_SOCKET_ADDRESS (saddr)));
234
235 GST_DEBUG_OBJECT (srtobject->element, "IP address for host %s is %s",
236 address, ip);
237 g_free (ip);
238 }
239 #endif
240
241 return saddr;
242
243 name_resolve:
244 {
245 GST_WARNING_OBJECT (srtobject->element, "Failed to resolve %s: %s", address,
246 err->message);
247 g_set_error (err_out, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_OPEN_READ,
248 "Failed to resolve host '%s': %s", address, err->message);
249 g_clear_error (&err);
250 g_object_unref (resolver);
251 return NULL;
252 }
253 }
254
255 static gboolean
gst_srt_object_apply_socket_option(SRTSOCKET sock,SrtOption * option,const GValue * value,GError ** error)256 gst_srt_object_apply_socket_option (SRTSOCKET sock, SrtOption * option,
257 const GValue * value, GError ** error)
258 {
259 union
260 {
261 int32_t i;
262 int64_t i64;
263 gboolean b;
264 const gchar *c;
265 } u;
266 const void *optval = &u;
267 gint optlen;
268
269 if (!G_VALUE_HOLDS (value, option->gtype)) {
270 goto bad_type;
271 }
272
273 switch (option->gtype) {
274 case G_TYPE_INT:
275 u.i = g_value_get_int (value);
276 optlen = sizeof u.i;
277 break;
278 case G_TYPE_INT64:
279 u.i64 = g_value_get_int64 (value);
280 optlen = sizeof u.i64;
281 break;
282 case G_TYPE_BOOLEAN:
283 u.b = g_value_get_boolean (value);
284 optlen = sizeof u.b;
285 break;
286 case G_TYPE_STRING:
287 u.c = g_value_get_string (value);
288 optval = u.c;
289 optlen = u.c ? strlen (u.c) : 0;
290 if (optlen == 0) {
291 return TRUE;
292 }
293 break;
294 default:
295 goto bad_type;
296 }
297
298 if (srt_setsockopt (sock, 0, option->opt, optval, optlen)) {
299 g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS,
300 "failed to set %s (reason: %s)", option->name, srt_getlasterror_str ());
301 return FALSE;
302 }
303
304 return TRUE;
305
306 bad_type:
307 g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS,
308 "option %s has unsupported type", option->name);
309 return FALSE;
310 }
311
312 static gboolean
gst_srt_object_set_common_params(SRTSOCKET sock,GstSRTObject * srtobject,GError ** error)313 gst_srt_object_set_common_params (SRTSOCKET sock, GstSRTObject * srtobject,
314 GError ** error)
315 {
316 const struct srt_constant_params *params = srt_params;
317 SrtOption *option = srt_options;
318
319 GST_OBJECT_LOCK (srtobject->element);
320
321 for (; params->name != NULL; params++) {
322 if (srt_setsockopt (sock, 0, params->param, params->val, params->val_len)) {
323 g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS,
324 "failed to set %s (reason: %s)", params->name,
325 srt_getlasterror_str ());
326 goto err;
327 }
328 }
329
330 for (; option->name; ++option) {
331 const GValue *val;
332
333 val = gst_structure_get_value (srtobject->parameters, option->name);
334 if (val && !gst_srt_object_apply_socket_option (sock, option, val, error)) {
335 goto err;
336 }
337 }
338
339 GST_OBJECT_UNLOCK (srtobject->element);
340 return TRUE;
341
342 err:
343 GST_OBJECT_UNLOCK (srtobject->element);
344 return FALSE;
345 }
346
347 GstSRTObject *
gst_srt_object_new(GstElement * element)348 gst_srt_object_new (GstElement * element)
349 {
350 GstSRTObject *srtobject;
351
352 if (g_atomic_int_add (&srt_init_refcount, 1) == 0) {
353 GST_DEBUG_OBJECT (element, "Starting up SRT");
354 if (srt_startup () < 0) {
355 g_warning ("Failed to initialize SRT (reason: %s)",
356 srt_getlasterror_str ());
357 }
358 }
359
360 srtobject = g_new0 (GstSRTObject, 1);
361 srtobject->element = element;
362 srtobject->parameters = gst_structure_new_empty ("application/x-srt-params");
363 srtobject->sock = SRT_INVALID_SOCK;
364 srtobject->poll_id = srt_epoll_create ();
365 srtobject->listener_sock = SRT_INVALID_SOCK;
366 srtobject->listener_poll_id = SRT_ERROR;
367 srtobject->sent_headers = FALSE;
368 srtobject->wait_for_connection = GST_SRT_DEFAULT_WAIT_FOR_CONNECTION;
369
370 g_cond_init (&srtobject->sock_cond);
371 return srtobject;
372 }
373
374 void
gst_srt_object_destroy(GstSRTObject * srtobject)375 gst_srt_object_destroy (GstSRTObject * srtobject)
376 {
377 g_return_if_fail (srtobject != NULL);
378
379 if (srtobject->poll_id != SRT_ERROR) {
380 srt_epoll_release (srtobject->poll_id);
381 srtobject->poll_id = SRT_ERROR;
382 }
383
384 g_cond_clear (&srtobject->sock_cond);
385
386 GST_DEBUG_OBJECT (srtobject->element, "Destroying srtobject");
387 gst_structure_free (srtobject->parameters);
388
389 if (g_atomic_int_dec_and_test (&srt_init_refcount)) {
390 srt_cleanup ();
391 GST_DEBUG_OBJECT (srtobject->element, "Cleaning up SRT");
392 }
393
394 g_clear_pointer (&srtobject->uri, gst_uri_unref);
395
396 g_free (srtobject);
397 }
398
399 gboolean
gst_srt_object_set_property_helper(GstSRTObject * srtobject,guint prop_id,const GValue * value,GParamSpec * pspec)400 gst_srt_object_set_property_helper (GstSRTObject * srtobject,
401 guint prop_id, const GValue * value, GParamSpec * pspec)
402 {
403 GST_OBJECT_LOCK (srtobject->element);
404
405 switch (prop_id) {
406 case PROP_URI:
407 gst_srt_object_set_uri (srtobject, g_value_get_string (value), NULL);
408 break;
409 case PROP_MODE:
410 gst_structure_set_value (srtobject->parameters, "mode", value);
411 break;
412 case PROP_POLL_TIMEOUT:
413 gst_structure_set_value (srtobject->parameters, "poll-timeout", value);
414 break;
415 case PROP_LATENCY:
416 gst_structure_set_value (srtobject->parameters, "latency", value);
417 break;
418 case PROP_LOCALADDRESS:
419 gst_structure_set_value (srtobject->parameters, "localaddress", value);
420 break;
421 case PROP_LOCALPORT:
422 gst_structure_set_value (srtobject->parameters, "localport", value);
423 break;
424 case PROP_PASSPHRASE:
425 gst_structure_set_value (srtobject->parameters, "passphrase", value);
426 break;
427 case PROP_PBKEYLEN:
428 gst_structure_set (srtobject->parameters, "pbkeylen", G_TYPE_INT,
429 g_value_get_enum (value), NULL);
430 break;
431 case PROP_WAIT_FOR_CONNECTION:
432 srtobject->wait_for_connection = g_value_get_boolean (value);
433 break;
434 case PROP_STREAMID:
435 gst_structure_set_value (srtobject->parameters, "streamid", value);
436 break;
437 case PROP_AUTHENTICATION:
438 srtobject->authentication = g_value_get_boolean (value);
439 break;
440 default:
441 goto err;
442 }
443
444 GST_OBJECT_UNLOCK (srtobject->element);
445 return TRUE;
446
447 err:
448 GST_OBJECT_UNLOCK (srtobject->element);
449 return FALSE;
450 }
451
452 gboolean
gst_srt_object_get_property_helper(GstSRTObject * srtobject,guint prop_id,GValue * value,GParamSpec * pspec)453 gst_srt_object_get_property_helper (GstSRTObject * srtobject,
454 guint prop_id, GValue * value, GParamSpec * pspec)
455 {
456 switch (prop_id) {
457 case PROP_URI:
458 GST_OBJECT_LOCK (srtobject->element);
459 g_value_take_string (value, gst_uri_to_string (srtobject->uri));
460 GST_OBJECT_UNLOCK (srtobject->element);
461 break;
462 case PROP_MODE:{
463 GstSRTConnectionMode v;
464
465 GST_OBJECT_LOCK (srtobject->element);
466 if (!gst_structure_get_enum (srtobject->parameters, "mode",
467 GST_TYPE_SRT_CONNECTION_MODE, (gint *) & v)) {
468 GST_WARNING_OBJECT (srtobject->element, "Failed to get 'mode'");
469 v = GST_SRT_CONNECTION_MODE_NONE;
470 }
471 g_value_set_enum (value, v);
472 GST_OBJECT_UNLOCK (srtobject->element);
473 break;
474 }
475 case PROP_LOCALADDRESS:
476 GST_OBJECT_LOCK (srtobject->element);
477 g_value_set_string (value,
478 gst_structure_get_string (srtobject->parameters, "localaddress"));
479 GST_OBJECT_UNLOCK (srtobject->element);
480 break;
481 case PROP_LOCALPORT:{
482 guint v;
483
484 GST_OBJECT_LOCK (srtobject->element);
485 if (!gst_structure_get_uint (srtobject->parameters, "localport", &v)) {
486 GST_WARNING_OBJECT (srtobject->element, "Failed to get 'localport'");
487 v = GST_SRT_DEFAULT_PORT;
488 }
489 g_value_set_uint (value, v);
490 GST_OBJECT_UNLOCK (srtobject->element);
491 break;
492 }
493 case PROP_PBKEYLEN:{
494 GstSRTKeyLength v;
495
496 GST_OBJECT_LOCK (srtobject->element);
497 if (!gst_structure_get_int (srtobject->parameters, "pbkeylen",
498 (gint *) & v)) {
499 GST_WARNING_OBJECT (srtobject->element, "Failed to get 'pbkeylen'");
500 v = GST_SRT_KEY_LENGTH_NO_KEY;
501 }
502 g_value_set_enum (value, v);
503 GST_OBJECT_UNLOCK (srtobject->element);
504 break;
505 }
506 case PROP_POLL_TIMEOUT:{
507 gint v;
508
509 GST_OBJECT_LOCK (srtobject->element);
510 if (!gst_structure_get_int (srtobject->parameters, "poll-timeout", &v)) {
511 GST_WARNING_OBJECT (srtobject->element, "Failed to get 'poll-timeout'");
512 v = GST_SRT_DEFAULT_POLL_TIMEOUT;
513 }
514 g_value_set_int (value, v);
515 GST_OBJECT_UNLOCK (srtobject->element);
516 break;
517 }
518 case PROP_LATENCY:{
519 gint v;
520
521 GST_OBJECT_LOCK (srtobject->element);
522 if (!gst_structure_get_int (srtobject->parameters, "latency", &v)) {
523 GST_WARNING_OBJECT (srtobject->element, "Failed to get 'latency'");
524 v = GST_SRT_DEFAULT_LATENCY;
525 }
526 g_value_set_int (value, v);
527 GST_OBJECT_UNLOCK (srtobject->element);
528 break;
529 }
530 case PROP_STATS:
531 g_value_take_boxed (value, gst_srt_object_get_stats (srtobject));
532 break;
533 case PROP_WAIT_FOR_CONNECTION:
534 GST_OBJECT_LOCK (srtobject->element);
535 g_value_set_boolean (value, srtobject->wait_for_connection);
536 GST_OBJECT_UNLOCK (srtobject->element);
537 break;
538 case PROP_STREAMID:
539 GST_OBJECT_LOCK (srtobject->element);
540 g_value_set_string (value,
541 gst_structure_get_string (srtobject->parameters, "streamid"));
542 GST_OBJECT_UNLOCK (srtobject->element);
543 break;
544 case PROP_AUTHENTICATION:
545 g_value_set_boolean (value, srtobject->authentication);
546 break;
547 default:
548 return FALSE;
549 }
550
551 return TRUE;
552 }
553
554 void
gst_srt_object_install_properties_helper(GObjectClass * gobject_class)555 gst_srt_object_install_properties_helper (GObjectClass * gobject_class)
556 {
557 /**
558 * GstSRTSrc:uri:
559 *
560 * The URI used by SRT connection. User can specify SRT specific options by URI parameters.
561 * Refer to <a href="https://github.com/Haivision/srt/blob/master/docs/stransmit.md#medium-srt">Mediun: SRT</a>
562 */
563 g_object_class_install_property (gobject_class, PROP_URI,
564 g_param_spec_string ("uri", "URI",
565 "URI in the form of srt://address:port", GST_SRT_DEFAULT_URI,
566 G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
567 G_PARAM_STATIC_STRINGS));
568
569 /**
570 * GstSRTSrc:mode:
571 *
572 * The SRT connection mode.
573 * This property can be set by URI parameters.
574 */
575 g_object_class_install_property (gobject_class, PROP_MODE,
576 g_param_spec_enum ("mode", "Connection mode",
577 "SRT connection mode", GST_TYPE_SRT_CONNECTION_MODE,
578 GST_SRT_CONNECTION_MODE_CALLER,
579 G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
580 G_PARAM_STATIC_STRINGS));
581 gst_type_mark_as_plugin_api (GST_TYPE_SRT_CONNECTION_MODE, 0);
582
583 /**
584 * GstSRTSrc:localaddress:
585 *
586 * The address to bind when #GstSRTSrc:mode is listener or rendezvous.
587 * This property can be set by URI parameters.
588 */
589 g_object_class_install_property (gobject_class, PROP_LOCALADDRESS,
590 g_param_spec_string ("localaddress", "Local address",
591 "Local address to bind", GST_SRT_DEFAULT_LOCALADDRESS,
592 G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
593 G_PARAM_STATIC_STRINGS));
594
595 /**
596 * GstSRTSrc:localport:
597 *
598 * The local port to bind when #GstSRTSrc:mode is listener or rendezvous.
599 * This property can be set by URI parameters.
600 */
601 g_object_class_install_property (gobject_class, PROP_LOCALPORT,
602 g_param_spec_uint ("localport", "Local port",
603 "Local port to bind", 0,
604 65535, GST_SRT_DEFAULT_PORT,
605 G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
606 G_PARAM_STATIC_STRINGS));
607
608 /**
609 * GstSRTSrc:passphrase:
610 *
611 * The password for the encrypted transmission.
612 * This property can be set by URI parameters.
613 */
614 g_object_class_install_property (gobject_class, PROP_PASSPHRASE,
615 g_param_spec_string ("passphrase", "Passphrase",
616 "Password for the encrypted transmission", "",
617 G_PARAM_WRITABLE | GST_PARAM_MUTABLE_READY | G_PARAM_STATIC_STRINGS));
618
619 /**
620 * GstSRTSrc:pbkeylen:
621 *
622 * The crypto key length.
623 * This property can be set by URI parameters.
624 */
625 g_object_class_install_property (gobject_class, PROP_PBKEYLEN,
626 g_param_spec_enum ("pbkeylen", "Crypto key length",
627 "Crypto key length in bytes", GST_TYPE_SRT_KEY_LENGTH,
628 GST_SRT_DEFAULT_PBKEYLEN,
629 G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
630 G_PARAM_STATIC_STRINGS));
631 gst_type_mark_as_plugin_api (GST_TYPE_SRT_KEY_LENGTH, 0);
632
633 /**
634 * GstSRTSrc:poll-timeout:
635 *
636 * The polling timeout used when srt poll is started.
637 * Even if the default value indicates infinite waiting, it can be cancellable according to #GstState
638 * This property can be set by URI parameters.
639 */
640 g_object_class_install_property (gobject_class, PROP_POLL_TIMEOUT,
641 g_param_spec_int ("poll-timeout", "Poll timeout",
642 "Return poll wait after timeout milliseconds (-1 = infinite)", -1,
643 G_MAXINT32, GST_SRT_DEFAULT_POLL_TIMEOUT,
644 G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
645 G_PARAM_STATIC_STRINGS));
646
647 /**
648 * GstSRTSrc:latency:
649 *
650 * The maximum accepted transmission latency.
651 */
652 g_object_class_install_property (gobject_class, PROP_LATENCY,
653 g_param_spec_int ("latency", "latency",
654 "Minimum latency (milliseconds)", 0,
655 G_MAXINT32, GST_SRT_DEFAULT_LATENCY,
656 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
657
658 /**
659 * GstSRTSrc:stats:
660 *
661 * The statistics from SRT.
662 */
663 g_object_class_install_property (gobject_class, PROP_STATS,
664 g_param_spec_boxed ("stats", "Statistics",
665 "SRT Statistics", GST_TYPE_STRUCTURE,
666 G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
667
668 /**
669 * GstSRTSink:wait-for-connection:
670 *
671 * Boolean to block streaming until a client connects. If TRUE,
672 * `srtsink' will stream only when a client is connected.
673 */
674 g_object_class_install_property (gobject_class, PROP_WAIT_FOR_CONNECTION,
675 g_param_spec_boolean ("wait-for-connection",
676 "Wait for a connection",
677 "Block the stream until a client connects",
678 GST_SRT_DEFAULT_WAIT_FOR_CONNECTION,
679 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
680
681 /**
682 * GstSRTSrc:streamid:
683 *
684 * The stream id for the SRT access control.
685 */
686 g_object_class_install_property (gobject_class, PROP_STREAMID,
687 g_param_spec_string ("streamid", "Stream ID",
688 "Stream ID for the SRT access control", "",
689 G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
690 G_PARAM_STATIC_STRINGS));
691
692 /**
693 * GstSRTSink:authentication:
694 *
695 * Boolean to authenticate a connection. If TRUE,
696 * the incoming connection is authenticated. Else,
697 * all the connections are accepted.
698 *
699 * Since: 1.20
700 *
701 */
702 g_object_class_install_property (gobject_class, PROP_AUTHENTICATION,
703 g_param_spec_boolean ("authentication",
704 "Authentication",
705 "Authenticate a connection",
706 FALSE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
707 }
708
709 static void
gst_srt_object_set_enum_value(GstStructure * s,GType enum_type,gconstpointer key,gconstpointer value)710 gst_srt_object_set_enum_value (GstStructure * s, GType enum_type,
711 gconstpointer key, gconstpointer value)
712 {
713 GEnumClass *enum_class;
714 GEnumValue *enum_value;
715
716 enum_class = g_type_class_ref (enum_type);
717 enum_value = g_enum_get_value_by_nick (enum_class, value);
718
719 if (enum_value) {
720 gst_structure_set (s, key, enum_type, enum_value->value, NULL);
721 }
722
723 g_type_class_unref (enum_class);
724 }
725
726 static void
gst_srt_object_set_string_value(GstStructure * s,const gchar * key,const gchar * value)727 gst_srt_object_set_string_value (GstStructure * s, const gchar * key,
728 const gchar * value)
729 {
730 gst_structure_set (s, key, G_TYPE_STRING, value, NULL);
731 }
732
733 static void
gst_srt_object_set_uint_value(GstStructure * s,const gchar * key,const gchar * value)734 gst_srt_object_set_uint_value (GstStructure * s, const gchar * key,
735 const gchar * value)
736 {
737 gst_structure_set (s, key, G_TYPE_UINT,
738 (guint) g_ascii_strtoll (value, NULL, 10), NULL);
739 }
740
741 static void
gst_srt_object_set_int_value(GstStructure * s,const gchar * key,const gchar * value)742 gst_srt_object_set_int_value (GstStructure * s, const gchar * key,
743 const gchar * value)
744 {
745 gst_structure_set (s, key, G_TYPE_INT,
746 (gint) g_ascii_strtoll (value, NULL, 10), NULL);
747 }
748
749 static void
gst_srt_object_set_int64_value(GstStructure * s,const gchar * key,const gchar * value)750 gst_srt_object_set_int64_value (GstStructure * s, const gchar * key,
751 const gchar * value)
752 {
753 gst_structure_set (s, key, G_TYPE_INT64,
754 g_ascii_strtoll (value, NULL, 10), NULL);
755 }
756
757 static void
gst_srt_object_set_boolean_value(GstStructure * s,const gchar * key,const gchar * value)758 gst_srt_object_set_boolean_value (GstStructure * s, const gchar * key,
759 const gchar * value)
760 {
761 gboolean bool_val;
762 static const gchar *true_names[] = {
763 "1", "yes", "on", "true", NULL
764 };
765 static const gchar *false_names[] = {
766 "0", "no", "off", "false", NULL
767 };
768
769 if (g_strv_contains (true_names, value)) {
770 bool_val = TRUE;
771 } else if (g_strv_contains (false_names, value)) {
772 bool_val = FALSE;
773 } else {
774 return;
775 }
776
777 gst_structure_set (s, key, G_TYPE_BOOLEAN, bool_val, NULL);
778 }
779
780 static void
gst_srt_object_set_socket_option(GstStructure * s,const gchar * key,const gchar * value)781 gst_srt_object_set_socket_option (GstStructure * s, const gchar * key,
782 const gchar * value)
783 {
784 SrtOption *option = srt_options;
785
786 for (; option; ++option) {
787 if (g_str_equal (key, option->name)) {
788 switch (option->gtype) {
789 case G_TYPE_INT:
790 gst_srt_object_set_int_value (s, key, value);
791 break;
792 case G_TYPE_INT64:
793 gst_srt_object_set_int64_value (s, key, value);
794 break;
795 case G_TYPE_STRING:
796 gst_srt_object_set_string_value (s, key, value);
797 break;
798 case G_TYPE_BOOLEAN:
799 gst_srt_object_set_boolean_value (s, key, value);
800 break;
801 }
802
803 break;
804 }
805 }
806 }
807
808 static void
gst_srt_object_validate_parameters(GstStructure * s,GstUri * uri)809 gst_srt_object_validate_parameters (GstStructure * s, GstUri * uri)
810 {
811 GstSRTConnectionMode connection_mode = GST_SRT_CONNECTION_MODE_NONE;
812
813 gst_structure_get_enum (s, "mode", GST_TYPE_SRT_CONNECTION_MODE,
814 (gint *) & connection_mode);
815
816 if (connection_mode == GST_SRT_CONNECTION_MODE_RENDEZVOUS ||
817 connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
818 guint local_port;
819 const gchar *local_address = gst_structure_get_string (s, "localaddress");
820
821 if (local_address == NULL) {
822 local_address =
823 gst_uri_get_host (uri) ==
824 NULL ? GST_SRT_DEFAULT_LOCALADDRESS : gst_uri_get_host (uri);
825 gst_srt_object_set_string_value (s, "localaddress", local_address);
826 }
827
828 if (!gst_structure_get_uint (s, "localport", &local_port)) {
829 local_port =
830 gst_uri_get_port (uri) ==
831 GST_URI_NO_PORT ? GST_SRT_DEFAULT_PORT : gst_uri_get_port (uri);
832 gst_structure_set (s, "localport", G_TYPE_UINT, local_port, NULL);
833 }
834 }
835 }
836
837 /* called with GST_OBJECT_LOCK (srtobject->element) held */
838 gboolean
gst_srt_object_set_uri(GstSRTObject * srtobject,const gchar * uri,GError ** err)839 gst_srt_object_set_uri (GstSRTObject * srtobject, const gchar * uri,
840 GError ** err)
841 {
842 GHashTable *query_table = NULL;
843 GHashTableIter iter;
844 gpointer key, value;
845 const char *addr_str;
846
847 if (srtobject->opened) {
848 g_warning
849 ("It's not supported to change the 'uri' property when SRT socket is opened.");
850 g_set_error (err, GST_URI_ERROR, GST_URI_ERROR_BAD_STATE,
851 "It's not supported to change the 'uri' property when SRT socket is opened");
852
853 return FALSE;
854 }
855
856 if (!g_str_has_prefix (uri, GST_SRT_DEFAULT_URI_SCHEME)) {
857 g_warning ("Given uri cannot be used for SRT connection.");
858 g_set_error (err, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
859 "Invalid SRT URI scheme");
860 return FALSE;
861 }
862
863 g_clear_pointer (&srtobject->uri, gst_uri_unref);
864 srtobject->uri = gst_uri_from_string (uri);
865
866 g_clear_pointer (&srtobject->parameters, gst_structure_free);
867 srtobject->parameters = gst_structure_new ("application/x-srt-params",
868 "poll-timeout", G_TYPE_INT, GST_SRT_DEFAULT_POLL_TIMEOUT,
869 "latency", G_TYPE_INT, GST_SRT_DEFAULT_LATENCY, NULL);
870
871 query_table = gst_uri_get_query_table (srtobject->uri);
872
873 GST_DEBUG_OBJECT (srtobject->element,
874 "set uri to (host: %s, port: %d) with %d query strings",
875 gst_uri_get_host (srtobject->uri), gst_uri_get_port (srtobject->uri),
876 query_table == NULL ? 0 : g_hash_table_size (query_table));
877
878 addr_str = gst_uri_get_host (srtobject->uri);
879 if (addr_str)
880 gst_srt_object_set_enum_value (srtobject->parameters,
881 GST_TYPE_SRT_CONNECTION_MODE, "mode", "caller");
882 else
883 gst_srt_object_set_enum_value (srtobject->parameters,
884 GST_TYPE_SRT_CONNECTION_MODE, "mode", "listener");
885
886 if (query_table) {
887 g_hash_table_iter_init (&iter, query_table);
888 while (g_hash_table_iter_next (&iter, &key, &value)) {
889 if (!g_strcmp0 ("mode", key)) {
890 gst_srt_object_set_enum_value (srtobject->parameters,
891 GST_TYPE_SRT_CONNECTION_MODE, key, value);
892 } else if (!g_strcmp0 ("localaddress", key)) {
893 gst_srt_object_set_string_value (srtobject->parameters, key, value);
894 } else if (!g_strcmp0 ("localport", key)) {
895 gst_srt_object_set_uint_value (srtobject->parameters, key, value);
896 } else if (!g_strcmp0 ("poll-timeout", key)) {
897 gst_srt_object_set_int_value (srtobject->parameters, key, value);
898 } else {
899 gst_srt_object_set_socket_option (srtobject->parameters, key, value);
900 }
901 }
902
903 g_hash_table_unref (query_table);
904 }
905
906 gst_srt_object_validate_parameters (srtobject->parameters, srtobject->uri);
907
908 return TRUE;
909 }
910
911 static gpointer
thread_func(gpointer data)912 thread_func (gpointer data)
913 {
914 GstSRTObject *srtobject = data;
915 SRTSOCKET caller_sock;
916 union
917 {
918 struct sockaddr_storage ss;
919 struct sockaddr sa;
920 } caller_sa;
921 int caller_sa_len = sizeof (caller_sa);
922
923 gint poll_timeout;
924
925 SRTSOCKET rsock;
926 gint rsocklen = 1;
927
928 for (;;) {
929 GST_OBJECT_LOCK (srtobject->element);
930 if (!gst_structure_get_int (srtobject->parameters, "poll-timeout",
931 &poll_timeout)) {
932 poll_timeout = GST_SRT_DEFAULT_POLL_TIMEOUT;
933 }
934 GST_OBJECT_UNLOCK (srtobject->element);
935
936 GST_DEBUG_OBJECT (srtobject->element, "Waiting a request from caller");
937
938 if (srt_epoll_wait (srtobject->listener_poll_id, &rsock,
939 &rsocklen, 0, 0, poll_timeout, NULL, 0, NULL, 0) < 0) {
940 gint srt_errno = srt_getlasterror (NULL);
941
942 if (srtobject->listener_poll_id == SRT_ERROR)
943 return NULL;
944 if (srt_errno == SRT_ETIMEOUT) {
945 continue;
946 } else {
947 GST_ELEMENT_ERROR (srtobject->element, RESOURCE, FAILED,
948 ("abort polling: %s", srt_getlasterror_str ()), (NULL));
949 return NULL;
950 }
951 }
952
953 caller_sock =
954 srt_accept (srtobject->listener_sock, &caller_sa.sa, &caller_sa_len);
955
956 if (caller_sock != SRT_INVALID_SOCK) {
957 SRTCaller *caller;
958 gint flag = SRT_EPOLL_ERR;
959
960 caller = srt_caller_new ();
961 caller->sockaddr =
962 g_socket_address_new_from_native (&caller_sa.sa, caller_sa_len);
963 caller->poll_id = srt_epoll_create ();
964 caller->sock = caller_sock;
965
966 if (gst_uri_handler_get_uri_type (GST_URI_HANDLER
967 (srtobject->element)) == GST_URI_SRC) {
968 flag |= SRT_EPOLL_IN;
969 } else {
970 flag |= SRT_EPOLL_OUT;
971 }
972
973 if (srt_epoll_add_usock (caller->poll_id, caller_sock, &flag)) {
974
975 GST_ELEMENT_ERROR (srtobject->element, RESOURCE, SETTINGS,
976 ("%s", srt_getlasterror_str ()), (NULL));
977
978 srt_caller_free (caller);
979
980 /* try-again */
981 continue;
982 }
983
984 GST_DEBUG_OBJECT (srtobject->element, "Accept to connect %d",
985 caller->sock);
986
987 g_mutex_lock (&srtobject->sock_lock);
988 srtobject->callers = g_list_append (srtobject->callers, caller);
989 g_cond_signal (&srtobject->sock_cond);
990 g_mutex_unlock (&srtobject->sock_lock);
991
992 /* notifying caller-added */
993 g_signal_emit_by_name (srtobject->element, "caller-added", 0,
994 caller->sockaddr);
995
996 if (gst_uri_handler_get_uri_type (GST_URI_HANDLER (srtobject->element)) ==
997 GST_URI_SRC)
998 return NULL;
999 }
1000 }
1001 }
1002
1003 static GSocketAddress *
peeraddr_to_g_socket_address(const struct sockaddr * peeraddr)1004 peeraddr_to_g_socket_address (const struct sockaddr *peeraddr)
1005 {
1006 gsize peeraddr_len;
1007
1008 switch (peeraddr->sa_family) {
1009 case AF_INET:
1010 peeraddr_len = sizeof (struct sockaddr_in);
1011 break;
1012 case AF_INET6:
1013 peeraddr_len = sizeof (struct sockaddr_in6);
1014 break;
1015 default:
1016 g_warning ("Unsupported address family %d", peeraddr->sa_family);
1017 return NULL;
1018 }
1019 return g_socket_address_new_from_native ((gpointer) peeraddr, peeraddr_len);
1020 }
1021
1022 static gint
srt_listen_callback_func(GstSRTObject * self,SRTSOCKET sock,int hs_version,const struct sockaddr * peeraddr,const char * stream_id)1023 srt_listen_callback_func (GstSRTObject * self, SRTSOCKET sock, int hs_version,
1024 const struct sockaddr *peeraddr, const char *stream_id)
1025 {
1026 GSocketAddress *addr = peeraddr_to_g_socket_address (peeraddr);
1027
1028 if (!addr) {
1029 GST_WARNING_OBJECT (self->element,
1030 "Invalid peer address. Rejecting sink %d streamid: %s", sock,
1031 stream_id);
1032 return -1;
1033 }
1034
1035 if (self->authentication) {
1036 gboolean authenticated = FALSE;
1037
1038 /* notifying caller-connecting */
1039 g_signal_emit_by_name (self->element, "caller-connecting", addr,
1040 stream_id, &authenticated);
1041
1042 if (!authenticated)
1043 goto reject;
1044 }
1045
1046 GST_DEBUG_OBJECT (self->element,
1047 "Accepting sink %d streamid: %s", sock, stream_id);
1048 g_object_unref (addr);
1049 return 0;
1050 reject:
1051 /* notifying caller-rejected */
1052 GST_WARNING_OBJECT (self->element,
1053 "Rejecting sink %d streamid: %s", sock, stream_id);
1054 g_signal_emit_by_name (self->element, "caller-rejected", addr, stream_id);
1055 g_object_unref (addr);
1056 return -1;
1057 }
1058
1059 static gboolean
gst_srt_object_wait_connect(GstSRTObject * srtobject,GCancellable * cancellable,gpointer sa,size_t sa_len,GError ** error)1060 gst_srt_object_wait_connect (GstSRTObject * srtobject,
1061 GCancellable * cancellable, gpointer sa, size_t sa_len, GError ** error)
1062 {
1063 SRTSOCKET sock = SRT_INVALID_SOCK;
1064 const gchar *local_address = NULL;
1065 guint local_port = 0;
1066 gint sock_flags = SRT_EPOLL_ERR | SRT_EPOLL_IN;
1067
1068 gpointer bind_sa;
1069 gsize bind_sa_len;
1070 GSocketAddress *bind_addr = NULL;
1071
1072 GST_OBJECT_LOCK (srtobject->element);
1073
1074 gst_structure_get_uint (srtobject->parameters, "localport", &local_port);
1075
1076 local_address =
1077 gst_structure_get_string (srtobject->parameters, "localaddress");
1078 if (local_address == NULL)
1079 local_address = GST_SRT_DEFAULT_LOCALADDRESS;
1080
1081 GST_OBJECT_UNLOCK (srtobject->element);
1082
1083 bind_addr =
1084 gst_srt_object_resolve (srtobject, local_address, local_port, cancellable,
1085 error);
1086 if (!bind_addr) {
1087 goto failed;
1088 }
1089
1090 bind_sa_len = g_socket_address_get_native_size (bind_addr);
1091 bind_sa = g_alloca (bind_sa_len);
1092
1093 if (!g_socket_address_to_native (bind_addr, bind_sa, bind_sa_len, error)) {
1094 goto failed;
1095 }
1096
1097 g_clear_object (&bind_addr);
1098
1099 sock = srt_create_socket ();
1100 if (sock == SRT_INVALID_SOCK) {
1101 g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_INIT, "%s",
1102 srt_getlasterror_str ());
1103 goto failed;
1104 }
1105
1106 if (!gst_srt_object_set_common_params (sock, srtobject, error)) {
1107 goto failed;
1108 }
1109
1110 GST_DEBUG_OBJECT (srtobject->element, "Binding to %s (port: %d)",
1111 local_address, local_port);
1112
1113 if (srt_bind (sock, bind_sa, bind_sa_len) == SRT_ERROR) {
1114 g_set_error (error, GST_RESOURCE_ERROR,
1115 GST_RESOURCE_ERROR_OPEN_READ_WRITE, "Cannot bind to %s:%d - %s",
1116 local_address, local_port, srt_getlasterror_str ());
1117 goto failed;
1118 }
1119
1120 if (srt_epoll_add_usock (srtobject->listener_poll_id, sock, &sock_flags)) {
1121 g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS, "%s",
1122 srt_getlasterror_str ());
1123 goto failed;
1124 }
1125
1126 GST_DEBUG_OBJECT (srtobject->element, "Starting to listen on bind socket");
1127 if (srt_listen (sock, 1) == SRT_ERROR) {
1128 g_set_error (error, GST_RESOURCE_ERROR,
1129 GST_RESOURCE_ERROR_OPEN_READ_WRITE, "Cannot listen on bind socket: %s",
1130 srt_getlasterror_str ());
1131
1132 goto failed;
1133 }
1134
1135 srtobject->listener_sock = sock;
1136
1137 /* Register the SRT listen callback */
1138 if (srt_listen_callback (srtobject->listener_sock,
1139 (srt_listen_callback_fn *) srt_listen_callback_func, srtobject)) {
1140 goto failed;
1141 }
1142
1143 srtobject->thread =
1144 g_thread_try_new ("GstSRTObjectListener", thread_func, srtobject, error);
1145 if (srtobject->thread == NULL) {
1146 GST_ERROR_OBJECT (srtobject->element, "Failed to start thread");
1147 goto failed;
1148 }
1149
1150 return TRUE;
1151
1152 failed:
1153
1154 if (srtobject->listener_poll_id != SRT_ERROR) {
1155 srt_epoll_release (srtobject->listener_poll_id);
1156 }
1157
1158 if (sock != SRT_INVALID_SOCK) {
1159 srt_close (sock);
1160 }
1161
1162 g_clear_object (&bind_addr);
1163
1164 srtobject->listener_poll_id = SRT_ERROR;
1165 srtobject->listener_sock = SRT_INVALID_SOCK;
1166
1167 return FALSE;
1168 }
1169
1170 static gboolean
gst_srt_object_connect(GstSRTObject * srtobject,GCancellable * cancellable,GstSRTConnectionMode connection_mode,gpointer sa,size_t sa_len,GError ** error)1171 gst_srt_object_connect (GstSRTObject * srtobject, GCancellable * cancellable,
1172 GstSRTConnectionMode connection_mode, gpointer sa, size_t sa_len,
1173 GError ** error)
1174 {
1175 SRTSOCKET sock;
1176 gint sock_flags = SRT_EPOLL_ERR;
1177 guint local_port = 0;
1178 const gchar *local_address = NULL;
1179 bool sender;
1180 bool rendezvous;
1181
1182 sock = srt_create_socket ();
1183 if (sock == SRT_INVALID_SOCK) {
1184 g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_INIT, "%s",
1185 srt_getlasterror_str ());
1186 goto failed;
1187 }
1188
1189 if (!gst_srt_object_set_common_params (sock, srtobject, error)) {
1190 goto failed;
1191 }
1192
1193 switch (gst_uri_handler_get_uri_type (GST_URI_HANDLER (srtobject->element))) {
1194 case GST_URI_SRC:
1195 sender = false;
1196 sock_flags |= SRT_EPOLL_IN;
1197 break;
1198 case GST_URI_SINK:
1199 sender = true;
1200 sock_flags |= SRT_EPOLL_OUT;
1201 break;
1202 default:
1203 g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS,
1204 "Cannot determine stream direction");
1205 goto failed;
1206 }
1207
1208 if (srt_setsockopt (sock, 0, SRTO_SENDER, &sender, sizeof sender)) {
1209 g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS, "%s",
1210 srt_getlasterror_str ());
1211 goto failed;
1212 }
1213
1214 rendezvous = (connection_mode == GST_SRT_CONNECTION_MODE_RENDEZVOUS);
1215 if (srt_setsockopt (sock, 0, SRTO_RENDEZVOUS, &rendezvous, sizeof rendezvous)) {
1216 g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS, "%s",
1217 srt_getlasterror_str ());
1218 goto failed;
1219 }
1220
1221 GST_OBJECT_LOCK (srtobject->element);
1222 gst_structure_get_uint (srtobject->parameters, "localport", &local_port);
1223 local_address =
1224 gst_structure_get_string (srtobject->parameters, "localaddress");
1225 GST_OBJECT_UNLOCK (srtobject->element);
1226
1227 /* According to SRT norm, bind local address and port if specified */
1228 if (local_address != NULL && local_port != 0) {
1229 gpointer bind_sa;
1230 gsize bind_sa_len;
1231
1232 GSocketAddress *bind_addr =
1233 gst_srt_object_resolve (srtobject, local_address,
1234 local_port, cancellable, error);
1235
1236 if (!bind_addr) {
1237 goto failed;
1238 }
1239
1240 bind_sa_len = g_socket_address_get_native_size (bind_addr);
1241 bind_sa = g_alloca (bind_sa_len);
1242
1243 if (!g_socket_address_to_native (bind_addr, bind_sa, bind_sa_len, error)) {
1244 g_clear_object (&bind_addr);
1245 goto failed;
1246 }
1247
1248 g_clear_object (&bind_addr);
1249
1250 GST_DEBUG_OBJECT (srtobject->element, "Binding to %s (port: %d)",
1251 local_address, local_port);
1252
1253 if (srt_bind (sock, bind_sa, bind_sa_len) == SRT_ERROR) {
1254 g_set_error (error, GST_RESOURCE_ERROR,
1255 GST_RESOURCE_ERROR_OPEN_READ_WRITE, "Cannot bind to %s:%d - %s",
1256 local_address, local_port, srt_getlasterror_str ());
1257 goto failed;
1258 }
1259 }
1260
1261 if (srt_epoll_add_usock (srtobject->poll_id, sock, &sock_flags)) {
1262 g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS, "%s",
1263 srt_getlasterror_str ());
1264 goto failed;
1265 }
1266
1267 if (srt_connect (sock, sa, sa_len) == SRT_ERROR) {
1268 g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_OPEN_READ, "%s",
1269 srt_getlasterror_str ());
1270 goto failed;
1271 }
1272
1273 srtobject->sock = sock;
1274
1275 return TRUE;
1276
1277 failed:
1278
1279 if (srtobject->poll_id != SRT_ERROR) {
1280 srt_epoll_release (srtobject->poll_id);
1281 }
1282
1283 if (sock != SRT_INVALID_SOCK) {
1284 srt_close (sock);
1285 }
1286
1287 srtobject->poll_id = SRT_ERROR;
1288 srtobject->sock = SRT_INVALID_SOCK;
1289
1290 return FALSE;
1291 }
1292
1293 static gboolean
gst_srt_object_open_connection(GstSRTObject * srtobject,GCancellable * cancellable,GstSRTConnectionMode connection_mode,gpointer sa,size_t sa_len,GError ** error)1294 gst_srt_object_open_connection (GstSRTObject * srtobject,
1295 GCancellable * cancellable, GstSRTConnectionMode connection_mode,
1296 gpointer sa, size_t sa_len, GError ** error)
1297 {
1298 gboolean ret = FALSE;
1299
1300 if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
1301 ret =
1302 gst_srt_object_wait_connect (srtobject, cancellable, sa, sa_len, error);
1303 } else {
1304 ret =
1305 gst_srt_object_connect (srtobject, cancellable, connection_mode, sa,
1306 sa_len, error);
1307 }
1308
1309 return ret;
1310 }
1311
1312 static gboolean
gst_srt_object_open_internal(GstSRTObject * srtobject,GCancellable * cancellable,GError ** error)1313 gst_srt_object_open_internal (GstSRTObject * srtobject,
1314 GCancellable * cancellable, GError ** error)
1315 {
1316 GSocketAddress *socket_address = NULL;
1317 GstSRTConnectionMode connection_mode;
1318
1319 gpointer sa;
1320 size_t sa_len;
1321 const gchar *addr_str;
1322 guint port;
1323 gboolean ret = FALSE;
1324
1325 GST_OBJECT_LOCK (srtobject->element);
1326
1327 srtobject->opened = FALSE;
1328
1329 if (!gst_structure_get_enum (srtobject->parameters,
1330 "mode", GST_TYPE_SRT_CONNECTION_MODE, (gint *) & connection_mode)) {
1331 GST_WARNING_OBJECT (srtobject->element,
1332 "Cannot get connection mode information." " Use default mode");
1333 connection_mode = GST_SRT_DEFAULT_MODE;
1334 }
1335
1336 addr_str = gst_uri_get_host (srtobject->uri);
1337 if (addr_str == NULL) {
1338 connection_mode = GST_SRT_CONNECTION_MODE_LISTENER;
1339 addr_str = GST_SRT_DEFAULT_LOCALADDRESS;
1340 GST_DEBUG_OBJECT (srtobject->element,
1341 "Given uri doesn't have hostname or address. Use any (%s) and"
1342 " setting listener mode", addr_str);
1343 }
1344
1345 port = gst_uri_get_port (srtobject->uri);
1346
1347 GST_DEBUG_OBJECT (srtobject->element,
1348 "Opening SRT socket with parameters: %" GST_PTR_FORMAT,
1349 srtobject->parameters);
1350
1351 GST_OBJECT_UNLOCK (srtobject->element);
1352
1353 socket_address =
1354 gst_srt_object_resolve (srtobject, addr_str, port, cancellable, error);
1355 if (socket_address == NULL) {
1356 goto out;
1357 }
1358
1359 sa_len = g_socket_address_get_native_size (socket_address);
1360 sa = g_alloca (sa_len);
1361
1362 if (!g_socket_address_to_native (socket_address, sa, sa_len, error)) {
1363 goto out;
1364 }
1365
1366 srtobject->listener_poll_id = srt_epoll_create ();
1367
1368 ret =
1369 gst_srt_object_open_connection
1370 (srtobject, cancellable, connection_mode, sa, sa_len, error);
1371
1372 GST_OBJECT_LOCK (srtobject->element);
1373 srtobject->opened = ret;
1374 GST_OBJECT_UNLOCK (srtobject->element);
1375
1376 out:
1377 g_clear_object (&socket_address);
1378
1379 return ret;
1380 }
1381
1382 gboolean
gst_srt_object_open(GstSRTObject * srtobject,GCancellable * cancellable,GError ** error)1383 gst_srt_object_open (GstSRTObject * srtobject, GCancellable * cancellable,
1384 GError ** error)
1385 {
1386 srtobject->previous_bytes = 0;
1387
1388 return gst_srt_object_open_internal (srtobject, cancellable, error);
1389 }
1390
1391 void
gst_srt_object_close(GstSRTObject * srtobject)1392 gst_srt_object_close (GstSRTObject * srtobject)
1393 {
1394 g_mutex_lock (&srtobject->sock_lock);
1395
1396 if (srtobject->sock != SRT_INVALID_SOCK) {
1397 GstStructure *stats;
1398
1399 if (srtobject->poll_id != SRT_ERROR) {
1400 srt_epoll_remove_usock (srtobject->poll_id, srtobject->sock);
1401 }
1402
1403 stats = gst_srt_object_accumulate_stats (srtobject, srtobject->sock);
1404
1405 /* FIXME: These are the final statistics for the socket before we close it.
1406 * Deliver the stats to the app before we throw them away. */
1407 gst_structure_free (stats);
1408
1409 GST_DEBUG_OBJECT (srtobject->element, "Closing SRT socket (0x%x)",
1410 srtobject->sock);
1411
1412 srt_close (srtobject->sock);
1413 srtobject->sock = SRT_INVALID_SOCK;
1414 }
1415
1416 if (srtobject->listener_poll_id != SRT_ERROR) {
1417 if (srtobject->listener_sock != SRT_INVALID_SOCK) {
1418 srt_epoll_remove_usock (srtobject->listener_poll_id,
1419 srtobject->listener_sock);
1420 }
1421 srt_epoll_release (srtobject->listener_poll_id);
1422 srtobject->listener_poll_id = SRT_ERROR;
1423 }
1424
1425 if (srtobject->thread) {
1426 GThread *thread = g_steal_pointer (&srtobject->thread);
1427 g_mutex_unlock (&srtobject->sock_lock);
1428 g_thread_join (thread);
1429 g_mutex_lock (&srtobject->sock_lock);
1430 }
1431
1432 if (srtobject->listener_sock != SRT_INVALID_SOCK) {
1433 GST_DEBUG_OBJECT (srtobject->element, "Closing SRT listener socket (0x%x)",
1434 srtobject->listener_sock);
1435
1436 srt_close (srtobject->listener_sock);
1437 srtobject->listener_sock = SRT_INVALID_SOCK;
1438 }
1439
1440 if (srtobject->callers) {
1441 GList *callers = g_steal_pointer (&srtobject->callers);
1442 g_list_foreach (callers, (GFunc) srt_caller_signal_removed, srtobject);
1443 g_list_free_full (callers, (GDestroyNotify) srt_caller_free);
1444 }
1445
1446 g_mutex_unlock (&srtobject->sock_lock);
1447
1448 GST_OBJECT_LOCK (srtobject->element);
1449 srtobject->opened = FALSE;
1450 GST_OBJECT_UNLOCK (srtobject->element);
1451 }
1452
1453 static gboolean
gst_srt_object_wait_caller(GstSRTObject * srtobject,GCancellable * cancellable,GError ** error)1454 gst_srt_object_wait_caller (GstSRTObject * srtobject,
1455 GCancellable * cancellable, GError ** error)
1456 {
1457 gboolean ret;
1458
1459 g_mutex_lock (&srtobject->sock_lock);
1460
1461 if (srtobject->callers == NULL) {
1462 GST_INFO_OBJECT (srtobject->element, "Waiting for connection");
1463
1464 while (!g_cancellable_is_cancelled (cancellable)) {
1465 ret = (srtobject->callers != NULL);
1466 if (ret) {
1467 GST_DEBUG_OBJECT (srtobject->element, "Got a connection");
1468 break;
1469 }
1470
1471 g_cond_wait (&srtobject->sock_cond, &srtobject->sock_lock);
1472 }
1473 } else {
1474 ret = TRUE;
1475 }
1476
1477 g_mutex_unlock (&srtobject->sock_lock);
1478
1479 if (!ret) {
1480 g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_FAILED,
1481 "Canceled waiting for a connection.");
1482 }
1483
1484 return ret;
1485 }
1486
1487 gssize
gst_srt_object_read(GstSRTObject * srtobject,guint8 * data,gsize size,GCancellable * cancellable,GError ** error,SRT_MSGCTRL * mctrl)1488 gst_srt_object_read (GstSRTObject * srtobject,
1489 guint8 * data, gsize size, GCancellable * cancellable, GError ** error,
1490 SRT_MSGCTRL * mctrl)
1491 {
1492 gssize len = 0;
1493 gint poll_timeout;
1494 GstSRTConnectionMode connection_mode = GST_SRT_CONNECTION_MODE_NONE;
1495 gint poll_id = SRT_ERROR;
1496
1497 /* Only source element can read data */
1498 g_return_val_if_fail (gst_uri_handler_get_uri_type (GST_URI_HANDLER
1499 (srtobject->element)) == GST_URI_SRC, -1);
1500
1501 GST_OBJECT_LOCK (srtobject->element);
1502
1503 gst_structure_get_enum (srtobject->parameters, "mode",
1504 GST_TYPE_SRT_CONNECTION_MODE, (gint *) & connection_mode);
1505
1506 if (!gst_structure_get_int (srtobject->parameters, "poll-timeout",
1507 &poll_timeout)) {
1508 poll_timeout = GST_SRT_DEFAULT_POLL_TIMEOUT;
1509 }
1510
1511 GST_OBJECT_UNLOCK (srtobject->element);
1512
1513 if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
1514 if (!gst_srt_object_wait_caller (srtobject, cancellable, error))
1515 return -1;
1516
1517 g_mutex_lock (&srtobject->sock_lock);
1518 if (srtobject->callers) {
1519 SRTCaller *caller = srtobject->callers->data;
1520 poll_id = caller->poll_id;
1521 }
1522 g_mutex_unlock (&srtobject->sock_lock);
1523
1524 if (poll_id == SRT_ERROR)
1525 return 0;
1526 } else {
1527 poll_id = srtobject->poll_id;
1528 }
1529
1530 while (!g_cancellable_is_cancelled (cancellable)) {
1531
1532 SRTSOCKET rsock;
1533 gint rsocklen = 1;
1534 SRTSOCKET wsock;
1535 gint wsocklen = 1;
1536
1537 if (srt_epoll_wait (poll_id, &rsock, &rsocklen, &wsock, &wsocklen,
1538 poll_timeout, NULL, 0, NULL, 0) < 0) {
1539 gint srt_errno = srt_getlasterror (NULL);
1540
1541 if (srt_errno != SRT_ETIMEOUT) {
1542 return 0;
1543 }
1544 continue;
1545 }
1546
1547 if (wsocklen == 1 && rsocklen == 1) {
1548 /* Socket reported in wsock AND rsock signifies an error. */
1549 gint reason = srt_getrejectreason (wsock);
1550 gboolean is_auth_error = (reason == SRT_REJ_BADSECRET
1551 || reason == SRT_REJ_UNSECURE);
1552
1553 if (is_auth_error) {
1554 ELEMENT_WARNING_SRTSOCK_ERROR (NOT_AUTHORIZED, reason);
1555 }
1556
1557 if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
1558 /* Caller has disappeared. */
1559 return 0;
1560 } else {
1561 if (!is_auth_error) {
1562 ELEMENT_WARNING_SRTSOCK_ERROR (READ, reason);
1563 }
1564
1565 gst_srt_object_close (srtobject);
1566 if (!gst_srt_object_open_internal (srtobject, cancellable, error)) {
1567 return -1;
1568 }
1569 }
1570 continue;
1571 }
1572
1573
1574 srt_msgctrl_init (mctrl);
1575 len = srt_recvmsg2 (rsock, (char *) (data), size, mctrl);
1576
1577 if (len == SRT_ERROR) {
1578 gint srt_errno = srt_getlasterror (NULL);
1579 if (srt_errno == SRT_EASYNCRCV) {
1580 continue;
1581 } else {
1582 g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_READ,
1583 "Failed to receive from SRT socket: %s", srt_getlasterror_str ());
1584 return -1;
1585 }
1586 }
1587 break;
1588 }
1589
1590 return len;
1591 }
1592
1593 void
gst_srt_object_wakeup(GstSRTObject * srtobject,GCancellable * cancellable)1594 gst_srt_object_wakeup (GstSRTObject * srtobject, GCancellable * cancellable)
1595 {
1596 GST_DEBUG_OBJECT (srtobject->element, "waking up SRT");
1597
1598 /* Removing all socket descriptors from the monitoring list
1599 * wakes up SRT's threads. We only have one to remove. */
1600 if (srtobject->sock != SRT_INVALID_SOCK && srtobject->poll_id != SRT_ERROR) {
1601 srt_epoll_remove_usock (srtobject->poll_id, srtobject->sock);
1602 }
1603
1604 /* connection is only waited for in listener mode,
1605 * but there is no harm in raising signal in any case */
1606 g_mutex_lock (&srtobject->sock_lock);
1607 /* however, a race might be harmful ...
1608 * the cancellation is used as 'flushing' flag here,
1609 * so make sure it is so detected by the intended part at proper time */
1610 g_cancellable_cancel (cancellable);
1611 g_cond_signal (&srtobject->sock_cond);
1612 g_mutex_unlock (&srtobject->sock_lock);
1613 }
1614
1615 static gboolean
gst_srt_object_send_headers(GstSRTObject * srtobject,SRTSOCKET sock,gint poll_id,gint poll_timeout,GstBufferList * headers,GCancellable * cancellable)1616 gst_srt_object_send_headers (GstSRTObject * srtobject, SRTSOCKET sock,
1617 gint poll_id, gint poll_timeout, GstBufferList * headers,
1618 GCancellable * cancellable)
1619 {
1620 guint size, i;
1621
1622 if (!headers)
1623 return TRUE;
1624
1625 size = gst_buffer_list_length (headers);
1626
1627 GST_DEBUG_OBJECT (srtobject->element, "Sending %u stream headers", size);
1628
1629 for (i = 0; i < size; i++) {
1630 SRTSOCKET wsock = sock;
1631 gint wsocklen = 1;
1632
1633 GstBuffer *buffer = gst_buffer_list_get (headers, i);
1634 GstMapInfo mapinfo;
1635
1636 if (g_cancellable_is_cancelled (cancellable)) {
1637 return FALSE;
1638 }
1639
1640 if (poll_id > 0 && srt_epoll_wait (poll_id, 0, 0, &wsock,
1641 &wsocklen, poll_timeout, NULL, 0, NULL, 0) < 0) {
1642 continue;
1643 }
1644
1645 GST_TRACE_OBJECT (srtobject->element, "sending header %u %" GST_PTR_FORMAT,
1646 i, buffer);
1647
1648 if (!gst_buffer_map (buffer, &mapinfo, GST_MAP_READ)) {
1649 GST_ELEMENT_ERROR (srtobject->element, RESOURCE, READ,
1650 ("Could not map the input stream"), (NULL));
1651 return FALSE;
1652 }
1653
1654 if (srt_sendmsg2 (wsock, (char *) mapinfo.data, mapinfo.size,
1655 0) == SRT_ERROR) {
1656 GST_ELEMENT_ERROR (srtobject->element, RESOURCE, WRITE, NULL,
1657 ("%s", srt_getlasterror_str ()));
1658 gst_buffer_unmap (buffer, &mapinfo);
1659 return FALSE;
1660 }
1661
1662 gst_buffer_unmap (buffer, &mapinfo);
1663 }
1664
1665 return TRUE;
1666 }
1667
1668 static gssize
gst_srt_object_write_to_callers(GstSRTObject * srtobject,GstBufferList * headers,const GstMapInfo * mapinfo,GCancellable * cancellable,GError ** error)1669 gst_srt_object_write_to_callers (GstSRTObject * srtobject,
1670 GstBufferList * headers,
1671 const GstMapInfo * mapinfo, GCancellable * cancellable, GError ** error)
1672 {
1673 GList *callers;
1674
1675 g_mutex_lock (&srtobject->sock_lock);
1676 callers = srtobject->callers;
1677 while (callers != NULL) {
1678 gssize len = 0;
1679 const guint8 *msg = mapinfo->data;
1680 gint sent;
1681 gint payload_size, optlen = sizeof (payload_size);
1682
1683 SRTCaller *caller = callers->data;
1684 callers = callers->next;
1685
1686 if (g_cancellable_is_cancelled (cancellable)) {
1687 goto cancelled;
1688 }
1689
1690 if (!caller->sent_headers) {
1691 if (!gst_srt_object_send_headers (srtobject, caller->sock, -1,
1692 -1, headers, cancellable)) {
1693 goto err;
1694 }
1695 caller->sent_headers = TRUE;
1696 }
1697
1698 if (srt_getsockflag (caller->sock, SRTO_PAYLOADSIZE, &payload_size,
1699 &optlen)) {
1700 GST_WARNING_OBJECT (srtobject->element, "%s", srt_getlasterror_str ());
1701 goto err;
1702 }
1703
1704 while (len < mapinfo->size) {
1705 gint rest = MIN (mapinfo->size - len, payload_size);
1706 sent = srt_sendmsg2 (caller->sock, (char *) (msg + len), rest, 0);
1707 if (sent < 0) {
1708 GST_WARNING_OBJECT (srtobject->element, "Dropping caller %d: %s",
1709 caller->sock, srt_getlasterror_str ());
1710 goto err;
1711 }
1712 len += sent;
1713 }
1714
1715 continue;
1716
1717 err:
1718 srtobject->callers = g_list_remove (srtobject->callers, caller);
1719 srt_caller_signal_removed (caller, srtobject);
1720 srt_caller_free (caller);
1721 }
1722
1723 g_mutex_unlock (&srtobject->sock_lock);
1724 return mapinfo->size;
1725
1726 cancelled:
1727 g_mutex_unlock (&srtobject->sock_lock);
1728 return -1;
1729 }
1730
1731 static gssize
gst_srt_object_write_one(GstSRTObject * srtobject,GstBufferList * headers,const GstMapInfo * mapinfo,GCancellable * cancellable,GError ** error)1732 gst_srt_object_write_one (GstSRTObject * srtobject,
1733 GstBufferList * headers,
1734 const GstMapInfo * mapinfo, GCancellable * cancellable, GError ** error)
1735 {
1736 gssize len = 0;
1737 gint poll_timeout;
1738 const guint8 *msg = mapinfo->data;
1739 gint payload_size, optlen = sizeof (payload_size);
1740 gboolean wait_for_connection;
1741
1742 GST_OBJECT_LOCK (srtobject->element);
1743 wait_for_connection = srtobject->wait_for_connection;
1744 if (!gst_structure_get_int (srtobject->parameters, "poll-timeout",
1745 &poll_timeout)) {
1746 poll_timeout = GST_SRT_DEFAULT_POLL_TIMEOUT;
1747 }
1748 GST_OBJECT_UNLOCK (srtobject->element);
1749
1750 if (!srtobject->sent_headers) {
1751 if (!gst_srt_object_send_headers (srtobject, srtobject->sock,
1752 srtobject->poll_id, poll_timeout, headers, cancellable)) {
1753 return -1;
1754 }
1755 srtobject->sent_headers = TRUE;
1756 }
1757
1758 while (len < mapinfo->size) {
1759 SRTSOCKET rsock;
1760 gint rsocklen = 1;
1761 SRTSOCKET wsock;
1762 gint wsocklen = 1;
1763
1764 gint sent;
1765 gint rest;
1766
1767 if (g_cancellable_is_cancelled (cancellable)) {
1768 break;
1769 }
1770
1771 if (!wait_for_connection &&
1772 srt_getsockstate (srtobject->sock) == SRTS_CONNECTING) {
1773 GST_LOG_OBJECT (srtobject->element,
1774 "Not connected yet. Dropping the buffer.");
1775 break;
1776 }
1777
1778 if (srt_epoll_wait (srtobject->poll_id, &rsock, &rsocklen, &wsock,
1779 &wsocklen, poll_timeout, NULL, 0, NULL, 0) < 0) {
1780 continue;
1781 }
1782
1783 if (wsocklen == 1 && rsocklen == 1) {
1784 /* Socket reported in wsock AND rsock signifies an error. */
1785 gint reason = srt_getrejectreason (wsock);
1786
1787 if (reason == SRT_REJ_BADSECRET || reason == SRT_REJ_UNSECURE) {
1788 ELEMENT_WARNING_SRTSOCK_ERROR (NOT_AUTHORIZED, reason);
1789 } else {
1790 ELEMENT_WARNING_SRTSOCK_ERROR (WRITE, reason);
1791 }
1792
1793 gst_srt_object_close (srtobject);
1794 if (!gst_srt_object_open_internal (srtobject, cancellable, error)) {
1795 return -1;
1796 }
1797 continue;
1798 }
1799
1800 if (srt_getsockflag (wsock, SRTO_PAYLOADSIZE, &payload_size, &optlen)) {
1801 GST_ELEMENT_ERROR (srtobject->element, RESOURCE, WRITE, NULL,
1802 ("%s", srt_getlasterror_str ()));
1803 break;
1804 }
1805
1806 rest = MIN (mapinfo->size - len, payload_size);
1807
1808 sent = srt_sendmsg2 (wsock, (char *) (msg + len), rest, 0);
1809 if (sent < 0) {
1810 GST_ELEMENT_ERROR (srtobject->element, RESOURCE, WRITE, NULL,
1811 ("%s", srt_getlasterror_str ()));
1812 break;
1813 }
1814 len += sent;
1815 }
1816
1817 return len;
1818 }
1819
1820 gssize
gst_srt_object_write(GstSRTObject * srtobject,GstBufferList * headers,const GstMapInfo * mapinfo,GCancellable * cancellable,GError ** error)1821 gst_srt_object_write (GstSRTObject * srtobject,
1822 GstBufferList * headers,
1823 const GstMapInfo * mapinfo, GCancellable * cancellable, GError ** error)
1824 {
1825 gssize len = 0;
1826 GstSRTConnectionMode connection_mode = GST_SRT_CONNECTION_MODE_NONE;
1827 gboolean wait_for_connection;
1828
1829 /* Only sink element can write data */
1830 g_return_val_if_fail (gst_uri_handler_get_uri_type (GST_URI_HANDLER
1831 (srtobject->element)) == GST_URI_SINK, -1);
1832
1833 GST_OBJECT_LOCK (srtobject->element);
1834 gst_structure_get_enum (srtobject->parameters, "mode",
1835 GST_TYPE_SRT_CONNECTION_MODE, (gint *) & connection_mode);
1836 wait_for_connection = srtobject->wait_for_connection;
1837 GST_OBJECT_UNLOCK (srtobject->element);
1838
1839 if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
1840 if (wait_for_connection) {
1841 if (!gst_srt_object_wait_caller (srtobject, cancellable, error))
1842 return -1;
1843 }
1844 len =
1845 gst_srt_object_write_to_callers (srtobject, headers, mapinfo,
1846 cancellable, error);
1847 } else {
1848 len =
1849 gst_srt_object_write_one (srtobject, headers, mapinfo, cancellable,
1850 error);
1851 }
1852
1853 return len;
1854 }
1855
1856 static GstStructure *
get_stats_for_srtsock(SRTSOCKET srtsock,gboolean is_sender,guint64 * bytes)1857 get_stats_for_srtsock (SRTSOCKET srtsock, gboolean is_sender, guint64 * bytes)
1858 {
1859 GstStructure *s = gst_structure_new_empty ("application/x-srt-statistics");
1860 int ret;
1861 SRT_TRACEBSTATS stats;
1862
1863 ret = srt_bstats (srtsock, &stats, 0);
1864
1865 if (ret >= 0) {
1866 if (is_sender) {
1867 gst_structure_set (s,
1868 /* number of sent data packets, including retransmissions */
1869 "packets-sent", G_TYPE_INT64, stats.pktSent,
1870 /* number of lost packets (sender side) */
1871 "packets-sent-lost", G_TYPE_INT, stats.pktSndLoss,
1872 /* number of retransmitted packets */
1873 "packets-retransmitted", G_TYPE_INT, stats.pktRetrans,
1874 /* number of received ACK packets */
1875 "packet-ack-received", G_TYPE_INT, stats.pktRecvACK,
1876 /* number of received NAK packets */
1877 "packet-nack-received", G_TYPE_INT, stats.pktRecvNAK,
1878 /* time duration when UDT is sending data (idle time exclusive) */
1879 "send-duration-us", G_TYPE_INT64, stats.usSndDuration,
1880 /* number of sent data bytes, including retransmissions */
1881 "bytes-sent", G_TYPE_UINT64, stats.byteSent,
1882 /* number of retransmitted bytes */
1883 "bytes-retransmitted", G_TYPE_UINT64, stats.byteRetrans,
1884 /* number of too-late-to-send dropped bytes */
1885 "bytes-sent-dropped", G_TYPE_UINT64, stats.byteSndDrop,
1886 /* number of too-late-to-send dropped packets */
1887 "packets-sent-dropped", G_TYPE_INT, stats.pktSndDrop,
1888 /* sending rate in Mb/s */
1889 "send-rate-mbps", G_TYPE_DOUBLE, stats.mbpsSendRate,
1890 /* busy sending time (i.e., idle time exclusive) */
1891 "send-duration-us", G_TYPE_UINT64, stats.usSndDuration,
1892 "negotiated-latency-ms", G_TYPE_INT, stats.msSndTsbPdDelay, NULL);
1893 *bytes += stats.byteSent;
1894 } else {
1895 gst_structure_set (s,
1896 "packets-received", G_TYPE_INT64, stats.pktRecvTotal,
1897 "packets-received-lost", G_TYPE_INT, stats.pktRcvLossTotal,
1898 /* number of sent ACK packets */
1899 "packet-ack-sent", G_TYPE_INT, stats.pktSentACK,
1900 /* number of sent NAK packets */
1901 "packet-nack-sent", G_TYPE_INT, stats.pktSentNAK,
1902 "bytes-received", G_TYPE_UINT64, stats.byteRecvTotal,
1903 "bytes-received-lost", G_TYPE_UINT64, stats.byteRcvLossTotal,
1904 "receive-rate-mbps", G_TYPE_DOUBLE, stats.mbpsRecvRate,
1905 "negotiated-latency-ms", G_TYPE_INT, stats.msRcvTsbPdDelay, NULL);
1906 *bytes += stats.byteRecvTotal;
1907 }
1908
1909 gst_structure_set (s,
1910 /* estimated bandwidth, in Mb/s */
1911 "bandwidth-mbps", G_TYPE_DOUBLE, stats.mbpsBandwidth,
1912 "rtt-ms", G_TYPE_DOUBLE, stats.msRTT, NULL);
1913
1914 }
1915
1916 return s;
1917 }
1918
1919 GstStructure *
gst_srt_object_get_stats(GstSRTObject * srtobject)1920 gst_srt_object_get_stats (GstSRTObject * srtobject)
1921 {
1922 GstStructure *s = NULL;
1923 gboolean is_sender = GST_IS_BASE_SINK (srtobject->element);
1924 guint64 bytes;
1925
1926 g_mutex_lock (&srtobject->sock_lock);
1927
1928 bytes = srtobject->previous_bytes;
1929
1930 if (srtobject->sock != SRT_INVALID_SOCK) {
1931 s = get_stats_for_srtsock (srtobject->sock, is_sender, &bytes);
1932 goto done;
1933 }
1934
1935 s = gst_structure_new_empty ("application/x-srt-statistics");
1936
1937 if (srtobject->callers) {
1938 GValueArray *callers_stats = g_value_array_new (1);
1939 GValue callers_stats_v = G_VALUE_INIT;
1940 GList *item;
1941
1942 for (item = srtobject->callers; item; item = item->next) {
1943 SRTCaller *caller = item->data;
1944 GstStructure *tmp;
1945 GValue *v;
1946
1947 tmp = get_stats_for_srtsock (caller->sock, is_sender, &bytes);
1948
1949 gst_structure_set (tmp, "caller-address", G_TYPE_SOCKET_ADDRESS,
1950 caller->sockaddr, NULL);
1951
1952 g_value_array_append (callers_stats, NULL);
1953 v = g_value_array_get_nth (callers_stats, callers_stats->n_values - 1);
1954 g_value_init (v, GST_TYPE_STRUCTURE);
1955 g_value_take_boxed (v, tmp);
1956 }
1957
1958 g_value_init (&callers_stats_v, G_TYPE_VALUE_ARRAY);
1959 g_value_take_boxed (&callers_stats_v, callers_stats);
1960 gst_structure_take_value (s, "callers", &callers_stats_v);
1961 }
1962
1963 done:
1964 gst_structure_set (s, is_sender ? "bytes-sent-total" : "bytes-received-total",
1965 G_TYPE_UINT64, bytes, NULL);
1966
1967 g_mutex_unlock (&srtobject->sock_lock);
1968
1969 return s;
1970 }
1971
1972 static GstStructure *
gst_srt_object_accumulate_stats(GstSRTObject * srtobject,SRTSOCKET srtsock)1973 gst_srt_object_accumulate_stats (GstSRTObject * srtobject, SRTSOCKET srtsock)
1974 {
1975 gboolean is_sender = GST_IS_BASE_SINK (srtobject->element);
1976 GstStructure *stats;
1977 guint64 bytes = 0;
1978
1979 stats = get_stats_for_srtsock (srtsock, is_sender, &bytes);
1980 srtobject->previous_bytes += bytes;
1981
1982 return stats;
1983 }
1984