1 /* GStreamer
2 * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
3 * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
4 * Copyright (C) <2011> Collabora Ltd.
5 * Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
6 * Copyright (C) <2014> William Manley <will@williammanley.net>
7 *
8 * This library is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Library General Public
10 * License as published by the Free Software Foundation; either
11 * version 2 of the License, or (at your option) any later version.
12 *
13 * This library is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 * Library General Public License for more details.
17 *
18 * You should have received a copy of the GNU Library General Public
19 * License along with this library; if not, write to the
20 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
21 * Boston, MA 02110-1301, USA.
22 */
23
24 /**
25 * SECTION:element-socketsrc
26 * @title: socketsrc
27 *
28 * Receive data from a socket.
29 *
30 * As compared to other elements:
31 *
32 * socketsrc can be considered a source counterpart to the #multisocketsink
33 * sink.
34 *
35 * socketsrc can also be considered a generalization of #tcpclientsrc and
36 * #tcpserversrc: it contains all the logic required to communicate over the
37 * socket but none of the logic for creating the sockets/establishing the
38 * connection in the first place, allowing the user to accomplish this
39 * externally in whatever manner they wish making it applicable to other types
40 * of sockets besides TCP.
41 *
42 * As compared to #fdsrc socketsrc is socket specific and deals with #GSocket
43 * objects rather than sockets via integer file-descriptors.
44 *
45 * @see_also: #multisocketsink
46 */
47
48 #ifdef HAVE_CONFIG_H
49 #include "config.h"
50 #endif
51
52 #include <gst/gst-i18n-plugin.h>
53 #include <gst/net/gstnetcontrolmessagemeta.h>
54 #include "gstsocketsrc.h"
55 #include "gsttcp.h"
56
57 GST_DEBUG_CATEGORY_STATIC (socketsrc_debug);
58 #define GST_CAT_DEFAULT socketsrc_debug
59
60 #define MAX_READ_SIZE 4 * 1024
61
62
63 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
64 GST_PAD_SRC,
65 GST_PAD_ALWAYS,
66 GST_STATIC_CAPS_ANY);
67
68
69 #define DEFAULT_SEND_MESSAGES FALSE
70
71 enum
72 {
73 PROP_0,
74 PROP_SOCKET,
75 PROP_CAPS,
76 PROP_SEND_MESSAGES
77 };
78
79 enum
80 {
81 CONNECTION_CLOSED_BY_PEER,
82 LAST_SIGNAL
83 };
84
85 static guint gst_socket_src_signals[LAST_SIGNAL] = { 0 };
86
87 #define gst_socket_src_parent_class parent_class
88 G_DEFINE_TYPE (GstSocketSrc, gst_socket_src, GST_TYPE_PUSH_SRC);
89
90
91 static void gst_socket_src_finalize (GObject * gobject);
92
93 static GstCaps *gst_socketsrc_getcaps (GstBaseSrc * src, GstCaps * filter);
94 static gboolean gst_socketsrc_event (GstBaseSrc * src, GstEvent * event);
95 static GstFlowReturn gst_socket_src_fill (GstPushSrc * psrc,
96 GstBuffer * outbuf);
97 static gboolean gst_socket_src_unlock (GstBaseSrc * bsrc);
98 static gboolean gst_socket_src_unlock_stop (GstBaseSrc * bsrc);
99
100 static void gst_socket_src_set_property (GObject * object, guint prop_id,
101 const GValue * value, GParamSpec * pspec);
102 static void gst_socket_src_get_property (GObject * object, guint prop_id,
103 GValue * value, GParamSpec * pspec);
104
105 #define SWAP(a, b) do { GSocket* _swap_tmp = a; a = b; b = _swap_tmp; } while (0);
106
107 static void
gst_socket_src_class_init(GstSocketSrcClass * klass)108 gst_socket_src_class_init (GstSocketSrcClass * klass)
109 {
110 GObjectClass *gobject_class;
111 GstElementClass *gstelement_class;
112 GstBaseSrcClass *gstbasesrc_class;
113 GstPushSrcClass *gstpush_src_class;
114
115 gobject_class = (GObjectClass *) klass;
116 gstelement_class = (GstElementClass *) klass;
117 gstbasesrc_class = (GstBaseSrcClass *) klass;
118 gstpush_src_class = (GstPushSrcClass *) klass;
119
120 gobject_class->set_property = gst_socket_src_set_property;
121 gobject_class->get_property = gst_socket_src_get_property;
122 gobject_class->finalize = gst_socket_src_finalize;
123
124 g_object_class_install_property (gobject_class, PROP_SOCKET,
125 g_param_spec_object ("socket", "Socket",
126 "The socket to receive packets from", G_TYPE_SOCKET,
127 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
128
129 g_object_class_install_property (gobject_class, PROP_CAPS,
130 g_param_spec_boxed ("caps", "Caps",
131 "The caps of the source pad", GST_TYPE_CAPS,
132 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
133
134 /**
135 * GstSocketSrc:send-messages:
136 *
137 * Control if the source will handle GstNetworkMessage events.
138 * The event is a CUSTOM event named 'GstNetworkMessage' and contains:
139 *
140 * "buffer", GST_TYPE_BUFFER : the buffer with data to send
141 *
142 * The buffer in the event will be sent on the socket. This allows
143 * for simple bidirectional communication.
144 *
145 * Since: 1.8.0
146 **/
147 g_object_class_install_property (gobject_class, PROP_SEND_MESSAGES,
148 g_param_spec_boolean ("send-messages", "Send Messages",
149 "If GstNetworkMessage events should be handled",
150 DEFAULT_SEND_MESSAGES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
151
152 gst_socket_src_signals[CONNECTION_CLOSED_BY_PEER] =
153 g_signal_new ("connection-closed-by-peer", G_TYPE_FROM_CLASS (klass),
154 G_SIGNAL_RUN_FIRST, G_STRUCT_OFFSET (GstSocketSrcClass,
155 connection_closed_by_peer), NULL, NULL, NULL, G_TYPE_NONE, 0);
156
157 gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
158
159 gst_element_class_set_static_metadata (gstelement_class,
160 "socket source", "Source/Network",
161 "Receive data from a socket",
162 "Thomas Vander Stichele <thomas at apestaart dot org>, "
163 "William Manley <will@williammanley.net>");
164
165 gstbasesrc_class->event = gst_socketsrc_event;
166 gstbasesrc_class->get_caps = gst_socketsrc_getcaps;
167 gstbasesrc_class->unlock = gst_socket_src_unlock;
168 gstbasesrc_class->unlock_stop = gst_socket_src_unlock_stop;
169
170 gstpush_src_class->fill = gst_socket_src_fill;
171
172 GST_DEBUG_CATEGORY_INIT (socketsrc_debug, "socketsrc", 0, "Socket Source");
173 }
174
175 static void
gst_socket_src_init(GstSocketSrc * this)176 gst_socket_src_init (GstSocketSrc * this)
177 {
178 this->socket = NULL;
179 this->cancellable = g_cancellable_new ();
180 this->send_messages = DEFAULT_SEND_MESSAGES;
181 }
182
183 static void
gst_socket_src_finalize(GObject * gobject)184 gst_socket_src_finalize (GObject * gobject)
185 {
186 GstSocketSrc *this = GST_SOCKET_SRC (gobject);
187
188 if (this->caps)
189 gst_caps_unref (this->caps);
190 g_clear_object (&this->cancellable);
191 g_clear_object (&this->socket);
192
193 G_OBJECT_CLASS (parent_class)->finalize (gobject);
194 }
195
196 static gboolean
gst_socketsrc_event(GstBaseSrc * bsrc,GstEvent * event)197 gst_socketsrc_event (GstBaseSrc * bsrc, GstEvent * event)
198 {
199 GstSocketSrc *src;
200 gboolean res = FALSE;
201
202 src = GST_SOCKET_SRC (bsrc);
203
204 switch (GST_EVENT_TYPE (event)) {
205 case GST_EVENT_CUSTOM_UPSTREAM:
206 if (src->send_messages && gst_event_has_name (event, "GstNetworkMessage")) {
207 const GstStructure *str = gst_event_get_structure (event);
208 GSocket *socket;
209
210 GST_OBJECT_LOCK (src);
211 if ((socket = src->socket))
212 g_object_ref (socket);
213 GST_OBJECT_UNLOCK (src);
214
215 if (socket) {
216 GstBuffer *buf;
217 GstMapInfo map;
218 GError *err = NULL;
219 gssize ret;
220
221 gst_structure_get (str, "buffer", GST_TYPE_BUFFER, &buf, NULL);
222
223 if (buf) {
224 gst_buffer_map (buf, &map, GST_MAP_READ);
225 GST_LOG ("sending buffer of size %" G_GSIZE_FORMAT, map.size);
226 ret = g_socket_send_with_blocking (socket, (gchar *) map.data,
227 map.size, FALSE, src->cancellable, &err);
228 gst_buffer_unmap (buf, &map);
229
230 if (ret == -1) {
231 GST_WARNING ("could not send message: %s", err->message);
232 g_clear_error (&err);
233 res = FALSE;
234 } else
235 res = TRUE;
236 gst_buffer_unref (buf);
237 }
238 g_object_unref (socket);
239 }
240 }
241 break;
242 default:
243 res = GST_BASE_SRC_CLASS (parent_class)->event (bsrc, event);
244 break;
245 }
246 return res;
247 }
248
249 static GstCaps *
gst_socketsrc_getcaps(GstBaseSrc * src,GstCaps * filter)250 gst_socketsrc_getcaps (GstBaseSrc * src, GstCaps * filter)
251 {
252 GstSocketSrc *socketsrc;
253 GstCaps *caps, *result;
254
255 socketsrc = GST_SOCKET_SRC (src);
256
257 GST_OBJECT_LOCK (src);
258 if ((caps = socketsrc->caps))
259 gst_caps_ref (caps);
260 GST_OBJECT_UNLOCK (src);
261
262 if (caps) {
263 if (filter) {
264 result = gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
265 gst_caps_unref (caps);
266 } else {
267 result = caps;
268 }
269 } else {
270 result = (filter) ? gst_caps_ref (filter) : gst_caps_new_any ();
271 }
272 return result;
273 }
274
275 static GstFlowReturn
gst_socket_src_fill(GstPushSrc * psrc,GstBuffer * outbuf)276 gst_socket_src_fill (GstPushSrc * psrc, GstBuffer * outbuf)
277 {
278 GstSocketSrc *src;
279 GstFlowReturn ret = GST_FLOW_OK;
280 gssize rret;
281 GError *err = NULL;
282 GstMapInfo map;
283 GSocket *socket = NULL;
284 GSocketControlMessage **messages = NULL;
285 gint num_messages = 0;
286 gint i;
287 GInputVector ivec;
288 gint flags = 0;
289
290 src = GST_SOCKET_SRC (psrc);
291
292 GST_OBJECT_LOCK (src);
293
294 if (src->socket)
295 socket = g_object_ref (src->socket);
296
297 GST_OBJECT_UNLOCK (src);
298
299 if (socket == NULL)
300 goto no_socket;
301
302 GST_LOG_OBJECT (src, "asked for a buffer");
303
304 retry:
305 gst_buffer_map (outbuf, &map, GST_MAP_READWRITE);
306 ivec.buffer = map.data;
307 ivec.size = map.size;
308 rret =
309 g_socket_receive_message (socket, NULL, &ivec, 1, &messages,
310 &num_messages, &flags, src->cancellable, &err);
311 gst_buffer_unmap (outbuf, &map);
312
313 for (i = 0; i < num_messages; i++) {
314 gst_buffer_add_net_control_message_meta (outbuf, messages[i]);
315 g_object_unref (messages[i]);
316 messages[i] = NULL;
317 }
318 g_free (messages);
319
320 if (rret == 0) {
321 GSocket *tmp = NULL;
322 GST_DEBUG_OBJECT (src, "Received EOS on socket %p fd %i", socket,
323 g_socket_get_fd (socket));
324
325 /* We've hit EOS but we'll send this signal to allow someone to change
326 * our socket before we send EOS downstream. */
327 g_signal_emit (src, gst_socket_src_signals[CONNECTION_CLOSED_BY_PEER], 0);
328
329 GST_OBJECT_LOCK (src);
330
331 if (src->socket)
332 tmp = g_object_ref (src->socket);
333
334 GST_OBJECT_UNLOCK (src);
335
336 /* Do this dance with tmp to avoid unreffing with the lock held */
337 if (tmp != NULL && tmp != socket) {
338 SWAP (socket, tmp);
339 g_clear_object (&tmp);
340
341 GST_INFO_OBJECT (src, "New socket available after EOS %p fd %i: Retrying",
342 socket, g_socket_get_fd (socket));
343
344 /* retry with our new socket: */
345 goto retry;
346 } else {
347 g_clear_object (&tmp);
348 GST_INFO_OBJECT (src, "Forwarding EOS downstream");
349 ret = GST_FLOW_EOS;
350 }
351 } else if (rret < 0) {
352 if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
353 ret = GST_FLOW_FLUSHING;
354 GST_DEBUG_OBJECT (src, "Cancelled reading from socket");
355 } else {
356 ret = GST_FLOW_ERROR;
357 GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
358 ("Failed to read from socket: %s", err->message));
359 }
360 } else {
361 ret = GST_FLOW_OK;
362 gst_buffer_resize (outbuf, 0, rret);
363
364 GST_LOG_OBJECT (src,
365 "Returning buffer from _get of size %" G_GSIZE_FORMAT ", ts %"
366 GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT
367 ", offset %" G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT,
368 gst_buffer_get_size (outbuf),
369 GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (outbuf)),
370 GST_TIME_ARGS (GST_BUFFER_DURATION (outbuf)),
371 GST_BUFFER_OFFSET (outbuf), GST_BUFFER_OFFSET_END (outbuf));
372 }
373 g_clear_error (&err);
374 g_clear_object (&socket);
375
376 return ret;
377
378 no_socket:
379 {
380 GST_ELEMENT_ERROR (src, RESOURCE, NOT_FOUND, (NULL),
381 ("Cannot receive: No socket set on socketsrc"));
382 return GST_FLOW_ERROR;
383 }
384 }
385
386 static void
gst_socket_src_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)387 gst_socket_src_set_property (GObject * object, guint prop_id,
388 const GValue * value, GParamSpec * pspec)
389 {
390 GstSocketSrc *socketsrc = GST_SOCKET_SRC (object);
391
392 switch (prop_id) {
393 case PROP_SOCKET:{
394 GSocket *socket = G_SOCKET (g_value_dup_object (value));
395 GST_OBJECT_LOCK (socketsrc);
396 SWAP (socket, socketsrc->socket);
397 GST_OBJECT_UNLOCK (socketsrc);
398 g_clear_object (&socket);
399 break;
400 }
401 case PROP_CAPS:
402 {
403 const GstCaps *new_caps_val = gst_value_get_caps (value);
404 GstCaps *new_caps;
405 GstCaps *old_caps;
406
407 if (new_caps_val == NULL) {
408 new_caps = gst_caps_new_any ();
409 } else {
410 new_caps = gst_caps_copy (new_caps_val);
411 }
412
413 GST_OBJECT_LOCK (socketsrc);
414 old_caps = socketsrc->caps;
415 socketsrc->caps = new_caps;
416 GST_OBJECT_UNLOCK (socketsrc);
417
418 if (old_caps)
419 gst_caps_unref (old_caps);
420
421 gst_pad_mark_reconfigure (GST_BASE_SRC_PAD (socketsrc));
422 break;
423 }
424 case PROP_SEND_MESSAGES:
425 socketsrc->send_messages = g_value_get_boolean (value);
426 break;
427 default:
428 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
429 break;
430 }
431 }
432
433 static void
gst_socket_src_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)434 gst_socket_src_get_property (GObject * object, guint prop_id,
435 GValue * value, GParamSpec * pspec)
436 {
437 GstSocketSrc *socketsrc = GST_SOCKET_SRC (object);
438
439 switch (prop_id) {
440 case PROP_SOCKET:
441 g_value_set_object (value, socketsrc->socket);
442 break;
443 case PROP_CAPS:
444 GST_OBJECT_LOCK (socketsrc);
445 gst_value_set_caps (value, socketsrc->caps);
446 GST_OBJECT_UNLOCK (socketsrc);
447 break;
448 case PROP_SEND_MESSAGES:
449 g_value_set_boolean (value, socketsrc->send_messages);
450 break;
451 default:
452 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
453 break;
454 }
455 }
456
457 static gboolean
gst_socket_src_unlock(GstBaseSrc * bsrc)458 gst_socket_src_unlock (GstBaseSrc * bsrc)
459 {
460 GstSocketSrc *src = GST_SOCKET_SRC (bsrc);
461
462 GST_DEBUG_OBJECT (src, "set to flushing");
463 g_cancellable_cancel (src->cancellable);
464
465 return TRUE;
466 }
467
468 static gboolean
gst_socket_src_unlock_stop(GstBaseSrc * bsrc)469 gst_socket_src_unlock_stop (GstBaseSrc * bsrc)
470 {
471 GstSocketSrc *src = GST_SOCKET_SRC (bsrc);
472
473 GST_DEBUG_OBJECT (src, "unset flushing");
474 g_cancellable_reset (src->cancellable);
475
476 return TRUE;
477 }
478