1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/iomgr/port.h"
22 
23 #ifdef GRPC_POSIX_SOCKET_EV_POLL
24 
25 #include "src/core/lib/iomgr/ev_poll_posix.h"
26 
27 #include <assert.h>
28 #include <errno.h>
29 #include <limits.h>
30 #include <poll.h>
31 #include <string.h>
32 #include <sys/socket.h>
33 #include <unistd.h>
34 
35 #include <grpc/support/alloc.h>
36 #include <grpc/support/log.h>
37 #include <grpc/support/string_util.h>
38 
39 #include "src/core/lib/debug/stats.h"
40 #include "src/core/lib/gpr/murmur_hash.h"
41 #include "src/core/lib/gpr/tls.h"
42 #include "src/core/lib/gpr/useful.h"
43 #include "src/core/lib/gprpp/thd.h"
44 #include "src/core/lib/iomgr/block_annotate.h"
45 #include "src/core/lib/iomgr/iomgr_internal.h"
46 #include "src/core/lib/iomgr/wakeup_fd_cv.h"
47 #include "src/core/lib/iomgr/wakeup_fd_posix.h"
48 #include "src/core/lib/profiling/timers.h"
49 
50 #define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker*)1)
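/* Note: GRPC_POLLSET_KICK_BROADCAST is a sentinel value, not a real worker
   pointer. Passing it as the specific_worker argument to pollset_kick wakes
   every worker currently blocked in pollset_work for that pollset (see the
   broadcast branch in pollset_kick_ext below). */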
51 
52 /*******************************************************************************
53  * FD declarations
54  */
55 typedef struct grpc_fd_watcher {
56   struct grpc_fd_watcher* next;
57   struct grpc_fd_watcher* prev;
58   grpc_pollset* pollset;
59   grpc_pollset_worker* worker;
60   grpc_fd* fd;
61 } grpc_fd_watcher;
62 
63 typedef struct grpc_cached_wakeup_fd grpc_cached_wakeup_fd;
64 
65 /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
66 struct grpc_fork_fd_list {
67   /* Only one of fd or cached_wakeup_fd will be set. The unused field will be
68   set to nullptr. */
69   grpc_fd* fd;
70   grpc_cached_wakeup_fd* cached_wakeup_fd;
71 
72   grpc_fork_fd_list* next;
73   grpc_fork_fd_list* prev;
74 };
75 
76 struct grpc_fd {
77   int fd;
78   /* refst format:
79      bit0:   1=active/0=orphaned
80      bit1-n: refcount
81      meaning that mostly we ref by two to avoid altering the orphaned bit,
82      and just unref by 1 when we're ready to flag the object as orphaned */
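  /* Illustrative walkthrough (based on the functions below): fd_create()
     stores refst == 1 (active, no extra refs); GRPC_FD_REF/GRPC_FD_UNREF move
     the count in steps of 2 so the active bit is untouched; fd_orphan() adds 1
     to clear the active bit and then drops its own reference, and unref_by()
     frees the fd once the count reaches zero. */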
83   gpr_atm refst;
84 
85   gpr_mu mu;
86   int shutdown;
87   int closed;
88   int released;
89   gpr_atm pollhup;
90   grpc_error* shutdown_error;
91 
92   /* The watcher list.
93 
94      The following watcher related fields are protected by watcher_mu.
95 
96      An fd_watcher is an ephemeral object created when a poller wants to
97      begin polling this fd, and destroyed after the poll.
98 
99      It records whether this watcher is polling the fd for reads, for
100      writes, for both, or for neither.
101 
102      If a watcher is asked to poll for reads or writes, the read_watcher
103      or write_watcher fields are set respectively. A watcher may be asked
104      to poll for both, in which case both fields will be set.
105 
106      read_watcher and write_watcher may be NULL if no watcher has been
107      asked to poll for reads or writes.
108 
109      If an fd_watcher is not asked to poll for reads or writes, it's added
110      to a linked list of inactive watchers, rooted at inactive_watcher_root.
111      If at a later time a poller is needed, one of the inactive watchers
112      may be kicked out of its poll loop to take on that responsibility
113      for this fd. */
114   grpc_fd_watcher inactive_watcher_root;
115   grpc_fd_watcher* read_watcher;
116   grpc_fd_watcher* write_watcher;
117 
118   grpc_closure* read_closure;
119   grpc_closure* write_closure;
120 
121   grpc_closure* on_done_closure;
122 
123   grpc_iomgr_object iomgr_object;
124 
125   /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
126   grpc_fork_fd_list* fork_fd_list;
127 };
128 
129 /* True when GRPC_ENABLE_FORK_SUPPORT=1. We do not support fork with poll-cv */
130 static bool track_fds_for_fork = false;
131 
132 /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
133 static grpc_fork_fd_list* fork_fd_list_head = nullptr;
134 static gpr_mu fork_fd_list_mu;
135 
136 /* Begin polling on an fd.
137    Registers that the given pollset is interested in this fd - so that if
138    readability or writability interest changes, the pollset can be kicked
139    to pick up that new interest.
140    Return value is:
141      (fd_needs_read? read_mask : 0) | (fd_needs_write? write_mask : 0)
142    i.e. a combination of read_mask and write_mask determined by the fd's current
143    interest in said events.
144    Polling strategies that do not need to alter their behavior depending on the
145    fd's current interest (such as epoll) do not need to call this function.
146    MUST NOT be called with a pollset lock taken */
147 static uint32_t fd_begin_poll(grpc_fd* fd, grpc_pollset* pollset,
148                               grpc_pollset_worker* worker, uint32_t read_mask,
149                               uint32_t write_mask, grpc_fd_watcher* rec);
150 /* Complete polling previously started with fd_begin_poll
151    MUST NOT be called with a pollset lock taken
152    if got_read or got_write are 1, also does the become_{readable,writable} as
153    appropriate. */
154 static void fd_end_poll(grpc_fd_watcher* rec, int got_read, int got_write);
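/* For example, pollset_work() below calls
     fd_begin_poll(fd, pollset, &worker, POLLIN, POLLOUT, &watchers[i])
   and stores the returned mask directly into pfds[i].events, then reports the
   poll outcome with fd_end_poll(&watchers[i], pfds[i].revents & POLLIN_CHECK,
   pfds[i].revents & POLLOUT_CHECK). */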
155 
156 /* Return 1 if this fd is orphaned, 0 otherwise */
157 static bool fd_is_orphaned(grpc_fd* fd);
158 
159 #ifndef NDEBUG
160 static void fd_ref(grpc_fd* fd, const char* reason, const char* file, int line);
161 static void fd_unref(grpc_fd* fd, const char* reason, const char* file,
162                      int line);
163 #define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
164 #define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
165 #else
166 static void fd_ref(grpc_fd* fd);
167 static void fd_unref(grpc_fd* fd);
168 #define GRPC_FD_REF(fd, reason) fd_ref(fd)
169 #define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
170 #endif
171 
172 #define CLOSURE_NOT_READY ((grpc_closure*)0)
173 #define CLOSURE_READY ((grpc_closure*)1)
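/* read_closure/write_closure form a small state machine: each holds either
   CLOSURE_NOT_READY, CLOSURE_READY, or a pointer to a pending user closure.
   notify_on_locked() and set_ready_locked() below drive the transitions; when
   a readiness event meets a pending closure, the closure is scheduled and the
   state returns to CLOSURE_NOT_READY. */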
174 
175 /*******************************************************************************
176  * pollset declarations
177  */
178 
179 typedef struct grpc_cached_wakeup_fd {
180   grpc_wakeup_fd fd;
181   struct grpc_cached_wakeup_fd* next;
182 
183   /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
184   grpc_fork_fd_list* fork_fd_list;
185 } grpc_cached_wakeup_fd;
186 
187 struct grpc_pollset_worker {
188   grpc_cached_wakeup_fd* wakeup_fd;
189   int reevaluate_polling_on_wakeup;
190   int kicked_specifically;
191   struct grpc_pollset_worker* next;
192   struct grpc_pollset_worker* prev;
193 };
194 
195 struct grpc_pollset {
196   gpr_mu mu;
197   grpc_pollset_worker root_worker;
198   int shutting_down;
199   int called_shutdown;
200   int kicked_without_pollers;
201   grpc_closure* shutdown_done;
202   int pollset_set_count;
203   /* all polled fds */
204   size_t fd_count;
205   size_t fd_capacity;
206   grpc_fd** fds;
207   /* Local cache of eventfds for workers */
208   grpc_cached_wakeup_fd* local_wakeup_cache;
209 };
210 
211 /* Add an fd to a pollset */
212 static void pollset_add_fd(grpc_pollset* pollset, struct grpc_fd* fd);
213 
214 static void pollset_set_add_fd(grpc_pollset_set* pollset_set, grpc_fd* fd);
215 
216 /* Convert a timespec to milliseconds:
217    - very small or negative poll times are clamped to zero to do a
218      non-blocking poll (which becomes spin polling)
219    - other small values are rounded up to one millisecond
220    - polls longer than a millisecond are rounded up to the next whole
221      millisecond to avoid spinning
222    - infinite timeouts are converted to -1 */
223 static int poll_deadline_to_millis_timeout(grpc_millis deadline);
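/* For example (see the implementation below): GRPC_MILLIS_INF_FUTURE maps to
   -1 (block indefinitely), a deadline at or before now maps to 0 (a
   non-blocking poll), and anything else becomes the remaining time in
   milliseconds (deadlines more than INT_MAX ms away are treated as infinite). */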
224 
225 /* Allow kick to wakeup the currently polling worker */
226 #define GRPC_POLLSET_CAN_KICK_SELF 1
227 /* Force the wakee to repoll when awoken */
228 #define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP 2
229 /* As per pollset_kick, with an extended set of flags (defined above)
230    -- mostly for fd_posix's use. */
231 static grpc_error* pollset_kick_ext(grpc_pollset* p,
232                                     grpc_pollset_worker* specific_worker,
233                                     uint32_t flags) GRPC_MUST_USE_RESULT;
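/* Example usage: pollset_kick_locked() in the fd code below passes
   GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP, so a worker woken because an fd's
   interest changed re-runs its poll loop (picking up the new read/write
   interest) instead of returning from pollset_work. */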
234 
235 /* Return 1 if the pollset has active threads in pollset_work (pollset must
236  * be locked) */
237 static bool pollset_has_workers(grpc_pollset* pollset);
238 
239 /*******************************************************************************
240  * pollset_set definitions
241  */
242 
243 struct grpc_pollset_set {
244   gpr_mu mu;
245 
246   size_t pollset_count;
247   size_t pollset_capacity;
248   grpc_pollset** pollsets;
249 
250   size_t pollset_set_count;
251   size_t pollset_set_capacity;
252   struct grpc_pollset_set** pollset_sets;
253 
254   size_t fd_count;
255   size_t fd_capacity;
256   grpc_fd** fds;
257 };
258 
259 /*******************************************************************************
260  * condition variable polling definitions
261  */
262 
263 #define POLLCV_THREAD_GRACE_MS 1000
264 #define CV_POLL_PERIOD_MS 1000
265 #define CV_DEFAULT_TABLE_SIZE 16
266 
267 typedef struct poll_result {
268   gpr_refcount refcount;
269   grpc_cv_node* watchers;
270   int watchcount;
271   struct pollfd* fds;
272   nfds_t nfds;
273   int retval;
274   int err;
275   int completed;
276 } poll_result;
277 
278 typedef struct poll_args {
279   grpc_core::Thread poller_thd;
280   gpr_cv trigger;
281   int trigger_set;
282   bool harvestable;
283   gpr_cv harvest;
284   bool joinable;
285   gpr_cv join;
286   struct pollfd* fds;
287   nfds_t nfds;
288   poll_result* result;
289   struct poll_args* next;
290   struct poll_args* prev;
291 } poll_args;
292 
293 // This is a 2-tiered cache: we maintain a hash table
294 // of active poll calls so we can wait on the result
295 // of a matching call.  We also maintain freelists of inactive
296 // poll args and of dead poller threads.
297 typedef struct poll_hash_table {
298   poll_args* free_pollers;
299   poll_args** active_pollers;
300   poll_args* dead_pollers;
301   unsigned int size;
302   unsigned int count;
303 } poll_hash_table;
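// Sketch of a lookup (see get_poller_locked below): the pollfd array is hashed
// with murmur, the chain at active_pollers[hash % size] is scanned for an
// identical fd set, and on a miss either an entry from free_pollers is reused
// or a new poll_args with a dedicated poller thread is created.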
304 
305 // TODO(kpayson64): Eliminate use of global non-POD variables
306 poll_hash_table poll_cache;
307 grpc_cv_fd_table g_cvfds;
308 
309 /*******************************************************************************
310  * functions to track opened fds. No-ops unless track_fds_for_fork is true.
311  */
312 
313 static void fork_fd_list_remove_node(grpc_fork_fd_list* node) {
314   if (track_fds_for_fork) {
315     gpr_mu_lock(&fork_fd_list_mu);
316     if (fork_fd_list_head == node) {
317       fork_fd_list_head = node->next;
318     }
319     if (node->prev != nullptr) {
320       node->prev->next = node->next;
321     }
322     if (node->next != nullptr) {
323       node->next->prev = node->prev;
324     }
325     gpr_free(node);
326     gpr_mu_unlock(&fork_fd_list_mu);
327   }
328 }
329 
330 static void fork_fd_list_add_node(grpc_fork_fd_list* node) {
331   gpr_mu_lock(&fork_fd_list_mu);
332   node->next = fork_fd_list_head;
333   node->prev = nullptr;
334   if (fork_fd_list_head != nullptr) {
335     fork_fd_list_head->prev = node;
336   }
337   fork_fd_list_head = node;
338   gpr_mu_unlock(&fork_fd_list_mu);
339 }
340 
341 static void fork_fd_list_add_grpc_fd(grpc_fd* fd) {
342   if (track_fds_for_fork) {
343     fd->fork_fd_list =
344         static_cast<grpc_fork_fd_list*>(gpr_malloc(sizeof(grpc_fork_fd_list)));
345     fd->fork_fd_list->fd = fd;
346     fd->fork_fd_list->cached_wakeup_fd = nullptr;
347     fork_fd_list_add_node(fd->fork_fd_list);
348   }
349 }
350 
351 static void fork_fd_list_add_wakeup_fd(grpc_cached_wakeup_fd* fd) {
352   if (track_fds_for_fork) {
353     fd->fork_fd_list =
354         static_cast<grpc_fork_fd_list*>(gpr_malloc(sizeof(grpc_fork_fd_list)));
355     fd->fork_fd_list->cached_wakeup_fd = fd;
356     fd->fork_fd_list->fd = nullptr;
357     fork_fd_list_add_node(fd->fork_fd_list);
358   }
359 }
360 
361 /*******************************************************************************
362  * fd_posix.c
363  */
364 
365 #ifndef NDEBUG
366 #define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
367 #define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
368 static void ref_by(grpc_fd* fd, int n, const char* reason, const char* file,
369                    int line) {
370   if (grpc_trace_fd_refcount.enabled()) {
371     gpr_log(GPR_DEBUG,
372             "FD %d %p   ref %d %" PRIdPTR " -> %" PRIdPTR " [%s; %s:%d]",
373             fd->fd, fd, n, gpr_atm_no_barrier_load(&fd->refst),
374             gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
375   }
376 #else
377 #define REF_BY(fd, n, reason) ref_by(fd, n)
378 #define UNREF_BY(fd, n, reason) unref_by(fd, n)
379 static void ref_by(grpc_fd* fd, int n) {
380 #endif
381   GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
382 }
383 
384 #ifndef NDEBUG
385 static void unref_by(grpc_fd* fd, int n, const char* reason, const char* file,
386                      int line) {
387   if (grpc_trace_fd_refcount.enabled()) {
388     gpr_log(GPR_DEBUG,
389             "FD %d %p unref %d %" PRIdPTR " -> %" PRIdPTR " [%s; %s:%d]",
390             fd->fd, fd, n, gpr_atm_no_barrier_load(&fd->refst),
391             gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
392   }
393 #else
394 static void unref_by(grpc_fd* fd, int n) {
395 #endif
396   gpr_atm old = gpr_atm_full_fetch_add(&fd->refst, -n);
397   if (old == n) {
398     gpr_mu_destroy(&fd->mu);
399     grpc_iomgr_unregister_object(&fd->iomgr_object);
400     fork_fd_list_remove_node(fd->fork_fd_list);
401     if (fd->shutdown) GRPC_ERROR_UNREF(fd->shutdown_error);
402     gpr_free(fd);
403   } else {
404     GPR_ASSERT(old > n);
405   }
406 }
407 
408 static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
409   GPR_DEBUG_ASSERT(track_err == false);
410   grpc_fd* r = static_cast<grpc_fd*>(gpr_malloc(sizeof(*r)));
411   gpr_mu_init(&r->mu);
412   gpr_atm_rel_store(&r->refst, 1);
413   r->shutdown = 0;
414   r->read_closure = CLOSURE_NOT_READY;
415   r->write_closure = CLOSURE_NOT_READY;
416   r->fd = fd;
417   r->inactive_watcher_root.next = r->inactive_watcher_root.prev =
418       &r->inactive_watcher_root;
419   r->read_watcher = r->write_watcher = nullptr;
420   r->on_done_closure = nullptr;
421   r->closed = 0;
422   r->released = 0;
423   gpr_atm_no_barrier_store(&r->pollhup, 0);
424 
425   char* name2;
426   gpr_asprintf(&name2, "%s fd=%d", name, fd);
427   grpc_iomgr_register_object(&r->iomgr_object, name2);
428   gpr_free(name2);
429   fork_fd_list_add_grpc_fd(r);
430   return r;
431 }
432 
433 static bool fd_is_orphaned(grpc_fd* fd) {
434   return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
435 }
436 
437 static grpc_error* pollset_kick_locked(grpc_fd_watcher* watcher) {
438   gpr_mu_lock(&watcher->pollset->mu);
439   GPR_ASSERT(watcher->worker);
440   grpc_error* err = pollset_kick_ext(watcher->pollset, watcher->worker,
441                                      GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP);
442   gpr_mu_unlock(&watcher->pollset->mu);
443   return err;
444 }
445 
446 static void maybe_wake_one_watcher_locked(grpc_fd* fd) {
447   if (fd->inactive_watcher_root.next != &fd->inactive_watcher_root) {
448     pollset_kick_locked(fd->inactive_watcher_root.next);
449   } else if (fd->read_watcher) {
450     pollset_kick_locked(fd->read_watcher);
451   } else if (fd->write_watcher) {
452     pollset_kick_locked(fd->write_watcher);
453   }
454 }
455 
456 static void wake_all_watchers_locked(grpc_fd* fd) {
457   grpc_fd_watcher* watcher;
458   for (watcher = fd->inactive_watcher_root.next;
459        watcher != &fd->inactive_watcher_root; watcher = watcher->next) {
460     pollset_kick_locked(watcher);
461   }
462   if (fd->read_watcher) {
463     pollset_kick_locked(fd->read_watcher);
464   }
465   if (fd->write_watcher && fd->write_watcher != fd->read_watcher) {
466     pollset_kick_locked(fd->write_watcher);
467   }
468 }
469 
470 static int has_watchers(grpc_fd* fd) {
471   return fd->read_watcher != nullptr || fd->write_watcher != nullptr ||
472          fd->inactive_watcher_root.next != &fd->inactive_watcher_root;
473 }
474 
475 static void close_fd_locked(grpc_fd* fd) {
476   fd->closed = 1;
477   if (!fd->released) {
478     close(fd->fd);
479   }
480   GRPC_CLOSURE_SCHED(fd->on_done_closure, GRPC_ERROR_NONE);
481 }
482 
483 static int fd_wrapped_fd(grpc_fd* fd) {
484   if (fd->released || fd->closed) {
485     return -1;
486   } else {
487     return fd->fd;
488   }
489 }
490 
491 static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
492                       const char* reason) {
493   fd->on_done_closure = on_done;
494   fd->released = release_fd != nullptr;
495   if (release_fd != nullptr) {
496     *release_fd = fd->fd;
497     fd->released = true;
498   }
499   gpr_mu_lock(&fd->mu);
500   REF_BY(fd, 1, reason); /* remove active status, but keep referenced */
501   if (!has_watchers(fd)) {
502     close_fd_locked(fd);
503   } else {
504     wake_all_watchers_locked(fd);
505   }
506   gpr_mu_unlock(&fd->mu);
507   UNREF_BY(fd, 2, reason); /* drop the reference */
508 }
509 
510 /* increment refcount by two to avoid changing the orphan bit */
511 #ifndef NDEBUG
512 static void fd_ref(grpc_fd* fd, const char* reason, const char* file,
513                    int line) {
514   ref_by(fd, 2, reason, file, line);
515 }
516 
517 static void fd_unref(grpc_fd* fd, const char* reason, const char* file,
518                      int line) {
519   unref_by(fd, 2, reason, file, line);
520 }
521 #else
522 static void fd_ref(grpc_fd* fd) { ref_by(fd, 2); }
523 
524 static void fd_unref(grpc_fd* fd) { unref_by(fd, 2); }
525 #endif
526 
527 static grpc_error* fd_shutdown_error(grpc_fd* fd) {
528   if (!fd->shutdown) {
529     return GRPC_ERROR_NONE;
530   } else {
531     return grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
532                                   "FD shutdown", &fd->shutdown_error, 1),
533                               GRPC_ERROR_INT_GRPC_STATUS,
534                               GRPC_STATUS_UNAVAILABLE);
535   }
536 }
537 
538 static void notify_on_locked(grpc_fd* fd, grpc_closure** st,
539                              grpc_closure* closure) {
540   if (fd->shutdown || gpr_atm_no_barrier_load(&fd->pollhup)) {
541     GRPC_CLOSURE_SCHED(
542         closure, grpc_error_set_int(
543                      GRPC_ERROR_CREATE_FROM_STATIC_STRING("FD shutdown"),
544                      GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
545   } else if (*st == CLOSURE_NOT_READY) {
546     /* not ready ==> switch to a waiting state by setting the closure */
547     *st = closure;
548   } else if (*st == CLOSURE_READY) {
549     /* already ready ==> queue the closure to run immediately */
550     *st = CLOSURE_NOT_READY;
551     GRPC_CLOSURE_SCHED(closure, fd_shutdown_error(fd));
552     maybe_wake_one_watcher_locked(fd);
553   } else {
554     /* upcallptr was set to a different closure.  This is an error! */
555     gpr_log(GPR_ERROR,
556             "User called a notify_on function with a previous callback still "
557             "pending");
558     abort();
559   }
560 }
561 
562 /* returns 1 if state becomes not ready */
563 static int set_ready_locked(grpc_fd* fd, grpc_closure** st) {
564   if (*st == CLOSURE_READY) {
565     /* duplicate ready ==> ignore */
566     return 0;
567   } else if (*st == CLOSURE_NOT_READY) {
568     /* not ready, and not waiting ==> flag ready */
569     *st = CLOSURE_READY;
570     return 0;
571   } else {
572     /* waiting ==> queue closure */
573     GRPC_CLOSURE_SCHED(*st, fd_shutdown_error(fd));
574     *st = CLOSURE_NOT_READY;
575     return 1;
576   }
577 }
578 
579 static void fd_shutdown(grpc_fd* fd, grpc_error* why) {
580   gpr_mu_lock(&fd->mu);
581   /* only shutdown once */
582   if (!fd->shutdown) {
583     fd->shutdown = 1;
584     fd->shutdown_error = why;
585     /* signal read/write closed to OS so that future operations fail */
586     shutdown(fd->fd, SHUT_RDWR);
587     set_ready_locked(fd, &fd->read_closure);
588     set_ready_locked(fd, &fd->write_closure);
589   } else {
590     GRPC_ERROR_UNREF(why);
591   }
592   gpr_mu_unlock(&fd->mu);
593 }
594 
595 static bool fd_is_shutdown(grpc_fd* fd) {
596   gpr_mu_lock(&fd->mu);
597   bool r = fd->shutdown;
598   gpr_mu_unlock(&fd->mu);
599   return r;
600 }
601 
602 static void fd_notify_on_read(grpc_fd* fd, grpc_closure* closure) {
603   gpr_mu_lock(&fd->mu);
604   notify_on_locked(fd, &fd->read_closure, closure);
605   gpr_mu_unlock(&fd->mu);
606 }
607 
608 static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
609   gpr_mu_lock(&fd->mu);
610   notify_on_locked(fd, &fd->write_closure, closure);
611   gpr_mu_unlock(&fd->mu);
612 }
613 
614 static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
615   if (grpc_polling_trace.enabled()) {
616     gpr_log(GPR_ERROR, "Polling engine does not support tracking errors.");
617   }
618   GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_CANCELLED);
619 }
620 
621 static void fd_set_readable(grpc_fd* fd) {
622   gpr_mu_lock(&fd->mu);
623   set_ready_locked(fd, &fd->read_closure);
624   gpr_mu_unlock(&fd->mu);
625 }
626 
627 static void fd_set_writable(grpc_fd* fd) {
628   gpr_mu_lock(&fd->mu);
629   set_ready_locked(fd, &fd->write_closure);
630   gpr_mu_unlock(&fd->mu);
631 }
632 
633 static void fd_set_error(grpc_fd* fd) {
634   if (grpc_polling_trace.enabled()) {
635     gpr_log(GPR_ERROR, "Polling engine does not support tracking errors.");
636   }
637 }
638 
639 static uint32_t fd_begin_poll(grpc_fd* fd, grpc_pollset* pollset,
640                               grpc_pollset_worker* worker, uint32_t read_mask,
641                               uint32_t write_mask, grpc_fd_watcher* watcher) {
642   uint32_t mask = 0;
643   grpc_closure* cur;
644   int requested;
645   /* keep track of pollers that have requested our events, in case they change
646    */
647   GRPC_FD_REF(fd, "poll");
648 
649   gpr_mu_lock(&fd->mu);
650 
651   /* if we are shutdown, then don't add to the watcher set */
652   if (fd->shutdown) {
653     watcher->fd = nullptr;
654     watcher->pollset = nullptr;
655     watcher->worker = nullptr;
656     gpr_mu_unlock(&fd->mu);
657     GRPC_FD_UNREF(fd, "poll");
658     return 0;
659   }
660 
661   /* if there is nobody polling for read, but we need to, then start doing so */
662   cur = fd->read_closure;
663   requested = cur != CLOSURE_READY;
664   if (read_mask && fd->read_watcher == nullptr && requested) {
665     fd->read_watcher = watcher;
666     mask |= read_mask;
667   }
668   /* if there is nobody polling for write, but we need to, then start doing so
669    */
670   cur = fd->write_closure;
671   requested = cur != CLOSURE_READY;
672   if (write_mask && fd->write_watcher == nullptr && requested) {
673     fd->write_watcher = watcher;
674     mask |= write_mask;
675   }
676   /* if not polling, remember this watcher in case we need someone to later */
677   if (mask == 0 && worker != nullptr) {
678     watcher->next = &fd->inactive_watcher_root;
679     watcher->prev = watcher->next->prev;
680     watcher->next->prev = watcher->prev->next = watcher;
681   }
682   watcher->pollset = pollset;
683   watcher->worker = worker;
684   watcher->fd = fd;
685   gpr_mu_unlock(&fd->mu);
686 
687   return mask;
688 }
689 
690 static void fd_end_poll(grpc_fd_watcher* watcher, int got_read, int got_write) {
691   int was_polling = 0;
692   int kick = 0;
693   grpc_fd* fd = watcher->fd;
694 
695   if (fd == nullptr) {
696     return;
697   }
698 
699   gpr_mu_lock(&fd->mu);
700 
701   if (watcher == fd->read_watcher) {
702     /* remove read watcher, kick if we still need a read */
703     was_polling = 1;
704     if (!got_read) {
705       kick = 1;
706     }
707     fd->read_watcher = nullptr;
708   }
709   if (watcher == fd->write_watcher) {
710     /* remove write watcher, kick if we still need a write */
711     was_polling = 1;
712     if (!got_write) {
713       kick = 1;
714     }
715     fd->write_watcher = nullptr;
716   }
717   if (!was_polling && watcher->worker != nullptr) {
718     /* remove from inactive list */
719     watcher->next->prev = watcher->prev;
720     watcher->prev->next = watcher->next;
721   }
722   if (got_read) {
723     if (set_ready_locked(fd, &fd->read_closure)) {
724       kick = 1;
725     }
726   }
727   if (got_write) {
728     if (set_ready_locked(fd, &fd->write_closure)) {
729       kick = 1;
730     }
731   }
732   if (kick) {
733     maybe_wake_one_watcher_locked(fd);
734   }
735   if (fd_is_orphaned(fd) && !has_watchers(fd) && !fd->closed) {
736     close_fd_locked(fd);
737   }
738   gpr_mu_unlock(&fd->mu);
739 
740   GRPC_FD_UNREF(fd, "poll");
741 }
742 
743 /*******************************************************************************
744  * pollset_posix.c
745  */
746 
747 GPR_TLS_DECL(g_current_thread_poller);
748 GPR_TLS_DECL(g_current_thread_worker);
749 
750 static void remove_worker(grpc_pollset* p, grpc_pollset_worker* worker) {
751   worker->prev->next = worker->next;
752   worker->next->prev = worker->prev;
753 }
754 
755 static bool pollset_has_workers(grpc_pollset* p) {
756   return p->root_worker.next != &p->root_worker;
757 }
758 
759 static bool pollset_in_pollset_sets(grpc_pollset* p) {
760   return p->pollset_set_count;
761 }
762 
763 static bool pollset_has_observers(grpc_pollset* p) {
764   return pollset_has_workers(p) || pollset_in_pollset_sets(p);
765 }
766 
767 static grpc_pollset_worker* pop_front_worker(grpc_pollset* p) {
768   if (pollset_has_workers(p)) {
769     grpc_pollset_worker* w = p->root_worker.next;
770     remove_worker(p, w);
771     return w;
772   } else {
773     return nullptr;
774   }
775 }
776 
777 static void push_back_worker(grpc_pollset* p, grpc_pollset_worker* worker) {
778   worker->next = &p->root_worker;
779   worker->prev = worker->next->prev;
780   worker->prev->next = worker->next->prev = worker;
781 }
782 
783 static void push_front_worker(grpc_pollset* p, grpc_pollset_worker* worker) {
784   worker->prev = &p->root_worker;
785   worker->next = worker->prev->next;
786   worker->prev->next = worker->next->prev = worker;
787 }
788 
789 static void kick_append_error(grpc_error** composite, grpc_error* error) {
790   if (error == GRPC_ERROR_NONE) return;
791   if (*composite == GRPC_ERROR_NONE) {
792     *composite = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Kick Failure");
793   }
794   *composite = grpc_error_add_child(*composite, error);
795 }
796 
797 static grpc_error* pollset_kick_ext(grpc_pollset* p,
798                                     grpc_pollset_worker* specific_worker,
799                                     uint32_t flags) {
800   GPR_TIMER_SCOPE("pollset_kick_ext", 0);
801   grpc_error* error = GRPC_ERROR_NONE;
802   GRPC_STATS_INC_POLLSET_KICK();
803 
804   /* pollset->mu already held */
805   if (specific_worker != nullptr) {
806     if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
807       GPR_TIMER_SCOPE("pollset_kick_ext.broadcast", 0);
808       GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
809       for (specific_worker = p->root_worker.next;
810            specific_worker != &p->root_worker;
811            specific_worker = specific_worker->next) {
812         kick_append_error(
813             &error, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
814       }
815       p->kicked_without_pollers = true;
816     } else if (gpr_tls_get(&g_current_thread_worker) !=
817                (intptr_t)specific_worker) {
818       GPR_TIMER_MARK("different_thread_worker", 0);
819       if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
820         specific_worker->reevaluate_polling_on_wakeup = true;
821       }
822       specific_worker->kicked_specifically = true;
823       kick_append_error(&error,
824                         grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
825     } else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) {
826       GPR_TIMER_MARK("kick_yoself", 0);
827       if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
828         specific_worker->reevaluate_polling_on_wakeup = true;
829       }
830       specific_worker->kicked_specifically = true;
831       kick_append_error(&error,
832                         grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
833     }
834   } else if (gpr_tls_get(&g_current_thread_poller) != (intptr_t)p) {
835     GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
836     GPR_TIMER_MARK("kick_anonymous", 0);
837     specific_worker = pop_front_worker(p);
838     if (specific_worker != nullptr) {
839       if (gpr_tls_get(&g_current_thread_worker) == (intptr_t)specific_worker) {
840         GPR_TIMER_MARK("kick_anonymous_not_self", 0);
841         push_back_worker(p, specific_worker);
842         specific_worker = pop_front_worker(p);
843         if ((flags & GRPC_POLLSET_CAN_KICK_SELF) == 0 &&
844             gpr_tls_get(&g_current_thread_worker) ==
845                 (intptr_t)specific_worker) {
846           push_back_worker(p, specific_worker);
847           specific_worker = nullptr;
848         }
849       }
850       if (specific_worker != nullptr) {
851         GPR_TIMER_MARK("finally_kick", 0);
852         push_back_worker(p, specific_worker);
853         kick_append_error(
854             &error, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
855       }
856     } else {
857       GPR_TIMER_MARK("kicked_no_pollers", 0);
858       p->kicked_without_pollers = true;
859     }
860   }
861 
862   GRPC_LOG_IF_ERROR("pollset_kick_ext", GRPC_ERROR_REF(error));
863   return error;
864 }
865 
866 static grpc_error* pollset_kick(grpc_pollset* p,
867                                 grpc_pollset_worker* specific_worker) {
868   return pollset_kick_ext(p, specific_worker, 0);
869 }
870 
871 /* global state management */
872 
873 static grpc_error* pollset_global_init(void) {
874   gpr_tls_init(&g_current_thread_poller);
875   gpr_tls_init(&g_current_thread_worker);
876   return GRPC_ERROR_NONE;
877 }
878 
879 static void pollset_global_shutdown(void) {
880   gpr_tls_destroy(&g_current_thread_poller);
881   gpr_tls_destroy(&g_current_thread_worker);
882 }
883 
884 /* main interface */
885 
886 static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
887   gpr_mu_init(&pollset->mu);
888   *mu = &pollset->mu;
889   pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
890   pollset->shutting_down = 0;
891   pollset->called_shutdown = 0;
892   pollset->kicked_without_pollers = 0;
893   pollset->local_wakeup_cache = nullptr;
894   pollset->kicked_without_pollers = 0;
895   pollset->fd_count = 0;
896   pollset->fd_capacity = 0;
897   pollset->fds = nullptr;
898   pollset->pollset_set_count = 0;
899 }
900 
901 static void pollset_destroy(grpc_pollset* pollset) {
902   GPR_ASSERT(!pollset_has_workers(pollset));
903   while (pollset->local_wakeup_cache) {
904     grpc_cached_wakeup_fd* next = pollset->local_wakeup_cache->next;
905     fork_fd_list_remove_node(pollset->local_wakeup_cache->fork_fd_list);
906     grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd);
907     gpr_free(pollset->local_wakeup_cache);
908     pollset->local_wakeup_cache = next;
909   }
910   gpr_free(pollset->fds);
911   gpr_mu_destroy(&pollset->mu);
912 }
913 
914 static void pollset_add_fd(grpc_pollset* pollset, grpc_fd* fd) {
915   gpr_mu_lock(&pollset->mu);
916   size_t i;
917   /* TODO(ctiller): this is O(num_fds^2); maybe switch to a hash set here */
918   for (i = 0; i < pollset->fd_count; i++) {
919     if (pollset->fds[i] == fd) goto exit;
920   }
921   if (pollset->fd_count == pollset->fd_capacity) {
922     pollset->fd_capacity =
923         GPR_MAX(pollset->fd_capacity + 8, pollset->fd_count * 3 / 2);
924     pollset->fds = static_cast<grpc_fd**>(
925         gpr_realloc(pollset->fds, sizeof(grpc_fd*) * pollset->fd_capacity));
926   }
927   pollset->fds[pollset->fd_count++] = fd;
928   GRPC_FD_REF(fd, "multipoller");
929   pollset_kick(pollset, nullptr);
930 exit:
931   gpr_mu_unlock(&pollset->mu);
932 }
933 
934 static void finish_shutdown(grpc_pollset* pollset) {
935   size_t i;
936   for (i = 0; i < pollset->fd_count; i++) {
937     GRPC_FD_UNREF(pollset->fds[i], "multipoller");
938   }
939   pollset->fd_count = 0;
940   GRPC_CLOSURE_SCHED(pollset->shutdown_done, GRPC_ERROR_NONE);
941 }
942 
943 static void work_combine_error(grpc_error** composite, grpc_error* error) {
944   if (error == GRPC_ERROR_NONE) return;
945   if (*composite == GRPC_ERROR_NONE) {
946     *composite = GRPC_ERROR_CREATE_FROM_STATIC_STRING("pollset_work");
947   }
948   *composite = grpc_error_add_child(*composite, error);
949 }
950 
951 static grpc_error* pollset_work(grpc_pollset* pollset,
952                                 grpc_pollset_worker** worker_hdl,
953                                 grpc_millis deadline) {
954   GPR_TIMER_SCOPE("pollset_work", 0);
955   grpc_pollset_worker worker;
956   if (worker_hdl) *worker_hdl = &worker;
957   grpc_error* error = GRPC_ERROR_NONE;
958 
959   /* Avoid malloc for small number of elements. */
960   enum { inline_elements = 96 };
961   struct pollfd pollfd_space[inline_elements];
962   struct grpc_fd_watcher watcher_space[inline_elements];
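  /* Note: pollfd_space/watcher_space keep a poll iteration allocation-free as
     long as the pollset tracks at most inline_elements - 2 fds (one extra slot
     is reserved for the worker's wakeup fd); larger pollsets fall back to a
     single heap block that holds both arrays. */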
963 
964   /* pollset->mu already held */
965   int added_worker = 0;
966   int locked = 1;
967   int queued_work = 0;
968   int keep_polling = 0;
969   /* this must happen before we (potentially) drop pollset->mu */
970   worker.next = worker.prev = nullptr;
971   worker.reevaluate_polling_on_wakeup = 0;
972   if (pollset->local_wakeup_cache != nullptr) {
973     worker.wakeup_fd = pollset->local_wakeup_cache;
974     pollset->local_wakeup_cache = worker.wakeup_fd->next;
975   } else {
976     worker.wakeup_fd = static_cast<grpc_cached_wakeup_fd*>(
977         gpr_malloc(sizeof(*worker.wakeup_fd)));
978     error = grpc_wakeup_fd_init(&worker.wakeup_fd->fd);
979     fork_fd_list_add_wakeup_fd(worker.wakeup_fd);
980     if (error != GRPC_ERROR_NONE) {
981       GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
982       return error;
983     }
984   }
985   worker.kicked_specifically = 0;
986   /* If we're shutting down then we don't execute any extended work */
987   if (pollset->shutting_down) {
988     GPR_TIMER_MARK("pollset_work.shutting_down", 0);
989     goto done;
990   }
991   /* Start polling, and keep doing so while we're being asked to
992      re-evaluate our pollers (this allows poll() based pollers to
993      ensure they don't miss wakeups) */
994   keep_polling = 1;
995   gpr_tls_set(&g_current_thread_poller, (intptr_t)pollset);
996   while (keep_polling) {
997     keep_polling = 0;
998     if (!pollset->kicked_without_pollers ||
999         deadline <= grpc_core::ExecCtx::Get()->Now()) {
1000       if (!added_worker) {
1001         push_front_worker(pollset, &worker);
1002         added_worker = 1;
1003         gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
1004       }
1005       GPR_TIMER_SCOPE("maybe_work_and_unlock", 0);
1006 #define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR)
1007 #define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR)
1008 
1009       int timeout;
1010       int r;
1011       size_t i, fd_count;
1012       nfds_t pfd_count;
1013       grpc_fd_watcher* watchers;
1014       struct pollfd* pfds;
1015 
1016       timeout = poll_deadline_to_millis_timeout(deadline);
1017 
1018       if (pollset->fd_count + 2 <= inline_elements) {
1019         pfds = pollfd_space;
1020         watchers = watcher_space;
1021       } else {
1022         /* Allocate one buffer to hold both pfds and watchers arrays */
1023         const size_t pfd_size = sizeof(*pfds) * (pollset->fd_count + 2);
1024         const size_t watch_size = sizeof(*watchers) * (pollset->fd_count + 2);
1025         void* buf = gpr_malloc(pfd_size + watch_size);
1026         pfds = static_cast<struct pollfd*>(buf);
1027         watchers = static_cast<grpc_fd_watcher*>(
1028             (void*)(static_cast<char*>(buf) + pfd_size));
1029       }
1030 
1031       fd_count = 0;
1032       pfd_count = 1;
1033       pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker.wakeup_fd->fd);
1034       pfds[0].events = POLLIN;
1035       pfds[0].revents = 0;
1036       for (i = 0; i < pollset->fd_count; i++) {
1037         if (fd_is_orphaned(pollset->fds[i]) ||
1038             gpr_atm_no_barrier_load(&pollset->fds[i]->pollhup) == 1) {
1039           GRPC_FD_UNREF(pollset->fds[i], "multipoller");
1040         } else {
1041           pollset->fds[fd_count++] = pollset->fds[i];
1042           watchers[pfd_count].fd = pollset->fds[i];
1043           GRPC_FD_REF(watchers[pfd_count].fd, "multipoller_start");
1044           pfds[pfd_count].fd = pollset->fds[i]->fd;
1045           pfds[pfd_count].revents = 0;
1046           pfd_count++;
1047         }
1048       }
1049       pollset->fd_count = fd_count;
1050       gpr_mu_unlock(&pollset->mu);
1051 
1052       for (i = 1; i < pfd_count; i++) {
1053         grpc_fd* fd = watchers[i].fd;
1054         pfds[i].events = static_cast<short>(
1055             fd_begin_poll(fd, pollset, &worker, POLLIN, POLLOUT, &watchers[i]));
1056         GRPC_FD_UNREF(fd, "multipoller_start");
1057       }
1058 
1059       /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid
1060          even going into the blocking annotation if possible */
1061       GRPC_SCHEDULING_START_BLOCKING_REGION;
1062       GRPC_STATS_INC_SYSCALL_POLL();
1063       r = grpc_poll_function(pfds, pfd_count, timeout);
1064       GRPC_SCHEDULING_END_BLOCKING_REGION;
1065 
1066       if (grpc_polling_trace.enabled()) {
1067         gpr_log(GPR_INFO, "%p poll=%d", pollset, r);
1068       }
1069 
1070       if (r < 0) {
1071         if (errno != EINTR) {
1072           work_combine_error(&error, GRPC_OS_ERROR(errno, "poll"));
1073         }
1074 
1075         for (i = 1; i < pfd_count; i++) {
1076           if (watchers[i].fd == nullptr) {
1077             fd_end_poll(&watchers[i], 0, 0);
1078           } else {
1079             // Wake up all the file descriptors, if we have an invalid one
1080             // we can identify it on the next pollset_work()
1081             fd_end_poll(&watchers[i], 1, 1);
1082           }
1083         }
1084       } else if (r == 0) {
1085         for (i = 1; i < pfd_count; i++) {
1086           fd_end_poll(&watchers[i], 0, 0);
1087         }
1088       } else {
1089         if (pfds[0].revents & POLLIN_CHECK) {
1090           if (grpc_polling_trace.enabled()) {
1091             gpr_log(GPR_INFO, "%p: got_wakeup", pollset);
1092           }
1093           work_combine_error(
1094               &error, grpc_wakeup_fd_consume_wakeup(&worker.wakeup_fd->fd));
1095         }
1096         for (i = 1; i < pfd_count; i++) {
1097           if (watchers[i].fd == nullptr) {
1098             fd_end_poll(&watchers[i], 0, 0);
1099           } else {
1100             if (grpc_polling_trace.enabled()) {
1101               gpr_log(GPR_INFO, "%p got_event: %d r:%d w:%d [%d]", pollset,
1102                       pfds[i].fd, (pfds[i].revents & POLLIN_CHECK) != 0,
1103                       (pfds[i].revents & POLLOUT_CHECK) != 0, pfds[i].revents);
1104             }
1105             /* This is a mitigation to prevent poll() from spinning on a
1106              * POLLHUP https://github.com/grpc/grpc/pull/13665
1107              */
1108             if (pfds[i].revents & POLLHUP) {
1109               gpr_atm_no_barrier_store(&watchers[i].fd->pollhup, 1);
1110             }
1111             fd_end_poll(&watchers[i], pfds[i].revents & POLLIN_CHECK,
1112                         pfds[i].revents & POLLOUT_CHECK);
1113           }
1114         }
1115       }
1116 
1117       if (pfds != pollfd_space) {
1118         /* pfds and watchers are in the same memory block pointed to by pfds */
1119         gpr_free(pfds);
1120       }
1121 
1122       locked = 0;
1123     } else {
1124       GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
1125       pollset->kicked_without_pollers = 0;
1126     }
1127   /* Finished execution - start cleaning up.
1128      Note that we may arrive here from outside the enclosing while() loop.
1129      In that case we won't loop, though, as we haven't added the worker to
1130      the worker list, which means nobody could ask us to re-evaluate polling. */
1131   done:
1132     if (!locked) {
1133       queued_work |= grpc_core::ExecCtx::Get()->Flush();
1134       gpr_mu_lock(&pollset->mu);
1135       locked = 1;
1136     }
1137     /* If we're forced to re-evaluate polling (via pollset_kick with
1138        GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force
1139        a loop */
1140     if (worker.reevaluate_polling_on_wakeup && error == GRPC_ERROR_NONE) {
1141       worker.reevaluate_polling_on_wakeup = 0;
1142       pollset->kicked_without_pollers = 0;
1143       if (queued_work || worker.kicked_specifically) {
1144         /* If there's queued work on the list, then set the deadline to be
1145            immediate so we get back out of the polling loop quickly */
1146         deadline = 0;
1147       }
1148       keep_polling = 1;
1149     }
1150   }
1151   gpr_tls_set(&g_current_thread_poller, 0);
1152   if (added_worker) {
1153     remove_worker(pollset, &worker);
1154     gpr_tls_set(&g_current_thread_worker, 0);
1155   }
1156   /* release wakeup fd to the local pool */
1157   worker.wakeup_fd->next = pollset->local_wakeup_cache;
1158   pollset->local_wakeup_cache = worker.wakeup_fd;
1159   /* check shutdown conditions */
1160   if (pollset->shutting_down) {
1161     if (pollset_has_workers(pollset)) {
1162       pollset_kick(pollset, nullptr);
1163     } else if (!pollset->called_shutdown && !pollset_has_observers(pollset)) {
1164       pollset->called_shutdown = 1;
1165       gpr_mu_unlock(&pollset->mu);
1166       finish_shutdown(pollset);
1167       grpc_core::ExecCtx::Get()->Flush();
1168       /* Continuing to access pollset here is safe -- it is the caller's
1169        * responsibility to not destroy when it has outstanding calls to
1170        * pollset_work.
1171        * TODO(dklempner): Can we refactor the shutdown logic to avoid this? */
1172       gpr_mu_lock(&pollset->mu);
1173     }
1174   }
1175   if (worker_hdl) *worker_hdl = nullptr;
1176   GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
1177   return error;
1178 }
1179 
1180 static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
1181   GPR_ASSERT(!pollset->shutting_down);
1182   pollset->shutting_down = 1;
1183   pollset->shutdown_done = closure;
1184   pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
1185   if (!pollset->called_shutdown && !pollset_has_observers(pollset)) {
1186     pollset->called_shutdown = 1;
1187     finish_shutdown(pollset);
1188   }
1189 }
1190 
1191 static int poll_deadline_to_millis_timeout(grpc_millis deadline) {
1192   if (deadline == GRPC_MILLIS_INF_FUTURE) return -1;
1193   if (deadline == 0) return 0;
1194   grpc_millis n = deadline - grpc_core::ExecCtx::Get()->Now();
1195   if (n < 0) return 0;
1196   if (n > INT_MAX) return -1;
1197   return static_cast<int>(n);
1198 }
1199 
1200 /*******************************************************************************
1201  * pollset_set_posix.c
1202  */
1203 
1204 static grpc_pollset_set* pollset_set_create(void) {
1205   grpc_pollset_set* pollset_set =
1206       static_cast<grpc_pollset_set*>(gpr_zalloc(sizeof(*pollset_set)));
1207   gpr_mu_init(&pollset_set->mu);
1208   return pollset_set;
1209 }
1210 
1211 static void pollset_set_destroy(grpc_pollset_set* pollset_set) {
1212   size_t i;
1213   gpr_mu_destroy(&pollset_set->mu);
1214   for (i = 0; i < pollset_set->fd_count; i++) {
1215     GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1216   }
1217   for (i = 0; i < pollset_set->pollset_count; i++) {
1218     grpc_pollset* pollset = pollset_set->pollsets[i];
1219     gpr_mu_lock(&pollset->mu);
1220     pollset->pollset_set_count--;
1221     /* check shutdown */
1222     if (pollset->shutting_down && !pollset->called_shutdown &&
1223         !pollset_has_observers(pollset)) {
1224       pollset->called_shutdown = 1;
1225       gpr_mu_unlock(&pollset->mu);
1226       finish_shutdown(pollset);
1227     } else {
1228       gpr_mu_unlock(&pollset->mu);
1229     }
1230   }
1231   gpr_free(pollset_set->pollsets);
1232   gpr_free(pollset_set->pollset_sets);
1233   gpr_free(pollset_set->fds);
1234   gpr_free(pollset_set);
1235 }
1236 
1237 static void pollset_set_add_pollset(grpc_pollset_set* pollset_set,
1238                                     grpc_pollset* pollset) {
1239   size_t i, j;
1240   gpr_mu_lock(&pollset->mu);
1241   pollset->pollset_set_count++;
1242   gpr_mu_unlock(&pollset->mu);
1243   gpr_mu_lock(&pollset_set->mu);
1244   if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
1245     pollset_set->pollset_capacity =
1246         GPR_MAX(8, 2 * pollset_set->pollset_capacity);
1247     pollset_set->pollsets = static_cast<grpc_pollset**>(gpr_realloc(
1248         pollset_set->pollsets,
1249         pollset_set->pollset_capacity * sizeof(*pollset_set->pollsets)));
1250   }
1251   pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
1252   for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
1253     if (fd_is_orphaned(pollset_set->fds[i])) {
1254       GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1255     } else {
1256       pollset_add_fd(pollset, pollset_set->fds[i]);
1257       pollset_set->fds[j++] = pollset_set->fds[i];
1258     }
1259   }
1260   pollset_set->fd_count = j;
1261   gpr_mu_unlock(&pollset_set->mu);
1262 }
1263 
1264 static void pollset_set_del_pollset(grpc_pollset_set* pollset_set,
1265                                     grpc_pollset* pollset) {
1266   size_t i;
1267   gpr_mu_lock(&pollset_set->mu);
1268   for (i = 0; i < pollset_set->pollset_count; i++) {
1269     if (pollset_set->pollsets[i] == pollset) {
1270       pollset_set->pollset_count--;
1271       GPR_SWAP(grpc_pollset*, pollset_set->pollsets[i],
1272                pollset_set->pollsets[pollset_set->pollset_count]);
1273       break;
1274     }
1275   }
1276   gpr_mu_unlock(&pollset_set->mu);
1277   gpr_mu_lock(&pollset->mu);
1278   pollset->pollset_set_count--;
1279   /* check shutdown */
1280   if (pollset->shutting_down && !pollset->called_shutdown &&
1281       !pollset_has_observers(pollset)) {
1282     pollset->called_shutdown = 1;
1283     gpr_mu_unlock(&pollset->mu);
1284     finish_shutdown(pollset);
1285   } else {
1286     gpr_mu_unlock(&pollset->mu);
1287   }
1288 }
1289 
1290 static void pollset_set_add_pollset_set(grpc_pollset_set* bag,
1291                                         grpc_pollset_set* item) {
1292   size_t i, j;
1293   gpr_mu_lock(&bag->mu);
1294   if (bag->pollset_set_count == bag->pollset_set_capacity) {
1295     bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
1296     bag->pollset_sets = static_cast<grpc_pollset_set**>(
1297         gpr_realloc(bag->pollset_sets,
1298                     bag->pollset_set_capacity * sizeof(*bag->pollset_sets)));
1299   }
1300   bag->pollset_sets[bag->pollset_set_count++] = item;
1301   for (i = 0, j = 0; i < bag->fd_count; i++) {
1302     if (fd_is_orphaned(bag->fds[i])) {
1303       GRPC_FD_UNREF(bag->fds[i], "pollset_set");
1304     } else {
1305       pollset_set_add_fd(item, bag->fds[i]);
1306       bag->fds[j++] = bag->fds[i];
1307     }
1308   }
1309   bag->fd_count = j;
1310   gpr_mu_unlock(&bag->mu);
1311 }
1312 
1313 static void pollset_set_del_pollset_set(grpc_pollset_set* bag,
1314                                         grpc_pollset_set* item) {
1315   size_t i;
1316   gpr_mu_lock(&bag->mu);
1317   for (i = 0; i < bag->pollset_set_count; i++) {
1318     if (bag->pollset_sets[i] == item) {
1319       bag->pollset_set_count--;
1320       GPR_SWAP(grpc_pollset_set*, bag->pollset_sets[i],
1321                bag->pollset_sets[bag->pollset_set_count]);
1322       break;
1323     }
1324   }
1325   gpr_mu_unlock(&bag->mu);
1326 }
1327 
1328 static void pollset_set_add_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) {
1329   size_t i;
1330   gpr_mu_lock(&pollset_set->mu);
1331   if (pollset_set->fd_count == pollset_set->fd_capacity) {
1332     pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity);
1333     pollset_set->fds = static_cast<grpc_fd**>(
1334         gpr_realloc(pollset_set->fds,
1335                     pollset_set->fd_capacity * sizeof(*pollset_set->fds)));
1336   }
1337   GRPC_FD_REF(fd, "pollset_set");
1338   pollset_set->fds[pollset_set->fd_count++] = fd;
1339   for (i = 0; i < pollset_set->pollset_count; i++) {
1340     pollset_add_fd(pollset_set->pollsets[i], fd);
1341   }
1342   for (i = 0; i < pollset_set->pollset_set_count; i++) {
1343     pollset_set_add_fd(pollset_set->pollset_sets[i], fd);
1344   }
1345   gpr_mu_unlock(&pollset_set->mu);
1346 }
1347 
1348 static void pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) {
1349   size_t i;
1350   gpr_mu_lock(&pollset_set->mu);
1351   for (i = 0; i < pollset_set->fd_count; i++) {
1352     if (pollset_set->fds[i] == fd) {
1353       pollset_set->fd_count--;
1354       GPR_SWAP(grpc_fd*, pollset_set->fds[i],
1355                pollset_set->fds[pollset_set->fd_count]);
1356       GRPC_FD_UNREF(fd, "pollset_set");
1357       break;
1358     }
1359   }
1360   for (i = 0; i < pollset_set->pollset_set_count; i++) {
1361     pollset_set_del_fd(pollset_set->pollset_sets[i], fd);
1362   }
1363   gpr_mu_unlock(&pollset_set->mu);
1364 }
1365 
1366 /*******************************************************************************
1367  * Condition Variable polling extensions
1368  */
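/* Rough shape of this extension (a sketch based on the code below): pollfd
   entries with negative descriptors stand for condition-variable wakeup fds.
   cvfd_poll() overrides the poll function, registers a wakeup cv for those
   entries, and hands the real sockets to a cached background poller thread
   (run_poll), which signals the waiters once the underlying poll() completes. */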
1369 
1370 static void run_poll(void* args);
1371 static void cache_poller_locked(poll_args* args);
1372 static void cache_harvest_locked();
1373 
1374 static void cache_insert_locked(poll_args* args) {
1375   uint32_t key = gpr_murmur_hash3(args->fds, args->nfds * sizeof(struct pollfd),
1376                                   0xDEADBEEF);
1377   key = key % poll_cache.size;
1378   if (poll_cache.active_pollers[key]) {
1379     poll_cache.active_pollers[key]->prev = args;
1380   }
1381   args->next = poll_cache.active_pollers[key];
1382   args->prev = nullptr;
1383   poll_cache.active_pollers[key] = args;
1384   poll_cache.count++;
1385 }
1386 
1387 static void init_result(poll_args* pargs) {
1388   pargs->result = static_cast<poll_result*>(gpr_malloc(sizeof(poll_result)));
1389   gpr_ref_init(&pargs->result->refcount, 1);
1390   pargs->result->watchers = nullptr;
1391   pargs->result->watchcount = 0;
1392   pargs->result->fds = static_cast<struct pollfd*>(
1393       gpr_malloc(sizeof(struct pollfd) * pargs->nfds));
1394   memcpy(pargs->result->fds, pargs->fds, sizeof(struct pollfd) * pargs->nfds);
1395   pargs->result->nfds = pargs->nfds;
1396   pargs->result->retval = 0;
1397   pargs->result->err = 0;
1398   pargs->result->completed = 0;
1399 }
1400 
1401 // Creates a poll_args object for the given arguments to poll().
1402 // This function may instead return a matching poll_args already in the cache.
1403 static poll_args* get_poller_locked(struct pollfd* fds, nfds_t count) {
1404   uint32_t key =
1405       gpr_murmur_hash3(fds, count * sizeof(struct pollfd), 0xDEADBEEF);
1406   key = key % poll_cache.size;
1407   poll_args* curr = poll_cache.active_pollers[key];
1408   while (curr) {
1409     if (curr->nfds == count &&
1410         memcmp(curr->fds, fds, count * sizeof(struct pollfd)) == 0) {
1411       gpr_free(fds);
1412       return curr;
1413     }
1414     curr = curr->next;
1415   }
1416 
1417   if (poll_cache.free_pollers) {
1418     poll_args* pargs = poll_cache.free_pollers;
1419     poll_cache.free_pollers = pargs->next;
1420     if (poll_cache.free_pollers) {
1421       poll_cache.free_pollers->prev = nullptr;
1422     }
1423     pargs->fds = fds;
1424     pargs->nfds = count;
1425     pargs->next = nullptr;
1426     pargs->prev = nullptr;
1427     init_result(pargs);
1428     cache_poller_locked(pargs);
1429     return pargs;
1430   }
1431 
1432   poll_args* pargs =
1433       static_cast<poll_args*>(gpr_malloc(sizeof(struct poll_args)));
1434   gpr_cv_init(&pargs->trigger);
1435   gpr_cv_init(&pargs->harvest);
1436   gpr_cv_init(&pargs->join);
1437   pargs->harvestable = false;
1438   pargs->joinable = false;
1439   pargs->fds = fds;
1440   pargs->nfds = count;
1441   pargs->next = nullptr;
1442   pargs->prev = nullptr;
1443   pargs->trigger_set = 0;
1444   init_result(pargs);
1445   cache_poller_locked(pargs);
1446   gpr_ref(&g_cvfds.pollcount);
1447   pargs->poller_thd = grpc_core::Thread("grpc_poller", &run_poll, pargs);
1448   pargs->poller_thd.Start();
1449   return pargs;
1450 }
1451 
1452 static void cache_delete_locked(poll_args* args) {
1453   if (!args->prev) {
1454     uint32_t key = gpr_murmur_hash3(
1455         args->fds, args->nfds * sizeof(struct pollfd), 0xDEADBEEF);
1456     key = key % poll_cache.size;
1457     GPR_ASSERT(poll_cache.active_pollers[key] == args);
1458     poll_cache.active_pollers[key] = args->next;
1459   } else {
1460     args->prev->next = args->next;
1461   }
1462 
1463   if (args->next) {
1464     args->next->prev = args->prev;
1465   }
1466 
1467   poll_cache.count--;
1468   if (poll_cache.free_pollers) {
1469     poll_cache.free_pollers->prev = args;
1470   }
1471   args->prev = nullptr;
1472   args->next = poll_cache.free_pollers;
1473   gpr_free(args->fds);
1474   poll_cache.free_pollers = args;
1475 }
1476 
1477 static void cache_poller_locked(poll_args* args) {
1478   if (poll_cache.count + 1 > poll_cache.size / 2) {
1479     poll_args** old_active_pollers = poll_cache.active_pollers;
1480     poll_cache.size = poll_cache.size * 2;
1481     poll_cache.count = 0;
1482     poll_cache.active_pollers =
1483         static_cast<poll_args**>(gpr_malloc(sizeof(void*) * poll_cache.size));
1484     for (unsigned int i = 0; i < poll_cache.size; i++) {
1485       poll_cache.active_pollers[i] = nullptr;
1486     }
1487     for (unsigned int i = 0; i < poll_cache.size / 2; i++) {
1488       poll_args* curr = old_active_pollers[i];
1489       poll_args* next = nullptr;
1490       while (curr) {
1491         next = curr->next;
1492         cache_insert_locked(curr);
1493         curr = next;
1494       }
1495     }
1496     gpr_free(old_active_pollers);
1497   }
1498 
1499   cache_insert_locked(args);
1500 }
1501 
1502 static void cache_destroy_locked(poll_args* args) {
1503   if (args->next) {
1504     args->next->prev = args->prev;
1505   }
1506 
1507   if (args->prev) {
1508     args->prev->next = args->next;
1509   } else {
1510     poll_cache.free_pollers = args->next;
1511   }
1512 
1513   // Now move this args to the dead poller list for later join
1514   if (poll_cache.dead_pollers != nullptr) {
1515     poll_cache.dead_pollers->prev = args;
1516   }
1517   args->prev = nullptr;
1518   args->next = poll_cache.dead_pollers;
1519   poll_cache.dead_pollers = args;
1520 }
1521 
1522 static void cache_harvest_locked() {
1523   while (poll_cache.dead_pollers) {
1524     poll_args* args = poll_cache.dead_pollers;
1525     poll_cache.dead_pollers = poll_cache.dead_pollers->next;
1526     // Keep the list consistent in case new dead pollers get added when we
1527     // release the lock below to wait on joining
1528     if (poll_cache.dead_pollers) {
1529       poll_cache.dead_pollers->prev = nullptr;
1530     }
1531     args->harvestable = true;
1532     gpr_cv_signal(&args->harvest);
1533     while (!args->joinable) {
1534       gpr_cv_wait(&args->join, &g_cvfds.mu,
1535                   gpr_inf_future(GPR_CLOCK_MONOTONIC));
1536     }
1537     args->poller_thd.Join();
1538     gpr_free(args);
1539   }
1540 }
1541 
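// Drops one reference on a poll_result; the last unref frees the pollfd
// array and the result itself (no watchers may remain at that point).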
1542 static void decref_poll_result(poll_result* res) {
1543   if (gpr_unref(&res->refcount)) {
1544     GPR_ASSERT(!res->watchers);
1545     gpr_free(res->fds);
1546     gpr_free(res);
1547   }
1548 }
1549 
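// Unlinks `target` from the doubly-linked watcher list rooted at `*head`.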
1550 void remove_cvn(grpc_cv_node** head, grpc_cv_node* target) {
1551   if (target->next) {
1552     target->next->prev = target->prev;
1553   }
1554 
1555   if (target->prev) {
1556     target->prev->next = target->next;
1557   } else {
1558     *head = target->next;
1559   }
1560 }
1561 
1562 gpr_timespec thread_grace;
1563 
1564 // Poll in a background thread
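// Each poller thread loops: it calls the real poll() on its pollfd set, then,
// under g_cvfds.mu, publishes the result and, if poll() reported events or an
// error, wakes any cvfd_poll() callers watching it. Once nothing is watching
// (or the poll completed), the thread parks for `thread_grace`; if no new
// request arrives in that window (trigger_set stays 0), it moves itself to
// the dead-poller list and exits via the harvest/join handshake with
// cache_harvest_locked().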
1565 static void run_poll(void* args) {
1566   poll_args* pargs = static_cast<poll_args*>(args);
1567   while (1) {
1568     poll_result* result = pargs->result;
1569     int retval = g_cvfds.poll(result->fds, result->nfds, CV_POLL_PERIOD_MS);
1570     gpr_mu_lock(&g_cvfds.mu);
1571     cache_harvest_locked();
1572     if (retval != 0) {
1573       result->completed = 1;
1574       result->retval = retval;
1575       result->err = errno;
1576       grpc_cv_node* watcher = result->watchers;
1577       while (watcher) {
1578         gpr_cv_signal(watcher->cv);
1579         watcher = watcher->next;
1580       }
1581     }
1582     if (result->watchcount == 0 || result->completed) {
1583       cache_delete_locked(pargs);
1584       decref_poll_result(result);
1585       // Leave this polling thread alive for a grace period in case another
1586       // poll() request arrives
1587       gpr_timespec deadline = gpr_now(GPR_CLOCK_MONOTONIC);
1588       deadline = gpr_time_add(deadline, thread_grace);
1589       pargs->trigger_set = 0;
1590       gpr_cv_wait(&pargs->trigger, &g_cvfds.mu, deadline);
1591       cache_harvest_locked();
1592       if (!pargs->trigger_set) {
1593         cache_destroy_locked(pargs);
1594         break;
1595       }
1596     }
1597     gpr_mu_unlock(&g_cvfds.mu);
1598   }
1599 
1600   if (gpr_unref(&g_cvfds.pollcount)) {
1601     gpr_cv_signal(&g_cvfds.shutdown_cv);
1602   }
1603   while (!pargs->harvestable) {
1604     gpr_cv_wait(&pargs->harvest, &g_cvfds.mu,
1605                 gpr_inf_future(GPR_CLOCK_MONOTONIC));
1606   }
1607   pargs->joinable = true;
1608   gpr_cv_signal(&pargs->join);
1609   gpr_mu_unlock(&g_cvfds.mu);
1610 }
1611 
1612 // This function overrides poll() to handle condition variable wakeup fds
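// Entries with a negative fd are condition-variable wakeup fds: their slot in
// g_cvfds is recovered with GRPC_FD_TO_IDX and a watcher cv is registered on
// each. Real (non-negative) fds are copied into a separate pollfd array and
// handed to a cached background poller via get_poller_locked(); the caller
// then blocks on its own cv until a wakeup fd is set, the background poll()
// completes, or the deadline expires.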
1613 static int cvfd_poll(struct pollfd* fds, nfds_t nfds, int timeout) {
1614   if (timeout == 0) {
1615     // Don't bother using background threads for polling if the timeout is 0;
1616     // otherwise poll-cv might not wait for a poll to return.
1617     // https://github.com/grpc/grpc/issues/13298
1618     return poll(fds, nfds, 0);
1619   }
1620   unsigned int i;
1621   int res, idx;
1622   grpc_cv_node* pollcv;
1623   int skip_poll = 0;
1624   nfds_t nsockfds = 0;
1625   poll_result* result = nullptr;
1626   gpr_mu_lock(&g_cvfds.mu);
1627   cache_harvest_locked();
1628   pollcv = static_cast<grpc_cv_node*>(gpr_malloc(sizeof(grpc_cv_node)));
1629   pollcv->next = nullptr;
1630   gpr_cv pollcv_cv;
1631   gpr_cv_init(&pollcv_cv);
1632   pollcv->cv = &pollcv_cv;
1633   grpc_cv_node* fd_cvs =
1634       static_cast<grpc_cv_node*>(gpr_malloc(nfds * sizeof(grpc_cv_node)));
1635 
1636   for (i = 0; i < nfds; i++) {
1637     fds[i].revents = 0;
1638     if (fds[i].fd < 0 && (fds[i].events & POLLIN)) {
1639       idx = GRPC_FD_TO_IDX(fds[i].fd);
1640       fd_cvs[i].cv = &pollcv_cv;
1641       fd_cvs[i].prev = nullptr;
1642       fd_cvs[i].next = g_cvfds.cvfds[idx].cvs;
1643       if (g_cvfds.cvfds[idx].cvs) {
1644         g_cvfds.cvfds[idx].cvs->prev = &(fd_cvs[i]);
1645       }
1646       g_cvfds.cvfds[idx].cvs = &(fd_cvs[i]);
1647       // Don't bother polling if a wakeup fd is ready
1648       if (g_cvfds.cvfds[idx].is_set) {
1649         skip_poll = 1;
1650       }
1651     } else if (fds[i].fd >= 0) {
1652       nsockfds++;
1653     }
1654   }
1655 
1656   gpr_timespec deadline = gpr_now(GPR_CLOCK_MONOTONIC);
1657   if (timeout < 0) {
1658     deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
1659   } else {
1660     deadline =
1661         gpr_time_add(deadline, gpr_time_from_millis(timeout, GPR_TIMESPAN));
1662   }
1663 
1664   res = 0;
1665   if (!skip_poll && nsockfds > 0) {
1666     struct pollfd* pollfds = static_cast<struct pollfd*>(
1667         gpr_malloc(sizeof(struct pollfd) * nsockfds));
1668     idx = 0;
1669     for (i = 0; i < nfds; i++) {
1670       if (fds[i].fd >= 0) {
1671         pollfds[idx].fd = fds[i].fd;
1672         pollfds[idx].events = fds[i].events;
1673         pollfds[idx].revents = 0;
1674         idx++;
1675       }
1676     }
1677     poll_args* pargs = get_poller_locked(pollfds, nsockfds);
1678     result = pargs->result;
1679     pollcv->next = result->watchers;
1680     pollcv->prev = nullptr;
1681     if (result->watchers) {
1682       result->watchers->prev = pollcv;
1683     }
1684     result->watchers = pollcv;
1685     result->watchcount++;
1686     gpr_ref(&result->refcount);
1687 
1688     pargs->trigger_set = 1;
1689     gpr_cv_signal(&pargs->trigger);
1690     gpr_cv_wait(&pollcv_cv, &g_cvfds.mu, deadline);
1691     cache_harvest_locked();
1692     res = result->retval;
1693     errno = result->err;
1694     result->watchcount--;
1695     remove_cvn(&result->watchers, pollcv);
1696   } else if (!skip_poll) {
1697     gpr_cv_wait(&pollcv_cv, &g_cvfds.mu, deadline);
1698     cache_harvest_locked();
1699   }
1700 
1701   idx = 0;
1702   for (i = 0; i < nfds; i++) {
1703     if (fds[i].fd < 0 && (fds[i].events & POLLIN)) {
1704       remove_cvn(&g_cvfds.cvfds[GRPC_FD_TO_IDX(fds[i].fd)].cvs, &(fd_cvs[i]));
1705       if (g_cvfds.cvfds[GRPC_FD_TO_IDX(fds[i].fd)].is_set) {
1706         fds[i].revents = POLLIN;
1707         if (res >= 0) res++;
1708       }
1709     } else if (!skip_poll && fds[i].fd >= 0 && result->completed) {
1710       fds[i].revents = result->fds[idx].revents;
1711       idx++;
1712     }
1713   }
1714 
1715   gpr_free(fd_cvs);
1716   gpr_free(pollcv);
1717   if (result) {
1718     decref_poll_result(result);
1719   }
1720 
1721   gpr_mu_unlock(&g_cvfds.mu);
1722 
1723   return res;
1724 }
1725 
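// One-time setup for condition-variable wakeup fd support: builds the free
// list of grpc_fd_node slots, swaps grpc_poll_function out for cvfd_poll
// (stashing the original in g_cvfds.poll), and initializes the background
// poller cache with a 32-bucket table.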
1726 static void global_cv_fd_table_init() {
1727   gpr_mu_init(&g_cvfds.mu);
1728   gpr_mu_lock(&g_cvfds.mu);
1729   gpr_cv_init(&g_cvfds.shutdown_cv);
1730   gpr_ref_init(&g_cvfds.pollcount, 1);
1731   g_cvfds.size = CV_DEFAULT_TABLE_SIZE;
1732   g_cvfds.cvfds = static_cast<grpc_fd_node*>(
1733       gpr_malloc(sizeof(grpc_fd_node) * CV_DEFAULT_TABLE_SIZE));
1734   g_cvfds.free_fds = nullptr;
1735   thread_grace = gpr_time_from_millis(POLLCV_THREAD_GRACE_MS, GPR_TIMESPAN);
1736   for (int i = 0; i < CV_DEFAULT_TABLE_SIZE; i++) {
1737     g_cvfds.cvfds[i].is_set = 0;
1738     g_cvfds.cvfds[i].cvs = nullptr;
1739     g_cvfds.cvfds[i].next_free = g_cvfds.free_fds;
1740     g_cvfds.free_fds = &g_cvfds.cvfds[i];
1741   }
1742   // Override the poll function with one that supports cvfds
1743   g_cvfds.poll = grpc_poll_function;
1744   grpc_poll_function = &cvfd_poll;
1745 
1746   // Initialize the cache
1747   poll_cache.size = 32;
1748   poll_cache.count = 0;
1749   poll_cache.free_pollers = nullptr;
1750   poll_cache.active_pollers =
1751       static_cast<poll_args**>(gpr_malloc(sizeof(void*) * 32));
1752   for (unsigned int i = 0; i < poll_cache.size; i++) {
1753     poll_cache.active_pollers[i] = nullptr;
1754   }
1755   poll_cache.dead_pollers = nullptr;
1756 
1757   gpr_mu_unlock(&g_cvfds.mu);
1758 }
1759 
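// Tears down the cvfd machinery: waits up to three seconds for outstanding
// poller threads to release the pollcount reference, restores the original
// poll function, harvests any remaining dead pollers, and frees the tables.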
1760 static void global_cv_fd_table_shutdown() {
1761   gpr_mu_lock(&g_cvfds.mu);
1762   // Attempt to wait for all abandoned poll() threads to terminate;
1763   // not doing so will result in reported memory leaks.
1764   if (!gpr_unref(&g_cvfds.pollcount)) {
1765     int res = gpr_cv_wait(&g_cvfds.shutdown_cv, &g_cvfds.mu,
1766                           gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
1767                                        gpr_time_from_seconds(3, GPR_TIMESPAN)));
1768     GPR_ASSERT(res == 0);
1769   }
1770   gpr_cv_destroy(&g_cvfds.shutdown_cv);
1771   grpc_poll_function = g_cvfds.poll;
1772   gpr_free(g_cvfds.cvfds);
1773 
1774   cache_harvest_locked();
1775   gpr_free(poll_cache.active_pollers);
1776 
1777   gpr_mu_unlock(&g_cvfds.mu);
1778   gpr_mu_destroy(&g_cvfds.mu);
1779 }
1780 
1781 /*******************************************************************************
1782  * event engine binding
1783  */
1784 
1785 static void shutdown_engine(void) {
1786   pollset_global_shutdown();
1787   if (grpc_cv_wakeup_fds_enabled()) {
1788     global_cv_fd_table_shutdown();
1789   }
1790   if (track_fds_for_fork) {
1791     gpr_mu_destroy(&fork_fd_list_mu);
1792     grpc_core::Fork::SetResetChildPollingEngineFunc(nullptr);
1793   }
1794 }
1795 
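// Wires this poll()-based implementation into the iomgr event engine
// interface: fd lifecycle and notification hooks, pollset operations,
// pollset_set operations, and engine shutdown. The first two entries are the
// pollset size and a boolean capability flag (presumably can_track_err,
// which this engine leaves false).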
1796 static const grpc_event_engine_vtable vtable = {
1797     sizeof(grpc_pollset),
1798     false,
1799 
1800     fd_create,
1801     fd_wrapped_fd,
1802     fd_orphan,
1803     fd_shutdown,
1804     fd_notify_on_read,
1805     fd_notify_on_write,
1806     fd_notify_on_error,
1807     fd_set_readable,
1808     fd_set_writable,
1809     fd_set_error,
1810     fd_is_shutdown,
1811 
1812     pollset_init,
1813     pollset_shutdown,
1814     pollset_destroy,
1815     pollset_work,
1816     pollset_kick,
1817     pollset_add_fd,
1818 
1819     pollset_set_create,
1820     pollset_set_destroy,
1821     pollset_set_add_pollset,
1822     pollset_set_del_pollset,
1823     pollset_set_add_pollset_set,
1824     pollset_set_del_pollset_set,
1825     pollset_set_add_fd,
1826     pollset_set_del_fd,
1827 
1828     shutdown_engine,
1829 };
1830 
1831 /* Called by the child process's post-fork handler to close open fds, including
1832  * worker wakeup fds. This allows gRPC to shut down in the child process without
1833  * interfering with connections or RPCs ongoing in the parent. */
1834 static void reset_event_manager_on_fork() {
1835   gpr_mu_lock(&fork_fd_list_mu);
1836   while (fork_fd_list_head != nullptr) {
1837     if (fork_fd_list_head->fd != nullptr) {
1838       close(fork_fd_list_head->fd->fd);
1839       fork_fd_list_head->fd->fd = -1;
1840     } else {
1841       close(fork_fd_list_head->cached_wakeup_fd->fd.read_fd);
1842       fork_fd_list_head->cached_wakeup_fd->fd.read_fd = -1;
1843       close(fork_fd_list_head->cached_wakeup_fd->fd.write_fd);
1844       fork_fd_list_head->cached_wakeup_fd->fd.write_fd = -1;
1845     }
1846     fork_fd_list_head = fork_fd_list_head->next;
1847   }
1848   gpr_mu_unlock(&fork_fd_list_mu);
1849 }
1850 
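// Initializes the poll engine and returns its vtable. Fails (returns nullptr)
// if no wakeup-fd mechanism is available or global pollset initialization
// fails, and registers the post-fork reset hook when fork support is enabled.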
1851 const grpc_event_engine_vtable* grpc_init_poll_posix(bool explicit_request) {
1852   if (!grpc_has_wakeup_fd()) {
1853     gpr_log(GPR_ERROR, "Skipping poll engine: no wakeup fd available.");
1854     return nullptr;
1855   }
1856   if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
1857     return nullptr;
1858   }
1859   if (grpc_core::Fork::Enabled()) {
1860     track_fds_for_fork = true;
1861     gpr_mu_init(&fork_fd_list_mu);
1862     grpc_core::Fork::SetResetChildPollingEngineFunc(
1863         reset_event_manager_on_fork);
1864   }
1865   return &vtable;
1866 }
1867 
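// Variant of the poll engine that uses condition-variable wakeup fds instead
// of real wakeup fds: it installs the cvfd poll override before initializing
// the pollsets, and rolls both back if that initialization fails.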
1868 const grpc_event_engine_vtable* grpc_init_poll_cv_posix(bool explicit_request) {
1869   global_cv_fd_table_init();
1870   grpc_enable_cv_wakeup_fds(1);
1871   if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
1872     global_cv_fd_table_shutdown();
1873     grpc_enable_cv_wakeup_fds(0);
1874     return nullptr;
1875   }
1876   return &vtable;
1877 }
1878 
1879 #endif /* GRPC_POSIX_SOCKET_EV_POLL */
1880