//
//
// Copyright 2016 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//

#include <grpc/support/port_platform.h>
#include <grpc/support/sync.h>

#include "src/core/lib/iomgr/port.h"

#ifdef GRPC_POSIX_SOCKET_EV_POLL

#include <assert.h>
#include <errno.h>
#include <grpc/support/alloc.h>
#include <limits.h>
#include <poll.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

#include <string>

#include "absl/log/check.h"
#include "absl/log/log.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include "src/core/lib/iomgr/block_annotate.h"
#include "src/core/lib/iomgr/ev_poll_posix.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/telemetry/stats.h"
#include "src/core/telemetry/stats_data.h"
#include "src/core/util/crash.h"
#include "src/core/util/thd.h"
#include "src/core/util/useful.h"

#define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker*)1)

//******************************************************************************
// FD declarations
//
typedef struct grpc_fd_watcher {
  struct grpc_fd_watcher* next;
  struct grpc_fd_watcher* prev;
  grpc_pollset* pollset;
  grpc_pollset_worker* worker;
  grpc_fd* fd;
} grpc_fd_watcher;

typedef struct grpc_cached_wakeup_fd grpc_cached_wakeup_fd;

// Only used when GRPC_ENABLE_FORK_SUPPORT=1
struct grpc_fork_fd_list {
  // Only one of fd or cached_wakeup_fd will be set. The unused field will be
  // set to nullptr.
  grpc_fd* fd;
  grpc_cached_wakeup_fd* cached_wakeup_fd;

  grpc_fork_fd_list* next;
  grpc_fork_fd_list* prev;
};

struct grpc_fd {
  int fd;
  // refst format:
  // bit0:   1=active/0=orphaned
  // bit1-n: refcount
  // meaning that mostly we ref by two to avoid altering the orphaned bit,
  // and just unref by 1 when we're ready to flag the object as orphaned
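  //
  // Worked example (a sketch of the lifecycle implied by the code below):
  // fd_create stores refst == 1 (active, bit0 set). GRPC_FD_REF adds 2
  // -> 3. fd_orphan does REF_BY(fd, 1) -> 4, clearing the active bit while
  // keeping a count, then UNREF_BY(fd, 2) -> 2. The final GRPC_FD_UNREF
  // brings refst to 0, at which point unref_by frees the fd.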
  gpr_atm refst;

  gpr_mu mu;
  int shutdown;
  int closed;
  int released;
  gpr_atm pollhup;
  grpc_error_handle shutdown_error;

  // The watcher list.

  // The following watcher related fields are protected by the fd's mu.

  // An fd_watcher is an ephemeral object created when a pollset wants to
  // begin polling an fd, and destroyed after the poll.

  // It records whether the watcher is polling for reads, writes, both, or
  // neither on this fd.

  // If a watcher is asked to poll for reads or writes, the read_watcher
  // or write_watcher fields are set respectively. A watcher may be asked
  // to poll for both, in which case both fields will be set.

  // read_watcher and write_watcher may be NULL if no watcher has been
  // asked to poll for reads or writes.

  // If an fd_watcher is not asked to poll for reads or writes, it's added
  // to a linked list of inactive watchers, rooted at inactive_watcher_root.
  // If a poller is needed later, one of the inactive watchers may be kicked
  // out of its poll loop to take on that responsibility.
  grpc_fd_watcher inactive_watcher_root;
  grpc_fd_watcher* read_watcher;
  grpc_fd_watcher* write_watcher;

  grpc_closure* read_closure;
  grpc_closure* write_closure;

  grpc_closure* on_done_closure;

  grpc_iomgr_object iomgr_object;

  // Only used when GRPC_ENABLE_FORK_SUPPORT=1
  grpc_fork_fd_list* fork_fd_list;

  bool is_pre_allocated;
};

// True when GRPC_ENABLE_FORK_SUPPORT=1.
static bool track_fds_for_fork = false;

// Only used when GRPC_ENABLE_FORK_SUPPORT=1
static grpc_fork_fd_list* fork_fd_list_head = nullptr;
static gpr_mu fork_fd_list_mu;

// Begin polling on an fd.
// Registers that the given pollset is interested in this fd - so that if read
// or writability interest changes, the pollset can be kicked to pick up that
// new interest.
// Return value is:
//   (fd_needs_read? read_mask : 0) | (fd_needs_write? write_mask : 0)
// i.e. a combination of read_mask and write_mask determined by the fd's current
// interest in said events.
// Polling strategies that do not need to alter their behavior depending on the
// fd's current interest (such as epoll) do not need to call this function.
// MUST NOT be called with a pollset lock taken
static uint32_t fd_begin_poll(grpc_fd* fd, grpc_pollset* pollset,
                              grpc_pollset_worker* worker, uint32_t read_mask,
                              uint32_t write_mask, grpc_fd_watcher* watcher);
// Complete polling previously started with fd_begin_poll
// MUST NOT be called with a pollset lock taken
// if got_read or got_write are 1, also does the become_{readable,writable} as
// appropriate.
static void fd_end_poll(grpc_fd_watcher* watcher, int got_read, int got_write);
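// Illustrative pairing (a simplified sketch of what pollset_work does below,
// not a public API):
//
//   grpc_fd_watcher w;
//   struct pollfd pfd;
//   pfd.fd = fd->fd;
//   pfd.events = static_cast<short>(
//       fd_begin_poll(fd, pollset, worker, POLLIN, POLLOUT, &w));
//   int r = grpc_poll_function(&pfd, 1, timeout_millis);
//   fd_end_poll(&w, pfd.revents & POLLIN, pfd.revents & POLLOUT);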

// Return true if this fd is orphaned, false otherwise
static bool fd_is_orphaned(grpc_fd* fd);

#ifndef NDEBUG
static void fd_ref(grpc_fd* fd, const char* reason, const char* file, int line);
static void fd_unref(grpc_fd* fd, const char* reason, const char* file,
                     int line);
#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
#else
static void fd_ref(grpc_fd* fd);
static void fd_unref(grpc_fd* fd);
#define GRPC_FD_REF(fd, reason) fd_ref(fd)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
#endif

#define CLOSURE_NOT_READY ((grpc_closure*)0)
#define CLOSURE_READY ((grpc_closure*)1)
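// A grpc_closure* slot (read_closure/write_closure) is therefore tri-state:
// CLOSURE_NOT_READY, CLOSURE_READY, or a pointer to the closure waiting for
// the event (see notify_on_locked and set_ready_locked below).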

//******************************************************************************
// pollset declarations
//

typedef struct grpc_cached_wakeup_fd {
  grpc_wakeup_fd fd;
  struct grpc_cached_wakeup_fd* next;

  // Only used when GRPC_ENABLE_FORK_SUPPORT=1
  grpc_fork_fd_list* fork_fd_list;
} grpc_cached_wakeup_fd;

struct grpc_pollset_worker {
  grpc_cached_wakeup_fd* wakeup_fd;
  int reevaluate_polling_on_wakeup;
  int kicked_specifically;
  struct grpc_pollset_worker* next;
  struct grpc_pollset_worker* prev;
};

struct grpc_pollset {
  gpr_mu mu;
  grpc_pollset_worker root_worker;
  int shutting_down;
  int called_shutdown;
  int kicked_without_pollers;
  grpc_closure* shutdown_done;
  int pollset_set_count;
  // all polled fds
  size_t fd_count;
  size_t fd_capacity;
  grpc_fd** fds;
  // Local cache of eventfds for workers
  grpc_cached_wakeup_fd* local_wakeup_cache;
};

// Add an fd to a pollset
static void pollset_add_fd(grpc_pollset* pollset, struct grpc_fd* fd);

static void pollset_set_add_fd(grpc_pollset_set* pollset_set, grpc_fd* fd);

// Convert a deadline to a poll(2) timeout in milliseconds:
// - very small or negative poll times are clamped to zero to do a
//   non-blocking poll (which becomes spin polling)
// - other small values are rounded up to one millisecond
// - polls longer than a millisecond are rounded up to the next millisecond
//   to avoid spinning
// - infinite timeouts are converted to -1
static int poll_deadline_to_millis_timeout(grpc_core::Timestamp deadline);
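// For example, Timestamp::InfFuture() maps to -1 (block indefinitely in
// poll()), and an already-expired deadline maps to 0 (non-blocking poll).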

// Allow kick to wake up the currently polling worker
#define GRPC_POLLSET_CAN_KICK_SELF 1
// Force the wakee to repoll when awoken
#define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP 2
// As per pollset_kick, with an extended set of flags (defined above)
// -- mostly for fd_posix's use.
static grpc_error_handle pollset_kick_ext(grpc_pollset* p,
                                          grpc_pollset_worker* specific_worker,
                                          uint32_t flags);

// Return true if the pollset has active threads in pollset_work (pollset must
// be locked)
static bool pollset_has_workers(grpc_pollset* pollset);

//******************************************************************************
// pollset_set definitions
//

struct grpc_pollset_set {
  gpr_mu mu;

  size_t pollset_count;
  size_t pollset_capacity;
  grpc_pollset** pollsets;

  size_t pollset_set_count;
  size_t pollset_set_capacity;
  struct grpc_pollset_set** pollset_sets;

  size_t fd_count;
  size_t fd_capacity;
  grpc_fd** fds;
};
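// A pollset_set fans out: adding an fd to a set adds it to every pollset it
// contains and, recursively, to every contained pollset_set (see
// pollset_set_add_fd below).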

//******************************************************************************
// functions to track opened fds. No-ops unless track_fds_for_fork is true.
//

static void fork_fd_list_remove_node(grpc_fork_fd_list* node) {
  gpr_mu_lock(&fork_fd_list_mu);
  if (fork_fd_list_head == node) {
    fork_fd_list_head = node->next;
  }
  if (node->prev != nullptr) {
    node->prev->next = node->next;
  }
  if (node->next != nullptr) {
    node->next->prev = node->prev;
  }
  gpr_free(node);
  gpr_mu_unlock(&fork_fd_list_mu);
}

static void fork_fd_list_remove_grpc_fd(grpc_fd* fd) {
  if (track_fds_for_fork) {
    fork_fd_list_remove_node(fd->fork_fd_list);
  }
}

static void fork_fd_list_remove_wakeup_fd(grpc_cached_wakeup_fd* fd) {
  if (track_fds_for_fork) {
    fork_fd_list_remove_node(fd->fork_fd_list);
  }
}

static void fork_fd_list_add_node(grpc_fork_fd_list* node) {
  gpr_mu_lock(&fork_fd_list_mu);
  node->next = fork_fd_list_head;
  node->prev = nullptr;
  if (fork_fd_list_head != nullptr) {
    fork_fd_list_head->prev = node;
  }
  fork_fd_list_head = node;
  gpr_mu_unlock(&fork_fd_list_mu);
}

static void fork_fd_list_add_grpc_fd(grpc_fd* fd) {
  if (track_fds_for_fork) {
    fd->fork_fd_list =
        static_cast<grpc_fork_fd_list*>(gpr_malloc(sizeof(grpc_fork_fd_list)));
    fd->fork_fd_list->fd = fd;
    fd->fork_fd_list->cached_wakeup_fd = nullptr;
    fork_fd_list_add_node(fd->fork_fd_list);
  }
}

static void fork_fd_list_add_wakeup_fd(grpc_cached_wakeup_fd* fd) {
  if (track_fds_for_fork) {
    fd->fork_fd_list =
        static_cast<grpc_fork_fd_list*>(gpr_malloc(sizeof(grpc_fork_fd_list)));
    fd->fork_fd_list->cached_wakeup_fd = fd;
    fd->fork_fd_list->fd = nullptr;
    fork_fd_list_add_node(fd->fork_fd_list);
  }
}

//******************************************************************************
// fd_posix.c
//

#ifndef NDEBUG
#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
static void ref_by(grpc_fd* fd, int n, const char* reason, const char* file,
                   int line) {
  GRPC_TRACE_VLOG(fd_refcount, 2)
      << "FD " << fd->fd << " " << fd << "   ref " << n << " "
      << gpr_atm_no_barrier_load(&fd->refst) << " -> "
      << gpr_atm_no_barrier_load(&fd->refst) + n << " [" << reason << "; "
      << file << ":" << line << "]";
#else
#define REF_BY(fd, n, reason) \
  do {                        \
    ref_by(fd, n);            \
    (void)(reason);           \
  } while (0)
#define UNREF_BY(fd, n, reason) \
  do {                          \
    unref_by(fd, n);            \
    (void)(reason);             \
  } while (0)
static void ref_by(grpc_fd* fd, int n) {
#endif
  CHECK_GT(gpr_atm_no_barrier_fetch_add(&fd->refst, n), 0);
}

#ifndef NDEBUG
static void unref_by(grpc_fd* fd, int n, const char* reason, const char* file,
                     int line) {
  GRPC_TRACE_VLOG(fd_refcount, 2)
      << "FD " << fd->fd << " " << fd << " unref " << n << " "
      << gpr_atm_no_barrier_load(&fd->refst) << " -> "
      << gpr_atm_no_barrier_load(&fd->refst) - n << " [" << reason << "; "
      << file << ":" << line << "]";
#else
static void unref_by(grpc_fd* fd, int n) {
#endif
  gpr_atm old = gpr_atm_full_fetch_add(&fd->refst, -n);
  if (old == n) {
    gpr_mu_destroy(&fd->mu);
    grpc_iomgr_unregister_object(&fd->iomgr_object);
    fork_fd_list_remove_grpc_fd(fd);
    fd->shutdown_error.~Status();
    gpr_free(fd);
  } else {
    CHECK(old > n);
  }
}

static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
  // Avoid unused-parameter warning for debug-only parameter
  (void)track_err;
  DCHECK(track_err == false);
  grpc_fd* r = static_cast<grpc_fd*>(gpr_malloc(sizeof(*r)));
  gpr_mu_init(&r->mu);
  gpr_atm_rel_store(&r->refst, 1);
  r->shutdown = 0;
  new (&r->shutdown_error) absl::Status();
  r->read_closure = CLOSURE_NOT_READY;
  r->write_closure = CLOSURE_NOT_READY;
  r->fd = fd;
  r->inactive_watcher_root.next = r->inactive_watcher_root.prev =
      &r->inactive_watcher_root;
  r->read_watcher = r->write_watcher = nullptr;
  r->on_done_closure = nullptr;
  r->closed = 0;
  r->released = 0;
  r->is_pre_allocated = false;
  gpr_atm_no_barrier_store(&r->pollhup, 0);

  std::string name2 = absl::StrCat(name, " fd=", fd);
  grpc_iomgr_register_object(&r->iomgr_object, name2.c_str());
  fork_fd_list_add_grpc_fd(r);
  return r;
}

static bool fd_is_orphaned(grpc_fd* fd) {
  return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
}

static grpc_error_handle pollset_kick_locked(grpc_fd_watcher* watcher) {
  gpr_mu_lock(&watcher->pollset->mu);
  CHECK(watcher->worker);
  grpc_error_handle err =
      pollset_kick_ext(watcher->pollset, watcher->worker,
                       GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP);
  gpr_mu_unlock(&watcher->pollset->mu);
  return err;
}

static void maybe_wake_one_watcher_locked(grpc_fd* fd) {
  if (fd->inactive_watcher_root.next != &fd->inactive_watcher_root) {
    (void)pollset_kick_locked(fd->inactive_watcher_root.next);
  } else if (fd->read_watcher) {
    (void)pollset_kick_locked(fd->read_watcher);
  } else if (fd->write_watcher) {
    (void)pollset_kick_locked(fd->write_watcher);
  }
}

static void wake_all_watchers_locked(grpc_fd* fd) {
  grpc_fd_watcher* watcher;
  for (watcher = fd->inactive_watcher_root.next;
       watcher != &fd->inactive_watcher_root; watcher = watcher->next) {
    (void)pollset_kick_locked(watcher);
  }
  if (fd->read_watcher) {
    (void)pollset_kick_locked(fd->read_watcher);
  }
  if (fd->write_watcher && fd->write_watcher != fd->read_watcher) {
    (void)pollset_kick_locked(fd->write_watcher);
  }
}

static int has_watchers(grpc_fd* fd) {
  return fd->read_watcher != nullptr || fd->write_watcher != nullptr ||
         fd->inactive_watcher_root.next != &fd->inactive_watcher_root;
}

static void close_fd_locked(grpc_fd* fd) {
  fd->closed = 1;
  if (!fd->released) {
    if (!fd->is_pre_allocated) {
      close(fd->fd);
    }
  }
  grpc_core::ExecCtx::Run(DEBUG_LOCATION, fd->on_done_closure,
                          absl::OkStatus());
}

static int fd_wrapped_fd(grpc_fd* fd) {
  if (fd->released || fd->closed) {
    return -1;
  } else {
    return fd->fd;
  }
}

static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
                      const char* reason) {
  fd->on_done_closure = on_done;
  fd->released = release_fd != nullptr;
  if (release_fd != nullptr) {
    *release_fd = fd->fd;
    fd->released = true;
  }
  gpr_mu_lock(&fd->mu);
  REF_BY(fd, 1, reason);  // remove active status, but keep referenced
  if (!has_watchers(fd)) {
    close_fd_locked(fd);
  } else {
    wake_all_watchers_locked(fd);
  }
  gpr_mu_unlock(&fd->mu);
  UNREF_BY(fd, 2, reason);  // drop the reference
}

// increment refcount by two to avoid changing the orphan bit
#ifndef NDEBUG
static void fd_ref(grpc_fd* fd, const char* reason, const char* file,
                   int line) {
  ref_by(fd, 2, reason, file, line);
}

static void fd_unref(grpc_fd* fd, const char* reason, const char* file,
                     int line) {
  unref_by(fd, 2, reason, file, line);
}
#else
static void fd_ref(grpc_fd* fd) { ref_by(fd, 2); }

static void fd_unref(grpc_fd* fd) { unref_by(fd, 2); }
#endif

static grpc_error_handle fd_shutdown_error(grpc_fd* fd) {
  if (!fd->shutdown) {
    return absl::OkStatus();
  } else {
    return grpc_error_set_int(
        GRPC_ERROR_CREATE_REFERENCING("FD shutdown", &fd->shutdown_error, 1),
        grpc_core::StatusIntProperty::kRpcStatus, GRPC_STATUS_UNAVAILABLE);
  }
}

static void notify_on_locked(grpc_fd* fd, grpc_closure** st,
                             grpc_closure* closure) {
  if (fd->shutdown || gpr_atm_no_barrier_load(&fd->pollhup)) {
    grpc_core::ExecCtx::Run(
        DEBUG_LOCATION, closure,
        grpc_error_set_int(GRPC_ERROR_CREATE("FD shutdown"),
                           grpc_core::StatusIntProperty::kRpcStatus,
                           GRPC_STATUS_UNAVAILABLE));
  } else if (*st == CLOSURE_NOT_READY) {
    // not ready ==> switch to a waiting state by setting the closure
    *st = closure;
  } else if (*st == CLOSURE_READY) {
    // already ready ==> queue the closure to run immediately
    *st = CLOSURE_NOT_READY;
    grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, fd_shutdown_error(fd));
    maybe_wake_one_watcher_locked(fd);
  } else {
    // upcallptr was set to a different closure.  This is an error!
    grpc_core::Crash(
        "User called a notify_on function with a previous callback still "
        "pending");
  }
}

// returns 1 if state becomes not ready
static int set_ready_locked(grpc_fd* fd, grpc_closure** st) {
  if (*st == CLOSURE_READY) {
    // duplicate ready ==> ignore
    return 0;
  } else if (*st == CLOSURE_NOT_READY) {
    // not ready, and not waiting ==> flag ready
    *st = CLOSURE_READY;
    return 0;
  } else {
    // waiting ==> queue closure
    grpc_core::ExecCtx::Run(DEBUG_LOCATION, *st, fd_shutdown_error(fd));
    *st = CLOSURE_NOT_READY;
    return 1;
  }
}
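
// Taken together, notify_on_locked and set_ready_locked implement a small
// per-direction state machine over *st:
//   NOT_READY --notify_on--> (closure pending) --set_ready--> run closure
//   NOT_READY --set_ready--> READY --notify_on--> run closure immediately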

static void fd_shutdown(grpc_fd* fd, grpc_error_handle why) {
  gpr_mu_lock(&fd->mu);
  // only shutdown once
  if (!fd->shutdown) {
    fd->shutdown = 1;
    fd->shutdown_error = why;
    // signal read/write closed to OS so that future operations fail
    if (!fd->is_pre_allocated) {
      shutdown(fd->fd, SHUT_RDWR);
    }
    set_ready_locked(fd, &fd->read_closure);
    set_ready_locked(fd, &fd->write_closure);
  }
  gpr_mu_unlock(&fd->mu);
}

static bool fd_is_shutdown(grpc_fd* fd) {
  gpr_mu_lock(&fd->mu);
  bool r = fd->shutdown;
  gpr_mu_unlock(&fd->mu);
  return r;
}

static void fd_notify_on_read(grpc_fd* fd, grpc_closure* closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(fd, &fd->read_closure, closure);
  gpr_mu_unlock(&fd->mu);
}

static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(fd, &fd->write_closure, closure);
  gpr_mu_unlock(&fd->mu);
}

static void fd_notify_on_error(grpc_fd* /*fd*/, grpc_closure* closure) {
  GRPC_TRACE_LOG(polling, ERROR)
      << "Polling engine does not support tracking errors.";
  grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, absl::CancelledError());
}

static void fd_set_readable(grpc_fd* fd) {
  gpr_mu_lock(&fd->mu);
  set_ready_locked(fd, &fd->read_closure);
  gpr_mu_unlock(&fd->mu);
}

static void fd_set_writable(grpc_fd* fd) {
  gpr_mu_lock(&fd->mu);
  set_ready_locked(fd, &fd->write_closure);
  gpr_mu_unlock(&fd->mu);
}

static void fd_set_error(grpc_fd* /*fd*/) {
  GRPC_TRACE_LOG(polling, ERROR)
      << "Polling engine does not support tracking errors.";
}

static uint32_t fd_begin_poll(grpc_fd* fd, grpc_pollset* pollset,
                              grpc_pollset_worker* worker, uint32_t read_mask,
                              uint32_t write_mask, grpc_fd_watcher* watcher) {
  uint32_t mask = 0;
  grpc_closure* cur;
  int requested;
  // keep track of pollers that have requested our events, in case they change
  GRPC_FD_REF(fd, "poll");

  gpr_mu_lock(&fd->mu);

  // if we are shutdown, then don't add to the watcher set
  if (fd->shutdown) {
    watcher->pollset = nullptr;
    watcher->worker = nullptr;
    gpr_mu_unlock(&fd->mu);
    GRPC_FD_UNREF(fd, "poll");
    return 0;
  }

  // if there is nobody polling for read, but we need to, then start doing so
  cur = fd->read_closure;
  requested = cur != CLOSURE_READY;
  if (read_mask && fd->read_watcher == nullptr && requested) {
    fd->read_watcher = watcher;
    mask |= read_mask;
  }
  // if there is nobody polling for write, but we need to, then start doing so
  cur = fd->write_closure;
  requested = cur != CLOSURE_READY;
  if (write_mask && fd->write_watcher == nullptr && requested) {
    fd->write_watcher = watcher;
    mask |= write_mask;
  }
  // if not polling, remember this watcher in case we need someone to later
  if (mask == 0 && worker != nullptr) {
    watcher->next = &fd->inactive_watcher_root;
    watcher->prev = watcher->next->prev;
    watcher->next->prev = watcher->prev->next = watcher;
  }
  watcher->pollset = pollset;
  watcher->worker = worker;
  watcher->fd = fd;
  gpr_mu_unlock(&fd->mu);

  return mask;
}

static void fd_end_poll(grpc_fd_watcher* watcher, int got_read, int got_write) {
  int was_polling = 0;
  int kick = 0;
  grpc_fd* fd = watcher->fd;
  if (fd == nullptr) {
    return;
  }

  gpr_mu_lock(&fd->mu);
  if (watcher->pollset == nullptr) {
    watcher->fd = nullptr;
    gpr_mu_unlock(&fd->mu);
    GRPC_FD_UNREF(fd, "multipoller_start");
    return;
  }

  if (watcher == fd->read_watcher) {
    // remove read watcher, kick if we still need a read
    was_polling = 1;
    if (!got_read) {
      kick = 1;
    }
    fd->read_watcher = nullptr;
  }
  if (watcher == fd->write_watcher) {
    // remove write watcher, kick if we still need a write
    was_polling = 1;
    if (!got_write) {
      kick = 1;
    }
    fd->write_watcher = nullptr;
  }
  if (!was_polling && watcher->worker != nullptr) {
    // remove from inactive list
    watcher->next->prev = watcher->prev;
    watcher->prev->next = watcher->next;
  }
  if (got_read) {
    if (set_ready_locked(fd, &fd->read_closure)) {
      kick = 1;
    }
  }
  if (got_write) {
    if (set_ready_locked(fd, &fd->write_closure)) {
      kick = 1;
    }
  }
  if (kick) {
    maybe_wake_one_watcher_locked(fd);
  }
  if (fd_is_orphaned(fd) && !has_watchers(fd) && !fd->closed) {
    close_fd_locked(fd);
  }
  gpr_mu_unlock(&fd->mu);

  GRPC_FD_UNREF(fd, "poll");
}

static void fd_set_pre_allocated(grpc_fd* fd) { fd->is_pre_allocated = true; }
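
// Pre-allocated fds are not closed by this engine: close_fd_locked and
// fd_shutdown above skip close()/shutdown() when is_pre_allocated is set.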

//******************************************************************************
// pollset_posix.c
//

static thread_local grpc_pollset* g_current_thread_poller;
static thread_local grpc_pollset_worker* g_current_thread_worker;

static void remove_worker(grpc_pollset* /*p*/, grpc_pollset_worker* worker) {
  worker->prev->next = worker->next;
  worker->next->prev = worker->prev;
}

static bool pollset_has_workers(grpc_pollset* p) {
  return p->root_worker.next != &p->root_worker;
}

static bool pollset_in_pollset_sets(grpc_pollset* p) {
  return p->pollset_set_count;
}

static bool pollset_has_observers(grpc_pollset* p) {
  return pollset_has_workers(p) || pollset_in_pollset_sets(p);
}

static grpc_pollset_worker* pop_front_worker(grpc_pollset* p) {
  if (pollset_has_workers(p)) {
    grpc_pollset_worker* w = p->root_worker.next;
    remove_worker(p, w);
    return w;
  } else {
    return nullptr;
  }
}

static void push_back_worker(grpc_pollset* p, grpc_pollset_worker* worker) {
  worker->next = &p->root_worker;
  worker->prev = worker->next->prev;
  worker->prev->next = worker->next->prev = worker;
}

static void push_front_worker(grpc_pollset* p, grpc_pollset_worker* worker) {
  worker->prev = &p->root_worker;
  worker->next = worker->prev->next;
  worker->prev->next = worker->next->prev = worker;
}

static void kick_append_error(grpc_error_handle* composite,
                              grpc_error_handle error) {
  if (error.ok()) return;
  if (composite->ok()) {
    *composite = GRPC_ERROR_CREATE("Kick Failure");
  }
  *composite = grpc_error_add_child(*composite, error);
}

static grpc_error_handle pollset_kick_ext(grpc_pollset* p,
                                          grpc_pollset_worker* specific_worker,
                                          uint32_t flags) {
  grpc_error_handle error;

  // pollset->mu already held
  if (specific_worker != nullptr) {
    if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
      CHECK_EQ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP), 0u);
      for (specific_worker = p->root_worker.next;
           specific_worker != &p->root_worker;
           specific_worker = specific_worker->next) {
        kick_append_error(
            &error, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
      }
      p->kicked_without_pollers = true;
    } else if (g_current_thread_worker != specific_worker) {
      if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
        specific_worker->reevaluate_polling_on_wakeup = true;
      }
      specific_worker->kicked_specifically = true;
      kick_append_error(&error,
                        grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
    } else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) {
      if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
        specific_worker->reevaluate_polling_on_wakeup = true;
      }
      specific_worker->kicked_specifically = true;
      kick_append_error(&error,
                        grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
    }
  } else if (g_current_thread_poller != p) {
    CHECK_EQ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP), 0u);
    specific_worker = pop_front_worker(p);
    if (specific_worker != nullptr) {
      if (g_current_thread_worker == specific_worker) {
        push_back_worker(p, specific_worker);
        specific_worker = pop_front_worker(p);
        if ((flags & GRPC_POLLSET_CAN_KICK_SELF) == 0 &&
            g_current_thread_worker == specific_worker) {
          push_back_worker(p, specific_worker);
          specific_worker = nullptr;
        }
      }
      if (specific_worker != nullptr) {
        push_back_worker(p, specific_worker);
        kick_append_error(
            &error, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
      }
    } else {
      p->kicked_without_pollers = true;
    }
  }

  GRPC_LOG_IF_ERROR("pollset_kick_ext", error);
  return error;
}
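
// Kick cases, in order: a broadcast kick wakes every worker via its wakeup
// fd; a specific worker other than the caller is woken directly; the calling
// worker itself is only woken when GRPC_POLLSET_CAN_KICK_SELF is set. With
// no specific worker, a caller that is not currently polling this pollset
// wakes the front worker (rotating it to the back), or records
// kicked_without_pollers if there are none.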

static grpc_error_handle pollset_kick(grpc_pollset* p,
                                      grpc_pollset_worker* specific_worker) {
  return pollset_kick_ext(p, specific_worker, 0);
}

// global state management

static grpc_error_handle pollset_global_init(void) { return absl::OkStatus(); }

// main interface

static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
  gpr_mu_init(&pollset->mu);
  *mu = &pollset->mu;
  pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
  pollset->shutting_down = 0;
  pollset->called_shutdown = 0;
  pollset->kicked_without_pollers = 0;
  pollset->local_wakeup_cache = nullptr;
  pollset->fd_count = 0;
  pollset->fd_capacity = 0;
  pollset->fds = nullptr;
  pollset->pollset_set_count = 0;
}

static void pollset_destroy(grpc_pollset* pollset) {
  CHECK(!pollset_has_workers(pollset));
  while (pollset->local_wakeup_cache) {
    grpc_cached_wakeup_fd* next = pollset->local_wakeup_cache->next;
    fork_fd_list_remove_wakeup_fd(pollset->local_wakeup_cache);
    grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd);
    gpr_free(pollset->local_wakeup_cache);
    pollset->local_wakeup_cache = next;
  }
  gpr_free(pollset->fds);
  gpr_mu_destroy(&pollset->mu);
}

static void pollset_add_fd(grpc_pollset* pollset, grpc_fd* fd) {
  gpr_mu_lock(&pollset->mu);
  size_t i;
  // TODO(ctiller): this is O(num_fds^2); maybe switch to a hash set here
  for (i = 0; i < pollset->fd_count; i++) {
    if (pollset->fds[i] == fd) goto exit;
  }
  if (pollset->fd_count == pollset->fd_capacity) {
    pollset->fd_capacity =
        std::max(pollset->fd_capacity + 8, pollset->fd_count * 3 / 2);
    pollset->fds = static_cast<grpc_fd**>(
        gpr_realloc(pollset->fds, sizeof(grpc_fd*) * pollset->fd_capacity));
  }
  pollset->fds[pollset->fd_count++] = fd;
  GRPC_FD_REF(fd, "multipoller");
  (void)pollset_kick(pollset, nullptr);
exit:
  gpr_mu_unlock(&pollset->mu);
}

static void finish_shutdown(grpc_pollset* pollset) {
  size_t i;
  for (i = 0; i < pollset->fd_count; i++) {
    GRPC_FD_UNREF(pollset->fds[i], "multipoller");
  }
  pollset->fd_count = 0;
  grpc_core::ExecCtx::Run(DEBUG_LOCATION, pollset->shutdown_done,
                          absl::OkStatus());
}

static void work_combine_error(grpc_error_handle* composite,
                               grpc_error_handle error) {
  if (error.ok()) return;
  if (composite->ok()) {
    *composite = GRPC_ERROR_CREATE("pollset_work");
  }
  *composite = grpc_error_add_child(*composite, error);
}

static grpc_error_handle pollset_work(grpc_pollset* pollset,
                                      grpc_pollset_worker** worker_hdl,
                                      grpc_core::Timestamp deadline) {
  grpc_pollset_worker worker;
  if (worker_hdl) *worker_hdl = &worker;
  grpc_error_handle error;

  // Avoid malloc for small number of elements.
  enum { inline_elements = 96 };
  struct pollfd pollfd_space[inline_elements];
  struct grpc_fd_watcher watcher_space[inline_elements];
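
  // Index 0 of the pollfd array is reserved for this worker's wakeup fd, so
  // at most fd_count + 1 entries are written; the buffers below are sized
  // with fd_count + 2, which appears to be a conservative bound.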

  // pollset->mu already held
  int added_worker = 0;
  int locked = 1;
  int queued_work = 0;
  int keep_polling = 0;
  // this must happen before we (potentially) drop pollset->mu
  worker.next = worker.prev = nullptr;
  worker.reevaluate_polling_on_wakeup = 0;
  if (pollset->local_wakeup_cache != nullptr) {
    worker.wakeup_fd = pollset->local_wakeup_cache;
    pollset->local_wakeup_cache = worker.wakeup_fd->next;
  } else {
    worker.wakeup_fd = static_cast<grpc_cached_wakeup_fd*>(
        gpr_malloc(sizeof(*worker.wakeup_fd)));
    error = grpc_wakeup_fd_init(&worker.wakeup_fd->fd);
    fork_fd_list_add_wakeup_fd(worker.wakeup_fd);
    if (!error.ok()) {
      GRPC_LOG_IF_ERROR("pollset_work", error);
      return error;
    }
  }
  worker.kicked_specifically = 0;
  // If we're shutting down then we don't execute any extended work
  if (pollset->shutting_down) {
    goto done;
  }
  // Start polling, and keep doing so while we're being asked to
  // re-evaluate our pollers (this allows poll() based pollers to
  // ensure they don't miss wakeups)
  keep_polling = 1;
  g_current_thread_poller = pollset;
  while (keep_polling) {
    keep_polling = 0;
    if (!pollset->kicked_without_pollers ||
        deadline <= grpc_core::Timestamp::Now()) {
      if (!added_worker) {
        push_front_worker(pollset, &worker);
        added_worker = 1;
        g_current_thread_worker = &worker;
      }
#define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR)
#define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR)
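
      // POLLHUP and POLLERR are folded into both checks because poll() may
      // report them even when they were not requested in .events; treating
      // them as readable/writable lets the pending closures run and observe
      // the failure.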

      int timeout;
      int r;
      size_t i, fd_count;
      nfds_t pfd_count;
      grpc_fd_watcher* watchers;
      struct pollfd* pfds;

      timeout = poll_deadline_to_millis_timeout(deadline);

      if (pollset->fd_count + 2 <= inline_elements) {
        pfds = pollfd_space;
        watchers = watcher_space;
      } else {
        // Allocate one buffer to hold both pfds and watchers arrays
        const size_t pfd_size = sizeof(*pfds) * (pollset->fd_count + 2);
        const size_t watch_size = sizeof(*watchers) * (pollset->fd_count + 2);
        void* buf = gpr_malloc(pfd_size + watch_size);
        pfds = static_cast<struct pollfd*>(buf);
        watchers = static_cast<grpc_fd_watcher*>(
            static_cast<void*>((static_cast<char*>(buf) + pfd_size)));
      }

      fd_count = 0;
      pfd_count = 1;
      pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker.wakeup_fd->fd);
      pfds[0].events = POLLIN;
      pfds[0].revents = 0;
      for (i = 0; i < pollset->fd_count; i++) {
        if (fd_is_orphaned(pollset->fds[i]) ||
            gpr_atm_no_barrier_load(&pollset->fds[i]->pollhup) == 1) {
          GRPC_FD_UNREF(pollset->fds[i], "multipoller");
        } else {
          pollset->fds[fd_count++] = pollset->fds[i];
          watchers[pfd_count].fd = pollset->fds[i];
          GRPC_FD_REF(watchers[pfd_count].fd, "multipoller_start");
          pfds[pfd_count].fd = pollset->fds[i]->fd;
          pfds[pfd_count].revents = 0;
          pfd_count++;
        }
      }
      pollset->fd_count = fd_count;
      gpr_mu_unlock(&pollset->mu);

      for (i = 1; i < pfd_count; i++) {
        grpc_fd* fd = watchers[i].fd;
        pfds[i].events = static_cast<short>(
            fd_begin_poll(fd, pollset, &worker, POLLIN, POLLOUT, &watchers[i]));
        if (watchers[i].pollset != nullptr) {
          GRPC_FD_UNREF(fd, "multipoller_start");
        }
      }

      // TODO(vpai): Consider first doing a 0 timeout poll here to avoid
      // even going into the blocking annotation if possible
      GRPC_SCHEDULING_START_BLOCKING_REGION;
      r = grpc_poll_function(pfds, pfd_count, timeout);
      GRPC_SCHEDULING_END_BLOCKING_REGION;

      GRPC_TRACE_LOG(polling, INFO) << pollset << " poll=" << r;

      if (r < 0) {
        if (errno != EINTR) {
          work_combine_error(&error, GRPC_OS_ERROR(errno, "poll"));
        }

        for (i = 1; i < pfd_count; i++) {
          if (watchers[i].pollset == nullptr) {
            fd_end_poll(&watchers[i], 0, 0);
          } else {
            // Wake up all the file descriptors, if we have an invalid one
            // we can identify it on the next pollset_work()
            fd_end_poll(&watchers[i], 1, 1);
          }
        }
      } else if (r == 0) {
        for (i = 1; i < pfd_count; i++) {
          fd_end_poll(&watchers[i], 0, 0);
        }
      } else {
        if (pfds[0].revents & POLLIN_CHECK) {
          GRPC_TRACE_LOG(polling, INFO) << pollset << ": got_wakeup";
          work_combine_error(
              &error, grpc_wakeup_fd_consume_wakeup(&worker.wakeup_fd->fd));
        }
        for (i = 1; i < pfd_count; i++) {
          if (watchers[i].pollset == nullptr) {
            grpc_fd* fd = watchers[i].fd;
            if (pfds[i].revents & POLLHUP) {
              gpr_atm_no_barrier_store(&fd->pollhup, 1);
            }
            fd_end_poll(&watchers[i], 0, 0);
          } else {
            GRPC_TRACE_LOG(polling, INFO)
                << pollset << " got_event: " << pfds[i].fd
                << " r:" << ((pfds[i].revents & POLLIN_CHECK) != 0)
                << " w:" << ((pfds[i].revents & POLLOUT_CHECK) != 0) << " ["
                << pfds[i].revents << "]";
            // This is a mitigation to prevent poll() from spinning on a
            // POLLHUP: https://github.com/grpc/grpc/pull/13665
            if (pfds[i].revents & POLLHUP) {
              gpr_atm_no_barrier_store(&watchers[i].fd->pollhup, 1);
            }
            fd_end_poll(&watchers[i], pfds[i].revents & POLLIN_CHECK,
                        pfds[i].revents & POLLOUT_CHECK);
          }
        }
      }

      if (pfds != pollfd_space) {
        // pfds and watchers are in the same memory block pointed to by pfds
        gpr_free(pfds);
      }

      locked = 0;
    } else {
      pollset->kicked_without_pollers = 0;
    }
  // Finished execution - start cleaning up.
  // Note that we may arrive here from outside the enclosing while() loop.
  // In that case we won't loop, though, as we haven't added worker to the
  // worker list, which means nobody could ask us to re-evaluate polling.
  done:
    if (!locked) {
      queued_work |= grpc_core::ExecCtx::Get()->Flush();
      gpr_mu_lock(&pollset->mu);
      locked = 1;
    }
    // If we're forced to re-evaluate polling (via pollset_kick with
    // GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force
    // a loop
    if (worker.reevaluate_polling_on_wakeup && error.ok()) {
      worker.reevaluate_polling_on_wakeup = 0;
      pollset->kicked_without_pollers = 0;
      if (queued_work || worker.kicked_specifically) {
        // If there's queued work on the list, then set the deadline to be
        // immediate so we get back out of the polling loop quickly
        deadline = grpc_core::Timestamp();
      }
      keep_polling = 1;
    }
  }
  g_current_thread_poller = nullptr;
  if (added_worker) {
    remove_worker(pollset, &worker);
    g_current_thread_worker = nullptr;
  }
  // release wakeup fd to the local pool
  worker.wakeup_fd->next = pollset->local_wakeup_cache;
  pollset->local_wakeup_cache = worker.wakeup_fd;
  // check shutdown conditions
  if (pollset->shutting_down) {
    if (pollset_has_workers(pollset)) {
      (void)pollset_kick(pollset, nullptr);
    } else if (!pollset->called_shutdown && !pollset_has_observers(pollset)) {
      pollset->called_shutdown = 1;
      gpr_mu_unlock(&pollset->mu);
      finish_shutdown(pollset);
      grpc_core::ExecCtx::Get()->Flush();
      // Continuing to access pollset here is safe -- it is the caller's
      // responsibility to not destroy when it has outstanding calls to
      // pollset_work.
      // TODO(dklempner): Can we refactor the shutdown logic to avoid this?
      gpr_mu_lock(&pollset->mu);
    }
  }
  if (worker_hdl) *worker_hdl = nullptr;
  GRPC_LOG_IF_ERROR("pollset_work", error);
  return error;
}

static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
  CHECK(!pollset->shutting_down);
  pollset->shutting_down = 1;
  pollset->shutdown_done = closure;
  (void)pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
  if (!pollset->called_shutdown && !pollset_has_observers(pollset)) {
    pollset->called_shutdown = 1;
    finish_shutdown(pollset);
  }
}

static int poll_deadline_to_millis_timeout(grpc_core::Timestamp deadline) {
  if (deadline == grpc_core::Timestamp::InfFuture()) return -1;
  if (deadline.is_process_epoch()) return 0;
  int64_t n = (deadline - grpc_core::Timestamp::Now()).millis();
  if (n < 0) return 0;
  if (n > INT_MAX) return -1;
  return static_cast<int>(n);
}

//******************************************************************************
// pollset_set_posix.c
//

static grpc_pollset_set* pollset_set_create(void) {
  grpc_pollset_set* pollset_set =
      static_cast<grpc_pollset_set*>(gpr_zalloc(sizeof(*pollset_set)));
  gpr_mu_init(&pollset_set->mu);
  return pollset_set;
}

static void pollset_set_destroy(grpc_pollset_set* pollset_set) {
  size_t i;
  gpr_mu_destroy(&pollset_set->mu);
  for (i = 0; i < pollset_set->fd_count; i++) {
    GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
  }
  for (i = 0; i < pollset_set->pollset_count; i++) {
    grpc_pollset* pollset = pollset_set->pollsets[i];
    gpr_mu_lock(&pollset->mu);
    pollset->pollset_set_count--;
    // check shutdown
    if (pollset->shutting_down && !pollset->called_shutdown &&
        !pollset_has_observers(pollset)) {
      pollset->called_shutdown = 1;
      gpr_mu_unlock(&pollset->mu);
      finish_shutdown(pollset);
    } else {
      gpr_mu_unlock(&pollset->mu);
    }
  }
  gpr_free(pollset_set->pollsets);
  gpr_free(pollset_set->pollset_sets);
  gpr_free(pollset_set->fds);
  gpr_free(pollset_set);
}

static void pollset_set_add_pollset(grpc_pollset_set* pollset_set,
                                    grpc_pollset* pollset) {
  size_t i, j;
  gpr_mu_lock(&pollset->mu);
  pollset->pollset_set_count++;
  gpr_mu_unlock(&pollset->mu);
  gpr_mu_lock(&pollset_set->mu);
  if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
    pollset_set->pollset_capacity =
        std::max(size_t{8}, 2 * pollset_set->pollset_capacity);
    pollset_set->pollsets = static_cast<grpc_pollset**>(gpr_realloc(
        pollset_set->pollsets,
        pollset_set->pollset_capacity * sizeof(*pollset_set->pollsets)));
  }
  pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
  for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
    if (fd_is_orphaned(pollset_set->fds[i])) {
      GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
    } else {
      pollset_add_fd(pollset, pollset_set->fds[i]);
      pollset_set->fds[j++] = pollset_set->fds[i];
    }
  }
  pollset_set->fd_count = j;
  gpr_mu_unlock(&pollset_set->mu);
}

static void pollset_set_del_pollset(grpc_pollset_set* pollset_set,
                                    grpc_pollset* pollset) {
  size_t i;
  gpr_mu_lock(&pollset_set->mu);
  for (i = 0; i < pollset_set->pollset_count; i++) {
    if (pollset_set->pollsets[i] == pollset) {
      pollset_set->pollset_count--;
      std::swap(pollset_set->pollsets[i],
                pollset_set->pollsets[pollset_set->pollset_count]);
      break;
    }
  }
  gpr_mu_unlock(&pollset_set->mu);
  gpr_mu_lock(&pollset->mu);
  pollset->pollset_set_count--;
  // check shutdown
  if (pollset->shutting_down && !pollset->called_shutdown &&
      !pollset_has_observers(pollset)) {
    pollset->called_shutdown = 1;
    gpr_mu_unlock(&pollset->mu);
    finish_shutdown(pollset);
  } else {
    gpr_mu_unlock(&pollset->mu);
  }
}

static void pollset_set_add_pollset_set(grpc_pollset_set* bag,
                                        grpc_pollset_set* item) {
  size_t i, j;
  gpr_mu_lock(&bag->mu);
  if (bag->pollset_set_count == bag->pollset_set_capacity) {
    bag->pollset_set_capacity =
        std::max(size_t{8}, 2 * bag->pollset_set_capacity);
    bag->pollset_sets = static_cast<grpc_pollset_set**>(
        gpr_realloc(bag->pollset_sets,
                    bag->pollset_set_capacity * sizeof(*bag->pollset_sets)));
  }
  bag->pollset_sets[bag->pollset_set_count++] = item;
  for (i = 0, j = 0; i < bag->fd_count; i++) {
    if (fd_is_orphaned(bag->fds[i])) {
      GRPC_FD_UNREF(bag->fds[i], "pollset_set");
    } else {
      pollset_set_add_fd(item, bag->fds[i]);
      bag->fds[j++] = bag->fds[i];
    }
  }
  bag->fd_count = j;
  gpr_mu_unlock(&bag->mu);
}

static void pollset_set_del_pollset_set(grpc_pollset_set* bag,
                                        grpc_pollset_set* item) {
  size_t i;
  gpr_mu_lock(&bag->mu);
  for (i = 0; i < bag->pollset_set_count; i++) {
    if (bag->pollset_sets[i] == item) {
      bag->pollset_set_count--;
      std::swap(bag->pollset_sets[i],
                bag->pollset_sets[bag->pollset_set_count]);
      break;
    }
  }
  gpr_mu_unlock(&bag->mu);
}

static void pollset_set_add_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) {
  size_t i;
  gpr_mu_lock(&pollset_set->mu);
  if (pollset_set->fd_count == pollset_set->fd_capacity) {
    pollset_set->fd_capacity =
        std::max(size_t{8}, 2 * pollset_set->fd_capacity);
    pollset_set->fds = static_cast<grpc_fd**>(
        gpr_realloc(pollset_set->fds,
                    pollset_set->fd_capacity * sizeof(*pollset_set->fds)));
  }
  GRPC_FD_REF(fd, "pollset_set");
  pollset_set->fds[pollset_set->fd_count++] = fd;
  for (i = 0; i < pollset_set->pollset_count; i++) {
    pollset_add_fd(pollset_set->pollsets[i], fd);
  }
  for (i = 0; i < pollset_set->pollset_set_count; i++) {
    pollset_set_add_fd(pollset_set->pollset_sets[i], fd);
  }
  gpr_mu_unlock(&pollset_set->mu);
}

static void pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) {
  size_t i;
  gpr_mu_lock(&pollset_set->mu);
  for (i = 0; i < pollset_set->fd_count; i++) {
    if (pollset_set->fds[i] == fd) {
      pollset_set->fd_count--;
      std::swap(pollset_set->fds[i], pollset_set->fds[pollset_set->fd_count]);
      GRPC_FD_UNREF(fd, "pollset_set");
      break;
    }
  }
  for (i = 0; i < pollset_set->pollset_set_count; i++) {
    pollset_set_del_fd(pollset_set->pollset_sets[i], fd);
  }
  gpr_mu_unlock(&pollset_set->mu);
}

//******************************************************************************
// event engine binding
//

static bool is_any_background_poller_thread(void) { return false; }

static void shutdown_background_closure(void) {}

static bool add_closure_to_background_poller(grpc_closure* /*closure*/,
                                             grpc_error_handle /*error*/) {
  return false;
}

// Called by the child process's post-fork handler to close open fds, including
// worker wakeup fds. This allows gRPC to shutdown in the child process without
// interfering with connections or RPCs ongoing in the parent.
static void reset_event_manager_on_fork() {
  gpr_mu_lock(&fork_fd_list_mu);
  while (fork_fd_list_head != nullptr) {
    if (fork_fd_list_head->fd != nullptr) {
      if (!fork_fd_list_head->fd->closed) {
        close(fork_fd_list_head->fd->fd);
      }
      fork_fd_list_head->fd->fd = -1;
    } else {
      close(fork_fd_list_head->cached_wakeup_fd->fd.read_fd);
      fork_fd_list_head->cached_wakeup_fd->fd.read_fd = -1;
      close(fork_fd_list_head->cached_wakeup_fd->fd.write_fd);
      fork_fd_list_head->cached_wakeup_fd->fd.write_fd = -1;
    }
    fork_fd_list_head = fork_fd_list_head->next;
  }
  gpr_mu_unlock(&fork_fd_list_mu);
}

const grpc_event_engine_vtable grpc_ev_poll_posix = {
    sizeof(grpc_pollset),
    false,
    false,

    fd_create,
    fd_wrapped_fd,
    fd_orphan,
    fd_shutdown,
    fd_notify_on_read,
    fd_notify_on_write,
    fd_notify_on_error,
    fd_set_readable,
    fd_set_writable,
    fd_set_error,
    fd_is_shutdown,

    pollset_init,
    pollset_shutdown,
    pollset_destroy,
    pollset_work,
    pollset_kick,
    pollset_add_fd,

    pollset_set_create,
    pollset_set_destroy,
    pollset_set_add_pollset,
    pollset_set_del_pollset,
    pollset_set_add_pollset_set,
    pollset_set_del_pollset_set,
    pollset_set_add_fd,
    pollset_set_del_fd,

    is_any_background_poller_thread,
    /* name = */ "poll",
    // check_engine_available =
    [](bool) {
      if (!grpc_has_wakeup_fd()) {
        LOG(ERROR) << "Skipping poll because of no wakeup fd.";
        return false;
      }
      if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
        return false;
      }
      if (grpc_core::Fork::Enabled()) {
        if (grpc_core::Fork::RegisterResetChildPollingEngineFunc(
                reset_event_manager_on_fork)) {
          track_fds_for_fork = true;
          gpr_mu_init(&fork_fd_list_mu);
        }
      }
      return true;
    },
    /* init_engine = */ []() {},
    /* shutdown_engine = */ shutdown_background_closure,
    []() {},
    add_closure_to_background_poller,

    fd_set_pre_allocated,
};

namespace {

grpc_poll_function_type real_poll_function;

int phony_poll(struct pollfd fds[], nfds_t nfds, int timeout) {
  if (timeout == 0) {
    return real_poll_function(fds, nfds, 0);
  } else {
    grpc_core::Crash("Attempted a blocking poll when declared non-polling.");
    return -1;
  }
}

}  // namespace

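// The "none" engine reuses the poll engine's vtable but swaps
// grpc_poll_function for phony_poll, so any attempt to do a blocking poll
// crashes; only zero-timeout (non-blocking) polls pass through to the real
// poll().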
const grpc_event_engine_vtable grpc_ev_none_posix = []() {
  grpc_event_engine_vtable v = grpc_ev_poll_posix;
  v.check_engine_available = [](bool explicit_request) {
    if (!explicit_request) return false;
    // return the simplest engine as a phony but also override the poller
    if (!grpc_ev_poll_posix.check_engine_available(explicit_request)) {
      return false;
    }
    real_poll_function = grpc_poll_function;
    grpc_poll_function = phony_poll;
    return true;
  };
  v.name = "none";
  v.init_engine = []() {};
  v.shutdown_engine = []() {};
  return v;
}();

#endif  // GRPC_POSIX_SOCKET_EV_POLL