1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include <grpc/support/port_platform.h>
15
16 #ifdef GPR_WINDOWS
17
18 #include <grpc/event_engine/endpoint_config.h>
19 #include <grpc/event_engine/event_engine.h>
20 #include <grpc/event_engine/memory_allocator.h>
21 #include <grpc/event_engine/slice_buffer.h>
22 #include <grpc/support/cpu.h>
23
24 #include <memory>
25 #include <ostream>
26
27 #include "absl/log/check.h"
28 #include "absl/log/log.h"
29 #include "absl/status/status.h"
30 #include "absl/status/statusor.h"
31 #include "absl/strings/string_view.h"
32 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
33 #include "src/core/lib/event_engine/common_closures.h"
34 #include "src/core/lib/event_engine/handle_containers.h"
35 #include "src/core/lib/event_engine/posix_engine/timer_manager.h"
36 #include "src/core/lib/event_engine/tcp_socket_utils.h"
37 #include "src/core/lib/event_engine/thread_pool/thread_pool.h"
38 #include "src/core/lib/event_engine/utils.h"
39 #include "src/core/lib/event_engine/windows/grpc_polled_fd_windows.h"
40 #include "src/core/lib/event_engine/windows/iocp.h"
41 #include "src/core/lib/event_engine/windows/native_windows_dns_resolver.h"
42 #include "src/core/lib/event_engine/windows/windows_endpoint.h"
43 #include "src/core/lib/event_engine/windows/windows_engine.h"
44 #include "src/core/lib/event_engine/windows/windows_listener.h"
45 #include "src/core/lib/iomgr/error.h"
46 #include "src/core/lib/surface/init_internally.h"
47 #include "src/core/util/crash.h"
48 #include "src/core/util/dump_args.h"
49 #include "src/core/util/sync.h"
50 #include "src/core/util/time.h"
51
52 namespace grpc_event_engine {
53 namespace experimental {
54
operator <<(std::ostream & out,const WindowsEventEngine::ConnectionState & connection_state)55 std::ostream& operator<<(
56 std::ostream& out,
57 const WindowsEventEngine::ConnectionState& connection_state) {
58 out << "ConnectionState::" << &connection_state
59 << ": connection_state.address="
60 << ResolvedAddressToURI(connection_state.address_) << ","
61 << GRPC_DUMP_ARGS(connection_state.has_run_,
62 connection_state.connection_handle_,
63 connection_state.timer_handle_);
64 return out;
65 }
66
67 // ---- ConnectionState ----
68
ConnectionState(std::shared_ptr<WindowsEventEngine> engine,std::unique_ptr<WinSocket> socket,EventEngine::ResolvedAddress address,MemoryAllocator allocator,EventEngine::OnConnectCallback on_connect_user_callback)69 WindowsEventEngine::ConnectionState::ConnectionState(
70 std::shared_ptr<WindowsEventEngine> engine,
71 std::unique_ptr<WinSocket> socket, EventEngine::ResolvedAddress address,
72 MemoryAllocator allocator,
73 EventEngine::OnConnectCallback on_connect_user_callback)
74 : socket_(std::move(socket)),
75 address_(address),
76 allocator_(std::move(allocator)),
77 on_connect_user_callback_(std::move(on_connect_user_callback)),
78 engine_(std::move(engine)) {
79 CHECK(socket_ != nullptr);
80 connection_handle_ = ConnectionHandle{reinterpret_cast<intptr_t>(this),
81 engine_->aba_token_.fetch_add(1)};
82 }
83
Start(Duration timeout)84 void WindowsEventEngine::ConnectionState::Start(Duration timeout) {
85 on_connected_cb_ =
86 std::make_unique<OnConnectedCallback>(engine_.get(), shared_from_this());
87 socket_->NotifyOnWrite(on_connected_cb_.get());
88 deadline_timer_cb_ = std::make_unique<DeadlineTimerCallback>(
89 engine_.get(), shared_from_this());
90 timer_handle_ = engine_->RunAfter(timeout, deadline_timer_cb_.get());
91 }
92
93 EventEngine::OnConnectCallback
TakeCallback()94 WindowsEventEngine::ConnectionState::TakeCallback() {
95 return std::exchange(on_connect_user_callback_, nullptr);
96 }
97
98 std::unique_ptr<WindowsEndpoint>
FinishConnectingAndMakeEndpoint(ThreadPool * thread_pool)99 WindowsEventEngine::ConnectionState::FinishConnectingAndMakeEndpoint(
100 ThreadPool* thread_pool) {
101 ChannelArgsEndpointConfig cfg;
102 return std::make_unique<WindowsEndpoint>(address_, std::move(socket_),
103 std::move(allocator_), cfg,
104 thread_pool, engine_);
105 }
106
AbortOnConnect()107 void WindowsEventEngine::ConnectionState::AbortOnConnect() {
108 on_connected_cb_.reset();
109 }
110
AbortDeadlineTimer()111 void WindowsEventEngine::ConnectionState::AbortDeadlineTimer() {
112 deadline_timer_cb_.reset();
113 }
114
Run()115 void WindowsEventEngine::ConnectionState::OnConnectedCallback::Run() {
116 DCHECK_NE(connection_state_, nullptr)
117 << "ConnectionState::OnConnectedCallback::" << this
118 << " has already run. It should only ever run once.";
119 bool has_run;
120 {
121 grpc_core::MutexLock lock(&connection_state_->mu_);
122 has_run = std::exchange(connection_state_->has_run_, true);
123 }
124 // This could race with the deadline timer. If so, the engine's
125 // OnConnectCompleted callback should not run, and the refs should be
126 // released.
127 if (has_run) {
128 connection_state_.reset();
129 return;
130 }
131 engine_->OnConnectCompleted(std::move(connection_state_));
132 }
133
Run()134 void WindowsEventEngine::ConnectionState::DeadlineTimerCallback::Run() {
135 DCHECK_NE(connection_state_, nullptr)
136 << "ConnectionState::DeadlineTimerCallback::" << this
137 << " has already run. It should only ever run once.";
138 bool has_run;
139 {
140 grpc_core::MutexLock lock(&connection_state_->mu_);
141 has_run = std::exchange(connection_state_->has_run_, true);
142 }
143 // This could race with the on connected callback. If so, the engine's
144 // OnDeadlineTimerFired callback should not run, and the refs should be
145 // released.
146 if (has_run) {
147 connection_state_.reset();
148 return;
149 }
150 engine_->OnDeadlineTimerFired(std::move(connection_state_));
151 }
152
153 // ---- IOCPWorkClosure ----
154
IOCPWorkClosure(ThreadPool * thread_pool,IOCP * iocp)155 WindowsEventEngine::IOCPWorkClosure::IOCPWorkClosure(ThreadPool* thread_pool,
156 IOCP* iocp)
157 : thread_pool_(thread_pool), iocp_(iocp) {
158 thread_pool_->Run(this);
159 }
160
Run()161 void WindowsEventEngine::IOCPWorkClosure::Run() {
162 if (done_signal_.HasBeenNotified()) return;
163 auto result = iocp_->Work(std::chrono::seconds(60), [this] {
164 workers_.fetch_add(1);
165 thread_pool_->Run(this);
166 });
167 if (result == Poller::WorkResult::kDeadlineExceeded) {
168 // iocp received no messages. restart the worker
169 workers_.fetch_add(1);
170 thread_pool_->Run(this);
171 }
172 if (workers_.fetch_sub(1) == 1) done_signal_.Notify();
173 }
174
WaitForShutdown()175 void WindowsEventEngine::IOCPWorkClosure::WaitForShutdown() {
176 done_signal_.WaitForNotification();
177 }
178
179 // ---- WindowsEventEngine ----
180
181 // TODO(hork): The iomgr timer and execution engine can be reused. It should
182 // be separated out from the posix_engine and instantiated as components. It is
183 // effectively copied below.
184
185 struct WindowsEventEngine::TimerClosure final : public EventEngine::Closure {
186 absl::AnyInvocable<void()> cb;
187 Timer timer;
188 WindowsEventEngine* engine;
189 EventEngine::TaskHandle handle;
190
Rungrpc_event_engine::experimental::WindowsEventEngine::TimerClosure191 void Run() override {
192 GRPC_TRACE_LOG(event_engine, INFO)
193 << "WindowsEventEngine:" << engine << " executing callback:" << handle;
194 {
195 grpc_core::MutexLock lock(&engine->task_mu_);
196 engine->known_handles_.erase(handle);
197 }
198 cb();
199 delete this;
200 }
201 };
202
WindowsEventEngine()203 WindowsEventEngine::WindowsEventEngine()
204 : thread_pool_(
205 MakeThreadPool(grpc_core::Clamp(gpr_cpu_num_cores(), 4u, 16u))),
206 iocp_(thread_pool_.get()),
207 timer_manager_(thread_pool_),
208 iocp_worker_(thread_pool_.get(), &iocp_) {
209 WSADATA wsaData;
210 int status = WSAStartup(MAKEWORD(2, 0), &wsaData);
211 CHECK_EQ(status, 0);
212 }
213
~WindowsEventEngine()214 WindowsEventEngine::~WindowsEventEngine() {
215 GRPC_TRACE_LOG(event_engine, INFO) << "~WindowsEventEngine::" << this;
216 {
217 task_mu_.Lock();
218 if (!known_handles_.empty()) {
219 if (GRPC_TRACE_FLAG_ENABLED(event_engine)) {
220 for (auto handle : known_handles_) {
221 LOG(ERROR) << "WindowsEventEngine:" << this
222 << " uncleared TaskHandle at shutdown:"
223 << HandleToString<EventEngine::TaskHandle>(handle);
224 }
225 }
226 // Allow a small grace period for timers to be run before shutting down.
227 auto deadline =
228 timer_manager_.Now() + grpc_core::Duration::FromSecondsAsDouble(10);
229 while (!known_handles_.empty() && timer_manager_.Now() < deadline) {
230 if (GRPC_TRACE_FLAG_ENABLED(event_engine)) {
231 VLOG_EVERY_N_SEC(2, 1) << "Waiting for timers. "
232 << known_handles_.size() << " remaining";
233 }
234 task_mu_.Unlock();
235 absl::SleepFor(absl::Milliseconds(200));
236 task_mu_.Lock();
237 }
238 }
239 CHECK(GPR_LIKELY(known_handles_.empty()));
240 task_mu_.Unlock();
241 }
242 iocp_.Kick();
243 iocp_worker_.WaitForShutdown();
244 iocp_.Shutdown();
245 CHECK_EQ(WSACleanup(), 0);
246 timer_manager_.Shutdown();
247 thread_pool_->Quiesce();
248 }
249
Cancel(EventEngine::TaskHandle handle)250 bool WindowsEventEngine::Cancel(EventEngine::TaskHandle handle) {
251 grpc_core::MutexLock lock(&task_mu_);
252 if (!known_handles_.contains(handle)) return false;
253 GRPC_TRACE_LOG(event_engine, INFO)
254 << "WindowsEventEngine::" << this << " cancelling " << handle;
255 auto* cd = reinterpret_cast<TimerClosure*>(handle.keys[0]);
256 bool r = timer_manager_.TimerCancel(&cd->timer);
257 known_handles_.erase(handle);
258 if (r) delete cd;
259 return r;
260 }
261
RunAfter(Duration when,absl::AnyInvocable<void ()> closure)262 EventEngine::TaskHandle WindowsEventEngine::RunAfter(
263 Duration when, absl::AnyInvocable<void()> closure) {
264 return RunAfterInternal(when, std::move(closure));
265 }
266
RunAfter(Duration when,EventEngine::Closure * closure)267 EventEngine::TaskHandle WindowsEventEngine::RunAfter(
268 Duration when, EventEngine::Closure* closure) {
269 return RunAfterInternal(when, [closure]() { closure->Run(); });
270 }
271
Run(absl::AnyInvocable<void ()> closure)272 void WindowsEventEngine::Run(absl::AnyInvocable<void()> closure) {
273 thread_pool_->Run(std::move(closure));
274 }
275
Run(EventEngine::Closure * closure)276 void WindowsEventEngine::Run(EventEngine::Closure* closure) {
277 thread_pool_->Run(closure);
278 }
279
RunAfterInternal(Duration when,absl::AnyInvocable<void ()> cb)280 EventEngine::TaskHandle WindowsEventEngine::RunAfterInternal(
281 Duration when, absl::AnyInvocable<void()> cb) {
282 auto when_ts = ToTimestamp(timer_manager_.Now(), when);
283 auto* cd = new TimerClosure;
284 cd->cb = std::move(cb);
285 cd->engine = this;
286 EventEngine::TaskHandle handle{reinterpret_cast<intptr_t>(cd),
287 aba_token_.fetch_add(1)};
288 grpc_core::MutexLock lock(&task_mu_);
289 known_handles_.insert(handle);
290 cd->handle = handle;
291 GRPC_TRACE_LOG(event_engine, INFO)
292 << "WindowsEventEngine:" << this << " scheduling callback:" << handle;
293 timer_manager_.TimerInit(&cd->timer, when_ts, cd);
294 return handle;
295 }
296
297 #if GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
298
WindowsDNSResolver(grpc_core::OrphanablePtr<AresResolver> ares_resolver)299 WindowsEventEngine::WindowsDNSResolver::WindowsDNSResolver(
300 grpc_core::OrphanablePtr<AresResolver> ares_resolver)
301 : ares_resolver_(std::move(ares_resolver)) {}
302
LookupHostname(LookupHostnameCallback on_resolve,absl::string_view name,absl::string_view default_port)303 void WindowsEventEngine::WindowsDNSResolver::LookupHostname(
304 LookupHostnameCallback on_resolve, absl::string_view name,
305 absl::string_view default_port) {
306 ares_resolver_->LookupHostname(std::move(on_resolve), name, default_port);
307 }
308
LookupSRV(LookupSRVCallback on_resolve,absl::string_view name)309 void WindowsEventEngine::WindowsDNSResolver::LookupSRV(
310 LookupSRVCallback on_resolve, absl::string_view name) {
311 ares_resolver_->LookupSRV(std::move(on_resolve), name);
312 }
313
LookupTXT(LookupTXTCallback on_resolve,absl::string_view name)314 void WindowsEventEngine::WindowsDNSResolver::LookupTXT(
315 LookupTXTCallback on_resolve, absl::string_view name) {
316 ares_resolver_->LookupTXT(std::move(on_resolve), name);
317 }
318
319 #endif // GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
320
321 absl::StatusOr<std::unique_ptr<EventEngine::DNSResolver>>
GetDNSResolver(EventEngine::DNSResolver::ResolverOptions const & options)322 WindowsEventEngine::GetDNSResolver(
323 EventEngine::DNSResolver::ResolverOptions const& options) {
324 if (ShouldUseAresDnsResolver()) {
325 #if GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
326 GRPC_TRACE_LOG(event_engine_dns, INFO)
327 << "WindowsEventEngine::" << this << " creating AresResolver";
328 auto ares_resolver = AresResolver::CreateAresResolver(
329 options.dns_server,
330 std::make_unique<GrpcPolledFdFactoryWindows>(poller()),
331 shared_from_this());
332 if (!ares_resolver.ok()) {
333 return ares_resolver.status();
334 }
335 return std::make_unique<WindowsEventEngine::WindowsDNSResolver>(
336 std::move(*ares_resolver));
337 #endif // GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
338 }
339 GRPC_TRACE_LOG(event_engine_dns, INFO)
340 << "WindowsEventEngine::" << this << " creating NativeWindowsDNSResolver";
341 return std::make_unique<NativeWindowsDNSResolver>(shared_from_this());
342 }
343
IsWorkerThread()344 bool WindowsEventEngine::IsWorkerThread() { grpc_core::Crash("unimplemented"); }
345
OnConnectCompleted(std::shared_ptr<ConnectionState> state)346 void WindowsEventEngine::OnConnectCompleted(
347 std::shared_ptr<ConnectionState> state) {
348 absl::StatusOr<std::unique_ptr<WindowsEndpoint>> endpoint;
349 EventEngine::OnConnectCallback cb;
350 {
351 // Connection attempt complete!
352 grpc_core::MutexLock lock(&state->mu());
353 // return early if we cannot cancel the connection timeout timer.
354 int erased_handles = 0;
355 {
356 grpc_core::MutexLock handle_lock(&connection_mu_);
357 erased_handles =
358 known_connection_handles_.erase(state->connection_handle());
359 }
360 if (erased_handles != 1 || !Cancel(state->timer_handle())) {
361 GRPC_TRACE_LOG(event_engine, INFO)
362 << "Not accepting connection since the deadline timer has fired";
363 return;
364 }
365 // Release refs held by the deadline timer.
366 state->AbortDeadlineTimer();
367 const auto& overlapped_result = state->socket()->write_info()->result();
368 if (!overlapped_result.error_status.ok()) {
369 state->socket()->Shutdown(DEBUG_LOCATION, "ConnectEx failure");
370 endpoint = overlapped_result.error_status;
371 } else if (overlapped_result.wsa_error != 0) {
372 state->socket()->Shutdown(DEBUG_LOCATION, "ConnectEx failure");
373 endpoint = GRPC_WSA_ERROR(overlapped_result.wsa_error, "ConnectEx");
374 } else {
375 endpoint = state->FinishConnectingAndMakeEndpoint(thread_pool_.get());
376 }
377 cb = state->TakeCallback();
378 }
379 // This code should be running in a thread pool thread already, so the
380 // callback can be run directly.
381 state.reset();
382 cb(std::move(endpoint));
383 }
384
OnDeadlineTimerFired(std::shared_ptr<ConnectionState> connection_state)385 void WindowsEventEngine::OnDeadlineTimerFired(
386 std::shared_ptr<ConnectionState> connection_state) {
387 bool cancelled = false;
388 EventEngine::OnConnectCallback cb;
389 {
390 grpc_core::MutexLock lock(&connection_state->mu());
391 cancelled = CancelConnectFromDeadlineTimer(connection_state.get());
392 if (cancelled) cb = connection_state->TakeCallback();
393 }
394 if (cancelled) {
395 connection_state.reset();
396 cb(absl::DeadlineExceededError("Connection timed out"));
397 }
398 }
399
Connect(OnConnectCallback on_connect,const ResolvedAddress & addr,const EndpointConfig &,MemoryAllocator memory_allocator,Duration timeout)400 EventEngine::ConnectionHandle WindowsEventEngine::Connect(
401 OnConnectCallback on_connect, const ResolvedAddress& addr,
402 const EndpointConfig& /* args */, MemoryAllocator memory_allocator,
403 Duration timeout) {
404 // TODO(hork): utilize the endpoint config
405 absl::Status status;
406 int istatus;
407 auto uri = ResolvedAddressToURI(addr);
408 if (!uri.ok()) {
409 Run([on_connect = std::move(on_connect), status = uri.status()]() mutable {
410 on_connect(status);
411 });
412 return EventEngine::ConnectionHandle::kInvalid;
413 }
414 GRPC_TRACE_LOG(event_engine, INFO)
415 << "EventEngine::" << this << " connecting to " << *uri;
416 // Use dualstack sockets where available.
417 ResolvedAddress address = addr;
418 ResolvedAddress addr6_v4mapped;
419 if (ResolvedAddressToV4Mapped(addr, &addr6_v4mapped)) {
420 address = addr6_v4mapped;
421 }
422 const int addr_family =
423 (address.address()->sa_family == AF_UNIX) ? AF_UNIX : AF_INET6;
424 const int protocol = addr_family == AF_UNIX ? 0 : IPPROTO_TCP;
425 SOCKET sock = WSASocket(addr_family, SOCK_STREAM, protocol, nullptr, 0,
426 IOCP::GetDefaultSocketFlags());
427 if (sock == INVALID_SOCKET) {
428 Run([on_connect = std::move(on_connect),
429 status = GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket")]() mutable {
430 on_connect(status);
431 });
432 return EventEngine::ConnectionHandle::kInvalid;
433 }
434 if (addr_family == AF_UNIX) {
435 status = SetSocketNonBlock(sock);
436 } else {
437 status = PrepareSocket(sock);
438 }
439 if (!status.ok()) {
440 Run([on_connect = std::move(on_connect), status]() mutable {
441 on_connect(status);
442 });
443 return EventEngine::ConnectionHandle::kInvalid;
444 }
445 // Grab the function pointer for ConnectEx for that specific socket It may
446 // change depending on the interface.
447 LPFN_CONNECTEX ConnectEx;
448 GUID guid = WSAID_CONNECTEX;
449 DWORD ioctl_num_bytes;
450 istatus = WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid,
451 sizeof(guid), &ConnectEx, sizeof(ConnectEx),
452 &ioctl_num_bytes, nullptr, nullptr);
453 if (istatus != 0) {
454 Run([on_connect = std::move(on_connect),
455 status = GRPC_WSA_ERROR(
456 WSAGetLastError(),
457 "WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER)")]() mutable {
458 on_connect(status);
459 });
460 return EventEngine::ConnectionHandle::kInvalid;
461 }
462 // bind the local address
463 ResolvedAddress local_address;
464 if (addr_family == AF_UNIX) {
465 // For ConnectEx() to work for AF_UNIX, the sock needs to be bound to
466 // the local address of an unnamed socket.
467 sockaddr addr = {};
468 addr.sa_family = AF_UNIX;
469 local_address = ResolvedAddress(&addr, sizeof(addr));
470 } else {
471 local_address = ResolvedAddressMakeWild6(0);
472 }
473 istatus = bind(sock, local_address.address(), local_address.size());
474 if (istatus != 0) {
475 Run([on_connect = std::move(on_connect),
476 status = GRPC_WSA_ERROR(WSAGetLastError(), "bind")]() mutable {
477 on_connect(status);
478 });
479 return EventEngine::ConnectionHandle::kInvalid;
480 }
481 // Prepare the socket to receive a connection
482 auto connection_state = std::make_shared<ConnectionState>(
483 std::static_pointer_cast<WindowsEventEngine>(shared_from_this()),
484 /*socket=*/iocp_.Watch(sock), address,
485 /*memory_allocator=*/std::move(memory_allocator),
486 /*on_connect_user_callback=*/std::move(on_connect));
487 grpc_core::MutexLock lock(&connection_state->mu());
488 auto* info = connection_state->socket()->write_info();
489 {
490 grpc_core::MutexLock connection_handle_lock(&connection_mu_);
491 known_connection_handles_.insert(connection_state->connection_handle());
492 }
493 connection_state->Start(timeout);
494 bool success =
495 ConnectEx(connection_state->socket()->raw_socket(), address.address(),
496 address.size(), nullptr, 0, nullptr, info->overlapped());
497 // It wouldn't be unusual to get a success immediately. But we'll still get an
498 // IOCP notification, so let's ignore it.
499 if (success) return connection_state->connection_handle();
500 // Otherwise, we need to handle an error or a pending IO Event.
501 int last_error = WSAGetLastError();
502 if (last_error == ERROR_IO_PENDING) {
503 // Overlapped I/O operation is in progress.
504 return connection_state->connection_handle();
505 }
506 // Time to abort the connection.
507 // The on-connect callback won't run, so we must clean up its state.
508 connection_state->AbortOnConnect();
509 int erased_handles = 0;
510 {
511 grpc_core::MutexLock connection_handle_lock(&connection_mu_);
512 erased_handles =
513 known_connection_handles_.erase(connection_state->connection_handle());
514 }
515 CHECK_EQ(erased_handles, 1) << "Did not find connection handle "
516 << connection_state->connection_handle()
517 << " after a synchronous connection failure. "
518 "This should not be possible.";
519 connection_state->socket()->Shutdown(DEBUG_LOCATION, "ConnectEx");
520 if (!Cancel(connection_state->timer_handle())) {
521 // The deadline timer will run, or is running.
522 return EventEngine::ConnectionHandle::kInvalid;
523 }
524 // The deadline timer won't run, so we must clean up its state.
525 connection_state->AbortDeadlineTimer();
526 Run([connection_state = std::move(connection_state),
527 status = GRPC_WSA_ERROR(WSAGetLastError(), "ConnectEx")]() mutable {
528 EventEngine::OnConnectCallback cb;
529 {
530 grpc_core::MutexLock lock(&connection_state->mu());
531 cb = connection_state->TakeCallback();
532 }
533 connection_state.reset();
534 cb(std::move(status));
535 });
536 return EventEngine::ConnectionHandle::kInvalid;
537 }
538
CancelConnect(EventEngine::ConnectionHandle handle)539 bool WindowsEventEngine::CancelConnect(EventEngine::ConnectionHandle handle) {
540 if (handle == EventEngine::ConnectionHandle::kInvalid) {
541 GRPC_TRACE_LOG(event_engine, INFO)
542 << "Attempted to cancel an invalid connection handle";
543 return false;
544 }
545 // Erase the connection handle, which may be unknown
546 {
547 grpc_core::MutexLock lock(&connection_mu_);
548 if (known_connection_handles_.erase(handle) != 1) {
549 GRPC_TRACE_LOG(event_engine, INFO)
550 << "Unknown connection handle: " << handle;
551 return false;
552 }
553 }
554 auto* connection_state = reinterpret_cast<ConnectionState*>(handle.keys[0]);
555 grpc_core::MutexLock state_lock(&connection_state->mu());
556 // The connection cannot be cancelled if the deadline timer is already firing.
557 if (!Cancel(connection_state->timer_handle())) return false;
558 // The deadline timer was cancelled, so we must clean up its state.
559 connection_state->AbortDeadlineTimer();
560 // The on-connect callback will run when the socket shutdown event occurs.
561 return CancelConnectInternalStateLocked(connection_state);
562 }
563
CancelConnectFromDeadlineTimer(ConnectionState * connection_state)564 bool WindowsEventEngine::CancelConnectFromDeadlineTimer(
565 ConnectionState* connection_state) {
566 // Erase the connection handle, which is guaranteed to exist.
567 {
568 grpc_core::MutexLock lock(&connection_mu_);
569 if (known_connection_handles_.erase(
570 connection_state->connection_handle()) != 1) {
571 return false;
572 }
573 }
574 return CancelConnectInternalStateLocked(connection_state);
575 }
576
CancelConnectInternalStateLocked(ConnectionState * connection_state)577 bool WindowsEventEngine::CancelConnectInternalStateLocked(
578 ConnectionState* connection_state) {
579 connection_state->socket()->Shutdown(DEBUG_LOCATION, "CancelConnect");
580 // Release the connection_state shared_ptr owned by the connected callback.
581 GRPC_TRACE_LOG(event_engine, INFO) << "Successfully cancelled connection "
582 << connection_state->connection_handle();
583 return true;
584 }
585
586 absl::StatusOr<std::unique_ptr<EventEngine::Listener>>
CreateListener(Listener::AcceptCallback on_accept,absl::AnyInvocable<void (absl::Status)> on_shutdown,const EndpointConfig & config,std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory)587 WindowsEventEngine::CreateListener(
588 Listener::AcceptCallback on_accept,
589 absl::AnyInvocable<void(absl::Status)> on_shutdown,
590 const EndpointConfig& config,
591 std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory) {
592 return std::make_unique<WindowsEventEngineListener>(
593 &iocp_, std::move(on_accept), std::move(on_shutdown),
594 std::move(memory_allocator_factory), shared_from_this(),
595 thread_pool_.get(), config);
596 }
597 } // namespace experimental
598 } // namespace grpc_event_engine
599
600 #endif // GPR_WINDOWS
601