• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer
2  * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3  * Copyright (C) <2009> Jarkko Palviainen <jarkko.palviainen@sesca.com>
4  * Copyright (C) <2012> Collabora Ltd.
5  *   Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22 
23 /**
24  * SECTION:element-multiudpsink
25  * @title: multiudpsink
26  * @see_also: udpsink, multifdsink
27  *
28  * multiudpsink is a network sink that sends UDP packets to multiple
29  * clients.
30  * It can be combined with rtp payload encoders to implement RTP streaming.
31  */
32 
33 #ifdef HAVE_CONFIG_H
34 #include "config.h"
35 #endif
36 #include "gstudpelements.h"
37 #include "gstmultiudpsink.h"
38 
39 #include <string.h>
40 
41 #ifdef HAVE_SYS_SOCKET_H
42 #include <sys/socket.h>
43 #endif
44 
45 #include <gio/gnetworking.h>
46 
47 #include "gst/net/net.h"
48 #include "gst/glib-compat-private.h"
49 
/* Debug category used (through GST_CAT_DEFAULT) by the logging macros in this
 * file; it is initialised at the end of gst_multiudpsink_class_init(). */
GST_DEBUG_CATEGORY_STATIC (multiudpsink_debug);
#define GST_CAT_DEFAULT (multiudpsink_debug)

/* Messages larger than this get a dedicated "larger than maximum size"
 * warning when sending them fails.
 * NOTE(review): presumably the largest UDP payload over IPv4
 * (65535 - 20 byte IP header - 8 byte UDP header) - confirm. */
#define UDP_MAX_SIZE 65507

/* The element has a single, always present sink pad accepting any caps. */
static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
    GST_PAD_SINK,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS_ANY);

/* MultiUDPSink signals and args */
/* Indices into gst_multiudpsink_signals[]; the first group is registered as
 * action signals, the second as plain notification signals. */
enum
{
  /* methods */
  SIGNAL_ADD,
  SIGNAL_REMOVE,
  SIGNAL_CLEAR,
  SIGNAL_GET_STATS,

  /* signals */
  SIGNAL_CLIENT_ADDED,
  SIGNAL_CLIENT_REMOVED,

  /* FILL ME */
  LAST_SIGNAL
};

/* Default values for the properties and the matching instance fields. */
#define DEFAULT_SOCKET             NULL
#define DEFAULT_CLOSE_SOCKET       TRUE
#define DEFAULT_USED_SOCKET        NULL
#define DEFAULT_CLIENTS            NULL
/* FIXME, this should be disabled by default, we don't need to join a multicast
 * group for sending, if this socket is also used for receiving, it should
 * be configured in the element that does the receive. */
#define DEFAULT_AUTO_MULTICAST     TRUE
#define DEFAULT_MULTICAST_IFACE    NULL
#define DEFAULT_TTL                64
#define DEFAULT_TTL_MC             1
#define DEFAULT_LOOP               TRUE
#define DEFAULT_FORCE_IPV4         FALSE
#define DEFAULT_QOS_DSCP           -1
#define DEFAULT_SEND_DUPLICATES    TRUE
#define DEFAULT_BUFFER_SIZE        0
#define DEFAULT_BIND_ADDRESS       NULL
#define DEFAULT_BIND_PORT          0

/* Property identifiers passed to g_object_class_install_property(). */
enum
{
  PROP_0,
  PROP_BYTES_TO_SERVE,
  PROP_BYTES_SERVED,
  PROP_SOCKET,
  PROP_SOCKET_V6,
  PROP_CLOSE_SOCKET,
  PROP_USED_SOCKET,
  PROP_USED_SOCKET_V6,
  PROP_CLIENTS,
  PROP_AUTO_MULTICAST,
  PROP_MULTICAST_IFACE,
  PROP_TTL,
  PROP_TTL_MC,
  PROP_LOOP,
  PROP_FORCE_IPV4,
  PROP_QOS_DSCP,
  PROP_SEND_DUPLICATES,
  PROP_BUFFER_SIZE,
  PROP_BIND_ADDRESS,
  PROP_BIND_PORT
};

/* GObject vfuncs */
static void gst_multiudpsink_finalize (GObject * object);

/* GstBaseSink vfuncs */
static GstFlowReturn gst_multiudpsink_render (GstBaseSink * sink,
    GstBuffer * buffer);
static GstFlowReturn gst_multiudpsink_render_list (GstBaseSink * bsink,
    GstBufferList * buffer_list);

static gboolean gst_multiudpsink_start (GstBaseSink * bsink);
static gboolean gst_multiudpsink_stop (GstBaseSink * bsink);
static gboolean gst_multiudpsink_unlock (GstBaseSink * bsink);
static gboolean gst_multiudpsink_unlock_stop (GstBaseSink * bsink);

static void gst_multiudpsink_set_property (GObject * object, guint prop_id,
    const GValue * value, GParamSpec * pspec);
static void gst_multiudpsink_get_property (GObject * object, guint prop_id,
    GValue * value, GParamSpec * pspec);

/* Internal helpers; the trailing gboolean is named "lock".
 * NOTE(review): presumably it selects whether the helper takes the client
 * lock itself - confirm against the definitions. */
static void gst_multiudpsink_add_internal (GstMultiUDPSink * sink,
    const gchar * host, gint port, gboolean lock);
static void gst_multiudpsink_clear_internal (GstMultiUDPSink * sink,
    gboolean lock);

/* Signal ids returned by g_signal_new(), indexed by the SIGNAL_* enum. */
static guint gst_multiudpsink_signals[LAST_SIGNAL] = { 0 };

#define gst_multiudpsink_parent_class parent_class
G_DEFINE_TYPE (GstMultiUDPSink, gst_multiudpsink, GST_TYPE_BASE_SINK);
GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (multiudpsink, "multiudpsink",
    GST_RANK_NONE, GST_TYPE_MULTIUDPSINK, udp_element_init (plugin));
148 
149 static void
gst_multiudpsink_class_init(GstMultiUDPSinkClass * klass)150 gst_multiudpsink_class_init (GstMultiUDPSinkClass * klass)
151 {
152   GObjectClass *gobject_class;
153   GstElementClass *gstelement_class;
154   GstBaseSinkClass *gstbasesink_class;
155 
156   gobject_class = (GObjectClass *) klass;
157   gstelement_class = (GstElementClass *) klass;
158   gstbasesink_class = (GstBaseSinkClass *) klass;
159 
160   gobject_class->set_property = gst_multiudpsink_set_property;
161   gobject_class->get_property = gst_multiudpsink_get_property;
162   gobject_class->finalize = gst_multiudpsink_finalize;
163 
164   /**
165    * GstMultiUDPSink::add:
166    * @gstmultiudpsink: the sink on which the signal is emitted
167    * @host: the hostname/IP address of the client to add
168    * @port: the port of the client to add
169    *
170    * Add a client with destination @host and @port to the list of
171    * clients. When the same host/port pair is added multiple times, the
172    * send-duplicates property defines if the packets are sent multiple times to
173    * the same host/port pair or not.
174    *
175    * When a host/port pair is added multiple times, an equal amount of remove
176    * calls must be performed to actually remove the host/port pair from the list
177    * of destinations.
178    */
179   gst_multiudpsink_signals[SIGNAL_ADD] =
180       g_signal_new ("add", G_TYPE_FROM_CLASS (klass),
181       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
182       G_STRUCT_OFFSET (GstMultiUDPSinkClass, add),
183       NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_STRING, G_TYPE_INT);
184   /**
185    * GstMultiUDPSink::remove:
186    * @gstmultiudpsink: the sink on which the signal is emitted
187    * @host: the hostname/IP address of the client to remove
188    * @port: the port of the client to remove
189    *
190    * Remove the client with destination @host and @port from the list of
191    * clients.
192    */
193   gst_multiudpsink_signals[SIGNAL_REMOVE] =
194       g_signal_new ("remove", G_TYPE_FROM_CLASS (klass),
195       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
196       G_STRUCT_OFFSET (GstMultiUDPSinkClass, remove),
197       NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_STRING, G_TYPE_INT);
198   /**
199    * GstMultiUDPSink::clear:
200    * @gstmultiudpsink: the sink on which the signal is emitted
201    *
202    * Clear the list of clients.
203    */
204   gst_multiudpsink_signals[SIGNAL_CLEAR] =
205       g_signal_new ("clear", G_TYPE_FROM_CLASS (klass),
206       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
207       G_STRUCT_OFFSET (GstMultiUDPSinkClass, clear), NULL, NULL, NULL,
208       G_TYPE_NONE, 0);
209   /**
210    * GstMultiUDPSink::get-stats:
211    * @gstmultiudpsink: the sink on which the signal is emitted
212    * @host: the hostname/IP address of the client to get stats on
213    * @port: the port of the client to get stats on
214    *
215    * Get the statistics of the client with destination @host and @port.
216    *
217    * Returns: a GstStructure: bytes_sent, packets_sent, connect_time
218    *           (in epoch nanoseconds), disconnect_time (in epoch
219    *           nanoseconds)
220    */
221   gst_multiudpsink_signals[SIGNAL_GET_STATS] =
222       g_signal_new ("get-stats", G_TYPE_FROM_CLASS (klass),
223       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
224       G_STRUCT_OFFSET (GstMultiUDPSinkClass, get_stats),
225       NULL, NULL, NULL, GST_TYPE_STRUCTURE, 2, G_TYPE_STRING, G_TYPE_INT);
226   /**
227    * GstMultiUDPSink::client-added:
228    * @gstmultiudpsink: the sink emitting the signal
229    * @host: the hostname/IP address of the added client
230    * @port: the port of the added client
231    *
232    * Signal emitted when a new client is added to the list of
233    * clients.
234    */
235   gst_multiudpsink_signals[SIGNAL_CLIENT_ADDED] =
236       g_signal_new ("client-added", G_TYPE_FROM_CLASS (klass),
237       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiUDPSinkClass, client_added),
238       NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_STRING, G_TYPE_INT);
239   /**
240    * GstMultiUDPSink::client-removed:
241    * @gstmultiudpsink: the sink emitting the signal
242    * @host: the hostname/IP address of the removed client
243    * @port: the port of the removed client
244    *
245    * Signal emitted when a client is removed from the list of
246    * clients.
247    */
248   gst_multiudpsink_signals[SIGNAL_CLIENT_REMOVED] =
249       g_signal_new ("client-removed", G_TYPE_FROM_CLASS (klass),
250       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiUDPSinkClass,
251           client_removed), NULL, NULL, NULL,
252       G_TYPE_NONE, 2, G_TYPE_STRING, G_TYPE_INT);
253 
254   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BYTES_TO_SERVE,
255       g_param_spec_uint64 ("bytes-to-serve", "Bytes to serve",
256           "Number of bytes received to serve to clients", 0, G_MAXUINT64, 0,
257           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
258   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BYTES_SERVED,
259       g_param_spec_uint64 ("bytes-served", "Bytes served",
260           "Total number of bytes sent to all clients", 0, G_MAXUINT64, 0,
261           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
262   g_object_class_install_property (gobject_class, PROP_SOCKET,
263       g_param_spec_object ("socket", "Socket Handle",
264           "Socket to use for UDP sending. (NULL == allocate)",
265           G_TYPE_SOCKET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
266   g_object_class_install_property (gobject_class, PROP_SOCKET_V6,
267       g_param_spec_object ("socket-v6", "Socket Handle IPv6",
268           "Socket to use for UDPv6 sending. (NULL == allocate)",
269           G_TYPE_SOCKET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
270   g_object_class_install_property (gobject_class, PROP_CLOSE_SOCKET,
271       g_param_spec_boolean ("close-socket", "Close socket",
272           "Close socket if passed as property on state change",
273           DEFAULT_CLOSE_SOCKET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
274   g_object_class_install_property (gobject_class, PROP_USED_SOCKET,
275       g_param_spec_object ("used-socket", "Used Socket Handle",
276           "Socket currently in use for UDP sending. (NULL == no socket)",
277           G_TYPE_SOCKET, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
278   g_object_class_install_property (gobject_class, PROP_USED_SOCKET_V6,
279       g_param_spec_object ("used-socket-v6", "Used Socket Handle IPv6",
280           "Socket currently in use for UDPv6 sending. (NULL == no socket)",
281           G_TYPE_SOCKET, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
282   g_object_class_install_property (gobject_class, PROP_CLIENTS,
283       g_param_spec_string ("clients", "Clients",
284           "A comma separated list of host:port pairs with destinations",
285           DEFAULT_CLIENTS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
286   g_object_class_install_property (gobject_class, PROP_AUTO_MULTICAST,
287       g_param_spec_boolean ("auto-multicast",
288           "Automatically join/leave multicast groups",
289           "Automatically join/leave the multicast groups, FALSE means user"
290           " has to do it himself", DEFAULT_AUTO_MULTICAST,
291           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
292   g_object_class_install_property (gobject_class, PROP_MULTICAST_IFACE,
293       g_param_spec_string ("multicast-iface", "Multicast Interface",
294           "The network interface on which to join the multicast group",
295           DEFAULT_MULTICAST_IFACE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
296   g_object_class_install_property (gobject_class, PROP_TTL,
297       g_param_spec_int ("ttl", "Unicast TTL",
298           "Used for setting the unicast TTL parameter",
299           0, 255, DEFAULT_TTL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
300   g_object_class_install_property (gobject_class, PROP_TTL_MC,
301       g_param_spec_int ("ttl-mc", "Multicast TTL",
302           "Used for setting the multicast TTL parameter",
303           0, 255, DEFAULT_TTL_MC, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
304   g_object_class_install_property (gobject_class, PROP_LOOP,
305       g_param_spec_boolean ("loop", "Multicast Loopback",
306           "Used for setting the multicast loop parameter. TRUE = enable,"
307           " FALSE = disable", DEFAULT_LOOP,
308           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
309   /**
310    * GstMultiUDPSink::force-ipv4:
311    *
312    * Force the use of an IPv4 socket.
313    *
314    * Since: 1.0.2
315    */
316 #ifndef GST_REMOVE_DEPRECATED
317   g_object_class_install_property (gobject_class, PROP_FORCE_IPV4,
318       g_param_spec_boolean ("force-ipv4", "Force IPv4",
319           "Forcing the use of an IPv4 socket (DEPRECATED, has no effect anymore)",
320           DEFAULT_FORCE_IPV4,
321           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED));
322 #endif
323   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_QOS_DSCP,
324       g_param_spec_int ("qos-dscp", "QoS diff srv code point",
325           "Quality of Service, differentiated services code point (-1 default)",
326           -1, 63, DEFAULT_QOS_DSCP,
327           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
328   /**
329    * GstMultiUDPSink::send-duplicates:
330    *
331    * When a host/port pair is added multiple times, send the packet to the host
332    * multiple times as well.
333    */
334   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_SEND_DUPLICATES,
335       g_param_spec_boolean ("send-duplicates", "Send Duplicates",
336           "When a distination/port pair is added multiple times, send packets "
337           "multiple times as well", DEFAULT_SEND_DUPLICATES,
338           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
339 
340   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BUFFER_SIZE,
341       g_param_spec_int ("buffer-size", "Buffer Size",
342           "Size of the kernel send buffer in bytes, 0=default", 0, G_MAXINT,
343           DEFAULT_BUFFER_SIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
344 
345   g_object_class_install_property (gobject_class, PROP_BIND_ADDRESS,
346       g_param_spec_string ("bind-address", "Bind Address",
347           "Address to bind the socket to", DEFAULT_BIND_ADDRESS,
348           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
349   g_object_class_install_property (gobject_class, PROP_BIND_PORT,
350       g_param_spec_int ("bind-port", "Bind Port",
351           "Port to bind the socket to", 0, G_MAXUINT16,
352           DEFAULT_BIND_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
353 
354   gst_element_class_add_static_pad_template (gstelement_class, &sink_template);
355 
356   gst_element_class_set_static_metadata (gstelement_class, "UDP packet sender",
357       "Sink/Network",
358       "Send data over the network via UDP to one or multiple recipients "
359       "which can be added or removed at runtime using action signals",
360       "Wim Taymans <wim.taymans@gmail.com>");
361 
362   gstbasesink_class->render = gst_multiudpsink_render;
363   gstbasesink_class->render_list = gst_multiudpsink_render_list;
364   gstbasesink_class->start = gst_multiudpsink_start;
365   gstbasesink_class->stop = gst_multiudpsink_stop;
366   gstbasesink_class->unlock = gst_multiudpsink_unlock;
367   gstbasesink_class->unlock_stop = gst_multiudpsink_unlock_stop;
368   klass->add = gst_multiudpsink_add;
369   klass->remove = gst_multiudpsink_remove;
370   klass->clear = gst_multiudpsink_clear;
371   klass->get_stats = gst_multiudpsink_get_stats;
372 
373   GST_DEBUG_CATEGORY_INIT (multiudpsink_debug, "multiudpsink", 0, "UDP sink");
374 }
375 
376 static void
gst_multiudpsink_create_cancellable(GstMultiUDPSink * sink)377 gst_multiudpsink_create_cancellable (GstMultiUDPSink * sink)
378 {
379   GPollFD pollfd;
380 
381   sink->cancellable = g_cancellable_new ();
382   sink->made_cancel_fd = g_cancellable_make_pollfd (sink->cancellable, &pollfd);
383 }
384 
385 static void
gst_multiudpsink_free_cancellable(GstMultiUDPSink * sink)386 gst_multiudpsink_free_cancellable (GstMultiUDPSink * sink)
387 {
388   if (sink->made_cancel_fd) {
389     g_cancellable_release_fd (sink->cancellable);
390     sink->made_cancel_fd = FALSE;
391   }
392   g_object_unref (sink->cancellable);
393   sink->cancellable = NULL;
394 }
395 
396 static void
gst_multiudpsink_init(GstMultiUDPSink * sink)397 gst_multiudpsink_init (GstMultiUDPSink * sink)
398 {
399   guint max_mem;
400 
401   g_mutex_init (&sink->client_lock);
402   sink->clients = NULL;
403   sink->num_v4_unique = 0;
404   sink->num_v4_all = 0;
405   sink->num_v6_unique = 0;
406   sink->num_v6_all = 0;
407 
408   sink->socket = DEFAULT_SOCKET;
409   sink->socket_v6 = DEFAULT_SOCKET;
410   sink->used_socket = DEFAULT_USED_SOCKET;
411   sink->used_socket_v6 = DEFAULT_USED_SOCKET;
412   sink->close_socket = DEFAULT_CLOSE_SOCKET;
413   sink->external_socket = (sink->socket != NULL);
414   sink->auto_multicast = DEFAULT_AUTO_MULTICAST;
415   sink->ttl = DEFAULT_TTL;
416   sink->ttl_mc = DEFAULT_TTL_MC;
417   sink->loop = DEFAULT_LOOP;
418   sink->force_ipv4 = DEFAULT_FORCE_IPV4;
419   sink->qos_dscp = DEFAULT_QOS_DSCP;
420   sink->send_duplicates = DEFAULT_SEND_DUPLICATES;
421   sink->multi_iface = g_strdup (DEFAULT_MULTICAST_IFACE);
422 
423   gst_multiudpsink_create_cancellable (sink);
424 
425   /* pre-allocate OutputVector, MapInfo and OutputMessage arrays
426    * for use in the render and render_list functions */
427   max_mem = gst_buffer_get_max_memory ();
428 
429   sink->n_vecs = max_mem;
430   sink->vecs = g_new (GOutputVector, sink->n_vecs);
431 
432   sink->n_maps = max_mem;
433   sink->maps = g_new (GstMapInfo, sink->n_maps);
434 
435   sink->n_messages = 1;
436   sink->messages = g_new (GstOutputMessage, sink->n_messages);
437 
438   /* we assume that the number of memories per buffer can fit into a guint8 */
439   g_warn_if_fail (max_mem <= G_MAXUINT8);
440 }
441 
442 static GstUDPClient *
gst_udp_client_new(GstMultiUDPSink * sink,const gchar * host,gint port)443 gst_udp_client_new (GstMultiUDPSink * sink, const gchar * host, gint port)
444 {
445   GstUDPClient *client;
446   GInetAddress *addr;
447   GSocketAddress *sockaddr;
448   GResolver *resolver;
449   GError *err = NULL;
450 
451   sockaddr = g_inet_socket_address_new_from_string (host, port);
452   if (!sockaddr) {
453     GList *results;
454 
455     resolver = g_resolver_get_default ();
456     results =
457         g_resolver_lookup_by_name (resolver, host, sink->cancellable, &err);
458     if (!results)
459       goto name_resolve;
460     addr = G_INET_ADDRESS (g_object_ref (results->data));
461     sockaddr = g_inet_socket_address_new (addr, port);
462 
463     g_resolver_free_addresses (results);
464     g_object_unref (resolver);
465     g_object_unref (addr);
466   }
467   addr = g_inet_socket_address_get_address (G_INET_SOCKET_ADDRESS (sockaddr));
468 #ifndef GST_DISABLE_GST_DEBUG
469   {
470     gchar *ip = g_inet_address_to_string (addr);
471 
472     GST_DEBUG_OBJECT (sink, "IP address for host %s is %s", host, ip);
473     g_free (ip);
474   }
475 #endif
476 
477   client = g_slice_new0 (GstUDPClient);
478   client->ref_count = 1;
479   client->add_count = 0;
480   client->host = g_strdup (host);
481   client->port = port;
482   client->addr = sockaddr;
483 
484   return client;
485 
486 name_resolve:
487   {
488     g_clear_error (&err);
489     g_object_unref (resolver);
490 
491     return NULL;
492   }
493 }
494 
495 /* call with client lock held */
496 static void
gst_udp_client_unref(GstUDPClient * client)497 gst_udp_client_unref (GstUDPClient * client)
498 {
499   if (--client->ref_count == 0) {
500     g_object_unref (client->addr);
501     g_free (client->host);
502     g_slice_free (GstUDPClient, client);
503   }
504 }
505 
506 /* call with client lock held */
507 static inline GstUDPClient *
gst_udp_client_ref(GstUDPClient * client)508 gst_udp_client_ref (GstUDPClient * client)
509 {
510   ++client->ref_count;
511   return client;
512 }
513 
514 static gint
client_compare(GstUDPClient * a,GstUDPClient * b)515 client_compare (GstUDPClient * a, GstUDPClient * b)
516 {
517   if ((a->port == b->port) && (strcmp (a->host, b->host) == 0))
518     return 0;
519 
520   return 1;
521 }
522 
523 static void
gst_multiudpsink_finalize(GObject * object)524 gst_multiudpsink_finalize (GObject * object)
525 {
526   GstMultiUDPSink *sink;
527 
528   sink = GST_MULTIUDPSINK (object);
529 
530   g_list_foreach (sink->clients, (GFunc) gst_udp_client_unref, NULL);
531   g_list_free (sink->clients);
532 
533   if (sink->socket)
534     g_object_unref (sink->socket);
535   sink->socket = NULL;
536 
537   if (sink->socket_v6)
538     g_object_unref (sink->socket_v6);
539   sink->socket_v6 = NULL;
540 
541   if (sink->used_socket)
542     g_object_unref (sink->used_socket);
543   sink->used_socket = NULL;
544 
545   if (sink->used_socket_v6)
546     g_object_unref (sink->used_socket_v6);
547   sink->used_socket_v6 = NULL;
548 
549   gst_multiudpsink_free_cancellable (sink);
550 
551   g_free (sink->multi_iface);
552   sink->multi_iface = NULL;
553 
554   g_free (sink->vecs);
555   sink->vecs = NULL;
556   g_free (sink->maps);
557   sink->maps = NULL;
558   g_free (sink->messages);
559   sink->messages = NULL;
560 
561   g_free (sink->bind_address);
562   sink->bind_address = NULL;
563 
564   g_mutex_clear (&sink->client_lock);
565 
566   G_OBJECT_CLASS (parent_class)->finalize (object);
567 }
568 
569 static gsize
fill_vectors(GOutputVector * vecs,GstMapInfo * maps,guint n,GstBuffer * buf)570 fill_vectors (GOutputVector * vecs, GstMapInfo * maps, guint n, GstBuffer * buf)
571 {
572   GstMemory *mem;
573   gsize size = 0;
574   guint i;
575 
576   g_assert (gst_buffer_n_memory (buf) == n);
577 
578   for (i = 0; i < n; ++i) {
579     mem = gst_buffer_peek_memory (buf, i);
580     if (gst_memory_map (mem, &maps[i], GST_MAP_READ)) {
581       vecs[i].buffer = maps[i].data;
582       vecs[i].size = maps[i].size;
583     } else {
584       GST_WARNING ("Failed to map memory %p for reading", mem);
585       vecs[i].buffer = "";
586       vecs[i].size = 0;
587     }
588     size += vecs[i].size;
589   }
590 
591   return size;
592 }
593 
594 static gsize
gst_udp_calc_message_size(GstOutputMessage * msg)595 gst_udp_calc_message_size (GstOutputMessage * msg)
596 {
597   gsize size = 0;
598   guint i;
599 
600   for (i = 0; i < msg->num_vectors; ++i)
601     size += msg->vectors[i].size;
602 
603   return size;
604 }
605 
606 static gint
gst_udp_messsages_find_first_not_sent(GstOutputMessage * messages,guint num_messages)607 gst_udp_messsages_find_first_not_sent (GstOutputMessage * messages,
608     guint num_messages)
609 {
610   guint i;
611 
612   for (i = 0; i < num_messages; ++i) {
613     GstOutputMessage *msg = &messages[i];
614 
615     if (msg->bytes_sent == 0 && gst_udp_calc_message_size (msg) > 0)
616       return i;
617   }
618 
619   return -1;
620 }
621 
622 static inline gchar *
gst_udp_address_get_string(GSocketAddress * addr,gchar * s,gsize size)623 gst_udp_address_get_string (GSocketAddress * addr, gchar * s, gsize size)
624 {
625   GInetSocketAddress *isa = G_INET_SOCKET_ADDRESS (addr);
626   GInetAddress *ia;
627   gchar *addr_str;
628 
629   ia = g_inet_socket_address_get_address (isa);
630   addr_str = g_inet_address_to_string (ia);
631   g_snprintf (s, size, "%s:%u", addr_str, g_inet_socket_address_get_port (isa));
632   g_free (addr_str);
633 
634   return s;
635 }
636 
637 /* Wrapper around g_socket_send_messages() plus error handling (ignoring).
638  * Returns FALSE if we got cancelled, otherwise TRUE. */
639 static GstFlowReturn
gst_multiudpsink_send_messages(GstMultiUDPSink * sink,GSocket * socket,GstOutputMessage * messages,guint num_messages)640 gst_multiudpsink_send_messages (GstMultiUDPSink * sink, GSocket * socket,
641     GstOutputMessage * messages, guint num_messages)
642 {
643   gboolean sent_max_size_warning = FALSE;
644 
645   while (num_messages > 0) {
646     gchar astr[64] G_GNUC_UNUSED;
647     GError *err = NULL;
648     guint msg_size, skip, i;
649     gint ret, err_idx;
650 
651     ret = g_socket_send_messages (socket, messages, num_messages, 0,
652         sink->cancellable, &err);
653 
654     if (G_UNLIKELY (ret < 0)) {
655       GstOutputMessage *msg;
656 
657       if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
658         GstFlowReturn flow_ret;
659 
660         g_clear_error (&err);
661 
662         flow_ret = gst_base_sink_wait_preroll (GST_BASE_SINK (sink));
663 
664         if (flow_ret == GST_FLOW_OK)
665           continue;
666 
667         return flow_ret;
668       }
669 
670       err_idx = gst_udp_messsages_find_first_not_sent (messages, num_messages);
671       if (err_idx < 0)
672         break;
673 
674       msg = &messages[err_idx];
675       msg_size = gst_udp_calc_message_size (msg);
676 
677       GST_LOG_OBJECT (sink, "error sending %u bytes to client %s: %s", msg_size,
678           gst_udp_address_get_string (msg->address, astr, sizeof (astr)),
679           err->message);
680 
681       skip = 1;
682       if (msg_size > UDP_MAX_SIZE) {
683         if (!sent_max_size_warning) {
684           GST_ELEMENT_WARNING (sink, RESOURCE, WRITE,
685               ("Attempting to send a UDP packets larger than maximum size "
686                   "(%u > %d)", msg_size, UDP_MAX_SIZE),
687               ("Reason: %s", err ? err->message : "unknown reason"));
688           sent_max_size_warning = FALSE;
689         }
690       } else {
691         GST_ELEMENT_WARNING (sink, RESOURCE, WRITE,
692             ("Error sending UDP packets"), ("client %s, reason: %s",
693                 gst_udp_address_get_string (msg->address, astr, sizeof (astr)),
694                 (err != NULL) ? err->message : "unknown reason"));
695 
696         for (i = err_idx + 1; i < num_messages; ++i, ++skip) {
697           if (messages[i].address != msg->address)
698             break;
699         }
700         GST_DEBUG_OBJECT (sink, "skipping %d message(s) to same client", skip);
701       }
702 
703       /* ignore any errors and try sending the rest */
704       g_clear_error (&err);
705       ret = skip;
706     }
707 
708     g_assert (ret <= num_messages);
709 
710     messages += ret;
711     num_messages -= ret;
712   }
713 
714   return GST_FLOW_OK;
715 }
716 
/* Send @num_buffers buffers to every client currently in the client list.
 *
 * @mem_nums holds, per buffer, the number of memories it consists of and
 * @total_mem_num is used to size the scratch arrays kept in @sink.  One output
 * message is built per (client, buffer) pair; all messages of one client are
 * laid out contiguously, client after client.
 *
 * Returns: GST_FLOW_OK when there are no clients or when sending completed,
 * otherwise the non-OK flow return of gst_multiudpsink_send_messages(). */
static GstFlowReturn
gst_multiudpsink_render_buffers (GstMultiUDPSink * sink, GstBuffer ** buffers,
    guint num_buffers, guint8 * mem_nums, guint total_mem_num)
{
  GstOutputMessage *msgs;
  gboolean send_duplicates;
  GstUDPClient **clients;
  GOutputVector *vecs;
  GstMapInfo *map_infos;
  GstFlowReturn flow_ret;
  guint num_addr_v4, num_addr_v6;
  guint num_addr, num_msgs;
  guint i, j, mem;
  gsize size = 0;
  GList *l;

  send_duplicates = sink->send_duplicates;

  g_mutex_lock (&sink->client_lock);

  /* with send-duplicates every add of a host/port pair counts as its own
   * destination, otherwise each pair is counted once */
  if (send_duplicates) {
    num_addr_v4 = sink->num_v4_all;
    num_addr_v6 = sink->num_v6_all;
  } else {
    num_addr_v4 = sink->num_v4_unique;
    num_addr_v6 = sink->num_v6_unique;
  }
  num_addr = num_addr_v4 + num_addr_v6;

  if (num_addr == 0)
    goto no_clients;

  /* snapshot the client list into a stack-allocated array, holding one
   * reference per array slot, so the lock can be dropped while sending */
  clients = g_newa (GstUDPClient *, num_addr);
  for (l = sink->clients, i = 0; l != NULL; l = l->next) {
    GstUDPClient *client = l->data;

    clients[i++] = gst_udp_client_ref (client);
    for (j = 1; send_duplicates && j < client->add_count; ++j)
      clients[i++] = gst_udp_client_ref (client);
  }
  g_assert_cmpuint (i, ==, num_addr);

  g_mutex_unlock (&sink->client_lock);

  GST_LOG_OBJECT (sink, "%u buffers, %u memories -> to be sent to %u clients",
      num_buffers, total_mem_num, num_addr);

  /* ensure our pre-allocated scratch space arrays are large enough */
  /* (old contents are not preserved: the arrays are freed and re-created) */
  if (sink->n_vecs < total_mem_num) {
    sink->n_vecs = GST_ROUND_UP_16 (total_mem_num);
    g_free (sink->vecs);
    sink->vecs = g_new (GOutputVector, sink->n_vecs);
  }
  vecs = sink->vecs;

  if (sink->n_maps < total_mem_num) {
    sink->n_maps = GST_ROUND_UP_16 (total_mem_num);
    g_free (sink->maps);
    sink->maps = g_new (GstMapInfo, sink->n_maps);
  }
  map_infos = sink->maps;

  num_msgs = num_addr * num_buffers;
  if (sink->n_messages < num_msgs) {
    sink->n_messages = GST_ROUND_UP_16 (num_msgs);
    g_free (sink->messages);
    sink->messages = g_new (GstOutputMessage, sink->n_messages);
  }
  msgs = sink->messages;

  /* populate first num_buffers messages with output vectors for the buffers */
  /* 'mem' ends up as the number of vector/map slots filled; it is used again
   * at 'out' to unmap them */
  for (i = 0, mem = 0; i < num_buffers; ++i) {
    size += fill_vectors (&vecs[mem], &map_infos[mem], mem_nums[i], buffers[i]);
    msgs[i].vectors = &vecs[mem];
    msgs[i].num_vectors = mem_nums[i];
    msgs[i].num_control_messages = 0;
    msgs[i].bytes_sent = 0;
    msgs[i].control_messages = NULL;
    msgs[i].address = clients[0]->addr;
    mem += mem_nums[i];
  }

  /* FIXME: how about some locking? (there wasn't any before either, but..) */
  sink->bytes_to_serve += size;

  /* now copy the pre-filled num_buffer messages over to the next num_buffer
   * messages for the next client, where we also change the target address */
  for (i = 1; i < num_addr; ++i) {
    for (j = 0; j < num_buffers; ++j) {
      msgs[i * num_buffers + j] = msgs[j];
      msgs[i * num_buffers + j].address = clients[i]->addr;
    }
  }

  /* now send it! */

  /* no IPv4 socket? Send it all from the IPv6 socket then.. */
  if (sink->used_socket == NULL) {
    flow_ret = gst_multiudpsink_send_messages (sink, sink->used_socket_v6,
        msgs, num_msgs);
  } else {
    guint num_msgs_v4 = num_buffers * num_addr_v4;
    guint num_msgs_v6 = num_buffers * num_addr_v6;

    /* our client list is sorted with IPv4 clients first and IPv6 ones last */
    flow_ret = gst_multiudpsink_send_messages (sink, sink->used_socket,
        msgs, num_msgs_v4);

    if (flow_ret != GST_FLOW_OK)
      goto cancelled;

    flow_ret = gst_multiudpsink_send_messages (sink, sink->used_socket_v6,
        msgs + num_msgs_v4, num_msgs_v6);
  }

  if (flow_ret != GST_FLOW_OK)
    goto cancelled;

  /* now update stats */
  /* this also drops the references taken on the clients above */
  g_mutex_lock (&sink->client_lock);

  for (i = 0; i < num_addr; ++i) {
    GstUDPClient *client = clients[i];

    for (j = 0; j < num_buffers; ++j) {
      gsize bytes_sent;

      bytes_sent = msgs[i * num_buffers + j].bytes_sent;

      client->bytes_sent += bytes_sent;
      client->packets_sent++;
      sink->bytes_served += bytes_sent;
    }
    gst_udp_client_unref (client);
  }

  g_mutex_unlock (&sink->client_lock);

out:

  /* common exit for the success and cancelled paths: unmap the 'mem' slots
   * that were handed to fill_vectors() */
  for (i = 0; i < mem; ++i)
    gst_memory_unmap (map_infos[i].memory, &map_infos[i]);

  return flow_ret;

no_clients:
  {
    /* reached with the client lock still held and nothing mapped yet */
    g_mutex_unlock (&sink->client_lock);
    GST_LOG_OBJECT (sink, "no clients");
    return GST_FLOW_OK;
  }
cancelled:
  {
    GST_INFO_OBJECT (sink, "cancelled");

    /* stats are not updated on this path; only the client references taken
     * above are dropped before unmapping at 'out' */
    g_mutex_lock (&sink->client_lock);
    for (i = 0; i < num_addr; ++i)
      gst_udp_client_unref (clients[i]);
    g_mutex_unlock (&sink->client_lock);
    goto out;
  }
}
879 
880 static GstFlowReturn
gst_multiudpsink_render_list(GstBaseSink * bsink,GstBufferList * buffer_list)881 gst_multiudpsink_render_list (GstBaseSink * bsink, GstBufferList * buffer_list)
882 {
883   GstMultiUDPSink *sink;
884   GstBuffer **buffers;
885   GstFlowReturn flow;
886   guint8 *mem_nums;
887   guint total_mems;
888   guint i, num_buffers;
889 
890   sink = GST_MULTIUDPSINK_CAST (bsink);
891 
892   num_buffers = gst_buffer_list_length (buffer_list);
893   if (num_buffers == 0)
894     goto no_data;
895 
896   buffers = g_newa (GstBuffer *, num_buffers);
897   mem_nums = g_newa (guint8, num_buffers);
898   for (i = 0, total_mems = 0; i < num_buffers; ++i) {
899     buffers[i] = gst_buffer_list_get (buffer_list, i);
900     mem_nums[i] = gst_buffer_n_memory (buffers[i]);
901     total_mems += mem_nums[i];
902   }
903 
904   flow = gst_multiudpsink_render_buffers (sink, buffers, num_buffers,
905       mem_nums, total_mems);
906 
907   return flow;
908 
909 no_data:
910   {
911     GST_LOG_OBJECT (sink, "empty buffer");
912     return GST_FLOW_OK;
913   }
914 }
915 
916 static GstFlowReturn
gst_multiudpsink_render(GstBaseSink * bsink,GstBuffer * buffer)917 gst_multiudpsink_render (GstBaseSink * bsink, GstBuffer * buffer)
918 {
919   GstMultiUDPSink *sink;
920   GstFlowReturn flow;
921   guint8 n_mem;
922 
923   sink = GST_MULTIUDPSINK_CAST (bsink);
924 
925   n_mem = gst_buffer_n_memory (buffer);
926 
927   if (n_mem > 0)
928     flow = gst_multiudpsink_render_buffers (sink, &buffer, 1, &n_mem, n_mem);
929   else
930     flow = GST_FLOW_OK;
931 
932   return flow;
933 }
934 
935 static void
gst_multiudpsink_set_clients_string(GstMultiUDPSink * sink,const gchar * string)936 gst_multiudpsink_set_clients_string (GstMultiUDPSink * sink,
937     const gchar * string)
938 {
939   gchar **clients;
940   gint i;
941 
942   clients = g_strsplit (string, ",", 0);
943 
944   g_mutex_lock (&sink->client_lock);
945   /* clear all existing clients */
946   gst_multiudpsink_clear_internal (sink, FALSE);
947   for (i = 0; clients[i]; i++) {
948     gchar *host, *p;
949     gint64 port = 0;
950 
951     host = clients[i];
952     p = strstr (clients[i], ":");
953     if (p != NULL) {
954       *p = '\0';
955       port = g_ascii_strtoll (p + 1, NULL, 10);
956     }
957     if (port != 0)
958       gst_multiudpsink_add_internal (sink, host, port, FALSE);
959   }
960   g_mutex_unlock (&sink->client_lock);
961 
962   g_strfreev (clients);
963 }
964 
965 static gchar *
gst_multiudpsink_get_clients_string(GstMultiUDPSink * sink)966 gst_multiudpsink_get_clients_string (GstMultiUDPSink * sink)
967 {
968   GString *str;
969   GList *clients;
970 
971   str = g_string_new ("");
972 
973   g_mutex_lock (&sink->client_lock);
974   clients = sink->clients;
975   while (clients) {
976     GstUDPClient *client;
977     gint count;
978 
979     client = (GstUDPClient *) clients->data;
980 
981     clients = g_list_next (clients);
982 
983     count = client->add_count;
984     while (count--) {
985       g_string_append_printf (str, "%s:%d%s", client->host, client->port,
986           (clients || count > 1 ? "," : ""));
987     }
988   }
989   g_mutex_unlock (&sink->client_lock);
990 
991   return g_string_free (str, FALSE);
992 }
993 
994 static void
gst_multiudpsink_setup_qos_dscp(GstMultiUDPSink * sink,GSocket * socket)995 gst_multiudpsink_setup_qos_dscp (GstMultiUDPSink * sink, GSocket * socket)
996 {
997   /* don't touch on -1 */
998   if (sink->qos_dscp < 0)
999     return;
1000 
1001   if (socket == NULL)
1002     return;
1003 
1004   if (!gst_net_utils_set_socket_tos (socket, sink->qos_dscp))
1005     GST_ERROR_OBJECT (sink, "could not set qos dscp: %d", sink->qos_dscp);
1006 }
1007 
1008 static void
gst_multiudpsink_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)1009 gst_multiudpsink_set_property (GObject * object, guint prop_id,
1010     const GValue * value, GParamSpec * pspec)
1011 {
1012   GstMultiUDPSink *udpsink;
1013 
1014   udpsink = GST_MULTIUDPSINK (object);
1015 
1016   switch (prop_id) {
1017     case PROP_SOCKET:
1018       if (udpsink->socket != NULL && udpsink->socket != udpsink->used_socket &&
1019           udpsink->close_socket) {
1020         GError *err = NULL;
1021 
1022         if (!g_socket_close (udpsink->socket, &err)) {
1023           GST_ERROR ("failed to close socket %p: %s", udpsink->socket,
1024               err->message);
1025           g_clear_error (&err);
1026         }
1027       }
1028       if (udpsink->socket)
1029         g_object_unref (udpsink->socket);
1030       udpsink->socket = g_value_dup_object (value);
1031       GST_DEBUG_OBJECT (udpsink, "setting socket to %p", udpsink->socket);
1032       break;
1033     case PROP_SOCKET_V6:
1034       if (udpsink->socket_v6 != NULL
1035           && udpsink->socket_v6 != udpsink->used_socket_v6
1036           && udpsink->close_socket) {
1037         GError *err = NULL;
1038 
1039         if (!g_socket_close (udpsink->socket_v6, &err)) {
1040           GST_ERROR ("failed to close socket %p: %s", udpsink->socket_v6,
1041               err->message);
1042           g_clear_error (&err);
1043         }
1044       }
1045       if (udpsink->socket_v6)
1046         g_object_unref (udpsink->socket_v6);
1047       udpsink->socket_v6 = g_value_dup_object (value);
1048       GST_DEBUG_OBJECT (udpsink, "setting socket to %p", udpsink->socket_v6);
1049       break;
1050     case PROP_CLOSE_SOCKET:
1051       udpsink->close_socket = g_value_get_boolean (value);
1052       break;
1053     case PROP_CLIENTS:
1054       gst_multiudpsink_set_clients_string (udpsink, g_value_get_string (value));
1055       break;
1056     case PROP_AUTO_MULTICAST:
1057       udpsink->auto_multicast = g_value_get_boolean (value);
1058       break;
1059     case PROP_MULTICAST_IFACE:
1060       g_free (udpsink->multi_iface);
1061 
1062       if (g_value_get_string (value) == NULL)
1063         udpsink->multi_iface = g_strdup (DEFAULT_MULTICAST_IFACE);
1064       else
1065         udpsink->multi_iface = g_value_dup_string (value);
1066       break;
1067     case PROP_TTL:
1068       udpsink->ttl = g_value_get_int (value);
1069       break;
1070     case PROP_TTL_MC:
1071       udpsink->ttl_mc = g_value_get_int (value);
1072       break;
1073     case PROP_LOOP:
1074       udpsink->loop = g_value_get_boolean (value);
1075       break;
1076     case PROP_FORCE_IPV4:
1077       udpsink->force_ipv4 = g_value_get_boolean (value);
1078       break;
1079     case PROP_QOS_DSCP:
1080       udpsink->qos_dscp = g_value_get_int (value);
1081       gst_multiudpsink_setup_qos_dscp (udpsink, udpsink->used_socket);
1082       gst_multiudpsink_setup_qos_dscp (udpsink, udpsink->used_socket_v6);
1083       break;
1084     case PROP_SEND_DUPLICATES:
1085       udpsink->send_duplicates = g_value_get_boolean (value);
1086       break;
1087     case PROP_BUFFER_SIZE:
1088       udpsink->buffer_size = g_value_get_int (value);
1089       break;
1090     case PROP_BIND_ADDRESS:
1091       g_free (udpsink->bind_address);
1092       udpsink->bind_address = g_value_dup_string (value);
1093       break;
1094     case PROP_BIND_PORT:
1095       udpsink->bind_port = g_value_get_int (value);
1096       break;
1097     default:
1098       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1099       break;
1100   }
1101 }
1102 
1103 static void
gst_multiudpsink_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)1104 gst_multiudpsink_get_property (GObject * object, guint prop_id, GValue * value,
1105     GParamSpec * pspec)
1106 {
1107   GstMultiUDPSink *udpsink;
1108 
1109   udpsink = GST_MULTIUDPSINK (object);
1110 
1111   switch (prop_id) {
1112     case PROP_BYTES_TO_SERVE:
1113       g_value_set_uint64 (value, udpsink->bytes_to_serve);
1114       break;
1115     case PROP_BYTES_SERVED:
1116       g_value_set_uint64 (value, udpsink->bytes_served);
1117       break;
1118     case PROP_SOCKET:
1119       g_value_set_object (value, udpsink->socket);
1120       break;
1121     case PROP_SOCKET_V6:
1122       g_value_set_object (value, udpsink->socket_v6);
1123       break;
1124     case PROP_CLOSE_SOCKET:
1125       g_value_set_boolean (value, udpsink->close_socket);
1126       break;
1127     case PROP_USED_SOCKET:
1128       g_value_set_object (value, udpsink->used_socket);
1129       break;
1130     case PROP_USED_SOCKET_V6:
1131       g_value_set_object (value, udpsink->used_socket_v6);
1132       break;
1133     case PROP_CLIENTS:
1134       g_value_take_string (value,
1135           gst_multiudpsink_get_clients_string (udpsink));
1136       break;
1137     case PROP_AUTO_MULTICAST:
1138       g_value_set_boolean (value, udpsink->auto_multicast);
1139       break;
1140     case PROP_MULTICAST_IFACE:
1141       g_value_set_string (value, udpsink->multi_iface);
1142       break;
1143     case PROP_TTL:
1144       g_value_set_int (value, udpsink->ttl);
1145       break;
1146     case PROP_TTL_MC:
1147       g_value_set_int (value, udpsink->ttl_mc);
1148       break;
1149     case PROP_LOOP:
1150       g_value_set_boolean (value, udpsink->loop);
1151       break;
1152     case PROP_FORCE_IPV4:
1153       g_value_set_boolean (value, udpsink->force_ipv4);
1154       break;
1155     case PROP_QOS_DSCP:
1156       g_value_set_int (value, udpsink->qos_dscp);
1157       break;
1158     case PROP_SEND_DUPLICATES:
1159       g_value_set_boolean (value, udpsink->send_duplicates);
1160       break;
1161     case PROP_BUFFER_SIZE:
1162       g_value_set_int (value, udpsink->buffer_size);
1163       break;
1164     case PROP_BIND_ADDRESS:
1165       g_value_set_string (value, udpsink->bind_address);
1166       break;
1167     case PROP_BIND_PORT:
1168       g_value_set_int (value, udpsink->bind_port);
1169       break;
1170     default:
1171       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1172       break;
1173   }
1174 }
1175 
1176 static gboolean
gst_multiudpsink_configure_client(GstMultiUDPSink * sink,GstUDPClient * client)1177 gst_multiudpsink_configure_client (GstMultiUDPSink * sink,
1178     GstUDPClient * client)
1179 {
1180   GInetSocketAddress *saddr = G_INET_SOCKET_ADDRESS (client->addr);
1181   GInetAddress *addr = g_inet_socket_address_get_address (saddr);
1182   GSocketFamily family = g_socket_address_get_family (G_SOCKET_ADDRESS (saddr));
1183   GSocket *socket;
1184   GError *err = NULL;
1185 
1186   GST_DEBUG_OBJECT (sink, "configuring client %p", client);
1187 
1188   if (family == G_SOCKET_FAMILY_IPV6 && !sink->used_socket_v6)
1189     goto invalid_family;
1190 
1191   /* Select socket to send from for this address */
1192   if (family == G_SOCKET_FAMILY_IPV6 || !sink->used_socket)
1193     socket = sink->used_socket_v6;
1194   else
1195     socket = sink->used_socket;
1196 
1197   if (g_inet_address_get_is_multicast (addr)) {
1198     GST_DEBUG_OBJECT (sink, "we have a multicast client %p", client);
1199     if (sink->auto_multicast) {
1200       GST_DEBUG_OBJECT (sink, "autojoining group");
1201       if (!g_socket_join_multicast_group (socket, addr, FALSE,
1202               sink->multi_iface, &err))
1203         goto join_group_failed;
1204     }
1205     GST_DEBUG_OBJECT (sink, "setting loop to %d", sink->loop);
1206     g_socket_set_multicast_loopback (socket, sink->loop);
1207     GST_DEBUG_OBJECT (sink, "setting ttl to %d", sink->ttl_mc);
1208     g_socket_set_multicast_ttl (socket, sink->ttl_mc);
1209   } else {
1210     GST_DEBUG_OBJECT (sink, "setting unicast ttl to %d", sink->ttl);
1211     g_socket_set_ttl (socket, sink->ttl);
1212   }
1213   return TRUE;
1214 
1215   /* ERRORS */
1216 join_group_failed:
1217   {
1218     gst_multiudpsink_stop (GST_BASE_SINK (sink));
1219     GST_ELEMENT_ERROR (sink, RESOURCE, SETTINGS, (NULL),
1220         ("Could not join multicast group: %s",
1221             err ? err->message : "unknown reason"));
1222     g_clear_error (&err);
1223     return FALSE;
1224   }
1225 invalid_family:
1226   {
1227     gst_multiudpsink_stop (GST_BASE_SINK (sink));
1228     GST_ELEMENT_ERROR (sink, RESOURCE, SETTINGS, (NULL),
1229         ("Invalid address family (got %d)", family));
1230     return FALSE;
1231   }
1232 }
1233 
1234 /* create a socket for sending to remote machine */
1235 static gboolean
gst_multiudpsink_start(GstBaseSink * bsink)1236 gst_multiudpsink_start (GstBaseSink * bsink)
1237 {
1238   GstMultiUDPSink *sink;
1239   GList *clients;
1240   GstUDPClient *client;
1241   GError *err = NULL;
1242 
1243   sink = GST_MULTIUDPSINK (bsink);
1244 
1245   sink->external_socket = FALSE;
1246 
1247   if (sink->socket) {
1248     GST_DEBUG_OBJECT (sink, "using configured socket");
1249     if (g_socket_get_family (sink->socket) == G_SOCKET_FAMILY_IPV6) {
1250       sink->used_socket_v6 = G_SOCKET (g_object_ref (sink->socket));
1251       sink->external_socket = TRUE;
1252     } else {
1253       sink->used_socket = G_SOCKET (g_object_ref (sink->socket));
1254       sink->external_socket = TRUE;
1255     }
1256   }
1257 
1258   if (sink->socket_v6) {
1259     GST_DEBUG_OBJECT (sink, "using configured IPv6 socket");
1260     g_return_val_if_fail (!sink->socket || g_socket_get_family (sink->socket) !=
1261         G_SOCKET_FAMILY_IPV6, FALSE);
1262 
1263     if (sink->used_socket_v6 && sink->used_socket_v6 != sink->socket_v6) {
1264       GST_ERROR_OBJECT (sink,
1265           "Provided different IPv6 sockets in socket and socket-v6 properties");
1266       return FALSE;
1267     }
1268 
1269     sink->used_socket_v6 = G_SOCKET (g_object_ref (sink->socket_v6));
1270     sink->external_socket = TRUE;
1271   }
1272 
1273   if (!sink->used_socket && !sink->used_socket_v6) {
1274     GSocketAddress *bind_addr;
1275     GInetAddress *bind_iaddr;
1276 
1277     if (sink->bind_address) {
1278       GSocketFamily family;
1279 
1280       bind_iaddr = g_inet_address_new_from_string (sink->bind_address);
1281       if (!bind_iaddr) {
1282         GList *results;
1283         GResolver *resolver;
1284 
1285         resolver = g_resolver_get_default ();
1286         results =
1287             g_resolver_lookup_by_name (resolver, sink->bind_address,
1288             sink->cancellable, &err);
1289         if (!results) {
1290           g_object_unref (resolver);
1291           goto name_resolve;
1292         }
1293         bind_iaddr = G_INET_ADDRESS (g_object_ref (results->data));
1294         g_resolver_free_addresses (results);
1295         g_object_unref (resolver);
1296       }
1297 
1298       bind_addr = g_inet_socket_address_new (bind_iaddr, sink->bind_port);
1299       g_object_unref (bind_iaddr);
1300       family = g_socket_address_get_family (G_SOCKET_ADDRESS (bind_addr));
1301 
1302       if ((sink->used_socket =
1303               g_socket_new (family, G_SOCKET_TYPE_DATAGRAM,
1304                   G_SOCKET_PROTOCOL_UDP, &err)) == NULL) {
1305         g_object_unref (bind_addr);
1306         goto no_socket;
1307       }
1308 
1309       g_socket_bind (sink->used_socket, bind_addr, TRUE, &err);
1310       g_object_unref (bind_addr);
1311       if (err != NULL)
1312         goto bind_error;
1313     } else {
1314       /* create sender sockets if none available */
1315       if ((sink->used_socket = g_socket_new (G_SOCKET_FAMILY_IPV4,
1316                   G_SOCKET_TYPE_DATAGRAM, G_SOCKET_PROTOCOL_UDP, &err)) == NULL)
1317         goto no_socket;
1318 
1319       bind_iaddr = g_inet_address_new_any (G_SOCKET_FAMILY_IPV4);
1320       bind_addr = g_inet_socket_address_new (bind_iaddr, sink->bind_port);
1321       g_socket_bind (sink->used_socket, bind_addr, TRUE, &err);
1322       g_object_unref (bind_addr);
1323       g_object_unref (bind_iaddr);
1324       if (err != NULL)
1325         goto bind_error;
1326 
1327       if ((sink->used_socket_v6 = g_socket_new (G_SOCKET_FAMILY_IPV6,
1328                   G_SOCKET_TYPE_DATAGRAM, G_SOCKET_PROTOCOL_UDP,
1329                   &err)) == NULL) {
1330         GST_INFO_OBJECT (sink, "Failed to create IPv6 socket: %s",
1331             err->message);
1332         g_clear_error (&err);
1333       } else {
1334         bind_iaddr = g_inet_address_new_any (G_SOCKET_FAMILY_IPV6);
1335         bind_addr = g_inet_socket_address_new (bind_iaddr, sink->bind_port);
1336         g_socket_bind (sink->used_socket_v6, bind_addr, TRUE, &err);
1337         g_object_unref (bind_addr);
1338         g_object_unref (bind_iaddr);
1339         if (err != NULL)
1340           goto bind_error;
1341       }
1342     }
1343   }
1344 #ifdef SO_SNDBUF
1345   {
1346     gint sndsize;
1347     GError *opt_err = NULL;
1348 
1349     if (sink->buffer_size != 0) {
1350       sndsize = sink->buffer_size;
1351 
1352       GST_DEBUG_OBJECT (sink, "setting udp buffer of %d bytes", sndsize);
1353       /* set buffer size, Note that on Linux this is typically limited to a
1354        * maximum of around 100K. Also a minimum of 128 bytes is required on
1355        * Linux. */
1356 
1357       if (sink->used_socket) {
1358         if (!g_socket_set_option (sink->used_socket, SOL_SOCKET, SO_SNDBUF,
1359                 sndsize, &opt_err)) {
1360           GST_ELEMENT_WARNING (sink, RESOURCE, SETTINGS, (NULL),
1361               ("Could not create a buffer of requested %d bytes (%s)",
1362                   sndsize, opt_err->message));
1363           g_clear_error (&opt_err);
1364         }
1365       }
1366 
1367       if (sink->used_socket_v6) {
1368         if (!g_socket_set_option (sink->used_socket_v6, SOL_SOCKET, SO_SNDBUF,
1369                 sndsize, &opt_err)) {
1370           GST_ELEMENT_WARNING (sink, RESOURCE, SETTINGS, (NULL),
1371               ("Could not create a buffer of requested %d bytes (%s)",
1372                   sndsize, opt_err->message));
1373           g_clear_error (&opt_err);
1374         }
1375       }
1376     }
1377 
1378     /* read the value of the receive buffer. Note that on linux this returns 2x the
1379      * value we set because the kernel allocates extra memory for metadata.
1380      * The default on Linux is about 100K (which is about 50K without metadata) */
1381     if (sink->used_socket) {
1382       if (g_socket_get_option (sink->used_socket, SOL_SOCKET, SO_SNDBUF,
1383               &sndsize, NULL)) {
1384         GST_DEBUG_OBJECT (sink, "have UDP buffer of %d bytes", sndsize);
1385       } else {
1386         GST_DEBUG_OBJECT (sink, "could not get UDP buffer size");
1387       }
1388     }
1389 
1390     if (sink->used_socket_v6) {
1391       if (g_socket_get_option (sink->used_socket_v6, SOL_SOCKET, SO_SNDBUF,
1392               &sndsize, NULL)) {
1393         GST_DEBUG_OBJECT (sink, "have UDPv6 buffer of %d bytes", sndsize);
1394       } else {
1395         GST_DEBUG_OBJECT (sink, "could not get UDPv6 buffer size");
1396       }
1397     }
1398   }
1399 #endif
1400 
1401 #ifdef SO_BINDTODEVICE
1402   if (sink->multi_iface) {
1403     if (sink->used_socket) {
1404       if (setsockopt (g_socket_get_fd (sink->used_socket), SOL_SOCKET,
1405               SO_BINDTODEVICE, sink->multi_iface,
1406               strlen (sink->multi_iface)) < 0)
1407         GST_WARNING_OBJECT (sink, "setsockopt SO_BINDTODEVICE failed: %s",
1408             strerror (errno));
1409     }
1410     if (sink->used_socket_v6) {
1411       if (setsockopt (g_socket_get_fd (sink->used_socket_v6), SOL_SOCKET,
1412               SO_BINDTODEVICE, sink->multi_iface,
1413               strlen (sink->multi_iface)) < 0)
1414         GST_WARNING_OBJECT (sink, "setsockopt SO_BINDTODEVICE failed (v6): %s",
1415             strerror (errno));
1416     }
1417   }
1418 #endif
1419 
1420   if (sink->used_socket)
1421     g_socket_set_broadcast (sink->used_socket, TRUE);
1422   if (sink->used_socket_v6)
1423     g_socket_set_broadcast (sink->used_socket_v6, TRUE);
1424 
1425   sink->bytes_to_serve = 0;
1426   sink->bytes_served = 0;
1427 
1428   gst_multiudpsink_setup_qos_dscp (sink, sink->used_socket);
1429   gst_multiudpsink_setup_qos_dscp (sink, sink->used_socket_v6);
1430 
1431   /* look for multicast clients and join multicast groups appropriately
1432      set also ttl and multicast loopback delivery appropriately  */
1433   for (clients = sink->clients; clients; clients = g_list_next (clients)) {
1434     client = (GstUDPClient *) clients->data;
1435 
1436     if (!gst_multiudpsink_configure_client (sink, client))
1437       return FALSE;
1438   }
1439   return TRUE;
1440 
1441   /* ERRORS */
1442 no_socket:
1443   {
1444     GST_ELEMENT_ERROR (sink, RESOURCE, FAILED, (NULL),
1445         ("Could not create socket: %s", err->message));
1446     g_clear_error (&err);
1447     return FALSE;
1448   }
1449 bind_error:
1450   {
1451     GST_ELEMENT_ERROR (sink, RESOURCE, FAILED, (NULL),
1452         ("Failed to bind socket: %s", err->message));
1453     g_clear_error (&err);
1454     return FALSE;
1455   }
1456 name_resolve:
1457   {
1458     GST_ELEMENT_ERROR (sink, RESOURCE, FAILED, (NULL),
1459         ("Failed to resolve bind address %s: %s", sink->bind_address,
1460             err->message));
1461     g_clear_error (&err);
1462     return FALSE;
1463   }
1464 }
1465 
1466 static gboolean
gst_multiudpsink_stop(GstBaseSink * bsink)1467 gst_multiudpsink_stop (GstBaseSink * bsink)
1468 {
1469   GstMultiUDPSink *udpsink;
1470 
1471   udpsink = GST_MULTIUDPSINK (bsink);
1472 
1473   if (udpsink->used_socket) {
1474     if (udpsink->close_socket || !udpsink->external_socket) {
1475       GError *err = NULL;
1476 
1477       if (!g_socket_close (udpsink->used_socket, &err)) {
1478         GST_ERROR_OBJECT (udpsink, "Failed to close socket: %s", err->message);
1479         g_clear_error (&err);
1480       }
1481     }
1482 
1483     g_object_unref (udpsink->used_socket);
1484     udpsink->used_socket = NULL;
1485   }
1486 
1487   if (udpsink->used_socket_v6) {
1488     if (udpsink->close_socket || !udpsink->external_socket) {
1489       GError *err = NULL;
1490 
1491       if (!g_socket_close (udpsink->used_socket_v6, &err)) {
1492         GST_ERROR_OBJECT (udpsink, "Failed to close socket: %s", err->message);
1493         g_clear_error (&err);
1494       }
1495     }
1496 
1497     g_object_unref (udpsink->used_socket_v6);
1498     udpsink->used_socket_v6 = NULL;
1499   }
1500 
1501   return TRUE;
1502 }
1503 
1504 static gint
gst_udp_client_compare_socket_family(GstUDPClient * a,GstUDPClient * b)1505 gst_udp_client_compare_socket_family (GstUDPClient * a, GstUDPClient * b)
1506 {
1507   GSocketFamily fa = g_socket_address_get_family (a->addr);
1508   GSocketFamily fb = g_socket_address_get_family (b->addr);
1509 
1510   if (fa == fb)
1511     return 0;
1512 
1513   /* a should go before b */
1514   if (fa == G_SOCKET_FAMILY_IPV4 && fb == G_SOCKET_FAMILY_IPV6)
1515     return -1;
1516 
1517   /* b should go before a */
1518   return 1;
1519 }
1520 
1521 static void
gst_multiudpsink_add_internal(GstMultiUDPSink * sink,const gchar * host,gint port,gboolean lock)1522 gst_multiudpsink_add_internal (GstMultiUDPSink * sink, const gchar * host,
1523     gint port, gboolean lock)
1524 {
1525   GSocketFamily family;
1526   GstUDPClient *client;
1527   GstUDPClient udpclient;
1528   GList *find;
1529 
1530   udpclient.host = (gchar *) host;
1531   udpclient.port = port;
1532 
1533   GST_DEBUG_OBJECT (sink, "adding client on host %s, port %d", host, port);
1534 
1535   if (lock)
1536     g_mutex_lock (&sink->client_lock);
1537 
1538   find = g_list_find_custom (sink->clients, &udpclient,
1539       (GCompareFunc) client_compare);
1540 
1541   if (!find) {
1542     find = g_list_find_custom (sink->clients_to_be_removed, &udpclient,
1543         (GCompareFunc) client_compare);
1544     if (find)
1545       gst_udp_client_ref (find->data);
1546   }
1547 
1548   if (find) {
1549     client = (GstUDPClient *) find->data;
1550 
1551     family = g_socket_address_get_family (client->addr);
1552 
1553     GST_DEBUG_OBJECT (sink, "found %d existing clients with host %s, port %d",
1554         client->add_count, host, port);
1555   } else {
1556     client = gst_udp_client_new (sink, host, port);
1557     if (!client)
1558       goto error;
1559 
1560     family = g_socket_address_get_family (client->addr);
1561 
1562     client->connect_time = g_get_real_time () * GST_USECOND;
1563 
1564     if (sink->used_socket)
1565       gst_multiudpsink_configure_client (sink, client);
1566 
1567     GST_DEBUG_OBJECT (sink, "add client with host %s, port %d", host, port);
1568 
1569     /* keep IPv4 clients at the beginning, and IPv6 at the end, we can make
1570      * use of this in gst_multiudpsink_render_buffers() */
1571     sink->clients = g_list_insert_sorted (sink->clients, client,
1572         (GCompareFunc) gst_udp_client_compare_socket_family);
1573 
1574     if (family == G_SOCKET_FAMILY_IPV4)
1575       ++sink->num_v4_unique;
1576     else
1577       ++sink->num_v6_unique;
1578   }
1579 
1580   ++client->add_count;
1581 
1582   if (family == G_SOCKET_FAMILY_IPV4)
1583     ++sink->num_v4_all;
1584   else
1585     ++sink->num_v6_all;
1586 
1587   if (lock)
1588     g_mutex_unlock (&sink->client_lock);
1589 
1590   g_signal_emit (G_OBJECT (sink),
1591       gst_multiudpsink_signals[SIGNAL_CLIENT_ADDED], 0, host, port);
1592 
1593   GST_DEBUG_OBJECT (sink, "added client on host %s, port %d", host, port);
1594   return;
1595 
1596   /* ERRORS */
1597 error:
1598   {
1599     GST_DEBUG_OBJECT (sink, "did not add client on host %s, port %d", host,
1600         port);
1601     if (lock)
1602       g_mutex_unlock (&sink->client_lock);
1603     return;
1604   }
1605 }
1606 
1607 void
gst_multiudpsink_add(GstMultiUDPSink * sink,const gchar * host,gint port)1608 gst_multiudpsink_add (GstMultiUDPSink * sink, const gchar * host, gint port)
1609 {
1610   gst_multiudpsink_add_internal (sink, host, port, TRUE);
1611 }
1612 
1613 void
gst_multiudpsink_remove(GstMultiUDPSink * sink,const gchar * host,gint port)1614 gst_multiudpsink_remove (GstMultiUDPSink * sink, const gchar * host, gint port)
1615 {
1616   GSocketFamily family;
1617   GList *find;
1618   GstUDPClient udpclient;
1619   GstUDPClient *client;
1620 
1621   udpclient.host = (gchar *) host;
1622   udpclient.port = port;
1623 
1624   g_mutex_lock (&sink->client_lock);
1625   find = g_list_find_custom (sink->clients, &udpclient,
1626       (GCompareFunc) client_compare);
1627   if (!find)
1628     goto not_found;
1629 
1630   client = (GstUDPClient *) find->data;
1631 
1632   GST_DEBUG_OBJECT (sink, "found %d clients with host %s, port %d",
1633       client->add_count, host, port);
1634 
1635   --client->add_count;
1636 
1637   family = g_socket_address_get_family (client->addr);
1638   if (family == G_SOCKET_FAMILY_IPV4)
1639     --sink->num_v4_all;
1640   else
1641     --sink->num_v6_all;
1642 
1643   if (client->add_count == 0) {
1644     GInetSocketAddress *saddr = G_INET_SOCKET_ADDRESS (client->addr);
1645     GInetAddress *addr = g_inet_socket_address_get_address (saddr);
1646     GSocket *socket;
1647 
1648     /* Select socket to send from for this address */
1649     if (family == G_SOCKET_FAMILY_IPV6 || !sink->used_socket)
1650       socket = sink->used_socket_v6;
1651     else
1652       socket = sink->used_socket;
1653 
1654     GST_DEBUG_OBJECT (sink, "remove client with host %s, port %d", host, port);
1655 
1656     client->disconnect_time = g_get_real_time () * GST_USECOND;
1657 
1658     if (socket && sink->auto_multicast
1659         && g_inet_address_get_is_multicast (addr)) {
1660       GError *err = NULL;
1661 
1662       if (!g_socket_leave_multicast_group (socket, addr, FALSE,
1663               sink->multi_iface, &err)) {
1664         GST_DEBUG_OBJECT (sink, "Failed to leave multicast group: %s",
1665             err->message);
1666         g_clear_error (&err);
1667       }
1668     }
1669 
1670     if (family == G_SOCKET_FAMILY_IPV4)
1671       --sink->num_v4_unique;
1672     else
1673       --sink->num_v6_unique;
1674 
1675     /* Keep state consistent for streaming thread, so remove from client list,
1676      * but keep it around until after the signal has been emitted, in case a
1677      * callback wants to get stats for that client or so */
1678     sink->clients = g_list_delete_link (sink->clients, find);
1679 
1680     sink->clients_to_be_removed =
1681         g_list_prepend (sink->clients_to_be_removed, client);
1682 
1683     /* Unlock to emit signal before we delete the actual client */
1684     g_mutex_unlock (&sink->client_lock);
1685     g_signal_emit (G_OBJECT (sink),
1686         gst_multiudpsink_signals[SIGNAL_CLIENT_REMOVED], 0, host, port);
1687     g_mutex_lock (&sink->client_lock);
1688 
1689     sink->clients_to_be_removed =
1690         g_list_remove (sink->clients_to_be_removed, client);
1691 
1692     gst_udp_client_unref (client);
1693   }
1694   g_mutex_unlock (&sink->client_lock);
1695 
1696   return;
1697 
1698   /* ERRORS */
1699 not_found:
1700   {
1701     g_mutex_unlock (&sink->client_lock);
1702     GST_WARNING_OBJECT (sink, "client at host %s, port %d not found",
1703         host, port);
1704     return;
1705   }
1706 }
1707 
1708 static void
gst_multiudpsink_clear_internal(GstMultiUDPSink * sink,gboolean lock)1709 gst_multiudpsink_clear_internal (GstMultiUDPSink * sink, gboolean lock)
1710 {
1711   GST_DEBUG_OBJECT (sink, "clearing");
1712   /* we only need to remove the client structure, there is no additional
1713    * socket or anything to free for UDP */
1714   if (lock)
1715     g_mutex_lock (&sink->client_lock);
1716   g_list_foreach (sink->clients, (GFunc) gst_udp_client_unref, sink);
1717   g_list_free (sink->clients);
1718   sink->clients = NULL;
1719   sink->num_v4_unique = 0;
1720   sink->num_v4_all = 0;
1721   sink->num_v6_unique = 0;
1722   sink->num_v6_all = 0;
1723   if (lock)
1724     g_mutex_unlock (&sink->client_lock);
1725 }
1726 
1727 void
gst_multiudpsink_clear(GstMultiUDPSink * sink)1728 gst_multiudpsink_clear (GstMultiUDPSink * sink)
1729 {
1730   gst_multiudpsink_clear_internal (sink, TRUE);
1731 }
1732 
1733 GstStructure *
gst_multiudpsink_get_stats(GstMultiUDPSink * sink,const gchar * host,gint port)1734 gst_multiudpsink_get_stats (GstMultiUDPSink * sink, const gchar * host,
1735     gint port)
1736 {
1737   GstUDPClient *client;
1738   GstStructure *result = NULL;
1739   GstUDPClient udpclient;
1740   GList *find;
1741 
1742   udpclient.host = (gchar *) host;
1743   udpclient.port = port;
1744 
1745   g_mutex_lock (&sink->client_lock);
1746 
1747   find = g_list_find_custom (sink->clients, &udpclient,
1748       (GCompareFunc) client_compare);
1749 
1750   if (!find)
1751     find = g_list_find_custom (sink->clients_to_be_removed, &udpclient,
1752         (GCompareFunc) client_compare);
1753 
1754   if (!find)
1755     goto not_found;
1756 
1757   GST_DEBUG_OBJECT (sink, "stats for client with host %s, port %d", host, port);
1758 
1759   client = (GstUDPClient *) find->data;
1760 
1761   result = gst_structure_new_empty ("multiudpsink-stats");
1762 
1763   gst_structure_set (result,
1764       "bytes-sent", G_TYPE_UINT64, client->bytes_sent,
1765       "packets-sent", G_TYPE_UINT64, client->packets_sent,
1766       "connect-time", G_TYPE_UINT64, client->connect_time,
1767       "disconnect-time", G_TYPE_UINT64, client->disconnect_time, NULL);
1768 
1769   g_mutex_unlock (&sink->client_lock);
1770 
1771   return result;
1772 
1773   /* ERRORS */
1774 not_found:
1775   {
1776     g_mutex_unlock (&sink->client_lock);
1777     GST_WARNING_OBJECT (sink, "client with host %s, port %d not found",
1778         host, port);
1779     /* Apparently (see comment in gstmultifdsink.c) returning NULL from here may
1780      * confuse/break python bindings */
1781     return gst_structure_new_empty ("multiudpsink-stats");
1782   }
1783 }
1784 
1785 static gboolean
gst_multiudpsink_unlock(GstBaseSink * bsink)1786 gst_multiudpsink_unlock (GstBaseSink * bsink)
1787 {
1788   GstMultiUDPSink *sink;
1789 
1790   sink = GST_MULTIUDPSINK (bsink);
1791 
1792   g_cancellable_cancel (sink->cancellable);
1793 
1794   return TRUE;
1795 }
1796 
1797 static gboolean
gst_multiudpsink_unlock_stop(GstBaseSink * bsink)1798 gst_multiudpsink_unlock_stop (GstBaseSink * bsink)
1799 {
1800   GstMultiUDPSink *sink;
1801 
1802   sink = GST_MULTIUDPSINK (bsink);
1803 
1804   gst_multiudpsink_free_cancellable (sink);
1805   gst_multiudpsink_create_cancellable (sink);
1806 
1807   return TRUE;
1808 }
1809