//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//

18 #include <grpc/grpc.h>
19 #include <grpc/support/cpu.h>
20 #include <grpc/support/sync.h>
21 #include <grpc/support/time.h>
22 #include <grpcpp/completion_queue.h>
23 #include <grpcpp/impl/completion_queue_tag.h>
24 #include <grpcpp/impl/grpc_library.h>
25
26 #include <vector>
27
28 #include "absl/base/thread_annotations.h"
29 #include "absl/log/check.h"
30 #include "absl/log/log.h"
31 #include "src/core/lib/experiments/experiments.h"
32 #include "src/core/util/crash.h"
33 #include "src/core/util/sync.h"
34 #include "src/core/util/thd.h"
35 #include "src/core/util/useful.h"
36
37 namespace grpc {
38 namespace {
39
// One-time initializer and mutex guarding the global callback-alternative CQ
// state below. The mutex is heap-allocated on first use (see
// CallbackAlternativeCQ()) and intentionally never destroyed, so it is safe
// to use at any point in the process lifetime.
gpr_once g_once_init_callback_alternative = GPR_ONCE_INIT;
grpc_core::Mutex* g_callback_alternative_mu;
42
43 // Implement a ref-counted callback CQ for global use in the alternative
44 // implementation so that its threads are only created once. Do this using
45 // explicit ref-counts and raw pointers rather than a shared-ptr since that
46 // has a non-trivial destructor and thus can't be used for global variables.
47 struct CallbackAlternativeCQ {
48 int refs ABSL_GUARDED_BY(g_callback_alternative_mu) = 0;
49 CompletionQueue* cq ABSL_GUARDED_BY(g_callback_alternative_mu);
50 std::vector<grpc_core::Thread>* nexting_threads
51 ABSL_GUARDED_BY(g_callback_alternative_mu);
52
Refgrpc::__anond77879d70111::CallbackAlternativeCQ53 CompletionQueue* Ref() {
54 grpc_core::MutexLock lock(&*g_callback_alternative_mu);
55 refs++;
56 if (refs == 1) {
57 cq = new CompletionQueue;
58 int num_nexting_threads =
59 grpc_core::Clamp(gpr_cpu_num_cores() / 2, 2u, 16u);
60 nexting_threads = new std::vector<grpc_core::Thread>;
61 for (int i = 0; i < num_nexting_threads; i++) {
62 nexting_threads->emplace_back(
63 "nexting_thread",
64 [](void* arg) {
65 grpc_completion_queue* cq =
66 static_cast<CompletionQueue*>(arg)->cq();
67 while (true) {
68 // Use the raw Core next function rather than the C++ Next since
69 // Next incorporates FinalizeResult and we actually want that
70 // called from the callback functor itself.
71 // TODO(vjpai): Migrate below to next without a timeout or idle
72 // phase. That's currently starving out some other polling,
73 // though.
74 auto ev = grpc_completion_queue_next(
75 cq,
76 gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
77 gpr_time_from_millis(1000, GPR_TIMESPAN)),
78 nullptr);
79 if (ev.type == GRPC_QUEUE_SHUTDOWN) {
80 return;
81 }
82 if (ev.type == GRPC_QUEUE_TIMEOUT) {
83 gpr_sleep_until(
84 gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
85 gpr_time_from_millis(100, GPR_TIMESPAN)));
86 continue;
87 }
88 DCHECK(ev.type == GRPC_OP_COMPLETE);
89 // We can always execute the callback inline rather than
90 // pushing it to another Executor thread because this
91 // thread is definitely running on a background thread, does not
92 // hold any application locks before executing the callback,
93 // and cannot be entered recursively.
94 auto* functor =
95 static_cast<grpc_completion_queue_functor*>(ev.tag);
96 functor->functor_run(functor, ev.success);
97 }
98 },
99 cq);
100 }
101 for (auto& th : *nexting_threads) {
102 th.Start();
103 }
104 }
105 return cq;
106 }
107
Unrefgrpc::__anond77879d70111::CallbackAlternativeCQ108 void Unref() {
109 grpc_core::MutexLock lock(g_callback_alternative_mu);
110 refs--;
111 if (refs == 0) {
112 cq->Shutdown();
113 for (auto& th : *nexting_threads) {
114 th.Join();
115 }
116 delete nexting_threads;
117 delete cq;
118 }
119 }
120 };
121
// Process-wide instance; trivially destructible, so safe as a global.
CallbackAlternativeCQ g_callback_alternative_cq;
123
124 } // namespace
125
// 'CompletionQueue' constructor can safely call GrpcLibraryCodegen(false) here
// i.e not have GrpcLibraryCodegen call grpc_init(). This is because, to create
// a 'grpc_completion_queue' instance (which is being passed as the input to
// this constructor), one must have already called grpc_init().
CompletionQueue::CompletionQueue(grpc_completion_queue* take)
    : GrpcLibrary(false), cq_(take) {
  // NOTE(review): InitialAvalanching() presumably primes the avalanche count
  // that CompleteAvalanching() (see Shutdown()) later drains -- confirm
  // against the header's declaration of these members.
  InitialAvalanching();
}
134
Shutdown()135 void CompletionQueue::Shutdown() {
136 #ifndef NDEBUG
137 if (!ServerListEmpty()) {
138 LOG(ERROR) << "CompletionQueue shutdown being shutdown before its server.";
139 }
140 #endif
141 CompleteAvalanching();
142 }
143
AsyncNextInternal(void ** tag,bool * ok,gpr_timespec deadline)144 CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal(
145 void** tag, bool* ok, gpr_timespec deadline) {
146 for (;;) {
147 auto ev = grpc_completion_queue_next(cq_, deadline, nullptr);
148 switch (ev.type) {
149 case GRPC_QUEUE_TIMEOUT:
150 return TIMEOUT;
151 case GRPC_QUEUE_SHUTDOWN:
152 return SHUTDOWN;
153 case GRPC_OP_COMPLETE:
154 auto core_cq_tag =
155 static_cast<grpc::internal::CompletionQueueTag*>(ev.tag);
156 *ok = ev.success != 0;
157 *tag = core_cq_tag;
158 if (core_cq_tag->FinalizeResult(tag, ok)) {
159 return GOT_EVENT;
160 }
161 break;
162 }
163 }
164 }
165
// Enables the core per-thread completion cache for `cq` for the lifetime of
// this object. Flush() must be called before destruction (enforced by a
// CHECK in the destructor).
CompletionQueue::CompletionQueueTLSCache::CompletionQueueTLSCache(
    CompletionQueue* cq)
    : cq_(cq), flushed_(false) {
  // Arm the thread-local cache on the underlying core queue.
  grpc_completion_queue_thread_local_cache_init(cq_->cq_);
}
171
CompletionQueue::CompletionQueueTLSCache::~CompletionQueueTLSCache() {
  // It is a bug to destroy the cache without having called Flush() first.
  CHECK(flushed_);
}
175
Flush(void ** tag,bool * ok)176 bool CompletionQueue::CompletionQueueTLSCache::Flush(void** tag, bool* ok) {
177 int res = 0;
178 void* res_tag;
179 flushed_ = true;
180 if (grpc_completion_queue_thread_local_cache_flush(cq_->cq_, &res_tag,
181 &res)) {
182 auto core_cq_tag =
183 static_cast<grpc::internal::CompletionQueueTag*>(res_tag);
184 *ok = res == 1;
185 if (core_cq_tag->FinalizeResult(tag, ok)) {
186 return true;
187 }
188 }
189 return false;
190 }
191
// Returns the process-wide "callback alternative" completion queue, creating
// it (and its nexting threads) on first use. Each call takes a ref that must
// be released with ReleaseCallbackAlternativeCQ().
CompletionQueue* CompletionQueue::CallbackAlternativeCQ() {
  // This path must never be reached when the EventEngine-based callback CQ
  // experiment is active.
  if (grpc_core::IsEventEngineCallbackCqEnabled()) {
    grpc_core::Crash("CallbackAlternativeCQ should not be instantiated");
  }
  // Create the mutex guarding the global CQ state exactly once, thread-safely.
  gpr_once_init(&g_once_init_callback_alternative,
                [] { g_callback_alternative_mu = new grpc_core::Mutex(); });
  return g_callback_alternative_cq.Ref();
}
200
// Drops a ref on the global callback-alternative CQ previously obtained via
// CallbackAlternativeCQ(); the last release shuts it down and joins its
// threads.
void CompletionQueue::ReleaseCallbackAlternativeCQ(CompletionQueue* cq)
    ABSL_NO_THREAD_SAFETY_ANALYSIS {
  (void)cq;  // only read by the DCHECK below; unused in NDEBUG builds
  // This accesses g_callback_alternative_cq without acquiring the mutex
  // but it's considered safe because it just reads the pointer address.
  DCHECK(cq == g_callback_alternative_cq.cq);
  g_callback_alternative_cq.Unref();
}
209
210 } // namespace grpc
211