• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2017 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/support/atm.h>
20 #include <grpc/support/port_platform.h>
21 
22 #include "src/core/lib/iomgr/port.h"
23 
24 #ifdef GRPC_POSIX_SOCKET_TCP_SERVER_UTILS_COMMON
25 
26 #include <errno.h>
27 #include <grpc/support/alloc.h>
28 #include <grpc/support/sync.h>
29 #include <limits.h>
30 #include <stdio.h>
31 #include <string.h>
32 #include <sys/socket.h>
33 
34 #include <string>
35 
36 #include "absl/log/check.h"
37 #include "absl/log/log.h"
38 #include "absl/strings/str_cat.h"
39 #include "src/core/lib/address_utils/sockaddr_utils.h"
40 #include "src/core/lib/iomgr/error.h"
41 #include "src/core/lib/iomgr/sockaddr.h"
42 #include "src/core/lib/iomgr/tcp_server_utils_posix.h"
43 #include "src/core/lib/iomgr/unix_sockets_posix.h"
44 #include "src/core/lib/iomgr/vsock.h"
45 #include "src/core/util/crash.h"
46 
// Accept-queue sizes below this value make init_max_accept_queue_size() log
// a warning that connection drops are likely.
47 #define MIN_SAFE_ACCEPT_QUEUE_SIZE 100
48 
// Once-guard handed to gpr_once_init() by get_max_accept_queue_size().
49 static gpr_once s_init_max_accept_queue_size = GPR_ONCE_INIT;
// Cached accept-queue size; written by init_max_accept_queue_size() and
// returned by get_max_accept_queue_size().
50 static int s_max_accept_queue_size;
51 
52 // get max listen queue size on linux
init_max_accept_queue_size(void)53 static void init_max_accept_queue_size(void) {
54   int n = SOMAXCONN;
55   char buf[64];
56   FILE* fp = fopen("/proc/sys/net/core/somaxconn", "r");
57   if (fp == nullptr) {
58     // 2.4 kernel.
59     s_max_accept_queue_size = SOMAXCONN;
60     return;
61   }
62   if (fgets(buf, sizeof buf, fp)) {
63     char* end;
64     long i = strtol(buf, &end, 10);
65     if (i > 0 && i <= INT_MAX && end && *end == '\n') {
66       n = static_cast<int>(i);
67     }
68   }
69   fclose(fp);
70   s_max_accept_queue_size = n;
71 
72   if (s_max_accept_queue_size < MIN_SAFE_ACCEPT_QUEUE_SIZE) {
73     LOG(INFO) << "Suspiciously small accept queue (" << s_max_accept_queue_size
74               << ") will probably lead to connection drops";
75   }
76 }
77 
get_max_accept_queue_size(void)78 static int get_max_accept_queue_size(void) {
79   gpr_once_init(&s_init_max_accept_queue_size, init_max_accept_queue_size);
80   return s_max_accept_queue_size;
81 }
82 
listener_retry_timer_cb(void * arg,grpc_error_handle err)83 static void listener_retry_timer_cb(void* arg, grpc_error_handle err) {
84   // Do nothing if cancelled.
85   if (!err.ok()) return;
86   grpc_tcp_listener* listener = static_cast<grpc_tcp_listener*>(arg);
87   gpr_atm_no_barrier_store(&listener->retry_timer_armed, false);
88   if (!grpc_fd_is_shutdown(listener->emfd)) {
89     grpc_fd_set_readable(listener->emfd);
90   }
91 }
92 
grpc_tcp_server_listener_initialize_retry_timer(grpc_tcp_listener * listener)93 void grpc_tcp_server_listener_initialize_retry_timer(
94     grpc_tcp_listener* listener) {
95   gpr_atm_no_barrier_store(&listener->retry_timer_armed, false);
96   grpc_timer_init_unset(&listener->retry_timer);
97   GRPC_CLOSURE_INIT(&listener->retry_closure, listener_retry_timer_cb, listener,
98                     grpc_schedule_on_exec_ctx);
99 }
100 
add_socket_to_server(grpc_tcp_server * s,int fd,const grpc_resolved_address * addr,unsigned port_index,unsigned fd_index,grpc_tcp_listener ** listener)101 static grpc_error_handle add_socket_to_server(grpc_tcp_server* s, int fd,
102                                               const grpc_resolved_address* addr,
103                                               unsigned port_index,
104                                               unsigned fd_index,
105                                               grpc_tcp_listener** listener) {
106   *listener = nullptr;
107   int port = -1;
108 
109   grpc_error_handle err =
110       grpc_tcp_server_prepare_socket(s, fd, addr, s->so_reuseport, &port);
111   if (!err.ok()) return err;
112   CHECK_GT(port, 0);
113   absl::StatusOr<std::string> addr_str = grpc_sockaddr_to_string(addr, true);
114   if (!addr_str.ok()) {
115     return GRPC_ERROR_CREATE(addr_str.status().ToString());
116   }
117   std::string name = absl::StrCat("tcp-server-listener:", addr_str.value());
118   gpr_mu_lock(&s->mu);
119   s->nports++;
120   grpc_tcp_listener* sp =
121       static_cast<grpc_tcp_listener*>(gpr_malloc(sizeof(grpc_tcp_listener)));
122   sp->next = nullptr;
123   if (s->head == nullptr) {
124     s->head = sp;
125   } else {
126     s->tail->next = sp;
127   }
128   s->tail = sp;
129   sp->server = s;
130   sp->fd = fd;
131   sp->emfd = grpc_fd_create(fd, name.c_str(), true);
132   grpc_tcp_server_listener_initialize_retry_timer(sp);
133 
134   // Check and set fd as prellocated
135   if (grpc_tcp_server_pre_allocated_fd(s) == fd) {
136     grpc_fd_set_pre_allocated(sp->emfd);
137   }
138 
139   memcpy(&sp->addr, addr, sizeof(grpc_resolved_address));
140   sp->port = port;
141   sp->port_index = port_index;
142   sp->fd_index = fd_index;
143   sp->is_sibling = 0;
144   sp->sibling = nullptr;
145   CHECK(sp->emfd);
146   gpr_mu_unlock(&s->mu);
147 
148   *listener = sp;
149   return err;
150 }
151 
152 // If successful, add a listener to s for addr, set *dsmode for the socket, and
153 // return the *listener.
grpc_tcp_server_add_addr(grpc_tcp_server * s,const grpc_resolved_address * addr,unsigned port_index,unsigned fd_index,grpc_dualstack_mode * dsmode,grpc_tcp_listener ** listener)154 grpc_error_handle grpc_tcp_server_add_addr(grpc_tcp_server* s,
155                                            const grpc_resolved_address* addr,
156                                            unsigned port_index,
157                                            unsigned fd_index,
158                                            grpc_dualstack_mode* dsmode,
159                                            grpc_tcp_listener** listener) {
160   int fd;
161   fd = grpc_tcp_server_pre_allocated_fd(s);
162 
163   // Check if FD has been pre-allocated
164   if (fd > 0) {
165     int family = grpc_sockaddr_get_family(addr);
166     // Set dsmode value
167     if (family == AF_INET6) {
168       const int off = 0;
169       if (setsockopt(fd, 0, IPV6_V6ONLY, &off, sizeof(off)) == 0) {
170         *dsmode = GRPC_DSMODE_DUALSTACK;
171       } else if (!grpc_sockaddr_is_v4mapped(addr, nullptr)) {
172         *dsmode = GRPC_DSMODE_IPV6;
173       } else {
174         *dsmode = GRPC_DSMODE_IPV4;
175       }
176     } else {
177       *dsmode = family == AF_INET ? GRPC_DSMODE_IPV4 : GRPC_DSMODE_NONE;
178     }
179 
180     grpc_resolved_address addr4_copy;
181     if (*dsmode == GRPC_DSMODE_IPV4 &&
182         grpc_sockaddr_is_v4mapped(addr, &addr4_copy)) {
183       addr = &addr4_copy;
184     }
185 
186     return add_socket_to_server(s, fd, addr, port_index, fd_index, listener);
187   }
188 
189   grpc_resolved_address addr4_copy;
190   grpc_error_handle err =
191       grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, dsmode, &fd);
192   if (!err.ok()) {
193     return err;
194   }
195   if (*dsmode == GRPC_DSMODE_IPV4 &&
196       grpc_sockaddr_is_v4mapped(addr, &addr4_copy)) {
197     addr = &addr4_copy;
198   }
199   return add_socket_to_server(s, fd, addr, port_index, fd_index, listener);
200 }
201 
202 // Prepare a recently-created socket for listening.
grpc_tcp_server_prepare_socket(grpc_tcp_server * s,int fd,const grpc_resolved_address * addr,bool so_reuseport,int * port)203 grpc_error_handle grpc_tcp_server_prepare_socket(
204     grpc_tcp_server* s, int fd, const grpc_resolved_address* addr,
205     bool so_reuseport, int* port) {
206   grpc_resolved_address sockname_temp;
207   grpc_error_handle err;
208 
209   CHECK_GE(fd, 0);
210 
211   if (so_reuseport && !grpc_is_unix_socket(addr) && !grpc_is_vsock(addr)) {
212     err = grpc_set_socket_reuse_port(fd, 1);
213     if (!err.ok()) goto error;
214   }
215 
216 #ifdef GRPC_LINUX_ERRQUEUE
217   err = grpc_set_socket_zerocopy(fd);
218   if (!err.ok()) {
219     // it's not fatal, so just log it.
220     VLOG(2) << "Node does not support SO_ZEROCOPY, continuing.";
221   }
222 #endif
223   err = grpc_set_socket_nonblocking(fd, 1);
224   if (!err.ok()) goto error;
225   err = grpc_set_socket_cloexec(fd, 1);
226   if (!err.ok()) goto error;
227   if (!grpc_is_unix_socket(addr) && !grpc_is_vsock(addr)) {
228     err = grpc_set_socket_low_latency(fd, 1);
229     if (!err.ok()) goto error;
230     err = grpc_set_socket_reuse_addr(fd, 1);
231     if (!err.ok()) goto error;
232     err = grpc_set_socket_dscp(fd, s->options.dscp);
233     if (!err.ok()) goto error;
234     err =
235         grpc_set_socket_tcp_user_timeout(fd, s->options, false /* is_client */);
236     if (!err.ok()) goto error;
237   }
238   err = grpc_set_socket_no_sigpipe_if_possible(fd);
239   if (!err.ok()) goto error;
240 
241   err = grpc_apply_socket_mutator_in_args(fd, GRPC_FD_SERVER_LISTENER_USAGE,
242                                           s->options);
243   if (!err.ok()) goto error;
244 
245   // Only bind/listen if fd has not been already preallocated
246   if (grpc_tcp_server_pre_allocated_fd(s) != fd) {
247     if (bind(fd,
248              reinterpret_cast<grpc_sockaddr*>(const_cast<char*>(addr->addr)),
249              addr->len) < 0) {
250       err = GRPC_OS_ERROR(errno, "bind");
251       goto error;
252     }
253 
254     if (listen(fd, get_max_accept_queue_size()) < 0) {
255       err = GRPC_OS_ERROR(errno, "listen");
256       goto error;
257     }
258   }
259 
260   sockname_temp.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
261 
262   if (getsockname(fd, reinterpret_cast<grpc_sockaddr*>(sockname_temp.addr),
263                   &sockname_temp.len) < 0) {
264     err = GRPC_OS_ERROR(errno, "getsockname");
265     goto error;
266   }
267 
268   *port = grpc_sockaddr_get_port(&sockname_temp);
269   return absl::OkStatus();
270 
271 error:
272   CHECK(!err.ok());
273   if (fd >= 0) {
274     close(fd);
275   }
276   grpc_error_handle ret = grpc_error_set_int(
277       GRPC_ERROR_CREATE_REFERENCING("Unable to configure socket", &err, 1),
278       grpc_core::StatusIntProperty::kFd, fd);
279   return ret;
280 }
281 
282 #endif  // GRPC_POSIX_SOCKET_TCP_SERVER_UTILS_COMMON
283