• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2  *
3  * Permission is hereby granted, free of charge, to any person obtaining a copy
4  * of this software and associated documentation files (the "Software"), to
5  * deal in the Software without restriction, including without limitation the
6  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7  * sell copies of the Software, and to permit persons to whom the Software is
8  * furnished to do so, subject to the following conditions:
9  *
10  * The above copyright notice and this permission notice shall be included in
11  * all copies or substantial portions of the Software.
12  *
13  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19  * IN THE SOFTWARE.
20  */
21 
22 #include <assert.h>
23 #include <stdlib.h>
24 
25 #include "uv.h"
26 #include "internal.h"
27 #include "handle-inl.h"
28 #include "stream-inl.h"
29 #include "req-inl.h"
30 
31 
/*
 * Number of active TCP streams below which a read buffer is preallocated
 * before a read is posted. The optimization is currently switched off
 * (threshold = 0) because the node slab allocator performs poorly under this
 * pattern; it is to be revisited once that allocator has been improved.
 */
const unsigned int uv_active_tcp_streams_threshold = 0u;

/* How many AcceptEx calls a listening handle keeps pending at once. */
const unsigned int uv_simultaneous_server_accepts = 32u;

/* Empty buffer whose address is handed to WSARecv for zero-length reads. */
static char uv_zero_[1] = { '\0' };
47 
/*
 * Switch the TCP_NODELAY option of `socket` on (enable != 0) or off.
 * `handle` is not consulted here.
 *
 * Returns 0 on success, otherwise the code reported by WSAGetLastError().
 */
static int uv__tcp_nodelay(uv_tcp_t* handle, SOCKET socket, int enable) {
  int rc;

  rc = setsockopt(socket,
                  IPPROTO_TCP,
                  TCP_NODELAY,
                  (const char*) &enable,
                  sizeof(enable));
  if (rc != -1)
    return 0;

  return WSAGetLastError();
}
58 
59 
/*
 * Set SO_KEEPALIVE on `socket` to `enable`; when enabling, additionally set
 * the TCP_KEEPALIVE option to `delay`. `handle` is not consulted here.
 *
 * Returns 0 on success, otherwise the code reported by WSAGetLastError().
 */
static int uv__tcp_keepalive(uv_tcp_t* handle, SOCKET socket, int enable, unsigned int delay) {
  int rc;

  rc = setsockopt(socket,
                  SOL_SOCKET,
                  SO_KEEPALIVE,
                  (const char*) &enable,
                  sizeof(enable));
  if (rc == -1)
    return WSAGetLastError();

  /* The delay is only applied when keep-alive is being switched on. */
  if (!enable)
    return 0;

  rc = setsockopt(socket,
                  IPPROTO_TCP,
                  TCP_KEEPALIVE,
                  (const char*) &delay,
                  sizeof(delay));
  if (rc == -1)
    return WSAGetLastError();

  return 0;
}
79 
80 
/*
 * Attach `socket` to `handle` and prepare it for use with the loop.
 *
 * The socket is made non-blocking and non-inheritable, associated with the
 * loop's completion port, and then configured according to the handle's
 * NODELAY / KEEPALIVE flags. `family` selects which non-IFS-LSP setting is
 * consulted and whether UV_HANDLE_IPV6 gets set. When `imported` is non-zero
 * a failed completion-port association is tolerated and the handle falls
 * back to UV_HANDLE_EMULATE_IOCP.
 *
 * Returns 0 on success, UV_EBUSY if the handle already has a socket, or the
 * system error code of the step that failed. handle->socket is only assigned
 * once every step has succeeded.
 */
static int uv__tcp_set_socket(uv_loop_t* loop,
                              uv_tcp_t* handle,
                              SOCKET socket,
                              int family,
                              int imported) {
  DWORD nonblocking = 1;
  int lsp_is_non_ifs;
  int rc;

  if (handle->socket != INVALID_SOCKET)
    return UV_EBUSY;

  /* Non-blocking mode. */
  if (ioctlsocket(socket, FIONBIO, &nonblocking) == SOCKET_ERROR)
    return WSAGetLastError();

  /* Child processes must not inherit the socket. */
  if (!SetHandleInformation((HANDLE) socket, HANDLE_FLAG_INHERIT, 0))
    return GetLastError();

  /* Associate the socket with the loop's I/O completion port. The socket
   * value itself is passed as the completion key. */
  if (CreateIoCompletionPort((HANDLE) socket,
                             loop->iocp,
                             (ULONG_PTR) socket,
                             0) == NULL) {
    if (!imported)
      return GetLastError();
    handle->flags |= UV_HANDLE_EMULATE_IOCP;
  }

  lsp_is_non_ifs = (family == AF_INET6) ? uv_tcp_non_ifs_lsp_ipv6
                                        : uv_tcp_non_ifs_lsp_ipv4;

  /* Skip the completion port for operations that finish synchronously, but
   * only with a real IOCP association and no non-IFS LSP in the way. */
  if (!((handle->flags & UV_HANDLE_EMULATE_IOCP) || lsp_is_non_ifs)) {
    UCHAR modes =
        FILE_SKIP_SET_EVENT_ON_HANDLE | FILE_SKIP_COMPLETION_PORT_ON_SUCCESS;
    if (!SetFileCompletionNotificationModes((HANDLE) socket, modes))
      return GetLastError();
    handle->flags |= UV_HANDLE_SYNC_BYPASS_IOCP;
  }

  /* Apply options that were requested before a socket existed. */
  if (handle->flags & UV_HANDLE_TCP_NODELAY) {
    rc = uv__tcp_nodelay(handle, socket, 1);
    if (rc)
      return rc;
  }

  /* TODO: Use stored delay. */
  if (handle->flags & UV_HANDLE_TCP_KEEPALIVE) {
    rc = uv__tcp_keepalive(handle, socket, 1, 60);
    if (rc)
      return rc;
  }

  handle->socket = socket;

  if (family == AF_INET6)
    handle->flags |= UV_HANDLE_IPV6;
  else
    assert(!(handle->flags & UV_HANDLE_IPV6));

  return 0;
}
152 
153 
/*
 * Initialize a TCP handle, optionally creating its socket right away.
 *
 * The low 8 bits of `flags` carry the address family: AF_INET or AF_INET6
 * create a socket immediately, AF_UNSPEC defers socket creation. Any other
 * family, or any bit set above the low 8, yields UV_EINVAL.
 *
 * Returns 0 on success or a translated system error if the socket could not
 * be created or configured; in that case the handle is taken off the loop's
 * handle queue again.
 */
int uv_tcp_init_ex(uv_loop_t* loop, uv_tcp_t* handle, unsigned int flags) {
  SOCKET sock;
  DWORD err;
  int domain;

  domain = flags & 0xFF;
  switch (domain) {
    case AF_INET:
    case AF_INET6:
    case AF_UNSPEC:
      break;
    default:
      return UV_EINVAL;
  }

  if (flags & ~0xFF)
    return UV_EINVAL;

  uv__stream_init(loop, (uv_stream_t*) handle, UV_TCP);
  handle->socket = INVALID_SOCKET;
  handle->reqs_pending = 0;
  handle->delayed_error = 0;
  handle->tcp.serv.accept_reqs = NULL;
  handle->tcp.serv.pending_accepts = NULL;
  handle->tcp.serv.func_acceptex = NULL;
  handle->tcp.conn.func_connectex = NULL;
  handle->tcp.serv.processed_accepts = 0;

  if (domain == AF_UNSPEC)
    return 0;

  /* From here on a failure has to undo the handle-queue insertion that was
   * performed by uv__handle_init via uv__stream_init. */
  sock = socket(domain, SOCK_STREAM, 0);
  if (sock == INVALID_SOCKET) {
    err = WSAGetLastError();
    goto fail;
  }

  err = uv__tcp_set_socket(handle->loop, handle, sock, domain, 0);
  if (err) {
    closesocket(sock);
    goto fail;
  }

  return 0;

fail:
  QUEUE_REMOVE(&handle->handle_queue);
  return uv_translate_sys_error(err);
}
201 
202 
/*
 * Initialize a TCP handle without creating a socket: equivalent to
 * uv_tcp_init_ex() with AF_UNSPEC as the flags value.
 */
int uv_tcp_init(uv_loop_t* loop, uv_tcp_t* handle) {
  const unsigned int no_socket_yet = AF_UNSPEC;

  return uv_tcp_init_ex(loop, handle, no_socket_yet);
}
206 
207 
/*
 * Carry out a queued shutdown request on a connected TCP stream.
 *
 * The request is detached from the stream, then the send side of the socket
 * is shut down unless the handle is already closing. The request callback,
 * if any, receives 0 on success, UV_ECANCELED when the handle is closing, or
 * the translated Winsock error when shutdown() fails. Finally the stream's
 * pending request count is decreased.
 */
void uv__process_tcp_shutdown_req(uv_loop_t* loop, uv_tcp_t* stream, uv_shutdown_t *req) {
  int status = 0;

  assert(req);
  assert(stream->stream.conn.write_reqs_pending == 0);
  assert(!(stream->flags & UV_HANDLE_SHUT));
  assert(stream->flags & UV_HANDLE_CONNECTION);

  stream->stream.conn.shutdown_req = NULL;
  stream->flags &= ~UV_HANDLE_SHUTTING;
  UNREGISTER_HANDLE_REQ(loop, stream, req);

  if (stream->flags & UV_HANDLE_CLOSING) {
    /* The user destroyed the stream before the shutdown could happen. */
    status = UV_ECANCELED;
  } else if (shutdown(stream->socket, SD_SEND) != SOCKET_ERROR) {
    stream->flags |= UV_HANDLE_SHUT;
  } else {
    status = uv_translate_sys_error(WSAGetLastError());
  }

  if (req->cb != NULL)
    req->cb(req, status);

  DECREASE_PENDING_REQ_COUNT(stream);
}
234 
235 
/*
 * Final teardown of a closing TCP handle once it has no pending requests and
 * its socket has already been invalidated.
 *
 * For a non-connection handle that allocated accept requests, the array is
 * freed; with emulated IOCP each request's wait registration and event are
 * released first. For a connection handle with emulated IOCP the read
 * request's wait registration and event are released. The handle is then
 * closed and the loop's active TCP stream counter decremented.
 */
void uv__tcp_endgame(uv_loop_t* loop, uv_tcp_t* handle) {
  uv_tcp_accept_t* accept_req;
  unsigned int idx;
  int emulating_iocp;
  int is_connection;

  assert(handle->flags & UV_HANDLE_CLOSING);
  assert(handle->reqs_pending == 0);
  assert(!(handle->flags & UV_HANDLE_CLOSED));
  assert(handle->socket == INVALID_SOCKET);

  emulating_iocp = (handle->flags & UV_HANDLE_EMULATE_IOCP) != 0;
  is_connection = (handle->flags & UV_HANDLE_CONNECTION) != 0;

  if (!is_connection && handle->tcp.serv.accept_reqs != NULL) {
    if (emulating_iocp) {
      /* Every slot is visited; slots that never got a wait or an event
       * carry the sentinel values and are skipped. */
      for (idx = 0; idx < uv_simultaneous_server_accepts; idx++) {
        accept_req = &handle->tcp.serv.accept_reqs[idx];

        if (accept_req->wait_handle != INVALID_HANDLE_VALUE) {
          UnregisterWait(accept_req->wait_handle);
          accept_req->wait_handle = INVALID_HANDLE_VALUE;
        }

        if (accept_req->event_handle != NULL) {
          CloseHandle(accept_req->event_handle);
          accept_req->event_handle = NULL;
        }
      }
    }

    uv__free(handle->tcp.serv.accept_reqs);
    handle->tcp.serv.accept_reqs = NULL;
  }

  if (is_connection && emulating_iocp) {
    if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) {
      UnregisterWait(handle->read_req.wait_handle);
      handle->read_req.wait_handle = INVALID_HANDLE_VALUE;
    }

    if (handle->read_req.event_handle != NULL) {
      CloseHandle(handle->read_req.event_handle);
      handle->read_req.event_handle = NULL;
    }
  }

  uv__handle_close(handle);
  loop->active_tcp_streams--;
}
279 
280 
281 /* Unlike on Unix, here we don't set SO_REUSEADDR, because it doesn't just
282  * allow binding to addresses that are in use by sockets in TIME_WAIT, it
283  * effectively allows 'stealing' a port which is in use by another application.
284  *
285  * SO_EXCLUSIVEADDRUSE is also not good here because it does check all sockets,
286  * regardless of state, so we'd get an error even if the port is in use by a
287  * socket in TIME_WAIT state.
288  *
289  * See issue #1360.
290  *
291  */
uv__tcp_try_bind(uv_tcp_t * handle,const struct sockaddr * addr,unsigned int addrlen,unsigned int flags)292 static int uv__tcp_try_bind(uv_tcp_t* handle,
293                             const struct sockaddr* addr,
294                             unsigned int addrlen,
295                             unsigned int flags) {
296   DWORD err;
297   int r;
298 
299   if (handle->socket == INVALID_SOCKET) {
300     SOCKET sock;
301 
302     /* Cannot set IPv6-only mode on non-IPv6 socket. */
303     if ((flags & UV_TCP_IPV6ONLY) && addr->sa_family != AF_INET6)
304       return ERROR_INVALID_PARAMETER;
305 
306     sock = socket(addr->sa_family, SOCK_STREAM, 0);
307     if (sock == INVALID_SOCKET) {
308       return WSAGetLastError();
309     }
310 
311     err = uv__tcp_set_socket(handle->loop, handle, sock, addr->sa_family, 0);
312     if (err) {
313       closesocket(sock);
314       return err;
315     }
316   }
317 
318 #ifdef IPV6_V6ONLY
319   if (addr->sa_family == AF_INET6) {
320     int on;
321 
322     on = (flags & UV_TCP_IPV6ONLY) != 0;
323 
324     /* TODO: how to handle errors? This may fail if there is no ipv4 stack
325      * available, or when run on XP/2003 which have no support for dualstack
326      * sockets. For now we're silently ignoring the error. */
327     setsockopt(handle->socket,
328                IPPROTO_IPV6,
329                IPV6_V6ONLY,
330                (const char*)&on,
331                sizeof on);
332   }
333 #endif
334 
335   r = bind(handle->socket, addr, addrlen);
336 
337   if (r == SOCKET_ERROR) {
338     err = WSAGetLastError();
339     if (err == WSAEADDRINUSE) {
340       /* Some errors are not to be reported until connect() or listen() */
341       handle->delayed_error = err;
342     } else {
343       return err;
344     }
345   }
346 
347   handle->flags |= UV_HANDLE_BOUND;
348 
349   return 0;
350 }
351 
352 
post_completion(void * context,BOOLEAN timed_out)353 static void CALLBACK post_completion(void* context, BOOLEAN timed_out) {
354   uv_req_t* req;
355   uv_tcp_t* handle;
356 
357   req = (uv_req_t*) context;
358   assert(req != NULL);
359   handle = (uv_tcp_t*)req->data;
360   assert(handle != NULL);
361   assert(!timed_out);
362 
363   if (!PostQueuedCompletionStatus(handle->loop->iocp,
364                                   req->u.io.overlapped.InternalHigh,
365                                   0,
366                                   &req->u.io.overlapped)) {
367     uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
368   }
369 }
370 
371 
post_write_completion(void * context,BOOLEAN timed_out)372 static void CALLBACK post_write_completion(void* context, BOOLEAN timed_out) {
373   uv_write_t* req;
374   uv_tcp_t* handle;
375 
376   req = (uv_write_t*) context;
377   assert(req != NULL);
378   handle = (uv_tcp_t*)req->handle;
379   assert(handle != NULL);
380   assert(!timed_out);
381 
382   if (!PostQueuedCompletionStatus(handle->loop->iocp,
383                                   req->u.io.overlapped.InternalHigh,
384                                   0,
385                                   &req->u.io.overlapped)) {
386     uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
387   }
388 }
389 
390 
/*
 * Post one overlapped accept on the listening `handle`, using `req`.
 *
 * A fresh, non-inheritable socket of the listener's address family is
 * created for the future connection and handed to the handle's AcceptEx
 * function. Whatever the outcome, handle->reqs_pending is incremented:
 *   - completed synchronously without IOCP: the req is queued as pending;
 *   - in flight via IOCP: with emulated IOCP a wait is registered on the
 *     req's event (if none exists yet); a failed registration queues the req
 *     with an error;
 *   - failure: the req is queued with an error, the new socket is closed
 *     and, with emulated IOCP, the req's event handle is destroyed.
 */
static void uv__tcp_queue_accept(uv_tcp_t* handle, uv_tcp_accept_t* req) {
  uv_loop_t* loop = handle->loop;
  SOCKET client_sock;
  DWORD received;
  BOOL success;
  short af;

  assert(handle->flags & UV_HANDLE_LISTENING);
  assert(req->accept_socket == INVALID_SOCKET);

  /* The accepted socket must match the listener's address family. */
  af = (handle->flags & UV_HANDLE_IPV6) ? AF_INET6 : AF_INET;

  client_sock = socket(af, SOCK_STREAM, 0);
  if (client_sock == INVALID_SOCKET) {
    SET_REQ_ERROR(req, WSAGetLastError());
    uv__insert_pending_req(loop, (uv_req_t*) req);
    handle->reqs_pending++;
    return;
  }

  /* Child processes must not inherit the socket. */
  if (!SetHandleInformation((HANDLE) client_sock, HANDLE_FLAG_INHERIT, 0)) {
    SET_REQ_ERROR(req, GetLastError());
    uv__insert_pending_req(loop, (uv_req_t*) req);
    handle->reqs_pending++;
    closesocket(client_sock);
    return;
  }

  /* Start from a clean overlapped structure. With emulated IOCP the req's
   * event is installed with its low-order bit set. */
  memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped));
  if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
    assert(req->event_handle != NULL);
    req->u.io.overlapped.hEvent = (HANDLE) ((ULONG_PTR) req->event_handle | 1);
  }

  success = handle->tcp.serv.func_acceptex(handle->socket,
                                          client_sock,
                                          (void*) req->accept_buffer,
                                          0,
                                          sizeof(struct sockaddr_storage),
                                          sizeof(struct sockaddr_storage),
                                          &received,
                                          &req->u.io.overlapped);

  if (UV_SUCCEEDED_WITHOUT_IOCP(success)) {
    /* Finished synchronously; no completion packet will arrive. */
    req->accept_socket = client_sock;
    handle->reqs_pending++;
    uv__insert_pending_req(loop, (uv_req_t*) req);
  } else if (UV_SUCCEEDED_WITH_IOCP(success)) {
    /* Completion will be delivered through the completion port. */
    req->accept_socket = client_sock;
    handle->reqs_pending++;
    if ((handle->flags & UV_HANDLE_EMULATE_IOCP) &&
        req->wait_handle == INVALID_HANDLE_VALUE) {
      if (!RegisterWaitForSingleObject(&req->wait_handle,
                                       req->event_handle,
                                       post_completion,
                                       (void*) req,
                                       INFINITE,
                                       WT_EXECUTEINWAITTHREAD)) {
        SET_REQ_ERROR(req, GetLastError());
        uv__insert_pending_req(loop, (uv_req_t*) req);
      }
    }
  } else {
    /* Report the failure through the pending-request queue. */
    SET_REQ_ERROR(req, WSAGetLastError());
    uv__insert_pending_req(loop, (uv_req_t*) req);
    handle->reqs_pending++;
    /* The preallocated client socket is of no further use. */
    closesocket(client_sock);
    /* Neither is the event handle. */
    if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
      CloseHandle(req->event_handle);
      req->event_handle = NULL;
    }
  }
}
473 
474 
/*
 * Post one overlapped read on `handle` using its embedded read request.
 *
 * When the loop has fewer active TCP streams than
 * uv_active_tcp_streams_threshold, a buffer is obtained from alloc_cb and the
 * read targets it; if the callback yields no buffer, read_cb is invoked with
 * UV_ENOBUFS and nothing is posted. Otherwise a zero-length read is posted
 * and UV_HANDLE_ZERO_READ is set.
 *
 * Once WSARecv has been called the handle is flagged UV_HANDLE_READ_PENDING
 * and reqs_pending is incremented, whatever the outcome; synchronous
 * completions and failures are placed on the pending-request queue.
 */
static void uv__tcp_queue_read(uv_loop_t* loop, uv_tcp_t* handle) {
  uv_read_t* req = &handle->read_req;
  uv_buf_t target;
  DWORD received;
  DWORD recv_flags;
  int rc;

  assert(handle->flags & UV_HANDLE_READING);
  assert(!(handle->flags & UV_HANDLE_READ_PENDING));

  memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped));

  if (loop->active_tcp_streams >= uv_active_tcp_streams_threshold) {
    /* At or above the threshold: post a zero-length read. */
    handle->flags |= UV_HANDLE_ZERO_READ;
    target.base = (char*) &uv_zero_;
    target.len = 0;
  } else {
    /* Below the threshold: ask the user for a buffer up front. */
    handle->flags &= ~UV_HANDLE_ZERO_READ;
    handle->tcp.conn.read_buffer = uv_buf_init(NULL, 0);
    handle->alloc_cb((uv_handle_t*) handle, 65536, &handle->tcp.conn.read_buffer);
    if (handle->tcp.conn.read_buffer.base == NULL ||
        handle->tcp.conn.read_buffer.len == 0) {
      handle->read_cb((uv_stream_t*) handle, UV_ENOBUFS, &handle->tcp.conn.read_buffer);
      return;
    }
    assert(handle->tcp.conn.read_buffer.base != NULL);
    target = handle->tcp.conn.read_buffer;
  }

  /* (Re)initialize the overlapped structure for this request. With emulated
   * IOCP the req's event is installed with its low-order bit set. */
  memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped));
  if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
    assert(req->event_handle != NULL);
    req->u.io.overlapped.hEvent = (HANDLE) ((ULONG_PTR) req->event_handle | 1);
  }

  recv_flags = 0;
  rc = WSARecv(handle->socket,
               (WSABUF*) &target,
               1,
               &received,
               &recv_flags,
               &req->u.io.overlapped,
               NULL);

  handle->flags |= UV_HANDLE_READ_PENDING;
  handle->reqs_pending++;

  if (UV_SUCCEEDED_WITHOUT_IOCP(rc == 0)) {
    /* Finished synchronously; record the byte count and queue the req. */
    req->u.io.overlapped.InternalHigh = received;
    uv__insert_pending_req(loop, (uv_req_t*) req);
  } else if (UV_SUCCEEDED_WITH_IOCP(rc == 0)) {
    /* Completion will be delivered through the completion port. */
    if ((handle->flags & UV_HANDLE_EMULATE_IOCP) &&
        req->wait_handle == INVALID_HANDLE_VALUE) {
      if (!RegisterWaitForSingleObject(&req->wait_handle,
                                       req->event_handle,
                                       post_completion,
                                       (void*) req,
                                       INFINITE,
                                       WT_EXECUTEINWAITTHREAD)) {
        SET_REQ_ERROR(req, GetLastError());
        uv__insert_pending_req(loop, (uv_req_t*) req);
      }
    }
  } else {
    /* Report the failure through the pending-request queue. */
    SET_REQ_ERROR(req, WSAGetLastError());
    uv__insert_pending_req(loop, (uv_req_t*) req);
  }
}
547 
548 
/*
 * Close `handle` after enabling SO_LINGER with a zero timeout on its socket.
 *
 * Refused with UV_EINVAL while the handle is flagged UV_HANDLE_SHUTTING
 * (zero linger is disallowed in that state due to platform
 * inconsistencies). If setting the option fails, the translated Winsock
 * error is returned and the handle is left open. On success uv_close() is
 * called with `close_cb` and 0 is returned.
 */
int uv_tcp_close_reset(uv_tcp_t* handle, uv_close_cb close_cb) {
  struct linger abortive;
  int rc;

  if (handle->flags & UV_HANDLE_SHUTTING)
    return UV_EINVAL;

  abortive.l_onoff = 1;
  abortive.l_linger = 0;

  rc = setsockopt(handle->socket,
                  SOL_SOCKET,
                  SO_LINGER,
                  (const char*) &abortive,
                  sizeof(abortive));
  if (rc != 0)
    return uv_translate_sys_error(WSAGetLastError());

  uv_close((uv_handle_t*) handle, close_cb);
  return 0;
}
562 
563 
/*
 * Put `handle` into the listening state and start accepting connections.
 *
 * An unbound handle is first bound to the IPv4 wildcard address. A bind
 * error that was held back earlier (handle->delayed_error) is returned here.
 * listen() itself is skipped for handles flagged UV_HANDLE_SHARED_TCP_SOCKET.
 * On the first successful call the accept request array is allocated; either
 * one request (UV_HANDLE_TCP_SINGLE_ACCEPT) or all of them are queued
 * immediately, and the remaining slots are merely initialized.
 *
 * Returns 0 on success or a system error code (WSAEISCONN when the handle is
 * reading, WSAEAFNOSUPPORT when AcceptEx cannot be obtained, or the error of
 * the failing bind/listen step). Allocation and event creation failures are
 * fatal.
 */
int uv__tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) {
  unsigned int idx;
  unsigned int active_slots;
  uv_tcp_accept_t* slot;
  int err;

  assert(backlog > 0);

  /* An already listening handle gets its callback replaced right away. */
  if (handle->flags & UV_HANDLE_LISTENING)
    handle->stream.serv.connection_cb = cb;

  if (handle->flags & UV_HANDLE_READING)
    return WSAEISCONN;

  if (handle->delayed_error)
    return handle->delayed_error;

  if (!(handle->flags & UV_HANDLE_BOUND)) {
    err = uv__tcp_try_bind(handle,
                           (const struct sockaddr*) &uv_addr_ip4_any_,
                           sizeof(uv_addr_ip4_any_),
                           0);
    if (err)
      return err;
    if (handle->delayed_error)
      return handle->delayed_error;
  }

  if (!handle->tcp.serv.func_acceptex &&
      !uv__get_acceptex_function(handle->socket, &handle->tcp.serv.func_acceptex)) {
    return WSAEAFNOSUPPORT;
  }

  /* If this flag is set, the listen call was already made in xfer. */
  if (!(handle->flags & UV_HANDLE_SHARED_TCP_SOCKET)) {
    if (listen(handle->socket, backlog) == SOCKET_ERROR)
      return WSAGetLastError();
  }

  handle->flags |= UV_HANDLE_LISTENING;
  handle->stream.serv.connection_cb = cb;
  INCREASE_ACTIVE_COUNT(loop, handle);

  if (handle->flags & UV_HANDLE_TCP_SINGLE_ACCEPT)
    active_slots = 1;
  else
    active_slots = uv_simultaneous_server_accepts;

  /* The accept requests are only set up once per handle. */
  if (handle->tcp.serv.accept_reqs != NULL)
    return 0;

  handle->tcp.serv.accept_reqs =
    uv__malloc(uv_simultaneous_server_accepts * sizeof(*handle->tcp.serv.accept_reqs));
  if (handle->tcp.serv.accept_reqs == NULL)
    uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");

  /* Every slot is initialized, because uv_tcp_endgame does not know how many
   * were put to use and cleans up all {uv_simultaneous_server_accepts} of
   * them. Only the first `active_slots` get an event (with emulated IOCP)
   * and are queued. */
  for (idx = 0; idx < uv_simultaneous_server_accepts; idx++) {
    slot = &handle->tcp.serv.accept_reqs[idx];
    UV_REQ_INIT(slot, UV_ACCEPT);
    slot->accept_socket = INVALID_SOCKET;
    slot->data = handle;
    slot->wait_handle = INVALID_HANDLE_VALUE;
    slot->event_handle = NULL;

    if (idx >= active_slots)
      continue;

    if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
      slot->event_handle = CreateEvent(NULL, 0, 0, NULL);
      if (slot->event_handle == NULL)
        uv_fatal_error(GetLastError(), "CreateEvent");
    }

    uv__tcp_queue_accept(handle, slot);
  }

  return 0;
}
654 
655 
/*
 * Hand the oldest pending accepted connection of `server` to `client`.
 *
 * Returns WSAEWOULDBLOCK when no accept is pending and WSAENOTCONN when the
 * pending request carries no socket. Otherwise the accepted socket is
 * attached to `client` (and closed if that fails), the request is unlinked
 * from the pending list and, unless the server is closing, re-queued. While
 * the server is switching to single-accept mode, requests are not re-queued
 * individually; once all uv_simultaneous_server_accepts of them have been
 * processed a single accept is queued on slot 0. The loop's active TCP stream
 * counter is incremented in every case that reaches the end, and the result
 * of attaching the socket is returned.
 */
int uv__tcp_accept(uv_tcp_t* server, uv_tcp_t* client) {
  uv_loop_t* loop = server->loop;
  uv_tcp_accept_t* ready;
  int family;
  int err;

  ready = server->tcp.serv.pending_accepts;
  if (ready == NULL) {
    /* Nothing has been accepted yet. */
    return WSAEWOULDBLOCK;
  }

  if (ready->accept_socket == INVALID_SOCKET)
    return WSAENOTCONN;

  family = (server->flags & UV_HANDLE_IPV6) ? AF_INET6 : AF_INET;

  err = uv__tcp_set_socket(client->loop,
                          client,
                          ready->accept_socket,
                          family,
                          0);
  if (err == 0) {
    uv__connection_init((uv_stream_t*) client);
    /* AcceptEx() implicitly binds the accepted socket. */
    client->flags |= UV_HANDLE_BOUND | UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
  } else {
    closesocket(ready->accept_socket);
  }

  /* Unlink the request so it can pick up a new connection. */
  server->tcp.serv.pending_accepts = ready->next_pending;
  ready->next_pending = NULL;
  ready->accept_socket = INVALID_SOCKET;

  if (server->flags & UV_HANDLE_CLOSING) {
    /* A closing server does not queue further accepts. */
  } else if (!(server->flags & UV_HANDLE_TCP_ACCEPT_STATE_CHANGING)) {
    uv__tcp_queue_accept(server, ready);
  } else {
    /* The number of pending accepts is being changed; the only supported
     * transition is towards a single pending accept. */
    assert(server->flags & UV_HANDLE_TCP_SINGLE_ACCEPT);

    server->tcp.serv.processed_accepts++;

    if (server->tcp.serv.processed_accepts >= uv_simultaneous_server_accepts) {
      server->tcp.serv.processed_accepts = 0;
      /* All previously queued accepts have been consumed: from now on only
       * one accept is kept in flight. */
      uv__tcp_queue_accept(server, &server->tcp.serv.accept_reqs[0]);
      server->flags &= ~UV_HANDLE_TCP_ACCEPT_STATE_CHANGING;
      server->flags |= UV_HANDLE_TCP_SINGLE_ACCEPT;
    }
  }

  loop->active_tcp_streams++;

  return err;
}
723 
724 
/*
 * Start reading on `handle`: store the callbacks, flag the handle as reading
 * and bump its active count. A read is only posted if none is pending
 * already (one may be left over from a previous stop/start cycle). With
 * emulated IOCP the read request's event is created on first use; failure to
 * create it is fatal. Always returns 0.
 */
int uv__tcp_read_start(uv_tcp_t* handle, uv_alloc_cb alloc_cb,
    uv_read_cb read_cb) {
  uv_loop_t* loop = handle->loop;

  handle->flags |= UV_HANDLE_READING;
  handle->read_cb = read_cb;
  handle->alloc_cb = alloc_cb;
  INCREASE_ACTIVE_COUNT(loop, handle);

  if (handle->flags & UV_HANDLE_READ_PENDING)
    return 0;

  if ((handle->flags & UV_HANDLE_EMULATE_IOCP) &&
      handle->read_req.event_handle == NULL) {
    handle->read_req.event_handle = CreateEvent(NULL, 0, 0, NULL);
    if (handle->read_req.event_handle == NULL)
      uv_fatal_error(GetLastError(), "CreateEvent");
  }

  uv__tcp_queue_read(loop, handle);

  return 0;
}
749 
uv__is_loopback(const struct sockaddr_storage * storage)750 static int uv__is_loopback(const struct sockaddr_storage* storage) {
751   const struct sockaddr_in* in4;
752   const struct sockaddr_in6* in6;
753   int i;
754 
755   if (storage->ss_family == AF_INET) {
756     in4 = (const struct sockaddr_in*) storage;
757     return in4->sin_addr.S_un.S_un_b.s_b1 == 127;
758   }
759   if (storage->ss_family == AF_INET6) {
760     in6 = (const struct sockaddr_in6*) storage;
761     for (i = 0; i < 7; ++i) {
762       if (in6->sin6_addr.u.Word[i] != 0)
763         return 0;
764     }
765     return in6->sin6_addr.u.Word[7] == htons(1);
766   }
767   return 0;
768 }
769 
770 // Check if Windows version is 10.0.16299 or later
uv__is_fast_loopback_fail_supported(void)771 static int uv__is_fast_loopback_fail_supported(void) {
772   OSVERSIONINFOW os_info;
773   if (!pRtlGetVersion)
774     return 0;
775   pRtlGetVersion(&os_info);
776   if (os_info.dwMajorVersion < 10)
777     return 0;
778   if (os_info.dwMajorVersion > 10)
779     return 1;
780   if (os_info.dwMinorVersion > 0)
781     return 1;
782   return os_info.dwBuildNumber >= 16299;
783 }
784 
/*
 * Start an overlapped connect of `handle` to `addr`.
 *
 * An unspecified target address is first converted by
 * uv__convert_to_localhost_if_unspecified(). An unbound handle is bound to
 * the wildcard address whose size matches `addrlen` (any other length
 * aborts). If a bind error was held back in handle->delayed_error, no
 * connect is attempted: the request is initialized and queued as pending and
 * 0 is returned.
 *
 * Returns 0 when the request was queued or is in flight, otherwise a system
 * error code (WSAEAFNOSUPPORT when ConnectEx cannot be obtained).
 */
static int uv__tcp_try_connect(uv_connect_t* req,
                              uv_tcp_t* handle,
                              const struct sockaddr* addr,
                              unsigned int addrlen,
                              uv_connect_cb cb) {
  uv_loop_t* loop = handle->loop;
  TCP_INITIAL_RTO_PARAMETERS rto;
  const struct sockaddr* any_addr;
  struct sockaddr_storage target;
  BOOL success;
  DWORD bytes;
  int err;

  err = uv__convert_to_localhost_if_unspecified(addr, &target);
  if (err)
    return err;

  if (handle->delayed_error != 0)
    goto out;

  if (!(handle->flags & UV_HANDLE_BOUND)) {
    /* Pick the wildcard address of the family implied by addrlen. */
    if (addrlen == sizeof(uv_addr_ip4_any_))
      any_addr = (const struct sockaddr*) &uv_addr_ip4_any_;
    else if (addrlen == sizeof(uv_addr_ip6_any_))
      any_addr = (const struct sockaddr*) &uv_addr_ip6_any_;
    else
      abort();

    err = uv__tcp_try_bind(handle, any_addr, addrlen, 0);
    if (err)
      return err;
    if (handle->delayed_error != 0)
      goto out;
  }

  if (!handle->tcp.conn.func_connectex &&
      !uv__get_connectex_function(handle->socket, &handle->tcp.conn.func_connectex)) {
    return WSAEAFNOSUPPORT;
  }

  /* Make connect() fail instantly when the target port on localhost is not
   * reachable, instead of waiting for 2s. Only works on Windows 10.0.16299
   * and later; a failure of the ioctl is deliberately ignored. */
  if (uv__is_fast_loopback_fail_supported() && uv__is_loopback(&target)) {
    memset(&rto, 0, sizeof(rto));
    rto.Rtt = TCP_INITIAL_RTO_NO_SYN_RETRANSMISSIONS;
    rto.MaxSynRetransmissions = TCP_INITIAL_RTO_NO_SYN_RETRANSMISSIONS;
    WSAIoctl(handle->socket,
             SIO_TCP_INITIAL_RTO,
             &rto,
             sizeof(rto),
             NULL,
             0,
             &bytes,
             NULL,
             NULL);
  }

out:

  UV_REQ_INIT(req, UV_CONNECT);
  req->handle = (uv_stream_t*) handle;
  req->cb = cb;
  memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped));

  if (handle->delayed_error != 0) {
    /* Nothing to connect: queue the req directly, bypassing IOCP. */
    handle->reqs_pending++;
    REGISTER_HANDLE_REQ(loop, handle, req);
    uv__insert_pending_req(loop, (uv_req_t*) req);
    return 0;
  }

  success = handle->tcp.conn.func_connectex(handle->socket,
                                            (const struct sockaddr*) &target,
                                            addrlen,
                                            NULL,
                                            0,
                                            &bytes,
                                            &req->u.io.overlapped);

  if (UV_SUCCEEDED_WITHOUT_IOCP(success)) {
    /* Finished synchronously; no completion packet will arrive. */
    handle->reqs_pending++;
    REGISTER_HANDLE_REQ(loop, handle, req);
    uv__insert_pending_req(loop, (uv_req_t*) req);
  } else if (UV_SUCCEEDED_WITH_IOCP(success)) {
    /* Completion will be delivered through the completion port. */
    handle->reqs_pending++;
    REGISTER_HANDLE_REQ(loop, handle, req);
  } else {
    return WSAGetLastError();
  }

  return 0;
}
883 
884 
uv_tcp_getsockname(const uv_tcp_t * handle,struct sockaddr * name,int * namelen)885 int uv_tcp_getsockname(const uv_tcp_t* handle,
886                        struct sockaddr* name,
887                        int* namelen) {
888 
889   return uv__getsockpeername((const uv_handle_t*) handle,
890                              getsockname,
891                              name,
892                              namelen,
893                              handle->delayed_error);
894 }
895 
896 
uv_tcp_getpeername(const uv_tcp_t * handle,struct sockaddr * name,int * namelen)897 int uv_tcp_getpeername(const uv_tcp_t* handle,
898                        struct sockaddr* name,
899                        int* namelen) {
900 
901   return uv__getsockpeername((const uv_handle_t*) handle,
902                              getpeername,
903                              name,
904                              namelen,
905                              handle->delayed_error);
906 }
907 
908 
/* Start an overlapped WSASend() of `bufs` on `handle` and register `req`
 * with the loop so its completion gets processed later.
 *
 * Always returns 0. A failed send is not reported from here: the error is
 * stored on the request with SET_REQ_ERROR() and the request is inserted into
 * the pending queue, so it surfaces when the request is processed. */
int uv__tcp_write(uv_loop_t* loop,
                  uv_write_t* req,
                  uv_tcp_t* handle,
                  const uv_buf_t bufs[],
                  unsigned int nbufs,
                  uv_write_cb cb) {
  enum { SENT_INLINE, SENT_QUEUED, SEND_FAILED } outcome;
  DWORD sent;
  int send_err;
  int rc;

  UV_REQ_INIT(req, UV_WRITE);
  req->cb = cb;
  req->handle = (uv_stream_t*) handle;

  /* Start from a clean overlapped structure. */
  memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped));

  if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
    /* Completion is signalled through a per-request event in this mode. */
    req->event_handle = CreateEvent(NULL, 0, 0, NULL);
    if (req->event_handle == NULL) {
      uv_fatal_error(GetLastError(), "CreateEvent");
    }
    req->wait_handle = INVALID_HANDLE_VALUE;
    req->u.io.overlapped.hEvent = (HANDLE) ((ULONG_PTR) req->event_handle | 1);
  }

  rc = WSASend(handle->socket,
               (WSABUF*) bufs,
               nbufs,
               &sent,
               0,
               &req->u.io.overlapped,
               NULL);

  /* Classify the result right away, while the thread's last-error value
   * still belongs to the WSASend() call. */
  send_err = 0;
  if (UV_SUCCEEDED_WITHOUT_IOCP(rc == 0)) {
    outcome = SENT_INLINE;
  } else if (UV_SUCCEEDED_WITH_IOCP(rc == 0)) {
    outcome = SENT_QUEUED;
  } else {
    outcome = SEND_FAILED;
    send_err = WSAGetLastError();
  }

  /* Bookkeeping that every outcome performs. Only a kernel-queued request
   * contributes to the byte count. */
  if (outcome == SENT_QUEUED) {
    req->u.io.queued_bytes = uv__count_bufs(bufs, nbufs);
  } else {
    req->u.io.queued_bytes = 0;
  }
  handle->reqs_pending++;
  handle->stream.conn.write_reqs_pending++;
  REGISTER_HANDLE_REQ(loop, handle, req);

  if (outcome == SENT_QUEUED) {
    /* Request queued by the kernel. */
    handle->write_queue_size += req->u.io.queued_bytes;
    if ((handle->flags & UV_HANDLE_EMULATE_IOCP) &&
        !RegisterWaitForSingleObject(&req->wait_handle,
                                     req->event_handle,
                                     post_write_completion,
                                     (void*) req,
                                     INFINITE,
                                     WT_EXECUTEINWAITTHREAD |
                                         WT_EXECUTEONLYONCE)) {
      /* Could not arm the wait; hand the request over with the error set. */
      SET_REQ_ERROR(req, GetLastError());
      uv__insert_pending_req(loop, (uv_req_t*) req);
    }
  } else {
    if (outcome == SEND_FAILED) {
      /* Send failed due to an error, report it later. */
      SET_REQ_ERROR(req, send_err);
    }
    /* Completed immediately or failed: process the req without IOCP. */
    uv__insert_pending_req(loop, (uv_req_t*) req);
  }

  return 0;
}
974 
975 
/* Attempt to send `bufs` right now with a non-overlapped WSASend().
 *
 * Returns UV_EAGAIN without sending when the handle still has write requests
 * pending. Otherwise returns the byte count reported by WSASend(), or the
 * translated Winsock error when the call fails. */
int uv__tcp_try_write(uv_tcp_t* handle,
                      const uv_buf_t bufs[],
                      unsigned int nbufs) {
  DWORD written;
  int rc;

  if (handle->stream.conn.write_reqs_pending > 0) {
    return UV_EAGAIN;
  }

  /* No OVERLAPPED structure and no completion routine are supplied. */
  rc = WSASend(handle->socket, (WSABUF*) bufs, nbufs, &written, 0, NULL, NULL);
  if (rc == SOCKET_ERROR) {
    return uv_translate_sys_error(WSAGetLastError());
  }

  return written;
}
998 
999 
/* Handle the completion of the read request posted for a TCP stream.
 *
 * Clears UV_HANDLE_READ_PENDING, then:
 *  - on a failed request, reports the translated socket error to read_cb
 *    (WSAECONNABORTED is reported as WSAECONNRESET) and marks the handle as
 *    neither readable nor writable;
 *  - on a successful request made with a real buffer (UV_HANDLE_ZERO_READ not
 *    set), delivers the data or UV_EOF from the handle's read_buffer;
 *  - otherwise performs up to 32 synchronous WSARecv() calls into buffers
 *    obtained from alloc_cb while the handle is still reading.
 * On the success path another read is queued when the handle is still
 * reading and no read is pending. The pending request count is decreased
 * before returning in all cases. */
void uv__process_tcp_read_req(uv_loop_t* loop, uv_tcp_t* handle,
    uv_req_t* req) {
  DWORD bytes, flags, err;
  uv_buf_t buf;
  int count;

  assert(handle->type == UV_TCP);

  handle->flags &= ~UV_HANDLE_READ_PENDING;

  if (!REQ_SUCCESS(req)) {
    /* An error occurred doing the read. */
    if ((handle->flags & UV_HANDLE_READING) ||
        !(handle->flags & UV_HANDLE_ZERO_READ)) {
      /* Only drop the active count when the handle is actually reading; the
       * EOF paths below guard the decrement the same way. Decrementing
       * unconditionally here would do so a second time for a handle whose
       * reading flag had already been cleared. */
      if (handle->flags & UV_HANDLE_READING) {
        handle->flags &= ~UV_HANDLE_READING;
        DECREASE_ACTIVE_COUNT(loop, handle);
      }
      buf = (handle->flags & UV_HANDLE_ZERO_READ) ?
            uv_buf_init(NULL, 0) : handle->tcp.conn.read_buffer;

      err = GET_REQ_SOCK_ERROR(req);

      if (err == WSAECONNABORTED) {
        /* Turn WSAECONNABORTED into UV_ECONNRESET to be consistent with Unix.
         */
        err = WSAECONNRESET;
      }
      handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);

      handle->read_cb((uv_stream_t*)handle,
                      uv_translate_sys_error(err),
                      &buf);
    }
  } else {
    if (!(handle->flags & UV_HANDLE_ZERO_READ)) {
      /* The read was done with a non-zero buffer length. */
      if (req->u.io.overlapped.InternalHigh > 0) {
        /* Successful read */
        handle->read_cb((uv_stream_t*)handle,
                        req->u.io.overlapped.InternalHigh,
                        &handle->tcp.conn.read_buffer);
        /* Read again only if bytes == buf.len */
        if (req->u.io.overlapped.InternalHigh < handle->tcp.conn.read_buffer.len) {
          goto done;
        }
      } else {
        /* Connection closed */
        if (handle->flags & UV_HANDLE_READING) {
          handle->flags &= ~UV_HANDLE_READING;
          DECREASE_ACTIVE_COUNT(loop, handle);
        }

        /* The callback receives the handle's own read buffer, so the local
         * `buf` is not involved on this path. */
        handle->read_cb((uv_stream_t*)handle, UV_EOF, &handle->tcp.conn.read_buffer);
        goto done;
      }
    }

    /* Do nonblocking reads until the buffer is empty */
    count = 32;
    while ((handle->flags & UV_HANDLE_READING) && (count-- > 0)) {
      buf = uv_buf_init(NULL, 0);
      handle->alloc_cb((uv_handle_t*) handle, 65536, &buf);
      if (buf.base == NULL || buf.len == 0) {
        /* The allocation callback did not provide a usable buffer. */
        handle->read_cb((uv_stream_t*) handle, UV_ENOBUFS, &buf);
        break;
      }
      assert(buf.base != NULL);

      flags = 0;
      if (WSARecv(handle->socket,
                  (WSABUF*)&buf,
                  1,
                  &bytes,
                  &flags,
                  NULL,
                  NULL) != SOCKET_ERROR) {
        if (bytes > 0) {
          /* Successful read */
          handle->read_cb((uv_stream_t*)handle, bytes, &buf);
          /* Read again only if bytes == buf.len */
          if (bytes < buf.len) {
            break;
          }
        } else {
          /* Connection closed */
          handle->flags &= ~UV_HANDLE_READING;
          DECREASE_ACTIVE_COUNT(loop, handle);

          handle->read_cb((uv_stream_t*)handle, UV_EOF, &buf);
          break;
        }
      } else {
        err = WSAGetLastError();
        if (err == WSAEWOULDBLOCK) {
          /* Read buffer was completely empty, report a 0-byte read. */
          handle->read_cb((uv_stream_t*)handle, 0, &buf);
        } else {
          /* Ouch! serious error. */
          handle->flags &= ~UV_HANDLE_READING;
          DECREASE_ACTIVE_COUNT(loop, handle);

          if (err == WSAECONNABORTED) {
            /* Turn WSAECONNABORTED into UV_ECONNRESET to be consistent with
             * Unix. */
            err = WSAECONNRESET;
          }
          handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);

          handle->read_cb((uv_stream_t*)handle,
                          uv_translate_sys_error(err),
                          &buf);
        }
        break;
      }
    }

done:
    /* Post another read if still reading and not closing. */
    if ((handle->flags & UV_HANDLE_READING) &&
        !(handle->flags & UV_HANDLE_READ_PENDING)) {
      uv__tcp_queue_read(loop, handle);
    }
  }

  DECREASE_PENDING_REQ_COUNT(handle);
}
1127 
1128 
/* Finish a write request that has completed (or failed) on a TCP stream.
 *
 * Subtracts the request's queued bytes from the handle's write queue size,
 * unregisters the request, releases the wait/event handles used when IOCP is
 * emulated, and invokes the write callback if one was supplied. A translated
 * status of UV_ECONNABORTED is reported as UV_ECANCELED. When this was the
 * last pending write, a closing handle gets its socket closed and a handle
 * that is shutting down has its shutdown request processed. */
void uv__process_tcp_write_req(uv_loop_t* loop, uv_tcp_t* handle,
    uv_write_t* req) {
  int status;

  assert(handle->type == UV_TCP);
  assert(handle->write_queue_size >= req->u.io.queued_bytes);

  handle->write_queue_size -= req->u.io.queued_bytes;

  UNREGISTER_HANDLE_REQ(loop, handle, req);

  if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
    /* Release whichever of the per-request handles were created. */
    if (req->wait_handle != INVALID_HANDLE_VALUE) {
      UnregisterWait(req->wait_handle);
      req->wait_handle = INVALID_HANDLE_VALUE;
    }
    if (req->event_handle != NULL) {
      CloseHandle(req->event_handle);
      req->event_handle = NULL;
    }
  }

  if (req->cb != NULL) {
    status = uv_translate_sys_error(GET_REQ_SOCK_ERROR(req));
    /* use UV_ECANCELED for consistency with Unix */
    if (status == UV_ECONNABORTED) {
      status = UV_ECANCELED;
    }
    req->cb(req, status);
  }

  if (--handle->stream.conn.write_reqs_pending == 0) {
    /* Last outstanding write: carry out work that was waiting on it. */
    if (handle->flags & UV_HANDLE_CLOSING) {
      closesocket(handle->socket);
      handle->socket = INVALID_SOCKET;
    }
    if (handle->flags & UV_HANDLE_SHUTTING) {
      uv__process_tcp_shutdown_req(loop,
                                   handle,
                                   handle->stream.conn.shutdown_req);
    }
  }

  DECREASE_PENDING_REQ_COUNT(handle);
}
1174 
1175 
/* Handle the completion of an accept request on a listening TCP handle.
 *
 * Three outcomes:
 *  - The request has no accept socket: queueing the accept must have failed.
 *    If the handle is listening, listening is stopped and the translated
 *    error is reported to the connection callback.
 *  - The accept succeeded and SO_UPDATE_ACCEPT_CONTEXT could be applied: the
 *    request is pushed onto the handle's pending_accepts list and the
 *    connection callback is invoked with status 0.
 *  - Anything else: the accepted socket is closed and, if the handle is still
 *    listening, the request is queued again. */
void uv__process_tcp_accept_req(uv_loop_t* loop, uv_tcp_t* handle,
    uv_req_t* raw_req) {
  uv_tcp_accept_t* req;
  int context_updated;
  int sys_err;

  req = (uv_tcp_accept_t*) raw_req;

  assert(handle->type == UV_TCP);

  if (req->accept_socket != INVALID_SOCKET) {
    context_updated = REQ_SUCCESS(req) &&
                      setsockopt(req->accept_socket,
                                 SOL_SOCKET,
                                 SO_UPDATE_ACCEPT_CONTEXT,
                                 (char*) &handle->socket,
                                 sizeof(handle->socket)) == 0;

    if (context_updated) {
      /* Accept and SO_UPDATE_ACCEPT_CONTEXT were successful. */
      req->next_pending = handle->tcp.serv.pending_accepts;
      handle->tcp.serv.pending_accepts = req;

      if (handle->stream.serv.connection_cb) {
        handle->stream.serv.connection_cb((uv_stream_t*) handle, 0);
      }
    } else {
      /* Error related to accepted socket is ignored because the server socket
       * may still be healthy. If the server socket is broken, queueing the
       * next accept will detect it. */
      closesocket(req->accept_socket);
      req->accept_socket = INVALID_SOCKET;
      if (handle->flags & UV_HANDLE_LISTENING) {
        uv__tcp_queue_accept(handle, req);
      }
    }
  } else if (handle->flags & UV_HANDLE_LISTENING) {
    /* No accept socket on the request means queueing the accept failed. This
     * is a serious error: stop accepting connections and report the error to
     * the connection callback. */
    handle->flags &= ~UV_HANDLE_LISTENING;
    DECREASE_ACTIVE_COUNT(loop, handle);
    if (handle->stream.serv.connection_cb) {
      sys_err = GET_REQ_SOCK_ERROR(req);
      handle->stream.serv.connection_cb((uv_stream_t*) handle,
                                        uv_translate_sys_error(sys_err));
    }
  }

  DECREASE_PENDING_REQ_COUNT(handle);
}
1222 
1223 
/* Finish a connect request on a TCP handle and report the result.
 *
 * The status passed to the callback is, in order of precedence:
 *  - the handle's delayed_error, if set (it is cleared here);
 *  - for a successful request: ERROR_OPERATION_ABORTED when the handle is
 *    closing, otherwise the result of applying SO_UPDATE_CONNECT_CONTEXT (on
 *    success the handle is initialized as a readable/writable connection and
 *    the loop's active_tcp_streams counter is incremented);
 *  - for a failed request: the socket error stored on the request.
 * The status is translated with uv_translate_sys_error() before delivery. */
void uv__process_tcp_connect_req(uv_loop_t* loop, uv_tcp_t* handle,
    uv_connect_t* req) {
  int err;

  assert(handle->type == UV_TCP);

  UNREGISTER_HANDLE_REQ(loop, handle, req);

  err = 0;
  if (handle->delayed_error) {
    /* To smooth over the differences between unixes errors that
     * were reported synchronously on the first connect can be delayed
     * until the next tick--which is now.
     */
    err = handle->delayed_error;
    handle->delayed_error = 0;
  } else if (REQ_SUCCESS(req)) {
    if (handle->flags & UV_HANDLE_CLOSING) {
      /* use UV_ECANCELED for consistency with Unix */
      err = ERROR_OPERATION_ABORTED;
    } else if (setsockopt(handle->socket,
                          SOL_SOCKET,
                          SO_UPDATE_CONNECT_CONTEXT,
                          NULL,
                          0) == 0) {
      uv__connection_init((uv_stream_t*)handle);
      handle->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
      loop->active_tcp_streams++;
    } else {
      err = WSAGetLastError();
    }
  } else {
    err = GET_REQ_SOCK_ERROR(req);
  }

  /* Tolerate a request without a callback, as the write-request completion
   * handler in this file does, instead of calling through a NULL pointer. */
  if (req->cb) {
    req->cb(req, uv_translate_sys_error(err));
  }

  DECREASE_PENDING_REQ_COUNT(handle);
}
1262 
1263 
/* Prepare a TCP handle's socket for transfer to the process `target_pid`.
 *
 * Writes the transfer type (connection or server) to `*xfer_type`, duplicates
 * the socket into `xfer_info->socket_info` with WSADuplicateSocketW(), copies
 * the handle's delayed_error into `xfer_info`, and flags the local handle as
 * UV_HANDLE_SHARED_TCP_SOCKET.
 *
 * Returns 0 on success, ERROR_NOT_SUPPORTED for a server handle that is
 * neither listening nor bound, or the Winsock error if duplication fails.
 * Note that `*xfer_type` has already been written on the error returns. */
int uv__tcp_xfer_export(uv_tcp_t* handle,
                        int target_pid,
                        uv__ipc_socket_xfer_type_t* xfer_type,
                        uv__ipc_socket_xfer_info_t* xfer_info) {
  int is_connection;

  is_connection = (handle->flags & UV_HANDLE_CONNECTION) != 0;
  *xfer_type = is_connection ? UV__IPC_SOCKET_XFER_TCP_CONNECTION
                             : UV__IPC_SOCKET_XFER_TCP_SERVER;

  /* We're about to share the socket with another process. Because a
   * non-connection handle is a listening socket, we assume that the other
   * process will be accepting connections on it. Thus, before sharing the
   * socket with another process, we call listen here in the parent process. */
  if (!is_connection && !(handle->flags & UV_HANDLE_LISTENING)) {
    if (!(handle->flags & UV_HANDLE_BOUND)) {
      return ERROR_NOT_SUPPORTED;
    }
    /* A listen() failure is recorded on the handle rather than returned. */
    if (handle->delayed_error == 0 &&
        listen(handle->socket, SOMAXCONN) == SOCKET_ERROR) {
      handle->delayed_error = WSAGetLastError();
    }
  }

  if (WSADuplicateSocketW(handle->socket,
                          target_pid,
                          &xfer_info->socket_info) != 0) {
    return WSAGetLastError();
  }
  xfer_info->delayed_error = handle->delayed_error;

  /* Mark the local copy of the handle as 'shared' so we behave in a way that's
   * friendly to the process(es) that we share the socket with. */
  handle->flags |= UV_HANDLE_SHARED_TCP_SOCKET;

  return 0;
}
1297 
1298 
/* Attach a socket received from another process to the TCP handle `tcp`.
 *
 * Creates an overlapped socket from the protocol info in `xfer_info`,
 * installs it on the handle with uv__tcp_set_socket(), copies over the
 * delayed error, and marks the handle as bound and shared. For a connection
 * transfer the handle is additionally initialized as a readable/writable
 * connection. The loop's active_tcp_streams counter is incremented for both
 * transfer types.
 *
 * Returns 0 on success, the Winsock error if the socket cannot be created,
 * or the error from uv__tcp_set_socket() (the new socket is closed then). */
int uv__tcp_xfer_import(uv_tcp_t* tcp,
                        uv__ipc_socket_xfer_type_t xfer_type,
                        uv__ipc_socket_xfer_info_t* xfer_info) {
  SOCKET sock;
  int sys_err;

  assert(xfer_type == UV__IPC_SOCKET_XFER_TCP_SERVER ||
         xfer_type == UV__IPC_SOCKET_XFER_TCP_CONNECTION);

  sock = WSASocketW(FROM_PROTOCOL_INFO,
                    FROM_PROTOCOL_INFO,
                    FROM_PROTOCOL_INFO,
                    &xfer_info->socket_info,
                    0,
                    WSA_FLAG_OVERLAPPED);
  if (sock == INVALID_SOCKET) {
    return WSAGetLastError();
  }

  sys_err = uv__tcp_set_socket(tcp->loop,
                               tcp,
                               sock,
                               xfer_info->socket_info.iAddressFamily,
                               1);
  if (sys_err != 0) {
    closesocket(sock);
    return sys_err;
  }

  tcp->delayed_error = xfer_info->delayed_error;
  tcp->flags |= UV_HANDLE_BOUND | UV_HANDLE_SHARED_TCP_SOCKET;

  if (xfer_type == UV__IPC_SOCKET_XFER_TCP_CONNECTION) {
    uv__connection_init((uv_stream_t*) tcp);
    tcp->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
  }

  tcp->loop->active_tcp_streams++;
  return 0;
}
1337 
1338 
/* Enable (`enable` non-zero) or disable the no-delay option on a TCP handle.
 *
 * When the handle already has a socket the option is applied to it through
 * uv__tcp_nodelay(); a failure there is translated and returned without
 * touching the handle's flags. Otherwise only the UV_HANDLE_TCP_NODELAY flag
 * is updated. Returns 0 on success. */
int uv_tcp_nodelay(uv_tcp_t* handle, int enable) {
  int sys_err;

  if (handle->socket != INVALID_SOCKET) {
    sys_err = uv__tcp_nodelay(handle, handle->socket, enable);
    if (sys_err != 0) {
      return uv_translate_sys_error(sys_err);
    }
  }

  /* Record the requested setting on the handle. */
  if (!enable) {
    handle->flags &= ~UV_HANDLE_TCP_NODELAY;
  } else {
    handle->flags |= UV_HANDLE_TCP_NODELAY;
  }

  return 0;
}
1356 
1357 
/* Enable (`enable` non-zero) or disable keep-alive on a TCP handle.
 *
 * When the handle already has a socket the setting, together with `delay`,
 * is applied through uv__tcp_keepalive(); a failure there is translated and
 * returned without touching the handle's flags. Otherwise only the
 * UV_HANDLE_TCP_KEEPALIVE flag is updated; `delay` is not stored in that
 * case. Returns 0 on success. */
int uv_tcp_keepalive(uv_tcp_t* handle, int enable, unsigned int delay) {
  int sys_err;

  if (handle->socket != INVALID_SOCKET) {
    sys_err = uv__tcp_keepalive(handle, handle->socket, enable, delay);
    if (sys_err != 0) {
      return uv_translate_sys_error(sys_err);
    }
  }

  /* Record the requested setting on the handle. */
  if (!enable) {
    handle->flags &= ~UV_HANDLE_TCP_KEEPALIVE;
  } else {
    handle->flags |= UV_HANDLE_TCP_KEEPALIVE;
  }

  /* TODO: Store delay if handle->socket isn't created yet. */

  return 0;
}
1377 
1378 
/* Select between multiple simultaneous pending accepts (`enable` non-zero)
 * and a single pending accept (`enable` zero) for a TCP server handle.
 *
 * Returns UV_EINVAL for a connection handle, 0 when the handle is already in
 * the requested mode or a mode change is already in progress, and UV_ENOTSUP
 * when asked to go from single-accept back to simultaneous accepts.
 * Otherwise the handle is switched to single-accept mode and 0 is returned. */
int uv_tcp_simultaneous_accepts(uv_tcp_t* handle, int enable) {
  int single_accept;

  if (handle->flags & UV_HANDLE_CONNECTION) {
    return UV_EINVAL;
  }

  single_accept = (handle->flags & UV_HANDLE_TCP_SINGLE_ACCEPT) != 0;

  /* Already in the desired mode: simultaneous accepts are wanted exactly
   * when the single-accept flag is clear. */
  if ((enable != 0) != single_accept) {
    return 0;
  }

  /* Don't allow switching from single pending accept to many. */
  if (enable) {
    return UV_ENOTSUP;
  }

  /* Nothing more to do while a change in the number of pending accepts is
   * already under way. */
  if (handle->flags & UV_HANDLE_TCP_ACCEPT_STATE_CHANGING) {
    return 0;
  }

  handle->flags |= UV_HANDLE_TCP_SINGLE_ACCEPT;

  /* Flip the changing flag if we have already queued multiple accepts. */
  if (handle->flags & UV_HANDLE_LISTENING) {
    handle->flags |= UV_HANDLE_TCP_ACCEPT_STATE_CHANGING;
  }

  return 0;
}
1409 
1410 
/* Best-effort cancellation of the I/O still outstanding on a TCP handle.
 *
 * A pending read is cancelled individually with CancelIoEx() on the handle's
 * read request; pending writes are cancelled with CancelIo(). When non-IFS
 * LSPs are present for the handle's address family, the base socket handle
 * is looked up with SIO_BASE_HANDLE and, if it differs from the handle's
 * socket, the same cancellation is repeated on it. Return values of the
 * cancel calls are not inspected. */
static void uv__tcp_try_cancel_reqs(uv_tcp_t* tcp) {
  SOCKET sock;
  DWORD ignored;
  int has_non_ifs_lsp;
  int read_pending;
  int write_pending;

  sock = tcp->socket;
  read_pending = (tcp->flags & UV_HANDLE_READ_PENDING) != 0;
  write_pending = tcp->stream.conn.write_reqs_pending > 0;

  if (!(read_pending || write_pending)) {
    return;
  }

  /* TODO: in libuv v2, keep explicit track of write_reqs, so we can cancel
   * them each explicitly with CancelIoEx (like unix). */
  if (read_pending) {
    CancelIoEx((HANDLE) sock, &tcp->read_req.u.io.overlapped);
  }
  if (write_pending) {
    CancelIo((HANDLE) sock);
  }

  /* Check if we have any non-IFS LSPs stacked on top of TCP */
  if (tcp->flags & UV_HANDLE_IPV6) {
    has_non_ifs_lsp = uv_tcp_non_ifs_lsp_ipv6;
  } else {
    has_non_ifs_lsp = uv_tcp_non_ifs_lsp_ipv4;
  }

  /* If there are non-ifs LSPs then try to obtain a base handle for the socket.
   * This will always fail on Windows XP/3k. */
  if (has_non_ifs_lsp &&
      WSAIoctl(sock,
               SIO_BASE_HANDLE,
               NULL,
               0,
               &sock,
               sizeof(sock),
               &ignored,
               NULL,
               NULL) != 0) {
    /* Failed. We can't do CancelIo. */
    return;
  }

  assert(sock != 0 && sock != INVALID_SOCKET);

  /* Nothing further to do when the base handle is the socket itself. */
  if (sock == tcp->socket) {
    return;
  }

  if (read_pending) {
    CancelIoEx((HANDLE) sock, &tcp->read_req.u.io.overlapped);
  }
  if (write_pending) {
    CancelIo((HANDLE) sock);
  }
}
1461 
1462 
/* Begin closing a TCP handle.
 *
 * Connection handles stop reading and try to cancel outstanding requests.
 * Other handles close every accept socket still held by their accept
 * requests. Listening is stopped, the readable/writable flags are cleared and
 * the handle is marked as closing. The socket itself is closed right away
 * unless this is a connection with writes still pending. The endgame is
 * requested once no requests remain pending. */
void uv__tcp_close(uv_loop_t* loop, uv_tcp_t* tcp) {
  uv_tcp_accept_t* accept_req;
  uv_tcp_accept_t* accept_end;

  if (!(tcp->flags & UV_HANDLE_CONNECTION)) {
    if (tcp->tcp.serv.accept_reqs != NULL) {
      /* First close the incoming sockets to cancel the accept operations before
       * we free their resources. */
      accept_req = tcp->tcp.serv.accept_reqs;
      accept_end = accept_req + uv_simultaneous_server_accepts;
      for (; accept_req != accept_end; accept_req++) {
        if (accept_req->accept_socket == INVALID_SOCKET) {
          continue;
        }
        closesocket(accept_req->accept_socket);
        accept_req->accept_socket = INVALID_SOCKET;
      }
    }
    assert(!(tcp->flags & UV_HANDLE_READING));
  } else {
    if (tcp->flags & UV_HANDLE_READING) {
      uv_read_stop((uv_stream_t*) tcp);
    }
    uv__tcp_try_cancel_reqs(tcp);
  }

  if (tcp->flags & UV_HANDLE_LISTENING) {
    tcp->flags &= ~UV_HANDLE_LISTENING;
    DECREASE_ACTIVE_COUNT(loop, tcp);
  }

  tcp->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
  uv__handle_closing(tcp);

  /* If any overlapped req failed to cancel, calling `closesocket` now would
   * cause Win32 to send an RST packet. Try to avoid that for writes, if
   * possibly applicable, by waiting to process the completion notifications
   * first (which typically should be cancellations). There's not much we can
   * do about canceled reads, which also will generate an RST packet. */
  if (!(tcp->flags & UV_HANDLE_CONNECTION) ||
      tcp->stream.conn.write_reqs_pending == 0) {
    closesocket(tcp->socket);
    tcp->socket = INVALID_SOCKET;
  }

  if (tcp->reqs_pending == 0) {
    uv__want_endgame(loop, (uv_handle_t*) tcp);
  }
}
1507 
1508 
/* Associate an existing OS socket with a TCP handle.
 *
 * The socket's address family is read from its protocol info and the socket
 * is installed on the handle with uv__tcp_set_socket(). If the socket already
 * has a local address the handle is flagged as bound; if it also has a peer
 * address the handle is initialized as a readable/writable connection.
 *
 * Returns 0 on success or a translated error when the protocol info cannot
 * be queried or the socket cannot be installed. */
int uv_tcp_open(uv_tcp_t* handle, uv_os_sock_t sock) {
  WSAPROTOCOL_INFOW info;
  struct sockaddr_storage addr;
  int info_len;
  int addr_len;
  int sys_err;

  /* Detect the address family of the socket. */
  info_len = (int) sizeof(info);
  if (getsockopt(sock,
                 SOL_SOCKET,
                 SO_PROTOCOL_INFOW,
                 (char*) &info,
                 &info_len) == SOCKET_ERROR) {
    return uv_translate_sys_error(GetLastError());
  }

  sys_err = uv__tcp_set_socket(handle->loop,
                               handle,
                               sock,
                               info.iAddressFamily,
                               1);
  if (sys_err != 0) {
    return uv_translate_sys_error(sys_err);
  }

  /* Support already active socket: a failing getsockname means there is no
   * further state to pick up. */
  addr_len = sizeof(addr);
  if (uv_tcp_getsockname(handle, (struct sockaddr*) &addr, &addr_len) != 0) {
    return 0;
  }

  /* Socket is already bound. */
  handle->flags |= UV_HANDLE_BOUND;

  addr_len = sizeof(addr);
  if (uv_tcp_getpeername(handle, (struct sockaddr*) &addr, &addr_len) == 0) {
    /* Socket is already connected. */
    uv__connection_init((uv_stream_t*) handle);
    handle->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
  }

  return 0;
}
1550 
1551 
1552 /* This function is an egress point, i.e. it returns libuv errors rather than
1553  * system errors.
1554  */
uv__tcp_bind(uv_tcp_t * handle,const struct sockaddr * addr,unsigned int addrlen,unsigned int flags)1555 int uv__tcp_bind(uv_tcp_t* handle,
1556                  const struct sockaddr* addr,
1557                  unsigned int addrlen,
1558                  unsigned int flags) {
1559   int err;
1560 
1561   err = uv__tcp_try_bind(handle, addr, addrlen, flags);
1562   if (err)
1563     return uv_translate_sys_error(err);
1564 
1565   return 0;
1566 }
1567 
1568 
1569 /* This function is an egress point, i.e. it returns libuv errors rather than
1570  * system errors.
1571  */
uv__tcp_connect(uv_connect_t * req,uv_tcp_t * handle,const struct sockaddr * addr,unsigned int addrlen,uv_connect_cb cb)1572 int uv__tcp_connect(uv_connect_t* req,
1573                     uv_tcp_t* handle,
1574                     const struct sockaddr* addr,
1575                     unsigned int addrlen,
1576                     uv_connect_cb cb) {
1577   int err;
1578 
1579   err = uv__tcp_try_connect(req, handle, addr, addrlen, cb);
1580   if (err)
1581     return uv_translate_sys_error(err);
1582 
1583   return 0;
1584 }
1585 
/* WSA_FLAG_NO_HANDLE_INHERIT was added in Windows 7 SP1; provide the value
 * here when the headers in use do not define it. Specify this flag to avoid
 * race conditions, but also manually clear the inherit flag in case this
 * failed. */
#if !defined(WSA_FLAG_NO_HANDLE_INHERIT)
#define WSA_FLAG_NO_HANDLE_INHERIT 0x80
#endif
1591 
/* Create a pair of connected sockets over the IPv4 loopback interface.
 *
 * A temporary listening socket is bound to 127.0.0.1 on a system-chosen
 * port. `fds[0]` is a socket that connect()s to it and `fds[1]` is the
 * socket that accepts that connection through AcceptEx. `flags0` / `flags1`
 * select overlapped mode for the respective socket when they contain
 * UV_NONBLOCK_PIPE. Every socket has its handle-inherit flag cleared.
 *
 * Returns 0 on success with both entries of `fds` filled in. On failure all
 * sockets created so far are closed, `fds` is left untouched, and a
 * translated libuv error is returned. */
int uv_socketpair(int type, int protocol, uv_os_sock_t fds[2], int flags0, int flags1) {
  SOCKET server = INVALID_SOCKET;
  SOCKET client0 = INVALID_SOCKET;
  SOCKET client1 = INVALID_SOCKET;
  SOCKADDR_IN name;
  LPFN_ACCEPTEX func_acceptex;
  WSAOVERLAPPED overlap;
  char accept_buffer[sizeof(struct sockaddr_storage) * 2 + 32];
  int namelen;
  int err;
  DWORD bytes;
  DWORD flags;
  DWORD client0_flags = WSA_FLAG_NO_HANDLE_INHERIT;
  DWORD client1_flags = WSA_FLAG_NO_HANDLE_INHERIT;

  if (flags0 & UV_NONBLOCK_PIPE)
      client0_flags |= WSA_FLAG_OVERLAPPED;
  if (flags1 & UV_NONBLOCK_PIPE)
      client1_flags |= WSA_FLAG_OVERLAPPED;

  server = WSASocketW(AF_INET, type, protocol, NULL, 0,
                      WSA_FLAG_OVERLAPPED | WSA_FLAG_NO_HANDLE_INHERIT);
  if (server == INVALID_SOCKET)
    goto wsaerror;
  /* Clear the inherit flag by hand as well, in case the creation flag was
   * not honored. */
  if (!SetHandleInformation((HANDLE) server, HANDLE_FLAG_INHERIT, 0))
    goto error;
  /* Zero the whole address first so that the padding member (sin_zero) is
   * not handed to bind()/connect() uninitialized. */
  memset(&name, 0, sizeof(name));
  name.sin_family = AF_INET;
  name.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
  name.sin_port = 0;
  if (bind(server, (SOCKADDR*) &name, sizeof(name)) != 0)
    goto wsaerror;
  if (listen(server, 1) != 0)
    goto wsaerror;
  /* Read back the address to learn the port that was assigned. */
  namelen = sizeof(name);
  if (getsockname(server, (SOCKADDR*) &name, &namelen) != 0)
    goto wsaerror;
  client0 = WSASocketW(AF_INET, type, protocol, NULL, 0, client0_flags);
  if (client0 == INVALID_SOCKET)
    goto wsaerror;
  if (!SetHandleInformation((HANDLE) client0, HANDLE_FLAG_INHERIT, 0))
    goto error;
  if (connect(client0, (SOCKADDR*) &name, sizeof(name)) != 0)
    goto wsaerror;
  client1 = WSASocketW(AF_INET, type, protocol, NULL, 0, client1_flags);
  if (client1 == INVALID_SOCKET)
    goto wsaerror;
  if (!SetHandleInformation((HANDLE) client1, HANDLE_FLAG_INHERIT, 0))
    goto error;
  if (!uv__get_acceptex_function(server, &func_acceptex)) {
    err = WSAEAFNOSUPPORT;
    goto cleanup;
  }
  memset(&overlap, 0, sizeof(overlap));
  if (!func_acceptex(server,
                     client1,
                     accept_buffer,
                     0,
                     sizeof(struct sockaddr_storage),
                     sizeof(struct sockaddr_storage),
                     &bytes,
                     &overlap)) {
    err = WSAGetLastError();
    if (err == ERROR_IO_PENDING) {
      /* Result should complete immediately, since we already called connect,
       * but empirically, we sometimes have to poll the kernel a couple times
       * until it notices that. */
      while (!WSAGetOverlappedResult(client1, &overlap, &bytes, FALSE, &flags)) {
        err = WSAGetLastError();
        if (err != WSA_IO_INCOMPLETE)
          goto cleanup;
        SwitchToThread();
      }
    }
    else {
      goto cleanup;
    }
  }
  if (setsockopt(client1, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
                  (char*) &server, sizeof(server)) != 0) {
    goto wsaerror;
  }

  /* The listening socket was only needed to establish the connection. */
  closesocket(server);

  fds[0] = client0;
  fds[1] = client1;

  return 0;

 wsaerror:
    err = WSAGetLastError();
    goto cleanup;

 error:
    err = GetLastError();
    goto cleanup;

 cleanup:
    if (server != INVALID_SOCKET)
      closesocket(server);
    if (client0 != INVALID_SOCKET)
      closesocket(client0);
    if (client1 != INVALID_SOCKET)
      closesocket(client1);

    assert(err);
    return uv_translate_sys_error(err);
}
1700