1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2 *
3 * Permission is hereby granted, free of charge, to any person obtaining a copy
4 * of this software and associated documentation files (the "Software"), to
5 * deal in the Software without restriction, including without limitation the
6 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7 * sell copies of the Software, and to permit persons to whom the Software is
8 * furnished to do so, subject to the following conditions:
9 *
10 * The above copyright notice and this permission notice shall be included in
11 * all copies or substantial portions of the Software.
12 *
13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19 * IN THE SOFTWARE.
20 */
21
22 #include <errno.h>
23
24 #ifdef _WIN32
25 # include <fcntl.h>
26 #else
27 # include <sys/socket.h>
28 # include <unistd.h>
29 #endif
30
31 #include "uv.h"
32 #include "task.h"
33
34 #ifdef __linux__
35 # include <sys/epoll.h>
36 #endif
37
38 #ifdef UV_HAVE_KQUEUE
39 # include <sys/types.h>
40 # include <sys/event.h>
41 # include <sys/time.h>
42 #endif
43
44
45 #define NUM_CLIENTS 5
46 #define TRANSFER_BYTES (1 << 16)
47
48 #undef MIN
49 #define MIN(a, b) (((a) < (b)) ? (a) : (b));
50
51
52 typedef enum {
53 UNIDIRECTIONAL,
54 DUPLEX
55 } test_mode_t;
56
57 typedef struct connection_context_s {
58 uv_poll_t poll_handle;
59 uv_timer_t timer_handle;
60 uv_os_sock_t sock;
61 size_t read, sent;
62 int is_server_connection;
63 int open_handles;
64 int got_fin, sent_fin, got_disconnect;
65 unsigned int events, delayed_events;
66 } connection_context_t;
67
68 typedef struct server_context_s {
69 uv_poll_t poll_handle;
70 uv_os_sock_t sock;
71 int connections;
72 } server_context_t;
73
74
75 static void delay_timer_cb(uv_timer_t* timer);
76
77
78 static test_mode_t test_mode = DUPLEX;
79
80 static int closed_connections = 0;
81
82 static int valid_writable_wakeups = 0;
83 static int spurious_writable_wakeups = 0;
84
85 #if !defined(__sun) && !defined(_AIX) && !defined(__MVS__)
86 static int disconnects = 0;
87 #endif /* !__sun && !_AIX && !__MVS__ */
88
got_eagain(void)89 static int got_eagain(void) {
90 #ifdef _WIN32
91 return WSAGetLastError() == WSAEWOULDBLOCK;
92 #else
93 return errno == EAGAIN
94 || errno == EINPROGRESS
95 #ifdef EWOULDBLOCK
96 || errno == EWOULDBLOCK;
97 #endif
98 ;
99 #endif
100 }
101
102
create_bound_socket(struct sockaddr_in bind_addr)103 static uv_os_sock_t create_bound_socket (struct sockaddr_in bind_addr) {
104 uv_os_sock_t sock;
105 int r;
106
107 sock = socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
108 #ifdef _WIN32
109 ASSERT(sock != INVALID_SOCKET);
110 #else
111 ASSERT(sock >= 0);
112 #endif
113
114 #ifndef _WIN32
115 {
116 /* Allow reuse of the port. */
117 int yes = 1;
118 r = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof yes);
119 ASSERT(r == 0);
120 }
121 #endif
122
123 r = bind(sock, (const struct sockaddr*) &bind_addr, sizeof bind_addr);
124 ASSERT(r == 0);
125
126 return sock;
127 }
128
129
close_socket(uv_os_sock_t sock)130 static void close_socket(uv_os_sock_t sock) {
131 int r;
132 #ifdef _WIN32
133 r = closesocket(sock);
134 #else
135 r = close(sock);
136 #endif
137 /* On FreeBSD close() can fail with ECONNRESET if the socket was shutdown by
138 * the peer before all pending data was delivered.
139 */
140 ASSERT(r == 0 || errno == ECONNRESET);
141 }
142
143
create_connection_context(uv_os_sock_t sock,int is_server_connection)144 static connection_context_t* create_connection_context(
145 uv_os_sock_t sock, int is_server_connection) {
146 int r;
147 connection_context_t* context;
148
149 context = (connection_context_t*) malloc(sizeof *context);
150 ASSERT_NOT_NULL(context);
151
152 context->sock = sock;
153 context->is_server_connection = is_server_connection;
154 context->read = 0;
155 context->sent = 0;
156 context->open_handles = 0;
157 context->events = 0;
158 context->delayed_events = 0;
159 context->got_fin = 0;
160 context->sent_fin = 0;
161 context->got_disconnect = 0;
162
163 r = uv_poll_init_socket(uv_default_loop(), &context->poll_handle, sock);
164 context->open_handles++;
165 context->poll_handle.data = context;
166 ASSERT(r == 0);
167
168 r = uv_timer_init(uv_default_loop(), &context->timer_handle);
169 context->open_handles++;
170 context->timer_handle.data = context;
171 ASSERT(r == 0);
172
173 return context;
174 }
175
176
connection_close_cb(uv_handle_t * handle)177 static void connection_close_cb(uv_handle_t* handle) {
178 connection_context_t* context = (connection_context_t*) handle->data;
179
180 if (--context->open_handles == 0) {
181 if (test_mode == DUPLEX || context->is_server_connection) {
182 ASSERT(context->read == TRANSFER_BYTES);
183 } else {
184 ASSERT(context->read == 0);
185 }
186
187 if (test_mode == DUPLEX || !context->is_server_connection) {
188 ASSERT(context->sent == TRANSFER_BYTES);
189 } else {
190 ASSERT(context->sent == 0);
191 }
192
193 closed_connections++;
194
195 free(context);
196 }
197 }
198
199
destroy_connection_context(connection_context_t * context)200 static void destroy_connection_context(connection_context_t* context) {
201 uv_close((uv_handle_t*) &context->poll_handle, connection_close_cb);
202 uv_close((uv_handle_t*) &context->timer_handle, connection_close_cb);
203 }
204
205
connection_poll_cb(uv_poll_t * handle,int status,int events)206 static void connection_poll_cb(uv_poll_t* handle, int status, int events) {
207 connection_context_t* context = (connection_context_t*) handle->data;
208 unsigned int new_events;
209 int r;
210
211 ASSERT(status == 0);
212 ASSERT(events & context->events);
213 ASSERT(!(events & ~context->events));
214
215 new_events = context->events;
216
217 if (events & UV_READABLE) {
218 int action = rand() % 7;
219
220 switch (action) {
221 case 0:
222 case 1: {
223 /* Read a couple of bytes. */
224 static char buffer[74];
225
226 do
227 r = recv(context->sock, buffer, sizeof buffer, 0);
228 while (r == -1 && errno == EINTR);
229 ASSERT(r >= 0);
230
231 if (r > 0) {
232 context->read += r;
233 } else {
234 /* Got FIN. */
235 context->got_fin = 1;
236 new_events &= ~UV_READABLE;
237 }
238
239 break;
240 }
241
242 case 2:
243 case 3: {
244 /* Read until EAGAIN. */
245 static char buffer[931];
246
247 for (;;) {
248 do
249 r = recv(context->sock, buffer, sizeof buffer, 0);
250 while (r == -1 && errno == EINTR);
251
252 if (r <= 0)
253 break;
254
255 context->read += r;
256 }
257
258 if (r == 0) {
259 /* Got FIN. */
260 context->got_fin = 1;
261 new_events &= ~UV_READABLE;
262 } else {
263 ASSERT(got_eagain());
264 }
265
266 break;
267 }
268
269 case 4:
270 /* Ignore. */
271 break;
272
273 case 5:
274 /* Stop reading for a while. Restart in timer callback. */
275 new_events &= ~UV_READABLE;
276 if (!uv_is_active((uv_handle_t*) &context->timer_handle)) {
277 context->delayed_events = UV_READABLE;
278 uv_timer_start(&context->timer_handle, delay_timer_cb, 10, 0);
279 } else {
280 context->delayed_events |= UV_READABLE;
281 }
282 break;
283
284 case 6:
285 /* Fudge with the event mask. */
286 uv_poll_start(&context->poll_handle, UV_WRITABLE, connection_poll_cb);
287 uv_poll_start(&context->poll_handle, UV_READABLE, connection_poll_cb);
288 context->events = UV_READABLE;
289 break;
290
291 default:
292 ASSERT(0);
293 }
294 }
295
296 if (events & UV_WRITABLE) {
297 if (context->sent < TRANSFER_BYTES &&
298 !(test_mode == UNIDIRECTIONAL && context->is_server_connection)) {
299 /* We have to send more bytes. */
300 int action = rand() % 7;
301
302 switch (action) {
303 case 0:
304 case 1: {
305 /* Send a couple of bytes. */
306 static char buffer[103];
307
308 int send_bytes = MIN(TRANSFER_BYTES - context->sent, sizeof buffer);
309 ASSERT(send_bytes > 0);
310
311 do
312 r = send(context->sock, buffer, send_bytes, 0);
313 while (r == -1 && errno == EINTR);
314
315 if (r < 0) {
316 ASSERT(got_eagain());
317 spurious_writable_wakeups++;
318 break;
319 }
320
321 ASSERT(r > 0);
322 context->sent += r;
323 valid_writable_wakeups++;
324 break;
325 }
326
327 case 2:
328 case 3: {
329 /* Send until EAGAIN. */
330 static char buffer[1234];
331
332 int send_bytes = MIN(TRANSFER_BYTES - context->sent, sizeof buffer);
333 ASSERT(send_bytes > 0);
334
335 do
336 r = send(context->sock, buffer, send_bytes, 0);
337 while (r == -1 && errno == EINTR);
338
339 if (r < 0) {
340 ASSERT(got_eagain());
341 spurious_writable_wakeups++;
342 break;
343 }
344
345 ASSERT(r > 0);
346 valid_writable_wakeups++;
347 context->sent += r;
348
349 while (context->sent < TRANSFER_BYTES) {
350 send_bytes = MIN(TRANSFER_BYTES - context->sent, sizeof buffer);
351 ASSERT(send_bytes > 0);
352
353 do
354 r = send(context->sock, buffer, send_bytes, 0);
355 while (r == -1 && errno == EINTR);
356 ASSERT(r != 0);
357
358 if (r < 0) {
359 ASSERT(got_eagain());
360 break;
361 }
362
363 context->sent += r;
364 }
365 break;
366 }
367
368 case 4:
369 /* Ignore. */
370 break;
371
372 case 5:
373 /* Stop sending for a while. Restart in timer callback. */
374 new_events &= ~UV_WRITABLE;
375 if (!uv_is_active((uv_handle_t*) &context->timer_handle)) {
376 context->delayed_events = UV_WRITABLE;
377 uv_timer_start(&context->timer_handle, delay_timer_cb, 100, 0);
378 } else {
379 context->delayed_events |= UV_WRITABLE;
380 }
381 break;
382
383 case 6:
384 /* Fudge with the event mask. */
385 uv_poll_start(&context->poll_handle,
386 UV_READABLE,
387 connection_poll_cb);
388 uv_poll_start(&context->poll_handle,
389 UV_WRITABLE,
390 connection_poll_cb);
391 context->events = UV_WRITABLE;
392 break;
393
394 default:
395 ASSERT(0);
396 }
397
398 } else {
399 /* Nothing more to write. Send FIN. */
400 int r;
401 #ifdef _WIN32
402 r = shutdown(context->sock, SD_SEND);
403 #else
404 r = shutdown(context->sock, SHUT_WR);
405 #endif
406 ASSERT(r == 0);
407 context->sent_fin = 1;
408 new_events &= ~UV_WRITABLE;
409 }
410 }
411 #if !defined(__sun) && !defined(_AIX) && !defined(__MVS__)
412 if (events & UV_DISCONNECT) {
413 context->got_disconnect = 1;
414 ++disconnects;
415 new_events &= ~UV_DISCONNECT;
416 }
417
418 if (context->got_fin && context->sent_fin && context->got_disconnect) {
419 #else /* __sun && _AIX && __MVS__ */
420 if (context->got_fin && context->sent_fin) {
421 #endif /* !__sun && !_AIX && !__MVS__ */
422 /* Sent and received FIN. Close and destroy context. */
423 close_socket(context->sock);
424 destroy_connection_context(context);
425 context->events = 0;
426
427 } else if (new_events != context->events) {
428 /* Poll mask changed. Call uv_poll_start again. */
429 context->events = new_events;
430 uv_poll_start(handle, new_events, connection_poll_cb);
431 }
432
433 /* Assert that uv_is_active works correctly for poll handles. */
434 if (context->events != 0) {
435 ASSERT(1 == uv_is_active((uv_handle_t*) handle));
436 } else {
437 ASSERT(0 == uv_is_active((uv_handle_t*) handle));
438 }
439 }
440
441
442 static void delay_timer_cb(uv_timer_t* timer) {
443 connection_context_t* context = (connection_context_t*) timer->data;
444 int r;
445
446 /* Timer should auto stop. */
447 ASSERT(0 == uv_is_active((uv_handle_t*) timer));
448
449 /* Add the requested events to the poll mask. */
450 ASSERT(context->delayed_events != 0);
451 context->events |= context->delayed_events;
452 context->delayed_events = 0;
453
454 r = uv_poll_start(&context->poll_handle,
455 context->events,
456 connection_poll_cb);
457 ASSERT(r == 0);
458 }
459
460
461 static server_context_t* create_server_context(
462 uv_os_sock_t sock) {
463 int r;
464 server_context_t* context;
465
466 context = (server_context_t*) malloc(sizeof *context);
467 ASSERT_NOT_NULL(context);
468
469 context->sock = sock;
470 context->connections = 0;
471
472 r = uv_poll_init_socket(uv_default_loop(), &context->poll_handle, sock);
473 context->poll_handle.data = context;
474 ASSERT(r == 0);
475
476 return context;
477 }
478
479
480 static void server_close_cb(uv_handle_t* handle) {
481 server_context_t* context = (server_context_t*) handle->data;
482 free(context);
483 }
484
485
486 static void destroy_server_context(server_context_t* context) {
487 uv_close((uv_handle_t*) &context->poll_handle, server_close_cb);
488 }
489
490
491 static void server_poll_cb(uv_poll_t* handle, int status, int events) {
492 server_context_t* server_context = (server_context_t*)
493 handle->data;
494 connection_context_t* connection_context;
495 struct sockaddr_in addr;
496 socklen_t addr_len;
497 uv_os_sock_t sock;
498 int r;
499
500 addr_len = sizeof addr;
501 sock = accept(server_context->sock, (struct sockaddr*) &addr, &addr_len);
502 #ifdef _WIN32
503 ASSERT(sock != INVALID_SOCKET);
504 #else
505 ASSERT(sock >= 0);
506 #endif
507
508 connection_context = create_connection_context(sock, 1);
509 connection_context->events = UV_READABLE | UV_WRITABLE | UV_DISCONNECT;
510 r = uv_poll_start(&connection_context->poll_handle,
511 UV_READABLE | UV_WRITABLE | UV_DISCONNECT,
512 connection_poll_cb);
513 ASSERT(r == 0);
514
515 if (++server_context->connections == NUM_CLIENTS) {
516 close_socket(server_context->sock);
517 destroy_server_context(server_context);
518 }
519 }
520
521
522 static void start_server(void) {
523 server_context_t* context;
524 struct sockaddr_in addr;
525 uv_os_sock_t sock;
526 int r;
527
528 ASSERT(0 == uv_ip4_addr("127.0.0.1", TEST_PORT, &addr));
529 sock = create_bound_socket(addr);
530 context = create_server_context(sock);
531
532 r = listen(sock, 100);
533 ASSERT(r == 0);
534
535 r = uv_poll_start(&context->poll_handle, UV_READABLE, server_poll_cb);
536 ASSERT(r == 0);
537 }
538
539
540 static void start_client(void) {
541 uv_os_sock_t sock;
542 connection_context_t* context;
543 struct sockaddr_in server_addr;
544 struct sockaddr_in addr;
545 int r;
546
547 ASSERT(0 == uv_ip4_addr("127.0.0.1", TEST_PORT, &server_addr));
548 ASSERT(0 == uv_ip4_addr("0.0.0.0", 0, &addr));
549
550 sock = create_bound_socket(addr);
551 context = create_connection_context(sock, 0);
552
553 context->events = UV_READABLE | UV_WRITABLE | UV_DISCONNECT;
554 r = uv_poll_start(&context->poll_handle,
555 UV_READABLE | UV_WRITABLE | UV_DISCONNECT,
556 connection_poll_cb);
557 ASSERT(r == 0);
558
559 r = connect(sock, (struct sockaddr*) &server_addr, sizeof server_addr);
560 ASSERT(r == 0 || got_eagain());
561 }
562
563
564 static void start_poll_test(void) {
565 int i, r;
566
567 #ifdef _WIN32
568 {
569 struct WSAData wsa_data;
570 int r = WSAStartup(MAKEWORD(2, 2), &wsa_data);
571 ASSERT(r == 0);
572 }
573 #endif
574
575 start_server();
576
577 for (i = 0; i < NUM_CLIENTS; i++)
578 start_client();
579
580 r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
581 ASSERT(r == 0);
582
583 /* Assert that at most five percent of the writable wakeups was spurious. */
584 ASSERT(spurious_writable_wakeups == 0 ||
585 (valid_writable_wakeups + spurious_writable_wakeups) /
586 spurious_writable_wakeups > 20);
587
588 ASSERT(closed_connections == NUM_CLIENTS * 2);
589 #if !defined(__sun) && !defined(_AIX) && !defined(__MVS__)
590 ASSERT(disconnects == NUM_CLIENTS * 2);
591 #endif
592 MAKE_VALGRIND_HAPPY();
593 }
594
595
596 /* Issuing a shutdown() on IBM i PASE with parameter SHUT_WR
597 * also sends a normal close sequence to the partner program.
598 * This leads to timing issues and ECONNRESET failures in the
599 * test 'poll_duplex' and 'poll_unidirectional'.
600 *
601 * https://www.ibm.com/support/knowledgecenter/en/ssw_ibm_i_74/apis/shutdn.htm
602 */
603 TEST_IMPL(poll_duplex) {
604 #if defined(NO_SELF_CONNECT)
605 RETURN_SKIP(NO_SELF_CONNECT);
606 #elif defined(__PASE__)
607 RETURN_SKIP("API shutdown() may lead to timing issue on IBM i PASE");
608 #endif
609 test_mode = DUPLEX;
610 start_poll_test();
611 return 0;
612 }
613
614
615 TEST_IMPL(poll_unidirectional) {
616 #if defined(NO_SELF_CONNECT)
617 RETURN_SKIP(NO_SELF_CONNECT);
618 #elif defined(__PASE__)
619 RETURN_SKIP("API shutdown() may lead to timing issue on IBM i PASE");
620 #endif
621 test_mode = UNIDIRECTIONAL;
622 start_poll_test();
623 return 0;
624 }
625
626
627 /* Windows won't let you open a directory so we open a file instead.
628 * OS X lets you poll a file so open the $PWD instead. Both fail
629 * on Linux so it doesn't matter which one we pick. Both succeed
630 * on FreeBSD, Solaris and AIX so skip the test on those platforms.
631 */
632 TEST_IMPL(poll_bad_fdtype) {
633 #if !defined(__DragonFly__) && !defined(__FreeBSD__) && !defined(__sun) && \
634 !defined(_AIX) && !defined(__MVS__) && !defined(__FreeBSD_kernel__) && \
635 !defined(__OpenBSD__) && !defined(__CYGWIN__) && !defined(__MSYS__) && \
636 !defined(__NetBSD__)
637 uv_poll_t poll_handle;
638 int fd;
639
640 #if defined(_WIN32)
641 fd = open("test/fixtures/empty_file", O_RDONLY);
642 #else
643 fd = open(".", O_RDONLY);
644 #endif
645 ASSERT(fd != -1);
646 ASSERT(0 != uv_poll_init(uv_default_loop(), &poll_handle, fd));
647 ASSERT(0 == close(fd));
648 #endif
649
650 MAKE_VALGRIND_HAPPY();
651 return 0;
652 }
653
654
655 #ifdef __linux__
656 TEST_IMPL(poll_nested_epoll) {
657 uv_poll_t poll_handle;
658 int fd;
659
660 fd = epoll_create(1);
661 ASSERT(fd != -1);
662
663 ASSERT(0 == uv_poll_init(uv_default_loop(), &poll_handle, fd));
664 ASSERT(0 == uv_poll_start(&poll_handle, UV_READABLE, (uv_poll_cb) abort));
665 ASSERT(0 != uv_run(uv_default_loop(), UV_RUN_NOWAIT));
666
667 uv_close((uv_handle_t*) &poll_handle, NULL);
668 ASSERT(0 == uv_run(uv_default_loop(), UV_RUN_DEFAULT));
669 ASSERT(0 == close(fd));
670
671 MAKE_VALGRIND_HAPPY();
672 return 0;
673 }
674 #endif /* __linux__ */
675
676
677 #ifdef UV_HAVE_KQUEUE
678 TEST_IMPL(poll_nested_kqueue) {
679 uv_poll_t poll_handle;
680 int fd;
681
682 fd = kqueue();
683 ASSERT(fd != -1);
684
685 ASSERT(0 == uv_poll_init(uv_default_loop(), &poll_handle, fd));
686 ASSERT(0 == uv_poll_start(&poll_handle, UV_READABLE, (uv_poll_cb) abort));
687 ASSERT(0 != uv_run(uv_default_loop(), UV_RUN_NOWAIT));
688
689 uv_close((uv_handle_t*) &poll_handle, NULL);
690 ASSERT(0 == uv_run(uv_default_loop(), UV_RUN_DEFAULT));
691 ASSERT(0 == close(fd));
692
693 MAKE_VALGRIND_HAPPY();
694 return 0;
695 }
696 #endif /* UV_HAVE_KQUEUE */
697