1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #ifndef GRPCPP_IMPL_CODEGEN_METHOD_HANDLER_IMPL_H
20 #define GRPCPP_IMPL_CODEGEN_METHOD_HANDLER_IMPL_H
21
#include <functional>
#include <utility>

#include <grpcpp/impl/codegen/byte_buffer.h>
#include <grpcpp/impl/codegen/core_codegen_interface.h>
#include <grpcpp/impl/codegen/rpc_service_method.h>
#include <grpcpp/impl/codegen/sync_stream_impl.h>
26
27 namespace grpc_impl {
28
29 namespace internal {
30
31 // Invoke the method handler, fill in the status, and
32 // return whether or not we finished safely (without an exception).
33 // Note that exception handling is 0-cost in most compiler/library
34 // implementations (except when an exception is actually thrown),
35 // so this process doesn't require additional overhead in the common case.
36 // Additionally, we don't need to return if we caught an exception or not;
37 // the handling is the same in either case.
38 template <class Callable>
CatchingFunctionHandler(Callable && handler)39 ::grpc::Status CatchingFunctionHandler(Callable&& handler) {
40 #if GRPC_ALLOW_EXCEPTIONS
41 try {
42 return handler();
43 } catch (...) {
44 return ::grpc::Status(::grpc::StatusCode::UNKNOWN,
45 "Unexpected error in RPC handling");
46 }
47 #else // GRPC_ALLOW_EXCEPTIONS
48 return handler();
49 #endif // GRPC_ALLOW_EXCEPTIONS
50 }
51
52 /// A wrapper class of an application provided rpc method handler.
53 template <class ServiceType, class RequestType, class ResponseType>
54 class RpcMethodHandler : public ::grpc::internal::MethodHandler {
55 public:
RpcMethodHandler(std::function<::grpc::Status (ServiceType *,::grpc_impl::ServerContext *,const RequestType *,ResponseType *)> func,ServiceType * service)56 RpcMethodHandler(
57 std::function<::grpc::Status(ServiceType*, ::grpc_impl::ServerContext*,
58 const RequestType*, ResponseType*)>
59 func,
60 ServiceType* service)
61 : func_(func), service_(service) {}
62
RunHandler(const HandlerParameter & param)63 void RunHandler(const HandlerParameter& param) final {
64 ResponseType rsp;
65 ::grpc::Status status = param.status;
66 if (status.ok()) {
67 status = CatchingFunctionHandler([this, ¶m, &rsp] {
68 return func_(
69 service_,
70 static_cast<::grpc_impl::ServerContext*>(param.server_context),
71 static_cast<RequestType*>(param.request), &rsp);
72 });
73 static_cast<RequestType*>(param.request)->~RequestType();
74 }
75
76 GPR_CODEGEN_ASSERT(!param.server_context->sent_initial_metadata_);
77 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
78 ::grpc::internal::CallOpSendMessage,
79 ::grpc::internal::CallOpServerSendStatus>
80 ops;
81 ops.SendInitialMetadata(¶m.server_context->initial_metadata_,
82 param.server_context->initial_metadata_flags());
83 if (param.server_context->compression_level_set()) {
84 ops.set_compression_level(param.server_context->compression_level());
85 }
86 if (status.ok()) {
87 status = ops.SendMessagePtr(&rsp);
88 }
89 ops.ServerSendStatus(¶m.server_context->trailing_metadata_, status);
90 param.call->PerformOps(&ops);
91 param.call->cq()->Pluck(&ops);
92 }
93
Deserialize(grpc_call * call,grpc_byte_buffer * req,::grpc::Status * status,void **)94 void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
95 ::grpc::Status* status, void** /*handler_data*/) final {
96 ::grpc::ByteBuffer buf;
97 buf.set_buffer(req);
98 auto* request =
99 new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
100 call, sizeof(RequestType))) RequestType();
101 *status =
102 ::grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
103 buf.Release();
104 if (status->ok()) {
105 return request;
106 }
107 request->~RequestType();
108 return nullptr;
109 }
110
111 private:
112 /// Application provided rpc handler function.
113 std::function<::grpc::Status(ServiceType*, ::grpc_impl::ServerContext*,
114 const RequestType*, ResponseType*)>
115 func_;
116 // The class the above handler function lives in.
117 ServiceType* service_;
118 };
119
120 /// A wrapper class of an application provided client streaming handler.
121 template <class ServiceType, class RequestType, class ResponseType>
122 class ClientStreamingHandler : public ::grpc::internal::MethodHandler {
123 public:
ClientStreamingHandler(std::function<::grpc::Status (ServiceType *,::grpc_impl::ServerContext *,::grpc_impl::ServerReader<RequestType> *,ResponseType *)> func,ServiceType * service)124 ClientStreamingHandler(
125 std::function<::grpc::Status(ServiceType*, ::grpc_impl::ServerContext*,
126 ::grpc_impl::ServerReader<RequestType>*,
127 ResponseType*)>
128 func,
129 ServiceType* service)
130 : func_(func), service_(service) {}
131
RunHandler(const HandlerParameter & param)132 void RunHandler(const HandlerParameter& param) final {
133 ::grpc_impl::ServerReader<RequestType> reader(
134 param.call,
135 static_cast<::grpc_impl::ServerContext*>(param.server_context));
136 ResponseType rsp;
137 ::grpc::Status status =
138 CatchingFunctionHandler([this, ¶m, &reader, &rsp] {
139 return func_(
140 service_,
141 static_cast<::grpc_impl::ServerContext*>(param.server_context),
142 &reader, &rsp);
143 });
144
145 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
146 ::grpc::internal::CallOpSendMessage,
147 ::grpc::internal::CallOpServerSendStatus>
148 ops;
149 if (!param.server_context->sent_initial_metadata_) {
150 ops.SendInitialMetadata(¶m.server_context->initial_metadata_,
151 param.server_context->initial_metadata_flags());
152 if (param.server_context->compression_level_set()) {
153 ops.set_compression_level(param.server_context->compression_level());
154 }
155 }
156 if (status.ok()) {
157 status = ops.SendMessagePtr(&rsp);
158 }
159 ops.ServerSendStatus(¶m.server_context->trailing_metadata_, status);
160 param.call->PerformOps(&ops);
161 param.call->cq()->Pluck(&ops);
162 }
163
164 private:
165 std::function<::grpc::Status(ServiceType*, ::grpc_impl::ServerContext*,
166 ::grpc_impl::ServerReader<RequestType>*,
167 ResponseType*)>
168 func_;
169 ServiceType* service_;
170 };
171
172 /// A wrapper class of an application provided server streaming handler.
173 template <class ServiceType, class RequestType, class ResponseType>
174 class ServerStreamingHandler : public ::grpc::internal::MethodHandler {
175 public:
ServerStreamingHandler(std::function<::grpc::Status (ServiceType *,::grpc_impl::ServerContext *,const RequestType *,::grpc_impl::ServerWriter<ResponseType> *)> func,ServiceType * service)176 ServerStreamingHandler(
177 std::function<::grpc::Status(ServiceType*, ::grpc_impl::ServerContext*,
178 const RequestType*,
179 ::grpc_impl::ServerWriter<ResponseType>*)>
180 func,
181 ServiceType* service)
182 : func_(func), service_(service) {}
183
RunHandler(const HandlerParameter & param)184 void RunHandler(const HandlerParameter& param) final {
185 ::grpc::Status status = param.status;
186 if (status.ok()) {
187 ::grpc_impl::ServerWriter<ResponseType> writer(
188 param.call,
189 static_cast<::grpc_impl::ServerContext*>(param.server_context));
190 status = CatchingFunctionHandler([this, ¶m, &writer] {
191 return func_(
192 service_,
193 static_cast<::grpc_impl::ServerContext*>(param.server_context),
194 static_cast<RequestType*>(param.request), &writer);
195 });
196 static_cast<RequestType*>(param.request)->~RequestType();
197 }
198
199 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
200 ::grpc::internal::CallOpServerSendStatus>
201 ops;
202 if (!param.server_context->sent_initial_metadata_) {
203 ops.SendInitialMetadata(¶m.server_context->initial_metadata_,
204 param.server_context->initial_metadata_flags());
205 if (param.server_context->compression_level_set()) {
206 ops.set_compression_level(param.server_context->compression_level());
207 }
208 }
209 ops.ServerSendStatus(¶m.server_context->trailing_metadata_, status);
210 param.call->PerformOps(&ops);
211 if (param.server_context->has_pending_ops_) {
212 param.call->cq()->Pluck(¶m.server_context->pending_ops_);
213 }
214 param.call->cq()->Pluck(&ops);
215 }
216
Deserialize(grpc_call * call,grpc_byte_buffer * req,::grpc::Status * status,void **)217 void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
218 ::grpc::Status* status, void** /*handler_data*/) final {
219 ::grpc::ByteBuffer buf;
220 buf.set_buffer(req);
221 auto* request =
222 new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
223 call, sizeof(RequestType))) RequestType();
224 *status =
225 ::grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
226 buf.Release();
227 if (status->ok()) {
228 return request;
229 }
230 request->~RequestType();
231 return nullptr;
232 }
233
234 private:
235 std::function<::grpc::Status(ServiceType*, ::grpc_impl::ServerContext*,
236 const RequestType*,
237 ::grpc_impl::ServerWriter<ResponseType>*)>
238 func_;
239 ServiceType* service_;
240 };
241
242 /// A wrapper class of an application provided bidi-streaming handler.
243 /// This also applies to server-streamed implementation of a unary method
244 /// with the additional requirement that such methods must have done a
245 /// write for status to be ok
246 /// Since this is used by more than 1 class, the service is not passed in.
247 /// Instead, it is expected to be an implicitly-captured argument of func
248 /// (through bind or something along those lines)
249 template <class Streamer, bool WriteNeeded>
250 class TemplatedBidiStreamingHandler : public ::grpc::internal::MethodHandler {
251 public:
TemplatedBidiStreamingHandler(std::function<::grpc::Status (::grpc_impl::ServerContext *,Streamer *)> func)252 TemplatedBidiStreamingHandler(
253 std::function<::grpc::Status(::grpc_impl::ServerContext*, Streamer*)>
254 func)
255 : func_(func), write_needed_(WriteNeeded) {}
256
RunHandler(const HandlerParameter & param)257 void RunHandler(const HandlerParameter& param) final {
258 Streamer stream(param.call, static_cast<::grpc_impl::ServerContext*>(
259 param.server_context));
260 ::grpc::Status status = CatchingFunctionHandler([this, ¶m, &stream] {
261 return func_(
262 static_cast<::grpc_impl::ServerContext*>(param.server_context),
263 &stream);
264 });
265
266 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
267 ::grpc::internal::CallOpServerSendStatus>
268 ops;
269 if (!param.server_context->sent_initial_metadata_) {
270 ops.SendInitialMetadata(¶m.server_context->initial_metadata_,
271 param.server_context->initial_metadata_flags());
272 if (param.server_context->compression_level_set()) {
273 ops.set_compression_level(param.server_context->compression_level());
274 }
275 if (write_needed_ && status.ok()) {
276 // If we needed a write but never did one, we need to mark the
277 // status as a fail
278 status = ::grpc::Status(::grpc::StatusCode::INTERNAL,
279 "Service did not provide response message");
280 }
281 }
282 ops.ServerSendStatus(¶m.server_context->trailing_metadata_, status);
283 param.call->PerformOps(&ops);
284 if (param.server_context->has_pending_ops_) {
285 param.call->cq()->Pluck(¶m.server_context->pending_ops_);
286 }
287 param.call->cq()->Pluck(&ops);
288 }
289
290 private:
291 std::function<::grpc::Status(::grpc_impl::ServerContext*, Streamer*)> func_;
292 const bool write_needed_;
293 };
294
295 template <class ServiceType, class RequestType, class ResponseType>
296 class BidiStreamingHandler
297 : public TemplatedBidiStreamingHandler<
298 ::grpc_impl::ServerReaderWriter<ResponseType, RequestType>, false> {
299 public:
BidiStreamingHandler(std::function<::grpc::Status (ServiceType *,::grpc_impl::ServerContext *,::grpc_impl::ServerReaderWriter<ResponseType,RequestType> *)> func,ServiceType * service)300 BidiStreamingHandler(
301 std::function<::grpc::Status(
302 ServiceType*, ::grpc_impl::ServerContext*,
303 ::grpc_impl::ServerReaderWriter<ResponseType, RequestType>*)>
304 func,
305 ServiceType* service)
306 // TODO(vjpai): When gRPC supports C++14, move-capture func in the below
307 : TemplatedBidiStreamingHandler<
308 ::grpc_impl::ServerReaderWriter<ResponseType, RequestType>, false>(
309 [func, service](
310 ::grpc_impl::ServerContext* ctx,
311 ::grpc_impl::ServerReaderWriter<ResponseType, RequestType>*
312 streamer) { return func(service, ctx, streamer); }) {}
313 };
314
315 template <class RequestType, class ResponseType>
316 class StreamedUnaryHandler
317 : public TemplatedBidiStreamingHandler<
318 ::grpc_impl::ServerUnaryStreamer<RequestType, ResponseType>, true> {
319 public:
StreamedUnaryHandler(std::function<::grpc::Status (::grpc_impl::ServerContext *,::grpc_impl::ServerUnaryStreamer<RequestType,ResponseType> *)> func)320 explicit StreamedUnaryHandler(
321 std::function<::grpc::Status(
322 ::grpc_impl::ServerContext*,
323 ::grpc_impl::ServerUnaryStreamer<RequestType, ResponseType>*)>
324 func)
325 : TemplatedBidiStreamingHandler<
326 ::grpc_impl::ServerUnaryStreamer<RequestType, ResponseType>, true>(
327 std::move(func)) {}
328 };
329
330 template <class RequestType, class ResponseType>
331 class SplitServerStreamingHandler
332 : public TemplatedBidiStreamingHandler<
333 ::grpc_impl::ServerSplitStreamer<RequestType, ResponseType>, false> {
334 public:
SplitServerStreamingHandler(std::function<::grpc::Status (::grpc_impl::ServerContext *,::grpc_impl::ServerSplitStreamer<RequestType,ResponseType> *)> func)335 explicit SplitServerStreamingHandler(
336 std::function<::grpc::Status(
337 ::grpc_impl::ServerContext*,
338 ::grpc_impl::ServerSplitStreamer<RequestType, ResponseType>*)>
339 func)
340 : TemplatedBidiStreamingHandler<
341 ::grpc_impl::ServerSplitStreamer<RequestType, ResponseType>, false>(
342 std::move(func)) {}
343 };
344
345 /// General method handler class for errors that prevent real method use
346 /// e.g., handle unknown method by returning UNIMPLEMENTED error.
347 template <::grpc::StatusCode code>
348 class ErrorMethodHandler : public ::grpc::internal::MethodHandler {
349 public:
350 template <class T>
FillOps(::grpc_impl::ServerContextBase * context,T * ops)351 static void FillOps(::grpc_impl::ServerContextBase* context, T* ops) {
352 ::grpc::Status status(code, "");
353 if (!context->sent_initial_metadata_) {
354 ops->SendInitialMetadata(&context->initial_metadata_,
355 context->initial_metadata_flags());
356 if (context->compression_level_set()) {
357 ops->set_compression_level(context->compression_level());
358 }
359 context->sent_initial_metadata_ = true;
360 }
361 ops->ServerSendStatus(&context->trailing_metadata_, status);
362 }
363
RunHandler(const HandlerParameter & param)364 void RunHandler(const HandlerParameter& param) final {
365 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
366 ::grpc::internal::CallOpServerSendStatus>
367 ops;
368 FillOps(param.server_context, &ops);
369 param.call->PerformOps(&ops);
370 param.call->cq()->Pluck(&ops);
371 }
372
Deserialize(grpc_call *,grpc_byte_buffer * req,::grpc::Status *,void **)373 void* Deserialize(grpc_call* /*call*/, grpc_byte_buffer* req,
374 ::grpc::Status* /*status*/, void** /*handler_data*/) final {
375 // We have to destroy any request payload
376 if (req != nullptr) {
377 ::grpc::g_core_codegen_interface->grpc_byte_buffer_destroy(req);
378 }
379 return nullptr;
380 }
381 };
382
383 typedef ErrorMethodHandler<::grpc::StatusCode::UNIMPLEMENTED>
384 UnknownMethodHandler;
385 typedef ErrorMethodHandler<::grpc::StatusCode::RESOURCE_EXHAUSTED>
386 ResourceExhaustedHandler;
387
388 } // namespace internal
389 } // namespace grpc_impl
390
391 #endif // GRPCPP_IMPL_CODEGEN_METHOD_HANDLER_IMPL_H
392