1 // Copyright 2022 The gRPC Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 #ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_POSIX_ENGINE_H 15 #define GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_POSIX_ENGINE_H 16 #include <grpc/event_engine/endpoint_config.h> 17 #include <grpc/event_engine/event_engine.h> 18 #include <grpc/event_engine/memory_allocator.h> 19 #include <grpc/support/port_platform.h> 20 21 #include <atomic> 22 #include <cstdint> 23 #include <memory> 24 #include <string> 25 #include <utility> 26 #include <vector> 27 28 #include "absl/base/thread_annotations.h" 29 #include "absl/container/flat_hash_map.h" 30 #include "absl/functional/any_invocable.h" 31 #include "absl/hash/hash.h" 32 #include "absl/status/status.h" 33 #include "absl/status/statusor.h" 34 #include "absl/strings/string_view.h" 35 #include "src/core/lib/event_engine/handle_containers.h" 36 #include "src/core/lib/event_engine/posix.h" 37 #include "src/core/lib/event_engine/posix_engine/event_poller.h" 38 #include "src/core/lib/event_engine/posix_engine/timer_manager.h" 39 #include "src/core/lib/event_engine/ref_counted_dns_resolver_interface.h" 40 #include "src/core/lib/event_engine/thread_pool/thread_pool.h" 41 #include "src/core/lib/iomgr/port.h" 42 #include "src/core/lib/surface/init_internally.h" 43 #include "src/core/util/orphanable.h" 44 #include "src/core/util/sync.h" 45 46 #ifdef GRPC_POSIX_SOCKET_TCP 47 #include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h" 48 #include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h" 49 #endif // GRPC_POSIX_SOCKET_TCP 50 51 namespace grpc_event_engine { 52 namespace experimental { 53 54 #ifdef GRPC_POSIX_SOCKET_TCP 55 // A helper class to handle asynchronous connect operations. 56 class AsyncConnect { 57 public: AsyncConnect(EventEngine::OnConnectCallback on_connect,std::shared_ptr<EventEngine> engine,ThreadPool * executor,grpc_event_engine::experimental::EventHandle * fd,MemoryAllocator && allocator,const grpc_event_engine::experimental::PosixTcpOptions & options,std::string resolved_addr_str,int64_t connection_handle)58 AsyncConnect(EventEngine::OnConnectCallback on_connect, 59 std::shared_ptr<EventEngine> engine, ThreadPool* executor, 60 grpc_event_engine::experimental::EventHandle* fd, 61 MemoryAllocator&& allocator, 62 const grpc_event_engine::experimental::PosixTcpOptions& options, 63 std::string resolved_addr_str, int64_t connection_handle) 64 : on_connect_(std::move(on_connect)), 65 engine_(engine), 66 executor_(executor), 67 fd_(fd), 68 allocator_(std::move(allocator)), 69 options_(options), 70 resolved_addr_str_(resolved_addr_str), 71 connection_handle_(connection_handle), 72 connect_cancelled_(false) {} 73 74 void Start(EventEngine::Duration timeout); 75 ~AsyncConnect(); 76 77 private: 78 friend class PosixEventEngine; 79 void OnTimeoutExpired(absl::Status status); 80 81 void OnWritable(absl::Status status) ABSL_NO_THREAD_SAFETY_ANALYSIS; 82 83 grpc_core::Mutex mu_; 84 grpc_event_engine::experimental::PosixEngineClosure* on_writable_ = nullptr; 85 EventEngine::OnConnectCallback on_connect_; 86 std::shared_ptr<EventEngine> engine_; 87 ThreadPool* executor_; 88 EventEngine::TaskHandle alarm_handle_; 89 int refs_{2}; 90 grpc_event_engine::experimental::EventHandle* fd_; 91 MemoryAllocator allocator_; 92 grpc_event_engine::experimental::PosixTcpOptions options_; 93 std::string resolved_addr_str_; 94 int64_t connection_handle_; 95 bool connect_cancelled_; 96 }; 97 98 // A helper class to manager lifetime of the poller associated with the 99 // posix EventEngine. 100 class PosixEnginePollerManager 101 : public grpc_event_engine::experimental::Scheduler { 102 public: 103 explicit PosixEnginePollerManager(std::shared_ptr<ThreadPool> executor); 104 explicit PosixEnginePollerManager( 105 std::shared_ptr<grpc_event_engine::experimental::PosixEventPoller> 106 poller); Poller()107 grpc_event_engine::experimental::PosixEventPoller* Poller() { 108 return poller_.get(); 109 } 110 Executor()111 ThreadPool* Executor() { return executor_.get(); } 112 113 void Run(experimental::EventEngine::Closure* closure) override; 114 void Run(absl::AnyInvocable<void()>) override; 115 IsShuttingDown()116 bool IsShuttingDown() { 117 return poller_state_.load(std::memory_order_acquire) == 118 PollerState::kShuttingDown; 119 } 120 void TriggerShutdown(); 121 122 ~PosixEnginePollerManager() override; 123 124 private: 125 enum class PollerState { kExternal, kOk, kShuttingDown }; 126 std::shared_ptr<grpc_event_engine::experimental::PosixEventPoller> poller_; 127 std::atomic<PollerState> poller_state_{PollerState::kOk}; 128 std::shared_ptr<ThreadPool> executor_; 129 bool trigger_shutdown_called_; 130 }; 131 #endif // GRPC_POSIX_SOCKET_TCP 132 133 // An iomgr-based Posix EventEngine implementation. 134 // All methods require an ExecCtx to already exist on the thread's stack. 135 class PosixEventEngine final : public PosixEventEngineWithFdSupport, 136 public grpc_core::KeepsGrpcInitialized { 137 public: 138 class PosixDNSResolver : public EventEngine::DNSResolver { 139 public: 140 explicit PosixDNSResolver( 141 grpc_core::OrphanablePtr<RefCountedDNSResolverInterface> dns_resolver); 142 void LookupHostname(LookupHostnameCallback on_resolve, 143 absl::string_view name, 144 absl::string_view default_port) override; 145 void LookupSRV(LookupSRVCallback on_resolve, 146 absl::string_view name) override; 147 void LookupTXT(LookupTXTCallback on_resolve, 148 absl::string_view name) override; 149 150 private: 151 grpc_core::OrphanablePtr<RefCountedDNSResolverInterface> dns_resolver_; 152 }; 153 154 #ifdef GRPC_POSIX_SOCKET_TCP 155 // Constructs an EventEngine which has a shared ownership of the poller. Do 156 // not call this constructor directly. Instead use the 157 // MakeTestOnlyPosixEventEngine static method. Its expected to be used only in 158 // tests. 159 explicit PosixEventEngine( 160 std::shared_ptr<grpc_event_engine::experimental::PosixEventPoller> 161 poller); 162 PosixEventEngine(); 163 #else // GRPC_POSIX_SOCKET_TCP 164 PosixEventEngine(); 165 #endif // GRPC_POSIX_SOCKET_TCP 166 167 ~PosixEventEngine() override; 168 169 std::unique_ptr<EventEngine::Endpoint> CreatePosixEndpointFromFd( 170 int fd, const EndpointConfig& config, 171 MemoryAllocator memory_allocator) override; 172 std::unique_ptr<EventEngine::Endpoint> CreateEndpointFromFd( 173 int fd, const EndpointConfig& config) override; 174 175 ConnectionHandle CreateEndpointFromUnconnectedFd( 176 int fd, EventEngine::OnConnectCallback on_connect, 177 const EventEngine::ResolvedAddress& addr, const EndpointConfig& config, 178 MemoryAllocator memory_allocator, EventEngine::Duration timeout) override; 179 180 absl::StatusOr<std::unique_ptr<Listener>> CreateListener( 181 Listener::AcceptCallback on_accept, 182 absl::AnyInvocable<void(absl::Status)> on_shutdown, 183 const EndpointConfig& config, 184 std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory) 185 override; 186 187 absl::StatusOr<std::unique_ptr<EventEngine::Listener>> CreatePosixListener( 188 PosixEventEngineWithFdSupport::PosixAcceptCallback on_accept, 189 absl::AnyInvocable<void(absl::Status)> on_shutdown, 190 const EndpointConfig& config, 191 std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory) 192 override; 193 194 ConnectionHandle Connect(OnConnectCallback on_connect, 195 const ResolvedAddress& addr, 196 const EndpointConfig& args, 197 MemoryAllocator memory_allocator, 198 Duration timeout) override; 199 200 bool CancelConnect(ConnectionHandle handle) override; 201 bool IsWorkerThread() override; 202 absl::StatusOr<std::unique_ptr<DNSResolver>> GetDNSResolver( 203 GRPC_UNUSED const DNSResolver::ResolverOptions& options) override; 204 void Run(Closure* closure) override; 205 void Run(absl::AnyInvocable<void()> closure) override; 206 // Caution!! The timer implementation cannot create any fds. See #20418. 207 TaskHandle RunAfter(Duration when, Closure* closure) override; 208 TaskHandle RunAfter(Duration when, 209 absl::AnyInvocable<void()> closure) override; 210 bool Cancel(TaskHandle handle) override; 211 212 #ifdef GRPC_POSIX_SOCKET_TCP 213 // The posix EventEngine returned by this method would have a shared ownership 214 // of the poller and would not be in-charge of driving the poller by calling 215 // its Work(..) method. Instead its upto the test to drive the poller. The 216 // returned posix EventEngine will also not attempt to shutdown the poller 217 // since it does not own it. MakeTestOnlyPosixEventEngine(std::shared_ptr<grpc_event_engine::experimental::PosixEventPoller> test_only_poller)218 static std::shared_ptr<PosixEventEngine> MakeTestOnlyPosixEventEngine( 219 std::shared_ptr<grpc_event_engine::experimental::PosixEventPoller> 220 test_only_poller) { 221 return std::make_shared<PosixEventEngine>(std::move(test_only_poller)); 222 } 223 #endif // GRPC_POSIX_SOCKET_TCP 224 225 private: 226 struct ClosureData; 227 EventEngine::TaskHandle RunAfterInternal(Duration when, 228 absl::AnyInvocable<void()> cb); 229 230 #ifdef GRPC_POSIX_SOCKET_TCP 231 friend class AsyncConnect; 232 struct ConnectionShard { 233 grpc_core::Mutex mu; 234 absl::flat_hash_map<int64_t, AsyncConnect*> pending_connections 235 ABSL_GUARDED_BY(&mu); 236 }; 237 238 static void PollerWorkInternal( 239 std::shared_ptr<PosixEnginePollerManager> poller_manager); 240 241 ConnectionHandle CreateEndpointFromUnconnectedFdInternal( 242 int fd, EventEngine::OnConnectCallback on_connect, 243 const EventEngine::ResolvedAddress& addr, const PosixTcpOptions& options, 244 MemoryAllocator memory_allocator, EventEngine::Duration timeout); 245 246 void OnConnectFinishInternal(int connection_handle); 247 248 std::vector<ConnectionShard> connection_shards_; 249 std::atomic<int64_t> last_connection_id_{1}; 250 251 #endif // GRPC_POSIX_SOCKET_TCP 252 253 grpc_core::Mutex mu_; 254 TaskHandleSet known_handles_ ABSL_GUARDED_BY(mu_); 255 std::atomic<intptr_t> aba_token_{0}; 256 std::shared_ptr<ThreadPool> executor_; 257 std::shared_ptr<TimerManager> timer_manager_; 258 #ifdef GRPC_POSIX_SOCKET_TCP 259 std::shared_ptr<PosixEnginePollerManager> poller_manager_; 260 #endif // GRPC_POSIX_SOCKET_TCP 261 }; 262 263 } // namespace experimental 264 } // namespace grpc_event_engine 265 266 #endif // GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_POSIX_ENGINE_H 267