• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #ifndef GRPCPP_SUPPORT_SYNC_STREAM_H
20 #define GRPCPP_SUPPORT_SYNC_STREAM_H
21 
22 #include <grpcpp/client_context.h>
23 #include <grpcpp/completion_queue.h>
24 #include <grpcpp/impl/call.h>
25 #include <grpcpp/impl/codegen/channel_interface.h>
26 #include <grpcpp/impl/service_type.h>
27 #include <grpcpp/server_context.h>
28 #include <grpcpp/support/status.h>
29 
30 #include "absl/log/absl_check.h"
31 
32 namespace grpc {
33 
34 namespace internal {
35 /// Common interface for all synchronous client side streaming.
36 class ClientStreamingInterface {
37  public:
~ClientStreamingInterface()38   virtual ~ClientStreamingInterface() {}
39 
40   /// Block waiting until the stream finishes and a final status of the call is
41   /// available.
42   ///
43   /// It is appropriate to call this method exactly once when both:
44   ///   * the calling code (client-side) has no more message to send
45   ///     (this can be declared implicitly by calling this method, or
46   ///     explicitly through an earlier call to <i>WritesDone</i> method of the
47   ///     class in use, e.g. \a ClientWriterInterface::WritesDone or
48   ///     \a ClientReaderWriterInterface::WritesDone).
49   ///   * there are no more messages to be received from the server (which can
50   ///     be known implicitly, or explicitly from an earlier call to \a
51   ///     ReaderInterface::Read that returned "false").
52   ///
53   /// This function will return either:
54   /// - when all incoming messages have been read and the server has
55   ///   returned status.
56   /// - when the server has returned a non-OK status.
57   /// - OR when the call failed for some reason and the library generated a
58   ///   status.
59   ///
60   /// Return values:
61   ///   - \a Status contains the status code, message and details for the call
62   ///   - the \a ClientContext associated with this call is updated with
63   ///     possible trailing metadata sent from the server.
64   virtual grpc::Status Finish() = 0;
65 };
66 
/// Common interface for all synchronous server side streaming.
class ServerStreamingInterface {
 public:
  virtual ~ServerStreamingInterface() = default;

  /// Block to send initial metadata to client.
  /// This call is optional, but if it is used, it cannot be used concurrently
  /// with or after the \a Finish method.
  ///
  /// The initial metadata that will be sent to the client will be
  /// taken from the \a ServerContext associated with the call.
  virtual void SendInitialMetadata() = 0;
};
80 
/// An interface that yields a sequence of messages of type \a R.
template <class R>
class ReaderInterface {
 public:
  virtual ~ReaderInterface() = default;

  /// Get an upper bound on the next message size available for reading on this
  /// stream.
  ///
  /// \param[out] sz Receives the upper bound.
  virtual bool NextMessageSize(uint32_t* sz) = 0;

  /// Block to read a message and parse to \a msg. Returns \a true on success.
  /// This is thread-safe with respect to \a Write or \a WritesDone methods on
  /// the same stream. It should not be called concurrently with another \a
  /// Read on the same stream as the order of delivery will not be defined.
  ///
  /// \param[out] msg The read message.
  ///
  /// \return \a false when there will be no more incoming messages, either
  /// because the other side has called \a WritesDone() or the stream has failed
  /// (or been cancelled).
  virtual bool Read(R* msg) = 0;
};
103 
104 /// An interface that can be fed a sequence of messages of type \a W.
105 template <class W>
106 class WriterInterface {
107  public:
~WriterInterface()108   virtual ~WriterInterface() {}
109 
110   /// Block to write \a msg to the stream with WriteOptions \a options.
111   /// This is thread-safe with respect to \a ReaderInterface::Read
112   ///
113   /// \param msg The message to be written to the stream.
114   /// \param options The WriteOptions affecting the write operation.
115   ///
116   /// \return \a true on success, \a false when the stream has been closed.
117   virtual bool Write(const W& msg, grpc::WriteOptions options) = 0;
118 
119   /// Block to write \a msg to the stream with default write options.
120   /// This is thread-safe with respect to \a ReaderInterface::Read
121   ///
122   /// \param msg The message to be written to the stream.
123   ///
124   /// \return \a true on success, \a false when the stream has been closed.
Write(const W & msg)125   inline bool Write(const W& msg) { return Write(msg, grpc::WriteOptions()); }
126 
127   /// Write \a msg and coalesce it with the writing of trailing metadata, using
128   /// WriteOptions \a options.
129   ///
130   /// For client, WriteLast is equivalent of performing Write and WritesDone in
131   /// a single step. \a msg and trailing metadata are coalesced and sent on wire
132   /// by calling this function. For server, WriteLast buffers the \a msg.
133   /// The writing of \a msg is held until the service handler returns,
134   /// where \a msg and trailing metadata are coalesced and sent on wire.
135   /// Note that WriteLast can only buffer \a msg up to the flow control window
136   /// size. If \a msg size is larger than the window size, it will be sent on
137   /// wire without buffering.
138   ///
139   /// \param[in] msg The message to be written to the stream.
140   /// \param[in] options The WriteOptions to be used to write this message.
WriteLast(const W & msg,grpc::WriteOptions options)141   void WriteLast(const W& msg, grpc::WriteOptions options) {
142     Write(msg, options.set_last_message());
143   }
144 };
145 
146 }  // namespace internal
147 
148 /// Client-side interface for streaming reads of message of type \a R.
149 template <class R>
150 class ClientReaderInterface : public internal::ClientStreamingInterface,
151                               public internal::ReaderInterface<R> {
152  public:
153   /// Block to wait for initial metadata from server. The received metadata
154   /// can only be accessed after this call returns. Should only be called before
155   /// the first read. Calling this method is optional, and if it is not called
156   /// the metadata will be available in ClientContext after the first read.
157   virtual void WaitForInitialMetadata() = 0;
158 };
159 
160 namespace internal {
161 template <class R>
162 class ClientReaderFactory {
163  public:
164   template <class W>
Create(grpc::ChannelInterface * channel,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,const W & request)165   static ClientReader<R>* Create(grpc::ChannelInterface* channel,
166                                  const grpc::internal::RpcMethod& method,
167                                  grpc::ClientContext* context,
168                                  const W& request) {
169     return new ClientReader<R>(channel, method, context, request);
170   }
171 };
172 }  // namespace internal
173 
174 /// Synchronous (blocking) client-side API for doing server-streaming RPCs,
175 /// where the stream of messages coming from the server has messages
176 /// of type \a R.
177 template <class R>
178 class ClientReader final : public ClientReaderInterface<R> {
179  public:
180   /// See the \a ClientStreamingInterface.WaitForInitialMetadata method for
181   /// semantics.
182   ///
183   //  Side effect:
184   ///   Once complete, the initial metadata read from
185   ///   the server will be accessible through the \a ClientContext used to
186   ///   construct this object.
WaitForInitialMetadata()187   void WaitForInitialMetadata() override {
188     ABSL_CHECK(!context_->initial_metadata_received_);
189 
190     grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata> ops;
191     ops.RecvInitialMetadata(context_);
192     call_.PerformOps(&ops);
193     cq_.Pluck(&ops);  /// status ignored
194   }
195 
NextMessageSize(uint32_t * sz)196   bool NextMessageSize(uint32_t* sz) override {
197     int result = call_.max_receive_message_size();
198     *sz = (result > 0) ? result : UINT32_MAX;
199     return true;
200   }
201 
202   /// See the \a ReaderInterface.Read method for semantics.
203   /// Side effect:
204   ///   This also receives initial metadata from the server, if not
205   ///   already received (if initial metadata is received, it can be then
206   ///   accessed through the \a ClientContext associated with this call).
Read(R * msg)207   bool Read(R* msg) override {
208     grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata,
209                               grpc::internal::CallOpRecvMessage<R>>
210         ops;
211     if (!context_->initial_metadata_received_) {
212       ops.RecvInitialMetadata(context_);
213     }
214     ops.RecvMessage(msg);
215     call_.PerformOps(&ops);
216     return cq_.Pluck(&ops) && ops.got_message;
217   }
218 
219   /// See the \a ClientStreamingInterface.Finish method for semantics.
220   ///
221   /// Side effect:
222   ///   The \a ClientContext associated with this call is updated with
223   ///   possible metadata received from the server.
Finish()224   grpc::Status Finish() override {
225     grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata,
226                               grpc::internal::CallOpClientRecvStatus>
227         ops;
228     if (!context_->initial_metadata_received_) {
229       ops.RecvInitialMetadata(context_);
230     }
231     grpc::Status status;
232     ops.ClientRecvStatus(context_, &status);
233     call_.PerformOps(&ops);
234     ABSL_CHECK(cq_.Pluck(&ops));
235     return status;
236   }
237 
238  private:
239   friend class internal::ClientReaderFactory<R>;
240   grpc::ClientContext* context_;
241   grpc::CompletionQueue cq_;
242   grpc::internal::Call call_;
243 
244   /// Block to create a stream and write the initial metadata and \a request
245   /// out. Note that \a context will be used to fill in custom initial
246   /// metadata used to send to the server when starting the call.
247   template <class W>
ClientReader(grpc::ChannelInterface * channel,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,const W & request)248   ClientReader(grpc::ChannelInterface* channel,
249                const grpc::internal::RpcMethod& method,
250                grpc::ClientContext* context, const W& request)
251       : context_(context),
252         cq_(grpc_completion_queue_attributes{
253             GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING,
254             nullptr}),  // Pluckable cq
255         call_(channel->CreateCall(method, context, &cq_)) {
256     grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
257                               grpc::internal::CallOpSendMessage,
258                               grpc::internal::CallOpClientSendClose>
259         ops;
260     ops.SendInitialMetadata(&context->send_initial_metadata_,
261                             context->initial_metadata_flags());
262     // TODO(ctiller): don't assert
263     ABSL_CHECK(ops.SendMessagePtr(&request).ok());
264     ops.ClientSendClose();
265     call_.PerformOps(&ops);
266     cq_.Pluck(&ops);
267   }
268 };
269 
270 /// Client-side interface for streaming writes of message type \a W.
271 template <class W>
272 class ClientWriterInterface : public internal::ClientStreamingInterface,
273                               public internal::WriterInterface<W> {
274  public:
275   /// Half close writing from the client. (signal that the stream of messages
276   /// coming from the client is complete).
277   /// Blocks until currently-pending writes are completed.
278   /// Thread safe with respect to \a ReaderInterface::Read operations only
279   ///
280   /// \return Whether the writes were successful.
281   virtual bool WritesDone() = 0;
282 };
283 
284 namespace internal {
285 template <class W>
286 class ClientWriterFactory {
287  public:
288   template <class R>
Create(grpc::ChannelInterface * channel,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,R * response)289   static ClientWriter<W>* Create(grpc::ChannelInterface* channel,
290                                  const grpc::internal::RpcMethod& method,
291                                  grpc::ClientContext* context, R* response) {
292     return new ClientWriter<W>(channel, method, context, response);
293   }
294 };
295 }  // namespace internal
296 
297 /// Synchronous (blocking) client-side API for doing client-streaming RPCs,
298 /// where the outgoing message stream coming from the client has messages of
299 /// type \a W.
300 template <class W>
301 class ClientWriter : public ClientWriterInterface<W> {
302  public:
303   /// See the \a ClientStreamingInterface.WaitForInitialMetadata method for
304   /// semantics.
305   ///
306   //  Side effect:
307   ///   Once complete, the initial metadata read from the server will be
308   ///   accessible through the \a ClientContext used to construct this object.
WaitForInitialMetadata()309   void WaitForInitialMetadata() {
310     ABSL_CHECK(!context_->initial_metadata_received_);
311 
312     grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata> ops;
313     ops.RecvInitialMetadata(context_);
314     call_.PerformOps(&ops);
315     cq_.Pluck(&ops);  // status ignored
316   }
317 
318   /// See the WriterInterface.Write(const W& msg, WriteOptions options) method
319   /// for semantics.
320   ///
321   /// Side effect:
322   ///   Also sends initial metadata if not already sent (using the
323   ///   \a ClientContext associated with this call).
324   using internal::WriterInterface<W>::Write;
Write(const W & msg,grpc::WriteOptions options)325   bool Write(const W& msg, grpc::WriteOptions options) override {
326     grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
327                               grpc::internal::CallOpSendMessage,
328                               grpc::internal::CallOpClientSendClose>
329         ops;
330 
331     if (options.is_last_message()) {
332       options.set_buffer_hint();
333       ops.ClientSendClose();
334     }
335     if (context_->initial_metadata_corked_) {
336       ops.SendInitialMetadata(&context_->send_initial_metadata_,
337                               context_->initial_metadata_flags());
338       context_->set_initial_metadata_corked(false);
339     }
340     if (!ops.SendMessagePtr(&msg, options).ok()) {
341       return false;
342     }
343 
344     call_.PerformOps(&ops);
345     return cq_.Pluck(&ops);
346   }
347 
WritesDone()348   bool WritesDone() override {
349     grpc::internal::CallOpSet<grpc::internal::CallOpClientSendClose> ops;
350     ops.ClientSendClose();
351     call_.PerformOps(&ops);
352     return cq_.Pluck(&ops);
353   }
354 
355   /// See the ClientStreamingInterface.Finish method for semantics.
356   /// Side effects:
357   ///   - Also receives initial metadata if not already received.
358   ///   - Attempts to fill in the \a response parameter passed
359   ///     to the constructor of this instance with the response
360   ///     message from the server.
Finish()361   grpc::Status Finish() override {
362     grpc::Status status;
363     if (!context_->initial_metadata_received_) {
364       finish_ops_.RecvInitialMetadata(context_);
365     }
366     finish_ops_.ClientRecvStatus(context_, &status);
367     call_.PerformOps(&finish_ops_);
368     ABSL_CHECK(cq_.Pluck(&finish_ops_));
369     return status;
370   }
371 
372  private:
373   friend class internal::ClientWriterFactory<W>;
374 
375   /// Block to create a stream (i.e. send request headers and other initial
376   /// metadata to the server). Note that \a context will be used to fill
377   /// in custom initial metadata. \a response will be filled in with the
378   /// single expected response message from the server upon a successful
379   /// call to the \a Finish method of this instance.
380   template <class R>
ClientWriter(grpc::ChannelInterface * channel,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,R * response)381   ClientWriter(grpc::ChannelInterface* channel,
382                const grpc::internal::RpcMethod& method,
383                grpc::ClientContext* context, R* response)
384       : context_(context),
385         cq_(grpc_completion_queue_attributes{
386             GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING,
387             nullptr}),  // Pluckable cq
388         call_(channel->CreateCall(method, context, &cq_)) {
389     finish_ops_.RecvMessage(response);
390     finish_ops_.AllowNoMessage();
391 
392     if (!context_->initial_metadata_corked_) {
393       grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata> ops;
394       ops.SendInitialMetadata(&context->send_initial_metadata_,
395                               context->initial_metadata_flags());
396       call_.PerformOps(&ops);
397       cq_.Pluck(&ops);
398     }
399   }
400 
401   grpc::ClientContext* context_;
402   grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata,
403                             grpc::internal::CallOpGenericRecvMessage,
404                             grpc::internal::CallOpClientRecvStatus>
405       finish_ops_;
406   grpc::CompletionQueue cq_;
407   grpc::internal::Call call_;
408 };
409 
410 /// Client-side interface for bi-directional streaming with
411 /// client-to-server stream messages of type \a W and
412 /// server-to-client stream messages of type \a R.
413 template <class W, class R>
414 class ClientReaderWriterInterface : public internal::ClientStreamingInterface,
415                                     public internal::WriterInterface<W>,
416                                     public internal::ReaderInterface<R> {
417  public:
418   /// Block to wait for initial metadata from server. The received metadata
419   /// can only be accessed after this call returns. Should only be called before
420   /// the first read. Calling this method is optional, and if it is not called
421   /// the metadata will be available in ClientContext after the first read.
422   virtual void WaitForInitialMetadata() = 0;
423 
424   /// Half close writing from the client. (signal that the stream of messages
425   /// coming from the client is complete).
426   /// Blocks until currently-pending writes are completed.
427   /// Thread-safe with respect to \a ReaderInterface::Read
428   ///
429   /// \return Whether the writes were successful.
430   virtual bool WritesDone() = 0;
431 };
432 
433 namespace internal {
434 template <class W, class R>
435 class ClientReaderWriterFactory {
436  public:
Create(grpc::ChannelInterface * channel,const grpc::internal::RpcMethod & method,grpc::ClientContext * context)437   static ClientReaderWriter<W, R>* Create(
438       grpc::ChannelInterface* channel, const grpc::internal::RpcMethod& method,
439       grpc::ClientContext* context) {
440     return new ClientReaderWriter<W, R>(channel, method, context);
441   }
442 };
443 }  // namespace internal
444 
445 /// Synchronous (blocking) client-side API for bi-directional streaming RPCs,
446 /// where the outgoing message stream coming from the client has messages of
447 /// type \a W, and the incoming messages stream coming from the server has
448 /// messages of type \a R.
449 template <class W, class R>
450 class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> {
451  public:
452   /// Block waiting to read initial metadata from the server.
453   /// This call is optional, but if it is used, it cannot be used concurrently
454   /// with or after the \a Finish method.
455   ///
456   /// Once complete, the initial metadata read from the server will be
457   /// accessible through the \a ClientContext used to construct this object.
WaitForInitialMetadata()458   void WaitForInitialMetadata() override {
459     ABSL_CHECK(!context_->initial_metadata_received_);
460 
461     grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata> ops;
462     ops.RecvInitialMetadata(context_);
463     call_.PerformOps(&ops);
464     cq_.Pluck(&ops);  // status ignored
465   }
466 
NextMessageSize(uint32_t * sz)467   bool NextMessageSize(uint32_t* sz) override {
468     int result = call_.max_receive_message_size();
469     *sz = (result > 0) ? result : UINT32_MAX;
470     return true;
471   }
472 
473   /// See the \a ReaderInterface.Read method for semantics.
474   /// Side effect:
475   ///   Also receives initial metadata if not already received (updates the \a
476   ///   ClientContext associated with this call in that case).
Read(R * msg)477   bool Read(R* msg) override {
478     grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata,
479                               grpc::internal::CallOpRecvMessage<R>>
480         ops;
481     if (!context_->initial_metadata_received_) {
482       ops.RecvInitialMetadata(context_);
483     }
484     ops.RecvMessage(msg);
485     call_.PerformOps(&ops);
486     return cq_.Pluck(&ops) && ops.got_message;
487   }
488 
489   /// See the \a WriterInterface.Write method for semantics.
490   ///
491   /// Side effect:
492   ///   Also sends initial metadata if not already sent (using the
493   ///   \a ClientContext associated with this call to fill in values).
494   using internal::WriterInterface<W>::Write;
Write(const W & msg,grpc::WriteOptions options)495   bool Write(const W& msg, grpc::WriteOptions options) override {
496     grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
497                               grpc::internal::CallOpSendMessage,
498                               grpc::internal::CallOpClientSendClose>
499         ops;
500 
501     if (options.is_last_message()) {
502       options.set_buffer_hint();
503       ops.ClientSendClose();
504     }
505     if (context_->initial_metadata_corked_) {
506       ops.SendInitialMetadata(&context_->send_initial_metadata_,
507                               context_->initial_metadata_flags());
508       context_->set_initial_metadata_corked(false);
509     }
510     if (!ops.SendMessagePtr(&msg, options).ok()) {
511       return false;
512     }
513 
514     call_.PerformOps(&ops);
515     return cq_.Pluck(&ops);
516   }
517 
WritesDone()518   bool WritesDone() override {
519     grpc::internal::CallOpSet<grpc::internal::CallOpClientSendClose> ops;
520     ops.ClientSendClose();
521     call_.PerformOps(&ops);
522     return cq_.Pluck(&ops);
523   }
524 
525   /// See the ClientStreamingInterface.Finish method for semantics.
526   ///
527   /// Side effect:
528   ///   - the \a ClientContext associated with this call is updated with
529   ///     possible trailing metadata sent from the server.
Finish()530   grpc::Status Finish() override {
531     grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata,
532                               grpc::internal::CallOpClientRecvStatus>
533         ops;
534     if (!context_->initial_metadata_received_) {
535       ops.RecvInitialMetadata(context_);
536     }
537     grpc::Status status;
538     ops.ClientRecvStatus(context_, &status);
539     call_.PerformOps(&ops);
540     ABSL_CHECK(cq_.Pluck(&ops));
541     return status;
542   }
543 
544  private:
545   friend class internal::ClientReaderWriterFactory<W, R>;
546 
547   grpc::ClientContext* context_;
548   grpc::CompletionQueue cq_;
549   grpc::internal::Call call_;
550 
551   /// Block to create a stream and write the initial metadata and \a request
552   /// out. Note that \a context will be used to fill in custom initial metadata
553   /// used to send to the server when starting the call.
ClientReaderWriter(grpc::ChannelInterface * channel,const grpc::internal::RpcMethod & method,grpc::ClientContext * context)554   ClientReaderWriter(grpc::ChannelInterface* channel,
555                      const grpc::internal::RpcMethod& method,
556                      grpc::ClientContext* context)
557       : context_(context),
558         cq_(grpc_completion_queue_attributes{
559             GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING,
560             nullptr}),  // Pluckable cq
561         call_(channel->CreateCall(method, context, &cq_)) {
562     if (!context_->initial_metadata_corked_) {
563       grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata> ops;
564       ops.SendInitialMetadata(&context->send_initial_metadata_,
565                               context->initial_metadata_flags());
566       call_.PerformOps(&ops);
567       cq_.Pluck(&ops);
568     }
569   }
570 };
571 
572 /// Server-side interface for streaming reads of message of type \a R.
573 template <class R>
574 class ServerReaderInterface : public internal::ServerStreamingInterface,
575                               public internal::ReaderInterface<R> {};
576 
577 /// Synchronous (blocking) server-side API for doing client-streaming RPCs,
578 /// where the incoming message stream coming from the client has messages of
579 /// type \a R.
580 template <class R>
581 class ServerReader final : public ServerReaderInterface<R> {
582  public:
583   /// See the \a ServerStreamingInterface.SendInitialMetadata method
584   /// for semantics. Note that initial metadata will be affected by the
585   /// \a ServerContext associated with this call.
SendInitialMetadata()586   void SendInitialMetadata() override {
587     ABSL_CHECK(!ctx_->sent_initial_metadata_);
588 
589     grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata> ops;
590     ops.SendInitialMetadata(&ctx_->initial_metadata_,
591                             ctx_->initial_metadata_flags());
592     if (ctx_->compression_level_set()) {
593       ops.set_compression_level(ctx_->compression_level());
594     }
595     ctx_->sent_initial_metadata_ = true;
596     call_->PerformOps(&ops);
597     call_->cq()->Pluck(&ops);
598   }
599 
NextMessageSize(uint32_t * sz)600   bool NextMessageSize(uint32_t* sz) override {
601     int result = call_->max_receive_message_size();
602     *sz = (result > 0) ? result : UINT32_MAX;
603     return true;
604   }
605 
Read(R * msg)606   bool Read(R* msg) override {
607     grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<R>> ops;
608     ops.RecvMessage(msg);
609     call_->PerformOps(&ops);
610     bool ok = call_->cq()->Pluck(&ops) && ops.got_message;
611     if (!ok) {
612       ctx_->MaybeMarkCancelledOnRead();
613     }
614     return ok;
615   }
616 
617  private:
618   grpc::internal::Call* const call_;
619   ServerContext* const ctx_;
620 
621   template <class ServiceType, class RequestType, class ResponseType>
622   friend class internal::ClientStreamingHandler;
623 
ServerReader(grpc::internal::Call * call,grpc::ServerContext * ctx)624   ServerReader(grpc::internal::Call* call, grpc::ServerContext* ctx)
625       : call_(call), ctx_(ctx) {}
626 };
627 
628 /// Server-side interface for streaming writes of message of type \a W.
629 template <class W>
630 class ServerWriterInterface : public internal::ServerStreamingInterface,
631                               public internal::WriterInterface<W> {};
632 
633 /// Synchronous (blocking) server-side API for doing for doing a
634 /// server-streaming RPCs, where the outgoing message stream coming from the
635 /// server has messages of type \a W.
636 template <class W>
637 class ServerWriter final : public ServerWriterInterface<W> {
638  public:
639   /// See the \a ServerStreamingInterface.SendInitialMetadata method
640   /// for semantics.
641   /// Note that initial metadata will be affected by the
642   /// \a ServerContext associated with this call.
SendInitialMetadata()643   void SendInitialMetadata() override {
644     ABSL_CHECK(!ctx_->sent_initial_metadata_);
645 
646     grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata> ops;
647     ops.SendInitialMetadata(&ctx_->initial_metadata_,
648                             ctx_->initial_metadata_flags());
649     if (ctx_->compression_level_set()) {
650       ops.set_compression_level(ctx_->compression_level());
651     }
652     ctx_->sent_initial_metadata_ = true;
653     call_->PerformOps(&ops);
654     call_->cq()->Pluck(&ops);
655   }
656 
657   /// See the \a WriterInterface.Write method for semantics.
658   ///
659   /// Side effect:
660   ///   Also sends initial metadata if not already sent (using the
661   ///   \a ClientContext associated with this call to fill in values).
662   using internal::WriterInterface<W>::Write;
Write(const W & msg,grpc::WriteOptions options)663   bool Write(const W& msg, grpc::WriteOptions options) override {
664     if (options.is_last_message()) {
665       options.set_buffer_hint();
666     }
667 
668     if (!ctx_->pending_ops_.SendMessagePtr(&msg, options).ok()) {
669       return false;
670     }
671     if (!ctx_->sent_initial_metadata_) {
672       ctx_->pending_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
673                                              ctx_->initial_metadata_flags());
674       if (ctx_->compression_level_set()) {
675         ctx_->pending_ops_.set_compression_level(ctx_->compression_level());
676       }
677       ctx_->sent_initial_metadata_ = true;
678     }
679     call_->PerformOps(&ctx_->pending_ops_);
680     // if this is the last message we defer the pluck until AFTER we start
681     // the trailing md op. This prevents hangs. See
682     // https://github.com/grpc/grpc/issues/11546
683     if (options.is_last_message()) {
684       ctx_->has_pending_ops_ = true;
685       return true;
686     }
687     ctx_->has_pending_ops_ = false;
688     return call_->cq()->Pluck(&ctx_->pending_ops_);
689   }
690 
691  private:
692   grpc::internal::Call* const call_;
693   grpc::ServerContext* const ctx_;
694 
695   template <class ServiceType, class RequestType, class ResponseType>
696   friend class internal::ServerStreamingHandler;
697 
ServerWriter(grpc::internal::Call * call,grpc::ServerContext * ctx)698   ServerWriter(grpc::internal::Call* call, grpc::ServerContext* ctx)
699       : call_(call), ctx_(ctx) {}
700 };
701 
702 /// Server-side interface for bi-directional streaming.
703 template <class W, class R>
704 class ServerReaderWriterInterface : public internal::ServerStreamingInterface,
705                                     public internal::WriterInterface<W>,
706                                     public internal::ReaderInterface<R> {};
707 
708 /// Actual implementation of bi-directional streaming
709 namespace internal {
710 template <class W, class R>
711 class ServerReaderWriterBody final {
712  public:
ServerReaderWriterBody(grpc::internal::Call * call,grpc::ServerContext * ctx)713   ServerReaderWriterBody(grpc::internal::Call* call, grpc::ServerContext* ctx)
714       : call_(call), ctx_(ctx) {}
715 
SendInitialMetadata()716   void SendInitialMetadata() {
717     ABSL_CHECK(!ctx_->sent_initial_metadata_);
718 
719     grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata> ops;
720     ops.SendInitialMetadata(&ctx_->initial_metadata_,
721                             ctx_->initial_metadata_flags());
722     if (ctx_->compression_level_set()) {
723       ops.set_compression_level(ctx_->compression_level());
724     }
725     ctx_->sent_initial_metadata_ = true;
726     call_->PerformOps(&ops);
727     call_->cq()->Pluck(&ops);
728   }
729 
NextMessageSize(uint32_t * sz)730   bool NextMessageSize(uint32_t* sz) {
731     int result = call_->max_receive_message_size();
732     *sz = (result > 0) ? result : UINT32_MAX;
733     return true;
734   }
735 
Read(R * msg)736   bool Read(R* msg) {
737     grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<R>> ops;
738     ops.RecvMessage(msg);
739     call_->PerformOps(&ops);
740     bool ok = call_->cq()->Pluck(&ops) && ops.got_message;
741     if (!ok) {
742       ctx_->MaybeMarkCancelledOnRead();
743     }
744     return ok;
745   }
746 
Write(const W & msg,grpc::WriteOptions options)747   bool Write(const W& msg, grpc::WriteOptions options) {
748     if (options.is_last_message()) {
749       options.set_buffer_hint();
750     }
751     if (!ctx_->pending_ops_.SendMessagePtr(&msg, options).ok()) {
752       return false;
753     }
754     if (!ctx_->sent_initial_metadata_) {
755       ctx_->pending_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
756                                              ctx_->initial_metadata_flags());
757       if (ctx_->compression_level_set()) {
758         ctx_->pending_ops_.set_compression_level(ctx_->compression_level());
759       }
760       ctx_->sent_initial_metadata_ = true;
761     }
762     call_->PerformOps(&ctx_->pending_ops_);
763     // if this is the last message we defer the pluck until AFTER we start
764     // the trailing md op. This prevents hangs. See
765     // https://github.com/grpc/grpc/issues/11546
766     if (options.is_last_message()) {
767       ctx_->has_pending_ops_ = true;
768       return true;
769     }
770     ctx_->has_pending_ops_ = false;
771     return call_->cq()->Pluck(&ctx_->pending_ops_);
772   }
773 
774  private:
775   grpc::internal::Call* const call_;
776   grpc::ServerContext* const ctx_;
777 };
778 
779 }  // namespace internal
780 
781 /// Synchronous (blocking) server-side API for a bidirectional
782 /// streaming call, where the incoming message stream coming from the client has
783 /// messages of type \a R, and the outgoing message streaming coming from
784 /// the server has messages of type \a W.
785 template <class W, class R>
786 class ServerReaderWriter final : public ServerReaderWriterInterface<W, R> {
787  public:
788   /// See the \a ServerStreamingInterface.SendInitialMetadata method
789   /// for semantics. Note that initial metadata will be affected by the
790   /// \a ServerContext associated with this call.
SendInitialMetadata()791   void SendInitialMetadata() override { body_.SendInitialMetadata(); }
792 
NextMessageSize(uint32_t * sz)793   bool NextMessageSize(uint32_t* sz) override {
794     return body_.NextMessageSize(sz);
795   }
796 
Read(R * msg)797   bool Read(R* msg) override { return body_.Read(msg); }
798 
799   /// See the \a WriterInterface.Write(const W& msg, WriteOptions options)
800   /// method for semantics.
801   /// Side effect:
802   ///   Also sends initial metadata if not already sent (using the \a
803   ///   ServerContext associated with this call).
804   using internal::WriterInterface<W>::Write;
Write(const W & msg,grpc::WriteOptions options)805   bool Write(const W& msg, grpc::WriteOptions options) override {
806     return body_.Write(msg, options);
807   }
808 
809  private:
810   internal::ServerReaderWriterBody<W, R> body_;
811 
812   friend class internal::TemplatedBidiStreamingHandler<ServerReaderWriter<W, R>,
813                                                        false>;
ServerReaderWriter(grpc::internal::Call * call,grpc::ServerContext * ctx)814   ServerReaderWriter(grpc::internal::Call* call, grpc::ServerContext* ctx)
815       : body_(call, ctx) {}
816 };
817 
818 /// A class to represent a flow-controlled unary call. This is something
819 /// of a hybrid between conventional unary and streaming. This is invoked
820 /// through a unary call on the client side, but the server responds to it
821 /// as though it were a single-ping-pong streaming call. The server can use
822 /// the \a NextMessageSize method to determine an upper-bound on the size of
823 /// the message. A key difference relative to streaming: ServerUnaryStreamer
824 /// must have exactly 1 Read and exactly 1 Write, in that order, to function
825 /// correctly. Otherwise, the RPC is in error.
826 template <class RequestType, class ResponseType>
827 class ServerUnaryStreamer final
828     : public ServerReaderWriterInterface<ResponseType, RequestType> {
829  public:
830   /// Block to send initial metadata to client.
831   /// Implicit input parameter:
832   ///    - the \a ServerContext associated with this call will be used for
833   ///      sending initial metadata.
SendInitialMetadata()834   void SendInitialMetadata() override { body_.SendInitialMetadata(); }
835 
836   /// Get an upper bound on the request message size from the client.
NextMessageSize(uint32_t * sz)837   bool NextMessageSize(uint32_t* sz) override {
838     return body_.NextMessageSize(sz);
839   }
840 
841   /// Read a message of type \a R into \a msg. Completion will be notified by \a
842   /// tag on the associated completion queue.
843   /// This is thread-safe with respect to \a Write or \a WritesDone methods. It
844   /// should not be called concurrently with other streaming APIs
845   /// on the same stream. It is not meaningful to call it concurrently
846   /// with another \a ReaderInterface::Read on the same stream since reads on
847   /// the same stream are delivered in order.
848   ///
849   /// \param[out] msg Where to eventually store the read message.
850   /// \param[in] tag The tag identifying the operation.
Read(RequestType * request)851   bool Read(RequestType* request) override {
852     if (read_done_) {
853       return false;
854     }
855     read_done_ = true;
856     return body_.Read(request);
857   }
858 
859   /// Block to write \a msg to the stream with WriteOptions \a options.
860   /// This is thread-safe with respect to \a ReaderInterface::Read
861   ///
862   /// \param msg The message to be written to the stream.
863   /// \param options The WriteOptions affecting the write operation.
864   ///
865   /// \return \a true on success, \a false when the stream has been closed.
866   using internal::WriterInterface<ResponseType>::Write;
Write(const ResponseType & response,grpc::WriteOptions options)867   bool Write(const ResponseType& response,
868              grpc::WriteOptions options) override {
869     if (write_done_ || !read_done_) {
870       return false;
871     }
872     write_done_ = true;
873     return body_.Write(response, options);
874   }
875 
876  private:
877   internal::ServerReaderWriterBody<ResponseType, RequestType> body_;
878   bool read_done_;
879   bool write_done_;
880 
881   friend class internal::TemplatedBidiStreamingHandler<
882       ServerUnaryStreamer<RequestType, ResponseType>, true>;
ServerUnaryStreamer(grpc::internal::Call * call,grpc::ServerContext * ctx)883   ServerUnaryStreamer(grpc::internal::Call* call, grpc::ServerContext* ctx)
884       : body_(call, ctx), read_done_(false), write_done_(false) {}
885 };
886 
887 /// A class to represent a flow-controlled server-side streaming call.
888 /// This is something of a hybrid between server-side and bidi streaming.
889 /// This is invoked through a server-side streaming call on the client side,
890 /// but the server responds to it as though it were a bidi streaming call that
891 /// must first have exactly 1 Read and then any number of Writes.
892 template <class RequestType, class ResponseType>
893 class ServerSplitStreamer final
894     : public ServerReaderWriterInterface<ResponseType, RequestType> {
895  public:
896   /// Block to send initial metadata to client.
897   /// Implicit input parameter:
898   ///    - the \a ServerContext associated with this call will be used for
899   ///      sending initial metadata.
SendInitialMetadata()900   void SendInitialMetadata() override { body_.SendInitialMetadata(); }
901 
902   /// Get an upper bound on the request message size from the client.
NextMessageSize(uint32_t * sz)903   bool NextMessageSize(uint32_t* sz) override {
904     return body_.NextMessageSize(sz);
905   }
906 
907   /// Read a message of type \a R into \a msg. Completion will be notified by \a
908   /// tag on the associated completion queue.
909   /// This is thread-safe with respect to \a Write or \a WritesDone methods. It
910   /// should not be called concurrently with other streaming APIs
911   /// on the same stream. It is not meaningful to call it concurrently
912   /// with another \a ReaderInterface::Read on the same stream since reads on
913   /// the same stream are delivered in order.
914   ///
915   /// \param[out] msg Where to eventually store the read message.
916   /// \param[in] tag The tag identifying the operation.
Read(RequestType * request)917   bool Read(RequestType* request) override {
918     if (read_done_) {
919       return false;
920     }
921     read_done_ = true;
922     return body_.Read(request);
923   }
924 
925   /// Block to write \a msg to the stream with WriteOptions \a options.
926   /// This is thread-safe with respect to \a ReaderInterface::Read
927   ///
928   /// \param msg The message to be written to the stream.
929   /// \param options The WriteOptions affecting the write operation.
930   ///
931   /// \return \a true on success, \a false when the stream has been closed.
932   using internal::WriterInterface<ResponseType>::Write;
Write(const ResponseType & response,grpc::WriteOptions options)933   bool Write(const ResponseType& response,
934              grpc::WriteOptions options) override {
935     return read_done_ && body_.Write(response, options);
936   }
937 
938  private:
939   internal::ServerReaderWriterBody<ResponseType, RequestType> body_;
940   bool read_done_;
941 
942   friend class internal::TemplatedBidiStreamingHandler<
943       ServerSplitStreamer<RequestType, ResponseType>, false>;
ServerSplitStreamer(grpc::internal::Call * call,grpc::ServerContext * ctx)944   ServerSplitStreamer(grpc::internal::Call* call, grpc::ServerContext* ctx)
945       : body_(call, ctx), read_done_(false) {}
946 };
947 
948 }  // namespace grpc
949 
950 #endif  // GRPCPP_SUPPORT_SYNC_STREAM_H
951