• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/grpc.h>
20 #include <grpc/support/alloc.h>
21 #include <grpc/support/sync.h>
22 #include <grpc/support/time.h>
23 #include <inttypes.h>
24 #include <stdlib.h>
25 
26 #include <memory>
27 
28 #include "absl/log/log.h"
29 #include "absl/status/status.h"
30 #include "gtest/gtest.h"
31 #include "src/core/lib/iomgr/exec_ctx.h"
32 #include "src/core/lib/surface/completion_queue.h"
33 #include "src/core/util/crash.h"
34 #include "src/core/util/thd.h"
35 #include "src/core/util/useful.h"
36 #include "test/core/test_util/test_config.h"
37 
38 #define LOG_TEST(x) LOG(INFO) << x
39 
create_test_tag(void)40 static void* create_test_tag(void) {
41   static intptr_t i = 0;
42   return reinterpret_cast<void*>(++i);
43 }
44 
45 // helper for tests to shutdown correctly and tersely
shutdown_and_destroy(grpc_completion_queue * cc)46 static void shutdown_and_destroy(grpc_completion_queue* cc) {
47   grpc_event ev;
48   grpc_completion_queue_shutdown(cc);
49 
50   switch (grpc_get_cq_completion_type(cc)) {
51     case GRPC_CQ_NEXT: {
52       ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
53                                       nullptr);
54       break;
55     }
56     case GRPC_CQ_PLUCK: {
57       ev = grpc_completion_queue_pluck(
58           cc, create_test_tag(), gpr_inf_past(GPR_CLOCK_REALTIME), nullptr);
59       break;
60     }
61     default: {
62       LOG(ERROR) << "Unknown completion type";
63       break;
64     }
65   }
66 
67   ASSERT_EQ(ev.type, GRPC_QUEUE_SHUTDOWN);
68   grpc_completion_queue_destroy(cc);
69 }
70 
do_nothing_end_completion(void *,grpc_cq_completion *)71 static void do_nothing_end_completion(void* /*arg*/,
72                                       grpc_cq_completion* /*c*/) {}
73 
74 struct thread_state {
75   grpc_completion_queue* cc;
76   void* tag;
77 };
78 
pluck_one(void * arg)79 static void pluck_one(void* arg) {
80   struct thread_state* state = static_cast<struct thread_state*>(arg);
81   grpc_completion_queue_pluck(state->cc, state->tag,
82                               gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
83 }
84 
test_too_many_plucks(void)85 static void test_too_many_plucks(void) {
86   grpc_event ev;
87   grpc_completion_queue* cc;
88   void* tags[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
89   grpc_cq_completion completions[GPR_ARRAY_SIZE(tags)];
90   grpc_core::Thread threads[GPR_ARRAY_SIZE(tags)];
91   struct thread_state thread_states[GPR_ARRAY_SIZE(tags)];
92   grpc_core::ExecCtx exec_ctx;
93   unsigned i, j;
94 
95   LOG_TEST("test_too_many_plucks");
96 
97   cc = grpc_completion_queue_create_for_pluck(nullptr);
98 
99   for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
100     tags[i] = create_test_tag();
101     for (j = 0; j < i; j++) {
102       ASSERT_NE(tags[i], tags[j]);
103     }
104     thread_states[i].cc = cc;
105     thread_states[i].tag = tags[i];
106     threads[i] =
107         grpc_core::Thread("grpc_pluck_test", pluck_one, thread_states + i);
108     threads[i].Start();
109   }
110 
111   // wait until all other threads are plucking
112   gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(1000));
113 
114   ev = grpc_completion_queue_pluck(cc, create_test_tag(),
115                                    gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
116   ASSERT_EQ(ev.type, GRPC_QUEUE_TIMEOUT);
117 
118   for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
119     ASSERT_TRUE(grpc_cq_begin_op(cc, tags[i]));
120     grpc_cq_end_op(cc, tags[i], absl::OkStatus(), do_nothing_end_completion,
121                    nullptr, &completions[i]);
122   }
123 
124   for (auto& th : threads) {
125     th.Join();
126   }
127 
128   shutdown_and_destroy(cc);
129 }
130 
131 #define TEST_THREAD_EVENTS 10000
132 
133 typedef struct test_thread_options {
134   gpr_event on_started;
135   gpr_event* phase1;
136   gpr_event on_phase1_done;
137   gpr_event* phase2;
138   gpr_event on_finished;
139   size_t events_triggered;
140   int id;
141   grpc_completion_queue* cc;
142 } test_thread_options;
143 
ten_seconds_time(void)144 gpr_timespec ten_seconds_time(void) {
145   return grpc_timeout_seconds_to_deadline(10);
146 }
147 
free_completion(void *,grpc_cq_completion * completion)148 static void free_completion(void* /*arg*/, grpc_cq_completion* completion) {
149   gpr_free(completion);
150 }
151 
producer_thread(void * arg)152 static void producer_thread(void* arg) {
153   test_thread_options* opt = static_cast<test_thread_options*>(arg);
154   int i;
155 
156   LOG(INFO) << "producer " << opt->id << " started";
157   gpr_event_set(&opt->on_started, reinterpret_cast<void*>(1));
158   ASSERT_TRUE(gpr_event_wait(opt->phase1, ten_seconds_time()));
159 
160   LOG(INFO) << "producer " << opt->id << " phase 1";
161   for (i = 0; i < TEST_THREAD_EVENTS; i++) {
162     ASSERT_TRUE(grpc_cq_begin_op(opt->cc, (void*)(intptr_t)1));
163   }
164 
165   LOG(INFO) << "producer " << opt->id << " phase 1 done";
166   gpr_event_set(&opt->on_phase1_done, reinterpret_cast<void*>(1));
167   ASSERT_TRUE(gpr_event_wait(opt->phase2, ten_seconds_time()));
168 
169   LOG(INFO) << "producer " << opt->id << " phase 2";
170   for (i = 0; i < TEST_THREAD_EVENTS; i++) {
171     grpc_core::ExecCtx exec_ctx;
172     grpc_cq_end_op(opt->cc, reinterpret_cast<void*>(1), absl::OkStatus(),
173                    free_completion, nullptr,
174                    static_cast<grpc_cq_completion*>(
175                        gpr_malloc(sizeof(grpc_cq_completion))));
176     opt->events_triggered++;
177   }
178 
179   LOG(INFO) << "producer " << opt->id << " phase 2 done";
180   gpr_event_set(&opt->on_finished, reinterpret_cast<void*>(1));
181 }
182 
consumer_thread(void * arg)183 static void consumer_thread(void* arg) {
184   test_thread_options* opt = static_cast<test_thread_options*>(arg);
185   grpc_event ev;
186 
187   LOG(INFO) << "consumer " << opt->id << " started";
188   gpr_event_set(&opt->on_started, reinterpret_cast<void*>(1));
189   ASSERT_TRUE(gpr_event_wait(opt->phase1, ten_seconds_time()));
190 
191   LOG(INFO) << "consumer " << opt->id << " phase 1";
192 
193   LOG(INFO) << "consumer " << opt->id << " phase 1 done";
194   gpr_event_set(&opt->on_phase1_done, reinterpret_cast<void*>(1));
195   ASSERT_TRUE(gpr_event_wait(opt->phase2, ten_seconds_time()));
196 
197   LOG(INFO) << "consumer " << opt->id << " phase 2";
198   for (;;) {
199     ev = grpc_completion_queue_next(
200         opt->cc, gpr_inf_future(GPR_CLOCK_MONOTONIC), nullptr);
201     switch (ev.type) {
202       case GRPC_OP_COMPLETE:
203         ASSERT_TRUE(ev.success);
204         opt->events_triggered++;
205         break;
206       case GRPC_QUEUE_SHUTDOWN:
207         LOG(INFO) << "consumer " << opt->id << " phase 2 done";
208         gpr_event_set(&opt->on_finished, reinterpret_cast<void*>(1));
209         return;
210       case GRPC_QUEUE_TIMEOUT:
211         grpc_core::Crash("Invalid timeout received");
212     }
213   }
214 }
215 
test_threading(size_t producers,size_t consumers)216 static void test_threading(size_t producers, size_t consumers) {
217   test_thread_options* options = static_cast<test_thread_options*>(
218       gpr_malloc((producers + consumers) * sizeof(test_thread_options)));
219   gpr_event phase1 = GPR_EVENT_INIT;
220   gpr_event phase2 = GPR_EVENT_INIT;
221   grpc_completion_queue* cc = grpc_completion_queue_create_for_next(nullptr);
222   size_t i;
223   size_t total_consumed = 0;
224   static int optid = 101;
225 
226   LOG(INFO) << "test_threading: " << producers << " producers, " << consumers
227             << " consumers";
228 
229   // start all threads: they will wait for phase1
230   grpc_core::Thread* threads = static_cast<grpc_core::Thread*>(
231       gpr_malloc(sizeof(*threads) * (producers + consumers)));
232   for (i = 0; i < producers + consumers; i++) {
233     gpr_event_init(&options[i].on_started);
234     gpr_event_init(&options[i].on_phase1_done);
235     gpr_event_init(&options[i].on_finished);
236     options[i].phase1 = &phase1;
237     options[i].phase2 = &phase2;
238     options[i].events_triggered = 0;
239     options[i].cc = cc;
240     options[i].id = optid++;
241 
242     bool ok;
243     threads[i] = grpc_core::Thread(
244         i < producers ? "grpc_producer" : "grpc_consumer",
245         i < producers ? producer_thread : consumer_thread, options + i, &ok);
246     ASSERT_TRUE(ok);
247     threads[i].Start();
248     gpr_event_wait(&options[i].on_started, ten_seconds_time());
249   }
250 
251   // start phase1: producers will pre-declare all operations they will
252   // complete
253   LOG(INFO) << "start phase 1";
254   gpr_event_set(&phase1, reinterpret_cast<void*>(1));
255 
256   LOG(INFO) << "wait phase 1";
257   for (i = 0; i < producers + consumers; i++) {
258     ASSERT_TRUE(gpr_event_wait(&options[i].on_phase1_done, ten_seconds_time()));
259   }
260   LOG(INFO) << "done phase 1";
261 
262   // start phase2: operations will complete, and consumers will consume them
263   LOG(INFO) << "start phase 2";
264   gpr_event_set(&phase2, reinterpret_cast<void*>(1));
265 
266   // in parallel, we shutdown the completion channel - all events should still
267   // be consumed
268   grpc_completion_queue_shutdown(cc);
269 
270   // join all threads
271   LOG(INFO) << "wait phase 2";
272   for (i = 0; i < producers + consumers; i++) {
273     ASSERT_TRUE(gpr_event_wait(&options[i].on_finished, ten_seconds_time()));
274   }
275   LOG(INFO) << "done phase 2";
276 
277   // destroy the completion channel
278   grpc_completion_queue_destroy(cc);
279 
280   for (i = 0; i < producers + consumers; i++) {
281     threads[i].Join();
282   }
283   gpr_free(threads);
284 
285   // verify that everything was produced and consumed
286   for (i = 0; i < producers + consumers; i++) {
287     if (i < producers) {
288       ASSERT_EQ(options[i].events_triggered, TEST_THREAD_EVENTS);
289     } else {
290       total_consumed += options[i].events_triggered;
291     }
292   }
293   ASSERT_EQ(total_consumed, producers * TEST_THREAD_EVENTS);
294 
295   gpr_free(options);
296 }
297 
TEST(CompletionQueueThreadingTest,MainTest)298 TEST(CompletionQueueThreadingTest, MainTest) {
299   grpc_init();
300   test_too_many_plucks();
301   test_threading(1, 1);
302   test_threading(1, 10);
303   test_threading(10, 1);
304   test_threading(10, 10);
305   grpc_shutdown();
306 }
307 
main(int argc,char ** argv)308 int main(int argc, char** argv) {
309   grpc::testing::TestEnvironment env(&argc, argv);
310   ::testing::InitGoogleTest(&argc, argv);
311   grpc::testing::TestGrpcScope grpc_scope;
312   return RUN_ALL_TESTS();
313 }
314