1 /*
2 * Copyright 2015 gRPC authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 */
17
18 #include <grpcpp/completion_queue.h>
19
20 #include <memory>
21
22 #include <grpc/grpc.h>
23 #include <grpc/support/log.h>
24 #include <grpcpp/impl/grpc_library.h>
25 #include <grpcpp/support/time.h>
26
27 namespace grpc {
28
29 static internal::GrpcLibraryInitializer g_gli_initializer;
30
31 // 'CompletionQueue' constructor can safely call GrpcLibraryCodegen(false) here
32 // i.e not have GrpcLibraryCodegen call grpc_init(). This is because, to create
33 // a 'grpc_completion_queue' instance (which is being passed as the input to
34 // this constructor), one must have already called grpc_init().
CompletionQueue(grpc_completion_queue * take)35 CompletionQueue::CompletionQueue(grpc_completion_queue* take)
36 : GrpcLibraryCodegen(false), cq_(take) {
37 InitialAvalanching();
38 }
39
Shutdown()40 void CompletionQueue::Shutdown() {
41 g_gli_initializer.summon();
42 #ifndef NDEBUG
43 if (!ServerListEmpty()) {
44 gpr_log(GPR_ERROR,
45 "CompletionQueue shutdown being shutdown before its server.");
46 }
47 #endif
48 CompleteAvalanching();
49 }
50
AsyncNextInternal(void ** tag,bool * ok,gpr_timespec deadline)51 CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal(
52 void** tag, bool* ok, gpr_timespec deadline) {
53 for (;;) {
54 auto ev = grpc_completion_queue_next(cq_, deadline, nullptr);
55 switch (ev.type) {
56 case GRPC_QUEUE_TIMEOUT:
57 return TIMEOUT;
58 case GRPC_QUEUE_SHUTDOWN:
59 return SHUTDOWN;
60 case GRPC_OP_COMPLETE:
61 auto core_cq_tag =
62 static_cast<::grpc::internal::CompletionQueueTag*>(ev.tag);
63 *ok = ev.success != 0;
64 *tag = core_cq_tag;
65 if (core_cq_tag->FinalizeResult(tag, ok)) {
66 return GOT_EVENT;
67 }
68 break;
69 }
70 }
71 }
72
CompletionQueueTLSCache(CompletionQueue * cq)73 CompletionQueue::CompletionQueueTLSCache::CompletionQueueTLSCache(
74 CompletionQueue* cq)
75 : cq_(cq), flushed_(false) {
76 grpc_completion_queue_thread_local_cache_init(cq_->cq_);
77 }
78
~CompletionQueueTLSCache()79 CompletionQueue::CompletionQueueTLSCache::~CompletionQueueTLSCache() {
80 GPR_ASSERT(flushed_);
81 }
82
Flush(void ** tag,bool * ok)83 bool CompletionQueue::CompletionQueueTLSCache::Flush(void** tag, bool* ok) {
84 int res = 0;
85 void* res_tag;
86 flushed_ = true;
87 if (grpc_completion_queue_thread_local_cache_flush(cq_->cq_, &res_tag,
88 &res)) {
89 auto core_cq_tag =
90 static_cast<::grpc::internal::CompletionQueueTag*>(res_tag);
91 *ok = res == 1;
92 if (core_cq_tag->FinalizeResult(tag, ok)) {
93 return true;
94 }
95 }
96 return false;
97 }
98
99 } // namespace grpc
100