// Copyright 2022 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h"

#include <grpc/event_engine/event_engine.h>
#include <grpc/status.h>
#include <grpc/support/port_platform.h>
#include <grpc/support/sync.h>
#include <stdint.h>

#include <atomic>
#include <memory>

#include "absl/log/check.h"
#include "absl/log/log.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_format.h"
#include "src/core/lib/event_engine/poller.h"
#include "src/core/lib/event_engine/time_util.h"
#include "src/core/lib/iomgr/port.h"
#include "src/core/util/crash.h"

// This polling engine is only relevant on Linux kernels that support epoll
// (epoll_create() or epoll_create1()).
#ifdef GRPC_LINUX_EPOLL
#include <errno.h>
#include <limits.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

#include <list>

#include "src/core/lib/event_engine/posix_engine/event_poller.h"
#include "src/core/lib/event_engine/posix_engine/lockfree_event.h"
#include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h"
#include "src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h"
#include "src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h"
#include "src/core/util/fork.h"
#include "src/core/util/status_helper.h"
#include "src/core/util/strerror.h"
#include "src/core/util/sync.h"

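// Unless the poller was kicked, Work() processes at most this many epoll
// events per call before returning control to the caller.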
#define MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION 1

namespace grpc_event_engine {
namespace experimental {

class Epoll1EventHandle : public EventHandle {
 public:
  Epoll1EventHandle(int fd, Epoll1Poller* poller)
      : fd_(fd),
        list_(this),
        poller_(poller),
        read_closure_(std::make_unique<LockfreeEvent>(poller->GetScheduler())),
        write_closure_(std::make_unique<LockfreeEvent>(poller->GetScheduler())),
        error_closure_(
            std::make_unique<LockfreeEvent>(poller->GetScheduler())) {
    read_closure_->InitEvent();
    write_closure_->InitEvent();
    error_closure_->InitEvent();
    pending_read_.store(false, std::memory_order_relaxed);
    pending_write_.store(false, std::memory_order_relaxed);
    pending_error_.store(false, std::memory_order_relaxed);
  }
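  // Reinitializes a handle that was recycled from the poller's free list so
  // that it can start tracking a new fd.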
  void ReInit(int fd) {
    fd_ = fd;
    read_closure_->InitEvent();
    write_closure_->InitEvent();
    error_closure_->InitEvent();
    pending_read_.store(false, std::memory_order_relaxed);
    pending_write_.store(false, std::memory_order_relaxed);
    pending_error_.store(false, std::memory_order_relaxed);
  }
  Epoll1Poller* Poller() override { return poller_; }
  bool SetPendingActions(bool pending_read, bool pending_write,
                         bool pending_error) {
    // Another thread may be executing ExecutePendingActions() at this point.
    // This is possible, for instance, if one invocation of Work(...) sets
    // an fd to be readable while the next invocation of Work(...) sets
    // the fd to be writable. While the second invocation is running,
    // ExecutePendingActions() of the first invocation may execute in
    // parallel and read the pending_<***>_ variables. So we need to use
    // atomics to manipulate the pending_<***>_ variables.

    if (pending_read) {
      pending_read_.store(true, std::memory_order_release);
    }

    if (pending_write) {
      pending_write_.store(true, std::memory_order_release);
    }

    if (pending_error) {
      pending_error_.store(true, std::memory_order_release);
    }

    return pending_read || pending_write || pending_error;
  }
  int WrappedFd() override { return fd_; }
  void OrphanHandle(PosixEngineClosure* on_done, int* release_fd,
                    absl::string_view reason) override;
  void ShutdownHandle(absl::Status why) override;
  void NotifyOnRead(PosixEngineClosure* on_read) override;
  void NotifyOnWrite(PosixEngineClosure* on_write) override;
  void NotifyOnError(PosixEngineClosure* on_error) override;
  void SetReadable() override;
  void SetWritable() override;
  void SetHasError() override;
  bool IsHandleShutdown() override;
  inline void ExecutePendingActions() {
    // These may execute in parallel with ShutdownHandle. That's not an issue
    // because the lockfree event implementation should be able to handle it.
    if (pending_read_.exchange(false, std::memory_order_acq_rel)) {
      read_closure_->SetReady();
    }
    if (pending_write_.exchange(false, std::memory_order_acq_rel)) {
      write_closure_->SetReady();
    }
    if (pending_error_.exchange(false, std::memory_order_acq_rel)) {
      error_closure_->SetReady();
    }
  }
  grpc_core::Mutex* mu() { return &mu_; }
  LockfreeEvent* ReadClosure() { return read_closure_.get(); }
  LockfreeEvent* WriteClosure() { return write_closure_.get(); }
  LockfreeEvent* ErrorClosure() { return error_closure_.get(); }
  Epoll1Poller::HandlesList& ForkFdListPos() { return list_; }
  ~Epoll1EventHandle() override = default;

 private:
  void HandleShutdownInternal(absl::Status why, bool releasing_fd);
  // See Epoll1EventHandle::ShutdownHandle for an explanation of why a mutex
  // is required.
  grpc_core::Mutex mu_;
  int fd_;
  // See Epoll1EventHandle::SetPendingActions for an explanation of why the
  // pending_<***>_ variables need to be atomic.
  std::atomic<bool> pending_read_{false};
  std::atomic<bool> pending_write_{false};
  std::atomic<bool> pending_error_{false};
  Epoll1Poller::HandlesList list_;
  Epoll1Poller* poller_;
  std::unique_ptr<LockfreeEvent> read_closure_;
  std::unique_ptr<LockfreeEvent> write_closure_;
  std::unique_ptr<LockfreeEvent> error_closure_;
};

namespace {

int EpollCreateAndCloexec() {
#ifdef GRPC_LINUX_EPOLL_CREATE1
  int fd = epoll_create1(EPOLL_CLOEXEC);
  if (fd < 0) {
    LOG(ERROR) << "epoll_create1 unavailable";
  }
#else
  int fd = epoll_create(MAX_EPOLL_EVENTS);
  if (fd < 0) {
    LOG(ERROR) << "epoll_create unavailable";
  } else if (fcntl(fd, F_SETFD, FD_CLOEXEC) != 0) {
    LOG(ERROR) << "fcntl following epoll_create failed";
    return -1;
  }
#endif
  return fd;
}

// Only used when GRPC_ENABLE_FORK_SUPPORT=1
std::list<Epoll1Poller*> fork_poller_list;

// Only used when GRPC_ENABLE_FORK_SUPPORT=1
Epoll1EventHandle* fork_fd_list_head = nullptr;
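// Guards both fork_fd_list_head and fork_poller_list.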
gpr_mu fork_fd_list_mu;

void ForkFdListAddHandle(Epoll1EventHandle* handle) {
  if (grpc_core::Fork::Enabled()) {
    gpr_mu_lock(&fork_fd_list_mu);
    handle->ForkFdListPos().next = fork_fd_list_head;
    handle->ForkFdListPos().prev = nullptr;
    if (fork_fd_list_head != nullptr) {
      fork_fd_list_head->ForkFdListPos().prev = handle;
    }
    fork_fd_list_head = handle;
    gpr_mu_unlock(&fork_fd_list_mu);
  }
}

void ForkFdListRemoveHandle(Epoll1EventHandle* handle) {
  if (grpc_core::Fork::Enabled()) {
    gpr_mu_lock(&fork_fd_list_mu);
    if (fork_fd_list_head == handle) {
      fork_fd_list_head = handle->ForkFdListPos().next;
    }
    if (handle->ForkFdListPos().prev != nullptr) {
      handle->ForkFdListPos().prev->ForkFdListPos().next =
          handle->ForkFdListPos().next;
    }
    if (handle->ForkFdListPos().next != nullptr) {
      handle->ForkFdListPos().next->ForkFdListPos().prev =
          handle->ForkFdListPos().prev;
    }
    gpr_mu_unlock(&fork_fd_list_mu);
  }
}

void ForkPollerListAddPoller(Epoll1Poller* poller) {
  if (grpc_core::Fork::Enabled()) {
    gpr_mu_lock(&fork_fd_list_mu);
    fork_poller_list.push_back(poller);
    gpr_mu_unlock(&fork_fd_list_mu);
  }
}

void ForkPollerListRemovePoller(Epoll1Poller* poller) {
  if (grpc_core::Fork::Enabled()) {
    gpr_mu_lock(&fork_fd_list_mu);
    fork_poller_list.remove(poller);
    gpr_mu_unlock(&fork_fd_list_mu);
  }
}

bool InitEpoll1PollerLinux();

// Called by the child process's post-fork handler to close open fds,
// including the global epoll fd of each poller. This allows gRPC to shut down
// in the child process without interfering with connections or RPCs ongoing
// in the parent.
void ResetEventManagerOnFork() {
  // Delete all pending Epoll1EventHandles.
  gpr_mu_lock(&fork_fd_list_mu);
  while (fork_fd_list_head != nullptr) {
    close(fork_fd_list_head->WrappedFd());
    Epoll1EventHandle* next = fork_fd_list_head->ForkFdListPos().next;
    delete fork_fd_list_head;
    fork_fd_list_head = next;
  }
  // Delete all registered pollers. This also closes all open epoll sets.
  while (!fork_poller_list.empty()) {
    Epoll1Poller* poller = fork_poller_list.front();
    fork_poller_list.pop_front();
    poller->Close();
  }
  gpr_mu_unlock(&fork_fd_list_mu);
  InitEpoll1PollerLinux();
}

// It is possible that GLIBC has epoll but the underlying kernel doesn't.
// Create an epoll fd to make sure epoll support is actually available.
bool InitEpoll1PollerLinux() {
  if (!grpc_event_engine::experimental::SupportsWakeupFd()) {
    return false;
  }
  int fd = EpollCreateAndCloexec();
  if (fd <= 0) {
    return false;
  }
  if (grpc_core::Fork::Enabled()) {
    if (grpc_core::Fork::RegisterResetChildPollingEngineFunc(
            ResetEventManagerOnFork)) {
      gpr_mu_init(&fork_fd_list_mu);
    }
  }
  close(fd);
  return true;
}

}  // namespace

void Epoll1EventHandle::OrphanHandle(PosixEngineClosure* on_done,
                                     int* release_fd,
                                     absl::string_view reason) {
  bool is_release_fd = (release_fd != nullptr);
  bool was_shutdown = false;
  if (!read_closure_->IsShutdown()) {
    was_shutdown = true;
    HandleShutdownInternal(absl::Status(absl::StatusCode::kUnknown, reason),
                           is_release_fd);
  }

  // If release_fd is not NULL, we are relinquishing control of the file
  // descriptor fd_ (but this handle object is still owned by the poller).
  if (is_release_fd) {
    if (!was_shutdown) {
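      // EPOLL_CTL_DEL ignores the event argument, but kernels before 2.6.9
      // required a non-null pointer, which is likely why a phony event is
      // passed here.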
      epoll_event phony_event;
      if (epoll_ctl(poller_->g_epoll_set_.epfd, EPOLL_CTL_DEL, fd_,
                    &phony_event) != 0) {
        LOG(ERROR) << "OrphanHandle: epoll_ctl failed: "
                   << grpc_core::StrError(errno);
      }
    }
    *release_fd = fd_;
  } else {
    shutdown(fd_, SHUT_RDWR);
    close(fd_);
  }

  ForkFdListRemoveHandle(this);
  {
    // See Epoll1EventHandle::ShutdownHandle for an explanation of why a mutex
    // is required here.
    grpc_core::MutexLock lock(&mu_);
    read_closure_->DestroyEvent();
    write_closure_->DestroyEvent();
    error_closure_->DestroyEvent();
  }
  pending_read_.store(false, std::memory_order_release);
  pending_write_.store(false, std::memory_order_release);
  pending_error_.store(false, std::memory_order_release);
  {
    grpc_core::MutexLock lock(&poller_->mu_);
    poller_->free_epoll1_handles_list_.push_back(this);
  }
  if (on_done != nullptr) {
    on_done->SetStatus(absl::OkStatus());
    poller_->GetScheduler()->Run(on_done);
  }
}

// If 'releasing_fd' is true, we are going to detach the internal fd from this
// handle (which means we should not call the shutdown() syscall on that fd).
void Epoll1EventHandle::HandleShutdownInternal(absl::Status why,
                                               bool releasing_fd) {
  grpc_core::StatusSetInt(&why, grpc_core::StatusIntProperty::kRpcStatus,
                          GRPC_STATUS_UNAVAILABLE);
  if (read_closure_->SetShutdown(why)) {
    if (releasing_fd) {
      epoll_event phony_event;
      if (epoll_ctl(poller_->g_epoll_set_.epfd, EPOLL_CTL_DEL, fd_,
                    &phony_event) != 0) {
        LOG(ERROR) << "HandleShutdownInternal: epoll_ctl failed: "
                   << grpc_core::StrError(errno);
      }
    }
    write_closure_->SetShutdown(why);
    error_closure_->SetShutdown(why);
  }
}

Epoll1Poller::Epoll1Poller(Scheduler* scheduler)
    : scheduler_(scheduler), was_kicked_(false), closed_(false) {
  g_epoll_set_.epfd = EpollCreateAndCloexec();
  wakeup_fd_ = *CreateWakeupFd();
  CHECK(wakeup_fd_ != nullptr);
  CHECK_GE(g_epoll_set_.epfd, 0);
  GRPC_TRACE_LOG(event_engine_poller, INFO)
      << "grpc epoll fd: " << g_epoll_set_.epfd;
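  // Register the wakeup fd with the epoll set (edge triggered) so that Kick()
  // can interrupt a blocking epoll_wait() by making the wakeup fd readable.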
  struct epoll_event ev{};
  ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLET);
  ev.data.ptr = wakeup_fd_.get();
  CHECK(epoll_ctl(g_epoll_set_.epfd, EPOLL_CTL_ADD, wakeup_fd_->ReadFd(),
                  &ev) == 0);
  g_epoll_set_.num_events = 0;
  g_epoll_set_.cursor = 0;
  ForkPollerListAddPoller(this);
}

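// Shutdown only unregisters the poller from the fork-handler bookkeeping; the
// epoll fd itself is closed later in Close().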
void Epoll1Poller::Shutdown() { ForkPollerListRemovePoller(this); }

void Epoll1Poller::Close() {
  grpc_core::MutexLock lock(&mu_);
  if (closed_) return;

  if (g_epoll_set_.epfd >= 0) {
    close(g_epoll_set_.epfd);
    g_epoll_set_.epfd = -1;
  }

  while (!free_epoll1_handles_list_.empty()) {
    Epoll1EventHandle* handle =
        reinterpret_cast<Epoll1EventHandle*>(free_epoll1_handles_list_.front());
    free_epoll1_handles_list_.pop_front();
    delete handle;
  }
  closed_ = true;
}

Epoll1Poller::~Epoll1Poller() { Close(); }

EventHandle* Epoll1Poller::CreateHandle(int fd, absl::string_view /*name*/,
                                        bool track_err) {
  Epoll1EventHandle* new_handle = nullptr;
  {
    grpc_core::MutexLock lock(&mu_);
    if (free_epoll1_handles_list_.empty()) {
      new_handle = new Epoll1EventHandle(fd, this);
    } else {
      new_handle = reinterpret_cast<Epoll1EventHandle*>(
          free_epoll1_handles_list_.front());
      free_epoll1_handles_list_.pop_front();
      new_handle->ReInit(fd);
    }
  }
  ForkFdListAddHandle(new_handle);
  struct epoll_event ev;
  ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLOUT | EPOLLET);
  // Use the least significant bit of ev.data.ptr to store track_err. We expect
  // the addresses to be word aligned. We need to store track_err to avoid
  // synchronization issues when accessing it after receiving an event.
  // Accessing fd would be a data race there because the fd might have been
  // returned to the free list at that point.
  ev.data.ptr = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(new_handle) |
                                        (track_err ? 1 : 0));
  if (epoll_ctl(g_epoll_set_.epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
    LOG(ERROR) << "epoll_ctl failed: " << grpc_core::StrError(errno);
  }

  return new_handle;
}

// Process the epoll events found by the DoEpollWait() function.
// - g_epoll_set.cursor points to the index of the first event to be processed.
// - This function then processes up to max_epoll_events_to_handle events and
//   updates g_epoll_set.cursor.
// It returns true if there was a Kick that forced invocation of this
// function. It also returns the list of closures to run to take action
// on file descriptors that became readable/writable.
bool Epoll1Poller::ProcessEpollEvents(int max_epoll_events_to_handle,
                                      Events& pending_events) {
  int64_t num_events = g_epoll_set_.num_events;
  int64_t cursor = g_epoll_set_.cursor;
  bool was_kicked = false;
  for (int idx = 0; (idx < max_epoll_events_to_handle) && cursor != num_events;
       idx++) {
    int64_t c = cursor++;
    struct epoll_event* ev = &g_epoll_set_.events[c];
    void* data_ptr = ev->data.ptr;
    if (data_ptr == wakeup_fd_.get()) {
      CHECK(wakeup_fd_->ConsumeWakeup().ok());
      was_kicked = true;
    } else {
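      // CreateHandle() packs the track_err flag into the least significant bit
      // of data.ptr; recover the handle pointer and the flag here.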
      Epoll1EventHandle* handle = reinterpret_cast<Epoll1EventHandle*>(
          reinterpret_cast<intptr_t>(data_ptr) & ~intptr_t{1});
      bool track_err = reinterpret_cast<intptr_t>(data_ptr) & intptr_t{1};
      bool cancel = (ev->events & EPOLLHUP) != 0;
      bool error = (ev->events & EPOLLERR) != 0;
      bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0;
      bool write_ev = (ev->events & EPOLLOUT) != 0;
      bool err_fallback = error && !track_err;
      if (handle->SetPendingActions(read_ev || cancel || err_fallback,
                                    write_ev || cancel || err_fallback,
                                    error && !err_fallback)) {
        pending_events.push_back(handle);
      }
    }
  }
  g_epoll_set_.cursor = cursor;
  return was_kicked;
}

// Do epoll_wait and store the events in the g_epoll_set.events field. This
// does not "process" any of the events yet; that is done in
// ProcessEpollEvents(). See the ProcessEpollEvents() function for more
// details. It returns the number of events generated by epoll_wait.
int Epoll1Poller::DoEpollWait(EventEngine::Duration timeout) {
  int r;
  do {
    r = epoll_wait(g_epoll_set_.epfd, g_epoll_set_.events, MAX_EPOLL_EVENTS,
                   static_cast<int>(
                       grpc_event_engine::experimental::Milliseconds(timeout)));
  } while (r < 0 && errno == EINTR);
  if (r < 0) {
    grpc_core::Crash(absl::StrFormat(
        "(event_engine) Epoll1Poller:%p encountered epoll_wait error: %s", this,
        grpc_core::StrError(errno).c_str()));
  }
  g_epoll_set_.num_events = r;
  g_epoll_set_.cursor = 0;
  return r;
}

// Might be called multiple times.
void Epoll1EventHandle::ShutdownHandle(absl::Status why) {
  // A mutex is required here because the SetShutdown method of the
  // lockfree event may schedule a closure if it is already ready, and that
  // closure may call OrphanHandle. Execution of ShutdownHandle and
  // OrphanHandle in parallel is not safe because some of the lockfree event
  // types (e.g. read, write, error) may not have called SetShutdown when
  // DestroyEvent gets called in the OrphanHandle method.
  grpc_core::MutexLock lock(&mu_);
  HandleShutdownInternal(why, false);
}

bool Epoll1EventHandle::IsHandleShutdown() {
  return read_closure_->IsShutdown();
}

void Epoll1EventHandle::NotifyOnRead(PosixEngineClosure* on_read) {
  read_closure_->NotifyOn(on_read);
}

void Epoll1EventHandle::NotifyOnWrite(PosixEngineClosure* on_write) {
  write_closure_->NotifyOn(on_write);
}

void Epoll1EventHandle::NotifyOnError(PosixEngineClosure* on_error) {
  error_closure_->NotifyOn(on_error);
}

void Epoll1EventHandle::SetReadable() { read_closure_->SetReady(); }

void Epoll1EventHandle::SetWritable() { write_closure_->SetReady(); }

void Epoll1EventHandle::SetHasError() { error_closure_->SetReady(); }

// Polls the registered fds for events until the timeout is reached or there is
// a Kick(). If there is a Kick(), it collects and processes any previously
// unprocessed events. If there are no unprocessed events, it returns
// Poller::WorkResult::kKicked.
Poller::WorkResult Epoll1Poller::Work(
    EventEngine::Duration timeout,
    absl::FunctionRef<void()> schedule_poll_again) {
  Events pending_events;
  bool was_kicked_ext = false;
  if (g_epoll_set_.cursor == g_epoll_set_.num_events) {
    if (DoEpollWait(timeout) == 0) {
      return Poller::WorkResult::kDeadlineExceeded;
    }
  }
  {
    grpc_core::MutexLock lock(&mu_);
    // If was_kicked_ is true, collect all pending events in this iteration.
    if (ProcessEpollEvents(
            was_kicked_ ? INT_MAX : MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION,
            pending_events)) {
      was_kicked_ = false;
      was_kicked_ext = true;
    }
    if (pending_events.empty()) {
      return Poller::WorkResult::kKicked;
    }
  }
  // Run the provided callback.
  schedule_poll_again();
  // Process all pending events inline.
  for (auto& it : pending_events) {
    it->ExecutePendingActions();
  }
  return was_kicked_ext ? Poller::WorkResult::kKicked : Poller::WorkResult::kOk;
}

void Epoll1Poller::Kick() {
  grpc_core::MutexLock lock(&mu_);
  if (was_kicked_ || closed_) {
    return;
  }
  was_kicked_ = true;
  CHECK(wakeup_fd_->Wakeup().ok());
}

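// The function-local static ensures that the epoll support check runs only on
// the first call; later calls reuse the cached result.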
std::shared_ptr<Epoll1Poller> MakeEpoll1Poller(Scheduler* scheduler) {
  static bool kEpoll1PollerSupported = InitEpoll1PollerLinux();
  if (kEpoll1PollerSupported) {
    return std::make_shared<Epoll1Poller>(scheduler);
  }
  return nullptr;
}

void Epoll1Poller::PrepareFork() { Kick(); }

// TODO(vigneshbabu): implement
void Epoll1Poller::PostforkParent() {}

// TODO(vigneshbabu): implement
void Epoll1Poller::PostforkChild() {}

}  // namespace experimental
}  // namespace grpc_event_engine

#else  // defined(GRPC_LINUX_EPOLL)
#if defined(GRPC_POSIX_SOCKET_EV_EPOLL1)

namespace grpc_event_engine {
namespace experimental {

using ::grpc_event_engine::experimental::EventEngine;
using ::grpc_event_engine::experimental::Poller;

Epoll1Poller::Epoll1Poller(Scheduler* /* engine */) {
  grpc_core::Crash("unimplemented");
}

void Epoll1Poller::Shutdown() { grpc_core::Crash("unimplemented"); }

Epoll1Poller::~Epoll1Poller() { grpc_core::Crash("unimplemented"); }

EventHandle* Epoll1Poller::CreateHandle(int /*fd*/, absl::string_view /*name*/,
                                        bool /*track_err*/) {
  grpc_core::Crash("unimplemented");
}

bool Epoll1Poller::ProcessEpollEvents(int /*max_epoll_events_to_handle*/,
                                      Events& /*pending_events*/) {
  grpc_core::Crash("unimplemented");
}

int Epoll1Poller::DoEpollWait(EventEngine::Duration /*timeout*/) {
  grpc_core::Crash("unimplemented");
}

Poller::WorkResult Epoll1Poller::Work(
    EventEngine::Duration /*timeout*/,
    absl::FunctionRef<void()> /*schedule_poll_again*/) {
  grpc_core::Crash("unimplemented");
}

void Epoll1Poller::Kick() { grpc_core::Crash("unimplemented"); }

// If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return
// nullptr.
std::shared_ptr<Epoll1Poller> MakeEpoll1Poller(Scheduler* /*scheduler*/) {
  return nullptr;
}

void Epoll1Poller::PrepareFork() {}

void Epoll1Poller::PostforkParent() {}

void Epoll1Poller::PostforkChild() {}

}  // namespace experimental
}  // namespace grpc_event_engine

#endif  // defined(GRPC_POSIX_SOCKET_EV_EPOLL1)
#endif  // !defined(GRPC_LINUX_EPOLL)