1 /* 2 * 3 * Copyright 2019 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18 #ifndef GRPCPP_IMPL_CODEGEN_ASYNC_STREAM_IMPL_H 19 #define GRPCPP_IMPL_CODEGEN_ASYNC_STREAM_IMPL_H 20 21 #include <grpcpp/impl/codegen/call.h> 22 #include <grpcpp/impl/codegen/channel_interface.h> 23 #include <grpcpp/impl/codegen/core_codegen_interface.h> 24 #include <grpcpp/impl/codegen/server_context_impl.h> 25 #include <grpcpp/impl/codegen/service_type.h> 26 #include <grpcpp/impl/codegen/status.h> 27 28 namespace grpc_impl { 29 30 namespace internal { 31 /// Common interface for all client side asynchronous streaming. 32 class ClientAsyncStreamingInterface { 33 public: ~ClientAsyncStreamingInterface()34 virtual ~ClientAsyncStreamingInterface() {} 35 36 /// Start the call that was set up by the constructor, but only if the 37 /// constructor was invoked through the "Prepare" API which doesn't actually 38 /// start the call 39 virtual void StartCall(void* tag) = 0; 40 41 /// Request notification of the reading of the initial metadata. Completion 42 /// will be notified by \a tag on the associated completion queue. 43 /// This call is optional, but if it is used, it cannot be used concurrently 44 /// with or after the \a AsyncReaderInterface::Read method. 45 /// 46 /// \param[in] tag Tag identifying this request. 47 virtual void ReadInitialMetadata(void* tag) = 0; 48 49 /// Indicate that the stream is to be finished and request notification for 50 /// when the call has been ended. 51 /// Should not be used concurrently with other operations. 52 /// 53 /// It is appropriate to call this method exactly once when both: 54 /// * the client side has no more message to send 55 /// (this can be declared implicitly by calling this method, or 56 /// explicitly through an earlier call to the <i>WritesDone</i> method 57 /// of the class in use, e.g. \a ClientAsyncWriterInterface::WritesDone or 58 /// \a ClientAsyncReaderWriterInterface::WritesDone). 59 /// * there are no more messages to be received from the server (this can 60 /// be known implicitly by the calling code, or explicitly from an 61 /// earlier call to \a AsyncReaderInterface::Read that yielded a failed 62 /// result, e.g. cq->Next(&read_tag, &ok) filled in 'ok' with 'false'). 63 /// 64 /// The tag will be returned when either: 65 /// - all incoming messages have been read and the server has returned 66 /// a status. 67 /// - the server has returned a non-OK status. 68 /// - the call failed for some reason and the library generated a 69 /// status. 70 /// 71 /// Note that implementations of this method attempt to receive initial 72 /// metadata from the server if initial metadata hasn't yet been received. 73 /// 74 /// \param[in] tag Tag identifying this request. 75 /// \param[out] status To be updated with the operation status. 76 virtual void Finish(::grpc::Status* status, void* tag) = 0; 77 }; 78 79 /// An interface that yields a sequence of messages of type \a R. 80 template <class R> 81 class AsyncReaderInterface { 82 public: ~AsyncReaderInterface()83 virtual ~AsyncReaderInterface() {} 84 85 /// Read a message of type \a R into \a msg. Completion will be notified by \a 86 /// tag on the associated completion queue. 87 /// This is thread-safe with respect to \a Write or \a WritesDone methods. It 88 /// should not be called concurrently with other streaming APIs 89 /// on the same stream. It is not meaningful to call it concurrently 90 /// with another \a AsyncReaderInterface::Read on the same stream since reads 91 /// on the same stream are delivered in order. 92 /// 93 /// \param[out] msg Where to eventually store the read message. 94 /// \param[in] tag The tag identifying the operation. 95 /// 96 /// Side effect: note that this method attempt to receive initial metadata for 97 /// a stream if it hasn't yet been received. 98 virtual void Read(R* msg, void* tag) = 0; 99 }; 100 101 /// An interface that can be fed a sequence of messages of type \a W. 102 template <class W> 103 class AsyncWriterInterface { 104 public: ~AsyncWriterInterface()105 virtual ~AsyncWriterInterface() {} 106 107 /// Request the writing of \a msg with identifying tag \a tag. 108 /// 109 /// Only one write may be outstanding at any given time. This means that 110 /// after calling Write, one must wait to receive \a tag from the completion 111 /// queue BEFORE calling Write again. 112 /// This is thread-safe with respect to \a AsyncReaderInterface::Read 113 /// 114 /// gRPC doesn't take ownership or a reference to \a msg, so it is safe to 115 /// to deallocate once Write returns. 116 /// 117 /// \param[in] msg The message to be written. 118 /// \param[in] tag The tag identifying the operation. 119 virtual void Write(const W& msg, void* tag) = 0; 120 121 /// Request the writing of \a msg using WriteOptions \a options with 122 /// identifying tag \a tag. 123 /// 124 /// Only one write may be outstanding at any given time. This means that 125 /// after calling Write, one must wait to receive \a tag from the completion 126 /// queue BEFORE calling Write again. 127 /// WriteOptions \a options is used to set the write options of this message. 128 /// This is thread-safe with respect to \a AsyncReaderInterface::Read 129 /// 130 /// gRPC doesn't take ownership or a reference to \a msg, so it is safe to 131 /// to deallocate once Write returns. 132 /// 133 /// \param[in] msg The message to be written. 134 /// \param[in] options The WriteOptions to be used to write this message. 135 /// \param[in] tag The tag identifying the operation. 136 virtual void Write(const W& msg, ::grpc::WriteOptions options, void* tag) = 0; 137 138 /// Request the writing of \a msg and coalesce it with the writing 139 /// of trailing metadata, using WriteOptions \a options with 140 /// identifying tag \a tag. 141 /// 142 /// For client, WriteLast is equivalent of performing Write and 143 /// WritesDone in a single step. 144 /// For server, WriteLast buffers the \a msg. The writing of \a msg is held 145 /// until Finish is called, where \a msg and trailing metadata are coalesced 146 /// and write is initiated. Note that WriteLast can only buffer \a msg up to 147 /// the flow control window size. If \a msg size is larger than the window 148 /// size, it will be sent on wire without buffering. 149 /// 150 /// gRPC doesn't take ownership or a reference to \a msg, so it is safe to 151 /// to deallocate once Write returns. 152 /// 153 /// \param[in] msg The message to be written. 154 /// \param[in] options The WriteOptions to be used to write this message. 155 /// \param[in] tag The tag identifying the operation. WriteLast(const W & msg,::grpc::WriteOptions options,void * tag)156 void WriteLast(const W& msg, ::grpc::WriteOptions options, void* tag) { 157 Write(msg, options.set_last_message(), tag); 158 } 159 }; 160 161 } // namespace internal 162 163 template <class R> 164 class ClientAsyncReaderInterface 165 : public internal::ClientAsyncStreamingInterface, 166 public internal::AsyncReaderInterface<R> {}; 167 168 namespace internal { 169 template <class R> 170 class ClientAsyncReaderFactory { 171 public: 172 /// Create a stream object. 173 /// Write the first request out if \a start is set. 174 /// \a tag will be notified on \a cq when the call has been started and 175 /// \a request has been written out. If \a start is not set, \a tag must be 176 /// nullptr and the actual call must be initiated by StartCall 177 /// Note that \a context will be used to fill in custom initial metadata 178 /// used to send to the server when starting the call. 179 template <class W> Create(::grpc::ChannelInterface * channel,::grpc_impl::CompletionQueue * cq,const::grpc::internal::RpcMethod & method,::grpc_impl::ClientContext * context,const W & request,bool start,void * tag)180 static ClientAsyncReader<R>* Create(::grpc::ChannelInterface* channel, 181 ::grpc_impl::CompletionQueue* cq, 182 const ::grpc::internal::RpcMethod& method, 183 ::grpc_impl::ClientContext* context, 184 const W& request, bool start, void* tag) { 185 ::grpc::internal::Call call = channel->CreateCall(method, context, cq); 186 return new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( 187 call.call(), sizeof(ClientAsyncReader<R>))) 188 ClientAsyncReader<R>(call, context, request, start, tag); 189 } 190 }; 191 } // namespace internal 192 193 /// Async client-side API for doing server-streaming RPCs, 194 /// where the incoming message stream coming from the server has 195 /// messages of type \a R. 196 template <class R> 197 class ClientAsyncReader final : public ClientAsyncReaderInterface<R> { 198 public: 199 // always allocated against a call arena, no memory free required delete(void *,std::size_t size)200 static void operator delete(void* /*ptr*/, std::size_t size) { 201 GPR_CODEGEN_ASSERT(size == sizeof(ClientAsyncReader)); 202 } 203 204 // This operator should never be called as the memory should be freed as part 205 // of the arena destruction. It only exists to provide a matching operator 206 // delete to the operator new so that some compilers will not complain (see 207 // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this 208 // there are no tests catching the compiler warning. delete(void *,void *)209 static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); } 210 StartCall(void * tag)211 void StartCall(void* tag) override { 212 GPR_CODEGEN_ASSERT(!started_); 213 started_ = true; 214 StartCallInternal(tag); 215 } 216 217 /// See the \a ClientAsyncStreamingInterface.ReadInitialMetadata 218 /// method for semantics. 219 /// 220 /// Side effect: 221 /// - upon receiving initial metadata from the server, 222 /// the \a ClientContext associated with this call is updated, and the 223 /// calling code can access the received metadata through the 224 /// \a ClientContext. ReadInitialMetadata(void * tag)225 void ReadInitialMetadata(void* tag) override { 226 GPR_CODEGEN_ASSERT(started_); 227 GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); 228 229 meta_ops_.set_output_tag(tag); 230 meta_ops_.RecvInitialMetadata(context_); 231 call_.PerformOps(&meta_ops_); 232 } 233 Read(R * msg,void * tag)234 void Read(R* msg, void* tag) override { 235 GPR_CODEGEN_ASSERT(started_); 236 read_ops_.set_output_tag(tag); 237 if (!context_->initial_metadata_received_) { 238 read_ops_.RecvInitialMetadata(context_); 239 } 240 read_ops_.RecvMessage(msg); 241 call_.PerformOps(&read_ops_); 242 } 243 244 /// See the \a ClientAsyncStreamingInterface.Finish method for semantics. 245 /// 246 /// Side effect: 247 /// - the \a ClientContext associated with this call is updated with 248 /// possible initial and trailing metadata received from the server. Finish(::grpc::Status * status,void * tag)249 void Finish(::grpc::Status* status, void* tag) override { 250 GPR_CODEGEN_ASSERT(started_); 251 finish_ops_.set_output_tag(tag); 252 if (!context_->initial_metadata_received_) { 253 finish_ops_.RecvInitialMetadata(context_); 254 } 255 finish_ops_.ClientRecvStatus(context_, status); 256 call_.PerformOps(&finish_ops_); 257 } 258 259 private: 260 friend class internal::ClientAsyncReaderFactory<R>; 261 template <class W> ClientAsyncReader(::grpc::internal::Call call,::grpc_impl::ClientContext * context,const W & request,bool start,void * tag)262 ClientAsyncReader(::grpc::internal::Call call, 263 ::grpc_impl::ClientContext* context, const W& request, 264 bool start, void* tag) 265 : context_(context), call_(call), started_(start) { 266 // TODO(ctiller): don't assert 267 GPR_CODEGEN_ASSERT(init_ops_.SendMessage(request).ok()); 268 init_ops_.ClientSendClose(); 269 if (start) { 270 StartCallInternal(tag); 271 } else { 272 GPR_CODEGEN_ASSERT(tag == nullptr); 273 } 274 } 275 StartCallInternal(void * tag)276 void StartCallInternal(void* tag) { 277 init_ops_.SendInitialMetadata(&context_->send_initial_metadata_, 278 context_->initial_metadata_flags()); 279 init_ops_.set_output_tag(tag); 280 call_.PerformOps(&init_ops_); 281 } 282 283 ::grpc_impl::ClientContext* context_; 284 ::grpc::internal::Call call_; 285 bool started_; 286 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 287 ::grpc::internal::CallOpSendMessage, 288 ::grpc::internal::CallOpClientSendClose> 289 init_ops_; 290 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> 291 meta_ops_; 292 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, 293 ::grpc::internal::CallOpRecvMessage<R>> 294 read_ops_; 295 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, 296 ::grpc::internal::CallOpClientRecvStatus> 297 finish_ops_; 298 }; 299 300 /// Common interface for client side asynchronous writing. 301 template <class W> 302 class ClientAsyncWriterInterface 303 : public internal::ClientAsyncStreamingInterface, 304 public internal::AsyncWriterInterface<W> { 305 public: 306 /// Signal the client is done with the writes (half-close the client stream). 307 /// Thread-safe with respect to \a AsyncReaderInterface::Read 308 /// 309 /// \param[in] tag The tag identifying the operation. 310 virtual void WritesDone(void* tag) = 0; 311 }; 312 313 namespace internal { 314 template <class W> 315 class ClientAsyncWriterFactory { 316 public: 317 /// Create a stream object. 318 /// Start the RPC if \a start is set 319 /// \a tag will be notified on \a cq when the call has been started (i.e. 320 /// intitial metadata sent) and \a request has been written out. 321 /// If \a start is not set, \a tag must be nullptr and the actual call 322 /// must be initiated by StartCall 323 /// Note that \a context will be used to fill in custom initial metadata 324 /// used to send to the server when starting the call. 325 /// \a response will be filled in with the single expected response 326 /// message from the server upon a successful call to the \a Finish 327 /// method of this instance. 328 template <class R> Create(::grpc::ChannelInterface * channel,::grpc_impl::CompletionQueue * cq,const::grpc::internal::RpcMethod & method,::grpc_impl::ClientContext * context,R * response,bool start,void * tag)329 static ClientAsyncWriter<W>* Create(::grpc::ChannelInterface* channel, 330 ::grpc_impl::CompletionQueue* cq, 331 const ::grpc::internal::RpcMethod& method, 332 ::grpc_impl::ClientContext* context, 333 R* response, bool start, void* tag) { 334 ::grpc::internal::Call call = channel->CreateCall(method, context, cq); 335 return new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( 336 call.call(), sizeof(ClientAsyncWriter<W>))) 337 ClientAsyncWriter<W>(call, context, response, start, tag); 338 } 339 }; 340 } // namespace internal 341 342 /// Async API on the client side for doing client-streaming RPCs, 343 /// where the outgoing message stream going to the server contains 344 /// messages of type \a W. 345 template <class W> 346 class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> { 347 public: 348 // always allocated against a call arena, no memory free required delete(void *,std::size_t size)349 static void operator delete(void* /*ptr*/, std::size_t size) { 350 GPR_CODEGEN_ASSERT(size == sizeof(ClientAsyncWriter)); 351 } 352 353 // This operator should never be called as the memory should be freed as part 354 // of the arena destruction. It only exists to provide a matching operator 355 // delete to the operator new so that some compilers will not complain (see 356 // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this 357 // there are no tests catching the compiler warning. delete(void *,void *)358 static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); } 359 StartCall(void * tag)360 void StartCall(void* tag) override { 361 GPR_CODEGEN_ASSERT(!started_); 362 started_ = true; 363 StartCallInternal(tag); 364 } 365 366 /// See the \a ClientAsyncStreamingInterface.ReadInitialMetadata method for 367 /// semantics. 368 /// 369 /// Side effect: 370 /// - upon receiving initial metadata from the server, the \a ClientContext 371 /// associated with this call is updated, and the calling code can access 372 /// the received metadata through the \a ClientContext. ReadInitialMetadata(void * tag)373 void ReadInitialMetadata(void* tag) override { 374 GPR_CODEGEN_ASSERT(started_); 375 GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); 376 377 meta_ops_.set_output_tag(tag); 378 meta_ops_.RecvInitialMetadata(context_); 379 call_.PerformOps(&meta_ops_); 380 } 381 Write(const W & msg,void * tag)382 void Write(const W& msg, void* tag) override { 383 GPR_CODEGEN_ASSERT(started_); 384 write_ops_.set_output_tag(tag); 385 // TODO(ctiller): don't assert 386 GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok()); 387 call_.PerformOps(&write_ops_); 388 } 389 Write(const W & msg,::grpc::WriteOptions options,void * tag)390 void Write(const W& msg, ::grpc::WriteOptions options, void* tag) override { 391 GPR_CODEGEN_ASSERT(started_); 392 write_ops_.set_output_tag(tag); 393 if (options.is_last_message()) { 394 options.set_buffer_hint(); 395 write_ops_.ClientSendClose(); 396 } 397 // TODO(ctiller): don't assert 398 GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); 399 call_.PerformOps(&write_ops_); 400 } 401 WritesDone(void * tag)402 void WritesDone(void* tag) override { 403 GPR_CODEGEN_ASSERT(started_); 404 write_ops_.set_output_tag(tag); 405 write_ops_.ClientSendClose(); 406 call_.PerformOps(&write_ops_); 407 } 408 409 /// See the \a ClientAsyncStreamingInterface.Finish method for semantics. 410 /// 411 /// Side effect: 412 /// - the \a ClientContext associated with this call is updated with 413 /// possible initial and trailing metadata received from the server. 414 /// - attempts to fill in the \a response parameter passed to this class's 415 /// constructor with the server's response message. Finish(::grpc::Status * status,void * tag)416 void Finish(::grpc::Status* status, void* tag) override { 417 GPR_CODEGEN_ASSERT(started_); 418 finish_ops_.set_output_tag(tag); 419 if (!context_->initial_metadata_received_) { 420 finish_ops_.RecvInitialMetadata(context_); 421 } 422 finish_ops_.ClientRecvStatus(context_, status); 423 call_.PerformOps(&finish_ops_); 424 } 425 426 private: 427 friend class internal::ClientAsyncWriterFactory<W>; 428 template <class R> ClientAsyncWriter(::grpc::internal::Call call,::grpc_impl::ClientContext * context,R * response,bool start,void * tag)429 ClientAsyncWriter(::grpc::internal::Call call, 430 ::grpc_impl::ClientContext* context, R* response, 431 bool start, void* tag) 432 : context_(context), call_(call), started_(start) { 433 finish_ops_.RecvMessage(response); 434 finish_ops_.AllowNoMessage(); 435 if (start) { 436 StartCallInternal(tag); 437 } else { 438 GPR_CODEGEN_ASSERT(tag == nullptr); 439 } 440 } 441 StartCallInternal(void * tag)442 void StartCallInternal(void* tag) { 443 write_ops_.SendInitialMetadata(&context_->send_initial_metadata_, 444 context_->initial_metadata_flags()); 445 // if corked bit is set in context, we just keep the initial metadata 446 // buffered up to coalesce with later message send. No op is performed. 447 if (!context_->initial_metadata_corked_) { 448 write_ops_.set_output_tag(tag); 449 call_.PerformOps(&write_ops_); 450 } 451 } 452 453 ::grpc_impl::ClientContext* context_; 454 ::grpc::internal::Call call_; 455 bool started_; 456 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> 457 meta_ops_; 458 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 459 ::grpc::internal::CallOpSendMessage, 460 ::grpc::internal::CallOpClientSendClose> 461 write_ops_; 462 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, 463 ::grpc::internal::CallOpGenericRecvMessage, 464 ::grpc::internal::CallOpClientRecvStatus> 465 finish_ops_; 466 }; 467 468 /// Async client-side interface for bi-directional streaming, 469 /// where the client-to-server message stream has messages of type \a W, 470 /// and the server-to-client message stream has messages of type \a R. 471 template <class W, class R> 472 class ClientAsyncReaderWriterInterface 473 : public internal::ClientAsyncStreamingInterface, 474 public internal::AsyncWriterInterface<W>, 475 public internal::AsyncReaderInterface<R> { 476 public: 477 /// Signal the client is done with the writes (half-close the client stream). 478 /// Thread-safe with respect to \a AsyncReaderInterface::Read 479 /// 480 /// \param[in] tag The tag identifying the operation. 481 virtual void WritesDone(void* tag) = 0; 482 }; 483 484 namespace internal { 485 template <class W, class R> 486 class ClientAsyncReaderWriterFactory { 487 public: 488 /// Create a stream object. 489 /// Start the RPC request if \a start is set. 490 /// \a tag will be notified on \a cq when the call has been started (i.e. 491 /// intitial metadata sent). If \a start is not set, \a tag must be 492 /// nullptr and the actual call must be initiated by StartCall 493 /// Note that \a context will be used to fill in custom initial metadata 494 /// used to send to the server when starting the call. Create(::grpc::ChannelInterface * channel,::grpc_impl::CompletionQueue * cq,const::grpc::internal::RpcMethod & method,::grpc_impl::ClientContext * context,bool start,void * tag)495 static ClientAsyncReaderWriter<W, R>* Create( 496 ::grpc::ChannelInterface* channel, ::grpc_impl::CompletionQueue* cq, 497 const ::grpc::internal::RpcMethod& method, 498 ::grpc_impl::ClientContext* context, bool start, void* tag) { 499 ::grpc::internal::Call call = channel->CreateCall(method, context, cq); 500 501 return new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( 502 call.call(), sizeof(ClientAsyncReaderWriter<W, R>))) 503 ClientAsyncReaderWriter<W, R>(call, context, start, tag); 504 } 505 }; 506 } // namespace internal 507 508 /// Async client-side interface for bi-directional streaming, 509 /// where the outgoing message stream going to the server 510 /// has messages of type \a W, and the incoming message stream coming 511 /// from the server has messages of type \a R. 512 template <class W, class R> 513 class ClientAsyncReaderWriter final 514 : public ClientAsyncReaderWriterInterface<W, R> { 515 public: 516 // always allocated against a call arena, no memory free required delete(void *,std::size_t size)517 static void operator delete(void* /*ptr*/, std::size_t size) { 518 GPR_CODEGEN_ASSERT(size == sizeof(ClientAsyncReaderWriter)); 519 } 520 521 // This operator should never be called as the memory should be freed as part 522 // of the arena destruction. It only exists to provide a matching operator 523 // delete to the operator new so that some compilers will not complain (see 524 // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this 525 // there are no tests catching the compiler warning. delete(void *,void *)526 static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); } 527 StartCall(void * tag)528 void StartCall(void* tag) override { 529 GPR_CODEGEN_ASSERT(!started_); 530 started_ = true; 531 StartCallInternal(tag); 532 } 533 534 /// See the \a ClientAsyncStreamingInterface.ReadInitialMetadata method 535 /// for semantics of this method. 536 /// 537 /// Side effect: 538 /// - upon receiving initial metadata from the server, the \a ClientContext 539 /// is updated with it, and then the receiving initial metadata can 540 /// be accessed through this \a ClientContext. ReadInitialMetadata(void * tag)541 void ReadInitialMetadata(void* tag) override { 542 GPR_CODEGEN_ASSERT(started_); 543 GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); 544 545 meta_ops_.set_output_tag(tag); 546 meta_ops_.RecvInitialMetadata(context_); 547 call_.PerformOps(&meta_ops_); 548 } 549 Read(R * msg,void * tag)550 void Read(R* msg, void* tag) override { 551 GPR_CODEGEN_ASSERT(started_); 552 read_ops_.set_output_tag(tag); 553 if (!context_->initial_metadata_received_) { 554 read_ops_.RecvInitialMetadata(context_); 555 } 556 read_ops_.RecvMessage(msg); 557 call_.PerformOps(&read_ops_); 558 } 559 Write(const W & msg,void * tag)560 void Write(const W& msg, void* tag) override { 561 GPR_CODEGEN_ASSERT(started_); 562 write_ops_.set_output_tag(tag); 563 // TODO(ctiller): don't assert 564 GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok()); 565 call_.PerformOps(&write_ops_); 566 } 567 Write(const W & msg,::grpc::WriteOptions options,void * tag)568 void Write(const W& msg, ::grpc::WriteOptions options, void* tag) override { 569 GPR_CODEGEN_ASSERT(started_); 570 write_ops_.set_output_tag(tag); 571 if (options.is_last_message()) { 572 options.set_buffer_hint(); 573 write_ops_.ClientSendClose(); 574 } 575 // TODO(ctiller): don't assert 576 GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); 577 call_.PerformOps(&write_ops_); 578 } 579 WritesDone(void * tag)580 void WritesDone(void* tag) override { 581 GPR_CODEGEN_ASSERT(started_); 582 write_ops_.set_output_tag(tag); 583 write_ops_.ClientSendClose(); 584 call_.PerformOps(&write_ops_); 585 } 586 587 /// See the \a ClientAsyncStreamingInterface.Finish method for semantics. 588 /// Side effect 589 /// - the \a ClientContext associated with this call is updated with 590 /// possible initial and trailing metadata sent from the server. Finish(::grpc::Status * status,void * tag)591 void Finish(::grpc::Status* status, void* tag) override { 592 GPR_CODEGEN_ASSERT(started_); 593 finish_ops_.set_output_tag(tag); 594 if (!context_->initial_metadata_received_) { 595 finish_ops_.RecvInitialMetadata(context_); 596 } 597 finish_ops_.ClientRecvStatus(context_, status); 598 call_.PerformOps(&finish_ops_); 599 } 600 601 private: 602 friend class internal::ClientAsyncReaderWriterFactory<W, R>; ClientAsyncReaderWriter(::grpc::internal::Call call,::grpc_impl::ClientContext * context,bool start,void * tag)603 ClientAsyncReaderWriter(::grpc::internal::Call call, 604 ::grpc_impl::ClientContext* context, bool start, 605 void* tag) 606 : context_(context), call_(call), started_(start) { 607 if (start) { 608 StartCallInternal(tag); 609 } else { 610 GPR_CODEGEN_ASSERT(tag == nullptr); 611 } 612 } 613 StartCallInternal(void * tag)614 void StartCallInternal(void* tag) { 615 write_ops_.SendInitialMetadata(&context_->send_initial_metadata_, 616 context_->initial_metadata_flags()); 617 // if corked bit is set in context, we just keep the initial metadata 618 // buffered up to coalesce with later message send. No op is performed. 619 if (!context_->initial_metadata_corked_) { 620 write_ops_.set_output_tag(tag); 621 call_.PerformOps(&write_ops_); 622 } 623 } 624 625 ::grpc_impl::ClientContext* context_; 626 ::grpc::internal::Call call_; 627 bool started_; 628 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> 629 meta_ops_; 630 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, 631 ::grpc::internal::CallOpRecvMessage<R>> 632 read_ops_; 633 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 634 ::grpc::internal::CallOpSendMessage, 635 ::grpc::internal::CallOpClientSendClose> 636 write_ops_; 637 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, 638 ::grpc::internal::CallOpClientRecvStatus> 639 finish_ops_; 640 }; 641 642 template <class W, class R> 643 class ServerAsyncReaderInterface 644 : public ::grpc::internal::ServerAsyncStreamingInterface, 645 public internal::AsyncReaderInterface<R> { 646 public: 647 /// Indicate that the stream is to be finished with a certain status code 648 /// and also send out \a msg response to the client. 649 /// Request notification for when the server has sent the response and the 650 /// appropriate signals to the client to end the call. 651 /// Should not be used concurrently with other operations. 652 /// 653 /// It is appropriate to call this method when: 654 /// * all messages from the client have been received (either known 655 /// implictly, or explicitly because a previous 656 /// \a AsyncReaderInterface::Read operation with a non-ok result, 657 /// e.g., cq->Next(&read_tag, &ok) filled in 'ok' with 'false'). 658 /// 659 /// This operation will end when the server has finished sending out initial 660 /// metadata (if not sent already), response message, and status, or if 661 /// some failure occurred when trying to do so. 662 /// 663 /// gRPC doesn't take ownership or a reference to \a msg or \a status, so it 664 /// is safe to deallocate once Finish returns. 665 /// 666 /// \param[in] tag Tag identifying this request. 667 /// \param[in] status To be sent to the client as the result of this call. 668 /// \param[in] msg To be sent to the client as the response for this call. 669 virtual void Finish(const W& msg, const ::grpc::Status& status, 670 void* tag) = 0; 671 672 /// Indicate that the stream is to be finished with a certain 673 /// non-OK status code. 674 /// Request notification for when the server has sent the appropriate 675 /// signals to the client to end the call. 676 /// Should not be used concurrently with other operations. 677 /// 678 /// This call is meant to end the call with some error, and can be called at 679 /// any point that the server would like to "fail" the call (though note 680 /// this shouldn't be called concurrently with any other "sending" call, like 681 /// \a AsyncWriterInterface::Write). 682 /// 683 /// This operation will end when the server has finished sending out initial 684 /// metadata (if not sent already), and status, or if some failure occurred 685 /// when trying to do so. 686 /// 687 /// gRPC doesn't take ownership or a reference to \a status, so it is safe to 688 /// to deallocate once FinishWithError returns. 689 /// 690 /// \param[in] tag Tag identifying this request. 691 /// \param[in] status To be sent to the client as the result of this call. 692 /// - Note: \a status must have a non-OK code. 693 virtual void FinishWithError(const ::grpc::Status& status, void* tag) = 0; 694 }; 695 696 /// Async server-side API for doing client-streaming RPCs, 697 /// where the incoming message stream from the client has messages of type \a R, 698 /// and the single response message sent from the server is type \a W. 699 template <class W, class R> 700 class ServerAsyncReader final : public ServerAsyncReaderInterface<W, R> { 701 public: ServerAsyncReader(::grpc_impl::ServerContext * ctx)702 explicit ServerAsyncReader(::grpc_impl::ServerContext* ctx) 703 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} 704 705 /// See \a ServerAsyncStreamingInterface::SendInitialMetadata for semantics. 706 /// 707 /// Implicit input parameter: 708 /// - The initial metadata that will be sent to the client from this op will 709 /// be taken from the \a ServerContext associated with the call. SendInitialMetadata(void * tag)710 void SendInitialMetadata(void* tag) override { 711 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); 712 713 meta_ops_.set_output_tag(tag); 714 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 715 ctx_->initial_metadata_flags()); 716 if (ctx_->compression_level_set()) { 717 meta_ops_.set_compression_level(ctx_->compression_level()); 718 } 719 ctx_->sent_initial_metadata_ = true; 720 call_.PerformOps(&meta_ops_); 721 } 722 Read(R * msg,void * tag)723 void Read(R* msg, void* tag) override { 724 read_ops_.set_output_tag(tag); 725 read_ops_.RecvMessage(msg); 726 call_.PerformOps(&read_ops_); 727 } 728 729 /// See the \a ServerAsyncReaderInterface.Read method for semantics 730 /// 731 /// Side effect: 732 /// - also sends initial metadata if not alreay sent. 733 /// - uses the \a ServerContext associated with this call to send possible 734 /// initial and trailing metadata. 735 /// 736 /// Note: \a msg is not sent if \a status has a non-OK code. 737 /// 738 /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it 739 /// is safe to deallocate once Finish returns. Finish(const W & msg,const::grpc::Status & status,void * tag)740 void Finish(const W& msg, const ::grpc::Status& status, void* tag) override { 741 finish_ops_.set_output_tag(tag); 742 if (!ctx_->sent_initial_metadata_) { 743 finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 744 ctx_->initial_metadata_flags()); 745 if (ctx_->compression_level_set()) { 746 finish_ops_.set_compression_level(ctx_->compression_level()); 747 } 748 ctx_->sent_initial_metadata_ = true; 749 } 750 // The response is dropped if the status is not OK. 751 if (status.ok()) { 752 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, 753 finish_ops_.SendMessage(msg)); 754 } else { 755 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status); 756 } 757 call_.PerformOps(&finish_ops_); 758 } 759 760 /// See the \a ServerAsyncReaderInterface.Read method for semantics 761 /// 762 /// Side effect: 763 /// - also sends initial metadata if not alreay sent. 764 /// - uses the \a ServerContext associated with this call to send possible 765 /// initial and trailing metadata. 766 /// 767 /// gRPC doesn't take ownership or a reference to \a status, so it is safe to 768 /// to deallocate once FinishWithError returns. FinishWithError(const::grpc::Status & status,void * tag)769 void FinishWithError(const ::grpc::Status& status, void* tag) override { 770 GPR_CODEGEN_ASSERT(!status.ok()); 771 finish_ops_.set_output_tag(tag); 772 if (!ctx_->sent_initial_metadata_) { 773 finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 774 ctx_->initial_metadata_flags()); 775 if (ctx_->compression_level_set()) { 776 finish_ops_.set_compression_level(ctx_->compression_level()); 777 } 778 ctx_->sent_initial_metadata_ = true; 779 } 780 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status); 781 call_.PerformOps(&finish_ops_); 782 } 783 784 private: BindCall(::grpc::internal::Call * call)785 void BindCall(::grpc::internal::Call* call) override { call_ = *call; } 786 787 ::grpc::internal::Call call_; 788 ::grpc_impl::ServerContext* ctx_; 789 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> 790 meta_ops_; 791 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvMessage<R>> read_ops_; 792 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 793 ::grpc::internal::CallOpSendMessage, 794 ::grpc::internal::CallOpServerSendStatus> 795 finish_ops_; 796 }; 797 798 template <class W> 799 class ServerAsyncWriterInterface 800 : public ::grpc::internal::ServerAsyncStreamingInterface, 801 public internal::AsyncWriterInterface<W> { 802 public: 803 /// Indicate that the stream is to be finished with a certain status code. 804 /// Request notification for when the server has sent the appropriate 805 /// signals to the client to end the call. 806 /// Should not be used concurrently with other operations. 807 /// 808 /// It is appropriate to call this method when either: 809 /// * all messages from the client have been received (either known 810 /// implictly, or explicitly because a previous \a 811 /// AsyncReaderInterface::Read operation with a non-ok 812 /// result (e.g., cq->Next(&read_tag, &ok) filled in 'ok' with 'false'. 813 /// * it is desired to end the call early with some non-OK status code. 814 /// 815 /// This operation will end when the server has finished sending out initial 816 /// metadata (if not sent already), response message, and status, or if 817 /// some failure occurred when trying to do so. 818 /// 819 /// gRPC doesn't take ownership or a reference to \a status, so it is safe to 820 /// to deallocate once Finish returns. 821 /// 822 /// \param[in] tag Tag identifying this request. 823 /// \param[in] status To be sent to the client as the result of this call. 824 virtual void Finish(const ::grpc::Status& status, void* tag) = 0; 825 826 /// Request the writing of \a msg and coalesce it with trailing metadata which 827 /// contains \a status, using WriteOptions options with 828 /// identifying tag \a tag. 829 /// 830 /// WriteAndFinish is equivalent of performing WriteLast and Finish 831 /// in a single step. 832 /// 833 /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it 834 /// is safe to deallocate once WriteAndFinish returns. 835 /// 836 /// \param[in] msg The message to be written. 837 /// \param[in] options The WriteOptions to be used to write this message. 838 /// \param[in] status The Status that server returns to client. 839 /// \param[in] tag The tag identifying the operation. 840 virtual void WriteAndFinish(const W& msg, ::grpc::WriteOptions options, 841 const ::grpc::Status& status, void* tag) = 0; 842 }; 843 844 /// Async server-side API for doing server streaming RPCs, 845 /// where the outgoing message stream from the server has messages of type \a W. 846 template <class W> 847 class ServerAsyncWriter final : public ServerAsyncWriterInterface<W> { 848 public: ServerAsyncWriter(::grpc_impl::ServerContext * ctx)849 explicit ServerAsyncWriter(::grpc_impl::ServerContext* ctx) 850 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} 851 852 /// See \a ServerAsyncStreamingInterface::SendInitialMetadata for semantics. 853 /// 854 /// Implicit input parameter: 855 /// - The initial metadata that will be sent to the client from this op will 856 /// be taken from the \a ServerContext associated with the call. 857 /// 858 /// \param[in] tag Tag identifying this request. SendInitialMetadata(void * tag)859 void SendInitialMetadata(void* tag) override { 860 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); 861 862 meta_ops_.set_output_tag(tag); 863 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 864 ctx_->initial_metadata_flags()); 865 if (ctx_->compression_level_set()) { 866 meta_ops_.set_compression_level(ctx_->compression_level()); 867 } 868 ctx_->sent_initial_metadata_ = true; 869 call_.PerformOps(&meta_ops_); 870 } 871 Write(const W & msg,void * tag)872 void Write(const W& msg, void* tag) override { 873 write_ops_.set_output_tag(tag); 874 EnsureInitialMetadataSent(&write_ops_); 875 // TODO(ctiller): don't assert 876 GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok()); 877 call_.PerformOps(&write_ops_); 878 } 879 Write(const W & msg,::grpc::WriteOptions options,void * tag)880 void Write(const W& msg, ::grpc::WriteOptions options, void* tag) override { 881 write_ops_.set_output_tag(tag); 882 if (options.is_last_message()) { 883 options.set_buffer_hint(); 884 } 885 886 EnsureInitialMetadataSent(&write_ops_); 887 // TODO(ctiller): don't assert 888 GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); 889 call_.PerformOps(&write_ops_); 890 } 891 892 /// See the \a ServerAsyncWriterInterface.WriteAndFinish method for semantics. 893 /// 894 /// Implicit input parameter: 895 /// - the \a ServerContext associated with this call is used 896 /// for sending trailing (and initial) metadata to the client. 897 /// 898 /// Note: \a status must have an OK code. 899 /// 900 /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it 901 /// is safe to deallocate once WriteAndFinish returns. WriteAndFinish(const W & msg,::grpc::WriteOptions options,const::grpc::Status & status,void * tag)902 void WriteAndFinish(const W& msg, ::grpc::WriteOptions options, 903 const ::grpc::Status& status, void* tag) override { 904 write_ops_.set_output_tag(tag); 905 EnsureInitialMetadataSent(&write_ops_); 906 options.set_buffer_hint(); 907 GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); 908 write_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status); 909 call_.PerformOps(&write_ops_); 910 } 911 912 /// See the \a ServerAsyncWriterInterface.Finish method for semantics. 913 /// 914 /// Implicit input parameter: 915 /// - the \a ServerContext associated with this call is used for sending 916 /// trailing (and initial if not already sent) metadata to the client. 917 /// 918 /// Note: there are no restrictions are the code of 919 /// \a status,it may be non-OK 920 /// 921 /// gRPC doesn't take ownership or a reference to \a status, so it is safe to 922 /// to deallocate once Finish returns. Finish(const::grpc::Status & status,void * tag)923 void Finish(const ::grpc::Status& status, void* tag) override { 924 finish_ops_.set_output_tag(tag); 925 EnsureInitialMetadataSent(&finish_ops_); 926 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status); 927 call_.PerformOps(&finish_ops_); 928 } 929 930 private: BindCall(::grpc::internal::Call * call)931 void BindCall(::grpc::internal::Call* call) override { call_ = *call; } 932 933 template <class T> EnsureInitialMetadataSent(T * ops)934 void EnsureInitialMetadataSent(T* ops) { 935 if (!ctx_->sent_initial_metadata_) { 936 ops->SendInitialMetadata(&ctx_->initial_metadata_, 937 ctx_->initial_metadata_flags()); 938 if (ctx_->compression_level_set()) { 939 ops->set_compression_level(ctx_->compression_level()); 940 } 941 ctx_->sent_initial_metadata_ = true; 942 } 943 } 944 945 ::grpc::internal::Call call_; 946 ::grpc_impl::ServerContext* ctx_; 947 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> 948 meta_ops_; 949 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 950 ::grpc::internal::CallOpSendMessage, 951 ::grpc::internal::CallOpServerSendStatus> 952 write_ops_; 953 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 954 ::grpc::internal::CallOpServerSendStatus> 955 finish_ops_; 956 }; 957 958 /// Server-side interface for asynchronous bi-directional streaming. 959 template <class W, class R> 960 class ServerAsyncReaderWriterInterface 961 : public ::grpc::internal::ServerAsyncStreamingInterface, 962 public internal::AsyncWriterInterface<W>, 963 public internal::AsyncReaderInterface<R> { 964 public: 965 /// Indicate that the stream is to be finished with a certain status code. 966 /// Request notification for when the server has sent the appropriate 967 /// signals to the client to end the call. 968 /// Should not be used concurrently with other operations. 969 /// 970 /// It is appropriate to call this method when either: 971 /// * all messages from the client have been received (either known 972 /// implictly, or explicitly because a previous \a 973 /// AsyncReaderInterface::Read operation 974 /// with a non-ok result (e.g., cq->Next(&read_tag, &ok) filled in 'ok' 975 /// with 'false'. 976 /// * it is desired to end the call early with some non-OK status code. 977 /// 978 /// This operation will end when the server has finished sending out initial 979 /// metadata (if not sent already), response message, and status, or if some 980 /// failure occurred when trying to do so. 981 /// 982 /// gRPC doesn't take ownership or a reference to \a status, so it is safe to 983 /// to deallocate once Finish returns. 984 /// 985 /// \param[in] tag Tag identifying this request. 986 /// \param[in] status To be sent to the client as the result of this call. 987 virtual void Finish(const ::grpc::Status& status, void* tag) = 0; 988 989 /// Request the writing of \a msg and coalesce it with trailing metadata which 990 /// contains \a status, using WriteOptions options with 991 /// identifying tag \a tag. 992 /// 993 /// WriteAndFinish is equivalent of performing WriteLast and Finish in a 994 /// single step. 995 /// 996 /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it 997 /// is safe to deallocate once WriteAndFinish returns. 998 /// 999 /// \param[in] msg The message to be written. 1000 /// \param[in] options The WriteOptions to be used to write this message. 1001 /// \param[in] status The Status that server returns to client. 1002 /// \param[in] tag The tag identifying the operation. 1003 virtual void WriteAndFinish(const W& msg, ::grpc::WriteOptions options, 1004 const ::grpc::Status& status, void* tag) = 0; 1005 }; 1006 1007 /// Async server-side API for doing bidirectional streaming RPCs, 1008 /// where the incoming message stream coming from the client has messages of 1009 /// type \a R, and the outgoing message stream coming from the server has 1010 /// messages of type \a W. 1011 template <class W, class R> 1012 class ServerAsyncReaderWriter final 1013 : public ServerAsyncReaderWriterInterface<W, R> { 1014 public: ServerAsyncReaderWriter(::grpc_impl::ServerContext * ctx)1015 explicit ServerAsyncReaderWriter(::grpc_impl::ServerContext* ctx) 1016 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} 1017 1018 /// See \a ServerAsyncStreamingInterface::SendInitialMetadata for semantics. 1019 /// 1020 /// Implicit input parameter: 1021 /// - The initial metadata that will be sent to the client from this op will 1022 /// be taken from the \a ServerContext associated with the call. 1023 /// 1024 /// \param[in] tag Tag identifying this request. SendInitialMetadata(void * tag)1025 void SendInitialMetadata(void* tag) override { 1026 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); 1027 1028 meta_ops_.set_output_tag(tag); 1029 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 1030 ctx_->initial_metadata_flags()); 1031 if (ctx_->compression_level_set()) { 1032 meta_ops_.set_compression_level(ctx_->compression_level()); 1033 } 1034 ctx_->sent_initial_metadata_ = true; 1035 call_.PerformOps(&meta_ops_); 1036 } 1037 Read(R * msg,void * tag)1038 void Read(R* msg, void* tag) override { 1039 read_ops_.set_output_tag(tag); 1040 read_ops_.RecvMessage(msg); 1041 call_.PerformOps(&read_ops_); 1042 } 1043 Write(const W & msg,void * tag)1044 void Write(const W& msg, void* tag) override { 1045 write_ops_.set_output_tag(tag); 1046 EnsureInitialMetadataSent(&write_ops_); 1047 // TODO(ctiller): don't assert 1048 GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok()); 1049 call_.PerformOps(&write_ops_); 1050 } 1051 Write(const W & msg,::grpc::WriteOptions options,void * tag)1052 void Write(const W& msg, ::grpc::WriteOptions options, void* tag) override { 1053 write_ops_.set_output_tag(tag); 1054 if (options.is_last_message()) { 1055 options.set_buffer_hint(); 1056 } 1057 EnsureInitialMetadataSent(&write_ops_); 1058 GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); 1059 call_.PerformOps(&write_ops_); 1060 } 1061 1062 /// See the \a ServerAsyncReaderWriterInterface.WriteAndFinish 1063 /// method for semantics. 1064 /// 1065 /// Implicit input parameter: 1066 /// - the \a ServerContext associated with this call is used 1067 /// for sending trailing (and initial) metadata to the client. 1068 /// 1069 /// Note: \a status must have an OK code. 1070 // 1071 /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it 1072 /// is safe to deallocate once WriteAndFinish returns. WriteAndFinish(const W & msg,::grpc::WriteOptions options,const::grpc::Status & status,void * tag)1073 void WriteAndFinish(const W& msg, ::grpc::WriteOptions options, 1074 const ::grpc::Status& status, void* tag) override { 1075 write_ops_.set_output_tag(tag); 1076 EnsureInitialMetadataSent(&write_ops_); 1077 options.set_buffer_hint(); 1078 GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); 1079 write_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status); 1080 call_.PerformOps(&write_ops_); 1081 } 1082 1083 /// See the \a ServerAsyncReaderWriterInterface.Finish method for semantics. 1084 /// 1085 /// Implicit input parameter: 1086 /// - the \a ServerContext associated with this call is used for sending 1087 /// trailing (and initial if not already sent) metadata to the client. 1088 /// 1089 /// Note: there are no restrictions are the code of \a status, 1090 /// it may be non-OK 1091 // 1092 /// gRPC doesn't take ownership or a reference to \a status, so it is safe to 1093 /// to deallocate once Finish returns. Finish(const::grpc::Status & status,void * tag)1094 void Finish(const ::grpc::Status& status, void* tag) override { 1095 finish_ops_.set_output_tag(tag); 1096 EnsureInitialMetadataSent(&finish_ops_); 1097 1098 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status); 1099 call_.PerformOps(&finish_ops_); 1100 } 1101 1102 private: 1103 friend class ::grpc_impl::Server; 1104 BindCall(::grpc::internal::Call * call)1105 void BindCall(::grpc::internal::Call* call) override { call_ = *call; } 1106 1107 template <class T> EnsureInitialMetadataSent(T * ops)1108 void EnsureInitialMetadataSent(T* ops) { 1109 if (!ctx_->sent_initial_metadata_) { 1110 ops->SendInitialMetadata(&ctx_->initial_metadata_, 1111 ctx_->initial_metadata_flags()); 1112 if (ctx_->compression_level_set()) { 1113 ops->set_compression_level(ctx_->compression_level()); 1114 } 1115 ctx_->sent_initial_metadata_ = true; 1116 } 1117 } 1118 1119 ::grpc::internal::Call call_; 1120 ::grpc_impl::ServerContext* ctx_; 1121 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> 1122 meta_ops_; 1123 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvMessage<R>> read_ops_; 1124 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 1125 ::grpc::internal::CallOpSendMessage, 1126 ::grpc::internal::CallOpServerSendStatus> 1127 write_ops_; 1128 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 1129 ::grpc::internal::CallOpServerSendStatus> 1130 finish_ops_; 1131 }; 1132 1133 } // namespace grpc_impl 1134 #endif // GRPCPP_IMPL_CODEGEN_ASYNC_STREAM_IMPL_H 1135