1 // Copyright 2022 gRPC Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 #ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_POSIX_ENGINE_LISTENER_H 15 #define GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_POSIX_ENGINE_LISTENER_H 16 17 #include <grpc/event_engine/endpoint_config.h> 18 #include <grpc/event_engine/event_engine.h> 19 #include <grpc/event_engine/memory_allocator.h> 20 #include <grpc/event_engine/slice_buffer.h> 21 #include <grpc/support/port_platform.h> 22 #include <string.h> 23 24 #include <atomic> 25 #include <list> 26 #include <memory> 27 #include <string> 28 #include <utility> 29 30 #include "absl/base/thread_annotations.h" 31 #include "absl/functional/any_invocable.h" 32 #include "absl/status/status.h" 33 #include "absl/status/statusor.h" 34 #include "src/core/lib/event_engine/posix.h" 35 #include "src/core/lib/iomgr/port.h" 36 #include "src/core/util/sync.h" 37 38 #ifdef GRPC_POSIX_SOCKET_TCP 39 #include "src/core/lib/event_engine/posix_engine/event_poller.h" 40 #include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h" 41 #include "src/core/lib/event_engine/posix_engine/posix_engine_listener_utils.h" 42 #include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h" 43 #include "src/core/lib/event_engine/tcp_socket_utils.h" 44 #endif 45 46 namespace grpc_event_engine { 47 namespace experimental { 48 49 #ifdef GRPC_POSIX_SOCKET_TCP 50 class PosixEngineListenerImpl 51 : public std::enable_shared_from_this<PosixEngineListenerImpl> { 52 public: 53 PosixEngineListenerImpl( 54 PosixEventEngineWithFdSupport::PosixAcceptCallback on_accept, 55 absl::AnyInvocable<void(absl::Status)> on_shutdown, 56 const grpc_event_engine::experimental::EndpointConfig& config, 57 std::unique_ptr<grpc_event_engine::experimental::MemoryAllocatorFactory> 58 memory_allocator_factory, 59 PosixEventPoller* poller, std::shared_ptr<EventEngine> engine); 60 // Binds an address to the listener. This creates a ListenerSocket 61 // and sets its fields appropriately. 62 absl::StatusOr<int> Bind( 63 const EventEngine::ResolvedAddress& addr, 64 PosixListenerWithFdSupport::OnPosixBindNewFdCallback on_bind_new_fd); 65 // Signals event manager to listen for connections on all created sockets. 66 absl::Status Start(); 67 // Trigger graceful shutdown of all asynchronous accept operations. 68 void TriggerShutdown(); 69 70 absl::Status HandleExternalConnection(int listener_fd, int fd, 71 SliceBuffer* pending_data); 72 73 ~PosixEngineListenerImpl(); 74 75 private: 76 // This class represents accepting for one bind fd belonging to the listener. 77 // Each AsyncConnectionAcceptor takes a ref to the parent 78 // PosixEngineListenerImpl object. So the PosixEngineListenerImpl can be 79 // deleted only after all AsyncConnectionAcceptors get destroyed. 80 class AsyncConnectionAcceptor { 81 public: AsyncConnectionAcceptor(std::shared_ptr<EventEngine> engine,std::shared_ptr<PosixEngineListenerImpl> listener,ListenerSocketsContainer::ListenerSocket socket)82 AsyncConnectionAcceptor(std::shared_ptr<EventEngine> engine, 83 std::shared_ptr<PosixEngineListenerImpl> listener, 84 ListenerSocketsContainer::ListenerSocket socket) 85 : engine_(std::move(engine)), 86 listener_(std::move(listener)), 87 socket_(socket), 88 handle_(listener_->poller_->CreateHandle( 89 socket_.sock.Fd(), 90 *grpc_event_engine::experimental:: 91 ResolvedAddressToNormalizedString(socket_.addr), 92 listener_->poller_->CanTrackErrors())), 93 notify_on_accept_(PosixEngineClosure::ToPermanentClosure( 94 [this](absl::Status status) { NotifyOnAccept(status); })) {}; 95 // Start listening for incoming connections on the socket. 96 void Start(); 97 // Internal callback invoked when the socket has incoming connections to 98 // process. 99 void NotifyOnAccept(absl::Status status); 100 // Shutdown the poller handle associated with this socket. 101 void Shutdown(); Ref()102 void Ref() { ref_count_.fetch_add(1, std::memory_order_relaxed); } Unref()103 void Unref() { 104 if (ref_count_.fetch_sub(1, std::memory_order_acq_rel) == 1) { 105 delete this; 106 } 107 } Socket()108 ListenerSocketsContainer::ListenerSocket& Socket() { return socket_; } ~AsyncConnectionAcceptor()109 ~AsyncConnectionAcceptor() { 110 auto address = socket_.sock.LocalAddress(); 111 if (address.ok()) { 112 // If uds socket, unlink it so that the corresponding file is deleted. 113 UnlinkIfUnixDomainSocket(*address); 114 } 115 handle_->OrphanHandle(nullptr, nullptr, ""); 116 delete notify_on_accept_; 117 } 118 119 private: 120 std::atomic<int> ref_count_{1}; 121 std::shared_ptr<EventEngine> engine_; 122 std::shared_ptr<PosixEngineListenerImpl> listener_; 123 ListenerSocketsContainer::ListenerSocket socket_; 124 EventHandle* handle_; 125 PosixEngineClosure* notify_on_accept_; 126 // Tracks the status of a backup timer to retry accept4 calls after file 127 // descriptor exhaustion. 128 std::atomic<bool> retry_timer_armed_{false}; 129 }; 130 class ListenerAsyncAcceptors : public ListenerSocketsContainer { 131 public: ListenerAsyncAcceptors(PosixEngineListenerImpl * listener)132 explicit ListenerAsyncAcceptors(PosixEngineListenerImpl* listener) 133 : listener_(listener) {}; 134 UpdateOnAppendCallback(PosixListenerWithFdSupport::OnPosixBindNewFdCallback on_append)135 void UpdateOnAppendCallback( 136 PosixListenerWithFdSupport::OnPosixBindNewFdCallback on_append) { 137 on_append_ = std::move(on_append); 138 } 139 Append(ListenerSocket socket)140 void Append(ListenerSocket socket) override { 141 acceptors_.push_back(new AsyncConnectionAcceptor( 142 listener_->engine_, listener_->shared_from_this(), socket)); 143 if (on_append_) { 144 on_append_(socket.sock.Fd()); 145 } 146 } 147 Find(const grpc_event_engine::experimental::EventEngine::ResolvedAddress & addr)148 absl::StatusOr<ListenerSocket> Find( 149 const grpc_event_engine::experimental::EventEngine::ResolvedAddress& 150 addr) override { 151 for (auto* acceptor : acceptors_) { 152 if (acceptor->Socket().addr.size() == addr.size() && 153 memcmp(acceptor->Socket().addr.address(), addr.address(), 154 addr.size()) == 0) { 155 return acceptor->Socket(); 156 } 157 } 158 return absl::NotFoundError("Socket not found!"); 159 } 160 Size()161 int Size() { return static_cast<int>(acceptors_.size()); } 162 begin()163 std::list<AsyncConnectionAcceptor*>::const_iterator begin() { 164 return acceptors_.begin(); 165 } end()166 std::list<AsyncConnectionAcceptor*>::const_iterator end() { 167 return acceptors_.end(); 168 } 169 170 private: 171 PosixListenerWithFdSupport::OnPosixBindNewFdCallback on_append_; 172 std::list<AsyncConnectionAcceptor*> acceptors_; 173 PosixEngineListenerImpl* listener_; 174 }; 175 friend class ListenerAsyncAcceptors; 176 friend class AsyncConnectionAcceptor; 177 // The mutex ensures thread safety when multiple threads try to call Bind 178 // and Start in parallel. 179 grpc_core::Mutex mu_; 180 PosixEventPoller* poller_; 181 PosixTcpOptions options_; 182 std::shared_ptr<EventEngine> engine_; 183 // Linked list of sockets. One is created upon each successful bind 184 // operation. 185 ListenerAsyncAcceptors acceptors_ ABSL_GUARDED_BY(mu_); 186 // Callback to be invoked upon accepting a connection. 187 PosixEventEngineWithFdSupport::PosixAcceptCallback on_accept_; 188 // Callback to be invoked upon shutdown of listener. 189 absl::AnyInvocable<void(absl::Status)> on_shutdown_; 190 // Set to true when the listener has started listening for new connections. 191 // Any further bind operations would fail. 192 bool started_ ABSL_GUARDED_BY(mu_) = false; 193 // Pointer to a slice allocator factory object which can generate 194 // unique slice allocators for each new incoming connection. 195 std::unique_ptr<grpc_event_engine::experimental::MemoryAllocatorFactory> 196 memory_allocator_factory_; 197 }; 198 199 class PosixEngineListener : public PosixListenerWithFdSupport { 200 public: PosixEngineListener(PosixEventEngineWithFdSupport::PosixAcceptCallback on_accept,absl::AnyInvocable<void (absl::Status)> on_shutdown,const grpc_event_engine::experimental::EndpointConfig & config,std::unique_ptr<grpc_event_engine::experimental::MemoryAllocatorFactory> memory_allocator_factory,PosixEventPoller * poller,std::shared_ptr<EventEngine> engine)201 PosixEngineListener( 202 PosixEventEngineWithFdSupport::PosixAcceptCallback on_accept, 203 absl::AnyInvocable<void(absl::Status)> on_shutdown, 204 const grpc_event_engine::experimental::EndpointConfig& config, 205 std::unique_ptr<grpc_event_engine::experimental::MemoryAllocatorFactory> 206 memory_allocator_factory, 207 PosixEventPoller* poller, std::shared_ptr<EventEngine> engine) 208 : impl_(std::make_shared<PosixEngineListenerImpl>( 209 std::move(on_accept), std::move(on_shutdown), config, 210 std::move(memory_allocator_factory), poller, std::move(engine))) {} ~PosixEngineListener()211 ~PosixEngineListener() override { ShutdownListeningFds(); }; Bind(const grpc_event_engine::experimental::EventEngine::ResolvedAddress & addr)212 absl::StatusOr<int> Bind( 213 const grpc_event_engine::experimental::EventEngine::ResolvedAddress& addr) 214 override { 215 return impl_->Bind(addr, nullptr); 216 } BindWithFd(const EventEngine::ResolvedAddress & addr,PosixListenerWithFdSupport::OnPosixBindNewFdCallback on_bind_new_fd)217 absl::StatusOr<int> BindWithFd( 218 const EventEngine::ResolvedAddress& addr, 219 PosixListenerWithFdSupport::OnPosixBindNewFdCallback on_bind_new_fd) 220 override { 221 return impl_->Bind(addr, std::move(on_bind_new_fd)); 222 } HandleExternalConnection(int listener_fd,int fd,SliceBuffer * pending_data)223 absl::Status HandleExternalConnection(int listener_fd, int fd, 224 SliceBuffer* pending_data) override { 225 return impl_->HandleExternalConnection(listener_fd, fd, pending_data); 226 } Start()227 absl::Status Start() override { return impl_->Start(); } 228 ShutdownListeningFds()229 void ShutdownListeningFds() override { 230 if (!shutdown_.exchange(true, std::memory_order_acq_rel)) { 231 impl_->TriggerShutdown(); 232 } 233 } 234 235 private: 236 std::shared_ptr<PosixEngineListenerImpl> impl_; 237 // Set to true when the listener had been explicitly shutdown. 238 std::atomic<bool> shutdown_{false}; 239 }; 240 241 #else // GRPC_POSIX_SOCKET_TCP 242 243 #include "src/core/util/crash.h" 244 245 class PosixEngineListener : public PosixListenerWithFdSupport { 246 public: 247 PosixEngineListener() = default; 248 ~PosixEngineListener() override = default; 249 absl::StatusOr<int> Bind(const grpc_event_engine::experimental::EventEngine:: 250 ResolvedAddress& /*addr*/) override { 251 grpc_core::Crash( 252 "EventEngine::Listener::Bind not supported on this platform"); 253 } 254 absl::Status Start() override { 255 grpc_core::Crash( 256 "EventEngine::Listener::Start not supported on this platform"); 257 } 258 absl::StatusOr<int> BindWithFd( 259 const EventEngine::ResolvedAddress& /*addr*/, 260 PosixListenerWithFdSupport::OnPosixBindNewFdCallback 261 /*on_bind_new_fd*/) override { 262 grpc_core::Crash( 263 "PosixEngineListener::BindWithFd not supported on this " 264 "platform"); 265 } 266 absl::Status HandleExternalConnection( 267 int /*listener_fd*/, int /*fd*/, SliceBuffer* /*pending_data*/) override { 268 grpc_core::Crash( 269 "PosixEngineListener::HandleExternalConnection not " 270 "supported on this platform"); 271 } 272 void ShutdownListeningFds() override { 273 grpc_core::Crash( 274 "PosixEngineListener::ShutdownListeningFds not supported on " 275 "this platform"); 276 } 277 }; 278 279 #endif 280 281 } // namespace experimental 282 } // namespace grpc_event_engine 283 #endif // GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_POSIX_ENGINE_LISTENER_H 284