• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include "src/core/lib/surface/completion_queue.h"
20 
21 #include <grpc/support/alloc.h>
22 #include <grpc/support/log.h>
23 #include <grpc/support/time.h>
24 #include "src/core/lib/gpr/useful.h"
25 #include "src/core/lib/gprpp/memory.h"
26 #include "src/core/lib/gprpp/sync.h"
27 #include "src/core/lib/iomgr/iomgr.h"
28 #include "test/core/util/test_config.h"
29 
30 #define LOG_TEST(x) gpr_log(GPR_INFO, "%s", x)
31 
create_test_tag(void)32 static void* create_test_tag(void) {
33   static intptr_t i = 0;
34   return (void*)(++i);
35 }
36 
37 /* helper for tests to shutdown correctly and tersely */
shutdown_and_destroy(grpc_completion_queue * cc)38 static void shutdown_and_destroy(grpc_completion_queue* cc) {
39   grpc_event ev;
40   grpc_completion_queue_shutdown(cc);
41 
42   switch (grpc_get_cq_completion_type(cc)) {
43     case GRPC_CQ_NEXT: {
44       ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
45                                       nullptr);
46       GPR_ASSERT(ev.type == GRPC_QUEUE_SHUTDOWN);
47       break;
48     }
49     case GRPC_CQ_PLUCK: {
50       ev = grpc_completion_queue_pluck(
51           cc, create_test_tag(), gpr_inf_past(GPR_CLOCK_REALTIME), nullptr);
52       GPR_ASSERT(ev.type == GRPC_QUEUE_SHUTDOWN);
53       break;
54     }
55     case GRPC_CQ_CALLBACK: {
56       // Nothing to do here. The shutdown callback will be invoked when
57       // possible.
58       break;
59     }
60     default: {
61       gpr_log(GPR_ERROR, "Unknown completion type");
62       break;
63     }
64   }
65 
66   grpc_completion_queue_destroy(cc);
67 }
68 
69 /* ensure we can create and destroy a completion channel */
test_no_op(void)70 static void test_no_op(void) {
71   grpc_cq_completion_type completion_types[] = {GRPC_CQ_NEXT, GRPC_CQ_PLUCK};
72   grpc_cq_polling_type polling_types[] = {
73       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
74   grpc_completion_queue_attributes attr;
75   LOG_TEST("test_no_op");
76 
77   attr.version = 1;
78   for (size_t i = 0; i < GPR_ARRAY_SIZE(completion_types); i++) {
79     for (size_t j = 0; j < GPR_ARRAY_SIZE(polling_types); j++) {
80       attr.cq_completion_type = completion_types[i];
81       attr.cq_polling_type = polling_types[j];
82       shutdown_and_destroy(grpc_completion_queue_create(
83           grpc_completion_queue_factory_lookup(&attr), &attr, nullptr));
84     }
85   }
86 }
87 
test_pollset_conversion(void)88 static void test_pollset_conversion(void) {
89   grpc_cq_completion_type completion_types[] = {GRPC_CQ_NEXT, GRPC_CQ_PLUCK};
90   grpc_cq_polling_type polling_types[] = {GRPC_CQ_DEFAULT_POLLING,
91                                           GRPC_CQ_NON_LISTENING};
92   grpc_completion_queue* cq;
93   grpc_completion_queue_attributes attr;
94 
95   LOG_TEST("test_pollset_conversion");
96 
97   attr.version = 1;
98   for (size_t i = 0; i < GPR_ARRAY_SIZE(completion_types); i++) {
99     for (size_t j = 0; j < GPR_ARRAY_SIZE(polling_types); j++) {
100       attr.cq_completion_type = completion_types[i];
101       attr.cq_polling_type = polling_types[j];
102       cq = grpc_completion_queue_create(
103           grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
104       GPR_ASSERT(grpc_cq_pollset(cq) != nullptr);
105       shutdown_and_destroy(cq);
106     }
107   }
108 }
109 
test_wait_empty(void)110 static void test_wait_empty(void) {
111   grpc_cq_polling_type polling_types[] = {
112       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
113   grpc_completion_queue* cc;
114   grpc_completion_queue_attributes attr;
115   grpc_event event;
116 
117   LOG_TEST("test_wait_empty");
118 
119   attr.version = 1;
120   attr.cq_completion_type = GRPC_CQ_NEXT;
121   for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
122     attr.cq_polling_type = polling_types[i];
123     cc = grpc_completion_queue_create(
124         grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
125     event =
126         grpc_completion_queue_next(cc, gpr_now(GPR_CLOCK_REALTIME), nullptr);
127     GPR_ASSERT(event.type == GRPC_QUEUE_TIMEOUT);
128     shutdown_and_destroy(cc);
129   }
130 }
131 
do_nothing_end_completion(void *,grpc_cq_completion *)132 static void do_nothing_end_completion(void* /*arg*/,
133                                       grpc_cq_completion* /*c*/) {}
134 
test_cq_end_op(void)135 static void test_cq_end_op(void) {
136   grpc_event ev;
137   grpc_completion_queue* cc;
138   grpc_cq_completion completion;
139   grpc_cq_polling_type polling_types[] = {
140       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
141   grpc_completion_queue_attributes attr;
142   void* tag = create_test_tag();
143 
144   LOG_TEST("test_cq_end_op");
145 
146   attr.version = 1;
147   attr.cq_completion_type = GRPC_CQ_NEXT;
148   for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
149     grpc_core::ExecCtx exec_ctx;
150     attr.cq_polling_type = polling_types[i];
151     cc = grpc_completion_queue_create(
152         grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
153 
154     GPR_ASSERT(grpc_cq_begin_op(cc, tag));
155     grpc_cq_end_op(cc, tag, GRPC_ERROR_NONE, do_nothing_end_completion, nullptr,
156                    &completion);
157 
158     ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
159                                     nullptr);
160     GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
161     GPR_ASSERT(ev.tag == tag);
162     GPR_ASSERT(ev.success);
163 
164     shutdown_and_destroy(cc);
165   }
166 }
167 
test_cq_tls_cache_full(void)168 static void test_cq_tls_cache_full(void) {
169   grpc_event ev;
170   grpc_completion_queue* cc;
171   grpc_cq_completion completion;
172   grpc_cq_polling_type polling_types[] = {
173       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
174   grpc_completion_queue_attributes attr;
175   void* tag = create_test_tag();
176   void* res_tag;
177   int ok;
178 
179   LOG_TEST("test_cq_tls_cache_full");
180 
181   attr.version = 1;
182   attr.cq_completion_type = GRPC_CQ_NEXT;
183   for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
184     grpc_core::ExecCtx exec_ctx;  // Reset exec_ctx
185     attr.cq_polling_type = polling_types[i];
186     cc = grpc_completion_queue_create(
187         grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
188 
189     grpc_completion_queue_thread_local_cache_init(cc);
190     GPR_ASSERT(grpc_cq_begin_op(cc, tag));
191     grpc_cq_end_op(cc, tag, GRPC_ERROR_NONE, do_nothing_end_completion, nullptr,
192                    &completion);
193 
194     ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
195                                     nullptr);
196     GPR_ASSERT(ev.type == GRPC_QUEUE_TIMEOUT);
197 
198     GPR_ASSERT(
199         grpc_completion_queue_thread_local_cache_flush(cc, &res_tag, &ok) == 1);
200     GPR_ASSERT(res_tag == tag);
201     GPR_ASSERT(ok);
202 
203     ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
204                                     nullptr);
205     GPR_ASSERT(ev.type == GRPC_QUEUE_TIMEOUT);
206 
207     shutdown_and_destroy(cc);
208   }
209 }
210 
test_cq_tls_cache_empty(void)211 static void test_cq_tls_cache_empty(void) {
212   grpc_completion_queue* cc;
213   grpc_cq_polling_type polling_types[] = {
214       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
215   grpc_completion_queue_attributes attr;
216   void* res_tag;
217   int ok;
218 
219   LOG_TEST("test_cq_tls_cache_empty");
220 
221   attr.version = 1;
222   attr.cq_completion_type = GRPC_CQ_NEXT;
223   for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
224     grpc_core::ExecCtx exec_ctx;  // Reset exec_ctx
225     attr.cq_polling_type = polling_types[i];
226     cc = grpc_completion_queue_create(
227         grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
228 
229     GPR_ASSERT(
230         grpc_completion_queue_thread_local_cache_flush(cc, &res_tag, &ok) == 0);
231     grpc_completion_queue_thread_local_cache_init(cc);
232     GPR_ASSERT(
233         grpc_completion_queue_thread_local_cache_flush(cc, &res_tag, &ok) == 0);
234     shutdown_and_destroy(cc);
235   }
236 }
237 
test_shutdown_then_next_polling(void)238 static void test_shutdown_then_next_polling(void) {
239   grpc_cq_polling_type polling_types[] = {
240       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
241   grpc_completion_queue* cc;
242   grpc_completion_queue_attributes attr;
243   grpc_event event;
244   LOG_TEST("test_shutdown_then_next_polling");
245 
246   attr.version = 1;
247   attr.cq_completion_type = GRPC_CQ_NEXT;
248   for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
249     attr.cq_polling_type = polling_types[i];
250     cc = grpc_completion_queue_create(
251         grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
252     grpc_completion_queue_shutdown(cc);
253     event = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
254                                        nullptr);
255     GPR_ASSERT(event.type == GRPC_QUEUE_SHUTDOWN);
256     grpc_completion_queue_destroy(cc);
257   }
258 }
259 
test_shutdown_then_next_with_timeout(void)260 static void test_shutdown_then_next_with_timeout(void) {
261   grpc_cq_polling_type polling_types[] = {
262       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
263   grpc_completion_queue* cc;
264   grpc_completion_queue_attributes attr;
265   grpc_event event;
266   LOG_TEST("test_shutdown_then_next_with_timeout");
267 
268   attr.version = 1;
269   attr.cq_completion_type = GRPC_CQ_NEXT;
270   for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
271     attr.cq_polling_type = polling_types[i];
272     cc = grpc_completion_queue_create(
273         grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
274 
275     grpc_completion_queue_shutdown(cc);
276     event = grpc_completion_queue_next(cc, gpr_inf_future(GPR_CLOCK_REALTIME),
277                                        nullptr);
278     GPR_ASSERT(event.type == GRPC_QUEUE_SHUTDOWN);
279     grpc_completion_queue_destroy(cc);
280   }
281 }
282 
test_pluck(void)283 static void test_pluck(void) {
284   grpc_event ev;
285   grpc_completion_queue* cc;
286   void* tags[128];
287   grpc_cq_completion completions[GPR_ARRAY_SIZE(tags)];
288   grpc_cq_polling_type polling_types[] = {
289       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
290   grpc_completion_queue_attributes attr;
291   unsigned i, j;
292 
293   LOG_TEST("test_pluck");
294 
295   for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
296     tags[i] = create_test_tag();
297     for (j = 0; j < i; j++) {
298       GPR_ASSERT(tags[i] != tags[j]);
299     }
300   }
301 
302   attr.version = 1;
303   attr.cq_completion_type = GRPC_CQ_PLUCK;
304   for (size_t pidx = 0; pidx < GPR_ARRAY_SIZE(polling_types); pidx++) {
305     grpc_core::ExecCtx exec_ctx;  // reset exec_ctx
306     attr.cq_polling_type = polling_types[pidx];
307     cc = grpc_completion_queue_create(
308         grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
309 
310     for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
311       GPR_ASSERT(grpc_cq_begin_op(cc, tags[i]));
312       grpc_cq_end_op(cc, tags[i], GRPC_ERROR_NONE, do_nothing_end_completion,
313                      nullptr, &completions[i]);
314     }
315 
316     for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
317       ev = grpc_completion_queue_pluck(
318           cc, tags[i], gpr_inf_past(GPR_CLOCK_REALTIME), nullptr);
319       GPR_ASSERT(ev.tag == tags[i]);
320     }
321 
322     for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
323       GPR_ASSERT(grpc_cq_begin_op(cc, tags[i]));
324       grpc_cq_end_op(cc, tags[i], GRPC_ERROR_NONE, do_nothing_end_completion,
325                      nullptr, &completions[i]);
326     }
327 
328     for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
329       ev = grpc_completion_queue_pluck(cc, tags[GPR_ARRAY_SIZE(tags) - i - 1],
330                                        gpr_inf_past(GPR_CLOCK_REALTIME),
331                                        nullptr);
332       GPR_ASSERT(ev.tag == tags[GPR_ARRAY_SIZE(tags) - i - 1]);
333     }
334 
335     shutdown_and_destroy(cc);
336   }
337 }
338 
test_pluck_after_shutdown(void)339 static void test_pluck_after_shutdown(void) {
340   grpc_cq_polling_type polling_types[] = {
341       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
342   grpc_event ev;
343   grpc_completion_queue* cc;
344   grpc_completion_queue_attributes attr;
345 
346   LOG_TEST("test_pluck_after_shutdown");
347 
348   attr.version = 1;
349   attr.cq_completion_type = GRPC_CQ_PLUCK;
350   for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
351     attr.cq_polling_type = polling_types[i];
352     cc = grpc_completion_queue_create(
353         grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
354     grpc_completion_queue_shutdown(cc);
355     ev = grpc_completion_queue_pluck(
356         cc, nullptr, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
357     GPR_ASSERT(ev.type == GRPC_QUEUE_SHUTDOWN);
358     grpc_completion_queue_destroy(cc);
359   }
360 }
361 
test_callback(void)362 static void test_callback(void) {
363   grpc_completion_queue* cc;
364   static void* tags[128];
365   grpc_cq_completion completions[GPR_ARRAY_SIZE(tags)];
366   grpc_cq_polling_type polling_types[] = {
367       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
368   grpc_completion_queue_attributes attr;
369   unsigned i;
370   static gpr_mu mu, shutdown_mu;
371   static gpr_cv cv, shutdown_cv;
372   static int cb_counter;
373   gpr_mu_init(&mu);
374   gpr_mu_init(&shutdown_mu);
375   gpr_cv_init(&cv);
376   gpr_cv_init(&shutdown_cv);
377 
378   LOG_TEST("test_callback");
379 
380   bool got_shutdown = false;
381   class ShutdownCallback : public grpc_experimental_completion_queue_functor {
382    public:
383     ShutdownCallback(bool* done) : done_(done) {
384       functor_run = &ShutdownCallback::Run;
385       inlineable = false;
386     }
387     ~ShutdownCallback() {}
388     static void Run(grpc_experimental_completion_queue_functor* cb, int ok) {
389       gpr_mu_lock(&shutdown_mu);
390       *static_cast<ShutdownCallback*>(cb)->done_ = static_cast<bool>(ok);
391       // Signal when the shutdown callback is completed.
392       gpr_cv_signal(&shutdown_cv);
393       gpr_mu_unlock(&shutdown_mu);
394     }
395 
396    private:
397     bool* done_;
398   };
399   ShutdownCallback shutdown_cb(&got_shutdown);
400 
401   attr.version = 2;
402   attr.cq_completion_type = GRPC_CQ_CALLBACK;
403   attr.cq_shutdown_cb = &shutdown_cb;
404 
405   for (size_t pidx = 0; pidx < GPR_ARRAY_SIZE(polling_types); pidx++) {
406     int sumtags = 0;
407     int counter = 0;
408     cb_counter = 0;
409     {
410       // reset exec_ctx types
411       grpc_core::ExecCtx exec_ctx;
412       attr.cq_polling_type = polling_types[pidx];
413       cc = grpc_completion_queue_create(
414           grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
415 
416       class TagCallback : public grpc_experimental_completion_queue_functor {
417        public:
418         TagCallback(int* counter, int tag) : counter_(counter), tag_(tag) {
419           functor_run = &TagCallback::Run;
420           // Inlineable should be false since this callback takes locks.
421           inlineable = false;
422         }
423         ~TagCallback() {}
424         static void Run(grpc_experimental_completion_queue_functor* cb,
425                         int ok) {
426           GPR_ASSERT(static_cast<bool>(ok));
427           auto* callback = static_cast<TagCallback*>(cb);
428           gpr_mu_lock(&mu);
429           cb_counter++;
430           *callback->counter_ += callback->tag_;
431           if (cb_counter == GPR_ARRAY_SIZE(tags)) {
432             gpr_cv_signal(&cv);
433           }
434           gpr_mu_unlock(&mu);
435           delete callback;
436         };
437 
438        private:
439         int* counter_;
440         int tag_;
441       };
442 
443       for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
444         tags[i] = static_cast<void*>(new TagCallback(&counter, i));
445         sumtags += i;
446       }
447 
448       for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
449         GPR_ASSERT(grpc_cq_begin_op(cc, tags[i]));
450         grpc_cq_end_op(cc, tags[i], GRPC_ERROR_NONE, do_nothing_end_completion,
451                        nullptr, &completions[i]);
452       }
453 
454       gpr_mu_lock(&mu);
455       while (cb_counter != GPR_ARRAY_SIZE(tags)) {
456         // Wait for all the callbacks to complete.
457         gpr_cv_wait(&cv, &mu, gpr_inf_future(GPR_CLOCK_REALTIME));
458       }
459       gpr_mu_unlock(&mu);
460 
461       shutdown_and_destroy(cc);
462 
463       gpr_mu_lock(&shutdown_mu);
464       while (!got_shutdown) {
465         // Wait for the shutdown callback to complete.
466         gpr_cv_wait(&shutdown_cv, &shutdown_mu,
467                     gpr_inf_future(GPR_CLOCK_REALTIME));
468       }
469       gpr_mu_unlock(&shutdown_mu);
470     }
471 
472     // Run the assertions to check if the test ran successfully.
473     GPR_ASSERT(sumtags == counter);
474     GPR_ASSERT(got_shutdown);
475     got_shutdown = false;
476   }
477 
478   gpr_cv_destroy(&cv);
479   gpr_cv_destroy(&shutdown_cv);
480   gpr_mu_destroy(&mu);
481   gpr_mu_destroy(&shutdown_mu);
482 }
483 
484 struct thread_state {
485   grpc_completion_queue* cc;
486   void* tag;
487 };
488 
main(int argc,char ** argv)489 int main(int argc, char** argv) {
490   grpc::testing::TestEnvironment env(argc, argv);
491   grpc_init();
492   test_no_op();
493   test_pollset_conversion();
494   test_wait_empty();
495   test_shutdown_then_next_polling();
496   test_shutdown_then_next_with_timeout();
497   test_cq_end_op();
498   test_pluck();
499   test_pluck_after_shutdown();
500   test_cq_tls_cache_full();
501   test_cq_tls_cache_empty();
502   test_callback();
503   grpc_shutdown();
504   return 0;
505 }
506