1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include "src/core/lib/surface/completion_queue.h"
20
21 #include <grpc/support/alloc.h>
22 #include <grpc/support/log.h>
23 #include <grpc/support/time.h>
24
25 #include "src/core/lib/gpr/useful.h"
26 #include "src/core/lib/gprpp/thd.h"
27 #include "src/core/lib/iomgr/iomgr.h"
28 #include "test/core/util/test_config.h"
29
30 #define LOG_TEST(x) gpr_log(GPR_INFO, "%s", x)
31
create_test_tag(void)32 static void* create_test_tag(void) {
33 static intptr_t i = 0;
34 return (void*)(++i);
35 }
36
37 /* helper for tests to shutdown correctly and tersely */
shutdown_and_destroy(grpc_completion_queue * cc)38 static void shutdown_and_destroy(grpc_completion_queue* cc) {
39 grpc_event ev;
40 grpc_completion_queue_shutdown(cc);
41
42 switch (grpc_get_cq_completion_type(cc)) {
43 case GRPC_CQ_NEXT: {
44 ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
45 nullptr);
46 break;
47 }
48 case GRPC_CQ_PLUCK: {
49 ev = grpc_completion_queue_pluck(
50 cc, create_test_tag(), gpr_inf_past(GPR_CLOCK_REALTIME), nullptr);
51 break;
52 }
53 default: {
54 gpr_log(GPR_ERROR, "Unknown completion type");
55 break;
56 }
57 }
58
59 GPR_ASSERT(ev.type == GRPC_QUEUE_SHUTDOWN);
60 grpc_completion_queue_destroy(cc);
61 }
62
do_nothing_end_completion(void * arg,grpc_cq_completion * c)63 static void do_nothing_end_completion(void* arg, grpc_cq_completion* c) {}
64
65 struct thread_state {
66 grpc_completion_queue* cc;
67 void* tag;
68 };
69
pluck_one(void * arg)70 static void pluck_one(void* arg) {
71 struct thread_state* state = static_cast<struct thread_state*>(arg);
72 grpc_completion_queue_pluck(state->cc, state->tag,
73 gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
74 }
75
test_too_many_plucks(void)76 static void test_too_many_plucks(void) {
77 grpc_event ev;
78 grpc_completion_queue* cc;
79 void* tags[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
80 grpc_cq_completion completions[GPR_ARRAY_SIZE(tags)];
81 grpc_core::Thread threads[GPR_ARRAY_SIZE(tags)];
82 struct thread_state thread_states[GPR_ARRAY_SIZE(tags)];
83 grpc_core::ExecCtx exec_ctx;
84 unsigned i, j;
85
86 LOG_TEST("test_too_many_plucks");
87
88 cc = grpc_completion_queue_create_for_pluck(nullptr);
89
90 for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
91 tags[i] = create_test_tag();
92 for (j = 0; j < i; j++) {
93 GPR_ASSERT(tags[i] != tags[j]);
94 }
95 thread_states[i].cc = cc;
96 thread_states[i].tag = tags[i];
97 threads[i] =
98 grpc_core::Thread("grpc_pluck_test", pluck_one, thread_states + i);
99 threads[i].Start();
100 }
101
102 /* wait until all other threads are plucking */
103 gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(1000));
104
105 ev = grpc_completion_queue_pluck(cc, create_test_tag(),
106 gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
107 GPR_ASSERT(ev.type == GRPC_QUEUE_TIMEOUT);
108
109 for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
110 GPR_ASSERT(grpc_cq_begin_op(cc, tags[i]));
111 grpc_cq_end_op(cc, tags[i], GRPC_ERROR_NONE, do_nothing_end_completion,
112 nullptr, &completions[i]);
113 }
114
115 for (auto& th : threads) {
116 th.Join();
117 }
118
119 shutdown_and_destroy(cc);
120 }
121
122 #define TEST_THREAD_EVENTS 10000
123
124 typedef struct test_thread_options {
125 gpr_event on_started;
126 gpr_event* phase1;
127 gpr_event on_phase1_done;
128 gpr_event* phase2;
129 gpr_event on_finished;
130 size_t events_triggered;
131 int id;
132 grpc_completion_queue* cc;
133 } test_thread_options;
134
ten_seconds_time(void)135 gpr_timespec ten_seconds_time(void) {
136 return grpc_timeout_seconds_to_deadline(10);
137 }
138
free_completion(void * arg,grpc_cq_completion * completion)139 static void free_completion(void* arg, grpc_cq_completion* completion) {
140 gpr_free(completion);
141 }
142
producer_thread(void * arg)143 static void producer_thread(void* arg) {
144 test_thread_options* opt = static_cast<test_thread_options*>(arg);
145 int i;
146
147 gpr_log(GPR_INFO, "producer %d started", opt->id);
148 gpr_event_set(&opt->on_started, (void*)static_cast<intptr_t>(1));
149 GPR_ASSERT(gpr_event_wait(opt->phase1, ten_seconds_time()));
150
151 gpr_log(GPR_INFO, "producer %d phase 1", opt->id);
152 for (i = 0; i < TEST_THREAD_EVENTS; i++) {
153 GPR_ASSERT(grpc_cq_begin_op(opt->cc, (void*)(intptr_t)1));
154 }
155
156 gpr_log(GPR_INFO, "producer %d phase 1 done", opt->id);
157 gpr_event_set(&opt->on_phase1_done, (void*)static_cast<intptr_t>(1));
158 GPR_ASSERT(gpr_event_wait(opt->phase2, ten_seconds_time()));
159
160 gpr_log(GPR_INFO, "producer %d phase 2", opt->id);
161 for (i = 0; i < TEST_THREAD_EVENTS; i++) {
162 grpc_core::ExecCtx exec_ctx;
163 grpc_cq_end_op(opt->cc, (void*)static_cast<intptr_t>(1), GRPC_ERROR_NONE,
164 free_completion, nullptr,
165 static_cast<grpc_cq_completion*>(
166 gpr_malloc(sizeof(grpc_cq_completion))));
167 opt->events_triggered++;
168 }
169
170 gpr_log(GPR_INFO, "producer %d phase 2 done", opt->id);
171 gpr_event_set(&opt->on_finished, (void*)static_cast<intptr_t>(1));
172 }
173
consumer_thread(void * arg)174 static void consumer_thread(void* arg) {
175 test_thread_options* opt = static_cast<test_thread_options*>(arg);
176 grpc_event ev;
177
178 gpr_log(GPR_INFO, "consumer %d started", opt->id);
179 gpr_event_set(&opt->on_started, (void*)static_cast<intptr_t>(1));
180 GPR_ASSERT(gpr_event_wait(opt->phase1, ten_seconds_time()));
181
182 gpr_log(GPR_INFO, "consumer %d phase 1", opt->id);
183
184 gpr_log(GPR_INFO, "consumer %d phase 1 done", opt->id);
185 gpr_event_set(&opt->on_phase1_done, (void*)static_cast<intptr_t>(1));
186 GPR_ASSERT(gpr_event_wait(opt->phase2, ten_seconds_time()));
187
188 gpr_log(GPR_INFO, "consumer %d phase 2", opt->id);
189 for (;;) {
190 ev = grpc_completion_queue_next(
191 opt->cc, gpr_inf_future(GPR_CLOCK_MONOTONIC), nullptr);
192 switch (ev.type) {
193 case GRPC_OP_COMPLETE:
194 GPR_ASSERT(ev.success);
195 opt->events_triggered++;
196 break;
197 case GRPC_QUEUE_SHUTDOWN:
198 gpr_log(GPR_INFO, "consumer %d phase 2 done", opt->id);
199 gpr_event_set(&opt->on_finished, (void*)static_cast<intptr_t>(1));
200 return;
201 case GRPC_QUEUE_TIMEOUT:
202 gpr_log(GPR_ERROR, "Invalid timeout received");
203 abort();
204 }
205 }
206 }
207
test_threading(size_t producers,size_t consumers)208 static void test_threading(size_t producers, size_t consumers) {
209 test_thread_options* options = static_cast<test_thread_options*>(
210 gpr_malloc((producers + consumers) * sizeof(test_thread_options)));
211 gpr_event phase1 = GPR_EVENT_INIT;
212 gpr_event phase2 = GPR_EVENT_INIT;
213 grpc_completion_queue* cc = grpc_completion_queue_create_for_next(nullptr);
214 size_t i;
215 size_t total_consumed = 0;
216 static int optid = 101;
217
218 gpr_log(GPR_INFO, "%s: %" PRIuPTR " producers, %" PRIuPTR " consumers",
219 "test_threading", producers, consumers);
220
221 /* start all threads: they will wait for phase1 */
222 grpc_core::Thread* threads = static_cast<grpc_core::Thread*>(
223 gpr_malloc(sizeof(*threads) * (producers + consumers)));
224 for (i = 0; i < producers + consumers; i++) {
225 gpr_event_init(&options[i].on_started);
226 gpr_event_init(&options[i].on_phase1_done);
227 gpr_event_init(&options[i].on_finished);
228 options[i].phase1 = &phase1;
229 options[i].phase2 = &phase2;
230 options[i].events_triggered = 0;
231 options[i].cc = cc;
232 options[i].id = optid++;
233
234 bool ok;
235 threads[i] = grpc_core::Thread(
236 i < producers ? "grpc_producer" : "grpc_consumer",
237 i < producers ? producer_thread : consumer_thread, options + i, &ok);
238 GPR_ASSERT(ok);
239 threads[i].Start();
240 gpr_event_wait(&options[i].on_started, ten_seconds_time());
241 }
242
243 /* start phase1: producers will pre-declare all operations they will
244 complete */
245 gpr_log(GPR_INFO, "start phase 1");
246 gpr_event_set(&phase1, (void*)static_cast<intptr_t>(1));
247
248 gpr_log(GPR_INFO, "wait phase 1");
249 for (i = 0; i < producers + consumers; i++) {
250 GPR_ASSERT(gpr_event_wait(&options[i].on_phase1_done, ten_seconds_time()));
251 }
252 gpr_log(GPR_INFO, "done phase 1");
253
254 /* start phase2: operations will complete, and consumers will consume them */
255 gpr_log(GPR_INFO, "start phase 2");
256 gpr_event_set(&phase2, (void*)static_cast<intptr_t>(1));
257
258 /* in parallel, we shutdown the completion channel - all events should still
259 be consumed */
260 grpc_completion_queue_shutdown(cc);
261
262 /* join all threads */
263 gpr_log(GPR_INFO, "wait phase 2");
264 for (i = 0; i < producers + consumers; i++) {
265 GPR_ASSERT(gpr_event_wait(&options[i].on_finished, ten_seconds_time()));
266 }
267 gpr_log(GPR_INFO, "done phase 2");
268
269 /* destroy the completion channel */
270 grpc_completion_queue_destroy(cc);
271
272 for (i = 0; i < producers + consumers; i++) {
273 threads[i].Join();
274 }
275 gpr_free(threads);
276
277 /* verify that everything was produced and consumed */
278 for (i = 0; i < producers + consumers; i++) {
279 if (i < producers) {
280 GPR_ASSERT(options[i].events_triggered == TEST_THREAD_EVENTS);
281 } else {
282 total_consumed += options[i].events_triggered;
283 }
284 }
285 GPR_ASSERT(total_consumed == producers * TEST_THREAD_EVENTS);
286
287 gpr_free(options);
288 }
289
main(int argc,char ** argv)290 int main(int argc, char** argv) {
291 grpc_test_init(argc, argv);
292 grpc_init();
293 test_too_many_plucks();
294 test_threading(1, 1);
295 test_threading(1, 10);
296 test_threading(10, 1);
297 test_threading(10, 10);
298 grpc_shutdown();
299 return 0;
300 }
301