• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2  *
3  * Permission is hereby granted, free of charge, to any person obtaining a copy
4  * of this software and associated documentation files (the "Software"), to
5  * deal in the Software without restriction, including without limitation the
6  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7  * sell copies of the Software, and to permit persons to whom the Software is
8  * furnished to do so, subject to the following conditions:
9  *
10  * The above copyright notice and this permission notice shall be included in
11  * all copies or substantial portions of the Software.
12  *
13  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19  * IN THE SOFTWARE.
20  */
21 
22 #include <assert.h>
23 #include <stdlib.h>
24 
25 #include "uv.h"
26 #include "internal.h"
27 #include "handle-inl.h"
28 #include "stream-inl.h"
29 #include "req-inl.h"
30 
31 
/*
 * Threshold of active tcp streams for which to preallocate tcp read buffers.
 * (Due to node slab allocator performing poorly under this pattern,
 *  the optimization is temporarily disabled (threshold=0).  This will be
 *  revisited once node allocator is improved.)
 */
const unsigned int uv_active_tcp_streams_threshold = 0;

/*
 * Number of simultaneous pending AcceptEx calls.
 */
const unsigned int uv_simultaneous_server_accepts = 32;

/* A zero-size buffer for use by uv_tcp_read: a zero-length WSARecv completes
 * when data becomes readable without consuming it, so the real read buffer
 * can be allocated lazily (see uv__tcp_queue_read). */
static char uv_zero_[] = "";
47 
/* Toggle Nagle's algorithm (TCP_NODELAY) on the given socket.
 * Returns 0 on success or the Winsock error code on failure.
 * `handle` is unused; it is kept for signature symmetry with the other
 * per-socket option helpers. */
static int uv__tcp_nodelay(uv_tcp_t* handle, SOCKET socket, int enable) {
  int r;

  r = setsockopt(socket,
                 IPPROTO_TCP,
                 TCP_NODELAY,
                 (const char*) &enable,
                 sizeof(enable));
  if (r == -1)
    return WSAGetLastError();

  return 0;
}
58 
59 
/* Enable or disable SO_KEEPALIVE on the socket and, when enabling, set the
 * idle delay (TCP_KEEPALIVE) before probes start.
 * Returns 0 on success or the Winsock error code on failure. */
static int uv__tcp_keepalive(uv_tcp_t* handle, SOCKET socket, int enable, unsigned int delay) {
  int r;

  r = setsockopt(socket,
                 SOL_SOCKET,
                 SO_KEEPALIVE,
                 (const char*) &enable,
                 sizeof(enable));
  if (r == -1)
    return WSAGetLastError();

  if (enable) {
    /* The delay only matters when keep-alive is being turned on. */
    r = setsockopt(socket,
                   IPPROTO_TCP,
                   TCP_KEEPALIVE,
                   (const char*) &delay,
                   sizeof(delay));
    if (r == -1)
      return WSAGetLastError();
  }

  return 0;
}
79 
80 
/* Attach an existing SOCKET to a uv_tcp_t handle: switch it to non-blocking
 * mode, make it non-inheritable, associate it with the loop's I/O completion
 * port, and apply socket options (TCP_NODELAY / SO_KEEPALIVE) that were
 * requested on the handle before it had a socket.  `imported` is nonzero when
 * the socket came from another process; in that case an IOCP-association
 * failure falls back to emulated (event-based) IOCP instead of erroring.
 * Returns 0 on success, UV_EBUSY if the handle already owns a socket, or a
 * Windows system error code. */
static int uv__tcp_set_socket(uv_loop_t* loop,
                              uv_tcp_t* handle,
                              SOCKET socket,
                              int family,
                              int imported) {
  DWORD yes = 1;
  int non_ifs_lsp;
  int err;

  if (handle->socket != INVALID_SOCKET)
    return UV_EBUSY;

  /* Set the socket to nonblocking mode */
  if (ioctlsocket(socket, FIONBIO, &yes) == SOCKET_ERROR) {
    return WSAGetLastError();
  }

  /* Make the socket non-inheritable */
  if (!SetHandleInformation((HANDLE) socket, HANDLE_FLAG_INHERIT, 0))
    return GetLastError();

  /* Associate it with the I/O completion port. Use uv_handle_t pointer as
   * completion key.
   * NOTE(review): the key actually passed is the socket value, not the
   * handle pointer -- the comment above looks stale; confirm against the
   * completion-dispatch code before relying on it. */
  if (CreateIoCompletionPort((HANDLE)socket,
                             loop->iocp,
                             (ULONG_PTR)socket,
                             0) == NULL) {
    if (imported) {
      /* An imported socket may already be associated with another
       * completion port; fall back to emulated IOCP for this handle. */
      handle->flags |= UV_HANDLE_EMULATE_IOCP;
    } else {
      return GetLastError();
    }
  }

  /* Non-IFS layered service providers are incompatible with skipping IOCP
   * notification on synchronous completion (checked below). */
  if (family == AF_INET6) {
    non_ifs_lsp = uv_tcp_non_ifs_lsp_ipv6;
  } else {
    non_ifs_lsp = uv_tcp_non_ifs_lsp_ipv4;
  }

  if (!(handle->flags & UV_HANDLE_EMULATE_IOCP) && !non_ifs_lsp) {
    UCHAR sfcnm_flags =
        FILE_SKIP_SET_EVENT_ON_HANDLE | FILE_SKIP_COMPLETION_PORT_ON_SUCCESS;
    if (!SetFileCompletionNotificationModes((HANDLE) socket, sfcnm_flags))
      return GetLastError();
    /* Synchronously-completing operations will not post to the IOCP. */
    handle->flags |= UV_HANDLE_SYNC_BYPASS_IOCP;
  }

  /* Re-apply options requested before the handle had a socket. */
  if (handle->flags & UV_HANDLE_TCP_NODELAY) {
    err = uv__tcp_nodelay(handle, socket, 1);
    if (err)
      return err;
  }

  /* TODO: Use stored delay. */
  if (handle->flags & UV_HANDLE_TCP_KEEPALIVE) {
    err = uv__tcp_keepalive(handle, socket, 1, 60);
    if (err)
      return err;
  }

  handle->socket = socket;

  if (family == AF_INET6) {
    handle->flags |= UV_HANDLE_IPV6;
  } else {
    assert(!(handle->flags & UV_HANDLE_IPV6));
  }

  return 0;
}
152 
153 
/* Initialize a TCP handle, optionally creating its socket right away.
 * The low byte of `flags` selects the address family (AF_INET, AF_INET6 or
 * AF_UNSPEC); any other flag bit is rejected with UV_EINVAL.  With AF_UNSPEC
 * the socket is created lazily on first bind/connect/open. */
int uv_tcp_init_ex(uv_loop_t* loop, uv_tcp_t* handle, unsigned int flags) {
  int domain;

  /* Address family lives in the low 8 bits. */
  domain = flags & 0xFF;
  if (domain != AF_INET && domain != AF_INET6 && domain != AF_UNSPEC)
    return UV_EINVAL;

  if (flags & ~0xFFu)
    return UV_EINVAL;

  uv__stream_init(loop, (uv_stream_t*) handle, UV_TCP);
  handle->socket = INVALID_SOCKET;
  handle->reqs_pending = 0;
  handle->delayed_error = 0;
  handle->tcp.serv.accept_reqs = NULL;
  handle->tcp.serv.pending_accepts = NULL;
  handle->tcp.serv.func_acceptex = NULL;
  handle->tcp.serv.processed_accepts = 0;
  handle->tcp.conn.func_connectex = NULL;

  /* If anything fails beyond this point we need to remove the handle from
   * the handle queue, since it was added by uv__handle_init in
   * uv__stream_init. */
  if (domain != AF_UNSPEC) {
    SOCKET sock;
    DWORD err;

    sock = socket(domain, SOCK_STREAM, 0);
    if (sock == INVALID_SOCKET) {
      err = WSAGetLastError();
      QUEUE_REMOVE(&handle->handle_queue);
      return uv_translate_sys_error(err);
    }

    err = uv__tcp_set_socket(handle->loop, handle, sock, domain, 0);
    if (err) {
      closesocket(sock);
      QUEUE_REMOVE(&handle->handle_queue);
      return uv_translate_sys_error(err);
    }
  }

  return 0;
}
201 
202 
/* Initialize a TCP handle without creating a socket; the socket is created
 * lazily on first bind/connect/open (AF_UNSPEC path of uv_tcp_init_ex). */
int uv_tcp_init(uv_loop_t* loop, uv_tcp_t* handle) {
  return uv_tcp_init_ex(loop, handle, AF_UNSPEC);
}
206 
207 
/* Run deferred cleanup for a TCP handle.  The event loop calls this once per
 * pending endgame: first to deliver a completed shutdown request (after all
 * writes drained), and later -- when the handle is closing and no requests
 * remain -- to free accept resources and finish the close. */
void uv__tcp_endgame(uv_loop_t* loop, uv_tcp_t* handle) {
  int err;
  unsigned int i;
  uv_tcp_accept_t* req;

  /* Phase 1: a shutdown request is waiting and every write has completed. */
  if (handle->flags & UV_HANDLE_CONNECTION &&
      handle->stream.conn.shutdown_req != NULL &&
      handle->stream.conn.write_reqs_pending == 0) {

    UNREGISTER_HANDLE_REQ(loop, handle, handle->stream.conn.shutdown_req);

    err = 0;
    if (handle->flags & UV_HANDLE_CLOSING) {
      /* Handle is being closed; report the shutdown as aborted instead of
       * touching the (already invalid) socket. */
      err = ERROR_OPERATION_ABORTED;
    } else if (shutdown(handle->socket, SD_SEND) == SOCKET_ERROR) {
      err = WSAGetLastError();
    }

    if (handle->stream.conn.shutdown_req->cb) {
      handle->stream.conn.shutdown_req->cb(handle->stream.conn.shutdown_req,
                               uv_translate_sys_error(err));
    }

    handle->stream.conn.shutdown_req = NULL;
    DECREASE_PENDING_REQ_COUNT(handle);
    return;
  }

  /* Phase 2: closing and all outstanding requests accounted for. */
  if (handle->flags & UV_HANDLE_CLOSING &&
      handle->reqs_pending == 0) {
    assert(!(handle->flags & UV_HANDLE_CLOSED));
    assert(handle->socket == INVALID_SOCKET);

    /* Server handles: tear down the preallocated accept requests.  Under
     * emulated IOCP each request also owns a wait registration and an event
     * handle that must be released. */
    if (!(handle->flags & UV_HANDLE_CONNECTION) && handle->tcp.serv.accept_reqs) {
      if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
        for (i = 0; i < uv_simultaneous_server_accepts; i++) {
          req = &handle->tcp.serv.accept_reqs[i];
          if (req->wait_handle != INVALID_HANDLE_VALUE) {
            UnregisterWait(req->wait_handle);
            req->wait_handle = INVALID_HANDLE_VALUE;
          }
          if (req->event_handle != NULL) {
            CloseHandle(req->event_handle);
            req->event_handle = NULL;
          }
        }
      }

      uv__free(handle->tcp.serv.accept_reqs);
      handle->tcp.serv.accept_reqs = NULL;
    }

    /* Connection handles: release the read request's emulated-IOCP state. */
    if (handle->flags & UV_HANDLE_CONNECTION &&
        handle->flags & UV_HANDLE_EMULATE_IOCP) {
      if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) {
        UnregisterWait(handle->read_req.wait_handle);
        handle->read_req.wait_handle = INVALID_HANDLE_VALUE;
      }
      if (handle->read_req.event_handle != NULL) {
        CloseHandle(handle->read_req.event_handle);
        handle->read_req.event_handle = NULL;
      }
    }

    uv__handle_close(handle);
    loop->active_tcp_streams--;
  }
}
276 
277 
/* Unlike on Unix, here we don't set SO_REUSEADDR, because it doesn't just
 * allow binding to addresses that are in use by sockets in TIME_WAIT, it
 * effectively allows 'stealing' a port which is in use by another application.
 *
 * SO_EXCLUSIVEADDRUSE is also not good here because it does check all sockets,
 * regardless of state, so we'd get an error even if the port is in use by a
 * socket in TIME_WAIT state.
 *
 * See issue #1360.
 *
 */
/* Bind `handle` to `addr`, creating the socket first if the handle does not
 * own one yet.  WSAEADDRINUSE is not reported synchronously: it is stored in
 * handle->delayed_error and surfaced by a later listen()/connect().
 * Returns 0 or a Windows system error code. */
static int uv__tcp_try_bind(uv_tcp_t* handle,
                            const struct sockaddr* addr,
                            unsigned int addrlen,
                            unsigned int flags) {
  DWORD err;
  int r;

  if (handle->socket == INVALID_SOCKET) {
    SOCKET sock;

    /* Cannot set IPv6-only mode on non-IPv6 socket. */
    if ((flags & UV_TCP_IPV6ONLY) && addr->sa_family != AF_INET6)
      return ERROR_INVALID_PARAMETER;

    sock = socket(addr->sa_family, SOCK_STREAM, 0);
    if (sock == INVALID_SOCKET) {
      return WSAGetLastError();
    }

    err = uv__tcp_set_socket(handle->loop, handle, sock, addr->sa_family, 0);
    if (err) {
      closesocket(sock);
      return err;
    }
  }

#ifdef IPV6_V6ONLY
  if (addr->sa_family == AF_INET6) {
    int on;

    on = (flags & UV_TCP_IPV6ONLY) != 0;

    /* TODO: how to handle errors? This may fail if there is no ipv4 stack
     * available, or when run on XP/2003 which have no support for dualstack
     * sockets. For now we're silently ignoring the error. */
    setsockopt(handle->socket,
               IPPROTO_IPV6,
               IPV6_V6ONLY,
               (const char*)&on,
               sizeof on);
  }
#endif

  r = bind(handle->socket, addr, addrlen);

  if (r == SOCKET_ERROR) {
    err = WSAGetLastError();
    if (err == WSAEADDRINUSE) {
      /* Some errors are not to be reported until connect() or listen() */
      handle->delayed_error = err;
    } else {
      return err;
    }
  }

  /* The handle counts as bound even when the address was in use; the stored
   * delayed_error makes the failure visible later. */
  handle->flags |= UV_HANDLE_BOUND;

  return 0;
}
348 
349 
post_completion(void * context,BOOLEAN timed_out)350 static void CALLBACK post_completion(void* context, BOOLEAN timed_out) {
351   uv_req_t* req;
352   uv_tcp_t* handle;
353 
354   req = (uv_req_t*) context;
355   assert(req != NULL);
356   handle = (uv_tcp_t*)req->data;
357   assert(handle != NULL);
358   assert(!timed_out);
359 
360   if (!PostQueuedCompletionStatus(handle->loop->iocp,
361                                   req->u.io.overlapped.InternalHigh,
362                                   0,
363                                   &req->u.io.overlapped)) {
364     uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
365   }
366 }
367 
368 
post_write_completion(void * context,BOOLEAN timed_out)369 static void CALLBACK post_write_completion(void* context, BOOLEAN timed_out) {
370   uv_write_t* req;
371   uv_tcp_t* handle;
372 
373   req = (uv_write_t*) context;
374   assert(req != NULL);
375   handle = (uv_tcp_t*)req->handle;
376   assert(handle != NULL);
377   assert(!timed_out);
378 
379   if (!PostQueuedCompletionStatus(handle->loop->iocp,
380                                   req->u.io.overlapped.InternalHigh,
381                                   0,
382                                   &req->u.io.overlapped)) {
383     uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
384   }
385 }
386 
387 
/* Post one AcceptEx on a listening handle using the preallocated request
 * `req`.  On every path -- success, synchronous completion, or failure --
 * the request is counted in handle->reqs_pending and (on failure) queued as
 * pending with an error recorded, keeping the bookkeeping consistent. */
static void uv__tcp_queue_accept(uv_tcp_t* handle, uv_tcp_accept_t* req) {
  uv_loop_t* loop = handle->loop;
  BOOL success;
  DWORD bytes;
  SOCKET accept_socket;
  short family;

  assert(handle->flags & UV_HANDLE_LISTENING);
  assert(req->accept_socket == INVALID_SOCKET);

  /* choose family and extension function */
  if (handle->flags & UV_HANDLE_IPV6) {
    family = AF_INET6;
  } else {
    family = AF_INET;
  }

  /* Open a socket for the accepted connection. */
  accept_socket = socket(family, SOCK_STREAM, 0);
  if (accept_socket == INVALID_SOCKET) {
    SET_REQ_ERROR(req, WSAGetLastError());
    uv__insert_pending_req(loop, (uv_req_t*)req);
    handle->reqs_pending++;
    return;
  }

  /* Make the socket non-inheritable */
  if (!SetHandleInformation((HANDLE) accept_socket, HANDLE_FLAG_INHERIT, 0)) {
    SET_REQ_ERROR(req, GetLastError());
    uv__insert_pending_req(loop, (uv_req_t*)req);
    handle->reqs_pending++;
    closesocket(accept_socket);
    return;
  }

  /* Prepare the overlapped structure. */
  memset(&(req->u.io.overlapped), 0, sizeof(req->u.io.overlapped));
  if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
    assert(req->event_handle != NULL);
    /* Setting the low bit of hEvent suppresses posting the completion to
     * the IOCP; the registered wait on the event fires instead. */
    req->u.io.overlapped.hEvent = (HANDLE) ((ULONG_PTR) req->event_handle | 1);
  }

  success = handle->tcp.serv.func_acceptex(handle->socket,
                                          accept_socket,
                                          (void*)req->accept_buffer,
                                          0,
                                          sizeof(struct sockaddr_storage),
                                          sizeof(struct sockaddr_storage),
                                          &bytes,
                                          &req->u.io.overlapped);

  if (UV_SUCCEEDED_WITHOUT_IOCP(success)) {
    /* Process the req without IOCP. */
    req->accept_socket = accept_socket;
    handle->reqs_pending++;
    uv__insert_pending_req(loop, (uv_req_t*)req);
  } else if (UV_SUCCEEDED_WITH_IOCP(success)) {
    /* The req will be processed with IOCP. */
    req->accept_socket = accept_socket;
    handle->reqs_pending++;
    /* Under emulated IOCP, register a one-time wait on the event (if not
     * already registered) so completion gets relayed to the loop. */
    if (handle->flags & UV_HANDLE_EMULATE_IOCP &&
        req->wait_handle == INVALID_HANDLE_VALUE &&
        !RegisterWaitForSingleObject(&req->wait_handle,
          req->event_handle, post_completion, (void*) req,
          INFINITE, WT_EXECUTEINWAITTHREAD)) {
      SET_REQ_ERROR(req, GetLastError());
      uv__insert_pending_req(loop, (uv_req_t*)req);
    }
  } else {
    /* Make this req pending reporting an error. */
    SET_REQ_ERROR(req, WSAGetLastError());
    uv__insert_pending_req(loop, (uv_req_t*)req);
    handle->reqs_pending++;
    /* Destroy the preallocated client socket. */
    closesocket(accept_socket);
    /* Destroy the event handle */
    if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
      CloseHandle(req->event_handle);
      req->event_handle = NULL;
    }
  }
}
470 
471 
/* Queue a single overlapped WSARecv on a connected handle.  When the number
 * of active TCP streams is below uv_active_tcp_streams_threshold, the user's
 * alloc_cb supplies a real buffer up front; otherwise a zero-length read is
 * issued and the buffer is allocated only once data is known to be readable.
 * Exactly one read request per handle is outstanding at a time.
 *
 * Fix: the overlapped structure was previously memset to zero twice (once
 * right after taking &handle->read_req and again just before WSARecv) with
 * no intervening use; the redundant first memset has been removed. */
static void uv__tcp_queue_read(uv_loop_t* loop, uv_tcp_t* handle) {
  uv_read_t* req;
  uv_buf_t buf;
  int result;
  DWORD bytes, flags;

  assert(handle->flags & UV_HANDLE_READING);
  assert(!(handle->flags & UV_HANDLE_READ_PENDING));

  req = &handle->read_req;

  /*
   * Preallocate a read buffer if the number of active streams is below
   * the threshold.
  */
  if (loop->active_tcp_streams < uv_active_tcp_streams_threshold) {
    handle->flags &= ~UV_HANDLE_ZERO_READ;
    handle->tcp.conn.read_buffer = uv_buf_init(NULL, 0);
    handle->alloc_cb((uv_handle_t*) handle, 65536, &handle->tcp.conn.read_buffer);
    if (handle->tcp.conn.read_buffer.base == NULL ||
        handle->tcp.conn.read_buffer.len == 0) {
      /* The user failed to supply a buffer; report ENOBUFS and bail. */
      handle->read_cb((uv_stream_t*) handle, UV_ENOBUFS, &handle->tcp.conn.read_buffer);
      return;
    }
    assert(handle->tcp.conn.read_buffer.base != NULL);
    buf = handle->tcp.conn.read_buffer;
  } else {
    /* Zero read: completion merely signals readability; the real buffer is
     * allocated when the data is actually consumed. */
    handle->flags |= UV_HANDLE_ZERO_READ;
    buf.base = (char*) &uv_zero_;
    buf.len = 0;
  }

  /* Prepare the overlapped structure (zeroed once, right before use). */
  memset(&(req->u.io.overlapped), 0, sizeof(req->u.io.overlapped));
  if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
    assert(req->event_handle != NULL);
    /* The low bit set on hEvent suppresses IOCP notification; completion is
     * observed via the registered wait on the event instead. */
    req->u.io.overlapped.hEvent = (HANDLE) ((ULONG_PTR) req->event_handle | 1);
  }

  flags = 0;
  result = WSARecv(handle->socket,
                   (WSABUF*)&buf,
                   1,
                   &bytes,
                   &flags,
                   &req->u.io.overlapped,
                   NULL);

  handle->flags |= UV_HANDLE_READ_PENDING;
  handle->reqs_pending++;

  if (UV_SUCCEEDED_WITHOUT_IOCP(result == 0)) {
    /* Process the req without IOCP. */
    req->u.io.overlapped.InternalHigh = bytes;
    uv__insert_pending_req(loop, (uv_req_t*)req);
  } else if (UV_SUCCEEDED_WITH_IOCP(result == 0)) {
    /* The req will be processed with IOCP. */
    if (handle->flags & UV_HANDLE_EMULATE_IOCP &&
        req->wait_handle == INVALID_HANDLE_VALUE &&
        !RegisterWaitForSingleObject(&req->wait_handle,
          req->event_handle, post_completion, (void*) req,
          INFINITE, WT_EXECUTEINWAITTHREAD)) {
      SET_REQ_ERROR(req, GetLastError());
      uv__insert_pending_req(loop, (uv_req_t*)req);
    }
  } else {
    /* Make this req pending reporting an error. */
    SET_REQ_ERROR(req, WSAGetLastError());
    uv__insert_pending_req(loop, (uv_req_t*)req);
  }
}
544 
545 
/* Close the handle by sending a RST instead of the usual FIN handshake:
 * enables SO_LINGER with a zero timeout before closing.  Fails with
 * UV_EINVAL when a graceful shutdown is already in progress. */
int uv_tcp_close_reset(uv_tcp_t* handle, uv_close_cb close_cb) {
  struct linger l;

  /* Disallow setting SO_LINGER to zero due to some platform inconsistencies */
  if (handle->flags & UV_HANDLE_SHUTTING)
    return UV_EINVAL;

  l.l_onoff = 1;
  l.l_linger = 0;
  if (setsockopt(handle->socket,
                 SOL_SOCKET,
                 SO_LINGER,
                 (const char*) &l,
                 sizeof(l)) != 0)
    return uv_translate_sys_error(WSAGetLastError());

  uv_close((uv_handle_t*) handle, close_cb);
  return 0;
}
559 
560 
/* Start listening for incoming connections.  Binds to the IPv4 wildcard
 * address if the handle is not bound yet, resolves the AcceptEx extension
 * function, calls listen(), and pre-queues a batch of accept requests (one
 * when UV_HANDLE_TCP_SINGLE_ACCEPT is set, uv_simultaneous_server_accepts
 * otherwise).  Returns 0, a Winsock error, or a stored delayed bind error. */
int uv__tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) {
  unsigned int i, simultaneous_accepts;
  uv_tcp_accept_t* req;
  int err;

  assert(backlog > 0);

  /* Listening again only replaces the connection callback. */
  if (handle->flags & UV_HANDLE_LISTENING) {
    handle->stream.serv.connection_cb = cb;
  }

  if (handle->flags & UV_HANDLE_READING) {
    return WSAEISCONN;
  }

  /* Surface an error that bind() deferred (e.g. WSAEADDRINUSE). */
  if (handle->delayed_error) {
    return handle->delayed_error;
  }

  if (!(handle->flags & UV_HANDLE_BOUND)) {
    err = uv__tcp_try_bind(handle,
                           (const struct sockaddr*) &uv_addr_ip4_any_,
                           sizeof(uv_addr_ip4_any_),
                           0);
    if (err)
      return err;
    if (handle->delayed_error)
      return handle->delayed_error;
  }

  if (!handle->tcp.serv.func_acceptex) {
    if (!uv__get_acceptex_function(handle->socket, &handle->tcp.serv.func_acceptex)) {
      return WSAEAFNOSUPPORT;
    }
  }

  /* If this flag is set, we already made this listen call in xfer. */
  if (!(handle->flags & UV_HANDLE_SHARED_TCP_SOCKET) &&
      listen(handle->socket, backlog) == SOCKET_ERROR) {
    return WSAGetLastError();
  }

  handle->flags |= UV_HANDLE_LISTENING;
  handle->stream.serv.connection_cb = cb;
  INCREASE_ACTIVE_COUNT(loop, handle);

  simultaneous_accepts = handle->flags & UV_HANDLE_TCP_SINGLE_ACCEPT ? 1
    : uv_simultaneous_server_accepts;

  if (handle->tcp.serv.accept_reqs == NULL) {
    /* Always allocate the full array: endgame iterates over all
     * uv_simultaneous_server_accepts entries regardless of how many were
     * actually queued. */
    handle->tcp.serv.accept_reqs =
      uv__malloc(uv_simultaneous_server_accepts * sizeof(uv_tcp_accept_t));
    if (!handle->tcp.serv.accept_reqs) {
      uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
    }

    for (i = 0; i < simultaneous_accepts; i++) {
      req = &handle->tcp.serv.accept_reqs[i];
      UV_REQ_INIT(req, UV_ACCEPT);
      req->accept_socket = INVALID_SOCKET;
      req->data = handle;

      req->wait_handle = INVALID_HANDLE_VALUE;
      if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
        req->event_handle = CreateEvent(NULL, 0, 0, NULL);
        if (req->event_handle == NULL) {
          uv_fatal_error(GetLastError(), "CreateEvent");
        }
      } else {
        req->event_handle = NULL;
      }

      uv__tcp_queue_accept(handle, req);
    }

    /* Initialize other unused requests too, because uv_tcp_endgame doesn't
     * know how many requests were initialized, so it will try to clean up
     * {uv_simultaneous_server_accepts} requests. */
    for (i = simultaneous_accepts; i < uv_simultaneous_server_accepts; i++) {
      req = &handle->tcp.serv.accept_reqs[i];
      UV_REQ_INIT(req, UV_ACCEPT);
      req->accept_socket = INVALID_SOCKET;
      req->data = handle;
      req->wait_handle = INVALID_HANDLE_VALUE;
      req->event_handle = NULL;
    }
  }

  return 0;
}
651 
652 
/* Hand one pending incoming connection to `client`.  Pops a completed accept
 * request off the server's pending list, attaches its socket to the client
 * handle, then re-queues the request for the next connection -- or, when a
 * switch to single-accept mode is in progress, counts it toward draining the
 * old batch.  Returns 0, WSAEWOULDBLOCK when nothing is pending, WSAENOTCONN
 * for a request that completed with an error, or an error from
 * uv__tcp_set_socket. */
int uv__tcp_accept(uv_tcp_t* server, uv_tcp_t* client) {
  uv_loop_t* loop = server->loop;
  int err = 0;
  int family;

  uv_tcp_accept_t* req = server->tcp.serv.pending_accepts;

  if (!req) {
    /* No valid connections found, so we error out. */
    return WSAEWOULDBLOCK;
  }

  if (req->accept_socket == INVALID_SOCKET) {
    return WSAENOTCONN;
  }

  if (server->flags & UV_HANDLE_IPV6) {
    family = AF_INET6;
  } else {
    family = AF_INET;
  }

  err = uv__tcp_set_socket(client->loop,
                          client,
                          req->accept_socket,
                          family,
                          0);
  if (err) {
    closesocket(req->accept_socket);
  } else {
    uv__connection_init((uv_stream_t*) client);
    /* AcceptEx() implicitly binds the accepted socket. */
    client->flags |= UV_HANDLE_BOUND | UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
  }

  /* Prepare the req to pick up a new connection */
  server->tcp.serv.pending_accepts = req->next_pending;
  req->next_pending = NULL;
  req->accept_socket = INVALID_SOCKET;

  if (!(server->flags & UV_HANDLE_CLOSING)) {
    /* Check if we're in a middle of changing the number of pending accepts. */
    if (!(server->flags & UV_HANDLE_TCP_ACCEPT_STATE_CHANGING)) {
      uv__tcp_queue_accept(server, req);
    } else {
      /* We better be switching to a single pending accept. */
      assert(server->flags & UV_HANDLE_TCP_SINGLE_ACCEPT);

      server->tcp.serv.processed_accepts++;

      if (server->tcp.serv.processed_accepts >= uv_simultaneous_server_accepts) {
        server->tcp.serv.processed_accepts = 0;
        /*
         * All previously queued accept requests are now processed.
         * We now switch to queueing just a single accept.
         */
        uv__tcp_queue_accept(server, &server->tcp.serv.accept_reqs[0]);
        server->flags &= ~UV_HANDLE_TCP_ACCEPT_STATE_CHANGING;
        server->flags |= UV_HANDLE_TCP_SINGLE_ACCEPT;
      }
    }
  }

  loop->active_tcp_streams++;

  return err;
}
720 
721 
/* Start reading on a connected TCP handle: record the callbacks and, unless
 * a read request is already outstanding, queue a new one.  Under emulated
 * IOCP the read request's event handle is created lazily here. */
int uv__tcp_read_start(uv_tcp_t* handle, uv_alloc_cb alloc_cb,
    uv_read_cb read_cb) {
  uv_loop_t* loop = handle->loop;

  handle->flags |= UV_HANDLE_READING;
  handle->read_cb = read_cb;
  handle->alloc_cb = alloc_cb;
  INCREASE_ACTIVE_COUNT(loop, handle);

  /* If reading was stopped and then started again, there could still be a
   * read request pending; only queue a fresh one when there is not. */
  if (handle->flags & UV_HANDLE_READ_PENDING)
    return 0;

  if ((handle->flags & UV_HANDLE_EMULATE_IOCP) &&
      handle->read_req.event_handle == NULL) {
    handle->read_req.event_handle = CreateEvent(NULL, 0, 0, NULL);
    if (handle->read_req.event_handle == NULL)
      uv_fatal_error(GetLastError(), "CreateEvent");
  }
  uv__tcp_queue_read(loop, handle);

  return 0;
}
746 
uv__is_loopback(const struct sockaddr_storage * storage)747 static int uv__is_loopback(const struct sockaddr_storage* storage) {
748   const struct sockaddr_in* in4;
749   const struct sockaddr_in6* in6;
750   int i;
751 
752   if (storage->ss_family == AF_INET) {
753     in4 = (const struct sockaddr_in*) storage;
754     return in4->sin_addr.S_un.S_un_b.s_b1 == 127;
755   }
756   if (storage->ss_family == AF_INET6) {
757     in6 = (const struct sockaddr_in6*) storage;
758     for (i = 0; i < 7; ++i) {
759       if (in6->sin6_addr.u.Word[i] != 0)
760         return 0;
761     }
762     return in6->sin6_addr.u.Word[7] == htons(1);
763   }
764   return 0;
765 }
766 
767 // Check if Windows version is 10.0.16299 or later
uv__is_fast_loopback_fail_supported(void)768 static int uv__is_fast_loopback_fail_supported(void) {
769   OSVERSIONINFOW os_info;
770   if (!pRtlGetVersion)
771     return 0;
772   pRtlGetVersion(&os_info);
773   if (os_info.dwMajorVersion < 10)
774     return 0;
775   if (os_info.dwMajorVersion > 10)
776     return 1;
777   if (os_info.dwMinorVersion > 0)
778     return 1;
779   return os_info.dwBuildNumber >= 16299;
780 }
781 
/* Issue an asynchronous ConnectEx.  Implicitly binds the handle to the
 * matching wildcard address first when needed.  A stored delayed bind error
 * is reported through the request callback rather than synchronously.  On
 * Windows 10.0.16299+, loopback connects are tuned via SIO_TCP_INITIAL_RTO
 * to fail fast instead of retransmitting SYNs for ~2 seconds. */
static int uv__tcp_try_connect(uv_connect_t* req,
                              uv_tcp_t* handle,
                              const struct sockaddr* addr,
                              unsigned int addrlen,
                              uv_connect_cb cb) {
  uv_loop_t* loop = handle->loop;
  TCP_INITIAL_RTO_PARAMETERS retransmit_ioctl;
  const struct sockaddr* bind_addr;
  struct sockaddr_storage converted;
  BOOL success;
  DWORD bytes;
  int err;

  err = uv__convert_to_localhost_if_unspecified(addr, &converted);
  if (err)
    return err;

  /* A deferred bind error short-circuits straight to request setup; the
   * error is delivered via the callback. */
  if (handle->delayed_error != 0)
    goto out;

  if (!(handle->flags & UV_HANDLE_BOUND)) {
    /* Pick the wildcard address matching the caller's address family,
     * inferred from addrlen. */
    if (addrlen == sizeof(uv_addr_ip4_any_)) {
      bind_addr = (const struct sockaddr*) &uv_addr_ip4_any_;
    } else if (addrlen == sizeof(uv_addr_ip6_any_)) {
      bind_addr = (const struct sockaddr*) &uv_addr_ip6_any_;
    } else {
      abort();
    }
    err = uv__tcp_try_bind(handle, bind_addr, addrlen, 0);
    if (err)
      return err;
    if (handle->delayed_error != 0)
      goto out;
  }

  if (!handle->tcp.conn.func_connectex) {
    if (!uv__get_connectex_function(handle->socket, &handle->tcp.conn.func_connectex)) {
      return WSAEAFNOSUPPORT;
    }
  }

  /* This makes connect() fail instantly if the target port on the localhost
   * is not reachable, instead of waiting for 2s. We do not care if this fails.
   * This only works on Windows version 10.0.16299 and later.
   */
  if (uv__is_fast_loopback_fail_supported() && uv__is_loopback(&converted)) {
    memset(&retransmit_ioctl, 0, sizeof(retransmit_ioctl));
    retransmit_ioctl.Rtt = TCP_INITIAL_RTO_NO_SYN_RETRANSMISSIONS;
    retransmit_ioctl.MaxSynRetransmissions = TCP_INITIAL_RTO_NO_SYN_RETRANSMISSIONS;
    WSAIoctl(handle->socket,
             SIO_TCP_INITIAL_RTO,
             &retransmit_ioctl,
             sizeof(retransmit_ioctl),
             NULL,
             0,
             &bytes,
             NULL,
             NULL);
  }

out:

  UV_REQ_INIT(req, UV_CONNECT);
  req->handle = (uv_stream_t*) handle;
  req->cb = cb;
  memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped));

  if (handle->delayed_error != 0) {
    /* Process the req without IOCP. */
    handle->reqs_pending++;
    REGISTER_HANDLE_REQ(loop, handle, req);
    uv__insert_pending_req(loop, (uv_req_t*)req);
    return 0;
  }

  success = handle->tcp.conn.func_connectex(handle->socket,
                                            (const struct sockaddr*) &converted,
                                            addrlen,
                                            NULL,
                                            0,
                                            &bytes,
                                            &req->u.io.overlapped);

  if (UV_SUCCEEDED_WITHOUT_IOCP(success)) {
    /* Process the req without IOCP. */
    handle->reqs_pending++;
    REGISTER_HANDLE_REQ(loop, handle, req);
    uv__insert_pending_req(loop, (uv_req_t*)req);
  } else if (UV_SUCCEEDED_WITH_IOCP(success)) {
    /* The req will be processed with IOCP. */
    handle->reqs_pending++;
    REGISTER_HANDLE_REQ(loop, handle, req);
  } else {
    return WSAGetLastError();
  }

  return 0;
}
880 
881 
uv_tcp_getsockname(const uv_tcp_t * handle,struct sockaddr * name,int * namelen)882 int uv_tcp_getsockname(const uv_tcp_t* handle,
883                        struct sockaddr* name,
884                        int* namelen) {
885 
886   return uv__getsockpeername((const uv_handle_t*) handle,
887                              getsockname,
888                              name,
889                              namelen,
890                              handle->delayed_error);
891 }
892 
893 
uv_tcp_getpeername(const uv_tcp_t * handle,struct sockaddr * name,int * namelen)894 int uv_tcp_getpeername(const uv_tcp_t* handle,
895                        struct sockaddr* name,
896                        int* namelen) {
897 
898   return uv__getsockpeername((const uv_handle_t*) handle,
899                              getpeername,
900                              name,
901                              namelen,
902                              handle->delayed_error);
903 }
904 
905 
/* Start an overlapped WSASend on a connected TCP handle.
 *
 * Initializes `req` and issues the send; completion is reported later via
 * uv__process_tcp_write_req. Always returns 0: if WSASend fails, the error
 * is recorded on the req and delivered through the loop's pending-req queue
 * instead of being returned here.
 */
int uv__tcp_write(uv_loop_t* loop,
                 uv_write_t* req,
                 uv_tcp_t* handle,
                 const uv_buf_t bufs[],
                 unsigned int nbufs,
                 uv_write_cb cb) {
  int result;
  DWORD bytes;

  UV_REQ_INIT(req, UV_WRITE);
  req->handle = (uv_stream_t*) handle;
  req->cb = cb;

  /* Prepare the overlapped structure. */
  memset(&(req->u.io.overlapped), 0, sizeof(req->u.io.overlapped));
  if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
    /* IOCP is emulated for this socket: signal completion through an event
     * handle plus a wait thread. Failure to create the event is fatal. */
    req->event_handle = CreateEvent(NULL, 0, 0, NULL);
    if (req->event_handle == NULL) {
      uv_fatal_error(GetLastError(), "CreateEvent");
    }
    /* Setting the low-order bit of hEvent prevents the completion from
     * being posted to the (unused) completion port. */
    req->u.io.overlapped.hEvent = (HANDLE) ((ULONG_PTR) req->event_handle | 1);
    req->wait_handle = INVALID_HANDLE_VALUE;
  }

  result = WSASend(handle->socket,
                   (WSABUF*) bufs,
                   nbufs,
                   &bytes,
                   0,
                   &req->u.io.overlapped,
                   NULL);

  if (UV_SUCCEEDED_WITHOUT_IOCP(result == 0)) {
    /* Request completed immediately. */
    req->u.io.queued_bytes = 0;
    handle->reqs_pending++;
    handle->stream.conn.write_reqs_pending++;
    REGISTER_HANDLE_REQ(loop, handle, req);
    uv__insert_pending_req(loop, (uv_req_t*) req);
  } else if (UV_SUCCEEDED_WITH_IOCP(result == 0)) {
    /* Request queued by the kernel. */
    req->u.io.queued_bytes = uv__count_bufs(bufs, nbufs);
    handle->reqs_pending++;
    handle->stream.conn.write_reqs_pending++;
    REGISTER_HANDLE_REQ(loop, handle, req);
    handle->write_queue_size += req->u.io.queued_bytes;
    if (handle->flags & UV_HANDLE_EMULATE_IOCP &&
        !RegisterWaitForSingleObject(&req->wait_handle,
          req->event_handle, post_write_completion, (void*) req,
          INFINITE, WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE)) {
      /* Could not register the wait callback; fail the req right away. */
      SET_REQ_ERROR(req, GetLastError());
      uv__insert_pending_req(loop, (uv_req_t*)req);
    }
  } else {
    /* Send failed due to an error, report it later */
    req->u.io.queued_bytes = 0;
    handle->reqs_pending++;
    handle->stream.conn.write_reqs_pending++;
    REGISTER_HANDLE_REQ(loop, handle, req);
    SET_REQ_ERROR(req, WSAGetLastError());
    uv__insert_pending_req(loop, (uv_req_t*) req);
  }

  return 0;
}
971 
972 
/* Attempt a synchronous (non-overlapped) send on the socket.
 * Refuses with UV_EAGAIN while overlapped writes are still outstanding,
 * since interleaving would reorder the stream. Returns the number of bytes
 * sent, or a libuv error code on failure. */
int uv__tcp_try_write(uv_tcp_t* handle,
                     const uv_buf_t bufs[],
                     unsigned int nbufs) {
  DWORD sent;
  int rc;

  if (handle->stream.conn.write_reqs_pending > 0)
    return UV_EAGAIN;

  rc = WSASend(handle->socket,
               (WSABUF*) bufs,
               nbufs,
               &sent,
               0,
               NULL,
               NULL);
  if (rc == SOCKET_ERROR)
    return uv_translate_sys_error(WSAGetLastError());

  return sent;
}
995 
996 
/* Completion handler for a TCP read request.
 *
 * Invoked by the event loop when the outstanding overlapped read (which may
 * have been a zero-length "readiness" read) completes. Delivers the result
 * to read_cb, then drains the socket with up to 32 nonblocking WSARecv
 * calls, and finally re-arms the overlapped read if the handle is still
 * reading.
 */
void uv__process_tcp_read_req(uv_loop_t* loop, uv_tcp_t* handle,
    uv_req_t* req) {
  DWORD bytes, flags, err;
  uv_buf_t buf;
  int count;

  assert(handle->type == UV_TCP);

  handle->flags &= ~UV_HANDLE_READ_PENDING;

  if (!REQ_SUCCESS(req)) {
    /* An error occurred doing the read. */
    if ((handle->flags & UV_HANDLE_READING) ||
        !(handle->flags & UV_HANDLE_ZERO_READ)) {
      handle->flags &= ~UV_HANDLE_READING;
      DECREASE_ACTIVE_COUNT(loop, handle);
      /* Zero reads have no user buffer to hand back. */
      buf = (handle->flags & UV_HANDLE_ZERO_READ) ?
            uv_buf_init(NULL, 0) : handle->tcp.conn.read_buffer;

      err = GET_REQ_SOCK_ERROR(req);

      if (err == WSAECONNABORTED) {
        /* Turn WSAECONNABORTED into UV_ECONNRESET to be consistent with Unix.
         */
        err = WSAECONNRESET;
      }
      handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);

      handle->read_cb((uv_stream_t*)handle,
                      uv_translate_sys_error(err),
                      &buf);
    }
  } else {
    if (!(handle->flags & UV_HANDLE_ZERO_READ)) {
      /* The read was done with a non-zero buffer length. */
      if (req->u.io.overlapped.InternalHigh > 0) {
        /* Successful read */
        handle->read_cb((uv_stream_t*)handle,
                        req->u.io.overlapped.InternalHigh,
                        &handle->tcp.conn.read_buffer);
        /* Read again only if bytes == buf.len */
        if (req->u.io.overlapped.InternalHigh < handle->tcp.conn.read_buffer.len) {
          goto done;
        }
      } else {
        /* Connection closed */
        if (handle->flags & UV_HANDLE_READING) {
          handle->flags &= ~UV_HANDLE_READING;
          DECREASE_ACTIVE_COUNT(loop, handle);
        }

        /* NOTE(review): these two assignments appear to be dead stores --
         * the EOF callback below passes read_buffer, not buf. Confirm
         * before removing. */
        buf.base = 0;
        buf.len = 0;
        handle->read_cb((uv_stream_t*)handle, UV_EOF, &handle->tcp.conn.read_buffer);
        goto done;
      }
    }

    /* Do nonblocking reads until the buffer is empty */
    count = 32;
    while ((handle->flags & UV_HANDLE_READING) && (count-- > 0)) {
      buf = uv_buf_init(NULL, 0);
      handle->alloc_cb((uv_handle_t*) handle, 65536, &buf);
      if (buf.base == NULL || buf.len == 0) {
        /* User declined to provide a buffer. */
        handle->read_cb((uv_stream_t*) handle, UV_ENOBUFS, &buf);
        break;
      }
      assert(buf.base != NULL);

      flags = 0;
      if (WSARecv(handle->socket,
                  (WSABUF*)&buf,
                  1,
                  &bytes,
                  &flags,
                  NULL,
                  NULL) != SOCKET_ERROR) {
        if (bytes > 0) {
          /* Successful read */
          handle->read_cb((uv_stream_t*)handle, bytes, &buf);
          /* Read again only if bytes == buf.len */
          if (bytes < buf.len) {
            break;
          }
        } else {
          /* Connection closed */
          handle->flags &= ~UV_HANDLE_READING;
          DECREASE_ACTIVE_COUNT(loop, handle);

          handle->read_cb((uv_stream_t*)handle, UV_EOF, &buf);
          break;
        }
      } else {
        err = WSAGetLastError();
        if (err == WSAEWOULDBLOCK) {
          /* Read buffer was completely empty, report a 0-byte read. */
          handle->read_cb((uv_stream_t*)handle, 0, &buf);
        } else {
          /* Ouch! serious error. */
          handle->flags &= ~UV_HANDLE_READING;
          DECREASE_ACTIVE_COUNT(loop, handle);

          if (err == WSAECONNABORTED) {
            /* Turn WSAECONNABORTED into UV_ECONNRESET to be consistent with
             * Unix. */
            err = WSAECONNRESET;
          }
          handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);

          handle->read_cb((uv_stream_t*)handle,
                          uv_translate_sys_error(err),
                          &buf);
        }
        break;
      }
    }

done:
    /* Post another read if still reading and not closing. */
    if ((handle->flags & UV_HANDLE_READING) &&
        !(handle->flags & UV_HANDLE_READ_PENDING)) {
      uv__tcp_queue_read(loop, handle);
    }
  }

  DECREASE_PENDING_REQ_COUNT(handle);
}
1124 
1125 
/* Completion handler for a TCP write request.
 *
 * Unregisters the req, releases any emulated-IOCP wait/event resources,
 * invokes the user callback with the translated result, and — once the last
 * write drains — performs deferred close/shutdown work.
 */
void uv__process_tcp_write_req(uv_loop_t* loop, uv_tcp_t* handle,
    uv_write_t* req) {
  int err;

  assert(handle->type == UV_TCP);

  assert(handle->write_queue_size >= req->u.io.queued_bytes);
  handle->write_queue_size -= req->u.io.queued_bytes;

  UNREGISTER_HANDLE_REQ(loop, handle, req);

  if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
    /* Tear down the wait registration and event used in place of IOCP. */
    if (req->wait_handle != INVALID_HANDLE_VALUE) {
      UnregisterWait(req->wait_handle);
      req->wait_handle = INVALID_HANDLE_VALUE;
    }
    if (req->event_handle != NULL) {
      CloseHandle(req->event_handle);
      req->event_handle = NULL;
    }
  }

  if (req->cb) {
    err = uv_translate_sys_error(GET_REQ_SOCK_ERROR(req));
    if (err == UV_ECONNABORTED) {
      /* use UV_ECANCELED for consistency with Unix */
      err = UV_ECANCELED;
    }
    req->cb(req, err);
  }

  handle->stream.conn.write_reqs_pending--;
  if (handle->stream.conn.write_reqs_pending == 0) {
    /* All writes drained: finish a close that was waiting on them, and/or
     * kick off the pending shutdown endgame. */
    if (handle->flags & UV_HANDLE_CLOSING) {
      closesocket(handle->socket);
      handle->socket = INVALID_SOCKET;
    }
    if (handle->stream.conn.shutdown_req != NULL) {
      uv__want_endgame(loop, (uv_handle_t*)handle);
    }
  }

  DECREASE_PENDING_REQ_COUNT(handle);
}
1170 
1171 
/* Completion handler for an AcceptEx request on a listening TCP handle.
 *
 * On success the accepted socket is pushed onto the pending-accepts list and
 * the connection callback is invoked; the user then picks it up via
 * uv_accept. On failure the accepted socket is discarded and, if still
 * listening, a fresh accept is queued.
 */
void uv__process_tcp_accept_req(uv_loop_t* loop, uv_tcp_t* handle,
    uv_req_t* raw_req) {
  uv_tcp_accept_t* req = (uv_tcp_accept_t*) raw_req;
  int err;

  assert(handle->type == UV_TCP);

  /* If handle->accepted_socket is not a valid socket, then uv_queue_accept
   * must have failed. This is a serious error. We stop accepting connections
   * and report this error to the connection callback. */
  if (req->accept_socket == INVALID_SOCKET) {
    if (handle->flags & UV_HANDLE_LISTENING) {
      handle->flags &= ~UV_HANDLE_LISTENING;
      DECREASE_ACTIVE_COUNT(loop, handle);
      if (handle->stream.serv.connection_cb) {
        err = GET_REQ_SOCK_ERROR(req);
        handle->stream.serv.connection_cb((uv_stream_t*)handle,
                                      uv_translate_sys_error(err));
      }
    }
  } else if (REQ_SUCCESS(req) &&
      /* SO_UPDATE_ACCEPT_CONTEXT makes the accepted socket usable with
       * other Winsock calls (getsockname etc.). */
      setsockopt(req->accept_socket,
                  SOL_SOCKET,
                  SO_UPDATE_ACCEPT_CONTEXT,
                  (char*)&handle->socket,
                  sizeof(handle->socket)) == 0) {
    req->next_pending = handle->tcp.serv.pending_accepts;
    handle->tcp.serv.pending_accepts = req;

    /* Accept and SO_UPDATE_ACCEPT_CONTEXT were successful. */
    if (handle->stream.serv.connection_cb) {
      handle->stream.serv.connection_cb((uv_stream_t*)handle, 0);
    }
  } else {
    /* Error related to accepted socket is ignored because the server socket
     * may still be healthy. If the server socket is broken uv_queue_accept
     * will detect it. */
    closesocket(req->accept_socket);
    req->accept_socket = INVALID_SOCKET;
    if (handle->flags & UV_HANDLE_LISTENING) {
      uv__tcp_queue_accept(handle, req);
    }
  }

  DECREASE_PENDING_REQ_COUNT(handle);
}
1218 
1219 
/* Completion handler for a ConnectEx request.
 *
 * Resolves the final status of the connect — a delayed (synchronous) error,
 * cancellation because the handle is closing, a failure reported by the
 * kernel, or success — and invokes the user's connect callback with the
 * translated result.
 */
void uv__process_tcp_connect_req(uv_loop_t* loop, uv_tcp_t* handle,
    uv_connect_t* req) {
  int err;

  assert(handle->type == UV_TCP);

  UNREGISTER_HANDLE_REQ(loop, handle, req);

  err = 0;
  if (handle->delayed_error) {
    /* To smooth over the differences between unixes errors that
     * were reported synchronously on the first connect can be delayed
     * until the next tick--which is now.
     */
    err = handle->delayed_error;
    handle->delayed_error = 0;
  } else if (REQ_SUCCESS(req)) {
    if (handle->flags & UV_HANDLE_CLOSING) {
      /* use UV_ECANCELED for consistency with Unix */
      err = ERROR_OPERATION_ABORTED;
    } else if (setsockopt(handle->socket,
                          SOL_SOCKET,
                          SO_UPDATE_CONNECT_CONTEXT,
                          NULL,
                          0) == 0) {
      /* SO_UPDATE_CONNECT_CONTEXT makes the ConnectEx'd socket usable with
       * other Winsock calls; only then is the stream fully connected. */
      uv__connection_init((uv_stream_t*)handle);
      handle->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
      loop->active_tcp_streams++;
    } else {
      err = WSAGetLastError();
    }
  } else {
    err = GET_REQ_SOCK_ERROR(req);
  }
  req->cb(req, uv_translate_sys_error(err));

  DECREASE_PENDING_REQ_COUNT(handle);
}
1258 
1259 
/* Export a TCP socket for transfer to another process over IPC.
 *
 * Fills in xfer_type (connection vs. server socket) and xfer_info with the
 * WSAPROTOCOL_INFO needed by the receiving process. Returns 0 on success or
 * a Windows error code.
 */
int uv__tcp_xfer_export(uv_tcp_t* handle,
                        int target_pid,
                        uv__ipc_socket_xfer_type_t* xfer_type,
                        uv__ipc_socket_xfer_info_t* xfer_info) {
  if (handle->flags & UV_HANDLE_CONNECTION) {
    *xfer_type = UV__IPC_SOCKET_XFER_TCP_CONNECTION;
  } else {
    *xfer_type = UV__IPC_SOCKET_XFER_TCP_SERVER;
    /* We're about to share the socket with another process. Because this is a
     * listening socket, we assume that the other process will be accepting
     * connections on it. Thus, before sharing the socket with another process,
     * we call listen here in the parent process. */
    if (!(handle->flags & UV_HANDLE_LISTENING)) {
      if (!(handle->flags & UV_HANDLE_BOUND)) {
        return ERROR_NOT_SUPPORTED;
      }
      /* A listen() failure is stashed in delayed_error and forwarded to the
       * importing process below rather than failing the export. */
      if (handle->delayed_error == 0 &&
          listen(handle->socket, SOMAXCONN) == SOCKET_ERROR) {
        handle->delayed_error = WSAGetLastError();
      }
    }
  }

  if (WSADuplicateSocketW(handle->socket, target_pid, &xfer_info->socket_info))
    return WSAGetLastError();
  xfer_info->delayed_error = handle->delayed_error;

  /* Mark the local copy of the handle as 'shared' so we behave in a way that's
   * friendly to the process(es) that we share the socket with. */
  handle->flags |= UV_HANDLE_SHARED_TCP_SOCKET;

  return 0;
}
1293 
1294 
/* Import a TCP socket that another process exported over IPC.
 *
 * Recreates the socket from the transferred WSAPROTOCOL_INFO, attaches it to
 * the given uv_tcp_t, and restores connection/bound state. Returns 0 on
 * success or a Windows error code.
 */
int uv__tcp_xfer_import(uv_tcp_t* tcp,
                        uv__ipc_socket_xfer_type_t xfer_type,
                        uv__ipc_socket_xfer_info_t* xfer_info) {
  int err;
  SOCKET socket;

  assert(xfer_type == UV__IPC_SOCKET_XFER_TCP_SERVER ||
         xfer_type == UV__IPC_SOCKET_XFER_TCP_CONNECTION);

  /* FROM_PROTOCOL_INFO directs WSASocketW to take family/type/protocol from
   * the transferred protocol info rather than from explicit arguments. */
  socket = WSASocketW(FROM_PROTOCOL_INFO,
                      FROM_PROTOCOL_INFO,
                      FROM_PROTOCOL_INFO,
                      &xfer_info->socket_info,
                      0,
                      WSA_FLAG_OVERLAPPED);

  if (socket == INVALID_SOCKET) {
    return WSAGetLastError();
  }

  err = uv__tcp_set_socket(
      tcp->loop, tcp, socket, xfer_info->socket_info.iAddressFamily, 1);
  if (err) {
    closesocket(socket);
    return err;
  }

  /* Carry over any error the exporting process deferred (e.g. from listen). */
  tcp->delayed_error = xfer_info->delayed_error;
  tcp->flags |= UV_HANDLE_BOUND | UV_HANDLE_SHARED_TCP_SOCKET;

  if (xfer_type == UV__IPC_SOCKET_XFER_TCP_CONNECTION) {
    uv__connection_init((uv_stream_t*)tcp);
    tcp->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
  }

  tcp->loop->active_tcp_streams++;
  return 0;
}
1333 
1334 
/* Enable or disable Nagle's algorithm (TCP_NODELAY) on the handle.
 * If no socket exists yet, only the handle flag is recorded so the setting
 * can be applied once a socket is created. Returns 0 or a libuv error. */
int uv_tcp_nodelay(uv_tcp_t* handle, int enable) {
  if (handle->socket != INVALID_SOCKET) {
    int sys_err = uv__tcp_nodelay(handle, handle->socket, enable);
    if (sys_err != 0)
      return uv_translate_sys_error(sys_err);
  }

  if (enable)
    handle->flags |= UV_HANDLE_TCP_NODELAY;
  else
    handle->flags &= ~UV_HANDLE_TCP_NODELAY;

  return 0;
}
1352 
1353 
/* Enable or disable TCP keep-alive probing, with `delay` seconds before the
 * first probe. If no socket exists yet, only the handle flag is recorded.
 * Returns 0 or a libuv error. */
int uv_tcp_keepalive(uv_tcp_t* handle, int enable, unsigned int delay) {
  if (handle->socket != INVALID_SOCKET) {
    int sys_err = uv__tcp_keepalive(handle, handle->socket, enable, delay);
    if (sys_err != 0)
      return uv_translate_sys_error(sys_err);
  }

  if (enable)
    handle->flags |= UV_HANDLE_TCP_KEEPALIVE;
  else
    handle->flags &= ~UV_HANDLE_TCP_KEEPALIVE;

  /* TODO: Store delay if handle->socket isn't created yet. */

  return 0;
}
1373 
1374 
/* Control whether the server keeps multiple AcceptEx calls queued at once.
 * Only valid on server (non-connection) handles. Many is the default; the
 * only supported transition is from many down to single — re-enabling many
 * after that returns UV_ENOTSUP. */
int uv_tcp_simultaneous_accepts(uv_tcp_t* handle, int enable) {
  int single_active;

  if (handle->flags & UV_HANDLE_CONNECTION)
    return UV_EINVAL;

  /* Nothing to do when we are already in the requested mode. */
  single_active = (handle->flags & UV_HANDLE_TCP_SINGLE_ACCEPT) != 0;
  if ((enable != 0) != single_active)
    return 0;

  /* Switching from a single pending accept back to many is unsupported. */
  if (enable)
    return UV_ENOTSUP;

  /* A change of the pending-accept count is already underway. */
  if (handle->flags & UV_HANDLE_TCP_ACCEPT_STATE_CHANGING)
    return 0;

  handle->flags |= UV_HANDLE_TCP_SINGLE_ACCEPT;

  /* If already listening, several accepts may be queued; mark the state as
   * changing so they can be drained down to one. */
  if (handle->flags & UV_HANDLE_LISTENING)
    handle->flags |= UV_HANDLE_TCP_ACCEPT_STATE_CHANGING;

  return 0;
}
1405 
1406 
/* Best-effort cancellation of outstanding read/write requests on `tcp`.
 *
 * Cancels the overlapped read with CancelIoEx and all writes with CancelIo.
 * When non-IFS layered service providers (LSPs) are stacked on the socket,
 * the cancellation must also be issued against the provider's base handle,
 * which is obtained via SIO_BASE_HANDLE.
 */
static void uv__tcp_try_cancel_reqs(uv_tcp_t* tcp) {
  SOCKET socket;
  int non_ifs_lsp;
  int reading;
  int writing;

  socket = tcp->socket;
  reading = tcp->flags & UV_HANDLE_READING;
  writing = tcp->stream.conn.write_reqs_pending > 0;
  if (!reading && !writing)
    return;

  /* TODO: in libuv v2, keep explicit track of write_reqs, so we can cancel
   * them each explicitly with CancelIoEx (like unix). */
  if (reading)
    CancelIoEx((HANDLE) socket, &tcp->read_req.u.io.overlapped);
  if (writing)
    CancelIo((HANDLE) socket);

  /* Check if we have any non-IFS LSPs stacked on top of TCP */
  non_ifs_lsp = (tcp->flags & UV_HANDLE_IPV6) ? uv_tcp_non_ifs_lsp_ipv6 :
                                                uv_tcp_non_ifs_lsp_ipv4;

  /* If there are non-ifs LSPs then try to obtain a base handle for the socket.
   * This will always fail on Windows XP/3k. */
  if (non_ifs_lsp) {
    DWORD bytes;
    /* SIO_BASE_HANDLE overwrites `socket` with the base provider handle. */
    if (WSAIoctl(socket,
                 SIO_BASE_HANDLE,
                 NULL,
                 0,
                 &socket,
                 sizeof socket,
                 &bytes,
                 NULL,
                 NULL) != 0) {
      /* Failed. We can't do CancelIo. */
      return;
    }
  }

  assert(socket != 0 && socket != INVALID_SOCKET);

  /* Repeat the cancellation on the base handle if it differs. */
  if (socket != tcp->socket) {
    if (reading)
      CancelIoEx((HANDLE) socket, &tcp->read_req.u.io.overlapped);
    if (writing)
      CancelIo((HANDLE) socket);
  }
}
1457 
1458 
/* Begin closing a TCP handle.
 *
 * Cancels in-flight requests (connection handles) or closes pending accept
 * sockets (server handles), stops listening, closes the socket when safe,
 * and schedules the endgame once no requests remain pending.
 */
void uv__tcp_close(uv_loop_t* loop, uv_tcp_t* tcp) {
  if (tcp->flags & UV_HANDLE_CONNECTION) {
    uv__tcp_try_cancel_reqs(tcp);
    if (tcp->flags & UV_HANDLE_READING) {
      uv_read_stop((uv_stream_t*) tcp);
    }
  } else {
    if (tcp->tcp.serv.accept_reqs != NULL) {
      /* First close the incoming sockets to cancel the accept operations before
       * we free their resources. */
      unsigned int i;
      for (i = 0; i < uv_simultaneous_server_accepts; i++) {
        uv_tcp_accept_t* req = &tcp->tcp.serv.accept_reqs[i];
        if (req->accept_socket != INVALID_SOCKET) {
          closesocket(req->accept_socket);
          req->accept_socket = INVALID_SOCKET;
        }
      }
    }
    assert(!(tcp->flags & UV_HANDLE_READING));
  }

  if (tcp->flags & UV_HANDLE_LISTENING) {
    tcp->flags &= ~UV_HANDLE_LISTENING;
    DECREASE_ACTIVE_COUNT(loop, tcp);
  }

  /* If any overlapped req failed to cancel, calling `closesocket` now would
   * cause Win32 to send an RST packet. Try to avoid that for writes, if
   * possibly applicable, by waiting to process the completion notifications
   * first (which typically should be cancellations). There's not much we can
   * do about canceled reads, which also will generate an RST packet. */
  if (!(tcp->flags & UV_HANDLE_CONNECTION) ||
      tcp->stream.conn.write_reqs_pending == 0) {
    closesocket(tcp->socket);
    tcp->socket = INVALID_SOCKET;
  }

  tcp->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
  uv__handle_closing(tcp);

  if (tcp->reqs_pending == 0) {
    uv__want_endgame(tcp->loop, (uv_handle_t*)tcp);
  }
}
1504 
1505 
/* Adopt an existing socket into a uv_tcp_t handle.
 *
 * Detects the socket's address family via SO_PROTOCOL_INFOW, attaches it to
 * the handle, and — if the socket is already bound and/or connected — marks
 * the handle accordingly so it can be used for reading/writing right away.
 * Returns 0 or a libuv error code.
 */
int uv_tcp_open(uv_tcp_t* handle, uv_os_sock_t sock) {
  WSAPROTOCOL_INFOW protocol_info;
  int opt_len;
  int err;
  struct sockaddr_storage saddr;
  int saddr_len;

  /* Detect the address family of the socket. */
  opt_len = (int) sizeof protocol_info;
  if (getsockopt(sock,
                 SOL_SOCKET,
                 SO_PROTOCOL_INFOW,
                 (char*) &protocol_info,
                 &opt_len) == SOCKET_ERROR) {
    return uv_translate_sys_error(GetLastError());
  }

  err = uv__tcp_set_socket(handle->loop,
                          handle,
                          sock,
                          protocol_info.iAddressFamily,
                          1);
  if (err) {
    return uv_translate_sys_error(err);
  }

  /* Support already active socket. */
  saddr_len = sizeof(saddr);
  if (!uv_tcp_getsockname(handle, (struct sockaddr*) &saddr, &saddr_len)) {
    /* Socket is already bound. */
    handle->flags |= UV_HANDLE_BOUND;
    saddr_len = sizeof(saddr);
    if (!uv_tcp_getpeername(handle, (struct sockaddr*) &saddr, &saddr_len)) {
      /* Socket is already connected. */
      uv__connection_init((uv_stream_t*) handle);
      handle->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
    }
  }

  return 0;
}
1547 
1548 
1549 /* This function is an egress point, i.e. it returns libuv errors rather than
1550  * system errors.
1551  */
/* Bind the handle to `addr`. Egress point: converts the Windows error from
 * the bind attempt into a libuv error code before returning it. */
int uv__tcp_bind(uv_tcp_t* handle,
                 const struct sockaddr* addr,
                 unsigned int addrlen,
                 unsigned int flags) {
  int sys_err;

  sys_err = uv__tcp_try_bind(handle, addr, addrlen, flags);
  if (sys_err != 0)
    return uv_translate_sys_error(sys_err);

  return 0;
}
1564 
1565 
1566 /* This function is an egress point, i.e. it returns libuv errors rather than
1567  * system errors.
1568  */
/* Start connecting the handle to `addr`. Egress point: converts the Windows
 * error from the connect attempt into a libuv error code before returning. */
int uv__tcp_connect(uv_connect_t* req,
                    uv_tcp_t* handle,
                    const struct sockaddr* addr,
                    unsigned int addrlen,
                    uv_connect_cb cb) {
  int sys_err;

  sys_err = uv__tcp_try_connect(req, handle, addr, addrlen, cb);
  if (sys_err != 0)
    return uv_translate_sys_error(sys_err);

  return 0;
}
1582 
1583 #ifndef WSA_FLAG_NO_HANDLE_INHERIT
1584 /* Added in Windows 7 SP1. Specify this to avoid race conditions, */
1585 /* but also manually clear the inherit flag in case this failed. */
1586 #define WSA_FLAG_NO_HANDLE_INHERIT 0x80
1587 #endif
1588 
/* Emulate socketpair(2) on Windows.
 *
 * Creates a loopback listening socket, connects one client to it, and
 * AcceptEx's the other side, yielding two connected sockets in fds[0]/fds[1].
 * flags0/flags1 may contain UV_NONBLOCK_PIPE to request overlapped sockets.
 * Returns 0 on success or a libuv error code; all sockets are closed on
 * failure.
 */
int uv_socketpair(int type, int protocol, uv_os_sock_t fds[2], int flags0, int flags1) {
  SOCKET server = INVALID_SOCKET;
  SOCKET client0 = INVALID_SOCKET;
  SOCKET client1 = INVALID_SOCKET;
  SOCKADDR_IN name;
  LPFN_ACCEPTEX func_acceptex;
  WSAOVERLAPPED overlap;
  char accept_buffer[sizeof(struct sockaddr_storage) * 2 + 32];
  int namelen;
  int err;
  DWORD bytes;
  DWORD flags;
  DWORD client0_flags = WSA_FLAG_NO_HANDLE_INHERIT;
  DWORD client1_flags = WSA_FLAG_NO_HANDLE_INHERIT;

  if (flags0 & UV_NONBLOCK_PIPE)
      client0_flags |= WSA_FLAG_OVERLAPPED;
  if (flags1 & UV_NONBLOCK_PIPE)
      client1_flags |= WSA_FLAG_OVERLAPPED;

  server = WSASocketW(AF_INET, type, protocol, NULL, 0,
                      WSA_FLAG_OVERLAPPED | WSA_FLAG_NO_HANDLE_INHERIT);
  if (server == INVALID_SOCKET)
    goto wsaerror;
  /* Clear the inherit bit manually too, in case WSA_FLAG_NO_HANDLE_INHERIT
   * is unsupported (pre-Windows 7 SP1). */
  if (!SetHandleInformation((HANDLE) server, HANDLE_FLAG_INHERIT, 0))
    goto error;
  name.sin_family = AF_INET;
  name.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
  name.sin_port = 0;  /* Let the OS pick an ephemeral port. */
  if (bind(server, (SOCKADDR*) &name, sizeof(name)) != 0)
    goto wsaerror;
  if (listen(server, 1) != 0)
    goto wsaerror;
  /* Read back the actual port the server was bound to. */
  namelen = sizeof(name);
  if (getsockname(server, (SOCKADDR*) &name, &namelen) != 0)
    goto wsaerror;
  client0 = WSASocketW(AF_INET, type, protocol, NULL, 0, client0_flags);
  if (client0 == INVALID_SOCKET)
    goto wsaerror;
  if (!SetHandleInformation((HANDLE) client0, HANDLE_FLAG_INHERIT, 0))
    goto error;
  if (connect(client0, (SOCKADDR*) &name, sizeof(name)) != 0)
    goto wsaerror;
  client1 = WSASocketW(AF_INET, type, protocol, NULL, 0, client1_flags);
  if (client1 == INVALID_SOCKET)
    goto wsaerror;
  if (!SetHandleInformation((HANDLE) client1, HANDLE_FLAG_INHERIT, 0))
    goto error;
  if (!uv__get_acceptex_function(server, &func_acceptex)) {
    err = WSAEAFNOSUPPORT;
    goto cleanup;
  }
  memset(&overlap, 0, sizeof(overlap));
  if (!func_acceptex(server,
                     client1,
                     accept_buffer,
                     0,
                     sizeof(struct sockaddr_storage),
                     sizeof(struct sockaddr_storage),
                     &bytes,
                     &overlap)) {
    err = WSAGetLastError();
    if (err == ERROR_IO_PENDING) {
      /* Result should complete immediately, since we already called connect,
       * but empirically, we sometimes have to poll the kernel a couple times
       * until it notices that. */
      while (!WSAGetOverlappedResult(client1, &overlap, &bytes, FALSE, &flags)) {
        err = WSAGetLastError();
        if (err != WSA_IO_INCOMPLETE)
          goto cleanup;
        SwitchToThread();
      }
    }
    else {
      goto cleanup;
    }
  }
  /* Make the accepted socket usable with ordinary Winsock calls. */
  if (setsockopt(client1, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
                  (char*) &server, sizeof(server)) != 0) {
    goto wsaerror;
  }

  closesocket(server);

  fds[0] = client0;
  fds[1] = client1;

  return 0;

 wsaerror:
    err = WSAGetLastError();
    goto cleanup;

 error:
    err = GetLastError();
    goto cleanup;

 cleanup:
    /* Unified failure path: close whatever was created so far. */
    if (server != INVALID_SOCKET)
      closesocket(server);
    if (client0 != INVALID_SOCKET)
      closesocket(client0);
    if (client1 != INVALID_SOCKET)
      closesocket(client1);

    assert(err);
    return uv_translate_sys_error(err);
}
1697