• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include "src/core/lib/surface/completion_queue.h"
20 
21 #include <grpc/support/alloc.h>
22 #include <grpc/support/log.h>
23 #include <grpc/support/time.h>
24 
25 #include "src/core/lib/gpr/useful.h"
26 #include "src/core/lib/gprpp/thd.h"
27 #include "src/core/lib/iomgr/iomgr.h"
28 #include "test/core/util/test_config.h"
29 
30 #define LOG_TEST(x) gpr_log(GPR_INFO, "%s", x)
31 
create_test_tag(void)32 static void* create_test_tag(void) {
33   static intptr_t i = 0;
34   return (void*)(++i);
35 }
36 
37 /* helper for tests to shutdown correctly and tersely */
shutdown_and_destroy(grpc_completion_queue * cc)38 static void shutdown_and_destroy(grpc_completion_queue* cc) {
39   grpc_event ev;
40   grpc_completion_queue_shutdown(cc);
41 
42   switch (grpc_get_cq_completion_type(cc)) {
43     case GRPC_CQ_NEXT: {
44       ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
45                                       nullptr);
46       break;
47     }
48     case GRPC_CQ_PLUCK: {
49       ev = grpc_completion_queue_pluck(
50           cc, create_test_tag(), gpr_inf_past(GPR_CLOCK_REALTIME), nullptr);
51       break;
52     }
53     default: {
54       gpr_log(GPR_ERROR, "Unknown completion type");
55       break;
56     }
57   }
58 
59   GPR_ASSERT(ev.type == GRPC_QUEUE_SHUTDOWN);
60   grpc_completion_queue_destroy(cc);
61 }
62 
do_nothing_end_completion(void *,grpc_cq_completion *)63 static void do_nothing_end_completion(void* /*arg*/,
64                                       grpc_cq_completion* /*c*/) {}
65 
66 struct thread_state {
67   grpc_completion_queue* cc;
68   void* tag;
69 };
70 
pluck_one(void * arg)71 static void pluck_one(void* arg) {
72   struct thread_state* state = static_cast<struct thread_state*>(arg);
73   grpc_completion_queue_pluck(state->cc, state->tag,
74                               gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
75 }
76 
test_too_many_plucks(void)77 static void test_too_many_plucks(void) {
78   grpc_event ev;
79   grpc_completion_queue* cc;
80   void* tags[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
81   grpc_cq_completion completions[GPR_ARRAY_SIZE(tags)];
82   grpc_core::Thread threads[GPR_ARRAY_SIZE(tags)];
83   struct thread_state thread_states[GPR_ARRAY_SIZE(tags)];
84   grpc_core::ExecCtx exec_ctx;
85   unsigned i, j;
86 
87   LOG_TEST("test_too_many_plucks");
88 
89   cc = grpc_completion_queue_create_for_pluck(nullptr);
90 
91   for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
92     tags[i] = create_test_tag();
93     for (j = 0; j < i; j++) {
94       GPR_ASSERT(tags[i] != tags[j]);
95     }
96     thread_states[i].cc = cc;
97     thread_states[i].tag = tags[i];
98     threads[i] =
99         grpc_core::Thread("grpc_pluck_test", pluck_one, thread_states + i);
100     threads[i].Start();
101   }
102 
103   /* wait until all other threads are plucking */
104   gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(1000));
105 
106   ev = grpc_completion_queue_pluck(cc, create_test_tag(),
107                                    gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
108   GPR_ASSERT(ev.type == GRPC_QUEUE_TIMEOUT);
109 
110   for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
111     GPR_ASSERT(grpc_cq_begin_op(cc, tags[i]));
112     grpc_cq_end_op(cc, tags[i], GRPC_ERROR_NONE, do_nothing_end_completion,
113                    nullptr, &completions[i]);
114   }
115 
116   for (auto& th : threads) {
117     th.Join();
118   }
119 
120   shutdown_and_destroy(cc);
121 }
122 
123 #define TEST_THREAD_EVENTS 10000
124 
125 typedef struct test_thread_options {
126   gpr_event on_started;
127   gpr_event* phase1;
128   gpr_event on_phase1_done;
129   gpr_event* phase2;
130   gpr_event on_finished;
131   size_t events_triggered;
132   int id;
133   grpc_completion_queue* cc;
134 } test_thread_options;
135 
ten_seconds_time(void)136 gpr_timespec ten_seconds_time(void) {
137   return grpc_timeout_seconds_to_deadline(10);
138 }
139 
free_completion(void *,grpc_cq_completion * completion)140 static void free_completion(void* /*arg*/, grpc_cq_completion* completion) {
141   gpr_free(completion);
142 }
143 
producer_thread(void * arg)144 static void producer_thread(void* arg) {
145   test_thread_options* opt = static_cast<test_thread_options*>(arg);
146   int i;
147 
148   gpr_log(GPR_INFO, "producer %d started", opt->id);
149   gpr_event_set(&opt->on_started, (void*)static_cast<intptr_t>(1));
150   GPR_ASSERT(gpr_event_wait(opt->phase1, ten_seconds_time()));
151 
152   gpr_log(GPR_INFO, "producer %d phase 1", opt->id);
153   for (i = 0; i < TEST_THREAD_EVENTS; i++) {
154     GPR_ASSERT(grpc_cq_begin_op(opt->cc, (void*)(intptr_t)1));
155   }
156 
157   gpr_log(GPR_INFO, "producer %d phase 1 done", opt->id);
158   gpr_event_set(&opt->on_phase1_done, (void*)static_cast<intptr_t>(1));
159   GPR_ASSERT(gpr_event_wait(opt->phase2, ten_seconds_time()));
160 
161   gpr_log(GPR_INFO, "producer %d phase 2", opt->id);
162   for (i = 0; i < TEST_THREAD_EVENTS; i++) {
163     grpc_core::ExecCtx exec_ctx;
164     grpc_cq_end_op(opt->cc, (void*)static_cast<intptr_t>(1), GRPC_ERROR_NONE,
165                    free_completion, nullptr,
166                    static_cast<grpc_cq_completion*>(
167                        gpr_malloc(sizeof(grpc_cq_completion))));
168     opt->events_triggered++;
169   }
170 
171   gpr_log(GPR_INFO, "producer %d phase 2 done", opt->id);
172   gpr_event_set(&opt->on_finished, (void*)static_cast<intptr_t>(1));
173 }
174 
consumer_thread(void * arg)175 static void consumer_thread(void* arg) {
176   test_thread_options* opt = static_cast<test_thread_options*>(arg);
177   grpc_event ev;
178 
179   gpr_log(GPR_INFO, "consumer %d started", opt->id);
180   gpr_event_set(&opt->on_started, (void*)static_cast<intptr_t>(1));
181   GPR_ASSERT(gpr_event_wait(opt->phase1, ten_seconds_time()));
182 
183   gpr_log(GPR_INFO, "consumer %d phase 1", opt->id);
184 
185   gpr_log(GPR_INFO, "consumer %d phase 1 done", opt->id);
186   gpr_event_set(&opt->on_phase1_done, (void*)static_cast<intptr_t>(1));
187   GPR_ASSERT(gpr_event_wait(opt->phase2, ten_seconds_time()));
188 
189   gpr_log(GPR_INFO, "consumer %d phase 2", opt->id);
190   for (;;) {
191     ev = grpc_completion_queue_next(
192         opt->cc, gpr_inf_future(GPR_CLOCK_MONOTONIC), nullptr);
193     switch (ev.type) {
194       case GRPC_OP_COMPLETE:
195         GPR_ASSERT(ev.success);
196         opt->events_triggered++;
197         break;
198       case GRPC_QUEUE_SHUTDOWN:
199         gpr_log(GPR_INFO, "consumer %d phase 2 done", opt->id);
200         gpr_event_set(&opt->on_finished, (void*)static_cast<intptr_t>(1));
201         return;
202       case GRPC_QUEUE_TIMEOUT:
203         gpr_log(GPR_ERROR, "Invalid timeout received");
204         abort();
205     }
206   }
207 }
208 
test_threading(size_t producers,size_t consumers)209 static void test_threading(size_t producers, size_t consumers) {
210   test_thread_options* options = static_cast<test_thread_options*>(
211       gpr_malloc((producers + consumers) * sizeof(test_thread_options)));
212   gpr_event phase1 = GPR_EVENT_INIT;
213   gpr_event phase2 = GPR_EVENT_INIT;
214   grpc_completion_queue* cc = grpc_completion_queue_create_for_next(nullptr);
215   size_t i;
216   size_t total_consumed = 0;
217   static int optid = 101;
218 
219   gpr_log(GPR_INFO, "%s: %" PRIuPTR " producers, %" PRIuPTR " consumers",
220           "test_threading", producers, consumers);
221 
222   /* start all threads: they will wait for phase1 */
223   grpc_core::Thread* threads = static_cast<grpc_core::Thread*>(
224       gpr_malloc(sizeof(*threads) * (producers + consumers)));
225   for (i = 0; i < producers + consumers; i++) {
226     gpr_event_init(&options[i].on_started);
227     gpr_event_init(&options[i].on_phase1_done);
228     gpr_event_init(&options[i].on_finished);
229     options[i].phase1 = &phase1;
230     options[i].phase2 = &phase2;
231     options[i].events_triggered = 0;
232     options[i].cc = cc;
233     options[i].id = optid++;
234 
235     bool ok;
236     threads[i] = grpc_core::Thread(
237         i < producers ? "grpc_producer" : "grpc_consumer",
238         i < producers ? producer_thread : consumer_thread, options + i, &ok);
239     GPR_ASSERT(ok);
240     threads[i].Start();
241     gpr_event_wait(&options[i].on_started, ten_seconds_time());
242   }
243 
244   /* start phase1: producers will pre-declare all operations they will
245      complete */
246   gpr_log(GPR_INFO, "start phase 1");
247   gpr_event_set(&phase1, (void*)static_cast<intptr_t>(1));
248 
249   gpr_log(GPR_INFO, "wait phase 1");
250   for (i = 0; i < producers + consumers; i++) {
251     GPR_ASSERT(gpr_event_wait(&options[i].on_phase1_done, ten_seconds_time()));
252   }
253   gpr_log(GPR_INFO, "done phase 1");
254 
255   /* start phase2: operations will complete, and consumers will consume them */
256   gpr_log(GPR_INFO, "start phase 2");
257   gpr_event_set(&phase2, (void*)static_cast<intptr_t>(1));
258 
259   /* in parallel, we shutdown the completion channel - all events should still
260      be consumed */
261   grpc_completion_queue_shutdown(cc);
262 
263   /* join all threads */
264   gpr_log(GPR_INFO, "wait phase 2");
265   for (i = 0; i < producers + consumers; i++) {
266     GPR_ASSERT(gpr_event_wait(&options[i].on_finished, ten_seconds_time()));
267   }
268   gpr_log(GPR_INFO, "done phase 2");
269 
270   /* destroy the completion channel */
271   grpc_completion_queue_destroy(cc);
272 
273   for (i = 0; i < producers + consumers; i++) {
274     threads[i].Join();
275   }
276   gpr_free(threads);
277 
278   /* verify that everything was produced and consumed */
279   for (i = 0; i < producers + consumers; i++) {
280     if (i < producers) {
281       GPR_ASSERT(options[i].events_triggered == TEST_THREAD_EVENTS);
282     } else {
283       total_consumed += options[i].events_triggered;
284     }
285   }
286   GPR_ASSERT(total_consumed == producers * TEST_THREAD_EVENTS);
287 
288   gpr_free(options);
289 }
290 
main(int argc,char ** argv)291 int main(int argc, char** argv) {
292   grpc::testing::TestEnvironment env(argc, argv);
293   grpc_init();
294   test_too_many_plucks();
295   test_threading(1, 1);
296   test_threading(1, 10);
297   test_threading(10, 1);
298   test_threading(10, 10);
299   grpc_shutdown();
300   return 0;
301 }
302