• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include "src/core/lib/surface/completion_queue.h"
20 
21 #include <grpc/grpc.h>
22 #include <grpc/support/sync.h>
23 #include <grpc/support/time.h>
24 #include <stddef.h>
25 
26 #include <memory>
27 
28 #include "absl/log/log.h"
29 #include "absl/status/status.h"
30 #include "gtest/gtest.h"
31 #include "src/core/lib/iomgr/exec_ctx.h"
32 #include "src/core/util/useful.h"
33 #include "test/core/test_util/test_config.h"
34 
35 #define LOG_TEST(x) LOG(INFO) << x
36 
create_test_tag(void)37 static void* create_test_tag(void) {
38   static intptr_t i = 0;
39   return reinterpret_cast<void*>(++i);
40 }
41 
42 // helper for tests to shutdown correctly and tersely
shutdown_and_destroy(grpc_completion_queue * cc)43 static void shutdown_and_destroy(grpc_completion_queue* cc) {
44   grpc_event ev;
45   grpc_completion_queue_shutdown(cc);
46 
47   switch (grpc_get_cq_completion_type(cc)) {
48     case GRPC_CQ_NEXT: {
49       ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
50                                       nullptr);
51       ASSERT_EQ(ev.type, GRPC_QUEUE_SHUTDOWN);
52       break;
53     }
54     case GRPC_CQ_PLUCK: {
55       ev = grpc_completion_queue_pluck(
56           cc, create_test_tag(), gpr_inf_past(GPR_CLOCK_REALTIME), nullptr);
57       ASSERT_EQ(ev.type, GRPC_QUEUE_SHUTDOWN);
58       break;
59     }
60     case GRPC_CQ_CALLBACK: {
61       // Nothing to do here. The shutdown callback will be invoked when
62       // possible.
63       break;
64     }
65     default: {
66       LOG(ERROR) << "Unknown completion type";
67       break;
68     }
69   }
70 
71   grpc_completion_queue_destroy(cc);
72 }
73 
74 // ensure we can create and destroy a completion channel
TEST(GrpcCompletionQueueTest,TestNoOp)75 TEST(GrpcCompletionQueueTest, TestNoOp) {
76   grpc_cq_completion_type completion_types[] = {GRPC_CQ_NEXT, GRPC_CQ_PLUCK};
77   grpc_cq_polling_type polling_types[] = {
78       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
79   grpc_completion_queue_attributes attr = {};
80   LOG_TEST("test_no_op");
81 
82   attr.version = 1;
83   for (size_t i = 0; i < GPR_ARRAY_SIZE(completion_types); i++) {
84     for (size_t j = 0; j < GPR_ARRAY_SIZE(polling_types); j++) {
85       attr.cq_completion_type = completion_types[i];
86       attr.cq_polling_type = polling_types[j];
87       shutdown_and_destroy(grpc_completion_queue_create(
88           grpc_completion_queue_factory_lookup(&attr), &attr, nullptr));
89     }
90   }
91 }
92 
TEST(GrpcCompletionQueueTest,TestPollsetConversion)93 TEST(GrpcCompletionQueueTest, TestPollsetConversion) {
94   grpc_cq_completion_type completion_types[] = {GRPC_CQ_NEXT, GRPC_CQ_PLUCK};
95   grpc_cq_polling_type polling_types[] = {GRPC_CQ_DEFAULT_POLLING,
96                                           GRPC_CQ_NON_LISTENING};
97   grpc_completion_queue* cq;
98   grpc_completion_queue_attributes attr = {};
99 
100   LOG_TEST("test_pollset_conversion");
101 
102   attr.version = 1;
103   for (size_t i = 0; i < GPR_ARRAY_SIZE(completion_types); i++) {
104     for (size_t j = 0; j < GPR_ARRAY_SIZE(polling_types); j++) {
105       attr.cq_completion_type = completion_types[i];
106       attr.cq_polling_type = polling_types[j];
107       cq = grpc_completion_queue_create(
108           grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
109       ASSERT_NE(grpc_cq_pollset(cq), nullptr);
110       shutdown_and_destroy(cq);
111     }
112   }
113 }
114 
TEST(GrpcCompletionQueueTest,TestWaitEmpty)115 TEST(GrpcCompletionQueueTest, TestWaitEmpty) {
116   grpc_cq_polling_type polling_types[] = {
117       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
118   grpc_completion_queue* cc;
119   grpc_completion_queue_attributes attr = {};
120   grpc_event event;
121 
122   LOG_TEST("test_wait_empty");
123 
124   attr.version = 1;
125   attr.cq_completion_type = GRPC_CQ_NEXT;
126   for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
127     attr.cq_polling_type = polling_types[i];
128     cc = grpc_completion_queue_create(
129         grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
130     event =
131         grpc_completion_queue_next(cc, gpr_now(GPR_CLOCK_REALTIME), nullptr);
132     ASSERT_EQ(event.type, GRPC_QUEUE_TIMEOUT);
133     shutdown_and_destroy(cc);
134   }
135 }
136 
do_nothing_end_completion(void *,grpc_cq_completion *)137 static void do_nothing_end_completion(void* /*arg*/,
138                                       grpc_cq_completion* /*c*/) {}
139 
TEST(GrpcCompletionQueueTest,TestCqEndOp)140 TEST(GrpcCompletionQueueTest, TestCqEndOp) {
141   grpc_event ev;
142   grpc_completion_queue* cc;
143   grpc_cq_completion completion;
144   grpc_cq_polling_type polling_types[] = {
145       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
146   grpc_completion_queue_attributes attr = {};
147   void* tag = create_test_tag();
148 
149   LOG_TEST("test_cq_end_op");
150 
151   attr.version = 1;
152   attr.cq_completion_type = GRPC_CQ_NEXT;
153   for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
154     grpc_core::ExecCtx exec_ctx;
155     attr.cq_polling_type = polling_types[i];
156     cc = grpc_completion_queue_create(
157         grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
158 
159     ASSERT_TRUE(grpc_cq_begin_op(cc, tag));
160     grpc_cq_end_op(cc, tag, absl::OkStatus(), do_nothing_end_completion,
161                    nullptr, &completion);
162 
163     ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
164                                     nullptr);
165     ASSERT_EQ(ev.type, GRPC_OP_COMPLETE);
166     ASSERT_EQ(ev.tag, tag);
167     ASSERT_TRUE(ev.success);
168 
169     shutdown_and_destroy(cc);
170   }
171 }
172 
TEST(GrpcCompletionQueueTest,TestCqTlsCacheFull)173 TEST(GrpcCompletionQueueTest, TestCqTlsCacheFull) {
174   grpc_event ev;
175   grpc_completion_queue* cc;
176   grpc_cq_completion completion;
177   grpc_cq_polling_type polling_types[] = {
178       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
179   grpc_completion_queue_attributes attr = {};
180   void* tag = create_test_tag();
181   void* res_tag;
182   int ok;
183 
184   LOG_TEST("test_cq_tls_cache_full");
185 
186   attr.version = 1;
187   attr.cq_completion_type = GRPC_CQ_NEXT;
188   for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
189     grpc_core::ExecCtx exec_ctx;  // Reset exec_ctx
190     attr.cq_polling_type = polling_types[i];
191     cc = grpc_completion_queue_create(
192         grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
193 
194     grpc_completion_queue_thread_local_cache_init(cc);
195     ASSERT_TRUE(grpc_cq_begin_op(cc, tag));
196     grpc_cq_end_op(cc, tag, absl::OkStatus(), do_nothing_end_completion,
197                    nullptr, &completion);
198 
199     ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
200                                     nullptr);
201     ASSERT_EQ(ev.type, GRPC_QUEUE_TIMEOUT);
202 
203     ASSERT_EQ(grpc_completion_queue_thread_local_cache_flush(cc, &res_tag, &ok),
204               1);
205     ASSERT_EQ(res_tag, tag);
206     ASSERT_TRUE(ok);
207 
208     ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
209                                     nullptr);
210     ASSERT_EQ(ev.type, GRPC_QUEUE_TIMEOUT);
211 
212     shutdown_and_destroy(cc);
213   }
214 }
215 
TEST(GrpcCompletionQueueTest,TestCqTlsCacheEmpty)216 TEST(GrpcCompletionQueueTest, TestCqTlsCacheEmpty) {
217   grpc_completion_queue* cc;
218   grpc_cq_polling_type polling_types[] = {
219       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
220   grpc_completion_queue_attributes attr = {};
221   void* res_tag;
222   int ok;
223 
224   LOG_TEST("test_cq_tls_cache_empty");
225 
226   attr.version = 1;
227   attr.cq_completion_type = GRPC_CQ_NEXT;
228   for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
229     grpc_core::ExecCtx exec_ctx;  // Reset exec_ctx
230     attr.cq_polling_type = polling_types[i];
231     cc = grpc_completion_queue_create(
232         grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
233 
234     ASSERT_EQ(grpc_completion_queue_thread_local_cache_flush(cc, &res_tag, &ok),
235               0);
236     grpc_completion_queue_thread_local_cache_init(cc);
237     ASSERT_EQ(grpc_completion_queue_thread_local_cache_flush(cc, &res_tag, &ok),
238               0);
239     shutdown_and_destroy(cc);
240   }
241 }
242 
TEST(GrpcCompletionQueueTest,TestShutdownThenNextPolling)243 TEST(GrpcCompletionQueueTest, TestShutdownThenNextPolling) {
244   grpc_cq_polling_type polling_types[] = {
245       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
246   grpc_completion_queue* cc;
247   grpc_completion_queue_attributes attr = {};
248   grpc_event event;
249   LOG_TEST("test_shutdown_then_next_polling");
250 
251   attr.version = 1;
252   attr.cq_completion_type = GRPC_CQ_NEXT;
253   for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
254     attr.cq_polling_type = polling_types[i];
255     cc = grpc_completion_queue_create(
256         grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
257     grpc_completion_queue_shutdown(cc);
258     event = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
259                                        nullptr);
260     ASSERT_EQ(event.type, GRPC_QUEUE_SHUTDOWN);
261     grpc_completion_queue_destroy(cc);
262   }
263 }
264 
TEST(GrpcCompletionQueueTest,TestShutdownThenNextWithTimeout)265 TEST(GrpcCompletionQueueTest, TestShutdownThenNextWithTimeout) {
266   grpc_cq_polling_type polling_types[] = {
267       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
268   grpc_completion_queue* cc;
269   grpc_completion_queue_attributes attr = {};
270   grpc_event event;
271   LOG_TEST("test_shutdown_then_next_with_timeout");
272 
273   attr.version = 1;
274   attr.cq_completion_type = GRPC_CQ_NEXT;
275   for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
276     attr.cq_polling_type = polling_types[i];
277     cc = grpc_completion_queue_create(
278         grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
279 
280     grpc_completion_queue_shutdown(cc);
281     event = grpc_completion_queue_next(cc, gpr_inf_future(GPR_CLOCK_REALTIME),
282                                        nullptr);
283     ASSERT_EQ(event.type, GRPC_QUEUE_SHUTDOWN);
284     grpc_completion_queue_destroy(cc);
285   }
286 }
287 
TEST(GrpcCompletionQueueTest,TestPluck)288 TEST(GrpcCompletionQueueTest, TestPluck) {
289   grpc_event ev;
290   grpc_completion_queue* cc;
291   void* tags[128];
292   grpc_cq_completion completions[GPR_ARRAY_SIZE(tags)];
293   grpc_cq_polling_type polling_types[] = {
294       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
295   grpc_completion_queue_attributes attr = {};
296   unsigned i, j;
297 
298   LOG_TEST("test_pluck");
299 
300   for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
301     tags[i] = create_test_tag();
302     for (j = 0; j < i; j++) {
303       ASSERT_NE(tags[i], tags[j]);
304     }
305   }
306 
307   attr.version = 1;
308   attr.cq_completion_type = GRPC_CQ_PLUCK;
309   for (size_t pidx = 0; pidx < GPR_ARRAY_SIZE(polling_types); pidx++) {
310     grpc_core::ExecCtx exec_ctx;  // reset exec_ctx
311     attr.cq_polling_type = polling_types[pidx];
312     cc = grpc_completion_queue_create(
313         grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
314 
315     for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
316       ASSERT_TRUE(grpc_cq_begin_op(cc, tags[i]));
317       grpc_cq_end_op(cc, tags[i], absl::OkStatus(), do_nothing_end_completion,
318                      nullptr, &completions[i]);
319     }
320 
321     for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
322       ev = grpc_completion_queue_pluck(
323           cc, tags[i], gpr_inf_past(GPR_CLOCK_REALTIME), nullptr);
324       ASSERT_EQ(ev.tag, tags[i]);
325     }
326 
327     for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
328       ASSERT_TRUE(grpc_cq_begin_op(cc, tags[i]));
329       grpc_cq_end_op(cc, tags[i], absl::OkStatus(), do_nothing_end_completion,
330                      nullptr, &completions[i]);
331     }
332 
333     for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
334       ev = grpc_completion_queue_pluck(cc, tags[GPR_ARRAY_SIZE(tags) - i - 1],
335                                        gpr_inf_past(GPR_CLOCK_REALTIME),
336                                        nullptr);
337       ASSERT_EQ(ev.tag, tags[GPR_ARRAY_SIZE(tags) - i - 1]);
338     }
339 
340     shutdown_and_destroy(cc);
341   }
342 }
343 
TEST(GrpcCompletionQueueTest,TestPluckAfterShutdown)344 TEST(GrpcCompletionQueueTest, TestPluckAfterShutdown) {
345   grpc_cq_polling_type polling_types[] = {
346       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
347   grpc_event ev;
348   grpc_completion_queue* cc;
349   grpc_completion_queue_attributes attr = {};
350 
351   LOG_TEST("test_pluck_after_shutdown");
352 
353   attr.version = 1;
354   attr.cq_completion_type = GRPC_CQ_PLUCK;
355   for (size_t i = 0; i < GPR_ARRAY_SIZE(polling_types); i++) {
356     attr.cq_polling_type = polling_types[i];
357     cc = grpc_completion_queue_create(
358         grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
359     grpc_completion_queue_shutdown(cc);
360     ev = grpc_completion_queue_pluck(
361         cc, nullptr, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
362     ASSERT_EQ(ev.type, GRPC_QUEUE_SHUTDOWN);
363     grpc_completion_queue_destroy(cc);
364   }
365 }
366 
TEST(GrpcCompletionQueueTest,TestCallback)367 TEST(GrpcCompletionQueueTest, TestCallback) {
368   grpc_completion_queue* cc;
369   static void* tags[128];
370   grpc_cq_completion completions[GPR_ARRAY_SIZE(tags)];
371   grpc_cq_polling_type polling_types[] = {
372       GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
373   grpc_completion_queue_attributes attr = {};
374   unsigned i;
375   static gpr_mu mu, shutdown_mu;
376   static gpr_cv cv, shutdown_cv;
377   static int cb_counter;
378   gpr_mu_init(&mu);
379   gpr_mu_init(&shutdown_mu);
380   gpr_cv_init(&cv);
381   gpr_cv_init(&shutdown_cv);
382 
383   LOG_TEST("test_callback");
384 
385   bool got_shutdown = false;
386   class ShutdownCallback : public grpc_completion_queue_functor {
387    public:
388     explicit ShutdownCallback(bool* done) : done_(done) {
389       functor_run = &ShutdownCallback::Run;
390       inlineable = false;
391     }
392     ~ShutdownCallback() {}
393     static void Run(grpc_completion_queue_functor* cb, int ok) {
394       gpr_mu_lock(&shutdown_mu);
395       *static_cast<ShutdownCallback*>(cb)->done_ = static_cast<bool>(ok);
396       // Signal when the shutdown callback is completed.
397       gpr_cv_signal(&shutdown_cv);
398       gpr_mu_unlock(&shutdown_mu);
399     }
400 
401    private:
402     bool* done_;
403   };
404   ShutdownCallback shutdown_cb(&got_shutdown);
405 
406   attr.version = 2;
407   attr.cq_completion_type = GRPC_CQ_CALLBACK;
408   attr.cq_shutdown_cb = &shutdown_cb;
409 
410   for (size_t pidx = 0; pidx < GPR_ARRAY_SIZE(polling_types); pidx++) {
411     int sumtags = 0;
412     int counter = 0;
413     cb_counter = 0;
414     {
415       // reset exec_ctx types
416       grpc_core::ExecCtx exec_ctx;
417       attr.cq_polling_type = polling_types[pidx];
418       cc = grpc_completion_queue_create(
419           grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
420 
421       class TagCallback : public grpc_completion_queue_functor {
422        public:
423         TagCallback(int* counter, int tag) : counter_(counter), tag_(tag) {
424           functor_run = &TagCallback::Run;
425           // Inlineable should be false since this callback takes locks.
426           inlineable = false;
427         }
428         ~TagCallback() {}
429         static void Run(grpc_completion_queue_functor* cb, int ok) {
430           ASSERT_TRUE(static_cast<bool>(ok));
431           auto* callback = static_cast<TagCallback*>(cb);
432           gpr_mu_lock(&mu);
433           cb_counter++;
434           *callback->counter_ += callback->tag_;
435           if (cb_counter == GPR_ARRAY_SIZE(tags)) {
436             gpr_cv_signal(&cv);
437           }
438           gpr_mu_unlock(&mu);
439           delete callback;
440         };
441 
442        private:
443         int* counter_;
444         int tag_;
445       };
446 
447       for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
448         tags[i] = static_cast<void*>(new TagCallback(&counter, i));
449         sumtags += i;
450       }
451 
452       for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
453         ASSERT_TRUE(grpc_cq_begin_op(cc, tags[i]));
454         grpc_cq_end_op(cc, tags[i], absl::OkStatus(), do_nothing_end_completion,
455                        nullptr, &completions[i]);
456       }
457 
458       gpr_mu_lock(&mu);
459       while (cb_counter != GPR_ARRAY_SIZE(tags)) {
460         // Wait for all the callbacks to complete.
461         gpr_cv_wait(&cv, &mu, gpr_inf_future(GPR_CLOCK_REALTIME));
462       }
463       gpr_mu_unlock(&mu);
464 
465       shutdown_and_destroy(cc);
466 
467       gpr_mu_lock(&shutdown_mu);
468       while (!got_shutdown) {
469         // Wait for the shutdown callback to complete.
470         gpr_cv_wait(&shutdown_cv, &shutdown_mu,
471                     gpr_inf_future(GPR_CLOCK_REALTIME));
472       }
473       gpr_mu_unlock(&shutdown_mu);
474     }
475 
476     // Run the assertions to check if the test ran successfully.
477     ASSERT_EQ(sumtags, counter);
478     ASSERT_TRUE(got_shutdown);
479     got_shutdown = false;
480   }
481 
482   gpr_cv_destroy(&cv);
483   gpr_cv_destroy(&shutdown_cv);
484   gpr_mu_destroy(&mu);
485   gpr_mu_destroy(&shutdown_mu);
486 }
487 
488 struct thread_state {
489   grpc_completion_queue* cc;
490   void* tag;
491 };
492 
main(int argc,char ** argv)493 int main(int argc, char** argv) {
494   grpc::testing::TestEnvironment env(&argc, argv);
495   ::testing::InitGoogleTest(&argc, argv);
496   grpc::testing::TestGrpcScope grpc_scope;
497   return RUN_ALL_TESTS();
498 }
499