1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <assert.h>
20 #include <grpc/compression.h>
21 #include <grpc/grpc.h>
22 #include <grpc/impl/compression_types.h>
23 #include <grpc/load_reporting.h>
24 #include <grpc/status.h>
25 #include <grpc/support/alloc.h>
26 #include <grpc/support/time.h>
27 #include <grpcpp/completion_queue.h>
28 #include <grpcpp/ext/call_metric_recorder.h>
29 #include <grpcpp/ext/server_metric_recorder.h>
30 #include <grpcpp/impl/call.h>
31 #include <grpcpp/impl/call_op_set.h>
32 #include <grpcpp/impl/call_op_set_interface.h>
33 #include <grpcpp/impl/completion_queue_tag.h>
34 #include <grpcpp/impl/interceptor_common.h>
35 #include <grpcpp/impl/metadata_map.h>
36 #include <grpcpp/server_context.h>
37 #include <grpcpp/support/callback_common.h>
38 #include <grpcpp/support/interceptor.h>
39 #include <grpcpp/support/server_callback.h>
40 #include <grpcpp/support/server_interceptor.h>
41 #include <grpcpp/support/string_ref.h>
42
43 #include <atomic>
44 #include <cstdlib>
45 #include <functional>
46 #include <map>
47 #include <memory>
48 #include <new>
49 #include <string>
50 #include <utility>
51 #include <vector>
52
53 #include "absl/log/check.h"
54 #include "absl/log/log.h"
55 #include "absl/strings/str_format.h"
56 #include "absl/strings/string_view.h"
57 #include "src/core/lib/resource_quota/arena.h"
58 #include "src/core/lib/surface/call.h"
59 #include "src/core/util/crash.h"
60 #include "src/core/util/ref_counted.h"
61 #include "src/core/util/sync.h"
62 #include "src/cpp/server/backend_metric_recorder.h"
63
64 namespace grpc {
65
66 // CompletionOp
67
68 class ServerContextBase::CompletionOp final
69 : public internal::CallOpSetInterface {
70 public:
71 // initial refs: one in the server context, one in the cq
72 // must ref the call before calling constructor and after deleting this
CompletionOp(internal::Call * call,grpc::internal::ServerCallbackCall * callback_controller)73 CompletionOp(internal::Call* call,
74 grpc::internal::ServerCallbackCall* callback_controller)
75 : call_(*call),
76 callback_controller_(callback_controller),
77 has_tag_(false),
78 tag_(nullptr),
79 core_cq_tag_(this),
80 refs_(2),
81 finalized_(false),
82 cancelled_(0),
83 done_intercepting_(false) {}
84
85 // CompletionOp isn't copyable or movable
86 CompletionOp(const CompletionOp&) = delete;
87 CompletionOp& operator=(const CompletionOp&) = delete;
88 CompletionOp(CompletionOp&&) = delete;
89 CompletionOp& operator=(CompletionOp&&) = delete;
90
~CompletionOp()91 ~CompletionOp() override {
92 if (call_.server_rpc_info()) {
93 call_.server_rpc_info()->Unref();
94 }
95 }
96
97 void FillOps(internal::Call* call) override;
98
99 // This should always be arena allocated in the call, so override delete.
100 // But this class is not trivially destructible, so must actually call delete
101 // before allowing the arena to be freed
operator delete(void *,std::size_t size)102 static void operator delete(void* /*ptr*/, std::size_t size) {
103 // Use size to avoid unused-parameter warning since assert seems to be
104 // compiled out and treated as unused in some gcc optimized versions.
105 (void)size;
106 assert(size == sizeof(CompletionOp));
107 }
108
109 // This operator should never be called as the memory should be freed as part
110 // of the arena destruction. It only exists to provide a matching operator
111 // delete to the operator new so that some compilers will not complain (see
112 // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
113 // there are no tests catching the compiler warning.
operator delete(void *,void *)114 static void operator delete(void*, void*) { assert(0); }
115
116 bool FinalizeResult(void** tag, bool* status) override;
117
CheckCancelled(CompletionQueue * cq)118 bool CheckCancelled(CompletionQueue* cq) {
119 cq->TryPluck(this);
120 return CheckCancelledNoPluck();
121 }
CheckCancelledAsync()122 bool CheckCancelledAsync() { return CheckCancelledNoPluck(); }
123
set_tag(void * tag)124 void set_tag(void* tag) {
125 has_tag_ = true;
126 tag_ = tag;
127 }
128
set_core_cq_tag(void * core_cq_tag)129 void set_core_cq_tag(void* core_cq_tag) { core_cq_tag_ = core_cq_tag; }
130
core_cq_tag()131 void* core_cq_tag() override { return core_cq_tag_; }
132
133 void Unref();
134
135 // This will be called while interceptors are run if the RPC is a hijacked
136 // RPC. This should set hijacking state for each of the ops.
SetHijackingState()137 void SetHijackingState() override {
138 // Servers don't allow hijacking
139 grpc_core::Crash("unreachable");
140 }
141
142 // Should be called after interceptors are done running
ContinueFillOpsAfterInterception()143 void ContinueFillOpsAfterInterception() override {}
144
145 // Should be called after interceptors are done running on the finalize result
146 // path
ContinueFinalizeResultAfterInterception()147 void ContinueFinalizeResultAfterInterception() override {
148 done_intercepting_ = true;
149 if (!has_tag_) {
150 // We don't have a tag to return.
151 Unref();
152 // Unref can delete this, so do not access anything from this afterward.
153 return;
154 }
155 // Start a phony op so that we can return the tag
156 CHECK(grpc_call_start_batch(call_.call(), nullptr, 0, core_cq_tag_,
157 nullptr) == GRPC_CALL_OK);
158 }
159
160 private:
CheckCancelledNoPluck()161 bool CheckCancelledNoPluck() {
162 grpc_core::MutexLock lock(&mu_);
163 return finalized_ ? (cancelled_ != 0) : false;
164 }
165
166 internal::Call call_;
167 grpc::internal::ServerCallbackCall* const callback_controller_;
168 bool has_tag_;
169 void* tag_;
170 void* core_cq_tag_;
171 grpc_core::RefCount refs_;
172 grpc_core::Mutex mu_;
173 bool finalized_;
174 int cancelled_; // This is an int (not bool) because it is passed to core
175 bool done_intercepting_;
176 internal::InterceptorBatchMethodsImpl interceptor_methods_;
177 };
178
Unref()179 void ServerContextBase::CompletionOp::Unref() {
180 if (refs_.Unref()) {
181 grpc_call* call = call_.call();
182 delete this;
183 grpc_call_unref(call);
184 }
185 }
186
FillOps(internal::Call * call)187 void ServerContextBase::CompletionOp::FillOps(internal::Call* call) {
188 grpc_op ops;
189 ops.op = GRPC_OP_RECV_CLOSE_ON_SERVER;
190 ops.data.recv_close_on_server.cancelled = &cancelled_;
191 ops.flags = 0;
192 ops.reserved = nullptr;
193 interceptor_methods_.SetCall(&call_);
194 interceptor_methods_.SetReverse();
195 interceptor_methods_.SetCallOpSetInterface(this);
196 // The following call_start_batch is internally-generated so no need for an
197 // explanatory log on failure.
198 CHECK(grpc_call_start_batch(call->call(), &ops, 1, core_cq_tag_, nullptr) ==
199 GRPC_CALL_OK);
200 // No interceptors to run here
201 }
202
FinalizeResult(void ** tag,bool * status)203 bool ServerContextBase::CompletionOp::FinalizeResult(void** tag, bool* status) {
204 // Decide whether to do the unref or call the cancel callback within the lock
205 bool do_unref = false;
206 bool has_tag = false;
207 bool call_cancel = false;
208
209 {
210 grpc_core::MutexLock lock(&mu_);
211 if (done_intercepting_) {
212 // We are done intercepting.
213 has_tag = has_tag_;
214 if (has_tag) {
215 *tag = tag_;
216 }
217 // Release the lock before unreffing as Unref may delete this object
218 do_unref = true;
219 } else {
220 finalized_ = true;
221
222 // If for some reason the incoming status is false, mark that as a
223 // cancellation.
224 // TODO(vjpai): does this ever happen?
225 if (!*status) {
226 cancelled_ = 1;
227 }
228
229 call_cancel = (cancelled_ != 0);
230 // Release the lock since we may call a callback and interceptors.
231 }
232 }
233
234 if (do_unref) {
235 Unref();
236 // Unref can delete this, so do not access anything from this afterward.
237 return has_tag;
238 }
239 if (call_cancel && callback_controller_ != nullptr) {
240 callback_controller_->MaybeCallOnCancel();
241 }
242 // Add interception point and run through interceptors
243 interceptor_methods_.AddInterceptionHookPoint(
244 experimental::InterceptionHookPoints::POST_RECV_CLOSE);
245 if (interceptor_methods_.RunInterceptors()) {
246 // No interceptors were run
247 bool has_tag = has_tag_;
248 if (has_tag) {
249 *tag = tag_;
250 }
251 Unref();
252 // Unref can delete this, so do not access anything from this afterward.
253 return has_tag;
254 }
255 // There are interceptors to be run. Return false for now.
256 return false;
257 }
258
259 // ServerContextBase body
260
ServerContextBase()261 ServerContextBase::ServerContextBase()
262 : deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)) {}
263
ServerContextBase(gpr_timespec deadline,grpc_metadata_array * arr)264 ServerContextBase::ServerContextBase(gpr_timespec deadline,
265 grpc_metadata_array* arr)
266 : deadline_(deadline) {
267 std::swap(*client_metadata_.arr(), *arr);
268 }
269
BindDeadlineAndMetadata(gpr_timespec deadline,grpc_metadata_array * arr)270 void ServerContextBase::BindDeadlineAndMetadata(gpr_timespec deadline,
271 grpc_metadata_array* arr) {
272 deadline_ = deadline;
273 std::swap(*client_metadata_.arr(), *arr);
274 }
275
~ServerContextBase()276 ServerContextBase::~ServerContextBase() {
277 if (completion_op_) {
278 completion_op_->Unref();
279 // Unref can delete completion_op_, so do not access it afterward.
280 }
281 if (rpc_info_) {
282 rpc_info_->Unref();
283 }
284 if (default_reactor_used_.load(std::memory_order_relaxed)) {
285 reinterpret_cast<Reactor*>(&default_reactor_)->~Reactor();
286 }
287 if (call_metric_recorder_ != nullptr) {
288 call_metric_recorder_->~CallMetricRecorder();
289 }
290 }
291
~CallWrapper()292 ServerContextBase::CallWrapper::~CallWrapper() {
293 if (call) {
294 // If the ServerContext is part of the call's arena, this could free the
295 // object itself.
296 grpc_call_unref(call);
297 }
298 }
299
BeginCompletionOp(internal::Call * call,std::function<void (bool)> callback,grpc::internal::ServerCallbackCall * callback_controller)300 void ServerContextBase::BeginCompletionOp(
301 internal::Call* call, std::function<void(bool)> callback,
302 grpc::internal::ServerCallbackCall* callback_controller) {
303 CHECK(!completion_op_);
304 if (rpc_info_) {
305 rpc_info_->Ref();
306 }
307 grpc_call_ref(call->call());
308 completion_op_ =
309 new (grpc_call_arena_alloc(call->call(), sizeof(CompletionOp)))
310 CompletionOp(call, callback_controller);
311 if (callback_controller != nullptr) {
312 completion_tag_.Set(call->call(), std::move(callback), completion_op_,
313 true);
314 completion_op_->set_core_cq_tag(&completion_tag_);
315 completion_op_->set_tag(completion_op_);
316 } else if (has_notify_when_done_tag_) {
317 completion_op_->set_tag(async_notify_when_done_tag_);
318 }
319 call->PerformOps(completion_op_);
320 }
321
GetCompletionOpTag()322 internal::CompletionQueueTag* ServerContextBase::GetCompletionOpTag() {
323 return static_cast<internal::CompletionQueueTag*>(completion_op_);
324 }
325
AddInitialMetadata(const std::string & key,const std::string & value)326 void ServerContextBase::AddInitialMetadata(const std::string& key,
327 const std::string& value) {
328 initial_metadata_.insert(std::make_pair(key, value));
329 }
330
AddTrailingMetadata(const std::string & key,const std::string & value)331 void ServerContextBase::AddTrailingMetadata(const std::string& key,
332 const std::string& value) {
333 trailing_metadata_.insert(std::make_pair(key, value));
334 }
335
TryCancel() const336 void ServerContextBase::TryCancel() const {
337 internal::CancelInterceptorBatchMethods cancel_methods;
338 if (rpc_info_) {
339 for (size_t i = 0; i < rpc_info_->interceptors_.size(); i++) {
340 rpc_info_->RunInterceptor(&cancel_methods, i);
341 }
342 }
343 grpc_call_error err =
344 grpc_call_cancel_with_status(call_.call, GRPC_STATUS_CANCELLED,
345 "Cancelled on the server side", nullptr);
346 if (err != GRPC_CALL_OK) {
347 LOG(ERROR) << "TryCancel failed with: " << err;
348 }
349 }
350
IsCancelled() const351 bool ServerContextBase::IsCancelled() const {
352 if (completion_tag_) {
353 // When using callback API, this result is always valid.
354 return marked_cancelled_.load(std::memory_order_acquire) ||
355 completion_op_->CheckCancelledAsync();
356 } else if (has_notify_when_done_tag_) {
357 // When using async API, the result is only valid
358 // if the tag has already been delivered at the completion queue
359 return completion_op_ && completion_op_->CheckCancelledAsync();
360 } else {
361 // when using sync API, the result is always valid
362 return marked_cancelled_.load(std::memory_order_acquire) ||
363 (completion_op_ && completion_op_->CheckCancelled(cq_));
364 }
365 }
366
set_compression_algorithm(grpc_compression_algorithm algorithm)367 void ServerContextBase::set_compression_algorithm(
368 grpc_compression_algorithm algorithm) {
369 compression_algorithm_ = algorithm;
370 const char* algorithm_name = nullptr;
371 if (!grpc_compression_algorithm_name(algorithm, &algorithm_name)) {
372 grpc_core::Crash(absl::StrFormat(
373 "Name for compression algorithm '%d' unknown.", algorithm));
374 }
375 CHECK_NE(algorithm_name, nullptr);
376 AddInitialMetadata(GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY, algorithm_name);
377 }
378
peer() const379 std::string ServerContextBase::peer() const {
380 std::string peer;
381 if (call_.call) {
382 char* c_peer = grpc_call_get_peer(call_.call);
383 peer = c_peer;
384 gpr_free(c_peer);
385 }
386 return peer;
387 }
388
census_context() const389 const struct census_context* ServerContextBase::census_context() const {
390 return call_.call == nullptr ? nullptr
391 : grpc_census_call_get_context(call_.call);
392 }
393
SetLoadReportingCosts(const std::vector<std::string> & cost_data)394 void ServerContextBase::SetLoadReportingCosts(
395 const std::vector<std::string>& cost_data) {
396 if (call_.call == nullptr) return;
397 for (const auto& cost_datum : cost_data) {
398 AddTrailingMetadata(GRPC_LB_COST_MD_KEY, cost_datum);
399 }
400 }
401
CreateCallMetricRecorder(experimental::ServerMetricRecorder * server_metric_recorder)402 void ServerContextBase::CreateCallMetricRecorder(
403 experimental::ServerMetricRecorder* server_metric_recorder) {
404 if (call_.call == nullptr) return;
405 CHECK_EQ(call_metric_recorder_, nullptr);
406 grpc_core::Arena* arena = grpc_call_get_arena(call_.call);
407 auto* backend_metric_state =
408 arena->New<BackendMetricState>(server_metric_recorder);
409 call_metric_recorder_ = backend_metric_state;
410 arena->SetContext<grpc_core::BackendMetricProvider>(backend_metric_state);
411 }
412
ExperimentalGetAuthority() const413 grpc::string_ref ServerContextBase::ExperimentalGetAuthority() const {
414 absl::string_view authority = grpc_call_server_authority(call_.call);
415 return grpc::string_ref(authority.data(), authority.size());
416 }
417
418 } // namespace grpc
419