• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
16 
17 #include <errno.h>
18 #include <grpc/event_engine/event_engine.h>
19 #include <grpc/event_engine/memory_allocator.h>
20 #include <grpc/impl/channel_arg_names.h>
21 #include <grpc/support/port_platform.h>
22 #include <limits.h>
23 
24 #include "absl/cleanup/cleanup.h"
25 #include "absl/status/statusor.h"
26 #include "absl/strings/str_cat.h"
27 #include "absl/types/optional.h"
28 #include "src/core/lib/iomgr/port.h"
29 #include "src/core/util/crash.h"  // IWYU pragma: keep
30 #include "src/core/util/time.h"
31 #include "src/core/util/useful.h"
32 
33 #ifdef GRPC_POSIX_SOCKET_UTILS_COMMON
34 #include <arpa/inet.h>  // IWYU pragma: keep
35 #ifdef GRPC_LINUX_TCP_H
36 #include <linux/tcp.h>
37 #else
38 #include <netinet/in.h>  // IWYU pragma: keep
39 #include <netinet/tcp.h>
40 #endif
41 #include <fcntl.h>
42 #include <sys/socket.h>
43 #include <unistd.h>
44 #endif  //  GRPC_POSIX_SOCKET_UTILS_COMMON
45 
46 #include <atomic>
47 #include <cstring>
48 
49 #include "absl/log/check.h"
50 #include "absl/log/log.h"
51 #include "absl/status/status.h"
52 #include "src/core/lib/event_engine/tcp_socket_utils.h"
53 #include "src/core/util/status_helper.h"
54 #include "src/core/util/strerror.h"
55 
56 #ifdef GRPC_HAVE_UNIX_SOCKET
57 #ifdef GPR_WINDOWS
58 // clang-format off
59 #include <ws2def.h>
60 #include <afunix.h>
61 // clang-format on
62 #else
63 #include <sys/stat.h>  // IWYU pragma: keep
64 #include <sys/un.h>
65 #endif  // GPR_WINDOWS
66 #endif
67 
68 namespace grpc_event_engine {
69 namespace experimental {
70 
71 namespace {
72 
AdjustValue(int default_value,int min_value,int max_value,absl::optional<int> actual_value)73 int AdjustValue(int default_value, int min_value, int max_value,
74                 absl::optional<int> actual_value) {
75   if (!actual_value.has_value() || *actual_value < min_value ||
76       *actual_value > max_value) {
77     return default_value;
78   }
79   return *actual_value;
80 }
81 
82 #ifdef GRPC_POSIX_SOCKET_UTILS_COMMON
// Process-wide TCP_USER_TIMEOUT defaults. They are chosen to line up with the
// KEEPALIVE_TIMEOUT defaults proposed in
// https://github.com/grpc/proposal/blob/master/A18-tcp-user-timeout.md
// Despite the k-prefix these are mutable: ConfigureDefaultTcpUserTimeout()
// overwrites them at runtime.
// NOTE(review): they are plain bool/int, so concurrent reads and writes are
// unsynchronized.
bool kDefaultClientUserTimeoutEnabled = false;
bool kDefaultServerUserTimeoutEnabled = true;
int kDefaultClientUserTimeoutMs = 20000;
int kDefaultServerUserTimeoutMs = 20000;
90 
ErrorForFd(int fd,const experimental::EventEngine::ResolvedAddress & addr)91 absl::Status ErrorForFd(
92     int fd, const experimental::EventEngine::ResolvedAddress& addr) {
93   if (fd >= 0) return absl::OkStatus();
94   const char* addr_str = reinterpret_cast<const char*>(addr.address());
95   return absl::Status(absl::StatusCode::kInternal,
96                       absl::StrCat("socket: ", grpc_core::StrError(errno),
97                                    std::string(addr_str, addr.size())));
98 }
99 
CreateSocket(std::function<int (int,int,int)> socket_factory,int family,int type,int protocol)100 int CreateSocket(std::function<int(int, int, int)> socket_factory, int family,
101                  int type, int protocol) {
102   int res = socket_factory != nullptr ? socket_factory(family, type, protocol)
103                                       : socket(family, type, protocol);
104   if (res < 0 && errno == EMFILE) {
105     int saved_errno = errno;
106     LOG_EVERY_N_SEC(ERROR, 10)
107         << "socket(" << family << ", " << type << ", " << protocol
108         << ") returned " << res << " with error: |"
109         << grpc_core::StrError(errno)
110         << "|. This process might not have a sufficient file descriptor limit "
111            "for the number of connections grpc wants to open (which is "
112            "generally a function of the number of grpc channels, the lb policy "
113            "of each channel, and the number of backends each channel is load "
114            "balancing across).";
115     errno = saved_errno;
116   }
117   return res;
118 }
119 
PrepareTcpClientSocket(PosixSocketWrapper sock,const EventEngine::ResolvedAddress & addr,const PosixTcpOptions & options)120 absl::Status PrepareTcpClientSocket(PosixSocketWrapper sock,
121                                     const EventEngine::ResolvedAddress& addr,
122                                     const PosixTcpOptions& options) {
123   bool close_fd = true;
124   auto sock_cleanup = absl::MakeCleanup([&close_fd, &sock]() -> void {
125     if (close_fd and sock.Fd() >= 0) {
126       close(sock.Fd());
127     }
128   });
129   GRPC_RETURN_IF_ERROR(sock.SetSocketNonBlocking(1));
130   GRPC_RETURN_IF_ERROR(sock.SetSocketCloexec(1));
131   if (options.tcp_receive_buffer_size != options.kReadBufferSizeUnset) {
132     GRPC_RETURN_IF_ERROR(sock.SetSocketRcvBuf(options.tcp_receive_buffer_size));
133   }
134   if (addr.address()->sa_family != AF_UNIX && !ResolvedAddressIsVSock(addr)) {
135     // If its not a unix socket or vsock address.
136     GRPC_RETURN_IF_ERROR(sock.SetSocketLowLatency(1));
137     GRPC_RETURN_IF_ERROR(sock.SetSocketReuseAddr(1));
138     GRPC_RETURN_IF_ERROR(sock.SetSocketDscp(options.dscp));
139     sock.TrySetSocketTcpUserTimeout(options, true);
140   }
141   GRPC_RETURN_IF_ERROR(sock.SetSocketNoSigpipeIfPossible());
142   GRPC_RETURN_IF_ERROR(sock.ApplySocketMutatorInOptions(
143       GRPC_FD_CLIENT_CONNECTION_USAGE, options));
144   // No errors. Set close_fd to false to ensure the socket is not closed.
145   close_fd = false;
146   return absl::OkStatus();
147 }
148 
149 #endif  // GRPC_POSIX_SOCKET_UTILS_COMMON
150 
151 }  // namespace
152 
153 #ifdef GRPC_POSIX_SOCKET_UTILS_COMMON
154 #ifndef GRPC_SET_SOCKET_DUALSTACK_CUSTOM
155 
// Clears IPV6_V6ONLY on `fd`. Returns true when setsockopt() succeeds.
bool SetSocketDualStack(int fd) {
  const int v6_only = 0;
  const int rc =
      setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &v6_only, sizeof(v6_only));
  return rc == 0;
}
160 
161 #endif  // GRPC_SET_SOCKET_DUALSTACK_CUSTOM
162 #endif  // GRPC_POSIX_SOCKET_UTILS_COMMON
163 
TcpOptionsFromEndpointConfig(const EndpointConfig & config)164 PosixTcpOptions TcpOptionsFromEndpointConfig(const EndpointConfig& config) {
165   void* value;
166   PosixTcpOptions options;
167   options.tcp_read_chunk_size = AdjustValue(
168       PosixTcpOptions::kDefaultReadChunkSize, 1, PosixTcpOptions::kMaxChunkSize,
169       config.GetInt(GRPC_ARG_TCP_READ_CHUNK_SIZE));
170   options.tcp_min_read_chunk_size =
171       AdjustValue(PosixTcpOptions::kDefaultMinReadChunksize, 1,
172                   PosixTcpOptions::kMaxChunkSize,
173                   config.GetInt(GRPC_ARG_TCP_MIN_READ_CHUNK_SIZE));
174   options.tcp_max_read_chunk_size =
175       AdjustValue(PosixTcpOptions::kDefaultMaxReadChunksize, 1,
176                   PosixTcpOptions::kMaxChunkSize,
177                   config.GetInt(GRPC_ARG_TCP_MAX_READ_CHUNK_SIZE));
178   options.tcp_tx_zerocopy_send_bytes_threshold =
179       AdjustValue(PosixTcpOptions::kDefaultSendBytesThreshold, 0, INT_MAX,
180                   config.GetInt(GRPC_ARG_TCP_TX_ZEROCOPY_SEND_BYTES_THRESHOLD));
181   options.tcp_tx_zerocopy_max_simultaneous_sends =
182       AdjustValue(PosixTcpOptions::kDefaultMaxSends, 0, INT_MAX,
183                   config.GetInt(GRPC_ARG_TCP_TX_ZEROCOPY_MAX_SIMULT_SENDS));
184   options.tcp_receive_buffer_size =
185       AdjustValue(PosixTcpOptions::kReadBufferSizeUnset, 0, INT_MAX,
186                   config.GetInt(GRPC_ARG_TCP_RECEIVE_BUFFER_SIZE));
187   options.tcp_tx_zero_copy_enabled =
188       (AdjustValue(PosixTcpOptions::kZerocpTxEnabledDefault, 0, 1,
189                    config.GetInt(GRPC_ARG_TCP_TX_ZEROCOPY_ENABLED)) != 0);
190   options.keep_alive_time_ms =
191       AdjustValue(0, 1, INT_MAX, config.GetInt(GRPC_ARG_KEEPALIVE_TIME_MS));
192   options.keep_alive_timeout_ms =
193       AdjustValue(0, 1, INT_MAX, config.GetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS));
194   options.expand_wildcard_addrs =
195       (AdjustValue(0, 1, INT_MAX,
196                    config.GetInt(GRPC_ARG_EXPAND_WILDCARD_ADDRS)) != 0);
197   options.dscp = AdjustValue(PosixTcpOptions::kDscpNotSet, 0, 63,
198                              config.GetInt(GRPC_ARG_DSCP));
199   options.allow_reuse_port = PosixSocketWrapper::IsSocketReusePortSupported();
200   auto allow_reuse_port_value = config.GetInt(GRPC_ARG_ALLOW_REUSEPORT);
201   if (allow_reuse_port_value.has_value()) {
202     options.allow_reuse_port =
203         (AdjustValue(0, 1, INT_MAX, config.GetInt(GRPC_ARG_ALLOW_REUSEPORT)) !=
204          0);
205   }
206   if (options.tcp_min_read_chunk_size > options.tcp_max_read_chunk_size) {
207     options.tcp_min_read_chunk_size = options.tcp_max_read_chunk_size;
208   }
209   options.tcp_read_chunk_size = grpc_core::Clamp(
210       options.tcp_read_chunk_size, options.tcp_min_read_chunk_size,
211       options.tcp_max_read_chunk_size);
212 
213   value = config.GetVoidPointer(GRPC_ARG_RESOURCE_QUOTA);
214   if (value != nullptr) {
215     options.resource_quota =
216         reinterpret_cast<grpc_core::ResourceQuota*>(value)->Ref();
217   }
218   value = config.GetVoidPointer(GRPC_ARG_SOCKET_MUTATOR);
219   if (value != nullptr) {
220     options.socket_mutator =
221         grpc_socket_mutator_ref(static_cast<grpc_socket_mutator*>(value));
222   }
223   value =
224       config.GetVoidPointer(GRPC_ARG_EVENT_ENGINE_USE_MEMORY_ALLOCATOR_FACTORY);
225   if (value != nullptr) {
226     options.memory_allocator_factory =
227         static_cast<grpc_event_engine::experimental::MemoryAllocatorFactory*>(
228             value);
229   }
230   return options;
231 }
232 
233 #ifdef GRPC_POSIX_SOCKETUTILS
234 
Accept4(int sockfd,grpc_event_engine::experimental::EventEngine::ResolvedAddress & addr,int nonblock,int cloexec)235 int Accept4(int sockfd,
236             grpc_event_engine::experimental::EventEngine::ResolvedAddress& addr,
237             int nonblock, int cloexec) {
238   int fd, flags;
239   EventEngine::ResolvedAddress peer_addr;
240   socklen_t len = EventEngine::ResolvedAddress::MAX_SIZE_BYTES;
241   fd = accept(sockfd, const_cast<sockaddr*>(peer_addr.address()), &len);
242   if (fd >= 0) {
243     if (nonblock) {
244       flags = fcntl(fd, F_GETFL, 0);
245       if (flags < 0) goto close_and_error;
246       if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) != 0) goto close_and_error;
247     }
248     if (cloexec) {
249       flags = fcntl(fd, F_GETFD, 0);
250       if (flags < 0) goto close_and_error;
251       if (fcntl(fd, F_SETFD, flags | FD_CLOEXEC) != 0) goto close_and_error;
252     }
253   }
254   addr = EventEngine::ResolvedAddress(peer_addr.address(), len);
255   return fd;
256 
257 close_and_error:
258   close(fd);
259   return -1;
260 }
261 
262 #elif GRPC_LINUX_SOCKETUTILS
263 
Accept4(int sockfd,grpc_event_engine::experimental::EventEngine::ResolvedAddress & addr,int nonblock,int cloexec)264 int Accept4(int sockfd,
265             grpc_event_engine::experimental::EventEngine::ResolvedAddress& addr,
266             int nonblock, int cloexec) {
267   int flags = 0;
268   flags |= nonblock ? SOCK_NONBLOCK : 0;
269   flags |= cloexec ? SOCK_CLOEXEC : 0;
270   EventEngine::ResolvedAddress peer_addr;
271   socklen_t len = EventEngine::ResolvedAddress::MAX_SIZE_BYTES;
272   int ret =
273       accept4(sockfd, const_cast<sockaddr*>(peer_addr.address()), &len, flags);
274   addr = EventEngine::ResolvedAddress(peer_addr.address(), len);
275   return ret;
276 }
277 
278 #endif  // GRPC_LINUX_SOCKETUTILS
279 
280 #ifdef GRPC_POSIX_SOCKET_UTILS_COMMON
281 
UnlinkIfUnixDomainSocket(const EventEngine::ResolvedAddress & resolved_addr)282 void UnlinkIfUnixDomainSocket(
283     const EventEngine::ResolvedAddress& resolved_addr) {
284 #ifdef GRPC_HAVE_UNIX_SOCKET
285   if (resolved_addr.address()->sa_family != AF_UNIX) {
286     return;
287   }
288   struct sockaddr_un* un = reinterpret_cast<struct sockaddr_un*>(
289       const_cast<sockaddr*>(resolved_addr.address()));
290 
291   // There is nothing to unlink for an abstract unix socket
292   if (un->sun_path[0] == '\0' && un->sun_path[1] != '\0') {
293     return;
294   }
295 
296   struct stat st;
297   if (stat(un->sun_path, &st) == 0 && (st.st_mode & S_IFMT) == S_IFSOCK) {
298     unlink(un->sun_path);
299   }
300 #else
301   (void)resolved_addr;
302 #endif
303 }
304 
305 // Instruct the kernel to wait for specified number of bytes to be received on
306 // the socket before generating an interrupt for packet receive. If the call
307 // succeeds, it returns the number of bytes (wait threshold) that was actually
308 // set.
SetSocketRcvLowat(int bytes)309 absl::StatusOr<int> PosixSocketWrapper::SetSocketRcvLowat(int bytes) {
310   if (setsockopt(fd_, SOL_SOCKET, SO_RCVLOWAT, &bytes, sizeof(bytes)) != 0) {
311     return absl::Status(
312         absl::StatusCode::kInternal,
313         absl::StrCat("setsockopt(SO_RCVLOWAT): ", grpc_core::StrError(errno)));
314   }
315   return bytes;
316 }
317 
318 // Set a socket to use zerocopy
SetSocketZeroCopy()319 absl::Status PosixSocketWrapper::SetSocketZeroCopy() {
320 #ifdef GRPC_LINUX_ERRQUEUE
321   const int enable = 1;
322   auto err = setsockopt(fd_, SOL_SOCKET, SO_ZEROCOPY, &enable, sizeof(enable));
323   if (err != 0) {
324     return absl::Status(
325         absl::StatusCode::kInternal,
326         absl::StrCat("setsockopt(SO_ZEROCOPY): ", grpc_core::StrError(errno)));
327   }
328   return absl::OkStatus();
329 #else
330   return absl::Status(absl::StatusCode::kInternal,
331                       absl::StrCat("setsockopt(SO_ZEROCOPY): ",
332                                    grpc_core::StrError(ENOSYS).c_str()));
333 #endif
334 }
335 
336 // Set a socket to non blocking mode
SetSocketNonBlocking(int non_blocking)337 absl::Status PosixSocketWrapper::SetSocketNonBlocking(int non_blocking) {
338   int oldflags = fcntl(fd_, F_GETFL, 0);
339   if (oldflags < 0) {
340     return absl::Status(absl::StatusCode::kInternal,
341                         absl::StrCat("fcntl: ", grpc_core::StrError(errno)));
342   }
343 
344   if (non_blocking) {
345     oldflags |= O_NONBLOCK;
346   } else {
347     oldflags &= ~O_NONBLOCK;
348   }
349 
350   if (fcntl(fd_, F_SETFL, oldflags) != 0) {
351     return absl::Status(absl::StatusCode::kInternal,
352                         absl::StrCat("fcntl: ", grpc_core::StrError(errno)));
353   }
354 
355   return absl::OkStatus();
356 }
357 
SetSocketNoSigpipeIfPossible()358 absl::Status PosixSocketWrapper::SetSocketNoSigpipeIfPossible() {
359 #ifdef GRPC_HAVE_SO_NOSIGPIPE
360   int val = 1;
361   int newval;
362   socklen_t intlen = sizeof(newval);
363   if (0 != setsockopt(fd_, SOL_SOCKET, SO_NOSIGPIPE, &val, sizeof(val))) {
364     return absl::Status(
365         absl::StatusCode::kInternal,
366         absl::StrCat("setsockopt(SO_NOSIGPIPE): ", grpc_core::StrError(errno)));
367   }
368   if (0 != getsockopt(fd_, SOL_SOCKET, SO_NOSIGPIPE, &newval, &intlen)) {
369     return absl::Status(
370         absl::StatusCode::kInternal,
371         absl::StrCat("getsockopt(SO_NOSIGPIPE): ", grpc_core::StrError(errno)));
372   }
373   if ((newval != 0) != (val != 0)) {
374     return absl::Status(absl::StatusCode::kInternal,
375                         "Failed to set SO_NOSIGPIPE");
376   }
377 #endif
378   return absl::OkStatus();
379 }
380 
SetSocketIpPktInfoIfPossible()381 absl::Status PosixSocketWrapper::SetSocketIpPktInfoIfPossible() {
382 #ifdef GRPC_HAVE_IP_PKTINFO
383   int get_local_ip = 1;
384   if (0 != setsockopt(fd_, IPPROTO_IP, IP_PKTINFO, &get_local_ip,
385                       sizeof(get_local_ip))) {
386     return absl::Status(
387         absl::StatusCode::kInternal,
388         absl::StrCat("setsockopt(IP_PKTINFO): ", grpc_core::StrError(errno)));
389   }
390 #endif
391   return absl::OkStatus();
392 }
393 
SetSocketIpv6RecvPktInfoIfPossible()394 absl::Status PosixSocketWrapper::SetSocketIpv6RecvPktInfoIfPossible() {
395 #ifdef GRPC_HAVE_IPV6_RECVPKTINFO
396   int get_local_ip = 1;
397   if (0 != setsockopt(fd_, IPPROTO_IPV6, IPV6_RECVPKTINFO, &get_local_ip,
398                       sizeof(get_local_ip))) {
399     return absl::Status(absl::StatusCode::kInternal,
400                         absl::StrCat("setsockopt(IPV6_RECVPKTINFO): ",
401                                      grpc_core::StrError(errno)));
402   }
403 #endif
404   return absl::OkStatus();
405 }
406 
SetSocketSndBuf(int buffer_size_bytes)407 absl::Status PosixSocketWrapper::SetSocketSndBuf(int buffer_size_bytes) {
408   return 0 == setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &buffer_size_bytes,
409                          sizeof(buffer_size_bytes))
410              ? absl::OkStatus()
411              : absl::Status(absl::StatusCode::kInternal,
412                             absl::StrCat("setsockopt(SO_SNDBUF): ",
413                                          grpc_core::StrError(errno)));
414 }
415 
SetSocketRcvBuf(int buffer_size_bytes)416 absl::Status PosixSocketWrapper::SetSocketRcvBuf(int buffer_size_bytes) {
417   return 0 == setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &buffer_size_bytes,
418                          sizeof(buffer_size_bytes))
419              ? absl::OkStatus()
420              : absl::Status(absl::StatusCode::kInternal,
421                             absl::StrCat("setsockopt(SO_RCVBUF): ",
422                                          grpc_core::StrError(errno)));
423 }
424 
425 // Set a socket to close on exec
SetSocketCloexec(int close_on_exec)426 absl::Status PosixSocketWrapper::SetSocketCloexec(int close_on_exec) {
427   int oldflags = fcntl(fd_, F_GETFD, 0);
428   if (oldflags < 0) {
429     return absl::Status(absl::StatusCode::kInternal,
430                         absl::StrCat("fcntl: ", grpc_core::StrError(errno)));
431   }
432 
433   if (close_on_exec) {
434     oldflags |= FD_CLOEXEC;
435   } else {
436     oldflags &= ~FD_CLOEXEC;
437   }
438 
439   if (fcntl(fd_, F_SETFD, oldflags) != 0) {
440     return absl::Status(absl::StatusCode::kInternal,
441                         absl::StrCat("fcntl: ", grpc_core::StrError(errno)));
442   }
443 
444   return absl::OkStatus();
445 }
446 
447 // set a socket to reuse old addresses
SetSocketReuseAddr(int reuse)448 absl::Status PosixSocketWrapper::SetSocketReuseAddr(int reuse) {
449   int val = (reuse != 0);
450   int newval;
451   socklen_t intlen = sizeof(newval);
452   if (0 != setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val))) {
453     return absl::Status(
454         absl::StatusCode::kInternal,
455         absl::StrCat("setsockopt(SO_REUSEADDR): ", grpc_core::StrError(errno)));
456   }
457   if (0 != getsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &newval, &intlen)) {
458     return absl::Status(
459         absl::StatusCode::kInternal,
460         absl::StrCat("getsockopt(SO_REUSEADDR): ", grpc_core::StrError(errno)));
461   }
462   if ((newval != 0) != val) {
463     return absl::Status(absl::StatusCode::kInternal,
464                         "Failed to set SO_REUSEADDR");
465   }
466 
467   return absl::OkStatus();
468 }
469 
470 // set a socket to reuse old ports
SetSocketReusePort(int reuse)471 absl::Status PosixSocketWrapper::SetSocketReusePort(int reuse) {
472 #ifndef SO_REUSEPORT
473   return absl::Status(absl::StatusCode::kInternal,
474                       "SO_REUSEPORT unavailable on compiling system");
475 #else
476   int val = (reuse != 0);
477   int newval;
478   socklen_t intlen = sizeof(newval);
479   if (0 != setsockopt(fd_, SOL_SOCKET, SO_REUSEPORT, &val, sizeof(val))) {
480     return absl::Status(
481         absl::StatusCode::kInternal,
482         absl::StrCat("setsockopt(SO_REUSEPORT): ", grpc_core::StrError(errno)));
483   }
484   if (0 != getsockopt(fd_, SOL_SOCKET, SO_REUSEPORT, &newval, &intlen)) {
485     return absl::Status(
486         absl::StatusCode::kInternal,
487         absl::StrCat("getsockopt(SO_REUSEPORT): ", grpc_core::StrError(errno)));
488   }
489   if ((newval != 0) != val) {
490     return absl::Status(absl::StatusCode::kInternal,
491                         "Failed to set SO_REUSEPORT");
492   }
493 
494   return absl::OkStatus();
495 #endif
496 }
497 
IsSocketReusePortSupported()498 bool PosixSocketWrapper::IsSocketReusePortSupported() {
499   static bool kSupportSoReusePort = []() -> bool {
500     int s = socket(AF_INET, SOCK_STREAM, 0);
501     if (s < 0) {
502       // This might be an ipv6-only environment in which case
503       // 'socket(AF_INET,..)' call would fail. Try creating IPv6 socket in
504       // that case
505       s = socket(AF_INET6, SOCK_STREAM, 0);
506     }
507     if (s >= 0) {
508       PosixSocketWrapper sock(s);
509       bool result = sock.SetSocketReusePort(1).ok();
510       close(sock.Fd());
511       return result;
512     } else {
513       return false;
514     }
515   }();
516   return kSupportSoReusePort;
517 }
518 
519 // Disable nagle algorithm
SetSocketLowLatency(int low_latency)520 absl::Status PosixSocketWrapper::SetSocketLowLatency(int low_latency) {
521   int val = (low_latency != 0);
522   int newval;
523   socklen_t intlen = sizeof(newval);
524   if (0 != setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val))) {
525     return absl::Status(
526         absl::StatusCode::kInternal,
527         absl::StrCat("setsockopt(TCP_NODELAY): ", grpc_core::StrError(errno)));
528   }
529   if (0 != getsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &newval, &intlen)) {
530     return absl::Status(
531         absl::StatusCode::kInternal,
532         absl::StrCat("getsockopt(TCP_NODELAY): ", grpc_core::StrError(errno)));
533   }
534   if ((newval != 0) != val) {
535     return absl::Status(absl::StatusCode::kInternal,
536                         "Failed to set TCP_NODELAY");
537   }
538   return absl::OkStatus();
539 }
540 
541 // Set Differentiated Services Code Point (DSCP)
SetSocketDscp(int dscp)542 absl::Status PosixSocketWrapper::SetSocketDscp(int dscp) {
543   if (dscp == PosixTcpOptions::kDscpNotSet) {
544     return absl::OkStatus();
545   }
546   // The TOS/TrafficClass byte consists of following bits:
547   // | 7 6 5 4 3 2 | 1 0 |
548   // |    DSCP     | ECN |
549   int newval = dscp << 2;
550   int val;
551   socklen_t intlen = sizeof(val);
552   // Get ECN bits from current IP_TOS value unless IPv6 only
553   if (0 == getsockopt(fd_, IPPROTO_IP, IP_TOS, &val, &intlen)) {
554     newval |= (val & 0x3);
555     if (0 != setsockopt(fd_, IPPROTO_IP, IP_TOS, &newval, sizeof(newval))) {
556       return absl::Status(
557           absl::StatusCode::kInternal,
558           absl::StrCat("setsockopt(IP_TOS): ", grpc_core::StrError(errno)));
559     }
560   }
561   // Get ECN from current Traffic Class value if IPv6 is available
562   if (0 == getsockopt(fd_, IPPROTO_IPV6, IPV6_TCLASS, &val, &intlen)) {
563     newval |= (val & 0x3);
564     if (0 !=
565         setsockopt(fd_, IPPROTO_IPV6, IPV6_TCLASS, &newval, sizeof(newval))) {
566       return absl::Status(absl::StatusCode::kInternal,
567                           absl::StrCat("setsockopt(IPV6_TCLASS): ",
568                                        grpc_core::StrError(errno)));
569     }
570   }
571   return absl::OkStatus();
572 }
573 
// Compile-time setup for TCP_USER_TIMEOUT:
//  * Linux: the option number is always known (18 if the headers lack it).
//    Whether the running kernel accepts it is probed at runtime, so the
//    default starts at 0.
//  * Other platforms that define TCP_USER_TIMEOUT: also probed at runtime.
//  * Everything else: a placeholder option number is defined so the code
//    compiles, and the feature is marked unsupported (-1) up front.
#if GPR_LINUX == 1
#ifndef TCP_USER_TIMEOUT
#define TCP_USER_TIMEOUT 18
#endif
#define SOCKET_SUPPORTS_TCP_USER_TIMEOUT_DEFAULT 0
#elif defined(TCP_USER_TIMEOUT)
#define SOCKET_SUPPORTS_TCP_USER_TIMEOUT_DEFAULT 0
#else
#define TCP_USER_TIMEOUT 0
#define SOCKET_SUPPORTS_TCP_USER_TIMEOUT_DEFAULT -1
#endif  // GPR_LINUX == 1

// Whether sockets support the TCP_USER_TIMEOUT option.
// 0: not probed yet, 1: supported, -1: not supported.
static std::atomic<int> g_socket_supports_tcp_user_timeout{
    SOCKET_SUPPORTS_TCP_USER_TIMEOUT_DEFAULT};
594 
ConfigureDefaultTcpUserTimeout(bool enable,int timeout,bool is_client)595 void PosixSocketWrapper::ConfigureDefaultTcpUserTimeout(bool enable,
596                                                         int timeout,
597                                                         bool is_client) {
598   if (is_client) {
599     kDefaultClientUserTimeoutEnabled = enable;
600     if (timeout > 0) {
601       kDefaultClientUserTimeoutMs = timeout;
602     }
603   } else {
604     kDefaultServerUserTimeoutEnabled = enable;
605     if (timeout > 0) {
606       kDefaultServerUserTimeoutMs = timeout;
607     }
608   }
609 }
610 
611 // Set TCP_USER_TIMEOUT
TrySetSocketTcpUserTimeout(const PosixTcpOptions & options,bool is_client)612 void PosixSocketWrapper::TrySetSocketTcpUserTimeout(
613     const PosixTcpOptions& options, bool is_client) {
614   if (g_socket_supports_tcp_user_timeout.load() < 0) {
615     return;
616   }
617   bool enable = is_client ? kDefaultClientUserTimeoutEnabled
618                           : kDefaultServerUserTimeoutEnabled;
619   int timeout =
620       is_client ? kDefaultClientUserTimeoutMs : kDefaultServerUserTimeoutMs;
621   if (options.keep_alive_time_ms > 0) {
622     enable = options.keep_alive_time_ms != INT_MAX;
623   }
624   if (options.keep_alive_timeout_ms > 0) {
625     timeout = options.keep_alive_timeout_ms;
626   }
627   if (enable) {
628     int newval;
629     socklen_t len = sizeof(newval);
630     // If this is the first time to use TCP_USER_TIMEOUT, try to check
631     // if it is available.
632     if (g_socket_supports_tcp_user_timeout.load() == 0) {
633       if (0 != getsockopt(fd_, IPPROTO_TCP, TCP_USER_TIMEOUT, &newval, &len)) {
634         // This log is intentionally not protected behind a flag, so that users
635         // know that TCP_USER_TIMEOUT is not being used.
636         GRPC_TRACE_LOG(tcp, INFO)
637             << "TCP_USER_TIMEOUT is not available. TCP_USER_TIMEOUT "
638                "won't be used thereafter";
639         g_socket_supports_tcp_user_timeout.store(-1);
640       } else {
641         GRPC_TRACE_LOG(tcp, INFO)
642             << "TCP_USER_TIMEOUT is available. TCP_USER_TIMEOUT will be "
643                "used thereafter";
644         g_socket_supports_tcp_user_timeout.store(1);
645       }
646     }
647     if (g_socket_supports_tcp_user_timeout.load() > 0) {
648       if (0 != setsockopt(fd_, IPPROTO_TCP, TCP_USER_TIMEOUT, &timeout,
649                           sizeof(timeout))) {
650         LOG(ERROR) << "setsockopt(TCP_USER_TIMEOUT) "
651                    << grpc_core::StrError(errno);
652         return;
653       }
654       if (0 != getsockopt(fd_, IPPROTO_TCP, TCP_USER_TIMEOUT, &newval, &len)) {
655         LOG(ERROR) << "getsockopt(TCP_USER_TIMEOUT) "
656                    << grpc_core::StrError(errno);
657         return;
658       }
659       if (newval != timeout) {
660         // Do not fail on failing to set TCP_USER_TIMEOUT
661         LOG(ERROR) << "Failed to set TCP_USER_TIMEOUT";
662         return;
663       }
664     }
665   }
666 }
667 
668 // Set a socket using a grpc_socket_mutator
SetSocketMutator(grpc_fd_usage usage,grpc_socket_mutator * mutator)669 absl::Status PosixSocketWrapper::SetSocketMutator(
670     grpc_fd_usage usage, grpc_socket_mutator* mutator) {
671   CHECK(mutator);
672   if (!grpc_socket_mutator_mutate_fd(mutator, fd_, usage)) {
673     return absl::Status(absl::StatusCode::kInternal,
674                         "grpc_socket_mutator failed.");
675   }
676   return absl::OkStatus();
677 }
678 
ApplySocketMutatorInOptions(grpc_fd_usage usage,const PosixTcpOptions & options)679 absl::Status PosixSocketWrapper::ApplySocketMutatorInOptions(
680     grpc_fd_usage usage, const PosixTcpOptions& options) {
681   if (options.socket_mutator == nullptr) {
682     return absl::OkStatus();
683   }
684   return SetSocketMutator(usage, options.socket_mutator);
685 }
686 
IsIpv6LoopbackAvailable()687 bool PosixSocketWrapper::IsIpv6LoopbackAvailable() {
688   static bool kIpv6LoopbackAvailable = []() -> bool {
689     int fd = socket(AF_INET6, SOCK_STREAM, 0);
690     bool loopback_available = false;
691     if (fd < 0) {
692       GRPC_TRACE_LOG(tcp, INFO)
693           << "Disabling AF_INET6 sockets because socket() failed.";
694     } else {
695       sockaddr_in6 addr;
696       memset(&addr, 0, sizeof(addr));
697       addr.sin6_family = AF_INET6;
698       addr.sin6_addr.s6_addr[15] = 1;  // [::1]:0
699       if (bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) == 0) {
700         loopback_available = true;
701       } else {
702         GRPC_TRACE_LOG(tcp, INFO)
703             << "Disabling AF_INET6 sockets because ::1 is not available.";
704       }
705       close(fd);
706     }
707     return loopback_available;
708   }();
709   return kIpv6LoopbackAvailable;
710 }
711 
712 absl::StatusOr<EventEngine::ResolvedAddress>
LocalAddress()713 PosixSocketWrapper::LocalAddress() {
714   EventEngine::ResolvedAddress addr;
715   socklen_t len = EventEngine::ResolvedAddress::MAX_SIZE_BYTES;
716   if (getsockname(fd_, const_cast<sockaddr*>(addr.address()), &len) < 0) {
717     return absl::InternalError(
718         absl::StrCat("getsockname:", grpc_core::StrError(errno)));
719   }
720   return EventEngine::ResolvedAddress(addr.address(), len);
721 }
722 
PeerAddress()723 absl::StatusOr<EventEngine::ResolvedAddress> PosixSocketWrapper::PeerAddress() {
724   EventEngine::ResolvedAddress addr;
725   socklen_t len = EventEngine::ResolvedAddress::MAX_SIZE_BYTES;
726   if (getpeername(fd_, const_cast<sockaddr*>(addr.address()), &len) < 0) {
727     return absl::InternalError(
728         absl::StrCat("getpeername:", grpc_core::StrError(errno)));
729   }
730   return EventEngine::ResolvedAddress(addr.address(), len);
731 }
732 
LocalAddressString()733 absl::StatusOr<std::string> PosixSocketWrapper::LocalAddressString() {
734   auto status = LocalAddress();
735   if (!status.ok()) {
736     return status.status();
737   }
738   return ResolvedAddressToNormalizedString((*status));
739 }
740 
PeerAddressString()741 absl::StatusOr<std::string> PosixSocketWrapper::PeerAddressString() {
742   auto status = PeerAddress();
743   if (!status.ok()) {
744     return status.status();
745   }
746   return ResolvedAddressToNormalizedString((*status));
747 }
748 
CreateDualStackSocket(std::function<int (int,int,int)> socket_factory,const experimental::EventEngine::ResolvedAddress & addr,int type,int protocol,PosixSocketWrapper::DSMode & dsmode)749 absl::StatusOr<PosixSocketWrapper> PosixSocketWrapper::CreateDualStackSocket(
750     std::function<int(int, int, int)> socket_factory,
751     const experimental::EventEngine::ResolvedAddress& addr, int type,
752     int protocol, PosixSocketWrapper::DSMode& dsmode) {
753   const sockaddr* sock_addr = addr.address();
754   int family = sock_addr->sa_family;
755   int newfd;
756   if (family == AF_INET6) {
757     if (IsIpv6LoopbackAvailable()) {
758       newfd = CreateSocket(socket_factory, family, type, protocol);
759     } else {
760       newfd = -1;
761       errno = EAFNOSUPPORT;
762     }
763     // Check if we've got a valid dualstack socket.
764     if (newfd > 0 && SetSocketDualStack(newfd)) {
765       dsmode = PosixSocketWrapper::DSMode::DSMODE_DUALSTACK;
766       return PosixSocketWrapper(newfd);
767     }
768     // If this isn't an IPv4 address, then return whatever we've got.
769     if (!ResolvedAddressIsV4Mapped(addr, nullptr)) {
770       if (newfd < 0) {
771         return ErrorForFd(newfd, addr);
772       }
773       dsmode = PosixSocketWrapper::DSMode::DSMODE_IPV6;
774       return PosixSocketWrapper(newfd);
775     }
776     // Fall back to AF_INET.
777     if (newfd >= 0) {
778       close(newfd);
779     }
780     family = AF_INET;
781   }
782   dsmode = family == AF_INET ? PosixSocketWrapper::DSMode::DSMODE_IPV4
783                              : PosixSocketWrapper::DSMode::DSMODE_NONE;
784   newfd = CreateSocket(socket_factory, family, type, protocol);
785   if (newfd < 0) {
786     return ErrorForFd(newfd, addr);
787   }
788   return PosixSocketWrapper(newfd);
789 }
790 
791 absl::StatusOr<PosixSocketWrapper::PosixSocketCreateResult>
CreateAndPrepareTcpClientSocket(const PosixTcpOptions & options,const EventEngine::ResolvedAddress & target_addr)792 PosixSocketWrapper::CreateAndPrepareTcpClientSocket(
793     const PosixTcpOptions& options,
794     const EventEngine::ResolvedAddress& target_addr) {
795   PosixSocketWrapper::DSMode dsmode;
796   EventEngine::ResolvedAddress mapped_target_addr;
797 
798   // Use dualstack sockets where available. Set mapped to v6 or v4 mapped to
799   // v6.
800   if (!ResolvedAddressToV4Mapped(target_addr, &mapped_target_addr)) {
801     // addr is v4 mapped to v6 or just v6.
802     mapped_target_addr = target_addr;
803   }
804   absl::StatusOr<PosixSocketWrapper> posix_socket_wrapper =
805       PosixSocketWrapper::CreateDualStackSocket(nullptr, mapped_target_addr,
806                                                 SOCK_STREAM, 0, dsmode);
807   if (!posix_socket_wrapper.ok()) {
808     return posix_socket_wrapper.status();
809   }
810 
811   if (dsmode == PosixSocketWrapper::DSMode::DSMODE_IPV4) {
812     // Original addr is either v4 or v4 mapped to v6. Set mapped_addr to v4.
813     if (!ResolvedAddressIsV4Mapped(target_addr, &mapped_target_addr)) {
814       mapped_target_addr = target_addr;
815     }
816   }
817 
818   auto error = PrepareTcpClientSocket(*posix_socket_wrapper, mapped_target_addr,
819                                       options);
820   if (!error.ok()) {
821     return error;
822   }
823   return PosixSocketWrapper::PosixSocketCreateResult{*posix_socket_wrapper,
824                                                      mapped_target_addr};
825 }
826 
827 #else  // GRPC_POSIX_SOCKET_UTILS_COMMON
828 
SetSocketRcvLowat(int)829 absl::StatusOr<int> PosixSocketWrapper::SetSocketRcvLowat(int /*bytes*/) {
830   grpc_core::Crash("unimplemented");
831 }
832 
SetSocketZeroCopy()833 absl::Status PosixSocketWrapper::SetSocketZeroCopy() {
834   grpc_core::Crash("unimplemented");
835 }
836 
SetSocketNonBlocking(int)837 absl::Status PosixSocketWrapper::SetSocketNonBlocking(int /*non_blocking*/) {
838   grpc_core::Crash("unimplemented");
839 }
840 
SetSocketCloexec(int)841 absl::Status PosixSocketWrapper::SetSocketCloexec(int /*close_on_exec*/) {
842   grpc_core::Crash("unimplemented");
843 }
844 
SetSocketReuseAddr(int)845 absl::Status PosixSocketWrapper::SetSocketReuseAddr(int /*reuse*/) {
846   grpc_core::Crash("unimplemented");
847 }
848 
SetSocketLowLatency(int)849 absl::Status PosixSocketWrapper::SetSocketLowLatency(int /*low_latency*/) {
850   grpc_core::Crash("unimplemented");
851 }
852 
SetSocketReusePort(int)853 absl::Status PosixSocketWrapper::SetSocketReusePort(int /*reuse*/) {
854   grpc_core::Crash("unimplemented");
855 }
856 
SetSocketDscp(int)857 absl::Status PosixSocketWrapper::SetSocketDscp(int /*dscp*/) {
858   grpc_core::Crash("unimplemented");
859 }
860 
ConfigureDefaultTcpUserTimeout(bool,int,bool)861 void PosixSocketWrapper::ConfigureDefaultTcpUserTimeout(bool /*enable*/,
862                                                         int /*timeout*/,
863                                                         bool /*is_client*/) {}
864 
TrySetSocketTcpUserTimeout(const PosixTcpOptions &,bool)865 void PosixSocketWrapper::TrySetSocketTcpUserTimeout(
866     const PosixTcpOptions& /*options*/, bool /*is_client*/) {
867   grpc_core::Crash("unimplemented");
868 }
869 
SetSocketNoSigpipeIfPossible()870 absl::Status PosixSocketWrapper::SetSocketNoSigpipeIfPossible() {
871   grpc_core::Crash("unimplemented");
872 }
873 
SetSocketIpPktInfoIfPossible()874 absl::Status PosixSocketWrapper::SetSocketIpPktInfoIfPossible() {
875   grpc_core::Crash("unimplemented");
876 }
877 
SetSocketIpv6RecvPktInfoIfPossible()878 absl::Status PosixSocketWrapper::SetSocketIpv6RecvPktInfoIfPossible() {
879   grpc_core::Crash("unimplemented");
880 }
881 
SetSocketSndBuf(int)882 absl::Status PosixSocketWrapper::SetSocketSndBuf(int /*buffer_size_bytes*/) {
883   grpc_core::Crash("unimplemented");
884 }
885 
SetSocketRcvBuf(int)886 absl::Status PosixSocketWrapper::SetSocketRcvBuf(int /*buffer_size_bytes*/) {
887   grpc_core::Crash("unimplemented");
888 }
889 
SetSocketMutator(grpc_fd_usage,grpc_socket_mutator *)890 absl::Status PosixSocketWrapper::SetSocketMutator(
891     grpc_fd_usage /*usage*/, grpc_socket_mutator* /*mutator*/) {
892   grpc_core::Crash("unimplemented");
893 }
894 
ApplySocketMutatorInOptions(grpc_fd_usage,const PosixTcpOptions &)895 absl::Status PosixSocketWrapper::ApplySocketMutatorInOptions(
896     grpc_fd_usage /*usage*/, const PosixTcpOptions& /*options*/) {
897   grpc_core::Crash("unimplemented");
898 }
899 
IsSocketReusePortSupported()900 bool PosixSocketWrapper::IsSocketReusePortSupported() {
901   grpc_core::Crash("unimplemented");
902 }
903 
IsIpv6LoopbackAvailable()904 bool PosixSocketWrapper::IsIpv6LoopbackAvailable() {
905   grpc_core::Crash("unimplemented");
906 }
907 
CreateDualStackSocket(std::function<int (int,int,int)>,const experimental::EventEngine::ResolvedAddress &,int,int,DSMode &)908 absl::StatusOr<PosixSocketWrapper> PosixSocketWrapper::CreateDualStackSocket(
909     std::function<int(int /*domain*/, int /*type*/, int /*protocol*/)>
910     /* socket_factory */,
911     const experimental::EventEngine::ResolvedAddress& /*addr*/, int /*type*/,
912     int /*protocol*/, DSMode& /*dsmode*/) {
913   grpc_core::Crash("unimplemented");
914 }
915 
916 absl::StatusOr<PosixSocketWrapper::PosixSocketCreateResult>
CreateAndPrepareTcpClientSocket(const PosixTcpOptions &,const EventEngine::ResolvedAddress &)917 PosixSocketWrapper::CreateAndPrepareTcpClientSocket(
918     const PosixTcpOptions& /*options*/,
919     const EventEngine::ResolvedAddress& /*target_addr*/) {
920   grpc_core::Crash("unimplemented");
921 }
922 
923 #endif  // GRPC_POSIX_SOCKET_UTILS_COMMON
924 
925 }  // namespace experimental
926 }  // namespace grpc_event_engine
927