• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 /* FIXME: "posix" files shouldn't be depending on _GNU_SOURCE */
20 #ifndef _GNU_SOURCE
21 #define _GNU_SOURCE
22 #endif
23 
24 #include <grpc/support/port_platform.h>
25 
26 #include "src/core/lib/iomgr/port.h"
27 
28 #ifdef GRPC_POSIX_SOCKET_TCP_SERVER
29 
30 #include "src/core/lib/iomgr/tcp_server.h"
31 
32 #include <errno.h>
33 #include <fcntl.h>
34 #include <netinet/in.h>
35 #include <netinet/tcp.h>
36 #include <string.h>
37 #include <sys/socket.h>
38 #include <sys/stat.h>
39 #include <sys/types.h>
40 #include <unistd.h>
41 
42 #include <grpc/support/alloc.h>
43 #include <grpc/support/log.h>
44 #include <grpc/support/string_util.h>
45 #include <grpc/support/sync.h>
46 #include <grpc/support/time.h>
47 
48 #include "src/core/lib/channel/channel_args.h"
49 #include "src/core/lib/gpr/string.h"
50 #include "src/core/lib/iomgr/resolve_address.h"
51 #include "src/core/lib/iomgr/sockaddr.h"
52 #include "src/core/lib/iomgr/sockaddr_utils.h"
53 #include "src/core/lib/iomgr/socket_utils_posix.h"
54 #include "src/core/lib/iomgr/tcp_posix.h"
55 #include "src/core/lib/iomgr/tcp_server_utils_posix.h"
56 #include "src/core/lib/iomgr/unix_sockets_posix.h"
57 
tcp_server_create(grpc_closure * shutdown_complete,const grpc_channel_args * args,grpc_tcp_server ** server)58 static grpc_error* tcp_server_create(grpc_closure* shutdown_complete,
59                                      const grpc_channel_args* args,
60                                      grpc_tcp_server** server) {
61   grpc_tcp_server* s =
62       static_cast<grpc_tcp_server*>(gpr_zalloc(sizeof(grpc_tcp_server)));
63   s->so_reuseport = grpc_is_socket_reuse_port_supported();
64   s->expand_wildcard_addrs = false;
65   for (size_t i = 0; i < (args == nullptr ? 0 : args->num_args); i++) {
66     if (0 == strcmp(GRPC_ARG_ALLOW_REUSEPORT, args->args[i].key)) {
67       if (args->args[i].type == GRPC_ARG_INTEGER) {
68         s->so_reuseport = grpc_is_socket_reuse_port_supported() &&
69                           (args->args[i].value.integer != 0);
70       } else {
71         gpr_free(s);
72         return GRPC_ERROR_CREATE_FROM_STATIC_STRING(GRPC_ARG_ALLOW_REUSEPORT
73                                                     " must be an integer");
74       }
75     } else if (0 == strcmp(GRPC_ARG_EXPAND_WILDCARD_ADDRS, args->args[i].key)) {
76       if (args->args[i].type == GRPC_ARG_INTEGER) {
77         s->expand_wildcard_addrs = (args->args[i].value.integer != 0);
78       } else {
79         gpr_free(s);
80         return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
81             GRPC_ARG_EXPAND_WILDCARD_ADDRS " must be an integer");
82       }
83     }
84   }
85   gpr_ref_init(&s->refs, 1);
86   gpr_mu_init(&s->mu);
87   s->active_ports = 0;
88   s->destroyed_ports = 0;
89   s->shutdown = false;
90   s->shutdown_starting.head = nullptr;
91   s->shutdown_starting.tail = nullptr;
92   s->shutdown_complete = shutdown_complete;
93   s->on_accept_cb = nullptr;
94   s->on_accept_cb_arg = nullptr;
95   s->head = nullptr;
96   s->tail = nullptr;
97   s->nports = 0;
98   s->channel_args = grpc_channel_args_copy(args);
99   gpr_atm_no_barrier_store(&s->next_pollset_to_assign, 0);
100   *server = s;
101   return GRPC_ERROR_NONE;
102 }
103 
finish_shutdown(grpc_tcp_server * s)104 static void finish_shutdown(grpc_tcp_server* s) {
105   gpr_mu_lock(&s->mu);
106   GPR_ASSERT(s->shutdown);
107   gpr_mu_unlock(&s->mu);
108   if (s->shutdown_complete != nullptr) {
109     GRPC_CLOSURE_SCHED(s->shutdown_complete, GRPC_ERROR_NONE);
110   }
111 
112   gpr_mu_destroy(&s->mu);
113 
114   while (s->head) {
115     grpc_tcp_listener* sp = s->head;
116     s->head = sp->next;
117     gpr_free(sp);
118   }
119   grpc_channel_args_destroy(s->channel_args);
120 
121   gpr_free(s);
122 }
123 
destroyed_port(void * server,grpc_error * error)124 static void destroyed_port(void* server, grpc_error* error) {
125   grpc_tcp_server* s = static_cast<grpc_tcp_server*>(server);
126   gpr_mu_lock(&s->mu);
127   s->destroyed_ports++;
128   if (s->destroyed_ports == s->nports) {
129     gpr_mu_unlock(&s->mu);
130     finish_shutdown(s);
131   } else {
132     GPR_ASSERT(s->destroyed_ports < s->nports);
133     gpr_mu_unlock(&s->mu);
134   }
135 }
136 
137 /* called when all listening endpoints have been shutdown, so no further
138    events will be received on them - at this point it's safe to destroy
139    things */
deactivated_all_ports(grpc_tcp_server * s)140 static void deactivated_all_ports(grpc_tcp_server* s) {
141   /* delete ALL the things */
142   gpr_mu_lock(&s->mu);
143 
144   GPR_ASSERT(s->shutdown);
145 
146   if (s->head) {
147     grpc_tcp_listener* sp;
148     for (sp = s->head; sp; sp = sp->next) {
149       grpc_unlink_if_unix_domain_socket(&sp->addr);
150       GRPC_CLOSURE_INIT(&sp->destroyed_closure, destroyed_port, s,
151                         grpc_schedule_on_exec_ctx);
152       grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, nullptr,
153                      "tcp_listener_shutdown");
154     }
155     gpr_mu_unlock(&s->mu);
156   } else {
157     gpr_mu_unlock(&s->mu);
158     finish_shutdown(s);
159   }
160 }
161 
tcp_server_destroy(grpc_tcp_server * s)162 static void tcp_server_destroy(grpc_tcp_server* s) {
163   gpr_mu_lock(&s->mu);
164 
165   GPR_ASSERT(!s->shutdown);
166   s->shutdown = true;
167 
168   /* shutdown all fd's */
169   if (s->active_ports) {
170     grpc_tcp_listener* sp;
171     for (sp = s->head; sp; sp = sp->next) {
172       grpc_fd_shutdown(
173           sp->emfd, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server destroyed"));
174     }
175     gpr_mu_unlock(&s->mu);
176   } else {
177     gpr_mu_unlock(&s->mu);
178     deactivated_all_ports(s);
179   }
180 }
181 
182 /* event manager callback when reads are ready */
on_read(void * arg,grpc_error * err)183 static void on_read(void* arg, grpc_error* err) {
184   grpc_tcp_listener* sp = static_cast<grpc_tcp_listener*>(arg);
185   grpc_pollset* read_notifier_pollset;
186   if (err != GRPC_ERROR_NONE) {
187     goto error;
188   }
189 
190   /* loop until accept4 returns EAGAIN, and then re-arm notification */
191   for (;;) {
192     grpc_resolved_address addr;
193     char* addr_str;
194     char* name;
195     memset(&addr, 0, sizeof(addr));
196     addr.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
197     /* Note: If we ever decide to return this address to the user, remember to
198        strip off the ::ffff:0.0.0.0/96 prefix first. */
199     int fd = grpc_accept4(sp->fd, &addr, 1, 1);
200     if (fd < 0) {
201       switch (errno) {
202         case EINTR:
203           continue;
204         case EAGAIN:
205           grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
206           return;
207         default:
208           gpr_mu_lock(&sp->server->mu);
209           if (!sp->server->shutdown_listeners) {
210             gpr_log(GPR_ERROR, "Failed accept4: %s", strerror(errno));
211           } else {
212             /* if we have shutdown listeners, accept4 could fail, and we
213                needn't notify users */
214           }
215           gpr_mu_unlock(&sp->server->mu);
216           goto error;
217       }
218     }
219 
220     grpc_set_socket_no_sigpipe_if_possible(fd);
221 
222     addr_str = grpc_sockaddr_to_uri(&addr);
223     gpr_asprintf(&name, "tcp-server-connection:%s", addr_str);
224 
225     if (grpc_tcp_trace.enabled()) {
226       gpr_log(GPR_INFO, "SERVER_CONNECT: incoming connection: %s", addr_str);
227     }
228 
229     grpc_fd* fdobj = grpc_fd_create(fd, name, true);
230 
231     read_notifier_pollset =
232         sp->server->pollsets[static_cast<size_t>(gpr_atm_no_barrier_fetch_add(
233                                  &sp->server->next_pollset_to_assign, 1)) %
234                              sp->server->pollset_count];
235 
236     grpc_pollset_add_fd(read_notifier_pollset, fdobj);
237 
238     // Create acceptor.
239     grpc_tcp_server_acceptor* acceptor =
240         static_cast<grpc_tcp_server_acceptor*>(gpr_malloc(sizeof(*acceptor)));
241     acceptor->from_server = sp->server;
242     acceptor->port_index = sp->port_index;
243     acceptor->fd_index = sp->fd_index;
244 
245     sp->server->on_accept_cb(
246         sp->server->on_accept_cb_arg,
247         grpc_tcp_create(fdobj, sp->server->channel_args, addr_str),
248         read_notifier_pollset, acceptor);
249 
250     gpr_free(name);
251     gpr_free(addr_str);
252   }
253 
254   GPR_UNREACHABLE_CODE(return );
255 
256 error:
257   gpr_mu_lock(&sp->server->mu);
258   if (0 == --sp->server->active_ports && sp->server->shutdown) {
259     gpr_mu_unlock(&sp->server->mu);
260     deactivated_all_ports(sp->server);
261   } else {
262     gpr_mu_unlock(&sp->server->mu);
263   }
264 }
265 
266 /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */
add_wildcard_addrs_to_server(grpc_tcp_server * s,unsigned port_index,int requested_port,int * out_port)267 static grpc_error* add_wildcard_addrs_to_server(grpc_tcp_server* s,
268                                                 unsigned port_index,
269                                                 int requested_port,
270                                                 int* out_port) {
271   grpc_resolved_address wild4;
272   grpc_resolved_address wild6;
273   unsigned fd_index = 0;
274   grpc_dualstack_mode dsmode;
275   grpc_tcp_listener* sp = nullptr;
276   grpc_tcp_listener* sp2 = nullptr;
277   grpc_error* v6_err = GRPC_ERROR_NONE;
278   grpc_error* v4_err = GRPC_ERROR_NONE;
279   *out_port = -1;
280 
281   if (grpc_tcp_server_have_ifaddrs() && s->expand_wildcard_addrs) {
282     return grpc_tcp_server_add_all_local_addrs(s, port_index, requested_port,
283                                                out_port);
284   }
285 
286   grpc_sockaddr_make_wildcards(requested_port, &wild4, &wild6);
287   /* Try listening on IPv6 first. */
288   if ((v6_err = grpc_tcp_server_add_addr(s, &wild6, port_index, fd_index,
289                                          &dsmode, &sp)) == GRPC_ERROR_NONE) {
290     ++fd_index;
291     requested_port = *out_port = sp->port;
292     if (dsmode == GRPC_DSMODE_DUALSTACK || dsmode == GRPC_DSMODE_IPV4) {
293       return GRPC_ERROR_NONE;
294     }
295   }
296   /* If we got a v6-only socket or nothing, try adding 0.0.0.0. */
297   grpc_sockaddr_set_port(&wild4, requested_port);
298   if ((v4_err = grpc_tcp_server_add_addr(s, &wild4, port_index, fd_index,
299                                          &dsmode, &sp2)) == GRPC_ERROR_NONE) {
300     *out_port = sp2->port;
301     if (sp != nullptr) {
302       sp2->is_sibling = 1;
303       sp->sibling = sp2;
304     }
305   }
306   if (*out_port > 0) {
307     if (v6_err != GRPC_ERROR_NONE) {
308       gpr_log(GPR_INFO,
309               "Failed to add :: listener, "
310               "the environment may not support IPv6: %s",
311               grpc_error_string(v6_err));
312       GRPC_ERROR_UNREF(v6_err);
313     }
314     if (v4_err != GRPC_ERROR_NONE) {
315       gpr_log(GPR_INFO,
316               "Failed to add 0.0.0.0 listener, "
317               "the environment may not support IPv4: %s",
318               grpc_error_string(v4_err));
319       GRPC_ERROR_UNREF(v4_err);
320     }
321     return GRPC_ERROR_NONE;
322   } else {
323     grpc_error* root_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
324         "Failed to add any wildcard listeners");
325     GPR_ASSERT(v6_err != GRPC_ERROR_NONE && v4_err != GRPC_ERROR_NONE);
326     root_err = grpc_error_add_child(root_err, v6_err);
327     root_err = grpc_error_add_child(root_err, v4_err);
328     return root_err;
329   }
330 }
331 
clone_port(grpc_tcp_listener * listener,unsigned count)332 static grpc_error* clone_port(grpc_tcp_listener* listener, unsigned count) {
333   grpc_tcp_listener* sp = nullptr;
334   char* addr_str;
335   char* name;
336   grpc_error* err;
337 
338   for (grpc_tcp_listener* l = listener->next; l && l->is_sibling; l = l->next) {
339     l->fd_index += count;
340   }
341 
342   for (unsigned i = 0; i < count; i++) {
343     int fd = -1;
344     int port = -1;
345     grpc_dualstack_mode dsmode;
346     err = grpc_create_dualstack_socket(&listener->addr, SOCK_STREAM, 0, &dsmode,
347                                        &fd);
348     if (err != GRPC_ERROR_NONE) return err;
349     err = grpc_tcp_server_prepare_socket(listener->server, fd, &listener->addr,
350                                          true, &port);
351     if (err != GRPC_ERROR_NONE) return err;
352     listener->server->nports++;
353     grpc_sockaddr_to_string(&addr_str, &listener->addr, 1);
354     gpr_asprintf(&name, "tcp-server-listener:%s/clone-%d", addr_str, i);
355     sp = static_cast<grpc_tcp_listener*>(gpr_malloc(sizeof(grpc_tcp_listener)));
356     sp->next = listener->next;
357     listener->next = sp;
358     /* sp (the new listener) is a sibling of 'listener' (the original
359        listener). */
360     sp->is_sibling = 1;
361     sp->sibling = listener->sibling;
362     listener->sibling = sp;
363     sp->server = listener->server;
364     sp->fd = fd;
365     sp->emfd = grpc_fd_create(fd, name, true);
366     memcpy(&sp->addr, &listener->addr, sizeof(grpc_resolved_address));
367     sp->port = port;
368     sp->port_index = listener->port_index;
369     sp->fd_index = listener->fd_index + count - i;
370     GPR_ASSERT(sp->emfd);
371     while (listener->server->tail->next != nullptr) {
372       listener->server->tail = listener->server->tail->next;
373     }
374     gpr_free(addr_str);
375     gpr_free(name);
376   }
377 
378   return GRPC_ERROR_NONE;
379 }
380 
tcp_server_add_port(grpc_tcp_server * s,const grpc_resolved_address * addr,int * out_port)381 static grpc_error* tcp_server_add_port(grpc_tcp_server* s,
382                                        const grpc_resolved_address* addr,
383                                        int* out_port) {
384   grpc_tcp_listener* sp;
385   grpc_resolved_address sockname_temp;
386   grpc_resolved_address addr6_v4mapped;
387   int requested_port = grpc_sockaddr_get_port(addr);
388   unsigned port_index = 0;
389   grpc_dualstack_mode dsmode;
390   grpc_error* err;
391   *out_port = -1;
392   if (s->tail != nullptr) {
393     port_index = s->tail->port_index + 1;
394   }
395   grpc_unlink_if_unix_domain_socket(addr);
396 
397   /* Check if this is a wildcard port, and if so, try to keep the port the same
398      as some previously created listener. */
399   if (requested_port == 0) {
400     for (sp = s->head; sp; sp = sp->next) {
401       sockname_temp.len =
402           static_cast<socklen_t>(sizeof(struct sockaddr_storage));
403       if (0 ==
404           getsockname(sp->fd,
405                       reinterpret_cast<grpc_sockaddr*>(&sockname_temp.addr),
406                       &sockname_temp.len)) {
407         int used_port = grpc_sockaddr_get_port(&sockname_temp);
408         if (used_port > 0) {
409           memcpy(&sockname_temp, addr, sizeof(grpc_resolved_address));
410           grpc_sockaddr_set_port(&sockname_temp, used_port);
411           requested_port = used_port;
412           addr = &sockname_temp;
413           break;
414         }
415       }
416     }
417   }
418   if (grpc_sockaddr_is_wildcard(addr, &requested_port)) {
419     return add_wildcard_addrs_to_server(s, port_index, requested_port,
420                                         out_port);
421   }
422   if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
423     addr = &addr6_v4mapped;
424   }
425   if ((err = grpc_tcp_server_add_addr(s, addr, port_index, 0, &dsmode, &sp)) ==
426       GRPC_ERROR_NONE) {
427     *out_port = sp->port;
428   }
429   return err;
430 }
431 
432 /* Return listener at port_index or NULL. Should only be called with s->mu
433    locked. */
get_port_index(grpc_tcp_server * s,unsigned port_index)434 static grpc_tcp_listener* get_port_index(grpc_tcp_server* s,
435                                          unsigned port_index) {
436   unsigned num_ports = 0;
437   grpc_tcp_listener* sp;
438   for (sp = s->head; sp; sp = sp->next) {
439     if (!sp->is_sibling) {
440       if (++num_ports > port_index) {
441         return sp;
442       }
443     }
444   }
445   return nullptr;
446 }
447 
tcp_server_port_fd_count(grpc_tcp_server * s,unsigned port_index)448 unsigned tcp_server_port_fd_count(grpc_tcp_server* s, unsigned port_index) {
449   unsigned num_fds = 0;
450   gpr_mu_lock(&s->mu);
451   grpc_tcp_listener* sp = get_port_index(s, port_index);
452   for (; sp; sp = sp->sibling) {
453     ++num_fds;
454   }
455   gpr_mu_unlock(&s->mu);
456   return num_fds;
457 }
458 
tcp_server_port_fd(grpc_tcp_server * s,unsigned port_index,unsigned fd_index)459 static int tcp_server_port_fd(grpc_tcp_server* s, unsigned port_index,
460                               unsigned fd_index) {
461   gpr_mu_lock(&s->mu);
462   grpc_tcp_listener* sp = get_port_index(s, port_index);
463   for (; sp; sp = sp->sibling, --fd_index) {
464     if (fd_index == 0) {
465       gpr_mu_unlock(&s->mu);
466       return sp->fd;
467     }
468   }
469   gpr_mu_unlock(&s->mu);
470   return -1;
471 }
472 
tcp_server_start(grpc_tcp_server * s,grpc_pollset ** pollsets,size_t pollset_count,grpc_tcp_server_cb on_accept_cb,void * on_accept_cb_arg)473 static void tcp_server_start(grpc_tcp_server* s, grpc_pollset** pollsets,
474                              size_t pollset_count,
475                              grpc_tcp_server_cb on_accept_cb,
476                              void* on_accept_cb_arg) {
477   size_t i;
478   grpc_tcp_listener* sp;
479   GPR_ASSERT(on_accept_cb);
480   gpr_mu_lock(&s->mu);
481   GPR_ASSERT(!s->on_accept_cb);
482   GPR_ASSERT(s->active_ports == 0);
483   s->on_accept_cb = on_accept_cb;
484   s->on_accept_cb_arg = on_accept_cb_arg;
485   s->pollsets = pollsets;
486   s->pollset_count = pollset_count;
487   sp = s->head;
488   while (sp != nullptr) {
489     if (s->so_reuseport && !grpc_is_unix_socket(&sp->addr) &&
490         !grpc_is_vsock(&sp->addr) && pollset_count > 1) {
491       GPR_ASSERT(GRPC_LOG_IF_ERROR(
492           "clone_port", clone_port(sp, (unsigned)(pollset_count - 1))));
493       for (i = 0; i < pollset_count; i++) {
494         grpc_pollset_add_fd(pollsets[i], sp->emfd);
495         GRPC_CLOSURE_INIT(&sp->read_closure, on_read, sp,
496                           grpc_schedule_on_exec_ctx);
497         grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
498         s->active_ports++;
499         sp = sp->next;
500       }
501     } else {
502       for (i = 0; i < pollset_count; i++) {
503         grpc_pollset_add_fd(pollsets[i], sp->emfd);
504       }
505       GRPC_CLOSURE_INIT(&sp->read_closure, on_read, sp,
506                         grpc_schedule_on_exec_ctx);
507       grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
508       s->active_ports++;
509       sp = sp->next;
510     }
511   }
512   gpr_mu_unlock(&s->mu);
513 }
514 
tcp_server_ref(grpc_tcp_server * s)515 grpc_tcp_server* tcp_server_ref(grpc_tcp_server* s) {
516   gpr_ref_non_zero(&s->refs);
517   return s;
518 }
519 
tcp_server_shutdown_starting_add(grpc_tcp_server * s,grpc_closure * shutdown_starting)520 static void tcp_server_shutdown_starting_add(grpc_tcp_server* s,
521                                              grpc_closure* shutdown_starting) {
522   gpr_mu_lock(&s->mu);
523   grpc_closure_list_append(&s->shutdown_starting, shutdown_starting,
524                            GRPC_ERROR_NONE);
525   gpr_mu_unlock(&s->mu);
526 }
527 
tcp_server_unref(grpc_tcp_server * s)528 static void tcp_server_unref(grpc_tcp_server* s) {
529   if (gpr_unref(&s->refs)) {
530     grpc_tcp_server_shutdown_listeners(s);
531     gpr_mu_lock(&s->mu);
532     GRPC_CLOSURE_LIST_SCHED(&s->shutdown_starting);
533     gpr_mu_unlock(&s->mu);
534     tcp_server_destroy(s);
535   }
536 }
537 
tcp_server_shutdown_listeners(grpc_tcp_server * s)538 static void tcp_server_shutdown_listeners(grpc_tcp_server* s) {
539   gpr_mu_lock(&s->mu);
540   s->shutdown_listeners = true;
541   /* shutdown all fd's */
542   if (s->active_ports) {
543     grpc_tcp_listener* sp;
544     for (sp = s->head; sp; sp = sp->next) {
545       grpc_fd_shutdown(sp->emfd,
546                        GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server shutdown"));
547     }
548   }
549   gpr_mu_unlock(&s->mu);
550 }
551 
552 grpc_tcp_server_vtable grpc_posix_tcp_server_vtable = {
553     tcp_server_create,
554     tcp_server_start,
555     tcp_server_add_port,
556     tcp_server_port_fd_count,
557     tcp_server_port_fd,
558     tcp_server_ref,
559     tcp_server_shutdown_starting_add,
560     tcp_server_unref,
561     tcp_server_shutdown_listeners};
562 #endif /* GRPC_POSIX_SOCKET_TCP_SERVER */
563