1 /* 2 * 3 * Copyright 2015 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #ifndef GRPCPP_IMPL_CODEGEN_ASYNC_UNARY_CALL_IMPL_H 20 #define GRPCPP_IMPL_CODEGEN_ASYNC_UNARY_CALL_IMPL_H 21 22 #include <grpcpp/impl/codegen/call.h> 23 #include <grpcpp/impl/codegen/channel_interface.h> 24 #include <grpcpp/impl/codegen/client_context_impl.h> 25 #include <grpcpp/impl/codegen/server_context_impl.h> 26 #include <grpcpp/impl/codegen/service_type.h> 27 #include <grpcpp/impl/codegen/status.h> 28 29 namespace grpc_impl { 30 31 /// An interface relevant for async client side unary RPCs (which send 32 /// one request message to a server and receive one response message). 33 template <class R> 34 class ClientAsyncResponseReaderInterface { 35 public: ~ClientAsyncResponseReaderInterface()36 virtual ~ClientAsyncResponseReaderInterface() {} 37 38 /// Start the call that was set up by the constructor, but only if the 39 /// constructor was invoked through the "Prepare" API which doesn't actually 40 /// start the call 41 virtual void StartCall() = 0; 42 43 /// Request notification of the reading of initial metadata. Completion 44 /// will be notified by \a tag on the associated completion queue. 45 /// This call is optional, but if it is used, it cannot be used concurrently 46 /// with or after the \a Finish method. 47 /// 48 /// \param[in] tag Tag identifying this request. 49 virtual void ReadInitialMetadata(void* tag) = 0; 50 51 /// Request to receive the server's response \a msg and final \a status for 52 /// the call, and to notify \a tag on this call's completion queue when 53 /// finished. 54 /// 55 /// This function will return when either: 56 /// - when the server's response message and status have been received. 57 /// - when the server has returned a non-OK status (no message expected in 58 /// this case). 59 /// - when the call failed for some reason and the library generated a 60 /// non-OK status. 61 /// 62 /// \param[in] tag Tag identifying this request. 63 /// \param[out] status To be updated with the operation status. 64 /// \param[out] msg To be filled in with the server's response message. 65 virtual void Finish(R* msg, ::grpc::Status* status, void* tag) = 0; 66 }; 67 68 namespace internal { 69 template <class R> 70 class ClientAsyncResponseReaderFactory { 71 public: 72 /// Start a call and write the request out if \a start is set. 73 /// \a tag will be notified on \a cq when the call has been started (i.e. 74 /// intitial metadata sent) and \a request has been written out. 75 /// If \a start is not set, the actual call must be initiated by StartCall 76 /// Note that \a context will be used to fill in custom initial metadata 77 /// used to send to the server when starting the call. 78 template <class W> Create(::grpc::ChannelInterface * channel,::grpc_impl::CompletionQueue * cq,const::grpc::internal::RpcMethod & method,::grpc_impl::ClientContext * context,const W & request,bool start)79 static ClientAsyncResponseReader<R>* Create( 80 ::grpc::ChannelInterface* channel, ::grpc_impl::CompletionQueue* cq, 81 const ::grpc::internal::RpcMethod& method, 82 ::grpc_impl::ClientContext* context, const W& request, bool start) { 83 ::grpc::internal::Call call = channel->CreateCall(method, context, cq); 84 return new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( 85 call.call(), sizeof(ClientAsyncResponseReader<R>))) 86 ClientAsyncResponseReader<R>(call, context, request, start); 87 } 88 }; 89 } // namespace internal 90 91 /// Async API for client-side unary RPCs, where the message response 92 /// received from the server is of type \a R. 93 template <class R> 94 class ClientAsyncResponseReader final 95 : public ClientAsyncResponseReaderInterface<R> { 96 public: 97 // always allocated against a call arena, no memory free required delete(void *,std::size_t size)98 static void operator delete(void* /*ptr*/, std::size_t size) { 99 GPR_CODEGEN_ASSERT(size == sizeof(ClientAsyncResponseReader)); 100 } 101 102 // This operator should never be called as the memory should be freed as part 103 // of the arena destruction. It only exists to provide a matching operator 104 // delete to the operator new so that some compilers will not complain (see 105 // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this 106 // there are no tests catching the compiler warning. delete(void *,void *)107 static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); } 108 StartCall()109 void StartCall() override { 110 GPR_CODEGEN_ASSERT(!started_); 111 started_ = true; 112 StartCallInternal(); 113 } 114 115 /// See \a ClientAsyncResponseReaderInterface::ReadInitialMetadata for 116 /// semantics. 117 /// 118 /// Side effect: 119 /// - the \a ClientContext associated with this call is updated with 120 /// possible initial and trailing metadata sent from the server. ReadInitialMetadata(void * tag)121 void ReadInitialMetadata(void* tag) override { 122 GPR_CODEGEN_ASSERT(started_); 123 GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); 124 125 single_buf.set_output_tag(tag); 126 single_buf.RecvInitialMetadata(context_); 127 call_.PerformOps(&single_buf); 128 initial_metadata_read_ = true; 129 } 130 131 /// See \a ClientAysncResponseReaderInterface::Finish for semantics. 132 /// 133 /// Side effect: 134 /// - the \a ClientContext associated with this call is updated with 135 /// possible initial and trailing metadata sent from the server. Finish(R * msg,::grpc::Status * status,void * tag)136 void Finish(R* msg, ::grpc::Status* status, void* tag) override { 137 GPR_CODEGEN_ASSERT(started_); 138 if (initial_metadata_read_) { 139 finish_buf.set_output_tag(tag); 140 finish_buf.RecvMessage(msg); 141 finish_buf.AllowNoMessage(); 142 finish_buf.ClientRecvStatus(context_, status); 143 call_.PerformOps(&finish_buf); 144 } else { 145 single_buf.set_output_tag(tag); 146 single_buf.RecvInitialMetadata(context_); 147 single_buf.RecvMessage(msg); 148 single_buf.AllowNoMessage(); 149 single_buf.ClientRecvStatus(context_, status); 150 call_.PerformOps(&single_buf); 151 } 152 } 153 154 private: 155 friend class internal::ClientAsyncResponseReaderFactory<R>; 156 ::grpc_impl::ClientContext* const context_; 157 ::grpc::internal::Call call_; 158 bool started_; 159 bool initial_metadata_read_ = false; 160 161 template <class W> ClientAsyncResponseReader(::grpc::internal::Call call,::grpc_impl::ClientContext * context,const W & request,bool start)162 ClientAsyncResponseReader(::grpc::internal::Call call, 163 ::grpc_impl::ClientContext* context, 164 const W& request, bool start) 165 : context_(context), call_(call), started_(start) { 166 // Bind the metadata at time of StartCallInternal but set up the rest here 167 // TODO(ctiller): don't assert 168 GPR_CODEGEN_ASSERT(single_buf.SendMessage(request).ok()); 169 single_buf.ClientSendClose(); 170 if (start) StartCallInternal(); 171 } 172 StartCallInternal()173 void StartCallInternal() { 174 single_buf.SendInitialMetadata(&context_->send_initial_metadata_, 175 context_->initial_metadata_flags()); 176 } 177 178 // disable operator new 179 static void* operator new(std::size_t size); new(std::size_t,void * p)180 static void* operator new(std::size_t /*size*/, void* p) { return p; } 181 182 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 183 ::grpc::internal::CallOpSendMessage, 184 ::grpc::internal::CallOpClientSendClose, 185 ::grpc::internal::CallOpRecvInitialMetadata, 186 ::grpc::internal::CallOpRecvMessage<R>, 187 ::grpc::internal::CallOpClientRecvStatus> 188 single_buf; 189 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvMessage<R>, 190 ::grpc::internal::CallOpClientRecvStatus> 191 finish_buf; 192 }; 193 194 /// Async server-side API for handling unary calls, where the single 195 /// response message sent to the client is of type \a W. 196 template <class W> 197 class ServerAsyncResponseWriter final 198 : public ::grpc::internal::ServerAsyncStreamingInterface { 199 public: ServerAsyncResponseWriter(::grpc_impl::ServerContext * ctx)200 explicit ServerAsyncResponseWriter(::grpc_impl::ServerContext* ctx) 201 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} 202 203 /// See \a ServerAsyncStreamingInterface::SendInitialMetadata for semantics. 204 /// 205 /// Side effect: 206 /// The initial metadata that will be sent to the client from this op will 207 /// be taken from the \a ServerContext associated with the call. 208 /// 209 /// \param[in] tag Tag identifying this request. SendInitialMetadata(void * tag)210 void SendInitialMetadata(void* tag) override { 211 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); 212 213 meta_buf_.set_output_tag(tag); 214 meta_buf_.SendInitialMetadata(&ctx_->initial_metadata_, 215 ctx_->initial_metadata_flags()); 216 if (ctx_->compression_level_set()) { 217 meta_buf_.set_compression_level(ctx_->compression_level()); 218 } 219 ctx_->sent_initial_metadata_ = true; 220 call_.PerformOps(&meta_buf_); 221 } 222 223 /// Indicate that the stream is to be finished and request notification 224 /// when the server has sent the appropriate signals to the client to 225 /// end the call. Should not be used concurrently with other operations. 226 /// 227 /// \param[in] tag Tag identifying this request. 228 /// \param[in] status To be sent to the client as the result of the call. 229 /// \param[in] msg Message to be sent to the client. 230 /// 231 /// Side effect: 232 /// - also sends initial metadata if not already sent (using the 233 /// \a ServerContext associated with this call). 234 /// 235 /// Note: if \a status has a non-OK code, then \a msg will not be sent, 236 /// and the client will receive only the status with possible trailing 237 /// metadata. Finish(const W & msg,const::grpc::Status & status,void * tag)238 void Finish(const W& msg, const ::grpc::Status& status, void* tag) { 239 finish_buf_.set_output_tag(tag); 240 finish_buf_.set_core_cq_tag(&finish_buf_); 241 if (!ctx_->sent_initial_metadata_) { 242 finish_buf_.SendInitialMetadata(&ctx_->initial_metadata_, 243 ctx_->initial_metadata_flags()); 244 if (ctx_->compression_level_set()) { 245 finish_buf_.set_compression_level(ctx_->compression_level()); 246 } 247 ctx_->sent_initial_metadata_ = true; 248 } 249 // The response is dropped if the status is not OK. 250 if (status.ok()) { 251 finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_, 252 finish_buf_.SendMessage(msg)); 253 } else { 254 finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_, status); 255 } 256 call_.PerformOps(&finish_buf_); 257 } 258 259 /// Indicate that the stream is to be finished with a non-OK status, 260 /// and request notification for when the server has finished sending the 261 /// appropriate signals to the client to end the call. 262 /// Should not be used concurrently with other operations. 263 /// 264 /// \param[in] tag Tag identifying this request. 265 /// \param[in] status To be sent to the client as the result of the call. 266 /// - Note: \a status must have a non-OK code. 267 /// 268 /// Side effect: 269 /// - also sends initial metadata if not already sent (using the 270 /// \a ServerContext associated with this call). FinishWithError(const::grpc::Status & status,void * tag)271 void FinishWithError(const ::grpc::Status& status, void* tag) { 272 GPR_CODEGEN_ASSERT(!status.ok()); 273 finish_buf_.set_output_tag(tag); 274 if (!ctx_->sent_initial_metadata_) { 275 finish_buf_.SendInitialMetadata(&ctx_->initial_metadata_, 276 ctx_->initial_metadata_flags()); 277 if (ctx_->compression_level_set()) { 278 finish_buf_.set_compression_level(ctx_->compression_level()); 279 } 280 ctx_->sent_initial_metadata_ = true; 281 } 282 finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_, status); 283 call_.PerformOps(&finish_buf_); 284 } 285 286 private: BindCall(::grpc::internal::Call * call)287 void BindCall(::grpc::internal::Call* call) override { call_ = *call; } 288 289 ::grpc::internal::Call call_; 290 ::grpc_impl::ServerContext* ctx_; 291 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> 292 meta_buf_; 293 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 294 ::grpc::internal::CallOpSendMessage, 295 ::grpc::internal::CallOpServerSendStatus> 296 finish_buf_; 297 }; 298 299 } // namespace grpc_impl 300 301 namespace std { 302 template <class R> 303 class default_delete<::grpc_impl::ClientAsyncResponseReader<R>> { 304 public: operator()305 void operator()(void* /*p*/) {} 306 }; 307 template <class R> 308 class default_delete<::grpc_impl::ClientAsyncResponseReaderInterface<R>> { 309 public: operator()310 void operator()(void* /*p*/) {} 311 }; 312 } // namespace std 313 314 #endif // GRPCPP_IMPL_CODEGEN_ASYNC_UNARY_CALL_IMPL_H 315