1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/iomgr/port.h"
22 
23 #ifdef GRPC_POSIX_SOCKET_EV_POLL
24 
25 #include "src/core/lib/iomgr/ev_poll_posix.h"
26 
27 #include <assert.h>
28 #include <errno.h>
29 #include <limits.h>
30 #include <poll.h>
31 #include <string.h>
32 #include <sys/socket.h>
33 #include <unistd.h>
34 
35 #include <string>
36 
37 #include "absl/strings/str_cat.h"
38 
39 #include <grpc/support/alloc.h>
40 #include <grpc/support/log.h>
41 
42 #include "src/core/lib/debug/stats.h"
43 #include "src/core/lib/gpr/murmur_hash.h"
44 #include "src/core/lib/gpr/tls.h"
45 #include "src/core/lib/gpr/useful.h"
46 #include "src/core/lib/gprpp/thd.h"
47 #include "src/core/lib/iomgr/block_annotate.h"
48 #include "src/core/lib/iomgr/iomgr_internal.h"
49 #include "src/core/lib/iomgr/wakeup_fd_posix.h"
50 #include "src/core/lib/profiling/timers.h"
51 
52 #define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker*)1)
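/* Passing this sentinel as the specific_worker argument to pollset_kick /
   pollset_kick_ext wakes every worker currently polling on the pollset rather
   than a single one (see pollset_kick_ext below). */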
53 
54 /*******************************************************************************
55  * FD declarations
56  */
57 typedef struct grpc_fd_watcher {
58   struct grpc_fd_watcher* next;
59   struct grpc_fd_watcher* prev;
60   grpc_pollset* pollset;
61   grpc_pollset_worker* worker;
62   grpc_fd* fd;
63 } grpc_fd_watcher;
64 
65 typedef struct grpc_cached_wakeup_fd grpc_cached_wakeup_fd;
66 
67 /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
68 struct grpc_fork_fd_list {
69   /* Only one of fd or cached_wakeup_fd will be set. The unused field will be
70   set to nullptr. */
71   grpc_fd* fd;
72   grpc_cached_wakeup_fd* cached_wakeup_fd;
73 
74   grpc_fork_fd_list* next;
75   grpc_fork_fd_list* prev;
76 };
77 
78 struct grpc_fd {
79   int fd;
80   /* refst format:
81      bit0:   1=active/0=orphaned
82      bit1-n: refcount
83      meaning that mostly we ref by two to avoid altering the orphaned bit,
84      and just unref by 1 when we're ready to flag the object as orphaned */
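  /* Worked example (see fd_create / ref_by / unref_by / fd_orphan below): a
     freshly created fd has refst == 1 (active, zero refs); GRPC_FD_REF adds 2
     (refst == 3); fd_orphan REF_BYs 1 to clear the active bit (4) and then
     UNREF_BYs 2 (2); the final GRPC_FD_UNREF drops refst to 0, at which point
     unref_by frees the fd. */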
85   gpr_atm refst;
86 
87   gpr_mu mu;
88   int shutdown;
89   int closed;
90   int released;
91   gpr_atm pollhup;
92   grpc_error* shutdown_error;
93 
94   /* The watcher list.
95 
96      The following watcher related fields are protected by watcher_mu.
97 
98      An fd_watcher is an ephemeral object created when an fd wants to
99      begin polling, and destroyed after the poll.
100 
101      It records whether the watcher should poll for reads, writes, both,
102      or neither on this fd.
103 
104      If a watcher is asked to poll for reads or writes, the read_watcher
105      or write_watcher fields are set respectively. A watcher may be asked
106      to poll for both, in which case both fields will be set.
107 
108      read_watcher and write_watcher may be NULL if no watcher has been
109      asked to poll for reads or writes.
110 
111      If an fd_watcher is not asked to poll for reads or writes, it's added
112      to a linked list of inactive watchers, rooted at inactive_watcher_root.
113      If a poller is later needed for this fd, one of the inactive watchers'
114      pollers can be kicked out of its poll loop to take on that
115      responsibility. */
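  /* Note: inactive_watcher_root is a sentinel; the inactive watcher list is a
     doubly-linked circular list, so an empty list has
     next == prev == &inactive_watcher_root (see fd_create / has_watchers). */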
116   grpc_fd_watcher inactive_watcher_root;
117   grpc_fd_watcher* read_watcher;
118   grpc_fd_watcher* write_watcher;
119 
120   grpc_closure* read_closure;
121   grpc_closure* write_closure;
122 
123   grpc_closure* on_done_closure;
124 
125   grpc_iomgr_object iomgr_object;
126 
127   /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
128   grpc_fork_fd_list* fork_fd_list;
129 };
130 
131 /* True when GRPC_ENABLE_FORK_SUPPORT=1. */
132 static bool track_fds_for_fork = false;
133 
134 /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
135 static grpc_fork_fd_list* fork_fd_list_head = nullptr;
136 static gpr_mu fork_fd_list_mu;
137 
138 /* Begin polling on an fd.
139    Registers that the given pollset is interested in this fd - so that if read
140    or writability interest changes, the pollset can be kicked to pick up that
141    new interest.
142    Return value is:
143      (fd_needs_read? read_mask : 0) | (fd_needs_write? write_mask : 0)
144    i.e. a combination of read_mask and write_mask determined by the fd's current
145    interest in said events.
146    Polling strategies that do not need to alter their behavior depending on the
147    fd's current interest (such as epoll) do not need to call this function.
148    MUST NOT be called with a pollset lock taken */
149 static uint32_t fd_begin_poll(grpc_fd* fd, grpc_pollset* pollset,
150                               grpc_pollset_worker* worker, uint32_t read_mask,
151                               uint32_t write_mask, grpc_fd_watcher* watcher);
152 /* Complete polling previously started with fd_begin_poll
153    MUST NOT be called with a pollset lock taken
154    if got_read or got_write are 1, also does the become_{readable,writable} as
155    appropriate. */
156 static void fd_end_poll(grpc_fd_watcher* watcher, int got_read, int got_write);
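/* Typical usage (see pollset_work below): fd_begin_poll() is called for each
   fd before the poll() syscall to decide which events to request, and
   fd_end_poll() is called for every watcher once poll() returns, passing the
   observed read/write readiness. */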
157 
158 /* Return true if this fd is orphaned, false otherwise */
159 static bool fd_is_orphaned(grpc_fd* fd);
160 
161 #ifndef NDEBUG
162 static void fd_ref(grpc_fd* fd, const char* reason, const char* file, int line);
163 static void fd_unref(grpc_fd* fd, const char* reason, const char* file,
164                      int line);
165 #define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
166 #define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
167 #else
168 static void fd_ref(grpc_fd* fd);
169 static void fd_unref(grpc_fd* fd);
170 #define GRPC_FD_REF(fd, reason) fd_ref(fd)
171 #define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
172 #endif
173 
174 #define CLOSURE_NOT_READY ((grpc_closure*)0)
175 #define CLOSURE_READY ((grpc_closure*)1)
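/* fd->read_closure and fd->write_closure each hold one of three states: the
   CLOSURE_NOT_READY sentinel (no event pending, nobody waiting), the
   CLOSURE_READY sentinel (event arrived, nobody waiting yet), or an actual
   grpc_closure* registered via notify_on_locked, to be scheduled when the
   event arrives (see notify_on_locked / set_ready_locked). */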
176 
177 /*******************************************************************************
178  * pollset declarations
179  */
180 
181 typedef struct grpc_cached_wakeup_fd {
182   grpc_wakeup_fd fd;
183   struct grpc_cached_wakeup_fd* next;
184 
185   /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
186   grpc_fork_fd_list* fork_fd_list;
187 } grpc_cached_wakeup_fd;
188 
189 struct grpc_pollset_worker {
190   grpc_cached_wakeup_fd* wakeup_fd;
191   int reevaluate_polling_on_wakeup;
192   int kicked_specifically;
193   struct grpc_pollset_worker* next;
194   struct grpc_pollset_worker* prev;
195 };
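/* Workers form a doubly-linked list rooted at grpc_pollset::root_worker; each
   worker borrows a wakeup fd from the pollset's local_wakeup_cache (or creates
   one) for the duration of a pollset_work() call. */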
196 
197 struct grpc_pollset {
198   gpr_mu mu;
199   grpc_pollset_worker root_worker;
200   int shutting_down;
201   int called_shutdown;
202   int kicked_without_pollers;
203   grpc_closure* shutdown_done;
204   int pollset_set_count;
205   /* all polled fds */
206   size_t fd_count;
207   size_t fd_capacity;
208   grpc_fd** fds;
209   /* Local cache of eventfds for workers */
210   grpc_cached_wakeup_fd* local_wakeup_cache;
211 };
212 
213 /* Add an fd to a pollset */
214 static void pollset_add_fd(grpc_pollset* pollset, struct grpc_fd* fd);
215 
216 static void pollset_set_add_fd(grpc_pollset_set* pollset_set, grpc_fd* fd);
217 
218 /* Convert a poll deadline (grpc_millis) to a poll() timeout in milliseconds:
219    - very small or negative poll times are clamped to zero to do a
220      non-blocking poll (which becomes spin polling)
221    - other small values are rounded up to one millisecond
222    - longer than a millisecond polls are rounded up to the next nearest
223      millisecond to avoid spinning
224    - infinite timeouts are converted to -1 */
225 static int poll_deadline_to_millis_timeout(grpc_millis deadline);
226 
227 /* Allow kick to wakeup the currently polling worker */
228 #define GRPC_POLLSET_CAN_KICK_SELF 1
229 /* Force the wakee to repoll when awoken */
230 #define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP 2
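/* GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP is used by pollset_kick_locked
   (invoked when an fd's read/write interest changes) so that the woken worker
   rebuilds its pollfd set and polls again instead of returning. */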
231 /* As per pollset_kick, with an extended set of flags (defined above)
232    -- mostly for fd_posix's use. */
233 static grpc_error* pollset_kick_ext(grpc_pollset* p,
234                                     grpc_pollset_worker* specific_worker,
235                                     uint32_t flags) GRPC_MUST_USE_RESULT;
236 
237 /* Return true if the pollset has active threads in pollset_work (pollset
238  * must be locked) */
239 static bool pollset_has_workers(grpc_pollset* pollset);
240 
241 /*******************************************************************************
242  * pollset_set definitions
243  */
244 
245 struct grpc_pollset_set {
246   gpr_mu mu;
247 
248   size_t pollset_count;
249   size_t pollset_capacity;
250   grpc_pollset** pollsets;
251 
252   size_t pollset_set_count;
253   size_t pollset_set_capacity;
254   struct grpc_pollset_set** pollset_sets;
255 
256   size_t fd_count;
257   size_t fd_capacity;
258   grpc_fd** fds;
259 };
260 
261 /*******************************************************************************
262  * functions to track opened fds. No-ops unless track_fds_for_fork is true.
263  */
264 
265 static void fork_fd_list_remove_node(grpc_fork_fd_list* node) {
266   if (track_fds_for_fork) {
267     gpr_mu_lock(&fork_fd_list_mu);
268     if (fork_fd_list_head == node) {
269       fork_fd_list_head = node->next;
270     }
271     if (node->prev != nullptr) {
272       node->prev->next = node->next;
273     }
274     if (node->next != nullptr) {
275       node->next->prev = node->prev;
276     }
277     gpr_free(node);
278     gpr_mu_unlock(&fork_fd_list_mu);
279   }
280 }
281 
282 static void fork_fd_list_add_node(grpc_fork_fd_list* node) {
283   gpr_mu_lock(&fork_fd_list_mu);
284   node->next = fork_fd_list_head;
285   node->prev = nullptr;
286   if (fork_fd_list_head != nullptr) {
287     fork_fd_list_head->prev = node;
288   }
289   fork_fd_list_head = node;
290   gpr_mu_unlock(&fork_fd_list_mu);
291 }
292 
293 static void fork_fd_list_add_grpc_fd(grpc_fd* fd) {
294   if (track_fds_for_fork) {
295     fd->fork_fd_list =
296         static_cast<grpc_fork_fd_list*>(gpr_malloc(sizeof(grpc_fork_fd_list)));
297     fd->fork_fd_list->fd = fd;
298     fd->fork_fd_list->cached_wakeup_fd = nullptr;
299     fork_fd_list_add_node(fd->fork_fd_list);
300   }
301 }
302 
303 static void fork_fd_list_add_wakeup_fd(grpc_cached_wakeup_fd* fd) {
304   if (track_fds_for_fork) {
305     fd->fork_fd_list =
306         static_cast<grpc_fork_fd_list*>(gpr_malloc(sizeof(grpc_fork_fd_list)));
307     fd->fork_fd_list->cached_wakeup_fd = fd;
308     fd->fork_fd_list->fd = nullptr;
309     fork_fd_list_add_node(fd->fork_fd_list);
310   }
311 }
312 
313 /*******************************************************************************
314  * fd_posix.c
315  */
316 
317 #ifndef NDEBUG
318 #define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
319 #define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
320 static void ref_by(grpc_fd* fd, int n, const char* reason, const char* file,
321                    int line) {
322   if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_fd_refcount)) {
323     gpr_log(GPR_DEBUG,
324             "FD %d %p   ref %d %" PRIdPTR " -> %" PRIdPTR " [%s; %s:%d]",
325             fd->fd, fd, n, gpr_atm_no_barrier_load(&fd->refst),
326             gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
327   }
328 #else
329 #define REF_BY(fd, n, reason) \
330   do {                        \
331     ref_by(fd, n);            \
332     (void)(reason);           \
333   } while (0)
334 #define UNREF_BY(fd, n, reason) \
335   do {                          \
336     unref_by(fd, n);            \
337     (void)(reason);             \
338   } while (0)
339 static void ref_by(grpc_fd* fd, int n) {
340 #endif
341   GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
342 }
343 
344 #ifndef NDEBUG
345 static void unref_by(grpc_fd* fd, int n, const char* reason, const char* file,
346                      int line) {
347   if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_fd_refcount)) {
348     gpr_log(GPR_DEBUG,
349             "FD %d %p unref %d %" PRIdPTR " -> %" PRIdPTR " [%s; %s:%d]",
350             fd->fd, fd, n, gpr_atm_no_barrier_load(&fd->refst),
351             gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
352   }
353 #else
354 static void unref_by(grpc_fd* fd, int n) {
355 #endif
356   gpr_atm old = gpr_atm_full_fetch_add(&fd->refst, -n);
357   if (old == n) {
358     gpr_mu_destroy(&fd->mu);
359     grpc_iomgr_unregister_object(&fd->iomgr_object);
360     fork_fd_list_remove_node(fd->fork_fd_list);
361     if (fd->shutdown) GRPC_ERROR_UNREF(fd->shutdown_error);
362     gpr_free(fd);
363   } else {
364     GPR_ASSERT(old > n);
365   }
366 }
367 
368 static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
369   // Avoid unused-parameter warning for debug-only parameter
370   (void)track_err;
371   GPR_DEBUG_ASSERT(track_err == false);
372   grpc_fd* r = static_cast<grpc_fd*>(gpr_malloc(sizeof(*r)));
373   gpr_mu_init(&r->mu);
374   gpr_atm_rel_store(&r->refst, 1);
375   r->shutdown = 0;
376   r->read_closure = CLOSURE_NOT_READY;
377   r->write_closure = CLOSURE_NOT_READY;
378   r->fd = fd;
379   r->inactive_watcher_root.next = r->inactive_watcher_root.prev =
380       &r->inactive_watcher_root;
381   r->read_watcher = r->write_watcher = nullptr;
382   r->on_done_closure = nullptr;
383   r->closed = 0;
384   r->released = 0;
385   gpr_atm_no_barrier_store(&r->pollhup, 0);
386 
387   std::string name2 = absl::StrCat(name, " fd=", fd);
388   grpc_iomgr_register_object(&r->iomgr_object, name2.c_str());
389   fork_fd_list_add_grpc_fd(r);
390   return r;
391 }
392 
393 static bool fd_is_orphaned(grpc_fd* fd) {
394   return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
395 }
396 
397 static grpc_error* pollset_kick_locked(grpc_fd_watcher* watcher) {
398   gpr_mu_lock(&watcher->pollset->mu);
399   GPR_ASSERT(watcher->worker);
400   grpc_error* err = pollset_kick_ext(watcher->pollset, watcher->worker,
401                                      GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP);
402   gpr_mu_unlock(&watcher->pollset->mu);
403   return err;
404 }
405 
406 static void maybe_wake_one_watcher_locked(grpc_fd* fd) {
407   if (fd->inactive_watcher_root.next != &fd->inactive_watcher_root) {
408     pollset_kick_locked(fd->inactive_watcher_root.next);
409   } else if (fd->read_watcher) {
410     pollset_kick_locked(fd->read_watcher);
411   } else if (fd->write_watcher) {
412     pollset_kick_locked(fd->write_watcher);
413   }
414 }
415 
416 static void wake_all_watchers_locked(grpc_fd* fd) {
417   grpc_fd_watcher* watcher;
418   for (watcher = fd->inactive_watcher_root.next;
419        watcher != &fd->inactive_watcher_root; watcher = watcher->next) {
420     pollset_kick_locked(watcher);
421   }
422   if (fd->read_watcher) {
423     pollset_kick_locked(fd->read_watcher);
424   }
425   if (fd->write_watcher && fd->write_watcher != fd->read_watcher) {
426     pollset_kick_locked(fd->write_watcher);
427   }
428 }
429 
430 static int has_watchers(grpc_fd* fd) {
431   return fd->read_watcher != nullptr || fd->write_watcher != nullptr ||
432          fd->inactive_watcher_root.next != &fd->inactive_watcher_root;
433 }
434 
435 static void close_fd_locked(grpc_fd* fd) {
436   fd->closed = 1;
437   if (!fd->released) {
438     close(fd->fd);
439   }
440   grpc_core::ExecCtx::Run(DEBUG_LOCATION, fd->on_done_closure, GRPC_ERROR_NONE);
441 }
442 
443 static int fd_wrapped_fd(grpc_fd* fd) {
444   if (fd->released || fd->closed) {
445     return -1;
446   } else {
447     return fd->fd;
448   }
449 }
450 
451 static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
452                       const char* reason) {
453   fd->on_done_closure = on_done;
454   fd->released = release_fd != nullptr;
455   if (release_fd != nullptr) {
456     *release_fd = fd->fd;
457     fd->released = true;
458   }
459   gpr_mu_lock(&fd->mu);
460   REF_BY(fd, 1, reason); /* remove active status, but keep referenced */
461   if (!has_watchers(fd)) {
462     close_fd_locked(fd);
463   } else {
464     wake_all_watchers_locked(fd);
465   }
466   gpr_mu_unlock(&fd->mu);
467   UNREF_BY(fd, 2, reason); /* drop the reference */
468 }
469 
470 /* increment refcount by two to avoid changing the orphan bit */
471 #ifndef NDEBUG
472 static void fd_ref(grpc_fd* fd, const char* reason, const char* file,
473                    int line) {
474   ref_by(fd, 2, reason, file, line);
475 }
476 
477 static void fd_unref(grpc_fd* fd, const char* reason, const char* file,
478                      int line) {
479   unref_by(fd, 2, reason, file, line);
480 }
481 #else
482 static void fd_ref(grpc_fd* fd) { ref_by(fd, 2); }
483 
484 static void fd_unref(grpc_fd* fd) { unref_by(fd, 2); }
485 #endif
486 
487 static grpc_error* fd_shutdown_error(grpc_fd* fd) {
488   if (!fd->shutdown) {
489     return GRPC_ERROR_NONE;
490   } else {
491     return grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
492                                   "FD shutdown", &fd->shutdown_error, 1),
493                               GRPC_ERROR_INT_GRPC_STATUS,
494                               GRPC_STATUS_UNAVAILABLE);
495   }
496 }
497 
498 static void notify_on_locked(grpc_fd* fd, grpc_closure** st,
499                              grpc_closure* closure) {
500   if (fd->shutdown || gpr_atm_no_barrier_load(&fd->pollhup)) {
501     grpc_core::ExecCtx::Run(
502         DEBUG_LOCATION, closure,
503         grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("FD shutdown"),
504                            GRPC_ERROR_INT_GRPC_STATUS,
505                            GRPC_STATUS_UNAVAILABLE));
506   } else if (*st == CLOSURE_NOT_READY) {
507     /* not ready ==> switch to a waiting state by setting the closure */
508     *st = closure;
509   } else if (*st == CLOSURE_READY) {
510     /* already ready ==> queue the closure to run immediately */
511     *st = CLOSURE_NOT_READY;
512     grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, fd_shutdown_error(fd));
513     maybe_wake_one_watcher_locked(fd);
514   } else {
515     /* upcallptr was set to a different closure.  This is an error! */
516     gpr_log(GPR_ERROR,
517             "User called a notify_on function with a previous callback still "
518             "pending");
519     abort();
520   }
521 }
522 
523 /* returns 1 if state becomes not ready */
524 static int set_ready_locked(grpc_fd* fd, grpc_closure** st) {
525   if (*st == CLOSURE_READY) {
526     /* duplicate ready ==> ignore */
527     return 0;
528   } else if (*st == CLOSURE_NOT_READY) {
529     /* not ready, and not waiting ==> flag ready */
530     *st = CLOSURE_READY;
531     return 0;
532   } else {
533     /* waiting ==> queue closure */
534     grpc_core::ExecCtx::Run(DEBUG_LOCATION, *st, fd_shutdown_error(fd));
535     *st = CLOSURE_NOT_READY;
536     return 1;
537   }
538 }
539 
540 static void fd_shutdown(grpc_fd* fd, grpc_error* why) {
541   gpr_mu_lock(&fd->mu);
542   /* only shutdown once */
543   if (!fd->shutdown) {
544     fd->shutdown = 1;
545     fd->shutdown_error = why;
546     /* signal read/write closed to OS so that future operations fail */
547     shutdown(fd->fd, SHUT_RDWR);
548     set_ready_locked(fd, &fd->read_closure);
549     set_ready_locked(fd, &fd->write_closure);
550   } else {
551     GRPC_ERROR_UNREF(why);
552   }
553   gpr_mu_unlock(&fd->mu);
554 }
555 
556 static bool fd_is_shutdown(grpc_fd* fd) {
557   gpr_mu_lock(&fd->mu);
558   bool r = fd->shutdown;
559   gpr_mu_unlock(&fd->mu);
560   return r;
561 }
562 
563 static void fd_notify_on_read(grpc_fd* fd, grpc_closure* closure) {
564   gpr_mu_lock(&fd->mu);
565   notify_on_locked(fd, &fd->read_closure, closure);
566   gpr_mu_unlock(&fd->mu);
567 }
568 
569 static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
570   gpr_mu_lock(&fd->mu);
571   notify_on_locked(fd, &fd->write_closure, closure);
572   gpr_mu_unlock(&fd->mu);
573 }
574 
575 static void fd_notify_on_error(grpc_fd* /*fd*/, grpc_closure* closure) {
576   if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
577     gpr_log(GPR_ERROR, "Polling engine does not support tracking errors.");
578   }
579   grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_CANCELLED);
580 }
581 
582 static void fd_set_readable(grpc_fd* fd) {
583   gpr_mu_lock(&fd->mu);
584   set_ready_locked(fd, &fd->read_closure);
585   gpr_mu_unlock(&fd->mu);
586 }
587 
588 static void fd_set_writable(grpc_fd* fd) {
589   gpr_mu_lock(&fd->mu);
590   set_ready_locked(fd, &fd->write_closure);
591   gpr_mu_unlock(&fd->mu);
592 }
593 
594 static void fd_set_error(grpc_fd* /*fd*/) {
595   if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
596     gpr_log(GPR_ERROR, "Polling engine does not support tracking errors.");
597   }
598 }
599 
600 static uint32_t fd_begin_poll(grpc_fd* fd, grpc_pollset* pollset,
601                               grpc_pollset_worker* worker, uint32_t read_mask,
602                               uint32_t write_mask, grpc_fd_watcher* watcher) {
603   uint32_t mask = 0;
604   grpc_closure* cur;
605   int requested;
606   /* keep track of pollers that have requested our events, in case they change
607    */
608   GRPC_FD_REF(fd, "poll");
609 
610   gpr_mu_lock(&fd->mu);
611 
612   /* if we are shutdown, then don't add to the watcher set */
613   if (fd->shutdown) {
614     watcher->fd = nullptr;
615     watcher->pollset = nullptr;
616     watcher->worker = nullptr;
617     gpr_mu_unlock(&fd->mu);
618     GRPC_FD_UNREF(fd, "poll");
619     return 0;
620   }
621 
622   /* if there is nobody polling for read, but we need to, then start doing so */
623   cur = fd->read_closure;
624   requested = cur != CLOSURE_READY;
625   if (read_mask && fd->read_watcher == nullptr && requested) {
626     fd->read_watcher = watcher;
627     mask |= read_mask;
628   }
629   /* if there is nobody polling for write, but we need to, then start doing so
630    */
631   cur = fd->write_closure;
632   requested = cur != CLOSURE_READY;
633   if (write_mask && fd->write_watcher == nullptr && requested) {
634     fd->write_watcher = watcher;
635     mask |= write_mask;
636   }
637   /* if not polling, remember this watcher in case we need someone to later */
638   if (mask == 0 && worker != nullptr) {
639     watcher->next = &fd->inactive_watcher_root;
640     watcher->prev = watcher->next->prev;
641     watcher->next->prev = watcher->prev->next = watcher;
642   }
643   watcher->pollset = pollset;
644   watcher->worker = worker;
645   watcher->fd = fd;
646   gpr_mu_unlock(&fd->mu);
647 
648   return mask;
649 }
650 
651 static void fd_end_poll(grpc_fd_watcher* watcher, int got_read, int got_write) {
652   int was_polling = 0;
653   int kick = 0;
654   grpc_fd* fd = watcher->fd;
655 
656   if (fd == nullptr) {
657     return;
658   }
659 
660   gpr_mu_lock(&fd->mu);
661 
662   if (watcher == fd->read_watcher) {
663     /* remove read watcher, kick if we still need a read */
664     was_polling = 1;
665     if (!got_read) {
666       kick = 1;
667     }
668     fd->read_watcher = nullptr;
669   }
670   if (watcher == fd->write_watcher) {
671     /* remove write watcher, kick if we still need a write */
672     was_polling = 1;
673     if (!got_write) {
674       kick = 1;
675     }
676     fd->write_watcher = nullptr;
677   }
678   if (!was_polling && watcher->worker != nullptr) {
679     /* remove from inactive list */
680     watcher->next->prev = watcher->prev;
681     watcher->prev->next = watcher->next;
682   }
683   if (got_read) {
684     if (set_ready_locked(fd, &fd->read_closure)) {
685       kick = 1;
686     }
687   }
688   if (got_write) {
689     if (set_ready_locked(fd, &fd->write_closure)) {
690       kick = 1;
691     }
692   }
693   if (kick) {
694     maybe_wake_one_watcher_locked(fd);
695   }
696   if (fd_is_orphaned(fd) && !has_watchers(fd) && !fd->closed) {
697     close_fd_locked(fd);
698   }
699   gpr_mu_unlock(&fd->mu);
700 
701   GRPC_FD_UNREF(fd, "poll");
702 }
703 
704 /*******************************************************************************
705  * pollset_posix.c
706  */
707 
708 GPR_TLS_DECL(g_current_thread_poller);
709 GPR_TLS_DECL(g_current_thread_worker);
710 
711 static void remove_worker(grpc_pollset* /*p*/, grpc_pollset_worker* worker) {
712   worker->prev->next = worker->next;
713   worker->next->prev = worker->prev;
714 }
715 
716 static bool pollset_has_workers(grpc_pollset* p) {
717   return p->root_worker.next != &p->root_worker;
718 }
719 
720 static bool pollset_in_pollset_sets(grpc_pollset* p) {
721   return p->pollset_set_count;
722 }
723 
724 static bool pollset_has_observers(grpc_pollset* p) {
725   return pollset_has_workers(p) || pollset_in_pollset_sets(p);
726 }
727 
728 static grpc_pollset_worker* pop_front_worker(grpc_pollset* p) {
729   if (pollset_has_workers(p)) {
730     grpc_pollset_worker* w = p->root_worker.next;
731     remove_worker(p, w);
732     return w;
733   } else {
734     return nullptr;
735   }
736 }
737 
738 static void push_back_worker(grpc_pollset* p, grpc_pollset_worker* worker) {
739   worker->next = &p->root_worker;
740   worker->prev = worker->next->prev;
741   worker->prev->next = worker->next->prev = worker;
742 }
743 
744 static void push_front_worker(grpc_pollset* p, grpc_pollset_worker* worker) {
745   worker->prev = &p->root_worker;
746   worker->next = worker->prev->next;
747   worker->prev->next = worker->next->prev = worker;
748 }
749 
750 static void kick_append_error(grpc_error** composite, grpc_error* error) {
751   if (error == GRPC_ERROR_NONE) return;
752   if (*composite == GRPC_ERROR_NONE) {
753     *composite = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Kick Failure");
754   }
755   *composite = grpc_error_add_child(*composite, error);
756 }
757 
758 static grpc_error* pollset_kick_ext(grpc_pollset* p,
759                                     grpc_pollset_worker* specific_worker,
760                                     uint32_t flags) {
761   GPR_TIMER_SCOPE("pollset_kick_ext", 0);
762   grpc_error* error = GRPC_ERROR_NONE;
763   GRPC_STATS_INC_POLLSET_KICK();
764 
765   /* pollset->mu already held */
766   if (specific_worker != nullptr) {
767     if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
768       GPR_TIMER_SCOPE("pollset_kick_ext.broadcast", 0);
769       GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
770       for (specific_worker = p->root_worker.next;
771            specific_worker != &p->root_worker;
772            specific_worker = specific_worker->next) {
773         kick_append_error(
774             &error, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
775       }
776       p->kicked_without_pollers = true;
777     } else if (gpr_tls_get(&g_current_thread_worker) !=
778                reinterpret_cast<intptr_t>(specific_worker)) {
779       GPR_TIMER_MARK("different_thread_worker", 0);
780       if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
781         specific_worker->reevaluate_polling_on_wakeup = true;
782       }
783       specific_worker->kicked_specifically = true;
784       kick_append_error(&error,
785                         grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
786     } else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) {
787       GPR_TIMER_MARK("kick_yoself", 0);
788       if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
789         specific_worker->reevaluate_polling_on_wakeup = true;
790       }
791       specific_worker->kicked_specifically = true;
792       kick_append_error(&error,
793                         grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
794     }
795   } else if (gpr_tls_get(&g_current_thread_poller) !=
796              reinterpret_cast<intptr_t>(p)) {
797     GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
798     GPR_TIMER_MARK("kick_anonymous", 0);
799     specific_worker = pop_front_worker(p);
800     if (specific_worker != nullptr) {
801       if (gpr_tls_get(&g_current_thread_worker) ==
802           reinterpret_cast<intptr_t>(specific_worker)) {
803         GPR_TIMER_MARK("kick_anonymous_not_self", 0);
804         push_back_worker(p, specific_worker);
805         specific_worker = pop_front_worker(p);
806         if ((flags & GRPC_POLLSET_CAN_KICK_SELF) == 0 &&
807             gpr_tls_get(&g_current_thread_worker) ==
808                 reinterpret_cast<intptr_t>(specific_worker)) {
809           push_back_worker(p, specific_worker);
810           specific_worker = nullptr;
811         }
812       }
813       if (specific_worker != nullptr) {
814         GPR_TIMER_MARK("finally_kick", 0);
815         push_back_worker(p, specific_worker);
816         kick_append_error(
817             &error, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
818       }
819     } else {
820       GPR_TIMER_MARK("kicked_no_pollers", 0);
821       p->kicked_without_pollers = true;
822     }
823   }
824 
825   GRPC_LOG_IF_ERROR("pollset_kick_ext", GRPC_ERROR_REF(error));
826   return error;
827 }
828 
829 static grpc_error* pollset_kick(grpc_pollset* p,
830                                 grpc_pollset_worker* specific_worker) {
831   return pollset_kick_ext(p, specific_worker, 0);
832 }
833 
834 /* global state management */
835 
836 static grpc_error* pollset_global_init(void) {
837   gpr_tls_init(&g_current_thread_poller);
838   gpr_tls_init(&g_current_thread_worker);
839   return GRPC_ERROR_NONE;
840 }
841 
842 static void pollset_global_shutdown(void) {
843   gpr_tls_destroy(&g_current_thread_poller);
844   gpr_tls_destroy(&g_current_thread_worker);
845 }
846 
847 /* main interface */
848 
849 static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
850   gpr_mu_init(&pollset->mu);
851   *mu = &pollset->mu;
852   pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
853   pollset->shutting_down = 0;
854   pollset->called_shutdown = 0;
855   pollset->kicked_without_pollers = 0;
856   pollset->local_wakeup_cache = nullptr;
857   pollset->kicked_without_pollers = 0;
858   pollset->fd_count = 0;
859   pollset->fd_capacity = 0;
860   pollset->fds = nullptr;
861   pollset->pollset_set_count = 0;
862 }
863 
864 static void pollset_destroy(grpc_pollset* pollset) {
865   GPR_ASSERT(!pollset_has_workers(pollset));
866   while (pollset->local_wakeup_cache) {
867     grpc_cached_wakeup_fd* next = pollset->local_wakeup_cache->next;
868     fork_fd_list_remove_node(pollset->local_wakeup_cache->fork_fd_list);
869     grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd);
870     gpr_free(pollset->local_wakeup_cache);
871     pollset->local_wakeup_cache = next;
872   }
873   gpr_free(pollset->fds);
874   gpr_mu_destroy(&pollset->mu);
875 }
876 
877 static void pollset_add_fd(grpc_pollset* pollset, grpc_fd* fd) {
878   gpr_mu_lock(&pollset->mu);
879   size_t i;
880   /* TODO(ctiller): this is O(num_fds^2); maybe switch to a hash set here */
881   for (i = 0; i < pollset->fd_count; i++) {
882     if (pollset->fds[i] == fd) goto exit;
883   }
884   if (pollset->fd_count == pollset->fd_capacity) {
885     pollset->fd_capacity =
886         GPR_MAX(pollset->fd_capacity + 8, pollset->fd_count * 3 / 2);
887     pollset->fds = static_cast<grpc_fd**>(
888         gpr_realloc(pollset->fds, sizeof(grpc_fd*) * pollset->fd_capacity));
889   }
890   pollset->fds[pollset->fd_count++] = fd;
891   GRPC_FD_REF(fd, "multipoller");
892   pollset_kick(pollset, nullptr);
893 exit:
894   gpr_mu_unlock(&pollset->mu);
895 }
896 
897 static void finish_shutdown(grpc_pollset* pollset) {
898   size_t i;
899   for (i = 0; i < pollset->fd_count; i++) {
900     GRPC_FD_UNREF(pollset->fds[i], "multipoller");
901   }
902   pollset->fd_count = 0;
903   grpc_core::ExecCtx::Run(DEBUG_LOCATION, pollset->shutdown_done,
904                           GRPC_ERROR_NONE);
905 }
906 
907 static void work_combine_error(grpc_error** composite, grpc_error* error) {
908   if (error == GRPC_ERROR_NONE) return;
909   if (*composite == GRPC_ERROR_NONE) {
910     *composite = GRPC_ERROR_CREATE_FROM_STATIC_STRING("pollset_work");
911   }
912   *composite = grpc_error_add_child(*composite, error);
913 }
914 
915 static grpc_error* pollset_work(grpc_pollset* pollset,
916                                 grpc_pollset_worker** worker_hdl,
917                                 grpc_millis deadline) {
918   GPR_TIMER_SCOPE("pollset_work", 0);
919   grpc_pollset_worker worker;
920   if (worker_hdl) *worker_hdl = &worker;
921   grpc_error* error = GRPC_ERROR_NONE;
922 
923   /* Avoid malloc for small number of elements. */
924   enum { inline_elements = 96 };
925   struct pollfd pollfd_space[inline_elements];
926   struct grpc_fd_watcher watcher_space[inline_elements];
927 
928   /* pollset->mu already held */
929   int added_worker = 0;
930   int locked = 1;
931   int queued_work = 0;
932   int keep_polling = 0;
933   /* this must happen before we (potentially) drop pollset->mu */
934   worker.next = worker.prev = nullptr;
935   worker.reevaluate_polling_on_wakeup = 0;
936   if (pollset->local_wakeup_cache != nullptr) {
937     worker.wakeup_fd = pollset->local_wakeup_cache;
938     pollset->local_wakeup_cache = worker.wakeup_fd->next;
939   } else {
940     worker.wakeup_fd = static_cast<grpc_cached_wakeup_fd*>(
941         gpr_malloc(sizeof(*worker.wakeup_fd)));
942     error = grpc_wakeup_fd_init(&worker.wakeup_fd->fd);
943     fork_fd_list_add_wakeup_fd(worker.wakeup_fd);
944     if (error != GRPC_ERROR_NONE) {
945       GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
946       return error;
947     }
948   }
949   worker.kicked_specifically = 0;
950   /* If we're shutting down then we don't execute any extended work */
951   if (pollset->shutting_down) {
952     GPR_TIMER_MARK("pollset_work.shutting_down", 0);
953     goto done;
954   }
955   /* Start polling, and keep doing so while we're being asked to
956      re-evaluate our pollers (this allows poll() based pollers to
957      ensure they don't miss wakeups) */
958   keep_polling = 1;
959   gpr_tls_set(&g_current_thread_poller, (intptr_t)pollset);
960   while (keep_polling) {
961     keep_polling = 0;
962     if (!pollset->kicked_without_pollers ||
963         deadline <= grpc_core::ExecCtx::Get()->Now()) {
964       if (!added_worker) {
965         push_front_worker(pollset, &worker);
966         added_worker = 1;
967         gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
968       }
969       GPR_TIMER_SCOPE("maybe_work_and_unlock", 0);
970 #define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR)
971 #define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR)
972 
973       int timeout;
974       int r;
975       size_t i, fd_count;
976       nfds_t pfd_count;
977       grpc_fd_watcher* watchers;
978       struct pollfd* pfds;
979 
980       timeout = poll_deadline_to_millis_timeout(deadline);
981 
982       if (pollset->fd_count + 2 <= inline_elements) {
983         pfds = pollfd_space;
984         watchers = watcher_space;
985       } else {
986         /* Allocate one buffer to hold both pfds and watchers arrays */
987         const size_t pfd_size = sizeof(*pfds) * (pollset->fd_count + 2);
988         const size_t watch_size = sizeof(*watchers) * (pollset->fd_count + 2);
989         void* buf = gpr_malloc(pfd_size + watch_size);
990         pfds = static_cast<struct pollfd*>(buf);
991         watchers = static_cast<grpc_fd_watcher*>(
992             static_cast<void*>((static_cast<char*>(buf) + pfd_size)));
993       }
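      /* Slot 0 of pfds is reserved for this worker's wakeup fd; watchers[0] is
         left unused so that watchers[i] pairs with pfds[i] for every i >= 1. */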
994 
995       fd_count = 0;
996       pfd_count = 1;
997       pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker.wakeup_fd->fd);
998       pfds[0].events = POLLIN;
999       pfds[0].revents = 0;
1000       for (i = 0; i < pollset->fd_count; i++) {
1001         if (fd_is_orphaned(pollset->fds[i]) ||
1002             gpr_atm_no_barrier_load(&pollset->fds[i]->pollhup) == 1) {
1003           GRPC_FD_UNREF(pollset->fds[i], "multipoller");
1004         } else {
1005           pollset->fds[fd_count++] = pollset->fds[i];
1006           watchers[pfd_count].fd = pollset->fds[i];
1007           GRPC_FD_REF(watchers[pfd_count].fd, "multipoller_start");
1008           pfds[pfd_count].fd = pollset->fds[i]->fd;
1009           pfds[pfd_count].revents = 0;
1010           pfd_count++;
1011         }
1012       }
1013       pollset->fd_count = fd_count;
1014       gpr_mu_unlock(&pollset->mu);
1015 
1016       for (i = 1; i < pfd_count; i++) {
1017         grpc_fd* fd = watchers[i].fd;
1018         pfds[i].events = static_cast<short>(
1019             fd_begin_poll(fd, pollset, &worker, POLLIN, POLLOUT, &watchers[i]));
1020         GRPC_FD_UNREF(fd, "multipoller_start");
1021       }
1022 
1023       /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid
1024          even going into the blocking annotation if possible */
1025       GRPC_SCHEDULING_START_BLOCKING_REGION;
1026       GRPC_STATS_INC_SYSCALL_POLL();
1027       r = grpc_poll_function(pfds, pfd_count, timeout);
1028       GRPC_SCHEDULING_END_BLOCKING_REGION;
1029 
1030       if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
1031         gpr_log(GPR_INFO, "%p poll=%d", pollset, r);
1032       }
1033 
1034       if (r < 0) {
1035         if (errno != EINTR) {
1036           work_combine_error(&error, GRPC_OS_ERROR(errno, "poll"));
1037         }
1038 
1039         for (i = 1; i < pfd_count; i++) {
1040           if (watchers[i].fd == nullptr) {
1041             fd_end_poll(&watchers[i], 0, 0);
1042           } else {
1043             // Wake up all the file descriptors; if one of them is invalid,
1044             // we can identify it on the next pollset_work()
1045             fd_end_poll(&watchers[i], 1, 1);
1046           }
1047         }
1048       } else if (r == 0) {
1049         for (i = 1; i < pfd_count; i++) {
1050           fd_end_poll(&watchers[i], 0, 0);
1051         }
1052       } else {
1053         if (pfds[0].revents & POLLIN_CHECK) {
1054           if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
1055             gpr_log(GPR_INFO, "%p: got_wakeup", pollset);
1056           }
1057           work_combine_error(
1058               &error, grpc_wakeup_fd_consume_wakeup(&worker.wakeup_fd->fd));
1059         }
1060         for (i = 1; i < pfd_count; i++) {
1061           if (watchers[i].fd == nullptr) {
1062             fd_end_poll(&watchers[i], 0, 0);
1063           } else {
1064             if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
1065               gpr_log(GPR_INFO, "%p got_event: %d r:%d w:%d [%d]", pollset,
1066                       pfds[i].fd, (pfds[i].revents & POLLIN_CHECK) != 0,
1067                       (pfds[i].revents & POLLOUT_CHECK) != 0, pfds[i].revents);
1068             }
1069             /* This is a mitigation to prevent poll() from spinning on a
1070              ** POLLHUP https://github.com/grpc/grpc/pull/13665
1071              */
1072             if (pfds[i].revents & POLLHUP) {
1073               gpr_atm_no_barrier_store(&watchers[i].fd->pollhup, 1);
1074             }
1075             fd_end_poll(&watchers[i], pfds[i].revents & POLLIN_CHECK,
1076                         pfds[i].revents & POLLOUT_CHECK);
1077           }
1078         }
1079       }
1080 
1081       if (pfds != pollfd_space) {
1082         /* pfds and watchers are in the same memory block pointed to by pfds */
1083         gpr_free(pfds);
1084       }
1085 
1086       locked = 0;
1087     } else {
1088       GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
1089       pollset->kicked_without_pollers = 0;
1090     }
1091   /* Finished execution - start cleaning up.
1092      Note that we may arrive here from outside the enclosing while() loop.
1093      In that case we won't loop, though, as we haven't added the worker to
1094      the worker list, so nobody could ask us to re-evaluate polling. */
1095   done:
1096     if (!locked) {
1097       queued_work |= grpc_core::ExecCtx::Get()->Flush();
1098       gpr_mu_lock(&pollset->mu);
1099       locked = 1;
1100     }
1101     /* If we're forced to re-evaluate polling (via pollset_kick with
1102        GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force
1103        a loop */
1104     if (worker.reevaluate_polling_on_wakeup && error == GRPC_ERROR_NONE) {
1105       worker.reevaluate_polling_on_wakeup = 0;
1106       pollset->kicked_without_pollers = 0;
1107       if (queued_work || worker.kicked_specifically) {
1108         /* If there's queued work on the list, then set the deadline to be
1109            immediate so we get back out of the polling loop quickly */
1110         deadline = 0;
1111       }
1112       keep_polling = 1;
1113     }
1114   }
1115   gpr_tls_set(&g_current_thread_poller, 0);
1116   if (added_worker) {
1117     remove_worker(pollset, &worker);
1118     gpr_tls_set(&g_current_thread_worker, 0);
1119   }
1120   /* release wakeup fd to the local pool */
1121   worker.wakeup_fd->next = pollset->local_wakeup_cache;
1122   pollset->local_wakeup_cache = worker.wakeup_fd;
1123   /* check shutdown conditions */
1124   if (pollset->shutting_down) {
1125     if (pollset_has_workers(pollset)) {
1126       pollset_kick(pollset, nullptr);
1127     } else if (!pollset->called_shutdown && !pollset_has_observers(pollset)) {
1128       pollset->called_shutdown = 1;
1129       gpr_mu_unlock(&pollset->mu);
1130       finish_shutdown(pollset);
1131       grpc_core::ExecCtx::Get()->Flush();
1132       /* Continuing to access pollset here is safe -- it is the caller's
1133        * responsibility to not destroy when it has outstanding calls to
1134        * pollset_work.
1135        * TODO(dklempner): Can we refactor the shutdown logic to avoid this? */
1136       gpr_mu_lock(&pollset->mu);
1137     }
1138   }
1139   if (worker_hdl) *worker_hdl = nullptr;
1140   GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
1141   return error;
1142 }
1143 
1144 static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
1145   GPR_ASSERT(!pollset->shutting_down);
1146   pollset->shutting_down = 1;
1147   pollset->shutdown_done = closure;
1148   pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
1149   if (!pollset->called_shutdown && !pollset_has_observers(pollset)) {
1150     pollset->called_shutdown = 1;
1151     finish_shutdown(pollset);
1152   }
1153 }
1154 
1155 static int poll_deadline_to_millis_timeout(grpc_millis deadline) {
1156   if (deadline == GRPC_MILLIS_INF_FUTURE) return -1;
1157   if (deadline == 0) return 0;
1158   grpc_millis n = deadline - grpc_core::ExecCtx::Get()->Now();
1159   if (n < 0) return 0;
1160   if (n > INT_MAX) return -1;
1161   return static_cast<int>(n);
1162 }
1163 
1164 /*******************************************************************************
1165  * pollset_set_posix.c
1166  */
1167 
1168 static grpc_pollset_set* pollset_set_create(void) {
1169   grpc_pollset_set* pollset_set =
1170       static_cast<grpc_pollset_set*>(gpr_zalloc(sizeof(*pollset_set)));
1171   gpr_mu_init(&pollset_set->mu);
1172   return pollset_set;
1173 }
1174 
1175 static void pollset_set_destroy(grpc_pollset_set* pollset_set) {
1176   size_t i;
1177   gpr_mu_destroy(&pollset_set->mu);
1178   for (i = 0; i < pollset_set->fd_count; i++) {
1179     GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1180   }
1181   for (i = 0; i < pollset_set->pollset_count; i++) {
1182     grpc_pollset* pollset = pollset_set->pollsets[i];
1183     gpr_mu_lock(&pollset->mu);
1184     pollset->pollset_set_count--;
1185     /* check shutdown */
1186     if (pollset->shutting_down && !pollset->called_shutdown &&
1187         !pollset_has_observers(pollset)) {
1188       pollset->called_shutdown = 1;
1189       gpr_mu_unlock(&pollset->mu);
1190       finish_shutdown(pollset);
1191     } else {
1192       gpr_mu_unlock(&pollset->mu);
1193     }
1194   }
1195   gpr_free(pollset_set->pollsets);
1196   gpr_free(pollset_set->pollset_sets);
1197   gpr_free(pollset_set->fds);
1198   gpr_free(pollset_set);
1199 }
1200 
1201 static void pollset_set_add_pollset(grpc_pollset_set* pollset_set,
1202                                     grpc_pollset* pollset) {
1203   size_t i, j;
1204   gpr_mu_lock(&pollset->mu);
1205   pollset->pollset_set_count++;
1206   gpr_mu_unlock(&pollset->mu);
1207   gpr_mu_lock(&pollset_set->mu);
1208   if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
1209     pollset_set->pollset_capacity =
1210         GPR_MAX(8, 2 * pollset_set->pollset_capacity);
1211     pollset_set->pollsets = static_cast<grpc_pollset**>(gpr_realloc(
1212         pollset_set->pollsets,
1213         pollset_set->pollset_capacity * sizeof(*pollset_set->pollsets)));
1214   }
1215   pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
1216   for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
1217     if (fd_is_orphaned(pollset_set->fds[i])) {
1218       GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1219     } else {
1220       pollset_add_fd(pollset, pollset_set->fds[i]);
1221       pollset_set->fds[j++] = pollset_set->fds[i];
1222     }
1223   }
1224   pollset_set->fd_count = j;
1225   gpr_mu_unlock(&pollset_set->mu);
1226 }
1227 
1228 static void pollset_set_del_pollset(grpc_pollset_set* pollset_set,
1229                                     grpc_pollset* pollset) {
1230   size_t i;
1231   gpr_mu_lock(&pollset_set->mu);
1232   for (i = 0; i < pollset_set->pollset_count; i++) {
1233     if (pollset_set->pollsets[i] == pollset) {
1234       pollset_set->pollset_count--;
1235       GPR_SWAP(grpc_pollset*, pollset_set->pollsets[i],
1236                pollset_set->pollsets[pollset_set->pollset_count]);
1237       break;
1238     }
1239   }
1240   gpr_mu_unlock(&pollset_set->mu);
1241   gpr_mu_lock(&pollset->mu);
1242   pollset->pollset_set_count--;
1243   /* check shutdown */
1244   if (pollset->shutting_down && !pollset->called_shutdown &&
1245       !pollset_has_observers(pollset)) {
1246     pollset->called_shutdown = 1;
1247     gpr_mu_unlock(&pollset->mu);
1248     finish_shutdown(pollset);
1249   } else {
1250     gpr_mu_unlock(&pollset->mu);
1251   }
1252 }
1253 
1254 static void pollset_set_add_pollset_set(grpc_pollset_set* bag,
1255                                         grpc_pollset_set* item) {
1256   size_t i, j;
1257   gpr_mu_lock(&bag->mu);
1258   if (bag->pollset_set_count == bag->pollset_set_capacity) {
1259     bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
1260     bag->pollset_sets = static_cast<grpc_pollset_set**>(
1261         gpr_realloc(bag->pollset_sets,
1262                     bag->pollset_set_capacity * sizeof(*bag->pollset_sets)));
1263   }
1264   bag->pollset_sets[bag->pollset_set_count++] = item;
1265   for (i = 0, j = 0; i < bag->fd_count; i++) {
1266     if (fd_is_orphaned(bag->fds[i])) {
1267       GRPC_FD_UNREF(bag->fds[i], "pollset_set");
1268     } else {
1269       pollset_set_add_fd(item, bag->fds[i]);
1270       bag->fds[j++] = bag->fds[i];
1271     }
1272   }
1273   bag->fd_count = j;
1274   gpr_mu_unlock(&bag->mu);
1275 }
1276 
1277 static void pollset_set_del_pollset_set(grpc_pollset_set* bag,
1278                                         grpc_pollset_set* item) {
1279   size_t i;
1280   gpr_mu_lock(&bag->mu);
1281   for (i = 0; i < bag->pollset_set_count; i++) {
1282     if (bag->pollset_sets[i] == item) {
1283       bag->pollset_set_count--;
1284       GPR_SWAP(grpc_pollset_set*, bag->pollset_sets[i],
1285                bag->pollset_sets[bag->pollset_set_count]);
1286       break;
1287     }
1288   }
1289   gpr_mu_unlock(&bag->mu);
1290 }
1291 
1292 static void pollset_set_add_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) {
1293   size_t i;
1294   gpr_mu_lock(&pollset_set->mu);
1295   if (pollset_set->fd_count == pollset_set->fd_capacity) {
1296     pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity);
1297     pollset_set->fds = static_cast<grpc_fd**>(
1298         gpr_realloc(pollset_set->fds,
1299                     pollset_set->fd_capacity * sizeof(*pollset_set->fds)));
1300   }
1301   GRPC_FD_REF(fd, "pollset_set");
1302   pollset_set->fds[pollset_set->fd_count++] = fd;
1303   for (i = 0; i < pollset_set->pollset_count; i++) {
1304     pollset_add_fd(pollset_set->pollsets[i], fd);
1305   }
1306   for (i = 0; i < pollset_set->pollset_set_count; i++) {
1307     pollset_set_add_fd(pollset_set->pollset_sets[i], fd);
1308   }
1309   gpr_mu_unlock(&pollset_set->mu);
1310 }
1311 
1312 static void pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) {
1313   size_t i;
1314   gpr_mu_lock(&pollset_set->mu);
1315   for (i = 0; i < pollset_set->fd_count; i++) {
1316     if (pollset_set->fds[i] == fd) {
1317       pollset_set->fd_count--;
1318       GPR_SWAP(grpc_fd*, pollset_set->fds[i],
1319                pollset_set->fds[pollset_set->fd_count]);
1320       GRPC_FD_UNREF(fd, "pollset_set");
1321       break;
1322     }
1323   }
1324   for (i = 0; i < pollset_set->pollset_set_count; i++) {
1325     pollset_set_del_fd(pollset_set->pollset_sets[i], fd);
1326   }
1327   gpr_mu_unlock(&pollset_set->mu);
1328 }
1329 
1330 /*******************************************************************************
1331  * event engine binding
1332  */
1333 
1334 static bool is_any_background_poller_thread(void) { return false; }
1335 
1336 static void shutdown_background_closure(void) {}
1337 
1338 static bool add_closure_to_background_poller(grpc_closure* /*closure*/,
1339                                              grpc_error* /*error*/) {
1340   return false;
1341 }
1342 
1343 static void shutdown_engine(void) {
1344   pollset_global_shutdown();
1345   if (track_fds_for_fork) {
1346     gpr_mu_destroy(&fork_fd_list_mu);
1347     grpc_core::Fork::SetResetChildPollingEngineFunc(nullptr);
1348   }
1349 }
1350 
1351 static const grpc_event_engine_vtable vtable = {
1352     sizeof(grpc_pollset),
1353     false,
1354     false,
1355 
1356     fd_create,
1357     fd_wrapped_fd,
1358     fd_orphan,
1359     fd_shutdown,
1360     fd_notify_on_read,
1361     fd_notify_on_write,
1362     fd_notify_on_error,
1363     fd_set_readable,
1364     fd_set_writable,
1365     fd_set_error,
1366     fd_is_shutdown,
1367 
1368     pollset_init,
1369     pollset_shutdown,
1370     pollset_destroy,
1371     pollset_work,
1372     pollset_kick,
1373     pollset_add_fd,
1374 
1375     pollset_set_create,
1376     pollset_set_destroy,
1377     pollset_set_add_pollset,
1378     pollset_set_del_pollset,
1379     pollset_set_add_pollset_set,
1380     pollset_set_del_pollset_set,
1381     pollset_set_add_fd,
1382     pollset_set_del_fd,
1383 
1384     is_any_background_poller_thread,
1385     shutdown_background_closure,
1386     shutdown_engine,
1387     add_closure_to_background_poller,
1388 };
1389 
1390 /* Called by the child process's post-fork handler to close open fds, including
1391  * worker wakeup fds. This allows gRPC to shutdown in the child process without
1392  * interfering with connections or RPCs ongoing in the parent. */
1393 static void reset_event_manager_on_fork() {
1394   gpr_mu_lock(&fork_fd_list_mu);
1395   while (fork_fd_list_head != nullptr) {
1396     if (fork_fd_list_head->fd != nullptr) {
1397       if (!fork_fd_list_head->fd->closed) {
1398         close(fork_fd_list_head->fd->fd);
1399       }
1400       fork_fd_list_head->fd->fd = -1;
1401     } else {
1402       close(fork_fd_list_head->cached_wakeup_fd->fd.read_fd);
1403       fork_fd_list_head->cached_wakeup_fd->fd.read_fd = -1;
1404       close(fork_fd_list_head->cached_wakeup_fd->fd.write_fd);
1405       fork_fd_list_head->cached_wakeup_fd->fd.write_fd = -1;
1406     }
1407     fork_fd_list_head = fork_fd_list_head->next;
1408   }
1409   gpr_mu_unlock(&fork_fd_list_mu);
1410 }
1411 
1412 const grpc_event_engine_vtable* grpc_init_poll_posix(
1413     bool /*explicit_request*/) {
1414   if (!grpc_has_wakeup_fd()) {
1415     gpr_log(GPR_ERROR, "Skipping poll because of no wakeup fd.");
1416     return nullptr;
1417   }
1418   if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
1419     return nullptr;
1420   }
1421   if (grpc_core::Fork::Enabled()) {
1422     track_fds_for_fork = true;
1423     gpr_mu_init(&fork_fd_list_mu);
1424     grpc_core::Fork::SetResetChildPollingEngineFunc(
1425         reset_event_manager_on_fork);
1426   }
1427   return &vtable;
1428 }
1429 
1430 #endif /* GRPC_POSIX_SOCKET_EV_POLL */
1431