• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
// Copyright 2022 gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14 #ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_POSIX_ENGINE_LISTENER_H
15 #define GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_POSIX_ENGINE_LISTENER_H
16 
#include <grpc/event_engine/endpoint_config.h>
#include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/memory_allocator.h>
#include <grpc/event_engine/slice_buffer.h>
#include <grpc/support/port_platform.h>
#include <string.h>

#include <atomic>
#include <list>
#include <memory>
#include <string>
#include <utility>

#include "absl/base/thread_annotations.h"
#include "absl/functional/any_invocable.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "src/core/lib/event_engine/posix.h"
#include "src/core/lib/iomgr/port.h"
#include "src/core/util/sync.h"

#ifdef GRPC_POSIX_SOCKET_TCP
#include "src/core/lib/event_engine/posix_engine/event_poller.h"
#include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h"
#include "src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.h"
#include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
#include "src/core/lib/event_engine/tcp_socket_utils.h"
#else  // GRPC_POSIX_SOCKET_TCP
#include "src/core/util/crash.h"
#endif  // GRPC_POSIX_SOCKET_TCP
45 
46 namespace grpc_event_engine {
47 namespace experimental {
48 
49 #ifdef GRPC_POSIX_SOCKET_TCP
50 class PosixEngineListenerImpl
51     : public std::enable_shared_from_this<PosixEngineListenerImpl> {
52  public:
53   PosixEngineListenerImpl(
54       PosixEventEngineWithFdSupport::PosixAcceptCallback on_accept,
55       absl::AnyInvocable<void(absl::Status)> on_shutdown,
56       const grpc_event_engine::experimental::EndpointConfig& config,
57       std::unique_ptr<grpc_event_engine::experimental::MemoryAllocatorFactory>
58           memory_allocator_factory,
59       PosixEventPoller* poller, std::shared_ptr<EventEngine> engine);
60   // Binds an address to the listener. This creates a ListenerSocket
61   // and sets its fields appropriately.
62   absl::StatusOr<int> Bind(
63       const EventEngine::ResolvedAddress& addr,
64       PosixListenerWithFdSupport::OnPosixBindNewFdCallback on_bind_new_fd);
65   // Signals event manager to listen for connections on all created sockets.
66   absl::Status Start();
67   // Trigger graceful shutdown of all asynchronous accept operations.
68   void TriggerShutdown();
69 
70   absl::Status HandleExternalConnection(int listener_fd, int fd,
71                                         SliceBuffer* pending_data);
72 
73   ~PosixEngineListenerImpl();
74 
75  private:
76   // This class represents accepting for one bind fd belonging to the listener.
77   // Each AsyncConnectionAcceptor takes a ref to the parent
78   // PosixEngineListenerImpl object. So the PosixEngineListenerImpl can be
79   // deleted only after all AsyncConnectionAcceptors get destroyed.
80   class AsyncConnectionAcceptor {
81    public:
AsyncConnectionAcceptor(std::shared_ptr<EventEngine> engine,std::shared_ptr<PosixEngineListenerImpl> listener,ListenerSocketsContainer::ListenerSocket socket)82     AsyncConnectionAcceptor(std::shared_ptr<EventEngine> engine,
83                             std::shared_ptr<PosixEngineListenerImpl> listener,
84                             ListenerSocketsContainer::ListenerSocket socket)
85         : engine_(std::move(engine)),
86           listener_(std::move(listener)),
87           socket_(socket),
88           handle_(listener_->poller_->CreateHandle(
89               socket_.sock.Fd(),
90               *grpc_event_engine::experimental::
91                   ResolvedAddressToNormalizedString(socket_.addr),
92               listener_->poller_->CanTrackErrors())),
93           notify_on_accept_(PosixEngineClosure::ToPermanentClosure(
94               [this](absl::Status status) { NotifyOnAccept(status); })) {};
95     // Start listening for incoming connections on the socket.
96     void Start();
97     // Internal callback invoked when the socket has incoming connections to
98     // process.
99     void NotifyOnAccept(absl::Status status);
100     // Shutdown the poller handle associated with this socket.
101     void Shutdown();
Ref()102     void Ref() { ref_count_.fetch_add(1, std::memory_order_relaxed); }
Unref()103     void Unref() {
104       if (ref_count_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
105         delete this;
106       }
107     }
Socket()108     ListenerSocketsContainer::ListenerSocket& Socket() { return socket_; }
~AsyncConnectionAcceptor()109     ~AsyncConnectionAcceptor() {
110       auto address = socket_.sock.LocalAddress();
111       if (address.ok()) {
112         // If uds socket, unlink it so that the corresponding file is deleted.
113         UnlinkIfUnixDomainSocket(*address);
114       }
115       handle_->OrphanHandle(nullptr, nullptr, "");
116       delete notify_on_accept_;
117     }
118 
119    private:
120     std::atomic<int> ref_count_{1};
121     std::shared_ptr<EventEngine> engine_;
122     std::shared_ptr<PosixEngineListenerImpl> listener_;
123     ListenerSocketsContainer::ListenerSocket socket_;
124     EventHandle* handle_;
125     PosixEngineClosure* notify_on_accept_;
126     // Tracks the status of a backup timer to retry accept4 calls after file
127     // descriptor exhaustion.
128     std::atomic<bool> retry_timer_armed_{false};
129   };
130   class ListenerAsyncAcceptors : public ListenerSocketsContainer {
131    public:
ListenerAsyncAcceptors(PosixEngineListenerImpl * listener)132     explicit ListenerAsyncAcceptors(PosixEngineListenerImpl* listener)
133         : listener_(listener) {};
134 
UpdateOnAppendCallback(PosixListenerWithFdSupport::OnPosixBindNewFdCallback on_append)135     void UpdateOnAppendCallback(
136         PosixListenerWithFdSupport::OnPosixBindNewFdCallback on_append) {
137       on_append_ = std::move(on_append);
138     }
139 
Append(ListenerSocket socket)140     void Append(ListenerSocket socket) override {
141       acceptors_.push_back(new AsyncConnectionAcceptor(
142           listener_->engine_, listener_->shared_from_this(), socket));
143       if (on_append_) {
144         on_append_(socket.sock.Fd());
145       }
146     }
147 
Find(const grpc_event_engine::experimental::EventEngine::ResolvedAddress & addr)148     absl::StatusOr<ListenerSocket> Find(
149         const grpc_event_engine::experimental::EventEngine::ResolvedAddress&
150             addr) override {
151       for (auto* acceptor : acceptors_) {
152         if (acceptor->Socket().addr.size() == addr.size() &&
153             memcmp(acceptor->Socket().addr.address(), addr.address(),
154                    addr.size()) == 0) {
155           return acceptor->Socket();
156         }
157       }
158       return absl::NotFoundError("Socket not found!");
159     }
160 
Size()161     int Size() { return static_cast<int>(acceptors_.size()); }
162 
begin()163     std::list<AsyncConnectionAcceptor*>::const_iterator begin() {
164       return acceptors_.begin();
165     }
end()166     std::list<AsyncConnectionAcceptor*>::const_iterator end() {
167       return acceptors_.end();
168     }
169 
170    private:
171     PosixListenerWithFdSupport::OnPosixBindNewFdCallback on_append_;
172     std::list<AsyncConnectionAcceptor*> acceptors_;
173     PosixEngineListenerImpl* listener_;
174   };
175   friend class ListenerAsyncAcceptors;
176   friend class AsyncConnectionAcceptor;
177   // The mutex ensures thread safety when multiple threads try to call Bind
178   // and Start in parallel.
179   grpc_core::Mutex mu_;
180   PosixEventPoller* poller_;
181   PosixTcpOptions options_;
182   std::shared_ptr<EventEngine> engine_;
183   // Linked list of sockets. One is created upon each successful bind
184   // operation.
185   ListenerAsyncAcceptors acceptors_ ABSL_GUARDED_BY(mu_);
186   // Callback to be invoked upon accepting a connection.
187   PosixEventEngineWithFdSupport::PosixAcceptCallback on_accept_;
188   // Callback to be invoked upon shutdown of listener.
189   absl::AnyInvocable<void(absl::Status)> on_shutdown_;
190   // Set to true when the listener has started listening for new connections.
191   // Any further bind operations would fail.
192   bool started_ ABSL_GUARDED_BY(mu_) = false;
193   // Pointer to a slice allocator factory object which can generate
194   // unique slice allocators for each new incoming connection.
195   std::unique_ptr<grpc_event_engine::experimental::MemoryAllocatorFactory>
196       memory_allocator_factory_;
197 };
198 
199 class PosixEngineListener : public PosixListenerWithFdSupport {
200  public:
PosixEngineListener(PosixEventEngineWithFdSupport::PosixAcceptCallback on_accept,absl::AnyInvocable<void (absl::Status)> on_shutdown,const grpc_event_engine::experimental::EndpointConfig & config,std::unique_ptr<grpc_event_engine::experimental::MemoryAllocatorFactory> memory_allocator_factory,PosixEventPoller * poller,std::shared_ptr<EventEngine> engine)201   PosixEngineListener(
202       PosixEventEngineWithFdSupport::PosixAcceptCallback on_accept,
203       absl::AnyInvocable<void(absl::Status)> on_shutdown,
204       const grpc_event_engine::experimental::EndpointConfig& config,
205       std::unique_ptr<grpc_event_engine::experimental::MemoryAllocatorFactory>
206           memory_allocator_factory,
207       PosixEventPoller* poller, std::shared_ptr<EventEngine> engine)
208       : impl_(std::make_shared<PosixEngineListenerImpl>(
209             std::move(on_accept), std::move(on_shutdown), config,
210             std::move(memory_allocator_factory), poller, std::move(engine))) {}
~PosixEngineListener()211   ~PosixEngineListener() override { ShutdownListeningFds(); };
Bind(const grpc_event_engine::experimental::EventEngine::ResolvedAddress & addr)212   absl::StatusOr<int> Bind(
213       const grpc_event_engine::experimental::EventEngine::ResolvedAddress& addr)
214       override {
215     return impl_->Bind(addr, nullptr);
216   }
BindWithFd(const EventEngine::ResolvedAddress & addr,PosixListenerWithFdSupport::OnPosixBindNewFdCallback on_bind_new_fd)217   absl::StatusOr<int> BindWithFd(
218       const EventEngine::ResolvedAddress& addr,
219       PosixListenerWithFdSupport::OnPosixBindNewFdCallback on_bind_new_fd)
220       override {
221     return impl_->Bind(addr, std::move(on_bind_new_fd));
222   }
HandleExternalConnection(int listener_fd,int fd,SliceBuffer * pending_data)223   absl::Status HandleExternalConnection(int listener_fd, int fd,
224                                         SliceBuffer* pending_data) override {
225     return impl_->HandleExternalConnection(listener_fd, fd, pending_data);
226   }
Start()227   absl::Status Start() override { return impl_->Start(); }
228 
ShutdownListeningFds()229   void ShutdownListeningFds() override {
230     if (!shutdown_.exchange(true, std::memory_order_acq_rel)) {
231       impl_->TriggerShutdown();
232     }
233   }
234 
235  private:
236   std::shared_ptr<PosixEngineListenerImpl> impl_;
237   // Set to true when the listener had been explicitly shutdown.
238   std::atomic<bool> shutdown_{false};
239 };
240 
241 #else  // GRPC_POSIX_SOCKET_TCP
242 
243 #include "src/core/util/crash.h"
244 
245 class PosixEngineListener : public PosixListenerWithFdSupport {
246  public:
247   PosixEngineListener() = default;
248   ~PosixEngineListener() override = default;
249   absl::StatusOr<int> Bind(const grpc_event_engine::experimental::EventEngine::
250                                ResolvedAddress& /*addr*/) override {
251     grpc_core::Crash(
252         "EventEngine::Listener::Bind not supported on this platform");
253   }
254   absl::Status Start() override {
255     grpc_core::Crash(
256         "EventEngine::Listener::Start not supported on this platform");
257   }
258   absl::StatusOr<int> BindWithFd(
259       const EventEngine::ResolvedAddress& /*addr*/,
260       PosixListenerWithFdSupport::OnPosixBindNewFdCallback
261       /*on_bind_new_fd*/) override {
262     grpc_core::Crash(
263         "PosixEngineListener::BindWithFd not supported on this "
264         "platform");
265   }
266   absl::Status HandleExternalConnection(
267       int /*listener_fd*/, int /*fd*/, SliceBuffer* /*pending_data*/) override {
268     grpc_core::Crash(
269         "PosixEngineListener::HandleExternalConnection not "
270         "supported on this platform");
271   }
272   void ShutdownListeningFds() override {
273     grpc_core::Crash(
274         "PosixEngineListener::ShutdownListeningFds not supported on "
275         "this platform");
276   }
277 };
278 
279 #endif
280 
281 }  // namespace experimental
282 }  // namespace grpc_event_engine
283 #endif  // GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_POSIX_ENGINE_LISTENER_H
284