1 /* GStreamer
2 * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
3 * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
4 * Copyright (C) <2011> Collabora Ltd.
5 * Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
6 *
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Library General Public
9 * License as published by the Free Software Foundation; either
10 * version 2 of the License, or (at your option) any later version.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Library General Public License for more details.
16 *
17 * You should have received a copy of the GNU Library General Public
18 * License along with this library; if not, write to the
19 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20 * Boston, MA 02110-1301, USA.
21 */
22
23 /**
24 * SECTION:element-tcpclientsrc
25 * @title: tcpclientsrc
26 * @see_also: #tcpclientsink
27 *
28 * ## Example launch line (server):
29 * |[
30 * nc -l -p 3000
31 * ]|
32 * ## Example launch line (client):
33 * |[
34 * gst-launch-1.0 tcpclientsrc port=3000 ! fdsink fd=2
35 * ]|
 * Everything you type in the server is shown on the client.
 * If you want to detect network failures and/or limit the time your TCP
 * client keeps waiting for data from the server, setting a timeout value can
 * be useful.
39 *
40 */
41
42 #ifdef HAVE_CONFIG_H
43 #include "config.h"
44 #endif
45
46 #include <gst/gst-i18n-plugin.h>
47 #include "gsttcpelements.h"
48 #include "gsttcpclientsrc.h"
49 #include "gsttcpsrcstats.h"
50
/* Debug category private to this file; it is initialised in
 * gst_tcp_client_src_class_init() via GST_DEBUG_CATEGORY_INIT. */
GST_DEBUG_CATEGORY_STATIC (tcpclientsrc_debug);
/* Category picked up by the GST_*_OBJECT logging calls in this file. */
#define GST_CAT_DEFAULT tcpclientsrc_debug
53
/* Upper bound, in bytes, on the size of a single buffer produced by create().
 * Parenthesized so the macro behaves as one value in any expression
 * (e.g. `x / MAX_READ_SIZE`). */
#define MAX_READ_SIZE (4 * 1024)
/* Default socket I/O timeout in seconds; 0 means "no timeout". */
#define TCP_DEFAULT_TIMEOUT 0
56
57
/* Template for the element's single "src" pad: a source pad that is always
 * present and advertises ANY caps. */
static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
    GST_PAD_SRC,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS_ANY);
62
63
/* GObject property identifiers used by set_property()/get_property().
 * PROP_0 is the unused placeholder id. */
enum
{
  PROP_0 = 0,
  PROP_HOST = 1,                /* "host" */
  PROP_PORT = 2,                /* "port" */
  PROP_TIMEOUT = 3,             /* "timeout" */
  PROP_STATS = 4,               /* "stats" (read-only) */
};
72
/* Lets the rest of the file refer to the parent class pointer as
 * `parent_class` (used when chaining up in finalize). */
#define gst_tcp_client_src_parent_class parent_class
/* Define the GstTCPClientSrc type as a subclass of GstPushSrc. */
G_DEFINE_TYPE (GstTCPClientSrc, gst_tcp_client_src, GST_TYPE_PUSH_SRC);
/* Element registration for "tcpclientsrc" with rank NONE;
 * tcp_element_init (plugin) is passed as the extra registration code. */
GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (tcpclientsrc, "tcpclientsrc",
    GST_RANK_NONE, GST_TYPE_TCP_CLIENT_SRC, tcp_element_init (plugin));
77
78 static void gst_tcp_client_src_finalize (GObject * gobject);
79
80 static GstCaps *gst_tcp_client_src_getcaps (GstBaseSrc * psrc,
81 GstCaps * filter);
82
83 static GstFlowReturn gst_tcp_client_src_create (GstPushSrc * psrc,
84 GstBuffer ** outbuf);
85 static gboolean gst_tcp_client_src_stop (GstBaseSrc * bsrc);
86 static gboolean gst_tcp_client_src_start (GstBaseSrc * bsrc);
87 static gboolean gst_tcp_client_src_unlock (GstBaseSrc * bsrc);
88 static gboolean gst_tcp_client_src_unlock_stop (GstBaseSrc * bsrc);
89
90 static void gst_tcp_client_src_set_property (GObject * object, guint prop_id,
91 const GValue * value, GParamSpec * pspec);
92 static void gst_tcp_client_src_get_property (GObject * object, guint prop_id,
93 GValue * value, GParamSpec * pspec);
94 static GstStructure *gst_tcp_client_src_get_stats (GstTCPClientSrc * src);
95
96 static void
gst_tcp_client_src_class_init(GstTCPClientSrcClass * klass)97 gst_tcp_client_src_class_init (GstTCPClientSrcClass * klass)
98 {
99 GObjectClass *gobject_class;
100 GstElementClass *gstelement_class;
101 GstBaseSrcClass *gstbasesrc_class;
102 GstPushSrcClass *gstpush_src_class;
103
104 gobject_class = (GObjectClass *) klass;
105 gstelement_class = (GstElementClass *) klass;
106 gstbasesrc_class = (GstBaseSrcClass *) klass;
107 gstpush_src_class = (GstPushSrcClass *) klass;
108
109 gobject_class->set_property = gst_tcp_client_src_set_property;
110 gobject_class->get_property = gst_tcp_client_src_get_property;
111 gobject_class->finalize = gst_tcp_client_src_finalize;
112
113 g_object_class_install_property (gobject_class, PROP_HOST,
114 g_param_spec_string ("host", "Host",
115 "The host IP address to receive packets from", TCP_DEFAULT_HOST,
116 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
117 g_object_class_install_property (gobject_class, PROP_PORT,
118 g_param_spec_int ("port", "Port", "The port to receive packets from", 0,
119 TCP_HIGHEST_PORT, TCP_DEFAULT_PORT,
120 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
121
122 /**
123 * GstTCPClientSrc::timeout;
124 *
125 * Value in seconds to timeout a blocking I/O (0 = No timeout).
126 *
127 * Since: 1.12
128 */
129 g_object_class_install_property (gobject_class, PROP_TIMEOUT,
130 g_param_spec_uint ("timeout", "timeout",
131 "Value in seconds to timeout a blocking I/O. 0 = No timeout. ", 0,
132 G_MAXUINT, TCP_DEFAULT_TIMEOUT,
133 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
134
135 /**
136 * GstTCPClientSrc::stats:
137 *
138 * Sends a GstStructure with statistics. We count bytes-received in a
139 * platform-independent way and the rest via the tcp_info struct, if it's
140 * available. The OS takes care of the TCP layer for us so we can't know it
141 * from here.
142 *
143 * Struct members:
144 *
145 * bytes-received (uint64): Total bytes received (platform-independent)
146 * reordering (uint): Amount of reordering (linux-specific)
147 * unacked (uint): Un-acked packets (linux-specific)
148 * sacked (uint): Selective acked packets (linux-specific)
149 * lost (uint): Lost packets (linux-specific)
150 * retrans (uint): Retransmits (linux-specific)
151 * fackets (uint): Forward acknowledgement (linux-specific)
152 *
153 * Since: 1.18
154 */
155 g_object_class_install_property (gobject_class, PROP_STATS,
156 g_param_spec_boxed ("stats", "Stats", "Retrieve a statistics structure",
157 GST_TYPE_STRUCTURE, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
158
159 gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
160
161 gst_element_class_set_static_metadata (gstelement_class,
162 "TCP client source", "Source/Network",
163 "Receive data as a client over the network via TCP",
164 "Thomas Vander Stichele <thomas at apestaart dot org>");
165
166 gstbasesrc_class->get_caps = gst_tcp_client_src_getcaps;
167 gstbasesrc_class->start = gst_tcp_client_src_start;
168 gstbasesrc_class->stop = gst_tcp_client_src_stop;
169 gstbasesrc_class->unlock = gst_tcp_client_src_unlock;
170 gstbasesrc_class->unlock_stop = gst_tcp_client_src_unlock_stop;
171
172 gstpush_src_class->create = gst_tcp_client_src_create;
173
174 GST_DEBUG_CATEGORY_INIT (tcpclientsrc_debug, "tcpclientsrc", 0,
175 "TCP Client Source");
176 }
177
178 static void
gst_tcp_client_src_init(GstTCPClientSrc * this)179 gst_tcp_client_src_init (GstTCPClientSrc * this)
180 {
181 this->port = TCP_DEFAULT_PORT;
182 this->host = g_strdup (TCP_DEFAULT_HOST);
183 this->timeout = TCP_DEFAULT_TIMEOUT;
184 this->socket = NULL;
185 this->cancellable = g_cancellable_new ();
186
187 GST_OBJECT_FLAG_UNSET (this, GST_TCP_CLIENT_SRC_OPEN);
188 }
189
190 static void
gst_tcp_client_src_finalize(GObject * gobject)191 gst_tcp_client_src_finalize (GObject * gobject)
192 {
193 GstTCPClientSrc *this = GST_TCP_CLIENT_SRC (gobject);
194
195 if (this->cancellable)
196 g_object_unref (this->cancellable);
197 this->cancellable = NULL;
198 if (this->socket)
199 g_object_unref (this->socket);
200 this->socket = NULL;
201 g_free (this->host);
202 this->host = NULL;
203 gst_clear_structure (&this->stats);
204
205 G_OBJECT_CLASS (parent_class)->finalize (gobject);
206 }
207
208 static GstCaps *
gst_tcp_client_src_getcaps(GstBaseSrc * bsrc,GstCaps * filter)209 gst_tcp_client_src_getcaps (GstBaseSrc * bsrc, GstCaps * filter)
210 {
211 GstTCPClientSrc *src;
212 GstCaps *caps = NULL;
213
214 src = GST_TCP_CLIENT_SRC (bsrc);
215
216 caps = (filter ? gst_caps_ref (filter) : gst_caps_new_any ());
217
218 GST_DEBUG_OBJECT (src, "returning caps %" GST_PTR_FORMAT, caps);
219 g_assert (GST_IS_CAPS (caps));
220 return caps;
221 }
222
223 static GstFlowReturn
gst_tcp_client_src_create(GstPushSrc * psrc,GstBuffer ** outbuf)224 gst_tcp_client_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
225 {
226 GstTCPClientSrc *src;
227 GstFlowReturn ret = GST_FLOW_OK;
228 gssize rret;
229 GError *err = NULL;
230 GstMapInfo map;
231 gssize avail, read;
232
233 src = GST_TCP_CLIENT_SRC (psrc);
234
235 if (!GST_OBJECT_FLAG_IS_SET (src, GST_TCP_CLIENT_SRC_OPEN))
236 goto wrong_state;
237
238 GST_LOG_OBJECT (src, "asked for a buffer");
239
240 /* read the buffer header */
241 avail = g_socket_get_available_bytes (src->socket);
242 if (avail < 0) {
243 goto get_available_error;
244 } else if (avail == 0) {
245 GIOCondition condition;
246
247 if (!g_socket_condition_wait (src->socket,
248 G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP, src->cancellable, &err))
249 goto select_error;
250
251 condition =
252 g_socket_condition_check (src->socket,
253 G_IO_IN | G_IO_PRI | G_IO_ERR | G_IO_HUP);
254
255 if ((condition & G_IO_ERR)) {
256 GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
257 ("Socket in error state"));
258 *outbuf = NULL;
259 ret = GST_FLOW_ERROR;
260 goto done;
261 } else if ((condition & G_IO_HUP)) {
262 GST_DEBUG_OBJECT (src, "Connection closed");
263 *outbuf = NULL;
264 ret = GST_FLOW_EOS;
265 goto done;
266 }
267 avail = g_socket_get_available_bytes (src->socket);
268 if (avail < 0)
269 goto get_available_error;
270 }
271
272 if (avail > 0) {
273 read = MIN (avail, MAX_READ_SIZE);
274 *outbuf = gst_buffer_new_and_alloc (read);
275 gst_buffer_map (*outbuf, &map, GST_MAP_READWRITE);
276 rret =
277 g_socket_receive (src->socket, (gchar *) map.data, read,
278 src->cancellable, &err);
279 } else {
280 /* Connection closed */
281 *outbuf = NULL;
282 read = 0;
283 rret = 0;
284 }
285
286 if (rret == 0) {
287 GST_DEBUG_OBJECT (src, "Connection closed");
288 ret = GST_FLOW_EOS;
289 if (*outbuf) {
290 gst_buffer_unmap (*outbuf, &map);
291 gst_buffer_unref (*outbuf);
292 }
293 *outbuf = NULL;
294 } else if (rret < 0) {
295 if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
296 ret = GST_FLOW_FLUSHING;
297 GST_DEBUG_OBJECT (src, "Cancelled reading from socket");
298 } else {
299 ret = GST_FLOW_ERROR;
300 GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
301 ("Failed to read from socket: %s", err->message));
302 }
303 gst_buffer_unmap (*outbuf, &map);
304 gst_buffer_unref (*outbuf);
305 *outbuf = NULL;
306 } else {
307 ret = GST_FLOW_OK;
308 gst_buffer_unmap (*outbuf, &map);
309 gst_buffer_resize (*outbuf, 0, rret);
310 src->bytes_received += read;
311
312 GST_LOG_OBJECT (src,
313 "Returning buffer from _get of size %" G_GSIZE_FORMAT ", ts %"
314 GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT
315 ", offset %" G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT,
316 gst_buffer_get_size (*outbuf),
317 GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (*outbuf)),
318 GST_TIME_ARGS (GST_BUFFER_DURATION (*outbuf)),
319 GST_BUFFER_OFFSET (*outbuf), GST_BUFFER_OFFSET_END (*outbuf));
320 }
321 g_clear_error (&err);
322
323 done:
324 return ret;
325
326 select_error:
327 {
328 if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
329 GST_DEBUG_OBJECT (src, "Cancelled");
330 ret = GST_FLOW_FLUSHING;
331 } else {
332 GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
333 ("Select failed: %s", err->message));
334 ret = GST_FLOW_ERROR;
335 }
336 g_clear_error (&err);
337 return ret;
338 }
339 get_available_error:
340 {
341 GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
342 ("Failed to get available bytes from socket"));
343 return GST_FLOW_ERROR;
344 }
345 wrong_state:
346 {
347 GST_DEBUG_OBJECT (src, "connection to closed, cannot read data");
348 return GST_FLOW_FLUSHING;
349 }
350 }
351
352 static void
gst_tcp_client_src_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)353 gst_tcp_client_src_set_property (GObject * object, guint prop_id,
354 const GValue * value, GParamSpec * pspec)
355 {
356 GstTCPClientSrc *tcpclientsrc = GST_TCP_CLIENT_SRC (object);
357
358 switch (prop_id) {
359 case PROP_HOST:
360 if (!g_value_get_string (value)) {
361 g_warning ("host property cannot be NULL");
362 break;
363 }
364 g_free (tcpclientsrc->host);
365 tcpclientsrc->host = g_value_dup_string (value);
366 break;
367 case PROP_PORT:
368 tcpclientsrc->port = g_value_get_int (value);
369 break;
370 case PROP_TIMEOUT:
371 tcpclientsrc->timeout = g_value_get_uint (value);
372 break;
373
374 default:
375 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
376 break;
377 }
378 }
379
380 static void
gst_tcp_client_src_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)381 gst_tcp_client_src_get_property (GObject * object, guint prop_id,
382 GValue * value, GParamSpec * pspec)
383 {
384 GstTCPClientSrc *tcpclientsrc = GST_TCP_CLIENT_SRC (object);
385
386 switch (prop_id) {
387 case PROP_HOST:
388 g_value_set_string (value, tcpclientsrc->host);
389 break;
390 case PROP_PORT:
391 g_value_set_int (value, tcpclientsrc->port);
392 break;
393 case PROP_TIMEOUT:
394 g_value_set_uint (value, tcpclientsrc->timeout);
395 break;
396 case PROP_STATS:
397 g_value_take_boxed (value, gst_tcp_client_src_get_stats (tcpclientsrc));
398 break;
399 default:
400 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
401 break;
402 }
403 }
404
405 /* create a socket for connecting to remote server */
406 static gboolean
gst_tcp_client_src_start(GstBaseSrc * bsrc)407 gst_tcp_client_src_start (GstBaseSrc * bsrc)
408 {
409 GstTCPClientSrc *src = GST_TCP_CLIENT_SRC (bsrc);
410 GError *err = NULL;
411 GList *addrs;
412 GList *cur_addr;
413 GSocketAddress *saddr;
414
415 src->bytes_received = 0;
416 gst_clear_structure (&src->stats);
417
418 addrs =
419 tcp_get_addresses (GST_ELEMENT (src), src->host, src->cancellable, &err);
420 if (!addrs)
421 goto name_resolve;
422
423 /* create receiving client socket */
424 GST_DEBUG_OBJECT (src, "opening receiving client socket to %s:%d",
425 src->host, src->port);
426
427 cur_addr = addrs;
428 while (cur_addr) {
429 /* clean up from possible previous iterations */
430 g_clear_error (&err);
431 g_clear_object (&src->socket);
432
433 /* iterate over addresses until one works */
434 src->socket =
435 tcp_create_socket (GST_ELEMENT (src), &cur_addr, src->port, &saddr,
436 &err);
437 if (!src->socket)
438 goto no_socket;
439
440 g_socket_set_timeout (src->socket, src->timeout);
441
442 GST_DEBUG_OBJECT (src, "opened receiving client socket");
443
444 /* connect to server */
445 if (g_socket_connect (src->socket, saddr, src->cancellable, &err))
446 break;
447
448 /* failed to connect, release and try next address... */
449 g_clear_object (&saddr);
450 if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED))
451 goto connect_failed;
452 }
453
454 /* final connect attempt failed */
455 if (err)
456 goto connect_failed;
457
458 GST_DEBUG_OBJECT (src, "connected to %s:%d", src->host, src->port);
459 g_list_free_full (g_steal_pointer (&addrs), g_object_unref);
460 g_clear_object (&saddr);
461
462 GST_OBJECT_FLAG_SET (src, GST_TCP_CLIENT_SRC_OPEN);
463
464 return TRUE;
465
466 name_resolve:
467 {
468 if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
469 GST_DEBUG_OBJECT (src, "Cancelled name resolution");
470 } else {
471 GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
472 ("Failed to resolve host '%s': %s", src->host, err->message));
473 }
474 g_clear_error (&err);
475 return FALSE;
476 }
477 no_socket:
478 {
479 g_list_free_full (g_steal_pointer (&addrs), g_object_unref);
480 GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
481 ("Failed to create socket: %s", err->message));
482 g_clear_error (&err);
483 return FALSE;
484 }
485 connect_failed:
486 {
487 g_list_free_full (g_steal_pointer (&addrs), g_object_unref);
488 if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
489 GST_DEBUG_OBJECT (src, "Cancelled connecting");
490 } else {
491 GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
492 ("Failed to connect to host '%s:%d': %s", src->host, src->port,
493 err->message));
494 }
495 g_clear_error (&err);
496 /* pretend we opened ok for proper cleanup to happen */
497 GST_OBJECT_FLAG_SET (src, GST_TCP_CLIENT_SRC_OPEN);
498 gst_tcp_client_src_stop (GST_BASE_SRC (src));
499 return FALSE;
500 }
501 }
502
503 /* close the socket and associated resources
504 * unset OPEN flag
505 * used both to recover from errors and go to NULL state */
506 static gboolean
gst_tcp_client_src_stop(GstBaseSrc * bsrc)507 gst_tcp_client_src_stop (GstBaseSrc * bsrc)
508 {
509 GstTCPClientSrc *src;
510 GError *err = NULL;
511
512 src = GST_TCP_CLIENT_SRC (bsrc);
513
514 if (src->socket) {
515 GST_DEBUG_OBJECT (src, "closing socket");
516
517 src->stats = gst_tcp_client_src_get_stats (src);
518
519 if (!g_socket_close (src->socket, &err)) {
520 GST_ERROR_OBJECT (src, "Failed to close socket: %s", err->message);
521 g_clear_error (&err);
522 }
523 g_object_unref (src->socket);
524 src->socket = NULL;
525 }
526
527 GST_OBJECT_FLAG_UNSET (src, GST_TCP_CLIENT_SRC_OPEN);
528
529 return TRUE;
530 }
531
532 /* will be called only between calls to start() and stop() */
533 static gboolean
gst_tcp_client_src_unlock(GstBaseSrc * bsrc)534 gst_tcp_client_src_unlock (GstBaseSrc * bsrc)
535 {
536 GstTCPClientSrc *src = GST_TCP_CLIENT_SRC (bsrc);
537
538 GST_DEBUG_OBJECT (src, "set to flushing");
539 g_cancellable_cancel (src->cancellable);
540
541 return TRUE;
542 }
543
544 /* will be called only between calls to start() and stop() */
545 static gboolean
gst_tcp_client_src_unlock_stop(GstBaseSrc * bsrc)546 gst_tcp_client_src_unlock_stop (GstBaseSrc * bsrc)
547 {
548 GstTCPClientSrc *src = GST_TCP_CLIENT_SRC (bsrc);
549
550 GST_DEBUG_OBJECT (src, "unset flushing");
551 g_object_unref (src->cancellable);
552 src->cancellable = g_cancellable_new ();
553
554 return TRUE;
555 }
556
557 static GstStructure *
gst_tcp_client_src_get_stats(GstTCPClientSrc * src)558 gst_tcp_client_src_get_stats (GstTCPClientSrc * src)
559 {
560 GstStructure *s;
561
562 /* we can't get the values post stop so just return the saved ones */
563 if (src->stats)
564 return gst_structure_copy (src->stats);
565
566 s = gst_structure_new ("GstTCPClientSrcStats",
567 "bytes-received", G_TYPE_UINT64, src->bytes_received, NULL);
568
569 gst_tcp_stats_from_socket (s, src->socket);
570
571 return s;
572 }
573