//
//
// Copyright 2016 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//

#include <grpc/support/port_platform.h>
#include <grpc/support/sync.h>

#include "src/core/lib/iomgr/port.h"

#ifdef GRPC_POSIX_SOCKET_EV_POLL

#include <assert.h>
#include <errno.h>
#include <grpc/support/alloc.h>
#include <limits.h>
#include <poll.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

#include <string>

#include "absl/log/check.h"
#include "absl/log/log.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include "src/core/lib/iomgr/block_annotate.h"
#include "src/core/lib/iomgr/ev_poll_posix.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/telemetry/stats.h"
#include "src/core/telemetry/stats_data.h"
#include "src/core/util/crash.h"
#include "src/core/util/thd.h"
#include "src/core/util/useful.h"

#define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker*)1)
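// Passing the sentinel above as `specific_worker` to pollset_kick() makes
// pollset_kick_ext() (below) wake every worker currently parked in
// pollset_work(), rather than any one specific worker.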

//******************************************************************************
// FD declarations
//
typedef struct grpc_fd_watcher {
  struct grpc_fd_watcher* next;
  struct grpc_fd_watcher* prev;
  grpc_pollset* pollset;
  grpc_pollset_worker* worker;
  grpc_fd* fd;
} grpc_fd_watcher;

typedef struct grpc_cached_wakeup_fd grpc_cached_wakeup_fd;

// Only used when GRPC_ENABLE_FORK_SUPPORT=1
struct grpc_fork_fd_list {
  // Only one of fd or cached_wakeup_fd will be set. The unused field will be
  // set to nullptr.
  grpc_fd* fd;
  grpc_cached_wakeup_fd* cached_wakeup_fd;

  grpc_fork_fd_list* next;
  grpc_fork_fd_list* prev;
};

struct grpc_fd {
  int fd;
  // refst format:
  //   bit 0:    1=active/0=orphaned
  //   bits 1-n: refcount
  // meaning that mostly we ref by two to avoid altering the orphaned bit,
  // and just unref by 1 when we're ready to flag the object as orphaned
  gpr_atm refst;
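  // Worked example: fd_create() stores refst == 1 (active bit set, no extra
  // refs). GRPC_FD_REF and GRPC_FD_UNREF move the value by 2, leaving the
  // active bit untouched. fd_orphan() does REF_BY(fd, 1), making the value
  // even (fd_is_orphaned() reads an even value as orphaned), followed by
  // UNREF_BY(fd, 2); once every other ref is dropped, the final unref sees
  // old == n and frees the fd.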

  gpr_mu mu;
  int shutdown;
  int closed;
  int released;
  gpr_atm pollhup;
  grpc_error_handle shutdown_error;

  // The watcher list.

  // The following watcher related fields are protected by watcher_mu.

  // An fd_watcher is an ephemeral object created when an fd wants to
  // begin polling, and destroyed after the poll.

  // It records whether the watcher should poll for reads, writes, both,
  // or neither on this fd.

  // If a watcher is asked to poll for reads or writes, the read_watcher
  // or write_watcher field is set accordingly. A watcher may be asked to
  // poll for both, in which case both fields will be set.

  // read_watcher and write_watcher may be NULL if no watcher has been
  // asked to poll for reads or writes.

  // If an fd_watcher is not asked to poll for reads or writes, it is added
  // to a linked list of inactive watchers, rooted at inactive_watcher_root.
  // If a poller is needed later, one of the inactive watchers may be kicked
  // out of its poll loop to take on that responsibility.
  grpc_fd_watcher inactive_watcher_root;
  grpc_fd_watcher* read_watcher;
  grpc_fd_watcher* write_watcher;

  grpc_closure* read_closure;
  grpc_closure* write_closure;

  grpc_closure* on_done_closure;

  grpc_iomgr_object iomgr_object;

  // Only used when GRPC_ENABLE_FORK_SUPPORT=1
  grpc_fork_fd_list* fork_fd_list;

  bool is_pre_allocated;
};
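
// A minimal sketch of the watcher lifecycle described above, mirroring how
// pollset_work() drives it further down in this file (illustrative only;
// pollset locking and error handling are omitted):
//
//   grpc_fd_watcher w;
//   struct pollfd pfd;
//   pfd.fd = fd->fd;
//   pfd.revents = 0;
//   pfd.events = static_cast<short>(
//       fd_begin_poll(fd, pollset, worker, POLLIN, POLLOUT, &w));
//   poll(&pfd, 1, timeout_ms);
//   fd_end_poll(&w, pfd.revents & POLLIN, pfd.revents & POLLOUT);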

// True when GRPC_ENABLE_FORK_SUPPORT=1.
static bool track_fds_for_fork = false;

// Only used when GRPC_ENABLE_FORK_SUPPORT=1
static grpc_fork_fd_list* fork_fd_list_head = nullptr;
static gpr_mu fork_fd_list_mu;

// Begin polling on an fd.
// Registers that the given pollset is interested in this fd - so that if
// readability or writability interest changes, the pollset can be kicked to
// pick up that new interest.
// The return value is:
//   (fd_needs_read ? read_mask : 0) | (fd_needs_write ? write_mask : 0)
// i.e. a combination of read_mask and write_mask determined by the fd's
// current interest in said events.
// Polling strategies that do not need to alter their behavior depending on
// the fd's current interest (such as epoll) do not need to call this
// function.
// MUST NOT be called with a pollset lock taken.
static uint32_t fd_begin_poll(grpc_fd* fd, grpc_pollset* pollset,
                              grpc_pollset_worker* worker, uint32_t read_mask,
                              uint32_t write_mask, grpc_fd_watcher* watcher);
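
// Example: when called with read_mask == POLLIN and write_mask == POLLOUT,
// as pollset_work() does below, fd_begin_poll() returns POLLIN | POLLOUT if
// this watcher should poll for both, a single mask if the other direction is
// already covered by another watcher (or already ready), and 0 if there is
// nothing to poll for, in which case a worker's watcher is parked on the
// fd's inactive list.
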
// Complete polling previously started with fd_begin_poll.
// MUST NOT be called with a pollset lock taken.
// If got_read or got_write is 1, also does the become_{readable,writable} as
// appropriate.
static void fd_end_poll(grpc_fd_watcher* watcher, int got_read, int got_write);

// Return true if this fd is orphaned, false otherwise.
static bool fd_is_orphaned(grpc_fd* fd);

#ifndef NDEBUG
static void fd_ref(grpc_fd* fd, const char* reason, const char* file, int line);
static void fd_unref(grpc_fd* fd, const char* reason, const char* file,
                     int line);
#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
#else
static void fd_ref(grpc_fd* fd);
static void fd_unref(grpc_fd* fd);
#define GRPC_FD_REF(fd, reason) fd_ref(fd)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
#endif

#define CLOSURE_NOT_READY ((grpc_closure*)0)
#define CLOSURE_READY ((grpc_closure*)1)
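
// These sentinels give each of read_closure/write_closure a three-state
// machine. Summary of the transitions implemented by notify_on_locked()
// and set_ready_locked() below:
//
//   current state   notify_on(closure)             set_ready()
//   -------------   ----------------------------   ------------------------
//   NOT_READY       store closure and wait         become READY
//   READY           run closure, -> NOT_READY      ignore (duplicate ready)
//   closure C       crash (two pending notifies)   run C, -> NOT_READY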

//******************************************************************************
// pollset declarations
//

typedef struct grpc_cached_wakeup_fd {
  grpc_wakeup_fd fd;
  struct grpc_cached_wakeup_fd* next;

  // Only used when GRPC_ENABLE_FORK_SUPPORT=1
  grpc_fork_fd_list* fork_fd_list;
} grpc_cached_wakeup_fd;

struct grpc_pollset_worker {
  grpc_cached_wakeup_fd* wakeup_fd;
  int reevaluate_polling_on_wakeup;
  int kicked_specifically;
  struct grpc_pollset_worker* next;
  struct grpc_pollset_worker* prev;
};

struct grpc_pollset {
  gpr_mu mu;
  grpc_pollset_worker root_worker;
  int shutting_down;
  int called_shutdown;
  int kicked_without_pollers;
  grpc_closure* shutdown_done;
  int pollset_set_count;
  // all polled fds
  size_t fd_count;
  size_t fd_capacity;
  grpc_fd** fds;
  // Local cache of eventfds for workers
  grpc_cached_wakeup_fd* local_wakeup_cache;
};

// Add an fd to a pollset
static void pollset_add_fd(grpc_pollset* pollset, struct grpc_fd* fd);

static void pollset_set_add_fd(grpc_pollset_set* pollset_set, grpc_fd* fd);

// Convert a timespec to milliseconds:
// - very small or negative poll times are clamped to zero to do a
//   non-blocking poll (which becomes spin polling)
// - other small values are rounded up to one millisecond
// - polls longer than a millisecond are rounded up to the next nearest
//   millisecond to avoid spinning
// - infinite timeouts are converted to -1
static int poll_deadline_to_millis_timeout(grpc_core::Timestamp deadline);
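
// Worked examples against the implementation below: an already-expired
// deadline yields 0 (a non-blocking poll), Timestamp::InfFuture() yields -1
// (block indefinitely), and a deadline more than INT_MAX milliseconds away
// is likewise treated as infinite and yields -1.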

// Allow kick to wake up the currently polling worker
#define GRPC_POLLSET_CAN_KICK_SELF 1
// Force the wakee to repoll when awoken
#define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP 2
// As per pollset_kick, with an extended set of flags (defined above);
// mostly for fd_posix's use.
static grpc_error_handle pollset_kick_ext(grpc_pollset* p,
                                          grpc_pollset_worker* specific_worker,
                                          uint32_t flags);

// Return true if the pollset has active threads in pollset_work (pollset
// must be locked)
static bool pollset_has_workers(grpc_pollset* pollset);


//******************************************************************************
// pollset_set definitions
//

struct grpc_pollset_set {
  gpr_mu mu;

  size_t pollset_count;
  size_t pollset_capacity;
  grpc_pollset** pollsets;

  size_t pollset_set_count;
  size_t pollset_set_capacity;
  struct grpc_pollset_set** pollset_sets;

  size_t fd_count;
  size_t fd_capacity;
  grpc_fd** fds;
};

//******************************************************************************
// functions to track opened fds. No-ops unless track_fds_for_fork is true.
//

static void fork_fd_list_remove_node(grpc_fork_fd_list* node) {
  gpr_mu_lock(&fork_fd_list_mu);
  if (fork_fd_list_head == node) {
    fork_fd_list_head = node->next;
  }
  if (node->prev != nullptr) {
    node->prev->next = node->next;
  }
  if (node->next != nullptr) {
    node->next->prev = node->prev;
  }
  gpr_free(node);
  gpr_mu_unlock(&fork_fd_list_mu);
}

static void fork_fd_list_remove_grpc_fd(grpc_fd* fd) {
  if (track_fds_for_fork) {
    fork_fd_list_remove_node(fd->fork_fd_list);
  }
}

static void fork_fd_list_remove_wakeup_fd(grpc_cached_wakeup_fd* fd) {
  if (track_fds_for_fork) {
    fork_fd_list_remove_node(fd->fork_fd_list);
  }
}

static void fork_fd_list_add_node(grpc_fork_fd_list* node) {
  gpr_mu_lock(&fork_fd_list_mu);
  node->next = fork_fd_list_head;
  node->prev = nullptr;
  if (fork_fd_list_head != nullptr) {
    fork_fd_list_head->prev = node;
  }
  fork_fd_list_head = node;
  gpr_mu_unlock(&fork_fd_list_mu);
}

static void fork_fd_list_add_grpc_fd(grpc_fd* fd) {
  if (track_fds_for_fork) {
    fd->fork_fd_list =
        static_cast<grpc_fork_fd_list*>(gpr_malloc(sizeof(grpc_fork_fd_list)));
    fd->fork_fd_list->fd = fd;
    fd->fork_fd_list->cached_wakeup_fd = nullptr;
    fork_fd_list_add_node(fd->fork_fd_list);
  }
}

static void fork_fd_list_add_wakeup_fd(grpc_cached_wakeup_fd* fd) {
  if (track_fds_for_fork) {
    fd->fork_fd_list =
        static_cast<grpc_fork_fd_list*>(gpr_malloc(sizeof(grpc_fork_fd_list)));
    fd->fork_fd_list->cached_wakeup_fd = fd;
    fd->fork_fd_list->fd = nullptr;
    fork_fd_list_add_node(fd->fork_fd_list);
  }
}

//******************************************************************************
// fd_posix.c
//

#ifndef NDEBUG
#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
static void ref_by(grpc_fd* fd, int n, const char* reason, const char* file,
                   int line) {
  GRPC_TRACE_VLOG(fd_refcount, 2)
      << "FD " << fd->fd << " " << fd << " ref " << n << " "
      << gpr_atm_no_barrier_load(&fd->refst) << " -> "
      << gpr_atm_no_barrier_load(&fd->refst) + n << " [" << reason << "; "
      << file << ":" << line << "]";
#else
#define REF_BY(fd, n, reason) \
  do {                        \
    ref_by(fd, n);            \
    (void)(reason);           \
  } while (0)
#define UNREF_BY(fd, n, reason) \
  do {                          \
    unref_by(fd, n);            \
    (void)(reason);             \
  } while (0)
static void ref_by(grpc_fd* fd, int n) {
#endif
  CHECK_GT(gpr_atm_no_barrier_fetch_add(&fd->refst, n), 0);
}

#ifndef NDEBUG
static void unref_by(grpc_fd* fd, int n, const char* reason, const char* file,
                     int line) {
  GRPC_TRACE_VLOG(fd_refcount, 2)
      << "FD " << fd->fd << " " << fd << " unref " << n << " "
      << gpr_atm_no_barrier_load(&fd->refst) << " -> "
      << gpr_atm_no_barrier_load(&fd->refst) - n << " [" << reason << "; "
      << file << ":" << line << "]";
#else
static void unref_by(grpc_fd* fd, int n) {
#endif
  gpr_atm old = gpr_atm_full_fetch_add(&fd->refst, -n);
  if (old == n) {
    gpr_mu_destroy(&fd->mu);
    grpc_iomgr_unregister_object(&fd->iomgr_object);
    fork_fd_list_remove_grpc_fd(fd);
    fd->shutdown_error.~Status();
    gpr_free(fd);
  } else {
    CHECK(old > n);
  }
}

static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
  // Avoid unused-parameter warning for debug-only parameter
  (void)track_err;
  DCHECK(track_err == false);
  grpc_fd* r = static_cast<grpc_fd*>(gpr_malloc(sizeof(*r)));
  gpr_mu_init(&r->mu);
  gpr_atm_rel_store(&r->refst, 1);
  r->shutdown = 0;
  new (&r->shutdown_error) absl::Status();
  r->read_closure = CLOSURE_NOT_READY;
  r->write_closure = CLOSURE_NOT_READY;
  r->fd = fd;
  r->inactive_watcher_root.next = r->inactive_watcher_root.prev =
      &r->inactive_watcher_root;
  r->read_watcher = r->write_watcher = nullptr;
  r->on_done_closure = nullptr;
  r->closed = 0;
  r->released = 0;
  r->is_pre_allocated = false;
  gpr_atm_no_barrier_store(&r->pollhup, 0);

  std::string name2 = absl::StrCat(name, " fd=", fd);
  grpc_iomgr_register_object(&r->iomgr_object, name2.c_str());
  fork_fd_list_add_grpc_fd(r);
  return r;
}

static bool fd_is_orphaned(grpc_fd* fd) {
  return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
}

static grpc_error_handle pollset_kick_locked(grpc_fd_watcher* watcher) {
  gpr_mu_lock(&watcher->pollset->mu);
  CHECK(watcher->worker);
  grpc_error_handle err =
      pollset_kick_ext(watcher->pollset, watcher->worker,
                       GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP);
  gpr_mu_unlock(&watcher->pollset->mu);
  return err;
}

static void maybe_wake_one_watcher_locked(grpc_fd* fd) {
  if (fd->inactive_watcher_root.next != &fd->inactive_watcher_root) {
    (void)pollset_kick_locked(fd->inactive_watcher_root.next);
  } else if (fd->read_watcher) {
    (void)pollset_kick_locked(fd->read_watcher);
  } else if (fd->write_watcher) {
    (void)pollset_kick_locked(fd->write_watcher);
  }
}

static void wake_all_watchers_locked(grpc_fd* fd) {
  grpc_fd_watcher* watcher;
  for (watcher = fd->inactive_watcher_root.next;
       watcher != &fd->inactive_watcher_root; watcher = watcher->next) {
    (void)pollset_kick_locked(watcher);
  }
  if (fd->read_watcher) {
    (void)pollset_kick_locked(fd->read_watcher);
  }
  if (fd->write_watcher && fd->write_watcher != fd->read_watcher) {
    (void)pollset_kick_locked(fd->write_watcher);
  }
}

static int has_watchers(grpc_fd* fd) {
  return fd->read_watcher != nullptr || fd->write_watcher != nullptr ||
         fd->inactive_watcher_root.next != &fd->inactive_watcher_root;
}

static void close_fd_locked(grpc_fd* fd) {
  fd->closed = 1;
  if (!fd->released) {
    if (!fd->is_pre_allocated) {
      close(fd->fd);
    }
  }
  grpc_core::ExecCtx::Run(DEBUG_LOCATION, fd->on_done_closure,
                          absl::OkStatus());
}

static int fd_wrapped_fd(grpc_fd* fd) {
  if (fd->released || fd->closed) {
    return -1;
  } else {
    return fd->fd;
  }
}

static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
                      const char* reason) {
  fd->on_done_closure = on_done;
  fd->released = release_fd != nullptr;
  if (release_fd != nullptr) {
    *release_fd = fd->fd;
    fd->released = true;
  }
  gpr_mu_lock(&fd->mu);
  REF_BY(fd, 1, reason);  // remove active status, but keep referenced
  if (!has_watchers(fd)) {
    close_fd_locked(fd);
  } else {
    wake_all_watchers_locked(fd);
  }
  gpr_mu_unlock(&fd->mu);
  UNREF_BY(fd, 2, reason);  // drop the reference
}

// increment refcount by two to avoid changing the orphan bit
#ifndef NDEBUG
static void fd_ref(grpc_fd* fd, const char* reason, const char* file,
                   int line) {
  ref_by(fd, 2, reason, file, line);
}

static void fd_unref(grpc_fd* fd, const char* reason, const char* file,
                     int line) {
  unref_by(fd, 2, reason, file, line);
}
#else
static void fd_ref(grpc_fd* fd) { ref_by(fd, 2); }

static void fd_unref(grpc_fd* fd) { unref_by(fd, 2); }
#endif

static grpc_error_handle fd_shutdown_error(grpc_fd* fd) {
  if (!fd->shutdown) {
    return absl::OkStatus();
  } else {
    return grpc_error_set_int(
        GRPC_ERROR_CREATE_REFERENCING("FD shutdown", &fd->shutdown_error, 1),
        grpc_core::StatusIntProperty::kRpcStatus, GRPC_STATUS_UNAVAILABLE);
  }
}

static void notify_on_locked(grpc_fd* fd, grpc_closure** st,
                             grpc_closure* closure) {
  if (fd->shutdown || gpr_atm_no_barrier_load(&fd->pollhup)) {
    grpc_core::ExecCtx::Run(
        DEBUG_LOCATION, closure,
        grpc_error_set_int(GRPC_ERROR_CREATE("FD shutdown"),
                           grpc_core::StatusIntProperty::kRpcStatus,
                           GRPC_STATUS_UNAVAILABLE));
  } else if (*st == CLOSURE_NOT_READY) {
    // not ready ==> switch to a waiting state by setting the closure
    *st = closure;
  } else if (*st == CLOSURE_READY) {
    // already ready ==> queue the closure to run immediately
    *st = CLOSURE_NOT_READY;
    grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, fd_shutdown_error(fd));
    maybe_wake_one_watcher_locked(fd);
  } else {
    // the upcall pointer was already set to a different closure: two pending
    // notifications on the same fd/direction is a usage error
    grpc_core::Crash(
        "User called a notify_on function with a previous callback still "
        "pending");
  }
}

// returns 1 if state becomes not ready
static int set_ready_locked(grpc_fd* fd, grpc_closure** st) {
  if (*st == CLOSURE_READY) {
    // duplicate ready ==> ignore
    return 0;
  } else if (*st == CLOSURE_NOT_READY) {
    // not ready, and not waiting ==> flag ready
    *st = CLOSURE_READY;
    return 0;
  } else {
    // waiting ==> queue closure
    grpc_core::ExecCtx::Run(DEBUG_LOCATION, *st, fd_shutdown_error(fd));
    *st = CLOSURE_NOT_READY;
    return 1;
  }
}

static void fd_shutdown(grpc_fd* fd, grpc_error_handle why) {
  gpr_mu_lock(&fd->mu);
  // only shutdown once
  if (!fd->shutdown) {
    fd->shutdown = 1;
    fd->shutdown_error = why;
    // signal read/write closed to OS so that future operations fail
    if (!fd->is_pre_allocated) {
      shutdown(fd->fd, SHUT_RDWR);
    }
    set_ready_locked(fd, &fd->read_closure);
    set_ready_locked(fd, &fd->write_closure);
  }
  gpr_mu_unlock(&fd->mu);
}

static bool fd_is_shutdown(grpc_fd* fd) {
  gpr_mu_lock(&fd->mu);
  bool r = fd->shutdown;
  gpr_mu_unlock(&fd->mu);
  return r;
}

static void fd_notify_on_read(grpc_fd* fd, grpc_closure* closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(fd, &fd->read_closure, closure);
  gpr_mu_unlock(&fd->mu);
}

static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
  gpr_mu_lock(&fd->mu);
  notify_on_locked(fd, &fd->write_closure, closure);
  gpr_mu_unlock(&fd->mu);
}

static void fd_notify_on_error(grpc_fd* /*fd*/, grpc_closure* closure) {
  GRPC_TRACE_LOG(polling, ERROR)
      << "Polling engine does not support tracking errors.";
  grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, absl::CancelledError());
}

static void fd_set_readable(grpc_fd* fd) {
  gpr_mu_lock(&fd->mu);
  set_ready_locked(fd, &fd->read_closure);
  gpr_mu_unlock(&fd->mu);
}

static void fd_set_writable(grpc_fd* fd) {
  gpr_mu_lock(&fd->mu);
  set_ready_locked(fd, &fd->write_closure);
  gpr_mu_unlock(&fd->mu);
}

static void fd_set_error(grpc_fd* /*fd*/) {
  GRPC_TRACE_LOG(polling, ERROR)
      << "Polling engine does not support tracking errors.";
}

static uint32_t fd_begin_poll(grpc_fd* fd, grpc_pollset* pollset,
                              grpc_pollset_worker* worker, uint32_t read_mask,
                              uint32_t write_mask, grpc_fd_watcher* watcher) {
  uint32_t mask = 0;
  grpc_closure* cur;
  int requested;
  // keep track of pollers that have requested our events, in case they change
  GRPC_FD_REF(fd, "poll");

  gpr_mu_lock(&fd->mu);

  // if we are shutdown, then don't add to the watcher set
  if (fd->shutdown) {
    watcher->pollset = nullptr;
    watcher->worker = nullptr;
    gpr_mu_unlock(&fd->mu);
    GRPC_FD_UNREF(fd, "poll");
    return 0;
  }

  // if there is nobody polling for read, but we need to, then start doing so
  cur = fd->read_closure;
  requested = cur != CLOSURE_READY;
  if (read_mask && fd->read_watcher == nullptr && requested) {
    fd->read_watcher = watcher;
    mask |= read_mask;
  }
  // if there is nobody polling for write, but we need to, then start doing so
  cur = fd->write_closure;
  requested = cur != CLOSURE_READY;
  if (write_mask && fd->write_watcher == nullptr && requested) {
    fd->write_watcher = watcher;
    mask |= write_mask;
  }
  // if not polling, remember this watcher in case we need someone to later
  if (mask == 0 && worker != nullptr) {
    watcher->next = &fd->inactive_watcher_root;
    watcher->prev = watcher->next->prev;
    watcher->next->prev = watcher->prev->next = watcher;
  }
  watcher->pollset = pollset;
  watcher->worker = worker;
  watcher->fd = fd;
  gpr_mu_unlock(&fd->mu);

  return mask;
}

static void fd_end_poll(grpc_fd_watcher* watcher, int got_read, int got_write) {
  int was_polling = 0;
  int kick = 0;
  grpc_fd* fd = watcher->fd;
  if (fd == nullptr) {
    return;
  }

  gpr_mu_lock(&fd->mu);
  if (watcher->pollset == nullptr) {
    watcher->fd = nullptr;
    gpr_mu_unlock(&fd->mu);
    GRPC_FD_UNREF(fd, "multipoller_start");
    return;
  }

  if (watcher == fd->read_watcher) {
    // remove read watcher, kick if we still need a read
    was_polling = 1;
    if (!got_read) {
      kick = 1;
    }
    fd->read_watcher = nullptr;
  }
  if (watcher == fd->write_watcher) {
    // remove write watcher, kick if we still need a write
    was_polling = 1;
    if (!got_write) {
      kick = 1;
    }
    fd->write_watcher = nullptr;
  }
  if (!was_polling && watcher->worker != nullptr) {
    // remove from inactive list
    watcher->next->prev = watcher->prev;
    watcher->prev->next = watcher->next;
  }
  if (got_read) {
    if (set_ready_locked(fd, &fd->read_closure)) {
      kick = 1;
    }
  }
  if (got_write) {
    if (set_ready_locked(fd, &fd->write_closure)) {
      kick = 1;
    }
  }
  if (kick) {
    maybe_wake_one_watcher_locked(fd);
  }
  if (fd_is_orphaned(fd) && !has_watchers(fd) && !fd->closed) {
    close_fd_locked(fd);
  }
  gpr_mu_unlock(&fd->mu);

  GRPC_FD_UNREF(fd, "poll");
}

static void fd_set_pre_allocated(grpc_fd* fd) { fd->is_pre_allocated = true; }

//******************************************************************************
// pollset_posix.c
//

static thread_local grpc_pollset* g_current_thread_poller;
static thread_local grpc_pollset_worker* g_current_thread_worker;

static void remove_worker(grpc_pollset* /*p*/, grpc_pollset_worker* worker) {
  worker->prev->next = worker->next;
  worker->next->prev = worker->prev;
}

static bool pollset_has_workers(grpc_pollset* p) {
  return p->root_worker.next != &p->root_worker;
}

static bool pollset_in_pollset_sets(grpc_pollset* p) {
  return p->pollset_set_count;
}

static bool pollset_has_observers(grpc_pollset* p) {
  return pollset_has_workers(p) || pollset_in_pollset_sets(p);
}

static grpc_pollset_worker* pop_front_worker(grpc_pollset* p) {
  if (pollset_has_workers(p)) {
    grpc_pollset_worker* w = p->root_worker.next;
    remove_worker(p, w);
    return w;
  } else {
    return nullptr;
  }
}

static void push_back_worker(grpc_pollset* p, grpc_pollset_worker* worker) {
  worker->next = &p->root_worker;
  worker->prev = worker->next->prev;
  worker->prev->next = worker->next->prev = worker;
}

static void push_front_worker(grpc_pollset* p, grpc_pollset_worker* worker) {
  worker->prev = &p->root_worker;
  worker->next = worker->prev->next;
  worker->prev->next = worker->next->prev = worker;
}

static void kick_append_error(grpc_error_handle* composite,
                              grpc_error_handle error) {
  if (error.ok()) return;
  if (composite->ok()) {
    *composite = GRPC_ERROR_CREATE("Kick Failure");
  }
  *composite = grpc_error_add_child(*composite, error);
}

static grpc_error_handle pollset_kick_ext(grpc_pollset* p,
                                          grpc_pollset_worker* specific_worker,
                                          uint32_t flags) {
  grpc_error_handle error;

  // pollset->mu already held
  if (specific_worker != nullptr) {
    if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
      CHECK_EQ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP), 0u);
      for (specific_worker = p->root_worker.next;
           specific_worker != &p->root_worker;
           specific_worker = specific_worker->next) {
        kick_append_error(
            &error, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
      }
      p->kicked_without_pollers = true;
    } else if (g_current_thread_worker != specific_worker) {
      if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
        specific_worker->reevaluate_polling_on_wakeup = true;
      }
      specific_worker->kicked_specifically = true;
      kick_append_error(&error,
                        grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
    } else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) {
      if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
        specific_worker->reevaluate_polling_on_wakeup = true;
      }
      specific_worker->kicked_specifically = true;
      kick_append_error(&error,
                        grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
    }
  } else if (g_current_thread_poller != p) {
    CHECK_EQ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP), 0u);
    specific_worker = pop_front_worker(p);
    if (specific_worker != nullptr) {
      if (g_current_thread_worker == specific_worker) {
        push_back_worker(p, specific_worker);
        specific_worker = pop_front_worker(p);
        if ((flags & GRPC_POLLSET_CAN_KICK_SELF) == 0 &&
            g_current_thread_worker == specific_worker) {
          push_back_worker(p, specific_worker);
          specific_worker = nullptr;
        }
      }
      if (specific_worker != nullptr) {
        push_back_worker(p, specific_worker);
        kick_append_error(
            &error, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd));
      }
    } else {
      p->kicked_without_pollers = true;
    }
  }

  GRPC_LOG_IF_ERROR("pollset_kick_ext", error);
  return error;
}

static grpc_error_handle pollset_kick(grpc_pollset* p,
                                      grpc_pollset_worker* specific_worker) {
  return pollset_kick_ext(p, specific_worker, 0);
}

// global state management

static grpc_error_handle pollset_global_init(void) { return absl::OkStatus(); }

// main interface

static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
  gpr_mu_init(&pollset->mu);
  *mu = &pollset->mu;
  pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
  pollset->shutting_down = 0;
  pollset->called_shutdown = 0;
  pollset->kicked_without_pollers = 0;
  pollset->local_wakeup_cache = nullptr;
  pollset->kicked_without_pollers = 0;
  pollset->fd_count = 0;
  pollset->fd_capacity = 0;
  pollset->fds = nullptr;
  pollset->pollset_set_count = 0;
}

static void pollset_destroy(grpc_pollset* pollset) {
  CHECK(!pollset_has_workers(pollset));
  while (pollset->local_wakeup_cache) {
    grpc_cached_wakeup_fd* next = pollset->local_wakeup_cache->next;
    fork_fd_list_remove_wakeup_fd(pollset->local_wakeup_cache);
    grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd);
    gpr_free(pollset->local_wakeup_cache);
    pollset->local_wakeup_cache = next;
  }
  gpr_free(pollset->fds);
  gpr_mu_destroy(&pollset->mu);
}

static void pollset_add_fd(grpc_pollset* pollset, grpc_fd* fd) {
  gpr_mu_lock(&pollset->mu);
  size_t i;
  // TODO(ctiller): this is O(num_fds^2); maybe switch to a hash set here
  for (i = 0; i < pollset->fd_count; i++) {
    if (pollset->fds[i] == fd) goto exit;
  }
  if (pollset->fd_count == pollset->fd_capacity) {
    pollset->fd_capacity =
        std::max(pollset->fd_capacity + 8, pollset->fd_count * 3 / 2);
    pollset->fds = static_cast<grpc_fd**>(
        gpr_realloc(pollset->fds, sizeof(grpc_fd*) * pollset->fd_capacity));
  }
  pollset->fds[pollset->fd_count++] = fd;
  GRPC_FD_REF(fd, "multipoller");
  (void)pollset_kick(pollset, nullptr);
exit:
  gpr_mu_unlock(&pollset->mu);
}

static void finish_shutdown(grpc_pollset* pollset) {
  size_t i;
  for (i = 0; i < pollset->fd_count; i++) {
    GRPC_FD_UNREF(pollset->fds[i], "multipoller");
  }
  pollset->fd_count = 0;
  grpc_core::ExecCtx::Run(DEBUG_LOCATION, pollset->shutdown_done,
                          absl::OkStatus());
}

static void work_combine_error(grpc_error_handle* composite,
                               grpc_error_handle error) {
  if (error.ok()) return;
  if (composite->ok()) {
    *composite = GRPC_ERROR_CREATE("pollset_work");
  }
  *composite = grpc_error_add_child(*composite, error);
}

static grpc_error_handle pollset_work(grpc_pollset* pollset,
                                      grpc_pollset_worker** worker_hdl,
                                      grpc_core::Timestamp deadline) {
  grpc_pollset_worker worker;
  if (worker_hdl) *worker_hdl = &worker;
  grpc_error_handle error;

  // Avoid malloc for a small number of elements.
  enum { inline_elements = 96 };
  struct pollfd pollfd_space[inline_elements];
  struct grpc_fd_watcher watcher_space[inline_elements];

  // pollset->mu already held
  int added_worker = 0;
  int locked = 1;
  int queued_work = 0;
  int keep_polling = 0;
  // this must happen before we (potentially) drop pollset->mu
  worker.next = worker.prev = nullptr;
  worker.reevaluate_polling_on_wakeup = 0;
  if (pollset->local_wakeup_cache != nullptr) {
    worker.wakeup_fd = pollset->local_wakeup_cache;
    pollset->local_wakeup_cache = worker.wakeup_fd->next;
  } else {
    worker.wakeup_fd = static_cast<grpc_cached_wakeup_fd*>(
        gpr_malloc(sizeof(*worker.wakeup_fd)));
    error = grpc_wakeup_fd_init(&worker.wakeup_fd->fd);
    fork_fd_list_add_wakeup_fd(worker.wakeup_fd);
    if (!error.ok()) {
      GRPC_LOG_IF_ERROR("pollset_work", error);
      return error;
    }
  }
  worker.kicked_specifically = 0;
  // If we're shutting down then we don't execute any extended work
  if (pollset->shutting_down) {
    goto done;
  }
  // Start polling, and keep doing so while we're being asked to
  // re-evaluate our pollers (this allows poll() based pollers to
  // ensure they don't miss wakeups)
  keep_polling = 1;
  g_current_thread_poller = pollset;
  while (keep_polling) {
    keep_polling = 0;
    if (!pollset->kicked_without_pollers ||
        deadline <= grpc_core::Timestamp::Now()) {
      if (!added_worker) {
        push_front_worker(pollset, &worker);
        added_worker = 1;
        g_current_thread_worker = &worker;
      }
#define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR)
#define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR)

      int timeout;
      int r;
      size_t i, fd_count;
      nfds_t pfd_count;
      grpc_fd_watcher* watchers;
      struct pollfd* pfds;

      timeout = poll_deadline_to_millis_timeout(deadline);

      if (pollset->fd_count + 2 <= inline_elements) {
        pfds = pollfd_space;
        watchers = watcher_space;
      } else {
        // Allocate one buffer to hold both pfds and watchers arrays
        const size_t pfd_size = sizeof(*pfds) * (pollset->fd_count + 2);
        const size_t watch_size = sizeof(*watchers) * (pollset->fd_count + 2);
        void* buf = gpr_malloc(pfd_size + watch_size);
        pfds = static_cast<struct pollfd*>(buf);
        watchers = static_cast<grpc_fd_watcher*>(
            static_cast<void*>((static_cast<char*>(buf) + pfd_size)));
      }

      fd_count = 0;
      pfd_count = 1;
      pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker.wakeup_fd->fd);
      pfds[0].events = POLLIN;
      pfds[0].revents = 0;
      for (i = 0; i < pollset->fd_count; i++) {
        if (fd_is_orphaned(pollset->fds[i]) ||
            gpr_atm_no_barrier_load(&pollset->fds[i]->pollhup) == 1) {
          GRPC_FD_UNREF(pollset->fds[i], "multipoller");
        } else {
          pollset->fds[fd_count++] = pollset->fds[i];
          watchers[pfd_count].fd = pollset->fds[i];
          GRPC_FD_REF(watchers[pfd_count].fd, "multipoller_start");
          pfds[pfd_count].fd = pollset->fds[i]->fd;
          pfds[pfd_count].revents = 0;
          pfd_count++;
        }
      }
      pollset->fd_count = fd_count;
      gpr_mu_unlock(&pollset->mu);

      for (i = 1; i < pfd_count; i++) {
        grpc_fd* fd = watchers[i].fd;
        pfds[i].events = static_cast<short>(
            fd_begin_poll(fd, pollset, &worker, POLLIN, POLLOUT, &watchers[i]));
        if (watchers[i].pollset != nullptr) {
          GRPC_FD_UNREF(fd, "multipoller_start");
        }
      }

      // TODO(vpai): Consider first doing a 0 timeout poll here to avoid
      // even going into the blocking annotation if possible
      GRPC_SCHEDULING_START_BLOCKING_REGION;
      r = grpc_poll_function(pfds, pfd_count, timeout);
      GRPC_SCHEDULING_END_BLOCKING_REGION;

      GRPC_TRACE_LOG(polling, INFO) << pollset << " poll=" << r;

      if (r < 0) {
        if (errno != EINTR) {
          work_combine_error(&error, GRPC_OS_ERROR(errno, "poll"));
        }

        for (i = 1; i < pfd_count; i++) {
          if (watchers[i].pollset == nullptr) {
            fd_end_poll(&watchers[i], 0, 0);
          } else {
            // Wake up all the file descriptors, so that if we have an
            // invalid one we can identify it on the next pollset_work()
            fd_end_poll(&watchers[i], 1, 1);
          }
        }
      } else if (r == 0) {
        for (i = 1; i < pfd_count; i++) {
          fd_end_poll(&watchers[i], 0, 0);
        }
      } else {
        if (pfds[0].revents & POLLIN_CHECK) {
          GRPC_TRACE_LOG(polling, INFO) << pollset << ": got_wakeup";
          work_combine_error(
              &error, grpc_wakeup_fd_consume_wakeup(&worker.wakeup_fd->fd));
        }
        for (i = 1; i < pfd_count; i++) {
          if (watchers[i].pollset == nullptr) {
            grpc_fd* fd = watchers[i].fd;
            if (pfds[i].revents & POLLHUP) {
              gpr_atm_no_barrier_store(&fd->pollhup, 1);
            }
            fd_end_poll(&watchers[i], 0, 0);
          } else {
            GRPC_TRACE_LOG(polling, INFO)
                << pollset << " got_event: " << pfds[i].fd
                << " r:" << ((pfds[i].revents & POLLIN_CHECK) != 0)
                << " w:" << ((pfds[i].revents & POLLOUT_CHECK) != 0) << " ["
                << pfds[i].revents << "]";
            // This is a mitigation to prevent poll() from spinning on a
            // POLLHUP: https://github.com/grpc/grpc/pull/13665
            if (pfds[i].revents & POLLHUP) {
              gpr_atm_no_barrier_store(&watchers[i].fd->pollhup, 1);
            }
            fd_end_poll(&watchers[i], pfds[i].revents & POLLIN_CHECK,
                        pfds[i].revents & POLLOUT_CHECK);
          }
        }
      }

      if (pfds != pollfd_space) {
        // pfds and watchers are in the same memory block pointed to by pfds
        gpr_free(pfds);
      }

      locked = 0;
    } else {
      pollset->kicked_without_pollers = 0;
    }
    // Finished execution - start cleaning up.
    // Note that we may arrive here from outside the enclosing while() loop.
    // In that case we won't loop, though, as we haven't added the worker to
    // the worker list, which means nobody could ask us to re-evaluate polling.
done:
    if (!locked) {
      queued_work |= grpc_core::ExecCtx::Get()->Flush();
      gpr_mu_lock(&pollset->mu);
      locked = 1;
    }
    // If we're forced to re-evaluate polling (via pollset_kick with
    // GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force
    // a loop
    if (worker.reevaluate_polling_on_wakeup && error.ok()) {
      worker.reevaluate_polling_on_wakeup = 0;
      pollset->kicked_without_pollers = 0;
      if (queued_work || worker.kicked_specifically) {
        // If there's queued work on the list, then set the deadline to be
        // immediate so we get back out of the polling loop quickly
        deadline = grpc_core::Timestamp();
      }
      keep_polling = 1;
    }
  }
  g_current_thread_poller = nullptr;
  if (added_worker) {
    remove_worker(pollset, &worker);
    g_current_thread_worker = nullptr;
  }
  // release wakeup fd to the local pool
  worker.wakeup_fd->next = pollset->local_wakeup_cache;
  pollset->local_wakeup_cache = worker.wakeup_fd;
  // check shutdown conditions
  if (pollset->shutting_down) {
    if (pollset_has_workers(pollset)) {
      (void)pollset_kick(pollset, nullptr);
    } else if (!pollset->called_shutdown && !pollset_has_observers(pollset)) {
      pollset->called_shutdown = 1;
      gpr_mu_unlock(&pollset->mu);
      finish_shutdown(pollset);
      grpc_core::ExecCtx::Get()->Flush();
      // Continuing to access pollset here is safe; it is the caller's
      // responsibility not to destroy it while there are outstanding calls
      // to pollset_work.
      // TODO(dklempner): Can we refactor the shutdown logic to avoid this?
      gpr_mu_lock(&pollset->mu);
    }
  }
  if (worker_hdl) *worker_hdl = nullptr;
  GRPC_LOG_IF_ERROR("pollset_work", error);
  return error;
}

static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
  CHECK(!pollset->shutting_down);
  pollset->shutting_down = 1;
  pollset->shutdown_done = closure;
  (void)pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
  if (!pollset->called_shutdown && !pollset_has_observers(pollset)) {
    pollset->called_shutdown = 1;
    finish_shutdown(pollset);
  }
}

static int poll_deadline_to_millis_timeout(grpc_core::Timestamp deadline) {
  if (deadline == grpc_core::Timestamp::InfFuture()) return -1;
  if (deadline.is_process_epoch()) return 0;
  int64_t n = (deadline - grpc_core::Timestamp::Now()).millis();
  if (n < 0) return 0;
  if (n > INT_MAX) return -1;
  return static_cast<int>(n);
}

//******************************************************************************
// pollset_set_posix.c
//

static grpc_pollset_set* pollset_set_create(void) {
  grpc_pollset_set* pollset_set =
      static_cast<grpc_pollset_set*>(gpr_zalloc(sizeof(*pollset_set)));
  gpr_mu_init(&pollset_set->mu);
  return pollset_set;
}

static void pollset_set_destroy(grpc_pollset_set* pollset_set) {
  size_t i;
  gpr_mu_destroy(&pollset_set->mu);
  for (i = 0; i < pollset_set->fd_count; i++) {
    GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
  }
  for (i = 0; i < pollset_set->pollset_count; i++) {
    grpc_pollset* pollset = pollset_set->pollsets[i];
    gpr_mu_lock(&pollset->mu);
    pollset->pollset_set_count--;
    // check shutdown
    if (pollset->shutting_down && !pollset->called_shutdown &&
        !pollset_has_observers(pollset)) {
      pollset->called_shutdown = 1;
      gpr_mu_unlock(&pollset->mu);
      finish_shutdown(pollset);
    } else {
      gpr_mu_unlock(&pollset->mu);
    }
  }
  gpr_free(pollset_set->pollsets);
  gpr_free(pollset_set->pollset_sets);
  gpr_free(pollset_set->fds);
  gpr_free(pollset_set);
}

static void pollset_set_add_pollset(grpc_pollset_set* pollset_set,
                                    grpc_pollset* pollset) {
  size_t i, j;
  gpr_mu_lock(&pollset->mu);
  pollset->pollset_set_count++;
  gpr_mu_unlock(&pollset->mu);
  gpr_mu_lock(&pollset_set->mu);
  if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
    pollset_set->pollset_capacity =
        std::max(size_t{8}, 2 * pollset_set->pollset_capacity);
    pollset_set->pollsets = static_cast<grpc_pollset**>(gpr_realloc(
        pollset_set->pollsets,
        pollset_set->pollset_capacity * sizeof(*pollset_set->pollsets)));
  }
  pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
  for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
    if (fd_is_orphaned(pollset_set->fds[i])) {
      GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
    } else {
      pollset_add_fd(pollset, pollset_set->fds[i]);
      pollset_set->fds[j++] = pollset_set->fds[i];
    }
  }
  pollset_set->fd_count = j;
  gpr_mu_unlock(&pollset_set->mu);
}

static void pollset_set_del_pollset(grpc_pollset_set* pollset_set,
                                    grpc_pollset* pollset) {
  size_t i;
  gpr_mu_lock(&pollset_set->mu);
  for (i = 0; i < pollset_set->pollset_count; i++) {
    if (pollset_set->pollsets[i] == pollset) {
      pollset_set->pollset_count--;
      std::swap(pollset_set->pollsets[i],
                pollset_set->pollsets[pollset_set->pollset_count]);
      break;
    }
  }
  gpr_mu_unlock(&pollset_set->mu);
  gpr_mu_lock(&pollset->mu);
  pollset->pollset_set_count--;
  // check shutdown
  if (pollset->shutting_down && !pollset->called_shutdown &&
      !pollset_has_observers(pollset)) {
    pollset->called_shutdown = 1;
    gpr_mu_unlock(&pollset->mu);
    finish_shutdown(pollset);
  } else {
    gpr_mu_unlock(&pollset->mu);
  }
}

static void pollset_set_add_pollset_set(grpc_pollset_set* bag,
                                        grpc_pollset_set* item) {
  size_t i, j;
  gpr_mu_lock(&bag->mu);
  if (bag->pollset_set_count == bag->pollset_set_capacity) {
    bag->pollset_set_capacity =
        std::max(size_t{8}, 2 * bag->pollset_set_capacity);
    bag->pollset_sets = static_cast<grpc_pollset_set**>(
        gpr_realloc(bag->pollset_sets,
                    bag->pollset_set_capacity * sizeof(*bag->pollset_sets)));
  }
  bag->pollset_sets[bag->pollset_set_count++] = item;
  for (i = 0, j = 0; i < bag->fd_count; i++) {
    if (fd_is_orphaned(bag->fds[i])) {
      GRPC_FD_UNREF(bag->fds[i], "pollset_set");
    } else {
      pollset_set_add_fd(item, bag->fds[i]);
      bag->fds[j++] = bag->fds[i];
    }
  }
  bag->fd_count = j;
  gpr_mu_unlock(&bag->mu);
}

static void pollset_set_del_pollset_set(grpc_pollset_set* bag,
                                        grpc_pollset_set* item) {
  size_t i;
  gpr_mu_lock(&bag->mu);
  for (i = 0; i < bag->pollset_set_count; i++) {
    if (bag->pollset_sets[i] == item) {
      bag->pollset_set_count--;
      std::swap(bag->pollset_sets[i],
                bag->pollset_sets[bag->pollset_set_count]);
      break;
    }
  }
  gpr_mu_unlock(&bag->mu);
}

static void pollset_set_add_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) {
  size_t i;
  gpr_mu_lock(&pollset_set->mu);
  if (pollset_set->fd_count == pollset_set->fd_capacity) {
    pollset_set->fd_capacity =
        std::max(size_t{8}, 2 * pollset_set->fd_capacity);
    pollset_set->fds = static_cast<grpc_fd**>(
        gpr_realloc(pollset_set->fds,
                    pollset_set->fd_capacity * sizeof(*pollset_set->fds)));
  }
  GRPC_FD_REF(fd, "pollset_set");
  pollset_set->fds[pollset_set->fd_count++] = fd;
  for (i = 0; i < pollset_set->pollset_count; i++) {
    pollset_add_fd(pollset_set->pollsets[i], fd);
  }
  for (i = 0; i < pollset_set->pollset_set_count; i++) {
    pollset_set_add_fd(pollset_set->pollset_sets[i], fd);
  }
  gpr_mu_unlock(&pollset_set->mu);
}

static void pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) {
  size_t i;
  gpr_mu_lock(&pollset_set->mu);
  for (i = 0; i < pollset_set->fd_count; i++) {
    if (pollset_set->fds[i] == fd) {
      pollset_set->fd_count--;
      std::swap(pollset_set->fds[i], pollset_set->fds[pollset_set->fd_count]);
      GRPC_FD_UNREF(fd, "pollset_set");
      break;
    }
  }
  for (i = 0; i < pollset_set->pollset_set_count; i++) {
    pollset_set_del_fd(pollset_set->pollset_sets[i], fd);
  }
  gpr_mu_unlock(&pollset_set->mu);
}

//******************************************************************************
// event engine binding
//

static bool is_any_background_poller_thread(void) { return false; }

static void shutdown_background_closure(void) {}

static bool add_closure_to_background_poller(grpc_closure* /*closure*/,
                                             grpc_error_handle /*error*/) {
  return false;
}

// Called by the child process's post-fork handler to close open fds,
// including worker wakeup fds. This allows gRPC to shut down in the child
// process without interfering with connections or RPCs ongoing in the parent.
static void reset_event_manager_on_fork() {
  gpr_mu_lock(&fork_fd_list_mu);
  while (fork_fd_list_head != nullptr) {
    if (fork_fd_list_head->fd != nullptr) {
      if (!fork_fd_list_head->fd->closed) {
        close(fork_fd_list_head->fd->fd);
      }
      fork_fd_list_head->fd->fd = -1;
    } else {
      close(fork_fd_list_head->cached_wakeup_fd->fd.read_fd);
      fork_fd_list_head->cached_wakeup_fd->fd.read_fd = -1;
      close(fork_fd_list_head->cached_wakeup_fd->fd.write_fd);
      fork_fd_list_head->cached_wakeup_fd->fd.write_fd = -1;
    }
    fork_fd_list_head = fork_fd_list_head->next;
  }
  gpr_mu_unlock(&fork_fd_list_mu);
}

const grpc_event_engine_vtable grpc_ev_poll_posix = {
    sizeof(grpc_pollset),
    false,
    false,

    fd_create,
    fd_wrapped_fd,
    fd_orphan,
    fd_shutdown,
    fd_notify_on_read,
    fd_notify_on_write,
    fd_notify_on_error,
    fd_set_readable,
    fd_set_writable,
    fd_set_error,
    fd_is_shutdown,

    pollset_init,
    pollset_shutdown,
    pollset_destroy,
    pollset_work,
    pollset_kick,
    pollset_add_fd,

    pollset_set_create,
    pollset_set_destroy,
    pollset_set_add_pollset,
    pollset_set_del_pollset,
    pollset_set_add_pollset_set,
    pollset_set_del_pollset_set,
    pollset_set_add_fd,
    pollset_set_del_fd,

    is_any_background_poller_thread,
    /* name = */ "poll",
    // check_engine_available =
    [](bool) {
      if (!grpc_has_wakeup_fd()) {
        LOG(ERROR) << "Skipping poll because of no wakeup fd.";
        return false;
      }
      if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
        return false;
      }
      if (grpc_core::Fork::Enabled()) {
        if (grpc_core::Fork::RegisterResetChildPollingEngineFunc(
                reset_event_manager_on_fork)) {
          track_fds_for_fork = true;
          gpr_mu_init(&fork_fd_list_mu);
        }
      }
      return true;
    },
    /* init_engine = */ []() {},
    /* shutdown_engine = */ shutdown_background_closure,
    []() {},
    add_closure_to_background_poller,

    fd_set_pre_allocated,
};

namespace {

grpc_poll_function_type real_poll_function;

int phony_poll(struct pollfd fds[], nfds_t nfds, int timeout) {
  if (timeout == 0) {
    return real_poll_function(fds, nfds, 0);
  } else {
    grpc_core::Crash("Attempted a blocking poll when declared non-polling.");
    return -1;
  }
}

}  // namespace

const grpc_event_engine_vtable grpc_ev_none_posix = []() {
  grpc_event_engine_vtable v = grpc_ev_poll_posix;
  v.check_engine_available = [](bool explicit_request) {
    if (!explicit_request) return false;
    // return the simplest engine as a phony but also override the poller
    if (!grpc_ev_poll_posix.check_engine_available(explicit_request)) {
      return false;
    }
    real_poll_function = grpc_poll_function;
    grpc_poll_function = phony_poll;
    return true;
  };
  v.name = "none";
  v.init_engine = []() {};
  v.shutdown_engine = []() {};
  return v;
}();

#endif  // GRPC_POSIX_SOCKET_EV_POLL