1 /* GStreamer
2 * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
3 * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
4 * Copyright (C) <2011> Collabora Ltd.
5 * Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
6 *
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Library General Public
9 * License as published by the Free Software Foundation; either
10 * version 2 of the License, or (at your option) any later version.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Library General Public License for more details.
16 *
17 * You should have received a copy of the GNU Library General Public
18 * License along with this library; if not, write to the
19 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20 * Boston, MA 02110-1301, USA.
21 */
22
23 /**
24 * SECTION:element-tcpclientsink
25 * @title: tcpclientsink
26 * @see_also: #tcpclientsink
27 *
28 * ## Example launch line (server):
29 * |[
30 * nc -l -p 3000
31 * ]|
32 * ## Example launch line (client):
33 * |[
34 * gst-launch-1.0 fdsink fd=1 ! tcpclientsink port=3000
35 * ]|
36 * everything you type in the client is shown on the server (fd=1 means
37 * standard input which is the command line input file descriptor)
38 *
39 */
40
41 #ifdef HAVE_CONFIG_H
42 #include "config.h"
43 #endif
44 #include <gst/gst-i18n-plugin.h>
45
46 #include "gsttcpelements.h"
47 #include "gsttcpclientsink.h"
48
49 /* TCPClientSink signals and args */
50 enum
51 {
52 FRAME_ENCODED,
53 /* FILL ME */
54 LAST_SIGNAL
55 };
56
57 GST_DEBUG_CATEGORY_STATIC (tcpclientsink_debug);
58 #define GST_CAT_DEFAULT (tcpclientsink_debug)
59
60 enum
61 {
62 PROP_0,
63 PROP_HOST,
64 PROP_PORT
65 };
66
67 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
68 GST_PAD_SINK,
69 GST_PAD_ALWAYS,
70 GST_STATIC_CAPS_ANY);
71
72 static void gst_tcp_client_sink_finalize (GObject * gobject);
73
74 static gboolean gst_tcp_client_sink_setcaps (GstBaseSink * bsink,
75 GstCaps * caps);
76 static GstFlowReturn gst_tcp_client_sink_render (GstBaseSink * bsink,
77 GstBuffer * buf);
78 static gboolean gst_tcp_client_sink_start (GstBaseSink * bsink);
79 static gboolean gst_tcp_client_sink_stop (GstBaseSink * bsink);
80 static gboolean gst_tcp_client_sink_unlock (GstBaseSink * bsink);
81 static gboolean gst_tcp_client_sink_unlock_stop (GstBaseSink * bsink);
82
83 static void gst_tcp_client_sink_set_property (GObject * object, guint prop_id,
84 const GValue * value, GParamSpec * pspec);
85 static void gst_tcp_client_sink_get_property (GObject * object, guint prop_id,
86 GValue * value, GParamSpec * pspec);
87
88
89 /*static guint gst_tcp_client_sink_signals[LAST_SIGNAL] = { 0 }; */
90
91 #define gst_tcp_client_sink_parent_class parent_class
92 G_DEFINE_TYPE (GstTCPClientSink, gst_tcp_client_sink, GST_TYPE_BASE_SINK);
93 GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (tcpclientsink, "tcpclientsink",
94 GST_RANK_NONE, GST_TYPE_TCP_CLIENT_SINK, tcp_element_init (plugin));
95
96 static void
gst_tcp_client_sink_class_init(GstTCPClientSinkClass * klass)97 gst_tcp_client_sink_class_init (GstTCPClientSinkClass * klass)
98 {
99 GObjectClass *gobject_class;
100 GstElementClass *gstelement_class;
101 GstBaseSinkClass *gstbasesink_class;
102
103 gobject_class = (GObjectClass *) klass;
104 gstelement_class = (GstElementClass *) klass;
105 gstbasesink_class = (GstBaseSinkClass *) klass;
106
107 parent_class = g_type_class_peek_parent (klass);
108
109 gobject_class->set_property = gst_tcp_client_sink_set_property;
110 gobject_class->get_property = gst_tcp_client_sink_get_property;
111 gobject_class->finalize = gst_tcp_client_sink_finalize;
112
113 g_object_class_install_property (gobject_class, PROP_HOST,
114 g_param_spec_string ("host", "Host", "The host/IP to send the packets to",
115 TCP_DEFAULT_HOST, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
116 g_object_class_install_property (gobject_class, PROP_PORT,
117 g_param_spec_int ("port", "Port", "The port to send the packets to",
118 0, TCP_HIGHEST_PORT, TCP_DEFAULT_PORT,
119 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
120
121 gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate);
122
123 gst_element_class_set_static_metadata (gstelement_class,
124 "TCP client sink", "Sink/Network",
125 "Send data as a client over the network via TCP",
126 "Thomas Vander Stichele <thomas at apestaart dot org>");
127
128 gstbasesink_class->start = gst_tcp_client_sink_start;
129 gstbasesink_class->stop = gst_tcp_client_sink_stop;
130 gstbasesink_class->set_caps = gst_tcp_client_sink_setcaps;
131 gstbasesink_class->render = gst_tcp_client_sink_render;
132 gstbasesink_class->unlock = gst_tcp_client_sink_unlock;
133 gstbasesink_class->unlock_stop = gst_tcp_client_sink_unlock_stop;
134
135 GST_DEBUG_CATEGORY_INIT (tcpclientsink_debug, "tcpclientsink", 0, "TCP sink");
136 }
137
138 static void
gst_tcp_client_sink_init(GstTCPClientSink * this)139 gst_tcp_client_sink_init (GstTCPClientSink * this)
140 {
141 this->host = g_strdup (TCP_DEFAULT_HOST);
142 this->port = TCP_DEFAULT_PORT;
143
144 this->socket = NULL;
145 this->cancellable = g_cancellable_new ();
146
147 GST_OBJECT_FLAG_UNSET (this, GST_TCP_CLIENT_SINK_OPEN);
148 }
149
150 static void
gst_tcp_client_sink_finalize(GObject * gobject)151 gst_tcp_client_sink_finalize (GObject * gobject)
152 {
153 GstTCPClientSink *this = GST_TCP_CLIENT_SINK (gobject);
154
155 if (this->cancellable)
156 g_object_unref (this->cancellable);
157 this->cancellable = NULL;
158
159 if (this->socket)
160 g_object_unref (this->socket);
161 this->socket = NULL;
162
163 g_free (this->host);
164 this->host = NULL;
165
166 G_OBJECT_CLASS (parent_class)->finalize (gobject);
167 }
168
169 static gboolean
gst_tcp_client_sink_setcaps(GstBaseSink * bsink,GstCaps * caps)170 gst_tcp_client_sink_setcaps (GstBaseSink * bsink, GstCaps * caps)
171 {
172 return TRUE;
173 }
174
175 static GstFlowReturn
gst_tcp_client_sink_render(GstBaseSink * bsink,GstBuffer * buf)176 gst_tcp_client_sink_render (GstBaseSink * bsink, GstBuffer * buf)
177 {
178 GstTCPClientSink *sink;
179 GstMapInfo map;
180 gsize written = 0;
181 gssize rret;
182 GError *err = NULL;
183
184 sink = GST_TCP_CLIENT_SINK (bsink);
185
186 g_return_val_if_fail (GST_OBJECT_FLAG_IS_SET (sink, GST_TCP_CLIENT_SINK_OPEN),
187 GST_FLOW_FLUSHING);
188
189 gst_buffer_map (buf, &map, GST_MAP_READ);
190 GST_LOG_OBJECT (sink, "writing %" G_GSIZE_FORMAT " bytes for buffer data",
191 map.size);
192
193 /* write buffer data */
194 while (written < map.size) {
195 rret =
196 g_socket_send (sink->socket, (gchar *) map.data + written,
197 map.size - written, sink->cancellable, &err);
198 if (rret < 0)
199 goto write_error;
200 written += rret;
201 }
202 gst_buffer_unmap (buf, &map);
203
204 sink->data_written += written;
205
206 return GST_FLOW_OK;
207
208 /* ERRORS */
209 write_error:
210 {
211 GstFlowReturn ret;
212
213 if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
214 ret = GST_FLOW_FLUSHING;
215 GST_DEBUG_OBJECT (sink, "Cancelled reading from socket");
216 } else {
217 GST_ELEMENT_ERROR (sink, RESOURCE, WRITE,
218 (_("Error while sending data to \"%s:%d\"."), sink->host, sink->port),
219 ("Only %" G_GSIZE_FORMAT " of %" G_GSIZE_FORMAT " bytes written: %s",
220 written, map.size, err->message));
221 ret = GST_FLOW_ERROR;
222 }
223 gst_buffer_unmap (buf, &map);
224 g_clear_error (&err);
225 return ret;
226 }
227 }
228
229 static void
gst_tcp_client_sink_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)230 gst_tcp_client_sink_set_property (GObject * object, guint prop_id,
231 const GValue * value, GParamSpec * pspec)
232 {
233 GstTCPClientSink *tcpclientsink;
234
235 g_return_if_fail (GST_IS_TCP_CLIENT_SINK (object));
236 tcpclientsink = GST_TCP_CLIENT_SINK (object);
237
238 switch (prop_id) {
239 case PROP_HOST:
240 if (!g_value_get_string (value)) {
241 g_warning ("host property cannot be NULL");
242 break;
243 }
244 g_free (tcpclientsink->host);
245 tcpclientsink->host = g_value_dup_string (value);
246 break;
247 case PROP_PORT:
248 tcpclientsink->port = g_value_get_int (value);
249 break;
250
251 default:
252 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
253 break;
254 }
255 }
256
257 static void
gst_tcp_client_sink_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)258 gst_tcp_client_sink_get_property (GObject * object, guint prop_id,
259 GValue * value, GParamSpec * pspec)
260 {
261 GstTCPClientSink *tcpclientsink;
262
263 g_return_if_fail (GST_IS_TCP_CLIENT_SINK (object));
264 tcpclientsink = GST_TCP_CLIENT_SINK (object);
265
266 switch (prop_id) {
267 case PROP_HOST:
268 g_value_set_string (value, tcpclientsink->host);
269 break;
270 case PROP_PORT:
271 g_value_set_int (value, tcpclientsink->port);
272 break;
273 default:
274 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
275 break;
276 }
277 }
278
279
280 /* create a socket for sending to remote machine */
281 static gboolean
gst_tcp_client_sink_start(GstBaseSink * bsink)282 gst_tcp_client_sink_start (GstBaseSink * bsink)
283 {
284 GstTCPClientSink *this = GST_TCP_CLIENT_SINK (bsink);
285 GError *err = NULL;
286 GList *addrs;
287 GList *cur_addr;
288 GSocketAddress *saddr = NULL;
289
290 if (GST_OBJECT_FLAG_IS_SET (this, GST_TCP_CLIENT_SINK_OPEN))
291 return TRUE;
292
293 addrs =
294 tcp_get_addresses (GST_ELEMENT (this), this->host, this->cancellable,
295 &err);
296 if (!addrs)
297 goto name_resolve;
298
299 GST_DEBUG_OBJECT (this, "opening sending client socket to %s:%d", this->host,
300 this->port);
301
302 cur_addr = addrs;
303 while (cur_addr) {
304 /* clean up from possible previous iterations */
305 g_clear_error (&err);
306 g_clear_object (&this->socket);
307
308 /* iterate over addresses until one works */
309 this->socket =
310 tcp_create_socket (GST_ELEMENT (this), &cur_addr, this->port, &saddr,
311 &err);
312 if (!this->socket)
313 goto no_socket;
314
315 GST_DEBUG_OBJECT (this, "opened sending client socket");
316
317 /* connect to server */
318 if (g_socket_connect (this->socket, saddr, this->cancellable, &err))
319 break;
320
321 /* failed to connect, release and try next address... */
322 g_clear_object (&saddr);
323 if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED))
324 goto connect_failed;
325 }
326
327 /* final connect attempt failed */
328 if (err)
329 goto connect_failed;
330
331 GST_DEBUG_OBJECT (this, "connected to %s:%d", this->host, this->port);
332 g_list_free_full (g_steal_pointer (&addrs), g_object_unref);
333 g_object_unref (saddr);
334
335 GST_OBJECT_FLAG_SET (this, GST_TCP_CLIENT_SINK_OPEN);
336
337 this->data_written = 0;
338
339 return TRUE;
340
341 name_resolve:
342 {
343 if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
344 GST_DEBUG_OBJECT (this, "Cancelled name resolution");
345 } else {
346 GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
347 ("Failed to resolve host '%s': %s", this->host, err->message));
348 }
349 g_clear_error (&err);
350 return FALSE;
351 }
352 no_socket:
353 {
354 g_list_free_full (g_steal_pointer (&addrs), g_object_unref);
355 GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
356 ("Failed to create socket: %s", err->message));
357 g_clear_error (&err);
358 return FALSE;
359 }
360 connect_failed:
361 {
362 g_list_free_full (g_steal_pointer (&addrs), g_object_unref);
363 if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
364 GST_DEBUG_OBJECT (this, "Cancelled connecting");
365 } else {
366 GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
367 ("Failed to connect to host '%s:%d': %s", this->host, this->port,
368 err->message));
369 }
370 g_clear_error (&err);
371 /* pretend we opened ok for proper cleanup to happen */
372 GST_OBJECT_FLAG_SET (this, GST_TCP_CLIENT_SINK_OPEN);
373 gst_tcp_client_sink_stop (GST_BASE_SINK (this));
374 return FALSE;
375 }
376 }
377
378 static gboolean
gst_tcp_client_sink_stop(GstBaseSink * bsink)379 gst_tcp_client_sink_stop (GstBaseSink * bsink)
380 {
381 GstTCPClientSink *this = GST_TCP_CLIENT_SINK (bsink);
382 GError *err = NULL;
383
384 if (!GST_OBJECT_FLAG_IS_SET (this, GST_TCP_CLIENT_SINK_OPEN))
385 return TRUE;
386
387 if (this->socket) {
388 GST_DEBUG_OBJECT (this, "closing socket");
389
390 if (!g_socket_close (this->socket, &err)) {
391 GST_ERROR_OBJECT (this, "Failed to close socket: %s", err->message);
392 g_clear_error (&err);
393 }
394 g_object_unref (this->socket);
395 this->socket = NULL;
396 }
397
398 GST_OBJECT_FLAG_UNSET (this, GST_TCP_CLIENT_SINK_OPEN);
399
400 return TRUE;
401 }
402
403 /* will be called only between calls to start() and stop() */
404 static gboolean
gst_tcp_client_sink_unlock(GstBaseSink * bsink)405 gst_tcp_client_sink_unlock (GstBaseSink * bsink)
406 {
407 GstTCPClientSink *sink = GST_TCP_CLIENT_SINK (bsink);
408
409 GST_DEBUG_OBJECT (sink, "set to flushing");
410 g_cancellable_cancel (sink->cancellable);
411
412 return TRUE;
413 }
414
415 /* will be called only between calls to start() and stop() */
416 static gboolean
gst_tcp_client_sink_unlock_stop(GstBaseSink * bsink)417 gst_tcp_client_sink_unlock_stop (GstBaseSink * bsink)
418 {
419 GstTCPClientSink *sink = GST_TCP_CLIENT_SINK (bsink);
420
421 GST_DEBUG_OBJECT (sink, "unset flushing");
422 g_object_unref (sink->cancellable);
423 sink->cancellable = g_cancellable_new ();
424
425 return TRUE;
426 }
427