/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 * IN THE SOFTWARE.
 */

#include <assert.h>
#include <stdlib.h>

#include "uv.h"
#include "internal.h"
#include "handle-inl.h"
#include "stream-inl.h"
#include "req-inl.h"


/*
 * Threshold of active tcp streams for which to preallocate tcp read buffers.
 * (Because the node slab allocator performs poorly under this pattern, the
 * optimization is temporarily disabled (threshold=0). This will be revisited
 * once the node allocator is improved.)
 */
const unsigned int uv_active_tcp_streams_threshold = 0;

/*
 * Number of simultaneous pending AcceptEx calls.
 */
const unsigned int uv_simultaneous_server_accepts = 32;

/* A zero-size buffer for use by uv_tcp_read. */
static char uv_zero_[] = "";


static int uv__tcp_nodelay(uv_tcp_t* handle, SOCKET socket, int enable) {
  if (setsockopt(socket,
                 IPPROTO_TCP,
                 TCP_NODELAY,
                 (const char*)&enable,
                 sizeof enable) == -1) {
    return WSAGetLastError();
  }
  return 0;
}


static int uv__tcp_keepalive(uv_tcp_t* handle,
                             SOCKET socket,
                             int enable,
                             unsigned int delay) {
  if (setsockopt(socket,
                 SOL_SOCKET,
                 SO_KEEPALIVE,
                 (const char*)&enable,
                 sizeof enable) == -1) {
    return WSAGetLastError();
  }

  if (enable && setsockopt(socket,
                           IPPROTO_TCP,
                           TCP_KEEPALIVE,
                           (const char*)&delay,
                           sizeof delay) == -1) {
    return WSAGetLastError();
  }

  return 0;
}


static int uv__tcp_set_socket(uv_loop_t* loop,
                              uv_tcp_t* handle,
                              SOCKET socket,
                              int family,
                              int imported) {
  DWORD yes = 1;
  int non_ifs_lsp;
  int err;

  if (handle->socket != INVALID_SOCKET)
    return UV_EBUSY;

  /* Set the socket to nonblocking mode. */
  if (ioctlsocket(socket, FIONBIO, &yes) == SOCKET_ERROR) {
    return WSAGetLastError();
  }

  /* Make the socket non-inheritable. */
  if (!SetHandleInformation((HANDLE) socket, HANDLE_FLAG_INHERIT, 0))
    return GetLastError();

  /* Associate it with the I/O completion port. Use the socket as the
   * completion key. */
  if (CreateIoCompletionPort((HANDLE) socket,
                             loop->iocp,
                             (ULONG_PTR) socket,
                             0) == NULL) {
    if (imported) {
      handle->flags |= UV_HANDLE_EMULATE_IOCP;
    } else {
      return GetLastError();
    }
  }

  if (family == AF_INET6) {
    non_ifs_lsp = uv_tcp_non_ifs_lsp_ipv6;
  } else {
    non_ifs_lsp = uv_tcp_non_ifs_lsp_ipv4;
  }

  if (!(handle->flags & UV_HANDLE_EMULATE_IOCP) && !non_ifs_lsp) {
    UCHAR sfcnm_flags =
        FILE_SKIP_SET_EVENT_ON_HANDLE | FILE_SKIP_COMPLETION_PORT_ON_SUCCESS;
    if (!SetFileCompletionNotificationModes((HANDLE) socket, sfcnm_flags))
      return GetLastError();
    handle->flags |= UV_HANDLE_SYNC_BYPASS_IOCP;
  }

  if (handle->flags & UV_HANDLE_TCP_NODELAY) {
    err = uv__tcp_nodelay(handle, socket, 1);
    if (err)
      return err;
  }

  /* TODO: Use stored delay. */
  if (handle->flags & UV_HANDLE_TCP_KEEPALIVE) {
    err = uv__tcp_keepalive(handle, socket, 1, 60);
    if (err)
      return err;
  }

  handle->socket = socket;

  if (family == AF_INET6) {
    handle->flags |= UV_HANDLE_IPV6;
  } else {
    assert(!(handle->flags & UV_HANDLE_IPV6));
  }

  return 0;
}


int uv_tcp_init_ex(uv_loop_t* loop, uv_tcp_t* handle, unsigned int flags) {
  int domain;

  /* Use the lower 8 bits for the domain. */
  domain = flags & 0xFF;
  if (domain != AF_INET && domain != AF_INET6 && domain != AF_UNSPEC)
    return UV_EINVAL;

  if (flags & ~0xFF)
    return UV_EINVAL;

  uv__stream_init(loop, (uv_stream_t*) handle, UV_TCP);
  handle->tcp.serv.accept_reqs = NULL;
  handle->tcp.serv.pending_accepts = NULL;
  handle->socket = INVALID_SOCKET;
  handle->reqs_pending = 0;
  handle->tcp.serv.func_acceptex = NULL;
  handle->tcp.conn.func_connectex = NULL;
  handle->tcp.serv.processed_accepts = 0;
  handle->delayed_error = 0;

  /* If anything fails beyond this point we need to remove the handle from
   * the handle queue, since it was added by uv__handle_init in
   * uv__stream_init. */

  if (domain != AF_UNSPEC) {
    SOCKET sock;
    DWORD err;

    sock = socket(domain, SOCK_STREAM, 0);
    if (sock == INVALID_SOCKET) {
      err = WSAGetLastError();
      QUEUE_REMOVE(&handle->handle_queue);
      return uv_translate_sys_error(err);
    }

    err = uv__tcp_set_socket(handle->loop, handle, sock, domain, 0);
    if (err) {
      closesocket(sock);
      QUEUE_REMOVE(&handle->handle_queue);
      return uv_translate_sys_error(err);
    }
  }

  return 0;
}


int uv_tcp_init(uv_loop_t* loop, uv_tcp_t* handle) {
  return uv_tcp_init_ex(loop, handle, AF_UNSPEC);
}
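
/* A minimal usage sketch (not part of libuv): creating a TCP handle. Passing
 * a domain to uv_tcp_init_ex() creates the socket right away, so options like
 * uv_tcp_nodelay() can be applied before bind/connect; with plain
 * uv_tcp_init() (AF_UNSPEC) the socket is created lazily on first use.
 *
 *   uv_tcp_t client;
 *
 *   if (uv_tcp_init_ex(uv_default_loop(), &client, AF_INET) == 0)
 *     uv_tcp_nodelay(&client, 1);
 */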


void uv__tcp_endgame(uv_loop_t* loop, uv_tcp_t* handle) {
  int err;
  unsigned int i;
  uv_tcp_accept_t* req;

  if (handle->flags & UV_HANDLE_CONNECTION &&
      handle->stream.conn.shutdown_req != NULL &&
      handle->stream.conn.write_reqs_pending == 0) {

    UNREGISTER_HANDLE_REQ(loop, handle, handle->stream.conn.shutdown_req);

    err = 0;
    if (handle->flags & UV_HANDLE_CLOSING) {
      err = ERROR_OPERATION_ABORTED;
    } else if (shutdown(handle->socket, SD_SEND) == SOCKET_ERROR) {
      err = WSAGetLastError();
    }

    if (handle->stream.conn.shutdown_req->cb) {
      handle->stream.conn.shutdown_req->cb(handle->stream.conn.shutdown_req,
                                           uv_translate_sys_error(err));
    }

    handle->stream.conn.shutdown_req = NULL;
    DECREASE_PENDING_REQ_COUNT(handle);
    return;
  }

  if (handle->flags & UV_HANDLE_CLOSING &&
      handle->reqs_pending == 0) {
    assert(!(handle->flags & UV_HANDLE_CLOSED));
    assert(handle->socket == INVALID_SOCKET);

    if (!(handle->flags & UV_HANDLE_CONNECTION) &&
        handle->tcp.serv.accept_reqs) {
      if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
        for (i = 0; i < uv_simultaneous_server_accepts; i++) {
          req = &handle->tcp.serv.accept_reqs[i];
          if (req->wait_handle != INVALID_HANDLE_VALUE) {
            UnregisterWait(req->wait_handle);
            req->wait_handle = INVALID_HANDLE_VALUE;
          }
          if (req->event_handle != NULL) {
            CloseHandle(req->event_handle);
            req->event_handle = NULL;
          }
        }
      }

      uv__free(handle->tcp.serv.accept_reqs);
      handle->tcp.serv.accept_reqs = NULL;
    }

    if (handle->flags & UV_HANDLE_CONNECTION &&
        handle->flags & UV_HANDLE_EMULATE_IOCP) {
      if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) {
        UnregisterWait(handle->read_req.wait_handle);
        handle->read_req.wait_handle = INVALID_HANDLE_VALUE;
      }
      if (handle->read_req.event_handle != NULL) {
        CloseHandle(handle->read_req.event_handle);
        handle->read_req.event_handle = NULL;
      }
    }

    uv__handle_close(handle);
    loop->active_tcp_streams--;
  }
}


/* Unlike on Unix, here we don't set SO_REUSEADDR, because it doesn't just
 * allow binding to addresses that are in use by sockets in TIME_WAIT; it
 * effectively allows 'stealing' a port which is in use by another application.
 *
 * SO_EXCLUSIVEADDRUSE is also not good here because it does check all sockets,
 * regardless of state, so we'd get an error even if the port is in use by a
 * socket in TIME_WAIT state.
 *
 * See issue #1360.
 */
static int uv__tcp_try_bind(uv_tcp_t* handle,
                            const struct sockaddr* addr,
                            unsigned int addrlen,
                            unsigned int flags) {
  DWORD err;
  int r;

  if (handle->socket == INVALID_SOCKET) {
    SOCKET sock;

    /* Cannot set IPv6-only mode on non-IPv6 socket. */
    if ((flags & UV_TCP_IPV6ONLY) && addr->sa_family != AF_INET6)
      return ERROR_INVALID_PARAMETER;

    sock = socket(addr->sa_family, SOCK_STREAM, 0);
    if (sock == INVALID_SOCKET) {
      return WSAGetLastError();
    }

    err = uv__tcp_set_socket(handle->loop, handle, sock, addr->sa_family, 0);
    if (err) {
      closesocket(sock);
      return err;
    }
  }

#ifdef IPV6_V6ONLY
  if (addr->sa_family == AF_INET6) {
    int on;

    on = (flags & UV_TCP_IPV6ONLY) != 0;

    /* TODO: how to handle errors? This may fail if there is no ipv4 stack
     * available, or when run on XP/2003 which have no support for dualstack
     * sockets. For now we're silently ignoring the error. */
    setsockopt(handle->socket,
               IPPROTO_IPV6,
               IPV6_V6ONLY,
               (const char*)&on,
               sizeof on);
  }
#endif

  r = bind(handle->socket, addr, addrlen);

  if (r == SOCKET_ERROR) {
    err = WSAGetLastError();
    if (err == WSAEADDRINUSE) {
      /* Some errors are not to be reported until connect() or listen(). */
      handle->delayed_error = err;
    } else {
      return err;
    }
  }

  handle->flags |= UV_HANDLE_BOUND;

  return 0;
}
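
/* A minimal usage sketch (not part of libuv): binding via the public
 * uv_tcp_bind() wrapper, which reaches this function through uv__tcp_bind().
 * The UV_TCP_IPV6ONLY flag maps onto the IPV6_V6ONLY setsockopt above.
 *
 *   uv_tcp_t server;
 *   struct sockaddr_in6 addr;
 *
 *   uv_tcp_init(uv_default_loop(), &server);
 *   uv_ip6_addr("::", 8000, &addr);
 *   uv_tcp_bind(&server, (const struct sockaddr*) &addr, UV_TCP_IPV6ONLY);
 */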


static void CALLBACK post_completion(void* context, BOOLEAN timed_out) {
  uv_req_t* req;
  uv_tcp_t* handle;

  req = (uv_req_t*) context;
  assert(req != NULL);
  handle = (uv_tcp_t*) req->data;
  assert(handle != NULL);
  assert(!timed_out);

  if (!PostQueuedCompletionStatus(handle->loop->iocp,
                                  req->u.io.overlapped.InternalHigh,
                                  0,
                                  &req->u.io.overlapped)) {
    uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
  }
}


static void CALLBACK post_write_completion(void* context, BOOLEAN timed_out) {
  uv_write_t* req;
  uv_tcp_t* handle;

  req = (uv_write_t*) context;
  assert(req != NULL);
  handle = (uv_tcp_t*) req->handle;
  assert(handle != NULL);
  assert(!timed_out);

  if (!PostQueuedCompletionStatus(handle->loop->iocp,
                                  req->u.io.overlapped.InternalHigh,
                                  0,
                                  &req->u.io.overlapped)) {
    uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
  }
}


static void uv__tcp_queue_accept(uv_tcp_t* handle, uv_tcp_accept_t* req) {
  uv_loop_t* loop = handle->loop;
  BOOL success;
  DWORD bytes;
  SOCKET accept_socket;
  short family;

  assert(handle->flags & UV_HANDLE_LISTENING);
  assert(req->accept_socket == INVALID_SOCKET);

  /* Choose the family and extension function. */
  if (handle->flags & UV_HANDLE_IPV6) {
    family = AF_INET6;
  } else {
    family = AF_INET;
  }

  /* Open a socket for the accepted connection. */
  accept_socket = socket(family, SOCK_STREAM, 0);
  if (accept_socket == INVALID_SOCKET) {
    SET_REQ_ERROR(req, WSAGetLastError());
    uv__insert_pending_req(loop, (uv_req_t*) req);
    handle->reqs_pending++;
    return;
  }

  /* Make the socket non-inheritable. */
  if (!SetHandleInformation((HANDLE) accept_socket, HANDLE_FLAG_INHERIT, 0)) {
    SET_REQ_ERROR(req, GetLastError());
    uv__insert_pending_req(loop, (uv_req_t*) req);
    handle->reqs_pending++;
    closesocket(accept_socket);
    return;
  }

  /* Prepare the overlapped structure. */
  memset(&(req->u.io.overlapped), 0, sizeof(req->u.io.overlapped));
  if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
    assert(req->event_handle != NULL);
    req->u.io.overlapped.hEvent = (HANDLE) ((ULONG_PTR) req->event_handle | 1);
  }

  success = handle->tcp.serv.func_acceptex(handle->socket,
                                           accept_socket,
                                           (void*) req->accept_buffer,
                                           0,
                                           sizeof(struct sockaddr_storage),
                                           sizeof(struct sockaddr_storage),
                                           &bytes,
                                           &req->u.io.overlapped);

  if (UV_SUCCEEDED_WITHOUT_IOCP(success)) {
    /* Process the req without IOCP. */
    req->accept_socket = accept_socket;
    handle->reqs_pending++;
    uv__insert_pending_req(loop, (uv_req_t*) req);
  } else if (UV_SUCCEEDED_WITH_IOCP(success)) {
    /* The req will be processed with IOCP. */
    req->accept_socket = accept_socket;
    handle->reqs_pending++;
    if (handle->flags & UV_HANDLE_EMULATE_IOCP &&
        req->wait_handle == INVALID_HANDLE_VALUE &&
        !RegisterWaitForSingleObject(&req->wait_handle,
            req->event_handle, post_completion, (void*) req,
            INFINITE, WT_EXECUTEINWAITTHREAD)) {
      SET_REQ_ERROR(req, GetLastError());
      uv__insert_pending_req(loop, (uv_req_t*) req);
    }
  } else {
    /* Make this req pending, reporting an error. */
    SET_REQ_ERROR(req, WSAGetLastError());
    uv__insert_pending_req(loop, (uv_req_t*) req);
    handle->reqs_pending++;
    /* Destroy the preallocated client socket. */
    closesocket(accept_socket);
    /* Destroy the event handle. */
    if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
      CloseHandle(req->event_handle);
      req->event_handle = NULL;
    }
  }
}


static void uv__tcp_queue_read(uv_loop_t* loop, uv_tcp_t* handle) {
  uv_read_t* req;
  uv_buf_t buf;
  int result;
  DWORD bytes, flags;

  assert(handle->flags & UV_HANDLE_READING);
  assert(!(handle->flags & UV_HANDLE_READ_PENDING));

  req = &handle->read_req;
  memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped));

  /*
   * Preallocate a read buffer if the number of active streams is below
   * the threshold.
   */
  if (loop->active_tcp_streams < uv_active_tcp_streams_threshold) {
    handle->flags &= ~UV_HANDLE_ZERO_READ;
    handle->tcp.conn.read_buffer = uv_buf_init(NULL, 0);
    handle->alloc_cb((uv_handle_t*) handle,
                     65536,
                     &handle->tcp.conn.read_buffer);
    if (handle->tcp.conn.read_buffer.base == NULL ||
        handle->tcp.conn.read_buffer.len == 0) {
      handle->read_cb((uv_stream_t*) handle,
                      UV_ENOBUFS,
                      &handle->tcp.conn.read_buffer);
      return;
    }
    assert(handle->tcp.conn.read_buffer.base != NULL);
    buf = handle->tcp.conn.read_buffer;
  } else {
    handle->flags |= UV_HANDLE_ZERO_READ;
    buf.base = (char*) &uv_zero_;
    buf.len = 0;
  }

  /* Prepare the overlapped structure. */
  memset(&(req->u.io.overlapped), 0, sizeof(req->u.io.overlapped));
  if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
    assert(req->event_handle != NULL);
    req->u.io.overlapped.hEvent = (HANDLE) ((ULONG_PTR) req->event_handle | 1);
  }

  flags = 0;
  result = WSARecv(handle->socket,
                   (WSABUF*) &buf,
                   1,
                   &bytes,
                   &flags,
                   &req->u.io.overlapped,
                   NULL);

  handle->flags |= UV_HANDLE_READ_PENDING;
  handle->reqs_pending++;

  if (UV_SUCCEEDED_WITHOUT_IOCP(result == 0)) {
    /* Process the req without IOCP. */
    req->u.io.overlapped.InternalHigh = bytes;
    uv__insert_pending_req(loop, (uv_req_t*) req);
  } else if (UV_SUCCEEDED_WITH_IOCP(result == 0)) {
    /* The req will be processed with IOCP. */
    if (handle->flags & UV_HANDLE_EMULATE_IOCP &&
        req->wait_handle == INVALID_HANDLE_VALUE &&
        !RegisterWaitForSingleObject(&req->wait_handle,
            req->event_handle, post_completion, (void*) req,
            INFINITE, WT_EXECUTEINWAITTHREAD)) {
      SET_REQ_ERROR(req, GetLastError());
      uv__insert_pending_req(loop, (uv_req_t*) req);
    }
  } else {
    /* Make this req pending, reporting an error. */
    SET_REQ_ERROR(req, WSAGetLastError());
    uv__insert_pending_req(loop, (uv_req_t*) req);
  }
}


int uv_tcp_close_reset(uv_tcp_t* handle, uv_close_cb close_cb) {
  struct linger l = { 1, 0 };

  /* Disallow setting SO_LINGER to zero while a shutdown is pending, due to
   * some platform inconsistencies. */
  if (handle->flags & UV_HANDLE_SHUTTING)
    return UV_EINVAL;

  if (setsockopt(handle->socket,
                 SOL_SOCKET,
                 SO_LINGER,
                 (const char*) &l,
                 sizeof(l)) != 0)
    return uv_translate_sys_error(WSAGetLastError());

  uv_close((uv_handle_t*) handle, close_cb);
  return 0;
}
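
/* A minimal usage sketch (not part of libuv): closing with an RST instead of
 * the usual FIN handshake. Note the UV_HANDLE_SHUTTING check above: a handle
 * with a pending uv_shutdown() cannot be reset. on_close is an
 * application-defined callback.
 *
 *   static void on_close(uv_handle_t* h) {
 *     ...release per-connection state here...
 *   }
 *
 *   uv_tcp_close_reset(&client, on_close);
 */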


int uv__tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) {
  unsigned int i, simultaneous_accepts;
  uv_tcp_accept_t* req;
  int err;

  assert(backlog > 0);

  if (handle->flags & UV_HANDLE_LISTENING) {
    handle->stream.serv.connection_cb = cb;
  }

  if (handle->flags & UV_HANDLE_READING) {
    return WSAEISCONN;
  }

  if (handle->delayed_error) {
    return handle->delayed_error;
  }

  if (!(handle->flags & UV_HANDLE_BOUND)) {
    err = uv__tcp_try_bind(handle,
                           (const struct sockaddr*) &uv_addr_ip4_any_,
                           sizeof(uv_addr_ip4_any_),
                           0);
    if (err)
      return err;
    if (handle->delayed_error)
      return handle->delayed_error;
  }

  if (!handle->tcp.serv.func_acceptex) {
    if (!uv__get_acceptex_function(handle->socket,
                                   &handle->tcp.serv.func_acceptex)) {
      return WSAEAFNOSUPPORT;
    }
  }

  /* If this flag is set, we already made this listen call in xfer. */
  if (!(handle->flags & UV_HANDLE_SHARED_TCP_SOCKET) &&
      listen(handle->socket, backlog) == SOCKET_ERROR) {
    return WSAGetLastError();
  }

  handle->flags |= UV_HANDLE_LISTENING;
  handle->stream.serv.connection_cb = cb;
  INCREASE_ACTIVE_COUNT(loop, handle);

  simultaneous_accepts = handle->flags & UV_HANDLE_TCP_SINGLE_ACCEPT ? 1
    : uv_simultaneous_server_accepts;

  if (handle->tcp.serv.accept_reqs == NULL) {
    handle->tcp.serv.accept_reqs =
        uv__malloc(uv_simultaneous_server_accepts * sizeof(uv_tcp_accept_t));
    if (!handle->tcp.serv.accept_reqs) {
      uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
    }

    for (i = 0; i < simultaneous_accepts; i++) {
      req = &handle->tcp.serv.accept_reqs[i];
      UV_REQ_INIT(req, UV_ACCEPT);
      req->accept_socket = INVALID_SOCKET;
      req->data = handle;

      req->wait_handle = INVALID_HANDLE_VALUE;
      if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
        req->event_handle = CreateEvent(NULL, 0, 0, NULL);
        if (req->event_handle == NULL) {
          uv_fatal_error(GetLastError(), "CreateEvent");
        }
      } else {
        req->event_handle = NULL;
      }

      uv__tcp_queue_accept(handle, req);
    }

    /* Initialize the other, unused requests too, because uv__tcp_endgame
     * doesn't know how many requests were initialized, so it will try to
     * clean up {uv_simultaneous_server_accepts} requests. */
    for (i = simultaneous_accepts; i < uv_simultaneous_server_accepts; i++) {
      req = &handle->tcp.serv.accept_reqs[i];
      UV_REQ_INIT(req, UV_ACCEPT);
      req->accept_socket = INVALID_SOCKET;
      req->data = handle;
      req->wait_handle = INVALID_HANDLE_VALUE;
      req->event_handle = NULL;
    }
  }

  return 0;
}
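
/* A minimal usage sketch (not part of libuv): a server driving this code via
 * the public API. uv_listen() dispatches here; each queued AcceptEx result is
 * handed out through uv_accept() in the connection callback. on_alloc,
 * on_read and on_close are application-defined callbacks.
 *
 *   static void on_connection(uv_stream_t* server, int status) {
 *     uv_tcp_t* client;
 *     if (status < 0)
 *       return;
 *     client = malloc(sizeof(*client));
 *     uv_tcp_init(server->loop, client);
 *     if (uv_accept(server, (uv_stream_t*) client) == 0)
 *       uv_read_start((uv_stream_t*) client, on_alloc, on_read);
 *     else
 *       uv_close((uv_handle_t*) client, on_close);
 *   }
 *
 *   uv_listen((uv_stream_t*) &server, SOMAXCONN, on_connection);
 */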


int uv__tcp_accept(uv_tcp_t* server, uv_tcp_t* client) {
  uv_loop_t* loop = server->loop;
  int err = 0;
  int family;

  uv_tcp_accept_t* req = server->tcp.serv.pending_accepts;

  if (!req) {
    /* No valid connections found, so we error out. */
    return WSAEWOULDBLOCK;
  }

  if (req->accept_socket == INVALID_SOCKET) {
    return WSAENOTCONN;
  }

  if (server->flags & UV_HANDLE_IPV6) {
    family = AF_INET6;
  } else {
    family = AF_INET;
  }

  err = uv__tcp_set_socket(client->loop,
                           client,
                           req->accept_socket,
                           family,
                           0);
  if (err) {
    closesocket(req->accept_socket);
  } else {
    uv__connection_init((uv_stream_t*) client);
    /* AcceptEx() implicitly binds the accepted socket. */
    client->flags |= UV_HANDLE_BOUND | UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
  }

  /* Prepare the req to pick up a new connection. */
  server->tcp.serv.pending_accepts = req->next_pending;
  req->next_pending = NULL;
  req->accept_socket = INVALID_SOCKET;

  if (!(server->flags & UV_HANDLE_CLOSING)) {
    /* Check if we're in the middle of changing the number of pending
     * accepts. */
    if (!(server->flags & UV_HANDLE_TCP_ACCEPT_STATE_CHANGING)) {
      uv__tcp_queue_accept(server, req);
    } else {
      /* We better be switching to a single pending accept. */
      assert(server->flags & UV_HANDLE_TCP_SINGLE_ACCEPT);

      server->tcp.serv.processed_accepts++;

      if (server->tcp.serv.processed_accepts >=
          uv_simultaneous_server_accepts) {
        server->tcp.serv.processed_accepts = 0;
        /*
         * All previously queued accept requests are now processed.
         * We now switch to queueing just a single accept.
         */
        uv__tcp_queue_accept(server, &server->tcp.serv.accept_reqs[0]);
        server->flags &= ~UV_HANDLE_TCP_ACCEPT_STATE_CHANGING;
        server->flags |= UV_HANDLE_TCP_SINGLE_ACCEPT;
      }
    }
  }

  loop->active_tcp_streams++;

  return err;
}


int uv__tcp_read_start(uv_tcp_t* handle,
                       uv_alloc_cb alloc_cb,
                       uv_read_cb read_cb) {
  uv_loop_t* loop = handle->loop;

  handle->flags |= UV_HANDLE_READING;
  handle->read_cb = read_cb;
  handle->alloc_cb = alloc_cb;
  INCREASE_ACTIVE_COUNT(loop, handle);

  /* If reading was stopped and then started again, there could still be a
   * read request pending. */
  if (!(handle->flags & UV_HANDLE_READ_PENDING)) {
    if (handle->flags & UV_HANDLE_EMULATE_IOCP &&
        handle->read_req.event_handle == NULL) {
      handle->read_req.event_handle = CreateEvent(NULL, 0, 0, NULL);
      if (handle->read_req.event_handle == NULL) {
        uv_fatal_error(GetLastError(), "CreateEvent");
      }
    }
    uv__tcp_queue_read(loop, handle);
  }

  return 0;
}
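
/* A minimal usage sketch (not part of libuv): the alloc/read callback pair
 * consumed by uv__tcp_read_start(). nread == 0 is the benign "nothing to
 * read right now" result produced by the zero-read path; UV_EOF and other
 * negative values end the stream.
 *
 *   static void on_alloc(uv_handle_t* h, size_t suggested, uv_buf_t* buf) {
 *     buf->base = malloc(suggested);
 *     buf->len = buf->base == NULL ? 0 : (unsigned int) suggested;
 *   }
 *
 *   static void on_read(uv_stream_t* s, ssize_t nread, const uv_buf_t* buf) {
 *     if (nread < 0)
 *       uv_close((uv_handle_t*) s, NULL);
 *     else if (nread > 0)
 *       ...consume nread bytes from buf->base...
 *     free(buf->base);
 *   }
 *
 *   uv_read_start((uv_stream_t*) &client, on_alloc, on_read);
 */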

static int uv__is_loopback(const struct sockaddr_storage* storage) {
  const struct sockaddr_in* in4;
  const struct sockaddr_in6* in6;
  int i;

  if (storage->ss_family == AF_INET) {
    in4 = (const struct sockaddr_in*) storage;
    return in4->sin_addr.S_un.S_un_b.s_b1 == 127;
  }
  if (storage->ss_family == AF_INET6) {
    in6 = (const struct sockaddr_in6*) storage;
    for (i = 0; i < 7; ++i) {
      if (in6->sin6_addr.u.Word[i] != 0)
        return 0;
    }
    return in6->sin6_addr.u.Word[7] == htons(1);
  }
  return 0;
}

/* Check if the Windows version is 10.0.16299 or later. */
static int uv__is_fast_loopback_fail_supported(void) {
  OSVERSIONINFOW os_info;
  if (!pRtlGetVersion)
    return 0;
  pRtlGetVersion(&os_info);
  if (os_info.dwMajorVersion < 10)
    return 0;
  if (os_info.dwMajorVersion > 10)
    return 1;
  if (os_info.dwMinorVersion > 0)
    return 1;
  return os_info.dwBuildNumber >= 16299;
}

static int uv__tcp_try_connect(uv_connect_t* req,
                               uv_tcp_t* handle,
                               const struct sockaddr* addr,
                               unsigned int addrlen,
                               uv_connect_cb cb) {
  uv_loop_t* loop = handle->loop;
  TCP_INITIAL_RTO_PARAMETERS retransmit_ioctl;
  const struct sockaddr* bind_addr;
  struct sockaddr_storage converted;
  BOOL success;
  DWORD bytes;
  int err;

  err = uv__convert_to_localhost_if_unspecified(addr, &converted);
  if (err)
    return err;

  if (handle->delayed_error != 0)
    goto out;

  if (!(handle->flags & UV_HANDLE_BOUND)) {
    if (addrlen == sizeof(uv_addr_ip4_any_)) {
      bind_addr = (const struct sockaddr*) &uv_addr_ip4_any_;
    } else if (addrlen == sizeof(uv_addr_ip6_any_)) {
      bind_addr = (const struct sockaddr*) &uv_addr_ip6_any_;
    } else {
      abort();
    }
    err = uv__tcp_try_bind(handle, bind_addr, addrlen, 0);
    if (err)
      return err;
    if (handle->delayed_error != 0)
      goto out;
  }

  if (!handle->tcp.conn.func_connectex) {
    if (!uv__get_connectex_function(handle->socket,
                                    &handle->tcp.conn.func_connectex)) {
      return WSAEAFNOSUPPORT;
    }
  }

  /* This makes connect() fail instantly if the target port on the localhost
   * is not reachable, instead of waiting for 2s. We do not care if this
   * fails. This only works on Windows version 10.0.16299 and later.
   */
  if (uv__is_fast_loopback_fail_supported() && uv__is_loopback(&converted)) {
    memset(&retransmit_ioctl, 0, sizeof(retransmit_ioctl));
    retransmit_ioctl.Rtt = TCP_INITIAL_RTO_NO_SYN_RETRANSMISSIONS;
    retransmit_ioctl.MaxSynRetransmissions =
        TCP_INITIAL_RTO_NO_SYN_RETRANSMISSIONS;
    WSAIoctl(handle->socket,
             SIO_TCP_INITIAL_RTO,
             &retransmit_ioctl,
             sizeof(retransmit_ioctl),
             NULL,
             0,
             &bytes,
             NULL,
             NULL);
  }

out:

  UV_REQ_INIT(req, UV_CONNECT);
  req->handle = (uv_stream_t*) handle;
  req->cb = cb;
  memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped));

  if (handle->delayed_error != 0) {
    /* Process the req without IOCP. */
    handle->reqs_pending++;
    REGISTER_HANDLE_REQ(loop, handle, req);
    uv__insert_pending_req(loop, (uv_req_t*) req);
    return 0;
  }

  success = handle->tcp.conn.func_connectex(handle->socket,
                                            (const struct sockaddr*) &converted,
                                            addrlen,
                                            NULL,
                                            0,
                                            &bytes,
                                            &req->u.io.overlapped);

  if (UV_SUCCEEDED_WITHOUT_IOCP(success)) {
    /* Process the req without IOCP. */
    handle->reqs_pending++;
    REGISTER_HANDLE_REQ(loop, handle, req);
    uv__insert_pending_req(loop, (uv_req_t*) req);
  } else if (UV_SUCCEEDED_WITH_IOCP(success)) {
    /* The req will be processed with IOCP. */
    handle->reqs_pending++;
    REGISTER_HANDLE_REQ(loop, handle, req);
  } else {
    return WSAGetLastError();
  }

  return 0;
}
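
/* A minimal usage sketch (not part of libuv): connecting through the public
 * uv_tcp_connect() wrapper, which reaches this function via uv__tcp_connect().
 * on_connect (and the on_alloc/on_read pair) are application-defined
 * callbacks.
 *
 *   static void on_connect(uv_connect_t* req, int status) {
 *     if (status == 0)
 *       uv_read_start(req->handle, on_alloc, on_read);
 *   }
 *
 *   uv_tcp_t client;
 *   uv_connect_t req;
 *   struct sockaddr_in addr;
 *
 *   uv_tcp_init(uv_default_loop(), &client);
 *   uv_ip4_addr("127.0.0.1", 8000, &addr);
 *   uv_tcp_connect(&req, &client, (const struct sockaddr*) &addr, on_connect);
 */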


int uv_tcp_getsockname(const uv_tcp_t* handle,
                       struct sockaddr* name,
                       int* namelen) {

  return uv__getsockpeername((const uv_handle_t*) handle,
                             getsockname,
                             name,
                             namelen,
                             handle->delayed_error);
}


int uv_tcp_getpeername(const uv_tcp_t* handle,
                       struct sockaddr* name,
                       int* namelen) {

  return uv__getsockpeername((const uv_handle_t*) handle,
                             getpeername,
                             name,
                             namelen,
                             handle->delayed_error);
}


int uv__tcp_write(uv_loop_t* loop,
                  uv_write_t* req,
                  uv_tcp_t* handle,
                  const uv_buf_t bufs[],
                  unsigned int nbufs,
                  uv_write_cb cb) {
  int result;
  DWORD bytes;

  UV_REQ_INIT(req, UV_WRITE);
  req->handle = (uv_stream_t*) handle;
  req->cb = cb;

  /* Prepare the overlapped structure. */
  memset(&(req->u.io.overlapped), 0, sizeof(req->u.io.overlapped));
  if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
    req->event_handle = CreateEvent(NULL, 0, 0, NULL);
    if (req->event_handle == NULL) {
      uv_fatal_error(GetLastError(), "CreateEvent");
    }
    req->u.io.overlapped.hEvent = (HANDLE) ((ULONG_PTR) req->event_handle | 1);
    req->wait_handle = INVALID_HANDLE_VALUE;
  }

  result = WSASend(handle->socket,
                   (WSABUF*) bufs,
                   nbufs,
                   &bytes,
                   0,
                   &req->u.io.overlapped,
                   NULL);

  if (UV_SUCCEEDED_WITHOUT_IOCP(result == 0)) {
    /* Request completed immediately. */
    req->u.io.queued_bytes = 0;
    handle->reqs_pending++;
    handle->stream.conn.write_reqs_pending++;
    REGISTER_HANDLE_REQ(loop, handle, req);
    uv__insert_pending_req(loop, (uv_req_t*) req);
  } else if (UV_SUCCEEDED_WITH_IOCP(result == 0)) {
    /* Request queued by the kernel. */
    req->u.io.queued_bytes = uv__count_bufs(bufs, nbufs);
    handle->reqs_pending++;
    handle->stream.conn.write_reqs_pending++;
    REGISTER_HANDLE_REQ(loop, handle, req);
    handle->write_queue_size += req->u.io.queued_bytes;
    if (handle->flags & UV_HANDLE_EMULATE_IOCP &&
        !RegisterWaitForSingleObject(&req->wait_handle,
            req->event_handle, post_write_completion, (void*) req,
            INFINITE, WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE)) {
      SET_REQ_ERROR(req, GetLastError());
      uv__insert_pending_req(loop, (uv_req_t*) req);
    }
  } else {
    /* Send failed due to an error; report it later. */
    req->u.io.queued_bytes = 0;
    handle->reqs_pending++;
    handle->stream.conn.write_reqs_pending++;
    REGISTER_HANDLE_REQ(loop, handle, req);
    SET_REQ_ERROR(req, WSAGetLastError());
    uv__insert_pending_req(loop, (uv_req_t*) req);
  }

  return 0;
}


int uv__tcp_try_write(uv_tcp_t* handle,
                      const uv_buf_t bufs[],
                      unsigned int nbufs) {
  int result;
  DWORD bytes;

  if (handle->stream.conn.write_reqs_pending > 0)
    return UV_EAGAIN;

  result = WSASend(handle->socket,
                   (WSABUF*) bufs,
                   nbufs,
                   &bytes,
                   0,
                   NULL,
                   NULL);

  if (result == SOCKET_ERROR)
    return uv_translate_sys_error(WSAGetLastError());
  else
    return bytes;
}
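
/* A minimal usage sketch (not part of libuv): uv_try_write() lands in
 * uv__tcp_try_write() above. It either writes synchronously and returns the
 * byte count, or returns UV_EAGAIN, in which case the caller falls back to a
 * queued uv_write() request (on_write is an application-defined callback).
 *
 *   uv_write_t write_req;
 *   uv_buf_t buf = uv_buf_init("hello", 5);
 *
 *   int n = uv_try_write((uv_stream_t*) &client, &buf, 1);
 *   if (n == UV_EAGAIN)
 *     uv_write(&write_req, (uv_stream_t*) &client, &buf, 1, on_write);
 */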


void uv__process_tcp_read_req(uv_loop_t* loop,
                              uv_tcp_t* handle,
                              uv_req_t* req) {
  DWORD bytes, flags, err;
  uv_buf_t buf;
  int count;

  assert(handle->type == UV_TCP);

  handle->flags &= ~UV_HANDLE_READ_PENDING;

  if (!REQ_SUCCESS(req)) {
    /* An error occurred doing the read. */
    if ((handle->flags & UV_HANDLE_READING) ||
        !(handle->flags & UV_HANDLE_ZERO_READ)) {
      handle->flags &= ~UV_HANDLE_READING;
      DECREASE_ACTIVE_COUNT(loop, handle);
      buf = (handle->flags & UV_HANDLE_ZERO_READ) ?
            uv_buf_init(NULL, 0) : handle->tcp.conn.read_buffer;

      err = GET_REQ_SOCK_ERROR(req);

      if (err == WSAECONNABORTED) {
        /* Turn WSAECONNABORTED into UV_ECONNRESET to be consistent with
         * Unix. */
        err = WSAECONNRESET;
      }
      handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);

      handle->read_cb((uv_stream_t*) handle,
                      uv_translate_sys_error(err),
                      &buf);
    }
  } else {
    if (!(handle->flags & UV_HANDLE_ZERO_READ)) {
      /* The read was done with a non-zero buffer length. */
      if (req->u.io.overlapped.InternalHigh > 0) {
        /* Successful read. */
        handle->read_cb((uv_stream_t*) handle,
                        req->u.io.overlapped.InternalHigh,
                        &handle->tcp.conn.read_buffer);
        /* Read again only if bytes == buf.len */
        if (req->u.io.overlapped.InternalHigh <
            handle->tcp.conn.read_buffer.len) {
          goto done;
        }
      } else {
        /* Connection closed. */
        if (handle->flags & UV_HANDLE_READING) {
          handle->flags &= ~UV_HANDLE_READING;
          DECREASE_ACTIVE_COUNT(loop, handle);
        }

        buf.base = 0;
        buf.len = 0;
        handle->read_cb((uv_stream_t*) handle,
                        UV_EOF,
                        &handle->tcp.conn.read_buffer);
        goto done;
      }
    }

    /* Do nonblocking reads until the buffer is empty. */
    count = 32;
    while ((handle->flags & UV_HANDLE_READING) && (count-- > 0)) {
      buf = uv_buf_init(NULL, 0);
      handle->alloc_cb((uv_handle_t*) handle, 65536, &buf);
      if (buf.base == NULL || buf.len == 0) {
        handle->read_cb((uv_stream_t*) handle, UV_ENOBUFS, &buf);
        break;
      }
      assert(buf.base != NULL);

      flags = 0;
      if (WSARecv(handle->socket,
                  (WSABUF*) &buf,
                  1,
                  &bytes,
                  &flags,
                  NULL,
                  NULL) != SOCKET_ERROR) {
        if (bytes > 0) {
          /* Successful read. */
          handle->read_cb((uv_stream_t*) handle, bytes, &buf);
          /* Read again only if bytes == buf.len */
          if (bytes < buf.len) {
            break;
          }
        } else {
          /* Connection closed. */
          handle->flags &= ~UV_HANDLE_READING;
          DECREASE_ACTIVE_COUNT(loop, handle);

          handle->read_cb((uv_stream_t*) handle, UV_EOF, &buf);
          break;
        }
      } else {
        err = WSAGetLastError();
        if (err == WSAEWOULDBLOCK) {
          /* The read buffer was completely empty; report a 0-byte read. */
          handle->read_cb((uv_stream_t*) handle, 0, &buf);
        } else {
          /* Ouch! Serious error. */
          handle->flags &= ~UV_HANDLE_READING;
          DECREASE_ACTIVE_COUNT(loop, handle);

          if (err == WSAECONNABORTED) {
            /* Turn WSAECONNABORTED into UV_ECONNRESET to be consistent with
             * Unix. */
            err = WSAECONNRESET;
          }
          handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);

          handle->read_cb((uv_stream_t*) handle,
                          uv_translate_sys_error(err),
                          &buf);
        }
        break;
      }
    }

done:
    /* Post another read if still reading and not closing. */
    if ((handle->flags & UV_HANDLE_READING) &&
        !(handle->flags & UV_HANDLE_READ_PENDING)) {
      uv__tcp_queue_read(loop, handle);
    }
  }

  DECREASE_PENDING_REQ_COUNT(handle);
}


void uv__process_tcp_write_req(uv_loop_t* loop,
                               uv_tcp_t* handle,
                               uv_write_t* req) {
  int err;

  assert(handle->type == UV_TCP);

  assert(handle->write_queue_size >= req->u.io.queued_bytes);
  handle->write_queue_size -= req->u.io.queued_bytes;

  UNREGISTER_HANDLE_REQ(loop, handle, req);

  if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
    if (req->wait_handle != INVALID_HANDLE_VALUE) {
      UnregisterWait(req->wait_handle);
      req->wait_handle = INVALID_HANDLE_VALUE;
    }
    if (req->event_handle != NULL) {
      CloseHandle(req->event_handle);
      req->event_handle = NULL;
    }
  }

  if (req->cb) {
    err = uv_translate_sys_error(GET_REQ_SOCK_ERROR(req));
    if (err == UV_ECONNABORTED) {
      /* Use UV_ECANCELED for consistency with Unix. */
      err = UV_ECANCELED;
    }
    req->cb(req, err);
  }

  handle->stream.conn.write_reqs_pending--;
  if (handle->stream.conn.write_reqs_pending == 0) {
    if (handle->flags & UV_HANDLE_CLOSING) {
      closesocket(handle->socket);
      handle->socket = INVALID_SOCKET;
    }
    if (handle->stream.conn.shutdown_req != NULL) {
      uv__want_endgame(loop, (uv_handle_t*) handle);
    }
  }

  DECREASE_PENDING_REQ_COUNT(handle);
}


void uv__process_tcp_accept_req(uv_loop_t* loop,
                                uv_tcp_t* handle,
                                uv_req_t* raw_req) {
  uv_tcp_accept_t* req = (uv_tcp_accept_t*) raw_req;
  int err;

  assert(handle->type == UV_TCP);

  /* If req->accept_socket is not a valid socket, then uv__tcp_queue_accept
   * must have failed. This is a serious error. We stop accepting connections
   * and report this error to the connection callback. */
  if (req->accept_socket == INVALID_SOCKET) {
    if (handle->flags & UV_HANDLE_LISTENING) {
      handle->flags &= ~UV_HANDLE_LISTENING;
      DECREASE_ACTIVE_COUNT(loop, handle);
      if (handle->stream.serv.connection_cb) {
        err = GET_REQ_SOCK_ERROR(req);
        handle->stream.serv.connection_cb((uv_stream_t*) handle,
                                          uv_translate_sys_error(err));
      }
    }
  } else if (REQ_SUCCESS(req) &&
             setsockopt(req->accept_socket,
                        SOL_SOCKET,
                        SO_UPDATE_ACCEPT_CONTEXT,
                        (char*) &handle->socket,
                        sizeof(handle->socket)) == 0) {
    req->next_pending = handle->tcp.serv.pending_accepts;
    handle->tcp.serv.pending_accepts = req;

    /* Accept and SO_UPDATE_ACCEPT_CONTEXT were successful. */
    if (handle->stream.serv.connection_cb) {
      handle->stream.serv.connection_cb((uv_stream_t*) handle, 0);
    }
  } else {
    /* Errors related to the accepted socket are ignored because the server
     * socket may still be healthy. If the server socket is broken,
     * uv__tcp_queue_accept will detect it. */
    closesocket(req->accept_socket);
    req->accept_socket = INVALID_SOCKET;
    if (handle->flags & UV_HANDLE_LISTENING) {
      uv__tcp_queue_accept(handle, req);
    }
  }

  DECREASE_PENDING_REQ_COUNT(handle);
}


void uv__process_tcp_connect_req(uv_loop_t* loop,
                                 uv_tcp_t* handle,
                                 uv_connect_t* req) {
  int err;

  assert(handle->type == UV_TCP);

  UNREGISTER_HANDLE_REQ(loop, handle, req);

  err = 0;
  if (handle->delayed_error) {
    /* To smooth over differences between Unixes, errors that were reported
     * synchronously on the first connect can be delayed until the next
     * tick, which is now. */
    err = handle->delayed_error;
    handle->delayed_error = 0;
  } else if (REQ_SUCCESS(req)) {
    if (handle->flags & UV_HANDLE_CLOSING) {
      /* Use UV_ECANCELED for consistency with Unix. */
      err = ERROR_OPERATION_ABORTED;
    } else if (setsockopt(handle->socket,
                          SOL_SOCKET,
                          SO_UPDATE_CONNECT_CONTEXT,
                          NULL,
                          0) == 0) {
      uv__connection_init((uv_stream_t*) handle);
      handle->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
      loop->active_tcp_streams++;
    } else {
      err = WSAGetLastError();
    }
  } else {
    err = GET_REQ_SOCK_ERROR(req);
  }
  req->cb(req, uv_translate_sys_error(err));

  DECREASE_PENDING_REQ_COUNT(handle);
}


int uv__tcp_xfer_export(uv_tcp_t* handle,
                        int target_pid,
                        uv__ipc_socket_xfer_type_t* xfer_type,
                        uv__ipc_socket_xfer_info_t* xfer_info) {
  if (handle->flags & UV_HANDLE_CONNECTION) {
    *xfer_type = UV__IPC_SOCKET_XFER_TCP_CONNECTION;
  } else {
    *xfer_type = UV__IPC_SOCKET_XFER_TCP_SERVER;
    /* We're about to share the socket with another process. Because this is
     * a listening socket, we assume that the other process will be accepting
     * connections on it. Thus, before sharing the socket with another
     * process, we call listen here in the parent process. */
    if (!(handle->flags & UV_HANDLE_LISTENING)) {
      if (!(handle->flags & UV_HANDLE_BOUND)) {
        return ERROR_NOT_SUPPORTED;
      }
      if (handle->delayed_error == 0 &&
          listen(handle->socket, SOMAXCONN) == SOCKET_ERROR) {
        handle->delayed_error = WSAGetLastError();
      }
    }
  }

  if (WSADuplicateSocketW(handle->socket, target_pid, &xfer_info->socket_info))
    return WSAGetLastError();
  xfer_info->delayed_error = handle->delayed_error;

  /* Mark the local copy of the handle as 'shared' so we behave in a way
   * that's friendly to the process(es) that we share the socket with. */
  handle->flags |= UV_HANDLE_SHARED_TCP_SOCKET;

  return 0;
}


int uv__tcp_xfer_import(uv_tcp_t* tcp,
                        uv__ipc_socket_xfer_type_t xfer_type,
                        uv__ipc_socket_xfer_info_t* xfer_info) {
  int err;
  SOCKET socket;

  assert(xfer_type == UV__IPC_SOCKET_XFER_TCP_SERVER ||
         xfer_type == UV__IPC_SOCKET_XFER_TCP_CONNECTION);

  socket = WSASocketW(FROM_PROTOCOL_INFO,
                      FROM_PROTOCOL_INFO,
                      FROM_PROTOCOL_INFO,
                      &xfer_info->socket_info,
                      0,
                      WSA_FLAG_OVERLAPPED);

  if (socket == INVALID_SOCKET) {
    return WSAGetLastError();
  }

  err = uv__tcp_set_socket(
      tcp->loop, tcp, socket, xfer_info->socket_info.iAddressFamily, 1);
  if (err) {
    closesocket(socket);
    return err;
  }

  tcp->delayed_error = xfer_info->delayed_error;
  tcp->flags |= UV_HANDLE_BOUND | UV_HANDLE_SHARED_TCP_SOCKET;

  if (xfer_type == UV__IPC_SOCKET_XFER_TCP_CONNECTION) {
    uv__connection_init((uv_stream_t*) tcp);
    tcp->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
  }

  tcp->loop->active_tcp_streams++;
  return 0;
}


int uv_tcp_nodelay(uv_tcp_t* handle, int enable) {
  int err;

  if (handle->socket != INVALID_SOCKET) {
    err = uv__tcp_nodelay(handle, handle->socket, enable);
    if (err)
      return uv_translate_sys_error(err);
  }

  if (enable) {
    handle->flags |= UV_HANDLE_TCP_NODELAY;
  } else {
    handle->flags &= ~UV_HANDLE_TCP_NODELAY;
  }

  return 0;
}


int uv_tcp_keepalive(uv_tcp_t* handle, int enable, unsigned int delay) {
  int err;

  if (handle->socket != INVALID_SOCKET) {
    err = uv__tcp_keepalive(handle, handle->socket, enable, delay);
    if (err)
      return uv_translate_sys_error(err);
  }

  if (enable) {
    handle->flags |= UV_HANDLE_TCP_KEEPALIVE;
  } else {
    handle->flags &= ~UV_HANDLE_TCP_KEEPALIVE;
  }

  /* TODO: Store delay if handle->socket isn't created yet. */

  return 0;
}
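
/* A minimal usage sketch (not part of libuv): both setters work whether or
 * not the socket exists yet. If it does not, only the enable flag is cached
 * on the handle and applied later in uv__tcp_set_socket(); a custom
 * keepalive delay is not yet cached (see the TODOs above).
 *
 *   uv_tcp_nodelay(&client, 1);        disables Nagle's algorithm
 *   uv_tcp_keepalive(&client, 1, 60);  probes after 60 seconds of idleness
 */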


int uv_tcp_simultaneous_accepts(uv_tcp_t* handle, int enable) {
  if (handle->flags & UV_HANDLE_CONNECTION) {
    return UV_EINVAL;
  }

  /* Check if we're already in the desired mode. */
  if ((enable && !(handle->flags & UV_HANDLE_TCP_SINGLE_ACCEPT)) ||
      (!enable && handle->flags & UV_HANDLE_TCP_SINGLE_ACCEPT)) {
    return 0;
  }

  /* Don't allow switching from single pending accept to many. */
  if (enable) {
    return UV_ENOTSUP;
  }

  /* Check if we're in the middle of changing the number of pending
   * accepts. */
  if (handle->flags & UV_HANDLE_TCP_ACCEPT_STATE_CHANGING) {
    return 0;
  }

  handle->flags |= UV_HANDLE_TCP_SINGLE_ACCEPT;

  /* Flip the changing flag if we have already queued multiple accepts. */
  if (handle->flags & UV_HANDLE_LISTENING) {
    handle->flags |= UV_HANDLE_TCP_ACCEPT_STATE_CHANGING;
  }

  return 0;
}


static void uv__tcp_try_cancel_reqs(uv_tcp_t* tcp) {
  SOCKET socket;
  int non_ifs_lsp;
  int reading;
  int writing;

  socket = tcp->socket;
  reading = tcp->flags & UV_HANDLE_READING;
  writing = tcp->stream.conn.write_reqs_pending > 0;
  if (!reading && !writing)
    return;

  /* TODO: in libuv v2, keep explicit track of write_reqs, so we can cancel
   * them each explicitly with CancelIoEx (like unix). */
  if (reading)
    CancelIoEx((HANDLE) socket, &tcp->read_req.u.io.overlapped);
  if (writing)
    CancelIo((HANDLE) socket);

  /* Check if we have any non-IFS LSPs stacked on top of TCP. */
  non_ifs_lsp = (tcp->flags & UV_HANDLE_IPV6) ? uv_tcp_non_ifs_lsp_ipv6 :
                                                uv_tcp_non_ifs_lsp_ipv4;

  /* If there are non-IFS LSPs then try to obtain a base handle for the
   * socket. This will always fail on Windows XP/3k. */
  if (non_ifs_lsp) {
    DWORD bytes;
    if (WSAIoctl(socket,
                 SIO_BASE_HANDLE,
                 NULL,
                 0,
                 &socket,
                 sizeof socket,
                 &bytes,
                 NULL,
                 NULL) != 0) {
      /* Failed. We can't do CancelIo. */
      return;
    }
  }

  assert(socket != 0 && socket != INVALID_SOCKET);

  if (socket != tcp->socket) {
    if (reading)
      CancelIoEx((HANDLE) socket, &tcp->read_req.u.io.overlapped);
    if (writing)
      CancelIo((HANDLE) socket);
  }
}


void uv__tcp_close(uv_loop_t* loop, uv_tcp_t* tcp) {
  if (tcp->flags & UV_HANDLE_CONNECTION) {
    uv__tcp_try_cancel_reqs(tcp);
    if (tcp->flags & UV_HANDLE_READING) {
      uv_read_stop((uv_stream_t*) tcp);
    }
  } else {
    if (tcp->tcp.serv.accept_reqs != NULL) {
      /* First close the incoming sockets to cancel the accept operations
       * before we free their resources. */
      unsigned int i;
      for (i = 0; i < uv_simultaneous_server_accepts; i++) {
        uv_tcp_accept_t* req = &tcp->tcp.serv.accept_reqs[i];
        if (req->accept_socket != INVALID_SOCKET) {
          closesocket(req->accept_socket);
          req->accept_socket = INVALID_SOCKET;
        }
      }
    }
    assert(!(tcp->flags & UV_HANDLE_READING));
  }

  if (tcp->flags & UV_HANDLE_LISTENING) {
    tcp->flags &= ~UV_HANDLE_LISTENING;
    DECREASE_ACTIVE_COUNT(loop, tcp);
  }

  /* If any overlapped req failed to cancel, calling `closesocket` now would
   * cause Win32 to send an RST packet. Try to avoid that for writes, if
   * possibly applicable, by waiting to process the completion notifications
   * first (which typically should be cancellations). There's not much we can
   * do about canceled reads, which also will generate an RST packet. */
  if (!(tcp->flags & UV_HANDLE_CONNECTION) ||
      tcp->stream.conn.write_reqs_pending == 0) {
    closesocket(tcp->socket);
    tcp->socket = INVALID_SOCKET;
  }

  tcp->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
  uv__handle_closing(tcp);

  if (tcp->reqs_pending == 0) {
    uv__want_endgame(tcp->loop, (uv_handle_t*) tcp);
  }
}


int uv_tcp_open(uv_tcp_t* handle, uv_os_sock_t sock) {
  WSAPROTOCOL_INFOW protocol_info;
  int opt_len;
  int err;
  struct sockaddr_storage saddr;
  int saddr_len;

  /* Detect the address family of the socket. */
  opt_len = (int) sizeof protocol_info;
  if (getsockopt(sock,
                 SOL_SOCKET,
                 SO_PROTOCOL_INFOW,
                 (char*) &protocol_info,
                 &opt_len) == SOCKET_ERROR) {
    return uv_translate_sys_error(GetLastError());
  }

  err = uv__tcp_set_socket(handle->loop,
                           handle,
                           sock,
                           protocol_info.iAddressFamily,
                           1);
  if (err) {
    return uv_translate_sys_error(err);
  }

  /* Support an already active socket. */
  saddr_len = sizeof(saddr);
  if (!uv_tcp_getsockname(handle, (struct sockaddr*) &saddr, &saddr_len)) {
    /* Socket is already bound. */
    handle->flags |= UV_HANDLE_BOUND;
    saddr_len = sizeof(saddr);
    if (!uv_tcp_getpeername(handle, (struct sockaddr*) &saddr, &saddr_len)) {
      /* Socket is already connected. */
      uv__connection_init((uv_stream_t*) handle);
      handle->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
    }
  }

  return 0;
}
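
/* A minimal usage sketch (not part of libuv): wrapping an existing Winsock
 * socket in a uv_tcp_t. The socket should be created with WSA_FLAG_OVERLAPPED
 * so it can be associated with the loop's completion port.
 *
 *   SOCKET s = WSASocketW(AF_INET, SOCK_STREAM, IPPROTO_TCP,
 *                         NULL, 0, WSA_FLAG_OVERLAPPED);
 *   uv_tcp_t wrapped;
 *
 *   uv_tcp_init(uv_default_loop(), &wrapped);
 *   if (uv_tcp_open(&wrapped, s) != 0)
 *     closesocket(s);
 */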


/* This function is an egress point, i.e. it returns libuv errors rather than
 * system errors.
 */
int uv__tcp_bind(uv_tcp_t* handle,
                 const struct sockaddr* addr,
                 unsigned int addrlen,
                 unsigned int flags) {
  int err;

  err = uv__tcp_try_bind(handle, addr, addrlen, flags);
  if (err)
    return uv_translate_sys_error(err);

  return 0;
}


/* This function is an egress point, i.e. it returns libuv errors rather than
 * system errors.
 */
int uv__tcp_connect(uv_connect_t* req,
                    uv_tcp_t* handle,
                    const struct sockaddr* addr,
                    unsigned int addrlen,
                    uv_connect_cb cb) {
  int err;

  err = uv__tcp_try_connect(req, handle, addr, addrlen, cb);
  if (err)
    return uv_translate_sys_error(err);

  return 0;
}

#ifndef WSA_FLAG_NO_HANDLE_INHERIT
/* Added in Windows 7 SP1. Specify this to avoid race conditions, but also
 * manually clear the inherit flag in case this failed. */
#define WSA_FLAG_NO_HANDLE_INHERIT 0x80
#endif

int uv_socketpair(int type,
                  int protocol,
                  uv_os_sock_t fds[2],
                  int flags0,
                  int flags1) {
  SOCKET server = INVALID_SOCKET;
  SOCKET client0 = INVALID_SOCKET;
  SOCKET client1 = INVALID_SOCKET;
  SOCKADDR_IN name;
  LPFN_ACCEPTEX func_acceptex;
  WSAOVERLAPPED overlap;
  char accept_buffer[sizeof(struct sockaddr_storage) * 2 + 32];
  int namelen;
  int err;
  DWORD bytes;
  DWORD flags;
  DWORD client0_flags = WSA_FLAG_NO_HANDLE_INHERIT;
  DWORD client1_flags = WSA_FLAG_NO_HANDLE_INHERIT;

  if (flags0 & UV_NONBLOCK_PIPE)
    client0_flags |= WSA_FLAG_OVERLAPPED;
  if (flags1 & UV_NONBLOCK_PIPE)
    client1_flags |= WSA_FLAG_OVERLAPPED;

  server = WSASocketW(AF_INET, type, protocol, NULL, 0,
                      WSA_FLAG_OVERLAPPED | WSA_FLAG_NO_HANDLE_INHERIT);
  if (server == INVALID_SOCKET)
    goto wsaerror;
  if (!SetHandleInformation((HANDLE) server, HANDLE_FLAG_INHERIT, 0))
    goto error;
  name.sin_family = AF_INET;
  name.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
  name.sin_port = 0;
  if (bind(server, (SOCKADDR*) &name, sizeof(name)) != 0)
    goto wsaerror;
  if (listen(server, 1) != 0)
    goto wsaerror;
  namelen = sizeof(name);
  if (getsockname(server, (SOCKADDR*) &name, &namelen) != 0)
    goto wsaerror;
  client0 = WSASocketW(AF_INET, type, protocol, NULL, 0, client0_flags);
  if (client0 == INVALID_SOCKET)
    goto wsaerror;
  if (!SetHandleInformation((HANDLE) client0, HANDLE_FLAG_INHERIT, 0))
    goto error;
  if (connect(client0, (SOCKADDR*) &name, sizeof(name)) != 0)
    goto wsaerror;
  client1 = WSASocketW(AF_INET, type, protocol, NULL, 0, client1_flags);
  if (client1 == INVALID_SOCKET)
    goto wsaerror;
  if (!SetHandleInformation((HANDLE) client1, HANDLE_FLAG_INHERIT, 0))
    goto error;
  if (!uv__get_acceptex_function(server, &func_acceptex)) {
    err = WSAEAFNOSUPPORT;
    goto cleanup;
  }
  memset(&overlap, 0, sizeof(overlap));
  if (!func_acceptex(server,
                     client1,
                     accept_buffer,
                     0,
                     sizeof(struct sockaddr_storage),
                     sizeof(struct sockaddr_storage),
                     &bytes,
                     &overlap)) {
    err = WSAGetLastError();
    if (err == ERROR_IO_PENDING) {
      /* The result should complete immediately, since we already called
       * connect, but empirically, we sometimes have to poll the kernel a
       * couple of times until it notices that. */
      while (!WSAGetOverlappedResult(client1, &overlap, &bytes, FALSE,
                                     &flags)) {
        err = WSAGetLastError();
        if (err != WSA_IO_INCOMPLETE)
          goto cleanup;
        SwitchToThread();
      }
    } else {
      goto cleanup;
    }
  }
  if (setsockopt(client1, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
                 (char*) &server, sizeof(server)) != 0) {
    goto wsaerror;
  }

  closesocket(server);

  fds[0] = client0;
  fds[1] = client1;

  return 0;

wsaerror:
  err = WSAGetLastError();
  goto cleanup;

error:
  err = GetLastError();
  goto cleanup;

cleanup:
  if (server != INVALID_SOCKET)
    closesocket(server);
  if (client0 != INVALID_SOCKET)
    closesocket(client0);
  if (client1 != INVALID_SOCKET)
    closesocket(client1);

  assert(err);
  return uv_translate_sys_error(err);
}
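
/* A minimal usage sketch (not part of libuv): creating a connected pair of
 * local TCP sockets, e.g. to hand one end to another component. Passing
 * UV_NONBLOCK_PIPE for flags0 makes fds[0] an overlapped (non-blocking)
 * socket; fds[1] stays a plain blocking socket here.
 *
 *   uv_os_sock_t fds[2];
 *
 *   if (uv_socketpair(SOCK_STREAM, 0, fds, UV_NONBLOCK_PIPE, 0) == 0) {
 *     ...use fds[0] on the loop, fds[1] elsewhere...
 *   }
 */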