1 /* GStreamer
2 * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
3 * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
4 * Copyright (C) <2011> Collabora Ltd.
5 * Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
6 *
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Library General Public
9 * License as published by the Free Software Foundation; either
10 * version 2 of the License, or (at your option) any later version.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Library General Public License for more details.
16 *
17 * You should have received a copy of the GNU Library General Public
18 * License along with this library; if not, write to the
19 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20 * Boston, MA 02110-1301, USA.
21 */
22
23 /**
24 * SECTION:element-tcpclientsink
25 * @title: tcpclientsink
26 * @see_also: #tcpclientsink
27 *
28 * ## Example launch line (server):
29 * |[
30 * nc -l -p 3000
31 * ]|
32 * ## Example launch line (client):
33 * |[
 * gst-launch-1.0 fdsrc fd=0 ! tcpclientsink port=3000
 * ]|
 * Everything you type in the client is shown on the server (fd=0 means
 * standard input, which is the command line input file descriptor).
38 *
39 */
40
41 #ifdef HAVE_CONFIG_H
42 #include "config.h"
43 #endif
44 #include <gst/gst-i18n-plugin.h>
45
46 #include "gsttcp.h"
47 #include "gsttcpclientsink.h"
48
49 /* TCPClientSink signals and args */
50 enum
51 {
52 FRAME_ENCODED,
53 /* FILL ME */
54 LAST_SIGNAL
55 };
56
57 GST_DEBUG_CATEGORY_STATIC (tcpclientsink_debug);
58 #define GST_CAT_DEFAULT (tcpclientsink_debug)
59
60 enum
61 {
62 PROP_0,
63 PROP_HOST,
64 PROP_PORT
65 };
66
67 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
68 GST_PAD_SINK,
69 GST_PAD_ALWAYS,
70 GST_STATIC_CAPS_ANY);
71
72 static void gst_tcp_client_sink_finalize (GObject * gobject);
73
74 static gboolean gst_tcp_client_sink_setcaps (GstBaseSink * bsink,
75 GstCaps * caps);
76 static GstFlowReturn gst_tcp_client_sink_render (GstBaseSink * bsink,
77 GstBuffer * buf);
78 static gboolean gst_tcp_client_sink_start (GstBaseSink * bsink);
79 static gboolean gst_tcp_client_sink_stop (GstBaseSink * bsink);
80 static gboolean gst_tcp_client_sink_unlock (GstBaseSink * bsink);
81 static gboolean gst_tcp_client_sink_unlock_stop (GstBaseSink * bsink);
82
83 static void gst_tcp_client_sink_set_property (GObject * object, guint prop_id,
84 const GValue * value, GParamSpec * pspec);
85 static void gst_tcp_client_sink_get_property (GObject * object, guint prop_id,
86 GValue * value, GParamSpec * pspec);
87
88
89 /*static guint gst_tcp_client_sink_signals[LAST_SIGNAL] = { 0 }; */
90
91 #define gst_tcp_client_sink_parent_class parent_class
92 G_DEFINE_TYPE (GstTCPClientSink, gst_tcp_client_sink, GST_TYPE_BASE_SINK);
93
94 static void
gst_tcp_client_sink_class_init(GstTCPClientSinkClass * klass)95 gst_tcp_client_sink_class_init (GstTCPClientSinkClass * klass)
96 {
97 GObjectClass *gobject_class;
98 GstElementClass *gstelement_class;
99 GstBaseSinkClass *gstbasesink_class;
100
101 gobject_class = (GObjectClass *) klass;
102 gstelement_class = (GstElementClass *) klass;
103 gstbasesink_class = (GstBaseSinkClass *) klass;
104
105 parent_class = g_type_class_peek_parent (klass);
106
107 gobject_class->set_property = gst_tcp_client_sink_set_property;
108 gobject_class->get_property = gst_tcp_client_sink_get_property;
109 gobject_class->finalize = gst_tcp_client_sink_finalize;
110
111 g_object_class_install_property (gobject_class, PROP_HOST,
112 g_param_spec_string ("host", "Host", "The host/IP to send the packets to",
113 TCP_DEFAULT_HOST, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
114 g_object_class_install_property (gobject_class, PROP_PORT,
115 g_param_spec_int ("port", "Port", "The port to send the packets to",
116 0, TCP_HIGHEST_PORT, TCP_DEFAULT_PORT,
117 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
118
119 gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate);
120
121 gst_element_class_set_static_metadata (gstelement_class,
122 "TCP client sink", "Sink/Network",
123 "Send data as a client over the network via TCP",
124 "Thomas Vander Stichele <thomas at apestaart dot org>");
125
126 gstbasesink_class->start = gst_tcp_client_sink_start;
127 gstbasesink_class->stop = gst_tcp_client_sink_stop;
128 gstbasesink_class->set_caps = gst_tcp_client_sink_setcaps;
129 gstbasesink_class->render = gst_tcp_client_sink_render;
130 gstbasesink_class->unlock = gst_tcp_client_sink_unlock;
131 gstbasesink_class->unlock_stop = gst_tcp_client_sink_unlock_stop;
132
133 GST_DEBUG_CATEGORY_INIT (tcpclientsink_debug, "tcpclientsink", 0, "TCP sink");
134 }
135
136 static void
gst_tcp_client_sink_init(GstTCPClientSink * this)137 gst_tcp_client_sink_init (GstTCPClientSink * this)
138 {
139 this->host = g_strdup (TCP_DEFAULT_HOST);
140 this->port = TCP_DEFAULT_PORT;
141
142 this->socket = NULL;
143 this->cancellable = g_cancellable_new ();
144
145 GST_OBJECT_FLAG_UNSET (this, GST_TCP_CLIENT_SINK_OPEN);
146 }
147
148 static void
gst_tcp_client_sink_finalize(GObject * gobject)149 gst_tcp_client_sink_finalize (GObject * gobject)
150 {
151 GstTCPClientSink *this = GST_TCP_CLIENT_SINK (gobject);
152
153 if (this->cancellable)
154 g_object_unref (this->cancellable);
155 this->cancellable = NULL;
156
157 if (this->socket)
158 g_object_unref (this->socket);
159 this->socket = NULL;
160
161 g_free (this->host);
162 this->host = NULL;
163
164 G_OBJECT_CLASS (parent_class)->finalize (gobject);
165 }
166
167 static gboolean
gst_tcp_client_sink_setcaps(GstBaseSink * bsink,GstCaps * caps)168 gst_tcp_client_sink_setcaps (GstBaseSink * bsink, GstCaps * caps)
169 {
170 return TRUE;
171 }
172
173 static GstFlowReturn
gst_tcp_client_sink_render(GstBaseSink * bsink,GstBuffer * buf)174 gst_tcp_client_sink_render (GstBaseSink * bsink, GstBuffer * buf)
175 {
176 GstTCPClientSink *sink;
177 GstMapInfo map;
178 gsize written = 0;
179 gssize rret;
180 GError *err = NULL;
181
182 sink = GST_TCP_CLIENT_SINK (bsink);
183
184 g_return_val_if_fail (GST_OBJECT_FLAG_IS_SET (sink, GST_TCP_CLIENT_SINK_OPEN),
185 GST_FLOW_FLUSHING);
186
187 gst_buffer_map (buf, &map, GST_MAP_READ);
188 GST_LOG_OBJECT (sink, "writing %" G_GSIZE_FORMAT " bytes for buffer data",
189 map.size);
190
191 /* write buffer data */
192 while (written < map.size) {
193 rret =
194 g_socket_send (sink->socket, (gchar *) map.data + written,
195 map.size - written, sink->cancellable, &err);
196 if (rret < 0)
197 goto write_error;
198 written += rret;
199 }
200 gst_buffer_unmap (buf, &map);
201
202 sink->data_written += written;
203
204 return GST_FLOW_OK;
205
206 /* ERRORS */
207 write_error:
208 {
209 GstFlowReturn ret;
210
211 if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
212 ret = GST_FLOW_FLUSHING;
213 GST_DEBUG_OBJECT (sink, "Cancelled reading from socket");
214 } else {
215 GST_ELEMENT_ERROR (sink, RESOURCE, WRITE,
216 (_("Error while sending data to \"%s:%d\"."), sink->host, sink->port),
217 ("Only %" G_GSIZE_FORMAT " of %" G_GSIZE_FORMAT " bytes written: %s",
218 written, map.size, err->message));
219 ret = GST_FLOW_ERROR;
220 }
221 gst_buffer_unmap (buf, &map);
222 g_clear_error (&err);
223 return ret;
224 }
225 }
226
227 static void
gst_tcp_client_sink_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)228 gst_tcp_client_sink_set_property (GObject * object, guint prop_id,
229 const GValue * value, GParamSpec * pspec)
230 {
231 GstTCPClientSink *tcpclientsink;
232
233 g_return_if_fail (GST_IS_TCP_CLIENT_SINK (object));
234 tcpclientsink = GST_TCP_CLIENT_SINK (object);
235
236 switch (prop_id) {
237 case PROP_HOST:
238 if (!g_value_get_string (value)) {
239 g_warning ("host property cannot be NULL");
240 break;
241 }
242 g_free (tcpclientsink->host);
243 tcpclientsink->host = g_strdup (g_value_get_string (value));
244 break;
245 case PROP_PORT:
246 tcpclientsink->port = g_value_get_int (value);
247 break;
248
249 default:
250 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
251 break;
252 }
253 }
254
255 static void
gst_tcp_client_sink_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)256 gst_tcp_client_sink_get_property (GObject * object, guint prop_id,
257 GValue * value, GParamSpec * pspec)
258 {
259 GstTCPClientSink *tcpclientsink;
260
261 g_return_if_fail (GST_IS_TCP_CLIENT_SINK (object));
262 tcpclientsink = GST_TCP_CLIENT_SINK (object);
263
264 switch (prop_id) {
265 case PROP_HOST:
266 g_value_set_string (value, tcpclientsink->host);
267 break;
268 case PROP_PORT:
269 g_value_set_int (value, tcpclientsink->port);
270 break;
271 default:
272 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
273 break;
274 }
275 }
276
277
278 /* create a socket for sending to remote machine */
279 static gboolean
gst_tcp_client_sink_start(GstBaseSink * bsink)280 gst_tcp_client_sink_start (GstBaseSink * bsink)
281 {
282 GstTCPClientSink *this = GST_TCP_CLIENT_SINK (bsink);
283 GError *err = NULL;
284 GInetAddress *addr;
285 GSocketAddress *saddr;
286 GResolver *resolver;
287
288 if (GST_OBJECT_FLAG_IS_SET (this, GST_TCP_CLIENT_SINK_OPEN))
289 return TRUE;
290
291 /* look up name if we need to */
292 addr = g_inet_address_new_from_string (this->host);
293 if (!addr) {
294 GList *results;
295
296 resolver = g_resolver_get_default ();
297
298 results =
299 g_resolver_lookup_by_name (resolver, this->host, this->cancellable,
300 &err);
301 if (!results)
302 goto name_resolve;
303 addr = G_INET_ADDRESS (g_object_ref (results->data));
304
305 g_resolver_free_addresses (results);
306 g_object_unref (resolver);
307 }
308 #ifndef GST_DISABLE_GST_DEBUG
309 {
310 gchar *ip = g_inet_address_to_string (addr);
311
312 GST_DEBUG_OBJECT (this, "IP address for host %s is %s", this->host, ip);
313 g_free (ip);
314 }
315 #endif
316 saddr = g_inet_socket_address_new (addr, this->port);
317 g_object_unref (addr);
318
319 /* create sending client socket */
320 GST_DEBUG_OBJECT (this, "opening sending client socket to %s:%d", this->host,
321 this->port);
322 this->socket =
323 g_socket_new (g_socket_address_get_family (saddr), G_SOCKET_TYPE_STREAM,
324 G_SOCKET_PROTOCOL_TCP, &err);
325 if (!this->socket)
326 goto no_socket;
327
328 GST_DEBUG_OBJECT (this, "opened sending client socket");
329
330 /* connect to server */
331 if (!g_socket_connect (this->socket, saddr, this->cancellable, &err))
332 goto connect_failed;
333
334 g_object_unref (saddr);
335
336 GST_OBJECT_FLAG_SET (this, GST_TCP_CLIENT_SINK_OPEN);
337
338 this->data_written = 0;
339
340 return TRUE;
341 no_socket:
342 {
343 GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
344 ("Failed to create socket: %s", err->message));
345 g_clear_error (&err);
346 g_object_unref (saddr);
347 return FALSE;
348 }
349 name_resolve:
350 {
351 if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
352 GST_DEBUG_OBJECT (this, "Cancelled name resolval");
353 } else {
354 GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
355 ("Failed to resolve host '%s': %s", this->host, err->message));
356 }
357 g_clear_error (&err);
358 g_object_unref (resolver);
359 return FALSE;
360 }
361 connect_failed:
362 {
363 if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
364 GST_DEBUG_OBJECT (this, "Cancelled connecting");
365 } else {
366 GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
367 ("Failed to connect to host '%s:%d': %s", this->host, this->port,
368 err->message));
369 }
370 g_clear_error (&err);
371 g_object_unref (saddr);
372 /* pretend we opened ok for proper cleanup to happen */
373 GST_OBJECT_FLAG_SET (this, GST_TCP_CLIENT_SINK_OPEN);
374 gst_tcp_client_sink_stop (GST_BASE_SINK (this));
375 return FALSE;
376 }
377 }
378
379 static gboolean
gst_tcp_client_sink_stop(GstBaseSink * bsink)380 gst_tcp_client_sink_stop (GstBaseSink * bsink)
381 {
382 GstTCPClientSink *this = GST_TCP_CLIENT_SINK (bsink);
383 GError *err = NULL;
384
385 if (!GST_OBJECT_FLAG_IS_SET (this, GST_TCP_CLIENT_SINK_OPEN))
386 return TRUE;
387
388 if (this->socket) {
389 GST_DEBUG_OBJECT (this, "closing socket");
390
391 if (!g_socket_close (this->socket, &err)) {
392 GST_ERROR_OBJECT (this, "Failed to close socket: %s", err->message);
393 g_clear_error (&err);
394 }
395 g_object_unref (this->socket);
396 this->socket = NULL;
397 }
398
399 GST_OBJECT_FLAG_UNSET (this, GST_TCP_CLIENT_SINK_OPEN);
400
401 return TRUE;
402 }
403
404 /* will be called only between calls to start() and stop() */
405 static gboolean
gst_tcp_client_sink_unlock(GstBaseSink * bsink)406 gst_tcp_client_sink_unlock (GstBaseSink * bsink)
407 {
408 GstTCPClientSink *sink = GST_TCP_CLIENT_SINK (bsink);
409
410 GST_DEBUG_OBJECT (sink, "set to flushing");
411 g_cancellable_cancel (sink->cancellable);
412
413 return TRUE;
414 }
415
416 /* will be called only between calls to start() and stop() */
417 static gboolean
gst_tcp_client_sink_unlock_stop(GstBaseSink * bsink)418 gst_tcp_client_sink_unlock_stop (GstBaseSink * bsink)
419 {
420 GstTCPClientSink *sink = GST_TCP_CLIENT_SINK (bsink);
421
422 GST_DEBUG_OBJECT (sink, "unset flushing");
423 g_object_unref (sink->cancellable);
424 sink->cancellable = g_cancellable_new ();
425
426 return TRUE;
427 }
428