//
//
// Copyright 2017 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//

#include <grpc/support/port_platform.h>

#include "src/core/lib/iomgr/port.h"
#include "src/core/util/crash.h"

// This polling engine is only relevant on Linux kernels that support epoll,
// via either epoll_create() or epoll_create1().
#ifdef GRPC_LINUX_EPOLL
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <grpc/support/alloc.h>
#include <grpc/support/cpu.h>
#include <limits.h>
#include <poll.h>
#include <pthread.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

#include <string>
#include <vector>

#include "absl/log/check.h"
#include "absl/log/log.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_format.h"
#include "absl/strings/str_join.h"
#include "src/core/lib/iomgr/block_annotate.h"
#include "src/core/lib/iomgr/ev_epoll1_linux.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/lockfree_event.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/telemetry/stats.h"
#include "src/core/telemetry/stats_data.h"
#include "src/core/util/manual_constructor.h"
#include "src/core/util/strerror.h"
#include "src/core/util/string.h"
#include "src/core/util/useful.h"

static grpc_wakeup_fd global_wakeup_fd;
static bool g_is_shutdown = true;

//******************************************************************************
// Singleton epoll set related fields
//

#define MAX_EPOLL_EVENTS 100
#define MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION 1
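// MAX_EPOLL_EVENTS bounds the events buffer handed to epoll_wait(), while
// MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION caps how many of those events a
// single process_epoll_events() call consumes, so that event handling is
// distributed across multiple poller threads rather than all done by one
// (see pollset_work()).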

// NOTE ON SYNCHRONIZATION:
// - Fields in this struct are only modified by the designated poller. Hence
//   there is no need for any locks to protect the struct.
// - The num_events and cursor fields have to be of atomic type solely to
//   provide memory visibility guarantees: in case of multiple pollers, the
//   designated polling thread keeps changing, so the thread that wrote these
//   values may be different from the thread reading them.
//
typedef struct epoll_set {
  int epfd;

  // The epoll_events after the last call to epoll_wait()
  struct epoll_event events[MAX_EPOLL_EVENTS];

  // The number of epoll_events after the last call to epoll_wait()
  gpr_atm num_events;

  // Index of the first event in epoll_events that has to be processed. This
  // field is only valid if num_events > 0
  gpr_atm cursor;
} epoll_set;

// The global singleton epoll set
static epoll_set g_epoll_set;

static int epoll_create_and_cloexec() {
#ifdef GRPC_LINUX_EPOLL_CREATE1
  int fd = epoll_create1(EPOLL_CLOEXEC);
  if (fd < 0) {
    LOG(ERROR) << "epoll_create1 unavailable";
  }
#else
  int fd = epoll_create(MAX_EPOLL_EVENTS);
  if (fd < 0) {
    LOG(ERROR) << "epoll_create unavailable";
  } else if (fcntl(fd, F_SETFD, FD_CLOEXEC) != 0) {
    LOG(ERROR) << "fcntl following epoll_create failed";
    return -1;
  }
#endif
  return fd;
}

// Must be called *only* once
static bool epoll_set_init() {
  g_epoll_set.epfd = epoll_create_and_cloexec();
  if (g_epoll_set.epfd < 0) {
    return false;
  }

  GRPC_TRACE_LOG(polling, INFO) << "grpc epoll fd: " << g_epoll_set.epfd;
  gpr_atm_no_barrier_store(&g_epoll_set.num_events, 0);
  gpr_atm_no_barrier_store(&g_epoll_set.cursor, 0);
  return true;
}

// epoll_set_init() MUST be called before calling this.
static void epoll_set_shutdown() {
  if (g_epoll_set.epfd >= 0) {
    close(g_epoll_set.epfd);
    g_epoll_set.epfd = -1;
  }
}

//******************************************************************************
// Fd Declarations
//

// Only used when GRPC_ENABLE_FORK_SUPPORT=1
struct grpc_fork_fd_list {
  grpc_fd* fd;
  grpc_fd* next;
  grpc_fd* prev;
};

struct grpc_fd {
  int fd;

  grpc_core::ManualConstructor<grpc_core::LockfreeEvent> read_closure;
  grpc_core::ManualConstructor<grpc_core::LockfreeEvent> write_closure;
  grpc_core::ManualConstructor<grpc_core::LockfreeEvent> error_closure;

  struct grpc_fd* freelist_next;

  grpc_iomgr_object iomgr_object;

  // Only used when GRPC_ENABLE_FORK_SUPPORT=1
  grpc_fork_fd_list* fork_fd_list;

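  // Set via fd_set_pre_allocated() when the fd was supplied pre-allocated by
  // the application; such fds are neither shutdown() nor close()d by this
  // engine (see fd_shutdown_internal() and fd_orphan()).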
  bool is_pre_allocated;
};

static void fd_global_init(void);
static void fd_global_shutdown(void);

//******************************************************************************
// Pollset Declarations
//

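// A worker is UNKICKED while it waits in pollset_work(), KICKED once it has
// been told to return, and DESIGNATED_POLLER when it is the one worker allowed
// to call epoll_wait() on the global epoll set.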
typedef enum { UNKICKED, KICKED, DESIGNATED_POLLER } kick_state;

static const char* kick_state_string(kick_state st) {
  switch (st) {
    case UNKICKED:
      return "UNKICKED";
    case KICKED:
      return "KICKED";
    case DESIGNATED_POLLER:
      return "DESIGNATED_POLLER";
  }
  GPR_UNREACHABLE_CODE(return "UNKNOWN");
}

struct grpc_pollset_worker {
  kick_state state;
  int kick_state_mutator;  // which line of code last changed kick state
  bool initialized_cv;
  grpc_pollset_worker* next;
  grpc_pollset_worker* prev;
  gpr_cv cv;
  grpc_closure_list schedule_on_end_work;
};

#define SET_KICK_STATE(worker, kick_state)   \
  do {                                       \
    (worker)->state = (kick_state);          \
    (worker)->kick_state_mutator = __LINE__; \
  } while (false)

#define MAX_NEIGHBORHOODS 1024u

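// Pollsets are sharded into "neighborhoods" to reduce contention on the
// active-pollset lists. The union pads each neighborhood out to a cache line
// so the per-neighborhood mutexes do not false-share.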
typedef struct pollset_neighborhood {
  union {
    char pad[GPR_CACHELINE_SIZE];
    struct {
      gpr_mu mu;
      grpc_pollset* active_root;
    };
  };
} pollset_neighborhood;

struct grpc_pollset {
  gpr_mu mu;
  pollset_neighborhood* neighborhood;
  bool reassigning_neighborhood;
  grpc_pollset_worker* root_worker;
  bool kicked_without_poller;

  // Set to true if the pollset is observed to have no workers available to
  // poll
  bool seen_inactive;
  bool shutting_down;              // Is the pollset shutting down?
  grpc_closure* shutdown_closure;  // Called after shutdown is complete

  // Number of workers who are *about to* attach themselves to the pollset's
  // worker list
  int begin_refs;

  grpc_pollset* next;
  grpc_pollset* prev;
};

//******************************************************************************
// Pollset-set Declarations
//

struct grpc_pollset_set {
  char unused;
};

//******************************************************************************
// Common helpers
//

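// Folds 'error' into '*composite' (lazily creating the composite from 'desc'
// on the first failure). Returns true iff 'error' was OK.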
static bool append_error(grpc_error_handle* composite, grpc_error_handle error,
                         const char* desc) {
  if (error.ok()) return true;
  if (composite->ok()) {
    *composite = GRPC_ERROR_CREATE(desc);
  }
  *composite = grpc_error_add_child(*composite, error);
  return false;
}

//******************************************************************************
// Fd Definitions
//

// We need to keep a freelist not because of any concerns about malloc
// performance, but so that implementations with multiple threads in (for
// example) epoll_wait can deal with the race between pollset removal and
// incoming poll notifications.
//
// The problem is that the poller ultimately holds a reference to this
// object, so it is very difficult to know when it is safe to free it, at
// least without some expensive synchronization.
//
// If we keep the object freelisted, in the worst case losing this race just
// becomes a spurious read notification on a reused fd.
//

// The alarm system needs to be able to wake up 'some poller' sometimes
// (specifically when a new alarm needs to be triggered earlier than the next
// alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
// case occurs.

static grpc_fd* fd_freelist = nullptr;
static gpr_mu fd_freelist_mu;

// Only used when GRPC_ENABLE_FORK_SUPPORT=1
static grpc_fd* fork_fd_list_head = nullptr;
static gpr_mu fork_fd_list_mu;

static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }

static void fd_global_shutdown(void) {
  // TODO(guantaol): We don't have a reasonable explanation for this
  // lock()/unlock() pattern. It can be a valid barrier if there is at most one
  // pending lock() at this point; otherwise, there is still a possibility of a
  // use-after-free race. Need to reason about the code and/or clean it up.
  gpr_mu_lock(&fd_freelist_mu);
  gpr_mu_unlock(&fd_freelist_mu);
  while (fd_freelist != nullptr) {
    grpc_fd* fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
    gpr_free(fd);
  }
  gpr_mu_destroy(&fd_freelist_mu);
}

static void fork_fd_list_add_grpc_fd(grpc_fd* fd) {
  if (grpc_core::Fork::Enabled()) {
    gpr_mu_lock(&fork_fd_list_mu);
    fd->fork_fd_list =
        static_cast<grpc_fork_fd_list*>(gpr_malloc(sizeof(grpc_fork_fd_list)));
    fd->fork_fd_list->next = fork_fd_list_head;
    fd->fork_fd_list->prev = nullptr;
    if (fork_fd_list_head != nullptr) {
      fork_fd_list_head->fork_fd_list->prev = fd;
    }
    fork_fd_list_head = fd;
    gpr_mu_unlock(&fork_fd_list_mu);
  }
}

static void fork_fd_list_remove_grpc_fd(grpc_fd* fd) {
  if (grpc_core::Fork::Enabled()) {
    gpr_mu_lock(&fork_fd_list_mu);
    if (fork_fd_list_head == fd) {
      fork_fd_list_head = fd->fork_fd_list->next;
    }
    if (fd->fork_fd_list->prev != nullptr) {
      fd->fork_fd_list->prev->fork_fd_list->next = fd->fork_fd_list->next;
    }
    if (fd->fork_fd_list->next != nullptr) {
      fd->fork_fd_list->next->fork_fd_list->prev = fd->fork_fd_list->prev;
    }
    gpr_free(fd->fork_fd_list);
    gpr_mu_unlock(&fork_fd_list_mu);
  }
}

static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
  grpc_fd* new_fd = nullptr;

  gpr_mu_lock(&fd_freelist_mu);
  if (fd_freelist != nullptr) {
    new_fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
  }
  gpr_mu_unlock(&fd_freelist_mu);

  if (new_fd == nullptr) {
    new_fd = static_cast<grpc_fd*>(gpr_malloc(sizeof(grpc_fd)));
    new_fd->read_closure.Init();
    new_fd->write_closure.Init();
    new_fd->error_closure.Init();
  }
  new_fd->fd = fd;
  new_fd->read_closure->InitEvent();
  new_fd->write_closure->InitEvent();
  new_fd->error_closure->InitEvent();

  new_fd->freelist_next = nullptr;
  new_fd->is_pre_allocated = false;

  std::string fd_name = absl::StrCat(name, " fd=", fd);
  grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name.c_str());
  fork_fd_list_add_grpc_fd(new_fd);
#ifndef NDEBUG
  GRPC_TRACE_VLOG(fd_refcount, 2)
      << "FD " << fd << " " << new_fd << " create " << fd_name;
#endif

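  // Register the fd with the singleton epoll set once, edge-triggered, for
  // both read and write readiness. The registration is never modified
  // afterwards; readiness transitions are recorded in the fd's LockfreeEvents.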
  struct epoll_event ev;
  ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLOUT | EPOLLET);
  // Use the least significant bit of ev.data.ptr to store track_err. We expect
  // the addresses to be word aligned. We need to store track_err to avoid
  // synchronization issues when accessing it after receiving an event.
  // Accessing fd would be a data race there because the fd might have been
  // returned to the free list at that point.
  ev.data.ptr = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(new_fd) |
                                        (track_err ? 1 : 0));
  if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, fd, &ev) != 0) {
    LOG(ERROR) << "epoll_ctl failed: " << grpc_core::StrError(errno);
  }

  return new_fd;
}

static int fd_wrapped_fd(grpc_fd* fd) { return fd->fd; }

// If 'releasing_fd' is true, we are going to detach the internal fd from the
// grpc_fd structure (which means we should not call the shutdown() syscall on
// that fd).
static void fd_shutdown_internal(grpc_fd* fd, grpc_error_handle why,
                                 bool releasing_fd) {
  if (fd->read_closure->SetShutdown(why)) {
    if (!releasing_fd) {
      if (!fd->is_pre_allocated) {
        shutdown(fd->fd, SHUT_RDWR);
      }
    } else {
      // We need a phony event for earlier Linux versions: before kernel
      // 2.6.9, EPOLL_CTL_DEL required a non-null event pointer.
      epoll_event phony_event;
      if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_DEL, fd->fd, &phony_event) !=
          0) {
        LOG(ERROR) << "epoll_ctl failed: " << grpc_core::StrError(errno);
      }
    }
    fd->write_closure->SetShutdown(why);
    fd->error_closure->SetShutdown(why);
  }
}

// Might be called multiple times
static void fd_shutdown(grpc_fd* fd, grpc_error_handle why) {
  fd_shutdown_internal(fd, why, false);
}

static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
                      const char* reason) {
  grpc_error_handle error;
  bool is_release_fd = (release_fd != nullptr);

  if (!fd->read_closure->IsShutdown()) {
    fd_shutdown_internal(fd, GRPC_ERROR_CREATE(reason), is_release_fd);
  }

  // If release_fd is not NULL, we should be relinquishing control of the file
  // descriptor fd->fd (but we still own the grpc_fd structure).
  if (is_release_fd) {
    *release_fd = fd->fd;
  } else {
    if (!fd->is_pre_allocated) {
      close(fd->fd);
    }
  }

  grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_done, error);

  grpc_iomgr_unregister_object(&fd->iomgr_object);
  fork_fd_list_remove_grpc_fd(fd);
  fd->read_closure->DestroyEvent();
  fd->write_closure->DestroyEvent();
  fd->error_closure->DestroyEvent();

  gpr_mu_lock(&fd_freelist_mu);
  fd->freelist_next = fd_freelist;
  fd_freelist = fd;
  gpr_mu_unlock(&fd_freelist_mu);
}

static bool fd_is_shutdown(grpc_fd* fd) {
  return fd->read_closure->IsShutdown();
}

static void fd_notify_on_read(grpc_fd* fd, grpc_closure* closure) {
  fd->read_closure->NotifyOn(closure);
}

static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
  fd->write_closure->NotifyOn(closure);
}

static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
  fd->error_closure->NotifyOn(closure);
}

static void fd_become_readable(grpc_fd* fd) { fd->read_closure->SetReady(); }

static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); }

static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); }

static void fd_set_pre_allocated(grpc_fd* fd) { fd->is_pre_allocated = true; }

//******************************************************************************
// Pollset Definitions
//

static thread_local grpc_pollset* g_current_thread_pollset;
static thread_local grpc_pollset_worker* g_current_thread_worker;

// The designated poller
static gpr_atm g_active_poller;
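// (a grpc_pollset_worker* stored as a gpr_atm; 0 means no worker currently
// holds the designated-poller role)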

static pollset_neighborhood* g_neighborhoods;
static size_t g_num_neighborhoods;

// Return true if first in list
static bool worker_insert(grpc_pollset* pollset, grpc_pollset_worker* worker) {
  if (pollset->root_worker == nullptr) {
    pollset->root_worker = worker;
    worker->next = worker->prev = worker;
    return true;
  } else {
    worker->next = pollset->root_worker;
    worker->prev = worker->next->prev;
    worker->next->prev = worker;
    worker->prev->next = worker;
    return false;
  }
}

// Result of removing a worker from its pollset's list: EMPTIED if it was the
// last worker, NEW_ROOT if it was the root and others remain, else REMOVED
typedef enum { EMPTIED, NEW_ROOT, REMOVED } worker_remove_result;

static worker_remove_result worker_remove(grpc_pollset* pollset,
                                          grpc_pollset_worker* worker) {
  if (worker == pollset->root_worker) {
    if (worker == worker->next) {
      pollset->root_worker = nullptr;
      return EMPTIED;
    } else {
      pollset->root_worker = worker->next;
      worker->prev->next = worker->next;
      worker->next->prev = worker->prev;
      return NEW_ROOT;
    }
  } else {
    worker->prev->next = worker->next;
    worker->next->prev = worker->prev;
    return REMOVED;
  }
}

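// Shard by the CPU the calling thread happens to be running on, spreading
// pollsets roughly evenly across the neighborhoods.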
static size_t choose_neighborhood(void) {
  return static_cast<size_t>(gpr_cpu_current_cpu()) % g_num_neighborhoods;
}

static grpc_error_handle pollset_global_init(void) {
  gpr_atm_no_barrier_store(&g_active_poller, 0);
  global_wakeup_fd.read_fd = -1;
  grpc_error_handle err = grpc_wakeup_fd_init(&global_wakeup_fd);
  if (!err.ok()) return err;
  struct epoll_event ev;
  ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLET);
  ev.data.ptr = &global_wakeup_fd;
  if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, global_wakeup_fd.read_fd,
                &ev) != 0) {
    return GRPC_OS_ERROR(errno, "epoll_ctl");
  }
  g_num_neighborhoods =
      grpc_core::Clamp(gpr_cpu_num_cores(), 1u, MAX_NEIGHBORHOODS);
  g_neighborhoods = static_cast<pollset_neighborhood*>(
      gpr_zalloc(sizeof(*g_neighborhoods) * g_num_neighborhoods));
  for (size_t i = 0; i < g_num_neighborhoods; i++) {
    gpr_mu_init(&g_neighborhoods[i].mu);
  }
  return absl::OkStatus();
}

static void pollset_global_shutdown(void) {
  if (global_wakeup_fd.read_fd != -1) grpc_wakeup_fd_destroy(&global_wakeup_fd);
  for (size_t i = 0; i < g_num_neighborhoods; i++) {
    gpr_mu_destroy(&g_neighborhoods[i].mu);
  }
  gpr_free(g_neighborhoods);
}

static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
  gpr_mu_init(&pollset->mu);
  *mu = &pollset->mu;
  pollset->neighborhood = &g_neighborhoods[choose_neighborhood()];
  pollset->reassigning_neighborhood = false;
  pollset->root_worker = nullptr;
  pollset->kicked_without_poller = false;
  pollset->seen_inactive = true;
  pollset->shutting_down = false;
  pollset->shutdown_closure = nullptr;
  pollset->begin_refs = 0;
  pollset->next = pollset->prev = nullptr;
}

static void pollset_destroy(grpc_pollset* pollset) {
  gpr_mu_lock(&pollset->mu);
  if (!pollset->seen_inactive) {
    pollset_neighborhood* neighborhood = pollset->neighborhood;
    gpr_mu_unlock(&pollset->mu);
  retry_lock_neighborhood:
    gpr_mu_lock(&neighborhood->mu);
    gpr_mu_lock(&pollset->mu);
    if (!pollset->seen_inactive) {
      if (pollset->neighborhood != neighborhood) {
        gpr_mu_unlock(&neighborhood->mu);
        neighborhood = pollset->neighborhood;
        gpr_mu_unlock(&pollset->mu);
        goto retry_lock_neighborhood;
      }
      pollset->prev->next = pollset->next;
      pollset->next->prev = pollset->prev;
      if (pollset == pollset->neighborhood->active_root) {
        pollset->neighborhood->active_root =
            pollset->next == pollset ? nullptr : pollset->next;
      }
    }
    gpr_mu_unlock(&pollset->neighborhood->mu);
  }
  gpr_mu_unlock(&pollset->mu);
  gpr_mu_destroy(&pollset->mu);
}

static grpc_error_handle pollset_kick_all(grpc_pollset* pollset) {
  grpc_error_handle error;
  if (pollset->root_worker != nullptr) {
    grpc_pollset_worker* worker = pollset->root_worker;
    do {
      switch (worker->state) {
        case KICKED:
          break;
        case UNKICKED:
          SET_KICK_STATE(worker, KICKED);
          if (worker->initialized_cv) {
            gpr_cv_signal(&worker->cv);
          }
          break;
        case DESIGNATED_POLLER:
          SET_KICK_STATE(worker, KICKED);
          append_error(&error, grpc_wakeup_fd_wakeup(&global_wakeup_fd),
                       "pollset_kick_all");
          break;
      }

      worker = worker->next;
    } while (worker != pollset->root_worker);
  }
  // TODO(sreek): Check if we need to set 'kicked_without_poller' to true here
  // in the else case
  return error;
}

static void pollset_maybe_finish_shutdown(grpc_pollset* pollset) {
  if (pollset->shutdown_closure != nullptr && pollset->root_worker == nullptr &&
      pollset->begin_refs == 0) {
    grpc_core::ExecCtx::Run(DEBUG_LOCATION, pollset->shutdown_closure,
                            absl::OkStatus());
    pollset->shutdown_closure = nullptr;
  }
}

static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
  CHECK_EQ(pollset->shutdown_closure, nullptr);
  CHECK(!pollset->shutting_down);
  pollset->shutdown_closure = closure;
  pollset->shutting_down = true;
  GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(pollset));
  pollset_maybe_finish_shutdown(pollset);
}

static int poll_deadline_to_millis_timeout(grpc_core::Timestamp millis) {
  if (millis == grpc_core::Timestamp::InfFuture()) return -1;
  int64_t delta = (millis - grpc_core::Timestamp::Now()).millis();
  if (delta > INT_MAX) {
    return INT_MAX;
  } else if (delta < 0) {
    return 0;
  } else {
    return static_cast<int>(delta);
  }
}

// Process the epoll events found by the do_epoll_wait() function.
// - g_epoll_set.cursor points to the index of the first event to be processed
// - This function then processes up to
//   MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION events and updates
//   g_epoll_set.cursor

// NOTE ON SYNCHRONIZATION: Similar to do_epoll_wait(), this function is only
// called by the g_active_poller thread. So there is no need for
// synchronization when accessing fields in g_epoll_set
static grpc_error_handle process_epoll_events(grpc_pollset* /*pollset*/) {
  static const char* err_desc = "process_events";
  grpc_error_handle error;
  long num_events = gpr_atm_acq_load(&g_epoll_set.num_events);
  long cursor = gpr_atm_acq_load(&g_epoll_set.cursor);
  for (int idx = 0;
       (idx < MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION) && cursor != num_events;
       idx++) {
    long c = cursor++;
    struct epoll_event* ev = &g_epoll_set.events[c];
    void* data_ptr = ev->data.ptr;

    if (data_ptr == &global_wakeup_fd) {
      append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd),
                   err_desc);
    } else {
      grpc_fd* fd = reinterpret_cast<grpc_fd*>(
          reinterpret_cast<intptr_t>(data_ptr) & ~intptr_t{1});
      bool track_err = reinterpret_cast<intptr_t>(data_ptr) & intptr_t{1};
      bool cancel = (ev->events & EPOLLHUP) != 0;
      bool error = (ev->events & EPOLLERR) != 0;
      bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0;
      bool write_ev = (ev->events & EPOLLOUT) != 0;
      bool err_fallback = error && !track_err;

      if (error && !err_fallback) {
        fd_has_errors(fd);
      }

      if (read_ev || cancel || err_fallback) {
        fd_become_readable(fd);
      }

      if (write_ev || cancel || err_fallback) {
        fd_become_writable(fd);
      }
    }
  }
  gpr_atm_rel_store(&g_epoll_set.cursor, cursor);
  return error;
}

// Do epoll_wait and store the events in the g_epoll_set.events field. This
// does not "process" any of the events yet; that is done in
// process_epoll_events(). See process_epoll_events() for more details.

// NOTE ON SYNCHRONIZATION: At any point in time, only the g_active_poller
// (i.e., the designated poller thread) will be calling this function. So there
// is no need for any synchronization when accessing fields in g_epoll_set
static grpc_error_handle do_epoll_wait(grpc_pollset* ps,
                                       grpc_core::Timestamp deadline) {
  int r;
  int timeout = poll_deadline_to_millis_timeout(deadline);
  if (timeout != 0) {
    GRPC_SCHEDULING_START_BLOCKING_REGION;
  }
  do {
    r = epoll_wait(g_epoll_set.epfd, g_epoll_set.events, MAX_EPOLL_EVENTS,
                   timeout);
  } while (r < 0 && errno == EINTR);
  if (timeout != 0) {
    GRPC_SCHEDULING_END_BLOCKING_REGION;
  }

  if (r < 0) return GRPC_OS_ERROR(errno, "epoll_wait");

  GRPC_TRACE_LOG(polling, INFO)
      << "ps: " << ps << " poll got " << r << " events";

  gpr_atm_rel_store(&g_epoll_set.num_events, r);
  gpr_atm_rel_store(&g_epoll_set.cursor, 0);

  return absl::OkStatus();
}

static bool begin_worker(grpc_pollset* pollset, grpc_pollset_worker* worker,
                         grpc_pollset_worker** worker_hdl,
                         grpc_core::Timestamp deadline) {
  if (worker_hdl != nullptr) *worker_hdl = worker;
  worker->initialized_cv = false;
  SET_KICK_STATE(worker, UNKICKED);
  worker->schedule_on_end_work = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
  pollset->begin_refs++;

  GRPC_TRACE_LOG(polling, INFO)
      << "PS:" << pollset << " BEGIN_STARTS:" << worker;

  if (pollset->seen_inactive) {
    // The pollset has been observed to be inactive; we need to move it back
    // to the active list.
    bool is_reassigning = false;
    if (!pollset->reassigning_neighborhood) {
      is_reassigning = true;
      pollset->reassigning_neighborhood = true;
      pollset->neighborhood = &g_neighborhoods[choose_neighborhood()];
    }
    pollset_neighborhood* neighborhood = pollset->neighborhood;
    gpr_mu_unlock(&pollset->mu);
    // pollset unlocked: state may change (even worker->state)
  retry_lock_neighborhood:
    gpr_mu_lock(&neighborhood->mu);
    gpr_mu_lock(&pollset->mu);
    GRPC_TRACE_LOG(polling, INFO)
        << "PS:" << pollset << " BEGIN_REORG:" << worker
        << " kick_state=" << kick_state_string(worker->state)
        << " is_reassigning=" << is_reassigning;
    if (pollset->seen_inactive) {
      if (neighborhood != pollset->neighborhood) {
        gpr_mu_unlock(&neighborhood->mu);
        neighborhood = pollset->neighborhood;
        gpr_mu_unlock(&pollset->mu);
        goto retry_lock_neighborhood;
      }

      // In the brief time we released the pollset locks above, the worker MAY
      // have been kicked. In that case, the worker should get out of this
      // pollset ASAP, and hence we should neither add the pollset to the
      // neighborhood nor mark the pollset as active.

      // On a side note, the only way a worker's kick state could have changed
      // at this point is if it were kicked specifically. Since the worker has
      // not added itself to the pollset yet (by calling worker_insert()), it
      // is not visible in the "kick any" path yet.
      if (worker->state == UNKICKED) {
        pollset->seen_inactive = false;
        if (neighborhood->active_root == nullptr) {
          neighborhood->active_root = pollset->next = pollset->prev = pollset;
          // Make this the designated poller if there isn't one already
          if (worker->state == UNKICKED &&
              gpr_atm_no_barrier_cas(&g_active_poller, 0,
                                     reinterpret_cast<gpr_atm>(worker))) {
            SET_KICK_STATE(worker, DESIGNATED_POLLER);
          }
        } else {
          pollset->next = neighborhood->active_root;
          pollset->prev = pollset->next->prev;
          pollset->next->prev = pollset->prev->next = pollset;
        }
      }
    }
    if (is_reassigning) {
      CHECK(pollset->reassigning_neighborhood);
      pollset->reassigning_neighborhood = false;
    }
    gpr_mu_unlock(&neighborhood->mu);
  }

  worker_insert(pollset, worker);
  pollset->begin_refs--;
  if (worker->state == UNKICKED && !pollset->kicked_without_poller) {
    CHECK(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
    worker->initialized_cv = true;
    gpr_cv_init(&worker->cv);
    while (worker->state == UNKICKED && !pollset->shutting_down) {
      GRPC_TRACE_LOG(polling, INFO)
          << "PS:" << pollset << " BEGIN_WAIT:" << worker
          << " kick_state=" << kick_state_string(worker->state)
          << " shutdown=" << pollset->shutting_down;

      if (gpr_cv_wait(&worker->cv, &pollset->mu,
                      deadline.as_timespec(GPR_CLOCK_MONOTONIC)) &&
          worker->state == UNKICKED) {
        // If gpr_cv_wait returns true (i.e., a timeout), pretend that the
        // worker received a kick.
        SET_KICK_STATE(worker, KICKED);
      }
    }
    grpc_core::ExecCtx::Get()->InvalidateNow();
  }

  GRPC_TRACE_LOG(polling, INFO)
      << "PS:" << pollset << " BEGIN_DONE:" << worker
      << " kick_state=" << kick_state_string(worker->state)
      << " shutdown=" << pollset->shutting_down
      << " kicked_without_poller: " << pollset->kicked_without_poller;

  // We release the pollset lock in this function at a couple of places:
  //   1. Briefly when assigning the pollset to a neighborhood
  //   2. When doing gpr_cv_wait()
  // It is possible that 'kicked_without_poller' was set to true during (1)
  // and 'shutting_down' was set to true during (1) or (2). If either of them
  // is true, this worker cannot do polling.
  // TODO(sreek): Perhaps there is a better way to handle the
  // kicked_without_poller case, especially when the worker is the
  // DESIGNATED_POLLER.

  if (pollset->kicked_without_poller) {
    pollset->kicked_without_poller = false;
    return false;
  }

  return worker->state == DESIGNATED_POLLER && !pollset->shutting_down;
}

static bool check_neighborhood_for_available_poller(
    pollset_neighborhood* neighborhood) {
  bool found_worker = false;
  do {
    grpc_pollset* inspect = neighborhood->active_root;
    if (inspect == nullptr) {
      break;
    }
    gpr_mu_lock(&inspect->mu);
    CHECK(!inspect->seen_inactive);
    grpc_pollset_worker* inspect_worker = inspect->root_worker;
    if (inspect_worker != nullptr) {
      do {
        switch (inspect_worker->state) {
          case UNKICKED:
            if (gpr_atm_no_barrier_cas(
                    &g_active_poller, 0,
                    reinterpret_cast<gpr_atm>(inspect_worker))) {
              GRPC_TRACE_LOG(polling, INFO)
                  << " .. choose next poller to be " << inspect_worker;
              SET_KICK_STATE(inspect_worker, DESIGNATED_POLLER);
              if (inspect_worker->initialized_cv) {
                gpr_cv_signal(&inspect_worker->cv);
              }
            } else {
              GRPC_TRACE_LOG(polling, INFO)
                  << " .. beaten to choose next poller";
            }
            // Even if we didn't win the CAS, there's a worker; we can stop.
            found_worker = true;
            break;
          case KICKED:
            break;
          case DESIGNATED_POLLER:
            found_worker = true;  // ok, so someone else found the worker, but
                                  // we'll accept that
            break;
        }
        inspect_worker = inspect_worker->next;
      } while (!found_worker && inspect_worker != inspect->root_worker);
    }
    if (!found_worker) {
      GRPC_TRACE_LOG(polling, INFO)
          << " .. mark pollset " << inspect << " inactive";
      inspect->seen_inactive = true;
      if (inspect == neighborhood->active_root) {
        neighborhood->active_root =
            inspect->next == inspect ? nullptr : inspect->next;
      }
      inspect->next->prev = inspect->prev;
      inspect->prev->next = inspect->next;
      inspect->next = inspect->prev = nullptr;
    }
    gpr_mu_unlock(&inspect->mu);
  } while (!found_worker);
  return found_worker;
}

static void end_worker(grpc_pollset* pollset, grpc_pollset_worker* worker,
                       grpc_pollset_worker** worker_hdl) {
  GRPC_TRACE_LOG(polling, INFO) << "PS:" << pollset << " END_WORKER:" << worker;
  if (worker_hdl != nullptr) *worker_hdl = nullptr;
  // Make sure we appear kicked
  SET_KICK_STATE(worker, KICKED);
  grpc_closure_list_move(&worker->schedule_on_end_work,
                         grpc_core::ExecCtx::Get()->closure_list());
  if (gpr_atm_no_barrier_load(&g_active_poller) ==
      reinterpret_cast<gpr_atm>(worker)) {
    if (worker->next != worker && worker->next->state == UNKICKED) {
      GRPC_TRACE_LOG(polling, INFO)
          << " .. choose next poller to be peer " << worker;
      CHECK(worker->next->initialized_cv);
      gpr_atm_no_barrier_store(&g_active_poller, (gpr_atm)worker->next);
      SET_KICK_STATE(worker->next, DESIGNATED_POLLER);
      gpr_cv_signal(&worker->next->cv);
      if (grpc_core::ExecCtx::Get()->HasWork()) {
        gpr_mu_unlock(&pollset->mu);
        grpc_core::ExecCtx::Get()->Flush();
        gpr_mu_lock(&pollset->mu);
      }
    } else {
      gpr_atm_no_barrier_store(&g_active_poller, 0);
      size_t poller_neighborhood_idx =
          static_cast<size_t>(pollset->neighborhood - g_neighborhoods);
      gpr_mu_unlock(&pollset->mu);
      bool found_worker = false;
      bool scan_state[MAX_NEIGHBORHOODS];
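      // Two passes over the neighborhoods: first an opportunistic trylock
      // sweep (recording in scan_state which ones we managed to inspect),
      // then a blocking pass over only those the first sweep skipped.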
      for (size_t i = 0; !found_worker && i < g_num_neighborhoods; i++) {
        pollset_neighborhood* neighborhood =
            &g_neighborhoods[(poller_neighborhood_idx + i) %
                             g_num_neighborhoods];
        if (gpr_mu_trylock(&neighborhood->mu)) {
          found_worker = check_neighborhood_for_available_poller(neighborhood);
          gpr_mu_unlock(&neighborhood->mu);
          scan_state[i] = true;
        } else {
          scan_state[i] = false;
        }
      }
      for (size_t i = 0; !found_worker && i < g_num_neighborhoods; i++) {
        if (scan_state[i]) continue;
        pollset_neighborhood* neighborhood =
            &g_neighborhoods[(poller_neighborhood_idx + i) %
                             g_num_neighborhoods];
        gpr_mu_lock(&neighborhood->mu);
        found_worker = check_neighborhood_for_available_poller(neighborhood);
        gpr_mu_unlock(&neighborhood->mu);
      }
      grpc_core::ExecCtx::Get()->Flush();
      gpr_mu_lock(&pollset->mu);
    }
  } else if (grpc_core::ExecCtx::Get()->HasWork()) {
    gpr_mu_unlock(&pollset->mu);
    grpc_core::ExecCtx::Get()->Flush();
    gpr_mu_lock(&pollset->mu);
  }
  if (worker->initialized_cv) {
    gpr_cv_destroy(&worker->cv);
  }
  GRPC_TRACE_LOG(polling, INFO) << " .. remove worker";
  if (EMPTIED == worker_remove(pollset, worker)) {
    pollset_maybe_finish_shutdown(pollset);
  }
  CHECK(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
}

// The pollset->mu lock must be held by the caller before calling this.
// The function pollset_work() may temporarily release the lock (pollset->mu)
// during the course of its execution, but it will always re-acquire it and
// ensure that it is held by the time the function returns.
static grpc_error_handle pollset_work(grpc_pollset* ps,
                                      grpc_pollset_worker** worker_hdl,
                                      grpc_core::Timestamp deadline) {
  grpc_pollset_worker worker;
  grpc_error_handle error;
  static const char* err_desc = "pollset_work";
  if (ps->kicked_without_poller) {
    ps->kicked_without_poller = false;
    return absl::OkStatus();
  }

  if (begin_worker(ps, &worker, worker_hdl, deadline)) {
    g_current_thread_pollset = ps;
    g_current_thread_worker = &worker;
    CHECK(!ps->shutting_down);
    CHECK(!ps->seen_inactive);

    gpr_mu_unlock(&ps->mu);  // unlock
    // This is the designated polling thread at this point, and should ideally
    // do polling. However, if there are unprocessed events left from a
    // previous call to do_epoll_wait(), skip calling epoll_wait() in this
    // iteration and process the pending epoll events instead.

    // The reason for decoupling do_epoll_wait and process_epoll_events is to
    // better distribute the work (i.e., handling epoll events) across multiple
    // threads.

    // process_epoll_events() returns very quickly: it just queues the work on
    // the exec_ctx but does not execute it (the actual execution, or more
    // accurately grpc_core::ExecCtx::Get()->Flush(), happens in end_worker()
    // AFTER selecting a designated poller). So we are not waiting long periods
    // without a designated poller.
    if (gpr_atm_acq_load(&g_epoll_set.cursor) ==
        gpr_atm_acq_load(&g_epoll_set.num_events)) {
      append_error(&error, do_epoll_wait(ps, deadline), err_desc);
    }
    append_error(&error, process_epoll_events(ps), err_desc);

    gpr_mu_lock(&ps->mu);  // lock

    g_current_thread_worker = nullptr;
  } else {
    g_current_thread_pollset = ps;
  }
  end_worker(ps, &worker, worker_hdl);

  g_current_thread_pollset = nullptr;
  return error;
}

static grpc_error_handle pollset_kick(grpc_pollset* pollset,
                                      grpc_pollset_worker* specific_worker) {
  grpc_error_handle ret_err;
  if (GRPC_TRACE_FLAG_ENABLED(polling)) {
    std::vector<std::string> log;
    log.push_back(absl::StrFormat(
        "PS:%p KICK:%p curps=%p curworker=%p root=%p", pollset, specific_worker,
        static_cast<void*>(g_current_thread_pollset),
        static_cast<void*>(g_current_thread_worker), pollset->root_worker));
    if (pollset->root_worker != nullptr) {
      log.push_back(absl::StrFormat(
          " {kick_state=%s next=%p {kick_state=%s}}",
          kick_state_string(pollset->root_worker->state),
          pollset->root_worker->next,
          kick_state_string(pollset->root_worker->next->state)));
    }
    if (specific_worker != nullptr) {
      log.push_back(absl::StrFormat(" worker_kick_state=%s",
                                    kick_state_string(specific_worker->state)));
    }
    VLOG(2) << absl::StrJoin(log, "");
  }

  if (specific_worker == nullptr) {
    if (g_current_thread_pollset != pollset) {
      grpc_pollset_worker* root_worker = pollset->root_worker;
      if (root_worker == nullptr) {
        pollset->kicked_without_poller = true;
        GRPC_TRACE_LOG(polling, INFO) << " .. kicked_without_poller";
        goto done;
      }
      grpc_pollset_worker* next_worker = root_worker->next;
      if (root_worker->state == KICKED) {
        GRPC_TRACE_LOG(polling, INFO) << " .. already kicked " << root_worker;
        SET_KICK_STATE(root_worker, KICKED);
        goto done;
      } else if (next_worker->state == KICKED) {
        GRPC_TRACE_LOG(polling, INFO) << " .. already kicked " << next_worker;
        SET_KICK_STATE(next_worker, KICKED);
        goto done;
      } else if (root_worker == next_worker &&  // only try to wake up a poller
                                                // if there is no next worker
                 root_worker ==
                     reinterpret_cast<grpc_pollset_worker*>(
                         gpr_atm_no_barrier_load(&g_active_poller))) {
        GRPC_TRACE_LOG(polling, INFO) << " .. kicked " << root_worker;
        SET_KICK_STATE(root_worker, KICKED);
        ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
        goto done;
      } else if (next_worker->state == UNKICKED) {
        GRPC_TRACE_LOG(polling, INFO) << " .. kicked " << next_worker;
        CHECK(next_worker->initialized_cv);
        SET_KICK_STATE(next_worker, KICKED);
        gpr_cv_signal(&next_worker->cv);
        goto done;
      } else if (next_worker->state == DESIGNATED_POLLER) {
        if (root_worker->state != DESIGNATED_POLLER) {
          GRPC_TRACE_LOG(polling, INFO)
              << " .. kicked root non-poller " << root_worker
              << " (initialized_cv=" << root_worker->initialized_cv
              << ") (poller=" << next_worker << ")";
          SET_KICK_STATE(root_worker, KICKED);
          if (root_worker->initialized_cv) {
            gpr_cv_signal(&root_worker->cv);
          }
          goto done;
        } else {
          GRPC_TRACE_LOG(polling, INFO) << " .. non-root poller " << next_worker
                                        << " (root=" << root_worker << ")";
          SET_KICK_STATE(next_worker, KICKED);
          ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
          goto done;
        }
      } else {
        CHECK(next_worker->state == KICKED);
        SET_KICK_STATE(next_worker, KICKED);
        goto done;
      }
    } else {
      GRPC_TRACE_LOG(polling, INFO) << " .. kicked while waking up";
      goto done;
    }

    GPR_UNREACHABLE_CODE(goto done);
  }

  if (specific_worker->state == KICKED) {
    GRPC_TRACE_LOG(polling, INFO) << " .. specific worker already kicked";
    goto done;
  } else if (g_current_thread_worker == specific_worker) {
    GRPC_TRACE_LOG(polling, INFO)
        << " .. mark " << specific_worker << " kicked";
    SET_KICK_STATE(specific_worker, KICKED);
    goto done;
  } else if (specific_worker ==
             reinterpret_cast<grpc_pollset_worker*>(
                 gpr_atm_no_barrier_load(&g_active_poller))) {
    GRPC_TRACE_LOG(polling, INFO) << " .. kick active poller";
    SET_KICK_STATE(specific_worker, KICKED);
    ret_err = grpc_wakeup_fd_wakeup(&global_wakeup_fd);
    goto done;
  } else if (specific_worker->initialized_cv) {
    GRPC_TRACE_LOG(polling, INFO) << " .. kick waiting worker";
    SET_KICK_STATE(specific_worker, KICKED);
    gpr_cv_signal(&specific_worker->cv);
    goto done;
  } else {
    GRPC_TRACE_LOG(polling, INFO) << " .. kick non-waiting worker";
    SET_KICK_STATE(specific_worker, KICKED);
    goto done;
  }
done:
  return ret_err;
}

static void pollset_add_fd(grpc_pollset* /*pollset*/, grpc_fd* /*fd*/) {}

//******************************************************************************
// Pollset-set Definitions
//

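// This engine uses a single global epoll set, so pollset_sets carry no state:
// pollset_set_create() returns a distinctive non-null sentinel, and every
// other pollset_set operation is a no-op.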
static grpc_pollset_set* pollset_set_create(void) {
  return reinterpret_cast<grpc_pollset_set*>(static_cast<intptr_t>(0xdeafbeef));
}

static void pollset_set_destroy(grpc_pollset_set* /*pss*/) {}

static void pollset_set_add_fd(grpc_pollset_set* /*pss*/, grpc_fd* /*fd*/) {}

static void pollset_set_del_fd(grpc_pollset_set* /*pss*/, grpc_fd* /*fd*/) {}

static void pollset_set_add_pollset(grpc_pollset_set* /*pss*/,
                                    grpc_pollset* /*ps*/) {}

static void pollset_set_del_pollset(grpc_pollset_set* /*pss*/,
                                    grpc_pollset* /*ps*/) {}

static void pollset_set_add_pollset_set(grpc_pollset_set* /*bag*/,
                                        grpc_pollset_set* /*item*/) {}

static void pollset_set_del_pollset_set(grpc_pollset_set* /*bag*/,
                                        grpc_pollset_set* /*item*/) {}

//******************************************************************************
// Event engine binding
//

static bool is_any_background_poller_thread(void) { return false; }

static void shutdown_background_closure(void) {}

static bool add_closure_to_background_poller(grpc_closure* /*closure*/,
                                             grpc_error_handle /*error*/) {
  return false;
}

static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
  epoll_set_shutdown();
  g_is_shutdown = true;
}

static bool init_epoll1_linux();

const grpc_event_engine_vtable grpc_ev_epoll1_posix = {
    sizeof(grpc_pollset),
    true,
    false,

    fd_create,
    fd_wrapped_fd,
    fd_orphan,
    fd_shutdown,
    fd_notify_on_read,
    fd_notify_on_write,
    fd_notify_on_error,
    fd_become_readable,
    fd_become_writable,
    fd_has_errors,
    fd_is_shutdown,

    pollset_init,
    pollset_shutdown,
    pollset_destroy,
    pollset_work,
    pollset_kick,
    pollset_add_fd,

    pollset_set_create,
    pollset_set_destroy,
    pollset_set_add_pollset,
    pollset_set_del_pollset,
    pollset_set_add_pollset_set,
    pollset_set_del_pollset_set,
    pollset_set_add_fd,
    pollset_set_del_fd,

    is_any_background_poller_thread,
    /* name = */ "epoll1",
    /* check_engine_available = */
    [](bool) { return init_epoll1_linux(); },
    /* init_engine = */
    []() { CHECK(init_epoll1_linux()); },
    shutdown_background_closure,
    /* shutdown_engine = */
    []() { shutdown_engine(); },
    add_closure_to_background_poller,

    fd_set_pre_allocated,
};

// Called by the child process's post-fork handler to close open fds,
// including the global epoll fd. This allows gRPC to shut down in the child
// process without interfering with connections or RPCs ongoing in the parent.
static void reset_event_manager_on_fork() {
  if (g_is_shutdown) return;
  gpr_mu_lock(&fork_fd_list_mu);
  while (fork_fd_list_head != nullptr) {
    close(fork_fd_list_head->fd);
    fork_fd_list_head->fd = -1;
    fork_fd_list_head = fork_fd_list_head->fork_fd_list->next;
  }
  gpr_mu_unlock(&fork_fd_list_mu);
  shutdown_engine();
  init_epoll1_linux();
}

// It is possible that glibc has epoll but the underlying kernel doesn't.
// Create an epoll fd (epoll_set_init() takes care of that) to make sure epoll
// support is actually available.
static bool init_epoll1_linux() {
  if (!g_is_shutdown) return true;
  if (!grpc_has_wakeup_fd()) {
    LOG(ERROR) << "Skipping epoll1 because of no wakeup fd.";
    return false;
  }

  if (!epoll_set_init()) {
    return false;
  }

  fd_global_init();

  if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
    fd_global_shutdown();
    epoll_set_shutdown();
    return false;
  }

  if (grpc_core::Fork::Enabled()) {
    if (grpc_core::Fork::RegisterResetChildPollingEngineFunc(
            reset_event_manager_on_fork)) {
      gpr_mu_init(&fork_fd_list_mu);
    }
  }
  g_is_shutdown = false;
  return true;
}

#else  // defined(GRPC_LINUX_EPOLL)
#if defined(GRPC_POSIX_SOCKET_EV_EPOLL1)
#include "src/core/lib/iomgr/ev_epoll1_linux.h"
const grpc_event_engine_vtable grpc_ev_epoll1_posix = {
    1,
    true,
    false,

    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,

    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,

    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,

    nullptr,
    /* name = */ "epoll1",
    /* check_engine_available = */ [](bool) { return false; },
    nullptr,
    nullptr,
    nullptr,
    nullptr,
    nullptr,
};
#endif  // defined(GRPC_POSIX_SOCKET_EV_EPOLL1)
#endif  // !defined(GRPC_LINUX_EPOLL)