• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2  *
3  * Permission is hereby granted, free of charge, to any person obtaining a copy
4  * of this software and associated documentation files (the "Software"), to
5  * deal in the Software without restriction, including without limitation the
6  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7  * sell copies of the Software, and to permit persons to whom the Software is
8  * furnished to do so, subject to the following conditions:
9  *
10  * The above copyright notice and this permission notice shall be included in
11  * all copies or substantial portions of the Software.
12  *
13  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19  * IN THE SOFTWARE.
20  */
21 
22 #include "uv.h"
23 #include "task.h"
24 
25 #include <stdio.h>
26 #include <string.h>
27 
28 static uv_pipe_t channel;
29 static uv_tcp_t tcp_server;
30 static uv_tcp_t tcp_server2;
31 static uv_tcp_t tcp_connection;
32 
33 static int exit_cb_called;
34 static int read_cb_called;
35 static int tcp_write_cb_called;
36 static int tcp_read_cb_called;
37 static int on_pipe_read_called;
38 static int local_conn_accepted;
39 static int remote_conn_accepted;
40 static int tcp_server_listening;
41 static uv_write_t write_req;
42 static uv_write_t write_req2;
43 static uv_write_t conn_notify_req;
44 static int close_cb_called;
45 static int connection_accepted;
46 static int tcp_conn_read_cb_called;
47 static int tcp_conn_write_cb_called;
48 static int send_zero_write;
49 
50 typedef struct {
51   uv_connect_t conn_req;
52   uv_write_t tcp_write_req;
53   uv_tcp_t conn;
54 } tcp_conn;
55 
56 #define CONN_COUNT 100
57 #define BACKLOG 128
58 
59 
close_server_conn_cb(uv_handle_t * handle)60 static void close_server_conn_cb(uv_handle_t* handle) {
61   free(handle);
62 }
63 
64 
on_connection(uv_stream_t * server,int status)65 static void on_connection(uv_stream_t* server, int status) {
66   uv_tcp_t* conn;
67   int r;
68 
69   if (!local_conn_accepted) {
70     /* Accept the connection and close it.  Also and close the server. */
71     ASSERT_EQ(status, 0);
72     ASSERT_PTR_EQ(&tcp_server, server);
73 
74     conn = malloc(sizeof(*conn));
75     ASSERT_NOT_NULL(conn);
76     r = uv_tcp_init(server->loop, conn);
77     ASSERT_EQ(r, 0);
78 
79     r = uv_accept(server, (uv_stream_t*)conn);
80     ASSERT_EQ(r, 0);
81 
82     uv_close((uv_handle_t*)conn, close_server_conn_cb);
83     uv_close((uv_handle_t*)server, NULL);
84     local_conn_accepted = 1;
85   }
86 }
87 
88 
exit_cb(uv_process_t * process,int64_t exit_status,int term_signal)89 static void exit_cb(uv_process_t* process,
90                     int64_t exit_status,
91                     int term_signal) {
92   printf("exit_cb\n");
93   exit_cb_called++;
94   ASSERT_EQ(exit_status, 0);
95   ASSERT_EQ(term_signal, 0);
96   uv_close((uv_handle_t*)process, NULL);
97 }
98 
99 
on_alloc(uv_handle_t * handle,size_t suggested_size,uv_buf_t * buf)100 static void on_alloc(uv_handle_t* handle,
101                      size_t suggested_size,
102                      uv_buf_t* buf) {
103   buf->base = malloc(suggested_size);
104   buf->len = suggested_size;
105 }
106 
107 
close_client_conn_cb(uv_handle_t * handle)108 static void close_client_conn_cb(uv_handle_t* handle) {
109   tcp_conn* p = (tcp_conn*)handle->data;
110   free(p);
111 }
112 
113 
connect_cb(uv_connect_t * req,int status)114 static void connect_cb(uv_connect_t* req, int status) {
115   uv_close((uv_handle_t*)req->handle, close_client_conn_cb);
116 }
117 
118 
make_many_connections(void)119 static void make_many_connections(void) {
120   tcp_conn* conn;
121   struct sockaddr_in addr;
122   int r, i;
123 
124   for (i = 0; i < CONN_COUNT; i++) {
125     conn = malloc(sizeof(*conn));
126     ASSERT_NOT_NULL(conn);
127 
128     r = uv_tcp_init(uv_default_loop(), &conn->conn);
129     ASSERT_EQ(r, 0);
130     ASSERT_EQ(0, uv_ip4_addr("127.0.0.1", TEST_PORT, &addr));
131 
132     r = uv_tcp_connect(&conn->conn_req,
133                        (uv_tcp_t*) &conn->conn,
134                        (const struct sockaddr*) &addr,
135                        connect_cb);
136     ASSERT_EQ(r, 0);
137 
138     conn->conn.data = conn;
139   }
140 }
141 
142 
on_read(uv_stream_t * handle,ssize_t nread,const uv_buf_t * buf)143 static void on_read(uv_stream_t* handle,
144                     ssize_t nread,
145                     const uv_buf_t* buf) {
146   int r;
147   uv_pipe_t* pipe;
148   uv_handle_type pending;
149   uv_buf_t outbuf;
150 
151   pipe = (uv_pipe_t*) handle;
152 
153   if (nread == 0) {
154     /* Everything OK, but nothing read. */
155     free(buf->base);
156     return;
157   }
158 
159   if (nread < 0) {
160     if (nread == UV_EOF) {
161       free(buf->base);
162       return;
163     }
164 
165     printf("error recving on channel: %s\n", uv_strerror(nread));
166     abort();
167   }
168 
169   fprintf(stderr, "got %d bytes\n", (int)nread);
170 
171   pending = uv_pipe_pending_type(pipe);
172   if (!tcp_server_listening) {
173     ASSERT_EQ(1, uv_pipe_pending_count(pipe));
174     ASSERT_GT(nread, 0);
175     ASSERT_NOT_NULL(buf->base);
176     ASSERT_NE(pending, UV_UNKNOWN_HANDLE);
177     read_cb_called++;
178 
179     /* Accept the pending TCP server, and start listening on it. */
180     ASSERT_EQ(pending, UV_TCP);
181     r = uv_tcp_init(uv_default_loop(), &tcp_server);
182     ASSERT_EQ(r, 0);
183 
184     r = uv_accept((uv_stream_t*)pipe, (uv_stream_t*)&tcp_server);
185     ASSERT_EQ(r, 0);
186 
187     r = uv_listen((uv_stream_t*)&tcp_server, BACKLOG, on_connection);
188     ASSERT_EQ(r, 0);
189 
190     tcp_server_listening = 1;
191 
192     /* Make sure that the expected data is correctly multiplexed. */
193     ASSERT_MEM_EQ("hello\n", buf->base, nread);
194 
195     outbuf = uv_buf_init("foobar\n", 7);
196     r = uv_write(&write_req, (uv_stream_t*)pipe, &outbuf, 1, NULL);
197     ASSERT_EQ(r, 0);
198 
199     /* Create a bunch of connections to get both servers to accept. */
200     make_many_connections();
201   } else if (memcmp("accepted_connection\n", buf->base, nread) == 0) {
202     /* Remote server has accepted a connection.  Close the channel. */
203     ASSERT_EQ(0, uv_pipe_pending_count(pipe));
204     ASSERT_EQ(pending, UV_UNKNOWN_HANDLE);
205     remote_conn_accepted = 1;
206     uv_close((uv_handle_t*)&channel, NULL);
207   }
208 
209   free(buf->base);
210 }
211 
212 #ifdef _WIN32
on_read_listen_after_bound_twice(uv_stream_t * handle,ssize_t nread,const uv_buf_t * buf)213 static void on_read_listen_after_bound_twice(uv_stream_t* handle,
214                                              ssize_t nread,
215                                              const uv_buf_t* buf) {
216   int r;
217   uv_pipe_t* pipe;
218   uv_handle_type pending;
219 
220   pipe = (uv_pipe_t*) handle;
221 
222   if (nread == 0) {
223     /* Everything OK, but nothing read. */
224     free(buf->base);
225     return;
226   }
227 
228   if (nread < 0) {
229     if (nread == UV_EOF) {
230       free(buf->base);
231       return;
232     }
233 
234     printf("error recving on channel: %s\n", uv_strerror(nread));
235     abort();
236   }
237 
238   fprintf(stderr, "got %d bytes\n", (int)nread);
239 
240   ASSERT_GT(uv_pipe_pending_count(pipe), 0);
241   pending = uv_pipe_pending_type(pipe);
242   ASSERT_GT(nread, 0);
243   ASSERT_NOT_NULL(buf->base);
244   ASSERT_NE(pending, UV_UNKNOWN_HANDLE);
245   read_cb_called++;
246 
247   if (read_cb_called == 1) {
248     /* Accept the first TCP server, and start listening on it. */
249     ASSERT_EQ(pending, UV_TCP);
250     r = uv_tcp_init(uv_default_loop(), &tcp_server);
251     ASSERT_EQ(r, 0);
252 
253     r = uv_accept((uv_stream_t*)pipe, (uv_stream_t*)&tcp_server);
254     ASSERT_EQ(r, 0);
255 
256     r = uv_listen((uv_stream_t*)&tcp_server, BACKLOG, on_connection);
257     ASSERT_EQ(r, 0);
258   } else if (read_cb_called == 2) {
259     /* Accept the second TCP server, and start listening on it. */
260     ASSERT_EQ(pending, UV_TCP);
261     r = uv_tcp_init(uv_default_loop(), &tcp_server2);
262     ASSERT_EQ(r, 0);
263 
264     r = uv_accept((uv_stream_t*)pipe, (uv_stream_t*)&tcp_server2);
265     ASSERT_EQ(r, 0);
266 
267     r = uv_listen((uv_stream_t*)&tcp_server2, BACKLOG, on_connection);
268     ASSERT_EQ(r, UV_EADDRINUSE);
269 
270     uv_close((uv_handle_t*)&tcp_server, NULL);
271     uv_close((uv_handle_t*)&tcp_server2, NULL);
272     ASSERT_EQ(0, uv_pipe_pending_count(pipe));
273     uv_close((uv_handle_t*)&channel, NULL);
274   }
275 
276   free(buf->base);
277 }
278 #endif
279 
spawn_helper(uv_pipe_t * channel,uv_process_t * process,const char * helper)280 void spawn_helper(uv_pipe_t* channel,
281                   uv_process_t* process,
282                   const char* helper) {
283   uv_process_options_t options;
284   size_t exepath_size;
285   char exepath[1024];
286   char* args[3];
287   int r;
288   uv_stdio_container_t stdio[3];
289 
290   r = uv_pipe_init(uv_default_loop(), channel, 1);
291   ASSERT_EQ(r, 0);
292   ASSERT_NE(channel->ipc, 0);
293 
294   exepath_size = sizeof(exepath);
295   r = uv_exepath(exepath, &exepath_size);
296   ASSERT_EQ(r, 0);
297 
298   exepath[exepath_size] = '\0';
299   args[0] = exepath;
300   args[1] = (char*)helper;
301   args[2] = NULL;
302 
303   memset(&options, 0, sizeof(options));
304   options.file = exepath;
305   options.args = args;
306   options.exit_cb = exit_cb;
307   options.stdio = stdio;
308   options.stdio_count = ARRAY_SIZE(stdio);
309 
310   stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE | UV_WRITABLE_PIPE;
311   stdio[0].data.stream = (uv_stream_t*) channel;
312   stdio[1].flags = UV_INHERIT_FD;
313   stdio[1].data.fd = 1;
314   stdio[2].flags = UV_INHERIT_FD;
315   stdio[2].data.fd = 2;
316 
317   r = uv_spawn(uv_default_loop(), process, &options);
318   ASSERT_EQ(r, 0);
319 }
320 
321 
on_tcp_write(uv_write_t * req,int status)322 static void on_tcp_write(uv_write_t* req, int status) {
323   ASSERT_EQ(status, 0);
324   ASSERT_PTR_EQ(req->handle, &tcp_connection);
325   tcp_write_cb_called++;
326 }
327 
328 
on_read_alloc(uv_handle_t * handle,size_t suggested_size,uv_buf_t * buf)329 static void on_read_alloc(uv_handle_t* handle,
330                           size_t suggested_size,
331                           uv_buf_t* buf) {
332   buf->base = malloc(suggested_size);
333   buf->len = suggested_size;
334 }
335 
336 
on_tcp_read(uv_stream_t * tcp,ssize_t nread,const uv_buf_t * buf)337 static void on_tcp_read(uv_stream_t* tcp, ssize_t nread, const uv_buf_t* buf) {
338   ASSERT_GT(nread, 0);
339   ASSERT_MEM_EQ("hello again\n", buf->base, nread);
340   ASSERT_PTR_EQ(tcp, &tcp_connection);
341   free(buf->base);
342 
343   tcp_read_cb_called++;
344 
345   uv_close((uv_handle_t*)tcp, NULL);
346   uv_close((uv_handle_t*)&channel, NULL);
347 }
348 
349 
on_read_connection(uv_stream_t * handle,ssize_t nread,const uv_buf_t * buf)350 static void on_read_connection(uv_stream_t* handle,
351                                ssize_t nread,
352                                const uv_buf_t* buf) {
353   int r;
354   uv_buf_t outbuf;
355   uv_pipe_t* pipe;
356   uv_handle_type pending;
357 
358   pipe = (uv_pipe_t*) handle;
359   if (nread == 0) {
360     /* Everything OK, but nothing read. */
361     free(buf->base);
362     return;
363   }
364 
365   if (nread < 0) {
366     if (nread == UV_EOF) {
367       free(buf->base);
368       return;
369     }
370 
371     printf("error recving on channel: %s\n", uv_strerror(nread));
372     abort();
373   }
374 
375   fprintf(stderr, "got %d bytes\n", (int)nread);
376 
377   ASSERT_EQ(1, uv_pipe_pending_count(pipe));
378   pending = uv_pipe_pending_type(pipe);
379 
380   ASSERT_GT(nread, 0);
381   ASSERT_NOT_NULL(buf->base);
382   ASSERT_NE(pending, UV_UNKNOWN_HANDLE);
383   read_cb_called++;
384 
385   /* Accept the pending TCP connection */
386   ASSERT_EQ(pending, UV_TCP);
387   r = uv_tcp_init(uv_default_loop(), &tcp_connection);
388   ASSERT_EQ(r, 0);
389 
390   r = uv_accept(handle, (uv_stream_t*)&tcp_connection);
391   ASSERT_EQ(r, 0);
392 
393   /* Make sure that the expected data is correctly multiplexed. */
394   ASSERT_MEM_EQ("hello\n", buf->base, nread);
395 
396   /* Write/read to/from the connection */
397   outbuf = uv_buf_init("world\n", 6);
398   r = uv_write(&write_req, (uv_stream_t*)&tcp_connection, &outbuf, 1,
399     on_tcp_write);
400   ASSERT_EQ(r, 0);
401 
402   r = uv_read_start((uv_stream_t*)&tcp_connection, on_read_alloc, on_tcp_read);
403   ASSERT_EQ(r, 0);
404 
405   free(buf->base);
406 }
407 
408 
on_read_send_zero(uv_stream_t * handle,ssize_t nread,const uv_buf_t * buf)409 static void on_read_send_zero(uv_stream_t* handle,
410                               ssize_t nread,
411                               const uv_buf_t* buf) {
412   ASSERT(nread == 0 || nread == UV_EOF);
413   free(buf->base);
414 }
415 
416 
run_ipc_test(const char * helper,uv_read_cb read_cb)417 static int run_ipc_test(const char* helper, uv_read_cb read_cb) {
418   uv_process_t process;
419   int r;
420 
421   spawn_helper(&channel, &process, helper);
422   uv_read_start((uv_stream_t*)&channel, on_alloc, read_cb);
423 
424   r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
425   ASSERT_EQ(r, 0);
426 
427   MAKE_VALGRIND_HAPPY();
428   return 0;
429 }
430 
431 
TEST_IMPL(ipc_listen_before_write)432 TEST_IMPL(ipc_listen_before_write) {
433 #if defined(NO_SEND_HANDLE_ON_PIPE)
434   RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE);
435 #endif
436   int r = run_ipc_test("ipc_helper_listen_before_write", on_read);
437   ASSERT_EQ(local_conn_accepted, 1);
438   ASSERT_EQ(remote_conn_accepted, 1);
439   ASSERT_EQ(read_cb_called, 1);
440   ASSERT_EQ(exit_cb_called, 1);
441   return r;
442 }
443 
444 
TEST_IMPL(ipc_listen_after_write)445 TEST_IMPL(ipc_listen_after_write) {
446 #if defined(NO_SEND_HANDLE_ON_PIPE)
447   RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE);
448 #endif
449   int r = run_ipc_test("ipc_helper_listen_after_write", on_read);
450   ASSERT_EQ(local_conn_accepted, 1);
451   ASSERT_EQ(remote_conn_accepted, 1);
452   ASSERT_EQ(read_cb_called, 1);
453   ASSERT_EQ(exit_cb_called, 1);
454   return r;
455 }
456 
457 
TEST_IMPL(ipc_tcp_connection)458 TEST_IMPL(ipc_tcp_connection) {
459 #if defined(NO_SEND_HANDLE_ON_PIPE)
460   RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE);
461 #endif
462   int r = run_ipc_test("ipc_helper_tcp_connection", on_read_connection);
463   ASSERT_EQ(read_cb_called, 1);
464   ASSERT_EQ(tcp_write_cb_called, 1);
465   ASSERT_EQ(tcp_read_cb_called, 1);
466   ASSERT_EQ(exit_cb_called, 1);
467   return r;
468 }
469 
470 
471 #ifdef _WIN32
TEST_IMPL(listen_with_simultaneous_accepts)472 TEST_IMPL(listen_with_simultaneous_accepts) {
473   uv_tcp_t server;
474   int r;
475   struct sockaddr_in addr;
476 
477   ASSERT_EQ(0, uv_ip4_addr("0.0.0.0", TEST_PORT, &addr));
478 
479   r = uv_tcp_init(uv_default_loop(), &server);
480   ASSERT_EQ(r, 0);
481 
482   r = uv_tcp_bind(&server, (const struct sockaddr*) &addr, 0);
483   ASSERT_EQ(r, 0);
484 
485   r = uv_tcp_simultaneous_accepts(&server, 1);
486   ASSERT_EQ(r, 0);
487 
488   r = uv_listen((uv_stream_t*)&server, SOMAXCONN, NULL);
489   ASSERT_EQ(r, 0);
490   ASSERT_EQ(server.reqs_pending, 32);
491 
492   MAKE_VALGRIND_HAPPY();
493   return 0;
494 }
495 
496 
TEST_IMPL(listen_no_simultaneous_accepts)497 TEST_IMPL(listen_no_simultaneous_accepts) {
498   uv_tcp_t server;
499   int r;
500   struct sockaddr_in addr;
501 
502   ASSERT_EQ(0, uv_ip4_addr("0.0.0.0", TEST_PORT, &addr));
503 
504   r = uv_tcp_init(uv_default_loop(), &server);
505   ASSERT_EQ(r, 0);
506 
507   r = uv_tcp_bind(&server, (const struct sockaddr*) &addr, 0);
508   ASSERT_EQ(r, 0);
509 
510   r = uv_tcp_simultaneous_accepts(&server, 0);
511   ASSERT_EQ(r, 0);
512 
513   r = uv_listen((uv_stream_t*)&server, SOMAXCONN, NULL);
514   ASSERT_EQ(r, 0);
515   ASSERT_EQ(server.reqs_pending, 1);
516 
517   MAKE_VALGRIND_HAPPY();
518   return 0;
519 }
520 
TEST_IMPL(ipc_listen_after_bind_twice)521 TEST_IMPL(ipc_listen_after_bind_twice) {
522 #if defined(NO_SEND_HANDLE_ON_PIPE)
523   RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE);
524 #endif
525   int r = run_ipc_test("ipc_helper_bind_twice", on_read_listen_after_bound_twice);
526   ASSERT_EQ(read_cb_called, 2);
527   ASSERT_EQ(exit_cb_called, 1);
528   return r;
529 }
530 #endif
531 
TEST_IMPL(ipc_send_zero)532 TEST_IMPL(ipc_send_zero) {
533   int r;
534   r = run_ipc_test("ipc_helper_send_zero", on_read_send_zero);
535   ASSERT_EQ(r, 0);
536   return 0;
537 }
538 
539 
540 /* Everything here runs in a child process. */
541 
542 static tcp_conn conn;
543 
544 
close_cb(uv_handle_t * handle)545 static void close_cb(uv_handle_t* handle) {
546   close_cb_called++;
547 }
548 
549 
conn_notify_write_cb(uv_write_t * req,int status)550 static void conn_notify_write_cb(uv_write_t* req, int status) {
551   uv_close((uv_handle_t*)&tcp_server, close_cb);
552   uv_close((uv_handle_t*)&channel, close_cb);
553 }
554 
555 
tcp_connection_write_cb(uv_write_t * req,int status)556 static void tcp_connection_write_cb(uv_write_t* req, int status) {
557   ASSERT_PTR_EQ(&conn.conn, req->handle);
558   uv_close((uv_handle_t*)req->handle, close_cb);
559   uv_close((uv_handle_t*)&channel, close_cb);
560   uv_close((uv_handle_t*)&tcp_server, close_cb);
561   tcp_conn_write_cb_called++;
562 }
563 
564 
send_zero_write_cb(uv_write_t * req,int status)565 static void send_zero_write_cb(uv_write_t* req, int status) {
566   ASSERT_EQ(status, 0);
567   send_zero_write++;
568 }
569 
on_tcp_child_process_read(uv_stream_t * tcp,ssize_t nread,const uv_buf_t * buf)570 static void on_tcp_child_process_read(uv_stream_t* tcp,
571                                       ssize_t nread,
572                                       const uv_buf_t* buf) {
573   uv_buf_t outbuf;
574   int r;
575 
576   if (nread < 0) {
577     if (nread == UV_EOF) {
578       free(buf->base);
579       return;
580     }
581 
582     printf("error recving on tcp connection: %s\n", uv_strerror(nread));
583     abort();
584   }
585 
586   ASSERT_GT(nread, 0);
587   ASSERT_MEM_EQ("world\n", buf->base, nread);
588   on_pipe_read_called++;
589   free(buf->base);
590 
591   /* Write to the socket */
592   outbuf = uv_buf_init("hello again\n", 12);
593   r = uv_write(&conn.tcp_write_req, tcp, &outbuf, 1, tcp_connection_write_cb);
594   ASSERT_EQ(r, 0);
595 
596   tcp_conn_read_cb_called++;
597 }
598 
599 
connect_child_process_cb(uv_connect_t * req,int status)600 static void connect_child_process_cb(uv_connect_t* req, int status) {
601   int r;
602 
603   ASSERT_EQ(status, 0);
604   r = uv_read_start(req->handle, on_read_alloc, on_tcp_child_process_read);
605   ASSERT_EQ(r, 0);
606 }
607 
608 
ipc_on_connection(uv_stream_t * server,int status)609 static void ipc_on_connection(uv_stream_t* server, int status) {
610   int r;
611   uv_buf_t buf;
612 
613   if (!connection_accepted) {
614     /*
615      * Accept the connection and close it.  Also let the other
616      * side know.
617      */
618     ASSERT_EQ(status, 0);
619     ASSERT_PTR_EQ(&tcp_server, server);
620 
621     r = uv_tcp_init(server->loop, &conn.conn);
622     ASSERT_EQ(r, 0);
623 
624     r = uv_accept(server, (uv_stream_t*)&conn.conn);
625     ASSERT_EQ(r, 0);
626 
627     uv_close((uv_handle_t*)&conn.conn, close_cb);
628 
629     buf = uv_buf_init("accepted_connection\n", 20);
630     r = uv_write2(&conn_notify_req, (uv_stream_t*)&channel, &buf, 1,
631       NULL, conn_notify_write_cb);
632     ASSERT_EQ(r, 0);
633 
634     connection_accepted = 1;
635   }
636 }
637 
638 
close_and_free_cb(uv_handle_t * handle)639 static void close_and_free_cb(uv_handle_t* handle) {
640   close_cb_called++;
641   free(handle);
642 }
643 
ipc_on_connection_tcp_conn(uv_stream_t * server,int status)644 static void ipc_on_connection_tcp_conn(uv_stream_t* server, int status) {
645   int r;
646   uv_buf_t buf;
647   uv_tcp_t* conn;
648 
649   ASSERT_EQ(status, 0);
650   ASSERT_PTR_EQ(&tcp_server, server);
651 
652   conn = malloc(sizeof(*conn));
653   ASSERT_NOT_NULL(conn);
654 
655   r = uv_tcp_init(server->loop, conn);
656   ASSERT_EQ(r, 0);
657 
658   r = uv_accept(server, (uv_stream_t*)conn);
659   ASSERT_EQ(r, 0);
660 
661   /* Send the accepted connection to the other process */
662   buf = uv_buf_init("hello\n", 6);
663   r = uv_write2(&conn_notify_req, (uv_stream_t*)&channel, &buf, 1,
664     (uv_stream_t*)conn, NULL);
665   ASSERT_EQ(r, 0);
666 
667   r = uv_read_start((uv_stream_t*) conn,
668                     on_read_alloc,
669                     on_tcp_child_process_read);
670   ASSERT_EQ(r, 0);
671 
672   uv_close((uv_handle_t*)conn, close_and_free_cb);
673 }
674 
675 
ipc_helper(int listen_after_write)676 int ipc_helper(int listen_after_write) {
677   /*
678    * This is launched from test-ipc.c. stdin is a duplex channel that we
679    * over which a handle will be transmitted.
680    */
681   struct sockaddr_in addr;
682   int r;
683   uv_buf_t buf;
684 
685   ASSERT_EQ(0, uv_ip4_addr("0.0.0.0", TEST_PORT, &addr));
686 
687   r = uv_pipe_init(uv_default_loop(), &channel, 1);
688   ASSERT_EQ(r, 0);
689 
690   uv_pipe_open(&channel, 0);
691 
692   ASSERT_EQ(1, uv_is_readable((uv_stream_t*) &channel));
693   ASSERT_EQ(1, uv_is_writable((uv_stream_t*) &channel));
694   ASSERT_EQ(0, uv_is_closing((uv_handle_t*) &channel));
695 
696   r = uv_tcp_init(uv_default_loop(), &tcp_server);
697   ASSERT_EQ(r, 0);
698 
699   r = uv_tcp_bind(&tcp_server, (const struct sockaddr*) &addr, 0);
700   ASSERT_EQ(r, 0);
701 
702   if (!listen_after_write) {
703     r = uv_listen((uv_stream_t*)&tcp_server, BACKLOG, ipc_on_connection);
704     ASSERT_EQ(r, 0);
705   }
706 
707   buf = uv_buf_init("hello\n", 6);
708   r = uv_write2(&write_req, (uv_stream_t*)&channel, &buf, 1,
709       (uv_stream_t*)&tcp_server, NULL);
710   ASSERT_EQ(r, 0);
711 
712   if (listen_after_write) {
713     r = uv_listen((uv_stream_t*)&tcp_server, BACKLOG, ipc_on_connection);
714     ASSERT_EQ(r, 0);
715   }
716 
717   notify_parent_process();
718   r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
719   ASSERT_EQ(r, 0);
720 
721   ASSERT_EQ(connection_accepted, 1);
722   ASSERT_EQ(close_cb_called, 3);
723 
724   MAKE_VALGRIND_HAPPY();
725   return 0;
726 }
727 
728 
ipc_helper_tcp_connection(void)729 int ipc_helper_tcp_connection(void) {
730   /*
731    * This is launched from test-ipc.c. stdin is a duplex channel
732    * over which a handle will be transmitted.
733    */
734 
735   int r;
736   struct sockaddr_in addr;
737 
738   r = uv_pipe_init(uv_default_loop(), &channel, 1);
739   ASSERT_EQ(r, 0);
740 
741   uv_pipe_open(&channel, 0);
742 
743   ASSERT_EQ(1, uv_is_readable((uv_stream_t*) &channel));
744   ASSERT_EQ(1, uv_is_writable((uv_stream_t*) &channel));
745   ASSERT_EQ(0, uv_is_closing((uv_handle_t*) &channel));
746 
747   r = uv_tcp_init(uv_default_loop(), &tcp_server);
748   ASSERT_EQ(r, 0);
749 
750   ASSERT_EQ(0, uv_ip4_addr("0.0.0.0", TEST_PORT, &addr));
751 
752   r = uv_tcp_bind(&tcp_server, (const struct sockaddr*) &addr, 0);
753   ASSERT_EQ(r, 0);
754 
755   r = uv_listen((uv_stream_t*)&tcp_server, BACKLOG, ipc_on_connection_tcp_conn);
756   ASSERT_EQ(r, 0);
757 
758   /* Make a connection to the server */
759   r = uv_tcp_init(uv_default_loop(), &conn.conn);
760   ASSERT_EQ(r, 0);
761 
762   ASSERT_EQ(0, uv_ip4_addr("127.0.0.1", TEST_PORT, &addr));
763 
764   r = uv_tcp_connect(&conn.conn_req,
765                      (uv_tcp_t*) &conn.conn,
766                      (const struct sockaddr*) &addr,
767                      connect_child_process_cb);
768   ASSERT_EQ(r, 0);
769 
770   r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
771   ASSERT_EQ(r, 0);
772 
773   ASSERT_EQ(tcp_conn_read_cb_called, 1);
774   ASSERT_EQ(tcp_conn_write_cb_called, 1);
775   ASSERT_EQ(close_cb_called, 4);
776 
777   MAKE_VALGRIND_HAPPY();
778   return 0;
779 }
780 
ipc_helper_bind_twice(void)781 int ipc_helper_bind_twice(void) {
782   /*
783    * This is launched from test-ipc.c. stdin is a duplex channel
784    * over which two handles will be transmitted.
785    */
786   struct sockaddr_in addr;
787   int r;
788   uv_buf_t buf;
789 
790   ASSERT_EQ(0, uv_ip4_addr("0.0.0.0", TEST_PORT, &addr));
791 
792   r = uv_pipe_init(uv_default_loop(), &channel, 1);
793   ASSERT_EQ(r, 0);
794 
795   uv_pipe_open(&channel, 0);
796 
797   ASSERT_EQ(1, uv_is_readable((uv_stream_t*) &channel));
798   ASSERT_EQ(1, uv_is_writable((uv_stream_t*) &channel));
799   ASSERT_EQ(0, uv_is_closing((uv_handle_t*) &channel));
800 
801   buf = uv_buf_init("hello\n", 6);
802 
803   r = uv_tcp_init(uv_default_loop(), &tcp_server);
804   ASSERT_EQ(r, 0);
805   r = uv_tcp_init(uv_default_loop(), &tcp_server2);
806   ASSERT_EQ(r, 0);
807 
808   r = uv_tcp_bind(&tcp_server, (const struct sockaddr*) &addr, 0);
809   ASSERT_EQ(r, 0);
810   r = uv_tcp_bind(&tcp_server2, (const struct sockaddr*) &addr, 0);
811   ASSERT_EQ(r, 0);
812 
813   r = uv_write2(&write_req, (uv_stream_t*)&channel, &buf, 1,
814                 (uv_stream_t*)&tcp_server, NULL);
815   ASSERT_EQ(r, 0);
816   r = uv_write2(&write_req2, (uv_stream_t*)&channel, &buf, 1,
817                 (uv_stream_t*)&tcp_server2, NULL);
818   ASSERT_EQ(r, 0);
819 
820   r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
821   ASSERT_EQ(r, 0);
822 
823   MAKE_VALGRIND_HAPPY();
824   return 0;
825 }
826 
ipc_helper_send_zero(void)827 int ipc_helper_send_zero(void) {
828   int r;
829   uv_buf_t zero_buf;
830 
831   zero_buf = uv_buf_init(0, 0);
832 
833   r = uv_pipe_init(uv_default_loop(), &channel, 0);
834   ASSERT_EQ(r, 0);
835 
836   uv_pipe_open(&channel, 0);
837 
838   ASSERT_EQ(1, uv_is_readable((uv_stream_t*) &channel));
839   ASSERT_EQ(1, uv_is_writable((uv_stream_t*) &channel));
840   ASSERT_EQ(0, uv_is_closing((uv_handle_t*) &channel));
841 
842   r = uv_write(&write_req,
843                (uv_stream_t*)&channel,
844                &zero_buf,
845                1,
846                send_zero_write_cb);
847 
848   ASSERT_EQ(r, 0);
849 
850   r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
851   ASSERT_EQ(r, 0);
852 
853   ASSERT_EQ(send_zero_write, 1);
854 
855   MAKE_VALGRIND_HAPPY();
856   return 0;
857 }
858