1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include "src/core/lib/event_engine/posix_engine/posix_engine.h"
15
16 #include <grpc/event_engine/event_engine.h>
17 #include <grpc/event_engine/memory_allocator.h>
18 #include <grpc/event_engine/slice_buffer.h>
19 #include <grpc/support/cpu.h>
20 #include <grpc/support/port_platform.h>
21
22 #include <algorithm>
23 #include <atomic>
24 #include <chrono>
25 #include <cstdint>
26 #include <cstring>
27 #include <memory>
28 #include <string>
29 #include <type_traits>
30 #include <utility>
31
32 #include "absl/cleanup/cleanup.h"
33 #include "absl/functional/any_invocable.h"
34 #include "absl/log/check.h"
35 #include "absl/log/log.h"
36 #include "absl/status/status.h"
37 #include "absl/strings/match.h"
38 #include "absl/strings/str_cat.h"
39 #include "src/core/lib/debug/trace.h"
40 #include "src/core/lib/event_engine/ares_resolver.h"
41 #include "src/core/lib/event_engine/forkable.h"
42 #include "src/core/lib/event_engine/grpc_polled_fd.h"
43 #include "src/core/lib/event_engine/poller.h"
44 #include "src/core/lib/event_engine/posix.h"
45 #include "src/core/lib/event_engine/posix_engine/grpc_polled_fd_posix.h"
46 #include "src/core/lib/event_engine/posix_engine/native_posix_dns_resolver.h"
47 #include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
48 #include "src/core/lib/event_engine/posix_engine/timer.h"
49 #include "src/core/lib/event_engine/tcp_socket_utils.h"
50 #include "src/core/lib/event_engine/utils.h"
51 #include "src/core/lib/experiments/experiments.h"
52 #include "src/core/util/crash.h"
53 #include "src/core/util/no_destruct.h"
54 #include "src/core/util/sync.h"
55 #include "src/core/util/useful.h"
56
57 #ifdef GRPC_POSIX_SOCKET_TCP
58 #include <errno.h> // IWYU pragma: keep
59 #include <stdint.h> // IWYU pragma: keep
60 #include <sys/socket.h> // IWYU pragma: keep
61
62 #include "src/core/lib/event_engine/posix_engine/event_poller.h"
63 #include "src/core/lib/event_engine/posix_engine/event_poller_posix_default.h"
64 #include "src/core/lib/event_engine/posix_engine/posix_endpoint.h"
65 #include "src/core/lib/event_engine/posix_engine/posix_engine_listener.h"
66 #endif // GRPC_POSIX_SOCKET_TCP
67
68 // IWYU pragma: no_include <ratio>
69
70 // TODO(eryu): remove this GRPC_CFSTREAM condition when the CFEngine is ready.
71 // The posix poller currently crashes iOS.
72 #if defined(GRPC_POSIX_SOCKET_TCP) && !defined(GRPC_CFSTREAM) && \
73 !defined(GRPC_DO_NOT_INSTANTIATE_POSIX_POLLER)
74 #define GRPC_PLATFORM_SUPPORTS_POSIX_POLLING true
75 #else
76 #define GRPC_PLATFORM_SUPPORTS_POSIX_POLLING false
77 #endif
78
79 using namespace std::chrono_literals;
80
81 namespace grpc_event_engine {
82 namespace experimental {
83
84 namespace {
85
86 grpc_core::NoDestruct<ObjectGroupForkHandler> g_timer_fork_manager;
87
88 class TimerForkCallbackMethods {
89 public:
Prefork()90 static void Prefork() { g_timer_fork_manager->Prefork(); }
PostforkParent()91 static void PostforkParent() { g_timer_fork_manager->PostforkParent(); }
PostforkChild()92 static void PostforkChild() { g_timer_fork_manager->PostforkChild(); }
93 };
94
95 } // namespace
96
97 #ifdef GRPC_POSIX_SOCKET_TCP
98
Start(EventEngine::Duration timeout)99 void AsyncConnect::Start(EventEngine::Duration timeout) {
100 on_writable_ = PosixEngineClosure::ToPermanentClosure(
101 [this](absl::Status status) { OnWritable(std::move(status)); });
102 alarm_handle_ = engine_->RunAfter(timeout, [this]() {
103 OnTimeoutExpired(absl::DeadlineExceededError("connect() timed out"));
104 });
105 fd_->NotifyOnWrite(on_writable_);
106 }
107
// Frees the write-readiness closure created in Start().
AsyncConnect ::~AsyncConnect() { delete on_writable_; }
109
OnTimeoutExpired(absl::Status status)110 void AsyncConnect::OnTimeoutExpired(absl::Status status) {
111 bool done = false;
112 {
113 grpc_core::MutexLock lock(&mu_);
114 if (fd_ != nullptr) {
115 fd_->ShutdownHandle(std::move(status));
116 }
117 done = (--refs_ == 0);
118 }
119 if (done) {
120 delete this;
121 }
122 }
123
OnWritable(absl::Status status)124 void AsyncConnect::OnWritable(absl::Status status)
125 ABSL_NO_THREAD_SAFETY_ANALYSIS {
126 int so_error = 0;
127 socklen_t so_error_size;
128 int err;
129 int done;
130 int consumed_refs = 1;
131 EventHandle* fd;
132 absl::StatusOr<std::unique_ptr<EventEngine::Endpoint>> ep;
133
134 mu_.Lock();
135 CHECK_NE(fd_, nullptr);
136 fd = std::exchange(fd_, nullptr);
137 bool connect_cancelled = connect_cancelled_;
138 if (fd->IsHandleShutdown() && status.ok()) {
139 if (!connect_cancelled) {
140 // status is OK and handle has been shutdown but the connect was not
141 // cancelled. This can happen if the timeout expired and the while the
142 // OnWritable just started executing.
143 status = absl::DeadlineExceededError("connect() timed out");
144 } else {
145 // This can happen if the connection was cancelled while the OnWritable
146 // just started executing.
147 status = absl::FailedPreconditionError("Connection cancelled");
148 }
149 }
150 mu_.Unlock();
151
152 if (engine_->Cancel(alarm_handle_)) {
153 ++consumed_refs;
154 }
155
156 auto on_writable_finish = absl::MakeCleanup([&]() -> void {
157 mu_.AssertHeld();
158 if (!connect_cancelled) {
159 reinterpret_cast<PosixEventEngine*>(engine_.get())
160 ->OnConnectFinishInternal(connection_handle_);
161 }
162 if (fd != nullptr) {
163 fd->OrphanHandle(nullptr, nullptr, "tcp_client_orphan");
164 fd = nullptr;
165 }
166 if (!status.ok()) {
167 ep = absl::UnknownError(
168 absl::StrCat("Failed to connect to remote host: ", status.message()));
169 }
170 // Run the OnConnect callback asynchronously.
171 if (!connect_cancelled) {
172 executor_->Run(
173 [ep = std::move(ep), on_connect = std::move(on_connect_)]() mutable {
174 if (on_connect) {
175 on_connect(std::move(ep));
176 }
177 });
178 }
179 done = ((refs_ -= consumed_refs) == 0);
180 mu_.Unlock();
181 if (done) {
182 delete this;
183 }
184 });
185
186 mu_.Lock();
187 if (!status.ok() || connect_cancelled) {
188 return;
189 }
190
191 do {
192 so_error_size = sizeof(so_error);
193 err = getsockopt(fd->WrappedFd(), SOL_SOCKET, SO_ERROR, &so_error,
194 &so_error_size);
195 } while (err < 0 && errno == EINTR);
196 if (err < 0) {
197 status = absl::FailedPreconditionError(
198 absl::StrCat("getsockopt: ", std::strerror(errno)));
199 return;
200 }
201
202 switch (so_error) {
203 case 0:
204 ep = CreatePosixEndpoint(fd, nullptr, engine_, std::move(allocator_),
205 options_);
206 fd = nullptr;
207 break;
208 case ENOBUFS:
209 // We will get one of these errors if we have run out of
210 // memory in the kernel for the data structures allocated
211 // when you connect a socket. If this happens it is very
212 // likely that if we wait a little bit then try again the
213 // connection will work (since other programs or this
214 // program will close their network connections and free up
215 // memory). This does _not_ indicate that there is anything
216 // wrong with the server we are connecting to, this is a
217 // local problem.
218
219 // If you are looking at this code, then chances are that
220 // your program or another program on the same computer
221 // opened too many network connections. The "easy" fix:
222 // don't do that!
223 LOG(ERROR) << "kernel out of buffers";
224 mu_.Unlock();
225 fd->NotifyOnWrite(on_writable_);
226 // Don't run the cleanup function for this case.
227 std::move(on_writable_finish).Cancel();
228 return;
229 case ECONNREFUSED:
230 // This error shouldn't happen for anything other than connect().
231 status = absl::FailedPreconditionError(std::strerror(so_error));
232 break;
233 default:
234 // We don't really know which syscall triggered the problem here, so
235 // punt by reporting getsockopt().
236 status = absl::FailedPreconditionError(
237 absl::StrCat("getsockopt(SO_ERROR): ", std::strerror(so_error)));
238 break;
239 }
240 }
241
242 EventEngine::ConnectionHandle
CreateEndpointFromUnconnectedFdInternal(int fd,EventEngine::OnConnectCallback on_connect,const EventEngine::ResolvedAddress & addr,const PosixTcpOptions & tcp_options,MemoryAllocator memory_allocator,EventEngine::Duration timeout)243 PosixEventEngine::CreateEndpointFromUnconnectedFdInternal(
244 int fd, EventEngine::OnConnectCallback on_connect,
245 const EventEngine::ResolvedAddress& addr,
246 const PosixTcpOptions& tcp_options, MemoryAllocator memory_allocator,
247 EventEngine::Duration timeout) {
248 int err;
249 int connect_errno;
250 do {
251 err = connect(fd, addr.address(), addr.size());
252 } while (err < 0 && errno == EINTR);
253 connect_errno = (err < 0) ? errno : 0;
254
255 auto addr_uri = ResolvedAddressToURI(addr);
256 if (!addr_uri.ok()) {
257 Run([on_connect = std::move(on_connect),
258 ep = absl::FailedPreconditionError(absl::StrCat(
259 "connect failed: ", "invalid addr: ",
260 addr_uri.value()))]() mutable { on_connect(std::move(ep)); });
261 return EventEngine::ConnectionHandle::kInvalid;
262 }
263
264 std::string name = absl::StrCat("tcp-client:", addr_uri.value());
265 PosixEventPoller* poller = poller_manager_->Poller();
266 EventHandle* handle =
267 poller->CreateHandle(fd, name, poller->CanTrackErrors());
268
269 if (connect_errno == 0) {
270 // Connection already succeeded. Return 0 to discourage any cancellation
271 // attempts.
272 Run([on_connect = std::move(on_connect),
273 ep = CreatePosixEndpoint(
274 handle, nullptr, shared_from_this(), std::move(memory_allocator),
275 tcp_options)]() mutable { on_connect(std::move(ep)); });
276 return EventEngine::ConnectionHandle::kInvalid;
277 }
278 if (connect_errno != EWOULDBLOCK && connect_errno != EINPROGRESS) {
279 // Connection already failed. Return 0 to discourage any cancellation
280 // attempts.
281 handle->OrphanHandle(nullptr, nullptr, "tcp_client_connect_error");
282 Run([on_connect = std::move(on_connect),
283 ep = absl::FailedPreconditionError(absl::StrCat(
284 "connect failed: ", "addr: ", addr_uri.value(),
285 " error: ", std::strerror(connect_errno)))]() mutable {
286 on_connect(std::move(ep));
287 });
288 return EventEngine::ConnectionHandle::kInvalid;
289 }
290 // Connection is still in progress.
291 int64_t connection_id =
292 last_connection_id_.fetch_add(1, std::memory_order_acq_rel);
293 AsyncConnect* ac =
294 new AsyncConnect(std::move(on_connect), shared_from_this(),
295 executor_.get(), handle, std::move(memory_allocator),
296 tcp_options, addr_uri.value(), connection_id);
297 int shard_number = connection_id % connection_shards_.size();
298 struct ConnectionShard* shard = &connection_shards_[shard_number];
299 {
300 grpc_core::MutexLock lock(&shard->mu);
301 shard->pending_connections.insert_or_assign(connection_id, ac);
302 }
303 // Start asynchronous connect and return the connection id.
304 ac->Start(timeout);
305 return {static_cast<intptr_t>(connection_id), 0};
306 }
307
OnConnectFinishInternal(int connection_handle)308 void PosixEventEngine::OnConnectFinishInternal(int connection_handle) {
309 int shard_number = connection_handle % connection_shards_.size();
310 struct ConnectionShard* shard = &connection_shards_[shard_number];
311 {
312 grpc_core::MutexLock lock(&shard->mu);
313 shard->pending_connections.erase(connection_handle);
314 }
315 }
316
// Constructs a manager whose poller comes from MakeDefaultPoller(), passing
// this manager as the argument. `executor` is stored and used by the Run()
// overloads.
PosixEnginePollerManager::PosixEnginePollerManager(
    std::shared_ptr<ThreadPool> executor)
    : poller_(grpc_event_engine::experimental::MakeDefaultPoller(this)),
      executor_(std::move(executor)),
      trigger_shutdown_called_(false) {}
322
// Constructs a manager around a caller-supplied poller. The state is marked
// PollerState::kExternal and no executor is stored, so the Run() overloads
// do nothing for such a manager. `poller` must not be null (DCHECKed).
PosixEnginePollerManager::PosixEnginePollerManager(
    std::shared_ptr<PosixEventPoller> poller)
    : poller_(std::move(poller)),
      poller_state_(PollerState::kExternal),
      executor_(nullptr),
      trigger_shutdown_called_(false) {
  DCHECK_NE(poller_, nullptr);
}
331
Run(experimental::EventEngine::Closure * closure)332 void PosixEnginePollerManager::Run(
333 experimental::EventEngine::Closure* closure) {
334 if (executor_ != nullptr) {
335 executor_->Run(closure);
336 }
337 }
338
Run(absl::AnyInvocable<void ()> cb)339 void PosixEnginePollerManager::Run(absl::AnyInvocable<void()> cb) {
340 if (executor_ != nullptr) {
341 executor_->Run(std::move(cb));
342 }
343 }
344
TriggerShutdown()345 void PosixEnginePollerManager::TriggerShutdown() {
346 DCHECK(trigger_shutdown_called_ == false);
347 trigger_shutdown_called_ = true;
348 // If the poller is external, dont try to shut it down. Otherwise
349 // set poller state to PollerState::kShuttingDown.
350 if (poller_state_.exchange(PollerState::kShuttingDown) ==
351 PollerState::kExternal) {
352 poller_ = nullptr;
353 return;
354 }
355 poller_->Kick();
356 }
357
~PosixEnginePollerManager()358 PosixEnginePollerManager::~PosixEnginePollerManager() {
359 if (poller_ != nullptr) {
360 poller_->Shutdown();
361 }
362 }
363
// Constructs an engine that uses a caller-supplied poller.
//
// Sizes the connection shard table to twice the CPU core count (at least 1),
// creates a thread pool sized to the core count clamped to [4, 16], creates
// the timer manager on that pool and registers it with the timer fork
// handler. Unlike the default constructor, no PollerWorkInternal loop is
// scheduled here.
PosixEventEngine::PosixEventEngine(std::shared_ptr<PosixEventPoller> poller)
    : grpc_core::KeepsGrpcInitialized(
          /*enabled=*/!grpc_core::IsPosixEeSkipGrpcInitEnabled()),
      connection_shards_(std::max(2 * gpr_cpu_num_cores(), 1u)),
      executor_(MakeThreadPool(grpc_core::Clamp(gpr_cpu_num_cores(), 4u, 16u))),
      timer_manager_(std::make_shared<TimerManager>(executor_)) {
  g_timer_fork_manager->RegisterForkable(
      timer_manager_, TimerForkCallbackMethods::Prefork,
      TimerForkCallbackMethods::PostforkParent,
      TimerForkCallbackMethods::PostforkChild);
#if GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
  // Wrap the external poller; the manager records it as PollerState::kExternal.
  poller_manager_ = std::make_shared<PosixEnginePollerManager>(poller);
#endif
}
378
// Constructs an engine with its own default poller.
//
// Sizes the connection shard table to twice the CPU core count (at least 1),
// creates a thread pool sized to the core count clamped to [4, 16], creates
// the timer manager on that pool and registers it with the timer fork
// handler. When polling is supported and a poller could be created, the
// first PollerWorkInternal iteration is scheduled on the executor.
PosixEventEngine::PosixEventEngine()
    : grpc_core::KeepsGrpcInitialized(
          /*enabled=*/!grpc_core::IsPosixEeSkipGrpcInitEnabled()),
      connection_shards_(std::max(2 * gpr_cpu_num_cores(), 1u)),
      executor_(MakeThreadPool(grpc_core::Clamp(gpr_cpu_num_cores(), 4u, 16u))),
      timer_manager_(std::make_shared<TimerManager>(executor_)) {
  g_timer_fork_manager->RegisterForkable(
      timer_manager_, TimerForkCallbackMethods::Prefork,
      TimerForkCallbackMethods::PostforkParent,
      TimerForkCallbackMethods::PostforkChild);
#if GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
  poller_manager_ = std::make_shared<PosixEnginePollerManager>(executor_);
  // The threadpool must be instantiated after the poller otherwise, the
  // process will deadlock when forking.
  if (poller_manager_->Poller() != nullptr) {
    // The closure holds its own shared_ptr copy of the poller manager.
    executor_->Run([poller_manager = poller_manager_]() {
      PollerWorkInternal(poller_manager);
    });
  }
#endif  // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
}
400
// Runs one iteration of the poller loop.
//
// Calls Work() with a 24h timeout and a callback that schedules the next
// PollerWorkInternal iteration on the executor. Afterwards:
//   * kDeadlineExceeded: the next iteration is scheduled here instead.
//   * kKicked while the manager is shutting down: no new iteration is
//     scheduled, and the poller is kicked again if other references to the
//     manager still exist.
//   * any other result: nothing further is done here.
void PosixEventEngine::PollerWorkInternal(
    std::shared_ptr<PosixEnginePollerManager> poller_manager) {
  // TODO(vigneshbabu): The timeout specified here is arbitrary. For instance,
  // this can be improved by setting the timeout to the next expiring timer.
  PosixEventPoller* poller = poller_manager->Poller();
  ThreadPool* executor = poller_manager->Executor();
  // The outer lambda refers to `poller_manager` by reference; the inner
  // closure takes its own shared_ptr copy.
  // NOTE(review): this presumably relies on Work() invoking the callback
  // before it returns - confirm against the poller implementation.
  auto result = poller->Work(24h, [executor, &poller_manager]() {
    executor->Run([poller_manager]() mutable {
      PollerWorkInternal(std::move(poller_manager));
    });
  });
  if (result == Poller::WorkResult::kDeadlineExceeded) {
    // The EventEngine is not shutting down but the next asynchronous
    // PollerWorkInternal did not get scheduled. Schedule it now.
    executor->Run([poller_manager = std::move(poller_manager)]() {
      PollerWorkInternal(poller_manager);
    });
  } else if (result == Poller::WorkResult::kKicked &&
             poller_manager->IsShuttingDown()) {
    // The Poller got kicked and poller_state_ is set to
    // PollerState::kShuttingDown. This can currently happen only from the
    // EventEngine destructor. Sample the use_count of poller_manager. If the
    // sampled use_count is > 1, there is one more instance of Work(...)
    // which hasn't returned yet. Send another Kick to be safe to ensure the
    // pending instance of Work(..) also breaks out. It's possible that the
    // other instance of Work(..) had already broken out before this Kick is
    // sent. In that case, the Kick is spurious but it shouldn't cause any side
    // effects.
    if (poller_manager.use_count() > 1) {
      poller->Kick();
    }
  }
}
433
434 #endif // GRPC_POSIX_SOCKET_TCP
435
// Heap-allocated bookkeeping for one RunAfter() task. RunAfterInternal()
// creates it; it is deleted either by Run() after the callback has executed
// or by Cancel() when the timer was cancelled in time.
struct PosixEventEngine::ClosureData final : public EventEngine::Closure {
  // User callback to execute when the timer fires.
  absl::AnyInvocable<void()> cb;
  // Timer registered with the engine's timer manager.
  Timer timer;
  // Engine that scheduled this task.
  PosixEventEngine* engine;
  // Handle returned to the caller; also the key in engine->known_handles_.
  EventEngine::TaskHandle handle;

  // Removes the handle from the engine's known set (after which Cancel()
  // returns false for it), runs the callback, then deletes this object.
  void Run() override {
    GRPC_TRACE_LOG(event_engine, INFO)
        << "PosixEventEngine:" << engine << " executing callback:" << handle;
    {
      grpc_core::MutexLock lock(&engine->mu_);
      engine->known_handles_.erase(handle);
    }
    // The callback runs without engine->mu_ held.
    cb();
    delete this;
  }
};
453
// Tears the engine down. CHECK-fails if any RunAfter() task handle is still
// outstanding (listing the handles first when event_engine tracing is on),
// then shuts down the timer manager, triggers poller shutdown where polling
// is supported, and finally quiesces the executor.
PosixEventEngine::~PosixEventEngine() {
  {
    grpc_core::MutexLock lock(&mu_);
    if (GRPC_TRACE_FLAG_ENABLED(event_engine)) {
      for (auto handle : known_handles_) {
        LOG(ERROR) << "(event_engine) PosixEventEngine:" << this
                   << " uncleared TaskHandle at shutdown:"
                   << HandleToString(handle);
      }
    }
    CHECK(GPR_LIKELY(known_handles_.empty()));
  }
  timer_manager_->Shutdown();
#if GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
  if (poller_manager_ != nullptr) {
    poller_manager_->TriggerShutdown();
  }
#endif  // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
  executor_->Quiesce();
}
474
Cancel(EventEngine::TaskHandle handle)475 bool PosixEventEngine::Cancel(EventEngine::TaskHandle handle) {
476 grpc_core::MutexLock lock(&mu_);
477 if (!known_handles_.contains(handle)) return false;
478 auto* cd = reinterpret_cast<ClosureData*>(handle.keys[0]);
479 bool r = timer_manager_->TimerCancel(&cd->timer);
480 known_handles_.erase(handle);
481 if (r) delete cd;
482 return r;
483 }
484
// Schedules `closure` to run after `when`; forwards to RunAfterInternal().
// Returns the handle produced by RunAfterInternal().
EventEngine::TaskHandle PosixEventEngine::RunAfter(
    Duration when, absl::AnyInvocable<void()> closure) {
  return RunAfterInternal(when, std::move(closure));
}
489
// Schedules `closure->Run()` to be invoked after `when`. Only the raw
// pointer is captured, so `closure` must still be valid when the task runs.
EventEngine::TaskHandle PosixEventEngine::RunAfter(
    Duration when, EventEngine::Closure* closure) {
  return RunAfterInternal(when, [closure]() { closure->Run(); });
}
494
// Hands `closure` to the engine's thread pool for execution.
void PosixEventEngine::Run(absl::AnyInvocable<void()> closure) {
  executor_->Run(std::move(closure));
}
498
// Hands `closure` to the engine's thread pool for execution.
void PosixEventEngine::Run(EventEngine::Closure* closure) {
  executor_->Run(closure);
}
502
RunAfterInternal(Duration when,absl::AnyInvocable<void ()> cb)503 EventEngine::TaskHandle PosixEventEngine::RunAfterInternal(
504 Duration when, absl::AnyInvocable<void()> cb) {
505 if (when <= Duration::zero()) {
506 Run(std::move(cb));
507 return TaskHandle::kInvalid;
508 }
509 auto when_ts = ToTimestamp(timer_manager_->Now(), when);
510 auto* cd = new ClosureData;
511 cd->cb = std::move(cb);
512 cd->engine = this;
513 EventEngine::TaskHandle handle{reinterpret_cast<intptr_t>(cd),
514 aba_token_.fetch_add(1)};
515 grpc_core::MutexLock lock(&mu_);
516 known_handles_.insert(handle);
517 cd->handle = handle;
518 GRPC_TRACE_LOG(event_engine, INFO)
519 << "PosixEventEngine:" << this << " scheduling callback:" << handle;
520 timer_manager_->TimerInit(&cd->timer, when_ts, cd);
521 return handle;
522 }
523
// Wraps `dns_resolver`; every lookup method of this class forwards to it.
PosixEventEngine::PosixDNSResolver::PosixDNSResolver(
    grpc_core::OrphanablePtr<RefCountedDNSResolverInterface> dns_resolver)
    : dns_resolver_(std::move(dns_resolver)) {}
527
// Forwards a hostname lookup for `name` (with `default_port`) to the wrapped
// resolver; the result is delivered through `on_resolve`.
void PosixEventEngine::PosixDNSResolver::LookupHostname(
    LookupHostnameCallback on_resolve, absl::string_view name,
    absl::string_view default_port) {
  dns_resolver_->LookupHostname(std::move(on_resolve), name, default_port);
}
533
// Forwards an SRV record lookup for `name` to the wrapped resolver; the
// result is delivered through `on_resolve`.
void PosixEventEngine::PosixDNSResolver::LookupSRV(LookupSRVCallback on_resolve,
                                                   absl::string_view name) {
  dns_resolver_->LookupSRV(std::move(on_resolve), name);
}
538
// Forwards a TXT record lookup for `name` to the wrapped resolver; the
// result is delivered through `on_resolve`.
void PosixEventEngine::PosixDNSResolver::LookupTXT(LookupTXTCallback on_resolve,
                                                   absl::string_view name) {
  dns_resolver_->LookupTXT(std::move(on_resolve), name);
}
543
// Creates a DNS resolver for this engine.
//
// Crashes on platforms built without GRPC_POSIX_SOCKET_RESOLVE_ADDRESS.
// Otherwise, when ShouldUseAresDnsResolver() is true and the c-ares event
// driver is compiled in, returns a PosixDNSResolver backed by an
// AresResolver for `options.dns_server`, or the error status from creating
// it. In every other case returns a NativePosixDNSResolver; this includes
// the case where the ares resolver is requested but not compiled in.
absl::StatusOr<std::unique_ptr<EventEngine::DNSResolver>>
PosixEventEngine::GetDNSResolver(
    GRPC_UNUSED const EventEngine::DNSResolver::ResolverOptions& options) {
#ifndef GRPC_POSIX_SOCKET_RESOLVE_ADDRESS
  grpc_core::Crash("Unable to get DNS resolver for this platform.");
#else   // GRPC_POSIX_SOCKET_RESOLVE_ADDRESS
  // If c-ares is supported on the platform, build according to user's
  // configuration.
  if (ShouldUseAresDnsResolver()) {
#if GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER)
    GRPC_TRACE_LOG(event_engine_dns, INFO)
        << "PosixEventEngine::" << this << " creating AresResolver";
    auto ares_resolver = AresResolver::CreateAresResolver(
        options.dns_server,
        std::make_unique<GrpcPolledFdFactoryPosix>(poller_manager_->Poller()),
        shared_from_this());
    if (!ares_resolver.ok()) {
      return ares_resolver.status();
    }
    return std::make_unique<PosixEventEngine::PosixDNSResolver>(
        std::move(*ares_resolver));
#endif  // GRPC_ARES == 1 && defined(GRPC_POSIX_SOCKET_ARES_EV_DRIVER)
  }
  GRPC_TRACE_LOG(event_engine_dns, INFO)
      << "PosixEventEngine::" << this << " creating NativePosixDNSResolver";
  return std::make_unique<NativePosixDNSResolver>(shared_from_this());
#endif  // GRPC_POSIX_SOCKET_RESOLVE_ADDRESS
}
572
// Not implemented for this engine: calling it crashes the process.
bool PosixEventEngine::IsWorkerThread() { grpc_core::Crash("unimplemented"); }
574
// Attempts to cancel a pending connect identified by `handle`.
//
// Returns true only if the connection was still pending, i.e. OnWritable()
// had not yet claimed the AsyncConnect's fd; in that case the handle is shut
// down and the on_connect callback will not be invoked. Returns false for
// non-positive handles, for handles that are no longer in the pending map,
// and for connects that OnWritable() has already claimed. Crashes on
// platforms without posix polling support.
bool PosixEventEngine::CancelConnect(EventEngine::ConnectionHandle handle) {
#if GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
  int connection_handle = handle.keys[0];
  if (connection_handle <= 0) {
    return false;
  }
  int shard_number = connection_handle % connection_shards_.size();
  struct ConnectionShard* shard = &connection_shards_[shard_number];
  AsyncConnect* ac = nullptr;
  {
    grpc_core::MutexLock lock(&shard->mu);
    auto it = shard->pending_connections.find(connection_handle);
    if (it != shard->pending_connections.end()) {
      ac = it->second;
      CHECK_NE(ac, nullptr);
      // Trying to acquire ac->mu here could cause a deadlock because
      // the OnWritable method tries to acquire the two mutexes used
      // here in the reverse order. But we don't need to acquire ac->mu before
      // incrementing ac->refs here. This is because the OnWritable
      // method decrements ac->refs only after deleting the connection handle
      // from the corresponding hashmap. If the code enters here, it means
      // that deletion hasn't happened yet. The deletion can only happen after
      // the corresponding shard->mu is unlocked.
      ++ac->refs_;
      // Remove connection from list of active connections.
      shard->pending_connections.erase(it);
    }
  }
  if (ac == nullptr) {
    return false;
  }
  ac->mu_.Lock();
  bool connection_cancel_success = (ac->fd_ != nullptr);
  if (connection_cancel_success) {
    // Connection is still pending. The OnWritable callback hasn't executed
    // yet because ac->fd != nullptr.
    ac->connect_cancelled_ = true;
    // Shutdown the fd. This would cause OnWritable to run as soon as
    // possible. We don't need to pass a custom error here because it won't be
    // used since the on_connect_closure is not run if connect cancellation is
    // successful.
    ac->fd_->ShutdownHandle(
        absl::FailedPreconditionError("Connection cancelled"));
  }
  // Release the reference taken above while the shard lock was held.
  bool done = (--ac->refs_ == 0);
  ac->mu_.Unlock();
  if (done) {
    delete ac;
  }
  return connection_cancel_success;
#else   // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
  grpc_core::Crash(
      "EventEngine::CancelConnect is not supported on this platform");
#endif  // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
}
630
Connect(OnConnectCallback on_connect,const ResolvedAddress & addr,const EndpointConfig & args,MemoryAllocator memory_allocator,Duration timeout)631 EventEngine::ConnectionHandle PosixEventEngine::Connect(
632 OnConnectCallback on_connect, const ResolvedAddress& addr,
633 const EndpointConfig& args, MemoryAllocator memory_allocator,
634 Duration timeout) {
635 #if GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
636 CHECK_NE(poller_manager_, nullptr);
637 PosixTcpOptions options = TcpOptionsFromEndpointConfig(args);
638 absl::StatusOr<PosixSocketWrapper::PosixSocketCreateResult> socket =
639 PosixSocketWrapper::CreateAndPrepareTcpClientSocket(options, addr);
640 if (!socket.ok()) {
641 Run([on_connect = std::move(on_connect),
642 status = socket.status()]() mutable { on_connect(status); });
643 return EventEngine::ConnectionHandle::kInvalid;
644 }
645 return CreateEndpointFromUnconnectedFdInternal(
646 (*socket).sock.Fd(), std::move(on_connect), (*socket).mapped_target_addr,
647 options, std::move(memory_allocator), timeout);
648 #else // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
649 grpc_core::Crash("EventEngine::Connect is not supported on this platform");
650 #endif // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
651 }
652
CreateEndpointFromUnconnectedFd(int fd,EventEngine::OnConnectCallback on_connect,const EventEngine::ResolvedAddress & addr,const EndpointConfig & config,MemoryAllocator memory_allocator,EventEngine::Duration timeout)653 EventEngine::ConnectionHandle PosixEventEngine::CreateEndpointFromUnconnectedFd(
654 int fd, EventEngine::OnConnectCallback on_connect,
655 const EventEngine::ResolvedAddress& addr, const EndpointConfig& config,
656 MemoryAllocator memory_allocator, EventEngine::Duration timeout) {
657 #if GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
658 return CreateEndpointFromUnconnectedFdInternal(
659 fd, std::move(on_connect), addr, TcpOptionsFromEndpointConfig(config),
660 std::move(memory_allocator), timeout);
661 #else // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
662 grpc_core::Crash(
663 "EventEngine::CreateEndpointFromUnconnectedFd is not supported on this "
664 "platform");
665 #endif // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
666 }
667
668 std::unique_ptr<EventEngine::Endpoint>
CreatePosixEndpointFromFd(int fd,const EndpointConfig & config,MemoryAllocator memory_allocator)669 PosixEventEngine::CreatePosixEndpointFromFd(int fd,
670 const EndpointConfig& config,
671 MemoryAllocator memory_allocator) {
672 #if GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
673 DCHECK_GT(fd, 0);
674 PosixEventPoller* poller = poller_manager_->Poller();
675 DCHECK_NE(poller, nullptr);
676 EventHandle* handle =
677 poller->CreateHandle(fd, "tcp-client", poller->CanTrackErrors());
678 return CreatePosixEndpoint(handle, nullptr, shared_from_this(),
679 std::move(memory_allocator),
680 TcpOptionsFromEndpointConfig(config));
681 #else // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
682 grpc_core::Crash(
683 "PosixEventEngine::CreatePosixEndpointFromFd is not supported on "
684 "this platform");
685 #endif // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
686 }
687
CreateEndpointFromFd(int fd,const EndpointConfig & config)688 std::unique_ptr<EventEngine::Endpoint> PosixEventEngine::CreateEndpointFromFd(
689 int fd, const EndpointConfig& config) {
690 auto options = TcpOptionsFromEndpointConfig(config);
691 MemoryAllocator allocator;
692 if (options.memory_allocator_factory != nullptr) {
693 return CreatePosixEndpointFromFd(
694 fd, config,
695 options.memory_allocator_factory->CreateMemoryAllocator(
696 absl::StrCat("allocator:", fd)));
697 }
698 return CreatePosixEndpointFromFd(
699 fd, config,
700 options.resource_quota->memory_quota()->CreateMemoryAllocator(
701 absl::StrCat("allocator:", fd)));
702 }
703
704 absl::StatusOr<std::unique_ptr<EventEngine::Listener>>
CreateListener(Listener::AcceptCallback on_accept,absl::AnyInvocable<void (absl::Status)> on_shutdown,const EndpointConfig & config,std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory)705 PosixEventEngine::CreateListener(
706 Listener::AcceptCallback on_accept,
707 absl::AnyInvocable<void(absl::Status)> on_shutdown,
708 const EndpointConfig& config,
709 std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory) {
710 #if GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
711 PosixEventEngineWithFdSupport::PosixAcceptCallback posix_on_accept =
712 [on_accept_cb = std::move(on_accept)](
713 int /*listener_fd*/, std::unique_ptr<EventEngine::Endpoint> ep,
714 bool /*is_external*/, MemoryAllocator allocator,
715 SliceBuffer* /*pending_data*/) mutable {
716 on_accept_cb(std::move(ep), std::move(allocator));
717 };
718 return std::make_unique<PosixEngineListener>(
719 std::move(posix_on_accept), std::move(on_shutdown), config,
720 std::move(memory_allocator_factory), poller_manager_->Poller(),
721 shared_from_this());
722 #else // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
723 grpc_core::Crash(
724 "EventEngine::CreateListener is not supported on this platform");
725 #endif // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
726 }
727
// Creates a listener whose accept callback uses the fd-aware signature.
// All arguments are passed straight to PosixEngineListener together with the
// engine's poller and a shared reference to this engine. Crashes on
// platforms without posix polling support.
absl::StatusOr<std::unique_ptr<EventEngine::Listener>>
PosixEventEngine::CreatePosixListener(
    PosixEventEngineWithFdSupport::PosixAcceptCallback on_accept,
    absl::AnyInvocable<void(absl::Status)> on_shutdown,
    const EndpointConfig& config,
    std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory) {
#if GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
  return std::make_unique<PosixEngineListener>(
      std::move(on_accept), std::move(on_shutdown), config,
      std::move(memory_allocator_factory), poller_manager_->Poller(),
      shared_from_this());
#else   // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
  grpc_core::Crash(
      "EventEngine::CreateListener is not supported on this platform");
#endif  // GRPC_PLATFORM_SUPPORTS_POSIX_POLLING
}
744
745 } // namespace experimental
746 } // namespace grpc_event_engine
747