• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 #ifndef GRPCPP_IMPL_CODEGEN_SYNC_STREAM_H
19 #define GRPCPP_IMPL_CODEGEN_SYNC_STREAM_H
20 
21 #include <grpcpp/impl/codegen/call.h>
22 #include <grpcpp/impl/codegen/channel_interface.h>
23 #include <grpcpp/impl/codegen/client_context.h>
24 #include <grpcpp/impl/codegen/completion_queue.h>
25 #include <grpcpp/impl/codegen/core_codegen_interface.h>
26 #include <grpcpp/impl/codegen/server_context.h>
27 #include <grpcpp/impl/codegen/service_type.h>
28 #include <grpcpp/impl/codegen/status.h>
29 
30 namespace grpc {
31 
32 namespace internal {
33 /// Common interface for all synchronous client side streaming.
34 class ClientStreamingInterface {
35  public:
~ClientStreamingInterface()36   virtual ~ClientStreamingInterface() {}
37 
38   /// Block waiting until the stream finishes and a final status of the call is
39   /// available.
40   ///
41   /// It is appropriate to call this method exactly once when both:
42   ///   * the calling code (client-side) has no more message to send
43   ///     (this can be declared implicitly by calling this method, or
44   ///     explicitly through an earlier call to <i>WritesDone</i> method of the
45   ///     class in use, e.g. \a ClientWriterInterface::WritesDone or
46   ///     \a ClientReaderWriterInterface::WritesDone).
47   ///   * there are no more messages to be received from the server (which can
48   ///     be known implicitly, or explicitly from an earlier call to \a
49   ///     ReaderInterface::Read that returned "false").
50   ///
51   /// This function will return either:
52   /// - when all incoming messages have been read and the server has
53   ///   returned status.
54   /// - when the server has returned a non-OK status.
55   /// - OR when the call failed for some reason and the library generated a
56   ///   status.
57   ///
58   /// Return values:
59   ///   - \a Status contains the status code, message and details for the call
60   ///   - the \a ClientContext associated with this call is updated with
61   ///     possible trailing metadata sent from the server.
62   virtual ::grpc::Status Finish() = 0;
63 };
64 
65 /// Common interface for all synchronous server side streaming.
66 class ServerStreamingInterface {
67  public:
~ServerStreamingInterface()68   virtual ~ServerStreamingInterface() {}
69 
70   /// Block to send initial metadata to client.
71   /// This call is optional, but if it is used, it cannot be used concurrently
72   /// with or after the \a Finish method.
73   ///
74   /// The initial metadata that will be sent to the client will be
75   /// taken from the \a ServerContext associated with the call.
76   virtual void SendInitialMetadata() = 0;
77 };
78 
79 /// An interface that yields a sequence of messages of type \a R.
80 template <class R>
81 class ReaderInterface {
82  public:
~ReaderInterface()83   virtual ~ReaderInterface() {}
84 
85   /// Get an upper bound on the next message size available for reading on this
86   /// stream.
87   virtual bool NextMessageSize(uint32_t* sz) = 0;
88 
89   /// Block to read a message and parse to \a msg. Returns \a true on success.
90   /// This is thread-safe with respect to \a Write or \WritesDone methods on
91   /// the same stream. It should not be called concurrently with another \a
92   /// Read on the same stream as the order of delivery will not be defined.
93   ///
94   /// \param[out] msg The read message.
95   ///
96   /// \return \a false when there will be no more incoming messages, either
97   /// because the other side has called \a WritesDone() or the stream has failed
98   /// (or been cancelled).
99   virtual bool Read(R* msg) = 0;
100 };
101 
102 /// An interface that can be fed a sequence of messages of type \a W.
103 template <class W>
104 class WriterInterface {
105  public:
~WriterInterface()106   virtual ~WriterInterface() {}
107 
108   /// Block to write \a msg to the stream with WriteOptions \a options.
109   /// This is thread-safe with respect to \a ReaderInterface::Read
110   ///
111   /// \param msg The message to be written to the stream.
112   /// \param options The WriteOptions affecting the write operation.
113   ///
114   /// \return \a true on success, \a false when the stream has been closed.
115   virtual bool Write(const W& msg, ::grpc::WriteOptions options) = 0;
116 
117   /// Block to write \a msg to the stream with default write options.
118   /// This is thread-safe with respect to \a ReaderInterface::Read
119   ///
120   /// \param msg The message to be written to the stream.
121   ///
122   /// \return \a true on success, \a false when the stream has been closed.
Write(const W & msg)123   inline bool Write(const W& msg) { return Write(msg, ::grpc::WriteOptions()); }
124 
125   /// Write \a msg and coalesce it with the writing of trailing metadata, using
126   /// WriteOptions \a options.
127   ///
128   /// For client, WriteLast is equivalent of performing Write and WritesDone in
129   /// a single step. \a msg and trailing metadata are coalesced and sent on wire
130   /// by calling this function. For server, WriteLast buffers the \a msg.
131   /// The writing of \a msg is held until the service handler returns,
132   /// where \a msg and trailing metadata are coalesced and sent on wire.
133   /// Note that WriteLast can only buffer \a msg up to the flow control window
134   /// size. If \a msg size is larger than the window size, it will be sent on
135   /// wire without buffering.
136   ///
137   /// \param[in] msg The message to be written to the stream.
138   /// \param[in] options The WriteOptions to be used to write this message.
WriteLast(const W & msg,::grpc::WriteOptions options)139   void WriteLast(const W& msg, ::grpc::WriteOptions options) {
140     Write(msg, options.set_last_message());
141   }
142 };
143 
144 }  // namespace internal
145 
146 /// Client-side interface for streaming reads of message of type \a R.
147 template <class R>
148 class ClientReaderInterface : public internal::ClientStreamingInterface,
149                               public internal::ReaderInterface<R> {
150  public:
151   /// Block to wait for initial metadata from server. The received metadata
152   /// can only be accessed after this call returns. Should only be called before
153   /// the first read. Calling this method is optional, and if it is not called
154   /// the metadata will be available in ClientContext after the first read.
155   virtual void WaitForInitialMetadata() = 0;
156 };
157 
158 namespace internal {
159 template <class R>
160 class ClientReaderFactory {
161  public:
162   template <class W>
Create(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc::ClientContext * context,const W & request)163   static ClientReader<R>* Create(::grpc::ChannelInterface* channel,
164                                  const ::grpc::internal::RpcMethod& method,
165                                  ::grpc::ClientContext* context,
166                                  const W& request) {
167     return new ClientReader<R>(channel, method, context, request);
168   }
169 };
170 }  // namespace internal
171 
172 /// Synchronous (blocking) client-side API for doing server-streaming RPCs,
173 /// where the stream of messages coming from the server has messages
174 /// of type \a R.
175 template <class R>
176 class ClientReader final : public ClientReaderInterface<R> {
177  public:
178   /// See the \a ClientStreamingInterface.WaitForInitialMetadata method for
179   /// semantics.
180   ///
181   //  Side effect:
182   ///   Once complete, the initial metadata read from
183   ///   the server will be accessible through the \a ClientContext used to
184   ///   construct this object.
WaitForInitialMetadata()185   void WaitForInitialMetadata() override {
186     GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
187 
188     ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata>
189         ops;
190     ops.RecvInitialMetadata(context_);
191     call_.PerformOps(&ops);
192     cq_.Pluck(&ops);  /// status ignored
193   }
194 
NextMessageSize(uint32_t * sz)195   bool NextMessageSize(uint32_t* sz) override {
196     int result = call_.max_receive_message_size();
197     *sz = (result > 0) ? result : UINT32_MAX;
198     return true;
199   }
200 
201   /// See the \a ReaderInterface.Read method for semantics.
202   /// Side effect:
203   ///   This also receives initial metadata from the server, if not
204   ///   already received (if initial metadata is received, it can be then
205   ///   accessed through the \a ClientContext associated with this call).
Read(R * msg)206   bool Read(R* msg) override {
207     ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata,
208                                 ::grpc::internal::CallOpRecvMessage<R>>
209         ops;
210     if (!context_->initial_metadata_received_) {
211       ops.RecvInitialMetadata(context_);
212     }
213     ops.RecvMessage(msg);
214     call_.PerformOps(&ops);
215     return cq_.Pluck(&ops) && ops.got_message;
216   }
217 
218   /// See the \a ClientStreamingInterface.Finish method for semantics.
219   ///
220   /// Side effect:
221   ///   The \a ClientContext associated with this call is updated with
222   ///   possible metadata received from the server.
Finish()223   ::grpc::Status Finish() override {
224     ::grpc::internal::CallOpSet<::grpc::internal::CallOpClientRecvStatus> ops;
225     ::grpc::Status status;
226     ops.ClientRecvStatus(context_, &status);
227     call_.PerformOps(&ops);
228     GPR_CODEGEN_ASSERT(cq_.Pluck(&ops));
229     return status;
230   }
231 
232  private:
233   friend class internal::ClientReaderFactory<R>;
234   ::grpc::ClientContext* context_;
235   ::grpc::CompletionQueue cq_;
236   ::grpc::internal::Call call_;
237 
238   /// Block to create a stream and write the initial metadata and \a request
239   /// out. Note that \a context will be used to fill in custom initial
240   /// metadata used to send to the server when starting the call.
241   template <class W>
ClientReader(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc::ClientContext * context,const W & request)242   ClientReader(::grpc::ChannelInterface* channel,
243                const ::grpc::internal::RpcMethod& method,
244                ::grpc::ClientContext* context, const W& request)
245       : context_(context),
246         cq_(grpc_completion_queue_attributes{
247             GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING,
248             nullptr}),  // Pluckable cq
249         call_(channel->CreateCall(method, context, &cq_)) {
250     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
251                                 ::grpc::internal::CallOpSendMessage,
252                                 ::grpc::internal::CallOpClientSendClose>
253         ops;
254     ops.SendInitialMetadata(&context->send_initial_metadata_,
255                             context->initial_metadata_flags());
256     // TODO(ctiller): don't assert
257     GPR_CODEGEN_ASSERT(ops.SendMessagePtr(&request).ok());
258     ops.ClientSendClose();
259     call_.PerformOps(&ops);
260     cq_.Pluck(&ops);
261   }
262 };
263 
264 /// Client-side interface for streaming writes of message type \a W.
265 template <class W>
266 class ClientWriterInterface : public internal::ClientStreamingInterface,
267                               public internal::WriterInterface<W> {
268  public:
269   /// Half close writing from the client. (signal that the stream of messages
270   /// coming from the client is complete).
271   /// Blocks until currently-pending writes are completed.
272   /// Thread safe with respect to \a ReaderInterface::Read operations only
273   ///
274   /// \return Whether the writes were successful.
275   virtual bool WritesDone() = 0;
276 };
277 
278 namespace internal {
279 template <class W>
280 class ClientWriterFactory {
281  public:
282   template <class R>
Create(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc::ClientContext * context,R * response)283   static ClientWriter<W>* Create(::grpc::ChannelInterface* channel,
284                                  const ::grpc::internal::RpcMethod& method,
285                                  ::grpc::ClientContext* context, R* response) {
286     return new ClientWriter<W>(channel, method, context, response);
287   }
288 };
289 }  // namespace internal
290 
291 /// Synchronous (blocking) client-side API for doing client-streaming RPCs,
292 /// where the outgoing message stream coming from the client has messages of
293 /// type \a W.
294 template <class W>
295 class ClientWriter : public ClientWriterInterface<W> {
296  public:
297   /// See the \a ClientStreamingInterface.WaitForInitialMetadata method for
298   /// semantics.
299   ///
300   //  Side effect:
301   ///   Once complete, the initial metadata read from the server will be
302   ///   accessible through the \a ClientContext used to construct this object.
WaitForInitialMetadata()303   void WaitForInitialMetadata() {
304     GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
305 
306     ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata>
307         ops;
308     ops.RecvInitialMetadata(context_);
309     call_.PerformOps(&ops);
310     cq_.Pluck(&ops);  // status ignored
311   }
312 
313   /// See the WriterInterface.Write(const W& msg, WriteOptions options) method
314   /// for semantics.
315   ///
316   /// Side effect:
317   ///   Also sends initial metadata if not already sent (using the
318   ///   \a ClientContext associated with this call).
319   using internal::WriterInterface<W>::Write;
Write(const W & msg,::grpc::WriteOptions options)320   bool Write(const W& msg, ::grpc::WriteOptions options) override {
321     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
322                                 ::grpc::internal::CallOpSendMessage,
323                                 ::grpc::internal::CallOpClientSendClose>
324         ops;
325 
326     if (options.is_last_message()) {
327       options.set_buffer_hint();
328       ops.ClientSendClose();
329     }
330     if (context_->initial_metadata_corked_) {
331       ops.SendInitialMetadata(&context_->send_initial_metadata_,
332                               context_->initial_metadata_flags());
333       context_->set_initial_metadata_corked(false);
334     }
335     if (!ops.SendMessagePtr(&msg, options).ok()) {
336       return false;
337     }
338 
339     call_.PerformOps(&ops);
340     return cq_.Pluck(&ops);
341   }
342 
WritesDone()343   bool WritesDone() override {
344     ::grpc::internal::CallOpSet<::grpc::internal::CallOpClientSendClose> ops;
345     ops.ClientSendClose();
346     call_.PerformOps(&ops);
347     return cq_.Pluck(&ops);
348   }
349 
350   /// See the ClientStreamingInterface.Finish method for semantics.
351   /// Side effects:
352   ///   - Also receives initial metadata if not already received.
353   ///   - Attempts to fill in the \a response parameter passed
354   ///     to the constructor of this instance with the response
355   ///     message from the server.
Finish()356   ::grpc::Status Finish() override {
357     ::grpc::Status status;
358     if (!context_->initial_metadata_received_) {
359       finish_ops_.RecvInitialMetadata(context_);
360     }
361     finish_ops_.ClientRecvStatus(context_, &status);
362     call_.PerformOps(&finish_ops_);
363     GPR_CODEGEN_ASSERT(cq_.Pluck(&finish_ops_));
364     return status;
365   }
366 
367  private:
368   friend class internal::ClientWriterFactory<W>;
369 
370   /// Block to create a stream (i.e. send request headers and other initial
371   /// metadata to the server). Note that \a context will be used to fill
372   /// in custom initial metadata. \a response will be filled in with the
373   /// single expected response message from the server upon a successful
374   /// call to the \a Finish method of this instance.
375   template <class R>
ClientWriter(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc::ClientContext * context,R * response)376   ClientWriter(::grpc::ChannelInterface* channel,
377                const ::grpc::internal::RpcMethod& method,
378                ::grpc::ClientContext* context, R* response)
379       : context_(context),
380         cq_(grpc_completion_queue_attributes{
381             GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING,
382             nullptr}),  // Pluckable cq
383         call_(channel->CreateCall(method, context, &cq_)) {
384     finish_ops_.RecvMessage(response);
385     finish_ops_.AllowNoMessage();
386 
387     if (!context_->initial_metadata_corked_) {
388       ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
389           ops;
390       ops.SendInitialMetadata(&context->send_initial_metadata_,
391                               context->initial_metadata_flags());
392       call_.PerformOps(&ops);
393       cq_.Pluck(&ops);
394     }
395   }
396 
397   ::grpc::ClientContext* context_;
398   ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata,
399                               ::grpc::internal::CallOpGenericRecvMessage,
400                               ::grpc::internal::CallOpClientRecvStatus>
401       finish_ops_;
402   ::grpc::CompletionQueue cq_;
403   ::grpc::internal::Call call_;
404 };
405 
406 /// Client-side interface for bi-directional streaming with
407 /// client-to-server stream messages of type \a W and
408 /// server-to-client stream messages of type \a R.
409 template <class W, class R>
410 class ClientReaderWriterInterface : public internal::ClientStreamingInterface,
411                                     public internal::WriterInterface<W>,
412                                     public internal::ReaderInterface<R> {
413  public:
414   /// Block to wait for initial metadata from server. The received metadata
415   /// can only be accessed after this call returns. Should only be called before
416   /// the first read. Calling this method is optional, and if it is not called
417   /// the metadata will be available in ClientContext after the first read.
418   virtual void WaitForInitialMetadata() = 0;
419 
420   /// Half close writing from the client. (signal that the stream of messages
421   /// coming from the client is complete).
422   /// Blocks until currently-pending writes are completed.
423   /// Thread-safe with respect to \a ReaderInterface::Read
424   ///
425   /// \return Whether the writes were successful.
426   virtual bool WritesDone() = 0;
427 };
428 
429 namespace internal {
430 template <class W, class R>
431 class ClientReaderWriterFactory {
432  public:
Create(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc::ClientContext * context)433   static ClientReaderWriter<W, R>* Create(
434       ::grpc::ChannelInterface* channel,
435       const ::grpc::internal::RpcMethod& method,
436       ::grpc::ClientContext* context) {
437     return new ClientReaderWriter<W, R>(channel, method, context);
438   }
439 };
440 }  // namespace internal
441 
442 /// Synchronous (blocking) client-side API for bi-directional streaming RPCs,
443 /// where the outgoing message stream coming from the client has messages of
444 /// type \a W, and the incoming messages stream coming from the server has
445 /// messages of type \a R.
446 template <class W, class R>
447 class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> {
448  public:
449   /// Block waiting to read initial metadata from the server.
450   /// This call is optional, but if it is used, it cannot be used concurrently
451   /// with or after the \a Finish method.
452   ///
453   /// Once complete, the initial metadata read from the server will be
454   /// accessible through the \a ClientContext used to construct this object.
WaitForInitialMetadata()455   void WaitForInitialMetadata() override {
456     GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
457 
458     ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata>
459         ops;
460     ops.RecvInitialMetadata(context_);
461     call_.PerformOps(&ops);
462     cq_.Pluck(&ops);  // status ignored
463   }
464 
NextMessageSize(uint32_t * sz)465   bool NextMessageSize(uint32_t* sz) override {
466     int result = call_.max_receive_message_size();
467     *sz = (result > 0) ? result : UINT32_MAX;
468     return true;
469   }
470 
471   /// See the \a ReaderInterface.Read method for semantics.
472   /// Side effect:
473   ///   Also receives initial metadata if not already received (updates the \a
474   ///   ClientContext associated with this call in that case).
Read(R * msg)475   bool Read(R* msg) override {
476     ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata,
477                                 ::grpc::internal::CallOpRecvMessage<R>>
478         ops;
479     if (!context_->initial_metadata_received_) {
480       ops.RecvInitialMetadata(context_);
481     }
482     ops.RecvMessage(msg);
483     call_.PerformOps(&ops);
484     return cq_.Pluck(&ops) && ops.got_message;
485   }
486 
487   /// See the \a WriterInterface.Write method for semantics.
488   ///
489   /// Side effect:
490   ///   Also sends initial metadata if not already sent (using the
491   ///   \a ClientContext associated with this call to fill in values).
492   using internal::WriterInterface<W>::Write;
Write(const W & msg,::grpc::WriteOptions options)493   bool Write(const W& msg, ::grpc::WriteOptions options) override {
494     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
495                                 ::grpc::internal::CallOpSendMessage,
496                                 ::grpc::internal::CallOpClientSendClose>
497         ops;
498 
499     if (options.is_last_message()) {
500       options.set_buffer_hint();
501       ops.ClientSendClose();
502     }
503     if (context_->initial_metadata_corked_) {
504       ops.SendInitialMetadata(&context_->send_initial_metadata_,
505                               context_->initial_metadata_flags());
506       context_->set_initial_metadata_corked(false);
507     }
508     if (!ops.SendMessagePtr(&msg, options).ok()) {
509       return false;
510     }
511 
512     call_.PerformOps(&ops);
513     return cq_.Pluck(&ops);
514   }
515 
WritesDone()516   bool WritesDone() override {
517     ::grpc::internal::CallOpSet<::grpc::internal::CallOpClientSendClose> ops;
518     ops.ClientSendClose();
519     call_.PerformOps(&ops);
520     return cq_.Pluck(&ops);
521   }
522 
523   /// See the ClientStreamingInterface.Finish method for semantics.
524   ///
525   /// Side effect:
526   ///   - the \a ClientContext associated with this call is updated with
527   ///     possible trailing metadata sent from the server.
Finish()528   ::grpc::Status Finish() override {
529     ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata,
530                                 ::grpc::internal::CallOpClientRecvStatus>
531         ops;
532     if (!context_->initial_metadata_received_) {
533       ops.RecvInitialMetadata(context_);
534     }
535     ::grpc::Status status;
536     ops.ClientRecvStatus(context_, &status);
537     call_.PerformOps(&ops);
538     GPR_CODEGEN_ASSERT(cq_.Pluck(&ops));
539     return status;
540   }
541 
542  private:
543   friend class internal::ClientReaderWriterFactory<W, R>;
544 
545   ::grpc::ClientContext* context_;
546   ::grpc::CompletionQueue cq_;
547   ::grpc::internal::Call call_;
548 
549   /// Block to create a stream and write the initial metadata and \a request
550   /// out. Note that \a context will be used to fill in custom initial metadata
551   /// used to send to the server when starting the call.
ClientReaderWriter(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc::ClientContext * context)552   ClientReaderWriter(::grpc::ChannelInterface* channel,
553                      const ::grpc::internal::RpcMethod& method,
554                      ::grpc::ClientContext* context)
555       : context_(context),
556         cq_(grpc_completion_queue_attributes{
557             GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING,
558             nullptr}),  // Pluckable cq
559         call_(channel->CreateCall(method, context, &cq_)) {
560     if (!context_->initial_metadata_corked_) {
561       ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
562           ops;
563       ops.SendInitialMetadata(&context->send_initial_metadata_,
564                               context->initial_metadata_flags());
565       call_.PerformOps(&ops);
566       cq_.Pluck(&ops);
567     }
568   }
569 };
570 
571 /// Server-side interface for streaming reads of message of type \a R.
572 template <class R>
573 class ServerReaderInterface : public internal::ServerStreamingInterface,
574                               public internal::ReaderInterface<R> {};
575 
576 /// Synchronous (blocking) server-side API for doing client-streaming RPCs,
577 /// where the incoming message stream coming from the client has messages of
578 /// type \a R.
579 template <class R>
580 class ServerReader final : public ServerReaderInterface<R> {
581  public:
582   /// See the \a ServerStreamingInterface.SendInitialMetadata method
583   /// for semantics. Note that initial metadata will be affected by the
584   /// \a ServerContext associated with this call.
SendInitialMetadata()585   void SendInitialMetadata() override {
586     GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
587 
588     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
589         ops;
590     ops.SendInitialMetadata(&ctx_->initial_metadata_,
591                             ctx_->initial_metadata_flags());
592     if (ctx_->compression_level_set()) {
593       ops.set_compression_level(ctx_->compression_level());
594     }
595     ctx_->sent_initial_metadata_ = true;
596     call_->PerformOps(&ops);
597     call_->cq()->Pluck(&ops);
598   }
599 
NextMessageSize(uint32_t * sz)600   bool NextMessageSize(uint32_t* sz) override {
601     int result = call_->max_receive_message_size();
602     *sz = (result > 0) ? result : UINT32_MAX;
603     return true;
604   }
605 
Read(R * msg)606   bool Read(R* msg) override {
607     ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvMessage<R>> ops;
608     ops.RecvMessage(msg);
609     call_->PerformOps(&ops);
610     return call_->cq()->Pluck(&ops) && ops.got_message;
611   }
612 
613  private:
614   ::grpc::internal::Call* const call_;
615   ServerContext* const ctx_;
616 
617   template <class ServiceType, class RequestType, class ResponseType>
618   friend class internal::ClientStreamingHandler;
619 
ServerReader(::grpc::internal::Call * call,::grpc::ServerContext * ctx)620   ServerReader(::grpc::internal::Call* call, ::grpc::ServerContext* ctx)
621       : call_(call), ctx_(ctx) {}
622 };
623 
624 /// Server-side interface for streaming writes of message of type \a W.
625 template <class W>
626 class ServerWriterInterface : public internal::ServerStreamingInterface,
627                               public internal::WriterInterface<W> {};
628 
629 /// Synchronous (blocking) server-side API for doing for doing a
630 /// server-streaming RPCs, where the outgoing message stream coming from the
631 /// server has messages of type \a W.
632 template <class W>
633 class ServerWriter final : public ServerWriterInterface<W> {
634  public:
635   /// See the \a ServerStreamingInterface.SendInitialMetadata method
636   /// for semantics.
637   /// Note that initial metadata will be affected by the
638   /// \a ServerContext associated with this call.
SendInitialMetadata()639   void SendInitialMetadata() override {
640     GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
641 
642     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
643         ops;
644     ops.SendInitialMetadata(&ctx_->initial_metadata_,
645                             ctx_->initial_metadata_flags());
646     if (ctx_->compression_level_set()) {
647       ops.set_compression_level(ctx_->compression_level());
648     }
649     ctx_->sent_initial_metadata_ = true;
650     call_->PerformOps(&ops);
651     call_->cq()->Pluck(&ops);
652   }
653 
654   /// See the \a WriterInterface.Write method for semantics.
655   ///
656   /// Side effect:
657   ///   Also sends initial metadata if not already sent (using the
658   ///   \a ClientContext associated with this call to fill in values).
659   using internal::WriterInterface<W>::Write;
Write(const W & msg,::grpc::WriteOptions options)660   bool Write(const W& msg, ::grpc::WriteOptions options) override {
661     if (options.is_last_message()) {
662       options.set_buffer_hint();
663     }
664 
665     if (!ctx_->pending_ops_.SendMessagePtr(&msg, options).ok()) {
666       return false;
667     }
668     if (!ctx_->sent_initial_metadata_) {
669       ctx_->pending_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
670                                              ctx_->initial_metadata_flags());
671       if (ctx_->compression_level_set()) {
672         ctx_->pending_ops_.set_compression_level(ctx_->compression_level());
673       }
674       ctx_->sent_initial_metadata_ = true;
675     }
676     call_->PerformOps(&ctx_->pending_ops_);
677     // if this is the last message we defer the pluck until AFTER we start
678     // the trailing md op. This prevents hangs. See
679     // https://github.com/grpc/grpc/issues/11546
680     if (options.is_last_message()) {
681       ctx_->has_pending_ops_ = true;
682       return true;
683     }
684     ctx_->has_pending_ops_ = false;
685     return call_->cq()->Pluck(&ctx_->pending_ops_);
686   }
687 
688  private:
689   ::grpc::internal::Call* const call_;
690   ::grpc::ServerContext* const ctx_;
691 
692   template <class ServiceType, class RequestType, class ResponseType>
693   friend class internal::ServerStreamingHandler;
694 
ServerWriter(::grpc::internal::Call * call,::grpc::ServerContext * ctx)695   ServerWriter(::grpc::internal::Call* call, ::grpc::ServerContext* ctx)
696       : call_(call), ctx_(ctx) {}
697 };
698 
699 /// Server-side interface for bi-directional streaming.
700 template <class W, class R>
701 class ServerReaderWriterInterface : public internal::ServerStreamingInterface,
702                                     public internal::WriterInterface<W>,
703                                     public internal::ReaderInterface<R> {};
704 
705 /// Actual implementation of bi-directional streaming
706 namespace internal {
707 template <class W, class R>
708 class ServerReaderWriterBody final {
709  public:
ServerReaderWriterBody(grpc::internal::Call * call,::grpc::ServerContext * ctx)710   ServerReaderWriterBody(grpc::internal::Call* call, ::grpc::ServerContext* ctx)
711       : call_(call), ctx_(ctx) {}
712 
SendInitialMetadata()713   void SendInitialMetadata() {
714     GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
715 
716     grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata> ops;
717     ops.SendInitialMetadata(&ctx_->initial_metadata_,
718                             ctx_->initial_metadata_flags());
719     if (ctx_->compression_level_set()) {
720       ops.set_compression_level(ctx_->compression_level());
721     }
722     ctx_->sent_initial_metadata_ = true;
723     call_->PerformOps(&ops);
724     call_->cq()->Pluck(&ops);
725   }
726 
NextMessageSize(uint32_t * sz)727   bool NextMessageSize(uint32_t* sz) {
728     int result = call_->max_receive_message_size();
729     *sz = (result > 0) ? result : UINT32_MAX;
730     return true;
731   }
732 
Read(R * msg)733   bool Read(R* msg) {
734     ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvMessage<R>> ops;
735     ops.RecvMessage(msg);
736     call_->PerformOps(&ops);
737     return call_->cq()->Pluck(&ops) && ops.got_message;
738   }
739 
Write(const W & msg,::grpc::WriteOptions options)740   bool Write(const W& msg, ::grpc::WriteOptions options) {
741     if (options.is_last_message()) {
742       options.set_buffer_hint();
743     }
744     if (!ctx_->pending_ops_.SendMessagePtr(&msg, options).ok()) {
745       return false;
746     }
747     if (!ctx_->sent_initial_metadata_) {
748       ctx_->pending_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
749                                              ctx_->initial_metadata_flags());
750       if (ctx_->compression_level_set()) {
751         ctx_->pending_ops_.set_compression_level(ctx_->compression_level());
752       }
753       ctx_->sent_initial_metadata_ = true;
754     }
755     call_->PerformOps(&ctx_->pending_ops_);
756     // if this is the last message we defer the pluck until AFTER we start
757     // the trailing md op. This prevents hangs. See
758     // https://github.com/grpc/grpc/issues/11546
759     if (options.is_last_message()) {
760       ctx_->has_pending_ops_ = true;
761       return true;
762     }
763     ctx_->has_pending_ops_ = false;
764     return call_->cq()->Pluck(&ctx_->pending_ops_);
765   }
766 
767  private:
768   grpc::internal::Call* const call_;
769   ::grpc::ServerContext* const ctx_;
770 };
771 
772 }  // namespace internal
773 
774 /// Synchronous (blocking) server-side API for a bidirectional
775 /// streaming call, where the incoming message stream coming from the client has
776 /// messages of type \a R, and the outgoing message streaming coming from
777 /// the server has messages of type \a W.
778 template <class W, class R>
779 class ServerReaderWriter final : public ServerReaderWriterInterface<W, R> {
780  public:
781   /// See the \a ServerStreamingInterface.SendInitialMetadata method
782   /// for semantics. Note that initial metadata will be affected by the
783   /// \a ServerContext associated with this call.
SendInitialMetadata()784   void SendInitialMetadata() override { body_.SendInitialMetadata(); }
785 
NextMessageSize(uint32_t * sz)786   bool NextMessageSize(uint32_t* sz) override {
787     return body_.NextMessageSize(sz);
788   }
789 
Read(R * msg)790   bool Read(R* msg) override { return body_.Read(msg); }
791 
792   /// See the \a WriterInterface.Write(const W& msg, WriteOptions options)
793   /// method for semantics.
794   /// Side effect:
795   ///   Also sends initial metadata if not already sent (using the \a
796   ///   ServerContext associated with this call).
797   using internal::WriterInterface<W>::Write;
Write(const W & msg,::grpc::WriteOptions options)798   bool Write(const W& msg, ::grpc::WriteOptions options) override {
799     return body_.Write(msg, options);
800   }
801 
802  private:
803   internal::ServerReaderWriterBody<W, R> body_;
804 
805   friend class internal::TemplatedBidiStreamingHandler<ServerReaderWriter<W, R>,
806                                                        false>;
ServerReaderWriter(::grpc::internal::Call * call,::grpc::ServerContext * ctx)807   ServerReaderWriter(::grpc::internal::Call* call, ::grpc::ServerContext* ctx)
808       : body_(call, ctx) {}
809 };
810 
811 /// A class to represent a flow-controlled unary call. This is something
812 /// of a hybrid between conventional unary and streaming. This is invoked
813 /// through a unary call on the client side, but the server responds to it
814 /// as though it were a single-ping-pong streaming call. The server can use
815 /// the \a NextMessageSize method to determine an upper-bound on the size of
816 /// the message. A key difference relative to streaming: ServerUnaryStreamer
817 /// must have exactly 1 Read and exactly 1 Write, in that order, to function
818 /// correctly. Otherwise, the RPC is in error.
819 template <class RequestType, class ResponseType>
820 class ServerUnaryStreamer final
821     : public ServerReaderWriterInterface<ResponseType, RequestType> {
822  public:
823   /// Block to send initial metadata to client.
824   /// Implicit input parameter:
825   ///    - the \a ServerContext associated with this call will be used for
826   ///      sending initial metadata.
SendInitialMetadata()827   void SendInitialMetadata() override { body_.SendInitialMetadata(); }
828 
829   /// Get an upper bound on the request message size from the client.
NextMessageSize(uint32_t * sz)830   bool NextMessageSize(uint32_t* sz) override {
831     return body_.NextMessageSize(sz);
832   }
833 
834   /// Read a message of type \a R into \a msg. Completion will be notified by \a
835   /// tag on the associated completion queue.
836   /// This is thread-safe with respect to \a Write or \a WritesDone methods. It
837   /// should not be called concurrently with other streaming APIs
838   /// on the same stream. It is not meaningful to call it concurrently
839   /// with another \a ReaderInterface::Read on the same stream since reads on
840   /// the same stream are delivered in order.
841   ///
842   /// \param[out] msg Where to eventually store the read message.
843   /// \param[in] tag The tag identifying the operation.
Read(RequestType * request)844   bool Read(RequestType* request) override {
845     if (read_done_) {
846       return false;
847     }
848     read_done_ = true;
849     return body_.Read(request);
850   }
851 
852   /// Block to write \a msg to the stream with WriteOptions \a options.
853   /// This is thread-safe with respect to \a ReaderInterface::Read
854   ///
855   /// \param msg The message to be written to the stream.
856   /// \param options The WriteOptions affecting the write operation.
857   ///
858   /// \return \a true on success, \a false when the stream has been closed.
859   using internal::WriterInterface<ResponseType>::Write;
Write(const ResponseType & response,::grpc::WriteOptions options)860   bool Write(const ResponseType& response,
861              ::grpc::WriteOptions options) override {
862     if (write_done_ || !read_done_) {
863       return false;
864     }
865     write_done_ = true;
866     return body_.Write(response, options);
867   }
868 
869  private:
870   internal::ServerReaderWriterBody<ResponseType, RequestType> body_;
871   bool read_done_;
872   bool write_done_;
873 
874   friend class internal::TemplatedBidiStreamingHandler<
875       ServerUnaryStreamer<RequestType, ResponseType>, true>;
ServerUnaryStreamer(::grpc::internal::Call * call,::grpc::ServerContext * ctx)876   ServerUnaryStreamer(::grpc::internal::Call* call, ::grpc::ServerContext* ctx)
877       : body_(call, ctx), read_done_(false), write_done_(false) {}
878 };
879 
880 /// A class to represent a flow-controlled server-side streaming call.
881 /// This is something of a hybrid between server-side and bidi streaming.
882 /// This is invoked through a server-side streaming call on the client side,
883 /// but the server responds to it as though it were a bidi streaming call that
884 /// must first have exactly 1 Read and then any number of Writes.
885 template <class RequestType, class ResponseType>
886 class ServerSplitStreamer final
887     : public ServerReaderWriterInterface<ResponseType, RequestType> {
888  public:
889   /// Block to send initial metadata to client.
890   /// Implicit input parameter:
891   ///    - the \a ServerContext associated with this call will be used for
892   ///      sending initial metadata.
SendInitialMetadata()893   void SendInitialMetadata() override { body_.SendInitialMetadata(); }
894 
895   /// Get an upper bound on the request message size from the client.
NextMessageSize(uint32_t * sz)896   bool NextMessageSize(uint32_t* sz) override {
897     return body_.NextMessageSize(sz);
898   }
899 
900   /// Read a message of type \a R into \a msg. Completion will be notified by \a
901   /// tag on the associated completion queue.
902   /// This is thread-safe with respect to \a Write or \a WritesDone methods. It
903   /// should not be called concurrently with other streaming APIs
904   /// on the same stream. It is not meaningful to call it concurrently
905   /// with another \a ReaderInterface::Read on the same stream since reads on
906   /// the same stream are delivered in order.
907   ///
908   /// \param[out] msg Where to eventually store the read message.
909   /// \param[in] tag The tag identifying the operation.
Read(RequestType * request)910   bool Read(RequestType* request) override {
911     if (read_done_) {
912       return false;
913     }
914     read_done_ = true;
915     return body_.Read(request);
916   }
917 
918   /// Block to write \a msg to the stream with WriteOptions \a options.
919   /// This is thread-safe with respect to \a ReaderInterface::Read
920   ///
921   /// \param msg The message to be written to the stream.
922   /// \param options The WriteOptions affecting the write operation.
923   ///
924   /// \return \a true on success, \a false when the stream has been closed.
925   using internal::WriterInterface<ResponseType>::Write;
Write(const ResponseType & response,::grpc::WriteOptions options)926   bool Write(const ResponseType& response,
927              ::grpc::WriteOptions options) override {
928     return read_done_ && body_.Write(response, options);
929   }
930 
931  private:
932   internal::ServerReaderWriterBody<ResponseType, RequestType> body_;
933   bool read_done_;
934 
935   friend class internal::TemplatedBidiStreamingHandler<
936       ServerSplitStreamer<RequestType, ResponseType>, false>;
ServerSplitStreamer(::grpc::internal::Call * call,::grpc::ServerContext * ctx)937   ServerSplitStreamer(::grpc::internal::Call* call, ::grpc::ServerContext* ctx)
938       : body_(call, ctx), read_done_(false) {}
939 };
940 
941 }  // namespace grpc
942 
943 #endif  // GRPCPP_IMPL_CODEGEN_SYNC_STREAM_H
944