• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.h"
16 
17 #include <grpc/event_engine/event_engine.h>
18 #include <grpc/support/port_platform.h>
19 #include <limits.h>
20 #include <stdio.h>
21 #include <stdlib.h>
22 
23 #include <cstring>
24 #include <string>
25 
26 #include "absl/cleanup/cleanup.h"
27 #include "absl/log/check.h"
28 #include "absl/log/log.h"
29 #include "absl/status/status.h"
30 #include "absl/strings/str_cat.h"
31 #include "absl/strings/str_replace.h"
32 #include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
33 #include "src/core/lib/event_engine/tcp_socket_utils.h"
34 #include "src/core/lib/iomgr/port.h"
35 #include "src/core/lib/iomgr/socket_mutator.h"
36 #include "src/core/util/crash.h"  // IWYU pragma: keep
37 #include "src/core/util/status_helper.h"
38 
39 #define MIN_SAFE_ACCEPT_QUEUE_SIZE 100
40 
41 #ifdef GRPC_POSIX_SOCKET_UTILS_COMMON
42 #include <errno.h>       // IWYU pragma: keep
43 #include <ifaddrs.h>     // IWYU pragma: keep
44 #include <netinet/in.h>  // IWYU pragma: keep
45 #include <sys/socket.h>  // IWYU pragma: keep
46 #include <unistd.h>      // IWYU pragma: keep
47 #endif
48 
49 namespace grpc_event_engine {
50 namespace experimental {
51 
52 #ifdef GRPC_POSIX_SOCKET_UTILS_COMMON
53 
54 namespace {
55 
56 using ResolvedAddress =
57     grpc_event_engine::experimental::EventEngine::ResolvedAddress;
58 using ListenerSocket = ListenerSocketsContainer::ListenerSocket;
59 
60 #ifdef GRPC_HAVE_IFADDRS
61 
62 // Bind to "::" to get a port number not used by any address.
GetUnusedPort()63 absl::StatusOr<int> GetUnusedPort() {
64   ResolvedAddress wild = ResolvedAddressMakeWild6(0);
65   PosixSocketWrapper::DSMode dsmode;
66   auto sock = PosixSocketWrapper::CreateDualStackSocket(nullptr, wild,
67                                                         SOCK_STREAM, 0, dsmode);
68   GRPC_RETURN_IF_ERROR(sock.status());
69   if (dsmode == PosixSocketWrapper::DSMode::DSMODE_IPV4) {
70     wild = ResolvedAddressMakeWild4(0);
71   }
72   if (bind(sock->Fd(), wild.address(), wild.size()) != 0) {
73     close(sock->Fd());
74     return absl::FailedPreconditionError(
75         absl::StrCat("bind(GetUnusedPort): ", std::strerror(errno)));
76   }
77   socklen_t len = wild.size();
78   if (getsockname(sock->Fd(), const_cast<sockaddr*>(wild.address()), &len) !=
79       0) {
80     close(sock->Fd());
81     return absl::FailedPreconditionError(
82         absl::StrCat("getsockname(GetUnusedPort): ", std::strerror(errno)));
83   }
84   close(sock->Fd());
85   int port = ResolvedAddressGetPort(wild);
86   if (port <= 0) {
87     return absl::FailedPreconditionError("Bad port");
88   }
89   return port;
90 }
91 
SystemHasIfAddrs()92 bool SystemHasIfAddrs() { return true; }
93 
94 #else  // GRPC_HAVE_IFADDRS
95 
SystemHasIfAddrs()96 bool SystemHasIfAddrs() { return false; }
97 
98 #endif  // GRPC_HAVE_IFADDRS
99 
100 // get max listen queue size on linux
InitMaxAcceptQueueSize()101 int InitMaxAcceptQueueSize() {
102   int n = SOMAXCONN;
103   char buf[64];
104   FILE* fp = fopen("/proc/sys/net/core/somaxconn", "r");
105   int max_accept_queue_size;
106   if (fp == nullptr) {
107     // 2.4 kernel.
108     return SOMAXCONN;
109   }
110   if (fgets(buf, sizeof buf, fp)) {
111     char* end;
112     long i = strtol(buf, &end, 10);
113     if (i > 0 && i <= INT_MAX && end && *end == '\n') {
114       n = static_cast<int>(i);
115     }
116   }
117   fclose(fp);
118   max_accept_queue_size = n;
119 
120   if (max_accept_queue_size < MIN_SAFE_ACCEPT_QUEUE_SIZE) {
121     LOG(INFO) << "Suspiciously small accept queue (" << max_accept_queue_size
122               << ") will probably lead to connection drops";
123   }
124   return max_accept_queue_size;
125 }
126 
GetMaxAcceptQueueSize()127 int GetMaxAcceptQueueSize() {
128   static const int kMaxAcceptQueueSize = InitMaxAcceptQueueSize();
129   return kMaxAcceptQueueSize;
130 }
131 
132 // Prepare a recently-created socket for listening.
PrepareSocket(const PosixTcpOptions & options,ListenerSocket & socket)133 absl::Status PrepareSocket(const PosixTcpOptions& options,
134                            ListenerSocket& socket) {
135   ResolvedAddress sockname_temp;
136   int fd = socket.sock.Fd();
137   CHECK_GE(fd, 0);
138   bool close_fd = true;
139   socket.zero_copy_enabled = false;
140   socket.port = 0;
141   auto sock_cleanup = absl::MakeCleanup([&close_fd, fd]() -> void {
142     if (close_fd && fd >= 0) {
143       close(fd);
144     }
145   });
146   if (PosixSocketWrapper::IsSocketReusePortSupported() &&
147       options.allow_reuse_port && socket.addr.address()->sa_family != AF_UNIX &&
148       !ResolvedAddressIsVSock(socket.addr)) {
149     GRPC_RETURN_IF_ERROR(socket.sock.SetSocketReusePort(1));
150   }
151 
152 #ifdef GRPC_LINUX_ERRQUEUE
153   if (!socket.sock.SetSocketZeroCopy().ok()) {
154     // it's not fatal, so just log it.
155     VLOG(2) << "Node does not support SO_ZEROCOPY, continuing.";
156   } else {
157     socket.zero_copy_enabled = true;
158   }
159 #endif
160 
161   GRPC_RETURN_IF_ERROR(socket.sock.SetSocketNonBlocking(1));
162   GRPC_RETURN_IF_ERROR(socket.sock.SetSocketCloexec(1));
163 
164   if (socket.addr.address()->sa_family != AF_UNIX &&
165       !ResolvedAddressIsVSock(socket.addr)) {
166     GRPC_RETURN_IF_ERROR(socket.sock.SetSocketLowLatency(1));
167     GRPC_RETURN_IF_ERROR(socket.sock.SetSocketReuseAddr(1));
168     GRPC_RETURN_IF_ERROR(socket.sock.SetSocketDscp(options.dscp));
169     socket.sock.TrySetSocketTcpUserTimeout(options, false);
170   }
171   GRPC_RETURN_IF_ERROR(socket.sock.SetSocketNoSigpipeIfPossible());
172   GRPC_RETURN_IF_ERROR(socket.sock.ApplySocketMutatorInOptions(
173       GRPC_FD_SERVER_LISTENER_USAGE, options));
174 
175   if (bind(fd, socket.addr.address(), socket.addr.size()) < 0) {
176     auto sockaddr_str = ResolvedAddressToString(socket.addr);
177     if (!sockaddr_str.ok()) {
178       LOG(ERROR) << "Could not convert sockaddr to string: "
179                  << sockaddr_str.status();
180       sockaddr_str = "<unparsable>";
181     }
182     sockaddr_str = absl::StrReplaceAll(*sockaddr_str, {{"\0", "@"}});
183     return absl::FailedPreconditionError(
184         absl::StrCat("Error in bind for address '", *sockaddr_str,
185                      "': ", std::strerror(errno)));
186   }
187 
188   if (listen(fd, GetMaxAcceptQueueSize()) < 0) {
189     return absl::FailedPreconditionError(
190         absl::StrCat("Error in listen: ", std::strerror(errno)));
191   }
192   socklen_t len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
193 
194   if (getsockname(fd, const_cast<sockaddr*>(sockname_temp.address()), &len) <
195       0) {
196     return absl::FailedPreconditionError(
197         absl::StrCat("Error in getsockname: ", std::strerror(errno)));
198   }
199 
200   socket.port =
201       ResolvedAddressGetPort(ResolvedAddress(sockname_temp.address(), len));
202   // No errors. Set close_fd to false to ensure the socket is not closed.
203   close_fd = false;
204   return absl::OkStatus();
205 }
206 
207 }  // namespace
208 
CreateAndPrepareListenerSocket(const PosixTcpOptions & options,const ResolvedAddress & addr)209 absl::StatusOr<ListenerSocket> CreateAndPrepareListenerSocket(
210     const PosixTcpOptions& options, const ResolvedAddress& addr) {
211   ResolvedAddress addr4_copy;
212   ListenerSocket socket;
213   auto result = PosixSocketWrapper::CreateDualStackSocket(
214       nullptr, addr, SOCK_STREAM, 0, socket.dsmode);
215   if (!result.ok()) {
216     return result.status();
217   }
218   socket.sock = *result;
219   if (socket.dsmode == PosixSocketWrapper::DSMODE_IPV4 &&
220       ResolvedAddressIsV4Mapped(addr, &addr4_copy)) {
221     socket.addr = addr4_copy;
222   } else {
223     socket.addr = addr;
224   }
225   GRPC_RETURN_IF_ERROR(PrepareSocket(options, socket));
226   CHECK_GT(socket.port, 0);
227   return socket;
228 }
229 
ListenerContainerAddAllLocalAddresses(ListenerSocketsContainer & listener_sockets,const PosixTcpOptions & options,int requested_port)230 absl::StatusOr<int> ListenerContainerAddAllLocalAddresses(
231     ListenerSocketsContainer& listener_sockets, const PosixTcpOptions& options,
232     int requested_port) {
233 #ifdef GRPC_HAVE_IFADDRS
234   absl::Status op_status = absl::OkStatus();
235   struct ifaddrs* ifa = nullptr;
236   struct ifaddrs* ifa_it;
237   bool no_local_addresses = true;
238   int assigned_port = 0;
239   if (requested_port == 0) {
240     auto result = GetUnusedPort();
241     GRPC_RETURN_IF_ERROR(result.status());
242     requested_port = *result;
243     VLOG(2) << "Picked unused port " << requested_port;
244   }
245   if (getifaddrs(&ifa) != 0 || ifa == nullptr) {
246     return absl::FailedPreconditionError(
247         absl::StrCat("getifaddrs: ", std::strerror(errno)));
248   }
249 
250   static const bool is_ipv4_available = [] {
251     const int fd = socket(AF_INET, SOCK_DGRAM, 0);
252     if (fd >= 0) close(fd);
253     return fd >= 0;
254   }();
255 
256   for (ifa_it = ifa; ifa_it != nullptr; ifa_it = ifa_it->ifa_next) {
257     ResolvedAddress addr;
258     socklen_t len;
259     const char* ifa_name = (ifa_it->ifa_name ? ifa_it->ifa_name : "<unknown>");
260     if (ifa_it->ifa_addr == nullptr) {
261       continue;
262     } else if (ifa_it->ifa_addr->sa_family == AF_INET) {
263       if (!is_ipv4_available) {
264         continue;
265       }
266       len = static_cast<socklen_t>(sizeof(sockaddr_in));
267     } else if (ifa_it->ifa_addr->sa_family == AF_INET6) {
268       len = static_cast<socklen_t>(sizeof(sockaddr_in6));
269     } else {
270       continue;
271     }
272     addr = EventEngine::ResolvedAddress(ifa_it->ifa_addr, len);
273     ResolvedAddressSetPort(addr, requested_port);
274     std::string addr_str = *ResolvedAddressToString(addr);
275     VLOG(2) << absl::StrFormat(
276         "Adding local addr from interface %s flags 0x%x to server: %s",
277         ifa_name, ifa_it->ifa_flags, addr_str.c_str());
278     // We could have multiple interfaces with the same address (e.g.,
279     // bonding), so look for duplicates.
280     if (listener_sockets.Find(addr).ok()) {
281       VLOG(2) << "Skipping duplicate addr " << addr_str << " on interface "
282               << ifa_name;
283       continue;
284     }
285     auto result = CreateAndPrepareListenerSocket(options, addr);
286     if (!result.ok()) {
287       op_status = absl::FailedPreconditionError(
288           absl::StrCat("Failed to add listener: ", addr_str,
289                        " due to error: ", result.status().message()));
290       break;
291     } else {
292       listener_sockets.Append(*result);
293       assigned_port = result->port;
294       no_local_addresses = false;
295     }
296   }
297   freeifaddrs(ifa);
298   GRPC_RETURN_IF_ERROR(op_status);
299   if (no_local_addresses) {
300     return absl::FailedPreconditionError("No local addresses");
301   }
302   return assigned_port;
303 
304 #else
305   (void)listener_sockets;
306   (void)options;
307   (void)requested_port;
308   grpc_core::Crash("System does not support ifaddrs");
309 #endif
310 }
311 
ListenerContainerAddWildcardAddresses(ListenerSocketsContainer & listener_sockets,const PosixTcpOptions & options,int requested_port)312 absl::StatusOr<int> ListenerContainerAddWildcardAddresses(
313     ListenerSocketsContainer& listener_sockets, const PosixTcpOptions& options,
314     int requested_port) {
315   ResolvedAddress wild4 = ResolvedAddressMakeWild4(requested_port);
316   ResolvedAddress wild6 = ResolvedAddressMakeWild6(requested_port);
317   absl::StatusOr<ListenerSocket> v6_sock;
318   absl::StatusOr<ListenerSocket> v4_sock;
319   int assigned_port = 0;
320 
321   if (SystemHasIfAddrs() && options.expand_wildcard_addrs) {
322     return ListenerContainerAddAllLocalAddresses(listener_sockets, options,
323                                                  requested_port);
324   }
325 
326   // Try listening on IPv6 first.
327   v6_sock = CreateAndPrepareListenerSocket(options, wild6);
328   if (v6_sock.ok()) {
329     listener_sockets.Append(*v6_sock);
330     requested_port = v6_sock->port;
331     assigned_port = v6_sock->port;
332     if (v6_sock->dsmode == PosixSocketWrapper::DSMODE_DUALSTACK ||
333         v6_sock->dsmode == PosixSocketWrapper::DSMODE_IPV4) {
334       return v6_sock->port;
335     }
336   }
337   // If we got a v6-only socket or nothing, try adding 0.0.0.0.
338   ResolvedAddressSetPort(wild4, requested_port);
339   v4_sock = CreateAndPrepareListenerSocket(options, wild4);
340   if (v4_sock.ok()) {
341     assigned_port = v4_sock->port;
342     listener_sockets.Append(*v4_sock);
343   }
344   if (assigned_port > 0) {
345     if (!v6_sock.ok()) {
346       VLOG(2) << "Failed to add :: listener, the environment may not support "
347                  "IPv6: "
348               << v6_sock.status();
349     }
350     if (!v4_sock.ok()) {
351       VLOG(2) << "Failed to add 0.0.0.0 listener, "
352                  "the environment may not support IPv4: "
353               << v4_sock.status();
354     }
355     return assigned_port;
356   } else {
357     CHECK(!v6_sock.ok());
358     CHECK(!v4_sock.ok());
359     return absl::FailedPreconditionError(absl::StrCat(
360         "Failed to add any wildcard listeners: ", v6_sock.status().message(),
361         v4_sock.status().message()));
362   }
363 }
364 
365 #else  // GRPC_POSIX_SOCKET_UTILS_COMMON
366 
367 absl::StatusOr<ListenerSocketsContainer::ListenerSocket>
368 CreateAndPrepareListenerSocket(const PosixTcpOptions& /*options*/,
369                                const grpc_event_engine::experimental::
370                                    EventEngine::ResolvedAddress& /*addr*/) {
371   grpc_core::Crash(
372       "CreateAndPrepareListenerSocket is not supported on this platform");
373 }
374 
375 absl::StatusOr<int> ListenerContainerAddWildcardAddresses(
376     ListenerSocketsContainer& /*listener_sockets*/,
377     const PosixTcpOptions& /*options*/, int /*requested_port*/) {
378   grpc_core::Crash(
379       "ListenerContainerAddWildcardAddresses is not supported on this "
380       "platform");
381 }
382 
383 absl::StatusOr<int> ListenerContainerAddAllLocalAddresses(
384     ListenerSocketsContainer& /*listener_sockets*/,
385     const PosixTcpOptions& /*options*/, int /*requested_port*/) {
386   grpc_core::Crash(
387       "ListenerContainerAddAllLocalAddresses is not supported on this "
388       "platform");
389 }
390 
391 #endif  // GRPC_POSIX_SOCKET_UTILS_COMMON
392 
393 }  // namespace experimental
394 }  // namespace grpc_event_engine
395