1 // 2 // 3 // Copyright 2015 gRPC authors. 4 // 5 // Licensed under the Apache License, Version 2.0 (the "License"); 6 // you may not use this file except in compliance with the License. 7 // You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, software 12 // distributed under the License is distributed on an "AS IS" BASIS, 13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 // See the License for the specific language governing permissions and 15 // limitations under the License. 16 // 17 // 18 19 #ifndef GRPCPP_SUPPORT_ASYNC_UNARY_CALL_H 20 #define GRPCPP_SUPPORT_ASYNC_UNARY_CALL_H 21 22 #include <grpc/grpc.h> 23 #include <grpcpp/client_context.h> 24 #include <grpcpp/impl/call.h> 25 #include <grpcpp/impl/call_op_set.h> 26 #include <grpcpp/impl/call_op_set_interface.h> 27 #include <grpcpp/impl/channel_interface.h> 28 #include <grpcpp/impl/service_type.h> 29 #include <grpcpp/server_context.h> 30 #include <grpcpp/support/status.h> 31 32 #include "absl/log/absl_check.h" 33 34 namespace grpc { 35 36 // Forward declaration for use in Helper class 37 template <class R> 38 class ClientAsyncResponseReader; 39 40 /// An interface relevant for async client side unary RPCs (which send 41 /// one request message to a server and receive one response message). 42 template <class R> 43 class ClientAsyncResponseReaderInterface { 44 public: ~ClientAsyncResponseReaderInterface()45 virtual ~ClientAsyncResponseReaderInterface() {} 46 47 /// Start the call that was set up by the constructor, but only if the 48 /// constructor was invoked through the "Prepare" API which doesn't actually 49 /// start the call 50 virtual void StartCall() = 0; 51 52 /// Request notification of the reading of initial metadata. Completion 53 /// will be notified by \a tag on the associated completion queue. 54 /// This call is optional, but if it is used, it cannot be used concurrently 55 /// with or after the \a Finish method. 56 /// 57 /// \param[in] tag Tag identifying this request. 58 virtual void ReadInitialMetadata(void* tag) = 0; 59 60 /// Request to receive the server's response \a msg and final \a status for 61 /// the call, and to notify \a tag on this call's completion queue when 62 /// finished. 63 /// 64 /// This function will return when either: 65 /// - when the server's response message and status have been received. 66 /// - when the server has returned a non-OK status (no message expected in 67 /// this case). 68 /// - when the call failed for some reason and the library generated a 69 /// non-OK status. 70 /// 71 /// \param[in] tag Tag identifying this request. 72 /// \param[out] status To be updated with the operation status. 73 /// \param[out] msg To be filled in with the server's response message. 74 virtual void Finish(R* msg, grpc::Status* status, void* tag) = 0; 75 }; 76 77 namespace internal { 78 79 class ClientAsyncResponseReaderHelper { 80 public: 81 /// Start a call and write the request out if \a start is set. 82 /// \a tag will be notified on \a cq when the call has been started (i.e. 83 /// initial metadata sent) and \a request has been written out. 84 /// If \a start is not set, the actual call must be initiated by StartCall 85 /// Note that \a context will be used to fill in custom initial metadata 86 /// used to send to the server when starting the call. 87 /// 88 /// Optionally pass in a base class for request and response types so that the 89 /// internal functions and structs can be templated based on that, allowing 90 /// reuse across RPCs (e.g., MessageLite for protobuf). Since constructors 91 /// can't have an explicit template parameter, the last argument is an 92 /// extraneous parameter just to provide the needed type information. 93 template <class R, class W, class BaseR = R, class BaseW = W> Create(grpc::ChannelInterface * channel,grpc::CompletionQueue * cq,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,const W & request)94 static ClientAsyncResponseReader<R>* Create( 95 grpc::ChannelInterface* channel, grpc::CompletionQueue* cq, 96 const grpc::internal::RpcMethod& method, grpc::ClientContext* context, 97 const W& request) /* __attribute__((noinline)) */ { 98 grpc::internal::Call call = channel->CreateCall(method, context, cq); 99 ClientAsyncResponseReader<R>* result = new (grpc_call_arena_alloc( 100 call.call(), sizeof(ClientAsyncResponseReader<R>))) 101 ClientAsyncResponseReader<R>(call, context); 102 SetupRequest<BaseR, BaseW>( 103 call.call(), &result->single_buf_, &result->read_initial_metadata_, 104 &result->finish_, static_cast<const BaseW&>(request)); 105 106 return result; 107 } 108 109 // Various helper functions to reduce templating use 110 111 template <class R, class W> SetupRequest(grpc_call * call,grpc::internal::CallOpSendInitialMetadata ** single_buf_ptr,std::function<void (ClientContext *,internal::Call *,internal::CallOpSendInitialMetadata *,void *)> * read_initial_metadata,std::function<void (ClientContext *,internal::Call *,bool initial_metadata_read,internal::CallOpSendInitialMetadata *,internal::CallOpSetInterface **,void *,Status *,void *)> * finish,const W & request)112 static void SetupRequest( 113 grpc_call* call, 114 grpc::internal::CallOpSendInitialMetadata** single_buf_ptr, 115 std::function<void(ClientContext*, internal::Call*, 116 internal::CallOpSendInitialMetadata*, void*)>* 117 read_initial_metadata, 118 std::function< 119 void(ClientContext*, internal::Call*, bool initial_metadata_read, 120 internal::CallOpSendInitialMetadata*, 121 internal::CallOpSetInterface**, void*, Status*, void*)>* finish, 122 const W& request) { 123 using SingleBufType = 124 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, 125 grpc::internal::CallOpSendMessage, 126 grpc::internal::CallOpClientSendClose, 127 grpc::internal::CallOpRecvInitialMetadata, 128 grpc::internal::CallOpRecvMessage<R>, 129 grpc::internal::CallOpClientRecvStatus>; 130 SingleBufType* single_buf = 131 new (grpc_call_arena_alloc(call, sizeof(SingleBufType))) SingleBufType; 132 *single_buf_ptr = single_buf; 133 // TODO(ctiller): don't assert 134 ABSL_CHECK(single_buf->SendMessage(request).ok()); 135 single_buf->ClientSendClose(); 136 137 // The purpose of the following functions is to type-erase the actual 138 // templated type of the CallOpSet being used by hiding that type inside the 139 // function definition rather than specifying it as an argument of the 140 // function or a member of the class. The type-erased CallOpSet will get 141 // static_cast'ed back to the real type so that it can be used properly. 142 *read_initial_metadata = 143 [](ClientContext* context, internal::Call* call, 144 internal::CallOpSendInitialMetadata* single_buf_view, void* tag) { 145 auto* single_buf = static_cast<SingleBufType*>(single_buf_view); 146 single_buf->set_output_tag(tag); 147 single_buf->RecvInitialMetadata(context); 148 call->PerformOps(single_buf); 149 }; 150 151 // Note that this function goes one step further than the previous one 152 // because it type-erases the message being written down to a void*. This 153 // will be static-cast'ed back to the class specified here by hiding that 154 // class information inside the function definition. Note that this feature 155 // expects the class being specified here for R to be a base-class of the 156 // "real" R without any multiple-inheritance (as applies in protobuf wrt 157 // MessageLite) 158 *finish = [](ClientContext* context, internal::Call* call, 159 bool initial_metadata_read, 160 internal::CallOpSendInitialMetadata* single_buf_view, 161 internal::CallOpSetInterface** finish_buf_ptr, void* msg, 162 Status* status, void* tag) { 163 if (initial_metadata_read) { 164 using FinishBufType = 165 grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<R>, 166 grpc::internal::CallOpClientRecvStatus>; 167 FinishBufType* finish_buf = 168 new (grpc_call_arena_alloc(call->call(), sizeof(FinishBufType))) 169 FinishBufType; 170 *finish_buf_ptr = finish_buf; 171 finish_buf->set_output_tag(tag); 172 finish_buf->RecvMessage(static_cast<R*>(msg)); 173 finish_buf->AllowNoMessage(); 174 finish_buf->ClientRecvStatus(context, status); 175 call->PerformOps(finish_buf); 176 } else { 177 auto* single_buf = static_cast<SingleBufType*>(single_buf_view); 178 single_buf->set_output_tag(tag); 179 single_buf->RecvInitialMetadata(context); 180 single_buf->RecvMessage(static_cast<R*>(msg)); 181 single_buf->AllowNoMessage(); 182 single_buf->ClientRecvStatus(context, status); 183 call->PerformOps(single_buf); 184 } 185 }; 186 } 187 StartCall(grpc::ClientContext * context,grpc::internal::CallOpSendInitialMetadata * single_buf)188 static void StartCall(grpc::ClientContext* context, 189 grpc::internal::CallOpSendInitialMetadata* single_buf) { 190 single_buf->SendInitialMetadata(&context->send_initial_metadata_, 191 context->initial_metadata_flags()); 192 } 193 }; 194 195 // TODO(vjpai): This templated factory is deprecated and will be replaced by 196 //. the non-templated helper as soon as possible. 197 template <class R> 198 class ClientAsyncResponseReaderFactory { 199 public: 200 template <class W> Create(grpc::ChannelInterface * channel,grpc::CompletionQueue * cq,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,const W & request,bool start)201 static ClientAsyncResponseReader<R>* Create( 202 grpc::ChannelInterface* channel, grpc::CompletionQueue* cq, 203 const grpc::internal::RpcMethod& method, grpc::ClientContext* context, 204 const W& request, bool start) { 205 auto* result = ClientAsyncResponseReaderHelper::Create<R>( 206 channel, cq, method, context, request); 207 if (start) { 208 result->StartCall(); 209 } 210 return result; 211 } 212 }; 213 214 } // namespace internal 215 216 /// Async API for client-side unary RPCs, where the message response 217 /// received from the server is of type \a R. 218 template <class R> 219 class ClientAsyncResponseReader final 220 : public ClientAsyncResponseReaderInterface<R> { 221 public: 222 // always allocated against a call arena, no memory free required delete(void *,std::size_t size)223 static void operator delete(void* /*ptr*/, std::size_t size) { 224 ABSL_CHECK_EQ(size, sizeof(ClientAsyncResponseReader)); 225 } 226 227 // This operator should never be called as the memory should be freed as part 228 // of the arena destruction. It only exists to provide a matching operator 229 // delete to the operator new so that some compilers will not complain (see 230 // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this 231 // there are no tests catching the compiler warning. delete(void *,void *)232 static void operator delete(void*, void*) { ABSL_CHECK(false); } 233 StartCall()234 void StartCall() override { 235 ABSL_DCHECK(!started_); 236 started_ = true; 237 internal::ClientAsyncResponseReaderHelper::StartCall(context_, single_buf_); 238 } 239 240 /// See \a ClientAsyncResponseReaderInterface::ReadInitialMetadata for 241 /// semantics. 242 /// 243 /// Side effect: 244 /// - the \a ClientContext associated with this call is updated with 245 /// possible initial and trailing metadata sent from the server. ReadInitialMetadata(void * tag)246 void ReadInitialMetadata(void* tag) override { 247 ABSL_DCHECK(started_); 248 ABSL_DCHECK(!context_->initial_metadata_received_); 249 read_initial_metadata_(context_, &call_, single_buf_, tag); 250 initial_metadata_read_ = true; 251 } 252 253 /// See \a ClientAsyncResponseReaderInterface::Finish for semantics. 254 /// 255 /// Side effect: 256 /// - the \a ClientContext associated with this call is updated with 257 /// possible initial and trailing metadata sent from the server. Finish(R * msg,grpc::Status * status,void * tag)258 void Finish(R* msg, grpc::Status* status, void* tag) override { 259 ABSL_DCHECK(started_); 260 finish_(context_, &call_, initial_metadata_read_, single_buf_, &finish_buf_, 261 static_cast<void*>(msg), status, tag); 262 } 263 264 private: 265 friend class internal::ClientAsyncResponseReaderHelper; 266 grpc::ClientContext* const context_; 267 grpc::internal::Call call_; 268 bool started_ = false; 269 bool initial_metadata_read_ = false; 270 ClientAsyncResponseReader(grpc::internal::Call call,grpc::ClientContext * context)271 ClientAsyncResponseReader(grpc::internal::Call call, 272 grpc::ClientContext* context) 273 : context_(context), call_(call) {} 274 275 // disable operator new 276 static void* operator new(std::size_t size); new(std::size_t,void * p)277 static void* operator new(std::size_t /*size*/, void* p) { return p; } 278 279 internal::CallOpSendInitialMetadata* single_buf_; 280 internal::CallOpSetInterface* finish_buf_ = nullptr; 281 std::function<void(ClientContext*, internal::Call*, 282 internal::CallOpSendInitialMetadata*, void*)> 283 read_initial_metadata_; 284 std::function<void(ClientContext*, internal::Call*, 285 bool initial_metadata_read, 286 internal::CallOpSendInitialMetadata*, 287 internal::CallOpSetInterface**, void*, Status*, void*)> 288 finish_; 289 }; 290 291 /// Async server-side API for handling unary calls, where the single 292 /// response message sent to the client is of type \a W. 293 template <class W> 294 class ServerAsyncResponseWriter final 295 : public grpc::internal::ServerAsyncStreamingInterface { 296 public: ServerAsyncResponseWriter(grpc::ServerContext * ctx)297 explicit ServerAsyncResponseWriter(grpc::ServerContext* ctx) 298 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} 299 300 /// See \a ServerAsyncStreamingInterface::SendInitialMetadata for semantics. 301 /// 302 /// Side effect: 303 /// The initial metadata that will be sent to the client from this op will 304 /// be taken from the \a ServerContext associated with the call. 305 /// 306 /// \param[in] tag Tag identifying this request. SendInitialMetadata(void * tag)307 void SendInitialMetadata(void* tag) override { 308 ABSL_CHECK(!ctx_->sent_initial_metadata_); 309 310 meta_buf_.set_output_tag(tag); 311 meta_buf_.SendInitialMetadata(&ctx_->initial_metadata_, 312 ctx_->initial_metadata_flags()); 313 if (ctx_->compression_level_set()) { 314 meta_buf_.set_compression_level(ctx_->compression_level()); 315 } 316 ctx_->sent_initial_metadata_ = true; 317 call_.PerformOps(&meta_buf_); 318 } 319 320 /// Indicate that the stream is to be finished and request notification 321 /// when the server has sent the appropriate signals to the client to 322 /// end the call. Should not be used concurrently with other operations. 323 /// 324 /// \param[in] tag Tag identifying this request. 325 /// \param[in] status To be sent to the client as the result of the call. 326 /// \param[in] msg Message to be sent to the client. 327 /// 328 /// Side effect: 329 /// - also sends initial metadata if not already sent (using the 330 /// \a ServerContext associated with this call). 331 /// 332 /// Note: if \a status has a non-OK code, then \a msg will not be sent, 333 /// and the client will receive only the status with possible trailing 334 /// metadata. 335 /// 336 /// gRPC doesn't take ownership or a reference to msg and status, so it is 337 /// safe to deallocate them once the Finish operation is complete (i.e. a 338 /// result arrives in the completion queue). Finish(const W & msg,const grpc::Status & status,void * tag)339 void Finish(const W& msg, const grpc::Status& status, void* tag) { 340 finish_buf_.set_output_tag(tag); 341 finish_buf_.set_core_cq_tag(&finish_buf_); 342 if (!ctx_->sent_initial_metadata_) { 343 finish_buf_.SendInitialMetadata(&ctx_->initial_metadata_, 344 ctx_->initial_metadata_flags()); 345 if (ctx_->compression_level_set()) { 346 finish_buf_.set_compression_level(ctx_->compression_level()); 347 } 348 ctx_->sent_initial_metadata_ = true; 349 } 350 // The response is dropped if the status is not OK. 351 if (status.ok()) { 352 finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_, 353 finish_buf_.SendMessage(msg)); 354 } else { 355 finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_, status); 356 } 357 call_.PerformOps(&finish_buf_); 358 } 359 360 /// Indicate that the stream is to be finished with a non-OK status, 361 /// and request notification for when the server has finished sending the 362 /// appropriate signals to the client to end the call. 363 /// Should not be used concurrently with other operations. 364 /// 365 /// \param[in] tag Tag identifying this request. 366 /// \param[in] status To be sent to the client as the result of the call. 367 /// - Note: \a status must have a non-OK code. 368 /// 369 /// Side effect: 370 /// - also sends initial metadata if not already sent (using the 371 /// \a ServerContext associated with this call). 372 /// 373 /// gRPC doesn't take ownership or a reference to status, so it is safe to 374 /// deallocate them once the Finish operation is complete (i.e. a result 375 /// arrives in the completion queue). FinishWithError(const grpc::Status & status,void * tag)376 void FinishWithError(const grpc::Status& status, void* tag) { 377 ABSL_CHECK(!status.ok()); 378 finish_buf_.set_output_tag(tag); 379 if (!ctx_->sent_initial_metadata_) { 380 finish_buf_.SendInitialMetadata(&ctx_->initial_metadata_, 381 ctx_->initial_metadata_flags()); 382 if (ctx_->compression_level_set()) { 383 finish_buf_.set_compression_level(ctx_->compression_level()); 384 } 385 ctx_->sent_initial_metadata_ = true; 386 } 387 finish_buf_.ServerSendStatus(&ctx_->trailing_metadata_, status); 388 call_.PerformOps(&finish_buf_); 389 } 390 391 private: BindCall(grpc::internal::Call * call)392 void BindCall(grpc::internal::Call* call) override { call_ = *call; } 393 394 grpc::internal::Call call_; 395 grpc::ServerContext* ctx_; 396 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata> 397 meta_buf_; 398 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, 399 grpc::internal::CallOpSendMessage, 400 grpc::internal::CallOpServerSendStatus> 401 finish_buf_; 402 }; 403 404 } // namespace grpc 405 406 namespace std { 407 template <class R> 408 class default_delete<grpc::ClientAsyncResponseReader<R>> { 409 public: operator()410 void operator()(void* /*p*/) {} 411 }; 412 template <class R> 413 class default_delete<grpc::ClientAsyncResponseReaderInterface<R>> { 414 public: operator()415 void operator()(void* /*p*/) {} 416 }; 417 } // namespace std 418 419 #endif // GRPCPP_SUPPORT_ASYNC_UNARY_CALL_H 420