1 /* 2 * 3 * Copyright 2019 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18 #ifndef GRPCPP_IMPL_CODEGEN_SYNC_STREAM_H 19 #define GRPCPP_IMPL_CODEGEN_SYNC_STREAM_H 20 21 #include <grpcpp/impl/codegen/call.h> 22 #include <grpcpp/impl/codegen/channel_interface.h> 23 #include <grpcpp/impl/codegen/client_context.h> 24 #include <grpcpp/impl/codegen/completion_queue.h> 25 #include <grpcpp/impl/codegen/core_codegen_interface.h> 26 #include <grpcpp/impl/codegen/server_context.h> 27 #include <grpcpp/impl/codegen/service_type.h> 28 #include <grpcpp/impl/codegen/status.h> 29 30 namespace grpc { 31 32 namespace internal { 33 /// Common interface for all synchronous client side streaming. 34 class ClientStreamingInterface { 35 public: ~ClientStreamingInterface()36 virtual ~ClientStreamingInterface() {} 37 38 /// Block waiting until the stream finishes and a final status of the call is 39 /// available. 40 /// 41 /// It is appropriate to call this method exactly once when both: 42 /// * the calling code (client-side) has no more message to send 43 /// (this can be declared implicitly by calling this method, or 44 /// explicitly through an earlier call to <i>WritesDone</i> method of the 45 /// class in use, e.g. \a ClientWriterInterface::WritesDone or 46 /// \a ClientReaderWriterInterface::WritesDone). 47 /// * there are no more messages to be received from the server (which can 48 /// be known implicitly, or explicitly from an earlier call to \a 49 /// ReaderInterface::Read that returned "false"). 50 /// 51 /// This function will return either: 52 /// - when all incoming messages have been read and the server has 53 /// returned status. 54 /// - when the server has returned a non-OK status. 55 /// - OR when the call failed for some reason and the library generated a 56 /// status. 57 /// 58 /// Return values: 59 /// - \a Status contains the status code, message and details for the call 60 /// - the \a ClientContext associated with this call is updated with 61 /// possible trailing metadata sent from the server. 62 virtual ::grpc::Status Finish() = 0; 63 }; 64 65 /// Common interface for all synchronous server side streaming. 66 class ServerStreamingInterface { 67 public: ~ServerStreamingInterface()68 virtual ~ServerStreamingInterface() {} 69 70 /// Block to send initial metadata to client. 71 /// This call is optional, but if it is used, it cannot be used concurrently 72 /// with or after the \a Finish method. 73 /// 74 /// The initial metadata that will be sent to the client will be 75 /// taken from the \a ServerContext associated with the call. 76 virtual void SendInitialMetadata() = 0; 77 }; 78 79 /// An interface that yields a sequence of messages of type \a R. 80 template <class R> 81 class ReaderInterface { 82 public: ~ReaderInterface()83 virtual ~ReaderInterface() {} 84 85 /// Get an upper bound on the next message size available for reading on this 86 /// stream. 87 virtual bool NextMessageSize(uint32_t* sz) = 0; 88 89 /// Block to read a message and parse to \a msg. Returns \a true on success. 90 /// This is thread-safe with respect to \a Write or \WritesDone methods on 91 /// the same stream. It should not be called concurrently with another \a 92 /// Read on the same stream as the order of delivery will not be defined. 93 /// 94 /// \param[out] msg The read message. 95 /// 96 /// \return \a false when there will be no more incoming messages, either 97 /// because the other side has called \a WritesDone() or the stream has failed 98 /// (or been cancelled). 99 virtual bool Read(R* msg) = 0; 100 }; 101 102 /// An interface that can be fed a sequence of messages of type \a W. 103 template <class W> 104 class WriterInterface { 105 public: ~WriterInterface()106 virtual ~WriterInterface() {} 107 108 /// Block to write \a msg to the stream with WriteOptions \a options. 109 /// This is thread-safe with respect to \a ReaderInterface::Read 110 /// 111 /// \param msg The message to be written to the stream. 112 /// \param options The WriteOptions affecting the write operation. 113 /// 114 /// \return \a true on success, \a false when the stream has been closed. 115 virtual bool Write(const W& msg, ::grpc::WriteOptions options) = 0; 116 117 /// Block to write \a msg to the stream with default write options. 118 /// This is thread-safe with respect to \a ReaderInterface::Read 119 /// 120 /// \param msg The message to be written to the stream. 121 /// 122 /// \return \a true on success, \a false when the stream has been closed. Write(const W & msg)123 inline bool Write(const W& msg) { return Write(msg, ::grpc::WriteOptions()); } 124 125 /// Write \a msg and coalesce it with the writing of trailing metadata, using 126 /// WriteOptions \a options. 127 /// 128 /// For client, WriteLast is equivalent of performing Write and WritesDone in 129 /// a single step. \a msg and trailing metadata are coalesced and sent on wire 130 /// by calling this function. For server, WriteLast buffers the \a msg. 131 /// The writing of \a msg is held until the service handler returns, 132 /// where \a msg and trailing metadata are coalesced and sent on wire. 133 /// Note that WriteLast can only buffer \a msg up to the flow control window 134 /// size. If \a msg size is larger than the window size, it will be sent on 135 /// wire without buffering. 136 /// 137 /// \param[in] msg The message to be written to the stream. 138 /// \param[in] options The WriteOptions to be used to write this message. WriteLast(const W & msg,::grpc::WriteOptions options)139 void WriteLast(const W& msg, ::grpc::WriteOptions options) { 140 Write(msg, options.set_last_message()); 141 } 142 }; 143 144 } // namespace internal 145 146 /// Client-side interface for streaming reads of message of type \a R. 147 template <class R> 148 class ClientReaderInterface : public internal::ClientStreamingInterface, 149 public internal::ReaderInterface<R> { 150 public: 151 /// Block to wait for initial metadata from server. The received metadata 152 /// can only be accessed after this call returns. Should only be called before 153 /// the first read. Calling this method is optional, and if it is not called 154 /// the metadata will be available in ClientContext after the first read. 155 virtual void WaitForInitialMetadata() = 0; 156 }; 157 158 namespace internal { 159 template <class R> 160 class ClientReaderFactory { 161 public: 162 template <class W> Create(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc::ClientContext * context,const W & request)163 static ClientReader<R>* Create(::grpc::ChannelInterface* channel, 164 const ::grpc::internal::RpcMethod& method, 165 ::grpc::ClientContext* context, 166 const W& request) { 167 return new ClientReader<R>(channel, method, context, request); 168 } 169 }; 170 } // namespace internal 171 172 /// Synchronous (blocking) client-side API for doing server-streaming RPCs, 173 /// where the stream of messages coming from the server has messages 174 /// of type \a R. 175 template <class R> 176 class ClientReader final : public ClientReaderInterface<R> { 177 public: 178 /// See the \a ClientStreamingInterface.WaitForInitialMetadata method for 179 /// semantics. 180 /// 181 // Side effect: 182 /// Once complete, the initial metadata read from 183 /// the server will be accessible through the \a ClientContext used to 184 /// construct this object. WaitForInitialMetadata()185 void WaitForInitialMetadata() override { 186 GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); 187 188 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> 189 ops; 190 ops.RecvInitialMetadata(context_); 191 call_.PerformOps(&ops); 192 cq_.Pluck(&ops); /// status ignored 193 } 194 NextMessageSize(uint32_t * sz)195 bool NextMessageSize(uint32_t* sz) override { 196 int result = call_.max_receive_message_size(); 197 *sz = (result > 0) ? result : UINT32_MAX; 198 return true; 199 } 200 201 /// See the \a ReaderInterface.Read method for semantics. 202 /// Side effect: 203 /// This also receives initial metadata from the server, if not 204 /// already received (if initial metadata is received, it can be then 205 /// accessed through the \a ClientContext associated with this call). Read(R * msg)206 bool Read(R* msg) override { 207 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, 208 ::grpc::internal::CallOpRecvMessage<R>> 209 ops; 210 if (!context_->initial_metadata_received_) { 211 ops.RecvInitialMetadata(context_); 212 } 213 ops.RecvMessage(msg); 214 call_.PerformOps(&ops); 215 return cq_.Pluck(&ops) && ops.got_message; 216 } 217 218 /// See the \a ClientStreamingInterface.Finish method for semantics. 219 /// 220 /// Side effect: 221 /// The \a ClientContext associated with this call is updated with 222 /// possible metadata received from the server. Finish()223 ::grpc::Status Finish() override { 224 ::grpc::internal::CallOpSet<::grpc::internal::CallOpClientRecvStatus> ops; 225 ::grpc::Status status; 226 ops.ClientRecvStatus(context_, &status); 227 call_.PerformOps(&ops); 228 GPR_CODEGEN_ASSERT(cq_.Pluck(&ops)); 229 return status; 230 } 231 232 private: 233 friend class internal::ClientReaderFactory<R>; 234 ::grpc::ClientContext* context_; 235 ::grpc::CompletionQueue cq_; 236 ::grpc::internal::Call call_; 237 238 /// Block to create a stream and write the initial metadata and \a request 239 /// out. Note that \a context will be used to fill in custom initial 240 /// metadata used to send to the server when starting the call. 241 template <class W> ClientReader(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc::ClientContext * context,const W & request)242 ClientReader(::grpc::ChannelInterface* channel, 243 const ::grpc::internal::RpcMethod& method, 244 ::grpc::ClientContext* context, const W& request) 245 : context_(context), 246 cq_(grpc_completion_queue_attributes{ 247 GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING, 248 nullptr}), // Pluckable cq 249 call_(channel->CreateCall(method, context, &cq_)) { 250 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 251 ::grpc::internal::CallOpSendMessage, 252 ::grpc::internal::CallOpClientSendClose> 253 ops; 254 ops.SendInitialMetadata(&context->send_initial_metadata_, 255 context->initial_metadata_flags()); 256 // TODO(ctiller): don't assert 257 GPR_CODEGEN_ASSERT(ops.SendMessagePtr(&request).ok()); 258 ops.ClientSendClose(); 259 call_.PerformOps(&ops); 260 cq_.Pluck(&ops); 261 } 262 }; 263 264 /// Client-side interface for streaming writes of message type \a W. 265 template <class W> 266 class ClientWriterInterface : public internal::ClientStreamingInterface, 267 public internal::WriterInterface<W> { 268 public: 269 /// Half close writing from the client. (signal that the stream of messages 270 /// coming from the client is complete). 271 /// Blocks until currently-pending writes are completed. 272 /// Thread safe with respect to \a ReaderInterface::Read operations only 273 /// 274 /// \return Whether the writes were successful. 275 virtual bool WritesDone() = 0; 276 }; 277 278 namespace internal { 279 template <class W> 280 class ClientWriterFactory { 281 public: 282 template <class R> Create(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc::ClientContext * context,R * response)283 static ClientWriter<W>* Create(::grpc::ChannelInterface* channel, 284 const ::grpc::internal::RpcMethod& method, 285 ::grpc::ClientContext* context, R* response) { 286 return new ClientWriter<W>(channel, method, context, response); 287 } 288 }; 289 } // namespace internal 290 291 /// Synchronous (blocking) client-side API for doing client-streaming RPCs, 292 /// where the outgoing message stream coming from the client has messages of 293 /// type \a W. 294 template <class W> 295 class ClientWriter : public ClientWriterInterface<W> { 296 public: 297 /// See the \a ClientStreamingInterface.WaitForInitialMetadata method for 298 /// semantics. 299 /// 300 // Side effect: 301 /// Once complete, the initial metadata read from the server will be 302 /// accessible through the \a ClientContext used to construct this object. WaitForInitialMetadata()303 void WaitForInitialMetadata() { 304 GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); 305 306 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> 307 ops; 308 ops.RecvInitialMetadata(context_); 309 call_.PerformOps(&ops); 310 cq_.Pluck(&ops); // status ignored 311 } 312 313 /// See the WriterInterface.Write(const W& msg, WriteOptions options) method 314 /// for semantics. 315 /// 316 /// Side effect: 317 /// Also sends initial metadata if not already sent (using the 318 /// \a ClientContext associated with this call). 319 using internal::WriterInterface<W>::Write; Write(const W & msg,::grpc::WriteOptions options)320 bool Write(const W& msg, ::grpc::WriteOptions options) override { 321 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 322 ::grpc::internal::CallOpSendMessage, 323 ::grpc::internal::CallOpClientSendClose> 324 ops; 325 326 if (options.is_last_message()) { 327 options.set_buffer_hint(); 328 ops.ClientSendClose(); 329 } 330 if (context_->initial_metadata_corked_) { 331 ops.SendInitialMetadata(&context_->send_initial_metadata_, 332 context_->initial_metadata_flags()); 333 context_->set_initial_metadata_corked(false); 334 } 335 if (!ops.SendMessagePtr(&msg, options).ok()) { 336 return false; 337 } 338 339 call_.PerformOps(&ops); 340 return cq_.Pluck(&ops); 341 } 342 WritesDone()343 bool WritesDone() override { 344 ::grpc::internal::CallOpSet<::grpc::internal::CallOpClientSendClose> ops; 345 ops.ClientSendClose(); 346 call_.PerformOps(&ops); 347 return cq_.Pluck(&ops); 348 } 349 350 /// See the ClientStreamingInterface.Finish method for semantics. 351 /// Side effects: 352 /// - Also receives initial metadata if not already received. 353 /// - Attempts to fill in the \a response parameter passed 354 /// to the constructor of this instance with the response 355 /// message from the server. Finish()356 ::grpc::Status Finish() override { 357 ::grpc::Status status; 358 if (!context_->initial_metadata_received_) { 359 finish_ops_.RecvInitialMetadata(context_); 360 } 361 finish_ops_.ClientRecvStatus(context_, &status); 362 call_.PerformOps(&finish_ops_); 363 GPR_CODEGEN_ASSERT(cq_.Pluck(&finish_ops_)); 364 return status; 365 } 366 367 private: 368 friend class internal::ClientWriterFactory<W>; 369 370 /// Block to create a stream (i.e. send request headers and other initial 371 /// metadata to the server). Note that \a context will be used to fill 372 /// in custom initial metadata. \a response will be filled in with the 373 /// single expected response message from the server upon a successful 374 /// call to the \a Finish method of this instance. 375 template <class R> ClientWriter(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc::ClientContext * context,R * response)376 ClientWriter(::grpc::ChannelInterface* channel, 377 const ::grpc::internal::RpcMethod& method, 378 ::grpc::ClientContext* context, R* response) 379 : context_(context), 380 cq_(grpc_completion_queue_attributes{ 381 GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING, 382 nullptr}), // Pluckable cq 383 call_(channel->CreateCall(method, context, &cq_)) { 384 finish_ops_.RecvMessage(response); 385 finish_ops_.AllowNoMessage(); 386 387 if (!context_->initial_metadata_corked_) { 388 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> 389 ops; 390 ops.SendInitialMetadata(&context->send_initial_metadata_, 391 context->initial_metadata_flags()); 392 call_.PerformOps(&ops); 393 cq_.Pluck(&ops); 394 } 395 } 396 397 ::grpc::ClientContext* context_; 398 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, 399 ::grpc::internal::CallOpGenericRecvMessage, 400 ::grpc::internal::CallOpClientRecvStatus> 401 finish_ops_; 402 ::grpc::CompletionQueue cq_; 403 ::grpc::internal::Call call_; 404 }; 405 406 /// Client-side interface for bi-directional streaming with 407 /// client-to-server stream messages of type \a W and 408 /// server-to-client stream messages of type \a R. 409 template <class W, class R> 410 class ClientReaderWriterInterface : public internal::ClientStreamingInterface, 411 public internal::WriterInterface<W>, 412 public internal::ReaderInterface<R> { 413 public: 414 /// Block to wait for initial metadata from server. The received metadata 415 /// can only be accessed after this call returns. Should only be called before 416 /// the first read. Calling this method is optional, and if it is not called 417 /// the metadata will be available in ClientContext after the first read. 418 virtual void WaitForInitialMetadata() = 0; 419 420 /// Half close writing from the client. (signal that the stream of messages 421 /// coming from the client is complete). 422 /// Blocks until currently-pending writes are completed. 423 /// Thread-safe with respect to \a ReaderInterface::Read 424 /// 425 /// \return Whether the writes were successful. 426 virtual bool WritesDone() = 0; 427 }; 428 429 namespace internal { 430 template <class W, class R> 431 class ClientReaderWriterFactory { 432 public: Create(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc::ClientContext * context)433 static ClientReaderWriter<W, R>* Create( 434 ::grpc::ChannelInterface* channel, 435 const ::grpc::internal::RpcMethod& method, 436 ::grpc::ClientContext* context) { 437 return new ClientReaderWriter<W, R>(channel, method, context); 438 } 439 }; 440 } // namespace internal 441 442 /// Synchronous (blocking) client-side API for bi-directional streaming RPCs, 443 /// where the outgoing message stream coming from the client has messages of 444 /// type \a W, and the incoming messages stream coming from the server has 445 /// messages of type \a R. 446 template <class W, class R> 447 class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> { 448 public: 449 /// Block waiting to read initial metadata from the server. 450 /// This call is optional, but if it is used, it cannot be used concurrently 451 /// with or after the \a Finish method. 452 /// 453 /// Once complete, the initial metadata read from the server will be 454 /// accessible through the \a ClientContext used to construct this object. WaitForInitialMetadata()455 void WaitForInitialMetadata() override { 456 GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_); 457 458 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata> 459 ops; 460 ops.RecvInitialMetadata(context_); 461 call_.PerformOps(&ops); 462 cq_.Pluck(&ops); // status ignored 463 } 464 NextMessageSize(uint32_t * sz)465 bool NextMessageSize(uint32_t* sz) override { 466 int result = call_.max_receive_message_size(); 467 *sz = (result > 0) ? result : UINT32_MAX; 468 return true; 469 } 470 471 /// See the \a ReaderInterface.Read method for semantics. 472 /// Side effect: 473 /// Also receives initial metadata if not already received (updates the \a 474 /// ClientContext associated with this call in that case). Read(R * msg)475 bool Read(R* msg) override { 476 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, 477 ::grpc::internal::CallOpRecvMessage<R>> 478 ops; 479 if (!context_->initial_metadata_received_) { 480 ops.RecvInitialMetadata(context_); 481 } 482 ops.RecvMessage(msg); 483 call_.PerformOps(&ops); 484 return cq_.Pluck(&ops) && ops.got_message; 485 } 486 487 /// See the \a WriterInterface.Write method for semantics. 488 /// 489 /// Side effect: 490 /// Also sends initial metadata if not already sent (using the 491 /// \a ClientContext associated with this call to fill in values). 492 using internal::WriterInterface<W>::Write; Write(const W & msg,::grpc::WriteOptions options)493 bool Write(const W& msg, ::grpc::WriteOptions options) override { 494 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 495 ::grpc::internal::CallOpSendMessage, 496 ::grpc::internal::CallOpClientSendClose> 497 ops; 498 499 if (options.is_last_message()) { 500 options.set_buffer_hint(); 501 ops.ClientSendClose(); 502 } 503 if (context_->initial_metadata_corked_) { 504 ops.SendInitialMetadata(&context_->send_initial_metadata_, 505 context_->initial_metadata_flags()); 506 context_->set_initial_metadata_corked(false); 507 } 508 if (!ops.SendMessagePtr(&msg, options).ok()) { 509 return false; 510 } 511 512 call_.PerformOps(&ops); 513 return cq_.Pluck(&ops); 514 } 515 WritesDone()516 bool WritesDone() override { 517 ::grpc::internal::CallOpSet<::grpc::internal::CallOpClientSendClose> ops; 518 ops.ClientSendClose(); 519 call_.PerformOps(&ops); 520 return cq_.Pluck(&ops); 521 } 522 523 /// See the ClientStreamingInterface.Finish method for semantics. 524 /// 525 /// Side effect: 526 /// - the \a ClientContext associated with this call is updated with 527 /// possible trailing metadata sent from the server. Finish()528 ::grpc::Status Finish() override { 529 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata, 530 ::grpc::internal::CallOpClientRecvStatus> 531 ops; 532 if (!context_->initial_metadata_received_) { 533 ops.RecvInitialMetadata(context_); 534 } 535 ::grpc::Status status; 536 ops.ClientRecvStatus(context_, &status); 537 call_.PerformOps(&ops); 538 GPR_CODEGEN_ASSERT(cq_.Pluck(&ops)); 539 return status; 540 } 541 542 private: 543 friend class internal::ClientReaderWriterFactory<W, R>; 544 545 ::grpc::ClientContext* context_; 546 ::grpc::CompletionQueue cq_; 547 ::grpc::internal::Call call_; 548 549 /// Block to create a stream and write the initial metadata and \a request 550 /// out. Note that \a context will be used to fill in custom initial metadata 551 /// used to send to the server when starting the call. ClientReaderWriter(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc::ClientContext * context)552 ClientReaderWriter(::grpc::ChannelInterface* channel, 553 const ::grpc::internal::RpcMethod& method, 554 ::grpc::ClientContext* context) 555 : context_(context), 556 cq_(grpc_completion_queue_attributes{ 557 GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING, 558 nullptr}), // Pluckable cq 559 call_(channel->CreateCall(method, context, &cq_)) { 560 if (!context_->initial_metadata_corked_) { 561 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> 562 ops; 563 ops.SendInitialMetadata(&context->send_initial_metadata_, 564 context->initial_metadata_flags()); 565 call_.PerformOps(&ops); 566 cq_.Pluck(&ops); 567 } 568 } 569 }; 570 571 /// Server-side interface for streaming reads of message of type \a R. 572 template <class R> 573 class ServerReaderInterface : public internal::ServerStreamingInterface, 574 public internal::ReaderInterface<R> {}; 575 576 /// Synchronous (blocking) server-side API for doing client-streaming RPCs, 577 /// where the incoming message stream coming from the client has messages of 578 /// type \a R. 579 template <class R> 580 class ServerReader final : public ServerReaderInterface<R> { 581 public: 582 /// See the \a ServerStreamingInterface.SendInitialMetadata method 583 /// for semantics. Note that initial metadata will be affected by the 584 /// \a ServerContext associated with this call. SendInitialMetadata()585 void SendInitialMetadata() override { 586 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); 587 588 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> 589 ops; 590 ops.SendInitialMetadata(&ctx_->initial_metadata_, 591 ctx_->initial_metadata_flags()); 592 if (ctx_->compression_level_set()) { 593 ops.set_compression_level(ctx_->compression_level()); 594 } 595 ctx_->sent_initial_metadata_ = true; 596 call_->PerformOps(&ops); 597 call_->cq()->Pluck(&ops); 598 } 599 NextMessageSize(uint32_t * sz)600 bool NextMessageSize(uint32_t* sz) override { 601 int result = call_->max_receive_message_size(); 602 *sz = (result > 0) ? result : UINT32_MAX; 603 return true; 604 } 605 Read(R * msg)606 bool Read(R* msg) override { 607 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvMessage<R>> ops; 608 ops.RecvMessage(msg); 609 call_->PerformOps(&ops); 610 return call_->cq()->Pluck(&ops) && ops.got_message; 611 } 612 613 private: 614 ::grpc::internal::Call* const call_; 615 ServerContext* const ctx_; 616 617 template <class ServiceType, class RequestType, class ResponseType> 618 friend class internal::ClientStreamingHandler; 619 ServerReader(::grpc::internal::Call * call,::grpc::ServerContext * ctx)620 ServerReader(::grpc::internal::Call* call, ::grpc::ServerContext* ctx) 621 : call_(call), ctx_(ctx) {} 622 }; 623 624 /// Server-side interface for streaming writes of message of type \a W. 625 template <class W> 626 class ServerWriterInterface : public internal::ServerStreamingInterface, 627 public internal::WriterInterface<W> {}; 628 629 /// Synchronous (blocking) server-side API for doing for doing a 630 /// server-streaming RPCs, where the outgoing message stream coming from the 631 /// server has messages of type \a W. 632 template <class W> 633 class ServerWriter final : public ServerWriterInterface<W> { 634 public: 635 /// See the \a ServerStreamingInterface.SendInitialMetadata method 636 /// for semantics. 637 /// Note that initial metadata will be affected by the 638 /// \a ServerContext associated with this call. SendInitialMetadata()639 void SendInitialMetadata() override { 640 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); 641 642 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> 643 ops; 644 ops.SendInitialMetadata(&ctx_->initial_metadata_, 645 ctx_->initial_metadata_flags()); 646 if (ctx_->compression_level_set()) { 647 ops.set_compression_level(ctx_->compression_level()); 648 } 649 ctx_->sent_initial_metadata_ = true; 650 call_->PerformOps(&ops); 651 call_->cq()->Pluck(&ops); 652 } 653 654 /// See the \a WriterInterface.Write method for semantics. 655 /// 656 /// Side effect: 657 /// Also sends initial metadata if not already sent (using the 658 /// \a ClientContext associated with this call to fill in values). 659 using internal::WriterInterface<W>::Write; Write(const W & msg,::grpc::WriteOptions options)660 bool Write(const W& msg, ::grpc::WriteOptions options) override { 661 if (options.is_last_message()) { 662 options.set_buffer_hint(); 663 } 664 665 if (!ctx_->pending_ops_.SendMessagePtr(&msg, options).ok()) { 666 return false; 667 } 668 if (!ctx_->sent_initial_metadata_) { 669 ctx_->pending_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 670 ctx_->initial_metadata_flags()); 671 if (ctx_->compression_level_set()) { 672 ctx_->pending_ops_.set_compression_level(ctx_->compression_level()); 673 } 674 ctx_->sent_initial_metadata_ = true; 675 } 676 call_->PerformOps(&ctx_->pending_ops_); 677 // if this is the last message we defer the pluck until AFTER we start 678 // the trailing md op. This prevents hangs. See 679 // https://github.com/grpc/grpc/issues/11546 680 if (options.is_last_message()) { 681 ctx_->has_pending_ops_ = true; 682 return true; 683 } 684 ctx_->has_pending_ops_ = false; 685 return call_->cq()->Pluck(&ctx_->pending_ops_); 686 } 687 688 private: 689 ::grpc::internal::Call* const call_; 690 ::grpc::ServerContext* const ctx_; 691 692 template <class ServiceType, class RequestType, class ResponseType> 693 friend class internal::ServerStreamingHandler; 694 ServerWriter(::grpc::internal::Call * call,::grpc::ServerContext * ctx)695 ServerWriter(::grpc::internal::Call* call, ::grpc::ServerContext* ctx) 696 : call_(call), ctx_(ctx) {} 697 }; 698 699 /// Server-side interface for bi-directional streaming. 700 template <class W, class R> 701 class ServerReaderWriterInterface : public internal::ServerStreamingInterface, 702 public internal::WriterInterface<W>, 703 public internal::ReaderInterface<R> {}; 704 705 /// Actual implementation of bi-directional streaming 706 namespace internal { 707 template <class W, class R> 708 class ServerReaderWriterBody final { 709 public: ServerReaderWriterBody(grpc::internal::Call * call,::grpc::ServerContext * ctx)710 ServerReaderWriterBody(grpc::internal::Call* call, ::grpc::ServerContext* ctx) 711 : call_(call), ctx_(ctx) {} 712 SendInitialMetadata()713 void SendInitialMetadata() { 714 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); 715 716 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata> ops; 717 ops.SendInitialMetadata(&ctx_->initial_metadata_, 718 ctx_->initial_metadata_flags()); 719 if (ctx_->compression_level_set()) { 720 ops.set_compression_level(ctx_->compression_level()); 721 } 722 ctx_->sent_initial_metadata_ = true; 723 call_->PerformOps(&ops); 724 call_->cq()->Pluck(&ops); 725 } 726 NextMessageSize(uint32_t * sz)727 bool NextMessageSize(uint32_t* sz) { 728 int result = call_->max_receive_message_size(); 729 *sz = (result > 0) ? result : UINT32_MAX; 730 return true; 731 } 732 Read(R * msg)733 bool Read(R* msg) { 734 ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvMessage<R>> ops; 735 ops.RecvMessage(msg); 736 call_->PerformOps(&ops); 737 return call_->cq()->Pluck(&ops) && ops.got_message; 738 } 739 Write(const W & msg,::grpc::WriteOptions options)740 bool Write(const W& msg, ::grpc::WriteOptions options) { 741 if (options.is_last_message()) { 742 options.set_buffer_hint(); 743 } 744 if (!ctx_->pending_ops_.SendMessagePtr(&msg, options).ok()) { 745 return false; 746 } 747 if (!ctx_->sent_initial_metadata_) { 748 ctx_->pending_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 749 ctx_->initial_metadata_flags()); 750 if (ctx_->compression_level_set()) { 751 ctx_->pending_ops_.set_compression_level(ctx_->compression_level()); 752 } 753 ctx_->sent_initial_metadata_ = true; 754 } 755 call_->PerformOps(&ctx_->pending_ops_); 756 // if this is the last message we defer the pluck until AFTER we start 757 // the trailing md op. This prevents hangs. See 758 // https://github.com/grpc/grpc/issues/11546 759 if (options.is_last_message()) { 760 ctx_->has_pending_ops_ = true; 761 return true; 762 } 763 ctx_->has_pending_ops_ = false; 764 return call_->cq()->Pluck(&ctx_->pending_ops_); 765 } 766 767 private: 768 grpc::internal::Call* const call_; 769 ::grpc::ServerContext* const ctx_; 770 }; 771 772 } // namespace internal 773 774 /// Synchronous (blocking) server-side API for a bidirectional 775 /// streaming call, where the incoming message stream coming from the client has 776 /// messages of type \a R, and the outgoing message streaming coming from 777 /// the server has messages of type \a W. 778 template <class W, class R> 779 class ServerReaderWriter final : public ServerReaderWriterInterface<W, R> { 780 public: 781 /// See the \a ServerStreamingInterface.SendInitialMetadata method 782 /// for semantics. Note that initial metadata will be affected by the 783 /// \a ServerContext associated with this call. SendInitialMetadata()784 void SendInitialMetadata() override { body_.SendInitialMetadata(); } 785 NextMessageSize(uint32_t * sz)786 bool NextMessageSize(uint32_t* sz) override { 787 return body_.NextMessageSize(sz); 788 } 789 Read(R * msg)790 bool Read(R* msg) override { return body_.Read(msg); } 791 792 /// See the \a WriterInterface.Write(const W& msg, WriteOptions options) 793 /// method for semantics. 794 /// Side effect: 795 /// Also sends initial metadata if not already sent (using the \a 796 /// ServerContext associated with this call). 797 using internal::WriterInterface<W>::Write; Write(const W & msg,::grpc::WriteOptions options)798 bool Write(const W& msg, ::grpc::WriteOptions options) override { 799 return body_.Write(msg, options); 800 } 801 802 private: 803 internal::ServerReaderWriterBody<W, R> body_; 804 805 friend class internal::TemplatedBidiStreamingHandler<ServerReaderWriter<W, R>, 806 false>; ServerReaderWriter(::grpc::internal::Call * call,::grpc::ServerContext * ctx)807 ServerReaderWriter(::grpc::internal::Call* call, ::grpc::ServerContext* ctx) 808 : body_(call, ctx) {} 809 }; 810 811 /// A class to represent a flow-controlled unary call. This is something 812 /// of a hybrid between conventional unary and streaming. This is invoked 813 /// through a unary call on the client side, but the server responds to it 814 /// as though it were a single-ping-pong streaming call. The server can use 815 /// the \a NextMessageSize method to determine an upper-bound on the size of 816 /// the message. A key difference relative to streaming: ServerUnaryStreamer 817 /// must have exactly 1 Read and exactly 1 Write, in that order, to function 818 /// correctly. Otherwise, the RPC is in error. 819 template <class RequestType, class ResponseType> 820 class ServerUnaryStreamer final 821 : public ServerReaderWriterInterface<ResponseType, RequestType> { 822 public: 823 /// Block to send initial metadata to client. 824 /// Implicit input parameter: 825 /// - the \a ServerContext associated with this call will be used for 826 /// sending initial metadata. SendInitialMetadata()827 void SendInitialMetadata() override { body_.SendInitialMetadata(); } 828 829 /// Get an upper bound on the request message size from the client. NextMessageSize(uint32_t * sz)830 bool NextMessageSize(uint32_t* sz) override { 831 return body_.NextMessageSize(sz); 832 } 833 834 /// Read a message of type \a R into \a msg. Completion will be notified by \a 835 /// tag on the associated completion queue. 836 /// This is thread-safe with respect to \a Write or \a WritesDone methods. It 837 /// should not be called concurrently with other streaming APIs 838 /// on the same stream. It is not meaningful to call it concurrently 839 /// with another \a ReaderInterface::Read on the same stream since reads on 840 /// the same stream are delivered in order. 841 /// 842 /// \param[out] msg Where to eventually store the read message. 843 /// \param[in] tag The tag identifying the operation. Read(RequestType * request)844 bool Read(RequestType* request) override { 845 if (read_done_) { 846 return false; 847 } 848 read_done_ = true; 849 return body_.Read(request); 850 } 851 852 /// Block to write \a msg to the stream with WriteOptions \a options. 853 /// This is thread-safe with respect to \a ReaderInterface::Read 854 /// 855 /// \param msg The message to be written to the stream. 856 /// \param options The WriteOptions affecting the write operation. 857 /// 858 /// \return \a true on success, \a false when the stream has been closed. 859 using internal::WriterInterface<ResponseType>::Write; Write(const ResponseType & response,::grpc::WriteOptions options)860 bool Write(const ResponseType& response, 861 ::grpc::WriteOptions options) override { 862 if (write_done_ || !read_done_) { 863 return false; 864 } 865 write_done_ = true; 866 return body_.Write(response, options); 867 } 868 869 private: 870 internal::ServerReaderWriterBody<ResponseType, RequestType> body_; 871 bool read_done_; 872 bool write_done_; 873 874 friend class internal::TemplatedBidiStreamingHandler< 875 ServerUnaryStreamer<RequestType, ResponseType>, true>; ServerUnaryStreamer(::grpc::internal::Call * call,::grpc::ServerContext * ctx)876 ServerUnaryStreamer(::grpc::internal::Call* call, ::grpc::ServerContext* ctx) 877 : body_(call, ctx), read_done_(false), write_done_(false) {} 878 }; 879 880 /// A class to represent a flow-controlled server-side streaming call. 881 /// This is something of a hybrid between server-side and bidi streaming. 882 /// This is invoked through a server-side streaming call on the client side, 883 /// but the server responds to it as though it were a bidi streaming call that 884 /// must first have exactly 1 Read and then any number of Writes. 885 template <class RequestType, class ResponseType> 886 class ServerSplitStreamer final 887 : public ServerReaderWriterInterface<ResponseType, RequestType> { 888 public: 889 /// Block to send initial metadata to client. 890 /// Implicit input parameter: 891 /// - the \a ServerContext associated with this call will be used for 892 /// sending initial metadata. SendInitialMetadata()893 void SendInitialMetadata() override { body_.SendInitialMetadata(); } 894 895 /// Get an upper bound on the request message size from the client. NextMessageSize(uint32_t * sz)896 bool NextMessageSize(uint32_t* sz) override { 897 return body_.NextMessageSize(sz); 898 } 899 900 /// Read a message of type \a R into \a msg. Completion will be notified by \a 901 /// tag on the associated completion queue. 902 /// This is thread-safe with respect to \a Write or \a WritesDone methods. It 903 /// should not be called concurrently with other streaming APIs 904 /// on the same stream. It is not meaningful to call it concurrently 905 /// with another \a ReaderInterface::Read on the same stream since reads on 906 /// the same stream are delivered in order. 907 /// 908 /// \param[out] msg Where to eventually store the read message. 909 /// \param[in] tag The tag identifying the operation. Read(RequestType * request)910 bool Read(RequestType* request) override { 911 if (read_done_) { 912 return false; 913 } 914 read_done_ = true; 915 return body_.Read(request); 916 } 917 918 /// Block to write \a msg to the stream with WriteOptions \a options. 919 /// This is thread-safe with respect to \a ReaderInterface::Read 920 /// 921 /// \param msg The message to be written to the stream. 922 /// \param options The WriteOptions affecting the write operation. 923 /// 924 /// \return \a true on success, \a false when the stream has been closed. 925 using internal::WriterInterface<ResponseType>::Write; Write(const ResponseType & response,::grpc::WriteOptions options)926 bool Write(const ResponseType& response, 927 ::grpc::WriteOptions options) override { 928 return read_done_ && body_.Write(response, options); 929 } 930 931 private: 932 internal::ServerReaderWriterBody<ResponseType, RequestType> body_; 933 bool read_done_; 934 935 friend class internal::TemplatedBidiStreamingHandler< 936 ServerSplitStreamer<RequestType, ResponseType>, false>; ServerSplitStreamer(::grpc::internal::Call * call,::grpc::ServerContext * ctx)937 ServerSplitStreamer(::grpc::internal::Call* call, ::grpc::ServerContext* ctx) 938 : body_(call, ctx), read_done_(false) {} 939 }; 940 941 } // namespace grpc 942 943 #endif // GRPCPP_IMPL_CODEGEN_SYNC_STREAM_H 944