1 /* 2 * 3 * Copyright 2015 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #ifndef GRPCPP_IMPL_CODEGEN_SYNC_STREAM_H 20 #define GRPCPP_IMPL_CODEGEN_SYNC_STREAM_H 21 22 #include <grpcpp/impl/codegen/call.h> 23 #include <grpcpp/impl/codegen/channel_interface.h> 24 #include <grpcpp/impl/codegen/client_context.h> 25 #include <grpcpp/impl/codegen/completion_queue.h> 26 #include <grpcpp/impl/codegen/core_codegen_interface.h> 27 #include <grpcpp/impl/codegen/server_context.h> 28 #include <grpcpp/impl/codegen/service_type.h> 29 #include <grpcpp/impl/codegen/status.h> 30 31 namespace grpc { 32 33 namespace internal { 34 /// Common interface for all synchronous client side streaming. 35 class ClientStreamingInterface { 36 public: ~ClientStreamingInterface()37 virtual ~ClientStreamingInterface() {} 38 39 /// Block waiting until the stream finishes and a final status of the call is 40 /// available. 41 /// 42 /// It is appropriate to call this method when both: 43 /// * the calling code (client-side) has no more message to send 44 /// (this can be declared implicitly by calling this method, or 45 /// explicitly through an earlier call to <i>WritesDone</i> method of the 46 /// class in use, e.g. \a ClientWriterInterface::WritesDone or 47 /// \a ClientReaderWriterInterface::WritesDone). 48 /// * there are no more messages to be received from the server (which can 49 /// be known implicitly, or explicitly from an earlier call to \a 50 /// ReaderInterface::Read that returned "false"). 51 /// 52 /// This function will return either: 53 /// - when all incoming messages have been read and the server has 54 /// returned status. 55 /// - when the server has returned a non-OK status. 56 /// - OR when the call failed for some reason and the library generated a 57 /// status. 58 /// 59 /// Return values: 60 /// - \a Status contains the status code, message and details for the call 61 /// - the \a ClientContext associated with this call is updated with 62 /// possible trailing metadata sent from the server. 63 virtual Status Finish() = 0; 64 }; 65 66 /// Common interface for all synchronous server side streaming. 67 class ServerStreamingInterface { 68 public: ~ServerStreamingInterface()69 virtual ~ServerStreamingInterface() {} 70 71 /// Block to send initial metadata to client. 72 /// This call is optional, but if it is used, it cannot be used concurrently 73 /// with or after the \a Finish method. 74 /// 75 /// The initial metadata that will be sent to the client will be 76 /// taken from the \a ServerContext associated with the call. 77 virtual void SendInitialMetadata() = 0; 78 }; 79 80 /// An interface that yields a sequence of messages of type \a R. 81 template <class R> 82 class ReaderInterface { 83 public: ~ReaderInterface()84 virtual ~ReaderInterface() {} 85 86 /// Get an upper bound on the next message size available for reading on this 87 /// stream. 88 virtual bool NextMessageSize(uint32_t* sz) = 0; 89 90 /// Block to read a message and parse to \a msg. Returns \a true on success. 91 /// This is thread-safe with respect to \a Write or \WritesDone methods on 92 /// the same stream. It should not be called concurrently with another \a 93 /// Read on the same stream as the order of delivery will not be defined. 94 /// 95 /// \param[out] msg The read message. 96 /// 97 /// \return \a false when there will be no more incoming messages, either 98 /// because the other side has called \a WritesDone() or the stream has failed 99 /// (or been cancelled). 100 virtual bool Read(R* msg) = 0; 101 }; 102 103 /// An interface that can be fed a sequence of messages of type \a W. 104 template <class W> 105 class WriterInterface { 106 public: ~WriterInterface()107 virtual ~WriterInterface() {} 108 109 /// Block to write \a msg to the stream with WriteOptions \a options. 110 /// This is thread-safe with respect to \a ReaderInterface::Read 111 /// 112 /// \param msg The message to be written to the stream. 113 /// \param options The WriteOptions affecting the write operation. 114 /// 115 /// \return \a true on success, \a false when the stream has been closed. 116 virtual bool Write(const W& msg, WriteOptions options) = 0; 117 118 /// Block to write \a msg to the stream with default write options. 119 /// This is thread-safe with respect to \a ReaderInterface::Read 120 /// 121 /// \param msg The message to be written to the stream. 122 /// 123 /// \return \a true on success, \a false when the stream has been closed. Write(const W & msg)124 inline bool Write(const W& msg) { return Write(msg, WriteOptions()); } 125 126 /// Write \a msg and coalesce it with the writing of trailing metadata, using 127 /// WriteOptions \a options. 128 /// 129 /// For client, WriteLast is equivalent of performing Write and WritesDone in 130 /// a single step. \a msg and trailing metadata are coalesced and sent on wire 131 /// by calling this function. For server, WriteLast buffers the \a msg. 132 /// The writing of \a msg is held until the service handler returns, 133 /// where \a msg and trailing metadata are coalesced and sent on wire. 134 /// Note that WriteLast can only buffer \a msg up to the flow control window 135 /// size. If \a msg size is larger than the window size, it will be sent on 136 /// wire without buffering. 137 /// 138 /// \param[in] msg The message to be written to the stream. 139 /// \param[in] options The WriteOptions to be used to write this message. WriteLast(const W & msg,WriteOptions options)140 void WriteLast(const W& msg, WriteOptions options) { 141 Write(msg, options.set_last_message()); 142 } 143 }; 144 145 } // namespace internal 146 147 /// Client-side interface for streaming reads of message of type \a R. 148 template <class R> 149 class ClientReaderInterface : public internal::ClientStreamingInterface, 150 public internal::ReaderInterface<R> { 151 public: 152 /// Block to wait for initial metadata from server. The received metadata 153 /// can only be accessed after this call returns. Should only be called before 154 /// the first read. Calling this method is optional, and if it is not called 155 /// the metadata will be available in ClientContext after the first read. 156 virtual void WaitForInitialMetadata() = 0; 157 }; 158 159 namespace internal { 160 template <class R> 161 class ClientReaderFactory { 162 public: 163 template <class W> Create(ChannelInterface * channel,const::grpc::internal::RpcMethod & method,ClientContext * context,const W & request)164 static ClientReader<R>* Create(ChannelInterface* channel, 165 const ::grpc::internal::RpcMethod& method, 166 ClientContext* context, const W& request) { 167 return new ClientReader<R>(channel, method, context, request); 168 } 169 }; 170 } // namespace internal 171 172 /// Synchronous (blocking) client-side API for doing server-streaming RPCs, 173 /// where the stream of messages coming from the server has messages 174 /// of type \a R. 175 template <class R> 176 class ClientReader final : public ClientReaderInterface<R> { 177 public: 178 /// See the \a ClientStreamingInterface.WaitForInitialMetadata method for 179 /// semantics. 180 /// 181 // Side effect: 182 /// Once complete, the initial metadata read from 183 /// the server will be accessable through the \a ClientContext used to 184 /// construct this object. WaitForInitialMetadata()185 void WaitForInitialMetadata() override { 186 GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); 187 188 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> 189 ops; 190 ops.RecvInitialMetadata(context_); 191 call_.PerformOps(&ops); 192 cq_.Pluck(&ops); /// status ignored 193 } 194 NextMessageSize(uint32_t * sz)195 bool NextMessageSize(uint32_t* sz) override { 196 *sz = call_.max_receive_message_size(); 197 return true; 198 } 199 200 /// See the \a ReaderInterface.Read method for semantics. 201 /// Side effect: 202 /// This also receives initial metadata from the server, if not 203 /// already received (if initial metadata is received, it can be then 204 /// accessed through the \a ClientContext associated with this call). Read(R * msg)205 bool Read(R* msg) override { 206 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, 207 ::grpc::internal::CallOpRecvMessage<R>> 208 ops; 209 if (!context_->initial_metadata_received_) { 210 ops.RecvInitialMetadata(context_); 211 } 212 ops.RecvMessage(msg); 213 call_.PerformOps(&ops); 214 return cq_.Pluck(&ops) && ops.got_message; 215 } 216 217 /// See the \a ClientStreamingInterface.Finish method for semantics. 218 /// 219 /// Side effect: 220 /// The \a ClientContext associated with this call is updated with 221 /// possible metadata received from the server. Finish()222 Status Finish() override { 223 ::grpc::internal::CallOpSet<::grpc::internal::CallOpClientRecvStatus> ops; 224 Status status; 225 ops.ClientRecvStatus(context_, &status); 226 call_.PerformOps(&ops); 227 GPR_CODEGEN_ASSERT(cq_.Pluck(&ops)); 228 return status; 229 } 230 231 private: 232 friend class internal::ClientReaderFactory<R>; 233 ClientContext* context_; 234 CompletionQueue cq_; 235 ::grpc::internal::Call call_; 236 237 /// Block to create a stream and write the initial metadata and \a request 238 /// out. Note that \a context will be used to fill in custom initial 239 /// metadata used to send to the server when starting the call. 240 template <class W> ClientReader(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,ClientContext * context,const W & request)241 ClientReader(::grpc::ChannelInterface* channel, 242 const ::grpc::internal::RpcMethod& method, 243 ClientContext* context, const W& request) 244 : context_(context), 245 cq_(grpc_completion_queue_attributes{ 246 GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING, 247 nullptr}), // Pluckable cq 248 call_(channel->CreateCall(method, context, &cq_)) { 249 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 250 ::grpc::internal::CallOpSendMessage, 251 ::grpc::internal::CallOpClientSendClose> 252 ops; 253 ops.SendInitialMetadata(context->send_initial_metadata_, 254 context->initial_metadata_flags()); 255 // TODO(ctiller): don't assert 256 GPR_CODEGEN_ASSERT(ops.SendMessage(request).ok()); 257 ops.ClientSendClose(); 258 call_.PerformOps(&ops); 259 cq_.Pluck(&ops); 260 } 261 }; 262 263 /// Client-side interface for streaming writes of message type \a W. 264 template <class W> 265 class ClientWriterInterface : public internal::ClientStreamingInterface, 266 public internal::WriterInterface<W> { 267 public: 268 /// Half close writing from the client. (signal that the stream of messages 269 /// coming from the client is complete). 270 /// Blocks until currently-pending writes are completed. 271 /// Thread safe with respect to \a ReaderInterface::Read operations only 272 /// 273 /// \return Whether the writes were successful. 274 virtual bool WritesDone() = 0; 275 }; 276 277 namespace internal { 278 template <class W> 279 class ClientWriterFactory { 280 public: 281 template <class R> Create(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,ClientContext * context,R * response)282 static ClientWriter<W>* Create(::grpc::ChannelInterface* channel, 283 const ::grpc::internal::RpcMethod& method, 284 ClientContext* context, R* response) { 285 return new ClientWriter<W>(channel, method, context, response); 286 } 287 }; 288 } // namespace internal 289 290 /// Synchronous (blocking) client-side API for doing client-streaming RPCs, 291 /// where the outgoing message stream coming from the client has messages of 292 /// type \a W. 293 template <class W> 294 class ClientWriter : public ClientWriterInterface<W> { 295 public: 296 /// See the \a ClientStreamingInterface.WaitForInitialMetadata method for 297 /// semantics. 298 /// 299 // Side effect: 300 /// Once complete, the initial metadata read from the server will be 301 /// accessable through the \a ClientContext used to construct this object. WaitForInitialMetadata()302 void WaitForInitialMetadata() { 303 GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); 304 305 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> 306 ops; 307 ops.RecvInitialMetadata(context_); 308 call_.PerformOps(&ops); 309 cq_.Pluck(&ops); // status ignored 310 } 311 312 /// See the WriterInterface.Write(const W& msg, WriteOptions options) method 313 /// for semantics. 314 /// 315 /// Side effect: 316 /// Also sends initial metadata if not already sent (using the 317 /// \a ClientContext associated with this call). 318 using ::grpc::internal::WriterInterface<W>::Write; Write(const W & msg,WriteOptions options)319 bool Write(const W& msg, WriteOptions options) override { 320 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 321 ::grpc::internal::CallOpSendMessage, 322 ::grpc::internal::CallOpClientSendClose> 323 ops; 324 325 if (options.is_last_message()) { 326 options.set_buffer_hint(); 327 ops.ClientSendClose(); 328 } 329 if (context_->initial_metadata_corked_) { 330 ops.SendInitialMetadata(context_->send_initial_metadata_, 331 context_->initial_metadata_flags()); 332 context_->set_initial_metadata_corked(false); 333 } 334 if (!ops.SendMessage(msg, options).ok()) { 335 return false; 336 } 337 338 call_.PerformOps(&ops); 339 return cq_.Pluck(&ops); 340 } 341 WritesDone()342 bool WritesDone() override { 343 ::grpc::internal::CallOpSet<::grpc::internal::CallOpClientSendClose> ops; 344 ops.ClientSendClose(); 345 call_.PerformOps(&ops); 346 return cq_.Pluck(&ops); 347 } 348 349 /// See the ClientStreamingInterface.Finish method for semantics. 350 /// Side effects: 351 /// - Also receives initial metadata if not already received. 352 /// - Attempts to fill in the \a response parameter passed 353 /// to the constructor of this instance with the response 354 /// message from the server. Finish()355 Status Finish() override { 356 Status status; 357 if (!context_->initial_metadata_received_) { 358 finish_ops_.RecvInitialMetadata(context_); 359 } 360 finish_ops_.ClientRecvStatus(context_, &status); 361 call_.PerformOps(&finish_ops_); 362 GPR_CODEGEN_ASSERT(cq_.Pluck(&finish_ops_)); 363 return status; 364 } 365 366 private: 367 friend class internal::ClientWriterFactory<W>; 368 369 /// Block to create a stream (i.e. send request headers and other initial 370 /// metadata to the server). Note that \a context will be used to fill 371 /// in custom initial metadata. \a response will be filled in with the 372 /// single expected response message from the server upon a successful 373 /// call to the \a Finish method of this instance. 374 template <class R> ClientWriter(ChannelInterface * channel,const::grpc::internal::RpcMethod & method,ClientContext * context,R * response)375 ClientWriter(ChannelInterface* channel, 376 const ::grpc::internal::RpcMethod& method, 377 ClientContext* context, R* response) 378 : context_(context), 379 cq_(grpc_completion_queue_attributes{ 380 GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING, 381 nullptr}), // Pluckable cq 382 call_(channel->CreateCall(method, context, &cq_)) { 383 finish_ops_.RecvMessage(response); 384 finish_ops_.AllowNoMessage(); 385 386 if (!context_->initial_metadata_corked_) { 387 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> 388 ops; 389 ops.SendInitialMetadata(context->send_initial_metadata_, 390 context->initial_metadata_flags()); 391 call_.PerformOps(&ops); 392 cq_.Pluck(&ops); 393 } 394 } 395 396 ClientContext* context_; 397 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, 398 ::grpc::internal::CallOpGenericRecvMessage, 399 ::grpc::internal::CallOpClientRecvStatus> 400 finish_ops_; 401 CompletionQueue cq_; 402 ::grpc::internal::Call call_; 403 }; 404 405 /// Client-side interface for bi-directional streaming with 406 /// client-to-server stream messages of type \a W and 407 /// server-to-client stream messages of type \a R. 408 template <class W, class R> 409 class ClientReaderWriterInterface : public internal::ClientStreamingInterface, 410 public internal::WriterInterface<W>, 411 public internal::ReaderInterface<R> { 412 public: 413 /// Block to wait for initial metadata from server. The received metadata 414 /// can only be accessed after this call returns. Should only be called before 415 /// the first read. Calling this method is optional, and if it is not called 416 /// the metadata will be available in ClientContext after the first read. 417 virtual void WaitForInitialMetadata() = 0; 418 419 /// Half close writing from the client. (signal that the stream of messages 420 /// coming from the clinet is complete). 421 /// Blocks until currently-pending writes are completed. 422 /// Thread-safe with respect to \a ReaderInterface::Read 423 /// 424 /// \return Whether the writes were successful. 425 virtual bool WritesDone() = 0; 426 }; 427 428 namespace internal { 429 template <class W, class R> 430 class ClientReaderWriterFactory { 431 public: Create(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,ClientContext * context)432 static ClientReaderWriter<W, R>* Create( 433 ::grpc::ChannelInterface* channel, 434 const ::grpc::internal::RpcMethod& method, ClientContext* context) { 435 return new ClientReaderWriter<W, R>(channel, method, context); 436 } 437 }; 438 } // namespace internal 439 440 /// Synchronous (blocking) client-side API for bi-directional streaming RPCs, 441 /// where the outgoing message stream coming from the client has messages of 442 /// type \a W, and the incoming messages stream coming from the server has 443 /// messages of type \a R. 444 template <class W, class R> 445 class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> { 446 public: 447 /// Block waiting to read initial metadata from the server. 448 /// This call is optional, but if it is used, it cannot be used concurrently 449 /// with or after the \a Finish method. 450 /// 451 /// Once complete, the initial metadata read from the server will be 452 /// accessable through the \a ClientContext used to construct this object. WaitForInitialMetadata()453 void WaitForInitialMetadata() override { 454 GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); 455 456 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> 457 ops; 458 ops.RecvInitialMetadata(context_); 459 call_.PerformOps(&ops); 460 cq_.Pluck(&ops); // status ignored 461 } 462 NextMessageSize(uint32_t * sz)463 bool NextMessageSize(uint32_t* sz) override { 464 *sz = call_.max_receive_message_size(); 465 return true; 466 } 467 468 /// See the \a ReaderInterface.Read method for semantics. 469 /// Side effect: 470 /// Also receives initial metadata if not already received (updates the \a 471 /// ClientContext associated with this call in that case). Read(R * msg)472 bool Read(R* msg) override { 473 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, 474 ::grpc::internal::CallOpRecvMessage<R>> 475 ops; 476 if (!context_->initial_metadata_received_) { 477 ops.RecvInitialMetadata(context_); 478 } 479 ops.RecvMessage(msg); 480 call_.PerformOps(&ops); 481 return cq_.Pluck(&ops) && ops.got_message; 482 } 483 484 /// See the \a WriterInterface.Write method for semantics. 485 /// 486 /// Side effect: 487 /// Also sends initial metadata if not already sent (using the 488 /// \a ClientContext associated with this call to fill in values). 489 using ::grpc::internal::WriterInterface<W>::Write; Write(const W & msg,WriteOptions options)490 bool Write(const W& msg, WriteOptions options) override { 491 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 492 ::grpc::internal::CallOpSendMessage, 493 ::grpc::internal::CallOpClientSendClose> 494 ops; 495 496 if (options.is_last_message()) { 497 options.set_buffer_hint(); 498 ops.ClientSendClose(); 499 } 500 if (context_->initial_metadata_corked_) { 501 ops.SendInitialMetadata(context_->send_initial_metadata_, 502 context_->initial_metadata_flags()); 503 context_->set_initial_metadata_corked(false); 504 } 505 if (!ops.SendMessage(msg, options).ok()) { 506 return false; 507 } 508 509 call_.PerformOps(&ops); 510 return cq_.Pluck(&ops); 511 } 512 WritesDone()513 bool WritesDone() override { 514 ::grpc::internal::CallOpSet<::grpc::internal::CallOpClientSendClose> ops; 515 ops.ClientSendClose(); 516 call_.PerformOps(&ops); 517 return cq_.Pluck(&ops); 518 } 519 520 /// See the ClientStreamingInterface.Finish method for semantics. 521 /// 522 /// Side effect: 523 /// - the \a ClientContext associated with this call is updated with 524 /// possible trailing metadata sent from the server. Finish()525 Status Finish() override { 526 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, 527 ::grpc::internal::CallOpClientRecvStatus> 528 ops; 529 if (!context_->initial_metadata_received_) { 530 ops.RecvInitialMetadata(context_); 531 } 532 Status status; 533 ops.ClientRecvStatus(context_, &status); 534 call_.PerformOps(&ops); 535 GPR_CODEGEN_ASSERT(cq_.Pluck(&ops)); 536 return status; 537 } 538 539 private: 540 friend class internal::ClientReaderWriterFactory<W, R>; 541 542 ClientContext* context_; 543 CompletionQueue cq_; 544 ::grpc::internal::Call call_; 545 546 /// Block to create a stream and write the initial metadata and \a request 547 /// out. Note that \a context will be used to fill in custom initial metadata 548 /// used to send to the server when starting the call. ClientReaderWriter(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,ClientContext * context)549 ClientReaderWriter(::grpc::ChannelInterface* channel, 550 const ::grpc::internal::RpcMethod& method, 551 ClientContext* context) 552 : context_(context), 553 cq_(grpc_completion_queue_attributes{ 554 GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING, 555 nullptr}), // Pluckable cq 556 call_(channel->CreateCall(method, context, &cq_)) { 557 if (!context_->initial_metadata_corked_) { 558 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> 559 ops; 560 ops.SendInitialMetadata(context->send_initial_metadata_, 561 context->initial_metadata_flags()); 562 call_.PerformOps(&ops); 563 cq_.Pluck(&ops); 564 } 565 } 566 }; 567 568 /// Server-side interface for streaming reads of message of type \a R. 569 template <class R> 570 class ServerReaderInterface : public internal::ServerStreamingInterface, 571 public internal::ReaderInterface<R> {}; 572 573 /// Synchronous (blocking) server-side API for doing client-streaming RPCs, 574 /// where the incoming message stream coming from the client has messages of 575 /// type \a R. 576 template <class R> 577 class ServerReader final : public ServerReaderInterface<R> { 578 public: 579 /// See the \a ServerStreamingInterface.SendInitialMetadata method 580 /// for semantics. Note that initial metadata will be affected by the 581 /// \a ServerContext associated with this call. SendInitialMetadata()582 void SendInitialMetadata() override { 583 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); 584 585 internal::CallOpSet<internal::CallOpSendInitialMetadata> ops; 586 ops.SendInitialMetadata(ctx_->initial_metadata_, 587 ctx_->initial_metadata_flags()); 588 if (ctx_->compression_level_set()) { 589 ops.set_compression_level(ctx_->compression_level()); 590 } 591 ctx_->sent_initial_metadata_ = true; 592 call_->PerformOps(&ops); 593 call_->cq()->Pluck(&ops); 594 } 595 NextMessageSize(uint32_t * sz)596 bool NextMessageSize(uint32_t* sz) override { 597 *sz = call_->max_receive_message_size(); 598 return true; 599 } 600 Read(R * msg)601 bool Read(R* msg) override { 602 internal::CallOpSet<internal::CallOpRecvMessage<R>> ops; 603 ops.RecvMessage(msg); 604 call_->PerformOps(&ops); 605 return call_->cq()->Pluck(&ops) && ops.got_message; 606 } 607 608 private: 609 internal::Call* const call_; 610 ServerContext* const ctx_; 611 612 template <class ServiceType, class RequestType, class ResponseType> 613 friend class internal::ClientStreamingHandler; 614 ServerReader(internal::Call * call,ServerContext * ctx)615 ServerReader(internal::Call* call, ServerContext* ctx) 616 : call_(call), ctx_(ctx) {} 617 }; 618 619 /// Server-side interface for streaming writes of message of type \a W. 620 template <class W> 621 class ServerWriterInterface : public internal::ServerStreamingInterface, 622 public internal::WriterInterface<W> {}; 623 624 /// Synchronous (blocking) server-side API for doing for doing a 625 /// server-streaming RPCs, where the outgoing message stream coming from the 626 /// server has messages of type \a W. 627 template <class W> 628 class ServerWriter final : public ServerWriterInterface<W> { 629 public: 630 /// See the \a ServerStreamingInterface.SendInitialMetadata method 631 /// for semantics. 632 /// Note that initial metadata will be affected by the 633 /// \a ServerContext associated with this call. SendInitialMetadata()634 void SendInitialMetadata() override { 635 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); 636 637 internal::CallOpSet<internal::CallOpSendInitialMetadata> ops; 638 ops.SendInitialMetadata(ctx_->initial_metadata_, 639 ctx_->initial_metadata_flags()); 640 if (ctx_->compression_level_set()) { 641 ops.set_compression_level(ctx_->compression_level()); 642 } 643 ctx_->sent_initial_metadata_ = true; 644 call_->PerformOps(&ops); 645 call_->cq()->Pluck(&ops); 646 } 647 648 /// See the \a WriterInterface.Write method for semantics. 649 /// 650 /// Side effect: 651 /// Also sends initial metadata if not already sent (using the 652 /// \a ClientContext associated with this call to fill in values). 653 using internal::WriterInterface<W>::Write; Write(const W & msg,WriteOptions options)654 bool Write(const W& msg, WriteOptions options) override { 655 if (options.is_last_message()) { 656 options.set_buffer_hint(); 657 } 658 659 if (!ctx_->pending_ops_.SendMessage(msg, options).ok()) { 660 return false; 661 } 662 if (!ctx_->sent_initial_metadata_) { 663 ctx_->pending_ops_.SendInitialMetadata(ctx_->initial_metadata_, 664 ctx_->initial_metadata_flags()); 665 if (ctx_->compression_level_set()) { 666 ctx_->pending_ops_.set_compression_level(ctx_->compression_level()); 667 } 668 ctx_->sent_initial_metadata_ = true; 669 } 670 call_->PerformOps(&ctx_->pending_ops_); 671 // if this is the last message we defer the pluck until AFTER we start 672 // the trailing md op. This prevents hangs. See 673 // https://github.com/grpc/grpc/issues/11546 674 if (options.is_last_message()) { 675 ctx_->has_pending_ops_ = true; 676 return true; 677 } 678 ctx_->has_pending_ops_ = false; 679 return call_->cq()->Pluck(&ctx_->pending_ops_); 680 } 681 682 private: 683 internal::Call* const call_; 684 ServerContext* const ctx_; 685 686 template <class ServiceType, class RequestType, class ResponseType> 687 friend class internal::ServerStreamingHandler; 688 ServerWriter(internal::Call * call,ServerContext * ctx)689 ServerWriter(internal::Call* call, ServerContext* ctx) 690 : call_(call), ctx_(ctx) {} 691 }; 692 693 /// Server-side interface for bi-directional streaming. 694 template <class W, class R> 695 class ServerReaderWriterInterface : public internal::ServerStreamingInterface, 696 public internal::WriterInterface<W>, 697 public internal::ReaderInterface<R> {}; 698 699 /// Actual implementation of bi-directional streaming 700 namespace internal { 701 template <class W, class R> 702 class ServerReaderWriterBody final { 703 public: ServerReaderWriterBody(Call * call,ServerContext * ctx)704 ServerReaderWriterBody(Call* call, ServerContext* ctx) 705 : call_(call), ctx_(ctx) {} 706 SendInitialMetadata()707 void SendInitialMetadata() { 708 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); 709 710 CallOpSet<CallOpSendInitialMetadata> ops; 711 ops.SendInitialMetadata(ctx_->initial_metadata_, 712 ctx_->initial_metadata_flags()); 713 if (ctx_->compression_level_set()) { 714 ops.set_compression_level(ctx_->compression_level()); 715 } 716 ctx_->sent_initial_metadata_ = true; 717 call_->PerformOps(&ops); 718 call_->cq()->Pluck(&ops); 719 } 720 NextMessageSize(uint32_t * sz)721 bool NextMessageSize(uint32_t* sz) { 722 *sz = call_->max_receive_message_size(); 723 return true; 724 } 725 Read(R * msg)726 bool Read(R* msg) { 727 CallOpSet<CallOpRecvMessage<R>> ops; 728 ops.RecvMessage(msg); 729 call_->PerformOps(&ops); 730 return call_->cq()->Pluck(&ops) && ops.got_message; 731 } 732 Write(const W & msg,WriteOptions options)733 bool Write(const W& msg, WriteOptions options) { 734 if (options.is_last_message()) { 735 options.set_buffer_hint(); 736 } 737 if (!ctx_->pending_ops_.SendMessage(msg, options).ok()) { 738 return false; 739 } 740 if (!ctx_->sent_initial_metadata_) { 741 ctx_->pending_ops_.SendInitialMetadata(ctx_->initial_metadata_, 742 ctx_->initial_metadata_flags()); 743 if (ctx_->compression_level_set()) { 744 ctx_->pending_ops_.set_compression_level(ctx_->compression_level()); 745 } 746 ctx_->sent_initial_metadata_ = true; 747 } 748 call_->PerformOps(&ctx_->pending_ops_); 749 // if this is the last message we defer the pluck until AFTER we start 750 // the trailing md op. This prevents hangs. See 751 // https://github.com/grpc/grpc/issues/11546 752 if (options.is_last_message()) { 753 ctx_->has_pending_ops_ = true; 754 return true; 755 } 756 ctx_->has_pending_ops_ = false; 757 return call_->cq()->Pluck(&ctx_->pending_ops_); 758 } 759 760 private: 761 Call* const call_; 762 ServerContext* const ctx_; 763 }; 764 765 } // namespace internal 766 767 /// Synchronous (blocking) server-side API for a bidirectional 768 /// streaming call, where the incoming message stream coming from the client has 769 /// messages of type \a R, and the outgoing message streaming coming from 770 /// the server has messages of type \a W. 771 template <class W, class R> 772 class ServerReaderWriter final : public ServerReaderWriterInterface<W, R> { 773 public: 774 /// See the \a ServerStreamingInterface.SendInitialMetadata method 775 /// for semantics. Note that initial metadata will be affected by the 776 /// \a ServerContext associated with this call. SendInitialMetadata()777 void SendInitialMetadata() override { body_.SendInitialMetadata(); } 778 NextMessageSize(uint32_t * sz)779 bool NextMessageSize(uint32_t* sz) override { 780 return body_.NextMessageSize(sz); 781 } 782 Read(R * msg)783 bool Read(R* msg) override { return body_.Read(msg); } 784 785 /// See the \a WriterInterface.Write(const W& msg, WriteOptions options) 786 /// method for semantics. 787 /// Side effect: 788 /// Also sends initial metadata if not already sent (using the \a 789 /// ServerContext associated with this call). 790 using internal::WriterInterface<W>::Write; Write(const W & msg,WriteOptions options)791 bool Write(const W& msg, WriteOptions options) override { 792 return body_.Write(msg, options); 793 } 794 795 private: 796 internal::ServerReaderWriterBody<W, R> body_; 797 798 friend class internal::TemplatedBidiStreamingHandler<ServerReaderWriter<W, R>, 799 false>; ServerReaderWriter(internal::Call * call,ServerContext * ctx)800 ServerReaderWriter(internal::Call* call, ServerContext* ctx) 801 : body_(call, ctx) {} 802 }; 803 804 /// A class to represent a flow-controlled unary call. This is something 805 /// of a hybrid between conventional unary and streaming. This is invoked 806 /// through a unary call on the client side, but the server responds to it 807 /// as though it were a single-ping-pong streaming call. The server can use 808 /// the \a NextMessageSize method to determine an upper-bound on the size of 809 /// the message. A key difference relative to streaming: ServerUnaryStreamer 810 /// must have exactly 1 Read and exactly 1 Write, in that order, to function 811 /// correctly. Otherwise, the RPC is in error. 812 template <class RequestType, class ResponseType> 813 class ServerUnaryStreamer final 814 : public ServerReaderWriterInterface<ResponseType, RequestType> { 815 public: 816 /// Block to send initial metadata to client. 817 /// Implicit input parameter: 818 /// - the \a ServerContext associated with this call will be used for 819 /// sending initial metadata. SendInitialMetadata()820 void SendInitialMetadata() override { body_.SendInitialMetadata(); } 821 822 /// Get an upper bound on the request message size from the client. NextMessageSize(uint32_t * sz)823 bool NextMessageSize(uint32_t* sz) override { 824 return body_.NextMessageSize(sz); 825 } 826 827 /// Read a message of type \a R into \a msg. Completion will be notified by \a 828 /// tag on the associated completion queue. 829 /// This is thread-safe with respect to \a Write or \a WritesDone methods. It 830 /// should not be called concurrently with other streaming APIs 831 /// on the same stream. It is not meaningful to call it concurrently 832 /// with another \a ReaderInterface::Read on the same stream since reads on 833 /// the same stream are delivered in order. 834 /// 835 /// \param[out] msg Where to eventually store the read message. 836 /// \param[in] tag The tag identifying the operation. Read(RequestType * request)837 bool Read(RequestType* request) override { 838 if (read_done_) { 839 return false; 840 } 841 read_done_ = true; 842 return body_.Read(request); 843 } 844 845 /// Block to write \a msg to the stream with WriteOptions \a options. 846 /// This is thread-safe with respect to \a ReaderInterface::Read 847 /// 848 /// \param msg The message to be written to the stream. 849 /// \param options The WriteOptions affecting the write operation. 850 /// 851 /// \return \a true on success, \a false when the stream has been closed. 852 using internal::WriterInterface<ResponseType>::Write; Write(const ResponseType & response,WriteOptions options)853 bool Write(const ResponseType& response, WriteOptions options) override { 854 if (write_done_ || !read_done_) { 855 return false; 856 } 857 write_done_ = true; 858 return body_.Write(response, options); 859 } 860 861 private: 862 internal::ServerReaderWriterBody<ResponseType, RequestType> body_; 863 bool read_done_; 864 bool write_done_; 865 866 friend class internal::TemplatedBidiStreamingHandler< 867 ServerUnaryStreamer<RequestType, ResponseType>, true>; ServerUnaryStreamer(internal::Call * call,ServerContext * ctx)868 ServerUnaryStreamer(internal::Call* call, ServerContext* ctx) 869 : body_(call, ctx), read_done_(false), write_done_(false) {} 870 }; 871 872 /// A class to represent a flow-controlled server-side streaming call. 873 /// This is something of a hybrid between server-side and bidi streaming. 874 /// This is invoked through a server-side streaming call on the client side, 875 /// but the server responds to it as though it were a bidi streaming call that 876 /// must first have exactly 1 Read and then any number of Writes. 877 template <class RequestType, class ResponseType> 878 class ServerSplitStreamer final 879 : public ServerReaderWriterInterface<ResponseType, RequestType> { 880 public: 881 /// Block to send initial metadata to client. 882 /// Implicit input parameter: 883 /// - the \a ServerContext associated with this call will be used for 884 /// sending initial metadata. SendInitialMetadata()885 void SendInitialMetadata() override { body_.SendInitialMetadata(); } 886 887 /// Get an upper bound on the request message size from the client. NextMessageSize(uint32_t * sz)888 bool NextMessageSize(uint32_t* sz) override { 889 return body_.NextMessageSize(sz); 890 } 891 892 /// Read a message of type \a R into \a msg. Completion will be notified by \a 893 /// tag on the associated completion queue. 894 /// This is thread-safe with respect to \a Write or \a WritesDone methods. It 895 /// should not be called concurrently with other streaming APIs 896 /// on the same stream. It is not meaningful to call it concurrently 897 /// with another \a ReaderInterface::Read on the same stream since reads on 898 /// the same stream are delivered in order. 899 /// 900 /// \param[out] msg Where to eventually store the read message. 901 /// \param[in] tag The tag identifying the operation. Read(RequestType * request)902 bool Read(RequestType* request) override { 903 if (read_done_) { 904 return false; 905 } 906 read_done_ = true; 907 return body_.Read(request); 908 } 909 910 /// Block to write \a msg to the stream with WriteOptions \a options. 911 /// This is thread-safe with respect to \a ReaderInterface::Read 912 /// 913 /// \param msg The message to be written to the stream. 914 /// \param options The WriteOptions affecting the write operation. 915 /// 916 /// \return \a true on success, \a false when the stream has been closed. 917 using internal::WriterInterface<ResponseType>::Write; Write(const ResponseType & response,WriteOptions options)918 bool Write(const ResponseType& response, WriteOptions options) override { 919 return read_done_ && body_.Write(response, options); 920 } 921 922 private: 923 internal::ServerReaderWriterBody<ResponseType, RequestType> body_; 924 bool read_done_; 925 926 friend class internal::TemplatedBidiStreamingHandler< 927 ServerSplitStreamer<RequestType, ResponseType>, false>; ServerSplitStreamer(internal::Call * call,ServerContext * ctx)928 ServerSplitStreamer(internal::Call* call, ServerContext* ctx) 929 : body_(call, ctx), read_done_(false) {} 930 }; 931 932 } // namespace grpc 933 934 #endif // GRPCPP_IMPL_CODEGEN_SYNC_STREAM_H 935