• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include "src/core/lib/surface/completion_queue.h"
20 
21 #include <grpc/support/alloc.h>
22 #include <grpc/support/log.h>
23 #include <grpc/support/time.h>
24 #include "src/core/lib/gpr/useful.h"
25 #include "src/core/lib/gprpp/memory.h"
26 #include "src/core/lib/iomgr/iomgr.h"
27 #include "test/core/util/test_config.h"
28 
/* Logs the given C string (by convention, the running test's name) at INFO
   severity via gpr_log. */
#define LOG_TEST(test_name) gpr_log(GPR_INFO, "%s", test_name)
30 
/* Returns a distinct, non-null opaque tag on each call: a function-local
   counter is bumped and its new value is reinterpreted as a pointer. The
   result is only meant to be compared, never dereferenced. */
static void* create_test_tag(void) {
  static intptr_t next_tag = 0;
  next_tag += 1;
  return (void*)next_tag;
}
35 
36 /* helper for tests to shutdown correctly and tersely */
shutdown_and_destroy(grpc_completion_queue * cc)37 static void shutdown_and_destroy(grpc_completion_queue* cc) {
38   grpc_event ev;
39   grpc_completion_queue_shutdown(cc);
40 
41   switch (grpc_get_cq_completion_type(cc)) {
42     case GRPC_CQ_NEXT: {
43       ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
44                                       nullptr);
45       GPR_ASSERT(ev.type == GRPC_QUEUE_SHUTDOWN);
46       break;
47     }
48     case GRPC_CQ_PLUCK: {
49       ev = grpc_completion_queue_pluck(
50           cc, create_test_tag(), gpr_inf_past(GPR_CLOCK_REALTIME), nullptr);
51       GPR_ASSERT(ev.type == GRPC_QUEUE_SHUTDOWN);
52       break;
53     }
54     case GRPC_CQ_CALLBACK: {
55       // Nothing to do here. The shutdown callback will be invoked when
56       // possible.
57       break;
58     }
59     default: {
60       gpr_log(GPR_ERROR, "Unknown completion type");
61       break;
62     }
63   }
64 
65   grpc_completion_queue_destroy(cc);
66 }
67 
68 /* ensure we can create and destroy a completion channel */
test_no_op(void)69 static void test_no_op(void) {
70   grpc_cq_completion_type completion_types[] = {GRPC_CQ_NEXT, GRPC_CQ_PLUCK};
71   grpc_cq_polling_type polling_types[] = {
72       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
73   grpc_completion_queue_attributes attr;
74   LOG_TEST("test_no_op");
75 
76   attr.version = 1;
77   for (size_t i = 0; i < GPR_ARRAY_SIZE(completion_types); i++) {
78     for (size_t j = 0; j < GPR_ARRAY_SIZE(polling_types); j++) {
79       attr.cq_completion_type = completion_types[i];
80       attr.cq_polling_type = polling_types[j];
81       shutdown_and_destroy(grpc_completion_queue_create(
82           grpc_completion_queue_factory_lookup(&attr), &attr, nullptr));
83     }
84   }
85 }
86 
test_pollset_conversion(void)87 static void test_pollset_conversion(void) {
88   grpc_cq_completion_type completion_types[] = {GRPC_CQ_NEXT, GRPC_CQ_PLUCK};
89   grpc_cq_polling_type polling_types[] = {GRPC_CQ_DEFAULT_POLLING,
90                                           GRPC_CQ_NON_LISTENING};
91   grpc_completion_queue* cq;
92   grpc_completion_queue_attributes attr;
93 
94   LOG_TEST("test_pollset_conversion");
95 
96   attr.version = 1;
97   for (size_t i = 0; i < GPR_ARRAY_SIZE(completion_types); i++) {
98     for (size_t j = 0; j < GPR_ARRAY_SIZE(polling_types); j++) {
99       attr.cq_completion_type = completion_types[i];
100       attr.cq_polling_type = polling_types[j];
101       cq = grpc_completion_queue_create(
102           grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
103       GPR_ASSERT(grpc_cq_pollset(cq) != nullptr);
104       shutdown_and_destroy(cq);
105     }
106   }
107 }
108 
test_wait_empty(void)109 static void test_wait_empty(void) {
110   grpc_cq_polling_type polling_types[] = {
111       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
112   grpc_completion_queue* cc;
113   grpc_completion_queue_attributes attr;
114   grpc_event event;
115 
116   LOG_TEST("test_wait_empty");
117 
118   attr.version = 1;
119   attr.cq_completion_type = GRPC_CQ_NEXT;
120   for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
121     attr.cq_polling_type = polling_types[i];
122     cc = grpc_completion_queue_create(
123         grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
124     event =
125         grpc_completion_queue_next(cc, gpr_now(GPR_CLOCK_REALTIME), nullptr);
126     GPR_ASSERT(event.type == GRPC_QUEUE_TIMEOUT);
127     shutdown_and_destroy(cc);
128   }
129 }
130 
/* Completion "done" callback that intentionally does nothing; the tests hand
   it to grpc_cq_end_op. Both arguments are ignored. */
static void do_nothing_end_completion(void* arg, grpc_cq_completion* c) {
  (void)arg;
  (void)c;
}
132 
test_cq_end_op(void)133 static void test_cq_end_op(void) {
134   grpc_event ev;
135   grpc_completion_queue* cc;
136   grpc_cq_completion completion;
137   grpc_cq_polling_type polling_types[] = {
138       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
139   grpc_completion_queue_attributes attr;
140   void* tag = create_test_tag();
141 
142   LOG_TEST("test_cq_end_op");
143 
144   attr.version = 1;
145   attr.cq_completion_type = GRPC_CQ_NEXT;
146   for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
147     grpc_core::ExecCtx exec_ctx;
148     attr.cq_polling_type = polling_types[i];
149     cc = grpc_completion_queue_create(
150         grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
151 
152     GPR_ASSERT(grpc_cq_begin_op(cc, tag));
153     grpc_cq_end_op(cc, tag, GRPC_ERROR_NONE, do_nothing_end_completion, nullptr,
154                    &completion);
155 
156     ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
157                                     nullptr);
158     GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
159     GPR_ASSERT(ev.tag == tag);
160     GPR_ASSERT(ev.success);
161 
162     shutdown_and_destroy(cc);
163   }
164 }
165 
test_cq_tls_cache_full(void)166 static void test_cq_tls_cache_full(void) {
167   grpc_event ev;
168   grpc_completion_queue* cc;
169   grpc_cq_completion completion;
170   grpc_cq_polling_type polling_types[] = {
171       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
172   grpc_completion_queue_attributes attr;
173   void* tag = create_test_tag();
174   void* res_tag;
175   int ok;
176 
177   LOG_TEST("test_cq_tls_cache_full");
178 
179   attr.version = 1;
180   attr.cq_completion_type = GRPC_CQ_NEXT;
181   for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
182     grpc_core::ExecCtx exec_ctx;  // Reset exec_ctx
183     attr.cq_polling_type = polling_types[i];
184     cc = grpc_completion_queue_create(
185         grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
186 
187     grpc_completion_queue_thread_local_cache_init(cc);
188     GPR_ASSERT(grpc_cq_begin_op(cc, tag));
189     grpc_cq_end_op(cc, tag, GRPC_ERROR_NONE, do_nothing_end_completion, nullptr,
190                    &completion);
191 
192     ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
193                                     nullptr);
194     GPR_ASSERT(ev.type == GRPC_QUEUE_TIMEOUT);
195 
196     GPR_ASSERT(
197         grpc_completion_queue_thread_local_cache_flush(cc, &res_tag, &ok) == 1);
198     GPR_ASSERT(res_tag == tag);
199     GPR_ASSERT(ok);
200 
201     ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
202                                     nullptr);
203     GPR_ASSERT(ev.type == GRPC_QUEUE_TIMEOUT);
204 
205     shutdown_and_destroy(cc);
206   }
207 }
208 
test_cq_tls_cache_empty(void)209 static void test_cq_tls_cache_empty(void) {
210   grpc_completion_queue* cc;
211   grpc_cq_polling_type polling_types[] = {
212       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
213   grpc_completion_queue_attributes attr;
214   void* res_tag;
215   int ok;
216 
217   LOG_TEST("test_cq_tls_cache_empty");
218 
219   attr.version = 1;
220   attr.cq_completion_type = GRPC_CQ_NEXT;
221   for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
222     grpc_core::ExecCtx exec_ctx;  // Reset exec_ctx
223     attr.cq_polling_type = polling_types[i];
224     cc = grpc_completion_queue_create(
225         grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
226 
227     GPR_ASSERT(
228         grpc_completion_queue_thread_local_cache_flush(cc, &res_tag, &ok) == 0);
229     grpc_completion_queue_thread_local_cache_init(cc);
230     GPR_ASSERT(
231         grpc_completion_queue_thread_local_cache_flush(cc, &res_tag, &ok) == 0);
232     shutdown_and_destroy(cc);
233   }
234 }
235 
test_shutdown_then_next_polling(void)236 static void test_shutdown_then_next_polling(void) {
237   grpc_cq_polling_type polling_types[] = {
238       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
239   grpc_completion_queue* cc;
240   grpc_completion_queue_attributes attr;
241   grpc_event event;
242   LOG_TEST("test_shutdown_then_next_polling");
243 
244   attr.version = 1;
245   attr.cq_completion_type = GRPC_CQ_NEXT;
246   for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
247     attr.cq_polling_type = polling_types[i];
248     cc = grpc_completion_queue_create(
249         grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
250     grpc_completion_queue_shutdown(cc);
251     event = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
252                                        nullptr);
253     GPR_ASSERT(event.type == GRPC_QUEUE_SHUTDOWN);
254     grpc_completion_queue_destroy(cc);
255   }
256 }
257 
test_shutdown_then_next_with_timeout(void)258 static void test_shutdown_then_next_with_timeout(void) {
259   grpc_cq_polling_type polling_types[] = {
260       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
261   grpc_completion_queue* cc;
262   grpc_completion_queue_attributes attr;
263   grpc_event event;
264   LOG_TEST("test_shutdown_then_next_with_timeout");
265 
266   attr.version = 1;
267   attr.cq_completion_type = GRPC_CQ_NEXT;
268   for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
269     attr.cq_polling_type = polling_types[i];
270     cc = grpc_completion_queue_create(
271         grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
272 
273     grpc_completion_queue_shutdown(cc);
274     event = grpc_completion_queue_next(cc, gpr_inf_future(GPR_CLOCK_REALTIME),
275                                        nullptr);
276     GPR_ASSERT(event.type == GRPC_QUEUE_SHUTDOWN);
277     grpc_completion_queue_destroy(cc);
278   }
279 }
280 
test_pluck(void)281 static void test_pluck(void) {
282   grpc_event ev;
283   grpc_completion_queue* cc;
284   void* tags[128];
285   grpc_cq_completion completions[GPR_ARRAY_SIZE(tags)];
286   grpc_cq_polling_type polling_types[] = {
287       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
288   grpc_completion_queue_attributes attr;
289   unsigned i, j;
290 
291   LOG_TEST("test_pluck");
292 
293   for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
294     tags[i] = create_test_tag();
295     for (j = 0; j < i; j++) {
296       GPR_ASSERT(tags[i] != tags[j]);
297     }
298   }
299 
300   attr.version = 1;
301   attr.cq_completion_type = GRPC_CQ_PLUCK;
302   for (size_t pidx = 0; pidx < GPR_ARRAY_SIZE(polling_types); pidx++) {
303     grpc_core::ExecCtx exec_ctx;  // reset exec_ctx
304     attr.cq_polling_type = polling_types[pidx];
305     cc = grpc_completion_queue_create(
306         grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
307 
308     for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
309       GPR_ASSERT(grpc_cq_begin_op(cc, tags[i]));
310       grpc_cq_end_op(cc, tags[i], GRPC_ERROR_NONE, do_nothing_end_completion,
311                      nullptr, &completions[i]);
312     }
313 
314     for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
315       ev = grpc_completion_queue_pluck(
316           cc, tags[i], gpr_inf_past(GPR_CLOCK_REALTIME), nullptr);
317       GPR_ASSERT(ev.tag == tags[i]);
318     }
319 
320     for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
321       GPR_ASSERT(grpc_cq_begin_op(cc, tags[i]));
322       grpc_cq_end_op(cc, tags[i], GRPC_ERROR_NONE, do_nothing_end_completion,
323                      nullptr, &completions[i]);
324     }
325 
326     for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
327       ev = grpc_completion_queue_pluck(cc, tags[GPR_ARRAY_SIZE(tags) - i - 1],
328                                        gpr_inf_past(GPR_CLOCK_REALTIME),
329                                        nullptr);
330       GPR_ASSERT(ev.tag == tags[GPR_ARRAY_SIZE(tags) - i - 1]);
331     }
332 
333     shutdown_and_destroy(cc);
334   }
335 }
336 
test_pluck_after_shutdown(void)337 static void test_pluck_after_shutdown(void) {
338   grpc_cq_polling_type polling_types[] = {
339       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
340   grpc_event ev;
341   grpc_completion_queue* cc;
342   grpc_completion_queue_attributes attr;
343 
344   LOG_TEST("test_pluck_after_shutdown");
345 
346   attr.version = 1;
347   attr.cq_completion_type = GRPC_CQ_PLUCK;
348   for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
349     attr.cq_polling_type = polling_types[i];
350     cc = grpc_completion_queue_create(
351         grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
352     grpc_completion_queue_shutdown(cc);
353     ev = grpc_completion_queue_pluck(
354         cc, nullptr, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
355     GPR_ASSERT(ev.type == GRPC_QUEUE_SHUTDOWN);
356     grpc_completion_queue_destroy(cc);
357   }
358 }
359 
test_callback(void)360 static void test_callback(void) {
361   grpc_completion_queue* cc;
362   void* tags[128];
363   grpc_cq_completion completions[GPR_ARRAY_SIZE(tags)];
364   grpc_cq_polling_type polling_types[] = {
365       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
366   grpc_completion_queue_attributes attr;
367   unsigned i;
368 
369   LOG_TEST("test_callback");
370 
371   bool got_shutdown = false;
372   class ShutdownCallback : public grpc_experimental_completion_queue_functor {
373    public:
374     ShutdownCallback(bool* done) : done_(done) {
375       functor_run = &ShutdownCallback::Run;
376     }
377     ~ShutdownCallback() {}
378     static void Run(grpc_experimental_completion_queue_functor* cb, int ok) {
379       *static_cast<ShutdownCallback*>(cb)->done_ = static_cast<bool>(ok);
380     }
381 
382    private:
383     bool* done_;
384   };
385   ShutdownCallback shutdown_cb(&got_shutdown);
386 
387   attr.version = 2;
388   attr.cq_completion_type = GRPC_CQ_CALLBACK;
389   attr.cq_shutdown_cb = &shutdown_cb;
390 
391   for (size_t pidx = 0; pidx < GPR_ARRAY_SIZE(polling_types); pidx++) {
392     grpc_core::ExecCtx exec_ctx;  // reset exec_ctx
393     attr.cq_polling_type = polling_types[pidx];
394     cc = grpc_completion_queue_create(
395         grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
396 
397     int counter = 0;
398     class TagCallback : public grpc_experimental_completion_queue_functor {
399      public:
400       TagCallback(int* counter, int tag) : counter_(counter), tag_(tag) {
401         functor_run = &TagCallback::Run;
402       }
403       ~TagCallback() {}
404       static void Run(grpc_experimental_completion_queue_functor* cb, int ok) {
405         GPR_ASSERT(static_cast<bool>(ok));
406         auto* callback = static_cast<TagCallback*>(cb);
407         *callback->counter_ += callback->tag_;
408         grpc_core::Delete(callback);
409       };
410 
411      private:
412       int* counter_;
413       int tag_;
414     };
415 
416     int sumtags = 0;
417     for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
418       tags[i] = static_cast<void*>(grpc_core::New<TagCallback>(&counter, i));
419       sumtags += i;
420     }
421 
422     for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
423       GPR_ASSERT(grpc_cq_begin_op(cc, tags[i]));
424       grpc_cq_end_op(cc, tags[i], GRPC_ERROR_NONE, do_nothing_end_completion,
425                      nullptr, &completions[i]);
426     }
427 
428     GPR_ASSERT(sumtags == counter);
429 
430     shutdown_and_destroy(cc);
431 
432     GPR_ASSERT(got_shutdown);
433     got_shutdown = false;
434   }
435 }
436 
437 struct thread_state {
438   grpc_completion_queue* cc;
439   void* tag;
440 };
441 
main(int argc,char ** argv)442 int main(int argc, char** argv) {
443   grpc_test_init(argc, argv);
444   grpc_init();
445   test_no_op();
446   test_pollset_conversion();
447   test_wait_empty();
448   test_shutdown_then_next_polling();
449   test_shutdown_then_next_with_timeout();
450   test_cq_end_op();
451   test_pluck();
452   test_pluck_after_shutdown();
453   test_cq_tls_cache_full();
454   test_cq_tls_cache_empty();
455   test_callback();
456   grpc_shutdown();
457   return 0;
458 }
459