• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2  *
3  * Permission is hereby granted, free of charge, to any person obtaining a copy
4  * of this software and associated documentation files (the "Software"), to
5  * deal in the Software without restriction, including without limitation the
6  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7  * sell copies of the Software, and to permit persons to whom the Software is
8  * furnished to do so, subject to the following conditions:
9  *
10  * The above copyright notice and this permission notice shall be included in
11  * all copies or substantial portions of the Software.
12  *
13  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19  * IN THE SOFTWARE.
20  */
21 
22 #include <errno.h>
23 
24 #ifdef _WIN32
25 # include <fcntl.h>
26 #else
27 # include <sys/socket.h>
28 # include <unistd.h>
29 #endif
30 
31 #include "uv.h"
32 #include "task.h"
33 
34 #ifdef __linux__
35 # include <sys/epoll.h>
36 #endif
37 
38 #ifdef UV_HAVE_KQUEUE
39 # include <sys/types.h>
40 # include <sys/event.h>
41 # include <sys/time.h>
42 #endif
43 
44 
/* Number of client sockets started by the test; the server stops listening
 * after accepting this many connections.
 */
#define NUM_CLIENTS 5
/* Bytes each sending side transfers before it shuts down its write end. */
#define TRANSFER_BYTES (1 << 16)
47 
/* Smaller of two values. Each argument may be evaluated twice, so callers
 * must pass expressions without side effects. The expansion carries no
 * trailing semicolon so the macro is usable inside larger expressions.
 */
#undef MIN
#define MIN(a, b) (((a) < (b)) ? (a) : (b))
50 
51 
/* Selects which peers send payload data during a poll test run. */
typedef enum {
  /* Server-side connections never send; they only receive. */
  UNIDIRECTIONAL,
  /* Both ends of every connection send and receive. */
  DUPLEX
} test_mode_t;
56 
/* Per-connection state shared by the poll and timer callbacks. */
typedef struct connection_context_s {
  /* Watches `sock`; its `data` field points back at this context. */
  uv_poll_t poll_handle;
  /* Started with no repeat to re-enable events that were dropped for a while;
   * its `data` field also points back at this context.
   */
  uv_timer_t timer_handle;
  uv_os_sock_t sock;
  /* Bytes received from / sent to the peer so far. */
  size_t read, sent;
  /* Non-zero for sockets obtained from accept(), zero for client sockets. */
  int is_server_connection;
  /* Handles initialised and not yet fully closed; the context is freed when
   * the close callback brings this down to zero.
   */
  int open_handles;
  /* Set when recv() returned 0, when shutdown() of the write side succeeded,
   * and when UV_DISCONNECT was reported, respectively.
   */
  int got_fin, sent_fin, got_disconnect;
  /* `events` mirrors the mask most recently handed to uv_poll_start();
   * `delayed_events` holds the events delay_timer_cb will add back.
   */
  unsigned int events, delayed_events;
} connection_context_t;
67 
/* State of the listening side of the test. */
typedef struct server_context_s {
  /* Watches the listening socket; its `data` field points at this context. */
  uv_poll_t poll_handle;
  /* The listening socket itself. */
  uv_os_sock_t sock;
  /* Number of connections accepted so far. */
  int connections;
} server_context_t;
73 
74 
75 static void delay_timer_cb(uv_timer_t* timer);
76 
77 
/* Mode of the running test; assigned by each test before start_poll_test(). */
static test_mode_t test_mode = DUPLEX;

/* Number of connection contexts whose handles have all finished closing. */
static int closed_connections = 0;

/* Writable wakeups whose first send() made progress versus those whose first
 * send() failed with EAGAIN; compared at the end of the test.
 */
static int valid_writable_wakeups = 0;
static int spurious_writable_wakeups = 0;

#if !defined(__sun) && !defined(_AIX) && !defined(__MVS__)
/* Number of UV_DISCONNECT events seen by connection_poll_cb. */
static int disconnects = 0;
#endif /* !__sun && !_AIX && !__MVS__ */
88 
/* Returns non-zero when the most recent socket call failed only because it
 * would block (EAGAIN / EWOULDBLOCK) or, for connect(), is still in progress
 * (EINPROGRESS). Reads WSAGetLastError() on Windows and errno elsewhere, so
 * it must be called before anything else can overwrite that value.
 */
static int got_eagain(void) {
#ifdef _WIN32
  return WSAGetLastError() == WSAEWOULDBLOCK;
#else
  /* The statement is terminated once, after the optional EWOULDBLOCK term. */
  return errno == EAGAIN
      || errno == EINPROGRESS
#ifdef EWOULDBLOCK
      || errno == EWOULDBLOCK
#endif
      ;
#endif
}
101 
102 
create_bound_socket(struct sockaddr_in bind_addr)103 static uv_os_sock_t create_bound_socket (struct sockaddr_in bind_addr) {
104   uv_os_sock_t sock;
105   int r;
106 
107   sock = socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
108 #ifdef _WIN32
109   ASSERT(sock != INVALID_SOCKET);
110 #else
111   ASSERT(sock >= 0);
112 #endif
113 
114 #ifndef _WIN32
115   {
116     /* Allow reuse of the port. */
117     int yes = 1;
118     r = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof yes);
119     ASSERT(r == 0);
120   }
121 #endif
122 
123   r = bind(sock, (const struct sockaddr*) &bind_addr, sizeof bind_addr);
124   ASSERT(r == 0);
125 
126   return sock;
127 }
128 
129 
/* Closes `sock` with the platform's close primitive and asserts success.
 *
 * FreeBSD's close() may report ECONNRESET when the peer shut the socket down
 * before all pending data was delivered, so that errno value is tolerated.
 */
static void close_socket(uv_os_sock_t sock) {
#ifdef _WIN32
  int rc = closesocket(sock);
#else
  int rc = close(sock);
#endif

  ASSERT(rc == 0 || errno == ECONNRESET);
}
142 
143 
create_connection_context(uv_os_sock_t sock,int is_server_connection)144 static connection_context_t* create_connection_context(
145     uv_os_sock_t sock, int is_server_connection) {
146   int r;
147   connection_context_t* context;
148 
149   context = (connection_context_t*) malloc(sizeof *context);
150   ASSERT_NOT_NULL(context);
151 
152   context->sock = sock;
153   context->is_server_connection = is_server_connection;
154   context->read = 0;
155   context->sent = 0;
156   context->open_handles = 0;
157   context->events = 0;
158   context->delayed_events = 0;
159   context->got_fin = 0;
160   context->sent_fin = 0;
161   context->got_disconnect = 0;
162 
163   r = uv_poll_init_socket(uv_default_loop(), &context->poll_handle, sock);
164   context->open_handles++;
165   context->poll_handle.data = context;
166   ASSERT(r == 0);
167 
168   r = uv_timer_init(uv_default_loop(), &context->timer_handle);
169   context->open_handles++;
170   context->timer_handle.data = context;
171   ASSERT(r == 0);
172 
173   return context;
174 }
175 
176 
/* Close callback for both handles of a connection context. When the last of
 * them has closed, checks that the connection moved exactly the expected
 * number of bytes in each direction for the current test mode, counts the
 * connection as closed and frees the context.
 */
static void connection_close_cb(uv_handle_t* handle) {
  connection_context_t* ctx;
  int is_receiver;
  int is_sender;

  ctx = (connection_context_t*) handle->data;

  ctx->open_handles--;
  if (ctx->open_handles != 0) {
    /* The sibling handle has not closed yet. */
    return;
  }

  /* In unidirectional mode data only reaches server-side connections and
   * only leaves client-side ones.
   */
  is_receiver = (test_mode == DUPLEX || ctx->is_server_connection);
  is_sender = (test_mode == DUPLEX || !ctx->is_server_connection);

  if (is_receiver) {
    ASSERT(ctx->read == TRANSFER_BYTES);
  } else {
    ASSERT(ctx->read == 0);
  }

  if (is_sender) {
    ASSERT(ctx->sent == TRANSFER_BYTES);
  } else {
    ASSERT(ctx->sent == 0);
  }

  closed_connections++;
  free(ctx);
}
198 
199 
/* Requests closing of the context's poll handle and then its timer handle.
 * The context itself is released by connection_close_cb once both handles
 * have reported back.
 */
static void destroy_connection_context(connection_context_t* context) {
  uv_handle_t* poll = (uv_handle_t*) &context->poll_handle;
  uv_handle_t* timer = (uv_handle_t*) &context->timer_handle;

  uv_close(poll, connection_close_cb);
  uv_close(timer, connection_close_cb);
}
204 
205 
/* Poll callback shared by client-side and server-side connection sockets.
 *
 * Asserts that `status` is 0 and that `events` is a non-empty subset of the
 * mask recorded in context->events. For each reported direction it picks a
 * pseudo-random action (rand() % 7): transfer one chunk, transfer until
 * EAGAIN, do nothing, pause that direction until delay_timer_cb re-enables
 * it, or restart the poll handle with different masks. When nothing is left
 * to send (or this side never sends in the current mode) the write side of
 * the socket is shut down. Once a FIN was both sent and received (and, where
 * the platform block is compiled in, UV_DISCONNECT was seen) the socket is
 * closed and the context destroyed; otherwise the poll handle is restarted
 * if the wanted mask changed.
 */
static void connection_poll_cb(uv_poll_t* handle, int status, int events) {
  connection_context_t* context = (connection_context_t*) handle->data;
  unsigned int new_events;
  int r;

  ASSERT(status == 0);
  ASSERT(events & context->events);
  ASSERT(!(events & ~context->events));

  /* Work on a copy of the mask; the handle is only restarted at the end of
   * the callback if the copy differs from context->events.
   */
  new_events = context->events;

  if (events & UV_READABLE) {
    int action = rand() % 7;

    switch (action) {
      case 0:
      case 1: {
        /* Read a couple of bytes. */
        static char buffer[74];

        do
          r = recv(context->sock, buffer, sizeof buffer, 0);
        while (r == -1 && errno == EINTR);
        ASSERT(r >= 0);

        if (r > 0) {
          context->read += r;
        } else {
          /* Got FIN. */
          context->got_fin = 1;
          new_events &= ~UV_READABLE;
        }

        break;
      }

      case 2:
      case 3: {
        /* Read until EAGAIN. */
        static char buffer[931];

        for (;;) {
          do
            r = recv(context->sock, buffer, sizeof buffer, 0);
          while (r == -1 && errno == EINTR);

          if (r <= 0)
            break;

          context->read += r;
        }

        if (r == 0) {
          /* Got FIN. */
          context->got_fin = 1;
          new_events &= ~UV_READABLE;
        } else {
          ASSERT(got_eagain());
        }

        break;
      }

      case 4:
        /* Ignore. */
        break;

      case 5:
        /* Stop reading for a while. Restart in timer callback. */
        new_events &= ~UV_READABLE;
        if (!uv_is_active((uv_handle_t*) &context->timer_handle)) {
          context->delayed_events = UV_READABLE;
          uv_timer_start(&context->timer_handle, delay_timer_cb, 10, 0);
        } else {
          /* Timer already pending: just add this direction to its list. */
          context->delayed_events |= UV_READABLE;
        }
        break;

      case 6:
        /* Fudge with the event mask. new_events still holds the mask copied
         * at the top of the callback, so if that differs from UV_READABLE the
         * check at the end of the callback restarts the handle with it.
         */
        uv_poll_start(&context->poll_handle, UV_WRITABLE, connection_poll_cb);
        uv_poll_start(&context->poll_handle, UV_READABLE, connection_poll_cb);
        context->events = UV_READABLE;
        break;

      default:
        ASSERT(0);
    }
  }

  if (events & UV_WRITABLE) {
    if (context->sent < TRANSFER_BYTES &&
        !(test_mode == UNIDIRECTIONAL && context->is_server_connection)) {
      /* We have to send more bytes. */
      int action = rand() % 7;

      switch (action) {
        case 0:
        case 1: {
          /* Send a couple of bytes. */
          static char buffer[103];

          int send_bytes = MIN(TRANSFER_BYTES - context->sent, sizeof buffer);
          ASSERT(send_bytes > 0);

          do
            r = send(context->sock, buffer, send_bytes, 0);
          while (r == -1 && errno == EINTR);

          if (r < 0) {
            /* Woken up as writable, yet nothing could be written. */
            ASSERT(got_eagain());
            spurious_writable_wakeups++;
            break;
          }

          ASSERT(r > 0);
          context->sent += r;
          valid_writable_wakeups++;
          break;
        }

        case 2:
        case 3: {
          /* Send until EAGAIN. */
          static char buffer[1234];

          int send_bytes = MIN(TRANSFER_BYTES - context->sent, sizeof buffer);
          ASSERT(send_bytes > 0);

          do
            r = send(context->sock, buffer, send_bytes, 0);
          while (r == -1 && errno == EINTR);

          if (r < 0) {
            /* Only the first send() of a wakeup decides whether the wakeup
             * counts as spurious.
             */
            ASSERT(got_eagain());
            spurious_writable_wakeups++;
            break;
          }

          ASSERT(r > 0);
          valid_writable_wakeups++;
          context->sent += r;

          while (context->sent < TRANSFER_BYTES) {
            send_bytes = MIN(TRANSFER_BYTES - context->sent, sizeof buffer);
            ASSERT(send_bytes > 0);

            do
              r = send(context->sock, buffer, send_bytes, 0);
            while (r == -1 && errno == EINTR);
            ASSERT(r != 0);

            if (r < 0) {
              ASSERT(got_eagain());
              break;
            }

            context->sent += r;
          }
          break;
        }

        case 4:
          /* Ignore. */
         break;

        case 5:
          /* Stop sending for a while. Restart in timer callback. */
          new_events &= ~UV_WRITABLE;
          if (!uv_is_active((uv_handle_t*) &context->timer_handle)) {
            context->delayed_events = UV_WRITABLE;
            uv_timer_start(&context->timer_handle, delay_timer_cb, 100, 0);
          } else {
            /* Timer already pending: just add this direction to its list. */
            context->delayed_events |= UV_WRITABLE;
          }
          break;

        case 6:
          /* Fudge with the event mask. As in the readable branch, new_events
           * keeps the mask copied at the top of the callback.
           */
          uv_poll_start(&context->poll_handle,
                        UV_READABLE,
                        connection_poll_cb);
          uv_poll_start(&context->poll_handle,
                        UV_WRITABLE,
                        connection_poll_cb);
          context->events = UV_WRITABLE;
          break;

        default:
          ASSERT(0);
      }

    } else {
      /* Nothing more to write. Send FIN. */
      /* NOTE(review): this declaration shadows the function-level `r`. */
      int r;
#ifdef _WIN32
      r = shutdown(context->sock, SD_SEND);
#else
      r = shutdown(context->sock, SHUT_WR);
#endif
      ASSERT(r == 0);
      context->sent_fin = 1;
      new_events &= ~UV_WRITABLE;
    }
  }
  /* The `if` below has two alternative headers chosen by the preprocessor;
   * both share the same body.
   */
#if !defined(__sun) && !defined(_AIX) && !defined(__MVS__)
  if (events & UV_DISCONNECT) {
    context->got_disconnect = 1;
    ++disconnects;
    new_events &= ~UV_DISCONNECT;
  }

  if (context->got_fin && context->sent_fin && context->got_disconnect) {
#else /* __sun || _AIX || __MVS__ */
  if (context->got_fin && context->sent_fin) {
#endif /* !__sun && !_AIX && !__MVS__ */
    /* Sent and received FIN. Close and destroy context. */
    close_socket(context->sock);
    destroy_connection_context(context);
    /* The context is still written to here; it is only freed from
     * connection_close_cb.
     */
    context->events = 0;

  } else if (new_events != context->events) {
    /* Poll mask changed. Call uv_poll_start again. */
    context->events = new_events;
    uv_poll_start(handle, new_events, connection_poll_cb);
  }

  /* Assert that uv_is_active works correctly for poll handles. */
  if (context->events != 0) {
    ASSERT(1 == uv_is_active((uv_handle_t*) handle));
  } else {
    ASSERT(0 == uv_is_active((uv_handle_t*) handle));
  }
}
440 
441 
442 static void delay_timer_cb(uv_timer_t* timer) {
443   connection_context_t* context = (connection_context_t*) timer->data;
444   int r;
445 
446   /* Timer should auto stop. */
447   ASSERT(0 == uv_is_active((uv_handle_t*) timer));
448 
449   /* Add the requested events to the poll mask. */
450   ASSERT(context->delayed_events != 0);
451   context->events |= context->delayed_events;
452   context->delayed_events = 0;
453 
454   r = uv_poll_start(&context->poll_handle,
455                     context->events,
456                     connection_poll_cb);
457   ASSERT(r == 0);
458 }
459 
460 
461 static server_context_t* create_server_context(
462     uv_os_sock_t sock) {
463   int r;
464   server_context_t* context;
465 
466   context = (server_context_t*) malloc(sizeof *context);
467   ASSERT_NOT_NULL(context);
468 
469   context->sock = sock;
470   context->connections = 0;
471 
472   r = uv_poll_init_socket(uv_default_loop(), &context->poll_handle, sock);
473   context->poll_handle.data = context;
474   ASSERT(r == 0);
475 
476   return context;
477 }
478 
479 
480 static void server_close_cb(uv_handle_t* handle) {
481   server_context_t* context = (server_context_t*) handle->data;
482   free(context);
483 }
484 
485 
486 static void destroy_server_context(server_context_t* context) {
487   uv_close((uv_handle_t*) &context->poll_handle, server_close_cb);
488 }
489 
490 
491 static void server_poll_cb(uv_poll_t* handle, int status, int events) {
492   server_context_t* server_context = (server_context_t*)
493                                           handle->data;
494   connection_context_t* connection_context;
495   struct sockaddr_in addr;
496   socklen_t addr_len;
497   uv_os_sock_t sock;
498   int r;
499 
500   addr_len = sizeof addr;
501   sock = accept(server_context->sock, (struct sockaddr*) &addr, &addr_len);
502 #ifdef _WIN32
503   ASSERT(sock != INVALID_SOCKET);
504 #else
505   ASSERT(sock >= 0);
506 #endif
507 
508   connection_context = create_connection_context(sock, 1);
509   connection_context->events = UV_READABLE | UV_WRITABLE | UV_DISCONNECT;
510   r = uv_poll_start(&connection_context->poll_handle,
511                     UV_READABLE | UV_WRITABLE | UV_DISCONNECT,
512                     connection_poll_cb);
513   ASSERT(r == 0);
514 
515   if (++server_context->connections == NUM_CLIENTS) {
516     close_socket(server_context->sock);
517     destroy_server_context(server_context);
518   }
519 }
520 
521 
522 static void start_server(void) {
523   server_context_t* context;
524   struct sockaddr_in addr;
525   uv_os_sock_t sock;
526   int r;
527 
528   ASSERT(0 == uv_ip4_addr("127.0.0.1", TEST_PORT, &addr));
529   sock = create_bound_socket(addr);
530   context = create_server_context(sock);
531 
532   r = listen(sock, 100);
533   ASSERT(r == 0);
534 
535   r = uv_poll_start(&context->poll_handle, UV_READABLE, server_poll_cb);
536   ASSERT(r == 0);
537 }
538 
539 
540 static void start_client(void) {
541   uv_os_sock_t sock;
542   connection_context_t* context;
543   struct sockaddr_in server_addr;
544   struct sockaddr_in addr;
545   int r;
546 
547   ASSERT(0 == uv_ip4_addr("127.0.0.1", TEST_PORT, &server_addr));
548   ASSERT(0 == uv_ip4_addr("0.0.0.0", 0, &addr));
549 
550   sock = create_bound_socket(addr);
551   context = create_connection_context(sock, 0);
552 
553   context->events = UV_READABLE | UV_WRITABLE | UV_DISCONNECT;
554   r = uv_poll_start(&context->poll_handle,
555                     UV_READABLE | UV_WRITABLE | UV_DISCONNECT,
556                     connection_poll_cb);
557   ASSERT(r == 0);
558 
559   r = connect(sock, (struct sockaddr*) &server_addr, sizeof server_addr);
560   ASSERT(r == 0 || got_eagain());
561 }
562 
563 
564 static void start_poll_test(void) {
565   int i, r;
566 
567 #ifdef _WIN32
568   {
569     struct WSAData wsa_data;
570     int r = WSAStartup(MAKEWORD(2, 2), &wsa_data);
571     ASSERT(r == 0);
572   }
573 #endif
574 
575   start_server();
576 
577   for (i = 0; i < NUM_CLIENTS; i++)
578     start_client();
579 
580   r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
581   ASSERT(r == 0);
582 
583   /* Assert that at most five percent of the writable wakeups was spurious. */
584   ASSERT(spurious_writable_wakeups == 0 ||
585          (valid_writable_wakeups + spurious_writable_wakeups) /
586          spurious_writable_wakeups > 20);
587 
588   ASSERT(closed_connections == NUM_CLIENTS * 2);
589 #if !defined(__sun) && !defined(_AIX) && !defined(__MVS__)
590   ASSERT(disconnects == NUM_CLIENTS * 2);
591 #endif
592   MAKE_VALGRIND_HAPPY();
593 }
594 
595 
596 /* Issuing a shutdown() on IBM i PASE with parameter SHUT_WR
597  * also sends a normal close sequence to the partner program.
598  * This leads to timing issues and ECONNRESET failures in the
599  * test 'poll_duplex' and 'poll_unidirectional'.
600  *
601  * https://www.ibm.com/support/knowledgecenter/en/ssw_ibm_i_74/apis/shutdn.htm
602  */
TEST_IMPL(poll_duplex) {
  /* Skipped when NO_SELF_CONNECT is defined, and on IBM i PASE. */
#if defined(NO_SELF_CONNECT)
  RETURN_SKIP(NO_SELF_CONNECT);
#elif defined(__PASE__)
  RETURN_SKIP("API shutdown() may lead to timing issue on IBM i PASE");
#endif
  /* Both ends of every connection send and receive TRANSFER_BYTES. */
  test_mode = DUPLEX;
  start_poll_test();
  return 0;
}
613 
614 
/* Same scenario as poll_duplex, except that server-side connections never
 * send: data only flows from the clients to the server.
 */
TEST_IMPL(poll_unidirectional) {
  /* Skipped when NO_SELF_CONNECT is defined, and on IBM i PASE. */
#if defined(NO_SELF_CONNECT)
  RETURN_SKIP(NO_SELF_CONNECT);
#elif defined(__PASE__)
  RETURN_SKIP("API shutdown() may lead to timing issue on IBM i PASE");
#endif
  test_mode = UNIDIRECTIONAL;
  start_poll_test();
  return 0;
}
625 
626 
627 /* Windows won't let you open a directory so we open a file instead.
628  * OS X lets you poll a file so open the $PWD instead.  Both fail
629  * on Linux so it doesn't matter which one we pick.  Both succeed
630  * on FreeBSD, Solaris and AIX so skip the test on those platforms.
631  */
632 TEST_IMPL(poll_bad_fdtype) {
633 #if !defined(__DragonFly__) && !defined(__FreeBSD__) && !defined(__sun) && \
634     !defined(_AIX) && !defined(__MVS__) && !defined(__FreeBSD_kernel__) && \
635     !defined(__OpenBSD__) && !defined(__CYGWIN__) && !defined(__MSYS__) && \
636     !defined(__NetBSD__)
637   uv_poll_t poll_handle;
638   int fd;
639 
640 #if defined(_WIN32)
641   fd = open("test/fixtures/empty_file", O_RDONLY);
642 #else
643   fd = open(".", O_RDONLY);
644 #endif
645   ASSERT(fd != -1);
646   ASSERT(0 != uv_poll_init(uv_default_loop(), &poll_handle, fd));
647   ASSERT(0 == close(fd));
648 #endif
649 
650   MAKE_VALGRIND_HAPPY();
651   return 0;
652 }
653 
654 
655 #ifdef __linux__
656 TEST_IMPL(poll_nested_epoll) {
657   uv_poll_t poll_handle;
658   int fd;
659 
660   fd = epoll_create(1);
661   ASSERT(fd != -1);
662 
663   ASSERT(0 == uv_poll_init(uv_default_loop(), &poll_handle, fd));
664   ASSERT(0 == uv_poll_start(&poll_handle, UV_READABLE, (uv_poll_cb) abort));
665   ASSERT(0 != uv_run(uv_default_loop(), UV_RUN_NOWAIT));
666 
667   uv_close((uv_handle_t*) &poll_handle, NULL);
668   ASSERT(0 == uv_run(uv_default_loop(), UV_RUN_DEFAULT));
669   ASSERT(0 == close(fd));
670 
671   MAKE_VALGRIND_HAPPY();
672   return 0;
673 }
674 #endif  /* __linux__ */
675 
676 
677 #ifdef UV_HAVE_KQUEUE
678 TEST_IMPL(poll_nested_kqueue) {
679   uv_poll_t poll_handle;
680   int fd;
681 
682   fd = kqueue();
683   ASSERT(fd != -1);
684 
685   ASSERT(0 == uv_poll_init(uv_default_loop(), &poll_handle, fd));
686   ASSERT(0 == uv_poll_start(&poll_handle, UV_READABLE, (uv_poll_cb) abort));
687   ASSERT(0 != uv_run(uv_default_loop(), UV_RUN_NOWAIT));
688 
689   uv_close((uv_handle_t*) &poll_handle, NULL);
690   ASSERT(0 == uv_run(uv_default_loop(), UV_RUN_DEFAULT));
691   ASSERT(0 == close(fd));
692 
693   MAKE_VALGRIND_HAPPY();
694   return 0;
695 }
696 #endif  /* UV_HAVE_KQUEUE */
697