• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer
2  * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
3  * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
4  * Copyright (C) <2011> Collabora Ltd.
5  *     Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22 
23 /**
24  * SECTION:element-tcpserversrc
25  * @title: tcpserversrc
26  * @see_also: #tcpserversink
27  *
28  * ## Example launch line (server):
29  * |[
30  * gst-launch-1.0 tcpserversrc port=3000 ! fdsink fd=2
31  * ]|
32  * ## Example launch line (client):
33  * |[
34  * gst-launch-1.0 fdsrc fd=1 ! tcpclientsink port=3000
35  * ]|
36  *
37  */
38 
39 #ifdef HAVE_CONFIG_H
40 #include "config.h"
41 #endif
42 
43 #include <gst/gst-i18n-plugin.h>
44 #include "gsttcp.h"
45 #include "gsttcpserversrc.h"
46 
47 GST_DEBUG_CATEGORY_STATIC (tcpserversrc_debug);
48 #define GST_CAT_DEFAULT tcpserversrc_debug
49 
50 #define TCP_DEFAULT_LISTEN_HOST         NULL    /* listen on all interfaces */
51 #define TCP_BACKLOG                     1       /* client connection queue */
52 
53 #define MAX_READ_SIZE                   4 * 1024
54 
55 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
56     GST_PAD_SRC,
57     GST_PAD_ALWAYS,
58     GST_STATIC_CAPS_ANY);
59 
60 enum
61 {
62   PROP_0,
63   PROP_HOST,
64   PROP_PORT,
65   PROP_CURRENT_PORT
66 };
67 
68 #define gst_tcp_server_src_parent_class parent_class
69 G_DEFINE_TYPE (GstTCPServerSrc, gst_tcp_server_src, GST_TYPE_PUSH_SRC);
70 
71 static void gst_tcp_server_src_finalize (GObject * gobject);
72 
73 static gboolean gst_tcp_server_src_start (GstBaseSrc * bsrc);
74 static gboolean gst_tcp_server_src_stop (GstBaseSrc * bsrc);
75 static gboolean gst_tcp_server_src_unlock (GstBaseSrc * bsrc);
76 static gboolean gst_tcp_server_src_unlock_stop (GstBaseSrc * bsrc);
77 static GstFlowReturn gst_tcp_server_src_create (GstPushSrc * psrc,
78     GstBuffer ** buf);
79 
80 static void gst_tcp_server_src_set_property (GObject * object, guint prop_id,
81     const GValue * value, GParamSpec * pspec);
82 static void gst_tcp_server_src_get_property (GObject * object, guint prop_id,
83     GValue * value, GParamSpec * pspec);
84 
85 static void
gst_tcp_server_src_class_init(GstTCPServerSrcClass * klass)86 gst_tcp_server_src_class_init (GstTCPServerSrcClass * klass)
87 {
88   GObjectClass *gobject_class;
89   GstElementClass *gstelement_class;
90   GstBaseSrcClass *gstbasesrc_class;
91   GstPushSrcClass *gstpush_src_class;
92 
93   gobject_class = (GObjectClass *) klass;
94   gstelement_class = (GstElementClass *) klass;
95   gstbasesrc_class = (GstBaseSrcClass *) klass;
96   gstpush_src_class = (GstPushSrcClass *) klass;
97 
98   gobject_class->set_property = gst_tcp_server_src_set_property;
99   gobject_class->get_property = gst_tcp_server_src_get_property;
100   gobject_class->finalize = gst_tcp_server_src_finalize;
101 
102   /* FIXME 2.0: Rename this to bind-address, host does not make much
103    * sense here */
104   g_object_class_install_property (gobject_class, PROP_HOST,
105       g_param_spec_string ("host", "Host", "The hostname to listen as",
106           TCP_DEFAULT_LISTEN_HOST, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
107   g_object_class_install_property (gobject_class, PROP_PORT,
108       g_param_spec_int ("port", "Port",
109           "The port to listen to (0=random available port)",
110           0, TCP_HIGHEST_PORT, TCP_DEFAULT_PORT,
111           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
112   /**
113    * GstTCPServerSrc:current-port:
114    *
115    * The port number the socket is currently bound to. Applications can use
116    * this property to retrieve the port number actually bound to in case
117    * the port requested was 0 (=allocate a random available port).
118    *
119    * Since: 1.0.2
120    **/
121   g_object_class_install_property (gobject_class, PROP_CURRENT_PORT,
122       g_param_spec_int ("current-port", "current-port",
123           "The port number the socket is currently bound to", 0,
124           TCP_HIGHEST_PORT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
125 
126   gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
127 
128   gst_element_class_set_static_metadata (gstelement_class,
129       "TCP server source", "Source/Network",
130       "Receive data as a server over the network via TCP",
131       "Thomas Vander Stichele <thomas at apestaart dot org>");
132 
133   gstbasesrc_class->start = gst_tcp_server_src_start;
134   gstbasesrc_class->stop = gst_tcp_server_src_stop;
135   gstbasesrc_class->unlock = gst_tcp_server_src_unlock;
136   gstbasesrc_class->unlock_stop = gst_tcp_server_src_unlock_stop;
137 
138   gstpush_src_class->create = gst_tcp_server_src_create;
139 
140   GST_DEBUG_CATEGORY_INIT (tcpserversrc_debug, "tcpserversrc", 0,
141       "TCP Server Source");
142 }
143 
144 static void
gst_tcp_server_src_init(GstTCPServerSrc * src)145 gst_tcp_server_src_init (GstTCPServerSrc * src)
146 {
147   src->server_port = TCP_DEFAULT_PORT;
148   src->host = g_strdup (TCP_DEFAULT_HOST);
149   src->server_socket = NULL;
150   src->client_socket = NULL;
151   src->cancellable = g_cancellable_new ();
152 
153   GST_OBJECT_FLAG_UNSET (src, GST_TCP_SERVER_SRC_OPEN);
154 }
155 
156 static void
gst_tcp_server_src_finalize(GObject * gobject)157 gst_tcp_server_src_finalize (GObject * gobject)
158 {
159   GstTCPServerSrc *src = GST_TCP_SERVER_SRC (gobject);
160 
161   if (src->cancellable)
162     g_object_unref (src->cancellable);
163   src->cancellable = NULL;
164   if (src->server_socket)
165     g_object_unref (src->server_socket);
166   src->server_socket = NULL;
167   if (src->client_socket)
168     g_object_unref (src->client_socket);
169   src->client_socket = NULL;
170 
171   g_free (src->host);
172   src->host = NULL;
173 
174   G_OBJECT_CLASS (parent_class)->finalize (gobject);
175 }
176 
177 static GstFlowReturn
gst_tcp_server_src_create(GstPushSrc * psrc,GstBuffer ** outbuf)178 gst_tcp_server_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
179 {
180   GstTCPServerSrc *src;
181   GstFlowReturn ret = GST_FLOW_OK;
182   gssize rret, avail;
183   gsize read;
184   GError *err = NULL;
185   GstMapInfo map;
186 
187   src = GST_TCP_SERVER_SRC (psrc);
188 
189   if (!GST_OBJECT_FLAG_IS_SET (src, GST_TCP_SERVER_SRC_OPEN))
190     goto wrong_state;
191 
192   if (!src->client_socket) {
193     /* wait on server socket for connections */
194     src->client_socket =
195         g_socket_accept (src->server_socket, src->cancellable, &err);
196     if (!src->client_socket)
197       goto accept_error;
198     GST_DEBUG_OBJECT (src, "closing server socket");
199 
200     if (!g_socket_close (src->server_socket, &err)) {
201       GST_ERROR_OBJECT (src, "Failed to close socket: %s", err->message);
202       g_clear_error (&err);
203     }
204     /* now read from the socket. */
205   }
206 
207   /* if we have a client, wait for read */
208   GST_LOG_OBJECT (src, "asked for a buffer");
209 
210   /* read the buffer header */
211   avail = g_socket_get_available_bytes (src->client_socket);
212   if (avail < 0) {
213     goto get_available_error;
214   } else if (avail == 0) {
215     GIOCondition condition;
216 
217     if (!g_socket_condition_wait (src->client_socket,
218             G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, src->cancellable, &err))
219       goto select_error;
220 
221     condition =
222         g_socket_condition_check (src->client_socket,
223         G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP);
224 
225     if ((condition & G_IO_ERR)) {
226       GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
227           ("Socket in error state"));
228       *outbuf = NULL;
229       ret = GST_FLOW_ERROR;
230       goto done;
231     } else if ((condition & G_IO_HUP)) {
232       GST_DEBUG_OBJECT (src, "Connection closed");
233       *outbuf = NULL;
234       ret = GST_FLOW_EOS;
235       goto done;
236     }
237     avail = g_socket_get_available_bytes (src->client_socket);
238     if (avail < 0)
239       goto get_available_error;
240   }
241 
242   if (avail > 0) {
243     read = MIN (avail, MAX_READ_SIZE);
244     *outbuf = gst_buffer_new_and_alloc (read);
245     gst_buffer_map (*outbuf, &map, GST_MAP_READWRITE);
246     rret =
247         g_socket_receive (src->client_socket, (gchar *) map.data, read,
248         src->cancellable, &err);
249   } else {
250     /* Connection closed */
251     rret = 0;
252     *outbuf = NULL;
253     read = 0;
254   }
255 
256   if (rret == 0) {
257     GST_DEBUG_OBJECT (src, "Connection closed");
258     ret = GST_FLOW_EOS;
259     if (*outbuf) {
260       gst_buffer_unmap (*outbuf, &map);
261       gst_buffer_unref (*outbuf);
262     }
263     *outbuf = NULL;
264   } else if (rret < 0) {
265     if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
266       ret = GST_FLOW_FLUSHING;
267       GST_DEBUG_OBJECT (src, "Cancelled reading from socket");
268     } else {
269       ret = GST_FLOW_ERROR;
270       GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
271           ("Failed to read from socket: %s", err->message));
272     }
273     gst_buffer_unmap (*outbuf, &map);
274     gst_buffer_unref (*outbuf);
275     *outbuf = NULL;
276   } else {
277     ret = GST_FLOW_OK;
278     gst_buffer_unmap (*outbuf, &map);
279     gst_buffer_resize (*outbuf, 0, rret);
280 
281     GST_LOG_OBJECT (src,
282         "Returning buffer from _get of size %" G_GSIZE_FORMAT ", ts %"
283         GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT
284         ", offset %" G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT,
285         gst_buffer_get_size (*outbuf),
286         GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (*outbuf)),
287         GST_TIME_ARGS (GST_BUFFER_DURATION (*outbuf)),
288         GST_BUFFER_OFFSET (*outbuf), GST_BUFFER_OFFSET_END (*outbuf));
289   }
290   g_clear_error (&err);
291 
292 done:
293   return ret;
294 
295 wrong_state:
296   {
297     GST_DEBUG_OBJECT (src, "connection to closed, cannot read data");
298     return GST_FLOW_FLUSHING;
299   }
300 accept_error:
301   {
302     if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
303       GST_DEBUG_OBJECT (src, "Cancelled accepting of client");
304       ret = GST_FLOW_FLUSHING;
305     } else {
306       GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
307           ("Failed to accept client: %s", err->message));
308       ret = GST_FLOW_ERROR;
309     }
310     g_clear_error (&err);
311     return ret;
312   }
313 select_error:
314   {
315     if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
316       GST_DEBUG_OBJECT (src, "Cancelled select");
317       ret = GST_FLOW_FLUSHING;
318     } else {
319       GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
320           ("Select failed: %s", err->message));
321       ret = GST_FLOW_ERROR;
322     }
323     g_clear_error (&err);
324     return ret;
325   }
326 get_available_error:
327   {
328     GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
329         ("Failed to get available bytes from socket"));
330     return GST_FLOW_ERROR;
331   }
332 }
333 
334 static void
gst_tcp_server_src_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)335 gst_tcp_server_src_set_property (GObject * object, guint prop_id,
336     const GValue * value, GParamSpec * pspec)
337 {
338   GstTCPServerSrc *tcpserversrc = GST_TCP_SERVER_SRC (object);
339 
340   switch (prop_id) {
341     case PROP_HOST:
342       if (!g_value_get_string (value)) {
343         g_warning ("host property cannot be NULL");
344         break;
345       }
346       g_free (tcpserversrc->host);
347       tcpserversrc->host = g_strdup (g_value_get_string (value));
348       break;
349     case PROP_PORT:
350       tcpserversrc->server_port = g_value_get_int (value);
351       break;
352 
353     default:
354       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
355       break;
356   }
357 }
358 
359 static void
gst_tcp_server_src_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)360 gst_tcp_server_src_get_property (GObject * object, guint prop_id,
361     GValue * value, GParamSpec * pspec)
362 {
363   GstTCPServerSrc *tcpserversrc = GST_TCP_SERVER_SRC (object);
364 
365   switch (prop_id) {
366     case PROP_HOST:
367       g_value_set_string (value, tcpserversrc->host);
368       break;
369     case PROP_PORT:
370       g_value_set_int (value, tcpserversrc->server_port);
371       break;
372     case PROP_CURRENT_PORT:
373       g_value_set_int (value, g_atomic_int_get (&tcpserversrc->current_port));
374       break;
375     default:
376       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
377       break;
378   }
379 }
380 
381 /* set up server */
382 static gboolean
gst_tcp_server_src_start(GstBaseSrc * bsrc)383 gst_tcp_server_src_start (GstBaseSrc * bsrc)
384 {
385   GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc);
386   GError *err = NULL;
387   GInetAddress *addr;
388   GSocketAddress *saddr;
389   GResolver *resolver;
390   gint bound_port = 0;
391 
392   /* look up name if we need to */
393   addr = g_inet_address_new_from_string (src->host);
394   if (!addr) {
395     GList *results;
396 
397     resolver = g_resolver_get_default ();
398 
399     results =
400         g_resolver_lookup_by_name (resolver, src->host, src->cancellable, &err);
401     if (!results)
402       goto name_resolve;
403     addr = G_INET_ADDRESS (g_object_ref (results->data));
404 
405     g_resolver_free_addresses (results);
406     g_object_unref (resolver);
407   }
408 #ifndef GST_DISABLE_GST_DEBUG
409   {
410     gchar *ip = g_inet_address_to_string (addr);
411 
412     GST_DEBUG_OBJECT (src, "IP address for host %s is %s", src->host, ip);
413     g_free (ip);
414   }
415 #endif
416 
417   saddr = g_inet_socket_address_new (addr, src->server_port);
418   g_object_unref (addr);
419 
420   /* create the server listener socket */
421   src->server_socket =
422       g_socket_new (g_socket_address_get_family (saddr), G_SOCKET_TYPE_STREAM,
423       G_SOCKET_PROTOCOL_TCP, &err);
424   if (!src->server_socket)
425     goto no_socket;
426 
427   GST_DEBUG_OBJECT (src, "opened receiving server socket");
428 
429   /* bind it */
430   GST_DEBUG_OBJECT (src, "binding server socket to address");
431   if (!g_socket_bind (src->server_socket, saddr, TRUE, &err))
432     goto bind_failed;
433 
434   g_object_unref (saddr);
435 
436   GST_DEBUG_OBJECT (src, "listening on server socket");
437 
438   g_socket_set_listen_backlog (src->server_socket, TCP_BACKLOG);
439 
440   if (!g_socket_listen (src->server_socket, &err))
441     goto listen_failed;
442 
443   GST_OBJECT_FLAG_SET (src, GST_TCP_SERVER_SRC_OPEN);
444 
445   if (src->server_port == 0) {
446     saddr = g_socket_get_local_address (src->server_socket, NULL);
447     bound_port = g_inet_socket_address_get_port ((GInetSocketAddress *) saddr);
448     g_object_unref (saddr);
449   } else {
450     bound_port = src->server_port;
451   }
452 
453   GST_DEBUG_OBJECT (src, "listening on port %d", bound_port);
454 
455   g_atomic_int_set (&src->current_port, bound_port);
456   g_object_notify (G_OBJECT (src), "current-port");
457 
458   return TRUE;
459 
460   /* ERRORS */
461 no_socket:
462   {
463     GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
464         ("Failed to create socket: %s", err->message));
465     g_clear_error (&err);
466     g_object_unref (saddr);
467     return FALSE;
468   }
469 name_resolve:
470   {
471     if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
472       GST_DEBUG_OBJECT (src, "Cancelled name resolval");
473     } else {
474       GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
475           ("Failed to resolve host '%s': %s", src->host, err->message));
476     }
477     g_clear_error (&err);
478     g_object_unref (resolver);
479     return FALSE;
480   }
481 bind_failed:
482   {
483     if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
484       GST_DEBUG_OBJECT (src, "Cancelled binding");
485     } else {
486       GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
487           ("Failed to bind on host '%s:%d': %s", src->host, src->server_port,
488               err->message));
489     }
490     g_clear_error (&err);
491     g_object_unref (saddr);
492     gst_tcp_server_src_stop (GST_BASE_SRC (src));
493     return FALSE;
494   }
495 listen_failed:
496   {
497     if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
498       GST_DEBUG_OBJECT (src, "Cancelled listening");
499     } else {
500       GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
501           ("Failed to listen on host '%s:%d': %s", src->host, src->server_port,
502               err->message));
503     }
504     g_clear_error (&err);
505     gst_tcp_server_src_stop (GST_BASE_SRC (src));
506     return FALSE;
507   }
508 }
509 
510 static gboolean
gst_tcp_server_src_stop(GstBaseSrc * bsrc)511 gst_tcp_server_src_stop (GstBaseSrc * bsrc)
512 {
513   GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc);
514   GError *err = NULL;
515 
516   if (src->client_socket) {
517     GST_DEBUG_OBJECT (src, "closing socket");
518 
519     if (!g_socket_close (src->client_socket, &err)) {
520       GST_ERROR_OBJECT (src, "Failed to close socket: %s", err->message);
521       g_clear_error (&err);
522     }
523     g_object_unref (src->client_socket);
524     src->client_socket = NULL;
525   }
526 
527   if (src->server_socket) {
528     GST_DEBUG_OBJECT (src, "closing socket");
529 
530     if (!g_socket_close (src->server_socket, &err)) {
531       GST_ERROR_OBJECT (src, "Failed to close socket: %s", err->message);
532       g_clear_error (&err);
533     }
534     g_object_unref (src->server_socket);
535     src->server_socket = NULL;
536 
537     g_atomic_int_set (&src->current_port, 0);
538     g_object_notify (G_OBJECT (src), "current-port");
539   }
540 
541   GST_OBJECT_FLAG_UNSET (src, GST_TCP_SERVER_SRC_OPEN);
542 
543   return TRUE;
544 }
545 
546 /* will be called only between calls to start() and stop() */
547 static gboolean
gst_tcp_server_src_unlock(GstBaseSrc * bsrc)548 gst_tcp_server_src_unlock (GstBaseSrc * bsrc)
549 {
550   GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc);
551 
552   g_cancellable_cancel (src->cancellable);
553 
554   return TRUE;
555 }
556 
557 static gboolean
gst_tcp_server_src_unlock_stop(GstBaseSrc * bsrc)558 gst_tcp_server_src_unlock_stop (GstBaseSrc * bsrc)
559 {
560   GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc);
561 
562   g_object_unref (src->cancellable);
563   src->cancellable = g_cancellable_new ();
564 
565   return TRUE;
566 }
567