1 /*
2 *
3 * Copyright 2016 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/iomgr/port.h"
22
23 #ifdef GRPC_POSIX_SOCKET_EV_POLL
24
25 #include "src/core/lib/iomgr/ev_poll_posix.h"
26
27 #include <assert.h>
28 #include <errno.h>
29 #include <limits.h>
30 #include <poll.h>
31 #include <string.h>
32 #include <sys/socket.h>
33 #include <unistd.h>
34
35 #include <string>
36
37 #include "absl/strings/str_cat.h"
38
39 #include <grpc/support/alloc.h>
40 #include <grpc/support/log.h>
41
42 #include "src/core/lib/debug/stats.h"
43 #include "src/core/lib/gpr/murmur_hash.h"
44 #include "src/core/lib/gpr/tls.h"
45 #include "src/core/lib/gpr/useful.h"
46 #include "src/core/lib/gprpp/thd.h"
47 #include "src/core/lib/iomgr/block_annotate.h"
48 #include "src/core/lib/iomgr/iomgr_internal.h"
49 #include "src/core/lib/iomgr/wakeup_fd_posix.h"
50 #include "src/core/lib/profiling/timers.h"
51
52 #define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker*)1)
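/* Sentinel worker pointer: passing this to pollset_kick wakes every worker
currently blocked in pollset_work on that pollset (see pollset_kick_ext). */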
53
54 /*******************************************************************************
55 * FD declarations
56 */
57 typedef struct grpc_fd_watcher {
58 struct grpc_fd_watcher* next;
59 struct grpc_fd_watcher* prev;
60 grpc_pollset* pollset;
61 grpc_pollset_worker* worker;
62 grpc_fd* fd;
63 } grpc_fd_watcher;
64
65 typedef struct grpc_cached_wakeup_fd grpc_cached_wakeup_fd;
66
67 /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
68 struct grpc_fork_fd_list {
69 /* Only one of fd or cached_wakeup_fd will be set. The unused field will be
70 set to nullptr. */
71 grpc_fd* fd;
72 grpc_cached_wakeup_fd* cached_wakeup_fd;
73
74 grpc_fork_fd_list* next;
75 grpc_fork_fd_list* prev;
76 };
77
78 struct grpc_fd {
79 int fd;
80 /* refst format:
81 bit0: 1=active/0=orphaned
82 bit1-n: refcount
83 meaning that mostly we ref by two to avoid altering the orphaned bit,
84 and just unref by 1 when we're ready to flag the object as orphaned */
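/* e.g. refst == 5 (binary 101) is an fd that is still active and has two
outstanding refs; after fd_orphan the low bit is cleared and the same two
refs leave refst == 4. */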
85 gpr_atm refst;
86
87 gpr_mu mu;
88 int shutdown;
89 int closed;
90 int released;
91 gpr_atm pollhup;
92 grpc_error* shutdown_error;
93
94 /* The watcher list.
95
96 The following watcher related fields are protected by watcher_mu.
97
98 An fd_watcher is an ephemeral object created when an fd wants to
99 begin polling, and destroyed after the poll.
100
101 It records whether the fd is to be polled for reads, for writes, for
102 both, or for neither.
103
104 If a watcher is asked to poll for reads or writes, the read_watcher
105 or write_watcher fields are set respectively. A watcher may be asked
106 to poll for both, in which case both fields will be set.
107
108 read_watcher and write_watcher may be NULL if no watcher has been
109 asked to poll for reads or writes.
110
111 If an fd_watcher is not asked to poll for reads or writes, it's added
112 to a linked list of inactive watchers, rooted at inactive_watcher_root.
113 If a poller is needed later, one of the inactive watchers may be
114 kicked out of its poll loop to take on that
115 responsibility. */
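/* A watcher that is actively polling is recorded in read_watcher and/or
write_watcher; one that is not polling (and has a worker attached) is
parked on the inactive list. */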
116 grpc_fd_watcher inactive_watcher_root;
117 grpc_fd_watcher* read_watcher;
118 grpc_fd_watcher* write_watcher;
119
120 grpc_closure* read_closure;
121 grpc_closure* write_closure;
122
123 grpc_closure* on_done_closure;
124
125 grpc_iomgr_object iomgr_object;
126
127 /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
128 grpc_fork_fd_list* fork_fd_list;
129 };
130
131 /* True when GRPC_ENABLE_FORK_SUPPORT=1. */
132 static bool track_fds_for_fork = false;
133
134 /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
135 static grpc_fork_fd_list* fork_fd_list_head = nullptr;
136 static gpr_mu fork_fd_list_mu;
137
138 /* Begin polling on an fd.
139 Registers the given pollset's interest in this fd, so that if readability
140 or writability interest changes, the pollset can be kicked to pick up the
141 new interest.
142 Return value is:
143 (fd_needs_read? read_mask : 0) | (fd_needs_write? write_mask : 0)
144 i.e. a combination of read_mask and write_mask determined by the fd's current
145 interest in said events.
146 Polling strategies that do not need to alter their behavior depending on the
147 fd's current interest (such as epoll) do not need to call this function.
148 MUST NOT be called with a pollset lock taken */
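/* For example, with read_mask == POLLIN and write_mask == POLLOUT (as used
by pollset_work below), a return value of just POLLIN means the caller
should poll this fd for readability only. */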
149 static uint32_t fd_begin_poll(grpc_fd* fd, grpc_pollset* pollset,
150 grpc_pollset_worker* worker, uint32_t read_mask,
151 uint32_t write_mask, grpc_fd_watcher* watcher);
152 /* Complete polling previously started with fd_begin_poll
153 MUST NOT be called with a pollset lock taken
154 if got_read or got_write are 1, also does the become_{readable,writable} as
155 appropriate. */
156 static void fd_end_poll(grpc_fd_watcher* watcher, int got_read, int got_write);
157
158 /* Return true if this fd is orphaned, false otherwise */
159 static bool fd_is_orphaned(grpc_fd* fd);
160
161 #ifndef NDEBUG
162 static void fd_ref(grpc_fd* fd, const char* reason, const char* file, int line);
163 static void fd_unref(grpc_fd* fd, const char* reason, const char* file,
164 int line);
165 #define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
166 #define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
167 #else
168 static void fd_ref(grpc_fd* fd);
169 static void fd_unref(grpc_fd* fd);
170 #define GRPC_FD_REF(fd, reason) fd_ref(fd)
171 #define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
172 #endif
173
174 #define CLOSURE_NOT_READY ((grpc_closure*)0)
175 #define CLOSURE_READY ((grpc_closure*)1)
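/* read_closure/write_closure in grpc_fd hold either one of these sentinels
or a real closure: CLOSURE_NOT_READY means nothing is pending, CLOSURE_READY
means the fd became ready before anyone asked to be notified, and any other
value is a closure waiting to be scheduled once the fd becomes ready (or is
shut down). See notify_on_locked and set_ready_locked. */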
176
177 /*******************************************************************************
178 * pollset declarations
179 */
180
181 typedef struct grpc_cached_wakeup_fd {
182 grpc_wakeup_fd fd;
183 struct grpc_cached_wakeup_fd* next;
184
185 /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
186 grpc_fork_fd_list* fork_fd_list;
187 } grpc_cached_wakeup_fd;
188
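/* Each thread that enters pollset_work places one of these on its stack.
Workers form a doubly linked list rooted at grpc_pollset::root_worker, and
each borrows a wakeup fd (reused via local_wakeup_cache when possible) that
pollset_kick writes to in order to interrupt poll(). */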
189 struct grpc_pollset_worker {
190 grpc_cached_wakeup_fd* wakeup_fd;
191 int reevaluate_polling_on_wakeup;
192 int kicked_specifically;
193 struct grpc_pollset_worker* next;
194 struct grpc_pollset_worker* prev;
195 };
196
197 struct grpc_pollset {
198 gpr_mu mu;
199 grpc_pollset_worker root_worker;
200 int shutting_down;
201 int called_shutdown;
202 int kicked_without_pollers;
203 grpc_closure* shutdown_done;
204 int pollset_set_count;
205 /* all polled fds */
206 size_t fd_count;
207 size_t fd_capacity;
208 grpc_fd** fds;
209 /* Local cache of wakeup fds for workers */
210 grpc_cached_wakeup_fd* local_wakeup_cache;
211 };
212
213 /* Add an fd to a pollset */
214 static void pollset_add_fd(grpc_pollset* pollset, struct grpc_fd* fd);
215
216 static void pollset_set_add_fd(grpc_pollset_set* pollset_set, grpc_fd* fd);
217
218 /* Convert a grpc_millis deadline to a poll() timeout in milliseconds:
219 - deadlines at or before the current time are clamped to zero, giving a
220 non-blocking poll (which becomes spin polling)
221 - infinite deadlines, and deadlines more than INT_MAX milliseconds away,
222 are converted to -1, i.e. block until an event arrives
223 - otherwise the result is the number of milliseconds remaining until the
224 deadline */
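/* Example: with the current time at 1000 ms and a deadline of 3500 ms, the
timeout passed to poll() is 2500 ms; a deadline of 900 ms (already past)
yields 0, i.e. a non-blocking poll. */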
225 static int poll_deadline_to_millis_timeout(grpc_millis deadline);
226
227 /* Allow kick to wakeup the currently polling worker */
228 #define GRPC_POLLSET_CAN_KICK_SELF 1
229 /* Force the wakee to repoll when awoken */
230 #define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP 2
231 /* As per pollset_kick, with an extended set of flags (defined above)
232 -- mostly for fd_posix's use. */
233 static grpc_error* pollset_kick_ext(grpc_pollset* p,
234 grpc_pollset_worker* specific_worker,
235 uint32_t flags) GRPC_MUST_USE_RESULT;
236
237 /* Return true if the pollset has active threads in pollset_work (pollset must
238 * be locked) */
239 static bool pollset_has_workers(grpc_pollset* pollset);
240
241 /*******************************************************************************
242 * pollset_set definitions
243 */
244
245 struct grpc_pollset_set {
246 gpr_mu mu;
247
248 size_t pollset_count;
249 size_t pollset_capacity;
250 grpc_pollset** pollsets;
251
252 size_t pollset_set_count;
253 size_t pollset_set_capacity;
254 struct grpc_pollset_set** pollset_sets;
255
256 size_t fd_count;
257 size_t fd_capacity;
258 grpc_fd** fds;
259 };
260
261 /*******************************************************************************
262 * functions to track opened fds. No-ops unless track_fds_for_fork is true.
263 */
264
265 static void fork_fd_list_remove_node(grpc_fork_fd_list* node) {
266 if (track_fds_for_fork) {
267 gpr_mu_lock(&fork_fd_list_mu);
268 if (fork_fd_list_head == node) {
269 fork_fd_list_head = node->next;
270 }
271 if (node->prev != nullptr) {
272 node->prev->next = node->next;
273 }
274 if (node->next != nullptr) {
275 node->next->prev = node->prev;
276 }
277 gpr_free(node);
278 gpr_mu_unlock(&fork_fd_list_mu);
279 }
280 }
281
282 static void fork_fd_list_add_node(grpc_fork_fd_list* node) {
283 gpr_mu_lock(&fork_fd_list_mu);
284 node->next = fork_fd_list_head;
285 node->prev = nullptr;
286 if (fork_fd_list_head != nullptr) {
287 fork_fd_list_head->prev = node;
288 }
289 fork_fd_list_head = node;
290 gpr_mu_unlock(&fork_fd_list_mu);
291 }
292
293 static void fork_fd_list_add_grpc_fd(grpc_fd* fd) {
294 if (track_fds_for_fork) {
295 fd->fork_fd_list =
296 static_cast<grpc_fork_fd_list*>(gpr_malloc(sizeof(grpc_fork_fd_list)));
297 fd->fork_fd_list->fd = fd;
298 fd->fork_fd_list->cached_wakeup_fd = nullptr;
299 fork_fd_list_add_node(fd->fork_fd_list);
300 }
301 }
302
303 static void fork_fd_list_add_wakeup_fd(grpc_cached_wakeup_fd* fd) {
304 if (track_fds_for_fork) {
305 fd->fork_fd_list =
306 static_cast<grpc_fork_fd_list*>(gpr_malloc(sizeof(grpc_fork_fd_list)));
307 fd->fork_fd_list->cached_wakeup_fd = fd;
308 fd->fork_fd_list->fd = nullptr;
309 fork_fd_list_add_node(fd->fork_fd_list);
310 }
311 }
312
313 /*******************************************************************************
314 * fd_posix.c
315 */
316
317 #ifndef NDEBUG
318 #define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
319 #define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
320 static void ref_by(grpc_fd* fd, int n, const char* reason, const char* file,
321 int line) {
322 if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_fd_refcount)) {
323 gpr_log(GPR_DEBUG,
324 "FD %d %p ref %d %" PRIdPTR " -> %" PRIdPTR " [%s; %s:%d]",
325 fd->fd, fd, n, gpr_atm_no_barrier_load(&fd->refst),
326 gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
327 }
328 #else
329 #define REF_BY(fd, n, reason) \
330 do { \
331 ref_by(fd, n); \
332 (void)(reason); \
333 } while (0)
334 #define UNREF_BY(fd, n, reason) \
335 do { \
336 unref_by(fd, n); \
337 (void)(reason); \
338 } while (0)
339 static void ref_by(grpc_fd* fd, int n) {
340 #endif
341 GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
342 }
343
344 #ifndef NDEBUG
345 static void unref_by(grpc_fd* fd, int n, const char* reason, const char* file,
346 int line) {
347 if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_fd_refcount)) {
348 gpr_log(GPR_DEBUG,
349 "FD %d %p unref %d %" PRIdPTR " -> %" PRIdPTR " [%s; %s:%d]",
350 fd->fd, fd, n, gpr_atm_no_barrier_load(&fd->refst),
351 gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
352 }
353 #else
354 static void unref_by(grpc_fd* fd, int n) {
355 #endif
356 gpr_atm old = gpr_atm_full_fetch_add(&fd->refst, -n);
357 if (old == n) {
358 gpr_mu_destroy(&fd->mu);
359 grpc_iomgr_unregister_object(&fd->iomgr_object);
360 fork_fd_list_remove_node(fd->fork_fd_list);
361 if (fd->shutdown) GRPC_ERROR_UNREF(fd->shutdown_error);
362 gpr_free(fd);
363 } else {
364 GPR_ASSERT(old > n);
365 }
366 }
367
368 static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
369 // Avoid unused-parameter warning for debug-only parameter
370 (void)track_err;
371 GPR_DEBUG_ASSERT(track_err == false);
372 grpc_fd* r = static_cast<grpc_fd*>(gpr_malloc(sizeof(*r)));
373 gpr_mu_init(&r->mu);
374 gpr_atm_rel_store(&r->refst, 1);
375 r->shutdown = 0;
376 r->read_closure = CLOSURE_NOT_READY;
377 r->write_closure = CLOSURE_NOT_READY;
378 r->fd = fd;
379 r->inactive_watcher_root.next = r->inactive_watcher_root.prev =
380 &r->inactive_watcher_root;
381 r->read_watcher = r->write_watcher = nullptr;
382 r->on_done_closure = nullptr;
383 r->closed = 0;
384 r->released = 0;
385 gpr_atm_no_barrier_store(&r->pollhup, 0);
386
387 std::string name2 = absl::StrCat(name, " fd=", fd);
388 grpc_iomgr_register_object(&r->iomgr_object, name2.c_str());
389 fork_fd_list_add_grpc_fd(r);
390 return r;
391 }
392
393 static bool fd_is_orphaned(grpc_fd* fd) {
394 return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
395 }
396
397 static grpc_error* pollset_kick_locked(grpc_fd_watcher* watcher) {
398 gpr_mu_lock(&watcher->pollset->mu);
399 GPR_ASSERT(watcher->worker);
400 grpc_error* err = pollset_kick_ext(watcher->pollset, watcher->worker,
401 GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP);
402 gpr_mu_unlock(&watcher->pollset->mu);
403 return err;
404 }
405
406 static void maybe_wake_one_watcher_locked(grpc_fd* fd) {
407 if (fd->inactive_watcher_root.next != &fd->inactive_watcher_root) {
408 pollset_kick_locked(fd->inactive_watcher_root.next);
409 } else if (fd->read_watcher) {
410 pollset_kick_locked(fd->read_watcher);
411 } else if (fd->write_watcher) {
412 pollset_kick_locked(fd->write_watcher);
413 }
414 }
415
416 static void wake_all_watchers_locked(grpc_fd* fd) {
417 grpc_fd_watcher* watcher;
418 for (watcher = fd->inactive_watcher_root.next;
419 watcher != &fd->inactive_watcher_root; watcher = watcher->next) {
420 pollset_kick_locked(watcher);
421 }
422 if (fd->read_watcher) {
423 pollset_kick_locked(fd->read_watcher);
424 }
425 if (fd->write_watcher && fd->write_watcher != fd->read_watcher) {
426 pollset_kick_locked(fd->write_watcher);
427 }
428 }
429
430 static int has_watchers(grpc_fd* fd) {
431 return fd->read_watcher != nullptr || fd->write_watcher != nullptr ||
432 fd->inactive_watcher_root.next != &fd->inactive_watcher_root;
433 }
434
435 static void close_fd_locked(grpc_fd* fd) {
436 fd->closed = 1;
437 if (!fd->released) {
438 close(fd->fd);
439 }
440 grpc_core::ExecCtx::Run(DEBUG_LOCATION, fd->on_done_closure, GRPC_ERROR_NONE);
441 }
442
443 static int fd_wrapped_fd(grpc_fd* fd) {
444 if (fd->released || fd->closed) {
445 return -1;
446 } else {
447 return fd->fd;
448 }
449 }
450
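/* Orphan an fd: clear its active bit and schedule on_done once its last
watcher finishes (immediately if it has none). If release_fd is non-null,
ownership of the descriptor is handed back to the caller through *release_fd
instead of the fd being close()d. */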
451 static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
452 const char* reason) {
453 fd->on_done_closure = on_done;
454 fd->released = release_fd != nullptr;
455 if (release_fd != nullptr) {
456 *release_fd = fd->fd;
457 fd->released = true;
458 }
459 gpr_mu_lock(&fd->mu);
460 REF_BY(fd, 1, reason); /* remove active status, but keep referenced */
461 if (!has_watchers(fd)) {
462 close_fd_locked(fd);
463 } else {
464 wake_all_watchers_locked(fd);
465 }
466 gpr_mu_unlock(&fd->mu);
467 UNREF_BY(fd, 2, reason); /* drop the reference */
468 }
469
470 /* increment refcount by two to avoid changing the orphan bit */
471 #ifndef NDEBUG
472 static void fd_ref(grpc_fd* fd, const char* reason, const char* file,
473 int line) {
474 ref_by(fd, 2, reason, file, line);
475 }
476
477 static void fd_unref(grpc_fd* fd, const char* reason, const char* file,
478 int line) {
479 unref_by(fd, 2, reason, file, line);
480 }
481 #else
482 static void fd_ref(grpc_fd* fd) { ref_by(fd, 2); }
483
484 static void fd_unref(grpc_fd* fd) { unref_by(fd, 2); }
485 #endif
486
487 static grpc_error* fd_shutdown_error(grpc_fd* fd) {
488 if (!fd->shutdown) {
489 return GRPC_ERROR_NONE;
490 } else {
491 return grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
492 "FD shutdown", &fd->shutdown_error, 1),
493 GRPC_ERROR_INT_GRPC_STATUS,
494 GRPC_STATUS_UNAVAILABLE);
495 }
496 }
497
498 static void notify_on_locked(grpc_fd* fd, grpc_closure** st,
499 grpc_closure* closure) {
500 if (fd->shutdown || gpr_atm_no_barrier_load(&fd->pollhup)) {
501 grpc_core::ExecCtx::Run(
502 DEBUG_LOCATION, closure,
503 grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("FD shutdown"),
504 GRPC_ERROR_INT_GRPC_STATUS,
505 GRPC_STATUS_UNAVAILABLE));
506 } else if (*st == CLOSURE_NOT_READY) {
507 /* not ready ==> switch to a waiting state by setting the closure */
508 *st = closure;
509 } else if (*st == CLOSURE_READY) {
510 /* already ready ==> queue the closure to run immediately */
511 *st = CLOSURE_NOT_READY;
512 grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, fd_shutdown_error(fd));
513 maybe_wake_one_watcher_locked(fd);
514 } else {
515 /* upcallptr was set to a different closure. This is an error! */
516 gpr_log(GPR_ERROR,
517 "User called a notify_on function with a previous callback still "
518 "pending");
519 abort();
520 }
521 }
522
523 /* returns 1 if state becomes not ready */
524 static int set_ready_locked(grpc_fd* fd, grpc_closure** st) {
525 if (*st == CLOSURE_READY) {
526 /* duplicate ready ==> ignore */
527 return 0;
528 } else if (*st == CLOSURE_NOT_READY) {
529 /* not ready, and not waiting ==> flag ready */
530 *st = CLOSURE_READY;
531 return 0;
532 } else {
533 /* waiting ==> queue closure */
534 grpc_core::ExecCtx::Run(DEBUG_LOCATION, *st, fd_shutdown_error(fd));
535 *st = CLOSURE_NOT_READY;
536 return 1;
537 }
538 }
539
540 static void fd_shutdown(grpc_fd* fd, grpc_error* why) {
541 gpr_mu_lock(&fd->mu);
542 /* only shutdown once */
543 if (!fd->shutdown) {
544 fd->shutdown = 1;
545 fd->shutdown_error = why;
546 /* signal read/write closed to OS so that future operations fail */
547 shutdown(fd->fd, SHUT_RDWR);
548 set_ready_locked(fd, &fd->read_closure);
549 set_ready_locked(fd, &fd->write_closure);
550 } else {
551 GRPC_ERROR_UNREF(why);
552 }
553 gpr_mu_unlock(&fd->mu);
554 }
555
556 static bool fd_is_shutdown(grpc_fd* fd) {
557 gpr_mu_lock(&fd->mu);
558 bool r = fd->shutdown;
559 gpr_mu_unlock(&fd->mu);
560 return r;
561 }
562
563 static void fd_notify_on_read(grpc_fd* fd, grpc_closure* closure) {
564 gpr_mu_lock(&fd->mu);
565 notify_on_locked(fd, &fd->read_closure, closure);
566 gpr_mu_unlock(&fd->mu);
567 }
568
569 static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
570 gpr_mu_lock(&fd->mu);
571 notify_on_locked(fd, &fd->write_closure, closure);
572 gpr_mu_unlock(&fd->mu);
573 }
574
575 static void fd_notify_on_error(grpc_fd* /*fd*/, grpc_closure* closure) {
576 if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
577 gpr_log(GPR_ERROR, "Polling engine does not support tracking errors.");
578 }
579 grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_CANCELLED);
580 }
581
582 static void fd_set_readable(grpc_fd* fd) {
583 gpr_mu_lock(&fd->mu);
584 set_ready_locked(fd, &fd->read_closure);
585 gpr_mu_unlock(&fd->mu);
586 }
587
588 static void fd_set_writable(grpc_fd* fd) {
589 gpr_mu_lock(&fd->mu);
590 set_ready_locked(fd, &fd->write_closure);
591 gpr_mu_unlock(&fd->mu);
592 }
593
594 static void fd_set_error(grpc_fd* /*fd*/) {
595 if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
596 gpr_log(GPR_ERROR, "Polling engine does not support tracking errors.");
597 }
598 }
599
600 static uint32_t fd_begin_poll(grpc_fd* fd, grpc_pollset* pollset,
601 grpc_pollset_worker* worker, uint32_t read_mask,
602 uint32_t write_mask, grpc_fd_watcher* watcher) {
603 uint32_t mask = 0;
604 grpc_closure* cur;
605 int requested;
606 /* keep track of pollers that have requested our events, in case they change
607 */
608 GRPC_FD_REF(fd, "poll");
609
610 gpr_mu_lock(&fd->mu);
611
612 /* if we are shutdown, then don't add to the watcher set */
613 if (fd->shutdown) {
614 watcher->fd = nullptr;
615 watcher->pollset = nullptr;
616 watcher->worker = nullptr;
617 gpr_mu_unlock(&fd->mu);
618 GRPC_FD_UNREF(fd, "poll");
619 return 0;
620 }
621
622 /* if there is nobody polling for read, but we need to, then start doing so */
623 cur = fd->read_closure;
624 requested = cur != CLOSURE_READY;
625 if (read_mask && fd->read_watcher == nullptr && requested) {
626 fd->read_watcher = watcher;
627 mask |= read_mask;
628 }
629 /* if there is nobody polling for write, but we need to, then start doing so
630 */
631 cur = fd->write_closure;
632 requested = cur != CLOSURE_READY;
633 if (write_mask && fd->write_watcher == nullptr && requested) {
634 fd->write_watcher = watcher;
635 mask |= write_mask;
636 }
637 /* if not polling, remember this watcher in case we need someone to later */
638 if (mask == 0 && worker != nullptr) {
639 watcher->next = &fd->inactive_watcher_root;
640 watcher->prev = watcher->next->prev;
641 watcher->next->prev = watcher->prev->next = watcher;
642 }
643 watcher->pollset = pollset;
644 watcher->worker = worker;
645 watcher->fd = fd;
646 gpr_mu_unlock(&fd->mu);
647
648 return mask;
649 }
650
651 static void fd_end_poll(grpc_fd_watcher* watcher, int got_read, int got_write) {
652 int was_polling = 0;
653 int kick = 0;
654 grpc_fd* fd = watcher->fd;
655
656 if (fd == nullptr) {
657 return;
658 }
659
660 gpr_mu_lock(&fd->mu);
661
662 if (watcher == fd->read_watcher) {
663 /* remove read watcher, kick if we still need a read */
664 was_polling = 1;
665 if (!got_read) {
666 kick = 1;
667 }
668 fd->read_watcher = nullptr;
669 }
670 if (watcher == fd->write_watcher) {
671 /* remove write watcher, kick if we still need a write */
672 was_polling = 1;
673 if (!got_write) {
674 kick = 1;
675 }
676 fd->write_watcher = nullptr;
677 }
678 if (!was_polling && watcher->worker != nullptr) {
679 /* remove from inactive list */
680 watcher->next->prev = watcher->prev;
681 watcher->prev->next = watcher->next;
682 }
683 if (got_read) {
684 if (set_ready_locked(fd, &fd->read_closure)) {
685 kick = 1;
686 }
687 }
688 if (got_write) {
689 if (set_ready_locked(fd, &fd->write_closure)) {
690 kick = 1;
691 }
692 }
693 if (kick) {
694 maybe_wake_one_watcher_locked(fd);
695 }
696 if (fd_is_orphaned(fd) && !has_watchers(fd) && !fd->closed) {
697 close_fd_locked(fd);
698 }
699 gpr_mu_unlock(&fd->mu);
700
701 GRPC_FD_UNREF(fd, "poll");
702 }
703
704 /*******************************************************************************
705 * pollset_posix.c
706 */
707
708 GPR_TLS_DECL(g_current_thread_poller);
709 GPR_TLS_DECL(g_current_thread_worker);
710
711 static void remove_worker(grpc_pollset* /*p*/, grpc_pollset_worker* worker) {
712 worker->prev->next = worker->next;
713 worker->next->prev = worker->prev;
714 }
715
716 static bool pollset_has_workers(grpc_pollset* p) {
717 return p->root_worker.next != &p->root_worker;
718 }
719
720 static bool pollset_in_pollset_sets(grpc_pollset* p) {
721 return p->pollset_set_count;
722 }
723
724 static bool pollset_has_observers(grpc_pollset* p) {
725 return pollset_has_workers(p) || pollset_in_pollset_sets(p);
726 }
727
728 static grpc_pollset_worker* pop_front_worker(grpc_pollset* p) {
729 if (pollset_has_workers(p)) {
730 grpc_pollset_worker* w = p->root_worker.next;
731 remove_worker(p, w);
732 return w;
733 } else {
734 return nullptr;
735 }
736 }
737
738 static void push_back_worker(grpc_pollset* p, grpc_pollset_worker* worker) {
739 worker->next = &p->root_worker;
740 worker->prev = worker->next->prev;
741 worker->prev->next = worker->next->prev = worker;
742 }
743
744 static void push_front_worker(grpc_pollset* p, grpc_pollset_worker* worker) {
745 worker->prev = &p->root_worker;
746 worker->next = worker->prev->next;
747 worker->prev->next = worker->next->prev = worker;
748 }
749
750 static void kick_append_error(grpc_error** composite, grpc_error* error) {
751 if (error == GRPC_ERROR_NONE) return;
752 if (*composite == GRPC_ERROR_NONE) {
753 *composite = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Kick Failure");
754 }
755 *composite = grpc_error_add_child(*composite, error);
756 }
757
758 static grpc_error* pollset_kick_ext(grpc_pollset* p,
759 grpc_pollset_worker* specific_worker,
760 uint32_t flags) {
761 GPR_TIMER_SCOPE("pollset_kick_ext", 0);
762 grpc_error* error = GRPC_ERROR_NONE;
763 GRPC_STATS_INC_POLLSET_KICK();
764
765 /* pollset->mu already held */
766 if (specific_worker != nullptr) {
767 if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
768 GPR_TIMER_SCOPE("pollset_kick_ext.broadcast", 0);
769 GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
770 for (specific_worker = p->root_worker.next;
771 specific_worker != &p->root_worker;
772 specific_worker = specific_worker->next) {
773 kick_append_error(
774 &error, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
775 }
776 p->kicked_without_pollers = true;
777 } else if (gpr_tls_get(&g_current_thread_worker) !=
778 reinterpret_cast<intptr_t>(specific_worker)) {
779 GPR_TIMER_MARK("different_thread_worker", 0);
780 if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
781 specific_worker->reevaluate_polling_on_wakeup = true;
782 }
783 specific_worker->kicked_specifically = true;
784 kick_append_error(&error,
785 grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
786 } else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) {
787 GPR_TIMER_MARK("kick_yoself", 0);
788 if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
789 specific_worker->reevaluate_polling_on_wakeup = true;
790 }
791 specific_worker->kicked_specifically = true;
792 kick_append_error(&error,
793 grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
794 }
795 } else if (gpr_tls_get(&g_current_thread_poller) !=
796 reinterpret_cast<intptr_t>(p)) {
797 GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
798 GPR_TIMER_MARK("kick_anonymous", 0);
799 specific_worker = pop_front_worker(p);
800 if (specific_worker != nullptr) {
801 if (gpr_tls_get(&g_current_thread_worker) ==
802 reinterpret_cast<intptr_t>(specific_worker)) {
803 GPR_TIMER_MARK("kick_anonymous_not_self", 0);
804 push_back_worker(p, specific_worker);
805 specific_worker = pop_front_worker(p);
806 if ((flags & GRPC_POLLSET_CAN_KICK_SELF) == 0 &&
807 gpr_tls_get(&g_current_thread_worker) ==
808 reinterpret_cast<intptr_t>(specific_worker)) {
809 push_back_worker(p, specific_worker);
810 specific_worker = nullptr;
811 }
812 }
813 if (specific_worker != nullptr) {
814 GPR_TIMER_MARK("finally_kick", 0);
815 push_back_worker(p, specific_worker);
816 kick_append_error(
817 &error, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
818 }
819 } else {
820 GPR_TIMER_MARK("kicked_no_pollers", 0);
821 p->kicked_without_pollers = true;
822 }
823 }
824
825 GRPC_LOG_IF_ERROR("pollset_kick_ext", GRPC_ERROR_REF(error));
826 return error;
827 }
828
829 static grpc_error* pollset_kick(grpc_pollset* p,
830 grpc_pollset_worker* specific_worker) {
831 return pollset_kick_ext(p, specific_worker, 0);
832 }
833
834 /* global state management */
835
836 static grpc_error* pollset_global_init(void) {
837 gpr_tls_init(&g_current_thread_poller);
838 gpr_tls_init(&g_current_thread_worker);
839 return GRPC_ERROR_NONE;
840 }
841
842 static void pollset_global_shutdown(void) {
843 gpr_tls_destroy(&g_current_thread_poller);
844 gpr_tls_destroy(&g_current_thread_worker);
845 }
846
847 /* main interface */
848
849 static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
850 gpr_mu_init(&pollset->mu);
851 *mu = &pollset->mu;
852 pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
853 pollset->shutting_down = 0;
854 pollset->called_shutdown = 0;
855 pollset->kicked_without_pollers = 0;
856 pollset->local_wakeup_cache = nullptr;
857 pollset->kicked_without_pollers = 0;
858 pollset->fd_count = 0;
859 pollset->fd_capacity = 0;
860 pollset->fds = nullptr;
861 pollset->pollset_set_count = 0;
862 }
863
864 static void pollset_destroy(grpc_pollset* pollset) {
865 GPR_ASSERT(!pollset_has_workers(pollset));
866 while (pollset->local_wakeup_cache) {
867 grpc_cached_wakeup_fd* next = pollset->local_wakeup_cache->next;
868 fork_fd_list_remove_node(pollset->local_wakeup_cache->fork_fd_list);
869 grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd);
870 gpr_free(pollset->local_wakeup_cache);
871 pollset->local_wakeup_cache = next;
872 }
873 gpr_free(pollset->fds);
874 gpr_mu_destroy(&pollset->mu);
875 }
876
877 static void pollset_add_fd(grpc_pollset* pollset, grpc_fd* fd) {
878 gpr_mu_lock(&pollset->mu);
879 size_t i;
880 /* TODO(ctiller): this is O(num_fds^2); maybe switch to a hash set here */
881 for (i = 0; i < pollset->fd_count; i++) {
882 if (pollset->fds[i] == fd) goto exit;
883 }
884 if (pollset->fd_count == pollset->fd_capacity) {
885 pollset->fd_capacity =
886 GPR_MAX(pollset->fd_capacity + 8, pollset->fd_count * 3 / 2);
887 pollset->fds = static_cast<grpc_fd**>(
888 gpr_realloc(pollset->fds, sizeof(grpc_fd*) * pollset->fd_capacity));
889 }
890 pollset->fds[pollset->fd_count++] = fd;
891 GRPC_FD_REF(fd, "multipoller");
892 pollset_kick(pollset, nullptr);
893 exit:
894 gpr_mu_unlock(&pollset->mu);
895 }
896
897 static void finish_shutdown(grpc_pollset* pollset) {
898 size_t i;
899 for (i = 0; i < pollset->fd_count; i++) {
900 GRPC_FD_UNREF(pollset->fds[i], "multipoller");
901 }
902 pollset->fd_count = 0;
903 grpc_core::ExecCtx::Run(DEBUG_LOCATION, pollset->shutdown_done,
904 GRPC_ERROR_NONE);
905 }
906
907 static void work_combine_error(grpc_error** composite, grpc_error* error) {
908 if (error == GRPC_ERROR_NONE) return;
909 if (*composite == GRPC_ERROR_NONE) {
910 *composite = GRPC_ERROR_CREATE_FROM_STATIC_STRING("pollset_work");
911 }
912 *composite = grpc_error_add_child(*composite, error);
913 }
914
915 static grpc_error* pollset_work(grpc_pollset* pollset,
916 grpc_pollset_worker** worker_hdl,
917 grpc_millis deadline) {
918 GPR_TIMER_SCOPE("pollset_work", 0);
919 grpc_pollset_worker worker;
920 if (worker_hdl) *worker_hdl = &worker;
921 grpc_error* error = GRPC_ERROR_NONE;
922
923 /* Avoid malloc for small number of elements. */
924 enum { inline_elements = 96 };
925 struct pollfd pollfd_space[inline_elements];
926 struct grpc_fd_watcher watcher_space[inline_elements];
927
928 /* pollset->mu already held */
929 int added_worker = 0;
930 int locked = 1;
931 int queued_work = 0;
932 int keep_polling = 0;
933 /* this must happen before we (potentially) drop pollset->mu */
934 worker.next = worker.prev = nullptr;
935 worker.reevaluate_polling_on_wakeup = 0;
936 if (pollset->local_wakeup_cache != nullptr) {
937 worker.wakeup_fd = pollset->local_wakeup_cache;
938 pollset->local_wakeup_cache = worker.wakeup_fd->next;
939 } else {
940 worker.wakeup_fd = static_cast<grpc_cached_wakeup_fd*>(
941 gpr_malloc(sizeof(*worker.wakeup_fd)));
942 error = grpc_wakeup_fd_init(&worker.wakeup_fd->fd);
943 fork_fd_list_add_wakeup_fd(worker.wakeup_fd);
944 if (error != GRPC_ERROR_NONE) {
945 GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
946 return error;
947 }
948 }
949 worker.kicked_specifically = 0;
950 /* If we're shutting down then we don't execute any extended work */
951 if (pollset->shutting_down) {
952 GPR_TIMER_MARK("pollset_work.shutting_down", 0);
953 goto done;
954 }
955 /* Start polling, and keep doing so while we're being asked to
956 re-evaluate our pollers (this allows poll() based pollers to
957 ensure they don't miss wakeups) */
958 keep_polling = 1;
959 gpr_tls_set(&g_current_thread_poller, (intptr_t)pollset);
960 while (keep_polling) {
961 keep_polling = 0;
962 if (!pollset->kicked_without_pollers ||
963 deadline <= grpc_core::ExecCtx::Get()->Now()) {
964 if (!added_worker) {
965 push_front_worker(pollset, &worker);
966 added_worker = 1;
967 gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
968 }
969 GPR_TIMER_SCOPE("maybe_work_and_unlock", 0);
970 #define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR)
971 #define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR)
972
973 int timeout;
974 int r;
975 size_t i, fd_count;
976 nfds_t pfd_count;
977 grpc_fd_watcher* watchers;
978 struct pollfd* pfds;
979
980 timeout = poll_deadline_to_millis_timeout(deadline);
981
982 if (pollset->fd_count + 2 <= inline_elements) {
983 pfds = pollfd_space;
984 watchers = watcher_space;
985 } else {
986 /* Allocate one buffer to hold both pfds and watchers arrays */
987 const size_t pfd_size = sizeof(*pfds) * (pollset->fd_count + 2);
988 const size_t watch_size = sizeof(*watchers) * (pollset->fd_count + 2);
989 void* buf = gpr_malloc(pfd_size + watch_size);
990 pfds = static_cast<struct pollfd*>(buf);
991 watchers = static_cast<grpc_fd_watcher*>(
992 static_cast<void*>((static_cast<char*>(buf) + pfd_size)));
993 }
994
995 fd_count = 0;
996 pfd_count = 1;
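/* slot 0 is reserved for this worker's wakeup fd, so that pollset_kick can
interrupt the poll() call below */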
997 pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker.wakeup_fd->fd);
998 pfds[0].events = POLLIN;
999 pfds[0].revents = 0;
1000 for (i = 0; i < pollset->fd_count; i++) {
1001 if (fd_is_orphaned(pollset->fds[i]) ||
1002 gpr_atm_no_barrier_load(&pollset->fds[i]->pollhup) == 1) {
1003 GRPC_FD_UNREF(pollset->fds[i], "multipoller");
1004 } else {
1005 pollset->fds[fd_count++] = pollset->fds[i];
1006 watchers[pfd_count].fd = pollset->fds[i];
1007 GRPC_FD_REF(watchers[pfd_count].fd, "multipoller_start");
1008 pfds[pfd_count].fd = pollset->fds[i]->fd;
1009 pfds[pfd_count].revents = 0;
1010 pfd_count++;
1011 }
1012 }
1013 pollset->fd_count = fd_count;
1014 gpr_mu_unlock(&pollset->mu);
1015
1016 for (i = 1; i < pfd_count; i++) {
1017 grpc_fd* fd = watchers[i].fd;
1018 pfds[i].events = static_cast<short>(
1019 fd_begin_poll(fd, pollset, &worker, POLLIN, POLLOUT, &watchers[i]));
1020 GRPC_FD_UNREF(fd, "multipoller_start");
1021 }
1022
1023 /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid
1024 even going into the blocking annotation if possible */
1025 GRPC_SCHEDULING_START_BLOCKING_REGION;
1026 GRPC_STATS_INC_SYSCALL_POLL();
1027 r = grpc_poll_function(pfds, pfd_count, timeout);
1028 GRPC_SCHEDULING_END_BLOCKING_REGION;
1029
1030 if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
1031 gpr_log(GPR_INFO, "%p poll=%d", pollset, r);
1032 }
1033
1034 if (r < 0) {
1035 if (errno != EINTR) {
1036 work_combine_error(&error, GRPC_OS_ERROR(errno, "poll"));
1037 }
1038
1039 for (i = 1; i < pfd_count; i++) {
1040 if (watchers[i].fd == nullptr) {
1041 fd_end_poll(&watchers[i], 0, 0);
1042 } else {
1043 // Wake up all the file descriptors; if one of them is invalid,
1044 // we can identify it on the next pollset_work()
1045 fd_end_poll(&watchers[i], 1, 1);
1046 }
1047 }
1048 } else if (r == 0) {
1049 for (i = 1; i < pfd_count; i++) {
1050 fd_end_poll(&watchers[i], 0, 0);
1051 }
1052 } else {
1053 if (pfds[0].revents & POLLIN_CHECK) {
1054 if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
1055 gpr_log(GPR_INFO, "%p: got_wakeup", pollset);
1056 }
1057 work_combine_error(
1058 &error, grpc_wakeup_fd_consume_wakeup(&worker.wakeup_fd->fd));
1059 }
1060 for (i = 1; i < pfd_count; i++) {
1061 if (watchers[i].fd == nullptr) {
1062 fd_end_poll(&watchers[i], 0, 0);
1063 } else {
1064 if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
1065 gpr_log(GPR_INFO, "%p got_event: %d r:%d w:%d [%d]", pollset,
1066 pfds[i].fd, (pfds[i].revents & POLLIN_CHECK) != 0,
1067 (pfds[i].revents & POLLOUT_CHECK) != 0, pfds[i].revents);
1068 }
1069 /* This is a mitigation to prevent poll() from spinning on a
1070 ** POLLHUP https://github.com/grpc/grpc/pull/13665
1071 */
1072 if (pfds[i].revents & POLLHUP) {
1073 gpr_atm_no_barrier_store(&watchers[i].fd->pollhup, 1);
1074 }
1075 fd_end_poll(&watchers[i], pfds[i].revents & POLLIN_CHECK,
1076 pfds[i].revents & POLLOUT_CHECK);
1077 }
1078 }
1079 }
1080
1081 if (pfds != pollfd_space) {
1082 /* pfds and watchers are in the same memory block pointed to by pfds */
1083 gpr_free(pfds);
1084 }
1085
1086 locked = 0;
1087 } else {
1088 GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
1089 pollset->kicked_without_pollers = 0;
1090 }
1091 /* Finished execution - start cleaning up.
1092 Note that we may arrive here from outside the enclosing while() loop.
1093 In that case we won't loop, though, as we haven't added the worker to the
1094 worker list, which means nobody could ask us to re-evaluate polling. */
1095 done:
1096 if (!locked) {
1097 queued_work |= grpc_core::ExecCtx::Get()->Flush();
1098 gpr_mu_lock(&pollset->mu);
1099 locked = 1;
1100 }
1101 /* If we're forced to re-evaluate polling (via pollset_kick with
1102 GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force
1103 a loop */
1104 if (worker.reevaluate_polling_on_wakeup && error == GRPC_ERROR_NONE) {
1105 worker.reevaluate_polling_on_wakeup = 0;
1106 pollset->kicked_without_pollers = 0;
1107 if (queued_work || worker.kicked_specifically) {
1108 /* If there's queued work on the list, then set the deadline to be
1109 immediate so we get back out of the polling loop quickly */
1110 deadline = 0;
1111 }
1112 keep_polling = 1;
1113 }
1114 }
1115 gpr_tls_set(&g_current_thread_poller, 0);
1116 if (added_worker) {
1117 remove_worker(pollset, &worker);
1118 gpr_tls_set(&g_current_thread_worker, 0);
1119 }
1120 /* release wakeup fd to the local pool */
1121 worker.wakeup_fd->next = pollset->local_wakeup_cache;
1122 pollset->local_wakeup_cache = worker.wakeup_fd;
1123 /* check shutdown conditions */
1124 if (pollset->shutting_down) {
1125 if (pollset_has_workers(pollset)) {
1126 pollset_kick(pollset, nullptr);
1127 } else if (!pollset->called_shutdown && !pollset_has_observers(pollset)) {
1128 pollset->called_shutdown = 1;
1129 gpr_mu_unlock(&pollset->mu);
1130 finish_shutdown(pollset);
1131 grpc_core::ExecCtx::Get()->Flush();
1132 /* Continuing to access pollset here is safe -- it is the caller's
1133 * responsibility to not destroy when it has outstanding calls to
1134 * pollset_work.
1135 * TODO(dklempner): Can we refactor the shutdown logic to avoid this? */
1136 gpr_mu_lock(&pollset->mu);
1137 }
1138 }
1139 if (worker_hdl) *worker_hdl = nullptr;
1140 GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
1141 return error;
1142 }
1143
1144 static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
1145 GPR_ASSERT(!pollset->shutting_down);
1146 pollset->shutting_down = 1;
1147 pollset->shutdown_done = closure;
1148 pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
1149 if (!pollset->called_shutdown && !pollset_has_observers(pollset)) {
1150 pollset->called_shutdown = 1;
1151 finish_shutdown(pollset);
1152 }
1153 }
1154
1155 static int poll_deadline_to_millis_timeout(grpc_millis deadline) {
1156 if (deadline == GRPC_MILLIS_INF_FUTURE) return -1;
1157 if (deadline == 0) return 0;
1158 grpc_millis n = deadline - grpc_core::ExecCtx::Get()->Now();
1159 if (n < 0) return 0;
1160 if (n > INT_MAX) return -1;
1161 return static_cast<int>(n);
1162 }
1163
1164 /*******************************************************************************
1165 * pollset_set_posix.c
1166 */
1167
1168 static grpc_pollset_set* pollset_set_create(void) {
1169 grpc_pollset_set* pollset_set =
1170 static_cast<grpc_pollset_set*>(gpr_zalloc(sizeof(*pollset_set)));
1171 gpr_mu_init(&pollset_set->mu);
1172 return pollset_set;
1173 }
1174
1175 static void pollset_set_destroy(grpc_pollset_set* pollset_set) {
1176 size_t i;
1177 gpr_mu_destroy(&pollset_set->mu);
1178 for (i = 0; i < pollset_set->fd_count; i++) {
1179 GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1180 }
1181 for (i = 0; i < pollset_set->pollset_count; i++) {
1182 grpc_pollset* pollset = pollset_set->pollsets[i];
1183 gpr_mu_lock(&pollset->mu);
1184 pollset->pollset_set_count--;
1185 /* check shutdown */
1186 if (pollset->shutting_down && !pollset->called_shutdown &&
1187 !pollset_has_observers(pollset)) {
1188 pollset->called_shutdown = 1;
1189 gpr_mu_unlock(&pollset->mu);
1190 finish_shutdown(pollset);
1191 } else {
1192 gpr_mu_unlock(&pollset->mu);
1193 }
1194 }
1195 gpr_free(pollset_set->pollsets);
1196 gpr_free(pollset_set->pollset_sets);
1197 gpr_free(pollset_set->fds);
1198 gpr_free(pollset_set);
1199 }
1200
1201 static void pollset_set_add_pollset(grpc_pollset_set* pollset_set,
1202 grpc_pollset* pollset) {
1203 size_t i, j;
1204 gpr_mu_lock(&pollset->mu);
1205 pollset->pollset_set_count++;
1206 gpr_mu_unlock(&pollset->mu);
1207 gpr_mu_lock(&pollset_set->mu);
1208 if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
1209 pollset_set->pollset_capacity =
1210 GPR_MAX(8, 2 * pollset_set->pollset_capacity);
1211 pollset_set->pollsets = static_cast<grpc_pollset**>(gpr_realloc(
1212 pollset_set->pollsets,
1213 pollset_set->pollset_capacity * sizeof(*pollset_set->pollsets)));
1214 }
1215 pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
1216 for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
1217 if (fd_is_orphaned(pollset_set->fds[i])) {
1218 GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
1219 } else {
1220 pollset_add_fd(pollset, pollset_set->fds[i]);
1221 pollset_set->fds[j++] = pollset_set->fds[i];
1222 }
1223 }
1224 pollset_set->fd_count = j;
1225 gpr_mu_unlock(&pollset_set->mu);
1226 }
1227
1228 static void pollset_set_del_pollset(grpc_pollset_set* pollset_set,
1229 grpc_pollset* pollset) {
1230 size_t i;
1231 gpr_mu_lock(&pollset_set->mu);
1232 for (i = 0; i < pollset_set->pollset_count; i++) {
1233 if (pollset_set->pollsets[i] == pollset) {
1234 pollset_set->pollset_count--;
1235 GPR_SWAP(grpc_pollset*, pollset_set->pollsets[i],
1236 pollset_set->pollsets[pollset_set->pollset_count]);
1237 break;
1238 }
1239 }
1240 gpr_mu_unlock(&pollset_set->mu);
1241 gpr_mu_lock(&pollset->mu);
1242 pollset->pollset_set_count--;
1243 /* check shutdown */
1244 if (pollset->shutting_down && !pollset->called_shutdown &&
1245 !pollset_has_observers(pollset)) {
1246 pollset->called_shutdown = 1;
1247 gpr_mu_unlock(&pollset->mu);
1248 finish_shutdown(pollset);
1249 } else {
1250 gpr_mu_unlock(&pollset->mu);
1251 }
1252 }
1253
1254 static void pollset_set_add_pollset_set(grpc_pollset_set* bag,
1255 grpc_pollset_set* item) {
1256 size_t i, j;
1257 gpr_mu_lock(&bag->mu);
1258 if (bag->pollset_set_count == bag->pollset_set_capacity) {
1259 bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
1260 bag->pollset_sets = static_cast<grpc_pollset_set**>(
1261 gpr_realloc(bag->pollset_sets,
1262 bag->pollset_set_capacity * sizeof(*bag->pollset_sets)));
1263 }
1264 bag->pollset_sets[bag->pollset_set_count++] = item;
1265 for (i = 0, j = 0; i < bag->fd_count; i++) {
1266 if (fd_is_orphaned(bag->fds[i])) {
1267 GRPC_FD_UNREF(bag->fds[i], "pollset_set");
1268 } else {
1269 pollset_set_add_fd(item, bag->fds[i]);
1270 bag->fds[j++] = bag->fds[i];
1271 }
1272 }
1273 bag->fd_count = j;
1274 gpr_mu_unlock(&bag->mu);
1275 }
1276
1277 static void pollset_set_del_pollset_set(grpc_pollset_set* bag,
1278 grpc_pollset_set* item) {
1279 size_t i;
1280 gpr_mu_lock(&bag->mu);
1281 for (i = 0; i < bag->pollset_set_count; i++) {
1282 if (bag->pollset_sets[i] == item) {
1283 bag->pollset_set_count--;
1284 GPR_SWAP(grpc_pollset_set*, bag->pollset_sets[i],
1285 bag->pollset_sets[bag->pollset_set_count]);
1286 break;
1287 }
1288 }
1289 gpr_mu_unlock(&bag->mu);
1290 }
1291
1292 static void pollset_set_add_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) {
1293 size_t i;
1294 gpr_mu_lock(&pollset_set->mu);
1295 if (pollset_set->fd_count == pollset_set->fd_capacity) {
1296 pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity);
1297 pollset_set->fds = static_cast<grpc_fd**>(
1298 gpr_realloc(pollset_set->fds,
1299 pollset_set->fd_capacity * sizeof(*pollset_set->fds)));
1300 }
1301 GRPC_FD_REF(fd, "pollset_set");
1302 pollset_set->fds[pollset_set->fd_count++] = fd;
1303 for (i = 0; i < pollset_set->pollset_count; i++) {
1304 pollset_add_fd(pollset_set->pollsets[i], fd);
1305 }
1306 for (i = 0; i < pollset_set->pollset_set_count; i++) {
1307 pollset_set_add_fd(pollset_set->pollset_sets[i], fd);
1308 }
1309 gpr_mu_unlock(&pollset_set->mu);
1310 }
1311
1312 static void pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) {
1313 size_t i;
1314 gpr_mu_lock(&pollset_set->mu);
1315 for (i = 0; i < pollset_set->fd_count; i++) {
1316 if (pollset_set->fds[i] == fd) {
1317 pollset_set->fd_count--;
1318 GPR_SWAP(grpc_fd*, pollset_set->fds[i],
1319 pollset_set->fds[pollset_set->fd_count]);
1320 GRPC_FD_UNREF(fd, "pollset_set");
1321 break;
1322 }
1323 }
1324 for (i = 0; i < pollset_set->pollset_set_count; i++) {
1325 pollset_set_del_fd(pollset_set->pollset_sets[i], fd);
1326 }
1327 gpr_mu_unlock(&pollset_set->mu);
1328 }
1329
1330 /*******************************************************************************
1331 * event engine binding
1332 */
1333
1334 static bool is_any_background_poller_thread(void) { return false; }
1335
1336 static void shutdown_background_closure(void) {}
1337
1338 static bool add_closure_to_background_poller(grpc_closure* /*closure*/,
1339 grpc_error* /*error*/) {
1340 return false;
1341 }
1342
1343 static void shutdown_engine(void) {
1344 pollset_global_shutdown();
1345 if (track_fds_for_fork) {
1346 gpr_mu_destroy(&fork_fd_list_mu);
1347 grpc_core::Fork::SetResetChildPollingEngineFunc(nullptr);
1348 }
1349 }
1350
1351 static const grpc_event_engine_vtable vtable = {
1352 sizeof(grpc_pollset),
1353 false,
1354 false,
1355
1356 fd_create,
1357 fd_wrapped_fd,
1358 fd_orphan,
1359 fd_shutdown,
1360 fd_notify_on_read,
1361 fd_notify_on_write,
1362 fd_notify_on_error,
1363 fd_set_readable,
1364 fd_set_writable,
1365 fd_set_error,
1366 fd_is_shutdown,
1367
1368 pollset_init,
1369 pollset_shutdown,
1370 pollset_destroy,
1371 pollset_work,
1372 pollset_kick,
1373 pollset_add_fd,
1374
1375 pollset_set_create,
1376 pollset_set_destroy,
1377 pollset_set_add_pollset,
1378 pollset_set_del_pollset,
1379 pollset_set_add_pollset_set,
1380 pollset_set_del_pollset_set,
1381 pollset_set_add_fd,
1382 pollset_set_del_fd,
1383
1384 is_any_background_poller_thread,
1385 shutdown_background_closure,
1386 shutdown_engine,
1387 add_closure_to_background_poller,
1388 };
1389
1390 /* Called by the child process's post-fork handler to close open fds, including
1391 * worker wakeup fds. This allows gRPC to shut down in the child process without
1392 * interfering with connections or RPCs ongoing in the parent. */
1393 static void reset_event_manager_on_fork() {
1394 gpr_mu_lock(&fork_fd_list_mu);
1395 while (fork_fd_list_head != nullptr) {
1396 if (fork_fd_list_head->fd != nullptr) {
1397 if (!fork_fd_list_head->fd->closed) {
1398 close(fork_fd_list_head->fd->fd);
1399 }
1400 fork_fd_list_head->fd->fd = -1;
1401 } else {
1402 close(fork_fd_list_head->cached_wakeup_fd->fd.read_fd);
1403 fork_fd_list_head->cached_wakeup_fd->fd.read_fd = -1;
1404 close(fork_fd_list_head->cached_wakeup_fd->fd.write_fd);
1405 fork_fd_list_head->cached_wakeup_fd->fd.write_fd = -1;
1406 }
1407 fork_fd_list_head = fork_fd_list_head->next;
1408 }
1409 gpr_mu_unlock(&fork_fd_list_mu);
1410 }
1411
1412 const grpc_event_engine_vtable* grpc_init_poll_posix(
1413 bool /*explicit_request*/) {
1414 if (!grpc_has_wakeup_fd()) {
1415 gpr_log(GPR_ERROR, "Skipping poll because of no wakeup fd.");
1416 return nullptr;
1417 }
1418 if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
1419 return nullptr;
1420 }
1421 if (grpc_core::Fork::Enabled()) {
1422 track_fds_for_fork = true;
1423 gpr_mu_init(&fork_fd_list_mu);
1424 grpc_core::Fork::SetResetChildPollingEngineFunc(
1425 reset_event_manager_on_fork);
1426 }
1427 return &vtable;
1428 }
1429
1430 #endif /* GRPC_POSIX_SOCKET_EV_POLL */
1431