• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/iomgr/port.h"
22 
23 #ifdef GRPC_WINSOCK_SOCKET
24 
25 #include "src/core/lib/iomgr/sockaddr.h"
26 
27 #include <inttypes.h>
28 #include <io.h>
29 
30 #include <grpc/support/alloc.h>
31 #include <grpc/support/log.h>
32 #include <grpc/support/log_windows.h>
33 #include <grpc/support/string_util.h>
34 #include <grpc/support/sync.h>
35 #include <grpc/support/time.h>
36 
37 #include "src/core/lib/channel/channel_args.h"
38 #include "src/core/lib/iomgr/iocp_windows.h"
39 #include "src/core/lib/iomgr/pollset_windows.h"
40 #include "src/core/lib/iomgr/resolve_address.h"
41 #include "src/core/lib/iomgr/sockaddr_utils.h"
42 #include "src/core/lib/iomgr/socket_windows.h"
43 #include "src/core/lib/iomgr/tcp_server.h"
44 #include "src/core/lib/iomgr/tcp_windows.h"
45 
46 #define MIN_SAFE_ACCEPT_QUEUE_SIZE 100
47 
48 /* one listening port */
49 typedef struct grpc_tcp_listener grpc_tcp_listener;
50 struct grpc_tcp_listener {
51   /* This seemingly magic number comes from AcceptEx's documentation. each
52      address buffer needs to have at least 16 more bytes at their end. */
53   uint8_t addresses[(sizeof(grpc_sockaddr_in6) + 16) * 2];
54   /* This will hold the socket for the next accept. */
55   SOCKET new_socket;
56   /* The listener winsocket. */
57   grpc_winsocket* socket;
58   /* The actual TCP port number. */
59   int port;
60   unsigned port_index;
61   grpc_tcp_server* server;
62   /* The cached AcceptEx for that port. */
63   LPFN_ACCEPTEX AcceptEx;
64   int shutting_down;
65   int outstanding_calls;
66   /* closure for socket notification of accept being ready */
67   grpc_closure on_accept;
68   /* linked list */
69   struct grpc_tcp_listener* next;
70 };
71 
72 /* the overall server */
73 struct grpc_tcp_server {
74   gpr_refcount refs;
75   /* Called whenever accept() succeeds on a server port. */
76   grpc_tcp_server_cb on_accept_cb;
77   void* on_accept_cb_arg;
78 
79   gpr_mu mu;
80 
81   /* active port count: how many ports are actually still listening */
82   int active_ports;
83 
84   /* linked list of server ports */
85   grpc_tcp_listener* head;
86   grpc_tcp_listener* tail;
87 
88   /* List of closures passed to shutdown_starting_add(). */
89   grpc_closure_list shutdown_starting;
90 
91   /* shutdown callback */
92   grpc_closure* shutdown_complete;
93 
94   grpc_channel_args* channel_args;
95 };
96 
97 /* Public function. Allocates the proper data structures to hold a
98    grpc_tcp_server. */
tcp_server_create(grpc_closure * shutdown_complete,const grpc_channel_args * args,grpc_tcp_server ** server)99 static grpc_error* tcp_server_create(grpc_closure* shutdown_complete,
100                                      const grpc_channel_args* args,
101                                      grpc_tcp_server** server) {
102   grpc_tcp_server* s = (grpc_tcp_server*)gpr_malloc(sizeof(grpc_tcp_server));
103   s->channel_args = grpc_channel_args_copy(args);
104   gpr_ref_init(&s->refs, 1);
105   gpr_mu_init(&s->mu);
106   s->active_ports = 0;
107   s->on_accept_cb = NULL;
108   s->on_accept_cb_arg = NULL;
109   s->head = NULL;
110   s->tail = NULL;
111   s->shutdown_starting.head = NULL;
112   s->shutdown_starting.tail = NULL;
113   s->shutdown_complete = shutdown_complete;
114   *server = s;
115   return GRPC_ERROR_NONE;
116 }
117 
destroy_server(void * arg,grpc_error * error)118 static void destroy_server(void* arg, grpc_error* error) {
119   grpc_tcp_server* s = (grpc_tcp_server*)arg;
120 
121   /* Now that the accepts have been aborted, we can destroy the sockets.
122      The IOCP won't get notified on these, so we can flag them as already
123      closed by the system. */
124   while (s->head) {
125     grpc_tcp_listener* sp = s->head;
126     s->head = sp->next;
127     sp->next = NULL;
128     grpc_winsocket_destroy(sp->socket);
129     gpr_free(sp);
130   }
131   grpc_channel_args_destroy(s->channel_args);
132   gpr_mu_destroy(&s->mu);
133   gpr_free(s);
134 }
135 
finish_shutdown_locked(grpc_tcp_server * s)136 static void finish_shutdown_locked(grpc_tcp_server* s) {
137   if (s->shutdown_complete != NULL) {
138     GRPC_CLOSURE_SCHED(s->shutdown_complete, GRPC_ERROR_NONE);
139   }
140 
141   GRPC_CLOSURE_SCHED(
142       GRPC_CLOSURE_CREATE(destroy_server, s, grpc_schedule_on_exec_ctx),
143       GRPC_ERROR_NONE);
144 }
145 
tcp_server_ref(grpc_tcp_server * s)146 static grpc_tcp_server* tcp_server_ref(grpc_tcp_server* s) {
147   gpr_ref_non_zero(&s->refs);
148   return s;
149 }
150 
tcp_server_shutdown_starting_add(grpc_tcp_server * s,grpc_closure * shutdown_starting)151 static void tcp_server_shutdown_starting_add(grpc_tcp_server* s,
152                                              grpc_closure* shutdown_starting) {
153   gpr_mu_lock(&s->mu);
154   grpc_closure_list_append(&s->shutdown_starting, shutdown_starting,
155                            GRPC_ERROR_NONE);
156   gpr_mu_unlock(&s->mu);
157 }
158 
tcp_server_destroy(grpc_tcp_server * s)159 static void tcp_server_destroy(grpc_tcp_server* s) {
160   grpc_tcp_listener* sp;
161   gpr_mu_lock(&s->mu);
162 
163   /* First, shutdown all fd's. This will queue abortion calls for all
164      of the pending accepts due to the normal operation mechanism. */
165   if (s->active_ports == 0) {
166     finish_shutdown_locked(s);
167   } else {
168     for (sp = s->head; sp; sp = sp->next) {
169       sp->shutting_down = 1;
170       grpc_winsocket_shutdown(sp->socket);
171     }
172   }
173   gpr_mu_unlock(&s->mu);
174 }
175 
tcp_server_unref(grpc_tcp_server * s)176 static void tcp_server_unref(grpc_tcp_server* s) {
177   if (gpr_unref(&s->refs)) {
178     grpc_tcp_server_shutdown_listeners(s);
179     gpr_mu_lock(&s->mu);
180     GRPC_CLOSURE_LIST_SCHED(&s->shutdown_starting);
181     gpr_mu_unlock(&s->mu);
182     tcp_server_destroy(s);
183   }
184 }
185 
186 /* Prepare (bind) a recently-created socket for listening. */
prepare_socket(SOCKET sock,const grpc_resolved_address * addr,int * port)187 static grpc_error* prepare_socket(SOCKET sock,
188                                   const grpc_resolved_address* addr,
189                                   int* port) {
190   grpc_resolved_address sockname_temp;
191   grpc_error* error = GRPC_ERROR_NONE;
192   int sockname_temp_len;
193 
194   error = grpc_tcp_prepare_socket(sock);
195   if (error != GRPC_ERROR_NONE) {
196     goto failure;
197   }
198 
199   if (bind(sock, (const grpc_sockaddr*)addr->addr, (int)addr->len) ==
200       SOCKET_ERROR) {
201     error = GRPC_WSA_ERROR(WSAGetLastError(), "bind");
202     goto failure;
203   }
204 
205   if (listen(sock, SOMAXCONN) == SOCKET_ERROR) {
206     error = GRPC_WSA_ERROR(WSAGetLastError(), "listen");
207     goto failure;
208   }
209 
210   sockname_temp_len = sizeof(struct sockaddr_storage);
211   if (getsockname(sock, (grpc_sockaddr*)sockname_temp.addr,
212                   &sockname_temp_len) == SOCKET_ERROR) {
213     error = GRPC_WSA_ERROR(WSAGetLastError(), "getsockname");
214     goto failure;
215   }
216   sockname_temp.len = (size_t)sockname_temp_len;
217 
218   *port = grpc_sockaddr_get_port(&sockname_temp);
219   return GRPC_ERROR_NONE;
220 
221 failure:
222   GPR_ASSERT(error != GRPC_ERROR_NONE);
223   char* tgtaddr = grpc_sockaddr_to_uri(addr);
224   grpc_error_set_int(
225       grpc_error_set_str(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
226                              "Failed to prepare server socket", &error, 1),
227                          GRPC_ERROR_STR_TARGET_ADDRESS,
228                          grpc_slice_from_copied_string(tgtaddr)),
229       GRPC_ERROR_INT_FD, (intptr_t)sock);
230   gpr_free(tgtaddr);
231   GRPC_ERROR_UNREF(error);
232   if (sock != INVALID_SOCKET) closesocket(sock);
233   return error;
234 }
235 
decrement_active_ports_and_notify_locked(grpc_tcp_listener * sp)236 static void decrement_active_ports_and_notify_locked(grpc_tcp_listener* sp) {
237   sp->shutting_down = 0;
238   GPR_ASSERT(sp->server->active_ports > 0);
239   if (0 == --sp->server->active_ports) {
240     finish_shutdown_locked(sp->server);
241   }
242 }
243 
244 /* In order to do an async accept, we need to create a socket first which
245    will be the one assigned to the new incoming connection. */
start_accept_locked(grpc_tcp_listener * port)246 static grpc_error* start_accept_locked(grpc_tcp_listener* port) {
247   SOCKET sock = INVALID_SOCKET;
248   BOOL success;
249   DWORD addrlen = sizeof(grpc_sockaddr_in6) + 16;
250   DWORD bytes_received = 0;
251   grpc_error* error = GRPC_ERROR_NONE;
252 
253   if (port->shutting_down) {
254     return GRPC_ERROR_NONE;
255   }
256 
257   sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, NULL, 0,
258                    WSA_FLAG_OVERLAPPED);
259   if (sock == INVALID_SOCKET) {
260     error = GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket");
261     goto failure;
262   }
263 
264   error = grpc_tcp_prepare_socket(sock);
265   if (error != GRPC_ERROR_NONE) goto failure;
266 
267   /* Start the "accept" asynchronously. */
268   success = port->AcceptEx(port->socket->socket, sock, port->addresses, 0,
269                            addrlen, addrlen, &bytes_received,
270                            &port->socket->read_info.overlapped);
271 
272   /* It is possible to get an accept immediately without delay. However, we
273      will still get an IOCP notification for it. So let's just ignore it. */
274   if (!success) {
275     int last_error = WSAGetLastError();
276     if (last_error != ERROR_IO_PENDING) {
277       error = GRPC_WSA_ERROR(last_error, "AcceptEx");
278       goto failure;
279     }
280   }
281 
282   /* We're ready to do the accept. Calling grpc_socket_notify_on_read may
283      immediately process an accept that happened in the meantime. */
284   port->new_socket = sock;
285   grpc_socket_notify_on_read(port->socket, &port->on_accept);
286   port->outstanding_calls++;
287   return error;
288 
289 failure:
290   GPR_ASSERT(error != GRPC_ERROR_NONE);
291   if (sock != INVALID_SOCKET) closesocket(sock);
292   return error;
293 }
294 
295 /* Event manager callback when reads are ready. */
on_accept(void * arg,grpc_error * error)296 static void on_accept(void* arg, grpc_error* error) {
297   grpc_tcp_listener* sp = (grpc_tcp_listener*)arg;
298   SOCKET sock = sp->new_socket;
299   grpc_winsocket_callback_info* info = &sp->socket->read_info;
300   grpc_endpoint* ep = NULL;
301   grpc_resolved_address peer_name;
302   char* peer_name_string;
303   char* fd_name;
304   DWORD transfered_bytes;
305   DWORD flags;
306   BOOL wsa_success;
307   int err;
308 
309   gpr_mu_lock(&sp->server->mu);
310 
311   peer_name.len = sizeof(struct sockaddr_storage);
312 
313   /* The general mechanism for shutting down is to queue abortion calls. While
314      this is necessary in the read/write case, it's useless for the accept
315      case. We only need to adjust the pending callback count */
316   if (error != GRPC_ERROR_NONE) {
317     const char* msg = grpc_error_string(error);
318     gpr_log(GPR_INFO, "Skipping on_accept due to error: %s", msg);
319 
320     gpr_mu_unlock(&sp->server->mu);
321     return;
322   }
323 
324   /* The IOCP notified us of a completed operation. Let's grab the results,
325      and act accordingly. */
326   transfered_bytes = 0;
327   wsa_success = WSAGetOverlappedResult(sock, &info->overlapped,
328                                        &transfered_bytes, FALSE, &flags);
329   if (!wsa_success) {
330     if (!sp->shutting_down) {
331       char* utf8_message = gpr_format_message(WSAGetLastError());
332       gpr_log(GPR_ERROR, "on_accept error: %s", utf8_message);
333       gpr_free(utf8_message);
334     }
335     closesocket(sock);
336   } else {
337     if (!sp->shutting_down) {
338       peer_name_string = NULL;
339       err = setsockopt(sock, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
340                        (char*)&sp->socket->socket, sizeof(sp->socket->socket));
341       if (err) {
342         char* utf8_message = gpr_format_message(WSAGetLastError());
343         gpr_log(GPR_ERROR, "setsockopt error: %s", utf8_message);
344         gpr_free(utf8_message);
345       }
346       int peer_name_len = (int)peer_name.len;
347       err = getpeername(sock, (grpc_sockaddr*)peer_name.addr, &peer_name_len);
348       peer_name.len = (size_t)peer_name_len;
349       if (!err) {
350         peer_name_string = grpc_sockaddr_to_uri(&peer_name);
351       } else {
352         char* utf8_message = gpr_format_message(WSAGetLastError());
353         gpr_log(GPR_ERROR, "getpeername error: %s", utf8_message);
354         gpr_free(utf8_message);
355       }
356       gpr_asprintf(&fd_name, "tcp_server:%s", peer_name_string);
357       ep = grpc_tcp_create(grpc_winsocket_create(sock, fd_name),
358                            sp->server->channel_args, peer_name_string);
359       gpr_free(fd_name);
360       gpr_free(peer_name_string);
361     } else {
362       closesocket(sock);
363     }
364   }
365 
366   /* The only time we should call our callback, is where we successfully
367      managed to accept a connection, and created an endpoint. */
368   if (ep) {
369     // Create acceptor.
370     grpc_tcp_server_acceptor* acceptor =
371         (grpc_tcp_server_acceptor*)gpr_malloc(sizeof(*acceptor));
372     acceptor->from_server = sp->server;
373     acceptor->port_index = sp->port_index;
374     acceptor->fd_index = 0;
375     sp->server->on_accept_cb(sp->server->on_accept_cb_arg, ep, NULL, acceptor);
376   }
377   /* As we were notified from the IOCP of one and exactly one accept,
378      the former socked we created has now either been destroy or assigned
379      to the new connection. We need to create a new one for the next
380      connection. */
381   GPR_ASSERT(GRPC_LOG_IF_ERROR("start_accept", start_accept_locked(sp)));
382   if (0 == --sp->outstanding_calls) {
383     decrement_active_ports_and_notify_locked(sp);
384   }
385   gpr_mu_unlock(&sp->server->mu);
386 }
387 
add_socket_to_server(grpc_tcp_server * s,SOCKET sock,const grpc_resolved_address * addr,unsigned port_index,grpc_tcp_listener ** listener)388 static grpc_error* add_socket_to_server(grpc_tcp_server* s, SOCKET sock,
389                                         const grpc_resolved_address* addr,
390                                         unsigned port_index,
391                                         grpc_tcp_listener** listener) {
392   grpc_tcp_listener* sp = NULL;
393   int port = -1;
394   int status;
395   GUID guid = WSAID_ACCEPTEX;
396   DWORD ioctl_num_bytes;
397   LPFN_ACCEPTEX AcceptEx;
398   grpc_error* error = GRPC_ERROR_NONE;
399 
400   /* We need to grab the AcceptEx pointer for that port, as it may be
401      interface-dependent. We'll cache it to avoid doing that again. */
402   status =
403       WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
404                &AcceptEx, sizeof(AcceptEx), &ioctl_num_bytes, NULL, NULL);
405 
406   if (status != 0) {
407     char* utf8_message = gpr_format_message(WSAGetLastError());
408     gpr_log(GPR_ERROR, "on_connect error: %s", utf8_message);
409     gpr_free(utf8_message);
410     closesocket(sock);
411     return NULL;
412   }
413 
414   error = prepare_socket(sock, addr, &port);
415   if (error != GRPC_ERROR_NONE) {
416     return error;
417   }
418 
419   GPR_ASSERT(port >= 0);
420   gpr_mu_lock(&s->mu);
421   GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server");
422   sp = (grpc_tcp_listener*)gpr_malloc(sizeof(grpc_tcp_listener));
423   sp->next = NULL;
424   if (s->head == NULL) {
425     s->head = sp;
426   } else {
427     s->tail->next = sp;
428   }
429   s->tail = sp;
430   sp->server = s;
431   sp->socket = grpc_winsocket_create(sock, "listener");
432   sp->shutting_down = 0;
433   sp->outstanding_calls = 0;
434   sp->AcceptEx = AcceptEx;
435   sp->new_socket = INVALID_SOCKET;
436   sp->port = port;
437   sp->port_index = port_index;
438   GRPC_CLOSURE_INIT(&sp->on_accept, on_accept, sp, grpc_schedule_on_exec_ctx);
439   GPR_ASSERT(sp->socket);
440   gpr_mu_unlock(&s->mu);
441   *listener = sp;
442 
443   return GRPC_ERROR_NONE;
444 }
445 
tcp_server_add_port(grpc_tcp_server * s,const grpc_resolved_address * addr,int * port)446 static grpc_error* tcp_server_add_port(grpc_tcp_server* s,
447                                        const grpc_resolved_address* addr,
448                                        int* port) {
449   grpc_tcp_listener* sp = NULL;
450   SOCKET sock;
451   grpc_resolved_address addr6_v4mapped;
452   grpc_resolved_address wildcard;
453   grpc_resolved_address* allocated_addr = NULL;
454   grpc_resolved_address sockname_temp;
455   unsigned port_index = 0;
456   grpc_error* error = GRPC_ERROR_NONE;
457 
458   if (s->tail != NULL) {
459     port_index = s->tail->port_index + 1;
460   }
461 
462   /* Check if this is a wildcard port, and if so, try to keep the port the same
463      as some previously created listener. */
464   if (grpc_sockaddr_get_port(addr) == 0) {
465     for (sp = s->head; sp; sp = sp->next) {
466       int sockname_temp_len = sizeof(struct sockaddr_storage);
467       if (0 == getsockname(sp->socket->socket,
468                            (grpc_sockaddr*)sockname_temp.addr,
469                            &sockname_temp_len)) {
470         sockname_temp.len = (size_t)sockname_temp_len;
471         *port = grpc_sockaddr_get_port(&sockname_temp);
472         if (*port > 0) {
473           allocated_addr =
474               (grpc_resolved_address*)gpr_malloc(sizeof(grpc_resolved_address));
475           memcpy(allocated_addr, addr, sizeof(grpc_resolved_address));
476           grpc_sockaddr_set_port(allocated_addr, *port);
477           addr = allocated_addr;
478           break;
479         }
480       }
481     }
482   }
483 
484   if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
485     addr = &addr6_v4mapped;
486   }
487 
488   /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */
489   if (grpc_sockaddr_is_wildcard(addr, port)) {
490     grpc_sockaddr_make_wildcard6(*port, &wildcard);
491 
492     addr = &wildcard;
493   }
494 
495   sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, NULL, 0,
496                    WSA_FLAG_OVERLAPPED);
497   if (sock == INVALID_SOCKET) {
498     error = GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket");
499     goto done;
500   }
501 
502   error = add_socket_to_server(s, sock, addr, port_index, &sp);
503 
504 done:
505   gpr_free(allocated_addr);
506 
507   if (error != GRPC_ERROR_NONE) {
508     grpc_error* error_out = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
509         "Failed to add port to server", &error, 1);
510     GRPC_ERROR_UNREF(error);
511     error = error_out;
512     *port = -1;
513   } else {
514     GPR_ASSERT(sp != NULL);
515     *port = sp->port;
516   }
517   return error;
518 }
519 
tcp_server_start(grpc_tcp_server * s,grpc_pollset ** pollset,size_t pollset_count,grpc_tcp_server_cb on_accept_cb,void * on_accept_cb_arg)520 static void tcp_server_start(grpc_tcp_server* s, grpc_pollset** pollset,
521                              size_t pollset_count,
522                              grpc_tcp_server_cb on_accept_cb,
523                              void* on_accept_cb_arg) {
524   grpc_tcp_listener* sp;
525   GPR_ASSERT(on_accept_cb);
526   gpr_mu_lock(&s->mu);
527   GPR_ASSERT(!s->on_accept_cb);
528   GPR_ASSERT(s->active_ports == 0);
529   s->on_accept_cb = on_accept_cb;
530   s->on_accept_cb_arg = on_accept_cb_arg;
531   for (sp = s->head; sp; sp = sp->next) {
532     GPR_ASSERT(GRPC_LOG_IF_ERROR("start_accept", start_accept_locked(sp)));
533     s->active_ports++;
534   }
535   gpr_mu_unlock(&s->mu);
536 }
537 
tcp_server_port_fd_count(grpc_tcp_server * s,unsigned port_index)538 static unsigned tcp_server_port_fd_count(grpc_tcp_server* s,
539                                          unsigned port_index) {
540   return 0;
541 }
542 
tcp_server_port_fd(grpc_tcp_server * s,unsigned port_index,unsigned fd_index)543 static int tcp_server_port_fd(grpc_tcp_server* s, unsigned port_index,
544                               unsigned fd_index) {
545   return -1;
546 }
547 
tcp_server_shutdown_listeners(grpc_tcp_server * s)548 static void tcp_server_shutdown_listeners(grpc_tcp_server* s) {}
549 
550 grpc_tcp_server_vtable grpc_windows_tcp_server_vtable = {
551     tcp_server_create,
552     tcp_server_start,
553     tcp_server_add_port,
554     tcp_server_port_fd_count,
555     tcp_server_port_fd,
556     tcp_server_ref,
557     tcp_server_shutdown_starting_add,
558     tcp_server_unref,
559     tcp_server_shutdown_listeners};
560 #endif /* GRPC_WINSOCK_SOCKET */
561