• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1  /*
2   *
3   * Copyright 2015 gRPC authors.
4   *
5   * Licensed under the Apache License, Version 2.0 (the "License");
6   * you may not use this file except in compliance with the License.
7   * You may obtain a copy of the License at
8   *
9   *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   *
17   */
18  
19  #ifndef GRPCPP_IMPL_CODEGEN_ASYNC_UNARY_CALL_IMPL_H
20  #define GRPCPP_IMPL_CODEGEN_ASYNC_UNARY_CALL_IMPL_H
21  
22  #include <grpcpp/impl/codegen/call.h>
23  #include <grpcpp/impl/codegen/channel_interface.h>
24  #include <grpcpp/impl/codegen/client_context_impl.h>
25  #include <grpcpp/impl/codegen/server_context_impl.h>
26  #include <grpcpp/impl/codegen/service_type.h>
27  #include <grpcpp/impl/codegen/status.h>
28  
29  namespace grpc_impl {
30  
31  /// An interface relevant for async client side unary RPCs (which send
32  /// one request message to a server and receive one response message).
33  template <class R>
34  class ClientAsyncResponseReaderInterface {
35   public:
~ClientAsyncResponseReaderInterface()36    virtual ~ClientAsyncResponseReaderInterface() {}
37  
38    /// Start the call that was set up by the constructor, but only if the
39    /// constructor was invoked through the "Prepare" API which doesn't actually
40    /// start the call
41    virtual void StartCall() = 0;
42  
43    /// Request notification of the reading of initial metadata. Completion
44    /// will be notified by \a tag on the associated completion queue.
45    /// This call is optional, but if it is used, it cannot be used concurrently
46    /// with or after the \a Finish method.
47    ///
48    /// \param[in] tag Tag identifying this request.
49    virtual void ReadInitialMetadata(void* tag) = 0;
50  
51    /// Request to receive the server's response \a msg and final \a status for
52    /// the call, and to notify \a tag on this call's completion queue when
53    /// finished.
54    ///
55    /// This function will return when either:
56    /// - when the server's response message and status have been received.
57    /// - when the server has returned a non-OK status (no message expected in
58    ///   this case).
59    /// - when the call failed for some reason and the library generated a
60    ///   non-OK status.
61    ///
62    /// \param[in] tag Tag identifying this request.
63    /// \param[out] status To be updated with the operation status.
64    /// \param[out] msg To be filled in with the server's response message.
65    virtual void Finish(R* msg, ::grpc::Status* status, void* tag) = 0;
66  };
67  
68  namespace internal {
69  template <class R>
70  class ClientAsyncResponseReaderFactory {
71   public:
72    /// Start a call and write the request out if \a start is set.
73    /// \a tag will be notified on \a cq when the call has been started (i.e.
74    /// intitial metadata sent) and \a request has been written out.
75    /// If \a start is not set, the actual call must be initiated by StartCall
76    /// Note that \a context will be used to fill in custom initial metadata
77    /// used to send to the server when starting the call.
78    template <class W>
Create(::grpc::ChannelInterface * channel,::grpc_impl::CompletionQueue * cq,const::grpc::internal::RpcMethod & method,::grpc_impl::ClientContext * context,const W & request,bool start)79    static ClientAsyncResponseReader<R>* Create(
80        ::grpc::ChannelInterface* channel, ::grpc_impl::CompletionQueue* cq,
81        const ::grpc::internal::RpcMethod& method,
82        ::grpc_impl::ClientContext* context, const W& request, bool start) {
83      ::grpc::internal::Call call = channel->CreateCall(method, context, cq);
84      return new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
85          call.call(), sizeof(ClientAsyncResponseReader<R>)))
86          ClientAsyncResponseReader<R>(call, context, request, start);
87    }
88  };
89  }  // namespace internal
90  
91  /// Async API for client-side unary RPCs, where the message response
92  /// received from the server is of type \a R.
93  template <class R>
94  class ClientAsyncResponseReader final
95      : public ClientAsyncResponseReaderInterface<R> {
96   public:
97    // always allocated against a call arena, no memory free required
delete(void *,std::size_t size)98    static void operator delete(void* /*ptr*/, std::size_t size) {
99      GPR_CODEGEN_ASSERT(size == sizeof(ClientAsyncResponseReader));
100    }
101  
102    // This operator should never be called as the memory should be freed as part
103    // of the arena destruction. It only exists to provide a matching operator
104    // delete to the operator new so that some compilers will not complain (see
105    // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
106    // there are no tests catching the compiler warning.
delete(void *,void *)107    static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
108  
StartCall()109    void StartCall() override {
110      GPR_CODEGEN_ASSERT(!started_);
111      started_ = true;
112      StartCallInternal();
113    }
114  
115    /// See \a ClientAsyncResponseReaderInterface::ReadInitialMetadata for
116    /// semantics.
117    ///
118    /// Side effect:
119    ///   - the \a ClientContext associated with this call is updated with
120    ///     possible initial and trailing metadata sent from the server.
ReadInitialMetadata(void * tag)121    void ReadInitialMetadata(void* tag) override {
122      GPR_CODEGEN_ASSERT(started_);
123      GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
124  
125      single_buf.set_output_tag(tag);
126      single_buf.RecvInitialMetadata(context_);
127      call_.PerformOps(&single_buf);
128      initial_metadata_read_ = true;
129    }
130  
131    /// See \a ClientAysncResponseReaderInterface::Finish for semantics.
132    ///
133    /// Side effect:
134    ///   - the \a ClientContext associated with this call is updated with
135    ///     possible initial and trailing metadata sent from the server.
Finish(R * msg,::grpc::Status * status,void * tag)136    void Finish(R* msg, ::grpc::Status* status, void* tag) override {
137      GPR_CODEGEN_ASSERT(started_);
138      if (initial_metadata_read_) {
139        finish_buf.set_output_tag(tag);
140        finish_buf.RecvMessage(msg);
141        finish_buf.AllowNoMessage();
142        finish_buf.ClientRecvStatus(context_, status);
143        call_.PerformOps(&finish_buf);
144      } else {
145        single_buf.set_output_tag(tag);
146        single_buf.RecvInitialMetadata(context_);
147        single_buf.RecvMessage(msg);
148        single_buf.AllowNoMessage();
149        single_buf.ClientRecvStatus(context_, status);
150        call_.PerformOps(&single_buf);
151      }
152    }
153  
154   private:
155    friend class internal::ClientAsyncResponseReaderFactory<R>;
156    ::grpc_impl::ClientContext* const context_;
157    ::grpc::internal::Call call_;
158    bool started_;
159    bool initial_metadata_read_ = false;
160  
161    template <class W>
ClientAsyncResponseReader(::grpc::internal::Call call,::grpc_impl::ClientContext * context,const W & request,bool start)162    ClientAsyncResponseReader(::grpc::internal::Call call,
163                              ::grpc_impl::ClientContext* context,
164                              const W& request, bool start)
165        : context_(context), call_(call), started_(start) {
166      // Bind the metadata at time of StartCallInternal but set up the rest here
167      // TODO(ctiller): don't assert
168      GPR_CODEGEN_ASSERT(single_buf.SendMessage(request).ok());
169      single_buf.ClientSendClose();
170      if (start) StartCallInternal();
171    }
172  
StartCallInternal()173    void StartCallInternal() {
174      single_buf.SendInitialMetadata(&context_->send_initial_metadata_,
175                                     context_->initial_metadata_flags());
176    }
177  
178    // disable operator new
179    static void* operator new(std::size_t size);
new(std::size_t,void * p)180    static void* operator new(std::size_t /*size*/, void* p) { return p; }
181  
182    ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
183                                ::grpc::internal::CallOpSendMessage,
184                                ::grpc::internal::CallOpClientSendClose,
185                                ::grpc::internal::CallOpRecvInitialMetadata,
186                                ::grpc::internal::CallOpRecvMessage<R>,
187                                ::grpc::internal::CallOpClientRecvStatus>
188        single_buf;
189    ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvMessage<R>,
190                                ::grpc::internal::CallOpClientRecvStatus>
191        finish_buf;
192  };
193  
194  /// Async server-side API for handling unary calls, where the single
195  /// response message sent to the client is of type \a W.
196  template <class W>
197  class ServerAsyncResponseWriter final
198      : public ::grpc::internal::ServerAsyncStreamingInterface {
199   public:
ServerAsyncResponseWriter(::grpc_impl::ServerContext * ctx)200    explicit ServerAsyncResponseWriter(::grpc_impl::ServerContext* ctx)
201        : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
202  
203    /// See \a ServerAsyncStreamingInterface::SendInitialMetadata for semantics.
204    ///
205    /// Side effect:
206    ///   The initial metadata that will be sent to the client from this op will
207    ///   be taken from the \a ServerContext associated with the call.
208    ///
209    /// \param[in] tag Tag identifying this request.
SendInitialMetadata(void * tag)210    void SendInitialMetadata(void* tag) override {
211      GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
212  
213      meta_buf_.set_output_tag(tag);
214      meta_buf_.SendInitialMetadata(&ctx_->initial_metadata_,
215                                    ctx_->initial_metadata_flags());
216      if (ctx_->compression_level_set()) {
217        meta_buf_.set_compression_level(ctx_->compression_level());
218      }
219      ctx_->sent_initial_metadata_ = true;
220      call_.PerformOps(&meta_buf_);
221    }
222  
223    /// Indicate that the stream is to be finished and request notification
224    /// when the server has sent the appropriate signals to the client to
225    /// end the call. Should not be used concurrently with other operations.
226    ///
227    /// \param[in] tag Tag identifying this request.
228    /// \param[in] status To be sent to the client as the result of the call.
229    /// \param[in] msg Message to be sent to the client.
230    ///
231    /// Side effect:
232    ///   - also sends initial metadata if not already sent (using the
233    ///     \a ServerContext associated with this call).
234    ///
235    /// Note: if \a status has a non-OK code, then \a msg will not be sent,
236    /// and the client will receive only the status with possible trailing
237    /// metadata.
Finish(const W & msg,const::grpc::Status & status,void * tag)238    void Finish(const W& msg, const ::grpc::Status& status, void* tag) {
239      finish_buf_.set_output_tag(tag);
240      finish_buf_.set_core_cq_tag(&finish_buf_);
241      if (!ctx_->sent_initial_metadata_) {
242        finish_buf_.SendInitialMetadata(&ctx_->initial_metadata_,
243                                        ctx_->initial_metadata_flags());
244        if (ctx_->compression_level_set()) {
245          finish_buf_.set_compression_level(ctx_->compression_level());
246        }
247        ctx_->sent_initial_metadata_ = true;
248      }
249      // The response is dropped if the status is not OK.
250      if (status.ok()) {
251        finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_,
252                                     finish_buf_.SendMessage(msg));
253      } else {
254        finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_, status);
255      }
256      call_.PerformOps(&finish_buf_);
257    }
258  
259    /// Indicate that the stream is to be finished with a non-OK status,
260    /// and request notification for when the server has finished sending the
261    /// appropriate signals to the client to end the call.
262    /// Should not be used concurrently with other operations.
263    ///
264    /// \param[in] tag Tag identifying this request.
265    /// \param[in] status To be sent to the client as the result of the call.
266    ///   - Note: \a status must have a non-OK code.
267    ///
268    /// Side effect:
269    ///   - also sends initial metadata if not already sent (using the
270    ///     \a ServerContext associated with this call).
FinishWithError(const::grpc::Status & status,void * tag)271    void FinishWithError(const ::grpc::Status& status, void* tag) {
272      GPR_CODEGEN_ASSERT(!status.ok());
273      finish_buf_.set_output_tag(tag);
274      if (!ctx_->sent_initial_metadata_) {
275        finish_buf_.SendInitialMetadata(&ctx_->initial_metadata_,
276                                        ctx_->initial_metadata_flags());
277        if (ctx_->compression_level_set()) {
278          finish_buf_.set_compression_level(ctx_->compression_level());
279        }
280        ctx_->sent_initial_metadata_ = true;
281      }
282      finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_, status);
283      call_.PerformOps(&finish_buf_);
284    }
285  
286   private:
BindCall(::grpc::internal::Call * call)287    void BindCall(::grpc::internal::Call* call) override { call_ = *call; }
288  
289    ::grpc::internal::Call call_;
290    ::grpc_impl::ServerContext* ctx_;
291    ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
292        meta_buf_;
293    ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
294                                ::grpc::internal::CallOpSendMessage,
295                                ::grpc::internal::CallOpServerSendStatus>
296        finish_buf_;
297  };
298  
299  }  // namespace grpc_impl
300  
301  namespace std {
302  template <class R>
303  class default_delete<::grpc_impl::ClientAsyncResponseReader<R>> {
304   public:
operator()305    void operator()(void* /*p*/) {}
306  };
307  template <class R>
308  class default_delete<::grpc_impl::ClientAsyncResponseReaderInterface<R>> {
309   public:
operator()310    void operator()(void* /*p*/) {}
311  };
312  }  // namespace std
313  
314  #endif  // GRPCPP_IMPL_CODEGEN_ASYNC_UNARY_CALL_IMPL_H
315