• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2  *
3  * Permission is hereby granted, free of charge, to any person obtaining a copy
4  * of this software and associated documentation files (the "Software"), to
5  * deal in the Software without restriction, including without limitation the
6  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7  * sell copies of the Software, and to permit persons to whom the Software is
8  * furnished to do so, subject to the following conditions:
9  *
10  * The above copyright notice and this permission notice shall be included in
11  * all copies or substantial portions of the Software.
12  *
13  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19  * IN THE SOFTWARE.
20  */
21 
22 #include "uv.h"
23 #include "task.h"
24 
25 #include <stdio.h>
26 #include <string.h>
27 
28 static uv_pipe_t channel;
29 static uv_tcp_t tcp_server;
30 static uv_tcp_t tcp_server2;
31 static uv_tcp_t tcp_connection;
32 
33 static int exit_cb_called;
34 static int read_cb_called;
35 static int tcp_write_cb_called;
36 static int tcp_read_cb_called;
37 static int on_pipe_read_called;
38 static int local_conn_accepted;
39 static int remote_conn_accepted;
40 static int tcp_server_listening;
41 static uv_write_t write_req;
42 static uv_write_t write_req2;
43 static uv_write_t conn_notify_req;
44 static int close_cb_called;
45 static int connection_accepted;
46 static int tcp_conn_read_cb_called;
47 static int tcp_conn_write_cb_called;
48 static int closed_handle_data_read;
49 static int closed_handle_write;
50 static int send_zero_write;
51 
52 typedef struct {
53   uv_connect_t conn_req;
54   uv_write_t tcp_write_req;
55   uv_tcp_t conn;
56 } tcp_conn;
57 
58 #define CONN_COUNT 100
59 #define BACKLOG 128
60 #define LARGE_SIZE 100000
61 
62 static uv_buf_t large_buf;
63 static char buffer[LARGE_SIZE];
64 static uv_write_t write_reqs[300];
65 static int write_reqs_completed;
66 
67 static unsigned int write_until_data_queued(void);
68 static void send_handle_and_close(void);
69 
70 
close_server_conn_cb(uv_handle_t * handle)71 static void close_server_conn_cb(uv_handle_t* handle) {
72   free(handle);
73 }
74 
75 
on_connection(uv_stream_t * server,int status)76 static void on_connection(uv_stream_t* server, int status) {
77   uv_tcp_t* conn;
78   int r;
79 
80   if (!local_conn_accepted) {
81     /* Accept the connection and close it.  Also and close the server. */
82     ASSERT(status == 0);
83     ASSERT((uv_stream_t*)&tcp_server == server);
84 
85     conn = malloc(sizeof(*conn));
86     ASSERT(conn);
87     r = uv_tcp_init(server->loop, conn);
88     ASSERT(r == 0);
89 
90     r = uv_accept(server, (uv_stream_t*)conn);
91     ASSERT(r == 0);
92 
93     uv_close((uv_handle_t*)conn, close_server_conn_cb);
94     uv_close((uv_handle_t*)server, NULL);
95     local_conn_accepted = 1;
96   }
97 }
98 
99 
exit_cb(uv_process_t * process,int64_t exit_status,int term_signal)100 static void exit_cb(uv_process_t* process,
101                     int64_t exit_status,
102                     int term_signal) {
103   printf("exit_cb\n");
104   exit_cb_called++;
105   ASSERT(exit_status == 0);
106   ASSERT(term_signal == 0);
107   uv_close((uv_handle_t*)process, NULL);
108 }
109 
110 
on_alloc(uv_handle_t * handle,size_t suggested_size,uv_buf_t * buf)111 static void on_alloc(uv_handle_t* handle,
112                      size_t suggested_size,
113                      uv_buf_t* buf) {
114   buf->base = malloc(suggested_size);
115   buf->len = suggested_size;
116 }
117 
118 
close_client_conn_cb(uv_handle_t * handle)119 static void close_client_conn_cb(uv_handle_t* handle) {
120   tcp_conn* p = (tcp_conn*)handle->data;
121   free(p);
122 }
123 
124 
connect_cb(uv_connect_t * req,int status)125 static void connect_cb(uv_connect_t* req, int status) {
126   uv_close((uv_handle_t*)req->handle, close_client_conn_cb);
127 }
128 
129 
make_many_connections(void)130 static void make_many_connections(void) {
131   tcp_conn* conn;
132   struct sockaddr_in addr;
133   int r, i;
134 
135   for (i = 0; i < CONN_COUNT; i++) {
136     conn = malloc(sizeof(*conn));
137     ASSERT(conn);
138 
139     r = uv_tcp_init(uv_default_loop(), &conn->conn);
140     ASSERT(r == 0);
141 
142     ASSERT(0 == uv_ip4_addr("127.0.0.1", TEST_PORT, &addr));
143 
144     r = uv_tcp_connect(&conn->conn_req,
145                        (uv_tcp_t*) &conn->conn,
146                        (const struct sockaddr*) &addr,
147                        connect_cb);
148     ASSERT(r == 0);
149 
150     conn->conn.data = conn;
151   }
152 }
153 
154 
on_read(uv_stream_t * handle,ssize_t nread,const uv_buf_t * buf)155 static void on_read(uv_stream_t* handle,
156                     ssize_t nread,
157                     const uv_buf_t* buf) {
158   int r;
159   uv_pipe_t* pipe;
160   uv_handle_type pending;
161   uv_buf_t outbuf;
162 
163   pipe = (uv_pipe_t*) handle;
164 
165   if (nread == 0) {
166     /* Everything OK, but nothing read. */
167     free(buf->base);
168     return;
169   }
170 
171   if (nread < 0) {
172     if (nread == UV_EOF) {
173       free(buf->base);
174       return;
175     }
176 
177     printf("error recving on channel: %s\n", uv_strerror(nread));
178     abort();
179   }
180 
181   fprintf(stderr, "got %d bytes\n", (int)nread);
182 
183   pending = uv_pipe_pending_type(pipe);
184   if (!tcp_server_listening) {
185     ASSERT(1 == uv_pipe_pending_count(pipe));
186     ASSERT(nread > 0 && buf->base && pending != UV_UNKNOWN_HANDLE);
187     read_cb_called++;
188 
189     /* Accept the pending TCP server, and start listening on it. */
190     ASSERT(pending == UV_TCP);
191     r = uv_tcp_init(uv_default_loop(), &tcp_server);
192     ASSERT(r == 0);
193 
194     r = uv_accept((uv_stream_t*)pipe, (uv_stream_t*)&tcp_server);
195     ASSERT(r == 0);
196 
197     r = uv_listen((uv_stream_t*)&tcp_server, BACKLOG, on_connection);
198     ASSERT(r == 0);
199 
200     tcp_server_listening = 1;
201 
202     /* Make sure that the expected data is correctly multiplexed. */
203     ASSERT(memcmp("hello\n", buf->base, nread) == 0);
204 
205     outbuf = uv_buf_init("world\n", 6);
206     r = uv_write(&write_req, (uv_stream_t*)pipe, &outbuf, 1, NULL);
207     ASSERT(r == 0);
208 
209     /* Create a bunch of connections to get both servers to accept. */
210     make_many_connections();
211   } else if (memcmp("accepted_connection\n", buf->base, nread) == 0) {
212     /* Remote server has accepted a connection.  Close the channel. */
213     ASSERT(0 == uv_pipe_pending_count(pipe));
214     ASSERT(pending == UV_UNKNOWN_HANDLE);
215     remote_conn_accepted = 1;
216     uv_close((uv_handle_t*)&channel, NULL);
217   }
218 
219   free(buf->base);
220 }
221 
222 #ifdef _WIN32
on_read_listen_after_bound_twice(uv_stream_t * handle,ssize_t nread,const uv_buf_t * buf)223 static void on_read_listen_after_bound_twice(uv_stream_t* handle,
224                                              ssize_t nread,
225                                              const uv_buf_t* buf) {
226   int r;
227   uv_pipe_t* pipe;
228   uv_handle_type pending;
229 
230   pipe = (uv_pipe_t*) handle;
231 
232   if (nread == 0) {
233     /* Everything OK, but nothing read. */
234     free(buf->base);
235     return;
236   }
237 
238   if (nread < 0) {
239     if (nread == UV_EOF) {
240       free(buf->base);
241       return;
242     }
243 
244     printf("error recving on channel: %s\n", uv_strerror(nread));
245     abort();
246   }
247 
248   fprintf(stderr, "got %d bytes\n", (int)nread);
249 
250   ASSERT(uv_pipe_pending_count(pipe) > 0);
251   pending = uv_pipe_pending_type(pipe);
252   ASSERT(nread > 0 && buf->base && pending != UV_UNKNOWN_HANDLE);
253   read_cb_called++;
254 
255   if (read_cb_called == 1) {
256     /* Accept the first TCP server, and start listening on it. */
257     ASSERT(pending == UV_TCP);
258     r = uv_tcp_init(uv_default_loop(), &tcp_server);
259     ASSERT(r == 0);
260 
261     r = uv_accept((uv_stream_t*)pipe, (uv_stream_t*)&tcp_server);
262     ASSERT(r == 0);
263 
264     r = uv_listen((uv_stream_t*)&tcp_server, BACKLOG, on_connection);
265     ASSERT(r == 0);
266   } else if (read_cb_called == 2) {
267     /* Accept the second TCP server, and start listening on it. */
268     ASSERT(pending == UV_TCP);
269     r = uv_tcp_init(uv_default_loop(), &tcp_server2);
270     ASSERT(r == 0);
271 
272     r = uv_accept((uv_stream_t*)pipe, (uv_stream_t*)&tcp_server2);
273     ASSERT(r == 0);
274 
275     r = uv_listen((uv_stream_t*)&tcp_server2, BACKLOG, on_connection);
276     ASSERT(r == UV_EADDRINUSE);
277 
278     uv_close((uv_handle_t*)&tcp_server, NULL);
279     uv_close((uv_handle_t*)&tcp_server2, NULL);
280     ASSERT(0 == uv_pipe_pending_count(pipe));
281     uv_close((uv_handle_t*)&channel, NULL);
282   }
283 
284   free(buf->base);
285 }
286 #endif
287 
spawn_helper(uv_pipe_t * channel,uv_process_t * process,const char * helper)288 void spawn_helper(uv_pipe_t* channel,
289                   uv_process_t* process,
290                   const char* helper) {
291   uv_process_options_t options;
292   size_t exepath_size;
293   char exepath[1024];
294   char* args[3];
295   int r;
296   uv_stdio_container_t stdio[3];
297 
298   r = uv_pipe_init(uv_default_loop(), channel, 1);
299   ASSERT(r == 0);
300   ASSERT(channel->ipc);
301 
302   exepath_size = sizeof(exepath);
303   r = uv_exepath(exepath, &exepath_size);
304   ASSERT(r == 0);
305 
306   exepath[exepath_size] = '\0';
307   args[0] = exepath;
308   args[1] = (char*)helper;
309   args[2] = NULL;
310 
311   memset(&options, 0, sizeof(options));
312   options.file = exepath;
313   options.args = args;
314   options.exit_cb = exit_cb;
315   options.stdio = stdio;
316   options.stdio_count = ARRAY_SIZE(stdio);
317 
318   stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE | UV_WRITABLE_PIPE;
319   stdio[0].data.stream = (uv_stream_t*) channel;
320   stdio[1].flags = UV_INHERIT_FD;
321   stdio[1].data.fd = 1;
322   stdio[2].flags = UV_INHERIT_FD;
323   stdio[2].data.fd = 2;
324 
325   r = uv_spawn(uv_default_loop(), process, &options);
326   ASSERT(r == 0);
327 }
328 
329 
on_tcp_write(uv_write_t * req,int status)330 static void on_tcp_write(uv_write_t* req, int status) {
331   ASSERT(status == 0);
332   ASSERT(req->handle == (uv_stream_t*)&tcp_connection);
333   tcp_write_cb_called++;
334 }
335 
336 
on_read_alloc(uv_handle_t * handle,size_t suggested_size,uv_buf_t * buf)337 static void on_read_alloc(uv_handle_t* handle,
338                           size_t suggested_size,
339                           uv_buf_t* buf) {
340   buf->base = malloc(suggested_size);
341   buf->len = suggested_size;
342 }
343 
344 
on_tcp_read(uv_stream_t * tcp,ssize_t nread,const uv_buf_t * buf)345 static void on_tcp_read(uv_stream_t* tcp, ssize_t nread, const uv_buf_t* buf) {
346   ASSERT(nread > 0);
347   ASSERT(memcmp("hello again\n", buf->base, nread) == 0);
348   ASSERT(tcp == (uv_stream_t*)&tcp_connection);
349   free(buf->base);
350 
351   tcp_read_cb_called++;
352 
353   uv_close((uv_handle_t*)tcp, NULL);
354   uv_close((uv_handle_t*)&channel, NULL);
355 }
356 
357 
on_read_connection(uv_stream_t * handle,ssize_t nread,const uv_buf_t * buf)358 static void on_read_connection(uv_stream_t* handle,
359                                ssize_t nread,
360                                const uv_buf_t* buf) {
361   int r;
362   uv_buf_t outbuf;
363   uv_pipe_t* pipe;
364   uv_handle_type pending;
365 
366   pipe = (uv_pipe_t*) handle;
367   if (nread == 0) {
368     /* Everything OK, but nothing read. */
369     free(buf->base);
370     return;
371   }
372 
373   if (nread < 0) {
374     if (nread == UV_EOF) {
375       free(buf->base);
376       return;
377     }
378 
379     printf("error recving on channel: %s\n", uv_strerror(nread));
380     abort();
381   }
382 
383   fprintf(stderr, "got %d bytes\n", (int)nread);
384 
385   ASSERT(1 == uv_pipe_pending_count(pipe));
386   pending = uv_pipe_pending_type(pipe);
387 
388   ASSERT(nread > 0 && buf->base && pending != UV_UNKNOWN_HANDLE);
389   read_cb_called++;
390 
391   /* Accept the pending TCP connection */
392   ASSERT(pending == UV_TCP);
393   r = uv_tcp_init(uv_default_loop(), &tcp_connection);
394   ASSERT(r == 0);
395 
396   r = uv_accept(handle, (uv_stream_t*)&tcp_connection);
397   ASSERT(r == 0);
398 
399   /* Make sure that the expected data is correctly multiplexed. */
400   ASSERT(memcmp("hello\n", buf->base, nread) == 0);
401 
402   /* Write/read to/from the connection */
403   outbuf = uv_buf_init("world\n", 6);
404   r = uv_write(&write_req, (uv_stream_t*)&tcp_connection, &outbuf, 1,
405     on_tcp_write);
406   ASSERT(r == 0);
407 
408   r = uv_read_start((uv_stream_t*)&tcp_connection, on_read_alloc, on_tcp_read);
409   ASSERT(r == 0);
410 
411   free(buf->base);
412 }
413 
414 
415 #ifndef _WIN32
on_read_closed_handle(uv_stream_t * handle,ssize_t nread,const uv_buf_t * buf)416 static void on_read_closed_handle(uv_stream_t* handle,
417                                   ssize_t nread,
418                                   const uv_buf_t* buf) {
419   if (nread == 0 || nread == UV_EOF) {
420     free(buf->base);
421     return;
422   }
423 
424   if (nread < 0) {
425     printf("error recving on channel: %s\n", uv_strerror(nread));
426     abort();
427   }
428 
429   closed_handle_data_read += nread;
430   free(buf->base);
431 }
432 #endif
433 
434 
on_read_send_zero(uv_stream_t * handle,ssize_t nread,const uv_buf_t * buf)435 static void on_read_send_zero(uv_stream_t* handle,
436                               ssize_t nread,
437                               const uv_buf_t* buf) {
438   ASSERT(nread == 0 || nread == UV_EOF);
439   free(buf->base);
440 }
441 
442 
run_ipc_test(const char * helper,uv_read_cb read_cb)443 static int run_ipc_test(const char* helper, uv_read_cb read_cb) {
444   uv_process_t process;
445   int r;
446 
447   spawn_helper(&channel, &process, helper);
448   uv_read_start((uv_stream_t*)&channel, on_alloc, read_cb);
449 
450   r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
451   ASSERT(r == 0);
452 
453   MAKE_VALGRIND_HAPPY();
454   return 0;
455 }
456 
457 
TEST_IMPL(ipc_listen_before_write)458 TEST_IMPL(ipc_listen_before_write) {
459 #if defined(NO_SEND_HANDLE_ON_PIPE)
460   RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE);
461 #endif
462   int r = run_ipc_test("ipc_helper_listen_before_write", on_read);
463   ASSERT(local_conn_accepted == 1);
464   ASSERT(remote_conn_accepted == 1);
465   ASSERT(read_cb_called == 1);
466   ASSERT(exit_cb_called == 1);
467   return r;
468 }
469 
470 
TEST_IMPL(ipc_listen_after_write)471 TEST_IMPL(ipc_listen_after_write) {
472 #if defined(NO_SEND_HANDLE_ON_PIPE)
473   RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE);
474 #endif
475   int r = run_ipc_test("ipc_helper_listen_after_write", on_read);
476   ASSERT(local_conn_accepted == 1);
477   ASSERT(remote_conn_accepted == 1);
478   ASSERT(read_cb_called == 1);
479   ASSERT(exit_cb_called == 1);
480   return r;
481 }
482 
483 
TEST_IMPL(ipc_tcp_connection)484 TEST_IMPL(ipc_tcp_connection) {
485 #if defined(NO_SEND_HANDLE_ON_PIPE)
486   RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE);
487 #endif
488   int r = run_ipc_test("ipc_helper_tcp_connection", on_read_connection);
489   ASSERT(read_cb_called == 1);
490   ASSERT(tcp_write_cb_called == 1);
491   ASSERT(tcp_read_cb_called == 1);
492   ASSERT(exit_cb_called == 1);
493   return r;
494 }
495 
496 #ifndef _WIN32
TEST_IMPL(ipc_closed_handle)497 TEST_IMPL(ipc_closed_handle) {
498   int r;
499   r = run_ipc_test("ipc_helper_closed_handle", on_read_closed_handle);
500   ASSERT(r == 0);
501   return 0;
502 }
503 #endif
504 
505 
506 #ifdef _WIN32
TEST_IMPL(listen_with_simultaneous_accepts)507 TEST_IMPL(listen_with_simultaneous_accepts) {
508   uv_tcp_t server;
509   int r;
510   struct sockaddr_in addr;
511 
512   ASSERT(0 == uv_ip4_addr("0.0.0.0", TEST_PORT, &addr));
513 
514   r = uv_tcp_init(uv_default_loop(), &server);
515   ASSERT(r == 0);
516 
517   r = uv_tcp_bind(&server, (const struct sockaddr*) &addr, 0);
518   ASSERT(r == 0);
519 
520   r = uv_tcp_simultaneous_accepts(&server, 1);
521   ASSERT(r == 0);
522 
523   r = uv_listen((uv_stream_t*)&server, SOMAXCONN, NULL);
524   ASSERT(r == 0);
525   ASSERT(server.reqs_pending == 32);
526 
527   MAKE_VALGRIND_HAPPY();
528   return 0;
529 }
530 
531 
TEST_IMPL(listen_no_simultaneous_accepts)532 TEST_IMPL(listen_no_simultaneous_accepts) {
533   uv_tcp_t server;
534   int r;
535   struct sockaddr_in addr;
536 
537   ASSERT(0 == uv_ip4_addr("0.0.0.0", TEST_PORT, &addr));
538 
539   r = uv_tcp_init(uv_default_loop(), &server);
540   ASSERT(r == 0);
541 
542   r = uv_tcp_bind(&server, (const struct sockaddr*) &addr, 0);
543   ASSERT(r == 0);
544 
545   r = uv_tcp_simultaneous_accepts(&server, 0);
546   ASSERT(r == 0);
547 
548   r = uv_listen((uv_stream_t*)&server, SOMAXCONN, NULL);
549   ASSERT(r == 0);
550   ASSERT(server.reqs_pending == 1);
551 
552   MAKE_VALGRIND_HAPPY();
553   return 0;
554 }
555 
TEST_IMPL(ipc_listen_after_bind_twice)556 TEST_IMPL(ipc_listen_after_bind_twice) {
557 #if defined(NO_SEND_HANDLE_ON_PIPE)
558   RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE);
559 #endif
560   int r = run_ipc_test("ipc_helper_bind_twice", on_read_listen_after_bound_twice);
561   ASSERT(read_cb_called == 2);
562   ASSERT(exit_cb_called == 1);
563   return r;
564 }
565 #endif
566 
TEST_IMPL(ipc_send_zero)567 TEST_IMPL(ipc_send_zero) {
568   int r;
569   r = run_ipc_test("ipc_helper_send_zero", on_read_send_zero);
570   ASSERT(r == 0);
571   return 0;
572 }
573 
574 
575 /* Everything here runs in a child process. */
576 
577 static tcp_conn conn;
578 
579 
close_cb(uv_handle_t * handle)580 static void close_cb(uv_handle_t* handle) {
581   close_cb_called++;
582 }
583 
584 
conn_notify_write_cb(uv_write_t * req,int status)585 static void conn_notify_write_cb(uv_write_t* req, int status) {
586   uv_close((uv_handle_t*)&tcp_server, close_cb);
587   uv_close((uv_handle_t*)&channel, close_cb);
588 }
589 
590 
tcp_connection_write_cb(uv_write_t * req,int status)591 static void tcp_connection_write_cb(uv_write_t* req, int status) {
592   ASSERT((uv_handle_t*)&conn.conn == (uv_handle_t*)req->handle);
593   uv_close((uv_handle_t*)req->handle, close_cb);
594   uv_close((uv_handle_t*)&channel, close_cb);
595   uv_close((uv_handle_t*)&tcp_server, close_cb);
596   tcp_conn_write_cb_called++;
597 }
598 
599 
closed_handle_large_write_cb(uv_write_t * req,int status)600 static void closed_handle_large_write_cb(uv_write_t* req, int status) {
601   ASSERT(status == 0);
602   ASSERT(closed_handle_data_read = LARGE_SIZE);
603   if (++write_reqs_completed == ARRAY_SIZE(write_reqs)) {
604     write_reqs_completed = 0;
605     if (write_until_data_queued() > 0)
606       send_handle_and_close();
607   }
608 }
609 
610 
closed_handle_write_cb(uv_write_t * req,int status)611 static void closed_handle_write_cb(uv_write_t* req, int status) {
612   ASSERT(status == UV_EBADF);
613   closed_handle_write = 1;
614 }
615 
616 
send_zero_write_cb(uv_write_t * req,int status)617 static void send_zero_write_cb(uv_write_t* req, int status) {
618   ASSERT(status == 0);
619   send_zero_write++;
620 }
621 
on_tcp_child_process_read(uv_stream_t * tcp,ssize_t nread,const uv_buf_t * buf)622 static void on_tcp_child_process_read(uv_stream_t* tcp,
623                                       ssize_t nread,
624                                       const uv_buf_t* buf) {
625   uv_buf_t outbuf;
626   int r;
627 
628   if (nread < 0) {
629     if (nread == UV_EOF) {
630       free(buf->base);
631       return;
632     }
633 
634     printf("error recving on tcp connection: %s\n", uv_strerror(nread));
635     abort();
636   }
637 
638   ASSERT(nread > 0);
639   ASSERT(memcmp("world\n", buf->base, nread) == 0);
640   on_pipe_read_called++;
641   free(buf->base);
642 
643   /* Write to the socket */
644   outbuf = uv_buf_init("hello again\n", 12);
645   r = uv_write(&conn.tcp_write_req, tcp, &outbuf, 1, tcp_connection_write_cb);
646   ASSERT(r == 0);
647 
648   tcp_conn_read_cb_called++;
649 }
650 
651 
connect_child_process_cb(uv_connect_t * req,int status)652 static void connect_child_process_cb(uv_connect_t* req, int status) {
653   int r;
654 
655   ASSERT(status == 0);
656   r = uv_read_start(req->handle, on_read_alloc, on_tcp_child_process_read);
657   ASSERT(r == 0);
658 }
659 
660 
ipc_on_connection(uv_stream_t * server,int status)661 static void ipc_on_connection(uv_stream_t* server, int status) {
662   int r;
663   uv_buf_t buf;
664 
665   if (!connection_accepted) {
666     /*
667      * Accept the connection and close it.  Also let the other
668      * side know.
669      */
670     ASSERT(status == 0);
671     ASSERT((uv_stream_t*)&tcp_server == server);
672 
673     r = uv_tcp_init(server->loop, &conn.conn);
674     ASSERT(r == 0);
675 
676     r = uv_accept(server, (uv_stream_t*)&conn.conn);
677     ASSERT(r == 0);
678 
679     uv_close((uv_handle_t*)&conn.conn, close_cb);
680 
681     buf = uv_buf_init("accepted_connection\n", 20);
682     r = uv_write2(&conn_notify_req, (uv_stream_t*)&channel, &buf, 1,
683       NULL, conn_notify_write_cb);
684     ASSERT(r == 0);
685 
686     connection_accepted = 1;
687   }
688 }
689 
690 
ipc_on_connection_tcp_conn(uv_stream_t * server,int status)691 static void ipc_on_connection_tcp_conn(uv_stream_t* server, int status) {
692   int r;
693   uv_buf_t buf;
694   uv_tcp_t* conn;
695 
696   ASSERT(status == 0);
697   ASSERT((uv_stream_t*)&tcp_server == server);
698 
699   conn = malloc(sizeof(*conn));
700   ASSERT(conn);
701 
702   r = uv_tcp_init(server->loop, conn);
703   ASSERT(r == 0);
704 
705   r = uv_accept(server, (uv_stream_t*)conn);
706   ASSERT(r == 0);
707 
708   /* Send the accepted connection to the other process */
709   buf = uv_buf_init("hello\n", 6);
710   r = uv_write2(&conn_notify_req, (uv_stream_t*)&channel, &buf, 1,
711     (uv_stream_t*)conn, NULL);
712   ASSERT(r == 0);
713 
714   r = uv_read_start((uv_stream_t*) conn,
715                     on_read_alloc,
716                     on_tcp_child_process_read);
717   ASSERT(r == 0);
718 
719   uv_close((uv_handle_t*)conn, close_cb);
720 }
721 
722 
ipc_helper(int listen_after_write)723 int ipc_helper(int listen_after_write) {
724   /*
725    * This is launched from test-ipc.c. stdin is a duplex channel that we
726    * over which a handle will be transmitted.
727    */
728   struct sockaddr_in addr;
729   int r;
730   uv_buf_t buf;
731 
732   ASSERT(0 == uv_ip4_addr("0.0.0.0", TEST_PORT, &addr));
733 
734   r = uv_pipe_init(uv_default_loop(), &channel, 1);
735   ASSERT(r == 0);
736 
737   uv_pipe_open(&channel, 0);
738 
739   ASSERT(1 == uv_is_readable((uv_stream_t*) &channel));
740   ASSERT(1 == uv_is_writable((uv_stream_t*) &channel));
741   ASSERT(0 == uv_is_closing((uv_handle_t*) &channel));
742 
743   r = uv_tcp_init(uv_default_loop(), &tcp_server);
744   ASSERT(r == 0);
745 
746   r = uv_tcp_bind(&tcp_server, (const struct sockaddr*) &addr, 0);
747   ASSERT(r == 0);
748 
749   if (!listen_after_write) {
750     r = uv_listen((uv_stream_t*)&tcp_server, BACKLOG, ipc_on_connection);
751     ASSERT(r == 0);
752   }
753 
754   buf = uv_buf_init("hello\n", 6);
755   r = uv_write2(&write_req, (uv_stream_t*)&channel, &buf, 1,
756       (uv_stream_t*)&tcp_server, NULL);
757   ASSERT(r == 0);
758 
759   if (listen_after_write) {
760     r = uv_listen((uv_stream_t*)&tcp_server, BACKLOG, ipc_on_connection);
761     ASSERT(r == 0);
762   }
763 
764   notify_parent_process();
765   r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
766   ASSERT(r == 0);
767 
768   ASSERT(connection_accepted == 1);
769   ASSERT(close_cb_called == 3);
770 
771   MAKE_VALGRIND_HAPPY();
772   return 0;
773 }
774 
775 
ipc_helper_tcp_connection(void)776 int ipc_helper_tcp_connection(void) {
777   /*
778    * This is launched from test-ipc.c. stdin is a duplex channel
779    * over which a handle will be transmitted.
780    */
781 
782   int r;
783   struct sockaddr_in addr;
784 
785   r = uv_pipe_init(uv_default_loop(), &channel, 1);
786   ASSERT(r == 0);
787 
788   uv_pipe_open(&channel, 0);
789 
790   ASSERT(1 == uv_is_readable((uv_stream_t*) &channel));
791   ASSERT(1 == uv_is_writable((uv_stream_t*) &channel));
792   ASSERT(0 == uv_is_closing((uv_handle_t*) &channel));
793 
794   r = uv_tcp_init(uv_default_loop(), &tcp_server);
795   ASSERT(r == 0);
796 
797   ASSERT(0 == uv_ip4_addr("0.0.0.0", TEST_PORT, &addr));
798 
799   r = uv_tcp_bind(&tcp_server, (const struct sockaddr*) &addr, 0);
800   ASSERT(r == 0);
801 
802   r = uv_listen((uv_stream_t*)&tcp_server, BACKLOG, ipc_on_connection_tcp_conn);
803   ASSERT(r == 0);
804 
805   /* Make a connection to the server */
806   r = uv_tcp_init(uv_default_loop(), &conn.conn);
807   ASSERT(r == 0);
808 
809   ASSERT(0 == uv_ip4_addr("127.0.0.1", TEST_PORT, &addr));
810 
811   r = uv_tcp_connect(&conn.conn_req,
812                      (uv_tcp_t*) &conn.conn,
813                      (const struct sockaddr*) &addr,
814                      connect_child_process_cb);
815   ASSERT(r == 0);
816 
817   r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
818   ASSERT(r == 0);
819 
820   ASSERT(tcp_conn_read_cb_called == 1);
821   ASSERT(tcp_conn_write_cb_called == 1);
822   ASSERT(close_cb_called == 4);
823 
824   MAKE_VALGRIND_HAPPY();
825   return 0;
826 }
827 
write_until_data_queued()828 static unsigned int write_until_data_queued() {
829   unsigned int i;
830   int r;
831 
832   i = 0;
833   do {
834     r = uv_write(&write_reqs[i],
835                  (uv_stream_t*)&channel,
836                  &large_buf,
837                  1,
838                  closed_handle_large_write_cb);
839     ASSERT(r == 0);
840     i++;
841   } while (channel.write_queue_size == 0 &&
842            i < ARRAY_SIZE(write_reqs));
843 
844   return channel.write_queue_size;
845 }
846 
send_handle_and_close()847 static void send_handle_and_close() {
848   int r;
849   struct sockaddr_in addr;
850 
851   r = uv_tcp_init(uv_default_loop(), &tcp_server);
852   ASSERT(r == 0);
853 
854   ASSERT(0 == uv_ip4_addr("0.0.0.0", TEST_PORT, &addr));
855 
856   r = uv_tcp_bind(&tcp_server, (const struct sockaddr*) &addr, 0);
857   ASSERT(r == 0);
858 
859   r = uv_write2(&write_req,
860                 (uv_stream_t*)&channel,
861                 &large_buf,
862                 1,
863                 (uv_stream_t*)&tcp_server,
864                 closed_handle_write_cb);
865   ASSERT(r == 0);
866 
867   uv_close((uv_handle_t*)&tcp_server, NULL);
868 }
869 
ipc_helper_closed_handle(void)870 int ipc_helper_closed_handle(void) {
871   int r;
872 
873   memset(buffer, '.', LARGE_SIZE);
874   large_buf = uv_buf_init(buffer, LARGE_SIZE);
875 
876   r = uv_pipe_init(uv_default_loop(), &channel, 1);
877   ASSERT(r == 0);
878 
879   uv_pipe_open(&channel, 0);
880 
881   ASSERT(1 == uv_is_readable((uv_stream_t*) &channel));
882   ASSERT(1 == uv_is_writable((uv_stream_t*) &channel));
883   ASSERT(0 == uv_is_closing((uv_handle_t*) &channel));
884 
885   if (write_until_data_queued() > 0)
886     send_handle_and_close();
887 
888   r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
889   ASSERT(r == 0);
890 
891   ASSERT(closed_handle_write == 1);
892 
893   MAKE_VALGRIND_HAPPY();
894   return 0;
895 }
896 
897 
ipc_helper_bind_twice(void)898 int ipc_helper_bind_twice(void) {
899   /*
900    * This is launched from test-ipc.c. stdin is a duplex channel
901    * over which two handles will be transmitted.
902    */
903   struct sockaddr_in addr;
904   int r;
905   uv_buf_t buf;
906 
907   ASSERT(0 == uv_ip4_addr("0.0.0.0", TEST_PORT, &addr));
908 
909   r = uv_pipe_init(uv_default_loop(), &channel, 1);
910   ASSERT(r == 0);
911 
912   uv_pipe_open(&channel, 0);
913 
914   ASSERT(1 == uv_is_readable((uv_stream_t*) &channel));
915   ASSERT(1 == uv_is_writable((uv_stream_t*) &channel));
916   ASSERT(0 == uv_is_closing((uv_handle_t*) &channel));
917 
918   buf = uv_buf_init("hello\n", 6);
919 
920   r = uv_tcp_init(uv_default_loop(), &tcp_server);
921   ASSERT(r == 0);
922   r = uv_tcp_init(uv_default_loop(), &tcp_server2);
923   ASSERT(r == 0);
924 
925   r = uv_tcp_bind(&tcp_server, (const struct sockaddr*) &addr, 0);
926   ASSERT(r == 0);
927   r = uv_tcp_bind(&tcp_server2, (const struct sockaddr*) &addr, 0);
928   ASSERT(r == 0);
929 
930   r = uv_write2(&write_req, (uv_stream_t*)&channel, &buf, 1,
931                 (uv_stream_t*)&tcp_server, NULL);
932   ASSERT(r == 0);
933   r = uv_write2(&write_req2, (uv_stream_t*)&channel, &buf, 1,
934                 (uv_stream_t*)&tcp_server2, NULL);
935   ASSERT(r == 0);
936 
937   r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
938   ASSERT(r == 0);
939 
940   MAKE_VALGRIND_HAPPY();
941   return 0;
942 }
943 
ipc_helper_send_zero(void)944 int ipc_helper_send_zero(void) {
945   int r;
946   uv_buf_t zero_buf;
947 
948   zero_buf = uv_buf_init(0, 0);
949 
950   r = uv_pipe_init(uv_default_loop(), &channel, 0);
951   ASSERT(r == 0);
952 
953   uv_pipe_open(&channel, 0);
954 
955   ASSERT(1 == uv_is_readable((uv_stream_t*) &channel));
956   ASSERT(1 == uv_is_writable((uv_stream_t*) &channel));
957   ASSERT(0 == uv_is_closing((uv_handle_t*) &channel));
958 
959   r = uv_write(&write_req,
960                (uv_stream_t*)&channel,
961                &zero_buf,
962                1,
963                send_zero_write_cb);
964 
965   ASSERT(r == 0);
966 
967   r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
968   ASSERT(r == 0);
969 
970   ASSERT(send_zero_write == 1);
971 
972   MAKE_VALGRIND_HAPPY();
973   return 0;
974 }
975