1 // Copyright 2022 The gRPC Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_TCP_SOCKET_UTILS_H 16 #define GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_TCP_SOCKET_UTILS_H 17 18 #include <grpc/event_engine/endpoint_config.h> 19 #include <grpc/event_engine/event_engine.h> 20 #include <grpc/event_engine/memory_allocator.h> 21 #include <grpc/grpc.h> 22 #include <grpc/support/port_platform.h> 23 24 #include <functional> 25 #include <string> 26 #include <utility> 27 28 #include "absl/log/check.h" 29 #include "absl/status/status.h" 30 #include "absl/status/statusor.h" 31 #include "src/core/lib/iomgr/port.h" 32 #include "src/core/lib/iomgr/socket_mutator.h" 33 #include "src/core/lib/resource_quota/resource_quota.h" 34 #include "src/core/util/ref_counted_ptr.h" 35 36 #ifdef GRPC_POSIX_SOCKET_UTILS_COMMON 37 #include <sys/socket.h> 38 #endif 39 40 #ifdef GRPC_LINUX_ERRQUEUE 41 #ifndef SO_ZEROCOPY 42 #define SO_ZEROCOPY 60 43 #endif 44 #ifndef SO_EE_ORIGIN_ZEROCOPY 45 #define SO_EE_ORIGIN_ZEROCOPY 5 46 #endif 47 #endif // ifdef GRPC_LINUX_ERRQUEUE 48 49 namespace grpc_event_engine { 50 namespace experimental { 51 52 struct PosixTcpOptions { 53 static constexpr int kDefaultReadChunkSize = 8192; 54 static constexpr int kDefaultMinReadChunksize = 256; 55 static constexpr int kDefaultMaxReadChunksize = 4 * 1024 * 1024; 56 static constexpr int kZerocpTxEnabledDefault = 0; 57 static constexpr int kMaxChunkSize = 32 * 1024 * 1024; 58 static constexpr int kDefaultMaxSends = 4; 59 static constexpr size_t kDefaultSendBytesThreshold = 16 * 1024; 60 // Let the system decide the proper buffer size. 61 static constexpr int kReadBufferSizeUnset = -1; 62 static constexpr int kDscpNotSet = -1; 63 int tcp_read_chunk_size = kDefaultReadChunkSize; 64 int tcp_min_read_chunk_size = kDefaultMinReadChunksize; 65 int tcp_max_read_chunk_size = kDefaultMaxReadChunksize; 66 int tcp_tx_zerocopy_send_bytes_threshold = kDefaultSendBytesThreshold; 67 int tcp_tx_zerocopy_max_simultaneous_sends = kDefaultMaxSends; 68 int tcp_receive_buffer_size = kReadBufferSizeUnset; 69 bool tcp_tx_zero_copy_enabled = kZerocpTxEnabledDefault; 70 int keep_alive_time_ms = 0; 71 int keep_alive_timeout_ms = 0; 72 bool expand_wildcard_addrs = false; 73 bool allow_reuse_port = false; 74 int dscp = kDscpNotSet; 75 grpc_core::RefCountedPtr<grpc_core::ResourceQuota> resource_quota; 76 struct grpc_socket_mutator* socket_mutator = nullptr; 77 grpc_event_engine::experimental::MemoryAllocatorFactory* 78 memory_allocator_factory = nullptr; 79 PosixTcpOptions() = default; 80 // Move ctor PosixTcpOptionsPosixTcpOptions81 PosixTcpOptions(PosixTcpOptions&& other) noexcept { 82 socket_mutator = std::exchange(other.socket_mutator, nullptr); 83 resource_quota = std::move(other.resource_quota); 84 CopyIntegerOptions(other); 85 } 86 // Move assignment 87 PosixTcpOptions& operator=(PosixTcpOptions&& other) noexcept { 88 if (socket_mutator != nullptr) { 89 grpc_socket_mutator_unref(socket_mutator); 90 } 91 socket_mutator = std::exchange(other.socket_mutator, nullptr); 92 resource_quota = std::move(other.resource_quota); 93 memory_allocator_factory = 94 std::exchange(other.memory_allocator_factory, nullptr); 95 CopyIntegerOptions(other); 96 return *this; 97 } 98 // Copy ctor PosixTcpOptionsPosixTcpOptions99 PosixTcpOptions(const PosixTcpOptions& other) { 100 if (other.socket_mutator != nullptr) { 101 socket_mutator = grpc_socket_mutator_ref(other.socket_mutator); 102 } 103 resource_quota = other.resource_quota; 104 memory_allocator_factory = other.memory_allocator_factory; 105 CopyIntegerOptions(other); 106 } 107 // Copy assignment 108 PosixTcpOptions& operator=(const PosixTcpOptions& other) { 109 if (&other == this) { 110 return *this; 111 } 112 if (socket_mutator != nullptr) { 113 grpc_socket_mutator_unref(socket_mutator); 114 socket_mutator = nullptr; 115 } 116 if (other.socket_mutator != nullptr) { 117 socket_mutator = grpc_socket_mutator_ref(other.socket_mutator); 118 } 119 resource_quota = other.resource_quota; 120 memory_allocator_factory = other.memory_allocator_factory; 121 CopyIntegerOptions(other); 122 return *this; 123 } 124 // Destructor. ~PosixTcpOptionsPosixTcpOptions125 ~PosixTcpOptions() { 126 if (socket_mutator != nullptr) { 127 grpc_socket_mutator_unref(socket_mutator); 128 } 129 } 130 131 private: CopyIntegerOptionsPosixTcpOptions132 void CopyIntegerOptions(const PosixTcpOptions& other) { 133 tcp_read_chunk_size = other.tcp_read_chunk_size; 134 tcp_min_read_chunk_size = other.tcp_min_read_chunk_size; 135 tcp_max_read_chunk_size = other.tcp_max_read_chunk_size; 136 tcp_tx_zerocopy_send_bytes_threshold = 137 other.tcp_tx_zerocopy_send_bytes_threshold; 138 tcp_tx_zerocopy_max_simultaneous_sends = 139 other.tcp_tx_zerocopy_max_simultaneous_sends; 140 tcp_tx_zero_copy_enabled = other.tcp_tx_zero_copy_enabled; 141 keep_alive_time_ms = other.keep_alive_time_ms; 142 keep_alive_timeout_ms = other.keep_alive_timeout_ms; 143 expand_wildcard_addrs = other.expand_wildcard_addrs; 144 allow_reuse_port = other.allow_reuse_port; 145 dscp = other.dscp; 146 } 147 }; 148 149 PosixTcpOptions TcpOptionsFromEndpointConfig( 150 const grpc_event_engine::experimental::EndpointConfig& config); 151 152 // a wrapper for accept or accept4 153 int Accept4(int sockfd, 154 grpc_event_engine::experimental::EventEngine::ResolvedAddress& addr, 155 int nonblock, int cloexec); 156 157 // Unlink the path pointed to by the given address if it refers to UDS path. 158 void UnlinkIfUnixDomainSocket( 159 const EventEngine::ResolvedAddress& resolved_addr); 160 161 class PosixSocketWrapper { 162 public: PosixSocketWrapper(int fd)163 explicit PosixSocketWrapper(int fd) : fd_(fd) { CHECK_GT(fd_, 0); } 164 PosixSocketWrapper()165 PosixSocketWrapper() : fd_(-1) {}; 166 167 ~PosixSocketWrapper() = default; 168 169 // Instruct the kernel to wait for specified number of bytes to be received on 170 // the socket before generating an interrupt for packet receive. If the call 171 // succeeds, it returns the number of bytes (wait threshold) that was actually 172 // set. 173 absl::StatusOr<int> SetSocketRcvLowat(int bytes); 174 175 // Set socket to use zerocopy 176 absl::Status SetSocketZeroCopy(); 177 178 // Set socket to non blocking mode 179 absl::Status SetSocketNonBlocking(int non_blocking); 180 181 // Set socket to close on exec 182 absl::Status SetSocketCloexec(int close_on_exec); 183 184 // Set socket to reuse old addresses 185 absl::Status SetSocketReuseAddr(int reuse); 186 187 // Disable nagle algorithm 188 absl::Status SetSocketLowLatency(int low_latency); 189 190 // Set SO_REUSEPORT 191 absl::Status SetSocketReusePort(int reuse); 192 193 // Set Differentiated Services Code Point (DSCP) 194 absl::Status SetSocketDscp(int dscp); 195 196 // Override default Tcp user timeout values if necessary. 197 void TrySetSocketTcpUserTimeout(const PosixTcpOptions& options, 198 bool is_client); 199 200 // Tries to set SO_NOSIGPIPE if available on this platform. 201 // If SO_NO_SIGPIPE is not available, returns not OK status. 202 absl::Status SetSocketNoSigpipeIfPossible(); 203 204 // Tries to set IP_PKTINFO if available on this platform. If IP_PKTINFO is not 205 // available, returns not OK status. 206 absl::Status SetSocketIpPktInfoIfPossible(); 207 208 // Tries to set IPV6_RECVPKTINFO if available on this platform. If 209 // IPV6_RECVPKTINFO is not available, returns not OK status. 210 absl::Status SetSocketIpv6RecvPktInfoIfPossible(); 211 212 // Tries to set the socket's send buffer to given size. 213 absl::Status SetSocketSndBuf(int buffer_size_bytes); 214 215 // Tries to set the socket's receive buffer to given size. 216 absl::Status SetSocketRcvBuf(int buffer_size_bytes); 217 218 // Tries to set the socket using a grpc_socket_mutator 219 absl::Status SetSocketMutator(grpc_fd_usage usage, 220 grpc_socket_mutator* mutator); 221 222 // Extracts the first socket mutator from config if any and applies on the fd. 223 absl::Status ApplySocketMutatorInOptions(grpc_fd_usage usage, 224 const PosixTcpOptions& options); 225 226 // Return LocalAddress as EventEngine::ResolvedAddress 227 absl::StatusOr<EventEngine::ResolvedAddress> LocalAddress(); 228 229 // Return PeerAddress as EventEngine::ResolvedAddress 230 absl::StatusOr<EventEngine::ResolvedAddress> PeerAddress(); 231 232 // Return LocalAddress as string 233 absl::StatusOr<std::string> LocalAddressString(); 234 235 // Return PeerAddress as string 236 absl::StatusOr<std::string> PeerAddressString(); 237 238 // An enum to keep track of IPv4/IPv6 socket modes. 239 240 // Currently, this information is only used when a socket is first created, 241 // but in the future we may wish to store it alongside the fd. This would let 242 // calls like sendto() know which family to use without asking the kernel 243 // first. 244 enum DSMode { 245 // Uninitialized, or a non-IP socket. 246 DSMODE_NONE, 247 // AF_INET only. 248 DSMODE_IPV4, 249 // AF_INET6 only, because IPV6_V6ONLY could not be cleared. 250 DSMODE_IPV6, 251 // AF_INET6, which also supports ::ffff-mapped IPv4 addresses. 252 DSMODE_DUALSTACK 253 }; 254 255 // Returns the underlying file-descriptor. Fd()256 int Fd() const { return fd_; } 257 258 // Static methods 259 260 // Configure default values for tcp user timeout to be used by client 261 // and server side sockets. 262 static void ConfigureDefaultTcpUserTimeout(bool enable, int timeout, 263 bool is_client); 264 265 // Return true if SO_REUSEPORT is supported 266 static bool IsSocketReusePortSupported(); 267 268 // Returns true if this system can create AF_INET6 sockets bound to ::1. 269 // The value is probed once, and cached for the life of the process. 270 271 // This is more restrictive than checking for socket(AF_INET6) to succeed, 272 // because Linux with "net.ipv6.conf.all.disable_ipv6 = 1" is able to create 273 // and bind IPv6 sockets, but cannot connect to a getsockname() of [::]:port 274 // without a valid loopback interface. Rather than expose this half-broken 275 // state to library users, we turn off IPv6 sockets. 276 static bool IsIpv6LoopbackAvailable(); 277 278 // Creates a new socket for connecting to (or listening on) an address. 279 280 // If addr is AF_INET6, this creates an IPv6 socket first. If that fails, 281 // and addr is within ::ffff:0.0.0.0/96, then it automatically falls back to 282 // an IPv4 socket. 283 284 // If addr is AF_INET, AF_UNIX, or anything else, then this is similar to 285 // calling socket() directly. 286 287 // Returns an PosixSocketWrapper on success, otherwise returns a not-OK 288 // absl::Status 289 290 // The dsmode output indicates which address family was actually created. 291 static absl::StatusOr<PosixSocketWrapper> CreateDualStackSocket( 292 std::function<int(int /*domain*/, int /*type*/, int /*protocol*/)> 293 socket_factory, 294 const experimental::EventEngine::ResolvedAddress& addr, int type, 295 int protocol, DSMode& dsmode); 296 297 struct PosixSocketCreateResult; 298 // Return a PosixSocketCreateResult which manages a configured, unbound, 299 // unconnected TCP client fd. 300 // options: may contain custom tcp settings for the fd. 301 // target_addr: the destination address. 302 // 303 // Returns: Not-OK status on error. Otherwise it returns a 304 // PosixSocketWrapper::PosixSocketCreateResult type which includes a sock 305 // of type PosixSocketWrapper and a mapped_target_addr which is 306 // target_addr mapped to an address appropriate to the type of socket FD 307 // created. For example, if target_addr is IPv4 and dual stack sockets are 308 // available, mapped_target_addr will be an IPv4-mapped IPv6 address. 309 // 310 static absl::StatusOr<PosixSocketCreateResult> 311 CreateAndPrepareTcpClientSocket( 312 const PosixTcpOptions& options, 313 const EventEngine::ResolvedAddress& target_addr); 314 315 private: 316 int fd_; 317 }; 318 319 struct PosixSocketWrapper::PosixSocketCreateResult { 320 PosixSocketWrapper sock; 321 EventEngine::ResolvedAddress mapped_target_addr; 322 }; 323 324 bool SetSocketDualStack(int fd); 325 326 } // namespace experimental 327 } // namespace grpc_event_engine 328 329 #endif // GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_TCP_SOCKET_UTILS_H 330