• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer
2  * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
3  * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
4  * Copyright (C) <2011> Collabora Ltd.
5  *     Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22 
23 /**
24  * SECTION:element-tcpclientsink
25  * @title: tcpclientsink
26  * @see_also: #tcpclientsink
27  *
28  * ## Example launch line (server):
29  * |[
30  * nc -l -p 3000
31  * ]|
32  * ## Example launch line (client):
33  * |[
34  * gst-launch-1.0 fdsink fd=1 ! tcpclientsink port=3000
35  * ]|
36  *  everything you type in the client is shown on the server (fd=1 means
37  * standard input which is the command line input file descriptor)
38  *
39  */
40 
41 #ifdef HAVE_CONFIG_H
42 #include "config.h"
43 #endif
44 #include <gst/gst-i18n-plugin.h>
45 
46 #include "gsttcpelements.h"
47 #include "gsttcpclientsink.h"
48 
49 /* TCPClientSink signals and args */
50 enum
51 {
52   FRAME_ENCODED,
53   /* FILL ME */
54   LAST_SIGNAL
55 };
56 
57 GST_DEBUG_CATEGORY_STATIC (tcpclientsink_debug);
58 #define GST_CAT_DEFAULT (tcpclientsink_debug)
59 
60 enum
61 {
62   PROP_0,
63   PROP_HOST,
64   PROP_PORT
65 };
66 
67 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
68     GST_PAD_SINK,
69     GST_PAD_ALWAYS,
70     GST_STATIC_CAPS_ANY);
71 
72 static void gst_tcp_client_sink_finalize (GObject * gobject);
73 
74 static gboolean gst_tcp_client_sink_setcaps (GstBaseSink * bsink,
75     GstCaps * caps);
76 static GstFlowReturn gst_tcp_client_sink_render (GstBaseSink * bsink,
77     GstBuffer * buf);
78 static gboolean gst_tcp_client_sink_start (GstBaseSink * bsink);
79 static gboolean gst_tcp_client_sink_stop (GstBaseSink * bsink);
80 static gboolean gst_tcp_client_sink_unlock (GstBaseSink * bsink);
81 static gboolean gst_tcp_client_sink_unlock_stop (GstBaseSink * bsink);
82 
83 static void gst_tcp_client_sink_set_property (GObject * object, guint prop_id,
84     const GValue * value, GParamSpec * pspec);
85 static void gst_tcp_client_sink_get_property (GObject * object, guint prop_id,
86     GValue * value, GParamSpec * pspec);
87 
88 
89 /*static guint gst_tcp_client_sink_signals[LAST_SIGNAL] = { 0 }; */
90 
91 #define gst_tcp_client_sink_parent_class parent_class
92 G_DEFINE_TYPE (GstTCPClientSink, gst_tcp_client_sink, GST_TYPE_BASE_SINK);
93 GST_ELEMENT_REGISTER_DEFINE_WITH_CODE (tcpclientsink, "tcpclientsink",
94     GST_RANK_NONE, GST_TYPE_TCP_CLIENT_SINK, tcp_element_init (plugin));
95 
96 static void
gst_tcp_client_sink_class_init(GstTCPClientSinkClass * klass)97 gst_tcp_client_sink_class_init (GstTCPClientSinkClass * klass)
98 {
99   GObjectClass *gobject_class;
100   GstElementClass *gstelement_class;
101   GstBaseSinkClass *gstbasesink_class;
102 
103   gobject_class = (GObjectClass *) klass;
104   gstelement_class = (GstElementClass *) klass;
105   gstbasesink_class = (GstBaseSinkClass *) klass;
106 
107   parent_class = g_type_class_peek_parent (klass);
108 
109   gobject_class->set_property = gst_tcp_client_sink_set_property;
110   gobject_class->get_property = gst_tcp_client_sink_get_property;
111   gobject_class->finalize = gst_tcp_client_sink_finalize;
112 
113   g_object_class_install_property (gobject_class, PROP_HOST,
114       g_param_spec_string ("host", "Host", "The host/IP to send the packets to",
115           TCP_DEFAULT_HOST, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
116   g_object_class_install_property (gobject_class, PROP_PORT,
117       g_param_spec_int ("port", "Port", "The port to send the packets to",
118           0, TCP_HIGHEST_PORT, TCP_DEFAULT_PORT,
119           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
120 
121   gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate);
122 
123   gst_element_class_set_static_metadata (gstelement_class,
124       "TCP client sink", "Sink/Network",
125       "Send data as a client over the network via TCP",
126       "Thomas Vander Stichele <thomas at apestaart dot org>");
127 
128   gstbasesink_class->start = gst_tcp_client_sink_start;
129   gstbasesink_class->stop = gst_tcp_client_sink_stop;
130   gstbasesink_class->set_caps = gst_tcp_client_sink_setcaps;
131   gstbasesink_class->render = gst_tcp_client_sink_render;
132   gstbasesink_class->unlock = gst_tcp_client_sink_unlock;
133   gstbasesink_class->unlock_stop = gst_tcp_client_sink_unlock_stop;
134 
135   GST_DEBUG_CATEGORY_INIT (tcpclientsink_debug, "tcpclientsink", 0, "TCP sink");
136 }
137 
138 static void
gst_tcp_client_sink_init(GstTCPClientSink * this)139 gst_tcp_client_sink_init (GstTCPClientSink * this)
140 {
141   this->host = g_strdup (TCP_DEFAULT_HOST);
142   this->port = TCP_DEFAULT_PORT;
143 
144   this->socket = NULL;
145   this->cancellable = g_cancellable_new ();
146 
147   GST_OBJECT_FLAG_UNSET (this, GST_TCP_CLIENT_SINK_OPEN);
148 }
149 
150 static void
gst_tcp_client_sink_finalize(GObject * gobject)151 gst_tcp_client_sink_finalize (GObject * gobject)
152 {
153   GstTCPClientSink *this = GST_TCP_CLIENT_SINK (gobject);
154 
155   if (this->cancellable)
156     g_object_unref (this->cancellable);
157   this->cancellable = NULL;
158 
159   if (this->socket)
160     g_object_unref (this->socket);
161   this->socket = NULL;
162 
163   g_free (this->host);
164   this->host = NULL;
165 
166   G_OBJECT_CLASS (parent_class)->finalize (gobject);
167 }
168 
169 static gboolean
gst_tcp_client_sink_setcaps(GstBaseSink * bsink,GstCaps * caps)170 gst_tcp_client_sink_setcaps (GstBaseSink * bsink, GstCaps * caps)
171 {
172   return TRUE;
173 }
174 
175 static GstFlowReturn
gst_tcp_client_sink_render(GstBaseSink * bsink,GstBuffer * buf)176 gst_tcp_client_sink_render (GstBaseSink * bsink, GstBuffer * buf)
177 {
178   GstTCPClientSink *sink;
179   GstMapInfo map;
180   gsize written = 0;
181   gssize rret;
182   GError *err = NULL;
183 
184   sink = GST_TCP_CLIENT_SINK (bsink);
185 
186   g_return_val_if_fail (GST_OBJECT_FLAG_IS_SET (sink, GST_TCP_CLIENT_SINK_OPEN),
187       GST_FLOW_FLUSHING);
188 
189   gst_buffer_map (buf, &map, GST_MAP_READ);
190   GST_LOG_OBJECT (sink, "writing %" G_GSIZE_FORMAT " bytes for buffer data",
191       map.size);
192 
193   /* write buffer data */
194   while (written < map.size) {
195     rret =
196         g_socket_send (sink->socket, (gchar *) map.data + written,
197         map.size - written, sink->cancellable, &err);
198     if (rret < 0)
199       goto write_error;
200     written += rret;
201   }
202   gst_buffer_unmap (buf, &map);
203 
204   sink->data_written += written;
205 
206   return GST_FLOW_OK;
207 
208   /* ERRORS */
209 write_error:
210   {
211     GstFlowReturn ret;
212 
213     if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
214       ret = GST_FLOW_FLUSHING;
215       GST_DEBUG_OBJECT (sink, "Cancelled reading from socket");
216     } else {
217       GST_ELEMENT_ERROR (sink, RESOURCE, WRITE,
218           (_("Error while sending data to \"%s:%d\"."), sink->host, sink->port),
219           ("Only %" G_GSIZE_FORMAT " of %" G_GSIZE_FORMAT " bytes written: %s",
220               written, map.size, err->message));
221       ret = GST_FLOW_ERROR;
222     }
223     gst_buffer_unmap (buf, &map);
224     g_clear_error (&err);
225     return ret;
226   }
227 }
228 
229 static void
gst_tcp_client_sink_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)230 gst_tcp_client_sink_set_property (GObject * object, guint prop_id,
231     const GValue * value, GParamSpec * pspec)
232 {
233   GstTCPClientSink *tcpclientsink;
234 
235   g_return_if_fail (GST_IS_TCP_CLIENT_SINK (object));
236   tcpclientsink = GST_TCP_CLIENT_SINK (object);
237 
238   switch (prop_id) {
239     case PROP_HOST:
240       if (!g_value_get_string (value)) {
241         g_warning ("host property cannot be NULL");
242         break;
243       }
244       g_free (tcpclientsink->host);
245       tcpclientsink->host = g_value_dup_string (value);
246       break;
247     case PROP_PORT:
248       tcpclientsink->port = g_value_get_int (value);
249       break;
250 
251     default:
252       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
253       break;
254   }
255 }
256 
257 static void
gst_tcp_client_sink_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)258 gst_tcp_client_sink_get_property (GObject * object, guint prop_id,
259     GValue * value, GParamSpec * pspec)
260 {
261   GstTCPClientSink *tcpclientsink;
262 
263   g_return_if_fail (GST_IS_TCP_CLIENT_SINK (object));
264   tcpclientsink = GST_TCP_CLIENT_SINK (object);
265 
266   switch (prop_id) {
267     case PROP_HOST:
268       g_value_set_string (value, tcpclientsink->host);
269       break;
270     case PROP_PORT:
271       g_value_set_int (value, tcpclientsink->port);
272       break;
273     default:
274       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
275       break;
276   }
277 }
278 
279 
280 /* create a socket for sending to remote machine */
281 static gboolean
gst_tcp_client_sink_start(GstBaseSink * bsink)282 gst_tcp_client_sink_start (GstBaseSink * bsink)
283 {
284   GstTCPClientSink *this = GST_TCP_CLIENT_SINK (bsink);
285   GError *err = NULL;
286   GList *addrs;
287   GList *cur_addr;
288   GSocketAddress *saddr = NULL;
289 
290   if (GST_OBJECT_FLAG_IS_SET (this, GST_TCP_CLIENT_SINK_OPEN))
291     return TRUE;
292 
293   addrs =
294       tcp_get_addresses (GST_ELEMENT (this), this->host, this->cancellable,
295       &err);
296   if (!addrs)
297     goto name_resolve;
298 
299   GST_DEBUG_OBJECT (this, "opening sending client socket to %s:%d", this->host,
300       this->port);
301 
302   cur_addr = addrs;
303   while (cur_addr) {
304     /* clean up from possible previous iterations */
305     g_clear_error (&err);
306     g_clear_object (&this->socket);
307 
308     /* iterate over addresses until one works */
309     this->socket =
310         tcp_create_socket (GST_ELEMENT (this), &cur_addr, this->port, &saddr,
311         &err);
312     if (!this->socket)
313       goto no_socket;
314 
315     GST_DEBUG_OBJECT (this, "opened sending client socket");
316 
317     /* connect to server */
318     if (g_socket_connect (this->socket, saddr, this->cancellable, &err))
319       break;
320 
321     /* failed to connect, release and try next address... */
322     g_clear_object (&saddr);
323     if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED))
324       goto connect_failed;
325   }
326 
327   /* final connect attempt failed */
328   if (err)
329     goto connect_failed;
330 
331   GST_DEBUG_OBJECT (this, "connected to %s:%d", this->host, this->port);
332   g_list_free_full (g_steal_pointer (&addrs), g_object_unref);
333   g_object_unref (saddr);
334 
335   GST_OBJECT_FLAG_SET (this, GST_TCP_CLIENT_SINK_OPEN);
336 
337   this->data_written = 0;
338 
339   return TRUE;
340 
341 name_resolve:
342   {
343     if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
344       GST_DEBUG_OBJECT (this, "Cancelled name resolution");
345     } else {
346       GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
347           ("Failed to resolve host '%s': %s", this->host, err->message));
348     }
349     g_clear_error (&err);
350     return FALSE;
351   }
352 no_socket:
353   {
354     g_list_free_full (g_steal_pointer (&addrs), g_object_unref);
355     GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
356         ("Failed to create socket: %s", err->message));
357     g_clear_error (&err);
358     return FALSE;
359   }
360 connect_failed:
361   {
362     g_list_free_full (g_steal_pointer (&addrs), g_object_unref);
363     if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
364       GST_DEBUG_OBJECT (this, "Cancelled connecting");
365     } else {
366       GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
367           ("Failed to connect to host '%s:%d': %s", this->host, this->port,
368               err->message));
369     }
370     g_clear_error (&err);
371     /* pretend we opened ok for proper cleanup to happen */
372     GST_OBJECT_FLAG_SET (this, GST_TCP_CLIENT_SINK_OPEN);
373     gst_tcp_client_sink_stop (GST_BASE_SINK (this));
374     return FALSE;
375   }
376 }
377 
378 static gboolean
gst_tcp_client_sink_stop(GstBaseSink * bsink)379 gst_tcp_client_sink_stop (GstBaseSink * bsink)
380 {
381   GstTCPClientSink *this = GST_TCP_CLIENT_SINK (bsink);
382   GError *err = NULL;
383 
384   if (!GST_OBJECT_FLAG_IS_SET (this, GST_TCP_CLIENT_SINK_OPEN))
385     return TRUE;
386 
387   if (this->socket) {
388     GST_DEBUG_OBJECT (this, "closing socket");
389 
390     if (!g_socket_close (this->socket, &err)) {
391       GST_ERROR_OBJECT (this, "Failed to close socket: %s", err->message);
392       g_clear_error (&err);
393     }
394     g_object_unref (this->socket);
395     this->socket = NULL;
396   }
397 
398   GST_OBJECT_FLAG_UNSET (this, GST_TCP_CLIENT_SINK_OPEN);
399 
400   return TRUE;
401 }
402 
403 /* will be called only between calls to start() and stop() */
404 static gboolean
gst_tcp_client_sink_unlock(GstBaseSink * bsink)405 gst_tcp_client_sink_unlock (GstBaseSink * bsink)
406 {
407   GstTCPClientSink *sink = GST_TCP_CLIENT_SINK (bsink);
408 
409   GST_DEBUG_OBJECT (sink, "set to flushing");
410   g_cancellable_cancel (sink->cancellable);
411 
412   return TRUE;
413 }
414 
415 /* will be called only between calls to start() and stop() */
416 static gboolean
gst_tcp_client_sink_unlock_stop(GstBaseSink * bsink)417 gst_tcp_client_sink_unlock_stop (GstBaseSink * bsink)
418 {
419   GstTCPClientSink *sink = GST_TCP_CLIENT_SINK (bsink);
420 
421   GST_DEBUG_OBJECT (sink, "unset flushing");
422   g_object_unref (sink->cancellable);
423   sink->cancellable = g_cancellable_new ();
424 
425   return TRUE;
426 }
427