1 /*
2 *
3 * Copyright 2016 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/iomgr/port.h"
22
23 #ifdef GRPC_POSIX_SOCKET_EV_POLL
24
25 #include "src/core/lib/iomgr/ev_poll_posix.h"
26
27 #include <assert.h>
28 #include <errno.h>
29 #include <limits.h>
30 #include <poll.h>
31 #include <string.h>
32 #include <sys/socket.h>
33 #include <unistd.h>
34
35 #include <grpc/support/alloc.h>
36 #include <grpc/support/log.h>
37 #include <grpc/support/string_util.h>
38
39 #include "src/core/lib/debug/stats.h"
40 #include "src/core/lib/gpr/murmur_hash.h"
41 #include "src/core/lib/gpr/tls.h"
42 #include "src/core/lib/gpr/useful.h"
43 #include "src/core/lib/gprpp/thd.h"
44 #include "src/core/lib/iomgr/block_annotate.h"
45 #include "src/core/lib/iomgr/iomgr_internal.h"
46 #include "src/core/lib/iomgr/wakeup_fd_cv.h"
47 #include "src/core/lib/iomgr/wakeup_fd_posix.h"
48 #include "src/core/lib/profiling/timers.h"
49
50 #define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker*)1)
51
52 /*******************************************************************************
53 * FD declarations
54 */
55 typedef struct grpc_fd_watcher {
56 struct grpc_fd_watcher* next;
57 struct grpc_fd_watcher* prev;
58 grpc_pollset* pollset;
59 grpc_pollset_worker* worker;
60 grpc_fd* fd;
61 } grpc_fd_watcher;
62
63 typedef struct grpc_cached_wakeup_fd grpc_cached_wakeup_fd;
64
65 /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
66 struct grpc_fork_fd_list {
67 /* Only one of fd or cached_wakeup_fd will be set. The unused field will be
68 set to nullptr. */
69 grpc_fd* fd;
70 grpc_cached_wakeup_fd* cached_wakeup_fd;
71
72 grpc_fork_fd_list* next;
73 grpc_fork_fd_list* prev;
74 };
75
76 struct grpc_fd {
77 int fd;
78 /* refst format:
79 bit0: 1=active/0=orphaned
80 bit1-n: refcount
81 meaning that mostly we ref by two to avoid altering the orphaned bit,
82 and just unref by 1 when we're ready to flag the object as orphaned */
83 gpr_atm refst;
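  /* Worked example (annotation added for clarity, inferred from ref_by /
     unref_by / fd_orphan below): fd_create stores refst = 1 (active, no extra
     refs). GRPC_FD_REF adds 2 and GRPC_FD_UNREF subtracts 2, so the active
     bit is never disturbed. fd_orphan adds 1 (flipping the odd count even,
     i.e. clearing the active bit) and then unrefs by 2; the fd is freed once
     the count reaches zero in unref_by. */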
84
85 gpr_mu mu;
86 int shutdown;
87 int closed;
88 int released;
89 gpr_atm pollhup;
90 grpc_error* shutdown_error;
91
92 /* The watcher list.
93
94 The following watcher related fields are protected by watcher_mu.
95
96       An fd_watcher is an ephemeral object created when a poller wants to
97       begin polling this fd, and destroyed after the poll.
98
99       It denotes whether this watcher should poll the fd for reads, for
100       writes, for both, or for neither.
101
102 If a watcher is asked to poll for reads or writes, the read_watcher
103 or write_watcher fields are set respectively. A watcher may be asked
104 to poll for both, in which case both fields will be set.
105
106 read_watcher and write_watcher may be NULL if no watcher has been
107 asked to poll for reads or writes.
108
109 If an fd_watcher is not asked to poll for reads or writes, it's added
110 to a linked list of inactive watchers, rooted at inactive_watcher_root.
111       If a poller is later needed to poll this fd, one of the inactive
112       watchers may be kicked out of its poll loop to take on that
113       responsibility. */
114 grpc_fd_watcher inactive_watcher_root;
115 grpc_fd_watcher* read_watcher;
116 grpc_fd_watcher* write_watcher;
117
118 grpc_closure* read_closure;
119 grpc_closure* write_closure;
120
121 grpc_closure* on_done_closure;
122
123 grpc_iomgr_object iomgr_object;
124
125 /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
126 grpc_fork_fd_list* fork_fd_list;
127 };
128
129 /* True when GRPC_ENABLE_FORK_SUPPORT=1. We do not support fork with poll-cv */
130 static bool track_fds_for_fork = false;
131
132 /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
133 static grpc_fork_fd_list* fork_fd_list_head = nullptr;
134 static gpr_mu fork_fd_list_mu;
135
136 /* Begin polling on an fd.
137    Registers that the given pollset is interested in this fd - so that if
138    readability or writability interest changes, the pollset can be kicked to pick up that
139 new interest.
140 Return value is:
141 (fd_needs_read? read_mask : 0) | (fd_needs_write? write_mask : 0)
142 i.e. a combination of read_mask and write_mask determined by the fd's current
143 interest in said events.
144 Polling strategies that do not need to alter their behavior depending on the
145 fd's current interest (such as epoll) do not need to call this function.
146 MUST NOT be called with a pollset lock taken */
147 static uint32_t fd_begin_poll(grpc_fd* fd, grpc_pollset* pollset,
148 grpc_pollset_worker* worker, uint32_t read_mask,
149 uint32_t write_mask, grpc_fd_watcher* rec);
150 /* Complete polling previously started with fd_begin_poll
151 MUST NOT be called with a pollset lock taken
152 if got_read or got_write are 1, also does the become_{readable,writable} as
153 appropriate. */
154 static void fd_end_poll(grpc_fd_watcher* rec, int got_read, int got_write);
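/* Usage sketch (annotation added for clarity; this mirrors what pollset_work
   does further below, it is not additional API):

     grpc_fd_watcher w;
     pfd.events = (short)fd_begin_poll(fd, pollset, worker, POLLIN, POLLOUT, &w);
     // ... hand pfd to poll() ...
     fd_end_poll(&w, pfd.revents & POLLIN_CHECK, pfd.revents & POLLOUT_CHECK);

   i.e. the mask returned by fd_begin_poll becomes pollfd.events, and the
   revents observed by poll() are reported back through fd_end_poll. */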
155
156 /* Return true if this fd is orphaned, false otherwise */
157 static bool fd_is_orphaned(grpc_fd* fd);
158
159 #ifndef NDEBUG
160 static void fd_ref(grpc_fd* fd, const char* reason, const char* file, int line);
161 static void fd_unref(grpc_fd* fd, const char* reason, const char* file,
162 int line);
163 #define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
164 #define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
165 #else
166 static void fd_ref(grpc_fd* fd);
167 static void fd_unref(grpc_fd* fd);
168 #define GRPC_FD_REF(fd, reason) fd_ref(fd)
169 #define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
170 #endif
171
172 #define CLOSURE_NOT_READY ((grpc_closure*)0)
173 #define CLOSURE_READY ((grpc_closure*)1)
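/* Annotation added for clarity: read_closure / write_closure cycle through a
   small state machine over these sentinels (see notify_on_locked and
   set_ready_locked below):
     CLOSURE_NOT_READY --notify_on--> <pending user closure>
     CLOSURE_NOT_READY --set_ready--> CLOSURE_READY
     CLOSURE_READY     --notify_on--> CLOSURE_NOT_READY (closure scheduled now)
     <pending closure> --set_ready--> CLOSURE_NOT_READY (closure scheduled now) */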
174
175 /*******************************************************************************
176 * pollset declarations
177 */
178
179 typedef struct grpc_cached_wakeup_fd {
180 grpc_wakeup_fd fd;
181 struct grpc_cached_wakeup_fd* next;
182
183 /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
184 grpc_fork_fd_list* fork_fd_list;
185 } grpc_cached_wakeup_fd;
186
187 struct grpc_pollset_worker {
188 grpc_cached_wakeup_fd* wakeup_fd;
189 int reevaluate_polling_on_wakeup;
190 int kicked_specifically;
191 struct grpc_pollset_worker* next;
192 struct grpc_pollset_worker* prev;
193 };
194
195 struct grpc_pollset {
196 gpr_mu mu;
197 grpc_pollset_worker root_worker;
198 int shutting_down;
199 int called_shutdown;
200 int kicked_without_pollers;
201 grpc_closure* shutdown_done;
202 int pollset_set_count;
203 /* all polled fds */
204 size_t fd_count;
205 size_t fd_capacity;
206 grpc_fd** fds;
207 /* Local cache of eventfds for workers */
208 grpc_cached_wakeup_fd* local_wakeup_cache;
209 };
210
211 /* Add an fd to a pollset */
212 static void pollset_add_fd(grpc_pollset* pollset, struct grpc_fd* fd);
213
214 static void pollset_set_add_fd(grpc_pollset_set* pollset_set, grpc_fd* fd);
215
216 /* Convert a timespec to milliseconds:
217 - very small or negative poll times are clamped to zero to do a
218 non-blocking poll (which becomes spin polling)
219 - other small values are rounded up to one millisecond
220    - polls longer than a millisecond are rounded up to the next
221    millisecond to avoid spinning
222 - infinite timeouts are converted to -1 */
223 static int poll_deadline_to_millis_timeout(grpc_millis deadline);
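/* Annotation added for clarity - worked examples against the implementation
   near the end of this file: a deadline 250ms from now yields 250; a deadline
   that has already passed yields 0 (non-blocking poll); GRPC_MILLIS_INF_FUTURE
   (or any deadline more than INT_MAX ms away) yields -1, i.e. block in poll()
   indefinitely. */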
224
225 /* Allow kick to wakeup the currently polling worker */
226 #define GRPC_POLLSET_CAN_KICK_SELF 1
227 /* Force the wakee to repoll when awoken */
228 #define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP 2
229 /* As per pollset_kick, with an extended set of flags (defined above)
230 -- mostly for fd_posix's use. */
231 static grpc_error* pollset_kick_ext(grpc_pollset* p,
232 grpc_pollset_worker* specific_worker,
233 uint32_t flags) GRPC_MUST_USE_RESULT;
234
235 /* Return true if the pollset has active threads in pollset_work (pollset must
236 * be locked) */
237 static bool pollset_has_workers(grpc_pollset* pollset);
238
239 /*******************************************************************************
240 * pollset_set definitions
241 */
242
243 struct grpc_pollset_set {
244 gpr_mu mu;
245
246 size_t pollset_count;
247 size_t pollset_capacity;
248 grpc_pollset** pollsets;
249
250 size_t pollset_set_count;
251 size_t pollset_set_capacity;
252 struct grpc_pollset_set** pollset_sets;
253
254 size_t fd_count;
255 size_t fd_capacity;
256 grpc_fd** fds;
257 };
258
259 /*******************************************************************************
260 * condition variable polling definitions
261 */
262
263 #define POLLCV_THREAD_GRACE_MS 1000
264 #define CV_POLL_PERIOD_MS 1000
265 #define CV_DEFAULT_TABLE_SIZE 16
266
267 typedef struct poll_result {
268 gpr_refcount refcount;
269 grpc_cv_node* watchers;
270 int watchcount;
271 struct pollfd* fds;
272 nfds_t nfds;
273 int retval;
274 int err;
275 int completed;
276 } poll_result;
277
278 typedef struct poll_args {
279 grpc_core::Thread poller_thd;
280 gpr_cv trigger;
281 int trigger_set;
282 bool harvestable;
283 gpr_cv harvest;
284 bool joinable;
285 gpr_cv join;
286 struct pollfd* fds;
287 nfds_t nfds;
288 poll_result* result;
289 struct poll_args* next;
290 struct poll_args* prev;
291 } poll_args;
292
293 // This is a 2-tiered cache: we maintain a hash table
294 // of active poll calls, so we can wait on the result
295 // of that call. We also maintain freelists of inactive
296 // poll args and of dead poller threads.
297 typedef struct poll_hash_table {
298 poll_args* free_pollers;
299 poll_args** active_pollers;
300 poll_args* dead_pollers;
301 unsigned int size;
302 unsigned int count;
303 } poll_hash_table;
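// Annotation added for clarity: lookup flow, as implemented by
// get_poller_locked / cache_insert_locked below. A poll() call is keyed by
// murmur-hashing its (fds, nfds) arguments; a hit in active_pollers reuses the
// in-flight call, a miss reuses a poll_args from free_pollers (or spawns a new
// poller thread), and pollers that idle past their grace period are parked on
// dead_pollers until cache_harvest_locked joins them.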
304
305 // TODO(kpayson64): Eliminate use of global non-POD variables
306 poll_hash_table poll_cache;
307 grpc_cv_fd_table g_cvfds;
308
309 /*******************************************************************************
310 * functions to track opened fds. No-ops unless track_fds_for_fork is true.
311 */
312
313 static void fork_fd_list_remove_node(grpc_fork_fd_list* node) {
314 if (track_fds_for_fork) {
315 gpr_mu_lock(&fork_fd_list_mu);
316 if (fork_fd_list_head == node) {
317 fork_fd_list_head = node->next;
318 }
319 if (node->prev != nullptr) {
320 node->prev->next = node->next;
321 }
322 if (node->next != nullptr) {
323 node->next->prev = node->prev;
324 }
325 gpr_free(node);
326 gpr_mu_unlock(&fork_fd_list_mu);
327 }
328 }
329
330 static void fork_fd_list_add_node(grpc_fork_fd_list* node) {
331 gpr_mu_lock(&fork_fd_list_mu);
332 node->next = fork_fd_list_head;
333 node->prev = nullptr;
334 if (fork_fd_list_head != nullptr) {
335 fork_fd_list_head->prev = node;
336 }
337 fork_fd_list_head = node;
338 gpr_mu_unlock(&fork_fd_list_mu);
339 }
340
341 static void fork_fd_list_add_grpc_fd(grpc_fd* fd) {
342 if (track_fds_for_fork) {
343 fd->fork_fd_list =
344 static_cast<grpc_fork_fd_list*>(gpr_malloc(sizeof(grpc_fork_fd_list)));
345 fd->fork_fd_list->fd = fd;
346 fd->fork_fd_list->cached_wakeup_fd = nullptr;
347 fork_fd_list_add_node(fd->fork_fd_list);
348 }
349 }
350
351 static void fork_fd_list_add_wakeup_fd(grpc_cached_wakeup_fd* fd) {
352 if (track_fds_for_fork) {
353 fd->fork_fd_list =
354 static_cast<grpc_fork_fd_list*>(gpr_malloc(sizeof(grpc_fork_fd_list)));
355 fd->fork_fd_list->cached_wakeup_fd = fd;
356 fd->fork_fd_list->fd = nullptr;
357 fork_fd_list_add_node(fd->fork_fd_list);
358 }
359 }
360
361 /*******************************************************************************
362 * fd_posix.c
363 */
364
365 #ifndef NDEBUG
366 #define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
367 #define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
368 static void ref_by(grpc_fd* fd, int n, const char* reason, const char* file,
369 int line) {
370 if (grpc_trace_fd_refcount.enabled()) {
371 gpr_log(GPR_DEBUG,
372 "FD %d %p ref %d %" PRIdPTR " -> %" PRIdPTR " [%s; %s:%d]",
373 fd->fd, fd, n, gpr_atm_no_barrier_load(&fd->refst),
374 gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
375 }
376 #else
377 #define REF_BY(fd, n, reason) ref_by(fd, n)
378 #define UNREF_BY(fd, n, reason) unref_by(fd, n)
379 static void ref_by(grpc_fd* fd, int n) {
380 #endif
381 GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
382 }
383
384 #ifndef NDEBUG
385 static void unref_by(grpc_fd* fd, int n, const char* reason, const char* file,
386 int line) {
387 if (grpc_trace_fd_refcount.enabled()) {
388 gpr_log(GPR_DEBUG,
389 "FD %d %p unref %d %" PRIdPTR " -> %" PRIdPTR " [%s; %s:%d]",
390 fd->fd, fd, n, gpr_atm_no_barrier_load(&fd->refst),
391 gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
392 }
393 #else
394 static void unref_by(grpc_fd* fd, int n) {
395 #endif
396 gpr_atm old = gpr_atm_full_fetch_add(&fd->refst, -n);
397 if (old == n) {
398 gpr_mu_destroy(&fd->mu);
399 grpc_iomgr_unregister_object(&fd->iomgr_object);
400 fork_fd_list_remove_node(fd->fork_fd_list);
401 if (fd->shutdown) GRPC_ERROR_UNREF(fd->shutdown_error);
402 gpr_free(fd);
403 } else {
404 GPR_ASSERT(old > n);
405 }
406 }
407
408 static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
409 GPR_DEBUG_ASSERT(track_err == false);
410 grpc_fd* r = static_cast<grpc_fd*>(gpr_malloc(sizeof(*r)));
411 gpr_mu_init(&r->mu);
412 gpr_atm_rel_store(&r->refst, 1);
413 r->shutdown = 0;
414 r->read_closure = CLOSURE_NOT_READY;
415 r->write_closure = CLOSURE_NOT_READY;
416 r->fd = fd;
417 r->inactive_watcher_root.next = r->inactive_watcher_root.prev =
418 &r->inactive_watcher_root;
419 r->read_watcher = r->write_watcher = nullptr;
420 r->on_done_closure = nullptr;
421 r->closed = 0;
422 r->released = 0;
423 gpr_atm_no_barrier_store(&r->pollhup, 0);
424
425 char* name2;
426 gpr_asprintf(&name2, "%s fd=%d", name, fd);
427 grpc_iomgr_register_object(&r->iomgr_object, name2);
428 gpr_free(name2);
429 fork_fd_list_add_grpc_fd(r);
430 return r;
431 }
432
433 static bool fd_is_orphaned(grpc_fd* fd) {
434 return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
435 }
436
437 static grpc_error* pollset_kick_locked(grpc_fd_watcher* watcher) {
438 gpr_mu_lock(&watcher->pollset->mu);
439 GPR_ASSERT(watcher->worker);
440 grpc_error* err = pollset_kick_ext(watcher->pollset, watcher->worker,
441 GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP);
442 gpr_mu_unlock(&watcher->pollset->mu);
443 return err;
444 }
445
446 static void maybe_wake_one_watcher_locked(grpc_fd* fd) {
447 if (fd->inactive_watcher_root.next != &fd->inactive_watcher_root) {
448 pollset_kick_locked(fd->inactive_watcher_root.next);
449 } else if (fd->read_watcher) {
450 pollset_kick_locked(fd->read_watcher);
451 } else if (fd->write_watcher) {
452 pollset_kick_locked(fd->write_watcher);
453 }
454 }
455
456 static void wake_all_watchers_locked(grpc_fd* fd) {
457 grpc_fd_watcher* watcher;
458 for (watcher = fd->inactive_watcher_root.next;
459 watcher != &fd->inactive_watcher_root; watcher = watcher->next) {
460 pollset_kick_locked(watcher);
461 }
462 if (fd->read_watcher) {
463 pollset_kick_locked(fd->read_watcher);
464 }
465 if (fd->write_watcher && fd->write_watcher != fd->read_watcher) {
466 pollset_kick_locked(fd->write_watcher);
467 }
468 }
469
470 static int has_watchers(grpc_fd* fd) {
471 return fd->read_watcher != nullptr || fd->write_watcher != nullptr ||
472 fd->inactive_watcher_root.next != &fd->inactive_watcher_root;
473 }
474
475 static void close_fd_locked(grpc_fd* fd) {
476 fd->closed = 1;
477 if (!fd->released) {
478 close(fd->fd);
479 }
480 GRPC_CLOSURE_SCHED(fd->on_done_closure, GRPC_ERROR_NONE);
481 }
482
483 static int fd_wrapped_fd(grpc_fd* fd) {
484 if (fd->released || fd->closed) {
485 return -1;
486 } else {
487 return fd->fd;
488 }
489 }
490
491 static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
492 const char* reason) {
493 fd->on_done_closure = on_done;
494 fd->released = release_fd != nullptr;
495 if (release_fd != nullptr) {
496 *release_fd = fd->fd;
497 fd->released = true;
498 }
499 gpr_mu_lock(&fd->mu);
500 REF_BY(fd, 1, reason); /* remove active status, but keep referenced */
501 if (!has_watchers(fd)) {
502 close_fd_locked(fd);
503 } else {
504 wake_all_watchers_locked(fd);
505 }
506 gpr_mu_unlock(&fd->mu);
507 UNREF_BY(fd, 2, reason); /* drop the reference */
508 }
509
510 /* increment refcount by two to avoid changing the orphan bit */
511 #ifndef NDEBUG
512 static void fd_ref(grpc_fd* fd, const char* reason, const char* file,
513 int line) {
514 ref_by(fd, 2, reason, file, line);
515 }
516
517 static void fd_unref(grpc_fd* fd, const char* reason, const char* file,
518 int line) {
519 unref_by(fd, 2, reason, file, line);
520 }
521 #else
522 static void fd_ref(grpc_fd* fd) { ref_by(fd, 2); }
523
524 static void fd_unref(grpc_fd* fd) { unref_by(fd, 2); }
525 #endif
526
527 static grpc_error* fd_shutdown_error(grpc_fd* fd) {
528 if (!fd->shutdown) {
529 return GRPC_ERROR_NONE;
530 } else {
531 return grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
532 "FD shutdown", &fd->shutdown_error, 1),
533 GRPC_ERROR_INT_GRPC_STATUS,
534 GRPC_STATUS_UNAVAILABLE);
535 }
536 }
537
538 static void notify_on_locked(grpc_fd* fd, grpc_closure** st,
539 grpc_closure* closure) {
540 if (fd->shutdown || gpr_atm_no_barrier_load(&fd->pollhup)) {
541 GRPC_CLOSURE_SCHED(
542 closure, grpc_error_set_int(
543 GRPC_ERROR_CREATE_FROM_STATIC_STRING("FD shutdown"),
544 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
545 } else if (*st == CLOSURE_NOT_READY) {
546 /* not ready ==> switch to a waiting state by setting the closure */
547 *st = closure;
548 } else if (*st == CLOSURE_READY) {
549 /* already ready ==> queue the closure to run immediately */
550 *st = CLOSURE_NOT_READY;
551 GRPC_CLOSURE_SCHED(closure, fd_shutdown_error(fd));
552 maybe_wake_one_watcher_locked(fd);
553 } else {
554 /* upcallptr was set to a different closure. This is an error! */
555 gpr_log(GPR_ERROR,
556 "User called a notify_on function with a previous callback still "
557 "pending");
558 abort();
559 }
560 }
561
562 /* returns 1 if state becomes not ready */
563 static int set_ready_locked(grpc_fd* fd, grpc_closure** st) {
564 if (*st == CLOSURE_READY) {
565 /* duplicate ready ==> ignore */
566 return 0;
567 } else if (*st == CLOSURE_NOT_READY) {
568 /* not ready, and not waiting ==> flag ready */
569 *st = CLOSURE_READY;
570 return 0;
571 } else {
572 /* waiting ==> queue closure */
573 GRPC_CLOSURE_SCHED(*st, fd_shutdown_error(fd));
574 *st = CLOSURE_NOT_READY;
575 return 1;
576 }
577 }
578
579 static void fd_shutdown(grpc_fd* fd, grpc_error* why) {
580 gpr_mu_lock(&fd->mu);
581 /* only shutdown once */
582 if (!fd->shutdown) {
583 fd->shutdown = 1;
584 fd->shutdown_error = why;
585 /* signal read/write closed to OS so that future operations fail */
586 shutdown(fd->fd, SHUT_RDWR);
587 set_ready_locked(fd, &fd->read_closure);
588 set_ready_locked(fd, &fd->write_closure);
589 } else {
590 GRPC_ERROR_UNREF(why);
591 }
592 gpr_mu_unlock(&fd->mu);
593 }
594
595 static bool fd_is_shutdown(grpc_fd* fd) {
596 gpr_mu_lock(&fd->mu);
597 bool r = fd->shutdown;
598 gpr_mu_unlock(&fd->mu);
599 return r;
600 }
601
602 static void fd_notify_on_read(grpc_fd* fd, grpc_closure* closure) {
603 gpr_mu_lock(&fd->mu);
604 notify_on_locked(fd, &fd->read_closure, closure);
605 gpr_mu_unlock(&fd->mu);
606 }
607
608 static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
609 gpr_mu_lock(&fd->mu);
610 notify_on_locked(fd, &fd->write_closure, closure);
611 gpr_mu_unlock(&fd->mu);
612 }
613
614 static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
615 if (grpc_polling_trace.enabled()) {
616 gpr_log(GPR_ERROR, "Polling engine does not support tracking errors.");
617 }
618 GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_CANCELLED);
619 }
620
621 static void fd_set_readable(grpc_fd* fd) {
622 gpr_mu_lock(&fd->mu);
623 set_ready_locked(fd, &fd->read_closure);
624 gpr_mu_unlock(&fd->mu);
625 }
626
627 static void fd_set_writable(grpc_fd* fd) {
628 gpr_mu_lock(&fd->mu);
629 set_ready_locked(fd, &fd->write_closure);
630 gpr_mu_unlock(&fd->mu);
631 }
632
633 static void fd_set_error(grpc_fd* fd) {
634 if (grpc_polling_trace.enabled()) {
635 gpr_log(GPR_ERROR, "Polling engine does not support tracking errors.");
636 }
637 }
638
639 static uint32_t fd_begin_poll(grpc_fd* fd, grpc_pollset* pollset,
640 grpc_pollset_worker* worker, uint32_t read_mask,
641 uint32_t write_mask, grpc_fd_watcher* watcher) {
642 uint32_t mask = 0;
643 grpc_closure* cur;
644 int requested;
645 /* keep track of pollers that have requested our events, in case they change
646 */
647 GRPC_FD_REF(fd, "poll");
648
649 gpr_mu_lock(&fd->mu);
650
651 /* if we are shutdown, then don't add to the watcher set */
652 if (fd->shutdown) {
653 watcher->fd = nullptr;
654 watcher->pollset = nullptr;
655 watcher->worker = nullptr;
656 gpr_mu_unlock(&fd->mu);
657 GRPC_FD_UNREF(fd, "poll");
658 return 0;
659 }
660
661 /* if there is nobody polling for read, but we need to, then start doing so */
662 cur = fd->read_closure;
663 requested = cur != CLOSURE_READY;
664 if (read_mask && fd->read_watcher == nullptr && requested) {
665 fd->read_watcher = watcher;
666 mask |= read_mask;
667 }
668 /* if there is nobody polling for write, but we need to, then start doing so
669 */
670 cur = fd->write_closure;
671 requested = cur != CLOSURE_READY;
672 if (write_mask && fd->write_watcher == nullptr && requested) {
673 fd->write_watcher = watcher;
674 mask |= write_mask;
675 }
676 /* if not polling, remember this watcher in case we need someone to later */
677 if (mask == 0 && worker != nullptr) {
678 watcher->next = &fd->inactive_watcher_root;
679 watcher->prev = watcher->next->prev;
680 watcher->next->prev = watcher->prev->next = watcher;
681 }
682 watcher->pollset = pollset;
683 watcher->worker = worker;
684 watcher->fd = fd;
685 gpr_mu_unlock(&fd->mu);
686
687 return mask;
688 }
689
690 static void fd_end_poll(grpc_fd_watcher* watcher, int got_read, int got_write) {
691 int was_polling = 0;
692 int kick = 0;
693 grpc_fd* fd = watcher->fd;
694
695 if (fd == nullptr) {
696 return;
697 }
698
699 gpr_mu_lock(&fd->mu);
700
701 if (watcher == fd->read_watcher) {
702 /* remove read watcher, kick if we still need a read */
703 was_polling = 1;
704 if (!got_read) {
705 kick = 1;
706 }
707 fd->read_watcher = nullptr;
708 }
709 if (watcher == fd->write_watcher) {
710 /* remove write watcher, kick if we still need a write */
711 was_polling = 1;
712 if (!got_write) {
713 kick = 1;
714 }
715 fd->write_watcher = nullptr;
716 }
717 if (!was_polling && watcher->worker != nullptr) {
718 /* remove from inactive list */
719 watcher->next->prev = watcher->prev;
720 watcher->prev->next = watcher->next;
721 }
722 if (got_read) {
723 if (set_ready_locked(fd, &fd->read_closure)) {
724 kick = 1;
725 }
726 }
727 if (got_write) {
728 if (set_ready_locked(fd, &fd->write_closure)) {
729 kick = 1;
730 }
731 }
732 if (kick) {
733 maybe_wake_one_watcher_locked(fd);
734 }
735 if (fd_is_orphaned(fd) && !has_watchers(fd) && !fd->closed) {
736 close_fd_locked(fd);
737 }
738 gpr_mu_unlock(&fd->mu);
739
740 GRPC_FD_UNREF(fd, "poll");
741 }
742
743 /*******************************************************************************
744 * pollset_posix.c
745 */
746
747 GPR_TLS_DECL(g_current_thread_poller);
748 GPR_TLS_DECL(g_current_thread_worker);
749
750 static void remove_worker(grpc_pollset* p, grpc_pollset_worker* worker) {
751 worker->prev->next = worker->next;
752 worker->next->prev = worker->prev;
753 }
754
755 static bool pollset_has_workers(grpc_pollset* p) {
756 return p->root_worker.next != &p->root_worker;
757 }
758
759 static bool pollset_in_pollset_sets(grpc_pollset* p) {
760 return p->pollset_set_count;
761 }
762
763 static bool pollset_has_observers(grpc_pollset* p) {
764 return pollset_has_workers(p) || pollset_in_pollset_sets(p);
765 }
766
767 static grpc_pollset_worker* pop_front_worker(grpc_pollset* p) {
768 if (pollset_has_workers(p)) {
769 grpc_pollset_worker* w = p->root_worker.next;
770 remove_worker(p, w);
771 return w;
772 } else {
773 return nullptr;
774 }
775 }
776
777 static void push_back_worker(grpc_pollset* p, grpc_pollset_worker* worker) {
778 worker->next = &p->root_worker;
779 worker->prev = worker->next->prev;
780 worker->prev->next = worker->next->prev = worker;
781 }
782
783 static void push_front_worker(grpc_pollset* p, grpc_pollset_worker* worker) {
784 worker->prev = &p->root_worker;
785 worker->next = worker->prev->next;
786 worker->prev->next = worker->next->prev = worker;
787 }
788
789 static void kick_append_error(grpc_error** composite, grpc_error* error) {
790 if (error == GRPC_ERROR_NONE) return;
791 if (*composite == GRPC_ERROR_NONE) {
792 *composite = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Kick Failure");
793 }
794 *composite = grpc_error_add_child(*composite, error);
795 }
796
797 static grpc_error* pollset_kick_ext(grpc_pollset* p,
798 grpc_pollset_worker* specific_worker,
799 uint32_t flags) {
800 GPR_TIMER_SCOPE("pollset_kick_ext", 0);
801 grpc_error* error = GRPC_ERROR_NONE;
802 GRPC_STATS_INC_POLLSET_KICK();
803
804 /* pollset->mu already held */
805 if (specific_worker != nullptr) {
806 if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
807 GPR_TIMER_SCOPE("pollset_kick_ext.broadcast", 0);
808 GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
809 for (specific_worker = p->root_worker.next;
810 specific_worker != &p->root_worker;
811 specific_worker = specific_worker->next) {
812 kick_append_error(
813 &error, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
814 }
815 p->kicked_without_pollers = true;
816 } else if (gpr_tls_get(&g_current_thread_worker) !=
817 (intptr_t)specific_worker) {
818 GPR_TIMER_MARK("different_thread_worker", 0);
819 if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
820 specific_worker->reevaluate_polling_on_wakeup = true;
821 }
822 specific_worker->kicked_specifically = true;
823 kick_append_error(&error,
824 grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
825 } else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) {
826 GPR_TIMER_MARK("kick_yoself", 0);
827 if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
828 specific_worker->reevaluate_polling_on_wakeup = true;
829 }
830 specific_worker->kicked_specifically = true;
831 kick_append_error(&error,
832 grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
833 }
834 } else if (gpr_tls_get(&g_current_thread_poller) != (intptr_t)p) {
835 GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
836 GPR_TIMER_MARK("kick_anonymous", 0);
837 specific_worker = pop_front_worker(p);
838 if (specific_worker != nullptr) {
839 if (gpr_tls_get(&g_current_thread_worker) == (intptr_t)specific_worker) {
840 GPR_TIMER_MARK("kick_anonymous_not_self", 0);
841 push_back_worker(p, specific_worker);
842 specific_worker = pop_front_worker(p);
843 if ((flags & GRPC_POLLSET_CAN_KICK_SELF) == 0 &&
844 gpr_tls_get(&g_current_thread_worker) ==
845 (intptr_t)specific_worker) {
846 push_back_worker(p, specific_worker);
847 specific_worker = nullptr;
848 }
849 }
850 if (specific_worker != nullptr) {
851 GPR_TIMER_MARK("finally_kick", 0);
852 push_back_worker(p, specific_worker);
853 kick_append_error(
854 &error, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
855 }
856 } else {
857 GPR_TIMER_MARK("kicked_no_pollers", 0);
858 p->kicked_without_pollers = true;
859 }
860 }
861
862 GRPC_LOG_IF_ERROR("pollset_kick_ext", GRPC_ERROR_REF(error));
863 return error;
864 }
865
866 static grpc_error* pollset_kick(grpc_pollset* p,
867 grpc_pollset_worker* specific_worker) {
868 return pollset_kick_ext(p, specific_worker, 0);
869 }
870
871 /* global state management */
872
873 static grpc_error* pollset_global_init(void) {
874 gpr_tls_init(&g_current_thread_poller);
875 gpr_tls_init(&g_current_thread_worker);
876 return GRPC_ERROR_NONE;
877 }
878
879 static void pollset_global_shutdown(void) {
880 gpr_tls_destroy(&g_current_thread_poller);
881 gpr_tls_destroy(&g_current_thread_worker);
882 }
883
884 /* main interface */
885
886 static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
887 gpr_mu_init(&pollset->mu);
888 *mu = &pollset->mu;
889 pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
890 pollset->shutting_down = 0;
891 pollset->called_shutdown = 0;
892 pollset->kicked_without_pollers = 0;
893 pollset->local_wakeup_cache = nullptr;
894 pollset->kicked_without_pollers = 0;
895 pollset->fd_count = 0;
896 pollset->fd_capacity = 0;
897 pollset->fds = nullptr;
898 pollset->pollset_set_count = 0;
899 }
900
901 static void pollset_destroy(grpc_pollset* pollset) {
902 GPR_ASSERT(!pollset_has_workers(pollset));
903 while (pollset->local_wakeup_cache) {
904 grpc_cached_wakeup_fd* next = pollset->local_wakeup_cache->next;
905 fork_fd_list_remove_node(pollset->local_wakeup_cache->fork_fd_list);
906 grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd);
907 gpr_free(pollset->local_wakeup_cache);
908 pollset->local_wakeup_cache = next;
909 }
910 gpr_free(pollset->fds);
911 gpr_mu_destroy(&pollset->mu);
912 }
913
914 static void pollset_add_fd(grpc_pollset* pollset, grpc_fd* fd) {
915 gpr_mu_lock(&pollset->mu);
916 size_t i;
917 /* TODO(ctiller): this is O(num_fds^2); maybe switch to a hash set here */
918 for (i = 0; i < pollset->fd_count; i++) {
919 if (pollset->fds[i] == fd) goto exit;
920 }
921 if (pollset->fd_count == pollset->fd_capacity) {
922 pollset->fd_capacity =
923 GPR_MAX(pollset->fd_capacity + 8, pollset->fd_count * 3 / 2);
924 pollset->fds = static_cast<grpc_fd**>(
925 gpr_realloc(pollset->fds, sizeof(grpc_fd*) * pollset->fd_capacity));
926 }
927 pollset->fds[pollset->fd_count++] = fd;
928 GRPC_FD_REF(fd, "multipoller");
929 pollset_kick(pollset, nullptr);
930 exit:
931 gpr_mu_unlock(&pollset->mu);
932 }
933
934 static void finish_shutdown(grpc_pollset* pollset) {
935 size_t i;
936 for (i = 0; i < pollset->fd_count; i++) {
937 GRPC_FD_UNREF(pollset->fds[i], "multipoller");
938 }
939 pollset->fd_count = 0;
940 GRPC_CLOSURE_SCHED(pollset->shutdown_done, GRPC_ERROR_NONE);
941 }
942
943 static void work_combine_error(grpc_error** composite, grpc_error* error) {
944 if (error == GRPC_ERROR_NONE) return;
945 if (*composite == GRPC_ERROR_NONE) {
946 *composite = GRPC_ERROR_CREATE_FROM_STATIC_STRING("pollset_work");
947 }
948 *composite = grpc_error_add_child(*composite, error);
949 }
950
951 static grpc_error* pollset_work(grpc_pollset* pollset,
952 grpc_pollset_worker** worker_hdl,
953 grpc_millis deadline) {
954 GPR_TIMER_SCOPE("pollset_work", 0);
955 grpc_pollset_worker worker;
956 if (worker_hdl) *worker_hdl = &worker;
957 grpc_error* error = GRPC_ERROR_NONE;
958
959 /* Avoid malloc for a small number of elements. */
960 enum { inline_elements = 96 };
961 struct pollfd pollfd_space[inline_elements];
962 struct grpc_fd_watcher watcher_space[inline_elements];
963
964 /* pollset->mu already held */
965 int added_worker = 0;
966 int locked = 1;
967 int queued_work = 0;
968 int keep_polling = 0;
969 /* this must happen before we (potentially) drop pollset->mu */
970 worker.next = worker.prev = nullptr;
971 worker.reevaluate_polling_on_wakeup = 0;
972 if (pollset->local_wakeup_cache != nullptr) {
973 worker.wakeup_fd = pollset->local_wakeup_cache;
974 pollset->local_wakeup_cache = worker.wakeup_fd->next;
975 } else {
976 worker.wakeup_fd = static_cast<grpc_cached_wakeup_fd*>(
977 gpr_malloc(sizeof(*worker.wakeup_fd)));
978 error = grpc_wakeup_fd_init(&worker.wakeup_fd->fd);
979 fork_fd_list_add_wakeup_fd(worker.wakeup_fd);
980 if (error != GRPC_ERROR_NONE) {
981 GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
982 return error;
983 }
984 }
985 worker.kicked_specifically = 0;
986 /* If we're shutting down then we don't execute any extended work */
987 if (pollset->shutting_down) {
988 GPR_TIMER_MARK("pollset_work.shutting_down", 0);
989 goto done;
990 }
991 /* Start polling, and keep doing so while we're being asked to
992 re-evaluate our pollers (this allows poll() based pollers to
993 ensure they don't miss wakeups) */
994 keep_polling = 1;
995 gpr_tls_set(&g_current_thread_poller, (intptr_t)pollset);
996 while (keep_polling) {
997 keep_polling = 0;
998 if (!pollset->kicked_without_pollers ||
999 deadline <= grpc_core::ExecCtx::Get()->Now()) {
1000 if (!added_worker) {
1001 push_front_worker(pollset, &worker);
1002 added_worker = 1;
1003 gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
1004 }
1005 GPR_TIMER_SCOPE("maybe_work_and_unlock", 0);
1006 #define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR)
1007 #define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR)
1008
1009 int timeout;
1010 int r;
1011 size_t i, fd_count;
1012 nfds_t pfd_count;
1013 grpc_fd_watcher* watchers;
1014 struct pollfd* pfds;
1015
1016 timeout = poll_deadline_to_millis_timeout(deadline);
1017
1018 if (pollset->fd_count + 2 <= inline_elements) {
1019 pfds = pollfd_space;
1020 watchers = watcher_space;
1021 } else {
1022 /* Allocate one buffer to hold both pfds and watchers arrays */
1023 const size_t pfd_size = sizeof(*pfds) * (pollset->fd_count + 2);
1024 const size_t watch_size = sizeof(*watchers) * (pollset->fd_count + 2);
1025 void* buf = gpr_malloc(pfd_size + watch_size);
1026 pfds = static_cast<struct pollfd*>(buf);
1027 watchers = static_cast<grpc_fd_watcher*>(
1028 (void*)(static_cast<char*>(buf) + pfd_size));
1029 }
1030
1031 fd_count = 0;
1032 pfd_count = 1;
1033 pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker.wakeup_fd->fd);
1034 pfds[0].events = POLLIN;
1035 pfds[0].revents = 0;
1036 for (i = 0; i < pollset->fd_count; i++) {
1037 if (fd_is_orphaned(pollset->fds[i]) ||
1038 gpr_atm_no_barrier_load(&pollset->fds[i]->pollhup) == 1) {
1039 GRPC_FD_UNREF(pollset->fds[i], "multipoller");
1040 } else {
1041 pollset->fds[fd_count++] = pollset->fds[i];
1042 watchers[pfd_count].fd = pollset->fds[i];
1043 GRPC_FD_REF(watchers[pfd_count].fd, "multipoller_start");
1044 pfds[pfd_count].fd = pollset->fds[i]->fd;
1045 pfds[pfd_count].revents = 0;
1046 pfd_count++;
1047 }
1048 }
1049 pollset->fd_count = fd_count;
1050 gpr_mu_unlock(&pollset->mu);
1051
1052 for (i = 1; i < pfd_count; i++) {
1053 grpc_fd* fd = watchers[i].fd;
1054 pfds[i].events = static_cast<short>(
1055 fd_begin_poll(fd, pollset, &worker, POLLIN, POLLOUT, &watchers[i]));
1056 GRPC_FD_UNREF(fd, "multipoller_start");
1057 }
1058
1059 /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid
1060 even going into the blocking annotation if possible */
1061 GRPC_SCHEDULING_START_BLOCKING_REGION;
1062 GRPC_STATS_INC_SYSCALL_POLL();
1063 r = grpc_poll_function(pfds, pfd_count, timeout);
1064 GRPC_SCHEDULING_END_BLOCKING_REGION;
1065
1066 if (grpc_polling_trace.enabled()) {
1067 gpr_log(GPR_INFO, "%p poll=%d", pollset, r);
1068 }
1069
1070 if (r < 0) {
1071 if (errno != EINTR) {
1072 work_combine_error(&error, GRPC_OS_ERROR(errno, "poll"));
1073 }
1074
1075 for (i = 1; i < pfd_count; i++) {
1076 if (watchers[i].fd == nullptr) {
1077 fd_end_poll(&watchers[i], 0, 0);
1078 } else {
1079 // Wake up all the file descriptors; if we have an invalid one
1080 // we can identify it on the next pollset_work()
1081 fd_end_poll(&watchers[i], 1, 1);
1082 }
1083 }
1084 } else if (r == 0) {
1085 for (i = 1; i < pfd_count; i++) {
1086 fd_end_poll(&watchers[i], 0, 0);
1087 }
1088 } else {
1089 if (pfds[0].revents & POLLIN_CHECK) {
1090 if (grpc_polling_trace.enabled()) {
1091 gpr_log(GPR_INFO, "%p: got_wakeup", pollset);
1092 }
1093 work_combine_error(
1094 &error, grpc_wakeup_fd_consume_wakeup(&worker.wakeup_fd->fd));
1095 }
1096 for (i = 1; i < pfd_count; i++) {
1097 if (watchers[i].fd == nullptr) {
1098 fd_end_poll(&watchers[i], 0, 0);
1099 } else {
1100 if (grpc_polling_trace.enabled()) {
1101 gpr_log(GPR_INFO, "%p got_event: %d r:%d w:%d [%d]", pollset,
1102 pfds[i].fd, (pfds[i].revents & POLLIN_CHECK) != 0,
1103 (pfds[i].revents & POLLOUT_CHECK) != 0, pfds[i].revents);
1104 }
1105 /* This is a mitigation to prevent poll() from spinning on a
1106 ** POLLHUP https://github.com/grpc/grpc/pull/13665
1107 */
1108 if (pfds[i].revents & POLLHUP) {
1109 gpr_atm_no_barrier_store(&watchers[i].fd->pollhup, 1);
1110 }
1111 fd_end_poll(&watchers[i], pfds[i].revents & POLLIN_CHECK,
1112 pfds[i].revents & POLLOUT_CHECK);
1113 }
1114 }
1115 }
1116
1117 if (pfds != pollfd_space) {
1118 /* pfds and watchers are in the same memory block pointed to by pfds */
1119 gpr_free(pfds);
1120 }
1121
1122 locked = 0;
1123 } else {
1124 GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
1125 pollset->kicked_without_pollers = 0;
1126 }
1127 /* Finished execution - start cleaning up.
1128       Note that we may arrive here from outside the enclosing while() loop.
1129       In that case we won't loop, though, as we haven't added the worker to the
1130       worker list, which means nobody could ask us to re-evaluate polling. */
1131 done:
1132 if (!locked) {
1133 queued_work |= grpc_core::ExecCtx::Get()->Flush();
1134 gpr_mu_lock(&pollset->mu);
1135 locked = 1;
1136 }
1137 /* If we're forced to re-evaluate polling (via pollset_kick with
1138 GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force
1139 a loop */
1140 if (worker.reevaluate_polling_on_wakeup && error == GRPC_ERROR_NONE) {
1141 worker.reevaluate_polling_on_wakeup = 0;
1142 pollset->kicked_without_pollers = 0;
1143 if (queued_work || worker.kicked_specifically) {
1144 /* If there's queued work on the list, then set the deadline to be
1145 immediate so we get back out of the polling loop quickly */
1146 deadline = 0;
1147 }
1148 keep_polling = 1;
1149 }
1150 }
1151 gpr_tls_set(&g_current_thread_poller, 0);
1152 if (added_worker) {
1153 remove_worker(pollset, &worker);
1154 gpr_tls_set(&g_current_thread_worker, 0);
1155 }
1156 /* release wakeup fd to the local pool */
1157 worker.wakeup_fd->next = pollset->local_wakeup_cache;
1158 pollset->local_wakeup_cache = worker.wakeup_fd;
1159 /* check shutdown conditions */
1160 if (pollset->shutting_down) {
1161 if (pollset_has_workers(pollset)) {
1162 pollset_kick(pollset, nullptr);
1163 } else if (!pollset->called_shutdown && !pollset_has_observers(pollset)) {
1164 pollset->called_shutdown = 1;
1165 gpr_mu_unlock(&pollset->mu);
1166 finish_shutdown(pollset);
1167 grpc_core::ExecCtx::Get()->Flush();
1168 /* Continuing to access pollset here is safe -- it is the caller's
1169 * responsibility to not destroy when it has outstanding calls to
1170 * pollset_work.
1171 * TODO(dklempner): Can we refactor the shutdown logic to avoid this? */
1172 gpr_mu_lock(&pollset->mu);
1173 }
1174 }
1175 if (worker_hdl) *worker_hdl = nullptr;
1176 GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
1177 return error;
1178 }
1179
1180 static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
1181 GPR_ASSERT(!pollset->shutting_down);
1182 pollset->shutting_down = 1;
1183 pollset->shutdown_done = closure;
1184 pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
1185 if (!pollset->called_shutdown && !pollset_has_observers(pollset)) {
1186 pollset->called_shutdown = 1;
1187 finish_shutdown(pollset);
1188 }
1189 }
1190
1191 static int poll_deadline_to_millis_timeout(grpc_millis deadline) {
1192 if (deadline == GRPC_MILLIS_INF_FUTURE) return -1;
1193 if (deadline == 0) return 0;
1194 grpc_millis n = deadline - grpc_core::ExecCtx::Get()->Now();
1195 if (n < 0) return 0;
1196 if (n > INT_MAX) return -1;
1197 return static_cast<int>(n);
1198 }
1199
1200 /*******************************************************************************
1201 * pollset_set_posix.c
1202 */
1203
1204 static grpc_pollset_set* pollset_set_create(void) {
1205 grpc_pollset_set* pollset_set =
1206 static_cast<grpc_pollset_set*>(gpr_zalloc(sizeof(*pollset_set)));
1207 gpr_mu_init(&pollset_set->mu);
1208 return pollset_set;
1209 }
1210
1211 static void pollset_set_destroy(grpc_pollset_set* pollset_set) {
1212 size_t i;
1213 gpr_mu_destroy(&pollset_set->mu);
1214 for (i = 0; i < pollset_set->fd_count; i++) {
1215 GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1216 }
1217 for (i = 0; i < pollset_set->pollset_count; i++) {
1218 grpc_pollset* pollset = pollset_set->pollsets[i];
1219 gpr_mu_lock(&pollset->mu);
1220 pollset->pollset_set_count--;
1221 /* check shutdown */
1222 if (pollset->shutting_down && !pollset->called_shutdown &&
1223 !pollset_has_observers(pollset)) {
1224 pollset->called_shutdown = 1;
1225 gpr_mu_unlock(&pollset->mu);
1226 finish_shutdown(pollset);
1227 } else {
1228 gpr_mu_unlock(&pollset->mu);
1229 }
1230 }
1231 gpr_free(pollset_set->pollsets);
1232 gpr_free(pollset_set->pollset_sets);
1233 gpr_free(pollset_set->fds);
1234 gpr_free(pollset_set);
1235 }
1236
1237 static void pollset_set_add_pollset(grpc_pollset_set* pollset_set,
1238 grpc_pollset* pollset) {
1239 size_t i, j;
1240 gpr_mu_lock(&pollset->mu);
1241 pollset->pollset_set_count++;
1242 gpr_mu_unlock(&pollset->mu);
1243 gpr_mu_lock(&pollset_set->mu);
1244 if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
1245 pollset_set->pollset_capacity =
1246 GPR_MAX(8, 2 * pollset_set->pollset_capacity);
1247 pollset_set->pollsets = static_cast<grpc_pollset**>(gpr_realloc(
1248 pollset_set->pollsets,
1249 pollset_set->pollset_capacity * sizeof(*pollset_set->pollsets)));
1250 }
1251 pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
1252 for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
1253 if (fd_is_orphaned(pollset_set->fds[i])) {
1254 GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1255 } else {
1256 pollset_add_fd(pollset, pollset_set->fds[i]);
1257 pollset_set->fds[j++] = pollset_set->fds[i];
1258 }
1259 }
1260 pollset_set->fd_count = j;
1261 gpr_mu_unlock(&pollset_set->mu);
1262 }
1263
1264 static void pollset_set_del_pollset(grpc_pollset_set* pollset_set,
1265 grpc_pollset* pollset) {
1266 size_t i;
1267 gpr_mu_lock(&pollset_set->mu);
1268 for (i = 0; i < pollset_set->pollset_count; i++) {
1269 if (pollset_set->pollsets[i] == pollset) {
1270 pollset_set->pollset_count--;
1271 GPR_SWAP(grpc_pollset*, pollset_set->pollsets[i],
1272 pollset_set->pollsets[pollset_set->pollset_count]);
1273 break;
1274 }
1275 }
1276 gpr_mu_unlock(&pollset_set->mu);
1277 gpr_mu_lock(&pollset->mu);
1278 pollset->pollset_set_count--;
1279 /* check shutdown */
1280 if (pollset->shutting_down && !pollset->called_shutdown &&
1281 !pollset_has_observers(pollset)) {
1282 pollset->called_shutdown = 1;
1283 gpr_mu_unlock(&pollset->mu);
1284 finish_shutdown(pollset);
1285 } else {
1286 gpr_mu_unlock(&pollset->mu);
1287 }
1288 }
1289
1290 static void pollset_set_add_pollset_set(grpc_pollset_set* bag,
1291 grpc_pollset_set* item) {
1292 size_t i, j;
1293 gpr_mu_lock(&bag->mu);
1294 if (bag->pollset_set_count == bag->pollset_set_capacity) {
1295 bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
1296 bag->pollset_sets = static_cast<grpc_pollset_set**>(
1297 gpr_realloc(bag->pollset_sets,
1298 bag->pollset_set_capacity * sizeof(*bag->pollset_sets)));
1299 }
1300 bag->pollset_sets[bag->pollset_set_count++] = item;
1301 for (i = 0, j = 0; i < bag->fd_count; i++) {
1302 if (fd_is_orphaned(bag->fds[i])) {
1303 GRPC_FD_UNREF(bag->fds[i], "pollset_set");
1304 } else {
1305 pollset_set_add_fd(item, bag->fds[i]);
1306 bag->fds[j++] = bag->fds[i];
1307 }
1308 }
1309 bag->fd_count = j;
1310 gpr_mu_unlock(&bag->mu);
1311 }
1312
1313 static void pollset_set_del_pollset_set(grpc_pollset_set* bag,
1314 grpc_pollset_set* item) {
1315 size_t i;
1316 gpr_mu_lock(&bag->mu);
1317 for (i = 0; i < bag->pollset_set_count; i++) {
1318 if (bag->pollset_sets[i] == item) {
1319 bag->pollset_set_count--;
1320 GPR_SWAP(grpc_pollset_set*, bag->pollset_sets[i],
1321 bag->pollset_sets[bag->pollset_set_count]);
1322 break;
1323 }
1324 }
1325 gpr_mu_unlock(&bag->mu);
1326 }
1327
1328 static void pollset_set_add_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) {
1329 size_t i;
1330 gpr_mu_lock(&pollset_set->mu);
1331 if (pollset_set->fd_count == pollset_set->fd_capacity) {
1332 pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity);
1333 pollset_set->fds = static_cast<grpc_fd**>(
1334 gpr_realloc(pollset_set->fds,
1335 pollset_set->fd_capacity * sizeof(*pollset_set->fds)));
1336 }
1337 GRPC_FD_REF(fd, "pollset_set");
1338 pollset_set->fds[pollset_set->fd_count++] = fd;
1339 for (i = 0; i < pollset_set->pollset_count; i++) {
1340 pollset_add_fd(pollset_set->pollsets[i], fd);
1341 }
1342 for (i = 0; i < pollset_set->pollset_set_count; i++) {
1343 pollset_set_add_fd(pollset_set->pollset_sets[i], fd);
1344 }
1345 gpr_mu_unlock(&pollset_set->mu);
1346 }
1347
1348 static void pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) {
1349 size_t i;
1350 gpr_mu_lock(&pollset_set->mu);
1351 for (i = 0; i < pollset_set->fd_count; i++) {
1352 if (pollset_set->fds[i] == fd) {
1353 pollset_set->fd_count--;
1354 GPR_SWAP(grpc_fd*, pollset_set->fds[i],
1355 pollset_set->fds[pollset_set->fd_count]);
1356 GRPC_FD_UNREF(fd, "pollset_set");
1357 break;
1358 }
1359 }
1360 for (i = 0; i < pollset_set->pollset_set_count; i++) {
1361 pollset_set_del_fd(pollset_set->pollset_sets[i], fd);
1362 }
1363 gpr_mu_unlock(&pollset_set->mu);
1364 }
1365
1366 /*******************************************************************************
1367 * Condition Variable polling extensions
1368 */
1369
1370 static void run_poll(void* args);
1371 static void cache_poller_locked(poll_args* args);
1372 static void cache_harvest_locked();
1373
1374 static void cache_insert_locked(poll_args* args) {
1375 uint32_t key = gpr_murmur_hash3(args->fds, args->nfds * sizeof(struct pollfd),
1376 0xDEADBEEF);
1377 key = key % poll_cache.size;
1378 if (poll_cache.active_pollers[key]) {
1379 poll_cache.active_pollers[key]->prev = args;
1380 }
1381 args->next = poll_cache.active_pollers[key];
1382 args->prev = nullptr;
1383 poll_cache.active_pollers[key] = args;
1384 poll_cache.count++;
1385 }
1386
1387 static void init_result(poll_args* pargs) {
1388 pargs->result = static_cast<poll_result*>(gpr_malloc(sizeof(poll_result)));
1389 gpr_ref_init(&pargs->result->refcount, 1);
1390 pargs->result->watchers = nullptr;
1391 pargs->result->watchcount = 0;
1392 pargs->result->fds = static_cast<struct pollfd*>(
1393 gpr_malloc(sizeof(struct pollfd) * pargs->nfds));
1394 memcpy(pargs->result->fds, pargs->fds, sizeof(struct pollfd) * pargs->nfds);
1395 pargs->result->nfds = pargs->nfds;
1396 pargs->result->retval = 0;
1397 pargs->result->err = 0;
1398 pargs->result->completed = 0;
1399 }
1400
1401 // Gets a poll_args object for the given arguments to poll().
1402 // This may be an existing poll_args reused from the cache.
1403 static poll_args* get_poller_locked(struct pollfd* fds, nfds_t count) {
1404 uint32_t key =
1405 gpr_murmur_hash3(fds, count * sizeof(struct pollfd), 0xDEADBEEF);
1406 key = key % poll_cache.size;
1407 poll_args* curr = poll_cache.active_pollers[key];
1408 while (curr) {
1409 if (curr->nfds == count &&
1410 memcmp(curr->fds, fds, count * sizeof(struct pollfd)) == 0) {
1411 gpr_free(fds);
1412 return curr;
1413 }
1414 curr = curr->next;
1415 }
1416
1417 if (poll_cache.free_pollers) {
1418 poll_args* pargs = poll_cache.free_pollers;
1419 poll_cache.free_pollers = pargs->next;
1420 if (poll_cache.free_pollers) {
1421 poll_cache.free_pollers->prev = nullptr;
1422 }
1423 pargs->fds = fds;
1424 pargs->nfds = count;
1425 pargs->next = nullptr;
1426 pargs->prev = nullptr;
1427 init_result(pargs);
1428 cache_poller_locked(pargs);
1429 return pargs;
1430 }
1431
1432 poll_args* pargs =
1433 static_cast<poll_args*>(gpr_malloc(sizeof(struct poll_args)));
1434 gpr_cv_init(&pargs->trigger);
1435 gpr_cv_init(&pargs->harvest);
1436 gpr_cv_init(&pargs->join);
1437 pargs->harvestable = false;
1438 pargs->joinable = false;
1439 pargs->fds = fds;
1440 pargs->nfds = count;
1441 pargs->next = nullptr;
1442 pargs->prev = nullptr;
1443 pargs->trigger_set = 0;
1444 init_result(pargs);
1445 cache_poller_locked(pargs);
1446 gpr_ref(&g_cvfds.pollcount);
1447 pargs->poller_thd = grpc_core::Thread("grpc_poller", &run_poll, pargs);
1448 pargs->poller_thd.Start();
1449 return pargs;
1450 }
1451
1452 static void cache_delete_locked(poll_args* args) {
1453 if (!args->prev) {
1454 uint32_t key = gpr_murmur_hash3(
1455 args->fds, args->nfds * sizeof(struct pollfd), 0xDEADBEEF);
1456 key = key % poll_cache.size;
1457 GPR_ASSERT(poll_cache.active_pollers[key] == args);
1458 poll_cache.active_pollers[key] = args->next;
1459 } else {
1460 args->prev->next = args->next;
1461 }
1462
1463 if (args->next) {
1464 args->next->prev = args->prev;
1465 }
1466
1467 poll_cache.count--;
1468 if (poll_cache.free_pollers) {
1469 poll_cache.free_pollers->prev = args;
1470 }
1471 args->prev = nullptr;
1472 args->next = poll_cache.free_pollers;
1473 gpr_free(args->fds);
1474 poll_cache.free_pollers = args;
1475 }
1476
1477 static void cache_poller_locked(poll_args* args) {
1478 if (poll_cache.count + 1 > poll_cache.size / 2) {
1479 poll_args** old_active_pollers = poll_cache.active_pollers;
1480 poll_cache.size = poll_cache.size * 2;
1481 poll_cache.count = 0;
1482 poll_cache.active_pollers =
1483 static_cast<poll_args**>(gpr_malloc(sizeof(void*) * poll_cache.size));
1484 for (unsigned int i = 0; i < poll_cache.size; i++) {
1485 poll_cache.active_pollers[i] = nullptr;
1486 }
1487 for (unsigned int i = 0; i < poll_cache.size / 2; i++) {
1488 poll_args* curr = old_active_pollers[i];
1489 poll_args* next = nullptr;
1490 while (curr) {
1491 next = curr->next;
1492 cache_insert_locked(curr);
1493 curr = next;
1494 }
1495 }
1496 gpr_free(old_active_pollers);
1497 }
1498
1499 cache_insert_locked(args);
1500 }
1501
1502 static void cache_destroy_locked(poll_args* args) {
1503 if (args->next) {
1504 args->next->prev = args->prev;
1505 }
1506
1507 if (args->prev) {
1508 args->prev->next = args->next;
1509 } else {
1510 poll_cache.free_pollers = args->next;
1511 }
1512
1513 // Now move this args to the dead poller list for later join
1514 if (poll_cache.dead_pollers != nullptr) {
1515 poll_cache.dead_pollers->prev = args;
1516 }
1517 args->prev = nullptr;
1518 args->next = poll_cache.dead_pollers;
1519 poll_cache.dead_pollers = args;
1520 }
1521
1522 static void cache_harvest_locked() {
1523 while (poll_cache.dead_pollers) {
1524 poll_args* args = poll_cache.dead_pollers;
1525 poll_cache.dead_pollers = poll_cache.dead_pollers->next;
1526 // Keep the list consistent in case new dead pollers get added when we
1527 // release the lock below to wait on joining
1528 if (poll_cache.dead_pollers) {
1529 poll_cache.dead_pollers->prev = nullptr;
1530 }
1531 args->harvestable = true;
1532 gpr_cv_signal(&args->harvest);
1533 while (!args->joinable) {
1534 gpr_cv_wait(&args->join, &g_cvfds.mu,
1535 gpr_inf_future(GPR_CLOCK_MONOTONIC));
1536 }
1537 args->poller_thd.Join();
1538 gpr_free(args);
1539 }
1540 }
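// Annotation added for clarity: this is one half of a handshake with the tail
// of run_poll(). cache_harvest_locked marks the poll_args harvestable and
// signals `harvest`; the exiting poller thread, which is waiting on that
// condition, then sets `joinable` and signals `join`, after which it is safe
// to Join() the thread and free its poll_args.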
1541
1542 static void decref_poll_result(poll_result* res) {
1543 if (gpr_unref(&res->refcount)) {
1544 GPR_ASSERT(!res->watchers);
1545 gpr_free(res->fds);
1546 gpr_free(res);
1547 }
1548 }
1549
1550 void remove_cvn(grpc_cv_node** head, grpc_cv_node* target) {
1551 if (target->next) {
1552 target->next->prev = target->prev;
1553 }
1554
1555 if (target->prev) {
1556 target->prev->next = target->next;
1557 } else {
1558 *head = target->next;
1559 }
1560 }
1561
1562 gpr_timespec thread_grace;
1563
1564 // Poll in a background thread
1565 static void run_poll(void* args) {
1566 poll_args* pargs = static_cast<poll_args*>(args);
1567 while (1) {
1568 poll_result* result = pargs->result;
1569 int retval = g_cvfds.poll(result->fds, result->nfds, CV_POLL_PERIOD_MS);
1570 gpr_mu_lock(&g_cvfds.mu);
1571 cache_harvest_locked();
1572 if (retval != 0) {
1573 result->completed = 1;
1574 result->retval = retval;
1575 result->err = errno;
1576 grpc_cv_node* watcher = result->watchers;
1577 while (watcher) {
1578 gpr_cv_signal(watcher->cv);
1579 watcher = watcher->next;
1580 }
1581 }
1582 if (result->watchcount == 0 || result->completed) {
1583 cache_delete_locked(pargs);
1584 decref_poll_result(result);
1585 // Leave this polling thread alive for a grace period to do another poll()
1586 // op
1587 gpr_timespec deadline = gpr_now(GPR_CLOCK_MONOTONIC);
1588 deadline = gpr_time_add(deadline, thread_grace);
1589 pargs->trigger_set = 0;
1590 gpr_cv_wait(&pargs->trigger, &g_cvfds.mu, deadline);
1591 cache_harvest_locked();
1592 if (!pargs->trigger_set) {
1593 cache_destroy_locked(pargs);
1594 break;
1595 }
1596 }
1597 gpr_mu_unlock(&g_cvfds.mu);
1598 }
1599
1600 if (gpr_unref(&g_cvfds.pollcount)) {
1601 gpr_cv_signal(&g_cvfds.shutdown_cv);
1602 }
1603 while (!pargs->harvestable) {
1604 gpr_cv_wait(&pargs->harvest, &g_cvfds.mu,
1605 gpr_inf_future(GPR_CLOCK_MONOTONIC));
1606 }
1607 pargs->joinable = true;
1608 gpr_cv_signal(&pargs->join);
1609 gpr_mu_unlock(&g_cvfds.mu);
1610 }
1611
1612 // This function overrides poll() to handle condition variable wakeup fds
1613 static int cvfd_poll(struct pollfd* fds, nfds_t nfds, int timeout) {
1614 if (timeout == 0) {
1615 // Don't bother using background threads for polling if timeout is 0,
1616 // poll-cv might not wait for a poll to return otherwise.
1617 // https://github.com/grpc/grpc/issues/13298
1618 return poll(fds, nfds, 0);
1619 }
1620 unsigned int i;
1621 int res, idx;
1622 grpc_cv_node* pollcv;
1623 int skip_poll = 0;
1624 nfds_t nsockfds = 0;
1625 poll_result* result = nullptr;
1626 gpr_mu_lock(&g_cvfds.mu);
1627 cache_harvest_locked();
1628 pollcv = static_cast<grpc_cv_node*>(gpr_malloc(sizeof(grpc_cv_node)));
1629 pollcv->next = nullptr;
1630 gpr_cv pollcv_cv;
1631 gpr_cv_init(&pollcv_cv);
1632 pollcv->cv = &pollcv_cv;
1633 grpc_cv_node* fd_cvs =
1634 static_cast<grpc_cv_node*>(gpr_malloc(nfds * sizeof(grpc_cv_node)));
1635
1636 for (i = 0; i < nfds; i++) {
1637 fds[i].revents = 0;
1638 if (fds[i].fd < 0 && (fds[i].events & POLLIN)) {
1639 idx = GRPC_FD_TO_IDX(fds[i].fd);
1640 fd_cvs[i].cv = &pollcv_cv;
1641 fd_cvs[i].prev = nullptr;
1642 fd_cvs[i].next = g_cvfds.cvfds[idx].cvs;
1643 if (g_cvfds.cvfds[idx].cvs) {
1644 g_cvfds.cvfds[idx].cvs->prev = &(fd_cvs[i]);
1645 }
1646 g_cvfds.cvfds[idx].cvs = &(fd_cvs[i]);
1647 // Don't bother polling if a wakeup fd is ready
1648 if (g_cvfds.cvfds[idx].is_set) {
1649 skip_poll = 1;
1650 }
1651 } else if (fds[i].fd >= 0) {
1652 nsockfds++;
1653 }
1654 }
1655
1656 gpr_timespec deadline = gpr_now(GPR_CLOCK_MONOTONIC);
1657 if (timeout < 0) {
1658 deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
1659 } else {
1660 deadline =
1661 gpr_time_add(deadline, gpr_time_from_millis(timeout, GPR_TIMESPAN));
1662 }
1663
1664 res = 0;
1665 if (!skip_poll && nsockfds > 0) {
1666 struct pollfd* pollfds = static_cast<struct pollfd*>(
1667 gpr_malloc(sizeof(struct pollfd) * nsockfds));
1668 idx = 0;
1669 for (i = 0; i < nfds; i++) {
1670 if (fds[i].fd >= 0) {
1671 pollfds[idx].fd = fds[i].fd;
1672 pollfds[idx].events = fds[i].events;
1673 pollfds[idx].revents = 0;
1674 idx++;
1675 }
1676 }
1677 poll_args* pargs = get_poller_locked(pollfds, nsockfds);
1678 result = pargs->result;
1679 pollcv->next = result->watchers;
1680 pollcv->prev = nullptr;
1681 if (result->watchers) {
1682 result->watchers->prev = pollcv;
1683 }
1684 result->watchers = pollcv;
1685 result->watchcount++;
1686 gpr_ref(&result->refcount);
1687
1688 pargs->trigger_set = 1;
1689 gpr_cv_signal(&pargs->trigger);
1690 gpr_cv_wait(&pollcv_cv, &g_cvfds.mu, deadline);
1691 cache_harvest_locked();
1692 res = result->retval;
1693 errno = result->err;
1694 result->watchcount--;
1695 remove_cvn(&result->watchers, pollcv);
1696 } else if (!skip_poll) {
1697 gpr_cv_wait(&pollcv_cv, &g_cvfds.mu, deadline);
1698 cache_harvest_locked();
1699 }
1700
1701 idx = 0;
1702 for (i = 0; i < nfds; i++) {
1703 if (fds[i].fd < 0 && (fds[i].events & POLLIN)) {
1704 remove_cvn(&g_cvfds.cvfds[GRPC_FD_TO_IDX(fds[i].fd)].cvs, &(fd_cvs[i]));
1705 if (g_cvfds.cvfds[GRPC_FD_TO_IDX(fds[i].fd)].is_set) {
1706 fds[i].revents = POLLIN;
1707 if (res >= 0) res++;
1708 }
1709 } else if (!skip_poll && fds[i].fd >= 0 && result->completed) {
1710 fds[i].revents = result->fds[idx].revents;
1711 idx++;
1712 }
1713 }
1714
1715 gpr_free(fd_cvs);
1716 gpr_free(pollcv);
1717 if (result) {
1718 decref_poll_result(result);
1719 }
1720
1721 gpr_mu_unlock(&g_cvfds.mu);
1722
1723 return res;
1724 }
1725
1726 static void global_cv_fd_table_init() {
1727 gpr_mu_init(&g_cvfds.mu);
1728 gpr_mu_lock(&g_cvfds.mu);
1729 gpr_cv_init(&g_cvfds.shutdown_cv);
1730 gpr_ref_init(&g_cvfds.pollcount, 1);
1731 g_cvfds.size = CV_DEFAULT_TABLE_SIZE;
1732 g_cvfds.cvfds = static_cast<grpc_fd_node*>(
1733 gpr_malloc(sizeof(grpc_fd_node) * CV_DEFAULT_TABLE_SIZE));
1734 g_cvfds.free_fds = nullptr;
1735 thread_grace = gpr_time_from_millis(POLLCV_THREAD_GRACE_MS, GPR_TIMESPAN);
1736 for (int i = 0; i < CV_DEFAULT_TABLE_SIZE; i++) {
1737 g_cvfds.cvfds[i].is_set = 0;
1738 g_cvfds.cvfds[i].cvs = nullptr;
1739 g_cvfds.cvfds[i].next_free = g_cvfds.free_fds;
1740 g_cvfds.free_fds = &g_cvfds.cvfds[i];
1741 }
1742 // Override the poll function with one that supports cvfds
1743 g_cvfds.poll = grpc_poll_function;
1744 grpc_poll_function = &cvfd_poll;
1745
1746 // Initialize the cache
1747 poll_cache.size = 32;
1748 poll_cache.count = 0;
1749 poll_cache.free_pollers = nullptr;
1750 poll_cache.active_pollers =
1751 static_cast<poll_args**>(gpr_malloc(sizeof(void*) * 32));
1752 for (unsigned int i = 0; i < poll_cache.size; i++) {
1753 poll_cache.active_pollers[i] = nullptr;
1754 }
1755 poll_cache.dead_pollers = nullptr;
1756
1757 gpr_mu_unlock(&g_cvfds.mu);
1758 }
1759
1760 static void global_cv_fd_table_shutdown() {
1761 gpr_mu_lock(&g_cvfds.mu);
1762 // Attempt to wait for all abandoned poll() threads to terminate
1763 // Not doing so will result in reported memory leaks
1764 if (!gpr_unref(&g_cvfds.pollcount)) {
1765 int res = gpr_cv_wait(&g_cvfds.shutdown_cv, &g_cvfds.mu,
1766 gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
1767 gpr_time_from_seconds(3, GPR_TIMESPAN)));
1768 GPR_ASSERT(res == 0);
1769 }
1770 gpr_cv_destroy(&g_cvfds.shutdown_cv);
1771 grpc_poll_function = g_cvfds.poll;
1772 gpr_free(g_cvfds.cvfds);
1773
1774 cache_harvest_locked();
1775 gpr_free(poll_cache.active_pollers);
1776
1777 gpr_mu_unlock(&g_cvfds.mu);
1778 gpr_mu_destroy(&g_cvfds.mu);
1779 }
1780
1781 /*******************************************************************************
1782 * event engine binding
1783 */
1784
1785 static void shutdown_engine(void) {
1786 pollset_global_shutdown();
1787 if (grpc_cv_wakeup_fds_enabled()) {
1788 global_cv_fd_table_shutdown();
1789 }
1790 if (track_fds_for_fork) {
1791 gpr_mu_destroy(&fork_fd_list_mu);
1792 grpc_core::Fork::SetResetChildPollingEngineFunc(nullptr);
1793 }
1794 }
1795
1796 static const grpc_event_engine_vtable vtable = {
1797 sizeof(grpc_pollset),
1798 false,
1799
1800 fd_create,
1801 fd_wrapped_fd,
1802 fd_orphan,
1803 fd_shutdown,
1804 fd_notify_on_read,
1805 fd_notify_on_write,
1806 fd_notify_on_error,
1807 fd_set_readable,
1808 fd_set_writable,
1809 fd_set_error,
1810 fd_is_shutdown,
1811
1812 pollset_init,
1813 pollset_shutdown,
1814 pollset_destroy,
1815 pollset_work,
1816 pollset_kick,
1817 pollset_add_fd,
1818
1819 pollset_set_create,
1820 pollset_set_destroy,
1821 pollset_set_add_pollset,
1822 pollset_set_del_pollset,
1823 pollset_set_add_pollset_set,
1824 pollset_set_del_pollset_set,
1825 pollset_set_add_fd,
1826 pollset_set_del_fd,
1827
1828 shutdown_engine,
1829 };
1830
1831 /* Called by the child process's post-fork handler to close open fds, including
1832 * worker wakeup fds. This allows gRPC to shut down in the child process without
1833 * interfering with connections or RPCs ongoing in the parent. */
1834 static void reset_event_manager_on_fork() {
1835 gpr_mu_lock(&fork_fd_list_mu);
1836 while (fork_fd_list_head != nullptr) {
1837 if (fork_fd_list_head->fd != nullptr) {
1838 close(fork_fd_list_head->fd->fd);
1839 fork_fd_list_head->fd->fd = -1;
1840 } else {
1841 close(fork_fd_list_head->cached_wakeup_fd->fd.read_fd);
1842 fork_fd_list_head->cached_wakeup_fd->fd.read_fd = -1;
1843 close(fork_fd_list_head->cached_wakeup_fd->fd.write_fd);
1844 fork_fd_list_head->cached_wakeup_fd->fd.write_fd = -1;
1845 }
1846 fork_fd_list_head = fork_fd_list_head->next;
1847 }
1848 gpr_mu_unlock(&fork_fd_list_mu);
1849 }
1850
1851 const grpc_event_engine_vtable* grpc_init_poll_posix(bool explicit_request) {
1852 if (!grpc_has_wakeup_fd()) {
1853 gpr_log(GPR_ERROR, "Skipping poll because of no wakeup fd.");
1854 return nullptr;
1855 }
1856 if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
1857 return nullptr;
1858 }
1859 if (grpc_core::Fork::Enabled()) {
1860 track_fds_for_fork = true;
1861 gpr_mu_init(&fork_fd_list_mu);
1862 grpc_core::Fork::SetResetChildPollingEngineFunc(
1863 reset_event_manager_on_fork);
1864 }
1865 return &vtable;
1866 }
1867
1868 const grpc_event_engine_vtable* grpc_init_poll_cv_posix(bool explicit_request) {
1869 global_cv_fd_table_init();
1870 grpc_enable_cv_wakeup_fds(1);
1871 if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
1872 global_cv_fd_table_shutdown();
1873 grpc_enable_cv_wakeup_fds(0);
1874 return nullptr;
1875 }
1876 return &vtable;
1877 }
1878
1879 #endif /* GRPC_POSIX_SOCKET_EV_POLL */
1880