1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
16
17 #include <errno.h>
18 #include <grpc/event_engine/event_engine.h>
19 #include <grpc/event_engine/memory_allocator.h>
20 #include <grpc/impl/channel_arg_names.h>
21 #include <grpc/support/port_platform.h>
22 #include <limits.h>
23
24 #include "absl/cleanup/cleanup.h"
25 #include "absl/status/statusor.h"
26 #include "absl/strings/str_cat.h"
27 #include "absl/types/optional.h"
28 #include "src/core/lib/iomgr/port.h"
29 #include "src/core/util/crash.h" // IWYU pragma: keep
30 #include "src/core/util/time.h"
31 #include "src/core/util/useful.h"
32
33 #ifdef GRPC_POSIX_SOCKET_UTILS_COMMON
34 #include <arpa/inet.h> // IWYU pragma: keep
35 #ifdef GRPC_LINUX_TCP_H
36 #include <linux/tcp.h>
37 #else
38 #include <netinet/in.h> // IWYU pragma: keep
39 #include <netinet/tcp.h>
40 #endif
41 #include <fcntl.h>
42 #include <sys/socket.h>
43 #include <unistd.h>
44 #endif // GRPC_POSIX_SOCKET_UTILS_COMMON
45
46 #include <atomic>
47 #include <cstring>
48
49 #include "absl/log/check.h"
50 #include "absl/log/log.h"
51 #include "absl/status/status.h"
52 #include "src/core/lib/event_engine/tcp_socket_utils.h"
53 #include "src/core/util/status_helper.h"
54 #include "src/core/util/strerror.h"
55
56 #ifdef GRPC_HAVE_UNIX_SOCKET
57 #ifdef GPR_WINDOWS
58 // clang-format off
59 #include <ws2def.h>
60 #include <afunix.h>
61 // clang-format on
62 #else
63 #include <sys/stat.h> // IWYU pragma: keep
64 #include <sys/un.h>
65 #endif // GPR_WINDOWS
66 #endif
67
68 namespace grpc_event_engine {
69 namespace experimental {
70
71 namespace {
72
AdjustValue(int default_value,int min_value,int max_value,absl::optional<int> actual_value)73 int AdjustValue(int default_value, int min_value, int max_value,
74 absl::optional<int> actual_value) {
75 if (!actual_value.has_value() || *actual_value < min_value ||
76 *actual_value > max_value) {
77 return default_value;
78 }
79 return *actual_value;
80 }
81
82 #ifdef GRPC_POSIX_SOCKET_UTILS_COMMON
// The default values for TCP_USER_TIMEOUT are currently configured to be in
// line with the default values of KEEPALIVE_TIMEOUT as proposed in
// https://github.com/grpc/proposal/blob/master/A18-tcp-user-timeout.md
//
// Despite the `k` prefix these are mutable process-wide defaults: they are
// overwritten by PosixSocketWrapper::ConfigureDefaultTcpUserTimeout() and read
// by PosixSocketWrapper::TrySetSocketTcpUserTimeout().
// NOTE(review): accessed without synchronization; presumably configured once
// before any sockets are created — confirm with callers.
int kDefaultClientUserTimeoutMs = 20000;
int kDefaultServerUserTimeoutMs = 20000;
// TCP_USER_TIMEOUT is off by default for client sockets and on for servers.
bool kDefaultClientUserTimeoutEnabled = false;
bool kDefaultServerUserTimeoutEnabled = true;
90
ErrorForFd(int fd,const experimental::EventEngine::ResolvedAddress & addr)91 absl::Status ErrorForFd(
92 int fd, const experimental::EventEngine::ResolvedAddress& addr) {
93 if (fd >= 0) return absl::OkStatus();
94 const char* addr_str = reinterpret_cast<const char*>(addr.address());
95 return absl::Status(absl::StatusCode::kInternal,
96 absl::StrCat("socket: ", grpc_core::StrError(errno),
97 std::string(addr_str, addr.size())));
98 }
99
CreateSocket(std::function<int (int,int,int)> socket_factory,int family,int type,int protocol)100 int CreateSocket(std::function<int(int, int, int)> socket_factory, int family,
101 int type, int protocol) {
102 int res = socket_factory != nullptr ? socket_factory(family, type, protocol)
103 : socket(family, type, protocol);
104 if (res < 0 && errno == EMFILE) {
105 int saved_errno = errno;
106 LOG_EVERY_N_SEC(ERROR, 10)
107 << "socket(" << family << ", " << type << ", " << protocol
108 << ") returned " << res << " with error: |"
109 << grpc_core::StrError(errno)
110 << "|. This process might not have a sufficient file descriptor limit "
111 "for the number of connections grpc wants to open (which is "
112 "generally a function of the number of grpc channels, the lb policy "
113 "of each channel, and the number of backends each channel is load "
114 "balancing across).";
115 errno = saved_errno;
116 }
117 return res;
118 }
119
PrepareTcpClientSocket(PosixSocketWrapper sock,const EventEngine::ResolvedAddress & addr,const PosixTcpOptions & options)120 absl::Status PrepareTcpClientSocket(PosixSocketWrapper sock,
121 const EventEngine::ResolvedAddress& addr,
122 const PosixTcpOptions& options) {
123 bool close_fd = true;
124 auto sock_cleanup = absl::MakeCleanup([&close_fd, &sock]() -> void {
125 if (close_fd and sock.Fd() >= 0) {
126 close(sock.Fd());
127 }
128 });
129 GRPC_RETURN_IF_ERROR(sock.SetSocketNonBlocking(1));
130 GRPC_RETURN_IF_ERROR(sock.SetSocketCloexec(1));
131 if (options.tcp_receive_buffer_size != options.kReadBufferSizeUnset) {
132 GRPC_RETURN_IF_ERROR(sock.SetSocketRcvBuf(options.tcp_receive_buffer_size));
133 }
134 if (addr.address()->sa_family != AF_UNIX && !ResolvedAddressIsVSock(addr)) {
135 // If its not a unix socket or vsock address.
136 GRPC_RETURN_IF_ERROR(sock.SetSocketLowLatency(1));
137 GRPC_RETURN_IF_ERROR(sock.SetSocketReuseAddr(1));
138 GRPC_RETURN_IF_ERROR(sock.SetSocketDscp(options.dscp));
139 sock.TrySetSocketTcpUserTimeout(options, true);
140 }
141 GRPC_RETURN_IF_ERROR(sock.SetSocketNoSigpipeIfPossible());
142 GRPC_RETURN_IF_ERROR(sock.ApplySocketMutatorInOptions(
143 GRPC_FD_CLIENT_CONNECTION_USAGE, options));
144 // No errors. Set close_fd to false to ensure the socket is not closed.
145 close_fd = false;
146 return absl::OkStatus();
147 }
148
149 #endif // GRPC_POSIX_SOCKET_UTILS_COMMON
150
151 } // namespace
152
153 #ifdef GRPC_POSIX_SOCKET_UTILS_COMMON
154 #ifndef GRPC_SET_SOCKET_DUALSTACK_CUSTOM
155
// Clears IPV6_V6ONLY on `fd` so that the (AF_INET6) socket can also carry
// IPv4 traffic. Returns true only if setsockopt() succeeds.
bool SetSocketDualStack(int fd) {
  const int v6_only = 0;
  const int rc =
      setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &v6_only, sizeof(v6_only));
  return rc == 0;
}
160
161 #endif // GRPC_SET_SOCKET_DUALSTACK_CUSTOM
162 #endif // GRPC_POSIX_SOCKET_UTILS_COMMON
163
TcpOptionsFromEndpointConfig(const EndpointConfig & config)164 PosixTcpOptions TcpOptionsFromEndpointConfig(const EndpointConfig& config) {
165 void* value;
166 PosixTcpOptions options;
167 options.tcp_read_chunk_size = AdjustValue(
168 PosixTcpOptions::kDefaultReadChunkSize, 1, PosixTcpOptions::kMaxChunkSize,
169 config.GetInt(GRPC_ARG_TCP_READ_CHUNK_SIZE));
170 options.tcp_min_read_chunk_size =
171 AdjustValue(PosixTcpOptions::kDefaultMinReadChunksize, 1,
172 PosixTcpOptions::kMaxChunkSize,
173 config.GetInt(GRPC_ARG_TCP_MIN_READ_CHUNK_SIZE));
174 options.tcp_max_read_chunk_size =
175 AdjustValue(PosixTcpOptions::kDefaultMaxReadChunksize, 1,
176 PosixTcpOptions::kMaxChunkSize,
177 config.GetInt(GRPC_ARG_TCP_MAX_READ_CHUNK_SIZE));
178 options.tcp_tx_zerocopy_send_bytes_threshold =
179 AdjustValue(PosixTcpOptions::kDefaultSendBytesThreshold, 0, INT_MAX,
180 config.GetInt(GRPC_ARG_TCP_TX_ZEROCOPY_SEND_BYTES_THRESHOLD));
181 options.tcp_tx_zerocopy_max_simultaneous_sends =
182 AdjustValue(PosixTcpOptions::kDefaultMaxSends, 0, INT_MAX,
183 config.GetInt(GRPC_ARG_TCP_TX_ZEROCOPY_MAX_SIMULT_SENDS));
184 options.tcp_receive_buffer_size =
185 AdjustValue(PosixTcpOptions::kReadBufferSizeUnset, 0, INT_MAX,
186 config.GetInt(GRPC_ARG_TCP_RECEIVE_BUFFER_SIZE));
187 options.tcp_tx_zero_copy_enabled =
188 (AdjustValue(PosixTcpOptions::kZerocpTxEnabledDefault, 0, 1,
189 config.GetInt(GRPC_ARG_TCP_TX_ZEROCOPY_ENABLED)) != 0);
190 options.keep_alive_time_ms =
191 AdjustValue(0, 1, INT_MAX, config.GetInt(GRPC_ARG_KEEPALIVE_TIME_MS));
192 options.keep_alive_timeout_ms =
193 AdjustValue(0, 1, INT_MAX, config.GetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS));
194 options.expand_wildcard_addrs =
195 (AdjustValue(0, 1, INT_MAX,
196 config.GetInt(GRPC_ARG_EXPAND_WILDCARD_ADDRS)) != 0);
197 options.dscp = AdjustValue(PosixTcpOptions::kDscpNotSet, 0, 63,
198 config.GetInt(GRPC_ARG_DSCP));
199 options.allow_reuse_port = PosixSocketWrapper::IsSocketReusePortSupported();
200 auto allow_reuse_port_value = config.GetInt(GRPC_ARG_ALLOW_REUSEPORT);
201 if (allow_reuse_port_value.has_value()) {
202 options.allow_reuse_port =
203 (AdjustValue(0, 1, INT_MAX, config.GetInt(GRPC_ARG_ALLOW_REUSEPORT)) !=
204 0);
205 }
206 if (options.tcp_min_read_chunk_size > options.tcp_max_read_chunk_size) {
207 options.tcp_min_read_chunk_size = options.tcp_max_read_chunk_size;
208 }
209 options.tcp_read_chunk_size = grpc_core::Clamp(
210 options.tcp_read_chunk_size, options.tcp_min_read_chunk_size,
211 options.tcp_max_read_chunk_size);
212
213 value = config.GetVoidPointer(GRPC_ARG_RESOURCE_QUOTA);
214 if (value != nullptr) {
215 options.resource_quota =
216 reinterpret_cast<grpc_core::ResourceQuota*>(value)->Ref();
217 }
218 value = config.GetVoidPointer(GRPC_ARG_SOCKET_MUTATOR);
219 if (value != nullptr) {
220 options.socket_mutator =
221 grpc_socket_mutator_ref(static_cast<grpc_socket_mutator*>(value));
222 }
223 value =
224 config.GetVoidPointer(GRPC_ARG_EVENT_ENGINE_USE_MEMORY_ALLOCATOR_FACTORY);
225 if (value != nullptr) {
226 options.memory_allocator_factory =
227 static_cast<grpc_event_engine::experimental::MemoryAllocatorFactory*>(
228 value);
229 }
230 return options;
231 }
232
#ifdef GRPC_POSIX_SOCKETUTILS

// Portable emulation of accept4(2). It accepts a connection on `sockfd` and,
// if requested, marks the new fd non-blocking (`nonblock`) and/or
// close-on-exec (`cloexec`) using fcntl().
// On success the peer address is stored in `addr` and the new fd is returned.
// On failure -1 is returned, errno describes the failing call, and `addr` is
// left untouched.
int Accept4(int sockfd,
            grpc_event_engine::experimental::EventEngine::ResolvedAddress& addr,
            int nonblock, int cloexec) {
  EventEngine::ResolvedAddress peer_addr;
  socklen_t len = EventEngine::ResolvedAddress::MAX_SIZE_BYTES;
  int fd = accept(sockfd, const_cast<sockaddr*>(peer_addr.address()), &len);
  if (fd < 0) {
    return fd;
  }
  bool ok = true;
  if (nonblock) {
    const int flags = fcntl(fd, F_GETFL, 0);
    ok = flags >= 0 && fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0;
  }
  if (ok && cloexec) {
    const int flags = fcntl(fd, F_GETFD, 0);
    ok = flags >= 0 && fcntl(fd, F_SETFD, flags | FD_CLOEXEC) == 0;
  }
  if (!ok) {
    // close() may overwrite errno. Preserve the fcntl() failure, because
    // callers branch on errno.
    const int saved_errno = errno;
    close(fd);
    errno = saved_errno;
    return -1;
  }
  addr = EventEngine::ResolvedAddress(peer_addr.address(), len);
  return fd;
}

#elif GRPC_LINUX_SOCKETUTILS

// Thin wrapper over Linux accept4(2) that applies SOCK_NONBLOCK/SOCK_CLOEXEC
// atomically. On success the peer address is stored in `addr`. On failure the
// accept4() result (-1) is returned with errno untouched, and `addr` is left
// unchanged.
int Accept4(int sockfd,
            grpc_event_engine::experimental::EventEngine::ResolvedAddress& addr,
            int nonblock, int cloexec) {
  int flags = 0;
  flags |= nonblock ? SOCK_NONBLOCK : 0;
  flags |= cloexec ? SOCK_CLOEXEC : 0;
  EventEngine::ResolvedAddress peer_addr;
  socklen_t len = EventEngine::ResolvedAddress::MAX_SIZE_BYTES;
  int ret =
      accept4(sockfd, const_cast<sockaddr*>(peer_addr.address()), &len, flags);
  if (ret >= 0) {
    addr = EventEngine::ResolvedAddress(peer_addr.address(), len);
  }
  return ret;
}

#endif  // GRPC_LINUX_SOCKETUTILS
279
280 #ifdef GRPC_POSIX_SOCKET_UTILS_COMMON
281
UnlinkIfUnixDomainSocket(const EventEngine::ResolvedAddress & resolved_addr)282 void UnlinkIfUnixDomainSocket(
283 const EventEngine::ResolvedAddress& resolved_addr) {
284 #ifdef GRPC_HAVE_UNIX_SOCKET
285 if (resolved_addr.address()->sa_family != AF_UNIX) {
286 return;
287 }
288 struct sockaddr_un* un = reinterpret_cast<struct sockaddr_un*>(
289 const_cast<sockaddr*>(resolved_addr.address()));
290
291 // There is nothing to unlink for an abstract unix socket
292 if (un->sun_path[0] == '\0' && un->sun_path[1] != '\0') {
293 return;
294 }
295
296 struct stat st;
297 if (stat(un->sun_path, &st) == 0 && (st.st_mode & S_IFMT) == S_IFSOCK) {
298 unlink(un->sun_path);
299 }
300 #else
301 (void)resolved_addr;
302 #endif
303 }
304
305 // Instruct the kernel to wait for specified number of bytes to be received on
306 // the socket before generating an interrupt for packet receive. If the call
307 // succeeds, it returns the number of bytes (wait threshold) that was actually
308 // set.
SetSocketRcvLowat(int bytes)309 absl::StatusOr<int> PosixSocketWrapper::SetSocketRcvLowat(int bytes) {
310 if (setsockopt(fd_, SOL_SOCKET, SO_RCVLOWAT, &bytes, sizeof(bytes)) != 0) {
311 return absl::Status(
312 absl::StatusCode::kInternal,
313 absl::StrCat("setsockopt(SO_RCVLOWAT): ", grpc_core::StrError(errno)));
314 }
315 return bytes;
316 }
317
318 // Set a socket to use zerocopy
SetSocketZeroCopy()319 absl::Status PosixSocketWrapper::SetSocketZeroCopy() {
320 #ifdef GRPC_LINUX_ERRQUEUE
321 const int enable = 1;
322 auto err = setsockopt(fd_, SOL_SOCKET, SO_ZEROCOPY, &enable, sizeof(enable));
323 if (err != 0) {
324 return absl::Status(
325 absl::StatusCode::kInternal,
326 absl::StrCat("setsockopt(SO_ZEROCOPY): ", grpc_core::StrError(errno)));
327 }
328 return absl::OkStatus();
329 #else
330 return absl::Status(absl::StatusCode::kInternal,
331 absl::StrCat("setsockopt(SO_ZEROCOPY): ",
332 grpc_core::StrError(ENOSYS).c_str()));
333 #endif
334 }
335
336 // Set a socket to non blocking mode
SetSocketNonBlocking(int non_blocking)337 absl::Status PosixSocketWrapper::SetSocketNonBlocking(int non_blocking) {
338 int oldflags = fcntl(fd_, F_GETFL, 0);
339 if (oldflags < 0) {
340 return absl::Status(absl::StatusCode::kInternal,
341 absl::StrCat("fcntl: ", grpc_core::StrError(errno)));
342 }
343
344 if (non_blocking) {
345 oldflags |= O_NONBLOCK;
346 } else {
347 oldflags &= ~O_NONBLOCK;
348 }
349
350 if (fcntl(fd_, F_SETFL, oldflags) != 0) {
351 return absl::Status(absl::StatusCode::kInternal,
352 absl::StrCat("fcntl: ", grpc_core::StrError(errno)));
353 }
354
355 return absl::OkStatus();
356 }
357
SetSocketNoSigpipeIfPossible()358 absl::Status PosixSocketWrapper::SetSocketNoSigpipeIfPossible() {
359 #ifdef GRPC_HAVE_SO_NOSIGPIPE
360 int val = 1;
361 int newval;
362 socklen_t intlen = sizeof(newval);
363 if (0 != setsockopt(fd_, SOL_SOCKET, SO_NOSIGPIPE, &val, sizeof(val))) {
364 return absl::Status(
365 absl::StatusCode::kInternal,
366 absl::StrCat("setsockopt(SO_NOSIGPIPE): ", grpc_core::StrError(errno)));
367 }
368 if (0 != getsockopt(fd_, SOL_SOCKET, SO_NOSIGPIPE, &newval, &intlen)) {
369 return absl::Status(
370 absl::StatusCode::kInternal,
371 absl::StrCat("getsockopt(SO_NOSIGPIPE): ", grpc_core::StrError(errno)));
372 }
373 if ((newval != 0) != (val != 0)) {
374 return absl::Status(absl::StatusCode::kInternal,
375 "Failed to set SO_NOSIGPIPE");
376 }
377 #endif
378 return absl::OkStatus();
379 }
380
SetSocketIpPktInfoIfPossible()381 absl::Status PosixSocketWrapper::SetSocketIpPktInfoIfPossible() {
382 #ifdef GRPC_HAVE_IP_PKTINFO
383 int get_local_ip = 1;
384 if (0 != setsockopt(fd_, IPPROTO_IP, IP_PKTINFO, &get_local_ip,
385 sizeof(get_local_ip))) {
386 return absl::Status(
387 absl::StatusCode::kInternal,
388 absl::StrCat("setsockopt(IP_PKTINFO): ", grpc_core::StrError(errno)));
389 }
390 #endif
391 return absl::OkStatus();
392 }
393
SetSocketIpv6RecvPktInfoIfPossible()394 absl::Status PosixSocketWrapper::SetSocketIpv6RecvPktInfoIfPossible() {
395 #ifdef GRPC_HAVE_IPV6_RECVPKTINFO
396 int get_local_ip = 1;
397 if (0 != setsockopt(fd_, IPPROTO_IPV6, IPV6_RECVPKTINFO, &get_local_ip,
398 sizeof(get_local_ip))) {
399 return absl::Status(absl::StatusCode::kInternal,
400 absl::StrCat("setsockopt(IPV6_RECVPKTINFO): ",
401 grpc_core::StrError(errno)));
402 }
403 #endif
404 return absl::OkStatus();
405 }
406
SetSocketSndBuf(int buffer_size_bytes)407 absl::Status PosixSocketWrapper::SetSocketSndBuf(int buffer_size_bytes) {
408 return 0 == setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &buffer_size_bytes,
409 sizeof(buffer_size_bytes))
410 ? absl::OkStatus()
411 : absl::Status(absl::StatusCode::kInternal,
412 absl::StrCat("setsockopt(SO_SNDBUF): ",
413 grpc_core::StrError(errno)));
414 }
415
SetSocketRcvBuf(int buffer_size_bytes)416 absl::Status PosixSocketWrapper::SetSocketRcvBuf(int buffer_size_bytes) {
417 return 0 == setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &buffer_size_bytes,
418 sizeof(buffer_size_bytes))
419 ? absl::OkStatus()
420 : absl::Status(absl::StatusCode::kInternal,
421 absl::StrCat("setsockopt(SO_RCVBUF): ",
422 grpc_core::StrError(errno)));
423 }
424
425 // Set a socket to close on exec
SetSocketCloexec(int close_on_exec)426 absl::Status PosixSocketWrapper::SetSocketCloexec(int close_on_exec) {
427 int oldflags = fcntl(fd_, F_GETFD, 0);
428 if (oldflags < 0) {
429 return absl::Status(absl::StatusCode::kInternal,
430 absl::StrCat("fcntl: ", grpc_core::StrError(errno)));
431 }
432
433 if (close_on_exec) {
434 oldflags |= FD_CLOEXEC;
435 } else {
436 oldflags &= ~FD_CLOEXEC;
437 }
438
439 if (fcntl(fd_, F_SETFD, oldflags) != 0) {
440 return absl::Status(absl::StatusCode::kInternal,
441 absl::StrCat("fcntl: ", grpc_core::StrError(errno)));
442 }
443
444 return absl::OkStatus();
445 }
446
447 // set a socket to reuse old addresses
SetSocketReuseAddr(int reuse)448 absl::Status PosixSocketWrapper::SetSocketReuseAddr(int reuse) {
449 int val = (reuse != 0);
450 int newval;
451 socklen_t intlen = sizeof(newval);
452 if (0 != setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val))) {
453 return absl::Status(
454 absl::StatusCode::kInternal,
455 absl::StrCat("setsockopt(SO_REUSEADDR): ", grpc_core::StrError(errno)));
456 }
457 if (0 != getsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &newval, &intlen)) {
458 return absl::Status(
459 absl::StatusCode::kInternal,
460 absl::StrCat("getsockopt(SO_REUSEADDR): ", grpc_core::StrError(errno)));
461 }
462 if ((newval != 0) != val) {
463 return absl::Status(absl::StatusCode::kInternal,
464 "Failed to set SO_REUSEADDR");
465 }
466
467 return absl::OkStatus();
468 }
469
470 // set a socket to reuse old ports
SetSocketReusePort(int reuse)471 absl::Status PosixSocketWrapper::SetSocketReusePort(int reuse) {
472 #ifndef SO_REUSEPORT
473 return absl::Status(absl::StatusCode::kInternal,
474 "SO_REUSEPORT unavailable on compiling system");
475 #else
476 int val = (reuse != 0);
477 int newval;
478 socklen_t intlen = sizeof(newval);
479 if (0 != setsockopt(fd_, SOL_SOCKET, SO_REUSEPORT, &val, sizeof(val))) {
480 return absl::Status(
481 absl::StatusCode::kInternal,
482 absl::StrCat("setsockopt(SO_REUSEPORT): ", grpc_core::StrError(errno)));
483 }
484 if (0 != getsockopt(fd_, SOL_SOCKET, SO_REUSEPORT, &newval, &intlen)) {
485 return absl::Status(
486 absl::StatusCode::kInternal,
487 absl::StrCat("getsockopt(SO_REUSEPORT): ", grpc_core::StrError(errno)));
488 }
489 if ((newval != 0) != val) {
490 return absl::Status(absl::StatusCode::kInternal,
491 "Failed to set SO_REUSEPORT");
492 }
493
494 return absl::OkStatus();
495 #endif
496 }
497
IsSocketReusePortSupported()498 bool PosixSocketWrapper::IsSocketReusePortSupported() {
499 static bool kSupportSoReusePort = []() -> bool {
500 int s = socket(AF_INET, SOCK_STREAM, 0);
501 if (s < 0) {
502 // This might be an ipv6-only environment in which case
503 // 'socket(AF_INET,..)' call would fail. Try creating IPv6 socket in
504 // that case
505 s = socket(AF_INET6, SOCK_STREAM, 0);
506 }
507 if (s >= 0) {
508 PosixSocketWrapper sock(s);
509 bool result = sock.SetSocketReusePort(1).ok();
510 close(sock.Fd());
511 return result;
512 } else {
513 return false;
514 }
515 }();
516 return kSupportSoReusePort;
517 }
518
519 // Disable nagle algorithm
SetSocketLowLatency(int low_latency)520 absl::Status PosixSocketWrapper::SetSocketLowLatency(int low_latency) {
521 int val = (low_latency != 0);
522 int newval;
523 socklen_t intlen = sizeof(newval);
524 if (0 != setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val))) {
525 return absl::Status(
526 absl::StatusCode::kInternal,
527 absl::StrCat("setsockopt(TCP_NODELAY): ", grpc_core::StrError(errno)));
528 }
529 if (0 != getsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &newval, &intlen)) {
530 return absl::Status(
531 absl::StatusCode::kInternal,
532 absl::StrCat("getsockopt(TCP_NODELAY): ", grpc_core::StrError(errno)));
533 }
534 if ((newval != 0) != val) {
535 return absl::Status(absl::StatusCode::kInternal,
536 "Failed to set TCP_NODELAY");
537 }
538 return absl::OkStatus();
539 }
540
541 // Set Differentiated Services Code Point (DSCP)
SetSocketDscp(int dscp)542 absl::Status PosixSocketWrapper::SetSocketDscp(int dscp) {
543 if (dscp == PosixTcpOptions::kDscpNotSet) {
544 return absl::OkStatus();
545 }
546 // The TOS/TrafficClass byte consists of following bits:
547 // | 7 6 5 4 3 2 | 1 0 |
548 // | DSCP | ECN |
549 int newval = dscp << 2;
550 int val;
551 socklen_t intlen = sizeof(val);
552 // Get ECN bits from current IP_TOS value unless IPv6 only
553 if (0 == getsockopt(fd_, IPPROTO_IP, IP_TOS, &val, &intlen)) {
554 newval |= (val & 0x3);
555 if (0 != setsockopt(fd_, IPPROTO_IP, IP_TOS, &newval, sizeof(newval))) {
556 return absl::Status(
557 absl::StatusCode::kInternal,
558 absl::StrCat("setsockopt(IP_TOS): ", grpc_core::StrError(errno)));
559 }
560 }
561 // Get ECN from current Traffic Class value if IPv6 is available
562 if (0 == getsockopt(fd_, IPPROTO_IPV6, IPV6_TCLASS, &val, &intlen)) {
563 newval |= (val & 0x3);
564 if (0 !=
565 setsockopt(fd_, IPPROTO_IPV6, IPV6_TCLASS, &newval, sizeof(newval))) {
566 return absl::Status(absl::StatusCode::kInternal,
567 absl::StrCat("setsockopt(IPV6_TCLASS): ",
568 grpc_core::StrError(errno)));
569 }
570 }
571 return absl::OkStatus();
572 }
573
// Resolve the TCP_USER_TIMEOUT option number and the initial state of the
// "is TCP_USER_TIMEOUT supported" tri-state declared below.
#if GPR_LINUX == 1
// For Linux, it will be detected to support TCP_USER_TIMEOUT
// (the tri-state starts at 0, "unknown", and TrySetSocketTcpUserTimeout probes
// it at runtime). If the system headers lack the constant, fall back to the
// hard-coded value 18.
#ifndef TCP_USER_TIMEOUT
#define TCP_USER_TIMEOUT 18
#endif
#define SOCKET_SUPPORTS_TCP_USER_TIMEOUT_DEFAULT 0
#else
// For non-Linux, TCP_USER_TIMEOUT will be used if TCP_USER_TIMEOUT is defined.
#ifdef TCP_USER_TIMEOUT
#define SOCKET_SUPPORTS_TCP_USER_TIMEOUT_DEFAULT 0
#else
// Placeholder so that code naming TCP_USER_TIMEOUT still compiles. The -1
// ("not supported") default makes TrySetSocketTcpUserTimeout return before
// the placeholder is ever passed to getsockopt()/setsockopt().
#define TCP_USER_TIMEOUT 0
#define SOCKET_SUPPORTS_TCP_USER_TIMEOUT_DEFAULT -1
#endif  // TCP_USER_TIMEOUT
#endif  // GPR_LINUX == 1

// Whether the socket supports TCP_USER_TIMEOUT option.
// (0: don't know, 1: support, -1: not support)
// Process-wide. A value of 0 is resolved to 1 or -1 by the first
// TrySetSocketTcpUserTimeout() call that needs to enable the option.
static std::atomic<int> g_socket_supports_tcp_user_timeout(
    SOCKET_SUPPORTS_TCP_USER_TIMEOUT_DEFAULT);
594
ConfigureDefaultTcpUserTimeout(bool enable,int timeout,bool is_client)595 void PosixSocketWrapper::ConfigureDefaultTcpUserTimeout(bool enable,
596 int timeout,
597 bool is_client) {
598 if (is_client) {
599 kDefaultClientUserTimeoutEnabled = enable;
600 if (timeout > 0) {
601 kDefaultClientUserTimeoutMs = timeout;
602 }
603 } else {
604 kDefaultServerUserTimeoutEnabled = enable;
605 if (timeout > 0) {
606 kDefaultServerUserTimeoutMs = timeout;
607 }
608 }
609 }
610
611 // Set TCP_USER_TIMEOUT
TrySetSocketTcpUserTimeout(const PosixTcpOptions & options,bool is_client)612 void PosixSocketWrapper::TrySetSocketTcpUserTimeout(
613 const PosixTcpOptions& options, bool is_client) {
614 if (g_socket_supports_tcp_user_timeout.load() < 0) {
615 return;
616 }
617 bool enable = is_client ? kDefaultClientUserTimeoutEnabled
618 : kDefaultServerUserTimeoutEnabled;
619 int timeout =
620 is_client ? kDefaultClientUserTimeoutMs : kDefaultServerUserTimeoutMs;
621 if (options.keep_alive_time_ms > 0) {
622 enable = options.keep_alive_time_ms != INT_MAX;
623 }
624 if (options.keep_alive_timeout_ms > 0) {
625 timeout = options.keep_alive_timeout_ms;
626 }
627 if (enable) {
628 int newval;
629 socklen_t len = sizeof(newval);
630 // If this is the first time to use TCP_USER_TIMEOUT, try to check
631 // if it is available.
632 if (g_socket_supports_tcp_user_timeout.load() == 0) {
633 if (0 != getsockopt(fd_, IPPROTO_TCP, TCP_USER_TIMEOUT, &newval, &len)) {
634 // This log is intentionally not protected behind a flag, so that users
635 // know that TCP_USER_TIMEOUT is not being used.
636 GRPC_TRACE_LOG(tcp, INFO)
637 << "TCP_USER_TIMEOUT is not available. TCP_USER_TIMEOUT "
638 "won't be used thereafter";
639 g_socket_supports_tcp_user_timeout.store(-1);
640 } else {
641 GRPC_TRACE_LOG(tcp, INFO)
642 << "TCP_USER_TIMEOUT is available. TCP_USER_TIMEOUT will be "
643 "used thereafter";
644 g_socket_supports_tcp_user_timeout.store(1);
645 }
646 }
647 if (g_socket_supports_tcp_user_timeout.load() > 0) {
648 if (0 != setsockopt(fd_, IPPROTO_TCP, TCP_USER_TIMEOUT, &timeout,
649 sizeof(timeout))) {
650 LOG(ERROR) << "setsockopt(TCP_USER_TIMEOUT) "
651 << grpc_core::StrError(errno);
652 return;
653 }
654 if (0 != getsockopt(fd_, IPPROTO_TCP, TCP_USER_TIMEOUT, &newval, &len)) {
655 LOG(ERROR) << "getsockopt(TCP_USER_TIMEOUT) "
656 << grpc_core::StrError(errno);
657 return;
658 }
659 if (newval != timeout) {
660 // Do not fail on failing to set TCP_USER_TIMEOUT
661 LOG(ERROR) << "Failed to set TCP_USER_TIMEOUT";
662 return;
663 }
664 }
665 }
666 }
667
668 // Set a socket using a grpc_socket_mutator
SetSocketMutator(grpc_fd_usage usage,grpc_socket_mutator * mutator)669 absl::Status PosixSocketWrapper::SetSocketMutator(
670 grpc_fd_usage usage, grpc_socket_mutator* mutator) {
671 CHECK(mutator);
672 if (!grpc_socket_mutator_mutate_fd(mutator, fd_, usage)) {
673 return absl::Status(absl::StatusCode::kInternal,
674 "grpc_socket_mutator failed.");
675 }
676 return absl::OkStatus();
677 }
678
ApplySocketMutatorInOptions(grpc_fd_usage usage,const PosixTcpOptions & options)679 absl::Status PosixSocketWrapper::ApplySocketMutatorInOptions(
680 grpc_fd_usage usage, const PosixTcpOptions& options) {
681 if (options.socket_mutator == nullptr) {
682 return absl::OkStatus();
683 }
684 return SetSocketMutator(usage, options.socket_mutator);
685 }
686
IsIpv6LoopbackAvailable()687 bool PosixSocketWrapper::IsIpv6LoopbackAvailable() {
688 static bool kIpv6LoopbackAvailable = []() -> bool {
689 int fd = socket(AF_INET6, SOCK_STREAM, 0);
690 bool loopback_available = false;
691 if (fd < 0) {
692 GRPC_TRACE_LOG(tcp, INFO)
693 << "Disabling AF_INET6 sockets because socket() failed.";
694 } else {
695 sockaddr_in6 addr;
696 memset(&addr, 0, sizeof(addr));
697 addr.sin6_family = AF_INET6;
698 addr.sin6_addr.s6_addr[15] = 1; // [::1]:0
699 if (bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) == 0) {
700 loopback_available = true;
701 } else {
702 GRPC_TRACE_LOG(tcp, INFO)
703 << "Disabling AF_INET6 sockets because ::1 is not available.";
704 }
705 close(fd);
706 }
707 return loopback_available;
708 }();
709 return kIpv6LoopbackAvailable;
710 }
711
712 absl::StatusOr<EventEngine::ResolvedAddress>
LocalAddress()713 PosixSocketWrapper::LocalAddress() {
714 EventEngine::ResolvedAddress addr;
715 socklen_t len = EventEngine::ResolvedAddress::MAX_SIZE_BYTES;
716 if (getsockname(fd_, const_cast<sockaddr*>(addr.address()), &len) < 0) {
717 return absl::InternalError(
718 absl::StrCat("getsockname:", grpc_core::StrError(errno)));
719 }
720 return EventEngine::ResolvedAddress(addr.address(), len);
721 }
722
PeerAddress()723 absl::StatusOr<EventEngine::ResolvedAddress> PosixSocketWrapper::PeerAddress() {
724 EventEngine::ResolvedAddress addr;
725 socklen_t len = EventEngine::ResolvedAddress::MAX_SIZE_BYTES;
726 if (getpeername(fd_, const_cast<sockaddr*>(addr.address()), &len) < 0) {
727 return absl::InternalError(
728 absl::StrCat("getpeername:", grpc_core::StrError(errno)));
729 }
730 return EventEngine::ResolvedAddress(addr.address(), len);
731 }
732
LocalAddressString()733 absl::StatusOr<std::string> PosixSocketWrapper::LocalAddressString() {
734 auto status = LocalAddress();
735 if (!status.ok()) {
736 return status.status();
737 }
738 return ResolvedAddressToNormalizedString((*status));
739 }
740
PeerAddressString()741 absl::StatusOr<std::string> PosixSocketWrapper::PeerAddressString() {
742 auto status = PeerAddress();
743 if (!status.ok()) {
744 return status.status();
745 }
746 return ResolvedAddressToNormalizedString((*status));
747 }
748
CreateDualStackSocket(std::function<int (int,int,int)> socket_factory,const experimental::EventEngine::ResolvedAddress & addr,int type,int protocol,PosixSocketWrapper::DSMode & dsmode)749 absl::StatusOr<PosixSocketWrapper> PosixSocketWrapper::CreateDualStackSocket(
750 std::function<int(int, int, int)> socket_factory,
751 const experimental::EventEngine::ResolvedAddress& addr, int type,
752 int protocol, PosixSocketWrapper::DSMode& dsmode) {
753 const sockaddr* sock_addr = addr.address();
754 int family = sock_addr->sa_family;
755 int newfd;
756 if (family == AF_INET6) {
757 if (IsIpv6LoopbackAvailable()) {
758 newfd = CreateSocket(socket_factory, family, type, protocol);
759 } else {
760 newfd = -1;
761 errno = EAFNOSUPPORT;
762 }
763 // Check if we've got a valid dualstack socket.
764 if (newfd > 0 && SetSocketDualStack(newfd)) {
765 dsmode = PosixSocketWrapper::DSMode::DSMODE_DUALSTACK;
766 return PosixSocketWrapper(newfd);
767 }
768 // If this isn't an IPv4 address, then return whatever we've got.
769 if (!ResolvedAddressIsV4Mapped(addr, nullptr)) {
770 if (newfd < 0) {
771 return ErrorForFd(newfd, addr);
772 }
773 dsmode = PosixSocketWrapper::DSMode::DSMODE_IPV6;
774 return PosixSocketWrapper(newfd);
775 }
776 // Fall back to AF_INET.
777 if (newfd >= 0) {
778 close(newfd);
779 }
780 family = AF_INET;
781 }
782 dsmode = family == AF_INET ? PosixSocketWrapper::DSMode::DSMODE_IPV4
783 : PosixSocketWrapper::DSMode::DSMODE_NONE;
784 newfd = CreateSocket(socket_factory, family, type, protocol);
785 if (newfd < 0) {
786 return ErrorForFd(newfd, addr);
787 }
788 return PosixSocketWrapper(newfd);
789 }
790
791 absl::StatusOr<PosixSocketWrapper::PosixSocketCreateResult>
CreateAndPrepareTcpClientSocket(const PosixTcpOptions & options,const EventEngine::ResolvedAddress & target_addr)792 PosixSocketWrapper::CreateAndPrepareTcpClientSocket(
793 const PosixTcpOptions& options,
794 const EventEngine::ResolvedAddress& target_addr) {
795 PosixSocketWrapper::DSMode dsmode;
796 EventEngine::ResolvedAddress mapped_target_addr;
797
798 // Use dualstack sockets where available. Set mapped to v6 or v4 mapped to
799 // v6.
800 if (!ResolvedAddressToV4Mapped(target_addr, &mapped_target_addr)) {
801 // addr is v4 mapped to v6 or just v6.
802 mapped_target_addr = target_addr;
803 }
804 absl::StatusOr<PosixSocketWrapper> posix_socket_wrapper =
805 PosixSocketWrapper::CreateDualStackSocket(nullptr, mapped_target_addr,
806 SOCK_STREAM, 0, dsmode);
807 if (!posix_socket_wrapper.ok()) {
808 return posix_socket_wrapper.status();
809 }
810
811 if (dsmode == PosixSocketWrapper::DSMode::DSMODE_IPV4) {
812 // Original addr is either v4 or v4 mapped to v6. Set mapped_addr to v4.
813 if (!ResolvedAddressIsV4Mapped(target_addr, &mapped_target_addr)) {
814 mapped_target_addr = target_addr;
815 }
816 }
817
818 auto error = PrepareTcpClientSocket(*posix_socket_wrapper, mapped_target_addr,
819 options);
820 if (!error.ok()) {
821 return error;
822 }
823 return PosixSocketWrapper::PosixSocketCreateResult{*posix_socket_wrapper,
824 mapped_target_addr};
825 }
826
827 #else // GRPC_POSIX_SOCKET_UTILS_COMMON
828
// Stub implementations used when GRPC_POSIX_SOCKET_UTILS_COMMON is not
// defined. Every method here aborts the process with
// grpc_core::Crash("unimplemented"), except ConfigureDefaultTcpUserTimeout(),
// which is a silent no-op.
// NOTE(review): presumably these exist only to satisfy the linker and are
// never reached on such platforms; confirm before relying on that.

absl::StatusOr<int> PosixSocketWrapper::SetSocketRcvLowat(int /*bytes*/) {
  grpc_core::Crash("unimplemented");
}

absl::Status PosixSocketWrapper::SetSocketZeroCopy() {
  grpc_core::Crash("unimplemented");
}

absl::Status PosixSocketWrapper::SetSocketNonBlocking(int /*non_blocking*/) {
  grpc_core::Crash("unimplemented");
}

absl::Status PosixSocketWrapper::SetSocketCloexec(int /*close_on_exec*/) {
  grpc_core::Crash("unimplemented");
}

absl::Status PosixSocketWrapper::SetSocketReuseAddr(int /*reuse*/) {
  grpc_core::Crash("unimplemented");
}

absl::Status PosixSocketWrapper::SetSocketLowLatency(int /*low_latency*/) {
  grpc_core::Crash("unimplemented");
}

absl::Status PosixSocketWrapper::SetSocketReusePort(int /*reuse*/) {
  grpc_core::Crash("unimplemented");
}

absl::Status PosixSocketWrapper::SetSocketDscp(int /*dscp*/) {
  grpc_core::Crash("unimplemented");
}

// Unlike the other stubs this does not crash; the requested defaults are
// simply ignored.
void PosixSocketWrapper::ConfigureDefaultTcpUserTimeout(bool /*enable*/,
                                                        int /*timeout*/,
                                                        bool /*is_client*/) {}

void PosixSocketWrapper::TrySetSocketTcpUserTimeout(
    const PosixTcpOptions& /*options*/, bool /*is_client*/) {
  grpc_core::Crash("unimplemented");
}

absl::Status PosixSocketWrapper::SetSocketNoSigpipeIfPossible() {
  grpc_core::Crash("unimplemented");
}

absl::Status PosixSocketWrapper::SetSocketIpPktInfoIfPossible() {
  grpc_core::Crash("unimplemented");
}

absl::Status PosixSocketWrapper::SetSocketIpv6RecvPktInfoIfPossible() {
  grpc_core::Crash("unimplemented");
}

absl::Status PosixSocketWrapper::SetSocketSndBuf(int /*buffer_size_bytes*/) {
  grpc_core::Crash("unimplemented");
}

absl::Status PosixSocketWrapper::SetSocketRcvBuf(int /*buffer_size_bytes*/) {
  grpc_core::Crash("unimplemented");
}

absl::Status PosixSocketWrapper::SetSocketMutator(
    grpc_fd_usage /*usage*/, grpc_socket_mutator* /*mutator*/) {
  grpc_core::Crash("unimplemented");
}

absl::Status PosixSocketWrapper::ApplySocketMutatorInOptions(
    grpc_fd_usage /*usage*/, const PosixTcpOptions& /*options*/) {
  grpc_core::Crash("unimplemented");
}

bool PosixSocketWrapper::IsSocketReusePortSupported() {
  grpc_core::Crash("unimplemented");
}

bool PosixSocketWrapper::IsIpv6LoopbackAvailable() {
  grpc_core::Crash("unimplemented");
}

absl::StatusOr<PosixSocketWrapper> PosixSocketWrapper::CreateDualStackSocket(
    std::function<int(int /*domain*/, int /*type*/, int /*protocol*/)>
    /* socket_factory */,
    const experimental::EventEngine::ResolvedAddress& /*addr*/, int /*type*/,
    int /*protocol*/, DSMode& /*dsmode*/) {
  grpc_core::Crash("unimplemented");
}

absl::StatusOr<PosixSocketWrapper::PosixSocketCreateResult>
PosixSocketWrapper::CreateAndPrepareTcpClientSocket(
    const PosixTcpOptions& /*options*/,
    const EventEngine::ResolvedAddress& /*target_addr*/) {
  grpc_core::Crash("unimplemented");
}
922
923 #endif // GRPC_POSIX_SOCKET_UTILS_COMMON
924
925 } // namespace experimental
926 } // namespace grpc_event_engine
927