1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include "src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h"
15
16 #include <grpc/event_engine/event_engine.h>
17 #include <grpc/status.h>
18 #include <grpc/support/port_platform.h>
19 #include <grpc/support/sync.h>
20 #include <stdint.h>
21
22 #include <atomic>
23 #include <memory>
24
25 #include "absl/log/check.h"
26 #include "absl/log/log.h"
27 #include "absl/status/status.h"
28 #include "absl/status/statusor.h"
29 #include "absl/strings/str_format.h"
30 #include "src/core/lib/event_engine/poller.h"
31 #include "src/core/lib/event_engine/time_util.h"
32 #include "src/core/lib/iomgr/port.h"
33 #include "src/core/util/crash.h"
34
35 // This polling engine is only relevant on linux kernels supporting epoll
36 // epoll_create() or epoll_create1()
37 #ifdef GRPC_LINUX_EPOLL
38 #include <errno.h>
39 #include <limits.h>
40 #include <sys/epoll.h>
41 #include <sys/socket.h>
42 #include <unistd.h>
43
44 #include "src/core/lib/event_engine/posix_engine/event_poller.h"
45 #include "src/core/lib/event_engine/posix_engine/lockfree_event.h"
46 #include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h"
47 #include "src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h"
48 #include "src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h"
49 #include "src/core/util/fork.h"
50 #include "src/core/util/status_helper.h"
51 #include "src/core/util/strerror.h"
52 #include "src/core/util/sync.h"
53
54 #define MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION 1
55
56 namespace grpc_event_engine {
57 namespace experimental {
58
59 class Epoll1EventHandle : public EventHandle {
60 public:
Epoll1EventHandle(int fd,Epoll1Poller * poller)61 Epoll1EventHandle(int fd, Epoll1Poller* poller)
62 : fd_(fd),
63 list_(this),
64 poller_(poller),
65 read_closure_(std::make_unique<LockfreeEvent>(poller->GetScheduler())),
66 write_closure_(std::make_unique<LockfreeEvent>(poller->GetScheduler())),
67 error_closure_(
68 std::make_unique<LockfreeEvent>(poller->GetScheduler())) {
69 read_closure_->InitEvent();
70 write_closure_->InitEvent();
71 error_closure_->InitEvent();
72 pending_read_.store(false, std::memory_order_relaxed);
73 pending_write_.store(false, std::memory_order_relaxed);
74 pending_error_.store(false, std::memory_order_relaxed);
75 }
ReInit(int fd)76 void ReInit(int fd) {
77 fd_ = fd;
78 read_closure_->InitEvent();
79 write_closure_->InitEvent();
80 error_closure_->InitEvent();
81 pending_read_.store(false, std::memory_order_relaxed);
82 pending_write_.store(false, std::memory_order_relaxed);
83 pending_error_.store(false, std::memory_order_relaxed);
84 }
Poller()85 Epoll1Poller* Poller() override { return poller_; }
SetPendingActions(bool pending_read,bool pending_write,bool pending_error)86 bool SetPendingActions(bool pending_read, bool pending_write,
87 bool pending_error) {
88 // Another thread may be executing ExecutePendingActions() at this point
89 // This is possible for instance, if one instantiation of Work(..) sets
90 // an fd to be readable while the next instantiation of Work(...) may
91 // set the fd to be writable. While the second instantiation is running,
92 // ExecutePendingActions() of the first instantiation may execute in
93 // parallel and read the pending_<***>_ variables. So we need to use
94 // atomics to manipulate pending_<***>_ variables.
95
96 if (pending_read) {
97 pending_read_.store(true, std::memory_order_release);
98 }
99
100 if (pending_write) {
101 pending_write_.store(true, std::memory_order_release);
102 }
103
104 if (pending_error) {
105 pending_error_.store(true, std::memory_order_release);
106 }
107
108 return pending_read || pending_write || pending_error;
109 }
WrappedFd()110 int WrappedFd() override { return fd_; }
111 void OrphanHandle(PosixEngineClosure* on_done, int* release_fd,
112 absl::string_view reason) override;
113 void ShutdownHandle(absl::Status why) override;
114 void NotifyOnRead(PosixEngineClosure* on_read) override;
115 void NotifyOnWrite(PosixEngineClosure* on_write) override;
116 void NotifyOnError(PosixEngineClosure* on_error) override;
117 void SetReadable() override;
118 void SetWritable() override;
119 void SetHasError() override;
120 bool IsHandleShutdown() override;
ExecutePendingActions()121 inline void ExecutePendingActions() {
122 // These may execute in Parallel with ShutdownHandle. Thats not an issue
123 // because the lockfree event implementation should be able to handle it.
124 if (pending_read_.exchange(false, std::memory_order_acq_rel)) {
125 read_closure_->SetReady();
126 }
127 if (pending_write_.exchange(false, std::memory_order_acq_rel)) {
128 write_closure_->SetReady();
129 }
130 if (pending_error_.exchange(false, std::memory_order_acq_rel)) {
131 error_closure_->SetReady();
132 }
133 }
mu()134 grpc_core::Mutex* mu() { return &mu_; }
ReadClosure()135 LockfreeEvent* ReadClosure() { return read_closure_.get(); }
WriteClosure()136 LockfreeEvent* WriteClosure() { return write_closure_.get(); }
ErrorClosure()137 LockfreeEvent* ErrorClosure() { return error_closure_.get(); }
ForkFdListPos()138 Epoll1Poller::HandlesList& ForkFdListPos() { return list_; }
139 ~Epoll1EventHandle() override = default;
140
141 private:
142 void HandleShutdownInternal(absl::Status why, bool releasing_fd);
143 // See Epoll1Poller::ShutdownHandle for explanation on why a mutex is
144 // required.
145 grpc_core::Mutex mu_;
146 int fd_;
147 // See Epoll1Poller::SetPendingActions for explanation on why pending_<***>_
148 // need to be atomic.
149 std::atomic<bool> pending_read_{false};
150 std::atomic<bool> pending_write_{false};
151 std::atomic<bool> pending_error_{false};
152 Epoll1Poller::HandlesList list_;
153 Epoll1Poller* poller_;
154 std::unique_ptr<LockfreeEvent> read_closure_;
155 std::unique_ptr<LockfreeEvent> write_closure_;
156 std::unique_ptr<LockfreeEvent> error_closure_;
157 };
158
159 namespace {
160
EpollCreateAndCloexec()161 int EpollCreateAndCloexec() {
162 #ifdef GRPC_LINUX_EPOLL_CREATE1
163 int fd = epoll_create1(EPOLL_CLOEXEC);
164 if (fd < 0) {
165 LOG(ERROR) << "epoll_create1 unavailable";
166 }
167 #else
168 int fd = epoll_create(MAX_EPOLL_EVENTS);
169 if (fd < 0) {
170 LOG(ERROR) << "epoll_create unavailable";
171 } else if (fcntl(fd, F_SETFD, FD_CLOEXEC) != 0) {
172 LOG(ERROR) << "fcntl following epoll_create failed";
173 return -1;
174 }
175 #endif
176 return fd;
177 }
178
179 // Only used when GRPC_ENABLE_FORK_SUPPORT=1
180 std::list<Epoll1Poller*> fork_poller_list;
181
182 // Only used when GRPC_ENABLE_FORK_SUPPORT=1
183 Epoll1EventHandle* fork_fd_list_head = nullptr;
184 gpr_mu fork_fd_list_mu;
185
ForkFdListAddHandle(Epoll1EventHandle * handle)186 void ForkFdListAddHandle(Epoll1EventHandle* handle) {
187 if (grpc_core::Fork::Enabled()) {
188 gpr_mu_lock(&fork_fd_list_mu);
189 handle->ForkFdListPos().next = fork_fd_list_head;
190 handle->ForkFdListPos().prev = nullptr;
191 if (fork_fd_list_head != nullptr) {
192 fork_fd_list_head->ForkFdListPos().prev = handle;
193 }
194 fork_fd_list_head = handle;
195 gpr_mu_unlock(&fork_fd_list_mu);
196 }
197 }
198
ForkFdListRemoveHandle(Epoll1EventHandle * handle)199 void ForkFdListRemoveHandle(Epoll1EventHandle* handle) {
200 if (grpc_core::Fork::Enabled()) {
201 gpr_mu_lock(&fork_fd_list_mu);
202 if (fork_fd_list_head == handle) {
203 fork_fd_list_head = handle->ForkFdListPos().next;
204 }
205 if (handle->ForkFdListPos().prev != nullptr) {
206 handle->ForkFdListPos().prev->ForkFdListPos().next =
207 handle->ForkFdListPos().next;
208 }
209 if (handle->ForkFdListPos().next != nullptr) {
210 handle->ForkFdListPos().next->ForkFdListPos().prev =
211 handle->ForkFdListPos().prev;
212 }
213 gpr_mu_unlock(&fork_fd_list_mu);
214 }
215 }
216
ForkPollerListAddPoller(Epoll1Poller * poller)217 void ForkPollerListAddPoller(Epoll1Poller* poller) {
218 if (grpc_core::Fork::Enabled()) {
219 gpr_mu_lock(&fork_fd_list_mu);
220 fork_poller_list.push_back(poller);
221 gpr_mu_unlock(&fork_fd_list_mu);
222 }
223 }
224
ForkPollerListRemovePoller(Epoll1Poller * poller)225 void ForkPollerListRemovePoller(Epoll1Poller* poller) {
226 if (grpc_core::Fork::Enabled()) {
227 gpr_mu_lock(&fork_fd_list_mu);
228 fork_poller_list.remove(poller);
229 gpr_mu_unlock(&fork_fd_list_mu);
230 }
231 }
232
233 bool InitEpoll1PollerLinux();
234
235 // Called by the child process's post-fork handler to close open fds,
236 // including the global epoll fd of each poller. This allows gRPC to shutdown in
237 // the child process without interfering with connections or RPCs ongoing in the
238 // parent.
ResetEventManagerOnFork()239 void ResetEventManagerOnFork() {
240 // Delete all pending Epoll1EventHandles.
241 gpr_mu_lock(&fork_fd_list_mu);
242 while (fork_fd_list_head != nullptr) {
243 close(fork_fd_list_head->WrappedFd());
244 Epoll1EventHandle* next = fork_fd_list_head->ForkFdListPos().next;
245 delete fork_fd_list_head;
246 fork_fd_list_head = next;
247 }
248 // Delete all registered pollers. This also closes all open epoll_sets
249 while (!fork_poller_list.empty()) {
250 Epoll1Poller* poller = fork_poller_list.front();
251 fork_poller_list.pop_front();
252 poller->Close();
253 }
254 gpr_mu_unlock(&fork_fd_list_mu);
255 InitEpoll1PollerLinux();
256 }
257
258 // It is possible that GLIBC has epoll but the underlying kernel doesn't.
259 // Create epoll_fd to make sure epoll support is available
InitEpoll1PollerLinux()260 bool InitEpoll1PollerLinux() {
261 if (!grpc_event_engine::experimental::SupportsWakeupFd()) {
262 return false;
263 }
264 int fd = EpollCreateAndCloexec();
265 if (fd <= 0) {
266 return false;
267 }
268 if (grpc_core::Fork::Enabled()) {
269 if (grpc_core::Fork::RegisterResetChildPollingEngineFunc(
270 ResetEventManagerOnFork)) {
271 gpr_mu_init(&fork_fd_list_mu);
272 }
273 }
274 close(fd);
275 return true;
276 }
277
278 } // namespace
279
OrphanHandle(PosixEngineClosure * on_done,int * release_fd,absl::string_view reason)280 void Epoll1EventHandle::OrphanHandle(PosixEngineClosure* on_done,
281 int* release_fd,
282 absl::string_view reason) {
283 bool is_release_fd = (release_fd != nullptr);
284 bool was_shutdown = false;
285 if (!read_closure_->IsShutdown()) {
286 was_shutdown = true;
287 HandleShutdownInternal(absl::Status(absl::StatusCode::kUnknown, reason),
288 is_release_fd);
289 }
290
291 // If release_fd is not NULL, we should be relinquishing control of the file
292 // descriptor fd->fd (but we still own the grpc_fd structure).
293 if (is_release_fd) {
294 if (!was_shutdown) {
295 epoll_event phony_event;
296 if (epoll_ctl(poller_->g_epoll_set_.epfd, EPOLL_CTL_DEL, fd_,
297 &phony_event) != 0) {
298 LOG(ERROR) << "OrphanHandle: epoll_ctl failed: "
299 << grpc_core::StrError(errno);
300 }
301 }
302 *release_fd = fd_;
303 } else {
304 shutdown(fd_, SHUT_RDWR);
305 close(fd_);
306 }
307
308 ForkFdListRemoveHandle(this);
309 {
310 // See Epoll1Poller::ShutdownHandle for explanation on why a mutex is
311 // required here.
312 grpc_core::MutexLock lock(&mu_);
313 read_closure_->DestroyEvent();
314 write_closure_->DestroyEvent();
315 error_closure_->DestroyEvent();
316 }
317 pending_read_.store(false, std::memory_order_release);
318 pending_write_.store(false, std::memory_order_release);
319 pending_error_.store(false, std::memory_order_release);
320 {
321 grpc_core::MutexLock lock(&poller_->mu_);
322 poller_->free_epoll1_handles_list_.push_back(this);
323 }
324 if (on_done != nullptr) {
325 on_done->SetStatus(absl::OkStatus());
326 poller_->GetScheduler()->Run(on_done);
327 }
328 }
329
330 // if 'releasing_fd' is true, it means that we are going to detach the internal
331 // fd from grpc_fd structure (i.e which means we should not be calling
332 // shutdown() syscall on that fd)
HandleShutdownInternal(absl::Status why,bool releasing_fd)333 void Epoll1EventHandle::HandleShutdownInternal(absl::Status why,
334 bool releasing_fd) {
335 grpc_core::StatusSetInt(&why, grpc_core::StatusIntProperty::kRpcStatus,
336 GRPC_STATUS_UNAVAILABLE);
337 if (read_closure_->SetShutdown(why)) {
338 if (releasing_fd) {
339 epoll_event phony_event;
340 if (epoll_ctl(poller_->g_epoll_set_.epfd, EPOLL_CTL_DEL, fd_,
341 &phony_event) != 0) {
342 LOG(ERROR) << "HandleShutdownInternal: epoll_ctl failed: "
343 << grpc_core::StrError(errno);
344 }
345 }
346 write_closure_->SetShutdown(why);
347 error_closure_->SetShutdown(why);
348 }
349 }
350
Epoll1Poller(Scheduler * scheduler)351 Epoll1Poller::Epoll1Poller(Scheduler* scheduler)
352 : scheduler_(scheduler), was_kicked_(false), closed_(false) {
353 g_epoll_set_.epfd = EpollCreateAndCloexec();
354 wakeup_fd_ = *CreateWakeupFd();
355 CHECK(wakeup_fd_ != nullptr);
356 CHECK_GE(g_epoll_set_.epfd, 0);
357 GRPC_TRACE_LOG(event_engine_poller, INFO)
358 << "grpc epoll fd: " << g_epoll_set_.epfd;
359 struct epoll_event ev{};
360 ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLET);
361 ev.data.ptr = wakeup_fd_.get();
362 CHECK(epoll_ctl(g_epoll_set_.epfd, EPOLL_CTL_ADD, wakeup_fd_->ReadFd(),
363 &ev) == 0);
364 g_epoll_set_.num_events = 0;
365 g_epoll_set_.cursor = 0;
366 ForkPollerListAddPoller(this);
367 }
368
Shutdown()369 void Epoll1Poller::Shutdown() { ForkPollerListRemovePoller(this); }
370
Close()371 void Epoll1Poller::Close() {
372 grpc_core::MutexLock lock(&mu_);
373 if (closed_) return;
374
375 if (g_epoll_set_.epfd >= 0) {
376 close(g_epoll_set_.epfd);
377 g_epoll_set_.epfd = -1;
378 }
379
380 while (!free_epoll1_handles_list_.empty()) {
381 Epoll1EventHandle* handle =
382 reinterpret_cast<Epoll1EventHandle*>(free_epoll1_handles_list_.front());
383 free_epoll1_handles_list_.pop_front();
384 delete handle;
385 }
386 closed_ = true;
387 }
388
~Epoll1Poller()389 Epoll1Poller::~Epoll1Poller() { Close(); }
390
CreateHandle(int fd,absl::string_view,bool track_err)391 EventHandle* Epoll1Poller::CreateHandle(int fd, absl::string_view /*name*/,
392 bool track_err) {
393 Epoll1EventHandle* new_handle = nullptr;
394 {
395 grpc_core::MutexLock lock(&mu_);
396 if (free_epoll1_handles_list_.empty()) {
397 new_handle = new Epoll1EventHandle(fd, this);
398 } else {
399 new_handle = reinterpret_cast<Epoll1EventHandle*>(
400 free_epoll1_handles_list_.front());
401 free_epoll1_handles_list_.pop_front();
402 new_handle->ReInit(fd);
403 }
404 }
405 ForkFdListAddHandle(new_handle);
406 struct epoll_event ev;
407 ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLOUT | EPOLLET);
408 // Use the least significant bit of ev.data.ptr to store track_err. We expect
409 // the addresses to be word aligned. We need to store track_err to avoid
410 // synchronization issues when accessing it after receiving an event.
411 // Accessing fd would be a data race there because the fd might have been
412 // returned to the free list at that point.
413 ev.data.ptr = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(new_handle) |
414 (track_err ? 1 : 0));
415 if (epoll_ctl(g_epoll_set_.epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
416 LOG(ERROR) << "epoll_ctl failed: " << grpc_core::StrError(errno);
417 }
418
419 return new_handle;
420 }
421
422 // Process the epoll events found by DoEpollWait() function.
423 // - g_epoll_set.cursor points to the index of the first event to be processed
424 // - This function then processes up-to max_epoll_events_to_handle and
425 // updates the g_epoll_set.cursor.
426 // It returns true, it there was a Kick that forced invocation of this
427 // function. It also returns the list of closures to run to take action
428 // on file descriptors that became readable/writable.
ProcessEpollEvents(int max_epoll_events_to_handle,Events & pending_events)429 bool Epoll1Poller::ProcessEpollEvents(int max_epoll_events_to_handle,
430 Events& pending_events) {
431 int64_t num_events = g_epoll_set_.num_events;
432 int64_t cursor = g_epoll_set_.cursor;
433 bool was_kicked = false;
434 for (int idx = 0; (idx < max_epoll_events_to_handle) && cursor != num_events;
435 idx++) {
436 int64_t c = cursor++;
437 struct epoll_event* ev = &g_epoll_set_.events[c];
438 void* data_ptr = ev->data.ptr;
439 if (data_ptr == wakeup_fd_.get()) {
440 CHECK(wakeup_fd_->ConsumeWakeup().ok());
441 was_kicked = true;
442 } else {
443 Epoll1EventHandle* handle = reinterpret_cast<Epoll1EventHandle*>(
444 reinterpret_cast<intptr_t>(data_ptr) & ~intptr_t{1});
445 bool track_err = reinterpret_cast<intptr_t>(data_ptr) & intptr_t{1};
446 bool cancel = (ev->events & EPOLLHUP) != 0;
447 bool error = (ev->events & EPOLLERR) != 0;
448 bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0;
449 bool write_ev = (ev->events & EPOLLOUT) != 0;
450 bool err_fallback = error && !track_err;
451 if (handle->SetPendingActions(read_ev || cancel || err_fallback,
452 write_ev || cancel || err_fallback,
453 error && !err_fallback)) {
454 pending_events.push_back(handle);
455 }
456 }
457 }
458 g_epoll_set_.cursor = cursor;
459 return was_kicked;
460 }
461
462 // Do epoll_wait and store the events in g_epoll_set.events field. This does
463 // not "process" any of the events yet; that is done in ProcessEpollEvents().
464 // See ProcessEpollEvents() function for more details. It returns the number
465 // of events generated by epoll_wait.
DoEpollWait(EventEngine::Duration timeout)466 int Epoll1Poller::DoEpollWait(EventEngine::Duration timeout) {
467 int r;
468 do {
469 r = epoll_wait(g_epoll_set_.epfd, g_epoll_set_.events, MAX_EPOLL_EVENTS,
470 static_cast<int>(
471 grpc_event_engine::experimental::Milliseconds(timeout)));
472 } while (r < 0 && errno == EINTR);
473 if (r < 0) {
474 grpc_core::Crash(absl::StrFormat(
475 "(event_engine) Epoll1Poller:%p encountered epoll_wait error: %s", this,
476 grpc_core::StrError(errno).c_str()));
477 }
478 g_epoll_set_.num_events = r;
479 g_epoll_set_.cursor = 0;
480 return r;
481 }
482
483 // Might be called multiple times
ShutdownHandle(absl::Status why)484 void Epoll1EventHandle::ShutdownHandle(absl::Status why) {
485 // A mutex is required here because, the SetShutdown method of the
486 // lockfree event may schedule a closure if it is already ready and that
487 // closure may call OrphanHandle. Execution of ShutdownHandle and OrphanHandle
488 // in parallel is not safe because some of the lockfree event types e.g, read,
489 // write, error may-not have called SetShutdown when DestroyEvent gets
490 // called in the OrphanHandle method.
491 grpc_core::MutexLock lock(&mu_);
492 HandleShutdownInternal(why, false);
493 }
494
IsHandleShutdown()495 bool Epoll1EventHandle::IsHandleShutdown() {
496 return read_closure_->IsShutdown();
497 }
498
NotifyOnRead(PosixEngineClosure * on_read)499 void Epoll1EventHandle::NotifyOnRead(PosixEngineClosure* on_read) {
500 read_closure_->NotifyOn(on_read);
501 }
502
NotifyOnWrite(PosixEngineClosure * on_write)503 void Epoll1EventHandle::NotifyOnWrite(PosixEngineClosure* on_write) {
504 write_closure_->NotifyOn(on_write);
505 }
506
NotifyOnError(PosixEngineClosure * on_error)507 void Epoll1EventHandle::NotifyOnError(PosixEngineClosure* on_error) {
508 error_closure_->NotifyOn(on_error);
509 }
510
SetReadable()511 void Epoll1EventHandle::SetReadable() { read_closure_->SetReady(); }
512
SetWritable()513 void Epoll1EventHandle::SetWritable() { write_closure_->SetReady(); }
514
SetHasError()515 void Epoll1EventHandle::SetHasError() { error_closure_->SetReady(); }
516
517 // Polls the registered Fds for events until timeout is reached or there is a
518 // Kick(). If there is a Kick(), it collects and processes any previously
519 // un-processed events. If there are no un-processed events, it returns
520 // Poller::WorkResult::Kicked{}
Work(EventEngine::Duration timeout,absl::FunctionRef<void ()> schedule_poll_again)521 Poller::WorkResult Epoll1Poller::Work(
522 EventEngine::Duration timeout,
523 absl::FunctionRef<void()> schedule_poll_again) {
524 Events pending_events;
525 bool was_kicked_ext = false;
526 if (g_epoll_set_.cursor == g_epoll_set_.num_events) {
527 if (DoEpollWait(timeout) == 0) {
528 return Poller::WorkResult::kDeadlineExceeded;
529 }
530 }
531 {
532 grpc_core::MutexLock lock(&mu_);
533 // If was_kicked_ is true, collect all pending events in this iteration.
534 if (ProcessEpollEvents(
535 was_kicked_ ? INT_MAX : MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION,
536 pending_events)) {
537 was_kicked_ = false;
538 was_kicked_ext = true;
539 }
540 if (pending_events.empty()) {
541 return Poller::WorkResult::kKicked;
542 }
543 }
544 // Run the provided callback.
545 schedule_poll_again();
546 // Process all pending events inline.
547 for (auto& it : pending_events) {
548 it->ExecutePendingActions();
549 }
550 return was_kicked_ext ? Poller::WorkResult::kKicked : Poller::WorkResult::kOk;
551 }
552
Kick()553 void Epoll1Poller::Kick() {
554 grpc_core::MutexLock lock(&mu_);
555 if (was_kicked_ || closed_) {
556 return;
557 }
558 was_kicked_ = true;
559 CHECK(wakeup_fd_->Wakeup().ok());
560 }
561
MakeEpoll1Poller(Scheduler * scheduler)562 std::shared_ptr<Epoll1Poller> MakeEpoll1Poller(Scheduler* scheduler) {
563 static bool kEpoll1PollerSupported = InitEpoll1PollerLinux();
564 if (kEpoll1PollerSupported) {
565 return std::make_shared<Epoll1Poller>(scheduler);
566 }
567 return nullptr;
568 }
569
PrepareFork()570 void Epoll1Poller::PrepareFork() { Kick(); }
571
572 // TODO(vigneshbabu): implement
PostforkParent()573 void Epoll1Poller::PostforkParent() {}
574
575 // TODO(vigneshbabu): implement
PostforkChild()576 void Epoll1Poller::PostforkChild() {}
577
578 } // namespace experimental
579 } // namespace grpc_event_engine
580
581 #else // defined(GRPC_LINUX_EPOLL)
582 #if defined(GRPC_POSIX_SOCKET_EV_EPOLL1)
583
584 namespace grpc_event_engine {
585 namespace experimental {
586
587 using ::grpc_event_engine::experimental::EventEngine;
588 using ::grpc_event_engine::experimental::Poller;
589
Epoll1Poller(Scheduler *)590 Epoll1Poller::Epoll1Poller(Scheduler* /* engine */) {
591 grpc_core::Crash("unimplemented");
592 }
593
Shutdown()594 void Epoll1Poller::Shutdown() { grpc_core::Crash("unimplemented"); }
595
~Epoll1Poller()596 Epoll1Poller::~Epoll1Poller() { grpc_core::Crash("unimplemented"); }
597
CreateHandle(int,absl::string_view,bool)598 EventHandle* Epoll1Poller::CreateHandle(int /*fd*/, absl::string_view /*name*/,
599 bool /*track_err*/) {
600 grpc_core::Crash("unimplemented");
601 }
602
ProcessEpollEvents(int,Events &)603 bool Epoll1Poller::ProcessEpollEvents(int /*max_epoll_events_to_handle*/,
604 Events& /*pending_events*/) {
605 grpc_core::Crash("unimplemented");
606 }
607
DoEpollWait(EventEngine::Duration)608 int Epoll1Poller::DoEpollWait(EventEngine::Duration /*timeout*/) {
609 grpc_core::Crash("unimplemented");
610 }
611
Work(EventEngine::Duration,absl::FunctionRef<void ()>)612 Poller::WorkResult Epoll1Poller::Work(
613 EventEngine::Duration /*timeout*/,
614 absl::FunctionRef<void()> /*schedule_poll_again*/) {
615 grpc_core::Crash("unimplemented");
616 }
617
Kick()618 void Epoll1Poller::Kick() { grpc_core::Crash("unimplemented"); }
619
620 // If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return
621 // nullptr.
MakeEpoll1Poller(Scheduler *)622 std::shared_ptr<Epoll1Poller> MakeEpoll1Poller(Scheduler* /*scheduler*/) {
623 return nullptr;
624 }
625
PrepareFork()626 void Epoll1Poller::PrepareFork() {}
627
PostforkParent()628 void Epoll1Poller::PostforkParent() {}
629
PostforkChild()630 void Epoll1Poller::PostforkChild() {}
631
632 } // namespace experimental
633 } // namespace grpc_event_engine
634
635 #endif // defined(GRPC_POSIX_SOCKET_EV_EPOLL1)
636 #endif // !defined(GRPC_LINUX_EPOLL)
637