• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPCPP_IMPL_CODEGEN_ASYNC_UNARY_CALL_H
20 #define GRPCPP_IMPL_CODEGEN_ASYNC_UNARY_CALL_H
21 
22 #include <grpcpp/impl/codegen/call.h>
23 #include <grpcpp/impl/codegen/call_op_set.h>
24 #include <grpcpp/impl/codegen/call_op_set_interface.h>
25 #include <grpcpp/impl/codegen/channel_interface.h>
26 #include <grpcpp/impl/codegen/client_context.h>
27 #include <grpcpp/impl/codegen/server_context.h>
28 #include <grpcpp/impl/codegen/service_type.h>
29 #include <grpcpp/impl/codegen/status.h>
30 
31 namespace grpc {
32 
33 // Forward declaration for use in Helper class
34 template <class R>
35 class ClientAsyncResponseReader;
36 
37 /// An interface relevant for async client side unary RPCs (which send
38 /// one request message to a server and receive one response message).
39 template <class R>
40 class ClientAsyncResponseReaderInterface {
41  public:
~ClientAsyncResponseReaderInterface()42   virtual ~ClientAsyncResponseReaderInterface() {}
43 
44   /// Start the call that was set up by the constructor, but only if the
45   /// constructor was invoked through the "Prepare" API which doesn't actually
46   /// start the call
47   virtual void StartCall() = 0;
48 
49   /// Request notification of the reading of initial metadata. Completion
50   /// will be notified by \a tag on the associated completion queue.
51   /// This call is optional, but if it is used, it cannot be used concurrently
52   /// with or after the \a Finish method.
53   ///
54   /// \param[in] tag Tag identifying this request.
55   virtual void ReadInitialMetadata(void* tag) = 0;
56 
57   /// Request to receive the server's response \a msg and final \a status for
58   /// the call, and to notify \a tag on this call's completion queue when
59   /// finished.
60   ///
61   /// This function will return when either:
62   /// - when the server's response message and status have been received.
63   /// - when the server has returned a non-OK status (no message expected in
64   ///   this case).
65   /// - when the call failed for some reason and the library generated a
66   ///   non-OK status.
67   ///
68   /// \param[in] tag Tag identifying this request.
69   /// \param[out] status To be updated with the operation status.
70   /// \param[out] msg To be filled in with the server's response message.
71   virtual void Finish(R* msg, ::grpc::Status* status, void* tag) = 0;
72 };
73 
74 namespace internal {
75 
76 class ClientAsyncResponseReaderHelper {
77  public:
78   /// Start a call and write the request out if \a start is set.
79   /// \a tag will be notified on \a cq when the call has been started (i.e.
80   /// intitial metadata sent) and \a request has been written out.
81   /// If \a start is not set, the actual call must be initiated by StartCall
82   /// Note that \a context will be used to fill in custom initial metadata
83   /// used to send to the server when starting the call.
84   ///
85   /// Optionally pass in a base class for request and response types so that the
86   /// internal functions and structs can be templated based on that, allowing
87   /// reuse across RPCs (e.g., MessageLite for protobuf). Since constructors
88   /// can't have an explicit template parameter, the last argument is an
89   /// extraneous parameter just to provide the needed type information.
90   template <class R, class W, class BaseR = R, class BaseW = W>
Create(::grpc::ChannelInterface * channel,::grpc::CompletionQueue * cq,const::grpc::internal::RpcMethod & method,::grpc::ClientContext * context,const W & request)91   static ClientAsyncResponseReader<R>* Create(
92       ::grpc::ChannelInterface* channel, ::grpc::CompletionQueue* cq,
93       const ::grpc::internal::RpcMethod& method, ::grpc::ClientContext* context,
94       const W& request) /* __attribute__((noinline)) */ {
95     ::grpc::internal::Call call = channel->CreateCall(method, context, cq);
96     ClientAsyncResponseReader<R>* result =
97         new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
98             call.call(), sizeof(ClientAsyncResponseReader<R>)))
99             ClientAsyncResponseReader<R>(call, context);
100     SetupRequest<BaseR, BaseW>(
101         call.call(), &result->single_buf_, &result->read_initial_metadata_,
102         &result->finish_, static_cast<const BaseW&>(request));
103 
104     return result;
105   }
106 
107   // Various helper functions to reduce templating use
108 
109   template <class R, class W>
SetupRequest(grpc_call * call,::grpc::internal::CallOpSendInitialMetadata ** single_buf_ptr,std::function<void (ClientContext *,internal::Call *,internal::CallOpSendInitialMetadata *,void *)> * read_initial_metadata,std::function<void (ClientContext *,internal::Call *,bool initial_metadata_read,internal::CallOpSendInitialMetadata *,internal::CallOpSetInterface **,void *,Status *,void *)> * finish,const W & request)110   static void SetupRequest(
111       grpc_call* call,
112       ::grpc::internal::CallOpSendInitialMetadata** single_buf_ptr,
113       std::function<void(ClientContext*, internal::Call*,
114                          internal::CallOpSendInitialMetadata*, void*)>*
115           read_initial_metadata,
116       std::function<
117           void(ClientContext*, internal::Call*, bool initial_metadata_read,
118                internal::CallOpSendInitialMetadata*,
119                internal::CallOpSetInterface**, void*, Status*, void*)>* finish,
120       const W& request) {
121     using SingleBufType =
122         ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
123                                     ::grpc::internal::CallOpSendMessage,
124                                     ::grpc::internal::CallOpClientSendClose,
125                                     ::grpc::internal::CallOpRecvInitialMetadata,
126                                     ::grpc::internal::CallOpRecvMessage<R>,
127                                     ::grpc::internal::CallOpClientRecvStatus>;
128     SingleBufType* single_buf =
129         new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
130             call, sizeof(SingleBufType))) SingleBufType;
131     *single_buf_ptr = single_buf;
132     // TODO(ctiller): don't assert
133     GPR_CODEGEN_ASSERT(single_buf->SendMessage(request).ok());
134     single_buf->ClientSendClose();
135 
136     // The purpose of the following functions is to type-erase the actual
137     // templated type of the CallOpSet being used by hiding that type inside the
138     // function definition rather than specifying it as an argument of the
139     // function or a member of the class. The type-erased CallOpSet will get
140     // static_cast'ed back to the real type so that it can be used properly.
141     *read_initial_metadata =
142         [](ClientContext* context, internal::Call* call,
143            internal::CallOpSendInitialMetadata* single_buf_view, void* tag) {
144           auto* single_buf = static_cast<SingleBufType*>(single_buf_view);
145           single_buf->set_output_tag(tag);
146           single_buf->RecvInitialMetadata(context);
147           call->PerformOps(single_buf);
148         };
149 
150     // Note that this function goes one step further than the previous one
151     // because it type-erases the message being written down to a void*. This
152     // will be static-cast'ed back to the class specified here by hiding that
153     // class information inside the function definition. Note that this feature
154     // expects the class being specified here for R to be a base-class of the
155     // "real" R without any multiple-inheritance (as applies in protbuf wrt
156     // MessageLite)
157     *finish = [](ClientContext* context, internal::Call* call,
158                  bool initial_metadata_read,
159                  internal::CallOpSendInitialMetadata* single_buf_view,
160                  internal::CallOpSetInterface** finish_buf_ptr, void* msg,
161                  Status* status, void* tag) {
162       if (initial_metadata_read) {
163         using FinishBufType = ::grpc::internal::CallOpSet<
164             ::grpc::internal::CallOpRecvMessage<R>,
165             ::grpc::internal::CallOpClientRecvStatus>;
166         FinishBufType* finish_buf =
167             new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
168                 call->call(), sizeof(FinishBufType))) FinishBufType;
169         *finish_buf_ptr = finish_buf;
170         finish_buf->set_output_tag(tag);
171         finish_buf->RecvMessage(static_cast<R*>(msg));
172         finish_buf->AllowNoMessage();
173         finish_buf->ClientRecvStatus(context, status);
174         call->PerformOps(finish_buf);
175       } else {
176         auto* single_buf = static_cast<SingleBufType*>(single_buf_view);
177         single_buf->set_output_tag(tag);
178         single_buf->RecvInitialMetadata(context);
179         single_buf->RecvMessage(static_cast<R*>(msg));
180         single_buf->AllowNoMessage();
181         single_buf->ClientRecvStatus(context, status);
182         call->PerformOps(single_buf);
183       }
184     };
185   }
186 
StartCall(::grpc::ClientContext * context,::grpc::internal::CallOpSendInitialMetadata * single_buf)187   static void StartCall(
188       ::grpc::ClientContext* context,
189       ::grpc::internal::CallOpSendInitialMetadata* single_buf) {
190     single_buf->SendInitialMetadata(&context->send_initial_metadata_,
191                                     context->initial_metadata_flags());
192   }
193 };
194 
195 // TODO(vjpai): This templated factory is deprecated and will be replaced by
196 //.             the non-templated helper as soon as possible.
197 template <class R>
198 class ClientAsyncResponseReaderFactory {
199  public:
200   template <class W>
Create(::grpc::ChannelInterface * channel,::grpc::CompletionQueue * cq,const::grpc::internal::RpcMethod & method,::grpc::ClientContext * context,const W & request,bool start)201   static ClientAsyncResponseReader<R>* Create(
202       ::grpc::ChannelInterface* channel, ::grpc::CompletionQueue* cq,
203       const ::grpc::internal::RpcMethod& method, ::grpc::ClientContext* context,
204       const W& request, bool start) {
205     auto* result = ClientAsyncResponseReaderHelper::Create<R>(
206         channel, cq, method, context, request);
207     if (start) {
208       result->StartCall();
209     }
210     return result;
211   }
212 };
213 
214 }  // namespace internal
215 
216 /// Async API for client-side unary RPCs, where the message response
217 /// received from the server is of type \a R.
218 template <class R>
219 class ClientAsyncResponseReader final
220     : public ClientAsyncResponseReaderInterface<R> {
221  public:
222   // always allocated against a call arena, no memory free required
delete(void *,std::size_t size)223   static void operator delete(void* /*ptr*/, std::size_t size) {
224     GPR_CODEGEN_ASSERT(size == sizeof(ClientAsyncResponseReader));
225   }
226 
227   // This operator should never be called as the memory should be freed as part
228   // of the arena destruction. It only exists to provide a matching operator
229   // delete to the operator new so that some compilers will not complain (see
230   // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
231   // there are no tests catching the compiler warning.
delete(void *,void *)232   static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
233 
StartCall()234   void StartCall() override {
235     GPR_CODEGEN_DEBUG_ASSERT(!started_);
236     started_ = true;
237     internal::ClientAsyncResponseReaderHelper::StartCall(context_, single_buf_);
238   }
239 
240   /// See \a ClientAsyncResponseReaderInterface::ReadInitialMetadata for
241   /// semantics.
242   ///
243   /// Side effect:
244   ///   - the \a ClientContext associated with this call is updated with
245   ///     possible initial and trailing metadata sent from the server.
ReadInitialMetadata(void * tag)246   void ReadInitialMetadata(void* tag) override {
247     GPR_CODEGEN_DEBUG_ASSERT(started_);
248     GPR_CODEGEN_DEBUG_ASSERT(!context_->initial_metadata_received_);
249     read_initial_metadata_(context_, &call_, single_buf_, tag);
250     initial_metadata_read_ = true;
251   }
252 
253   /// See \a ClientAsyncResponseReaderInterface::Finish for semantics.
254   ///
255   /// Side effect:
256   ///   - the \a ClientContext associated with this call is updated with
257   ///     possible initial and trailing metadata sent from the server.
Finish(R * msg,::grpc::Status * status,void * tag)258   void Finish(R* msg, ::grpc::Status* status, void* tag) override {
259     GPR_CODEGEN_DEBUG_ASSERT(started_);
260     finish_(context_, &call_, initial_metadata_read_, single_buf_, &finish_buf_,
261             static_cast<void*>(msg), status, tag);
262   }
263 
264  private:
265   friend class internal::ClientAsyncResponseReaderHelper;
266   ::grpc::ClientContext* const context_;
267   ::grpc::internal::Call call_;
268   bool started_ = false;
269   bool initial_metadata_read_ = false;
270 
ClientAsyncResponseReader(::grpc::internal::Call call,::grpc::ClientContext * context)271   ClientAsyncResponseReader(::grpc::internal::Call call,
272                             ::grpc::ClientContext* context)
273       : context_(context), call_(call) {}
274 
275   // disable operator new
276   static void* operator new(std::size_t size);
new(std::size_t,void * p)277   static void* operator new(std::size_t /*size*/, void* p) { return p; }
278 
279   internal::CallOpSendInitialMetadata* single_buf_;
280   internal::CallOpSetInterface* finish_buf_ = nullptr;
281   std::function<void(ClientContext*, internal::Call*,
282                      internal::CallOpSendInitialMetadata*, void*)>
283       read_initial_metadata_;
284   std::function<void(ClientContext*, internal::Call*,
285                      bool initial_metadata_read,
286                      internal::CallOpSendInitialMetadata*,
287                      internal::CallOpSetInterface**, void*, Status*, void*)>
288       finish_;
289 };
290 
291 /// Async server-side API for handling unary calls, where the single
292 /// response message sent to the client is of type \a W.
293 template <class W>
294 class ServerAsyncResponseWriter final
295     : public ::grpc::internal::ServerAsyncStreamingInterface {
296  public:
ServerAsyncResponseWriter(::grpc::ServerContext * ctx)297   explicit ServerAsyncResponseWriter(::grpc::ServerContext* ctx)
298       : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
299 
300   /// See \a ServerAsyncStreamingInterface::SendInitialMetadata for semantics.
301   ///
302   /// Side effect:
303   ///   The initial metadata that will be sent to the client from this op will
304   ///   be taken from the \a ServerContext associated with the call.
305   ///
306   /// \param[in] tag Tag identifying this request.
SendInitialMetadata(void * tag)307   void SendInitialMetadata(void* tag) override {
308     GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
309 
310     meta_buf_.set_output_tag(tag);
311     meta_buf_.SendInitialMetadata(&ctx_->initial_metadata_,
312                                   ctx_->initial_metadata_flags());
313     if (ctx_->compression_level_set()) {
314       meta_buf_.set_compression_level(ctx_->compression_level());
315     }
316     ctx_->sent_initial_metadata_ = true;
317     call_.PerformOps(&meta_buf_);
318   }
319 
320   /// Indicate that the stream is to be finished and request notification
321   /// when the server has sent the appropriate signals to the client to
322   /// end the call. Should not be used concurrently with other operations.
323   ///
324   /// \param[in] tag Tag identifying this request.
325   /// \param[in] status To be sent to the client as the result of the call.
326   /// \param[in] msg Message to be sent to the client.
327   ///
328   /// Side effect:
329   ///   - also sends initial metadata if not already sent (using the
330   ///     \a ServerContext associated with this call).
331   ///
332   /// Note: if \a status has a non-OK code, then \a msg will not be sent,
333   /// and the client will receive only the status with possible trailing
334   /// metadata.
Finish(const W & msg,const::grpc::Status & status,void * tag)335   void Finish(const W& msg, const ::grpc::Status& status, void* tag) {
336     finish_buf_.set_output_tag(tag);
337     finish_buf_.set_core_cq_tag(&finish_buf_);
338     if (!ctx_->sent_initial_metadata_) {
339       finish_buf_.SendInitialMetadata(&ctx_->initial_metadata_,
340                                       ctx_->initial_metadata_flags());
341       if (ctx_->compression_level_set()) {
342         finish_buf_.set_compression_level(ctx_->compression_level());
343       }
344       ctx_->sent_initial_metadata_ = true;
345     }
346     // The response is dropped if the status is not OK.
347     if (status.ok()) {
348       finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_,
349                                    finish_buf_.SendMessage(msg));
350     } else {
351       finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_, status);
352     }
353     call_.PerformOps(&finish_buf_);
354   }
355 
356   /// Indicate that the stream is to be finished with a non-OK status,
357   /// and request notification for when the server has finished sending the
358   /// appropriate signals to the client to end the call.
359   /// Should not be used concurrently with other operations.
360   ///
361   /// \param[in] tag Tag identifying this request.
362   /// \param[in] status To be sent to the client as the result of the call.
363   ///   - Note: \a status must have a non-OK code.
364   ///
365   /// Side effect:
366   ///   - also sends initial metadata if not already sent (using the
367   ///     \a ServerContext associated with this call).
FinishWithError(const::grpc::Status & status,void * tag)368   void FinishWithError(const ::grpc::Status& status, void* tag) {
369     GPR_CODEGEN_ASSERT(!status.ok());
370     finish_buf_.set_output_tag(tag);
371     if (!ctx_->sent_initial_metadata_) {
372       finish_buf_.SendInitialMetadata(&ctx_->initial_metadata_,
373                                       ctx_->initial_metadata_flags());
374       if (ctx_->compression_level_set()) {
375         finish_buf_.set_compression_level(ctx_->compression_level());
376       }
377       ctx_->sent_initial_metadata_ = true;
378     }
379     finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_, status);
380     call_.PerformOps(&finish_buf_);
381   }
382 
383  private:
BindCall(::grpc::internal::Call * call)384   void BindCall(::grpc::internal::Call* call) override { call_ = *call; }
385 
386   ::grpc::internal::Call call_;
387   ::grpc::ServerContext* ctx_;
388   ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
389       meta_buf_;
390   ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
391                               ::grpc::internal::CallOpSendMessage,
392                               ::grpc::internal::CallOpServerSendStatus>
393       finish_buf_;
394 };
395 
396 }  // namespace grpc
397 
398 namespace std {
399 template <class R>
400 class default_delete<::grpc::ClientAsyncResponseReader<R>> {
401  public:
operator()402   void operator()(void* /*p*/) {}
403 };
404 template <class R>
405 class default_delete<::grpc::ClientAsyncResponseReaderInterface<R>> {
406  public:
operator()407   void operator()(void* /*p*/) {}
408 };
409 }  // namespace std
410 
411 #endif  // GRPCPP_IMPL_CODEGEN_ASYNC_UNARY_CALL_H
412