• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
18 
19 #ifndef GRPCPP_IMPL_CODEGEN_ASYNC_UNARY_CALL_IMPL_H
20 #define GRPCPP_IMPL_CODEGEN_ASYNC_UNARY_CALL_IMPL_H
21 
22 #include <grpcpp/impl/codegen/call.h>
23 #include <grpcpp/impl/codegen/channel_interface.h>
24 #include <grpcpp/impl/codegen/client_context_impl.h>
25 #include <grpcpp/impl/codegen/server_context_impl.h>
26 #include <grpcpp/impl/codegen/service_type.h>
27 #include <grpcpp/impl/codegen/status.h>
28 
29 namespace grpc_impl {
30 
31 /// An interface relevant for async client side unary RPCs (which send
32 /// one request message to a server and receive one response message).
33 template <class R>
34 class ClientAsyncResponseReaderInterface {
35  public:
~ClientAsyncResponseReaderInterface()36   virtual ~ClientAsyncResponseReaderInterface() {}
37 
38   /// Start the call that was set up by the constructor, but only if the
39   /// constructor was invoked through the "Prepare" API which doesn't actually
40   /// start the call
41   virtual void StartCall() = 0;
42 
43   /// Request notification of the reading of initial metadata. Completion
44   /// will be notified by \a tag on the associated completion queue.
45   /// This call is optional, but if it is used, it cannot be used concurrently
46   /// with or after the \a Finish method.
47   ///
48   /// \param[in] tag Tag identifying this request.
49   virtual void ReadInitialMetadata(void* tag) = 0;
50 
51   /// Request to receive the server's response \a msg and final \a status for
52   /// the call, and to notify \a tag on this call's completion queue when
53   /// finished.
54   ///
55   /// This function will return when either:
56   /// - when the server's response message and status have been received.
57   /// - when the server has returned a non-OK status (no message expected in
58   ///   this case).
59   /// - when the call failed for some reason and the library generated a
60   ///   non-OK status.
61   ///
62   /// \param[in] tag Tag identifying this request.
63   /// \param[out] status To be updated with the operation status.
64   /// \param[out] msg To be filled in with the server's response message.
65   virtual void Finish(R* msg, ::grpc::Status* status, void* tag) = 0;
66 };
67 
68 namespace internal {
69 template <class R>
70 class ClientAsyncResponseReaderFactory {
71  public:
72   /// Start a call and write the request out if \a start is set.
73   /// \a tag will be notified on \a cq when the call has been started (i.e.
74   /// intitial metadata sent) and \a request has been written out.
75   /// If \a start is not set, the actual call must be initiated by StartCall
76   /// Note that \a context will be used to fill in custom initial metadata
77   /// used to send to the server when starting the call.
78   template <class W>
Create(::grpc::ChannelInterface * channel,::grpc_impl::CompletionQueue * cq,const::grpc::internal::RpcMethod & method,::grpc_impl::ClientContext * context,const W & request,bool start)79   static ClientAsyncResponseReader<R>* Create(
80       ::grpc::ChannelInterface* channel, ::grpc_impl::CompletionQueue* cq,
81       const ::grpc::internal::RpcMethod& method,
82       ::grpc_impl::ClientContext* context, const W& request, bool start) {
83     ::grpc::internal::Call call = channel->CreateCall(method, context, cq);
84     return new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
85         call.call(), sizeof(ClientAsyncResponseReader<R>)))
86         ClientAsyncResponseReader<R>(call, context, request, start);
87   }
88 };
89 }  // namespace internal
90 
91 /// Async API for client-side unary RPCs, where the message response
92 /// received from the server is of type \a R.
93 template <class R>
94 class ClientAsyncResponseReader final
95     : public ClientAsyncResponseReaderInterface<R> {
96  public:
97   // always allocated against a call arena, no memory free required
delete(void *,std::size_t size)98   static void operator delete(void* /*ptr*/, std::size_t size) {
99     GPR_CODEGEN_ASSERT(size == sizeof(ClientAsyncResponseReader));
100   }
101 
102   // This operator should never be called as the memory should be freed as part
103   // of the arena destruction. It only exists to provide a matching operator
104   // delete to the operator new so that some compilers will not complain (see
105   // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
106   // there are no tests catching the compiler warning.
delete(void *,void *)107   static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
108 
StartCall()109   void StartCall() override {
110     GPR_CODEGEN_ASSERT(!started_);
111     started_ = true;
112     StartCallInternal();
113   }
114 
115   /// See \a ClientAsyncResponseReaderInterface::ReadInitialMetadata for
116   /// semantics.
117   ///
118   /// Side effect:
119   ///   - the \a ClientContext associated with this call is updated with
120   ///     possible initial and trailing metadata sent from the server.
ReadInitialMetadata(void * tag)121   void ReadInitialMetadata(void* tag) override {
122     GPR_CODEGEN_ASSERT(started_);
123     GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
124 
125     single_buf.set_output_tag(tag);
126     single_buf.RecvInitialMetadata(context_);
127     call_.PerformOps(&single_buf);
128     initial_metadata_read_ = true;
129   }
130 
131   /// See \a ClientAysncResponseReaderInterface::Finish for semantics.
132   ///
133   /// Side effect:
134   ///   - the \a ClientContext associated with this call is updated with
135   ///     possible initial and trailing metadata sent from the server.
Finish(R * msg,::grpc::Status * status,void * tag)136   void Finish(R* msg, ::grpc::Status* status, void* tag) override {
137     GPR_CODEGEN_ASSERT(started_);
138     if (initial_metadata_read_) {
139       finish_buf.set_output_tag(tag);
140       finish_buf.RecvMessage(msg);
141       finish_buf.AllowNoMessage();
142       finish_buf.ClientRecvStatus(context_, status);
143       call_.PerformOps(&finish_buf);
144     } else {
145       single_buf.set_output_tag(tag);
146       single_buf.RecvInitialMetadata(context_);
147       single_buf.RecvMessage(msg);
148       single_buf.AllowNoMessage();
149       single_buf.ClientRecvStatus(context_, status);
150       call_.PerformOps(&single_buf);
151     }
152   }
153 
154  private:
155   friend class internal::ClientAsyncResponseReaderFactory<R>;
156   ::grpc_impl::ClientContext* const context_;
157   ::grpc::internal::Call call_;
158   bool started_;
159   bool initial_metadata_read_ = false;
160 
161   template <class W>
ClientAsyncResponseReader(::grpc::internal::Call call,::grpc_impl::ClientContext * context,const W & request,bool start)162   ClientAsyncResponseReader(::grpc::internal::Call call,
163                             ::grpc_impl::ClientContext* context,
164                             const W& request, bool start)
165       : context_(context), call_(call), started_(start) {
166     // Bind the metadata at time of StartCallInternal but set up the rest here
167     // TODO(ctiller): don't assert
168     GPR_CODEGEN_ASSERT(single_buf.SendMessage(request).ok());
169     single_buf.ClientSendClose();
170     if (start) StartCallInternal();
171   }
172 
StartCallInternal()173   void StartCallInternal() {
174     single_buf.SendInitialMetadata(&context_->send_initial_metadata_,
175                                    context_->initial_metadata_flags());
176   }
177 
178   // disable operator new
179   static void* operator new(std::size_t size);
new(std::size_t,void * p)180   static void* operator new(std::size_t /*size*/, void* p) { return p; }
181 
182   ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
183                               ::grpc::internal::CallOpSendMessage,
184                               ::grpc::internal::CallOpClientSendClose,
185                               ::grpc::internal::CallOpRecvInitialMetadata,
186                               ::grpc::internal::CallOpRecvMessage<R>,
187                               ::grpc::internal::CallOpClientRecvStatus>
188       single_buf;
189   ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvMessage<R>,
190                               ::grpc::internal::CallOpClientRecvStatus>
191       finish_buf;
192 };
193 
194 /// Async server-side API for handling unary calls, where the single
195 /// response message sent to the client is of type \a W.
196 template <class W>
197 class ServerAsyncResponseWriter final
198     : public ::grpc::internal::ServerAsyncStreamingInterface {
199  public:
ServerAsyncResponseWriter(::grpc_impl::ServerContext * ctx)200   explicit ServerAsyncResponseWriter(::grpc_impl::ServerContext* ctx)
201       : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
202 
203   /// See \a ServerAsyncStreamingInterface::SendInitialMetadata for semantics.
204   ///
205   /// Side effect:
206   ///   The initial metadata that will be sent to the client from this op will
207   ///   be taken from the \a ServerContext associated with the call.
208   ///
209   /// \param[in] tag Tag identifying this request.
SendInitialMetadata(void * tag)210   void SendInitialMetadata(void* tag) override {
211     GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
212 
213     meta_buf_.set_output_tag(tag);
214     meta_buf_.SendInitialMetadata(&ctx_->initial_metadata_,
215                                   ctx_->initial_metadata_flags());
216     if (ctx_->compression_level_set()) {
217       meta_buf_.set_compression_level(ctx_->compression_level());
218     }
219     ctx_->sent_initial_metadata_ = true;
220     call_.PerformOps(&meta_buf_);
221   }
222 
223   /// Indicate that the stream is to be finished and request notification
224   /// when the server has sent the appropriate signals to the client to
225   /// end the call. Should not be used concurrently with other operations.
226   ///
227   /// \param[in] tag Tag identifying this request.
228   /// \param[in] status To be sent to the client as the result of the call.
229   /// \param[in] msg Message to be sent to the client.
230   ///
231   /// Side effect:
232   ///   - also sends initial metadata if not already sent (using the
233   ///     \a ServerContext associated with this call).
234   ///
235   /// Note: if \a status has a non-OK code, then \a msg will not be sent,
236   /// and the client will receive only the status with possible trailing
237   /// metadata.
Finish(const W & msg,const::grpc::Status & status,void * tag)238   void Finish(const W& msg, const ::grpc::Status& status, void* tag) {
239     finish_buf_.set_output_tag(tag);
240     finish_buf_.set_core_cq_tag(&finish_buf_);
241     if (!ctx_->sent_initial_metadata_) {
242       finish_buf_.SendInitialMetadata(&ctx_->initial_metadata_,
243                                       ctx_->initial_metadata_flags());
244       if (ctx_->compression_level_set()) {
245         finish_buf_.set_compression_level(ctx_->compression_level());
246       }
247       ctx_->sent_initial_metadata_ = true;
248     }
249     // The response is dropped if the status is not OK.
250     if (status.ok()) {
251       finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_,
252                                    finish_buf_.SendMessage(msg));
253     } else {
254       finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_, status);
255     }
256     call_.PerformOps(&finish_buf_);
257   }
258 
259   /// Indicate that the stream is to be finished with a non-OK status,
260   /// and request notification for when the server has finished sending the
261   /// appropriate signals to the client to end the call.
262   /// Should not be used concurrently with other operations.
263   ///
264   /// \param[in] tag Tag identifying this request.
265   /// \param[in] status To be sent to the client as the result of the call.
266   ///   - Note: \a status must have a non-OK code.
267   ///
268   /// Side effect:
269   ///   - also sends initial metadata if not already sent (using the
270   ///     \a ServerContext associated with this call).
FinishWithError(const::grpc::Status & status,void * tag)271   void FinishWithError(const ::grpc::Status& status, void* tag) {
272     GPR_CODEGEN_ASSERT(!status.ok());
273     finish_buf_.set_output_tag(tag);
274     if (!ctx_->sent_initial_metadata_) {
275       finish_buf_.SendInitialMetadata(&ctx_->initial_metadata_,
276                                       ctx_->initial_metadata_flags());
277       if (ctx_->compression_level_set()) {
278         finish_buf_.set_compression_level(ctx_->compression_level());
279       }
280       ctx_->sent_initial_metadata_ = true;
281     }
282     finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_, status);
283     call_.PerformOps(&finish_buf_);
284   }
285 
286  private:
BindCall(::grpc::internal::Call * call)287   void BindCall(::grpc::internal::Call* call) override { call_ = *call; }
288 
289   ::grpc::internal::Call call_;
290   ::grpc_impl::ServerContext* ctx_;
291   ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
292       meta_buf_;
293   ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
294                               ::grpc::internal::CallOpSendMessage,
295                               ::grpc::internal::CallOpServerSendStatus>
296       finish_buf_;
297 };
298 
299 }  // namespace grpc_impl
300 
301 namespace std {
302 template <class R>
303 class default_delete<::grpc_impl::ClientAsyncResponseReader<R>> {
304  public:
operator()305   void operator()(void* /*p*/) {}
306 };
307 template <class R>
308 class default_delete<::grpc_impl::ClientAsyncResponseReaderInterface<R>> {
309  public:
operator()310   void operator()(void* /*p*/) {}
311 };
312 }  // namespace std
313 
314 #endif  // GRPCPP_IMPL_CODEGEN_ASYNC_UNARY_CALL_IMPL_H
315