• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 #include "src/core/lib/iomgr/port.h"
19 
20 /* This test only relevant on linux systems where epoll() is available */
21 #ifdef GRPC_LINUX_EPOLL_CREATE1
22 #include "src/core/lib/iomgr/ev_epollsig_linux.h"
23 #include "src/core/lib/iomgr/ev_posix.h"
24 
25 #include <errno.h>
26 #include <string.h>
27 #include <unistd.h>
28 
29 #include <grpc/grpc.h>
30 #include <grpc/support/alloc.h>
31 #include <grpc/support/log.h>
32 
33 #include "src/core/lib/gpr/useful.h"
34 #include "src/core/lib/gprpp/thd.h"
35 #include "src/core/lib/iomgr/iomgr.h"
36 #include "test/core/util/test_config.h"
37 
38 typedef struct test_pollset {
39   grpc_pollset* pollset;
40   gpr_mu* mu;
41 } test_pollset;
42 
43 typedef struct test_fd {
44   int inner_fd;
45   grpc_fd* fd;
46 } test_fd;
47 
48 /* num_fds should be an even number */
test_fd_init(test_fd * tfds,int * fds,int num_fds)49 static void test_fd_init(test_fd* tfds, int* fds, int num_fds) {
50   int i;
51   int r;
52 
53   /* Create some dummy file descriptors. Currently using pipe file descriptors
54    * for this test but we could use any other type of file descriptors. Also,
55    * since pipe() used in this test creates two fds in each call, num_fds should
56    * be an even number */
57   GPR_ASSERT((num_fds % 2) == 0);
58   for (i = 0; i < num_fds; i = i + 2) {
59     r = pipe(fds + i);
60     if (r != 0) {
61       gpr_log(GPR_ERROR, "Error in creating pipe. %d (%s)", errno,
62               strerror(errno));
63       return;
64     }
65   }
66 
67   for (i = 0; i < num_fds; i++) {
68     tfds[i].inner_fd = fds[i];
69     tfds[i].fd = grpc_fd_create(fds[i], "test_fd", false);
70   }
71 }
72 
test_fd_cleanup(test_fd * tfds,int num_fds)73 static void test_fd_cleanup(test_fd* tfds, int num_fds) {
74   int release_fd;
75   int i;
76 
77   for (i = 0; i < num_fds; i++) {
78     grpc_fd_shutdown(tfds[i].fd,
79                      GRPC_ERROR_CREATE_FROM_STATIC_STRING("test_fd_cleanup"));
80     grpc_core::ExecCtx::Get()->Flush();
81 
82     grpc_fd_orphan(tfds[i].fd, nullptr, &release_fd, "test_fd_cleanup");
83     grpc_core::ExecCtx::Get()->Flush();
84 
85     GPR_ASSERT(release_fd == tfds[i].inner_fd);
86     close(tfds[i].inner_fd);
87   }
88 }
89 
test_pollset_init(test_pollset * pollsets,int num_pollsets)90 static void test_pollset_init(test_pollset* pollsets, int num_pollsets) {
91   int i;
92   for (i = 0; i < num_pollsets; i++) {
93     pollsets[i].pollset =
94         static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
95     grpc_pollset_init(pollsets[i].pollset, &pollsets[i].mu);
96   }
97 }
98 
destroy_pollset(void * p,grpc_error * error)99 static void destroy_pollset(void* p, grpc_error* error) {
100   grpc_pollset_destroy(static_cast<grpc_pollset*>(p));
101 }
102 
test_pollset_cleanup(test_pollset * pollsets,int num_pollsets)103 static void test_pollset_cleanup(test_pollset* pollsets, int num_pollsets) {
104   grpc_closure destroyed;
105   int i;
106 
107   for (i = 0; i < num_pollsets; i++) {
108     GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, pollsets[i].pollset,
109                       grpc_schedule_on_exec_ctx);
110     grpc_pollset_shutdown(pollsets[i].pollset, &destroyed);
111 
112     grpc_core::ExecCtx::Get()->Flush();
113     gpr_free(pollsets[i].pollset);
114   }
115 }
116 
117   /*
118    * Cases to test:
119    *  case 1) Polling islands of both fd and pollset are NULL
120    *  case 2) Polling island of fd is NULL but that of pollset is not-NULL
121    *  case 3) Polling island of fd is not-NULL but that of pollset is NULL
122    *  case 4) Polling islands of both fd and pollset are not-NULL and:
123    *     case 4.1) Polling islands of fd and pollset are equal
124    *     case 4.2) Polling islands of fd and pollset are NOT-equal (This results
125    *     in a merge)
126    * */
127 
128 #define NUM_FDS 8
129 #define NUM_POLLSETS 4
130 
test_add_fd_to_pollset()131 static void test_add_fd_to_pollset() {
132   grpc_core::ExecCtx exec_ctx;
133   test_fd tfds[NUM_FDS];
134   int fds[NUM_FDS];
135   test_pollset pollsets[NUM_POLLSETS];
136   void* expected_pi = nullptr;
137   int i;
138 
139   test_fd_init(tfds, fds, NUM_FDS);
140   test_pollset_init(pollsets, NUM_POLLSETS);
141 
142   /*Step 1.
143    * Create three polling islands (This will exercise test case 1 and 2) with
144    * the following configuration:
145    *   polling island 0 = { fds:0,1,2, pollsets:0}
146    *   polling island 1 = { fds:3,4,   pollsets:1}
147    *   polling island 2 = { fds:5,6,7  pollsets:2}
148    *
149    *Step 2.
150    * Add pollset 3 to polling island 0 (by adding fds 0 and 1 to pollset 3)
151    * (This will exercise test cases 3 and 4.1). The configuration becomes:
152    *   polling island 0 = { fds:0,1,2, pollsets:0,3} <<< pollset 3 added here
153    *   polling island 1 = { fds:3,4,   pollsets:1}
154    *   polling island 2 = { fds:5,6,7  pollsets:2}
155    *
156    *Step 3.
157    * Merge polling islands 0 and 1 by adding fd 0 to pollset 1 (This will
158    * exercise test case 4.2). The configuration becomes:
159    *   polling island (merged) = {fds: 0,1,2,3,4, pollsets: 0,1,3}
160    *   polling island 2 = {fds: 5,6,7 pollsets: 2}
161    *
162    *Step 4.
163    * Finally do one more merge by adding fd 3 to pollset 2.
164    *   polling island (merged) = {fds: 0,1,2,3,4,5,6,7, pollsets: 0,1,2,3}
165    */
166 
167   /* == Step 1 == */
168   for (i = 0; i <= 2; i++) {
169     grpc_pollset_add_fd(pollsets[0].pollset, tfds[i].fd);
170     grpc_core::ExecCtx::Get()->Flush();
171   }
172 
173   for (i = 3; i <= 4; i++) {
174     grpc_pollset_add_fd(pollsets[1].pollset, tfds[i].fd);
175     grpc_core::ExecCtx::Get()->Flush();
176   }
177 
178   for (i = 5; i <= 7; i++) {
179     grpc_pollset_add_fd(pollsets[2].pollset, tfds[i].fd);
180     grpc_core::ExecCtx::Get()->Flush();
181   }
182 
183   /* == Step 2 == */
184   for (i = 0; i <= 1; i++) {
185     grpc_pollset_add_fd(pollsets[3].pollset, tfds[i].fd);
186     grpc_core::ExecCtx::Get()->Flush();
187   }
188 
189   /* == Step 3 == */
190   grpc_pollset_add_fd(pollsets[1].pollset, tfds[0].fd);
191   grpc_core::ExecCtx::Get()->Flush();
192 
193   /* == Step 4 == */
194   grpc_pollset_add_fd(pollsets[2].pollset, tfds[3].fd);
195   grpc_core::ExecCtx::Get()->Flush();
196 
197   /* All polling islands are merged at this point */
198 
199   /* Compare Fd:0's polling island with that of all other Fds */
200   expected_pi = grpc_fd_get_polling_island(tfds[0].fd);
201   for (i = 1; i < NUM_FDS; i++) {
202     GPR_ASSERT(grpc_are_polling_islands_equal(
203         expected_pi, grpc_fd_get_polling_island(tfds[i].fd)));
204   }
205 
206   /* Compare Fd:0's polling island with that of all other pollsets */
207   for (i = 0; i < NUM_POLLSETS; i++) {
208     GPR_ASSERT(grpc_are_polling_islands_equal(
209         expected_pi, grpc_pollset_get_polling_island(pollsets[i].pollset)));
210   }
211 
212   test_fd_cleanup(tfds, NUM_FDS);
213   test_pollset_cleanup(pollsets, NUM_POLLSETS);
214 }
215 
216 #undef NUM_FDS
217 #undef NUM_POLLSETS
218 
219 typedef struct threading_shared {
220   gpr_mu* mu;
221   grpc_pollset* pollset;
222   grpc_wakeup_fd* wakeup_fd;
223   grpc_fd* wakeup_desc;
224   grpc_closure on_wakeup;
225   int wakeups;
226 } threading_shared;
227 
228 static __thread int thread_wakeups = 0;
229 
test_threading_loop(void * arg)230 static void test_threading_loop(void* arg) {
231   threading_shared* shared = static_cast<threading_shared*>(arg);
232   while (thread_wakeups < 1000000) {
233     grpc_core::ExecCtx exec_ctx;
234     grpc_pollset_worker* worker;
235     gpr_mu_lock(shared->mu);
236     GPR_ASSERT(GRPC_LOG_IF_ERROR(
237         "pollset_work",
238         grpc_pollset_work(shared->pollset, &worker, GRPC_MILLIS_INF_FUTURE)));
239     gpr_mu_unlock(shared->mu);
240   }
241 }
242 
test_threading_wakeup(void * arg,grpc_error * error)243 static void test_threading_wakeup(void* arg, grpc_error* error) {
244   threading_shared* shared = static_cast<threading_shared*>(arg);
245   ++shared->wakeups;
246   ++thread_wakeups;
247   if (error == GRPC_ERROR_NONE) {
248     GPR_ASSERT(GRPC_LOG_IF_ERROR(
249         "consume_wakeup", grpc_wakeup_fd_consume_wakeup(shared->wakeup_fd)));
250     grpc_fd_notify_on_read(shared->wakeup_desc, &shared->on_wakeup);
251     GPR_ASSERT(GRPC_LOG_IF_ERROR("wakeup_next",
252                                  grpc_wakeup_fd_wakeup(shared->wakeup_fd)));
253   }
254 }
255 
test_threading(void)256 static void test_threading(void) {
257   threading_shared shared;
258   shared.pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
259   grpc_pollset_init(shared.pollset, &shared.mu);
260 
261   grpc_core::Thread thds[10];
262   for (auto& th : thds) {
263     th = grpc_core::Thread("test_thread", test_threading_loop, &shared);
264     th.Start();
265   }
266   grpc_wakeup_fd fd;
267   GPR_ASSERT(GRPC_LOG_IF_ERROR("wakeup_fd_init", grpc_wakeup_fd_init(&fd)));
268   shared.wakeup_fd = &fd;
269   shared.wakeup_desc = grpc_fd_create(fd.read_fd, "wakeup", false);
270   shared.wakeups = 0;
271   {
272     grpc_core::ExecCtx exec_ctx;
273     grpc_pollset_add_fd(shared.pollset, shared.wakeup_desc);
274     grpc_fd_notify_on_read(
275         shared.wakeup_desc,
276         GRPC_CLOSURE_INIT(&shared.on_wakeup, test_threading_wakeup, &shared,
277                           grpc_schedule_on_exec_ctx));
278   }
279   GPR_ASSERT(GRPC_LOG_IF_ERROR("wakeup_first",
280                                grpc_wakeup_fd_wakeup(shared.wakeup_fd)));
281   for (auto& th : thds) {
282     th.Join();
283   }
284   fd.read_fd = 0;
285   grpc_wakeup_fd_destroy(&fd);
286   {
287     grpc_core::ExecCtx exec_ctx;
288     grpc_fd_shutdown(shared.wakeup_desc, GRPC_ERROR_CANCELLED);
289     grpc_fd_orphan(shared.wakeup_desc, nullptr, nullptr, "done");
290     grpc_pollset_shutdown(shared.pollset,
291                           GRPC_CLOSURE_CREATE(destroy_pollset, shared.pollset,
292                                               grpc_schedule_on_exec_ctx));
293   }
294   gpr_free(shared.pollset);
295 }
296 
main(int argc,char ** argv)297 int main(int argc, char** argv) {
298   const char* poll_strategy = nullptr;
299   grpc_test_init(argc, argv);
300   grpc_init();
301   {
302     grpc_core::ExecCtx exec_ctx;
303 
304     poll_strategy = grpc_get_poll_strategy_name();
305     if (poll_strategy != nullptr && strcmp(poll_strategy, "epollsig") == 0) {
306       test_add_fd_to_pollset();
307       test_threading();
308     } else {
309       gpr_log(GPR_INFO,
310               "Skipping the test. The test is only relevant for 'epollsig' "
311               "strategy. and the current strategy is: '%s'",
312               poll_strategy);
313     }
314   }
315 
316   grpc_shutdown();
317   return 0;
318 }
319 #else  /* defined(GRPC_LINUX_EPOLL_CREATE1) */
/* Fallback entry point used when GRPC_LINUX_EPOLL_CREATE1 is not defined:
 * there is nothing to test, so report success. */
int main(int /*argc*/, char** /*argv*/) {
  return 0;
}
321 #endif /* !defined(GRPC_LINUX_EPOLL_CREATE1) */
322