• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "src/core/lib/event_engine/posix_engine/ev_poll_posix.h"
16 
17 #include <grpc/event_engine/event_engine.h>
18 #include <grpc/status.h>
19 #include <grpc/support/port_platform.h>
20 #include <grpc/support/sync.h>
21 #include <grpc/support/time.h>
22 #include <stdint.h>
23 
24 #include <atomic>
25 #include <list>
26 #include <memory>
27 #include <utility>
28 
29 #include "absl/container/inlined_vector.h"
30 #include "absl/functional/any_invocable.h"
31 #include "absl/log/check.h"
32 #include "absl/status/status.h"
33 #include "absl/status/statusor.h"
34 #include "absl/strings/str_format.h"
35 #include "src/core/lib/event_engine/poller.h"
36 #include "src/core/lib/event_engine/posix_engine/event_poller.h"
37 #include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h"
38 #include "src/core/lib/iomgr/port.h"
39 #include "src/core/util/crash.h"
40 
41 #ifdef GRPC_POSIX_SOCKET_EV_POLL
42 
43 #include <errno.h>
44 #include <grpc/support/alloc.h>
45 #include <limits.h>
46 #include <poll.h>
47 #include <sys/socket.h>
48 #include <unistd.h>
49 
50 #include "src/core/lib/event_engine/common_closures.h"
51 #include "src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h"
52 #include "src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h"
53 #include "src/core/lib/event_engine/time_util.h"
54 #include "src/core/util/fork.h"
55 #include "src/core/util/status_helper.h"
56 #include "src/core/util/strerror.h"
57 #include "src/core/util/sync.h"
58 #include "src/core/util/time.h"
59 
// Sentinel values stored in PollEventHandle::read_closure_ / write_closure_ in
// place of a real closure pointer: "no event seen and nobody waiting" versus
// "event seen but nobody waiting yet".
static constexpr intptr_t kClosureNotReady = 0;
static constexpr intptr_t kClosureReady = 1;
// poll() revents bits that count as "readable" / "writable". Hangup and error
// are included so that waiters are woken up in those cases as well.
static constexpr int kPollinCheck = POLLIN | POLLHUP | POLLERR;
static constexpr int kPolloutCheck = POLLOUT | POLLHUP | POLLERR;
64 
65 namespace grpc_event_engine {
66 namespace experimental {
67 
68 using Events = absl::InlinedVector<PollEventHandle*, 5>;
69 
70 class PollEventHandle : public EventHandle {
71  public:
PollEventHandle(int fd,std::shared_ptr<PollPoller> poller)72   PollEventHandle(int fd, std::shared_ptr<PollPoller> poller)
73       : fd_(fd),
74         pending_actions_(0),
75         fork_fd_list_(this),
76         poller_handles_list_(this),
77         scheduler_(poller->GetScheduler()),
78         poller_(std::move(poller)),
79         is_orphaned_(false),
80         is_shutdown_(false),
81         closed_(false),
82         released_(false),
83         pollhup_(false),
84         watch_mask_(-1),
85         shutdown_error_(absl::OkStatus()),
86         exec_actions_closure_([this]() { ExecutePendingActions(); }),
87         on_done_(nullptr),
88         read_closure_(reinterpret_cast<PosixEngineClosure*>(kClosureNotReady)),
89         write_closure_(
90             reinterpret_cast<PosixEngineClosure*>(kClosureNotReady)) {
91     grpc_core::MutexLock lock(&poller_->mu_);
92     poller_->PollerHandlesListAddHandle(this);
93   }
Poller()94   PollPoller* Poller() override { return poller_.get(); }
SetPendingActions(bool pending_read,bool pending_write)95   bool SetPendingActions(bool pending_read, bool pending_write) {
96     pending_actions_ |= pending_read;
97     if (pending_write) {
98       pending_actions_ |= (1 << 2);
99     }
100     if (pending_read || pending_write) {
101       // The closure is going to be executed. We'll Unref this handle in
102       // ExecutePendingActions.
103       Ref();
104       return true;
105     }
106     return false;
107   }
ForceRemoveHandleFromPoller()108   void ForceRemoveHandleFromPoller() {
109     grpc_core::MutexLock lock(&poller_->mu_);
110     poller_->PollerHandlesListRemoveHandle(this);
111   }
WrappedFd()112   int WrappedFd() override { return fd_; }
IsOrphaned() const113   bool IsOrphaned() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
114     return is_orphaned_;
115   }
CloseFd()116   void CloseFd() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
117     if (!released_ && !closed_) {
118       closed_ = true;
119       close(fd_);
120     }
121   }
IsPollhup() const122   bool IsPollhup() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { return pollhup_; }
SetPollhup(bool pollhup)123   void SetPollhup(bool pollhup) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
124     pollhup_ = pollhup;
125   }
IsWatched(int & watch_mask) const126   bool IsWatched(int& watch_mask) const ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
127     watch_mask = watch_mask_;
128     return watch_mask_ != -1;
129   }
IsWatched() const130   bool IsWatched() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
131     return watch_mask_ != -1;
132   }
SetWatched(int watch_mask)133   void SetWatched(int watch_mask) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
134     watch_mask_ = watch_mask;
135   }
136   void OrphanHandle(PosixEngineClosure* on_done, int* release_fd,
137                     absl::string_view reason) override;
138   void ShutdownHandle(absl::Status why) override;
139   void NotifyOnRead(PosixEngineClosure* on_read) override;
140   void NotifyOnWrite(PosixEngineClosure* on_write) override;
141   void NotifyOnError(PosixEngineClosure* on_error) override;
142   void SetReadable() override;
143   void SetWritable() override;
144   void SetHasError() override;
IsHandleShutdown()145   bool IsHandleShutdown() override {
146     grpc_core::MutexLock lock(&mu_);
147     return is_shutdown_;
148   };
ExecutePendingActions()149   inline void ExecutePendingActions() {
150     int kick = 0;
151     {
152       grpc_core::MutexLock lock(&mu_);
153       if ((pending_actions_ & 1UL)) {
154         if (SetReadyLocked(&read_closure_)) {
155           kick = 1;
156         }
157       }
158       if (((pending_actions_ >> 2) & 1UL)) {
159         if (SetReadyLocked(&write_closure_)) {
160           kick = 1;
161         }
162       }
163       pending_actions_ = 0;
164     }
165     if (kick) {
166       // SetReadyLocked immediately scheduled some closure. It would have set
167       // the closure state to NOT_READY. We need to wakeup the Work(...)
168       // thread to start polling on this fd. If this call is not made, it is
169       // possible that the poller will reach a state where all the fds under
170       // the poller's control are not polled for POLLIN/POLLOUT events thus
171       // leading to an indefinitely blocked Work(..) method.
172       poller_->KickExternal(false);
173     }
174     Unref();
175   }
Ref()176   void Ref() { ref_count_.fetch_add(1, std::memory_order_relaxed); }
Unref()177   void Unref() {
178     if (ref_count_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
179       if (on_done_ != nullptr) {
180         scheduler_->Run(on_done_);
181       }
182       delete this;
183     }
184   }
185   ~PollEventHandle() override = default;
mu()186   grpc_core::Mutex* mu() ABSL_LOCK_RETURNED(mu_) { return &mu_; }
ForkFdListPos()187   PollPoller::HandlesList& ForkFdListPos() { return fork_fd_list_; }
PollerHandlesListPos()188   PollPoller::HandlesList& PollerHandlesListPos() {
189     return poller_handles_list_;
190   }
191   uint32_t BeginPollLocked(uint32_t read_mask, uint32_t write_mask)
192       ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
193   bool EndPollLocked(bool got_read, bool got_write)
194       ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
195 
196  private:
197   int SetReadyLocked(PosixEngineClosure** st);
198   int NotifyOnLocked(PosixEngineClosure** st, PosixEngineClosure* closure);
199   // See Epoll1Poller::ShutdownHandle for explanation on why a mutex is
200   // required.
201   grpc_core::Mutex mu_;
202   std::atomic<int> ref_count_{1};
203   int fd_;
204   int pending_actions_;
205   PollPoller::HandlesList fork_fd_list_;
206   PollPoller::HandlesList poller_handles_list_;
207   Scheduler* scheduler_;
208   std::shared_ptr<PollPoller> poller_;
209   bool is_orphaned_;
210   bool is_shutdown_;
211   bool closed_;
212   bool released_;
213   bool pollhup_;
214   int watch_mask_;
215   absl::Status shutdown_error_;
216   AnyInvocableClosure exec_actions_closure_;
217   PosixEngineClosure* on_done_;
218   PosixEngineClosure* read_closure_;
219   PosixEngineClosure* write_closure_;
220 };
221 
222 namespace {
223 // Only used when GRPC_ENABLE_FORK_SUPPORT=1
224 std::list<PollPoller*> fork_poller_list;
225 
226 // Only used when GRPC_ENABLE_FORK_SUPPORT=1
227 PollEventHandle* fork_fd_list_head = nullptr;
228 gpr_mu fork_fd_list_mu;
229 
ForkFdListAddHandle(PollEventHandle * handle)230 void ForkFdListAddHandle(PollEventHandle* handle) {
231   if (grpc_core::Fork::Enabled()) {
232     gpr_mu_lock(&fork_fd_list_mu);
233     handle->ForkFdListPos().next = fork_fd_list_head;
234     handle->ForkFdListPos().prev = nullptr;
235     if (fork_fd_list_head != nullptr) {
236       fork_fd_list_head->ForkFdListPos().prev = handle;
237     }
238     fork_fd_list_head = handle;
239     gpr_mu_unlock(&fork_fd_list_mu);
240   }
241 }
242 
ForkFdListRemoveHandle(PollEventHandle * handle)243 void ForkFdListRemoveHandle(PollEventHandle* handle) {
244   if (grpc_core::Fork::Enabled()) {
245     gpr_mu_lock(&fork_fd_list_mu);
246     if (fork_fd_list_head == handle) {
247       fork_fd_list_head = handle->ForkFdListPos().next;
248     }
249     if (handle->ForkFdListPos().prev != nullptr) {
250       handle->ForkFdListPos().prev->ForkFdListPos().next =
251           handle->ForkFdListPos().next;
252     }
253     if (handle->ForkFdListPos().next != nullptr) {
254       handle->ForkFdListPos().next->ForkFdListPos().prev =
255           handle->ForkFdListPos().prev;
256     }
257     gpr_mu_unlock(&fork_fd_list_mu);
258   }
259 }
260 
ForkPollerListAddPoller(PollPoller * poller)261 void ForkPollerListAddPoller(PollPoller* poller) {
262   if (grpc_core::Fork::Enabled()) {
263     gpr_mu_lock(&fork_fd_list_mu);
264     fork_poller_list.push_back(poller);
265     gpr_mu_unlock(&fork_fd_list_mu);
266   }
267 }
268 
ForkPollerListRemovePoller(PollPoller * poller)269 void ForkPollerListRemovePoller(PollPoller* poller) {
270   if (grpc_core::Fork::Enabled()) {
271     gpr_mu_lock(&fork_fd_list_mu);
272     fork_poller_list.remove(poller);
273     gpr_mu_unlock(&fork_fd_list_mu);
274   }
275 }
276 
277 // Returns the number of milliseconds elapsed between now and start timestamp.
PollElapsedTimeToMillis(grpc_core::Timestamp start)278 int PollElapsedTimeToMillis(grpc_core::Timestamp start) {
279   if (start == grpc_core::Timestamp::InfFuture()) return -1;
280   grpc_core::Timestamp now =
281       grpc_core::Timestamp::FromTimespecRoundDown(gpr_now(GPR_CLOCK_MONOTONIC));
282   int64_t delta = (now - start).millis();
283   if (delta > INT_MAX) {
284     return INT_MAX;
285   } else if (delta < 0) {
286     return 0;
287   } else {
288     return static_cast<int>(delta);
289   }
290 }
291 
292 bool InitPollPollerPosix();
293 
294 // Called by the child process's post-fork handler to close open fds,
295 // including the global epoll fd of each poller. This allows gRPC to shutdown
296 // in the child process without interfering with connections or RPCs ongoing
297 // in the parent.
ResetEventManagerOnFork()298 void ResetEventManagerOnFork() {
299   // Delete all pending Epoll1EventHandles.
300   gpr_mu_lock(&fork_fd_list_mu);
301   while (fork_fd_list_head != nullptr) {
302     close(fork_fd_list_head->WrappedFd());
303     PollEventHandle* next = fork_fd_list_head->ForkFdListPos().next;
304     fork_fd_list_head->ForceRemoveHandleFromPoller();
305     delete fork_fd_list_head;
306     fork_fd_list_head = next;
307   }
308   // Delete all registered pollers.
309   while (!fork_poller_list.empty()) {
310     PollPoller* poller = fork_poller_list.front();
311     fork_poller_list.pop_front();
312     poller->Close();
313   }
314   gpr_mu_unlock(&fork_fd_list_mu);
315   InitPollPollerPosix();
316 }
317 
318 // It is possible that GLIBC has epoll but the underlying kernel doesn't.
319 // Create epoll_fd to make sure epoll support is available
InitPollPollerPosix()320 bool InitPollPollerPosix() {
321   if (!grpc_event_engine::experimental::SupportsWakeupFd()) {
322     return false;
323   }
324   if (grpc_core::Fork::Enabled()) {
325     if (grpc_core::Fork::RegisterResetChildPollingEngineFunc(
326             ResetEventManagerOnFork)) {
327       gpr_mu_init(&fork_fd_list_mu);
328     }
329   }
330   return true;
331 }
332 
333 }  // namespace
334 
CreateHandle(int fd,absl::string_view,bool track_err)335 EventHandle* PollPoller::CreateHandle(int fd, absl::string_view /*name*/,
336                                       bool track_err) {
337   // Avoid unused-parameter warning for debug-only parameter
338   (void)track_err;
339   DCHECK(track_err == false);
340   PollEventHandle* handle = new PollEventHandle(fd, shared_from_this());
341   ForkFdListAddHandle(handle);
342   // We need to send a kick to the thread executing Work(..) so that it can
343   // add this new Fd into the list of Fds to poll.
344   KickExternal(false);
345   return handle;
346 }
347 
OrphanHandle(PosixEngineClosure * on_done,int * release_fd,absl::string_view)348 void PollEventHandle::OrphanHandle(PosixEngineClosure* on_done, int* release_fd,
349                                    absl::string_view /*reason*/) {
350   ForkFdListRemoveHandle(this);
351   ForceRemoveHandleFromPoller();
352   {
353     grpc_core::ReleasableMutexLock lock(&mu_);
354     on_done_ = on_done;
355     released_ = release_fd != nullptr;
356     if (release_fd != nullptr) {
357       *release_fd = fd_;
358     }
359     CHECK(!is_orphaned_);
360     is_orphaned_ = true;
361     // Perform shutdown operations if not already done so.
362     if (!is_shutdown_) {
363       is_shutdown_ = true;
364       shutdown_error_ =
365           absl::Status(absl::StatusCode::kInternal, "FD Orphaned");
366       grpc_core::StatusSetInt(&shutdown_error_,
367                               grpc_core::StatusIntProperty::kRpcStatus,
368                               GRPC_STATUS_UNAVAILABLE);
369       SetReadyLocked(&read_closure_);
370       SetReadyLocked(&write_closure_);
371     }
372     // signal read/write closed to OS so that future operations fail.
373     if (!released_) {
374       shutdown(fd_, SHUT_RDWR);
375     }
376     if (!IsWatched()) {
377       CloseFd();
378     } else {
379       // It is watched i.e we cannot take action without breaking from the
380       // blocking poll. Mark it as Unwatched and kick the thread executing
381       // Work(...). That thread should proceed with the cleanup.
382       SetWatched(-1);
383       lock.Release();
384       poller_->KickExternal(false);
385     }
386   }
387   Unref();
388 }
389 
NotifyOnLocked(PosixEngineClosure ** st,PosixEngineClosure * closure)390 int PollEventHandle::NotifyOnLocked(PosixEngineClosure** st,
391                                     PosixEngineClosure* closure) {
392   if (is_shutdown_ || pollhup_) {
393     closure->SetStatus(shutdown_error_);
394     scheduler_->Run(closure);
395   } else if (*st == reinterpret_cast<PosixEngineClosure*>(kClosureNotReady)) {
396     // not ready ==> switch to a waiting state by setting the closure
397     *st = closure;
398     return 0;
399   } else if (*st == reinterpret_cast<PosixEngineClosure*>(kClosureReady)) {
400     // already ready ==> queue the closure to run immediately
401     *st = reinterpret_cast<PosixEngineClosure*>(kClosureNotReady);
402     closure->SetStatus(shutdown_error_);
403     scheduler_->Run(closure);
404     return 1;
405   } else {
406     // upcallptr was set to a different closure.  This is an error!
407     grpc_core::Crash(
408         "User called a notify_on function with a previous callback still "
409         "pending");
410   }
411   return 0;
412 }
413 
414 // returns 1 if state becomes not ready
SetReadyLocked(PosixEngineClosure ** st)415 int PollEventHandle::SetReadyLocked(PosixEngineClosure** st) {
416   if (*st == reinterpret_cast<PosixEngineClosure*>(kClosureReady)) {
417     // duplicate ready ==> ignore
418     return 0;
419   } else if (*st == reinterpret_cast<PosixEngineClosure*>(kClosureNotReady)) {
420     // not ready, and not waiting ==> flag ready
421     *st = reinterpret_cast<PosixEngineClosure*>(kClosureReady);
422     return 0;
423   } else {
424     // waiting ==> queue closure
425     PosixEngineClosure* closure = *st;
426     *st = reinterpret_cast<PosixEngineClosure*>(kClosureNotReady);
427     closure->SetStatus(shutdown_error_);
428     scheduler_->Run(closure);
429     return 1;
430   }
431 }
432 
ShutdownHandle(absl::Status why)433 void PollEventHandle::ShutdownHandle(absl::Status why) {
434   // We need to take a Ref here because SetReadyLocked may trigger execution
435   // of a closure which calls OrphanHandle or poller->Shutdown() prematurely.
436   Ref();
437   {
438     grpc_core::MutexLock lock(&mu_);
439     // only shutdown once
440     if (!is_shutdown_) {
441       is_shutdown_ = true;
442       shutdown_error_ = why;
443       grpc_core::StatusSetInt(&shutdown_error_,
444                               grpc_core::StatusIntProperty::kRpcStatus,
445                               GRPC_STATUS_UNAVAILABLE);
446       SetReadyLocked(&read_closure_);
447       SetReadyLocked(&write_closure_);
448     }
449   }
450   // For the Ref() taken at the beginning of this function.
451   Unref();
452 }
453 
NotifyOnRead(PosixEngineClosure * on_read)454 void PollEventHandle::NotifyOnRead(PosixEngineClosure* on_read) {
455   // We need to take a Ref here because NotifyOnLocked may trigger execution
456   // of a closure which calls OrphanHandle that may delete this object or call
457   // poller->Shutdown() prematurely.
458   Ref();
459   {
460     grpc_core::ReleasableMutexLock lock(&mu_);
461     if (NotifyOnLocked(&read_closure_, on_read)) {
462       lock.Release();
463       // NotifyOnLocked immediately scheduled some closure. It would have set
464       // the closure state to NOT_READY. We need to wakeup the Work(...) thread
465       // to start polling on this fd. If this call is not made, it is possible
466       // that the poller will reach a state where all the fds under the
467       // poller's control are not polled for POLLIN/POLLOUT events thus leading
468       // to an indefinitely blocked Work(..) method.
469       poller_->KickExternal(false);
470     }
471   }
472   // For the Ref() taken at the beginning of this function.
473   Unref();
474 }
475 
NotifyOnWrite(PosixEngineClosure * on_write)476 void PollEventHandle::NotifyOnWrite(PosixEngineClosure* on_write) {
477   // We need to take a Ref here because NotifyOnLocked may trigger execution
478   // of a closure which calls OrphanHandle that may delete this object or call
479   // poller->Shutdown() prematurely.
480   Ref();
481   {
482     grpc_core::ReleasableMutexLock lock(&mu_);
483     if (NotifyOnLocked(&write_closure_, on_write)) {
484       lock.Release();
485       // NotifyOnLocked immediately scheduled some closure. It would have set
486       // the closure state to NOT_READY. We need to wakeup the Work(...) thread
487       // to start polling on this fd. If this call is not made, it is possible
488       // that the poller will reach a state where all the fds under the
489       // poller's control are not polled for POLLIN/POLLOUT events thus leading
490       // to an indefinitely blocked Work(..) method.
491       poller_->KickExternal(false);
492     }
493   }
494   // For the Ref() taken at the beginning of this function.
495   Unref();
496 }
497 
NotifyOnError(PosixEngineClosure * on_error)498 void PollEventHandle::NotifyOnError(PosixEngineClosure* on_error) {
499   on_error->SetStatus(
500       absl::Status(absl::StatusCode::kCancelled,
501                    "Polling engine does not support tracking errors"));
502   scheduler_->Run(on_error);
503 }
504 
SetReadable()505 void PollEventHandle::SetReadable() {
506   Ref();
507   {
508     grpc_core::MutexLock lock(&mu_);
509     SetReadyLocked(&read_closure_);
510   }
511   Unref();
512 }
513 
SetWritable()514 void PollEventHandle::SetWritable() {
515   Ref();
516   {
517     grpc_core::MutexLock lock(&mu_);
518     SetReadyLocked(&write_closure_);
519   }
520   Unref();
521 }
522 
SetHasError()523 void PollEventHandle::SetHasError() {}
524 
BeginPollLocked(uint32_t read_mask,uint32_t write_mask)525 uint32_t PollEventHandle::BeginPollLocked(uint32_t read_mask,
526                                           uint32_t write_mask) {
527   uint32_t mask = 0;
528   bool read_ready = (pending_actions_ & 1UL);
529   bool write_ready = ((pending_actions_ >> 2) & 1UL);
530   Ref();
531   // If we are shutdown, then no need to poll this fd. Set watch_mask to 0.
532   if (is_shutdown_) {
533     SetWatched(0);
534     return 0;
535   }
536   // If there is nobody polling for read, but we need to, then start doing so.
537   if (read_mask && !read_ready &&
538       read_closure_ != reinterpret_cast<PosixEngineClosure*>(kClosureReady)) {
539     mask |= read_mask;
540   }
541 
542   // If there is nobody polling for write, but we need to, then start doing so
543   if (write_mask && !write_ready &&
544       write_closure_ != reinterpret_cast<PosixEngineClosure*>(kClosureReady)) {
545     mask |= write_mask;
546   }
547   SetWatched(mask);
548   return mask;
549 }
550 
EndPollLocked(bool got_read,bool got_write)551 bool PollEventHandle::EndPollLocked(bool got_read, bool got_write) {
552   if (is_orphaned_ && !IsWatched()) {
553     CloseFd();
554   } else if (!is_orphaned_) {
555     return SetPendingActions(got_read, got_write);
556   }
557   return false;
558 }
559 
KickExternal(bool ext)560 void PollPoller::KickExternal(bool ext) {
561   grpc_core::MutexLock lock(&mu_);
562   if (closed_) {
563     return;
564   }
565   if (was_kicked_) {
566     if (ext) {
567       was_kicked_ext_ = true;
568     }
569     return;
570   }
571   was_kicked_ = true;
572   was_kicked_ext_ = ext;
573   CHECK(wakeup_fd_->Wakeup().ok());
574 }
575 
Kick()576 void PollPoller::Kick() { KickExternal(true); }
577 
PollerHandlesListAddHandle(PollEventHandle * handle)578 void PollPoller::PollerHandlesListAddHandle(PollEventHandle* handle) {
579   handle->PollerHandlesListPos().next = poll_handles_list_head_;
580   handle->PollerHandlesListPos().prev = nullptr;
581   if (poll_handles_list_head_ != nullptr) {
582     poll_handles_list_head_->PollerHandlesListPos().prev = handle;
583   }
584   poll_handles_list_head_ = handle;
585   ++num_poll_handles_;
586 }
587 
PollerHandlesListRemoveHandle(PollEventHandle * handle)588 void PollPoller::PollerHandlesListRemoveHandle(PollEventHandle* handle) {
589   if (poll_handles_list_head_ == handle) {
590     poll_handles_list_head_ = handle->PollerHandlesListPos().next;
591   }
592   if (handle->PollerHandlesListPos().prev != nullptr) {
593     handle->PollerHandlesListPos().prev->PollerHandlesListPos().next =
594         handle->PollerHandlesListPos().next;
595   }
596   if (handle->PollerHandlesListPos().next != nullptr) {
597     handle->PollerHandlesListPos().next->PollerHandlesListPos().prev =
598         handle->PollerHandlesListPos().prev;
599   }
600   --num_poll_handles_;
601 }
602 
PollPoller(Scheduler * scheduler)603 PollPoller::PollPoller(Scheduler* scheduler)
604     : scheduler_(scheduler),
605       use_phony_poll_(false),
606       was_kicked_(false),
607       was_kicked_ext_(false),
608       num_poll_handles_(0),
609       poll_handles_list_head_(nullptr),
610       closed_(false) {
611   wakeup_fd_ = *CreateWakeupFd();
612   CHECK(wakeup_fd_ != nullptr);
613   ForkPollerListAddPoller(this);
614 }
615 
PollPoller(Scheduler * scheduler,bool use_phony_poll)616 PollPoller::PollPoller(Scheduler* scheduler, bool use_phony_poll)
617     : scheduler_(scheduler),
618       use_phony_poll_(use_phony_poll),
619       was_kicked_(false),
620       was_kicked_ext_(false),
621       num_poll_handles_(0),
622       poll_handles_list_head_(nullptr),
623       closed_(false) {
624   wakeup_fd_ = *CreateWakeupFd();
625   CHECK(wakeup_fd_ != nullptr);
626   ForkPollerListAddPoller(this);
627 }
628 
~PollPoller()629 PollPoller::~PollPoller() {
630   // Assert that no active handles are present at the time of destruction.
631   // They should have been orphaned before reaching this state.
632   CHECK_EQ(num_poll_handles_, 0);
633   CHECK_EQ(poll_handles_list_head_, nullptr);
634 }
635 
Work(EventEngine::Duration timeout,absl::FunctionRef<void ()> schedule_poll_again)636 Poller::WorkResult PollPoller::Work(
637     EventEngine::Duration timeout,
638     absl::FunctionRef<void()> schedule_poll_again) {
639   // Avoid malloc for small number of elements.
640   enum { inline_elements = 96 };
641   struct pollfd pollfd_space[inline_elements];
642   bool was_kicked_ext = false;
643   PollEventHandle* watcher_space[inline_elements];
644   Events pending_events;
645   pending_events.clear();
646   int timeout_ms =
647       static_cast<int>(grpc_event_engine::experimental::Milliseconds(timeout));
648   mu_.Lock();
649   // Start polling, and keep doing so while we're being asked to
650   // re-evaluate our pollers (this allows poll() based pollers to
651   // ensure they don't miss wakeups).
652   while (pending_events.empty() && timeout_ms >= 0) {
653     int r = 0;
654     size_t i;
655     nfds_t pfd_count;
656     struct pollfd* pfds;
657     PollEventHandle** watchers;
658     // Estimate start time for a poll iteration.
659     grpc_core::Timestamp start = grpc_core::Timestamp::FromTimespecRoundDown(
660         gpr_now(GPR_CLOCK_MONOTONIC));
661     if (num_poll_handles_ + 2 <= inline_elements) {
662       pfds = pollfd_space;
663       watchers = watcher_space;
664     } else {
665       const size_t pfd_size = sizeof(*pfds) * (num_poll_handles_ + 2);
666       const size_t watch_size = sizeof(*watchers) * (num_poll_handles_ + 2);
667       void* buf = gpr_malloc(pfd_size + watch_size);
668       pfds = static_cast<struct pollfd*>(buf);
669       watchers = static_cast<PollEventHandle**>(
670           static_cast<void*>((static_cast<char*>(buf) + pfd_size)));
671       pfds = static_cast<struct pollfd*>(buf);
672     }
673 
674     pfd_count = 1;
675     pfds[0].fd = wakeup_fd_->ReadFd();
676     pfds[0].events = POLLIN;
677     pfds[0].revents = 0;
678     PollEventHandle* head = poll_handles_list_head_;
679     while (head != nullptr) {
680       {
681         grpc_core::MutexLock lock(head->mu());
682         // There shouldn't be any orphaned fds at this point. This is because
683         // prior to marking a handle as orphaned it is first removed from
684         // poll handle list for the poller under the poller lock.
685         CHECK(!head->IsOrphaned());
686         if (!head->IsPollhup()) {
687           pfds[pfd_count].fd = head->WrappedFd();
688           watchers[pfd_count] = head;
689           // BeginPollLocked takes a ref of the handle. It also marks the
690           // fd as Watched with an appropriate watch_mask. The watch_mask
691           // is 0 if the fd is shutdown or if the fd is already ready (i.e
692           // both read and write events are already available) and doesn't
693           // need to be polled again. The watch_mask is > 0 otherwise
694           // indicating the fd needs to be polled.
695           pfds[pfd_count].events = head->BeginPollLocked(POLLIN, POLLOUT);
696           pfd_count++;
697         }
698       }
699       head = head->PollerHandlesListPos().next;
700     }
701     mu_.Unlock();
702 
703     if (!use_phony_poll_ || timeout_ms == 0 || pfd_count == 1) {
704       // If use_phony_poll is true and pfd_count == 1, it implies only the
705       // wakeup_fd is present. Allow the call to get blocked in this case as
706       // well instead of crashing. This is because the poller::Work is called
707       // right after an event enging is constructed. Even if phony poll is
708       // expected to be used, we dont want to check for it until some actual
709       // event handles are registered. Otherwise the EventEngine construction
710       // may crash.
711       r = poll(pfds, pfd_count, timeout_ms);
712     } else {
713       grpc_core::Crash("Attempted a blocking poll when declared non-polling.");
714     }
715 
716     if (r <= 0) {
717       if (r < 0 && errno != EINTR) {
718         // Abort fail here.
719         grpc_core::Crash(absl::StrFormat(
720             "(event_engine) PollPoller:%p encountered poll error: %s", this,
721             grpc_core::StrError(errno).c_str()));
722       }
723 
724       for (i = 1; i < pfd_count; i++) {
725         PollEventHandle* head = watchers[i];
726         int watch_mask;
727         grpc_core::ReleasableMutexLock lock(head->mu());
728         if (head->IsWatched(watch_mask)) {
729           head->SetWatched(-1);
730           // This fd was Watched with a watch mask > 0.
731           if (watch_mask > 0 && r < 0) {
732             // This case implies the fd was polled (since watch_mask > 0 and
733             // the poll returned an error. Mark the fds as both readable and
734             // writable.
735             if (head->EndPollLocked(true, true)) {
736               // Its safe to add to list of pending events because
737               // EndPollLocked returns true only when the handle is
738               // not orphaned. But an orphan might be initiated on the handle
739               // after this Work() method returns and before the next Work()
740               // method is invoked.
741               pending_events.push_back(head);
742             }
743           } else {
744             // In this case, (1) watch_mask > 0 && r == 0 or (2) watch_mask ==
745             // 0 and r < 0 or (3) watch_mask == 0 and r == 0. For case-1, no
746             // events are pending on the fd even though the fd was polled. For
747             // case-2 and 3, the fd was not polled
748             head->EndPollLocked(false, false);
749           }
750         } else {
751           // It can enter this case if an orphan was invoked on the handle
752           // while it was being polled.
753           head->EndPollLocked(false, false);
754         }
755         lock.Release();
756         // Unref the ref taken at BeginPollLocked.
757         head->Unref();
758       }
759     } else {
760       if (pfds[0].revents & kPollinCheck) {
761         CHECK(wakeup_fd_->ConsumeWakeup().ok());
762       }
763       for (i = 1; i < pfd_count; i++) {
764         PollEventHandle* head = watchers[i];
765         int watch_mask;
766         grpc_core::ReleasableMutexLock lock(head->mu());
767         if (!head->IsWatched(watch_mask) || watch_mask == 0) {
768           // IsWatched will be false if an orphan was invoked on the
769           // handle while it was being polled. If watch_mask is 0, then the fd
770           // was not polled.
771           head->SetWatched(-1);
772           head->EndPollLocked(false, false);
773         } else {
774           // Watched is true and watch_mask > 0
775           if (pfds[i].revents & POLLHUP) {
776             head->SetPollhup(true);
777           }
778           head->SetWatched(-1);
779           if (head->EndPollLocked(pfds[i].revents & kPollinCheck,
780                                   pfds[i].revents & kPolloutCheck)) {
781             // Its safe to add to list of pending events because EndPollLocked
782             // returns true only when the handle is not orphaned.
783             // But an orphan might be initiated on the handle after this
784             // Work() method returns and before the next Work() method is
785             // invoked.
786             pending_events.push_back(head);
787           }
788         }
789         lock.Release();
790         // Unref the ref taken at BeginPollLocked.
791         head->Unref();
792       }
793     }
794 
795     if (pfds != pollfd_space) {
796       gpr_free(pfds);
797     }
798 
799     // End of poll iteration. Update how much time is remaining.
800     timeout_ms -= PollElapsedTimeToMillis(start);
801     mu_.Lock();
802     if (std::exchange(was_kicked_, false) &&
803         std::exchange(was_kicked_ext_, false)) {
804       // External kick. Need to break out.
805       was_kicked_ext = true;
806       break;
807     }
808   }
809   mu_.Unlock();
810   if (pending_events.empty()) {
811     if (was_kicked_ext) {
812       return Poller::WorkResult::kKicked;
813     }
814     return Poller::WorkResult::kDeadlineExceeded;
815   }
816   // Run the provided callback synchronously.
817   schedule_poll_again();
818   // Process all pending events inline.
819   for (auto& it : pending_events) {
820     it->ExecutePendingActions();
821   }
822   return was_kicked_ext ? Poller::WorkResult::kKicked : Poller::WorkResult::kOk;
823 }
824 
Shutdown()825 void PollPoller::Shutdown() { ForkPollerListRemovePoller(this); }
826 
PrepareFork()827 void PollPoller::PrepareFork() { Kick(); }
828 // TODO(vigneshbabu): implement
PostforkParent()829 void PollPoller::PostforkParent() {}
830 // TODO(vigneshbabu): implement
PostforkChild()831 void PollPoller::PostforkChild() {}
832 
Close()833 void PollPoller::Close() {
834   grpc_core::MutexLock lock(&mu_);
835   closed_ = true;
836 }
837 
MakePollPoller(Scheduler * scheduler,bool use_phony_poll)838 std::shared_ptr<PollPoller> MakePollPoller(Scheduler* scheduler,
839                                            bool use_phony_poll) {
840   static bool kPollPollerSupported = InitPollPollerPosix();
841   if (kPollPollerSupported) {
842     return std::make_shared<PollPoller>(scheduler, use_phony_poll);
843   }
844   return nullptr;
845 }
846 
847 }  // namespace experimental
848 }  // namespace grpc_event_engine
849 
850 #else  // GRPC_POSIX_SOCKET_EV_POLL
851 
852 #include "src/core/util/crash.h"
853 
854 namespace grpc_event_engine {
855 namespace experimental {
856 
PollPoller(Scheduler *)857 PollPoller::PollPoller(Scheduler* /* engine */) {
858   grpc_core::Crash("unimplemented");
859 }
860 
Shutdown()861 void PollPoller::Shutdown() { grpc_core::Crash("unimplemented"); }
862 
~PollPoller()863 PollPoller::~PollPoller() { grpc_core::Crash("unimplemented"); }
864 
CreateHandle(int,absl::string_view,bool)865 EventHandle* PollPoller::CreateHandle(int /*fd*/, absl::string_view /*name*/,
866                                       bool /*track_err*/) {
867   grpc_core::Crash("unimplemented");
868 }
869 
Work(EventEngine::Duration,absl::FunctionRef<void ()>)870 Poller::WorkResult PollPoller::Work(
871     EventEngine::Duration /*timeout*/,
872     absl::FunctionRef<void()> /*schedule_poll_again*/) {
873   grpc_core::Crash("unimplemented");
874 }
875 
Kick()876 void PollPoller::Kick() { grpc_core::Crash("unimplemented"); }
877 
878 // If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return
879 // nullptr.
MakePollPoller(Scheduler *,bool)880 std::shared_ptr<PollPoller> MakePollPoller(Scheduler* /*scheduler*/,
881                                            bool /* use_phony_poll */) {
882   return nullptr;
883 }
884 
PrepareFork()885 void PollPoller::PrepareFork() { grpc_core::Crash("unimplemented"); }
PostforkParent()886 void PollPoller::PostforkParent() { grpc_core::Crash("unimplemented"); }
PostforkChild()887 void PollPoller::PostforkChild() { grpc_core::Crash("unimplemented"); }
888 
Close()889 void PollPoller::Close() { grpc_core::Crash("unimplemented"); }
890 
KickExternal(bool)891 void PollPoller::KickExternal(bool /*ext*/) {
892   grpc_core::Crash("unimplemented");
893 }
894 
PollerHandlesListAddHandle(PollEventHandle *)895 void PollPoller::PollerHandlesListAddHandle(PollEventHandle* /*handle*/) {
896   grpc_core::Crash("unimplemented");
897 }
898 
PollerHandlesListRemoveHandle(PollEventHandle *)899 void PollPoller::PollerHandlesListRemoveHandle(PollEventHandle* /*handle*/) {
900   grpc_core::Crash("unimplemented");
901 }
902 
903 }  // namespace experimental
904 }  // namespace grpc_event_engine
905 
906 #endif  // GRPC_POSIX_SOCKET_EV_POLL
907