• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPCPP_IMPL_CODEGEN_METHOD_HANDLER_IMPL_H
20 #define GRPCPP_IMPL_CODEGEN_METHOD_HANDLER_IMPL_H
21 
22 #include <grpcpp/impl/codegen/byte_buffer.h>
23 #include <grpcpp/impl/codegen/core_codegen_interface.h>
24 #include <grpcpp/impl/codegen/rpc_service_method.h>
25 #include <grpcpp/impl/codegen/sync_stream_impl.h>
26 
27 namespace grpc_impl {
28 
29 namespace internal {
30 
31 // Invoke the method handler, fill in the status, and
32 // return whether or not we finished safely (without an exception).
33 // Note that exception handling is 0-cost in most compiler/library
34 // implementations (except when an exception is actually thrown),
35 // so this process doesn't require additional overhead in the common case.
36 // Additionally, we don't need to return if we caught an exception or not;
37 // the handling is the same in either case.
38 template <class Callable>
CatchingFunctionHandler(Callable && handler)39 ::grpc::Status CatchingFunctionHandler(Callable&& handler) {
40 #if GRPC_ALLOW_EXCEPTIONS
41   try {
42     return handler();
43   } catch (...) {
44     return ::grpc::Status(::grpc::StatusCode::UNKNOWN,
45                           "Unexpected error in RPC handling");
46   }
47 #else   // GRPC_ALLOW_EXCEPTIONS
48   return handler();
49 #endif  // GRPC_ALLOW_EXCEPTIONS
50 }
51 
52 /// A wrapper class of an application provided rpc method handler.
53 template <class ServiceType, class RequestType, class ResponseType>
54 class RpcMethodHandler : public ::grpc::internal::MethodHandler {
55  public:
RpcMethodHandler(std::function<::grpc::Status (ServiceType *,::grpc_impl::ServerContext *,const RequestType *,ResponseType *)> func,ServiceType * service)56   RpcMethodHandler(
57       std::function<::grpc::Status(ServiceType*, ::grpc_impl::ServerContext*,
58                                    const RequestType*, ResponseType*)>
59           func,
60       ServiceType* service)
61       : func_(func), service_(service) {}
62 
RunHandler(const HandlerParameter & param)63   void RunHandler(const HandlerParameter& param) final {
64     ResponseType rsp;
65     ::grpc::Status status = param.status;
66     if (status.ok()) {
67       status = CatchingFunctionHandler([this, &param, &rsp] {
68         return func_(
69             service_,
70             static_cast<::grpc_impl::ServerContext*>(param.server_context),
71             static_cast<RequestType*>(param.request), &rsp);
72       });
73       static_cast<RequestType*>(param.request)->~RequestType();
74     }
75 
76     GPR_CODEGEN_ASSERT(!param.server_context->sent_initial_metadata_);
77     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
78                                 ::grpc::internal::CallOpSendMessage,
79                                 ::grpc::internal::CallOpServerSendStatus>
80         ops;
81     ops.SendInitialMetadata(&param.server_context->initial_metadata_,
82                             param.server_context->initial_metadata_flags());
83     if (param.server_context->compression_level_set()) {
84       ops.set_compression_level(param.server_context->compression_level());
85     }
86     if (status.ok()) {
87       status = ops.SendMessagePtr(&rsp);
88     }
89     ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
90     param.call->PerformOps(&ops);
91     param.call->cq()->Pluck(&ops);
92   }
93 
Deserialize(grpc_call * call,grpc_byte_buffer * req,::grpc::Status * status,void **)94   void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
95                     ::grpc::Status* status, void** /*handler_data*/) final {
96     ::grpc::ByteBuffer buf;
97     buf.set_buffer(req);
98     auto* request =
99         new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
100             call, sizeof(RequestType))) RequestType();
101     *status =
102         ::grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
103     buf.Release();
104     if (status->ok()) {
105       return request;
106     }
107     request->~RequestType();
108     return nullptr;
109   }
110 
111  private:
112   /// Application provided rpc handler function.
113   std::function<::grpc::Status(ServiceType*, ::grpc_impl::ServerContext*,
114                                const RequestType*, ResponseType*)>
115       func_;
116   // The class the above handler function lives in.
117   ServiceType* service_;
118 };
119 
120 /// A wrapper class of an application provided client streaming handler.
121 template <class ServiceType, class RequestType, class ResponseType>
122 class ClientStreamingHandler : public ::grpc::internal::MethodHandler {
123  public:
ClientStreamingHandler(std::function<::grpc::Status (ServiceType *,::grpc_impl::ServerContext *,::grpc_impl::ServerReader<RequestType> *,ResponseType *)> func,ServiceType * service)124   ClientStreamingHandler(
125       std::function<::grpc::Status(ServiceType*, ::grpc_impl::ServerContext*,
126                                    ::grpc_impl::ServerReader<RequestType>*,
127                                    ResponseType*)>
128           func,
129       ServiceType* service)
130       : func_(func), service_(service) {}
131 
RunHandler(const HandlerParameter & param)132   void RunHandler(const HandlerParameter& param) final {
133     ::grpc_impl::ServerReader<RequestType> reader(
134         param.call,
135         static_cast<::grpc_impl::ServerContext*>(param.server_context));
136     ResponseType rsp;
137     ::grpc::Status status =
138         CatchingFunctionHandler([this, &param, &reader, &rsp] {
139           return func_(
140               service_,
141               static_cast<::grpc_impl::ServerContext*>(param.server_context),
142               &reader, &rsp);
143         });
144 
145     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
146                                 ::grpc::internal::CallOpSendMessage,
147                                 ::grpc::internal::CallOpServerSendStatus>
148         ops;
149     if (!param.server_context->sent_initial_metadata_) {
150       ops.SendInitialMetadata(&param.server_context->initial_metadata_,
151                               param.server_context->initial_metadata_flags());
152       if (param.server_context->compression_level_set()) {
153         ops.set_compression_level(param.server_context->compression_level());
154       }
155     }
156     if (status.ok()) {
157       status = ops.SendMessagePtr(&rsp);
158     }
159     ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
160     param.call->PerformOps(&ops);
161     param.call->cq()->Pluck(&ops);
162   }
163 
164  private:
165   std::function<::grpc::Status(ServiceType*, ::grpc_impl::ServerContext*,
166                                ::grpc_impl::ServerReader<RequestType>*,
167                                ResponseType*)>
168       func_;
169   ServiceType* service_;
170 };
171 
172 /// A wrapper class of an application provided server streaming handler.
173 template <class ServiceType, class RequestType, class ResponseType>
174 class ServerStreamingHandler : public ::grpc::internal::MethodHandler {
175  public:
ServerStreamingHandler(std::function<::grpc::Status (ServiceType *,::grpc_impl::ServerContext *,const RequestType *,::grpc_impl::ServerWriter<ResponseType> *)> func,ServiceType * service)176   ServerStreamingHandler(
177       std::function<::grpc::Status(ServiceType*, ::grpc_impl::ServerContext*,
178                                    const RequestType*,
179                                    ::grpc_impl::ServerWriter<ResponseType>*)>
180           func,
181       ServiceType* service)
182       : func_(func), service_(service) {}
183 
RunHandler(const HandlerParameter & param)184   void RunHandler(const HandlerParameter& param) final {
185     ::grpc::Status status = param.status;
186     if (status.ok()) {
187       ::grpc_impl::ServerWriter<ResponseType> writer(
188           param.call,
189           static_cast<::grpc_impl::ServerContext*>(param.server_context));
190       status = CatchingFunctionHandler([this, &param, &writer] {
191         return func_(
192             service_,
193             static_cast<::grpc_impl::ServerContext*>(param.server_context),
194             static_cast<RequestType*>(param.request), &writer);
195       });
196       static_cast<RequestType*>(param.request)->~RequestType();
197     }
198 
199     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
200                                 ::grpc::internal::CallOpServerSendStatus>
201         ops;
202     if (!param.server_context->sent_initial_metadata_) {
203       ops.SendInitialMetadata(&param.server_context->initial_metadata_,
204                               param.server_context->initial_metadata_flags());
205       if (param.server_context->compression_level_set()) {
206         ops.set_compression_level(param.server_context->compression_level());
207       }
208     }
209     ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
210     param.call->PerformOps(&ops);
211     if (param.server_context->has_pending_ops_) {
212       param.call->cq()->Pluck(&param.server_context->pending_ops_);
213     }
214     param.call->cq()->Pluck(&ops);
215   }
216 
Deserialize(grpc_call * call,grpc_byte_buffer * req,::grpc::Status * status,void **)217   void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
218                     ::grpc::Status* status, void** /*handler_data*/) final {
219     ::grpc::ByteBuffer buf;
220     buf.set_buffer(req);
221     auto* request =
222         new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
223             call, sizeof(RequestType))) RequestType();
224     *status =
225         ::grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
226     buf.Release();
227     if (status->ok()) {
228       return request;
229     }
230     request->~RequestType();
231     return nullptr;
232   }
233 
234  private:
235   std::function<::grpc::Status(ServiceType*, ::grpc_impl::ServerContext*,
236                                const RequestType*,
237                                ::grpc_impl::ServerWriter<ResponseType>*)>
238       func_;
239   ServiceType* service_;
240 };
241 
242 /// A wrapper class of an application provided bidi-streaming handler.
243 /// This also applies to server-streamed implementation of a unary method
244 /// with the additional requirement that such methods must have done a
245 /// write for status to be ok
246 /// Since this is used by more than 1 class, the service is not passed in.
247 /// Instead, it is expected to be an implicitly-captured argument of func
248 /// (through bind or something along those lines)
249 template <class Streamer, bool WriteNeeded>
250 class TemplatedBidiStreamingHandler : public ::grpc::internal::MethodHandler {
251  public:
TemplatedBidiStreamingHandler(std::function<::grpc::Status (::grpc_impl::ServerContext *,Streamer *)> func)252   TemplatedBidiStreamingHandler(
253       std::function<::grpc::Status(::grpc_impl::ServerContext*, Streamer*)>
254           func)
255       : func_(func), write_needed_(WriteNeeded) {}
256 
RunHandler(const HandlerParameter & param)257   void RunHandler(const HandlerParameter& param) final {
258     Streamer stream(param.call, static_cast<::grpc_impl::ServerContext*>(
259                                     param.server_context));
260     ::grpc::Status status = CatchingFunctionHandler([this, &param, &stream] {
261       return func_(
262           static_cast<::grpc_impl::ServerContext*>(param.server_context),
263           &stream);
264     });
265 
266     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
267                                 ::grpc::internal::CallOpServerSendStatus>
268         ops;
269     if (!param.server_context->sent_initial_metadata_) {
270       ops.SendInitialMetadata(&param.server_context->initial_metadata_,
271                               param.server_context->initial_metadata_flags());
272       if (param.server_context->compression_level_set()) {
273         ops.set_compression_level(param.server_context->compression_level());
274       }
275       if (write_needed_ && status.ok()) {
276         // If we needed a write but never did one, we need to mark the
277         // status as a fail
278         status = ::grpc::Status(::grpc::StatusCode::INTERNAL,
279                                 "Service did not provide response message");
280       }
281     }
282     ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
283     param.call->PerformOps(&ops);
284     if (param.server_context->has_pending_ops_) {
285       param.call->cq()->Pluck(&param.server_context->pending_ops_);
286     }
287     param.call->cq()->Pluck(&ops);
288   }
289 
290  private:
291   std::function<::grpc::Status(::grpc_impl::ServerContext*, Streamer*)> func_;
292   const bool write_needed_;
293 };
294 
295 template <class ServiceType, class RequestType, class ResponseType>
296 class BidiStreamingHandler
297     : public TemplatedBidiStreamingHandler<
298           ::grpc_impl::ServerReaderWriter<ResponseType, RequestType>, false> {
299  public:
BidiStreamingHandler(std::function<::grpc::Status (ServiceType *,::grpc_impl::ServerContext *,::grpc_impl::ServerReaderWriter<ResponseType,RequestType> *)> func,ServiceType * service)300   BidiStreamingHandler(
301       std::function<::grpc::Status(
302           ServiceType*, ::grpc_impl::ServerContext*,
303           ::grpc_impl::ServerReaderWriter<ResponseType, RequestType>*)>
304           func,
305       ServiceType* service)
306       // TODO(vjpai): When gRPC supports C++14, move-capture func in the below
307       : TemplatedBidiStreamingHandler<
308             ::grpc_impl::ServerReaderWriter<ResponseType, RequestType>, false>(
309             [func, service](
310                 ::grpc_impl::ServerContext* ctx,
311                 ::grpc_impl::ServerReaderWriter<ResponseType, RequestType>*
312                     streamer) { return func(service, ctx, streamer); }) {}
313 };
314 
315 template <class RequestType, class ResponseType>
316 class StreamedUnaryHandler
317     : public TemplatedBidiStreamingHandler<
318           ::grpc_impl::ServerUnaryStreamer<RequestType, ResponseType>, true> {
319  public:
StreamedUnaryHandler(std::function<::grpc::Status (::grpc_impl::ServerContext *,::grpc_impl::ServerUnaryStreamer<RequestType,ResponseType> *)> func)320   explicit StreamedUnaryHandler(
321       std::function<::grpc::Status(
322           ::grpc_impl::ServerContext*,
323           ::grpc_impl::ServerUnaryStreamer<RequestType, ResponseType>*)>
324           func)
325       : TemplatedBidiStreamingHandler<
326             ::grpc_impl::ServerUnaryStreamer<RequestType, ResponseType>, true>(
327             std::move(func)) {}
328 };
329 
330 template <class RequestType, class ResponseType>
331 class SplitServerStreamingHandler
332     : public TemplatedBidiStreamingHandler<
333           ::grpc_impl::ServerSplitStreamer<RequestType, ResponseType>, false> {
334  public:
SplitServerStreamingHandler(std::function<::grpc::Status (::grpc_impl::ServerContext *,::grpc_impl::ServerSplitStreamer<RequestType,ResponseType> *)> func)335   explicit SplitServerStreamingHandler(
336       std::function<::grpc::Status(
337           ::grpc_impl::ServerContext*,
338           ::grpc_impl::ServerSplitStreamer<RequestType, ResponseType>*)>
339           func)
340       : TemplatedBidiStreamingHandler<
341             ::grpc_impl::ServerSplitStreamer<RequestType, ResponseType>, false>(
342             std::move(func)) {}
343 };
344 
345 /// General method handler class for errors that prevent real method use
346 /// e.g., handle unknown method by returning UNIMPLEMENTED error.
347 template <::grpc::StatusCode code>
348 class ErrorMethodHandler : public ::grpc::internal::MethodHandler {
349  public:
350   template <class T>
FillOps(::grpc_impl::ServerContextBase * context,T * ops)351   static void FillOps(::grpc_impl::ServerContextBase* context, T* ops) {
352     ::grpc::Status status(code, "");
353     if (!context->sent_initial_metadata_) {
354       ops->SendInitialMetadata(&context->initial_metadata_,
355                                context->initial_metadata_flags());
356       if (context->compression_level_set()) {
357         ops->set_compression_level(context->compression_level());
358       }
359       context->sent_initial_metadata_ = true;
360     }
361     ops->ServerSendStatus(&context->trailing_metadata_, status);
362   }
363 
RunHandler(const HandlerParameter & param)364   void RunHandler(const HandlerParameter& param) final {
365     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
366                                 ::grpc::internal::CallOpServerSendStatus>
367         ops;
368     FillOps(param.server_context, &ops);
369     param.call->PerformOps(&ops);
370     param.call->cq()->Pluck(&ops);
371   }
372 
Deserialize(grpc_call *,grpc_byte_buffer * req,::grpc::Status *,void **)373   void* Deserialize(grpc_call* /*call*/, grpc_byte_buffer* req,
374                     ::grpc::Status* /*status*/, void** /*handler_data*/) final {
375     // We have to destroy any request payload
376     if (req != nullptr) {
377       ::grpc::g_core_codegen_interface->grpc_byte_buffer_destroy(req);
378     }
379     return nullptr;
380   }
381 };
382 
383 typedef ErrorMethodHandler<::grpc::StatusCode::UNIMPLEMENTED>
384     UnknownMethodHandler;
385 typedef ErrorMethodHandler<::grpc::StatusCode::RESOURCE_EXHAUSTED>
386     ResourceExhaustedHandler;
387 
388 }  // namespace internal
389 }  // namespace grpc_impl
390 
391 #endif  // GRPCPP_IMPL_CODEGEN_METHOD_HANDLER_IMPL_H
392