• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 #ifndef GRPCPP_IMPL_CODEGEN_SYNC_STREAM_IMPL_H
19 #define GRPCPP_IMPL_CODEGEN_SYNC_STREAM_IMPL_H
20 
21 #include <grpcpp/impl/codegen/call.h>
22 #include <grpcpp/impl/codegen/channel_interface.h>
23 #include <grpcpp/impl/codegen/client_context_impl.h>
24 #include <grpcpp/impl/codegen/completion_queue_impl.h>
25 #include <grpcpp/impl/codegen/core_codegen_interface.h>
26 #include <grpcpp/impl/codegen/server_context_impl.h>
27 #include <grpcpp/impl/codegen/service_type.h>
28 #include <grpcpp/impl/codegen/status.h>
29 
30 namespace grpc_impl {
31 
32 namespace internal {
33 /// Common interface for all synchronous client side streaming.
34 class ClientStreamingInterface {
35  public:
~ClientStreamingInterface()36   virtual ~ClientStreamingInterface() {}
37 
38   /// Block waiting until the stream finishes and a final status of the call is
39   /// available.
40   ///
41   /// It is appropriate to call this method exactly once when both:
42   ///   * the calling code (client-side) has no more message to send
43   ///     (this can be declared implicitly by calling this method, or
44   ///     explicitly through an earlier call to <i>WritesDone</i> method of the
45   ///     class in use, e.g. \a ClientWriterInterface::WritesDone or
46   ///     \a ClientReaderWriterInterface::WritesDone).
47   ///   * there are no more messages to be received from the server (which can
48   ///     be known implicitly, or explicitly from an earlier call to \a
49   ///     ReaderInterface::Read that returned "false").
50   ///
51   /// This function will return either:
52   /// - when all incoming messages have been read and the server has
53   ///   returned status.
54   /// - when the server has returned a non-OK status.
55   /// - OR when the call failed for some reason and the library generated a
56   ///   status.
57   ///
58   /// Return values:
59   ///   - \a Status contains the status code, message and details for the call
60   ///   - the \a ClientContext associated with this call is updated with
61   ///     possible trailing metadata sent from the server.
62   virtual ::grpc::Status Finish() = 0;
63 };
64 
/// Base interface shared by all synchronous server-side streaming calls.
class ServerStreamingInterface {
 public:
  virtual ~ServerStreamingInterface() = default;

  /// Blocks while sending the initial metadata to the client.
  ///
  /// Calling this is optional. When it is used, it must not run concurrently
  /// with, or after, the \a Finish method.
  ///
  /// The metadata that is sent is taken from the \a ServerContext associated
  /// with the call.
  virtual void SendInitialMetadata() = 0;
};
78 
/// A source that yields a sequence of messages of type \a R.
template <class R>
class ReaderInterface {
 public:
  virtual ~ReaderInterface() = default;

  /// Reports an upper bound on the size of the next message available for
  /// reading on this stream.
  virtual bool NextMessageSize(uint32_t* sz) = 0;

  /// Blocks until a message has been read and parsed into \a msg.
  ///
  /// This is thread-safe with respect to the \a Write and \a WritesDone
  /// methods on the same stream. It should not be called concurrently with
  /// another \a Read on the same stream, because the order of delivery would
  /// then be undefined.
  ///
  /// \param[out] msg The message that was read.
  ///
  /// \return \a true on success; \a false when no more incoming messages will
  /// arrive, either because the other side has called \a WritesDone() or
  /// because the stream has failed (or been cancelled).
  virtual bool Read(R* msg) = 0;
};
101 
102 /// An interface that can be fed a sequence of messages of type \a W.
103 template <class W>
104 class WriterInterface {
105  public:
~WriterInterface()106   virtual ~WriterInterface() {}
107 
108   /// Block to write \a msg to the stream with WriteOptions \a options.
109   /// This is thread-safe with respect to \a ReaderInterface::Read
110   ///
111   /// \param msg The message to be written to the stream.
112   /// \param options The WriteOptions affecting the write operation.
113   ///
114   /// \return \a true on success, \a false when the stream has been closed.
115   virtual bool Write(const W& msg, ::grpc::WriteOptions options) = 0;
116 
117   /// Block to write \a msg to the stream with default write options.
118   /// This is thread-safe with respect to \a ReaderInterface::Read
119   ///
120   /// \param msg The message to be written to the stream.
121   ///
122   /// \return \a true on success, \a false when the stream has been closed.
Write(const W & msg)123   inline bool Write(const W& msg) { return Write(msg, ::grpc::WriteOptions()); }
124 
125   /// Write \a msg and coalesce it with the writing of trailing metadata, using
126   /// WriteOptions \a options.
127   ///
128   /// For client, WriteLast is equivalent of performing Write and WritesDone in
129   /// a single step. \a msg and trailing metadata are coalesced and sent on wire
130   /// by calling this function. For server, WriteLast buffers the \a msg.
131   /// The writing of \a msg is held until the service handler returns,
132   /// where \a msg and trailing metadata are coalesced and sent on wire.
133   /// Note that WriteLast can only buffer \a msg up to the flow control window
134   /// size. If \a msg size is larger than the window size, it will be sent on
135   /// wire without buffering.
136   ///
137   /// \param[in] msg The message to be written to the stream.
138   /// \param[in] options The WriteOptions to be used to write this message.
WriteLast(const W & msg,::grpc::WriteOptions options)139   void WriteLast(const W& msg, ::grpc::WriteOptions options) {
140     Write(msg, options.set_last_message());
141   }
142 };
143 
144 }  // namespace internal
145 
146 /// Client-side interface for streaming reads of message of type \a R.
147 template <class R>
148 class ClientReaderInterface : public internal::ClientStreamingInterface,
149                               public internal::ReaderInterface<R> {
150  public:
151   /// Block to wait for initial metadata from server. The received metadata
152   /// can only be accessed after this call returns. Should only be called before
153   /// the first read. Calling this method is optional, and if it is not called
154   /// the metadata will be available in ClientContext after the first read.
155   virtual void WaitForInitialMetadata() = 0;
156 };
157 
158 namespace internal {
159 template <class R>
160 class ClientReaderFactory {
161  public:
162   template <class W>
Create(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc_impl::ClientContext * context,const W & request)163   static ClientReader<R>* Create(::grpc::ChannelInterface* channel,
164                                  const ::grpc::internal::RpcMethod& method,
165                                  ::grpc_impl::ClientContext* context,
166                                  const W& request) {
167     return new ClientReader<R>(channel, method, context, request);
168   }
169 };
170 }  // namespace internal
171 
172 /// Synchronous (blocking) client-side API for doing server-streaming RPCs,
173 /// where the stream of messages coming from the server has messages
174 /// of type \a R.
175 template <class R>
176 class ClientReader final : public ClientReaderInterface<R> {
177  public:
178   /// See the \a ClientStreamingInterface.WaitForInitialMetadata method for
179   /// semantics.
180   ///
181   //  Side effect:
182   ///   Once complete, the initial metadata read from
183   ///   the server will be accessible through the \a ClientContext used to
184   ///   construct this object.
WaitForInitialMetadata()185   void WaitForInitialMetadata() override {
186     GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
187 
188     ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata>
189         ops;
190     ops.RecvInitialMetadata(context_);
191     call_.PerformOps(&ops);
192     cq_.Pluck(&ops);  /// status ignored
193   }
194 
NextMessageSize(uint32_t * sz)195   bool NextMessageSize(uint32_t* sz) override {
196     int result = call_.max_receive_message_size();
197     *sz = (result > 0) ? result : UINT32_MAX;
198     return true;
199   }
200 
201   /// See the \a ReaderInterface.Read method for semantics.
202   /// Side effect:
203   ///   This also receives initial metadata from the server, if not
204   ///   already received (if initial metadata is received, it can be then
205   ///   accessed through the \a ClientContext associated with this call).
Read(R * msg)206   bool Read(R* msg) override {
207     ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata,
208                                 ::grpc::internal::CallOpRecvMessage<R>>
209         ops;
210     if (!context_->initial_metadata_received_) {
211       ops.RecvInitialMetadata(context_);
212     }
213     ops.RecvMessage(msg);
214     call_.PerformOps(&ops);
215     return cq_.Pluck(&ops) && ops.got_message;
216   }
217 
218   /// See the \a ClientStreamingInterface.Finish method for semantics.
219   ///
220   /// Side effect:
221   ///   The \a ClientContext associated with this call is updated with
222   ///   possible metadata received from the server.
Finish()223   ::grpc::Status Finish() override {
224     ::grpc::internal::CallOpSet<::grpc::internal::CallOpClientRecvStatus> ops;
225     ::grpc::Status status;
226     ops.ClientRecvStatus(context_, &status);
227     call_.PerformOps(&ops);
228     GPR_CODEGEN_ASSERT(cq_.Pluck(&ops));
229     return status;
230   }
231 
232  private:
233   friend class internal::ClientReaderFactory<R>;
234   ::grpc_impl::ClientContext* context_;
235   ::grpc_impl::CompletionQueue cq_;
236   ::grpc::internal::Call call_;
237 
238   /// Block to create a stream and write the initial metadata and \a request
239   /// out. Note that \a context will be used to fill in custom initial
240   /// metadata used to send to the server when starting the call.
241   template <class W>
ClientReader(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc_impl::ClientContext * context,const W & request)242   ClientReader(::grpc::ChannelInterface* channel,
243                const ::grpc::internal::RpcMethod& method,
244                ::grpc_impl::ClientContext* context, const W& request)
245       : context_(context),
246         cq_(grpc_completion_queue_attributes{
247             GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING,
248             nullptr}),  // Pluckable cq
249         call_(channel->CreateCall(method, context, &cq_)) {
250     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
251                                 ::grpc::internal::CallOpSendMessage,
252                                 ::grpc::internal::CallOpClientSendClose>
253         ops;
254     ops.SendInitialMetadata(&context->send_initial_metadata_,
255                             context->initial_metadata_flags());
256     // TODO(ctiller): don't assert
257     GPR_CODEGEN_ASSERT(ops.SendMessagePtr(&request).ok());
258     ops.ClientSendClose();
259     call_.PerformOps(&ops);
260     cq_.Pluck(&ops);
261   }
262 };
263 
264 /// Client-side interface for streaming writes of message type \a W.
265 template <class W>
266 class ClientWriterInterface : public internal::ClientStreamingInterface,
267                               public internal::WriterInterface<W> {
268  public:
269   /// Half close writing from the client. (signal that the stream of messages
270   /// coming from the client is complete).
271   /// Blocks until currently-pending writes are completed.
272   /// Thread safe with respect to \a ReaderInterface::Read operations only
273   ///
274   /// \return Whether the writes were successful.
275   virtual bool WritesDone() = 0;
276 };
277 
278 namespace internal {
279 template <class W>
280 class ClientWriterFactory {
281  public:
282   template <class R>
Create(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc_impl::ClientContext * context,R * response)283   static ClientWriter<W>* Create(::grpc::ChannelInterface* channel,
284                                  const ::grpc::internal::RpcMethod& method,
285                                  ::grpc_impl::ClientContext* context,
286                                  R* response) {
287     return new ClientWriter<W>(channel, method, context, response);
288   }
289 };
290 }  // namespace internal
291 
292 /// Synchronous (blocking) client-side API for doing client-streaming RPCs,
293 /// where the outgoing message stream coming from the client has messages of
294 /// type \a W.
295 template <class W>
296 class ClientWriter : public ClientWriterInterface<W> {
297  public:
298   /// See the \a ClientStreamingInterface.WaitForInitialMetadata method for
299   /// semantics.
300   ///
301   //  Side effect:
302   ///   Once complete, the initial metadata read from the server will be
303   ///   accessible through the \a ClientContext used to construct this object.
WaitForInitialMetadata()304   void WaitForInitialMetadata() {
305     GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
306 
307     ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata>
308         ops;
309     ops.RecvInitialMetadata(context_);
310     call_.PerformOps(&ops);
311     cq_.Pluck(&ops);  // status ignored
312   }
313 
314   /// See the WriterInterface.Write(const W& msg, WriteOptions options) method
315   /// for semantics.
316   ///
317   /// Side effect:
318   ///   Also sends initial metadata if not already sent (using the
319   ///   \a ClientContext associated with this call).
320   using internal::WriterInterface<W>::Write;
Write(const W & msg,::grpc::WriteOptions options)321   bool Write(const W& msg, ::grpc::WriteOptions options) override {
322     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
323                                 ::grpc::internal::CallOpSendMessage,
324                                 ::grpc::internal::CallOpClientSendClose>
325         ops;
326 
327     if (options.is_last_message()) {
328       options.set_buffer_hint();
329       ops.ClientSendClose();
330     }
331     if (context_->initial_metadata_corked_) {
332       ops.SendInitialMetadata(&context_->send_initial_metadata_,
333                               context_->initial_metadata_flags());
334       context_->set_initial_metadata_corked(false);
335     }
336     if (!ops.SendMessagePtr(&msg, options).ok()) {
337       return false;
338     }
339 
340     call_.PerformOps(&ops);
341     return cq_.Pluck(&ops);
342   }
343 
WritesDone()344   bool WritesDone() override {
345     ::grpc::internal::CallOpSet<::grpc::internal::CallOpClientSendClose> ops;
346     ops.ClientSendClose();
347     call_.PerformOps(&ops);
348     return cq_.Pluck(&ops);
349   }
350 
351   /// See the ClientStreamingInterface.Finish method for semantics.
352   /// Side effects:
353   ///   - Also receives initial metadata if not already received.
354   ///   - Attempts to fill in the \a response parameter passed
355   ///     to the constructor of this instance with the response
356   ///     message from the server.
Finish()357   ::grpc::Status Finish() override {
358     ::grpc::Status status;
359     if (!context_->initial_metadata_received_) {
360       finish_ops_.RecvInitialMetadata(context_);
361     }
362     finish_ops_.ClientRecvStatus(context_, &status);
363     call_.PerformOps(&finish_ops_);
364     GPR_CODEGEN_ASSERT(cq_.Pluck(&finish_ops_));
365     return status;
366   }
367 
368  private:
369   friend class internal::ClientWriterFactory<W>;
370 
371   /// Block to create a stream (i.e. send request headers and other initial
372   /// metadata to the server). Note that \a context will be used to fill
373   /// in custom initial metadata. \a response will be filled in with the
374   /// single expected response message from the server upon a successful
375   /// call to the \a Finish method of this instance.
376   template <class R>
ClientWriter(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc_impl::ClientContext * context,R * response)377   ClientWriter(::grpc::ChannelInterface* channel,
378                const ::grpc::internal::RpcMethod& method,
379                ::grpc_impl::ClientContext* context, R* response)
380       : context_(context),
381         cq_(grpc_completion_queue_attributes{
382             GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING,
383             nullptr}),  // Pluckable cq
384         call_(channel->CreateCall(method, context, &cq_)) {
385     finish_ops_.RecvMessage(response);
386     finish_ops_.AllowNoMessage();
387 
388     if (!context_->initial_metadata_corked_) {
389       ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
390           ops;
391       ops.SendInitialMetadata(&context->send_initial_metadata_,
392                               context->initial_metadata_flags());
393       call_.PerformOps(&ops);
394       cq_.Pluck(&ops);
395     }
396   }
397 
398   ::grpc_impl::ClientContext* context_;
399   ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata,
400                               ::grpc::internal::CallOpGenericRecvMessage,
401                               ::grpc::internal::CallOpClientRecvStatus>
402       finish_ops_;
403   ::grpc_impl::CompletionQueue cq_;
404   ::grpc::internal::Call call_;
405 };
406 
407 /// Client-side interface for bi-directional streaming with
408 /// client-to-server stream messages of type \a W and
409 /// server-to-client stream messages of type \a R.
410 template <class W, class R>
411 class ClientReaderWriterInterface : public internal::ClientStreamingInterface,
412                                     public internal::WriterInterface<W>,
413                                     public internal::ReaderInterface<R> {
414  public:
415   /// Block to wait for initial metadata from server. The received metadata
416   /// can only be accessed after this call returns. Should only be called before
417   /// the first read. Calling this method is optional, and if it is not called
418   /// the metadata will be available in ClientContext after the first read.
419   virtual void WaitForInitialMetadata() = 0;
420 
421   /// Half close writing from the client. (signal that the stream of messages
422   /// coming from the client is complete).
423   /// Blocks until currently-pending writes are completed.
424   /// Thread-safe with respect to \a ReaderInterface::Read
425   ///
426   /// \return Whether the writes were successful.
427   virtual bool WritesDone() = 0;
428 };
429 
430 namespace internal {
431 template <class W, class R>
432 class ClientReaderWriterFactory {
433  public:
Create(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc_impl::ClientContext * context)434   static ClientReaderWriter<W, R>* Create(
435       ::grpc::ChannelInterface* channel,
436       const ::grpc::internal::RpcMethod& method,
437       ::grpc_impl::ClientContext* context) {
438     return new ClientReaderWriter<W, R>(channel, method, context);
439   }
440 };
441 }  // namespace internal
442 
443 /// Synchronous (blocking) client-side API for bi-directional streaming RPCs,
444 /// where the outgoing message stream coming from the client has messages of
445 /// type \a W, and the incoming messages stream coming from the server has
446 /// messages of type \a R.
447 template <class W, class R>
448 class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> {
449  public:
450   /// Block waiting to read initial metadata from the server.
451   /// This call is optional, but if it is used, it cannot be used concurrently
452   /// with or after the \a Finish method.
453   ///
454   /// Once complete, the initial metadata read from the server will be
455   /// accessible through the \a ClientContext used to construct this object.
WaitForInitialMetadata()456   void WaitForInitialMetadata() override {
457     GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
458 
459     ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata>
460         ops;
461     ops.RecvInitialMetadata(context_);
462     call_.PerformOps(&ops);
463     cq_.Pluck(&ops);  // status ignored
464   }
465 
NextMessageSize(uint32_t * sz)466   bool NextMessageSize(uint32_t* sz) override {
467     int result = call_.max_receive_message_size();
468     *sz = (result > 0) ? result : UINT32_MAX;
469     return true;
470   }
471 
472   /// See the \a ReaderInterface.Read method for semantics.
473   /// Side effect:
474   ///   Also receives initial metadata if not already received (updates the \a
475   ///   ClientContext associated with this call in that case).
Read(R * msg)476   bool Read(R* msg) override {
477     ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata,
478                                 ::grpc::internal::CallOpRecvMessage<R>>
479         ops;
480     if (!context_->initial_metadata_received_) {
481       ops.RecvInitialMetadata(context_);
482     }
483     ops.RecvMessage(msg);
484     call_.PerformOps(&ops);
485     return cq_.Pluck(&ops) && ops.got_message;
486   }
487 
488   /// See the \a WriterInterface.Write method for semantics.
489   ///
490   /// Side effect:
491   ///   Also sends initial metadata if not already sent (using the
492   ///   \a ClientContext associated with this call to fill in values).
493   using internal::WriterInterface<W>::Write;
Write(const W & msg,::grpc::WriteOptions options)494   bool Write(const W& msg, ::grpc::WriteOptions options) override {
495     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
496                                 ::grpc::internal::CallOpSendMessage,
497                                 ::grpc::internal::CallOpClientSendClose>
498         ops;
499 
500     if (options.is_last_message()) {
501       options.set_buffer_hint();
502       ops.ClientSendClose();
503     }
504     if (context_->initial_metadata_corked_) {
505       ops.SendInitialMetadata(&context_->send_initial_metadata_,
506                               context_->initial_metadata_flags());
507       context_->set_initial_metadata_corked(false);
508     }
509     if (!ops.SendMessagePtr(&msg, options).ok()) {
510       return false;
511     }
512 
513     call_.PerformOps(&ops);
514     return cq_.Pluck(&ops);
515   }
516 
WritesDone()517   bool WritesDone() override {
518     ::grpc::internal::CallOpSet<::grpc::internal::CallOpClientSendClose> ops;
519     ops.ClientSendClose();
520     call_.PerformOps(&ops);
521     return cq_.Pluck(&ops);
522   }
523 
524   /// See the ClientStreamingInterface.Finish method for semantics.
525   ///
526   /// Side effect:
527   ///   - the \a ClientContext associated with this call is updated with
528   ///     possible trailing metadata sent from the server.
Finish()529   ::grpc::Status Finish() override {
530     ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata,
531                                 ::grpc::internal::CallOpClientRecvStatus>
532         ops;
533     if (!context_->initial_metadata_received_) {
534       ops.RecvInitialMetadata(context_);
535     }
536     ::grpc::Status status;
537     ops.ClientRecvStatus(context_, &status);
538     call_.PerformOps(&ops);
539     GPR_CODEGEN_ASSERT(cq_.Pluck(&ops));
540     return status;
541   }
542 
543  private:
544   friend class internal::ClientReaderWriterFactory<W, R>;
545 
546   ::grpc_impl::ClientContext* context_;
547   ::grpc_impl::CompletionQueue cq_;
548   ::grpc::internal::Call call_;
549 
550   /// Block to create a stream and write the initial metadata and \a request
551   /// out. Note that \a context will be used to fill in custom initial metadata
552   /// used to send to the server when starting the call.
ClientReaderWriter(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc_impl::ClientContext * context)553   ClientReaderWriter(::grpc::ChannelInterface* channel,
554                      const ::grpc::internal::RpcMethod& method,
555                      ::grpc_impl::ClientContext* context)
556       : context_(context),
557         cq_(grpc_completion_queue_attributes{
558             GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING,
559             nullptr}),  // Pluckable cq
560         call_(channel->CreateCall(method, context, &cq_)) {
561     if (!context_->initial_metadata_corked_) {
562       ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
563           ops;
564       ops.SendInitialMetadata(&context->send_initial_metadata_,
565                               context->initial_metadata_flags());
566       call_.PerformOps(&ops);
567       cq_.Pluck(&ops);
568     }
569   }
570 };
571 
572 /// Server-side interface for streaming reads of message of type \a R.
573 template <class R>
574 class ServerReaderInterface : public internal::ServerStreamingInterface,
575                               public internal::ReaderInterface<R> {};
576 
577 /// Synchronous (blocking) server-side API for doing client-streaming RPCs,
578 /// where the incoming message stream coming from the client has messages of
579 /// type \a R.
580 template <class R>
581 class ServerReader final : public ServerReaderInterface<R> {
582  public:
583   /// See the \a ServerStreamingInterface.SendInitialMetadata method
584   /// for semantics. Note that initial metadata will be affected by the
585   /// \a ServerContext associated with this call.
SendInitialMetadata()586   void SendInitialMetadata() override {
587     GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
588 
589     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
590         ops;
591     ops.SendInitialMetadata(&ctx_->initial_metadata_,
592                             ctx_->initial_metadata_flags());
593     if (ctx_->compression_level_set()) {
594       ops.set_compression_level(ctx_->compression_level());
595     }
596     ctx_->sent_initial_metadata_ = true;
597     call_->PerformOps(&ops);
598     call_->cq()->Pluck(&ops);
599   }
600 
NextMessageSize(uint32_t * sz)601   bool NextMessageSize(uint32_t* sz) override {
602     int result = call_->max_receive_message_size();
603     *sz = (result > 0) ? result : UINT32_MAX;
604     return true;
605   }
606 
Read(R * msg)607   bool Read(R* msg) override {
608     ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvMessage<R>> ops;
609     ops.RecvMessage(msg);
610     call_->PerformOps(&ops);
611     return call_->cq()->Pluck(&ops) && ops.got_message;
612   }
613 
614  private:
615   ::grpc::internal::Call* const call_;
616   ServerContext* const ctx_;
617 
618   template <class ServiceType, class RequestType, class ResponseType>
619   friend class ::grpc_impl::internal::ClientStreamingHandler;
620 
ServerReader(::grpc::internal::Call * call,::grpc_impl::ServerContext * ctx)621   ServerReader(::grpc::internal::Call* call, ::grpc_impl::ServerContext* ctx)
622       : call_(call), ctx_(ctx) {}
623 };
624 
625 /// Server-side interface for streaming writes of message of type \a W.
626 template <class W>
627 class ServerWriterInterface : public internal::ServerStreamingInterface,
628                               public internal::WriterInterface<W> {};
629 
630 /// Synchronous (blocking) server-side API for doing for doing a
631 /// server-streaming RPCs, where the outgoing message stream coming from the
632 /// server has messages of type \a W.
633 template <class W>
634 class ServerWriter final : public ServerWriterInterface<W> {
635  public:
636   /// See the \a ServerStreamingInterface.SendInitialMetadata method
637   /// for semantics.
638   /// Note that initial metadata will be affected by the
639   /// \a ServerContext associated with this call.
SendInitialMetadata()640   void SendInitialMetadata() override {
641     GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
642 
643     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
644         ops;
645     ops.SendInitialMetadata(&ctx_->initial_metadata_,
646                             ctx_->initial_metadata_flags());
647     if (ctx_->compression_level_set()) {
648       ops.set_compression_level(ctx_->compression_level());
649     }
650     ctx_->sent_initial_metadata_ = true;
651     call_->PerformOps(&ops);
652     call_->cq()->Pluck(&ops);
653   }
654 
655   /// See the \a WriterInterface.Write method for semantics.
656   ///
657   /// Side effect:
658   ///   Also sends initial metadata if not already sent (using the
659   ///   \a ClientContext associated with this call to fill in values).
660   using internal::WriterInterface<W>::Write;
Write(const W & msg,::grpc::WriteOptions options)661   bool Write(const W& msg, ::grpc::WriteOptions options) override {
662     if (options.is_last_message()) {
663       options.set_buffer_hint();
664     }
665 
666     if (!ctx_->pending_ops_.SendMessagePtr(&msg, options).ok()) {
667       return false;
668     }
669     if (!ctx_->sent_initial_metadata_) {
670       ctx_->pending_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
671                                              ctx_->initial_metadata_flags());
672       if (ctx_->compression_level_set()) {
673         ctx_->pending_ops_.set_compression_level(ctx_->compression_level());
674       }
675       ctx_->sent_initial_metadata_ = true;
676     }
677     call_->PerformOps(&ctx_->pending_ops_);
678     // if this is the last message we defer the pluck until AFTER we start
679     // the trailing md op. This prevents hangs. See
680     // https://github.com/grpc/grpc/issues/11546
681     if (options.is_last_message()) {
682       ctx_->has_pending_ops_ = true;
683       return true;
684     }
685     ctx_->has_pending_ops_ = false;
686     return call_->cq()->Pluck(&ctx_->pending_ops_);
687   }
688 
689  private:
690   ::grpc::internal::Call* const call_;
691   ::grpc_impl::ServerContext* const ctx_;
692 
693   template <class ServiceType, class RequestType, class ResponseType>
694   friend class ::grpc_impl::internal::ServerStreamingHandler;
695 
ServerWriter(::grpc::internal::Call * call,::grpc_impl::ServerContext * ctx)696   ServerWriter(::grpc::internal::Call* call, ::grpc_impl::ServerContext* ctx)
697       : call_(call), ctx_(ctx) {}
698 };
699 
700 /// Server-side interface for bi-directional streaming.
701 template <class W, class R>
702 class ServerReaderWriterInterface : public internal::ServerStreamingInterface,
703                                     public internal::WriterInterface<W>,
704                                     public internal::ReaderInterface<R> {};
705 
706 /// Actual implementation of bi-directional streaming
707 namespace internal {
708 template <class W, class R>
709 class ServerReaderWriterBody final {
710  public:
/// Stores the given call and server context pointers; nothing is sent or
/// received here.
ServerReaderWriterBody(grpc::internal::Call* call,
                       ::grpc_impl::ServerContext* ctx)
    : call_(call),
      ctx_(ctx) {}
714 
SendInitialMetadata()715   void SendInitialMetadata() {
716     GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
717 
718     grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata> ops;
719     ops.SendInitialMetadata(&ctx_->initial_metadata_,
720                             ctx_->initial_metadata_flags());
721     if (ctx_->compression_level_set()) {
722       ops.set_compression_level(ctx_->compression_level());
723     }
724     ctx_->sent_initial_metadata_ = true;
725     call_->PerformOps(&ops);
726     call_->cq()->Pluck(&ops);
727   }
728 
NextMessageSize(uint32_t * sz)729   bool NextMessageSize(uint32_t* sz) {
730     int result = call_->max_receive_message_size();
731     *sz = (result > 0) ? result : UINT32_MAX;
732     return true;
733   }
734 
Read(R * msg)735   bool Read(R* msg) {
736     ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvMessage<R>> ops;
737     ops.RecvMessage(msg);
738     call_->PerformOps(&ops);
739     return call_->cq()->Pluck(&ops) && ops.got_message;
740   }
741 
Write(const W & msg,::grpc::WriteOptions options)742   bool Write(const W& msg, ::grpc::WriteOptions options) {
743     if (options.is_last_message()) {
744       options.set_buffer_hint();
745     }
746     if (!ctx_->pending_ops_.SendMessagePtr(&msg, options).ok()) {
747       return false;
748     }
749     if (!ctx_->sent_initial_metadata_) {
750       ctx_->pending_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
751                                              ctx_->initial_metadata_flags());
752       if (ctx_->compression_level_set()) {
753         ctx_->pending_ops_.set_compression_level(ctx_->compression_level());
754       }
755       ctx_->sent_initial_metadata_ = true;
756     }
757     call_->PerformOps(&ctx_->pending_ops_);
758     // if this is the last message we defer the pluck until AFTER we start
759     // the trailing md op. This prevents hangs. See
760     // https://github.com/grpc/grpc/issues/11546
761     if (options.is_last_message()) {
762       ctx_->has_pending_ops_ = true;
763       return true;
764     }
765     ctx_->has_pending_ops_ = false;
766     return call_->cq()->Pluck(&ctx_->pending_ops_);
767   }
768 
769  private:
770   grpc::internal::Call* const call_;
771   ::grpc_impl::ServerContext* const ctx_;
772 };
773 
774 }  // namespace internal
775 
776 /// Synchronous (blocking) server-side API for a bidirectional
777 /// streaming call, where the incoming message stream coming from the client has
778 /// messages of type \a R, and the outgoing message streaming coming from
779 /// the server has messages of type \a W.
780 template <class W, class R>
781 class ServerReaderWriter final : public ServerReaderWriterInterface<W, R> {
782  public:
783   /// See the \a ServerStreamingInterface.SendInitialMetadata method
784   /// for semantics. Note that initial metadata will be affected by the
785   /// \a ServerContext associated with this call.
SendInitialMetadata()786   void SendInitialMetadata() override { body_.SendInitialMetadata(); }
787 
NextMessageSize(uint32_t * sz)788   bool NextMessageSize(uint32_t* sz) override {
789     return body_.NextMessageSize(sz);
790   }
791 
Read(R * msg)792   bool Read(R* msg) override { return body_.Read(msg); }
793 
794   /// See the \a WriterInterface.Write(const W& msg, WriteOptions options)
795   /// method for semantics.
796   /// Side effect:
797   ///   Also sends initial metadata if not already sent (using the \a
798   ///   ServerContext associated with this call).
799   using internal::WriterInterface<W>::Write;
Write(const W & msg,::grpc::WriteOptions options)800   bool Write(const W& msg, ::grpc::WriteOptions options) override {
801     return body_.Write(msg, options);
802   }
803 
804  private:
805   internal::ServerReaderWriterBody<W, R> body_;
806 
807   friend class ::grpc_impl::internal::TemplatedBidiStreamingHandler<
808       ServerReaderWriter<W, R>, false>;
ServerReaderWriter(::grpc::internal::Call * call,::grpc_impl::ServerContext * ctx)809   ServerReaderWriter(::grpc::internal::Call* call,
810                      ::grpc_impl::ServerContext* ctx)
811       : body_(call, ctx) {}
812 };
813 
814 /// A class to represent a flow-controlled unary call. This is something
815 /// of a hybrid between conventional unary and streaming. This is invoked
816 /// through a unary call on the client side, but the server responds to it
817 /// as though it were a single-ping-pong streaming call. The server can use
818 /// the \a NextMessageSize method to determine an upper-bound on the size of
819 /// the message. A key difference relative to streaming: ServerUnaryStreamer
820 /// must have exactly 1 Read and exactly 1 Write, in that order, to function
821 /// correctly. Otherwise, the RPC is in error.
822 template <class RequestType, class ResponseType>
823 class ServerUnaryStreamer final
824     : public ServerReaderWriterInterface<ResponseType, RequestType> {
825  public:
826   /// Block to send initial metadata to client.
827   /// Implicit input parameter:
828   ///    - the \a ServerContext associated with this call will be used for
829   ///      sending initial metadata.
SendInitialMetadata()830   void SendInitialMetadata() override { body_.SendInitialMetadata(); }
831 
832   /// Get an upper bound on the request message size from the client.
NextMessageSize(uint32_t * sz)833   bool NextMessageSize(uint32_t* sz) override {
834     return body_.NextMessageSize(sz);
835   }
836 
837   /// Read a message of type \a R into \a msg. Completion will be notified by \a
838   /// tag on the associated completion queue.
839   /// This is thread-safe with respect to \a Write or \a WritesDone methods. It
840   /// should not be called concurrently with other streaming APIs
841   /// on the same stream. It is not meaningful to call it concurrently
842   /// with another \a ReaderInterface::Read on the same stream since reads on
843   /// the same stream are delivered in order.
844   ///
845   /// \param[out] msg Where to eventually store the read message.
846   /// \param[in] tag The tag identifying the operation.
Read(RequestType * request)847   bool Read(RequestType* request) override {
848     if (read_done_) {
849       return false;
850     }
851     read_done_ = true;
852     return body_.Read(request);
853   }
854 
855   /// Block to write \a msg to the stream with WriteOptions \a options.
856   /// This is thread-safe with respect to \a ReaderInterface::Read
857   ///
858   /// \param msg The message to be written to the stream.
859   /// \param options The WriteOptions affecting the write operation.
860   ///
861   /// \return \a true on success, \a false when the stream has been closed.
862   using internal::WriterInterface<ResponseType>::Write;
Write(const ResponseType & response,::grpc::WriteOptions options)863   bool Write(const ResponseType& response,
864              ::grpc::WriteOptions options) override {
865     if (write_done_ || !read_done_) {
866       return false;
867     }
868     write_done_ = true;
869     return body_.Write(response, options);
870   }
871 
872  private:
873   internal::ServerReaderWriterBody<ResponseType, RequestType> body_;
874   bool read_done_;
875   bool write_done_;
876 
877   friend class ::grpc_impl::internal::TemplatedBidiStreamingHandler<
878       ServerUnaryStreamer<RequestType, ResponseType>, true>;
ServerUnaryStreamer(::grpc::internal::Call * call,::grpc_impl::ServerContext * ctx)879   ServerUnaryStreamer(::grpc::internal::Call* call,
880                       ::grpc_impl::ServerContext* ctx)
881       : body_(call, ctx), read_done_(false), write_done_(false) {}
882 };
883 
884 /// A class to represent a flow-controlled server-side streaming call.
885 /// This is something of a hybrid between server-side and bidi streaming.
886 /// This is invoked through a server-side streaming call on the client side,
887 /// but the server responds to it as though it were a bidi streaming call that
888 /// must first have exactly 1 Read and then any number of Writes.
889 template <class RequestType, class ResponseType>
890 class ServerSplitStreamer final
891     : public ServerReaderWriterInterface<ResponseType, RequestType> {
892  public:
893   /// Block to send initial metadata to client.
894   /// Implicit input parameter:
895   ///    - the \a ServerContext associated with this call will be used for
896   ///      sending initial metadata.
SendInitialMetadata()897   void SendInitialMetadata() override { body_.SendInitialMetadata(); }
898 
899   /// Get an upper bound on the request message size from the client.
NextMessageSize(uint32_t * sz)900   bool NextMessageSize(uint32_t* sz) override {
901     return body_.NextMessageSize(sz);
902   }
903 
904   /// Read a message of type \a R into \a msg. Completion will be notified by \a
905   /// tag on the associated completion queue.
906   /// This is thread-safe with respect to \a Write or \a WritesDone methods. It
907   /// should not be called concurrently with other streaming APIs
908   /// on the same stream. It is not meaningful to call it concurrently
909   /// with another \a ReaderInterface::Read on the same stream since reads on
910   /// the same stream are delivered in order.
911   ///
912   /// \param[out] msg Where to eventually store the read message.
913   /// \param[in] tag The tag identifying the operation.
Read(RequestType * request)914   bool Read(RequestType* request) override {
915     if (read_done_) {
916       return false;
917     }
918     read_done_ = true;
919     return body_.Read(request);
920   }
921 
922   /// Block to write \a msg to the stream with WriteOptions \a options.
923   /// This is thread-safe with respect to \a ReaderInterface::Read
924   ///
925   /// \param msg The message to be written to the stream.
926   /// \param options The WriteOptions affecting the write operation.
927   ///
928   /// \return \a true on success, \a false when the stream has been closed.
929   using internal::WriterInterface<ResponseType>::Write;
Write(const ResponseType & response,::grpc::WriteOptions options)930   bool Write(const ResponseType& response,
931              ::grpc::WriteOptions options) override {
932     return read_done_ && body_.Write(response, options);
933   }
934 
935  private:
936   internal::ServerReaderWriterBody<ResponseType, RequestType> body_;
937   bool read_done_;
938 
939   friend class ::grpc_impl::internal::TemplatedBidiStreamingHandler<
940       ServerSplitStreamer<RequestType, ResponseType>, false>;
ServerSplitStreamer(::grpc::internal::Call * call,::grpc_impl::ServerContext * ctx)941   ServerSplitStreamer(::grpc::internal::Call* call,
942                       ::grpc_impl::ServerContext* ctx)
943       : body_(call, ctx), read_done_(false) {}
944 };
945 
946 }  // namespace grpc_impl
947 
948 #endif  // GRPCPP_IMPL_CODEGEN_SYNC_STREAM_IMPL_H
949