1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include "src/core/lib/surface/completion_queue.h"
20
21 #include <grpc/grpc.h>
22 #include <grpc/support/sync.h>
23 #include <grpc/support/time.h>
24 #include <stddef.h>
25
26 #include <memory>
27
28 #include "absl/log/log.h"
29 #include "absl/status/status.h"
30 #include "gtest/gtest.h"
31 #include "src/core/lib/iomgr/exec_ctx.h"
32 #include "src/core/util/useful.h"
33 #include "test/core/test_util/test_config.h"
34
35 #define LOG_TEST(x) LOG(INFO) << x
36
create_test_tag(void)37 static void* create_test_tag(void) {
38 static intptr_t i = 0;
39 return reinterpret_cast<void*>(++i);
40 }
41
42 // helper for tests to shutdown correctly and tersely
shutdown_and_destroy(grpc_completion_queue * cc)43 static void shutdown_and_destroy(grpc_completion_queue* cc) {
44 grpc_event ev;
45 grpc_completion_queue_shutdown(cc);
46
47 switch (grpc_get_cq_completion_type(cc)) {
48 case GRPC_CQ_NEXT: {
49 ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
50 nullptr);
51 ASSERT_EQ(ev.type, GRPC_QUEUE_SHUTDOWN);
52 break;
53 }
54 case GRPC_CQ_PLUCK: {
55 ev = grpc_completion_queue_pluck(
56 cc, create_test_tag(), gpr_inf_past(GPR_CLOCK_REALTIME), nullptr);
57 ASSERT_EQ(ev.type, GRPC_QUEUE_SHUTDOWN);
58 break;
59 }
60 case GRPC_CQ_CALLBACK: {
61 // Nothing to do here. The shutdown callback will be invoked when
62 // possible.
63 break;
64 }
65 default: {
66 LOG(ERROR) << "Unknown completion type";
67 break;
68 }
69 }
70
71 grpc_completion_queue_destroy(cc);
72 }
73
74 // ensure we can create and destroy a completion channel
TEST(GrpcCompletionQueueTest,TestNoOp)75 TEST(GrpcCompletionQueueTest, TestNoOp) {
76 grpc_cq_completion_type completion_types[] = {GRPC_CQ_NEXT, GRPC_CQ_PLUCK};
77 grpc_cq_polling_type polling_types[] = {
78 GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
79 grpc_completion_queue_attributes attr = {};
80 LOG_TEST("test_no_op");
81
82 attr.version = 1;
83 for (size_t i = 0; i < GPR_ARRAY_SIZE(completion_types); i++) {
84 for (size_t j = 0; j < GPR_ARRAY_SIZE(polling_types); j++) {
85 attr.cq_completion_type = completion_types[i];
86 attr.cq_polling_type = polling_types[j];
87 shutdown_and_destroy(grpc_completion_queue_create(
88 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr));
89 }
90 }
91 }
92
TEST(GrpcCompletionQueueTest,TestPollsetConversion)93 TEST(GrpcCompletionQueueTest, TestPollsetConversion) {
94 grpc_cq_completion_type completion_types[] = {GRPC_CQ_NEXT, GRPC_CQ_PLUCK};
95 grpc_cq_polling_type polling_types[] = {GRPC_CQ_DEFAULT_POLLING,
96 GRPC_CQ_NON_LISTENING};
97 grpc_completion_queue* cq;
98 grpc_completion_queue_attributes attr = {};
99
100 LOG_TEST("test_pollset_conversion");
101
102 attr.version = 1;
103 for (size_t i = 0; i < GPR_ARRAY_SIZE(completion_types); i++) {
104 for (size_t j = 0; j < GPR_ARRAY_SIZE(polling_types); j++) {
105 attr.cq_completion_type = completion_types[i];
106 attr.cq_polling_type = polling_types[j];
107 cq = grpc_completion_queue_create(
108 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
109 ASSERT_NE(grpc_cq_pollset(cq), nullptr);
110 shutdown_and_destroy(cq);
111 }
112 }
113 }
114
TEST(GrpcCompletionQueueTest,TestWaitEmpty)115 TEST(GrpcCompletionQueueTest, TestWaitEmpty) {
116 grpc_cq_polling_type polling_types[] = {
117 GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
118 grpc_completion_queue* cc;
119 grpc_completion_queue_attributes attr = {};
120 grpc_event event;
121
122 LOG_TEST("test_wait_empty");
123
124 attr.version = 1;
125 attr.cq_completion_type = GRPC_CQ_NEXT;
126 for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
127 attr.cq_polling_type = polling_types[i];
128 cc = grpc_completion_queue_create(
129 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
130 event =
131 grpc_completion_queue_next(cc, gpr_now(GPR_CLOCK_REALTIME), nullptr);
132 ASSERT_EQ(event.type, GRPC_QUEUE_TIMEOUT);
133 shutdown_and_destroy(cc);
134 }
135 }
136
do_nothing_end_completion(void *,grpc_cq_completion *)137 static void do_nothing_end_completion(void* /*arg*/,
138 grpc_cq_completion* /*c*/) {}
139
TEST(GrpcCompletionQueueTest,TestCqEndOp)140 TEST(GrpcCompletionQueueTest, TestCqEndOp) {
141 grpc_event ev;
142 grpc_completion_queue* cc;
143 grpc_cq_completion completion;
144 grpc_cq_polling_type polling_types[] = {
145 GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
146 grpc_completion_queue_attributes attr = {};
147 void* tag = create_test_tag();
148
149 LOG_TEST("test_cq_end_op");
150
151 attr.version = 1;
152 attr.cq_completion_type = GRPC_CQ_NEXT;
153 for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
154 grpc_core::ExecCtx exec_ctx;
155 attr.cq_polling_type = polling_types[i];
156 cc = grpc_completion_queue_create(
157 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
158
159 ASSERT_TRUE(grpc_cq_begin_op(cc, tag));
160 grpc_cq_end_op(cc, tag, absl::OkStatus(), do_nothing_end_completion,
161 nullptr, &completion);
162
163 ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
164 nullptr);
165 ASSERT_EQ(ev.type, GRPC_OP_COMPLETE);
166 ASSERT_EQ(ev.tag, tag);
167 ASSERT_TRUE(ev.success);
168
169 shutdown_and_destroy(cc);
170 }
171 }
172
TEST(GrpcCompletionQueueTest,TestCqTlsCacheFull)173 TEST(GrpcCompletionQueueTest, TestCqTlsCacheFull) {
174 grpc_event ev;
175 grpc_completion_queue* cc;
176 grpc_cq_completion completion;
177 grpc_cq_polling_type polling_types[] = {
178 GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
179 grpc_completion_queue_attributes attr = {};
180 void* tag = create_test_tag();
181 void* res_tag;
182 int ok;
183
184 LOG_TEST("test_cq_tls_cache_full");
185
186 attr.version = 1;
187 attr.cq_completion_type = GRPC_CQ_NEXT;
188 for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
189 grpc_core::ExecCtx exec_ctx; // Reset exec_ctx
190 attr.cq_polling_type = polling_types[i];
191 cc = grpc_completion_queue_create(
192 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
193
194 grpc_completion_queue_thread_local_cache_init(cc);
195 ASSERT_TRUE(grpc_cq_begin_op(cc, tag));
196 grpc_cq_end_op(cc, tag, absl::OkStatus(), do_nothing_end_completion,
197 nullptr, &completion);
198
199 ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
200 nullptr);
201 ASSERT_EQ(ev.type, GRPC_QUEUE_TIMEOUT);
202
203 ASSERT_EQ(grpc_completion_queue_thread_local_cache_flush(cc, &res_tag, &ok),
204 1);
205 ASSERT_EQ(res_tag, tag);
206 ASSERT_TRUE(ok);
207
208 ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
209 nullptr);
210 ASSERT_EQ(ev.type, GRPC_QUEUE_TIMEOUT);
211
212 shutdown_and_destroy(cc);
213 }
214 }
215
TEST(GrpcCompletionQueueTest,TestCqTlsCacheEmpty)216 TEST(GrpcCompletionQueueTest, TestCqTlsCacheEmpty) {
217 grpc_completion_queue* cc;
218 grpc_cq_polling_type polling_types[] = {
219 GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
220 grpc_completion_queue_attributes attr = {};
221 void* res_tag;
222 int ok;
223
224 LOG_TEST("test_cq_tls_cache_empty");
225
226 attr.version = 1;
227 attr.cq_completion_type = GRPC_CQ_NEXT;
228 for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
229 grpc_core::ExecCtx exec_ctx; // Reset exec_ctx
230 attr.cq_polling_type = polling_types[i];
231 cc = grpc_completion_queue_create(
232 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
233
234 ASSERT_EQ(grpc_completion_queue_thread_local_cache_flush(cc, &res_tag, &ok),
235 0);
236 grpc_completion_queue_thread_local_cache_init(cc);
237 ASSERT_EQ(grpc_completion_queue_thread_local_cache_flush(cc, &res_tag, &ok),
238 0);
239 shutdown_and_destroy(cc);
240 }
241 }
242
TEST(GrpcCompletionQueueTest,TestShutdownThenNextPolling)243 TEST(GrpcCompletionQueueTest, TestShutdownThenNextPolling) {
244 grpc_cq_polling_type polling_types[] = {
245 GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
246 grpc_completion_queue* cc;
247 grpc_completion_queue_attributes attr = {};
248 grpc_event event;
249 LOG_TEST("test_shutdown_then_next_polling");
250
251 attr.version = 1;
252 attr.cq_completion_type = GRPC_CQ_NEXT;
253 for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
254 attr.cq_polling_type = polling_types[i];
255 cc = grpc_completion_queue_create(
256 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
257 grpc_completion_queue_shutdown(cc);
258 event = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
259 nullptr);
260 ASSERT_EQ(event.type, GRPC_QUEUE_SHUTDOWN);
261 grpc_completion_queue_destroy(cc);
262 }
263 }
264
TEST(GrpcCompletionQueueTest,TestShutdownThenNextWithTimeout)265 TEST(GrpcCompletionQueueTest, TestShutdownThenNextWithTimeout) {
266 grpc_cq_polling_type polling_types[] = {
267 GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
268 grpc_completion_queue* cc;
269 grpc_completion_queue_attributes attr = {};
270 grpc_event event;
271 LOG_TEST("test_shutdown_then_next_with_timeout");
272
273 attr.version = 1;
274 attr.cq_completion_type = GRPC_CQ_NEXT;
275 for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
276 attr.cq_polling_type = polling_types[i];
277 cc = grpc_completion_queue_create(
278 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
279
280 grpc_completion_queue_shutdown(cc);
281 event = grpc_completion_queue_next(cc, gpr_inf_future(GPR_CLOCK_REALTIME),
282 nullptr);
283 ASSERT_EQ(event.type, GRPC_QUEUE_SHUTDOWN);
284 grpc_completion_queue_destroy(cc);
285 }
286 }
287
TEST(GrpcCompletionQueueTest,TestPluck)288 TEST(GrpcCompletionQueueTest, TestPluck) {
289 grpc_event ev;
290 grpc_completion_queue* cc;
291 void* tags[128];
292 grpc_cq_completion completions[GPR_ARRAY_SIZE(tags)];
293 grpc_cq_polling_type polling_types[] = {
294 GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
295 grpc_completion_queue_attributes attr = {};
296 unsigned i, j;
297
298 LOG_TEST("test_pluck");
299
300 for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
301 tags[i] = create_test_tag();
302 for (j = 0; j < i; j++) {
303 ASSERT_NE(tags[i], tags[j]);
304 }
305 }
306
307 attr.version = 1;
308 attr.cq_completion_type = GRPC_CQ_PLUCK;
309 for (size_t pidx = 0; pidx < GPR_ARRAY_SIZE(polling_types); pidx++) {
310 grpc_core::ExecCtx exec_ctx; // reset exec_ctx
311 attr.cq_polling_type = polling_types[pidx];
312 cc = grpc_completion_queue_create(
313 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
314
315 for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
316 ASSERT_TRUE(grpc_cq_begin_op(cc, tags[i]));
317 grpc_cq_end_op(cc, tags[i], absl::OkStatus(), do_nothing_end_completion,
318 nullptr, &completions[i]);
319 }
320
321 for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
322 ev = grpc_completion_queue_pluck(
323 cc, tags[i], gpr_inf_past(GPR_CLOCK_REALTIME), nullptr);
324 ASSERT_EQ(ev.tag, tags[i]);
325 }
326
327 for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
328 ASSERT_TRUE(grpc_cq_begin_op(cc, tags[i]));
329 grpc_cq_end_op(cc, tags[i], absl::OkStatus(), do_nothing_end_completion,
330 nullptr, &completions[i]);
331 }
332
333 for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
334 ev = grpc_completion_queue_pluck(cc, tags[GPR_ARRAY_SIZE(tags) - i - 1],
335 gpr_inf_past(GPR_CLOCK_REALTIME),
336 nullptr);
337 ASSERT_EQ(ev.tag, tags[GPR_ARRAY_SIZE(tags) - i - 1]);
338 }
339
340 shutdown_and_destroy(cc);
341 }
342 }
343
TEST(GrpcCompletionQueueTest,TestPluckAfterShutdown)344 TEST(GrpcCompletionQueueTest, TestPluckAfterShutdown) {
345 grpc_cq_polling_type polling_types[] = {
346 GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
347 grpc_event ev;
348 grpc_completion_queue* cc;
349 grpc_completion_queue_attributes attr = {};
350
351 LOG_TEST("test_pluck_after_shutdown");
352
353 attr.version = 1;
354 attr.cq_completion_type = GRPC_CQ_PLUCK;
355 for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
356 attr.cq_polling_type = polling_types[i];
357 cc = grpc_completion_queue_create(
358 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
359 grpc_completion_queue_shutdown(cc);
360 ev = grpc_completion_queue_pluck(
361 cc, nullptr, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
362 ASSERT_EQ(ev.type, GRPC_QUEUE_SHUTDOWN);
363 grpc_completion_queue_destroy(cc);
364 }
365 }
366
TEST(GrpcCompletionQueueTest,TestCallback)367 TEST(GrpcCompletionQueueTest, TestCallback) {
368 grpc_completion_queue* cc;
369 static void* tags[128];
370 grpc_cq_completion completions[GPR_ARRAY_SIZE(tags)];
371 grpc_cq_polling_type polling_types[] = {
372 GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
373 grpc_completion_queue_attributes attr = {};
374 unsigned i;
375 static gpr_mu mu, shutdown_mu;
376 static gpr_cv cv, shutdown_cv;
377 static int cb_counter;
378 gpr_mu_init(&mu);
379 gpr_mu_init(&shutdown_mu);
380 gpr_cv_init(&cv);
381 gpr_cv_init(&shutdown_cv);
382
383 LOG_TEST("test_callback");
384
385 bool got_shutdown = false;
386 class ShutdownCallback : public grpc_completion_queue_functor {
387 public:
388 explicit ShutdownCallback(bool* done) : done_(done) {
389 functor_run = &ShutdownCallback::Run;
390 inlineable = false;
391 }
392 ~ShutdownCallback() {}
393 static void Run(grpc_completion_queue_functor* cb, int ok) {
394 gpr_mu_lock(&shutdown_mu);
395 *static_cast<ShutdownCallback*>(cb)->done_ = static_cast<bool>(ok);
396 // Signal when the shutdown callback is completed.
397 gpr_cv_signal(&shutdown_cv);
398 gpr_mu_unlock(&shutdown_mu);
399 }
400
401 private:
402 bool* done_;
403 };
404 ShutdownCallback shutdown_cb(&got_shutdown);
405
406 attr.version = 2;
407 attr.cq_completion_type = GRPC_CQ_CALLBACK;
408 attr.cq_shutdown_cb = &shutdown_cb;
409
410 for (size_t pidx = 0; pidx < GPR_ARRAY_SIZE(polling_types); pidx++) {
411 int sumtags = 0;
412 int counter = 0;
413 cb_counter = 0;
414 {
415 // reset exec_ctx types
416 grpc_core::ExecCtx exec_ctx;
417 attr.cq_polling_type = polling_types[pidx];
418 cc = grpc_completion_queue_create(
419 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
420
421 class TagCallback : public grpc_completion_queue_functor {
422 public:
423 TagCallback(int* counter, int tag) : counter_(counter), tag_(tag) {
424 functor_run = &TagCallback::Run;
425 // Inlineable should be false since this callback takes locks.
426 inlineable = false;
427 }
428 ~TagCallback() {}
429 static void Run(grpc_completion_queue_functor* cb, int ok) {
430 ASSERT_TRUE(static_cast<bool>(ok));
431 auto* callback = static_cast<TagCallback*>(cb);
432 gpr_mu_lock(&mu);
433 cb_counter++;
434 *callback->counter_ += callback->tag_;
435 if (cb_counter == GPR_ARRAY_SIZE(tags)) {
436 gpr_cv_signal(&cv);
437 }
438 gpr_mu_unlock(&mu);
439 delete callback;
440 };
441
442 private:
443 int* counter_;
444 int tag_;
445 };
446
447 for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
448 tags[i] = static_cast<void*>(new TagCallback(&counter, i));
449 sumtags += i;
450 }
451
452 for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
453 ASSERT_TRUE(grpc_cq_begin_op(cc, tags[i]));
454 grpc_cq_end_op(cc, tags[i], absl::OkStatus(), do_nothing_end_completion,
455 nullptr, &completions[i]);
456 }
457
458 gpr_mu_lock(&mu);
459 while (cb_counter != GPR_ARRAY_SIZE(tags)) {
460 // Wait for all the callbacks to complete.
461 gpr_cv_wait(&cv, &mu, gpr_inf_future(GPR_CLOCK_REALTIME));
462 }
463 gpr_mu_unlock(&mu);
464
465 shutdown_and_destroy(cc);
466
467 gpr_mu_lock(&shutdown_mu);
468 while (!got_shutdown) {
469 // Wait for the shutdown callback to complete.
470 gpr_cv_wait(&shutdown_cv, &shutdown_mu,
471 gpr_inf_future(GPR_CLOCK_REALTIME));
472 }
473 gpr_mu_unlock(&shutdown_mu);
474 }
475
476 // Run the assertions to check if the test ran successfully.
477 ASSERT_EQ(sumtags, counter);
478 ASSERT_TRUE(got_shutdown);
479 got_shutdown = false;
480 }
481
482 gpr_cv_destroy(&cv);
483 gpr_cv_destroy(&shutdown_cv);
484 gpr_mu_destroy(&mu);
485 gpr_mu_destroy(&shutdown_mu);
486 }
487
488 struct thread_state {
489 grpc_completion_queue* cc;
490 void* tag;
491 };
492
main(int argc,char ** argv)493 int main(int argc, char** argv) {
494 grpc::testing::TestEnvironment env(&argc, argv);
495 ::testing::InitGoogleTest(&argc, argv);
496 grpc::testing::TestGrpcScope grpc_scope;
497 return RUN_ALL_TESTS();
498 }
499