• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <assert.h>
20 #include <grpc/compression.h>
21 #include <grpc/grpc.h>
22 #include <grpc/impl/compression_types.h>
23 #include <grpc/load_reporting.h>
24 #include <grpc/status.h>
25 #include <grpc/support/alloc.h>
26 #include <grpc/support/time.h>
27 #include <grpcpp/completion_queue.h>
28 #include <grpcpp/ext/call_metric_recorder.h>
29 #include <grpcpp/ext/server_metric_recorder.h>
30 #include <grpcpp/impl/call.h>
31 #include <grpcpp/impl/call_op_set.h>
32 #include <grpcpp/impl/call_op_set_interface.h>
33 #include <grpcpp/impl/completion_queue_tag.h>
34 #include <grpcpp/impl/interceptor_common.h>
35 #include <grpcpp/impl/metadata_map.h>
36 #include <grpcpp/server_context.h>
37 #include <grpcpp/support/callback_common.h>
38 #include <grpcpp/support/interceptor.h>
39 #include <grpcpp/support/server_callback.h>
40 #include <grpcpp/support/server_interceptor.h>
41 #include <grpcpp/support/string_ref.h>
42 
43 #include <atomic>
44 #include <cstdlib>
45 #include <functional>
46 #include <map>
47 #include <memory>
48 #include <new>
49 #include <string>
50 #include <utility>
51 #include <vector>
52 
53 #include "absl/log/check.h"
54 #include "absl/log/log.h"
55 #include "absl/strings/str_format.h"
56 #include "absl/strings/string_view.h"
57 #include "src/core/lib/resource_quota/arena.h"
58 #include "src/core/lib/surface/call.h"
59 #include "src/core/util/crash.h"
60 #include "src/core/util/ref_counted.h"
61 #include "src/core/util/sync.h"
62 #include "src/cpp/server/backend_metric_recorder.h"
63 
64 namespace grpc {
65 
66 // CompletionOp
67 
68 class ServerContextBase::CompletionOp final
69     : public internal::CallOpSetInterface {
70  public:
71   // initial refs: one in the server context, one in the cq
72   // must ref the call before calling constructor and after deleting this
CompletionOp(internal::Call * call,grpc::internal::ServerCallbackCall * callback_controller)73   CompletionOp(internal::Call* call,
74                grpc::internal::ServerCallbackCall* callback_controller)
75       : call_(*call),
76         callback_controller_(callback_controller),
77         has_tag_(false),
78         tag_(nullptr),
79         core_cq_tag_(this),
80         refs_(2),
81         finalized_(false),
82         cancelled_(0),
83         done_intercepting_(false) {}
84 
85   // CompletionOp isn't copyable or movable
86   CompletionOp(const CompletionOp&) = delete;
87   CompletionOp& operator=(const CompletionOp&) = delete;
88   CompletionOp(CompletionOp&&) = delete;
89   CompletionOp& operator=(CompletionOp&&) = delete;
90 
~CompletionOp()91   ~CompletionOp() override {
92     if (call_.server_rpc_info()) {
93       call_.server_rpc_info()->Unref();
94     }
95   }
96 
97   void FillOps(internal::Call* call) override;
98 
99   // This should always be arena allocated in the call, so override delete.
100   // But this class is not trivially destructible, so must actually call delete
101   // before allowing the arena to be freed
operator delete(void *,std::size_t size)102   static void operator delete(void* /*ptr*/, std::size_t size) {
103     // Use size to avoid unused-parameter warning since assert seems to be
104     // compiled out and treated as unused in some gcc optimized versions.
105     (void)size;
106     assert(size == sizeof(CompletionOp));
107   }
108 
109   // This operator should never be called as the memory should be freed as part
110   // of the arena destruction. It only exists to provide a matching operator
111   // delete to the operator new so that some compilers will not complain (see
112   // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
113   // there are no tests catching the compiler warning.
operator delete(void *,void *)114   static void operator delete(void*, void*) { assert(0); }
115 
116   bool FinalizeResult(void** tag, bool* status) override;
117 
CheckCancelled(CompletionQueue * cq)118   bool CheckCancelled(CompletionQueue* cq) {
119     cq->TryPluck(this);
120     return CheckCancelledNoPluck();
121   }
CheckCancelledAsync()122   bool CheckCancelledAsync() { return CheckCancelledNoPluck(); }
123 
set_tag(void * tag)124   void set_tag(void* tag) {
125     has_tag_ = true;
126     tag_ = tag;
127   }
128 
set_core_cq_tag(void * core_cq_tag)129   void set_core_cq_tag(void* core_cq_tag) { core_cq_tag_ = core_cq_tag; }
130 
core_cq_tag()131   void* core_cq_tag() override { return core_cq_tag_; }
132 
133   void Unref();
134 
135   // This will be called while interceptors are run if the RPC is a hijacked
136   // RPC. This should set hijacking state for each of the ops.
SetHijackingState()137   void SetHijackingState() override {
138     // Servers don't allow hijacking
139     grpc_core::Crash("unreachable");
140   }
141 
142   // Should be called after interceptors are done running
ContinueFillOpsAfterInterception()143   void ContinueFillOpsAfterInterception() override {}
144 
145   // Should be called after interceptors are done running on the finalize result
146   // path
ContinueFinalizeResultAfterInterception()147   void ContinueFinalizeResultAfterInterception() override {
148     done_intercepting_ = true;
149     if (!has_tag_) {
150       // We don't have a tag to return.
151       Unref();
152       // Unref can delete this, so do not access anything from this afterward.
153       return;
154     }
155     // Start a phony op so that we can return the tag
156     CHECK(grpc_call_start_batch(call_.call(), nullptr, 0, core_cq_tag_,
157                                 nullptr) == GRPC_CALL_OK);
158   }
159 
160  private:
CheckCancelledNoPluck()161   bool CheckCancelledNoPluck() {
162     grpc_core::MutexLock lock(&mu_);
163     return finalized_ ? (cancelled_ != 0) : false;
164   }
165 
166   internal::Call call_;
167   grpc::internal::ServerCallbackCall* const callback_controller_;
168   bool has_tag_;
169   void* tag_;
170   void* core_cq_tag_;
171   grpc_core::RefCount refs_;
172   grpc_core::Mutex mu_;
173   bool finalized_;
174   int cancelled_;  // This is an int (not bool) because it is passed to core
175   bool done_intercepting_;
176   internal::InterceptorBatchMethodsImpl interceptor_methods_;
177 };
178 
Unref()179 void ServerContextBase::CompletionOp::Unref() {
180   if (refs_.Unref()) {
181     grpc_call* call = call_.call();
182     delete this;
183     grpc_call_unref(call);
184   }
185 }
186 
FillOps(internal::Call * call)187 void ServerContextBase::CompletionOp::FillOps(internal::Call* call) {
188   grpc_op ops;
189   ops.op = GRPC_OP_RECV_CLOSE_ON_SERVER;
190   ops.data.recv_close_on_server.cancelled = &cancelled_;
191   ops.flags = 0;
192   ops.reserved = nullptr;
193   interceptor_methods_.SetCall(&call_);
194   interceptor_methods_.SetReverse();
195   interceptor_methods_.SetCallOpSetInterface(this);
196   // The following call_start_batch is internally-generated so no need for an
197   // explanatory log on failure.
198   CHECK(grpc_call_start_batch(call->call(), &ops, 1, core_cq_tag_, nullptr) ==
199         GRPC_CALL_OK);
200   // No interceptors to run here
201 }
202 
FinalizeResult(void ** tag,bool * status)203 bool ServerContextBase::CompletionOp::FinalizeResult(void** tag, bool* status) {
204   // Decide whether to do the unref or call the cancel callback within the lock
205   bool do_unref = false;
206   bool has_tag = false;
207   bool call_cancel = false;
208 
209   {
210     grpc_core::MutexLock lock(&mu_);
211     if (done_intercepting_) {
212       // We are done intercepting.
213       has_tag = has_tag_;
214       if (has_tag) {
215         *tag = tag_;
216       }
217       // Release the lock before unreffing as Unref may delete this object
218       do_unref = true;
219     } else {
220       finalized_ = true;
221 
222       // If for some reason the incoming status is false, mark that as a
223       // cancellation.
224       // TODO(vjpai): does this ever happen?
225       if (!*status) {
226         cancelled_ = 1;
227       }
228 
229       call_cancel = (cancelled_ != 0);
230       // Release the lock since we may call a callback and interceptors.
231     }
232   }
233 
234   if (do_unref) {
235     Unref();
236     // Unref can delete this, so do not access anything from this afterward.
237     return has_tag;
238   }
239   if (call_cancel && callback_controller_ != nullptr) {
240     callback_controller_->MaybeCallOnCancel();
241   }
242   // Add interception point and run through interceptors
243   interceptor_methods_.AddInterceptionHookPoint(
244       experimental::InterceptionHookPoints::POST_RECV_CLOSE);
245   if (interceptor_methods_.RunInterceptors()) {
246     // No interceptors were run
247     bool has_tag = has_tag_;
248     if (has_tag) {
249       *tag = tag_;
250     }
251     Unref();
252     // Unref can delete this, so do not access anything from this afterward.
253     return has_tag;
254   }
255   // There are interceptors to be run. Return false for now.
256   return false;
257 }
258 
259 // ServerContextBase body
260 
ServerContextBase()261 ServerContextBase::ServerContextBase()
262     : deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)) {}
263 
ServerContextBase(gpr_timespec deadline,grpc_metadata_array * arr)264 ServerContextBase::ServerContextBase(gpr_timespec deadline,
265                                      grpc_metadata_array* arr)
266     : deadline_(deadline) {
267   std::swap(*client_metadata_.arr(), *arr);
268 }
269 
BindDeadlineAndMetadata(gpr_timespec deadline,grpc_metadata_array * arr)270 void ServerContextBase::BindDeadlineAndMetadata(gpr_timespec deadline,
271                                                 grpc_metadata_array* arr) {
272   deadline_ = deadline;
273   std::swap(*client_metadata_.arr(), *arr);
274 }
275 
~ServerContextBase()276 ServerContextBase::~ServerContextBase() {
277   if (completion_op_) {
278     completion_op_->Unref();
279     // Unref can delete completion_op_, so do not access it afterward.
280   }
281   if (rpc_info_) {
282     rpc_info_->Unref();
283   }
284   if (default_reactor_used_.load(std::memory_order_relaxed)) {
285     reinterpret_cast<Reactor*>(&default_reactor_)->~Reactor();
286   }
287   if (call_metric_recorder_ != nullptr) {
288     call_metric_recorder_->~CallMetricRecorder();
289   }
290 }
291 
~CallWrapper()292 ServerContextBase::CallWrapper::~CallWrapper() {
293   if (call) {
294     // If the ServerContext is part of the call's arena, this could free the
295     // object itself.
296     grpc_call_unref(call);
297   }
298 }
299 
BeginCompletionOp(internal::Call * call,std::function<void (bool)> callback,grpc::internal::ServerCallbackCall * callback_controller)300 void ServerContextBase::BeginCompletionOp(
301     internal::Call* call, std::function<void(bool)> callback,
302     grpc::internal::ServerCallbackCall* callback_controller) {
303   CHECK(!completion_op_);
304   if (rpc_info_) {
305     rpc_info_->Ref();
306   }
307   grpc_call_ref(call->call());
308   completion_op_ =
309       new (grpc_call_arena_alloc(call->call(), sizeof(CompletionOp)))
310           CompletionOp(call, callback_controller);
311   if (callback_controller != nullptr) {
312     completion_tag_.Set(call->call(), std::move(callback), completion_op_,
313                         true);
314     completion_op_->set_core_cq_tag(&completion_tag_);
315     completion_op_->set_tag(completion_op_);
316   } else if (has_notify_when_done_tag_) {
317     completion_op_->set_tag(async_notify_when_done_tag_);
318   }
319   call->PerformOps(completion_op_);
320 }
321 
GetCompletionOpTag()322 internal::CompletionQueueTag* ServerContextBase::GetCompletionOpTag() {
323   return static_cast<internal::CompletionQueueTag*>(completion_op_);
324 }
325 
AddInitialMetadata(const std::string & key,const std::string & value)326 void ServerContextBase::AddInitialMetadata(const std::string& key,
327                                            const std::string& value) {
328   initial_metadata_.insert(std::make_pair(key, value));
329 }
330 
AddTrailingMetadata(const std::string & key,const std::string & value)331 void ServerContextBase::AddTrailingMetadata(const std::string& key,
332                                             const std::string& value) {
333   trailing_metadata_.insert(std::make_pair(key, value));
334 }
335 
TryCancel() const336 void ServerContextBase::TryCancel() const {
337   internal::CancelInterceptorBatchMethods cancel_methods;
338   if (rpc_info_) {
339     for (size_t i = 0; i < rpc_info_->interceptors_.size(); i++) {
340       rpc_info_->RunInterceptor(&cancel_methods, i);
341     }
342   }
343   grpc_call_error err =
344       grpc_call_cancel_with_status(call_.call, GRPC_STATUS_CANCELLED,
345                                    "Cancelled on the server side", nullptr);
346   if (err != GRPC_CALL_OK) {
347     LOG(ERROR) << "TryCancel failed with: " << err;
348   }
349 }
350 
IsCancelled() const351 bool ServerContextBase::IsCancelled() const {
352   if (completion_tag_) {
353     // When using callback API, this result is always valid.
354     return marked_cancelled_.load(std::memory_order_acquire) ||
355            completion_op_->CheckCancelledAsync();
356   } else if (has_notify_when_done_tag_) {
357     // When using async API, the result is only valid
358     // if the tag has already been delivered at the completion queue
359     return completion_op_ && completion_op_->CheckCancelledAsync();
360   } else {
361     // when using sync API, the result is always valid
362     return marked_cancelled_.load(std::memory_order_acquire) ||
363            (completion_op_ && completion_op_->CheckCancelled(cq_));
364   }
365 }
366 
set_compression_algorithm(grpc_compression_algorithm algorithm)367 void ServerContextBase::set_compression_algorithm(
368     grpc_compression_algorithm algorithm) {
369   compression_algorithm_ = algorithm;
370   const char* algorithm_name = nullptr;
371   if (!grpc_compression_algorithm_name(algorithm, &algorithm_name)) {
372     grpc_core::Crash(absl::StrFormat(
373         "Name for compression algorithm '%d' unknown.", algorithm));
374   }
375   CHECK_NE(algorithm_name, nullptr);
376   AddInitialMetadata(GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY, algorithm_name);
377 }
378 
peer() const379 std::string ServerContextBase::peer() const {
380   std::string peer;
381   if (call_.call) {
382     char* c_peer = grpc_call_get_peer(call_.call);
383     peer = c_peer;
384     gpr_free(c_peer);
385   }
386   return peer;
387 }
388 
census_context() const389 const struct census_context* ServerContextBase::census_context() const {
390   return call_.call == nullptr ? nullptr
391                                : grpc_census_call_get_context(call_.call);
392 }
393 
SetLoadReportingCosts(const std::vector<std::string> & cost_data)394 void ServerContextBase::SetLoadReportingCosts(
395     const std::vector<std::string>& cost_data) {
396   if (call_.call == nullptr) return;
397   for (const auto& cost_datum : cost_data) {
398     AddTrailingMetadata(GRPC_LB_COST_MD_KEY, cost_datum);
399   }
400 }
401 
CreateCallMetricRecorder(experimental::ServerMetricRecorder * server_metric_recorder)402 void ServerContextBase::CreateCallMetricRecorder(
403     experimental::ServerMetricRecorder* server_metric_recorder) {
404   if (call_.call == nullptr) return;
405   CHECK_EQ(call_metric_recorder_, nullptr);
406   grpc_core::Arena* arena = grpc_call_get_arena(call_.call);
407   auto* backend_metric_state =
408       arena->New<BackendMetricState>(server_metric_recorder);
409   call_metric_recorder_ = backend_metric_state;
410   arena->SetContext<grpc_core::BackendMetricProvider>(backend_metric_state);
411 }
412 
ExperimentalGetAuthority() const413 grpc::string_ref ServerContextBase::ExperimentalGetAuthority() const {
414   absl::string_view authority = grpc_call_server_authority(call_.call);
415   return grpc::string_ref(authority.data(), authority.size());
416 }
417 
418 }  // namespace grpc
419