1 // 2 // 3 // Copyright 2015 gRPC authors. 4 // 5 // Licensed under the Apache License, Version 2.0 (the "License"); 6 // you may not use this file except in compliance with the License. 7 // You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, software 12 // distributed under the License is distributed on an "AS IS" BASIS, 13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 // See the License for the specific language governing permissions and 15 // limitations under the License. 16 // 17 // 18 19 #ifndef GRPCPP_SUPPORT_ASYNC_STREAM_H 20 #define GRPCPP_SUPPORT_ASYNC_STREAM_H 21 22 #include <grpc/grpc.h> 23 #include <grpcpp/impl/call.h> 24 #include <grpcpp/impl/channel_interface.h> 25 #include <grpcpp/impl/service_type.h> 26 #include <grpcpp/server_context.h> 27 #include <grpcpp/support/status.h> 28 29 #include "absl/log/absl_check.h" 30 31 namespace grpc { 32 33 namespace internal { 34 /// Common interface for all client side asynchronous streaming. 35 class ClientAsyncStreamingInterface { 36 public: ~ClientAsyncStreamingInterface()37 virtual ~ClientAsyncStreamingInterface() {} 38 39 /// Start the call that was set up by the constructor, but only if the 40 /// constructor was invoked through the "Prepare" API which doesn't actually 41 /// start the call. 42 /// 43 /// It is illegal to start a write-type operation (eg. Write(), WriteLast(), 44 /// WritesDone()) while the `StartCall()` operation has not finished 45 /// (determined by the returning of \a tag). 46 virtual void StartCall(void* tag) = 0; 47 48 /// Request notification of the reading of the initial metadata. Completion 49 /// will be notified by \a tag on the associated completion queue. 50 /// This call is optional, but if it is used, it cannot be used concurrently 51 /// with or after the \a AsyncReaderInterface::Read method. 52 /// 53 /// \param[in] tag Tag identifying this request. 54 virtual void ReadInitialMetadata(void* tag) = 0; 55 56 /// Indicate that the stream is to be finished and request notification for 57 /// when the call has been ended. 58 /// Should not be used concurrently with other operations. 59 /// 60 /// It is appropriate to call this method exactly once when both: 61 /// * the client side has no more message to send 62 /// (this can be declared implicitly by calling this method, or 63 /// explicitly through an earlier call to the <i>WritesDone</i> method 64 /// of the class in use, e.g. \a ClientAsyncWriterInterface::WritesDone or 65 /// \a ClientAsyncReaderWriterInterface::WritesDone). 66 /// * there are no more messages to be received from the server (this can 67 /// be known implicitly by the calling code, or explicitly from an 68 /// earlier call to \a AsyncReaderInterface::Read that yielded a failed 69 /// result, e.g. cq->Next(&read_tag, &ok) filled in 'ok' with 'false'). 70 /// 71 /// The tag will be returned when either: 72 /// - all incoming messages have been read and the server has returned 73 /// a status. 74 /// - the server has returned a non-OK status. 75 /// - the call failed for some reason and the library generated a 76 /// status. 77 /// 78 /// Note that implementations of this method attempt to receive initial 79 /// metadata from the server if initial metadata hasn't yet been received. 80 /// 81 /// \param[in] tag Tag identifying this request. 82 /// \param[out] status To be updated with the operation status. 83 virtual void Finish(grpc::Status* status, void* tag) = 0; 84 }; 85 86 /// An interface that yields a sequence of messages of type \a R. 87 template <class R> 88 class AsyncReaderInterface { 89 public: ~AsyncReaderInterface()90 virtual ~AsyncReaderInterface() {} 91 92 /// Read a message of type \a R into \a msg. Completion will be notified by \a 93 /// tag on the associated completion queue. 94 /// This is thread-safe with respect to \a Write or \a WritesDone methods. It 95 /// should not be called concurrently with other streaming APIs 96 /// on the same stream. It is not meaningful to call it concurrently 97 /// with another \a AsyncReaderInterface::Read on the same stream since reads 98 /// on the same stream are delivered in order. 99 /// 100 /// \param[out] msg Where to eventually store the read message. 101 /// \param[in] tag The tag identifying the operation. 102 /// 103 /// Side effect: note that this method attempt to receive initial metadata for 104 /// a stream if it hasn't yet been received. 105 virtual void Read(R* msg, void* tag) = 0; 106 }; 107 108 /// An interface that can be fed a sequence of messages of type \a W. 109 template <class W> 110 class AsyncWriterInterface { 111 public: ~AsyncWriterInterface()112 virtual ~AsyncWriterInterface() {} 113 114 /// Request the writing of \a msg with identifying tag \a tag. 115 /// 116 /// Only one write may be outstanding at any given time. This means that 117 /// after calling Write, one must wait to receive \a tag from the completion 118 /// queue BEFORE calling Write again. 119 /// This is thread-safe with respect to \a AsyncReaderInterface::Read 120 /// 121 /// gRPC doesn't take ownership or a reference to \a msg, so it is safe to 122 /// to deallocate once Write returns. 123 /// 124 /// \param[in] msg The message to be written. 125 /// \param[in] tag The tag identifying the operation. 126 virtual void Write(const W& msg, void* tag) = 0; 127 128 /// Request the writing of \a msg using WriteOptions \a options with 129 /// identifying tag \a tag. 130 /// 131 /// Only one write may be outstanding at any given time. This means that 132 /// after calling Write, one must wait to receive \a tag from the completion 133 /// queue BEFORE calling Write again. 134 /// WriteOptions \a options is used to set the write options of this message. 135 /// This is thread-safe with respect to \a AsyncReaderInterface::Read 136 /// 137 /// gRPC doesn't take ownership or a reference to \a msg, so it is safe to 138 /// to deallocate once Write returns. 139 /// 140 /// \param[in] msg The message to be written. 141 /// \param[in] options The WriteOptions to be used to write this message. 142 /// \param[in] tag The tag identifying the operation. 143 virtual void Write(const W& msg, grpc::WriteOptions options, void* tag) = 0; 144 145 /// Request the writing of \a msg and coalesce it with the writing 146 /// of trailing metadata, using WriteOptions \a options with 147 /// identifying tag \a tag. 148 /// 149 /// For client, WriteLast is equivalent of performing Write and 150 /// WritesDone in a single step. 151 /// For server, WriteLast buffers the \a msg. The writing of \a msg is held 152 /// until Finish is called, where \a msg and trailing metadata are coalesced 153 /// and write is initiated. Note that WriteLast can only buffer \a msg up to 154 /// the flow control window size. If \a msg size is larger than the window 155 /// size, it will be sent on wire without buffering. 156 /// 157 /// gRPC doesn't take ownership or a reference to \a msg, so it is safe to 158 /// to deallocate once Write returns. 159 /// 160 /// \param[in] msg The message to be written. 161 /// \param[in] options The WriteOptions to be used to write this message. 162 /// \param[in] tag The tag identifying the operation. WriteLast(const W & msg,grpc::WriteOptions options,void * tag)163 void WriteLast(const W& msg, grpc::WriteOptions options, void* tag) { 164 Write(msg, options.set_last_message(), tag); 165 } 166 }; 167 168 } // namespace internal 169 170 template <class R> 171 class ClientAsyncReaderInterface 172 : public internal::ClientAsyncStreamingInterface, 173 public internal::AsyncReaderInterface<R> {}; 174 175 namespace internal { 176 template <class R> 177 class ClientAsyncReaderFactory { 178 public: 179 /// Create a stream object. 180 /// Write the first request out if \a start is set. 181 /// \a tag will be notified on \a cq when the call has been started and 182 /// \a request has been written out. If \a start is not set, \a tag must be 183 /// nullptr and the actual call must be initiated by StartCall 184 /// Note that \a context will be used to fill in custom initial metadata 185 /// used to send to the server when starting the call. 186 template <class W> Create(grpc::ChannelInterface * channel,grpc::CompletionQueue * cq,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,const W & request,bool start,void * tag)187 static ClientAsyncReader<R>* Create(grpc::ChannelInterface* channel, 188 grpc::CompletionQueue* cq, 189 const grpc::internal::RpcMethod& method, 190 grpc::ClientContext* context, 191 const W& request, bool start, void* tag) { 192 grpc::internal::Call call = channel->CreateCall(method, context, cq); 193 return new ( 194 grpc_call_arena_alloc(call.call(), sizeof(ClientAsyncReader<R>))) 195 ClientAsyncReader<R>(call, context, request, start, tag); 196 } 197 }; 198 } // namespace internal 199 200 /// Async client-side API for doing server-streaming RPCs, 201 /// where the incoming message stream coming from the server has 202 /// messages of type \a R. 203 template <class R> 204 class ClientAsyncReader final : public ClientAsyncReaderInterface<R> { 205 public: 206 // always allocated against a call arena, no memory free required delete(void *,std::size_t size)207 static void operator delete(void* /*ptr*/, std::size_t size) { 208 ABSL_CHECK_EQ(size, sizeof(ClientAsyncReader)); 209 } 210 211 // This operator should never be called as the memory should be freed as part 212 // of the arena destruction. It only exists to provide a matching operator 213 // delete to the operator new so that some compilers will not complain (see 214 // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this 215 // there are no tests catching the compiler warning. delete(void *,void *)216 static void operator delete(void*, void*) { ABSL_CHECK(false); } 217 StartCall(void * tag)218 void StartCall(void* tag) override { 219 ABSL_CHECK(!started_); 220 started_ = true; 221 StartCallInternal(tag); 222 } 223 224 /// See the \a ClientAsyncStreamingInterface.ReadInitialMetadata 225 /// method for semantics. 226 /// 227 /// Side effect: 228 /// - upon receiving initial metadata from the server, 229 /// the \a ClientContext associated with this call is updated, and the 230 /// calling code can access the received metadata through the 231 /// \a ClientContext. ReadInitialMetadata(void * tag)232 void ReadInitialMetadata(void* tag) override { 233 ABSL_CHECK(started_); 234 ABSL_CHECK(!context_->initial_metadata_received_); 235 236 meta_ops_.set_output_tag(tag); 237 meta_ops_.RecvInitialMetadata(context_); 238 call_.PerformOps(&meta_ops_); 239 } 240 Read(R * msg,void * tag)241 void Read(R* msg, void* tag) override { 242 ABSL_CHECK(started_); 243 read_ops_.set_output_tag(tag); 244 if (!context_->initial_metadata_received_) { 245 read_ops_.RecvInitialMetadata(context_); 246 } 247 read_ops_.RecvMessage(msg); 248 call_.PerformOps(&read_ops_); 249 } 250 251 /// See the \a ClientAsyncStreamingInterface.Finish method for semantics. 252 /// 253 /// Side effect: 254 /// - the \a ClientContext associated with this call is updated with 255 /// possible initial and trailing metadata received from the server. Finish(grpc::Status * status,void * tag)256 void Finish(grpc::Status* status, void* tag) override { 257 ABSL_CHECK(started_); 258 finish_ops_.set_output_tag(tag); 259 if (!context_->initial_metadata_received_) { 260 finish_ops_.RecvInitialMetadata(context_); 261 } 262 finish_ops_.ClientRecvStatus(context_, status); 263 call_.PerformOps(&finish_ops_); 264 } 265 266 private: 267 friend class internal::ClientAsyncReaderFactory<R>; 268 template <class W> ClientAsyncReader(grpc::internal::Call call,grpc::ClientContext * context,const W & request,bool start,void * tag)269 ClientAsyncReader(grpc::internal::Call call, grpc::ClientContext* context, 270 const W& request, bool start, void* tag) 271 : context_(context), call_(call), started_(start) { 272 // TODO(ctiller): don't assert 273 ABSL_CHECK(init_ops_.SendMessage(request).ok()); 274 init_ops_.ClientSendClose(); 275 if (start) { 276 StartCallInternal(tag); 277 } else { 278 ABSL_CHECK(tag == nullptr); 279 } 280 } 281 StartCallInternal(void * tag)282 void StartCallInternal(void* tag) { 283 init_ops_.SendInitialMetadata(&context_->send_initial_metadata_, 284 context_->initial_metadata_flags()); 285 init_ops_.set_output_tag(tag); 286 call_.PerformOps(&init_ops_); 287 } 288 289 grpc::ClientContext* context_; 290 grpc::internal::Call call_; 291 bool started_; 292 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, 293 grpc::internal::CallOpSendMessage, 294 grpc::internal::CallOpClientSendClose> 295 init_ops_; 296 grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata> 297 meta_ops_; 298 grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata, 299 grpc::internal::CallOpRecvMessage<R>> 300 read_ops_; 301 grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata, 302 grpc::internal::CallOpClientRecvStatus> 303 finish_ops_; 304 }; 305 306 /// Common interface for client side asynchronous writing. 307 template <class W> 308 class ClientAsyncWriterInterface 309 : public internal::ClientAsyncStreamingInterface, 310 public internal::AsyncWriterInterface<W> { 311 public: 312 /// Signal the client is done with the writes (half-close the client stream). 313 /// Thread-safe with respect to \a AsyncReaderInterface::Read 314 /// 315 /// \param[in] tag The tag identifying the operation. 316 virtual void WritesDone(void* tag) = 0; 317 }; 318 319 namespace internal { 320 template <class W> 321 class ClientAsyncWriterFactory { 322 public: 323 /// Create a stream object. 324 /// Start the RPC if \a start is set 325 /// \a tag will be notified on \a cq when the call has been started (i.e. 326 /// initial metadata sent) and \a request has been written out. 327 /// If \a start is not set, \a tag must be nullptr and the actual call 328 /// must be initiated by StartCall 329 /// Note that \a context will be used to fill in custom initial metadata 330 /// used to send to the server when starting the call. 331 /// \a response will be filled in with the single expected response 332 /// message from the server upon a successful call to the \a Finish 333 /// method of this instance. 334 template <class R> Create(grpc::ChannelInterface * channel,grpc::CompletionQueue * cq,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,R * response,bool start,void * tag)335 static ClientAsyncWriter<W>* Create(grpc::ChannelInterface* channel, 336 grpc::CompletionQueue* cq, 337 const grpc::internal::RpcMethod& method, 338 grpc::ClientContext* context, R* response, 339 bool start, void* tag) { 340 grpc::internal::Call call = channel->CreateCall(method, context, cq); 341 return new ( 342 grpc_call_arena_alloc(call.call(), sizeof(ClientAsyncWriter<W>))) 343 ClientAsyncWriter<W>(call, context, response, start, tag); 344 } 345 }; 346 } // namespace internal 347 348 /// Async API on the client side for doing client-streaming RPCs, 349 /// where the outgoing message stream going to the server contains 350 /// messages of type \a W. 351 template <class W> 352 class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> { 353 public: 354 // always allocated against a call arena, no memory free required delete(void *,std::size_t size)355 static void operator delete(void* /*ptr*/, std::size_t size) { 356 ABSL_CHECK_EQ(size, sizeof(ClientAsyncWriter)); 357 } 358 359 // This operator should never be called as the memory should be freed as part 360 // of the arena destruction. It only exists to provide a matching operator 361 // delete to the operator new so that some compilers will not complain (see 362 // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this 363 // there are no tests catching the compiler warning. delete(void *,void *)364 static void operator delete(void*, void*) { ABSL_CHECK(false); } 365 StartCall(void * tag)366 void StartCall(void* tag) override { 367 ABSL_CHECK(!started_); 368 started_ = true; 369 StartCallInternal(tag); 370 } 371 372 /// See the \a ClientAsyncStreamingInterface.ReadInitialMetadata method for 373 /// semantics. 374 /// 375 /// Side effect: 376 /// - upon receiving initial metadata from the server, the \a ClientContext 377 /// associated with this call is updated, and the calling code can access 378 /// the received metadata through the \a ClientContext. ReadInitialMetadata(void * tag)379 void ReadInitialMetadata(void* tag) override { 380 ABSL_CHECK(started_); 381 ABSL_CHECK(!context_->initial_metadata_received_); 382 383 meta_ops_.set_output_tag(tag); 384 meta_ops_.RecvInitialMetadata(context_); 385 call_.PerformOps(&meta_ops_); 386 } 387 Write(const W & msg,void * tag)388 void Write(const W& msg, void* tag) override { 389 ABSL_CHECK(started_); 390 write_ops_.set_output_tag(tag); 391 // TODO(ctiller): don't assert 392 ABSL_CHECK(write_ops_.SendMessage(msg).ok()); 393 call_.PerformOps(&write_ops_); 394 } 395 Write(const W & msg,grpc::WriteOptions options,void * tag)396 void Write(const W& msg, grpc::WriteOptions options, void* tag) override { 397 ABSL_CHECK(started_); 398 write_ops_.set_output_tag(tag); 399 if (options.is_last_message()) { 400 options.set_buffer_hint(); 401 write_ops_.ClientSendClose(); 402 } 403 // TODO(ctiller): don't assert 404 ABSL_CHECK(write_ops_.SendMessage(msg, options).ok()); 405 call_.PerformOps(&write_ops_); 406 } 407 WritesDone(void * tag)408 void WritesDone(void* tag) override { 409 ABSL_CHECK(started_); 410 write_ops_.set_output_tag(tag); 411 write_ops_.ClientSendClose(); 412 call_.PerformOps(&write_ops_); 413 } 414 415 /// See the \a ClientAsyncStreamingInterface.Finish method for semantics. 416 /// 417 /// Side effect: 418 /// - the \a ClientContext associated with this call is updated with 419 /// possible initial and trailing metadata received from the server. 420 /// - attempts to fill in the \a response parameter passed to this class's 421 /// constructor with the server's response message. Finish(grpc::Status * status,void * tag)422 void Finish(grpc::Status* status, void* tag) override { 423 ABSL_CHECK(started_); 424 finish_ops_.set_output_tag(tag); 425 if (!context_->initial_metadata_received_) { 426 finish_ops_.RecvInitialMetadata(context_); 427 } 428 finish_ops_.ClientRecvStatus(context_, status); 429 call_.PerformOps(&finish_ops_); 430 } 431 432 private: 433 friend class internal::ClientAsyncWriterFactory<W>; 434 template <class R> ClientAsyncWriter(grpc::internal::Call call,grpc::ClientContext * context,R * response,bool start,void * tag)435 ClientAsyncWriter(grpc::internal::Call call, grpc::ClientContext* context, 436 R* response, bool start, void* tag) 437 : context_(context), call_(call), started_(start) { 438 finish_ops_.RecvMessage(response); 439 finish_ops_.AllowNoMessage(); 440 if (start) { 441 StartCallInternal(tag); 442 } else { 443 ABSL_CHECK(tag == nullptr); 444 } 445 } 446 StartCallInternal(void * tag)447 void StartCallInternal(void* tag) { 448 write_ops_.SendInitialMetadata(&context_->send_initial_metadata_, 449 context_->initial_metadata_flags()); 450 // if corked bit is set in context, we just keep the initial metadata 451 // buffered up to coalesce with later message send. No op is performed. 452 if (!context_->initial_metadata_corked_) { 453 write_ops_.set_output_tag(tag); 454 call_.PerformOps(&write_ops_); 455 } 456 } 457 458 grpc::ClientContext* context_; 459 grpc::internal::Call call_; 460 bool started_; 461 grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata> 462 meta_ops_; 463 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, 464 grpc::internal::CallOpSendMessage, 465 grpc::internal::CallOpClientSendClose> 466 write_ops_; 467 grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata, 468 grpc::internal::CallOpGenericRecvMessage, 469 grpc::internal::CallOpClientRecvStatus> 470 finish_ops_; 471 }; 472 473 /// Async client-side interface for bi-directional streaming, 474 /// where the client-to-server message stream has messages of type \a W, 475 /// and the server-to-client message stream has messages of type \a R. 476 template <class W, class R> 477 class ClientAsyncReaderWriterInterface 478 : public internal::ClientAsyncStreamingInterface, 479 public internal::AsyncWriterInterface<W>, 480 public internal::AsyncReaderInterface<R> { 481 public: 482 /// Signal the client is done with the writes (half-close the client stream). 483 /// Thread-safe with respect to \a AsyncReaderInterface::Read 484 /// 485 /// \param[in] tag The tag identifying the operation. 486 virtual void WritesDone(void* tag) = 0; 487 }; 488 489 namespace internal { 490 template <class W, class R> 491 class ClientAsyncReaderWriterFactory { 492 public: 493 /// Create a stream object. 494 /// Start the RPC request if \a start is set. 495 /// \a tag will be notified on \a cq when the call has been started (i.e. 496 /// initial metadata sent). If \a start is not set, \a tag must be 497 /// nullptr and the actual call must be initiated by StartCall 498 /// Note that \a context will be used to fill in custom initial metadata 499 /// used to send to the server when starting the call. Create(grpc::ChannelInterface * channel,grpc::CompletionQueue * cq,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,bool start,void * tag)500 static ClientAsyncReaderWriter<W, R>* Create( 501 grpc::ChannelInterface* channel, grpc::CompletionQueue* cq, 502 const grpc::internal::RpcMethod& method, grpc::ClientContext* context, 503 bool start, void* tag) { 504 grpc::internal::Call call = channel->CreateCall(method, context, cq); 505 506 return new (grpc_call_arena_alloc(call.call(), 507 sizeof(ClientAsyncReaderWriter<W, R>))) 508 ClientAsyncReaderWriter<W, R>(call, context, start, tag); 509 } 510 }; 511 } // namespace internal 512 513 /// Async client-side interface for bi-directional streaming, 514 /// where the outgoing message stream going to the server 515 /// has messages of type \a W, and the incoming message stream coming 516 /// from the server has messages of type \a R. 517 template <class W, class R> 518 class ClientAsyncReaderWriter final 519 : public ClientAsyncReaderWriterInterface<W, R> { 520 public: 521 // always allocated against a call arena, no memory free required delete(void *,std::size_t size)522 static void operator delete(void* /*ptr*/, std::size_t size) { 523 ABSL_CHECK_EQ(size, sizeof(ClientAsyncReaderWriter)); 524 } 525 526 // This operator should never be called as the memory should be freed as part 527 // of the arena destruction. It only exists to provide a matching operator 528 // delete to the operator new so that some compilers will not complain (see 529 // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this 530 // there are no tests catching the compiler warning. delete(void *,void *)531 static void operator delete(void*, void*) { ABSL_CHECK(false); } 532 StartCall(void * tag)533 void StartCall(void* tag) override { 534 ABSL_CHECK(!started_); 535 started_ = true; 536 StartCallInternal(tag); 537 } 538 539 /// See the \a ClientAsyncStreamingInterface.ReadInitialMetadata method 540 /// for semantics of this method. 541 /// 542 /// Side effect: 543 /// - upon receiving initial metadata from the server, the \a ClientContext 544 /// is updated with it, and then the receiving initial metadata can 545 /// be accessed through this \a ClientContext. ReadInitialMetadata(void * tag)546 void ReadInitialMetadata(void* tag) override { 547 ABSL_CHECK(started_); 548 ABSL_CHECK(!context_->initial_metadata_received_); 549 550 meta_ops_.set_output_tag(tag); 551 meta_ops_.RecvInitialMetadata(context_); 552 call_.PerformOps(&meta_ops_); 553 } 554 Read(R * msg,void * tag)555 void Read(R* msg, void* tag) override { 556 ABSL_CHECK(started_); 557 read_ops_.set_output_tag(tag); 558 if (!context_->initial_metadata_received_) { 559 read_ops_.RecvInitialMetadata(context_); 560 } 561 read_ops_.RecvMessage(msg); 562 call_.PerformOps(&read_ops_); 563 } 564 Write(const W & msg,void * tag)565 void Write(const W& msg, void* tag) override { 566 ABSL_CHECK(started_); 567 write_ops_.set_output_tag(tag); 568 // TODO(ctiller): don't assert 569 ABSL_CHECK(write_ops_.SendMessage(msg).ok()); 570 call_.PerformOps(&write_ops_); 571 } 572 Write(const W & msg,grpc::WriteOptions options,void * tag)573 void Write(const W& msg, grpc::WriteOptions options, void* tag) override { 574 ABSL_CHECK(started_); 575 write_ops_.set_output_tag(tag); 576 if (options.is_last_message()) { 577 options.set_buffer_hint(); 578 write_ops_.ClientSendClose(); 579 } 580 // TODO(ctiller): don't assert 581 ABSL_CHECK(write_ops_.SendMessage(msg, options).ok()); 582 call_.PerformOps(&write_ops_); 583 } 584 WritesDone(void * tag)585 void WritesDone(void* tag) override { 586 ABSL_CHECK(started_); 587 write_ops_.set_output_tag(tag); 588 write_ops_.ClientSendClose(); 589 call_.PerformOps(&write_ops_); 590 } 591 592 /// See the \a ClientAsyncStreamingInterface.Finish method for semantics. 593 /// Side effect 594 /// - the \a ClientContext associated with this call is updated with 595 /// possible initial and trailing metadata sent from the server. Finish(grpc::Status * status,void * tag)596 void Finish(grpc::Status* status, void* tag) override { 597 ABSL_CHECK(started_); 598 finish_ops_.set_output_tag(tag); 599 if (!context_->initial_metadata_received_) { 600 finish_ops_.RecvInitialMetadata(context_); 601 } 602 finish_ops_.ClientRecvStatus(context_, status); 603 call_.PerformOps(&finish_ops_); 604 } 605 606 private: 607 friend class internal::ClientAsyncReaderWriterFactory<W, R>; ClientAsyncReaderWriter(grpc::internal::Call call,grpc::ClientContext * context,bool start,void * tag)608 ClientAsyncReaderWriter(grpc::internal::Call call, 609 grpc::ClientContext* context, bool start, void* tag) 610 : context_(context), call_(call), started_(start) { 611 if (start) { 612 StartCallInternal(tag); 613 } else { 614 ABSL_CHECK(tag == nullptr); 615 } 616 } 617 StartCallInternal(void * tag)618 void StartCallInternal(void* tag) { 619 write_ops_.SendInitialMetadata(&context_->send_initial_metadata_, 620 context_->initial_metadata_flags()); 621 // if corked bit is set in context, we just keep the initial metadata 622 // buffered up to coalesce with later message send. No op is performed. 623 if (!context_->initial_metadata_corked_) { 624 write_ops_.set_output_tag(tag); 625 call_.PerformOps(&write_ops_); 626 } 627 } 628 629 grpc::ClientContext* context_; 630 grpc::internal::Call call_; 631 bool started_; 632 grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata> 633 meta_ops_; 634 grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata, 635 grpc::internal::CallOpRecvMessage<R>> 636 read_ops_; 637 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, 638 grpc::internal::CallOpSendMessage, 639 grpc::internal::CallOpClientSendClose> 640 write_ops_; 641 grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata, 642 grpc::internal::CallOpClientRecvStatus> 643 finish_ops_; 644 }; 645 646 template <class W, class R> 647 class ServerAsyncReaderInterface 648 : public grpc::internal::ServerAsyncStreamingInterface, 649 public internal::AsyncReaderInterface<R> { 650 public: 651 /// Indicate that the stream is to be finished with a certain status code 652 /// and also send out \a msg response to the client. 653 /// Request notification for when the server has sent the response and the 654 /// appropriate signals to the client to end the call. 655 /// Should not be used concurrently with other operations. 656 /// 657 /// It is appropriate to call this method when: 658 /// * all messages from the client have been received (either known 659 /// implicitly, or explicitly because a previous 660 /// \a AsyncReaderInterface::Read operation with a non-ok result, 661 /// e.g., cq->Next(&read_tag, &ok) filled in 'ok' with 'false'). 662 /// 663 /// This operation will end when the server has finished sending out initial 664 /// metadata (if not sent already), response message, and status, or if 665 /// some failure occurred when trying to do so. 666 /// 667 /// gRPC doesn't take ownership or a reference to \a msg or \a status, so it 668 /// is safe to deallocate once Finish returns. 669 /// 670 /// \param[in] tag Tag identifying this request. 671 /// \param[in] status To be sent to the client as the result of this call. 672 /// \param[in] msg To be sent to the client as the response for this call. 673 virtual void Finish(const W& msg, const grpc::Status& status, void* tag) = 0; 674 675 /// Indicate that the stream is to be finished with a certain 676 /// non-OK status code. 677 /// Request notification for when the server has sent the appropriate 678 /// signals to the client to end the call. 679 /// Should not be used concurrently with other operations. 680 /// 681 /// This call is meant to end the call with some error, and can be called at 682 /// any point that the server would like to "fail" the call (though note 683 /// this shouldn't be called concurrently with any other "sending" call, like 684 /// \a AsyncWriterInterface::Write). 685 /// 686 /// This operation will end when the server has finished sending out initial 687 /// metadata (if not sent already), and status, or if some failure occurred 688 /// when trying to do so. 689 /// 690 /// gRPC doesn't take ownership or a reference to \a status, so it is safe to 691 /// to deallocate once FinishWithError returns. 692 /// 693 /// \param[in] tag Tag identifying this request. 694 /// \param[in] status To be sent to the client as the result of this call. 695 /// - Note: \a status must have a non-OK code. 696 virtual void FinishWithError(const grpc::Status& status, void* tag) = 0; 697 }; 698 699 /// Async server-side API for doing client-streaming RPCs, 700 /// where the incoming message stream from the client has messages of type \a R, 701 /// and the single response message sent from the server is type \a W. 702 template <class W, class R> 703 class ServerAsyncReader final : public ServerAsyncReaderInterface<W, R> { 704 public: ServerAsyncReader(grpc::ServerContext * ctx)705 explicit ServerAsyncReader(grpc::ServerContext* ctx) 706 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} 707 708 /// See \a ServerAsyncStreamingInterface::SendInitialMetadata for semantics. 709 /// 710 /// Implicit input parameter: 711 /// - The initial metadata that will be sent to the client from this op will 712 /// be taken from the \a ServerContext associated with the call. SendInitialMetadata(void * tag)713 void SendInitialMetadata(void* tag) override { 714 ABSL_CHECK(!ctx_->sent_initial_metadata_); 715 716 meta_ops_.set_output_tag(tag); 717 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 718 ctx_->initial_metadata_flags()); 719 if (ctx_->compression_level_set()) { 720 meta_ops_.set_compression_level(ctx_->compression_level()); 721 } 722 ctx_->sent_initial_metadata_ = true; 723 call_.PerformOps(&meta_ops_); 724 } 725 Read(R * msg,void * tag)726 void Read(R* msg, void* tag) override { 727 read_ops_.set_output_tag(tag); 728 read_ops_.RecvMessage(msg); 729 call_.PerformOps(&read_ops_); 730 } 731 732 /// See the \a ServerAsyncReaderInterface.Read method for semantics 733 /// 734 /// Side effect: 735 /// - also sends initial metadata if not already sent. 736 /// - uses the \a ServerContext associated with this call to send possible 737 /// initial and trailing metadata. 738 /// 739 /// Note: \a msg is not sent if \a status has a non-OK code. 740 /// 741 /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it 742 /// is safe to deallocate once Finish returns. Finish(const W & msg,const grpc::Status & status,void * tag)743 void Finish(const W& msg, const grpc::Status& status, void* tag) override { 744 finish_ops_.set_output_tag(tag); 745 if (!ctx_->sent_initial_metadata_) { 746 finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 747 ctx_->initial_metadata_flags()); 748 if (ctx_->compression_level_set()) { 749 finish_ops_.set_compression_level(ctx_->compression_level()); 750 } 751 ctx_->sent_initial_metadata_ = true; 752 } 753 // The response is dropped if the status is not OK. 754 if (status.ok()) { 755 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, 756 finish_ops_.SendMessage(msg)); 757 } else { 758 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status); 759 } 760 call_.PerformOps(&finish_ops_); 761 } 762 763 /// See the \a ServerAsyncReaderInterface.Read method for semantics 764 /// 765 /// Side effect: 766 /// - also sends initial metadata if not already sent. 767 /// - uses the \a ServerContext associated with this call to send possible 768 /// initial and trailing metadata. 769 /// 770 /// gRPC doesn't take ownership or a reference to \a status, so it is safe to 771 /// to deallocate once FinishWithError returns. FinishWithError(const grpc::Status & status,void * tag)772 void FinishWithError(const grpc::Status& status, void* tag) override { 773 ABSL_CHECK(!status.ok()); 774 finish_ops_.set_output_tag(tag); 775 if (!ctx_->sent_initial_metadata_) { 776 finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 777 ctx_->initial_metadata_flags()); 778 if (ctx_->compression_level_set()) { 779 finish_ops_.set_compression_level(ctx_->compression_level()); 780 } 781 ctx_->sent_initial_metadata_ = true; 782 } 783 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status); 784 call_.PerformOps(&finish_ops_); 785 } 786 787 private: BindCall(grpc::internal::Call * call)788 void BindCall(grpc::internal::Call* call) override { call_ = *call; } 789 790 grpc::internal::Call call_; 791 grpc::ServerContext* ctx_; 792 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata> 793 meta_ops_; 794 grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<R>> read_ops_; 795 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, 796 grpc::internal::CallOpSendMessage, 797 grpc::internal::CallOpServerSendStatus> 798 finish_ops_; 799 }; 800 801 template <class W> 802 class ServerAsyncWriterInterface 803 : public grpc::internal::ServerAsyncStreamingInterface, 804 public internal::AsyncWriterInterface<W> { 805 public: 806 /// Indicate that the stream is to be finished with a certain status code. 807 /// Request notification for when the server has sent the appropriate 808 /// signals to the client to end the call. 809 /// Should not be used concurrently with other operations. 810 /// 811 /// It is appropriate to call this method when either: 812 /// * all messages from the client have been received (either known 813 /// implicitly, or explicitly because a previous \a 814 /// AsyncReaderInterface::Read operation with a non-ok 815 /// result (e.g., cq->Next(&read_tag, &ok) filled in 'ok' with 'false'. 816 /// * it is desired to end the call early with some non-OK status code. 817 /// 818 /// This operation will end when the server has finished sending out initial 819 /// metadata (if not sent already), response message, and status, or if 820 /// some failure occurred when trying to do so. 821 /// 822 /// gRPC doesn't take ownership or a reference to \a status, so it is safe to 823 /// to deallocate once Finish returns. 824 /// 825 /// \param[in] tag Tag identifying this request. 826 /// \param[in] status To be sent to the client as the result of this call. 827 virtual void Finish(const grpc::Status& status, void* tag) = 0; 828 829 /// Request the writing of \a msg and coalesce it with trailing metadata which 830 /// contains \a status, using WriteOptions options with 831 /// identifying tag \a tag. 832 /// 833 /// WriteAndFinish is equivalent of performing WriteLast and Finish 834 /// in a single step. 835 /// 836 /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it 837 /// is safe to deallocate once WriteAndFinish returns. 838 /// 839 /// \param[in] msg The message to be written. 840 /// \param[in] options The WriteOptions to be used to write this message. 841 /// \param[in] status The Status that server returns to client. 842 /// \param[in] tag The tag identifying the operation. 843 virtual void WriteAndFinish(const W& msg, grpc::WriteOptions options, 844 const grpc::Status& status, void* tag) = 0; 845 }; 846 847 /// Async server-side API for doing server streaming RPCs, 848 /// where the outgoing message stream from the server has messages of type \a W. 849 template <class W> 850 class ServerAsyncWriter final : public ServerAsyncWriterInterface<W> { 851 public: ServerAsyncWriter(grpc::ServerContext * ctx)852 explicit ServerAsyncWriter(grpc::ServerContext* ctx) 853 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} 854 855 /// See \a ServerAsyncStreamingInterface::SendInitialMetadata for semantics. 856 /// 857 /// Implicit input parameter: 858 /// - The initial metadata that will be sent to the client from this op will 859 /// be taken from the \a ServerContext associated with the call. 860 /// 861 /// \param[in] tag Tag identifying this request. SendInitialMetadata(void * tag)862 void SendInitialMetadata(void* tag) override { 863 ABSL_CHECK(!ctx_->sent_initial_metadata_); 864 865 meta_ops_.set_output_tag(tag); 866 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 867 ctx_->initial_metadata_flags()); 868 if (ctx_->compression_level_set()) { 869 meta_ops_.set_compression_level(ctx_->compression_level()); 870 } 871 ctx_->sent_initial_metadata_ = true; 872 call_.PerformOps(&meta_ops_); 873 } 874 Write(const W & msg,void * tag)875 void Write(const W& msg, void* tag) override { 876 write_ops_.set_output_tag(tag); 877 EnsureInitialMetadataSent(&write_ops_); 878 // TODO(ctiller): don't assert 879 ABSL_CHECK(write_ops_.SendMessage(msg).ok()); 880 call_.PerformOps(&write_ops_); 881 } 882 Write(const W & msg,grpc::WriteOptions options,void * tag)883 void Write(const W& msg, grpc::WriteOptions options, void* tag) override { 884 write_ops_.set_output_tag(tag); 885 if (options.is_last_message()) { 886 options.set_buffer_hint(); 887 } 888 889 EnsureInitialMetadataSent(&write_ops_); 890 // TODO(ctiller): don't assert 891 ABSL_CHECK(write_ops_.SendMessage(msg, options).ok()); 892 call_.PerformOps(&write_ops_); 893 } 894 895 /// See the \a ServerAsyncWriterInterface.WriteAndFinish method for semantics. 896 /// 897 /// Implicit input parameter: 898 /// - the \a ServerContext associated with this call is used 899 /// for sending trailing (and initial) metadata to the client. 900 /// 901 /// Note: \a status must have an OK code. 902 /// 903 /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it 904 /// is safe to deallocate once WriteAndFinish returns. WriteAndFinish(const W & msg,grpc::WriteOptions options,const grpc::Status & status,void * tag)905 void WriteAndFinish(const W& msg, grpc::WriteOptions options, 906 const grpc::Status& status, void* tag) override { 907 write_ops_.set_output_tag(tag); 908 EnsureInitialMetadataSent(&write_ops_); 909 options.set_buffer_hint(); 910 ABSL_CHECK(write_ops_.SendMessage(msg, options).ok()); 911 write_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status); 912 call_.PerformOps(&write_ops_); 913 } 914 915 /// See the \a ServerAsyncWriterInterface.Finish method for semantics. 916 /// 917 /// Implicit input parameter: 918 /// - the \a ServerContext associated with this call is used for sending 919 /// trailing (and initial if not already sent) metadata to the client. 920 /// 921 /// Note: there are no restrictions are the code of 922 /// \a status,it may be non-OK 923 /// 924 /// gRPC doesn't take ownership or a reference to \a status, so it is safe to 925 /// to deallocate once Finish returns. Finish(const grpc::Status & status,void * tag)926 void Finish(const grpc::Status& status, void* tag) override { 927 finish_ops_.set_output_tag(tag); 928 EnsureInitialMetadataSent(&finish_ops_); 929 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status); 930 call_.PerformOps(&finish_ops_); 931 } 932 933 private: BindCall(grpc::internal::Call * call)934 void BindCall(grpc::internal::Call* call) override { call_ = *call; } 935 936 template <class T> EnsureInitialMetadataSent(T * ops)937 void EnsureInitialMetadataSent(T* ops) { 938 if (!ctx_->sent_initial_metadata_) { 939 ops->SendInitialMetadata(&ctx_->initial_metadata_, 940 ctx_->initial_metadata_flags()); 941 if (ctx_->compression_level_set()) { 942 ops->set_compression_level(ctx_->compression_level()); 943 } 944 ctx_->sent_initial_metadata_ = true; 945 } 946 } 947 948 grpc::internal::Call call_; 949 grpc::ServerContext* ctx_; 950 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata> 951 meta_ops_; 952 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, 953 grpc::internal::CallOpSendMessage, 954 grpc::internal::CallOpServerSendStatus> 955 write_ops_; 956 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, 957 grpc::internal::CallOpServerSendStatus> 958 finish_ops_; 959 }; 960 961 /// Server-side interface for asynchronous bi-directional streaming. 962 template <class W, class R> 963 class ServerAsyncReaderWriterInterface 964 : public grpc::internal::ServerAsyncStreamingInterface, 965 public internal::AsyncWriterInterface<W>, 966 public internal::AsyncReaderInterface<R> { 967 public: 968 /// Indicate that the stream is to be finished with a certain status code. 969 /// Request notification for when the server has sent the appropriate 970 /// signals to the client to end the call. 971 /// Should not be used concurrently with other operations. 972 /// 973 /// It is appropriate to call this method when either: 974 /// * all messages from the client have been received (either known 975 /// implicitly, or explicitly because a previous \a 976 /// AsyncReaderInterface::Read operation 977 /// with a non-ok result (e.g., cq->Next(&read_tag, &ok) filled in 'ok' 978 /// with 'false'. 979 /// * it is desired to end the call early with some non-OK status code. 980 /// 981 /// This operation will end when the server has finished sending out initial 982 /// metadata (if not sent already), response message, and status, or if some 983 /// failure occurred when trying to do so. 984 /// 985 /// gRPC doesn't take ownership or a reference to \a status, so it is safe to 986 /// to deallocate once Finish returns. 987 /// 988 /// \param[in] tag Tag identifying this request. 989 /// \param[in] status To be sent to the client as the result of this call. 990 virtual void Finish(const grpc::Status& status, void* tag) = 0; 991 992 /// Request the writing of \a msg and coalesce it with trailing metadata which 993 /// contains \a status, using WriteOptions options with 994 /// identifying tag \a tag. 995 /// 996 /// WriteAndFinish is equivalent of performing WriteLast and Finish in a 997 /// single step. 998 /// 999 /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it 1000 /// is safe to deallocate once WriteAndFinish returns. 1001 /// 1002 /// \param[in] msg The message to be written. 1003 /// \param[in] options The WriteOptions to be used to write this message. 1004 /// \param[in] status The Status that server returns to client. 1005 /// \param[in] tag The tag identifying the operation. 1006 virtual void WriteAndFinish(const W& msg, grpc::WriteOptions options, 1007 const grpc::Status& status, void* tag) = 0; 1008 }; 1009 1010 /// Async server-side API for doing bidirectional streaming RPCs, 1011 /// where the incoming message stream coming from the client has messages of 1012 /// type \a R, and the outgoing message stream coming from the server has 1013 /// messages of type \a W. 1014 template <class W, class R> 1015 class ServerAsyncReaderWriter final 1016 : public ServerAsyncReaderWriterInterface<W, R> { 1017 public: ServerAsyncReaderWriter(grpc::ServerContext * ctx)1018 explicit ServerAsyncReaderWriter(grpc::ServerContext* ctx) 1019 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} 1020 1021 /// See \a ServerAsyncStreamingInterface::SendInitialMetadata for semantics. 1022 /// 1023 /// Implicit input parameter: 1024 /// - The initial metadata that will be sent to the client from this op will 1025 /// be taken from the \a ServerContext associated with the call. 1026 /// 1027 /// \param[in] tag Tag identifying this request. SendInitialMetadata(void * tag)1028 void SendInitialMetadata(void* tag) override { 1029 ABSL_CHECK(!ctx_->sent_initial_metadata_); 1030 1031 meta_ops_.set_output_tag(tag); 1032 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 1033 ctx_->initial_metadata_flags()); 1034 if (ctx_->compression_level_set()) { 1035 meta_ops_.set_compression_level(ctx_->compression_level()); 1036 } 1037 ctx_->sent_initial_metadata_ = true; 1038 call_.PerformOps(&meta_ops_); 1039 } 1040 Read(R * msg,void * tag)1041 void Read(R* msg, void* tag) override { 1042 read_ops_.set_output_tag(tag); 1043 read_ops_.RecvMessage(msg); 1044 call_.PerformOps(&read_ops_); 1045 } 1046 Write(const W & msg,void * tag)1047 void Write(const W& msg, void* tag) override { 1048 write_ops_.set_output_tag(tag); 1049 EnsureInitialMetadataSent(&write_ops_); 1050 // TODO(ctiller): don't assert 1051 ABSL_CHECK(write_ops_.SendMessage(msg).ok()); 1052 call_.PerformOps(&write_ops_); 1053 } 1054 Write(const W & msg,grpc::WriteOptions options,void * tag)1055 void Write(const W& msg, grpc::WriteOptions options, void* tag) override { 1056 write_ops_.set_output_tag(tag); 1057 if (options.is_last_message()) { 1058 options.set_buffer_hint(); 1059 } 1060 EnsureInitialMetadataSent(&write_ops_); 1061 ABSL_CHECK(write_ops_.SendMessage(msg, options).ok()); 1062 call_.PerformOps(&write_ops_); 1063 } 1064 1065 /// See the \a ServerAsyncReaderWriterInterface.WriteAndFinish 1066 /// method for semantics. 1067 /// 1068 /// Implicit input parameter: 1069 /// - the \a ServerContext associated with this call is used 1070 /// for sending trailing (and initial) metadata to the client. 1071 /// 1072 /// Note: \a status must have an OK code. 1073 // 1074 /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it 1075 /// is safe to deallocate once WriteAndFinish returns. WriteAndFinish(const W & msg,grpc::WriteOptions options,const grpc::Status & status,void * tag)1076 void WriteAndFinish(const W& msg, grpc::WriteOptions options, 1077 const grpc::Status& status, void* tag) override { 1078 write_ops_.set_output_tag(tag); 1079 EnsureInitialMetadataSent(&write_ops_); 1080 options.set_buffer_hint(); 1081 ABSL_CHECK(write_ops_.SendMessage(msg, options).ok()); 1082 write_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status); 1083 call_.PerformOps(&write_ops_); 1084 } 1085 1086 /// See the \a ServerAsyncReaderWriterInterface.Finish method for semantics. 1087 /// 1088 /// Implicit input parameter: 1089 /// - the \a ServerContext associated with this call is used for sending 1090 /// trailing (and initial if not already sent) metadata to the client. 1091 /// 1092 /// Note: there are no restrictions are the code of \a status, 1093 /// it may be non-OK 1094 // 1095 /// gRPC doesn't take ownership or a reference to \a status, so it is safe to 1096 /// to deallocate once Finish returns. Finish(const grpc::Status & status,void * tag)1097 void Finish(const grpc::Status& status, void* tag) override { 1098 finish_ops_.set_output_tag(tag); 1099 EnsureInitialMetadataSent(&finish_ops_); 1100 1101 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status); 1102 call_.PerformOps(&finish_ops_); 1103 } 1104 1105 private: 1106 friend class grpc::Server; 1107 BindCall(grpc::internal::Call * call)1108 void BindCall(grpc::internal::Call* call) override { call_ = *call; } 1109 1110 template <class T> EnsureInitialMetadataSent(T * ops)1111 void EnsureInitialMetadataSent(T* ops) { 1112 if (!ctx_->sent_initial_metadata_) { 1113 ops->SendInitialMetadata(&ctx_->initial_metadata_, 1114 ctx_->initial_metadata_flags()); 1115 if (ctx_->compression_level_set()) { 1116 ops->set_compression_level(ctx_->compression_level()); 1117 } 1118 ctx_->sent_initial_metadata_ = true; 1119 } 1120 } 1121 1122 grpc::internal::Call call_; 1123 grpc::ServerContext* ctx_; 1124 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata> 1125 meta_ops_; 1126 grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<R>> read_ops_; 1127 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, 1128 grpc::internal::CallOpSendMessage, 1129 grpc::internal::CallOpServerSendStatus> 1130 write_ops_; 1131 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, 1132 grpc::internal::CallOpServerSendStatus> 1133 finish_ops_; 1134 }; 1135 1136 } // namespace grpc 1137 1138 #endif // GRPCPP_SUPPORT_ASYNC_STREAM_H 1139