• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer
2  * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
3  * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
4  * Copyright (C) 2006 Wim Taymans <wim at fluendo dot com>
5  * Copyright (C) <2011> Collabora Ltd.
6  *     Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
21  * Boston, MA 02110-1301, USA.
22  */
23 
24 /**
25  * SECTION:element-multisocketsink
26  * @title: multisocketsink
27  * @see_also: tcpserversink
28  *
29  * This plugin writes incoming data to a set of sockets. The
30  * sockets can be added to multisocketsink by emitting the #GstMultiSocketSink::add signal.
31  * For each descriptor added, the #GstMultiSocketSink::client-added signal will be called.
32  *
33  * A client can also be added with the #GstMultiSocketSink::add-full signal
34  * that allows for more control over what and how much data a client
35  * initially receives.
36  *
37  * Clients can be removed from multisocketsink by emitting the #GstMultiSocketSink::remove signal. For
38  * each descriptor removed, the #GstMultiSocketSink::client-removed signal will be called. The
39  * #GstMultiSocketSink::client-removed signal can also be fired when multisocketsink decides that a
40  * client is not active anymore or, depending on the value of the
41  * #GstMultiHandleSink:recover-policy property, if the client is reading too slowly.
42  * In all cases, multisocketsink will never close a socket itself.
43  * The user of multisocketsink is responsible for closing all sockets.
44  * This can for example be done in response to the #GstMultiSocketSink::client-socket-removed signal.
45  * Note that multisocketsink still has a reference to the socket when the
46  * #GstMultiSocketSink::client-removed signal is emitted, so that "get-stats" can be performed on
47  * the descriptor; it is therefore not safe to close the socket in
48  * the #GstMultiSocketSink::client-removed signal handler, and you should use the
49  * #GstMultiSocketSink::client-socket-removed signal to safely close the socket.
50  *
51  * Multisocketsink internally keeps a queue of the incoming buffers and uses a
52  * separate thread to send the buffers to the clients. This ensures that no
53  * client write can block the pipeline and that clients can read with different
54  * speeds.
55  *
56  * When adding a client to multisocketsink, the #GstMultiHandleSink:sync-method property will define
57  * which buffer in the queued buffers will be sent first to the client. Clients
58  * can be sent the most recent buffer (which might not be decodable by the
59  * client if it is not a keyframe), the next keyframe received in
60  * multisocketsink (which can take some time depending on the keyframe rate), or the
61  * last received keyframe (which will cause a simple burst-on-connect).
62  * Multisocketsink will always keep at least one keyframe in its internal buffers
63  * when the sync-mode is set to latest-keyframe.
64  *
65  * There are additional values for the #GstMultiHandleSink:sync-method
66  * property to allow finer control over burst-on-connect behaviour. By selecting
67  * the 'burst' method a minimum burst size can be chosen, 'burst-keyframe'
68  * additionally requires that the burst begin with a keyframe, and
69  * 'burst-with-keyframe' attempts to burst beginning with a keyframe, but will
70  * prefer a minimum burst size even if it requires not starting with a keyframe.
71  *
72  * Multisocketsink can be instructed to keep at least a minimum amount of data
73  * expressed in time or byte units in its internal queues with the
74  * #GstMultiHandleSink:time-min and #GstMultiHandleSink:bytes-min properties respectively.
75  * These properties are useful if the application adds clients with the
76  * #GstMultiSocketSink::add-full signal to make sure that a burst connect can
77  * actually be honored.
78  *
79  * When streaming data, clients are allowed to read at a different rate than
80  * the rate at which multisocketsink receives data. If the client is reading too
81  * fast, no data will be send to the client until multisocketsink receives more
82  * data. If the client, however, reads too slowly, data for that client will be
83  * queued up in multisocketsink. Two properties control the amount of data
84  * (buffers) that is queued in multisocketsink: #GstMultiHandleSink:buffers-max and
85  * #GstMultiHandleSink:buffers-soft-max. A client that falls behind by
86  * #GstMultiHandleSink:buffers-max is removed from multisocketsink forcibly.
87  *
88  * A client with a lag of at least #GstMultiHandleSink:buffers-soft-max enters the recovery
89  * procedure which is controlled with the #GstMultiHandleSink:recover-policy property.
90  * A recover policy of NONE will do nothing, RESYNC_LATEST will send the most recently
91  * received buffer as the next buffer for the client, RESYNC_SOFT_LIMIT
92  * positions the client to the soft limit in the buffer queue and
93  * RESYNC_KEYFRAME positions the client at the most recent keyframe in the
94  * buffer queue.
95  *
96  * multisocketsink will by default synchronize on the clock before serving the
97  * buffers to the clients. This behaviour can be disabled by setting the sync
98  * property to FALSE. Multisocketsink will by default not do QoS and will never
99  * drop late buffers.
100  */
101 
102 #ifdef HAVE_CONFIG_H
103 #include "config.h"
104 #endif
105 
106 #include <gst/gst-i18n-plugin.h>
107 #include <gst/net/gstnetcontrolmessagemeta.h>
108 
109 #include <string.h>
110 
111 #include "gstmultisocketsink.h"
112 #include "gsttcpelements.h"
113 
114 #ifndef G_OS_WIN32
115 #include <netinet/in.h>
116 #endif
117 
118 #define NOT_IMPLEMENTED 0
119 
120 GST_DEBUG_CATEGORY_STATIC (multisocketsink_debug);
121 #define GST_CAT_DEFAULT (multisocketsink_debug)
122 
123 /* MultiSocketSink signals and args */
124 enum
125 {
126   /* methods */
127   SIGNAL_ADD,
128   SIGNAL_ADD_BURST,
129   SIGNAL_REMOVE,
130   SIGNAL_REMOVE_FLUSH,
131   SIGNAL_GET_STATS,
132 
133   /* signals */
134   SIGNAL_CLIENT_ADDED,
135   SIGNAL_CLIENT_REMOVED,
136   SIGNAL_CLIENT_SOCKET_REMOVED,
137 
138   LAST_SIGNAL
139 };
140 
141 #define DEFAULT_SEND_DISPATCHED FALSE
142 #define DEFAULT_SEND_MESSAGES   FALSE
143 
144 enum
145 {
146   PROP_0,
147   PROP_SEND_DISPATCHED,
148   PROP_SEND_MESSAGES,
149   PROP_LAST
150 };
151 
152 static void gst_multi_socket_sink_finalize (GObject * object);
153 
154 static void gst_multi_socket_sink_add (GstMultiSocketSink * sink,
155     GSocket * socket);
156 static void gst_multi_socket_sink_add_full (GstMultiSocketSink * sink,
157     GSocket * socket, GstSyncMethod sync, GstFormat min_format,
158     guint64 min_value, GstFormat max_format, guint64 max_value);
159 static void gst_multi_socket_sink_remove (GstMultiSocketSink * sink,
160     GSocket * socket);
161 static void gst_multi_socket_sink_remove_flush (GstMultiSocketSink * sink,
162     GSocket * socket);
163 static GstStructure *gst_multi_socket_sink_get_stats (GstMultiSocketSink * sink,
164     GSocket * socket);
165 
166 static void gst_multi_socket_sink_emit_client_added (GstMultiHandleSink * mhs,
167     GstMultiSinkHandle handle);
168 static void gst_multi_socket_sink_emit_client_removed (GstMultiHandleSink * mhs,
169     GstMultiSinkHandle handle, GstClientStatus status);
170 
171 static void gst_multi_socket_sink_stop_pre (GstMultiHandleSink * mhsink);
172 static void gst_multi_socket_sink_stop_post (GstMultiHandleSink * mhsink);
173 static gboolean gst_multi_socket_sink_start_pre (GstMultiHandleSink * mhsink);
174 static gpointer gst_multi_socket_sink_thread (GstMultiHandleSink * mhsink);
175 static GstMultiHandleClient
176     * gst_multi_socket_sink_new_client (GstMultiHandleSink * mhsink,
177     GstMultiSinkHandle handle, GstSyncMethod sync_method);
178 static int gst_multi_socket_sink_client_get_fd (GstMultiHandleClient * client);
179 static void gst_multi_socket_sink_client_free (GstMultiHandleSink * mhsink,
180     GstMultiHandleClient * client);
181 static void gst_multi_socket_sink_handle_debug (GstMultiSinkHandle handle,
182     gchar debug[30]);
183 
184 static gpointer gst_multi_socket_sink_handle_hash_key (GstMultiSinkHandle
185     handle);
186 static void gst_multi_socket_sink_hash_adding (GstMultiHandleSink * mhsink,
187     GstMultiHandleClient * mhclient);
188 static void gst_multi_socket_sink_hash_removing (GstMultiHandleSink * mhsink,
189     GstMultiHandleClient * mhclient);
190 static void gst_multi_socket_sink_stop_sending (GstMultiSocketSink * sink,
191     GstSocketClient * client);
192 
193 static gboolean gst_multi_socket_sink_socket_condition (GstMultiSinkHandle
194     handle, GIOCondition condition, GstMultiSocketSink * sink);
195 
196 static gboolean gst_multi_socket_sink_unlock (GstBaseSink * bsink);
197 static gboolean gst_multi_socket_sink_unlock_stop (GstBaseSink * bsink);
198 
199 static gboolean gst_multi_socket_sink_propose_allocation (GstBaseSink * bsink,
200     GstQuery * query);
201 
202 static void gst_multi_socket_sink_set_property (GObject * object, guint prop_id,
203     const GValue * value, GParamSpec * pspec);
204 static void gst_multi_socket_sink_get_property (GObject * object, guint prop_id,
205     GValue * value, GParamSpec * pspec);
206 
207 #define gst_multi_socket_sink_parent_class parent_class
208 G_DEFINE_TYPE (GstMultiSocketSink, gst_multi_socket_sink,
209     GST_TYPE_MULTI_HANDLE_SINK);
210 GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (multisocketsink, "multisocketsink",
211     GST_RANK_NONE, GST_TYPE_MULTI_SOCKET_SINK, tcp_element_init (plugin));
212 
213 static guint gst_multi_socket_sink_signals[LAST_SIGNAL] = { 0 };
214 
215 static void
gst_multi_socket_sink_class_init(GstMultiSocketSinkClass * klass)216 gst_multi_socket_sink_class_init (GstMultiSocketSinkClass * klass)
217 {
218   GObjectClass *gobject_class;
219   GstElementClass *gstelement_class;
220   GstBaseSinkClass *gstbasesink_class;
221   GstMultiHandleSinkClass *gstmultihandlesink_class;
222 
223   gobject_class = (GObjectClass *) klass;
224   gstelement_class = (GstElementClass *) klass;
225   gstbasesink_class = (GstBaseSinkClass *) klass;
226   gstmultihandlesink_class = (GstMultiHandleSinkClass *) klass;
227 
228   gobject_class->set_property = gst_multi_socket_sink_set_property;
229   gobject_class->get_property = gst_multi_socket_sink_get_property;
230   gobject_class->finalize = gst_multi_socket_sink_finalize;
231 
232   /**
233    * GstMultiSocketSink:send-dispatched:
234    *
235    * Sends a GstNetworkMessageDispatched event upstream whenever a buffer
236    * is sent to a client.
237    * The event is a CUSTOM event name GstNetworkMessageDispatched and
238    * contains:
239    *
240    *   "object"  G_TYPE_OBJECT     : the object identifying the client
241    *   "buffer"  GST_TYPE_BUFFER   : the buffer sent to the client
242    *
243    * Since: 1.8.0
244    */
245   g_object_class_install_property (gobject_class, PROP_SEND_DISPATCHED,
246       g_param_spec_boolean ("send-dispatched", "Send Dispatched",
247           "If GstNetworkMessageDispatched events should be pushed",
248           DEFAULT_SEND_DISPATCHED, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
249   /**
250    * GstMultiSocketSink:send-messages:
251    *
252    * Sends a GstNetworkMessage event upstream whenever a buffer
253    * is received from a client.
254    * The event is a CUSTOM event name GstNetworkMessage and contains:
255    *
256    *   "object"  G_TYPE_OBJECT     : the object identifying the client
257    *   "buffer"  GST_TYPE_BUFFER   : the buffer with data received from the
258    *                                 client
259    *
260    * Since: 1.8.0
261    */
262   g_object_class_install_property (gobject_class, PROP_SEND_MESSAGES,
263       g_param_spec_boolean ("send-messages", "Send Messages",
264           "If GstNetworkMessage events should be pushed", DEFAULT_SEND_MESSAGES,
265           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
266 
267   /**
268    * GstMultiSocketSink::add:
269    * @gstmultisocketsink: the multisocketsink element to emit this signal on
270    * @socket:             the socket to add to multisocketsink
271    *
272    * Hand the given open socket to multisocketsink to write to.
273    */
274   gst_multi_socket_sink_signals[SIGNAL_ADD] =
275       g_signal_new ("add", G_TYPE_FROM_CLASS (klass),
276       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
277       G_STRUCT_OFFSET (GstMultiSocketSinkClass, add), NULL, NULL,
278       NULL, G_TYPE_NONE, 1, G_TYPE_SOCKET);
279   /**
280    * GstMultiSocketSink::add-full:
281    * @gstmultisocketsink: the multisocketsink element to emit this signal on
282    * @socket:         the socket to add to multisocketsink
283    * @sync:           the sync method to use
284    * @format_min:     the format of @value_min
285    * @value_min:      the minimum amount of data to burst expressed in
286    *                  @format_min units.
287    * @format_max:     the format of @value_max
288    * @value_max:      the maximum amount of data to burst expressed in
289    *                  @format_max units.
290    *
291    * Hand the given open socket to multisocketsink to write to and
292    * specify the burst parameters for the new connection.
293    */
294   gst_multi_socket_sink_signals[SIGNAL_ADD_BURST] =
295       g_signal_new ("add-full", G_TYPE_FROM_CLASS (klass),
296       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
297       G_STRUCT_OFFSET (GstMultiSocketSinkClass, add_full), NULL, NULL,
298       NULL, G_TYPE_NONE, 6, G_TYPE_SOCKET, GST_TYPE_SYNC_METHOD,
299       GST_TYPE_FORMAT, G_TYPE_UINT64, GST_TYPE_FORMAT, G_TYPE_UINT64);
300   /**
301    * GstMultiSocketSink::remove:
302    * @gstmultisocketsink: the multisocketsink element to emit this signal on
303    * @socket:             the socket to remove from multisocketsink
304    *
305    * Remove the given open socket from multisocketsink.
306    */
307   gst_multi_socket_sink_signals[SIGNAL_REMOVE] =
308       g_signal_new ("remove", G_TYPE_FROM_CLASS (klass),
309       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
310       G_STRUCT_OFFSET (GstMultiSocketSinkClass, remove), NULL, NULL, NULL,
311       G_TYPE_NONE, 1, G_TYPE_SOCKET);
312   /**
313    * GstMultiSocketSink::remove-flush:
314    * @gstmultisocketsink: the multisocketsink element to emit this signal on
315    * @socket:             the socket to remove from multisocketsink
316    *
317    * Remove the given open socket from multisocketsink after flushing all
318    * the pending data to the socket.
319    */
320   gst_multi_socket_sink_signals[SIGNAL_REMOVE_FLUSH] =
321       g_signal_new ("remove-flush", G_TYPE_FROM_CLASS (klass),
322       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
323       G_STRUCT_OFFSET (GstMultiSocketSinkClass, remove_flush), NULL, NULL, NULL,
324       G_TYPE_NONE, 1, G_TYPE_SOCKET);
325 
326   /**
327    * GstMultiSocketSink::get-stats:
328    * @gstmultisocketsink: the multisocketsink element to emit this signal on
329    * @socket:             the socket to get stats of from multisocketsink
330    *
331    * Get statistics about @socket. This function returns a GstStructure.
332    *
333    * Returns: a GstStructure with the statistics. The structure contains
334    *     values that represent: total number of bytes sent, time
335    *     when the client was added, time when the client was
336    *     disconnected/removed, time the client is/was active, last activity
337    *     time (in epoch seconds), number of buffers dropped.
338    *     All times are expressed in nanoseconds (GstClockTime).
339    */
340   gst_multi_socket_sink_signals[SIGNAL_GET_STATS] =
341       g_signal_new ("get-stats", G_TYPE_FROM_CLASS (klass),
342       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
343       G_STRUCT_OFFSET (GstMultiSocketSinkClass, get_stats), NULL, NULL, NULL,
344       GST_TYPE_STRUCTURE, 1, G_TYPE_SOCKET);
345 
346   /**
347    * GstMultiSocketSink::client-added:
348    * @gstmultisocketsink: the multisocketsink element that emitted this signal
349    * @socket:             the socket that was added to multisocketsink
350    *
351    * The given socket was added to multisocketsink. This signal will
352    * be emitted from the streaming thread so application should be prepared
353    * for that.
354    */
355   gst_multi_socket_sink_signals[SIGNAL_CLIENT_ADDED] =
356       g_signal_new ("client-added", G_TYPE_FROM_CLASS (klass),
357       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_OBJECT);
358   /**
359    * GstMultiSocketSink::client-removed:
360    * @gstmultisocketsink: the multisocketsink element that emitted this signal
361    * @socket:             the socket that is to be removed from multisocketsink
362    * @status:             the reason why the client was removed
363    *
364    * The given socket is about to be removed from multisocketsink. This
365    * signal will be emitted from the streaming thread so applications should
366    * be prepared for that.
367    *
368    * @gstmultisocketsink still holds a handle to @socket so it is possible to call
369    * the get-stats signal from this callback. For the same reason it is
370    * not safe to `close()` and reuse @socket in this callback.
371    */
372   gst_multi_socket_sink_signals[SIGNAL_CLIENT_REMOVED] =
373       g_signal_new ("client-removed", G_TYPE_FROM_CLASS (klass),
374       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_SOCKET,
375       GST_TYPE_CLIENT_STATUS);
376   /**
377    * GstMultiSocketSink::client-socket-removed:
378    * @gstmultisocketsink: the multisocketsink element that emitted this signal
379    * @socket:             the socket that was removed from multisocketsink
380    *
381    * The given socket was removed from multisocketsink. This signal will
382    * be emitted from the streaming thread so applications should be prepared
383    * for that.
384    *
385    * In this callback, @gstmultisocketsink has removed all the information
386    * associated with @socket and it is therefore not possible to call get-stats
387    * with @socket. It is however safe to `close()` and reuse @fd in the callback.
388    */
389   gst_multi_socket_sink_signals[SIGNAL_CLIENT_SOCKET_REMOVED] =
390       g_signal_new ("client-socket-removed", G_TYPE_FROM_CLASS (klass),
391       G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_SOCKET);
392 
393   gst_element_class_set_static_metadata (gstelement_class,
394       "Multi socket sink", "Sink/Network",
395       "Send data to multiple sockets",
396       "Thomas Vander Stichele <thomas at apestaart dot org>, "
397       "Wim Taymans <wim@fluendo.com>, "
398       "Sebastian Dröge <sebastian.droege@collabora.co.uk>");
399 
400   gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_unlock);
401   gstbasesink_class->unlock_stop =
402       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_unlock_stop);
403   gstbasesink_class->propose_allocation =
404       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_propose_allocation);
405 
406   klass->add = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_add);
407   klass->add_full = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_add_full);
408   klass->remove = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_remove);
409   klass->remove_flush = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_remove_flush);
410   klass->get_stats = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_get_stats);
411 
412   gstmultihandlesink_class->emit_client_added =
413       gst_multi_socket_sink_emit_client_added;
414   gstmultihandlesink_class->emit_client_removed =
415       gst_multi_socket_sink_emit_client_removed;
416 
417   gstmultihandlesink_class->stop_pre =
418       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_stop_pre);
419   gstmultihandlesink_class->stop_post =
420       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_stop_post);
421   gstmultihandlesink_class->start_pre =
422       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_start_pre);
423   gstmultihandlesink_class->thread =
424       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_thread);
425   gstmultihandlesink_class->new_client =
426       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_new_client);
427   gstmultihandlesink_class->client_get_fd =
428       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_client_get_fd);
429   gstmultihandlesink_class->client_free =
430       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_client_free);
431   gstmultihandlesink_class->handle_debug =
432       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_handle_debug);
433   gstmultihandlesink_class->handle_hash_key =
434       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_handle_hash_key);
435   gstmultihandlesink_class->hash_adding =
436       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_hash_adding);
437   gstmultihandlesink_class->hash_removing =
438       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_hash_removing);
439 
440   GST_DEBUG_CATEGORY_INIT (multisocketsink_debug, "multisocketsink", 0,
441       "Multi socket sink");
442 }
443 
444 static void
gst_multi_socket_sink_init(GstMultiSocketSink * this)445 gst_multi_socket_sink_init (GstMultiSocketSink * this)
446 {
447   GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (this);
448 
449   mhsink->handle_hash = g_hash_table_new (g_direct_hash, g_int_equal);
450 
451   this->cancellable = g_cancellable_new ();
452   this->send_dispatched = DEFAULT_SEND_DISPATCHED;
453   this->send_messages = DEFAULT_SEND_MESSAGES;
454 }
455 
456 static void
gst_multi_socket_sink_finalize(GObject * object)457 gst_multi_socket_sink_finalize (GObject * object)
458 {
459   GstMultiSocketSink *this = GST_MULTI_SOCKET_SINK (object);
460 
461   if (this->cancellable) {
462     g_object_unref (this->cancellable);
463     this->cancellable = NULL;
464   }
465 
466   G_OBJECT_CLASS (parent_class)->finalize (object);
467 }
468 
469 /* methods to emit signals */
470 
471 static void
gst_multi_socket_sink_emit_client_added(GstMultiHandleSink * mhsink,GstMultiSinkHandle handle)472 gst_multi_socket_sink_emit_client_added (GstMultiHandleSink * mhsink,
473     GstMultiSinkHandle handle)
474 {
475   g_signal_emit (mhsink, gst_multi_socket_sink_signals[SIGNAL_CLIENT_ADDED], 0,
476       handle.socket);
477 }
478 
479 static void
gst_multi_socket_sink_emit_client_removed(GstMultiHandleSink * mhsink,GstMultiSinkHandle handle,GstClientStatus status)480 gst_multi_socket_sink_emit_client_removed (GstMultiHandleSink * mhsink,
481     GstMultiSinkHandle handle, GstClientStatus status)
482 {
483   g_signal_emit (mhsink, gst_multi_socket_sink_signals[SIGNAL_CLIENT_REMOVED],
484       0, handle.socket, status);
485 }
486 
487 /* action signals */
488 
489 static void
gst_multi_socket_sink_add(GstMultiSocketSink * sink,GSocket * socket)490 gst_multi_socket_sink_add (GstMultiSocketSink * sink, GSocket * socket)
491 {
492   GstMultiSinkHandle handle;
493 
494   handle.socket = socket;
495   gst_multi_handle_sink_add (GST_MULTI_HANDLE_SINK_CAST (sink), handle);
496 }
497 
498 static void
gst_multi_socket_sink_add_full(GstMultiSocketSink * sink,GSocket * socket,GstSyncMethod sync,GstFormat min_format,guint64 min_value,GstFormat max_format,guint64 max_value)499 gst_multi_socket_sink_add_full (GstMultiSocketSink * sink, GSocket * socket,
500     GstSyncMethod sync, GstFormat min_format, guint64 min_value,
501     GstFormat max_format, guint64 max_value)
502 {
503   GstMultiSinkHandle handle;
504 
505   handle.socket = socket;
506   gst_multi_handle_sink_add_full (GST_MULTI_HANDLE_SINK_CAST (sink), handle,
507       sync, min_format, min_value, max_format, max_value);
508 }
509 
510 static void
gst_multi_socket_sink_remove(GstMultiSocketSink * sink,GSocket * socket)511 gst_multi_socket_sink_remove (GstMultiSocketSink * sink, GSocket * socket)
512 {
513   GstMultiSinkHandle handle;
514 
515   handle.socket = socket;
516   gst_multi_handle_sink_remove (GST_MULTI_HANDLE_SINK_CAST (sink), handle);
517 }
518 
519 static void
gst_multi_socket_sink_remove_flush(GstMultiSocketSink * sink,GSocket * socket)520 gst_multi_socket_sink_remove_flush (GstMultiSocketSink * sink, GSocket * socket)
521 {
522   GstMultiSinkHandle handle;
523 
524   handle.socket = socket;
525   gst_multi_handle_sink_remove_flush (GST_MULTI_HANDLE_SINK_CAST (sink),
526       handle);
527 }
528 
529 static GstStructure *
gst_multi_socket_sink_get_stats(GstMultiSocketSink * sink,GSocket * socket)530 gst_multi_socket_sink_get_stats (GstMultiSocketSink * sink, GSocket * socket)
531 {
532   GstMultiSinkHandle handle;
533 
534   handle.socket = socket;
535   return gst_multi_handle_sink_get_stats (GST_MULTI_HANDLE_SINK_CAST (sink),
536       handle);
537 }
538 
539 static GstMultiHandleClient *
gst_multi_socket_sink_new_client(GstMultiHandleSink * mhsink,GstMultiSinkHandle handle,GstSyncMethod sync_method)540 gst_multi_socket_sink_new_client (GstMultiHandleSink * mhsink,
541     GstMultiSinkHandle handle, GstSyncMethod sync_method)
542 {
543   GstSocketClient *client;
544   GstMultiHandleClient *mhclient;
545   GstMultiHandleSinkClass *mhsinkclass =
546       GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
547 
548   /* create client datastructure */
549   g_assert (G_IS_SOCKET (handle.socket));
550   client = g_new0 (GstSocketClient, 1);
551   mhclient = (GstMultiHandleClient *) client;
552 
553   mhclient->handle.socket = G_SOCKET (g_object_ref (handle.socket));
554 
555   gst_multi_handle_sink_client_init (mhclient, sync_method);
556   mhsinkclass->handle_debug (handle, mhclient->debug);
557 
558   /* set the socket to non blocking */
559   g_socket_set_blocking (handle.socket, FALSE);
560 
561   /* we always read from a client */
562   mhsinkclass->hash_adding (mhsink, mhclient);
563 
564   gst_multi_handle_sink_setup_dscp_client (mhsink, mhclient);
565 
566   return mhclient;
567 }
568 
569 static int
gst_multi_socket_sink_client_get_fd(GstMultiHandleClient * client)570 gst_multi_socket_sink_client_get_fd (GstMultiHandleClient * client)
571 {
572   return g_socket_get_fd (client->handle.socket);
573 }
574 
575 static void
gst_multi_socket_sink_client_free(GstMultiHandleSink * mhsink,GstMultiHandleClient * client)576 gst_multi_socket_sink_client_free (GstMultiHandleSink * mhsink,
577     GstMultiHandleClient * client)
578 {
579   g_assert (G_IS_SOCKET (client->handle.socket));
580 
581   g_signal_emit (mhsink,
582       gst_multi_socket_sink_signals[SIGNAL_CLIENT_SOCKET_REMOVED], 0,
583       client->handle.socket);
584 
585   g_object_unref (client->handle.socket);
586 }
587 
588 static void
gst_multi_socket_sink_handle_debug(GstMultiSinkHandle handle,gchar debug[30])589 gst_multi_socket_sink_handle_debug (GstMultiSinkHandle handle, gchar debug[30])
590 {
591   g_snprintf (debug, 30, "[socket %p]", handle.socket);
592 }
593 
594 static gpointer
gst_multi_socket_sink_handle_hash_key(GstMultiSinkHandle handle)595 gst_multi_socket_sink_handle_hash_key (GstMultiSinkHandle handle)
596 {
597   return handle.socket;
598 }
599 
600 /* handle a read on a client socket,
601  * which either indicates a close or should be ignored
602  * returns FALSE if some error occurred or the client closed. */
603 static gboolean
gst_multi_socket_sink_handle_client_read(GstMultiSocketSink * sink,GstSocketClient * client)604 gst_multi_socket_sink_handle_client_read (GstMultiSocketSink * sink,
605     GstSocketClient * client)
606 {
607   gboolean ret, do_event;
608   gchar dummy[256], *mem, *omem;
609   gssize nread;
610   GError *err = NULL;
611   gboolean first = TRUE;
612   GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
613   gssize navail, maxmem;
614 
615   GST_DEBUG_OBJECT (sink, "%s select reports client read", mhclient->debug);
616 
617   ret = TRUE;
618 
619   navail = g_socket_get_available_bytes (mhclient->handle.socket);
620   if (navail < 0)
621     return TRUE;
622 
623   /* only collect the data in a buffer when we need to send it with an event */
624   do_event = sink->send_messages && navail > 0;
625   if (do_event) {
626     omem = mem = g_malloc (navail);
627     maxmem = navail;
628   } else {
629     mem = dummy;
630     maxmem = sizeof (dummy);
631   }
632 
633   /* just Read 'n' Drop, could also just drop the client as it's not supposed
634    * to write to us except for closing the socket, I guess it's because we
635    * like to listen to our customers. */
636   do {
637     GST_DEBUG_OBJECT (sink, "%s client wants us to read", mhclient->debug);
638 
639     nread =
640         g_socket_receive (mhclient->handle.socket, mem, MIN (navail,
641             maxmem), sink->cancellable, &err);
642 
643     if (first && nread == 0) {
644       /* client sent close, so remove it */
645       GST_DEBUG_OBJECT (sink, "%s client asked for close, removing",
646           mhclient->debug);
647       mhclient->status = GST_CLIENT_STATUS_CLOSED;
648       ret = FALSE;
649       break;
650     } else if (nread < 0) {
651       if (err->code == G_IO_ERROR_WOULD_BLOCK)
652         break;
653 
654       GST_WARNING_OBJECT (sink, "%s could not read: %s",
655           mhclient->debug, err->message);
656       mhclient->status = GST_CLIENT_STATUS_ERROR;
657       ret = FALSE;
658       break;
659     }
660     navail -= nread;
661     if (do_event)
662       mem += nread;
663     first = FALSE;
664   } while (navail > 0);
665   g_clear_error (&err);
666 
667   if (do_event) {
668     if (ret) {
669       GstBuffer *buf;
670       GstEvent *ev;
671 
672       buf = gst_buffer_new_wrapped (omem, maxmem);
673       ev = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
674           gst_structure_new ("GstNetworkMessage",
675               "object", G_TYPE_OBJECT, mhclient->handle.socket,
676               "buffer", GST_TYPE_BUFFER, buf, NULL));
677       gst_buffer_unref (buf);
678 
679       gst_pad_push_event (GST_BASE_SINK_PAD (sink), ev);
680     } else
681       g_free (omem);
682   }
683   return ret;
684 }
685 
686 /**
687  * map_memory_output_vector_n:
688  * @buf: The #GstBuffer that should be mapped
689  * @offset: Offset into the buffer that should be mapped
690  * @vectors: (out,array length=num_vectors): an array of #GOutputVector structs to write into
691  * @mapinfo: (out,array length=num_vectors): an array of #GstMapInfo structs to write into
692  * @num_vectors: the number of elements in @vectors to prevent buffer overruns
693  *
694  * Maps a buffer into memory, populating a #GOutputVector to use scatter-gather
695  * I/O to send the data over a socket.  The whole buffer won't be mapped into
696  * memory if it consists of more than @num_vectors #GstMemory s.
697  *
698  * Use #unmap_n_memorys after you are
699  * finished with the mappings.
700  *
701  * Returns: The number of GstMemorys mapped
702  */
703 static int
map_n_memory_output_vector(GstBuffer * buf,size_t offset,GOutputVector * vectors,GstMapInfo * mapinfo,int num_vectors)704 map_n_memory_output_vector (GstBuffer * buf, size_t offset,
705     GOutputVector * vectors, GstMapInfo * mapinfo, int num_vectors)
706 {
707   guint mem_idx, mem_len;
708   gsize mem_skip;
709   size_t maxsize;
710   int i;
711 
712   g_return_val_if_fail (num_vectors > 0, 0);
713   memset (vectors, 0, sizeof (GOutputVector) * num_vectors);
714 
715   maxsize = gst_buffer_get_size (buf) - offset;
716   if (!gst_buffer_find_memory (buf, offset, maxsize, &mem_idx, &mem_len,
717           &mem_skip))
718     g_error ("Unable to map memory at offset %" G_GSIZE_FORMAT ", buffer "
719         "length is %" G_GSIZE_FORMAT, offset, gst_buffer_get_size (buf));
720 
721   for (i = 0; i < mem_len && i < num_vectors; i++) {
722     GstMapInfo map = { 0 };
723     GstMemory *mem = gst_buffer_peek_memory (buf, mem_idx + i);
724     if (!gst_memory_map (mem, &map, GST_MAP_READ))
725       g_error ("Unable to map memory %p.  This should never happen.", mem);
726 
727     if (i == 0) {
728       vectors[i].buffer = map.data + mem_skip;
729       vectors[i].size = map.size - mem_skip;
730     } else {
731       vectors[i].buffer = map.data;
732       vectors[i].size = map.size;
733     }
734     mapinfo[i] = map;
735   }
736   return i;
737 }
738 
739 /**
740  * map_n_memory_output_vector:
741  * @buf: The #GstBuffer that should be mapped
742  * @offset: Offset into the buffer that should be mapped
743  * @vectors: (out,array length=num_vectors): an array of #GOutputVector structs to write into
744  * @num_vectors: the number of elements in @vectors to prevent buffer overruns
745  *
746  * Returns: The number of GstMemorys mapped
747  */
748 static void
unmap_n_memorys(GstMapInfo * mapinfo,int num_mappings)749 unmap_n_memorys (GstMapInfo * mapinfo, int num_mappings)
750 {
751   int i;
752   g_return_if_fail (num_mappings > 0);
753 
754   for (i = 0; i < num_mappings; i++)
755     gst_memory_unmap (mapinfo[i].memory, &mapinfo[i]);
756 }
757 
758 static gsize
gst_buffer_get_cmsg_list(GstBuffer * buf,GSocketControlMessage ** msgs,gsize msg_space)759 gst_buffer_get_cmsg_list (GstBuffer * buf, GSocketControlMessage ** msgs,
760     gsize msg_space)
761 {
762   gpointer iter_state = NULL;
763   GstMeta *meta;
764   gsize msg_count = 0;
765 
766   while ((meta = gst_buffer_iterate_meta (buf, &iter_state)) != NULL
767       && msg_count < msg_space) {
768     if (meta->info->api == GST_NET_CONTROL_MESSAGE_META_API_TYPE)
769       msgs[msg_count++] = ((GstNetControlMessageMeta *) meta)->message;
770   }
771 
772   return msg_count;
773 }
774 
775 #define CMSG_MAX 255
776 
777 static gssize
gst_multi_socket_sink_write(GstMultiSocketSink * sink,GSocket * sock,GstBuffer * buffer,gsize bufoffset,GCancellable * cancellable,GError ** err)778 gst_multi_socket_sink_write (GstMultiSocketSink * sink,
779     GSocket * sock, GstBuffer * buffer, gsize bufoffset,
780     GCancellable * cancellable, GError ** err)
781 {
782   GstMapInfo maps[8];
783   GOutputVector vec[8];
784   guint mems_mapped;
785   gssize wrote;
786   GSocketControlMessage *cmsgs[CMSG_MAX];
787   gsize msg_count;
788 
789   mems_mapped = map_n_memory_output_vector (buffer, bufoffset, vec, maps, 8);
790 
791   msg_count = gst_buffer_get_cmsg_list (buffer, cmsgs, CMSG_MAX);
792 
793   wrote =
794       g_socket_send_message (sock, NULL, vec, mems_mapped, cmsgs, msg_count, 0,
795       cancellable, err);
796   unmap_n_memorys (maps, mems_mapped);
797   return wrote;
798 }
799 
800 /* Handle a write on a client,
801  * which indicates a read request from a client.
802  *
803  * For each client we maintain a queue of GstBuffers that contain the raw bytes
804  * we need to send to the client.
805  *
806  * We first check to see if we need to send streamheaders. If so, we queue them.
807  *
808  * Then we run into the main loop that tries to send as many buffers as
809  * possible. It will first exhaust the mhclient->sending queue and if the queue
810  * is empty, it will pick a buffer from the global queue.
811  *
812  * Sending the buffers from the mhclient->sending queue is basically writing
813  * the bytes to the socket and maintaining a count of the bytes that were
814  * sent. When the buffer is completely sent, it is removed from the
815  * mhclient->sending queue and we try to pick a new buffer for sending.
816  *
817  * When the sending returns a partial buffer we stop sending more data as
818  * the next send operation could block.
819  *
820  * This functions returns FALSE if some error occurred.
821  */
822 static gboolean
gst_multi_socket_sink_handle_client_write(GstMultiSocketSink * sink,GstSocketClient * client)823 gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink,
824     GstSocketClient * client)
825 {
826   gboolean more;
827   gboolean flushing;
828   GstClockTime now, now_monotonic;
829   GError *err = NULL;
830   GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
831   GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
832   GstMultiHandleSinkClass *mhsinkclass =
833       GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
834 
835 
836   now = g_get_real_time () * GST_USECOND;
837   now_monotonic = g_get_monotonic_time () * GST_USECOND;
838 
839   flushing = mhclient->status == GST_CLIENT_STATUS_FLUSHING;
840 
841   more = TRUE;
842   do {
843     if (!mhclient->sending) {
844       /* client is not working on a buffer */
845       if (mhclient->bufpos == -1) {
846         /* client is too fast, remove from write queue until new buffer is
847          * available */
848         gst_multi_socket_sink_stop_sending (sink, client);
849 
850         /* if we flushed out all of the client buffers, we can stop */
851         if (mhclient->flushcount == 0)
852           goto flushed;
853 
854         return TRUE;
855       } else {
856         /* client can pick a buffer from the global queue */
857         GstBuffer *buf;
858         GstClockTime timestamp;
859 
860         /* for new connections, we need to find a good spot in the
861          * bufqueue to start streaming from */
862         if (mhclient->new_connection && !flushing) {
863           gint position =
864               gst_multi_handle_sink_new_client_position (mhsink, mhclient);
865 
866           if (position >= 0) {
867             /* we got a valid spot in the queue */
868             mhclient->new_connection = FALSE;
869             mhclient->bufpos = position;
870           } else {
871             /* cannot send data to this client yet */
872             gst_multi_socket_sink_stop_sending (sink, client);
873             return TRUE;
874           }
875         }
876 
877         /* we flushed all remaining buffers, no need to get a new one */
878         if (mhclient->flushcount == 0)
879           goto flushed;
880 
881         /* grab buffer */
882         buf = g_array_index (mhsink->bufqueue, GstBuffer *, mhclient->bufpos);
883         mhclient->bufpos--;
884 
885         /* update stats */
886         timestamp = GST_BUFFER_TIMESTAMP (buf);
887         if (mhclient->first_buffer_ts == GST_CLOCK_TIME_NONE)
888           mhclient->first_buffer_ts = timestamp;
889         if (timestamp != -1)
890           mhclient->last_buffer_ts = timestamp;
891 
892         /* decrease flushcount */
893         if (mhclient->flushcount != -1)
894           mhclient->flushcount--;
895 
896         GST_LOG_OBJECT (sink, "%s client %p at position %d",
897             mhclient->debug, client, mhclient->bufpos);
898 
899         /* queueing a buffer will ref it */
900         mhsinkclass->client_queue_buffer (mhsink, mhclient, buf);
901 
902         /* need to start from the first byte for this new buffer */
903         mhclient->bufoffset = 0;
904       }
905     }
906 
907     /* see if we need to send something */
908     if (mhclient->sending) {
909       gssize wrote;
910       GstBuffer *head;
911 
912       /* pick first buffer from list */
913       head = GST_BUFFER (mhclient->sending->data);
914 
915       wrote = gst_multi_socket_sink_write (sink, mhclient->handle.socket, head,
916           mhclient->bufoffset, sink->cancellable, &err);
917 
918       if (wrote < 0) {
919         /* hmm error.. */
920         if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CLOSED)) {
921           goto connection_reset;
922         } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
923           /* write would block, try again later */
924           GST_LOG_OBJECT (sink, "write would block %p",
925               mhclient->handle.socket);
926           more = FALSE;
927           g_clear_error (&err);
928         } else {
929           goto write_error;
930         }
931       } else {
932         if (wrote < (gst_buffer_get_size (head) - mhclient->bufoffset)) {
933           /* partial write, try again now */
934           GST_LOG_OBJECT (sink,
935               "partial write on %p of %" G_GSSIZE_FORMAT " bytes",
936               mhclient->handle.socket, wrote);
937           mhclient->bufoffset += wrote;
938         } else {
939           if (sink->send_dispatched) {
940             gst_pad_push_event (GST_BASE_SINK_PAD (mhsink),
941                 gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
942                     gst_structure_new ("GstNetworkMessageDispatched",
943                         "object", G_TYPE_OBJECT, mhclient->handle.socket,
944                         "buffer", GST_TYPE_BUFFER, head, NULL)));
945           }
946           /* complete buffer was written, we can proceed to the next one */
947           mhclient->sending = g_slist_remove (mhclient->sending, head);
948           gst_buffer_unref (head);
949           /* make sure we start from byte 0 for the next buffer */
950           mhclient->bufoffset = 0;
951         }
952         /* update stats */
953         mhclient->bytes_sent += wrote;
954         mhclient->last_activity_time = now;
955         mhclient->last_activity_time_monotonic = now_monotonic;
956         mhsink->bytes_served += wrote;
957       }
958     }
959   } while (more);
960 
961   return TRUE;
962 
963   /* ERRORS */
964 flushed:
965   {
966     GST_DEBUG_OBJECT (sink, "%s flushed, removing", mhclient->debug);
967     mhclient->status = GST_CLIENT_STATUS_REMOVED;
968     return FALSE;
969   }
970 connection_reset:
971   {
972     GST_DEBUG_OBJECT (sink, "%s connection reset by peer, removing",
973         mhclient->debug);
974     mhclient->status = GST_CLIENT_STATUS_CLOSED;
975     g_clear_error (&err);
976     return FALSE;
977   }
978 write_error:
979   {
980     GST_WARNING_OBJECT (sink,
981         "%s could not write, removing client: %s", mhclient->debug,
982         err->message);
983     g_clear_error (&err);
984     mhclient->status = GST_CLIENT_STATUS_ERROR;
985     return FALSE;
986   }
987 }
988 
989 static void
ensure_condition(GstMultiSocketSink * sink,GstSocketClient * client,GIOCondition condition)990 ensure_condition (GstMultiSocketSink * sink, GstSocketClient * client,
991     GIOCondition condition)
992 {
993   GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
994 
995   if (client->condition == condition)
996     return;
997 
998   if (client->source) {
999     g_source_destroy (client->source);
1000     g_source_unref (client->source);
1001   }
1002   if (condition && sink->main_context) {
1003     client->source = g_socket_create_source (mhclient->handle.socket,
1004         condition, sink->cancellable);
1005     g_source_set_callback (client->source,
1006         (GSourceFunc) gst_multi_socket_sink_socket_condition,
1007         gst_object_ref (sink), (GDestroyNotify) gst_object_unref);
1008     g_source_attach (client->source, sink->main_context);
1009   } else {
1010     client->source = NULL;
1011     condition = 0;
1012   }
1013   client->condition = condition;
1014 }
1015 
1016 static void
gst_multi_socket_sink_hash_adding(GstMultiHandleSink * mhsink,GstMultiHandleClient * mhclient)1017 gst_multi_socket_sink_hash_adding (GstMultiHandleSink * mhsink,
1018     GstMultiHandleClient * mhclient)
1019 {
1020   GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (mhsink);
1021   GstSocketClient *client = (GstSocketClient *) (mhclient);
1022 
1023   ensure_condition (sink, client,
1024       G_IO_IN | G_IO_OUT | G_IO_PRI | G_IO_ERR | G_IO_HUP);
1025 }
1026 
1027 static void
gst_multi_socket_sink_hash_removing(GstMultiHandleSink * mhsink,GstMultiHandleClient * mhclient)1028 gst_multi_socket_sink_hash_removing (GstMultiHandleSink * mhsink,
1029     GstMultiHandleClient * mhclient)
1030 {
1031   GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (mhsink);
1032   GstSocketClient *client = (GstSocketClient *) (mhclient);
1033 
1034   ensure_condition (sink, client, 0);
1035 }
1036 
1037 static void
gst_multi_socket_sink_stop_sending(GstMultiSocketSink * sink,GstSocketClient * client)1038 gst_multi_socket_sink_stop_sending (GstMultiSocketSink * sink,
1039     GstSocketClient * client)
1040 {
1041   ensure_condition (sink, client, G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP);
1042 }
1043 
1044 /* Handle the clients. This is called when a socket becomes ready
1045  * to read or writable. Badly behaving clients are put on a
1046  * garbage list and removed.
1047  */
1048 static gboolean
gst_multi_socket_sink_socket_condition(GstMultiSinkHandle handle,GIOCondition condition,GstMultiSocketSink * sink)1049 gst_multi_socket_sink_socket_condition (GstMultiSinkHandle handle,
1050     GIOCondition condition, GstMultiSocketSink * sink)
1051 {
1052   GList *clink;
1053   GstSocketClient *client;
1054   gboolean ret = TRUE;
1055   GstMultiHandleClient *mhclient;
1056   GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
1057   GstMultiHandleSinkClass *mhsinkclass =
1058       GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
1059 
1060   CLIENTS_LOCK (mhsink);
1061   clink = g_hash_table_lookup (mhsink->handle_hash,
1062       mhsinkclass->handle_hash_key (handle));
1063   if (clink == NULL) {
1064     ret = FALSE;
1065     goto done;
1066   }
1067 
1068   client = clink->data;
1069   mhclient = (GstMultiHandleClient *) client;
1070 
1071   if (mhclient->status != GST_CLIENT_STATUS_FLUSHING
1072       && mhclient->status != GST_CLIENT_STATUS_OK) {
1073     gst_multi_handle_sink_remove_client_link (mhsink, clink);
1074     ret = FALSE;
1075     goto done;
1076   }
1077 
1078   if ((condition & G_IO_ERR)) {
1079     GST_WARNING_OBJECT (sink, "%s has error", mhclient->debug);
1080     mhclient->status = GST_CLIENT_STATUS_ERROR;
1081     gst_multi_handle_sink_remove_client_link (mhsink, clink);
1082     ret = FALSE;
1083     goto done;
1084   } else if ((condition & G_IO_HUP)) {
1085     mhclient->status = GST_CLIENT_STATUS_CLOSED;
1086     gst_multi_handle_sink_remove_client_link (mhsink, clink);
1087     ret = FALSE;
1088     goto done;
1089   }
1090   if ((condition & G_IO_IN) || (condition & G_IO_PRI)) {
1091     /* handle client read */
1092     if (!gst_multi_socket_sink_handle_client_read (sink, client)) {
1093       gst_multi_handle_sink_remove_client_link (mhsink, clink);
1094       ret = FALSE;
1095       goto done;
1096     }
1097   }
1098   if ((condition & G_IO_OUT)) {
1099     /* handle client write */
1100     if (!gst_multi_socket_sink_handle_client_write (sink, client)) {
1101       gst_multi_handle_sink_remove_client_link (mhsink, clink);
1102       ret = FALSE;
1103       goto done;
1104     }
1105   }
1106 
1107 done:
1108   CLIENTS_UNLOCK (mhsink);
1109 
1110   return ret;
1111 }
1112 
1113 static gboolean
gst_multi_socket_sink_timeout(GstMultiSocketSink * sink)1114 gst_multi_socket_sink_timeout (GstMultiSocketSink * sink)
1115 {
1116   GstClockTime now;
1117   GList *clients;
1118   GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
1119 
1120   now = g_get_monotonic_time () * GST_USECOND;
1121 
1122   CLIENTS_LOCK (mhsink);
1123   for (clients = mhsink->clients; clients; clients = clients->next) {
1124     GstSocketClient *client;
1125     GstMultiHandleClient *mhclient;
1126 
1127     client = clients->data;
1128     mhclient = (GstMultiHandleClient *) client;
1129     if (mhsink->timeout > 0
1130         && now - mhclient->last_activity_time_monotonic > mhsink->timeout) {
1131       mhclient->status = GST_CLIENT_STATUS_SLOW;
1132       gst_multi_handle_sink_remove_client_link (mhsink, clients);
1133     }
1134   }
1135   CLIENTS_UNLOCK (mhsink);
1136 
1137   return FALSE;
1138 }
1139 
1140 /* we handle the client communication in another thread so that we do not block
1141  * the gstreamer thread while we select() on the client fds */
1142 static gpointer
gst_multi_socket_sink_thread(GstMultiHandleSink * mhsink)1143 gst_multi_socket_sink_thread (GstMultiHandleSink * mhsink)
1144 {
1145   GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (mhsink);
1146   GSource *timeout = NULL;
1147 
1148   while (mhsink->running) {
1149     if (mhsink->timeout > 0) {
1150       timeout = g_timeout_source_new (mhsink->timeout / GST_MSECOND);
1151 
1152       g_source_set_callback (timeout,
1153           (GSourceFunc) gst_multi_socket_sink_timeout, gst_object_ref (sink),
1154           (GDestroyNotify) gst_object_unref);
1155       g_source_attach (timeout, sink->main_context);
1156     }
1157 
1158     /* Returns after handling all pending events or when
1159      * _wakeup() was called. In any case we have to add
1160      * a new timeout because something happened.
1161      */
1162     g_main_context_iteration (sink->main_context, TRUE);
1163 
1164     if (timeout) {
1165       g_source_destroy (timeout);
1166       g_source_unref (timeout);
1167     }
1168   }
1169 
1170   return NULL;
1171 }
1172 
1173 static void
gst_multi_socket_sink_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)1174 gst_multi_socket_sink_set_property (GObject * object, guint prop_id,
1175     const GValue * value, GParamSpec * pspec)
1176 {
1177   GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (object);
1178 
1179   switch (prop_id) {
1180     case PROP_SEND_DISPATCHED:
1181       sink->send_dispatched = g_value_get_boolean (value);
1182       break;
1183     case PROP_SEND_MESSAGES:
1184       sink->send_messages = g_value_get_boolean (value);
1185       break;
1186     default:
1187       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1188       break;
1189   }
1190 }
1191 
1192 static void
gst_multi_socket_sink_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)1193 gst_multi_socket_sink_get_property (GObject * object, guint prop_id,
1194     GValue * value, GParamSpec * pspec)
1195 {
1196   GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (object);
1197 
1198   switch (prop_id) {
1199     case PROP_SEND_DISPATCHED:
1200       g_value_set_boolean (value, sink->send_dispatched);
1201       break;
1202     case PROP_SEND_MESSAGES:
1203       g_value_set_boolean (value, sink->send_messages);
1204       break;
1205     default:
1206       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1207       break;
1208   }
1209 }
1210 
1211 static gboolean
gst_multi_socket_sink_start_pre(GstMultiHandleSink * mhsink)1212 gst_multi_socket_sink_start_pre (GstMultiHandleSink * mhsink)
1213 {
1214   GstMultiSocketSink *mssink = GST_MULTI_SOCKET_SINK (mhsink);
1215   GstMultiHandleSinkClass *mhsinkclass =
1216       GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
1217   GList *clients;
1218 
1219   GST_INFO_OBJECT (mssink, "starting");
1220 
1221   mssink->main_context = g_main_context_new ();
1222 
1223   CLIENTS_LOCK (mhsink);
1224   for (clients = mhsink->clients; clients; clients = clients->next) {
1225     GstSocketClient *client = clients->data;
1226     GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
1227 
1228     if (client->source)
1229       continue;
1230     mhsinkclass->hash_adding (mhsink, mhclient);
1231   }
1232   CLIENTS_UNLOCK (mhsink);
1233 
1234   return TRUE;
1235 }
1236 
1237 static gboolean
multisocketsink_hash_remove(gpointer key,gpointer value,gpointer data)1238 multisocketsink_hash_remove (gpointer key, gpointer value, gpointer data)
1239 {
1240   return TRUE;
1241 }
1242 
1243 static void
gst_multi_socket_sink_stop_pre(GstMultiHandleSink * mhsink)1244 gst_multi_socket_sink_stop_pre (GstMultiHandleSink * mhsink)
1245 {
1246   GstMultiSocketSink *mssink = GST_MULTI_SOCKET_SINK (mhsink);
1247 
1248   if (mssink->main_context)
1249     g_main_context_wakeup (mssink->main_context);
1250 }
1251 
1252 static void
gst_multi_socket_sink_stop_post(GstMultiHandleSink * mhsink)1253 gst_multi_socket_sink_stop_post (GstMultiHandleSink * mhsink)
1254 {
1255   GstMultiSocketSink *mssink = GST_MULTI_SOCKET_SINK (mhsink);
1256 
1257   if (mssink->main_context) {
1258     g_main_context_unref (mssink->main_context);
1259     mssink->main_context = NULL;
1260   }
1261 
1262   g_hash_table_foreach_remove (mhsink->handle_hash, multisocketsink_hash_remove,
1263       mssink);
1264 }
1265 
1266 static gboolean
gst_multi_socket_sink_unlock(GstBaseSink * bsink)1267 gst_multi_socket_sink_unlock (GstBaseSink * bsink)
1268 {
1269   GstMultiSocketSink *sink;
1270 
1271   sink = GST_MULTI_SOCKET_SINK (bsink);
1272 
1273   GST_DEBUG_OBJECT (sink, "set to flushing");
1274   g_cancellable_cancel (sink->cancellable);
1275   if (sink->main_context)
1276     g_main_context_wakeup (sink->main_context);
1277 
1278   return TRUE;
1279 }
1280 
1281 /* will be called only between calls to start() and stop() */
1282 static gboolean
gst_multi_socket_sink_unlock_stop(GstBaseSink * bsink)1283 gst_multi_socket_sink_unlock_stop (GstBaseSink * bsink)
1284 {
1285   GstMultiSocketSink *sink;
1286 
1287   sink = GST_MULTI_SOCKET_SINK (bsink);
1288 
1289   GST_DEBUG_OBJECT (sink, "unset flushing");
1290   g_object_unref (sink->cancellable);
1291   sink->cancellable = g_cancellable_new ();
1292 
1293   return TRUE;
1294 }
1295 
1296 static gboolean
gst_multi_socket_sink_propose_allocation(GstBaseSink * bsink,GstQuery * query)1297 gst_multi_socket_sink_propose_allocation (GstBaseSink * bsink, GstQuery * query)
1298 {
1299   /* we support some meta */
1300   gst_query_add_allocation_meta (query, GST_NET_CONTROL_MESSAGE_META_API_TYPE,
1301       NULL);
1302 
1303   return TRUE;
1304 }
1305