/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <grpcpp/impl/codegen/server_context.h>

#include <algorithm>
#include <atomic>
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

#include <grpc/compression.h>
#include <grpc/grpc.h>
#include <grpc/load_reporting.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpcpp/impl/call.h>
#include <grpcpp/impl/codegen/completion_queue.h>
#include <grpcpp/impl/grpc_library.h>
#include <grpcpp/support/server_callback.h>
#include <grpcpp/support/time.h>

#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/surface/call.h"

namespace grpc {

static internal::GrpcLibraryInitializer g_gli_initializer;

// CompletionOp

class ServerContextBase::CompletionOp final
    : public internal::CallOpSetInterface {
 public:
  // initial refs: one in the server context, one in the cq
  // must ref the call before calling constructor and after deleting this
  CompletionOp(internal::Call* call,
               ::grpc::internal::ServerCallbackCall* callback_controller)
      : call_(*call),
        callback_controller_(callback_controller),
        has_tag_(false),
        tag_(nullptr),
        core_cq_tag_(this),
        refs_(2),
        finalized_(false),
        cancelled_(0),
        done_intercepting_(false) {}

  // CompletionOp isn't copyable or movable
  CompletionOp(const CompletionOp&) = delete;
  CompletionOp& operator=(const CompletionOp&) = delete;
  CompletionOp(CompletionOp&&) = delete;
  CompletionOp& operator=(CompletionOp&&) = delete;

  ~CompletionOp() override {
    if (call_.server_rpc_info()) {
      call_.server_rpc_info()->Unref();
    }
  }

  void FillOps(internal::Call* call) override;

  // This should always be arena allocated in the call, so override delete.
  // But this class is not trivially destructible, so must actually call delete
  // before allowing the arena to be freed
  static void operator delete(void* /*ptr*/, std::size_t size) {
    // Use size to avoid unused-parameter warning since assert seems to be
    // compiled out and treated as unused in some gcc optimized versions.
    (void)size;
    assert(size == sizeof(CompletionOp));
  }

  // This operator should never be called as the memory should be freed as part
  // of the arena destruction. It only exists to provide a matching operator
  // delete to the operator new so that some compilers will not complain (see
  // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
  // there are no tests catching the compiler warning.
  static void operator delete(void*, void*) { assert(0); }

  bool FinalizeResult(void** tag, bool* status) override;

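  // Check whether the RPC has been cancelled. CheckCancelled plucks this op's
  // tag from the given completion queue first (sync API), while
  // CheckCancelledAsync only inspects state already recorded by
  // FinalizeResult.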
  bool CheckCancelled(CompletionQueue* cq) {
    cq->TryPluck(this);
    return CheckCancelledNoPluck();
  }
  bool CheckCancelledAsync() { return CheckCancelledNoPluck(); }

  void set_tag(void* tag) {
    has_tag_ = true;
    tag_ = tag;
  }

  void set_core_cq_tag(void* core_cq_tag) { core_cq_tag_ = core_cq_tag; }

  void* core_cq_tag() override { return core_cq_tag_; }

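  // Drop one reference. The final Unref deletes this op and then releases the
  // ref on the underlying call, so the object must not be touched afterward.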
  void Unref();

  // This will be called while interceptors are run if the RPC is a hijacked
  // RPC. This should set hijacking state for each of the ops.
  void SetHijackingState() override {
    /* Servers don't allow hijacking */
    GPR_ASSERT(false);
  }

  /* Should be called after interceptors are done running */
  void ContinueFillOpsAfterInterception() override {}

  /* Should be called after interceptors are done running on the finalize result
   * path */
  void ContinueFinalizeResultAfterInterception() override {
    done_intercepting_ = true;
    if (!has_tag_) {
      // We don't have a tag to return.
      Unref();
      // Unref can delete this, so do not access anything from this afterward.
      return;
    }
    /* Start a phony op so that we can return the tag */
    GPR_ASSERT(grpc_call_start_batch(call_.call(), nullptr, 0, core_cq_tag_,
                                     nullptr) == GRPC_CALL_OK);
  }

 private:
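  // Read the cancellation state under the lock. Only meaningful once
  // FinalizeResult has set finalized_; before that it reports false.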
  bool CheckCancelledNoPluck() {
    grpc_core::MutexLock lock(&mu_);
    return finalized_ ? (cancelled_ != 0) : false;
  }

  internal::Call call_;
  ::grpc::internal::ServerCallbackCall* const callback_controller_;
  bool has_tag_;
  void* tag_;
  void* core_cq_tag_;
  grpc_core::RefCount refs_;
  grpc_core::Mutex mu_;
  bool finalized_;
  int cancelled_;  // This is an int (not bool) because it is passed to core
  bool done_intercepting_;
  internal::InterceptorBatchMethodsImpl interceptor_methods_;
};

void ServerContextBase::CompletionOp::Unref() {
  if (refs_.Unref()) {
    grpc_call* call = call_.call();
    delete this;
    grpc_call_unref(call);
  }
}

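// Start the GRPC_OP_RECV_CLOSE_ON_SERVER batch; its completion records the
// client's cancellation state into cancelled_ and is tagged with core_cq_tag_.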
void ServerContextBase::CompletionOp::FillOps(internal::Call* call) {
  grpc_op ops;
  ops.op = GRPC_OP_RECV_CLOSE_ON_SERVER;
  ops.data.recv_close_on_server.cancelled = &cancelled_;
  ops.flags = 0;
  ops.reserved = nullptr;
  interceptor_methods_.SetCall(&call_);
  interceptor_methods_.SetReverse();
  interceptor_methods_.SetCallOpSetInterface(this);
  // The following call_start_batch is internally-generated so no need for an
  // explanatory log on failure.
  GPR_ASSERT(grpc_call_start_batch(call->call(), &ops, 1, core_cq_tag_,
                                   nullptr) == GRPC_CALL_OK);
  /* No interceptors to run here */
}

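// Completion of the RECV_CLOSE_ON_SERVER batch: record cancellation, notify
// the callback controller of a cancel if needed, run POST_RECV_CLOSE
// interceptors, and report whether a user tag should be delivered.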
bool ServerContextBase::CompletionOp::FinalizeResult(void** tag, bool* status) {
  // Decide whether to do the unref or call the cancel callback within the lock
  bool do_unref = false;
  bool has_tag = false;
  bool call_cancel = false;

  {
    grpc_core::MutexLock lock(&mu_);
    if (done_intercepting_) {
      // We are done intercepting.
      has_tag = has_tag_;
      if (has_tag) {
        *tag = tag_;
      }
      // Release the lock before unreffing as Unref may delete this object
      do_unref = true;
    } else {
      finalized_ = true;

      // If for some reason the incoming status is false, mark that as a
      // cancellation.
      // TODO(vjpai): does this ever happen?
      if (!*status) {
        cancelled_ = 1;
      }

      call_cancel = (cancelled_ != 0);
      // Release the lock since we may call a callback and interceptors.
    }
  }

  if (do_unref) {
    Unref();
    // Unref can delete this, so do not access anything from this afterward.
    return has_tag;
  }
  if (call_cancel && callback_controller_ != nullptr) {
    callback_controller_->MaybeCallOnCancel();
  }
  /* Add interception point and run through interceptors */
  interceptor_methods_.AddInterceptionHookPoint(
      experimental::InterceptionHookPoints::POST_RECV_CLOSE);
  if (interceptor_methods_.RunInterceptors()) {
    // No interceptors were run
    bool has_tag = has_tag_;
    if (has_tag) {
      *tag = tag_;
    }
    Unref();
    // Unref can delete this, so do not access anything from this afterward.
    return has_tag;
  }
  // There are interceptors to be run. Return false for now.
  return false;
}

// ServerContextBase body

ServerContextBase::ServerContextBase()
    : deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)) {
  g_gli_initializer.summon();
}

ServerContextBase::ServerContextBase(gpr_timespec deadline,
                                     grpc_metadata_array* arr)
    : deadline_(deadline) {
  std::swap(*client_metadata_.arr(), *arr);
}

void ServerContextBase::BindDeadlineAndMetadata(gpr_timespec deadline,
                                                grpc_metadata_array* arr) {
  deadline_ = deadline;
  std::swap(*client_metadata_.arr(), *arr);
}

ServerContextBase::~ServerContextBase() {
  if (completion_op_) {
    completion_op_->Unref();
    // Unref can delete completion_op_, so do not access it afterward.
  }
  if (rpc_info_) {
    rpc_info_->Unref();
  }
  if (default_reactor_used_.load(std::memory_order_relaxed)) {
    reinterpret_cast<Reactor*>(&default_reactor_)->~Reactor();
  }
}

ServerContextBase::CallWrapper::~CallWrapper() {
  if (call) {
    // If the ServerContext is part of the call's arena, this could free the
    // object itself.
    grpc_call_unref(call);
  }
}

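// Arena-allocate the CompletionOp on the call (hence the placement new) and
// start it. The grpc_call_ref taken here is released by CompletionOp::Unref
// once the op's last reference is dropped.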
void ServerContextBase::BeginCompletionOp(
    internal::Call* call, std::function<void(bool)> callback,
    ::grpc::internal::ServerCallbackCall* callback_controller) {
  GPR_ASSERT(!completion_op_);
  if (rpc_info_) {
    rpc_info_->Ref();
  }
  grpc_call_ref(call->call());
  completion_op_ =
      new (grpc_call_arena_alloc(call->call(), sizeof(CompletionOp)))
          CompletionOp(call, callback_controller);
  if (callback_controller != nullptr) {
    completion_tag_.Set(call->call(), std::move(callback), completion_op_,
                        true);
    completion_op_->set_core_cq_tag(&completion_tag_);
    completion_op_->set_tag(completion_op_);
  } else if (has_notify_when_done_tag_) {
    completion_op_->set_tag(async_notify_when_done_tag_);
  }
  call->PerformOps(completion_op_);
}

internal::CompletionQueueTag* ServerContextBase::GetCompletionOpTag() {
  return static_cast<internal::CompletionQueueTag*>(completion_op_);
}

void ServerContextBase::AddInitialMetadata(const std::string& key,
                                           const std::string& value) {
  initial_metadata_.insert(std::make_pair(key, value));
}

void ServerContextBase::AddTrailingMetadata(const std::string& key,
                                            const std::string& value) {
  trailing_metadata_.insert(std::make_pair(key, value));
}

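// Run each interceptor's cancel hook, then ask core to cancel the call with
// GRPC_STATUS_CANCELLED.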
void ServerContextBase::TryCancel() const {
  internal::CancelInterceptorBatchMethods cancel_methods;
  if (rpc_info_) {
    for (size_t i = 0; i < rpc_info_->interceptors_.size(); i++) {
      rpc_info_->RunInterceptor(&cancel_methods, i);
    }
  }
  grpc_call_error err =
      grpc_call_cancel_with_status(call_.call, GRPC_STATUS_CANCELLED,
                                   "Cancelled on the server side", nullptr);
  if (err != GRPC_CALL_OK) {
    gpr_log(GPR_ERROR, "TryCancel failed with: %d", err);
  }
}

bool ServerContextBase::IsCancelled() const {
  if (completion_tag_) {
    // When using callback API, this result is always valid.
    return completion_op_->CheckCancelledAsync();
  } else if (has_notify_when_done_tag_) {
    // When using async API, the result is only valid
    // if the tag has already been delivered at the completion queue
    return completion_op_ && completion_op_->CheckCancelledAsync();
  } else {
    // when using sync API, the result is always valid
    return completion_op_ && completion_op_->CheckCancelled(cq_);
  }
}

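// Record the response compression algorithm and attach its name under
// GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY; an unknown algorithm aborts.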
void ServerContextBase::set_compression_algorithm(
    grpc_compression_algorithm algorithm) {
  compression_algorithm_ = algorithm;
  const char* algorithm_name = nullptr;
  if (!grpc_compression_algorithm_name(algorithm, &algorithm_name)) {
    gpr_log(GPR_ERROR, "Name for compression algorithm '%d' unknown.",
            algorithm);
    abort();
  }
  GPR_ASSERT(algorithm_name != nullptr);
  AddInitialMetadata(GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY, algorithm_name);
}

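// Return the transport-reported peer address. grpc_call_get_peer hands back an
// allocated C string that must be released with gpr_free.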
std::string ServerContextBase::peer() const {
  std::string peer;
  if (call_.call) {
    char* c_peer = grpc_call_get_peer(call_.call);
    peer = c_peer;
    gpr_free(c_peer);
  }
  return peer;
}

const struct census_context* ServerContextBase::census_context() const {
  return call_.call == nullptr ? nullptr
                               : grpc_census_call_get_context(call_.call);
}

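// Attach each cost datum as its own GRPC_LB_COST_MD_KEY trailing metadata
// entry for load reporting.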
void ServerContextBase::SetLoadReportingCosts(
    const std::vector<std::string>& cost_data) {
  if (call_.call == nullptr) return;
  for (const auto& cost_datum : cost_data) {
    AddTrailingMetadata(GRPC_LB_COST_MD_KEY, cost_datum);
  }
}

}  // namespace grpc