1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/iomgr/port.h"
22
23 #ifdef GRPC_WINSOCK_SOCKET
24
25 #include "src/core/lib/iomgr/sockaddr.h"
26
27 #include <inttypes.h>
28 #include <io.h>
29
30 #include <vector>
31
32 #include "absl/strings/str_cat.h"
33
34 #include <grpc/support/alloc.h>
35 #include <grpc/support/log.h>
36 #include <grpc/support/log_windows.h>
37 #include <grpc/support/string_util.h>
38 #include <grpc/support/sync.h>
39 #include <grpc/support/time.h>
40
41 #include "src/core/lib/channel/channel_args.h"
42 #include "src/core/lib/iomgr/iocp_windows.h"
43 #include "src/core/lib/iomgr/pollset_windows.h"
44 #include "src/core/lib/iomgr/resolve_address.h"
45 #include "src/core/lib/iomgr/sockaddr_utils.h"
46 #include "src/core/lib/iomgr/socket_windows.h"
47 #include "src/core/lib/iomgr/tcp_server.h"
48 #include "src/core/lib/iomgr/tcp_windows.h"
49 #include "src/core/lib/slice/slice_internal.h"
50
/* NOTE(review): presumably a lower bound for the accept backlog (guess based
   on the name only). No code visible in this file references it; listen()
   below is called with SOMAXCONN instead. */
#define MIN_SAFE_ACCEPT_QUEUE_SIZE 100
52
/* One listening port: the bound listener socket plus the state needed to keep
   an asynchronous AcceptEx call armed on it. Listeners are chained through
   `next` into the owning grpc_tcp_server's list. */
typedef struct grpc_tcp_listener grpc_tcp_listener;
struct grpc_tcp_listener {
  /* Output buffer passed to AcceptEx for the local and remote addresses.
     This seemingly magic number comes from AcceptEx's documentation: each
     address buffer needs to have at least 16 more bytes at its end. */
  uint8_t addresses[(sizeof(grpc_sockaddr_in6) + 16) * 2];
  /* This will hold the socket for the next accept; it is created by
     start_accept_locked() and consumed by on_accept(). */
  SOCKET new_socket;
  /* The listener winsocket. */
  grpc_winsocket* socket;
  /* The actual TCP port number, as reported by getsockname() after bind. */
  int port;
  /* Position of this listener among the server's ports; tcp_server_add_port()
     assigns the previous tail's index plus one. */
  unsigned port_index;
  /* Back-pointer to the owning server. */
  grpc_tcp_server* server;
  /* The cached AcceptEx for that port. */
  LPFN_ACCEPTEX AcceptEx;
  /* Non-zero once tcp_server_destroy() has asked this listener to stop;
     start_accept_locked() then refuses to arm further accepts. */
  int shutting_down;
  /* Incremented each time an accept is armed and decremented at the end of
     on_accept(). */
  int outstanding_calls;
  /* closure for socket notification of accept being ready */
  grpc_closure on_accept;
  /* linked list */
  struct grpc_tcp_listener* next;
};
76
/* The overall server: a reference-counted collection of listeners plus the
   callbacks invoked on accept and on shutdown. */
struct grpc_tcp_server {
  /* Reference count; initialised to 1 by tcp_server_create(). */
  gpr_refcount refs;
  /* Called whenever accept() succeeds on a server port. Stays NULL until
     tcp_server_start() installs it. */
  grpc_tcp_server_cb on_accept_cb;
  /* Opaque first argument passed to on_accept_cb. */
  void* on_accept_cb_arg;

  /* Guards the fields below and the per-listener state. */
  gpr_mu mu;

  /* active port count: how many ports are actually still listening */
  int active_ports;

  /* linked list of server ports */
  grpc_tcp_listener* head;
  grpc_tcp_listener* tail;

  /* List of closures passed to shutdown_starting_add(). */
  grpc_closure_list shutdown_starting;

  /* shutdown callback; may be NULL, in which case nothing is notified. */
  grpc_closure* shutdown_complete;

  /* Private copy of the channel args given at creation; handed to every
     endpoint created for an accepted connection. */
  grpc_channel_args* channel_args;
};
101
102 /* Public function. Allocates the proper data structures to hold a
103 grpc_tcp_server. */
tcp_server_create(grpc_closure * shutdown_complete,const grpc_channel_args * args,grpc_tcp_server ** server)104 static grpc_error* tcp_server_create(grpc_closure* shutdown_complete,
105 const grpc_channel_args* args,
106 grpc_tcp_server** server) {
107 grpc_tcp_server* s = (grpc_tcp_server*)gpr_malloc(sizeof(grpc_tcp_server));
108 s->channel_args = grpc_channel_args_copy(args);
109 gpr_ref_init(&s->refs, 1);
110 gpr_mu_init(&s->mu);
111 s->active_ports = 0;
112 s->on_accept_cb = NULL;
113 s->on_accept_cb_arg = NULL;
114 s->head = NULL;
115 s->tail = NULL;
116 s->shutdown_starting.head = NULL;
117 s->shutdown_starting.tail = NULL;
118 s->shutdown_complete = shutdown_complete;
119 *server = s;
120 return GRPC_ERROR_NONE;
121 }
122
destroy_server(void * arg,grpc_error * error)123 static void destroy_server(void* arg, grpc_error* error) {
124 grpc_tcp_server* s = (grpc_tcp_server*)arg;
125
126 /* Now that the accepts have been aborted, we can destroy the sockets.
127 The IOCP won't get notified on these, so we can flag them as already
128 closed by the system. */
129 while (s->head) {
130 grpc_tcp_listener* sp = s->head;
131 s->head = sp->next;
132 sp->next = NULL;
133 grpc_winsocket_destroy(sp->socket);
134 gpr_free(sp);
135 }
136 grpc_channel_args_destroy(s->channel_args);
137 gpr_mu_destroy(&s->mu);
138 gpr_free(s);
139 }
140
finish_shutdown_locked(grpc_tcp_server * s)141 static void finish_shutdown_locked(grpc_tcp_server* s) {
142 if (s->shutdown_complete != NULL) {
143 grpc_core::ExecCtx::Run(DEBUG_LOCATION, s->shutdown_complete,
144 GRPC_ERROR_NONE);
145 }
146
147 grpc_core::ExecCtx::Run(
148 DEBUG_LOCATION,
149 GRPC_CLOSURE_CREATE(destroy_server, s, grpc_schedule_on_exec_ctx),
150 GRPC_ERROR_NONE);
151 }
152
tcp_server_ref(grpc_tcp_server * s)153 static grpc_tcp_server* tcp_server_ref(grpc_tcp_server* s) {
154 gpr_ref_non_zero(&s->refs);
155 return s;
156 }
157
tcp_server_shutdown_starting_add(grpc_tcp_server * s,grpc_closure * shutdown_starting)158 static void tcp_server_shutdown_starting_add(grpc_tcp_server* s,
159 grpc_closure* shutdown_starting) {
160 gpr_mu_lock(&s->mu);
161 grpc_closure_list_append(&s->shutdown_starting, shutdown_starting,
162 GRPC_ERROR_NONE);
163 gpr_mu_unlock(&s->mu);
164 }
165
tcp_server_destroy(grpc_tcp_server * s)166 static void tcp_server_destroy(grpc_tcp_server* s) {
167 grpc_tcp_listener* sp;
168 gpr_mu_lock(&s->mu);
169
170 /* First, shutdown all fd's. This will queue abortion calls for all
171 of the pending accepts due to the normal operation mechanism. */
172 if (s->active_ports == 0) {
173 finish_shutdown_locked(s);
174 } else {
175 for (sp = s->head; sp; sp = sp->next) {
176 sp->shutting_down = 1;
177 grpc_winsocket_shutdown(sp->socket);
178 }
179 }
180 gpr_mu_unlock(&s->mu);
181 }
182
tcp_server_unref(grpc_tcp_server * s)183 static void tcp_server_unref(grpc_tcp_server* s) {
184 if (gpr_unref(&s->refs)) {
185 grpc_tcp_server_shutdown_listeners(s);
186 gpr_mu_lock(&s->mu);
187 grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &s->shutdown_starting);
188 gpr_mu_unlock(&s->mu);
189 tcp_server_destroy(s);
190 }
191 }
192
193 /* Prepare (bind) a recently-created socket for listening. */
prepare_socket(SOCKET sock,const grpc_resolved_address * addr,int * port)194 static grpc_error* prepare_socket(SOCKET sock,
195 const grpc_resolved_address* addr,
196 int* port) {
197 grpc_resolved_address sockname_temp;
198 grpc_error* error = GRPC_ERROR_NONE;
199 int sockname_temp_len;
200
201 error = grpc_tcp_prepare_socket(sock);
202 if (error != GRPC_ERROR_NONE) {
203 goto failure;
204 }
205
206 if (bind(sock, (const grpc_sockaddr*)addr->addr, (int)addr->len) ==
207 SOCKET_ERROR) {
208 error = GRPC_WSA_ERROR(WSAGetLastError(), "bind");
209 goto failure;
210 }
211
212 if (listen(sock, SOMAXCONN) == SOCKET_ERROR) {
213 error = GRPC_WSA_ERROR(WSAGetLastError(), "listen");
214 goto failure;
215 }
216
217 sockname_temp_len = sizeof(struct sockaddr_storage);
218 if (getsockname(sock, (grpc_sockaddr*)sockname_temp.addr,
219 &sockname_temp_len) == SOCKET_ERROR) {
220 error = GRPC_WSA_ERROR(WSAGetLastError(), "getsockname");
221 goto failure;
222 }
223 sockname_temp.len = (size_t)sockname_temp_len;
224
225 *port = grpc_sockaddr_get_port(&sockname_temp);
226 return GRPC_ERROR_NONE;
227
228 failure:
229 GPR_ASSERT(error != GRPC_ERROR_NONE);
230 grpc_error_set_int(
231 grpc_error_set_str(
232 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
233 "Failed to prepare server socket", &error, 1),
234 GRPC_ERROR_STR_TARGET_ADDRESS,
235 grpc_slice_from_cpp_string(grpc_sockaddr_to_uri(addr))),
236 GRPC_ERROR_INT_FD, (intptr_t)sock);
237 GRPC_ERROR_UNREF(error);
238 if (sock != INVALID_SOCKET) closesocket(sock);
239 return error;
240 }
241
decrement_active_ports_and_notify_locked(grpc_tcp_listener * sp)242 static void decrement_active_ports_and_notify_locked(grpc_tcp_listener* sp) {
243 sp->shutting_down = 0;
244 GPR_ASSERT(sp->server->active_ports > 0);
245 if (0 == --sp->server->active_ports) {
246 finish_shutdown_locked(sp->server);
247 }
248 }
249
250 /* In order to do an async accept, we need to create a socket first which
251 will be the one assigned to the new incoming connection. */
start_accept_locked(grpc_tcp_listener * port)252 static grpc_error* start_accept_locked(grpc_tcp_listener* port) {
253 SOCKET sock = INVALID_SOCKET;
254 BOOL success;
255 DWORD addrlen = sizeof(grpc_sockaddr_in6) + 16;
256 DWORD bytes_received = 0;
257 grpc_error* error = GRPC_ERROR_NONE;
258
259 if (port->shutting_down) {
260 return GRPC_ERROR_NONE;
261 }
262
263 sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, NULL, 0,
264 grpc_get_default_wsa_socket_flags());
265 if (sock == INVALID_SOCKET) {
266 error = GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket");
267 goto failure;
268 }
269
270 error = grpc_tcp_prepare_socket(sock);
271 if (error != GRPC_ERROR_NONE) goto failure;
272
273 /* Start the "accept" asynchronously. */
274 success = port->AcceptEx(port->socket->socket, sock, port->addresses, 0,
275 addrlen, addrlen, &bytes_received,
276 &port->socket->read_info.overlapped);
277
278 /* It is possible to get an accept immediately without delay. However, we
279 will still get an IOCP notification for it. So let's just ignore it. */
280 if (!success) {
281 int last_error = WSAGetLastError();
282 if (last_error != ERROR_IO_PENDING) {
283 error = GRPC_WSA_ERROR(last_error, "AcceptEx");
284 goto failure;
285 }
286 }
287
288 /* We're ready to do the accept. Calling grpc_socket_notify_on_read may
289 immediately process an accept that happened in the meantime. */
290 port->new_socket = sock;
291 grpc_socket_notify_on_read(port->socket, &port->on_accept);
292 port->outstanding_calls++;
293 return error;
294
295 failure:
296 GPR_ASSERT(error != GRPC_ERROR_NONE);
297 if (sock != INVALID_SOCKET) closesocket(sock);
298 return error;
299 }
300
/* Event manager callback when reads are ready, i.e. when the AcceptEx armed by
   start_accept_locked() has completed or was aborted.

   `arg` is the grpc_tcp_listener the accept was armed on. `error` is the
   status delivered with the notification; when it is not GRPC_ERROR_NONE the
   callback only logs and returns.

   On a successful accept the pre-created socket is turned into an endpoint
   and handed to the server's on_accept_cb together with a freshly allocated
   acceptor. Afterwards a new accept is armed and the listener's
   outstanding-call count is decremented. The server mutex is held for the
   whole body. */
static void on_accept(void* arg, grpc_error* error) {
  grpc_tcp_listener* sp = (grpc_tcp_listener*)arg;
  /* The socket that was handed to AcceptEx for this accept. */
  SOCKET sock = sp->new_socket;
  grpc_winsocket_callback_info* info = &sp->socket->read_info;
  grpc_endpoint* ep = NULL;
  grpc_resolved_address peer_name;
  DWORD transfered_bytes;
  DWORD flags;
  BOOL wsa_success;
  int err;

  gpr_mu_lock(&sp->server->mu);

  peer_name.len = sizeof(struct sockaddr_storage);

  /* The general mechanism for shutting down is to queue abortion calls. While
     this is necessary in the read/write case, it's useless for the accept
     case. Note that this early return neither re-arms the accept nor touches
     outstanding_calls. */
  if (error != GRPC_ERROR_NONE) {
    const char* msg = grpc_error_string(error);
    gpr_log(GPR_INFO, "Skipping on_accept due to error: %s", msg);

    gpr_mu_unlock(&sp->server->mu);
    return;
  }

  /* The IOCP notified us of a completed operation. Let's grab the results,
     and act accordingly. */
  transfered_bytes = 0;
  wsa_success = WSAGetOverlappedResult(sock, &info->overlapped,
                                       &transfered_bytes, FALSE, &flags);
  if (!wsa_success) {
    /* The accept failed; only report it when we are not shutting down, as
       failures are expected in that case. */
    if (!sp->shutting_down) {
      char* utf8_message = gpr_format_message(WSAGetLastError());
      gpr_log(GPR_ERROR, "on_accept error: %s", utf8_message);
      gpr_free(utf8_message);
    }
    closesocket(sock);
  } else {
    if (!sp->shutting_down) {
      /* Associate the accepted socket with the listener's context; a failure
         here is logged but does not abort the accept. */
      err = setsockopt(sock, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
                       (char*)&sp->socket->socket, sizeof(sp->socket->socket));
      if (err) {
        char* utf8_message = gpr_format_message(WSAGetLastError());
        gpr_log(GPR_ERROR, "setsockopt error: %s", utf8_message);
        gpr_free(utf8_message);
      }
      int peer_name_len = (int)peer_name.len;
      err = getpeername(sock, (grpc_sockaddr*)peer_name.addr, &peer_name_len);
      peer_name.len = (size_t)peer_name_len;
      /* Stays empty when getpeername fails; the endpoint is still created. */
      std::string peer_name_string;
      if (!err) {
        peer_name_string = grpc_sockaddr_to_uri(&peer_name);
      } else {
        char* utf8_message = gpr_format_message(WSAGetLastError());
        gpr_log(GPR_ERROR, "getpeername error: %s", utf8_message);
        gpr_free(utf8_message);
      }
      std::string fd_name = absl::StrCat("tcp_server:", peer_name_string);
      ep = grpc_tcp_create(grpc_winsocket_create(sock, fd_name.c_str()),
                           sp->server->channel_args, peer_name_string.c_str());
    } else {
      /* A connection arrived while shutting down: drop it. */
      closesocket(sock);
    }
  }

  /* The only time we should call our callback is when we successfully
     managed to accept a connection and created an endpoint. */
  if (ep) {
    // Create acceptor.
    grpc_tcp_server_acceptor* acceptor =
        (grpc_tcp_server_acceptor*)gpr_malloc(sizeof(*acceptor));
    acceptor->from_server = sp->server;
    acceptor->port_index = sp->port_index;
    acceptor->fd_index = 0;
    acceptor->external_connection = false;
    sp->server->on_accept_cb(sp->server->on_accept_cb_arg, ep, NULL, acceptor);
  }
  /* As we were notified from the IOCP of one and exactly one accept,
     the former socket we created has now either been destroyed or assigned
     to the new connection. We need to create a new one for the next
     connection. */
  GPR_ASSERT(GRPC_LOG_IF_ERROR("start_accept", start_accept_locked(sp)));
  /* start_accept_locked() bumps outstanding_calls when it re-arms, so the
     count only reaches zero here when no new accept was armed. */
  if (0 == --sp->outstanding_calls) {
    decrement_active_ports_and_notify_locked(sp);
  }
  gpr_mu_unlock(&sp->server->mu);
}
390
add_socket_to_server(grpc_tcp_server * s,SOCKET sock,const grpc_resolved_address * addr,unsigned port_index,grpc_tcp_listener ** listener)391 static grpc_error* add_socket_to_server(grpc_tcp_server* s, SOCKET sock,
392 const grpc_resolved_address* addr,
393 unsigned port_index,
394 grpc_tcp_listener** listener) {
395 grpc_tcp_listener* sp = NULL;
396 int port = -1;
397 int status;
398 GUID guid = WSAID_ACCEPTEX;
399 DWORD ioctl_num_bytes;
400 LPFN_ACCEPTEX AcceptEx;
401 grpc_error* error = GRPC_ERROR_NONE;
402
403 /* We need to grab the AcceptEx pointer for that port, as it may be
404 interface-dependent. We'll cache it to avoid doing that again. */
405 status =
406 WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
407 &AcceptEx, sizeof(AcceptEx), &ioctl_num_bytes, NULL, NULL);
408
409 if (status != 0) {
410 char* utf8_message = gpr_format_message(WSAGetLastError());
411 gpr_log(GPR_ERROR, "on_connect error: %s", utf8_message);
412 gpr_free(utf8_message);
413 closesocket(sock);
414 return GRPC_ERROR_NONE;
415 }
416
417 error = prepare_socket(sock, addr, &port);
418 if (error != GRPC_ERROR_NONE) {
419 return error;
420 }
421
422 GPR_ASSERT(port >= 0);
423 gpr_mu_lock(&s->mu);
424 GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server");
425 sp = (grpc_tcp_listener*)gpr_malloc(sizeof(grpc_tcp_listener));
426 sp->next = NULL;
427 if (s->head == NULL) {
428 s->head = sp;
429 } else {
430 s->tail->next = sp;
431 }
432 s->tail = sp;
433 sp->server = s;
434 sp->socket = grpc_winsocket_create(sock, "listener");
435 sp->shutting_down = 0;
436 sp->outstanding_calls = 0;
437 sp->AcceptEx = AcceptEx;
438 sp->new_socket = INVALID_SOCKET;
439 sp->port = port;
440 sp->port_index = port_index;
441 GRPC_CLOSURE_INIT(&sp->on_accept, on_accept, sp, grpc_schedule_on_exec_ctx);
442 GPR_ASSERT(sp->socket);
443 gpr_mu_unlock(&s->mu);
444 *listener = sp;
445
446 return GRPC_ERROR_NONE;
447 }
448
tcp_server_add_port(grpc_tcp_server * s,const grpc_resolved_address * addr,int * port)449 static grpc_error* tcp_server_add_port(grpc_tcp_server* s,
450 const grpc_resolved_address* addr,
451 int* port) {
452 grpc_tcp_listener* sp = NULL;
453 SOCKET sock;
454 grpc_resolved_address addr6_v4mapped;
455 grpc_resolved_address wildcard;
456 grpc_resolved_address* allocated_addr = NULL;
457 grpc_resolved_address sockname_temp;
458 unsigned port_index = 0;
459 grpc_error* error = GRPC_ERROR_NONE;
460
461 if (s->tail != NULL) {
462 port_index = s->tail->port_index + 1;
463 }
464
465 /* Check if this is a wildcard port, and if so, try to keep the port the same
466 as some previously created listener. */
467 if (grpc_sockaddr_get_port(addr) == 0) {
468 for (sp = s->head; sp; sp = sp->next) {
469 int sockname_temp_len = sizeof(struct sockaddr_storage);
470 if (0 == getsockname(sp->socket->socket,
471 (grpc_sockaddr*)sockname_temp.addr,
472 &sockname_temp_len)) {
473 sockname_temp.len = (size_t)sockname_temp_len;
474 *port = grpc_sockaddr_get_port(&sockname_temp);
475 if (*port > 0) {
476 allocated_addr =
477 (grpc_resolved_address*)gpr_malloc(sizeof(grpc_resolved_address));
478 memcpy(allocated_addr, addr, sizeof(grpc_resolved_address));
479 grpc_sockaddr_set_port(allocated_addr, *port);
480 addr = allocated_addr;
481 break;
482 }
483 }
484 }
485 }
486
487 if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
488 addr = &addr6_v4mapped;
489 }
490
491 /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */
492 if (grpc_sockaddr_is_wildcard(addr, port)) {
493 grpc_sockaddr_make_wildcard6(*port, &wildcard);
494
495 addr = &wildcard;
496 }
497
498 sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, NULL, 0,
499 grpc_get_default_wsa_socket_flags());
500 if (sock == INVALID_SOCKET) {
501 error = GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket");
502 goto done;
503 }
504
505 error = add_socket_to_server(s, sock, addr, port_index, &sp);
506
507 done:
508 gpr_free(allocated_addr);
509
510 if (error != GRPC_ERROR_NONE) {
511 grpc_error* error_out = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
512 "Failed to add port to server", &error, 1);
513 GRPC_ERROR_UNREF(error);
514 error = error_out;
515 *port = -1;
516 } else {
517 GPR_ASSERT(sp != NULL);
518 *port = sp->port;
519 }
520 return error;
521 }
522
tcp_server_start(grpc_tcp_server * s,const std::vector<grpc_pollset * > *,grpc_tcp_server_cb on_accept_cb,void * on_accept_cb_arg)523 static void tcp_server_start(grpc_tcp_server* s,
524 const std::vector<grpc_pollset*>* /*pollsets*/,
525 grpc_tcp_server_cb on_accept_cb,
526 void* on_accept_cb_arg) {
527 grpc_tcp_listener* sp;
528 GPR_ASSERT(on_accept_cb);
529 gpr_mu_lock(&s->mu);
530 GPR_ASSERT(!s->on_accept_cb);
531 GPR_ASSERT(s->active_ports == 0);
532 s->on_accept_cb = on_accept_cb;
533 s->on_accept_cb_arg = on_accept_cb_arg;
534 for (sp = s->head; sp; sp = sp->next) {
535 GPR_ASSERT(GRPC_LOG_IF_ERROR("start_accept", start_accept_locked(sp)));
536 s->active_ports++;
537 }
538 gpr_mu_unlock(&s->mu);
539 }
540
tcp_server_port_fd_count(grpc_tcp_server * s,unsigned port_index)541 static unsigned tcp_server_port_fd_count(grpc_tcp_server* s,
542 unsigned port_index) {
543 return 0;
544 }
545
tcp_server_port_fd(grpc_tcp_server * s,unsigned port_index,unsigned fd_index)546 static int tcp_server_port_fd(grpc_tcp_server* s, unsigned port_index,
547 unsigned fd_index) {
548 return -1;
549 }
550
tcp_server_create_fd_handler(grpc_tcp_server * s)551 static grpc_core::TcpServerFdHandler* tcp_server_create_fd_handler(
552 grpc_tcp_server* s) {
553 return nullptr;
554 }
555
tcp_server_shutdown_listeners(grpc_tcp_server * s)556 static void tcp_server_shutdown_listeners(grpc_tcp_server* s) {}
557
/* Winsock implementation of the grpc_tcp_server interface. The entries are
   positional, so their order must match the member order of
   grpc_tcp_server_vtable (declared in tcp_server.h, not visible here). */
grpc_tcp_server_vtable grpc_windows_tcp_server_vtable = {
    tcp_server_create,        tcp_server_start,
    tcp_server_add_port,      tcp_server_create_fd_handler,
    tcp_server_port_fd_count, tcp_server_port_fd,
    tcp_server_ref,           tcp_server_shutdown_starting_add,
    tcp_server_unref,         tcp_server_shutdown_listeners};
564 #endif /* GRPC_WINSOCK_SOCKET */
565