1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #ifndef GRPCPP_SUPPORT_METHOD_HANDLER_H
20 #define GRPCPP_SUPPORT_METHOD_HANDLER_H
21
#include <grpc/byte_buffer.h>
#include <grpcpp/impl/rpc_service_method.h>
#include <grpcpp/support/byte_buffer.h>
#include <grpcpp/support/sync_stream.h>

#include <functional>
#include <string>
#include <utility>

#include "absl/log/absl_check.h"
28
29 namespace grpc {
30
31 namespace internal {
32
33 // Invoke the method handler, fill in the status, and
34 // return whether or not we finished safely (without an exception).
35 // Note that exception handling is 0-cost in most compiler/library
36 // implementations (except when an exception is actually thrown),
37 // so this process doesn't require additional overhead in the common case.
38 // Additionally, we don't need to return if we caught an exception or not;
39 // the handling is the same in either case.
40 template <class Callable>
CatchingFunctionHandler(Callable && handler)41 ::grpc::Status CatchingFunctionHandler(Callable&& handler) {
42 #if GRPC_ALLOW_EXCEPTIONS
43 try {
44 return handler();
45 } catch (...) {
46 return grpc::Status(grpc::StatusCode::UNKNOWN,
47 "Unexpected error in RPC handling");
48 }
49 #else // GRPC_ALLOW_EXCEPTIONS
50 return handler();
51 #endif // GRPC_ALLOW_EXCEPTIONS
52 }
53
54 /// A helper function with reduced templating to do the common work needed to
55 /// actually send the server response. Uses non-const parameter for Status since
56 /// this should only ever be called from the end of the RunHandler method.
57
58 template <class ResponseType>
UnaryRunHandlerHelper(const MethodHandler::HandlerParameter & param,ResponseType * rsp,grpc::Status & status)59 void UnaryRunHandlerHelper(const MethodHandler::HandlerParameter& param,
60 ResponseType* rsp, grpc::Status& status) {
61 ABSL_CHECK(!param.server_context->sent_initial_metadata_);
62 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
63 grpc::internal::CallOpSendMessage,
64 grpc::internal::CallOpServerSendStatus>
65 ops;
66 ops.SendInitialMetadata(¶m.server_context->initial_metadata_,
67 param.server_context->initial_metadata_flags());
68 if (param.server_context->compression_level_set()) {
69 ops.set_compression_level(param.server_context->compression_level());
70 }
71 if (status.ok()) {
72 status = ops.SendMessagePtr(rsp);
73 }
74 ops.ServerSendStatus(¶m.server_context->trailing_metadata_, status);
75 param.call->PerformOps(&ops);
76 param.call->cq()->Pluck(&ops);
77 }
78
79 /// A helper function with reduced templating to do deserializing.
80
81 template <class RequestType>
UnaryDeserializeHelper(grpc_byte_buffer * req,grpc::Status * status,RequestType * request)82 void* UnaryDeserializeHelper(grpc_byte_buffer* req, grpc::Status* status,
83 RequestType* request) {
84 grpc::ByteBuffer buf;
85 buf.set_buffer(req);
86 *status = grpc::SerializationTraits<RequestType>::Deserialize(
87 &buf, static_cast<RequestType*>(request));
88 buf.Release();
89 if (status->ok()) {
90 return request;
91 }
92 request->~RequestType();
93 return nullptr;
94 }
95
96 /// A wrapper class of an application provided rpc method handler.
97 template <class ServiceType, class RequestType, class ResponseType,
98 class BaseRequestType = RequestType,
99 class BaseResponseType = ResponseType>
100 class RpcMethodHandler : public grpc::internal::MethodHandler {
101 public:
RpcMethodHandler(std::function<grpc::Status (ServiceType *,grpc::ServerContext *,const RequestType *,ResponseType *)> func,ServiceType * service)102 RpcMethodHandler(
103 std::function<grpc::Status(ServiceType*, grpc::ServerContext*,
104 const RequestType*, ResponseType*)>
105 func,
106 ServiceType* service)
107 : func_(func), service_(service) {}
108
RunHandler(const HandlerParameter & param)109 void RunHandler(const HandlerParameter& param) final {
110 ResponseType rsp;
111 grpc::Status status = param.status;
112 if (status.ok()) {
113 status = CatchingFunctionHandler([this, ¶m, &rsp] {
114 return func_(service_,
115 static_cast<grpc::ServerContext*>(param.server_context),
116 static_cast<RequestType*>(param.request), &rsp);
117 });
118 static_cast<RequestType*>(param.request)->~RequestType();
119 }
120 UnaryRunHandlerHelper(param, static_cast<BaseResponseType*>(&rsp), status);
121 }
122
Deserialize(grpc_call * call,grpc_byte_buffer * req,grpc::Status * status,void **)123 void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
124 grpc::Status* status, void** /*handler_data*/) final {
125 auto* request =
126 new (grpc_call_arena_alloc(call, sizeof(RequestType))) RequestType;
127 return UnaryDeserializeHelper(req, status,
128 static_cast<BaseRequestType*>(request));
129 }
130
131 private:
132 /// Application provided rpc handler function.
133 std::function<grpc::Status(ServiceType*, grpc::ServerContext*,
134 const RequestType*, ResponseType*)>
135 func_;
136 // The class the above handler function lives in.
137 ServiceType* service_;
138 };
139
140 /// A wrapper class of an application provided client streaming handler.
141 template <class ServiceType, class RequestType, class ResponseType>
142 class ClientStreamingHandler : public grpc::internal::MethodHandler {
143 public:
ClientStreamingHandler(std::function<grpc::Status (ServiceType *,grpc::ServerContext *,ServerReader<RequestType> *,ResponseType *)> func,ServiceType * service)144 ClientStreamingHandler(
145 std::function<grpc::Status(ServiceType*, grpc::ServerContext*,
146 ServerReader<RequestType>*, ResponseType*)>
147 func,
148 ServiceType* service)
149 : func_(func), service_(service) {}
150
RunHandler(const HandlerParameter & param)151 void RunHandler(const HandlerParameter& param) final {
152 ServerReader<RequestType> reader(
153 param.call, static_cast<grpc::ServerContext*>(param.server_context));
154 ResponseType rsp;
155 grpc::Status status =
156 CatchingFunctionHandler([this, ¶m, &reader, &rsp] {
157 return func_(service_,
158 static_cast<grpc::ServerContext*>(param.server_context),
159 &reader, &rsp);
160 });
161
162 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
163 grpc::internal::CallOpSendMessage,
164 grpc::internal::CallOpServerSendStatus>
165 ops;
166 if (!param.server_context->sent_initial_metadata_) {
167 ops.SendInitialMetadata(¶m.server_context->initial_metadata_,
168 param.server_context->initial_metadata_flags());
169 if (param.server_context->compression_level_set()) {
170 ops.set_compression_level(param.server_context->compression_level());
171 }
172 }
173 if (status.ok()) {
174 status = ops.SendMessagePtr(&rsp);
175 }
176 ops.ServerSendStatus(¶m.server_context->trailing_metadata_, status);
177 param.call->PerformOps(&ops);
178 param.call->cq()->Pluck(&ops);
179 }
180
181 private:
182 std::function<grpc::Status(ServiceType*, grpc::ServerContext*,
183 ServerReader<RequestType>*, ResponseType*)>
184 func_;
185 ServiceType* service_;
186 };
187
188 /// A wrapper class of an application provided server streaming handler.
189 template <class ServiceType, class RequestType, class ResponseType>
190 class ServerStreamingHandler : public grpc::internal::MethodHandler {
191 public:
ServerStreamingHandler(std::function<grpc::Status (ServiceType *,grpc::ServerContext *,const RequestType *,ServerWriter<ResponseType> *)> func,ServiceType * service)192 ServerStreamingHandler(std::function<grpc::Status(
193 ServiceType*, grpc::ServerContext*,
194 const RequestType*, ServerWriter<ResponseType>*)>
195 func,
196 ServiceType* service)
197 : func_(func), service_(service) {}
198
RunHandler(const HandlerParameter & param)199 void RunHandler(const HandlerParameter& param) final {
200 grpc::Status status = param.status;
201 if (status.ok()) {
202 ServerWriter<ResponseType> writer(
203 param.call, static_cast<grpc::ServerContext*>(param.server_context));
204 status = CatchingFunctionHandler([this, ¶m, &writer] {
205 return func_(service_,
206 static_cast<grpc::ServerContext*>(param.server_context),
207 static_cast<RequestType*>(param.request), &writer);
208 });
209 static_cast<RequestType*>(param.request)->~RequestType();
210 }
211
212 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
213 grpc::internal::CallOpServerSendStatus>
214 ops;
215 if (!param.server_context->sent_initial_metadata_) {
216 ops.SendInitialMetadata(¶m.server_context->initial_metadata_,
217 param.server_context->initial_metadata_flags());
218 if (param.server_context->compression_level_set()) {
219 ops.set_compression_level(param.server_context->compression_level());
220 }
221 }
222 ops.ServerSendStatus(¶m.server_context->trailing_metadata_, status);
223 param.call->PerformOps(&ops);
224 if (param.server_context->has_pending_ops_) {
225 param.call->cq()->Pluck(¶m.server_context->pending_ops_);
226 }
227 param.call->cq()->Pluck(&ops);
228 }
229
Deserialize(grpc_call * call,grpc_byte_buffer * req,grpc::Status * status,void **)230 void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
231 grpc::Status* status, void** /*handler_data*/) final {
232 grpc::ByteBuffer buf;
233 buf.set_buffer(req);
234 auto* request =
235 new (grpc_call_arena_alloc(call, sizeof(RequestType))) RequestType();
236 *status =
237 grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
238 buf.Release();
239 if (status->ok()) {
240 return request;
241 }
242 request->~RequestType();
243 return nullptr;
244 }
245
246 private:
247 std::function<grpc::Status(ServiceType*, grpc::ServerContext*,
248 const RequestType*, ServerWriter<ResponseType>*)>
249 func_;
250 ServiceType* service_;
251 };
252
253 /// A wrapper class of an application provided bidi-streaming handler.
254 /// This also applies to server-streamed implementation of a unary method
255 /// with the additional requirement that such methods must have done a
256 /// write for status to be ok
257 /// Since this is used by more than 1 class, the service is not passed in.
258 /// Instead, it is expected to be an implicitly-captured argument of func
259 /// (through bind or something along those lines)
260 template <class Streamer, bool WriteNeeded>
261 class TemplatedBidiStreamingHandler : public grpc::internal::MethodHandler {
262 public:
TemplatedBidiStreamingHandler(std::function<grpc::Status (grpc::ServerContext *,Streamer *)> func)263 explicit TemplatedBidiStreamingHandler(
264 std::function<grpc::Status(grpc::ServerContext*, Streamer*)> func)
265 : func_(func), write_needed_(WriteNeeded) {}
266
RunHandler(const HandlerParameter & param)267 void RunHandler(const HandlerParameter& param) final {
268 Streamer stream(param.call,
269 static_cast<grpc::ServerContext*>(param.server_context));
270 grpc::Status status = CatchingFunctionHandler([this, ¶m, &stream] {
271 return func_(static_cast<grpc::ServerContext*>(param.server_context),
272 &stream);
273 });
274
275 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
276 grpc::internal::CallOpServerSendStatus>
277 ops;
278 if (!param.server_context->sent_initial_metadata_) {
279 ops.SendInitialMetadata(¶m.server_context->initial_metadata_,
280 param.server_context->initial_metadata_flags());
281 if (param.server_context->compression_level_set()) {
282 ops.set_compression_level(param.server_context->compression_level());
283 }
284 if (write_needed_ && status.ok()) {
285 // If we needed a write but never did one, we need to mark the
286 // status as a fail
287 status = grpc::Status(grpc::StatusCode::INTERNAL,
288 "Service did not provide response message");
289 }
290 }
291 ops.ServerSendStatus(¶m.server_context->trailing_metadata_, status);
292 param.call->PerformOps(&ops);
293 if (param.server_context->has_pending_ops_) {
294 param.call->cq()->Pluck(¶m.server_context->pending_ops_);
295 }
296 param.call->cq()->Pluck(&ops);
297 }
298
299 private:
300 std::function<grpc::Status(grpc::ServerContext*, Streamer*)> func_;
301 const bool write_needed_;
302 };
303
304 template <class ServiceType, class RequestType, class ResponseType>
305 class BidiStreamingHandler
306 : public TemplatedBidiStreamingHandler<
307 ServerReaderWriter<ResponseType, RequestType>, false> {
308 public:
BidiStreamingHandler(std::function<grpc::Status (ServiceType *,grpc::ServerContext *,ServerReaderWriter<ResponseType,RequestType> *)> func,ServiceType * service)309 BidiStreamingHandler(std::function<grpc::Status(
310 ServiceType*, grpc::ServerContext*,
311 ServerReaderWriter<ResponseType, RequestType>*)>
312 func,
313 ServiceType* service)
314 // TODO(vjpai): When gRPC supports C++14, move-capture func in the below
315 : TemplatedBidiStreamingHandler<
316 ServerReaderWriter<ResponseType, RequestType>, false>(
317 [func, service](
318 grpc::ServerContext* ctx,
319 ServerReaderWriter<ResponseType, RequestType>* streamer) {
320 return func(service, ctx, streamer);
321 }) {}
322 };
323
324 template <class RequestType, class ResponseType>
325 class StreamedUnaryHandler
326 : public TemplatedBidiStreamingHandler<
327 ServerUnaryStreamer<RequestType, ResponseType>, true> {
328 public:
StreamedUnaryHandler(std::function<grpc::Status (grpc::ServerContext *,ServerUnaryStreamer<RequestType,ResponseType> *)> func)329 explicit StreamedUnaryHandler(
330 std::function<
331 grpc::Status(grpc::ServerContext*,
332 ServerUnaryStreamer<RequestType, ResponseType>*)>
333 func)
334 : TemplatedBidiStreamingHandler<
335 ServerUnaryStreamer<RequestType, ResponseType>, true>(
336 std::move(func)) {}
337 };
338
339 template <class RequestType, class ResponseType>
340 class SplitServerStreamingHandler
341 : public TemplatedBidiStreamingHandler<
342 ServerSplitStreamer<RequestType, ResponseType>, false> {
343 public:
SplitServerStreamingHandler(std::function<grpc::Status (grpc::ServerContext *,ServerSplitStreamer<RequestType,ResponseType> *)> func)344 explicit SplitServerStreamingHandler(
345 std::function<
346 grpc::Status(grpc::ServerContext*,
347 ServerSplitStreamer<RequestType, ResponseType>*)>
348 func)
349 : TemplatedBidiStreamingHandler<
350 ServerSplitStreamer<RequestType, ResponseType>, false>(
351 std::move(func)) {}
352 };
353
354 /// General method handler class for errors that prevent real method use
355 /// e.g., handle unknown method by returning UNIMPLEMENTED error.
356 template <grpc::StatusCode code>
357 class ErrorMethodHandler : public grpc::internal::MethodHandler {
358 public:
ErrorMethodHandler(const std::string & message)359 explicit ErrorMethodHandler(const std::string& message) : message_(message) {}
360
361 template <class T>
FillOps(grpc::ServerContextBase * context,const std::string & message,T * ops)362 static void FillOps(grpc::ServerContextBase* context,
363 const std::string& message, T* ops) {
364 grpc::Status status(code, message);
365 if (!context->sent_initial_metadata_) {
366 ops->SendInitialMetadata(&context->initial_metadata_,
367 context->initial_metadata_flags());
368 if (context->compression_level_set()) {
369 ops->set_compression_level(context->compression_level());
370 }
371 context->sent_initial_metadata_ = true;
372 }
373 ops->ServerSendStatus(&context->trailing_metadata_, status);
374 }
375
RunHandler(const HandlerParameter & param)376 void RunHandler(const HandlerParameter& param) final {
377 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
378 grpc::internal::CallOpServerSendStatus>
379 ops;
380 FillOps(param.server_context, message_, &ops);
381 param.call->PerformOps(&ops);
382 param.call->cq()->Pluck(&ops);
383 }
384
Deserialize(grpc_call *,grpc_byte_buffer * req,grpc::Status *,void **)385 void* Deserialize(grpc_call* /*call*/, grpc_byte_buffer* req,
386 grpc::Status* /*status*/, void** /*handler_data*/) final {
387 // We have to destroy any request payload
388 if (req != nullptr) {
389 grpc_byte_buffer_destroy(req);
390 }
391 return nullptr;
392 }
393
394 private:
395 const std::string message_;
396 };
397
398 typedef ErrorMethodHandler<grpc::StatusCode::UNIMPLEMENTED>
399 UnknownMethodHandler;
400 typedef ErrorMethodHandler<grpc::StatusCode::RESOURCE_EXHAUSTED>
401 ResourceExhaustedHandler;
402
403 } // namespace internal
404 } // namespace grpc
405
406 #endif // GRPCPP_SUPPORT_METHOD_HANDLER_H
407