/*
 *
 * Copyright 2016 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <grpc/support/port_platform.h>

#include "src/core/lib/iomgr/port.h"

#include <grpc/grpc_posix.h>
#include <grpc/support/log.h>

/* This polling engine is only relevant on linux kernels supporting epoll() */
#ifdef GRPC_LINUX_EPOLL_CREATE1

#include "src/core/lib/iomgr/ev_epollsig_linux.h"

#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <poll.h>
#include <pthread.h>
#include <signal.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

#include <grpc/support/alloc.h>
#include <grpc/support/string_util.h>

#include "src/core/lib/debug/stats.h"
#include "src/core/lib/gpr/tls.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/iomgr/block_annotate.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/lockfree_event.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/profiling/timers.h"

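/* Sentinel value (not a real worker) passed to pollset_kick() to request that
   every worker in the pollset be kicked rather than one specific worker */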
#define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker*)1)

#define GRPC_POLLING_TRACE(...)       \
  if (grpc_polling_trace.enabled()) { \
    gpr_log(GPR_INFO, __VA_ARGS__);   \
  }

static int grpc_wakeup_signal = -1;
static bool is_grpc_wakeup_signal_initialized = false;
/* Implements the function defined in grpc_posix.h. This function may be
 * called even before grpc_init() in order to set a different signal to use.
 * If signum == -1, the use of signals is disabled */
void grpc_use_signal(int signum) {
  grpc_wakeup_signal = signum;
  is_grpc_wakeup_signal_initialized = true;

  if (grpc_wakeup_signal < 0) {
    gpr_log(GPR_INFO,
            "Use of signals is disabled. Epoll engine will not be used");
  } else {
    gpr_log(GPR_INFO, "epoll engine will be using signal: %d",
            grpc_wakeup_signal);
  }
}

struct polling_island;

typedef enum {
  POLL_OBJ_FD,
  POLL_OBJ_POLLSET,
  POLL_OBJ_POLLSET_SET
} poll_obj_type;

typedef struct poll_obj {
#ifndef NDEBUG
  poll_obj_type obj_type;
#endif
  gpr_mu mu;
  struct polling_island* pi;
} poll_obj;

const char* poll_obj_string(poll_obj_type po_type) {
  switch (po_type) {
    case POLL_OBJ_FD:
      return "fd";
    case POLL_OBJ_POLLSET:
      return "pollset";
    case POLL_OBJ_POLLSET_SET:
      return "pollset_set";
  }

  GPR_UNREACHABLE_CODE(return "UNKNOWN");
}

/*******************************************************************************
 * Fd Declarations
 */

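/* 'po' is the first member of grpc_fd, so a poll_obj* that is known to belong
   to an fd can simply be cast back to grpc_fd* */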
#define FD_FROM_PO(po) ((grpc_fd*)(po))

struct grpc_fd {
  poll_obj po;

  int fd;
  /* refst format:
       bit 0    : 1=Active / 0=Orphaned
       bits 1-n : refcount
     Ref/Unref by two to avoid altering the orphaned bit */
  gpr_atm refst;

  /* The fd is either closed or we relinquished control of it. In either
     case, this indicates that the 'fd' on this structure is no longer
     valid */
  bool orphaned;

  grpc_core::ManualConstructor<grpc_core::LockfreeEvent> read_closure;
  grpc_core::ManualConstructor<grpc_core::LockfreeEvent> write_closure;
  grpc_core::ManualConstructor<grpc_core::LockfreeEvent> error_closure;

  struct grpc_fd* freelist_next;
  grpc_closure* on_done_closure;

  grpc_iomgr_object iomgr_object;

  /* Do we need to track EPOLLERR events separately? */
  bool track_err;
};

/* Reference counting for fds */
#ifndef NDEBUG
static void fd_ref(grpc_fd* fd, const char* reason, const char* file, int line);
static void fd_unref(grpc_fd* fd, const char* reason, const char* file,
                     int line);
#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
#else
static void fd_ref(grpc_fd* fd);
static void fd_unref(grpc_fd* fd);
#define GRPC_FD_REF(fd, reason) fd_ref(fd)
#define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
#endif

static void fd_global_init(void);
static void fd_global_shutdown(void);

/*******************************************************************************
 * Polling island Declarations
 */

#ifndef NDEBUG

#define PI_ADD_REF(p, r) pi_add_ref_dbg((p), (r), __FILE__, __LINE__)
#define PI_UNREF(p, r) pi_unref_dbg((p), (r), __FILE__, __LINE__)

#else

#define PI_ADD_REF(p, r) pi_add_ref((p))
#define PI_UNREF(p, r) pi_unref((p))

#endif

/* This is also used as grpc_workqueue (by directly casting it) */
typedef struct polling_island {
  gpr_mu mu;
  /* Ref count. Use PI_ADD_REF() and PI_UNREF() macros to increment/decrement
     the refcount.
     Once the ref count becomes zero, this structure is destroyed which means
     we should ensure that there is never a scenario where a PI_ADD_REF() is
     racing with a PI_UNREF() that just made the ref_count zero. */
  gpr_atm ref_count;

  /* Pointer to the polling_island this merged into.
   * merged_to value is only set once in polling_island's lifetime (and that too
   * only if the island is merged with another island). Because of this, we can
   * use gpr_atm type here so that we can do atomic access on this and reduce
   * lock contention on 'mu' mutex.
   *
   * Note that if this field is not NULL (i.e not 0), all the remaining fields
   * (except mu and ref_count) are invalid and must be ignored. */
  gpr_atm merged_to;

  /* Number of threads currently polling on this island */
  gpr_atm poller_count;

  /* The fd of the underlying epoll set */
  int epoll_fd;

  /* The file descriptors in the epoll set */
  size_t fd_cnt;
  size_t fd_capacity;
  grpc_fd** fds;
} polling_island;

/*******************************************************************************
 * Pollset Declarations
 */
struct grpc_pollset_worker {
  /* Thread id of this worker */
  pthread_t pt_id;

  /* Used to prevent a worker from getting kicked multiple times */
  gpr_atm is_kicked;
  struct grpc_pollset_worker* next;
  struct grpc_pollset_worker* prev;
};

struct grpc_pollset {
  poll_obj po;

  grpc_pollset_worker root_worker;
  bool kicked_without_pollers;

  bool shutting_down;          /* Is the pollset shutting down ? */
  bool finish_shutdown_called; /* Is 'finish_shutdown_locked()' called ? */
  grpc_closure* shutdown_done; /* Called after shutdown is complete */
};

/*******************************************************************************
 * Pollset-set Declarations
 */
struct grpc_pollset_set {
  poll_obj po;
};

/*******************************************************************************
 * Common helpers
 */

static bool append_error(grpc_error** composite, grpc_error* error,
                         const char* desc) {
  if (error == GRPC_ERROR_NONE) return true;
  if (*composite == GRPC_ERROR_NONE) {
    *composite = GRPC_ERROR_CREATE_FROM_COPIED_STRING(desc);
  }
  *composite = grpc_error_add_child(*composite, error);
  return false;
}

/*******************************************************************************
 * Polling island Definitions
 */

/* The wakeup fd that is used to wake up all threads in a Polling island. This
   is useful in the polling island merge operation where we need to wakeup all
   the threads currently polling the smaller polling island (so that they can
   start polling the new/merged polling island)

   NOTE: This fd is initialized to be readable and MUST NOT be consumed i.e the
   threads that woke up MUST NOT call grpc_wakeup_fd_consume_wakeup() */
static grpc_wakeup_fd polling_island_wakeup_fd;

/* The polling island being polled right now.
   See comments in workqueue_maybe_wakeup for why this is tracked. */
static __thread polling_island* g_current_thread_polling_island;

/* Forward declaration */
static void polling_island_delete(polling_island* pi);

#ifdef GRPC_TSAN
/* Currently TSAN may incorrectly flag data races between epoll_ctl and
   epoll_wait for any grpc_fd structs that are added to the epoll set via
   epoll_ctl and are returned (within a very short window) via epoll_wait().

   To work-around this race, we establish a happens-before relation between
   the code just-before epoll_ctl() and the code after epoll_wait() by using
   this atomic */
gpr_atm g_epoll_sync;
#endif /* defined(GRPC_TSAN) */

static void pi_add_ref(polling_island* pi);
static void pi_unref(polling_island* pi);

#ifndef NDEBUG
static void pi_add_ref_dbg(polling_island* pi, const char* reason,
                           const char* file, int line) {
  if (grpc_polling_trace.enabled()) {
    gpr_atm old_cnt = gpr_atm_acq_load(&pi->ref_count);
    gpr_log(GPR_INFO,
            "Add ref pi: %p, old:%" PRIdPTR " -> new:%" PRIdPTR
            " (%s) - (%s, %d)",
            pi, old_cnt, old_cnt + 1, reason, file, line);
  }
  pi_add_ref(pi);
}

static void pi_unref_dbg(polling_island* pi, const char* reason,
                         const char* file, int line) {
  if (grpc_polling_trace.enabled()) {
    gpr_atm old_cnt = gpr_atm_acq_load(&pi->ref_count);
    gpr_log(GPR_INFO,
            "Unref pi: %p, old:%" PRIdPTR " -> new:%" PRIdPTR
            " (%s) - (%s, %d)",
            pi, old_cnt, (old_cnt - 1), reason, file, line);
  }
  pi_unref(pi);
}
#endif

static void pi_add_ref(polling_island* pi) {
  gpr_atm_no_barrier_fetch_add(&pi->ref_count, 1);
}

static void pi_unref(polling_island* pi) {
  /* If the ref count went to zero, delete the polling island.
     Note that this deletion need not be done under a lock. Once the ref count
     goes to zero, we are guaranteed that no one else holds a reference to the
     polling island (and that there is no racing pi_add_ref() call either).

     Also, if we are deleting the polling island and the merged_to field is
     non-empty, we should remove a ref to the merged_to polling island
   */
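  /* gpr_atm_full_fetch_add() returns the *previous* value, so a result of 1
     means this unref just dropped the count to zero */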
  if (1 == gpr_atm_full_fetch_add(&pi->ref_count, -1)) {
    polling_island* next = (polling_island*)gpr_atm_acq_load(&pi->merged_to);
    polling_island_delete(pi);
    if (next != nullptr) {
      PI_UNREF(next, "pi_delete"); /* Recursive call */
    }
  }
}

/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_add_fds_locked(polling_island* pi, grpc_fd** fds,
                                          size_t fd_count, bool add_fd_refs,
                                          grpc_error** error) {
  int err;
  size_t i;
  struct epoll_event ev;
  char* err_msg;
  const char* err_desc = "polling_island_add_fds";

#ifdef GRPC_TSAN
  /* See the definition of g_epoll_sync for more context */
  gpr_atm_rel_store(&g_epoll_sync, (gpr_atm)0);
#endif /* defined(GRPC_TSAN) */

  for (i = 0; i < fd_count; i++) {
    ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLOUT | EPOLLET);
    /* Use the least significant bit of ev.data.ptr to store track_err to avoid
     * synchronization issues when accessing it after receiving an event */
    ev.data.ptr = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(fds[i]) |
                                          (fds[i]->track_err ? 1 : 0));
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fds[i]->fd, &ev);

    if (err < 0) {
      if (errno != EEXIST) {
        gpr_asprintf(
            &err_msg,
            "epoll_ctl (epoll_fd: %d) add fd: %d failed with error: %d (%s)",
            pi->epoll_fd, fds[i]->fd, errno, strerror(errno));
        append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
        gpr_free(err_msg);
      }

      continue;
    }

    if (pi->fd_cnt == pi->fd_capacity) {
      pi->fd_capacity = GPR_MAX(pi->fd_capacity + 8, pi->fd_cnt * 3 / 2);
      pi->fds = static_cast<grpc_fd**>(
          gpr_realloc(pi->fds, sizeof(grpc_fd*) * pi->fd_capacity));
    }

    pi->fds[pi->fd_cnt++] = fds[i];
    if (add_fd_refs) {
      GRPC_FD_REF(fds[i], "polling_island");
    }
  }
}

/* The caller is expected to hold pi->mu before calling this */
static void polling_island_add_wakeup_fd_locked(polling_island* pi,
                                                grpc_wakeup_fd* wakeup_fd,
                                                grpc_error** error) {
  struct epoll_event ev;
  int err;
  char* err_msg;
  const char* err_desc = "polling_island_add_wakeup_fd";

  ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLET);
  ev.data.ptr = wakeup_fd;
  err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD,
                  GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), &ev);
  if (err < 0 && errno != EEXIST) {
    gpr_asprintf(&err_msg,
                 "epoll_ctl (epoll_fd: %d) add wakeup fd: %d failed with "
                 "error: %d (%s)",
                 pi->epoll_fd, GRPC_WAKEUP_FD_GET_READ_FD(wakeup_fd), errno,
                 strerror(errno));
    append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
    gpr_free(err_msg);
  }
}

/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_remove_all_fds_locked(polling_island* pi,
                                                 bool remove_fd_refs,
                                                 grpc_error** error) {
  int err;
  size_t i;
  char* err_msg;
  const char* err_desc = "polling_island_remove_fds";

  for (i = 0; i < pi->fd_cnt; i++) {
    err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, pi->fds[i]->fd, nullptr);
    if (err < 0 && errno != ENOENT) {
      gpr_asprintf(&err_msg,
                   "epoll_ctl (epoll_fd: %d) delete fds[%zu]: %d failed with "
                   "error: %d (%s)",
                   pi->epoll_fd, i, pi->fds[i]->fd, errno, strerror(errno));
      append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
      gpr_free(err_msg);
    }

    if (remove_fd_refs) {
      GRPC_FD_UNREF(pi->fds[i], "polling_island");
    }
  }

  pi->fd_cnt = 0;
}

/* The caller is expected to hold pi->mu lock before calling this function */
static void polling_island_remove_fd_locked(polling_island* pi, grpc_fd* fd,
                                            grpc_error** error) {
  int err;
  size_t i;
  char* err_msg;
  const char* err_desc = "polling_island_remove_fd";

  /* If fd is already closed, then it would have been automatically removed
     from the epoll set */
  err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, nullptr);
  if (err < 0 && errno != ENOENT) {
    gpr_asprintf(
        &err_msg,
        "epoll_ctl (epoll_fd: %d) del fd: %d failed with error: %d (%s)",
        pi->epoll_fd, fd->fd, errno, strerror(errno));
    append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
    gpr_free(err_msg);
  }

  for (i = 0; i < pi->fd_cnt; i++) {
    if (pi->fds[i] == fd) {
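      /* Order within pi->fds[] does not matter: replace the removed fd with
         the last element and shrink the array */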
      pi->fds[i] = pi->fds[--pi->fd_cnt];
      GRPC_FD_UNREF(fd, "polling_island");
      break;
    }
  }
}

/* Might return NULL in case of an error */
static polling_island* polling_island_create(grpc_fd* initial_fd,
                                             grpc_error** error) {
  polling_island* pi = nullptr;
  const char* err_desc = "polling_island_create";

  *error = GRPC_ERROR_NONE;

  pi = static_cast<polling_island*>(gpr_malloc(sizeof(*pi)));
  gpr_mu_init(&pi->mu);
  pi->fd_cnt = 0;
  pi->fd_capacity = 0;
  pi->fds = nullptr;
  pi->epoll_fd = -1;

  gpr_atm_rel_store(&pi->ref_count, 0);
  gpr_atm_rel_store(&pi->poller_count, 0);
  gpr_atm_rel_store(&pi->merged_to, (gpr_atm) nullptr);
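  /* Note: the island starts with a ref_count of 0; callers are expected to
     take the first ref via PI_ADD_REF() */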

  pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC);

  if (pi->epoll_fd < 0) {
    append_error(error, GRPC_OS_ERROR(errno, "epoll_create1"), err_desc);
    goto done;
  }

  if (initial_fd != nullptr) {
    polling_island_add_fds_locked(pi, &initial_fd, 1, true, error);
  }

done:
  if (*error != GRPC_ERROR_NONE) {
    polling_island_delete(pi);
    pi = nullptr;
  }
  return pi;
}

static void polling_island_delete(polling_island* pi) {
  GPR_ASSERT(pi->fd_cnt == 0);

  if (pi->epoll_fd >= 0) {
    close(pi->epoll_fd);
  }
  gpr_mu_destroy(&pi->mu);
  gpr_free(pi->fds);
  gpr_free(pi);
}

/* Attempts to get the last polling island in the linked list (linked by the
 * 'merged_to' field). Since this does not lock the polling island, there are no
 * guarantees that the island returned is the last island */
static polling_island* polling_island_maybe_get_latest(polling_island* pi) {
  polling_island* next = (polling_island*)gpr_atm_acq_load(&pi->merged_to);
  while (next != nullptr) {
    pi = next;
    next = (polling_island*)gpr_atm_acq_load(&pi->merged_to);
  }

  return pi;
}

/* Gets the lock on the *latest* polling island i.e the last polling island in
   the linked list (linked by the 'merged_to' field). Call gpr_mu_unlock on the
   returned polling island's mu.
   Usage: To lock/unlock polling island "pi", do the following:
     polling_island *pi_latest = polling_island_lock(pi);
     ...
     ... critical section ..
     ...
     gpr_mu_unlock(&pi_latest->mu); // NOTE: use pi_latest->mu. NOT pi->mu */
static polling_island* polling_island_lock(polling_island* pi) {
  polling_island* next = nullptr;

  while (true) {
    next = (polling_island*)gpr_atm_acq_load(&pi->merged_to);
    if (next == nullptr) {
      /* Looks like 'pi' is the last node in the linked list but unless we check
         this by holding the pi->mu lock, we cannot be sure (i.e without the
         pi->mu lock, we don't prevent island merges).
         To be absolutely sure, check once more by holding the pi->mu lock */
      gpr_mu_lock(&pi->mu);
      next = (polling_island*)gpr_atm_acq_load(&pi->merged_to);
      if (next == nullptr) {
        /* pi is in fact the last node and we have the pi->mu lock. we're done */
        break;
      }

      /* pi->merged_to is not NULL i.e pi isn't the last node anymore. pi->mu
       * isn't the lock we are interested in. Continue traversing the list */
      gpr_mu_unlock(&pi->mu);
    }

    pi = next;
  }

  return pi;
}

/* Gets the lock on the *latest* polling islands in the linked lists pointed by
   *p and *q (and also updates *p and *q to point to the latest polling islands)

   This function is needed because calling the following block of code to obtain
   locks on polling islands (*p and *q) is prone to deadlocks.
     {
       polling_island_lock(*p, true);
       polling_island_lock(*q, true);
     }

   Usage/example:
     polling_island *p1;
     polling_island *p2;
     ..
     polling_island_lock_pair(&p1, &p2);
     ..
     .. Critical section with both p1 and p2 locked
     ..
     // Release locks: Always call polling_island_unlock_pair() to release locks
     polling_island_unlock_pair(p1, p2);
*/
static void polling_island_lock_pair(polling_island** p, polling_island** q) {
  polling_island* pi_1 = *p;
  polling_island* pi_2 = *q;
  polling_island* next_1 = nullptr;
  polling_island* next_2 = nullptr;

  /* The algorithm is simple:
     - Go to the last polling islands in the linked lists *pi_1 and *pi_2 (and
       keep updating pi_1 and pi_2)
     - Then obtain locks on the islands by following a lock order rule of
       locking polling_island with lower address first
       Special case: Before obtaining the locks, check if pi_1 and pi_2 are
       pointing to the same island. If that is the case, we can just call
       polling_island_lock()
     - After obtaining both the locks, double check that the polling islands
       are still the last polling islands in their respective linked lists
       (this is because there might have been polling island merges before
       we got the lock)
     - If the polling islands are the last islands, we are done. If not,
       release the locks and continue the process from the first step */
  while (true) {
    next_1 = (polling_island*)gpr_atm_acq_load(&pi_1->merged_to);
    while (next_1 != nullptr) {
      pi_1 = next_1;
      next_1 = (polling_island*)gpr_atm_acq_load(&pi_1->merged_to);
    }

    next_2 = (polling_island*)gpr_atm_acq_load(&pi_2->merged_to);
    while (next_2 != nullptr) {
      pi_2 = next_2;
      next_2 = (polling_island*)gpr_atm_acq_load(&pi_2->merged_to);
    }

    if (pi_1 == pi_2) {
      pi_1 = pi_2 = polling_island_lock(pi_1);
      break;
    }

    if (pi_1 < pi_2) {
      gpr_mu_lock(&pi_1->mu);
      gpr_mu_lock(&pi_2->mu);
    } else {
      gpr_mu_lock(&pi_2->mu);
      gpr_mu_lock(&pi_1->mu);
    }

    next_1 = (polling_island*)gpr_atm_acq_load(&pi_1->merged_to);
    next_2 = (polling_island*)gpr_atm_acq_load(&pi_2->merged_to);
    if (next_1 == nullptr && next_2 == nullptr) {
      break;
    }

    gpr_mu_unlock(&pi_1->mu);
    gpr_mu_unlock(&pi_2->mu);
  }

  *p = pi_1;
  *q = pi_2;
}

static void polling_island_unlock_pair(polling_island* p, polling_island* q) {
  if (p == q) {
    gpr_mu_unlock(&p->mu);
  } else {
    gpr_mu_unlock(&p->mu);
    gpr_mu_unlock(&q->mu);
  }
}

static polling_island* polling_island_merge(polling_island* p,
                                            polling_island* q,
                                            grpc_error** error) {
  /* Get locks on both the polling islands */
  polling_island_lock_pair(&p, &q);

  if (p != q) {
    /* Make sure that p points to the polling island with fewer fds than q */
    if (p->fd_cnt > q->fd_cnt) {
      GPR_SWAP(polling_island*, p, q);
    }
    /* Merge p with q i.e move all the fds from p (the one with fewer fds) to q.
       Note that the refcounts on the fds being moved will not change here.
       (This is why the last param in the following two functions is 'false') */
    polling_island_add_fds_locked(q, p->fds, p->fd_cnt, false, error);
    polling_island_remove_all_fds_locked(p, false, error);

    /* Wakeup all the pollers (if any) on p so that they pickup this change */
    polling_island_add_wakeup_fd_locked(p, &polling_island_wakeup_fd, error);

    /* Add the 'merged_to' link from p --> q */
    gpr_atm_rel_store(&p->merged_to, (gpr_atm)q);
    PI_ADD_REF(q, "pi_merge"); /* To account for the new incoming ref from p */
  }
  /* else if p == q, nothing needs to be done */

  polling_island_unlock_pair(p, q);

  /* Return the merged polling island (Note that no merge would have happened
     if p == q which is ok) */
  return q;
}

static grpc_error* polling_island_global_init() {
  grpc_error* error = GRPC_ERROR_NONE;

  error = grpc_wakeup_fd_init(&polling_island_wakeup_fd);
  if (error == GRPC_ERROR_NONE) {
    error = grpc_wakeup_fd_wakeup(&polling_island_wakeup_fd);
  }

  return error;
}

static void polling_island_global_shutdown() {
  grpc_wakeup_fd_destroy(&polling_island_wakeup_fd);
}

/*******************************************************************************
 * Fd Definitions
 */

/* We need to keep a freelist not because of any concerns of malloc performance
 * but instead so that implementations with multiple threads in (for example)
 * epoll_wait deal with the race between pollset removal and incoming poll
 * notifications.
 *
 * The problem is that the poller ultimately holds a reference to this
 * object, so it is very difficult to know when it is safe to free it, at least
 * without some expensive synchronization.
 *
 * If we keep the object freelisted, in the worst case losing this race just
 * becomes a spurious read notification on a reused fd.
 */

/* The alarm system needs to be able to wakeup 'some poller' sometimes
 * (specifically when a new alarm needs to be triggered earlier than the next
 * alarm 'epoch'). This wakeup_fd gives us something to alert on when such a
 * case occurs. */

static grpc_fd* fd_freelist = nullptr;
static gpr_mu fd_freelist_mu;

#ifndef NDEBUG
#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
static void ref_by(grpc_fd* fd, int n, const char* reason, const char* file,
                   int line) {
  if (grpc_trace_fd_refcount.enabled()) {
    gpr_log(GPR_DEBUG,
            "FD %d %p ref %d %" PRIdPTR " -> %" PRIdPTR " [%s; %s:%d]",
            fd->fd, fd, n, gpr_atm_no_barrier_load(&fd->refst),
            gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
  }
#else
#define REF_BY(fd, n, reason) ref_by(fd, n)
#define UNREF_BY(fd, n, reason) unref_by(fd, n)
static void ref_by(grpc_fd* fd, int n) {
#endif
  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
}

#ifndef NDEBUG
static void unref_by(grpc_fd* fd, int n, const char* reason, const char* file,
                     int line) {
  if (grpc_trace_fd_refcount.enabled()) {
    gpr_log(GPR_DEBUG,
            "FD %d %p unref %d %" PRIdPTR " -> %" PRIdPTR " [%s; %s:%d]",
            fd->fd, fd, n, gpr_atm_no_barrier_load(&fd->refst),
            gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
  }
#else
static void unref_by(grpc_fd* fd, int n) {
#endif
  gpr_atm old = gpr_atm_full_fetch_add(&fd->refst, -n);
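  /* 'old' is the value before the subtraction, so old == n means the refcount
     is now zero and the fd can be freelisted */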
  if (old == n) {
    /* Add the fd to the freelist */
    gpr_mu_lock(&fd_freelist_mu);
    fd->freelist_next = fd_freelist;
    fd_freelist = fd;
    grpc_iomgr_unregister_object(&fd->iomgr_object);

    fd->read_closure->DestroyEvent();
    fd->write_closure->DestroyEvent();
    fd->error_closure->DestroyEvent();

    gpr_mu_unlock(&fd_freelist_mu);
  } else {
    GPR_ASSERT(old > n);
  }
}

/* Increment refcount by two to avoid changing the orphan bit */
#ifndef NDEBUG
static void fd_ref(grpc_fd* fd, const char* reason, const char* file,
                   int line) {
  ref_by(fd, 2, reason, file, line);
}

static void fd_unref(grpc_fd* fd, const char* reason, const char* file,
                     int line) {
  unref_by(fd, 2, reason, file, line);
}
#else
static void fd_ref(grpc_fd* fd) { ref_by(fd, 2); }
static void fd_unref(grpc_fd* fd) { unref_by(fd, 2); }
#endif

static void fd_global_init(void) { gpr_mu_init(&fd_freelist_mu); }

static void fd_global_shutdown(void) {
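  /* Acquire and release fd_freelist_mu once so that any unref_by() still
     holding the lock finishes pushing onto the freelist before we tear it
     down */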
  gpr_mu_lock(&fd_freelist_mu);
  gpr_mu_unlock(&fd_freelist_mu);
  while (fd_freelist != nullptr) {
    grpc_fd* fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
    gpr_mu_destroy(&fd->po.mu);
    gpr_free(fd);
  }
  gpr_mu_destroy(&fd_freelist_mu);
}

static grpc_fd* fd_create(int fd, const char* name, bool track_err) {
  grpc_fd* new_fd = nullptr;

  gpr_mu_lock(&fd_freelist_mu);
  if (fd_freelist != nullptr) {
    new_fd = fd_freelist;
    fd_freelist = fd_freelist->freelist_next;
  }
  gpr_mu_unlock(&fd_freelist_mu);

  if (new_fd == nullptr) {
    new_fd = static_cast<grpc_fd*>(gpr_malloc(sizeof(grpc_fd)));
    gpr_mu_init(&new_fd->po.mu);
    new_fd->read_closure.Init();
    new_fd->write_closure.Init();
    new_fd->error_closure.Init();
  }

  /* Note: It is not really needed to get the new_fd->po.mu lock here. If this
   * is a newly created fd (or an fd we got from the freelist), no one else
   * would be holding a lock to it anyway. */
  gpr_mu_lock(&new_fd->po.mu);
  new_fd->po.pi = nullptr;
#ifndef NDEBUG
  new_fd->po.obj_type = POLL_OBJ_FD;
#endif

  gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
  new_fd->fd = fd;
  new_fd->orphaned = false;
  new_fd->read_closure->InitEvent();
  new_fd->write_closure->InitEvent();
  new_fd->error_closure->InitEvent();
  new_fd->track_err = track_err;

  new_fd->freelist_next = nullptr;
  new_fd->on_done_closure = nullptr;

  gpr_mu_unlock(&new_fd->po.mu);

  char* fd_name;
  gpr_asprintf(&fd_name, "%s fd=%d", name, fd);
  grpc_iomgr_register_object(&new_fd->iomgr_object, fd_name);
  gpr_free(fd_name);
  return new_fd;
}

static int fd_wrapped_fd(grpc_fd* fd) {
  int ret_fd = -1;
  gpr_mu_lock(&fd->po.mu);
  if (!fd->orphaned) {
    ret_fd = fd->fd;
  }
  gpr_mu_unlock(&fd->po.mu);

  return ret_fd;
}

static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
                      const char* reason) {
  grpc_error* error = GRPC_ERROR_NONE;
  polling_island* unref_pi = nullptr;

  gpr_mu_lock(&fd->po.mu);
  fd->on_done_closure = on_done;

  /* Remove the active status but keep the ref. We want this grpc_fd struct
     to be alive (and not added to freelist) until the end of this function */
  REF_BY(fd, 1, reason);

  /* Remove the fd from the polling island:
     - Get a lock on the latest polling island (i.e the last island in the
       linked list pointed by fd->po.pi). This is the island that
       would actually contain the fd
     - Remove the fd from the latest polling island
     - Unlock the latest polling island
     - Set fd->po.pi to NULL (but remove the ref on the polling island
       before doing this.) */
  if (fd->po.pi != nullptr) {
    polling_island* pi_latest = polling_island_lock(fd->po.pi);
    polling_island_remove_fd_locked(pi_latest, fd, &error);
    gpr_mu_unlock(&pi_latest->mu);

    unref_pi = fd->po.pi;
    fd->po.pi = nullptr;
  }

  /* If release_fd is not NULL, we should be relinquishing control of the file
     descriptor fd->fd (but we still own the grpc_fd structure). */
  if (release_fd != nullptr) {
    *release_fd = fd->fd;
  } else {
    close(fd->fd);
  }

  fd->orphaned = true;

  GRPC_CLOSURE_SCHED(fd->on_done_closure, GRPC_ERROR_REF(error));

  gpr_mu_unlock(&fd->po.mu);
  UNREF_BY(fd, 2, reason); /* Drop the reference */
  if (unref_pi != nullptr) {
    /* Unref stale polling island here, outside the fd lock above.
       The polling island owns a workqueue which owns an fd, and unreffing
       inside the lock can cause an eventual lock loop that makes TSAN very
       unhappy. */
    PI_UNREF(unref_pi, "fd_orphan");
  }
  if (error != GRPC_ERROR_NONE) {
    const char* msg = grpc_error_string(error);
    gpr_log(GPR_DEBUG, "fd_orphan: %s", msg);
  }
  GRPC_ERROR_UNREF(error);
}

static bool fd_is_shutdown(grpc_fd* fd) {
  return fd->read_closure->IsShutdown();
}

/* Might be called multiple times */
static void fd_shutdown(grpc_fd* fd, grpc_error* why) {
  if (fd->read_closure->SetShutdown(GRPC_ERROR_REF(why))) {
    shutdown(fd->fd, SHUT_RDWR);
    fd->write_closure->SetShutdown(GRPC_ERROR_REF(why));
    fd->error_closure->SetShutdown(GRPC_ERROR_REF(why));
  }
  GRPC_ERROR_UNREF(why);
}

static void fd_notify_on_read(grpc_fd* fd, grpc_closure* closure) {
  fd->read_closure->NotifyOn(closure);
}

static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
  fd->write_closure->NotifyOn(closure);
}

static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
  fd->error_closure->NotifyOn(closure);
}

static void fd_become_readable(grpc_fd* fd) { fd->read_closure->SetReady(); }

static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); }

static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); }

/*******************************************************************************
 * Pollset Definitions
 */
GPR_TLS_DECL(g_current_thread_pollset);
GPR_TLS_DECL(g_current_thread_worker);
static __thread bool g_initialized_sigmask;
static __thread sigset_t g_orig_sigmask;

static void sig_handler(int sig_num) {
#ifdef GRPC_EPOLL_DEBUG
  gpr_log(GPR_INFO, "Received signal %d", sig_num);
#endif
}

static void poller_kick_init() { signal(grpc_wakeup_signal, sig_handler); }

/* Global state management */
static grpc_error* pollset_global_init(void) {
  gpr_tls_init(&g_current_thread_pollset);
  gpr_tls_init(&g_current_thread_worker);
  poller_kick_init();
  return GRPC_ERROR_NONE;
}

static void pollset_global_shutdown(void) {
  gpr_tls_destroy(&g_current_thread_pollset);
  gpr_tls_destroy(&g_current_thread_worker);
}

static grpc_error* pollset_worker_kick(grpc_pollset_worker* worker) {
  grpc_error* err = GRPC_ERROR_NONE;

  /* Kick the worker only if it was not already kicked */
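  /* The CAS succeeds (returns non-zero) only for the thread that transitions
     is_kicked from 0 to 1, so at most one signal is sent per kick */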
  if (gpr_atm_no_barrier_cas(&worker->is_kicked, static_cast<gpr_atm>(0),
                             static_cast<gpr_atm>(1))) {
    GRPC_POLLING_TRACE(
        "pollset_worker_kick: Kicking worker: %p (thread id: %ld)",
        (void*)worker, (long int)worker->pt_id);
    int err_num = pthread_kill(worker->pt_id, grpc_wakeup_signal);
    if (err_num != 0) {
      err = GRPC_OS_ERROR(err_num, "pthread_kill");
    }
  }
  return err;
}

/* Return 1 if the pollset has active threads in pollset_work (pollset must
 * be locked) */
static int pollset_has_workers(grpc_pollset* p) {
  return p->root_worker.next != &p->root_worker;
}

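/* Workers form a circular doubly-linked list rooted at pollset->root_worker;
   the list is empty when root_worker points back to itself */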
static void remove_worker(grpc_pollset* p, grpc_pollset_worker* worker) {
  worker->prev->next = worker->next;
  worker->next->prev = worker->prev;
}

static grpc_pollset_worker* pop_front_worker(grpc_pollset* p) {
  if (pollset_has_workers(p)) {
    grpc_pollset_worker* w = p->root_worker.next;
    remove_worker(p, w);
    return w;
  } else {
    return nullptr;
  }
}

static void push_back_worker(grpc_pollset* p, grpc_pollset_worker* worker) {
  worker->next = &p->root_worker;
  worker->prev = worker->next->prev;
  worker->prev->next = worker->next->prev = worker;
}

static void push_front_worker(grpc_pollset* p, grpc_pollset_worker* worker) {
  worker->prev = &p->root_worker;
  worker->next = worker->prev->next;
  worker->prev->next = worker->next->prev = worker;
}

/* p->mu must be held before calling this function */
static grpc_error* pollset_kick(grpc_pollset* p,
                                grpc_pollset_worker* specific_worker) {
  GPR_TIMER_SCOPE("pollset_kick", 0);
  grpc_error* error = GRPC_ERROR_NONE;
  GRPC_STATS_INC_POLLSET_KICK();
  const char* err_desc = "Kick Failure";
  grpc_pollset_worker* worker = specific_worker;
  if (worker != nullptr) {
    if (worker == GRPC_POLLSET_KICK_BROADCAST) {
      if (pollset_has_workers(p)) {
        GPR_TIMER_SCOPE("pollset_kick.broadcast", 0);
        for (worker = p->root_worker.next; worker != &p->root_worker;
             worker = worker->next) {
          if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
            append_error(&error, pollset_worker_kick(worker), err_desc);
          }
        }
      } else {
        p->kicked_without_pollers = true;
      }
    } else {
      GPR_TIMER_MARK("kicked_specifically", 0);
      if (gpr_tls_get(&g_current_thread_worker) != (intptr_t)worker) {
        append_error(&error, pollset_worker_kick(worker), err_desc);
      }
    }
  } else if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)p) {
    /* Since worker == NULL, it means that we can kick "any" worker on this
       pollset 'p'. If 'p' happens to be the same pollset this thread is
       currently polling (i.e in pollset_work() function), then there is no need
       to kick any other worker since the current thread can just absorb the
       kick. This is the reason why we enter this case only when
       g_current_thread_pollset is != p */

    GPR_TIMER_MARK("kick_anonymous", 0);
    worker = pop_front_worker(p);
    if (worker != nullptr) {
      GPR_TIMER_MARK("finally_kick", 0);
      push_back_worker(p, worker);
      append_error(&error, pollset_worker_kick(worker), err_desc);
    } else {
      GPR_TIMER_MARK("kicked_no_pollers", 0);
      p->kicked_without_pollers = true;
    }
  }

  GRPC_LOG_IF_ERROR("pollset_kick", GRPC_ERROR_REF(error));
  return error;
}

static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
  gpr_mu_init(&pollset->po.mu);
  *mu = &pollset->po.mu;
  pollset->po.pi = nullptr;
#ifndef NDEBUG
  pollset->po.obj_type = POLL_OBJ_POLLSET;
#endif

  pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
  pollset->kicked_without_pollers = false;

  pollset->shutting_down = false;
  pollset->finish_shutdown_called = false;
  pollset->shutdown_done = nullptr;
}

static int poll_deadline_to_millis_timeout(grpc_millis millis) {
  if (millis == GRPC_MILLIS_INF_FUTURE) return -1;
  grpc_millis delta = millis - grpc_core::ExecCtx::Get()->Now();
  if (delta > INT_MAX)
    return INT_MAX;
  else if (delta < 0)
    return 0;
  else
    return static_cast<int>(delta);
}

static void pollset_release_polling_island(grpc_pollset* ps,
                                           const char* reason) {
  if (ps->po.pi != nullptr) {
    PI_UNREF(ps->po.pi, reason);
  }
  ps->po.pi = nullptr;
}

static void finish_shutdown_locked(grpc_pollset* pollset) {
  /* The pollset cannot have any workers if we are at this stage */
  GPR_ASSERT(!pollset_has_workers(pollset));

  pollset->finish_shutdown_called = true;

  /* Release the ref and set pollset->po.pi to NULL */
  pollset_release_polling_island(pollset, "ps_shutdown");
  GRPC_CLOSURE_SCHED(pollset->shutdown_done, GRPC_ERROR_NONE);
}

/* pollset->po.mu lock must be held by the caller before calling this */
static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
  GPR_TIMER_SCOPE("pollset_shutdown", 0);
  GPR_ASSERT(!pollset->shutting_down);
  pollset->shutting_down = true;
  pollset->shutdown_done = closure;
  pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);

  /* If the pollset has any workers, we cannot call finish_shutdown_locked()
     because it would release the underlying polling island. In such a case, we
     let the last worker call finish_shutdown_locked() from pollset_work() */
  if (!pollset_has_workers(pollset)) {
    GPR_ASSERT(!pollset->finish_shutdown_called);
    GPR_TIMER_MARK("pollset_shutdown.finish_shutdown_locked", 0);
    finish_shutdown_locked(pollset);
  }
}

/* pollset_shutdown is guaranteed to be called before pollset_destroy. So other
 * than destroying the mutexes, there is nothing special that needs to be done
 * here */
static void pollset_destroy(grpc_pollset* pollset) {
  GPR_ASSERT(!pollset_has_workers(pollset));
  gpr_mu_destroy(&pollset->po.mu);
}

#define GRPC_EPOLL_MAX_EVENTS 100
/* Note: sig_mask contains the signal mask to use *during* epoll_wait() */
static void pollset_work_and_unlock(grpc_pollset* pollset,
                                    grpc_pollset_worker* worker, int timeout_ms,
                                    sigset_t* sig_mask, grpc_error** error) {
  GPR_TIMER_SCOPE("pollset_work_and_unlock", 0);
  struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
  int epoll_fd = -1;
  int ep_rv;
  polling_island* pi = nullptr;
  char* err_msg;
  const char* err_desc = "pollset_work_and_unlock";

  /* We need to get the epoll_fd to wait on. The epoll_fd is inside the
     latest polling island pointed to by pollset->po.pi

     Since epoll_fd is immutable, we can read it without obtaining the polling
     island lock. There is however a possibility that the polling island (from
     which we got the epoll_fd) got merged with another island while we are
     in this function. This is still okay because in such a case, we will wake
     up right-away from epoll_wait() and pick up the latest polling_island the
     next time this function (i.e pollset_work_and_unlock()) is called */

  if (pollset->po.pi == nullptr) {
    pollset->po.pi = polling_island_create(nullptr, error);
    if (pollset->po.pi == nullptr) {
      return; /* Fatal error. We cannot continue */
    }

    PI_ADD_REF(pollset->po.pi, "ps");
    GRPC_POLLING_TRACE("pollset_work: pollset: %p created new pi: %p",
                       (void*)pollset, (void*)pollset->po.pi);
  }

  pi = polling_island_maybe_get_latest(pollset->po.pi);
  epoll_fd = pi->epoll_fd;

  /* Update the pollset->po.pi since the island pointed to by
     pollset->po.pi may be older than the one pointed to by pi */
  if (pollset->po.pi != pi) {
    /* Always do PI_ADD_REF before PI_UNREF because PI_UNREF may cause the
       polling island to be deleted */
    PI_ADD_REF(pi, "ps");
    PI_UNREF(pollset->po.pi, "ps");
    pollset->po.pi = pi;
  }
  /* Add an extra ref so that the island does not get destroyed (which means
     the epoll_fd won't be closed) while we are doing an epoll_wait() on the
     epoll_fd */
  PI_ADD_REF(pi, "ps_work");
  gpr_mu_unlock(&pollset->po.mu);

  gpr_atm_no_barrier_fetch_add(&pi->poller_count, 1);
  g_current_thread_polling_island = pi;

  GRPC_SCHEDULING_START_BLOCKING_REGION;
  GRPC_STATS_INC_SYSCALL_POLL();
  ep_rv =
      epoll_pwait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms, sig_mask);
  GRPC_SCHEDULING_END_BLOCKING_REGION;
  if (ep_rv < 0) {
    if (errno != EINTR) {
      gpr_asprintf(&err_msg,
                   "epoll_wait() epoll fd: %d failed with error: %d (%s)",
                   epoll_fd, errno, strerror(errno));
      append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc);
    } else {
      /* We were interrupted. Save an iteration by doing a zero timeout
         epoll_wait to see if there are any other events of interest */
      GRPC_POLLING_TRACE("pollset_work: pollset: %p, worker: %p received kick",
                         (void*)pollset, (void*)worker);
      ep_rv = epoll_wait(epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
    }
  }

#ifdef GRPC_TSAN
  /* See the definition of g_epoll_sync for more details */
  gpr_atm_acq_load(&g_epoll_sync);
#endif /* defined(GRPC_TSAN) */

  for (int i = 0; i < ep_rv; ++i) {
    void* data_ptr = ep_ev[i].data.ptr;
    if (data_ptr == &polling_island_wakeup_fd) {
      GRPC_POLLING_TRACE(
          "pollset_work: pollset: %p, worker: %p polling island (epoll_fd: "
          "%d) got merged",
          (void*)pollset, (void*)worker, epoll_fd);
      /* This means that our polling island is merged with a different
         island. We do not have to do anything here since the subsequent call
         to the function pollset_work_and_unlock() will pick up the correct
         epoll_fd */
    } else {
      grpc_fd* fd = reinterpret_cast<grpc_fd*>(
          reinterpret_cast<intptr_t>(data_ptr) & ~static_cast<intptr_t>(1));
      bool track_err =
          reinterpret_cast<intptr_t>(data_ptr) & static_cast<intptr_t>(1);
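      /* Decode the event flags: EPOLLHUP wakes up both readers and writers;
         EPOLLERR goes to the error closure only when track_err is set,
         otherwise it falls back to waking up readers and writers */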
      bool cancel = (ep_ev[i].events & EPOLLHUP) != 0;
      bool error = (ep_ev[i].events & EPOLLERR) != 0;
      bool read_ev = (ep_ev[i].events & (EPOLLIN | EPOLLPRI)) != 0;
      bool write_ev = (ep_ev[i].events & EPOLLOUT) != 0;
      bool err_fallback = error && !track_err;

      if (error && !err_fallback) {
        fd_has_errors(fd);
      }
      if (read_ev || cancel || err_fallback) {
        fd_become_readable(fd);
      }
      if (write_ev || cancel || err_fallback) {
        fd_become_writable(fd);
      }
    }
  }

  g_current_thread_polling_island = nullptr;
  gpr_atm_no_barrier_fetch_add(&pi->poller_count, -1);

  GPR_ASSERT(pi != nullptr);

  /* Before leaving, release the extra ref we added to the polling island. It
     is important to use "pi" here (i.e our old copy of pollset->po.pi
     that we got before releasing the polling island lock). This is because the
     pollset->po.pi pointer might get updated in other parts of the
     code when there is an island merge while we are doing epoll_wait() above */
  PI_UNREF(pi, "ps_work");
}

/* pollset->po.mu lock must be held by the caller before calling this.
   The function pollset_work() may temporarily release the lock (pollset->po.mu)
   during the course of its execution but it will always re-acquire the lock and
   ensure that it is held by the time the function returns */
static grpc_error* pollset_work(grpc_pollset* pollset,
                                grpc_pollset_worker** worker_hdl,
                                grpc_millis deadline) {
  GPR_TIMER_SCOPE("pollset_work", 0);
  grpc_error* error = GRPC_ERROR_NONE;
  int timeout_ms = poll_deadline_to_millis_timeout(deadline);

  sigset_t new_mask;

  grpc_pollset_worker worker;
  worker.next = worker.prev = nullptr;
  worker.pt_id = pthread_self();
  gpr_atm_no_barrier_store(&worker.is_kicked, (gpr_atm)0);

  if (worker_hdl) *worker_hdl = &worker;

  gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
  gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);

  if (pollset->kicked_without_pollers) {
    /* If the pollset was kicked without pollers, pretend that the current
       worker got the kick and skip polling. A kick indicates that there is some
       work that needs attention like an event on the completion queue or an
       alarm */
    GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
    pollset->kicked_without_pollers = 0;
  } else if (!pollset->shutting_down) {
    /* We use the posix-signal with number 'grpc_wakeup_signal' for waking up
       (i.e 'kicking') a worker in the pollset. A 'kick' is a way to inform the
       worker that there is some pending work that needs immediate attention
       (like an event on the completion queue, or a polling island merge that
       results in a new epoll-fd to wait on) and that the worker should not
       spend time waiting in epoll_pwait().

       A worker can be kicked anytime from the point it is added to the pollset
       via push_front_worker() (or push_back_worker()) to the point it is
       removed via remove_worker().
       If the worker is kicked before/during it calls epoll_pwait(), it should
       immediately exit from epoll_wait(). If the worker is kicked after it
       returns from epoll_wait(), then nothing really needs to be done.

       To accomplish this, we mask 'grpc_wakeup_signal' on this thread at all
       times *except* when it is in epoll_pwait(). This way, the worker never
       misses acting on a kick */

    if (!g_initialized_sigmask) {
      sigemptyset(&new_mask);
      sigaddset(&new_mask, grpc_wakeup_signal);
      pthread_sigmask(SIG_BLOCK, &new_mask, &g_orig_sigmask);
      sigdelset(&g_orig_sigmask, grpc_wakeup_signal);
      g_initialized_sigmask = true;
      /* new_mask:       The new thread mask which blocks 'grpc_wakeup_signal'.
                         This is the mask used at all times *except during
                         epoll_wait()*
         g_orig_sigmask: The thread mask which allows 'grpc_wakeup_signal' and
                         this is the mask to use *during epoll_wait()*

         The new_mask is set on the worker before it is added to the pollset
         (i.e before it can be kicked) */
    }

    push_front_worker(pollset, &worker); /* Add worker to pollset */

    pollset_work_and_unlock(pollset, &worker, timeout_ms, &g_orig_sigmask,
                            &error);
    grpc_core::ExecCtx::Get()->Flush();

    gpr_mu_lock(&pollset->po.mu);

    /* Note: There is no need to reset worker.is_kicked to 0 since we are no
       longer going to use this worker */
    remove_worker(pollset, &worker);
  }

  /* If we are the last worker on the pollset (i.e pollset_has_workers() is
     false at this point) and the pollset is shutting down, we may have to
     finish the shutdown process by calling finish_shutdown_locked().
     See pollset_shutdown() for more details.

     Note: Continuing to access pollset here is safe; it is the caller's
     responsibility to not destroy a pollset when it has outstanding calls to
     pollset_work() */
  if (pollset->shutting_down && !pollset_has_workers(pollset) &&
      !pollset->finish_shutdown_called) {
    GPR_TIMER_MARK("pollset_work.finish_shutdown_locked", 0);
    finish_shutdown_locked(pollset);

    gpr_mu_unlock(&pollset->po.mu);
    grpc_core::ExecCtx::Get()->Flush();
    gpr_mu_lock(&pollset->po.mu);
  }

  if (worker_hdl) *worker_hdl = nullptr;

  gpr_tls_set(&g_current_thread_pollset, (intptr_t)0);
  gpr_tls_set(&g_current_thread_worker, (intptr_t)0);

  GRPC_LOG_IF_ERROR("pollset_work", GRPC_ERROR_REF(error));
  return error;
}

static void add_poll_object(poll_obj* bag, poll_obj_type bag_type,
                            poll_obj* item, poll_obj_type item_type) {
  GPR_TIMER_SCOPE("add_poll_object", 0);

#ifndef NDEBUG
  GPR_ASSERT(item->obj_type == item_type);
  GPR_ASSERT(bag->obj_type == bag_type);
#endif

  grpc_error* error = GRPC_ERROR_NONE;
  polling_island* pi_new = nullptr;

  gpr_mu_lock(&bag->mu);
  gpr_mu_lock(&item->mu);

retry:
  /*
   * 1) If item->pi and bag->pi are both non-NULL and equal, do nothing
   * 2) If item->pi and bag->pi are both NULL, create a new polling island (with
   *    a refcount of 2) and point item->pi and bag->pi to the new island
   * 3) If exactly one of item->pi or bag->pi is NULL, update it to point to
   *    the other's non-NULL pi
   * 4) Finally if item->pi and bag->pi are non-NULL and not equal, merge the
   *    polling islands and update item->pi and bag->pi to point to the new
   *    island
   */

  /* Early out if we are trying to add an 'fd' to a 'bag' but the fd is already
   * orphaned */
  if (item_type == POLL_OBJ_FD && (FD_FROM_PO(item))->orphaned) {
    gpr_mu_unlock(&item->mu);
    gpr_mu_unlock(&bag->mu);
    return;
  }

  if (item->pi == bag->pi) {
    pi_new = item->pi;
    if (pi_new == nullptr) {
      /* GPR_ASSERT(item->pi == bag->pi == NULL) */

      /* If we are adding an fd to a bag (i.e pollset or pollset_set), then
       * we need to do some extra work to make TSAN happy */
      if (item_type == POLL_OBJ_FD) {
        /* Unlock before creating a new polling island: the polling island will
           create a workqueue which creates a file descriptor, and holding an fd
           lock here can eventually cause a loop to appear to TSAN (making it
           unhappy). We don't think it's a real loop (there's an epoch point
           where that loop possibility disappears), but the advantages of
           keeping TSAN happy outweigh any performance advantage we might have
           by keeping the lock held. */
        gpr_mu_unlock(&item->mu);
        pi_new = polling_island_create(FD_FROM_PO(item), &error);
        gpr_mu_lock(&item->mu);

        /* Need to reverify any assumptions made between the initial lock and
           getting to this branch: if they've changed, we need to throw away our
           work and figure things out again. */
        if (item->pi != nullptr) {
          GRPC_POLLING_TRACE(
              "add_poll_object: Raced creating new polling island. pi_new: %p "
              "(fd: %d, %s: %p)",
              (void*)pi_new, FD_FROM_PO(item)->fd, poll_obj_string(bag_type),
              (void*)bag);
          /* No need to lock 'pi_new' here since this is a new polling island
             and no one has a reference to it yet */
          polling_island_remove_all_fds_locked(pi_new, true, &error);

          /* Ref and unref so that the polling island gets deleted during unref
           */
          PI_ADD_REF(pi_new, "dance_of_destruction");
          PI_UNREF(pi_new, "dance_of_destruction");
          goto retry;
        }
      } else {
        pi_new = polling_island_create(nullptr, &error);
      }

      GRPC_POLLING_TRACE(
          "add_poll_object: Created new polling island. pi_new: %p (%s: %p, "
          "%s: %p)",
          (void*)pi_new, poll_obj_string(item_type), (void*)item,
          poll_obj_string(bag_type), (void*)bag);
    } else {
      GRPC_POLLING_TRACE(
          "add_poll_object: Same polling island. pi: %p (%s, %s)",
          (void*)pi_new, poll_obj_string(item_type), poll_obj_string(bag_type));
    }
  } else if (item->pi == nullptr) {
    /* GPR_ASSERT(bag->pi != NULL) */
    /* Make pi_new point to latest pi */
    pi_new = polling_island_lock(bag->pi);

    if (item_type == POLL_OBJ_FD) {
      grpc_fd* fd = FD_FROM_PO(item);
      polling_island_add_fds_locked(pi_new, &fd, 1, true, &error);
    }

    gpr_mu_unlock(&pi_new->mu);
    GRPC_POLLING_TRACE(
        "add_poll_obj: item->pi was NULL. pi_new: %p (item(%s): %p, "
        "bag(%s): %p)",
        (void*)pi_new, poll_obj_string(item_type), (void*)item,
        poll_obj_string(bag_type), (void*)bag);
  } else if (bag->pi == nullptr) {
    /* GPR_ASSERT(item->pi != NULL) */
    /* Make pi_new point to latest pi */
    pi_new = polling_island_lock(item->pi);
    gpr_mu_unlock(&pi_new->mu);
    GRPC_POLLING_TRACE(
        "add_poll_obj: bag->pi was NULL. pi_new: %p (item(%s): %p, "
        "bag(%s): %p)",
        (void*)pi_new, poll_obj_string(item_type), (void*)item,
        poll_obj_string(bag_type), (void*)bag);
  } else {
    pi_new = polling_island_merge(item->pi, bag->pi, &error);
    GRPC_POLLING_TRACE(
        "add_poll_obj: polling islands merged. pi_new: %p (item(%s): %p, "
        "bag(%s): %p)",
        (void*)pi_new, poll_obj_string(item_type), (void*)item,
        poll_obj_string(bag_type), (void*)bag);
  }

  /* At this point, pi_new is the polling island that both item->pi and bag->pi
     MUST be pointing to */

  if (item->pi != pi_new) {
    PI_ADD_REF(pi_new, poll_obj_string(item_type));
    if (item->pi != nullptr) {
      PI_UNREF(item->pi, poll_obj_string(item_type));
    }
    item->pi = pi_new;
  }

  if (bag->pi != pi_new) {
    PI_ADD_REF(pi_new, poll_obj_string(bag_type));
    if (bag->pi != nullptr) {
      PI_UNREF(bag->pi, poll_obj_string(bag_type));
    }
    bag->pi = pi_new;
  }

  gpr_mu_unlock(&item->mu);
  gpr_mu_unlock(&bag->mu);

  GRPC_LOG_IF_ERROR("add_poll_object", error);
}

static void pollset_add_fd(grpc_pollset* pollset, grpc_fd* fd) {
  add_poll_object(&pollset->po, POLL_OBJ_POLLSET, &fd->po, POLL_OBJ_FD);
}

/*******************************************************************************
 * Pollset-set Definitions
 */

static grpc_pollset_set* pollset_set_create(void) {
  grpc_pollset_set* pss =
      static_cast<grpc_pollset_set*>(gpr_malloc(sizeof(*pss)));
  gpr_mu_init(&pss->po.mu);
  pss->po.pi = nullptr;
#ifndef NDEBUG
  pss->po.obj_type = POLL_OBJ_POLLSET_SET;
#endif
  return pss;
}

static void pollset_set_destroy(grpc_pollset_set* pss) {
  gpr_mu_destroy(&pss->po.mu);

  if (pss->po.pi != nullptr) {
    PI_UNREF(pss->po.pi, "pss_destroy");
  }

  gpr_free(pss);
}

static void pollset_set_add_fd(grpc_pollset_set* pss, grpc_fd* fd) {
  add_poll_object(&pss->po, POLL_OBJ_POLLSET_SET, &fd->po, POLL_OBJ_FD);
}

static void pollset_set_del_fd(grpc_pollset_set* pss, grpc_fd* fd) {
  /* Nothing to do */
}

static void pollset_set_add_pollset(grpc_pollset_set* pss, grpc_pollset* ps) {
  add_poll_object(&pss->po, POLL_OBJ_POLLSET_SET, &ps->po, POLL_OBJ_POLLSET);
}

static void pollset_set_del_pollset(grpc_pollset_set* pss, grpc_pollset* ps) {
  /* Nothing to do */
}

static void pollset_set_add_pollset_set(grpc_pollset_set* bag,
                                        grpc_pollset_set* item) {
  add_poll_object(&bag->po, POLL_OBJ_POLLSET_SET, &item->po,
                  POLL_OBJ_POLLSET_SET);
}

static void pollset_set_del_pollset_set(grpc_pollset_set* bag,
                                        grpc_pollset_set* item) {
  /* Nothing to do */
}

/* Test helper functions */
void* grpc_fd_get_polling_island(grpc_fd* fd) {
  polling_island* pi;

  gpr_mu_lock(&fd->po.mu);
  pi = fd->po.pi;
  gpr_mu_unlock(&fd->po.mu);

  return pi;
}

void* grpc_pollset_get_polling_island(grpc_pollset* ps) {
  polling_island* pi;

  gpr_mu_lock(&ps->po.mu);
  pi = ps->po.pi;
  gpr_mu_unlock(&ps->po.mu);

  return pi;
}

bool grpc_are_polling_islands_equal(void* p, void* q) {
  polling_island* p1 = static_cast<polling_island*>(p);
  polling_island* p2 = static_cast<polling_island*>(q);

  /* Note: polling_island_lock_pair() may change p1 and p2 to point to the
     latest polling islands in their respective linked lists */
  polling_island_lock_pair(&p1, &p2);
  polling_island_unlock_pair(p1, p2);

  return p1 == p2;
}

/*******************************************************************************
 * Event engine binding
 */

static void shutdown_engine(void) {
  fd_global_shutdown();
  pollset_global_shutdown();
  polling_island_global_shutdown();
}

static const grpc_event_engine_vtable vtable = {
    sizeof(grpc_pollset),
    true,

    fd_create,
    fd_wrapped_fd,
    fd_orphan,
    fd_shutdown,
    fd_notify_on_read,
    fd_notify_on_write,
    fd_notify_on_error,
    fd_become_readable,
    fd_become_writable,
    fd_has_errors,
    fd_is_shutdown,

    pollset_init,
    pollset_shutdown,
    pollset_destroy,
    pollset_work,
    pollset_kick,
    pollset_add_fd,

    pollset_set_create,
    pollset_set_destroy,
    pollset_set_add_pollset,
    pollset_set_del_pollset,
    pollset_set_add_pollset_set,
    pollset_set_del_pollset_set,
    pollset_set_add_fd,
    pollset_set_del_fd,

    shutdown_engine,
};

/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
 * Create a dummy epoll_fd to make sure epoll support is available */
static bool is_epoll_available() {
  int fd = epoll_create1(EPOLL_CLOEXEC);
  if (fd < 0) {
    gpr_log(
        GPR_ERROR,
        "epoll_create1 failed with error: %d. Not using epoll polling engine",
        errno);
    return false;
  }
  close(fd);
  return true;
}

const grpc_event_engine_vtable* grpc_init_epollsig_linux(
    bool explicit_request) {
  /* If use of signals is disabled, we cannot use the epoll engine */
  if (is_grpc_wakeup_signal_initialized && grpc_wakeup_signal < 0) {
    gpr_log(GPR_ERROR, "Skipping epollsig because use of signals is disabled.");
    return nullptr;
  }

  if (!grpc_has_wakeup_fd()) {
    gpr_log(GPR_ERROR, "Skipping epollsig because of no wakeup fd.");
    return nullptr;
  }

  if (!is_epoll_available()) {
    gpr_log(GPR_ERROR, "Skipping epollsig because epoll is unavailable.");
    return nullptr;
  }

  if (!is_grpc_wakeup_signal_initialized) {
    if (explicit_request) {
      grpc_use_signal(SIGRTMIN + 6);
    } else {
      gpr_log(GPR_ERROR,
              "Skipping epollsig because uninitialized wakeup signal.");
      return nullptr;
    }
  }

  fd_global_init();

  if (!GRPC_LOG_IF_ERROR("pollset_global_init", pollset_global_init())) {
    return nullptr;
  }

  if (!GRPC_LOG_IF_ERROR("polling_island_global_init",
                         polling_island_global_init())) {
    return nullptr;
  }

  return &vtable;
}

#else /* defined(GRPC_LINUX_EPOLL_CREATE1) */
#if defined(GRPC_POSIX_SOCKET_EV_EPOLLSIG)
#include "src/core/lib/iomgr/ev_epollsig_linux.h"
/* If GRPC_LINUX_EPOLL_CREATE1 is not defined, it means
   epoll_create1 is not available. Return NULL */
const grpc_event_engine_vtable* grpc_init_epollsig_linux(
    bool explicit_request) {
  return nullptr;
}
#endif /* defined(GRPC_POSIX_SOCKET_EV_EPOLLSIG) */

void grpc_use_signal(int signum) {}
#endif /* !defined(GRPC_LINUX_EPOLL_CREATE1) */