• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #ifndef GRPCPP_SUPPORT_ASYNC_STREAM_H
20 #define GRPCPP_SUPPORT_ASYNC_STREAM_H
21 
22 #include <grpc/grpc.h>
23 #include <grpcpp/impl/call.h>
24 #include <grpcpp/impl/channel_interface.h>
25 #include <grpcpp/impl/service_type.h>
26 #include <grpcpp/server_context.h>
27 #include <grpcpp/support/status.h>
28 
29 #include "absl/log/absl_check.h"
30 
31 namespace grpc {
32 
33 namespace internal {
34 /// Common interface for all client side asynchronous streaming.
35 class ClientAsyncStreamingInterface {
36  public:
~ClientAsyncStreamingInterface()37   virtual ~ClientAsyncStreamingInterface() {}
38 
39   /// Start the call that was set up by the constructor, but only if the
40   /// constructor was invoked through the "Prepare" API which doesn't actually
41   /// start the call.
42   ///
43   /// It is illegal to start a write-type operation (eg. Write(), WriteLast(),
44   /// WritesDone()) while the `StartCall()` operation has not finished
45   /// (determined by the returning of \a tag).
46   virtual void StartCall(void* tag) = 0;
47 
48   /// Request notification of the reading of the initial metadata. Completion
49   /// will be notified by \a tag on the associated completion queue.
50   /// This call is optional, but if it is used, it cannot be used concurrently
51   /// with or after the \a AsyncReaderInterface::Read method.
52   ///
53   /// \param[in] tag Tag identifying this request.
54   virtual void ReadInitialMetadata(void* tag) = 0;
55 
56   /// Indicate that the stream is to be finished and request notification for
57   /// when the call has been ended.
58   /// Should not be used concurrently with other operations.
59   ///
60   /// It is appropriate to call this method exactly once when both:
61   ///   * the client side has no more message to send
62   ///     (this can be declared implicitly by calling this method, or
63   ///     explicitly through an earlier call to the <i>WritesDone</i> method
64   ///     of the class in use, e.g. \a ClientAsyncWriterInterface::WritesDone or
65   ///     \a ClientAsyncReaderWriterInterface::WritesDone).
66   ///   * there are no more messages to be received from the server (this can
67   ///     be known implicitly by the calling code, or explicitly from an
68   ///     earlier call to \a AsyncReaderInterface::Read that yielded a failed
69   ///     result, e.g. cq->Next(&read_tag, &ok) filled in 'ok' with 'false').
70   ///
71   /// The tag will be returned when either:
72   /// - all incoming messages have been read and the server has returned
73   ///   a status.
74   /// - the server has returned a non-OK status.
75   /// - the call failed for some reason and the library generated a
76   ///   status.
77   ///
78   /// Note that implementations of this method attempt to receive initial
79   /// metadata from the server if initial metadata hasn't yet been received.
80   ///
81   /// \param[in] tag Tag identifying this request.
82   /// \param[out] status To be updated with the operation status.
83   virtual void Finish(grpc::Status* status, void* tag) = 0;
84 };
85 
86 /// An interface that yields a sequence of messages of type \a R.
87 template <class R>
88 class AsyncReaderInterface {
89  public:
~AsyncReaderInterface()90   virtual ~AsyncReaderInterface() {}
91 
92   /// Read a message of type \a R into \a msg. Completion will be notified by \a
93   /// tag on the associated completion queue.
94   /// This is thread-safe with respect to \a Write or \a WritesDone methods. It
95   /// should not be called concurrently with other streaming APIs
96   /// on the same stream. It is not meaningful to call it concurrently
97   /// with another \a AsyncReaderInterface::Read on the same stream since reads
98   /// on the same stream are delivered in order.
99   ///
100   /// \param[out] msg Where to eventually store the read message.
101   /// \param[in] tag The tag identifying the operation.
102   ///
103   /// Side effect: note that this method attempt to receive initial metadata for
104   /// a stream if it hasn't yet been received.
105   virtual void Read(R* msg, void* tag) = 0;
106 };
107 
108 /// An interface that can be fed a sequence of messages of type \a W.
109 template <class W>
110 class AsyncWriterInterface {
111  public:
~AsyncWriterInterface()112   virtual ~AsyncWriterInterface() {}
113 
114   /// Request the writing of \a msg with identifying tag \a tag.
115   ///
116   /// Only one write may be outstanding at any given time. This means that
117   /// after calling Write, one must wait to receive \a tag from the completion
118   /// queue BEFORE calling Write again.
119   /// This is thread-safe with respect to \a AsyncReaderInterface::Read
120   ///
121   /// gRPC doesn't take ownership or a reference to \a msg, so it is safe to
122   /// to deallocate once Write returns.
123   ///
124   /// \param[in] msg The message to be written.
125   /// \param[in] tag The tag identifying the operation.
126   virtual void Write(const W& msg, void* tag) = 0;
127 
128   /// Request the writing of \a msg using WriteOptions \a options with
129   /// identifying tag \a tag.
130   ///
131   /// Only one write may be outstanding at any given time. This means that
132   /// after calling Write, one must wait to receive \a tag from the completion
133   /// queue BEFORE calling Write again.
134   /// WriteOptions \a options is used to set the write options of this message.
135   /// This is thread-safe with respect to \a AsyncReaderInterface::Read
136   ///
137   /// gRPC doesn't take ownership or a reference to \a msg, so it is safe to
138   /// to deallocate once Write returns.
139   ///
140   /// \param[in] msg The message to be written.
141   /// \param[in] options The WriteOptions to be used to write this message.
142   /// \param[in] tag The tag identifying the operation.
143   virtual void Write(const W& msg, grpc::WriteOptions options, void* tag) = 0;
144 
145   /// Request the writing of \a msg and coalesce it with the writing
146   /// of trailing metadata, using WriteOptions \a options with
147   /// identifying tag \a tag.
148   ///
149   /// For client, WriteLast is equivalent of performing Write and
150   /// WritesDone in a single step.
151   /// For server, WriteLast buffers the \a msg. The writing of \a msg is held
152   /// until Finish is called, where \a msg and trailing metadata are coalesced
153   /// and write is initiated. Note that WriteLast can only buffer \a msg up to
154   /// the flow control window size. If \a msg size is larger than the window
155   /// size, it will be sent on wire without buffering.
156   ///
157   /// gRPC doesn't take ownership or a reference to \a msg, so it is safe to
158   /// to deallocate once Write returns.
159   ///
160   /// \param[in] msg The message to be written.
161   /// \param[in] options The WriteOptions to be used to write this message.
162   /// \param[in] tag The tag identifying the operation.
WriteLast(const W & msg,grpc::WriteOptions options,void * tag)163   void WriteLast(const W& msg, grpc::WriteOptions options, void* tag) {
164     Write(msg, options.set_last_message(), tag);
165   }
166 };
167 
168 }  // namespace internal
169 
170 template <class R>
171 class ClientAsyncReaderInterface
172     : public internal::ClientAsyncStreamingInterface,
173       public internal::AsyncReaderInterface<R> {};
174 
175 namespace internal {
176 template <class R>
177 class ClientAsyncReaderFactory {
178  public:
179   /// Create a stream object.
180   /// Write the first request out if \a start is set.
181   /// \a tag will be notified on \a cq when the call has been started and
182   /// \a request has been written out. If \a start is not set, \a tag must be
183   /// nullptr and the actual call must be initiated by StartCall
184   /// Note that \a context will be used to fill in custom initial metadata
185   /// used to send to the server when starting the call.
186   template <class W>
Create(grpc::ChannelInterface * channel,grpc::CompletionQueue * cq,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,const W & request,bool start,void * tag)187   static ClientAsyncReader<R>* Create(grpc::ChannelInterface* channel,
188                                       grpc::CompletionQueue* cq,
189                                       const grpc::internal::RpcMethod& method,
190                                       grpc::ClientContext* context,
191                                       const W& request, bool start, void* tag) {
192     grpc::internal::Call call = channel->CreateCall(method, context, cq);
193     return new (
194         grpc_call_arena_alloc(call.call(), sizeof(ClientAsyncReader<R>)))
195         ClientAsyncReader<R>(call, context, request, start, tag);
196   }
197 };
198 }  // namespace internal
199 
200 /// Async client-side API for doing server-streaming RPCs,
201 /// where the incoming message stream coming from the server has
202 /// messages of type \a R.
203 template <class R>
204 class ClientAsyncReader final : public ClientAsyncReaderInterface<R> {
205  public:
206   // always allocated against a call arena, no memory free required
delete(void *,std::size_t size)207   static void operator delete(void* /*ptr*/, std::size_t size) {
208     ABSL_CHECK_EQ(size, sizeof(ClientAsyncReader));
209   }
210 
211   // This operator should never be called as the memory should be freed as part
212   // of the arena destruction. It only exists to provide a matching operator
213   // delete to the operator new so that some compilers will not complain (see
214   // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
215   // there are no tests catching the compiler warning.
delete(void *,void *)216   static void operator delete(void*, void*) { ABSL_CHECK(false); }
217 
StartCall(void * tag)218   void StartCall(void* tag) override {
219     ABSL_CHECK(!started_);
220     started_ = true;
221     StartCallInternal(tag);
222   }
223 
224   /// See the \a ClientAsyncStreamingInterface.ReadInitialMetadata
225   /// method for semantics.
226   ///
227   /// Side effect:
228   ///   - upon receiving initial metadata from the server,
229   ///     the \a ClientContext associated with this call is updated, and the
230   ///     calling code can access the received metadata through the
231   ///     \a ClientContext.
ReadInitialMetadata(void * tag)232   void ReadInitialMetadata(void* tag) override {
233     ABSL_CHECK(started_);
234     ABSL_CHECK(!context_->initial_metadata_received_);
235 
236     meta_ops_.set_output_tag(tag);
237     meta_ops_.RecvInitialMetadata(context_);
238     call_.PerformOps(&meta_ops_);
239   }
240 
Read(R * msg,void * tag)241   void Read(R* msg, void* tag) override {
242     ABSL_CHECK(started_);
243     read_ops_.set_output_tag(tag);
244     if (!context_->initial_metadata_received_) {
245       read_ops_.RecvInitialMetadata(context_);
246     }
247     read_ops_.RecvMessage(msg);
248     call_.PerformOps(&read_ops_);
249   }
250 
251   /// See the \a ClientAsyncStreamingInterface.Finish method for semantics.
252   ///
253   /// Side effect:
254   ///   - the \a ClientContext associated with this call is updated with
255   ///     possible initial and trailing metadata received from the server.
Finish(grpc::Status * status,void * tag)256   void Finish(grpc::Status* status, void* tag) override {
257     ABSL_CHECK(started_);
258     finish_ops_.set_output_tag(tag);
259     if (!context_->initial_metadata_received_) {
260       finish_ops_.RecvInitialMetadata(context_);
261     }
262     finish_ops_.ClientRecvStatus(context_, status);
263     call_.PerformOps(&finish_ops_);
264   }
265 
266  private:
267   friend class internal::ClientAsyncReaderFactory<R>;
268   template <class W>
ClientAsyncReader(grpc::internal::Call call,grpc::ClientContext * context,const W & request,bool start,void * tag)269   ClientAsyncReader(grpc::internal::Call call, grpc::ClientContext* context,
270                     const W& request, bool start, void* tag)
271       : context_(context), call_(call), started_(start) {
272     // TODO(ctiller): don't assert
273     ABSL_CHECK(init_ops_.SendMessage(request).ok());
274     init_ops_.ClientSendClose();
275     if (start) {
276       StartCallInternal(tag);
277     } else {
278       ABSL_CHECK(tag == nullptr);
279     }
280   }
281 
StartCallInternal(void * tag)282   void StartCallInternal(void* tag) {
283     init_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
284                                   context_->initial_metadata_flags());
285     init_ops_.set_output_tag(tag);
286     call_.PerformOps(&init_ops_);
287   }
288 
289   grpc::ClientContext* context_;
290   grpc::internal::Call call_;
291   bool started_;
292   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
293                             grpc::internal::CallOpSendMessage,
294                             grpc::internal::CallOpClientSendClose>
295       init_ops_;
296   grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata>
297       meta_ops_;
298   grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata,
299                             grpc::internal::CallOpRecvMessage<R>>
300       read_ops_;
301   grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata,
302                             grpc::internal::CallOpClientRecvStatus>
303       finish_ops_;
304 };
305 
306 /// Common interface for client side asynchronous writing.
307 template <class W>
308 class ClientAsyncWriterInterface
309     : public internal::ClientAsyncStreamingInterface,
310       public internal::AsyncWriterInterface<W> {
311  public:
312   /// Signal the client is done with the writes (half-close the client stream).
313   /// Thread-safe with respect to \a AsyncReaderInterface::Read
314   ///
315   /// \param[in] tag The tag identifying the operation.
316   virtual void WritesDone(void* tag) = 0;
317 };
318 
319 namespace internal {
320 template <class W>
321 class ClientAsyncWriterFactory {
322  public:
323   /// Create a stream object.
324   /// Start the RPC if \a start is set
325   /// \a tag will be notified on \a cq when the call has been started (i.e.
326   /// initial metadata sent) and \a request has been written out.
327   /// If \a start is not set, \a tag must be nullptr and the actual call
328   /// must be initiated by StartCall
329   /// Note that \a context will be used to fill in custom initial metadata
330   /// used to send to the server when starting the call.
331   /// \a response will be filled in with the single expected response
332   /// message from the server upon a successful call to the \a Finish
333   /// method of this instance.
334   template <class R>
Create(grpc::ChannelInterface * channel,grpc::CompletionQueue * cq,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,R * response,bool start,void * tag)335   static ClientAsyncWriter<W>* Create(grpc::ChannelInterface* channel,
336                                       grpc::CompletionQueue* cq,
337                                       const grpc::internal::RpcMethod& method,
338                                       grpc::ClientContext* context, R* response,
339                                       bool start, void* tag) {
340     grpc::internal::Call call = channel->CreateCall(method, context, cq);
341     return new (
342         grpc_call_arena_alloc(call.call(), sizeof(ClientAsyncWriter<W>)))
343         ClientAsyncWriter<W>(call, context, response, start, tag);
344   }
345 };
346 }  // namespace internal
347 
348 /// Async API on the client side for doing client-streaming RPCs,
349 /// where the outgoing message stream going to the server contains
350 /// messages of type \a W.
351 template <class W>
352 class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> {
353  public:
354   // always allocated against a call arena, no memory free required
delete(void *,std::size_t size)355   static void operator delete(void* /*ptr*/, std::size_t size) {
356     ABSL_CHECK_EQ(size, sizeof(ClientAsyncWriter));
357   }
358 
359   // This operator should never be called as the memory should be freed as part
360   // of the arena destruction. It only exists to provide a matching operator
361   // delete to the operator new so that some compilers will not complain (see
362   // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
363   // there are no tests catching the compiler warning.
delete(void *,void *)364   static void operator delete(void*, void*) { ABSL_CHECK(false); }
365 
StartCall(void * tag)366   void StartCall(void* tag) override {
367     ABSL_CHECK(!started_);
368     started_ = true;
369     StartCallInternal(tag);
370   }
371 
372   /// See the \a ClientAsyncStreamingInterface.ReadInitialMetadata method for
373   /// semantics.
374   ///
375   /// Side effect:
376   ///   - upon receiving initial metadata from the server, the \a ClientContext
377   ///     associated with this call is updated, and the calling code can access
378   ///     the received metadata through the \a ClientContext.
ReadInitialMetadata(void * tag)379   void ReadInitialMetadata(void* tag) override {
380     ABSL_CHECK(started_);
381     ABSL_CHECK(!context_->initial_metadata_received_);
382 
383     meta_ops_.set_output_tag(tag);
384     meta_ops_.RecvInitialMetadata(context_);
385     call_.PerformOps(&meta_ops_);
386   }
387 
Write(const W & msg,void * tag)388   void Write(const W& msg, void* tag) override {
389     ABSL_CHECK(started_);
390     write_ops_.set_output_tag(tag);
391     // TODO(ctiller): don't assert
392     ABSL_CHECK(write_ops_.SendMessage(msg).ok());
393     call_.PerformOps(&write_ops_);
394   }
395 
Write(const W & msg,grpc::WriteOptions options,void * tag)396   void Write(const W& msg, grpc::WriteOptions options, void* tag) override {
397     ABSL_CHECK(started_);
398     write_ops_.set_output_tag(tag);
399     if (options.is_last_message()) {
400       options.set_buffer_hint();
401       write_ops_.ClientSendClose();
402     }
403     // TODO(ctiller): don't assert
404     ABSL_CHECK(write_ops_.SendMessage(msg, options).ok());
405     call_.PerformOps(&write_ops_);
406   }
407 
WritesDone(void * tag)408   void WritesDone(void* tag) override {
409     ABSL_CHECK(started_);
410     write_ops_.set_output_tag(tag);
411     write_ops_.ClientSendClose();
412     call_.PerformOps(&write_ops_);
413   }
414 
415   /// See the \a ClientAsyncStreamingInterface.Finish method for semantics.
416   ///
417   /// Side effect:
418   ///   - the \a ClientContext associated with this call is updated with
419   ///     possible initial and trailing metadata received from the server.
420   ///   - attempts to fill in the \a response parameter passed to this class's
421   ///     constructor with the server's response message.
Finish(grpc::Status * status,void * tag)422   void Finish(grpc::Status* status, void* tag) override {
423     ABSL_CHECK(started_);
424     finish_ops_.set_output_tag(tag);
425     if (!context_->initial_metadata_received_) {
426       finish_ops_.RecvInitialMetadata(context_);
427     }
428     finish_ops_.ClientRecvStatus(context_, status);
429     call_.PerformOps(&finish_ops_);
430   }
431 
432  private:
433   friend class internal::ClientAsyncWriterFactory<W>;
434   template <class R>
ClientAsyncWriter(grpc::internal::Call call,grpc::ClientContext * context,R * response,bool start,void * tag)435   ClientAsyncWriter(grpc::internal::Call call, grpc::ClientContext* context,
436                     R* response, bool start, void* tag)
437       : context_(context), call_(call), started_(start) {
438     finish_ops_.RecvMessage(response);
439     finish_ops_.AllowNoMessage();
440     if (start) {
441       StartCallInternal(tag);
442     } else {
443       ABSL_CHECK(tag == nullptr);
444     }
445   }
446 
StartCallInternal(void * tag)447   void StartCallInternal(void* tag) {
448     write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
449                                    context_->initial_metadata_flags());
450     // if corked bit is set in context, we just keep the initial metadata
451     // buffered up to coalesce with later message send. No op is performed.
452     if (!context_->initial_metadata_corked_) {
453       write_ops_.set_output_tag(tag);
454       call_.PerformOps(&write_ops_);
455     }
456   }
457 
458   grpc::ClientContext* context_;
459   grpc::internal::Call call_;
460   bool started_;
461   grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata>
462       meta_ops_;
463   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
464                             grpc::internal::CallOpSendMessage,
465                             grpc::internal::CallOpClientSendClose>
466       write_ops_;
467   grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata,
468                             grpc::internal::CallOpGenericRecvMessage,
469                             grpc::internal::CallOpClientRecvStatus>
470       finish_ops_;
471 };
472 
473 /// Async client-side interface for bi-directional streaming,
474 /// where the client-to-server message stream has messages of type \a W,
475 /// and the server-to-client message stream has messages of type \a R.
476 template <class W, class R>
477 class ClientAsyncReaderWriterInterface
478     : public internal::ClientAsyncStreamingInterface,
479       public internal::AsyncWriterInterface<W>,
480       public internal::AsyncReaderInterface<R> {
481  public:
482   /// Signal the client is done with the writes (half-close the client stream).
483   /// Thread-safe with respect to \a AsyncReaderInterface::Read
484   ///
485   /// \param[in] tag The tag identifying the operation.
486   virtual void WritesDone(void* tag) = 0;
487 };
488 
489 namespace internal {
490 template <class W, class R>
491 class ClientAsyncReaderWriterFactory {
492  public:
493   /// Create a stream object.
494   /// Start the RPC request if \a start is set.
495   /// \a tag will be notified on \a cq when the call has been started (i.e.
496   /// initial metadata sent). If \a start is not set, \a tag must be
497   /// nullptr and the actual call must be initiated by StartCall
498   /// Note that \a context will be used to fill in custom initial metadata
499   /// used to send to the server when starting the call.
Create(grpc::ChannelInterface * channel,grpc::CompletionQueue * cq,const grpc::internal::RpcMethod & method,grpc::ClientContext * context,bool start,void * tag)500   static ClientAsyncReaderWriter<W, R>* Create(
501       grpc::ChannelInterface* channel, grpc::CompletionQueue* cq,
502       const grpc::internal::RpcMethod& method, grpc::ClientContext* context,
503       bool start, void* tag) {
504     grpc::internal::Call call = channel->CreateCall(method, context, cq);
505 
506     return new (grpc_call_arena_alloc(call.call(),
507                                       sizeof(ClientAsyncReaderWriter<W, R>)))
508         ClientAsyncReaderWriter<W, R>(call, context, start, tag);
509   }
510 };
511 }  // namespace internal
512 
513 /// Async client-side interface for bi-directional streaming,
514 /// where the outgoing message stream going to the server
515 /// has messages of type \a W,  and the incoming message stream coming
516 /// from the server has messages of type \a R.
517 template <class W, class R>
518 class ClientAsyncReaderWriter final
519     : public ClientAsyncReaderWriterInterface<W, R> {
520  public:
521   // always allocated against a call arena, no memory free required
delete(void *,std::size_t size)522   static void operator delete(void* /*ptr*/, std::size_t size) {
523     ABSL_CHECK_EQ(size, sizeof(ClientAsyncReaderWriter));
524   }
525 
526   // This operator should never be called as the memory should be freed as part
527   // of the arena destruction. It only exists to provide a matching operator
528   // delete to the operator new so that some compilers will not complain (see
529   // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
530   // there are no tests catching the compiler warning.
delete(void *,void *)531   static void operator delete(void*, void*) { ABSL_CHECK(false); }
532 
StartCall(void * tag)533   void StartCall(void* tag) override {
534     ABSL_CHECK(!started_);
535     started_ = true;
536     StartCallInternal(tag);
537   }
538 
539   /// See the \a ClientAsyncStreamingInterface.ReadInitialMetadata method
540   /// for semantics of this method.
541   ///
542   /// Side effect:
543   ///   - upon receiving initial metadata from the server, the \a ClientContext
544   ///     is updated with it, and then the receiving initial metadata can
545   ///     be accessed through this \a ClientContext.
ReadInitialMetadata(void * tag)546   void ReadInitialMetadata(void* tag) override {
547     ABSL_CHECK(started_);
548     ABSL_CHECK(!context_->initial_metadata_received_);
549 
550     meta_ops_.set_output_tag(tag);
551     meta_ops_.RecvInitialMetadata(context_);
552     call_.PerformOps(&meta_ops_);
553   }
554 
Read(R * msg,void * tag)555   void Read(R* msg, void* tag) override {
556     ABSL_CHECK(started_);
557     read_ops_.set_output_tag(tag);
558     if (!context_->initial_metadata_received_) {
559       read_ops_.RecvInitialMetadata(context_);
560     }
561     read_ops_.RecvMessage(msg);
562     call_.PerformOps(&read_ops_);
563   }
564 
Write(const W & msg,void * tag)565   void Write(const W& msg, void* tag) override {
566     ABSL_CHECK(started_);
567     write_ops_.set_output_tag(tag);
568     // TODO(ctiller): don't assert
569     ABSL_CHECK(write_ops_.SendMessage(msg).ok());
570     call_.PerformOps(&write_ops_);
571   }
572 
Write(const W & msg,grpc::WriteOptions options,void * tag)573   void Write(const W& msg, grpc::WriteOptions options, void* tag) override {
574     ABSL_CHECK(started_);
575     write_ops_.set_output_tag(tag);
576     if (options.is_last_message()) {
577       options.set_buffer_hint();
578       write_ops_.ClientSendClose();
579     }
580     // TODO(ctiller): don't assert
581     ABSL_CHECK(write_ops_.SendMessage(msg, options).ok());
582     call_.PerformOps(&write_ops_);
583   }
584 
WritesDone(void * tag)585   void WritesDone(void* tag) override {
586     ABSL_CHECK(started_);
587     write_ops_.set_output_tag(tag);
588     write_ops_.ClientSendClose();
589     call_.PerformOps(&write_ops_);
590   }
591 
592   /// See the \a ClientAsyncStreamingInterface.Finish method for semantics.
593   /// Side effect
594   ///   - the \a ClientContext associated with this call is updated with
595   ///     possible initial and trailing metadata sent from the server.
Finish(grpc::Status * status,void * tag)596   void Finish(grpc::Status* status, void* tag) override {
597     ABSL_CHECK(started_);
598     finish_ops_.set_output_tag(tag);
599     if (!context_->initial_metadata_received_) {
600       finish_ops_.RecvInitialMetadata(context_);
601     }
602     finish_ops_.ClientRecvStatus(context_, status);
603     call_.PerformOps(&finish_ops_);
604   }
605 
606  private:
607   friend class internal::ClientAsyncReaderWriterFactory<W, R>;
ClientAsyncReaderWriter(grpc::internal::Call call,grpc::ClientContext * context,bool start,void * tag)608   ClientAsyncReaderWriter(grpc::internal::Call call,
609                           grpc::ClientContext* context, bool start, void* tag)
610       : context_(context), call_(call), started_(start) {
611     if (start) {
612       StartCallInternal(tag);
613     } else {
614       ABSL_CHECK(tag == nullptr);
615     }
616   }
617 
StartCallInternal(void * tag)618   void StartCallInternal(void* tag) {
619     write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
620                                    context_->initial_metadata_flags());
621     // if corked bit is set in context, we just keep the initial metadata
622     // buffered up to coalesce with later message send. No op is performed.
623     if (!context_->initial_metadata_corked_) {
624       write_ops_.set_output_tag(tag);
625       call_.PerformOps(&write_ops_);
626     }
627   }
628 
629   grpc::ClientContext* context_;
630   grpc::internal::Call call_;
631   bool started_;
632   grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata>
633       meta_ops_;
634   grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata,
635                             grpc::internal::CallOpRecvMessage<R>>
636       read_ops_;
637   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
638                             grpc::internal::CallOpSendMessage,
639                             grpc::internal::CallOpClientSendClose>
640       write_ops_;
641   grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata,
642                             grpc::internal::CallOpClientRecvStatus>
643       finish_ops_;
644 };
645 
646 template <class W, class R>
647 class ServerAsyncReaderInterface
648     : public grpc::internal::ServerAsyncStreamingInterface,
649       public internal::AsyncReaderInterface<R> {
650  public:
651   /// Indicate that the stream is to be finished with a certain status code
652   /// and also send out \a msg response to the client.
653   /// Request notification for when the server has sent the response and the
654   /// appropriate signals to the client to end the call.
655   /// Should not be used concurrently with other operations.
656   ///
657   /// It is appropriate to call this method when:
658   ///   * all messages from the client have been received (either known
659   ///     implicitly, or explicitly because a previous
660   ///     \a AsyncReaderInterface::Read operation with a non-ok result,
661   ///     e.g., cq->Next(&read_tag, &ok) filled in 'ok' with 'false').
662   ///
663   /// This operation will end when the server has finished sending out initial
664   /// metadata (if not sent already), response message, and status, or if
665   /// some failure occurred when trying to do so.
666   ///
667   /// gRPC doesn't take ownership or a reference to \a msg or \a status, so it
668   /// is safe to deallocate once Finish returns.
669   ///
670   /// \param[in] tag Tag identifying this request.
671   /// \param[in] status To be sent to the client as the result of this call.
672   /// \param[in] msg To be sent to the client as the response for this call.
673   virtual void Finish(const W& msg, const grpc::Status& status, void* tag) = 0;
674 
675   /// Indicate that the stream is to be finished with a certain
676   /// non-OK status code.
677   /// Request notification for when the server has sent the appropriate
678   /// signals to the client to end the call.
679   /// Should not be used concurrently with other operations.
680   ///
681   /// This call is meant to end the call with some error, and can be called at
682   /// any point that the server would like to "fail" the call (though note
683   /// this shouldn't be called concurrently with any other "sending" call, like
684   /// \a AsyncWriterInterface::Write).
685   ///
686   /// This operation will end when the server has finished sending out initial
687   /// metadata (if not sent already), and status, or if some failure occurred
688   /// when trying to do so.
689   ///
690   /// gRPC doesn't take ownership or a reference to \a status, so it is safe to
691   /// to deallocate once FinishWithError returns.
692   ///
693   /// \param[in] tag Tag identifying this request.
694   /// \param[in] status To be sent to the client as the result of this call.
695   ///     - Note: \a status must have a non-OK code.
696   virtual void FinishWithError(const grpc::Status& status, void* tag) = 0;
697 };
698 
699 /// Async server-side API for doing client-streaming RPCs,
700 /// where the incoming message stream from the client has messages of type \a R,
701 /// and the single response message sent from the server is type \a W.
702 template <class W, class R>
703 class ServerAsyncReader final : public ServerAsyncReaderInterface<W, R> {
704  public:
ServerAsyncReader(grpc::ServerContext * ctx)705   explicit ServerAsyncReader(grpc::ServerContext* ctx)
706       : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
707 
708   /// See \a ServerAsyncStreamingInterface::SendInitialMetadata for semantics.
709   ///
710   /// Implicit input parameter:
711   ///   - The initial metadata that will be sent to the client from this op will
712   ///     be taken from the \a ServerContext associated with the call.
SendInitialMetadata(void * tag)713   void SendInitialMetadata(void* tag) override {
714     ABSL_CHECK(!ctx_->sent_initial_metadata_);
715 
716     meta_ops_.set_output_tag(tag);
717     meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
718                                   ctx_->initial_metadata_flags());
719     if (ctx_->compression_level_set()) {
720       meta_ops_.set_compression_level(ctx_->compression_level());
721     }
722     ctx_->sent_initial_metadata_ = true;
723     call_.PerformOps(&meta_ops_);
724   }
725 
Read(R * msg,void * tag)726   void Read(R* msg, void* tag) override {
727     read_ops_.set_output_tag(tag);
728     read_ops_.RecvMessage(msg);
729     call_.PerformOps(&read_ops_);
730   }
731 
732   /// See the \a ServerAsyncReaderInterface.Read method for semantics
733   ///
734   /// Side effect:
735   ///   - also sends initial metadata if not already sent.
736   ///   - uses the \a ServerContext associated with this call to send possible
737   ///     initial and trailing metadata.
738   ///
739   /// Note: \a msg is not sent if \a status has a non-OK code.
740   ///
741   /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it
742   /// is safe to deallocate once Finish returns.
Finish(const W & msg,const grpc::Status & status,void * tag)743   void Finish(const W& msg, const grpc::Status& status, void* tag) override {
744     finish_ops_.set_output_tag(tag);
745     if (!ctx_->sent_initial_metadata_) {
746       finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
747                                       ctx_->initial_metadata_flags());
748       if (ctx_->compression_level_set()) {
749         finish_ops_.set_compression_level(ctx_->compression_level());
750       }
751       ctx_->sent_initial_metadata_ = true;
752     }
753     // The response is dropped if the status is not OK.
754     if (status.ok()) {
755       finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
756                                    finish_ops_.SendMessage(msg));
757     } else {
758       finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
759     }
760     call_.PerformOps(&finish_ops_);
761   }
762 
763   /// See the \a ServerAsyncReaderInterface.Read method for semantics
764   ///
765   /// Side effect:
766   ///   - also sends initial metadata if not already sent.
767   ///   - uses the \a ServerContext associated with this call to send possible
768   ///     initial and trailing metadata.
769   ///
770   /// gRPC doesn't take ownership or a reference to \a status, so it is safe to
771   /// to deallocate once FinishWithError returns.
FinishWithError(const grpc::Status & status,void * tag)772   void FinishWithError(const grpc::Status& status, void* tag) override {
773     ABSL_CHECK(!status.ok());
774     finish_ops_.set_output_tag(tag);
775     if (!ctx_->sent_initial_metadata_) {
776       finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
777                                       ctx_->initial_metadata_flags());
778       if (ctx_->compression_level_set()) {
779         finish_ops_.set_compression_level(ctx_->compression_level());
780       }
781       ctx_->sent_initial_metadata_ = true;
782     }
783     finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
784     call_.PerformOps(&finish_ops_);
785   }
786 
787  private:
BindCall(grpc::internal::Call * call)788   void BindCall(grpc::internal::Call* call) override { call_ = *call; }
789 
790   grpc::internal::Call call_;
791   grpc::ServerContext* ctx_;
792   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata>
793       meta_ops_;
794   grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<R>> read_ops_;
795   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
796                             grpc::internal::CallOpSendMessage,
797                             grpc::internal::CallOpServerSendStatus>
798       finish_ops_;
799 };
800 
801 template <class W>
802 class ServerAsyncWriterInterface
803     : public grpc::internal::ServerAsyncStreamingInterface,
804       public internal::AsyncWriterInterface<W> {
805  public:
806   /// Indicate that the stream is to be finished with a certain status code.
807   /// Request notification for when the server has sent the appropriate
808   /// signals to the client to end the call.
809   /// Should not be used concurrently with other operations.
810   ///
811   /// It is appropriate to call this method when either:
812   ///   * all messages from the client have been received (either known
813   ///     implicitly, or explicitly because a previous \a
814   ///     AsyncReaderInterface::Read operation with a non-ok
815   ///     result (e.g., cq->Next(&read_tag, &ok) filled in 'ok' with 'false'.
816   ///   * it is desired to end the call early with some non-OK status code.
817   ///
818   /// This operation will end when the server has finished sending out initial
819   /// metadata (if not sent already), response message, and status, or if
820   /// some failure occurred when trying to do so.
821   ///
822   /// gRPC doesn't take ownership or a reference to \a status, so it is safe to
823   /// to deallocate once Finish returns.
824   ///
825   /// \param[in] tag Tag identifying this request.
826   /// \param[in] status To be sent to the client as the result of this call.
827   virtual void Finish(const grpc::Status& status, void* tag) = 0;
828 
829   /// Request the writing of \a msg and coalesce it with trailing metadata which
830   /// contains \a status, using WriteOptions options with
831   /// identifying tag \a tag.
832   ///
833   /// WriteAndFinish is equivalent of performing WriteLast and Finish
834   /// in a single step.
835   ///
836   /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it
837   /// is safe to deallocate once WriteAndFinish returns.
838   ///
839   /// \param[in] msg The message to be written.
840   /// \param[in] options The WriteOptions to be used to write this message.
841   /// \param[in] status The Status that server returns to client.
842   /// \param[in] tag The tag identifying the operation.
843   virtual void WriteAndFinish(const W& msg, grpc::WriteOptions options,
844                               const grpc::Status& status, void* tag) = 0;
845 };
846 
847 /// Async server-side API for doing server streaming RPCs,
848 /// where the outgoing message stream from the server has messages of type \a W.
849 template <class W>
850 class ServerAsyncWriter final : public ServerAsyncWriterInterface<W> {
851  public:
ServerAsyncWriter(grpc::ServerContext * ctx)852   explicit ServerAsyncWriter(grpc::ServerContext* ctx)
853       : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
854 
855   /// See \a ServerAsyncStreamingInterface::SendInitialMetadata for semantics.
856   ///
857   /// Implicit input parameter:
858   ///   - The initial metadata that will be sent to the client from this op will
859   ///     be taken from the \a ServerContext associated with the call.
860   ///
861   /// \param[in] tag Tag identifying this request.
SendInitialMetadata(void * tag)862   void SendInitialMetadata(void* tag) override {
863     ABSL_CHECK(!ctx_->sent_initial_metadata_);
864 
865     meta_ops_.set_output_tag(tag);
866     meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
867                                   ctx_->initial_metadata_flags());
868     if (ctx_->compression_level_set()) {
869       meta_ops_.set_compression_level(ctx_->compression_level());
870     }
871     ctx_->sent_initial_metadata_ = true;
872     call_.PerformOps(&meta_ops_);
873   }
874 
Write(const W & msg,void * tag)875   void Write(const W& msg, void* tag) override {
876     write_ops_.set_output_tag(tag);
877     EnsureInitialMetadataSent(&write_ops_);
878     // TODO(ctiller): don't assert
879     ABSL_CHECK(write_ops_.SendMessage(msg).ok());
880     call_.PerformOps(&write_ops_);
881   }
882 
Write(const W & msg,grpc::WriteOptions options,void * tag)883   void Write(const W& msg, grpc::WriteOptions options, void* tag) override {
884     write_ops_.set_output_tag(tag);
885     if (options.is_last_message()) {
886       options.set_buffer_hint();
887     }
888 
889     EnsureInitialMetadataSent(&write_ops_);
890     // TODO(ctiller): don't assert
891     ABSL_CHECK(write_ops_.SendMessage(msg, options).ok());
892     call_.PerformOps(&write_ops_);
893   }
894 
895   /// See the \a ServerAsyncWriterInterface.WriteAndFinish method for semantics.
896   ///
897   /// Implicit input parameter:
898   ///   - the \a ServerContext associated with this call is used
899   ///     for sending trailing (and initial) metadata to the client.
900   ///
901   /// Note: \a status must have an OK code.
902   ///
903   /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it
904   /// is safe to deallocate once WriteAndFinish returns.
WriteAndFinish(const W & msg,grpc::WriteOptions options,const grpc::Status & status,void * tag)905   void WriteAndFinish(const W& msg, grpc::WriteOptions options,
906                       const grpc::Status& status, void* tag) override {
907     write_ops_.set_output_tag(tag);
908     EnsureInitialMetadataSent(&write_ops_);
909     options.set_buffer_hint();
910     ABSL_CHECK(write_ops_.SendMessage(msg, options).ok());
911     write_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
912     call_.PerformOps(&write_ops_);
913   }
914 
915   /// See the \a ServerAsyncWriterInterface.Finish method for semantics.
916   ///
917   /// Implicit input parameter:
918   ///   - the \a ServerContext associated with this call is used for sending
919   ///     trailing (and initial if not already sent) metadata to the client.
920   ///
921   /// Note: there are no restrictions are the code of
922   /// \a status,it may be non-OK
923   ///
924   /// gRPC doesn't take ownership or a reference to \a status, so it is safe to
925   /// to deallocate once Finish returns.
Finish(const grpc::Status & status,void * tag)926   void Finish(const grpc::Status& status, void* tag) override {
927     finish_ops_.set_output_tag(tag);
928     EnsureInitialMetadataSent(&finish_ops_);
929     finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
930     call_.PerformOps(&finish_ops_);
931   }
932 
933  private:
BindCall(grpc::internal::Call * call)934   void BindCall(grpc::internal::Call* call) override { call_ = *call; }
935 
936   template <class T>
EnsureInitialMetadataSent(T * ops)937   void EnsureInitialMetadataSent(T* ops) {
938     if (!ctx_->sent_initial_metadata_) {
939       ops->SendInitialMetadata(&ctx_->initial_metadata_,
940                                ctx_->initial_metadata_flags());
941       if (ctx_->compression_level_set()) {
942         ops->set_compression_level(ctx_->compression_level());
943       }
944       ctx_->sent_initial_metadata_ = true;
945     }
946   }
947 
948   grpc::internal::Call call_;
949   grpc::ServerContext* ctx_;
950   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata>
951       meta_ops_;
952   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
953                             grpc::internal::CallOpSendMessage,
954                             grpc::internal::CallOpServerSendStatus>
955       write_ops_;
956   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
957                             grpc::internal::CallOpServerSendStatus>
958       finish_ops_;
959 };
960 
961 /// Server-side interface for asynchronous bi-directional streaming.
962 template <class W, class R>
963 class ServerAsyncReaderWriterInterface
964     : public grpc::internal::ServerAsyncStreamingInterface,
965       public internal::AsyncWriterInterface<W>,
966       public internal::AsyncReaderInterface<R> {
967  public:
968   /// Indicate that the stream is to be finished with a certain status code.
969   /// Request notification for when the server has sent the appropriate
970   /// signals to the client to end the call.
971   /// Should not be used concurrently with other operations.
972   ///
973   /// It is appropriate to call this method when either:
974   ///   * all messages from the client have been received (either known
975   ///     implicitly, or explicitly because a previous \a
976   ///     AsyncReaderInterface::Read operation
977   ///     with a non-ok result (e.g., cq->Next(&read_tag, &ok) filled in 'ok'
978   ///     with 'false'.
979   ///   * it is desired to end the call early with some non-OK status code.
980   ///
981   /// This operation will end when the server has finished sending out initial
982   /// metadata (if not sent already), response message, and status, or if some
983   /// failure occurred when trying to do so.
984   ///
985   /// gRPC doesn't take ownership or a reference to \a status, so it is safe to
986   /// to deallocate once Finish returns.
987   ///
988   /// \param[in] tag Tag identifying this request.
989   /// \param[in] status To be sent to the client as the result of this call.
990   virtual void Finish(const grpc::Status& status, void* tag) = 0;
991 
992   /// Request the writing of \a msg and coalesce it with trailing metadata which
993   /// contains \a status, using WriteOptions options with
994   /// identifying tag \a tag.
995   ///
996   /// WriteAndFinish is equivalent of performing WriteLast and Finish in a
997   /// single step.
998   ///
999   /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it
1000   /// is safe to deallocate once WriteAndFinish returns.
1001   ///
1002   /// \param[in] msg The message to be written.
1003   /// \param[in] options The WriteOptions to be used to write this message.
1004   /// \param[in] status The Status that server returns to client.
1005   /// \param[in] tag The tag identifying the operation.
1006   virtual void WriteAndFinish(const W& msg, grpc::WriteOptions options,
1007                               const grpc::Status& status, void* tag) = 0;
1008 };
1009 
1010 /// Async server-side API for doing bidirectional streaming RPCs,
1011 /// where the incoming message stream coming from the client has messages of
1012 /// type \a R, and the outgoing message stream coming from the server has
1013 /// messages of type \a W.
1014 template <class W, class R>
1015 class ServerAsyncReaderWriter final
1016     : public ServerAsyncReaderWriterInterface<W, R> {
1017  public:
ServerAsyncReaderWriter(grpc::ServerContext * ctx)1018   explicit ServerAsyncReaderWriter(grpc::ServerContext* ctx)
1019       : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
1020 
1021   /// See \a ServerAsyncStreamingInterface::SendInitialMetadata for semantics.
1022   ///
1023   /// Implicit input parameter:
1024   ///   - The initial metadata that will be sent to the client from this op will
1025   ///     be taken from the \a ServerContext associated with the call.
1026   ///
1027   /// \param[in] tag Tag identifying this request.
SendInitialMetadata(void * tag)1028   void SendInitialMetadata(void* tag) override {
1029     ABSL_CHECK(!ctx_->sent_initial_metadata_);
1030 
1031     meta_ops_.set_output_tag(tag);
1032     meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
1033                                   ctx_->initial_metadata_flags());
1034     if (ctx_->compression_level_set()) {
1035       meta_ops_.set_compression_level(ctx_->compression_level());
1036     }
1037     ctx_->sent_initial_metadata_ = true;
1038     call_.PerformOps(&meta_ops_);
1039   }
1040 
Read(R * msg,void * tag)1041   void Read(R* msg, void* tag) override {
1042     read_ops_.set_output_tag(tag);
1043     read_ops_.RecvMessage(msg);
1044     call_.PerformOps(&read_ops_);
1045   }
1046 
Write(const W & msg,void * tag)1047   void Write(const W& msg, void* tag) override {
1048     write_ops_.set_output_tag(tag);
1049     EnsureInitialMetadataSent(&write_ops_);
1050     // TODO(ctiller): don't assert
1051     ABSL_CHECK(write_ops_.SendMessage(msg).ok());
1052     call_.PerformOps(&write_ops_);
1053   }
1054 
Write(const W & msg,grpc::WriteOptions options,void * tag)1055   void Write(const W& msg, grpc::WriteOptions options, void* tag) override {
1056     write_ops_.set_output_tag(tag);
1057     if (options.is_last_message()) {
1058       options.set_buffer_hint();
1059     }
1060     EnsureInitialMetadataSent(&write_ops_);
1061     ABSL_CHECK(write_ops_.SendMessage(msg, options).ok());
1062     call_.PerformOps(&write_ops_);
1063   }
1064 
1065   /// See the \a ServerAsyncReaderWriterInterface.WriteAndFinish
1066   /// method for semantics.
1067   ///
1068   /// Implicit input parameter:
1069   ///   - the \a ServerContext associated with this call is used
1070   ///     for sending trailing (and initial) metadata to the client.
1071   ///
1072   /// Note: \a status must have an OK code.
1073   //
1074   /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it
1075   /// is safe to deallocate once WriteAndFinish returns.
WriteAndFinish(const W & msg,grpc::WriteOptions options,const grpc::Status & status,void * tag)1076   void WriteAndFinish(const W& msg, grpc::WriteOptions options,
1077                       const grpc::Status& status, void* tag) override {
1078     write_ops_.set_output_tag(tag);
1079     EnsureInitialMetadataSent(&write_ops_);
1080     options.set_buffer_hint();
1081     ABSL_CHECK(write_ops_.SendMessage(msg, options).ok());
1082     write_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
1083     call_.PerformOps(&write_ops_);
1084   }
1085 
1086   /// See the \a ServerAsyncReaderWriterInterface.Finish method for semantics.
1087   ///
1088   /// Implicit input parameter:
1089   ///   - the \a ServerContext associated with this call is used for sending
1090   ///     trailing (and initial if not already sent) metadata to the client.
1091   ///
1092   /// Note: there are no restrictions are the code of \a status,
1093   /// it may be non-OK
1094   //
1095   /// gRPC doesn't take ownership or a reference to \a status, so it is safe to
1096   /// to deallocate once Finish returns.
Finish(const grpc::Status & status,void * tag)1097   void Finish(const grpc::Status& status, void* tag) override {
1098     finish_ops_.set_output_tag(tag);
1099     EnsureInitialMetadataSent(&finish_ops_);
1100 
1101     finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, status);
1102     call_.PerformOps(&finish_ops_);
1103   }
1104 
1105  private:
1106   friend class grpc::Server;
1107 
BindCall(grpc::internal::Call * call)1108   void BindCall(grpc::internal::Call* call) override { call_ = *call; }
1109 
1110   template <class T>
EnsureInitialMetadataSent(T * ops)1111   void EnsureInitialMetadataSent(T* ops) {
1112     if (!ctx_->sent_initial_metadata_) {
1113       ops->SendInitialMetadata(&ctx_->initial_metadata_,
1114                                ctx_->initial_metadata_flags());
1115       if (ctx_->compression_level_set()) {
1116         ops->set_compression_level(ctx_->compression_level());
1117       }
1118       ctx_->sent_initial_metadata_ = true;
1119     }
1120   }
1121 
1122   grpc::internal::Call call_;
1123   grpc::ServerContext* ctx_;
1124   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata>
1125       meta_ops_;
1126   grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<R>> read_ops_;
1127   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
1128                             grpc::internal::CallOpSendMessage,
1129                             grpc::internal::CallOpServerSendStatus>
1130       write_ops_;
1131   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
1132                             grpc::internal::CallOpServerSendStatus>
1133       finish_ops_;
1134 };
1135 
1136 }  // namespace grpc
1137 
1138 #endif  // GRPCPP_SUPPORT_ASYNC_STREAM_H
1139