• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_WINDOWS_WINDOWS_ENGINE_H
15 #define GRPC_SRC_CORE_LIB_EVENT_ENGINE_WINDOWS_WINDOWS_ENGINE_H
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include "src/core/lib/iomgr/port.h"  // IWYU pragma: keep
20 
21 #ifdef GPR_WINDOWS
22 
23 #include <grpc/event_engine/endpoint_config.h>
24 #include <grpc/event_engine/event_engine.h>
25 #include <grpc/event_engine/memory_allocator.h>
26 #include <grpc/event_engine/slice_buffer.h>
27 
28 #include <memory>
29 
30 #include "absl/status/status.h"
31 #include "absl/status/statusor.h"
32 #include "absl/strings/string_view.h"
33 #include "src/core/lib/event_engine/ares_resolver.h"
34 #include "src/core/lib/event_engine/handle_containers.h"
35 #include "src/core/lib/event_engine/posix_engine/timer_manager.h"
36 #include "src/core/lib/event_engine/thread_pool/thread_pool.h"
37 #include "src/core/lib/event_engine/windows/iocp.h"
38 #include "src/core/lib/event_engine/windows/windows_endpoint.h"
39 #include "src/core/lib/surface/init_internally.h"
40 #include "src/core/util/sync.h"
41 #include "src/core/util/time.h"
42 
43 namespace grpc_event_engine {
44 namespace experimental {
45 
46 class WindowsEventEngine : public EventEngine,
47                            public grpc_core::KeepsGrpcInitialized {
48  public:
49   class WindowsDNSResolver : public EventEngine::DNSResolver {
     // EventEngine::DNSResolver implementation declared by WindowsEventEngine.
     //
     // The AresResolver-taking constructor and the ares_resolver_ member are
     // compiled only when GRPC_ARES == 1 and GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER
     // is defined. In other builds no constructor other than the deleted
     // default one is declared here.
50    public:
       // Default construction is disallowed.
51     WindowsDNSResolver() = delete;
52 #if GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
       // Receives the resolver by value as an OrphanablePtr.
       // NOTE(review): presumably moved into ares_resolver_ -- confirm in the
       // implementation file.
53     explicit WindowsDNSResolver(
54         grpc_core::OrphanablePtr<AresResolver> ares_resolver);
55 #endif  // GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
       // Overrides of the EventEngine::DNSResolver lookup methods. See the base
       // interface for the contract of each callback.
56     void LookupHostname(LookupHostnameCallback on_resolve,
57                         absl::string_view name,
58                         absl::string_view default_port) override;
59     void LookupSRV(LookupSRVCallback on_resolve,
60                    absl::string_view name) override;
61     void LookupTXT(LookupTXTCallback on_resolve,
62                    absl::string_view name) override;
63 
64 #if GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
65    private:
       // Only present in c-ares builds (same condition as the constructor).
66     grpc_core::OrphanablePtr<AresResolver> ares_resolver_;
67 #endif  // GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
68   };
69 
70   WindowsEventEngine();
71   ~WindowsEventEngine() override;
72 
     // EventEngine interface overrides. See the EventEngine base class for the
     // contract of each method.
73   absl::StatusOr<std::unique_ptr<Listener>> CreateListener(
74       Listener::AcceptCallback on_accept,
75       absl::AnyInvocable<void(absl::Status)> on_shutdown,
76       const EndpointConfig& config,
77       std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory)
78       override;
79 
80   ConnectionHandle Connect(OnConnectCallback on_connect,
81                            const ResolvedAddress& addr,
82                            const EndpointConfig& args,
83                            MemoryAllocator memory_allocator,
84                            Duration timeout) override;
85 
86   bool CancelConnect(ConnectionHandle handle) override;
87   bool IsWorkerThread() override;
88   absl::StatusOr<std::unique_ptr<DNSResolver>> GetDNSResolver(
89       const DNSResolver::ResolverOptions& options) override;
90   void Run(Closure* closure) override;
91   void Run(absl::AnyInvocable<void()> closure) override;
92   TaskHandle RunAfter(Duration when, Closure* closure) override;
93   TaskHandle RunAfter(Duration when,
94                       absl::AnyInvocable<void()> closure) override;
95   bool Cancel(TaskHandle handle) override;
96 
97   // Retrieve the base ThreadPool.
98   // This is public because most classes that know the concrete
99   // WindowsEventEngine type are effectively friends.
100   // Not intended for external use.
      //
      // Returns the raw pointer held by the thread_pool_ shared_ptr; no
      // ownership is shared with the caller.
thread_pool()101   ThreadPool* thread_pool() { return thread_pool_.get(); }
      // Retrieve the IOCP poller. Returns the address of the iocp_ member, so
      // the pointer is only meaningful while this engine is alive.
poller()102   IOCP* poller() { return &iocp_; }
103 
104  private:
105   // The state of an active connection.
106   //
107   // This object is managed by a shared_ptr, which is owned by:
108   //   1) the deadline timer callback (DeadlineTimerCallback), and
109   //   2) the on-connect callback (OnConnectedCallback).
      //
      // All data members are guarded by mu_. Most methods require the caller to
      // already hold that mutex, which is exposed through mu().
110   class ConnectionState : public std::enable_shared_from_this<ConnectionState> {
111    public:
112     ConnectionState(std::shared_ptr<WindowsEventEngine> engine,
113                     std::unique_ptr<WinSocket> socket,
114                     EventEngine::ResolvedAddress address,
115                     MemoryAllocator allocator,
116                     EventEngine::OnConnectCallback on_connect_user_callback);
117 
118     // Starts the deadline timer, and sets up the socket to notify on writes.
119     //
120     // This cannot be done in the constructor since shared_from_this is required
121     // for the callbacks to hold a ref to this object.
122     void Start(Duration timeout) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
123 
124     // Returns the user's callback and resets it to nullptr to ensure it only
125     // runs once.
126     OnConnectCallback TakeCallback() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
127 
128     // Create an Endpoint, transferring held object ownership to the endpoint.
129     //
130     // This can only be called once, and the connection state is no longer valid
131     // after an endpoint has been created. Callers must guarantee that the
132     // deadline timer callback will not be run.
133     std::unique_ptr<WindowsEndpoint> FinishConnectingAndMakeEndpoint(
134         ThreadPool* thread_pool) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
135 
136     // Release all refs to the on-connect callback.
137     void AbortOnConnect() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
138     // Release all refs to the deadline timer callback.
139     void AbortDeadlineTimer() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
140 
141     // TODO(hork): this is unsafe. Whatever needs the socket should likely
142     // delegate responsibility to this object.
        //
        // Returns a raw, non-owning pointer; socket_ keeps ownership.
socket()143     WinSocket* socket() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
144       return socket_.get();
145     }
146 
        // NOTE(review): connection_handle_ and timer_handle_ are declared
        // ABSL_GUARDED_BY(mu_), but the two accessors below carry no lock
        // annotation, unlike socket(). Confirm whether callers always hold mu_
        // and whether ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) should be added.
connection_handle()147     const EventEngine::ConnectionHandle& connection_handle() {
148       return connection_handle_;
149     }
timer_handle()150     const EventEngine::TaskHandle& timer_handle() { return timer_handle_; }
151 
        // Exposes the mutex so that callers can acquire it before invoking the
        // methods annotated with ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_).
mu()152     grpc_core::Mutex& mu() ABSL_LOCK_RETURNED(mu_) { return mu_; }
153 
154    private:
155     // Required for the custom operator<< overload to see the private
156     // ConnectionState internal state.
157     friend std::ostream& operator<<(std::ostream& out,
158                                     const ConnectionState& connection_state)
159         ABSL_EXCLUSIVE_LOCKS_REQUIRED(connection_state.mu_);
160 
161     // Stateful closure for the endpoint's on-connect callback.
162     //
163     // Once created, this closure must be Run or deleted to release the held
164     // refs.
165     class OnConnectedCallback : public EventEngine::Closure {
166      public:
          // Stores the raw engine pointer and takes a shared ref to the
          // connection state.
OnConnectedCallback(WindowsEventEngine * engine,std::shared_ptr<ConnectionState> connection_state)167       OnConnectedCallback(WindowsEventEngine* engine,
168                           std::shared_ptr<ConnectionState> connection_state)
169           : engine_(engine), connection_state_(std::move(connection_state)) {}
170 
171       // Runs the WindowsEventEngine's OnConnectCompleted if the deadline timer
172       // hasn't fired first.
173       void Run() override;
174 
175      private:
          // Raw pointer to the engine. ConnectionState::engine_ is documented
          // as keeping the engine alive long enough for this closure to run.
176       WindowsEventEngine* engine_;
177       std::shared_ptr<ConnectionState> connection_state_;
178     };
179 
180     // Stateful closure for the deadline timer.
181     //
182     // Once created, this closure must be Run or deleted to release the held
183     // refs.
184     class DeadlineTimerCallback : public EventEngine::Closure {
185      public:
          // Stores the raw engine pointer and takes a shared ref to the
          // connection state.
DeadlineTimerCallback(WindowsEventEngine * engine,std::shared_ptr<ConnectionState> connection_state)186       DeadlineTimerCallback(WindowsEventEngine* engine,
187                             std::shared_ptr<ConnectionState> connection_state)
188           : engine_(engine), connection_state_(std::move(connection_state)) {}
189 
190       // Runs the WindowsEventEngine's OnDeadlineTimerFired if the on-connect
191       // callback hasn't run first.
192       void Run() override;
193 
194      private:
          // Raw pointer to the engine. ConnectionState::engine_ is documented
          // as keeping the engine alive long enough for this closure to run.
195       WindowsEventEngine* engine_;
196       std::shared_ptr<ConnectionState> connection_state_;
197     };
198 
199     // Everything below is guarded by mu_.
        //
        // Lock ordering: when both are held, mu_ must be acquired before
        // WindowsEventEngine::connection_mu_.
200     grpc_core::Mutex mu_
201         ABSL_ACQUIRED_BEFORE(WindowsEventEngine::connection_mu_);
202     // Endpoint connection state.
203     std::unique_ptr<WinSocket> socket_ ABSL_GUARDED_BY(mu_);
204     EventEngine::ResolvedAddress address_ ABSL_GUARDED_BY(mu_);
205     MemoryAllocator allocator_ ABSL_GUARDED_BY(mu_);
206     EventEngine::OnConnectCallback on_connect_user_callback_
207         ABSL_GUARDED_BY(mu_);
208     // This guarantees the EventEngine survives long enough to execute the
209     // deadline timer or on-connect callbacks.
210     std::shared_ptr<WindowsEventEngine> engine_ ABSL_GUARDED_BY(mu_);
211     // Owned closures. These hold refs to this object, so each one keeps this
        // object alive until it is run or released (see AbortOnConnect and
        // AbortDeadlineTimer).
212     std::unique_ptr<OnConnectedCallback> on_connected_cb_ ABSL_GUARDED_BY(mu_);
213     std::unique_ptr<DeadlineTimerCallback> deadline_timer_cb_
214         ABSL_GUARDED_BY(mu_);
215     // Their respective method handles. Both start out as kInvalid.
216     EventEngine::ConnectionHandle connection_handle_ ABSL_GUARDED_BY(mu_) =
217         EventEngine::ConnectionHandle::kInvalid;
218     EventEngine::TaskHandle timer_handle_ ABSL_GUARDED_BY(mu_) =
219         EventEngine::TaskHandle::kInvalid;
220     // Flag to ensure that only one of the event closures will complete its
221     // responsibilities.
222     bool has_run_ ABSL_GUARDED_BY(mu_) = false;
223   };
224 
225   // Required for the custom operator<< overload to see the private
226   // ConnectionState type.
      //
      // NOTE(review): the matching friend declaration inside ConnectionState
      // carries ABSL_EXCLUSIVE_LOCKS_REQUIRED(connection_state.mu_); this one
      // has no lock annotation. Confirm that the difference is intentional.
227   friend std::ostream& operator<<(std::ostream& out,
228                                   const ConnectionState& connection_state);
229 
      // Forward declaration only; no definition is visible in this header.
230   struct TimerClosure;
231 
232   // A poll worker which schedules itself unless kicked.
      //
      // Implemented as an EventEngine::Closure; constructed with the ThreadPool
      // and IOCP it operates on.
233   class IOCPWorkClosure : public EventEngine::Closure {
234    public:
235     explicit IOCPWorkClosure(ThreadPool* thread_pool, IOCP* iocp);
236     void Run() override;
        // NOTE(review): presumably blocks on done_signal_ until the worker has
        // stopped -- confirm in the implementation file.
237     void WaitForShutdown();
238 
239    private:
        // Starts at 1.
        // NOTE(review): presumably the number of outstanding worker closures,
        // with done_signal_ notified once it drops to zero -- confirm.
240     std::atomic<int> workers_{1};
241     grpc_core::Notification done_signal_;
        // Raw pointers supplied to the constructor.
        // NOTE(review): presumably non-owning references to the engine's
        // thread pool and poller -- confirm.
242     ThreadPool* thread_pool_;
243     IOCP* iocp_;
244   };
245 
246   // Called via IOCP notifications when a connection is ready to be processed.
247   // Either this or the deadline timer will run, never both.
248   void OnConnectCompleted(std::shared_ptr<ConnectionState> state);
249 
250   // Called after a timeout when no connection has been established.
251   // Either this or the on-connect callback will run, never both.
252   void OnDeadlineTimerFired(std::shared_ptr<ConnectionState> state);
253 
254   // Variant of CancelConnect for use from within the deadline timer, where
255   // timer cancellation is not possible. Requires connection_state->mu().
256   bool CancelConnectFromDeadlineTimer(ConnectionState* connection_state)
257       ABSL_EXCLUSIVE_LOCKS_REQUIRED(connection_state->mu());
258 
259   // Completes the connection cancellation logic after checking handle
260   // validity and optionally cancelling deadline timers. Requires
      // connection_state->mu().
261   bool CancelConnectInternalStateLocked(ConnectionState* connection_state)
262       ABSL_EXCLUSIVE_LOCKS_REQUIRED(connection_state->mu());
263 
      // NOTE(review): presumably the shared implementation behind both RunAfter
      // overloads -- confirm in the implementation file.
264   EventEngine::TaskHandle RunAfterInternal(Duration when,
265                                            absl::AnyInvocable<void()> cb);
      // Guards known_handles_.
266   grpc_core::Mutex task_mu_;
267   TaskHandleSet known_handles_ ABSL_GUARDED_BY(task_mu_);
      // Guards known_connection_handles_. ConnectionState::mu_ is declared as
      // acquired before this mutex.
268   grpc_core::Mutex connection_mu_;
      // NOTE(review): presumably waited on together with connection_mu_ --
      // confirm usage in the implementation file.
269   grpc_core::CondVar connection_cv_;
270   ConnectionHandleSet known_connection_handles_ ABSL_GUARDED_BY(connection_mu_);
      // Atomic counter, initially 0.
      // NOTE(review): presumably used to make issued handles unique -- confirm.
271   std::atomic<intptr_t> aba_token_{0};
272 
      // Members are initialized in declaration order and destroyed in reverse.
      // IOCPWorkClosure's constructor takes a ThreadPool* and an IOCP*, so
      // iocp_worker_ is declared after thread_pool_ and iocp_; keep this order.
273   std::shared_ptr<ThreadPool> thread_pool_;
274   IOCP iocp_;
275   TimerManager timer_manager_;
276   IOCPWorkClosure iocp_worker_;
277 };
278 
279 }  // namespace experimental
280 }  // namespace grpc_event_engine
281 
282 #endif
283 
284 #endif  // GRPC_SRC_CORE_LIB_EVENT_ENGINE_WINDOWS_WINDOWS_ENGINE_H
285