1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2 *
3 * Permission is hereby granted, free of charge, to any person obtaining a copy
4 * of this software and associated documentation files (the "Software"), to
5 * deal in the Software without restriction, including without limitation the
6 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7 * sell copies of the Software, and to permit persons to whom the Software is
8 * furnished to do so, subject to the following conditions:
9 *
10 * The above copyright notice and this permission notice shall be included in
11 * all copies or substantial portions of the Software.
12 *
13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19 * IN THE SOFTWARE.
20 */
21
22 #include "uv.h"
23 #include "task.h"
24
25 #include <stdio.h>
26 #include <string.h>
27
/* IPC pipe: the parent initializes it in spawn_helper(), the child helpers
 * open it on fd 0. */
static uv_pipe_t channel;
/* Parent: server handle accepted from the channel. Child: the bound server. */
static uv_tcp_t tcp_server;
/* Second server handle, only used by the "bind twice" scenario. */
static uv_tcp_t tcp_server2;
/* Parent-side TCP connection accepted from the channel. */
static uv_tcp_t tcp_connection;

/* Callback counters and state flags checked by the tests and helpers. */
static int exit_cb_called;
static int read_cb_called;
static int tcp_write_cb_called;
static int tcp_read_cb_called;
static int on_pipe_read_called;
static int local_conn_accepted;
static int remote_conn_accepted;
static int tcp_server_listening;
/* Write requests; static so they stay valid while a write is in flight. */
static uv_write_t write_req;
static uv_write_t write_req2;
static uv_write_t conn_notify_req;
static int close_cb_called;
static int connection_accepted;
static int tcp_conn_read_cb_called;
static int tcp_conn_write_cb_called;
static int send_zero_write;

/* A TCP handle bundled with the connect and write requests issued on it. */
typedef struct {
  uv_connect_t conn_req;
  uv_write_t tcp_write_req;
  uv_tcp_t conn;
} tcp_conn;

/* Number of client connections opened by make_many_connections(). */
#define CONN_COUNT 100
/* Backlog passed to uv_listen(). */
#define BACKLOG 128
58
59
/* Close callback for the connection accepted in on_connection(): the handle
 * was heap-allocated there, so release it here. */
static void close_server_conn_cb(uv_handle_t* handle) {
  free(handle);
}
63
64
/* Connection callback for the TCP server received over the IPC channel.
 * The first incoming connection is accepted and immediately closed, and the
 * server itself is closed as well. Any later invocation is a no-op. */
static void on_connection(uv_stream_t* server, int status) {
  uv_tcp_t* accepted;

  if (local_conn_accepted)
    return;

  ASSERT_EQ(status, 0);
  ASSERT_PTR_EQ(&tcp_server, server);

  /* Freed by close_server_conn_cb() once the close completes. */
  accepted = malloc(sizeof(*accepted));
  ASSERT_NOT_NULL(accepted);
  ASSERT_EQ(uv_tcp_init(server->loop, accepted), 0);
  ASSERT_EQ(uv_accept(server, (uv_stream_t*) accepted), 0);

  uv_close((uv_handle_t*) accepted, close_server_conn_cb);
  uv_close((uv_handle_t*) server, NULL);
  local_conn_accepted = 1;
}
87
88
/* Exit callback for the spawned helper: counts the invocation, requires a
 * clean exit (status 0, no signal) and closes the process handle. */
static void exit_cb(uv_process_t* process,
                    int64_t exit_status,
                    int term_signal) {
  fputs("exit_cb\n", stdout);
  ++exit_cb_called;

  ASSERT_EQ(exit_status, 0);
  ASSERT_EQ(term_signal, 0);

  uv_close((uv_handle_t*) process, NULL);
}
98
99
/* Allocation callback for reads on the IPC channel: hands libuv a freshly
 * malloc'ed buffer of the suggested size. The read callback frees it. */
static void on_alloc(uv_handle_t* handle,
                     size_t suggested_size,
                     uv_buf_t* buf) {
  char* storage = malloc(suggested_size);

  buf->len = suggested_size;
  buf->base = storage;
}
106
107
/* Close callback for the client connections created by
 * make_many_connections(): handle->data points at the owning tcp_conn
 * allocation, which is released here. */
static void close_client_conn_cb(uv_handle_t* handle) {
  free(handle->data);
}
112
113
/* Connect callback for make_many_connections(). The status is not inspected;
 * the handle is closed regardless of whether the connect succeeded. */
static void connect_cb(uv_connect_t* req, int status) {
  uv_handle_t* client = (uv_handle_t*) req->handle;

  uv_close(client, close_client_conn_cb);
}
117
118
make_many_connections(void)119 static void make_many_connections(void) {
120 tcp_conn* conn;
121 struct sockaddr_in addr;
122 int r, i;
123
124 for (i = 0; i < CONN_COUNT; i++) {
125 conn = malloc(sizeof(*conn));
126 ASSERT_NOT_NULL(conn);
127
128 r = uv_tcp_init(uv_default_loop(), &conn->conn);
129 ASSERT_EQ(r, 0);
130 ASSERT_EQ(0, uv_ip4_addr("127.0.0.1", TEST_PORT, &addr));
131
132 r = uv_tcp_connect(&conn->conn_req,
133 (uv_tcp_t*) &conn->conn,
134 (const struct sockaddr*) &addr,
135 connect_cb);
136 ASSERT_EQ(r, 0);
137
138 conn->conn.data = conn;
139 }
140 }
141
142
/* Read callback for the IPC channel in the listen_before/after_write tests.
 *
 * The first payload must carry a pending TCP server handle together with the
 * text "hello\n": the server is accepted, put into listening state, a reply
 * is written back and a burst of client connections is started.
 * A later payload that matches "accepted_connection\n" means the remote side
 * accepted a connection; the channel is then closed.
 *
 * The read buffer (allocated by on_alloc) is always freed before returning. */
static void on_read(uv_stream_t* handle,
                    ssize_t nread,
                    const uv_buf_t* buf) {
  static const char accepted_msg[] = "accepted_connection\n";
  int r;
  uv_pipe_t* pipe;
  uv_handle_type pending;
  uv_buf_t outbuf;

  pipe = (uv_pipe_t*) handle;

  if (nread == 0) {
    /* Everything OK, but nothing read. */
    free(buf->base);
    return;
  }

  if (nread < 0) {
    if (nread == UV_EOF) {
      free(buf->base);
      return;
    }

    printf("error recving on channel: %s\n", uv_strerror(nread));
    abort();
  }

  fprintf(stderr, "got %d bytes\n", (int)nread);

  pending = uv_pipe_pending_type(pipe);
  if (!tcp_server_listening) {
    ASSERT_EQ(1, uv_pipe_pending_count(pipe));
    ASSERT_GT(nread, 0);
    ASSERT_NOT_NULL(buf->base);
    ASSERT_NE(pending, UV_UNKNOWN_HANDLE);
    read_cb_called++;

    /* Accept the pending TCP server, and start listening on it. */
    ASSERT_EQ(pending, UV_TCP);
    r = uv_tcp_init(uv_default_loop(), &tcp_server);
    ASSERT_EQ(r, 0);

    r = uv_accept((uv_stream_t*)pipe, (uv_stream_t*)&tcp_server);
    ASSERT_EQ(r, 0);

    r = uv_listen((uv_stream_t*)&tcp_server, BACKLOG, on_connection);
    ASSERT_EQ(r, 0);

    tcp_server_listening = 1;

    /* Make sure that the expected data is correctly multiplexed. */
    ASSERT_MEM_EQ("hello\n", buf->base, nread);

    outbuf = uv_buf_init("foobar\n", 7);
    r = uv_write(&write_req, (uv_stream_t*)pipe, &outbuf, 1, NULL);
    ASSERT_EQ(r, 0);

    /* Create a bunch of connections to get both servers to accept. */
    make_many_connections();
  } else if ((size_t) nread <= sizeof(accepted_msg) - 1 &&
             memcmp(accepted_msg, buf->base, (size_t) nread) == 0) {
    /* The length guard keeps memcmp from reading past the end of the
     * expected message when more bytes than its length were received. */

    /* Remote server has accepted a connection. Close the channel. */
    ASSERT_EQ(0, uv_pipe_pending_count(pipe));
    ASSERT_EQ(pending, UV_UNKNOWN_HANDLE);
    remote_conn_accepted = 1;
    uv_close((uv_handle_t*)&channel, NULL);
  }

  free(buf->base);
}
211
212 #ifdef _WIN32
/* Read callback for the ipc_listen_after_bind_twice test. Every payload must
 * carry a pending TCP handle. The first one is accepted into tcp_server and
 * must listen successfully; the second is accepted into tcp_server2, whose
 * listen must fail with UV_EADDRINUSE, after which both servers and the
 * channel are closed. The read buffer is always freed before returning. */
static void on_read_listen_after_bound_twice(uv_stream_t* handle,
                                             ssize_t nread,
                                             const uv_buf_t* buf) {
  uv_pipe_t* ipc_pipe = (uv_pipe_t*) handle;
  uv_handle_type pending;

  /* Nothing read, or end of stream: just release the buffer. */
  if (nread == 0 || nread == UV_EOF) {
    free(buf->base);
    return;
  }

  if (nread < 0) {
    printf("error recving on channel: %s\n", uv_strerror(nread));
    abort();
  }

  fprintf(stderr, "got %d bytes\n", (int)nread);

  ASSERT_GT(uv_pipe_pending_count(ipc_pipe), 0);
  pending = uv_pipe_pending_type(ipc_pipe);
  ASSERT_GT(nread, 0);
  ASSERT_NOT_NULL(buf->base);
  ASSERT_NE(pending, UV_UNKNOWN_HANDLE);
  read_cb_called++;

  switch (read_cb_called) {
    case 1:
      /* First TCP server: accept it and start listening. */
      ASSERT_EQ(pending, UV_TCP);
      ASSERT_EQ(uv_tcp_init(uv_default_loop(), &tcp_server), 0);
      ASSERT_EQ(uv_accept((uv_stream_t*) ipc_pipe,
                          (uv_stream_t*) &tcp_server), 0);
      ASSERT_EQ(uv_listen((uv_stream_t*) &tcp_server,
                          BACKLOG,
                          on_connection), 0);
      break;

    case 2:
      /* Second TCP server: accepting works, listening must not. */
      ASSERT_EQ(pending, UV_TCP);
      ASSERT_EQ(uv_tcp_init(uv_default_loop(), &tcp_server2), 0);
      ASSERT_EQ(uv_accept((uv_stream_t*) ipc_pipe,
                          (uv_stream_t*) &tcp_server2), 0);
      ASSERT_EQ(uv_listen((uv_stream_t*) &tcp_server2,
                          BACKLOG,
                          on_connection), UV_EADDRINUSE);

      uv_close((uv_handle_t*) &tcp_server, NULL);
      uv_close((uv_handle_t*) &tcp_server2, NULL);
      ASSERT_EQ(0, uv_pipe_pending_count(ipc_pipe));
      uv_close((uv_handle_t*) &channel, NULL);
      break;

    default:
      break;
  }

  free(buf->base);
}
278 #endif
279
/* Spawns the current executable as a child process with `helper` as its
 * single argument.
 *
 * channel: initialized here as an IPC pipe and installed as the child's
 *          stdin (created readable and writable).
 * process: process handle passed to uv_spawn(); exit_cb is its exit callback.
 * helper:  name passed to the child as argv[1].
 *
 * The child inherits fds 1 and 2 from this process. */
void spawn_helper(uv_pipe_t* channel,
                  uv_process_t* process,
                  const char* helper) {
  char exepath[1024];
  size_t exepath_size = sizeof(exepath);
  char* argv[3];
  uv_stdio_container_t stdio[3];
  uv_process_options_t options;

  ASSERT_EQ(uv_pipe_init(uv_default_loop(), channel, 1), 0);
  ASSERT_NE(channel->ipc, 0);

  ASSERT_EQ(uv_exepath(exepath, &exepath_size), 0);
  exepath[exepath_size] = '\0';

  argv[0] = exepath;
  argv[1] = (char*) helper;
  argv[2] = NULL;

  /* stdin is the IPC channel; stdout and stderr are shared with the parent. */
  stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE | UV_WRITABLE_PIPE;
  stdio[0].data.stream = (uv_stream_t*) channel;
  stdio[1].flags = UV_INHERIT_FD;
  stdio[1].data.fd = 1;
  stdio[2].flags = UV_INHERIT_FD;
  stdio[2].data.fd = 2;

  memset(&options, 0, sizeof(options));
  options.file = exepath;
  options.args = argv;
  options.exit_cb = exit_cb;
  options.stdio = stdio;
  options.stdio_count = ARRAY_SIZE(stdio);

  ASSERT_EQ(uv_spawn(uv_default_loop(), process, &options), 0);
}
320
321
/* Write callback for the parent's write on tcp_connection: requires success
 * on the expected handle and counts the invocation. */
static void on_tcp_write(uv_write_t* req, int status) {
  ASSERT_EQ(status, 0);
  ASSERT_PTR_EQ(req->handle, &tcp_connection);

  ++tcp_write_cb_called;
}
327
328
/* Allocation callback for TCP reads: provides a malloc'ed buffer of the
 * suggested size. The matching read callback frees it. */
static void on_read_alloc(uv_handle_t* handle,
                          size_t suggested_size,
                          uv_buf_t* buf) {
  char* storage = malloc(suggested_size);

  buf->len = suggested_size;
  buf->base = storage;
}
335
336
/* Read callback for tcp_connection in the parent: expects the bytes received
 * to match "hello again\n", counts the invocation, then closes both the
 * connection and the IPC channel. */
static void on_tcp_read(uv_stream_t* tcp, ssize_t nread, const uv_buf_t* buf) {
  uv_handle_t* connection = (uv_handle_t*) tcp;
  uv_handle_t* ipc = (uv_handle_t*) &channel;

  ASSERT_GT(nread, 0);
  ASSERT_MEM_EQ("hello again\n", buf->base, nread);
  ASSERT_PTR_EQ(tcp, &tcp_connection);
  free(buf->base);

  ++tcp_read_cb_called;

  uv_close(connection, NULL);
  uv_close(ipc, NULL);
}
348
349
/* Read callback for the IPC channel in the ipc_tcp_connection test. The
 * payload must carry exactly one pending TCP handle together with the text
 * "hello\n". The handle is accepted into tcp_connection, "world\n" is written
 * to it and reading is started on it. The read buffer is always freed. */
static void on_read_connection(uv_stream_t* handle,
                               ssize_t nread,
                               const uv_buf_t* buf) {
  uv_pipe_t* ipc_pipe = (uv_pipe_t*) handle;
  uv_stream_t* client = (uv_stream_t*) &tcp_connection;
  uv_handle_type pending;
  uv_buf_t reply;

  /* Nothing read, or end of stream: just release the buffer. */
  if (nread == 0 || nread == UV_EOF) {
    free(buf->base);
    return;
  }

  if (nread < 0) {
    printf("error recving on channel: %s\n", uv_strerror(nread));
    abort();
  }

  fprintf(stderr, "got %d bytes\n", (int)nread);

  ASSERT_EQ(1, uv_pipe_pending_count(ipc_pipe));
  pending = uv_pipe_pending_type(ipc_pipe);

  ASSERT_GT(nread, 0);
  ASSERT_NOT_NULL(buf->base);
  ASSERT_NE(pending, UV_UNKNOWN_HANDLE);
  read_cb_called++;

  /* Take ownership of the transmitted TCP connection. */
  ASSERT_EQ(pending, UV_TCP);
  ASSERT_EQ(uv_tcp_init(uv_default_loop(), &tcp_connection), 0);
  ASSERT_EQ(uv_accept(handle, client), 0);

  /* The data that travelled alongside the handle must be intact. */
  ASSERT_MEM_EQ("hello\n", buf->base, nread);

  /* Talk over the received connection: one write, then wait for the reply. */
  reply = uv_buf_init("world\n", 6);
  ASSERT_EQ(uv_write(&write_req, client, &reply, 1, on_tcp_write), 0);
  ASSERT_EQ(uv_read_start(client, on_read_alloc, on_tcp_read), 0);

  free(buf->base);
}
407
408
/* Read callback for the ipc_send_zero test: the only acceptable outcomes are
 * an empty read or end of stream. The buffer is released either way. */
static void on_read_send_zero(uv_stream_t* handle,
                              ssize_t nread,
                              const uv_buf_t* buf) {
  ASSERT(nread == 0 || nread == UV_EOF);

  free(buf->base);
}
415
416
/* Common driver for the parent-side IPC tests.
 *
 * helper:  name of the helper the child process is asked to run.
 * read_cb: read callback installed on the IPC channel.
 *
 * Spawns the helper, starts reading from the channel and runs the default
 * loop until it has nothing left to do. Returns 0; failures abort through
 * the ASSERT macros. */
static int run_ipc_test(const char* helper, uv_read_cb read_cb) {
  uv_process_t process;
  int r;

  spawn_helper(&channel, &process, helper);

  /* A failed read start would leave the test waiting on a channel nobody
   * reads from, so check it instead of ignoring the result. */
  r = uv_read_start((uv_stream_t*)&channel, on_alloc, read_cb);
  ASSERT_EQ(r, 0);

  r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
  ASSERT_EQ(r, 0);

  MAKE_VALGRIND_HAPPY();
  return 0;
}
430
431
TEST_IMPL(ipc_listen_before_write)432 TEST_IMPL(ipc_listen_before_write) {
433 #if defined(NO_SEND_HANDLE_ON_PIPE)
434 RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE);
435 #endif
436 int r = run_ipc_test("ipc_helper_listen_before_write", on_read);
437 ASSERT_EQ(local_conn_accepted, 1);
438 ASSERT_EQ(remote_conn_accepted, 1);
439 ASSERT_EQ(read_cb_called, 1);
440 ASSERT_EQ(exit_cb_called, 1);
441 return r;
442 }
443
444
TEST_IMPL(ipc_listen_after_write)445 TEST_IMPL(ipc_listen_after_write) {
446 #if defined(NO_SEND_HANDLE_ON_PIPE)
447 RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE);
448 #endif
449 int r = run_ipc_test("ipc_helper_listen_after_write", on_read);
450 ASSERT_EQ(local_conn_accepted, 1);
451 ASSERT_EQ(remote_conn_accepted, 1);
452 ASSERT_EQ(read_cb_called, 1);
453 ASSERT_EQ(exit_cb_called, 1);
454 return r;
455 }
456
457
TEST_IMPL(ipc_tcp_connection)458 TEST_IMPL(ipc_tcp_connection) {
459 #if defined(NO_SEND_HANDLE_ON_PIPE)
460 RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE);
461 #endif
462 int r = run_ipc_test("ipc_helper_tcp_connection", on_read_connection);
463 ASSERT_EQ(read_cb_called, 1);
464 ASSERT_EQ(tcp_write_cb_called, 1);
465 ASSERT_EQ(tcp_read_cb_called, 1);
466 ASSERT_EQ(exit_cb_called, 1);
467 return r;
468 }
469
470
471 #ifdef _WIN32
TEST_IMPL(listen_with_simultaneous_accepts)472 TEST_IMPL(listen_with_simultaneous_accepts) {
473 uv_tcp_t server;
474 int r;
475 struct sockaddr_in addr;
476
477 ASSERT_EQ(0, uv_ip4_addr("0.0.0.0", TEST_PORT, &addr));
478
479 r = uv_tcp_init(uv_default_loop(), &server);
480 ASSERT_EQ(r, 0);
481
482 r = uv_tcp_bind(&server, (const struct sockaddr*) &addr, 0);
483 ASSERT_EQ(r, 0);
484
485 r = uv_tcp_simultaneous_accepts(&server, 1);
486 ASSERT_EQ(r, 0);
487
488 r = uv_listen((uv_stream_t*)&server, SOMAXCONN, NULL);
489 ASSERT_EQ(r, 0);
490 ASSERT_EQ(server.reqs_pending, 32);
491
492 MAKE_VALGRIND_HAPPY();
493 return 0;
494 }
495
496
TEST_IMPL(listen_no_simultaneous_accepts)497 TEST_IMPL(listen_no_simultaneous_accepts) {
498 uv_tcp_t server;
499 int r;
500 struct sockaddr_in addr;
501
502 ASSERT_EQ(0, uv_ip4_addr("0.0.0.0", TEST_PORT, &addr));
503
504 r = uv_tcp_init(uv_default_loop(), &server);
505 ASSERT_EQ(r, 0);
506
507 r = uv_tcp_bind(&server, (const struct sockaddr*) &addr, 0);
508 ASSERT_EQ(r, 0);
509
510 r = uv_tcp_simultaneous_accepts(&server, 0);
511 ASSERT_EQ(r, 0);
512
513 r = uv_listen((uv_stream_t*)&server, SOMAXCONN, NULL);
514 ASSERT_EQ(r, 0);
515 ASSERT_EQ(server.reqs_pending, 1);
516
517 MAKE_VALGRIND_HAPPY();
518 return 0;
519 }
520
TEST_IMPL(ipc_listen_after_bind_twice)521 TEST_IMPL(ipc_listen_after_bind_twice) {
522 #if defined(NO_SEND_HANDLE_ON_PIPE)
523 RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE);
524 #endif
525 int r = run_ipc_test("ipc_helper_bind_twice", on_read_listen_after_bound_twice);
526 ASSERT_EQ(read_cb_called, 2);
527 ASSERT_EQ(exit_cb_called, 1);
528 return r;
529 }
530 #endif
531
TEST_IMPL(ipc_send_zero)532 TEST_IMPL(ipc_send_zero) {
533 int r;
534 r = run_ipc_test("ipc_helper_send_zero", on_read_send_zero);
535 ASSERT_EQ(r, 0);
536 return 0;
537 }
538
539
540 /* Everything here runs in a child process. */
541
/* Statically allocated connection record used by the child-side helpers: as
 * the accepted connection in ipc_on_connection() and as the outgoing client
 * connection in ipc_helper_tcp_connection(). */
static tcp_conn conn;
543
544
/* Generic close callback: only counts how many handles finished closing. */
static void close_cb(uv_handle_t* handle) {
  ++close_cb_called;
}
548
549
/* Write callback for the "accepted_connection" notification: once the write
 * has completed, the server and the channel are closed. The status is not
 * inspected. */
static void conn_notify_write_cb(uv_write_t* req, int status) {
  uv_handle_t* server = (uv_handle_t*) &tcp_server;
  uv_handle_t* ipc = (uv_handle_t*) &channel;

  uv_close(server, close_cb);
  uv_close(ipc, close_cb);
}
554
555
/* Write callback for the child's reply on conn.conn: closes the connection,
 * the channel and the server, then counts the invocation. */
static void tcp_connection_write_cb(uv_write_t* req, int status) {
  uv_handle_t* connection;

  ASSERT_PTR_EQ(&conn.conn, req->handle);

  connection = (uv_handle_t*) req->handle;
  uv_close(connection, close_cb);
  uv_close((uv_handle_t*) &channel, close_cb);
  uv_close((uv_handle_t*) &tcp_server, close_cb);

  ++tcp_conn_write_cb_called;
}
563
564
/* Write callback for the zero-length write: requires success and counts the
 * invocation. */
static void send_zero_write_cb(uv_write_t* req, int status) {
  ASSERT_EQ(status, 0);

  ++send_zero_write;
}
569
/* Read callback for TCP connections in the child process. Expects the bytes
 * received to match "world\n" and answers with "hello again\n" on the same
 * stream; tcp_connection_write_cb() runs when that write completes.
 *
 * Empty reads and EOF only release the buffer. Any other read error aborts. */
static void on_tcp_child_process_read(uv_stream_t* tcp,
                                      ssize_t nread,
                                      const uv_buf_t* buf) {
  uv_buf_t outbuf;
  int r;

  if (nread == 0) {
    /* Not an error and not EOF: nothing was read this time. Handled the same
     * way as in the channel read callbacks instead of tripping an assert. */
    free(buf->base);
    return;
  }

  if (nread < 0) {
    if (nread == UV_EOF) {
      free(buf->base);
      return;
    }

    printf("error recving on tcp connection: %s\n", uv_strerror(nread));
    abort();
  }

  /* nread > 0 from here on. */
  ASSERT_MEM_EQ("world\n", buf->base, nread);
  on_pipe_read_called++;
  free(buf->base);

  /* Write to the socket */
  outbuf = uv_buf_init("hello again\n", 12);
  r = uv_write(&conn.tcp_write_req, tcp, &outbuf, 1, tcp_connection_write_cb);
  ASSERT_EQ(r, 0);

  tcp_conn_read_cb_called++;
}
598
599
/* Connect callback for the child's client connection: requires a successful
 * connect and starts reading on the connected stream. */
static void connect_child_process_cb(uv_connect_t* req, int status) {
  ASSERT_EQ(status, 0);
  ASSERT_EQ(uv_read_start(req->handle,
                          on_read_alloc,
                          on_tcp_child_process_read), 0);
}
607
608
/* Connection callback for the child's TCP server. The first connection is
 * accepted into conn.conn and closed right away, and the parent is told about
 * it with an "accepted_connection\n" message over the channel. Any later
 * invocation is a no-op. */
static void ipc_on_connection(uv_stream_t* server, int status) {
  uv_buf_t notice;

  if (connection_accepted)
    return;

  ASSERT_EQ(status, 0);
  ASSERT_PTR_EQ(&tcp_server, server);

  ASSERT_EQ(uv_tcp_init(server->loop, &conn.conn), 0);
  ASSERT_EQ(uv_accept(server, (uv_stream_t*) &conn.conn), 0);

  uv_close((uv_handle_t*) &conn.conn, close_cb);

  /* Let the other side know; no handle travels with this message. */
  notice = uv_buf_init("accepted_connection\n", 20);
  ASSERT_EQ(uv_write2(&conn_notify_req,
                      (uv_stream_t*) &channel,
                      &notice,
                      1,
                      NULL,
                      conn_notify_write_cb), 0);

  connection_accepted = 1;
}
637
638
/* Close callback for heap-allocated handles: releases the handle and counts
 * the close. */
static void close_and_free_cb(uv_handle_t* handle) {
  free(handle);
  ++close_cb_called;
}
643
/* Connection callback for the child's TCP server in the tcp_connection
 * scenario. The incoming connection is accepted into a heap-allocated
 * handle and sent to the parent over the channel along with "hello\n".
 * Reading is started on the local handle, which is then closed;
 * close_and_free_cb() releases it. */
static void ipc_on_connection_tcp_conn(uv_stream_t* server, int status) {
  uv_tcp_t* accepted;
  uv_stream_t* accepted_stream;
  uv_buf_t greeting;

  ASSERT_EQ(status, 0);
  ASSERT_PTR_EQ(&tcp_server, server);

  accepted = malloc(sizeof(*accepted));
  ASSERT_NOT_NULL(accepted);
  accepted_stream = (uv_stream_t*) accepted;

  ASSERT_EQ(uv_tcp_init(server->loop, accepted), 0);
  ASSERT_EQ(uv_accept(server, accepted_stream), 0);

  /* Hand the accepted connection to the other process. */
  greeting = uv_buf_init("hello\n", 6);
  ASSERT_EQ(uv_write2(&conn_notify_req,
                      (uv_stream_t*) &channel,
                      &greeting,
                      1,
                      accepted_stream,
                      NULL), 0);

  ASSERT_EQ(uv_read_start(accepted_stream,
                          on_read_alloc,
                          on_tcp_child_process_read), 0);

  uv_close((uv_handle_t*) accepted, close_and_free_cb);
}
674
675
ipc_helper(int listen_after_write)676 int ipc_helper(int listen_after_write) {
677 /*
678 * This is launched from test-ipc.c. stdin is a duplex channel that we
679 * over which a handle will be transmitted.
680 */
681 struct sockaddr_in addr;
682 int r;
683 uv_buf_t buf;
684
685 ASSERT_EQ(0, uv_ip4_addr("0.0.0.0", TEST_PORT, &addr));
686
687 r = uv_pipe_init(uv_default_loop(), &channel, 1);
688 ASSERT_EQ(r, 0);
689
690 uv_pipe_open(&channel, 0);
691
692 ASSERT_EQ(1, uv_is_readable((uv_stream_t*) &channel));
693 ASSERT_EQ(1, uv_is_writable((uv_stream_t*) &channel));
694 ASSERT_EQ(0, uv_is_closing((uv_handle_t*) &channel));
695
696 r = uv_tcp_init(uv_default_loop(), &tcp_server);
697 ASSERT_EQ(r, 0);
698
699 r = uv_tcp_bind(&tcp_server, (const struct sockaddr*) &addr, 0);
700 ASSERT_EQ(r, 0);
701
702 if (!listen_after_write) {
703 r = uv_listen((uv_stream_t*)&tcp_server, BACKLOG, ipc_on_connection);
704 ASSERT_EQ(r, 0);
705 }
706
707 buf = uv_buf_init("hello\n", 6);
708 r = uv_write2(&write_req, (uv_stream_t*)&channel, &buf, 1,
709 (uv_stream_t*)&tcp_server, NULL);
710 ASSERT_EQ(r, 0);
711
712 if (listen_after_write) {
713 r = uv_listen((uv_stream_t*)&tcp_server, BACKLOG, ipc_on_connection);
714 ASSERT_EQ(r, 0);
715 }
716
717 notify_parent_process();
718 r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
719 ASSERT_EQ(r, 0);
720
721 ASSERT_EQ(connection_accepted, 1);
722 ASSERT_EQ(close_cb_called, 3);
723
724 MAKE_VALGRIND_HAPPY();
725 return 0;
726 }
727
728
ipc_helper_tcp_connection(void)729 int ipc_helper_tcp_connection(void) {
730 /*
731 * This is launched from test-ipc.c. stdin is a duplex channel
732 * over which a handle will be transmitted.
733 */
734
735 int r;
736 struct sockaddr_in addr;
737
738 r = uv_pipe_init(uv_default_loop(), &channel, 1);
739 ASSERT_EQ(r, 0);
740
741 uv_pipe_open(&channel, 0);
742
743 ASSERT_EQ(1, uv_is_readable((uv_stream_t*) &channel));
744 ASSERT_EQ(1, uv_is_writable((uv_stream_t*) &channel));
745 ASSERT_EQ(0, uv_is_closing((uv_handle_t*) &channel));
746
747 r = uv_tcp_init(uv_default_loop(), &tcp_server);
748 ASSERT_EQ(r, 0);
749
750 ASSERT_EQ(0, uv_ip4_addr("0.0.0.0", TEST_PORT, &addr));
751
752 r = uv_tcp_bind(&tcp_server, (const struct sockaddr*) &addr, 0);
753 ASSERT_EQ(r, 0);
754
755 r = uv_listen((uv_stream_t*)&tcp_server, BACKLOG, ipc_on_connection_tcp_conn);
756 ASSERT_EQ(r, 0);
757
758 /* Make a connection to the server */
759 r = uv_tcp_init(uv_default_loop(), &conn.conn);
760 ASSERT_EQ(r, 0);
761
762 ASSERT_EQ(0, uv_ip4_addr("127.0.0.1", TEST_PORT, &addr));
763
764 r = uv_tcp_connect(&conn.conn_req,
765 (uv_tcp_t*) &conn.conn,
766 (const struct sockaddr*) &addr,
767 connect_child_process_cb);
768 ASSERT_EQ(r, 0);
769
770 r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
771 ASSERT_EQ(r, 0);
772
773 ASSERT_EQ(tcp_conn_read_cb_called, 1);
774 ASSERT_EQ(tcp_conn_write_cb_called, 1);
775 ASSERT_EQ(close_cb_called, 4);
776
777 MAKE_VALGRIND_HAPPY();
778 return 0;
779 }
780
ipc_helper_bind_twice(void)781 int ipc_helper_bind_twice(void) {
782 /*
783 * This is launched from test-ipc.c. stdin is a duplex channel
784 * over which two handles will be transmitted.
785 */
786 struct sockaddr_in addr;
787 int r;
788 uv_buf_t buf;
789
790 ASSERT_EQ(0, uv_ip4_addr("0.0.0.0", TEST_PORT, &addr));
791
792 r = uv_pipe_init(uv_default_loop(), &channel, 1);
793 ASSERT_EQ(r, 0);
794
795 uv_pipe_open(&channel, 0);
796
797 ASSERT_EQ(1, uv_is_readable((uv_stream_t*) &channel));
798 ASSERT_EQ(1, uv_is_writable((uv_stream_t*) &channel));
799 ASSERT_EQ(0, uv_is_closing((uv_handle_t*) &channel));
800
801 buf = uv_buf_init("hello\n", 6);
802
803 r = uv_tcp_init(uv_default_loop(), &tcp_server);
804 ASSERT_EQ(r, 0);
805 r = uv_tcp_init(uv_default_loop(), &tcp_server2);
806 ASSERT_EQ(r, 0);
807
808 r = uv_tcp_bind(&tcp_server, (const struct sockaddr*) &addr, 0);
809 ASSERT_EQ(r, 0);
810 r = uv_tcp_bind(&tcp_server2, (const struct sockaddr*) &addr, 0);
811 ASSERT_EQ(r, 0);
812
813 r = uv_write2(&write_req, (uv_stream_t*)&channel, &buf, 1,
814 (uv_stream_t*)&tcp_server, NULL);
815 ASSERT_EQ(r, 0);
816 r = uv_write2(&write_req2, (uv_stream_t*)&channel, &buf, 1,
817 (uv_stream_t*)&tcp_server2, NULL);
818 ASSERT_EQ(r, 0);
819
820 r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
821 ASSERT_EQ(r, 0);
822
823 MAKE_VALGRIND_HAPPY();
824 return 0;
825 }
826
ipc_helper_send_zero(void)827 int ipc_helper_send_zero(void) {
828 int r;
829 uv_buf_t zero_buf;
830
831 zero_buf = uv_buf_init(0, 0);
832
833 r = uv_pipe_init(uv_default_loop(), &channel, 0);
834 ASSERT_EQ(r, 0);
835
836 uv_pipe_open(&channel, 0);
837
838 ASSERT_EQ(1, uv_is_readable((uv_stream_t*) &channel));
839 ASSERT_EQ(1, uv_is_writable((uv_stream_t*) &channel));
840 ASSERT_EQ(0, uv_is_closing((uv_handle_t*) &channel));
841
842 r = uv_write(&write_req,
843 (uv_stream_t*)&channel,
844 &zero_buf,
845 1,
846 send_zero_write_cb);
847
848 ASSERT_EQ(r, 0);
849
850 r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
851 ASSERT_EQ(r, 0);
852
853 ASSERT_EQ(send_zero_write, 1);
854
855 MAKE_VALGRIND_HAPPY();
856 return 0;
857 }
858