• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #ifndef GRPCPP_SUPPORT_ASYNC_UNARY_CALL_H
20 #define GRPCPP_SUPPORT_ASYNC_UNARY_CALL_H
21 
22 #include <grpc/grpc.h>
23 #include <grpcpp/client_context.h>
24 #include <grpcpp/impl/call.h>
25 #include <grpcpp/impl/call_op_set.h>
26 #include <grpcpp/impl/call_op_set_interface.h>
27 #include <grpcpp/impl/channel_interface.h>
28 #include <grpcpp/impl/service_type.h>
29 #include <grpcpp/server_context.h>
30 #include <grpcpp/support/status.h>
31 
32 #include "absl/log/absl_check.h"
33 
34 namespace grpc {
35 
36 // Forward declaration for use in Helper class
37 template <class R>
38 class ClientAsyncResponseReader;
39 
40 /// An interface relevant for async client side unary RPCs (which send
41 /// one request message to a server and receive one response message).
42 template <class R>
43 class ClientAsyncResponseReaderInterface {
44  public:
~ClientAsyncResponseReaderInterface()45   virtual ~ClientAsyncResponseReaderInterface() {}
46 
47   /// Start the call that was set up by the constructor, but only if the
48   /// constructor was invoked through the "Prepare" API which doesn't actually
49   /// start the call
50   virtual void StartCall() = 0;
51 
52   /// Request notification of the reading of initial metadata. Completion
53   /// will be notified by \a tag on the associated completion queue.
54   /// This call is optional, but if it is used, it cannot be used concurrently
55   /// with or after the \a Finish method.
56   ///
57   /// \param[in] tag Tag identifying this request.
58   virtual void ReadInitialMetadata(void* tag) = 0;
59 
60   /// Request to receive the server's response \a msg and final \a status for
61   /// the call, and to notify \a tag on this call's completion queue when
62   /// finished.
63   ///
64   /// This function will return when either:
65   /// - when the server's response message and status have been received.
66   /// - when the server has returned a non-OK status (no message expected in
67   ///   this case).
68   /// - when the call failed for some reason and the library generated a
69   ///   non-OK status.
70   ///
71   /// \param[in] tag Tag identifying this request.
72   /// \param[out] status To be updated with the operation status.
73   /// \param[out] msg To be filled in with the server's response message.
74   virtual void Finish(R* msg, grpc::Status* status, void* tag) = 0;
75 };
76 
77 namespace internal {
78 
79 class ClientAsyncResponseReaderHelper {
80  public:
81   /// Start a call and write the request out if \a start is set.
82   /// \a tag will be notified on \a cq when the call has been started (i.e.
83   /// initial metadata sent) and \a request has been written out.
84   /// If \a start is not set, the actual call must be initiated by StartCall
85   /// Note that \a context will be used to fill in custom initial metadata
86   /// used to send to the server when starting the call.
87   ///
88   /// Optionally pass in a base class for request and response types so that the
89   /// internal functions and structs can be templated based on that, allowing
90   /// reuse across RPCs (e.g., MessageLite for protobuf). Since constructors
91   /// can't have an explicit template parameter, the last argument is an
92   /// extraneous parameter just to provide the needed type information.
93   template <class R, class W, class BaseR = R, class BaseW = W>
Create(grpc::ChannelInterface * channel,grpc::CompletionQueue * cq,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,const W & request)94   static ClientAsyncResponseReader<R>* Create(
95       grpc::ChannelInterface* channel, grpc::CompletionQueue* cq,
96       const grpc::internal::RpcMethod& method, grpc::ClientContext* context,
97       const W& request) /* __attribute__((noinline)) */ {
98     grpc::internal::Call call = channel->CreateCall(method, context, cq);
99     ClientAsyncResponseReader<R>* result = new (grpc_call_arena_alloc(
100         call.call(), sizeof(ClientAsyncResponseReader<R>)))
101         ClientAsyncResponseReader<R>(call, context);
102     SetupRequest<BaseR, BaseW>(
103         call.call(), &result->single_buf_, &result->read_initial_metadata_,
104         &result->finish_, static_cast<const BaseW&>(request));
105 
106     return result;
107   }
108 
109   // Various helper functions to reduce templating use
110 
111   template <class R, class W>
SetupRequest(grpc_call * call,grpc::internal::CallOpSendInitialMetadata ** single_buf_ptr,std::function<void (ClientContext *,internal::Call *,internal::CallOpSendInitialMetadata *,void *)> * read_initial_metadata,std::function<void (ClientContext *,internal::Call *,bool initial_metadata_read,internal::CallOpSendInitialMetadata *,internal::CallOpSetInterface **,void *,Status *,void *)> * finish,const W & request)112   static void SetupRequest(
113       grpc_call* call,
114       grpc::internal::CallOpSendInitialMetadata** single_buf_ptr,
115       std::function<void(ClientContext*, internal::Call*,
116                          internal::CallOpSendInitialMetadata*, void*)>*
117           read_initial_metadata,
118       std::function<
119           void(ClientContext*, internal::Call*, bool initial_metadata_read,
120                internal::CallOpSendInitialMetadata*,
121                internal::CallOpSetInterface**, void*, Status*, void*)>* finish,
122       const W& request) {
123     using SingleBufType =
124         grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
125                                   grpc::internal::CallOpSendMessage,
126                                   grpc::internal::CallOpClientSendClose,
127                                   grpc::internal::CallOpRecvInitialMetadata,
128                                   grpc::internal::CallOpRecvMessage<R>,
129                                   grpc::internal::CallOpClientRecvStatus>;
130     SingleBufType* single_buf =
131         new (grpc_call_arena_alloc(call, sizeof(SingleBufType))) SingleBufType;
132     *single_buf_ptr = single_buf;
133     // TODO(ctiller): don't assert
134     ABSL_CHECK(single_buf->SendMessage(request).ok());
135     single_buf->ClientSendClose();
136 
137     // The purpose of the following functions is to type-erase the actual
138     // templated type of the CallOpSet being used by hiding that type inside the
139     // function definition rather than specifying it as an argument of the
140     // function or a member of the class. The type-erased CallOpSet will get
141     // static_cast'ed back to the real type so that it can be used properly.
142     *read_initial_metadata =
143         [](ClientContext* context, internal::Call* call,
144            internal::CallOpSendInitialMetadata* single_buf_view, void* tag) {
145           auto* single_buf = static_cast<SingleBufType*>(single_buf_view);
146           single_buf->set_output_tag(tag);
147           single_buf->RecvInitialMetadata(context);
148           call->PerformOps(single_buf);
149         };
150 
151     // Note that this function goes one step further than the previous one
152     // because it type-erases the message being written down to a void*. This
153     // will be static-cast'ed back to the class specified here by hiding that
154     // class information inside the function definition. Note that this feature
155     // expects the class being specified here for R to be a base-class of the
156     // "real" R without any multiple-inheritance (as applies in protobuf wrt
157     // MessageLite)
158     *finish = [](ClientContext* context, internal::Call* call,
159                  bool initial_metadata_read,
160                  internal::CallOpSendInitialMetadata* single_buf_view,
161                  internal::CallOpSetInterface** finish_buf_ptr, void* msg,
162                  Status* status, void* tag) {
163       if (initial_metadata_read) {
164         using FinishBufType =
165             grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<R>,
166                                       grpc::internal::CallOpClientRecvStatus>;
167         FinishBufType* finish_buf =
168             new (grpc_call_arena_alloc(call->call(), sizeof(FinishBufType)))
169                 FinishBufType;
170         *finish_buf_ptr = finish_buf;
171         finish_buf->set_output_tag(tag);
172         finish_buf->RecvMessage(static_cast<R*>(msg));
173         finish_buf->AllowNoMessage();
174         finish_buf->ClientRecvStatus(context, status);
175         call->PerformOps(finish_buf);
176       } else {
177         auto* single_buf = static_cast<SingleBufType*>(single_buf_view);
178         single_buf->set_output_tag(tag);
179         single_buf->RecvInitialMetadata(context);
180         single_buf->RecvMessage(static_cast<R*>(msg));
181         single_buf->AllowNoMessage();
182         single_buf->ClientRecvStatus(context, status);
183         call->PerformOps(single_buf);
184       }
185     };
186   }
187 
StartCall(grpc::ClientContext * context,grpc::internal::CallOpSendInitialMetadata * single_buf)188   static void StartCall(grpc::ClientContext* context,
189                         grpc::internal::CallOpSendInitialMetadata* single_buf) {
190     single_buf->SendInitialMetadata(&context->send_initial_metadata_,
191                                     context->initial_metadata_flags());
192   }
193 };
194 
195 // TODO(vjpai): This templated factory is deprecated and will be replaced by
196 //.             the non-templated helper as soon as possible.
197 template <class R>
198 class ClientAsyncResponseReaderFactory {
199  public:
200   template <class W>
Create(grpc::ChannelInterface * channel,grpc::CompletionQueue * cq,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,const W & request,bool start)201   static ClientAsyncResponseReader<R>* Create(
202       grpc::ChannelInterface* channel, grpc::CompletionQueue* cq,
203       const grpc::internal::RpcMethod& method, grpc::ClientContext* context,
204       const W& request, bool start) {
205     auto* result = ClientAsyncResponseReaderHelper::Create<R>(
206         channel, cq, method, context, request);
207     if (start) {
208       result->StartCall();
209     }
210     return result;
211   }
212 };
213 
214 }  // namespace internal
215 
216 /// Async API for client-side unary RPCs, where the message response
217 /// received from the server is of type \a R.
218 template <class R>
219 class ClientAsyncResponseReader final
220     : public ClientAsyncResponseReaderInterface<R> {
221  public:
222   // always allocated against a call arena, no memory free required
delete(void *,std::size_t size)223   static void operator delete(void* /*ptr*/, std::size_t size) {
224     ABSL_CHECK_EQ(size, sizeof(ClientAsyncResponseReader));
225   }
226 
227   // This operator should never be called as the memory should be freed as part
228   // of the arena destruction. It only exists to provide a matching operator
229   // delete to the operator new so that some compilers will not complain (see
230   // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
231   // there are no tests catching the compiler warning.
delete(void *,void *)232   static void operator delete(void*, void*) { ABSL_CHECK(false); }
233 
StartCall()234   void StartCall() override {
235     ABSL_DCHECK(!started_);
236     started_ = true;
237     internal::ClientAsyncResponseReaderHelper::StartCall(context_, single_buf_);
238   }
239 
240   /// See \a ClientAsyncResponseReaderInterface::ReadInitialMetadata for
241   /// semantics.
242   ///
243   /// Side effect:
244   ///   - the \a ClientContext associated with this call is updated with
245   ///     possible initial and trailing metadata sent from the server.
ReadInitialMetadata(void * tag)246   void ReadInitialMetadata(void* tag) override {
247     ABSL_DCHECK(started_);
248     ABSL_DCHECK(!context_->initial_metadata_received_);
249     read_initial_metadata_(context_, &call_, single_buf_, tag);
250     initial_metadata_read_ = true;
251   }
252 
253   /// See \a ClientAsyncResponseReaderInterface::Finish for semantics.
254   ///
255   /// Side effect:
256   ///   - the \a ClientContext associated with this call is updated with
257   ///     possible initial and trailing metadata sent from the server.
Finish(R * msg,grpc::Status * status,void * tag)258   void Finish(R* msg, grpc::Status* status, void* tag) override {
259     ABSL_DCHECK(started_);
260     finish_(context_, &call_, initial_metadata_read_, single_buf_, &finish_buf_,
261             static_cast<void*>(msg), status, tag);
262   }
263 
264  private:
265   friend class internal::ClientAsyncResponseReaderHelper;
266   grpc::ClientContext* const context_;
267   grpc::internal::Call call_;
268   bool started_ = false;
269   bool initial_metadata_read_ = false;
270 
ClientAsyncResponseReader(grpc::internal::Call call,grpc::ClientContext * context)271   ClientAsyncResponseReader(grpc::internal::Call call,
272                             grpc::ClientContext* context)
273       : context_(context), call_(call) {}
274 
275   // disable operator new
276   static void* operator new(std::size_t size);
new(std::size_t,void * p)277   static void* operator new(std::size_t /*size*/, void* p) { return p; }
278 
279   internal::CallOpSendInitialMetadata* single_buf_;
280   internal::CallOpSetInterface* finish_buf_ = nullptr;
281   std::function<void(ClientContext*, internal::Call*,
282                      internal::CallOpSendInitialMetadata*, void*)>
283       read_initial_metadata_;
284   std::function<void(ClientContext*, internal::Call*,
285                      bool initial_metadata_read,
286                      internal::CallOpSendInitialMetadata*,
287                      internal::CallOpSetInterface**, void*, Status*, void*)>
288       finish_;
289 };
290 
291 /// Async server-side API for handling unary calls, where the single
292 /// response message sent to the client is of type \a W.
293 template <class W>
294 class ServerAsyncResponseWriter final
295     : public grpc::internal::ServerAsyncStreamingInterface {
296  public:
ServerAsyncResponseWriter(grpc::ServerContext * ctx)297   explicit ServerAsyncResponseWriter(grpc::ServerContext* ctx)
298       : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
299 
300   /// See \a ServerAsyncStreamingInterface::SendInitialMetadata for semantics.
301   ///
302   /// Side effect:
303   ///   The initial metadata that will be sent to the client from this op will
304   ///   be taken from the \a ServerContext associated with the call.
305   ///
306   /// \param[in] tag Tag identifying this request.
SendInitialMetadata(void * tag)307   void SendInitialMetadata(void* tag) override {
308     ABSL_CHECK(!ctx_->sent_initial_metadata_);
309 
310     meta_buf_.set_output_tag(tag);
311     meta_buf_.SendInitialMetadata(&ctx_->initial_metadata_,
312                                   ctx_->initial_metadata_flags());
313     if (ctx_->compression_level_set()) {
314       meta_buf_.set_compression_level(ctx_->compression_level());
315     }
316     ctx_->sent_initial_metadata_ = true;
317     call_.PerformOps(&meta_buf_);
318   }
319 
320   /// Indicate that the stream is to be finished and request notification
321   /// when the server has sent the appropriate signals to the client to
322   /// end the call. Should not be used concurrently with other operations.
323   ///
324   /// \param[in] tag Tag identifying this request.
325   /// \param[in] status To be sent to the client as the result of the call.
326   /// \param[in] msg Message to be sent to the client.
327   ///
328   /// Side effect:
329   ///   - also sends initial metadata if not already sent (using the
330   ///     \a ServerContext associated with this call).
331   ///
332   /// Note: if \a status has a non-OK code, then \a msg will not be sent,
333   /// and the client will receive only the status with possible trailing
334   /// metadata.
335   ///
336   /// gRPC doesn't take ownership or a reference to msg and status, so it is
337   /// safe to deallocate them once the Finish operation is complete (i.e. a
338   /// result arrives in the completion queue).
Finish(const W & msg,const grpc::Status & status,void * tag)339   void Finish(const W& msg, const grpc::Status& status, void* tag) {
340     finish_buf_.set_output_tag(tag);
341     finish_buf_.set_core_cq_tag(&finish_buf_);
342     if (!ctx_->sent_initial_metadata_) {
343       finish_buf_.SendInitialMetadata(&ctx_->initial_metadata_,
344                                       ctx_->initial_metadata_flags());
345       if (ctx_->compression_level_set()) {
346         finish_buf_.set_compression_level(ctx_->compression_level());
347       }
348       ctx_->sent_initial_metadata_ = true;
349     }
350     // The response is dropped if the status is not OK.
351     if (status.ok()) {
352       finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_,
353                                    finish_buf_.SendMessage(msg));
354     } else {
355       finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_, status);
356     }
357     call_.PerformOps(&finish_buf_);
358   }
359 
360   /// Indicate that the stream is to be finished with a non-OK status,
361   /// and request notification for when the server has finished sending the
362   /// appropriate signals to the client to end the call.
363   /// Should not be used concurrently with other operations.
364   ///
365   /// \param[in] tag Tag identifying this request.
366   /// \param[in] status To be sent to the client as the result of the call.
367   ///   - Note: \a status must have a non-OK code.
368   ///
369   /// Side effect:
370   ///   - also sends initial metadata if not already sent (using the
371   ///     \a ServerContext associated with this call).
372   ///
373   /// gRPC doesn't take ownership or a reference to status, so it is safe to
374   /// deallocate them once the Finish operation is complete (i.e. a result
375   /// arrives in the completion queue).
FinishWithError(const grpc::Status & status,void * tag)376   void FinishWithError(const grpc::Status& status, void* tag) {
377     ABSL_CHECK(!status.ok());
378     finish_buf_.set_output_tag(tag);
379     if (!ctx_->sent_initial_metadata_) {
380       finish_buf_.SendInitialMetadata(&ctx_->initial_metadata_,
381                                       ctx_->initial_metadata_flags());
382       if (ctx_->compression_level_set()) {
383         finish_buf_.set_compression_level(ctx_->compression_level());
384       }
385       ctx_->sent_initial_metadata_ = true;
386     }
387     finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_, status);
388     call_.PerformOps(&finish_buf_);
389   }
390 
391  private:
BindCall(grpc::internal::Call * call)392   void BindCall(grpc::internal::Call* call) override { call_ = *call; }
393 
394   grpc::internal::Call call_;
395   grpc::ServerContext* ctx_;
396   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata>
397       meta_buf_;
398   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
399                             grpc::internal::CallOpSendMessage,
400                             grpc::internal::CallOpServerSendStatus>
401       finish_buf_;
402 };
403 
404 }  // namespace grpc
405 
406 namespace std {
407 template <class R>
408 class default_delete<grpc::ClientAsyncResponseReader<R>> {
409  public:
operator()410   void operator()(void* /*p*/) {}
411 };
412 template <class R>
413 class default_delete<grpc::ClientAsyncResponseReaderInterface<R>> {
414  public:
operator()415   void operator()(void* /*p*/) {}
416 };
417 }  // namespace std
418 
419 #endif  // GRPCPP_SUPPORT_ASYNC_UNARY_CALL_H
420