1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include "src/core/lib/surface/completion_queue.h"
20
21 #include <grpc/support/alloc.h>
22 #include <grpc/support/log.h>
23 #include <grpc/support/time.h>
24 #include "src/core/lib/gpr/useful.h"
25 #include "src/core/lib/gprpp/memory.h"
26 #include "src/core/lib/gprpp/sync.h"
27 #include "src/core/lib/iomgr/iomgr.h"
28 #include "test/core/util/test_config.h"
29
/* Logs the string `x` (used by the tests below to announce which test case is
 * starting) through gpr_log with GPR_INFO. The argument is parenthesized in
 * the expansion so that any expression passed as `x` is forwarded as a single
 * gpr_log argument. */
#define LOG_TEST(x) gpr_log(GPR_INFO, "%s", (x))
31
/* Returns a fresh, non-null tag on every call. Tags are consecutive small
 * integers (1, 2, 3, ...) disguised as pointers; they are only compared for
 * identity and must never be dereferenced. */
static void* create_test_tag(void) {
  static intptr_t next_tag = 0;
  next_tag += 1;
  return (void*)next_tag;
}
36
37 /* helper for tests to shutdown correctly and tersely */
shutdown_and_destroy(grpc_completion_queue * cc)38 static void shutdown_and_destroy(grpc_completion_queue* cc) {
39 grpc_event ev;
40 grpc_completion_queue_shutdown(cc);
41
42 switch (grpc_get_cq_completion_type(cc)) {
43 case GRPC_CQ_NEXT: {
44 ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
45 nullptr);
46 GPR_ASSERT(ev.type == GRPC_QUEUE_SHUTDOWN);
47 break;
48 }
49 case GRPC_CQ_PLUCK: {
50 ev = grpc_completion_queue_pluck(
51 cc, create_test_tag(), gpr_inf_past(GPR_CLOCK_REALTIME), nullptr);
52 GPR_ASSERT(ev.type == GRPC_QUEUE_SHUTDOWN);
53 break;
54 }
55 case GRPC_CQ_CALLBACK: {
56 // Nothing to do here. The shutdown callback will be invoked when
57 // possible.
58 break;
59 }
60 default: {
61 gpr_log(GPR_ERROR, "Unknown completion type");
62 break;
63 }
64 }
65
66 grpc_completion_queue_destroy(cc);
67 }
68
69 /* ensure we can create and destroy a completion channel */
test_no_op(void)70 static void test_no_op(void) {
71 grpc_cq_completion_type completion_types[] = {GRPC_CQ_NEXT, GRPC_CQ_PLUCK};
72 grpc_cq_polling_type polling_types[] = {
73 GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
74 grpc_completion_queue_attributes attr;
75 LOG_TEST("test_no_op");
76
77 attr.version = 1;
78 for (size_t i = 0; i < GPR_ARRAY_SIZE(completion_types); i++) {
79 for (size_t j = 0; j < GPR_ARRAY_SIZE(polling_types); j++) {
80 attr.cq_completion_type = completion_types[i];
81 attr.cq_polling_type = polling_types[j];
82 shutdown_and_destroy(grpc_completion_queue_create(
83 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr));
84 }
85 }
86 }
87
test_pollset_conversion(void)88 static void test_pollset_conversion(void) {
89 grpc_cq_completion_type completion_types[] = {GRPC_CQ_NEXT, GRPC_CQ_PLUCK};
90 grpc_cq_polling_type polling_types[] = {GRPC_CQ_DEFAULT_POLLING,
91 GRPC_CQ_NON_LISTENING};
92 grpc_completion_queue* cq;
93 grpc_completion_queue_attributes attr;
94
95 LOG_TEST("test_pollset_conversion");
96
97 attr.version = 1;
98 for (size_t i = 0; i < GPR_ARRAY_SIZE(completion_types); i++) {
99 for (size_t j = 0; j < GPR_ARRAY_SIZE(polling_types); j++) {
100 attr.cq_completion_type = completion_types[i];
101 attr.cq_polling_type = polling_types[j];
102 cq = grpc_completion_queue_create(
103 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
104 GPR_ASSERT(grpc_cq_pollset(cq) != nullptr);
105 shutdown_and_destroy(cq);
106 }
107 }
108 }
109
test_wait_empty(void)110 static void test_wait_empty(void) {
111 grpc_cq_polling_type polling_types[] = {
112 GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
113 grpc_completion_queue* cc;
114 grpc_completion_queue_attributes attr;
115 grpc_event event;
116
117 LOG_TEST("test_wait_empty");
118
119 attr.version = 1;
120 attr.cq_completion_type = GRPC_CQ_NEXT;
121 for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
122 attr.cq_polling_type = polling_types[i];
123 cc = grpc_completion_queue_create(
124 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
125 event =
126 grpc_completion_queue_next(cc, gpr_now(GPR_CLOCK_REALTIME), nullptr);
127 GPR_ASSERT(event.type == GRPC_QUEUE_TIMEOUT);
128 shutdown_and_destroy(cc);
129 }
130 }
131
/* Completion "done" callback passed to grpc_cq_end_op() by the tests in this
 * file. It intentionally does nothing: both arguments are ignored. */
static void do_nothing_end_completion(void* arg, grpc_cq_completion* c) {
  (void)arg;
  (void)c;
}
134
test_cq_end_op(void)135 static void test_cq_end_op(void) {
136 grpc_event ev;
137 grpc_completion_queue* cc;
138 grpc_cq_completion completion;
139 grpc_cq_polling_type polling_types[] = {
140 GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
141 grpc_completion_queue_attributes attr;
142 void* tag = create_test_tag();
143
144 LOG_TEST("test_cq_end_op");
145
146 attr.version = 1;
147 attr.cq_completion_type = GRPC_CQ_NEXT;
148 for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
149 grpc_core::ExecCtx exec_ctx;
150 attr.cq_polling_type = polling_types[i];
151 cc = grpc_completion_queue_create(
152 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
153
154 GPR_ASSERT(grpc_cq_begin_op(cc, tag));
155 grpc_cq_end_op(cc, tag, GRPC_ERROR_NONE, do_nothing_end_completion, nullptr,
156 &completion);
157
158 ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
159 nullptr);
160 GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
161 GPR_ASSERT(ev.tag == tag);
162 GPR_ASSERT(ev.success);
163
164 shutdown_and_destroy(cc);
165 }
166 }
167
test_cq_tls_cache_full(void)168 static void test_cq_tls_cache_full(void) {
169 grpc_event ev;
170 grpc_completion_queue* cc;
171 grpc_cq_completion completion;
172 grpc_cq_polling_type polling_types[] = {
173 GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
174 grpc_completion_queue_attributes attr;
175 void* tag = create_test_tag();
176 void* res_tag;
177 int ok;
178
179 LOG_TEST("test_cq_tls_cache_full");
180
181 attr.version = 1;
182 attr.cq_completion_type = GRPC_CQ_NEXT;
183 for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
184 grpc_core::ExecCtx exec_ctx; // Reset exec_ctx
185 attr.cq_polling_type = polling_types[i];
186 cc = grpc_completion_queue_create(
187 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
188
189 grpc_completion_queue_thread_local_cache_init(cc);
190 GPR_ASSERT(grpc_cq_begin_op(cc, tag));
191 grpc_cq_end_op(cc, tag, GRPC_ERROR_NONE, do_nothing_end_completion, nullptr,
192 &completion);
193
194 ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
195 nullptr);
196 GPR_ASSERT(ev.type == GRPC_QUEUE_TIMEOUT);
197
198 GPR_ASSERT(
199 grpc_completion_queue_thread_local_cache_flush(cc, &res_tag, &ok) == 1);
200 GPR_ASSERT(res_tag == tag);
201 GPR_ASSERT(ok);
202
203 ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
204 nullptr);
205 GPR_ASSERT(ev.type == GRPC_QUEUE_TIMEOUT);
206
207 shutdown_and_destroy(cc);
208 }
209 }
210
test_cq_tls_cache_empty(void)211 static void test_cq_tls_cache_empty(void) {
212 grpc_completion_queue* cc;
213 grpc_cq_polling_type polling_types[] = {
214 GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
215 grpc_completion_queue_attributes attr;
216 void* res_tag;
217 int ok;
218
219 LOG_TEST("test_cq_tls_cache_empty");
220
221 attr.version = 1;
222 attr.cq_completion_type = GRPC_CQ_NEXT;
223 for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
224 grpc_core::ExecCtx exec_ctx; // Reset exec_ctx
225 attr.cq_polling_type = polling_types[i];
226 cc = grpc_completion_queue_create(
227 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
228
229 GPR_ASSERT(
230 grpc_completion_queue_thread_local_cache_flush(cc, &res_tag, &ok) == 0);
231 grpc_completion_queue_thread_local_cache_init(cc);
232 GPR_ASSERT(
233 grpc_completion_queue_thread_local_cache_flush(cc, &res_tag, &ok) == 0);
234 shutdown_and_destroy(cc);
235 }
236 }
237
test_shutdown_then_next_polling(void)238 static void test_shutdown_then_next_polling(void) {
239 grpc_cq_polling_type polling_types[] = {
240 GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
241 grpc_completion_queue* cc;
242 grpc_completion_queue_attributes attr;
243 grpc_event event;
244 LOG_TEST("test_shutdown_then_next_polling");
245
246 attr.version = 1;
247 attr.cq_completion_type = GRPC_CQ_NEXT;
248 for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
249 attr.cq_polling_type = polling_types[i];
250 cc = grpc_completion_queue_create(
251 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
252 grpc_completion_queue_shutdown(cc);
253 event = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
254 nullptr);
255 GPR_ASSERT(event.type == GRPC_QUEUE_SHUTDOWN);
256 grpc_completion_queue_destroy(cc);
257 }
258 }
259
test_shutdown_then_next_with_timeout(void)260 static void test_shutdown_then_next_with_timeout(void) {
261 grpc_cq_polling_type polling_types[] = {
262 GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
263 grpc_completion_queue* cc;
264 grpc_completion_queue_attributes attr;
265 grpc_event event;
266 LOG_TEST("test_shutdown_then_next_with_timeout");
267
268 attr.version = 1;
269 attr.cq_completion_type = GRPC_CQ_NEXT;
270 for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
271 attr.cq_polling_type = polling_types[i];
272 cc = grpc_completion_queue_create(
273 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
274
275 grpc_completion_queue_shutdown(cc);
276 event = grpc_completion_queue_next(cc, gpr_inf_future(GPR_CLOCK_REALTIME),
277 nullptr);
278 GPR_ASSERT(event.type == GRPC_QUEUE_SHUTDOWN);
279 grpc_completion_queue_destroy(cc);
280 }
281 }
282
test_pluck(void)283 static void test_pluck(void) {
284 grpc_event ev;
285 grpc_completion_queue* cc;
286 void* tags[128];
287 grpc_cq_completion completions[GPR_ARRAY_SIZE(tags)];
288 grpc_cq_polling_type polling_types[] = {
289 GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
290 grpc_completion_queue_attributes attr;
291 unsigned i, j;
292
293 LOG_TEST("test_pluck");
294
295 for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
296 tags[i] = create_test_tag();
297 for (j = 0; j < i; j++) {
298 GPR_ASSERT(tags[i] != tags[j]);
299 }
300 }
301
302 attr.version = 1;
303 attr.cq_completion_type = GRPC_CQ_PLUCK;
304 for (size_t pidx = 0; pidx < GPR_ARRAY_SIZE(polling_types); pidx++) {
305 grpc_core::ExecCtx exec_ctx; // reset exec_ctx
306 attr.cq_polling_type = polling_types[pidx];
307 cc = grpc_completion_queue_create(
308 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
309
310 for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
311 GPR_ASSERT(grpc_cq_begin_op(cc, tags[i]));
312 grpc_cq_end_op(cc, tags[i], GRPC_ERROR_NONE, do_nothing_end_completion,
313 nullptr, &completions[i]);
314 }
315
316 for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
317 ev = grpc_completion_queue_pluck(
318 cc, tags[i], gpr_inf_past(GPR_CLOCK_REALTIME), nullptr);
319 GPR_ASSERT(ev.tag == tags[i]);
320 }
321
322 for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
323 GPR_ASSERT(grpc_cq_begin_op(cc, tags[i]));
324 grpc_cq_end_op(cc, tags[i], GRPC_ERROR_NONE, do_nothing_end_completion,
325 nullptr, &completions[i]);
326 }
327
328 for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
329 ev = grpc_completion_queue_pluck(cc, tags[GPR_ARRAY_SIZE(tags) - i - 1],
330 gpr_inf_past(GPR_CLOCK_REALTIME),
331 nullptr);
332 GPR_ASSERT(ev.tag == tags[GPR_ARRAY_SIZE(tags) - i - 1]);
333 }
334
335 shutdown_and_destroy(cc);
336 }
337 }
338
test_pluck_after_shutdown(void)339 static void test_pluck_after_shutdown(void) {
340 grpc_cq_polling_type polling_types[] = {
341 GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
342 grpc_event ev;
343 grpc_completion_queue* cc;
344 grpc_completion_queue_attributes attr;
345
346 LOG_TEST("test_pluck_after_shutdown");
347
348 attr.version = 1;
349 attr.cq_completion_type = GRPC_CQ_PLUCK;
350 for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
351 attr.cq_polling_type = polling_types[i];
352 cc = grpc_completion_queue_create(
353 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
354 grpc_completion_queue_shutdown(cc);
355 ev = grpc_completion_queue_pluck(
356 cc, nullptr, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
357 GPR_ASSERT(ev.type == GRPC_QUEUE_SHUTDOWN);
358 grpc_completion_queue_destroy(cc);
359 }
360 }
361
test_callback(void)362 static void test_callback(void) {
363 grpc_completion_queue* cc;
364 static void* tags[128];
365 grpc_cq_completion completions[GPR_ARRAY_SIZE(tags)];
366 grpc_cq_polling_type polling_types[] = {
367 GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
368 grpc_completion_queue_attributes attr;
369 unsigned i;
370 static gpr_mu mu, shutdown_mu;
371 static gpr_cv cv, shutdown_cv;
372 static int cb_counter;
373 gpr_mu_init(&mu);
374 gpr_mu_init(&shutdown_mu);
375 gpr_cv_init(&cv);
376 gpr_cv_init(&shutdown_cv);
377
378 LOG_TEST("test_callback");
379
380 bool got_shutdown = false;
381 class ShutdownCallback : public grpc_experimental_completion_queue_functor {
382 public:
383 ShutdownCallback(bool* done) : done_(done) {
384 functor_run = &ShutdownCallback::Run;
385 inlineable = false;
386 }
387 ~ShutdownCallback() {}
388 static void Run(grpc_experimental_completion_queue_functor* cb, int ok) {
389 gpr_mu_lock(&shutdown_mu);
390 *static_cast<ShutdownCallback*>(cb)->done_ = static_cast<bool>(ok);
391 // Signal when the shutdown callback is completed.
392 gpr_cv_signal(&shutdown_cv);
393 gpr_mu_unlock(&shutdown_mu);
394 }
395
396 private:
397 bool* done_;
398 };
399 ShutdownCallback shutdown_cb(&got_shutdown);
400
401 attr.version = 2;
402 attr.cq_completion_type = GRPC_CQ_CALLBACK;
403 attr.cq_shutdown_cb = &shutdown_cb;
404
405 for (size_t pidx = 0; pidx < GPR_ARRAY_SIZE(polling_types); pidx++) {
406 int sumtags = 0;
407 int counter = 0;
408 cb_counter = 0;
409 {
410 // reset exec_ctx types
411 grpc_core::ExecCtx exec_ctx;
412 attr.cq_polling_type = polling_types[pidx];
413 cc = grpc_completion_queue_create(
414 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
415
416 class TagCallback : public grpc_experimental_completion_queue_functor {
417 public:
418 TagCallback(int* counter, int tag) : counter_(counter), tag_(tag) {
419 functor_run = &TagCallback::Run;
420 // Inlineable should be false since this callback takes locks.
421 inlineable = false;
422 }
423 ~TagCallback() {}
424 static void Run(grpc_experimental_completion_queue_functor* cb,
425 int ok) {
426 GPR_ASSERT(static_cast<bool>(ok));
427 auto* callback = static_cast<TagCallback*>(cb);
428 gpr_mu_lock(&mu);
429 cb_counter++;
430 *callback->counter_ += callback->tag_;
431 if (cb_counter == GPR_ARRAY_SIZE(tags)) {
432 gpr_cv_signal(&cv);
433 }
434 gpr_mu_unlock(&mu);
435 delete callback;
436 };
437
438 private:
439 int* counter_;
440 int tag_;
441 };
442
443 for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
444 tags[i] = static_cast<void*>(new TagCallback(&counter, i));
445 sumtags += i;
446 }
447
448 for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
449 GPR_ASSERT(grpc_cq_begin_op(cc, tags[i]));
450 grpc_cq_end_op(cc, tags[i], GRPC_ERROR_NONE, do_nothing_end_completion,
451 nullptr, &completions[i]);
452 }
453
454 gpr_mu_lock(&mu);
455 while (cb_counter != GPR_ARRAY_SIZE(tags)) {
456 // Wait for all the callbacks to complete.
457 gpr_cv_wait(&cv, &mu, gpr_inf_future(GPR_CLOCK_REALTIME));
458 }
459 gpr_mu_unlock(&mu);
460
461 shutdown_and_destroy(cc);
462
463 gpr_mu_lock(&shutdown_mu);
464 while (!got_shutdown) {
465 // Wait for the shutdown callback to complete.
466 gpr_cv_wait(&shutdown_cv, &shutdown_mu,
467 gpr_inf_future(GPR_CLOCK_REALTIME));
468 }
469 gpr_mu_unlock(&shutdown_mu);
470 }
471
472 // Run the assertions to check if the test ran successfully.
473 GPR_ASSERT(sumtags == counter);
474 GPR_ASSERT(got_shutdown);
475 got_shutdown = false;
476 }
477
478 gpr_cv_destroy(&cv);
479 gpr_cv_destroy(&shutdown_cv);
480 gpr_mu_destroy(&mu);
481 gpr_mu_destroy(&shutdown_mu);
482 }
483
484 struct thread_state {
485 grpc_completion_queue* cc;
486 void* tag;
487 };
488
main(int argc,char ** argv)489 int main(int argc, char** argv) {
490 grpc::testing::TestEnvironment env(argc, argv);
491 grpc_init();
492 test_no_op();
493 test_pollset_conversion();
494 test_wait_empty();
495 test_shutdown_then_next_polling();
496 test_shutdown_then_next_with_timeout();
497 test_cq_end_op();
498 test_pluck();
499 test_pluck_after_shutdown();
500 test_cq_tls_cache_full();
501 test_cq_tls_cache_empty();
502 test_callback();
503 grpc_shutdown();
504 return 0;
505 }
506