• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpcpp/impl/codegen/server_context.h>
20 
21 #include <algorithm>
22 #include <utility>
23 
24 #include <grpc/compression.h>
25 #include <grpc/grpc.h>
26 #include <grpc/load_reporting.h>
27 #include <grpc/support/alloc.h>
28 #include <grpc/support/log.h>
29 #include <grpcpp/impl/call.h>
30 #include <grpcpp/impl/codegen/completion_queue.h>
31 #include <grpcpp/impl/grpc_library.h>
32 #include <grpcpp/support/server_callback.h>
33 #include <grpcpp/support/time.h>
34 
35 #include "src/core/lib/gprpp/ref_counted.h"
36 #include "src/core/lib/gprpp/sync.h"
37 #include "src/core/lib/surface/call.h"
38 
39 namespace grpc {
40 
// File-local gRPC library initializer. The default ServerContextBase
// constructor calls summon() on it.
static internal::GrpcLibraryInitializer g_gli_initializer;
42 
43 // CompletionOp
44 
45 class ServerContextBase::CompletionOp final
46     : public internal::CallOpSetInterface {
47  public:
48   // initial refs: one in the server context, one in the cq
49   // must ref the call before calling constructor and after deleting this
CompletionOp(internal::Call * call,::grpc::internal::ServerCallbackCall * callback_controller)50   CompletionOp(internal::Call* call,
51                ::grpc::internal::ServerCallbackCall* callback_controller)
52       : call_(*call),
53         callback_controller_(callback_controller),
54         has_tag_(false),
55         tag_(nullptr),
56         core_cq_tag_(this),
57         refs_(2),
58         finalized_(false),
59         cancelled_(0),
60         done_intercepting_(false) {}
61 
62   // CompletionOp isn't copyable or movable
63   CompletionOp(const CompletionOp&) = delete;
64   CompletionOp& operator=(const CompletionOp&) = delete;
65   CompletionOp(CompletionOp&&) = delete;
66   CompletionOp& operator=(CompletionOp&&) = delete;
67 
~CompletionOp()68   ~CompletionOp() override {
69     if (call_.server_rpc_info()) {
70       call_.server_rpc_info()->Unref();
71     }
72   }
73 
74   void FillOps(internal::Call* call) override;
75 
76   // This should always be arena allocated in the call, so override delete.
77   // But this class is not trivially destructible, so must actually call delete
78   // before allowing the arena to be freed
operator delete(void *,std::size_t size)79   static void operator delete(void* /*ptr*/, std::size_t size) {
80     // Use size to avoid unused-parameter warning since assert seems to be
81     // compiled out and treated as unused in some gcc optimized versions.
82     (void)size;
83     assert(size == sizeof(CompletionOp));
84   }
85 
86   // This operator should never be called as the memory should be freed as part
87   // of the arena destruction. It only exists to provide a matching operator
88   // delete to the operator new so that some compilers will not complain (see
89   // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
90   // there are no tests catching the compiler warning.
operator delete(void *,void *)91   static void operator delete(void*, void*) { assert(0); }
92 
93   bool FinalizeResult(void** tag, bool* status) override;
94 
CheckCancelled(CompletionQueue * cq)95   bool CheckCancelled(CompletionQueue* cq) {
96     cq->TryPluck(this);
97     return CheckCancelledNoPluck();
98   }
CheckCancelledAsync()99   bool CheckCancelledAsync() { return CheckCancelledNoPluck(); }
100 
set_tag(void * tag)101   void set_tag(void* tag) {
102     has_tag_ = true;
103     tag_ = tag;
104   }
105 
set_core_cq_tag(void * core_cq_tag)106   void set_core_cq_tag(void* core_cq_tag) { core_cq_tag_ = core_cq_tag; }
107 
core_cq_tag()108   void* core_cq_tag() override { return core_cq_tag_; }
109 
110   void Unref();
111 
112   // This will be called while interceptors are run if the RPC is a hijacked
113   // RPC. This should set hijacking state for each of the ops.
SetHijackingState()114   void SetHijackingState() override {
115     /* Servers don't allow hijacking */
116     GPR_ASSERT(false);
117   }
118 
119   /* Should be called after interceptors are done running */
ContinueFillOpsAfterInterception()120   void ContinueFillOpsAfterInterception() override {}
121 
122   /* Should be called after interceptors are done running on the finalize result
123    * path */
ContinueFinalizeResultAfterInterception()124   void ContinueFinalizeResultAfterInterception() override {
125     done_intercepting_ = true;
126     if (!has_tag_) {
127       // We don't have a tag to return.
128       Unref();
129       // Unref can delete this, so do not access anything from this afterward.
130       return;
131     }
132     /* Start a phony op so that we can return the tag */
133     GPR_ASSERT(grpc_call_start_batch(call_.call(), nullptr, 0, core_cq_tag_,
134                                      nullptr) == GRPC_CALL_OK);
135   }
136 
137  private:
CheckCancelledNoPluck()138   bool CheckCancelledNoPluck() {
139     grpc_core::MutexLock lock(&mu_);
140     return finalized_ ? (cancelled_ != 0) : false;
141   }
142 
143   internal::Call call_;
144   ::grpc::internal::ServerCallbackCall* const callback_controller_;
145   bool has_tag_;
146   void* tag_;
147   void* core_cq_tag_;
148   grpc_core::RefCount refs_;
149   grpc_core::Mutex mu_;
150   bool finalized_;
151   int cancelled_;  // This is an int (not bool) because it is passed to core
152   bool done_intercepting_;
153   internal::InterceptorBatchMethodsImpl interceptor_methods_;
154 };
155 
Unref()156 void ServerContextBase::CompletionOp::Unref() {
157   if (refs_.Unref()) {
158     grpc_call* call = call_.call();
159     delete this;
160     grpc_call_unref(call);
161   }
162 }
163 
FillOps(internal::Call * call)164 void ServerContextBase::CompletionOp::FillOps(internal::Call* call) {
165   grpc_op ops;
166   ops.op = GRPC_OP_RECV_CLOSE_ON_SERVER;
167   ops.data.recv_close_on_server.cancelled = &cancelled_;
168   ops.flags = 0;
169   ops.reserved = nullptr;
170   interceptor_methods_.SetCall(&call_);
171   interceptor_methods_.SetReverse();
172   interceptor_methods_.SetCallOpSetInterface(this);
173   // The following call_start_batch is internally-generated so no need for an
174   // explanatory log on failure.
175   GPR_ASSERT(grpc_call_start_batch(call->call(), &ops, 1, core_cq_tag_,
176                                    nullptr) == GRPC_CALL_OK);
177   /* No interceptors to run here */
178 }
179 
FinalizeResult(void ** tag,bool * status)180 bool ServerContextBase::CompletionOp::FinalizeResult(void** tag, bool* status) {
181   // Decide whether to do the unref or call the cancel callback within the lock
182   bool do_unref = false;
183   bool has_tag = false;
184   bool call_cancel = false;
185 
186   {
187     grpc_core::MutexLock lock(&mu_);
188     if (done_intercepting_) {
189       // We are done intercepting.
190       has_tag = has_tag_;
191       if (has_tag) {
192         *tag = tag_;
193       }
194       // Release the lock before unreffing as Unref may delete this object
195       do_unref = true;
196     } else {
197       finalized_ = true;
198 
199       // If for some reason the incoming status is false, mark that as a
200       // cancellation.
201       // TODO(vjpai): does this ever happen?
202       if (!*status) {
203         cancelled_ = 1;
204       }
205 
206       call_cancel = (cancelled_ != 0);
207       // Release the lock since we may call a callback and interceptors.
208     }
209   }
210 
211   if (do_unref) {
212     Unref();
213     // Unref can delete this, so do not access anything from this afterward.
214     return has_tag;
215   }
216   if (call_cancel && callback_controller_ != nullptr) {
217     callback_controller_->MaybeCallOnCancel();
218   }
219   /* Add interception point and run through interceptors */
220   interceptor_methods_.AddInterceptionHookPoint(
221       experimental::InterceptionHookPoints::POST_RECV_CLOSE);
222   if (interceptor_methods_.RunInterceptors()) {
223     // No interceptors were run
224     bool has_tag = has_tag_;
225     if (has_tag) {
226       *tag = tag_;
227     }
228     Unref();
229     // Unref can delete this, so do not access anything from this afterward.
230     return has_tag;
231   }
232   // There are interceptors to be run. Return false for now.
233   return false;
234 }
235 
236 // ServerContextBase body
237 
ServerContextBase()238 ServerContextBase::ServerContextBase()
239     : deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)) {
240   g_gli_initializer.summon();
241 }
242 
ServerContextBase(gpr_timespec deadline,grpc_metadata_array * arr)243 ServerContextBase::ServerContextBase(gpr_timespec deadline,
244                                      grpc_metadata_array* arr)
245     : deadline_(deadline) {
246   std::swap(*client_metadata_.arr(), *arr);
247 }
248 
BindDeadlineAndMetadata(gpr_timespec deadline,grpc_metadata_array * arr)249 void ServerContextBase::BindDeadlineAndMetadata(gpr_timespec deadline,
250                                                 grpc_metadata_array* arr) {
251   deadline_ = deadline;
252   std::swap(*client_metadata_.arr(), *arr);
253 }
254 
~ServerContextBase()255 ServerContextBase::~ServerContextBase() {
256   if (completion_op_) {
257     completion_op_->Unref();
258     // Unref can delete completion_op_, so do not access it afterward.
259   }
260   if (rpc_info_) {
261     rpc_info_->Unref();
262   }
263   if (default_reactor_used_.load(std::memory_order_relaxed)) {
264     reinterpret_cast<Reactor*>(&default_reactor_)->~Reactor();
265   }
266 }
267 
~CallWrapper()268 ServerContextBase::CallWrapper::~CallWrapper() {
269   if (call) {
270     // If the ServerContext is part of the call's arena, this could free the
271     // object itself.
272     grpc_call_unref(call);
273   }
274 }
275 
BeginCompletionOp(internal::Call * call,std::function<void (bool)> callback,::grpc::internal::ServerCallbackCall * callback_controller)276 void ServerContextBase::BeginCompletionOp(
277     internal::Call* call, std::function<void(bool)> callback,
278     ::grpc::internal::ServerCallbackCall* callback_controller) {
279   GPR_ASSERT(!completion_op_);
280   if (rpc_info_) {
281     rpc_info_->Ref();
282   }
283   grpc_call_ref(call->call());
284   completion_op_ =
285       new (grpc_call_arena_alloc(call->call(), sizeof(CompletionOp)))
286           CompletionOp(call, callback_controller);
287   if (callback_controller != nullptr) {
288     completion_tag_.Set(call->call(), std::move(callback), completion_op_,
289                         true);
290     completion_op_->set_core_cq_tag(&completion_tag_);
291     completion_op_->set_tag(completion_op_);
292   } else if (has_notify_when_done_tag_) {
293     completion_op_->set_tag(async_notify_when_done_tag_);
294   }
295   call->PerformOps(completion_op_);
296 }
297 
GetCompletionOpTag()298 internal::CompletionQueueTag* ServerContextBase::GetCompletionOpTag() {
299   return static_cast<internal::CompletionQueueTag*>(completion_op_);
300 }
301 
AddInitialMetadata(const std::string & key,const std::string & value)302 void ServerContextBase::AddInitialMetadata(const std::string& key,
303                                            const std::string& value) {
304   initial_metadata_.insert(std::make_pair(key, value));
305 }
306 
AddTrailingMetadata(const std::string & key,const std::string & value)307 void ServerContextBase::AddTrailingMetadata(const std::string& key,
308                                             const std::string& value) {
309   trailing_metadata_.insert(std::make_pair(key, value));
310 }
311 
TryCancel() const312 void ServerContextBase::TryCancel() const {
313   internal::CancelInterceptorBatchMethods cancel_methods;
314   if (rpc_info_) {
315     for (size_t i = 0; i < rpc_info_->interceptors_.size(); i++) {
316       rpc_info_->RunInterceptor(&cancel_methods, i);
317     }
318   }
319   grpc_call_error err =
320       grpc_call_cancel_with_status(call_.call, GRPC_STATUS_CANCELLED,
321                                    "Cancelled on the server side", nullptr);
322   if (err != GRPC_CALL_OK) {
323     gpr_log(GPR_ERROR, "TryCancel failed with: %d", err);
324   }
325 }
326 
IsCancelled() const327 bool ServerContextBase::IsCancelled() const {
328   if (completion_tag_) {
329     // When using callback API, this result is always valid.
330     return completion_op_->CheckCancelledAsync();
331   } else if (has_notify_when_done_tag_) {
332     // When using async API, the result is only valid
333     // if the tag has already been delivered at the completion queue
334     return completion_op_ && completion_op_->CheckCancelledAsync();
335   } else {
336     // when using sync API, the result is always valid
337     return completion_op_ && completion_op_->CheckCancelled(cq_);
338   }
339 }
340 
set_compression_algorithm(grpc_compression_algorithm algorithm)341 void ServerContextBase::set_compression_algorithm(
342     grpc_compression_algorithm algorithm) {
343   compression_algorithm_ = algorithm;
344   const char* algorithm_name = nullptr;
345   if (!grpc_compression_algorithm_name(algorithm, &algorithm_name)) {
346     gpr_log(GPR_ERROR, "Name for compression algorithm '%d' unknown.",
347             algorithm);
348     abort();
349   }
350   GPR_ASSERT(algorithm_name != nullptr);
351   AddInitialMetadata(GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY, algorithm_name);
352 }
353 
peer() const354 std::string ServerContextBase::peer() const {
355   std::string peer;
356   if (call_.call) {
357     char* c_peer = grpc_call_get_peer(call_.call);
358     peer = c_peer;
359     gpr_free(c_peer);
360   }
361   return peer;
362 }
363 
census_context() const364 const struct census_context* ServerContextBase::census_context() const {
365   return call_.call == nullptr ? nullptr
366                                : grpc_census_call_get_context(call_.call);
367 }
368 
SetLoadReportingCosts(const std::vector<std::string> & cost_data)369 void ServerContextBase::SetLoadReportingCosts(
370     const std::vector<std::string>& cost_data) {
371   if (call_.call == nullptr) return;
372   for (const auto& cost_datum : cost_data) {
373     AddTrailingMetadata(GRPC_LB_COST_MD_KEY, cost_datum);
374   }
375 }
376 
377 }  // namespace grpc
378