1 /* GStreamer
2 * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
3 * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
4 * Copyright (C) <2011> Collabora Ltd.
5 * Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
6 *
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Library General Public
9 * License as published by the Free Software Foundation; either
10 * version 2 of the License, or (at your option) any later version.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Library General Public License for more details.
16 *
17 * You should have received a copy of the GNU Library General Public
18 * License along with this library; if not, write to the
19 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20 * Boston, MA 02110-1301, USA.
21 */
22
23 /**
24 * SECTION:element-tcpserversrc
25 * @title: tcpserversrc
26 * @see_also: #tcpserversink
27 *
28 * ## Example launch line (server):
29 * |[
30 * gst-launch-1.0 tcpserversrc port=3000 ! fdsink fd=2
31 * ]|
32 * ## Example launch line (client):
33 * |[
34 * gst-launch-1.0 fdsrc fd=1 ! tcpclientsink port=3000
35 * ]|
36 *
37 */
38
39 #ifdef HAVE_CONFIG_H
40 #include "config.h"
41 #endif
42
43 #include <gst/gst-i18n-plugin.h>
44 #include "gsttcp.h"
45 #include "gsttcpserversrc.h"
46
/* Debug category for this element; GST_CAT_DEFAULT routes the GST_*_OBJECT
 * logging macros used in this file to it. The category itself is
 * initialised in the class initialiser. */
GST_DEBUG_CATEGORY_STATIC (tcpserversrc_debug);
#define GST_CAT_DEFAULT tcpserversrc_debug

/* Default value advertised by the "host" property's GParamSpec.
 * NOTE(review): the instance initialiser assigns TCP_DEFAULT_HOST to
 * src->host rather than this value - confirm which default is intended. */
#define TCP_DEFAULT_LISTEN_HOST NULL /* listen on all interfaces */
/* Backlog handed to g_socket_set_listen_backlog() on the server socket. */
#define TCP_BACKLOG 1 /* client connection queue */
52
/* Upper bound, in bytes, on the size of a single buffer read from the client
 * socket. Parenthesised so the macro expands safely inside any expression
 * (e.g. `x / MAX_READ_SIZE`). */
#define MAX_READ_SIZE (4 * 1024)
54
/* Static template for the always-present "src" pad; it advertises ANY caps.
 * Registered on the element class in the class initialiser. */
static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
    GST_PAD_SRC,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS_ANY);
59
/* GObject property identifiers used by set_property()/get_property(). */
enum
{
  PROP_0,                       /* placeholder; no property is installed with this id */
  PROP_HOST,                    /* "host": read/write string */
  PROP_PORT,                    /* "port": read/write int, 0 requests a random port */
  PROP_CURRENT_PORT             /* "current-port": read-only int */
};
67
/* Alias so that the parent-class pointer can be referred to as
 * `parent_class` in this file (used when chaining up in finalize).
 * GstTCPServerSrc is registered as a subclass of GstPushSrc. */
#define gst_tcp_server_src_parent_class parent_class
G_DEFINE_TYPE (GstTCPServerSrc, gst_tcp_server_src, GST_TYPE_PUSH_SRC);

/* GObject vfunc: releases sockets, cancellable and host string. */
static void gst_tcp_server_src_finalize (GObject * gobject);

/* GstBaseSrc vfuncs: socket set-up / tear-down and cancellation handling. */
static gboolean gst_tcp_server_src_start (GstBaseSrc * bsrc);
static gboolean gst_tcp_server_src_stop (GstBaseSrc * bsrc);
static gboolean gst_tcp_server_src_unlock (GstBaseSrc * bsrc);
static gboolean gst_tcp_server_src_unlock_stop (GstBaseSrc * bsrc);
/* GstPushSrc vfunc: accepts a client if needed and reads one buffer. */
static GstFlowReturn gst_tcp_server_src_create (GstPushSrc * psrc,
    GstBuffer ** buf);

/* GObject property accessors for "host", "port" and "current-port". */
static void gst_tcp_server_src_set_property (GObject * object, guint prop_id,
    const GValue * value, GParamSpec * pspec);
static void gst_tcp_server_src_get_property (GObject * object, guint prop_id,
    GValue * value, GParamSpec * pspec);
84
85 static void
gst_tcp_server_src_class_init(GstTCPServerSrcClass * klass)86 gst_tcp_server_src_class_init (GstTCPServerSrcClass * klass)
87 {
88 GObjectClass *gobject_class;
89 GstElementClass *gstelement_class;
90 GstBaseSrcClass *gstbasesrc_class;
91 GstPushSrcClass *gstpush_src_class;
92
93 gobject_class = (GObjectClass *) klass;
94 gstelement_class = (GstElementClass *) klass;
95 gstbasesrc_class = (GstBaseSrcClass *) klass;
96 gstpush_src_class = (GstPushSrcClass *) klass;
97
98 gobject_class->set_property = gst_tcp_server_src_set_property;
99 gobject_class->get_property = gst_tcp_server_src_get_property;
100 gobject_class->finalize = gst_tcp_server_src_finalize;
101
102 /* FIXME 2.0: Rename this to bind-address, host does not make much
103 * sense here */
104 g_object_class_install_property (gobject_class, PROP_HOST,
105 g_param_spec_string ("host", "Host", "The hostname to listen as",
106 TCP_DEFAULT_LISTEN_HOST, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
107 g_object_class_install_property (gobject_class, PROP_PORT,
108 g_param_spec_int ("port", "Port",
109 "The port to listen to (0=random available port)",
110 0, TCP_HIGHEST_PORT, TCP_DEFAULT_PORT,
111 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
112 /**
113 * GstTCPServerSrc:current-port:
114 *
115 * The port number the socket is currently bound to. Applications can use
116 * this property to retrieve the port number actually bound to in case
117 * the port requested was 0 (=allocate a random available port).
118 *
119 * Since: 1.0.2
120 **/
121 g_object_class_install_property (gobject_class, PROP_CURRENT_PORT,
122 g_param_spec_int ("current-port", "current-port",
123 "The port number the socket is currently bound to", 0,
124 TCP_HIGHEST_PORT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
125
126 gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
127
128 gst_element_class_set_static_metadata (gstelement_class,
129 "TCP server source", "Source/Network",
130 "Receive data as a server over the network via TCP",
131 "Thomas Vander Stichele <thomas at apestaart dot org>");
132
133 gstbasesrc_class->start = gst_tcp_server_src_start;
134 gstbasesrc_class->stop = gst_tcp_server_src_stop;
135 gstbasesrc_class->unlock = gst_tcp_server_src_unlock;
136 gstbasesrc_class->unlock_stop = gst_tcp_server_src_unlock_stop;
137
138 gstpush_src_class->create = gst_tcp_server_src_create;
139
140 GST_DEBUG_CATEGORY_INIT (tcpserversrc_debug, "tcpserversrc", 0,
141 "TCP Server Source");
142 }
143
144 static void
gst_tcp_server_src_init(GstTCPServerSrc * src)145 gst_tcp_server_src_init (GstTCPServerSrc * src)
146 {
147 src->server_port = TCP_DEFAULT_PORT;
148 src->host = g_strdup (TCP_DEFAULT_HOST);
149 src->server_socket = NULL;
150 src->client_socket = NULL;
151 src->cancellable = g_cancellable_new ();
152
153 GST_OBJECT_FLAG_UNSET (src, GST_TCP_SERVER_SRC_OPEN);
154 }
155
156 static void
gst_tcp_server_src_finalize(GObject * gobject)157 gst_tcp_server_src_finalize (GObject * gobject)
158 {
159 GstTCPServerSrc *src = GST_TCP_SERVER_SRC (gobject);
160
161 if (src->cancellable)
162 g_object_unref (src->cancellable);
163 src->cancellable = NULL;
164 if (src->server_socket)
165 g_object_unref (src->server_socket);
166 src->server_socket = NULL;
167 if (src->client_socket)
168 g_object_unref (src->client_socket);
169 src->client_socket = NULL;
170
171 g_free (src->host);
172 src->host = NULL;
173
174 G_OBJECT_CLASS (parent_class)->finalize (gobject);
175 }
176
177 static GstFlowReturn
gst_tcp_server_src_create(GstPushSrc * psrc,GstBuffer ** outbuf)178 gst_tcp_server_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
179 {
180 GstTCPServerSrc *src;
181 GstFlowReturn ret = GST_FLOW_OK;
182 gssize rret, avail;
183 gsize read;
184 GError *err = NULL;
185 GstMapInfo map;
186
187 src = GST_TCP_SERVER_SRC (psrc);
188
189 if (!GST_OBJECT_FLAG_IS_SET (src, GST_TCP_SERVER_SRC_OPEN))
190 goto wrong_state;
191
192 if (!src->client_socket) {
193 /* wait on server socket for connections */
194 src->client_socket =
195 g_socket_accept (src->server_socket, src->cancellable, &err);
196 if (!src->client_socket)
197 goto accept_error;
198 GST_DEBUG_OBJECT (src, "closing server socket");
199
200 if (!g_socket_close (src->server_socket, &err)) {
201 GST_ERROR_OBJECT (src, "Failed to close socket: %s", err->message);
202 g_clear_error (&err);
203 }
204 /* now read from the socket. */
205 }
206
207 /* if we have a client, wait for read */
208 GST_LOG_OBJECT (src, "asked for a buffer");
209
210 /* read the buffer header */
211 avail = g_socket_get_available_bytes (src->client_socket);
212 if (avail < 0) {
213 goto get_available_error;
214 } else if (avail == 0) {
215 GIOCondition condition;
216
217 if (!g_socket_condition_wait (src->client_socket,
218 G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, src->cancellable, &err))
219 goto select_error;
220
221 condition =
222 g_socket_condition_check (src->client_socket,
223 G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP);
224
225 if ((condition & G_IO_ERR)) {
226 GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
227 ("Socket in error state"));
228 *outbuf = NULL;
229 ret = GST_FLOW_ERROR;
230 goto done;
231 } else if ((condition & G_IO_HUP)) {
232 GST_DEBUG_OBJECT (src, "Connection closed");
233 *outbuf = NULL;
234 ret = GST_FLOW_EOS;
235 goto done;
236 }
237 avail = g_socket_get_available_bytes (src->client_socket);
238 if (avail < 0)
239 goto get_available_error;
240 }
241
242 if (avail > 0) {
243 read = MIN (avail, MAX_READ_SIZE);
244 *outbuf = gst_buffer_new_and_alloc (read);
245 gst_buffer_map (*outbuf, &map, GST_MAP_READWRITE);
246 rret =
247 g_socket_receive (src->client_socket, (gchar *) map.data, read,
248 src->cancellable, &err);
249 } else {
250 /* Connection closed */
251 rret = 0;
252 *outbuf = NULL;
253 read = 0;
254 }
255
256 if (rret == 0) {
257 GST_DEBUG_OBJECT (src, "Connection closed");
258 ret = GST_FLOW_EOS;
259 if (*outbuf) {
260 gst_buffer_unmap (*outbuf, &map);
261 gst_buffer_unref (*outbuf);
262 }
263 *outbuf = NULL;
264 } else if (rret < 0) {
265 if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
266 ret = GST_FLOW_FLUSHING;
267 GST_DEBUG_OBJECT (src, "Cancelled reading from socket");
268 } else {
269 ret = GST_FLOW_ERROR;
270 GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
271 ("Failed to read from socket: %s", err->message));
272 }
273 gst_buffer_unmap (*outbuf, &map);
274 gst_buffer_unref (*outbuf);
275 *outbuf = NULL;
276 } else {
277 ret = GST_FLOW_OK;
278 gst_buffer_unmap (*outbuf, &map);
279 gst_buffer_resize (*outbuf, 0, rret);
280
281 GST_LOG_OBJECT (src,
282 "Returning buffer from _get of size %" G_GSIZE_FORMAT ", ts %"
283 GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT
284 ", offset %" G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT,
285 gst_buffer_get_size (*outbuf),
286 GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (*outbuf)),
287 GST_TIME_ARGS (GST_BUFFER_DURATION (*outbuf)),
288 GST_BUFFER_OFFSET (*outbuf), GST_BUFFER_OFFSET_END (*outbuf));
289 }
290 g_clear_error (&err);
291
292 done:
293 return ret;
294
295 wrong_state:
296 {
297 GST_DEBUG_OBJECT (src, "connection to closed, cannot read data");
298 return GST_FLOW_FLUSHING;
299 }
300 accept_error:
301 {
302 if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
303 GST_DEBUG_OBJECT (src, "Cancelled accepting of client");
304 ret = GST_FLOW_FLUSHING;
305 } else {
306 GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
307 ("Failed to accept client: %s", err->message));
308 ret = GST_FLOW_ERROR;
309 }
310 g_clear_error (&err);
311 return ret;
312 }
313 select_error:
314 {
315 if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
316 GST_DEBUG_OBJECT (src, "Cancelled select");
317 ret = GST_FLOW_FLUSHING;
318 } else {
319 GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
320 ("Select failed: %s", err->message));
321 ret = GST_FLOW_ERROR;
322 }
323 g_clear_error (&err);
324 return ret;
325 }
326 get_available_error:
327 {
328 GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
329 ("Failed to get available bytes from socket"));
330 return GST_FLOW_ERROR;
331 }
332 }
333
334 static void
gst_tcp_server_src_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)335 gst_tcp_server_src_set_property (GObject * object, guint prop_id,
336 const GValue * value, GParamSpec * pspec)
337 {
338 GstTCPServerSrc *tcpserversrc = GST_TCP_SERVER_SRC (object);
339
340 switch (prop_id) {
341 case PROP_HOST:
342 if (!g_value_get_string (value)) {
343 g_warning ("host property cannot be NULL");
344 break;
345 }
346 g_free (tcpserversrc->host);
347 tcpserversrc->host = g_strdup (g_value_get_string (value));
348 break;
349 case PROP_PORT:
350 tcpserversrc->server_port = g_value_get_int (value);
351 break;
352
353 default:
354 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
355 break;
356 }
357 }
358
359 static void
gst_tcp_server_src_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)360 gst_tcp_server_src_get_property (GObject * object, guint prop_id,
361 GValue * value, GParamSpec * pspec)
362 {
363 GstTCPServerSrc *tcpserversrc = GST_TCP_SERVER_SRC (object);
364
365 switch (prop_id) {
366 case PROP_HOST:
367 g_value_set_string (value, tcpserversrc->host);
368 break;
369 case PROP_PORT:
370 g_value_set_int (value, tcpserversrc->server_port);
371 break;
372 case PROP_CURRENT_PORT:
373 g_value_set_int (value, g_atomic_int_get (&tcpserversrc->current_port));
374 break;
375 default:
376 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
377 break;
378 }
379 }
380
381 /* set up server */
382 static gboolean
gst_tcp_server_src_start(GstBaseSrc * bsrc)383 gst_tcp_server_src_start (GstBaseSrc * bsrc)
384 {
385 GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc);
386 GError *err = NULL;
387 GInetAddress *addr;
388 GSocketAddress *saddr;
389 GResolver *resolver;
390 gint bound_port = 0;
391
392 /* look up name if we need to */
393 addr = g_inet_address_new_from_string (src->host);
394 if (!addr) {
395 GList *results;
396
397 resolver = g_resolver_get_default ();
398
399 results =
400 g_resolver_lookup_by_name (resolver, src->host, src->cancellable, &err);
401 if (!results)
402 goto name_resolve;
403 addr = G_INET_ADDRESS (g_object_ref (results->data));
404
405 g_resolver_free_addresses (results);
406 g_object_unref (resolver);
407 }
408 #ifndef GST_DISABLE_GST_DEBUG
409 {
410 gchar *ip = g_inet_address_to_string (addr);
411
412 GST_DEBUG_OBJECT (src, "IP address for host %s is %s", src->host, ip);
413 g_free (ip);
414 }
415 #endif
416
417 saddr = g_inet_socket_address_new (addr, src->server_port);
418 g_object_unref (addr);
419
420 /* create the server listener socket */
421 src->server_socket =
422 g_socket_new (g_socket_address_get_family (saddr), G_SOCKET_TYPE_STREAM,
423 G_SOCKET_PROTOCOL_TCP, &err);
424 if (!src->server_socket)
425 goto no_socket;
426
427 GST_DEBUG_OBJECT (src, "opened receiving server socket");
428
429 /* bind it */
430 GST_DEBUG_OBJECT (src, "binding server socket to address");
431 if (!g_socket_bind (src->server_socket, saddr, TRUE, &err))
432 goto bind_failed;
433
434 g_object_unref (saddr);
435
436 GST_DEBUG_OBJECT (src, "listening on server socket");
437
438 g_socket_set_listen_backlog (src->server_socket, TCP_BACKLOG);
439
440 if (!g_socket_listen (src->server_socket, &err))
441 goto listen_failed;
442
443 GST_OBJECT_FLAG_SET (src, GST_TCP_SERVER_SRC_OPEN);
444
445 if (src->server_port == 0) {
446 saddr = g_socket_get_local_address (src->server_socket, NULL);
447 bound_port = g_inet_socket_address_get_port ((GInetSocketAddress *) saddr);
448 g_object_unref (saddr);
449 } else {
450 bound_port = src->server_port;
451 }
452
453 GST_DEBUG_OBJECT (src, "listening on port %d", bound_port);
454
455 g_atomic_int_set (&src->current_port, bound_port);
456 g_object_notify (G_OBJECT (src), "current-port");
457
458 return TRUE;
459
460 /* ERRORS */
461 no_socket:
462 {
463 GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
464 ("Failed to create socket: %s", err->message));
465 g_clear_error (&err);
466 g_object_unref (saddr);
467 return FALSE;
468 }
469 name_resolve:
470 {
471 if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
472 GST_DEBUG_OBJECT (src, "Cancelled name resolval");
473 } else {
474 GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
475 ("Failed to resolve host '%s': %s", src->host, err->message));
476 }
477 g_clear_error (&err);
478 g_object_unref (resolver);
479 return FALSE;
480 }
481 bind_failed:
482 {
483 if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
484 GST_DEBUG_OBJECT (src, "Cancelled binding");
485 } else {
486 GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
487 ("Failed to bind on host '%s:%d': %s", src->host, src->server_port,
488 err->message));
489 }
490 g_clear_error (&err);
491 g_object_unref (saddr);
492 gst_tcp_server_src_stop (GST_BASE_SRC (src));
493 return FALSE;
494 }
495 listen_failed:
496 {
497 if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
498 GST_DEBUG_OBJECT (src, "Cancelled listening");
499 } else {
500 GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
501 ("Failed to listen on host '%s:%d': %s", src->host, src->server_port,
502 err->message));
503 }
504 g_clear_error (&err);
505 gst_tcp_server_src_stop (GST_BASE_SRC (src));
506 return FALSE;
507 }
508 }
509
510 static gboolean
gst_tcp_server_src_stop(GstBaseSrc * bsrc)511 gst_tcp_server_src_stop (GstBaseSrc * bsrc)
512 {
513 GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc);
514 GError *err = NULL;
515
516 if (src->client_socket) {
517 GST_DEBUG_OBJECT (src, "closing socket");
518
519 if (!g_socket_close (src->client_socket, &err)) {
520 GST_ERROR_OBJECT (src, "Failed to close socket: %s", err->message);
521 g_clear_error (&err);
522 }
523 g_object_unref (src->client_socket);
524 src->client_socket = NULL;
525 }
526
527 if (src->server_socket) {
528 GST_DEBUG_OBJECT (src, "closing socket");
529
530 if (!g_socket_close (src->server_socket, &err)) {
531 GST_ERROR_OBJECT (src, "Failed to close socket: %s", err->message);
532 g_clear_error (&err);
533 }
534 g_object_unref (src->server_socket);
535 src->server_socket = NULL;
536
537 g_atomic_int_set (&src->current_port, 0);
538 g_object_notify (G_OBJECT (src), "current-port");
539 }
540
541 GST_OBJECT_FLAG_UNSET (src, GST_TCP_SERVER_SRC_OPEN);
542
543 return TRUE;
544 }
545
546 /* will be called only between calls to start() and stop() */
547 static gboolean
gst_tcp_server_src_unlock(GstBaseSrc * bsrc)548 gst_tcp_server_src_unlock (GstBaseSrc * bsrc)
549 {
550 GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc);
551
552 g_cancellable_cancel (src->cancellable);
553
554 return TRUE;
555 }
556
557 static gboolean
gst_tcp_server_src_unlock_stop(GstBaseSrc * bsrc)558 gst_tcp_server_src_unlock_stop (GstBaseSrc * bsrc)
559 {
560 GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc);
561
562 g_object_unref (src->cancellable);
563 src->cancellable = g_cancellable_new ();
564
565 return TRUE;
566 }
567