• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <grpc/support/port_platform.h>
16 
17 #include "src/core/lib/event_engine/posix.h"
18 #include "src/core/lib/iomgr/exec_ctx.h"
19 #include "src/core/lib/iomgr/port.h"
20 
21 #ifdef GRPC_POSIX_SOCKET_TCP
22 
23 #include <errno.h>  // IWYU pragma: keep
24 #include <grpc/event_engine/event_engine.h>
25 #include <grpc/event_engine/memory_allocator.h>
26 #include <sys/socket.h>  // IWYU pragma: keep
27 #include <unistd.h>      // IWYU pragma: keep
28 
29 #include <atomic>
30 #include <string>
31 #include <tuple>
32 #include <type_traits>
33 #include <utility>
34 
35 #include "absl/functional/any_invocable.h"
36 #include "absl/log/check.h"
37 #include "absl/log/log.h"
38 #include "absl/status/status.h"
39 #include "absl/strings/str_cat.h"
40 #include "absl/types/optional.h"
41 #include "src/core/lib/debug/trace.h"
42 #include "src/core/lib/event_engine/posix_engine/event_poller.h"
43 #include "src/core/lib/event_engine/posix_engine/posix_endpoint.h"
44 #include "src/core/lib/event_engine/posix_engine/posix_engine_listener.h"
45 #include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
46 #include "src/core/lib/event_engine/tcp_socket_utils.h"
47 #include "src/core/lib/iomgr/socket_mutator.h"
48 #include "src/core/util/status_helper.h"
49 #include "src/core/util/strerror.h"
50 #include "src/core/util/time.h"
51 
52 namespace grpc_event_engine {
53 namespace experimental {
54 
PosixEngineListenerImpl(PosixEventEngineWithFdSupport::PosixAcceptCallback on_accept,absl::AnyInvocable<void (absl::Status)> on_shutdown,const grpc_event_engine::experimental::EndpointConfig & config,std::unique_ptr<grpc_event_engine::experimental::MemoryAllocatorFactory> memory_allocator_factory,PosixEventPoller * poller,std::shared_ptr<EventEngine> engine)55 PosixEngineListenerImpl::PosixEngineListenerImpl(
56     PosixEventEngineWithFdSupport::PosixAcceptCallback on_accept,
57     absl::AnyInvocable<void(absl::Status)> on_shutdown,
58     const grpc_event_engine::experimental::EndpointConfig& config,
59     std::unique_ptr<grpc_event_engine::experimental::MemoryAllocatorFactory>
60         memory_allocator_factory,
61     PosixEventPoller* poller, std::shared_ptr<EventEngine> engine)
62     : poller_(poller),
63       options_(TcpOptionsFromEndpointConfig(config)),
64       engine_(std::move(engine)),
65       acceptors_(this),
66       on_accept_(std::move(on_accept)),
67       on_shutdown_(std::move(on_shutdown)),
68       memory_allocator_factory_(std::move(memory_allocator_factory)) {}
69 
Bind(const EventEngine::ResolvedAddress & addr,PosixListenerWithFdSupport::OnPosixBindNewFdCallback on_bind_new_fd)70 absl::StatusOr<int> PosixEngineListenerImpl::Bind(
71     const EventEngine::ResolvedAddress& addr,
72     PosixListenerWithFdSupport::OnPosixBindNewFdCallback on_bind_new_fd) {
73   grpc_core::MutexLock lock(&this->mu_);
74   if (this->started_) {
75     return absl::FailedPreconditionError(
76         "Listener is already started, ports can no longer be bound");
77   }
78   EventEngine::ResolvedAddress res_addr = addr;
79   EventEngine::ResolvedAddress addr6_v4mapped;
80   int requested_port = ResolvedAddressGetPort(res_addr);
81   CHECK(addr.size() <= EventEngine::ResolvedAddress::MAX_SIZE_BYTES);
82   UnlinkIfUnixDomainSocket(addr);
83 
84   /// Check if this is a wildcard port, and if so, try to keep the port the same
85   /// as some previously created listener socket.
86   for (auto it = acceptors_.begin();
87        requested_port == 0 && it != acceptors_.end(); it++) {
88     EventEngine::ResolvedAddress sockname_temp;
89     socklen_t len = static_cast<socklen_t>(sizeof(struct sockaddr_storage));
90     if (0 == getsockname((*it)->Socket().sock.Fd(),
91                          const_cast<sockaddr*>(sockname_temp.address()),
92                          &len)) {
93       int used_port = ResolvedAddressGetPort(sockname_temp);
94       if (used_port > 0) {
95         requested_port = used_port;
96         ResolvedAddressSetPort(res_addr, requested_port);
97         break;
98       }
99     }
100   }
101 
102   auto used_port = MaybeGetWildcardPortFromAddress(res_addr);
103   // Update the callback. Any subsequent new sockets created and added to
104   // acceptors_ in this function will invoke the new callback.
105   acceptors_.UpdateOnAppendCallback(std::move(on_bind_new_fd));
106   if (used_port.has_value()) {
107     requested_port = *used_port;
108     return ListenerContainerAddWildcardAddresses(acceptors_, options_,
109                                                  requested_port);
110   }
111   if (ResolvedAddressToV4Mapped(res_addr, &addr6_v4mapped)) {
112     res_addr = addr6_v4mapped;
113   }
114 
115   auto result = CreateAndPrepareListenerSocket(options_, res_addr);
116   GRPC_RETURN_IF_ERROR(result.status());
117   acceptors_.Append(*result);
118   return result->port;
119 }
120 
Start()121 void PosixEngineListenerImpl::AsyncConnectionAcceptor::Start() {
122   Ref();
123   handle_->NotifyOnRead(notify_on_accept_);
124 }
125 
NotifyOnAccept(absl::Status status)126 void PosixEngineListenerImpl::AsyncConnectionAcceptor::NotifyOnAccept(
127     absl::Status status) {
128   GRPC_TRACE_LOG(event_engine_endpoint, INFO)
129       << "Acceptor[" << this << "]: NotifyOnAccept: " << status;
130   if (!status.ok()) {
131     // Shutting down the acceptor. Unref the ref grabbed in
132     // AsyncConnectionAcceptor::Start().
133     Unref();
134     return;
135   }
136   // loop until accept4 returns EAGAIN, and then re-arm notification.
137   for (;;) {
138     EventEngine::ResolvedAddress addr;
139     memset(const_cast<sockaddr*>(addr.address()), 0, addr.size());
140     // Note: If we ever decide to return this address to the user, remember to
141     // strip off the ::ffff:0.0.0.0/96 prefix first.
142     int fd = Accept4(handle_->WrappedFd(), addr, 1, 1);
143     if (fd < 0) {
144       switch (errno) {
145         case EINTR:
146           continue;
147         case EMFILE:
148           // When the process runs out of fds, accept4() returns EMFILE. When
149           // this happens, the connection is left in the accept queue until
150           // either a read event triggers the on_read callback, or time has
151           // passed and the accept should be re-tried regardless. This callback
152           // is not cancelled, so a spurious wakeup may occur even when there's
153           // nothing to accept. This is not a performant code path, but if an fd
154           // limit has been reached, the system is likely in an unhappy state
155           // regardless.
156           LOG_EVERY_N_SEC(ERROR, 1)
157               << "File descriptor limit reached. Retrying.";
158           handle_->NotifyOnRead(notify_on_accept_);
159           // Do not schedule another timer if one is already armed.
160           if (retry_timer_armed_.exchange(true)) return;
161           // Hold a ref while the retry timer is waiting, to prevent listener
162           // destruction and the races that would ensue.
163           Ref();
164           std::ignore =
165               engine_->RunAfter(grpc_core::Duration::Seconds(1), [this]() {
166                 retry_timer_armed_.store(false);
167                 if (!handle_->IsHandleShutdown()) {
168                   handle_->SetReadable();
169                 }
170                 Unref();
171               });
172           return;
173         case EAGAIN:
174         case ECONNABORTED:
175           handle_->NotifyOnRead(notify_on_accept_);
176           return;
177         default:
178           LOG(ERROR) << "Closing acceptor. Failed accept4: "
179                      << grpc_core::StrError(errno);
180           // Shutting down the acceptor. Unref the ref grabbed in
181           // AsyncConnectionAcceptor::Start().
182           Unref();
183           return;
184       }
185     }
186 
187     // For UNIX sockets, the accept call might not fill up the member
188     // sun_path of sockaddr_un, so explicitly call getpeername to get it.
189     if (addr.address()->sa_family == AF_UNIX) {
190       socklen_t len = EventEngine::ResolvedAddress::MAX_SIZE_BYTES;
191       if (getpeername(fd, const_cast<sockaddr*>(addr.address()), &len) < 0) {
192         auto listener_addr_uri = ResolvedAddressToURI(socket_.addr);
193         LOG(ERROR) << "Failed getpeername: " << grpc_core::StrError(errno)
194                    << ". Dropping the connection, and continuing "
195                       "to listen on "
196                    << (listener_addr_uri.ok() ? *listener_addr_uri
197                                               : "<unknown>")
198                    << ":" << socket_.port;
199         close(fd);
200         handle_->NotifyOnRead(notify_on_accept_);
201         return;
202       }
203       addr = EventEngine::ResolvedAddress(addr.address(), len);
204     }
205 
206     PosixSocketWrapper sock(fd);
207     (void)sock.SetSocketNoSigpipeIfPossible();
208     auto result = sock.ApplySocketMutatorInOptions(
209         GRPC_FD_SERVER_CONNECTION_USAGE, listener_->options_);
210     if (!result.ok()) {
211       LOG(ERROR) << "Closing acceptor. Failed to apply socket mutator: "
212                  << result;
213       // Shutting down the acceptor. Unref the ref grabbed in
214       // AsyncConnectionAcceptor::Start().
215       Unref();
216       return;
217     }
218 
219     // Create an Endpoint here.
220     auto peer_name = ResolvedAddressToURI(addr);
221     if (!peer_name.ok()) {
222       LOG(ERROR) << "Invalid address: " << peer_name.status();
223       // Shutting down the acceptor. Unref the ref grabbed in
224       // AsyncConnectionAcceptor::Start().
225       Unref();
226       return;
227     }
228     auto endpoint = CreatePosixEndpoint(
229         /*handle=*/listener_->poller_->CreateHandle(
230             fd, *peer_name, listener_->poller_->CanTrackErrors()),
231         /*on_shutdown=*/nullptr, /*engine=*/listener_->engine_,
232         // allocator=
233         listener_->memory_allocator_factory_->CreateMemoryAllocator(
234             absl::StrCat("endpoint-tcp-server-connection: ", *peer_name)),
235         /*options=*/listener_->options_);
236 
237     grpc_core::EnsureRunInExecCtx([this, peer_name = std::move(*peer_name),
238                                    endpoint = std::move(endpoint)]() mutable {
239       // Call on_accept_ and then resume accepting new connections
240       // by continuing the parent for-loop.
241       listener_->on_accept_(
242           /*listener_fd=*/handle_->WrappedFd(),
243           /*endpoint=*/std::move(endpoint),
244           /*is_external=*/false,
245           /*memory_allocator=*/
246           listener_->memory_allocator_factory_->CreateMemoryAllocator(
247               absl::StrCat("on-accept-tcp-server-connection: ", peer_name)),
248           /*pending_data=*/nullptr);
249     });
250   }
251   GPR_UNREACHABLE_CODE(return);
252 }
253 
HandleExternalConnection(int listener_fd,int fd,SliceBuffer * pending_data)254 absl::Status PosixEngineListenerImpl::HandleExternalConnection(
255     int listener_fd, int fd, SliceBuffer* pending_data) {
256   if (listener_fd < 0) {
257     return absl::UnknownError(absl::StrCat(
258         "HandleExternalConnection: Invalid listener socket: ", listener_fd));
259   }
260   if (fd < 0) {
261     return absl::UnknownError(
262         absl::StrCat("HandleExternalConnection: Invalid peer socket: ", fd));
263   }
264   PosixSocketWrapper sock(fd);
265   (void)sock.SetSocketNoSigpipeIfPossible();
266   auto peer_name = sock.PeerAddressString();
267   if (!peer_name.ok()) {
268     return absl::UnknownError(
269         absl::StrCat("HandleExternalConnection: peer not connected: ",
270                      peer_name.status().ToString()));
271   }
272   grpc_core::EnsureRunInExecCtx([this, peer_name = std::move(*peer_name),
273                                  pending_data, listener_fd, fd]() mutable {
274     auto endpoint = CreatePosixEndpoint(
275         /*handle=*/poller_->CreateHandle(fd, peer_name,
276                                          poller_->CanTrackErrors()),
277         /*on_shutdown=*/nullptr, /*engine=*/engine_,
278         /*allocator=*/
279         memory_allocator_factory_->CreateMemoryAllocator(absl::StrCat(
280             "external:endpoint-tcp-server-connection: ", peer_name)),
281         /*options=*/options_);
282     on_accept_(
283         /*listener_fd=*/listener_fd, /*endpoint=*/std::move(endpoint),
284         /*is_external=*/true,
285         /*memory_allocator=*/
286         memory_allocator_factory_->CreateMemoryAllocator(absl::StrCat(
287             "external:on-accept-tcp-server-connection: ", peer_name)),
288         /*pending_data=*/pending_data);
289   });
290   return absl::OkStatus();
291 }
292 
Shutdown()293 void PosixEngineListenerImpl::AsyncConnectionAcceptor::Shutdown() {
294   // The ShutdownHandle would trigger any waiting notify_on_accept_ to get
295   // scheduled with the not-OK status.
296   handle_->ShutdownHandle(absl::InternalError("Shutting down acceptor"));
297   Unref();
298 }
299 
Start()300 absl::Status PosixEngineListenerImpl::Start() {
301   grpc_core::MutexLock lock(&this->mu_);
302   // Start each asynchronous acceptor.
303   CHECK(!this->started_);
304   this->started_ = true;
305   for (auto it = acceptors_.begin(); it != acceptors_.end(); it++) {
306     (*it)->Start();
307   }
308   return absl::OkStatus();
309 }
310 
TriggerShutdown()311 void PosixEngineListenerImpl::TriggerShutdown() {
312   // This would get invoked from the destructor of the parent
313   // PosixEngineListener object.
314   grpc_core::MutexLock lock(&this->mu_);
315   for (auto it = acceptors_.begin(); it != acceptors_.end(); it++) {
316     // Trigger shutdown of each asynchronous acceptor. This in-turn calls
317     // ShutdownHandle on the associated poller event handle. It may also
318     // immediately delete the asynchronous acceptor if the acceptor was never
319     // started.
320     (*it)->Shutdown();
321   }
322 }
323 
~PosixEngineListenerImpl()324 PosixEngineListenerImpl::~PosixEngineListenerImpl() {
325   // This should get invoked only after all the AsyncConnectionAcceptors have
326   // been destroyed. This is because each AsyncConnectionAcceptor has a
327   // shared_ptr ref to the parent PosixEngineListenerImpl.
328   if (on_shutdown_ != nullptr) {
329     on_shutdown_(absl::OkStatus());
330   }
331 }
332 
333 }  // namespace experimental
334 }  // namespace grpc_event_engine
335 
336 #endif  // GRPC_POSIX_SOCKET_TCP
337