1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2 *
3 * Permission is hereby granted, free of charge, to any person obtaining a copy
4 * of this software and associated documentation files (the "Software"), to
5 * deal in the Software without restriction, including without limitation the
6 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7 * sell copies of the Software, and to permit persons to whom the Software is
8 * furnished to do so, subject to the following conditions:
9 *
10 * The above copyright notice and this permission notice shall be included in
11 * all copies or substantial portions of the Software.
12 *
13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19 * IN THE SOFTWARE.
20 */
21
22 #include <assert.h>
23 #include <stdlib.h>
24
25 #include "uv.h"
26 #include "internal.h"
27 #include "handle-inl.h"
28 #include "stream-inl.h"
29 #include "req-inl.h"
30
31
32 /*
33 * Threshold of active tcp streams for which to preallocate tcp read buffers.
34 * (Due to node slab allocator performing poorly under this pattern,
35 * the optimization is temporarily disabled (threshold=0). This will be
36 * revisited once node allocator is improved.)
37 */
38 const unsigned int uv_active_tcp_streams_threshold = 0;
39
40 /*
41 * Number of simultaneous pending AcceptEx calls.
42 */
43 const unsigned int uv_simultaneous_server_accepts = 32;
44
45 /* A zero-size buffer for use by uv_tcp_read */
46 static char uv_zero_[] = "";
47
/* Enable or disable Nagle's algorithm (TCP_NODELAY) on `socket`.
 * Returns 0 on success or a Winsock error code. `handle` is unused. */
static int uv__tcp_nodelay(uv_tcp_t* handle, SOCKET socket, int enable) {
  int r;

  r = setsockopt(socket,
                 IPPROTO_TCP,
                 TCP_NODELAY,
                 (const char*) &enable,
                 sizeof enable);
  if (r == -1)
    return WSAGetLastError();

  return 0;
}
58
59
/* Toggle SO_KEEPALIVE on `socket`; when enabling, also set the keepalive
 * idle delay (TCP_KEEPALIVE). Returns 0 or a Winsock error code.
 * `handle` is unused. */
static int uv__tcp_keepalive(uv_tcp_t* handle, SOCKET socket, int enable, unsigned int delay) {
  int r;

  r = setsockopt(socket,
                 SOL_SOCKET,
                 SO_KEEPALIVE,
                 (const char*) &enable,
                 sizeof enable);
  if (r == -1)
    return WSAGetLastError();

  if (enable) {
    r = setsockopt(socket,
                   IPPROTO_TCP,
                   TCP_KEEPALIVE,
                   (const char*) &delay,
                   sizeof delay);
    if (r == -1)
      return WSAGetLastError();
  }

  return 0;
}
79
80
/* Take ownership of `socket` for `handle`: switch it to non-blocking mode,
 * make it non-inheritable, attach it to the loop's completion port, and apply
 * any TCP options (nodelay/keepalive) already requested on the handle.
 * Returns 0 on success, UV_EBUSY if the handle already owns a socket, or a
 * Windows error code. On failure the caller retains ownership of `socket`. */
static int uv_tcp_set_socket(uv_loop_t* loop,
                             uv_tcp_t* handle,
                             SOCKET socket,
                             int family,
                             int imported) {
  DWORD yes = 1;
  int non_ifs_lsp;
  int err;

  if (handle->socket != INVALID_SOCKET)
    return UV_EBUSY;

  /* Set the socket to nonblocking mode */
  if (ioctlsocket(socket, FIONBIO, &yes) == SOCKET_ERROR) {
    return WSAGetLastError();
  }

  /* Make the socket non-inheritable */
  if (!SetHandleInformation((HANDLE) socket, HANDLE_FLAG_INHERIT, 0))
    return GetLastError();

  /* Associate it with the I/O completion port. The socket itself is used as
   * the completion key. */
  if (CreateIoCompletionPort((HANDLE)socket,
                             loop->iocp,
                             (ULONG_PTR)socket,
                             0) == NULL) {
    if (imported) {
      /* An imported (shared) socket may already be associated with another
       * completion port; fall back to emulating IOCP with event waits. */
      handle->flags |= UV_HANDLE_EMULATE_IOCP;
    } else {
      return GetLastError();
    }
  }

  if (family == AF_INET6) {
    non_ifs_lsp = uv_tcp_non_ifs_lsp_ipv6;
  } else {
    non_ifs_lsp = uv_tcp_non_ifs_lsp_ipv4;
  }

  /* Skip the completion notification for synchronously-completed operations,
   * but only when no non-IFS LSP is installed: such LSPs break this. */
  if (!(handle->flags & UV_HANDLE_EMULATE_IOCP) && !non_ifs_lsp) {
    UCHAR sfcnm_flags =
        FILE_SKIP_SET_EVENT_ON_HANDLE | FILE_SKIP_COMPLETION_PORT_ON_SUCCESS;
    if (!SetFileCompletionNotificationModes((HANDLE) socket, sfcnm_flags))
      return GetLastError();
    handle->flags |= UV_HANDLE_SYNC_BYPASS_IOCP;
  }

  /* Re-apply options that were requested before a socket existed. */
  if (handle->flags & UV_HANDLE_TCP_NODELAY) {
    err = uv__tcp_nodelay(handle, socket, 1);
    if (err)
      return err;
  }

  /* TODO: Use stored delay. */
  if (handle->flags & UV_HANDLE_TCP_KEEPALIVE) {
    err = uv__tcp_keepalive(handle, socket, 1, 60);
    if (err)
      return err;
  }

  handle->socket = socket;

  if (family == AF_INET6) {
    handle->flags |= UV_HANDLE_IPV6;
  } else {
    assert(!(handle->flags & UV_HANDLE_IPV6));
  }

  return 0;
}
152
153
/* Initialize a TCP handle. The lower 8 bits of `flags` select the address
 * family; AF_UNSPEC defers socket creation until bind/connect time. Any
 * other flag bits are rejected. Returns 0 or UV_EINVAL / a translated
 * system error. */
int uv_tcp_init_ex(uv_loop_t* loop, uv_tcp_t* handle, unsigned int flags) {
  int domain;

  /* Use the lower 8 bits for the domain */
  domain = flags & 0xFF;
  if (domain != AF_INET && domain != AF_INET6 && domain != AF_UNSPEC)
    return UV_EINVAL;

  if (flags & ~0xFF)
    return UV_EINVAL;

  uv_stream_init(loop, (uv_stream_t*) handle, UV_TCP);
  handle->tcp.serv.accept_reqs = NULL;
  handle->tcp.serv.pending_accepts = NULL;
  handle->socket = INVALID_SOCKET;
  handle->reqs_pending = 0;
  handle->tcp.serv.func_acceptex = NULL;
  handle->tcp.conn.func_connectex = NULL;
  handle->tcp.serv.processed_accepts = 0;
  handle->delayed_error = 0;

  /* If anything fails beyond this point we need to remove the handle from
   * the handle queue, since it was added by uv__handle_init in uv_stream_init.
   */

  if (domain != AF_UNSPEC) {
    SOCKET sock;
    DWORD err;

    sock = socket(domain, SOCK_STREAM, 0);
    if (sock == INVALID_SOCKET) {
      err = WSAGetLastError();
      QUEUE_REMOVE(&handle->handle_queue);
      return uv_translate_sys_error(err);
    }

    err = uv_tcp_set_socket(handle->loop, handle, sock, domain, 0);
    if (err) {
      /* uv_tcp_set_socket failed without taking ownership; close the socket
       * ourselves. */
      closesocket(sock);
      QUEUE_REMOVE(&handle->handle_queue);
      return uv_translate_sys_error(err);
    }

  }

  return 0;
}
201
202
/* Initialize a TCP handle without creating a socket yet: AF_UNSPEC defers
 * socket creation to the first bind/connect/accept. */
int uv_tcp_init(uv_loop_t* loop, uv_tcp_t* handle) {
  return uv_tcp_init_ex(loop, handle, AF_UNSPEC);
}
206
207
/* Run deferred teardown for a TCP handle. First completes a pending shutdown
 * request once all writes have drained; then, once the handle is closing and
 * no requests remain pending, closes the socket and releases accept requests
 * and any emulated-IOCP wait/event handles. */
void uv_tcp_endgame(uv_loop_t* loop, uv_tcp_t* handle) {
  int err;
  unsigned int i;
  uv_tcp_accept_t* req;

  if (handle->flags & UV_HANDLE_CONNECTION &&
      handle->stream.conn.shutdown_req != NULL &&
      handle->stream.conn.write_reqs_pending == 0) {

    UNREGISTER_HANDLE_REQ(loop, handle, handle->stream.conn.shutdown_req);

    /* If the handle is already closing, report the shutdown as aborted
     * instead of touching the socket. */
    err = 0;
    if (handle->flags & UV_HANDLE_CLOSING) {
      err = ERROR_OPERATION_ABORTED;
    } else if (shutdown(handle->socket, SD_SEND) == SOCKET_ERROR) {
      err = WSAGetLastError();
    }

    if (handle->stream.conn.shutdown_req->cb) {
      handle->stream.conn.shutdown_req->cb(handle->stream.conn.shutdown_req,
                               uv_translate_sys_error(err));
    }

    handle->stream.conn.shutdown_req = NULL;
    DECREASE_PENDING_REQ_COUNT(handle);
    return;
  }

  if (handle->flags & UV_HANDLE_CLOSING &&
      handle->reqs_pending == 0) {
    assert(!(handle->flags & UV_HANDLE_CLOSED));

    if (!(handle->flags & UV_HANDLE_TCP_SOCKET_CLOSED)) {
      closesocket(handle->socket);
      handle->socket = INVALID_SOCKET;
      handle->flags |= UV_HANDLE_TCP_SOCKET_CLOSED;
    }

    /* Server handle: release the preallocated accept requests. All
     * uv_simultaneous_server_accepts entries were initialized by
     * uv_tcp_listen, so it is safe to walk the whole array. */
    if (!(handle->flags & UV_HANDLE_CONNECTION) && handle->tcp.serv.accept_reqs) {
      if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
        for (i = 0; i < uv_simultaneous_server_accepts; i++) {
          req = &handle->tcp.serv.accept_reqs[i];
          if (req->wait_handle != INVALID_HANDLE_VALUE) {
            UnregisterWait(req->wait_handle);
            req->wait_handle = INVALID_HANDLE_VALUE;
          }
          if (req->event_handle != NULL) {
            CloseHandle(req->event_handle);
            req->event_handle = NULL;
          }
        }
      }

      uv__free(handle->tcp.serv.accept_reqs);
      handle->tcp.serv.accept_reqs = NULL;
    }

    /* Connection handle with emulated IOCP: release read-wait resources. */
    if (handle->flags & UV_HANDLE_CONNECTION &&
        handle->flags & UV_HANDLE_EMULATE_IOCP) {
      if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) {
        UnregisterWait(handle->read_req.wait_handle);
        handle->read_req.wait_handle = INVALID_HANDLE_VALUE;
      }
      if (handle->read_req.event_handle != NULL) {
        CloseHandle(handle->read_req.event_handle);
        handle->read_req.event_handle = NULL;
      }
    }

    uv__handle_close(handle);
    loop->active_tcp_streams--;
  }
}
281
282
283 /* Unlike on Unix, here we don't set SO_REUSEADDR, because it doesn't just
284 * allow binding to addresses that are in use by sockets in TIME_WAIT, it
285 * effectively allows 'stealing' a port which is in use by another application.
286 *
287 * SO_EXCLUSIVEADDRUSE is also not good here because it does check all sockets,
288 * regardless of state, so we'd get an error even if the port is in use by a
289 * socket in TIME_WAIT state.
290 *
291 * See issue #1360.
292 *
293 */
/* Bind `handle` to `addr`, creating the socket first if the handle does not
 * own one yet. WSAEADDRINUSE is not reported immediately: it is stored in
 * handle->delayed_error and surfaced by a later listen()/connect(). Returns
 * 0 or a Windows error code. */
static int uv_tcp_try_bind(uv_tcp_t* handle,
                           const struct sockaddr* addr,
                           unsigned int addrlen,
                           unsigned int flags) {
  DWORD err;
  int r;

  if (handle->socket == INVALID_SOCKET) {
    SOCKET sock;

    /* Cannot set IPv6-only mode on non-IPv6 socket. */
    if ((flags & UV_TCP_IPV6ONLY) && addr->sa_family != AF_INET6)
      return ERROR_INVALID_PARAMETER;

    sock = socket(addr->sa_family, SOCK_STREAM, 0);
    if (sock == INVALID_SOCKET) {
      return WSAGetLastError();
    }

    err = uv_tcp_set_socket(handle->loop, handle, sock, addr->sa_family, 0);
    if (err) {
      /* uv_tcp_set_socket did not take ownership; close the socket here. */
      closesocket(sock);
      return err;
    }
  }

#ifdef IPV6_V6ONLY
  if (addr->sa_family == AF_INET6) {
    int on;

    on = (flags & UV_TCP_IPV6ONLY) != 0;

    /* TODO: how to handle errors? This may fail if there is no ipv4 stack
     * available, or when run on XP/2003 which have no support for dualstack
     * sockets. For now we're silently ignoring the error. */
    setsockopt(handle->socket,
               IPPROTO_IPV6,
               IPV6_V6ONLY,
               (const char*)&on,
               sizeof on);
  }
#endif

  r = bind(handle->socket, addr, addrlen);

  if (r == SOCKET_ERROR) {
    err = WSAGetLastError();
    if (err == WSAEADDRINUSE) {
      /* Some errors are not to be reported until connect() or listen() */
      handle->delayed_error = err;
    } else {
      return err;
    }
  }

  /* The handle is marked bound even with a delayed error, so a retry does
   * not bind a second time. */
  handle->flags |= UV_HANDLE_BOUND;

  return 0;
}
353
354
post_completion(void * context,BOOLEAN timed_out)355 static void CALLBACK post_completion(void* context, BOOLEAN timed_out) {
356 uv_req_t* req;
357 uv_tcp_t* handle;
358
359 req = (uv_req_t*) context;
360 assert(req != NULL);
361 handle = (uv_tcp_t*)req->data;
362 assert(handle != NULL);
363 assert(!timed_out);
364
365 if (!PostQueuedCompletionStatus(handle->loop->iocp,
366 req->u.io.overlapped.InternalHigh,
367 0,
368 &req->u.io.overlapped)) {
369 uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
370 }
371 }
372
373
post_write_completion(void * context,BOOLEAN timed_out)374 static void CALLBACK post_write_completion(void* context, BOOLEAN timed_out) {
375 uv_write_t* req;
376 uv_tcp_t* handle;
377
378 req = (uv_write_t*) context;
379 assert(req != NULL);
380 handle = (uv_tcp_t*)req->handle;
381 assert(handle != NULL);
382 assert(!timed_out);
383
384 if (!PostQueuedCompletionStatus(handle->loop->iocp,
385 req->u.io.overlapped.InternalHigh,
386 0,
387 &req->u.io.overlapped)) {
388 uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
389 }
390 }
391
392
/* Queue one AcceptEx call on a listening handle. On any failure path the
 * request is inserted as pending with an error set, so the error is reported
 * through the normal request-processing machinery rather than synchronously. */
static void uv_tcp_queue_accept(uv_tcp_t* handle, uv_tcp_accept_t* req) {
  uv_loop_t* loop = handle->loop;
  BOOL success;
  DWORD bytes;
  SOCKET accept_socket;
  short family;

  assert(handle->flags & UV_HANDLE_LISTENING);
  assert(req->accept_socket == INVALID_SOCKET);

  /* choose family and extension function */
  if (handle->flags & UV_HANDLE_IPV6) {
    family = AF_INET6;
  } else {
    family = AF_INET;
  }

  /* Open a socket for the accepted connection. */
  accept_socket = socket(family, SOCK_STREAM, 0);
  if (accept_socket == INVALID_SOCKET) {
    SET_REQ_ERROR(req, WSAGetLastError());
    uv_insert_pending_req(loop, (uv_req_t*)req);
    handle->reqs_pending++;
    return;
  }

  /* Make the socket non-inheritable */
  if (!SetHandleInformation((HANDLE) accept_socket, HANDLE_FLAG_INHERIT, 0)) {
    SET_REQ_ERROR(req, GetLastError());
    uv_insert_pending_req(loop, (uv_req_t*)req);
    handle->reqs_pending++;
    closesocket(accept_socket);
    return;
  }

  /* Prepare the overlapped structure. */
  memset(&(req->u.io.overlapped), 0, sizeof(req->u.io.overlapped));
  if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
    assert(req->event_handle != NULL);
    /* Setting the lowest bit tells the kernel not to also post the
     * completion to the IOCP. */
    req->u.io.overlapped.hEvent = (HANDLE) ((ULONG_PTR) req->event_handle | 1);
  }

  success = handle->tcp.serv.func_acceptex(handle->socket,
                                          accept_socket,
                                          (void*)req->accept_buffer,
                                          0,
                                          sizeof(struct sockaddr_storage),
                                          sizeof(struct sockaddr_storage),
                                          &bytes,
                                          &req->u.io.overlapped);

  if (UV_SUCCEEDED_WITHOUT_IOCP(success)) {
    /* Process the req without IOCP. */
    req->accept_socket = accept_socket;
    handle->reqs_pending++;
    uv_insert_pending_req(loop, (uv_req_t*)req);
  } else if (UV_SUCCEEDED_WITH_IOCP(success)) {
    /* The req will be processed with IOCP. */
    req->accept_socket = accept_socket;
    handle->reqs_pending++;
    /* In emulated-IOCP mode, register a wait that forwards the completion
     * event to the completion port. */
    if (handle->flags & UV_HANDLE_EMULATE_IOCP &&
        req->wait_handle == INVALID_HANDLE_VALUE &&
        !RegisterWaitForSingleObject(&req->wait_handle,
          req->event_handle, post_completion, (void*) req,
          INFINITE, WT_EXECUTEINWAITTHREAD)) {
      SET_REQ_ERROR(req, GetLastError());
      uv_insert_pending_req(loop, (uv_req_t*)req);
    }
  } else {
    /* Make this req pending reporting an error. */
    SET_REQ_ERROR(req, WSAGetLastError());
    uv_insert_pending_req(loop, (uv_req_t*)req);
    handle->reqs_pending++;
    /* Destroy the preallocated client socket. */
    closesocket(accept_socket);
    /* Destroy the event handle */
    if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
      CloseHandle(req->event_handle);
      req->event_handle = NULL;
    }
  }
}
475
476
/* Start an overlapped read on `handle`. When few streams are active a real
 * buffer is allocated up front via alloc_cb; otherwise a zero-length read is
 * issued (completion merely signals readability) and the buffer is allocated
 * later in uv_process_tcp_read_req. All result paths mirror the accept path:
 * errors are delivered through a pending request, not synchronously.
 *
 * Fix: the overlapped structure used to be memset twice (once before the
 * buffer selection and again after it); the first memset was redundant and
 * has been removed. */
static void uv_tcp_queue_read(uv_loop_t* loop, uv_tcp_t* handle) {
  uv_read_t* req;
  uv_buf_t buf;
  int result;
  DWORD bytes, flags;

  assert(handle->flags & UV_HANDLE_READING);
  assert(!(handle->flags & UV_HANDLE_READ_PENDING));

  req = &handle->read_req;

  /*
   * Preallocate a read buffer if the number of active streams is below
   * the threshold.
   */
  if (loop->active_tcp_streams < uv_active_tcp_streams_threshold) {
    handle->flags &= ~UV_HANDLE_ZERO_READ;
    handle->tcp.conn.read_buffer = uv_buf_init(NULL, 0);
    handle->alloc_cb((uv_handle_t*) handle, 65536, &handle->tcp.conn.read_buffer);
    if (handle->tcp.conn.read_buffer.base == NULL ||
        handle->tcp.conn.read_buffer.len == 0) {
      /* The user failed to supply a buffer; report ENOBUFS and bail out. */
      handle->read_cb((uv_stream_t*) handle, UV_ENOBUFS, &handle->tcp.conn.read_buffer);
      return;
    }
    assert(handle->tcp.conn.read_buffer.base != NULL);
    buf = handle->tcp.conn.read_buffer;
  } else {
    /* Zero-length read: no buffer is committed until data is available. */
    handle->flags |= UV_HANDLE_ZERO_READ;
    buf.base = (char*) &uv_zero_;
    buf.len = 0;
  }

  /* Prepare the overlapped structure. */
  memset(&(req->u.io.overlapped), 0, sizeof(req->u.io.overlapped));
  if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
    assert(req->event_handle != NULL);
    /* Setting the lowest bit tells the kernel not to also post the
     * completion to the IOCP. */
    req->u.io.overlapped.hEvent = (HANDLE) ((ULONG_PTR) req->event_handle | 1);
  }

  flags = 0;
  result = WSARecv(handle->socket,
                   (WSABUF*)&buf,
                   1,
                   &bytes,
                   &flags,
                   &req->u.io.overlapped,
                   NULL);

  if (UV_SUCCEEDED_WITHOUT_IOCP(result == 0)) {
    /* Process the req without IOCP. */
    handle->flags |= UV_HANDLE_READ_PENDING;
    req->u.io.overlapped.InternalHigh = bytes;
    handle->reqs_pending++;
    uv_insert_pending_req(loop, (uv_req_t*)req);
  } else if (UV_SUCCEEDED_WITH_IOCP(result == 0)) {
    /* The req will be processed with IOCP. */
    handle->flags |= UV_HANDLE_READ_PENDING;
    handle->reqs_pending++;
    if (handle->flags & UV_HANDLE_EMULATE_IOCP &&
        req->wait_handle == INVALID_HANDLE_VALUE &&
        !RegisterWaitForSingleObject(&req->wait_handle,
          req->event_handle, post_completion, (void*) req,
          INFINITE, WT_EXECUTEINWAITTHREAD)) {
      SET_REQ_ERROR(req, GetLastError());
      uv_insert_pending_req(loop, (uv_req_t*)req);
    }
  } else {
    /* Make this req pending reporting an error. */
    SET_REQ_ERROR(req, WSAGetLastError());
    uv_insert_pending_req(loop, (uv_req_t*)req);
    handle->reqs_pending++;
  }
}
551
552
/* Close the handle with a TCP RST: enable SO_LINGER with a zero timeout so
 * closesocket() resets the connection, then start a normal uv_close.
 * Rejected (UV_EINVAL) if a graceful shutdown is already in progress. */
int uv_tcp_close_reset(uv_tcp_t* handle, uv_close_cb close_cb) {
  struct linger l = { 1, 0 };

  /* Disallow setting SO_LINGER to zero due to some platform inconsistencies */
  if (handle->flags & UV_HANDLE_SHUTTING)
    return UV_EINVAL;

  if (setsockopt(handle->socket,
                 SOL_SOCKET,
                 SO_LINGER,
                 (const char*) &l,
                 sizeof(l)) != 0)
    return uv_translate_sys_error(WSAGetLastError());

  uv_close((uv_handle_t*) handle, close_cb);
  return 0;
}
566
567
/* Start listening for incoming connections. Binds to 0.0.0.0 first if the
 * handle is unbound, loads the AcceptEx extension function, calls listen()
 * and queues the initial batch of accept requests. Returns 0 or a Winsock
 * error code (note: sys error, translated by the caller in uv-common). */
int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) {
  unsigned int i, simultaneous_accepts;
  uv_tcp_accept_t* req;
  int err;

  assert(backlog > 0);

  /* Already listening: just swap in the new connection callback. */
  if (handle->flags & UV_HANDLE_LISTENING) {
    handle->stream.serv.connection_cb = cb;
  }

  if (handle->flags & UV_HANDLE_READING) {
    return WSAEISCONN;
  }

  /* Surface an error that bind() deferred (e.g. WSAEADDRINUSE). */
  if (handle->delayed_error) {
    return handle->delayed_error;
  }

  if (!(handle->flags & UV_HANDLE_BOUND)) {
    err = uv_tcp_try_bind(handle,
                          (const struct sockaddr*) &uv_addr_ip4_any_,
                          sizeof(uv_addr_ip4_any_),
                          0);
    if (err)
      return err;
    if (handle->delayed_error)
      return handle->delayed_error;
  }

  if (!handle->tcp.serv.func_acceptex) {
    if (!uv_get_acceptex_function(handle->socket, &handle->tcp.serv.func_acceptex)) {
      return WSAEAFNOSUPPORT;
    }
  }

  /* A shared socket was already put into listening state by its creator. */
  if (!(handle->flags & UV_HANDLE_SHARED_TCP_SOCKET) &&
      listen(handle->socket, backlog) == SOCKET_ERROR) {
    return WSAGetLastError();
  }

  handle->flags |= UV_HANDLE_LISTENING;
  handle->stream.serv.connection_cb = cb;
  INCREASE_ACTIVE_COUNT(loop, handle);

  simultaneous_accepts = handle->flags & UV_HANDLE_TCP_SINGLE_ACCEPT ? 1
    : uv_simultaneous_server_accepts;

  if (handle->tcp.serv.accept_reqs == NULL) {
    /* Always allocate the full array; endgame frees all of them. */
    handle->tcp.serv.accept_reqs =
      uv__malloc(uv_simultaneous_server_accepts * sizeof(uv_tcp_accept_t));
    if (!handle->tcp.serv.accept_reqs) {
      uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
    }

    for (i = 0; i < simultaneous_accepts; i++) {
      req = &handle->tcp.serv.accept_reqs[i];
      UV_REQ_INIT(req, UV_ACCEPT);
      req->accept_socket = INVALID_SOCKET;
      req->data = handle;

      req->wait_handle = INVALID_HANDLE_VALUE;
      if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
        req->event_handle = CreateEvent(NULL, 0, 0, NULL);
        if (req->event_handle == NULL) {
          uv_fatal_error(GetLastError(), "CreateEvent");
        }
      } else {
        req->event_handle = NULL;
      }

      uv_tcp_queue_accept(handle, req);
    }

    /* Initialize other unused requests too, because uv_tcp_endgame doesn't
     * know how many requests were initialized, so it will try to clean up
     * {uv_simultaneous_server_accepts} requests. */
    for (i = simultaneous_accepts; i < uv_simultaneous_server_accepts; i++) {
      req = &handle->tcp.serv.accept_reqs[i];
      UV_REQ_INIT(req, UV_ACCEPT);
      req->accept_socket = INVALID_SOCKET;
      req->data = handle;
      req->wait_handle = INVALID_HANDLE_VALUE;
      req->event_handle = NULL;
    }
  }

  return 0;
}
657
658
/* Hand the oldest pending accepted connection over to `client`, then requeue
 * the accept request (or, while transitioning to single-accept mode, count
 * it off until all outstanding requests have drained). Returns 0 or a
 * Winsock error code. */
int uv_tcp_accept(uv_tcp_t* server, uv_tcp_t* client) {
  uv_loop_t* loop = server->loop;
  int err = 0;
  int family;

  uv_tcp_accept_t* req = server->tcp.serv.pending_accepts;

  if (!req) {
    /* No valid connections found, so we error out. */
    return WSAEWOULDBLOCK;
  }

  if (req->accept_socket == INVALID_SOCKET) {
    return WSAENOTCONN;
  }

  if (server->flags & UV_HANDLE_IPV6) {
    family = AF_INET6;
  } else {
    family = AF_INET;
  }

  err = uv_tcp_set_socket(client->loop,
                          client,
                          req->accept_socket,
                          family,
                          0);
  if (err) {
    closesocket(req->accept_socket);
  } else {
    uv_connection_init((uv_stream_t*) client);
    /* AcceptEx() implicitly binds the accepted socket. */
    client->flags |= UV_HANDLE_BOUND | UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
  }

  /* Prepare the req to pick up a new connection */
  server->tcp.serv.pending_accepts = req->next_pending;
  req->next_pending = NULL;
  req->accept_socket = INVALID_SOCKET;

  if (!(server->flags & UV_HANDLE_CLOSING)) {
    /* Check if we're in a middle of changing the number of pending accepts. */
    if (!(server->flags & UV_HANDLE_TCP_ACCEPT_STATE_CHANGING)) {
      uv_tcp_queue_accept(server, req);
    } else {
      /* We better be switching to a single pending accept. */
      assert(server->flags & UV_HANDLE_TCP_SINGLE_ACCEPT);

      server->tcp.serv.processed_accepts++;

      if (server->tcp.serv.processed_accepts >= uv_simultaneous_server_accepts) {
        server->tcp.serv.processed_accepts = 0;
        /*
         * All previously queued accept requests are now processed.
         * We now switch to queueing just a single accept.
         */
        uv_tcp_queue_accept(server, &server->tcp.serv.accept_reqs[0]);
        server->flags &= ~UV_HANDLE_TCP_ACCEPT_STATE_CHANGING;
        server->flags |= UV_HANDLE_TCP_SINGLE_ACCEPT;
      }
    }
  }

  loop->active_tcp_streams++;

  return err;
}
726
727
/* Start reading on a connected handle: record the callbacks and queue an
 * overlapped read unless one is already outstanding. In emulated-IOCP mode
 * the read request's event handle is created lazily here. Always returns 0. */
int uv_tcp_read_start(uv_tcp_t* handle, uv_alloc_cb alloc_cb,
    uv_read_cb read_cb) {
  uv_loop_t* loop = handle->loop;

  handle->flags |= UV_HANDLE_READING;
  handle->read_cb = read_cb;
  handle->alloc_cb = alloc_cb;
  INCREASE_ACTIVE_COUNT(loop, handle);

  /* If reading was stopped and then started again, there could still be a read
   * request pending. */
  if (!(handle->flags & UV_HANDLE_READ_PENDING)) {
    if (handle->flags & UV_HANDLE_EMULATE_IOCP &&
        handle->read_req.event_handle == NULL) {
      handle->read_req.event_handle = CreateEvent(NULL, 0, 0, NULL);
      if (handle->read_req.event_handle == NULL) {
        uv_fatal_error(GetLastError(), "CreateEvent");
      }
    }
    uv_tcp_queue_read(loop, handle);
  }

  return 0;
}
752
753
/* Start an asynchronous connect via ConnectEx. Rewrites unspecified
 * addresses to localhost, binds implicitly to the wildcard address if
 * needed (ConnectEx requires a bound socket), and registers the request.
 * Returns 0 or a Winsock error code. */
static int uv_tcp_try_connect(uv_connect_t* req,
                              uv_tcp_t* handle,
                              const struct sockaddr* addr,
                              unsigned int addrlen,
                              uv_connect_cb cb) {
  uv_loop_t* loop = handle->loop;
  const struct sockaddr* bind_addr;
  struct sockaddr_storage converted;
  BOOL success;
  DWORD bytes;
  int err;

  err = uv__convert_to_localhost_if_unspecified(addr, &converted);
  if (err)
    return err;

  /* Surface an error that bind() deferred (e.g. WSAEADDRINUSE). */
  if (handle->delayed_error) {
    return handle->delayed_error;
  }

  if (!(handle->flags & UV_HANDLE_BOUND)) {
    /* The address family is inferred from addrlen; any other length is a
     * caller bug. */
    if (addrlen == sizeof(uv_addr_ip4_any_)) {
      bind_addr = (const struct sockaddr*) &uv_addr_ip4_any_;
    } else if (addrlen == sizeof(uv_addr_ip6_any_)) {
      bind_addr = (const struct sockaddr*) &uv_addr_ip6_any_;
    } else {
      abort();
    }
    err = uv_tcp_try_bind(handle, bind_addr, addrlen, 0);
    if (err)
      return err;
    if (handle->delayed_error)
      return handle->delayed_error;
  }

  if (!handle->tcp.conn.func_connectex) {
    if (!uv_get_connectex_function(handle->socket, &handle->tcp.conn.func_connectex)) {
      return WSAEAFNOSUPPORT;
    }
  }

  UV_REQ_INIT(req, UV_CONNECT);
  req->handle = (uv_stream_t*) handle;
  req->cb = cb;
  memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped));

  success = handle->tcp.conn.func_connectex(handle->socket,
                                            (const struct sockaddr*) &converted,
                                            addrlen,
                                            NULL,
                                            0,
                                            &bytes,
                                            &req->u.io.overlapped);

  if (UV_SUCCEEDED_WITHOUT_IOCP(success)) {
    /* Process the req without IOCP. */
    handle->reqs_pending++;
    REGISTER_HANDLE_REQ(loop, handle, req);
    uv_insert_pending_req(loop, (uv_req_t*)req);
  } else if (UV_SUCCEEDED_WITH_IOCP(success)) {
    /* The req will be processed with IOCP. */
    handle->reqs_pending++;
    REGISTER_HANDLE_REQ(loop, handle, req);
  } else {
    return WSAGetLastError();
  }

  return 0;
}
823
824
uv_tcp_getsockname(const uv_tcp_t * handle,struct sockaddr * name,int * namelen)825 int uv_tcp_getsockname(const uv_tcp_t* handle,
826 struct sockaddr* name,
827 int* namelen) {
828
829 return uv__getsockpeername((const uv_handle_t*) handle,
830 getsockname,
831 name,
832 namelen,
833 handle->delayed_error);
834 }
835
836
uv_tcp_getpeername(const uv_tcp_t * handle,struct sockaddr * name,int * namelen)837 int uv_tcp_getpeername(const uv_tcp_t* handle,
838 struct sockaddr* name,
839 int* namelen) {
840
841 return uv__getsockpeername((const uv_handle_t*) handle,
842 getpeername,
843 name,
844 namelen,
845 handle->delayed_error);
846 }
847
848
/* Queue an overlapped WSASend of `bufs` on `handle`. Three outcomes:
 * completed synchronously (bypass IOCP), queued with the kernel (completion
 * arrives via IOCP or, in emulated mode, via a registered wait), or failed
 * (the error is reported later through the pending request). Always
 * returns 0; errors surface through the write callback. */
int uv_tcp_write(uv_loop_t* loop,
                 uv_write_t* req,
                 uv_tcp_t* handle,
                 const uv_buf_t bufs[],
                 unsigned int nbufs,
                 uv_write_cb cb) {
  int result;
  DWORD bytes;

  UV_REQ_INIT(req, UV_WRITE);
  req->handle = (uv_stream_t*) handle;
  req->cb = cb;

  /* Prepare the overlapped structure. */
  memset(&(req->u.io.overlapped), 0, sizeof(req->u.io.overlapped));
  if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
    req->event_handle = CreateEvent(NULL, 0, 0, NULL);
    if (req->event_handle == NULL) {
      uv_fatal_error(GetLastError(), "CreateEvent");
    }
    /* Setting the lowest bit tells the kernel not to also post the
     * completion to the IOCP. */
    req->u.io.overlapped.hEvent = (HANDLE) ((ULONG_PTR) req->event_handle | 1);
    req->wait_handle = INVALID_HANDLE_VALUE;
  }

  result = WSASend(handle->socket,
                   (WSABUF*) bufs,
                   nbufs,
                   &bytes,
                   0,
                   &req->u.io.overlapped,
                   NULL);

  if (UV_SUCCEEDED_WITHOUT_IOCP(result == 0)) {
    /* Request completed immediately. */
    req->u.io.queued_bytes = 0;
    handle->reqs_pending++;
    handle->stream.conn.write_reqs_pending++;
    REGISTER_HANDLE_REQ(loop, handle, req);
    uv_insert_pending_req(loop, (uv_req_t*) req);
  } else if (UV_SUCCEEDED_WITH_IOCP(result == 0)) {
    /* Request queued by the kernel. */
    req->u.io.queued_bytes = uv__count_bufs(bufs, nbufs);
    handle->reqs_pending++;
    handle->stream.conn.write_reqs_pending++;
    REGISTER_HANDLE_REQ(loop, handle, req);
    handle->write_queue_size += req->u.io.queued_bytes;
    if (handle->flags & UV_HANDLE_EMULATE_IOCP &&
        !RegisterWaitForSingleObject(&req->wait_handle,
          req->event_handle, post_write_completion, (void*) req,
          INFINITE, WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE)) {
      SET_REQ_ERROR(req, GetLastError());
      uv_insert_pending_req(loop, (uv_req_t*)req);
    }
  } else {
    /* Send failed due to an error, report it later */
    req->u.io.queued_bytes = 0;
    handle->reqs_pending++;
    handle->stream.conn.write_reqs_pending++;
    REGISTER_HANDLE_REQ(loop, handle, req);
    SET_REQ_ERROR(req, WSAGetLastError());
    uv_insert_pending_req(loop, (uv_req_t*) req);
  }

  return 0;
}
914
915
/* Attempt a synchronous (non-overlapped) send. Returns the number of bytes
 * written, UV_EAGAIN if overlapped writes are still in flight (ordering
 * could not be guaranteed), or a translated error code. */
int uv__tcp_try_write(uv_tcp_t* handle,
                      const uv_buf_t bufs[],
                      unsigned int nbufs) {
  DWORD bytes;

  if (handle->stream.conn.write_reqs_pending > 0)
    return UV_EAGAIN;

  if (WSASend(handle->socket,
              (WSABUF*) bufs,
              nbufs,
              &bytes,
              0,
              NULL,
              NULL) == SOCKET_ERROR)
    return uv_translate_sys_error(WSAGetLastError());

  return bytes;
}
938
939
/* Process a completed read request. For a buffered read, deliver the data
 * (or EOF); then, and for zero-length reads, drain the socket with
 * non-blocking WSARecv calls (at most 32 per invocation to avoid starving
 * other handles) before queueing the next overlapped read. */
void uv_process_tcp_read_req(uv_loop_t* loop, uv_tcp_t* handle,
    uv_req_t* req) {
  DWORD bytes, flags, err;
  uv_buf_t buf;
  int count;

  assert(handle->type == UV_TCP);

  handle->flags &= ~UV_HANDLE_READ_PENDING;

  if (!REQ_SUCCESS(req)) {
    /* An error occurred doing the read. */
    if ((handle->flags & UV_HANDLE_READING) ||
        !(handle->flags & UV_HANDLE_ZERO_READ)) {
      handle->flags &= ~UV_HANDLE_READING;
      DECREASE_ACTIVE_COUNT(loop, handle);
      /* Zero reads delivered no user buffer, so report an empty one. */
      buf = (handle->flags & UV_HANDLE_ZERO_READ) ?
            uv_buf_init(NULL, 0) : handle->tcp.conn.read_buffer;

      err = GET_REQ_SOCK_ERROR(req);

      if (err == WSAECONNABORTED) {
        /* Turn WSAECONNABORTED into UV_ECONNRESET to be consistent with Unix.
         */
        err = WSAECONNRESET;
      }

      handle->read_cb((uv_stream_t*)handle,
                      uv_translate_sys_error(err),
                      &buf);
    }
  } else {
    if (!(handle->flags & UV_HANDLE_ZERO_READ)) {
      /* The read was done with a non-zero buffer length. */
      if (req->u.io.overlapped.InternalHigh > 0) {
        /* Successful read */
        handle->read_cb((uv_stream_t*)handle,
                        req->u.io.overlapped.InternalHigh,
                        &handle->tcp.conn.read_buffer);
        /* Read again only if bytes == buf.len */
        if (req->u.io.overlapped.InternalHigh < handle->tcp.conn.read_buffer.len) {
          goto done;
        }
      } else {
        /* Connection closed */
        if (handle->flags & UV_HANDLE_READING) {
          handle->flags &= ~UV_HANDLE_READING;
          DECREASE_ACTIVE_COUNT(loop, handle);
        }
        handle->flags &= ~UV_HANDLE_READABLE;

        buf.base = 0;
        buf.len = 0;
        handle->read_cb((uv_stream_t*)handle, UV_EOF, &handle->tcp.conn.read_buffer);
        goto done;
      }
    }

    /* Do nonblocking reads until the buffer is empty */
    count = 32;
    while ((handle->flags & UV_HANDLE_READING) && (count-- > 0)) {
      buf = uv_buf_init(NULL, 0);
      handle->alloc_cb((uv_handle_t*) handle, 65536, &buf);
      if (buf.base == NULL || buf.len == 0) {
        handle->read_cb((uv_stream_t*) handle, UV_ENOBUFS, &buf);
        break;
      }
      assert(buf.base != NULL);

      flags = 0;
      if (WSARecv(handle->socket,
                  (WSABUF*)&buf,
                  1,
                  &bytes,
                  &flags,
                  NULL,
                  NULL) != SOCKET_ERROR) {
        if (bytes > 0) {
          /* Successful read */
          handle->read_cb((uv_stream_t*)handle, bytes, &buf);
          /* Read again only if bytes == buf.len */
          if (bytes < buf.len) {
            break;
          }
        } else {
          /* Connection closed */
          handle->flags &= ~(UV_HANDLE_READING | UV_HANDLE_READABLE);
          DECREASE_ACTIVE_COUNT(loop, handle);

          handle->read_cb((uv_stream_t*)handle, UV_EOF, &buf);
          break;
        }
      } else {
        err = WSAGetLastError();
        if (err == WSAEWOULDBLOCK) {
          /* Read buffer was completely empty, report a 0-byte read. */
          handle->read_cb((uv_stream_t*)handle, 0, &buf);
        } else {
          /* Ouch! serious error. */
          handle->flags &= ~UV_HANDLE_READING;
          DECREASE_ACTIVE_COUNT(loop, handle);

          if (err == WSAECONNABORTED) {
            /* Turn WSAECONNABORTED into UV_ECONNRESET to be consistent with
             * Unix. */
            err = WSAECONNRESET;
          }

          handle->read_cb((uv_stream_t*)handle,
                          uv_translate_sys_error(err),
                          &buf);
        }
        break;
      }
    }

done:
    /* Post another read if still reading and not closing. */
    if ((handle->flags & UV_HANDLE_READING) &&
        !(handle->flags & UV_HANDLE_READ_PENDING)) {
      uv_tcp_queue_read(loop, handle);
    }
  }

  DECREASE_PENDING_REQ_COUNT(handle);
}
1066
1067
/* Completion handler for a tcp write request: runs on the loop thread after
 * the overlapped send (or its emulated-IOCP wait) has finished. Releases the
 * per-request resources, invokes the user callback with a libuv error code
 * and, when this was the last outstanding write while a shutdown request is
 * pending, kicks off endgame processing. */
void uv_process_tcp_write_req(uv_loop_t* loop, uv_tcp_t* handle,
    uv_write_t* req) {
  int err;

  assert(handle->type == UV_TCP);

  /* The bytes of this request are no longer queued; keep the handle's
   * write_queue_size accounting in sync. */
  assert(handle->write_queue_size >= req->u.io.queued_bytes);
  handle->write_queue_size -= req->u.io.queued_bytes;

  UNREGISTER_HANDLE_REQ(loop, handle, req);

  /* Under emulated IOCP the request carries its own wait registration and
   * event handle; tear both down before completing the request. */
  if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
    if (req->wait_handle != INVALID_HANDLE_VALUE) {
      UnregisterWait(req->wait_handle);
      req->wait_handle = INVALID_HANDLE_VALUE;
    }
    if (req->event_handle != NULL) {
      CloseHandle(req->event_handle);
      req->event_handle = NULL;
    }
  }

  if (req->cb) {
    err = uv_translate_sys_error(GET_REQ_SOCK_ERROR(req));
    if (err == UV_ECONNABORTED) {
      /* use UV_ECANCELED for consistency with Unix */
      err = UV_ECANCELED;
    }
    req->cb(req, err);
  }

  /* If a shutdown is waiting for the write queue to drain and this was the
   * last pending write, proceed with shutdown/endgame. */
  handle->stream.conn.write_reqs_pending--;
  if (handle->stream.conn.shutdown_req != NULL &&
      handle->stream.conn.write_reqs_pending == 0) {
    uv_want_endgame(loop, (uv_handle_t*)handle);
  }

  DECREASE_PENDING_REQ_COUNT(handle);
}
1107
1108
/* Completion handler for an AcceptEx request on a listening tcp handle.
 * Depending on the outcome it either queues the accepted socket for
 * uv_accept(), reports a fatal listen error to the connection callback, or
 * discards a broken accepted socket and re-arms the accept request. */
void uv_process_tcp_accept_req(uv_loop_t* loop, uv_tcp_t* handle,
    uv_req_t* raw_req) {
  uv_tcp_accept_t* req = (uv_tcp_accept_t*) raw_req;
  int err;

  assert(handle->type == UV_TCP);

  /* If handle->accepted_socket is not a valid socket, then uv_queue_accept
   * must have failed. This is a serious error. We stop accepting connections
   * and report this error to the connection callback. */
  if (req->accept_socket == INVALID_SOCKET) {
    if (handle->flags & UV_HANDLE_LISTENING) {
      handle->flags &= ~UV_HANDLE_LISTENING;
      DECREASE_ACTIVE_COUNT(loop, handle);
      if (handle->stream.serv.connection_cb) {
        err = GET_REQ_SOCK_ERROR(req);
        handle->stream.serv.connection_cb((uv_stream_t*)handle,
                                          uv_translate_sys_error(err));
      }
    }
  } else if (REQ_SUCCESS(req) &&
             /* SO_UPDATE_ACCEPT_CONTEXT is required after AcceptEx so the
              * accepted socket inherits the listening socket's properties. */
             setsockopt(req->accept_socket,
                        SOL_SOCKET,
                        SO_UPDATE_ACCEPT_CONTEXT,
                        (char*)&handle->socket,
                        sizeof(handle->socket)) == 0) {
    /* Prepend to the pending-accepts list that uv_tcp_accept() consumes. */
    req->next_pending = handle->tcp.serv.pending_accepts;
    handle->tcp.serv.pending_accepts = req;

    /* Accept and SO_UPDATE_ACCEPT_CONTEXT were successful. */
    if (handle->stream.serv.connection_cb) {
      handle->stream.serv.connection_cb((uv_stream_t*)handle, 0);
    }
  } else {
    /* Error related to accepted socket is ignored because the server socket
     * may still be healthy. If the server socket is broken uv_queue_accept
     * will detect it. */
    closesocket(req->accept_socket);
    req->accept_socket = INVALID_SOCKET;
    if (handle->flags & UV_HANDLE_LISTENING) {
      uv_tcp_queue_accept(handle, req);
    }
  }

  DECREASE_PENDING_REQ_COUNT(handle);
}
1155
1156
/* Completion handler for a ConnectEx request. On success it finalizes the
 * socket with SO_UPDATE_CONNECT_CONTEXT and initializes the handle as a
 * connected, readable/writable stream. The user callback always receives a
 * libuv error code (0 on success). */
void uv_process_tcp_connect_req(uv_loop_t* loop, uv_tcp_t* handle,
    uv_connect_t* req) {
  int err;

  assert(handle->type == UV_TCP);

  UNREGISTER_HANDLE_REQ(loop, handle, req);

  err = 0;
  if (REQ_SUCCESS(req)) {
    if (handle->flags & UV_HANDLE_CLOSING) {
      /* use UV_ECANCELED for consistency with Unix;
       * ERROR_OPERATION_ABORTED translates to UV_ECANCELED below. */
      err = ERROR_OPERATION_ABORTED;
    } else if (setsockopt(handle->socket,
                          SOL_SOCKET,
                          SO_UPDATE_CONNECT_CONTEXT,
                          NULL,
                          0) == 0) {
      /* Socket is fully connected; make the handle usable as a stream. */
      uv_connection_init((uv_stream_t*)handle);
      handle->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
      loop->active_tcp_streams++;
    } else {
      err = WSAGetLastError();
    }
  } else {
    err = GET_REQ_SOCK_ERROR(req);
  }
  req->cb(req, uv_translate_sys_error(err));

  DECREASE_PENDING_REQ_COUNT(handle);
}
1188
1189
/* Prepare a tcp handle's socket for transfer to another process over IPC.
 * Fills in *xfer_type (connection vs. server socket) and *xfer_info with
 * the WSADuplicateSocketW protocol info for target_pid plus any delayed
 * bind/listen error. Returns 0 on success or a Windows error code. */
int uv__tcp_xfer_export(uv_tcp_t* handle,
                        int target_pid,
                        uv__ipc_socket_xfer_type_t* xfer_type,
                        uv__ipc_socket_xfer_info_t* xfer_info) {
  if (handle->flags & UV_HANDLE_CONNECTION) {
    *xfer_type = UV__IPC_SOCKET_XFER_TCP_CONNECTION;
  } else {
    *xfer_type = UV__IPC_SOCKET_XFER_TCP_SERVER;
    /* We're about to share the socket with another process. Because this is a
     * listening socket, we assume that the other process will be accepting
     * connections on it. Thus, before sharing the socket with another process,
     * we call listen here in the parent process. */
    if (!(handle->flags & UV_HANDLE_LISTENING)) {
      if (!(handle->flags & UV_HANDLE_BOUND)) {
        /* An unbound socket cannot be listened on, so it can't be shared as
         * a server socket. */
        return ERROR_NOT_SUPPORTED;
      }
      /* A listen() failure is recorded as a delayed error rather than
       * reported here; the importing process surfaces it later. */
      if (handle->delayed_error == 0 &&
          listen(handle->socket, SOMAXCONN) == SOCKET_ERROR) {
        handle->delayed_error = WSAGetLastError();
      }
    }
  }

  if (WSADuplicateSocketW(handle->socket, target_pid, &xfer_info->socket_info))
    return WSAGetLastError();
  xfer_info->delayed_error = handle->delayed_error;

  /* Mark the local copy of the handle as 'shared' so we behave in a way that's
   * friendly to the process(es) that we share the socket with. */
  handle->flags |= UV_HANDLE_SHARED_TCP_SOCKET;

  return 0;
}
1223
1224
/* Counterpart of uv__tcp_xfer_export: reconstruct a tcp handle in the
 * receiving process from duplicated socket info received over IPC.
 * Returns 0 on success or a Windows error code; on failure the created
 * socket (if any) is closed. */
int uv__tcp_xfer_import(uv_tcp_t* tcp,
                        uv__ipc_socket_xfer_type_t xfer_type,
                        uv__ipc_socket_xfer_info_t* xfer_info) {
  int err;
  SOCKET socket;

  assert(xfer_type == UV__IPC_SOCKET_XFER_TCP_SERVER ||
         xfer_type == UV__IPC_SOCKET_XFER_TCP_CONNECTION);

  /* FROM_PROTOCOL_INFO makes WSASocketW take family/type/protocol from the
   * duplicated socket_info instead of explicit arguments. */
  socket = WSASocketW(FROM_PROTOCOL_INFO,
                      FROM_PROTOCOL_INFO,
                      FROM_PROTOCOL_INFO,
                      &xfer_info->socket_info,
                      0,
                      WSA_FLAG_OVERLAPPED);

  if (socket == INVALID_SOCKET) {
    return WSAGetLastError();
  }

  err = uv_tcp_set_socket(
      tcp->loop, tcp, socket, xfer_info->socket_info.iAddressFamily, 1);
  if (err) {
    closesocket(socket);
    return err;
  }

  /* Carry over any bind/listen error recorded by the exporting process, and
   * mark the socket as shared so close behaves cooperatively. */
  tcp->delayed_error = xfer_info->delayed_error;
  tcp->flags |= UV_HANDLE_BOUND | UV_HANDLE_SHARED_TCP_SOCKET;

  if (xfer_type == UV__IPC_SOCKET_XFER_TCP_CONNECTION) {
    uv_connection_init((uv_stream_t*)tcp);
    tcp->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
  }

  tcp->loop->active_tcp_streams++;
  return 0;
}
1263
1264
/* Enable or disable Nagle's algorithm (TCP_NODELAY) on the handle.
 * When a socket already exists the option is applied to it immediately;
 * in either case the desired mode is recorded in the handle flags.
 * Returns 0 on success or the error reported by uv__tcp_nodelay. */
int uv_tcp_nodelay(uv_tcp_t* handle, int enable) {
  if (handle->socket != INVALID_SOCKET) {
    int err = uv__tcp_nodelay(handle, handle->socket, enable);
    if (err != 0)
      return err;
  }

  /* Remember the requested mode for sockets created later. */
  if (enable)
    handle->flags |= UV_HANDLE_TCP_NODELAY;
  else
    handle->flags &= ~UV_HANDLE_TCP_NODELAY;

  return 0;
}
1282
1283
/* Enable or disable TCP keep-alive probes with the given initial delay.
 * When a socket already exists the option is applied to it immediately;
 * in either case the desired mode is recorded in the handle flags.
 * Returns 0 on success or the error reported by uv__tcp_keepalive. */
int uv_tcp_keepalive(uv_tcp_t* handle, int enable, unsigned int delay) {
  if (handle->socket != INVALID_SOCKET) {
    int err = uv__tcp_keepalive(handle, handle->socket, enable, delay);
    if (err != 0)
      return err;
  }

  /* Remember the requested mode for sockets created later. */
  if (enable)
    handle->flags |= UV_HANDLE_TCP_KEEPALIVE;
  else
    handle->flags &= ~UV_HANDLE_TCP_KEEPALIVE;

  /* TODO: Store delay if handle->socket isn't created yet. */

  return 0;
}
1303
1304
/* Switch a server handle between multiple simultaneous pending accepts and
 * a single pending accept. Only the many -> single transition is supported;
 * once in single-accept mode a handle cannot go back (UV_ENOTSUP).
 * Returns 0 on success, UV_EINVAL for connection handles. */
int uv_tcp_simultaneous_accepts(uv_tcp_t* handle, int enable) {
  int single_active;

  if (handle->flags & UV_HANDLE_CONNECTION)
    return UV_EINVAL;

  /* Nothing to do when the handle is already in the requested mode. */
  single_active = (handle->flags & UV_HANDLE_TCP_SINGLE_ACCEPT) != 0;
  if ((enable != 0) != single_active)
    return 0;

  /* Going from a single pending accept back to many is not supported. */
  if (enable)
    return UV_ENOTSUP;

  /* A mode change is already in progress; let it finish. */
  if (handle->flags & UV_HANDLE_TCP_ACCEPT_STATE_CHANGING)
    return 0;

  handle->flags |= UV_HANDLE_TCP_SINGLE_ACCEPT;

  /* If the handle is already listening, multiple accepts may be queued;
   * flag the state change so they can be wound down. */
  if (handle->flags & UV_HANDLE_LISTENING)
    handle->flags |= UV_HANDLE_TCP_ACCEPT_STATE_CHANGING;

  return 0;
}
1335
1336
/* Try to cancel all outstanding overlapped I/O on the tcp socket.
 * CancelIo must operate on the base (IFS) handle, so when non-IFS layered
 * service providers (LSPs) are stacked on top of TCP the base handle is
 * first obtained with SIO_BASE_HANDLE -- a query that always fails on
 * Windows XP / Server 2003.
 * Returns 0 on success, -1 when no base handle could be obtained (so
 * cancellation is impossible), or a GetLastError() code from CancelIo. */
static int uv_tcp_try_cancel_io(uv_tcp_t* tcp) {
  SOCKET socket = tcp->socket;
  int non_ifs_lsp;

  /* Check if we have any non-IFS LSPs stacked on top of TCP */
  non_ifs_lsp = (tcp->flags & UV_HANDLE_IPV6) ? uv_tcp_non_ifs_lsp_ipv6 :
                                                uv_tcp_non_ifs_lsp_ipv4;

  /* If there are non-ifs LSPs then try to obtain a base handle for the socket.
   * This will always fail on Windows XP/3k. */
  if (non_ifs_lsp) {
    DWORD bytes;
    if (WSAIoctl(socket,
                 SIO_BASE_HANDLE,
                 NULL,
                 0,
                 &socket,  /* overwritten with the base socket handle */
                 sizeof socket,
                 &bytes,
                 NULL,
                 NULL) != 0) {
      /* Failed. We can't do CancelIo. */
      return -1;
    }
  }

  assert(socket != 0 && socket != INVALID_SOCKET);

  if (!CancelIo((HANDLE) socket)) {
    return GetLastError();
  }

  /* It worked. */
  return 0;
}
1372
1373
/* Start closing a tcp handle. Tries to arrange a graceful close: either
 * shut down the send side, cancel outstanding I/O, or - for shared server
 * sockets whose I/O cannot be canceled - close the not-yet-completed
 * accepted sockets. The socket itself is closed here unless a pending read
 * request must be allowed to return first (handled later in endgame). */
void uv_tcp_close(uv_loop_t* loop, uv_tcp_t* tcp) {
  int close_socket = 1;

  if (tcp->flags & UV_HANDLE_READ_PENDING) {
    /* In order for winsock to do a graceful close there must not be any any
     * pending reads, or the socket must be shut down for writing */
    if (!(tcp->flags & UV_HANDLE_SHARED_TCP_SOCKET)) {
      /* Just do shutdown on non-shared sockets, which ensures graceful close. */
      shutdown(tcp->socket, SD_SEND);

    } else if (uv_tcp_try_cancel_io(tcp) == 0) {
      /* In case of a shared socket, we try to cancel all outstanding I/O,. If
       * that works, don't close the socket yet - wait for the read req to
       * return and close the socket in uv_tcp_endgame. */
      close_socket = 0;

    } else {
      /* When cancelling isn't possible - which could happen when an LSP is
       * present on an old Windows version, we will have to close the socket
       * with a read pending. That is not nice because trailing sent bytes may
       * not make it to the other side. */
    }

  } else if ((tcp->flags & UV_HANDLE_SHARED_TCP_SOCKET) &&
             tcp->tcp.serv.accept_reqs != NULL) {
    /* Under normal circumstances closesocket() will ensure that all pending
     * accept reqs are canceled. However, when the socket is shared the
     * presence of another reference to the socket in another process will keep
     * the accept reqs going, so we have to ensure that these are canceled. */
    if (uv_tcp_try_cancel_io(tcp) != 0) {
      /* When cancellation is not possible, there is another option: we can
       * close the incoming sockets, which will also cancel the accept
       * operations. However this is not cool because we might inadvertently
       * close a socket that just accepted a new connection, which will cause
       * the connection to be aborted. */
      unsigned int i;
      for (i = 0; i < uv_simultaneous_server_accepts; i++) {
        uv_tcp_accept_t* req = &tcp->tcp.serv.accept_reqs[i];
        /* Only close sockets whose accept is still in flight; a completed
         * accept holds a live connection we must not abort. */
        if (req->accept_socket != INVALID_SOCKET &&
            !HasOverlappedIoCompleted(&req->u.io.overlapped)) {
          closesocket(req->accept_socket);
          req->accept_socket = INVALID_SOCKET;
        }
      }
    }
  }

  /* Drop the active-handle contributions of reading and listening states. */
  if (tcp->flags & UV_HANDLE_READING) {
    tcp->flags &= ~UV_HANDLE_READING;
    DECREASE_ACTIVE_COUNT(loop, tcp);
  }

  if (tcp->flags & UV_HANDLE_LISTENING) {
    tcp->flags &= ~UV_HANDLE_LISTENING;
    DECREASE_ACTIVE_COUNT(loop, tcp);
  }

  if (close_socket) {
    closesocket(tcp->socket);
    tcp->socket = INVALID_SOCKET;
    tcp->flags |= UV_HANDLE_TCP_SOCKET_CLOSED;
  }

  tcp->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
  uv__handle_closing(tcp);

  /* With no requests outstanding the handle can move to endgame right away;
   * otherwise endgame is triggered when the last request completes. */
  if (tcp->reqs_pending == 0) {
    uv_want_endgame(tcp->loop, (uv_handle_t*)tcp);
  }
}
1444
1445
/* Associate an existing OS socket with a uv_tcp_t handle. Detects the
 * socket's address family via SO_PROTOCOL_INFOW, registers the socket with
 * the loop, and - when the socket turns out to be already bound and/or
 * connected - marks the handle accordingly so it can be used right away.
 * Returns 0 on success or a libuv error code. */
int uv_tcp_open(uv_tcp_t* handle, uv_os_sock_t sock) {
  WSAPROTOCOL_INFOW protocol_info;
  int opt_len;
  int err;
  struct sockaddr_storage saddr;
  int saddr_len;

  /* Detect the address family of the socket. */
  opt_len = (int) sizeof protocol_info;
  if (getsockopt(sock,
                 SOL_SOCKET,
                 SO_PROTOCOL_INFOW,
                 (char*) &protocol_info,
                 &opt_len) == SOCKET_ERROR) {
    /* NOTE(review): GetLastError() after a Winsock call; on modern Windows
     * this matches WSAGetLastError(), but the latter is the conventional
     * choice - confirm intent. */
    return uv_translate_sys_error(GetLastError());
  }

  err = uv_tcp_set_socket(handle->loop,
                          handle,
                          sock,
                          protocol_info.iAddressFamily,
                          1);
  if (err) {
    return uv_translate_sys_error(err);
  }

  /* Support already active socket. */
  saddr_len = sizeof(saddr);
  if (!uv_tcp_getsockname(handle, (struct sockaddr*) &saddr, &saddr_len)) {
    /* Socket is already bound. */
    handle->flags |= UV_HANDLE_BOUND;
    saddr_len = sizeof(saddr);
    if (!uv_tcp_getpeername(handle, (struct sockaddr*) &saddr, &saddr_len)) {
      /* Socket is already connected. */
      uv_connection_init((uv_stream_t*) handle);
      handle->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
    }
  }

  return 0;
}
1487
1488
1489 /* This function is an egress point, i.e. it returns libuv errors rather than
1490 * system errors.
1491 */
uv__tcp_bind(uv_tcp_t * handle,const struct sockaddr * addr,unsigned int addrlen,unsigned int flags)1492 int uv__tcp_bind(uv_tcp_t* handle,
1493 const struct sockaddr* addr,
1494 unsigned int addrlen,
1495 unsigned int flags) {
1496 int err;
1497
1498 err = uv_tcp_try_bind(handle, addr, addrlen, flags);
1499 if (err)
1500 return uv_translate_sys_error(err);
1501
1502 return 0;
1503 }
1504
1505
1506 /* This function is an egress point, i.e. it returns libuv errors rather than
1507 * system errors.
1508 */
uv__tcp_connect(uv_connect_t * req,uv_tcp_t * handle,const struct sockaddr * addr,unsigned int addrlen,uv_connect_cb cb)1509 int uv__tcp_connect(uv_connect_t* req,
1510 uv_tcp_t* handle,
1511 const struct sockaddr* addr,
1512 unsigned int addrlen,
1513 uv_connect_cb cb) {
1514 int err;
1515
1516 err = uv_tcp_try_connect(req, handle, addr, addrlen, cb);
1517 if (err)
1518 return uv_translate_sys_error(err);
1519
1520 return 0;
1521 }
1522