• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2017 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/iomgr/port.h"
22 
23 #include <grpc/support/log.h>
24 
/* This polling engine is only relevant on Linux kernels that support epoll
   (epoll_create() or epoll_create1()) */
27 #ifdef GRPC_LINUX_EPOLL
28 #include "src/core/lib/iomgr/ev_epoll1_linux.h"
29 
30 #include <assert.h>
31 #include <errno.h>
32 #include <fcntl.h>
33 #include <limits.h>
34 #include <poll.h>
35 #include <pthread.h>
36 #include <string.h>
37 #include <sys/epoll.h>
38 #include <sys/socket.h>
39 #include <unistd.h>
40 
41 #include <grpc/support/alloc.h>
42 #include <grpc/support/cpu.h>
43 #include <grpc/support/string_util.h>
44 
45 #include "src/core/lib/debug/stats.h"
46 #include "src/core/lib/gpr/string.h"
47 #include "src/core/lib/gpr/tls.h"
48 #include "src/core/lib/gpr/useful.h"
49 #include "src/core/lib/gprpp/manual_constructor.h"
50 #include "src/core/lib/iomgr/block_annotate.h"
51 #include "src/core/lib/iomgr/ev_posix.h"
52 #include "src/core/lib/iomgr/iomgr_internal.h"
53 #include "src/core/lib/iomgr/lockfree_event.h"
54 #include "src/core/lib/iomgr/wakeup_fd_posix.h"
55 #include "src/core/lib/profiling/timers.h"
56 
/* Wakeup fd that pollset_global_init() registers with the global epoll set;
 * pollset_kick_all() signals it to wake a worker in the DESIGNATED_POLLER
 * state. */
57 static grpc_wakeup_fd global_wakeup_fd;
58 
59 /*******************************************************************************
60  * Singleton epoll set related fields
61  */
62 
/* Capacity of epoll_set::events; also the maxevents argument passed to
 * epoll_wait() and the size hint passed to epoll_create(). */
63 #define MAX_EPOLL_EVENTS 100
/* Upper bound on the number of cached events one call to
 * process_epoll_events() handles. */
64 #define MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION 1
65 
66 /* NOTE ON SYNCHRONIZATION:
67  * - Fields in this struct are only modified by the designated poller. Hence
68  *   there is no need for any locks to protect the struct.
69  * - num_events and cursor fields have to be of atomic type to provide memory
70  *   visibility guarantees only. i.e In case of multiple pollers, the designated
71  *   polling thread keeps changing; the thread that wrote these values may be
72  *   different from the thread reading the values
73  */
/* The epoll instance plus the batch of events cached from the most recent
 * epoll_wait() call. */
74 typedef struct epoll_set {
  /* epoll file descriptor; assigned by epoll_set_init() and reset to -1 by
   * epoll_set_shutdown() */
75   int epfd;
76 
77   /* The epoll_events after the last call to epoll_wait() */
78   struct epoll_event events[MAX_EPOLL_EVENTS];
79 
80   /* The number of epoll_events after the last call to epoll_wait() */
81   gpr_atm num_events;
82 
83   /* Index of the first event in epoll_events that has to be processed. This
84    * field is only valid if num_events > 0 */
85   gpr_atm cursor;
86 } epoll_set;
87 
88 /* The global singleton epoll set */
89 static epoll_set g_epoll_set;
90 
epoll_create_and_cloexec()91 static int epoll_create_and_cloexec() {
92 #ifdef GRPC_LINUX_EPOLL_CREATE1
93   int fd = epoll_create1(EPOLL_CLOEXEC);
94   if (fd < 0) {
95     gpr_log(GPR_ERROR, "epoll_create1 unavailable");
96   }
97 #else
98   int fd = epoll_create(MAX_EPOLL_EVENTS);
99   if (fd < 0) {
100     gpr_log(GPR_ERROR, "epoll_create unavailable");
101   } else if (fcntl(fd, F_SETFD, FD_CLOEXEC) != 0) {
102     gpr_log(GPR_ERROR, "fcntl following epoll_create failed");
103     return -1;
104   }
105 #endif
106   return fd;
107 }
108 
109 /* Must be called *only* once */
epoll_set_init()110 static bool epoll_set_init() {
111   g_epoll_set.epfd = epoll_create_and_cloexec();
112   if (g_epoll_set.epfd < 0) {
113     return false;
114   }
115 
116   gpr_log(GPR_INFO, "grpc epoll fd: %d", g_epoll_set.epfd);
117   gpr_atm_no_barrier_store(&g_epoll_set.num_events, 0);
118   gpr_atm_no_barrier_store(&g_epoll_set.cursor, 0);
119   return true;
120 }
121 
122 /* epoll_set_init() MUST be called before calling this. */
epoll_set_shutdown()123 static void epoll_set_shutdown() {
124   if (g_epoll_set.epfd >= 0) {
125     close(g_epoll_set.epfd);
126     g_epoll_set.epfd = -1;
127   }
128 }
129 
130 /*******************************************************************************
131  * Fd Declarations
132  */
133 
/* Per-grpc_fd links for the global list headed by fork_fd_list_head.
 * next/prev point at the neighbouring grpc_fd objects in that list; the `fd`
 * member is not assigned by the list helpers visible in this part of the file.
 */
134 /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
135 struct grpc_fork_fd_list {
136   grpc_fd* fd;
137   grpc_fd* next;
138   grpc_fd* prev;
139 };
140 
/* Wrapper around an OS file descriptor registered with the global epoll set.
 * Instances are recycled through fd_freelist rather than being freed when
 * orphaned. */
141 struct grpc_fd {
  /* The wrapped OS file descriptor */
142   int fd;
143 
  /* Lock-free events backing the notify-on-read / write / error callbacks */
144   grpc_core::ManualConstructor<grpc_core::LockfreeEvent> read_closure;
145   grpc_core::ManualConstructor<grpc_core::LockfreeEvent> write_closure;
146   grpc_core::ManualConstructor<grpc_core::LockfreeEvent> error_closure;
147 
  /* Next entry while this object sits on fd_freelist */
148   struct grpc_fd* freelist_next;
149 
  /* Handle registered through grpc_iomgr_register_object() in fd_create() */
150   grpc_iomgr_object iomgr_object;
151 
152   /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
153   grpc_fork_fd_list* fork_fd_list;
154 };
155 
156 static void fd_global_init(void);
157 static void fd_global_shutdown(void);
158 
159 /*******************************************************************************
160  * Pollset Declarations
161  */
162 
163 typedef enum { UNKICKED, KICKED, DESIGNATED_POLLER } kick_state;
164 
kick_state_string(kick_state st)165 static const char* kick_state_string(kick_state st) {
166   switch (st) {
167     case UNKICKED:
168       return "UNKICKED";
169     case KICKED:
170       return "KICKED";
171     case DESIGNATED_POLLER:
172       return "DESIGNATED_POLLER";
173   }
174   GPR_UNREACHABLE_CODE(return "UNKNOWN");
175 }
176 
/* One thread's participation in a pollset. Workers of a pollset are linked
 * into a circular doubly-linked list through next/prev (see worker_insert()).
 */
177 struct grpc_pollset_worker {
178   kick_state state;
179   int kick_state_mutator;  // which line of code last changed kick state
  /* True once `cv` has been initialized with gpr_cv_init() */
180   bool initialized_cv;
181   grpc_pollset_worker* next;
182   grpc_pollset_worker* prev;
  /* Condition variable the worker waits on in begin_worker() */
183   gpr_cv cv;
184   grpc_closure_list schedule_on_end_work;
185 };
186 
/* Sets (worker)->state and records the source line that made the change in
 * (worker)->kick_state_mutator. */
187 #define SET_KICK_STATE(worker, kick_state)   \
188   do {                                       \
189     (worker)->state = (kick_state);          \
190     (worker)->kick_state_mutator = __LINE__; \
191   } while (false)
192 
/* Upper clamp for g_num_neighborhoods (see pollset_global_init()). */
193 #define MAX_NEIGHBORHOODS 1024
194 
/* A group of pollsets sharing one mutex. active_root is the head of a
 * circular list of pollsets linked through grpc_pollset::next/prev, or null
 * when the list is empty. */
195 typedef struct pollset_neighborhood {
196   gpr_mu mu;
197   grpc_pollset* active_root;
  /* NOTE(review): presumably padding to keep neighborhoods on separate cache
   * lines -- confirm */
198   char pad[GPR_CACHELINE_SIZE];
199 } pollset_neighborhood;
200 
/* A pollset: a circular list of workers plus membership in a neighborhood's
 * list of active pollsets. */
201 struct grpc_pollset {
202   gpr_mu mu;
  /* Neighborhood this pollset is currently assigned to */
203   pollset_neighborhood* neighborhood;
  /* True while a begin_worker() call is moving the pollset to a newly chosen
   * neighborhood */
204   bool reassigning_neighborhood;
  /* Head of the circular worker list, or null when there are no workers */
205   grpc_pollset_worker* root_worker;
206   bool kicked_without_poller;
207 
208   /* Set to true if the pollset is observed to have no workers available to
209      poll */
210   bool seen_inactive;
211   bool shutting_down;             /* Is the pollset shutting down ? */
212   grpc_closure* shutdown_closure; /* Called after shutdown is complete */
213 
214   /* Number of workers who are *about-to* attach themselves to the pollset
215    * worker list */
216   int begin_refs;
217 
  /* Links in the owning neighborhood's circular list of active pollsets */
218   grpc_pollset* next;
219   grpc_pollset* prev;
220 };
221 
222 /*******************************************************************************
223  * Pollset-set Declarations
224  */
225 
/* Pollset-set type for this engine; it carries no state beyond a placeholder
 * member. */
226 struct grpc_pollset_set {
227   char unused;
228 };
229 
230 /*******************************************************************************
231  * Common helpers
232  */
233 
append_error(grpc_error ** composite,grpc_error * error,const char * desc)234 static bool append_error(grpc_error** composite, grpc_error* error,
235                          const char* desc) {
236   if (error == GRPC_ERROR_NONE) return true;
237   if (*composite == GRPC_ERROR_NONE) {
238     *composite = GRPC_ERROR_CREATE_FROM_COPIED_STRING(desc);
239   }
240   *composite = grpc_error_add_child(*composite, error);
241   return false;
242 }
243 
244 /*******************************************************************************
245  * Fd Definitions
246  */
247 
248 /* We need to keep a freelist not because of any concerns of malloc performance
249  * but instead so that implementations with multiple threads in (for example)
250  * epoll_wait deal with the race between pollset removal and incoming poll
251  * notifications.
252  *
253  * The problem is that the poller ultimately holds a reference to this
254  * object, so it is very difficult to know when is safe to free it, at least
255  * without some expensive synchronization.
256  *
257  * If we keep the object freelisted, in the worst case losing this race just
258  * becomes a spurious read notification on a reused fd.
259  */
260 
261 /* The alarm system needs to be able to wakeup 'some poller' sometimes
262  * (specifically when a new alarm needs to be triggered earlier than the next
263  * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
264  * case occurs. */
265 
/* Singly-linked list (through grpc_fd::freelist_next) of orphaned grpc_fd
 * objects available for reuse by fd_create(); guarded by fd_freelist_mu. */
266 static grpc_fd* fd_freelist = nullptr;
267 static gpr_mu fd_freelist_mu;
268 
/* Head of the doubly-linked list of live grpc_fd objects maintained by
 * fork_fd_list_add_grpc_fd() / fork_fd_list_remove_grpc_fd(); guarded by
 * fork_fd_list_mu. */
269 /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
270 static grpc_fd* fork_fd_list_head = nullptr;
271 static gpr_mu fork_fd_list_mu;
272 
fd_global_init(void)273 static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
274 
fd_global_shutdown(void)275 static void fd_global_shutdown(void) {
276   gpr_mu_lock(&fd_freelist_mu);
277   gpr_mu_unlock(&fd_freelist_mu);
278   while (fd_freelist != nullptr) {
279     grpc_fd* fd = fd_freelist;
280     fd_freelist = fd_freelist->freelist_next;
281     gpr_free(fd);
282   }
283   gpr_mu_destroy(&fd_freelist_mu);
284 }
285 
fork_fd_list_add_grpc_fd(grpc_fd * fd)286 static void fork_fd_list_add_grpc_fd(grpc_fd* fd) {
287   if (grpc_core::Fork::Enabled()) {
288     gpr_mu_lock(&fork_fd_list_mu);
289     fd->fork_fd_list =
290         static_cast<grpc_fork_fd_list*>(gpr_malloc(sizeof(grpc_fork_fd_list)));
291     fd->fork_fd_list->next = fork_fd_list_head;
292     fd->fork_fd_list->prev = nullptr;
293     if (fork_fd_list_head != nullptr) {
294       fork_fd_list_head->fork_fd_list->prev = fd;
295     }
296     fork_fd_list_head = fd;
297     gpr_mu_unlock(&fork_fd_list_mu);
298   }
299 }
300 
fork_fd_list_remove_grpc_fd(grpc_fd * fd)301 static void fork_fd_list_remove_grpc_fd(grpc_fd* fd) {
302   if (grpc_core::Fork::Enabled()) {
303     gpr_mu_lock(&fork_fd_list_mu);
304     if (fork_fd_list_head == fd) {
305       fork_fd_list_head = fd->fork_fd_list->next;
306     }
307     if (fd->fork_fd_list->prev != nullptr) {
308       fd->fork_fd_list->prev->fork_fd_list->next = fd->fork_fd_list->next;
309     }
310     if (fd->fork_fd_list->next != nullptr) {
311       fd->fork_fd_list->next->fork_fd_list->prev = fd->fork_fd_list->prev;
312     }
313     gpr_free(fd->fork_fd_list);
314     gpr_mu_unlock(&fork_fd_list_mu);
315   }
316 }
317 
fd_create(int fd,const char * name,bool track_err)318 static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
319   grpc_fd* new_fd = nullptr;
320 
321   gpr_mu_lock(&fd_freelist_mu);
322   if (fd_freelist != nullptr) {
323     new_fd = fd_freelist;
324     fd_freelist = fd_freelist->freelist_next;
325   }
326   gpr_mu_unlock(&fd_freelist_mu);
327 
328   if (new_fd == nullptr) {
329     new_fd = static_cast<grpc_fd*>(gpr_malloc(sizeof(grpc_fd)));
330     new_fd->read_closure.Init();
331     new_fd->write_closure.Init();
332     new_fd->error_closure.Init();
333   }
334   new_fd->fd = fd;
335   new_fd->read_closure->InitEvent();
336   new_fd->write_closure->InitEvent();
337   new_fd->error_closure->InitEvent();
338 
339   new_fd->freelist_next = nullptr;
340 
341   char* fd_name;
342   gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
343   grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
344   fork_fd_list_add_grpc_fd(new_fd);
345 #ifndef NDEBUG
346   if (grpc_trace_fd_refcount.enabled()) {
347     gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, new_fd, fd_name);
348   }
349 #endif
350   gpr_free(fd_name);
351 
352   struct epoll_event ev;
353   ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLOUT | EPOLLET);
354   /* Use the least significant bit of ev.data.ptr to store track_err. We expect
355    * the addresses to be word aligned. We need to store track_err to avoid
356    * synchronization issues when accessing it after receiving an event.
357    * Accessing fd would be a data race there because the fd might have been
358    * returned to the free list at that point. */
359   ev.data.ptr = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(new_fd) |
360                                         (track_err ? 1 : 0));
361   if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
362     gpr_log(GPR_ERROR, "epoll_ctl failed: %s", strerror(errno));
363   }
364 
365   return new_fd;
366 }
367 
fd_wrapped_fd(grpc_fd * fd)368 static int fd_wrapped_fd(grpc_fd* fd) { return fd->fd; }
369 
370 /* if 'releasing_fd' is true, it means that we are going to detach the internal
371  * fd from grpc_fd structure (i.e which means we should not be calling
372  * shutdown() syscall on that fd) */
fd_shutdown_internal(grpc_fd * fd,grpc_error * why,bool releasing_fd)373 static void fd_shutdown_internal(grpc_fd* fd, grpc_error* why,
374                                  bool releasing_fd) {
375   if (fd->read_closure->SetShutdown(GRPC_ERROR_REF(why))) {
376     if (!releasing_fd) {
377       shutdown(fd->fd, SHUT_RDWR);
378     }
379     fd->write_closure->SetShutdown(GRPC_ERROR_REF(why));
380     fd->error_closure->SetShutdown(GRPC_ERROR_REF(why));
381   }
382   GRPC_ERROR_UNREF(why);
383 }
384 
385 /* Might be called multiple times */
fd_shutdown(grpc_fd * fd,grpc_error * why)386 static void fd_shutdown(grpc_fd* fd, grpc_error* why) {
387   fd_shutdown_internal(fd, why, false);
388 }
389 
fd_orphan(grpc_fd * fd,grpc_closure * on_done,int * release_fd,const char * reason)390 static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
391                       const char* reason) {
392   grpc_error* error = GRPC_ERROR_NONE;
393   bool is_release_fd = (release_fd != nullptr);
394 
395   if (!fd->read_closure->IsShutdown()) {
396     fd_shutdown_internal(fd, GRPC_ERROR_CREATE_FROM_COPIED_STRING(reason),
397                          is_release_fd);
398   }
399 
400   /* If release_fd is not NULL, we should be relinquishing control of the file
401      descriptor fd->fd (but we still own the grpc_fd structure). */
402   if (is_release_fd) {
403     *release_fd = fd->fd;
404   } else {
405     close(fd->fd);
406   }
407 
408   GRPC_CLOSURE_SCHED(on_done, GRPC_ERROR_REF(error));
409 
410   grpc_iomgr_unregister_object(&fd->iomgr_object);
411   fork_fd_list_remove_grpc_fd(fd);
412   fd->read_closure->DestroyEvent();
413   fd->write_closure->DestroyEvent();
414   fd->error_closure->DestroyEvent();
415 
416   gpr_mu_lock(&fd_freelist_mu);
417   fd->freelist_next = fd_freelist;
418   fd_freelist = fd;
419   gpr_mu_unlock(&fd_freelist_mu);
420 }
421 
fd_is_shutdown(grpc_fd * fd)422 static bool fd_is_shutdown(grpc_fd* fd) {
423   return fd->read_closure->IsShutdown();
424 }
425 
fd_notify_on_read(grpc_fd * fd,grpc_closure * closure)426 static void fd_notify_on_read(grpc_fd* fd, grpc_closure* closure) {
427   fd->read_closure->NotifyOn(closure);
428 }
429 
fd_notify_on_write(grpc_fd * fd,grpc_closure * closure)430 static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
431   fd->write_closure->NotifyOn(closure);
432 }
433 
fd_notify_on_error(grpc_fd * fd,grpc_closure * closure)434 static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
435   fd->error_closure->NotifyOn(closure);
436 }
437 
fd_become_readable(grpc_fd * fd)438 static void fd_become_readable(grpc_fd* fd) { fd->read_closure->SetReady(); }
439 
fd_become_writable(grpc_fd * fd)440 static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); }
441 
fd_has_errors(grpc_fd * fd)442 static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); }
443 
444 /*******************************************************************************
445  * Pollset Definitions
446  */
447 
/* Thread-local slots, initialized in pollset_global_init() and destroyed in
 * pollset_global_shutdown(). */
448 GPR_TLS_DECL(g_current_thread_pollset);
449 GPR_TLS_DECL(g_current_thread_worker);
450 
/* Holds the address of the worker that is currently the designated poller
 * (stored as a gpr_atm), or 0 when there is none; claimed with a
 * compare-and-swap from 0. */
451 /* The designated poller */
452 static gpr_atm g_active_poller;
453 
/* Array of g_num_neighborhoods neighborhoods, allocated in
 * pollset_global_init() and freed in pollset_global_shutdown(). */
454 static pollset_neighborhood* g_neighborhoods;
455 static size_t g_num_neighborhoods;
456 
457 /* Return true if first in list */
worker_insert(grpc_pollset * pollset,grpc_pollset_worker * worker)458 static bool worker_insert(grpc_pollset* pollset, grpc_pollset_worker* worker) {
459   if (pollset->root_worker == nullptr) {
460     pollset->root_worker = worker;
461     worker->next = worker->prev = worker;
462     return true;
463   } else {
464     worker->next = pollset->root_worker;
465     worker->prev = worker->next->prev;
466     worker->next->prev = worker;
467     worker->prev->next = worker;
468     return false;
469   }
470 }
471 
472 /* Return true if last in list */
473 typedef enum { EMPTIED, NEW_ROOT, REMOVED } worker_remove_result;
474 
worker_remove(grpc_pollset * pollset,grpc_pollset_worker * worker)475 static worker_remove_result worker_remove(grpc_pollset* pollset,
476                                           grpc_pollset_worker* worker) {
477   if (worker == pollset->root_worker) {
478     if (worker == worker->next) {
479       pollset->root_worker = nullptr;
480       return EMPTIED;
481     } else {
482       pollset->root_worker = worker->next;
483       worker->prev->next = worker->next;
484       worker->next->prev = worker->prev;
485       return NEW_ROOT;
486     }
487   } else {
488     worker->prev->next = worker->next;
489     worker->next->prev = worker->prev;
490     return REMOVED;
491   }
492 }
493 
choose_neighborhood(void)494 static size_t choose_neighborhood(void) {
495   return static_cast<size_t>(gpr_cpu_current_cpu()) % g_num_neighborhoods;
496 }
497 
pollset_global_init(void)498 static grpc_error* pollset_global_init(void) {
499   gpr_tls_init(&g_current_thread_pollset);
500   gpr_tls_init(&g_current_thread_worker);
501   gpr_atm_no_barrier_store(&g_active_poller, 0);
502   global_wakeup_fd.read_fd = -1;
503   grpc_error* err = grpc_wakeup_fd_init(&global_wakeup_fd);
504   if (err != GRPC_ERROR_NONE) return err;
505   struct epoll_event ev;
506   ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLET);
507   ev.data.ptr = &global_wakeup_fd;
508   if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd,
509                 &ev) != 0) {
510     return GRPC_OS_ERROR(errno, "epoll_ctl");
511   }
512   g_num_neighborhoods = GPR_CLAMP(gpr_cpu_num_cores(), 1, MAX_NEIGHBORHOODS);
513   g_neighborhoods = static_cast<pollset_neighborhood*>(
514       gpr_zalloc(sizeof(*g_neighborhoods) * g_num_neighborhoods));
515   for (size_t i = 0; i < g_num_neighborhoods; i++) {
516     gpr_mu_init(&g_neighborhoods[i].mu);
517   }
518   return GRPC_ERROR_NONE;
519 }
520 
pollset_global_shutdown(void)521 static void pollset_global_shutdown(void) {
522   gpr_tls_destroy(&g_current_thread_pollset);
523   gpr_tls_destroy(&g_current_thread_worker);
524   if (global_wakeup_fd.read_fd != -1) grpc_wakeup_fd_destroy(&global_wakeup_fd);
525   for (size_t i = 0; i < g_num_neighborhoods; i++) {
526     gpr_mu_destroy(&g_neighborhoods[i].mu);
527   }
528   gpr_free(g_neighborhoods);
529 }
530 
pollset_init(grpc_pollset * pollset,gpr_mu ** mu)531 static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
532   gpr_mu_init(&pollset->mu);
533   *mu = &pollset->mu;
534   pollset->neighborhood = &g_neighborhoods[choose_neighborhood()];
535   pollset->reassigning_neighborhood = false;
536   pollset->root_worker = nullptr;
537   pollset->kicked_without_poller = false;
538   pollset->seen_inactive = true;
539   pollset->shutting_down = false;
540   pollset->shutdown_closure = nullptr;
541   pollset->begin_refs = 0;
542   pollset->next = pollset->prev = nullptr;
543 }
544 
pollset_destroy(grpc_pollset * pollset)545 static void pollset_destroy(grpc_pollset* pollset) {
546   gpr_mu_lock(&pollset->mu);
547   if (!pollset->seen_inactive) {
548     pollset_neighborhood* neighborhood = pollset->neighborhood;
549     gpr_mu_unlock(&pollset->mu);
550   retry_lock_neighborhood:
551     gpr_mu_lock(&neighborhood->mu);
552     gpr_mu_lock(&pollset->mu);
553     if (!pollset->seen_inactive) {
554       if (pollset->neighborhood != neighborhood) {
555         gpr_mu_unlock(&neighborhood->mu);
556         neighborhood = pollset->neighborhood;
557         gpr_mu_unlock(&pollset->mu);
558         goto retry_lock_neighborhood;
559       }
560       pollset->prev->next = pollset->next;
561       pollset->next->prev = pollset->prev;
562       if (pollset == pollset->neighborhood->active_root) {
563         pollset->neighborhood->active_root =
564             pollset->next == pollset ? nullptr : pollset->next;
565       }
566     }
567     gpr_mu_unlock(&pollset->neighborhood->mu);
568   }
569   gpr_mu_unlock(&pollset->mu);
570   gpr_mu_destroy(&pollset->mu);
571 }
572 
pollset_kick_all(grpc_pollset * pollset)573 static grpc_error* pollset_kick_all(grpc_pollset* pollset) {
574   GPR_TIMER_SCOPE("pollset_kick_all", 0);
575   grpc_error* error = GRPC_ERROR_NONE;
576   if (pollset->root_worker != nullptr) {
577     grpc_pollset_worker* worker = pollset->root_worker;
578     do {
579       GRPC_STATS_INC_POLLSET_KICK();
580       switch (worker->state) {
581         case KICKED:
582           GRPC_STATS_INC_POLLSET_KICKED_AGAIN();
583           break;
584         case UNKICKED:
585           SET_KICK_STATE(worker, KICKED);
586           if (worker->initialized_cv) {
587             GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
588             gpr_cv_signal(&worker->cv);
589           }
590           break;
591         case DESIGNATED_POLLER:
592           GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD();
593           SET_KICK_STATE(worker, KICKED);
594           append_error(&error, grpc_wakeup_fd_wakeup(&global_wakeup_fd),
595                        "pollset_kick_all");
596           break;
597       }
598 
599       worker = worker->next;
600     } while (worker != pollset->root_worker);
601   }
602   // TODO: sreek.  Check if we need to set 'kicked_without_poller' to true here
603   // in the else case
604   return error;
605 }
606 
pollset_maybe_finish_shutdown(grpc_pollset * pollset)607 static void pollset_maybe_finish_shutdown(grpc_pollset* pollset) {
608   if (pollset->shutdown_closure != nullptr && pollset->root_worker == nullptr &&
609       pollset->begin_refs == 0) {
610     GPR_TIMER_MARK("pollset_finish_shutdown", 0);
611     GRPC_CLOSURE_SCHED(pollset->shutdown_closure, GRPC_ERROR_NONE);
612     pollset->shutdown_closure = nullptr;
613   }
614 }
615 
pollset_shutdown(grpc_pollset * pollset,grpc_closure * closure)616 static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
617   GPR_TIMER_SCOPE("pollset_shutdown", 0);
618   GPR_ASSERT(pollset->shutdown_closure == nullptr);
619   GPR_ASSERT(!pollset->shutting_down);
620   pollset->shutdown_closure = closure;
621   pollset->shutting_down = true;
622   GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(pollset));
623   pollset_maybe_finish_shutdown(pollset);
624 }
625 
poll_deadline_to_millis_timeout(grpc_millis millis)626 static int poll_deadline_to_millis_timeout(grpc_millis millis) {
627   if (millis == GRPC_MILLIS_INF_FUTURE) return -1;
628   grpc_millis delta = millis - grpc_core::ExecCtx::Get()->Now();
629   if (delta > INT_MAX) {
630     return INT_MAX;
631   } else if (delta < 0) {
632     return 0;
633   } else {
634     return static_cast<int>(delta);
635   }
636 }
637 
/* Process the epoll events found by do_epoll_wait() function.
   - g_epoll_set.cursor points to the index of the first event to be processed
   - This function then processes up to MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION
     events and updates the g_epoll_set.cursor

   NOTE ON SYNCHRONIZATION: Similar to do_epoll_wait(), this function is only
   called by g_active_poller thread. So there is no need for synchronization
   when accessing fields in g_epoll_set */
process_epoll_events(grpc_pollset * pollset)646 static grpc_error* process_epoll_events(grpc_pollset* pollset) {
647   GPR_TIMER_SCOPE("process_epoll_events", 0);
648 
649   static const char* err_desc = "process_events";
650   grpc_error* error = GRPC_ERROR_NONE;
651   long num_events = gpr_atm_acq_load(&g_epoll_set.num_events);
652   long cursor = gpr_atm_acq_load(&g_epoll_set.cursor);
653   for (int idx = 0;
654        (idx < MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION) && cursor != num_events;
655        idx++) {
656     long c = cursor++;
657     struct epoll_event* ev = &g_epoll_set.events[c];
658     void* data_ptr = ev->data.ptr;
659 
660     if (data_ptr == &global_wakeup_fd) {
661       append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
662                    err_desc);
663     } else {
664       grpc_fd* fd = reinterpret_cast<grpc_fd*>(
665           reinterpret_cast<intptr_t>(data_ptr) & ~static_cast<intptr_t>(1));
666       bool track_err =
667           reinterpret_cast<intptr_t>(data_ptr) & static_cast<intptr_t>(1);
668       bool cancel = (ev->events & EPOLLHUP) != 0;
669       bool error = (ev->events & EPOLLERR) != 0;
670       bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0;
671       bool write_ev = (ev->events & EPOLLOUT) != 0;
672       bool err_fallback = error && !track_err;
673 
674       if (error && !err_fallback) {
675         fd_has_errors(fd);
676       }
677 
678       if (read_ev || cancel || err_fallback) {
679         fd_become_readable(fd);
680       }
681 
682       if (write_ev || cancel || err_fallback) {
683         fd_become_writable(fd);
684       }
685     }
686   }
687   gpr_atm_rel_store(&g_epoll_set.cursor, cursor);
688   return error;
689 }
690 
/* Do epoll_wait and store the events in g_epoll_set.events field. This does not
   "process" any of the events yet; that is done in process_epoll_events().
   See process_epoll_events() function for more details.

   NOTE ON SYNCHRONIZATION: At any point of time, only the g_active_poller
   (i.e the designated poller thread) will be calling this function. So there is
   no need for any synchronization when accessing fields in g_epoll_set */
do_epoll_wait(grpc_pollset * ps,grpc_millis deadline)698 static grpc_error* do_epoll_wait(grpc_pollset* ps, grpc_millis deadline) {
699   GPR_TIMER_SCOPE("do_epoll_wait", 0);
700 
701   int r;
702   int timeout = poll_deadline_to_millis_timeout(deadline);
703   if (timeout != 0) {
704     GRPC_SCHEDULING_START_BLOCKING_REGION;
705   }
706   do {
707     GRPC_STATS_INC_SYSCALL_POLL();
708     r = epoll_wait(g_epoll_set.epfd, g_epoll_set.events, MAX_EPOLL_EVENTS,
709                    timeout);
710   } while (r < 0 && errno == EINTR);
711   if (timeout != 0) {
712     GRPC_SCHEDULING_END_BLOCKING_REGION;
713   }
714 
715   if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");
716 
717   GRPC_STATS_INC_POLL_EVENTS_RETURNED(r);
718 
719   if (grpc_polling_trace.enabled()) {
720     gpr_log(GPR_INFO, "ps: %p poll got %d events", ps, r);
721   }
722 
723   gpr_atm_rel_store(&g_epoll_set.num_events, r);
724   gpr_atm_rel_store(&g_epoll_set.cursor, 0);
725 
726   return GRPC_ERROR_NONE;
727 }
728 
/* Attaches `worker` to `pollset` and decides whether the calling thread should
 * go on to poll.
 *
 * Steps, as implemented below:
 *   1. Initialize the worker as UNKICKED and bump pollset->begin_refs.
 *   2. If the pollset has been seen inactive, put it back on a neighborhood's
 *      active list. This temporarily releases pollset->mu so that
 *      neighborhood->mu can be taken first; the worker may be kicked in that
 *      window. If the neighborhood had no active pollsets, the worker also
 *      tries to claim the designated-poller role.
 *   3. Insert the worker into the pollset's worker list and drop begin_refs.
 *   4. If the worker is still UNKICKED (and the pollset was not kicked without
 *      a poller), wait on the worker's condition variable until it is kicked,
 *      the deadline passes (treated as a kick) or the pollset starts shutting
 *      down.
 *
 * `worker_hdl`, when non-null, receives `worker`.
 *
 * Returns true only if the worker ends up as DESIGNATED_POLLER, the pollset is
 * not shutting down and kicked_without_poller was not set (that flag is
 * cleared here when it is found set).
 *
 * pollset->mu is unlocked and re-locked inside this function, so it has to be
 * held on entry. */
begin_worker(grpc_pollset * pollset,grpc_pollset_worker * worker,grpc_pollset_worker ** worker_hdl,grpc_millis deadline)729 static bool begin_worker(grpc_pollset* pollset, grpc_pollset_worker* worker,
730                          grpc_pollset_worker** worker_hdl,
731                          grpc_millis deadline) {
732   GPR_TIMER_SCOPE("begin_worker", 0);
733   if (worker_hdl != nullptr) *worker_hdl = worker;
734   worker->initialized_cv = false;
735   SET_KICK_STATE(worker, UNKICKED);
736   worker->schedule_on_end_work = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
737   pollset->begin_refs++;
738 
739   if (grpc_polling_trace.enabled()) {
740     gpr_log(GPR_INFO, "PS:%p BEGIN_STARTS:%p", pollset, worker);
741   }
742 
743   if (pollset->seen_inactive) {
744     // pollset has been observed to be inactive, we need to move back to the
745     // active list
746     bool is_reassigning = false;
747     if (!pollset->reassigning_neighborhood) {
748       is_reassigning = true;
749       pollset->reassigning_neighborhood = true;
750       pollset->neighborhood = &g_neighborhoods[choose_neighborhood()];
751     }
752     pollset_neighborhood* neighborhood = pollset->neighborhood;
753     gpr_mu_unlock(&pollset->mu);
754   // pollset unlocked: state may change (even worker->kick_state)
755   retry_lock_neighborhood:
756     gpr_mu_lock(&neighborhood->mu);
757     gpr_mu_lock(&pollset->mu);
758     if (grpc_polling_trace.enabled()) {
759       gpr_log(GPR_INFO, "PS:%p BEGIN_REORG:%p kick_state=%s is_reassigning=%d",
760               pollset, worker, kick_state_string(worker->state),
761               is_reassigning);
762     }
763     if (pollset->seen_inactive) {
764       if (neighborhood != pollset->neighborhood) {
765         gpr_mu_unlock(&neighborhood->mu);
766         neighborhood = pollset->neighborhood;
767         gpr_mu_unlock(&pollset->mu);
768         goto retry_lock_neighborhood;
769       }
770 
771       /* In the brief time we released the pollset locks above, the worker MAY
772          have been kicked. In this case, the worker should get out of this
773          pollset ASAP and hence this should neither add the pollset to
774          neighborhood nor mark the pollset as active.
775 
776          On a side note, the only way a worker's kick state could have changed
777          at this point is if it were "kicked specifically". Since the worker has
778          not added itself to the pollset yet (by calling worker_insert()), it is
779          not visible in the "kick any" path yet */
780       if (worker->state == UNKICKED) {
781         pollset->seen_inactive = false;
782         if (neighborhood->active_root == nullptr) {
783           neighborhood->active_root = pollset->next = pollset->prev = pollset;
784           /* Make this the designated poller if there isn't one already */
785           if (worker->state == UNKICKED &&
786               gpr_atm_no_barrier_cas(&g_active_poller, 0, (gpr_atm)worker)) {
787             SET_KICK_STATE(worker, DESIGNATED_POLLER);
788           }
789         } else {
790           pollset->next = neighborhood->active_root;
791           pollset->prev = pollset->next->prev;
792           pollset->next->prev = pollset->prev->next = pollset;
793         }
794       }
795     }
796     if (is_reassigning) {
797       GPR_ASSERT(pollset->reassigning_neighborhood);
798       pollset->reassigning_neighborhood = false;
799     }
800     gpr_mu_unlock(&neighborhood->mu);
801   }
802 
803   worker_insert(pollset, worker);
804   pollset->begin_refs--;
805   if (worker->state == UNKICKED && !pollset->kicked_without_poller) {
806     GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
807     worker->initialized_cv = true;
808     gpr_cv_init(&worker->cv);
809     while (worker->state == UNKICKED && !pollset->shutting_down) {
810       if (grpc_polling_trace.enabled()) {
811         gpr_log(GPR_INFO, "PS:%p BEGIN_WAIT:%p kick_state=%s shutdown=%d",
812                 pollset, worker, kick_state_string(worker->state),
813                 pollset->shutting_down);
814       }
815 
816       if (gpr_cv_wait(&worker->cv, &pollset->mu,
817                       grpc_millis_to_timespec(deadline, GPR_CLOCK_MONOTONIC)) &&
818           worker->state == UNKICKED) {
819         /* If gpr_cv_wait returns true (i.e a timeout), pretend that the worker
820            received a kick */
821         SET_KICK_STATE(worker, KICKED);
822       }
823     }
824     grpc_core::ExecCtx::Get()->InvalidateNow();
825   }
826 
827   if (grpc_polling_trace.enabled()) {
828     gpr_log(GPR_INFO,
829             "PS:%p BEGIN_DONE:%p kick_state=%s shutdown=%d "
830             "kicked_without_poller: %d",
831             pollset, worker, kick_state_string(worker->state),
832             pollset->shutting_down, pollset->kicked_without_poller);
833   }
834 
835   /* We release pollset lock in this function at a couple of places:
836    *   1. Briefly when assigning pollset to a neighborhood
837    *   2. When doing gpr_cv_wait()
838    * It is possible that 'kicked_without_poller' was set to true during (1) and
839    * 'shutting_down' is set to true during (1) or (2). If either of them is
840    * true, this worker cannot do polling */
841   /* TODO(sreek): Perhaps there is a better way to handle kicked_without_poller
842    * case; especially when the worker is the DESIGNATED_POLLER */
843 
844   if (pollset->kicked_without_poller) {
845     pollset->kicked_without_poller = false;
846     return false;
847   }
848 
849   return worker->state == DESIGNATED_POLLER && !pollset->shutting_down;
850 }
851 
check_neighborhood_for_available_poller(pollset_neighborhood * neighborhood)852 static bool check_neighborhood_for_available_poller(
853     pollset_neighborhood* neighborhood) {
854   GPR_TIMER_SCOPE("check_neighborhood_for_available_poller", 0);
855   bool found_worker = false;
856   do {
857     grpc_pollset* inspect = neighborhood->active_root;
858     if (inspect == nullptr) {
859       break;
860     }
861     gpr_mu_lock(&inspect->mu);
862     GPR_ASSERT(!inspect->seen_inactive);
863     grpc_pollset_worker* inspect_worker = inspect->root_worker;
864     if (inspect_worker != nullptr) {
865       do {
866         switch (inspect_worker->state) {
867           case UNKICKED:
868             if (gpr_atm_no_barrier_cas(&g_active_poller, 0,
869                                        (gpr_atm)inspect_worker)) {
870               if (grpc_polling_trace.enabled()) {
871                 gpr_log(GPR_INFO, " .. choose next poller to be %p",
872                         inspect_worker);
873               }
874               SET_KICK_STATE(inspect_worker, DESIGNATED_POLLER);
875               if (inspect_worker->initialized_cv) {
876                 GPR_TIMER_MARK("signal worker", 0);
877                 GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
878                 gpr_cv_signal(&inspect_worker->cv);
879               }
880             } else {
881               if (grpc_polling_trace.enabled()) {
882                 gpr_log(GPR_INFO, " .. beaten to choose next poller");
883               }
884             }
885             // even if we didn't win the cas, there's a worker, we can stop
886             found_worker = true;
887             break;
888           case KICKED:
889             break;
890           case DESIGNATED_POLLER:
891             found_worker = true;  // ok, so someone else found the worker, but
892                                   // we'll accept that
893             break;
894         }
895         inspect_worker = inspect_worker->next;
896       } while (!found_worker && inspect_worker != inspect->root_worker);
897     }
898     if (!found_worker) {
899       if (grpc_polling_trace.enabled()) {
900         gpr_log(GPR_INFO, " .. mark pollset %p inactive", inspect);
901       }
902       inspect->seen_inactive = true;
903       if (inspect == neighborhood->active_root) {
904         neighborhood->active_root =
905             inspect->next == inspect ? nullptr : inspect->next;
906       }
907       inspect->next->prev = inspect->prev;
908       inspect->prev->next = inspect->next;
909       inspect->next = inspect->prev = nullptr;
910     }
911     gpr_mu_unlock(&inspect->mu);
912   } while (!found_worker);
913   return found_worker;
914 }
915 
end_worker(grpc_pollset * pollset,grpc_pollset_worker * worker,grpc_pollset_worker ** worker_hdl)916 static void end_worker(grpc_pollset* pollset, grpc_pollset_worker* worker,
917                        grpc_pollset_worker** worker_hdl) {
918   GPR_TIMER_SCOPE("end_worker", 0);
919   if (grpc_polling_trace.enabled()) {
920     gpr_log(GPR_INFO, "PS:%p END_WORKER:%p", pollset, worker);
921   }
922   if (worker_hdl != nullptr) *worker_hdl = nullptr;
923   /* Make sure we appear kicked */
924   SET_KICK_STATE(worker, KICKED);
925   grpc_closure_list_move(&worker->schedule_on_end_work,
926                          grpc_core::ExecCtx::Get()->closure_list());
927   if (gpr_atm_no_barrier_load(&g_active_poller) == (gpr_atm)worker) {
928     if (worker->next != worker && worker->next->state == UNKICKED) {
929       if (grpc_polling_trace.enabled()) {
930         gpr_log(GPR_INFO, " .. choose next poller to be peer %p", worker);
931       }
932       GPR_ASSERT(worker->next->initialized_cv);
933       gpr_atm_no_barrier_store(&g_active_poller, (gpr_atm)worker->next);
934       SET_KICK_STATE(worker->next, DESIGNATED_POLLER);
935       GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
936       gpr_cv_signal(&worker->next->cv);
937       if (grpc_core::ExecCtx::Get()->HasWork()) {
938         gpr_mu_unlock(&pollset->mu);
939         grpc_core::ExecCtx::Get()->Flush();
940         gpr_mu_lock(&pollset->mu);
941       }
942     } else {
943       gpr_atm_no_barrier_store(&g_active_poller, 0);
944       size_t poller_neighborhood_idx =
945           static_cast<size_t>(pollset->neighborhood - g_neighborhoods);
946       gpr_mu_unlock(&pollset->mu);
947       bool found_worker = false;
948       bool scan_state[MAX_NEIGHBORHOODS];
949       for (size_t i = 0; !found_worker && i < g_num_neighborhoods; i++) {
950         pollset_neighborhood* neighborhood =
951             &g_neighborhoods[(poller_neighborhood_idx + i) %
952                              g_num_neighborhoods];
953         if (gpr_mu_trylock(&neighborhood->mu)) {
954           found_worker = check_neighborhood_for_available_poller(neighborhood);
955           gpr_mu_unlock(&neighborhood->mu);
956           scan_state[i] = true;
957         } else {
958           scan_state[i] = false;
959         }
960       }
961       for (size_t i = 0; !found_worker && i < g_num_neighborhoods; i++) {
962         if (scan_state[i]) continue;
963         pollset_neighborhood* neighborhood =
964             &g_neighborhoods[(poller_neighborhood_idx + i) %
965                              g_num_neighborhoods];
966         gpr_mu_lock(&neighborhood->mu);
967         found_worker = check_neighborhood_for_available_poller(neighborhood);
968         gpr_mu_unlock(&neighborhood->mu);
969       }
970       grpc_core::ExecCtx::Get()->Flush();
971       gpr_mu_lock(&pollset->mu);
972     }
973   } else if (grpc_core::ExecCtx::Get()->HasWork()) {
974     gpr_mu_unlock(&pollset->mu);
975     grpc_core::ExecCtx::Get()->Flush();
976     gpr_mu_lock(&pollset->mu);
977   }
978   if (worker->initialized_cv) {
979     gpr_cv_destroy(&worker->cv);
980   }
981   if (grpc_polling_trace.enabled()) {
982     gpr_log(GPR_INFO, " .. remove worker");
983   }
984   if (EMPTIED == worker_remove(pollset, worker)) {
985     pollset_maybe_finish_shutdown(pollset);
986   }
987   GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
988 }
989 
990 /* pollset->po.mu lock must be held by the caller before calling this.
991    The function pollset_work() may temporarily release the lock (pollset->po.mu)
992    during the course of its execution but it will always re-acquire the lock and
993    ensure that it is held by the time the function returns */
pollset_work(grpc_pollset * ps,grpc_pollset_worker ** worker_hdl,grpc_millis deadline)994 static grpc_error* pollset_work(grpc_pollset* ps,
995                                 grpc_pollset_worker** worker_hdl,
996                                 grpc_millis deadline) {
997   GPR_TIMER_SCOPE("pollset_work", 0);
998   grpc_pollset_worker worker;
999   grpc_error* error = GRPC_ERROR_NONE;
1000   static const char* err_desc = "pollset_work";
1001   if (ps->kicked_without_poller) {
1002     ps->kicked_without_poller = false;
1003     return GRPC_ERROR_NONE;
1004   }
1005 
1006   if (begin_worker(ps, &worker, worker_hdl, deadline)) {
1007     gpr_tls_set(&g_current_thread_pollset, (intptr_t)ps);
1008     gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
1009     GPR_ASSERT(!ps->shutting_down);
1010     GPR_ASSERT(!ps->seen_inactive);
1011 
1012     gpr_mu_unlock(&ps->mu); /* unlock */
1013     /* This is the designated polling thread at this point and should ideally do
1014        polling. However, if there are unprocessed events left from a previous
1015        call to do_epoll_wait(), skip calling epoll_wait() in this iteration and
1016        process the pending epoll events.
1017 
1018        The reason for decoupling do_epoll_wait and process_epoll_events is to
1019        better distrubute the work (i.e handling epoll events) across multiple
1020        threads
1021 
1022        process_epoll_events() returns very quickly: It just queues the work on
1023        exec_ctx but does not execute it (the actual exectution or more
1024        accurately grpc_core::ExecCtx::Get()->Flush() happens in end_worker()
1025        AFTER selecting a designated poller). So we are not waiting long periods
1026        without a designated poller */
1027     if (gpr_atm_acq_load(&g_epoll_set.cursor) ==
1028         gpr_atm_acq_load(&g_epoll_set.num_events)) {
1029       append_error(&error, do_epoll_wait(ps, deadline), err_desc);
1030     }
1031     append_error(&error, process_epoll_events(ps), err_desc);
1032 
1033     gpr_mu_lock(&ps->mu); /* lock */
1034 
1035     gpr_tls_set(&g_current_thread_worker, 0);
1036   } else {
1037     gpr_tls_set(&g_current_thread_pollset, (intptr_t)ps);
1038   }
1039   end_worker(ps, &worker, worker_hdl);
1040 
1041   gpr_tls_set(&g_current_thread_pollset, 0);
1042   return error;
1043 }
1044 
pollset_kick(grpc_pollset * pollset,grpc_pollset_worker * specific_worker)1045 static grpc_error* pollset_kick(grpc_pollset* pollset,
1046                                 grpc_pollset_worker* specific_worker) {
1047   GPR_TIMER_SCOPE("pollset_kick", 0);
1048   GRPC_STATS_INC_POLLSET_KICK();
1049   grpc_error* ret_err = GRPC_ERROR_NONE;
1050   if (grpc_polling_trace.enabled()) {
1051     gpr_strvec log;
1052     gpr_strvec_init(&log);
1053     char* tmp;
1054     gpr_asprintf(&tmp, "PS:%p KICK:%p curps=%p curworker=%p root=%p", pollset,
1055                  specific_worker, (void*)gpr_tls_get(&g_current_thread_pollset),
1056                  (void*)gpr_tls_get(&g_current_thread_worker),
1057                  pollset->root_worker);
1058     gpr_strvec_add(&log, tmp);
1059     if (pollset->root_worker != nullptr) {
1060       gpr_asprintf(&tmp, " {kick_state=%s next=%p {kick_state=%s}}",
1061                    kick_state_string(pollset->root_worker->state),
1062                    pollset->root_worker->next,
1063                    kick_state_string(pollset->root_worker->next->state));
1064       gpr_strvec_add(&log, tmp);
1065     }
1066     if (specific_worker != nullptr) {
1067       gpr_asprintf(&tmp, " worker_kick_state=%s",
1068                    kick_state_string(specific_worker->state));
1069       gpr_strvec_add(&log, tmp);
1070     }
1071     tmp = gpr_strvec_flatten(&log, nullptr);
1072     gpr_strvec_destroy(&log);
1073     gpr_log(GPR_DEBUG, "%s", tmp);
1074     gpr_free(tmp);
1075   }
1076 
1077   if (specific_worker == nullptr) {
1078     if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)pollset) {
1079       grpc_pollset_worker* root_worker = pollset->root_worker;
1080       if (root_worker == nullptr) {
1081         GRPC_STATS_INC_POLLSET_KICKED_WITHOUT_POLLER();
1082         pollset->kicked_without_poller = true;
1083         if (grpc_polling_trace.enabled()) {
1084           gpr_log(GPR_INFO, " .. kicked_without_poller");
1085         }
1086         goto done;
1087       }
1088       grpc_pollset_worker* next_worker = root_worker->next;
1089       if (root_worker->state == KICKED) {
1090         GRPC_STATS_INC_POLLSET_KICKED_AGAIN();
1091         if (grpc_polling_trace.enabled()) {
1092           gpr_log(GPR_INFO, " .. already kicked %p", root_worker);
1093         }
1094         SET_KICK_STATE(root_worker, KICKED);
1095         goto done;
1096       } else if (next_worker->state == KICKED) {
1097         GRPC_STATS_INC_POLLSET_KICKED_AGAIN();
1098         if (grpc_polling_trace.enabled()) {
1099           gpr_log(GPR_INFO, " .. already kicked %p", next_worker);
1100         }
1101         SET_KICK_STATE(next_worker, KICKED);
1102         goto done;
1103       } else if (root_worker ==
1104                      next_worker &&  // only try and wake up a poller if
1105                                      // there is no next worker
1106                  root_worker == (grpc_pollset_worker*)gpr_atm_no_barrier_load(
1107                                     &g_active_poller)) {
1108         GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD();
1109         if (grpc_polling_trace.enabled()) {
1110           gpr_log(GPR_INFO, " .. kicked %p", root_worker);
1111         }
1112         SET_KICK_STATE(root_worker, KICKED);
1113         ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
1114         goto done;
1115       } else if (next_worker->state == UNKICKED) {
1116         GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
1117         if (grpc_polling_trace.enabled()) {
1118           gpr_log(GPR_INFO, " .. kicked %p", next_worker);
1119         }
1120         GPR_ASSERT(next_worker->initialized_cv);
1121         SET_KICK_STATE(next_worker, KICKED);
1122         gpr_cv_signal(&next_worker->cv);
1123         goto done;
1124       } else if (next_worker->state == DESIGNATED_POLLER) {
1125         if (root_worker->state != DESIGNATED_POLLER) {
1126           if (grpc_polling_trace.enabled()) {
1127             gpr_log(
1128                 GPR_INFO,
1129                 " .. kicked root non-poller %p (initialized_cv=%d) (poller=%p)",
1130                 root_worker, root_worker->initialized_cv, next_worker);
1131           }
1132           SET_KICK_STATE(root_worker, KICKED);
1133           if (root_worker->initialized_cv) {
1134             GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
1135             gpr_cv_signal(&root_worker->cv);
1136           }
1137           goto done;
1138         } else {
1139           GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD();
1140           if (grpc_polling_trace.enabled()) {
1141             gpr_log(GPR_INFO, " .. non-root poller %p (root=%p)", next_worker,
1142                     root_worker);
1143           }
1144           SET_KICK_STATE(next_worker, KICKED);
1145           ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
1146           goto done;
1147         }
1148       } else {
1149         GRPC_STATS_INC_POLLSET_KICKED_AGAIN();
1150         GPR_ASSERT(next_worker->state == KICKED);
1151         SET_KICK_STATE(next_worker, KICKED);
1152         goto done;
1153       }
1154     } else {
1155       GRPC_STATS_INC_POLLSET_KICK_OWN_THREAD();
1156       if (grpc_polling_trace.enabled()) {
1157         gpr_log(GPR_INFO, " .. kicked while waking up");
1158       }
1159       goto done;
1160     }
1161 
1162     GPR_UNREACHABLE_CODE(goto done);
1163   }
1164 
1165   if (specific_worker->state == KICKED) {
1166     if (grpc_polling_trace.enabled()) {
1167       gpr_log(GPR_INFO, " .. specific worker already kicked");
1168     }
1169     goto done;
1170   } else if (gpr_tls_get(&g_current_thread_worker) ==
1171              (intptr_t)specific_worker) {
1172     GRPC_STATS_INC_POLLSET_KICK_OWN_THREAD();
1173     if (grpc_polling_trace.enabled()) {
1174       gpr_log(GPR_INFO, " .. mark %p kicked", specific_worker);
1175     }
1176     SET_KICK_STATE(specific_worker, KICKED);
1177     goto done;
1178   } else if (specific_worker ==
1179              (grpc_pollset_worker*)gpr_atm_no_barrier_load(&g_active_poller)) {
1180     GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD();
1181     if (grpc_polling_trace.enabled()) {
1182       gpr_log(GPR_INFO, " .. kick active poller");
1183     }
1184     SET_KICK_STATE(specific_worker, KICKED);
1185     ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
1186     goto done;
1187   } else if (specific_worker->initialized_cv) {
1188     GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
1189     if (grpc_polling_trace.enabled()) {
1190       gpr_log(GPR_INFO, " .. kick waiting worker");
1191     }
1192     SET_KICK_STATE(specific_worker, KICKED);
1193     gpr_cv_signal(&specific_worker->cv);
1194     goto done;
1195   } else {
1196     GRPC_STATS_INC_POLLSET_KICKED_AGAIN();
1197     if (grpc_polling_trace.enabled()) {
1198       gpr_log(GPR_INFO, " .. kick non-waiting worker");
1199     }
1200     SET_KICK_STATE(specific_worker, KICKED);
1201     goto done;
1202   }
1203 done:
1204   return ret_err;
1205 }
1206 
pollset_add_fd(grpc_pollset * pollset,grpc_fd * fd)1207 static void pollset_add_fd(grpc_pollset* pollset, grpc_fd* fd) {}
1208 
1209 /*******************************************************************************
1210  * Pollset-set Definitions
1211  */
1212 
pollset_set_create(void)1213 static grpc_pollset_set* pollset_set_create(void) {
1214   return (grpc_pollset_set*)(static_cast<intptr_t>(0xdeafbeef));
1215 }
1216 
pollset_set_destroy(grpc_pollset_set * pss)1217 static void pollset_set_destroy(grpc_pollset_set* pss) {}
1218 
pollset_set_add_fd(grpc_pollset_set * pss,grpc_fd * fd)1219 static void pollset_set_add_fd(grpc_pollset_set* pss, grpc_fd* fd) {}
1220 
pollset_set_del_fd(grpc_pollset_set * pss,grpc_fd * fd)1221 static void pollset_set_del_fd(grpc_pollset_set* pss, grpc_fd* fd) {}
1222 
pollset_set_add_pollset(grpc_pollset_set * pss,grpc_pollset * ps)1223 static void pollset_set_add_pollset(grpc_pollset_set* pss, grpc_pollset* ps) {}
1224 
pollset_set_del_pollset(grpc_pollset_set * pss,grpc_pollset * ps)1225 static void pollset_set_del_pollset(grpc_pollset_set* pss, grpc_pollset* ps) {}
1226 
pollset_set_add_pollset_set(grpc_pollset_set * bag,grpc_pollset_set * item)1227 static void pollset_set_add_pollset_set(grpc_pollset_set* bag,
1228                                         grpc_pollset_set* item) {}
1229 
pollset_set_del_pollset_set(grpc_pollset_set * bag,grpc_pollset_set * item)1230 static void pollset_set_del_pollset_set(grpc_pollset_set* bag,
1231                                         grpc_pollset_set* item) {}
1232 
1233 /*******************************************************************************
1234  * Event engine binding
1235  */
1236 
shutdown_engine(void)1237 static void shutdown_engine(void) {
1238   fd_global_shutdown();
1239   pollset_global_shutdown();
1240   epoll_set_shutdown();
1241   if (grpc_core::Fork::Enabled()) {
1242     gpr_mu_destroy(&fork_fd_list_mu);
1243     grpc_core::Fork::SetResetChildPollingEngineFunc(nullptr);
1244   }
1245 }
1246 
1247 static const grpc_event_engine_vtable vtable = {
1248     sizeof(grpc_pollset),
1249     true,
1250 
1251     fd_create,
1252     fd_wrapped_fd,
1253     fd_orphan,
1254     fd_shutdown,
1255     fd_notify_on_read,
1256     fd_notify_on_write,
1257     fd_notify_on_error,
1258     fd_become_readable,
1259     fd_become_writable,
1260     fd_has_errors,
1261     fd_is_shutdown,
1262 
1263     pollset_init,
1264     pollset_shutdown,
1265     pollset_destroy,
1266     pollset_work,
1267     pollset_kick,
1268     pollset_add_fd,
1269 
1270     pollset_set_create,
1271     pollset_set_destroy,
1272     pollset_set_add_pollset,
1273     pollset_set_del_pollset,
1274     pollset_set_add_pollset_set,
1275     pollset_set_del_pollset_set,
1276     pollset_set_add_fd,
1277     pollset_set_del_fd,
1278 
1279     shutdown_engine,
1280 };
1281 
1282 /* Called by the child process's post-fork handler to close open fds, including
1283  * the global epoll fd. This allows gRPC to shutdown in the child process
1284  * without interfering with connections or RPCs ongoing in the parent. */
reset_event_manager_on_fork()1285 static void reset_event_manager_on_fork() {
1286   gpr_mu_lock(&fork_fd_list_mu);
1287   while (fork_fd_list_head != nullptr) {
1288     close(fork_fd_list_head->fd);
1289     fork_fd_list_head->fd = -1;
1290     fork_fd_list_head = fork_fd_list_head->fork_fd_list->next;
1291   }
1292   gpr_mu_unlock(&fork_fd_list_mu);
1293   shutdown_engine();
1294   grpc_init_epoll1_linux(true);
1295 }
1296 
1297 /* It is possible that GLIBC has epoll but the underlying kernel doesn't.
1298  * Create epoll_fd (epoll_set_init() takes care of that) to make sure epoll
1299  * support is available */
grpc_init_epoll1_linux(bool explicit_request)1300 const grpc_event_engine_vtable* grpc_init_epoll1_linux(bool explicit_request) {
1301   if (!grpc_has_wakeup_fd()) {
1302     gpr_log(GPR_ERROR, "Skipping epoll1 because of no wakeup fd.");
1303     return nullptr;
1304   }
1305 
1306   if (!epoll_set_init()) {
1307     return nullptr;
1308   }
1309 
1310   fd_global_init();
1311 
1312   if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
1313     fd_global_shutdown();
1314     epoll_set_shutdown();
1315     return nullptr;
1316   }
1317 
1318   if (grpc_core::Fork::Enabled()) {
1319     gpr_mu_init(&fork_fd_list_mu);
1320     grpc_core::Fork::SetResetChildPollingEngineFunc(
1321         reset_event_manager_on_fork);
1322   }
1323   return &vtable;
1324 }
1325 
1326 #else /* defined(GRPC_LINUX_EPOLL) */
1327 #if defined(GRPC_POSIX_SOCKET_EV_EPOLL1)
1328 #include "src/core/lib/iomgr/ev_epoll1_linux.h"
1329 /* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return
1330  * NULL */
grpc_init_epoll1_linux(bool explicit_request)1331 const grpc_event_engine_vtable* grpc_init_epoll1_linux(bool explicit_request) {
1332   return nullptr;
1333 }
1334 #endif /* defined(GRPC_POSIX_SOCKET_EV_EPOLL1) */
1335 #endif /* !defined(GRPC_LINUX_EPOLL) */
1336