1 // 2 // 3 // Copyright 2015 gRPC authors. 4 // 5 // Licensed under the Apache License, Version 2.0 (the "License"); 6 // you may not use this file except in compliance with the License. 7 // You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, software 12 // distributed under the License is distributed on an "AS IS" BASIS, 13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 // See the License for the specific language governing permissions and 15 // limitations under the License. 16 // 17 // 18 19 #ifndef GRPCPP_SUPPORT_SYNC_STREAM_H 20 #define GRPCPP_SUPPORT_SYNC_STREAM_H 21 22 #include <grpcpp/client_context.h> 23 #include <grpcpp/completion_queue.h> 24 #include <grpcpp/impl/call.h> 25 #include <grpcpp/impl/codegen/channel_interface.h> 26 #include <grpcpp/impl/service_type.h> 27 #include <grpcpp/server_context.h> 28 #include <grpcpp/support/status.h> 29 30 #include "absl/log/absl_check.h" 31 32 namespace grpc { 33 34 namespace internal { 35 /// Common interface for all synchronous client side streaming. 36 class ClientStreamingInterface { 37 public: ~ClientStreamingInterface()38 virtual ~ClientStreamingInterface() {} 39 40 /// Block waiting until the stream finishes and a final status of the call is 41 /// available. 42 /// 43 /// It is appropriate to call this method exactly once when both: 44 /// * the calling code (client-side) has no more message to send 45 /// (this can be declared implicitly by calling this method, or 46 /// explicitly through an earlier call to <i>WritesDone</i> method of the 47 /// class in use, e.g. \a ClientWriterInterface::WritesDone or 48 /// \a ClientReaderWriterInterface::WritesDone). 49 /// * there are no more messages to be received from the server (which can 50 /// be known implicitly, or explicitly from an earlier call to \a 51 /// ReaderInterface::Read that returned "false"). 52 /// 53 /// This function will return either: 54 /// - when all incoming messages have been read and the server has 55 /// returned status. 56 /// - when the server has returned a non-OK status. 57 /// - OR when the call failed for some reason and the library generated a 58 /// status. 59 /// 60 /// Return values: 61 /// - \a Status contains the status code, message and details for the call 62 /// - the \a ClientContext associated with this call is updated with 63 /// possible trailing metadata sent from the server. 64 virtual grpc::Status Finish() = 0; 65 }; 66 67 /// Common interface for all synchronous server side streaming. 68 class ServerStreamingInterface { 69 public: ~ServerStreamingInterface()70 virtual ~ServerStreamingInterface() {} 71 72 /// Block to send initial metadata to client. 73 /// This call is optional, but if it is used, it cannot be used concurrently 74 /// with or after the \a Finish method. 75 /// 76 /// The initial metadata that will be sent to the client will be 77 /// taken from the \a ServerContext associated with the call. 78 virtual void SendInitialMetadata() = 0; 79 }; 80 81 /// An interface that yields a sequence of messages of type \a R. 82 template <class R> 83 class ReaderInterface { 84 public: ~ReaderInterface()85 virtual ~ReaderInterface() {} 86 87 /// Get an upper bound on the next message size available for reading on this 88 /// stream. 89 virtual bool NextMessageSize(uint32_t* sz) = 0; 90 91 /// Block to read a message and parse to \a msg. Returns \a true on success. 92 /// This is thread-safe with respect to \a Write or \WritesDone methods on 93 /// the same stream. It should not be called concurrently with another \a 94 /// Read on the same stream as the order of delivery will not be defined. 95 /// 96 /// \param[out] msg The read message. 97 /// 98 /// \return \a false when there will be no more incoming messages, either 99 /// because the other side has called \a WritesDone() or the stream has failed 100 /// (or been cancelled). 101 virtual bool Read(R* msg) = 0; 102 }; 103 104 /// An interface that can be fed a sequence of messages of type \a W. 105 template <class W> 106 class WriterInterface { 107 public: ~WriterInterface()108 virtual ~WriterInterface() {} 109 110 /// Block to write \a msg to the stream with WriteOptions \a options. 111 /// This is thread-safe with respect to \a ReaderInterface::Read 112 /// 113 /// \param msg The message to be written to the stream. 114 /// \param options The WriteOptions affecting the write operation. 115 /// 116 /// \return \a true on success, \a false when the stream has been closed. 117 virtual bool Write(const W& msg, grpc::WriteOptions options) = 0; 118 119 /// Block to write \a msg to the stream with default write options. 120 /// This is thread-safe with respect to \a ReaderInterface::Read 121 /// 122 /// \param msg The message to be written to the stream. 123 /// 124 /// \return \a true on success, \a false when the stream has been closed. Write(const W & msg)125 inline bool Write(const W& msg) { return Write(msg, grpc::WriteOptions()); } 126 127 /// Write \a msg and coalesce it with the writing of trailing metadata, using 128 /// WriteOptions \a options. 129 /// 130 /// For client, WriteLast is equivalent of performing Write and WritesDone in 131 /// a single step. \a msg and trailing metadata are coalesced and sent on wire 132 /// by calling this function. For server, WriteLast buffers the \a msg. 133 /// The writing of \a msg is held until the service handler returns, 134 /// where \a msg and trailing metadata are coalesced and sent on wire. 135 /// Note that WriteLast can only buffer \a msg up to the flow control window 136 /// size. If \a msg size is larger than the window size, it will be sent on 137 /// wire without buffering. 138 /// 139 /// \param[in] msg The message to be written to the stream. 140 /// \param[in] options The WriteOptions to be used to write this message. WriteLast(const W & msg,grpc::WriteOptions options)141 void WriteLast(const W& msg, grpc::WriteOptions options) { 142 Write(msg, options.set_last_message()); 143 } 144 }; 145 146 } // namespace internal 147 148 /// Client-side interface for streaming reads of message of type \a R. 149 template <class R> 150 class ClientReaderInterface : public internal::ClientStreamingInterface, 151 public internal::ReaderInterface<R> { 152 public: 153 /// Block to wait for initial metadata from server. The received metadata 154 /// can only be accessed after this call returns. Should only be called before 155 /// the first read. Calling this method is optional, and if it is not called 156 /// the metadata will be available in ClientContext after the first read. 157 virtual void WaitForInitialMetadata() = 0; 158 }; 159 160 namespace internal { 161 template <class R> 162 class ClientReaderFactory { 163 public: 164 template <class W> Create(grpc::ChannelInterface * channel,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,const W & request)165 static ClientReader<R>* Create(grpc::ChannelInterface* channel, 166 const grpc::internal::RpcMethod& method, 167 grpc::ClientContext* context, 168 const W& request) { 169 return new ClientReader<R>(channel, method, context, request); 170 } 171 }; 172 } // namespace internal 173 174 /// Synchronous (blocking) client-side API for doing server-streaming RPCs, 175 /// where the stream of messages coming from the server has messages 176 /// of type \a R. 177 template <class R> 178 class ClientReader final : public ClientReaderInterface<R> { 179 public: 180 /// See the \a ClientStreamingInterface.WaitForInitialMetadata method for 181 /// semantics. 182 /// 183 // Side effect: 184 /// Once complete, the initial metadata read from 185 /// the server will be accessible through the \a ClientContext used to 186 /// construct this object. WaitForInitialMetadata()187 void WaitForInitialMetadata() override { 188 ABSL_CHECK(!context_->initial_metadata_received_); 189 190 grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata> ops; 191 ops.RecvInitialMetadata(context_); 192 call_.PerformOps(&ops); 193 cq_.Pluck(&ops); /// status ignored 194 } 195 NextMessageSize(uint32_t * sz)196 bool NextMessageSize(uint32_t* sz) override { 197 int result = call_.max_receive_message_size(); 198 *sz = (result > 0) ? result : UINT32_MAX; 199 return true; 200 } 201 202 /// See the \a ReaderInterface.Read method for semantics. 203 /// Side effect: 204 /// This also receives initial metadata from the server, if not 205 /// already received (if initial metadata is received, it can be then 206 /// accessed through the \a ClientContext associated with this call). Read(R * msg)207 bool Read(R* msg) override { 208 grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata, 209 grpc::internal::CallOpRecvMessage<R>> 210 ops; 211 if (!context_->initial_metadata_received_) { 212 ops.RecvInitialMetadata(context_); 213 } 214 ops.RecvMessage(msg); 215 call_.PerformOps(&ops); 216 return cq_.Pluck(&ops) && ops.got_message; 217 } 218 219 /// See the \a ClientStreamingInterface.Finish method for semantics. 220 /// 221 /// Side effect: 222 /// The \a ClientContext associated with this call is updated with 223 /// possible metadata received from the server. Finish()224 grpc::Status Finish() override { 225 grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata, 226 grpc::internal::CallOpClientRecvStatus> 227 ops; 228 if (!context_->initial_metadata_received_) { 229 ops.RecvInitialMetadata(context_); 230 } 231 grpc::Status status; 232 ops.ClientRecvStatus(context_, &status); 233 call_.PerformOps(&ops); 234 ABSL_CHECK(cq_.Pluck(&ops)); 235 return status; 236 } 237 238 private: 239 friend class internal::ClientReaderFactory<R>; 240 grpc::ClientContext* context_; 241 grpc::CompletionQueue cq_; 242 grpc::internal::Call call_; 243 244 /// Block to create a stream and write the initial metadata and \a request 245 /// out. Note that \a context will be used to fill in custom initial 246 /// metadata used to send to the server when starting the call. 247 template <class W> ClientReader(grpc::ChannelInterface * channel,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,const W & request)248 ClientReader(grpc::ChannelInterface* channel, 249 const grpc::internal::RpcMethod& method, 250 grpc::ClientContext* context, const W& request) 251 : context_(context), 252 cq_(grpc_completion_queue_attributes{ 253 GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING, 254 nullptr}), // Pluckable cq 255 call_(channel->CreateCall(method, context, &cq_)) { 256 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, 257 grpc::internal::CallOpSendMessage, 258 grpc::internal::CallOpClientSendClose> 259 ops; 260 ops.SendInitialMetadata(&context->send_initial_metadata_, 261 context->initial_metadata_flags()); 262 // TODO(ctiller): don't assert 263 ABSL_CHECK(ops.SendMessagePtr(&request).ok()); 264 ops.ClientSendClose(); 265 call_.PerformOps(&ops); 266 cq_.Pluck(&ops); 267 } 268 }; 269 270 /// Client-side interface for streaming writes of message type \a W. 271 template <class W> 272 class ClientWriterInterface : public internal::ClientStreamingInterface, 273 public internal::WriterInterface<W> { 274 public: 275 /// Half close writing from the client. (signal that the stream of messages 276 /// coming from the client is complete). 277 /// Blocks until currently-pending writes are completed. 278 /// Thread safe with respect to \a ReaderInterface::Read operations only 279 /// 280 /// \return Whether the writes were successful. 281 virtual bool WritesDone() = 0; 282 }; 283 284 namespace internal { 285 template <class W> 286 class ClientWriterFactory { 287 public: 288 template <class R> Create(grpc::ChannelInterface * channel,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,R * response)289 static ClientWriter<W>* Create(grpc::ChannelInterface* channel, 290 const grpc::internal::RpcMethod& method, 291 grpc::ClientContext* context, R* response) { 292 return new ClientWriter<W>(channel, method, context, response); 293 } 294 }; 295 } // namespace internal 296 297 /// Synchronous (blocking) client-side API for doing client-streaming RPCs, 298 /// where the outgoing message stream coming from the client has messages of 299 /// type \a W. 300 template <class W> 301 class ClientWriter : public ClientWriterInterface<W> { 302 public: 303 /// See the \a ClientStreamingInterface.WaitForInitialMetadata method for 304 /// semantics. 305 /// 306 // Side effect: 307 /// Once complete, the initial metadata read from the server will be 308 /// accessible through the \a ClientContext used to construct this object. WaitForInitialMetadata()309 void WaitForInitialMetadata() { 310 ABSL_CHECK(!context_->initial_metadata_received_); 311 312 grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata> ops; 313 ops.RecvInitialMetadata(context_); 314 call_.PerformOps(&ops); 315 cq_.Pluck(&ops); // status ignored 316 } 317 318 /// See the WriterInterface.Write(const W& msg, WriteOptions options) method 319 /// for semantics. 320 /// 321 /// Side effect: 322 /// Also sends initial metadata if not already sent (using the 323 /// \a ClientContext associated with this call). 324 using internal::WriterInterface<W>::Write; Write(const W & msg,grpc::WriteOptions options)325 bool Write(const W& msg, grpc::WriteOptions options) override { 326 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, 327 grpc::internal::CallOpSendMessage, 328 grpc::internal::CallOpClientSendClose> 329 ops; 330 331 if (options.is_last_message()) { 332 options.set_buffer_hint(); 333 ops.ClientSendClose(); 334 } 335 if (context_->initial_metadata_corked_) { 336 ops.SendInitialMetadata(&context_->send_initial_metadata_, 337 context_->initial_metadata_flags()); 338 context_->set_initial_metadata_corked(false); 339 } 340 if (!ops.SendMessagePtr(&msg, options).ok()) { 341 return false; 342 } 343 344 call_.PerformOps(&ops); 345 return cq_.Pluck(&ops); 346 } 347 WritesDone()348 bool WritesDone() override { 349 grpc::internal::CallOpSet<grpc::internal::CallOpClientSendClose> ops; 350 ops.ClientSendClose(); 351 call_.PerformOps(&ops); 352 return cq_.Pluck(&ops); 353 } 354 355 /// See the ClientStreamingInterface.Finish method for semantics. 356 /// Side effects: 357 /// - Also receives initial metadata if not already received. 358 /// - Attempts to fill in the \a response parameter passed 359 /// to the constructor of this instance with the response 360 /// message from the server. Finish()361 grpc::Status Finish() override { 362 grpc::Status status; 363 if (!context_->initial_metadata_received_) { 364 finish_ops_.RecvInitialMetadata(context_); 365 } 366 finish_ops_.ClientRecvStatus(context_, &status); 367 call_.PerformOps(&finish_ops_); 368 ABSL_CHECK(cq_.Pluck(&finish_ops_)); 369 return status; 370 } 371 372 private: 373 friend class internal::ClientWriterFactory<W>; 374 375 /// Block to create a stream (i.e. send request headers and other initial 376 /// metadata to the server). Note that \a context will be used to fill 377 /// in custom initial metadata. \a response will be filled in with the 378 /// single expected response message from the server upon a successful 379 /// call to the \a Finish method of this instance. 380 template <class R> ClientWriter(grpc::ChannelInterface * channel,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,R * response)381 ClientWriter(grpc::ChannelInterface* channel, 382 const grpc::internal::RpcMethod& method, 383 grpc::ClientContext* context, R* response) 384 : context_(context), 385 cq_(grpc_completion_queue_attributes{ 386 GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING, 387 nullptr}), // Pluckable cq 388 call_(channel->CreateCall(method, context, &cq_)) { 389 finish_ops_.RecvMessage(response); 390 finish_ops_.AllowNoMessage(); 391 392 if (!context_->initial_metadata_corked_) { 393 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata> ops; 394 ops.SendInitialMetadata(&context->send_initial_metadata_, 395 context->initial_metadata_flags()); 396 call_.PerformOps(&ops); 397 cq_.Pluck(&ops); 398 } 399 } 400 401 grpc::ClientContext* context_; 402 grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata, 403 grpc::internal::CallOpGenericRecvMessage, 404 grpc::internal::CallOpClientRecvStatus> 405 finish_ops_; 406 grpc::CompletionQueue cq_; 407 grpc::internal::Call call_; 408 }; 409 410 /// Client-side interface for bi-directional streaming with 411 /// client-to-server stream messages of type \a W and 412 /// server-to-client stream messages of type \a R. 413 template <class W, class R> 414 class ClientReaderWriterInterface : public internal::ClientStreamingInterface, 415 public internal::WriterInterface<W>, 416 public internal::ReaderInterface<R> { 417 public: 418 /// Block to wait for initial metadata from server. The received metadata 419 /// can only be accessed after this call returns. Should only be called before 420 /// the first read. Calling this method is optional, and if it is not called 421 /// the metadata will be available in ClientContext after the first read. 422 virtual void WaitForInitialMetadata() = 0; 423 424 /// Half close writing from the client. (signal that the stream of messages 425 /// coming from the client is complete). 426 /// Blocks until currently-pending writes are completed. 427 /// Thread-safe with respect to \a ReaderInterface::Read 428 /// 429 /// \return Whether the writes were successful. 430 virtual bool WritesDone() = 0; 431 }; 432 433 namespace internal { 434 template <class W, class R> 435 class ClientReaderWriterFactory { 436 public: Create(grpc::ChannelInterface * channel,const grpc::internal::RpcMethod & method,grpc::ClientContext * context)437 static ClientReaderWriter<W, R>* Create( 438 grpc::ChannelInterface* channel, const grpc::internal::RpcMethod& method, 439 grpc::ClientContext* context) { 440 return new ClientReaderWriter<W, R>(channel, method, context); 441 } 442 }; 443 } // namespace internal 444 445 /// Synchronous (blocking) client-side API for bi-directional streaming RPCs, 446 /// where the outgoing message stream coming from the client has messages of 447 /// type \a W, and the incoming messages stream coming from the server has 448 /// messages of type \a R. 449 template <class W, class R> 450 class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> { 451 public: 452 /// Block waiting to read initial metadata from the server. 453 /// This call is optional, but if it is used, it cannot be used concurrently 454 /// with or after the \a Finish method. 455 /// 456 /// Once complete, the initial metadata read from the server will be 457 /// accessible through the \a ClientContext used to construct this object. WaitForInitialMetadata()458 void WaitForInitialMetadata() override { 459 ABSL_CHECK(!context_->initial_metadata_received_); 460 461 grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata> ops; 462 ops.RecvInitialMetadata(context_); 463 call_.PerformOps(&ops); 464 cq_.Pluck(&ops); // status ignored 465 } 466 NextMessageSize(uint32_t * sz)467 bool NextMessageSize(uint32_t* sz) override { 468 int result = call_.max_receive_message_size(); 469 *sz = (result > 0) ? result : UINT32_MAX; 470 return true; 471 } 472 473 /// See the \a ReaderInterface.Read method for semantics. 474 /// Side effect: 475 /// Also receives initial metadata if not already received (updates the \a 476 /// ClientContext associated with this call in that case). Read(R * msg)477 bool Read(R* msg) override { 478 grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata, 479 grpc::internal::CallOpRecvMessage<R>> 480 ops; 481 if (!context_->initial_metadata_received_) { 482 ops.RecvInitialMetadata(context_); 483 } 484 ops.RecvMessage(msg); 485 call_.PerformOps(&ops); 486 return cq_.Pluck(&ops) && ops.got_message; 487 } 488 489 /// See the \a WriterInterface.Write method for semantics. 490 /// 491 /// Side effect: 492 /// Also sends initial metadata if not already sent (using the 493 /// \a ClientContext associated with this call to fill in values). 494 using internal::WriterInterface<W>::Write; Write(const W & msg,grpc::WriteOptions options)495 bool Write(const W& msg, grpc::WriteOptions options) override { 496 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, 497 grpc::internal::CallOpSendMessage, 498 grpc::internal::CallOpClientSendClose> 499 ops; 500 501 if (options.is_last_message()) { 502 options.set_buffer_hint(); 503 ops.ClientSendClose(); 504 } 505 if (context_->initial_metadata_corked_) { 506 ops.SendInitialMetadata(&context_->send_initial_metadata_, 507 context_->initial_metadata_flags()); 508 context_->set_initial_metadata_corked(false); 509 } 510 if (!ops.SendMessagePtr(&msg, options).ok()) { 511 return false; 512 } 513 514 call_.PerformOps(&ops); 515 return cq_.Pluck(&ops); 516 } 517 WritesDone()518 bool WritesDone() override { 519 grpc::internal::CallOpSet<grpc::internal::CallOpClientSendClose> ops; 520 ops.ClientSendClose(); 521 call_.PerformOps(&ops); 522 return cq_.Pluck(&ops); 523 } 524 525 /// See the ClientStreamingInterface.Finish method for semantics. 526 /// 527 /// Side effect: 528 /// - the \a ClientContext associated with this call is updated with 529 /// possible trailing metadata sent from the server. Finish()530 grpc::Status Finish() override { 531 grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata, 532 grpc::internal::CallOpClientRecvStatus> 533 ops; 534 if (!context_->initial_metadata_received_) { 535 ops.RecvInitialMetadata(context_); 536 } 537 grpc::Status status; 538 ops.ClientRecvStatus(context_, &status); 539 call_.PerformOps(&ops); 540 ABSL_CHECK(cq_.Pluck(&ops)); 541 return status; 542 } 543 544 private: 545 friend class internal::ClientReaderWriterFactory<W, R>; 546 547 grpc::ClientContext* context_; 548 grpc::CompletionQueue cq_; 549 grpc::internal::Call call_; 550 551 /// Block to create a stream and write the initial metadata and \a request 552 /// out. Note that \a context will be used to fill in custom initial metadata 553 /// used to send to the server when starting the call. ClientReaderWriter(grpc::ChannelInterface * channel,const grpc::internal::RpcMethod & method,grpc::ClientContext * context)554 ClientReaderWriter(grpc::ChannelInterface* channel, 555 const grpc::internal::RpcMethod& method, 556 grpc::ClientContext* context) 557 : context_(context), 558 cq_(grpc_completion_queue_attributes{ 559 GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING, 560 nullptr}), // Pluckable cq 561 call_(channel->CreateCall(method, context, &cq_)) { 562 if (!context_->initial_metadata_corked_) { 563 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata> ops; 564 ops.SendInitialMetadata(&context->send_initial_metadata_, 565 context->initial_metadata_flags()); 566 call_.PerformOps(&ops); 567 cq_.Pluck(&ops); 568 } 569 } 570 }; 571 572 /// Server-side interface for streaming reads of message of type \a R. 573 template <class R> 574 class ServerReaderInterface : public internal::ServerStreamingInterface, 575 public internal::ReaderInterface<R> {}; 576 577 /// Synchronous (blocking) server-side API for doing client-streaming RPCs, 578 /// where the incoming message stream coming from the client has messages of 579 /// type \a R. 580 template <class R> 581 class ServerReader final : public ServerReaderInterface<R> { 582 public: 583 /// See the \a ServerStreamingInterface.SendInitialMetadata method 584 /// for semantics. Note that initial metadata will be affected by the 585 /// \a ServerContext associated with this call. SendInitialMetadata()586 void SendInitialMetadata() override { 587 ABSL_CHECK(!ctx_->sent_initial_metadata_); 588 589 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata> ops; 590 ops.SendInitialMetadata(&ctx_->initial_metadata_, 591 ctx_->initial_metadata_flags()); 592 if (ctx_->compression_level_set()) { 593 ops.set_compression_level(ctx_->compression_level()); 594 } 595 ctx_->sent_initial_metadata_ = true; 596 call_->PerformOps(&ops); 597 call_->cq()->Pluck(&ops); 598 } 599 NextMessageSize(uint32_t * sz)600 bool NextMessageSize(uint32_t* sz) override { 601 int result = call_->max_receive_message_size(); 602 *sz = (result > 0) ? result : UINT32_MAX; 603 return true; 604 } 605 Read(R * msg)606 bool Read(R* msg) override { 607 grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<R>> ops; 608 ops.RecvMessage(msg); 609 call_->PerformOps(&ops); 610 bool ok = call_->cq()->Pluck(&ops) && ops.got_message; 611 if (!ok) { 612 ctx_->MaybeMarkCancelledOnRead(); 613 } 614 return ok; 615 } 616 617 private: 618 grpc::internal::Call* const call_; 619 ServerContext* const ctx_; 620 621 template <class ServiceType, class RequestType, class ResponseType> 622 friend class internal::ClientStreamingHandler; 623 ServerReader(grpc::internal::Call * call,grpc::ServerContext * ctx)624 ServerReader(grpc::internal::Call* call, grpc::ServerContext* ctx) 625 : call_(call), ctx_(ctx) {} 626 }; 627 628 /// Server-side interface for streaming writes of message of type \a W. 629 template <class W> 630 class ServerWriterInterface : public internal::ServerStreamingInterface, 631 public internal::WriterInterface<W> {}; 632 633 /// Synchronous (blocking) server-side API for doing for doing a 634 /// server-streaming RPCs, where the outgoing message stream coming from the 635 /// server has messages of type \a W. 636 template <class W> 637 class ServerWriter final : public ServerWriterInterface<W> { 638 public: 639 /// See the \a ServerStreamingInterface.SendInitialMetadata method 640 /// for semantics. 641 /// Note that initial metadata will be affected by the 642 /// \a ServerContext associated with this call. SendInitialMetadata()643 void SendInitialMetadata() override { 644 ABSL_CHECK(!ctx_->sent_initial_metadata_); 645 646 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata> ops; 647 ops.SendInitialMetadata(&ctx_->initial_metadata_, 648 ctx_->initial_metadata_flags()); 649 if (ctx_->compression_level_set()) { 650 ops.set_compression_level(ctx_->compression_level()); 651 } 652 ctx_->sent_initial_metadata_ = true; 653 call_->PerformOps(&ops); 654 call_->cq()->Pluck(&ops); 655 } 656 657 /// See the \a WriterInterface.Write method for semantics. 658 /// 659 /// Side effect: 660 /// Also sends initial metadata if not already sent (using the 661 /// \a ClientContext associated with this call to fill in values). 662 using internal::WriterInterface<W>::Write; Write(const W & msg,grpc::WriteOptions options)663 bool Write(const W& msg, grpc::WriteOptions options) override { 664 if (options.is_last_message()) { 665 options.set_buffer_hint(); 666 } 667 668 if (!ctx_->pending_ops_.SendMessagePtr(&msg, options).ok()) { 669 return false; 670 } 671 if (!ctx_->sent_initial_metadata_) { 672 ctx_->pending_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 673 ctx_->initial_metadata_flags()); 674 if (ctx_->compression_level_set()) { 675 ctx_->pending_ops_.set_compression_level(ctx_->compression_level()); 676 } 677 ctx_->sent_initial_metadata_ = true; 678 } 679 call_->PerformOps(&ctx_->pending_ops_); 680 // if this is the last message we defer the pluck until AFTER we start 681 // the trailing md op. This prevents hangs. See 682 // https://github.com/grpc/grpc/issues/11546 683 if (options.is_last_message()) { 684 ctx_->has_pending_ops_ = true; 685 return true; 686 } 687 ctx_->has_pending_ops_ = false; 688 return call_->cq()->Pluck(&ctx_->pending_ops_); 689 } 690 691 private: 692 grpc::internal::Call* const call_; 693 grpc::ServerContext* const ctx_; 694 695 template <class ServiceType, class RequestType, class ResponseType> 696 friend class internal::ServerStreamingHandler; 697 ServerWriter(grpc::internal::Call * call,grpc::ServerContext * ctx)698 ServerWriter(grpc::internal::Call* call, grpc::ServerContext* ctx) 699 : call_(call), ctx_(ctx) {} 700 }; 701 702 /// Server-side interface for bi-directional streaming. 703 template <class W, class R> 704 class ServerReaderWriterInterface : public internal::ServerStreamingInterface, 705 public internal::WriterInterface<W>, 706 public internal::ReaderInterface<R> {}; 707 708 /// Actual implementation of bi-directional streaming 709 namespace internal { 710 template <class W, class R> 711 class ServerReaderWriterBody final { 712 public: ServerReaderWriterBody(grpc::internal::Call * call,grpc::ServerContext * ctx)713 ServerReaderWriterBody(grpc::internal::Call* call, grpc::ServerContext* ctx) 714 : call_(call), ctx_(ctx) {} 715 SendInitialMetadata()716 void SendInitialMetadata() { 717 ABSL_CHECK(!ctx_->sent_initial_metadata_); 718 719 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata> ops; 720 ops.SendInitialMetadata(&ctx_->initial_metadata_, 721 ctx_->initial_metadata_flags()); 722 if (ctx_->compression_level_set()) { 723 ops.set_compression_level(ctx_->compression_level()); 724 } 725 ctx_->sent_initial_metadata_ = true; 726 call_->PerformOps(&ops); 727 call_->cq()->Pluck(&ops); 728 } 729 NextMessageSize(uint32_t * sz)730 bool NextMessageSize(uint32_t* sz) { 731 int result = call_->max_receive_message_size(); 732 *sz = (result > 0) ? result : UINT32_MAX; 733 return true; 734 } 735 Read(R * msg)736 bool Read(R* msg) { 737 grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<R>> ops; 738 ops.RecvMessage(msg); 739 call_->PerformOps(&ops); 740 bool ok = call_->cq()->Pluck(&ops) && ops.got_message; 741 if (!ok) { 742 ctx_->MaybeMarkCancelledOnRead(); 743 } 744 return ok; 745 } 746 Write(const W & msg,grpc::WriteOptions options)747 bool Write(const W& msg, grpc::WriteOptions options) { 748 if (options.is_last_message()) { 749 options.set_buffer_hint(); 750 } 751 if (!ctx_->pending_ops_.SendMessagePtr(&msg, options).ok()) { 752 return false; 753 } 754 if (!ctx_->sent_initial_metadata_) { 755 ctx_->pending_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 756 ctx_->initial_metadata_flags()); 757 if (ctx_->compression_level_set()) { 758 ctx_->pending_ops_.set_compression_level(ctx_->compression_level()); 759 } 760 ctx_->sent_initial_metadata_ = true; 761 } 762 call_->PerformOps(&ctx_->pending_ops_); 763 // if this is the last message we defer the pluck until AFTER we start 764 // the trailing md op. This prevents hangs. See 765 // https://github.com/grpc/grpc/issues/11546 766 if (options.is_last_message()) { 767 ctx_->has_pending_ops_ = true; 768 return true; 769 } 770 ctx_->has_pending_ops_ = false; 771 return call_->cq()->Pluck(&ctx_->pending_ops_); 772 } 773 774 private: 775 grpc::internal::Call* const call_; 776 grpc::ServerContext* const ctx_; 777 }; 778 779 } // namespace internal 780 781 /// Synchronous (blocking) server-side API for a bidirectional 782 /// streaming call, where the incoming message stream coming from the client has 783 /// messages of type \a R, and the outgoing message streaming coming from 784 /// the server has messages of type \a W. 785 template <class W, class R> 786 class ServerReaderWriter final : public ServerReaderWriterInterface<W, R> { 787 public: 788 /// See the \a ServerStreamingInterface.SendInitialMetadata method 789 /// for semantics. Note that initial metadata will be affected by the 790 /// \a ServerContext associated with this call. SendInitialMetadata()791 void SendInitialMetadata() override { body_.SendInitialMetadata(); } 792 NextMessageSize(uint32_t * sz)793 bool NextMessageSize(uint32_t* sz) override { 794 return body_.NextMessageSize(sz); 795 } 796 Read(R * msg)797 bool Read(R* msg) override { return body_.Read(msg); } 798 799 /// See the \a WriterInterface.Write(const W& msg, WriteOptions options) 800 /// method for semantics. 801 /// Side effect: 802 /// Also sends initial metadata if not already sent (using the \a 803 /// ServerContext associated with this call). 804 using internal::WriterInterface<W>::Write; Write(const W & msg,grpc::WriteOptions options)805 bool Write(const W& msg, grpc::WriteOptions options) override { 806 return body_.Write(msg, options); 807 } 808 809 private: 810 internal::ServerReaderWriterBody<W, R> body_; 811 812 friend class internal::TemplatedBidiStreamingHandler<ServerReaderWriter<W, R>, 813 false>; ServerReaderWriter(grpc::internal::Call * call,grpc::ServerContext * ctx)814 ServerReaderWriter(grpc::internal::Call* call, grpc::ServerContext* ctx) 815 : body_(call, ctx) {} 816 }; 817 818 /// A class to represent a flow-controlled unary call. This is something 819 /// of a hybrid between conventional unary and streaming. This is invoked 820 /// through a unary call on the client side, but the server responds to it 821 /// as though it were a single-ping-pong streaming call. The server can use 822 /// the \a NextMessageSize method to determine an upper-bound on the size of 823 /// the message. A key difference relative to streaming: ServerUnaryStreamer 824 /// must have exactly 1 Read and exactly 1 Write, in that order, to function 825 /// correctly. Otherwise, the RPC is in error. 826 template <class RequestType, class ResponseType> 827 class ServerUnaryStreamer final 828 : public ServerReaderWriterInterface<ResponseType, RequestType> { 829 public: 830 /// Block to send initial metadata to client. 831 /// Implicit input parameter: 832 /// - the \a ServerContext associated with this call will be used for 833 /// sending initial metadata. SendInitialMetadata()834 void SendInitialMetadata() override { body_.SendInitialMetadata(); } 835 836 /// Get an upper bound on the request message size from the client. NextMessageSize(uint32_t * sz)837 bool NextMessageSize(uint32_t* sz) override { 838 return body_.NextMessageSize(sz); 839 } 840 841 /// Read a message of type \a R into \a msg. Completion will be notified by \a 842 /// tag on the associated completion queue. 843 /// This is thread-safe with respect to \a Write or \a WritesDone methods. It 844 /// should not be called concurrently with other streaming APIs 845 /// on the same stream. It is not meaningful to call it concurrently 846 /// with another \a ReaderInterface::Read on the same stream since reads on 847 /// the same stream are delivered in order. 848 /// 849 /// \param[out] msg Where to eventually store the read message. 850 /// \param[in] tag The tag identifying the operation. Read(RequestType * request)851 bool Read(RequestType* request) override { 852 if (read_done_) { 853 return false; 854 } 855 read_done_ = true; 856 return body_.Read(request); 857 } 858 859 /// Block to write \a msg to the stream with WriteOptions \a options. 860 /// This is thread-safe with respect to \a ReaderInterface::Read 861 /// 862 /// \param msg The message to be written to the stream. 863 /// \param options The WriteOptions affecting the write operation. 864 /// 865 /// \return \a true on success, \a false when the stream has been closed. 866 using internal::WriterInterface<ResponseType>::Write; Write(const ResponseType & response,grpc::WriteOptions options)867 bool Write(const ResponseType& response, 868 grpc::WriteOptions options) override { 869 if (write_done_ || !read_done_) { 870 return false; 871 } 872 write_done_ = true; 873 return body_.Write(response, options); 874 } 875 876 private: 877 internal::ServerReaderWriterBody<ResponseType, RequestType> body_; 878 bool read_done_; 879 bool write_done_; 880 881 friend class internal::TemplatedBidiStreamingHandler< 882 ServerUnaryStreamer<RequestType, ResponseType>, true>; ServerUnaryStreamer(grpc::internal::Call * call,grpc::ServerContext * ctx)883 ServerUnaryStreamer(grpc::internal::Call* call, grpc::ServerContext* ctx) 884 : body_(call, ctx), read_done_(false), write_done_(false) {} 885 }; 886 887 /// A class to represent a flow-controlled server-side streaming call. 888 /// This is something of a hybrid between server-side and bidi streaming. 889 /// This is invoked through a server-side streaming call on the client side, 890 /// but the server responds to it as though it were a bidi streaming call that 891 /// must first have exactly 1 Read and then any number of Writes. 892 template <class RequestType, class ResponseType> 893 class ServerSplitStreamer final 894 : public ServerReaderWriterInterface<ResponseType, RequestType> { 895 public: 896 /// Block to send initial metadata to client. 897 /// Implicit input parameter: 898 /// - the \a ServerContext associated with this call will be used for 899 /// sending initial metadata. SendInitialMetadata()900 void SendInitialMetadata() override { body_.SendInitialMetadata(); } 901 902 /// Get an upper bound on the request message size from the client. NextMessageSize(uint32_t * sz)903 bool NextMessageSize(uint32_t* sz) override { 904 return body_.NextMessageSize(sz); 905 } 906 907 /// Read a message of type \a R into \a msg. Completion will be notified by \a 908 /// tag on the associated completion queue. 909 /// This is thread-safe with respect to \a Write or \a WritesDone methods. It 910 /// should not be called concurrently with other streaming APIs 911 /// on the same stream. It is not meaningful to call it concurrently 912 /// with another \a ReaderInterface::Read on the same stream since reads on 913 /// the same stream are delivered in order. 914 /// 915 /// \param[out] msg Where to eventually store the read message. 916 /// \param[in] tag The tag identifying the operation. Read(RequestType * request)917 bool Read(RequestType* request) override { 918 if (read_done_) { 919 return false; 920 } 921 read_done_ = true; 922 return body_.Read(request); 923 } 924 925 /// Block to write \a msg to the stream with WriteOptions \a options. 926 /// This is thread-safe with respect to \a ReaderInterface::Read 927 /// 928 /// \param msg The message to be written to the stream. 929 /// \param options The WriteOptions affecting the write operation. 930 /// 931 /// \return \a true on success, \a false when the stream has been closed. 932 using internal::WriterInterface<ResponseType>::Write; Write(const ResponseType & response,grpc::WriteOptions options)933 bool Write(const ResponseType& response, 934 grpc::WriteOptions options) override { 935 return read_done_ && body_.Write(response, options); 936 } 937 938 private: 939 internal::ServerReaderWriterBody<ResponseType, RequestType> body_; 940 bool read_done_; 941 942 friend class internal::TemplatedBidiStreamingHandler< 943 ServerSplitStreamer<RequestType, ResponseType>, false>; ServerSplitStreamer(grpc::internal::Call * call,grpc::ServerContext * ctx)944 ServerSplitStreamer(grpc::internal::Call* call, grpc::ServerContext* ctx) 945 : body_(call, ctx), read_done_(false) {} 946 }; 947 948 } // namespace grpc 949 950 #endif // GRPCPP_SUPPORT_SYNC_STREAM_H 951