1 /* GStreamer
2 * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3 * Copyright (C) <2009> Jarkko Palviainen <jarkko.palviainen@sesca.com>
4 * Copyright (C) <2012> Collabora Ltd.
5 * Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
6 *
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Library General Public
9 * License as published by the Free Software Foundation; either
10 * version 2 of the License, or (at your option) any later version.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Library General Public License for more details.
16 *
17 * You should have received a copy of the GNU Library General Public
18 * License along with this library; if not, write to the
19 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20 * Boston, MA 02110-1301, USA.
21 */
22
23 /**
24 * SECTION:element-multiudpsink
25 * @title: multiudpsink
26 * @see_also: udpsink, multifdsink
27 *
28 * multiudpsink is a network sink that sends UDP packets to multiple
29 * clients.
30 * It can be combined with rtp payload encoders to implement RTP streaming.
31 */
32
33 #ifdef HAVE_CONFIG_H
34 #include "config.h"
35 #endif
36 #include "gstudpelements.h"
37 #include "gstmultiudpsink.h"
38
39 #include <string.h>
40
41 #ifdef HAVE_SYS_SOCKET_H
42 #include <sys/socket.h>
43 #endif
44
45 #include <gio/gnetworking.h>
46
47 #include "gst/net/net.h"
48 #include "gst/glib-compat-private.h"
49
/* Debug category for this element; GST_CAT_DEFAULT makes it the default
 * category for the GST_* logging macros used in this file. It is initialised
 * at the end of gst_multiudpsink_class_init(). */
GST_DEBUG_CATEGORY_STATIC (multiudpsink_debug);
#define GST_CAT_DEFAULT (multiudpsink_debug)

/* Largest UDP payload that fits into a single IPv4 datagram:
 * 65535 - 20 (IPv4 header) - 8 (UDP header). Used when reporting send errors
 * to single out messages that are simply too large. */
#define UDP_MAX_SIZE 65507

/* The element has a single, always present sink pad accepting any caps. */
static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
    GST_PAD_SINK,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS_ANY);
59
/* MultiUDPSink signals and args.
 *
 * The values index gst_multiudpsink_signals[]; LAST_SIGNAL is the size of
 * that table and must stay last. */
enum
{
  /* methods (registered with G_SIGNAL_ACTION in class_init) */
  SIGNAL_ADD,
  SIGNAL_REMOVE,
  SIGNAL_CLEAR,
  SIGNAL_GET_STATS,

  /* signals (plain notifications, registered without G_SIGNAL_ACTION) */
  SIGNAL_CLIENT_ADDED,
  SIGNAL_CLIENT_REMOVED,

  /* FILL ME */
  LAST_SIGNAL
};
76
/* Default values for the element's properties. They are used as the
 * advertised defaults when the properties are installed in class_init and/or
 * as the initial field values in gst_multiudpsink_init(). */
#define DEFAULT_SOCKET             NULL
#define DEFAULT_CLOSE_SOCKET       TRUE
#define DEFAULT_USED_SOCKET        NULL
#define DEFAULT_CLIENTS            NULL
/* FIXME, this should be disabled by default, we don't need to join a multicast
 * group for sending, if this socket is also used for receiving, it should
 * be configured in the element that does the receive. */
#define DEFAULT_AUTO_MULTICAST     TRUE
#define DEFAULT_MULTICAST_IFACE    NULL
#define DEFAULT_TTL                64
#define DEFAULT_TTL_MC             1
#define DEFAULT_LOOP               TRUE
#define DEFAULT_FORCE_IPV4         FALSE
/* -1 means "leave the socket's DSCP untouched"
 * (see gst_multiudpsink_setup_qos_dscp()) */
#define DEFAULT_QOS_DSCP           -1
#define DEFAULT_SEND_DUPLICATES    TRUE
/* 0 means "use the default kernel send buffer size" per the property blurb */
#define DEFAULT_BUFFER_SIZE        0
#define DEFAULT_BIND_ADDRESS       NULL
#define DEFAULT_BIND_PORT          0
95
/* GObject property IDs, passed to g_object_class_install_property() in
 * class_init and received as prop_id by the set/get property handlers.
 * PROP_0 is a placeholder so that the real IDs start at 1. */
enum
{
  PROP_0,
  PROP_BYTES_TO_SERVE,
  PROP_BYTES_SERVED,
  PROP_SOCKET,
  PROP_SOCKET_V6,
  PROP_CLOSE_SOCKET,
  PROP_USED_SOCKET,
  PROP_USED_SOCKET_V6,
  PROP_CLIENTS,
  PROP_AUTO_MULTICAST,
  PROP_MULTICAST_IFACE,
  PROP_TTL,
  PROP_TTL_MC,
  PROP_LOOP,
  PROP_FORCE_IPV4,
  PROP_QOS_DSCP,
  PROP_SEND_DUPLICATES,
  PROP_BUFFER_SIZE,
  PROP_BIND_ADDRESS,
  PROP_BIND_PORT
};
119
/* GObject vmethod: releases everything owned by the instance */
static void gst_multiudpsink_finalize (GObject * object);

/* GstBaseSink vmethods that push a single buffer / a buffer list to all
 * clients */
static GstFlowReturn gst_multiudpsink_render (GstBaseSink * sink,
    GstBuffer * buffer);
static GstFlowReturn gst_multiudpsink_render_list (GstBaseSink * bsink,
    GstBufferList * buffer_list);

/* GstBaseSink vmethods for start/stop and for unlocking */
static gboolean gst_multiudpsink_start (GstBaseSink * bsink);
static gboolean gst_multiudpsink_stop (GstBaseSink * bsink);
static gboolean gst_multiudpsink_unlock (GstBaseSink * bsink);
static gboolean gst_multiudpsink_unlock_stop (GstBaseSink * bsink);

/* GObject property accessors */
static void gst_multiudpsink_set_property (GObject * object, guint prop_id,
    const GValue * value, GParamSpec * pspec);
static void gst_multiudpsink_get_property (GObject * object, guint prop_id,
    GValue * value, GParamSpec * pspec);

/* Internal add/clear helpers; callers in this file pass lock = FALSE when
 * they already hold client_lock (see gst_multiudpsink_set_clients_string()) */
static void gst_multiudpsink_add_internal (GstMultiUDPSink * sink,
    const gchar * host, gint port, gboolean lock);
static void gst_multiudpsink_clear_internal (GstMultiUDPSink * sink,
    gboolean lock);

/* Signal IDs returned by g_signal_new(), indexed by the SIGNAL_* enum */
static guint gst_multiudpsink_signals[LAST_SIGNAL] = { 0 };

/* G_DEFINE_TYPE generates gst_multiudpsink_parent_class; the alias lets the
 * rest of the file refer to it as parent_class (used when chaining up in
 * finalize). The type derives from GstBaseSink. */
#define gst_multiudpsink_parent_class parent_class
G_DEFINE_TYPE (GstMultiUDPSink, gst_multiudpsink, GST_TYPE_BASE_SINK);
/* Element registration under the name "multiudpsink" with rank NONE;
 * udp_element_init (plugin) is supplied as the extra registration code. */
GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (multiudpsink, "multiudpsink",
    GST_RANK_NONE, GST_TYPE_MULTIUDPSINK, udp_element_init (plugin));
148
149 static void
gst_multiudpsink_class_init(GstMultiUDPSinkClass * klass)150 gst_multiudpsink_class_init (GstMultiUDPSinkClass * klass)
151 {
152 GObjectClass *gobject_class;
153 GstElementClass *gstelement_class;
154 GstBaseSinkClass *gstbasesink_class;
155
156 gobject_class = (GObjectClass *) klass;
157 gstelement_class = (GstElementClass *) klass;
158 gstbasesink_class = (GstBaseSinkClass *) klass;
159
160 gobject_class->set_property = gst_multiudpsink_set_property;
161 gobject_class->get_property = gst_multiudpsink_get_property;
162 gobject_class->finalize = gst_multiudpsink_finalize;
163
164 /**
165 * GstMultiUDPSink::add:
166 * @gstmultiudpsink: the sink on which the signal is emitted
167 * @host: the hostname/IP address of the client to add
168 * @port: the port of the client to add
169 *
170 * Add a client with destination @host and @port to the list of
171 * clients. When the same host/port pair is added multiple times, the
172 * send-duplicates property defines if the packets are sent multiple times to
173 * the same host/port pair or not.
174 *
175 * When a host/port pair is added multiple times, an equal amount of remove
176 * calls must be performed to actually remove the host/port pair from the list
177 * of destinations.
178 */
179 gst_multiudpsink_signals[SIGNAL_ADD] =
180 g_signal_new ("add", G_TYPE_FROM_CLASS (klass),
181 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
182 G_STRUCT_OFFSET (GstMultiUDPSinkClass, add),
183 NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_STRING, G_TYPE_INT);
184 /**
185 * GstMultiUDPSink::remove:
186 * @gstmultiudpsink: the sink on which the signal is emitted
187 * @host: the hostname/IP address of the client to remove
188 * @port: the port of the client to remove
189 *
190 * Remove the client with destination @host and @port from the list of
191 * clients.
192 */
193 gst_multiudpsink_signals[SIGNAL_REMOVE] =
194 g_signal_new ("remove", G_TYPE_FROM_CLASS (klass),
195 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
196 G_STRUCT_OFFSET (GstMultiUDPSinkClass, remove),
197 NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_STRING, G_TYPE_INT);
198 /**
199 * GstMultiUDPSink::clear:
200 * @gstmultiudpsink: the sink on which the signal is emitted
201 *
202 * Clear the list of clients.
203 */
204 gst_multiudpsink_signals[SIGNAL_CLEAR] =
205 g_signal_new ("clear", G_TYPE_FROM_CLASS (klass),
206 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
207 G_STRUCT_OFFSET (GstMultiUDPSinkClass, clear), NULL, NULL, NULL,
208 G_TYPE_NONE, 0);
209 /**
210 * GstMultiUDPSink::get-stats:
211 * @gstmultiudpsink: the sink on which the signal is emitted
212 * @host: the hostname/IP address of the client to get stats on
213 * @port: the port of the client to get stats on
214 *
215 * Get the statistics of the client with destination @host and @port.
216 *
217 * Returns: a GstStructure: bytes_sent, packets_sent, connect_time
218 * (in epoch nanoseconds), disconnect_time (in epoch
219 * nanoseconds)
220 */
221 gst_multiudpsink_signals[SIGNAL_GET_STATS] =
222 g_signal_new ("get-stats", G_TYPE_FROM_CLASS (klass),
223 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
224 G_STRUCT_OFFSET (GstMultiUDPSinkClass, get_stats),
225 NULL, NULL, NULL, GST_TYPE_STRUCTURE, 2, G_TYPE_STRING, G_TYPE_INT);
226 /**
227 * GstMultiUDPSink::client-added:
228 * @gstmultiudpsink: the sink emitting the signal
229 * @host: the hostname/IP address of the added client
230 * @port: the port of the added client
231 *
232 * Signal emitted when a new client is added to the list of
233 * clients.
234 */
235 gst_multiudpsink_signals[SIGNAL_CLIENT_ADDED] =
236 g_signal_new ("client-added", G_TYPE_FROM_CLASS (klass),
237 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiUDPSinkClass, client_added),
238 NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_STRING, G_TYPE_INT);
239 /**
240 * GstMultiUDPSink::client-removed:
241 * @gstmultiudpsink: the sink emitting the signal
242 * @host: the hostname/IP address of the removed client
243 * @port: the port of the removed client
244 *
245 * Signal emitted when a client is removed from the list of
246 * clients.
247 */
248 gst_multiudpsink_signals[SIGNAL_CLIENT_REMOVED] =
249 g_signal_new ("client-removed", G_TYPE_FROM_CLASS (klass),
250 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiUDPSinkClass,
251 client_removed), NULL, NULL, NULL,
252 G_TYPE_NONE, 2, G_TYPE_STRING, G_TYPE_INT);
253
254 g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BYTES_TO_SERVE,
255 g_param_spec_uint64 ("bytes-to-serve", "Bytes to serve",
256 "Number of bytes received to serve to clients", 0, G_MAXUINT64, 0,
257 G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
258 g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BYTES_SERVED,
259 g_param_spec_uint64 ("bytes-served", "Bytes served",
260 "Total number of bytes sent to all clients", 0, G_MAXUINT64, 0,
261 G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
262 g_object_class_install_property (gobject_class, PROP_SOCKET,
263 g_param_spec_object ("socket", "Socket Handle",
264 "Socket to use for UDP sending. (NULL == allocate)",
265 G_TYPE_SOCKET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
266 g_object_class_install_property (gobject_class, PROP_SOCKET_V6,
267 g_param_spec_object ("socket-v6", "Socket Handle IPv6",
268 "Socket to use for UDPv6 sending. (NULL == allocate)",
269 G_TYPE_SOCKET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
270 g_object_class_install_property (gobject_class, PROP_CLOSE_SOCKET,
271 g_param_spec_boolean ("close-socket", "Close socket",
272 "Close socket if passed as property on state change",
273 DEFAULT_CLOSE_SOCKET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
274 g_object_class_install_property (gobject_class, PROP_USED_SOCKET,
275 g_param_spec_object ("used-socket", "Used Socket Handle",
276 "Socket currently in use for UDP sending. (NULL == no socket)",
277 G_TYPE_SOCKET, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
278 g_object_class_install_property (gobject_class, PROP_USED_SOCKET_V6,
279 g_param_spec_object ("used-socket-v6", "Used Socket Handle IPv6",
280 "Socket currently in use for UDPv6 sending. (NULL == no socket)",
281 G_TYPE_SOCKET, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
282 g_object_class_install_property (gobject_class, PROP_CLIENTS,
283 g_param_spec_string ("clients", "Clients",
284 "A comma separated list of host:port pairs with destinations",
285 DEFAULT_CLIENTS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
286 g_object_class_install_property (gobject_class, PROP_AUTO_MULTICAST,
287 g_param_spec_boolean ("auto-multicast",
288 "Automatically join/leave multicast groups",
289 "Automatically join/leave the multicast groups, FALSE means user"
290 " has to do it himself", DEFAULT_AUTO_MULTICAST,
291 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
292 g_object_class_install_property (gobject_class, PROP_MULTICAST_IFACE,
293 g_param_spec_string ("multicast-iface", "Multicast Interface",
294 "The network interface on which to join the multicast group",
295 DEFAULT_MULTICAST_IFACE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
296 g_object_class_install_property (gobject_class, PROP_TTL,
297 g_param_spec_int ("ttl", "Unicast TTL",
298 "Used for setting the unicast TTL parameter",
299 0, 255, DEFAULT_TTL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
300 g_object_class_install_property (gobject_class, PROP_TTL_MC,
301 g_param_spec_int ("ttl-mc", "Multicast TTL",
302 "Used for setting the multicast TTL parameter",
303 0, 255, DEFAULT_TTL_MC, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
304 g_object_class_install_property (gobject_class, PROP_LOOP,
305 g_param_spec_boolean ("loop", "Multicast Loopback",
306 "Used for setting the multicast loop parameter. TRUE = enable,"
307 " FALSE = disable", DEFAULT_LOOP,
308 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
309 /**
310 * GstMultiUDPSink::force-ipv4:
311 *
312 * Force the use of an IPv4 socket.
313 *
314 * Since: 1.0.2
315 */
316 #ifndef GST_REMOVE_DEPRECATED
317 g_object_class_install_property (gobject_class, PROP_FORCE_IPV4,
318 g_param_spec_boolean ("force-ipv4", "Force IPv4",
319 "Forcing the use of an IPv4 socket (DEPRECATED, has no effect anymore)",
320 DEFAULT_FORCE_IPV4,
321 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED));
322 #endif
323 g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_QOS_DSCP,
324 g_param_spec_int ("qos-dscp", "QoS diff srv code point",
325 "Quality of Service, differentiated services code point (-1 default)",
326 -1, 63, DEFAULT_QOS_DSCP,
327 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
328 /**
329 * GstMultiUDPSink::send-duplicates:
330 *
331 * When a host/port pair is added multiple times, send the packet to the host
332 * multiple times as well.
333 */
334 g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_SEND_DUPLICATES,
335 g_param_spec_boolean ("send-duplicates", "Send Duplicates",
336 "When a distination/port pair is added multiple times, send packets "
337 "multiple times as well", DEFAULT_SEND_DUPLICATES,
338 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
339
340 g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BUFFER_SIZE,
341 g_param_spec_int ("buffer-size", "Buffer Size",
342 "Size of the kernel send buffer in bytes, 0=default", 0, G_MAXINT,
343 DEFAULT_BUFFER_SIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
344
345 g_object_class_install_property (gobject_class, PROP_BIND_ADDRESS,
346 g_param_spec_string ("bind-address", "Bind Address",
347 "Address to bind the socket to", DEFAULT_BIND_ADDRESS,
348 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
349 g_object_class_install_property (gobject_class, PROP_BIND_PORT,
350 g_param_spec_int ("bind-port", "Bind Port",
351 "Port to bind the socket to", 0, G_MAXUINT16,
352 DEFAULT_BIND_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
353
354 gst_element_class_add_static_pad_template (gstelement_class, &sink_template);
355
356 gst_element_class_set_static_metadata (gstelement_class, "UDP packet sender",
357 "Sink/Network",
358 "Send data over the network via UDP to one or multiple recipients "
359 "which can be added or removed at runtime using action signals",
360 "Wim Taymans <wim.taymans@gmail.com>");
361
362 gstbasesink_class->render = gst_multiudpsink_render;
363 gstbasesink_class->render_list = gst_multiudpsink_render_list;
364 gstbasesink_class->start = gst_multiudpsink_start;
365 gstbasesink_class->stop = gst_multiudpsink_stop;
366 gstbasesink_class->unlock = gst_multiudpsink_unlock;
367 gstbasesink_class->unlock_stop = gst_multiudpsink_unlock_stop;
368 klass->add = gst_multiudpsink_add;
369 klass->remove = gst_multiudpsink_remove;
370 klass->clear = gst_multiudpsink_clear;
371 klass->get_stats = gst_multiudpsink_get_stats;
372
373 GST_DEBUG_CATEGORY_INIT (multiudpsink_debug, "multiudpsink", 0, "UDP sink");
374 }
375
376 static void
gst_multiudpsink_create_cancellable(GstMultiUDPSink * sink)377 gst_multiudpsink_create_cancellable (GstMultiUDPSink * sink)
378 {
379 GPollFD pollfd;
380
381 sink->cancellable = g_cancellable_new ();
382 sink->made_cancel_fd = g_cancellable_make_pollfd (sink->cancellable, &pollfd);
383 }
384
385 static void
gst_multiudpsink_free_cancellable(GstMultiUDPSink * sink)386 gst_multiudpsink_free_cancellable (GstMultiUDPSink * sink)
387 {
388 if (sink->made_cancel_fd) {
389 g_cancellable_release_fd (sink->cancellable);
390 sink->made_cancel_fd = FALSE;
391 }
392 g_object_unref (sink->cancellable);
393 sink->cancellable = NULL;
394 }
395
396 static void
gst_multiudpsink_init(GstMultiUDPSink * sink)397 gst_multiudpsink_init (GstMultiUDPSink * sink)
398 {
399 guint max_mem;
400
401 g_mutex_init (&sink->client_lock);
402 sink->clients = NULL;
403 sink->num_v4_unique = 0;
404 sink->num_v4_all = 0;
405 sink->num_v6_unique = 0;
406 sink->num_v6_all = 0;
407
408 sink->socket = DEFAULT_SOCKET;
409 sink->socket_v6 = DEFAULT_SOCKET;
410 sink->used_socket = DEFAULT_USED_SOCKET;
411 sink->used_socket_v6 = DEFAULT_USED_SOCKET;
412 sink->close_socket = DEFAULT_CLOSE_SOCKET;
413 sink->external_socket = (sink->socket != NULL);
414 sink->auto_multicast = DEFAULT_AUTO_MULTICAST;
415 sink->ttl = DEFAULT_TTL;
416 sink->ttl_mc = DEFAULT_TTL_MC;
417 sink->loop = DEFAULT_LOOP;
418 sink->force_ipv4 = DEFAULT_FORCE_IPV4;
419 sink->qos_dscp = DEFAULT_QOS_DSCP;
420 sink->send_duplicates = DEFAULT_SEND_DUPLICATES;
421 sink->multi_iface = g_strdup (DEFAULT_MULTICAST_IFACE);
422
423 gst_multiudpsink_create_cancellable (sink);
424
425 /* pre-allocate OutputVector, MapInfo and OutputMessage arrays
426 * for use in the render and render_list functions */
427 max_mem = gst_buffer_get_max_memory ();
428
429 sink->n_vecs = max_mem;
430 sink->vecs = g_new (GOutputVector, sink->n_vecs);
431
432 sink->n_maps = max_mem;
433 sink->maps = g_new (GstMapInfo, sink->n_maps);
434
435 sink->n_messages = 1;
436 sink->messages = g_new (GstOutputMessage, sink->n_messages);
437
438 /* we assume that the number of memories per buffer can fit into a guint8 */
439 g_warn_if_fail (max_mem <= G_MAXUINT8);
440 }
441
442 static GstUDPClient *
gst_udp_client_new(GstMultiUDPSink * sink,const gchar * host,gint port)443 gst_udp_client_new (GstMultiUDPSink * sink, const gchar * host, gint port)
444 {
445 GstUDPClient *client;
446 GInetAddress *addr;
447 GSocketAddress *sockaddr;
448 GResolver *resolver;
449 GError *err = NULL;
450
451 sockaddr = g_inet_socket_address_new_from_string (host, port);
452 if (!sockaddr) {
453 GList *results;
454
455 resolver = g_resolver_get_default ();
456 results =
457 g_resolver_lookup_by_name (resolver, host, sink->cancellable, &err);
458 if (!results)
459 goto name_resolve;
460 addr = G_INET_ADDRESS (g_object_ref (results->data));
461 sockaddr = g_inet_socket_address_new (addr, port);
462
463 g_resolver_free_addresses (results);
464 g_object_unref (resolver);
465 g_object_unref (addr);
466 }
467 addr = g_inet_socket_address_get_address (G_INET_SOCKET_ADDRESS (sockaddr));
468 #ifndef GST_DISABLE_GST_DEBUG
469 {
470 gchar *ip = g_inet_address_to_string (addr);
471
472 GST_DEBUG_OBJECT (sink, "IP address for host %s is %s", host, ip);
473 g_free (ip);
474 }
475 #endif
476
477 client = g_slice_new0 (GstUDPClient);
478 client->ref_count = 1;
479 client->add_count = 0;
480 client->host = g_strdup (host);
481 client->port = port;
482 client->addr = sockaddr;
483
484 return client;
485
486 name_resolve:
487 {
488 g_clear_error (&err);
489 g_object_unref (resolver);
490
491 return NULL;
492 }
493 }
494
495 /* call with client lock held */
496 static void
gst_udp_client_unref(GstUDPClient * client)497 gst_udp_client_unref (GstUDPClient * client)
498 {
499 if (--client->ref_count == 0) {
500 g_object_unref (client->addr);
501 g_free (client->host);
502 g_slice_free (GstUDPClient, client);
503 }
504 }
505
506 /* call with client lock held */
507 static inline GstUDPClient *
gst_udp_client_ref(GstUDPClient * client)508 gst_udp_client_ref (GstUDPClient * client)
509 {
510 ++client->ref_count;
511 return client;
512 }
513
514 static gint
client_compare(GstUDPClient * a,GstUDPClient * b)515 client_compare (GstUDPClient * a, GstUDPClient * b)
516 {
517 if ((a->port == b->port) && (strcmp (a->host, b->host) == 0))
518 return 0;
519
520 return 1;
521 }
522
523 static void
gst_multiudpsink_finalize(GObject * object)524 gst_multiudpsink_finalize (GObject * object)
525 {
526 GstMultiUDPSink *sink;
527
528 sink = GST_MULTIUDPSINK (object);
529
530 g_list_foreach (sink->clients, (GFunc) gst_udp_client_unref, NULL);
531 g_list_free (sink->clients);
532
533 if (sink->socket)
534 g_object_unref (sink->socket);
535 sink->socket = NULL;
536
537 if (sink->socket_v6)
538 g_object_unref (sink->socket_v6);
539 sink->socket_v6 = NULL;
540
541 if (sink->used_socket)
542 g_object_unref (sink->used_socket);
543 sink->used_socket = NULL;
544
545 if (sink->used_socket_v6)
546 g_object_unref (sink->used_socket_v6);
547 sink->used_socket_v6 = NULL;
548
549 gst_multiudpsink_free_cancellable (sink);
550
551 g_free (sink->multi_iface);
552 sink->multi_iface = NULL;
553
554 g_free (sink->vecs);
555 sink->vecs = NULL;
556 g_free (sink->maps);
557 sink->maps = NULL;
558 g_free (sink->messages);
559 sink->messages = NULL;
560
561 g_free (sink->bind_address);
562 sink->bind_address = NULL;
563
564 g_mutex_clear (&sink->client_lock);
565
566 G_OBJECT_CLASS (parent_class)->finalize (object);
567 }
568
569 static gsize
fill_vectors(GOutputVector * vecs,GstMapInfo * maps,guint n,GstBuffer * buf)570 fill_vectors (GOutputVector * vecs, GstMapInfo * maps, guint n, GstBuffer * buf)
571 {
572 GstMemory *mem;
573 gsize size = 0;
574 guint i;
575
576 g_assert (gst_buffer_n_memory (buf) == n);
577
578 for (i = 0; i < n; ++i) {
579 mem = gst_buffer_peek_memory (buf, i);
580 if (gst_memory_map (mem, &maps[i], GST_MAP_READ)) {
581 vecs[i].buffer = maps[i].data;
582 vecs[i].size = maps[i].size;
583 } else {
584 GST_WARNING ("Failed to map memory %p for reading", mem);
585 vecs[i].buffer = "";
586 vecs[i].size = 0;
587 }
588 size += vecs[i].size;
589 }
590
591 return size;
592 }
593
594 static gsize
gst_udp_calc_message_size(GstOutputMessage * msg)595 gst_udp_calc_message_size (GstOutputMessage * msg)
596 {
597 gsize size = 0;
598 guint i;
599
600 for (i = 0; i < msg->num_vectors; ++i)
601 size += msg->vectors[i].size;
602
603 return size;
604 }
605
606 static gint
gst_udp_messsages_find_first_not_sent(GstOutputMessage * messages,guint num_messages)607 gst_udp_messsages_find_first_not_sent (GstOutputMessage * messages,
608 guint num_messages)
609 {
610 guint i;
611
612 for (i = 0; i < num_messages; ++i) {
613 GstOutputMessage *msg = &messages[i];
614
615 if (msg->bytes_sent == 0 && gst_udp_calc_message_size (msg) > 0)
616 return i;
617 }
618
619 return -1;
620 }
621
622 static inline gchar *
gst_udp_address_get_string(GSocketAddress * addr,gchar * s,gsize size)623 gst_udp_address_get_string (GSocketAddress * addr, gchar * s, gsize size)
624 {
625 GInetSocketAddress *isa = G_INET_SOCKET_ADDRESS (addr);
626 GInetAddress *ia;
627 gchar *addr_str;
628
629 ia = g_inet_socket_address_get_address (isa);
630 addr_str = g_inet_address_to_string (ia);
631 g_snprintf (s, size, "%s:%u", addr_str, g_inet_socket_address_get_port (isa));
632 g_free (addr_str);
633
634 return s;
635 }
636
637 /* Wrapper around g_socket_send_messages() plus error handling (ignoring).
638 * Returns FALSE if we got cancelled, otherwise TRUE. */
639 static GstFlowReturn
gst_multiudpsink_send_messages(GstMultiUDPSink * sink,GSocket * socket,GstOutputMessage * messages,guint num_messages)640 gst_multiudpsink_send_messages (GstMultiUDPSink * sink, GSocket * socket,
641 GstOutputMessage * messages, guint num_messages)
642 {
643 gboolean sent_max_size_warning = FALSE;
644
645 while (num_messages > 0) {
646 gchar astr[64] G_GNUC_UNUSED;
647 GError *err = NULL;
648 guint msg_size, skip, i;
649 gint ret, err_idx;
650
651 ret = g_socket_send_messages (socket, messages, num_messages, 0,
652 sink->cancellable, &err);
653
654 if (G_UNLIKELY (ret < 0)) {
655 GstOutputMessage *msg;
656
657 if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
658 GstFlowReturn flow_ret;
659
660 g_clear_error (&err);
661
662 flow_ret = gst_base_sink_wait_preroll (GST_BASE_SINK (sink));
663
664 if (flow_ret == GST_FLOW_OK)
665 continue;
666
667 return flow_ret;
668 }
669
670 err_idx = gst_udp_messsages_find_first_not_sent (messages, num_messages);
671 if (err_idx < 0)
672 break;
673
674 msg = &messages[err_idx];
675 msg_size = gst_udp_calc_message_size (msg);
676
677 GST_LOG_OBJECT (sink, "error sending %u bytes to client %s: %s", msg_size,
678 gst_udp_address_get_string (msg->address, astr, sizeof (astr)),
679 err->message);
680
681 skip = 1;
682 if (msg_size > UDP_MAX_SIZE) {
683 if (!sent_max_size_warning) {
684 GST_ELEMENT_WARNING (sink, RESOURCE, WRITE,
685 ("Attempting to send a UDP packets larger than maximum size "
686 "(%u > %d)", msg_size, UDP_MAX_SIZE),
687 ("Reason: %s", err ? err->message : "unknown reason"));
688 sent_max_size_warning = FALSE;
689 }
690 } else {
691 GST_ELEMENT_WARNING (sink, RESOURCE, WRITE,
692 ("Error sending UDP packets"), ("client %s, reason: %s",
693 gst_udp_address_get_string (msg->address, astr, sizeof (astr)),
694 (err != NULL) ? err->message : "unknown reason"));
695
696 for (i = err_idx + 1; i < num_messages; ++i, ++skip) {
697 if (messages[i].address != msg->address)
698 break;
699 }
700 GST_DEBUG_OBJECT (sink, "skipping %d message(s) to same client", skip);
701 }
702
703 /* ignore any errors and try sending the rest */
704 g_clear_error (&err);
705 ret = skip;
706 }
707
708 g_assert (ret <= num_messages);
709
710 messages += ret;
711 num_messages -= ret;
712 }
713
714 return GST_FLOW_OK;
715 }
716
/* Send @num_buffers buffers to every configured client.
 *
 * @buffers: the buffers to send
 * @num_buffers: number of entries in @buffers and @mem_nums
 * @mem_nums: number of memories of each buffer (mem_nums[i] belongs to
 *     buffers[i])
 * @total_mem_num: used to size the vector and map-info scratch arrays
 *
 * One GstOutputMessage is built per (client, buffer) pair: the messages for
 * the first client are filled from the mapped buffer memories and then copied
 * for every further client with only the destination address changed.
 *
 * Returns: GST_FLOW_OK if there are no clients or once everything has been
 * handed to gst_multiudpsink_send_messages(); otherwise the non-OK flow
 * return of that function.
 */
static GstFlowReturn
gst_multiudpsink_render_buffers (GstMultiUDPSink * sink, GstBuffer ** buffers,
    guint num_buffers, guint8 * mem_nums, guint total_mem_num)
{
  GstOutputMessage *msgs;
  gboolean send_duplicates;
  GstUDPClient **clients;
  GOutputVector *vecs;
  GstMapInfo *map_infos;
  GstFlowReturn flow_ret;
  guint num_addr_v4, num_addr_v6;
  guint num_addr, num_msgs;
  guint i, j, mem;
  gsize size = 0;
  GList *l;

  send_duplicates = sink->send_duplicates;

  g_mutex_lock (&sink->client_lock);

  /* with send-duplicates every add counts, otherwise each client once */
  if (send_duplicates) {
    num_addr_v4 = sink->num_v4_all;
    num_addr_v6 = sink->num_v6_all;
  } else {
    num_addr_v4 = sink->num_v4_unique;
    num_addr_v6 = sink->num_v6_unique;
  }
  num_addr = num_addr_v4 + num_addr_v6;

  if (num_addr == 0)
    goto no_clients;

  /* snapshot the client list (stack allocated), taking one reference per
   * array slot so the entries can still be used after client_lock is
   * released below */
  clients = g_newa (GstUDPClient *, num_addr);
  for (l = sink->clients, i = 0; l != NULL; l = l->next) {
    GstUDPClient *client = l->data;

    clients[i++] = gst_udp_client_ref (client);
    for (j = 1; send_duplicates && j < client->add_count; ++j)
      clients[i++] = gst_udp_client_ref (client);
  }
  g_assert_cmpuint (i, ==, num_addr);

  g_mutex_unlock (&sink->client_lock);

  GST_LOG_OBJECT (sink, "%u buffers, %u memories -> to be sent to %u clients",
      num_buffers, total_mem_num, num_addr);

  /* ensure our pre-allocated scratch space arrays are large enough */
  if (sink->n_vecs < total_mem_num) {
    sink->n_vecs = GST_ROUND_UP_16 (total_mem_num);
    g_free (sink->vecs);
    sink->vecs = g_new (GOutputVector, sink->n_vecs);
  }
  vecs = sink->vecs;

  if (sink->n_maps < total_mem_num) {
    sink->n_maps = GST_ROUND_UP_16 (total_mem_num);
    g_free (sink->maps);
    sink->maps = g_new (GstMapInfo, sink->n_maps);
  }
  map_infos = sink->maps;

  num_msgs = num_addr * num_buffers;
  if (sink->n_messages < num_msgs) {
    sink->n_messages = GST_ROUND_UP_16 (num_msgs);
    g_free (sink->messages);
    sink->messages = g_new (GstOutputMessage, sink->n_messages);
  }
  msgs = sink->messages;

  /* populate first num_buffers messages with output vectors for the buffers */
  /* (after this loop, mem is the number of map infos handed to
   * fill_vectors(); the unmap loop at "out" relies on it) */
  for (i = 0, mem = 0; i < num_buffers; ++i) {
    size += fill_vectors (&vecs[mem], &map_infos[mem], mem_nums[i], buffers[i]);
    msgs[i].vectors = &vecs[mem];
    msgs[i].num_vectors = mem_nums[i];
    msgs[i].num_control_messages = 0;
    msgs[i].bytes_sent = 0;
    msgs[i].control_messages = NULL;
    msgs[i].address = clients[0]->addr;
    mem += mem_nums[i];
  }

  /* FIXME: how about some locking? (there wasn't any before either, but..) */
  sink->bytes_to_serve += size;

  /* now copy the pre-filled num_buffer messages over to the next num_buffer
   * messages for the next client, where we also change the target address */
  for (i = 1; i < num_addr; ++i) {
    for (j = 0; j < num_buffers; ++j) {
      msgs[i * num_buffers + j] = msgs[j];
      msgs[i * num_buffers + j].address = clients[i]->addr;
    }
  }

  /* now send it! */

  /* no IPv4 socket? Send it all from the IPv6 socket then.. */
  if (sink->used_socket == NULL) {
    flow_ret = gst_multiudpsink_send_messages (sink, sink->used_socket_v6,
        msgs, num_msgs);
  } else {
    guint num_msgs_v4 = num_buffers * num_addr_v4;
    guint num_msgs_v6 = num_buffers * num_addr_v6;

    /* our client list is sorted with IPv4 clients first and IPv6 ones last */
    flow_ret = gst_multiudpsink_send_messages (sink, sink->used_socket,
        msgs, num_msgs_v4);

    if (flow_ret != GST_FLOW_OK)
      goto cancelled;

    flow_ret = gst_multiudpsink_send_messages (sink, sink->used_socket_v6,
        msgs + num_msgs_v4, num_msgs_v6);
  }

  if (flow_ret != GST_FLOW_OK)
    goto cancelled;

  /* now update stats */
  g_mutex_lock (&sink->client_lock);

  for (i = 0; i < num_addr; ++i) {
    GstUDPClient *client = clients[i];

    for (j = 0; j < num_buffers; ++j) {
      gsize bytes_sent;

      bytes_sent = msgs[i * num_buffers + j].bytes_sent;

      client->bytes_sent += bytes_sent;
      client->packets_sent++;
      sink->bytes_served += bytes_sent;
    }
    /* release the reference taken for this slot of the snapshot */
    gst_udp_client_unref (client);
  }

  g_mutex_unlock (&sink->client_lock);

out:

  /* unmap everything that was passed through fill_vectors() */
  for (i = 0; i < mem; ++i)
    gst_memory_unmap (map_infos[i].memory, &map_infos[i]);

  return flow_ret;

no_clients:
  {
    /* nothing has been referenced or mapped yet, only the lock is held */
    g_mutex_unlock (&sink->client_lock);
    GST_LOG_OBJECT (sink, "no clients");
    return GST_FLOW_OK;
  }
cancelled:
  {
    GST_INFO_OBJECT (sink, "cancelled");

    /* skip the stats update but still drop the snapshot references, then
     * share the unmap code at "out" */
    g_mutex_lock (&sink->client_lock);
    for (i = 0; i < num_addr; ++i)
      gst_udp_client_unref (clients[i]);
    g_mutex_unlock (&sink->client_lock);
    goto out;
  }
}
879
880 static GstFlowReturn
gst_multiudpsink_render_list(GstBaseSink * bsink,GstBufferList * buffer_list)881 gst_multiudpsink_render_list (GstBaseSink * bsink, GstBufferList * buffer_list)
882 {
883 GstMultiUDPSink *sink;
884 GstBuffer **buffers;
885 GstFlowReturn flow;
886 guint8 *mem_nums;
887 guint total_mems;
888 guint i, num_buffers;
889
890 sink = GST_MULTIUDPSINK_CAST (bsink);
891
892 num_buffers = gst_buffer_list_length (buffer_list);
893 if (num_buffers == 0)
894 goto no_data;
895
896 buffers = g_newa (GstBuffer *, num_buffers);
897 mem_nums = g_newa (guint8, num_buffers);
898 for (i = 0, total_mems = 0; i < num_buffers; ++i) {
899 buffers[i] = gst_buffer_list_get (buffer_list, i);
900 mem_nums[i] = gst_buffer_n_memory (buffers[i]);
901 total_mems += mem_nums[i];
902 }
903
904 flow = gst_multiudpsink_render_buffers (sink, buffers, num_buffers,
905 mem_nums, total_mems);
906
907 return flow;
908
909 no_data:
910 {
911 GST_LOG_OBJECT (sink, "empty buffer");
912 return GST_FLOW_OK;
913 }
914 }
915
916 static GstFlowReturn
gst_multiudpsink_render(GstBaseSink * bsink,GstBuffer * buffer)917 gst_multiudpsink_render (GstBaseSink * bsink, GstBuffer * buffer)
918 {
919 GstMultiUDPSink *sink;
920 GstFlowReturn flow;
921 guint8 n_mem;
922
923 sink = GST_MULTIUDPSINK_CAST (bsink);
924
925 n_mem = gst_buffer_n_memory (buffer);
926
927 if (n_mem > 0)
928 flow = gst_multiudpsink_render_buffers (sink, &buffer, 1, &n_mem, n_mem);
929 else
930 flow = GST_FLOW_OK;
931
932 return flow;
933 }
934
935 static void
gst_multiudpsink_set_clients_string(GstMultiUDPSink * sink,const gchar * string)936 gst_multiudpsink_set_clients_string (GstMultiUDPSink * sink,
937 const gchar * string)
938 {
939 gchar **clients;
940 gint i;
941
942 clients = g_strsplit (string, ",", 0);
943
944 g_mutex_lock (&sink->client_lock);
945 /* clear all existing clients */
946 gst_multiudpsink_clear_internal (sink, FALSE);
947 for (i = 0; clients[i]; i++) {
948 gchar *host, *p;
949 gint64 port = 0;
950
951 host = clients[i];
952 p = strstr (clients[i], ":");
953 if (p != NULL) {
954 *p = '\0';
955 port = g_ascii_strtoll (p + 1, NULL, 10);
956 }
957 if (port != 0)
958 gst_multiudpsink_add_internal (sink, host, port, FALSE);
959 }
960 g_mutex_unlock (&sink->client_lock);
961
962 g_strfreev (clients);
963 }
964
965 static gchar *
gst_multiudpsink_get_clients_string(GstMultiUDPSink * sink)966 gst_multiudpsink_get_clients_string (GstMultiUDPSink * sink)
967 {
968 GString *str;
969 GList *clients;
970
971 str = g_string_new ("");
972
973 g_mutex_lock (&sink->client_lock);
974 clients = sink->clients;
975 while (clients) {
976 GstUDPClient *client;
977 gint count;
978
979 client = (GstUDPClient *) clients->data;
980
981 clients = g_list_next (clients);
982
983 count = client->add_count;
984 while (count--) {
985 g_string_append_printf (str, "%s:%d%s", client->host, client->port,
986 (clients || count > 1 ? "," : ""));
987 }
988 }
989 g_mutex_unlock (&sink->client_lock);
990
991 return g_string_free (str, FALSE);
992 }
993
994 static void
gst_multiudpsink_setup_qos_dscp(GstMultiUDPSink * sink,GSocket * socket)995 gst_multiudpsink_setup_qos_dscp (GstMultiUDPSink * sink, GSocket * socket)
996 {
997 /* don't touch on -1 */
998 if (sink->qos_dscp < 0)
999 return;
1000
1001 if (socket == NULL)
1002 return;
1003
1004 if (!gst_net_utils_set_socket_tos (socket, sink->qos_dscp))
1005 GST_ERROR_OBJECT (sink, "could not set qos dscp: %d", sink->qos_dscp);
1006 }
1007
1008 static void
gst_multiudpsink_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)1009 gst_multiudpsink_set_property (GObject * object, guint prop_id,
1010 const GValue * value, GParamSpec * pspec)
1011 {
1012 GstMultiUDPSink *udpsink;
1013
1014 udpsink = GST_MULTIUDPSINK (object);
1015
1016 switch (prop_id) {
1017 case PROP_SOCKET:
1018 if (udpsink->socket != NULL && udpsink->socket != udpsink->used_socket &&
1019 udpsink->close_socket) {
1020 GError *err = NULL;
1021
1022 if (!g_socket_close (udpsink->socket, &err)) {
1023 GST_ERROR ("failed to close socket %p: %s", udpsink->socket,
1024 err->message);
1025 g_clear_error (&err);
1026 }
1027 }
1028 if (udpsink->socket)
1029 g_object_unref (udpsink->socket);
1030 udpsink->socket = g_value_dup_object (value);
1031 GST_DEBUG_OBJECT (udpsink, "setting socket to %p", udpsink->socket);
1032 break;
1033 case PROP_SOCKET_V6:
1034 if (udpsink->socket_v6 != NULL
1035 && udpsink->socket_v6 != udpsink->used_socket_v6
1036 && udpsink->close_socket) {
1037 GError *err = NULL;
1038
1039 if (!g_socket_close (udpsink->socket_v6, &err)) {
1040 GST_ERROR ("failed to close socket %p: %s", udpsink->socket_v6,
1041 err->message);
1042 g_clear_error (&err);
1043 }
1044 }
1045 if (udpsink->socket_v6)
1046 g_object_unref (udpsink->socket_v6);
1047 udpsink->socket_v6 = g_value_dup_object (value);
1048 GST_DEBUG_OBJECT (udpsink, "setting socket to %p", udpsink->socket_v6);
1049 break;
1050 case PROP_CLOSE_SOCKET:
1051 udpsink->close_socket = g_value_get_boolean (value);
1052 break;
1053 case PROP_CLIENTS:
1054 gst_multiudpsink_set_clients_string (udpsink, g_value_get_string (value));
1055 break;
1056 case PROP_AUTO_MULTICAST:
1057 udpsink->auto_multicast = g_value_get_boolean (value);
1058 break;
1059 case PROP_MULTICAST_IFACE:
1060 g_free (udpsink->multi_iface);
1061
1062 if (g_value_get_string (value) == NULL)
1063 udpsink->multi_iface = g_strdup (DEFAULT_MULTICAST_IFACE);
1064 else
1065 udpsink->multi_iface = g_value_dup_string (value);
1066 break;
1067 case PROP_TTL:
1068 udpsink->ttl = g_value_get_int (value);
1069 break;
1070 case PROP_TTL_MC:
1071 udpsink->ttl_mc = g_value_get_int (value);
1072 break;
1073 case PROP_LOOP:
1074 udpsink->loop = g_value_get_boolean (value);
1075 break;
1076 case PROP_FORCE_IPV4:
1077 udpsink->force_ipv4 = g_value_get_boolean (value);
1078 break;
1079 case PROP_QOS_DSCP:
1080 udpsink->qos_dscp = g_value_get_int (value);
1081 gst_multiudpsink_setup_qos_dscp (udpsink, udpsink->used_socket);
1082 gst_multiudpsink_setup_qos_dscp (udpsink, udpsink->used_socket_v6);
1083 break;
1084 case PROP_SEND_DUPLICATES:
1085 udpsink->send_duplicates = g_value_get_boolean (value);
1086 break;
1087 case PROP_BUFFER_SIZE:
1088 udpsink->buffer_size = g_value_get_int (value);
1089 break;
1090 case PROP_BIND_ADDRESS:
1091 g_free (udpsink->bind_address);
1092 udpsink->bind_address = g_value_dup_string (value);
1093 break;
1094 case PROP_BIND_PORT:
1095 udpsink->bind_port = g_value_get_int (value);
1096 break;
1097 default:
1098 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1099 break;
1100 }
1101 }
1102
1103 static void
gst_multiudpsink_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)1104 gst_multiudpsink_get_property (GObject * object, guint prop_id, GValue * value,
1105 GParamSpec * pspec)
1106 {
1107 GstMultiUDPSink *udpsink;
1108
1109 udpsink = GST_MULTIUDPSINK (object);
1110
1111 switch (prop_id) {
1112 case PROP_BYTES_TO_SERVE:
1113 g_value_set_uint64 (value, udpsink->bytes_to_serve);
1114 break;
1115 case PROP_BYTES_SERVED:
1116 g_value_set_uint64 (value, udpsink->bytes_served);
1117 break;
1118 case PROP_SOCKET:
1119 g_value_set_object (value, udpsink->socket);
1120 break;
1121 case PROP_SOCKET_V6:
1122 g_value_set_object (value, udpsink->socket_v6);
1123 break;
1124 case PROP_CLOSE_SOCKET:
1125 g_value_set_boolean (value, udpsink->close_socket);
1126 break;
1127 case PROP_USED_SOCKET:
1128 g_value_set_object (value, udpsink->used_socket);
1129 break;
1130 case PROP_USED_SOCKET_V6:
1131 g_value_set_object (value, udpsink->used_socket_v6);
1132 break;
1133 case PROP_CLIENTS:
1134 g_value_take_string (value,
1135 gst_multiudpsink_get_clients_string (udpsink));
1136 break;
1137 case PROP_AUTO_MULTICAST:
1138 g_value_set_boolean (value, udpsink->auto_multicast);
1139 break;
1140 case PROP_MULTICAST_IFACE:
1141 g_value_set_string (value, udpsink->multi_iface);
1142 break;
1143 case PROP_TTL:
1144 g_value_set_int (value, udpsink->ttl);
1145 break;
1146 case PROP_TTL_MC:
1147 g_value_set_int (value, udpsink->ttl_mc);
1148 break;
1149 case PROP_LOOP:
1150 g_value_set_boolean (value, udpsink->loop);
1151 break;
1152 case PROP_FORCE_IPV4:
1153 g_value_set_boolean (value, udpsink->force_ipv4);
1154 break;
1155 case PROP_QOS_DSCP:
1156 g_value_set_int (value, udpsink->qos_dscp);
1157 break;
1158 case PROP_SEND_DUPLICATES:
1159 g_value_set_boolean (value, udpsink->send_duplicates);
1160 break;
1161 case PROP_BUFFER_SIZE:
1162 g_value_set_int (value, udpsink->buffer_size);
1163 break;
1164 case PROP_BIND_ADDRESS:
1165 g_value_set_string (value, udpsink->bind_address);
1166 break;
1167 case PROP_BIND_PORT:
1168 g_value_set_int (value, udpsink->bind_port);
1169 break;
1170 default:
1171 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1172 break;
1173 }
1174 }
1175
1176 static gboolean
gst_multiudpsink_configure_client(GstMultiUDPSink * sink,GstUDPClient * client)1177 gst_multiudpsink_configure_client (GstMultiUDPSink * sink,
1178 GstUDPClient * client)
1179 {
1180 GInetSocketAddress *saddr = G_INET_SOCKET_ADDRESS (client->addr);
1181 GInetAddress *addr = g_inet_socket_address_get_address (saddr);
1182 GSocketFamily family = g_socket_address_get_family (G_SOCKET_ADDRESS (saddr));
1183 GSocket *socket;
1184 GError *err = NULL;
1185
1186 GST_DEBUG_OBJECT (sink, "configuring client %p", client);
1187
1188 if (family == G_SOCKET_FAMILY_IPV6 && !sink->used_socket_v6)
1189 goto invalid_family;
1190
1191 /* Select socket to send from for this address */
1192 if (family == G_SOCKET_FAMILY_IPV6 || !sink->used_socket)
1193 socket = sink->used_socket_v6;
1194 else
1195 socket = sink->used_socket;
1196
1197 if (g_inet_address_get_is_multicast (addr)) {
1198 GST_DEBUG_OBJECT (sink, "we have a multicast client %p", client);
1199 if (sink->auto_multicast) {
1200 GST_DEBUG_OBJECT (sink, "autojoining group");
1201 if (!g_socket_join_multicast_group (socket, addr, FALSE,
1202 sink->multi_iface, &err))
1203 goto join_group_failed;
1204 }
1205 GST_DEBUG_OBJECT (sink, "setting loop to %d", sink->loop);
1206 g_socket_set_multicast_loopback (socket, sink->loop);
1207 GST_DEBUG_OBJECT (sink, "setting ttl to %d", sink->ttl_mc);
1208 g_socket_set_multicast_ttl (socket, sink->ttl_mc);
1209 } else {
1210 GST_DEBUG_OBJECT (sink, "setting unicast ttl to %d", sink->ttl);
1211 g_socket_set_ttl (socket, sink->ttl);
1212 }
1213 return TRUE;
1214
1215 /* ERRORS */
1216 join_group_failed:
1217 {
1218 gst_multiudpsink_stop (GST_BASE_SINK (sink));
1219 GST_ELEMENT_ERROR (sink, RESOURCE, SETTINGS, (NULL),
1220 ("Could not join multicast group: %s",
1221 err ? err->message : "unknown reason"));
1222 g_clear_error (&err);
1223 return FALSE;
1224 }
1225 invalid_family:
1226 {
1227 gst_multiudpsink_stop (GST_BASE_SINK (sink));
1228 GST_ELEMENT_ERROR (sink, RESOURCE, SETTINGS, (NULL),
1229 ("Invalid address family (got %d)", family));
1230 return FALSE;
1231 }
1232 }
1233
1234 /* create a socket for sending to remote machine */
1235 static gboolean
gst_multiudpsink_start(GstBaseSink * bsink)1236 gst_multiudpsink_start (GstBaseSink * bsink)
1237 {
1238 GstMultiUDPSink *sink;
1239 GList *clients;
1240 GstUDPClient *client;
1241 GError *err = NULL;
1242
1243 sink = GST_MULTIUDPSINK (bsink);
1244
1245 sink->external_socket = FALSE;
1246
1247 if (sink->socket) {
1248 GST_DEBUG_OBJECT (sink, "using configured socket");
1249 if (g_socket_get_family (sink->socket) == G_SOCKET_FAMILY_IPV6) {
1250 sink->used_socket_v6 = G_SOCKET (g_object_ref (sink->socket));
1251 sink->external_socket = TRUE;
1252 } else {
1253 sink->used_socket = G_SOCKET (g_object_ref (sink->socket));
1254 sink->external_socket = TRUE;
1255 }
1256 }
1257
1258 if (sink->socket_v6) {
1259 GST_DEBUG_OBJECT (sink, "using configured IPv6 socket");
1260 g_return_val_if_fail (!sink->socket || g_socket_get_family (sink->socket) !=
1261 G_SOCKET_FAMILY_IPV6, FALSE);
1262
1263 if (sink->used_socket_v6 && sink->used_socket_v6 != sink->socket_v6) {
1264 GST_ERROR_OBJECT (sink,
1265 "Provided different IPv6 sockets in socket and socket-v6 properties");
1266 return FALSE;
1267 }
1268
1269 sink->used_socket_v6 = G_SOCKET (g_object_ref (sink->socket_v6));
1270 sink->external_socket = TRUE;
1271 }
1272
1273 if (!sink->used_socket && !sink->used_socket_v6) {
1274 GSocketAddress *bind_addr;
1275 GInetAddress *bind_iaddr;
1276
1277 if (sink->bind_address) {
1278 GSocketFamily family;
1279
1280 bind_iaddr = g_inet_address_new_from_string (sink->bind_address);
1281 if (!bind_iaddr) {
1282 GList *results;
1283 GResolver *resolver;
1284
1285 resolver = g_resolver_get_default ();
1286 results =
1287 g_resolver_lookup_by_name (resolver, sink->bind_address,
1288 sink->cancellable, &err);
1289 if (!results) {
1290 g_object_unref (resolver);
1291 goto name_resolve;
1292 }
1293 bind_iaddr = G_INET_ADDRESS (g_object_ref (results->data));
1294 g_resolver_free_addresses (results);
1295 g_object_unref (resolver);
1296 }
1297
1298 bind_addr = g_inet_socket_address_new (bind_iaddr, sink->bind_port);
1299 g_object_unref (bind_iaddr);
1300 family = g_socket_address_get_family (G_SOCKET_ADDRESS (bind_addr));
1301
1302 if ((sink->used_socket =
1303 g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1304 G_SOCKET_PROTOCOL_UDP, &err)) == NULL) {
1305 g_object_unref (bind_addr);
1306 goto no_socket;
1307 }
1308
1309 g_socket_bind (sink->used_socket, bind_addr, TRUE, &err);
1310 g_object_unref (bind_addr);
1311 if (err != NULL)
1312 goto bind_error;
1313 } else {
1314 /* create sender sockets if none available */
1315 if ((sink->used_socket = g_socket_new (G_SOCKET_FAMILY_IPV4,
1316 G_SOCKET_TYPE_DATAGRAM, G_SOCKET_PROTOCOL_UDP, &err)) == NULL)
1317 goto no_socket;
1318
1319 bind_iaddr = g_inet_address_new_any (G_SOCKET_FAMILY_IPV4);
1320 bind_addr = g_inet_socket_address_new (bind_iaddr, sink->bind_port);
1321 g_socket_bind (sink->used_socket, bind_addr, TRUE, &err);
1322 g_object_unref (bind_addr);
1323 g_object_unref (bind_iaddr);
1324 if (err != NULL)
1325 goto bind_error;
1326
1327 if ((sink->used_socket_v6 = g_socket_new (G_SOCKET_FAMILY_IPV6,
1328 G_SOCKET_TYPE_DATAGRAM, G_SOCKET_PROTOCOL_UDP,
1329 &err)) == NULL) {
1330 GST_INFO_OBJECT (sink, "Failed to create IPv6 socket: %s",
1331 err->message);
1332 g_clear_error (&err);
1333 } else {
1334 bind_iaddr = g_inet_address_new_any (G_SOCKET_FAMILY_IPV6);
1335 bind_addr = g_inet_socket_address_new (bind_iaddr, sink->bind_port);
1336 g_socket_bind (sink->used_socket_v6, bind_addr, TRUE, &err);
1337 g_object_unref (bind_addr);
1338 g_object_unref (bind_iaddr);
1339 if (err != NULL)
1340 goto bind_error;
1341 }
1342 }
1343 }
1344 #ifdef SO_SNDBUF
1345 {
1346 gint sndsize;
1347 GError *opt_err = NULL;
1348
1349 if (sink->buffer_size != 0) {
1350 sndsize = sink->buffer_size;
1351
1352 GST_DEBUG_OBJECT (sink, "setting udp buffer of %d bytes", sndsize);
1353 /* set buffer size, Note that on Linux this is typically limited to a
1354 * maximum of around 100K. Also a minimum of 128 bytes is required on
1355 * Linux. */
1356
1357 if (sink->used_socket) {
1358 if (!g_socket_set_option (sink->used_socket, SOL_SOCKET, SO_SNDBUF,
1359 sndsize, &opt_err)) {
1360 GST_ELEMENT_WARNING (sink, RESOURCE, SETTINGS, (NULL),
1361 ("Could not create a buffer of requested %d bytes (%s)",
1362 sndsize, opt_err->message));
1363 g_clear_error (&opt_err);
1364 }
1365 }
1366
1367 if (sink->used_socket_v6) {
1368 if (!g_socket_set_option (sink->used_socket_v6, SOL_SOCKET, SO_SNDBUF,
1369 sndsize, &opt_err)) {
1370 GST_ELEMENT_WARNING (sink, RESOURCE, SETTINGS, (NULL),
1371 ("Could not create a buffer of requested %d bytes (%s)",
1372 sndsize, opt_err->message));
1373 g_clear_error (&opt_err);
1374 }
1375 }
1376 }
1377
1378 /* read the value of the receive buffer. Note that on linux this returns 2x the
1379 * value we set because the kernel allocates extra memory for metadata.
1380 * The default on Linux is about 100K (which is about 50K without metadata) */
1381 if (sink->used_socket) {
1382 if (g_socket_get_option (sink->used_socket, SOL_SOCKET, SO_SNDBUF,
1383 &sndsize, NULL)) {
1384 GST_DEBUG_OBJECT (sink, "have UDP buffer of %d bytes", sndsize);
1385 } else {
1386 GST_DEBUG_OBJECT (sink, "could not get UDP buffer size");
1387 }
1388 }
1389
1390 if (sink->used_socket_v6) {
1391 if (g_socket_get_option (sink->used_socket_v6, SOL_SOCKET, SO_SNDBUF,
1392 &sndsize, NULL)) {
1393 GST_DEBUG_OBJECT (sink, "have UDPv6 buffer of %d bytes", sndsize);
1394 } else {
1395 GST_DEBUG_OBJECT (sink, "could not get UDPv6 buffer size");
1396 }
1397 }
1398 }
1399 #endif
1400
1401 #ifdef SO_BINDTODEVICE
1402 if (sink->multi_iface) {
1403 if (sink->used_socket) {
1404 if (setsockopt (g_socket_get_fd (sink->used_socket), SOL_SOCKET,
1405 SO_BINDTODEVICE, sink->multi_iface,
1406 strlen (sink->multi_iface)) < 0)
1407 GST_WARNING_OBJECT (sink, "setsockopt SO_BINDTODEVICE failed: %s",
1408 strerror (errno));
1409 }
1410 if (sink->used_socket_v6) {
1411 if (setsockopt (g_socket_get_fd (sink->used_socket_v6), SOL_SOCKET,
1412 SO_BINDTODEVICE, sink->multi_iface,
1413 strlen (sink->multi_iface)) < 0)
1414 GST_WARNING_OBJECT (sink, "setsockopt SO_BINDTODEVICE failed (v6): %s",
1415 strerror (errno));
1416 }
1417 }
1418 #endif
1419
1420 if (sink->used_socket)
1421 g_socket_set_broadcast (sink->used_socket, TRUE);
1422 if (sink->used_socket_v6)
1423 g_socket_set_broadcast (sink->used_socket_v6, TRUE);
1424
1425 sink->bytes_to_serve = 0;
1426 sink->bytes_served = 0;
1427
1428 gst_multiudpsink_setup_qos_dscp (sink, sink->used_socket);
1429 gst_multiudpsink_setup_qos_dscp (sink, sink->used_socket_v6);
1430
1431 /* look for multicast clients and join multicast groups appropriately
1432 set also ttl and multicast loopback delivery appropriately */
1433 for (clients = sink->clients; clients; clients = g_list_next (clients)) {
1434 client = (GstUDPClient *) clients->data;
1435
1436 if (!gst_multiudpsink_configure_client (sink, client))
1437 return FALSE;
1438 }
1439 return TRUE;
1440
1441 /* ERRORS */
1442 no_socket:
1443 {
1444 GST_ELEMENT_ERROR (sink, RESOURCE, FAILED, (NULL),
1445 ("Could not create socket: %s", err->message));
1446 g_clear_error (&err);
1447 return FALSE;
1448 }
1449 bind_error:
1450 {
1451 GST_ELEMENT_ERROR (sink, RESOURCE, FAILED, (NULL),
1452 ("Failed to bind socket: %s", err->message));
1453 g_clear_error (&err);
1454 return FALSE;
1455 }
1456 name_resolve:
1457 {
1458 GST_ELEMENT_ERROR (sink, RESOURCE, FAILED, (NULL),
1459 ("Failed to resolve bind address %s: %s", sink->bind_address,
1460 err->message));
1461 g_clear_error (&err);
1462 return FALSE;
1463 }
1464 }
1465
1466 static gboolean
gst_multiudpsink_stop(GstBaseSink * bsink)1467 gst_multiudpsink_stop (GstBaseSink * bsink)
1468 {
1469 GstMultiUDPSink *udpsink;
1470
1471 udpsink = GST_MULTIUDPSINK (bsink);
1472
1473 if (udpsink->used_socket) {
1474 if (udpsink->close_socket || !udpsink->external_socket) {
1475 GError *err = NULL;
1476
1477 if (!g_socket_close (udpsink->used_socket, &err)) {
1478 GST_ERROR_OBJECT (udpsink, "Failed to close socket: %s", err->message);
1479 g_clear_error (&err);
1480 }
1481 }
1482
1483 g_object_unref (udpsink->used_socket);
1484 udpsink->used_socket = NULL;
1485 }
1486
1487 if (udpsink->used_socket_v6) {
1488 if (udpsink->close_socket || !udpsink->external_socket) {
1489 GError *err = NULL;
1490
1491 if (!g_socket_close (udpsink->used_socket_v6, &err)) {
1492 GST_ERROR_OBJECT (udpsink, "Failed to close socket: %s", err->message);
1493 g_clear_error (&err);
1494 }
1495 }
1496
1497 g_object_unref (udpsink->used_socket_v6);
1498 udpsink->used_socket_v6 = NULL;
1499 }
1500
1501 return TRUE;
1502 }
1503
1504 static gint
gst_udp_client_compare_socket_family(GstUDPClient * a,GstUDPClient * b)1505 gst_udp_client_compare_socket_family (GstUDPClient * a, GstUDPClient * b)
1506 {
1507 GSocketFamily fa = g_socket_address_get_family (a->addr);
1508 GSocketFamily fb = g_socket_address_get_family (b->addr);
1509
1510 if (fa == fb)
1511 return 0;
1512
1513 /* a should go before b */
1514 if (fa == G_SOCKET_FAMILY_IPV4 && fb == G_SOCKET_FAMILY_IPV6)
1515 return -1;
1516
1517 /* b should go before a */
1518 return 1;
1519 }
1520
1521 static void
gst_multiudpsink_add_internal(GstMultiUDPSink * sink,const gchar * host,gint port,gboolean lock)1522 gst_multiudpsink_add_internal (GstMultiUDPSink * sink, const gchar * host,
1523 gint port, gboolean lock)
1524 {
1525 GSocketFamily family;
1526 GstUDPClient *client;
1527 GstUDPClient udpclient;
1528 GList *find;
1529
1530 udpclient.host = (gchar *) host;
1531 udpclient.port = port;
1532
1533 GST_DEBUG_OBJECT (sink, "adding client on host %s, port %d", host, port);
1534
1535 if (lock)
1536 g_mutex_lock (&sink->client_lock);
1537
1538 find = g_list_find_custom (sink->clients, &udpclient,
1539 (GCompareFunc) client_compare);
1540
1541 if (!find) {
1542 find = g_list_find_custom (sink->clients_to_be_removed, &udpclient,
1543 (GCompareFunc) client_compare);
1544 if (find)
1545 gst_udp_client_ref (find->data);
1546 }
1547
1548 if (find) {
1549 client = (GstUDPClient *) find->data;
1550
1551 family = g_socket_address_get_family (client->addr);
1552
1553 GST_DEBUG_OBJECT (sink, "found %d existing clients with host %s, port %d",
1554 client->add_count, host, port);
1555 } else {
1556 client = gst_udp_client_new (sink, host, port);
1557 if (!client)
1558 goto error;
1559
1560 family = g_socket_address_get_family (client->addr);
1561
1562 client->connect_time = g_get_real_time () * GST_USECOND;
1563
1564 if (sink->used_socket)
1565 gst_multiudpsink_configure_client (sink, client);
1566
1567 GST_DEBUG_OBJECT (sink, "add client with host %s, port %d", host, port);
1568
1569 /* keep IPv4 clients at the beginning, and IPv6 at the end, we can make
1570 * use of this in gst_multiudpsink_render_buffers() */
1571 sink->clients = g_list_insert_sorted (sink->clients, client,
1572 (GCompareFunc) gst_udp_client_compare_socket_family);
1573
1574 if (family == G_SOCKET_FAMILY_IPV4)
1575 ++sink->num_v4_unique;
1576 else
1577 ++sink->num_v6_unique;
1578 }
1579
1580 ++client->add_count;
1581
1582 if (family == G_SOCKET_FAMILY_IPV4)
1583 ++sink->num_v4_all;
1584 else
1585 ++sink->num_v6_all;
1586
1587 if (lock)
1588 g_mutex_unlock (&sink->client_lock);
1589
1590 g_signal_emit (G_OBJECT (sink),
1591 gst_multiudpsink_signals[SIGNAL_CLIENT_ADDED], 0, host, port);
1592
1593 GST_DEBUG_OBJECT (sink, "added client on host %s, port %d", host, port);
1594 return;
1595
1596 /* ERRORS */
1597 error:
1598 {
1599 GST_DEBUG_OBJECT (sink, "did not add client on host %s, port %d", host,
1600 port);
1601 if (lock)
1602 g_mutex_unlock (&sink->client_lock);
1603 return;
1604 }
1605 }
1606
1607 void
gst_multiudpsink_add(GstMultiUDPSink * sink,const gchar * host,gint port)1608 gst_multiudpsink_add (GstMultiUDPSink * sink, const gchar * host, gint port)
1609 {
1610 gst_multiudpsink_add_internal (sink, host, port, TRUE);
1611 }
1612
1613 void
gst_multiudpsink_remove(GstMultiUDPSink * sink,const gchar * host,gint port)1614 gst_multiudpsink_remove (GstMultiUDPSink * sink, const gchar * host, gint port)
1615 {
1616 GSocketFamily family;
1617 GList *find;
1618 GstUDPClient udpclient;
1619 GstUDPClient *client;
1620
1621 udpclient.host = (gchar *) host;
1622 udpclient.port = port;
1623
1624 g_mutex_lock (&sink->client_lock);
1625 find = g_list_find_custom (sink->clients, &udpclient,
1626 (GCompareFunc) client_compare);
1627 if (!find)
1628 goto not_found;
1629
1630 client = (GstUDPClient *) find->data;
1631
1632 GST_DEBUG_OBJECT (sink, "found %d clients with host %s, port %d",
1633 client->add_count, host, port);
1634
1635 --client->add_count;
1636
1637 family = g_socket_address_get_family (client->addr);
1638 if (family == G_SOCKET_FAMILY_IPV4)
1639 --sink->num_v4_all;
1640 else
1641 --sink->num_v6_all;
1642
1643 if (client->add_count == 0) {
1644 GInetSocketAddress *saddr = G_INET_SOCKET_ADDRESS (client->addr);
1645 GInetAddress *addr = g_inet_socket_address_get_address (saddr);
1646 GSocket *socket;
1647
1648 /* Select socket to send from for this address */
1649 if (family == G_SOCKET_FAMILY_IPV6 || !sink->used_socket)
1650 socket = sink->used_socket_v6;
1651 else
1652 socket = sink->used_socket;
1653
1654 GST_DEBUG_OBJECT (sink, "remove client with host %s, port %d", host, port);
1655
1656 client->disconnect_time = g_get_real_time () * GST_USECOND;
1657
1658 if (socket && sink->auto_multicast
1659 && g_inet_address_get_is_multicast (addr)) {
1660 GError *err = NULL;
1661
1662 if (!g_socket_leave_multicast_group (socket, addr, FALSE,
1663 sink->multi_iface, &err)) {
1664 GST_DEBUG_OBJECT (sink, "Failed to leave multicast group: %s",
1665 err->message);
1666 g_clear_error (&err);
1667 }
1668 }
1669
1670 if (family == G_SOCKET_FAMILY_IPV4)
1671 --sink->num_v4_unique;
1672 else
1673 --sink->num_v6_unique;
1674
1675 /* Keep state consistent for streaming thread, so remove from client list,
1676 * but keep it around until after the signal has been emitted, in case a
1677 * callback wants to get stats for that client or so */
1678 sink->clients = g_list_delete_link (sink->clients, find);
1679
1680 sink->clients_to_be_removed =
1681 g_list_prepend (sink->clients_to_be_removed, client);
1682
1683 /* Unlock to emit signal before we delete the actual client */
1684 g_mutex_unlock (&sink->client_lock);
1685 g_signal_emit (G_OBJECT (sink),
1686 gst_multiudpsink_signals[SIGNAL_CLIENT_REMOVED], 0, host, port);
1687 g_mutex_lock (&sink->client_lock);
1688
1689 sink->clients_to_be_removed =
1690 g_list_remove (sink->clients_to_be_removed, client);
1691
1692 gst_udp_client_unref (client);
1693 }
1694 g_mutex_unlock (&sink->client_lock);
1695
1696 return;
1697
1698 /* ERRORS */
1699 not_found:
1700 {
1701 g_mutex_unlock (&sink->client_lock);
1702 GST_WARNING_OBJECT (sink, "client at host %s, port %d not found",
1703 host, port);
1704 return;
1705 }
1706 }
1707
1708 static void
gst_multiudpsink_clear_internal(GstMultiUDPSink * sink,gboolean lock)1709 gst_multiudpsink_clear_internal (GstMultiUDPSink * sink, gboolean lock)
1710 {
1711 GST_DEBUG_OBJECT (sink, "clearing");
1712 /* we only need to remove the client structure, there is no additional
1713 * socket or anything to free for UDP */
1714 if (lock)
1715 g_mutex_lock (&sink->client_lock);
1716 g_list_foreach (sink->clients, (GFunc) gst_udp_client_unref, sink);
1717 g_list_free (sink->clients);
1718 sink->clients = NULL;
1719 sink->num_v4_unique = 0;
1720 sink->num_v4_all = 0;
1721 sink->num_v6_unique = 0;
1722 sink->num_v6_all = 0;
1723 if (lock)
1724 g_mutex_unlock (&sink->client_lock);
1725 }
1726
1727 void
gst_multiudpsink_clear(GstMultiUDPSink * sink)1728 gst_multiudpsink_clear (GstMultiUDPSink * sink)
1729 {
1730 gst_multiudpsink_clear_internal (sink, TRUE);
1731 }
1732
1733 GstStructure *
gst_multiudpsink_get_stats(GstMultiUDPSink * sink,const gchar * host,gint port)1734 gst_multiudpsink_get_stats (GstMultiUDPSink * sink, const gchar * host,
1735 gint port)
1736 {
1737 GstUDPClient *client;
1738 GstStructure *result = NULL;
1739 GstUDPClient udpclient;
1740 GList *find;
1741
1742 udpclient.host = (gchar *) host;
1743 udpclient.port = port;
1744
1745 g_mutex_lock (&sink->client_lock);
1746
1747 find = g_list_find_custom (sink->clients, &udpclient,
1748 (GCompareFunc) client_compare);
1749
1750 if (!find)
1751 find = g_list_find_custom (sink->clients_to_be_removed, &udpclient,
1752 (GCompareFunc) client_compare);
1753
1754 if (!find)
1755 goto not_found;
1756
1757 GST_DEBUG_OBJECT (sink, "stats for client with host %s, port %d", host, port);
1758
1759 client = (GstUDPClient *) find->data;
1760
1761 result = gst_structure_new_empty ("multiudpsink-stats");
1762
1763 gst_structure_set (result,
1764 "bytes-sent", G_TYPE_UINT64, client->bytes_sent,
1765 "packets-sent", G_TYPE_UINT64, client->packets_sent,
1766 "connect-time", G_TYPE_UINT64, client->connect_time,
1767 "disconnect-time", G_TYPE_UINT64, client->disconnect_time, NULL);
1768
1769 g_mutex_unlock (&sink->client_lock);
1770
1771 return result;
1772
1773 /* ERRORS */
1774 not_found:
1775 {
1776 g_mutex_unlock (&sink->client_lock);
1777 GST_WARNING_OBJECT (sink, "client with host %s, port %d not found",
1778 host, port);
1779 /* Apparently (see comment in gstmultifdsink.c) returning NULL from here may
1780 * confuse/break python bindings */
1781 return gst_structure_new_empty ("multiudpsink-stats");
1782 }
1783 }
1784
1785 static gboolean
gst_multiudpsink_unlock(GstBaseSink * bsink)1786 gst_multiudpsink_unlock (GstBaseSink * bsink)
1787 {
1788 GstMultiUDPSink *sink;
1789
1790 sink = GST_MULTIUDPSINK (bsink);
1791
1792 g_cancellable_cancel (sink->cancellable);
1793
1794 return TRUE;
1795 }
1796
1797 static gboolean
gst_multiudpsink_unlock_stop(GstBaseSink * bsink)1798 gst_multiudpsink_unlock_stop (GstBaseSink * bsink)
1799 {
1800 GstMultiUDPSink *sink;
1801
1802 sink = GST_MULTIUDPSINK (bsink);
1803
1804 gst_multiudpsink_free_cancellable (sink);
1805 gst_multiudpsink_create_cancellable (sink);
1806
1807 return TRUE;
1808 }
1809