• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include "src/core/lib/event_engine/posix_engine/posix_engine.h"
15 
16 #include <grpc/event_engine/event_engine.h>
17 #include <grpc/event_engine/memory_allocator.h>
18 #include <grpc/event_engine/slice_buffer.h>
19 #include <grpc/support/cpu.h>
20 #include <grpc/support/port_platform.h>
21 
22 #include <algorithm>
23 #include <atomic>
24 #include <chrono>
25 #include <cstdint>
26 #include <cstring>
27 #include <memory>
28 #include <string>
29 #include <type_traits>
30 #include <utility>
31 
32 #include "absl/cleanup/cleanup.h"
33 #include "absl/functional/any_invocable.h"
34 #include "absl/log/check.h"
35 #include "absl/log/log.h"
36 #include "absl/status/status.h"
37 #include "absl/strings/match.h"
38 #include "absl/strings/str_cat.h"
39 #include "src/core/lib/debug/trace.h"
40 #include "src/core/lib/event_engine/ares_resolver.h"
41 #include "src/core/lib/event_engine/forkable.h"
42 #include "src/core/lib/event_engine/grpc_polled_fd.h"
43 #include "src/core/lib/event_engine/poller.h"
44 #include "src/core/lib/event_engine/posix.h"
45 #include "src/core/lib/event_engine/posix_engine/grpc_polled_fd_posix.h"
46 #include "src/core/lib/event_engine/posix_engine/native_posix_dns_resolver.h"
47 #include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
48 #include "src/core/lib/event_engine/posix_engine/timer.h"
49 #include "src/core/lib/event_engine/tcp_socket_utils.h"
50 #include "src/core/lib/event_engine/utils.h"
51 #include "src/core/lib/experiments/experiments.h"
52 #include "src/core/util/crash.h"
53 #include "src/core/util/no_destruct.h"
54 #include "src/core/util/sync.h"
55 #include "src/core/util/useful.h"
56 
57 #ifdef GRPC_POSIX_SOCKET_TCP
58 #include <errno.h>       // IWYU pragma: keep
59 #include <stdint.h>      // IWYU pragma: keep
60 #include <sys/socket.h>  // IWYU pragma: keep
61 
62 #include "src/core/lib/event_engine/posix_engine/event_poller.h"
63 #include "src/core/lib/event_engine/posix_engine/event_poller_posix_default.h"
64 #include "src/core/lib/event_engine/posix_engine/posix_endpoint.h"
65 #include "src/core/lib/event_engine/posix_engine/posix_engine_listener.h"
66 #endif  // GRPC_POSIX_SOCKET_TCP
67 
68 // IWYU pragma: no_include <ratio>
69 
70 // TODO(eryu): remove this GRPC_CFSTREAM condition when the CFEngine is ready.
71 // The posix poller currently crashes iOS.
72 #if defined(GRPC_POSIX_SOCKET_TCP) && !defined(GRPC_CFSTREAM) && \
73     !defined(GRPC_DO_NOT_INSTANTIATE_POSIX_POLLER)
74 #define GRPC_PLATFORM_SUPPORTS_POSIX_POLLING true
75 #else
76 #define GRPC_PLATFORM_SUPPORTS_POSIX_POLLING false
77 #endif
78 
79 using namespace std::chrono_literals;
80 
81 namespace grpc_event_engine {
82 namespace experimental {
83 
84 namespace {
85 
86 grpc_core::NoDestruct<ObjectGroupForkHandler> g_timer_fork_manager;
87 
88 class TimerForkCallbackMethods {
89  public:
Prefork()90   static void Prefork() { g_timer_fork_manager->Prefork(); }
PostforkParent()91   static void PostforkParent() { g_timer_fork_manager->PostforkParent(); }
PostforkChild()92   static void PostforkChild() { g_timer_fork_manager->PostforkChild(); }
93 };
94 
95 }  // namespace
96 
97 #ifdef GRPC_POSIX_SOCKET_TCP
98 
Start(EventEngine::Duration timeout)99 void AsyncConnect::Start(EventEngine::Duration timeout) {
100   on_writable_ = PosixEngineClosure::ToPermanentClosure(
101       [this](absl::Status status) { OnWritable(std::move(status)); });
102   alarm_handle_ = engine_->RunAfter(timeout, [this]() {
103     OnTimeoutExpired(absl::DeadlineExceededError("connect() timed out"));
104   });
105   fd_->NotifyOnWrite(on_writable_);
106 }
107 
~AsyncConnect()108 AsyncConnect ::~AsyncConnect() { delete on_writable_; }
109 
OnTimeoutExpired(absl::Status status)110 void AsyncConnect::OnTimeoutExpired(absl::Status status) {
111   bool done = false;
112   {
113     grpc_core::MutexLock lock(&mu_);
114     if (fd_ != nullptr) {
115       fd_->ShutdownHandle(std::move(status));
116     }
117     done = (--refs_ == 0);
118   }
119   if (done) {
120     delete this;
121   }
122 }
123 
OnWritable(absl::Status status)124 void AsyncConnect::OnWritable(absl::Status status)
125     ABSL_NO_THREAD_SAFETY_ANALYSIS {
126   int so_error = 0;
127   socklen_t so_error_size;
128   int err;
129   int done;
130   int consumed_refs = 1;
131   EventHandle* fd;
132   absl::StatusOr<std::unique_ptr<EventEngine::Endpoint>> ep;
133 
134   mu_.Lock();
135   CHECK_NE(fd_, nullptr);
136   fd = std::exchange(fd_, nullptr);
137   bool connect_cancelled = connect_cancelled_;
138   if (fd->IsHandleShutdown() && status.ok()) {
139     if (!connect_cancelled) {
140       // status is OK and handle has been shutdown but the connect was not
141       // cancelled. This can happen if the timeout expired and the while the
142       // OnWritable just started executing.
143       status = absl::DeadlineExceededError("connect() timed out");
144     } else {
145       // This can happen if the connection was cancelled while the OnWritable
146       // just started executing.
147       status = absl::FailedPreconditionError("Connection cancelled");
148     }
149   }
150   mu_.Unlock();
151 
152   if (engine_->Cancel(alarm_handle_)) {
153     ++consumed_refs;
154   }
155 
156   auto on_writable_finish = absl::MakeCleanup([&]() -> void {
157     mu_.AssertHeld();
158     if (!connect_cancelled) {
159       reinterpret_cast<PosixEventEngine*>(engine_.get())
160           ->OnConnectFinishInternal(connection_handle_);
161     }
162     if (fd != nullptr) {
163       fd->OrphanHandle(nullptr, nullptr, "tcp_client_orphan");
164       fd = nullptr;
165     }
166     if (!status.ok()) {
167       ep = absl::UnknownError(
168           absl::StrCat("Failed to connect to remote host: ", status.message()));
169     }
170     // Run the OnConnect callback asynchronously.
171     if (!connect_cancelled) {
172       executor_->Run(
173           [ep = std::move(ep), on_connect = std::move(on_connect_)]() mutable {
174             if (on_connect) {
175               on_connect(std::move(ep));
176             }
177           });
178     }
179     done = ((refs_ -= consumed_refs) == 0);
180     mu_.Unlock();
181     if (done) {
182       delete this;
183     }
184   });
185 
186   mu_.Lock();
187   if (!status.ok() || connect_cancelled) {
188     return;
189   }
190 
191   do {
192     so_error_size = sizeof(so_error);
193     err = getsockopt(fd->WrappedFd(), SOL_SOCKET, SO_ERROR, &so_error,
194                      &so_error_size);
195   } while (err < 0 && errno == EINTR);
196   if (err < 0) {
197     status = absl::FailedPreconditionError(
198         absl::StrCat("getsockopt: ", std::strerror(errno)));
199     return;
200   }
201 
202   switch (so_error) {
203     case 0:
204       ep = CreatePosixEndpoint(fd, nullptr, engine_, std::move(allocator_),
205                                options_);
206       fd = nullptr;
207       break;
208     case ENOBUFS:
209       // We will get one of these errors if we have run out of
210       // memory in the kernel for the data structures allocated
211       // when you connect a socket.  If this happens it is very
212       // likely that if we wait a little bit then try again the
213       // connection will work (since other programs or this
214       // program will close their network connections and free up
215       // memory).  This does _not_ indicate that there is anything
216       // wrong with the server we are connecting to, this is a
217       // local problem.
218 
219       // If you are looking at this code, then chances are that
220       // your program or another program on the same computer
221       // opened too many network connections.  The "easy" fix:
222       // don't do that!
223       LOG(ERROR) << "kernel out of buffers";
224       mu_.Unlock();
225       fd->NotifyOnWrite(on_writable_);
226       // Don't run the cleanup function for this case.
227       std::move(on_writable_finish).Cancel();
228       return;
229     case ECONNREFUSED:
230       // This error shouldn't happen for anything other than connect().
231       status = absl::FailedPreconditionError(std::strerror(so_error));
232       break;
233     default:
234       // We don't really know which syscall triggered the problem here, so
235       // punt by reporting getsockopt().
236       status = absl::FailedPreconditionError(
237           absl::StrCat("getsockopt(SO_ERROR): ", std::strerror(so_error)));
238       break;
239   }
240 }
241 
242 EventEngine::ConnectionHandle
CreateEndpointFromUnconnectedFdInternal(int fd,EventEngine::OnConnectCallback on_connect,const EventEngine::ResolvedAddress & addr,const PosixTcpOptions & tcp_options,MemoryAllocator memory_allocator,EventEngine::Duration timeout)243 PosixEventEngine::CreateEndpointFromUnconnectedFdInternal(
244     int fd, EventEngine::OnConnectCallback on_connect,
245     const EventEngine::ResolvedAddress& addr,
246     const PosixTcpOptions& tcp_options, MemoryAllocator memory_allocator,
247     EventEngine::Duration timeout) {
248   int err;
249   int connect_errno;
250   do {
251     err = connect(fd, addr.address(), addr.size());
252   } while (err < 0 && errno == EINTR);
253   connect_errno = (err < 0) ? errno : 0;
254 
255   auto addr_uri = ResolvedAddressToURI(addr);
256   if (!addr_uri.ok()) {
257     Run([on_connect = std::move(on_connect),
258          ep = absl::FailedPreconditionError(absl::StrCat(
259              "connect failed: ", "invalid addr: ",
260              addr_uri.value()))]() mutable { on_connect(std::move(ep)); });
261     return EventEngine::ConnectionHandle::kInvalid;
262   }
263 
264   std::string name = absl::StrCat("tcp-client:", addr_uri.value());
265   PosixEventPoller* poller = poller_manager_->Poller();
266   EventHandle* handle =
267       poller->CreateHandle(fd, name, poller->CanTrackErrors());
268 
269   if (connect_errno == 0) {
270     // Connection already succeeded. Return 0 to discourage any cancellation
271     // attempts.
272     Run([on_connect = std::move(on_connect),
273          ep = CreatePosixEndpoint(
274              handle, nullptr, shared_from_this(), std::move(memory_allocator),
275              tcp_options)]() mutable { on_connect(std::move(ep)); });
276     return EventEngine::ConnectionHandle::kInvalid;
277   }
278   if (connect_errno != EWOULDBLOCK && connect_errno != EINPROGRESS) {
279     // Connection already failed. Return 0 to discourage any cancellation
280     // attempts.
281     handle->OrphanHandle(nullptr, nullptr, "tcp_client_connect_error");
282     Run([on_connect = std::move(on_connect),
283          ep = absl::FailedPreconditionError(absl::StrCat(
284              "connect failed: ", "addr: ", addr_uri.value(),
285              " error: ", std::strerror(connect_errno)))]() mutable {
286       on_connect(std::move(ep));
287     });
288     return EventEngine::ConnectionHandle::kInvalid;
289   }
290   // Connection is still in progress.
291   int64_t connection_id =
292       last_connection_id_.fetch_add(1, std::memory_order_acq_rel);
293   AsyncConnect* ac =
294       new AsyncConnect(std::move(on_connect), shared_from_this(),
295                        executor_.get(), handle, std::move(memory_allocator),
296                        tcp_options, addr_uri.value(), connection_id);
297   int shard_number = connection_id % connection_shards_.size();
298   struct ConnectionShard* shard = &connection_shards_[shard_number];
299   {
300     grpc_core::MutexLock lock(&shard->mu);
301     shard->pending_connections.insert_or_assign(connection_id, ac);
302   }
303   // Start asynchronous connect and return the connection id.
304   ac->Start(timeout);
305   return {static_cast<intptr_t>(connection_id), 0};
306 }
307 
OnConnectFinishInternal(int connection_handle)308 void PosixEventEngine::OnConnectFinishInternal(int connection_handle) {
309   int shard_number = connection_handle % connection_shards_.size();
310   struct ConnectionShard* shard = &connection_shards_[shard_number];
311   {
312     grpc_core::MutexLock lock(&shard->mu);
313     shard->pending_connections.erase(connection_handle);
314   }
315 }
316 
PosixEnginePollerManager(std::shared_ptr<ThreadPool> executor)317 PosixEnginePollerManager::PosixEnginePollerManager(
318     std::shared_ptr<ThreadPool> executor)
319     : poller_(grpc_event_engine::experimental::MakeDefaultPoller(this)),
320       executor_(std::move(executor)),
321       trigger_shutdown_called_(false) {}
322 
PosixEnginePollerManager(std::shared_ptr<PosixEventPoller> poller)323 PosixEnginePollerManager::PosixEnginePollerManager(
324     std::shared_ptr<PosixEventPoller> poller)
325     : poller_(std::move(poller)),
326       poller_state_(PollerState::kExternal),
327       executor_(nullptr),
328       trigger_shutdown_called_(false) {
329   DCHECK_NE(poller_, nullptr);
330 }
331 
Run(experimental::EventEngine::Closure * closure)332 void PosixEnginePollerManager::Run(
333     experimental::EventEngine::Closure* closure) {
334   if (executor_ != nullptr) {
335     executor_->Run(closure);
336   }
337 }
338 
Run(absl::AnyInvocable<void ()> cb)339 void PosixEnginePollerManager::Run(absl::AnyInvocable<void()> cb) {
340   if (executor_ != nullptr) {
341     executor_->Run(std::move(cb));
342   }
343 }
344 
TriggerShutdown()345 void PosixEnginePollerManager::TriggerShutdown() {
346   DCHECK(trigger_shutdown_called_ == false);
347   trigger_shutdown_called_ = true;
348   // If the poller is external, dont try to shut it down. Otherwise
349   // set poller state to PollerState::kShuttingDown.
350   if (poller_state_.exchange(PollerState::kShuttingDown) ==
351       PollerState::kExternal) {
352     poller_ = nullptr;
353     return;
354   }
355   poller_->Kick();
356 }
357 
~PosixEnginePollerManager()358 PosixEnginePollerManager::~PosixEnginePollerManager() {
359   if (poller_ != nullptr) {
360     poller_->Shutdown();
361   }
362 }
363 
PosixEventEngine(std::shared_ptr<PosixEventPoller> poller)364 PosixEventEngine::PosixEventEngine(std::shared_ptr<PosixEventPoller> poller)
365     : grpc_core::KeepsGrpcInitialized(
366           /*enabled=*/!grpc_core::IsPosixEeSkipGrpcInitEnabled()),
367       connection_shards_(std::max(2 * gpr_cpu_num_cores(), 1u)),
368       executor_(MakeThreadPool(grpc_core::Clamp(gpr_cpu_num_cores(), 4u, 16u))),
369       timer_manager_(std::make_shared<TimerManager>(executor_)) {
370   g_timer_fork_manager->RegisterForkable(
371       timer_manager_, TimerForkCallbackMethods::Prefork,
372       TimerForkCallbackMethods::PostforkParent,
373       TimerForkCallbackMethods::PostforkChild);
374 #if GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
375   poller_manager_ = std::make_shared<PosixEnginePollerManager>(poller);
376 #endif
377 }
378 
PosixEventEngine()379 PosixEventEngine::PosixEventEngine()
380     : grpc_core::KeepsGrpcInitialized(
381           /*enabled=*/!grpc_core::IsPosixEeSkipGrpcInitEnabled()),
382       connection_shards_(std::max(2 * gpr_cpu_num_cores(), 1u)),
383       executor_(MakeThreadPool(grpc_core::Clamp(gpr_cpu_num_cores(), 4u, 16u))),
384       timer_manager_(std::make_shared<TimerManager>(executor_)) {
385   g_timer_fork_manager->RegisterForkable(
386       timer_manager_, TimerForkCallbackMethods::Prefork,
387       TimerForkCallbackMethods::PostforkParent,
388       TimerForkCallbackMethods::PostforkChild);
389 #if GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
390   poller_manager_ = std::make_shared<PosixEnginePollerManager>(executor_);
391   // The threadpool must be instantiated after the poller otherwise, the
392   // process will deadlock when forking.
393   if (poller_manager_->Poller() != nullptr) {
394     executor_->Run([poller_manager = poller_manager_]() {
395       PollerWorkInternal(poller_manager);
396     });
397   }
398 #endif  // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
399 }
400 
PollerWorkInternal(std::shared_ptr<PosixEnginePollerManager> poller_manager)401 void PosixEventEngine::PollerWorkInternal(
402     std::shared_ptr<PosixEnginePollerManager> poller_manager) {
403   // TODO(vigneshbabu): The timeout specified here is arbitrary. For instance,
404   // this can be improved by setting the timeout to the next expiring timer.
405   PosixEventPoller* poller = poller_manager->Poller();
406   ThreadPool* executor = poller_manager->Executor();
407   auto result = poller->Work(24h, [executor, &poller_manager]() {
408     executor->Run([poller_manager]() mutable {
409       PollerWorkInternal(std::move(poller_manager));
410     });
411   });
412   if (result == Poller::WorkResult::kDeadlineExceeded) {
413     // The EventEngine is not shutting down but the next asynchronous
414     // PollerWorkInternal did not get scheduled. Schedule it now.
415     executor->Run([poller_manager = std::move(poller_manager)]() {
416       PollerWorkInternal(poller_manager);
417     });
418   } else if (result == Poller::WorkResult::kKicked &&
419              poller_manager->IsShuttingDown()) {
420     // The Poller Got Kicked and poller_state_ is set to
421     // PollerState::kShuttingDown. This can currently happen only from the
422     // EventEngine destructor. Sample the use_count of poller_manager. If the
423     // sampled use_count is > 1, there is one more instance of Work(...)
424     // which hasn't returned yet. Send another Kick to be safe to ensure the
425     // pending instance of Work(..) also breaks out. Its possible that the other
426     // instance of Work(..) had already broken out before this Kick is sent. In
427     // that case, the Kick is spurious but it shouldn't cause any side effects.
428     if (poller_manager.use_count() > 1) {
429       poller->Kick();
430     }
431   }
432 }
433 
434 #endif  // GRPC_POSIX_SOCKET_TCP
435 
436 struct PosixEventEngine::ClosureData final : public EventEngine::Closure {
437   absl::AnyInvocable<void()> cb;
438   Timer timer;
439   PosixEventEngine* engine;
440   EventEngine::TaskHandle handle;
441 
Rungrpc_event_engine::experimental::PosixEventEngine::ClosureData442   void Run() override {
443     GRPC_TRACE_LOG(event_engine, INFO)
444         << "PosixEventEngine:" << engine << " executing callback:" << handle;
445     {
446       grpc_core::MutexLock lock(&engine->mu_);
447       engine->known_handles_.erase(handle);
448     }
449     cb();
450     delete this;
451   }
452 };
453 
~PosixEventEngine()454 PosixEventEngine::~PosixEventEngine() {
455   {
456     grpc_core::MutexLock lock(&mu_);
457     if (GRPC_TRACE_FLAG_ENABLED(event_engine)) {
458       for (auto handle : known_handles_) {
459         LOG(ERROR) << "(event_engine) PosixEventEngine:" << this
460                    << " uncleared TaskHandle at shutdown:"
461                    << HandleToString(handle);
462       }
463     }
464     CHECK(GPR_LIKELY(known_handles_.empty()));
465   }
466   timer_manager_->Shutdown();
467 #if GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
468   if (poller_manager_ != nullptr) {
469     poller_manager_->TriggerShutdown();
470   }
471 #endif  // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
472   executor_->Quiesce();
473 }
474 
Cancel(EventEngine::TaskHandle handle)475 bool PosixEventEngine::Cancel(EventEngine::TaskHandle handle) {
476   grpc_core::MutexLock lock(&mu_);
477   if (!known_handles_.contains(handle)) return false;
478   auto* cd = reinterpret_cast<ClosureData*>(handle.keys[0]);
479   bool r = timer_manager_->TimerCancel(&cd->timer);
480   known_handles_.erase(handle);
481   if (r) delete cd;
482   return r;
483 }
484 
RunAfter(Duration when,absl::AnyInvocable<void ()> closure)485 EventEngine::TaskHandle PosixEventEngine::RunAfter(
486     Duration when, absl::AnyInvocable<void()> closure) {
487   return RunAfterInternal(when, std::move(closure));
488 }
489 
RunAfter(Duration when,EventEngine::Closure * closure)490 EventEngine::TaskHandle PosixEventEngine::RunAfter(
491     Duration when, EventEngine::Closure* closure) {
492   return RunAfterInternal(when, [closure]() { closure->Run(); });
493 }
494 
Run(absl::AnyInvocable<void ()> closure)495 void PosixEventEngine::Run(absl::AnyInvocable<void()> closure) {
496   executor_->Run(std::move(closure));
497 }
498 
Run(EventEngine::Closure * closure)499 void PosixEventEngine::Run(EventEngine::Closure* closure) {
500   executor_->Run(closure);
501 }
502 
RunAfterInternal(Duration when,absl::AnyInvocable<void ()> cb)503 EventEngine::TaskHandle PosixEventEngine::RunAfterInternal(
504     Duration when, absl::AnyInvocable<void()> cb) {
505   if (when <= Duration::zero()) {
506     Run(std::move(cb));
507     return TaskHandle::kInvalid;
508   }
509   auto when_ts = ToTimestamp(timer_manager_->Now(), when);
510   auto* cd = new ClosureData;
511   cd->cb = std::move(cb);
512   cd->engine = this;
513   EventEngine::TaskHandle handle{reinterpret_cast<intptr_t>(cd),
514                                  aba_token_.fetch_add(1)};
515   grpc_core::MutexLock lock(&mu_);
516   known_handles_.insert(handle);
517   cd->handle = handle;
518   GRPC_TRACE_LOG(event_engine, INFO)
519       << "PosixEventEngine:" << this << " scheduling callback:" << handle;
520   timer_manager_->TimerInit(&cd->timer, when_ts, cd);
521   return handle;
522 }
523 
PosixDNSResolver(grpc_core::OrphanablePtr<RefCountedDNSResolverInterface> dns_resolver)524 PosixEventEngine::PosixDNSResolver::PosixDNSResolver(
525     grpc_core::OrphanablePtr<RefCountedDNSResolverInterface> dns_resolver)
526     : dns_resolver_(std::move(dns_resolver)) {}
527 
LookupHostname(LookupHostnameCallback on_resolve,absl::string_view name,absl::string_view default_port)528 void PosixEventEngine::PosixDNSResolver::LookupHostname(
529     LookupHostnameCallback on_resolve, absl::string_view name,
530     absl::string_view default_port) {
531   dns_resolver_->LookupHostname(std::move(on_resolve), name, default_port);
532 }
533 
LookupSRV(LookupSRVCallback on_resolve,absl::string_view name)534 void PosixEventEngine::PosixDNSResolver::LookupSRV(LookupSRVCallback on_resolve,
535                                                    absl::string_view name) {
536   dns_resolver_->LookupSRV(std::move(on_resolve), name);
537 }
538 
LookupTXT(LookupTXTCallback on_resolve,absl::string_view name)539 void PosixEventEngine::PosixDNSResolver::LookupTXT(LookupTXTCallback on_resolve,
540                                                    absl::string_view name) {
541   dns_resolver_->LookupTXT(std::move(on_resolve), name);
542 }
543 
544 absl::StatusOr<std::unique_ptr<EventEngine::DNSResolver>>
GetDNSResolver(GRPC_UNUSED const EventEngine::DNSResolver::ResolverOptions & options)545 PosixEventEngine::GetDNSResolver(
546     GRPC_UNUSED const EventEngine::DNSResolver::ResolverOptions& options) {
547 #ifndef GRPC_POSIX_SOCKET_RESOLVE_ADDRESS
548   grpc_core::Crash("Unable to get DNS resolver for this platform.");
549 #else  // GRPC_POSIX_SOCKET_RESOLVE_ADDRESS
550   // If c-ares is supported on the platform, build according to user's
551   // configuration.
552   if (ShouldUseAresDnsResolver()) {
553 #if GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER)
554     GRPC_TRACE_LOG(event_engine_dns, INFO)
555         << "PosixEventEngine::" << this << " creating AresResolver";
556     auto ares_resolver = AresResolver::CreateAresResolver(
557         options.dns_server,
558         std::make_unique<GrpcPolledFdFactoryPosix>(poller_manager_->Poller()),
559         shared_from_this());
560     if (!ares_resolver.ok()) {
561       return ares_resolver.status();
562     }
563     return std::make_unique<PosixEventEngine::PosixDNSResolver>(
564         std::move(*ares_resolver));
565 #endif  // GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER)
566   }
567   GRPC_TRACE_LOG(event_engine_dns, INFO)
568       << "PosixEventEngine::" << this << " creating NativePosixDNSResolver";
569   return std::make_unique<NativePosixDNSResolver>(shared_from_this());
570 #endif  // GRPC_POSIX_SOCKET_RESOLVE_ADDRESS
571 }
572 
IsWorkerThread()573 bool PosixEventEngine::IsWorkerThread() { grpc_core::Crash("unimplemented"); }
574 
CancelConnect(EventEngine::ConnectionHandle handle)575 bool PosixEventEngine::CancelConnect(EventEngine::ConnectionHandle handle) {
576 #if GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
577   int connection_handle = handle.keys[0];
578   if (connection_handle <= 0) {
579     return false;
580   }
581   int shard_number = connection_handle % connection_shards_.size();
582   struct ConnectionShard* shard = &connection_shards_[shard_number];
583   AsyncConnect* ac = nullptr;
584   {
585     grpc_core::MutexLock lock(&shard->mu);
586     auto it = shard->pending_connections.find(connection_handle);
587     if (it != shard->pending_connections.end()) {
588       ac = it->second;
589       CHECK_NE(ac, nullptr);
590       // Trying to acquire ac->mu here would could cause a deadlock because
591       // the OnWritable method tries to acquire the two mutexes used
592       // here in the reverse order. But we dont need to acquire ac->mu before
593       // incrementing ac->refs here. This is because the OnWritable
594       // method decrements ac->refs only after deleting the connection handle
595       // from the corresponding hashmap. If the code enters here, it means
596       // that deletion hasn't happened yet. The deletion can only happen after
597       // the corresponding g_shard_mu is unlocked.
598       ++ac->refs_;
599       // Remove connection from list of active connections.
600       shard->pending_connections.erase(it);
601     }
602   }
603   if (ac == nullptr) {
604     return false;
605   }
606   ac->mu_.Lock();
607   bool connection_cancel_success = (ac->fd_ != nullptr);
608   if (connection_cancel_success) {
609     // Connection is still pending. The OnWritable callback hasn't executed
610     // yet because ac->fd != nullptr.
611     ac->connect_cancelled_ = true;
612     // Shutdown the fd. This would cause OnWritable to run as soon as
613     // possible. We dont need to pass a custom error here because it wont be
614     // used since the on_connect_closure is not run if connect cancellation is
615     // successful.
616     ac->fd_->ShutdownHandle(
617         absl::FailedPreconditionError("Connection cancelled"));
618   }
619   bool done = (--ac->refs_ == 0);
620   ac->mu_.Unlock();
621   if (done) {
622     delete ac;
623   }
624   return connection_cancel_success;
625 #else   // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
626   grpc_core::Crash(
627       "EventEngine::CancelConnect is not supported on this platform");
628 #endif  // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
629 }
630 
Connect(OnConnectCallback on_connect,const ResolvedAddress & addr,const EndpointConfig & args,MemoryAllocator memory_allocator,Duration timeout)631 EventEngine::ConnectionHandle PosixEventEngine::Connect(
632     OnConnectCallback on_connect, const ResolvedAddress& addr,
633     const EndpointConfig& args, MemoryAllocator memory_allocator,
634     Duration timeout) {
635 #if GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
636   CHECK_NE(poller_manager_, nullptr);
637   PosixTcpOptions options = TcpOptionsFromEndpointConfig(args);
638   absl::StatusOr<PosixSocketWrapper::PosixSocketCreateResult> socket =
639       PosixSocketWrapper::CreateAndPrepareTcpClientSocket(options, addr);
640   if (!socket.ok()) {
641     Run([on_connect = std::move(on_connect),
642          status = socket.status()]() mutable { on_connect(status); });
643     return EventEngine::ConnectionHandle::kInvalid;
644   }
645   return CreateEndpointFromUnconnectedFdInternal(
646       (*socket).sock.Fd(), std::move(on_connect), (*socket).mapped_target_addr,
647       options, std::move(memory_allocator), timeout);
648 #else   // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
649   grpc_core::Crash("EventEngine::Connect is not supported on this platform");
650 #endif  // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
651 }
652 
CreateEndpointFromUnconnectedFd(int fd,EventEngine::OnConnectCallback on_connect,const EventEngine::ResolvedAddress & addr,const EndpointConfig & config,MemoryAllocator memory_allocator,EventEngine::Duration timeout)653 EventEngine::ConnectionHandle PosixEventEngine::CreateEndpointFromUnconnectedFd(
654     int fd, EventEngine::OnConnectCallback on_connect,
655     const EventEngine::ResolvedAddress& addr, const EndpointConfig& config,
656     MemoryAllocator memory_allocator, EventEngine::Duration timeout) {
657 #if GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
658   return CreateEndpointFromUnconnectedFdInternal(
659       fd, std::move(on_connect), addr, TcpOptionsFromEndpointConfig(config),
660       std::move(memory_allocator), timeout);
661 #else   // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
662   grpc_core::Crash(
663       "EventEngine::CreateEndpointFromUnconnectedFd is not supported on this "
664       "platform");
665 #endif  // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
666 }
667 
668 std::unique_ptr<EventEngine::Endpoint>
CreatePosixEndpointFromFd(int fd,const EndpointConfig & config,MemoryAllocator memory_allocator)669 PosixEventEngine::CreatePosixEndpointFromFd(int fd,
670                                             const EndpointConfig& config,
671                                             MemoryAllocator memory_allocator) {
672 #if GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
673   DCHECK_GT(fd, 0);
674   PosixEventPoller* poller = poller_manager_->Poller();
675   DCHECK_NE(poller, nullptr);
676   EventHandle* handle =
677       poller->CreateHandle(fd, "tcp-client", poller->CanTrackErrors());
678   return CreatePosixEndpoint(handle, nullptr, shared_from_this(),
679                              std::move(memory_allocator),
680                              TcpOptionsFromEndpointConfig(config));
681 #else   // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
682   grpc_core::Crash(
683       "PosixEventEngine::CreatePosixEndpointFromFd is not supported on "
684       "this platform");
685 #endif  // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
686 }
687 
CreateEndpointFromFd(int fd,const EndpointConfig & config)688 std::unique_ptr<EventEngine::Endpoint> PosixEventEngine::CreateEndpointFromFd(
689     int fd, const EndpointConfig& config) {
690   auto options = TcpOptionsFromEndpointConfig(config);
691   MemoryAllocator allocator;
692   if (options.memory_allocator_factory != nullptr) {
693     return CreatePosixEndpointFromFd(
694         fd, config,
695         options.memory_allocator_factory->CreateMemoryAllocator(
696             absl::StrCat("allocator:", fd)));
697   }
698   return CreatePosixEndpointFromFd(
699       fd, config,
700       options.resource_quota->memory_quota()->CreateMemoryAllocator(
701           absl::StrCat("allocator:", fd)));
702 }
703 
704 absl::StatusOr<std::unique_ptr<EventEngine::Listener>>
CreateListener(Listener::AcceptCallback on_accept,absl::AnyInvocable<void (absl::Status)> on_shutdown,const EndpointConfig & config,std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory)705 PosixEventEngine::CreateListener(
706     Listener::AcceptCallback on_accept,
707     absl::AnyInvocable<void(absl::Status)> on_shutdown,
708     const EndpointConfig& config,
709     std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory) {
710 #if GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
711   PosixEventEngineWithFdSupport::PosixAcceptCallback posix_on_accept =
712       [on_accept_cb = std::move(on_accept)](
713           int /*listener_fd*/, std::unique_ptr<EventEngine::Endpoint> ep,
714           bool /*is_external*/, MemoryAllocator allocator,
715           SliceBuffer* /*pending_data*/) mutable {
716         on_accept_cb(std::move(ep), std::move(allocator));
717       };
718   return std::make_unique<PosixEngineListener>(
719       std::move(posix_on_accept), std::move(on_shutdown), config,
720       std::move(memory_allocator_factory), poller_manager_->Poller(),
721       shared_from_this());
722 #else   // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
723   grpc_core::Crash(
724       "EventEngine::CreateListener is not supported on this platform");
725 #endif  // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
726 }
727 
728 absl::StatusOr<std::unique_ptr<EventEngine::Listener>>
CreatePosixListener(PosixEventEngineWithFdSupport::PosixAcceptCallback on_accept,absl::AnyInvocable<void (absl::Status)> on_shutdown,const EndpointConfig & config,std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory)729 PosixEventEngine::CreatePosixListener(
730     PosixEventEngineWithFdSupport::PosixAcceptCallback on_accept,
731     absl::AnyInvocable<void(absl::Status)> on_shutdown,
732     const EndpointConfig& config,
733     std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory) {
734 #if GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
735   return std::make_unique<PosixEngineListener>(
736       std::move(on_accept), std::move(on_shutdown), config,
737       std::move(memory_allocator_factory), poller_manager_->Poller(),
738       shared_from_this());
739 #else   // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
740   grpc_core::Crash(
741       "EventEngine::CreateListener is not supported on this platform");
742 #endif  // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
743 }
744 
745 }  // namespace experimental
746 }  // namespace grpc_event_engine
747