1 /*
2 * Copyright 2015 gRPC authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 */
17
18 #include <grpcpp/completion_queue.h>
19
20 #include <memory>
21
22 #include <grpc/grpc.h>
23 #include <grpc/support/log.h>
24 #include <grpcpp/impl/grpc_library.h>
25 #include <grpcpp/support/time.h>
26
27 namespace grpc {
28
29 static internal::GrpcLibraryInitializer g_gli_initializer;
30
31 // 'CompletionQueue' constructor can safely call GrpcLibraryCodegen(false) here
32 // i.e not have GrpcLibraryCodegen call grpc_init(). This is because, to create
33 // a 'grpc_completion_queue' instance (which is being passed as the input to
34 // this constructor), one must have already called grpc_init().
CompletionQueue(grpc_completion_queue * take)35 CompletionQueue::CompletionQueue(grpc_completion_queue* take)
36 : GrpcLibraryCodegen(false), cq_(take) {
37 InitialAvalanching();
38 }
39
Shutdown()40 void CompletionQueue::Shutdown() {
41 g_gli_initializer.summon();
42 CompleteAvalanching();
43 }
44
CompleteAvalanching()45 void CompletionQueue::CompleteAvalanching() {
46 // Check if this was the last avalanching operation
47 if (gpr_atm_no_barrier_fetch_add(&avalanches_in_flight_,
48 static_cast<gpr_atm>(-1)) == 1) {
49 grpc_completion_queue_shutdown(cq_);
50 }
51 }
52
AsyncNextInternal(void ** tag,bool * ok,gpr_timespec deadline)53 CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal(
54 void** tag, bool* ok, gpr_timespec deadline) {
55 for (;;) {
56 auto ev = grpc_completion_queue_next(cq_, deadline, nullptr);
57 switch (ev.type) {
58 case GRPC_QUEUE_TIMEOUT:
59 return TIMEOUT;
60 case GRPC_QUEUE_SHUTDOWN:
61 return SHUTDOWN;
62 case GRPC_OP_COMPLETE:
63 auto cq_tag = static_cast<internal::CompletionQueueTag*>(ev.tag);
64 *ok = ev.success != 0;
65 *tag = cq_tag;
66 if (cq_tag->FinalizeResult(tag, ok)) {
67 return GOT_EVENT;
68 }
69 break;
70 }
71 }
72 }
73
CompletionQueueTLSCache(CompletionQueue * cq)74 CompletionQueue::CompletionQueueTLSCache::CompletionQueueTLSCache(
75 CompletionQueue* cq)
76 : cq_(cq), flushed_(false) {
77 grpc_completion_queue_thread_local_cache_init(cq_->cq_);
78 }
79
~CompletionQueueTLSCache()80 CompletionQueue::CompletionQueueTLSCache::~CompletionQueueTLSCache() {
81 GPR_ASSERT(flushed_);
82 }
83
Flush(void ** tag,bool * ok)84 bool CompletionQueue::CompletionQueueTLSCache::Flush(void** tag, bool* ok) {
85 int res = 0;
86 void* res_tag;
87 flushed_ = true;
88 if (grpc_completion_queue_thread_local_cache_flush(cq_->cq_, &res_tag,
89 &res)) {
90 auto cq_tag = static_cast<internal::CompletionQueueTag*>(res_tag);
91 *ok = res == 1;
92 if (cq_tag->FinalizeResult(tag, ok)) {
93 return true;
94 }
95 }
96 return false;
97 }
98
99 } // namespace grpc
100