1 /* GStreamer
2 * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
3 * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
4 * Copyright (C) 2006 Wim Taymans <wim at fluendo dot com>
5 * Copyright (C) <2011> Collabora Ltd.
6 * Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
7 *
8 * This library is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Library General Public
10 * License as published by the Free Software Foundation; either
11 * version 2 of the License, or (at your option) any later version.
12 *
13 * This library is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 * Library General Public License for more details.
17 *
18 * You should have received a copy of the GNU Library General Public
19 * License along with this library; if not, write to the
20 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
21 * Boston, MA 02110-1301, USA.
22 */
23
24 /**
25 * SECTION:element-multisocketsink
26 * @title: multisocketsink
27 * @see_also: tcpserversink
28 *
29 * This plugin writes incoming data to a set of sockets. The
30 * sockets can be added to multisocketsink by emitting the #GstMultiSocketSink::add signal.
31 * For each descriptor added, the #GstMultiSocketSink::client-added signal will be called.
32 *
33 * A client can also be added with the #GstMultiSocketSink::add-full signal
34 * that allows for more control over what and how much data a client
35 * initially receives.
36 *
37 * Clients can be removed from multisocketsink by emitting the #GstMultiSocketSink::remove signal. For
38 * each descriptor removed, the #GstMultiSocketSink::client-removed signal will be called. The
39 * #GstMultiSocketSink::client-removed signal can also be fired when multisocketsink decides that a
40 * client is not active anymore or, depending on the value of the
41 * #GstMultiHandleSink:recover-policy property, if the client is reading too slowly.
42 * In all cases, multisocketsink will never close a socket itself.
43 * The user of multisocketsink is responsible for closing all sockets.
44 * This can for example be done in response to the #GstMultiSocketSink::client-socket-removed signal.
45 * Note that multisocketsink still has a reference to the socket when the
46 * #GstMultiSocketSink::client-removed signal is emitted, so that "get-stats" can be performed on
47 * the descriptor; it is therefore not safe to close the socket in
48 * the #GstMultiSocketSink::client-removed signal handler, and you should use the
49 * #GstMultiSocketSink::client-socket-removed signal to safely close the socket.
50 *
51 * Multisocketsink internally keeps a queue of the incoming buffers and uses a
52 * separate thread to send the buffers to the clients. This ensures that no
53 * client write can block the pipeline and that clients can read with different
54 * speeds.
55 *
56 * When adding a client to multisocketsink, the #GstMultiHandleSink:sync-method property will define
57 * which buffer in the queued buffers will be sent first to the client. Clients
58 * can be sent the most recent buffer (which might not be decodable by the
59 * client if it is not a keyframe), the next keyframe received in
60 * multisocketsink (which can take some time depending on the keyframe rate), or the
61 * last received keyframe (which will cause a simple burst-on-connect).
 * Multisocketsink will always keep at least one keyframe in its internal buffers
 * when the sync-method is set to latest-keyframe.
64 *
65 * There are additional values for the #GstMultiHandleSink:sync-method
66 * property to allow finer control over burst-on-connect behaviour. By selecting
67 * the 'burst' method a minimum burst size can be chosen, 'burst-keyframe'
68 * additionally requires that the burst begin with a keyframe, and
69 * 'burst-with-keyframe' attempts to burst beginning with a keyframe, but will
70 * prefer a minimum burst size even if it requires not starting with a keyframe.
71 *
72 * Multisocketsink can be instructed to keep at least a minimum amount of data
73 * expressed in time or byte units in its internal queues with the
74 * #GstMultiHandleSink:time-min and #GstMultiHandleSink:bytes-min properties respectively.
75 * These properties are useful if the application adds clients with the
76 * #GstMultiSocketSink::add-full signal to make sure that a burst connect can
77 * actually be honored.
78 *
79 * When streaming data, clients are allowed to read at a different rate than
 * the rate at which multisocketsink receives data. If the client is reading too
 * fast, no data will be sent to the client until multisocketsink receives more
 * data. If the client, however, reads too slowly, data for that client will be
83 * queued up in multisocketsink. Two properties control the amount of data
84 * (buffers) that is queued in multisocketsink: #GstMultiHandleSink:buffers-max and
85 * #GstMultiHandleSink:buffers-soft-max. A client that falls behind by
86 * #GstMultiHandleSink:buffers-max is removed from multisocketsink forcibly.
87 *
88 * A client with a lag of at least #GstMultiHandleSink:buffers-soft-max enters the recovery
89 * procedure which is controlled with the #GstMultiHandleSink:recover-policy property.
90 * A recover policy of NONE will do nothing, RESYNC_LATEST will send the most recently
91 * received buffer as the next buffer for the client, RESYNC_SOFT_LIMIT
92 * positions the client to the soft limit in the buffer queue and
93 * RESYNC_KEYFRAME positions the client at the most recent keyframe in the
94 * buffer queue.
95 *
96 * multisocketsink will by default synchronize on the clock before serving the
97 * buffers to the clients. This behaviour can be disabled by setting the sync
98 * property to FALSE. Multisocketsink will by default not do QoS and will never
99 * drop late buffers.
100 */
101
102 #ifdef HAVE_CONFIG_H
103 #include "config.h"
104 #endif
105
106 #include <gst/gst-i18n-plugin.h>
107 #include <gst/net/gstnetcontrolmessagemeta.h>
108
109 #include <string.h>
110
111 #include "gstmultisocketsink.h"
112 #include "gsttcpelements.h"
113
114 #ifndef G_OS_WIN32
115 #include <netinet/in.h>
116 #endif
117
/* Constant 0; not referenced in the visible part of this file. */
#define NOT_IMPLEMENTED 0

/* Debug category for this element; GST_CAT_DEFAULT makes the GST_DEBUG_OBJECT /
 * GST_WARNING_OBJECT calls in this file log to it. */
GST_DEBUG_CATEGORY_STATIC (multisocketsink_debug);
#define GST_CAT_DEFAULT (multisocketsink_debug)
122
/* Indices into gst_multi_socket_sink_signals[], one per signal registered
 * in class_init. */
enum
{
  /* action signals: emitted by the application to call into the element */
  SIGNAL_ADD = 0,
  SIGNAL_ADD_BURST,
  SIGNAL_REMOVE,
  SIGNAL_REMOVE_FLUSH,
  SIGNAL_GET_STATS,

  /* notification signals: emitted by the element */
  SIGNAL_CLIENT_ADDED,
  SIGNAL_CLIENT_REMOVED,
  SIGNAL_CLIENT_SOCKET_REMOVED,

  /* number of signals; keep last */
  LAST_SIGNAL
};
140
/* Initial values of the "send-dispatched" and "send-messages" properties. */
#define DEFAULT_SEND_DISPATCHED FALSE
#define DEFAULT_SEND_MESSAGES FALSE

/* Property ids passed to g_object_class_install_property(). */
enum
{
  PROP_0 = 0,
  PROP_SEND_DISPATCHED,
  PROP_SEND_MESSAGES,
  PROP_LAST
};
151
152 static void gst_multi_socket_sink_finalize (GObject * object);
153
154 static void gst_multi_socket_sink_add (GstMultiSocketSink * sink,
155 GSocket * socket);
156 static void gst_multi_socket_sink_add_full (GstMultiSocketSink * sink,
157 GSocket * socket, GstSyncMethod sync, GstFormat min_format,
158 guint64 min_value, GstFormat max_format, guint64 max_value);
159 static void gst_multi_socket_sink_remove (GstMultiSocketSink * sink,
160 GSocket * socket);
161 static void gst_multi_socket_sink_remove_flush (GstMultiSocketSink * sink,
162 GSocket * socket);
163 static GstStructure *gst_multi_socket_sink_get_stats (GstMultiSocketSink * sink,
164 GSocket * socket);
165
166 static void gst_multi_socket_sink_emit_client_added (GstMultiHandleSink * mhs,
167 GstMultiSinkHandle handle);
168 static void gst_multi_socket_sink_emit_client_removed (GstMultiHandleSink * mhs,
169 GstMultiSinkHandle handle, GstClientStatus status);
170
171 static void gst_multi_socket_sink_stop_pre (GstMultiHandleSink * mhsink);
172 static void gst_multi_socket_sink_stop_post (GstMultiHandleSink * mhsink);
173 static gboolean gst_multi_socket_sink_start_pre (GstMultiHandleSink * mhsink);
174 static gpointer gst_multi_socket_sink_thread (GstMultiHandleSink * mhsink);
175 static GstMultiHandleClient
176 * gst_multi_socket_sink_new_client (GstMultiHandleSink * mhsink,
177 GstMultiSinkHandle handle, GstSyncMethod sync_method);
178 static int gst_multi_socket_sink_client_get_fd (GstMultiHandleClient * client);
179 static void gst_multi_socket_sink_client_free (GstMultiHandleSink * mhsink,
180 GstMultiHandleClient * client);
181 static void gst_multi_socket_sink_handle_debug (GstMultiSinkHandle handle,
182 gchar debug[30]);
183
184 static gpointer gst_multi_socket_sink_handle_hash_key (GstMultiSinkHandle
185 handle);
186 static void gst_multi_socket_sink_hash_adding (GstMultiHandleSink * mhsink,
187 GstMultiHandleClient * mhclient);
188 static void gst_multi_socket_sink_hash_removing (GstMultiHandleSink * mhsink,
189 GstMultiHandleClient * mhclient);
190 static void gst_multi_socket_sink_stop_sending (GstMultiSocketSink * sink,
191 GstSocketClient * client);
192
193 static gboolean gst_multi_socket_sink_socket_condition (GstMultiSinkHandle
194 handle, GIOCondition condition, GstMultiSocketSink * sink);
195
196 static gboolean gst_multi_socket_sink_unlock (GstBaseSink * bsink);
197 static gboolean gst_multi_socket_sink_unlock_stop (GstBaseSink * bsink);
198
199 static gboolean gst_multi_socket_sink_propose_allocation (GstBaseSink * bsink,
200 GstQuery * query);
201
202 static void gst_multi_socket_sink_set_property (GObject * object, guint prop_id,
203 const GValue * value, GParamSpec * pspec);
204 static void gst_multi_socket_sink_get_property (GObject * object, guint prop_id,
205 GValue * value, GParamSpec * pspec);
206
207 #define gst_multi_socket_sink_parent_class parent_class
208 G_DEFINE_TYPE (GstMultiSocketSink, gst_multi_socket_sink,
209 GST_TYPE_MULTI_HANDLE_SINK);
210 GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (multisocketsink, "multisocketsink",
211 GST_RANK_NONE, GST_TYPE_MULTI_SOCKET_SINK, tcp_element_init (plugin));
212
213 static guint gst_multi_socket_sink_signals[LAST_SIGNAL] = { 0 };
214
215 static void
gst_multi_socket_sink_class_init(GstMultiSocketSinkClass * klass)216 gst_multi_socket_sink_class_init (GstMultiSocketSinkClass * klass)
217 {
218 GObjectClass *gobject_class;
219 GstElementClass *gstelement_class;
220 GstBaseSinkClass *gstbasesink_class;
221 GstMultiHandleSinkClass *gstmultihandlesink_class;
222
223 gobject_class = (GObjectClass *) klass;
224 gstelement_class = (GstElementClass *) klass;
225 gstbasesink_class = (GstBaseSinkClass *) klass;
226 gstmultihandlesink_class = (GstMultiHandleSinkClass *) klass;
227
228 gobject_class->set_property = gst_multi_socket_sink_set_property;
229 gobject_class->get_property = gst_multi_socket_sink_get_property;
230 gobject_class->finalize = gst_multi_socket_sink_finalize;
231
232 /**
233 * GstMultiSocketSink:send-dispatched:
234 *
235 * Sends a GstNetworkMessageDispatched event upstream whenever a buffer
236 * is sent to a client.
237 * The event is a CUSTOM event name GstNetworkMessageDispatched and
238 * contains:
239 *
240 * "object" G_TYPE_OBJECT : the object identifying the client
241 * "buffer" GST_TYPE_BUFFER : the buffer sent to the client
242 *
243 * Since: 1.8.0
244 */
245 g_object_class_install_property (gobject_class, PROP_SEND_DISPATCHED,
246 g_param_spec_boolean ("send-dispatched", "Send Dispatched",
247 "If GstNetworkMessageDispatched events should be pushed",
248 DEFAULT_SEND_DISPATCHED, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
249 /**
250 * GstMultiSocketSink:send-messages:
251 *
252 * Sends a GstNetworkMessage event upstream whenever a buffer
253 * is received from a client.
254 * The event is a CUSTOM event name GstNetworkMessage and contains:
255 *
256 * "object" G_TYPE_OBJECT : the object identifying the client
257 * "buffer" GST_TYPE_BUFFER : the buffer with data received from the
258 * client
259 *
260 * Since: 1.8.0
261 */
262 g_object_class_install_property (gobject_class, PROP_SEND_MESSAGES,
263 g_param_spec_boolean ("send-messages", "Send Messages",
264 "If GstNetworkMessage events should be pushed", DEFAULT_SEND_MESSAGES,
265 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
266
267 /**
268 * GstMultiSocketSink::add:
269 * @gstmultisocketsink: the multisocketsink element to emit this signal on
270 * @socket: the socket to add to multisocketsink
271 *
272 * Hand the given open socket to multisocketsink to write to.
273 */
274 gst_multi_socket_sink_signals[SIGNAL_ADD] =
275 g_signal_new ("add", G_TYPE_FROM_CLASS (klass),
276 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
277 G_STRUCT_OFFSET (GstMultiSocketSinkClass, add), NULL, NULL,
278 NULL, G_TYPE_NONE, 1, G_TYPE_SOCKET);
279 /**
280 * GstMultiSocketSink::add-full:
281 * @gstmultisocketsink: the multisocketsink element to emit this signal on
282 * @socket: the socket to add to multisocketsink
283 * @sync: the sync method to use
284 * @format_min: the format of @value_min
285 * @value_min: the minimum amount of data to burst expressed in
286 * @format_min units.
287 * @format_max: the format of @value_max
288 * @value_max: the maximum amount of data to burst expressed in
289 * @format_max units.
290 *
291 * Hand the given open socket to multisocketsink to write to and
292 * specify the burst parameters for the new connection.
293 */
294 gst_multi_socket_sink_signals[SIGNAL_ADD_BURST] =
295 g_signal_new ("add-full", G_TYPE_FROM_CLASS (klass),
296 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
297 G_STRUCT_OFFSET (GstMultiSocketSinkClass, add_full), NULL, NULL,
298 NULL, G_TYPE_NONE, 6, G_TYPE_SOCKET, GST_TYPE_SYNC_METHOD,
299 GST_TYPE_FORMAT, G_TYPE_UINT64, GST_TYPE_FORMAT, G_TYPE_UINT64);
300 /**
301 * GstMultiSocketSink::remove:
302 * @gstmultisocketsink: the multisocketsink element to emit this signal on
303 * @socket: the socket to remove from multisocketsink
304 *
305 * Remove the given open socket from multisocketsink.
306 */
307 gst_multi_socket_sink_signals[SIGNAL_REMOVE] =
308 g_signal_new ("remove", G_TYPE_FROM_CLASS (klass),
309 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
310 G_STRUCT_OFFSET (GstMultiSocketSinkClass, remove), NULL, NULL, NULL,
311 G_TYPE_NONE, 1, G_TYPE_SOCKET);
312 /**
313 * GstMultiSocketSink::remove-flush:
314 * @gstmultisocketsink: the multisocketsink element to emit this signal on
315 * @socket: the socket to remove from multisocketsink
316 *
317 * Remove the given open socket from multisocketsink after flushing all
318 * the pending data to the socket.
319 */
320 gst_multi_socket_sink_signals[SIGNAL_REMOVE_FLUSH] =
321 g_signal_new ("remove-flush", G_TYPE_FROM_CLASS (klass),
322 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
323 G_STRUCT_OFFSET (GstMultiSocketSinkClass, remove_flush), NULL, NULL, NULL,
324 G_TYPE_NONE, 1, G_TYPE_SOCKET);
325
326 /**
327 * GstMultiSocketSink::get-stats:
328 * @gstmultisocketsink: the multisocketsink element to emit this signal on
329 * @socket: the socket to get stats of from multisocketsink
330 *
331 * Get statistics about @socket. This function returns a GstStructure.
332 *
333 * Returns: a GstStructure with the statistics. The structure contains
334 * values that represent: total number of bytes sent, time
335 * when the client was added, time when the client was
336 * disconnected/removed, time the client is/was active, last activity
337 * time (in epoch seconds), number of buffers dropped.
338 * All times are expressed in nanoseconds (GstClockTime).
339 */
340 gst_multi_socket_sink_signals[SIGNAL_GET_STATS] =
341 g_signal_new ("get-stats", G_TYPE_FROM_CLASS (klass),
342 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
343 G_STRUCT_OFFSET (GstMultiSocketSinkClass, get_stats), NULL, NULL, NULL,
344 GST_TYPE_STRUCTURE, 1, G_TYPE_SOCKET);
345
346 /**
347 * GstMultiSocketSink::client-added:
348 * @gstmultisocketsink: the multisocketsink element that emitted this signal
349 * @socket: the socket that was added to multisocketsink
350 *
351 * The given socket was added to multisocketsink. This signal will
352 * be emitted from the streaming thread so application should be prepared
353 * for that.
354 */
355 gst_multi_socket_sink_signals[SIGNAL_CLIENT_ADDED] =
356 g_signal_new ("client-added", G_TYPE_FROM_CLASS (klass),
357 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_OBJECT);
358 /**
359 * GstMultiSocketSink::client-removed:
360 * @gstmultisocketsink: the multisocketsink element that emitted this signal
361 * @socket: the socket that is to be removed from multisocketsink
362 * @status: the reason why the client was removed
363 *
364 * The given socket is about to be removed from multisocketsink. This
365 * signal will be emitted from the streaming thread so applications should
366 * be prepared for that.
367 *
368 * @gstmultisocketsink still holds a handle to @socket so it is possible to call
369 * the get-stats signal from this callback. For the same reason it is
370 * not safe to `close()` and reuse @socket in this callback.
371 */
372 gst_multi_socket_sink_signals[SIGNAL_CLIENT_REMOVED] =
373 g_signal_new ("client-removed", G_TYPE_FROM_CLASS (klass),
374 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_SOCKET,
375 GST_TYPE_CLIENT_STATUS);
376 /**
377 * GstMultiSocketSink::client-socket-removed:
378 * @gstmultisocketsink: the multisocketsink element that emitted this signal
379 * @socket: the socket that was removed from multisocketsink
380 *
381 * The given socket was removed from multisocketsink. This signal will
382 * be emitted from the streaming thread so applications should be prepared
383 * for that.
384 *
385 * In this callback, @gstmultisocketsink has removed all the information
386 * associated with @socket and it is therefore not possible to call get-stats
387 * with @socket. It is however safe to `close()` and reuse @fd in the callback.
388 */
389 gst_multi_socket_sink_signals[SIGNAL_CLIENT_SOCKET_REMOVED] =
390 g_signal_new ("client-socket-removed", G_TYPE_FROM_CLASS (klass),
391 G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_SOCKET);
392
393 gst_element_class_set_static_metadata (gstelement_class,
394 "Multi socket sink", "Sink/Network",
395 "Send data to multiple sockets",
396 "Thomas Vander Stichele <thomas at apestaart dot org>, "
397 "Wim Taymans <wim@fluendo.com>, "
398 "Sebastian Dröge <sebastian.droege@collabora.co.uk>");
399
400 gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_unlock);
401 gstbasesink_class->unlock_stop =
402 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_unlock_stop);
403 gstbasesink_class->propose_allocation =
404 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_propose_allocation);
405
406 klass->add = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_add);
407 klass->add_full = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_add_full);
408 klass->remove = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_remove);
409 klass->remove_flush = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_remove_flush);
410 klass->get_stats = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_get_stats);
411
412 gstmultihandlesink_class->emit_client_added =
413 gst_multi_socket_sink_emit_client_added;
414 gstmultihandlesink_class->emit_client_removed =
415 gst_multi_socket_sink_emit_client_removed;
416
417 gstmultihandlesink_class->stop_pre =
418 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_stop_pre);
419 gstmultihandlesink_class->stop_post =
420 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_stop_post);
421 gstmultihandlesink_class->start_pre =
422 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_start_pre);
423 gstmultihandlesink_class->thread =
424 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_thread);
425 gstmultihandlesink_class->new_client =
426 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_new_client);
427 gstmultihandlesink_class->client_get_fd =
428 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_client_get_fd);
429 gstmultihandlesink_class->client_free =
430 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_client_free);
431 gstmultihandlesink_class->handle_debug =
432 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_handle_debug);
433 gstmultihandlesink_class->handle_hash_key =
434 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_handle_hash_key);
435 gstmultihandlesink_class->hash_adding =
436 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_hash_adding);
437 gstmultihandlesink_class->hash_removing =
438 GST_DEBUG_FUNCPTR (gst_multi_socket_sink_hash_removing);
439
440 GST_DEBUG_CATEGORY_INIT (multisocketsink_debug, "multisocketsink", 0,
441 "Multi socket sink");
442 }
443
444 static void
gst_multi_socket_sink_init(GstMultiSocketSink * this)445 gst_multi_socket_sink_init (GstMultiSocketSink * this)
446 {
447 GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (this);
448
449 mhsink->handle_hash = g_hash_table_new (g_direct_hash, g_int_equal);
450
451 this->cancellable = g_cancellable_new ();
452 this->send_dispatched = DEFAULT_SEND_DISPATCHED;
453 this->send_messages = DEFAULT_SEND_MESSAGES;
454 }
455
456 static void
gst_multi_socket_sink_finalize(GObject * object)457 gst_multi_socket_sink_finalize (GObject * object)
458 {
459 GstMultiSocketSink *this = GST_MULTI_SOCKET_SINK (object);
460
461 if (this->cancellable) {
462 g_object_unref (this->cancellable);
463 this->cancellable = NULL;
464 }
465
466 G_OBJECT_CLASS (parent_class)->finalize (object);
467 }
468
469 /* methods to emit signals */
470
471 static void
gst_multi_socket_sink_emit_client_added(GstMultiHandleSink * mhsink,GstMultiSinkHandle handle)472 gst_multi_socket_sink_emit_client_added (GstMultiHandleSink * mhsink,
473 GstMultiSinkHandle handle)
474 {
475 g_signal_emit (mhsink, gst_multi_socket_sink_signals[SIGNAL_CLIENT_ADDED], 0,
476 handle.socket);
477 }
478
479 static void
gst_multi_socket_sink_emit_client_removed(GstMultiHandleSink * mhsink,GstMultiSinkHandle handle,GstClientStatus status)480 gst_multi_socket_sink_emit_client_removed (GstMultiHandleSink * mhsink,
481 GstMultiSinkHandle handle, GstClientStatus status)
482 {
483 g_signal_emit (mhsink, gst_multi_socket_sink_signals[SIGNAL_CLIENT_REMOVED],
484 0, handle.socket, status);
485 }
486
487 /* action signals */
488
489 static void
gst_multi_socket_sink_add(GstMultiSocketSink * sink,GSocket * socket)490 gst_multi_socket_sink_add (GstMultiSocketSink * sink, GSocket * socket)
491 {
492 GstMultiSinkHandle handle;
493
494 handle.socket = socket;
495 gst_multi_handle_sink_add (GST_MULTI_HANDLE_SINK_CAST (sink), handle);
496 }
497
498 static void
gst_multi_socket_sink_add_full(GstMultiSocketSink * sink,GSocket * socket,GstSyncMethod sync,GstFormat min_format,guint64 min_value,GstFormat max_format,guint64 max_value)499 gst_multi_socket_sink_add_full (GstMultiSocketSink * sink, GSocket * socket,
500 GstSyncMethod sync, GstFormat min_format, guint64 min_value,
501 GstFormat max_format, guint64 max_value)
502 {
503 GstMultiSinkHandle handle;
504
505 handle.socket = socket;
506 gst_multi_handle_sink_add_full (GST_MULTI_HANDLE_SINK_CAST (sink), handle,
507 sync, min_format, min_value, max_format, max_value);
508 }
509
510 static void
gst_multi_socket_sink_remove(GstMultiSocketSink * sink,GSocket * socket)511 gst_multi_socket_sink_remove (GstMultiSocketSink * sink, GSocket * socket)
512 {
513 GstMultiSinkHandle handle;
514
515 handle.socket = socket;
516 gst_multi_handle_sink_remove (GST_MULTI_HANDLE_SINK_CAST (sink), handle);
517 }
518
519 static void
gst_multi_socket_sink_remove_flush(GstMultiSocketSink * sink,GSocket * socket)520 gst_multi_socket_sink_remove_flush (GstMultiSocketSink * sink, GSocket * socket)
521 {
522 GstMultiSinkHandle handle;
523
524 handle.socket = socket;
525 gst_multi_handle_sink_remove_flush (GST_MULTI_HANDLE_SINK_CAST (sink),
526 handle);
527 }
528
529 static GstStructure *
gst_multi_socket_sink_get_stats(GstMultiSocketSink * sink,GSocket * socket)530 gst_multi_socket_sink_get_stats (GstMultiSocketSink * sink, GSocket * socket)
531 {
532 GstMultiSinkHandle handle;
533
534 handle.socket = socket;
535 return gst_multi_handle_sink_get_stats (GST_MULTI_HANDLE_SINK_CAST (sink),
536 handle);
537 }
538
539 static GstMultiHandleClient *
gst_multi_socket_sink_new_client(GstMultiHandleSink * mhsink,GstMultiSinkHandle handle,GstSyncMethod sync_method)540 gst_multi_socket_sink_new_client (GstMultiHandleSink * mhsink,
541 GstMultiSinkHandle handle, GstSyncMethod sync_method)
542 {
543 GstSocketClient *client;
544 GstMultiHandleClient *mhclient;
545 GstMultiHandleSinkClass *mhsinkclass =
546 GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
547
548 /* create client datastructure */
549 g_assert (G_IS_SOCKET (handle.socket));
550 client = g_new0 (GstSocketClient, 1);
551 mhclient = (GstMultiHandleClient *) client;
552
553 mhclient->handle.socket = G_SOCKET (g_object_ref (handle.socket));
554
555 gst_multi_handle_sink_client_init (mhclient, sync_method);
556 mhsinkclass->handle_debug (handle, mhclient->debug);
557
558 /* set the socket to non blocking */
559 g_socket_set_blocking (handle.socket, FALSE);
560
561 /* we always read from a client */
562 mhsinkclass->hash_adding (mhsink, mhclient);
563
564 gst_multi_handle_sink_setup_dscp_client (mhsink, mhclient);
565
566 return mhclient;
567 }
568
569 static int
gst_multi_socket_sink_client_get_fd(GstMultiHandleClient * client)570 gst_multi_socket_sink_client_get_fd (GstMultiHandleClient * client)
571 {
572 return g_socket_get_fd (client->handle.socket);
573 }
574
575 static void
gst_multi_socket_sink_client_free(GstMultiHandleSink * mhsink,GstMultiHandleClient * client)576 gst_multi_socket_sink_client_free (GstMultiHandleSink * mhsink,
577 GstMultiHandleClient * client)
578 {
579 g_assert (G_IS_SOCKET (client->handle.socket));
580
581 g_signal_emit (mhsink,
582 gst_multi_socket_sink_signals[SIGNAL_CLIENT_SOCKET_REMOVED], 0,
583 client->handle.socket);
584
585 g_object_unref (client->handle.socket);
586 }
587
588 static void
gst_multi_socket_sink_handle_debug(GstMultiSinkHandle handle,gchar debug[30])589 gst_multi_socket_sink_handle_debug (GstMultiSinkHandle handle, gchar debug[30])
590 {
591 g_snprintf (debug, 30, "[socket %p]", handle.socket);
592 }
593
594 static gpointer
gst_multi_socket_sink_handle_hash_key(GstMultiSinkHandle handle)595 gst_multi_socket_sink_handle_hash_key (GstMultiSinkHandle handle)
596 {
597 return handle.socket;
598 }
599
600 /* handle a read on a client socket,
601 * which either indicates a close or should be ignored
602 * returns FALSE if some error occurred or the client closed. */
603 static gboolean
gst_multi_socket_sink_handle_client_read(GstMultiSocketSink * sink,GstSocketClient * client)604 gst_multi_socket_sink_handle_client_read (GstMultiSocketSink * sink,
605 GstSocketClient * client)
606 {
607 gboolean ret, do_event;
608 gchar dummy[256], *mem, *omem;
609 gssize nread;
610 GError *err = NULL;
611 gboolean first = TRUE;
612 GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
613 gssize navail, maxmem;
614
615 GST_DEBUG_OBJECT (sink, "%s select reports client read", mhclient->debug);
616
617 ret = TRUE;
618
619 navail = g_socket_get_available_bytes (mhclient->handle.socket);
620 if (navail < 0)
621 return TRUE;
622
623 /* only collect the data in a buffer when we need to send it with an event */
624 do_event = sink->send_messages && navail > 0;
625 if (do_event) {
626 omem = mem = g_malloc (navail);
627 maxmem = navail;
628 } else {
629 mem = dummy;
630 maxmem = sizeof (dummy);
631 }
632
633 /* just Read 'n' Drop, could also just drop the client as it's not supposed
634 * to write to us except for closing the socket, I guess it's because we
635 * like to listen to our customers. */
636 do {
637 GST_DEBUG_OBJECT (sink, "%s client wants us to read", mhclient->debug);
638
639 nread =
640 g_socket_receive (mhclient->handle.socket, mem, MIN (navail,
641 maxmem), sink->cancellable, &err);
642
643 if (first && nread == 0) {
644 /* client sent close, so remove it */
645 GST_DEBUG_OBJECT (sink, "%s client asked for close, removing",
646 mhclient->debug);
647 mhclient->status = GST_CLIENT_STATUS_CLOSED;
648 ret = FALSE;
649 break;
650 } else if (nread < 0) {
651 if (err->code == G_IO_ERROR_WOULD_BLOCK)
652 break;
653
654 GST_WARNING_OBJECT (sink, "%s could not read: %s",
655 mhclient->debug, err->message);
656 mhclient->status = GST_CLIENT_STATUS_ERROR;
657 ret = FALSE;
658 break;
659 }
660 navail -= nread;
661 if (do_event)
662 mem += nread;
663 first = FALSE;
664 } while (navail > 0);
665 g_clear_error (&err);
666
667 if (do_event) {
668 if (ret) {
669 GstBuffer *buf;
670 GstEvent *ev;
671
672 buf = gst_buffer_new_wrapped (omem, maxmem);
673 ev = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
674 gst_structure_new ("GstNetworkMessage",
675 "object", G_TYPE_OBJECT, mhclient->handle.socket,
676 "buffer", GST_TYPE_BUFFER, buf, NULL));
677 gst_buffer_unref (buf);
678
679 gst_pad_push_event (GST_BASE_SINK_PAD (sink), ev);
680 } else
681 g_free (omem);
682 }
683 return ret;
684 }
685
686 /**
687 * map_memory_output_vector_n:
688 * @buf: The #GstBuffer that should be mapped
689 * @offset: Offset into the buffer that should be mapped
690 * @vectors: (out,array length=num_vectors): an array of #GOutputVector structs to write into
691 * @mapinfo: (out,array length=num_vectors): an array of #GstMapInfo structs to write into
692 * @num_vectors: the number of elements in @vectors to prevent buffer overruns
693 *
694 * Maps a buffer into memory, populating a #GOutputVector to use scatter-gather
695 * I/O to send the data over a socket. The whole buffer won't be mapped into
696 * memory if it consists of more than @num_vectors #GstMemory s.
697 *
698 * Use #unmap_n_memorys after you are
699 * finished with the mappings.
700 *
701 * Returns: The number of GstMemorys mapped
702 */
703 static int
map_n_memory_output_vector(GstBuffer * buf,size_t offset,GOutputVector * vectors,GstMapInfo * mapinfo,int num_vectors)704 map_n_memory_output_vector (GstBuffer * buf, size_t offset,
705 GOutputVector * vectors, GstMapInfo * mapinfo, int num_vectors)
706 {
707 guint mem_idx, mem_len;
708 gsize mem_skip;
709 size_t maxsize;
710 int i;
711
712 g_return_val_if_fail (num_vectors > 0, 0);
713 memset (vectors, 0, sizeof (GOutputVector) * num_vectors);
714
715 maxsize = gst_buffer_get_size (buf) - offset;
716 if (!gst_buffer_find_memory (buf, offset, maxsize, &mem_idx, &mem_len,
717 &mem_skip))
718 g_error ("Unable to map memory at offset %" G_GSIZE_FORMAT ", buffer "
719 "length is %" G_GSIZE_FORMAT, offset, gst_buffer_get_size (buf));
720
721 for (i = 0; i < mem_len && i < num_vectors; i++) {
722 GstMapInfo map = { 0 };
723 GstMemory *mem = gst_buffer_peek_memory (buf, mem_idx + i);
724 if (!gst_memory_map (mem, &map, GST_MAP_READ))
725 g_error ("Unable to map memory %p. This should never happen.", mem);
726
727 if (i == 0) {
728 vectors[i].buffer = map.data + mem_skip;
729 vectors[i].size = map.size - mem_skip;
730 } else {
731 vectors[i].buffer = map.data;
732 vectors[i].size = map.size;
733 }
734 mapinfo[i] = map;
735 }
736 return i;
737 }
738
739 /**
740 * map_n_memory_output_vector:
741 * @buf: The #GstBuffer that should be mapped
742 * @offset: Offset into the buffer that should be mapped
743 * @vectors: (out,array length=num_vectors): an array of #GOutputVector structs to write into
744 * @num_vectors: the number of elements in @vectors to prevent buffer overruns
745 *
746 * Returns: The number of GstMemorys mapped
747 */
748 static void
unmap_n_memorys(GstMapInfo * mapinfo,int num_mappings)749 unmap_n_memorys (GstMapInfo * mapinfo, int num_mappings)
750 {
751 int i;
752 g_return_if_fail (num_mappings > 0);
753
754 for (i = 0; i < num_mappings; i++)
755 gst_memory_unmap (mapinfo[i].memory, &mapinfo[i]);
756 }
757
758 static gsize
gst_buffer_get_cmsg_list(GstBuffer * buf,GSocketControlMessage ** msgs,gsize msg_space)759 gst_buffer_get_cmsg_list (GstBuffer * buf, GSocketControlMessage ** msgs,
760 gsize msg_space)
761 {
762 gpointer iter_state = NULL;
763 GstMeta *meta;
764 gsize msg_count = 0;
765
766 while ((meta = gst_buffer_iterate_meta (buf, &iter_state)) != NULL
767 && msg_count < msg_space) {
768 if (meta->info->api == GST_NET_CONTROL_MESSAGE_META_API_TYPE)
769 msgs[msg_count++] = ((GstNetControlMessageMeta *) meta)->message;
770 }
771
772 return msg_count;
773 }
774
775 #define CMSG_MAX 255
776
777 static gssize
gst_multi_socket_sink_write(GstMultiSocketSink * sink,GSocket * sock,GstBuffer * buffer,gsize bufoffset,GCancellable * cancellable,GError ** err)778 gst_multi_socket_sink_write (GstMultiSocketSink * sink,
779 GSocket * sock, GstBuffer * buffer, gsize bufoffset,
780 GCancellable * cancellable, GError ** err)
781 {
782 GstMapInfo maps[8];
783 GOutputVector vec[8];
784 guint mems_mapped;
785 gssize wrote;
786 GSocketControlMessage *cmsgs[CMSG_MAX];
787 gsize msg_count;
788
789 mems_mapped = map_n_memory_output_vector (buffer, bufoffset, vec, maps, 8);
790
791 msg_count = gst_buffer_get_cmsg_list (buffer, cmsgs, CMSG_MAX);
792
793 wrote =
794 g_socket_send_message (sock, NULL, vec, mems_mapped, cmsgs, msg_count, 0,
795 cancellable, err);
796 unmap_n_memorys (maps, mems_mapped);
797 return wrote;
798 }
799
800 /* Handle a write on a client,
801 * which indicates a read request from a client.
802 *
803 * For each client we maintain a queue of GstBuffers that contain the raw bytes
804 * we need to send to the client.
805 *
806 * We first check to see if we need to send streamheaders. If so, we queue them.
807 *
808 * Then we run into the main loop that tries to send as many buffers as
809 * possible. It will first exhaust the mhclient->sending queue and if the queue
810 * is empty, it will pick a buffer from the global queue.
811 *
812 * Sending the buffers from the mhclient->sending queue is basically writing
813 * the bytes to the socket and maintaining a count of the bytes that were
814 * sent. When the buffer is completely sent, it is removed from the
815 * mhclient->sending queue and we try to pick a new buffer for sending.
816 *
817 * When the sending returns a partial buffer we stop sending more data as
818 * the next send operation could block.
819 *
820 * This functions returns FALSE if some error occurred.
821 */
822 static gboolean
gst_multi_socket_sink_handle_client_write(GstMultiSocketSink * sink,GstSocketClient * client)823 gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink,
824 GstSocketClient * client)
825 {
826 gboolean more;
827 gboolean flushing;
828 GstClockTime now, now_monotonic;
829 GError *err = NULL;
830 GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
831 GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
832 GstMultiHandleSinkClass *mhsinkclass =
833 GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
834
835
836 now = g_get_real_time () * GST_USECOND;
837 now_monotonic = g_get_monotonic_time () * GST_USECOND;
838
839 flushing = mhclient->status == GST_CLIENT_STATUS_FLUSHING;
840
841 more = TRUE;
842 do {
843 if (!mhclient->sending) {
844 /* client is not working on a buffer */
845 if (mhclient->bufpos == -1) {
846 /* client is too fast, remove from write queue until new buffer is
847 * available */
848 gst_multi_socket_sink_stop_sending (sink, client);
849
850 /* if we flushed out all of the client buffers, we can stop */
851 if (mhclient->flushcount == 0)
852 goto flushed;
853
854 return TRUE;
855 } else {
856 /* client can pick a buffer from the global queue */
857 GstBuffer *buf;
858 GstClockTime timestamp;
859
860 /* for new connections, we need to find a good spot in the
861 * bufqueue to start streaming from */
862 if (mhclient->new_connection && !flushing) {
863 gint position =
864 gst_multi_handle_sink_new_client_position (mhsink, mhclient);
865
866 if (position >= 0) {
867 /* we got a valid spot in the queue */
868 mhclient->new_connection = FALSE;
869 mhclient->bufpos = position;
870 } else {
871 /* cannot send data to this client yet */
872 gst_multi_socket_sink_stop_sending (sink, client);
873 return TRUE;
874 }
875 }
876
877 /* we flushed all remaining buffers, no need to get a new one */
878 if (mhclient->flushcount == 0)
879 goto flushed;
880
881 /* grab buffer */
882 buf = g_array_index (mhsink->bufqueue, GstBuffer *, mhclient->bufpos);
883 mhclient->bufpos--;
884
885 /* update stats */
886 timestamp = GST_BUFFER_TIMESTAMP (buf);
887 if (mhclient->first_buffer_ts == GST_CLOCK_TIME_NONE)
888 mhclient->first_buffer_ts = timestamp;
889 if (timestamp != -1)
890 mhclient->last_buffer_ts = timestamp;
891
892 /* decrease flushcount */
893 if (mhclient->flushcount != -1)
894 mhclient->flushcount--;
895
896 GST_LOG_OBJECT (sink, "%s client %p at position %d",
897 mhclient->debug, client, mhclient->bufpos);
898
899 /* queueing a buffer will ref it */
900 mhsinkclass->client_queue_buffer (mhsink, mhclient, buf);
901
902 /* need to start from the first byte for this new buffer */
903 mhclient->bufoffset = 0;
904 }
905 }
906
907 /* see if we need to send something */
908 if (mhclient->sending) {
909 gssize wrote;
910 GstBuffer *head;
911
912 /* pick first buffer from list */
913 head = GST_BUFFER (mhclient->sending->data);
914
915 wrote = gst_multi_socket_sink_write (sink, mhclient->handle.socket, head,
916 mhclient->bufoffset, sink->cancellable, &err);
917
918 if (wrote < 0) {
919 /* hmm error.. */
920 if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CLOSED)) {
921 goto connection_reset;
922 } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
923 /* write would block, try again later */
924 GST_LOG_OBJECT (sink, "write would block %p",
925 mhclient->handle.socket);
926 more = FALSE;
927 g_clear_error (&err);
928 } else {
929 goto write_error;
930 }
931 } else {
932 if (wrote < (gst_buffer_get_size (head) - mhclient->bufoffset)) {
933 /* partial write, try again now */
934 GST_LOG_OBJECT (sink,
935 "partial write on %p of %" G_GSSIZE_FORMAT " bytes",
936 mhclient->handle.socket, wrote);
937 mhclient->bufoffset += wrote;
938 } else {
939 if (sink->send_dispatched) {
940 gst_pad_push_event (GST_BASE_SINK_PAD (mhsink),
941 gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
942 gst_structure_new ("GstNetworkMessageDispatched",
943 "object", G_TYPE_OBJECT, mhclient->handle.socket,
944 "buffer", GST_TYPE_BUFFER, head, NULL)));
945 }
946 /* complete buffer was written, we can proceed to the next one */
947 mhclient->sending = g_slist_remove (mhclient->sending, head);
948 gst_buffer_unref (head);
949 /* make sure we start from byte 0 for the next buffer */
950 mhclient->bufoffset = 0;
951 }
952 /* update stats */
953 mhclient->bytes_sent += wrote;
954 mhclient->last_activity_time = now;
955 mhclient->last_activity_time_monotonic = now_monotonic;
956 mhsink->bytes_served += wrote;
957 }
958 }
959 } while (more);
960
961 return TRUE;
962
963 /* ERRORS */
964 flushed:
965 {
966 GST_DEBUG_OBJECT (sink, "%s flushed, removing", mhclient->debug);
967 mhclient->status = GST_CLIENT_STATUS_REMOVED;
968 return FALSE;
969 }
970 connection_reset:
971 {
972 GST_DEBUG_OBJECT (sink, "%s connection reset by peer, removing",
973 mhclient->debug);
974 mhclient->status = GST_CLIENT_STATUS_CLOSED;
975 g_clear_error (&err);
976 return FALSE;
977 }
978 write_error:
979 {
980 GST_WARNING_OBJECT (sink,
981 "%s could not write, removing client: %s", mhclient->debug,
982 err->message);
983 g_clear_error (&err);
984 mhclient->status = GST_CLIENT_STATUS_ERROR;
985 return FALSE;
986 }
987 }
988
989 static void
ensure_condition(GstMultiSocketSink * sink,GstSocketClient * client,GIOCondition condition)990 ensure_condition (GstMultiSocketSink * sink, GstSocketClient * client,
991 GIOCondition condition)
992 {
993 GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
994
995 if (client->condition == condition)
996 return;
997
998 if (client->source) {
999 g_source_destroy (client->source);
1000 g_source_unref (client->source);
1001 }
1002 if (condition && sink->main_context) {
1003 client->source = g_socket_create_source (mhclient->handle.socket,
1004 condition, sink->cancellable);
1005 g_source_set_callback (client->source,
1006 (GSourceFunc) gst_multi_socket_sink_socket_condition,
1007 gst_object_ref (sink), (GDestroyNotify) gst_object_unref);
1008 g_source_attach (client->source, sink->main_context);
1009 } else {
1010 client->source = NULL;
1011 condition = 0;
1012 }
1013 client->condition = condition;
1014 }
1015
1016 static void
gst_multi_socket_sink_hash_adding(GstMultiHandleSink * mhsink,GstMultiHandleClient * mhclient)1017 gst_multi_socket_sink_hash_adding (GstMultiHandleSink * mhsink,
1018 GstMultiHandleClient * mhclient)
1019 {
1020 GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (mhsink);
1021 GstSocketClient *client = (GstSocketClient *) (mhclient);
1022
1023 ensure_condition (sink, client,
1024 G_IO_IN | G_IO_OUT | G_IO_PRI | G_IO_ERR | G_IO_HUP);
1025 }
1026
1027 static void
gst_multi_socket_sink_hash_removing(GstMultiHandleSink * mhsink,GstMultiHandleClient * mhclient)1028 gst_multi_socket_sink_hash_removing (GstMultiHandleSink * mhsink,
1029 GstMultiHandleClient * mhclient)
1030 {
1031 GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (mhsink);
1032 GstSocketClient *client = (GstSocketClient *) (mhclient);
1033
1034 ensure_condition (sink, client, 0);
1035 }
1036
1037 static void
gst_multi_socket_sink_stop_sending(GstMultiSocketSink * sink,GstSocketClient * client)1038 gst_multi_socket_sink_stop_sending (GstMultiSocketSink * sink,
1039 GstSocketClient * client)
1040 {
1041 ensure_condition (sink, client, G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP);
1042 }
1043
1044 /* Handle the clients. This is called when a socket becomes ready
1045 * to read or writable. Badly behaving clients are put on a
1046 * garbage list and removed.
1047 */
1048 static gboolean
gst_multi_socket_sink_socket_condition(GstMultiSinkHandle handle,GIOCondition condition,GstMultiSocketSink * sink)1049 gst_multi_socket_sink_socket_condition (GstMultiSinkHandle handle,
1050 GIOCondition condition, GstMultiSocketSink * sink)
1051 {
1052 GList *clink;
1053 GstSocketClient *client;
1054 gboolean ret = TRUE;
1055 GstMultiHandleClient *mhclient;
1056 GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
1057 GstMultiHandleSinkClass *mhsinkclass =
1058 GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
1059
1060 CLIENTS_LOCK (mhsink);
1061 clink = g_hash_table_lookup (mhsink->handle_hash,
1062 mhsinkclass->handle_hash_key (handle));
1063 if (clink == NULL) {
1064 ret = FALSE;
1065 goto done;
1066 }
1067
1068 client = clink->data;
1069 mhclient = (GstMultiHandleClient *) client;
1070
1071 if (mhclient->status != GST_CLIENT_STATUS_FLUSHING
1072 && mhclient->status != GST_CLIENT_STATUS_OK) {
1073 gst_multi_handle_sink_remove_client_link (mhsink, clink);
1074 ret = FALSE;
1075 goto done;
1076 }
1077
1078 if ((condition & G_IO_ERR)) {
1079 GST_WARNING_OBJECT (sink, "%s has error", mhclient->debug);
1080 mhclient->status = GST_CLIENT_STATUS_ERROR;
1081 gst_multi_handle_sink_remove_client_link (mhsink, clink);
1082 ret = FALSE;
1083 goto done;
1084 } else if ((condition & G_IO_HUP)) {
1085 mhclient->status = GST_CLIENT_STATUS_CLOSED;
1086 gst_multi_handle_sink_remove_client_link (mhsink, clink);
1087 ret = FALSE;
1088 goto done;
1089 }
1090 if ((condition & G_IO_IN) || (condition & G_IO_PRI)) {
1091 /* handle client read */
1092 if (!gst_multi_socket_sink_handle_client_read (sink, client)) {
1093 gst_multi_handle_sink_remove_client_link (mhsink, clink);
1094 ret = FALSE;
1095 goto done;
1096 }
1097 }
1098 if ((condition & G_IO_OUT)) {
1099 /* handle client write */
1100 if (!gst_multi_socket_sink_handle_client_write (sink, client)) {
1101 gst_multi_handle_sink_remove_client_link (mhsink, clink);
1102 ret = FALSE;
1103 goto done;
1104 }
1105 }
1106
1107 done:
1108 CLIENTS_UNLOCK (mhsink);
1109
1110 return ret;
1111 }
1112
1113 static gboolean
gst_multi_socket_sink_timeout(GstMultiSocketSink * sink)1114 gst_multi_socket_sink_timeout (GstMultiSocketSink * sink)
1115 {
1116 GstClockTime now;
1117 GList *clients;
1118 GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
1119
1120 now = g_get_monotonic_time () * GST_USECOND;
1121
1122 CLIENTS_LOCK (mhsink);
1123 for (clients = mhsink->clients; clients; clients = clients->next) {
1124 GstSocketClient *client;
1125 GstMultiHandleClient *mhclient;
1126
1127 client = clients->data;
1128 mhclient = (GstMultiHandleClient *) client;
1129 if (mhsink->timeout > 0
1130 && now - mhclient->last_activity_time_monotonic > mhsink->timeout) {
1131 mhclient->status = GST_CLIENT_STATUS_SLOW;
1132 gst_multi_handle_sink_remove_client_link (mhsink, clients);
1133 }
1134 }
1135 CLIENTS_UNLOCK (mhsink);
1136
1137 return FALSE;
1138 }
1139
1140 /* we handle the client communication in another thread so that we do not block
1141 * the gstreamer thread while we select() on the client fds */
1142 static gpointer
gst_multi_socket_sink_thread(GstMultiHandleSink * mhsink)1143 gst_multi_socket_sink_thread (GstMultiHandleSink * mhsink)
1144 {
1145 GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (mhsink);
1146 GSource *timeout = NULL;
1147
1148 while (mhsink->running) {
1149 if (mhsink->timeout > 0) {
1150 timeout = g_timeout_source_new (mhsink->timeout / GST_MSECOND);
1151
1152 g_source_set_callback (timeout,
1153 (GSourceFunc) gst_multi_socket_sink_timeout, gst_object_ref (sink),
1154 (GDestroyNotify) gst_object_unref);
1155 g_source_attach (timeout, sink->main_context);
1156 }
1157
1158 /* Returns after handling all pending events or when
1159 * _wakeup() was called. In any case we have to add
1160 * a new timeout because something happened.
1161 */
1162 g_main_context_iteration (sink->main_context, TRUE);
1163
1164 if (timeout) {
1165 g_source_destroy (timeout);
1166 g_source_unref (timeout);
1167 }
1168 }
1169
1170 return NULL;
1171 }
1172
1173 static void
gst_multi_socket_sink_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)1174 gst_multi_socket_sink_set_property (GObject * object, guint prop_id,
1175 const GValue * value, GParamSpec * pspec)
1176 {
1177 GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (object);
1178
1179 switch (prop_id) {
1180 case PROP_SEND_DISPATCHED:
1181 sink->send_dispatched = g_value_get_boolean (value);
1182 break;
1183 case PROP_SEND_MESSAGES:
1184 sink->send_messages = g_value_get_boolean (value);
1185 break;
1186 default:
1187 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1188 break;
1189 }
1190 }
1191
1192 static void
gst_multi_socket_sink_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)1193 gst_multi_socket_sink_get_property (GObject * object, guint prop_id,
1194 GValue * value, GParamSpec * pspec)
1195 {
1196 GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (object);
1197
1198 switch (prop_id) {
1199 case PROP_SEND_DISPATCHED:
1200 g_value_set_boolean (value, sink->send_dispatched);
1201 break;
1202 case PROP_SEND_MESSAGES:
1203 g_value_set_boolean (value, sink->send_messages);
1204 break;
1205 default:
1206 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1207 break;
1208 }
1209 }
1210
1211 static gboolean
gst_multi_socket_sink_start_pre(GstMultiHandleSink * mhsink)1212 gst_multi_socket_sink_start_pre (GstMultiHandleSink * mhsink)
1213 {
1214 GstMultiSocketSink *mssink = GST_MULTI_SOCKET_SINK (mhsink);
1215 GstMultiHandleSinkClass *mhsinkclass =
1216 GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
1217 GList *clients;
1218
1219 GST_INFO_OBJECT (mssink, "starting");
1220
1221 mssink->main_context = g_main_context_new ();
1222
1223 CLIENTS_LOCK (mhsink);
1224 for (clients = mhsink->clients; clients; clients = clients->next) {
1225 GstSocketClient *client = clients->data;
1226 GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
1227
1228 if (client->source)
1229 continue;
1230 mhsinkclass->hash_adding (mhsink, mhclient);
1231 }
1232 CLIENTS_UNLOCK (mhsink);
1233
1234 return TRUE;
1235 }
1236
1237 static gboolean
multisocketsink_hash_remove(gpointer key,gpointer value,gpointer data)1238 multisocketsink_hash_remove (gpointer key, gpointer value, gpointer data)
1239 {
1240 return TRUE;
1241 }
1242
1243 static void
gst_multi_socket_sink_stop_pre(GstMultiHandleSink * mhsink)1244 gst_multi_socket_sink_stop_pre (GstMultiHandleSink * mhsink)
1245 {
1246 GstMultiSocketSink *mssink = GST_MULTI_SOCKET_SINK (mhsink);
1247
1248 if (mssink->main_context)
1249 g_main_context_wakeup (mssink->main_context);
1250 }
1251
1252 static void
gst_multi_socket_sink_stop_post(GstMultiHandleSink * mhsink)1253 gst_multi_socket_sink_stop_post (GstMultiHandleSink * mhsink)
1254 {
1255 GstMultiSocketSink *mssink = GST_MULTI_SOCKET_SINK (mhsink);
1256
1257 if (mssink->main_context) {
1258 g_main_context_unref (mssink->main_context);
1259 mssink->main_context = NULL;
1260 }
1261
1262 g_hash_table_foreach_remove (mhsink->handle_hash, multisocketsink_hash_remove,
1263 mssink);
1264 }
1265
1266 static gboolean
gst_multi_socket_sink_unlock(GstBaseSink * bsink)1267 gst_multi_socket_sink_unlock (GstBaseSink * bsink)
1268 {
1269 GstMultiSocketSink *sink;
1270
1271 sink = GST_MULTI_SOCKET_SINK (bsink);
1272
1273 GST_DEBUG_OBJECT (sink, "set to flushing");
1274 g_cancellable_cancel (sink->cancellable);
1275 if (sink->main_context)
1276 g_main_context_wakeup (sink->main_context);
1277
1278 return TRUE;
1279 }
1280
1281 /* will be called only between calls to start() and stop() */
1282 static gboolean
gst_multi_socket_sink_unlock_stop(GstBaseSink * bsink)1283 gst_multi_socket_sink_unlock_stop (GstBaseSink * bsink)
1284 {
1285 GstMultiSocketSink *sink;
1286
1287 sink = GST_MULTI_SOCKET_SINK (bsink);
1288
1289 GST_DEBUG_OBJECT (sink, "unset flushing");
1290 g_object_unref (sink->cancellable);
1291 sink->cancellable = g_cancellable_new ();
1292
1293 return TRUE;
1294 }
1295
1296 static gboolean
gst_multi_socket_sink_propose_allocation(GstBaseSink * bsink,GstQuery * query)1297 gst_multi_socket_sink_propose_allocation (GstBaseSink * bsink, GstQuery * query)
1298 {
1299 /* we support some meta */
1300 gst_query_add_allocation_meta (query, GST_NET_CONTROL_MESSAGE_META_API_TYPE,
1301 NULL);
1302
1303 return TRUE;
1304 }
1305