• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_TCP_SOCKET_UTILS_H
16 #define GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_TCP_SOCKET_UTILS_H
17 
18 #include <grpc/event_engine/endpoint_config.h>
19 #include <grpc/event_engine/event_engine.h>
20 #include <grpc/event_engine/memory_allocator.h>
21 #include <grpc/grpc.h>
22 #include <grpc/support/port_platform.h>
23 
24 #include <functional>
25 #include <string>
26 #include <utility>
27 
28 #include "absl/log/check.h"
29 #include "absl/status/status.h"
30 #include "absl/status/statusor.h"
31 #include "src/core/lib/iomgr/port.h"
32 #include "src/core/lib/iomgr/socket_mutator.h"
33 #include "src/core/lib/resource_quota/resource_quota.h"
34 #include "src/core/util/ref_counted_ptr.h"
35 
36 #ifdef GRPC_POSIX_SOCKET_UTILS_COMMON
37 #include <sys/socket.h>
38 #endif
39 
40 #ifdef GRPC_LINUX_ERRQUEUE
41 #ifndef SO_ZEROCOPY
42 #define SO_ZEROCOPY 60
43 #endif
44 #ifndef SO_EE_ORIGIN_ZEROCOPY
45 #define SO_EE_ORIGIN_ZEROCOPY 5
46 #endif
47 #endif  // ifdef GRPC_LINUX_ERRQUEUE
48 
49 namespace grpc_event_engine {
50 namespace experimental {
51 
52 struct PosixTcpOptions {
53   static constexpr int kDefaultReadChunkSize = 8192;
54   static constexpr int kDefaultMinReadChunksize = 256;
55   static constexpr int kDefaultMaxReadChunksize = 4 * 1024 * 1024;
56   static constexpr int kZerocpTxEnabledDefault = 0;
57   static constexpr int kMaxChunkSize = 32 * 1024 * 1024;
58   static constexpr int kDefaultMaxSends = 4;
59   static constexpr size_t kDefaultSendBytesThreshold = 16 * 1024;
60   // Let the system decide the proper buffer size.
61   static constexpr int kReadBufferSizeUnset = -1;
62   static constexpr int kDscpNotSet = -1;
63   int tcp_read_chunk_size = kDefaultReadChunkSize;
64   int tcp_min_read_chunk_size = kDefaultMinReadChunksize;
65   int tcp_max_read_chunk_size = kDefaultMaxReadChunksize;
66   int tcp_tx_zerocopy_send_bytes_threshold = kDefaultSendBytesThreshold;
67   int tcp_tx_zerocopy_max_simultaneous_sends = kDefaultMaxSends;
68   int tcp_receive_buffer_size = kReadBufferSizeUnset;
69   bool tcp_tx_zero_copy_enabled = kZerocpTxEnabledDefault;
70   int keep_alive_time_ms = 0;
71   int keep_alive_timeout_ms = 0;
72   bool expand_wildcard_addrs = false;
73   bool allow_reuse_port = false;
74   int dscp = kDscpNotSet;
75   grpc_core::RefCountedPtr<grpc_core::ResourceQuota> resource_quota;
76   struct grpc_socket_mutator* socket_mutator = nullptr;
77   grpc_event_engine::experimental::MemoryAllocatorFactory*
78       memory_allocator_factory = nullptr;
79   PosixTcpOptions() = default;
80   // Move ctor
PosixTcpOptionsPosixTcpOptions81   PosixTcpOptions(PosixTcpOptions&& other) noexcept {
82     socket_mutator = std::exchange(other.socket_mutator, nullptr);
83     resource_quota = std::move(other.resource_quota);
84     CopyIntegerOptions(other);
85   }
86   // Move assignment
87   PosixTcpOptions& operator=(PosixTcpOptions&& other) noexcept {
88     if (socket_mutator != nullptr) {
89       grpc_socket_mutator_unref(socket_mutator);
90     }
91     socket_mutator = std::exchange(other.socket_mutator, nullptr);
92     resource_quota = std::move(other.resource_quota);
93     memory_allocator_factory =
94         std::exchange(other.memory_allocator_factory, nullptr);
95     CopyIntegerOptions(other);
96     return *this;
97   }
98   // Copy ctor
PosixTcpOptionsPosixTcpOptions99   PosixTcpOptions(const PosixTcpOptions& other) {
100     if (other.socket_mutator != nullptr) {
101       socket_mutator = grpc_socket_mutator_ref(other.socket_mutator);
102     }
103     resource_quota = other.resource_quota;
104     memory_allocator_factory = other.memory_allocator_factory;
105     CopyIntegerOptions(other);
106   }
107   // Copy assignment
108   PosixTcpOptions& operator=(const PosixTcpOptions& other) {
109     if (&other == this) {
110       return *this;
111     }
112     if (socket_mutator != nullptr) {
113       grpc_socket_mutator_unref(socket_mutator);
114       socket_mutator = nullptr;
115     }
116     if (other.socket_mutator != nullptr) {
117       socket_mutator = grpc_socket_mutator_ref(other.socket_mutator);
118     }
119     resource_quota = other.resource_quota;
120     memory_allocator_factory = other.memory_allocator_factory;
121     CopyIntegerOptions(other);
122     return *this;
123   }
124   // Destructor.
~PosixTcpOptionsPosixTcpOptions125   ~PosixTcpOptions() {
126     if (socket_mutator != nullptr) {
127       grpc_socket_mutator_unref(socket_mutator);
128     }
129   }
130 
131  private:
CopyIntegerOptionsPosixTcpOptions132   void CopyIntegerOptions(const PosixTcpOptions& other) {
133     tcp_read_chunk_size = other.tcp_read_chunk_size;
134     tcp_min_read_chunk_size = other.tcp_min_read_chunk_size;
135     tcp_max_read_chunk_size = other.tcp_max_read_chunk_size;
136     tcp_tx_zerocopy_send_bytes_threshold =
137         other.tcp_tx_zerocopy_send_bytes_threshold;
138     tcp_tx_zerocopy_max_simultaneous_sends =
139         other.tcp_tx_zerocopy_max_simultaneous_sends;
140     tcp_tx_zero_copy_enabled = other.tcp_tx_zero_copy_enabled;
141     keep_alive_time_ms = other.keep_alive_time_ms;
142     keep_alive_timeout_ms = other.keep_alive_timeout_ms;
143     expand_wildcard_addrs = other.expand_wildcard_addrs;
144     allow_reuse_port = other.allow_reuse_port;
145     dscp = other.dscp;
146   }
147 };
148 
149 PosixTcpOptions TcpOptionsFromEndpointConfig(
150     const grpc_event_engine::experimental::EndpointConfig& config);
151 
152 // a wrapper for accept or accept4
153 int Accept4(int sockfd,
154             grpc_event_engine::experimental::EventEngine::ResolvedAddress& addr,
155             int nonblock, int cloexec);
156 
157 // Unlink the path pointed to by the given address if it refers to UDS path.
158 void UnlinkIfUnixDomainSocket(
159     const EventEngine::ResolvedAddress& resolved_addr);
160 
161 class PosixSocketWrapper {
162  public:
PosixSocketWrapper(int fd)163   explicit PosixSocketWrapper(int fd) : fd_(fd) { CHECK_GT(fd_, 0); }
164 
PosixSocketWrapper()165   PosixSocketWrapper() : fd_(-1) {};
166 
167   ~PosixSocketWrapper() = default;
168 
169   // Instruct the kernel to wait for specified number of bytes to be received on
170   // the socket before generating an interrupt for packet receive. If the call
171   // succeeds, it returns the number of bytes (wait threshold) that was actually
172   // set.
173   absl::StatusOr<int> SetSocketRcvLowat(int bytes);
174 
175   // Set socket to use zerocopy
176   absl::Status SetSocketZeroCopy();
177 
178   // Set socket to non blocking mode
179   absl::Status SetSocketNonBlocking(int non_blocking);
180 
181   // Set socket to close on exec
182   absl::Status SetSocketCloexec(int close_on_exec);
183 
184   // Set socket to reuse old addresses
185   absl::Status SetSocketReuseAddr(int reuse);
186 
187   // Disable nagle algorithm
188   absl::Status SetSocketLowLatency(int low_latency);
189 
190   // Set SO_REUSEPORT
191   absl::Status SetSocketReusePort(int reuse);
192 
193   // Set Differentiated Services Code Point (DSCP)
194   absl::Status SetSocketDscp(int dscp);
195 
196   // Override default Tcp user timeout values if necessary.
197   void TrySetSocketTcpUserTimeout(const PosixTcpOptions& options,
198                                   bool is_client);
199 
200   // Tries to set SO_NOSIGPIPE if available on this platform.
201   // If SO_NO_SIGPIPE is not available, returns not OK status.
202   absl::Status SetSocketNoSigpipeIfPossible();
203 
204   // Tries to set IP_PKTINFO if available on this platform. If IP_PKTINFO is not
205   // available, returns not OK status.
206   absl::Status SetSocketIpPktInfoIfPossible();
207 
208   // Tries to set IPV6_RECVPKTINFO if available on this platform. If
209   // IPV6_RECVPKTINFO is not available, returns not OK status.
210   absl::Status SetSocketIpv6RecvPktInfoIfPossible();
211 
212   // Tries to set the socket's send buffer to given size.
213   absl::Status SetSocketSndBuf(int buffer_size_bytes);
214 
215   // Tries to set the socket's receive buffer to given size.
216   absl::Status SetSocketRcvBuf(int buffer_size_bytes);
217 
218   // Tries to set the socket using a grpc_socket_mutator
219   absl::Status SetSocketMutator(grpc_fd_usage usage,
220                                 grpc_socket_mutator* mutator);
221 
222   // Extracts the first socket mutator from config if any and applies on the fd.
223   absl::Status ApplySocketMutatorInOptions(grpc_fd_usage usage,
224                                            const PosixTcpOptions& options);
225 
226   // Return LocalAddress as EventEngine::ResolvedAddress
227   absl::StatusOr<EventEngine::ResolvedAddress> LocalAddress();
228 
229   // Return PeerAddress as EventEngine::ResolvedAddress
230   absl::StatusOr<EventEngine::ResolvedAddress> PeerAddress();
231 
232   // Return LocalAddress as string
233   absl::StatusOr<std::string> LocalAddressString();
234 
235   // Return PeerAddress as string
236   absl::StatusOr<std::string> PeerAddressString();
237 
238   // An enum to keep track of IPv4/IPv6 socket modes.
239 
240   // Currently, this information is only used when a socket is first created,
241   // but in the future we may wish to store it alongside the fd.  This would let
242   // calls like sendto() know which family to use without asking the kernel
243   // first.
244   enum DSMode {
245     // Uninitialized, or a non-IP socket.
246     DSMODE_NONE,
247     // AF_INET only.
248     DSMODE_IPV4,
249     // AF_INET6 only, because IPV6_V6ONLY could not be cleared.
250     DSMODE_IPV6,
251     // AF_INET6, which also supports ::ffff-mapped IPv4 addresses.
252     DSMODE_DUALSTACK
253   };
254 
255   // Returns the underlying file-descriptor.
Fd()256   int Fd() const { return fd_; }
257 
258   // Static methods
259 
260   // Configure default values for tcp user timeout to be used by client
261   // and server side sockets.
262   static void ConfigureDefaultTcpUserTimeout(bool enable, int timeout,
263                                              bool is_client);
264 
265   // Return true if SO_REUSEPORT is supported
266   static bool IsSocketReusePortSupported();
267 
268   // Returns true if this system can create AF_INET6 sockets bound to ::1.
269   // The value is probed once, and cached for the life of the process.
270 
271   // This is more restrictive than checking for socket(AF_INET6) to succeed,
272   // because Linux with "net.ipv6.conf.all.disable_ipv6 = 1" is able to create
273   // and bind IPv6 sockets, but cannot connect to a getsockname() of [::]:port
274   // without a valid loopback interface.  Rather than expose this half-broken
275   // state to library users, we turn off IPv6 sockets.
276   static bool IsIpv6LoopbackAvailable();
277 
278   // Creates a new socket for connecting to (or listening on) an address.
279 
280   // If addr is AF_INET6, this creates an IPv6 socket first.  If that fails,
281   // and addr is within ::ffff:0.0.0.0/96, then it automatically falls back to
282   // an IPv4 socket.
283 
284   // If addr is AF_INET, AF_UNIX, or anything else, then this is similar to
285   // calling socket() directly.
286 
287   // Returns an PosixSocketWrapper on success, otherwise returns a not-OK
288   // absl::Status
289 
290   // The dsmode output indicates which address family was actually created.
291   static absl::StatusOr<PosixSocketWrapper> CreateDualStackSocket(
292       std::function<int(int /*domain*/, int /*type*/, int /*protocol*/)>
293           socket_factory,
294       const experimental::EventEngine::ResolvedAddress& addr, int type,
295       int protocol, DSMode& dsmode);
296 
297   struct PosixSocketCreateResult;
298   // Return a PosixSocketCreateResult which manages a configured, unbound,
299   // unconnected TCP client fd.
300   //  options: may contain custom tcp settings for the fd.
301   //  target_addr: the destination address.
302   //
303   // Returns: Not-OK status on error. Otherwise it returns a
304   // PosixSocketWrapper::PosixSocketCreateResult type which includes a sock
305   // of type PosixSocketWrapper and a mapped_target_addr which is
306   // target_addr mapped to an address appropriate to the type of socket FD
307   // created. For example, if target_addr is IPv4 and dual stack sockets are
308   // available, mapped_target_addr will be an IPv4-mapped IPv6 address.
309   //
310   static absl::StatusOr<PosixSocketCreateResult>
311   CreateAndPrepareTcpClientSocket(
312       const PosixTcpOptions& options,
313       const EventEngine::ResolvedAddress& target_addr);
314 
315  private:
316   int fd_;
317 };
318 
319 struct PosixSocketWrapper::PosixSocketCreateResult {
320   PosixSocketWrapper sock;
321   EventEngine::ResolvedAddress mapped_target_addr;
322 };
323 
// Attempts to put the IPv6 socket `fd` into dual-stack mode, in which it also
// accepts ::ffff-mapped IPv4 addresses.
// NOTE(review): presumably clears IPV6_V6ONLY and returns true on success —
// confirm against the definition.
bool SetSocketDualStack(int fd);
325 
326 }  // namespace experimental
327 }  // namespace grpc_event_engine
328 
329 #endif  // GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_TCP_SOCKET_UTILS_H
330