• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* GStreamer
2  *
3  * Copyright (C) 2014 William Manley <will@williammanley.net>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20 #ifdef HAVE_CONFIG_H
21 #include "config.h"
22 #endif
23 
24 #include <gio/gio.h>
25 #include <gst/check/gstcheck.h>
26 #include <gst/app/gstappsink.h>
27 #include <gst/app/gstappsrc.h>
28 #include <gst/net/gstnetcontrolmessagemeta.h>
29 
30 #ifdef HAVE_GIO_UNIX_2_0
31 #include <gio/gunixfdmessage.h>
32 #endif /*  HAVE_GIO_UNIX_2_0 */
33 
34 #include <sys/types.h>
35 #include <sys/stat.h>
36 #include <sys/socket.h>
37 #include <unistd.h>
38 
39 static gboolean
40 g_socketpair (GSocketFamily family, GSocketType type, GSocketProtocol protocol,
41     GSocket * gsv[2], GError ** error);
42 
43 typedef struct
44 {
45   GstElement *sink;
46   GstElement *src;
47 
48   GstPipeline *sink_pipeline;
49   GstPipeline *src_pipeline;
50   GstAppSrc *sink_src;
51   GstAppSink *src_sink;
52 } SymmetryTest;
53 
54 static void
symmetry_test_setup(SymmetryTest * st,GstElement * sink,GstElement * src)55 symmetry_test_setup (SymmetryTest * st, GstElement * sink, GstElement * src)
56 {
57   GstStateChangeReturn ret;
58   GstCaps *caps;
59   st->sink = sink;
60   g_object_set (sink, "sync", FALSE, NULL);
61   st->src = src;
62 
63   st->sink_pipeline = GST_PIPELINE (gst_pipeline_new (NULL));
64   st->src_pipeline = GST_PIPELINE (gst_pipeline_new (NULL));
65 
66   st->sink_src = GST_APP_SRC (gst_element_factory_make ("appsrc", NULL));
67   fail_unless (st->sink_src != NULL);
68   caps = gst_caps_from_string ("application/x-gst-check");
69   gst_app_src_set_caps (st->sink_src, caps);
70   gst_caps_unref (caps);
71 
72   gst_bin_add_many (GST_BIN (st->sink_pipeline), GST_ELEMENT (st->sink_src),
73       st->sink, NULL);
74   fail_unless (gst_element_link_many (GST_ELEMENT (st->sink_src), st->sink,
75           NULL));
76 
77   st->src_sink = GST_APP_SINK (gst_element_factory_make ("appsink", NULL));
78   fail_unless (st->src_sink != NULL);
79   gst_bin_add_many (GST_BIN (st->src_pipeline), st->src,
80       GST_ELEMENT (st->src_sink), NULL);
81   fail_unless (gst_element_link_many (st->src, GST_ELEMENT (st->src_sink),
82           NULL));
83 
84   ret =
85       gst_element_set_state (GST_ELEMENT (st->sink_pipeline),
86       GST_STATE_PLAYING);
87   fail_if (ret == GST_STATE_CHANGE_FAILURE);
88   ret =
89       gst_element_set_state (GST_ELEMENT (st->src_pipeline), GST_STATE_PLAYING);
90   fail_if (ret == GST_STATE_CHANGE_FAILURE);
91 }
92 
93 static void
symmetry_test_teardown(SymmetryTest * st)94 symmetry_test_teardown (SymmetryTest * st)
95 {
96   fail_unless (gst_element_set_state (GST_ELEMENT (st->sink_pipeline),
97           GST_STATE_NULL) != GST_STATE_CHANGE_FAILURE);
98   fail_unless (gst_element_set_state (GST_ELEMENT (st->src_pipeline),
99           GST_STATE_NULL) != GST_STATE_CHANGE_FAILURE);
100 
101   gst_object_unref (st->sink_pipeline);
102   gst_object_unref (st->src_pipeline);
103 
104   memset (st, 0, sizeof (*st));
105 }
106 
107 static void
symmetry_test_assert_passthrough(SymmetryTest * st,GstBuffer * in)108 symmetry_test_assert_passthrough (SymmetryTest * st, GstBuffer * in)
109 {
110   gpointer copy;
111   gsize data_size;
112   GstSample *out;
113 
114   gst_buffer_extract_dup (in, 0, -1, &copy, &data_size);
115 
116   fail_unless (gst_app_src_push_buffer (st->sink_src, in) == GST_FLOW_OK);
117   in = NULL;
118   out = gst_app_sink_pull_sample (st->src_sink);
119   fail_unless (out != NULL);
120 
121   fail_unless (gst_buffer_get_size (gst_sample_get_buffer (out)) == data_size);
122   fail_unless (gst_buffer_memcmp (gst_sample_get_buffer (out), 0, copy,
123           data_size) == 0);
124   g_free (copy);
125   gst_sample_unref (out);
126 }
127 
128 static gboolean
g_socketpair(GSocketFamily family,GSocketType type,GSocketProtocol protocol,GSocket * gsv[2],GError ** error)129 g_socketpair (GSocketFamily family, GSocketType type, GSocketProtocol protocol,
130     GSocket * gsv[2], GError ** error)
131 {
132   int ret;
133   int sv[2];
134 
135   ret = socketpair (family, type, protocol, sv);
136   if (ret != 0) {
137     g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, "socketpair failed: %s",
138         g_strerror (errno));
139     return FALSE;
140   }
141 
142   gsv[0] = g_socket_new_from_fd (sv[0], error);
143   if (gsv[0] == NULL) {
144     close (sv[0]);
145     close (sv[1]);
146     return FALSE;
147   }
148   gsv[1] = g_socket_new_from_fd (sv[1], error);
149   if (gsv[1] == NULL) {
150     g_object_unref (gsv[0]);
151     gsv[0] = NULL;
152     close (sv[1]);
153     return FALSE;
154   }
155   return TRUE;
156 }
157 
158 static void
setup_multisocketsink_and_socketsrc(SymmetryTest * st)159 setup_multisocketsink_and_socketsrc (SymmetryTest * st)
160 {
161   GSocket *sockets[2] = { NULL, NULL };
162   GError *err = NULL;
163 
164   st->sink = gst_check_setup_element ("multisocketsink");
165   st->src = gst_check_setup_element ("socketsrc");
166 
167   fail_unless (g_socketpair (G_SOCKET_FAMILY_UNIX,
168           G_SOCKET_TYPE_STREAM, G_SOCKET_PROTOCOL_DEFAULT, sockets, &err));
169 
170   g_object_set (st->src, "socket", sockets[0], NULL);
171   g_object_unref (sockets[0]);
172   sockets[0] = NULL;
173 
174   symmetry_test_setup (st, st->sink, st->src);
175 
176   g_signal_emit_by_name (st->sink, "add", sockets[1], NULL);
177   g_object_unref (sockets[1]);
178   sockets[1] = NULL;
179 }
180 
GST_START_TEST(test_that_socketsrc_and_multisocketsink_are_symmetrical)181 GST_START_TEST (test_that_socketsrc_and_multisocketsink_are_symmetrical)
182 {
183   SymmetryTest st = { 0 };
184   setup_multisocketsink_and_socketsrc (&st);
185   symmetry_test_assert_passthrough (&st,
186       gst_buffer_new_wrapped (g_strdup ("hello"), 5));
187   symmetry_test_teardown (&st);
188 }
189 
190 GST_END_TEST;
191 
192 
GST_START_TEST(test_that_tcpclientsink_and_tcpserversrc_are_symmetrical)193 GST_START_TEST (test_that_tcpclientsink_and_tcpserversrc_are_symmetrical)
194 {
195   SymmetryTest st = { 0 };
196   GstElement *serversrc = gst_check_setup_element ("tcpserversrc");
197 
198   gst_element_set_state (serversrc, GST_STATE_PAUSED);
199   symmetry_test_setup (&st, gst_check_setup_element ("tcpclientsink"),
200       serversrc);
201 
202   symmetry_test_assert_passthrough (&st,
203       gst_buffer_new_wrapped (g_strdup ("hello"), 5));
204 
205   symmetry_test_teardown (&st);
206 }
207 
208 GST_END_TEST;
209 
210 
GST_START_TEST(test_that_tcpserversink_and_tcpclientsrc_are_symmetrical)211 GST_START_TEST (test_that_tcpserversink_and_tcpclientsrc_are_symmetrical)
212 {
213   SymmetryTest st = { 0 };
214   GstElement *serversink = gst_check_setup_element ("tcpserversink");
215   guint timeout = 100;
216 
217   symmetry_test_setup (&st, serversink,
218       gst_check_setup_element ("tcpclientsrc"));
219 
220   /* Wait for the client to *actually* be connected before doing the
221    * test.  The socket connection from the client might very well
222    * succeed, but that doesn't mean the server has accepted it yet. If
223    * we don't wait for the server to have accepted the connection, we
224    * would end up dropping the buffer (because no one is "connected")
225    * and the receiving side would wait forever. */
226   while (timeout) {
227     guint handles;
228     g_object_get (serversink, "num-handles", &handles, NULL);
229     if (handles > 0)
230       break;
231     /* Wait for 10ms to see if client connected */
232     g_usleep (G_USEC_PER_SEC / 100);
233     timeout--;
234   }
235 
236   symmetry_test_assert_passthrough (&st,
237       gst_buffer_new_wrapped (g_strdup ("hello"), 5));
238   symmetry_test_teardown (&st);
239 }
240 
241 GST_END_TEST;
242 
243 static void
on_connection_closed(GstElement * socketsrc,gpointer user_data)244 on_connection_closed (GstElement * socketsrc, gpointer user_data)
245 {
246   GSocket *socket = (GSocket *) user_data;
247 
248   g_object_set (socketsrc, "socket", socket, NULL);
249 }
250 
GST_START_TEST(test_that_we_can_provide_new_socketsrc_sockets_during_signal)251 GST_START_TEST (test_that_we_can_provide_new_socketsrc_sockets_during_signal)
252 {
253   GSocket *sockets[4] = { NULL, NULL };
254 
255   GstPipeline *pipeline = NULL;
256   GstAppSink *appsink = NULL;
257   GstElement *socketsrc = NULL;
258   GstSample *sample = NULL;
259 
260   socketsrc = gst_check_setup_element ("socketsrc");
261 
262   fail_unless (g_socketpair (G_SOCKET_FAMILY_UNIX,
263           G_SOCKET_TYPE_STREAM, G_SOCKET_PROTOCOL_DEFAULT, &sockets[0], NULL));
264 
265   fail_unless (g_socket_send (sockets[0], "hello", 5, NULL, NULL) == 5);
266   fail_unless (g_socket_shutdown (sockets[0], FALSE, TRUE, NULL));
267 
268   fail_unless (g_socketpair (G_SOCKET_FAMILY_UNIX,
269           G_SOCKET_TYPE_STREAM, G_SOCKET_PROTOCOL_DEFAULT, &sockets[2], NULL));
270   fail_unless (g_socket_send (sockets[2], "goodbye", 7, NULL, NULL) == 7);
271   fail_unless (g_socket_shutdown (sockets[2], FALSE, TRUE, NULL));
272 
273   g_object_set (socketsrc, "socket", sockets[1], NULL);
274 
275   g_signal_connect (socketsrc, "connection-closed-by-peer",
276       G_CALLBACK (on_connection_closed), sockets[3]);
277 
278   pipeline = (GstPipeline *) gst_pipeline_new (NULL);
279   appsink = GST_APP_SINK (gst_check_setup_element ("appsink"));
280   gst_bin_add_many (GST_BIN (pipeline), socketsrc, GST_ELEMENT (appsink), NULL);
281   fail_unless (gst_element_link_many (socketsrc, GST_ELEMENT (appsink), NULL));
282 
283   gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_PLAYING);
284 
285   fail_unless ((sample = gst_app_sink_pull_sample (appsink)) != NULL);
286   gst_buffer_memcmp (gst_sample_get_buffer (sample), 0, "hello", 5);
287   gst_sample_unref (sample);
288 
289   fail_unless ((sample = gst_app_sink_pull_sample (appsink)) != NULL);
290   gst_buffer_memcmp (gst_sample_get_buffer (sample), 0, "goodbye", 7);
291   gst_sample_unref (sample);
292 
293   fail_unless (NULL == gst_app_sink_pull_sample (appsink));
294   fail_unless (gst_app_sink_is_eos (appsink));
295 
296   gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_NULL);
297   g_clear_object (&sockets[0]);
298   g_clear_object (&sockets[1]);
299   g_clear_object (&sockets[2]);
300   g_clear_object (&sockets[3]);
301   gst_object_unref (pipeline);
302 }
303 
304 GST_END_TEST
305 #ifdef HAVE_GIO_UNIX_2_0
306 static GSocketControlMessage *
get_control_message_meta(GstBuffer * buf)307 get_control_message_meta (GstBuffer * buf)
308 {
309   GstMeta *meta;
310   gpointer iter_state = NULL;
311 
312   while ((meta = gst_buffer_iterate_meta (buf, &iter_state)) != NULL) {
313     if (meta->info->api == GST_NET_CONTROL_MESSAGE_META_API_TYPE)
314       return ((GstNetControlMessageMeta *) meta)->message;
315   }
316   fail ("Expected GSocketControlMessage attached to buffer");
317   return NULL;
318 }
319 
320 
GST_START_TEST(test_that_multisocketsink_and_socketsrc_preserve_meta)321 GST_START_TEST (test_that_multisocketsink_and_socketsrc_preserve_meta)
322 {
323   GstBuffer *buf;
324   GSocketControlMessage *msg;
325   SymmetryTest st = { 0 };
326   char tmpfilename[] = "/tmp/tcp-test.XXXXXX";
327   GstSample *out;
328   int orig_fd, *new_fds, new_fds_len;
329   struct stat orig_stat, new_stat;
330 
331   setup_multisocketsink_and_socketsrc (&st);
332 
333   orig_fd = mkstemp (tmpfilename);
334   fail_unless (orig_fd > 0);
335   fail_unless (unlink (tmpfilename) == 0);
336   fstat (orig_fd, &orig_stat);
337 
338   msg = g_unix_fd_message_new ();
339   fail_unless (g_unix_fd_message_append_fd ((GUnixFDMessage *) msg, orig_fd,
340           NULL));
341   close (orig_fd);
342   orig_fd = -1;
343 
344   buf = gst_buffer_new_wrapped (g_strdup ("hello"), 5);
345   gst_buffer_add_net_control_message_meta (buf, msg);
346   g_clear_object (&msg);
347 
348   fail_unless (gst_app_src_push_buffer (st.sink_src, buf) == GST_FLOW_OK);
349   buf = NULL;
350 
351   out = gst_app_sink_pull_sample (st.src_sink);
352   fail_unless (out != NULL);
353 
354   fail_unless (gst_buffer_get_size (gst_sample_get_buffer (out)) == 5);
355   fail_unless (gst_buffer_memcmp (gst_sample_get_buffer (out), 0, "hello",
356           5) == 0);
357 
358   msg = get_control_message_meta (gst_sample_get_buffer (out));
359   fail_unless (g_socket_control_message_get_msg_type (msg) == SCM_RIGHTS);
360   new_fds = g_unix_fd_message_steal_fds ((GUnixFDMessage *) msg, &new_fds_len);
361   fail_unless (new_fds_len == 1);
362 
363   fstat (new_fds[0], &new_stat);
364   fail_unless (orig_stat.st_ino, new_stat.st_ino);
365 
366   close (new_fds[0]);
367   g_free (new_fds);
368 
369   gst_sample_unref (out);
370 
371   symmetry_test_teardown (&st);
372 }
373 
374 GST_END_TEST;
375 #endif /* HAVE_GIO_UNIX_2_0 */
376 
377 static Suite *
socketintegrationtest_suite(void)378 socketintegrationtest_suite (void)
379 {
380   Suite *s = suite_create ("socketintegrationtest");
381   TCase *tc_chain = tcase_create ("general");
382 
383   suite_add_tcase (s, tc_chain);
384   tcase_add_test (tc_chain,
385       test_that_socketsrc_and_multisocketsink_are_symmetrical);
386   tcase_add_test (tc_chain,
387       test_that_tcpclientsink_and_tcpserversrc_are_symmetrical);
388   tcase_add_test (tc_chain,
389       test_that_tcpserversink_and_tcpclientsrc_are_symmetrical);
390   tcase_add_test (tc_chain,
391       test_that_we_can_provide_new_socketsrc_sockets_during_signal);
392 #ifdef HAVE_GIO_UNIX_2_0
393   tcase_add_test (tc_chain,
394       test_that_multisocketsink_and_socketsrc_preserve_meta);
395 #endif /* HAVE_GIO_UNIX_2_0 */
396 
397   return s;
398 }
399 
400 GST_CHECK_MAIN (socketintegrationtest);
401