• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer
2  * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
3  * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
4  * Copyright (C) 2006 Wim Taymans <wim at fluendo dot com>
5  * Copyright (C) <2011> Collabora Ltd.
6  *     Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
21  * Boston, MA 02110-1301, USA.
22  */
23 
24 /**
25  * SECTION:element-multisocketsink
26  * @title: multisocketsink
27  * @see_also: tcpserversink
28  *
29  * This plugin writes incoming data to a set of file descriptors. The
30  * file descriptors can be added to multisocketsink by emitting the #GstMultiSocketSink::add signal.
31  * For each descriptor added, the #GstMultiSocketSink::client-added signal will be called.
32  *
33  * A client can also be added with the #GstMultiSocketSink::add-full signal
34  * that allows for more control over what and how much data a client
35  * initially receives.
36  *
37  * Clients can be removed from multisocketsink by emitting the #GstMultiSocketSink::remove signal. For
38  * each descriptor removed, the #GstMultiSocketSink::client-removed signal will be called. The
39  * #GstMultiSocketSink::client-removed signal can also be fired when multisocketsink decides that a
40  * client is not active anymore or, depending on the value of the
41  * #GstMultiSocketSink:recover-policy property, if the client is reading too slowly.
42  * In all cases, multisocketsink will never close a file descriptor itself.
43  * The user of multisocketsink is responsible for closing all file descriptors.
44  * This can for example be done in response to the #GstMultiSocketSink::client-fd-removed signal.
45  * Note that multisocketsink still has a reference to the file descriptor when the
46  * #GstMultiSocketSink::client-removed signal is emitted, so that "get-stats" can be performed on
47  * the descriptor; it is therefore not safe to close the file descriptor in
48  * the #GstMultiSocketSink::client-removed signal handler, and you should use the
49  * #GstMultiSocketSink::client-fd-removed signal to safely close the fd.
50  *
51  * Multisocketsink internally keeps a queue of the incoming buffers and uses a
52  * separate thread to send the buffers to the clients. This ensures that no
53  * client write can block the pipeline and that clients can read with different
54  * speeds.
55  *
56  * When adding a client to multisocketsink, the #GstMultiSocketSink:sync-method property will define
57  * which buffer in the queued buffers will be sent first to the client. Clients
58  * can be sent the most recent buffer (which might not be decodable by the
59  * client if it is not a keyframe), the next keyframe received in
60  * multisocketsink (which can take some time depending on the keyframe rate), or the
61  * last received keyframe (which will cause a simple burst-on-connect).
62  * Multisocketsink will always keep at least one keyframe in its internal buffers
63  * when the sync-mode is set to latest-keyframe.
64  *
65  * There are additional values for the #GstMultiSocketSink:sync-method
66  * property to allow finer control over burst-on-connect behaviour. By selecting
67  * the 'burst' method a minimum burst size can be chosen, 'burst-keyframe'
68  * additionally requires that the burst begin with a keyframe, and
69  * 'burst-with-keyframe' attempts to burst beginning with a keyframe, but will
70  * prefer a minimum burst size even if it requires not starting with a keyframe.
71  *
72  * Multisocketsink can be instructed to keep at least a minimum amount of data
73  * expressed in time or byte units in its internal queues with the
74  * #GstMultiSocketSink:time-min and #GstMultiSocketSink:bytes-min properties respectively.
75  * These properties are useful if the application adds clients with the
76  * #GstMultiSocketSink::add-full signal to make sure that a burst connect can
77  * actually be honored.
78  *
79  * When streaming data, clients are allowed to read at a different rate than
80  * the rate at which multisocketsink receives data. If the client is reading too
81  * fast, no data will be send to the client until multisocketsink receives more
82  * data. If the client, however, reads too slowly, data for that client will be
83  * queued up in multisocketsink. Two properties control the amount of data
84  * (buffers) that is queued in multisocketsink: #GstMultiSocketSink:buffers-max and
85  * #GstMultiSocketSink:buffers-soft-max. A client that falls behind by
86  * #GstMultiSocketSink:buffers-max is removed from multisocketsink forcibly.
87  *
88  * A client with a lag of at least #GstMultiSocketSink:buffers-soft-max enters the recovery
89  * procedure which is controlled with the #GstMultiSocketSink:recover-policy property.
90  * A recover policy of NONE will do nothing, RESYNC_LATEST will send the most recently
91  * received buffer as the next buffer for the client, RESYNC_SOFT_LIMIT
92  * positions the client to the soft limit in the buffer queue and
93  * RESYNC_KEYFRAME positions the client at the most recent keyframe in the
94  * buffer queue.
95  *
96  * multisocketsink will by default synchronize on the clock before serving the
97  * buffers to the clients. This behaviour can be disabled by setting the sync
98  * property to FALSE. Multisocketsink will by default not do QoS and will never
99  * drop late buffers.
100  */
101 
102 #ifdef HAVE_CONFIG_H
103 #include "config.h"
104 #endif
105 
106 #include <gst/gst-i18n-plugin.h>
107 #include <gst/net/gstnetcontrolmessagemeta.h>
108 
109 #include <string.h>
110 
111 #include "gstmultisocketsink.h"
112 
113 #ifndef G_OS_WIN32
114 #include <netinet/in.h>
115 #endif
116 
117 #define NOT_IMPLEMENTED 0
118 
119 GST_DEBUG_CATEGORY_STATIC (multisocketsink_debug);
120 #define GST_CAT_DEFAULT (multisocketsink_debug)
121 
122 /* MultiSocketSink signals and args */
123 enum
124 {
125   /* methods */
126   SIGNAL_ADD,
127   SIGNAL_ADD_BURST,
128   SIGNAL_REMOVE,
129   SIGNAL_REMOVE_FLUSH,
130   SIGNAL_GET_STATS,
131 
132   /* signals */
133   SIGNAL_CLIENT_ADDED,
134   SIGNAL_CLIENT_REMOVED,
135   SIGNAL_CLIENT_SOCKET_REMOVED,
136 
137   LAST_SIGNAL
138 };
139 
140 #define DEFAULT_SEND_DISPATCHED FALSE
141 #define DEFAULT_SEND_MESSAGES   FALSE
142 
143 enum
144 {
145   PROP_0,
146   PROP_SEND_DISPATCHED,
147   PROP_SEND_MESSAGES,
148   PROP_LAST
149 };
150 
151 static void gst_multi_socket_sink_finalize (GObject * object);
152 
153 static void gst_multi_socket_sink_add (GstMultiSocketSink * sink,
154     GSocket * socket);
155 static void gst_multi_socket_sink_add_full (GstMultiSocketSink * sink,
156     GSocket * socket, GstSyncMethod sync, GstFormat min_format,
157     guint64 min_value, GstFormat max_format, guint64 max_value);
158 static void gst_multi_socket_sink_remove (GstMultiSocketSink * sink,
159     GSocket * socket);
160 static void gst_multi_socket_sink_remove_flush (GstMultiSocketSink * sink,
161     GSocket * socket);
162 static GstStructure *gst_multi_socket_sink_get_stats (GstMultiSocketSink * sink,
163     GSocket * socket);
164 
165 static void gst_multi_socket_sink_emit_client_added (GstMultiHandleSink * mhs,
166     GstMultiSinkHandle handle);
167 static void gst_multi_socket_sink_emit_client_removed (GstMultiHandleSink * mhs,
168     GstMultiSinkHandle handle, GstClientStatus status);
169 
170 static void gst_multi_socket_sink_stop_pre (GstMultiHandleSink * mhsink);
171 static void gst_multi_socket_sink_stop_post (GstMultiHandleSink * mhsink);
172 static gboolean gst_multi_socket_sink_start_pre (GstMultiHandleSink * mhsink);
173 static gpointer gst_multi_socket_sink_thread (GstMultiHandleSink * mhsink);
174 static GstMultiHandleClient
175     * gst_multi_socket_sink_new_client (GstMultiHandleSink * mhsink,
176     GstMultiSinkHandle handle, GstSyncMethod sync_method);
177 static int gst_multi_socket_sink_client_get_fd (GstMultiHandleClient * client);
178 static void gst_multi_socket_sink_client_free (GstMultiHandleSink * mhsink,
179     GstMultiHandleClient * client);
180 static void gst_multi_socket_sink_handle_debug (GstMultiSinkHandle handle,
181     gchar debug[30]);
182 
183 static gpointer gst_multi_socket_sink_handle_hash_key (GstMultiSinkHandle
184     handle);
185 static void gst_multi_socket_sink_hash_adding (GstMultiHandleSink * mhsink,
186     GstMultiHandleClient * mhclient);
187 static void gst_multi_socket_sink_hash_removing (GstMultiHandleSink * mhsink,
188     GstMultiHandleClient * mhclient);
189 static void gst_multi_socket_sink_stop_sending (GstMultiSocketSink * sink,
190     GstSocketClient * client);
191 
192 static gboolean gst_multi_socket_sink_socket_condition (GstMultiSinkHandle
193     handle, GIOCondition condition, GstMultiSocketSink * sink);
194 
195 static gboolean gst_multi_socket_sink_unlock (GstBaseSink * bsink);
196 static gboolean gst_multi_socket_sink_unlock_stop (GstBaseSink * bsink);
197 
198 static gboolean gst_multi_socket_sink_propose_allocation (GstBaseSink * bsink,
199     GstQuery * query);
200 
201 static void gst_multi_socket_sink_set_property (GObject * object, guint prop_id,
202     const GValue * value, GParamSpec * pspec);
203 static void gst_multi_socket_sink_get_property (GObject * object, guint prop_id,
204     GValue * value, GParamSpec * pspec);
205 
206 #define gst_multi_socket_sink_parent_class parent_class
207 G_DEFINE_TYPE (GstMultiSocketSink, gst_multi_socket_sink,
208     GST_TYPE_MULTI_HANDLE_SINK);
209 
210 static guint gst_multi_socket_sink_signals[LAST_SIGNAL] = { 0 };
211 
212 static void
gst_multi_socket_sink_class_init(GstMultiSocketSinkClass * klass)213 gst_multi_socket_sink_class_init (GstMultiSocketSinkClass * klass)
214 {
215   GObjectClass *gobject_class;
216   GstElementClass *gstelement_class;
217   GstBaseSinkClass *gstbasesink_class;
218   GstMultiHandleSinkClass *gstmultihandlesink_class;
219 
220   gobject_class = (GObjectClass *) klass;
221   gstelement_class = (GstElementClass *) klass;
222   gstbasesink_class = (GstBaseSinkClass *) klass;
223   gstmultihandlesink_class = (GstMultiHandleSinkClass *) klass;
224 
225   gobject_class->set_property = gst_multi_socket_sink_set_property;
226   gobject_class->get_property = gst_multi_socket_sink_get_property;
227   gobject_class->finalize = gst_multi_socket_sink_finalize;
228 
229   /**
230    * GstMultiSocketSink:send-dispatched:
231    *
232    * Sends a GstNetworkMessageDispatched event upstream whenever a buffer
233    * is sent to a client.
234    * The event is a CUSTOM event name GstNetworkMessageDispatched and
235    * contains:
236    *
237    *   "object"  G_TYPE_OBJECT     : the object identifying the client
238    *   "buffer"  GST_TYPE_BUFFER   : the buffer sent to the client
239    *
240    * Since: 1.8.0
241    */
242   g_object_class_install_property (gobject_class, PROP_SEND_DISPATCHED,
243       g_param_spec_boolean ("send-dispatched", "Send Dispatched",
244           "If GstNetworkMessageDispatched events should be pushed",
245           DEFAULT_SEND_DISPATCHED, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
246   /**
247    * GstMultiSocketSink:send-messages:
248    *
249    * Sends a GstNetworkMessage event upstream whenever a buffer
250    * is received from a client.
251    * The event is a CUSTOM event name GstNetworkMessage and contains:
252    *
253    *   "object"  G_TYPE_OBJECT     : the object identifying the client
254    *   "buffer"  GST_TYPE_BUFFER   : the buffer with data received from the
255    *                                 client
256    *
257    * Since: 1.8.0
258    */
259   g_object_class_install_property (gobject_class, PROP_SEND_MESSAGES,
260       g_param_spec_boolean ("send-messages", "Send Messages",
261           "If GstNetworkMessage events should be pushed", DEFAULT_SEND_MESSAGES,
262           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
263 
264   /**
265    * GstMultiSocketSink::add:
266    * @gstmultisocketsink: the multisocketsink element to emit this signal on
267    * @socket:             the socket to add to multisocketsink
268    *
269    * Hand the given open socket to multisocketsink to write to.
270    */
271   gst_multi_socket_sink_signals[SIGNAL_ADD] =
272       g_signal_new ("add", G_TYPE_FROM_CLASS (klass),
273       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
274       G_STRUCT_OFFSET (GstMultiSocketSinkClass, add), NULL, NULL,
275       g_cclosure_marshal_generic, G_TYPE_NONE, 1, G_TYPE_SOCKET);
276   /**
277    * GstMultiSocketSink::add-full:
278    * @gstmultisocketsink: the multisocketsink element to emit this signal on
279    * @socket:         the socket to add to multisocketsink
280    * @sync:           the sync method to use
281    * @format_min:     the format of @value_min
282    * @value_min:      the minimum amount of data to burst expressed in
283    *                  @format_min units.
284    * @format_max:     the format of @value_max
285    * @value_max:      the maximum amount of data to burst expressed in
286    *                  @format_max units.
287    *
288    * Hand the given open socket to multisocketsink to write to and
289    * specify the burst parameters for the new connection.
290    */
291   gst_multi_socket_sink_signals[SIGNAL_ADD_BURST] =
292       g_signal_new ("add-full", G_TYPE_FROM_CLASS (klass),
293       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
294       G_STRUCT_OFFSET (GstMultiSocketSinkClass, add_full), NULL, NULL,
295       g_cclosure_marshal_generic, G_TYPE_NONE, 6,
296       G_TYPE_SOCKET, GST_TYPE_SYNC_METHOD, GST_TYPE_FORMAT, G_TYPE_UINT64,
297       GST_TYPE_FORMAT, G_TYPE_UINT64);
298   /**
299    * GstMultiSocketSink::remove:
300    * @gstmultisocketsink: the multisocketsink element to emit this signal on
301    * @socket:             the socket to remove from multisocketsink
302    *
303    * Remove the given open socket from multisocketsink.
304    */
305   gst_multi_socket_sink_signals[SIGNAL_REMOVE] =
306       g_signal_new ("remove", G_TYPE_FROM_CLASS (klass),
307       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
308       G_STRUCT_OFFSET (GstMultiSocketSinkClass, remove), NULL, NULL,
309       g_cclosure_marshal_generic, G_TYPE_NONE, 1, G_TYPE_SOCKET);
310   /**
311    * GstMultiSocketSink::remove-flush:
312    * @gstmultisocketsink: the multisocketsink element to emit this signal on
313    * @socket:             the socket to remove from multisocketsink
314    *
315    * Remove the given open socket from multisocketsink after flushing all
316    * the pending data to the socket.
317    */
318   gst_multi_socket_sink_signals[SIGNAL_REMOVE_FLUSH] =
319       g_signal_new ("remove-flush", G_TYPE_FROM_CLASS (klass),
320       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
321       G_STRUCT_OFFSET (GstMultiSocketSinkClass, remove_flush), NULL, NULL,
322       g_cclosure_marshal_generic, G_TYPE_NONE, 1, G_TYPE_SOCKET);
323 
324   /**
325    * GstMultiSocketSink::get-stats:
326    * @gstmultisocketsink: the multisocketsink element to emit this signal on
327    * @socket:             the socket to get stats of from multisocketsink
328    *
329    * Get statistics about @socket. This function returns a GstStructure.
330    *
331    * Returns: a GstStructure with the statistics. The structure contains
332    *     values that represent: total number of bytes sent, time
333    *     when the client was added, time when the client was
334    *     disconnected/removed, time the client is/was active, last activity
335    *     time (in epoch seconds), number of buffers dropped.
336    *     All times are expressed in nanoseconds (GstClockTime).
337    */
338   gst_multi_socket_sink_signals[SIGNAL_GET_STATS] =
339       g_signal_new ("get-stats", G_TYPE_FROM_CLASS (klass),
340       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
341       G_STRUCT_OFFSET (GstMultiSocketSinkClass, get_stats), NULL, NULL,
342       g_cclosure_marshal_generic, GST_TYPE_STRUCTURE, 1, G_TYPE_SOCKET);
343 
344   /**
345    * GstMultiSocketSink::client-added:
346    * @gstmultisocketsink: the multisocketsink element that emitted this signal
347    * @socket:             the socket that was added to multisocketsink
348    *
349    * The given socket was added to multisocketsink. This signal will
350    * be emitted from the streaming thread so application should be prepared
351    * for that.
352    */
353   gst_multi_socket_sink_signals[SIGNAL_CLIENT_ADDED] =
354       g_signal_new ("client-added", G_TYPE_FROM_CLASS (klass),
355       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
356       G_TYPE_NONE, 1, G_TYPE_OBJECT);
357   /**
358    * GstMultiSocketSink::client-removed:
359    * @gstmultisocketsink: the multisocketsink element that emitted this signal
360    * @socket:             the socket that is to be removed from multisocketsink
361    * @status:             the reason why the client was removed
362    *
363    * The given socket is about to be removed from multisocketsink. This
364    * signal will be emitted from the streaming thread so applications should
365    * be prepared for that.
366    *
367    * @gstmultisocketsink still holds a handle to @socket so it is possible to call
368    * the get-stats signal from this callback. For the same reason it is
369    * not safe to close() and reuse @socket in this callback.
370    */
371   gst_multi_socket_sink_signals[SIGNAL_CLIENT_REMOVED] =
372       g_signal_new ("client-removed", G_TYPE_FROM_CLASS (klass),
373       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
374       G_TYPE_NONE, 2, G_TYPE_SOCKET, GST_TYPE_CLIENT_STATUS);
375   /**
376    * GstMultiSocketSink::client-socket-removed:
377    * @gstmultisocketsink: the multisocketsink element that emitted this signal
378    * @socket:             the socket that was removed from multisocketsink
379    *
380    * The given socket was removed from multisocketsink. This signal will
381    * be emitted from the streaming thread so applications should be prepared
382    * for that.
383    *
384    * In this callback, @gstmultisocketsink has removed all the information
385    * associated with @socket and it is therefore not possible to call get-stats
386    * with @socket. It is however safe to close() and reuse @fd in the callback.
387    */
388   gst_multi_socket_sink_signals[SIGNAL_CLIENT_SOCKET_REMOVED] =
389       g_signal_new ("client-socket-removed", G_TYPE_FROM_CLASS (klass),
390       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
391       G_TYPE_NONE, 1, G_TYPE_SOCKET);
392 
393   gst_element_class_set_static_metadata (gstelement_class,
394       "Multi socket sink", "Sink/Network",
395       "Send data to multiple sockets",
396       "Thomas Vander Stichele <thomas at apestaart dot org>, "
397       "Wim Taymans <wim@fluendo.com>, "
398       "Sebastian Dröge <sebastian.droege@collabora.co.uk>");
399 
400   gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_unlock);
401   gstbasesink_class->unlock_stop =
402       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_unlock_stop);
403   gstbasesink_class->propose_allocation =
404       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_propose_allocation);
405 
406   klass->add = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_add);
407   klass->add_full = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_add_full);
408   klass->remove = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_remove);
409   klass->remove_flush = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_remove_flush);
410   klass->get_stats = GST_DEBUG_FUNCPTR (gst_multi_socket_sink_get_stats);
411 
412   gstmultihandlesink_class->emit_client_added =
413       gst_multi_socket_sink_emit_client_added;
414   gstmultihandlesink_class->emit_client_removed =
415       gst_multi_socket_sink_emit_client_removed;
416 
417   gstmultihandlesink_class->stop_pre =
418       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_stop_pre);
419   gstmultihandlesink_class->stop_post =
420       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_stop_post);
421   gstmultihandlesink_class->start_pre =
422       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_start_pre);
423   gstmultihandlesink_class->thread =
424       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_thread);
425   gstmultihandlesink_class->new_client =
426       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_new_client);
427   gstmultihandlesink_class->client_get_fd =
428       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_client_get_fd);
429   gstmultihandlesink_class->client_free =
430       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_client_free);
431   gstmultihandlesink_class->handle_debug =
432       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_handle_debug);
433   gstmultihandlesink_class->handle_hash_key =
434       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_handle_hash_key);
435   gstmultihandlesink_class->hash_adding =
436       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_hash_adding);
437   gstmultihandlesink_class->hash_removing =
438       GST_DEBUG_FUNCPTR (gst_multi_socket_sink_hash_removing);
439 
440   GST_DEBUG_CATEGORY_INIT (multisocketsink_debug, "multisocketsink", 0,
441       "Multi socket sink");
442 }
443 
444 static void
gst_multi_socket_sink_init(GstMultiSocketSink * this)445 gst_multi_socket_sink_init (GstMultiSocketSink * this)
446 {
447   GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (this);
448 
449   mhsink->handle_hash = g_hash_table_new (g_direct_hash, g_int_equal);
450 
451   this->cancellable = g_cancellable_new ();
452   this->send_dispatched = DEFAULT_SEND_DISPATCHED;
453   this->send_messages = DEFAULT_SEND_MESSAGES;
454 }
455 
456 static void
gst_multi_socket_sink_finalize(GObject * object)457 gst_multi_socket_sink_finalize (GObject * object)
458 {
459   GstMultiSocketSink *this = GST_MULTI_SOCKET_SINK (object);
460 
461   if (this->cancellable) {
462     g_object_unref (this->cancellable);
463     this->cancellable = NULL;
464   }
465 
466   G_OBJECT_CLASS (parent_class)->finalize (object);
467 }
468 
469 /* methods to emit signals */
470 
471 static void
gst_multi_socket_sink_emit_client_added(GstMultiHandleSink * mhsink,GstMultiSinkHandle handle)472 gst_multi_socket_sink_emit_client_added (GstMultiHandleSink * mhsink,
473     GstMultiSinkHandle handle)
474 {
475   g_signal_emit (mhsink, gst_multi_socket_sink_signals[SIGNAL_CLIENT_ADDED], 0,
476       handle.socket);
477 }
478 
479 static void
gst_multi_socket_sink_emit_client_removed(GstMultiHandleSink * mhsink,GstMultiSinkHandle handle,GstClientStatus status)480 gst_multi_socket_sink_emit_client_removed (GstMultiHandleSink * mhsink,
481     GstMultiSinkHandle handle, GstClientStatus status)
482 {
483   g_signal_emit (mhsink, gst_multi_socket_sink_signals[SIGNAL_CLIENT_REMOVED],
484       0, handle.socket, status);
485 }
486 
487 /* action signals */
488 
489 static void
gst_multi_socket_sink_add(GstMultiSocketSink * sink,GSocket * socket)490 gst_multi_socket_sink_add (GstMultiSocketSink * sink, GSocket * socket)
491 {
492   GstMultiSinkHandle handle;
493 
494   handle.socket = socket;
495   gst_multi_handle_sink_add (GST_MULTI_HANDLE_SINK_CAST (sink), handle);
496 }
497 
498 static void
gst_multi_socket_sink_add_full(GstMultiSocketSink * sink,GSocket * socket,GstSyncMethod sync,GstFormat min_format,guint64 min_value,GstFormat max_format,guint64 max_value)499 gst_multi_socket_sink_add_full (GstMultiSocketSink * sink, GSocket * socket,
500     GstSyncMethod sync, GstFormat min_format, guint64 min_value,
501     GstFormat max_format, guint64 max_value)
502 {
503   GstMultiSinkHandle handle;
504 
505   handle.socket = socket;
506   gst_multi_handle_sink_add_full (GST_MULTI_HANDLE_SINK_CAST (sink), handle,
507       sync, min_format, min_value, max_format, max_value);
508 }
509 
510 static void
gst_multi_socket_sink_remove(GstMultiSocketSink * sink,GSocket * socket)511 gst_multi_socket_sink_remove (GstMultiSocketSink * sink, GSocket * socket)
512 {
513   GstMultiSinkHandle handle;
514 
515   handle.socket = socket;
516   gst_multi_handle_sink_remove (GST_MULTI_HANDLE_SINK_CAST (sink), handle);
517 }
518 
519 static void
gst_multi_socket_sink_remove_flush(GstMultiSocketSink * sink,GSocket * socket)520 gst_multi_socket_sink_remove_flush (GstMultiSocketSink * sink, GSocket * socket)
521 {
522   GstMultiSinkHandle handle;
523 
524   handle.socket = socket;
525   gst_multi_handle_sink_remove_flush (GST_MULTI_HANDLE_SINK_CAST (sink),
526       handle);
527 }
528 
529 static GstStructure *
gst_multi_socket_sink_get_stats(GstMultiSocketSink * sink,GSocket * socket)530 gst_multi_socket_sink_get_stats (GstMultiSocketSink * sink, GSocket * socket)
531 {
532   GstMultiSinkHandle handle;
533 
534   handle.socket = socket;
535   return gst_multi_handle_sink_get_stats (GST_MULTI_HANDLE_SINK_CAST (sink),
536       handle);
537 }
538 
539 static GstMultiHandleClient *
gst_multi_socket_sink_new_client(GstMultiHandleSink * mhsink,GstMultiSinkHandle handle,GstSyncMethod sync_method)540 gst_multi_socket_sink_new_client (GstMultiHandleSink * mhsink,
541     GstMultiSinkHandle handle, GstSyncMethod sync_method)
542 {
543   GstSocketClient *client;
544   GstMultiHandleClient *mhclient;
545   GstMultiHandleSinkClass *mhsinkclass =
546       GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
547 
548   /* create client datastructure */
549   g_assert (G_IS_SOCKET (handle.socket));
550   client = g_new0 (GstSocketClient, 1);
551   mhclient = (GstMultiHandleClient *) client;
552 
553   mhclient->handle.socket = G_SOCKET (g_object_ref (handle.socket));
554 
555   gst_multi_handle_sink_client_init (mhclient, sync_method);
556   mhsinkclass->handle_debug (handle, mhclient->debug);
557 
558   /* set the socket to non blocking */
559   g_socket_set_blocking (handle.socket, FALSE);
560 
561   /* we always read from a client */
562   mhsinkclass->hash_adding (mhsink, mhclient);
563 
564   gst_multi_handle_sink_setup_dscp_client (mhsink, mhclient);
565 
566   return mhclient;
567 }
568 
569 static int
gst_multi_socket_sink_client_get_fd(GstMultiHandleClient * client)570 gst_multi_socket_sink_client_get_fd (GstMultiHandleClient * client)
571 {
572   return g_socket_get_fd (client->handle.socket);
573 }
574 
575 static void
gst_multi_socket_sink_client_free(GstMultiHandleSink * mhsink,GstMultiHandleClient * client)576 gst_multi_socket_sink_client_free (GstMultiHandleSink * mhsink,
577     GstMultiHandleClient * client)
578 {
579   g_assert (G_IS_SOCKET (client->handle.socket));
580 
581   g_signal_emit (mhsink,
582       gst_multi_socket_sink_signals[SIGNAL_CLIENT_SOCKET_REMOVED], 0,
583       client->handle.socket);
584 
585   g_object_unref (client->handle.socket);
586 }
587 
588 static void
gst_multi_socket_sink_handle_debug(GstMultiSinkHandle handle,gchar debug[30])589 gst_multi_socket_sink_handle_debug (GstMultiSinkHandle handle, gchar debug[30])
590 {
591   g_snprintf (debug, 30, "[socket %p]", handle.socket);
592 }
593 
594 static gpointer
gst_multi_socket_sink_handle_hash_key(GstMultiSinkHandle handle)595 gst_multi_socket_sink_handle_hash_key (GstMultiSinkHandle handle)
596 {
597   return handle.socket;
598 }
599 
600 /* handle a read on a client socket,
601  * which either indicates a close or should be ignored
602  * returns FALSE if some error occured or the client closed. */
603 static gboolean
gst_multi_socket_sink_handle_client_read(GstMultiSocketSink * sink,GstSocketClient * client)604 gst_multi_socket_sink_handle_client_read (GstMultiSocketSink * sink,
605     GstSocketClient * client)
606 {
607   gboolean ret, do_event;
608   gchar dummy[256], *mem, *omem;
609   gssize nread;
610   GError *err = NULL;
611   gboolean first = TRUE;
612   GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
613   gssize navail, maxmem;
614 
615   GST_DEBUG_OBJECT (sink, "%s select reports client read", mhclient->debug);
616 
617   ret = TRUE;
618 
619   navail = g_socket_get_available_bytes (mhclient->handle.socket);
620   if (navail < 0)
621     return TRUE;
622 
623   /* only collect the data in a buffer when we need to send it with an event */
624   do_event = sink->send_messages && navail > 0;
625   if (do_event) {
626     omem = mem = g_malloc (navail);
627     maxmem = navail;
628   } else {
629     mem = dummy;
630     maxmem = sizeof (dummy);
631   }
632 
633   /* just Read 'n' Drop, could also just drop the client as it's not supposed
634    * to write to us except for closing the socket, I guess it's because we
635    * like to listen to our customers. */
636   do {
637     GST_DEBUG_OBJECT (sink, "%s client wants us to read", mhclient->debug);
638 
639     nread =
640         g_socket_receive (mhclient->handle.socket, mem, MIN (navail,
641             maxmem), sink->cancellable, &err);
642 
643     if (first && nread == 0) {
644       /* client sent close, so remove it */
645       GST_DEBUG_OBJECT (sink, "%s client asked for close, removing",
646           mhclient->debug);
647       mhclient->status = GST_CLIENT_STATUS_CLOSED;
648       ret = FALSE;
649       break;
650     } else if (nread < 0) {
651       if (err->code == G_IO_ERROR_WOULD_BLOCK)
652         break;
653 
654       GST_WARNING_OBJECT (sink, "%s could not read: %s",
655           mhclient->debug, err->message);
656       mhclient->status = GST_CLIENT_STATUS_ERROR;
657       ret = FALSE;
658       break;
659     }
660     navail -= nread;
661     if (do_event)
662       mem += nread;
663     first = FALSE;
664   } while (navail > 0);
665   g_clear_error (&err);
666 
667   if (do_event) {
668     if (ret) {
669       GstBuffer *buf;
670       GstEvent *ev;
671 
672       buf = gst_buffer_new_wrapped (omem, maxmem);
673       ev = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
674           gst_structure_new ("GstNetworkMessage",
675               "object", G_TYPE_OBJECT, mhclient->handle.socket,
676               "buffer", GST_TYPE_BUFFER, buf, NULL));
677       gst_buffer_unref (buf);
678 
679       gst_pad_push_event (GST_BASE_SINK_PAD (sink), ev);
680     } else
681       g_free (omem);
682   }
683   return ret;
684 }
685 
686 /**
687  * map_memory_output_vector_n:
688  * @buf: The #GstBuffer that should be mapped
689  * @offset: Offset into the buffer that should be mapped
690  * @vectors: (out,array length=num_vectors): an array of #GOutputVector structs to write into
691  * @mapinfo: (out,array length=num_vectors): an array of #GstMapInfo structs to write into
692  * @num_vectors: the number of elements in @vectors to prevent buffer overruns
693  *
694  * Maps a buffer into memory, populating a #GOutputVector to use scatter-gather
695  * I/O to send the data over a socket.  The whole buffer won't be mapped into
696  * memory if it consists of more than @num_vectors #GstMemory s.
697  *
698  * Use #unmap_n_memorys after you are
699  * finished with the mappings.
700  *
701  * Returns: The number of GstMemorys mapped
702  */
703 static int
map_n_memory_output_vector(GstBuffer * buf,size_t offset,GOutputVector * vectors,GstMapInfo * mapinfo,int num_vectors)704 map_n_memory_output_vector (GstBuffer * buf, size_t offset,
705     GOutputVector * vectors, GstMapInfo * mapinfo, int num_vectors)
706 {
707   guint mem_idx, mem_len;
708   gsize mem_skip;
709   size_t maxsize;
710   int i;
711 
712   g_return_val_if_fail (num_vectors > 0, 0);
713   memset (vectors, 0, sizeof (GOutputVector) * num_vectors);
714 
715   maxsize = gst_buffer_get_size (buf) - offset;
716   if (!gst_buffer_find_memory (buf, offset, maxsize, &mem_idx, &mem_len,
717           &mem_skip))
718     g_error ("Unable to map memory at offset %" G_GSIZE_FORMAT ", buffer "
719         "length is %" G_GSIZE_FORMAT, offset, gst_buffer_get_size (buf));
720 
721   for (i = 0; i < mem_len && i < num_vectors; i++) {
722     GstMapInfo map = { 0 };
723     GstMemory *mem = gst_buffer_peek_memory (buf, mem_idx + i);
724     if (!gst_memory_map (mem, &map, GST_MAP_READ))
725       g_error ("Unable to map memory %p.  This should never happen.", mem);
726 
727     if (i == 0) {
728       vectors[i].buffer = map.data + mem_skip;
729       vectors[i].size = map.size - mem_skip;
730     } else {
731       vectors[i].buffer = map.data;
732       vectors[i].size = map.size;
733     }
734     mapinfo[i] = map;
735   }
736   return i;
737 }
738 
739 /**
740  * map_n_memory_output_vector:
741  * @buf: The #GstBuffer that should be mapped
742  * @offset: Offset into the buffer that should be mapped
743  * @vectors: (out,array length=num_vectors): an array of #GOutputVector structs to write into
744  * @num_vectors: the number of elements in @vectors to prevent buffer overruns
745  *
746  * Returns: The number of GstMemorys mapped
747  */
748 static void
unmap_n_memorys(GstMapInfo * mapinfo,int num_mappings)749 unmap_n_memorys (GstMapInfo * mapinfo, int num_mappings)
750 {
751   int i;
752   g_return_if_fail (num_mappings > 0);
753 
754   for (i = 0; i < num_mappings; i++)
755     gst_memory_unmap (mapinfo[i].memory, &mapinfo[i]);
756 }
757 
758 static gsize
gst_buffer_get_cmsg_list(GstBuffer * buf,GSocketControlMessage ** msgs,gsize msg_space)759 gst_buffer_get_cmsg_list (GstBuffer * buf, GSocketControlMessage ** msgs,
760     gsize msg_space)
761 {
762   gpointer iter_state = NULL;
763   GstMeta *meta;
764   gsize msg_count = 0;
765 
766   while ((meta = gst_buffer_iterate_meta (buf, &iter_state)) != NULL
767       && msg_count < msg_space) {
768     if (meta->info->api == GST_NET_CONTROL_MESSAGE_META_API_TYPE)
769       msgs[msg_count++] = ((GstNetControlMessageMeta *) meta)->message;
770   }
771 
772   return msg_count;
773 }
774 
775 #define CMSG_MAX 255
776 
777 static gssize
gst_multi_socket_sink_write(GstMultiSocketSink * sink,GSocket * sock,GstBuffer * buffer,gsize bufoffset,GCancellable * cancellable,GError ** err)778 gst_multi_socket_sink_write (GstMultiSocketSink * sink,
779     GSocket * sock, GstBuffer * buffer, gsize bufoffset,
780     GCancellable * cancellable, GError ** err)
781 {
782   GstMapInfo maps[8];
783   GOutputVector vec[8];
784   guint mems_mapped;
785   gssize wrote;
786   GSocketControlMessage *cmsgs[CMSG_MAX];
787   gsize msg_count;
788 
789   mems_mapped = map_n_memory_output_vector (buffer, bufoffset, vec, maps, 8);
790 
791   msg_count = gst_buffer_get_cmsg_list (buffer, cmsgs, CMSG_MAX);
792 
793   wrote =
794       g_socket_send_message (sock, NULL, vec, mems_mapped, cmsgs, msg_count, 0,
795       cancellable, err);
796   unmap_n_memorys (maps, mems_mapped);
797   return wrote;
798 }
799 
800 /* Handle a write on a client,
801  * which indicates a read request from a client.
802  *
803  * For each client we maintain a queue of GstBuffers that contain the raw bytes
804  * we need to send to the client.
805  *
806  * We first check to see if we need to send streamheaders. If so, we queue them.
807  *
808  * Then we run into the main loop that tries to send as many buffers as
809  * possible. It will first exhaust the mhclient->sending queue and if the queue
810  * is empty, it will pick a buffer from the global queue.
811  *
812  * Sending the buffers from the mhclient->sending queue is basically writing
813  * the bytes to the socket and maintaining a count of the bytes that were
814  * sent. When the buffer is completely sent, it is removed from the
815  * mhclient->sending queue and we try to pick a new buffer for sending.
816  *
817  * When the sending returns a partial buffer we stop sending more data as
818  * the next send operation could block.
819  *
820  * This functions returns FALSE if some error occured.
821  */
822 static gboolean
gst_multi_socket_sink_handle_client_write(GstMultiSocketSink * sink,GstSocketClient * client)823 gst_multi_socket_sink_handle_client_write (GstMultiSocketSink * sink,
824     GstSocketClient * client)
825 {
826   gboolean more;
827   gboolean flushing;
828   GstClockTime now;
829   GTimeVal nowtv;
830   GError *err = NULL;
831   GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
832   GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
833   GstMultiHandleSinkClass *mhsinkclass =
834       GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
835 
836 
837   g_get_current_time (&nowtv);
838   now = GST_TIMEVAL_TO_TIME (nowtv);
839 
840   flushing = mhclient->status == GST_CLIENT_STATUS_FLUSHING;
841 
842   more = TRUE;
843   do {
844     if (!mhclient->sending) {
845       /* client is not working on a buffer */
846       if (mhclient->bufpos == -1) {
847         /* client is too fast, remove from write queue until new buffer is
848          * available */
849         gst_multi_socket_sink_stop_sending (sink, client);
850 
851         /* if we flushed out all of the client buffers, we can stop */
852         if (mhclient->flushcount == 0)
853           goto flushed;
854 
855         return TRUE;
856       } else {
857         /* client can pick a buffer from the global queue */
858         GstBuffer *buf;
859         GstClockTime timestamp;
860 
861         /* for new connections, we need to find a good spot in the
862          * bufqueue to start streaming from */
863         if (mhclient->new_connection && !flushing) {
864           gint position =
865               gst_multi_handle_sink_new_client_position (mhsink, mhclient);
866 
867           if (position >= 0) {
868             /* we got a valid spot in the queue */
869             mhclient->new_connection = FALSE;
870             mhclient->bufpos = position;
871           } else {
872             /* cannot send data to this client yet */
873             gst_multi_socket_sink_stop_sending (sink, client);
874             return TRUE;
875           }
876         }
877 
878         /* we flushed all remaining buffers, no need to get a new one */
879         if (mhclient->flushcount == 0)
880           goto flushed;
881 
882         /* grab buffer */
883         buf = g_array_index (mhsink->bufqueue, GstBuffer *, mhclient->bufpos);
884         mhclient->bufpos--;
885 
886         /* update stats */
887         timestamp = GST_BUFFER_TIMESTAMP (buf);
888         if (mhclient->first_buffer_ts == GST_CLOCK_TIME_NONE)
889           mhclient->first_buffer_ts = timestamp;
890         if (timestamp != -1)
891           mhclient->last_buffer_ts = timestamp;
892 
893         /* decrease flushcount */
894         if (mhclient->flushcount != -1)
895           mhclient->flushcount--;
896 
897         GST_LOG_OBJECT (sink, "%s client %p at position %d",
898             mhclient->debug, client, mhclient->bufpos);
899 
900         /* queueing a buffer will ref it */
901         mhsinkclass->client_queue_buffer (mhsink, mhclient, buf);
902 
903         /* need to start from the first byte for this new buffer */
904         mhclient->bufoffset = 0;
905       }
906     }
907 
908     /* see if we need to send something */
909     if (mhclient->sending) {
910       gssize wrote;
911       GstBuffer *head;
912 
913       /* pick first buffer from list */
914       head = GST_BUFFER (mhclient->sending->data);
915 
916       wrote = gst_multi_socket_sink_write (sink, mhclient->handle.socket, head,
917           mhclient->bufoffset, sink->cancellable, &err);
918 
919       if (wrote < 0) {
920         /* hmm error.. */
921         if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CLOSED)) {
922           goto connection_reset;
923         } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
924           /* write would block, try again later */
925           GST_LOG_OBJECT (sink, "write would block %p",
926               mhclient->handle.socket);
927           more = FALSE;
928           g_clear_error (&err);
929         } else {
930           goto write_error;
931         }
932       } else {
933         if (wrote < (gst_buffer_get_size (head) - mhclient->bufoffset)) {
934           /* partial write, try again now */
935           GST_LOG_OBJECT (sink,
936               "partial write on %p of %" G_GSSIZE_FORMAT " bytes",
937               mhclient->handle.socket, wrote);
938           mhclient->bufoffset += wrote;
939         } else {
940           if (sink->send_dispatched) {
941             gst_pad_push_event (GST_BASE_SINK_PAD (mhsink),
942                 gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
943                     gst_structure_new ("GstNetworkMessageDispatched",
944                         "object", G_TYPE_OBJECT, mhclient->handle.socket,
945                         "buffer", GST_TYPE_BUFFER, head, NULL)));
946           }
947           /* complete buffer was written, we can proceed to the next one */
948           mhclient->sending = g_slist_remove (mhclient->sending, head);
949           gst_buffer_unref (head);
950           /* make sure we start from byte 0 for the next buffer */
951           mhclient->bufoffset = 0;
952         }
953         /* update stats */
954         mhclient->bytes_sent += wrote;
955         mhclient->last_activity_time = now;
956         mhsink->bytes_served += wrote;
957       }
958     }
959   } while (more);
960 
961   return TRUE;
962 
963   /* ERRORS */
964 flushed:
965   {
966     GST_DEBUG_OBJECT (sink, "%s flushed, removing", mhclient->debug);
967     mhclient->status = GST_CLIENT_STATUS_REMOVED;
968     return FALSE;
969   }
970 connection_reset:
971   {
972     GST_DEBUG_OBJECT (sink, "%s connection reset by peer, removing",
973         mhclient->debug);
974     mhclient->status = GST_CLIENT_STATUS_CLOSED;
975     g_clear_error (&err);
976     return FALSE;
977   }
978 write_error:
979   {
980     GST_WARNING_OBJECT (sink,
981         "%s could not write, removing client: %s", mhclient->debug,
982         err->message);
983     g_clear_error (&err);
984     mhclient->status = GST_CLIENT_STATUS_ERROR;
985     return FALSE;
986   }
987 }
988 
989 static void
ensure_condition(GstMultiSocketSink * sink,GstSocketClient * client,GIOCondition condition)990 ensure_condition (GstMultiSocketSink * sink, GstSocketClient * client,
991     GIOCondition condition)
992 {
993   GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
994 
995   if (client->condition == condition)
996     return;
997 
998   if (client->source) {
999     g_source_destroy (client->source);
1000     g_source_unref (client->source);
1001   }
1002   if (condition && sink->main_context) {
1003     client->source = g_socket_create_source (mhclient->handle.socket,
1004         condition, sink->cancellable);
1005     g_source_set_callback (client->source,
1006         (GSourceFunc) gst_multi_socket_sink_socket_condition,
1007         gst_object_ref (sink), (GDestroyNotify) gst_object_unref);
1008     g_source_attach (client->source, sink->main_context);
1009   } else {
1010     client->source = NULL;
1011     condition = 0;
1012   }
1013   client->condition = condition;
1014 }
1015 
1016 static void
gst_multi_socket_sink_hash_adding(GstMultiHandleSink * mhsink,GstMultiHandleClient * mhclient)1017 gst_multi_socket_sink_hash_adding (GstMultiHandleSink * mhsink,
1018     GstMultiHandleClient * mhclient)
1019 {
1020   GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (mhsink);
1021   GstSocketClient *client = (GstSocketClient *) (mhclient);
1022 
1023   ensure_condition (sink, client,
1024       G_IO_IN | G_IO_OUT | G_IO_PRI | G_IO_ERR | G_IO_HUP);
1025 }
1026 
1027 static void
gst_multi_socket_sink_hash_removing(GstMultiHandleSink * mhsink,GstMultiHandleClient * mhclient)1028 gst_multi_socket_sink_hash_removing (GstMultiHandleSink * mhsink,
1029     GstMultiHandleClient * mhclient)
1030 {
1031   GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (mhsink);
1032   GstSocketClient *client = (GstSocketClient *) (mhclient);
1033 
1034   ensure_condition (sink, client, 0);
1035 }
1036 
1037 static void
gst_multi_socket_sink_stop_sending(GstMultiSocketSink * sink,GstSocketClient * client)1038 gst_multi_socket_sink_stop_sending (GstMultiSocketSink * sink,
1039     GstSocketClient * client)
1040 {
1041   ensure_condition (sink, client, G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP);
1042 }
1043 
1044 /* Handle the clients. This is called when a socket becomes ready
1045  * to read or writable. Badly behaving clients are put on a
1046  * garbage list and removed.
1047  */
1048 static gboolean
gst_multi_socket_sink_socket_condition(GstMultiSinkHandle handle,GIOCondition condition,GstMultiSocketSink * sink)1049 gst_multi_socket_sink_socket_condition (GstMultiSinkHandle handle,
1050     GIOCondition condition, GstMultiSocketSink * sink)
1051 {
1052   GList *clink;
1053   GstSocketClient *client;
1054   gboolean ret = TRUE;
1055   GstMultiHandleClient *mhclient;
1056   GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
1057   GstMultiHandleSinkClass *mhsinkclass =
1058       GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
1059 
1060   CLIENTS_LOCK (mhsink);
1061   clink = g_hash_table_lookup (mhsink->handle_hash,
1062       mhsinkclass->handle_hash_key (handle));
1063   if (clink == NULL) {
1064     ret = FALSE;
1065     goto done;
1066   }
1067 
1068   client = clink->data;
1069   mhclient = (GstMultiHandleClient *) client;
1070 
1071   if (mhclient->status != GST_CLIENT_STATUS_FLUSHING
1072       && mhclient->status != GST_CLIENT_STATUS_OK) {
1073     gst_multi_handle_sink_remove_client_link (mhsink, clink);
1074     ret = FALSE;
1075     goto done;
1076   }
1077 
1078   if ((condition & G_IO_ERR)) {
1079     GST_WARNING_OBJECT (sink, "%s has error", mhclient->debug);
1080     mhclient->status = GST_CLIENT_STATUS_ERROR;
1081     gst_multi_handle_sink_remove_client_link (mhsink, clink);
1082     ret = FALSE;
1083     goto done;
1084   } else if ((condition & G_IO_HUP)) {
1085     mhclient->status = GST_CLIENT_STATUS_CLOSED;
1086     gst_multi_handle_sink_remove_client_link (mhsink, clink);
1087     ret = FALSE;
1088     goto done;
1089   }
1090   if ((condition & G_IO_IN) || (condition & G_IO_PRI)) {
1091     /* handle client read */
1092     if (!gst_multi_socket_sink_handle_client_read (sink, client)) {
1093       gst_multi_handle_sink_remove_client_link (mhsink, clink);
1094       ret = FALSE;
1095       goto done;
1096     }
1097   }
1098   if ((condition & G_IO_OUT)) {
1099     /* handle client write */
1100     if (!gst_multi_socket_sink_handle_client_write (sink, client)) {
1101       gst_multi_handle_sink_remove_client_link (mhsink, clink);
1102       ret = FALSE;
1103       goto done;
1104     }
1105   }
1106 
1107 done:
1108   CLIENTS_UNLOCK (mhsink);
1109 
1110   return ret;
1111 }
1112 
1113 static gboolean
gst_multi_socket_sink_timeout(GstMultiSocketSink * sink)1114 gst_multi_socket_sink_timeout (GstMultiSocketSink * sink)
1115 {
1116   GstClockTime now;
1117   GTimeVal nowtv;
1118   GList *clients;
1119   GstMultiHandleSink *mhsink = GST_MULTI_HANDLE_SINK (sink);
1120 
1121   g_get_current_time (&nowtv);
1122   now = GST_TIMEVAL_TO_TIME (nowtv);
1123 
1124   CLIENTS_LOCK (mhsink);
1125   for (clients = mhsink->clients; clients; clients = clients->next) {
1126     GstSocketClient *client;
1127     GstMultiHandleClient *mhclient;
1128 
1129     client = clients->data;
1130     mhclient = (GstMultiHandleClient *) client;
1131     if (mhsink->timeout > 0
1132         && now - mhclient->last_activity_time > mhsink->timeout) {
1133       mhclient->status = GST_CLIENT_STATUS_SLOW;
1134       gst_multi_handle_sink_remove_client_link (mhsink, clients);
1135     }
1136   }
1137   CLIENTS_UNLOCK (mhsink);
1138 
1139   return FALSE;
1140 }
1141 
1142 /* we handle the client communication in another thread so that we do not block
1143  * the gstreamer thread while we select() on the client fds */
1144 static gpointer
gst_multi_socket_sink_thread(GstMultiHandleSink * mhsink)1145 gst_multi_socket_sink_thread (GstMultiHandleSink * mhsink)
1146 {
1147   GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (mhsink);
1148   GSource *timeout = NULL;
1149 
1150   while (mhsink->running) {
1151     if (mhsink->timeout > 0) {
1152       timeout = g_timeout_source_new (mhsink->timeout / GST_MSECOND);
1153 
1154       g_source_set_callback (timeout,
1155           (GSourceFunc) gst_multi_socket_sink_timeout, gst_object_ref (sink),
1156           (GDestroyNotify) gst_object_unref);
1157       g_source_attach (timeout, sink->main_context);
1158     }
1159 
1160     /* Returns after handling all pending events or when
1161      * _wakeup() was called. In any case we have to add
1162      * a new timeout because something happened.
1163      */
1164     g_main_context_iteration (sink->main_context, TRUE);
1165 
1166     if (timeout) {
1167       g_source_destroy (timeout);
1168       g_source_unref (timeout);
1169     }
1170   }
1171 
1172   return NULL;
1173 }
1174 
1175 static void
gst_multi_socket_sink_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)1176 gst_multi_socket_sink_set_property (GObject * object, guint prop_id,
1177     const GValue * value, GParamSpec * pspec)
1178 {
1179   GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (object);
1180 
1181   switch (prop_id) {
1182     case PROP_SEND_DISPATCHED:
1183       sink->send_dispatched = g_value_get_boolean (value);
1184       break;
1185     case PROP_SEND_MESSAGES:
1186       sink->send_messages = g_value_get_boolean (value);
1187       break;
1188     default:
1189       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1190       break;
1191   }
1192 }
1193 
1194 static void
gst_multi_socket_sink_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)1195 gst_multi_socket_sink_get_property (GObject * object, guint prop_id,
1196     GValue * value, GParamSpec * pspec)
1197 {
1198   GstMultiSocketSink *sink = GST_MULTI_SOCKET_SINK (object);
1199 
1200   switch (prop_id) {
1201     case PROP_SEND_DISPATCHED:
1202       g_value_set_boolean (value, sink->send_dispatched);
1203       break;
1204     case PROP_SEND_MESSAGES:
1205       g_value_set_boolean (value, sink->send_messages);
1206       break;
1207     default:
1208       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1209       break;
1210   }
1211 }
1212 
1213 static gboolean
gst_multi_socket_sink_start_pre(GstMultiHandleSink * mhsink)1214 gst_multi_socket_sink_start_pre (GstMultiHandleSink * mhsink)
1215 {
1216   GstMultiSocketSink *mssink = GST_MULTI_SOCKET_SINK (mhsink);
1217   GstMultiHandleSinkClass *mhsinkclass =
1218       GST_MULTI_HANDLE_SINK_GET_CLASS (mhsink);
1219   GList *clients;
1220 
1221   GST_INFO_OBJECT (mssink, "starting");
1222 
1223   mssink->main_context = g_main_context_new ();
1224 
1225   CLIENTS_LOCK (mhsink);
1226   for (clients = mhsink->clients; clients; clients = clients->next) {
1227     GstSocketClient *client = clients->data;
1228     GstMultiHandleClient *mhclient = (GstMultiHandleClient *) client;
1229 
1230     if (client->source)
1231       continue;
1232     mhsinkclass->hash_adding (mhsink, mhclient);
1233   }
1234   CLIENTS_UNLOCK (mhsink);
1235 
1236   return TRUE;
1237 }
1238 
1239 static gboolean
multisocketsink_hash_remove(gpointer key,gpointer value,gpointer data)1240 multisocketsink_hash_remove (gpointer key, gpointer value, gpointer data)
1241 {
1242   return TRUE;
1243 }
1244 
1245 static void
gst_multi_socket_sink_stop_pre(GstMultiHandleSink * mhsink)1246 gst_multi_socket_sink_stop_pre (GstMultiHandleSink * mhsink)
1247 {
1248   GstMultiSocketSink *mssink = GST_MULTI_SOCKET_SINK (mhsink);
1249 
1250   if (mssink->main_context)
1251     g_main_context_wakeup (mssink->main_context);
1252 }
1253 
1254 static void
gst_multi_socket_sink_stop_post(GstMultiHandleSink * mhsink)1255 gst_multi_socket_sink_stop_post (GstMultiHandleSink * mhsink)
1256 {
1257   GstMultiSocketSink *mssink = GST_MULTI_SOCKET_SINK (mhsink);
1258 
1259   if (mssink->main_context) {
1260     g_main_context_unref (mssink->main_context);
1261     mssink->main_context = NULL;
1262   }
1263 
1264   g_hash_table_foreach_remove (mhsink->handle_hash, multisocketsink_hash_remove,
1265       mssink);
1266 }
1267 
1268 static gboolean
gst_multi_socket_sink_unlock(GstBaseSink * bsink)1269 gst_multi_socket_sink_unlock (GstBaseSink * bsink)
1270 {
1271   GstMultiSocketSink *sink;
1272 
1273   sink = GST_MULTI_SOCKET_SINK (bsink);
1274 
1275   GST_DEBUG_OBJECT (sink, "set to flushing");
1276   g_cancellable_cancel (sink->cancellable);
1277   if (sink->main_context)
1278     g_main_context_wakeup (sink->main_context);
1279 
1280   return TRUE;
1281 }
1282 
1283 /* will be called only between calls to start() and stop() */
1284 static gboolean
gst_multi_socket_sink_unlock_stop(GstBaseSink * bsink)1285 gst_multi_socket_sink_unlock_stop (GstBaseSink * bsink)
1286 {
1287   GstMultiSocketSink *sink;
1288 
1289   sink = GST_MULTI_SOCKET_SINK (bsink);
1290 
1291   GST_DEBUG_OBJECT (sink, "unset flushing");
1292   g_object_unref (sink->cancellable);
1293   sink->cancellable = g_cancellable_new ();
1294 
1295   return TRUE;
1296 }
1297 
1298 static gboolean
gst_multi_socket_sink_propose_allocation(GstBaseSink * bsink,GstQuery * query)1299 gst_multi_socket_sink_propose_allocation (GstBaseSink * bsink, GstQuery * query)
1300 {
1301   /* we support some meta */
1302   gst_query_add_allocation_meta (query, GST_NET_CONTROL_MESSAGE_META_API_TYPE,
1303       NULL);
1304 
1305   return TRUE;
1306 }
1307