• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer
2  * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
3  * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
4  * Copyright (C) <2011> Collabora Ltd.
5  *     Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22 
23 /**
24  * SECTION:element-tcpserversrc
25  * @title: tcpserversrc
26  * @see_also: #tcpserversink
27  *
28  * ## Example launch line (server):
29  * |[
30  * gst-launch-1.0 tcpserversrc port=3000 ! fdsink fd=2
31  * ]|
32  * ## Example launch line (client):
33  * |[
34  * gst-launch-1.0 fdsrc fd=1 ! tcpclientsink port=3000
35  * ]|
36  *
37  */
38 
39 #ifdef HAVE_CONFIG_H
40 #include "config.h"
41 #endif
42 
43 #include <gst/gst-i18n-plugin.h>
44 #include "gsttcpelements.h"
45 #include "gsttcpsrcstats.h"
46 #include "gsttcpserversrc.h"
47 
48 GST_DEBUG_CATEGORY_STATIC (tcpserversrc_debug);
49 #define GST_CAT_DEFAULT tcpserversrc_debug
50 
51 #define TCP_DEFAULT_LISTEN_HOST         NULL    /* listen on all interfaces */
52 #define TCP_BACKLOG                     1       /* client connection queue */
53 
54 #define MAX_READ_SIZE                   4 * 1024
55 
56 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
57     GST_PAD_SRC,
58     GST_PAD_ALWAYS,
59     GST_STATIC_CAPS_ANY);
60 
61 enum
62 {
63   PROP_0,
64   PROP_HOST,
65   PROP_PORT,
66   PROP_CURRENT_PORT,
67   PROP_STATS,
68 };
69 
70 #define gst_tcp_server_src_parent_class parent_class
71 G_DEFINE_TYPE (GstTCPServerSrc, gst_tcp_server_src, GST_TYPE_PUSH_SRC);
72 GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (tcpserversrc, "tcpserversrc",
73     GST_RANK_NONE, GST_TYPE_TCP_SERVER_SRC, tcp_element_init (plugin));
74 
75 static void gst_tcp_server_src_finalize (GObject * gobject);
76 
77 static gboolean gst_tcp_server_src_start (GstBaseSrc * bsrc);
78 static gboolean gst_tcp_server_src_stop (GstBaseSrc * bsrc);
79 static gboolean gst_tcp_server_src_unlock (GstBaseSrc * bsrc);
80 static gboolean gst_tcp_server_src_unlock_stop (GstBaseSrc * bsrc);
81 static GstFlowReturn gst_tcp_server_src_create (GstPushSrc * psrc,
82     GstBuffer ** buf);
83 
84 static void gst_tcp_server_src_set_property (GObject * object, guint prop_id,
85     const GValue * value, GParamSpec * pspec);
86 static void gst_tcp_server_src_get_property (GObject * object, guint prop_id,
87     GValue * value, GParamSpec * pspec);
88 static GstStructure *gst_tcp_server_src_get_stats (GstTCPServerSrc * src);
89 
90 static void
gst_tcp_server_src_class_init(GstTCPServerSrcClass * klass)91 gst_tcp_server_src_class_init (GstTCPServerSrcClass * klass)
92 {
93   GObjectClass *gobject_class;
94   GstElementClass *gstelement_class;
95   GstBaseSrcClass *gstbasesrc_class;
96   GstPushSrcClass *gstpush_src_class;
97 
98   gobject_class = (GObjectClass *) klass;
99   gstelement_class = (GstElementClass *) klass;
100   gstbasesrc_class = (GstBaseSrcClass *) klass;
101   gstpush_src_class = (GstPushSrcClass *) klass;
102 
103   gobject_class->set_property = gst_tcp_server_src_set_property;
104   gobject_class->get_property = gst_tcp_server_src_get_property;
105   gobject_class->finalize = gst_tcp_server_src_finalize;
106 
107   /* FIXME 2.0: Rename this to bind-address, host does not make much
108    * sense here */
109   g_object_class_install_property (gobject_class, PROP_HOST,
110       g_param_spec_string ("host", "Host", "The hostname to listen as",
111           TCP_DEFAULT_LISTEN_HOST, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
112   g_object_class_install_property (gobject_class, PROP_PORT,
113       g_param_spec_int ("port", "Port",
114           "The port to listen to (0=random available port)",
115           0, TCP_HIGHEST_PORT, TCP_DEFAULT_PORT,
116           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
117   /**
118    * GstTCPServerSrc:current-port:
119    *
120    * The port number the socket is currently bound to. Applications can use
121    * this property to retrieve the port number actually bound to in case
122    * the port requested was 0 (=allocate a random available port).
123    *
124    * Since: 1.0.2
125    **/
126   g_object_class_install_property (gobject_class, PROP_CURRENT_PORT,
127       g_param_spec_int ("current-port", "current-port",
128           "The port number the socket is currently bound to", 0,
129           TCP_HIGHEST_PORT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
130 
131   /**
132    * GstTCPServerSrc::stats:
133    *
134    * Sends a GstStructure with statistics. We count bytes-received in a
135    * platform-independent way and the rest via the tcp_info struct, if it's
136    * available. The OS takes care of the TCP layer for us so we can't know it
137    * from here.
138    *
139    * Struct members:
140    *
141    * bytes-received (uint64): Total bytes received (platform-independent)
142    * reordering (uint): Amount of reordering (linux-specific)
143    * unacked (uint): Un-acked packets (linux-specific)
144    * sacked (uint): Selective acked packets (linux-specific)
145    * lost (uint): Lost packets (linux-specific)
146    * retrans (uint): Retransmits (linux-specific)
147    * fackets (uint): Forward acknowledgement (linux-specific)
148    *
149    * Since: 1.18
150    */
151   g_object_class_install_property (gobject_class, PROP_STATS,
152       g_param_spec_boxed ("stats", "Stats", "Retrieve a statistics structure",
153           GST_TYPE_STRUCTURE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
154 
155   gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
156 
157   gst_element_class_set_static_metadata (gstelement_class,
158       "TCP server source", "Source/Network",
159       "Receive data as a server over the network via TCP",
160       "Thomas Vander Stichele <thomas at apestaart dot org>");
161 
162   gstbasesrc_class->start = gst_tcp_server_src_start;
163   gstbasesrc_class->stop = gst_tcp_server_src_stop;
164   gstbasesrc_class->unlock = gst_tcp_server_src_unlock;
165   gstbasesrc_class->unlock_stop = gst_tcp_server_src_unlock_stop;
166 
167   gstpush_src_class->create = gst_tcp_server_src_create;
168 
169   GST_DEBUG_CATEGORY_INIT (tcpserversrc_debug, "tcpserversrc", 0,
170       "TCP Server Source");
171 }
172 
173 static void
gst_tcp_server_src_init(GstTCPServerSrc * src)174 gst_tcp_server_src_init (GstTCPServerSrc * src)
175 {
176   src->server_port = TCP_DEFAULT_PORT;
177   src->host = g_strdup (TCP_DEFAULT_HOST);
178   src->server_socket = NULL;
179   src->client_socket = NULL;
180   src->cancellable = g_cancellable_new ();
181 
182   GST_OBJECT_FLAG_UNSET (src, GST_TCP_SERVER_SRC_OPEN);
183 }
184 
185 static void
gst_tcp_server_src_finalize(GObject * gobject)186 gst_tcp_server_src_finalize (GObject * gobject)
187 {
188   GstTCPServerSrc *src = GST_TCP_SERVER_SRC (gobject);
189 
190   if (src->cancellable)
191     g_object_unref (src->cancellable);
192   src->cancellable = NULL;
193   if (src->server_socket)
194     g_object_unref (src->server_socket);
195   src->server_socket = NULL;
196   if (src->client_socket)
197     g_object_unref (src->client_socket);
198   src->client_socket = NULL;
199 
200   g_free (src->host);
201   src->host = NULL;
202 
203   gst_clear_structure (&src->stats);
204 
205   G_OBJECT_CLASS (parent_class)->finalize (gobject);
206 }
207 
208 static GstFlowReturn
gst_tcp_server_src_create(GstPushSrc * psrc,GstBuffer ** outbuf)209 gst_tcp_server_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
210 {
211   GstTCPServerSrc *src;
212   GstFlowReturn ret = GST_FLOW_OK;
213   gssize rret, avail;
214   gsize read;
215   GError *err = NULL;
216   GstMapInfo map;
217 
218   src = GST_TCP_SERVER_SRC (psrc);
219 
220   if (!GST_OBJECT_FLAG_IS_SET (src, GST_TCP_SERVER_SRC_OPEN))
221     goto wrong_state;
222 
223   if (!src->client_socket) {
224     /* wait on server socket for connections */
225     src->client_socket =
226         g_socket_accept (src->server_socket, src->cancellable, &err);
227     if (!src->client_socket)
228       goto accept_error;
229     GST_DEBUG_OBJECT (src, "closing server socket");
230 
231     if (!g_socket_close (src->server_socket, &err)) {
232       GST_ERROR_OBJECT (src, "Failed to close socket: %s", err->message);
233       g_clear_error (&err);
234     }
235     /* now read from the socket. */
236   }
237 
238   /* if we have a client, wait for read */
239   GST_LOG_OBJECT (src, "asked for a buffer");
240 
241   /* read the buffer header */
242   avail = g_socket_get_available_bytes (src->client_socket);
243   if (avail < 0) {
244     goto get_available_error;
245   } else if (avail == 0) {
246     GIOCondition condition;
247 
248     if (!g_socket_condition_wait (src->client_socket,
249             G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, src->cancellable, &err))
250       goto select_error;
251 
252     condition =
253         g_socket_condition_check (src->client_socket,
254         G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP);
255 
256     if ((condition & G_IO_ERR)) {
257       GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
258           ("Socket in error state"));
259       *outbuf = NULL;
260       ret = GST_FLOW_ERROR;
261       goto done;
262     } else if ((condition & G_IO_HUP)) {
263       GST_DEBUG_OBJECT (src, "Connection closed");
264       *outbuf = NULL;
265       ret = GST_FLOW_EOS;
266       goto done;
267     }
268     avail = g_socket_get_available_bytes (src->client_socket);
269     if (avail < 0)
270       goto get_available_error;
271   }
272 
273   if (avail > 0) {
274     read = MIN (avail, MAX_READ_SIZE);
275     *outbuf = gst_buffer_new_and_alloc (read);
276     gst_buffer_map (*outbuf, &map, GST_MAP_READWRITE);
277     rret =
278         g_socket_receive (src->client_socket, (gchar *) map.data, read,
279         src->cancellable, &err);
280   } else {
281     /* Connection closed */
282     rret = 0;
283     *outbuf = NULL;
284     read = 0;
285   }
286 
287   if (rret == 0) {
288     GST_DEBUG_OBJECT (src, "Connection closed");
289     ret = GST_FLOW_EOS;
290     if (*outbuf) {
291       gst_buffer_unmap (*outbuf, &map);
292       gst_buffer_unref (*outbuf);
293     }
294     *outbuf = NULL;
295   } else if (rret < 0) {
296     if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
297       ret = GST_FLOW_FLUSHING;
298       GST_DEBUG_OBJECT (src, "Cancelled reading from socket");
299     } else {
300       ret = GST_FLOW_ERROR;
301       GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
302           ("Failed to read from socket: %s", err->message));
303     }
304     gst_buffer_unmap (*outbuf, &map);
305     gst_buffer_unref (*outbuf);
306     *outbuf = NULL;
307   } else {
308     ret = GST_FLOW_OK;
309     gst_buffer_unmap (*outbuf, &map);
310     gst_buffer_resize (*outbuf, 0, rret);
311     src->bytes_received += read;
312 
313     GST_LOG_OBJECT (src,
314         "Returning buffer from _get of size %" G_GSIZE_FORMAT ", ts %"
315         GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT
316         ", offset %" G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT,
317         gst_buffer_get_size (*outbuf),
318         GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (*outbuf)),
319         GST_TIME_ARGS (GST_BUFFER_DURATION (*outbuf)),
320         GST_BUFFER_OFFSET (*outbuf), GST_BUFFER_OFFSET_END (*outbuf));
321   }
322   g_clear_error (&err);
323 
324 done:
325   return ret;
326 
327 wrong_state:
328   {
329     GST_DEBUG_OBJECT (src, "connection to closed, cannot read data");
330     return GST_FLOW_FLUSHING;
331   }
332 accept_error:
333   {
334     if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
335       GST_DEBUG_OBJECT (src, "Cancelled accepting of client");
336       ret = GST_FLOW_FLUSHING;
337     } else {
338       GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
339           ("Failed to accept client: %s", err->message));
340       ret = GST_FLOW_ERROR;
341     }
342     g_clear_error (&err);
343     return ret;
344   }
345 select_error:
346   {
347     if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
348       GST_DEBUG_OBJECT (src, "Cancelled select");
349       ret = GST_FLOW_FLUSHING;
350     } else {
351       GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
352           ("Select failed: %s", err->message));
353       ret = GST_FLOW_ERROR;
354     }
355     g_clear_error (&err);
356     return ret;
357   }
358 get_available_error:
359   {
360     GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
361         ("Failed to get available bytes from socket"));
362     return GST_FLOW_ERROR;
363   }
364 }
365 
366 static void
gst_tcp_server_src_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)367 gst_tcp_server_src_set_property (GObject * object, guint prop_id,
368     const GValue * value, GParamSpec * pspec)
369 {
370   GstTCPServerSrc *tcpserversrc = GST_TCP_SERVER_SRC (object);
371 
372   switch (prop_id) {
373     case PROP_HOST:
374       if (!g_value_get_string (value)) {
375         g_warning ("host property cannot be NULL");
376         break;
377       }
378       g_free (tcpserversrc->host);
379       tcpserversrc->host = g_value_dup_string (value);
380       break;
381     case PROP_PORT:
382       tcpserversrc->server_port = g_value_get_int (value);
383       break;
384 
385     default:
386       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
387       break;
388   }
389 }
390 
391 static void
gst_tcp_server_src_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)392 gst_tcp_server_src_get_property (GObject * object, guint prop_id,
393     GValue * value, GParamSpec * pspec)
394 {
395   GstTCPServerSrc *tcpserversrc = GST_TCP_SERVER_SRC (object);
396 
397   switch (prop_id) {
398     case PROP_HOST:
399       g_value_set_string (value, tcpserversrc->host);
400       break;
401     case PROP_PORT:
402       g_value_set_int (value, tcpserversrc->server_port);
403       break;
404     case PROP_CURRENT_PORT:
405       g_value_set_int (value, g_atomic_int_get (&tcpserversrc->current_port));
406       break;
407     case PROP_STATS:
408       g_value_take_boxed (value, gst_tcp_server_src_get_stats (tcpserversrc));
409       break;
410     default:
411       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
412       break;
413   }
414 }
415 
416 /* set up server */
417 static gboolean
gst_tcp_server_src_start(GstBaseSrc * bsrc)418 gst_tcp_server_src_start (GstBaseSrc * bsrc)
419 {
420   GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc);
421   GError *err = NULL;
422   GList *addrs;
423   GList *cur_addr;
424   GSocketAddress *saddr = NULL;
425   gint bound_port = 0;
426 
427   src->bytes_received = 0;
428   gst_clear_structure (&src->stats);
429 
430   addrs =
431       tcp_get_addresses (GST_ELEMENT (src), src->host, src->cancellable, &err);
432   if (!addrs)
433     goto name_resolve;
434 
435   /* iterate over addresses until one works */
436   cur_addr = addrs;
437   src->server_socket =
438       tcp_create_socket (GST_ELEMENT (src), &cur_addr, src->server_port,
439       &saddr, &err);
440   g_list_free_full (addrs, g_object_unref);
441 
442   if (!src->server_socket)
443     goto no_socket;
444 
445   GST_DEBUG_OBJECT (src, "opened receiving server socket");
446 
447   /* bind it */
448   GST_DEBUG_OBJECT (src, "binding server socket to address");
449   if (!g_socket_bind (src->server_socket, saddr, TRUE, &err))
450     goto bind_failed;
451 
452   g_object_unref (saddr);
453 
454   GST_DEBUG_OBJECT (src, "listening on server socket");
455 
456   g_socket_set_listen_backlog (src->server_socket, TCP_BACKLOG);
457 
458   if (!g_socket_listen (src->server_socket, &err))
459     goto listen_failed;
460 
461   GST_OBJECT_FLAG_SET (src, GST_TCP_SERVER_SRC_OPEN);
462 
463   if (src->server_port == 0) {
464     saddr = g_socket_get_local_address (src->server_socket, NULL);
465     bound_port = g_inet_socket_address_get_port ((GInetSocketAddress *) saddr);
466     g_object_unref (saddr);
467   } else {
468     bound_port = src->server_port;
469   }
470 
471   GST_DEBUG_OBJECT (src, "listening on port %d", bound_port);
472 
473   g_atomic_int_set (&src->current_port, bound_port);
474   g_object_notify (G_OBJECT (src), "current-port");
475 
476   return TRUE;
477 
478   /* ERRORS */
479 no_socket:
480   {
481     GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
482         ("Failed to create socket: %s", err->message));
483     g_clear_error (&err);
484     return FALSE;
485   }
486 name_resolve:
487   {
488     if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
489       GST_DEBUG_OBJECT (src, "Cancelled name resolution");
490     } else {
491       GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
492           ("Failed to resolve host '%s': %s", src->host, err->message));
493     }
494     g_clear_error (&err);
495     return FALSE;
496   }
497 bind_failed:
498   {
499     if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
500       GST_DEBUG_OBJECT (src, "Cancelled binding");
501     } else {
502       GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
503           ("Failed to bind on host '%s:%d': %s", src->host, src->server_port,
504               err->message));
505     }
506     g_clear_error (&err);
507     g_object_unref (saddr);
508     gst_tcp_server_src_stop (GST_BASE_SRC (src));
509     return FALSE;
510   }
511 listen_failed:
512   {
513     if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
514       GST_DEBUG_OBJECT (src, "Cancelled listening");
515     } else {
516       GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
517           ("Failed to listen on host '%s:%d': %s", src->host, src->server_port,
518               err->message));
519     }
520     g_clear_error (&err);
521     gst_tcp_server_src_stop (GST_BASE_SRC (src));
522     return FALSE;
523   }
524 }
525 
526 static gboolean
gst_tcp_server_src_stop(GstBaseSrc * bsrc)527 gst_tcp_server_src_stop (GstBaseSrc * bsrc)
528 {
529   GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc);
530   GError *err = NULL;
531 
532   if (src->client_socket) {
533     GST_DEBUG_OBJECT (src, "closing socket");
534 
535     src->stats = gst_tcp_server_src_get_stats (src);
536 
537     if (!g_socket_close (src->client_socket, &err)) {
538       GST_ERROR_OBJECT (src, "Failed to close socket: %s", err->message);
539       g_clear_error (&err);
540     }
541     g_object_unref (src->client_socket);
542     src->client_socket = NULL;
543   }
544 
545   if (src->server_socket) {
546     GST_DEBUG_OBJECT (src, "closing socket");
547 
548     if (!g_socket_close (src->server_socket, &err)) {
549       GST_ERROR_OBJECT (src, "Failed to close socket: %s", err->message);
550       g_clear_error (&err);
551     }
552     g_object_unref (src->server_socket);
553     src->server_socket = NULL;
554 
555     g_atomic_int_set (&src->current_port, 0);
556     g_object_notify (G_OBJECT (src), "current-port");
557   }
558 
559   GST_OBJECT_FLAG_UNSET (src, GST_TCP_SERVER_SRC_OPEN);
560 
561   return TRUE;
562 }
563 
564 /* will be called only between calls to start() and stop() */
565 static gboolean
gst_tcp_server_src_unlock(GstBaseSrc * bsrc)566 gst_tcp_server_src_unlock (GstBaseSrc * bsrc)
567 {
568   GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc);
569 
570   g_cancellable_cancel (src->cancellable);
571 
572   return TRUE;
573 }
574 
575 static gboolean
gst_tcp_server_src_unlock_stop(GstBaseSrc * bsrc)576 gst_tcp_server_src_unlock_stop (GstBaseSrc * bsrc)
577 {
578   GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc);
579 
580   g_object_unref (src->cancellable);
581   src->cancellable = g_cancellable_new ();
582 
583   return TRUE;
584 }
585 
586 
587 static GstStructure *
gst_tcp_server_src_get_stats(GstTCPServerSrc * src)588 gst_tcp_server_src_get_stats (GstTCPServerSrc * src)
589 {
590   GstStructure *s;
591 
592   /* we can't get the values post stop so just return the saved ones */
593   if (src->stats)
594     return gst_structure_copy (src->stats);
595 
596   s = gst_structure_new ("GstTCPServerSrcStats",
597       "bytes-received", G_TYPE_UINT64, src->bytes_received, NULL);
598 
599   gst_tcp_stats_from_socket (s, src->client_socket);
600 
601   return s;
602 }
603