1 /* 2 * 3 * Copyright 2019 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18 #ifndef GRPCPP_IMPL_CODEGEN_ASYNC_STREAM_H 19 #define GRPCPP_IMPL_CODEGEN_ASYNC_STREAM_H 20 21 #include <grpcpp/impl/codegen/call.h> 22 #include <grpcpp/impl/codegen/channel_interface.h> 23 #include <grpcpp/impl/codegen/core_codegen_interface.h> 24 #include <grpcpp/impl/codegen/server_context.h> 25 #include <grpcpp/impl/codegen/service_type.h> 26 #include <grpcpp/impl/codegen/status.h> 27 28 namespace grpc { 29 30 namespace internal { 31 /// Common interface for all client side asynchronous streaming. 32 class ClientAsyncStreamingInterface { 33 public: ~ClientAsyncStreamingInterface()34 virtual ~ClientAsyncStreamingInterface() {} 35 36 /// Start the call that was set up by the constructor, but only if the 37 /// constructor was invoked through the "Prepare" API which doesn't actually 38 /// start the call 39 virtual void StartCall(void* tag) = 0; 40 41 /// Request notification of the reading of the initial metadata. Completion 42 /// will be notified by \a tag on the associated completion queue. 43 /// This call is optional, but if it is used, it cannot be used concurrently 44 /// with or after the \a AsyncReaderInterface::Read method. 45 /// 46 /// \param[in] tag Tag identifying this request. 47 virtual void ReadInitialMetadata(void* tag) = 0; 48 49 /// Indicate that the stream is to be finished and request notification for 50 /// when the call has been ended. 51 /// Should not be used concurrently with other operations. 52 /// 53 /// It is appropriate to call this method exactly once when both: 54 /// * the client side has no more message to send 55 /// (this can be declared implicitly by calling this method, or 56 /// explicitly through an earlier call to the <i>WritesDone</i> method 57 /// of the class in use, e.g. \a ClientAsyncWriterInterface::WritesDone or 58 /// \a ClientAsyncReaderWriterInterface::WritesDone). 59 /// * there are no more messages to be received from the server (this can 60 /// be known implicitly by the calling code, or explicitly from an 61 /// earlier call to \a AsyncReaderInterface::Read that yielded a failed 62 /// result, e.g. cq->Next(&read_tag, &ok) filled in 'ok' with 'false'). 63 /// 64 /// The tag will be returned when either: 65 /// - all incoming messages have been read and the server has returned 66 /// a status. 67 /// - the server has returned a non-OK status. 68 /// - the call failed for some reason and the library generated a 69 /// status. 70 /// 71 /// Note that implementations of this method attempt to receive initial 72 /// metadata from the server if initial metadata hasn't yet been received. 73 /// 74 /// \param[in] tag Tag identifying this request. 75 /// \param[out] status To be updated with the operation status. 76 virtual void Finish(::grpc::Status* status, void* tag) = 0; 77 }; 78 79 /// An interface that yields a sequence of messages of type \a R. 80 template <class R> 81 class AsyncReaderInterface { 82 public: ~AsyncReaderInterface()83 virtual ~AsyncReaderInterface() {} 84 85 /// Read a message of type \a R into \a msg. Completion will be notified by \a 86 /// tag on the associated completion queue. 87 /// This is thread-safe with respect to \a Write or \a WritesDone methods. It 88 /// should not be called concurrently with other streaming APIs 89 /// on the same stream. It is not meaningful to call it concurrently 90 /// with another \a AsyncReaderInterface::Read on the same stream since reads 91 /// on the same stream are delivered in order. 92 /// 93 /// \param[out] msg Where to eventually store the read message. 94 /// \param[in] tag The tag identifying the operation. 95 /// 96 /// Side effect: note that this method attempt to receive initial metadata for 97 /// a stream if it hasn't yet been received. 98 virtual void Read(R* msg, void* tag) = 0; 99 }; 100 101 /// An interface that can be fed a sequence of messages of type \a W. 102 template <class W> 103 class AsyncWriterInterface { 104 public: ~AsyncWriterInterface()105 virtual ~AsyncWriterInterface() {} 106 107 /// Request the writing of \a msg with identifying tag \a tag. 108 /// 109 /// Only one write may be outstanding at any given time. This means that 110 /// after calling Write, one must wait to receive \a tag from the completion 111 /// queue BEFORE calling Write again. 112 /// This is thread-safe with respect to \a AsyncReaderInterface::Read 113 /// 114 /// gRPC doesn't take ownership or a reference to \a msg, so it is safe to 115 /// to deallocate once Write returns. 116 /// 117 /// \param[in] msg The message to be written. 118 /// \param[in] tag The tag identifying the operation. 119 virtual void Write(const W& msg, void* tag) = 0; 120 121 /// Request the writing of \a msg using WriteOptions \a options with 122 /// identifying tag \a tag. 123 /// 124 /// Only one write may be outstanding at any given time. This means that 125 /// after calling Write, one must wait to receive \a tag from the completion 126 /// queue BEFORE calling Write again. 127 /// WriteOptions \a options is used to set the write options of this message. 128 /// This is thread-safe with respect to \a AsyncReaderInterface::Read 129 /// 130 /// gRPC doesn't take ownership or a reference to \a msg, so it is safe to 131 /// to deallocate once Write returns. 132 /// 133 /// \param[in] msg The message to be written. 134 /// \param[in] options The WriteOptions to be used to write this message. 135 /// \param[in] tag The tag identifying the operation. 136 virtual void Write(const W& msg, ::grpc::WriteOptions options, void* tag) = 0; 137 138 /// Request the writing of \a msg and coalesce it with the writing 139 /// of trailing metadata, using WriteOptions \a options with 140 /// identifying tag \a tag. 141 /// 142 /// For client, WriteLast is equivalent of performing Write and 143 /// WritesDone in a single step. 144 /// For server, WriteLast buffers the \a msg. The writing of \a msg is held 145 /// until Finish is called, where \a msg and trailing metadata are coalesced 146 /// and write is initiated. Note that WriteLast can only buffer \a msg up to 147 /// the flow control window size. If \a msg size is larger than the window 148 /// size, it will be sent on wire without buffering. 149 /// 150 /// gRPC doesn't take ownership or a reference to \a msg, so it is safe to 151 /// to deallocate once Write returns. 152 /// 153 /// \param[in] msg The message to be written. 154 /// \param[in] options The WriteOptions to be used to write this message. 155 /// \param[in] tag The tag identifying the operation. WriteLast(const W & msg,::grpc::WriteOptions options,void * tag)156 void WriteLast(const W& msg, ::grpc::WriteOptions options, void* tag) { 157 Write(msg, options.set_last_message(), tag); 158 } 159 }; 160 161 } // namespace internal 162 163 template <class R> 164 class ClientAsyncReaderInterface 165 : public internal::ClientAsyncStreamingInterface, 166 public internal::AsyncReaderInterface<R> {}; 167 168 namespace internal { 169 template <class R> 170 class ClientAsyncReaderFactory { 171 public: 172 /// Create a stream object. 173 /// Write the first request out if \a start is set. 174 /// \a tag will be notified on \a cq when the call has been started and 175 /// \a request has been written out. If \a start is not set, \a tag must be 176 /// nullptr and the actual call must be initiated by StartCall 177 /// Note that \a context will be used to fill in custom initial metadata 178 /// used to send to the server when starting the call. 179 template <class W> Create(::grpc::ChannelInterface * channel,::grpc::CompletionQueue * cq,const::grpc::internal::RpcMethod & method,::grpc::ClientContext * context,const W & request,bool start,void * tag)180 static ClientAsyncReader<R>* Create(::grpc::ChannelInterface* channel, 181 ::grpc::CompletionQueue* cq, 182 const ::grpc::internal::RpcMethod& method, 183 ::grpc::ClientContext* context, 184 const W& request, bool start, void* tag) { 185 ::grpc::internal::Call call = channel->CreateCall(method, context, cq); 186 return new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( 187 call.call(), sizeof(ClientAsyncReader<R>))) 188 ClientAsyncReader<R>(call, context, request, start, tag); 189 } 190 }; 191 } // namespace internal 192 193 /// Async client-side API for doing server-streaming RPCs, 194 /// where the incoming message stream coming from the server has 195 /// messages of type \a R. 196 template <class R> 197 class ClientAsyncReader final : public ClientAsyncReaderInterface<R> { 198 public: 199 // always allocated against a call arena, no memory free required delete(void *,std::size_t size)200 static void operator delete(void* /*ptr*/, std::size_t size) { 201 GPR_CODEGEN_ASSERT(size == sizeof(ClientAsyncReader)); 202 } 203 204 // This operator should never be called as the memory should be freed as part 205 // of the arena destruction. It only exists to provide a matching operator 206 // delete to the operator new so that some compilers will not complain (see 207 // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this 208 // there are no tests catching the compiler warning. delete(void *,void *)209 static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); } 210 StartCall(void * tag)211 void StartCall(void* tag) override { 212 GPR_CODEGEN_ASSERT(!started_); 213 started_ = true; 214 StartCallInternal(tag); 215 } 216 217 /// See the \a ClientAsyncStreamingInterface.ReadInitialMetadata 218 /// method for semantics. 219 /// 220 /// Side effect: 221 /// - upon receiving initial metadata from the server, 222 /// the \a ClientContext associated with this call is updated, and the 223 /// calling code can access the received metadata through the 224 /// \a ClientContext. ReadInitialMetadata(void * tag)225 void ReadInitialMetadata(void* tag) override { 226 GPR_CODEGEN_ASSERT(started_); 227 GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); 228 229 meta_ops_.set_output_tag(tag); 230 meta_ops_.RecvInitialMetadata(context_); 231 call_.PerformOps(&meta_ops_); 232 } 233 Read(R * msg,void * tag)234 void Read(R* msg, void* tag) override { 235 GPR_CODEGEN_ASSERT(started_); 236 read_ops_.set_output_tag(tag); 237 if (!context_->initial_metadata_received_) { 238 read_ops_.RecvInitialMetadata(context_); 239 } 240 read_ops_.RecvMessage(msg); 241 call_.PerformOps(&read_ops_); 242 } 243 244 /// See the \a ClientAsyncStreamingInterface.Finish method for semantics. 245 /// 246 /// Side effect: 247 /// - the \a ClientContext associated with this call is updated with 248 /// possible initial and trailing metadata received from the server. Finish(::grpc::Status * status,void * tag)249 void Finish(::grpc::Status* status, void* tag) override { 250 GPR_CODEGEN_ASSERT(started_); 251 finish_ops_.set_output_tag(tag); 252 if (!context_->initial_metadata_received_) { 253 finish_ops_.RecvInitialMetadata(context_); 254 } 255 finish_ops_.ClientRecvStatus(context_, status); 256 call_.PerformOps(&finish_ops_); 257 } 258 259 private: 260 friend class internal::ClientAsyncReaderFactory<R>; 261 template <class W> ClientAsyncReader(::grpc::internal::Call call,::grpc::ClientContext * context,const W & request,bool start,void * tag)262 ClientAsyncReader(::grpc::internal::Call call, ::grpc::ClientContext* context, 263 const W& request, bool start, void* tag) 264 : context_(context), call_(call), started_(start) { 265 // TODO(ctiller): don't assert 266 GPR_CODEGEN_ASSERT(init_ops_.SendMessage(request).ok()); 267 init_ops_.ClientSendClose(); 268 if (start) { 269 StartCallInternal(tag); 270 } else { 271 GPR_CODEGEN_ASSERT(tag == nullptr); 272 } 273 } 274 StartCallInternal(void * tag)275 void StartCallInternal(void* tag) { 276 init_ops_.SendInitialMetadata(&context_->send_initial_metadata_, 277 context_->initial_metadata_flags()); 278 init_ops_.set_output_tag(tag); 279 call_.PerformOps(&init_ops_); 280 } 281 282 ::grpc::ClientContext* context_; 283 ::grpc::internal::Call call_; 284 bool started_; 285 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 286 ::grpc::internal::CallOpSendMessage, 287 ::grpc::internal::CallOpClientSendClose> 288 init_ops_; 289 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> 290 meta_ops_; 291 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, 292 ::grpc::internal::CallOpRecvMessage<R>> 293 read_ops_; 294 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, 295 ::grpc::internal::CallOpClientRecvStatus> 296 finish_ops_; 297 }; 298 299 /// Common interface for client side asynchronous writing. 300 template <class W> 301 class ClientAsyncWriterInterface 302 : public internal::ClientAsyncStreamingInterface, 303 public internal::AsyncWriterInterface<W> { 304 public: 305 /// Signal the client is done with the writes (half-close the client stream). 306 /// Thread-safe with respect to \a AsyncReaderInterface::Read 307 /// 308 /// \param[in] tag The tag identifying the operation. 309 virtual void WritesDone(void* tag) = 0; 310 }; 311 312 namespace internal { 313 template <class W> 314 class ClientAsyncWriterFactory { 315 public: 316 /// Create a stream object. 317 /// Start the RPC if \a start is set 318 /// \a tag will be notified on \a cq when the call has been started (i.e. 319 /// intitial metadata sent) and \a request has been written out. 320 /// If \a start is not set, \a tag must be nullptr and the actual call 321 /// must be initiated by StartCall 322 /// Note that \a context will be used to fill in custom initial metadata 323 /// used to send to the server when starting the call. 324 /// \a response will be filled in with the single expected response 325 /// message from the server upon a successful call to the \a Finish 326 /// method of this instance. 327 template <class R> Create(::grpc::ChannelInterface * channel,::grpc::CompletionQueue * cq,const::grpc::internal::RpcMethod & method,::grpc::ClientContext * context,R * response,bool start,void * tag)328 static ClientAsyncWriter<W>* Create(::grpc::ChannelInterface* channel, 329 ::grpc::CompletionQueue* cq, 330 const ::grpc::internal::RpcMethod& method, 331 ::grpc::ClientContext* context, 332 R* response, bool start, void* tag) { 333 ::grpc::internal::Call call = channel->CreateCall(method, context, cq); 334 return new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( 335 call.call(), sizeof(ClientAsyncWriter<W>))) 336 ClientAsyncWriter<W>(call, context, response, start, tag); 337 } 338 }; 339 } // namespace internal 340 341 /// Async API on the client side for doing client-streaming RPCs, 342 /// where the outgoing message stream going to the server contains 343 /// messages of type \a W. 344 template <class W> 345 class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> { 346 public: 347 // always allocated against a call arena, no memory free required delete(void *,std::size_t size)348 static void operator delete(void* /*ptr*/, std::size_t size) { 349 GPR_CODEGEN_ASSERT(size == sizeof(ClientAsyncWriter)); 350 } 351 352 // This operator should never be called as the memory should be freed as part 353 // of the arena destruction. It only exists to provide a matching operator 354 // delete to the operator new so that some compilers will not complain (see 355 // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this 356 // there are no tests catching the compiler warning. delete(void *,void *)357 static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); } 358 StartCall(void * tag)359 void StartCall(void* tag) override { 360 GPR_CODEGEN_ASSERT(!started_); 361 started_ = true; 362 StartCallInternal(tag); 363 } 364 365 /// See the \a ClientAsyncStreamingInterface.ReadInitialMetadata method for 366 /// semantics. 367 /// 368 /// Side effect: 369 /// - upon receiving initial metadata from the server, the \a ClientContext 370 /// associated with this call is updated, and the calling code can access 371 /// the received metadata through the \a ClientContext. ReadInitialMetadata(void * tag)372 void ReadInitialMetadata(void* tag) override { 373 GPR_CODEGEN_ASSERT(started_); 374 GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); 375 376 meta_ops_.set_output_tag(tag); 377 meta_ops_.RecvInitialMetadata(context_); 378 call_.PerformOps(&meta_ops_); 379 } 380 Write(const W & msg,void * tag)381 void Write(const W& msg, void* tag) override { 382 GPR_CODEGEN_ASSERT(started_); 383 write_ops_.set_output_tag(tag); 384 // TODO(ctiller): don't assert 385 GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok()); 386 call_.PerformOps(&write_ops_); 387 } 388 Write(const W & msg,::grpc::WriteOptions options,void * tag)389 void Write(const W& msg, ::grpc::WriteOptions options, void* tag) override { 390 GPR_CODEGEN_ASSERT(started_); 391 write_ops_.set_output_tag(tag); 392 if (options.is_last_message()) { 393 options.set_buffer_hint(); 394 write_ops_.ClientSendClose(); 395 } 396 // TODO(ctiller): don't assert 397 GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); 398 call_.PerformOps(&write_ops_); 399 } 400 WritesDone(void * tag)401 void WritesDone(void* tag) override { 402 GPR_CODEGEN_ASSERT(started_); 403 write_ops_.set_output_tag(tag); 404 write_ops_.ClientSendClose(); 405 call_.PerformOps(&write_ops_); 406 } 407 408 /// See the \a ClientAsyncStreamingInterface.Finish method for semantics. 409 /// 410 /// Side effect: 411 /// - the \a ClientContext associated with this call is updated with 412 /// possible initial and trailing metadata received from the server. 413 /// - attempts to fill in the \a response parameter passed to this class's 414 /// constructor with the server's response message. Finish(::grpc::Status * status,void * tag)415 void Finish(::grpc::Status* status, void* tag) override { 416 GPR_CODEGEN_ASSERT(started_); 417 finish_ops_.set_output_tag(tag); 418 if (!context_->initial_metadata_received_) { 419 finish_ops_.RecvInitialMetadata(context_); 420 } 421 finish_ops_.ClientRecvStatus(context_, status); 422 call_.PerformOps(&finish_ops_); 423 } 424 425 private: 426 friend class internal::ClientAsyncWriterFactory<W>; 427 template <class R> ClientAsyncWriter(::grpc::internal::Call call,::grpc::ClientContext * context,R * response,bool start,void * tag)428 ClientAsyncWriter(::grpc::internal::Call call, ::grpc::ClientContext* context, 429 R* response, bool start, void* tag) 430 : context_(context), call_(call), started_(start) { 431 finish_ops_.RecvMessage(response); 432 finish_ops_.AllowNoMessage(); 433 if (start) { 434 StartCallInternal(tag); 435 } else { 436 GPR_CODEGEN_ASSERT(tag == nullptr); 437 } 438 } 439 StartCallInternal(void * tag)440 void StartCallInternal(void* tag) { 441 write_ops_.SendInitialMetadata(&context_->send_initial_metadata_, 442 context_->initial_metadata_flags()); 443 // if corked bit is set in context, we just keep the initial metadata 444 // buffered up to coalesce with later message send. No op is performed. 445 if (!context_->initial_metadata_corked_) { 446 write_ops_.set_output_tag(tag); 447 call_.PerformOps(&write_ops_); 448 } 449 } 450 451 ::grpc::ClientContext* context_; 452 ::grpc::internal::Call call_; 453 bool started_; 454 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> 455 meta_ops_; 456 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 457 ::grpc::internal::CallOpSendMessage, 458 ::grpc::internal::CallOpClientSendClose> 459 write_ops_; 460 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, 461 ::grpc::internal::CallOpGenericRecvMessage, 462 ::grpc::internal::CallOpClientRecvStatus> 463 finish_ops_; 464 }; 465 466 /// Async client-side interface for bi-directional streaming, 467 /// where the client-to-server message stream has messages of type \a W, 468 /// and the server-to-client message stream has messages of type \a R. 469 template <class W, class R> 470 class ClientAsyncReaderWriterInterface 471 : public internal::ClientAsyncStreamingInterface, 472 public internal::AsyncWriterInterface<W>, 473 public internal::AsyncReaderInterface<R> { 474 public: 475 /// Signal the client is done with the writes (half-close the client stream). 476 /// Thread-safe with respect to \a AsyncReaderInterface::Read 477 /// 478 /// \param[in] tag The tag identifying the operation. 479 virtual void WritesDone(void* tag) = 0; 480 }; 481 482 namespace internal { 483 template <class W, class R> 484 class ClientAsyncReaderWriterFactory { 485 public: 486 /// Create a stream object. 487 /// Start the RPC request if \a start is set. 488 /// \a tag will be notified on \a cq when the call has been started (i.e. 489 /// intitial metadata sent). If \a start is not set, \a tag must be 490 /// nullptr and the actual call must be initiated by StartCall 491 /// Note that \a context will be used to fill in custom initial metadata 492 /// used to send to the server when starting the call. Create(::grpc::ChannelInterface * channel,::grpc::CompletionQueue * cq,const::grpc::internal::RpcMethod & method,::grpc::ClientContext * context,bool start,void * tag)493 static ClientAsyncReaderWriter<W, R>* Create( 494 ::grpc::ChannelInterface* channel, ::grpc::CompletionQueue* cq, 495 const ::grpc::internal::RpcMethod& method, ::grpc::ClientContext* context, 496 bool start, void* tag) { 497 ::grpc::internal::Call call = channel->CreateCall(method, context, cq); 498 499 return new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( 500 call.call(), sizeof(ClientAsyncReaderWriter<W, R>))) 501 ClientAsyncReaderWriter<W, R>(call, context, start, tag); 502 } 503 }; 504 } // namespace internal 505 506 /// Async client-side interface for bi-directional streaming, 507 /// where the outgoing message stream going to the server 508 /// has messages of type \a W, and the incoming message stream coming 509 /// from the server has messages of type \a R. 510 template <class W, class R> 511 class ClientAsyncReaderWriter final 512 : public ClientAsyncReaderWriterInterface<W, R> { 513 public: 514 // always allocated against a call arena, no memory free required delete(void *,std::size_t size)515 static void operator delete(void* /*ptr*/, std::size_t size) { 516 GPR_CODEGEN_ASSERT(size == sizeof(ClientAsyncReaderWriter)); 517 } 518 519 // This operator should never be called as the memory should be freed as part 520 // of the arena destruction. It only exists to provide a matching operator 521 // delete to the operator new so that some compilers will not complain (see 522 // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this 523 // there are no tests catching the compiler warning. delete(void *,void *)524 static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); } 525 StartCall(void * tag)526 void StartCall(void* tag) override { 527 GPR_CODEGEN_ASSERT(!started_); 528 started_ = true; 529 StartCallInternal(tag); 530 } 531 532 /// See the \a ClientAsyncStreamingInterface.ReadInitialMetadata method 533 /// for semantics of this method. 534 /// 535 /// Side effect: 536 /// - upon receiving initial metadata from the server, the \a ClientContext 537 /// is updated with it, and then the receiving initial metadata can 538 /// be accessed through this \a ClientContext. ReadInitialMetadata(void * tag)539 void ReadInitialMetadata(void* tag) override { 540 GPR_CODEGEN_ASSERT(started_); 541 GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); 542 543 meta_ops_.set_output_tag(tag); 544 meta_ops_.RecvInitialMetadata(context_); 545 call_.PerformOps(&meta_ops_); 546 } 547 Read(R * msg,void * tag)548 void Read(R* msg, void* tag) override { 549 GPR_CODEGEN_ASSERT(started_); 550 read_ops_.set_output_tag(tag); 551 if (!context_->initial_metadata_received_) { 552 read_ops_.RecvInitialMetadata(context_); 553 } 554 read_ops_.RecvMessage(msg); 555 call_.PerformOps(&read_ops_); 556 } 557 Write(const W & msg,void * tag)558 void Write(const W& msg, void* tag) override { 559 GPR_CODEGEN_ASSERT(started_); 560 write_ops_.set_output_tag(tag); 561 // TODO(ctiller): don't assert 562 GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok()); 563 call_.PerformOps(&write_ops_); 564 } 565 Write(const W & msg,::grpc::WriteOptions options,void * tag)566 void Write(const W& msg, ::grpc::WriteOptions options, void* tag) override { 567 GPR_CODEGEN_ASSERT(started_); 568 write_ops_.set_output_tag(tag); 569 if (options.is_last_message()) { 570 options.set_buffer_hint(); 571 write_ops_.ClientSendClose(); 572 } 573 // TODO(ctiller): don't assert 574 GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); 575 call_.PerformOps(&write_ops_); 576 } 577 WritesDone(void * tag)578 void WritesDone(void* tag) override { 579 GPR_CODEGEN_ASSERT(started_); 580 write_ops_.set_output_tag(tag); 581 write_ops_.ClientSendClose(); 582 call_.PerformOps(&write_ops_); 583 } 584 585 /// See the \a ClientAsyncStreamingInterface.Finish method for semantics. 586 /// Side effect 587 /// - the \a ClientContext associated with this call is updated with 588 /// possible initial and trailing metadata sent from the server. Finish(::grpc::Status * status,void * tag)589 void Finish(::grpc::Status* status, void* tag) override { 590 GPR_CODEGEN_ASSERT(started_); 591 finish_ops_.set_output_tag(tag); 592 if (!context_->initial_metadata_received_) { 593 finish_ops_.RecvInitialMetadata(context_); 594 } 595 finish_ops_.ClientRecvStatus(context_, status); 596 call_.PerformOps(&finish_ops_); 597 } 598 599 private: 600 friend class internal::ClientAsyncReaderWriterFactory<W, R>; ClientAsyncReaderWriter(::grpc::internal::Call call,::grpc::ClientContext * context,bool start,void * tag)601 ClientAsyncReaderWriter(::grpc::internal::Call call, 602 ::grpc::ClientContext* context, bool start, void* tag) 603 : context_(context), call_(call), started_(start) { 604 if (start) { 605 StartCallInternal(tag); 606 } else { 607 GPR_CODEGEN_ASSERT(tag == nullptr); 608 } 609 } 610 StartCallInternal(void * tag)611 void StartCallInternal(void* tag) { 612 write_ops_.SendInitialMetadata(&context_->send_initial_metadata_, 613 context_->initial_metadata_flags()); 614 // if corked bit is set in context, we just keep the initial metadata 615 // buffered up to coalesce with later message send. No op is performed. 616 if (!context_->initial_metadata_corked_) { 617 write_ops_.set_output_tag(tag); 618 call_.PerformOps(&write_ops_); 619 } 620 } 621 622 ::grpc::ClientContext* context_; 623 ::grpc::internal::Call call_; 624 bool started_; 625 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> 626 meta_ops_; 627 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, 628 ::grpc::internal::CallOpRecvMessage<R>> 629 read_ops_; 630 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 631 ::grpc::internal::CallOpSendMessage, 632 ::grpc::internal::CallOpClientSendClose> 633 write_ops_; 634 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, 635 ::grpc::internal::CallOpClientRecvStatus> 636 finish_ops_; 637 }; 638 639 template <class W, class R> 640 class ServerAsyncReaderInterface 641 : public ::grpc::internal::ServerAsyncStreamingInterface, 642 public internal::AsyncReaderInterface<R> { 643 public: 644 /// Indicate that the stream is to be finished with a certain status code 645 /// and also send out \a msg response to the client. 646 /// Request notification for when the server has sent the response and the 647 /// appropriate signals to the client to end the call. 648 /// Should not be used concurrently with other operations. 649 /// 650 /// It is appropriate to call this method when: 651 /// * all messages from the client have been received (either known 652 /// implictly, or explicitly because a previous 653 /// \a AsyncReaderInterface::Read operation with a non-ok result, 654 /// e.g., cq->Next(&read_tag, &ok) filled in 'ok' with 'false'). 655 /// 656 /// This operation will end when the server has finished sending out initial 657 /// metadata (if not sent already), response message, and status, or if 658 /// some failure occurred when trying to do so. 659 /// 660 /// gRPC doesn't take ownership or a reference to \a msg or \a status, so it 661 /// is safe to deallocate once Finish returns. 662 /// 663 /// \param[in] tag Tag identifying this request. 664 /// \param[in] status To be sent to the client as the result of this call. 665 /// \param[in] msg To be sent to the client as the response for this call. 666 virtual void Finish(const W& msg, const ::grpc::Status& status, 667 void* tag) = 0; 668 669 /// Indicate that the stream is to be finished with a certain 670 /// non-OK status code. 671 /// Request notification for when the server has sent the appropriate 672 /// signals to the client to end the call. 673 /// Should not be used concurrently with other operations. 674 /// 675 /// This call is meant to end the call with some error, and can be called at 676 /// any point that the server would like to "fail" the call (though note 677 /// this shouldn't be called concurrently with any other "sending" call, like 678 /// \a AsyncWriterInterface::Write). 679 /// 680 /// This operation will end when the server has finished sending out initial 681 /// metadata (if not sent already), and status, or if some failure occurred 682 /// when trying to do so. 683 /// 684 /// gRPC doesn't take ownership or a reference to \a status, so it is safe to 685 /// to deallocate once FinishWithError returns. 686 /// 687 /// \param[in] tag Tag identifying this request. 688 /// \param[in] status To be sent to the client as the result of this call. 689 /// - Note: \a status must have a non-OK code. 690 virtual void FinishWithError(const ::grpc::Status& status, void* tag) = 0; 691 }; 692 693 /// Async server-side API for doing client-streaming RPCs, 694 /// where the incoming message stream from the client has messages of type \a R, 695 /// and the single response message sent from the server is type \a W. 696 template <class W, class R> 697 class ServerAsyncReader final : public ServerAsyncReaderInterface<W, R> { 698 public: ServerAsyncReader(::grpc::ServerContext * ctx)699 explicit ServerAsyncReader(::grpc::ServerContext* ctx) 700 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} 701 702 /// See \a ServerAsyncStreamingInterface::SendInitialMetadata for semantics. 703 /// 704 /// Implicit input parameter: 705 /// - The initial metadata that will be sent to the client from this op will 706 /// be taken from the \a ServerContext associated with the call. SendInitialMetadata(void * tag)707 void SendInitialMetadata(void* tag) override { 708 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); 709 710 meta_ops_.set_output_tag(tag); 711 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 712 ctx_->initial_metadata_flags()); 713 if (ctx_->compression_level_set()) { 714 meta_ops_.set_compression_level(ctx_->compression_level()); 715 } 716 ctx_->sent_initial_metadata_ = true; 717 call_.PerformOps(&meta_ops_); 718 } 719 Read(R * msg,void * tag)720 void Read(R* msg, void* tag) override { 721 read_ops_.set_output_tag(tag); 722 read_ops_.RecvMessage(msg); 723 call_.PerformOps(&read_ops_); 724 } 725 726 /// See the \a ServerAsyncReaderInterface.Read method for semantics 727 /// 728 /// Side effect: 729 /// - also sends initial metadata if not alreay sent. 730 /// - uses the \a ServerContext associated with this call to send possible 731 /// initial and trailing metadata. 732 /// 733 /// Note: \a msg is not sent if \a status has a non-OK code. 734 /// 735 /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it 736 /// is safe to deallocate once Finish returns. Finish(const W & msg,const::grpc::Status & status,void * tag)737 void Finish(const W& msg, const ::grpc::Status& status, void* tag) override { 738 finish_ops_.set_output_tag(tag); 739 if (!ctx_->sent_initial_metadata_) { 740 finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 741 ctx_->initial_metadata_flags()); 742 if (ctx_->compression_level_set()) { 743 finish_ops_.set_compression_level(ctx_->compression_level()); 744 } 745 ctx_->sent_initial_metadata_ = true; 746 } 747 // The response is dropped if the status is not OK. 748 if (status.ok()) { 749 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, 750 finish_ops_.SendMessage(msg)); 751 } else { 752 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status); 753 } 754 call_.PerformOps(&finish_ops_); 755 } 756 757 /// See the \a ServerAsyncReaderInterface.Read method for semantics 758 /// 759 /// Side effect: 760 /// - also sends initial metadata if not alreay sent. 761 /// - uses the \a ServerContext associated with this call to send possible 762 /// initial and trailing metadata. 763 /// 764 /// gRPC doesn't take ownership or a reference to \a status, so it is safe to 765 /// to deallocate once FinishWithError returns. FinishWithError(const::grpc::Status & status,void * tag)766 void FinishWithError(const ::grpc::Status& status, void* tag) override { 767 GPR_CODEGEN_ASSERT(!status.ok()); 768 finish_ops_.set_output_tag(tag); 769 if (!ctx_->sent_initial_metadata_) { 770 finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 771 ctx_->initial_metadata_flags()); 772 if (ctx_->compression_level_set()) { 773 finish_ops_.set_compression_level(ctx_->compression_level()); 774 } 775 ctx_->sent_initial_metadata_ = true; 776 } 777 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status); 778 call_.PerformOps(&finish_ops_); 779 } 780 781 private: BindCall(::grpc::internal::Call * call)782 void BindCall(::grpc::internal::Call* call) override { call_ = *call; } 783 784 ::grpc::internal::Call call_; 785 ::grpc::ServerContext* ctx_; 786 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> 787 meta_ops_; 788 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvMessage<R>> read_ops_; 789 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 790 ::grpc::internal::CallOpSendMessage, 791 ::grpc::internal::CallOpServerSendStatus> 792 finish_ops_; 793 }; 794 795 template <class W> 796 class ServerAsyncWriterInterface 797 : public ::grpc::internal::ServerAsyncStreamingInterface, 798 public internal::AsyncWriterInterface<W> { 799 public: 800 /// Indicate that the stream is to be finished with a certain status code. 801 /// Request notification for when the server has sent the appropriate 802 /// signals to the client to end the call. 803 /// Should not be used concurrently with other operations. 804 /// 805 /// It is appropriate to call this method when either: 806 /// * all messages from the client have been received (either known 807 /// implictly, or explicitly because a previous \a 808 /// AsyncReaderInterface::Read operation with a non-ok 809 /// result (e.g., cq->Next(&read_tag, &ok) filled in 'ok' with 'false'. 810 /// * it is desired to end the call early with some non-OK status code. 811 /// 812 /// This operation will end when the server has finished sending out initial 813 /// metadata (if not sent already), response message, and status, or if 814 /// some failure occurred when trying to do so. 815 /// 816 /// gRPC doesn't take ownership or a reference to \a status, so it is safe to 817 /// to deallocate once Finish returns. 818 /// 819 /// \param[in] tag Tag identifying this request. 820 /// \param[in] status To be sent to the client as the result of this call. 821 virtual void Finish(const ::grpc::Status& status, void* tag) = 0; 822 823 /// Request the writing of \a msg and coalesce it with trailing metadata which 824 /// contains \a status, using WriteOptions options with 825 /// identifying tag \a tag. 826 /// 827 /// WriteAndFinish is equivalent of performing WriteLast and Finish 828 /// in a single step. 829 /// 830 /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it 831 /// is safe to deallocate once WriteAndFinish returns. 832 /// 833 /// \param[in] msg The message to be written. 834 /// \param[in] options The WriteOptions to be used to write this message. 835 /// \param[in] status The Status that server returns to client. 836 /// \param[in] tag The tag identifying the operation. 837 virtual void WriteAndFinish(const W& msg, ::grpc::WriteOptions options, 838 const ::grpc::Status& status, void* tag) = 0; 839 }; 840 841 /// Async server-side API for doing server streaming RPCs, 842 /// where the outgoing message stream from the server has messages of type \a W. 843 template <class W> 844 class ServerAsyncWriter final : public ServerAsyncWriterInterface<W> { 845 public: ServerAsyncWriter(::grpc::ServerContext * ctx)846 explicit ServerAsyncWriter(::grpc::ServerContext* ctx) 847 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} 848 849 /// See \a ServerAsyncStreamingInterface::SendInitialMetadata for semantics. 850 /// 851 /// Implicit input parameter: 852 /// - The initial metadata that will be sent to the client from this op will 853 /// be taken from the \a ServerContext associated with the call. 854 /// 855 /// \param[in] tag Tag identifying this request. SendInitialMetadata(void * tag)856 void SendInitialMetadata(void* tag) override { 857 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); 858 859 meta_ops_.set_output_tag(tag); 860 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 861 ctx_->initial_metadata_flags()); 862 if (ctx_->compression_level_set()) { 863 meta_ops_.set_compression_level(ctx_->compression_level()); 864 } 865 ctx_->sent_initial_metadata_ = true; 866 call_.PerformOps(&meta_ops_); 867 } 868 Write(const W & msg,void * tag)869 void Write(const W& msg, void* tag) override { 870 write_ops_.set_output_tag(tag); 871 EnsureInitialMetadataSent(&write_ops_); 872 // TODO(ctiller): don't assert 873 GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok()); 874 call_.PerformOps(&write_ops_); 875 } 876 Write(const W & msg,::grpc::WriteOptions options,void * tag)877 void Write(const W& msg, ::grpc::WriteOptions options, void* tag) override { 878 write_ops_.set_output_tag(tag); 879 if (options.is_last_message()) { 880 options.set_buffer_hint(); 881 } 882 883 EnsureInitialMetadataSent(&write_ops_); 884 // TODO(ctiller): don't assert 885 GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); 886 call_.PerformOps(&write_ops_); 887 } 888 889 /// See the \a ServerAsyncWriterInterface.WriteAndFinish method for semantics. 890 /// 891 /// Implicit input parameter: 892 /// - the \a ServerContext associated with this call is used 893 /// for sending trailing (and initial) metadata to the client. 894 /// 895 /// Note: \a status must have an OK code. 896 /// 897 /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it 898 /// is safe to deallocate once WriteAndFinish returns. WriteAndFinish(const W & msg,::grpc::WriteOptions options,const::grpc::Status & status,void * tag)899 void WriteAndFinish(const W& msg, ::grpc::WriteOptions options, 900 const ::grpc::Status& status, void* tag) override { 901 write_ops_.set_output_tag(tag); 902 EnsureInitialMetadataSent(&write_ops_); 903 options.set_buffer_hint(); 904 GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); 905 write_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status); 906 call_.PerformOps(&write_ops_); 907 } 908 909 /// See the \a ServerAsyncWriterInterface.Finish method for semantics. 910 /// 911 /// Implicit input parameter: 912 /// - the \a ServerContext associated with this call is used for sending 913 /// trailing (and initial if not already sent) metadata to the client. 914 /// 915 /// Note: there are no restrictions are the code of 916 /// \a status,it may be non-OK 917 /// 918 /// gRPC doesn't take ownership or a reference to \a status, so it is safe to 919 /// to deallocate once Finish returns. Finish(const::grpc::Status & status,void * tag)920 void Finish(const ::grpc::Status& status, void* tag) override { 921 finish_ops_.set_output_tag(tag); 922 EnsureInitialMetadataSent(&finish_ops_); 923 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status); 924 call_.PerformOps(&finish_ops_); 925 } 926 927 private: BindCall(::grpc::internal::Call * call)928 void BindCall(::grpc::internal::Call* call) override { call_ = *call; } 929 930 template <class T> EnsureInitialMetadataSent(T * ops)931 void EnsureInitialMetadataSent(T* ops) { 932 if (!ctx_->sent_initial_metadata_) { 933 ops->SendInitialMetadata(&ctx_->initial_metadata_, 934 ctx_->initial_metadata_flags()); 935 if (ctx_->compression_level_set()) { 936 ops->set_compression_level(ctx_->compression_level()); 937 } 938 ctx_->sent_initial_metadata_ = true; 939 } 940 } 941 942 ::grpc::internal::Call call_; 943 ::grpc::ServerContext* ctx_; 944 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> 945 meta_ops_; 946 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 947 ::grpc::internal::CallOpSendMessage, 948 ::grpc::internal::CallOpServerSendStatus> 949 write_ops_; 950 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 951 ::grpc::internal::CallOpServerSendStatus> 952 finish_ops_; 953 }; 954 955 /// Server-side interface for asynchronous bi-directional streaming. 956 template <class W, class R> 957 class ServerAsyncReaderWriterInterface 958 : public ::grpc::internal::ServerAsyncStreamingInterface, 959 public internal::AsyncWriterInterface<W>, 960 public internal::AsyncReaderInterface<R> { 961 public: 962 /// Indicate that the stream is to be finished with a certain status code. 963 /// Request notification for when the server has sent the appropriate 964 /// signals to the client to end the call. 965 /// Should not be used concurrently with other operations. 966 /// 967 /// It is appropriate to call this method when either: 968 /// * all messages from the client have been received (either known 969 /// implictly, or explicitly because a previous \a 970 /// AsyncReaderInterface::Read operation 971 /// with a non-ok result (e.g., cq->Next(&read_tag, &ok) filled in 'ok' 972 /// with 'false'. 973 /// * it is desired to end the call early with some non-OK status code. 974 /// 975 /// This operation will end when the server has finished sending out initial 976 /// metadata (if not sent already), response message, and status, or if some 977 /// failure occurred when trying to do so. 978 /// 979 /// gRPC doesn't take ownership or a reference to \a status, so it is safe to 980 /// to deallocate once Finish returns. 981 /// 982 /// \param[in] tag Tag identifying this request. 983 /// \param[in] status To be sent to the client as the result of this call. 984 virtual void Finish(const ::grpc::Status& status, void* tag) = 0; 985 986 /// Request the writing of \a msg and coalesce it with trailing metadata which 987 /// contains \a status, using WriteOptions options with 988 /// identifying tag \a tag. 989 /// 990 /// WriteAndFinish is equivalent of performing WriteLast and Finish in a 991 /// single step. 992 /// 993 /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it 994 /// is safe to deallocate once WriteAndFinish returns. 995 /// 996 /// \param[in] msg The message to be written. 997 /// \param[in] options The WriteOptions to be used to write this message. 998 /// \param[in] status The Status that server returns to client. 999 /// \param[in] tag The tag identifying the operation. 1000 virtual void WriteAndFinish(const W& msg, ::grpc::WriteOptions options, 1001 const ::grpc::Status& status, void* tag) = 0; 1002 }; 1003 1004 /// Async server-side API for doing bidirectional streaming RPCs, 1005 /// where the incoming message stream coming from the client has messages of 1006 /// type \a R, and the outgoing message stream coming from the server has 1007 /// messages of type \a W. 1008 template <class W, class R> 1009 class ServerAsyncReaderWriter final 1010 : public ServerAsyncReaderWriterInterface<W, R> { 1011 public: ServerAsyncReaderWriter(::grpc::ServerContext * ctx)1012 explicit ServerAsyncReaderWriter(::grpc::ServerContext* ctx) 1013 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {} 1014 1015 /// See \a ServerAsyncStreamingInterface::SendInitialMetadata for semantics. 1016 /// 1017 /// Implicit input parameter: 1018 /// - The initial metadata that will be sent to the client from this op will 1019 /// be taken from the \a ServerContext associated with the call. 1020 /// 1021 /// \param[in] tag Tag identifying this request. SendInitialMetadata(void * tag)1022 void SendInitialMetadata(void* tag) override { 1023 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); 1024 1025 meta_ops_.set_output_tag(tag); 1026 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 1027 ctx_->initial_metadata_flags()); 1028 if (ctx_->compression_level_set()) { 1029 meta_ops_.set_compression_level(ctx_->compression_level()); 1030 } 1031 ctx_->sent_initial_metadata_ = true; 1032 call_.PerformOps(&meta_ops_); 1033 } 1034 Read(R * msg,void * tag)1035 void Read(R* msg, void* tag) override { 1036 read_ops_.set_output_tag(tag); 1037 read_ops_.RecvMessage(msg); 1038 call_.PerformOps(&read_ops_); 1039 } 1040 Write(const W & msg,void * tag)1041 void Write(const W& msg, void* tag) override { 1042 write_ops_.set_output_tag(tag); 1043 EnsureInitialMetadataSent(&write_ops_); 1044 // TODO(ctiller): don't assert 1045 GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok()); 1046 call_.PerformOps(&write_ops_); 1047 } 1048 Write(const W & msg,::grpc::WriteOptions options,void * tag)1049 void Write(const W& msg, ::grpc::WriteOptions options, void* tag) override { 1050 write_ops_.set_output_tag(tag); 1051 if (options.is_last_message()) { 1052 options.set_buffer_hint(); 1053 } 1054 EnsureInitialMetadataSent(&write_ops_); 1055 GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); 1056 call_.PerformOps(&write_ops_); 1057 } 1058 1059 /// See the \a ServerAsyncReaderWriterInterface.WriteAndFinish 1060 /// method for semantics. 1061 /// 1062 /// Implicit input parameter: 1063 /// - the \a ServerContext associated with this call is used 1064 /// for sending trailing (and initial) metadata to the client. 1065 /// 1066 /// Note: \a status must have an OK code. 1067 // 1068 /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it 1069 /// is safe to deallocate once WriteAndFinish returns. WriteAndFinish(const W & msg,::grpc::WriteOptions options,const::grpc::Status & status,void * tag)1070 void WriteAndFinish(const W& msg, ::grpc::WriteOptions options, 1071 const ::grpc::Status& status, void* tag) override { 1072 write_ops_.set_output_tag(tag); 1073 EnsureInitialMetadataSent(&write_ops_); 1074 options.set_buffer_hint(); 1075 GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok()); 1076 write_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status); 1077 call_.PerformOps(&write_ops_); 1078 } 1079 1080 /// See the \a ServerAsyncReaderWriterInterface.Finish method for semantics. 1081 /// 1082 /// Implicit input parameter: 1083 /// - the \a ServerContext associated with this call is used for sending 1084 /// trailing (and initial if not already sent) metadata to the client. 1085 /// 1086 /// Note: there are no restrictions are the code of \a status, 1087 /// it may be non-OK 1088 // 1089 /// gRPC doesn't take ownership or a reference to \a status, so it is safe to 1090 /// to deallocate once Finish returns. Finish(const::grpc::Status & status,void * tag)1091 void Finish(const ::grpc::Status& status, void* tag) override { 1092 finish_ops_.set_output_tag(tag); 1093 EnsureInitialMetadataSent(&finish_ops_); 1094 1095 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status); 1096 call_.PerformOps(&finish_ops_); 1097 } 1098 1099 private: 1100 friend class ::grpc::Server; 1101 BindCall(::grpc::internal::Call * call)1102 void BindCall(::grpc::internal::Call* call) override { call_ = *call; } 1103 1104 template <class T> EnsureInitialMetadataSent(T * ops)1105 void EnsureInitialMetadataSent(T* ops) { 1106 if (!ctx_->sent_initial_metadata_) { 1107 ops->SendInitialMetadata(&ctx_->initial_metadata_, 1108 ctx_->initial_metadata_flags()); 1109 if (ctx_->compression_level_set()) { 1110 ops->set_compression_level(ctx_->compression_level()); 1111 } 1112 ctx_->sent_initial_metadata_ = true; 1113 } 1114 } 1115 1116 ::grpc::internal::Call call_; 1117 ::grpc::ServerContext* ctx_; 1118 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> 1119 meta_ops_; 1120 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvMessage<R>> read_ops_; 1121 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 1122 ::grpc::internal::CallOpSendMessage, 1123 ::grpc::internal::CallOpServerSendStatus> 1124 write_ops_; 1125 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 1126 ::grpc::internal::CallOpServerSendStatus> 1127 finish_ops_; 1128 }; 1129 1130 } // namespace grpc 1131 #endif // GRPCPP_IMPL_CODEGEN_ASYNC_STREAM_H 1132