1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.h"
16
17 #include <grpc/event_engine/event_engine.h>
18 #include <grpc/support/port_platform.h>
19 #include <limits.h>
20 #include <stdio.h>
21 #include <stdlib.h>
22
23 #include <cstring>
24 #include <string>
25
26 #include "absl/cleanup/cleanup.h"
27 #include "absl/log/check.h"
28 #include "absl/log/log.h"
29 #include "absl/status/status.h"
30 #include "absl/strings/str_cat.h"
31 #include "absl/strings/str_replace.h"
32 #include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
33 #include "src/core/lib/event_engine/tcp_socket_utils.h"
34 #include "src/core/lib/iomgr/port.h"
35 #include "src/core/lib/iomgr/socket_mutator.h"
36 #include "src/core/util/crash.h" // IWYU pragma: keep
37 #include "src/core/util/status_helper.h"
38
39 #define MIN_SAFE_ACCEPT_QUEUE_SIZE 100
40
41 #ifdef GRPC_POSIX_SOCKET_UTILS_COMMON
42 #include <errno.h> // IWYU pragma: keep
43 #include <ifaddrs.h> // IWYU pragma: keep
44 #include <netinet/in.h> // IWYU pragma: keep
45 #include <sys/socket.h> // IWYU pragma: keep
46 #include <unistd.h> // IWYU pragma: keep
47 #endif
48
49 namespace grpc_event_engine {
50 namespace experimental {
51
52 #ifdef GRPC_POSIX_SOCKET_UTILS_COMMON
53
54 namespace {
55
56 using ResolvedAddress =
57 grpc_event_engine::experimental::EventEngine::ResolvedAddress;
58 using ListenerSocket = ListenerSocketsContainer::ListenerSocket;
59
60 #ifdef GRPC_HAVE_IFADDRS
61
62 // Bind to "::" to get a port number not used by any address.
GetUnusedPort()63 absl::StatusOr<int> GetUnusedPort() {
64 ResolvedAddress wild = ResolvedAddressMakeWild6(0);
65 PosixSocketWrapper::DSMode dsmode;
66 auto sock = PosixSocketWrapper::CreateDualStackSocket(nullptr, wild,
67 SOCK_STREAM, 0, dsmode);
68 GRPC_RETURN_IF_ERROR(sock.status());
69 if (dsmode == PosixSocketWrapper::DSMode::DSMODE_IPV4) {
70 wild = ResolvedAddressMakeWild4(0);
71 }
72 if (bind(sock->Fd(), wild.address(), wild.size()) != 0) {
73 close(sock->Fd());
74 return absl::FailedPreconditionError(
75 absl::StrCat("bind(GetUnusedPort): ", std::strerror(errno)));
76 }
77 socklen_t len = wild.size();
78 if (getsockname(sock->Fd(), const_cast<sockaddr*>(wild.address()), &len) !=
79 0) {
80 close(sock->Fd());
81 return absl::FailedPreconditionError(
82 absl::StrCat("getsockname(GetUnusedPort): ", std::strerror(errno)));
83 }
84 close(sock->Fd());
85 int port = ResolvedAddressGetPort(wild);
86 if (port <= 0) {
87 return absl::FailedPreconditionError("Bad port");
88 }
89 return port;
90 }
91
SystemHasIfAddrs()92 bool SystemHasIfAddrs() { return true; }
93
94 #else // GRPC_HAVE_IFADDRS
95
SystemHasIfAddrs()96 bool SystemHasIfAddrs() { return false; }
97
98 #endif // GRPC_HAVE_IFADDRS
99
100 // get max listen queue size on linux
InitMaxAcceptQueueSize()101 int InitMaxAcceptQueueSize() {
102 int n = SOMAXCONN;
103 char buf[64];
104 FILE* fp = fopen("/proc/sys/net/core/somaxconn", "r");
105 int max_accept_queue_size;
106 if (fp == nullptr) {
107 // 2.4 kernel.
108 return SOMAXCONN;
109 }
110 if (fgets(buf, sizeof buf, fp)) {
111 char* end;
112 long i = strtol(buf, &end, 10);
113 if (i > 0 && i <= INT_MAX && end && *end == '\n') {
114 n = static_cast<int>(i);
115 }
116 }
117 fclose(fp);
118 max_accept_queue_size = n;
119
120 if (max_accept_queue_size < MIN_SAFE_ACCEPT_QUEUE_SIZE) {
121 LOG(INFO) << "Suspiciously small accept queue (" << max_accept_queue_size
122 << ") will probably lead to connection drops";
123 }
124 return max_accept_queue_size;
125 }
126
GetMaxAcceptQueueSize()127 int GetMaxAcceptQueueSize() {
128 static const int kMaxAcceptQueueSize = InitMaxAcceptQueueSize();
129 return kMaxAcceptQueueSize;
130 }
131
132 // Prepare a recently-created socket for listening.
PrepareSocket(const PosixTcpOptions & options,ListenerSocket & socket)133 absl::Status PrepareSocket(const PosixTcpOptions& options,
134 ListenerSocket& socket) {
135 ResolvedAddress sockname_temp;
136 int fd = socket.sock.Fd();
137 CHECK_GE(fd, 0);
138 bool close_fd = true;
139 socket.zero_copy_enabled = false;
140 socket.port = 0;
141 auto sock_cleanup = absl::MakeCleanup([&close_fd, fd]() -> void {
142 if (close_fd && fd >= 0) {
143 close(fd);
144 }
145 });
146 if (PosixSocketWrapper::IsSocketReusePortSupported() &&
147 options.allow_reuse_port && socket.addr.address()->sa_family != AF_UNIX &&
148 !ResolvedAddressIsVSock(socket.addr)) {
149 GRPC_RETURN_IF_ERROR(socket.sock.SetSocketReusePort(1));
150 }
151
152 #ifdef GRPC_LINUX_ERRQUEUE
153 if (!socket.sock.SetSocketZeroCopy().ok()) {
154 // it's not fatal, so just log it.
155 VLOG(2) << "Node does not support SO_ZEROCOPY, continuing.";
156 } else {
157 socket.zero_copy_enabled = true;
158 }
159 #endif
160
161 GRPC_RETURN_IF_ERROR(socket.sock.SetSocketNonBlocking(1));
162 GRPC_RETURN_IF_ERROR(socket.sock.SetSocketCloexec(1));
163
164 if (socket.addr.address()->sa_family != AF_UNIX &&
165 !ResolvedAddressIsVSock(socket.addr)) {
166 GRPC_RETURN_IF_ERROR(socket.sock.SetSocketLowLatency(1));
167 GRPC_RETURN_IF_ERROR(socket.sock.SetSocketReuseAddr(1));
168 GRPC_RETURN_IF_ERROR(socket.sock.SetSocketDscp(options.dscp));
169 socket.sock.TrySetSocketTcpUserTimeout(options, false);
170 }
171 GRPC_RETURN_IF_ERROR(socket.sock.SetSocketNoSigpipeIfPossible());
172 GRPC_RETURN_IF_ERROR(socket.sock.ApplySocketMutatorInOptions(
173 GRPC_FD_SERVER_LISTENER_USAGE, options));
174
175 if (bind(fd, socket.addr.address(), socket.addr.size()) < 0) {
176 auto sockaddr_str = ResolvedAddressToString(socket.addr);
177 if (!sockaddr_str.ok()) {
178 LOG(ERROR) << "Could not convert sockaddr to string: "
179 << sockaddr_str.status();
180 sockaddr_str = "<unparsable>";
181 }
182 sockaddr_str = absl::StrReplaceAll(*sockaddr_str, {{"\0", "@"}});
183 return absl::FailedPreconditionError(
184 absl::StrCat("Error in bind for address '", *sockaddr_str,
185 "': ", std::strerror(errno)));
186 }
187
188 if (listen(fd, GetMaxAcceptQueueSize()) < 0) {
189 return absl::FailedPreconditionError(
190 absl::StrCat("Error in listen: ", std::strerror(errno)));
191 }
192 socklen_t len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
193
194 if (getsockname(fd, const_cast<sockaddr*>(sockname_temp.address()), &len) <
195 0) {
196 return absl::FailedPreconditionError(
197 absl::StrCat("Error in getsockname: ", std::strerror(errno)));
198 }
199
200 socket.port =
201 ResolvedAddressGetPort(ResolvedAddress(sockname_temp.address(), len));
202 // No errors. Set close_fd to false to ensure the socket is not closed.
203 close_fd = false;
204 return absl::OkStatus();
205 }
206
207 } // namespace
208
CreateAndPrepareListenerSocket(const PosixTcpOptions & options,const ResolvedAddress & addr)209 absl::StatusOr<ListenerSocket> CreateAndPrepareListenerSocket(
210 const PosixTcpOptions& options, const ResolvedAddress& addr) {
211 ResolvedAddress addr4_copy;
212 ListenerSocket socket;
213 auto result = PosixSocketWrapper::CreateDualStackSocket(
214 nullptr, addr, SOCK_STREAM, 0, socket.dsmode);
215 if (!result.ok()) {
216 return result.status();
217 }
218 socket.sock = *result;
219 if (socket.dsmode == PosixSocketWrapper::DSMODE_IPV4 &&
220 ResolvedAddressIsV4Mapped(addr, &addr4_copy)) {
221 socket.addr = addr4_copy;
222 } else {
223 socket.addr = addr;
224 }
225 GRPC_RETURN_IF_ERROR(PrepareSocket(options, socket));
226 CHECK_GT(socket.port, 0);
227 return socket;
228 }
229
ListenerContainerAddAllLocalAddresses(ListenerSocketsContainer & listener_sockets,const PosixTcpOptions & options,int requested_port)230 absl::StatusOr<int> ListenerContainerAddAllLocalAddresses(
231 ListenerSocketsContainer& listener_sockets, const PosixTcpOptions& options,
232 int requested_port) {
233 #ifdef GRPC_HAVE_IFADDRS
234 absl::Status op_status = absl::OkStatus();
235 struct ifaddrs* ifa = nullptr;
236 struct ifaddrs* ifa_it;
237 bool no_local_addresses = true;
238 int assigned_port = 0;
239 if (requested_port == 0) {
240 auto result = GetUnusedPort();
241 GRPC_RETURN_IF_ERROR(result.status());
242 requested_port = *result;
243 VLOG(2) << "Picked unused port " << requested_port;
244 }
245 if (getifaddrs(&ifa) != 0 || ifa == nullptr) {
246 return absl::FailedPreconditionError(
247 absl::StrCat("getifaddrs: ", std::strerror(errno)));
248 }
249
250 static const bool is_ipv4_available = [] {
251 const int fd = socket(AF_INET, SOCK_DGRAM, 0);
252 if (fd >= 0) close(fd);
253 return fd >= 0;
254 }();
255
256 for (ifa_it = ifa; ifa_it != nullptr; ifa_it = ifa_it->ifa_next) {
257 ResolvedAddress addr;
258 socklen_t len;
259 const char* ifa_name = (ifa_it->ifa_name ? ifa_it->ifa_name : "<unknown>");
260 if (ifa_it->ifa_addr == nullptr) {
261 continue;
262 } else if (ifa_it->ifa_addr->sa_family == AF_INET) {
263 if (!is_ipv4_available) {
264 continue;
265 }
266 len = static_cast<socklen_t>(sizeof(sockaddr_in));
267 } else if (ifa_it->ifa_addr->sa_family == AF_INET6) {
268 len = static_cast<socklen_t>(sizeof(sockaddr_in6));
269 } else {
270 continue;
271 }
272 addr = EventEngine::ResolvedAddress(ifa_it->ifa_addr, len);
273 ResolvedAddressSetPort(addr, requested_port);
274 std::string addr_str = *ResolvedAddressToString(addr);
275 VLOG(2) << absl::StrFormat(
276 "Adding local addr from interface %s flags 0x%x to server: %s",
277 ifa_name, ifa_it->ifa_flags, addr_str.c_str());
278 // We could have multiple interfaces with the same address (e.g.,
279 // bonding), so look for duplicates.
280 if (listener_sockets.Find(addr).ok()) {
281 VLOG(2) << "Skipping duplicate addr " << addr_str << " on interface "
282 << ifa_name;
283 continue;
284 }
285 auto result = CreateAndPrepareListenerSocket(options, addr);
286 if (!result.ok()) {
287 op_status = absl::FailedPreconditionError(
288 absl::StrCat("Failed to add listener: ", addr_str,
289 " due to error: ", result.status().message()));
290 break;
291 } else {
292 listener_sockets.Append(*result);
293 assigned_port = result->port;
294 no_local_addresses = false;
295 }
296 }
297 freeifaddrs(ifa);
298 GRPC_RETURN_IF_ERROR(op_status);
299 if (no_local_addresses) {
300 return absl::FailedPreconditionError("No local addresses");
301 }
302 return assigned_port;
303
304 #else
305 (void)listener_sockets;
306 (void)options;
307 (void)requested_port;
308 grpc_core::Crash("System does not support ifaddrs");
309 #endif
310 }
311
ListenerContainerAddWildcardAddresses(ListenerSocketsContainer & listener_sockets,const PosixTcpOptions & options,int requested_port)312 absl::StatusOr<int> ListenerContainerAddWildcardAddresses(
313 ListenerSocketsContainer& listener_sockets, const PosixTcpOptions& options,
314 int requested_port) {
315 ResolvedAddress wild4 = ResolvedAddressMakeWild4(requested_port);
316 ResolvedAddress wild6 = ResolvedAddressMakeWild6(requested_port);
317 absl::StatusOr<ListenerSocket> v6_sock;
318 absl::StatusOr<ListenerSocket> v4_sock;
319 int assigned_port = 0;
320
321 if (SystemHasIfAddrs() && options.expand_wildcard_addrs) {
322 return ListenerContainerAddAllLocalAddresses(listener_sockets, options,
323 requested_port);
324 }
325
326 // Try listening on IPv6 first.
327 v6_sock = CreateAndPrepareListenerSocket(options, wild6);
328 if (v6_sock.ok()) {
329 listener_sockets.Append(*v6_sock);
330 requested_port = v6_sock->port;
331 assigned_port = v6_sock->port;
332 if (v6_sock->dsmode == PosixSocketWrapper::DSMODE_DUALSTACK ||
333 v6_sock->dsmode == PosixSocketWrapper::DSMODE_IPV4) {
334 return v6_sock->port;
335 }
336 }
337 // If we got a v6-only socket or nothing, try adding 0.0.0.0.
338 ResolvedAddressSetPort(wild4, requested_port);
339 v4_sock = CreateAndPrepareListenerSocket(options, wild4);
340 if (v4_sock.ok()) {
341 assigned_port = v4_sock->port;
342 listener_sockets.Append(*v4_sock);
343 }
344 if (assigned_port > 0) {
345 if (!v6_sock.ok()) {
346 VLOG(2) << "Failed to add :: listener, the environment may not support "
347 "IPv6: "
348 << v6_sock.status();
349 }
350 if (!v4_sock.ok()) {
351 VLOG(2) << "Failed to add 0.0.0.0 listener, "
352 "the environment may not support IPv4: "
353 << v4_sock.status();
354 }
355 return assigned_port;
356 } else {
357 CHECK(!v6_sock.ok());
358 CHECK(!v4_sock.ok());
359 return absl::FailedPreconditionError(absl::StrCat(
360 "Failed to add any wildcard listeners: ", v6_sock.status().message(),
361 v4_sock.status().message()));
362 }
363 }
364
365 #else // GRPC_POSIX_SOCKET_UTILS_COMMON
366
367 absl::StatusOr<ListenerSocketsContainer::ListenerSocket>
368 CreateAndPrepareListenerSocket(const PosixTcpOptions& /*options*/,
369 const grpc_event_engine::experimental::
370 EventEngine::ResolvedAddress& /*addr*/) {
371 grpc_core::Crash(
372 "CreateAndPrepareListenerSocket is not supported on this platform");
373 }
374
375 absl::StatusOr<int> ListenerContainerAddWildcardAddresses(
376 ListenerSocketsContainer& /*listener_sockets*/,
377 const PosixTcpOptions& /*options*/, int /*requested_port*/) {
378 grpc_core::Crash(
379 "ListenerContainerAddWildcardAddresses is not supported on this "
380 "platform");
381 }
382
383 absl::StatusOr<int> ListenerContainerAddAllLocalAddresses(
384 ListenerSocketsContainer& /*listener_sockets*/,
385 const PosixTcpOptions& /*options*/, int /*requested_port*/) {
386 grpc_core::Crash(
387 "ListenerContainerAddAllLocalAddresses is not supported on this "
388 "platform");
389 }
390
391 #endif // GRPC_POSIX_SOCKET_UTILS_COMMON
392
393 } // namespace experimental
394 } // namespace grpc_event_engine
395