• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2015 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 //
17 
18 #include <grpc/grpc.h>
19 #include <grpc/support/cpu.h>
20 #include <grpc/support/sync.h>
21 #include <grpc/support/time.h>
22 #include <grpcpp/completion_queue.h>
23 #include <grpcpp/impl/completion_queue_tag.h>
24 #include <grpcpp/impl/grpc_library.h>
25 
26 #include <vector>
27 
28 #include "absl/base/thread_annotations.h"
29 #include "absl/log/check.h"
30 #include "absl/log/log.h"
31 #include "src/core/lib/experiments/experiments.h"
32 #include "src/core/util/crash.h"
33 #include "src/core/util/sync.h"
34 #include "src/core/util/thd.h"
35 #include "src/core/util/useful.h"
36 
37 namespace grpc {
38 namespace {
39 
40 gpr_once g_once_init_callback_alternative = GPR_ONCE_INIT;
41 grpc_core::Mutex* g_callback_alternative_mu;
42 
43 // Implement a ref-counted callback CQ for global use in the alternative
44 // implementation so that its threads are only created once. Do this using
45 // explicit ref-counts and raw pointers rather than a shared-ptr since that
46 // has a non-trivial destructor and thus can't be used for global variables.
47 struct CallbackAlternativeCQ {
48   int refs ABSL_GUARDED_BY(g_callback_alternative_mu) = 0;
49   CompletionQueue* cq ABSL_GUARDED_BY(g_callback_alternative_mu);
50   std::vector<grpc_core::Thread>* nexting_threads
51       ABSL_GUARDED_BY(g_callback_alternative_mu);
52 
Refgrpc::__anond77879d70111::CallbackAlternativeCQ53   CompletionQueue* Ref() {
54     grpc_core::MutexLock lock(&*g_callback_alternative_mu);
55     refs++;
56     if (refs == 1) {
57       cq = new CompletionQueue;
58       int num_nexting_threads =
59           grpc_core::Clamp(gpr_cpu_num_cores() / 2, 2u, 16u);
60       nexting_threads = new std::vector<grpc_core::Thread>;
61       for (int i = 0; i < num_nexting_threads; i++) {
62         nexting_threads->emplace_back(
63             "nexting_thread",
64             [](void* arg) {
65               grpc_completion_queue* cq =
66                   static_cast<CompletionQueue*>(arg)->cq();
67               while (true) {
68                 // Use the raw Core next function rather than the C++ Next since
69                 // Next incorporates FinalizeResult and we actually want that
70                 // called from the callback functor itself.
71                 // TODO(vjpai): Migrate below to next without a timeout or idle
72                 // phase. That's currently starving out some other polling,
73                 // though.
74                 auto ev = grpc_completion_queue_next(
75                     cq,
76                     gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
77                                  gpr_time_from_millis(1000, GPR_TIMESPAN)),
78                     nullptr);
79                 if (ev.type == GRPC_QUEUE_SHUTDOWN) {
80                   return;
81                 }
82                 if (ev.type == GRPC_QUEUE_TIMEOUT) {
83                   gpr_sleep_until(
84                       gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
85                                    gpr_time_from_millis(100, GPR_TIMESPAN)));
86                   continue;
87                 }
88                 DCHECK(ev.type == GRPC_OP_COMPLETE);
89                 // We can always execute the callback inline rather than
90                 // pushing it to another Executor thread because this
91                 // thread is definitely running on a background thread, does not
92                 // hold any application locks before executing the callback,
93                 // and cannot be entered recursively.
94                 auto* functor =
95                     static_cast<grpc_completion_queue_functor*>(ev.tag);
96                 functor->functor_run(functor, ev.success);
97               }
98             },
99             cq);
100       }
101       for (auto& th : *nexting_threads) {
102         th.Start();
103       }
104     }
105     return cq;
106   }
107 
Unrefgrpc::__anond77879d70111::CallbackAlternativeCQ108   void Unref() {
109     grpc_core::MutexLock lock(g_callback_alternative_mu);
110     refs--;
111     if (refs == 0) {
112       cq->Shutdown();
113       for (auto& th : *nexting_threads) {
114         th.Join();
115       }
116       delete nexting_threads;
117       delete cq;
118     }
119   }
120 };
121 
122 CallbackAlternativeCQ g_callback_alternative_cq;
123 
124 }  // namespace
125 
126 // 'CompletionQueue' constructor can safely call GrpcLibraryCodegen(false) here
127 // i.e not have GrpcLibraryCodegen call grpc_init(). This is because, to create
128 // a 'grpc_completion_queue' instance (which is being passed as the input to
129 // this constructor), one must have already called grpc_init().
CompletionQueue(grpc_completion_queue * take)130 CompletionQueue::CompletionQueue(grpc_completion_queue* take)
131     : GrpcLibrary(false), cq_(take) {
132   InitialAvalanching();
133 }
134 
Shutdown()135 void CompletionQueue::Shutdown() {
136 #ifndef NDEBUG
137   if (!ServerListEmpty()) {
138     LOG(ERROR) << "CompletionQueue shutdown being shutdown before its server.";
139   }
140 #endif
141   CompleteAvalanching();
142 }
143 
AsyncNextInternal(void ** tag,bool * ok,gpr_timespec deadline)144 CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal(
145     void** tag, bool* ok, gpr_timespec deadline) {
146   for (;;) {
147     auto ev = grpc_completion_queue_next(cq_, deadline, nullptr);
148     switch (ev.type) {
149       case GRPC_QUEUE_TIMEOUT:
150         return TIMEOUT;
151       case GRPC_QUEUE_SHUTDOWN:
152         return SHUTDOWN;
153       case GRPC_OP_COMPLETE:
154         auto core_cq_tag =
155             static_cast<grpc::internal::CompletionQueueTag*>(ev.tag);
156         *ok = ev.success != 0;
157         *tag = core_cq_tag;
158         if (core_cq_tag->FinalizeResult(tag, ok)) {
159           return GOT_EVENT;
160         }
161         break;
162     }
163   }
164 }
165 
CompletionQueueTLSCache(CompletionQueue * cq)166 CompletionQueue::CompletionQueueTLSCache::CompletionQueueTLSCache(
167     CompletionQueue* cq)
168     : cq_(cq), flushed_(false) {
169   grpc_completion_queue_thread_local_cache_init(cq_->cq_);
170 }
171 
~CompletionQueueTLSCache()172 CompletionQueue::CompletionQueueTLSCache::~CompletionQueueTLSCache() {
173   CHECK(flushed_);
174 }
175 
Flush(void ** tag,bool * ok)176 bool CompletionQueue::CompletionQueueTLSCache::Flush(void** tag, bool* ok) {
177   int res = 0;
178   void* res_tag;
179   flushed_ = true;
180   if (grpc_completion_queue_thread_local_cache_flush(cq_->cq_, &res_tag,
181                                                      &res)) {
182     auto core_cq_tag =
183         static_cast<grpc::internal::CompletionQueueTag*>(res_tag);
184     *ok = res == 1;
185     if (core_cq_tag->FinalizeResult(tag, ok)) {
186       return true;
187     }
188   }
189   return false;
190 }
191 
CallbackAlternativeCQ()192 CompletionQueue* CompletionQueue::CallbackAlternativeCQ() {
193   if (grpc_core::IsEventEngineCallbackCqEnabled()) {
194     grpc_core::Crash("CallbackAlternativeCQ should not be instantiated");
195   }
196   gpr_once_init(&g_once_init_callback_alternative,
197                 [] { g_callback_alternative_mu = new grpc_core::Mutex(); });
198   return g_callback_alternative_cq.Ref();
199 }
200 
ReleaseCallbackAlternativeCQ(CompletionQueue * cq)201 void CompletionQueue::ReleaseCallbackAlternativeCQ(CompletionQueue* cq)
202     ABSL_NO_THREAD_SAFETY_ANALYSIS {
203   (void)cq;
204   // This accesses g_callback_alternative_cq without acquiring the mutex
205   // but it's considered safe because it just reads the pointer address.
206   DCHECK(cq == g_callback_alternative_cq.cq);
207   g_callback_alternative_cq.Unref();
208 }
209 
210 }  // namespace grpc
211