1 /* GStreamer
2 * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
3 * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
4 * Copyright (C) 2006 Wim Taymans <wim at fluendo dot com>
5 * Copyright (C) <2011> Collabora Ltd.
6 * Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
7 *
8 * This library is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Library General Public
10 * License as published by the Free Software Foundation; either
11 * version 2 of the License, or (at your option) any later version.
12 *
13 * This library is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 * Library General Public License for more details.
17 *
18 * You should have received a copy of the GNU Library General Public
19 * License along with this library; if not, write to the
20 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
21 * Boston, MA 02110-1301, USA.
22 */
23
24 /**
25 * SECTION:element-multisocketsink
26 * @title: multisocketsink
27 * @see_also: tcpserversink
28 *
29 * This plugin writes incoming data to a set of file descriptors. The
30 * file descriptors can be added to multisocketsink by emitting the #GstMultiSocketSink::add signal.
31 * For each descriptor added, the #GstMultiSocketSink::client-added signal will be called.
32 *
33 * A client can also be added with the #GstMultiSocketSink::add-full signal
34 * that allows for more control over what and how much data a client
35 * initially receives.
36 *
37 * Clients can be removed from multisocketsink by emitting the #GstMultiSocketSink::remove signal. For
38 * each descriptor removed, the #GstMultiSocketSink::client-removed signal will be called. The
39 * #GstMultiSocketSink::client-removed signal can also be fired when multisocketsink decides that a
40 * client is not active anymore or, depending on the value of the
41 * #GstMultiSocketSink:recover-policy property, if the client is reading too slowly.
42 * In all cases, multisocketsink will never close a file descriptor itself.
43 * The user of multisocketsink is responsible for closing all file descriptors.
44 * This can for example be done in response to the #GstMultiSocketSink::client-fd-removed signal.
45 * Note that multisocketsink still has a reference to the file descriptor when the
46 * #GstMultiSocketSink::client-removed signal is emitted, so that "get-stats" can be performed on
47 * the descriptor; it is therefore not safe to close the file descriptor in
48 * the #GstMultiSocketSink::client-removed signal handler, and you should use the
49 * #GstMultiSocketSink::client-fd-removed signal to safely close the fd.
50 *
51 * Multisocketsink internally keeps a queue of the incoming buffers and uses a
52 * separate thread to send the buffers to the clients. This ensures that no
53 * client write can block the pipeline and that clients can read with different
54 * speeds.
55 *
56 * When adding a client to multisocketsink, the #GstMultiSocketSink:sync-method property will define
57 * which buffer in the queued buffers will be sent first to the client. Clients
58 * can be sent the most recent buffer (which might not be decodable by the
59 * client if it is not a keyframe), the next keyframe received in
60 * multisocketsink (which can take some time depending on the keyframe rate), or the
61 * last received keyframe (which will cause a simple burst-on-connect).
62 * Multisocketsink will always keep at least one keyframe in its internal buffers
63 * when the sync-mode is set to latest-keyframe.
64 *
65 * There are additional values for the #GstMultiSocketSink:sync-method
66 * property to allow finer control over burst-on-connect behaviour. By selecting
67 * the 'burst' method a minimum burst size can be chosen, 'burst-keyframe'
68 * additionally requires that the burst begin with a keyframe, and
69 * 'burst-with-keyframe' attempts to burst beginning with a keyframe, but will
70 * prefer a minimum burst size even if it requires not starting with a keyframe.
71 *
72 * Multisocketsink can be instructed to keep at least a minimum amount of data
73 * expressed in time or byte units in its internal queues with the
74 * #GstMultiSocketSink:time-min and #GstMultiSocketSink:bytes-min properties respectively.
75 * These properties are useful if the application adds clients with the
76 * #GstMultiSocketSink::add-full signal to make sure that a burst connect can
77 * actually be honored.
78 *
79 * When streaming data, clients are allowed to read at a different rate than
80 * the rate at which multisocketsink receives data. If the client is reading too
81 * fast, no data will be send to the client until multisocketsink receives more
82 * data. If the client, however, reads too slowly, data for that client will be
83 * queued up in multisocketsink. Two properties control the amount of data
84 * (buffers) that is queued in multisocketsink: #GstMultiSocketSink:buffers-max and
85 * #GstMultiSocketSink:buffers-soft-max. A client that falls behind by
86 * #GstMultiSocketSink:buffers-max is removed from multisocketsink forcibly.
87 *
88 * A client with a lag of at least #GstMultiSocketSink:buffers-soft-max enters the recovery
89 * procedure which is controlled with the #GstMultiSocketSink:recover-policy property.
90 * A recover policy of NONE will do nothing, RESYNC_LATEST will send the most recently
91 * received buffer as the next buffer for the client, RESYNC_SOFT_LIMIT
92 * positions the client to the soft limit in the buffer queue and
93 * RESYNC_KEYFRAME positions the client at the most recent keyframe in the
94 * buffer queue.
95 *
96 * multisocketsink will by default synchronize on the clock before serving the
97 * buffers to the clients. This behaviour can be disabled by setting the sync
98 * property to FALSE. Multisocketsink will by default not do QoS and will never
99 * drop late buffers.
100 */
101
102 #ifdef HAVE_CONFIG_H
103 #include "config.h"
104 #endif
105
106 #include <gst/gst-i18n-plugin.h>
107 #include <gst/net/gstnetcontrolmessagemeta.h>
108
109 #include <string.h>
110
111 #include "gstmultisocketsink.h"
112
113 #ifndef G_OS_WIN32
114 #include <netinet/in.h>
115 #endif
116
117 #define NOT_IMPLEMENTED 0
118
119 GST_DEBUG_CATEGORY_STATIC (multisocketsink_debug);
120 #define GST_CAT_DEFAULT (multisocketsink_debug)
121
122 /* MultiSocketSink signals and args */
123 enum
124 {
125 /* methods */
126 SIGNAL_ADD,
127 SIGNAL_ADD_BURST,
128 SIGNAL_REMOVE,
129 SIGNAL_REMOVE_FLUSH,
130 SIGNAL_GET_STATS,
131
132 /* signals */
133 SIGNAL_CLIENT_ADDED,
134 SIGNAL_CLIENT_REMOVED,
135 SIGNAL_CLIENT_SOCKET_REMOVED,
136
137 LAST_SIGNAL
138 };
139
140 #define DEFAULT_SEND_DISPATCHED FALSE
141 #define DEFAULT_SEND_MESSAGES FALSE
142
143 enum
144 {
145 PROP_0,
146 PROP_SEND_DISPATCHED,
147 PROP_SEND_MESSAGES,
148 PROP_LAST
149 };
150
151 static void gst_multi_socket_sink_finalize (GObject * object);
152
153 static void gst_multi_socket_sink_add (GstMultiSocketSink * sink,
154 GSocket * socket);
155 static void gst_multi_socket_sink_add_full (GstMultiSocketSink * sink,
156 GSocket * socket, GstSyncMethod sync, GstFormat min_format,
157 guint64 min_value, GstFormat max_format, guint64 max_value);
158 static void gst_multi_socket_sink_remove (GstMultiSocketSink * sink,
159 GSocket * socket);
160 static void gst_multi_socket_sink_remove_flush (GstMultiSocketSink * sink,
161 GSocket * socket);
162 static GstStructure *gst_multi_socket_sink_get_stats (GstMultiSocketSink * sink,
163 GSocket * socket);
164
165 static void gst_multi_socket_sink_emit_client_added (GstMultiHandleSink * mhs,
166 GstMultiSinkHandle handle);
167 static void gst_multi_socket_sink_emit_client_removed (GstMultiHandleSink * mhs,
168 GstMultiSinkHandle handle, GstClientStatus status);
169
170 static void gst_multi_socket_sink_stop_pre (GstMultiHandleSink * mhsink);
171 static void gst_multi_socket_sink_stop_post (GstMultiHandleSink * mhsink);
172 static gboolean gst_multi_socket_sink_start_pre (GstMultiHandleSink * mhsink);
173 static gpointer gst_multi_socket_sink_thread (GstMultiHandleSink * mhsink);
174 static GstMultiHandleClient
175 * gst_multi_socket_sink_new_client (GstMultiHandleSink * mhsink,
176 GstMultiSinkHandle handle, GstSyncMethod sync_method);
177 static int gst_multi_socket_sink_client_get_fd (GstMultiHandleClient * client);
178 static void gst_multi_socket_sink_client_free (GstMultiHandleSink * mhsink,
179 GstMultiHandleClient * client);
180 static void gst_multi_socket_sink_handle_debug (GstMultiSinkHandle handle,
181 gchar debug[30]);
182
183 static gpointer gst_multi_socket_sink_handle_hash_key (GstMultiSinkHandle
184 handle);
185 static void gst_multi_socket_sink_hash_adding (GstMultiHandleSink * mhsink,
186 GstMultiHandleClient * mhclient);
187 static void gst_multi_socket_sink_hash_removing (GstMultiHandleSink * mhsink,
188 GstMultiHandleClient * mhclient);
189 static void gst_multi_socket_sink_stop_sending (GstMultiSocketSink * sink,
190 GstSocketClient * client);
191
192 static gboolean gst_multi_socket_sink_socket_condition (GstMultiSinkHandle
193 handle, GIOCondition condition, GstMultiSocketSink * sink);
194
195 static gboolean gst_multi_socket_sink_unlock (GstBaseSink * bsink);
196 static gboolean gst_multi_socket_sink_unlock_stop (GstBaseSink * bsink);
197
198 static gboolean gst_multi_socket_sink_propose_allocation (GstBaseSink * bsink,
199 GstQuery * query);
200
201 static void gst_multi_socket_sink_set_property (GObject * object, guint prop_id,
202 const GValue * value, GParamSpec * pspec);
203 static void gst_multi_socket_sink_get_property (GObject * object, guint prop_id,
204 GValue * value, GParamSpec * pspec);
205
206 #define gst_multi_socket_sink_parent_class parent_class
207 G_DEFINE_TYPE (GstMultiSocketSink, gst_multi_socket_sink,
208 GST_TYPE_MULTI_HANDLE_SINK);
209
210 static guint gst_multi_socket_sink_signals[LAST_SIGNAL] = { 0 };
211
212 static void
gst_multi_socket_sink_class_init(GstMultiSocketSinkClass * klass)213 gst_multi_socket_sink_class_init (GstMultiSocketSinkClass * klass)
214 {
215 GObjectClass *gobject_class;
216 GstElementClass *gstelement_class;
217 GstBaseSinkClass *gstbasesink_class;
218 GstMultiHandleSinkClass *gstmultihandlesink_class;
219
220 gobject_class = (GObjectClass *) klass;
221 gstelement_class = (GstElementClass *) klass;
222 gstbasesink_class = (GstBaseSinkClass *) klass;
223 gstmultihandlesink_class = (GstMultiHandleSinkClass *) klass;
224
225 gobject_class->set_property = gst_multi_socket_sink_set_property;
226 gobject_class->get_property = gst_multi_socket_sink_get_property;
227 gobject_class->finalize = gst_multi_socket_sink_finalize;
228
229 /**
230 * GstMultiSocketSink:send-dispatched:
231 *
232 * Sends a GstNetworkMessageDispatched event upstream whenever a buffer
233 * is sent to a client.
234 * The event is a CUSTOM event name GstNetworkMessageDispatched and
235 * contains:
236 *
237 * "object" G_TYPE_OBJECT : the object identifying the client
238 * "buffer" GST_TYPE_BUFFER : the buffer sent to the client
239 *
240 * Since: 1.8.0
241 */
242 g_object_class_install_property (gobject_class, PROP_SEND_DISPATCHED,
243 g_param_spec_boolean ("send-dispatched", "Send Dispatched",
244 "If GstNetworkMessageDispatched events should be pushed",
245 DEFAULT_SEND_DISPATCHED, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
246 /**
247 * GstMultiSocketSink:send-messages:
248 *
249 * Sends a GstNetworkMessage event upstream whenever a buffer
250 * is received from a client.
251 * The event is a CUSTOM event name GstNetworkMessage and contains:
252 *
253 * "object" G_TYPE_OBJECT : the object identifying the client
254 * "buffer" GST_TYPE_BUFFER : the buffer with data received from the
255 * client
256 *
257 * Since: 1.8.0
258 */
259 g_object_class_install_property (gobject_class, PROP_SEND_MESSAGES,
260 g_param_spec_boolean ("send-messages", "Send Messages",
261 "If GstNetworkMessage events should be pushed", DEFAULT_SEND_MESSAGES,
262 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
263
264 /**
265 * GstMultiSocketSink::add:
266 * @gstmultisocketsink: the multisocketsink element to emit this signal on
267 * @socket: the socket to add to multisocketsink
268 *
269 * Hand the given open socket to multisocketsink to write to.
270 */
271 gst_multi_socket_sink_signals[SIGNAL_ADD] =
272 g_signal_new ("add", G_TYPE_FROM_CLASS (klass),
273 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
274 G_STRUCT_OFFSET (GstMultiSocketSinkClass, add), NULL, NULL,
275 g_cclosure_marshal_generic, G_TYPE_NONE, 1, G_TYPE_SOCKET);
276 /**
277 * GstMultiSocketSink::add-full:
278 * @gstmultisocketsink: the multisocketsink element to emit this signal on
279 * @socket: the socket to add to multisocketsink
280 * @sync: the sync method to use
281 * @format_min: the format of @value_min
282 * @value_min: the minimum amount of data to burst expressed in
283 * @format_min units.
284 * @format_max: the format of @value_max
285 * @value_max: the maximum amount of data to burst expressed in
286 * @format_max units.
287 *
288 * Hand the given open socket to multisocketsink to write to and
289 * specify the burst parameters for the new connection.
290 */
291 gst_multi_socket_sink_signals[SIGNAL_ADD_BURST] =
292 g_signal_new ("add-full", G_TYPE_FROM_CLASS (klass),
293 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
294 G_STRUCT_OFFSET (GstMultiSocketSinkClass, add_full), NULL, NULL,
295 g_cclosure_marshal_generic, G_TYPE_NONE, 6,
296 G_TYPE_SOCKET, GST_TYPE_SYNC_METHOD, GST_TYPE_FORMAT, G_TYPE_UINT64,
297 GST_TYPE_FORMAT, G_TYPE_UINT64);
298 /**
299 * GstMultiSocketSink::remove:
300 * @gstmultisocketsink: the multisocketsink element to emit this signal on
301 * @socket: the socket to remove from multisocketsink
302 *
303 * Remove the given open socket from multisocketsink.
304 */
305 gst_multi_socket_sink_signals[SIGNAL_REMOVE] =
306 g_signal_new ("remove", G_TYPE_FROM_CLASS (klass),
307 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
308 G_STRUCT_OFFSET (GstMultiSocketSinkClass, remove), NULL, NULL,
309 g_cclosure_marshal_generic, G_TYPE_NONE, 1, G_TYPE_SOCKET);
310 /**
311 * GstMultiSocketSink::remove-flush:
312 * @gstmultisocketsink: the multisocketsink element to emit this signal on
313 * @socket: the socket to remove from multisocketsink
314 *
315 * Remove the given open socket from multisocketsink after flushing all
316 * the pending data to the socket.
317 */
318 gst_multi_socket_sink_signals[SIGNAL_REMOVE_FLUSH] =
319 g_signal_new ("remove-flush", G_TYPE_FROM_CLASS (klass),
320 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
321 G_STRUCT_OFFSET (GstMultiSocketSinkClass, remove_flush), NULL, NULL,
322 g_cclosure_marshal_generic, G_TYPE_NONE, 1, G_TYPE_SOCKET);
323
324 /**
325 * GstMultiSocketSink::get-stats:
326 * @gstmultisocketsink: the multisocketsink element to emit this signal on
327 * @socket: the socket to get stats of from multisocketsink
328 *
329 * Get statistics about @socket. This function returns a GstStructure.
330 *
331 * Returns: a GstStructure with the statistics. The structure contains
332 * values that represent: total number of bytes sent, time
333 * when the client was added, time when the client was
334 * disconnected/removed, time the client is/was active, last activity
335 * time (in epoch seconds), number of buffers dropped.
336 * All times are expressed in nanoseconds (GstClockTime).
337 */
338 gst_multi_socket_sink_signals[SIGNAL_GET_STATS] =
339 g_signal_new ("get-stats", G_TYPE_FROM_CLASS (klass),
340 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
341 G_STRUCT_OFFSET (GstMultiSocketSinkClass, get_stats), NULL, NULL,
342 g_cclosure_marshal_generic, GST_TYPE_STRUCTURE, 1, G_TYPE_SOCKET);
343
344 /**
345 * GstMultiSocketSink::client-added:
346 * @gstmultisocketsink: the multisocketsink element that emitted this signal
347 * @socket: the socket that was added to multisocketsink
348 *
349 * The given socket was added to multisocketsink. This signal will
350 * be emitted from the streaming thread so application should be prepared
351 * for that.
352 */
353 gst_multi_socket_sink_signals[SIGNAL_CLIENT_ADDED] =
354 g_signal_new ("client-added", G_TYPE_FROM_CLASS (klass),
355 G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
356 G_TYPE_NONE, 1, G_TYPE_OBJECT);
357 /**
358 * GstMultiSocketSink::client-removed:
359 * @gstmultisocketsink: the multisocketsink element that emitted this signal
360 * @socket: the socket that is to be removed from multisocketsink
361 * @status: the reason why the client was removed
362 *
363 * The given socket is about to be removed from multisocketsink. This
364 * signal will be emitted from the streaming thread so applications should
365 * be prepared for that.
366 *
367 * @gstmultisocketsink still holds a handle to @socket so it is possible to call
368 * the get-stats signal from this callback. For the same reason it is
369 * not safe to close() and reuse @socket in this callback.
370 */
371 gst_multi_socket_sink_signals[SIGNAL_CLIENT_REMOVED] =
372 g_signal_new ("client-removed", G_TYPE_FROM_CLASS (klass),
373 G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
374 G_TYPE_NONE, 2, G_TYPE_SOCKET, GST_TYPE_CLIENT_STATUS);
375 /**
376 * GstMultiSocketSink::client-socket-removed:
377 * @gstmultisocketsink: the multisocketsink element that emitted this signal
378 * @socket: the socket that was removed from multisocketsink
379 *
380 * The given socket was removed from multisocketsink. This signal will
381 * be emitted from the streaming thread so applications should be prepared
382 * for that.
383 *
384 * In this callback, @gstmultisocketsink has removed all the information
385 * associated with @socket and it is therefore not possible to call get-stats
386 * with @socket. It is however safe to close() and reuse @fd in the callback.
387 */
388 gst_multi_socket_sink_signals[SIGNAL_CLIENT_SOCKET_REMOVED] =
389 g_signal_new ("client-socket-removed", G_TYPE_FROM_CLASS (klass),
390 G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
391 G_TYPE_NONE, 1, G_TYPE_SOCKET);
392
393 gst_element_class_set_static_metadata (gstelement_class,
394 "Multi socket sink", "Sink/Network",
395 "Send data to multiple sockets",
396 "Thomas Vander Stichele <thomas at apestaart dot org>, "
397 "Wim Taymans <wim@fluendo.com>, "
398 "Sebastian Dröge <sebastian.droege@collabora.co.uk>");
399
400 gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_unlock);
401 gstbasesink_class->unlock_stop =
402 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_unlock_stop);
403 gstbasesink_class->propose_allocation =
404 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_propose_allocation);
405
406 klass->add = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_add);
407 klass->add_full = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_add_full);
408 klass->remove = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_remove);
409 klass->remove_flush = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_remove_flush);
410 klass->get_stats = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_get_stats);
411
412 gstmultihandlesink_class->emit_client_added =
413 gst_multi_socket_sink_emit_client_added;
414 gstmultihandlesink_class->emit_client_removed =
415 gst_multi_socket_sink_emit_client_removed;
416
417 gstmultihandlesink_class->stop_pre =
418 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_stop_pre);
419 gstmultihandlesink_class->stop_post =
420 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_stop_post);
421 gstmultihandlesink_class->start_pre =
422 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_start_pre);
423 gstmultihandlesink_class->thread =
424 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_thread);
425 gstmultihandlesink_class->new_client =
426 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_new_client);
427 gstmultihandlesink_class->client_get_fd =
428 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_client_get_fd);
429 gstmultihandlesink_class->client_free =
430 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_client_free);
431 gstmultihandlesink_class->handle_debug =
432 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_handle_debug);
433 gstmultihandlesink_class->handle_hash_key =
434 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_handle_hash_key);
435 gstmultihandlesink_class->hash_adding =
436 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_hash_adding);
437 gstmultihandlesink_class->hash_removing =
438 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_hash_removing);
439
440 GST_DEBUG_CATEGORY_INIT (multisocketsink_debug, "multisocketsink", 0,
441 "Multi socket sink");
442 }
443
444 static void
gst_multi_socket_sink_init(GstMultiSocketSink * this)445 gst_multi_socket_sink_init (GstMultiSocketSink * this)
446 {
447 GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (this);
448
449 mhsink->handle_hash = g_hash_table_new (g_direct_hash, g_int_equal);
450
451 this->cancellable = g_cancellable_new ();
452 this->send_dispatched = DEFAULT_SEND_DISPATCHED;
453 this->send_messages = DEFAULT_SEND_MESSAGES;
454 }
455
456 static void
gst_multi_socket_sink_finalize(GObject * object)457 gst_multi_socket_sink_finalize (GObject * object)
458 {
459 GstMultiSocketSink *this = GST_MULTI_SOCKET_SINK (object);
460
461 if (this->cancellable) {
462 g_object_unref (this->cancellable);
463 this->cancellable = NULL;
464 }
465
466 G_OBJECT_CLASS (parent_class)->finalize (object);
467 }
468
469 /* methods to emit signals */
470
471 static void
gst_multi_socket_sink_emit_client_added(GstMultiHandleSink * mhsink,GstMultiSinkHandle handle)472 gst_multi_socket_sink_emit_client_added (GstMultiHandleSink * mhsink,
473 GstMultiSinkHandle handle)
474 {
475 g_signal_emit (mhsink, gst_multi_socket_sink_signals[SIGNAL_CLIENT_ADDED], 0,
476 handle.socket);
477 }
478
479 static void
gst_multi_socket_sink_emit_client_removed(GstMultiHandleSink * mhsink,GstMultiSinkHandle handle,GstClientStatus status)480 gst_multi_socket_sink_emit_client_removed (GstMultiHandleSink * mhsink,
481 GstMultiSinkHandle handle, GstClientStatus status)
482 {
483 g_signal_emit (mhsink, gst_multi_socket_sink_signals[SIGNAL_CLIENT_REMOVED],
484 0, handle.socket, status);
485 }
486
487 /* action signals */
488
489 static void
gst_multi_socket_sink_add(GstMultiSocketSink * sink,GSocket * socket)490 gst_multi_socket_sink_add (GstMultiSocketSink * sink, GSocket * socket)
491 {
492 GstMultiSinkHandle handle;
493
494 handle.socket = socket;
495 gst_multi_handle_sink_add (GST_MULTI_HANDLE_SINK_CAST (sink), handle);
496 }
497
498 static void
gst_multi_socket_sink_add_full(GstMultiSocketSink * sink,GSocket * socket,GstSyncMethod sync,GstFormat min_format,guint64 min_value,GstFormat max_format,guint64 max_value)499 gst_multi_socket_sink_add_full (GstMultiSocketSink * sink, GSocket * socket,
500 GstSyncMethod sync, GstFormat min_format, guint64 min_value,
501 GstFormat max_format, guint64 max_value)
502 {
503 GstMultiSinkHandle handle;
504
505 handle.socket = socket;
506 gst_multi_handle_sink_add_full (GST_MULTI_HANDLE_SINK_CAST (sink), handle,
507 sync, min_format, min_value, max_format, max_value);
508 }
509
510 static void
gst_multi_socket_sink_remove(GstMultiSocketSink * sink,GSocket * socket)511 gst_multi_socket_sink_remove (GstMultiSocketSink * sink, GSocket * socket)
512 {
513 GstMultiSinkHandle handle;
514
515 handle.socket = socket;
516 gst_multi_handle_sink_remove (GST_MULTI_HANDLE_SINK_CAST (sink), handle);
517 }
518
519 static void
gst_multi_socket_sink_remove_flush(GstMultiSocketSink * sink,GSocket * socket)520 gst_multi_socket_sink_remove_flush (GstMultiSocketSink * sink, GSocket * socket)
521 {
522 GstMultiSinkHandle handle;
523
524 handle.socket = socket;
525 gst_multi_handle_sink_remove_flush (GST_MULTI_HANDLE_SINK_CAST (sink),
526 handle);
527 }
528
529 static GstStructure *
gst_multi_socket_sink_get_stats(GstMultiSocketSink * sink,GSocket * socket)530 gst_multi_socket_sink_get_stats (GstMultiSocketSink * sink, GSocket * socket)
531 {
532 GstMultiSinkHandle handle;
533
534 handle.socket = socket;
535 return gst_multi_handle_sink_get_stats (GST_MULTI_HANDLE_SINK_CAST (sink),
536 handle);
537 }
538
539 static GstMultiHandleClient *
gst_multi_socket_sink_new_client(GstMultiHandleSink * mhsink,GstMultiSinkHandle handle,GstSyncMethod sync_method)540 gst_multi_socket_sink_new_client (GstMultiHandleSink * mhsink,
541 GstMultiSinkHandle handle, GstSyncMethod sync_method)
542 {
543 GstSocketClient *client;
544 GstMultiHandleClient *mhclient;
545 GstMultiHandleSinkClass *mhsinkclass =
546 GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
547
548 /* create client datastructure */
549 g_assert (G_IS_SOCKET (handle.socket));
550 client = g_new0 (GstSocketClient, 1);
551 mhclient = (GstMultiHandleClient *) client;
552
553 mhclient->handle.socket = G_SOCKET (g_object_ref (handle.socket));
554
555 gst_multi_handle_sink_client_init (mhclient, sync_method);
556 mhsinkclass->handle_debug (handle, mhclient->debug);
557
558 /* set the socket to non blocking */
559 g_socket_set_blocking (handle.socket, FALSE);
560
561 /* we always read from a client */
562 mhsinkclass->hash_adding (mhsink, mhclient);
563
564 gst_multi_handle_sink_setup_dscp_client (mhsink, mhclient);
565
566 return mhclient;
567 }
568
569 static int
gst_multi_socket_sink_client_get_fd(GstMultiHandleClient * client)570 gst_multi_socket_sink_client_get_fd (GstMultiHandleClient * client)
571 {
572 return g_socket_get_fd (client->handle.socket);
573 }
574
575 static void
gst_multi_socket_sink_client_free(GstMultiHandleSink * mhsink,GstMultiHandleClient * client)576 gst_multi_socket_sink_client_free (GstMultiHandleSink * mhsink,
577 GstMultiHandleClient * client)
578 {
579 g_assert (G_IS_SOCKET (client->handle.socket));
580
581 g_signal_emit (mhsink,
582 gst_multi_socket_sink_signals[SIGNAL_CLIENT_SOCKET_REMOVED], 0,
583 client->handle.socket);
584
585 g_object_unref (client->handle.socket);
586 }
587
588 static void
gst_multi_socket_sink_handle_debug(GstMultiSinkHandle handle,gchar debug[30])589 gst_multi_socket_sink_handle_debug (GstMultiSinkHandle handle, gchar debug[30])
590 {
591 g_snprintf (debug, 30, "[socket %p]", handle.socket);
592 }
593
594 static gpointer
gst_multi_socket_sink_handle_hash_key(GstMultiSinkHandle handle)595 gst_multi_socket_sink_handle_hash_key (GstMultiSinkHandle handle)
596 {
597 return handle.socket;
598 }
599
600 /* handle a read on a client socket,
601 * which either indicates a close or should be ignored
602 * returns FALSE if some error occured or the client closed. */
603 static gboolean
gst_multi_socket_sink_handle_client_read(GstMultiSocketSink * sink,GstSocketClient * client)604 gst_multi_socket_sink_handle_client_read (GstMultiSocketSink * sink,
605 GstSocketClient * client)
606 {
607 gboolean ret, do_event;
608 gchar dummy[256], *mem, *omem;
609 gssize nread;
610 GError *err = NULL;
611 gboolean first = TRUE;
612 GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
613 gssize navail, maxmem;
614
615 GST_DEBUG_OBJECT (sink, "%s select reports client read", mhclient->debug);
616
617 ret = TRUE;
618
619 navail = g_socket_get_available_bytes (mhclient->handle.socket);
620 if (navail < 0)
621 return TRUE;
622
623 /* only collect the data in a buffer when we need to send it with an event */
624 do_event = sink->send_messages && navail > 0;
625 if (do_event) {
626 omem = mem = g_malloc (navail);
627 maxmem = navail;
628 } else {
629 mem = dummy;
630 maxmem = sizeof (dummy);
631 }
632
633 /* just Read 'n' Drop, could also just drop the client as it's not supposed
634 * to write to us except for closing the socket, I guess it's because we
635 * like to listen to our customers. */
636 do {
637 GST_DEBUG_OBJECT (sink, "%s client wants us to read", mhclient->debug);
638
639 nread =
640 g_socket_receive (mhclient->handle.socket, mem, MIN (navail,
641 maxmem), sink->cancellable, &err);
642
643 if (first && nread == 0) {
644 /* client sent close, so remove it */
645 GST_DEBUG_OBJECT (sink, "%s client asked for close, removing",
646 mhclient->debug);
647 mhclient->status = GST_CLIENT_STATUS_CLOSED;
648 ret = FALSE;
649 break;
650 } else if (nread < 0) {
651 if (err->code == G_IO_ERROR_WOULD_BLOCK)
652 break;
653
654 GST_WARNING_OBJECT (sink, "%s could not read: %s",
655 mhclient->debug, err->message);
656 mhclient->status = GST_CLIENT_STATUS_ERROR;
657 ret = FALSE;
658 break;
659 }
660 navail -= nread;
661 if (do_event)
662 mem += nread;
663 first = FALSE;
664 } while (navail > 0);
665 g_clear_error (&err);
666
667 if (do_event) {
668 if (ret) {
669 GstBuffer *buf;
670 GstEvent *ev;
671
672 buf = gst_buffer_new_wrapped (omem, maxmem);
673 ev = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
674 gst_structure_new ("GstNetworkMessage",
675 "object", G_TYPE_OBJECT, mhclient->handle.socket,
676 "buffer", GST_TYPE_BUFFER, buf, NULL));
677 gst_buffer_unref (buf);
678
679 gst_pad_push_event (GST_BASE_SINK_PAD (sink), ev);
680 } else
681 g_free (omem);
682 }
683 return ret;
684 }
685
686 /**
687 * map_memory_output_vector_n:
688 * @buf: The #GstBuffer that should be mapped
689 * @offset: Offset into the buffer that should be mapped
690 * @vectors: (out,array length=num_vectors): an array of #GOutputVector structs to write into
691 * @mapinfo: (out,array length=num_vectors): an array of #GstMapInfo structs to write into
692 * @num_vectors: the number of elements in @vectors to prevent buffer overruns
693 *
694 * Maps a buffer into memory, populating a #GOutputVector to use scatter-gather
695 * I/O to send the data over a socket. The whole buffer won't be mapped into
696 * memory if it consists of more than @num_vectors #GstMemory s.
697 *
698 * Use #unmap_n_memorys after you are
699 * finished with the mappings.
700 *
701 * Returns: The number of GstMemorys mapped
702 */
703 static int
map_n_memory_output_vector(GstBuffer * buf,size_t offset,GOutputVector * vectors,GstMapInfo * mapinfo,int num_vectors)704 map_n_memory_output_vector (GstBuffer * buf, size_t offset,
705 GOutputVector * vectors, GstMapInfo * mapinfo, int num_vectors)
706 {
707 guint mem_idx, mem_len;
708 gsize mem_skip;
709 size_t maxsize;
710 int i;
711
712 g_return_val_if_fail (num_vectors > 0, 0);
713 memset (vectors, 0, sizeof (GOutputVector) * num_vectors);
714
715 maxsize = gst_buffer_get_size (buf) - offset;
716 if (!gst_buffer_find_memory (buf, offset, maxsize, &mem_idx, &mem_len,
717 &mem_skip))
718 g_error ("Unable to map memory at offset %" G_GSIZE_FORMAT ", buffer "
719 "length is %" G_GSIZE_FORMAT, offset, gst_buffer_get_size (buf));
720
721 for (i = 0; i < mem_len && i < num_vectors; i++) {
722 GstMapInfo map = { 0 };
723 GstMemory *mem = gst_buffer_peek_memory (buf, mem_idx + i);
724 if (!gst_memory_map (mem, &map, GST_MAP_READ))
725 g_error ("Unable to map memory %p. This should never happen.", mem);
726
727 if (i == 0) {
728 vectors[i].buffer = map.data + mem_skip;
729 vectors[i].size = map.size - mem_skip;
730 } else {
731 vectors[i].buffer = map.data;
732 vectors[i].size = map.size;
733 }
734 mapinfo[i] = map;
735 }
736 return i;
737 }
738
739 /**
740 * map_n_memory_output_vector:
741 * @buf: The #GstBuffer that should be mapped
742 * @offset: Offset into the buffer that should be mapped
743 * @vectors: (out,array length=num_vectors): an array of #GOutputVector structs to write into
744 * @num_vectors: the number of elements in @vectors to prevent buffer overruns
745 *
746 * Returns: The number of GstMemorys mapped
747 */
748 static void
unmap_n_memorys(GstMapInfo * mapinfo,int num_mappings)749 unmap_n_memorys (GstMapInfo * mapinfo, int num_mappings)
750 {
751 int i;
752 g_return_if_fail (num_mappings > 0);
753
754 for (i = 0; i < num_mappings; i++)
755 gst_memory_unmap (mapinfo[i].memory, &mapinfo[i]);
756 }
757
758 static gsize
gst_buffer_get_cmsg_list(GstBuffer * buf,GSocketControlMessage ** msgs,gsize msg_space)759 gst_buffer_get_cmsg_list (GstBuffer * buf, GSocketControlMessage ** msgs,
760 gsize msg_space)
761 {
762 gpointer iter_state = NULL;
763 GstMeta *meta;
764 gsize msg_count = 0;
765
766 while ((meta = gst_buffer_iterate_meta (buf, &iter_state)) != NULL
767 && msg_count < msg_space) {
768 if (meta->info->api == GST_NET_CONTROL_MESSAGE_META_API_TYPE)
769 msgs[msg_count++] = ((GstNetControlMessageMeta *) meta)->message;
770 }
771
772 return msg_count;
773 }
774
775 #define CMSG_MAX 255
776
777 static gssize
gst_multi_socket_sink_write(GstMultiSocketSink * sink,GSocket * sock,GstBuffer * buffer,gsize bufoffset,GCancellable * cancellable,GError ** err)778 gst_multi_socket_sink_write (GstMultiSocketSink * sink,
779 GSocket * sock, GstBuffer * buffer, gsize bufoffset,
780 GCancellable * cancellable, GError ** err)
781 {
782 GstMapInfo maps[8];
783 GOutputVector vec[8];
784 guint mems_mapped;
785 gssize wrote;
786 GSocketControlMessage *cmsgs[CMSG_MAX];
787 gsize msg_count;
788
789 mems_mapped = map_n_memory_output_vector (buffer, bufoffset, vec, maps, 8);
790
791 msg_count = gst_buffer_get_cmsg_list (buffer, cmsgs, CMSG_MAX);
792
793 wrote =
794 g_socket_send_message (sock, NULL, vec, mems_mapped, cmsgs, msg_count, 0,
795 cancellable, err);
796 unmap_n_memorys (maps, mems_mapped);
797 return wrote;
798 }
799
800 /* Handle a write on a client,
801 * which indicates a read request from a client.
802 *
803 * For each client we maintain a queue of GstBuffers that contain the raw bytes
804 * we need to send to the client.
805 *
806 * We first check to see if we need to send streamheaders. If so, we queue them.
807 *
808 * Then we run into the main loop that tries to send as many buffers as
809 * possible. It will first exhaust the mhclient->sending queue and if the queue
810 * is empty, it will pick a buffer from the global queue.
811 *
812 * Sending the buffers from the mhclient->sending queue is basically writing
813 * the bytes to the socket and maintaining a count of the bytes that were
814 * sent. When the buffer is completely sent, it is removed from the
815 * mhclient->sending queue and we try to pick a new buffer for sending.
816 *
817 * When the sending returns a partial buffer we stop sending more data as
818 * the next send operation could block.
819 *
820 * This functions returns FALSE if some error occured.
821 */
822 static gboolean
gst_multi_socket_sink_handle_client_write(GstMultiSocketSink * sink,GstSocketClient * client)823 gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink,
824 GstSocketClient * client)
825 {
826 gboolean more;
827 gboolean flushing;
828 GstClockTime now;
829 GTimeVal nowtv;
830 GError *err = NULL;
831 GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
832 GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
833 GstMultiHandleSinkClass *mhsinkclass =
834 GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
835
836
837 g_get_current_time (&nowtv);
838 now = GST_TIMEVAL_TO_TIME (nowtv);
839
840 flushing = mhclient->status == GST_CLIENT_STATUS_FLUSHING;
841
842 more = TRUE;
843 do {
844 if (!mhclient->sending) {
845 /* client is not working on a buffer */
846 if (mhclient->bufpos == -1) {
847 /* client is too fast, remove from write queue until new buffer is
848 * available */
849 gst_multi_socket_sink_stop_sending (sink, client);
850
851 /* if we flushed out all of the client buffers, we can stop */
852 if (mhclient->flushcount == 0)
853 goto flushed;
854
855 return TRUE;
856 } else {
857 /* client can pick a buffer from the global queue */
858 GstBuffer *buf;
859 GstClockTime timestamp;
860
861 /* for new connections, we need to find a good spot in the
862 * bufqueue to start streaming from */
863 if (mhclient->new_connection && !flushing) {
864 gint position =
865 gst_multi_handle_sink_new_client_position (mhsink, mhclient);
866
867 if (position >= 0) {
868 /* we got a valid spot in the queue */
869 mhclient->new_connection = FALSE;
870 mhclient->bufpos = position;
871 } else {
872 /* cannot send data to this client yet */
873 gst_multi_socket_sink_stop_sending (sink, client);
874 return TRUE;
875 }
876 }
877
878 /* we flushed all remaining buffers, no need to get a new one */
879 if (mhclient->flushcount == 0)
880 goto flushed;
881
882 /* grab buffer */
883 buf = g_array_index (mhsink->bufqueue, GstBuffer *, mhclient->bufpos);
884 mhclient->bufpos--;
885
886 /* update stats */
887 timestamp = GST_BUFFER_TIMESTAMP (buf);
888 if (mhclient->first_buffer_ts == GST_CLOCK_TIME_NONE)
889 mhclient->first_buffer_ts = timestamp;
890 if (timestamp != -1)
891 mhclient->last_buffer_ts = timestamp;
892
893 /* decrease flushcount */
894 if (mhclient->flushcount != -1)
895 mhclient->flushcount--;
896
897 GST_LOG_OBJECT (sink, "%s client %p at position %d",
898 mhclient->debug, client, mhclient->bufpos);
899
900 /* queueing a buffer will ref it */
901 mhsinkclass->client_queue_buffer (mhsink, mhclient, buf);
902
903 /* need to start from the first byte for this new buffer */
904 mhclient->bufoffset = 0;
905 }
906 }
907
908 /* see if we need to send something */
909 if (mhclient->sending) {
910 gssize wrote;
911 GstBuffer *head;
912
913 /* pick first buffer from list */
914 head = GST_BUFFER (mhclient->sending->data);
915
916 wrote = gst_multi_socket_sink_write (sink, mhclient->handle.socket, head,
917 mhclient->bufoffset, sink->cancellable, &err);
918
919 if (wrote < 0) {
920 /* hmm error.. */
921 if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CLOSED)) {
922 goto connection_reset;
923 } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
924 /* write would block, try again later */
925 GST_LOG_OBJECT (sink, "write would block %p",
926 mhclient->handle.socket);
927 more = FALSE;
928 g_clear_error (&err);
929 } else {
930 goto write_error;
931 }
932 } else {
933 if (wrote < (gst_buffer_get_size (head) - mhclient->bufoffset)) {
934 /* partial write, try again now */
935 GST_LOG_OBJECT (sink,
936 "partial write on %p of %" G_GSSIZE_FORMAT " bytes",
937 mhclient->handle.socket, wrote);
938 mhclient->bufoffset += wrote;
939 } else {
940 if (sink->send_dispatched) {
941 gst_pad_push_event (GST_BASE_SINK_PAD (mhsink),
942 gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
943 gst_structure_new ("GstNetworkMessageDispatched",
944 "object", G_TYPE_OBJECT, mhclient->handle.socket,
945 "buffer", GST_TYPE_BUFFER, head, NULL)));
946 }
947 /* complete buffer was written, we can proceed to the next one */
948 mhclient->sending = g_slist_remove (mhclient->sending, head);
949 gst_buffer_unref (head);
950 /* make sure we start from byte 0 for the next buffer */
951 mhclient->bufoffset = 0;
952 }
953 /* update stats */
954 mhclient->bytes_sent += wrote;
955 mhclient->last_activity_time = now;
956 mhsink->bytes_served += wrote;
957 }
958 }
959 } while (more);
960
961 return TRUE;
962
963 /* ERRORS */
964 flushed:
965 {
966 GST_DEBUG_OBJECT (sink, "%s flushed, removing", mhclient->debug);
967 mhclient->status = GST_CLIENT_STATUS_REMOVED;
968 return FALSE;
969 }
970 connection_reset:
971 {
972 GST_DEBUG_OBJECT (sink, "%s connection reset by peer, removing",
973 mhclient->debug);
974 mhclient->status = GST_CLIENT_STATUS_CLOSED;
975 g_clear_error (&err);
976 return FALSE;
977 }
978 write_error:
979 {
980 GST_WARNING_OBJECT (sink,
981 "%s could not write, removing client: %s", mhclient->debug,
982 err->message);
983 g_clear_error (&err);
984 mhclient->status = GST_CLIENT_STATUS_ERROR;
985 return FALSE;
986 }
987 }
988
989 static void
ensure_condition(GstMultiSocketSink * sink,GstSocketClient * client,GIOCondition condition)990 ensure_condition (GstMultiSocketSink * sink, GstSocketClient * client,
991 GIOCondition condition)
992 {
993 GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
994
995 if (client->condition == condition)
996 return;
997
998 if (client->source) {
999 g_source_destroy (client->source);
1000 g_source_unref (client->source);
1001 }
1002 if (condition && sink->main_context) {
1003 client->source = g_socket_create_source (mhclient->handle.socket,
1004 condition, sink->cancellable);
1005 g_source_set_callback (client->source,
1006 (GSourceFunc) gst_multi_socket_sink_socket_condition,
1007 gst_object_ref (sink), (GDestroyNotify) gst_object_unref);
1008 g_source_attach (client->source, sink->main_context);
1009 } else {
1010 client->source = NULL;
1011 condition = 0;
1012 }
1013 client->condition = condition;
1014 }
1015
1016 static void
gst_multi_socket_sink_hash_adding(GstMultiHandleSink * mhsink,GstMultiHandleClient * mhclient)1017 gst_multi_socket_sink_hash_adding (GstMultiHandleSink * mhsink,
1018 GstMultiHandleClient * mhclient)
1019 {
1020 GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (mhsink);
1021 GstSocketClient *client = (GstSocketClient *) (mhclient);
1022
1023 ensure_condition (sink, client,
1024 G_IO_IN | G_IO_OUT | G_IO_PRI | G_IO_ERR | G_IO_HUP);
1025 }
1026
1027 static void
gst_multi_socket_sink_hash_removing(GstMultiHandleSink * mhsink,GstMultiHandleClient * mhclient)1028 gst_multi_socket_sink_hash_removing (GstMultiHandleSink * mhsink,
1029 GstMultiHandleClient * mhclient)
1030 {
1031 GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (mhsink);
1032 GstSocketClient *client = (GstSocketClient *) (mhclient);
1033
1034 ensure_condition (sink, client, 0);
1035 }
1036
1037 static void
gst_multi_socket_sink_stop_sending(GstMultiSocketSink * sink,GstSocketClient * client)1038 gst_multi_socket_sink_stop_sending (GstMultiSocketSink * sink,
1039 GstSocketClient * client)
1040 {
1041 ensure_condition (sink, client, G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP);
1042 }
1043
1044 /* Handle the clients. This is called when a socket becomes ready
1045 * to read or writable. Badly behaving clients are put on a
1046 * garbage list and removed.
1047 */
1048 static gboolean
gst_multi_socket_sink_socket_condition(GstMultiSinkHandle handle,GIOCondition condition,GstMultiSocketSink * sink)1049 gst_multi_socket_sink_socket_condition (GstMultiSinkHandle handle,
1050 GIOCondition condition, GstMultiSocketSink * sink)
1051 {
1052 GList *clink;
1053 GstSocketClient *client;
1054 gboolean ret = TRUE;
1055 GstMultiHandleClient *mhclient;
1056 GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
1057 GstMultiHandleSinkClass *mhsinkclass =
1058 GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
1059
1060 CLIENTS_LOCK (mhsink);
1061 clink = g_hash_table_lookup (mhsink->handle_hash,
1062 mhsinkclass->handle_hash_key (handle));
1063 if (clink == NULL) {
1064 ret = FALSE;
1065 goto done;
1066 }
1067
1068 client = clink->data;
1069 mhclient = (GstMultiHandleClient *) client;
1070
1071 if (mhclient->status != GST_CLIENT_STATUS_FLUSHING
1072 && mhclient->status != GST_CLIENT_STATUS_OK) {
1073 gst_multi_handle_sink_remove_client_link (mhsink, clink);
1074 ret = FALSE;
1075 goto done;
1076 }
1077
1078 if ((condition & G_IO_ERR)) {
1079 GST_WARNING_OBJECT (sink, "%s has error", mhclient->debug);
1080 mhclient->status = GST_CLIENT_STATUS_ERROR;
1081 gst_multi_handle_sink_remove_client_link (mhsink, clink);
1082 ret = FALSE;
1083 goto done;
1084 } else if ((condition & G_IO_HUP)) {
1085 mhclient->status = GST_CLIENT_STATUS_CLOSED;
1086 gst_multi_handle_sink_remove_client_link (mhsink, clink);
1087 ret = FALSE;
1088 goto done;
1089 }
1090 if ((condition & G_IO_IN) || (condition & G_IO_PRI)) {
1091 /* handle client read */
1092 if (!gst_multi_socket_sink_handle_client_read (sink, client)) {
1093 gst_multi_handle_sink_remove_client_link (mhsink, clink);
1094 ret = FALSE;
1095 goto done;
1096 }
1097 }
1098 if ((condition & G_IO_OUT)) {
1099 /* handle client write */
1100 if (!gst_multi_socket_sink_handle_client_write (sink, client)) {
1101 gst_multi_handle_sink_remove_client_link (mhsink, clink);
1102 ret = FALSE;
1103 goto done;
1104 }
1105 }
1106
1107 done:
1108 CLIENTS_UNLOCK (mhsink);
1109
1110 return ret;
1111 }
1112
1113 static gboolean
gst_multi_socket_sink_timeout(GstMultiSocketSink * sink)1114 gst_multi_socket_sink_timeout (GstMultiSocketSink * sink)
1115 {
1116 GstClockTime now;
1117 GTimeVal nowtv;
1118 GList *clients;
1119 GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
1120
1121 g_get_current_time (&nowtv);
1122 now = GST_TIMEVAL_TO_TIME (nowtv);
1123
1124 CLIENTS_LOCK (mhsink);
1125 for (clients = mhsink->clients; clients; clients = clients->next) {
1126 GstSocketClient *client;
1127 GstMultiHandleClient *mhclient;
1128
1129 client = clients->data;
1130 mhclient = (GstMultiHandleClient *) client;
1131 if (mhsink->timeout > 0
1132 && now - mhclient->last_activity_time > mhsink->timeout) {
1133 mhclient->status = GST_CLIENT_STATUS_SLOW;
1134 gst_multi_handle_sink_remove_client_link (mhsink, clients);
1135 }
1136 }
1137 CLIENTS_UNLOCK (mhsink);
1138
1139 return FALSE;
1140 }
1141
1142 /* we handle the client communication in another thread so that we do not block
1143 * the gstreamer thread while we select() on the client fds */
1144 static gpointer
gst_multi_socket_sink_thread(GstMultiHandleSink * mhsink)1145 gst_multi_socket_sink_thread (GstMultiHandleSink * mhsink)
1146 {
1147 GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (mhsink);
1148 GSource *timeout = NULL;
1149
1150 while (mhsink->running) {
1151 if (mhsink->timeout > 0) {
1152 timeout = g_timeout_source_new (mhsink->timeout / GST_MSECOND);
1153
1154 g_source_set_callback (timeout,
1155 (GSourceFunc) gst_multi_socket_sink_timeout, gst_object_ref (sink),
1156 (GDestroyNotify) gst_object_unref);
1157 g_source_attach (timeout, sink->main_context);
1158 }
1159
1160 /* Returns after handling all pending events or when
1161 * _wakeup() was called. In any case we have to add
1162 * a new timeout because something happened.
1163 */
1164 g_main_context_iteration (sink->main_context, TRUE);
1165
1166 if (timeout) {
1167 g_source_destroy (timeout);
1168 g_source_unref (timeout);
1169 }
1170 }
1171
1172 return NULL;
1173 }
1174
1175 static void
gst_multi_socket_sink_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)1176 gst_multi_socket_sink_set_property (GObject * object, guint prop_id,
1177 const GValue * value, GParamSpec * pspec)
1178 {
1179 GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (object);
1180
1181 switch (prop_id) {
1182 case PROP_SEND_DISPATCHED:
1183 sink->send_dispatched = g_value_get_boolean (value);
1184 break;
1185 case PROP_SEND_MESSAGES:
1186 sink->send_messages = g_value_get_boolean (value);
1187 break;
1188 default:
1189 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1190 break;
1191 }
1192 }
1193
1194 static void
gst_multi_socket_sink_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)1195 gst_multi_socket_sink_get_property (GObject * object, guint prop_id,
1196 GValue * value, GParamSpec * pspec)
1197 {
1198 GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (object);
1199
1200 switch (prop_id) {
1201 case PROP_SEND_DISPATCHED:
1202 g_value_set_boolean (value, sink->send_dispatched);
1203 break;
1204 case PROP_SEND_MESSAGES:
1205 g_value_set_boolean (value, sink->send_messages);
1206 break;
1207 default:
1208 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1209 break;
1210 }
1211 }
1212
1213 static gboolean
gst_multi_socket_sink_start_pre(GstMultiHandleSink * mhsink)1214 gst_multi_socket_sink_start_pre (GstMultiHandleSink * mhsink)
1215 {
1216 GstMultiSocketSink *mssink = GST_MULTI_SOCKET_SINK (mhsink);
1217 GstMultiHandleSinkClass *mhsinkclass =
1218 GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
1219 GList *clients;
1220
1221 GST_INFO_OBJECT (mssink, "starting");
1222
1223 mssink->main_context = g_main_context_new ();
1224
1225 CLIENTS_LOCK (mhsink);
1226 for (clients = mhsink->clients; clients; clients = clients->next) {
1227 GstSocketClient *client = clients->data;
1228 GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
1229
1230 if (client->source)
1231 continue;
1232 mhsinkclass->hash_adding (mhsink, mhclient);
1233 }
1234 CLIENTS_UNLOCK (mhsink);
1235
1236 return TRUE;
1237 }
1238
1239 static gboolean
multisocketsink_hash_remove(gpointer key,gpointer value,gpointer data)1240 multisocketsink_hash_remove (gpointer key, gpointer value, gpointer data)
1241 {
1242 return TRUE;
1243 }
1244
1245 static void
gst_multi_socket_sink_stop_pre(GstMultiHandleSink * mhsink)1246 gst_multi_socket_sink_stop_pre (GstMultiHandleSink * mhsink)
1247 {
1248 GstMultiSocketSink *mssink = GST_MULTI_SOCKET_SINK (mhsink);
1249
1250 if (mssink->main_context)
1251 g_main_context_wakeup (mssink->main_context);
1252 }
1253
1254 static void
gst_multi_socket_sink_stop_post(GstMultiHandleSink * mhsink)1255 gst_multi_socket_sink_stop_post (GstMultiHandleSink * mhsink)
1256 {
1257 GstMultiSocketSink *mssink = GST_MULTI_SOCKET_SINK (mhsink);
1258
1259 if (mssink->main_context) {
1260 g_main_context_unref (mssink->main_context);
1261 mssink->main_context = NULL;
1262 }
1263
1264 g_hash_table_foreach_remove (mhsink->handle_hash, multisocketsink_hash_remove,
1265 mssink);
1266 }
1267
1268 static gboolean
gst_multi_socket_sink_unlock(GstBaseSink * bsink)1269 gst_multi_socket_sink_unlock (GstBaseSink * bsink)
1270 {
1271 GstMultiSocketSink *sink;
1272
1273 sink = GST_MULTI_SOCKET_SINK (bsink);
1274
1275 GST_DEBUG_OBJECT (sink, "set to flushing");
1276 g_cancellable_cancel (sink->cancellable);
1277 if (sink->main_context)
1278 g_main_context_wakeup (sink->main_context);
1279
1280 return TRUE;
1281 }
1282
1283 /* will be called only between calls to start() and stop() */
1284 static gboolean
gst_multi_socket_sink_unlock_stop(GstBaseSink * bsink)1285 gst_multi_socket_sink_unlock_stop (GstBaseSink * bsink)
1286 {
1287 GstMultiSocketSink *sink;
1288
1289 sink = GST_MULTI_SOCKET_SINK (bsink);
1290
1291 GST_DEBUG_OBJECT (sink, "unset flushing");
1292 g_object_unref (sink->cancellable);
1293 sink->cancellable = g_cancellable_new ();
1294
1295 return TRUE;
1296 }
1297
1298 static gboolean
gst_multi_socket_sink_propose_allocation(GstBaseSink * bsink,GstQuery * query)1299 gst_multi_socket_sink_propose_allocation (GstBaseSink * bsink, GstQuery * query)
1300 {
1301 /* we support some meta */
1302 gst_query_add_allocation_meta (query, GST_NET_CONTROL_MESSAGE_META_API_TYPE,
1303 NULL);
1304
1305 return TRUE;
1306 }
1307