1 // Copyright 2022 The gRPC Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 #ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_WINDOWS_WINDOWS_ENGINE_H 15 #define GRPC_SRC_CORE_LIB_EVENT_ENGINE_WINDOWS_WINDOWS_ENGINE_H 16 17 #include <grpc/support/port_platform.h> 18 19 #include "src/core/lib/iomgr/port.h" // IWYU pragma: keep 20 21 #ifdef GPR_WINDOWS 22 23 #include <grpc/event_engine/endpoint_config.h> 24 #include <grpc/event_engine/event_engine.h> 25 #include <grpc/event_engine/memory_allocator.h> 26 #include <grpc/event_engine/slice_buffer.h> 27 28 #include <memory> 29 30 #include "absl/status/status.h" 31 #include "absl/status/statusor.h" 32 #include "absl/strings/string_view.h" 33 #include "src/core/lib/event_engine/ares_resolver.h" 34 #include "src/core/lib/event_engine/handle_containers.h" 35 #include "src/core/lib/event_engine/posix_engine/timer_manager.h" 36 #include "src/core/lib/event_engine/thread_pool/thread_pool.h" 37 #include "src/core/lib/event_engine/windows/iocp.h" 38 #include "src/core/lib/event_engine/windows/windows_endpoint.h" 39 #include "src/core/lib/surface/init_internally.h" 40 #include "src/core/util/sync.h" 41 #include "src/core/util/time.h" 42 43 namespace grpc_event_engine { 44 namespace experimental { 45 46 class WindowsEventEngine : public EventEngine, 47 public grpc_core::KeepsGrpcInitialized { 48 public: 49 class WindowsDNSResolver : public EventEngine::DNSResolver { 50 public: 51 WindowsDNSResolver() = delete; 52 #if GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER) 53 explicit WindowsDNSResolver( 54 grpc_core::OrphanablePtr<AresResolver> ares_resolver); 55 #endif // GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER) 56 void LookupHostname(LookupHostnameCallback on_resolve, 57 absl::string_view name, 58 absl::string_view default_port) override; 59 void LookupSRV(LookupSRVCallback on_resolve, 60 absl::string_view name) override; 61 void LookupTXT(LookupTXTCallback on_resolve, 62 absl::string_view name) override; 63 64 #if GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER) 65 private: 66 grpc_core::OrphanablePtr<AresResolver> ares_resolver_; 67 #endif // GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER) 68 }; 69 70 WindowsEventEngine(); 71 ~WindowsEventEngine() override; 72 73 absl::StatusOr<std::unique_ptr<Listener>> CreateListener( 74 Listener::AcceptCallback on_accept, 75 absl::AnyInvocable<void(absl::Status)> on_shutdown, 76 const EndpointConfig& config, 77 std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory) 78 override; 79 80 ConnectionHandle Connect(OnConnectCallback on_connect, 81 const ResolvedAddress& addr, 82 const EndpointConfig& args, 83 MemoryAllocator memory_allocator, 84 Duration timeout) override; 85 86 bool CancelConnect(ConnectionHandle handle) override; 87 bool IsWorkerThread() override; 88 absl::StatusOr<std::unique_ptr<DNSResolver>> GetDNSResolver( 89 const DNSResolver::ResolverOptions& options) override; 90 void Run(Closure* closure) override; 91 void Run(absl::AnyInvocable<void()> closure) override; 92 TaskHandle RunAfter(Duration when, Closure* closure) override; 93 TaskHandle RunAfter(Duration when, 94 absl::AnyInvocable<void()> closure) override; 95 bool Cancel(TaskHandle handle) override; 96 97 // Retrieve the base ThreadPool. 98 // This is public because most classes that know the concrete 99 // WindowsEventEngine type are effectively friends. 100 // Not intended for external use. thread_pool()101 ThreadPool* thread_pool() { return thread_pool_.get(); } poller()102 IOCP* poller() { return &iocp_; } 103 104 private: 105 // The state of an active connection. 106 // 107 // This object is managed by a shared_ptr, which is owned by: 108 // 1) the deadline timer callback, and 109 // 2) the OnConnectCompleted callback. 110 class ConnectionState : public std::enable_shared_from_this<ConnectionState> { 111 public: 112 ConnectionState(std::shared_ptr<WindowsEventEngine> engine, 113 std::unique_ptr<WinSocket> socket, 114 EventEngine::ResolvedAddress address, 115 MemoryAllocator allocator, 116 EventEngine::OnConnectCallback on_connect_user_callback); 117 118 // Starts the deadline timer, and sets up the socket to notify on writes. 119 // 120 // This cannot be done in the constructor since shared_from_this is required 121 // for the callbacks to hold a ref to this object. 122 void Start(Duration timeout) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); 123 124 // Returns the user's callback and resets it to nullptr to ensure it only 125 // runs once. 126 OnConnectCallback TakeCallback() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); 127 128 // Create an Endpoint, transferring held object ownership to the endpoint. 129 // 130 // This can only be called once, and the connection state is no longer valid 131 // after an endpoint has been created. Callers must guarantee that the 132 // deadline timer callback will not be run. 133 std::unique_ptr<WindowsEndpoint> FinishConnectingAndMakeEndpoint( 134 ThreadPool* thread_pool) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); 135 136 // Release all refs to the on-connect callback. 137 void AbortOnConnect() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); 138 // Release all refs to the deadline timer callback. 139 void AbortDeadlineTimer() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); 140 141 // TODO(hork): this is unsafe. Whatever needs the socket should likely 142 // delegate responsibility to this object. socket()143 WinSocket* socket() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { 144 return socket_.get(); 145 } 146 connection_handle()147 const EventEngine::ConnectionHandle& connection_handle() { 148 return connection_handle_; 149 } timer_handle()150 const EventEngine::TaskHandle& timer_handle() { return timer_handle_; } 151 mu()152 grpc_core::Mutex& mu() ABSL_LOCK_RETURNED(mu_) { return mu_; } 153 154 private: 155 // Required for the custom operator<< overload to see the private 156 // ConnectionState internal state. 157 friend std::ostream& operator<<(std::ostream& out, 158 const ConnectionState& connection_state) 159 ABSL_EXCLUSIVE_LOCKS_REQUIRED(connection_state.mu_); 160 161 // Stateful closure for the endpoint's on-connect callback. 162 // 163 // Once created, this closure must be Run or deleted to release the held 164 // refs. 165 class OnConnectedCallback : public EventEngine::Closure { 166 public: OnConnectedCallback(WindowsEventEngine * engine,std::shared_ptr<ConnectionState> connection_state)167 OnConnectedCallback(WindowsEventEngine* engine, 168 std::shared_ptr<ConnectionState> connection_state) 169 : engine_(engine), connection_state_(std::move(connection_state)) {} 170 171 // Runs the WindowsEventEngine's OnConnectCompleted if the deadline timer 172 // hasn't fired first. 173 void Run() override; 174 175 private: 176 WindowsEventEngine* engine_; 177 std::shared_ptr<ConnectionState> connection_state_; 178 }; 179 180 // Stateful closure for the deadline timer. 181 // 182 // Once created, this closure must be Run or deleted to release the held 183 // refs. 184 class DeadlineTimerCallback : public EventEngine::Closure { 185 public: DeadlineTimerCallback(WindowsEventEngine * engine,std::shared_ptr<ConnectionState> connection_state)186 DeadlineTimerCallback(WindowsEventEngine* engine, 187 std::shared_ptr<ConnectionState> connection_state) 188 : engine_(engine), connection_state_(std::move(connection_state)) {} 189 190 // Runs the WindowsEventEngine's OnDeadlineTimerFired if the deadline 191 // timer hasn't fired first. 192 void Run() override; 193 194 private: 195 WindowsEventEngine* engine_; 196 std::shared_ptr<ConnectionState> connection_state_; 197 }; 198 199 // everything is guarded by mu_; 200 grpc_core::Mutex mu_ 201 ABSL_ACQUIRED_BEFORE(WindowsEventEngine::connection_mu_); 202 // Endpoint connection state. 203 std::unique_ptr<WinSocket> socket_ ABSL_GUARDED_BY(mu_); 204 EventEngine::ResolvedAddress address_ ABSL_GUARDED_BY(mu_); 205 MemoryAllocator allocator_ ABSL_GUARDED_BY(mu_); 206 EventEngine::OnConnectCallback on_connect_user_callback_ 207 ABSL_GUARDED_BY(mu_); 208 // This guarantees the EventEngine survives long enough to execute these 209 // deadline timer or on-connect callbacks. 210 std::shared_ptr<WindowsEventEngine> engine_ ABSL_GUARDED_BY(mu_); 211 // Owned closures. These hold refs to this object. 212 std::unique_ptr<OnConnectedCallback> on_connected_cb_ ABSL_GUARDED_BY(mu_); 213 std::unique_ptr<DeadlineTimerCallback> deadline_timer_cb_ 214 ABSL_GUARDED_BY(mu_); 215 // Their respective method handles. 216 EventEngine::ConnectionHandle connection_handle_ ABSL_GUARDED_BY(mu_) = 217 EventEngine::ConnectionHandle::kInvalid; 218 EventEngine::TaskHandle timer_handle_ ABSL_GUARDED_BY(mu_) = 219 EventEngine::TaskHandle::kInvalid; 220 // Flag to ensure that only one of the even closures will complete its 221 // responsibilities. 222 bool has_run_ ABSL_GUARDED_BY(mu_) = false; 223 }; 224 225 // Required for the custom operator<< overload to see the private 226 // ConnectionState type. 227 friend std::ostream& operator<<(std::ostream& out, 228 const ConnectionState& connection_state); 229 230 struct TimerClosure; 231 232 // A poll worker which schedules itself unless kicked 233 class IOCPWorkClosure : public EventEngine::Closure { 234 public: 235 explicit IOCPWorkClosure(ThreadPool* thread_pool, IOCP* iocp); 236 void Run() override; 237 void WaitForShutdown(); 238 239 private: 240 std::atomic<int> workers_{1}; 241 grpc_core::Notification done_signal_; 242 ThreadPool* thread_pool_; 243 IOCP* iocp_; 244 }; 245 246 // Called via IOCP notifications when a connection is ready to be processed. 247 // Either this or the deadline timer will run, never both. 248 void OnConnectCompleted(std::shared_ptr<ConnectionState> state); 249 250 // Called after a timeout when no connection has been established. 251 // Either this or the on-connect callback will run, never both. 252 void OnDeadlineTimerFired(std::shared_ptr<ConnectionState> state); 253 254 // CancelConnect, called from within the deadline timer. 255 // Timer cancellation is not possible. 256 bool CancelConnectFromDeadlineTimer(ConnectionState* connection_state) 257 ABSL_EXCLUSIVE_LOCKS_REQUIRED(connection_state->mu()); 258 259 // Completes the connection cancellation logic after checking handle 260 // validity and optionally cancelling deadline timers. 261 bool CancelConnectInternalStateLocked(ConnectionState* connection_state) 262 ABSL_EXCLUSIVE_LOCKS_REQUIRED(connection_state->mu()); 263 264 EventEngine::TaskHandle RunAfterInternal(Duration when, 265 absl::AnyInvocable<void()> cb); 266 grpc_core::Mutex task_mu_; 267 TaskHandleSet known_handles_ ABSL_GUARDED_BY(task_mu_); 268 grpc_core::Mutex connection_mu_; 269 grpc_core::CondVar connection_cv_; 270 ConnectionHandleSet known_connection_handles_ ABSL_GUARDED_BY(connection_mu_); 271 std::atomic<intptr_t> aba_token_{0}; 272 273 std::shared_ptr<ThreadPool> thread_pool_; 274 IOCP iocp_; 275 TimerManager timer_manager_; 276 IOCPWorkClosure iocp_worker_; 277 }; 278 279 } // namespace experimental 280 } // namespace grpc_event_engine 281 282 #endif 283 284 #endif // GRPC_SRC_CORE_LIB_EVENT_ENGINE_WINDOWS_WINDOWS_ENGINE_H 285