• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2015 gRPC authors.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  */
17 
18 #include <grpcpp/completion_queue.h>
19 
20 #include <memory>
21 
22 #include <grpc/grpc.h>
23 #include <grpc/support/log.h>
24 #include <grpcpp/impl/grpc_library.h>
25 #include <grpcpp/support/time.h>
26 
27 namespace grpc {
28 
29 static internal::GrpcLibraryInitializer g_gli_initializer;
30 
31 // 'CompletionQueue' constructor can safely call GrpcLibraryCodegen(false) here
32 // i.e not have GrpcLibraryCodegen call grpc_init(). This is because, to create
33 // a 'grpc_completion_queue' instance (which is being passed as the input to
34 // this constructor), one must have already called grpc_init().
CompletionQueue(grpc_completion_queue * take)35 CompletionQueue::CompletionQueue(grpc_completion_queue* take)
36     : GrpcLibraryCodegen(false), cq_(take) {
37   InitialAvalanching();
38 }
39 
Shutdown()40 void CompletionQueue::Shutdown() {
41   g_gli_initializer.summon();
42   CompleteAvalanching();
43 }
44 
CompleteAvalanching()45 void CompletionQueue::CompleteAvalanching() {
46   // Check if this was the last avalanching operation
47   if (gpr_atm_no_barrier_fetch_add(&avalanches_in_flight_,
48                                    static_cast<gpr_atm>(-1)) == 1) {
49     grpc_completion_queue_shutdown(cq_);
50   }
51 }
52 
AsyncNextInternal(void ** tag,bool * ok,gpr_timespec deadline)53 CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal(
54     void** tag, bool* ok, gpr_timespec deadline) {
55   for (;;) {
56     auto ev = grpc_completion_queue_next(cq_, deadline, nullptr);
57     switch (ev.type) {
58       case GRPC_QUEUE_TIMEOUT:
59         return TIMEOUT;
60       case GRPC_QUEUE_SHUTDOWN:
61         return SHUTDOWN;
62       case GRPC_OP_COMPLETE:
63         auto cq_tag = static_cast<internal::CompletionQueueTag*>(ev.tag);
64         *ok = ev.success != 0;
65         *tag = cq_tag;
66         if (cq_tag->FinalizeResult(tag, ok)) {
67           return GOT_EVENT;
68         }
69         break;
70     }
71   }
72 }
73 
CompletionQueueTLSCache(CompletionQueue * cq)74 CompletionQueue::CompletionQueueTLSCache::CompletionQueueTLSCache(
75     CompletionQueue* cq)
76     : cq_(cq), flushed_(false) {
77   grpc_completion_queue_thread_local_cache_init(cq_->cq_);
78 }
79 
~CompletionQueueTLSCache()80 CompletionQueue::CompletionQueueTLSCache::~CompletionQueueTLSCache() {
81   GPR_ASSERT(flushed_);
82 }
83 
Flush(void ** tag,bool * ok)84 bool CompletionQueue::CompletionQueueTLSCache::Flush(void** tag, bool* ok) {
85   int res = 0;
86   void* res_tag;
87   flushed_ = true;
88   if (grpc_completion_queue_thread_local_cache_flush(cq_->cq_, &res_tag,
89                                                      &res)) {
90     auto cq_tag = static_cast<internal::CompletionQueueTag*>(res_tag);
91     *ok = res == 1;
92     if (cq_tag->FinalizeResult(tag, ok)) {
93       return true;
94     }
95   }
96   return false;
97 }
98 
99 }  // namespace grpc
100