1 /* 2 * 3 * Copyright 2015 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #ifndef GRPCPP_IMPL_CODEGEN_ASYNC_UNARY_CALL_H 20 #define GRPCPP_IMPL_CODEGEN_ASYNC_UNARY_CALL_H 21 22 #include <grpcpp/impl/codegen/call.h> 23 #include <grpcpp/impl/codegen/call_op_set.h> 24 #include <grpcpp/impl/codegen/call_op_set_interface.h> 25 #include <grpcpp/impl/codegen/channel_interface.h> 26 #include <grpcpp/impl/codegen/client_context.h> 27 #include <grpcpp/impl/codegen/server_context.h> 28 #include <grpcpp/impl/codegen/service_type.h> 29 #include <grpcpp/impl/codegen/status.h> 30 31 namespace grpc { 32 33 // Forward declaration for use in Helper class 34 template <class R> 35 class ClientAsyncResponseReader; 36 37 /// An interface relevant for async client side unary RPCs (which send 38 /// one request message to a server and receive one response message). 39 template <class R> 40 class ClientAsyncResponseReaderInterface { 41 public: ~ClientAsyncResponseReaderInterface()42 virtual ~ClientAsyncResponseReaderInterface() {} 43 44 /// Start the call that was set up by the constructor, but only if the 45 /// constructor was invoked through the "Prepare" API which doesn't actually 46 /// start the call 47 virtual void StartCall() = 0; 48 49 /// Request notification of the reading of initial metadata. Completion 50 /// will be notified by \a tag on the associated completion queue. 51 /// This call is optional, but if it is used, it cannot be used concurrently 52 /// with or after the \a Finish method. 53 /// 54 /// \param[in] tag Tag identifying this request. 55 virtual void ReadInitialMetadata(void* tag) = 0; 56 57 /// Request to receive the server's response \a msg and final \a status for 58 /// the call, and to notify \a tag on this call's completion queue when 59 /// finished. 60 /// 61 /// This function will return when either: 62 /// - when the server's response message and status have been received. 63 /// - when the server has returned a non-OK status (no message expected in 64 /// this case). 65 /// - when the call failed for some reason and the library generated a 66 /// non-OK status. 67 /// 68 /// \param[in] tag Tag identifying this request. 69 /// \param[out] status To be updated with the operation status. 70 /// \param[out] msg To be filled in with the server's response message. 71 virtual void Finish(R* msg, ::grpc::Status* status, void* tag) = 0; 72 }; 73 74 namespace internal { 75 76 class ClientAsyncResponseReaderHelper { 77 public: 78 /// Start a call and write the request out if \a start is set. 79 /// \a tag will be notified on \a cq when the call has been started (i.e. 80 /// intitial metadata sent) and \a request has been written out. 81 /// If \a start is not set, the actual call must be initiated by StartCall 82 /// Note that \a context will be used to fill in custom initial metadata 83 /// used to send to the server when starting the call. 84 /// 85 /// Optionally pass in a base class for request and response types so that the 86 /// internal functions and structs can be templated based on that, allowing 87 /// reuse across RPCs (e.g., MessageLite for protobuf). Since constructors 88 /// can't have an explicit template parameter, the last argument is an 89 /// extraneous parameter just to provide the needed type information. 90 template <class R, class W, class BaseR = R, class BaseW = W> Create(::grpc::ChannelInterface * channel,::grpc::CompletionQueue * cq,const::grpc::internal::RpcMethod & method,::grpc::ClientContext * context,const W & request)91 static ClientAsyncResponseReader<R>* Create( 92 ::grpc::ChannelInterface* channel, ::grpc::CompletionQueue* cq, 93 const ::grpc::internal::RpcMethod& method, ::grpc::ClientContext* context, 94 const W& request) /* __attribute__((noinline)) */ { 95 ::grpc::internal::Call call = channel->CreateCall(method, context, cq); 96 ClientAsyncResponseReader<R>* result = 97 new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( 98 call.call(), sizeof(ClientAsyncResponseReader<R>))) 99 ClientAsyncResponseReader<R>(call, context); 100 SetupRequest<BaseR, BaseW>( 101 call.call(), &result->single_buf_, &result->read_initial_metadata_, 102 &result->finish_, static_cast<const BaseW&>(request)); 103 104 return result; 105 } 106 107 // Various helper functions to reduce templating use 108 109 template <class R, class W> SetupRequest(grpc_call * call,::grpc::internal::CallOpSendInitialMetadata ** single_buf_ptr,std::function<void (ClientContext *,internal::Call *,internal::CallOpSendInitialMetadata *,void *)> * read_initial_metadata,std::function<void (ClientContext *,internal::Call *,bool initial_metadata_read,internal::CallOpSendInitialMetadata *,internal::CallOpSetInterface **,void *,Status *,void *)> * finish,const W & request)110 static void SetupRequest( 111 grpc_call* call, 112 ::grpc::internal::CallOpSendInitialMetadata** single_buf_ptr, 113 std::function<void(ClientContext*, internal::Call*, 114 internal::CallOpSendInitialMetadata*, void*)>* 115 read_initial_metadata, 116 std::function< 117 void(ClientContext*, internal::Call*, bool initial_metadata_read, 118 internal::CallOpSendInitialMetadata*, 119 internal::CallOpSetInterface**, void*, Status*, void*)>* finish, 120 const W& request) { 121 using SingleBufType = 122 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 123 ::grpc::internal::CallOpSendMessage, 124 ::grpc::internal::CallOpClientSendClose, 125 ::grpc::internal::CallOpRecvInitialMetadata, 126 ::grpc::internal::CallOpRecvMessage<R>, 127 ::grpc::internal::CallOpClientRecvStatus>; 128 SingleBufType* single_buf = 129 new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( 130 call, sizeof(SingleBufType))) SingleBufType; 131 *single_buf_ptr = single_buf; 132 // TODO(ctiller): don't assert 133 GPR_CODEGEN_ASSERT(single_buf->SendMessage(request).ok()); 134 single_buf->ClientSendClose(); 135 136 // The purpose of the following functions is to type-erase the actual 137 // templated type of the CallOpSet being used by hiding that type inside the 138 // function definition rather than specifying it as an argument of the 139 // function or a member of the class. The type-erased CallOpSet will get 140 // static_cast'ed back to the real type so that it can be used properly. 141 *read_initial_metadata = 142 [](ClientContext* context, internal::Call* call, 143 internal::CallOpSendInitialMetadata* single_buf_view, void* tag) { 144 auto* single_buf = static_cast<SingleBufType*>(single_buf_view); 145 single_buf->set_output_tag(tag); 146 single_buf->RecvInitialMetadata(context); 147 call->PerformOps(single_buf); 148 }; 149 150 // Note that this function goes one step further than the previous one 151 // because it type-erases the message being written down to a void*. This 152 // will be static-cast'ed back to the class specified here by hiding that 153 // class information inside the function definition. Note that this feature 154 // expects the class being specified here for R to be a base-class of the 155 // "real" R without any multiple-inheritance (as applies in protbuf wrt 156 // MessageLite) 157 *finish = [](ClientContext* context, internal::Call* call, 158 bool initial_metadata_read, 159 internal::CallOpSendInitialMetadata* single_buf_view, 160 internal::CallOpSetInterface** finish_buf_ptr, void* msg, 161 Status* status, void* tag) { 162 if (initial_metadata_read) { 163 using FinishBufType = ::grpc::internal::CallOpSet< 164 ::grpc::internal::CallOpRecvMessage<R>, 165 ::grpc::internal::CallOpClientRecvStatus>; 166 FinishBufType* finish_buf = 167 new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( 168 call->call(), sizeof(FinishBufType))) FinishBufType; 169 *finish_buf_ptr = finish_buf; 170 finish_buf->set_output_tag(tag); 171 finish_buf->RecvMessage(static_cast<R*>(msg)); 172 finish_buf->AllowNoMessage(); 173 finish_buf->ClientRecvStatus(context, status); 174 call->PerformOps(finish_buf); 175 } else { 176 auto* single_buf = static_cast<SingleBufType*>(single_buf_view); 177 single_buf->set_output_tag(tag); 178 single_buf->RecvInitialMetadata(context); 179 single_buf->RecvMessage(static_cast<R*>(msg)); 180 single_buf->AllowNoMessage(); 181 single_buf->ClientRecvStatus(context, status); 182 call->PerformOps(single_buf); 183 } 184 }; 185 } 186 StartCall(::grpc::ClientContext * context,::grpc::internal::CallOpSendInitialMetadata * single_buf)187 static void StartCall( 188 ::grpc::ClientContext* context, 189 ::grpc::internal::CallOpSendInitialMetadata* single_buf) { 190 single_buf->SendInitialMetadata(&context->send_initial_metadata_, 191 context->initial_metadata_flags()); 192 } 193 }; 194 195 // TODO(vjpai): This templated factory is deprecated and will be replaced by 196 //. the non-templated helper as soon as possible. 197 template <class R> 198 class ClientAsyncResponseReaderFactory { 199 public: 200 template <class W> Create(::grpc::ChannelInterface * channel,::grpc::CompletionQueue * cq,const::grpc::internal::RpcMethod & method,::grpc::ClientContext * context,const W & request,bool start)201 static ClientAsyncResponseReader<R>* Create( 202 ::grpc::ChannelInterface* channel, ::grpc::CompletionQueue* cq, 203 const ::grpc::internal::RpcMethod& method, ::grpc::ClientContext* context, 204 const W& request, bool start) { 205 auto* result = ClientAsyncResponseReaderHelper::Create<R>( 206 channel, cq, method, context, request); 207 if (start) { 208 result->StartCall(); 209 } 210 return result; 211 } 212 }; 213 214 } // namespace internal 215 216 /// Async API for client-side unary RPCs, where the message response 217 /// received from the server is of type \a R. 218 template <class R> 219 class ClientAsyncResponseReader final 220 : public ClientAsyncResponseReaderInterface<R> { 221 public: 222 // always allocated against a call arena, no memory free required delete(void *,std::size_t size)223 static void operator delete(void* /*ptr*/, std::size_t size) { 224 GPR_CODEGEN_ASSERT(size == sizeof(ClientAsyncResponseReader)); 225 } 226 227 // This operator should never be called as the memory should be freed as part 228 // of the arena destruction. It only exists to provide a matching operator 229 // delete to the operator new so that some compilers will not complain (see 230 // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this 231 // there are no tests catching the compiler warning. delete(void *,void *)232 static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); } 233 StartCall()234 void StartCall() override { 235 GPR_CODEGEN_DEBUG_ASSERT(!started_); 236 started_ = true; 237 internal::ClientAsyncResponseReaderHelper::StartCall(context_, single_buf_); 238 } 239 240 /// See \a ClientAsyncResponseReaderInterface::ReadInitialMetadata for 241 /// semantics. 242 /// 243 /// Side effect: 244 /// - the \a ClientContext associated with this call is updated with 245 /// possible initial and trailing metadata sent from the server. ReadInitialMetadata(void * tag)246 void ReadInitialMetadata(void* tag) override { 247 GPR_CODEGEN_DEBUG_ASSERT(started_); 248 GPR_CODEGEN_DEBUG_ASSERT(!context_->initial_metadata_received_); 249 read_initial_metadata_(context_, &call_, single_buf_, tag); 250 initial_metadata_read_ = true; 251 } 252 253 /// See \a ClientAsyncResponseReaderInterface::Finish for semantics. 254 /// 255 /// Side effect: 256 /// - the \a ClientContext associated with this call is updated with 257 /// possible initial and trailing metadata sent from the server. Finish(R * msg,::grpc::Status * status,void * tag)258 void Finish(R* msg, ::grpc::Status* status, void* tag) override { 259 GPR_CODEGEN_DEBUG_ASSERT(started_); 260 finish_(context_, &call_, initial_metadata_read_, single_buf_, &finish_buf_, 261 static_cast<void*>(msg), status, tag); 262 } 263 264 private: 265 friend class internal::ClientAsyncResponseReaderHelper; 266 ::grpc::ClientContext* const context_; 267 ::grpc::internal::Call call_; 268 bool started_ = false; 269 bool initial_metadata_read_ = false; 270 ClientAsyncResponseReader(::grpc::internal::Call call,::grpc::ClientContext * context)271 ClientAsyncResponseReader(::grpc::internal::Call call, 272 ::grpc::ClientContext* context) 273 : context_(context), call_(call) {} 274 275 // disable operator new 276 static void* operator new(std::size_t size); new(std::size_t,void * p)277 static void* operator new(std::size_t /*size*/, void* p) { return p; } 278 279 internal::CallOpSendInitialMetadata* single_buf_; 280 internal::CallOpSetInterface* finish_buf_ = nullptr; 281 std::function<void(ClientContext*, internal::Call*, 282 internal::CallOpSendInitialMetadata*, void*)> 283 read_initial_metadata_; 284 std::function<void(ClientContext*, internal::Call*, 285 bool initial_metadata_read, 286 internal::CallOpSendInitialMetadata*, 287 internal::CallOpSetInterface**, void*, Status*, void*)> 288 finish_; 289 }; 290 291 /// Async server-side API for handling unary calls, where the single 292 /// response message sent to the client is of type \a W. 293 template <class W> 294 class ServerAsyncResponseWriter final 295 : public ::grpc::internal::ServerAsyncStreamingInterface { 296 public: ServerAsyncResponseWriter(::grpc::ServerContext * ctx)297 explicit ServerAsyncResponseWriter(::grpc::ServerContext* ctx) 298 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} 299 300 /// See \a ServerAsyncStreamingInterface::SendInitialMetadata for semantics. 301 /// 302 /// Side effect: 303 /// The initial metadata that will be sent to the client from this op will 304 /// be taken from the \a ServerContext associated with the call. 305 /// 306 /// \param[in] tag Tag identifying this request. SendInitialMetadata(void * tag)307 void SendInitialMetadata(void* tag) override { 308 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); 309 310 meta_buf_.set_output_tag(tag); 311 meta_buf_.SendInitialMetadata(&ctx_->initial_metadata_, 312 ctx_->initial_metadata_flags()); 313 if (ctx_->compression_level_set()) { 314 meta_buf_.set_compression_level(ctx_->compression_level()); 315 } 316 ctx_->sent_initial_metadata_ = true; 317 call_.PerformOps(&meta_buf_); 318 } 319 320 /// Indicate that the stream is to be finished and request notification 321 /// when the server has sent the appropriate signals to the client to 322 /// end the call. Should not be used concurrently with other operations. 323 /// 324 /// \param[in] tag Tag identifying this request. 325 /// \param[in] status To be sent to the client as the result of the call. 326 /// \param[in] msg Message to be sent to the client. 327 /// 328 /// Side effect: 329 /// - also sends initial metadata if not already sent (using the 330 /// \a ServerContext associated with this call). 331 /// 332 /// Note: if \a status has a non-OK code, then \a msg will not be sent, 333 /// and the client will receive only the status with possible trailing 334 /// metadata. Finish(const W & msg,const::grpc::Status & status,void * tag)335 void Finish(const W& msg, const ::grpc::Status& status, void* tag) { 336 finish_buf_.set_output_tag(tag); 337 finish_buf_.set_core_cq_tag(&finish_buf_); 338 if (!ctx_->sent_initial_metadata_) { 339 finish_buf_.SendInitialMetadata(&ctx_->initial_metadata_, 340 ctx_->initial_metadata_flags()); 341 if (ctx_->compression_level_set()) { 342 finish_buf_.set_compression_level(ctx_->compression_level()); 343 } 344 ctx_->sent_initial_metadata_ = true; 345 } 346 // The response is dropped if the status is not OK. 347 if (status.ok()) { 348 finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_, 349 finish_buf_.SendMessage(msg)); 350 } else { 351 finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_, status); 352 } 353 call_.PerformOps(&finish_buf_); 354 } 355 356 /// Indicate that the stream is to be finished with a non-OK status, 357 /// and request notification for when the server has finished sending the 358 /// appropriate signals to the client to end the call. 359 /// Should not be used concurrently with other operations. 360 /// 361 /// \param[in] tag Tag identifying this request. 362 /// \param[in] status To be sent to the client as the result of the call. 363 /// - Note: \a status must have a non-OK code. 364 /// 365 /// Side effect: 366 /// - also sends initial metadata if not already sent (using the 367 /// \a ServerContext associated with this call). FinishWithError(const::grpc::Status & status,void * tag)368 void FinishWithError(const ::grpc::Status& status, void* tag) { 369 GPR_CODEGEN_ASSERT(!status.ok()); 370 finish_buf_.set_output_tag(tag); 371 if (!ctx_->sent_initial_metadata_) { 372 finish_buf_.SendInitialMetadata(&ctx_->initial_metadata_, 373 ctx_->initial_metadata_flags()); 374 if (ctx_->compression_level_set()) { 375 finish_buf_.set_compression_level(ctx_->compression_level()); 376 } 377 ctx_->sent_initial_metadata_ = true; 378 } 379 finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_, status); 380 call_.PerformOps(&finish_buf_); 381 } 382 383 private: BindCall(::grpc::internal::Call * call)384 void BindCall(::grpc::internal::Call* call) override { call_ = *call; } 385 386 ::grpc::internal::Call call_; 387 ::grpc::ServerContext* ctx_; 388 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> 389 meta_buf_; 390 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 391 ::grpc::internal::CallOpSendMessage, 392 ::grpc::internal::CallOpServerSendStatus> 393 finish_buf_; 394 }; 395 396 } // namespace grpc 397 398 namespace std { 399 template <class R> 400 class default_delete<::grpc::ClientAsyncResponseReader<R>> { 401 public: operator()402 void operator()(void* /*p*/) {} 403 }; 404 template <class R> 405 class default_delete<::grpc::ClientAsyncResponseReaderInterface<R>> { 406 public: operator()407 void operator()(void* /*p*/) {} 408 }; 409 } // namespace std 410 411 #endif // GRPCPP_IMPL_CODEGEN_ASYNC_UNARY_CALL_H 412