• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2  *
3  * Permission is hereby granted, free of charge, to any person obtaining a copy
4  * of this software and associated documentation files (the "Software"), to
5  * deal in the Software without restriction, including without limitation the
6  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7  * sell copies of the Software, and to permit persons to whom the Software is
8  * furnished to do so, subject to the following conditions:
9  *
10  * The above copyright notice and this permission notice shall be included in
11  * all copies or substantial portions of the Software.
12  *
13  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19  * IN THE SOFTWARE.
20  */
21 
22 #include "uv.h"
23 #include "task.h"
24 
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h> /* strlen */
28 
/* Payload written by every pinger, and the reply pattern produced when
 * ponger_read_cb() rewrites each 'I' to 'O'. Both are NUL-terminated. */
static char PING[] = "PING\n";
static char PONG[] = "PONG\n";

/* Number of complete replies a pinger counts before it closes its stream. */
#if defined(__CYGWIN__) || defined(__MSYS__) || defined(__MVS__)
# define NUM_PINGS 100 /* fewer pings to avoid timeout */
#else
# define NUM_PINGS 1000
#endif

/* Bumped once by pinger_on_close(); checked by run_ping_pong_test(). */
static int completed_pingers = 0;

/* Bumped by pinger_on_connect(); the constructors assert it is still zero
 * right after issuing the connect request. */
static int pinger_on_connect_count;
40 
41 
42 typedef struct {
43   int vectored_writes;
44   unsigned pongs;
45   unsigned state;
46   union {
47     uv_tcp_t tcp;
48     uv_pipe_t pipe;
49   } stream;
50   uv_connect_t connect_req;
51   char* pong;
52 } pinger_t;
53 
54 
/* Allocation callback passed to uv_read_start(): supplies a heap buffer of
 * the requested size. `handle` is not used. The buffer is released with
 * free() by the read callbacks (or by pinger_after_write() when it was
 * handed on as write-request data). */
static void alloc_cb(uv_handle_t* handle, size_t size, uv_buf_t* buf) {
  char* chunk;

  chunk = malloc(size);

  buf->len = size;
  buf->base = chunk;
}
59 
60 
/* Close callback: releases the allocation that owns `handle`.
 * When handle->data is set it is freed (and the handle itself is not);
 * otherwise the handle itself is freed. */
static void ponger_on_close(uv_handle_t* handle) {
  void* owner;

  owner = handle->data;
  if (owner == NULL)
    owner = handle;

  free(owner);
}
67 
68 
/* Close callback for the handle a pinger was reading from.
 * Verifies that exactly NUM_PINGS replies were counted, releases memory and
 * records the pinger as completed. handle->data must point at the pinger_t. */
static void pinger_on_close(uv_handle_t* handle) {
  pinger_t* pinger;
  uv_handle_t* embedded;

  pinger = (pinger_t*) handle->data;
  embedded = (uv_handle_t*) &pinger->stream.tcp;

  ASSERT_EQ(NUM_PINGS, pinger->pongs);

  if (handle != embedded) {
    /* The closed handle is a separate allocation: close the pinger's own
     * stream too (ponger_on_close frees the pinger through its data
     * pointer), then release the handle that just finished closing. */
    uv_close(embedded, ponger_on_close);
    free(handle);
  } else {
    /* The handle lives inside the pinger, so this frees both. */
    free(pinger);
  }

  completed_pingers++;
}
83 
84 
/* Write-completion callback shared by pinger and ponger writes.
 * Asserts the write succeeded, then frees the request and whatever buffer
 * was attached to req->data (NULL for pinger writes). */
static void pinger_after_write(uv_write_t* req, int status) {
  void* attached;

  ASSERT_EQ(status, 0);

  attached = req->data;
  free(req);
  free(attached);
}
90 
91 
/* Queues one PING on the pinger's stream and logs "PING" to stdout.
 * With vectored_writes set, the payload is split into one-byte buffers;
 * otherwise it is sent as a single buffer. The request is heap-allocated
 * and released in pinger_after_write(). */
static void pinger_write_ping(pinger_t* pinger) {
  uv_buf_t bufs[sizeof PING - 1];
  uv_stream_t* stream;
  uv_write_t* req;
  int nbufs;
  int i;

  if (pinger->vectored_writes) {
    /* One buffer per payload byte (the trailing NUL is not sent). */
    nbufs = sizeof PING - 1;
    for (i = 0; i < nbufs; i++)
      bufs[i] = uv_buf_init(&PING[i], 1);
  } else {
    /* Whole payload in the first buffer. */
    bufs[0] = uv_buf_init(PING, sizeof PING - 1);
    nbufs = 1;
  }

  req = malloc(sizeof(*req));
  ASSERT_NOT_NULL(req);
  req->data = NULL;  /* nothing extra for pinger_after_write() to free */

  stream = (uv_stream_t*) &pinger->stream.tcp;
  ASSERT_EQ(0, uv_write(req, stream, bufs, nbufs, pinger_after_write));

  puts("PING");
}
119 
120 
/* Read callback for a pinger. stream->data must point at the pinger_t.
 *
 * A negative nread must be UV_EOF; the stream is then closed with
 * pinger_on_close(). Otherwise every received byte is matched against the
 * expected reply pattern (pinger->pong), cycling through it. Each time the
 * pattern completes, one pong is counted and either the next ping is sent
 * or, once NUM_PINGS have been counted, the stream is closed.
 *
 * The read buffer is always freed here. */
static void pinger_read_cb(uv_stream_t* stream,
                           ssize_t nread,
                           const uv_buf_t* buf) {
  ssize_t i;
  size_t pong_len;
  pinger_t* pinger;

  pinger = (pinger_t*) stream->data;

  if (nread < 0) {
    ASSERT_EQ(nread, UV_EOF);

    puts("got EOF");
    free(buf->base);

    uv_close((uv_handle_t*) stream, pinger_on_close);

    return;
  }

  /* The pattern does not change while reading; measure it once. */
  pong_len = strlen(pinger->pong);

  /* Now we count the pongs */
  for (i = 0; i < nread; i++) {
    ASSERT_EQ(buf->base[i], pinger->pong[pinger->state]);
    pinger->state = (pinger->state + 1) % pong_len;

    /* Still in the middle of a reply. */
    if (pinger->state != 0)
      continue;

    /* pongs is unsigned, so print it with %u. */
    printf("PONG %u\n", pinger->pongs);
    pinger->pongs++;

    if (pinger->pongs < NUM_PINGS) {
      pinger_write_ping(pinger);
    } else {
      /* Done: stop looking at the rest of the buffer. */
      uv_close((uv_handle_t*) stream, pinger_on_close);
      break;
    }
  }

  free(buf->base);
}
161 
162 
/* Read callback for the ponger end of a socket pair.
 *
 * A negative nread must be UV_EOF; the buffer is freed and the stream is
 * closed with ponger_on_close(). Otherwise every 'I' in the received data
 * is replaced by 'O' in place and the buffer is written back on the same
 * stream. Ownership of the buffer moves to the write request; it is freed
 * by pinger_after_write() through req->data. */
static void ponger_read_cb(uv_stream_t* stream,
                           ssize_t nread,
                           const uv_buf_t* buf) {
  uv_write_t* req;
  uv_buf_t writebuf;
  int pos;

  if (nread < 0) {
    ASSERT_EQ(nread, UV_EOF);

    puts("got EOF");
    free(buf->base);

    uv_close((uv_handle_t*) stream, ponger_on_close);

    return;
  }

  /* Echo back, rewriting the payload on the way. */
  pos = 0;
  while (pos < nread) {
    if (buf->base[pos] == 'I')
      buf->base[pos] = 'O';
    pos++;
  }

  req = malloc(sizeof(*req));
  ASSERT_NOT_NULL(req);
  req->data = buf->base;

  writebuf = uv_buf_init(buf->base, nread);
  ASSERT_EQ(0, uv_write(req, stream, &writebuf, 1, pinger_after_write));
}
193 
194 
/* Connect callback for the TCP and named-pipe pingers.
 * Counts the invocation, checks that the connection succeeded and that the
 * stream is readable, writable and not closing, then sends the first ping
 * and starts reading replies. req->handle->data must point at the pinger_t. */
static void pinger_on_connect(uv_connect_t* req, int status) {
  pinger_t* pinger;

  pinger_on_connect_count++;

  ASSERT_EQ(status, 0);

  ASSERT_EQ(1, uv_is_readable(req->handle));
  ASSERT_EQ(1, uv_is_writable(req->handle));
  ASSERT_EQ(0, uv_is_closing((uv_handle_t *) req->handle));

  pinger = (pinger_t*) req->handle->data;
  pinger_write_ping(pinger);

  ASSERT_EQ(0, uv_read_start((uv_stream_t*) req->handle,
                             alloc_cb,
                             pinger_read_cb));
}
212 
213 
214 /* same ping-pong test, but using IPv6 connection */
tcp_pinger_v6_new(int vectored_writes)215 static void tcp_pinger_v6_new(int vectored_writes) {
216   int r;
217   struct sockaddr_in6 server_addr;
218   pinger_t* pinger;
219 
220 
221   ASSERT_EQ(0, uv_ip6_addr("::1", TEST_PORT, &server_addr));
222   pinger = malloc(sizeof(*pinger));
223   ASSERT_NOT_NULL(pinger);
224   pinger->vectored_writes = vectored_writes;
225   pinger->state = 0;
226   pinger->pongs = 0;
227   pinger->pong = PING;
228 
229   /* Try to connect to the server and do NUM_PINGS ping-pongs. */
230   r = uv_tcp_init(uv_default_loop(), &pinger->stream.tcp);
231   pinger->stream.tcp.data = pinger;
232   ASSERT_EQ(0, r);
233 
234   /* We are never doing multiple reads/connects at a time anyway, so these
235    * handles can be pre-initialized. */
236   r = uv_tcp_connect(&pinger->connect_req,
237                      &pinger->stream.tcp,
238                      (const struct sockaddr*) &server_addr,
239                      pinger_on_connect);
240   ASSERT_EQ(0, r);
241 
242   /* Synchronous connect callbacks are not allowed. */
243   ASSERT_EQ(pinger_on_connect_count, 0);
244 }
245 
246 
tcp_pinger_new(int vectored_writes)247 static void tcp_pinger_new(int vectored_writes) {
248   int r;
249   struct sockaddr_in server_addr;
250   pinger_t* pinger;
251 
252   ASSERT_EQ(0, uv_ip4_addr("127.0.0.1", TEST_PORT, &server_addr));
253   pinger = malloc(sizeof(*pinger));
254   ASSERT_NOT_NULL(pinger);
255   pinger->vectored_writes = vectored_writes;
256   pinger->state = 0;
257   pinger->pongs = 0;
258   pinger->pong = PING;
259 
260   /* Try to connect to the server and do NUM_PINGS ping-pongs. */
261   r = uv_tcp_init(uv_default_loop(), &pinger->stream.tcp);
262   pinger->stream.tcp.data = pinger;
263   ASSERT_EQ(0, r);
264 
265   /* We are never doing multiple reads/connects at a time anyway, so these
266    * handles can be pre-initialized. */
267   r = uv_tcp_connect(&pinger->connect_req,
268                      &pinger->stream.tcp,
269                      (const struct sockaddr*) &server_addr,
270                      pinger_on_connect);
271   ASSERT_EQ(0, r);
272 
273   /* Synchronous connect callbacks are not allowed. */
274   ASSERT_EQ(pinger_on_connect_count, 0);
275 }
276 
277 
pipe_pinger_new(int vectored_writes)278 static void pipe_pinger_new(int vectored_writes) {
279   int r;
280   pinger_t* pinger;
281 
282   pinger = malloc(sizeof(*pinger));
283   ASSERT_NOT_NULL(pinger);
284   pinger->vectored_writes = vectored_writes;
285   pinger->state = 0;
286   pinger->pongs = 0;
287   pinger->pong = PING;
288 
289   /* Try to connect to the server and do NUM_PINGS ping-pongs. */
290   r = uv_pipe_init(uv_default_loop(), &pinger->stream.pipe, 0);
291   pinger->stream.pipe.data = pinger;
292   ASSERT_EQ(0, r);
293 
294   /* We are never doing multiple reads/connects at a time anyway, so these
295    * handles can be pre-initialized. */
296   uv_pipe_connect(&pinger->connect_req, &pinger->stream.pipe, TEST_PIPENAME,
297       pinger_on_connect);
298 
299   /* Synchronous connect callbacks are not allowed. */
300   ASSERT_EQ(pinger_on_connect_count, 0);
301 }
302 
303 
socketpair_pinger_new(int vectored_writes)304 static void socketpair_pinger_new(int vectored_writes) {
305   pinger_t* pinger;
306   uv_os_sock_t fds[2];
307   uv_tcp_t* ponger;
308 
309   pinger = malloc(sizeof(*pinger));
310   ASSERT_NOT_NULL(pinger);
311   pinger->vectored_writes = vectored_writes;
312   pinger->state = 0;
313   pinger->pongs = 0;
314   pinger->pong = PONG;
315 
316   /* Try to make a socketpair and do NUM_PINGS ping-pongs. */
317   (void)uv_default_loop(); /* ensure WSAStartup has been performed */
318   ASSERT_EQ(0, uv_socketpair(SOCK_STREAM, 0, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE));
319 #ifndef _WIN32
320   /* On Windows, this is actually a UV_TCP, but libuv doesn't detect that. */
321   ASSERT_EQ(uv_guess_handle((uv_file) fds[0]), UV_NAMED_PIPE);
322   ASSERT_EQ(uv_guess_handle((uv_file) fds[1]), UV_NAMED_PIPE);
323 #endif
324 
325   ASSERT_EQ(0, uv_tcp_init(uv_default_loop(), &pinger->stream.tcp));
326   pinger->stream.pipe.data = pinger;
327   ASSERT_EQ(0, uv_tcp_open(&pinger->stream.tcp, fds[1]));
328 
329   ponger = malloc(sizeof(*ponger));
330   ASSERT_NOT_NULL(ponger);
331   ponger->data = NULL;
332   ASSERT_EQ(0, uv_tcp_init(uv_default_loop(), ponger));
333   ASSERT_EQ(0, uv_tcp_open(ponger, fds[0]));
334 
335   pinger_write_ping(pinger);
336 
337   ASSERT_EQ(0, uv_read_start((uv_stream_t*) &pinger->stream.tcp,
338                              alloc_cb,
339                              pinger_read_cb));
340   ASSERT_EQ(0, uv_read_start((uv_stream_t*) ponger,
341                              alloc_cb,
342                              ponger_read_cb));
343 }
344 
345 
pipe2_pinger_new(int vectored_writes)346 static void pipe2_pinger_new(int vectored_writes) {
347   uv_file fds[2];
348   pinger_t* pinger;
349   uv_pipe_t* ponger;
350 
351   /* Try to make a pipe and do NUM_PINGS pings. */
352   ASSERT_EQ(0, uv_pipe(fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE));
353   ASSERT_EQ(uv_guess_handle(fds[0]), UV_NAMED_PIPE);
354   ASSERT_EQ(uv_guess_handle(fds[1]), UV_NAMED_PIPE);
355 
356   ponger = malloc(sizeof(*ponger));
357   ASSERT_NOT_NULL(ponger);
358   ASSERT_EQ(0, uv_pipe_init(uv_default_loop(), ponger, 0));
359   ASSERT_EQ(0, uv_pipe_open(ponger, fds[0]));
360 
361   pinger = malloc(sizeof(*pinger));
362   ASSERT_NOT_NULL(pinger);
363   pinger->vectored_writes = vectored_writes;
364   pinger->state = 0;
365   pinger->pongs = 0;
366   pinger->pong = PING;
367   ASSERT_EQ(0, uv_pipe_init(uv_default_loop(), &pinger->stream.pipe, 0));
368   ASSERT_EQ(0, uv_pipe_open(&pinger->stream.pipe, fds[1]));
369   pinger->stream.pipe.data = pinger; /* record for close_cb */
370   ponger->data = pinger; /* record for read_cb */
371 
372   pinger_write_ping(pinger);
373 
374   ASSERT_EQ(0, uv_read_start((uv_stream_t*) ponger, alloc_cb, pinger_read_cb));
375 }
376 
run_ping_pong_test(void)377 static int run_ping_pong_test(void) {
378   uv_run(uv_default_loop(), UV_RUN_DEFAULT);
379   ASSERT_EQ(completed_pingers, 1);
380 
381   MAKE_VALGRIND_HAPPY();
382   return 0;
383 }
384 
385 
TEST_IMPL(tcp_ping_pong)386 TEST_IMPL(tcp_ping_pong) {
387   tcp_pinger_new(0);
388   run_ping_pong_test();
389 
390   completed_pingers = 0;
391   socketpair_pinger_new(0);
392   return run_ping_pong_test();
393 }
394 
395 
TEST_IMPL(tcp_ping_pong_vec)396 TEST_IMPL(tcp_ping_pong_vec) {
397   tcp_pinger_new(1);
398   run_ping_pong_test();
399 
400   completed_pingers = 0;
401   socketpair_pinger_new(1);
402   return run_ping_pong_test();
403 }
404 
405 
/* IPv6 TCP ping-pong with single-buffer writes; skipped when can_ipv6()
 * reports no IPv6 support. */
TEST_IMPL(tcp6_ping_pong) {
  if (!can_ipv6()) {
    RETURN_SKIP("IPv6 not supported");
  }

  tcp_pinger_v6_new(0);

  return run_ping_pong_test();
}
412 
413 
/* IPv6 TCP ping-pong with vectored (one byte per buffer) writes; skipped
 * when can_ipv6() reports no IPv6 support. */
TEST_IMPL(tcp6_ping_pong_vec) {
  if (!can_ipv6()) {
    RETURN_SKIP("IPv6 not supported");
  }

  tcp_pinger_v6_new(1);

  return run_ping_pong_test();
}
420 
421 
TEST_IMPL(pipe_ping_pong)422 TEST_IMPL(pipe_ping_pong) {
423   pipe_pinger_new(0);
424   run_ping_pong_test();
425 
426   completed_pingers = 0;
427   pipe2_pinger_new(0);
428   return run_ping_pong_test();
429 }
430 
431 
TEST_IMPL(pipe_ping_pong_vec)432 TEST_IMPL(pipe_ping_pong_vec) {
433   pipe_pinger_new(1);
434   run_ping_pong_test();
435 
436   completed_pingers = 0;
437   pipe2_pinger_new(1);
438   return run_ping_pong_test();
439 }
440