1 /* 2 * 3 * Copyright 2019 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18 #ifndef GRPCPP_IMPL_CODEGEN_SYNC_STREAM_IMPL_H 19 #define GRPCPP_IMPL_CODEGEN_SYNC_STREAM_IMPL_H 20 21 #include <grpcpp/impl/codegen/call.h> 22 #include <grpcpp/impl/codegen/channel_interface.h> 23 #include <grpcpp/impl/codegen/client_context_impl.h> 24 #include <grpcpp/impl/codegen/completion_queue_impl.h> 25 #include <grpcpp/impl/codegen/core_codegen_interface.h> 26 #include <grpcpp/impl/codegen/server_context_impl.h> 27 #include <grpcpp/impl/codegen/service_type.h> 28 #include <grpcpp/impl/codegen/status.h> 29 30 namespace grpc_impl { 31 32 namespace internal { 33 /// Common interface for all synchronous client side streaming. 34 class ClientStreamingInterface { 35 public: ~ClientStreamingInterface()36 virtual ~ClientStreamingInterface() {} 37 38 /// Block waiting until the stream finishes and a final status of the call is 39 /// available. 40 /// 41 /// It is appropriate to call this method exactly once when both: 42 /// * the calling code (client-side) has no more message to send 43 /// (this can be declared implicitly by calling this method, or 44 /// explicitly through an earlier call to <i>WritesDone</i> method of the 45 /// class in use, e.g. \a ClientWriterInterface::WritesDone or 46 /// \a ClientReaderWriterInterface::WritesDone). 47 /// * there are no more messages to be received from the server (which can 48 /// be known implicitly, or explicitly from an earlier call to \a 49 /// ReaderInterface::Read that returned "false"). 50 /// 51 /// This function will return either: 52 /// - when all incoming messages have been read and the server has 53 /// returned status. 54 /// - when the server has returned a non-OK status. 55 /// - OR when the call failed for some reason and the library generated a 56 /// status. 57 /// 58 /// Return values: 59 /// - \a Status contains the status code, message and details for the call 60 /// - the \a ClientContext associated with this call is updated with 61 /// possible trailing metadata sent from the server. 62 virtual ::grpc::Status Finish() = 0; 63 }; 64 65 /// Common interface for all synchronous server side streaming. 66 class ServerStreamingInterface { 67 public: ~ServerStreamingInterface()68 virtual ~ServerStreamingInterface() {} 69 70 /// Block to send initial metadata to client. 71 /// This call is optional, but if it is used, it cannot be used concurrently 72 /// with or after the \a Finish method. 73 /// 74 /// The initial metadata that will be sent to the client will be 75 /// taken from the \a ServerContext associated with the call. 76 virtual void SendInitialMetadata() = 0; 77 }; 78 79 /// An interface that yields a sequence of messages of type \a R. 80 template <class R> 81 class ReaderInterface { 82 public: ~ReaderInterface()83 virtual ~ReaderInterface() {} 84 85 /// Get an upper bound on the next message size available for reading on this 86 /// stream. 87 virtual bool NextMessageSize(uint32_t* sz) = 0; 88 89 /// Block to read a message and parse to \a msg. Returns \a true on success. 90 /// This is thread-safe with respect to \a Write or \WritesDone methods on 91 /// the same stream. It should not be called concurrently with another \a 92 /// Read on the same stream as the order of delivery will not be defined. 93 /// 94 /// \param[out] msg The read message. 95 /// 96 /// \return \a false when there will be no more incoming messages, either 97 /// because the other side has called \a WritesDone() or the stream has failed 98 /// (or been cancelled). 99 virtual bool Read(R* msg) = 0; 100 }; 101 102 /// An interface that can be fed a sequence of messages of type \a W. 103 template <class W> 104 class WriterInterface { 105 public: ~WriterInterface()106 virtual ~WriterInterface() {} 107 108 /// Block to write \a msg to the stream with WriteOptions \a options. 109 /// This is thread-safe with respect to \a ReaderInterface::Read 110 /// 111 /// \param msg The message to be written to the stream. 112 /// \param options The WriteOptions affecting the write operation. 113 /// 114 /// \return \a true on success, \a false when the stream has been closed. 115 virtual bool Write(const W& msg, ::grpc::WriteOptions options) = 0; 116 117 /// Block to write \a msg to the stream with default write options. 118 /// This is thread-safe with respect to \a ReaderInterface::Read 119 /// 120 /// \param msg The message to be written to the stream. 121 /// 122 /// \return \a true on success, \a false when the stream has been closed. Write(const W & msg)123 inline bool Write(const W& msg) { return Write(msg, ::grpc::WriteOptions()); } 124 125 /// Write \a msg and coalesce it with the writing of trailing metadata, using 126 /// WriteOptions \a options. 127 /// 128 /// For client, WriteLast is equivalent of performing Write and WritesDone in 129 /// a single step. \a msg and trailing metadata are coalesced and sent on wire 130 /// by calling this function. For server, WriteLast buffers the \a msg. 131 /// The writing of \a msg is held until the service handler returns, 132 /// where \a msg and trailing metadata are coalesced and sent on wire. 133 /// Note that WriteLast can only buffer \a msg up to the flow control window 134 /// size. If \a msg size is larger than the window size, it will be sent on 135 /// wire without buffering. 136 /// 137 /// \param[in] msg The message to be written to the stream. 138 /// \param[in] options The WriteOptions to be used to write this message. WriteLast(const W & msg,::grpc::WriteOptions options)139 void WriteLast(const W& msg, ::grpc::WriteOptions options) { 140 Write(msg, options.set_last_message()); 141 } 142 }; 143 144 } // namespace internal 145 146 /// Client-side interface for streaming reads of message of type \a R. 147 template <class R> 148 class ClientReaderInterface : public internal::ClientStreamingInterface, 149 public internal::ReaderInterface<R> { 150 public: 151 /// Block to wait for initial metadata from server. The received metadata 152 /// can only be accessed after this call returns. Should only be called before 153 /// the first read. Calling this method is optional, and if it is not called 154 /// the metadata will be available in ClientContext after the first read. 155 virtual void WaitForInitialMetadata() = 0; 156 }; 157 158 namespace internal { 159 template <class R> 160 class ClientReaderFactory { 161 public: 162 template <class W> Create(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc_impl::ClientContext * context,const W & request)163 static ClientReader<R>* Create(::grpc::ChannelInterface* channel, 164 const ::grpc::internal::RpcMethod& method, 165 ::grpc_impl::ClientContext* context, 166 const W& request) { 167 return new ClientReader<R>(channel, method, context, request); 168 } 169 }; 170 } // namespace internal 171 172 /// Synchronous (blocking) client-side API for doing server-streaming RPCs, 173 /// where the stream of messages coming from the server has messages 174 /// of type \a R. 175 template <class R> 176 class ClientReader final : public ClientReaderInterface<R> { 177 public: 178 /// See the \a ClientStreamingInterface.WaitForInitialMetadata method for 179 /// semantics. 180 /// 181 // Side effect: 182 /// Once complete, the initial metadata read from 183 /// the server will be accessible through the \a ClientContext used to 184 /// construct this object. WaitForInitialMetadata()185 void WaitForInitialMetadata() override { 186 GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); 187 188 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> 189 ops; 190 ops.RecvInitialMetadata(context_); 191 call_.PerformOps(&ops); 192 cq_.Pluck(&ops); /// status ignored 193 } 194 NextMessageSize(uint32_t * sz)195 bool NextMessageSize(uint32_t* sz) override { 196 int result = call_.max_receive_message_size(); 197 *sz = (result > 0) ? result : UINT32_MAX; 198 return true; 199 } 200 201 /// See the \a ReaderInterface.Read method for semantics. 202 /// Side effect: 203 /// This also receives initial metadata from the server, if not 204 /// already received (if initial metadata is received, it can be then 205 /// accessed through the \a ClientContext associated with this call). Read(R * msg)206 bool Read(R* msg) override { 207 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, 208 ::grpc::internal::CallOpRecvMessage<R>> 209 ops; 210 if (!context_->initial_metadata_received_) { 211 ops.RecvInitialMetadata(context_); 212 } 213 ops.RecvMessage(msg); 214 call_.PerformOps(&ops); 215 return cq_.Pluck(&ops) && ops.got_message; 216 } 217 218 /// See the \a ClientStreamingInterface.Finish method for semantics. 219 /// 220 /// Side effect: 221 /// The \a ClientContext associated with this call is updated with 222 /// possible metadata received from the server. Finish()223 ::grpc::Status Finish() override { 224 ::grpc::internal::CallOpSet<::grpc::internal::CallOpClientRecvStatus> ops; 225 ::grpc::Status status; 226 ops.ClientRecvStatus(context_, &status); 227 call_.PerformOps(&ops); 228 GPR_CODEGEN_ASSERT(cq_.Pluck(&ops)); 229 return status; 230 } 231 232 private: 233 friend class internal::ClientReaderFactory<R>; 234 ::grpc_impl::ClientContext* context_; 235 ::grpc_impl::CompletionQueue cq_; 236 ::grpc::internal::Call call_; 237 238 /// Block to create a stream and write the initial metadata and \a request 239 /// out. Note that \a context will be used to fill in custom initial 240 /// metadata used to send to the server when starting the call. 241 template <class W> ClientReader(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc_impl::ClientContext * context,const W & request)242 ClientReader(::grpc::ChannelInterface* channel, 243 const ::grpc::internal::RpcMethod& method, 244 ::grpc_impl::ClientContext* context, const W& request) 245 : context_(context), 246 cq_(grpc_completion_queue_attributes{ 247 GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING, 248 nullptr}), // Pluckable cq 249 call_(channel->CreateCall(method, context, &cq_)) { 250 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 251 ::grpc::internal::CallOpSendMessage, 252 ::grpc::internal::CallOpClientSendClose> 253 ops; 254 ops.SendInitialMetadata(&context->send_initial_metadata_, 255 context->initial_metadata_flags()); 256 // TODO(ctiller): don't assert 257 GPR_CODEGEN_ASSERT(ops.SendMessagePtr(&request).ok()); 258 ops.ClientSendClose(); 259 call_.PerformOps(&ops); 260 cq_.Pluck(&ops); 261 } 262 }; 263 264 /// Client-side interface for streaming writes of message type \a W. 265 template <class W> 266 class ClientWriterInterface : public internal::ClientStreamingInterface, 267 public internal::WriterInterface<W> { 268 public: 269 /// Half close writing from the client. (signal that the stream of messages 270 /// coming from the client is complete). 271 /// Blocks until currently-pending writes are completed. 272 /// Thread safe with respect to \a ReaderInterface::Read operations only 273 /// 274 /// \return Whether the writes were successful. 275 virtual bool WritesDone() = 0; 276 }; 277 278 namespace internal { 279 template <class W> 280 class ClientWriterFactory { 281 public: 282 template <class R> Create(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc_impl::ClientContext * context,R * response)283 static ClientWriter<W>* Create(::grpc::ChannelInterface* channel, 284 const ::grpc::internal::RpcMethod& method, 285 ::grpc_impl::ClientContext* context, 286 R* response) { 287 return new ClientWriter<W>(channel, method, context, response); 288 } 289 }; 290 } // namespace internal 291 292 /// Synchronous (blocking) client-side API for doing client-streaming RPCs, 293 /// where the outgoing message stream coming from the client has messages of 294 /// type \a W. 295 template <class W> 296 class ClientWriter : public ClientWriterInterface<W> { 297 public: 298 /// See the \a ClientStreamingInterface.WaitForInitialMetadata method for 299 /// semantics. 300 /// 301 // Side effect: 302 /// Once complete, the initial metadata read from the server will be 303 /// accessible through the \a ClientContext used to construct this object. WaitForInitialMetadata()304 void WaitForInitialMetadata() { 305 GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); 306 307 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> 308 ops; 309 ops.RecvInitialMetadata(context_); 310 call_.PerformOps(&ops); 311 cq_.Pluck(&ops); // status ignored 312 } 313 314 /// See the WriterInterface.Write(const W& msg, WriteOptions options) method 315 /// for semantics. 316 /// 317 /// Side effect: 318 /// Also sends initial metadata if not already sent (using the 319 /// \a ClientContext associated with this call). 320 using internal::WriterInterface<W>::Write; Write(const W & msg,::grpc::WriteOptions options)321 bool Write(const W& msg, ::grpc::WriteOptions options) override { 322 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 323 ::grpc::internal::CallOpSendMessage, 324 ::grpc::internal::CallOpClientSendClose> 325 ops; 326 327 if (options.is_last_message()) { 328 options.set_buffer_hint(); 329 ops.ClientSendClose(); 330 } 331 if (context_->initial_metadata_corked_) { 332 ops.SendInitialMetadata(&context_->send_initial_metadata_, 333 context_->initial_metadata_flags()); 334 context_->set_initial_metadata_corked(false); 335 } 336 if (!ops.SendMessagePtr(&msg, options).ok()) { 337 return false; 338 } 339 340 call_.PerformOps(&ops); 341 return cq_.Pluck(&ops); 342 } 343 WritesDone()344 bool WritesDone() override { 345 ::grpc::internal::CallOpSet<::grpc::internal::CallOpClientSendClose> ops; 346 ops.ClientSendClose(); 347 call_.PerformOps(&ops); 348 return cq_.Pluck(&ops); 349 } 350 351 /// See the ClientStreamingInterface.Finish method for semantics. 352 /// Side effects: 353 /// - Also receives initial metadata if not already received. 354 /// - Attempts to fill in the \a response parameter passed 355 /// to the constructor of this instance with the response 356 /// message from the server. Finish()357 ::grpc::Status Finish() override { 358 ::grpc::Status status; 359 if (!context_->initial_metadata_received_) { 360 finish_ops_.RecvInitialMetadata(context_); 361 } 362 finish_ops_.ClientRecvStatus(context_, &status); 363 call_.PerformOps(&finish_ops_); 364 GPR_CODEGEN_ASSERT(cq_.Pluck(&finish_ops_)); 365 return status; 366 } 367 368 private: 369 friend class internal::ClientWriterFactory<W>; 370 371 /// Block to create a stream (i.e. send request headers and other initial 372 /// metadata to the server). Note that \a context will be used to fill 373 /// in custom initial metadata. \a response will be filled in with the 374 /// single expected response message from the server upon a successful 375 /// call to the \a Finish method of this instance. 376 template <class R> ClientWriter(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc_impl::ClientContext * context,R * response)377 ClientWriter(::grpc::ChannelInterface* channel, 378 const ::grpc::internal::RpcMethod& method, 379 ::grpc_impl::ClientContext* context, R* response) 380 : context_(context), 381 cq_(grpc_completion_queue_attributes{ 382 GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING, 383 nullptr}), // Pluckable cq 384 call_(channel->CreateCall(method, context, &cq_)) { 385 finish_ops_.RecvMessage(response); 386 finish_ops_.AllowNoMessage(); 387 388 if (!context_->initial_metadata_corked_) { 389 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> 390 ops; 391 ops.SendInitialMetadata(&context->send_initial_metadata_, 392 context->initial_metadata_flags()); 393 call_.PerformOps(&ops); 394 cq_.Pluck(&ops); 395 } 396 } 397 398 ::grpc_impl::ClientContext* context_; 399 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, 400 ::grpc::internal::CallOpGenericRecvMessage, 401 ::grpc::internal::CallOpClientRecvStatus> 402 finish_ops_; 403 ::grpc_impl::CompletionQueue cq_; 404 ::grpc::internal::Call call_; 405 }; 406 407 /// Client-side interface for bi-directional streaming with 408 /// client-to-server stream messages of type \a W and 409 /// server-to-client stream messages of type \a R. 410 template <class W, class R> 411 class ClientReaderWriterInterface : public internal::ClientStreamingInterface, 412 public internal::WriterInterface<W>, 413 public internal::ReaderInterface<R> { 414 public: 415 /// Block to wait for initial metadata from server. The received metadata 416 /// can only be accessed after this call returns. Should only be called before 417 /// the first read. Calling this method is optional, and if it is not called 418 /// the metadata will be available in ClientContext after the first read. 419 virtual void WaitForInitialMetadata() = 0; 420 421 /// Half close writing from the client. (signal that the stream of messages 422 /// coming from the client is complete). 423 /// Blocks until currently-pending writes are completed. 424 /// Thread-safe with respect to \a ReaderInterface::Read 425 /// 426 /// \return Whether the writes were successful. 427 virtual bool WritesDone() = 0; 428 }; 429 430 namespace internal { 431 template <class W, class R> 432 class ClientReaderWriterFactory { 433 public: Create(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc_impl::ClientContext * context)434 static ClientReaderWriter<W, R>* Create( 435 ::grpc::ChannelInterface* channel, 436 const ::grpc::internal::RpcMethod& method, 437 ::grpc_impl::ClientContext* context) { 438 return new ClientReaderWriter<W, R>(channel, method, context); 439 } 440 }; 441 } // namespace internal 442 443 /// Synchronous (blocking) client-side API for bi-directional streaming RPCs, 444 /// where the outgoing message stream coming from the client has messages of 445 /// type \a W, and the incoming messages stream coming from the server has 446 /// messages of type \a R. 447 template <class W, class R> 448 class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> { 449 public: 450 /// Block waiting to read initial metadata from the server. 451 /// This call is optional, but if it is used, it cannot be used concurrently 452 /// with or after the \a Finish method. 453 /// 454 /// Once complete, the initial metadata read from the server will be 455 /// accessible through the \a ClientContext used to construct this object. WaitForInitialMetadata()456 void WaitForInitialMetadata() override { 457 GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); 458 459 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> 460 ops; 461 ops.RecvInitialMetadata(context_); 462 call_.PerformOps(&ops); 463 cq_.Pluck(&ops); // status ignored 464 } 465 NextMessageSize(uint32_t * sz)466 bool NextMessageSize(uint32_t* sz) override { 467 int result = call_.max_receive_message_size(); 468 *sz = (result > 0) ? result : UINT32_MAX; 469 return true; 470 } 471 472 /// See the \a ReaderInterface.Read method for semantics. 473 /// Side effect: 474 /// Also receives initial metadata if not already received (updates the \a 475 /// ClientContext associated with this call in that case). Read(R * msg)476 bool Read(R* msg) override { 477 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, 478 ::grpc::internal::CallOpRecvMessage<R>> 479 ops; 480 if (!context_->initial_metadata_received_) { 481 ops.RecvInitialMetadata(context_); 482 } 483 ops.RecvMessage(msg); 484 call_.PerformOps(&ops); 485 return cq_.Pluck(&ops) && ops.got_message; 486 } 487 488 /// See the \a WriterInterface.Write method for semantics. 489 /// 490 /// Side effect: 491 /// Also sends initial metadata if not already sent (using the 492 /// \a ClientContext associated with this call to fill in values). 493 using internal::WriterInterface<W>::Write; Write(const W & msg,::grpc::WriteOptions options)494 bool Write(const W& msg, ::grpc::WriteOptions options) override { 495 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 496 ::grpc::internal::CallOpSendMessage, 497 ::grpc::internal::CallOpClientSendClose> 498 ops; 499 500 if (options.is_last_message()) { 501 options.set_buffer_hint(); 502 ops.ClientSendClose(); 503 } 504 if (context_->initial_metadata_corked_) { 505 ops.SendInitialMetadata(&context_->send_initial_metadata_, 506 context_->initial_metadata_flags()); 507 context_->set_initial_metadata_corked(false); 508 } 509 if (!ops.SendMessagePtr(&msg, options).ok()) { 510 return false; 511 } 512 513 call_.PerformOps(&ops); 514 return cq_.Pluck(&ops); 515 } 516 WritesDone()517 bool WritesDone() override { 518 ::grpc::internal::CallOpSet<::grpc::internal::CallOpClientSendClose> ops; 519 ops.ClientSendClose(); 520 call_.PerformOps(&ops); 521 return cq_.Pluck(&ops); 522 } 523 524 /// See the ClientStreamingInterface.Finish method for semantics. 525 /// 526 /// Side effect: 527 /// - the \a ClientContext associated with this call is updated with 528 /// possible trailing metadata sent from the server. Finish()529 ::grpc::Status Finish() override { 530 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, 531 ::grpc::internal::CallOpClientRecvStatus> 532 ops; 533 if (!context_->initial_metadata_received_) { 534 ops.RecvInitialMetadata(context_); 535 } 536 ::grpc::Status status; 537 ops.ClientRecvStatus(context_, &status); 538 call_.PerformOps(&ops); 539 GPR_CODEGEN_ASSERT(cq_.Pluck(&ops)); 540 return status; 541 } 542 543 private: 544 friend class internal::ClientReaderWriterFactory<W, R>; 545 546 ::grpc_impl::ClientContext* context_; 547 ::grpc_impl::CompletionQueue cq_; 548 ::grpc::internal::Call call_; 549 550 /// Block to create a stream and write the initial metadata and \a request 551 /// out. Note that \a context will be used to fill in custom initial metadata 552 /// used to send to the server when starting the call. ClientReaderWriter(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc_impl::ClientContext * context)553 ClientReaderWriter(::grpc::ChannelInterface* channel, 554 const ::grpc::internal::RpcMethod& method, 555 ::grpc_impl::ClientContext* context) 556 : context_(context), 557 cq_(grpc_completion_queue_attributes{ 558 GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING, 559 nullptr}), // Pluckable cq 560 call_(channel->CreateCall(method, context, &cq_)) { 561 if (!context_->initial_metadata_corked_) { 562 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> 563 ops; 564 ops.SendInitialMetadata(&context->send_initial_metadata_, 565 context->initial_metadata_flags()); 566 call_.PerformOps(&ops); 567 cq_.Pluck(&ops); 568 } 569 } 570 }; 571 572 /// Server-side interface for streaming reads of message of type \a R. 573 template <class R> 574 class ServerReaderInterface : public internal::ServerStreamingInterface, 575 public internal::ReaderInterface<R> {}; 576 577 /// Synchronous (blocking) server-side API for doing client-streaming RPCs, 578 /// where the incoming message stream coming from the client has messages of 579 /// type \a R. 580 template <class R> 581 class ServerReader final : public ServerReaderInterface<R> { 582 public: 583 /// See the \a ServerStreamingInterface.SendInitialMetadata method 584 /// for semantics. Note that initial metadata will be affected by the 585 /// \a ServerContext associated with this call. SendInitialMetadata()586 void SendInitialMetadata() override { 587 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); 588 589 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> 590 ops; 591 ops.SendInitialMetadata(&ctx_->initial_metadata_, 592 ctx_->initial_metadata_flags()); 593 if (ctx_->compression_level_set()) { 594 ops.set_compression_level(ctx_->compression_level()); 595 } 596 ctx_->sent_initial_metadata_ = true; 597 call_->PerformOps(&ops); 598 call_->cq()->Pluck(&ops); 599 } 600 NextMessageSize(uint32_t * sz)601 bool NextMessageSize(uint32_t* sz) override { 602 int result = call_->max_receive_message_size(); 603 *sz = (result > 0) ? result : UINT32_MAX; 604 return true; 605 } 606 Read(R * msg)607 bool Read(R* msg) override { 608 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvMessage<R>> ops; 609 ops.RecvMessage(msg); 610 call_->PerformOps(&ops); 611 return call_->cq()->Pluck(&ops) && ops.got_message; 612 } 613 614 private: 615 ::grpc::internal::Call* const call_; 616 ServerContext* const ctx_; 617 618 template <class ServiceType, class RequestType, class ResponseType> 619 friend class ::grpc_impl::internal::ClientStreamingHandler; 620 ServerReader(::grpc::internal::Call * call,::grpc_impl::ServerContext * ctx)621 ServerReader(::grpc::internal::Call* call, ::grpc_impl::ServerContext* ctx) 622 : call_(call), ctx_(ctx) {} 623 }; 624 625 /// Server-side interface for streaming writes of message of type \a W. 626 template <class W> 627 class ServerWriterInterface : public internal::ServerStreamingInterface, 628 public internal::WriterInterface<W> {}; 629 630 /// Synchronous (blocking) server-side API for doing for doing a 631 /// server-streaming RPCs, where the outgoing message stream coming from the 632 /// server has messages of type \a W. 633 template <class W> 634 class ServerWriter final : public ServerWriterInterface<W> { 635 public: 636 /// See the \a ServerStreamingInterface.SendInitialMetadata method 637 /// for semantics. 638 /// Note that initial metadata will be affected by the 639 /// \a ServerContext associated with this call. SendInitialMetadata()640 void SendInitialMetadata() override { 641 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); 642 643 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> 644 ops; 645 ops.SendInitialMetadata(&ctx_->initial_metadata_, 646 ctx_->initial_metadata_flags()); 647 if (ctx_->compression_level_set()) { 648 ops.set_compression_level(ctx_->compression_level()); 649 } 650 ctx_->sent_initial_metadata_ = true; 651 call_->PerformOps(&ops); 652 call_->cq()->Pluck(&ops); 653 } 654 655 /// See the \a WriterInterface.Write method for semantics. 656 /// 657 /// Side effect: 658 /// Also sends initial metadata if not already sent (using the 659 /// \a ClientContext associated with this call to fill in values). 660 using internal::WriterInterface<W>::Write; Write(const W & msg,::grpc::WriteOptions options)661 bool Write(const W& msg, ::grpc::WriteOptions options) override { 662 if (options.is_last_message()) { 663 options.set_buffer_hint(); 664 } 665 666 if (!ctx_->pending_ops_.SendMessagePtr(&msg, options).ok()) { 667 return false; 668 } 669 if (!ctx_->sent_initial_metadata_) { 670 ctx_->pending_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 671 ctx_->initial_metadata_flags()); 672 if (ctx_->compression_level_set()) { 673 ctx_->pending_ops_.set_compression_level(ctx_->compression_level()); 674 } 675 ctx_->sent_initial_metadata_ = true; 676 } 677 call_->PerformOps(&ctx_->pending_ops_); 678 // if this is the last message we defer the pluck until AFTER we start 679 // the trailing md op. This prevents hangs. See 680 // https://github.com/grpc/grpc/issues/11546 681 if (options.is_last_message()) { 682 ctx_->has_pending_ops_ = true; 683 return true; 684 } 685 ctx_->has_pending_ops_ = false; 686 return call_->cq()->Pluck(&ctx_->pending_ops_); 687 } 688 689 private: 690 ::grpc::internal::Call* const call_; 691 ::grpc_impl::ServerContext* const ctx_; 692 693 template <class ServiceType, class RequestType, class ResponseType> 694 friend class ::grpc_impl::internal::ServerStreamingHandler; 695 ServerWriter(::grpc::internal::Call * call,::grpc_impl::ServerContext * ctx)696 ServerWriter(::grpc::internal::Call* call, ::grpc_impl::ServerContext* ctx) 697 : call_(call), ctx_(ctx) {} 698 }; 699 700 /// Server-side interface for bi-directional streaming. 701 template <class W, class R> 702 class ServerReaderWriterInterface : public internal::ServerStreamingInterface, 703 public internal::WriterInterface<W>, 704 public internal::ReaderInterface<R> {}; 705 706 /// Actual implementation of bi-directional streaming 707 namespace internal { 708 template <class W, class R> 709 class ServerReaderWriterBody final { 710 public: ServerReaderWriterBody(grpc::internal::Call * call,::grpc_impl::ServerContext * ctx)711 ServerReaderWriterBody(grpc::internal::Call* call, 712 ::grpc_impl::ServerContext* ctx) 713 : call_(call), ctx_(ctx) {} 714 SendInitialMetadata()715 void SendInitialMetadata() { 716 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); 717 718 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata> ops; 719 ops.SendInitialMetadata(&ctx_->initial_metadata_, 720 ctx_->initial_metadata_flags()); 721 if (ctx_->compression_level_set()) { 722 ops.set_compression_level(ctx_->compression_level()); 723 } 724 ctx_->sent_initial_metadata_ = true; 725 call_->PerformOps(&ops); 726 call_->cq()->Pluck(&ops); 727 } 728 NextMessageSize(uint32_t * sz)729 bool NextMessageSize(uint32_t* sz) { 730 int result = call_->max_receive_message_size(); 731 *sz = (result > 0) ? result : UINT32_MAX; 732 return true; 733 } 734 Read(R * msg)735 bool Read(R* msg) { 736 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvMessage<R>> ops; 737 ops.RecvMessage(msg); 738 call_->PerformOps(&ops); 739 return call_->cq()->Pluck(&ops) && ops.got_message; 740 } 741 Write(const W & msg,::grpc::WriteOptions options)742 bool Write(const W& msg, ::grpc::WriteOptions options) { 743 if (options.is_last_message()) { 744 options.set_buffer_hint(); 745 } 746 if (!ctx_->pending_ops_.SendMessagePtr(&msg, options).ok()) { 747 return false; 748 } 749 if (!ctx_->sent_initial_metadata_) { 750 ctx_->pending_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 751 ctx_->initial_metadata_flags()); 752 if (ctx_->compression_level_set()) { 753 ctx_->pending_ops_.set_compression_level(ctx_->compression_level()); 754 } 755 ctx_->sent_initial_metadata_ = true; 756 } 757 call_->PerformOps(&ctx_->pending_ops_); 758 // if this is the last message we defer the pluck until AFTER we start 759 // the trailing md op. This prevents hangs. See 760 // https://github.com/grpc/grpc/issues/11546 761 if (options.is_last_message()) { 762 ctx_->has_pending_ops_ = true; 763 return true; 764 } 765 ctx_->has_pending_ops_ = false; 766 return call_->cq()->Pluck(&ctx_->pending_ops_); 767 } 768 769 private: 770 grpc::internal::Call* const call_; 771 ::grpc_impl::ServerContext* const ctx_; 772 }; 773 774 } // namespace internal 775 776 /// Synchronous (blocking) server-side API for a bidirectional 777 /// streaming call, where the incoming message stream coming from the client has 778 /// messages of type \a R, and the outgoing message streaming coming from 779 /// the server has messages of type \a W. 780 template <class W, class R> 781 class ServerReaderWriter final : public ServerReaderWriterInterface<W, R> { 782 public: 783 /// See the \a ServerStreamingInterface.SendInitialMetadata method 784 /// for semantics. Note that initial metadata will be affected by the 785 /// \a ServerContext associated with this call. SendInitialMetadata()786 void SendInitialMetadata() override { body_.SendInitialMetadata(); } 787 NextMessageSize(uint32_t * sz)788 bool NextMessageSize(uint32_t* sz) override { 789 return body_.NextMessageSize(sz); 790 } 791 Read(R * msg)792 bool Read(R* msg) override { return body_.Read(msg); } 793 794 /// See the \a WriterInterface.Write(const W& msg, WriteOptions options) 795 /// method for semantics. 796 /// Side effect: 797 /// Also sends initial metadata if not already sent (using the \a 798 /// ServerContext associated with this call). 799 using internal::WriterInterface<W>::Write; Write(const W & msg,::grpc::WriteOptions options)800 bool Write(const W& msg, ::grpc::WriteOptions options) override { 801 return body_.Write(msg, options); 802 } 803 804 private: 805 internal::ServerReaderWriterBody<W, R> body_; 806 807 friend class ::grpc_impl::internal::TemplatedBidiStreamingHandler< 808 ServerReaderWriter<W, R>, false>; ServerReaderWriter(::grpc::internal::Call * call,::grpc_impl::ServerContext * ctx)809 ServerReaderWriter(::grpc::internal::Call* call, 810 ::grpc_impl::ServerContext* ctx) 811 : body_(call, ctx) {} 812 }; 813 814 /// A class to represent a flow-controlled unary call. This is something 815 /// of a hybrid between conventional unary and streaming. This is invoked 816 /// through a unary call on the client side, but the server responds to it 817 /// as though it were a single-ping-pong streaming call. The server can use 818 /// the \a NextMessageSize method to determine an upper-bound on the size of 819 /// the message. A key difference relative to streaming: ServerUnaryStreamer 820 /// must have exactly 1 Read and exactly 1 Write, in that order, to function 821 /// correctly. Otherwise, the RPC is in error. 822 template <class RequestType, class ResponseType> 823 class ServerUnaryStreamer final 824 : public ServerReaderWriterInterface<ResponseType, RequestType> { 825 public: 826 /// Block to send initial metadata to client. 827 /// Implicit input parameter: 828 /// - the \a ServerContext associated with this call will be used for 829 /// sending initial metadata. SendInitialMetadata()830 void SendInitialMetadata() override { body_.SendInitialMetadata(); } 831 832 /// Get an upper bound on the request message size from the client. NextMessageSize(uint32_t * sz)833 bool NextMessageSize(uint32_t* sz) override { 834 return body_.NextMessageSize(sz); 835 } 836 837 /// Read a message of type \a R into \a msg. Completion will be notified by \a 838 /// tag on the associated completion queue. 839 /// This is thread-safe with respect to \a Write or \a WritesDone methods. It 840 /// should not be called concurrently with other streaming APIs 841 /// on the same stream. It is not meaningful to call it concurrently 842 /// with another \a ReaderInterface::Read on the same stream since reads on 843 /// the same stream are delivered in order. 844 /// 845 /// \param[out] msg Where to eventually store the read message. 846 /// \param[in] tag The tag identifying the operation. Read(RequestType * request)847 bool Read(RequestType* request) override { 848 if (read_done_) { 849 return false; 850 } 851 read_done_ = true; 852 return body_.Read(request); 853 } 854 855 /// Block to write \a msg to the stream with WriteOptions \a options. 856 /// This is thread-safe with respect to \a ReaderInterface::Read 857 /// 858 /// \param msg The message to be written to the stream. 859 /// \param options The WriteOptions affecting the write operation. 860 /// 861 /// \return \a true on success, \a false when the stream has been closed. 862 using internal::WriterInterface<ResponseType>::Write; Write(const ResponseType & response,::grpc::WriteOptions options)863 bool Write(const ResponseType& response, 864 ::grpc::WriteOptions options) override { 865 if (write_done_ || !read_done_) { 866 return false; 867 } 868 write_done_ = true; 869 return body_.Write(response, options); 870 } 871 872 private: 873 internal::ServerReaderWriterBody<ResponseType, RequestType> body_; 874 bool read_done_; 875 bool write_done_; 876 877 friend class ::grpc_impl::internal::TemplatedBidiStreamingHandler< 878 ServerUnaryStreamer<RequestType, ResponseType>, true>; ServerUnaryStreamer(::grpc::internal::Call * call,::grpc_impl::ServerContext * ctx)879 ServerUnaryStreamer(::grpc::internal::Call* call, 880 ::grpc_impl::ServerContext* ctx) 881 : body_(call, ctx), read_done_(false), write_done_(false) {} 882 }; 883 884 /// A class to represent a flow-controlled server-side streaming call. 885 /// This is something of a hybrid between server-side and bidi streaming. 886 /// This is invoked through a server-side streaming call on the client side, 887 /// but the server responds to it as though it were a bidi streaming call that 888 /// must first have exactly 1 Read and then any number of Writes. 889 template <class RequestType, class ResponseType> 890 class ServerSplitStreamer final 891 : public ServerReaderWriterInterface<ResponseType, RequestType> { 892 public: 893 /// Block to send initial metadata to client. 894 /// Implicit input parameter: 895 /// - the \a ServerContext associated with this call will be used for 896 /// sending initial metadata. SendInitialMetadata()897 void SendInitialMetadata() override { body_.SendInitialMetadata(); } 898 899 /// Get an upper bound on the request message size from the client. NextMessageSize(uint32_t * sz)900 bool NextMessageSize(uint32_t* sz) override { 901 return body_.NextMessageSize(sz); 902 } 903 904 /// Read a message of type \a R into \a msg. Completion will be notified by \a 905 /// tag on the associated completion queue. 906 /// This is thread-safe with respect to \a Write or \a WritesDone methods. It 907 /// should not be called concurrently with other streaming APIs 908 /// on the same stream. It is not meaningful to call it concurrently 909 /// with another \a ReaderInterface::Read on the same stream since reads on 910 /// the same stream are delivered in order. 911 /// 912 /// \param[out] msg Where to eventually store the read message. 913 /// \param[in] tag The tag identifying the operation. Read(RequestType * request)914 bool Read(RequestType* request) override { 915 if (read_done_) { 916 return false; 917 } 918 read_done_ = true; 919 return body_.Read(request); 920 } 921 922 /// Block to write \a msg to the stream with WriteOptions \a options. 923 /// This is thread-safe with respect to \a ReaderInterface::Read 924 /// 925 /// \param msg The message to be written to the stream. 926 /// \param options The WriteOptions affecting the write operation. 927 /// 928 /// \return \a true on success, \a false when the stream has been closed. 929 using internal::WriterInterface<ResponseType>::Write; Write(const ResponseType & response,::grpc::WriteOptions options)930 bool Write(const ResponseType& response, 931 ::grpc::WriteOptions options) override { 932 return read_done_ && body_.Write(response, options); 933 } 934 935 private: 936 internal::ServerReaderWriterBody<ResponseType, RequestType> body_; 937 bool read_done_; 938 939 friend class ::grpc_impl::internal::TemplatedBidiStreamingHandler< 940 ServerSplitStreamer<RequestType, ResponseType>, false>; ServerSplitStreamer(::grpc::internal::Call * call,::grpc_impl::ServerContext * ctx)941 ServerSplitStreamer(::grpc::internal::Call* call, 942 ::grpc_impl::ServerContext* ctx) 943 : body_(call, ctx), read_done_(false) {} 944 }; 945 946 } // namespace grpc_impl 947 948 #endif // GRPCPP_IMPL_CODEGEN_SYNC_STREAM_IMPL_H 949