• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2017 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/iomgr/port.h"
22 #include "src/core/util/crash.h"
23 
24 // This polling engine is only relevant on linux kernels supporting epoll
25 // epoll_create() or epoll_create1()
26 #ifdef GRPC_LINUX_EPOLL
27 #include <assert.h>
28 #include <errno.h>
29 #include <fcntl.h>
30 #include <grpc/support/alloc.h>
31 #include <grpc/support/cpu.h>
32 #include <limits.h>
33 #include <poll.h>
34 #include <pthread.h>
35 #include <string.h>
36 #include <sys/epoll.h>
37 #include <sys/socket.h>
38 #include <unistd.h>
39 
40 #include <string>
41 #include <vector>
42 
43 #include "absl/log/check.h"
44 #include "absl/log/log.h"
45 #include "absl/strings/str_cat.h"
46 #include "absl/strings/str_format.h"
47 #include "absl/strings/str_join.h"
48 #include "src/core/lib/iomgr/block_annotate.h"
49 #include "src/core/lib/iomgr/ev_epoll1_linux.h"
50 #include "src/core/lib/iomgr/ev_posix.h"
51 #include "src/core/lib/iomgr/iomgr_internal.h"
52 #include "src/core/lib/iomgr/lockfree_event.h"
53 #include "src/core/lib/iomgr/wakeup_fd_posix.h"
54 #include "src/core/telemetry/stats.h"
55 #include "src/core/telemetry/stats_data.h"
56 #include "src/core/util/manual_constructor.h"
57 #include "src/core/util/strerror.h"
58 #include "src/core/util/string.h"
59 #include "src/core/util/useful.h"
60 
61 static grpc_wakeup_fd global_wakeup_fd;
62 static bool g_is_shutdown = true;
63 
64 //******************************************************************************
65 // Singleton epoll set related fields
66 //
67 
68 #define MAX_EPOLL_EVENTS 100
69 #define MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION 1
70 
71 // NOTE ON SYNCHRONIZATION:
72 // - Fields in this struct are only modified by the designated poller. Hence
73 //   there is no need for any locks to protect the struct.
74 // - num_events and cursor fields have to be of atomic type to provide memory
75 //   visibility guarantees only. i.e In case of multiple pollers, the designated
76 //   polling thread keeps changing; the thread that wrote these values may be
77 //   different from the thread reading the values
78 //
typedef struct epoll_set {
  // File descriptor of the singleton epoll instance (-1 after shutdown).
  int epfd;

  // The epoll_events after the last call to epoll_wait()
  struct epoll_event events[MAX_EPOLL_EVENTS];

  // The number of epoll_events after the last call to epoll_wait()
  gpr_atm num_events;

  // Index of the first event in epoll_events that has to be processed. This
  // field is only valid if num_events > 0
  gpr_atm cursor;
} epoll_set;
92 
93 // The global singleton epoll set
94 static epoll_set g_epoll_set;
95 
epoll_create_and_cloexec()96 static int epoll_create_and_cloexec() {
97 #ifdef GRPC_LINUX_EPOLL_CREATE1
98   int fd = epoll_create1(EPOLL_CLOEXEC);
99   if (fd < 0) {
100     LOG(ERROR) << "epoll_create1 unavailable";
101   }
102 #else
103   int fd = epoll_create(MAX_EPOLL_EVENTS);
104   if (fd < 0) {
105     LOG(ERROR) << "epoll_create unavailable";
106   } else if (fcntl(fd, F_SETFD, FD_CLOEXEC) != 0) {
107     LOG(ERROR) << "fcntl following epoll_create failed";
108     return -1;
109   }
110 #endif
111   return fd;
112 }
113 
114 // Must be called *only* once
epoll_set_init()115 static bool epoll_set_init() {
116   g_epoll_set.epfd = epoll_create_and_cloexec();
117   if (g_epoll_set.epfd < 0) {
118     return false;
119   }
120 
121   GRPC_TRACE_LOG(polling, INFO) << "grpc epoll fd: " << g_epoll_set.epfd;
122   gpr_atm_no_barrier_store(&g_epoll_set.num_events, 0);
123   gpr_atm_no_barrier_store(&g_epoll_set.cursor, 0);
124   return true;
125 }
126 
127 // epoll_set_init() MUST be called before calling this.
epoll_set_shutdown()128 static void epoll_set_shutdown() {
129   if (g_epoll_set.epfd >= 0) {
130     close(g_epoll_set.epfd);
131     g_epoll_set.epfd = -1;
132   }
133 }
134 
135 //******************************************************************************
136 // Fd Declarations
137 //
138 
139 // Only used when GRPC_ENABLE_FORK_SUPPORT=1
// Node in the global doubly-linked list tracking every live grpc_fd so they
// can be visited after fork(). Only used when GRPC_ENABLE_FORK_SUPPORT=1.
struct grpc_fork_fd_list {
  grpc_fd* fd;    // NOTE(review): not written by the visible list code — confirm use
  grpc_fd* next;  // next fd in the global list (nullptr at the tail)
  grpc_fd* prev;  // previous fd in the global list (nullptr at the head)
};
145 
struct grpc_fd {
  int fd;  // the wrapped OS file descriptor

  // Per-direction readiness events (closure/ready handoff via LockfreeEvent).
  grpc_core::ManualConstructor<grpc_core::LockfreeEvent> read_closure;
  grpc_core::ManualConstructor<grpc_core::LockfreeEvent> write_closure;
  grpc_core::ManualConstructor<grpc_core::LockfreeEvent> error_closure;

  // Next entry when this struct sits on the fd freelist (see fd_create()).
  struct grpc_fd* freelist_next;

  grpc_iomgr_object iomgr_object;  // iomgr registration/bookkeeping handle

  // Only used when GRPC_ENABLE_FORK_SUPPORT=1
  grpc_fork_fd_list* fork_fd_list;

  // If true, the OS fd is owned externally: never close()/shutdown() it.
  bool is_pre_allocated;
};
162 
163 static void fd_global_init(void);
164 static void fd_global_shutdown(void);
165 
166 //******************************************************************************
167 // Pollset Declarations
168 //
169 
// Worker kick states: UNKICKED = still waiting; KICKED = told to stop
// waiting; DESIGNATED_POLLER = chosen to call epoll_wait().
typedef enum { UNKICKED, KICKED, DESIGNATED_POLLER } kick_state;
171 
kick_state_string(kick_state st)172 static const char* kick_state_string(kick_state st) {
173   switch (st) {
174     case UNKICKED:
175       return "UNKICKED";
176     case KICKED:
177       return "KICKED";
178     case DESIGNATED_POLLER:
179       return "DESIGNATED_POLLER";
180   }
181   GPR_UNREACHABLE_CODE(return "UNKNOWN");
182 }
183 
// One worker (thread) attached to a pollset; node in the circular
// doubly-linked list rooted at grpc_pollset::root_worker.
struct grpc_pollset_worker {
  kick_state state;        // mutated only via SET_KICK_STATE
  int kick_state_mutator;  // which line of code last changed kick state
  bool initialized_cv;     // true once 'cv' has been gpr_cv_init()ed
  grpc_pollset_worker* next;
  grpc_pollset_worker* prev;
  gpr_cv cv;  // waited on while UNKICKED (see begin_worker())
  // Closures accumulated for this worker; presumably drained when the worker
  // ends — the draining code is not visible in this chunk.
  grpc_closure_list schedule_on_end_work;
};
193 
// Set a worker's kick state and record the source line of the mutation
// (kick_state_mutator) to aid debugging of kick races.
#define SET_KICK_STATE(worker, kick_state)   \
  do {                                       \
    (worker)->state = (kick_state);          \
    (worker)->kick_state_mutator = __LINE__; \
  } while (false)
199 
200 #define MAX_NEIGHBORHOODS 1024u
201 
// A shard of pollsets. Pollsets are distributed across neighborhoods (by
// CPU, see choose_neighborhood()) to reduce contention on the shared mutex.
typedef struct pollset_neighborhood {
  union {
    // Pad to a full cacheline so adjacent neighborhoods don't false-share.
    char pad[GPR_CACHELINE_SIZE];
    struct {
      gpr_mu mu;  // guards active_root and the pollsets' next/prev links
      grpc_pollset* active_root;  // circular list of active pollsets, or nullptr
    };
  };
} pollset_neighborhood;
211 
struct grpc_pollset {
  gpr_mu mu;  // guards the fields below
  pollset_neighborhood* neighborhood;  // shard this pollset belongs to
  bool reassigning_neighborhood;  // true while a worker re-picks the shard
  grpc_pollset_worker* root_worker;  // circular worker list (nullptr if empty)
  bool kicked_without_poller;  // a kick arrived while no worker was attached

  // Set to true if the pollset is observed to have no workers available to
  // poll
  bool seen_inactive;
  bool shutting_down;              // Is the pollset shutting down ?
  grpc_closure* shutdown_closure;  // Called after shutdown is complete

  // Number of workers who are *about-to* attach themselves to the pollset
  // worker list
  int begin_refs;

  // Links in the neighborhood's circular active-pollset list.
  grpc_pollset* next;
  grpc_pollset* prev;
};
232 
233 //******************************************************************************
234 // Pollset-set Declarations
235 //
236 
// Pollset-sets carry no state in this engine: the single global epoll set
// already covers every fd, so grouping pollsets is a no-op.
struct grpc_pollset_set {
  char unused;
};
240 
241 //******************************************************************************
242 // Common helpers
243 //
244 
append_error(grpc_error_handle * composite,grpc_error_handle error,const char * desc)245 static bool append_error(grpc_error_handle* composite, grpc_error_handle error,
246                          const char* desc) {
247   if (error.ok()) return true;
248   if (composite->ok()) {
249     *composite = GRPC_ERROR_CREATE(desc);
250   }
251   *composite = grpc_error_add_child(*composite, error);
252   return false;
253 }
254 
255 //******************************************************************************
256 // Fd Definitions
257 //
258 
259 // We need to keep a freelist not because of any concerns of malloc performance
260 // but instead so that implementations with multiple threads in (for example)
261 // epoll_wait deal with the race between pollset removal and incoming poll
262 // notifications.
263 //
264 // The problem is that the poller ultimately holds a reference to this
265 // object, so it is very difficult to know when is safe to free it, at least
266 // without some expensive synchronization.
267 //
268 // If we keep the object freelisted, in the worst case losing this race just
269 // becomes a spurious read notification on a reused fd.
270 //
271 
272 // The alarm system needs to be able to wakeup 'some poller' sometimes
273 // (specifically when a new alarm needs to be triggered earlier than the next
274 // alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
275 // case occurs.
276 
277 static grpc_fd* fd_freelist = nullptr;
278 static gpr_mu fd_freelist_mu;
279 
280 // Only used when GRPC_ENABLE_FORK_SUPPORT=1
281 static grpc_fd* fork_fd_list_head = nullptr;
282 static gpr_mu fork_fd_list_mu;
283 
// Initialize the fd freelist mutex. Called once at engine startup.
static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
285 
// Free every grpc_fd on the freelist and destroy its mutex. Called once at
// engine shutdown, after all fds should have been orphaned.
static void fd_global_shutdown(void) {
  // TODO(guantaol): We don't have a reasonable explanation about this
  // lock()/unlock() pattern. It can be a valid barrier if there is at most one
  // pending lock() at this point. Otherwise, there is still a possibility of
  // use-after-free race. Need to reason about the code and/or clean it up.
  gpr_mu_lock(&fd_freelist_mu);
  gpr_mu_unlock(&fd_freelist_mu);
  while (fd_freelist != nullptr) {
    grpc_fd* fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
    gpr_free(fd);
  }
  gpr_mu_destroy(&fd_freelist_mu);
}
300 
fork_fd_list_add_grpc_fd(grpc_fd * fd)301 static void fork_fd_list_add_grpc_fd(grpc_fd* fd) {
302   if (grpc_core::Fork::Enabled()) {
303     gpr_mu_lock(&fork_fd_list_mu);
304     fd->fork_fd_list =
305         static_cast<grpc_fork_fd_list*>(gpr_malloc(sizeof(grpc_fork_fd_list)));
306     fd->fork_fd_list->next = fork_fd_list_head;
307     fd->fork_fd_list->prev = nullptr;
308     if (fork_fd_list_head != nullptr) {
309       fork_fd_list_head->fork_fd_list->prev = fd;
310     }
311     fork_fd_list_head = fd;
312     gpr_mu_unlock(&fork_fd_list_mu);
313   }
314 }
315 
fork_fd_list_remove_grpc_fd(grpc_fd * fd)316 static void fork_fd_list_remove_grpc_fd(grpc_fd* fd) {
317   if (grpc_core::Fork::Enabled()) {
318     gpr_mu_lock(&fork_fd_list_mu);
319     if (fork_fd_list_head == fd) {
320       fork_fd_list_head = fd->fork_fd_list->next;
321     }
322     if (fd->fork_fd_list->prev != nullptr) {
323       fd->fork_fd_list->prev->fork_fd_list->next = fd->fork_fd_list->next;
324     }
325     if (fd->fork_fd_list->next != nullptr) {
326       fd->fork_fd_list->next->fork_fd_list->prev = fd->fork_fd_list->prev;
327     }
328     gpr_free(fd->fork_fd_list);
329     gpr_mu_unlock(&fork_fd_list_mu);
330   }
331 }
332 
// Wrap OS descriptor 'fd' in a grpc_fd (recycled from the freelist when
// possible) and register it with the global epoll set for edge-triggered
// read/write notification. 'name' is used for iomgr bookkeeping/tracing;
// 'track_err' requests separate error-event delivery.
static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
  grpc_fd* new_fd = nullptr;

  // Reuse a previously-orphaned grpc_fd if one is available (see the
  // freelist rationale above).
  gpr_mu_lock(&fd_freelist_mu);
  if (fd_freelist != nullptr) {
    new_fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
  }
  gpr_mu_unlock(&fd_freelist_mu);

  if (new_fd == nullptr) {
    new_fd = static_cast<grpc_fd*>(gpr_malloc(sizeof(grpc_fd)));
    new_fd->read_closure.Init();
    new_fd->write_closure.Init();
    new_fd->error_closure.Init();
  }
  new_fd->fd = fd;
  new_fd->read_closure->InitEvent();
  new_fd->write_closure->InitEvent();
  new_fd->error_closure->InitEvent();

  new_fd->freelist_next = nullptr;
  new_fd->is_pre_allocated = false;

  std::string fd_name = absl::StrCat(name, " fd=", fd);
  grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name.c_str());
  fork_fd_list_add_grpc_fd(new_fd);
#ifndef NDEBUG
  GRPC_TRACE_VLOG(fd_refcount, 2)
      << "FD " << fd << " " << new_fd << " create " << fd_name;
#endif

  struct epoll_event ev;
  ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLOUT | EPOLLET);
  // Use the least significant bit of ev.data.ptr to store track_err. We expect
  // the addresses to be word aligned. We need to store track_err to avoid
  // synchronization issues when accessing it after receiving an event.
  // Accessing fd would be a data race there because the fd might have been
  // returned to the free list at that point.
  ev.data.ptr = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(new_fd) |
                                        (track_err ? 1 : 0));
  if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
    LOG(ERROR) << "epoll_ctl failed: " << grpc_core::StrError(errno);
  }

  return new_fd;
}
380 
// Return the underlying OS file descriptor.
static int fd_wrapped_fd(grpc_fd* fd) { return fd->fd; }
382 
383 // if 'releasing_fd' is true, it means that we are going to detach the internal
384 // fd from grpc_fd structure (i.e which means we should not be calling
385 // shutdown() syscall on that fd)
// Shut down read/write/error notification on 'fd'. When 'releasing_fd' is
// true the OS descriptor is being detached from the grpc_fd, so shutdown()
// must not be called on it; instead it is removed from the epoll set.
static void fd_shutdown_internal(grpc_fd* fd, grpc_error_handle why,
                                 bool releasing_fd) {
  // NOTE(review): the guard suggests SetShutdown() returns true only on the
  // first shutdown, making repeat calls no-ops — confirm against
  // LockfreeEvent's contract.
  if (fd->read_closure->SetShutdown(why)) {
    if (!releasing_fd) {
      if (!fd->is_pre_allocated) {
        shutdown(fd->fd, SHUT_RDWR);
      }
    } else {
      // we need a phony event for earlier linux versions.
      epoll_event phony_event;
      if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_DEL, fd->fd, &phony_event) !=
          0) {
        LOG(ERROR) << "epoll_ctl failed: " << grpc_core::StrError(errno);
      }
    }
    fd->write_closure->SetShutdown(why);
    fd->error_closure->SetShutdown(why);
  }
}
405 
406 // Might be called multiple times
// Shut down notification on 'fd' and the socket itself. Safe to call more
// than once (see fd_shutdown_internal()).
static void fd_shutdown(grpc_fd* fd, grpc_error_handle why) {
  fd_shutdown_internal(fd, why, false);
}
410 
// Release a grpc_fd. If 'release_fd' is non-null, ownership of the OS
// descriptor is handed back to the caller via *release_fd; otherwise the
// descriptor is closed (unless pre-allocated). The grpc_fd struct itself is
// returned to the freelist rather than freed (see the rationale above).
// 'on_done' is scheduled once the fd has been released.
static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
                      const char* reason) {
  grpc_error_handle error;
  bool is_release_fd = (release_fd != nullptr);

  if (!fd->read_closure->IsShutdown()) {
    fd_shutdown_internal(fd, GRPC_ERROR_CREATE(reason), is_release_fd);
  }

  // If release_fd is not NULL, we should be relinquishing control of the file
  // descriptor fd->fd (but we still own the grpc_fd structure).
  if (is_release_fd) {
    *release_fd = fd->fd;
  } else {
    if (!fd->is_pre_allocated) {
      close(fd->fd);
    }
  }

  grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_done, error);

  grpc_iomgr_unregister_object(&fd->iomgr_object);
  fork_fd_list_remove_grpc_fd(fd);
  fd->read_closure->DestroyEvent();
  fd->write_closure->DestroyEvent();
  fd->error_closure->DestroyEvent();

  // Recycle the struct instead of freeing it: a poller may still hold a
  // (now stale) pointer to it.
  gpr_mu_lock(&fd_freelist_mu);
  fd->freelist_next = fd_freelist;
  fd_freelist = fd;
  gpr_mu_unlock(&fd_freelist_mu);
}
443 
// True once shutdown has been applied to this fd's read event.
static bool fd_is_shutdown(grpc_fd* fd) {
  return fd->read_closure->IsShutdown();
}

// Register 'closure' to run on read readiness.
static void fd_notify_on_read(grpc_fd* fd, grpc_closure* closure) {
  fd->read_closure->NotifyOn(closure);
}

// Register 'closure' to run on write readiness.
static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
  fd->write_closure->NotifyOn(closure);
}

// Register 'closure' to run when an error event is seen.
static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
  fd->error_closure->NotifyOn(closure);
}

// Mark the fd readable (fires any pending read closure).
static void fd_become_readable(grpc_fd* fd) { fd->read_closure->SetReady(); }

// Mark the fd writable (fires any pending write closure).
static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); }

// Mark the fd as having an error condition (fires any pending error closure).
static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); }

// Flag the OS descriptor as externally owned: close()/shutdown() are skipped.
static void fd_set_pre_allocated(grpc_fd* fd) { fd->is_pre_allocated = true; }
467 
468 //******************************************************************************
469 // Pollset Definitions
470 //
471 
472 static thread_local grpc_pollset* g_current_thread_pollset;
473 static thread_local grpc_pollset_worker* g_current_thread_worker;
474 
475 // The designated poller
476 static gpr_atm g_active_poller;
477 
478 static pollset_neighborhood* g_neighborhoods;
479 static size_t g_num_neighborhoods;
480 
481 // Return true if first in list
worker_insert(grpc_pollset * pollset,grpc_pollset_worker * worker)482 static bool worker_insert(grpc_pollset* pollset, grpc_pollset_worker* worker) {
483   if (pollset->root_worker == nullptr) {
484     pollset->root_worker = worker;
485     worker->next = worker->prev = worker;
486     return true;
487   } else {
488     worker->next = pollset->root_worker;
489     worker->prev = worker->next->prev;
490     worker->next->prev = worker;
491     worker->prev->next = worker;
492     return false;
493   }
494 }
495 
496 // Return true if last in list
497 typedef enum { EMPTIED, NEW_ROOT, REMOVED } worker_remove_result;
498 
worker_remove(grpc_pollset * pollset,grpc_pollset_worker * worker)499 static worker_remove_result worker_remove(grpc_pollset* pollset,
500                                           grpc_pollset_worker* worker) {
501   if (worker == pollset->root_worker) {
502     if (worker == worker->next) {
503       pollset->root_worker = nullptr;
504       return EMPTIED;
505     } else {
506       pollset->root_worker = worker->next;
507       worker->prev->next = worker->next;
508       worker->next->prev = worker->prev;
509       return NEW_ROOT;
510     }
511   } else {
512     worker->prev->next = worker->next;
513     worker->next->prev = worker->prev;
514     return REMOVED;
515   }
516 }
517 
// Pick a neighborhood shard for the calling thread based on its current CPU.
static size_t choose_neighborhood(void) {
  return static_cast<size_t>(gpr_cpu_current_cpu()) % g_num_neighborhoods;
}
521 
// One-time pollset subsystem setup: arm the global wakeup fd in the epoll
// set and create the per-CPU neighborhood shards.
static grpc_error_handle pollset_global_init(void) {
  gpr_atm_no_barrier_store(&g_active_poller, 0);
  global_wakeup_fd.read_fd = -1;
  grpc_error_handle err = grpc_wakeup_fd_init(&global_wakeup_fd);
  if (!err.ok()) return err;
  struct epoll_event ev;
  ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLET);
  // The wakeup fd is recognized in process_epoll_events() by this pointer.
  ev.data.ptr = &global_wakeup_fd;
  if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd,
                &ev) != 0) {
    return GRPC_OS_ERROR(errno, "epoll_ctl");
  }
  // One neighborhood per core, capped at MAX_NEIGHBORHOODS.
  g_num_neighborhoods =
      grpc_core::Clamp(gpr_cpu_num_cores(), 1u, MAX_NEIGHBORHOODS);
  g_neighborhoods = static_cast<pollset_neighborhood*>(
      gpr_zalloc(sizeof(*g_neighborhoods) * g_num_neighborhoods));
  for (size_t i = 0; i < g_num_neighborhoods; i++) {
    gpr_mu_init(&g_neighborhoods[i].mu);
  }
  return absl::OkStatus();
}
543 
pollset_global_shutdown(void)544 static void pollset_global_shutdown(void) {
545   if (global_wakeup_fd.read_fd != -1) grpc_wakeup_fd_destroy(&global_wakeup_fd);
546   for (size_t i = 0; i < g_num_neighborhoods; i++) {
547     gpr_mu_destroy(&g_neighborhoods[i].mu);
548   }
549   gpr_free(g_neighborhoods);
550 }
551 
pollset_init(grpc_pollset * pollset,gpr_mu ** mu)552 static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
553   gpr_mu_init(&pollset->mu);
554   *mu = &pollset->mu;
555   pollset->neighborhood = &g_neighborhoods[choose_neighborhood()];
556   pollset->reassigning_neighborhood = false;
557   pollset->root_worker = nullptr;
558   pollset->kicked_without_poller = false;
559   pollset->seen_inactive = true;
560   pollset->shutting_down = false;
561   pollset->shutdown_closure = nullptr;
562   pollset->begin_refs = 0;
563   pollset->next = pollset->prev = nullptr;
564 }
565 
// Tear down a pollset: unlink it from its neighborhood's active list (if it
// is on one) and destroy its mutex. Lock order is neighborhood->mu before
// pollset->mu, hence the drop-and-retry dance below.
static void pollset_destroy(grpc_pollset* pollset) {
  gpr_mu_lock(&pollset->mu);
  if (!pollset->seen_inactive) {
    pollset_neighborhood* neighborhood = pollset->neighborhood;
    gpr_mu_unlock(&pollset->mu);
  retry_lock_neighborhood:
    gpr_mu_lock(&neighborhood->mu);
    gpr_mu_lock(&pollset->mu);
    if (!pollset->seen_inactive) {
      // The pollset may have migrated shards while we were unlocked: chase it.
      if (pollset->neighborhood != neighborhood) {
        gpr_mu_unlock(&neighborhood->mu);
        neighborhood = pollset->neighborhood;
        gpr_mu_unlock(&pollset->mu);
        goto retry_lock_neighborhood;
      }
      // Unlink from the circular active list, fixing the root if needed.
      pollset->prev->next = pollset->next;
      pollset->next->prev = pollset->prev;
      if (pollset == pollset->neighborhood->active_root) {
        pollset->neighborhood->active_root =
            pollset->next == pollset ? nullptr : pollset->next;
      }
    }
    gpr_mu_unlock(&pollset->neighborhood->mu);
  }
  gpr_mu_unlock(&pollset->mu);
  gpr_mu_destroy(&pollset->mu);
}
593 
// Kick every worker attached to 'pollset' (pollset->mu must be held by the
// caller). Condvar waiters are signalled; the designated poller is woken via
// the global wakeup fd. Returns the combined error from any wakeup failures.
static grpc_error_handle pollset_kick_all(grpc_pollset* pollset) {
  grpc_error_handle error;
  if (pollset->root_worker != nullptr) {
    grpc_pollset_worker* worker = pollset->root_worker;
    do {
      switch (worker->state) {
        case KICKED:
          break;  // already kicked; nothing to do
        case UNKICKED:
          SET_KICK_STATE(worker, KICKED);
          if (worker->initialized_cv) {
            gpr_cv_signal(&worker->cv);
          }
          break;
        case DESIGNATED_POLLER:
          SET_KICK_STATE(worker, KICKED);
          append_error(&error, grpc_wakeup_fd_wakeup(&global_wakeup_fd),
                       "pollset_kick_all");
          break;
      }

      worker = worker->next;
    } while (worker != pollset->root_worker);
  }
  // TODO(sreek): Check if we need to set 'kicked_without_poller' to true here
  // in the else case
  return error;
}
622 
pollset_maybe_finish_shutdown(grpc_pollset * pollset)623 static void pollset_maybe_finish_shutdown(grpc_pollset* pollset) {
624   if (pollset->shutdown_closure != nullptr && pollset->root_worker == nullptr &&
625       pollset->begin_refs == 0) {
626     grpc_core::ExecCtx::Run(DEBUG_LOCATION, pollset->shutdown_closure,
627                             absl::OkStatus());
628     pollset->shutdown_closure = nullptr;
629   }
630 }
631 
// Begin pollset shutdown: kick all workers and arrange for 'closure' to run
// once the last worker has left (may fire immediately if already idle).
// Must not be called twice, nor while already shutting down.
static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
  CHECK_EQ(pollset->shutdown_closure, nullptr);
  CHECK(!pollset->shutting_down);
  pollset->shutdown_closure = closure;
  pollset->shutting_down = true;
  GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(pollset));
  pollset_maybe_finish_shutdown(pollset);
}
640 
poll_deadline_to_millis_timeout(grpc_core::Timestamp millis)641 static int poll_deadline_to_millis_timeout(grpc_core::Timestamp millis) {
642   if (millis == grpc_core::Timestamp::InfFuture()) return -1;
643   int64_t delta = (millis - grpc_core::Timestamp::Now()).millis();
644   if (delta > INT_MAX) {
645     return INT_MAX;
646   } else if (delta < 0) {
647     return 0;
648   } else {
649     return static_cast<int>(delta);
650   }
651 }
652 
653 // Process the epoll events found by do_epoll_wait() function.
654 // - g_epoll_set.cursor points to the index of the first event to be processed
655 // - This function then processes up-to MAX_EPOLL_EVENTS_PER_ITERATION and
656 //   updates the g_epoll_set.cursor
657 
658 // NOTE ON SYNCRHONIZATION: Similar to do_epoll_wait(), this function is only
659 // called by g_active_poller thread. So there is no need for synchronization
660 // when accessing fields in g_epoll_set
process_epoll_events(grpc_pollset *)661 static grpc_error_handle process_epoll_events(grpc_pollset* /*pollset*/) {
662   static const char* err_desc = "process_events";
663   grpc_error_handle error;
664   long num_events = gpr_atm_acq_load(&g_epoll_set.num_events);
665   long cursor = gpr_atm_acq_load(&g_epoll_set.cursor);
666   for (int idx = 0;
667        (idx < MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION) && cursor != num_events;
668        idx++) {
669     long c = cursor++;
670     struct epoll_event* ev = &g_epoll_set.events[c];
671     void* data_ptr = ev->data.ptr;
672 
673     if (data_ptr == &global_wakeup_fd) {
674       append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
675                    err_desc);
676     } else {
677       grpc_fd* fd = reinterpret_cast<grpc_fd*>(
678           reinterpret_cast<intptr_t>(data_ptr) & ~intptr_t{1});
679       bool track_err = reinterpret_cast<intptr_t>(data_ptr) & intptr_t{1};
680       bool cancel = (ev->events & EPOLLHUP) != 0;
681       bool error = (ev->events & EPOLLERR) != 0;
682       bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0;
683       bool write_ev = (ev->events & EPOLLOUT) != 0;
684       bool err_fallback = error && !track_err;
685 
686       if (error && !err_fallback) {
687         fd_has_errors(fd);
688       }
689 
690       if (read_ev || cancel || err_fallback) {
691         fd_become_readable(fd);
692       }
693 
694       if (write_ev || cancel || err_fallback) {
695         fd_become_writable(fd);
696       }
697     }
698   }
699   gpr_atm_rel_store(&g_epoll_set.cursor, cursor);
700   return error;
701 }
702 
703 // Do epoll_wait and store the events in g_epoll_set.events field. This does not
704 // "process" any of the events yet; that is done in process_epoll_events().
705 // *See process_epoll_events() function for more details.
706 
707 // NOTE ON SYNCHRONIZATION: At any point of time, only the g_active_poller
708 // (i.e the designated poller thread) will be calling this function. So there is
709 // no need for any synchronization when accessing fields in g_epoll_set
// Call epoll_wait() and stash the raw results in g_epoll_set for later
// consumption by process_epoll_events(). Only the designated poller thread
// runs this, so no locking of g_epoll_set is needed.
static grpc_error_handle do_epoll_wait(grpc_pollset* ps,
                                       grpc_core::Timestamp deadline) {
  int r;
  int timeout = poll_deadline_to_millis_timeout(deadline);
  if (timeout != 0) {
    GRPC_SCHEDULING_START_BLOCKING_REGION;
  }
  // Retry when interrupted by a signal.
  do {
    r = epoll_wait(g_epoll_set.epfd, g_epoll_set.events, MAX_EPOLL_EVENTS,
                   timeout);
  } while (r < 0 && errno == EINTR);
  if (timeout != 0) {
    GRPC_SCHEDULING_END_BLOCKING_REGION;
  }

  if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");

  GRPC_TRACE_LOG(polling, INFO)
      << "ps: " << ps << " poll got " << r << " events";

  // Publish the new batch with release semantics so a future designated
  // poller (possibly a different thread) observes consistent values.
  gpr_atm_rel_store(&g_epoll_set.num_events, r);
  gpr_atm_rel_store(&g_epoll_set.cursor, 0);

  return absl::OkStatus();
}
735 
// Attach 'worker' to 'pollset' (called with pollset->mu held), re-activating
// the pollset in a neighborhood if it was inactive, then block until the
// worker is either kicked or becomes the designated poller. Returns true iff
// the caller should proceed to poll (it is the designated poller and the
// pollset is not shutting down).
static bool begin_worker(grpc_pollset* pollset, grpc_pollset_worker* worker,
                         grpc_pollset_worker** worker_hdl,
                         grpc_core::Timestamp deadline) {
  if (worker_hdl != nullptr) *worker_hdl = worker;
  worker->initialized_cv = false;
  SET_KICK_STATE(worker, UNKICKED);
  worker->schedule_on_end_work = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
  pollset->begin_refs++;

  GRPC_TRACE_LOG(polling, INFO)
      << "PS:" << pollset << " BEGIN_STARTS:" << worker;

  if (pollset->seen_inactive) {
    // pollset has been observed to be inactive, we need to move back to the
    // active list
    bool is_reassigning = false;
    if (!pollset->reassigning_neighborhood) {
      is_reassigning = true;
      pollset->reassigning_neighborhood = true;
      pollset->neighborhood = &g_neighborhoods[choose_neighborhood()];
    }
    pollset_neighborhood* neighborhood = pollset->neighborhood;
    gpr_mu_unlock(&pollset->mu);
  // pollset unlocked: state may change (even worker->kick_state)
  retry_lock_neighborhood:
    gpr_mu_lock(&neighborhood->mu);
    gpr_mu_lock(&pollset->mu);
    GRPC_TRACE_LOG(polling, INFO)
        << "PS:" << pollset << " BEGIN_REORG:" << worker
        << " kick_state=" << kick_state_string(worker->state)
        << " is_reassigning=" << is_reassigning;
    if (pollset->seen_inactive) {
      // The pollset may have migrated neighborhoods while unlocked; chase it.
      if (neighborhood != pollset->neighborhood) {
        gpr_mu_unlock(&neighborhood->mu);
        neighborhood = pollset->neighborhood;
        gpr_mu_unlock(&pollset->mu);
        goto retry_lock_neighborhood;
      }

      // In the brief time we released the pollset locks above, the worker MAY
      // have been kicked. In this case, the worker should get out of this
      // pollset ASAP and hence this should neither add the pollset to
      // neighborhood nor mark the pollset as active.

      // On a side note, the only way a worker's kick state could have changed
      // at this point is if it were "kicked specifically". Since the worker has
      // not added itself to the pollset yet (by calling worker_insert()), it is
      // not visible in the "kick any" path yet
      if (worker->state == UNKICKED) {
        pollset->seen_inactive = false;
        if (neighborhood->active_root == nullptr) {
          neighborhood->active_root = pollset->next = pollset->prev = pollset;
          // Make this the designated poller if there isn't one already
          if (worker->state == UNKICKED &&
              gpr_atm_no_barrier_cas(&g_active_poller, 0,
                                     reinterpret_cast<gpr_atm>(worker))) {
            SET_KICK_STATE(worker, DESIGNATED_POLLER);
          }
        } else {
          // Splice the pollset into the existing circular active list.
          pollset->next = neighborhood->active_root;
          pollset->prev = pollset->next->prev;
          pollset->next->prev = pollset->prev->next = pollset;
        }
      }
    }
    if (is_reassigning) {
      CHECK(pollset->reassigning_neighborhood);
      pollset->reassigning_neighborhood = false;
    }
    gpr_mu_unlock(&neighborhood->mu);
  }

  worker_insert(pollset, worker);
  pollset->begin_refs--;
  if (worker->state == UNKICKED && !pollset->kicked_without_poller) {
    CHECK(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
    worker->initialized_cv = true;
    gpr_cv_init(&worker->cv);
    // Wait until kicked (or shutdown); a deadline expiry counts as a kick.
    while (worker->state == UNKICKED && !pollset->shutting_down) {
      GRPC_TRACE_LOG(polling, INFO)
          << "PS:" << pollset << " BEGIN_WAIT:" << worker
          << " kick_state=" << kick_state_string(worker->state)
          << " shutdown=" << pollset->shutting_down;

      if (gpr_cv_wait(&worker->cv, &pollset->mu,
                      deadline.as_timespec(GPR_CLOCK_MONOTONIC)) &&
          worker->state == UNKICKED) {
        // If gpr_cv_wait returns true (i.e a timeout), pretend that the worker
        // received a kick
        SET_KICK_STATE(worker, KICKED);
      }
    }
    grpc_core::ExecCtx::Get()->InvalidateNow();
  }

  GRPC_TRACE_LOG(polling, INFO)
      << "PS:" << pollset << " BEGIN_DONE:" << worker
      << " kick_state=" << kick_state_string(worker->state)
      << " shutdown=" << pollset->shutting_down
      << " kicked_without_poller: " << pollset->kicked_without_poller;

  // We release pollset lock in this function at a couple of places:
  //   1. Briefly when assigning pollset to a neighborhood
  //   2. When doing gpr_cv_wait()
  // It is possible that 'kicked_without_poller' was set to true during (1) and
  // 'shutting_down' is set to true during (1) or (2). If either of them is
  // true, this worker cannot do polling
  // TODO(sreek): Perhaps there is a better way to handle kicked_without_poller
  // case; especially when the worker is the DESIGNATED_POLLER

  if (pollset->kicked_without_poller) {
    pollset->kicked_without_poller = false;
    return false;
  }

  return worker->state == DESIGNATED_POLLER && !pollset->shutting_down;
}
853 
// Scans the pollsets of 'neighborhood' (starting at active_root) looking for
// a worker that can be promoted to designated poller. The neighborhood's
// mutex must be held by the caller (see the trylock/lock calls in
// end_worker()).
//
// Pollsets on which every worker is already kicked are marked seen_inactive
// and unlinked from the neighborhood's active list. Returns true iff a worker
// was found — either this thread won the CAS on g_active_poller, or another
// thread had already installed a designated poller on the inspected pollset.
static bool check_neighborhood_for_available_poller(
    pollset_neighborhood* neighborhood) {
  bool found_worker = false;
  do {
    grpc_pollset* inspect = neighborhood->active_root;
    if (inspect == nullptr) {
      break;  // no active pollsets left in this neighborhood
    }
    gpr_mu_lock(&inspect->mu);
    CHECK(!inspect->seen_inactive);
    grpc_pollset_worker* inspect_worker = inspect->root_worker;
    if (inspect_worker != nullptr) {
      // Walk the circular worker list until a promotable worker is found or
      // we wrap back to the root.
      do {
        switch (inspect_worker->state) {
          case UNKICKED:
            // Try to install this worker as the global designated poller.
            if (gpr_atm_no_barrier_cas(
                    &g_active_poller, 0,
                    reinterpret_cast<gpr_atm>(inspect_worker))) {
              GRPC_TRACE_LOG(polling, INFO)
                  << " .. choose next poller to be " << inspect_worker;
              SET_KICK_STATE(inspect_worker, DESIGNATED_POLLER);
              if (inspect_worker->initialized_cv) {
                // Wake it from its gpr_cv_wait() in begin_worker().
                gpr_cv_signal(&inspect_worker->cv);
              }
            } else {
              GRPC_TRACE_LOG(polling, INFO)
                  << " .. beaten to choose next poller";
            }
            // even if we didn't win the cas, there's a worker, we can stop
            found_worker = true;
            break;
          case KICKED:
            break;  // already kicked: keep scanning this pollset
          case DESIGNATED_POLLER:
            found_worker = true;  // ok, so someone else found the worker, but
                                  // we'll accept that
            break;
        }
        inspect_worker = inspect_worker->next;
      } while (!found_worker && inspect_worker != inspect->root_worker);
    }
    if (!found_worker) {
      // No usable worker on this pollset: retire it from the active list and
      // continue with the rest of the neighborhood.
      GRPC_TRACE_LOG(polling, INFO)
          << " .. mark pollset " << inspect << " inactive";
      inspect->seen_inactive = true;
      if (inspect == neighborhood->active_root) {
        neighborhood->active_root =
            inspect->next == inspect ? nullptr : inspect->next;
      }
      // Unlink from the doubly-linked circular list.
      inspect->next->prev = inspect->prev;
      inspect->prev->next = inspect->next;
      inspect->next = inspect->prev = nullptr;
    }
    gpr_mu_unlock(&inspect->mu);
  } while (!found_worker);
  return found_worker;
}
911 
// Finishes 'worker' after a pollset_work() iteration: moves its pending
// closures onto the exec_ctx, hands off the designated-poller role if this
// worker held it, and removes the worker from the pollset. Called with
// pollset->mu held; the lock may be released and re-acquired internally but
// is held again when the function returns.
static void end_worker(grpc_pollset* pollset, grpc_pollset_worker* worker,
                       grpc_pollset_worker** worker_hdl) {
  GRPC_TRACE_LOG(polling, INFO) << "PS:" << pollset << " END_WORKER:" << worker;
  if (worker_hdl != nullptr) *worker_hdl = nullptr;
  // Make sure we appear kicked
  SET_KICK_STATE(worker, KICKED);
  grpc_closure_list_move(&worker->schedule_on_end_work,
                         grpc_core::ExecCtx::Get()->closure_list());
  if (gpr_atm_no_barrier_load(&g_active_poller) ==
      reinterpret_cast<gpr_atm>(worker)) {
    // This worker was the designated poller: a successor must be chosen.
    if (worker->next != worker && worker->next->state == UNKICKED) {
      // An unkicked peer on this same pollset can take over directly.
      GRPC_TRACE_LOG(polling, INFO)
          << " .. choose next poller to be peer " << worker;
      CHECK(worker->next->initialized_cv);
      gpr_atm_no_barrier_store(&g_active_poller, (gpr_atm)worker->next);
      SET_KICK_STATE(worker->next, DESIGNATED_POLLER);
      gpr_cv_signal(&worker->next->cv);
      if (grpc_core::ExecCtx::Get()->HasWork()) {
        // Flush queued closures without holding the pollset lock.
        gpr_mu_unlock(&pollset->mu);
        grpc_core::ExecCtx::Get()->Flush();
        gpr_mu_lock(&pollset->mu);
      }
    } else {
      // No local successor: clear the designated poller and search the
      // neighborhoods for one, starting with our own.
      gpr_atm_no_barrier_store(&g_active_poller, 0);
      size_t poller_neighborhood_idx =
          static_cast<size_t>(pollset->neighborhood - g_neighborhoods);
      gpr_mu_unlock(&pollset->mu);
      bool found_worker = false;
      // scan_state[i] records whether neighborhood i was already examined in
      // the trylock pass. Entries past an early exit are never read because
      // both loops gate on !found_worker.
      bool scan_state[MAX_NEIGHBORHOODS];
      // First pass: only neighborhoods whose lock is immediately available.
      for (size_t i = 0; !found_worker && i < g_num_neighborhoods; i++) {
        pollset_neighborhood* neighborhood =
            &g_neighborhoods[(poller_neighborhood_idx + i) %
                             g_num_neighborhoods];
        if (gpr_mu_trylock(&neighborhood->mu)) {
          found_worker = check_neighborhood_for_available_poller(neighborhood);
          gpr_mu_unlock(&neighborhood->mu);
          scan_state[i] = true;
        } else {
          scan_state[i] = false;
        }
      }
      // Second pass: block on the locks skipped above.
      for (size_t i = 0; !found_worker && i < g_num_neighborhoods; i++) {
        if (scan_state[i]) continue;
        pollset_neighborhood* neighborhood =
            &g_neighborhoods[(poller_neighborhood_idx + i) %
                             g_num_neighborhoods];
        gpr_mu_lock(&neighborhood->mu);
        found_worker = check_neighborhood_for_available_poller(neighborhood);
        gpr_mu_unlock(&neighborhood->mu);
      }
      grpc_core::ExecCtx::Get()->Flush();
      gpr_mu_lock(&pollset->mu);
    }
  } else if (grpc_core::ExecCtx::Get()->HasWork()) {
    // Not the designated poller, but there are closures to run.
    gpr_mu_unlock(&pollset->mu);
    grpc_core::ExecCtx::Get()->Flush();
    gpr_mu_lock(&pollset->mu);
  }
  if (worker->initialized_cv) {
    gpr_cv_destroy(&worker->cv);
  }
  GRPC_TRACE_LOG(polling, INFO) << " .. remove worker";
  if (EMPTIED == worker_remove(pollset, worker)) {
    // Last worker out: let the pollset complete a pending shutdown.
    pollset_maybe_finish_shutdown(pollset);
  }
  CHECK(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
}
979 
// pollset->po.mu lock must be held by the caller before calling this.
// The function pollset_work() may temporarily release the lock (pollset->po.mu)
// during the course of its execution but it will always re-acquire the lock and
// ensure that it is held by the time the function returns.
//
// Runs one iteration of worker lifecycle: begin_worker() (which may block
// waiting to become the designated poller), epoll wait / event processing if
// this thread ended up as the designated poller, then end_worker().
static grpc_error_handle pollset_work(grpc_pollset* ps,
                                      grpc_pollset_worker** worker_hdl,
                                      grpc_core::Timestamp deadline) {
  grpc_pollset_worker worker;
  grpc_error_handle error;
  static const char* err_desc = "pollset_work";
  // Fast path: a kick arrived while no worker was present; consume it and
  // return without polling.
  if (ps->kicked_without_poller) {
    ps->kicked_without_poller = false;
    return absl::OkStatus();
  }

  // begin_worker() returns true only when this worker became the
  // DESIGNATED_POLLER and the pollset is not shutting down.
  if (begin_worker(ps, &worker, worker_hdl, deadline)) {
    g_current_thread_pollset = ps;
    g_current_thread_worker = &worker;
    CHECK(!ps->shutting_down);
    CHECK(!ps->seen_inactive);

    gpr_mu_unlock(&ps->mu);  // unlock
    // This is the designated polling thread at this point and should ideally do
    // polling. However, if there are unprocessed events left from a previous
    // call to do_epoll_wait(), skip calling epoll_wait() in this iteration and
    // process the pending epoll events.

    // The reason for decoupling do_epoll_wait and process_epoll_events is to
    // better distribute the work (i.e handling epoll events) across multiple
    // threads

    // process_epoll_events() returns very quickly: It just queues the work on
    // exec_ctx but does not execute it (the actual execution or more
    // accurately grpc_core::ExecCtx::Get()->Flush() happens in end_worker()
    // AFTER selecting a designated poller). So we are not waiting long periods
    // without a designated poller
    //
    // Only call epoll_wait() when all previously fetched events have been
    // consumed (cursor has caught up with num_events).
    if (gpr_atm_acq_load(&g_epoll_set.cursor) ==
        gpr_atm_acq_load(&g_epoll_set.num_events)) {
      append_error(&error, do_epoll_wait(ps, deadline), err_desc);
    }
    append_error(&error, process_epoll_events(ps), err_desc);

    gpr_mu_lock(&ps->mu);  // lock

    g_current_thread_worker = nullptr;
  } else {
    g_current_thread_pollset = ps;
  }
  // end_worker() selects the next designated poller and flushes any closures
  // queued by process_epoll_events().
  end_worker(ps, &worker, worker_hdl);

  g_current_thread_pollset = nullptr;
  return error;
}
1033 
// Kicks a worker out of its wait on 'pollset'. With specific_worker ==
// nullptr a suitable worker is chosen (or, when no worker is present,
// kicked_without_poller is recorded so the next pollset_work() call returns
// immediately); otherwise exactly 'specific_worker' is targeted. The
// designated poller is woken via the global wakeup fd; cv-waiting workers are
// signalled directly.
// NOTE(review): appears to assume pollset->mu is held by the caller — the
// worker lists are traversed here without taking any lock; confirm at call
// sites.
static grpc_error_handle pollset_kick(grpc_pollset* pollset,
                                      grpc_pollset_worker* specific_worker) {
  grpc_error_handle ret_err;
  if (GRPC_TRACE_FLAG_ENABLED(polling)) {
    // Assemble one trace line describing the kick request and current state.
    std::vector<std::string> log;
    log.push_back(absl::StrFormat(
        "PS:%p KICK:%p curps=%p curworker=%p root=%p", pollset, specific_worker,
        static_cast<void*>(g_current_thread_pollset),
        static_cast<void*>(g_current_thread_worker), pollset->root_worker));
    if (pollset->root_worker != nullptr) {
      log.push_back(absl::StrFormat(
          " {kick_state=%s next=%p {kick_state=%s}}",
          kick_state_string(pollset->root_worker->state),
          pollset->root_worker->next,
          kick_state_string(pollset->root_worker->next->state)));
    }
    if (specific_worker != nullptr) {
      log.push_back(absl::StrFormat(" worker_kick_state=%s",
                                    kick_state_string(specific_worker->state)));
    }
    VLOG(2) << absl::StrJoin(log, "");
  }

  if (specific_worker == nullptr) {
    if (g_current_thread_pollset != pollset) {
      grpc_pollset_worker* root_worker = pollset->root_worker;
      if (root_worker == nullptr) {
        // Nobody is polling: remember the kick for the next worker to arrive.
        pollset->kicked_without_poller = true;
        GRPC_TRACE_LOG(polling, INFO) << " .. kicked_without_poller";
        goto done;
      }
      grpc_pollset_worker* next_worker = root_worker->next;
      if (root_worker->state == KICKED) {
        // Already kicked: nothing more to do.
        GRPC_TRACE_LOG(polling, INFO) << " .. already kicked " << root_worker;
        SET_KICK_STATE(root_worker, KICKED);
        goto done;
      } else if (next_worker->state == KICKED) {
        GRPC_TRACE_LOG(polling, INFO) << " .. already kicked " << next_worker;
        SET_KICK_STATE(next_worker, KICKED);
        goto done;
      } else if (root_worker == next_worker &&  // only try and wake up a poller
                                                // if there is no next worker
                 root_worker ==
                     reinterpret_cast<grpc_pollset_worker*>(
                         gpr_atm_no_barrier_load(&g_active_poller))) {
        // The sole worker is the designated poller: interrupt its epoll_wait
        // via the global wakeup fd.
        GRPC_TRACE_LOG(polling, INFO) << " .. kicked " << root_worker;
        SET_KICK_STATE(root_worker, KICKED);
        ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
        goto done;
      } else if (next_worker->state == UNKICKED) {
        // Wake a cv-waiting worker directly.
        GRPC_TRACE_LOG(polling, INFO) << " .. kicked " << next_worker;
        CHECK(next_worker->initialized_cv);
        SET_KICK_STATE(next_worker, KICKED);
        gpr_cv_signal(&next_worker->cv);
        goto done;
      } else if (next_worker->state == DESIGNATED_POLLER) {
        if (root_worker->state != DESIGNATED_POLLER) {
          // Prefer kicking the non-polling root worker over interrupting the
          // designated poller.
          GRPC_TRACE_LOG(polling, INFO)
              << " .. kicked root non-poller " << root_worker
              << " (initialized_cv=" << root_worker->initialized_cv
              << ") (poller=" << next_worker << ")";
          SET_KICK_STATE(root_worker, KICKED);
          if (root_worker->initialized_cv) {
            gpr_cv_signal(&root_worker->cv);
          }
          goto done;
        } else {
          // Both are designated pollers: fall back to the wakeup fd.
          GRPC_TRACE_LOG(polling, INFO) << " .. non-root poller " << next_worker
                                        << " (root=" << root_worker << ")";
          SET_KICK_STATE(next_worker, KICKED);
          ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
          goto done;
        }
      } else {
        CHECK(next_worker->state == KICKED);
        SET_KICK_STATE(next_worker, KICKED);
        goto done;
      }
    } else {
      // Kicking our own pollset from within pollset_work: state change alone
      // suffices, no wakeup needed.
      GRPC_TRACE_LOG(polling, INFO) << " .. kicked while waking up";
      goto done;
    }

    GPR_UNREACHABLE_CODE(goto done);
  }

  // specific_worker != nullptr: kick exactly that worker.
  if (specific_worker->state == KICKED) {
    GRPC_TRACE_LOG(polling, INFO) << " .. specific worker already kicked";
    goto done;
  } else if (g_current_thread_worker == specific_worker) {
    // Kicking ourselves: marking the state is enough.
    GRPC_TRACE_LOG(polling, INFO)
        << " .. mark " << specific_worker << " kicked";
    SET_KICK_STATE(specific_worker, KICKED);
    goto done;
  } else if (specific_worker ==
             reinterpret_cast<grpc_pollset_worker*>(
                 gpr_atm_no_barrier_load(&g_active_poller))) {
    // Target is blocked in epoll_wait: use the wakeup fd.
    GRPC_TRACE_LOG(polling, INFO) << " .. kick active poller";
    SET_KICK_STATE(specific_worker, KICKED);
    ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
    goto done;
  } else if (specific_worker->initialized_cv) {
    // Target is blocked on its condition variable.
    GRPC_TRACE_LOG(polling, INFO) << " .. kick waiting worker";
    SET_KICK_STATE(specific_worker, KICKED);
    gpr_cv_signal(&specific_worker->cv);
    goto done;
  } else {
    // Target has not started waiting yet; the state change alone is observed
    // by begin_worker().
    GRPC_TRACE_LOG(polling, INFO) << " .. kick non-waiting worker";
    SET_KICK_STATE(specific_worker, KICKED);
    goto done;
  }
done:
  return ret_err;
}
1148 
pollset_add_fd(grpc_pollset *,grpc_fd *)1149 static void pollset_add_fd(grpc_pollset* /*pollset*/, grpc_fd* /*fd*/) {}
1150 
1151 //******************************************************************************
1152 // Pollset-set Definitions
1153 //
1154 
pollset_set_create(void)1155 static grpc_pollset_set* pollset_set_create(void) {
1156   return reinterpret_cast<grpc_pollset_set*>(static_cast<intptr_t>(0xdeafbeef));
1157 }
1158 
pollset_set_destroy(grpc_pollset_set *)1159 static void pollset_set_destroy(grpc_pollset_set* /*pss*/) {}
1160 
pollset_set_add_fd(grpc_pollset_set *,grpc_fd *)1161 static void pollset_set_add_fd(grpc_pollset_set* /*pss*/, grpc_fd* /*fd*/) {}
1162 
pollset_set_del_fd(grpc_pollset_set *,grpc_fd *)1163 static void pollset_set_del_fd(grpc_pollset_set* /*pss*/, grpc_fd* /*fd*/) {}
1164 
pollset_set_add_pollset(grpc_pollset_set *,grpc_pollset *)1165 static void pollset_set_add_pollset(grpc_pollset_set* /*pss*/,
1166                                     grpc_pollset* /*ps*/) {}
1167 
pollset_set_del_pollset(grpc_pollset_set *,grpc_pollset *)1168 static void pollset_set_del_pollset(grpc_pollset_set* /*pss*/,
1169                                     grpc_pollset* /*ps*/) {}
1170 
pollset_set_add_pollset_set(grpc_pollset_set *,grpc_pollset_set *)1171 static void pollset_set_add_pollset_set(grpc_pollset_set* /*bag*/,
1172                                         grpc_pollset_set* /*item*/) {}
1173 
pollset_set_del_pollset_set(grpc_pollset_set *,grpc_pollset_set *)1174 static void pollset_set_del_pollset_set(grpc_pollset_set* /*bag*/,
1175                                         grpc_pollset_set* /*item*/) {}
1176 
1177 //******************************************************************************
1178 // Event engine binding
1179 //
1180 
is_any_background_poller_thread(void)1181 static bool is_any_background_poller_thread(void) { return false; }
1182 
shutdown_background_closure(void)1183 static void shutdown_background_closure(void) {}
1184 
add_closure_to_background_poller(grpc_closure *,grpc_error_handle)1185 static bool add_closure_to_background_poller(grpc_closure* /*closure*/,
1186                                              grpc_error_handle /*error*/) {
1187   return false;
1188 }
1189 
// Tears down the polling engine: fd bookkeeping first, then pollset globals,
// then the epoll set itself. Sets g_is_shutdown so a later
// init_epoll1_linux() call can re-initialize from scratch.
static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
  epoll_set_shutdown();
  g_is_shutdown = true;
}
1196 
1197 static bool init_epoll1_linux();
1198 
// Event-engine vtable exposing this file's fd / pollset / pollset_set
// implementations to the iomgr layer. Entries are positional; keep the order
// in sync with grpc_event_engine_vtable.
const grpc_event_engine_vtable grpc_ev_epoll1_posix = {
    sizeof(grpc_pollset),
    true,
    false,

    // fd operations
    fd_create,
    fd_wrapped_fd,
    fd_orphan,
    fd_shutdown,
    fd_notify_on_read,
    fd_notify_on_write,
    fd_notify_on_error,
    fd_become_readable,
    fd_become_writable,
    fd_has_errors,
    fd_is_shutdown,

    // pollset operations
    pollset_init,
    pollset_shutdown,
    pollset_destroy,
    pollset_work,
    pollset_kick,
    pollset_add_fd,

    // pollset_set operations (all no-ops in this engine)
    pollset_set_create,
    pollset_set_destroy,
    pollset_set_add_pollset,
    pollset_set_del_pollset,
    pollset_set_add_pollset_set,
    pollset_set_del_pollset_set,
    pollset_set_add_fd,
    pollset_set_del_fd,

    // background poller / lifecycle hooks
    is_any_background_poller_thread,
    /* name = */ "epoll1",
    /* check_engine_available = */
    [](bool) { return init_epoll1_linux(); },
    /* init_engine = */
    []() { CHECK(init_epoll1_linux()); },
    shutdown_background_closure,
    /* shutdown_engine = */
    []() { shutdown_engine(); },
    add_closure_to_background_poller,

    fd_set_pre_allocated,
};
1245 
// Called by the child process's post-fork handler to close open fds, including
// the global epoll fd. This allows gRPC to shutdown in the child process
// without interfering with connections or RPCs ongoing in the parent.
static void reset_event_manager_on_fork() {
  if (g_is_shutdown) return;  // engine never started (or already torn down)
  gpr_mu_lock(&fork_fd_list_mu);
  // Close every fd tracked on the fork list; mark each entry closed (-1)
  // before advancing to the next node.
  while (fork_fd_list_head != nullptr) {
    close(fork_fd_list_head->fd);
    fork_fd_list_head->fd = -1;
    fork_fd_list_head = fork_fd_list_head->fork_fd_list->next;
  }
  gpr_mu_unlock(&fork_fd_list_mu);
  // Full teardown, then bring the engine back up fresh in the child.
  shutdown_engine();
  init_epoll1_linux();
}
1261 
// It is possible that GLIBC has epoll but the underlying kernel doesn't.
// Create epoll_fd (epoll_set_init() takes care of that) to make sure epoll
// support is available
//
// Returns true on success (or when the engine is already initialized); false
// when no wakeup fd is available or a working epoll set cannot be created.
static bool init_epoll1_linux() {
  if (!g_is_shutdown) return true;  // already initialized
  if (!grpc_has_wakeup_fd()) {
    LOG(ERROR) << "Skipping epoll1 because of no wakeup fd.";
    return false;
  }

  if (!epoll_set_init()) {
    return false;  // kernel lacks usable epoll support
  }

  fd_global_init();

  if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
    // Roll back the partial initialization done above.
    fd_global_shutdown();
    epoll_set_shutdown();
    return false;
  }

  if (grpc_core::Fork::Enabled()) {
    // Initialize the fd-list mutex only when the post-fork reset function is
    // newly registered (registration returns whether it took effect).
    if (grpc_core::Fork::RegisterResetChildPollingEngineFunc(
            reset_event_manager_on_fork)) {
      gpr_mu_init(&fork_fd_list_mu);
    }
  }
  g_is_shutdown = false;
  return true;
}
1293 
1294 #else  // defined(GRPC_LINUX_EPOLL)
1295 #if defined(GRPC_POSIX_SOCKET_EV_EPOLL1)
1296 #include "src/core/lib/iomgr/ev_epoll1_linux.h"
// Fallback vtable compiled when GRPC_LINUX_EPOLL is unavailable:
// check_engine_available always reports false, so none of the null entries
// below are ever invoked.
const grpc_event_engine_vtable grpc_ev_epoll1_posix = {
    1,
    true,
    false,

    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,

    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,

    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,

    nullptr,
    /* name = */ "epoll1",
    /* check_engine_available = */ [](bool) { return false; },
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
};
1339 #endif  // defined(GRPC_POSIX_SOCKET_EV_EPOLL1)
1340 #endif  // !defined(GRPC_LINUX_EPOLL)
1341