1 /* GStreamer
2 * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
3 * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
4 * Copyright (C) <2011> Collabora Ltd.
5 * Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
6 * Copyright (C) <2014> William Manley <will@williammanley.net>
7 *
8 * This library is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Library General Public
10 * License as published by the Free Software Foundation; either
11 * version 2 of the License, or (at your option) any later version.
12 *
13 * This library is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 * Library General Public License for more details.
17 *
18 * You should have received a copy of the GNU Library General Public
19 * License along with this library; if not, write to the
20 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
21 * Boston, MA 02110-1301, USA.
22 */
23
24 /**
25 * SECTION:element-socketsrc
26 * @title: socketsrc
27 *
28 * Receive data from a socket.
29 *
30 * As compared to other elements:
31 *
32 * socketsrc can be considered a source counterpart to the #multisocketsink
33 * sink.
34 *
35 * socketsrc can also be considered a generalization of #tcpclientsrc and
36 * #tcpserversrc: it contains all the logic required to communicate over the
37 * socket but none of the logic for creating the sockets/establishing the
38 * connection in the first place, allowing the user to accomplish this
39 * externally in whatever manner they wish making it applicable to other types
40 * of sockets besides TCP.
41 *
42 * As compared to #fdsrc socketsrc is socket specific and deals with #GSocket
43 * objects rather than sockets via integer file-descriptors.
44 *
45 * @see_also: #multisocketsink
46 */
47
48 #ifdef HAVE_CONFIG_H
49 #include "config.h"
50 #endif
51
52 #include <gst/gst-i18n-plugin.h>
53 #include <gst/net/gstnetcontrolmessagemeta.h>
54 #include "gsttcpelements.h"
55 #include "gstsocketsrc.h"
56
57 GST_DEBUG_CATEGORY_STATIC (socketsrc_debug);
58 #define GST_CAT_DEFAULT socketsrc_debug
59
60 #define MAX_READ_SIZE 4 * 1024
61
62
63 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
64 GST_PAD_SRC,
65 GST_PAD_ALWAYS,
66 GST_STATIC_CAPS_ANY);
67
68
69 #define DEFAULT_SEND_MESSAGES FALSE
70
71 enum
72 {
73 PROP_0,
74 PROP_SOCKET,
75 PROP_CAPS,
76 PROP_SEND_MESSAGES
77 };
78
79 enum
80 {
81 CONNECTION_CLOSED_BY_PEER,
82 LAST_SIGNAL
83 };
84
85 static guint gst_socket_src_signals[LAST_SIGNAL] = { 0 };
86
87 #define gst_socket_src_parent_class parent_class
88 G_DEFINE_TYPE (GstSocketSrc, gst_socket_src, GST_TYPE_PUSH_SRC);
89 GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (socketsrc, "socketsrc",
90 GST_RANK_NONE, GST_TYPE_SOCKET_SRC, tcp_element_init (plugin));
91
92 static void gst_socket_src_finalize (GObject * gobject);
93
94 static GstCaps *gst_socketsrc_getcaps (GstBaseSrc * src, GstCaps * filter);
95 static gboolean gst_socketsrc_event (GstBaseSrc * src, GstEvent * event);
96 static GstFlowReturn gst_socket_src_fill (GstPushSrc * psrc,
97 GstBuffer * outbuf);
98 static gboolean gst_socket_src_unlock (GstBaseSrc * bsrc);
99 static gboolean gst_socket_src_unlock_stop (GstBaseSrc * bsrc);
100
101 static void gst_socket_src_set_property (GObject * object, guint prop_id,
102 const GValue * value, GParamSpec * pspec);
103 static void gst_socket_src_get_property (GObject * object, guint prop_id,
104 GValue * value, GParamSpec * pspec);
105
106 #define SWAP(a, b) do { GSocket* _swap_tmp = a; a = b; b = _swap_tmp; } while (0);
107
108 static void
gst_socket_src_class_init(GstSocketSrcClass * klass)109 gst_socket_src_class_init (GstSocketSrcClass * klass)
110 {
111 GObjectClass *gobject_class;
112 GstElementClass *gstelement_class;
113 GstBaseSrcClass *gstbasesrc_class;
114 GstPushSrcClass *gstpush_src_class;
115
116 gobject_class = (GObjectClass *) klass;
117 gstelement_class = (GstElementClass *) klass;
118 gstbasesrc_class = (GstBaseSrcClass *) klass;
119 gstpush_src_class = (GstPushSrcClass *) klass;
120
121 gobject_class->set_property = gst_socket_src_set_property;
122 gobject_class->get_property = gst_socket_src_get_property;
123 gobject_class->finalize = gst_socket_src_finalize;
124
125 g_object_class_install_property (gobject_class, PROP_SOCKET,
126 g_param_spec_object ("socket", "Socket",
127 "The socket to receive packets from", G_TYPE_SOCKET,
128 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
129
130 g_object_class_install_property (gobject_class, PROP_CAPS,
131 g_param_spec_boxed ("caps", "Caps",
132 "The caps of the source pad", GST_TYPE_CAPS,
133 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
134
135 /**
136 * GstSocketSrc:send-messages:
137 *
138 * Control if the source will handle GstNetworkMessage events.
139 * The event is a CUSTOM event named 'GstNetworkMessage' and contains:
140 *
141 * "buffer", GST_TYPE_BUFFER : the buffer with data to send
142 *
143 * The buffer in the event will be sent on the socket. This allows
144 * for simple bidirectional communication.
145 *
146 * Since: 1.8.0
147 **/
148 g_object_class_install_property (gobject_class, PROP_SEND_MESSAGES,
149 g_param_spec_boolean ("send-messages", "Send Messages",
150 "If GstNetworkMessage events should be handled",
151 DEFAULT_SEND_MESSAGES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
152
153 gst_socket_src_signals[CONNECTION_CLOSED_BY_PEER] =
154 g_signal_new ("connection-closed-by-peer", G_TYPE_FROM_CLASS (klass),
155 G_SIGNAL_RUN_FIRST, G_STRUCT_OFFSET (GstSocketSrcClass,
156 connection_closed_by_peer), NULL, NULL, NULL, G_TYPE_NONE, 0);
157
158 gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
159
160 gst_element_class_set_static_metadata (gstelement_class,
161 "socket source", "Source/Network",
162 "Receive data from a socket",
163 "Thomas Vander Stichele <thomas at apestaart dot org>, "
164 "William Manley <will@williammanley.net>");
165
166 gstbasesrc_class->event = gst_socketsrc_event;
167 gstbasesrc_class->get_caps = gst_socketsrc_getcaps;
168 gstbasesrc_class->unlock = gst_socket_src_unlock;
169 gstbasesrc_class->unlock_stop = gst_socket_src_unlock_stop;
170
171 gstpush_src_class->fill = gst_socket_src_fill;
172
173 GST_DEBUG_CATEGORY_INIT (socketsrc_debug, "socketsrc", 0, "Socket Source");
174 }
175
176 static void
gst_socket_src_init(GstSocketSrc * this)177 gst_socket_src_init (GstSocketSrc * this)
178 {
179 this->socket = NULL;
180 this->cancellable = g_cancellable_new ();
181 this->send_messages = DEFAULT_SEND_MESSAGES;
182 }
183
184 static void
gst_socket_src_finalize(GObject * gobject)185 gst_socket_src_finalize (GObject * gobject)
186 {
187 GstSocketSrc *this = GST_SOCKET_SRC (gobject);
188
189 if (this->caps)
190 gst_caps_unref (this->caps);
191 g_clear_object (&this->cancellable);
192 g_clear_object (&this->socket);
193
194 G_OBJECT_CLASS (parent_class)->finalize (gobject);
195 }
196
197 static gboolean
gst_socketsrc_event(GstBaseSrc * bsrc,GstEvent * event)198 gst_socketsrc_event (GstBaseSrc * bsrc, GstEvent * event)
199 {
200 GstSocketSrc *src;
201 gboolean res = FALSE;
202
203 src = GST_SOCKET_SRC (bsrc);
204
205 switch (GST_EVENT_TYPE (event)) {
206 case GST_EVENT_CUSTOM_UPSTREAM:
207 if (src->send_messages && gst_event_has_name (event, "GstNetworkMessage")) {
208 const GstStructure *str = gst_event_get_structure (event);
209 GSocket *socket;
210
211 GST_OBJECT_LOCK (src);
212 if ((socket = src->socket))
213 g_object_ref (socket);
214 GST_OBJECT_UNLOCK (src);
215
216 if (socket) {
217 GstBuffer *buf;
218 GstMapInfo map;
219 GError *err = NULL;
220 gssize ret;
221
222 gst_structure_get (str, "buffer", GST_TYPE_BUFFER, &buf, NULL);
223
224 if (buf) {
225 gst_buffer_map (buf, &map, GST_MAP_READ);
226 GST_LOG ("sending buffer of size %" G_GSIZE_FORMAT, map.size);
227 ret = g_socket_send_with_blocking (socket, (gchar *) map.data,
228 map.size, FALSE, src->cancellable, &err);
229 gst_buffer_unmap (buf, &map);
230
231 if (ret == -1) {
232 GST_WARNING ("could not send message: %s", err->message);
233 g_clear_error (&err);
234 res = FALSE;
235 } else
236 res = TRUE;
237 gst_buffer_unref (buf);
238 }
239 g_object_unref (socket);
240 }
241 }
242 break;
243 default:
244 res = GST_BASE_SRC_CLASS (parent_class)->event (bsrc, event);
245 break;
246 }
247 return res;
248 }
249
250 static GstCaps *
gst_socketsrc_getcaps(GstBaseSrc * src,GstCaps * filter)251 gst_socketsrc_getcaps (GstBaseSrc * src, GstCaps * filter)
252 {
253 GstSocketSrc *socketsrc;
254 GstCaps *caps, *result;
255
256 socketsrc = GST_SOCKET_SRC (src);
257
258 GST_OBJECT_LOCK (src);
259 if ((caps = socketsrc->caps))
260 gst_caps_ref (caps);
261 GST_OBJECT_UNLOCK (src);
262
263 if (caps) {
264 if (filter) {
265 result = gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
266 gst_caps_unref (caps);
267 } else {
268 result = caps;
269 }
270 } else {
271 result = (filter) ? gst_caps_ref (filter) : gst_caps_new_any ();
272 }
273 return result;
274 }
275
276 static GstFlowReturn
gst_socket_src_fill(GstPushSrc * psrc,GstBuffer * outbuf)277 gst_socket_src_fill (GstPushSrc * psrc, GstBuffer * outbuf)
278 {
279 GstSocketSrc *src;
280 GstFlowReturn ret = GST_FLOW_OK;
281 gssize rret;
282 GError *err = NULL;
283 GstMapInfo map;
284 GSocket *socket = NULL;
285 GSocketControlMessage **messages = NULL;
286 gint num_messages = 0;
287 gint i;
288 GInputVector ivec;
289 gint flags = 0;
290
291 src = GST_SOCKET_SRC (psrc);
292
293 GST_OBJECT_LOCK (src);
294
295 if (src->socket)
296 socket = g_object_ref (src->socket);
297
298 GST_OBJECT_UNLOCK (src);
299
300 if (socket == NULL)
301 goto no_socket;
302
303 GST_LOG_OBJECT (src, "asked for a buffer");
304
305 retry:
306 gst_buffer_map (outbuf, &map, GST_MAP_READWRITE);
307 ivec.buffer = map.data;
308 ivec.size = map.size;
309 rret =
310 g_socket_receive_message (socket, NULL, &ivec, 1, &messages,
311 &num_messages, &flags, src->cancellable, &err);
312 gst_buffer_unmap (outbuf, &map);
313
314 for (i = 0; i < num_messages; i++) {
315 gst_buffer_add_net_control_message_meta (outbuf, messages[i]);
316 g_object_unref (messages[i]);
317 messages[i] = NULL;
318 }
319 g_free (messages);
320
321 if (rret == 0) {
322 GSocket *tmp = NULL;
323 GST_DEBUG_OBJECT (src, "Received EOS on socket %p fd %i", socket,
324 g_socket_get_fd (socket));
325
326 /* We've hit EOS but we'll send this signal to allow someone to change
327 * our socket before we send EOS downstream. */
328 g_signal_emit (src, gst_socket_src_signals[CONNECTION_CLOSED_BY_PEER], 0);
329
330 GST_OBJECT_LOCK (src);
331
332 if (src->socket)
333 tmp = g_object_ref (src->socket);
334
335 GST_OBJECT_UNLOCK (src);
336
337 /* Do this dance with tmp to avoid unreffing with the lock held */
338 if (tmp != NULL && tmp != socket) {
339 SWAP (socket, tmp);
340 g_clear_object (&tmp);
341
342 GST_INFO_OBJECT (src, "New socket available after EOS %p fd %i: Retrying",
343 socket, g_socket_get_fd (socket));
344
345 /* retry with our new socket: */
346 goto retry;
347 } else {
348 g_clear_object (&tmp);
349 GST_INFO_OBJECT (src, "Forwarding EOS downstream");
350 ret = GST_FLOW_EOS;
351 }
352 } else if (rret < 0) {
353 if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
354 ret = GST_FLOW_FLUSHING;
355 GST_DEBUG_OBJECT (src, "Cancelled reading from socket");
356 } else {
357 ret = GST_FLOW_ERROR;
358 GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
359 ("Failed to read from socket: %s", err->message));
360 }
361 } else {
362 ret = GST_FLOW_OK;
363 gst_buffer_resize (outbuf, 0, rret);
364
365 GST_LOG_OBJECT (src,
366 "Returning buffer from _get of size %" G_GSIZE_FORMAT ", ts %"
367 GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT
368 ", offset %" G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT,
369 gst_buffer_get_size (outbuf),
370 GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (outbuf)),
371 GST_TIME_ARGS (GST_BUFFER_DURATION (outbuf)),
372 GST_BUFFER_OFFSET (outbuf), GST_BUFFER_OFFSET_END (outbuf));
373 }
374 g_clear_error (&err);
375 g_clear_object (&socket);
376
377 return ret;
378
379 no_socket:
380 {
381 GST_ELEMENT_ERROR (src, RESOURCE, NOT_FOUND, (NULL),
382 ("Cannot receive: No socket set on socketsrc"));
383 return GST_FLOW_ERROR;
384 }
385 }
386
387 static void
gst_socket_src_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)388 gst_socket_src_set_property (GObject * object, guint prop_id,
389 const GValue * value, GParamSpec * pspec)
390 {
391 GstSocketSrc *socketsrc = GST_SOCKET_SRC (object);
392
393 switch (prop_id) {
394 case PROP_SOCKET:{
395 GSocket *socket = G_SOCKET (g_value_dup_object (value));
396 GST_OBJECT_LOCK (socketsrc);
397 SWAP (socket, socketsrc->socket);
398 GST_OBJECT_UNLOCK (socketsrc);
399 g_clear_object (&socket);
400 break;
401 }
402 case PROP_CAPS:
403 {
404 const GstCaps *new_caps_val = gst_value_get_caps (value);
405 GstCaps *new_caps;
406 GstCaps *old_caps;
407
408 if (new_caps_val == NULL) {
409 new_caps = gst_caps_new_any ();
410 } else {
411 new_caps = gst_caps_copy (new_caps_val);
412 }
413
414 GST_OBJECT_LOCK (socketsrc);
415 old_caps = socketsrc->caps;
416 socketsrc->caps = new_caps;
417 GST_OBJECT_UNLOCK (socketsrc);
418
419 if (old_caps)
420 gst_caps_unref (old_caps);
421
422 gst_pad_mark_reconfigure (GST_BASE_SRC_PAD (socketsrc));
423 break;
424 }
425 case PROP_SEND_MESSAGES:
426 socketsrc->send_messages = g_value_get_boolean (value);
427 break;
428 default:
429 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
430 break;
431 }
432 }
433
434 static void
gst_socket_src_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)435 gst_socket_src_get_property (GObject * object, guint prop_id,
436 GValue * value, GParamSpec * pspec)
437 {
438 GstSocketSrc *socketsrc = GST_SOCKET_SRC (object);
439
440 switch (prop_id) {
441 case PROP_SOCKET:
442 g_value_set_object (value, socketsrc->socket);
443 break;
444 case PROP_CAPS:
445 GST_OBJECT_LOCK (socketsrc);
446 gst_value_set_caps (value, socketsrc->caps);
447 GST_OBJECT_UNLOCK (socketsrc);
448 break;
449 case PROP_SEND_MESSAGES:
450 g_value_set_boolean (value, socketsrc->send_messages);
451 break;
452 default:
453 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
454 break;
455 }
456 }
457
458 static gboolean
gst_socket_src_unlock(GstBaseSrc * bsrc)459 gst_socket_src_unlock (GstBaseSrc * bsrc)
460 {
461 GstSocketSrc *src = GST_SOCKET_SRC (bsrc);
462
463 GST_DEBUG_OBJECT (src, "set to flushing");
464 g_cancellable_cancel (src->cancellable);
465
466 return TRUE;
467 }
468
469 static gboolean
gst_socket_src_unlock_stop(GstBaseSrc * bsrc)470 gst_socket_src_unlock_stop (GstBaseSrc * bsrc)
471 {
472 GstSocketSrc *src = GST_SOCKET_SRC (bsrc);
473
474 GST_DEBUG_OBJECT (src, "unset flushing");
475 g_cancellable_reset (src->cancellable);
476
477 return TRUE;
478 }
479