• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include <grpc/support/port_platform.h>
15 
16 #ifdef GPR_WINDOWS
17 
18 #include <grpc/event_engine/endpoint_config.h>
19 #include <grpc/event_engine/event_engine.h>
20 #include <grpc/event_engine/memory_allocator.h>
21 #include <grpc/event_engine/slice_buffer.h>
22 #include <grpc/support/cpu.h>
23 
24 #include <memory>
25 #include <ostream>
26 
27 #include "absl/log/check.h"
28 #include "absl/log/log.h"
29 #include "absl/status/status.h"
30 #include "absl/status/statusor.h"
31 #include "absl/strings/string_view.h"
32 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
33 #include "src/core/lib/event_engine/common_closures.h"
34 #include "src/core/lib/event_engine/handle_containers.h"
35 #include "src/core/lib/event_engine/posix_engine/timer_manager.h"
36 #include "src/core/lib/event_engine/tcp_socket_utils.h"
37 #include "src/core/lib/event_engine/thread_pool/thread_pool.h"
38 #include "src/core/lib/event_engine/utils.h"
39 #include "src/core/lib/event_engine/windows/grpc_polled_fd_windows.h"
40 #include "src/core/lib/event_engine/windows/iocp.h"
41 #include "src/core/lib/event_engine/windows/native_windows_dns_resolver.h"
42 #include "src/core/lib/event_engine/windows/windows_endpoint.h"
43 #include "src/core/lib/event_engine/windows/windows_engine.h"
44 #include "src/core/lib/event_engine/windows/windows_listener.h"
45 #include "src/core/lib/iomgr/error.h"
46 #include "src/core/lib/surface/init_internally.h"
47 #include "src/core/util/crash.h"
48 #include "src/core/util/dump_args.h"
49 #include "src/core/util/sync.h"
50 #include "src/core/util/time.h"
51 
52 namespace grpc_event_engine {
53 namespace experimental {
54 
operator <<(std::ostream & out,const WindowsEventEngine::ConnectionState & connection_state)55 std::ostream& operator<<(
56     std::ostream& out,
57     const WindowsEventEngine::ConnectionState& connection_state) {
58   out << "ConnectionState::" << &connection_state
59       << ": connection_state.address="
60       << ResolvedAddressToURI(connection_state.address_) << ","
61       << GRPC_DUMP_ARGS(connection_state.has_run_,
62                         connection_state.connection_handle_,
63                         connection_state.timer_handle_);
64   return out;
65 }
66 
67 // ---- ConnectionState ----
68 
ConnectionState(std::shared_ptr<WindowsEventEngine> engine,std::unique_ptr<WinSocket> socket,EventEngine::ResolvedAddress address,MemoryAllocator allocator,EventEngine::OnConnectCallback on_connect_user_callback)69 WindowsEventEngine::ConnectionState::ConnectionState(
70     std::shared_ptr<WindowsEventEngine> engine,
71     std::unique_ptr<WinSocket> socket, EventEngine::ResolvedAddress address,
72     MemoryAllocator allocator,
73     EventEngine::OnConnectCallback on_connect_user_callback)
74     : socket_(std::move(socket)),
75       address_(address),
76       allocator_(std::move(allocator)),
77       on_connect_user_callback_(std::move(on_connect_user_callback)),
78       engine_(std::move(engine)) {
79   CHECK(socket_ != nullptr);
80   connection_handle_ = ConnectionHandle{reinterpret_cast<intptr_t>(this),
81                                         engine_->aba_token_.fetch_add(1)};
82 }
83 
Start(Duration timeout)84 void WindowsEventEngine::ConnectionState::Start(Duration timeout) {
85   on_connected_cb_ =
86       std::make_unique<OnConnectedCallback>(engine_.get(), shared_from_this());
87   socket_->NotifyOnWrite(on_connected_cb_.get());
88   deadline_timer_cb_ = std::make_unique<DeadlineTimerCallback>(
89       engine_.get(), shared_from_this());
90   timer_handle_ = engine_->RunAfter(timeout, deadline_timer_cb_.get());
91 }
92 
93 EventEngine::OnConnectCallback
TakeCallback()94 WindowsEventEngine::ConnectionState::TakeCallback() {
95   return std::exchange(on_connect_user_callback_, nullptr);
96 }
97 
98 std::unique_ptr<WindowsEndpoint>
FinishConnectingAndMakeEndpoint(ThreadPool * thread_pool)99 WindowsEventEngine::ConnectionState::FinishConnectingAndMakeEndpoint(
100     ThreadPool* thread_pool) {
101   ChannelArgsEndpointConfig cfg;
102   return std::make_unique<WindowsEndpoint>(address_, std::move(socket_),
103                                            std::move(allocator_), cfg,
104                                            thread_pool, engine_);
105 }
106 
AbortOnConnect()107 void WindowsEventEngine::ConnectionState::AbortOnConnect() {
108   on_connected_cb_.reset();
109 }
110 
AbortDeadlineTimer()111 void WindowsEventEngine::ConnectionState::AbortDeadlineTimer() {
112   deadline_timer_cb_.reset();
113 }
114 
Run()115 void WindowsEventEngine::ConnectionState::OnConnectedCallback::Run() {
116   DCHECK_NE(connection_state_, nullptr)
117       << "ConnectionState::OnConnectedCallback::" << this
118       << " has already run. It should only ever run once.";
119   bool has_run;
120   {
121     grpc_core::MutexLock lock(&connection_state_->mu_);
122     has_run = std::exchange(connection_state_->has_run_, true);
123   }
124   // This could race with the deadline timer. If so, the engine's
125   // OnConnectCompleted callback should not run, and the refs should be
126   // released.
127   if (has_run) {
128     connection_state_.reset();
129     return;
130   }
131   engine_->OnConnectCompleted(std::move(connection_state_));
132 }
133 
Run()134 void WindowsEventEngine::ConnectionState::DeadlineTimerCallback::Run() {
135   DCHECK_NE(connection_state_, nullptr)
136       << "ConnectionState::DeadlineTimerCallback::" << this
137       << " has already run. It should only ever run once.";
138   bool has_run;
139   {
140     grpc_core::MutexLock lock(&connection_state_->mu_);
141     has_run = std::exchange(connection_state_->has_run_, true);
142   }
143   // This could race with the on connected callback. If so, the engine's
144   // OnDeadlineTimerFired callback should not run, and the refs should be
145   // released.
146   if (has_run) {
147     connection_state_.reset();
148     return;
149   }
150   engine_->OnDeadlineTimerFired(std::move(connection_state_));
151 }
152 
153 // ---- IOCPWorkClosure ----
154 
IOCPWorkClosure(ThreadPool * thread_pool,IOCP * iocp)155 WindowsEventEngine::IOCPWorkClosure::IOCPWorkClosure(ThreadPool* thread_pool,
156                                                      IOCP* iocp)
157     : thread_pool_(thread_pool), iocp_(iocp) {
158   thread_pool_->Run(this);
159 }
160 
Run()161 void WindowsEventEngine::IOCPWorkClosure::Run() {
162   if (done_signal_.HasBeenNotified()) return;
163   auto result = iocp_->Work(std::chrono::seconds(60), [this] {
164     workers_.fetch_add(1);
165     thread_pool_->Run(this);
166   });
167   if (result == Poller::WorkResult::kDeadlineExceeded) {
168     // iocp received no messages. restart the worker
169     workers_.fetch_add(1);
170     thread_pool_->Run(this);
171   }
172   if (workers_.fetch_sub(1) == 1) done_signal_.Notify();
173 }
174 
WaitForShutdown()175 void WindowsEventEngine::IOCPWorkClosure::WaitForShutdown() {
176   done_signal_.WaitForNotification();
177 }
178 
179 // ---- WindowsEventEngine ----
180 
181 // TODO(hork): The iomgr timer and execution engine can be reused. It should
182 // be separated out from the posix_engine and instantiated as components. It is
183 // effectively copied below.
184 
185 struct WindowsEventEngine::TimerClosure final : public EventEngine::Closure {
186   absl::AnyInvocable<void()> cb;
187   Timer timer;
188   WindowsEventEngine* engine;
189   EventEngine::TaskHandle handle;
190 
Rungrpc_event_engine::experimental::WindowsEventEngine::TimerClosure191   void Run() override {
192     GRPC_TRACE_LOG(event_engine, INFO)
193         << "WindowsEventEngine:" << engine << " executing callback:" << handle;
194     {
195       grpc_core::MutexLock lock(&engine->task_mu_);
196       engine->known_handles_.erase(handle);
197     }
198     cb();
199     delete this;
200   }
201 };
202 
WindowsEventEngine()203 WindowsEventEngine::WindowsEventEngine()
204     : thread_pool_(
205           MakeThreadPool(grpc_core::Clamp(gpr_cpu_num_cores(), 4u, 16u))),
206       iocp_(thread_pool_.get()),
207       timer_manager_(thread_pool_),
208       iocp_worker_(thread_pool_.get(), &iocp_) {
209   WSADATA wsaData;
210   int status = WSAStartup(MAKEWORD(2, 0), &wsaData);
211   CHECK_EQ(status, 0);
212 }
213 
~WindowsEventEngine()214 WindowsEventEngine::~WindowsEventEngine() {
215   GRPC_TRACE_LOG(event_engine, INFO) << "~WindowsEventEngine::" << this;
216   {
217     task_mu_.Lock();
218     if (!known_handles_.empty()) {
219       if (GRPC_TRACE_FLAG_ENABLED(event_engine)) {
220         for (auto handle : known_handles_) {
221           LOG(ERROR) << "WindowsEventEngine:" << this
222                      << " uncleared TaskHandle at shutdown:"
223                      << HandleToString<EventEngine::TaskHandle>(handle);
224         }
225       }
226       // Allow a small grace period for timers to be run before shutting down.
227       auto deadline =
228           timer_manager_.Now() + grpc_core::Duration::FromSecondsAsDouble(10);
229       while (!known_handles_.empty() && timer_manager_.Now() < deadline) {
230         if (GRPC_TRACE_FLAG_ENABLED(event_engine)) {
231           VLOG_EVERY_N_SEC(2, 1) << "Waiting for timers. "
232                                  << known_handles_.size() << " remaining";
233         }
234         task_mu_.Unlock();
235         absl::SleepFor(absl::Milliseconds(200));
236         task_mu_.Lock();
237       }
238     }
239     CHECK(GPR_LIKELY(known_handles_.empty()));
240     task_mu_.Unlock();
241   }
242   iocp_.Kick();
243   iocp_worker_.WaitForShutdown();
244   iocp_.Shutdown();
245   CHECK_EQ(WSACleanup(), 0);
246   timer_manager_.Shutdown();
247   thread_pool_->Quiesce();
248 }
249 
Cancel(EventEngine::TaskHandle handle)250 bool WindowsEventEngine::Cancel(EventEngine::TaskHandle handle) {
251   grpc_core::MutexLock lock(&task_mu_);
252   if (!known_handles_.contains(handle)) return false;
253   GRPC_TRACE_LOG(event_engine, INFO)
254       << "WindowsEventEngine::" << this << " cancelling " << handle;
255   auto* cd = reinterpret_cast<TimerClosure*>(handle.keys[0]);
256   bool r = timer_manager_.TimerCancel(&cd->timer);
257   known_handles_.erase(handle);
258   if (r) delete cd;
259   return r;
260 }
261 
RunAfter(Duration when,absl::AnyInvocable<void ()> closure)262 EventEngine::TaskHandle WindowsEventEngine::RunAfter(
263     Duration when, absl::AnyInvocable<void()> closure) {
264   return RunAfterInternal(when, std::move(closure));
265 }
266 
RunAfter(Duration when,EventEngine::Closure * closure)267 EventEngine::TaskHandle WindowsEventEngine::RunAfter(
268     Duration when, EventEngine::Closure* closure) {
269   return RunAfterInternal(when, [closure]() { closure->Run(); });
270 }
271 
Run(absl::AnyInvocable<void ()> closure)272 void WindowsEventEngine::Run(absl::AnyInvocable<void()> closure) {
273   thread_pool_->Run(std::move(closure));
274 }
275 
Run(EventEngine::Closure * closure)276 void WindowsEventEngine::Run(EventEngine::Closure* closure) {
277   thread_pool_->Run(closure);
278 }
279 
RunAfterInternal(Duration when,absl::AnyInvocable<void ()> cb)280 EventEngine::TaskHandle WindowsEventEngine::RunAfterInternal(
281     Duration when, absl::AnyInvocable<void()> cb) {
282   auto when_ts = ToTimestamp(timer_manager_.Now(), when);
283   auto* cd = new TimerClosure;
284   cd->cb = std::move(cb);
285   cd->engine = this;
286   EventEngine::TaskHandle handle{reinterpret_cast<intptr_t>(cd),
287                                  aba_token_.fetch_add(1)};
288   grpc_core::MutexLock lock(&task_mu_);
289   known_handles_.insert(handle);
290   cd->handle = handle;
291   GRPC_TRACE_LOG(event_engine, INFO)
292       << "WindowsEventEngine:" << this << " scheduling callback:" << handle;
293   timer_manager_.TimerInit(&cd->timer, when_ts, cd);
294   return handle;
295 }
296 
297 #if GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
298 
WindowsDNSResolver(grpc_core::OrphanablePtr<AresResolver> ares_resolver)299 WindowsEventEngine::WindowsDNSResolver::WindowsDNSResolver(
300     grpc_core::OrphanablePtr<AresResolver> ares_resolver)
301     : ares_resolver_(std::move(ares_resolver)) {}
302 
LookupHostname(LookupHostnameCallback on_resolve,absl::string_view name,absl::string_view default_port)303 void WindowsEventEngine::WindowsDNSResolver::LookupHostname(
304     LookupHostnameCallback on_resolve, absl::string_view name,
305     absl::string_view default_port) {
306   ares_resolver_->LookupHostname(std::move(on_resolve), name, default_port);
307 }
308 
LookupSRV(LookupSRVCallback on_resolve,absl::string_view name)309 void WindowsEventEngine::WindowsDNSResolver::LookupSRV(
310     LookupSRVCallback on_resolve, absl::string_view name) {
311   ares_resolver_->LookupSRV(std::move(on_resolve), name);
312 }
313 
LookupTXT(LookupTXTCallback on_resolve,absl::string_view name)314 void WindowsEventEngine::WindowsDNSResolver::LookupTXT(
315     LookupTXTCallback on_resolve, absl::string_view name) {
316   ares_resolver_->LookupTXT(std::move(on_resolve), name);
317 }
318 
319 #endif  // GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
320 
321 absl::StatusOr<std::unique_ptr<EventEngine::DNSResolver>>
GetDNSResolver(EventEngine::DNSResolver::ResolverOptions const & options)322 WindowsEventEngine::GetDNSResolver(
323     EventEngine::DNSResolver::ResolverOptions const& options) {
324   if (ShouldUseAresDnsResolver()) {
325 #if GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
326     GRPC_TRACE_LOG(event_engine_dns, INFO)
327         << "WindowsEventEngine::" << this << " creating AresResolver";
328     auto ares_resolver = AresResolver::CreateAresResolver(
329         options.dns_server,
330         std::make_unique<GrpcPolledFdFactoryWindows>(poller()),
331         shared_from_this());
332     if (!ares_resolver.ok()) {
333       return ares_resolver.status();
334     }
335     return std::make_unique<WindowsEventEngine::WindowsDNSResolver>(
336         std::move(*ares_resolver));
337 #endif  // GRPC_ARES == 1 && defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
338   }
339   GRPC_TRACE_LOG(event_engine_dns, INFO)
340       << "WindowsEventEngine::" << this << " creating NativeWindowsDNSResolver";
341   return std::make_unique<NativeWindowsDNSResolver>(shared_from_this());
342 }
343 
IsWorkerThread()344 bool WindowsEventEngine::IsWorkerThread() { grpc_core::Crash("unimplemented"); }
345 
OnConnectCompleted(std::shared_ptr<ConnectionState> state)346 void WindowsEventEngine::OnConnectCompleted(
347     std::shared_ptr<ConnectionState> state) {
348   absl::StatusOr<std::unique_ptr<WindowsEndpoint>> endpoint;
349   EventEngine::OnConnectCallback cb;
350   {
351     // Connection attempt complete!
352     grpc_core::MutexLock lock(&state->mu());
353     // return early if we cannot cancel the connection timeout timer.
354     int erased_handles = 0;
355     {
356       grpc_core::MutexLock handle_lock(&connection_mu_);
357       erased_handles =
358           known_connection_handles_.erase(state->connection_handle());
359     }
360     if (erased_handles != 1 || !Cancel(state->timer_handle())) {
361       GRPC_TRACE_LOG(event_engine, INFO)
362           << "Not accepting connection since the deadline timer has fired";
363       return;
364     }
365     // Release refs held by the deadline timer.
366     state->AbortDeadlineTimer();
367     const auto& overlapped_result = state->socket()->write_info()->result();
368     if (!overlapped_result.error_status.ok()) {
369       state->socket()->Shutdown(DEBUG_LOCATION, "ConnectEx failure");
370       endpoint = overlapped_result.error_status;
371     } else if (overlapped_result.wsa_error != 0) {
372       state->socket()->Shutdown(DEBUG_LOCATION, "ConnectEx failure");
373       endpoint = GRPC_WSA_ERROR(overlapped_result.wsa_error, "ConnectEx");
374     } else {
375       endpoint = state->FinishConnectingAndMakeEndpoint(thread_pool_.get());
376     }
377     cb = state->TakeCallback();
378   }
379   // This code should be running in a thread pool thread already, so the
380   // callback can be run directly.
381   state.reset();
382   cb(std::move(endpoint));
383 }
384 
OnDeadlineTimerFired(std::shared_ptr<ConnectionState> connection_state)385 void WindowsEventEngine::OnDeadlineTimerFired(
386     std::shared_ptr<ConnectionState> connection_state) {
387   bool cancelled = false;
388   EventEngine::OnConnectCallback cb;
389   {
390     grpc_core::MutexLock lock(&connection_state->mu());
391     cancelled = CancelConnectFromDeadlineTimer(connection_state.get());
392     if (cancelled) cb = connection_state->TakeCallback();
393   }
394   if (cancelled) {
395     connection_state.reset();
396     cb(absl::DeadlineExceededError("Connection timed out"));
397   }
398 }
399 
Connect(OnConnectCallback on_connect,const ResolvedAddress & addr,const EndpointConfig &,MemoryAllocator memory_allocator,Duration timeout)400 EventEngine::ConnectionHandle WindowsEventEngine::Connect(
401     OnConnectCallback on_connect, const ResolvedAddress& addr,
402     const EndpointConfig& /* args */, MemoryAllocator memory_allocator,
403     Duration timeout) {
404   // TODO(hork): utilize the endpoint config
405   absl::Status status;
406   int istatus;
407   auto uri = ResolvedAddressToURI(addr);
408   if (!uri.ok()) {
409     Run([on_connect = std::move(on_connect), status = uri.status()]() mutable {
410       on_connect(status);
411     });
412     return EventEngine::ConnectionHandle::kInvalid;
413   }
414   GRPC_TRACE_LOG(event_engine, INFO)
415       << "EventEngine::" << this << " connecting to " << *uri;
416   // Use dualstack sockets where available.
417   ResolvedAddress address = addr;
418   ResolvedAddress addr6_v4mapped;
419   if (ResolvedAddressToV4Mapped(addr, &addr6_v4mapped)) {
420     address = addr6_v4mapped;
421   }
422   const int addr_family =
423       (address.address()->sa_family == AF_UNIX) ? AF_UNIX : AF_INET6;
424   const int protocol = addr_family == AF_UNIX ? 0 : IPPROTO_TCP;
425   SOCKET sock = WSASocket(addr_family, SOCK_STREAM, protocol, nullptr, 0,
426                           IOCP::GetDefaultSocketFlags());
427   if (sock == INVALID_SOCKET) {
428     Run([on_connect = std::move(on_connect),
429          status = GRPC_WSA_ERROR(WSAGetLastError(), "WSASocket")]() mutable {
430       on_connect(status);
431     });
432     return EventEngine::ConnectionHandle::kInvalid;
433   }
434   if (addr_family == AF_UNIX) {
435     status = SetSocketNonBlock(sock);
436   } else {
437     status = PrepareSocket(sock);
438   }
439   if (!status.ok()) {
440     Run([on_connect = std::move(on_connect), status]() mutable {
441       on_connect(status);
442     });
443     return EventEngine::ConnectionHandle::kInvalid;
444   }
445   // Grab the function pointer for ConnectEx for that specific socket It may
446   // change depending on the interface.
447   LPFN_CONNECTEX ConnectEx;
448   GUID guid = WSAID_CONNECTEX;
449   DWORD ioctl_num_bytes;
450   istatus = WSAIoctl(sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid,
451                      sizeof(guid), &ConnectEx, sizeof(ConnectEx),
452                      &ioctl_num_bytes, nullptr, nullptr);
453   if (istatus != 0) {
454     Run([on_connect = std::move(on_connect),
455          status = GRPC_WSA_ERROR(
456              WSAGetLastError(),
457              "WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER)")]() mutable {
458       on_connect(status);
459     });
460     return EventEngine::ConnectionHandle::kInvalid;
461   }
462   // bind the local address
463   ResolvedAddress local_address;
464   if (addr_family == AF_UNIX) {
465     // For ConnectEx() to work for AF_UNIX, the sock needs to be bound to
466     // the local address of an unnamed socket.
467     sockaddr addr = {};
468     addr.sa_family = AF_UNIX;
469     local_address = ResolvedAddress(&addr, sizeof(addr));
470   } else {
471     local_address = ResolvedAddressMakeWild6(0);
472   }
473   istatus = bind(sock, local_address.address(), local_address.size());
474   if (istatus != 0) {
475     Run([on_connect = std::move(on_connect),
476          status = GRPC_WSA_ERROR(WSAGetLastError(), "bind")]() mutable {
477       on_connect(status);
478     });
479     return EventEngine::ConnectionHandle::kInvalid;
480   }
481   // Prepare the socket to receive a connection
482   auto connection_state = std::make_shared<ConnectionState>(
483       std::static_pointer_cast<WindowsEventEngine>(shared_from_this()),
484       /*socket=*/iocp_.Watch(sock), address,
485       /*memory_allocator=*/std::move(memory_allocator),
486       /*on_connect_user_callback=*/std::move(on_connect));
487   grpc_core::MutexLock lock(&connection_state->mu());
488   auto* info = connection_state->socket()->write_info();
489   {
490     grpc_core::MutexLock connection_handle_lock(&connection_mu_);
491     known_connection_handles_.insert(connection_state->connection_handle());
492   }
493   connection_state->Start(timeout);
494   bool success =
495       ConnectEx(connection_state->socket()->raw_socket(), address.address(),
496                 address.size(), nullptr, 0, nullptr, info->overlapped());
497   // It wouldn't be unusual to get a success immediately. But we'll still get an
498   // IOCP notification, so let's ignore it.
499   if (success) return connection_state->connection_handle();
500   // Otherwise, we need to handle an error or a pending IO Event.
501   int last_error = WSAGetLastError();
502   if (last_error == ERROR_IO_PENDING) {
503     // Overlapped I/O operation is in progress.
504     return connection_state->connection_handle();
505   }
506   // Time to abort the connection.
507   // The on-connect callback won't run, so we must clean up its state.
508   connection_state->AbortOnConnect();
509   int erased_handles = 0;
510   {
511     grpc_core::MutexLock connection_handle_lock(&connection_mu_);
512     erased_handles =
513         known_connection_handles_.erase(connection_state->connection_handle());
514   }
515   CHECK_EQ(erased_handles, 1) << "Did not find connection handle "
516                               << connection_state->connection_handle()
517                               << " after a synchronous connection failure. "
518                                  "This should not be possible.";
519   connection_state->socket()->Shutdown(DEBUG_LOCATION, "ConnectEx");
520   if (!Cancel(connection_state->timer_handle())) {
521     // The deadline timer will run, or is running.
522     return EventEngine::ConnectionHandle::kInvalid;
523   }
524   // The deadline timer won't run, so we must clean up its state.
525   connection_state->AbortDeadlineTimer();
526   Run([connection_state = std::move(connection_state),
527        status = GRPC_WSA_ERROR(WSAGetLastError(), "ConnectEx")]() mutable {
528     EventEngine::OnConnectCallback cb;
529     {
530       grpc_core::MutexLock lock(&connection_state->mu());
531       cb = connection_state->TakeCallback();
532     }
533     connection_state.reset();
534     cb(std::move(status));
535   });
536   return EventEngine::ConnectionHandle::kInvalid;
537 }
538 
CancelConnect(EventEngine::ConnectionHandle handle)539 bool WindowsEventEngine::CancelConnect(EventEngine::ConnectionHandle handle) {
540   if (handle == EventEngine::ConnectionHandle::kInvalid) {
541     GRPC_TRACE_LOG(event_engine, INFO)
542         << "Attempted to cancel an invalid connection handle";
543     return false;
544   }
545   // Erase the connection handle, which may be unknown
546   {
547     grpc_core::MutexLock lock(&connection_mu_);
548     if (known_connection_handles_.erase(handle) != 1) {
549       GRPC_TRACE_LOG(event_engine, INFO)
550           << "Unknown connection handle: " << handle;
551       return false;
552     }
553   }
554   auto* connection_state = reinterpret_cast<ConnectionState*>(handle.keys[0]);
555   grpc_core::MutexLock state_lock(&connection_state->mu());
556   // The connection cannot be cancelled if the deadline timer is already firing.
557   if (!Cancel(connection_state->timer_handle())) return false;
558   // The deadline timer was cancelled, so we must clean up its state.
559   connection_state->AbortDeadlineTimer();
560   // The on-connect callback will run when the socket shutdown event occurs.
561   return CancelConnectInternalStateLocked(connection_state);
562 }
563 
CancelConnectFromDeadlineTimer(ConnectionState * connection_state)564 bool WindowsEventEngine::CancelConnectFromDeadlineTimer(
565     ConnectionState* connection_state) {
566   // Erase the connection handle, which is guaranteed to exist.
567   {
568     grpc_core::MutexLock lock(&connection_mu_);
569     if (known_connection_handles_.erase(
570             connection_state->connection_handle()) != 1) {
571       return false;
572     }
573   }
574   return CancelConnectInternalStateLocked(connection_state);
575 }
576 
CancelConnectInternalStateLocked(ConnectionState * connection_state)577 bool WindowsEventEngine::CancelConnectInternalStateLocked(
578     ConnectionState* connection_state) {
579   connection_state->socket()->Shutdown(DEBUG_LOCATION, "CancelConnect");
580   // Release the connection_state shared_ptr owned by the connected callback.
581   GRPC_TRACE_LOG(event_engine, INFO) << "Successfully cancelled connection "
582                                      << connection_state->connection_handle();
583   return true;
584 }
585 
586 absl::StatusOr<std::unique_ptr<EventEngine::Listener>>
CreateListener(Listener::AcceptCallback on_accept,absl::AnyInvocable<void (absl::Status)> on_shutdown,const EndpointConfig & config,std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory)587 WindowsEventEngine::CreateListener(
588     Listener::AcceptCallback on_accept,
589     absl::AnyInvocable<void(absl::Status)> on_shutdown,
590     const EndpointConfig& config,
591     std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory) {
592   return std::make_unique<WindowsEventEngineListener>(
593       &iocp_, std::move(on_accept), std::move(on_shutdown),
594       std::move(memory_allocator_factory), shared_from_this(),
595       thread_pool_.get(), config);
596 }
597 }  // namespace experimental
598 }  // namespace grpc_event_engine
599 
600 #endif  // GPR_WINDOWS
601