• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer
2  * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
3  * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
4  * Copyright (C) <2011> Collabora Ltd.
5  *     Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
6  * Copyright (C) <2014> William Manley <will@williammanley.net>
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
21  * Boston, MA 02110-1301, USA.
22  */
23 
24 /**
25  * SECTION:element-socketsrc
26  * @title: socketsrc
27  *
28  * Receive data from a socket.
29  *
30  * As compared to other elements:
31  *
32  * socketsrc can be considered a source counterpart to the #multisocketsink
33  * sink.
34  *
35  * socketsrc can also be considered a generalization of #tcpclientsrc and
36  * #tcpserversrc: it contains all the logic required to communicate over the
37  * socket but none of the logic for creating the sockets/establishing the
38  * connection in the first place, allowing the user to accomplish this
39  * externally in whatever manner they wish making it applicable to other types
40  * of sockets besides TCP.
41  *
 * As compared to #fdsrc, socketsrc is socket-specific: it deals with #GSocket
 * objects rather than with sockets represented as integer file descriptors.
44  *
45  * @see_also: #multisocketsink
46  */
47 
48 #ifdef HAVE_CONFIG_H
49 #include "config.h"
50 #endif
51 
52 #include <gst/gst-i18n-plugin.h>
53 #include <gst/net/gstnetcontrolmessagemeta.h>
54 #include "gstsocketsrc.h"
55 #include "gsttcp.h"
56 
/* File-local debug category. It is initialized under the name "socketsrc" in
 * gst_socket_src_class_init() and made the default category for the GST_*
 * logging macros used in this file. */
GST_DEBUG_CATEGORY_STATIC (socketsrc_debug);
#define GST_CAT_DEFAULT socketsrc_debug
59 
/* 4 KiB size constant (not referenced by the code visible in this file).
 * The expansion is parenthesized so that it behaves as a single value when
 * used inside a larger expression such as `x / MAX_READ_SIZE`. */
#define MAX_READ_SIZE                   (4 * 1024)
61 
62 
/* Template for the single, always-present source pad; it advertises ANY
 * caps. */
static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
    GST_PAD_SRC,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS_ANY);


/* Default value of the "send-messages" property. */
#define DEFAULT_SEND_MESSAGES FALSE

/* Property identifiers used by set_property/get_property. PROP_0 is never
 * installed as a property in this file. */
enum
{
  PROP_0,
  PROP_SOCKET,
  PROP_CAPS,
  PROP_SEND_MESSAGES
};

/* Indices into gst_socket_src_signals[]; LAST_SIGNAL is the array length. */
enum
{
  CONNECTION_CLOSED_BY_PEER,
  LAST_SIGNAL
};

/* Signal IDs, filled in by gst_socket_src_class_init(). */
static guint gst_socket_src_signals[LAST_SIGNAL] = { 0 };
86 
/* Register GstSocketSrc as a subtype of GstPushSrc. The alias lets the rest
 * of the file refer to the parent class pointer by the shorter name
 * parent_class (used by the finalize and event handlers). */
#define gst_socket_src_parent_class parent_class
G_DEFINE_TYPE (GstSocketSrc, gst_socket_src, GST_TYPE_PUSH_SRC);


/* Forward declarations of the virtual method implementations that
 * gst_socket_src_class_init() installs on the class. */
static void gst_socket_src_finalize (GObject * gobject);

static GstCaps *gst_socketsrc_getcaps (GstBaseSrc * src, GstCaps * filter);
static gboolean gst_socketsrc_event (GstBaseSrc * src, GstEvent * event);
static GstFlowReturn gst_socket_src_fill (GstPushSrc * psrc,
    GstBuffer * outbuf);
static gboolean gst_socket_src_unlock (GstBaseSrc * bsrc);
static gboolean gst_socket_src_unlock_stop (GstBaseSrc * bsrc);

static void gst_socket_src_set_property (GObject * object, guint prop_id,
    const GValue * value, GParamSpec * pspec);
static void gst_socket_src_get_property (GObject * object, guint prop_id,
    GValue * value, GParamSpec * pspec);
104 
/* Exchange the values of two GSocket pointer lvalues.
 *
 * The expansion deliberately has no trailing semicolon so that `SWAP (a, b);`
 * is exactly one statement and can be used as the body of an unbraced
 * if/else. Both arguments are parenthesized, and each is evaluated more than
 * once, so they must be side-effect free lvalues. */
#define SWAP(a, b) do { GSocket *_swap_tmp = (a); (a) = (b); (b) = _swap_tmp; } while (0)
106 
107 static void
gst_socket_src_class_init(GstSocketSrcClass * klass)108 gst_socket_src_class_init (GstSocketSrcClass * klass)
109 {
110   GObjectClass *gobject_class;
111   GstElementClass *gstelement_class;
112   GstBaseSrcClass *gstbasesrc_class;
113   GstPushSrcClass *gstpush_src_class;
114 
115   gobject_class = (GObjectClass *) klass;
116   gstelement_class = (GstElementClass *) klass;
117   gstbasesrc_class = (GstBaseSrcClass *) klass;
118   gstpush_src_class = (GstPushSrcClass *) klass;
119 
120   gobject_class->set_property = gst_socket_src_set_property;
121   gobject_class->get_property = gst_socket_src_get_property;
122   gobject_class->finalize = gst_socket_src_finalize;
123 
124   g_object_class_install_property (gobject_class, PROP_SOCKET,
125       g_param_spec_object ("socket", "Socket",
126           "The socket to receive packets from", G_TYPE_SOCKET,
127           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
128 
129   g_object_class_install_property (gobject_class, PROP_CAPS,
130       g_param_spec_boxed ("caps", "Caps",
131           "The caps of the source pad", GST_TYPE_CAPS,
132           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
133 
134   /**
135    * GstSocketSrc:send-messages:
136    *
137    * Control if the source will handle GstNetworkMessage events.
138    * The event is a CUSTOM event named 'GstNetworkMessage' and contains:
139    *
140    *   "buffer", GST_TYPE_BUFFER    : the buffer with data to send
141    *
142    * The buffer in the event will be sent on the socket. This allows
143    * for simple bidirectional communication.
144    *
145    * Since: 1.8.0
146    **/
147   g_object_class_install_property (gobject_class, PROP_SEND_MESSAGES,
148       g_param_spec_boolean ("send-messages", "Send Messages",
149           "If GstNetworkMessage events should be handled",
150           DEFAULT_SEND_MESSAGES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
151 
152   gst_socket_src_signals[CONNECTION_CLOSED_BY_PEER] =
153       g_signal_new ("connection-closed-by-peer", G_TYPE_FROM_CLASS (klass),
154       G_SIGNAL_RUN_FIRST, G_STRUCT_OFFSET (GstSocketSrcClass,
155           connection_closed_by_peer), NULL, NULL, NULL, G_TYPE_NONE, 0);
156 
157   gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
158 
159   gst_element_class_set_static_metadata (gstelement_class,
160       "socket source", "Source/Network",
161       "Receive data from a socket",
162       "Thomas Vander Stichele <thomas at apestaart dot org>, "
163       "William Manley <will@williammanley.net>");
164 
165   gstbasesrc_class->event = gst_socketsrc_event;
166   gstbasesrc_class->get_caps = gst_socketsrc_getcaps;
167   gstbasesrc_class->unlock = gst_socket_src_unlock;
168   gstbasesrc_class->unlock_stop = gst_socket_src_unlock_stop;
169 
170   gstpush_src_class->fill = gst_socket_src_fill;
171 
172   GST_DEBUG_CATEGORY_INIT (socketsrc_debug, "socketsrc", 0, "Socket Source");
173 }
174 
175 static void
gst_socket_src_init(GstSocketSrc * this)176 gst_socket_src_init (GstSocketSrc * this)
177 {
178   this->socket = NULL;
179   this->cancellable = g_cancellable_new ();
180   this->send_messages = DEFAULT_SEND_MESSAGES;
181 }
182 
183 static void
gst_socket_src_finalize(GObject * gobject)184 gst_socket_src_finalize (GObject * gobject)
185 {
186   GstSocketSrc *this = GST_SOCKET_SRC (gobject);
187 
188   if (this->caps)
189     gst_caps_unref (this->caps);
190   g_clear_object (&this->cancellable);
191   g_clear_object (&this->socket);
192 
193   G_OBJECT_CLASS (parent_class)->finalize (gobject);
194 }
195 
196 static gboolean
gst_socketsrc_event(GstBaseSrc * bsrc,GstEvent * event)197 gst_socketsrc_event (GstBaseSrc * bsrc, GstEvent * event)
198 {
199   GstSocketSrc *src;
200   gboolean res = FALSE;
201 
202   src = GST_SOCKET_SRC (bsrc);
203 
204   switch (GST_EVENT_TYPE (event)) {
205     case GST_EVENT_CUSTOM_UPSTREAM:
206       if (src->send_messages && gst_event_has_name (event, "GstNetworkMessage")) {
207         const GstStructure *str = gst_event_get_structure (event);
208         GSocket *socket;
209 
210         GST_OBJECT_LOCK (src);
211         if ((socket = src->socket))
212           g_object_ref (socket);
213         GST_OBJECT_UNLOCK (src);
214 
215         if (socket) {
216           GstBuffer *buf;
217           GstMapInfo map;
218           GError *err = NULL;
219           gssize ret;
220 
221           gst_structure_get (str, "buffer", GST_TYPE_BUFFER, &buf, NULL);
222 
223           if (buf) {
224             gst_buffer_map (buf, &map, GST_MAP_READ);
225             GST_LOG ("sending buffer of size %" G_GSIZE_FORMAT, map.size);
226             ret = g_socket_send_with_blocking (socket, (gchar *) map.data,
227                 map.size, FALSE, src->cancellable, &err);
228             gst_buffer_unmap (buf, &map);
229 
230             if (ret == -1) {
231               GST_WARNING ("could not send message: %s", err->message);
232               g_clear_error (&err);
233               res = FALSE;
234             } else
235               res = TRUE;
236             gst_buffer_unref (buf);
237           }
238           g_object_unref (socket);
239         }
240       }
241       break;
242     default:
243       res = GST_BASE_SRC_CLASS (parent_class)->event (bsrc, event);
244       break;
245   }
246   return res;
247 }
248 
249 static GstCaps *
gst_socketsrc_getcaps(GstBaseSrc * src,GstCaps * filter)250 gst_socketsrc_getcaps (GstBaseSrc * src, GstCaps * filter)
251 {
252   GstSocketSrc *socketsrc;
253   GstCaps *caps, *result;
254 
255   socketsrc = GST_SOCKET_SRC (src);
256 
257   GST_OBJECT_LOCK (src);
258   if ((caps = socketsrc->caps))
259     gst_caps_ref (caps);
260   GST_OBJECT_UNLOCK (src);
261 
262   if (caps) {
263     if (filter) {
264       result = gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
265       gst_caps_unref (caps);
266     } else {
267       result = caps;
268     }
269   } else {
270     result = (filter) ? gst_caps_ref (filter) : gst_caps_new_any ();
271   }
272   return result;
273 }
274 
275 static GstFlowReturn
gst_socket_src_fill(GstPushSrc * psrc,GstBuffer * outbuf)276 gst_socket_src_fill (GstPushSrc * psrc, GstBuffer * outbuf)
277 {
278   GstSocketSrc *src;
279   GstFlowReturn ret = GST_FLOW_OK;
280   gssize rret;
281   GError *err = NULL;
282   GstMapInfo map;
283   GSocket *socket = NULL;
284   GSocketControlMessage **messages = NULL;
285   gint num_messages = 0;
286   gint i;
287   GInputVector ivec;
288   gint flags = 0;
289 
290   src = GST_SOCKET_SRC (psrc);
291 
292   GST_OBJECT_LOCK (src);
293 
294   if (src->socket)
295     socket = g_object_ref (src->socket);
296 
297   GST_OBJECT_UNLOCK (src);
298 
299   if (socket == NULL)
300     goto no_socket;
301 
302   GST_LOG_OBJECT (src, "asked for a buffer");
303 
304 retry:
305   gst_buffer_map (outbuf, &map, GST_MAP_READWRITE);
306   ivec.buffer = map.data;
307   ivec.size = map.size;
308   rret =
309       g_socket_receive_message (socket, NULL, &ivec, 1, &messages,
310       &num_messages, &flags, src->cancellable, &err);
311   gst_buffer_unmap (outbuf, &map);
312 
313   for (i = 0; i < num_messages; i++) {
314     gst_buffer_add_net_control_message_meta (outbuf, messages[i]);
315     g_object_unref (messages[i]);
316     messages[i] = NULL;
317   }
318   g_free (messages);
319 
320   if (rret == 0) {
321     GSocket *tmp = NULL;
322     GST_DEBUG_OBJECT (src, "Received EOS on socket %p fd %i", socket,
323         g_socket_get_fd (socket));
324 
325     /* We've hit EOS but we'll send this signal to allow someone to change
326      * our socket before we send EOS downstream. */
327     g_signal_emit (src, gst_socket_src_signals[CONNECTION_CLOSED_BY_PEER], 0);
328 
329     GST_OBJECT_LOCK (src);
330 
331     if (src->socket)
332       tmp = g_object_ref (src->socket);
333 
334     GST_OBJECT_UNLOCK (src);
335 
336     /* Do this dance with tmp to avoid unreffing with the lock held */
337     if (tmp != NULL && tmp != socket) {
338       SWAP (socket, tmp);
339       g_clear_object (&tmp);
340 
341       GST_INFO_OBJECT (src, "New socket available after EOS %p fd %i: Retrying",
342           socket, g_socket_get_fd (socket));
343 
344       /* retry with our new socket: */
345       goto retry;
346     } else {
347       g_clear_object (&tmp);
348       GST_INFO_OBJECT (src, "Forwarding EOS downstream");
349       ret = GST_FLOW_EOS;
350     }
351   } else if (rret < 0) {
352     if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
353       ret = GST_FLOW_FLUSHING;
354       GST_DEBUG_OBJECT (src, "Cancelled reading from socket");
355     } else {
356       ret = GST_FLOW_ERROR;
357       GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
358           ("Failed to read from socket: %s", err->message));
359     }
360   } else {
361     ret = GST_FLOW_OK;
362     gst_buffer_resize (outbuf, 0, rret);
363 
364     GST_LOG_OBJECT (src,
365         "Returning buffer from _get of size %" G_GSIZE_FORMAT ", ts %"
366         GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT
367         ", offset %" G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT,
368         gst_buffer_get_size (outbuf),
369         GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (outbuf)),
370         GST_TIME_ARGS (GST_BUFFER_DURATION (outbuf)),
371         GST_BUFFER_OFFSET (outbuf), GST_BUFFER_OFFSET_END (outbuf));
372   }
373   g_clear_error (&err);
374   g_clear_object (&socket);
375 
376   return ret;
377 
378 no_socket:
379   {
380     GST_ELEMENT_ERROR (src, RESOURCE, NOT_FOUND, (NULL),
381         ("Cannot receive: No socket set on socketsrc"));
382     return GST_FLOW_ERROR;
383   }
384 }
385 
386 static void
gst_socket_src_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)387 gst_socket_src_set_property (GObject * object, guint prop_id,
388     const GValue * value, GParamSpec * pspec)
389 {
390   GstSocketSrc *socketsrc = GST_SOCKET_SRC (object);
391 
392   switch (prop_id) {
393     case PROP_SOCKET:{
394       GSocket *socket = G_SOCKET (g_value_dup_object (value));
395       GST_OBJECT_LOCK (socketsrc);
396       SWAP (socket, socketsrc->socket);
397       GST_OBJECT_UNLOCK (socketsrc);
398       g_clear_object (&socket);
399       break;
400     }
401     case PROP_CAPS:
402     {
403       const GstCaps *new_caps_val = gst_value_get_caps (value);
404       GstCaps *new_caps;
405       GstCaps *old_caps;
406 
407       if (new_caps_val == NULL) {
408         new_caps = gst_caps_new_any ();
409       } else {
410         new_caps = gst_caps_copy (new_caps_val);
411       }
412 
413       GST_OBJECT_LOCK (socketsrc);
414       old_caps = socketsrc->caps;
415       socketsrc->caps = new_caps;
416       GST_OBJECT_UNLOCK (socketsrc);
417 
418       if (old_caps)
419         gst_caps_unref (old_caps);
420 
421       gst_pad_mark_reconfigure (GST_BASE_SRC_PAD (socketsrc));
422       break;
423     }
424     case PROP_SEND_MESSAGES:
425       socketsrc->send_messages = g_value_get_boolean (value);
426       break;
427     default:
428       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
429       break;
430   }
431 }
432 
433 static void
gst_socket_src_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)434 gst_socket_src_get_property (GObject * object, guint prop_id,
435     GValue * value, GParamSpec * pspec)
436 {
437   GstSocketSrc *socketsrc = GST_SOCKET_SRC (object);
438 
439   switch (prop_id) {
440     case PROP_SOCKET:
441       g_value_set_object (value, socketsrc->socket);
442       break;
443     case PROP_CAPS:
444       GST_OBJECT_LOCK (socketsrc);
445       gst_value_set_caps (value, socketsrc->caps);
446       GST_OBJECT_UNLOCK (socketsrc);
447       break;
448     case PROP_SEND_MESSAGES:
449       g_value_set_boolean (value, socketsrc->send_messages);
450       break;
451     default:
452       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
453       break;
454   }
455 }
456 
457 static gboolean
gst_socket_src_unlock(GstBaseSrc * bsrc)458 gst_socket_src_unlock (GstBaseSrc * bsrc)
459 {
460   GstSocketSrc *src = GST_SOCKET_SRC (bsrc);
461 
462   GST_DEBUG_OBJECT (src, "set to flushing");
463   g_cancellable_cancel (src->cancellable);
464 
465   return TRUE;
466 }
467 
468 static gboolean
gst_socket_src_unlock_stop(GstBaseSrc * bsrc)469 gst_socket_src_unlock_stop (GstBaseSrc * bsrc)
470 {
471   GstSocketSrc *src = GST_SOCKET_SRC (bsrc);
472 
473   GST_DEBUG_OBJECT (src, "unset flushing");
474   g_cancellable_reset (src->cancellable);
475 
476   return TRUE;
477 }
478