1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/iomgr/port.h"
22
23 #ifdef GRPC_POSIX_SOCKET_EV
24
25 #include "src/core/lib/iomgr/ev_posix.h"
26
27 #include <string.h>
28
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/log.h>
31 #include <grpc/support/string_util.h>
32
33 #include "src/core/lib/debug/trace.h"
34 #include "src/core/lib/gpr/useful.h"
35 #include "src/core/lib/gprpp/global_config.h"
36 #include "src/core/lib/iomgr/ev_epoll1_linux.h"
37 #include "src/core/lib/iomgr/ev_epollex_linux.h"
38 #include "src/core/lib/iomgr/ev_poll_posix.h"
39 #include "src/core/lib/iomgr/internal_errqueue.h"
40 #include "src/core/lib/iomgr/iomgr.h"
41
42 GPR_GLOBAL_CONFIG_DEFINE_STRING(
43 grpc_poll_strategy, "all",
44 "Declares which polling engines to try when starting gRPC. "
45 "This is a comma-separated list of engines, which are tried in priority "
46 "order first -> last.")
47
48 grpc_core::DebugOnlyTraceFlag grpc_polling_trace(
49 false, "polling"); /* Disabled by default */
50
51 /* Traces fd create/close operations */
52 grpc_core::DebugOnlyTraceFlag grpc_fd_trace(false, "fd_trace");
53 grpc_core::DebugOnlyTraceFlag grpc_trace_fd_refcount(false, "fd_refcount");
54 grpc_core::DebugOnlyTraceFlag grpc_polling_api_trace(false, "polling_api");
55
// Polling API trace, only enabled in debug builds.
//
// In debug builds this logs `format` (which must be a string literal, since it
// is concatenated with the "(polling-api) " prefix) at INFO severity whenever
// the grpc_polling_api_trace flag is enabled. At least one argument must
// follow `format`. In NDEBUG builds the macro expands to nothing.
//
// The body is wrapped in do { ... } while (0) so that the macro behaves as a
// single statement: it requires a trailing semicolon and can safely be used
// as the unbraced body of an if/else without capturing the `else`.
#ifndef NDEBUG
#define GRPC_POLLING_API_TRACE(format, ...)                    \
  do {                                                         \
    if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_api_trace)) {     \
      gpr_log(GPR_INFO, "(polling-api) " format, __VA_ARGS__); \
    }                                                          \
  } while (0)
#else
#define GRPC_POLLING_API_TRACE(...)
#endif  // NDEBUG
65
66 /** Default poll() function - a pointer so that it can be overridden by some
67 * tests */
68 #ifndef GPR_AIX
69 grpc_poll_function_type grpc_poll_function = poll;
70 #else
aix_poll(struct pollfd fds[],nfds_t nfds,int timeout)71 int aix_poll(struct pollfd fds[], nfds_t nfds, int timeout) {
72 return poll(fds, nfds, timeout);
73 }
74 grpc_poll_function_type grpc_poll_function = aix_poll;
75 #endif // GPR_AIX
76
77 grpc_wakeup_fd grpc_global_wakeup_fd;
78
79 static const grpc_event_engine_vtable* g_event_engine = nullptr;
80 static const char* g_poll_strategy_name = nullptr;
81
82 typedef const grpc_event_engine_vtable* (*event_engine_factory_fn)(
83 bool explicit_request);
84
85 struct event_engine_factory {
86 const char* name;
87 event_engine_factory_fn factory;
88 };
89 namespace {
90
91 grpc_poll_function_type real_poll_function;
92
dummy_poll(struct pollfd fds[],nfds_t nfds,int timeout)93 int dummy_poll(struct pollfd fds[], nfds_t nfds, int timeout) {
94 if (timeout == 0) {
95 return real_poll_function(fds, nfds, 0);
96 } else {
97 gpr_log(GPR_ERROR, "Attempted a blocking poll when declared non-polling.");
98 GPR_ASSERT(false);
99 return -1;
100 }
101 }
102
init_non_polling(bool explicit_request)103 const grpc_event_engine_vtable* init_non_polling(bool explicit_request) {
104 if (!explicit_request) {
105 return nullptr;
106 }
107 // return the simplest engine as a dummy but also override the poller
108 auto ret = grpc_init_poll_posix(explicit_request);
109 real_poll_function = grpc_poll_function;
110 grpc_poll_function = dummy_poll;
111 grpc_iomgr_mark_non_polling_internal();
112
113 return ret;
114 }
115 } // namespace
116
117 #define ENGINE_HEAD_CUSTOM "head_custom"
118 #define ENGINE_TAIL_CUSTOM "tail_custom"
119
120 // The global array of event-engine factories. Each entry is a pair with a name
121 // and an event-engine generator function (nullptr if there is no generator
122 // registered for this name). The middle entries are the engines predefined by
123 // open-source gRPC. The head entries represent an opportunity for specific
124 // high-priority custom pollers to be added by the initializer plugins of
125 // custom-built gRPC libraries. The tail entries represent the same, but for
126 // low-priority custom pollers. The actual poller selected is either the first
127 // available one in the list if no specific poller is requested, or the first
128 // specific poller that is requested by name in the GRPC_POLL_STRATEGY
129 // environment variable if that variable is set (which should be a
130 // comma-separated list of one or more event engine names)
131 static event_engine_factory g_factories[] = {
132 {ENGINE_HEAD_CUSTOM, nullptr}, {ENGINE_HEAD_CUSTOM, nullptr},
133 {ENGINE_HEAD_CUSTOM, nullptr}, {ENGINE_HEAD_CUSTOM, nullptr},
134 {"epollex", grpc_init_epollex_linux}, {"epoll1", grpc_init_epoll1_linux},
135 {"poll", grpc_init_poll_posix}, {"none", init_non_polling},
136 {ENGINE_TAIL_CUSTOM, nullptr}, {ENGINE_TAIL_CUSTOM, nullptr},
137 {ENGINE_TAIL_CUSTOM, nullptr}, {ENGINE_TAIL_CUSTOM, nullptr},
138 };
139
add(const char * beg,const char * end,char *** ss,size_t * ns)140 static void add(const char* beg, const char* end, char*** ss, size_t* ns) {
141 size_t n = *ns;
142 size_t np = n + 1;
143 char* s;
144 size_t len;
145 GPR_ASSERT(end >= beg);
146 len = static_cast<size_t>(end - beg);
147 s = static_cast<char*>(gpr_malloc(len + 1));
148 memcpy(s, beg, len);
149 s[len] = 0;
150 *ss = static_cast<char**>(gpr_realloc(*ss, sizeof(char**) * np));
151 (*ss)[n] = s;
152 *ns = np;
153 }
154
split(const char * s,char *** ss,size_t * ns)155 static void split(const char* s, char*** ss, size_t* ns) {
156 const char* c = strchr(s, ',');
157 if (c == nullptr) {
158 add(s, s + strlen(s), ss, ns);
159 } else {
160 add(s, c, ss, ns);
161 split(c + 1, ss, ns);
162 }
163 }
164
// Returns true when the requested engine name `want` selects the engine named
// `have`: either `want` is the wildcard "all" or the two names are equal.
static bool is(const char* want, const char* have) {
  if (strcmp(want, "all") == 0) {
    return true;
  }
  return strcmp(want, have) == 0;
}
168
try_engine(const char * engine)169 static void try_engine(const char* engine) {
170 for (size_t i = 0; i < GPR_ARRAY_SIZE(g_factories); i++) {
171 if (g_factories[i].factory != nullptr && is(engine, g_factories[i].name)) {
172 if ((g_event_engine = g_factories[i].factory(
173 0 == strcmp(engine, g_factories[i].name)))) {
174 g_poll_strategy_name = g_factories[i].name;
175 gpr_log(GPR_DEBUG, "Using polling engine: %s", g_factories[i].name);
176 return;
177 }
178 }
179 }
180 }
181
182 /* Call this before calling grpc_event_engine_init() */
grpc_register_event_engine_factory(const char * name,event_engine_factory_fn factory,bool add_at_head)183 void grpc_register_event_engine_factory(const char* name,
184 event_engine_factory_fn factory,
185 bool add_at_head) {
186 const char* custom_match =
187 add_at_head ? ENGINE_HEAD_CUSTOM : ENGINE_TAIL_CUSTOM;
188
189 // Overwrite an existing registration if already registered
190 for (size_t i = 0; i < GPR_ARRAY_SIZE(g_factories); i++) {
191 if (0 == strcmp(name, g_factories[i].name)) {
192 g_factories[i].factory = factory;
193 return;
194 }
195 }
196
197 // Otherwise fill in an available custom slot
198 for (size_t i = 0; i < GPR_ARRAY_SIZE(g_factories); i++) {
199 if (0 == strcmp(g_factories[i].name, custom_match)) {
200 g_factories[i].name = name;
201 g_factories[i].factory = factory;
202 return;
203 }
204 }
205
206 // Otherwise fail
207 GPR_ASSERT(false);
208 }
209
210 /*If grpc_event_engine_init() has been called, returns the poll_strategy_name.
211 * Otherwise, returns nullptr. */
grpc_get_poll_strategy_name()212 const char* grpc_get_poll_strategy_name() { return g_poll_strategy_name; }
213
grpc_event_engine_init(void)214 void grpc_event_engine_init(void) {
215 grpc_core::UniquePtr<char> value = GPR_GLOBAL_CONFIG_GET(grpc_poll_strategy);
216
217 char** strings = nullptr;
218 size_t nstrings = 0;
219 split(value.get(), &strings, &nstrings);
220
221 for (size_t i = 0; g_event_engine == nullptr && i < nstrings; i++) {
222 try_engine(strings[i]);
223 }
224
225 for (size_t i = 0; i < nstrings; i++) {
226 gpr_free(strings[i]);
227 }
228 gpr_free(strings);
229
230 if (g_event_engine == nullptr) {
231 gpr_log(GPR_ERROR, "No event engine could be initialized from %s",
232 value.get());
233 abort();
234 }
235 }
236
grpc_event_engine_shutdown(void)237 void grpc_event_engine_shutdown(void) {
238 g_event_engine->shutdown_engine();
239 g_event_engine = nullptr;
240 }
241
grpc_event_engine_can_track_errors(void)242 bool grpc_event_engine_can_track_errors(void) {
243 /* Only track errors if platform supports errqueue. */
244 if (grpc_core::kernel_supports_errqueue()) {
245 return g_event_engine->can_track_err;
246 }
247 return false;
248 }
249
grpc_event_engine_run_in_background(void)250 bool grpc_event_engine_run_in_background(void) {
251 // g_event_engine is nullptr when using a custom iomgr.
252 return g_event_engine != nullptr && g_event_engine->run_in_background;
253 }
254
grpc_fd_create(int fd,const char * name,bool track_err)255 grpc_fd* grpc_fd_create(int fd, const char* name, bool track_err) {
256 GRPC_POLLING_API_TRACE("fd_create(%d, %s, %d)", fd, name, track_err);
257 GRPC_FD_TRACE("fd_create(%d, %s, %d)", fd, name, track_err);
258 return g_event_engine->fd_create(
259 fd, name, track_err && grpc_event_engine_can_track_errors());
260 }
261
grpc_fd_wrapped_fd(grpc_fd * fd)262 int grpc_fd_wrapped_fd(grpc_fd* fd) {
263 return g_event_engine->fd_wrapped_fd(fd);
264 }
265
grpc_fd_orphan(grpc_fd * fd,grpc_closure * on_done,int * release_fd,const char * reason)266 void grpc_fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
267 const char* reason) {
268 GRPC_POLLING_API_TRACE("fd_orphan(%d, %p, %p, %s)", grpc_fd_wrapped_fd(fd),
269 on_done, release_fd, reason);
270 GRPC_FD_TRACE("grpc_fd_orphan, fd:%d closed", grpc_fd_wrapped_fd(fd));
271
272 g_event_engine->fd_orphan(fd, on_done, release_fd, reason);
273 }
274
grpc_fd_shutdown(grpc_fd * fd,grpc_error * why)275 void grpc_fd_shutdown(grpc_fd* fd, grpc_error* why) {
276 GRPC_POLLING_API_TRACE("fd_shutdown(%d)", grpc_fd_wrapped_fd(fd));
277 GRPC_FD_TRACE("fd_shutdown(%d)", grpc_fd_wrapped_fd(fd));
278 g_event_engine->fd_shutdown(fd, why);
279 }
280
grpc_fd_is_shutdown(grpc_fd * fd)281 bool grpc_fd_is_shutdown(grpc_fd* fd) {
282 return g_event_engine->fd_is_shutdown(fd);
283 }
284
grpc_fd_notify_on_read(grpc_fd * fd,grpc_closure * closure)285 void grpc_fd_notify_on_read(grpc_fd* fd, grpc_closure* closure) {
286 g_event_engine->fd_notify_on_read(fd, closure);
287 }
288
grpc_fd_notify_on_write(grpc_fd * fd,grpc_closure * closure)289 void grpc_fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
290 g_event_engine->fd_notify_on_write(fd, closure);
291 }
292
grpc_fd_notify_on_error(grpc_fd * fd,grpc_closure * closure)293 void grpc_fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
294 g_event_engine->fd_notify_on_error(fd, closure);
295 }
296
grpc_fd_set_readable(grpc_fd * fd)297 void grpc_fd_set_readable(grpc_fd* fd) { g_event_engine->fd_set_readable(fd); }
298
grpc_fd_set_writable(grpc_fd * fd)299 void grpc_fd_set_writable(grpc_fd* fd) { g_event_engine->fd_set_writable(fd); }
300
grpc_fd_set_error(grpc_fd * fd)301 void grpc_fd_set_error(grpc_fd* fd) { g_event_engine->fd_set_error(fd); }
302
pollset_size(void)303 static size_t pollset_size(void) { return g_event_engine->pollset_size; }
304
pollset_init(grpc_pollset * pollset,gpr_mu ** mu)305 static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
306 GRPC_POLLING_API_TRACE("pollset_init(%p)", pollset);
307 g_event_engine->pollset_init(pollset, mu);
308 }
309
pollset_shutdown(grpc_pollset * pollset,grpc_closure * closure)310 static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
311 GRPC_POLLING_API_TRACE("pollset_shutdown(%p)", pollset);
312 g_event_engine->pollset_shutdown(pollset, closure);
313 }
314
pollset_destroy(grpc_pollset * pollset)315 static void pollset_destroy(grpc_pollset* pollset) {
316 GRPC_POLLING_API_TRACE("pollset_destroy(%p)", pollset);
317 g_event_engine->pollset_destroy(pollset);
318 }
319
pollset_work(grpc_pollset * pollset,grpc_pollset_worker ** worker,grpc_millis deadline)320 static grpc_error* pollset_work(grpc_pollset* pollset,
321 grpc_pollset_worker** worker,
322 grpc_millis deadline) {
323 GRPC_POLLING_API_TRACE("pollset_work(%p, %" PRId64 ") begin", pollset,
324 deadline);
325 grpc_error* err = g_event_engine->pollset_work(pollset, worker, deadline);
326 GRPC_POLLING_API_TRACE("pollset_work(%p, %" PRId64 ") end", pollset,
327 deadline);
328 return err;
329 }
330
pollset_kick(grpc_pollset * pollset,grpc_pollset_worker * specific_worker)331 static grpc_error* pollset_kick(grpc_pollset* pollset,
332 grpc_pollset_worker* specific_worker) {
333 GRPC_POLLING_API_TRACE("pollset_kick(%p, %p)", pollset, specific_worker);
334 return g_event_engine->pollset_kick(pollset, specific_worker);
335 }
336
grpc_pollset_add_fd(grpc_pollset * pollset,struct grpc_fd * fd)337 void grpc_pollset_add_fd(grpc_pollset* pollset, struct grpc_fd* fd) {
338 GRPC_POLLING_API_TRACE("pollset_add_fd(%p, %d)", pollset,
339 grpc_fd_wrapped_fd(fd));
340 g_event_engine->pollset_add_fd(pollset, fd);
341 }
342
// Global pollset init entry of grpc_posix_pollset_vtable; intentionally a
// no-op.
void pollset_global_init() {
}

// Global pollset shutdown entry of grpc_posix_pollset_vtable; intentionally a
// no-op.
void pollset_global_shutdown() {
}
345
346 grpc_pollset_vtable grpc_posix_pollset_vtable = {
347 pollset_global_init, pollset_global_shutdown,
348 pollset_init, pollset_shutdown,
349 pollset_destroy, pollset_work,
350 pollset_kick, pollset_size};
351
pollset_set_create(void)352 static grpc_pollset_set* pollset_set_create(void) {
353 grpc_pollset_set* pss = g_event_engine->pollset_set_create();
354 GRPC_POLLING_API_TRACE("pollset_set_create(%p)", pss);
355 return pss;
356 }
357
pollset_set_destroy(grpc_pollset_set * pollset_set)358 static void pollset_set_destroy(grpc_pollset_set* pollset_set) {
359 GRPC_POLLING_API_TRACE("pollset_set_destroy(%p)", pollset_set);
360 g_event_engine->pollset_set_destroy(pollset_set);
361 }
362
pollset_set_add_pollset(grpc_pollset_set * pollset_set,grpc_pollset * pollset)363 static void pollset_set_add_pollset(grpc_pollset_set* pollset_set,
364 grpc_pollset* pollset) {
365 GRPC_POLLING_API_TRACE("pollset_set_add_pollset(%p, %p)", pollset_set,
366 pollset);
367 g_event_engine->pollset_set_add_pollset(pollset_set, pollset);
368 }
369
pollset_set_del_pollset(grpc_pollset_set * pollset_set,grpc_pollset * pollset)370 static void pollset_set_del_pollset(grpc_pollset_set* pollset_set,
371 grpc_pollset* pollset) {
372 GRPC_POLLING_API_TRACE("pollset_set_del_pollset(%p, %p)", pollset_set,
373 pollset);
374 g_event_engine->pollset_set_del_pollset(pollset_set, pollset);
375 }
376
pollset_set_add_pollset_set(grpc_pollset_set * bag,grpc_pollset_set * item)377 static void pollset_set_add_pollset_set(grpc_pollset_set* bag,
378 grpc_pollset_set* item) {
379 GRPC_POLLING_API_TRACE("pollset_set_add_pollset_set(%p, %p)", bag, item);
380 g_event_engine->pollset_set_add_pollset_set(bag, item);
381 }
382
pollset_set_del_pollset_set(grpc_pollset_set * bag,grpc_pollset_set * item)383 static void pollset_set_del_pollset_set(grpc_pollset_set* bag,
384 grpc_pollset_set* item) {
385 GRPC_POLLING_API_TRACE("pollset_set_del_pollset_set(%p, %p)", bag, item);
386 g_event_engine->pollset_set_del_pollset_set(bag, item);
387 }
388
389 grpc_pollset_set_vtable grpc_posix_pollset_set_vtable = {
390 pollset_set_create, pollset_set_destroy,
391 pollset_set_add_pollset, pollset_set_del_pollset,
392 pollset_set_add_pollset_set, pollset_set_del_pollset_set};
393
grpc_pollset_set_add_fd(grpc_pollset_set * pollset_set,grpc_fd * fd)394 void grpc_pollset_set_add_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) {
395 GRPC_POLLING_API_TRACE("pollset_set_add_fd(%p, %d)", pollset_set,
396 grpc_fd_wrapped_fd(fd));
397 g_event_engine->pollset_set_add_fd(pollset_set, fd);
398 }
399
grpc_pollset_set_del_fd(grpc_pollset_set * pollset_set,grpc_fd * fd)400 void grpc_pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) {
401 GRPC_POLLING_API_TRACE("pollset_set_del_fd(%p, %d)", pollset_set,
402 grpc_fd_wrapped_fd(fd));
403 g_event_engine->pollset_set_del_fd(pollset_set, fd);
404 }
405
grpc_is_any_background_poller_thread(void)406 bool grpc_is_any_background_poller_thread(void) {
407 return g_event_engine->is_any_background_poller_thread();
408 }
409
grpc_add_closure_to_background_poller(grpc_closure * closure,grpc_error * error)410 bool grpc_add_closure_to_background_poller(grpc_closure* closure,
411 grpc_error* error) {
412 return g_event_engine->add_closure_to_background_poller(closure, error);
413 }
414
grpc_shutdown_background_closure(void)415 void grpc_shutdown_background_closure(void) {
416 g_event_engine->shutdown_background_closure();
417 }
418
419 #endif // GRPC_POSIX_SOCKET_EV
420