1 // Copyright 2022 gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include <grpc/support/port_platform.h>
16
17 #include "src/core/lib/event_engine/posix.h"
18 #include "src/core/lib/iomgr/exec_ctx.h"
19 #include "src/core/lib/iomgr/port.h"
20
21 #ifdef GRPC_POSIX_SOCKET_TCP
22
23 #include <errno.h> // IWYU pragma: keep
24 #include <grpc/event_engine/event_engine.h>
25 #include <grpc/event_engine/memory_allocator.h>
26 #include <sys/socket.h> // IWYU pragma: keep
27 #include <unistd.h> // IWYU pragma: keep
28
29 #include <atomic>
30 #include <string>
31 #include <tuple>
32 #include <type_traits>
33 #include <utility>
34
35 #include "absl/functional/any_invocable.h"
36 #include "absl/log/check.h"
37 #include "absl/log/log.h"
38 #include "absl/status/status.h"
39 #include "absl/strings/str_cat.h"
40 #include "absl/types/optional.h"
41 #include "src/core/lib/debug/trace.h"
42 #include "src/core/lib/event_engine/posix_engine/event_poller.h"
43 #include "src/core/lib/event_engine/posix_engine/posix_endpoint.h"
44 #include "src/core/lib/event_engine/posix_engine/posix_engine_listener.h"
45 #include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
46 #include "src/core/lib/event_engine/tcp_socket_utils.h"
47 #include "src/core/lib/iomgr/socket_mutator.h"
48 #include "src/core/util/status_helper.h"
49 #include "src/core/util/strerror.h"
50 #include "src/core/util/time.h"
51
52 namespace grpc_event_engine {
53 namespace experimental {
54
PosixEngineListenerImpl(PosixEventEngineWithFdSupport::PosixAcceptCallback on_accept,absl::AnyInvocable<void (absl::Status)> on_shutdown,const grpc_event_engine::experimental::EndpointConfig & config,std::unique_ptr<grpc_event_engine::experimental::MemoryAllocatorFactory> memory_allocator_factory,PosixEventPoller * poller,std::shared_ptr<EventEngine> engine)55 PosixEngineListenerImpl::PosixEngineListenerImpl(
56 PosixEventEngineWithFdSupport::PosixAcceptCallback on_accept,
57 absl::AnyInvocable<void(absl::Status)> on_shutdown,
58 const grpc_event_engine::experimental::EndpointConfig& config,
59 std::unique_ptr<grpc_event_engine::experimental::MemoryAllocatorFactory>
60 memory_allocator_factory,
61 PosixEventPoller* poller, std::shared_ptr<EventEngine> engine)
62 : poller_(poller),
63 options_(TcpOptionsFromEndpointConfig(config)),
64 engine_(std::move(engine)),
65 acceptors_(this),
66 on_accept_(std::move(on_accept)),
67 on_shutdown_(std::move(on_shutdown)),
68 memory_allocator_factory_(std::move(memory_allocator_factory)) {}
69
Bind(const EventEngine::ResolvedAddress & addr,PosixListenerWithFdSupport::OnPosixBindNewFdCallback on_bind_new_fd)70 absl::StatusOr<int> PosixEngineListenerImpl::Bind(
71 const EventEngine::ResolvedAddress& addr,
72 PosixListenerWithFdSupport::OnPosixBindNewFdCallback on_bind_new_fd) {
73 grpc_core::MutexLock lock(&this->mu_);
74 if (this->started_) {
75 return absl::FailedPreconditionError(
76 "Listener is already started, ports can no longer be bound");
77 }
78 EventEngine::ResolvedAddress res_addr = addr;
79 EventEngine::ResolvedAddress addr6_v4mapped;
80 int requested_port = ResolvedAddressGetPort(res_addr);
81 CHECK(addr.size() <= EventEngine::ResolvedAddress::MAX_SIZE_BYTES);
82 UnlinkIfUnixDomainSocket(addr);
83
84 /// Check if this is a wildcard port, and if so, try to keep the port the same
85 /// as some previously created listener socket.
86 for (auto it = acceptors_.begin();
87 requested_port == 0 && it != acceptors_.end(); it++) {
88 EventEngine::ResolvedAddress sockname_temp;
89 socklen_t len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
90 if (0 == getsockname((*it)->Socket().sock.Fd(),
91 const_cast<sockaddr*>(sockname_temp.address()),
92 &len)) {
93 int used_port = ResolvedAddressGetPort(sockname_temp);
94 if (used_port > 0) {
95 requested_port = used_port;
96 ResolvedAddressSetPort(res_addr, requested_port);
97 break;
98 }
99 }
100 }
101
102 auto used_port = MaybeGetWildcardPortFromAddress(res_addr);
103 // Update the callback. Any subsequent new sockets created and added to
104 // acceptors_ in this function will invoke the new callback.
105 acceptors_.UpdateOnAppendCallback(std::move(on_bind_new_fd));
106 if (used_port.has_value()) {
107 requested_port = *used_port;
108 return ListenerContainerAddWildcardAddresses(acceptors_, options_,
109 requested_port);
110 }
111 if (ResolvedAddressToV4Mapped(res_addr, &addr6_v4mapped)) {
112 res_addr = addr6_v4mapped;
113 }
114
115 auto result = CreateAndPrepareListenerSocket(options_, res_addr);
116 GRPC_RETURN_IF_ERROR(result.status());
117 acceptors_.Append(*result);
118 return result->port;
119 }
120
Start()121 void PosixEngineListenerImpl::AsyncConnectionAcceptor::Start() {
122 Ref();
123 handle_->NotifyOnRead(notify_on_accept_);
124 }
125
NotifyOnAccept(absl::Status status)126 void PosixEngineListenerImpl::AsyncConnectionAcceptor::NotifyOnAccept(
127 absl::Status status) {
128 GRPC_TRACE_LOG(event_engine_endpoint, INFO)
129 << "Acceptor[" << this << "]: NotifyOnAccept: " << status;
130 if (!status.ok()) {
131 // Shutting down the acceptor. Unref the ref grabbed in
132 // AsyncConnectionAcceptor::Start().
133 Unref();
134 return;
135 }
136 // loop until accept4 returns EAGAIN, and then re-arm notification.
137 for (;;) {
138 EventEngine::ResolvedAddress addr;
139 memset(const_cast<sockaddr*>(addr.address()), 0, addr.size());
140 // Note: If we ever decide to return this address to the user, remember to
141 // strip off the ::ffff:0.0.0.0/96 prefix first.
142 int fd = Accept4(handle_->WrappedFd(), addr, 1, 1);
143 if (fd < 0) {
144 switch (errno) {
145 case EINTR:
146 continue;
147 case EMFILE:
148 // When the process runs out of fds, accept4() returns EMFILE. When
149 // this happens, the connection is left in the accept queue until
150 // either a read event triggers the on_read callback, or time has
151 // passed and the accept should be re-tried regardless. This callback
152 // is not cancelled, so a spurious wakeup may occur even when there's
153 // nothing to accept. This is not a performant code path, but if an fd
154 // limit has been reached, the system is likely in an unhappy state
155 // regardless.
156 LOG_EVERY_N_SEC(ERROR, 1)
157 << "File descriptor limit reached. Retrying.";
158 handle_->NotifyOnRead(notify_on_accept_);
159 // Do not schedule another timer if one is already armed.
160 if (retry_timer_armed_.exchange(true)) return;
161 // Hold a ref while the retry timer is waiting, to prevent listener
162 // destruction and the races that would ensue.
163 Ref();
164 std::ignore =
165 engine_->RunAfter(grpc_core::Duration::Seconds(1), [this]() {
166 retry_timer_armed_.store(false);
167 if (!handle_->IsHandleShutdown()) {
168 handle_->SetReadable();
169 }
170 Unref();
171 });
172 return;
173 case EAGAIN:
174 case ECONNABORTED:
175 handle_->NotifyOnRead(notify_on_accept_);
176 return;
177 default:
178 LOG(ERROR) << "Closing acceptor. Failed accept4: "
179 << grpc_core::StrError(errno);
180 // Shutting down the acceptor. Unref the ref grabbed in
181 // AsyncConnectionAcceptor::Start().
182 Unref();
183 return;
184 }
185 }
186
187 // For UNIX sockets, the accept call might not fill up the member
188 // sun_path of sockaddr_un, so explicitly call getpeername to get it.
189 if (addr.address()->sa_family == AF_UNIX) {
190 socklen_t len = EventEngine::ResolvedAddress::MAX_SIZE_BYTES;
191 if (getpeername(fd, const_cast<sockaddr*>(addr.address()), &len) < 0) {
192 auto listener_addr_uri = ResolvedAddressToURI(socket_.addr);
193 LOG(ERROR) << "Failed getpeername: " << grpc_core::StrError(errno)
194 << ". Dropping the connection, and continuing "
195 "to listen on "
196 << (listener_addr_uri.ok() ? *listener_addr_uri
197 : "<unknown>")
198 << ":" << socket_.port;
199 close(fd);
200 handle_->NotifyOnRead(notify_on_accept_);
201 return;
202 }
203 addr = EventEngine::ResolvedAddress(addr.address(), len);
204 }
205
206 PosixSocketWrapper sock(fd);
207 (void)sock.SetSocketNoSigpipeIfPossible();
208 auto result = sock.ApplySocketMutatorInOptions(
209 GRPC_FD_SERVER_CONNECTION_USAGE, listener_->options_);
210 if (!result.ok()) {
211 LOG(ERROR) << "Closing acceptor. Failed to apply socket mutator: "
212 << result;
213 // Shutting down the acceptor. Unref the ref grabbed in
214 // AsyncConnectionAcceptor::Start().
215 Unref();
216 return;
217 }
218
219 // Create an Endpoint here.
220 auto peer_name = ResolvedAddressToURI(addr);
221 if (!peer_name.ok()) {
222 LOG(ERROR) << "Invalid address: " << peer_name.status();
223 // Shutting down the acceptor. Unref the ref grabbed in
224 // AsyncConnectionAcceptor::Start().
225 Unref();
226 return;
227 }
228 auto endpoint = CreatePosixEndpoint(
229 /*handle=*/listener_->poller_->CreateHandle(
230 fd, *peer_name, listener_->poller_->CanTrackErrors()),
231 /*on_shutdown=*/nullptr, /*engine=*/listener_->engine_,
232 // allocator=
233 listener_->memory_allocator_factory_->CreateMemoryAllocator(
234 absl::StrCat("endpoint-tcp-server-connection: ", *peer_name)),
235 /*options=*/listener_->options_);
236
237 grpc_core::EnsureRunInExecCtx([this, peer_name = std::move(*peer_name),
238 endpoint = std::move(endpoint)]() mutable {
239 // Call on_accept_ and then resume accepting new connections
240 // by continuing the parent for-loop.
241 listener_->on_accept_(
242 /*listener_fd=*/handle_->WrappedFd(),
243 /*endpoint=*/std::move(endpoint),
244 /*is_external=*/false,
245 /*memory_allocator=*/
246 listener_->memory_allocator_factory_->CreateMemoryAllocator(
247 absl::StrCat("on-accept-tcp-server-connection: ", peer_name)),
248 /*pending_data=*/nullptr);
249 });
250 }
251 GPR_UNREACHABLE_CODE(return);
252 }
253
HandleExternalConnection(int listener_fd,int fd,SliceBuffer * pending_data)254 absl::Status PosixEngineListenerImpl::HandleExternalConnection(
255 int listener_fd, int fd, SliceBuffer* pending_data) {
256 if (listener_fd < 0) {
257 return absl::UnknownError(absl::StrCat(
258 "HandleExternalConnection: Invalid listener socket: ", listener_fd));
259 }
260 if (fd < 0) {
261 return absl::UnknownError(
262 absl::StrCat("HandleExternalConnection: Invalid peer socket: ", fd));
263 }
264 PosixSocketWrapper sock(fd);
265 (void)sock.SetSocketNoSigpipeIfPossible();
266 auto peer_name = sock.PeerAddressString();
267 if (!peer_name.ok()) {
268 return absl::UnknownError(
269 absl::StrCat("HandleExternalConnection: peer not connected: ",
270 peer_name.status().ToString()));
271 }
272 grpc_core::EnsureRunInExecCtx([this, peer_name = std::move(*peer_name),
273 pending_data, listener_fd, fd]() mutable {
274 auto endpoint = CreatePosixEndpoint(
275 /*handle=*/poller_->CreateHandle(fd, peer_name,
276 poller_->CanTrackErrors()),
277 /*on_shutdown=*/nullptr, /*engine=*/engine_,
278 /*allocator=*/
279 memory_allocator_factory_->CreateMemoryAllocator(absl::StrCat(
280 "external:endpoint-tcp-server-connection: ", peer_name)),
281 /*options=*/options_);
282 on_accept_(
283 /*listener_fd=*/listener_fd, /*endpoint=*/std::move(endpoint),
284 /*is_external=*/true,
285 /*memory_allocator=*/
286 memory_allocator_factory_->CreateMemoryAllocator(absl::StrCat(
287 "external:on-accept-tcp-server-connection: ", peer_name)),
288 /*pending_data=*/pending_data);
289 });
290 return absl::OkStatus();
291 }
292
Shutdown()293 void PosixEngineListenerImpl::AsyncConnectionAcceptor::Shutdown() {
294 // The ShutdownHandle would trigger any waiting notify_on_accept_ to get
295 // scheduled with the not-OK status.
296 handle_->ShutdownHandle(absl::InternalError("Shutting down acceptor"));
297 Unref();
298 }
299
Start()300 absl::Status PosixEngineListenerImpl::Start() {
301 grpc_core::MutexLock lock(&this->mu_);
302 // Start each asynchronous acceptor.
303 CHECK(!this->started_);
304 this->started_ = true;
305 for (auto it = acceptors_.begin(); it != acceptors_.end(); it++) {
306 (*it)->Start();
307 }
308 return absl::OkStatus();
309 }
310
TriggerShutdown()311 void PosixEngineListenerImpl::TriggerShutdown() {
312 // This would get invoked from the destructor of the parent
313 // PosixEngineListener object.
314 grpc_core::MutexLock lock(&this->mu_);
315 for (auto it = acceptors_.begin(); it != acceptors_.end(); it++) {
316 // Trigger shutdown of each asynchronous acceptor. This in-turn calls
317 // ShutdownHandle on the associated poller event handle. It may also
318 // immediately delete the asynchronous acceptor if the acceptor was never
319 // started.
320 (*it)->Shutdown();
321 }
322 }
323
~PosixEngineListenerImpl()324 PosixEngineListenerImpl::~PosixEngineListenerImpl() {
325 // This should get invoked only after all the AsyncConnectionAcceptors have
326 // been destroyed. This is because each AsyncConnectionAcceptor has a
327 // shared_ptr ref to the parent PosixEngineListenerImpl.
328 if (on_shutdown_ != nullptr) {
329 on_shutdown_(absl::OkStatus());
330 }
331 }
332
333 } // namespace experimental
334 } // namespace grpc_event_engine
335
336 #endif // GRPC_POSIX_SOCKET_TCP
337