1 //
2 //
3 // Copyright 2017 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/support/atm.h>
20 #include <grpc/support/port_platform.h>
21
22 #include "src/core/lib/iomgr/port.h"
23
24 #ifdef GRPC_POSIX_SOCKET_TCP_SERVER_UTILS_COMMON
25
26 #include <errno.h>
27 #include <grpc/support/alloc.h>
28 #include <grpc/support/sync.h>
29 #include <limits.h>
30 #include <stdio.h>
31 #include <string.h>
32 #include <sys/socket.h>
33
34 #include <string>
35
36 #include "absl/log/check.h"
37 #include "absl/log/log.h"
38 #include "absl/strings/str_cat.h"
39 #include "src/core/lib/address_utils/sockaddr_utils.h"
40 #include "src/core/lib/iomgr/error.h"
41 #include "src/core/lib/iomgr/sockaddr.h"
42 #include "src/core/lib/iomgr/tcp_server_utils_posix.h"
43 #include "src/core/lib/iomgr/unix_sockets_posix.h"
44 #include "src/core/lib/iomgr/vsock.h"
45 #include "src/core/util/crash.h"
46
47 #define MIN_SAFE_ACCEPT_QUEUE_SIZE 100
48
49 static gpr_once s_init_max_accept_queue_size = GPR_ONCE_INIT;
50 static int s_max_accept_queue_size;
51
52 // get max listen queue size on linux
init_max_accept_queue_size(void)53 static void init_max_accept_queue_size(void) {
54 int n = SOMAXCONN;
55 char buf[64];
56 FILE* fp = fopen("/proc/sys/net/core/somaxconn", "r");
57 if (fp == nullptr) {
58 // 2.4 kernel.
59 s_max_accept_queue_size = SOMAXCONN;
60 return;
61 }
62 if (fgets(buf, sizeof buf, fp)) {
63 char* end;
64 long i = strtol(buf, &end, 10);
65 if (i > 0 && i <= INT_MAX && end && *end == '\n') {
66 n = static_cast<int>(i);
67 }
68 }
69 fclose(fp);
70 s_max_accept_queue_size = n;
71
72 if (s_max_accept_queue_size < MIN_SAFE_ACCEPT_QUEUE_SIZE) {
73 LOG(INFO) << "Suspiciously small accept queue (" << s_max_accept_queue_size
74 << ") will probably lead to connection drops";
75 }
76 }
77
get_max_accept_queue_size(void)78 static int get_max_accept_queue_size(void) {
79 gpr_once_init(&s_init_max_accept_queue_size, init_max_accept_queue_size);
80 return s_max_accept_queue_size;
81 }
82
listener_retry_timer_cb(void * arg,grpc_error_handle err)83 static void listener_retry_timer_cb(void* arg, grpc_error_handle err) {
84 // Do nothing if cancelled.
85 if (!err.ok()) return;
86 grpc_tcp_listener* listener = static_cast<grpc_tcp_listener*>(arg);
87 gpr_atm_no_barrier_store(&listener->retry_timer_armed, false);
88 if (!grpc_fd_is_shutdown(listener->emfd)) {
89 grpc_fd_set_readable(listener->emfd);
90 }
91 }
92
grpc_tcp_server_listener_initialize_retry_timer(grpc_tcp_listener * listener)93 void grpc_tcp_server_listener_initialize_retry_timer(
94 grpc_tcp_listener* listener) {
95 gpr_atm_no_barrier_store(&listener->retry_timer_armed, false);
96 grpc_timer_init_unset(&listener->retry_timer);
97 GRPC_CLOSURE_INIT(&listener->retry_closure, listener_retry_timer_cb, listener,
98 grpc_schedule_on_exec_ctx);
99 }
100
add_socket_to_server(grpc_tcp_server * s,int fd,const grpc_resolved_address * addr,unsigned port_index,unsigned fd_index,grpc_tcp_listener ** listener)101 static grpc_error_handle add_socket_to_server(grpc_tcp_server* s, int fd,
102 const grpc_resolved_address* addr,
103 unsigned port_index,
104 unsigned fd_index,
105 grpc_tcp_listener** listener) {
106 *listener = nullptr;
107 int port = -1;
108
109 grpc_error_handle err =
110 grpc_tcp_server_prepare_socket(s, fd, addr, s->so_reuseport, &port);
111 if (!err.ok()) return err;
112 CHECK_GT(port, 0);
113 absl::StatusOr<std::string> addr_str = grpc_sockaddr_to_string(addr, true);
114 if (!addr_str.ok()) {
115 return GRPC_ERROR_CREATE(addr_str.status().ToString());
116 }
117 std::string name = absl::StrCat("tcp-server-listener:", addr_str.value());
118 gpr_mu_lock(&s->mu);
119 s->nports++;
120 grpc_tcp_listener* sp =
121 static_cast<grpc_tcp_listener*>(gpr_malloc(sizeof(grpc_tcp_listener)));
122 sp->next = nullptr;
123 if (s->head == nullptr) {
124 s->head = sp;
125 } else {
126 s->tail->next = sp;
127 }
128 s->tail = sp;
129 sp->server = s;
130 sp->fd = fd;
131 sp->emfd = grpc_fd_create(fd, name.c_str(), true);
132 grpc_tcp_server_listener_initialize_retry_timer(sp);
133
134 // Check and set fd as prellocated
135 if (grpc_tcp_server_pre_allocated_fd(s) == fd) {
136 grpc_fd_set_pre_allocated(sp->emfd);
137 }
138
139 memcpy(&sp->addr, addr, sizeof(grpc_resolved_address));
140 sp->port = port;
141 sp->port_index = port_index;
142 sp->fd_index = fd_index;
143 sp->is_sibling = 0;
144 sp->sibling = nullptr;
145 CHECK(sp->emfd);
146 gpr_mu_unlock(&s->mu);
147
148 *listener = sp;
149 return err;
150 }
151
152 // If successful, add a listener to s for addr, set *dsmode for the socket, and
153 // return the *listener.
grpc_tcp_server_add_addr(grpc_tcp_server * s,const grpc_resolved_address * addr,unsigned port_index,unsigned fd_index,grpc_dualstack_mode * dsmode,grpc_tcp_listener ** listener)154 grpc_error_handle grpc_tcp_server_add_addr(grpc_tcp_server* s,
155 const grpc_resolved_address* addr,
156 unsigned port_index,
157 unsigned fd_index,
158 grpc_dualstack_mode* dsmode,
159 grpc_tcp_listener** listener) {
160 int fd;
161 fd = grpc_tcp_server_pre_allocated_fd(s);
162
163 // Check if FD has been pre-allocated
164 if (fd > 0) {
165 int family = grpc_sockaddr_get_family(addr);
166 // Set dsmode value
167 if (family == AF_INET6) {
168 const int off = 0;
169 if (setsockopt(fd, 0, IPV6_V6ONLY, &off, sizeof(off)) == 0) {
170 *dsmode = GRPC_DSMODE_DUALSTACK;
171 } else if (!grpc_sockaddr_is_v4mapped(addr, nullptr)) {
172 *dsmode = GRPC_DSMODE_IPV6;
173 } else {
174 *dsmode = GRPC_DSMODE_IPV4;
175 }
176 } else {
177 *dsmode = family == AF_INET ? GRPC_DSMODE_IPV4 : GRPC_DSMODE_NONE;
178 }
179
180 grpc_resolved_address addr4_copy;
181 if (*dsmode == GRPC_DSMODE_IPV4 &&
182 grpc_sockaddr_is_v4mapped(addr, &addr4_copy)) {
183 addr = &addr4_copy;
184 }
185
186 return add_socket_to_server(s, fd, addr, port_index, fd_index, listener);
187 }
188
189 grpc_resolved_address addr4_copy;
190 grpc_error_handle err =
191 grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, dsmode, &fd);
192 if (!err.ok()) {
193 return err;
194 }
195 if (*dsmode == GRPC_DSMODE_IPV4 &&
196 grpc_sockaddr_is_v4mapped(addr, &addr4_copy)) {
197 addr = &addr4_copy;
198 }
199 return add_socket_to_server(s, fd, addr, port_index, fd_index, listener);
200 }
201
202 // Prepare a recently-created socket for listening.
grpc_tcp_server_prepare_socket(grpc_tcp_server * s,int fd,const grpc_resolved_address * addr,bool so_reuseport,int * port)203 grpc_error_handle grpc_tcp_server_prepare_socket(
204 grpc_tcp_server* s, int fd, const grpc_resolved_address* addr,
205 bool so_reuseport, int* port) {
206 grpc_resolved_address sockname_temp;
207 grpc_error_handle err;
208
209 CHECK_GE(fd, 0);
210
211 if (so_reuseport && !grpc_is_unix_socket(addr) && !grpc_is_vsock(addr)) {
212 err = grpc_set_socket_reuse_port(fd, 1);
213 if (!err.ok()) goto error;
214 }
215
216 #ifdef GRPC_LINUX_ERRQUEUE
217 err = grpc_set_socket_zerocopy(fd);
218 if (!err.ok()) {
219 // it's not fatal, so just log it.
220 VLOG(2) << "Node does not support SO_ZEROCOPY, continuing.";
221 }
222 #endif
223 err = grpc_set_socket_nonblocking(fd, 1);
224 if (!err.ok()) goto error;
225 err = grpc_set_socket_cloexec(fd, 1);
226 if (!err.ok()) goto error;
227 if (!grpc_is_unix_socket(addr) && !grpc_is_vsock(addr)) {
228 err = grpc_set_socket_low_latency(fd, 1);
229 if (!err.ok()) goto error;
230 err = grpc_set_socket_reuse_addr(fd, 1);
231 if (!err.ok()) goto error;
232 err = grpc_set_socket_dscp(fd, s->options.dscp);
233 if (!err.ok()) goto error;
234 err =
235 grpc_set_socket_tcp_user_timeout(fd, s->options, false /* is_client */);
236 if (!err.ok()) goto error;
237 }
238 err = grpc_set_socket_no_sigpipe_if_possible(fd);
239 if (!err.ok()) goto error;
240
241 err = grpc_apply_socket_mutator_in_args(fd, GRPC_FD_SERVER_LISTENER_USAGE,
242 s->options);
243 if (!err.ok()) goto error;
244
245 // Only bind/listen if fd has not been already preallocated
246 if (grpc_tcp_server_pre_allocated_fd(s) != fd) {
247 if (bind(fd,
248 reinterpret_cast<grpc_sockaddr*>(const_cast<char*>(addr->addr)),
249 addr->len) < 0) {
250 err = GRPC_OS_ERROR(errno, "bind");
251 goto error;
252 }
253
254 if (listen(fd, get_max_accept_queue_size()) < 0) {
255 err = GRPC_OS_ERROR(errno, "listen");
256 goto error;
257 }
258 }
259
260 sockname_temp.len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
261
262 if (getsockname(fd, reinterpret_cast<grpc_sockaddr*>(sockname_temp.addr),
263 &sockname_temp.len) < 0) {
264 err = GRPC_OS_ERROR(errno, "getsockname");
265 goto error;
266 }
267
268 *port = grpc_sockaddr_get_port(&sockname_temp);
269 return absl::OkStatus();
270
271 error:
272 CHECK(!err.ok());
273 if (fd >= 0) {
274 close(fd);
275 }
276 grpc_error_handle ret = grpc_error_set_int(
277 GRPC_ERROR_CREATE_REFERENCING("Unable to configure socket", &err, 1),
278 grpc_core::StatusIntProperty::kFd, fd);
279 return ret;
280 }
281
282 #endif // GRPC_POSIX_SOCKET_TCP_SERVER_UTILS_COMMON
283