1 /*
2 *
3 * Copyright 2016 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18 #include "src/core/lib/iomgr/port.h"
19
/* This test is only relevant on Linux systems where epoll is available */
21 #ifdef GRPC_LINUX_EPOLL_CREATE1
22
23 #include <errno.h>
24 #include <string.h>
25 #include <unistd.h>
26
27 #include <grpc/grpc.h>
28 #include <grpc/support/alloc.h>
29 #include <grpc/support/log.h>
30
31 #include "src/core/lib/gpr/useful.h"
32 #include "src/core/lib/iomgr/ev_posix.h"
33 #include "src/core/lib/iomgr/iomgr.h"
34 #include "test/core/util/test_config.h"
35
36 /*******************************************************************************
37 * test_pollset_set
38 */
39
40 typedef struct test_pollset_set {
41 grpc_pollset_set* pss;
42 } test_pollset_set;
43
init_test_pollset_sets(test_pollset_set * pollset_sets,const int num_pss)44 void init_test_pollset_sets(test_pollset_set* pollset_sets, const int num_pss) {
45 for (int i = 0; i < num_pss; i++) {
46 pollset_sets[i].pss = grpc_pollset_set_create();
47 }
48 }
49
cleanup_test_pollset_sets(test_pollset_set * pollset_sets,const int num_pss)50 void cleanup_test_pollset_sets(test_pollset_set* pollset_sets,
51 const int num_pss) {
52 for (int i = 0; i < num_pss; i++) {
53 grpc_pollset_set_destroy(pollset_sets[i].pss);
54 pollset_sets[i].pss = nullptr;
55 }
56 }
57
58 /*******************************************************************************
59 * test_pollset
60 */
61
62 typedef struct test_pollset {
63 grpc_pollset* ps;
64 gpr_mu* mu;
65 } test_pollset;
66
init_test_pollsets(test_pollset * pollsets,const int num_pollsets)67 static void init_test_pollsets(test_pollset* pollsets, const int num_pollsets) {
68 for (int i = 0; i < num_pollsets; i++) {
69 pollsets[i].ps =
70 static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
71 grpc_pollset_init(pollsets[i].ps, &pollsets[i].mu);
72 }
73 }
74
destroy_pollset(void * p,grpc_error * error)75 static void destroy_pollset(void* p, grpc_error* error) {
76 grpc_pollset_destroy(static_cast<grpc_pollset*>(p));
77 }
78
cleanup_test_pollsets(test_pollset * pollsets,const int num_pollsets)79 static void cleanup_test_pollsets(test_pollset* pollsets,
80 const int num_pollsets) {
81 grpc_closure destroyed;
82 for (int i = 0; i < num_pollsets; i++) {
83 GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, pollsets[i].ps,
84 grpc_schedule_on_exec_ctx);
85 grpc_pollset_shutdown(pollsets[i].ps, &destroyed);
86
87 grpc_core::ExecCtx::Get()->Flush();
88 gpr_free(pollsets[i].ps);
89 pollsets[i].ps = nullptr;
90 }
91 }
92
93 /*******************************************************************************
94 * test_fd
95 */
96
97 typedef struct test_fd {
98 grpc_fd* fd;
99 grpc_wakeup_fd wakeup_fd;
100
101 bool is_on_readable_called; /* Is on_readable closure is called ? */
102 grpc_closure on_readable; /* Closure to call when this fd is readable */
103 } test_fd;
104
on_readable(void * tfd,grpc_error * error)105 void on_readable(void* tfd, grpc_error* error) {
106 (static_cast<test_fd*>(tfd))->is_on_readable_called = true;
107 }
108
reset_test_fd(test_fd * tfd)109 static void reset_test_fd(test_fd* tfd) {
110 tfd->is_on_readable_called = false;
111
112 GRPC_CLOSURE_INIT(&tfd->on_readable, on_readable, tfd,
113 grpc_schedule_on_exec_ctx);
114 grpc_fd_notify_on_read(tfd->fd, &tfd->on_readable);
115 }
116
init_test_fds(test_fd * tfds,const int num_fds)117 static void init_test_fds(test_fd* tfds, const int num_fds) {
118 for (int i = 0; i < num_fds; i++) {
119 GPR_ASSERT(GRPC_ERROR_NONE == grpc_wakeup_fd_init(&tfds[i].wakeup_fd));
120 tfds[i].fd = grpc_fd_create(GRPC_WAKEUP_FD_GET_READ_FD(&tfds[i].wakeup_fd),
121 "test_fd", false);
122 reset_test_fd(&tfds[i]);
123 }
124 }
125
cleanup_test_fds(test_fd * tfds,const int num_fds)126 static void cleanup_test_fds(test_fd* tfds, const int num_fds) {
127 int release_fd;
128
129 for (int i = 0; i < num_fds; i++) {
130 grpc_fd_shutdown(tfds[i].fd,
131 GRPC_ERROR_CREATE_FROM_STATIC_STRING("fd cleanup"));
132 grpc_core::ExecCtx::Get()->Flush();
133
134 /* grpc_fd_orphan frees the memory allocated for grpc_fd. Normally it also
135 * calls close() on the underlying fd. In our case, we are using
136 * grpc_wakeup_fd and we would like to destroy it ourselves (by calling
137 * grpc_wakeup_fd_destroy). To prevent grpc_fd from calling close() on the
138 * underlying fd, call it with a non-NULL 'release_fd' parameter */
139 grpc_fd_orphan(tfds[i].fd, nullptr, &release_fd, "test_fd_cleanup");
140 grpc_core::ExecCtx::Get()->Flush();
141
142 grpc_wakeup_fd_destroy(&tfds[i].wakeup_fd);
143 }
144 }
145
make_test_fds_readable(test_fd * tfds,const int num_fds)146 static void make_test_fds_readable(test_fd* tfds, const int num_fds) {
147 for (int i = 0; i < num_fds; i++) {
148 GPR_ASSERT(GRPC_ERROR_NONE == grpc_wakeup_fd_wakeup(&tfds[i].wakeup_fd));
149 }
150 }
151
verify_readable_and_reset(test_fd * tfds,const int num_fds)152 static void verify_readable_and_reset(test_fd* tfds, const int num_fds) {
153 for (int i = 0; i < num_fds; i++) {
154 /* Verify that the on_readable callback was called */
155 GPR_ASSERT(tfds[i].is_on_readable_called);
156
157 /* Reset the tfd[i] structure */
158 GPR_ASSERT(GRPC_ERROR_NONE ==
159 grpc_wakeup_fd_consume_wakeup(&tfds[i].wakeup_fd));
160 reset_test_fd(&tfds[i]);
161 }
162 }
163
164 /*******************************************************************************
165 * Main tests
166 */
167
168 /* Test some typical scenarios in pollset_set */
pollset_set_test_basic()169 static void pollset_set_test_basic() {
170 /* We construct the following structure for this test:
171 *
172 * +---> FD0 (Added before PSS1, PS1 and PS2 are added to PSS0)
173 * |
174 * +---> FD5 (Added after PSS1, PS1 and PS2 are added to PSS0)
175 * |
176 * |
177 * | +---> FD1 (Added before PSS1 is added to PSS0)
178 * | |
179 * | +---> FD6 (Added after PSS1 is added to PSS0)
180 * | |
181 * +---> PSS1--+ +--> FD2 (Added before PS0 is added to PSS1)
182 * | | |
183 * | +---> PS0---+
184 * | |
185 * PSS0---+ +--> FD7 (Added after PS0 is added to PSS1)
186 * |
187 * |
188 * | +---> FD3 (Added before PS1 is added to PSS0)
189 * | |
190 * +---> PS1---+
191 * | |
192 * | +---> FD8 (Added after PS1 added to PSS0)
193 * |
194 * |
195 * | +---> FD4 (Added before PS2 is added to PSS0)
196 * | |
197 * +---> PS2---+
198 * |
199 * +---> FD9 (Added after PS2 is added to PSS0)
200 */
201 grpc_core::ExecCtx exec_ctx;
202 grpc_pollset_worker* worker;
203 grpc_millis deadline;
204
205 test_fd tfds[10];
206 test_pollset pollsets[3];
207 test_pollset_set pollset_sets[2];
208 const int num_fds = GPR_ARRAY_SIZE(tfds);
209 const int num_ps = GPR_ARRAY_SIZE(pollsets);
210 const int num_pss = GPR_ARRAY_SIZE(pollset_sets);
211
212 init_test_fds(tfds, num_fds);
213 init_test_pollsets(pollsets, num_ps);
214 init_test_pollset_sets(pollset_sets, num_pss);
215
216 /* Construct the pollset_set/pollset/fd tree (see diagram above) */
217
218 grpc_pollset_set_add_fd(pollset_sets[0].pss, tfds[0].fd);
219 grpc_pollset_set_add_fd(pollset_sets[1].pss, tfds[1].fd);
220
221 grpc_pollset_add_fd(pollsets[0].ps, tfds[2].fd);
222 grpc_pollset_add_fd(pollsets[1].ps, tfds[3].fd);
223 grpc_pollset_add_fd(pollsets[2].ps, tfds[4].fd);
224
225 grpc_pollset_set_add_pollset_set(pollset_sets[0].pss, pollset_sets[1].pss);
226
227 grpc_pollset_set_add_pollset(pollset_sets[1].pss, pollsets[0].ps);
228 grpc_pollset_set_add_pollset(pollset_sets[0].pss, pollsets[1].ps);
229 grpc_pollset_set_add_pollset(pollset_sets[0].pss, pollsets[2].ps);
230
231 grpc_pollset_set_add_fd(pollset_sets[0].pss, tfds[5].fd);
232 grpc_pollset_set_add_fd(pollset_sets[1].pss, tfds[6].fd);
233
234 grpc_pollset_add_fd(pollsets[0].ps, tfds[7].fd);
235 grpc_pollset_add_fd(pollsets[1].ps, tfds[8].fd);
236 grpc_pollset_add_fd(pollsets[2].ps, tfds[9].fd);
237
238 grpc_core::ExecCtx::Get()->Flush();
239
240 /* Test that if any FD in the above structure is readable, it is observable by
241 * doing grpc_pollset_work on any pollset
242 *
243 * For every pollset, do the following:
244 * - (Ensure that all FDs are in reset state)
245 * - Make all FDs readable
246 * - Call grpc_pollset_work() on the pollset
247 * - Flush the exec_ctx
248 * - Verify that on_readable call back was called for all FDs (and
249 * reset the FDs)
250 * */
251 for (int i = 0; i < num_ps; i++) {
252 make_test_fds_readable(tfds, num_fds);
253
254 gpr_mu_lock(pollsets[i].mu);
255 deadline = grpc_timespec_to_millis_round_up(
256 grpc_timeout_milliseconds_to_deadline(2));
257 GPR_ASSERT(GRPC_ERROR_NONE ==
258 grpc_pollset_work(pollsets[i].ps, &worker, deadline));
259 gpr_mu_unlock(pollsets[i].mu);
260
261 grpc_core::ExecCtx::Get()->Flush();
262
263 verify_readable_and_reset(tfds, num_fds);
264 grpc_core::ExecCtx::Get()->Flush();
265 }
266
267 /* Test tear down */
268 grpc_pollset_set_del_fd(pollset_sets[0].pss, tfds[0].fd);
269 grpc_pollset_set_del_fd(pollset_sets[0].pss, tfds[5].fd);
270 grpc_pollset_set_del_fd(pollset_sets[1].pss, tfds[1].fd);
271 grpc_pollset_set_del_fd(pollset_sets[1].pss, tfds[6].fd);
272 grpc_core::ExecCtx::Get()->Flush();
273
274 grpc_pollset_set_del_pollset(pollset_sets[1].pss, pollsets[0].ps);
275 grpc_pollset_set_del_pollset(pollset_sets[0].pss, pollsets[1].ps);
276 grpc_pollset_set_del_pollset(pollset_sets[0].pss, pollsets[2].ps);
277
278 grpc_pollset_set_del_pollset_set(pollset_sets[0].pss, pollset_sets[1].pss);
279 grpc_core::ExecCtx::Get()->Flush();
280
281 cleanup_test_fds(tfds, num_fds);
282 cleanup_test_pollsets(pollsets, num_ps);
283 cleanup_test_pollset_sets(pollset_sets, num_pss);
284 }
285
286 /* Same FD added multiple times to the pollset_set tree */
pollset_set_test_dup_fds()287 void pollset_set_test_dup_fds() {
288 /* We construct the following structure for this test:
289 *
290 * +---> FD0
291 * |
292 * |
293 * PSS0---+
294 * | +---> FD0 (also under PSS0)
295 * | |
296 * +---> PSS1--+ +--> FD1 (also under PSS1)
297 * | |
298 * +---> PS ---+
299 * | |
300 * | +--> FD2
301 * +---> FD1
302 */
303 grpc_core::ExecCtx exec_ctx;
304 grpc_pollset_worker* worker;
305 grpc_millis deadline;
306
307 test_fd tfds[3];
308 test_pollset pollset;
309 test_pollset_set pollset_sets[2];
310 const int num_fds = GPR_ARRAY_SIZE(tfds);
311 const int num_ps = 1;
312 const int num_pss = GPR_ARRAY_SIZE(pollset_sets);
313
314 init_test_fds(tfds, num_fds);
315 init_test_pollsets(&pollset, num_ps);
316 init_test_pollset_sets(pollset_sets, num_pss);
317
318 /* Construct the structure */
319 grpc_pollset_set_add_fd(pollset_sets[0].pss, tfds[0].fd);
320 grpc_pollset_set_add_fd(pollset_sets[1].pss, tfds[0].fd);
321 grpc_pollset_set_add_fd(pollset_sets[1].pss, tfds[1].fd);
322
323 grpc_pollset_add_fd(pollset.ps, tfds[1].fd);
324 grpc_pollset_add_fd(pollset.ps, tfds[2].fd);
325
326 grpc_pollset_set_add_pollset(pollset_sets[1].pss, pollset.ps);
327 grpc_pollset_set_add_pollset_set(pollset_sets[0].pss, pollset_sets[1].pss);
328
329 /* Test. Make all FDs readable and make sure that can be observed by doing a
330 * grpc_pollset_work on the pollset 'PS' */
331 make_test_fds_readable(tfds, num_fds);
332
333 gpr_mu_lock(pollset.mu);
334 deadline = grpc_timespec_to_millis_round_up(
335 grpc_timeout_milliseconds_to_deadline(2));
336 GPR_ASSERT(GRPC_ERROR_NONE ==
337 grpc_pollset_work(pollset.ps, &worker, deadline));
338 gpr_mu_unlock(pollset.mu);
339 grpc_core::ExecCtx::Get()->Flush();
340
341 verify_readable_and_reset(tfds, num_fds);
342 grpc_core::ExecCtx::Get()->Flush();
343
344 /* Tear down */
345 grpc_pollset_set_del_fd(pollset_sets[0].pss, tfds[0].fd);
346 grpc_pollset_set_del_fd(pollset_sets[1].pss, tfds[0].fd);
347 grpc_pollset_set_del_fd(pollset_sets[1].pss, tfds[1].fd);
348
349 grpc_pollset_set_del_pollset(pollset_sets[1].pss, pollset.ps);
350 grpc_pollset_set_del_pollset_set(pollset_sets[0].pss, pollset_sets[1].pss);
351 grpc_core::ExecCtx::Get()->Flush();
352
353 cleanup_test_fds(tfds, num_fds);
354 cleanup_test_pollsets(&pollset, num_ps);
355 cleanup_test_pollset_sets(pollset_sets, num_pss);
356 }
357
358 /* Pollset_set with an empty pollset */
pollset_set_test_empty_pollset()359 void pollset_set_test_empty_pollset() {
360 /* We construct the following structure for this test:
361 *
362 * +---> PS0 (EMPTY)
363 * |
364 * +---> FD0
365 * |
366 * PSS0---+
367 * | +---> FD1
368 * | |
369 * +---> PS1--+
370 * |
371 * +---> FD2
372 */
373 grpc_core::ExecCtx exec_ctx;
374 grpc_pollset_worker* worker;
375 grpc_millis deadline;
376
377 test_fd tfds[3];
378 test_pollset pollsets[2];
379 test_pollset_set pollset_set;
380 const int num_fds = GPR_ARRAY_SIZE(tfds);
381 const int num_ps = GPR_ARRAY_SIZE(pollsets);
382 const int num_pss = 1;
383
384 init_test_fds(tfds, num_fds);
385 init_test_pollsets(pollsets, num_ps);
386 init_test_pollset_sets(&pollset_set, num_pss);
387
388 /* Construct the structure */
389 grpc_pollset_set_add_fd(pollset_set.pss, tfds[0].fd);
390 grpc_pollset_add_fd(pollsets[1].ps, tfds[1].fd);
391 grpc_pollset_add_fd(pollsets[1].ps, tfds[2].fd);
392
393 grpc_pollset_set_add_pollset(pollset_set.pss, pollsets[0].ps);
394 grpc_pollset_set_add_pollset(pollset_set.pss, pollsets[1].ps);
395
396 /* Test. Make all FDs readable and make sure that can be observed by doing
397 * grpc_pollset_work on the empty pollset 'PS0' */
398 make_test_fds_readable(tfds, num_fds);
399
400 gpr_mu_lock(pollsets[0].mu);
401 deadline = grpc_timespec_to_millis_round_up(
402 grpc_timeout_milliseconds_to_deadline(2));
403 GPR_ASSERT(GRPC_ERROR_NONE ==
404 grpc_pollset_work(pollsets[0].ps, &worker, deadline));
405 gpr_mu_unlock(pollsets[0].mu);
406 grpc_core::ExecCtx::Get()->Flush();
407
408 verify_readable_and_reset(tfds, num_fds);
409 grpc_core::ExecCtx::Get()->Flush();
410
411 /* Tear down */
412 grpc_pollset_set_del_fd(pollset_set.pss, tfds[0].fd);
413 grpc_pollset_set_del_pollset(pollset_set.pss, pollsets[0].ps);
414 grpc_pollset_set_del_pollset(pollset_set.pss, pollsets[1].ps);
415 grpc_core::ExecCtx::Get()->Flush();
416
417 cleanup_test_fds(tfds, num_fds);
418 cleanup_test_pollsets(pollsets, num_ps);
419 cleanup_test_pollset_sets(&pollset_set, num_pss);
420 }
421
main(int argc,char ** argv)422 int main(int argc, char** argv) {
423 grpc_test_init(argc, argv);
424 grpc_init();
425 {
426 grpc_core::ExecCtx exec_ctx;
427 const char* poll_strategy = grpc_get_poll_strategy_name();
428
429 if (poll_strategy != nullptr &&
430 (strcmp(poll_strategy, "epollsig") == 0 ||
431 strcmp(poll_strategy, "epoll-threadpool") == 0)) {
432 pollset_set_test_basic();
433 pollset_set_test_dup_fds();
434 pollset_set_test_empty_pollset();
435 } else {
436 gpr_log(GPR_INFO,
437 "Skipping the test. The test is only relevant for 'epoll' "
438 "strategy. and the current strategy is: '%s'",
439 poll_strategy);
440 }
441 }
442 grpc_shutdown();
443 return 0;
444 }
445 #else /* defined(GRPC_LINUX_EPOLL_CREATE1) */
/* Fallback entry point for builds where GRPC_LINUX_EPOLL_CREATE1 is not
 * defined: there is nothing to test, so report success. */
int main(int argc, char** argv) {
  return 0;
}
447 #endif /* !defined(GRPC_LINUX_EPOLL_CREATE1) */
448