1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include "src/core/lib/surface/completion_queue.h"
20
21 #include <grpc/support/alloc.h>
22 #include <grpc/support/log.h>
23 #include <grpc/support/time.h>
24 #include "src/core/lib/gpr/useful.h"
25 #include "src/core/lib/gprpp/memory.h"
26 #include "src/core/lib/iomgr/iomgr.h"
27 #include "test/core/util/test_config.h"
28
29 #define LOG_TEST(x) gpr_log(GPR_INFO, "%s", x)
30
create_test_tag(void)31 static void* create_test_tag(void) {
32 static intptr_t i = 0;
33 return (void*)(++i);
34 }
35
36 /* helper for tests to shutdown correctly and tersely */
shutdown_and_destroy(grpc_completion_queue * cc)37 static void shutdown_and_destroy(grpc_completion_queue* cc) {
38 grpc_event ev;
39 grpc_completion_queue_shutdown(cc);
40
41 switch (grpc_get_cq_completion_type(cc)) {
42 case GRPC_CQ_NEXT: {
43 ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
44 nullptr);
45 GPR_ASSERT(ev.type == GRPC_QUEUE_SHUTDOWN);
46 break;
47 }
48 case GRPC_CQ_PLUCK: {
49 ev = grpc_completion_queue_pluck(
50 cc, create_test_tag(), gpr_inf_past(GPR_CLOCK_REALTIME), nullptr);
51 GPR_ASSERT(ev.type == GRPC_QUEUE_SHUTDOWN);
52 break;
53 }
54 case GRPC_CQ_CALLBACK: {
55 // Nothing to do here. The shutdown callback will be invoked when
56 // possible.
57 break;
58 }
59 default: {
60 gpr_log(GPR_ERROR, "Unknown completion type");
61 break;
62 }
63 }
64
65 grpc_completion_queue_destroy(cc);
66 }
67
68 /* ensure we can create and destroy a completion channel */
test_no_op(void)69 static void test_no_op(void) {
70 grpc_cq_completion_type completion_types[] = {GRPC_CQ_NEXT, GRPC_CQ_PLUCK};
71 grpc_cq_polling_type polling_types[] = {
72 GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
73 grpc_completion_queue_attributes attr;
74 LOG_TEST("test_no_op");
75
76 attr.version = 1;
77 for (size_t i = 0; i < GPR_ARRAY_SIZE(completion_types); i++) {
78 for (size_t j = 0; j < GPR_ARRAY_SIZE(polling_types); j++) {
79 attr.cq_completion_type = completion_types[i];
80 attr.cq_polling_type = polling_types[j];
81 shutdown_and_destroy(grpc_completion_queue_create(
82 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr));
83 }
84 }
85 }
86
test_pollset_conversion(void)87 static void test_pollset_conversion(void) {
88 grpc_cq_completion_type completion_types[] = {GRPC_CQ_NEXT, GRPC_CQ_PLUCK};
89 grpc_cq_polling_type polling_types[] = {GRPC_CQ_DEFAULT_POLLING,
90 GRPC_CQ_NON_LISTENING};
91 grpc_completion_queue* cq;
92 grpc_completion_queue_attributes attr;
93
94 LOG_TEST("test_pollset_conversion");
95
96 attr.version = 1;
97 for (size_t i = 0; i < GPR_ARRAY_SIZE(completion_types); i++) {
98 for (size_t j = 0; j < GPR_ARRAY_SIZE(polling_types); j++) {
99 attr.cq_completion_type = completion_types[i];
100 attr.cq_polling_type = polling_types[j];
101 cq = grpc_completion_queue_create(
102 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
103 GPR_ASSERT(grpc_cq_pollset(cq) != nullptr);
104 shutdown_and_destroy(cq);
105 }
106 }
107 }
108
test_wait_empty(void)109 static void test_wait_empty(void) {
110 grpc_cq_polling_type polling_types[] = {
111 GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
112 grpc_completion_queue* cc;
113 grpc_completion_queue_attributes attr;
114 grpc_event event;
115
116 LOG_TEST("test_wait_empty");
117
118 attr.version = 1;
119 attr.cq_completion_type = GRPC_CQ_NEXT;
120 for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
121 attr.cq_polling_type = polling_types[i];
122 cc = grpc_completion_queue_create(
123 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
124 event =
125 grpc_completion_queue_next(cc, gpr_now(GPR_CLOCK_REALTIME), nullptr);
126 GPR_ASSERT(event.type == GRPC_QUEUE_TIMEOUT);
127 shutdown_and_destroy(cc);
128 }
129 }
130
do_nothing_end_completion(void * arg,grpc_cq_completion * c)131 static void do_nothing_end_completion(void* arg, grpc_cq_completion* c) {}
132
test_cq_end_op(void)133 static void test_cq_end_op(void) {
134 grpc_event ev;
135 grpc_completion_queue* cc;
136 grpc_cq_completion completion;
137 grpc_cq_polling_type polling_types[] = {
138 GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
139 grpc_completion_queue_attributes attr;
140 void* tag = create_test_tag();
141
142 LOG_TEST("test_cq_end_op");
143
144 attr.version = 1;
145 attr.cq_completion_type = GRPC_CQ_NEXT;
146 for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
147 grpc_core::ExecCtx exec_ctx;
148 attr.cq_polling_type = polling_types[i];
149 cc = grpc_completion_queue_create(
150 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
151
152 GPR_ASSERT(grpc_cq_begin_op(cc, tag));
153 grpc_cq_end_op(cc, tag, GRPC_ERROR_NONE, do_nothing_end_completion, nullptr,
154 &completion);
155
156 ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
157 nullptr);
158 GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
159 GPR_ASSERT(ev.tag == tag);
160 GPR_ASSERT(ev.success);
161
162 shutdown_and_destroy(cc);
163 }
164 }
165
test_cq_tls_cache_full(void)166 static void test_cq_tls_cache_full(void) {
167 grpc_event ev;
168 grpc_completion_queue* cc;
169 grpc_cq_completion completion;
170 grpc_cq_polling_type polling_types[] = {
171 GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
172 grpc_completion_queue_attributes attr;
173 void* tag = create_test_tag();
174 void* res_tag;
175 int ok;
176
177 LOG_TEST("test_cq_tls_cache_full");
178
179 attr.version = 1;
180 attr.cq_completion_type = GRPC_CQ_NEXT;
181 for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
182 grpc_core::ExecCtx exec_ctx; // Reset exec_ctx
183 attr.cq_polling_type = polling_types[i];
184 cc = grpc_completion_queue_create(
185 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
186
187 grpc_completion_queue_thread_local_cache_init(cc);
188 GPR_ASSERT(grpc_cq_begin_op(cc, tag));
189 grpc_cq_end_op(cc, tag, GRPC_ERROR_NONE, do_nothing_end_completion, nullptr,
190 &completion);
191
192 ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
193 nullptr);
194 GPR_ASSERT(ev.type == GRPC_QUEUE_TIMEOUT);
195
196 GPR_ASSERT(
197 grpc_completion_queue_thread_local_cache_flush(cc, &res_tag, &ok) == 1);
198 GPR_ASSERT(res_tag == tag);
199 GPR_ASSERT(ok);
200
201 ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
202 nullptr);
203 GPR_ASSERT(ev.type == GRPC_QUEUE_TIMEOUT);
204
205 shutdown_and_destroy(cc);
206 }
207 }
208
test_cq_tls_cache_empty(void)209 static void test_cq_tls_cache_empty(void) {
210 grpc_completion_queue* cc;
211 grpc_cq_polling_type polling_types[] = {
212 GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
213 grpc_completion_queue_attributes attr;
214 void* res_tag;
215 int ok;
216
217 LOG_TEST("test_cq_tls_cache_empty");
218
219 attr.version = 1;
220 attr.cq_completion_type = GRPC_CQ_NEXT;
221 for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
222 grpc_core::ExecCtx exec_ctx; // Reset exec_ctx
223 attr.cq_polling_type = polling_types[i];
224 cc = grpc_completion_queue_create(
225 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
226
227 GPR_ASSERT(
228 grpc_completion_queue_thread_local_cache_flush(cc, &res_tag, &ok) == 0);
229 grpc_completion_queue_thread_local_cache_init(cc);
230 GPR_ASSERT(
231 grpc_completion_queue_thread_local_cache_flush(cc, &res_tag, &ok) == 0);
232 shutdown_and_destroy(cc);
233 }
234 }
235
test_shutdown_then_next_polling(void)236 static void test_shutdown_then_next_polling(void) {
237 grpc_cq_polling_type polling_types[] = {
238 GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
239 grpc_completion_queue* cc;
240 grpc_completion_queue_attributes attr;
241 grpc_event event;
242 LOG_TEST("test_shutdown_then_next_polling");
243
244 attr.version = 1;
245 attr.cq_completion_type = GRPC_CQ_NEXT;
246 for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
247 attr.cq_polling_type = polling_types[i];
248 cc = grpc_completion_queue_create(
249 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
250 grpc_completion_queue_shutdown(cc);
251 event = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
252 nullptr);
253 GPR_ASSERT(event.type == GRPC_QUEUE_SHUTDOWN);
254 grpc_completion_queue_destroy(cc);
255 }
256 }
257
test_shutdown_then_next_with_timeout(void)258 static void test_shutdown_then_next_with_timeout(void) {
259 grpc_cq_polling_type polling_types[] = {
260 GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
261 grpc_completion_queue* cc;
262 grpc_completion_queue_attributes attr;
263 grpc_event event;
264 LOG_TEST("test_shutdown_then_next_with_timeout");
265
266 attr.version = 1;
267 attr.cq_completion_type = GRPC_CQ_NEXT;
268 for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
269 attr.cq_polling_type = polling_types[i];
270 cc = grpc_completion_queue_create(
271 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
272
273 grpc_completion_queue_shutdown(cc);
274 event = grpc_completion_queue_next(cc, gpr_inf_future(GPR_CLOCK_REALTIME),
275 nullptr);
276 GPR_ASSERT(event.type == GRPC_QUEUE_SHUTDOWN);
277 grpc_completion_queue_destroy(cc);
278 }
279 }
280
test_pluck(void)281 static void test_pluck(void) {
282 grpc_event ev;
283 grpc_completion_queue* cc;
284 void* tags[128];
285 grpc_cq_completion completions[GPR_ARRAY_SIZE(tags)];
286 grpc_cq_polling_type polling_types[] = {
287 GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
288 grpc_completion_queue_attributes attr;
289 unsigned i, j;
290
291 LOG_TEST("test_pluck");
292
293 for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
294 tags[i] = create_test_tag();
295 for (j = 0; j < i; j++) {
296 GPR_ASSERT(tags[i] != tags[j]);
297 }
298 }
299
300 attr.version = 1;
301 attr.cq_completion_type = GRPC_CQ_PLUCK;
302 for (size_t pidx = 0; pidx < GPR_ARRAY_SIZE(polling_types); pidx++) {
303 grpc_core::ExecCtx exec_ctx; // reset exec_ctx
304 attr.cq_polling_type = polling_types[pidx];
305 cc = grpc_completion_queue_create(
306 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
307
308 for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
309 GPR_ASSERT(grpc_cq_begin_op(cc, tags[i]));
310 grpc_cq_end_op(cc, tags[i], GRPC_ERROR_NONE, do_nothing_end_completion,
311 nullptr, &completions[i]);
312 }
313
314 for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
315 ev = grpc_completion_queue_pluck(
316 cc, tags[i], gpr_inf_past(GPR_CLOCK_REALTIME), nullptr);
317 GPR_ASSERT(ev.tag == tags[i]);
318 }
319
320 for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
321 GPR_ASSERT(grpc_cq_begin_op(cc, tags[i]));
322 grpc_cq_end_op(cc, tags[i], GRPC_ERROR_NONE, do_nothing_end_completion,
323 nullptr, &completions[i]);
324 }
325
326 for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
327 ev = grpc_completion_queue_pluck(cc, tags[GPR_ARRAY_SIZE(tags) - i - 1],
328 gpr_inf_past(GPR_CLOCK_REALTIME),
329 nullptr);
330 GPR_ASSERT(ev.tag == tags[GPR_ARRAY_SIZE(tags) - i - 1]);
331 }
332
333 shutdown_and_destroy(cc);
334 }
335 }
336
test_pluck_after_shutdown(void)337 static void test_pluck_after_shutdown(void) {
338 grpc_cq_polling_type polling_types[] = {
339 GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
340 grpc_event ev;
341 grpc_completion_queue* cc;
342 grpc_completion_queue_attributes attr;
343
344 LOG_TEST("test_pluck_after_shutdown");
345
346 attr.version = 1;
347 attr.cq_completion_type = GRPC_CQ_PLUCK;
348 for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
349 attr.cq_polling_type = polling_types[i];
350 cc = grpc_completion_queue_create(
351 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
352 grpc_completion_queue_shutdown(cc);
353 ev = grpc_completion_queue_pluck(
354 cc, nullptr, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
355 GPR_ASSERT(ev.type == GRPC_QUEUE_SHUTDOWN);
356 grpc_completion_queue_destroy(cc);
357 }
358 }
359
test_callback(void)360 static void test_callback(void) {
361 grpc_completion_queue* cc;
362 void* tags[128];
363 grpc_cq_completion completions[GPR_ARRAY_SIZE(tags)];
364 grpc_cq_polling_type polling_types[] = {
365 GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
366 grpc_completion_queue_attributes attr;
367 unsigned i;
368
369 LOG_TEST("test_callback");
370
371 bool got_shutdown = false;
372 class ShutdownCallback : public grpc_experimental_completion_queue_functor {
373 public:
374 ShutdownCallback(bool* done) : done_(done) {
375 functor_run = &ShutdownCallback::Run;
376 }
377 ~ShutdownCallback() {}
378 static void Run(grpc_experimental_completion_queue_functor* cb, int ok) {
379 *static_cast<ShutdownCallback*>(cb)->done_ = static_cast<bool>(ok);
380 }
381
382 private:
383 bool* done_;
384 };
385 ShutdownCallback shutdown_cb(&got_shutdown);
386
387 attr.version = 2;
388 attr.cq_completion_type = GRPC_CQ_CALLBACK;
389 attr.cq_shutdown_cb = &shutdown_cb;
390
391 for (size_t pidx = 0; pidx < GPR_ARRAY_SIZE(polling_types); pidx++) {
392 grpc_core::ExecCtx exec_ctx; // reset exec_ctx
393 attr.cq_polling_type = polling_types[pidx];
394 cc = grpc_completion_queue_create(
395 grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
396
397 int counter = 0;
398 class TagCallback : public grpc_experimental_completion_queue_functor {
399 public:
400 TagCallback(int* counter, int tag) : counter_(counter), tag_(tag) {
401 functor_run = &TagCallback::Run;
402 }
403 ~TagCallback() {}
404 static void Run(grpc_experimental_completion_queue_functor* cb, int ok) {
405 GPR_ASSERT(static_cast<bool>(ok));
406 auto* callback = static_cast<TagCallback*>(cb);
407 *callback->counter_ += callback->tag_;
408 grpc_core::Delete(callback);
409 };
410
411 private:
412 int* counter_;
413 int tag_;
414 };
415
416 int sumtags = 0;
417 for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
418 tags[i] = static_cast<void*>(grpc_core::New<TagCallback>(&counter, i));
419 sumtags += i;
420 }
421
422 for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
423 GPR_ASSERT(grpc_cq_begin_op(cc, tags[i]));
424 grpc_cq_end_op(cc, tags[i], GRPC_ERROR_NONE, do_nothing_end_completion,
425 nullptr, &completions[i]);
426 }
427
428 GPR_ASSERT(sumtags == counter);
429
430 shutdown_and_destroy(cc);
431
432 GPR_ASSERT(got_shutdown);
433 got_shutdown = false;
434 }
435 }
436
437 struct thread_state {
438 grpc_completion_queue* cc;
439 void* tag;
440 };
441
main(int argc,char ** argv)442 int main(int argc, char** argv) {
443 grpc_test_init(argc, argv);
444 grpc_init();
445 test_no_op();
446 test_pollset_conversion();
447 test_wait_empty();
448 test_shutdown_then_next_polling();
449 test_shutdown_then_next_with_timeout();
450 test_cq_end_op();
451 test_pluck();
452 test_pluck_after_shutdown();
453 test_cq_tls_cache_full();
454 test_cq_tls_cache_empty();
455 test_callback();
456 grpc_shutdown();
457 return 0;
458 }
459