• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2015 gRPC authors.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  */
17 
18 #include <grpcpp/completion_queue.h>
19 
20 #include <memory>
21 
22 #include <grpc/grpc.h>
23 #include <grpc/support/log.h>
24 #include <grpcpp/impl/grpc_library.h>
25 #include <grpcpp/support/time.h>
26 
27 namespace grpc {
28 
29 static internal::GrpcLibraryInitializer g_gli_initializer;
30 
31 // 'CompletionQueue' constructor can safely call GrpcLibraryCodegen(false) here
32 // i.e not have GrpcLibraryCodegen call grpc_init(). This is because, to create
33 // a 'grpc_completion_queue' instance (which is being passed as the input to
34 // this constructor), one must have already called grpc_init().
CompletionQueue(grpc_completion_queue * take)35 CompletionQueue::CompletionQueue(grpc_completion_queue* take)
36     : GrpcLibraryCodegen(false), cq_(take) {
37   InitialAvalanching();
38 }
39 
Shutdown()40 void CompletionQueue::Shutdown() {
41   g_gli_initializer.summon();
42 #ifndef NDEBUG
43   if (!ServerListEmpty()) {
44     gpr_log(GPR_ERROR,
45             "CompletionQueue shutdown being shutdown before its server.");
46   }
47 #endif
48   CompleteAvalanching();
49 }
50 
AsyncNextInternal(void ** tag,bool * ok,gpr_timespec deadline)51 CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal(
52     void** tag, bool* ok, gpr_timespec deadline) {
53   for (;;) {
54     auto ev = grpc_completion_queue_next(cq_, deadline, nullptr);
55     switch (ev.type) {
56       case GRPC_QUEUE_TIMEOUT:
57         return TIMEOUT;
58       case GRPC_QUEUE_SHUTDOWN:
59         return SHUTDOWN;
60       case GRPC_OP_COMPLETE:
61         auto core_cq_tag =
62             static_cast<::grpc::internal::CompletionQueueTag*>(ev.tag);
63         *ok = ev.success != 0;
64         *tag = core_cq_tag;
65         if (core_cq_tag->FinalizeResult(tag, ok)) {
66           return GOT_EVENT;
67         }
68         break;
69     }
70   }
71 }
72 
CompletionQueueTLSCache(CompletionQueue * cq)73 CompletionQueue::CompletionQueueTLSCache::CompletionQueueTLSCache(
74     CompletionQueue* cq)
75     : cq_(cq), flushed_(false) {
76   grpc_completion_queue_thread_local_cache_init(cq_->cq_);
77 }
78 
~CompletionQueueTLSCache()79 CompletionQueue::CompletionQueueTLSCache::~CompletionQueueTLSCache() {
80   GPR_ASSERT(flushed_);
81 }
82 
Flush(void ** tag,bool * ok)83 bool CompletionQueue::CompletionQueueTLSCache::Flush(void** tag, bool* ok) {
84   int res = 0;
85   void* res_tag;
86   flushed_ = true;
87   if (grpc_completion_queue_thread_local_cache_flush(cq_->cq_, &res_tag,
88                                                      &res)) {
89     auto core_cq_tag =
90         static_cast<::grpc::internal::CompletionQueueTag*>(res_tag);
91     *ok = res == 1;
92     if (core_cq_tag->FinalizeResult(tag, ok)) {
93       return true;
94     }
95   }
96   return false;
97 }
98 
99 }  // namespace grpc
100