1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "src/core/lib/event_engine/posix_engine/ev_poll_posix.h"
16
17 #include <grpc/event_engine/event_engine.h>
18 #include <grpc/status.h>
19 #include <grpc/support/port_platform.h>
20 #include <grpc/support/sync.h>
21 #include <grpc/support/time.h>
22 #include <stdint.h>
23
24 #include <atomic>
25 #include <list>
26 #include <memory>
27 #include <utility>
28
29 #include "absl/container/inlined_vector.h"
30 #include "absl/functional/any_invocable.h"
31 #include "absl/log/check.h"
32 #include "absl/status/status.h"
33 #include "absl/status/statusor.h"
34 #include "absl/strings/str_format.h"
35 #include "src/core/lib/event_engine/poller.h"
36 #include "src/core/lib/event_engine/posix_engine/event_poller.h"
37 #include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h"
38 #include "src/core/lib/iomgr/port.h"
39 #include "src/core/util/crash.h"
40
41 #ifdef GRPC_POSIX_SOCKET_EV_POLL
42
43 #include <errno.h>
44 #include <grpc/support/alloc.h>
45 #include <limits.h>
46 #include <poll.h>
47 #include <sys/socket.h>
48 #include <unistd.h>
49
50 #include "src/core/lib/event_engine/common_closures.h"
51 #include "src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h"
52 #include "src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h"
53 #include "src/core/lib/event_engine/time_util.h"
54 #include "src/core/util/fork.h"
55 #include "src/core/util/status_helper.h"
56 #include "src/core/util/strerror.h"
57 #include "src/core/util/sync.h"
58 #include "src/core/util/time.h"
59
// Sentinel values stored in a handle's read/write closure slot in place of a
// real PosixEngineClosure pointer.
static constexpr intptr_t kClosureNotReady = 0;
static constexpr intptr_t kClosureReady = 1;
// poll() revents bits that are reported to a handle as a read event or a
// write event respectively.
static constexpr int kPollinCheck = POLLIN | POLLHUP | POLLERR;
static constexpr int kPolloutCheck = POLLOUT | POLLHUP | POLLERR;
64
65 namespace grpc_event_engine {
66 namespace experimental {
67
68 using Events = absl::InlinedVector<PollEventHandle*, 5>;
69
70 class PollEventHandle : public EventHandle {
71 public:
PollEventHandle(int fd,std::shared_ptr<PollPoller> poller)72 PollEventHandle(int fd, std::shared_ptr<PollPoller> poller)
73 : fd_(fd),
74 pending_actions_(0),
75 fork_fd_list_(this),
76 poller_handles_list_(this),
77 scheduler_(poller->GetScheduler()),
78 poller_(std::move(poller)),
79 is_orphaned_(false),
80 is_shutdown_(false),
81 closed_(false),
82 released_(false),
83 pollhup_(false),
84 watch_mask_(-1),
85 shutdown_error_(absl::OkStatus()),
86 exec_actions_closure_([this]() { ExecutePendingActions(); }),
87 on_done_(nullptr),
88 read_closure_(reinterpret_cast<PosixEngineClosure*>(kClosureNotReady)),
89 write_closure_(
90 reinterpret_cast<PosixEngineClosure*>(kClosureNotReady)) {
91 grpc_core::MutexLock lock(&poller_->mu_);
92 poller_->PollerHandlesListAddHandle(this);
93 }
Poller()94 PollPoller* Poller() override { return poller_.get(); }
SetPendingActions(bool pending_read,bool pending_write)95 bool SetPendingActions(bool pending_read, bool pending_write) {
96 pending_actions_ |= pending_read;
97 if (pending_write) {
98 pending_actions_ |= (1 << 2);
99 }
100 if (pending_read || pending_write) {
101 // The closure is going to be executed. We'll Unref this handle in
102 // ExecutePendingActions.
103 Ref();
104 return true;
105 }
106 return false;
107 }
ForceRemoveHandleFromPoller()108 void ForceRemoveHandleFromPoller() {
109 grpc_core::MutexLock lock(&poller_->mu_);
110 poller_->PollerHandlesListRemoveHandle(this);
111 }
WrappedFd()112 int WrappedFd() override { return fd_; }
IsOrphaned() const113 bool IsOrphaned() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
114 return is_orphaned_;
115 }
CloseFd()116 void CloseFd() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
117 if (!released_ && !closed_) {
118 closed_ = true;
119 close(fd_);
120 }
121 }
IsPollhup() const122 bool IsPollhup() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { return pollhup_; }
SetPollhup(bool pollhup)123 void SetPollhup(bool pollhup) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
124 pollhup_ = pollhup;
125 }
IsWatched(int & watch_mask) const126 bool IsWatched(int& watch_mask) const ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
127 watch_mask = watch_mask_;
128 return watch_mask_ != -1;
129 }
IsWatched() const130 bool IsWatched() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
131 return watch_mask_ != -1;
132 }
SetWatched(int watch_mask)133 void SetWatched(int watch_mask) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
134 watch_mask_ = watch_mask;
135 }
136 void OrphanHandle(PosixEngineClosure* on_done, int* release_fd,
137 absl::string_view reason) override;
138 void ShutdownHandle(absl::Status why) override;
139 void NotifyOnRead(PosixEngineClosure* on_read) override;
140 void NotifyOnWrite(PosixEngineClosure* on_write) override;
141 void NotifyOnError(PosixEngineClosure* on_error) override;
142 void SetReadable() override;
143 void SetWritable() override;
144 void SetHasError() override;
IsHandleShutdown()145 bool IsHandleShutdown() override {
146 grpc_core::MutexLock lock(&mu_);
147 return is_shutdown_;
148 };
ExecutePendingActions()149 inline void ExecutePendingActions() {
150 int kick = 0;
151 {
152 grpc_core::MutexLock lock(&mu_);
153 if ((pending_actions_ & 1UL)) {
154 if (SetReadyLocked(&read_closure_)) {
155 kick = 1;
156 }
157 }
158 if (((pending_actions_ >> 2) & 1UL)) {
159 if (SetReadyLocked(&write_closure_)) {
160 kick = 1;
161 }
162 }
163 pending_actions_ = 0;
164 }
165 if (kick) {
166 // SetReadyLocked immediately scheduled some closure. It would have set
167 // the closure state to NOT_READY. We need to wakeup the Work(...)
168 // thread to start polling on this fd. If this call is not made, it is
169 // possible that the poller will reach a state where all the fds under
170 // the poller's control are not polled for POLLIN/POLLOUT events thus
171 // leading to an indefinitely blocked Work(..) method.
172 poller_->KickExternal(false);
173 }
174 Unref();
175 }
Ref()176 void Ref() { ref_count_.fetch_add(1, std::memory_order_relaxed); }
Unref()177 void Unref() {
178 if (ref_count_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
179 if (on_done_ != nullptr) {
180 scheduler_->Run(on_done_);
181 }
182 delete this;
183 }
184 }
185 ~PollEventHandle() override = default;
mu()186 grpc_core::Mutex* mu() ABSL_LOCK_RETURNED(mu_) { return &mu_; }
ForkFdListPos()187 PollPoller::HandlesList& ForkFdListPos() { return fork_fd_list_; }
PollerHandlesListPos()188 PollPoller::HandlesList& PollerHandlesListPos() {
189 return poller_handles_list_;
190 }
191 uint32_t BeginPollLocked(uint32_t read_mask, uint32_t write_mask)
192 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
193 bool EndPollLocked(bool got_read, bool got_write)
194 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
195
196 private:
197 int SetReadyLocked(PosixEngineClosure** st);
198 int NotifyOnLocked(PosixEngineClosure** st, PosixEngineClosure* closure);
199 // See Epoll1Poller::ShutdownHandle for explanation on why a mutex is
200 // required.
201 grpc_core::Mutex mu_;
202 std::atomic<int> ref_count_{1};
203 int fd_;
204 int pending_actions_;
205 PollPoller::HandlesList fork_fd_list_;
206 PollPoller::HandlesList poller_handles_list_;
207 Scheduler* scheduler_;
208 std::shared_ptr<PollPoller> poller_;
209 bool is_orphaned_;
210 bool is_shutdown_;
211 bool closed_;
212 bool released_;
213 bool pollhup_;
214 int watch_mask_;
215 absl::Status shutdown_error_;
216 AnyInvocableClosure exec_actions_closure_;
217 PosixEngineClosure* on_done_;
218 PosixEngineClosure* read_closure_;
219 PosixEngineClosure* write_closure_;
220 };
221
222 namespace {
223 // Only used when GRPC_ENABLE_FORK_SUPPORT=1
224 std::list<PollPoller*> fork_poller_list;
225
226 // Only used when GRPC_ENABLE_FORK_SUPPORT=1
227 PollEventHandle* fork_fd_list_head = nullptr;
228 gpr_mu fork_fd_list_mu;
229
ForkFdListAddHandle(PollEventHandle * handle)230 void ForkFdListAddHandle(PollEventHandle* handle) {
231 if (grpc_core::Fork::Enabled()) {
232 gpr_mu_lock(&fork_fd_list_mu);
233 handle->ForkFdListPos().next = fork_fd_list_head;
234 handle->ForkFdListPos().prev = nullptr;
235 if (fork_fd_list_head != nullptr) {
236 fork_fd_list_head->ForkFdListPos().prev = handle;
237 }
238 fork_fd_list_head = handle;
239 gpr_mu_unlock(&fork_fd_list_mu);
240 }
241 }
242
ForkFdListRemoveHandle(PollEventHandle * handle)243 void ForkFdListRemoveHandle(PollEventHandle* handle) {
244 if (grpc_core::Fork::Enabled()) {
245 gpr_mu_lock(&fork_fd_list_mu);
246 if (fork_fd_list_head == handle) {
247 fork_fd_list_head = handle->ForkFdListPos().next;
248 }
249 if (handle->ForkFdListPos().prev != nullptr) {
250 handle->ForkFdListPos().prev->ForkFdListPos().next =
251 handle->ForkFdListPos().next;
252 }
253 if (handle->ForkFdListPos().next != nullptr) {
254 handle->ForkFdListPos().next->ForkFdListPos().prev =
255 handle->ForkFdListPos().prev;
256 }
257 gpr_mu_unlock(&fork_fd_list_mu);
258 }
259 }
260
ForkPollerListAddPoller(PollPoller * poller)261 void ForkPollerListAddPoller(PollPoller* poller) {
262 if (grpc_core::Fork::Enabled()) {
263 gpr_mu_lock(&fork_fd_list_mu);
264 fork_poller_list.push_back(poller);
265 gpr_mu_unlock(&fork_fd_list_mu);
266 }
267 }
268
ForkPollerListRemovePoller(PollPoller * poller)269 void ForkPollerListRemovePoller(PollPoller* poller) {
270 if (grpc_core::Fork::Enabled()) {
271 gpr_mu_lock(&fork_fd_list_mu);
272 fork_poller_list.remove(poller);
273 gpr_mu_unlock(&fork_fd_list_mu);
274 }
275 }
276
277 // Returns the number of milliseconds elapsed between now and start timestamp.
PollElapsedTimeToMillis(grpc_core::Timestamp start)278 int PollElapsedTimeToMillis(grpc_core::Timestamp start) {
279 if (start == grpc_core::Timestamp::InfFuture()) return -1;
280 grpc_core::Timestamp now =
281 grpc_core::Timestamp::FromTimespecRoundDown(gpr_now(GPR_CLOCK_MONOTONIC));
282 int64_t delta = (now - start).millis();
283 if (delta > INT_MAX) {
284 return INT_MAX;
285 } else if (delta < 0) {
286 return 0;
287 } else {
288 return static_cast<int>(delta);
289 }
290 }
291
292 bool InitPollPollerPosix();
293
294 // Called by the child process's post-fork handler to close open fds,
295 // including the global epoll fd of each poller. This allows gRPC to shutdown
296 // in the child process without interfering with connections or RPCs ongoing
297 // in the parent.
ResetEventManagerOnFork()298 void ResetEventManagerOnFork() {
299 // Delete all pending Epoll1EventHandles.
300 gpr_mu_lock(&fork_fd_list_mu);
301 while (fork_fd_list_head != nullptr) {
302 close(fork_fd_list_head->WrappedFd());
303 PollEventHandle* next = fork_fd_list_head->ForkFdListPos().next;
304 fork_fd_list_head->ForceRemoveHandleFromPoller();
305 delete fork_fd_list_head;
306 fork_fd_list_head = next;
307 }
308 // Delete all registered pollers.
309 while (!fork_poller_list.empty()) {
310 PollPoller* poller = fork_poller_list.front();
311 fork_poller_list.pop_front();
312 poller->Close();
313 }
314 gpr_mu_unlock(&fork_fd_list_mu);
315 InitPollPollerPosix();
316 }
317
318 // It is possible that GLIBC has epoll but the underlying kernel doesn't.
319 // Create epoll_fd to make sure epoll support is available
InitPollPollerPosix()320 bool InitPollPollerPosix() {
321 if (!grpc_event_engine::experimental::SupportsWakeupFd()) {
322 return false;
323 }
324 if (grpc_core::Fork::Enabled()) {
325 if (grpc_core::Fork::RegisterResetChildPollingEngineFunc(
326 ResetEventManagerOnFork)) {
327 gpr_mu_init(&fork_fd_list_mu);
328 }
329 }
330 return true;
331 }
332
333 } // namespace
334
CreateHandle(int fd,absl::string_view,bool track_err)335 EventHandle* PollPoller::CreateHandle(int fd, absl::string_view /*name*/,
336 bool track_err) {
337 // Avoid unused-parameter warning for debug-only parameter
338 (void)track_err;
339 DCHECK(track_err == false);
340 PollEventHandle* handle = new PollEventHandle(fd, shared_from_this());
341 ForkFdListAddHandle(handle);
342 // We need to send a kick to the thread executing Work(..) so that it can
343 // add this new Fd into the list of Fds to poll.
344 KickExternal(false);
345 return handle;
346 }
347
OrphanHandle(PosixEngineClosure * on_done,int * release_fd,absl::string_view)348 void PollEventHandle::OrphanHandle(PosixEngineClosure* on_done, int* release_fd,
349 absl::string_view /*reason*/) {
350 ForkFdListRemoveHandle(this);
351 ForceRemoveHandleFromPoller();
352 {
353 grpc_core::ReleasableMutexLock lock(&mu_);
354 on_done_ = on_done;
355 released_ = release_fd != nullptr;
356 if (release_fd != nullptr) {
357 *release_fd = fd_;
358 }
359 CHECK(!is_orphaned_);
360 is_orphaned_ = true;
361 // Perform shutdown operations if not already done so.
362 if (!is_shutdown_) {
363 is_shutdown_ = true;
364 shutdown_error_ =
365 absl::Status(absl::StatusCode::kInternal, "FD Orphaned");
366 grpc_core::StatusSetInt(&shutdown_error_,
367 grpc_core::StatusIntProperty::kRpcStatus,
368 GRPC_STATUS_UNAVAILABLE);
369 SetReadyLocked(&read_closure_);
370 SetReadyLocked(&write_closure_);
371 }
372 // signal read/write closed to OS so that future operations fail.
373 if (!released_) {
374 shutdown(fd_, SHUT_RDWR);
375 }
376 if (!IsWatched()) {
377 CloseFd();
378 } else {
379 // It is watched i.e we cannot take action without breaking from the
380 // blocking poll. Mark it as Unwatched and kick the thread executing
381 // Work(...). That thread should proceed with the cleanup.
382 SetWatched(-1);
383 lock.Release();
384 poller_->KickExternal(false);
385 }
386 }
387 Unref();
388 }
389
NotifyOnLocked(PosixEngineClosure ** st,PosixEngineClosure * closure)390 int PollEventHandle::NotifyOnLocked(PosixEngineClosure** st,
391 PosixEngineClosure* closure) {
392 if (is_shutdown_ || pollhup_) {
393 closure->SetStatus(shutdown_error_);
394 scheduler_->Run(closure);
395 } else if (*st == reinterpret_cast<PosixEngineClosure*>(kClosureNotReady)) {
396 // not ready ==> switch to a waiting state by setting the closure
397 *st = closure;
398 return 0;
399 } else if (*st == reinterpret_cast<PosixEngineClosure*>(kClosureReady)) {
400 // already ready ==> queue the closure to run immediately
401 *st = reinterpret_cast<PosixEngineClosure*>(kClosureNotReady);
402 closure->SetStatus(shutdown_error_);
403 scheduler_->Run(closure);
404 return 1;
405 } else {
406 // upcallptr was set to a different closure. This is an error!
407 grpc_core::Crash(
408 "User called a notify_on function with a previous callback still "
409 "pending");
410 }
411 return 0;
412 }
413
414 // returns 1 if state becomes not ready
SetReadyLocked(PosixEngineClosure ** st)415 int PollEventHandle::SetReadyLocked(PosixEngineClosure** st) {
416 if (*st == reinterpret_cast<PosixEngineClosure*>(kClosureReady)) {
417 // duplicate ready ==> ignore
418 return 0;
419 } else if (*st == reinterpret_cast<PosixEngineClosure*>(kClosureNotReady)) {
420 // not ready, and not waiting ==> flag ready
421 *st = reinterpret_cast<PosixEngineClosure*>(kClosureReady);
422 return 0;
423 } else {
424 // waiting ==> queue closure
425 PosixEngineClosure* closure = *st;
426 *st = reinterpret_cast<PosixEngineClosure*>(kClosureNotReady);
427 closure->SetStatus(shutdown_error_);
428 scheduler_->Run(closure);
429 return 1;
430 }
431 }
432
ShutdownHandle(absl::Status why)433 void PollEventHandle::ShutdownHandle(absl::Status why) {
434 // We need to take a Ref here because SetReadyLocked may trigger execution
435 // of a closure which calls OrphanHandle or poller->Shutdown() prematurely.
436 Ref();
437 {
438 grpc_core::MutexLock lock(&mu_);
439 // only shutdown once
440 if (!is_shutdown_) {
441 is_shutdown_ = true;
442 shutdown_error_ = why;
443 grpc_core::StatusSetInt(&shutdown_error_,
444 grpc_core::StatusIntProperty::kRpcStatus,
445 GRPC_STATUS_UNAVAILABLE);
446 SetReadyLocked(&read_closure_);
447 SetReadyLocked(&write_closure_);
448 }
449 }
450 // For the Ref() taken at the beginning of this function.
451 Unref();
452 }
453
NotifyOnRead(PosixEngineClosure * on_read)454 void PollEventHandle::NotifyOnRead(PosixEngineClosure* on_read) {
455 // We need to take a Ref here because NotifyOnLocked may trigger execution
456 // of a closure which calls OrphanHandle that may delete this object or call
457 // poller->Shutdown() prematurely.
458 Ref();
459 {
460 grpc_core::ReleasableMutexLock lock(&mu_);
461 if (NotifyOnLocked(&read_closure_, on_read)) {
462 lock.Release();
463 // NotifyOnLocked immediately scheduled some closure. It would have set
464 // the closure state to NOT_READY. We need to wakeup the Work(...) thread
465 // to start polling on this fd. If this call is not made, it is possible
466 // that the poller will reach a state where all the fds under the
467 // poller's control are not polled for POLLIN/POLLOUT events thus leading
468 // to an indefinitely blocked Work(..) method.
469 poller_->KickExternal(false);
470 }
471 }
472 // For the Ref() taken at the beginning of this function.
473 Unref();
474 }
475
NotifyOnWrite(PosixEngineClosure * on_write)476 void PollEventHandle::NotifyOnWrite(PosixEngineClosure* on_write) {
477 // We need to take a Ref here because NotifyOnLocked may trigger execution
478 // of a closure which calls OrphanHandle that may delete this object or call
479 // poller->Shutdown() prematurely.
480 Ref();
481 {
482 grpc_core::ReleasableMutexLock lock(&mu_);
483 if (NotifyOnLocked(&write_closure_, on_write)) {
484 lock.Release();
485 // NotifyOnLocked immediately scheduled some closure. It would have set
486 // the closure state to NOT_READY. We need to wakeup the Work(...) thread
487 // to start polling on this fd. If this call is not made, it is possible
488 // that the poller will reach a state where all the fds under the
489 // poller's control are not polled for POLLIN/POLLOUT events thus leading
490 // to an indefinitely blocked Work(..) method.
491 poller_->KickExternal(false);
492 }
493 }
494 // For the Ref() taken at the beginning of this function.
495 Unref();
496 }
497
NotifyOnError(PosixEngineClosure * on_error)498 void PollEventHandle::NotifyOnError(PosixEngineClosure* on_error) {
499 on_error->SetStatus(
500 absl::Status(absl::StatusCode::kCancelled,
501 "Polling engine does not support tracking errors"));
502 scheduler_->Run(on_error);
503 }
504
SetReadable()505 void PollEventHandle::SetReadable() {
506 Ref();
507 {
508 grpc_core::MutexLock lock(&mu_);
509 SetReadyLocked(&read_closure_);
510 }
511 Unref();
512 }
513
SetWritable()514 void PollEventHandle::SetWritable() {
515 Ref();
516 {
517 grpc_core::MutexLock lock(&mu_);
518 SetReadyLocked(&write_closure_);
519 }
520 Unref();
521 }
522
SetHasError()523 void PollEventHandle::SetHasError() {}
524
BeginPollLocked(uint32_t read_mask,uint32_t write_mask)525 uint32_t PollEventHandle::BeginPollLocked(uint32_t read_mask,
526 uint32_t write_mask) {
527 uint32_t mask = 0;
528 bool read_ready = (pending_actions_ & 1UL);
529 bool write_ready = ((pending_actions_ >> 2) & 1UL);
530 Ref();
531 // If we are shutdown, then no need to poll this fd. Set watch_mask to 0.
532 if (is_shutdown_) {
533 SetWatched(0);
534 return 0;
535 }
536 // If there is nobody polling for read, but we need to, then start doing so.
537 if (read_mask && !read_ready &&
538 read_closure_ != reinterpret_cast<PosixEngineClosure*>(kClosureReady)) {
539 mask |= read_mask;
540 }
541
542 // If there is nobody polling for write, but we need to, then start doing so
543 if (write_mask && !write_ready &&
544 write_closure_ != reinterpret_cast<PosixEngineClosure*>(kClosureReady)) {
545 mask |= write_mask;
546 }
547 SetWatched(mask);
548 return mask;
549 }
550
EndPollLocked(bool got_read,bool got_write)551 bool PollEventHandle::EndPollLocked(bool got_read, bool got_write) {
552 if (is_orphaned_ && !IsWatched()) {
553 CloseFd();
554 } else if (!is_orphaned_) {
555 return SetPendingActions(got_read, got_write);
556 }
557 return false;
558 }
559
KickExternal(bool ext)560 void PollPoller::KickExternal(bool ext) {
561 grpc_core::MutexLock lock(&mu_);
562 if (closed_) {
563 return;
564 }
565 if (was_kicked_) {
566 if (ext) {
567 was_kicked_ext_ = true;
568 }
569 return;
570 }
571 was_kicked_ = true;
572 was_kicked_ext_ = ext;
573 CHECK(wakeup_fd_->Wakeup().ok());
574 }
575
Kick()576 void PollPoller::Kick() { KickExternal(true); }
577
PollerHandlesListAddHandle(PollEventHandle * handle)578 void PollPoller::PollerHandlesListAddHandle(PollEventHandle* handle) {
579 handle->PollerHandlesListPos().next = poll_handles_list_head_;
580 handle->PollerHandlesListPos().prev = nullptr;
581 if (poll_handles_list_head_ != nullptr) {
582 poll_handles_list_head_->PollerHandlesListPos().prev = handle;
583 }
584 poll_handles_list_head_ = handle;
585 ++num_poll_handles_;
586 }
587
PollerHandlesListRemoveHandle(PollEventHandle * handle)588 void PollPoller::PollerHandlesListRemoveHandle(PollEventHandle* handle) {
589 if (poll_handles_list_head_ == handle) {
590 poll_handles_list_head_ = handle->PollerHandlesListPos().next;
591 }
592 if (handle->PollerHandlesListPos().prev != nullptr) {
593 handle->PollerHandlesListPos().prev->PollerHandlesListPos().next =
594 handle->PollerHandlesListPos().next;
595 }
596 if (handle->PollerHandlesListPos().next != nullptr) {
597 handle->PollerHandlesListPos().next->PollerHandlesListPos().prev =
598 handle->PollerHandlesListPos().prev;
599 }
600 --num_poll_handles_;
601 }
602
PollPoller(Scheduler * scheduler)603 PollPoller::PollPoller(Scheduler* scheduler)
604 : scheduler_(scheduler),
605 use_phony_poll_(false),
606 was_kicked_(false),
607 was_kicked_ext_(false),
608 num_poll_handles_(0),
609 poll_handles_list_head_(nullptr),
610 closed_(false) {
611 wakeup_fd_ = *CreateWakeupFd();
612 CHECK(wakeup_fd_ != nullptr);
613 ForkPollerListAddPoller(this);
614 }
615
PollPoller(Scheduler * scheduler,bool use_phony_poll)616 PollPoller::PollPoller(Scheduler* scheduler, bool use_phony_poll)
617 : scheduler_(scheduler),
618 use_phony_poll_(use_phony_poll),
619 was_kicked_(false),
620 was_kicked_ext_(false),
621 num_poll_handles_(0),
622 poll_handles_list_head_(nullptr),
623 closed_(false) {
624 wakeup_fd_ = *CreateWakeupFd();
625 CHECK(wakeup_fd_ != nullptr);
626 ForkPollerListAddPoller(this);
627 }
628
~PollPoller()629 PollPoller::~PollPoller() {
630 // Assert that no active handles are present at the time of destruction.
631 // They should have been orphaned before reaching this state.
632 CHECK_EQ(num_poll_handles_, 0);
633 CHECK_EQ(poll_handles_list_head_, nullptr);
634 }
635
Work(EventEngine::Duration timeout,absl::FunctionRef<void ()> schedule_poll_again)636 Poller::WorkResult PollPoller::Work(
637 EventEngine::Duration timeout,
638 absl::FunctionRef<void()> schedule_poll_again) {
639 // Avoid malloc for small number of elements.
640 enum { inline_elements = 96 };
641 struct pollfd pollfd_space[inline_elements];
642 bool was_kicked_ext = false;
643 PollEventHandle* watcher_space[inline_elements];
644 Events pending_events;
645 pending_events.clear();
646 int timeout_ms =
647 static_cast<int>(grpc_event_engine::experimental::Milliseconds(timeout));
648 mu_.Lock();
649 // Start polling, and keep doing so while we're being asked to
650 // re-evaluate our pollers (this allows poll() based pollers to
651 // ensure they don't miss wakeups).
652 while (pending_events.empty() && timeout_ms >= 0) {
653 int r = 0;
654 size_t i;
655 nfds_t pfd_count;
656 struct pollfd* pfds;
657 PollEventHandle** watchers;
658 // Estimate start time for a poll iteration.
659 grpc_core::Timestamp start = grpc_core::Timestamp::FromTimespecRoundDown(
660 gpr_now(GPR_CLOCK_MONOTONIC));
661 if (num_poll_handles_ + 2 <= inline_elements) {
662 pfds = pollfd_space;
663 watchers = watcher_space;
664 } else {
665 const size_t pfd_size = sizeof(*pfds) * (num_poll_handles_ + 2);
666 const size_t watch_size = sizeof(*watchers) * (num_poll_handles_ + 2);
667 void* buf = gpr_malloc(pfd_size + watch_size);
668 pfds = static_cast<struct pollfd*>(buf);
669 watchers = static_cast<PollEventHandle**>(
670 static_cast<void*>((static_cast<char*>(buf) + pfd_size)));
671 pfds = static_cast<struct pollfd*>(buf);
672 }
673
674 pfd_count = 1;
675 pfds[0].fd = wakeup_fd_->ReadFd();
676 pfds[0].events = POLLIN;
677 pfds[0].revents = 0;
678 PollEventHandle* head = poll_handles_list_head_;
679 while (head != nullptr) {
680 {
681 grpc_core::MutexLock lock(head->mu());
682 // There shouldn't be any orphaned fds at this point. This is because
683 // prior to marking a handle as orphaned it is first removed from
684 // poll handle list for the poller under the poller lock.
685 CHECK(!head->IsOrphaned());
686 if (!head->IsPollhup()) {
687 pfds[pfd_count].fd = head->WrappedFd();
688 watchers[pfd_count] = head;
689 // BeginPollLocked takes a ref of the handle. It also marks the
690 // fd as Watched with an appropriate watch_mask. The watch_mask
691 // is 0 if the fd is shutdown or if the fd is already ready (i.e
692 // both read and write events are already available) and doesn't
693 // need to be polled again. The watch_mask is > 0 otherwise
694 // indicating the fd needs to be polled.
695 pfds[pfd_count].events = head->BeginPollLocked(POLLIN, POLLOUT);
696 pfd_count++;
697 }
698 }
699 head = head->PollerHandlesListPos().next;
700 }
701 mu_.Unlock();
702
703 if (!use_phony_poll_ || timeout_ms == 0 || pfd_count == 1) {
704 // If use_phony_poll is true and pfd_count == 1, it implies only the
705 // wakeup_fd is present. Allow the call to get blocked in this case as
706 // well instead of crashing. This is because the poller::Work is called
707 // right after an event enging is constructed. Even if phony poll is
708 // expected to be used, we dont want to check for it until some actual
709 // event handles are registered. Otherwise the EventEngine construction
710 // may crash.
711 r = poll(pfds, pfd_count, timeout_ms);
712 } else {
713 grpc_core::Crash("Attempted a blocking poll when declared non-polling.");
714 }
715
716 if (r <= 0) {
717 if (r < 0 && errno != EINTR) {
718 // Abort fail here.
719 grpc_core::Crash(absl::StrFormat(
720 "(event_engine) PollPoller:%p encountered poll error: %s", this,
721 grpc_core::StrError(errno).c_str()));
722 }
723
724 for (i = 1; i < pfd_count; i++) {
725 PollEventHandle* head = watchers[i];
726 int watch_mask;
727 grpc_core::ReleasableMutexLock lock(head->mu());
728 if (head->IsWatched(watch_mask)) {
729 head->SetWatched(-1);
730 // This fd was Watched with a watch mask > 0.
731 if (watch_mask > 0 && r < 0) {
732 // This case implies the fd was polled (since watch_mask > 0 and
733 // the poll returned an error. Mark the fds as both readable and
734 // writable.
735 if (head->EndPollLocked(true, true)) {
736 // Its safe to add to list of pending events because
737 // EndPollLocked returns true only when the handle is
738 // not orphaned. But an orphan might be initiated on the handle
739 // after this Work() method returns and before the next Work()
740 // method is invoked.
741 pending_events.push_back(head);
742 }
743 } else {
744 // In this case, (1) watch_mask > 0 && r == 0 or (2) watch_mask ==
745 // 0 and r < 0 or (3) watch_mask == 0 and r == 0. For case-1, no
746 // events are pending on the fd even though the fd was polled. For
747 // case-2 and 3, the fd was not polled
748 head->EndPollLocked(false, false);
749 }
750 } else {
751 // It can enter this case if an orphan was invoked on the handle
752 // while it was being polled.
753 head->EndPollLocked(false, false);
754 }
755 lock.Release();
756 // Unref the ref taken at BeginPollLocked.
757 head->Unref();
758 }
759 } else {
760 if (pfds[0].revents & kPollinCheck) {
761 CHECK(wakeup_fd_->ConsumeWakeup().ok());
762 }
763 for (i = 1; i < pfd_count; i++) {
764 PollEventHandle* head = watchers[i];
765 int watch_mask;
766 grpc_core::ReleasableMutexLock lock(head->mu());
767 if (!head->IsWatched(watch_mask) || watch_mask == 0) {
768 // IsWatched will be false if an orphan was invoked on the
769 // handle while it was being polled. If watch_mask is 0, then the fd
770 // was not polled.
771 head->SetWatched(-1);
772 head->EndPollLocked(false, false);
773 } else {
774 // Watched is true and watch_mask > 0
775 if (pfds[i].revents & POLLHUP) {
776 head->SetPollhup(true);
777 }
778 head->SetWatched(-1);
779 if (head->EndPollLocked(pfds[i].revents & kPollinCheck,
780 pfds[i].revents & kPolloutCheck)) {
781 // Its safe to add to list of pending events because EndPollLocked
782 // returns true only when the handle is not orphaned.
783 // But an orphan might be initiated on the handle after this
784 // Work() method returns and before the next Work() method is
785 // invoked.
786 pending_events.push_back(head);
787 }
788 }
789 lock.Release();
790 // Unref the ref taken at BeginPollLocked.
791 head->Unref();
792 }
793 }
794
795 if (pfds != pollfd_space) {
796 gpr_free(pfds);
797 }
798
799 // End of poll iteration. Update how much time is remaining.
800 timeout_ms -= PollElapsedTimeToMillis(start);
801 mu_.Lock();
802 if (std::exchange(was_kicked_, false) &&
803 std::exchange(was_kicked_ext_, false)) {
804 // External kick. Need to break out.
805 was_kicked_ext = true;
806 break;
807 }
808 }
809 mu_.Unlock();
810 if (pending_events.empty()) {
811 if (was_kicked_ext) {
812 return Poller::WorkResult::kKicked;
813 }
814 return Poller::WorkResult::kDeadlineExceeded;
815 }
816 // Run the provided callback synchronously.
817 schedule_poll_again();
818 // Process all pending events inline.
819 for (auto& it : pending_events) {
820 it->ExecutePendingActions();
821 }
822 return was_kicked_ext ? Poller::WorkResult::kKicked : Poller::WorkResult::kOk;
823 }
824
Shutdown()825 void PollPoller::Shutdown() { ForkPollerListRemovePoller(this); }
826
PrepareFork()827 void PollPoller::PrepareFork() { Kick(); }
828 // TODO(vigneshbabu): implement
PostforkParent()829 void PollPoller::PostforkParent() {}
830 // TODO(vigneshbabu): implement
PostforkChild()831 void PollPoller::PostforkChild() {}
832
Close()833 void PollPoller::Close() {
834 grpc_core::MutexLock lock(&mu_);
835 closed_ = true;
836 }
837
MakePollPoller(Scheduler * scheduler,bool use_phony_poll)838 std::shared_ptr<PollPoller> MakePollPoller(Scheduler* scheduler,
839 bool use_phony_poll) {
840 static bool kPollPollerSupported = InitPollPollerPosix();
841 if (kPollPollerSupported) {
842 return std::make_shared<PollPoller>(scheduler, use_phony_poll);
843 }
844 return nullptr;
845 }
846
847 } // namespace experimental
848 } // namespace grpc_event_engine
849
850 #else // GRPC_POSIX_SOCKET_EV_POLL
851
852 #include "src/core/util/crash.h"
853
854 namespace grpc_event_engine {
855 namespace experimental {
856
PollPoller(Scheduler *)857 PollPoller::PollPoller(Scheduler* /* engine */) {
858 grpc_core::Crash("unimplemented");
859 }
860
Shutdown()861 void PollPoller::Shutdown() { grpc_core::Crash("unimplemented"); }
862
~PollPoller()863 PollPoller::~PollPoller() { grpc_core::Crash("unimplemented"); }
864
CreateHandle(int,absl::string_view,bool)865 EventHandle* PollPoller::CreateHandle(int /*fd*/, absl::string_view /*name*/,
866 bool /*track_err*/) {
867 grpc_core::Crash("unimplemented");
868 }
869
Work(EventEngine::Duration,absl::FunctionRef<void ()>)870 Poller::WorkResult PollPoller::Work(
871 EventEngine::Duration /*timeout*/,
872 absl::FunctionRef<void()> /*schedule_poll_again*/) {
873 grpc_core::Crash("unimplemented");
874 }
875
Kick()876 void PollPoller::Kick() { grpc_core::Crash("unimplemented"); }
877
878 // If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return
879 // nullptr.
MakePollPoller(Scheduler *,bool)880 std::shared_ptr<PollPoller> MakePollPoller(Scheduler* /*scheduler*/,
881 bool /* use_phony_poll */) {
882 return nullptr;
883 }
884
PrepareFork()885 void PollPoller::PrepareFork() { grpc_core::Crash("unimplemented"); }
PostforkParent()886 void PollPoller::PostforkParent() { grpc_core::Crash("unimplemented"); }
PostforkChild()887 void PollPoller::PostforkChild() { grpc_core::Crash("unimplemented"); }
888
Close()889 void PollPoller::Close() { grpc_core::Crash("unimplemented"); }
890
KickExternal(bool)891 void PollPoller::KickExternal(bool /*ext*/) {
892 grpc_core::Crash("unimplemented");
893 }
894
PollerHandlesListAddHandle(PollEventHandle *)895 void PollPoller::PollerHandlesListAddHandle(PollEventHandle* /*handle*/) {
896 grpc_core::Crash("unimplemented");
897 }
898
PollerHandlesListRemoveHandle(PollEventHandle *)899 void PollPoller::PollerHandlesListRemoveHandle(PollEventHandle* /*handle*/) {
900 grpc_core::Crash("unimplemented");
901 }
902
903 } // namespace experimental
904 } // namespace grpc_event_engine
905
906 #endif // GRPC_POSIX_SOCKET_EV_POLL
907