• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #ifndef GRPCPP_SUPPORT_METHOD_HANDLER_H
20 #define GRPCPP_SUPPORT_METHOD_HANDLER_H
21 
22 #include <grpc/byte_buffer.h>
23 #include <grpcpp/impl/rpc_service_method.h>
24 #include <grpcpp/support/byte_buffer.h>
25 #include <grpcpp/support/sync_stream.h>
26 
27 #include "absl/log/absl_check.h"
28 
29 namespace grpc {
30 
31 namespace internal {
32 
33 // Invoke the method handler, fill in the status, and
34 // return whether or not we finished safely (without an exception).
35 // Note that exception handling is 0-cost in most compiler/library
36 // implementations (except when an exception is actually thrown),
37 // so this process doesn't require additional overhead in the common case.
38 // Additionally, we don't need to return if we caught an exception or not;
39 // the handling is the same in either case.
40 template <class Callable>
CatchingFunctionHandler(Callable && handler)41 ::grpc::Status CatchingFunctionHandler(Callable&& handler) {
42 #if GRPC_ALLOW_EXCEPTIONS
43   try {
44     return handler();
45   } catch (...) {
46     return grpc::Status(grpc::StatusCode::UNKNOWN,
47                         "Unexpected error in RPC handling");
48   }
49 #else   // GRPC_ALLOW_EXCEPTIONS
50   return handler();
51 #endif  // GRPC_ALLOW_EXCEPTIONS
52 }
53 
54 /// A helper function with reduced templating to do the common work needed to
55 /// actually send the server response. Uses non-const parameter for Status since
56 /// this should only ever be called from the end of the RunHandler method.
57 
58 template <class ResponseType>
UnaryRunHandlerHelper(const MethodHandler::HandlerParameter & param,ResponseType * rsp,grpc::Status & status)59 void UnaryRunHandlerHelper(const MethodHandler::HandlerParameter& param,
60                            ResponseType* rsp, grpc::Status& status) {
61   ABSL_CHECK(!param.server_context->sent_initial_metadata_);
62   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
63                             grpc::internal::CallOpSendMessage,
64                             grpc::internal::CallOpServerSendStatus>
65       ops;
66   ops.SendInitialMetadata(&param.server_context->initial_metadata_,
67                           param.server_context->initial_metadata_flags());
68   if (param.server_context->compression_level_set()) {
69     ops.set_compression_level(param.server_context->compression_level());
70   }
71   if (status.ok()) {
72     status = ops.SendMessagePtr(rsp);
73   }
74   ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
75   param.call->PerformOps(&ops);
76   param.call->cq()->Pluck(&ops);
77 }
78 
79 /// A helper function with reduced templating to do deserializing.
80 
81 template <class RequestType>
UnaryDeserializeHelper(grpc_byte_buffer * req,grpc::Status * status,RequestType * request)82 void* UnaryDeserializeHelper(grpc_byte_buffer* req, grpc::Status* status,
83                              RequestType* request) {
84   grpc::ByteBuffer buf;
85   buf.set_buffer(req);
86   *status = grpc::SerializationTraits<RequestType>::Deserialize(
87       &buf, static_cast<RequestType*>(request));
88   buf.Release();
89   if (status->ok()) {
90     return request;
91   }
92   request->~RequestType();
93   return nullptr;
94 }
95 
96 /// A wrapper class of an application provided rpc method handler.
97 template <class ServiceType, class RequestType, class ResponseType,
98           class BaseRequestType = RequestType,
99           class BaseResponseType = ResponseType>
100 class RpcMethodHandler : public grpc::internal::MethodHandler {
101  public:
RpcMethodHandler(std::function<grpc::Status (ServiceType *,grpc::ServerContext *,const RequestType *,ResponseType *)> func,ServiceType * service)102   RpcMethodHandler(
103       std::function<grpc::Status(ServiceType*, grpc::ServerContext*,
104                                  const RequestType*, ResponseType*)>
105           func,
106       ServiceType* service)
107       : func_(func), service_(service) {}
108 
RunHandler(const HandlerParameter & param)109   void RunHandler(const HandlerParameter& param) final {
110     ResponseType rsp;
111     grpc::Status status = param.status;
112     if (status.ok()) {
113       status = CatchingFunctionHandler([this, &param, &rsp] {
114         return func_(service_,
115                      static_cast<grpc::ServerContext*>(param.server_context),
116                      static_cast<RequestType*>(param.request), &rsp);
117       });
118       static_cast<RequestType*>(param.request)->~RequestType();
119     }
120     UnaryRunHandlerHelper(param, static_cast<BaseResponseType*>(&rsp), status);
121   }
122 
Deserialize(grpc_call * call,grpc_byte_buffer * req,grpc::Status * status,void **)123   void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
124                     grpc::Status* status, void** /*handler_data*/) final {
125     auto* request =
126         new (grpc_call_arena_alloc(call, sizeof(RequestType))) RequestType;
127     return UnaryDeserializeHelper(req, status,
128                                   static_cast<BaseRequestType*>(request));
129   }
130 
131  private:
132   /// Application provided rpc handler function.
133   std::function<grpc::Status(ServiceType*, grpc::ServerContext*,
134                              const RequestType*, ResponseType*)>
135       func_;
136   // The class the above handler function lives in.
137   ServiceType* service_;
138 };
139 
140 /// A wrapper class of an application provided client streaming handler.
141 template <class ServiceType, class RequestType, class ResponseType>
142 class ClientStreamingHandler : public grpc::internal::MethodHandler {
143  public:
ClientStreamingHandler(std::function<grpc::Status (ServiceType *,grpc::ServerContext *,ServerReader<RequestType> *,ResponseType *)> func,ServiceType * service)144   ClientStreamingHandler(
145       std::function<grpc::Status(ServiceType*, grpc::ServerContext*,
146                                  ServerReader<RequestType>*, ResponseType*)>
147           func,
148       ServiceType* service)
149       : func_(func), service_(service) {}
150 
RunHandler(const HandlerParameter & param)151   void RunHandler(const HandlerParameter& param) final {
152     ServerReader<RequestType> reader(
153         param.call, static_cast<grpc::ServerContext*>(param.server_context));
154     ResponseType rsp;
155     grpc::Status status =
156         CatchingFunctionHandler([this, &param, &reader, &rsp] {
157           return func_(service_,
158                        static_cast<grpc::ServerContext*>(param.server_context),
159                        &reader, &rsp);
160         });
161 
162     grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
163                               grpc::internal::CallOpSendMessage,
164                               grpc::internal::CallOpServerSendStatus>
165         ops;
166     if (!param.server_context->sent_initial_metadata_) {
167       ops.SendInitialMetadata(&param.server_context->initial_metadata_,
168                               param.server_context->initial_metadata_flags());
169       if (param.server_context->compression_level_set()) {
170         ops.set_compression_level(param.server_context->compression_level());
171       }
172     }
173     if (status.ok()) {
174       status = ops.SendMessagePtr(&rsp);
175     }
176     ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
177     param.call->PerformOps(&ops);
178     param.call->cq()->Pluck(&ops);
179   }
180 
181  private:
182   std::function<grpc::Status(ServiceType*, grpc::ServerContext*,
183                              ServerReader<RequestType>*, ResponseType*)>
184       func_;
185   ServiceType* service_;
186 };
187 
188 /// A wrapper class of an application provided server streaming handler.
189 template <class ServiceType, class RequestType, class ResponseType>
190 class ServerStreamingHandler : public grpc::internal::MethodHandler {
191  public:
ServerStreamingHandler(std::function<grpc::Status (ServiceType *,grpc::ServerContext *,const RequestType *,ServerWriter<ResponseType> *)> func,ServiceType * service)192   ServerStreamingHandler(std::function<grpc::Status(
193                              ServiceType*, grpc::ServerContext*,
194                              const RequestType*, ServerWriter<ResponseType>*)>
195                              func,
196                          ServiceType* service)
197       : func_(func), service_(service) {}
198 
RunHandler(const HandlerParameter & param)199   void RunHandler(const HandlerParameter& param) final {
200     grpc::Status status = param.status;
201     if (status.ok()) {
202       ServerWriter<ResponseType> writer(
203           param.call, static_cast<grpc::ServerContext*>(param.server_context));
204       status = CatchingFunctionHandler([this, &param, &writer] {
205         return func_(service_,
206                      static_cast<grpc::ServerContext*>(param.server_context),
207                      static_cast<RequestType*>(param.request), &writer);
208       });
209       static_cast<RequestType*>(param.request)->~RequestType();
210     }
211 
212     grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
213                               grpc::internal::CallOpServerSendStatus>
214         ops;
215     if (!param.server_context->sent_initial_metadata_) {
216       ops.SendInitialMetadata(&param.server_context->initial_metadata_,
217                               param.server_context->initial_metadata_flags());
218       if (param.server_context->compression_level_set()) {
219         ops.set_compression_level(param.server_context->compression_level());
220       }
221     }
222     ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
223     param.call->PerformOps(&ops);
224     if (param.server_context->has_pending_ops_) {
225       param.call->cq()->Pluck(&param.server_context->pending_ops_);
226     }
227     param.call->cq()->Pluck(&ops);
228   }
229 
Deserialize(grpc_call * call,grpc_byte_buffer * req,grpc::Status * status,void **)230   void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
231                     grpc::Status* status, void** /*handler_data*/) final {
232     grpc::ByteBuffer buf;
233     buf.set_buffer(req);
234     auto* request =
235         new (grpc_call_arena_alloc(call, sizeof(RequestType))) RequestType();
236     *status =
237         grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
238     buf.Release();
239     if (status->ok()) {
240       return request;
241     }
242     request->~RequestType();
243     return nullptr;
244   }
245 
246  private:
247   std::function<grpc::Status(ServiceType*, grpc::ServerContext*,
248                              const RequestType*, ServerWriter<ResponseType>*)>
249       func_;
250   ServiceType* service_;
251 };
252 
253 /// A wrapper class of an application provided bidi-streaming handler.
254 /// This also applies to server-streamed implementation of a unary method
255 /// with the additional requirement that such methods must have done a
256 /// write for status to be ok
257 /// Since this is used by more than 1 class, the service is not passed in.
258 /// Instead, it is expected to be an implicitly-captured argument of func
259 /// (through bind or something along those lines)
260 template <class Streamer, bool WriteNeeded>
261 class TemplatedBidiStreamingHandler : public grpc::internal::MethodHandler {
262  public:
TemplatedBidiStreamingHandler(std::function<grpc::Status (grpc::ServerContext *,Streamer *)> func)263   explicit TemplatedBidiStreamingHandler(
264       std::function<grpc::Status(grpc::ServerContext*, Streamer*)> func)
265       : func_(func), write_needed_(WriteNeeded) {}
266 
RunHandler(const HandlerParameter & param)267   void RunHandler(const HandlerParameter& param) final {
268     Streamer stream(param.call,
269                     static_cast<grpc::ServerContext*>(param.server_context));
270     grpc::Status status = CatchingFunctionHandler([this, &param, &stream] {
271       return func_(static_cast<grpc::ServerContext*>(param.server_context),
272                    &stream);
273     });
274 
275     grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
276                               grpc::internal::CallOpServerSendStatus>
277         ops;
278     if (!param.server_context->sent_initial_metadata_) {
279       ops.SendInitialMetadata(&param.server_context->initial_metadata_,
280                               param.server_context->initial_metadata_flags());
281       if (param.server_context->compression_level_set()) {
282         ops.set_compression_level(param.server_context->compression_level());
283       }
284       if (write_needed_ && status.ok()) {
285         // If we needed a write but never did one, we need to mark the
286         // status as a fail
287         status = grpc::Status(grpc::StatusCode::INTERNAL,
288                               "Service did not provide response message");
289       }
290     }
291     ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
292     param.call->PerformOps(&ops);
293     if (param.server_context->has_pending_ops_) {
294       param.call->cq()->Pluck(&param.server_context->pending_ops_);
295     }
296     param.call->cq()->Pluck(&ops);
297   }
298 
299  private:
300   std::function<grpc::Status(grpc::ServerContext*, Streamer*)> func_;
301   const bool write_needed_;
302 };
303 
304 template <class ServiceType, class RequestType, class ResponseType>
305 class BidiStreamingHandler
306     : public TemplatedBidiStreamingHandler<
307           ServerReaderWriter<ResponseType, RequestType>, false> {
308  public:
BidiStreamingHandler(std::function<grpc::Status (ServiceType *,grpc::ServerContext *,ServerReaderWriter<ResponseType,RequestType> *)> func,ServiceType * service)309   BidiStreamingHandler(std::function<grpc::Status(
310                            ServiceType*, grpc::ServerContext*,
311                            ServerReaderWriter<ResponseType, RequestType>*)>
312                            func,
313                        ServiceType* service)
314       // TODO(vjpai): When gRPC supports C++14, move-capture func in the below
315       : TemplatedBidiStreamingHandler<
316             ServerReaderWriter<ResponseType, RequestType>, false>(
317             [func, service](
318                 grpc::ServerContext* ctx,
319                 ServerReaderWriter<ResponseType, RequestType>* streamer) {
320               return func(service, ctx, streamer);
321             }) {}
322 };
323 
324 template <class RequestType, class ResponseType>
325 class StreamedUnaryHandler
326     : public TemplatedBidiStreamingHandler<
327           ServerUnaryStreamer<RequestType, ResponseType>, true> {
328  public:
StreamedUnaryHandler(std::function<grpc::Status (grpc::ServerContext *,ServerUnaryStreamer<RequestType,ResponseType> *)> func)329   explicit StreamedUnaryHandler(
330       std::function<
331           grpc::Status(grpc::ServerContext*,
332                        ServerUnaryStreamer<RequestType, ResponseType>*)>
333           func)
334       : TemplatedBidiStreamingHandler<
335             ServerUnaryStreamer<RequestType, ResponseType>, true>(
336             std::move(func)) {}
337 };
338 
339 template <class RequestType, class ResponseType>
340 class SplitServerStreamingHandler
341     : public TemplatedBidiStreamingHandler<
342           ServerSplitStreamer<RequestType, ResponseType>, false> {
343  public:
SplitServerStreamingHandler(std::function<grpc::Status (grpc::ServerContext *,ServerSplitStreamer<RequestType,ResponseType> *)> func)344   explicit SplitServerStreamingHandler(
345       std::function<
346           grpc::Status(grpc::ServerContext*,
347                        ServerSplitStreamer<RequestType, ResponseType>*)>
348           func)
349       : TemplatedBidiStreamingHandler<
350             ServerSplitStreamer<RequestType, ResponseType>, false>(
351             std::move(func)) {}
352 };
353 
354 /// General method handler class for errors that prevent real method use
355 /// e.g., handle unknown method by returning UNIMPLEMENTED error.
356 template <grpc::StatusCode code>
357 class ErrorMethodHandler : public grpc::internal::MethodHandler {
358  public:
ErrorMethodHandler(const std::string & message)359   explicit ErrorMethodHandler(const std::string& message) : message_(message) {}
360 
361   template <class T>
FillOps(grpc::ServerContextBase * context,const std::string & message,T * ops)362   static void FillOps(grpc::ServerContextBase* context,
363                       const std::string& message, T* ops) {
364     grpc::Status status(code, message);
365     if (!context->sent_initial_metadata_) {
366       ops->SendInitialMetadata(&context->initial_metadata_,
367                                context->initial_metadata_flags());
368       if (context->compression_level_set()) {
369         ops->set_compression_level(context->compression_level());
370       }
371       context->sent_initial_metadata_ = true;
372     }
373     ops->ServerSendStatus(&context->trailing_metadata_, status);
374   }
375 
RunHandler(const HandlerParameter & param)376   void RunHandler(const HandlerParameter& param) final {
377     grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
378                               grpc::internal::CallOpServerSendStatus>
379         ops;
380     FillOps(param.server_context, message_, &ops);
381     param.call->PerformOps(&ops);
382     param.call->cq()->Pluck(&ops);
383   }
384 
Deserialize(grpc_call *,grpc_byte_buffer * req,grpc::Status *,void **)385   void* Deserialize(grpc_call* /*call*/, grpc_byte_buffer* req,
386                     grpc::Status* /*status*/, void** /*handler_data*/) final {
387     // We have to destroy any request payload
388     if (req != nullptr) {
389       grpc_byte_buffer_destroy(req);
390     }
391     return nullptr;
392   }
393 
394  private:
395   const std::string message_;
396 };
397 
398 typedef ErrorMethodHandler<grpc::StatusCode::UNIMPLEMENTED>
399     UnknownMethodHandler;
400 typedef ErrorMethodHandler<grpc::StatusCode::RESOURCE_EXHAUSTED>
401     ResourceExhaustedHandler;
402 
403 }  // namespace internal
404 }  // namespace grpc
405 
406 #endif  // GRPCPP_SUPPORT_METHOD_HANDLER_H
407