• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer
2  * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
3  * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
4  * Copyright (C) <2011> Collabora Ltd.
5  *     Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22 
23 /**
24  * SECTION:element-tcpclientsink
25  * @title: tcpclientsink
26  * @see_also: #tcpclientsink
27  *
28  * ## Example launch line (server):
29  * |[
30  * nc -l -p 3000
31  * ]|
32  * ## Example launch line (client):
33  * |[
34  * gst-launch-1.0 fdsrc fd=0 ! tcpclientsink port=3000
35  * ]|
36  * Everything you type in the client is shown on the server (fd=0 means
37  * standard input, which is the command line input file descriptor).
38  *
39  */
40 
41 #ifdef HAVE_CONFIG_H
42 #include "config.h"
43 #endif
44 #include <gst/gst-i18n-plugin.h>
45 
46 #include "gsttcp.h"
47 #include "gsttcpclientsink.h"
48 
49 /* TCPClientSink signals and args */
/* In the visible code these signal ids are only mentioned by the
 * commented-out signals array further down; class_init registers no signal. */
50 enum
51 {
52   FRAME_ENCODED,
53   /* FILL ME */
54   LAST_SIGNAL
55 };
56 
/* Debug category used by the GST_*_OBJECT logging calls in this file; it is
 * initialised at the end of class_init under the name "tcpclientsink". */
57 GST_DEBUG_CATEGORY_STATIC (tcpclientsink_debug);
58 #define GST_CAT_DEFAULT (tcpclientsink_debug)
59 
/* Property ids passed to g_object_class_install_property() in class_init and
 * dispatched on in set_property()/get_property(). */
60 enum
61 {
62   PROP_0,
63   PROP_HOST,
64   PROP_PORT
65 };
66 
/* The element's single pad: an always-present sink pad named "sink" that is
 * declared with ANY caps. */
67 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
68     GST_PAD_SINK,
69     GST_PAD_ALWAYS,
70     GST_STATIC_CAPS_ANY);
71 
/* GObject finalizer, installed on the object class in class_init. */
72 static void gst_tcp_client_sink_finalize (GObject * gobject);
73 
/* GstBaseSink virtual method implementations, installed in class_init. */
74 static gboolean gst_tcp_client_sink_setcaps (GstBaseSink * bsink,
75     GstCaps * caps);
76 static GstFlowReturn gst_tcp_client_sink_render (GstBaseSink * bsink,
77     GstBuffer * buf);
78 static gboolean gst_tcp_client_sink_start (GstBaseSink * bsink);
79 static gboolean gst_tcp_client_sink_stop (GstBaseSink * bsink);
80 static gboolean gst_tcp_client_sink_unlock (GstBaseSink * bsink);
81 static gboolean gst_tcp_client_sink_unlock_stop (GstBaseSink * bsink);
82 
/* GObject property accessors for the "host" and "port" properties. */
83 static void gst_tcp_client_sink_set_property (GObject * object, guint prop_id,
84     const GValue * value, GParamSpec * pspec);
85 static void gst_tcp_client_sink_get_property (GObject * object, guint prop_id,
86     GValue * value, GParamSpec * pspec);
87 
88 
89 /*static guint gst_tcp_client_sink_signals[LAST_SIGNAL] = { 0 }; */
90 
/* Alias the parent-class pointer name so the functions below can refer to it
 * simply as parent_class (used in class_init and finalize). */
91 #define gst_tcp_client_sink_parent_class parent_class
92 G_DEFINE_TYPE (GstTCPClientSink, gst_tcp_client_sink, GST_TYPE_BASE_SINK);
93 
94 static void
gst_tcp_client_sink_class_init(GstTCPClientSinkClass * klass)95 gst_tcp_client_sink_class_init (GstTCPClientSinkClass * klass)
96 {
97   GObjectClass *gobject_class;
98   GstElementClass *gstelement_class;
99   GstBaseSinkClass *gstbasesink_class;
100 
101   gobject_class = (GObjectClass *) klass;
102   gstelement_class = (GstElementClass *) klass;
103   gstbasesink_class = (GstBaseSinkClass *) klass;
104 
105   parent_class = g_type_class_peek_parent (klass);
106 
107   gobject_class->set_property = gst_tcp_client_sink_set_property;
108   gobject_class->get_property = gst_tcp_client_sink_get_property;
109   gobject_class->finalize = gst_tcp_client_sink_finalize;
110 
111   g_object_class_install_property (gobject_class, PROP_HOST,
112       g_param_spec_string ("host", "Host", "The host/IP to send the packets to",
113           TCP_DEFAULT_HOST, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
114   g_object_class_install_property (gobject_class, PROP_PORT,
115       g_param_spec_int ("port", "Port", "The port to send the packets to",
116           0, TCP_HIGHEST_PORT, TCP_DEFAULT_PORT,
117           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
118 
119   gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate);
120 
121   gst_element_class_set_static_metadata (gstelement_class,
122       "TCP client sink", "Sink/Network",
123       "Send data as a client over the network via TCP",
124       "Thomas Vander Stichele <thomas at apestaart dot org>");
125 
126   gstbasesink_class->start = gst_tcp_client_sink_start;
127   gstbasesink_class->stop = gst_tcp_client_sink_stop;
128   gstbasesink_class->set_caps = gst_tcp_client_sink_setcaps;
129   gstbasesink_class->render = gst_tcp_client_sink_render;
130   gstbasesink_class->unlock = gst_tcp_client_sink_unlock;
131   gstbasesink_class->unlock_stop = gst_tcp_client_sink_unlock_stop;
132 
133   GST_DEBUG_CATEGORY_INIT (tcpclientsink_debug, "tcpclientsink", 0, "TCP sink");
134 }
135 
136 static void
gst_tcp_client_sink_init(GstTCPClientSink * this)137 gst_tcp_client_sink_init (GstTCPClientSink * this)
138 {
139   this->host = g_strdup (TCP_DEFAULT_HOST);
140   this->port = TCP_DEFAULT_PORT;
141 
142   this->socket = NULL;
143   this->cancellable = g_cancellable_new ();
144 
145   GST_OBJECT_FLAG_UNSET (this, GST_TCP_CLIENT_SINK_OPEN);
146 }
147 
148 static void
gst_tcp_client_sink_finalize(GObject * gobject)149 gst_tcp_client_sink_finalize (GObject * gobject)
150 {
151   GstTCPClientSink *this = GST_TCP_CLIENT_SINK (gobject);
152 
153   if (this->cancellable)
154     g_object_unref (this->cancellable);
155   this->cancellable = NULL;
156 
157   if (this->socket)
158     g_object_unref (this->socket);
159   this->socket = NULL;
160 
161   g_free (this->host);
162   this->host = NULL;
163 
164   G_OBJECT_CLASS (parent_class)->finalize (gobject);
165 }
166 
167 static gboolean
gst_tcp_client_sink_setcaps(GstBaseSink * bsink,GstCaps * caps)168 gst_tcp_client_sink_setcaps (GstBaseSink * bsink, GstCaps * caps)
169 {
170   return TRUE;
171 }
172 
173 static GstFlowReturn
gst_tcp_client_sink_render(GstBaseSink * bsink,GstBuffer * buf)174 gst_tcp_client_sink_render (GstBaseSink * bsink, GstBuffer * buf)
175 {
176   GstTCPClientSink *sink;
177   GstMapInfo map;
178   gsize written = 0;
179   gssize rret;
180   GError *err = NULL;
181 
182   sink = GST_TCP_CLIENT_SINK (bsink);
183 
184   g_return_val_if_fail (GST_OBJECT_FLAG_IS_SET (sink, GST_TCP_CLIENT_SINK_OPEN),
185       GST_FLOW_FLUSHING);
186 
187   gst_buffer_map (buf, &map, GST_MAP_READ);
188   GST_LOG_OBJECT (sink, "writing %" G_GSIZE_FORMAT " bytes for buffer data",
189       map.size);
190 
191   /* write buffer data */
192   while (written < map.size) {
193     rret =
194         g_socket_send (sink->socket, (gchar *) map.data + written,
195         map.size - written, sink->cancellable, &err);
196     if (rret < 0)
197       goto write_error;
198     written += rret;
199   }
200   gst_buffer_unmap (buf, &map);
201 
202   sink->data_written += written;
203 
204   return GST_FLOW_OK;
205 
206   /* ERRORS */
207 write_error:
208   {
209     GstFlowReturn ret;
210 
211     if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
212       ret = GST_FLOW_FLUSHING;
213       GST_DEBUG_OBJECT (sink, "Cancelled reading from socket");
214     } else {
215       GST_ELEMENT_ERROR (sink, RESOURCE, WRITE,
216           (_("Error while sending data to \"%s:%d\"."), sink->host, sink->port),
217           ("Only %" G_GSIZE_FORMAT " of %" G_GSIZE_FORMAT " bytes written: %s",
218               written, map.size, err->message));
219       ret = GST_FLOW_ERROR;
220     }
221     gst_buffer_unmap (buf, &map);
222     g_clear_error (&err);
223     return ret;
224   }
225 }
226 
227 static void
gst_tcp_client_sink_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)228 gst_tcp_client_sink_set_property (GObject * object, guint prop_id,
229     const GValue * value, GParamSpec * pspec)
230 {
231   GstTCPClientSink *tcpclientsink;
232 
233   g_return_if_fail (GST_IS_TCP_CLIENT_SINK (object));
234   tcpclientsink = GST_TCP_CLIENT_SINK (object);
235 
236   switch (prop_id) {
237     case PROP_HOST:
238       if (!g_value_get_string (value)) {
239         g_warning ("host property cannot be NULL");
240         break;
241       }
242       g_free (tcpclientsink->host);
243       tcpclientsink->host = g_strdup (g_value_get_string (value));
244       break;
245     case PROP_PORT:
246       tcpclientsink->port = g_value_get_int (value);
247       break;
248 
249     default:
250       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
251       break;
252   }
253 }
254 
255 static void
gst_tcp_client_sink_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)256 gst_tcp_client_sink_get_property (GObject * object, guint prop_id,
257     GValue * value, GParamSpec * pspec)
258 {
259   GstTCPClientSink *tcpclientsink;
260 
261   g_return_if_fail (GST_IS_TCP_CLIENT_SINK (object));
262   tcpclientsink = GST_TCP_CLIENT_SINK (object);
263 
264   switch (prop_id) {
265     case PROP_HOST:
266       g_value_set_string (value, tcpclientsink->host);
267       break;
268     case PROP_PORT:
269       g_value_set_int (value, tcpclientsink->port);
270       break;
271     default:
272       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
273       break;
274   }
275 }
276 
277 
278 /* create a socket for sending to remote machine */
279 static gboolean
gst_tcp_client_sink_start(GstBaseSink * bsink)280 gst_tcp_client_sink_start (GstBaseSink * bsink)
281 {
282   GstTCPClientSink *this = GST_TCP_CLIENT_SINK (bsink);
283   GError *err = NULL;
284   GInetAddress *addr;
285   GSocketAddress *saddr;
286   GResolver *resolver;
287 
288   if (GST_OBJECT_FLAG_IS_SET (this, GST_TCP_CLIENT_SINK_OPEN))
289     return TRUE;
290 
291   /* look up name if we need to */
292   addr = g_inet_address_new_from_string (this->host);
293   if (!addr) {
294     GList *results;
295 
296     resolver = g_resolver_get_default ();
297 
298     results =
299         g_resolver_lookup_by_name (resolver, this->host, this->cancellable,
300         &err);
301     if (!results)
302       goto name_resolve;
303     addr = G_INET_ADDRESS (g_object_ref (results->data));
304 
305     g_resolver_free_addresses (results);
306     g_object_unref (resolver);
307   }
308 #ifndef GST_DISABLE_GST_DEBUG
309   {
310     gchar *ip = g_inet_address_to_string (addr);
311 
312     GST_DEBUG_OBJECT (this, "IP address for host %s is %s", this->host, ip);
313     g_free (ip);
314   }
315 #endif
316   saddr = g_inet_socket_address_new (addr, this->port);
317   g_object_unref (addr);
318 
319   /* create sending client socket */
320   GST_DEBUG_OBJECT (this, "opening sending client socket to %s:%d", this->host,
321       this->port);
322   this->socket =
323       g_socket_new (g_socket_address_get_family (saddr), G_SOCKET_TYPE_STREAM,
324       G_SOCKET_PROTOCOL_TCP, &err);
325   if (!this->socket)
326     goto no_socket;
327 
328   GST_DEBUG_OBJECT (this, "opened sending client socket");
329 
330   /* connect to server */
331   if (!g_socket_connect (this->socket, saddr, this->cancellable, &err))
332     goto connect_failed;
333 
334   g_object_unref (saddr);
335 
336   GST_OBJECT_FLAG_SET (this, GST_TCP_CLIENT_SINK_OPEN);
337 
338   this->data_written = 0;
339 
340   return TRUE;
341 no_socket:
342   {
343     GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
344         ("Failed to create socket: %s", err->message));
345     g_clear_error (&err);
346     g_object_unref (saddr);
347     return FALSE;
348   }
349 name_resolve:
350   {
351     if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
352       GST_DEBUG_OBJECT (this, "Cancelled name resolval");
353     } else {
354       GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
355           ("Failed to resolve host '%s': %s", this->host, err->message));
356     }
357     g_clear_error (&err);
358     g_object_unref (resolver);
359     return FALSE;
360   }
361 connect_failed:
362   {
363     if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
364       GST_DEBUG_OBJECT (this, "Cancelled connecting");
365     } else {
366       GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL),
367           ("Failed to connect to host '%s:%d': %s", this->host, this->port,
368               err->message));
369     }
370     g_clear_error (&err);
371     g_object_unref (saddr);
372     /* pretend we opened ok for proper cleanup to happen */
373     GST_OBJECT_FLAG_SET (this, GST_TCP_CLIENT_SINK_OPEN);
374     gst_tcp_client_sink_stop (GST_BASE_SINK (this));
375     return FALSE;
376   }
377 }
378 
379 static gboolean
gst_tcp_client_sink_stop(GstBaseSink * bsink)380 gst_tcp_client_sink_stop (GstBaseSink * bsink)
381 {
382   GstTCPClientSink *this = GST_TCP_CLIENT_SINK (bsink);
383   GError *err = NULL;
384 
385   if (!GST_OBJECT_FLAG_IS_SET (this, GST_TCP_CLIENT_SINK_OPEN))
386     return TRUE;
387 
388   if (this->socket) {
389     GST_DEBUG_OBJECT (this, "closing socket");
390 
391     if (!g_socket_close (this->socket, &err)) {
392       GST_ERROR_OBJECT (this, "Failed to close socket: %s", err->message);
393       g_clear_error (&err);
394     }
395     g_object_unref (this->socket);
396     this->socket = NULL;
397   }
398 
399   GST_OBJECT_FLAG_UNSET (this, GST_TCP_CLIENT_SINK_OPEN);
400 
401   return TRUE;
402 }
403 
404 /* will be called only between calls to start() and stop() */
405 static gboolean
gst_tcp_client_sink_unlock(GstBaseSink * bsink)406 gst_tcp_client_sink_unlock (GstBaseSink * bsink)
407 {
408   GstTCPClientSink *sink = GST_TCP_CLIENT_SINK (bsink);
409 
410   GST_DEBUG_OBJECT (sink, "set to flushing");
411   g_cancellable_cancel (sink->cancellable);
412 
413   return TRUE;
414 }
415 
416 /* will be called only between calls to start() and stop() */
417 static gboolean
gst_tcp_client_sink_unlock_stop(GstBaseSink * bsink)418 gst_tcp_client_sink_unlock_stop (GstBaseSink * bsink)
419 {
420   GstTCPClientSink *sink = GST_TCP_CLIENT_SINK (bsink);
421 
422   GST_DEBUG_OBJECT (sink, "unset flushing");
423   g_object_unref (sink->cancellable);
424   sink->cancellable = g_cancellable_new ();
425 
426   return TRUE;
427 }
428