/*
 *
 * Copyright 2017 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <grpc/support/port_platform.h>

#include "src/core/lib/iomgr/port.h"

#include <grpc/support/log.h>

/* This polling engine is only relevant on Linux kernels that support epoll,
   via either epoll_create() or epoll_create1() */
#ifdef GRPC_LINUX_EPOLL
#include "src/core/lib/iomgr/ev_epoll1_linux.h"

#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <poll.h>
#include <pthread.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

#include <string>
#include <vector>

#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include "absl/strings/str_join.h"

#include <grpc/support/alloc.h>
#include <grpc/support/cpu.h>

#include "src/core/lib/debug/stats.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/tls.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/iomgr/block_annotate.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/lockfree_event.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/profiling/timers.h"

static grpc_wakeup_fd global_wakeup_fd;

/*******************************************************************************
 * Singleton epoll set related fields
 */

#define MAX_EPOLL_EVENTS 100
#define MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION 1

/* NOTE ON SYNCHRONIZATION:
 * - Fields in this struct are only modified by the designated poller. Hence
 *   there is no need for any locks to protect the struct.
 * - The num_events and cursor fields have to be of atomic type purely to
 *   provide memory visibility guarantees: with multiple pollers, the
 *   designated polling thread keeps changing, so the thread that wrote these
 *   values may be different from the thread reading them.
 */
typedef struct epoll_set {
  int epfd;

  /* The epoll_events after the last call to epoll_wait() */
  struct epoll_event events[MAX_EPOLL_EVENTS];

  /* The number of epoll_events after the last call to epoll_wait() */
  gpr_atm num_events;

  /* Index of the first event in epoll_events that has to be processed. This
   * field is only valid if num_events > 0 */
  gpr_atm cursor;
} epoll_set;

/* The global singleton epoll set */
static epoll_set g_epoll_set;
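
/* Illustration: a minimal sketch of the release/acquire pairing implied by
   the synchronization note above. The poller that called epoll_wait()
   publishes the event count with a release store; the next designated poller
   (possibly a different thread) reads it with an acquire load and is then
   guaranteed to also see the matching contents of g_epoll_set.events:

     gpr_atm_rel_store(&g_epoll_set.num_events, r);       // publishing thread
     ...
     long n = gpr_atm_acq_load(&g_epoll_set.num_events);  // next poller
     // g_epoll_set.events[0..n) is now safely visible here
*/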

static int epoll_create_and_cloexec() {
#ifdef GRPC_LINUX_EPOLL_CREATE1
  int fd = epoll_create1(EPOLL_CLOEXEC);
  if (fd < 0) {
    gpr_log(GPR_ERROR, "epoll_create1 unavailable");
  }
#else
  int fd = epoll_create(MAX_EPOLL_EVENTS);
  if (fd < 0) {
    gpr_log(GPR_ERROR, "epoll_create unavailable");
  } else if (fcntl(fd, F_SETFD, FD_CLOEXEC) != 0) {
    gpr_log(GPR_ERROR, "fcntl following epoll_create failed");
    return -1;
  }
#endif
  return fd;
}

/* Must be called *only* once */
static bool epoll_set_init() {
  g_epoll_set.epfd = epoll_create_and_cloexec();
  if (g_epoll_set.epfd < 0) {
    return false;
  }

  gpr_log(GPR_INFO, "grpc epoll fd: %d", g_epoll_set.epfd);
  gpr_atm_no_barrier_store(&g_epoll_set.num_events, 0);
  gpr_atm_no_barrier_store(&g_epoll_set.cursor, 0);
  return true;
}

/* epoll_set_init() MUST be called before calling this. */
static void epoll_set_shutdown() {
  if (g_epoll_set.epfd >= 0) {
    close(g_epoll_set.epfd);
    g_epoll_set.epfd = -1;
  }
}

/*******************************************************************************
 * Fd Declarations
 */

/* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
struct grpc_fork_fd_list {
  grpc_fd* fd;
  grpc_fd* next;
  grpc_fd* prev;
};

struct grpc_fd {
  int fd;

  grpc_core::ManualConstructor<grpc_core::LockfreeEvent> read_closure;
  grpc_core::ManualConstructor<grpc_core::LockfreeEvent> write_closure;
  grpc_core::ManualConstructor<grpc_core::LockfreeEvent> error_closure;

  struct grpc_fd* freelist_next;

  grpc_iomgr_object iomgr_object;

  /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
  grpc_fork_fd_list* fork_fd_list;
};

static void fd_global_init(void);
static void fd_global_shutdown(void);

/*******************************************************************************
 * Pollset Declarations
 */

typedef enum { UNKICKED, KICKED, DESIGNATED_POLLER } kick_state;

static const char* kick_state_string(kick_state st) {
  switch (st) {
    case UNKICKED:
      return "UNKICKED";
    case KICKED:
      return "KICKED";
    case DESIGNATED_POLLER:
      return "DESIGNATED_POLLER";
  }
  GPR_UNREACHABLE_CODE(return "UNKNOWN");
}

struct grpc_pollset_worker {
  kick_state state;
  int kick_state_mutator;  // which line of code last changed kick state
  bool initialized_cv;
  grpc_pollset_worker* next;
  grpc_pollset_worker* prev;
  gpr_cv cv;
  grpc_closure_list schedule_on_end_work;
};

#define SET_KICK_STATE(worker, kick_state)   \
  do {                                       \
    (worker)->state = (kick_state);          \
    (worker)->kick_state_mutator = __LINE__; \
  } while (false)
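
/* Illustration: SET_KICK_STATE is used instead of a plain assignment so that
   a debugger (or a core dump) can tell which transition a stuck worker last
   took. For example:

     SET_KICK_STATE(worker, KICKED);
     // worker->state              == KICKED
     // worker->kick_state_mutator == the __LINE__ of the call above
*/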

#define MAX_NEIGHBORHOODS 1024

typedef struct pollset_neighborhood {
  union {
    char pad[GPR_CACHELINE_SIZE];
    struct {
      gpr_mu mu;
      grpc_pollset* active_root;
    };
  };
} pollset_neighborhood;
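
/* The union above pads each neighborhood to GPR_CACHELINE_SIZE so that
   adjacent entries of the g_neighborhoods array do not share a cache line,
   avoiding false sharing between pollers working different neighborhoods.
   A sketch of a compile-time check one could add (assuming the anonymous
   struct fits within a single cache line):

     static_assert(sizeof(pollset_neighborhood) == GPR_CACHELINE_SIZE,
                   "pollset_neighborhood should fill exactly one cache line");
*/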

struct grpc_pollset {
  gpr_mu mu;
  pollset_neighborhood* neighborhood;
  bool reassigning_neighborhood;
  grpc_pollset_worker* root_worker;
  bool kicked_without_poller;

  /* Set to true if the pollset is observed to have no workers available to
     poll */
  bool seen_inactive;
  bool shutting_down;             /* Is the pollset shutting down? */
  grpc_closure* shutdown_closure; /* Called after shutdown is complete */

  /* Number of workers who are *about-to* attach themselves to the pollset
   * worker list */
  int begin_refs;

  grpc_pollset* next;
  grpc_pollset* prev;
};

/*******************************************************************************
 * Pollset-set Declarations
 */

struct grpc_pollset_set {
  char unused;
};

/*******************************************************************************
 * Common helpers
 */

static bool append_error(grpc_error** composite, grpc_error* error,
                         const char* desc) {
  if (error == GRPC_ERROR_NONE) return true;
  if (*composite == GRPC_ERROR_NONE) {
    *composite = GRPC_ERROR_CREATE_FROM_COPIED_STRING(desc);
  }
  *composite = grpc_error_add_child(*composite, error);
  return false;
}

/*******************************************************************************
 * Fd Definitions
 */

/* We need to keep a freelist not because of any concern about malloc
 * performance but so that implementations with multiple threads in (for
 * example) epoll_wait can deal with the race between pollset removal and
 * incoming poll notifications.
 *
 * The problem is that the poller ultimately holds a reference to this
 * object, so it is very difficult to know when it is safe to free it, at
 * least without some expensive synchronization.
 *
 * If we keep the object freelisted, in the worst case losing this race just
 * becomes a spurious read notification on a reused fd.
 */
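
/* Illustration: the freelist is a plain mutex-guarded singly linked list
   threaded through grpc_fd::freelist_next. A minimal sketch of the push side
   as used by fd_orphan() below (fd_create() pops from the same list before
   falling back to gpr_malloc):

     gpr_mu_lock(&fd_freelist_mu);
     fd->freelist_next = fd_freelist;
     fd_freelist = fd;
     gpr_mu_unlock(&fd_freelist_mu);
*/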

/* The alarm system needs to be able to wakeup 'some poller' sometimes
 * (specifically when a new alarm needs to be triggered earlier than the next
 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
 * case occurs. */

static grpc_fd* fd_freelist = nullptr;
static gpr_mu fd_freelist_mu;

/* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
static grpc_fd* fork_fd_list_head = nullptr;
static gpr_mu fork_fd_list_mu;

static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }

static void fd_global_shutdown(void) {
  // TODO(guantaol): We don't have a reasonable explanation about this
  // lock()/unlock() pattern. It can be a valid barrier if there is at most one
  // pending lock() at this point. Otherwise, there is still a possibility of
  // use-after-free race. Need to reason about the code and/or clean it up.
  gpr_mu_lock(&fd_freelist_mu);
  gpr_mu_unlock(&fd_freelist_mu);
  while (fd_freelist != nullptr) {
    grpc_fd* fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
    gpr_free(fd);
  }
  gpr_mu_destroy(&fd_freelist_mu);
}

static void fork_fd_list_add_grpc_fd(grpc_fd* fd) {
  if (grpc_core::Fork::Enabled()) {
    gpr_mu_lock(&fork_fd_list_mu);
    fd->fork_fd_list =
        static_cast<grpc_fork_fd_list*>(gpr_malloc(sizeof(grpc_fork_fd_list)));
    fd->fork_fd_list->next = fork_fd_list_head;
    fd->fork_fd_list->prev = nullptr;
    if (fork_fd_list_head != nullptr) {
      fork_fd_list_head->fork_fd_list->prev = fd;
    }
    fork_fd_list_head = fd;
    gpr_mu_unlock(&fork_fd_list_mu);
  }
}

static void fork_fd_list_remove_grpc_fd(grpc_fd* fd) {
  if (grpc_core::Fork::Enabled()) {
    gpr_mu_lock(&fork_fd_list_mu);
    if (fork_fd_list_head == fd) {
      fork_fd_list_head = fd->fork_fd_list->next;
    }
    if (fd->fork_fd_list->prev != nullptr) {
      fd->fork_fd_list->prev->fork_fd_list->next = fd->fork_fd_list->next;
    }
    if (fd->fork_fd_list->next != nullptr) {
      fd->fork_fd_list->next->fork_fd_list->prev = fd->fork_fd_list->prev;
    }
    gpr_free(fd->fork_fd_list);
    gpr_mu_unlock(&fork_fd_list_mu);
  }
}

static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
  grpc_fd* new_fd = nullptr;

  gpr_mu_lock(&fd_freelist_mu);
  if (fd_freelist != nullptr) {
    new_fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
  }
  gpr_mu_unlock(&fd_freelist_mu);

  if (new_fd == nullptr) {
    new_fd = static_cast<grpc_fd*>(gpr_malloc(sizeof(grpc_fd)));
    new_fd->read_closure.Init();
    new_fd->write_closure.Init();
    new_fd->error_closure.Init();
  }
  new_fd->fd = fd;
  new_fd->read_closure->InitEvent();
  new_fd->write_closure->InitEvent();
  new_fd->error_closure->InitEvent();

  new_fd->freelist_next = nullptr;

  std::string fd_name = absl::StrCat(name, " fd=", fd);
  grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name.c_str());
  fork_fd_list_add_grpc_fd(new_fd);
#ifndef NDEBUG
  if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_fd_refcount)) {
    gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, new_fd, fd_name.c_str());
  }
#endif

  struct epoll_event ev;
  ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLOUT | EPOLLET);
  /* Use the least significant bit of ev.data.ptr to store track_err. We expect
   * the addresses to be word aligned. We need to store track_err to avoid
   * synchronization issues when accessing it after receiving an event.
   * Accessing fd would be a data race there because the fd might have been
   * returned to the free list at that point. */
  ev.data.ptr = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(new_fd) |
                                        (track_err ? 1 : 0));
  if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
    gpr_log(GPR_ERROR, "epoll_ctl failed: %s", strerror(errno));
  }

  return new_fd;
}
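
/* Illustration: decoding the tagged pointer created above. Because grpc_fd
   allocations are word aligned, bit 0 of ev.data.ptr is free to carry
   track_err; process_epoll_events() below recovers both values like so:

     intptr_t tagged = reinterpret_cast<intptr_t>(ev->data.ptr);
     grpc_fd* fd =
         reinterpret_cast<grpc_fd*>(tagged & ~static_cast<intptr_t>(1));
     bool track_err = (tagged & static_cast<intptr_t>(1)) != 0;
*/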

static int fd_wrapped_fd(grpc_fd* fd) { return fd->fd; }

/* If 'releasing_fd' is true, we are going to detach the internal fd from the
 * grpc_fd structure (i.e. we should not call the shutdown() syscall on that
 * fd) */
static void fd_shutdown_internal(grpc_fd* fd, grpc_error* why,
                                 bool releasing_fd) {
  if (fd->read_closure->SetShutdown(GRPC_ERROR_REF(why))) {
    if (!releasing_fd) {
      shutdown(fd->fd, SHUT_RDWR);
    } else {
      /* we need a dummy event for earlier linux versions. */
      epoll_event dummy_event;
      if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_DEL, fd->fd, &dummy_event) !=
          0) {
        gpr_log(GPR_ERROR, "epoll_ctl failed: %s", strerror(errno));
      }
    }
    fd->write_closure->SetShutdown(GRPC_ERROR_REF(why));
    fd->error_closure->SetShutdown(GRPC_ERROR_REF(why));
  }
  GRPC_ERROR_UNREF(why);
}

/* Might be called multiple times */
static void fd_shutdown(grpc_fd* fd, grpc_error* why) {
  fd_shutdown_internal(fd, why, false);
}

static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
                      const char* reason) {
  grpc_error* error = GRPC_ERROR_NONE;
  bool is_release_fd = (release_fd != nullptr);

  if (!fd->read_closure->IsShutdown()) {
    fd_shutdown_internal(fd, GRPC_ERROR_CREATE_FROM_COPIED_STRING(reason),
                         is_release_fd);
  }

  /* If release_fd is not NULL, we should be relinquishing control of the file
     descriptor fd->fd (but we still own the grpc_fd structure). */
  if (is_release_fd) {
    *release_fd = fd->fd;
  } else {
    close(fd->fd);
  }

  grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_done, GRPC_ERROR_REF(error));

  grpc_iomgr_unregister_object(&fd->iomgr_object);
  fork_fd_list_remove_grpc_fd(fd);
  fd->read_closure->DestroyEvent();
  fd->write_closure->DestroyEvent();
  fd->error_closure->DestroyEvent();

  gpr_mu_lock(&fd_freelist_mu);
  fd->freelist_next = fd_freelist;
  fd_freelist = fd;
  gpr_mu_unlock(&fd_freelist_mu);
}

static bool fd_is_shutdown(grpc_fd* fd) {
  return fd->read_closure->IsShutdown();
}

static void fd_notify_on_read(grpc_fd* fd, grpc_closure* closure) {
  fd->read_closure->NotifyOn(closure);
}

static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
  fd->write_closure->NotifyOn(closure);
}

static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
  fd->error_closure->NotifyOn(closure);
}

static void fd_become_readable(grpc_fd* fd) { fd->read_closure->SetReady(); }

static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); }

static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); }

/*******************************************************************************
 * Pollset Definitions
 */

GPR_TLS_DECL(g_current_thread_pollset);
GPR_TLS_DECL(g_current_thread_worker);

/* The designated poller */
static gpr_atm g_active_poller;

static pollset_neighborhood* g_neighborhoods;
static size_t g_num_neighborhoods;

/* Return true if first in list */
static bool worker_insert(grpc_pollset* pollset, grpc_pollset_worker* worker) {
  if (pollset->root_worker == nullptr) {
    pollset->root_worker = worker;
    worker->next = worker->prev = worker;
    return true;
  } else {
    worker->next = pollset->root_worker;
    worker->prev = worker->next->prev;
    worker->next->prev = worker;
    worker->prev->next = worker;
    return false;
  }
}

/* Return true if last in list */
typedef enum { EMPTIED, NEW_ROOT, REMOVED } worker_remove_result;

static worker_remove_result worker_remove(grpc_pollset* pollset,
                                          grpc_pollset_worker* worker) {
  if (worker == pollset->root_worker) {
    if (worker == worker->next) {
      pollset->root_worker = nullptr;
      return EMPTIED;
    } else {
      pollset->root_worker = worker->next;
      worker->prev->next = worker->next;
      worker->next->prev = worker->prev;
      return NEW_ROOT;
    }
  } else {
    worker->prev->next = worker->next;
    worker->next->prev = worker->prev;
    return REMOVED;
  }
}

static size_t choose_neighborhood(void) {
  return static_cast<size_t>(gpr_cpu_current_cpu()) % g_num_neighborhoods;
}

static grpc_error* pollset_global_init(void) {
  gpr_tls_init(&g_current_thread_pollset);
  gpr_tls_init(&g_current_thread_worker);
  gpr_atm_no_barrier_store(&g_active_poller, 0);
  global_wakeup_fd.read_fd = -1;
  grpc_error* err = grpc_wakeup_fd_init(&global_wakeup_fd);
  if (err != GRPC_ERROR_NONE) return err;
  struct epoll_event ev;
  ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLET);
  ev.data.ptr = &global_wakeup_fd;
  if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd,
                &ev) != 0) {
    return GRPC_OS_ERROR(errno, "epoll_ctl");
  }
  g_num_neighborhoods = GPR_CLAMP(gpr_cpu_num_cores(), 1, MAX_NEIGHBORHOODS);
  g_neighborhoods = static_cast<pollset_neighborhood*>(
      gpr_zalloc(sizeof(*g_neighborhoods) * g_num_neighborhoods));
  for (size_t i = 0; i < g_num_neighborhoods; i++) {
    gpr_mu_init(&g_neighborhoods[i].mu);
  }
  return GRPC_ERROR_NONE;
}

static void pollset_global_shutdown(void) {
  gpr_tls_destroy(&g_current_thread_pollset);
  gpr_tls_destroy(&g_current_thread_worker);
  if (global_wakeup_fd.read_fd != -1) grpc_wakeup_fd_destroy(&global_wakeup_fd);
  for (size_t i = 0; i < g_num_neighborhoods; i++) {
    gpr_mu_destroy(&g_neighborhoods[i].mu);
  }
  gpr_free(g_neighborhoods);
}

static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
  gpr_mu_init(&pollset->mu);
  *mu = &pollset->mu;
  pollset->neighborhood = &g_neighborhoods[choose_neighborhood()];
  pollset->reassigning_neighborhood = false;
  pollset->root_worker = nullptr;
  pollset->kicked_without_poller = false;
  pollset->seen_inactive = true;
  pollset->shutting_down = false;
  pollset->shutdown_closure = nullptr;
  pollset->begin_refs = 0;
  pollset->next = pollset->prev = nullptr;
}

static void pollset_destroy(grpc_pollset* pollset) {
  gpr_mu_lock(&pollset->mu);
  if (!pollset->seen_inactive) {
    pollset_neighborhood* neighborhood = pollset->neighborhood;
    gpr_mu_unlock(&pollset->mu);
  retry_lock_neighborhood:
    gpr_mu_lock(&neighborhood->mu);
    gpr_mu_lock(&pollset->mu);
    if (!pollset->seen_inactive) {
      if (pollset->neighborhood != neighborhood) {
        gpr_mu_unlock(&neighborhood->mu);
        neighborhood = pollset->neighborhood;
        gpr_mu_unlock(&pollset->mu);
        goto retry_lock_neighborhood;
      }
      pollset->prev->next = pollset->next;
      pollset->next->prev = pollset->prev;
      if (pollset == pollset->neighborhood->active_root) {
        pollset->neighborhood->active_root =
            pollset->next == pollset ? nullptr : pollset->next;
      }
    }
    gpr_mu_unlock(&pollset->neighborhood->mu);
  }
  gpr_mu_unlock(&pollset->mu);
  gpr_mu_destroy(&pollset->mu);
}

static grpc_error* pollset_kick_all(grpc_pollset* pollset) {
  GPR_TIMER_SCOPE("pollset_kick_all", 0);
  grpc_error* error = GRPC_ERROR_NONE;
  if (pollset->root_worker != nullptr) {
    grpc_pollset_worker* worker = pollset->root_worker;
    do {
      GRPC_STATS_INC_POLLSET_KICK();
      switch (worker->state) {
        case KICKED:
          GRPC_STATS_INC_POLLSET_KICKED_AGAIN();
          break;
        case UNKICKED:
          SET_KICK_STATE(worker, KICKED);
          if (worker->initialized_cv) {
            GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
            gpr_cv_signal(&worker->cv);
          }
          break;
        case DESIGNATED_POLLER:
          GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD();
          SET_KICK_STATE(worker, KICKED);
          append_error(&error, grpc_wakeup_fd_wakeup(&global_wakeup_fd),
                       "pollset_kick_all");
          break;
      }

      worker = worker->next;
    } while (worker != pollset->root_worker);
  }
  // TODO(sreek): Check if we need to set 'kicked_without_poller' to true here
  // in the else case
  return error;
}

static void pollset_maybe_finish_shutdown(grpc_pollset* pollset) {
  if (pollset->shutdown_closure != nullptr && pollset->root_worker == nullptr &&
      pollset->begin_refs == 0) {
    GPR_TIMER_MARK("pollset_finish_shutdown", 0);
    grpc_core::ExecCtx::Run(DEBUG_LOCATION, pollset->shutdown_closure,
                            GRPC_ERROR_NONE);
    pollset->shutdown_closure = nullptr;
  }
}

static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
  GPR_TIMER_SCOPE("pollset_shutdown", 0);
  GPR_ASSERT(pollset->shutdown_closure == nullptr);
  GPR_ASSERT(!pollset->shutting_down);
  pollset->shutdown_closure = closure;
  pollset->shutting_down = true;
  GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(pollset));
  pollset_maybe_finish_shutdown(pollset);
}

static int poll_deadline_to_millis_timeout(grpc_millis millis) {
  if (millis == GRPC_MILLIS_INF_FUTURE) return -1;
  grpc_millis delta = millis - grpc_core::ExecCtx::Get()->Now();
  if (delta > INT_MAX) {
    return INT_MAX;
  } else if (delta < 0) {
    return 0;
  } else {
    return static_cast<int>(delta);
  }
}
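
/* Illustration: assuming grpc_core::ExecCtx::Get()->Now() == 1000, then:

     poll_deadline_to_millis_timeout(1250);                    // -> 250
     poll_deadline_to_millis_timeout(900);                     // -> 0 (poll)
     poll_deadline_to_millis_timeout(GRPC_MILLIS_INF_FUTURE);  // -> -1 (block)
*/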

/* Process the epoll events found by do_epoll_wait().
   - g_epoll_set.cursor points to the index of the first event to be processed
   - This function then processes up to
     MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION events and updates the
     g_epoll_set.cursor

   NOTE ON SYNCHRONIZATION: Similar to do_epoll_wait(), this function is only
   called by the g_active_poller thread. So there is no need for
   synchronization when accessing fields in g_epoll_set */
static grpc_error* process_epoll_events(grpc_pollset* /*pollset*/) {
  GPR_TIMER_SCOPE("process_epoll_events", 0);

  static const char* err_desc = "process_events";
  grpc_error* error = GRPC_ERROR_NONE;
  long num_events = gpr_atm_acq_load(&g_epoll_set.num_events);
  long cursor = gpr_atm_acq_load(&g_epoll_set.cursor);
  for (int idx = 0;
       (idx < MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION) && cursor != num_events;
       idx++) {
    long c = cursor++;
    struct epoll_event* ev = &g_epoll_set.events[c];
    void* data_ptr = ev->data.ptr;

    if (data_ptr == &global_wakeup_fd) {
      append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
                   err_desc);
    } else {
      grpc_fd* fd = reinterpret_cast<grpc_fd*>(
          reinterpret_cast<intptr_t>(data_ptr) & ~static_cast<intptr_t>(1));
      bool track_err =
          reinterpret_cast<intptr_t>(data_ptr) & static_cast<intptr_t>(1);
      bool cancel = (ev->events & EPOLLHUP) != 0;
      bool error = (ev->events & EPOLLERR) != 0;
      bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0;
      bool write_ev = (ev->events & EPOLLOUT) != 0;
      bool err_fallback = error && !track_err;

      if (error && !err_fallback) {
        fd_has_errors(fd);
      }

      if (read_ev || cancel || err_fallback) {
        fd_become_readable(fd);
      }

      if (write_ev || cancel || err_fallback) {
        fd_become_writable(fd);
      }
    }
  }
  gpr_atm_rel_store(&g_epoll_set.cursor, cursor);
  return error;
}

/* Do epoll_wait and store the events in the g_epoll_set.events field. This
   does not "process" any of the events yet; that is done in
   process_epoll_events(). See process_epoll_events() for more details.

   NOTE ON SYNCHRONIZATION: At any point in time, only the g_active_poller
   (i.e. the designated poller thread) will be calling this function. So there
   is no need for any synchronization when accessing fields in g_epoll_set */
static grpc_error* do_epoll_wait(grpc_pollset* ps, grpc_millis deadline) {
  GPR_TIMER_SCOPE("do_epoll_wait", 0);

  int r;
  int timeout = poll_deadline_to_millis_timeout(deadline);
  if (timeout != 0) {
    GRPC_SCHEDULING_START_BLOCKING_REGION;
  }
  do {
    GRPC_STATS_INC_SYSCALL_POLL();
    r = epoll_wait(g_epoll_set.epfd, g_epoll_set.events, MAX_EPOLL_EVENTS,
                   timeout);
  } while (r < 0 && errno == EINTR);
  if (timeout != 0) {
    GRPC_SCHEDULING_END_BLOCKING_REGION;
  }

  if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");

  GRPC_STATS_INC_POLL_EVENTS_RETURNED(r);

  if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
    gpr_log(GPR_INFO, "ps: %p poll got %d events", ps, r);
  }

  gpr_atm_rel_store(&g_epoll_set.num_events, r);
  gpr_atm_rel_store(&g_epoll_set.cursor, 0);

  return GRPC_ERROR_NONE;
}

static bool begin_worker(grpc_pollset* pollset, grpc_pollset_worker* worker,
                         grpc_pollset_worker** worker_hdl,
                         grpc_millis deadline) {
  GPR_TIMER_SCOPE("begin_worker", 0);
  if (worker_hdl != nullptr) *worker_hdl = worker;
  worker->initialized_cv = false;
  SET_KICK_STATE(worker, UNKICKED);
  worker->schedule_on_end_work = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
  pollset->begin_refs++;

  if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
    gpr_log(GPR_INFO, "PS:%p BEGIN_STARTS:%p", pollset, worker);
  }

  if (pollset->seen_inactive) {
    // The pollset has been observed to be inactive; we need to move it back
    // to the active list
    bool is_reassigning = false;
    if (!pollset->reassigning_neighborhood) {
      is_reassigning = true;
      pollset->reassigning_neighborhood = true;
      pollset->neighborhood = &g_neighborhoods[choose_neighborhood()];
    }
    pollset_neighborhood* neighborhood = pollset->neighborhood;
    gpr_mu_unlock(&pollset->mu);
  // pollset unlocked: state may change (even worker->kick_state)
  retry_lock_neighborhood:
    gpr_mu_lock(&neighborhood->mu);
    gpr_mu_lock(&pollset->mu);
    if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
      gpr_log(GPR_INFO, "PS:%p BEGIN_REORG:%p kick_state=%s is_reassigning=%d",
              pollset, worker, kick_state_string(worker->state),
              is_reassigning);
    }
    if (pollset->seen_inactive) {
      if (neighborhood != pollset->neighborhood) {
        gpr_mu_unlock(&neighborhood->mu);
        neighborhood = pollset->neighborhood;
        gpr_mu_unlock(&pollset->mu);
        goto retry_lock_neighborhood;
      }

      /* In the brief time we released the pollset locks above, the worker MAY
         have been kicked. In this case, the worker should get out of this
         pollset ASAP and hence this should neither add the pollset to the
         neighborhood nor mark the pollset as active.

         On a side note, the only way a worker's kick state could have changed
         at this point is if it were "kicked specifically". Since the worker has
         not added itself to the pollset yet (by calling worker_insert()), it is
         not visible in the "kick any" path yet */
      if (worker->state == UNKICKED) {
        pollset->seen_inactive = false;
        if (neighborhood->active_root == nullptr) {
          neighborhood->active_root = pollset->next = pollset->prev = pollset;
          /* Make this the designated poller if there isn't one already */
          if (worker->state == UNKICKED &&
              gpr_atm_no_barrier_cas(&g_active_poller, 0,
                                     reinterpret_cast<gpr_atm>(worker))) {
            SET_KICK_STATE(worker, DESIGNATED_POLLER);
          }
        } else {
          pollset->next = neighborhood->active_root;
          pollset->prev = pollset->next->prev;
          pollset->next->prev = pollset->prev->next = pollset;
        }
      }
    }
    if (is_reassigning) {
      GPR_ASSERT(pollset->reassigning_neighborhood);
      pollset->reassigning_neighborhood = false;
    }
    gpr_mu_unlock(&neighborhood->mu);
  }

  worker_insert(pollset, worker);
  pollset->begin_refs--;
  if (worker->state == UNKICKED && !pollset->kicked_without_poller) {
    GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
    worker->initialized_cv = true;
    gpr_cv_init(&worker->cv);
    while (worker->state == UNKICKED && !pollset->shutting_down) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
        gpr_log(GPR_INFO, "PS:%p BEGIN_WAIT:%p kick_state=%s shutdown=%d",
                pollset, worker, kick_state_string(worker->state),
                pollset->shutting_down);
      }

      if (gpr_cv_wait(&worker->cv, &pollset->mu,
                      grpc_millis_to_timespec(deadline, GPR_CLOCK_MONOTONIC)) &&
          worker->state == UNKICKED) {
        /* If gpr_cv_wait returns true (i.e. a timeout), pretend that the
           worker received a kick */
        SET_KICK_STATE(worker, KICKED);
      }
    }
    grpc_core::ExecCtx::Get()->InvalidateNow();
  }

  if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
    gpr_log(GPR_INFO,
            "PS:%p BEGIN_DONE:%p kick_state=%s shutdown=%d "
            "kicked_without_poller: %d",
            pollset, worker, kick_state_string(worker->state),
            pollset->shutting_down, pollset->kicked_without_poller);
  }

  /* We release the pollset lock in this function at a couple of places:
   *   1. Briefly when assigning the pollset to a neighborhood
   *   2. When doing gpr_cv_wait()
   * It is possible that 'kicked_without_poller' was set to true during (1) and
   * 'shutting_down' is set to true during (1) or (2). If either of them is
   * true, this worker cannot do polling */
  /* TODO(sreek): Perhaps there is a better way to handle the
   * kicked_without_poller case, especially when the worker is the
   * DESIGNATED_POLLER */

  if (pollset->kicked_without_poller) {
    pollset->kicked_without_poller = false;
    return false;
  }

  return worker->state == DESIGNATED_POLLER && !pollset->shutting_down;
}

static bool check_neighborhood_for_available_poller(
    pollset_neighborhood* neighborhood) {
  GPR_TIMER_SCOPE("check_neighborhood_for_available_poller", 0);
  bool found_worker = false;
  do {
    grpc_pollset* inspect = neighborhood->active_root;
    if (inspect == nullptr) {
      break;
    }
    gpr_mu_lock(&inspect->mu);
    GPR_ASSERT(!inspect->seen_inactive);
    grpc_pollset_worker* inspect_worker = inspect->root_worker;
    if (inspect_worker != nullptr) {
      do {
        switch (inspect_worker->state) {
          case UNKICKED:
            if (gpr_atm_no_barrier_cas(
                    &g_active_poller, 0,
                    reinterpret_cast<gpr_atm>(inspect_worker))) {
              if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
                gpr_log(GPR_INFO, " .. choose next poller to be %p",
                        inspect_worker);
              }
              SET_KICK_STATE(inspect_worker, DESIGNATED_POLLER);
              if (inspect_worker->initialized_cv) {
                GPR_TIMER_MARK("signal worker", 0);
                GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
                gpr_cv_signal(&inspect_worker->cv);
              }
            } else {
              if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
                gpr_log(GPR_INFO, " .. beaten to choose next poller");
              }
            }
            // even if we didn't win the cas, there's a worker, we can stop
            found_worker = true;
            break;
          case KICKED:
            break;
          case DESIGNATED_POLLER:
            found_worker = true;  // ok, so someone else found the worker, but
                                  // we'll accept that
            break;
        }
        inspect_worker = inspect_worker->next;
      } while (!found_worker && inspect_worker != inspect->root_worker);
    }
    if (!found_worker) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
        gpr_log(GPR_INFO, " .. mark pollset %p inactive", inspect);
      }
      inspect->seen_inactive = true;
      if (inspect == neighborhood->active_root) {
        neighborhood->active_root =
            inspect->next == inspect ? nullptr : inspect->next;
      }
      inspect->next->prev = inspect->prev;
      inspect->prev->next = inspect->next;
      inspect->next = inspect->prev = nullptr;
    }
    gpr_mu_unlock(&inspect->mu);
  } while (!found_worker);
  return found_worker;
}

static void end_worker(grpc_pollset* pollset, grpc_pollset_worker* worker,
                       grpc_pollset_worker** worker_hdl) {
  GPR_TIMER_SCOPE("end_worker", 0);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
    gpr_log(GPR_INFO, "PS:%p END_WORKER:%p", pollset, worker);
  }
  if (worker_hdl != nullptr) *worker_hdl = nullptr;
  /* Make sure we appear kicked */
  SET_KICK_STATE(worker, KICKED);
  grpc_closure_list_move(&worker->schedule_on_end_work,
                         grpc_core::ExecCtx::Get()->closure_list());
  if (gpr_atm_no_barrier_load(&g_active_poller) ==
      reinterpret_cast<gpr_atm>(worker)) {
    if (worker->next != worker && worker->next->state == UNKICKED) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
        gpr_log(GPR_INFO, " .. choose next poller to be peer %p", worker);
      }
      GPR_ASSERT(worker->next->initialized_cv);
      gpr_atm_no_barrier_store(&g_active_poller, (gpr_atm)worker->next);
      SET_KICK_STATE(worker->next, DESIGNATED_POLLER);
      GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
      gpr_cv_signal(&worker->next->cv);
      if (grpc_core::ExecCtx::Get()->HasWork()) {
        gpr_mu_unlock(&pollset->mu);
        grpc_core::ExecCtx::Get()->Flush();
        gpr_mu_lock(&pollset->mu);
      }
    } else {
      gpr_atm_no_barrier_store(&g_active_poller, 0);
      size_t poller_neighborhood_idx =
          static_cast<size_t>(pollset->neighborhood - g_neighborhoods);
      gpr_mu_unlock(&pollset->mu);
      bool found_worker = false;
      bool scan_state[MAX_NEIGHBORHOODS];
      for (size_t i = 0; !found_worker && i < g_num_neighborhoods; i++) {
        pollset_neighborhood* neighborhood =
            &g_neighborhoods[(poller_neighborhood_idx + i) %
                             g_num_neighborhoods];
        if (gpr_mu_trylock(&neighborhood->mu)) {
          found_worker = check_neighborhood_for_available_poller(neighborhood);
          gpr_mu_unlock(&neighborhood->mu);
          scan_state[i] = true;
        } else {
          scan_state[i] = false;
        }
      }
      for (size_t i = 0; !found_worker && i < g_num_neighborhoods; i++) {
        if (scan_state[i]) continue;
        pollset_neighborhood* neighborhood =
            &g_neighborhoods[(poller_neighborhood_idx + i) %
                             g_num_neighborhoods];
        gpr_mu_lock(&neighborhood->mu);
        found_worker = check_neighborhood_for_available_poller(neighborhood);
        gpr_mu_unlock(&neighborhood->mu);
      }
      grpc_core::ExecCtx::Get()->Flush();
      gpr_mu_lock(&pollset->mu);
    }
  } else if (grpc_core::ExecCtx::Get()->HasWork()) {
    gpr_mu_unlock(&pollset->mu);
    grpc_core::ExecCtx::Get()->Flush();
    gpr_mu_lock(&pollset->mu);
  }
  if (worker->initialized_cv) {
    gpr_cv_destroy(&worker->cv);
  }
  if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
    gpr_log(GPR_INFO, " .. remove worker");
  }
  if (EMPTIED == worker_remove(pollset, worker)) {
    pollset_maybe_finish_shutdown(pollset);
  }
  GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
}

/* pollset->mu lock must be held by the caller before calling this.
   The function pollset_work() may temporarily release the lock (pollset->mu)
   during the course of its execution but it will always re-acquire the lock
   and ensure that it is held by the time the function returns */
static grpc_error* pollset_work(grpc_pollset* ps,
                                grpc_pollset_worker** worker_hdl,
                                grpc_millis deadline) {
  GPR_TIMER_SCOPE("pollset_work", 0);
  grpc_pollset_worker worker;
  grpc_error* error = GRPC_ERROR_NONE;
  static const char* err_desc = "pollset_work";
  if (ps->kicked_without_poller) {
    ps->kicked_without_poller = false;
    return GRPC_ERROR_NONE;
  }

  if (begin_worker(ps, &worker, worker_hdl, deadline)) {
    gpr_tls_set(&g_current_thread_pollset, (intptr_t)ps);
    gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
    GPR_ASSERT(!ps->shutting_down);
    GPR_ASSERT(!ps->seen_inactive);

    gpr_mu_unlock(&ps->mu); /* unlock */
    /* This is the designated polling thread at this point and should ideally
       do polling. However, if there are unprocessed events left from a
       previous call to do_epoll_wait(), skip calling epoll_wait() in this
       iteration and process the pending epoll events instead.

       The reason for decoupling do_epoll_wait and process_epoll_events is to
       better distribute the work (i.e. handling epoll events) across multiple
       threads.

       process_epoll_events() returns very quickly: it just queues the work on
       the exec_ctx but does not execute it (the actual execution, or more
       accurately grpc_core::ExecCtx::Get()->Flush(), happens in end_worker()
       AFTER selecting a designated poller). So we are not waiting long periods
       without a designated poller */
    if (gpr_atm_acq_load(&g_epoll_set.cursor) ==
        gpr_atm_acq_load(&g_epoll_set.num_events)) {
      append_error(&error, do_epoll_wait(ps, deadline), err_desc);
    }
    append_error(&error, process_epoll_events(ps), err_desc);

    gpr_mu_lock(&ps->mu); /* lock */

    gpr_tls_set(&g_current_thread_worker, 0);
  } else {
    gpr_tls_set(&g_current_thread_pollset, (intptr_t)ps);
  }
  end_worker(ps, &worker, worker_hdl);

  gpr_tls_set(&g_current_thread_pollset, 0);
  return error;
}

static grpc_error* pollset_kick(grpc_pollset* pollset,
                                grpc_pollset_worker* specific_worker) {
  GPR_TIMER_SCOPE("pollset_kick", 0);
  GRPC_STATS_INC_POLLSET_KICK();
  grpc_error* ret_err = GRPC_ERROR_NONE;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
    std::vector<std::string> log;
    log.push_back(absl::StrFormat(
        "PS:%p KICK:%p curps=%p curworker=%p root=%p", pollset, specific_worker,
        reinterpret_cast<void*>(gpr_tls_get(&g_current_thread_pollset)),
        reinterpret_cast<void*>(gpr_tls_get(&g_current_thread_worker)),
        pollset->root_worker));
    if (pollset->root_worker != nullptr) {
      log.push_back(absl::StrFormat(
          " {kick_state=%s next=%p {kick_state=%s}}",
          kick_state_string(pollset->root_worker->state),
          pollset->root_worker->next,
          kick_state_string(pollset->root_worker->next->state)));
    }
    if (specific_worker != nullptr) {
      log.push_back(absl::StrFormat(" worker_kick_state=%s",
                                    kick_state_string(specific_worker->state)));
    }
    gpr_log(GPR_DEBUG, "%s", absl::StrJoin(log, "").c_str());
  }

  if (specific_worker == nullptr) {
    if (gpr_tls_get(&g_current_thread_pollset) !=
        reinterpret_cast<intptr_t>(pollset)) {
      grpc_pollset_worker* root_worker = pollset->root_worker;
      if (root_worker == nullptr) {
        GRPC_STATS_INC_POLLSET_KICKED_WITHOUT_POLLER();
        pollset->kicked_without_poller = true;
        if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
          gpr_log(GPR_INFO, " .. kicked_without_poller");
        }
        goto done;
      }
      grpc_pollset_worker* next_worker = root_worker->next;
      if (root_worker->state == KICKED) {
        GRPC_STATS_INC_POLLSET_KICKED_AGAIN();
        if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
          gpr_log(GPR_INFO, " .. already kicked %p", root_worker);
        }
        SET_KICK_STATE(root_worker, KICKED);
        goto done;
      } else if (next_worker->state == KICKED) {
        GRPC_STATS_INC_POLLSET_KICKED_AGAIN();
        if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
          gpr_log(GPR_INFO, " .. already kicked %p", next_worker);
        }
        SET_KICK_STATE(next_worker, KICKED);
        goto done;
      } else if (root_worker == next_worker &&  // only try and wake up a poller
                                                // if there is no next worker
                 root_worker ==
                     reinterpret_cast<grpc_pollset_worker*>(
                         gpr_atm_no_barrier_load(&g_active_poller))) {
        GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD();
        if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
          gpr_log(GPR_INFO, " .. kicked %p", root_worker);
        }
        SET_KICK_STATE(root_worker, KICKED);
        ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
        goto done;
      } else if (next_worker->state == UNKICKED) {
        GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
        if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
          gpr_log(GPR_INFO, " .. kicked %p", next_worker);
        }
        GPR_ASSERT(next_worker->initialized_cv);
        SET_KICK_STATE(next_worker, KICKED);
        gpr_cv_signal(&next_worker->cv);
        goto done;
      } else if (next_worker->state == DESIGNATED_POLLER) {
        if (root_worker->state != DESIGNATED_POLLER) {
          if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
            gpr_log(
                GPR_INFO,
                " .. kicked root non-poller %p (initialized_cv=%d) (poller=%p)",
                root_worker, root_worker->initialized_cv, next_worker);
          }
          SET_KICK_STATE(root_worker, KICKED);
          if (root_worker->initialized_cv) {
            GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
            gpr_cv_signal(&root_worker->cv);
          }
          goto done;
        } else {
          GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD();
          if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
            gpr_log(GPR_INFO, " .. non-root poller %p (root=%p)", next_worker,
                    root_worker);
          }
          SET_KICK_STATE(next_worker, KICKED);
          ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
          goto done;
        }
      } else {
        GRPC_STATS_INC_POLLSET_KICKED_AGAIN();
        GPR_ASSERT(next_worker->state == KICKED);
        SET_KICK_STATE(next_worker, KICKED);
        goto done;
      }
    } else {
      GRPC_STATS_INC_POLLSET_KICK_OWN_THREAD();
      if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
        gpr_log(GPR_INFO, " .. kicked while waking up");
      }
      goto done;
    }

    GPR_UNREACHABLE_CODE(goto done);
  }

  if (specific_worker->state == KICKED) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
      gpr_log(GPR_INFO, " .. specific worker already kicked");
    }
    goto done;
  } else if (gpr_tls_get(&g_current_thread_worker) ==
             reinterpret_cast<intptr_t>(specific_worker)) {
    GRPC_STATS_INC_POLLSET_KICK_OWN_THREAD();
    if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
      gpr_log(GPR_INFO, " .. mark %p kicked", specific_worker);
    }
    SET_KICK_STATE(specific_worker, KICKED);
    goto done;
  } else if (specific_worker ==
             reinterpret_cast<grpc_pollset_worker*>(
                 gpr_atm_no_barrier_load(&g_active_poller))) {
    GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD();
    if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
      gpr_log(GPR_INFO, " .. kick active poller");
    }
    SET_KICK_STATE(specific_worker, KICKED);
    ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
    goto done;
  } else if (specific_worker->initialized_cv) {
    GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
    if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
      gpr_log(GPR_INFO, " .. kick waiting worker");
    }
    SET_KICK_STATE(specific_worker, KICKED);
    gpr_cv_signal(&specific_worker->cv);
    goto done;
  } else {
    GRPC_STATS_INC_POLLSET_KICKED_AGAIN();
    if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
      gpr_log(GPR_INFO, " .. kick non-waiting worker");
    }
    SET_KICK_STATE(specific_worker, KICKED);
    goto done;
  }
done:
  return ret_err;
}

static void pollset_add_fd(grpc_pollset* /*pollset*/, grpc_fd* /*fd*/) {}

/*******************************************************************************
 * Pollset-set Definitions
 */

static grpc_pollset_set* pollset_set_create(void) {
  return reinterpret_cast<grpc_pollset_set*>(static_cast<intptr_t>(0xdeafbeef));
}

static void pollset_set_destroy(grpc_pollset_set* /*pss*/) {}

static void pollset_set_add_fd(grpc_pollset_set* /*pss*/, grpc_fd* /*fd*/) {}

static void pollset_set_del_fd(grpc_pollset_set* /*pss*/, grpc_fd* /*fd*/) {}

static void pollset_set_add_pollset(grpc_pollset_set* /*pss*/,
                                    grpc_pollset* /*ps*/) {}

static void pollset_set_del_pollset(grpc_pollset_set* /*pss*/,
                                    grpc_pollset* /*ps*/) {}

static void pollset_set_add_pollset_set(grpc_pollset_set* /*bag*/,
                                        grpc_pollset_set* /*item*/) {}

static void pollset_set_del_pollset_set(grpc_pollset_set* /*bag*/,
                                        grpc_pollset_set* /*item*/) {}

/*******************************************************************************
 * Event engine binding
 */

static bool is_any_background_poller_thread(void) { return false; }

static void shutdown_background_closure(void) {}

static bool add_closure_to_background_poller(grpc_closure* /*closure*/,
                                             grpc_error* /*error*/) {
  return false;
}

static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
  epoll_set_shutdown();
  if (grpc_core::Fork::Enabled()) {
    gpr_mu_destroy(&fork_fd_list_mu);
    grpc_core::Fork::SetResetChildPollingEngineFunc(nullptr);
  }
}

static const grpc_event_engine_vtable vtable = {
    sizeof(grpc_pollset),
    true,
    false,

    fd_create,
    fd_wrapped_fd,
    fd_orphan,
    fd_shutdown,
    fd_notify_on_read,
    fd_notify_on_write,
    fd_notify_on_error,
    fd_become_readable,
    fd_become_writable,
    fd_has_errors,
    fd_is_shutdown,

    pollset_init,
    pollset_shutdown,
    pollset_destroy,
    pollset_work,
    pollset_kick,
    pollset_add_fd,

    pollset_set_create,
    pollset_set_destroy,
    pollset_set_add_pollset,
    pollset_set_del_pollset,
    pollset_set_add_pollset_set,
    pollset_set_del_pollset_set,
    pollset_set_add_fd,
    pollset_set_del_fd,

    is_any_background_poller_thread,
    shutdown_background_closure,
    shutdown_engine,
    add_closure_to_background_poller,
};

/* Called by the child process's post-fork handler to close open fds,
 * including the global epoll fd. This allows gRPC to shut down in the child
 * process without interfering with connections or RPCs ongoing in the
 * parent. */
static void reset_event_manager_on_fork() {
  gpr_mu_lock(&fork_fd_list_mu);
  while (fork_fd_list_head != nullptr) {
    close(fork_fd_list_head->fd);
    fork_fd_list_head->fd = -1;
    fork_fd_list_head = fork_fd_list_head->fork_fd_list->next;
  }
  gpr_mu_unlock(&fork_fd_list_mu);
  shutdown_engine();
  grpc_init_epoll1_linux(true);
}

/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
 * Create the epoll fd (epoll_set_init() takes care of that) to make sure
 * epoll support is available */
const grpc_event_engine_vtable* grpc_init_epoll1_linux(
    bool /*explicit_request*/) {
  if (!grpc_has_wakeup_fd()) {
    gpr_log(GPR_ERROR, "Skipping epoll1 because of no wakeup fd.");
    return nullptr;
  }

  if (!epoll_set_init()) {
    return nullptr;
  }

  fd_global_init();

  if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
    fd_global_shutdown();
    epoll_set_shutdown();
    return nullptr;
  }

  if (grpc_core::Fork::Enabled()) {
    gpr_mu_init(&fork_fd_list_mu);
    grpc_core::Fork::SetResetChildPollingEngineFunc(
        reset_event_manager_on_fork);
  }
  return &vtable;
}

#else /* defined(GRPC_LINUX_EPOLL) */
#if defined(GRPC_POSIX_SOCKET_EV_EPOLL1)
#include "src/core/lib/iomgr/ev_epoll1_linux.h"
/* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available.
 * Return NULL */
const grpc_event_engine_vtable* grpc_init_epoll1_linux(
    bool /*explicit_request*/) {
  return nullptr;
}
#endif /* defined(GRPC_POSIX_SOCKET_EV_EPOLL1) */
#endif /* !defined(GRPC_LINUX_EPOLL) */