• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/iomgr/port.h"
22 
23 #ifdef GRPC_WINSOCK_SOCKET
24 
25 #include "src/core/lib/iomgr/sockaddr.h"
26 
27 #include <inttypes.h>
28 #include <io.h>
29 
30 #include <vector>
31 
32 #include "absl/strings/str_cat.h"
33 
34 #include <grpc/support/alloc.h>
35 #include <grpc/support/log.h>
36 #include <grpc/support/log_windows.h>
37 #include <grpc/support/string_util.h>
38 #include <grpc/support/sync.h>
39 #include <grpc/support/time.h>
40 
41 #include "src/core/lib/channel/channel_args.h"
42 #include "src/core/lib/iomgr/iocp_windows.h"
43 #include "src/core/lib/iomgr/pollset_windows.h"
44 #include "src/core/lib/iomgr/resolve_address.h"
45 #include "src/core/lib/iomgr/sockaddr_utils.h"
46 #include "src/core/lib/iomgr/socket_windows.h"
47 #include "src/core/lib/iomgr/tcp_server.h"
48 #include "src/core/lib/iomgr/tcp_windows.h"
49 #include "src/core/lib/slice/slice_internal.h"
50 
51 #define MIN_SAFE_ACCEPT_QUEUE_SIZE 100
52 
53 /* one listening port */
54 typedef struct grpc_tcp_listener grpc_tcp_listener;
55 struct grpc_tcp_listener {
56   /* This seemingly magic number comes from AcceptEx's documentation. each
57      address buffer needs to have at least 16 more bytes at their end. */
58   uint8_t addresses[(sizeof(grpc_sockaddr_in6) + 16) * 2];
59   /* This will hold the socket for the next accept. */
60   SOCKET new_socket;
61   /* The listener winsocket. */
62   grpc_winsocket* socket;
63   /* The actual TCP port number. */
64   int port;
65   unsigned port_index;
66   grpc_tcp_server* server;
67   /* The cached AcceptEx for that port. */
68   LPFN_ACCEPTEX AcceptEx;
69   int shutting_down;
70   int outstanding_calls;
71   /* closure for socket notification of accept being ready */
72   grpc_closure on_accept;
73   /* linked list */
74   struct grpc_tcp_listener* next;
75 };
76 
77 /* the overall server */
78 struct grpc_tcp_server {
79   gpr_refcount refs;
80   /* Called whenever accept() succeeds on a server port. */
81   grpc_tcp_server_cb on_accept_cb;
82   void* on_accept_cb_arg;
83 
84   gpr_mu mu;
85 
86   /* active port count: how many ports are actually still listening */
87   int active_ports;
88 
89   /* linked list of server ports */
90   grpc_tcp_listener* head;
91   grpc_tcp_listener* tail;
92 
93   /* List of closures passed to shutdown_starting_add(). */
94   grpc_closure_list shutdown_starting;
95 
96   /* shutdown callback */
97   grpc_closure* shutdown_complete;
98 
99   grpc_channel_args* channel_args;
100 };
101 
102 /* Public function. Allocates the proper data structures to hold a
103    grpc_tcp_server. */
tcp_server_create(grpc_closure * shutdown_complete,const grpc_channel_args * args,grpc_tcp_server ** server)104 static grpc_error* tcp_server_create(grpc_closure* shutdown_complete,
105                                      const grpc_channel_args* args,
106                                      grpc_tcp_server** server) {
107   grpc_tcp_server* s = (grpc_tcp_server*)gpr_malloc(sizeof(grpc_tcp_server));
108   s->channel_args = grpc_channel_args_copy(args);
109   gpr_ref_init(&s->refs, 1);
110   gpr_mu_init(&s->mu);
111   s->active_ports = 0;
112   s->on_accept_cb = NULL;
113   s->on_accept_cb_arg = NULL;
114   s->head = NULL;
115   s->tail = NULL;
116   s->shutdown_starting.head = NULL;
117   s->shutdown_starting.tail = NULL;
118   s->shutdown_complete = shutdown_complete;
119   *server = s;
120   return GRPC_ERROR_NONE;
121 }
122 
destroy_server(void * arg,grpc_error * error)123 static void destroy_server(void* arg, grpc_error* error) {
124   grpc_tcp_server* s = (grpc_tcp_server*)arg;
125 
126   /* Now that the accepts have been aborted, we can destroy the sockets.
127      The IOCP won't get notified on these, so we can flag them as already
128      closed by the system. */
129   while (s->head) {
130     grpc_tcp_listener* sp = s->head;
131     s->head = sp->next;
132     sp->next = NULL;
133     grpc_winsocket_destroy(sp->socket);
134     gpr_free(sp);
135   }
136   grpc_channel_args_destroy(s->channel_args);
137   gpr_mu_destroy(&s->mu);
138   gpr_free(s);
139 }
140 
finish_shutdown_locked(grpc_tcp_server * s)141 static void finish_shutdown_locked(grpc_tcp_server* s) {
142   if (s->shutdown_complete != NULL) {
143     grpc_core::ExecCtx::Run(DEBUG_LOCATION, s->shutdown_complete,
144                             GRPC_ERROR_NONE);
145   }
146 
147   grpc_core::ExecCtx::Run(
148       DEBUG_LOCATION,
149       GRPC_CLOSURE_CREATE(destroy_server, s, grpc_schedule_on_exec_ctx),
150       GRPC_ERROR_NONE);
151 }
152 
tcp_server_ref(grpc_tcp_server * s)153 static grpc_tcp_server* tcp_server_ref(grpc_tcp_server* s) {
154   gpr_ref_non_zero(&s->refs);
155   return s;
156 }
157 
tcp_server_shutdown_starting_add(grpc_tcp_server * s,grpc_closure * shutdown_starting)158 static void tcp_server_shutdown_starting_add(grpc_tcp_server* s,
159                                              grpc_closure* shutdown_starting) {
160   gpr_mu_lock(&s->mu);
161   grpc_closure_list_append(&s->shutdown_starting, shutdown_starting,
162                            GRPC_ERROR_NONE);
163   gpr_mu_unlock(&s->mu);
164 }
165 
tcp_server_destroy(grpc_tcp_server * s)166 static void tcp_server_destroy(grpc_tcp_server* s) {
167   grpc_tcp_listener* sp;
168   gpr_mu_lock(&s->mu);
169 
170   /* First, shutdown all fd's. This will queue abortion calls for all
171      of the pending accepts due to the normal operation mechanism. */
172   if (s->active_ports == 0) {
173     finish_shutdown_locked(s);
174   } else {
175     for (sp = s->head; sp; sp = sp->next) {
176       sp->shutting_down = 1;
177       grpc_winsocket_shutdown(sp->socket);
178     }
179   }
180   gpr_mu_unlock(&s->mu);
181 }
182 
tcp_server_unref(grpc_tcp_server * s)183 static void tcp_server_unref(grpc_tcp_server* s) {
184   if (gpr_unref(&s->refs)) {
185     grpc_tcp_server_shutdown_listeners(s);
186     gpr_mu_lock(&s->mu);
187     grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &s->shutdown_starting);
188     gpr_mu_unlock(&s->mu);
189     tcp_server_destroy(s);
190   }
191 }
192 
193 /* Prepare (bind) a recently-created socket for listening. */
prepare_socket(SOCKET sock,const grpc_resolved_address * addr,int * port)194 static grpc_error* prepare_socket(SOCKET sock,
195                                   const grpc_resolved_address* addr,
196                                   int* port) {
197   grpc_resolved_address sockname_temp;
198   grpc_error* error = GRPC_ERROR_NONE;
199   int sockname_temp_len;
200 
201   error = grpc_tcp_prepare_socket(sock);
202   if (error != GRPC_ERROR_NONE) {
203     goto failure;
204   }
205 
206   if (bind(sock, (const grpc_sockaddr*)addr->addr, (int)addr->len) ==
207       SOCKET_ERROR) {
208     error = GRPC_WSA_ERROR(WSAGetLastError(), "bind");
209     goto failure;
210   }
211 
212   if (listen(sock, SOMAXCONN) == SOCKET_ERROR) {
213     error = GRPC_WSA_ERROR(WSAGetLastError(), "listen");
214     goto failure;
215   }
216 
217   sockname_temp_len = sizeof(struct sockaddr_storage);
218   if (getsockname(sock, (grpc_sockaddr*)sockname_temp.addr,
219                   &sockname_temp_len) == SOCKET_ERROR) {
220     error = GRPC_WSA_ERROR(WSAGetLastError(), "getsockname");
221     goto failure;
222   }
223   sockname_temp.len = (size_t)sockname_temp_len;
224 
225   *port = grpc_sockaddr_get_port(&sockname_temp);
226   return GRPC_ERROR_NONE;
227 
228 failure:
229   GPR_ASSERT(error != GRPC_ERROR_NONE);
230   grpc_error_set_int(
231       grpc_error_set_str(
232           GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
233               "Failed to prepare server socket", &error, 1),
234           GRPC_ERROR_STR_TARGET_ADDRESS,
235           grpc_slice_from_cpp_string(grpc_sockaddr_to_uri(addr))),
236       GRPC_ERROR_INT_FD, (intptr_t)sock);
237   GRPC_ERROR_UNREF(error);
238   if (sock != INVALID_SOCKET) closesocket(sock);
239   return error;
240 }
241 
decrement_active_ports_and_notify_locked(grpc_tcp_listener * sp)242 static void decrement_active_ports_and_notify_locked(grpc_tcp_listener* sp) {
243   sp->shutting_down = 0;
244   GPR_ASSERT(sp->server->active_ports > 0);
245   if (0 == --sp->server->active_ports) {
246     finish_shutdown_locked(sp->server);
247   }
248 }
249 
250 /* In order to do an async accept, we need to create a socket first which
251    will be the one assigned to the new incoming connection. */
start_accept_locked(grpc_tcp_listener * port)252 static grpc_error* start_accept_locked(grpc_tcp_listener* port) {
253   SOCKET sock = INVALID_SOCKET;
254   BOOL success;
255   DWORD addrlen = sizeof(grpc_sockaddr_in6) + 16;
256   DWORD bytes_received = 0;
257   grpc_error* error = GRPC_ERROR_NONE;
258 
259   if (port->shutting_down) {
260     return GRPC_ERROR_NONE;
261   }
262 
263   sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, NULL, 0,
264                    grpc_get_default_wsa_socket_flags());
265   if (sock == INVALID_SOCKET) {
266     error = GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket");
267     goto failure;
268   }
269 
270   error = grpc_tcp_prepare_socket(sock);
271   if (error != GRPC_ERROR_NONE) goto failure;
272 
273   /* Start the "accept" asynchronously. */
274   success = port->AcceptEx(port->socket->socket, sock, port->addresses, 0,
275                            addrlen, addrlen, &bytes_received,
276                            &port->socket->read_info.overlapped);
277 
278   /* It is possible to get an accept immediately without delay. However, we
279      will still get an IOCP notification for it. So let's just ignore it. */
280   if (!success) {
281     int last_error = WSAGetLastError();
282     if (last_error != ERROR_IO_PENDING) {
283       error = GRPC_WSA_ERROR(last_error, "AcceptEx");
284       goto failure;
285     }
286   }
287 
288   /* We're ready to do the accept. Calling grpc_socket_notify_on_read may
289      immediately process an accept that happened in the meantime. */
290   port->new_socket = sock;
291   grpc_socket_notify_on_read(port->socket, &port->on_accept);
292   port->outstanding_calls++;
293   return error;
294 
295 failure:
296   GPR_ASSERT(error != GRPC_ERROR_NONE);
297   if (sock != INVALID_SOCKET) closesocket(sock);
298   return error;
299 }
300 
301 /* Event manager callback when reads are ready. */
on_accept(void * arg,grpc_error * error)302 static void on_accept(void* arg, grpc_error* error) {
303   grpc_tcp_listener* sp = (grpc_tcp_listener*)arg;
304   SOCKET sock = sp->new_socket;
305   grpc_winsocket_callback_info* info = &sp->socket->read_info;
306   grpc_endpoint* ep = NULL;
307   grpc_resolved_address peer_name;
308   DWORD transfered_bytes;
309   DWORD flags;
310   BOOL wsa_success;
311   int err;
312 
313   gpr_mu_lock(&sp->server->mu);
314 
315   peer_name.len = sizeof(struct sockaddr_storage);
316 
317   /* The general mechanism for shutting down is to queue abortion calls. While
318      this is necessary in the read/write case, it's useless for the accept
319      case. We only need to adjust the pending callback count */
320   if (error != GRPC_ERROR_NONE) {
321     const char* msg = grpc_error_string(error);
322     gpr_log(GPR_INFO, "Skipping on_accept due to error: %s", msg);
323 
324     gpr_mu_unlock(&sp->server->mu);
325     return;
326   }
327 
328   /* The IOCP notified us of a completed operation. Let's grab the results,
329      and act accordingly. */
330   transfered_bytes = 0;
331   wsa_success = WSAGetOverlappedResult(sock, &info->overlapped,
332                                        &transfered_bytes, FALSE, &flags);
333   if (!wsa_success) {
334     if (!sp->shutting_down) {
335       char* utf8_message = gpr_format_message(WSAGetLastError());
336       gpr_log(GPR_ERROR, "on_accept error: %s", utf8_message);
337       gpr_free(utf8_message);
338     }
339     closesocket(sock);
340   } else {
341     if (!sp->shutting_down) {
342       err = setsockopt(sock, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
343                        (char*)&sp->socket->socket, sizeof(sp->socket->socket));
344       if (err) {
345         char* utf8_message = gpr_format_message(WSAGetLastError());
346         gpr_log(GPR_ERROR, "setsockopt error: %s", utf8_message);
347         gpr_free(utf8_message);
348       }
349       int peer_name_len = (int)peer_name.len;
350       err = getpeername(sock, (grpc_sockaddr*)peer_name.addr, &peer_name_len);
351       peer_name.len = (size_t)peer_name_len;
352       std::string peer_name_string;
353       if (!err) {
354         peer_name_string = grpc_sockaddr_to_uri(&peer_name);
355       } else {
356         char* utf8_message = gpr_format_message(WSAGetLastError());
357         gpr_log(GPR_ERROR, "getpeername error: %s", utf8_message);
358         gpr_free(utf8_message);
359       }
360       std::string fd_name = absl::StrCat("tcp_server:", peer_name_string);
361       ep = grpc_tcp_create(grpc_winsocket_create(sock, fd_name.c_str()),
362                            sp->server->channel_args, peer_name_string.c_str());
363     } else {
364       closesocket(sock);
365     }
366   }
367 
368   /* The only time we should call our callback, is where we successfully
369      managed to accept a connection, and created an endpoint. */
370   if (ep) {
371     // Create acceptor.
372     grpc_tcp_server_acceptor* acceptor =
373         (grpc_tcp_server_acceptor*)gpr_malloc(sizeof(*acceptor));
374     acceptor->from_server = sp->server;
375     acceptor->port_index = sp->port_index;
376     acceptor->fd_index = 0;
377     acceptor->external_connection = false;
378     sp->server->on_accept_cb(sp->server->on_accept_cb_arg, ep, NULL, acceptor);
379   }
380   /* As we were notified from the IOCP of one and exactly one accept,
381      the former socked we created has now either been destroy or assigned
382      to the new connection. We need to create a new one for the next
383      connection. */
384   GPR_ASSERT(GRPC_LOG_IF_ERROR("start_accept", start_accept_locked(sp)));
385   if (0 == --sp->outstanding_calls) {
386     decrement_active_ports_and_notify_locked(sp);
387   }
388   gpr_mu_unlock(&sp->server->mu);
389 }
390 
add_socket_to_server(grpc_tcp_server * s,SOCKET sock,const grpc_resolved_address * addr,unsigned port_index,grpc_tcp_listener ** listener)391 static grpc_error* add_socket_to_server(grpc_tcp_server* s, SOCKET sock,
392                                         const grpc_resolved_address* addr,
393                                         unsigned port_index,
394                                         grpc_tcp_listener** listener) {
395   grpc_tcp_listener* sp = NULL;
396   int port = -1;
397   int status;
398   GUID guid = WSAID_ACCEPTEX;
399   DWORD ioctl_num_bytes;
400   LPFN_ACCEPTEX AcceptEx;
401   grpc_error* error = GRPC_ERROR_NONE;
402 
403   /* We need to grab the AcceptEx pointer for that port, as it may be
404      interface-dependent. We'll cache it to avoid doing that again. */
405   status =
406       WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
407                &AcceptEx, sizeof(AcceptEx), &ioctl_num_bytes, NULL, NULL);
408 
409   if (status != 0) {
410     char* utf8_message = gpr_format_message(WSAGetLastError());
411     gpr_log(GPR_ERROR, "on_connect error: %s", utf8_message);
412     gpr_free(utf8_message);
413     closesocket(sock);
414     return GRPC_ERROR_NONE;
415   }
416 
417   error = prepare_socket(sock, addr, &port);
418   if (error != GRPC_ERROR_NONE) {
419     return error;
420   }
421 
422   GPR_ASSERT(port >= 0);
423   gpr_mu_lock(&s->mu);
424   GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server");
425   sp = (grpc_tcp_listener*)gpr_malloc(sizeof(grpc_tcp_listener));
426   sp->next = NULL;
427   if (s->head == NULL) {
428     s->head = sp;
429   } else {
430     s->tail->next = sp;
431   }
432   s->tail = sp;
433   sp->server = s;
434   sp->socket = grpc_winsocket_create(sock, "listener");
435   sp->shutting_down = 0;
436   sp->outstanding_calls = 0;
437   sp->AcceptEx = AcceptEx;
438   sp->new_socket = INVALID_SOCKET;
439   sp->port = port;
440   sp->port_index = port_index;
441   GRPC_CLOSURE_INIT(&sp->on_accept, on_accept, sp, grpc_schedule_on_exec_ctx);
442   GPR_ASSERT(sp->socket);
443   gpr_mu_unlock(&s->mu);
444   *listener = sp;
445 
446   return GRPC_ERROR_NONE;
447 }
448 
tcp_server_add_port(grpc_tcp_server * s,const grpc_resolved_address * addr,int * port)449 static grpc_error* tcp_server_add_port(grpc_tcp_server* s,
450                                        const grpc_resolved_address* addr,
451                                        int* port) {
452   grpc_tcp_listener* sp = NULL;
453   SOCKET sock;
454   grpc_resolved_address addr6_v4mapped;
455   grpc_resolved_address wildcard;
456   grpc_resolved_address* allocated_addr = NULL;
457   grpc_resolved_address sockname_temp;
458   unsigned port_index = 0;
459   grpc_error* error = GRPC_ERROR_NONE;
460 
461   if (s->tail != NULL) {
462     port_index = s->tail->port_index + 1;
463   }
464 
465   /* Check if this is a wildcard port, and if so, try to keep the port the same
466      as some previously created listener. */
467   if (grpc_sockaddr_get_port(addr) == 0) {
468     for (sp = s->head; sp; sp = sp->next) {
469       int sockname_temp_len = sizeof(struct sockaddr_storage);
470       if (0 == getsockname(sp->socket->socket,
471                            (grpc_sockaddr*)sockname_temp.addr,
472                            &sockname_temp_len)) {
473         sockname_temp.len = (size_t)sockname_temp_len;
474         *port = grpc_sockaddr_get_port(&sockname_temp);
475         if (*port > 0) {
476           allocated_addr =
477               (grpc_resolved_address*)gpr_malloc(sizeof(grpc_resolved_address));
478           memcpy(allocated_addr, addr, sizeof(grpc_resolved_address));
479           grpc_sockaddr_set_port(allocated_addr, *port);
480           addr = allocated_addr;
481           break;
482         }
483       }
484     }
485   }
486 
487   if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
488     addr = &addr6_v4mapped;
489   }
490 
491   /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */
492   if (grpc_sockaddr_is_wildcard(addr, port)) {
493     grpc_sockaddr_make_wildcard6(*port, &wildcard);
494 
495     addr = &wildcard;
496   }
497 
498   sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, NULL, 0,
499                    grpc_get_default_wsa_socket_flags());
500   if (sock == INVALID_SOCKET) {
501     error = GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket");
502     goto done;
503   }
504 
505   error = add_socket_to_server(s, sock, addr, port_index, &sp);
506 
507 done:
508   gpr_free(allocated_addr);
509 
510   if (error != GRPC_ERROR_NONE) {
511     grpc_error* error_out = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
512         "Failed to add port to server", &error, 1);
513     GRPC_ERROR_UNREF(error);
514     error = error_out;
515     *port = -1;
516   } else {
517     GPR_ASSERT(sp != NULL);
518     *port = sp->port;
519   }
520   return error;
521 }
522 
tcp_server_start(grpc_tcp_server * s,const std::vector<grpc_pollset * > *,grpc_tcp_server_cb on_accept_cb,void * on_accept_cb_arg)523 static void tcp_server_start(grpc_tcp_server* s,
524                              const std::vector<grpc_pollset*>* /*pollsets*/,
525                              grpc_tcp_server_cb on_accept_cb,
526                              void* on_accept_cb_arg) {
527   grpc_tcp_listener* sp;
528   GPR_ASSERT(on_accept_cb);
529   gpr_mu_lock(&s->mu);
530   GPR_ASSERT(!s->on_accept_cb);
531   GPR_ASSERT(s->active_ports == 0);
532   s->on_accept_cb = on_accept_cb;
533   s->on_accept_cb_arg = on_accept_cb_arg;
534   for (sp = s->head; sp; sp = sp->next) {
535     GPR_ASSERT(GRPC_LOG_IF_ERROR("start_accept", start_accept_locked(sp)));
536     s->active_ports++;
537   }
538   gpr_mu_unlock(&s->mu);
539 }
540 
tcp_server_port_fd_count(grpc_tcp_server * s,unsigned port_index)541 static unsigned tcp_server_port_fd_count(grpc_tcp_server* s,
542                                          unsigned port_index) {
543   return 0;
544 }
545 
tcp_server_port_fd(grpc_tcp_server * s,unsigned port_index,unsigned fd_index)546 static int tcp_server_port_fd(grpc_tcp_server* s, unsigned port_index,
547                               unsigned fd_index) {
548   return -1;
549 }
550 
tcp_server_create_fd_handler(grpc_tcp_server * s)551 static grpc_core::TcpServerFdHandler* tcp_server_create_fd_handler(
552     grpc_tcp_server* s) {
553   return nullptr;
554 }
555 
tcp_server_shutdown_listeners(grpc_tcp_server * s)556 static void tcp_server_shutdown_listeners(grpc_tcp_server* s) {}
557 
558 grpc_tcp_server_vtable grpc_windows_tcp_server_vtable = {
559     tcp_server_create,        tcp_server_start,
560     tcp_server_add_port,      tcp_server_create_fd_handler,
561     tcp_server_port_fd_count, tcp_server_port_fd,
562     tcp_server_ref,           tcp_server_shutdown_starting_add,
563     tcp_server_unref,         tcp_server_shutdown_listeners};
564 #endif /* GRPC_WINSOCK_SOCKET */
565