1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #ifndef GRPCPP_IMPL_CODEGEN_METHOD_HANDLER_H
20 #define GRPCPP_IMPL_CODEGEN_METHOD_HANDLER_H
21
22 #include <grpcpp/impl/codegen/byte_buffer.h>
23 #include <grpcpp/impl/codegen/core_codegen_interface.h>
24 #include <grpcpp/impl/codegen/rpc_service_method.h>
25 #include <grpcpp/impl/codegen/sync_stream.h>
26
27 namespace grpc {
28
29 namespace internal {
30
31 // Invoke the method handler, fill in the status, and
32 // return whether or not we finished safely (without an exception).
33 // Note that exception handling is 0-cost in most compiler/library
34 // implementations (except when an exception is actually thrown),
35 // so this process doesn't require additional overhead in the common case.
36 // Additionally, we don't need to return if we caught an exception or not;
37 // the handling is the same in either case.
38 template <class Callable>
CatchingFunctionHandler(Callable && handler)39 ::grpc::Status CatchingFunctionHandler(Callable&& handler) {
40 #if GRPC_ALLOW_EXCEPTIONS
41 try {
42 return handler();
43 } catch (...) {
44 return ::grpc::Status(::grpc::StatusCode::UNKNOWN,
45 "Unexpected error in RPC handling");
46 }
47 #else // GRPC_ALLOW_EXCEPTIONS
48 return handler();
49 #endif // GRPC_ALLOW_EXCEPTIONS
50 }
51
52 /// A helper function with reduced templating to do the common work needed to
53 /// actually send the server response. Uses non-const parameter for Status since
54 /// this should only ever be called from the end of the RunHandler method.
55
56 template <class ResponseType>
UnaryRunHandlerHelper(const MethodHandler::HandlerParameter & param,ResponseType * rsp,::grpc::Status & status)57 void UnaryRunHandlerHelper(const MethodHandler::HandlerParameter& param,
58 ResponseType* rsp, ::grpc::Status& status) {
59 GPR_CODEGEN_ASSERT(!param.server_context->sent_initial_metadata_);
60 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
61 ::grpc::internal::CallOpSendMessage,
62 ::grpc::internal::CallOpServerSendStatus>
63 ops;
64 ops.SendInitialMetadata(¶m.server_context->initial_metadata_,
65 param.server_context->initial_metadata_flags());
66 if (param.server_context->compression_level_set()) {
67 ops.set_compression_level(param.server_context->compression_level());
68 }
69 if (status.ok()) {
70 status = ops.SendMessagePtr(rsp);
71 }
72 ops.ServerSendStatus(¶m.server_context->trailing_metadata_, status);
73 param.call->PerformOps(&ops);
74 param.call->cq()->Pluck(&ops);
75 }
76
77 /// A helper function with reduced templating to do deserializing.
78
79 template <class RequestType>
UnaryDeserializeHelper(grpc_call * call,grpc_byte_buffer * req,::grpc::Status * status,RequestType * request)80 void* UnaryDeserializeHelper(grpc_call* call, grpc_byte_buffer* req,
81 ::grpc::Status* status, RequestType* request) {
82 ::grpc::ByteBuffer buf;
83 buf.set_buffer(req);
84 *status = ::grpc::SerializationTraits<RequestType>::Deserialize(
85 &buf, static_cast<RequestType*>(request));
86 buf.Release();
87 if (status->ok()) {
88 return request;
89 }
90 request->~RequestType();
91 return nullptr;
92 }
93
94 /// A wrapper class of an application provided rpc method handler.
95 template <class ServiceType, class RequestType, class ResponseType,
96 class BaseRequestType = RequestType,
97 class BaseResponseType = ResponseType>
98 class RpcMethodHandler : public ::grpc::internal::MethodHandler {
99 public:
RpcMethodHandler(std::function<::grpc::Status (ServiceType *,::grpc::ServerContext *,const RequestType *,ResponseType *)> func,ServiceType * service)100 RpcMethodHandler(
101 std::function<::grpc::Status(ServiceType*, ::grpc::ServerContext*,
102 const RequestType*, ResponseType*)>
103 func,
104 ServiceType* service)
105 : func_(func), service_(service) {}
106
RunHandler(const HandlerParameter & param)107 void RunHandler(const HandlerParameter& param) final {
108 ResponseType rsp;
109 ::grpc::Status status = param.status;
110 if (status.ok()) {
111 status = CatchingFunctionHandler([this, ¶m, &rsp] {
112 return func_(service_,
113 static_cast<::grpc::ServerContext*>(param.server_context),
114 static_cast<RequestType*>(param.request), &rsp);
115 });
116 static_cast<RequestType*>(param.request)->~RequestType();
117 }
118 UnaryRunHandlerHelper(param, static_cast<BaseResponseType*>(&rsp), status);
119 }
120
Deserialize(grpc_call * call,grpc_byte_buffer * req,::grpc::Status * status,void **)121 void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
122 ::grpc::Status* status, void** /*handler_data*/) final {
123 auto* request =
124 new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
125 call, sizeof(RequestType))) RequestType;
126 return UnaryDeserializeHelper(call, req, status,
127 static_cast<BaseRequestType*>(request));
128 }
129
130 private:
131 /// Application provided rpc handler function.
132 std::function<::grpc::Status(ServiceType*, ::grpc::ServerContext*,
133 const RequestType*, ResponseType*)>
134 func_;
135 // The class the above handler function lives in.
136 ServiceType* service_;
137 };
138
139 /// A wrapper class of an application provided client streaming handler.
140 template <class ServiceType, class RequestType, class ResponseType>
141 class ClientStreamingHandler : public ::grpc::internal::MethodHandler {
142 public:
ClientStreamingHandler(std::function<::grpc::Status (ServiceType *,::grpc::ServerContext *,ServerReader<RequestType> *,ResponseType *)> func,ServiceType * service)143 ClientStreamingHandler(
144 std::function<::grpc::Status(ServiceType*, ::grpc::ServerContext*,
145 ServerReader<RequestType>*, ResponseType*)>
146 func,
147 ServiceType* service)
148 : func_(func), service_(service) {}
149
RunHandler(const HandlerParameter & param)150 void RunHandler(const HandlerParameter& param) final {
151 ServerReader<RequestType> reader(
152 param.call, static_cast<::grpc::ServerContext*>(param.server_context));
153 ResponseType rsp;
154 ::grpc::Status status = CatchingFunctionHandler([this, ¶m, &reader,
155 &rsp] {
156 return func_(service_,
157 static_cast<::grpc::ServerContext*>(param.server_context),
158 &reader, &rsp);
159 });
160
161 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
162 ::grpc::internal::CallOpSendMessage,
163 ::grpc::internal::CallOpServerSendStatus>
164 ops;
165 if (!param.server_context->sent_initial_metadata_) {
166 ops.SendInitialMetadata(¶m.server_context->initial_metadata_,
167 param.server_context->initial_metadata_flags());
168 if (param.server_context->compression_level_set()) {
169 ops.set_compression_level(param.server_context->compression_level());
170 }
171 }
172 if (status.ok()) {
173 status = ops.SendMessagePtr(&rsp);
174 }
175 ops.ServerSendStatus(¶m.server_context->trailing_metadata_, status);
176 param.call->PerformOps(&ops);
177 param.call->cq()->Pluck(&ops);
178 }
179
180 private:
181 std::function<::grpc::Status(ServiceType*, ::grpc::ServerContext*,
182 ServerReader<RequestType>*, ResponseType*)>
183 func_;
184 ServiceType* service_;
185 };
186
187 /// A wrapper class of an application provided server streaming handler.
188 template <class ServiceType, class RequestType, class ResponseType>
189 class ServerStreamingHandler : public ::grpc::internal::MethodHandler {
190 public:
ServerStreamingHandler(std::function<::grpc::Status (ServiceType *,::grpc::ServerContext *,const RequestType *,ServerWriter<ResponseType> *)> func,ServiceType * service)191 ServerStreamingHandler(std::function<::grpc::Status(
192 ServiceType*, ::grpc::ServerContext*,
193 const RequestType*, ServerWriter<ResponseType>*)>
194 func,
195 ServiceType* service)
196 : func_(func), service_(service) {}
197
RunHandler(const HandlerParameter & param)198 void RunHandler(const HandlerParameter& param) final {
199 ::grpc::Status status = param.status;
200 if (status.ok()) {
201 ServerWriter<ResponseType> writer(
202 param.call,
203 static_cast<::grpc::ServerContext*>(param.server_context));
204 status = CatchingFunctionHandler([this, ¶m, &writer] {
205 return func_(service_,
206 static_cast<::grpc::ServerContext*>(param.server_context),
207 static_cast<RequestType*>(param.request), &writer);
208 });
209 static_cast<RequestType*>(param.request)->~RequestType();
210 }
211
212 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
213 ::grpc::internal::CallOpServerSendStatus>
214 ops;
215 if (!param.server_context->sent_initial_metadata_) {
216 ops.SendInitialMetadata(¶m.server_context->initial_metadata_,
217 param.server_context->initial_metadata_flags());
218 if (param.server_context->compression_level_set()) {
219 ops.set_compression_level(param.server_context->compression_level());
220 }
221 }
222 ops.ServerSendStatus(¶m.server_context->trailing_metadata_, status);
223 param.call->PerformOps(&ops);
224 if (param.server_context->has_pending_ops_) {
225 param.call->cq()->Pluck(¶m.server_context->pending_ops_);
226 }
227 param.call->cq()->Pluck(&ops);
228 }
229
Deserialize(grpc_call * call,grpc_byte_buffer * req,::grpc::Status * status,void **)230 void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
231 ::grpc::Status* status, void** /*handler_data*/) final {
232 ::grpc::ByteBuffer buf;
233 buf.set_buffer(req);
234 auto* request =
235 new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
236 call, sizeof(RequestType))) RequestType();
237 *status =
238 ::grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
239 buf.Release();
240 if (status->ok()) {
241 return request;
242 }
243 request->~RequestType();
244 return nullptr;
245 }
246
247 private:
248 std::function<::grpc::Status(ServiceType*, ::grpc::ServerContext*,
249 const RequestType*, ServerWriter<ResponseType>*)>
250 func_;
251 ServiceType* service_;
252 };
253
254 /// A wrapper class of an application provided bidi-streaming handler.
255 /// This also applies to server-streamed implementation of a unary method
256 /// with the additional requirement that such methods must have done a
257 /// write for status to be ok
258 /// Since this is used by more than 1 class, the service is not passed in.
259 /// Instead, it is expected to be an implicitly-captured argument of func
260 /// (through bind or something along those lines)
261 template <class Streamer, bool WriteNeeded>
262 class TemplatedBidiStreamingHandler : public ::grpc::internal::MethodHandler {
263 public:
TemplatedBidiStreamingHandler(std::function<::grpc::Status (::grpc::ServerContext *,Streamer *)> func)264 explicit TemplatedBidiStreamingHandler(
265 std::function<::grpc::Status(::grpc::ServerContext*, Streamer*)> func)
266 : func_(func), write_needed_(WriteNeeded) {}
267
RunHandler(const HandlerParameter & param)268 void RunHandler(const HandlerParameter& param) final {
269 Streamer stream(param.call,
270 static_cast<::grpc::ServerContext*>(param.server_context));
271 ::grpc::Status status = CatchingFunctionHandler([this, ¶m, &stream] {
272 return func_(static_cast<::grpc::ServerContext*>(param.server_context),
273 &stream);
274 });
275
276 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
277 ::grpc::internal::CallOpServerSendStatus>
278 ops;
279 if (!param.server_context->sent_initial_metadata_) {
280 ops.SendInitialMetadata(¶m.server_context->initial_metadata_,
281 param.server_context->initial_metadata_flags());
282 if (param.server_context->compression_level_set()) {
283 ops.set_compression_level(param.server_context->compression_level());
284 }
285 if (write_needed_ && status.ok()) {
286 // If we needed a write but never did one, we need to mark the
287 // status as a fail
288 status = ::grpc::Status(::grpc::StatusCode::INTERNAL,
289 "Service did not provide response message");
290 }
291 }
292 ops.ServerSendStatus(¶m.server_context->trailing_metadata_, status);
293 param.call->PerformOps(&ops);
294 if (param.server_context->has_pending_ops_) {
295 param.call->cq()->Pluck(¶m.server_context->pending_ops_);
296 }
297 param.call->cq()->Pluck(&ops);
298 }
299
300 private:
301 std::function<::grpc::Status(::grpc::ServerContext*, Streamer*)> func_;
302 const bool write_needed_;
303 };
304
305 template <class ServiceType, class RequestType, class ResponseType>
306 class BidiStreamingHandler
307 : public TemplatedBidiStreamingHandler<
308 ServerReaderWriter<ResponseType, RequestType>, false> {
309 public:
BidiStreamingHandler(std::function<::grpc::Status (ServiceType *,::grpc::ServerContext *,ServerReaderWriter<ResponseType,RequestType> *)> func,ServiceType * service)310 BidiStreamingHandler(std::function<::grpc::Status(
311 ServiceType*, ::grpc::ServerContext*,
312 ServerReaderWriter<ResponseType, RequestType>*)>
313 func,
314 ServiceType* service)
315 // TODO(vjpai): When gRPC supports C++14, move-capture func in the below
316 : TemplatedBidiStreamingHandler<
317 ServerReaderWriter<ResponseType, RequestType>, false>(
318 [func, service](
319 ::grpc::ServerContext* ctx,
320 ServerReaderWriter<ResponseType, RequestType>* streamer) {
321 return func(service, ctx, streamer);
322 }) {}
323 };
324
325 template <class RequestType, class ResponseType>
326 class StreamedUnaryHandler
327 : public TemplatedBidiStreamingHandler<
328 ServerUnaryStreamer<RequestType, ResponseType>, true> {
329 public:
StreamedUnaryHandler(std::function<::grpc::Status (::grpc::ServerContext *,ServerUnaryStreamer<RequestType,ResponseType> *)> func)330 explicit StreamedUnaryHandler(
331 std::function<
332 ::grpc::Status(::grpc::ServerContext*,
333 ServerUnaryStreamer<RequestType, ResponseType>*)>
334 func)
335 : TemplatedBidiStreamingHandler<
336 ServerUnaryStreamer<RequestType, ResponseType>, true>(
337 std::move(func)) {}
338 };
339
340 template <class RequestType, class ResponseType>
341 class SplitServerStreamingHandler
342 : public TemplatedBidiStreamingHandler<
343 ServerSplitStreamer<RequestType, ResponseType>, false> {
344 public:
SplitServerStreamingHandler(std::function<::grpc::Status (::grpc::ServerContext *,ServerSplitStreamer<RequestType,ResponseType> *)> func)345 explicit SplitServerStreamingHandler(
346 std::function<
347 ::grpc::Status(::grpc::ServerContext*,
348 ServerSplitStreamer<RequestType, ResponseType>*)>
349 func)
350 : TemplatedBidiStreamingHandler<
351 ServerSplitStreamer<RequestType, ResponseType>, false>(
352 std::move(func)) {}
353 };
354
355 /// General method handler class for errors that prevent real method use
356 /// e.g., handle unknown method by returning UNIMPLEMENTED error.
357 template <::grpc::StatusCode code>
358 class ErrorMethodHandler : public ::grpc::internal::MethodHandler {
359 public:
360 template <class T>
FillOps(::grpc::ServerContextBase * context,T * ops)361 static void FillOps(::grpc::ServerContextBase* context, T* ops) {
362 ::grpc::Status status(code, "");
363 if (!context->sent_initial_metadata_) {
364 ops->SendInitialMetadata(&context->initial_metadata_,
365 context->initial_metadata_flags());
366 if (context->compression_level_set()) {
367 ops->set_compression_level(context->compression_level());
368 }
369 context->sent_initial_metadata_ = true;
370 }
371 ops->ServerSendStatus(&context->trailing_metadata_, status);
372 }
373
RunHandler(const HandlerParameter & param)374 void RunHandler(const HandlerParameter& param) final {
375 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
376 ::grpc::internal::CallOpServerSendStatus>
377 ops;
378 FillOps(param.server_context, &ops);
379 param.call->PerformOps(&ops);
380 param.call->cq()->Pluck(&ops);
381 }
382
Deserialize(grpc_call *,grpc_byte_buffer * req,::grpc::Status *,void **)383 void* Deserialize(grpc_call* /*call*/, grpc_byte_buffer* req,
384 ::grpc::Status* /*status*/, void** /*handler_data*/) final {
385 // We have to destroy any request payload
386 if (req != nullptr) {
387 ::grpc::g_core_codegen_interface->grpc_byte_buffer_destroy(req);
388 }
389 return nullptr;
390 }
391 };
392
393 typedef ErrorMethodHandler<::grpc::StatusCode::UNIMPLEMENTED>
394 UnknownMethodHandler;
395 typedef ErrorMethodHandler<::grpc::StatusCode::RESOURCE_EXHAUSTED>
396 ResourceExhaustedHandler;
397
398 } // namespace internal
399 } // namespace grpc
400
401 #endif // GRPCPP_IMPL_CODEGEN_METHOD_HANDLER_H
402