• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPCPP_IMPL_CODEGEN_METHOD_HANDLER_H
20 #define GRPCPP_IMPL_CODEGEN_METHOD_HANDLER_H
21 
#include <functional>
#include <utility>

#include <grpcpp/impl/codegen/byte_buffer.h>
#include <grpcpp/impl/codegen/core_codegen_interface.h>
#include <grpcpp/impl/codegen/rpc_service_method.h>
#include <grpcpp/impl/codegen/sync_stream.h>
26 
27 namespace grpc {
28 
29 namespace internal {
30 
31 // Invoke the method handler, fill in the status, and
32 // return whether or not we finished safely (without an exception).
33 // Note that exception handling is 0-cost in most compiler/library
34 // implementations (except when an exception is actually thrown),
35 // so this process doesn't require additional overhead in the common case.
36 // Additionally, we don't need to return if we caught an exception or not;
37 // the handling is the same in either case.
38 template <class Callable>
CatchingFunctionHandler(Callable && handler)39 ::grpc::Status CatchingFunctionHandler(Callable&& handler) {
40 #if GRPC_ALLOW_EXCEPTIONS
41   try {
42     return handler();
43   } catch (...) {
44     return ::grpc::Status(::grpc::StatusCode::UNKNOWN,
45                           "Unexpected error in RPC handling");
46   }
47 #else   // GRPC_ALLOW_EXCEPTIONS
48   return handler();
49 #endif  // GRPC_ALLOW_EXCEPTIONS
50 }
51 
52 /// A helper function with reduced templating to do the common work needed to
53 /// actually send the server response. Uses non-const parameter for Status since
54 /// this should only ever be called from the end of the RunHandler method.
55 
56 template <class ResponseType>
UnaryRunHandlerHelper(const MethodHandler::HandlerParameter & param,ResponseType * rsp,::grpc::Status & status)57 void UnaryRunHandlerHelper(const MethodHandler::HandlerParameter& param,
58                            ResponseType* rsp, ::grpc::Status& status) {
59   GPR_CODEGEN_ASSERT(!param.server_context->sent_initial_metadata_);
60   ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
61                               ::grpc::internal::CallOpSendMessage,
62                               ::grpc::internal::CallOpServerSendStatus>
63       ops;
64   ops.SendInitialMetadata(&param.server_context->initial_metadata_,
65                           param.server_context->initial_metadata_flags());
66   if (param.server_context->compression_level_set()) {
67     ops.set_compression_level(param.server_context->compression_level());
68   }
69   if (status.ok()) {
70     status = ops.SendMessagePtr(rsp);
71   }
72   ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
73   param.call->PerformOps(&ops);
74   param.call->cq()->Pluck(&ops);
75 }
76 
77 /// A helper function with reduced templating to do deserializing.
78 
79 template <class RequestType>
UnaryDeserializeHelper(grpc_call * call,grpc_byte_buffer * req,::grpc::Status * status,RequestType * request)80 void* UnaryDeserializeHelper(grpc_call* call, grpc_byte_buffer* req,
81                              ::grpc::Status* status, RequestType* request) {
82   ::grpc::ByteBuffer buf;
83   buf.set_buffer(req);
84   *status = ::grpc::SerializationTraits<RequestType>::Deserialize(
85       &buf, static_cast<RequestType*>(request));
86   buf.Release();
87   if (status->ok()) {
88     return request;
89   }
90   request->~RequestType();
91   return nullptr;
92 }
93 
94 /// A wrapper class of an application provided rpc method handler.
95 template <class ServiceType, class RequestType, class ResponseType,
96           class BaseRequestType = RequestType,
97           class BaseResponseType = ResponseType>
98 class RpcMethodHandler : public ::grpc::internal::MethodHandler {
99  public:
RpcMethodHandler(std::function<::grpc::Status (ServiceType *,::grpc::ServerContext *,const RequestType *,ResponseType *)> func,ServiceType * service)100   RpcMethodHandler(
101       std::function<::grpc::Status(ServiceType*, ::grpc::ServerContext*,
102                                    const RequestType*, ResponseType*)>
103           func,
104       ServiceType* service)
105       : func_(func), service_(service) {}
106 
RunHandler(const HandlerParameter & param)107   void RunHandler(const HandlerParameter& param) final {
108     ResponseType rsp;
109     ::grpc::Status status = param.status;
110     if (status.ok()) {
111       status = CatchingFunctionHandler([this, &param, &rsp] {
112         return func_(service_,
113                      static_cast<::grpc::ServerContext*>(param.server_context),
114                      static_cast<RequestType*>(param.request), &rsp);
115       });
116       static_cast<RequestType*>(param.request)->~RequestType();
117     }
118     UnaryRunHandlerHelper(param, static_cast<BaseResponseType*>(&rsp), status);
119   }
120 
Deserialize(grpc_call * call,grpc_byte_buffer * req,::grpc::Status * status,void **)121   void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
122                     ::grpc::Status* status, void** /*handler_data*/) final {
123     auto* request =
124         new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
125             call, sizeof(RequestType))) RequestType;
126     return UnaryDeserializeHelper(call, req, status,
127                                   static_cast<BaseRequestType*>(request));
128   }
129 
130  private:
131   /// Application provided rpc handler function.
132   std::function<::grpc::Status(ServiceType*, ::grpc::ServerContext*,
133                                const RequestType*, ResponseType*)>
134       func_;
135   // The class the above handler function lives in.
136   ServiceType* service_;
137 };
138 
139 /// A wrapper class of an application provided client streaming handler.
140 template <class ServiceType, class RequestType, class ResponseType>
141 class ClientStreamingHandler : public ::grpc::internal::MethodHandler {
142  public:
ClientStreamingHandler(std::function<::grpc::Status (ServiceType *,::grpc::ServerContext *,ServerReader<RequestType> *,ResponseType *)> func,ServiceType * service)143   ClientStreamingHandler(
144       std::function<::grpc::Status(ServiceType*, ::grpc::ServerContext*,
145                                    ServerReader<RequestType>*, ResponseType*)>
146           func,
147       ServiceType* service)
148       : func_(func), service_(service) {}
149 
RunHandler(const HandlerParameter & param)150   void RunHandler(const HandlerParameter& param) final {
151     ServerReader<RequestType> reader(
152         param.call, static_cast<::grpc::ServerContext*>(param.server_context));
153     ResponseType rsp;
154     ::grpc::Status status = CatchingFunctionHandler([this, &param, &reader,
155                                                      &rsp] {
156       return func_(service_,
157                    static_cast<::grpc::ServerContext*>(param.server_context),
158                    &reader, &rsp);
159     });
160 
161     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
162                                 ::grpc::internal::CallOpSendMessage,
163                                 ::grpc::internal::CallOpServerSendStatus>
164         ops;
165     if (!param.server_context->sent_initial_metadata_) {
166       ops.SendInitialMetadata(&param.server_context->initial_metadata_,
167                               param.server_context->initial_metadata_flags());
168       if (param.server_context->compression_level_set()) {
169         ops.set_compression_level(param.server_context->compression_level());
170       }
171     }
172     if (status.ok()) {
173       status = ops.SendMessagePtr(&rsp);
174     }
175     ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
176     param.call->PerformOps(&ops);
177     param.call->cq()->Pluck(&ops);
178   }
179 
180  private:
181   std::function<::grpc::Status(ServiceType*, ::grpc::ServerContext*,
182                                ServerReader<RequestType>*, ResponseType*)>
183       func_;
184   ServiceType* service_;
185 };
186 
187 /// A wrapper class of an application provided server streaming handler.
188 template <class ServiceType, class RequestType, class ResponseType>
189 class ServerStreamingHandler : public ::grpc::internal::MethodHandler {
190  public:
ServerStreamingHandler(std::function<::grpc::Status (ServiceType *,::grpc::ServerContext *,const RequestType *,ServerWriter<ResponseType> *)> func,ServiceType * service)191   ServerStreamingHandler(std::function<::grpc::Status(
192                              ServiceType*, ::grpc::ServerContext*,
193                              const RequestType*, ServerWriter<ResponseType>*)>
194                              func,
195                          ServiceType* service)
196       : func_(func), service_(service) {}
197 
RunHandler(const HandlerParameter & param)198   void RunHandler(const HandlerParameter& param) final {
199     ::grpc::Status status = param.status;
200     if (status.ok()) {
201       ServerWriter<ResponseType> writer(
202           param.call,
203           static_cast<::grpc::ServerContext*>(param.server_context));
204       status = CatchingFunctionHandler([this, &param, &writer] {
205         return func_(service_,
206                      static_cast<::grpc::ServerContext*>(param.server_context),
207                      static_cast<RequestType*>(param.request), &writer);
208       });
209       static_cast<RequestType*>(param.request)->~RequestType();
210     }
211 
212     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
213                                 ::grpc::internal::CallOpServerSendStatus>
214         ops;
215     if (!param.server_context->sent_initial_metadata_) {
216       ops.SendInitialMetadata(&param.server_context->initial_metadata_,
217                               param.server_context->initial_metadata_flags());
218       if (param.server_context->compression_level_set()) {
219         ops.set_compression_level(param.server_context->compression_level());
220       }
221     }
222     ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
223     param.call->PerformOps(&ops);
224     if (param.server_context->has_pending_ops_) {
225       param.call->cq()->Pluck(&param.server_context->pending_ops_);
226     }
227     param.call->cq()->Pluck(&ops);
228   }
229 
Deserialize(grpc_call * call,grpc_byte_buffer * req,::grpc::Status * status,void **)230   void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
231                     ::grpc::Status* status, void** /*handler_data*/) final {
232     ::grpc::ByteBuffer buf;
233     buf.set_buffer(req);
234     auto* request =
235         new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
236             call, sizeof(RequestType))) RequestType();
237     *status =
238         ::grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
239     buf.Release();
240     if (status->ok()) {
241       return request;
242     }
243     request->~RequestType();
244     return nullptr;
245   }
246 
247  private:
248   std::function<::grpc::Status(ServiceType*, ::grpc::ServerContext*,
249                                const RequestType*, ServerWriter<ResponseType>*)>
250       func_;
251   ServiceType* service_;
252 };
253 
254 /// A wrapper class of an application provided bidi-streaming handler.
255 /// This also applies to server-streamed implementation of a unary method
256 /// with the additional requirement that such methods must have done a
257 /// write for status to be ok
258 /// Since this is used by more than 1 class, the service is not passed in.
259 /// Instead, it is expected to be an implicitly-captured argument of func
260 /// (through bind or something along those lines)
261 template <class Streamer, bool WriteNeeded>
262 class TemplatedBidiStreamingHandler : public ::grpc::internal::MethodHandler {
263  public:
TemplatedBidiStreamingHandler(std::function<::grpc::Status (::grpc::ServerContext *,Streamer *)> func)264   explicit TemplatedBidiStreamingHandler(
265       std::function<::grpc::Status(::grpc::ServerContext*, Streamer*)> func)
266       : func_(func), write_needed_(WriteNeeded) {}
267 
RunHandler(const HandlerParameter & param)268   void RunHandler(const HandlerParameter& param) final {
269     Streamer stream(param.call,
270                     static_cast<::grpc::ServerContext*>(param.server_context));
271     ::grpc::Status status = CatchingFunctionHandler([this, &param, &stream] {
272       return func_(static_cast<::grpc::ServerContext*>(param.server_context),
273                    &stream);
274     });
275 
276     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
277                                 ::grpc::internal::CallOpServerSendStatus>
278         ops;
279     if (!param.server_context->sent_initial_metadata_) {
280       ops.SendInitialMetadata(&param.server_context->initial_metadata_,
281                               param.server_context->initial_metadata_flags());
282       if (param.server_context->compression_level_set()) {
283         ops.set_compression_level(param.server_context->compression_level());
284       }
285       if (write_needed_ && status.ok()) {
286         // If we needed a write but never did one, we need to mark the
287         // status as a fail
288         status = ::grpc::Status(::grpc::StatusCode::INTERNAL,
289                                 "Service did not provide response message");
290       }
291     }
292     ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
293     param.call->PerformOps(&ops);
294     if (param.server_context->has_pending_ops_) {
295       param.call->cq()->Pluck(&param.server_context->pending_ops_);
296     }
297     param.call->cq()->Pluck(&ops);
298   }
299 
300  private:
301   std::function<::grpc::Status(::grpc::ServerContext*, Streamer*)> func_;
302   const bool write_needed_;
303 };
304 
305 template <class ServiceType, class RequestType, class ResponseType>
306 class BidiStreamingHandler
307     : public TemplatedBidiStreamingHandler<
308           ServerReaderWriter<ResponseType, RequestType>, false> {
309  public:
BidiStreamingHandler(std::function<::grpc::Status (ServiceType *,::grpc::ServerContext *,ServerReaderWriter<ResponseType,RequestType> *)> func,ServiceType * service)310   BidiStreamingHandler(std::function<::grpc::Status(
311                            ServiceType*, ::grpc::ServerContext*,
312                            ServerReaderWriter<ResponseType, RequestType>*)>
313                            func,
314                        ServiceType* service)
315       // TODO(vjpai): When gRPC supports C++14, move-capture func in the below
316       : TemplatedBidiStreamingHandler<
317             ServerReaderWriter<ResponseType, RequestType>, false>(
318             [func, service](
319                 ::grpc::ServerContext* ctx,
320                 ServerReaderWriter<ResponseType, RequestType>* streamer) {
321               return func(service, ctx, streamer);
322             }) {}
323 };
324 
325 template <class RequestType, class ResponseType>
326 class StreamedUnaryHandler
327     : public TemplatedBidiStreamingHandler<
328           ServerUnaryStreamer<RequestType, ResponseType>, true> {
329  public:
StreamedUnaryHandler(std::function<::grpc::Status (::grpc::ServerContext *,ServerUnaryStreamer<RequestType,ResponseType> *)> func)330   explicit StreamedUnaryHandler(
331       std::function<
332           ::grpc::Status(::grpc::ServerContext*,
333                          ServerUnaryStreamer<RequestType, ResponseType>*)>
334           func)
335       : TemplatedBidiStreamingHandler<
336             ServerUnaryStreamer<RequestType, ResponseType>, true>(
337             std::move(func)) {}
338 };
339 
340 template <class RequestType, class ResponseType>
341 class SplitServerStreamingHandler
342     : public TemplatedBidiStreamingHandler<
343           ServerSplitStreamer<RequestType, ResponseType>, false> {
344  public:
SplitServerStreamingHandler(std::function<::grpc::Status (::grpc::ServerContext *,ServerSplitStreamer<RequestType,ResponseType> *)> func)345   explicit SplitServerStreamingHandler(
346       std::function<
347           ::grpc::Status(::grpc::ServerContext*,
348                          ServerSplitStreamer<RequestType, ResponseType>*)>
349           func)
350       : TemplatedBidiStreamingHandler<
351             ServerSplitStreamer<RequestType, ResponseType>, false>(
352             std::move(func)) {}
353 };
354 
355 /// General method handler class for errors that prevent real method use
356 /// e.g., handle unknown method by returning UNIMPLEMENTED error.
357 template <::grpc::StatusCode code>
358 class ErrorMethodHandler : public ::grpc::internal::MethodHandler {
359  public:
360   template <class T>
FillOps(::grpc::ServerContextBase * context,T * ops)361   static void FillOps(::grpc::ServerContextBase* context, T* ops) {
362     ::grpc::Status status(code, "");
363     if (!context->sent_initial_metadata_) {
364       ops->SendInitialMetadata(&context->initial_metadata_,
365                                context->initial_metadata_flags());
366       if (context->compression_level_set()) {
367         ops->set_compression_level(context->compression_level());
368       }
369       context->sent_initial_metadata_ = true;
370     }
371     ops->ServerSendStatus(&context->trailing_metadata_, status);
372   }
373 
RunHandler(const HandlerParameter & param)374   void RunHandler(const HandlerParameter& param) final {
375     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
376                                 ::grpc::internal::CallOpServerSendStatus>
377         ops;
378     FillOps(param.server_context, &ops);
379     param.call->PerformOps(&ops);
380     param.call->cq()->Pluck(&ops);
381   }
382 
Deserialize(grpc_call *,grpc_byte_buffer * req,::grpc::Status *,void **)383   void* Deserialize(grpc_call* /*call*/, grpc_byte_buffer* req,
384                     ::grpc::Status* /*status*/, void** /*handler_data*/) final {
385     // We have to destroy any request payload
386     if (req != nullptr) {
387       ::grpc::g_core_codegen_interface->grpc_byte_buffer_destroy(req);
388     }
389     return nullptr;
390   }
391 };
392 
393 typedef ErrorMethodHandler<::grpc::StatusCode::UNIMPLEMENTED>
394     UnknownMethodHandler;
395 typedef ErrorMethodHandler<::grpc::StatusCode::RESOURCE_EXHAUSTED>
396     ResourceExhaustedHandler;
397 
398 }  // namespace internal
399 }  // namespace grpc
400 
401 #endif  // GRPCPP_IMPL_CODEGEN_METHOD_HANDLER_H
402