1 /*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/iomgr/port.h"
22
23 #include <grpc/support/log.h>
24
/* This polling engine is only relevant on Linux kernels that support epoll,
   i.e. epoll_create() or epoll_create1() */
27 #ifdef GRPC_LINUX_EPOLL
28 #include "src/core/lib/iomgr/ev_epoll1_linux.h"
29
30 #include <assert.h>
31 #include <errno.h>
32 #include <fcntl.h>
33 #include <limits.h>
34 #include <poll.h>
35 #include <pthread.h>
36 #include <string.h>
37 #include <sys/epoll.h>
38 #include <sys/socket.h>
39 #include <unistd.h>
40
41 #include <string>
42 #include <vector>
43
44 #include "absl/strings/str_cat.h"
45 #include "absl/strings/str_format.h"
46 #include "absl/strings/str_join.h"
47
48 #include <grpc/support/alloc.h>
49 #include <grpc/support/cpu.h>
50
51 #include "src/core/lib/debug/stats.h"
52 #include "src/core/lib/gpr/string.h"
53 #include "src/core/lib/gpr/tls.h"
54 #include "src/core/lib/gpr/useful.h"
55 #include "src/core/lib/gprpp/manual_constructor.h"
56 #include "src/core/lib/iomgr/block_annotate.h"
57 #include "src/core/lib/iomgr/ev_posix.h"
58 #include "src/core/lib/iomgr/iomgr_internal.h"
59 #include "src/core/lib/iomgr/lockfree_event.h"
60 #include "src/core/lib/iomgr/wakeup_fd_posix.h"
61 #include "src/core/lib/profiling/timers.h"
62
63 static grpc_wakeup_fd global_wakeup_fd;
64
65 /*******************************************************************************
66 * Singleton epoll set related fields
67 */
68
69 #define MAX_EPOLL_EVENTS 100
70 #define MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION 1
71
/* NOTE ON SYNCHRONIZATION:
 * - Fields in this struct are only modified by the designated poller. Hence
 *   there is no need for any locks to protect the struct.
 * - num_events and cursor have to be of atomic type to provide memory
 *   visibility guarantees only, i.e. in case of multiple pollers the
 *   designated polling thread keeps changing, so the thread that wrote these
 *   values may be different from the thread reading them.
 */
80 typedef struct epoll_set {
81 int epfd;
82
83 /* The epoll_events after the last call to epoll_wait() */
84 struct epoll_event events[MAX_EPOLL_EVENTS];
85
86 /* The number of epoll_events after the last call to epoll_wait() */
87 gpr_atm num_events;
88
89 /* Index of the first event in epoll_events that has to be processed. This
90 * field is only valid if num_events > 0 */
91 gpr_atm cursor;
92 } epoll_set;
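/* In short: do_epoll_wait() fills events[] and then rel-stores num_events
   (and a zeroed cursor); each subsequent process_epoll_events() acq-loads
   both, handles at most MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION entries
   starting at cursor, and rel-stores the advanced cursor. That
   release/acquire pairing is what lets a different thread become the
   designated poller between iterations and still observe the buffered
   events. */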
93
94 /* The global singleton epoll set */
95 static epoll_set g_epoll_set;
96
static int epoll_create_and_cloexec() {
98 #ifdef GRPC_LINUX_EPOLL_CREATE1
99 int fd = epoll_create1(EPOLL_CLOEXEC);
100 if (fd < 0) {
101 gpr_log(GPR_ERROR, "epoll_create1 unavailable");
102 }
103 #else
104 int fd = epoll_create(MAX_EPOLL_EVENTS);
105 if (fd < 0) {
106 gpr_log(GPR_ERROR, "epoll_create unavailable");
  } else if (fcntl(fd, F_SETFD, FD_CLOEXEC) != 0) {
    gpr_log(GPR_ERROR, "fcntl following epoll_create failed");
    close(fd); /* avoid leaking the epoll fd on failure */
    return -1;
  }
111 #endif
112 return fd;
113 }
114
115 /* Must be called *only* once */
static bool epoll_set_init() {
117 g_epoll_set.epfd = epoll_create_and_cloexec();
118 if (g_epoll_set.epfd < 0) {
119 return false;
120 }
121
122 gpr_log(GPR_INFO, "grpc epoll fd: %d", g_epoll_set.epfd);
123 gpr_atm_no_barrier_store(&g_epoll_set.num_events, 0);
124 gpr_atm_no_barrier_store(&g_epoll_set.cursor, 0);
125 return true;
126 }
127
128 /* epoll_set_init() MUST be called before calling this. */
static void epoll_set_shutdown() {
130 if (g_epoll_set.epfd >= 0) {
131 close(g_epoll_set.epfd);
132 g_epoll_set.epfd = -1;
133 }
134 }
135
136 /*******************************************************************************
137 * Fd Declarations
138 */
139
140 /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
141 struct grpc_fork_fd_list {
142 grpc_fd* fd;
143 grpc_fd* next;
144 grpc_fd* prev;
145 };
146
147 struct grpc_fd {
148 int fd;
149
150 grpc_core::ManualConstructor<grpc_core::LockfreeEvent> read_closure;
151 grpc_core::ManualConstructor<grpc_core::LockfreeEvent> write_closure;
152 grpc_core::ManualConstructor<grpc_core::LockfreeEvent> error_closure;
153
154 struct grpc_fd* freelist_next;
155
156 grpc_iomgr_object iomgr_object;
157
158 /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
159 grpc_fork_fd_list* fork_fd_list;
160 };
161
162 static void fd_global_init(void);
163 static void fd_global_shutdown(void);
164
165 /*******************************************************************************
166 * Pollset Declarations
167 */
168
169 typedef enum { UNKICKED, KICKED, DESIGNATED_POLLER } kick_state;
170
static const char* kick_state_string(kick_state st) {
172 switch (st) {
173 case UNKICKED:
174 return "UNKICKED";
175 case KICKED:
176 return "KICKED";
177 case DESIGNATED_POLLER:
178 return "DESIGNATED_POLLER";
179 }
180 GPR_UNREACHABLE_CODE(return "UNKNOWN");
181 }
182
183 struct grpc_pollset_worker {
184 kick_state state;
185 int kick_state_mutator; // which line of code last changed kick state
186 bool initialized_cv;
187 grpc_pollset_worker* next;
188 grpc_pollset_worker* prev;
189 gpr_cv cv;
190 grpc_closure_list schedule_on_end_work;
191 };
192
193 #define SET_KICK_STATE(worker, kick_state) \
194 do { \
195 (worker)->state = (kick_state); \
196 (worker)->kick_state_mutator = __LINE__; \
197 } while (false)
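/* Rough sketch of the kick-state machine: a worker starts out UNKICKED, may
   be promoted to DESIGNATED_POLLER when it wins (or is handed) the
   g_active_poller slot, and is moved to KICKED by pollset_kick() /
   pollset_kick_all() or by a cv-wait timeout, which tells it to stop waiting
   and return. kick_state_mutator records the __LINE__ of the last transition
   purely as a debugging aid. */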
198
199 #define MAX_NEIGHBORHOODS 1024
200
201 typedef struct pollset_neighborhood {
202 union {
203 char pad[GPR_CACHELINE_SIZE];
204 struct {
205 gpr_mu mu;
206 grpc_pollset* active_root;
207 };
208 };
209 } pollset_neighborhood;
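/* Each neighborhood is padded to GPR_CACHELINE_SIZE so that the neighborhood
   mutexes do not share cache lines (which would cause false sharing when
   different threads lock different neighborhoods). active_root heads a
   circular doubly-linked list of pollsets that currently have, or are about
   to have, workers attached. */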
210
211 struct grpc_pollset {
212 gpr_mu mu;
213 pollset_neighborhood* neighborhood;
214 bool reassigning_neighborhood;
215 grpc_pollset_worker* root_worker;
216 bool kicked_without_poller;
217
218 /* Set to true if the pollset is observed to have no workers available to
219 poll */
220 bool seen_inactive;
  bool shutting_down; /* Is the pollset shutting down? */
222 grpc_closure* shutdown_closure; /* Called after shutdown is complete */
223
224 /* Number of workers who are *about-to* attach themselves to the pollset
225 * worker list */
226 int begin_refs;
227
228 grpc_pollset* next;
229 grpc_pollset* prev;
230 };
231
232 /*******************************************************************************
233 * Pollset-set Declarations
234 */
235
236 struct grpc_pollset_set {
237 char unused;
238 };
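/* Note: pollset_sets are no-ops in this engine. Every fd is registered with
   the single global epoll set at fd_create() time, so there is nothing for a
   pollset_set to track; see the empty pollset_set_* functions below. */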
239
240 /*******************************************************************************
241 * Common helpers
242 */
243
static bool append_error(grpc_error** composite, grpc_error* error,
                         const char* desc) {
246 if (error == GRPC_ERROR_NONE) return true;
247 if (*composite == GRPC_ERROR_NONE) {
248 *composite = GRPC_ERROR_CREATE_FROM_COPIED_STRING(desc);
249 }
250 *composite = grpc_error_add_child(*composite, error);
251 return false;
252 }
253
254 /*******************************************************************************
255 * Fd Definitions
256 */
257
258 /* We need to keep a freelist not because of any concerns of malloc performance
259 * but instead so that implementations with multiple threads in (for example)
260 * epoll_wait deal with the race between pollset removal and incoming poll
261 * notifications.
262 *
263 * The problem is that the poller ultimately holds a reference to this
 * object, so it is very difficult to know when it is safe to free it, at
 * least without some expensive synchronization.
266 *
267 * If we keep the object freelisted, in the worst case losing this race just
268 * becomes a spurious read notification on a reused fd.
269 */
270
271 /* The alarm system needs to be able to wakeup 'some poller' sometimes
272 * (specifically when a new alarm needs to be triggered earlier than the next
273 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
274 * case occurs. */
275
276 static grpc_fd* fd_freelist = nullptr;
277 static gpr_mu fd_freelist_mu;
278
279 /* Only used when GRPC_ENABLE_FORK_SUPPORT=1 */
280 static grpc_fd* fork_fd_list_head = nullptr;
281 static gpr_mu fork_fd_list_mu;
282
static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }
284
static void fd_global_shutdown(void) {
286 // TODO(guantaol): We don't have a reasonable explanation about this
287 // lock()/unlock() pattern. It can be a valid barrier if there is at most one
288 // pending lock() at this point. Otherwise, there is still a possibility of
289 // use-after-free race. Need to reason about the code and/or clean it up.
290 gpr_mu_lock(&fd_freelist_mu);
291 gpr_mu_unlock(&fd_freelist_mu);
292 while (fd_freelist != nullptr) {
293 grpc_fd* fd = fd_freelist;
294 fd_freelist = fd_freelist->freelist_next;
295 gpr_free(fd);
296 }
297 gpr_mu_destroy(&fd_freelist_mu);
298 }
299
static void fork_fd_list_add_grpc_fd(grpc_fd* fd) {
301 if (grpc_core::Fork::Enabled()) {
302 gpr_mu_lock(&fork_fd_list_mu);
303 fd->fork_fd_list =
304 static_cast<grpc_fork_fd_list*>(gpr_malloc(sizeof(grpc_fork_fd_list)));
305 fd->fork_fd_list->next = fork_fd_list_head;
306 fd->fork_fd_list->prev = nullptr;
307 if (fork_fd_list_head != nullptr) {
308 fork_fd_list_head->fork_fd_list->prev = fd;
309 }
310 fork_fd_list_head = fd;
311 gpr_mu_unlock(&fork_fd_list_mu);
312 }
313 }
314
static void fork_fd_list_remove_grpc_fd(grpc_fd* fd) {
316 if (grpc_core::Fork::Enabled()) {
317 gpr_mu_lock(&fork_fd_list_mu);
318 if (fork_fd_list_head == fd) {
319 fork_fd_list_head = fd->fork_fd_list->next;
320 }
321 if (fd->fork_fd_list->prev != nullptr) {
322 fd->fork_fd_list->prev->fork_fd_list->next = fd->fork_fd_list->next;
323 }
324 if (fd->fork_fd_list->next != nullptr) {
325 fd->fork_fd_list->next->fork_fd_list->prev = fd->fork_fd_list->prev;
326 }
327 gpr_free(fd->fork_fd_list);
328 gpr_mu_unlock(&fork_fd_list_mu);
329 }
330 }
331
static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
333 grpc_fd* new_fd = nullptr;
334
335 gpr_mu_lock(&fd_freelist_mu);
336 if (fd_freelist != nullptr) {
337 new_fd = fd_freelist;
338 fd_freelist = fd_freelist->freelist_next;
339 }
340 gpr_mu_unlock(&fd_freelist_mu);
341
342 if (new_fd == nullptr) {
343 new_fd = static_cast<grpc_fd*>(gpr_malloc(sizeof(grpc_fd)));
344 new_fd->read_closure.Init();
345 new_fd->write_closure.Init();
346 new_fd->error_closure.Init();
347 }
348 new_fd->fd = fd;
349 new_fd->read_closure->InitEvent();
350 new_fd->write_closure->InitEvent();
351 new_fd->error_closure->InitEvent();
352
353 new_fd->freelist_next = nullptr;
354
355 std::string fd_name = absl::StrCat(name, " fd=", fd);
356 grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name.c_str());
357 fork_fd_list_add_grpc_fd(new_fd);
358 #ifndef NDEBUG
359 if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_fd_refcount)) {
360 gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, new_fd, fd_name.c_str());
361 }
362 #endif
363
364 struct epoll_event ev;
365 ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLOUT | EPOLLET);
366 /* Use the least significant bit of ev.data.ptr to store track_err. We expect
367 * the addresses to be word aligned. We need to store track_err to avoid
368 * synchronization issues when accessing it after receiving an event.
369 * Accessing fd would be a data race there because the fd might have been
370 * returned to the free list at that point. */
371 ev.data.ptr = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(new_fd) |
372 (track_err ? 1 : 0));
373 if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
374 gpr_log(GPR_ERROR, "epoll_ctl failed: %s", strerror(errno));
375 }
376
377 return new_fd;
378 }
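/* The matching decode happens in process_epoll_events(): the low bit of
   ev.data.ptr carries track_err and the remaining bits are the grpc_fd*.
   Word alignment of the heap-allocated grpc_fd guarantees that bit is free,
   i.e. roughly:
     grpc_fd* fd = (grpc_fd*)((intptr_t)data_ptr & ~(intptr_t)1);
     bool track_err = (intptr_t)data_ptr & 1;  */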
379
static int fd_wrapped_fd(grpc_fd* fd) { return fd->fd; }
381
/* If 'releasing_fd' is true, we are going to detach the internal fd from the
 * grpc_fd structure (i.e. we should not call the shutdown() syscall on that
 * fd) */
static void fd_shutdown_internal(grpc_fd* fd, grpc_error* why,
                                 bool releasing_fd) {
387 if (fd->read_closure->SetShutdown(GRPC_ERROR_REF(why))) {
388 if (!releasing_fd) {
389 shutdown(fd->fd, SHUT_RDWR);
390 } else {
391 /* we need a dummy event for earlier linux versions. */
392 epoll_event dummy_event;
393 if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_DEL, fd->fd, &dummy_event) !=
394 0) {
395 gpr_log(GPR_ERROR, "epoll_ctl failed: %s", strerror(errno));
396 }
397 }
398 fd->write_closure->SetShutdown(GRPC_ERROR_REF(why));
399 fd->error_closure->SetShutdown(GRPC_ERROR_REF(why));
400 }
401 GRPC_ERROR_UNREF(why);
402 }
403
404 /* Might be called multiple times */
static void fd_shutdown(grpc_fd* fd, grpc_error* why) {
406 fd_shutdown_internal(fd, why, false);
407 }
408
static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
                      const char* reason) {
411 grpc_error* error = GRPC_ERROR_NONE;
412 bool is_release_fd = (release_fd != nullptr);
413
414 if (!fd->read_closure->IsShutdown()) {
415 fd_shutdown_internal(fd, GRPC_ERROR_CREATE_FROM_COPIED_STRING(reason),
416 is_release_fd);
417 }
418
419 /* If release_fd is not NULL, we should be relinquishing control of the file
420 descriptor fd->fd (but we still own the grpc_fd structure). */
421 if (is_release_fd) {
422 *release_fd = fd->fd;
423 } else {
424 close(fd->fd);
425 }
426
427 grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_done, GRPC_ERROR_REF(error));
428
429 grpc_iomgr_unregister_object(&fd->iomgr_object);
430 fork_fd_list_remove_grpc_fd(fd);
431 fd->read_closure->DestroyEvent();
432 fd->write_closure->DestroyEvent();
433 fd->error_closure->DestroyEvent();
434
435 gpr_mu_lock(&fd_freelist_mu);
436 fd->freelist_next = fd_freelist;
437 fd_freelist = fd;
438 gpr_mu_unlock(&fd_freelist_mu);
439 }
440
static bool fd_is_shutdown(grpc_fd* fd) {
442 return fd->read_closure->IsShutdown();
443 }
444
static void fd_notify_on_read(grpc_fd* fd, grpc_closure* closure) {
446 fd->read_closure->NotifyOn(closure);
447 }
448
static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
450 fd->write_closure->NotifyOn(closure);
451 }
452
static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
454 fd->error_closure->NotifyOn(closure);
455 }
456
static void fd_become_readable(grpc_fd* fd) { fd->read_closure->SetReady(); }
458
static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); }
460
static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); }
462
463 /*******************************************************************************
464 * Pollset Definitions
465 */
466
467 GPR_TLS_DECL(g_current_thread_pollset);
468 GPR_TLS_DECL(g_current_thread_worker);
469
470 /* The designated poller */
471 static gpr_atm g_active_poller;
472
473 static pollset_neighborhood* g_neighborhoods;
474 static size_t g_num_neighborhoods;
475
476 /* Return true if first in list */
static bool worker_insert(grpc_pollset* pollset, grpc_pollset_worker* worker) {
478 if (pollset->root_worker == nullptr) {
479 pollset->root_worker = worker;
480 worker->next = worker->prev = worker;
481 return true;
482 } else {
483 worker->next = pollset->root_worker;
484 worker->prev = worker->next->prev;
485 worker->next->prev = worker;
486 worker->prev->next = worker;
487 return false;
488 }
489 }
490
491 /* Return true if last in list */
492 typedef enum { EMPTIED, NEW_ROOT, REMOVED } worker_remove_result;
493
static worker_remove_result worker_remove(grpc_pollset* pollset,
                                          grpc_pollset_worker* worker) {
496 if (worker == pollset->root_worker) {
497 if (worker == worker->next) {
498 pollset->root_worker = nullptr;
499 return EMPTIED;
500 } else {
501 pollset->root_worker = worker->next;
502 worker->prev->next = worker->next;
503 worker->next->prev = worker->prev;
504 return NEW_ROOT;
505 }
506 } else {
507 worker->prev->next = worker->next;
508 worker->next->prev = worker->prev;
509 return REMOVED;
510 }
511 }
512
static size_t choose_neighborhood(void) {
514 return static_cast<size_t>(gpr_cpu_current_cpu()) % g_num_neighborhoods;
515 }
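/* Pollsets are sharded across neighborhoods keyed by the calling thread's
   current CPU, which spreads contention on the neighborhood mutexes across
   cores. g_num_neighborhoods is set in pollset_global_init() to the core
   count, clamped to [1, MAX_NEIGHBORHOODS]. */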
516
static grpc_error* pollset_global_init(void) {
518 gpr_tls_init(&g_current_thread_pollset);
519 gpr_tls_init(&g_current_thread_worker);
520 gpr_atm_no_barrier_store(&g_active_poller, 0);
521 global_wakeup_fd.read_fd = -1;
522 grpc_error* err = grpc_wakeup_fd_init(&global_wakeup_fd);
523 if (err != GRPC_ERROR_NONE) return err;
524 struct epoll_event ev;
525 ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLET);
526 ev.data.ptr = &global_wakeup_fd;
527 if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd,
528 &ev) != 0) {
529 return GRPC_OS_ERROR(errno, "epoll_ctl");
530 }
531 g_num_neighborhoods = GPR_CLAMP(gpr_cpu_num_cores(), 1, MAX_NEIGHBORHOODS);
532 g_neighborhoods = static_cast<pollset_neighborhood*>(
533 gpr_zalloc(sizeof(*g_neighborhoods) * g_num_neighborhoods));
534 for (size_t i = 0; i < g_num_neighborhoods; i++) {
535 gpr_mu_init(&g_neighborhoods[i].mu);
536 }
537 return GRPC_ERROR_NONE;
538 }
539
static void pollset_global_shutdown(void) {
541 gpr_tls_destroy(&g_current_thread_pollset);
542 gpr_tls_destroy(&g_current_thread_worker);
543 if (global_wakeup_fd.read_fd != -1) grpc_wakeup_fd_destroy(&global_wakeup_fd);
544 for (size_t i = 0; i < g_num_neighborhoods; i++) {
545 gpr_mu_destroy(&g_neighborhoods[i].mu);
546 }
547 gpr_free(g_neighborhoods);
548 }
549
static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
551 gpr_mu_init(&pollset->mu);
552 *mu = &pollset->mu;
553 pollset->neighborhood = &g_neighborhoods[choose_neighborhood()];
554 pollset->reassigning_neighborhood = false;
555 pollset->root_worker = nullptr;
556 pollset->kicked_without_poller = false;
557 pollset->seen_inactive = true;
558 pollset->shutting_down = false;
559 pollset->shutdown_closure = nullptr;
560 pollset->begin_refs = 0;
561 pollset->next = pollset->prev = nullptr;
562 }
563
static void pollset_destroy(grpc_pollset* pollset) {
565 gpr_mu_lock(&pollset->mu);
566 if (!pollset->seen_inactive) {
567 pollset_neighborhood* neighborhood = pollset->neighborhood;
568 gpr_mu_unlock(&pollset->mu);
569 retry_lock_neighborhood:
570 gpr_mu_lock(&neighborhood->mu);
571 gpr_mu_lock(&pollset->mu);
572 if (!pollset->seen_inactive) {
573 if (pollset->neighborhood != neighborhood) {
574 gpr_mu_unlock(&neighborhood->mu);
575 neighborhood = pollset->neighborhood;
576 gpr_mu_unlock(&pollset->mu);
577 goto retry_lock_neighborhood;
578 }
579 pollset->prev->next = pollset->next;
580 pollset->next->prev = pollset->prev;
581 if (pollset == pollset->neighborhood->active_root) {
582 pollset->neighborhood->active_root =
583 pollset->next == pollset ? nullptr : pollset->next;
584 }
585 }
586 gpr_mu_unlock(&pollset->neighborhood->mu);
587 }
588 gpr_mu_unlock(&pollset->mu);
589 gpr_mu_destroy(&pollset->mu);
590 }
591
static grpc_error* pollset_kick_all(grpc_pollset* pollset) {
593 GPR_TIMER_SCOPE("pollset_kick_all", 0);
594 grpc_error* error = GRPC_ERROR_NONE;
595 if (pollset->root_worker != nullptr) {
596 grpc_pollset_worker* worker = pollset->root_worker;
597 do {
598 GRPC_STATS_INC_POLLSET_KICK();
599 switch (worker->state) {
600 case KICKED:
601 GRPC_STATS_INC_POLLSET_KICKED_AGAIN();
602 break;
603 case UNKICKED:
604 SET_KICK_STATE(worker, KICKED);
605 if (worker->initialized_cv) {
606 GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
607 gpr_cv_signal(&worker->cv);
608 }
609 break;
610 case DESIGNATED_POLLER:
611 GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD();
612 SET_KICK_STATE(worker, KICKED);
613 append_error(&error, grpc_wakeup_fd_wakeup(&global_wakeup_fd),
614 "pollset_kick_all");
615 break;
616 }
617
618 worker = worker->next;
619 } while (worker != pollset->root_worker);
620 }
621 // TODO(sreek): Check if we need to set 'kicked_without_poller' to true here
622 // in the else case
623 return error;
624 }
625
static void pollset_maybe_finish_shutdown(grpc_pollset* pollset) {
627 if (pollset->shutdown_closure != nullptr && pollset->root_worker == nullptr &&
628 pollset->begin_refs == 0) {
629 GPR_TIMER_MARK("pollset_finish_shutdown", 0);
630 grpc_core::ExecCtx::Run(DEBUG_LOCATION, pollset->shutdown_closure,
631 GRPC_ERROR_NONE);
632 pollset->shutdown_closure = nullptr;
633 }
634 }
635
static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
637 GPR_TIMER_SCOPE("pollset_shutdown", 0);
638 GPR_ASSERT(pollset->shutdown_closure == nullptr);
639 GPR_ASSERT(!pollset->shutting_down);
640 pollset->shutdown_closure = closure;
641 pollset->shutting_down = true;
642 GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(pollset));
643 pollset_maybe_finish_shutdown(pollset);
644 }
645
static int poll_deadline_to_millis_timeout(grpc_millis millis) {
647 if (millis == GRPC_MILLIS_INF_FUTURE) return -1;
648 grpc_millis delta = millis - grpc_core::ExecCtx::Get()->Now();
649 if (delta > INT_MAX) {
650 return INT_MAX;
651 } else if (delta < 0) {
652 return 0;
653 } else {
654 return static_cast<int>(delta);
655 }
656 }
657
/* Process the epoll events found by do_epoll_wait().
   - g_epoll_set.cursor points to the index of the first event to be processed
   - This function then processes up to MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION
     events and updates g_epoll_set.cursor

   NOTE ON SYNCHRONIZATION: Similar to do_epoll_wait(), this function is only
   called by the g_active_poller thread. So there is no need for
   synchronization when accessing fields in g_epoll_set */
static grpc_error* process_epoll_events(grpc_pollset* /*pollset*/) {
667 GPR_TIMER_SCOPE("process_epoll_events", 0);
668
669 static const char* err_desc = "process_events";
670 grpc_error* error = GRPC_ERROR_NONE;
671 long num_events = gpr_atm_acq_load(&g_epoll_set.num_events);
672 long cursor = gpr_atm_acq_load(&g_epoll_set.cursor);
673 for (int idx = 0;
674 (idx < MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION) && cursor != num_events;
675 idx++) {
676 long c = cursor++;
677 struct epoll_event* ev = &g_epoll_set.events[c];
678 void* data_ptr = ev->data.ptr;
679
680 if (data_ptr == &global_wakeup_fd) {
681 append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
682 err_desc);
683 } else {
684 grpc_fd* fd = reinterpret_cast<grpc_fd*>(
685 reinterpret_cast<intptr_t>(data_ptr) & ~static_cast<intptr_t>(1));
686 bool track_err =
687 reinterpret_cast<intptr_t>(data_ptr) & static_cast<intptr_t>(1);
688 bool cancel = (ev->events & EPOLLHUP) != 0;
689 bool error = (ev->events & EPOLLERR) != 0;
690 bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0;
691 bool write_ev = (ev->events & EPOLLOUT) != 0;
692 bool err_fallback = error && !track_err;
693
694 if (error && !err_fallback) {
695 fd_has_errors(fd);
696 }
697
698 if (read_ev || cancel || err_fallback) {
699 fd_become_readable(fd);
700 }
701
702 if (write_ev || cancel || err_fallback) {
703 fd_become_writable(fd);
704 }
705 }
706 }
707 gpr_atm_rel_store(&g_epoll_set.cursor, cursor);
708 return error;
709 }
710
/* Do epoll_wait and store the events in the g_epoll_set.events field. This
   does not "process" any of the events yet; that is done in
   process_epoll_events(). See process_epoll_events() for more details.

   NOTE ON SYNCHRONIZATION: At any point of time, only the g_active_poller
   (i.e. the designated poller thread) will be calling this function. So there
   is no need for any synchronization when accessing fields in g_epoll_set */
static grpc_error* do_epoll_wait(grpc_pollset* ps, grpc_millis deadline) {
719 GPR_TIMER_SCOPE("do_epoll_wait", 0);
720
721 int r;
722 int timeout = poll_deadline_to_millis_timeout(deadline);
723 if (timeout != 0) {
724 GRPC_SCHEDULING_START_BLOCKING_REGION;
725 }
726 do {
727 GRPC_STATS_INC_SYSCALL_POLL();
728 r = epoll_wait(g_epoll_set.epfd, g_epoll_set.events, MAX_EPOLL_EVENTS,
729 timeout);
730 } while (r < 0 && errno == EINTR);
731 if (timeout != 0) {
732 GRPC_SCHEDULING_END_BLOCKING_REGION;
733 }
734
735 if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");
736
737 GRPC_STATS_INC_POLL_EVENTS_RETURNED(r);
738
739 if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
740 gpr_log(GPR_INFO, "ps: %p poll got %d events", ps, r);
741 }
742
743 gpr_atm_rel_store(&g_epoll_set.num_events, r);
744 gpr_atm_rel_store(&g_epoll_set.cursor, 0);
745
746 return GRPC_ERROR_NONE;
747 }
748
static bool begin_worker(grpc_pollset* pollset, grpc_pollset_worker* worker,
                         grpc_pollset_worker** worker_hdl,
                         grpc_millis deadline) {
752 GPR_TIMER_SCOPE("begin_worker", 0);
753 if (worker_hdl != nullptr) *worker_hdl = worker;
754 worker->initialized_cv = false;
755 SET_KICK_STATE(worker, UNKICKED);
756 worker->schedule_on_end_work = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
757 pollset->begin_refs++;
758
759 if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
760 gpr_log(GPR_INFO, "PS:%p BEGIN_STARTS:%p", pollset, worker);
761 }
762
763 if (pollset->seen_inactive) {
764 // pollset has been observed to be inactive, we need to move back to the
765 // active list
766 bool is_reassigning = false;
767 if (!pollset->reassigning_neighborhood) {
768 is_reassigning = true;
769 pollset->reassigning_neighborhood = true;
770 pollset->neighborhood = &g_neighborhoods[choose_neighborhood()];
771 }
772 pollset_neighborhood* neighborhood = pollset->neighborhood;
773 gpr_mu_unlock(&pollset->mu);
774 // pollset unlocked: state may change (even worker->kick_state)
775 retry_lock_neighborhood:
776 gpr_mu_lock(&neighborhood->mu);
777 gpr_mu_lock(&pollset->mu);
778 if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
779 gpr_log(GPR_INFO, "PS:%p BEGIN_REORG:%p kick_state=%s is_reassigning=%d",
780 pollset, worker, kick_state_string(worker->state),
781 is_reassigning);
782 }
783 if (pollset->seen_inactive) {
784 if (neighborhood != pollset->neighborhood) {
785 gpr_mu_unlock(&neighborhood->mu);
786 neighborhood = pollset->neighborhood;
787 gpr_mu_unlock(&pollset->mu);
788 goto retry_lock_neighborhood;
789 }
790
791 /* In the brief time we released the pollset locks above, the worker MAY
792 have been kicked. In this case, the worker should get out of this
793 pollset ASAP and hence this should neither add the pollset to
794 neighborhood nor mark the pollset as active.
795
796 On a side note, the only way a worker's kick state could have changed
797 at this point is if it were "kicked specifically". Since the worker has
798 not added itself to the pollset yet (by calling worker_insert()), it is
799 not visible in the "kick any" path yet */
800 if (worker->state == UNKICKED) {
801 pollset->seen_inactive = false;
802 if (neighborhood->active_root == nullptr) {
803 neighborhood->active_root = pollset->next = pollset->prev = pollset;
804 /* Make this the designated poller if there isn't one already */
805 if (worker->state == UNKICKED &&
806 gpr_atm_no_barrier_cas(&g_active_poller, 0,
807 reinterpret_cast<gpr_atm>(worker))) {
808 SET_KICK_STATE(worker, DESIGNATED_POLLER);
809 }
810 } else {
811 pollset->next = neighborhood->active_root;
812 pollset->prev = pollset->next->prev;
813 pollset->next->prev = pollset->prev->next = pollset;
814 }
815 }
816 }
817 if (is_reassigning) {
818 GPR_ASSERT(pollset->reassigning_neighborhood);
819 pollset->reassigning_neighborhood = false;
820 }
821 gpr_mu_unlock(&neighborhood->mu);
822 }
823
824 worker_insert(pollset, worker);
825 pollset->begin_refs--;
826 if (worker->state == UNKICKED && !pollset->kicked_without_poller) {
827 GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
828 worker->initialized_cv = true;
829 gpr_cv_init(&worker->cv);
830 while (worker->state == UNKICKED && !pollset->shutting_down) {
831 if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
832 gpr_log(GPR_INFO, "PS:%p BEGIN_WAIT:%p kick_state=%s shutdown=%d",
833 pollset, worker, kick_state_string(worker->state),
834 pollset->shutting_down);
835 }
836
837 if (gpr_cv_wait(&worker->cv, &pollset->mu,
838 grpc_millis_to_timespec(deadline, GPR_CLOCK_MONOTONIC)) &&
839 worker->state == UNKICKED) {
840 /* If gpr_cv_wait returns true (i.e a timeout), pretend that the worker
841 received a kick */
842 SET_KICK_STATE(worker, KICKED);
843 }
844 }
845 grpc_core::ExecCtx::Get()->InvalidateNow();
846 }
847
848 if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
849 gpr_log(GPR_INFO,
850 "PS:%p BEGIN_DONE:%p kick_state=%s shutdown=%d "
851 "kicked_without_poller: %d",
852 pollset, worker, kick_state_string(worker->state),
853 pollset->shutting_down, pollset->kicked_without_poller);
854 }
855
856 /* We release pollset lock in this function at a couple of places:
857 * 1. Briefly when assigning pollset to a neighborhood
858 * 2. When doing gpr_cv_wait()
859 * It is possible that 'kicked_without_poller' was set to true during (1) and
860 * 'shutting_down' is set to true during (1) or (2). If either of them is
861 * true, this worker cannot do polling */
862 /* TODO(sreek): Perhaps there is a better way to handle kicked_without_poller
863 * case; especially when the worker is the DESIGNATED_POLLER */
864
865 if (pollset->kicked_without_poller) {
866 pollset->kicked_without_poller = false;
867 return false;
868 }
869
870 return worker->state == DESIGNATED_POLLER && !pollset->shutting_down;
871 }
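/* A note on begin_worker()'s return value: true means this worker holds the
   DESIGNATED_POLLER role (and the pollset is not shutting down), so
   pollset_work() should go on to call do_epoll_wait()/process_epoll_events();
   false means the worker was kicked, the pollset was kicked without a poller,
   or shutdown started, so polling is skipped for this call. */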
872
static bool check_neighborhood_for_available_poller(
    pollset_neighborhood* neighborhood) {
875 GPR_TIMER_SCOPE("check_neighborhood_for_available_poller", 0);
876 bool found_worker = false;
877 do {
878 grpc_pollset* inspect = neighborhood->active_root;
879 if (inspect == nullptr) {
880 break;
881 }
882 gpr_mu_lock(&inspect->mu);
883 GPR_ASSERT(!inspect->seen_inactive);
884 grpc_pollset_worker* inspect_worker = inspect->root_worker;
885 if (inspect_worker != nullptr) {
886 do {
887 switch (inspect_worker->state) {
888 case UNKICKED:
889 if (gpr_atm_no_barrier_cas(
890 &g_active_poller, 0,
891 reinterpret_cast<gpr_atm>(inspect_worker))) {
892 if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
893 gpr_log(GPR_INFO, " .. choose next poller to be %p",
894 inspect_worker);
895 }
896 SET_KICK_STATE(inspect_worker, DESIGNATED_POLLER);
897 if (inspect_worker->initialized_cv) {
898 GPR_TIMER_MARK("signal worker", 0);
899 GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
900 gpr_cv_signal(&inspect_worker->cv);
901 }
902 } else {
903 if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
904 gpr_log(GPR_INFO, " .. beaten to choose next poller");
905 }
906 }
907 // even if we didn't win the cas, there's a worker, we can stop
908 found_worker = true;
909 break;
910 case KICKED:
911 break;
912 case DESIGNATED_POLLER:
913 found_worker = true; // ok, so someone else found the worker, but
914 // we'll accept that
915 break;
916 }
917 inspect_worker = inspect_worker->next;
918 } while (!found_worker && inspect_worker != inspect->root_worker);
919 }
920 if (!found_worker) {
921 if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
922 gpr_log(GPR_INFO, " .. mark pollset %p inactive", inspect);
923 }
924 inspect->seen_inactive = true;
925 if (inspect == neighborhood->active_root) {
926 neighborhood->active_root =
927 inspect->next == inspect ? nullptr : inspect->next;
928 }
929 inspect->next->prev = inspect->prev;
930 inspect->prev->next = inspect->next;
931 inspect->next = inspect->prev = nullptr;
932 }
933 gpr_mu_unlock(&inspect->mu);
934 } while (!found_worker);
935 return found_worker;
936 }
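/* check_neighborhood_for_available_poller() walks the neighborhood's list of
   active pollsets looking for an UNKICKED worker that it can promote to
   DESIGNATED_POLLER via a CAS on g_active_poller. Pollsets on which no such
   worker is found are marked seen_inactive and unlinked from the
   neighborhood, so later scans do not revisit them. */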
937
static void end_worker(grpc_pollset* pollset, grpc_pollset_worker* worker,
                       grpc_pollset_worker** worker_hdl) {
940 GPR_TIMER_SCOPE("end_worker", 0);
941 if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
942 gpr_log(GPR_INFO, "PS:%p END_WORKER:%p", pollset, worker);
943 }
944 if (worker_hdl != nullptr) *worker_hdl = nullptr;
945 /* Make sure we appear kicked */
946 SET_KICK_STATE(worker, KICKED);
947 grpc_closure_list_move(&worker->schedule_on_end_work,
948 grpc_core::ExecCtx::Get()->closure_list());
949 if (gpr_atm_no_barrier_load(&g_active_poller) ==
950 reinterpret_cast<gpr_atm>(worker)) {
951 if (worker->next != worker && worker->next->state == UNKICKED) {
952 if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
953 gpr_log(GPR_INFO, " .. choose next poller to be peer %p", worker);
954 }
955 GPR_ASSERT(worker->next->initialized_cv);
956 gpr_atm_no_barrier_store(&g_active_poller, (gpr_atm)worker->next);
957 SET_KICK_STATE(worker->next, DESIGNATED_POLLER);
958 GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
959 gpr_cv_signal(&worker->next->cv);
960 if (grpc_core::ExecCtx::Get()->HasWork()) {
961 gpr_mu_unlock(&pollset->mu);
962 grpc_core::ExecCtx::Get()->Flush();
963 gpr_mu_lock(&pollset->mu);
964 }
965 } else {
966 gpr_atm_no_barrier_store(&g_active_poller, 0);
967 size_t poller_neighborhood_idx =
968 static_cast<size_t>(pollset->neighborhood - g_neighborhoods);
969 gpr_mu_unlock(&pollset->mu);
970 bool found_worker = false;
971 bool scan_state[MAX_NEIGHBORHOODS];
972 for (size_t i = 0; !found_worker && i < g_num_neighborhoods; i++) {
973 pollset_neighborhood* neighborhood =
974 &g_neighborhoods[(poller_neighborhood_idx + i) %
975 g_num_neighborhoods];
976 if (gpr_mu_trylock(&neighborhood->mu)) {
977 found_worker = check_neighborhood_for_available_poller(neighborhood);
978 gpr_mu_unlock(&neighborhood->mu);
979 scan_state[i] = true;
980 } else {
981 scan_state[i] = false;
982 }
983 }
984 for (size_t i = 0; !found_worker && i < g_num_neighborhoods; i++) {
985 if (scan_state[i]) continue;
986 pollset_neighborhood* neighborhood =
987 &g_neighborhoods[(poller_neighborhood_idx + i) %
988 g_num_neighborhoods];
989 gpr_mu_lock(&neighborhood->mu);
990 found_worker = check_neighborhood_for_available_poller(neighborhood);
991 gpr_mu_unlock(&neighborhood->mu);
992 }
993 grpc_core::ExecCtx::Get()->Flush();
994 gpr_mu_lock(&pollset->mu);
995 }
996 } else if (grpc_core::ExecCtx::Get()->HasWork()) {
997 gpr_mu_unlock(&pollset->mu);
998 grpc_core::ExecCtx::Get()->Flush();
999 gpr_mu_lock(&pollset->mu);
1000 }
1001 if (worker->initialized_cv) {
1002 gpr_cv_destroy(&worker->cv);
1003 }
1004 if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
1005 gpr_log(GPR_INFO, " .. remove worker");
1006 }
1007 if (EMPTIED == worker_remove(pollset, worker)) {
1008 pollset_maybe_finish_shutdown(pollset);
1009 }
1010 GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
1011 }
1012
/* pollset->mu must be held by the caller before calling this.
   pollset_work() may temporarily release the lock during the course of its
   execution, but it will always re-acquire it and ensure that it is held by
   the time the function returns */
static grpc_error* pollset_work(grpc_pollset* ps,
                                grpc_pollset_worker** worker_hdl,
                                grpc_millis deadline) {
1020 GPR_TIMER_SCOPE("pollset_work", 0);
1021 grpc_pollset_worker worker;
1022 grpc_error* error = GRPC_ERROR_NONE;
1023 static const char* err_desc = "pollset_work";
1024 if (ps->kicked_without_poller) {
1025 ps->kicked_without_poller = false;
1026 return GRPC_ERROR_NONE;
1027 }
1028
1029 if (begin_worker(ps, &worker, worker_hdl, deadline)) {
1030 gpr_tls_set(&g_current_thread_pollset, (intptr_t)ps);
1031 gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
1032 GPR_ASSERT(!ps->shutting_down);
1033 GPR_ASSERT(!ps->seen_inactive);
1034
1035 gpr_mu_unlock(&ps->mu); /* unlock */
    /* This is the designated polling thread at this point and should ideally
       do polling. However, if there are unprocessed events left from a
       previous call to do_epoll_wait(), skip calling epoll_wait() in this
       iteration and process the pending epoll events.

       The reason for decoupling do_epoll_wait and process_epoll_events is to
       better distribute the work (i.e. handling epoll events) across multiple
       threads.

       process_epoll_events() returns very quickly: it just queues the work on
       exec_ctx but does not execute it (the actual execution, or more
       accurately grpc_core::ExecCtx::Get()->Flush(), happens in end_worker()
       AFTER selecting a designated poller). So we are not waiting long
       periods without a designated poller */
1050 if (gpr_atm_acq_load(&g_epoll_set.cursor) ==
1051 gpr_atm_acq_load(&g_epoll_set.num_events)) {
1052 append_error(&error, do_epoll_wait(ps, deadline), err_desc);
1053 }
1054 append_error(&error, process_epoll_events(ps), err_desc);
1055
1056 gpr_mu_lock(&ps->mu); /* lock */
1057
1058 gpr_tls_set(&g_current_thread_worker, 0);
1059 } else {
1060 gpr_tls_set(&g_current_thread_pollset, (intptr_t)ps);
1061 }
1062 end_worker(ps, &worker, worker_hdl);
1063
1064 gpr_tls_set(&g_current_thread_pollset, 0);
1065 return error;
1066 }
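/* pollset_work() therefore has three phases: begin_worker() decides whether
   this thread should poll, the middle section runs do_epoll_wait() (only when
   the previous batch of events has been fully consumed) plus
   process_epoll_events(), and end_worker() hands off the poller role and
   flushes the queued closures. */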
1067
static grpc_error* pollset_kick(grpc_pollset* pollset,
                                grpc_pollset_worker* specific_worker) {
1070 GPR_TIMER_SCOPE("pollset_kick", 0);
1071 GRPC_STATS_INC_POLLSET_KICK();
1072 grpc_error* ret_err = GRPC_ERROR_NONE;
1073 if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
1074 std::vector<std::string> log;
1075 log.push_back(absl::StrFormat(
1076 "PS:%p KICK:%p curps=%p curworker=%p root=%p", pollset, specific_worker,
1077 reinterpret_cast<void*>(gpr_tls_get(&g_current_thread_pollset)),
1078 reinterpret_cast<void*>(gpr_tls_get(&g_current_thread_worker)),
1079 pollset->root_worker));
1080 if (pollset->root_worker != nullptr) {
1081 log.push_back(absl::StrFormat(
1082 " {kick_state=%s next=%p {kick_state=%s}}",
1083 kick_state_string(pollset->root_worker->state),
1084 pollset->root_worker->next,
1085 kick_state_string(pollset->root_worker->next->state)));
1086 }
1087 if (specific_worker != nullptr) {
1088 log.push_back(absl::StrFormat(" worker_kick_state=%s",
1089 kick_state_string(specific_worker->state)));
1090 }
1091 gpr_log(GPR_DEBUG, "%s", absl::StrJoin(log, "").c_str());
1092 }
1093
1094 if (specific_worker == nullptr) {
1095 if (gpr_tls_get(&g_current_thread_pollset) !=
1096 reinterpret_cast<intptr_t>(pollset)) {
1097 grpc_pollset_worker* root_worker = pollset->root_worker;
1098 if (root_worker == nullptr) {
1099 GRPC_STATS_INC_POLLSET_KICKED_WITHOUT_POLLER();
1100 pollset->kicked_without_poller = true;
1101 if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
1102 gpr_log(GPR_INFO, " .. kicked_without_poller");
1103 }
1104 goto done;
1105 }
1106 grpc_pollset_worker* next_worker = root_worker->next;
1107 if (root_worker->state == KICKED) {
1108 GRPC_STATS_INC_POLLSET_KICKED_AGAIN();
1109 if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
1110 gpr_log(GPR_INFO, " .. already kicked %p", root_worker);
1111 }
1112 SET_KICK_STATE(root_worker, KICKED);
1113 goto done;
1114 } else if (next_worker->state == KICKED) {
1115 GRPC_STATS_INC_POLLSET_KICKED_AGAIN();
1116 if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
1117 gpr_log(GPR_INFO, " .. already kicked %p", next_worker);
1118 }
1119 SET_KICK_STATE(next_worker, KICKED);
1120 goto done;
1121 } else if (root_worker == next_worker && // only try and wake up a poller
1122 // if there is no next worker
1123 root_worker ==
1124 reinterpret_cast<grpc_pollset_worker*>(
1125 gpr_atm_no_barrier_load(&g_active_poller))) {
1126 GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD();
1127 if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
1128 gpr_log(GPR_INFO, " .. kicked %p", root_worker);
1129 }
1130 SET_KICK_STATE(root_worker, KICKED);
1131 ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
1132 goto done;
1133 } else if (next_worker->state == UNKICKED) {
1134 GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
1135 if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
1136 gpr_log(GPR_INFO, " .. kicked %p", next_worker);
1137 }
1138 GPR_ASSERT(next_worker->initialized_cv);
1139 SET_KICK_STATE(next_worker, KICKED);
1140 gpr_cv_signal(&next_worker->cv);
1141 goto done;
1142 } else if (next_worker->state == DESIGNATED_POLLER) {
1143 if (root_worker->state != DESIGNATED_POLLER) {
1144 if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
1145 gpr_log(
1146 GPR_INFO,
1147 " .. kicked root non-poller %p (initialized_cv=%d) (poller=%p)",
1148 root_worker, root_worker->initialized_cv, next_worker);
1149 }
1150 SET_KICK_STATE(root_worker, KICKED);
1151 if (root_worker->initialized_cv) {
1152 GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
1153 gpr_cv_signal(&root_worker->cv);
1154 }
1155 goto done;
1156 } else {
1157 GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD();
1158 if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
1159 gpr_log(GPR_INFO, " .. non-root poller %p (root=%p)", next_worker,
1160 root_worker);
1161 }
1162 SET_KICK_STATE(next_worker, KICKED);
1163 ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
1164 goto done;
1165 }
1166 } else {
1167 GRPC_STATS_INC_POLLSET_KICKED_AGAIN();
1168 GPR_ASSERT(next_worker->state == KICKED);
1169 SET_KICK_STATE(next_worker, KICKED);
1170 goto done;
1171 }
1172 } else {
1173 GRPC_STATS_INC_POLLSET_KICK_OWN_THREAD();
1174 if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
1175 gpr_log(GPR_INFO, " .. kicked while waking up");
1176 }
1177 goto done;
1178 }
1179
1180 GPR_UNREACHABLE_CODE(goto done);
1181 }
1182
1183 if (specific_worker->state == KICKED) {
1184 if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
1185 gpr_log(GPR_INFO, " .. specific worker already kicked");
1186 }
1187 goto done;
1188 } else if (gpr_tls_get(&g_current_thread_worker) ==
1189 reinterpret_cast<intptr_t>(specific_worker)) {
1190 GRPC_STATS_INC_POLLSET_KICK_OWN_THREAD();
1191 if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
1192 gpr_log(GPR_INFO, " .. mark %p kicked", specific_worker);
1193 }
1194 SET_KICK_STATE(specific_worker, KICKED);
1195 goto done;
1196 } else if (specific_worker ==
1197 reinterpret_cast<grpc_pollset_worker*>(
1198 gpr_atm_no_barrier_load(&g_active_poller))) {
1199 GRPC_STATS_INC_POLLSET_KICK_WAKEUP_FD();
1200 if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
1201 gpr_log(GPR_INFO, " .. kick active poller");
1202 }
1203 SET_KICK_STATE(specific_worker, KICKED);
1204 ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
1205 goto done;
1206 } else if (specific_worker->initialized_cv) {
1207 GRPC_STATS_INC_POLLSET_KICK_WAKEUP_CV();
1208 if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
1209 gpr_log(GPR_INFO, " .. kick waiting worker");
1210 }
1211 SET_KICK_STATE(specific_worker, KICKED);
1212 gpr_cv_signal(&specific_worker->cv);
1213 goto done;
1214 } else {
1215 GRPC_STATS_INC_POLLSET_KICKED_AGAIN();
1216 if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_trace)) {
1217 gpr_log(GPR_INFO, " .. kick non-waiting worker");
1218 }
1219 SET_KICK_STATE(specific_worker, KICKED);
1220 goto done;
1221 }
1222 done:
1223 return ret_err;
1224 }
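/* Summary of the cases above: with specific_worker == nullptr we kick "some"
   worker -- a no-op if the root or next worker is already KICKED, a cv signal
   for an UNKICKED next worker, and a write to global_wakeup_fd only when the
   target is the current designated poller (it is also a no-op when called
   from a thread already working on this pollset). With a specific worker we
   kick it directly, again choosing between a cv signal and the wakeup fd
   depending on whether it is the designated poller. */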
1225
static void pollset_add_fd(grpc_pollset* /*pollset*/, grpc_fd* /*fd*/) {}
1227
1228 /*******************************************************************************
1229 * Pollset-set Definitions
1230 */
1231
static grpc_pollset_set* pollset_set_create(void) {
1233 return reinterpret_cast<grpc_pollset_set*>(static_cast<intptr_t>(0xdeafbeef));
1234 }
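/* The 0xdeafbeef value is just an arbitrary non-null sentinel: callers treat
   grpc_pollset_set* as opaque, and this engine never dereferences it (all of
   the pollset_set operations below are no-ops). */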
1235
static void pollset_set_destroy(grpc_pollset_set* /*pss*/) {}
1237
static void pollset_set_add_fd(grpc_pollset_set* /*pss*/, grpc_fd* /*fd*/) {}
1239
static void pollset_set_del_fd(grpc_pollset_set* /*pss*/, grpc_fd* /*fd*/) {}
1241
static void pollset_set_add_pollset(grpc_pollset_set* /*pss*/,
                                    grpc_pollset* /*ps*/) {}
1244
static void pollset_set_del_pollset(grpc_pollset_set* /*pss*/,
                                    grpc_pollset* /*ps*/) {}
1247
static void pollset_set_add_pollset_set(grpc_pollset_set* /*bag*/,
                                        grpc_pollset_set* /*item*/) {}
1250
static void pollset_set_del_pollset_set(grpc_pollset_set* /*bag*/,
                                        grpc_pollset_set* /*item*/) {}
1253
1254 /*******************************************************************************
1255 * Event engine binding
1256 */
1257
static bool is_any_background_poller_thread(void) { return false; }
1259
static void shutdown_background_closure(void) {}
1261
static bool add_closure_to_background_poller(grpc_closure* /*closure*/,
                                             grpc_error* /*error*/) {
1264 return false;
1265 }
1266
static void shutdown_engine(void) {
1268 fd_global_shutdown();
1269 pollset_global_shutdown();
1270 epoll_set_shutdown();
1271 if (grpc_core::Fork::Enabled()) {
1272 gpr_mu_destroy(&fork_fd_list_mu);
1273 grpc_core::Fork::SetResetChildPollingEngineFunc(nullptr);
1274 }
1275 }
1276
1277 static const grpc_event_engine_vtable vtable = {
1278 sizeof(grpc_pollset),
1279 true,
1280 false,
1281
1282 fd_create,
1283 fd_wrapped_fd,
1284 fd_orphan,
1285 fd_shutdown,
1286 fd_notify_on_read,
1287 fd_notify_on_write,
1288 fd_notify_on_error,
1289 fd_become_readable,
1290 fd_become_writable,
1291 fd_has_errors,
1292 fd_is_shutdown,
1293
1294 pollset_init,
1295 pollset_shutdown,
1296 pollset_destroy,
1297 pollset_work,
1298 pollset_kick,
1299 pollset_add_fd,
1300
1301 pollset_set_create,
1302 pollset_set_destroy,
1303 pollset_set_add_pollset,
1304 pollset_set_del_pollset,
1305 pollset_set_add_pollset_set,
1306 pollset_set_del_pollset_set,
1307 pollset_set_add_fd,
1308 pollset_set_del_fd,
1309
1310 is_any_background_poller_thread,
1311 shutdown_background_closure,
1312 shutdown_engine,
1313 add_closure_to_background_poller,
1314 };
1315
1316 /* Called by the child process's post-fork handler to close open fds, including
1317 * the global epoll fd. This allows gRPC to shutdown in the child process
1318 * without interfering with connections or RPCs ongoing in the parent. */
static void reset_event_manager_on_fork() {
1320 gpr_mu_lock(&fork_fd_list_mu);
1321 while (fork_fd_list_head != nullptr) {
1322 close(fork_fd_list_head->fd);
1323 fork_fd_list_head->fd = -1;
1324 fork_fd_list_head = fork_fd_list_head->fork_fd_list->next;
1325 }
1326 gpr_mu_unlock(&fork_fd_list_mu);
1327 shutdown_engine();
1328 grpc_init_epoll1_linux(true);
1329 }
1330
/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
 * Create an epoll fd (epoll_set_init() takes care of that) to make sure
 * epoll support is actually available. */
const grpc_event_engine_vtable* grpc_init_epoll1_linux(
    bool /*explicit_request*/) {
1336 if (!grpc_has_wakeup_fd()) {
1337 gpr_log(GPR_ERROR, "Skipping epoll1 because of no wakeup fd.");
1338 return nullptr;
1339 }
1340
1341 if (!epoll_set_init()) {
1342 return nullptr;
1343 }
1344
1345 fd_global_init();
1346
1347 if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
1348 fd_global_shutdown();
1349 epoll_set_shutdown();
1350 return nullptr;
1351 }
1352
1353 if (grpc_core::Fork::Enabled()) {
1354 gpr_mu_init(&fork_fd_list_mu);
1355 grpc_core::Fork::SetResetChildPollingEngineFunc(
1356 reset_event_manager_on_fork);
1357 }
1358 return &vtable;
1359 }
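/* A brief usage note (how the surrounding ev_posix machinery typically picks
   this up, not something defined in this file): this factory is registered in
   the engine list in ev_posix.cc and is usually selected by name ("epoll1",
   e.g. via the GRPC_POLL_STRATEGY environment variable). Returning nullptr
   here signals that epoll or a wakeup fd is unavailable and lets the caller
   fall back to another polling engine, such as the poll-based one. */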
1360
1361 #else /* defined(GRPC_LINUX_EPOLL) */
1362 #if defined(GRPC_POSIX_SOCKET_EV_EPOLL1)
1363 #include "src/core/lib/iomgr/ev_epoll1_linux.h"
1364 /* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return
1365 * NULL */
const grpc_event_engine_vtable* grpc_init_epoll1_linux(
    bool /*explicit_request*/) {
1368 return nullptr;
1369 }
1370 #endif /* defined(GRPC_POSIX_SOCKET_EV_EPOLL1) */
1371 #endif /* !defined(GRPC_LINUX_EPOLL) */
1372