• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/iomgr/port.h"
22 
23 #ifdef GRPC_POSIX_SOCKET_EV
24 
25 #include "src/core/lib/iomgr/ev_posix.h"
26 
27 #include <string.h>
28 
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/log.h>
31 #include <grpc/support/string_util.h>
32 
33 #include "src/core/lib/debug/trace.h"
34 #include "src/core/lib/gpr/useful.h"
35 #include "src/core/lib/gprpp/global_config.h"
36 #include "src/core/lib/iomgr/ev_epoll1_linux.h"
37 #include "src/core/lib/iomgr/ev_epollex_linux.h"
38 #include "src/core/lib/iomgr/ev_poll_posix.h"
39 #include "src/core/lib/iomgr/internal_errqueue.h"
40 #include "src/core/lib/iomgr/iomgr.h"
41 
42 GPR_GLOBAL_CONFIG_DEFINE_STRING(
43     grpc_poll_strategy, "all",
44     "Declares which polling engines to try when starting gRPC. "
45     "This is a comma-separated list of engines, which are tried in priority "
46     "order first -> last.")
47 
48 grpc_core::DebugOnlyTraceFlag grpc_polling_trace(
49     false, "polling"); /* Disabled by default */
50 
51 /* Traces fd create/close operations */
52 grpc_core::DebugOnlyTraceFlag grpc_fd_trace(false, "fd_trace");
53 grpc_core::DebugOnlyTraceFlag grpc_trace_fd_refcount(false, "fd_refcount");
54 grpc_core::DebugOnlyTraceFlag grpc_polling_api_trace(false, "polling_api");
55 
56 // Polling API trace only enabled in debug builds
57 #ifndef NDEBUG
58 #define GRPC_POLLING_API_TRACE(format, ...)                  \
59   if (GRPC_TRACE_FLAG_ENABLED(grpc_polling_api_trace)) {     \
60     gpr_log(GPR_INFO, "(polling-api) " format, __VA_ARGS__); \
61   }
62 #else
63 #define GRPC_POLLING_API_TRACE(...)
64 #endif  // NDEBUG
65 
66 /** Default poll() function - a pointer so that it can be overridden by some
67  *  tests */
68 #ifndef GPR_AIX
69 grpc_poll_function_type grpc_poll_function = poll;
70 #else
aix_poll(struct pollfd fds[],nfds_t nfds,int timeout)71 int aix_poll(struct pollfd fds[], nfds_t nfds, int timeout) {
72   return poll(fds, nfds, timeout);
73 }
74 grpc_poll_function_type grpc_poll_function = aix_poll;
75 #endif  // GPR_AIX
76 
77 grpc_wakeup_fd grpc_global_wakeup_fd;
78 
79 static const grpc_event_engine_vtable* g_event_engine = nullptr;
80 static const char* g_poll_strategy_name = nullptr;
81 
82 typedef const grpc_event_engine_vtable* (*event_engine_factory_fn)(
83     bool explicit_request);
84 
85 struct event_engine_factory {
86   const char* name;
87   event_engine_factory_fn factory;
88 };
89 namespace {
90 
91 grpc_poll_function_type real_poll_function;
92 
dummy_poll(struct pollfd fds[],nfds_t nfds,int timeout)93 int dummy_poll(struct pollfd fds[], nfds_t nfds, int timeout) {
94   if (timeout == 0) {
95     return real_poll_function(fds, nfds, 0);
96   } else {
97     gpr_log(GPR_ERROR, "Attempted a blocking poll when declared non-polling.");
98     GPR_ASSERT(false);
99     return -1;
100   }
101 }
102 
init_non_polling(bool explicit_request)103 const grpc_event_engine_vtable* init_non_polling(bool explicit_request) {
104   if (!explicit_request) {
105     return nullptr;
106   }
107   // return the simplest engine as a dummy but also override the poller
108   auto ret = grpc_init_poll_posix(explicit_request);
109   real_poll_function = grpc_poll_function;
110   grpc_poll_function = dummy_poll;
111   grpc_iomgr_mark_non_polling_internal();
112 
113   return ret;
114 }
115 }  // namespace
116 
117 #define ENGINE_HEAD_CUSTOM "head_custom"
118 #define ENGINE_TAIL_CUSTOM "tail_custom"
119 
120 // The global array of event-engine factories. Each entry is a pair with a name
121 // and an event-engine generator function (nullptr if there is no generator
122 // registered for this name). The middle entries are the engines predefined by
123 // open-source gRPC. The head entries represent an opportunity for specific
124 // high-priority custom pollers to be added by the initializer plugins of
125 // custom-built gRPC libraries. The tail entries represent the same, but for
126 // low-priority custom pollers. The actual poller selected is either the first
127 // available one in the list if no specific poller is requested, or the first
128 // specific poller that is requested by name in the GRPC_POLL_STRATEGY
129 // environment variable if that variable is set (which should be a
130 // comma-separated list of one or more event engine names)
131 static event_engine_factory g_factories[] = {
132     {ENGINE_HEAD_CUSTOM, nullptr},        {ENGINE_HEAD_CUSTOM, nullptr},
133     {ENGINE_HEAD_CUSTOM, nullptr},        {ENGINE_HEAD_CUSTOM, nullptr},
134     {"epollex", grpc_init_epollex_linux}, {"epoll1", grpc_init_epoll1_linux},
135     {"poll", grpc_init_poll_posix},       {"none", init_non_polling},
136     {ENGINE_TAIL_CUSTOM, nullptr},        {ENGINE_TAIL_CUSTOM, nullptr},
137     {ENGINE_TAIL_CUSTOM, nullptr},        {ENGINE_TAIL_CUSTOM, nullptr},
138 };
139 
add(const char * beg,const char * end,char *** ss,size_t * ns)140 static void add(const char* beg, const char* end, char*** ss, size_t* ns) {
141   size_t n = *ns;
142   size_t np = n + 1;
143   char* s;
144   size_t len;
145   GPR_ASSERT(end >= beg);
146   len = static_cast<size_t>(end - beg);
147   s = static_cast<char*>(gpr_malloc(len + 1));
148   memcpy(s, beg, len);
149   s[len] = 0;
150   *ss = static_cast<char**>(gpr_realloc(*ss, sizeof(char**) * np));
151   (*ss)[n] = s;
152   *ns = np;
153 }
154 
split(const char * s,char *** ss,size_t * ns)155 static void split(const char* s, char*** ss, size_t* ns) {
156   const char* c = strchr(s, ',');
157   if (c == nullptr) {
158     add(s, s + strlen(s), ss, ns);
159   } else {
160     add(s, c, ss, ns);
161     split(c + 1, ss, ns);
162   }
163 }
164 
is(const char * want,const char * have)165 static bool is(const char* want, const char* have) {
166   return 0 == strcmp(want, "all") || 0 == strcmp(want, have);
167 }
168 
try_engine(const char * engine)169 static void try_engine(const char* engine) {
170   for (size_t i = 0; i < GPR_ARRAY_SIZE(g_factories); i++) {
171     if (g_factories[i].factory != nullptr && is(engine, g_factories[i].name)) {
172       if ((g_event_engine = g_factories[i].factory(
173                0 == strcmp(engine, g_factories[i].name)))) {
174         g_poll_strategy_name = g_factories[i].name;
175         gpr_log(GPR_DEBUG, "Using polling engine: %s", g_factories[i].name);
176         return;
177       }
178     }
179   }
180 }
181 
182 /* Call this before calling grpc_event_engine_init() */
grpc_register_event_engine_factory(const char * name,event_engine_factory_fn factory,bool add_at_head)183 void grpc_register_event_engine_factory(const char* name,
184                                         event_engine_factory_fn factory,
185                                         bool add_at_head) {
186   const char* custom_match =
187       add_at_head ? ENGINE_HEAD_CUSTOM : ENGINE_TAIL_CUSTOM;
188 
189   // Overwrite an existing registration if already registered
190   for (size_t i = 0; i < GPR_ARRAY_SIZE(g_factories); i++) {
191     if (0 == strcmp(name, g_factories[i].name)) {
192       g_factories[i].factory = factory;
193       return;
194     }
195   }
196 
197   // Otherwise fill in an available custom slot
198   for (size_t i = 0; i < GPR_ARRAY_SIZE(g_factories); i++) {
199     if (0 == strcmp(g_factories[i].name, custom_match)) {
200       g_factories[i].name = name;
201       g_factories[i].factory = factory;
202       return;
203     }
204   }
205 
206   // Otherwise fail
207   GPR_ASSERT(false);
208 }
209 
210 /*If grpc_event_engine_init() has been called, returns the poll_strategy_name.
211  * Otherwise, returns nullptr. */
grpc_get_poll_strategy_name()212 const char* grpc_get_poll_strategy_name() { return g_poll_strategy_name; }
213 
grpc_event_engine_init(void)214 void grpc_event_engine_init(void) {
215   grpc_core::UniquePtr<char> value = GPR_GLOBAL_CONFIG_GET(grpc_poll_strategy);
216 
217   char** strings = nullptr;
218   size_t nstrings = 0;
219   split(value.get(), &strings, &nstrings);
220 
221   for (size_t i = 0; g_event_engine == nullptr && i < nstrings; i++) {
222     try_engine(strings[i]);
223   }
224 
225   for (size_t i = 0; i < nstrings; i++) {
226     gpr_free(strings[i]);
227   }
228   gpr_free(strings);
229 
230   if (g_event_engine == nullptr) {
231     gpr_log(GPR_ERROR, "No event engine could be initialized from %s",
232             value.get());
233     abort();
234   }
235 }
236 
grpc_event_engine_shutdown(void)237 void grpc_event_engine_shutdown(void) {
238   g_event_engine->shutdown_engine();
239   g_event_engine = nullptr;
240 }
241 
grpc_event_engine_can_track_errors(void)242 bool grpc_event_engine_can_track_errors(void) {
243   /* Only track errors if platform supports errqueue. */
244   if (grpc_core::kernel_supports_errqueue()) {
245     return g_event_engine->can_track_err;
246   }
247   return false;
248 }
249 
grpc_event_engine_run_in_background(void)250 bool grpc_event_engine_run_in_background(void) {
251   // g_event_engine is nullptr when using a custom iomgr.
252   return g_event_engine != nullptr && g_event_engine->run_in_background;
253 }
254 
grpc_fd_create(int fd,const char * name,bool track_err)255 grpc_fd* grpc_fd_create(int fd, const char* name, bool track_err) {
256   GRPC_POLLING_API_TRACE("fd_create(%d, %s, %d)", fd, name, track_err);
257   GRPC_FD_TRACE("fd_create(%d, %s, %d)", fd, name, track_err);
258   return g_event_engine->fd_create(
259       fd, name, track_err && grpc_event_engine_can_track_errors());
260 }
261 
grpc_fd_wrapped_fd(grpc_fd * fd)262 int grpc_fd_wrapped_fd(grpc_fd* fd) {
263   return g_event_engine->fd_wrapped_fd(fd);
264 }
265 
grpc_fd_orphan(grpc_fd * fd,grpc_closure * on_done,int * release_fd,const char * reason)266 void grpc_fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd,
267                     const char* reason) {
268   GRPC_POLLING_API_TRACE("fd_orphan(%d, %p, %p, %s)", grpc_fd_wrapped_fd(fd),
269                          on_done, release_fd, reason);
270   GRPC_FD_TRACE("grpc_fd_orphan, fd:%d closed", grpc_fd_wrapped_fd(fd));
271 
272   g_event_engine->fd_orphan(fd, on_done, release_fd, reason);
273 }
274 
grpc_fd_shutdown(grpc_fd * fd,grpc_error * why)275 void grpc_fd_shutdown(grpc_fd* fd, grpc_error* why) {
276   GRPC_POLLING_API_TRACE("fd_shutdown(%d)", grpc_fd_wrapped_fd(fd));
277   GRPC_FD_TRACE("fd_shutdown(%d)", grpc_fd_wrapped_fd(fd));
278   g_event_engine->fd_shutdown(fd, why);
279 }
280 
grpc_fd_is_shutdown(grpc_fd * fd)281 bool grpc_fd_is_shutdown(grpc_fd* fd) {
282   return g_event_engine->fd_is_shutdown(fd);
283 }
284 
grpc_fd_notify_on_read(grpc_fd * fd,grpc_closure * closure)285 void grpc_fd_notify_on_read(grpc_fd* fd, grpc_closure* closure) {
286   g_event_engine->fd_notify_on_read(fd, closure);
287 }
288 
grpc_fd_notify_on_write(grpc_fd * fd,grpc_closure * closure)289 void grpc_fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) {
290   g_event_engine->fd_notify_on_write(fd, closure);
291 }
292 
grpc_fd_notify_on_error(grpc_fd * fd,grpc_closure * closure)293 void grpc_fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) {
294   g_event_engine->fd_notify_on_error(fd, closure);
295 }
296 
grpc_fd_set_readable(grpc_fd * fd)297 void grpc_fd_set_readable(grpc_fd* fd) { g_event_engine->fd_set_readable(fd); }
298 
grpc_fd_set_writable(grpc_fd * fd)299 void grpc_fd_set_writable(grpc_fd* fd) { g_event_engine->fd_set_writable(fd); }
300 
grpc_fd_set_error(grpc_fd * fd)301 void grpc_fd_set_error(grpc_fd* fd) { g_event_engine->fd_set_error(fd); }
302 
pollset_size(void)303 static size_t pollset_size(void) { return g_event_engine->pollset_size; }
304 
pollset_init(grpc_pollset * pollset,gpr_mu ** mu)305 static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) {
306   GRPC_POLLING_API_TRACE("pollset_init(%p)", pollset);
307   g_event_engine->pollset_init(pollset, mu);
308 }
309 
pollset_shutdown(grpc_pollset * pollset,grpc_closure * closure)310 static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) {
311   GRPC_POLLING_API_TRACE("pollset_shutdown(%p)", pollset);
312   g_event_engine->pollset_shutdown(pollset, closure);
313 }
314 
pollset_destroy(grpc_pollset * pollset)315 static void pollset_destroy(grpc_pollset* pollset) {
316   GRPC_POLLING_API_TRACE("pollset_destroy(%p)", pollset);
317   g_event_engine->pollset_destroy(pollset);
318 }
319 
pollset_work(grpc_pollset * pollset,grpc_pollset_worker ** worker,grpc_millis deadline)320 static grpc_error* pollset_work(grpc_pollset* pollset,
321                                 grpc_pollset_worker** worker,
322                                 grpc_millis deadline) {
323   GRPC_POLLING_API_TRACE("pollset_work(%p, %" PRId64 ") begin", pollset,
324                          deadline);
325   grpc_error* err = g_event_engine->pollset_work(pollset, worker, deadline);
326   GRPC_POLLING_API_TRACE("pollset_work(%p, %" PRId64 ") end", pollset,
327                          deadline);
328   return err;
329 }
330 
pollset_kick(grpc_pollset * pollset,grpc_pollset_worker * specific_worker)331 static grpc_error* pollset_kick(grpc_pollset* pollset,
332                                 grpc_pollset_worker* specific_worker) {
333   GRPC_POLLING_API_TRACE("pollset_kick(%p, %p)", pollset, specific_worker);
334   return g_event_engine->pollset_kick(pollset, specific_worker);
335 }
336 
grpc_pollset_add_fd(grpc_pollset * pollset,struct grpc_fd * fd)337 void grpc_pollset_add_fd(grpc_pollset* pollset, struct grpc_fd* fd) {
338   GRPC_POLLING_API_TRACE("pollset_add_fd(%p, %d)", pollset,
339                          grpc_fd_wrapped_fd(fd));
340   g_event_engine->pollset_add_fd(pollset, fd);
341 }
342 
pollset_global_init()343 void pollset_global_init() {}
pollset_global_shutdown()344 void pollset_global_shutdown() {}
345 
346 grpc_pollset_vtable grpc_posix_pollset_vtable = {
347     pollset_global_init, pollset_global_shutdown,
348     pollset_init,        pollset_shutdown,
349     pollset_destroy,     pollset_work,
350     pollset_kick,        pollset_size};
351 
pollset_set_create(void)352 static grpc_pollset_set* pollset_set_create(void) {
353   grpc_pollset_set* pss = g_event_engine->pollset_set_create();
354   GRPC_POLLING_API_TRACE("pollset_set_create(%p)", pss);
355   return pss;
356 }
357 
pollset_set_destroy(grpc_pollset_set * pollset_set)358 static void pollset_set_destroy(grpc_pollset_set* pollset_set) {
359   GRPC_POLLING_API_TRACE("pollset_set_destroy(%p)", pollset_set);
360   g_event_engine->pollset_set_destroy(pollset_set);
361 }
362 
pollset_set_add_pollset(grpc_pollset_set * pollset_set,grpc_pollset * pollset)363 static void pollset_set_add_pollset(grpc_pollset_set* pollset_set,
364                                     grpc_pollset* pollset) {
365   GRPC_POLLING_API_TRACE("pollset_set_add_pollset(%p, %p)", pollset_set,
366                          pollset);
367   g_event_engine->pollset_set_add_pollset(pollset_set, pollset);
368 }
369 
pollset_set_del_pollset(grpc_pollset_set * pollset_set,grpc_pollset * pollset)370 static void pollset_set_del_pollset(grpc_pollset_set* pollset_set,
371                                     grpc_pollset* pollset) {
372   GRPC_POLLING_API_TRACE("pollset_set_del_pollset(%p, %p)", pollset_set,
373                          pollset);
374   g_event_engine->pollset_set_del_pollset(pollset_set, pollset);
375 }
376 
pollset_set_add_pollset_set(grpc_pollset_set * bag,grpc_pollset_set * item)377 static void pollset_set_add_pollset_set(grpc_pollset_set* bag,
378                                         grpc_pollset_set* item) {
379   GRPC_POLLING_API_TRACE("pollset_set_add_pollset_set(%p, %p)", bag, item);
380   g_event_engine->pollset_set_add_pollset_set(bag, item);
381 }
382 
pollset_set_del_pollset_set(grpc_pollset_set * bag,grpc_pollset_set * item)383 static void pollset_set_del_pollset_set(grpc_pollset_set* bag,
384                                         grpc_pollset_set* item) {
385   GRPC_POLLING_API_TRACE("pollset_set_del_pollset_set(%p, %p)", bag, item);
386   g_event_engine->pollset_set_del_pollset_set(bag, item);
387 }
388 
389 grpc_pollset_set_vtable grpc_posix_pollset_set_vtable = {
390     pollset_set_create,          pollset_set_destroy,
391     pollset_set_add_pollset,     pollset_set_del_pollset,
392     pollset_set_add_pollset_set, pollset_set_del_pollset_set};
393 
grpc_pollset_set_add_fd(grpc_pollset_set * pollset_set,grpc_fd * fd)394 void grpc_pollset_set_add_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) {
395   GRPC_POLLING_API_TRACE("pollset_set_add_fd(%p, %d)", pollset_set,
396                          grpc_fd_wrapped_fd(fd));
397   g_event_engine->pollset_set_add_fd(pollset_set, fd);
398 }
399 
grpc_pollset_set_del_fd(grpc_pollset_set * pollset_set,grpc_fd * fd)400 void grpc_pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) {
401   GRPC_POLLING_API_TRACE("pollset_set_del_fd(%p, %d)", pollset_set,
402                          grpc_fd_wrapped_fd(fd));
403   g_event_engine->pollset_set_del_fd(pollset_set, fd);
404 }
405 
grpc_is_any_background_poller_thread(void)406 bool grpc_is_any_background_poller_thread(void) {
407   return g_event_engine->is_any_background_poller_thread();
408 }
409 
grpc_add_closure_to_background_poller(grpc_closure * closure,grpc_error * error)410 bool grpc_add_closure_to_background_poller(grpc_closure* closure,
411                                            grpc_error* error) {
412   return g_event_engine->add_closure_to_background_poller(closure, error);
413 }
414 
grpc_shutdown_background_closure(void)415 void grpc_shutdown_background_closure(void) {
416   g_event_engine->shutdown_background_closure();
417 }
418 
419 #endif  // GRPC_POSIX_SOCKET_EV
420