• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpcpp/impl/codegen/server_context.h>
20 
21 #include <algorithm>
22 #include <utility>
23 
24 #include <grpc/compression.h>
25 #include <grpc/grpc.h>
26 #include <grpc/load_reporting.h>
27 #include <grpc/support/alloc.h>
28 #include <grpc/support/log.h>
29 #include <grpcpp/impl/call.h>
30 #include <grpcpp/impl/codegen/completion_queue.h>
31 #include <grpcpp/support/server_callback.h>
32 #include <grpcpp/support/time.h>
33 
34 #include "src/core/lib/gprpp/ref_counted.h"
35 #include "src/core/lib/gprpp/sync.h"
36 #include "src/core/lib/surface/call.h"
37 
38 namespace grpc {
39 
40 // CompletionOp
41 
42 class ServerContextBase::CompletionOp final
43     : public internal::CallOpSetInterface {
44  public:
45   // initial refs: one in the server context, one in the cq
46   // must ref the call before calling constructor and after deleting this
CompletionOp(internal::Call * call,::grpc::internal::ServerCallbackCall * callback_controller)47   CompletionOp(internal::Call* call,
48                ::grpc::internal::ServerCallbackCall* callback_controller)
49       : call_(*call),
50         callback_controller_(callback_controller),
51         has_tag_(false),
52         tag_(nullptr),
53         core_cq_tag_(this),
54         refs_(2),
55         finalized_(false),
56         cancelled_(0),
57         done_intercepting_(false) {}
58 
59   // CompletionOp isn't copyable or movable
60   CompletionOp(const CompletionOp&) = delete;
61   CompletionOp& operator=(const CompletionOp&) = delete;
62   CompletionOp(CompletionOp&&) = delete;
63   CompletionOp& operator=(CompletionOp&&) = delete;
64 
~CompletionOp()65   ~CompletionOp() override {
66     if (call_.server_rpc_info()) {
67       call_.server_rpc_info()->Unref();
68     }
69   }
70 
71   void FillOps(internal::Call* call) override;
72 
73   // This should always be arena allocated in the call, so override delete.
74   // But this class is not trivially destructible, so must actually call delete
75   // before allowing the arena to be freed
operator delete(void *,std::size_t size)76   static void operator delete(void* /*ptr*/, std::size_t size) {
77     // Use size to avoid unused-parameter warning since assert seems to be
78     // compiled out and treated as unused in some gcc optimized versions.
79     (void)size;
80     assert(size == sizeof(CompletionOp));
81   }
82 
83   // This operator should never be called as the memory should be freed as part
84   // of the arena destruction. It only exists to provide a matching operator
85   // delete to the operator new so that some compilers will not complain (see
86   // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
87   // there are no tests catching the compiler warning.
operator delete(void *,void *)88   static void operator delete(void*, void*) { assert(0); }
89 
90   bool FinalizeResult(void** tag, bool* status) override;
91 
CheckCancelled(CompletionQueue * cq)92   bool CheckCancelled(CompletionQueue* cq) {
93     cq->TryPluck(this);
94     return CheckCancelledNoPluck();
95   }
CheckCancelledAsync()96   bool CheckCancelledAsync() { return CheckCancelledNoPluck(); }
97 
set_tag(void * tag)98   void set_tag(void* tag) {
99     has_tag_ = true;
100     tag_ = tag;
101   }
102 
set_core_cq_tag(void * core_cq_tag)103   void set_core_cq_tag(void* core_cq_tag) { core_cq_tag_ = core_cq_tag; }
104 
core_cq_tag()105   void* core_cq_tag() override { return core_cq_tag_; }
106 
107   void Unref();
108 
109   // This will be called while interceptors are run if the RPC is a hijacked
110   // RPC. This should set hijacking state for each of the ops.
SetHijackingState()111   void SetHijackingState() override {
112     /* Servers don't allow hijacking */
113     GPR_ASSERT(false);
114   }
115 
116   /* Should be called after interceptors are done running */
ContinueFillOpsAfterInterception()117   void ContinueFillOpsAfterInterception() override {}
118 
119   /* Should be called after interceptors are done running on the finalize result
120    * path */
ContinueFinalizeResultAfterInterception()121   void ContinueFinalizeResultAfterInterception() override {
122     done_intercepting_ = true;
123     if (!has_tag_) {
124       // We don't have a tag to return.
125       Unref();
126       // Unref can delete this, so do not access anything from this afterward.
127       return;
128     }
129     /* Start a dummy op so that we can return the tag */
130     GPR_ASSERT(grpc_call_start_batch(call_.call(), nullptr, 0, core_cq_tag_,
131                                      nullptr) == GRPC_CALL_OK);
132   }
133 
134  private:
CheckCancelledNoPluck()135   bool CheckCancelledNoPluck() {
136     grpc_core::MutexLock lock(&mu_);
137     return finalized_ ? (cancelled_ != 0) : false;
138   }
139 
140   internal::Call call_;
141   ::grpc::internal::ServerCallbackCall* const callback_controller_;
142   bool has_tag_;
143   void* tag_;
144   void* core_cq_tag_;
145   grpc_core::RefCount refs_;
146   grpc_core::Mutex mu_;
147   bool finalized_;
148   int cancelled_;  // This is an int (not bool) because it is passed to core
149   bool done_intercepting_;
150   internal::InterceptorBatchMethodsImpl interceptor_methods_;
151 };
152 
Unref()153 void ServerContextBase::CompletionOp::Unref() {
154   if (refs_.Unref()) {
155     grpc_call* call = call_.call();
156     delete this;
157     grpc_call_unref(call);
158   }
159 }
160 
FillOps(internal::Call * call)161 void ServerContextBase::CompletionOp::FillOps(internal::Call* call) {
162   grpc_op ops;
163   ops.op = GRPC_OP_RECV_CLOSE_ON_SERVER;
164   ops.data.recv_close_on_server.cancelled = &cancelled_;
165   ops.flags = 0;
166   ops.reserved = nullptr;
167   interceptor_methods_.SetCall(&call_);
168   interceptor_methods_.SetReverse();
169   interceptor_methods_.SetCallOpSetInterface(this);
170   // The following call_start_batch is internally-generated so no need for an
171   // explanatory log on failure.
172   GPR_ASSERT(grpc_call_start_batch(call->call(), &ops, 1, core_cq_tag_,
173                                    nullptr) == GRPC_CALL_OK);
174   /* No interceptors to run here */
175 }
176 
FinalizeResult(void ** tag,bool * status)177 bool ServerContextBase::CompletionOp::FinalizeResult(void** tag, bool* status) {
178   // Decide whether to do the unref or call the cancel callback within the lock
179   bool do_unref = false;
180   bool has_tag = false;
181   bool call_cancel = false;
182 
183   {
184     grpc_core::MutexLock lock(&mu_);
185     if (done_intercepting_) {
186       // We are done intercepting.
187       has_tag = has_tag_;
188       if (has_tag) {
189         *tag = tag_;
190       }
191       // Release the lock before unreffing as Unref may delete this object
192       do_unref = true;
193     } else {
194       finalized_ = true;
195 
196       // If for some reason the incoming status is false, mark that as a
197       // cancellation.
198       // TODO(vjpai): does this ever happen?
199       if (!*status) {
200         cancelled_ = 1;
201       }
202 
203       call_cancel = (cancelled_ != 0);
204       // Release the lock since we may call a callback and interceptors.
205     }
206   }
207 
208   if (do_unref) {
209     Unref();
210     // Unref can delete this, so do not access anything from this afterward.
211     return has_tag;
212   }
213   if (call_cancel && callback_controller_ != nullptr) {
214     callback_controller_->MaybeCallOnCancel();
215   }
216   /* Add interception point and run through interceptors */
217   interceptor_methods_.AddInterceptionHookPoint(
218       experimental::InterceptionHookPoints::POST_RECV_CLOSE);
219   if (interceptor_methods_.RunInterceptors()) {
220     // No interceptors were run
221     bool has_tag = has_tag_;
222     if (has_tag) {
223       *tag = tag_;
224     }
225     Unref();
226     // Unref can delete this, so do not access anything from this afterward.
227     return has_tag;
228   }
229   // There are interceptors to be run. Return false for now.
230   return false;
231 }
232 
233 // ServerContextBase body
234 
ServerContextBase()235 ServerContextBase::ServerContextBase()
236     : deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)) {}
237 
ServerContextBase(gpr_timespec deadline,grpc_metadata_array * arr)238 ServerContextBase::ServerContextBase(gpr_timespec deadline,
239                                      grpc_metadata_array* arr)
240     : deadline_(deadline) {
241   std::swap(*client_metadata_.arr(), *arr);
242 }
243 
BindDeadlineAndMetadata(gpr_timespec deadline,grpc_metadata_array * arr)244 void ServerContextBase::BindDeadlineAndMetadata(gpr_timespec deadline,
245                                                 grpc_metadata_array* arr) {
246   deadline_ = deadline;
247   std::swap(*client_metadata_.arr(), *arr);
248 }
249 
~ServerContextBase()250 ServerContextBase::~ServerContextBase() {
251   if (completion_op_) {
252     completion_op_->Unref();
253     // Unref can delete completion_op_, so do not access it afterward.
254   }
255   if (rpc_info_) {
256     rpc_info_->Unref();
257   }
258   if (default_reactor_used_.load(std::memory_order_relaxed)) {
259     reinterpret_cast<Reactor*>(&default_reactor_)->~Reactor();
260   }
261 }
262 
~CallWrapper()263 ServerContextBase::CallWrapper::~CallWrapper() {
264   if (call) {
265     // If the ServerContext is part of the call's arena, this could free the
266     // object itself.
267     grpc_call_unref(call);
268   }
269 }
270 
BeginCompletionOp(internal::Call * call,std::function<void (bool)> callback,::grpc::internal::ServerCallbackCall * callback_controller)271 void ServerContextBase::BeginCompletionOp(
272     internal::Call* call, std::function<void(bool)> callback,
273     ::grpc::internal::ServerCallbackCall* callback_controller) {
274   GPR_ASSERT(!completion_op_);
275   if (rpc_info_) {
276     rpc_info_->Ref();
277   }
278   grpc_call_ref(call->call());
279   completion_op_ =
280       new (grpc_call_arena_alloc(call->call(), sizeof(CompletionOp)))
281           CompletionOp(call, callback_controller);
282   if (callback_controller != nullptr) {
283     completion_tag_.Set(call->call(), std::move(callback), completion_op_,
284                         true);
285     completion_op_->set_core_cq_tag(&completion_tag_);
286     completion_op_->set_tag(completion_op_);
287   } else if (has_notify_when_done_tag_) {
288     completion_op_->set_tag(async_notify_when_done_tag_);
289   }
290   call->PerformOps(completion_op_);
291 }
292 
GetCompletionOpTag()293 internal::CompletionQueueTag* ServerContextBase::GetCompletionOpTag() {
294   return static_cast<internal::CompletionQueueTag*>(completion_op_);
295 }
296 
AddInitialMetadata(const std::string & key,const std::string & value)297 void ServerContextBase::AddInitialMetadata(const std::string& key,
298                                            const std::string& value) {
299   initial_metadata_.insert(std::make_pair(key, value));
300 }
301 
AddTrailingMetadata(const std::string & key,const std::string & value)302 void ServerContextBase::AddTrailingMetadata(const std::string& key,
303                                             const std::string& value) {
304   trailing_metadata_.insert(std::make_pair(key, value));
305 }
306 
TryCancel() const307 void ServerContextBase::TryCancel() const {
308   internal::CancelInterceptorBatchMethods cancel_methods;
309   if (rpc_info_) {
310     for (size_t i = 0; i < rpc_info_->interceptors_.size(); i++) {
311       rpc_info_->RunInterceptor(&cancel_methods, i);
312     }
313   }
314   grpc_call_error err =
315       grpc_call_cancel_with_status(call_.call, GRPC_STATUS_CANCELLED,
316                                    "Cancelled on the server side", nullptr);
317   if (err != GRPC_CALL_OK) {
318     gpr_log(GPR_ERROR, "TryCancel failed with: %d", err);
319   }
320 }
321 
IsCancelled() const322 bool ServerContextBase::IsCancelled() const {
323   if (completion_tag_) {
324     // When using callback API, this result is always valid.
325     return completion_op_->CheckCancelledAsync();
326   } else if (has_notify_when_done_tag_) {
327     // When using async API, the result is only valid
328     // if the tag has already been delivered at the completion queue
329     return completion_op_ && completion_op_->CheckCancelledAsync();
330   } else {
331     // when using sync API, the result is always valid
332     return completion_op_ && completion_op_->CheckCancelled(cq_);
333   }
334 }
335 
set_compression_algorithm(grpc_compression_algorithm algorithm)336 void ServerContextBase::set_compression_algorithm(
337     grpc_compression_algorithm algorithm) {
338   compression_algorithm_ = algorithm;
339   const char* algorithm_name = nullptr;
340   if (!grpc_compression_algorithm_name(algorithm, &algorithm_name)) {
341     gpr_log(GPR_ERROR, "Name for compression algorithm '%d' unknown.",
342             algorithm);
343     abort();
344   }
345   GPR_ASSERT(algorithm_name != nullptr);
346   AddInitialMetadata(GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY, algorithm_name);
347 }
348 
peer() const349 std::string ServerContextBase::peer() const {
350   std::string peer;
351   if (call_.call) {
352     char* c_peer = grpc_call_get_peer(call_.call);
353     peer = c_peer;
354     gpr_free(c_peer);
355   }
356   return peer;
357 }
358 
census_context() const359 const struct census_context* ServerContextBase::census_context() const {
360   return call_.call == nullptr ? nullptr
361                                : grpc_census_call_get_context(call_.call);
362 }
363 
SetLoadReportingCosts(const std::vector<std::string> & cost_data)364 void ServerContextBase::SetLoadReportingCosts(
365     const std::vector<std::string>& cost_data) {
366   if (call_.call == nullptr) return;
367   for (const auto& cost_datum : cost_data) {
368     AddTrailingMetadata(GRPC_LB_COST_MD_KEY, cost_datum);
369   }
370 }
371 
372 }  // namespace grpc
373