• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPCPP_IMPL_CODEGEN_ASYNC_STREAM_H
20 #define GRPCPP_IMPL_CODEGEN_ASYNC_STREAM_H
21 
22 #include <grpcpp/impl/codegen/call.h>
23 #include <grpcpp/impl/codegen/channel_interface.h>
24 #include <grpcpp/impl/codegen/core_codegen_interface.h>
25 #include <grpcpp/impl/codegen/server_context.h>
26 #include <grpcpp/impl/codegen/service_type.h>
27 #include <grpcpp/impl/codegen/status.h>
28 
29 namespace grpc {
30 
31 class CompletionQueue;
32 
33 namespace internal {
34 /// Common interface for all client side asynchronous streaming.
35 class ClientAsyncStreamingInterface {
36  public:
~ClientAsyncStreamingInterface()37   virtual ~ClientAsyncStreamingInterface() {}
38 
39   /// Start the call that was set up by the constructor, but only if the
40   /// constructor was invoked through the "Prepare" API which doesn't actually
41   /// start the call
42   virtual void StartCall(void* tag) = 0;
43 
44   /// Request notification of the reading of the initial metadata. Completion
45   /// will be notified by \a tag on the associated completion queue.
46   /// This call is optional, but if it is used, it cannot be used concurrently
47   /// with or after the \a AsyncReaderInterface::Read method.
48   ///
49   /// \param[in] tag Tag identifying this request.
50   virtual void ReadInitialMetadata(void* tag) = 0;
51 
52   /// Indicate that the stream is to be finished and request notification for
53   /// when the call has been ended.
54   /// Should not be used concurrently with other operations.
55   ///
56   /// It is appropriate to call this method when both:
57   ///   * the client side has no more message to send
58   ///     (this can be declared implicitly by calling this method, or
59   ///     explicitly through an earlier call to the <i>WritesDone</i> method
60   ///     of the class in use, e.g. \a ClientAsyncWriterInterface::WritesDone or
61   ///     \a ClientAsyncReaderWriterInterface::WritesDone).
62   ///   * there are no more messages to be received from the server (this can
63   ///     be known implicitly by the calling code, or explicitly from an
64   ///     earlier call to \a AsyncReaderInterface::Read that yielded a failed
65   ///     result, e.g. cq->Next(&read_tag, &ok) filled in 'ok' with 'false').
66   ///
67   /// The tag will be returned when either:
68   /// - all incoming messages have been read and the server has returned
69   ///   a status.
70   /// - the server has returned a non-OK status.
71   /// - the call failed for some reason and the library generated a
72   ///   status.
73   ///
74   /// Note that implementations of this method attempt to receive initial
75   /// metadata from the server if initial metadata hasn't yet been received.
76   ///
77   /// \param[in] tag Tag identifying this request.
78   /// \param[out] status To be updated with the operation status.
79   virtual void Finish(Status* status, void* tag) = 0;
80 };
81 
82 /// An interface that yields a sequence of messages of type \a R.
83 template <class R>
84 class AsyncReaderInterface {
85  public:
~AsyncReaderInterface()86   virtual ~AsyncReaderInterface() {}
87 
88   /// Read a message of type \a R into \a msg. Completion will be notified by \a
89   /// tag on the associated completion queue.
90   /// This is thread-safe with respect to \a Write or \a WritesDone methods. It
91   /// should not be called concurrently with other streaming APIs
92   /// on the same stream. It is not meaningful to call it concurrently
93   /// with another \a AsyncReaderInterface::Read on the same stream since reads
94   /// on the same stream are delivered in order.
95   ///
96   /// \param[out] msg Where to eventually store the read message.
97   /// \param[in] tag The tag identifying the operation.
98   ///
99   /// Side effect: note that this method attempt to receive initial metadata for
100   /// a stream if it hasn't yet been received.
101   virtual void Read(R* msg, void* tag) = 0;
102 };
103 
104 /// An interface that can be fed a sequence of messages of type \a W.
105 template <class W>
106 class AsyncWriterInterface {
107  public:
~AsyncWriterInterface()108   virtual ~AsyncWriterInterface() {}
109 
110   /// Request the writing of \a msg with identifying tag \a tag.
111   ///
112   /// Only one write may be outstanding at any given time. This means that
113   /// after calling Write, one must wait to receive \a tag from the completion
114   /// queue BEFORE calling Write again.
115   /// This is thread-safe with respect to \a AsyncReaderInterface::Read
116   ///
117   /// gRPC doesn't take ownership or a reference to \a msg, so it is safe to
118   /// to deallocate once Write returns.
119   ///
120   /// \param[in] msg The message to be written.
121   /// \param[in] tag The tag identifying the operation.
122   virtual void Write(const W& msg, void* tag) = 0;
123 
124   /// Request the writing of \a msg using WriteOptions \a options with
125   /// identifying tag \a tag.
126   ///
127   /// Only one write may be outstanding at any given time. This means that
128   /// after calling Write, one must wait to receive \a tag from the completion
129   /// queue BEFORE calling Write again.
130   /// WriteOptions \a options is used to set the write options of this message.
131   /// This is thread-safe with respect to \a AsyncReaderInterface::Read
132   ///
133   /// gRPC doesn't take ownership or a reference to \a msg, so it is safe to
134   /// to deallocate once Write returns.
135   ///
136   /// \param[in] msg The message to be written.
137   /// \param[in] options The WriteOptions to be used to write this message.
138   /// \param[in] tag The tag identifying the operation.
139   virtual void Write(const W& msg, WriteOptions options, void* tag) = 0;
140 
141   /// Request the writing of \a msg and coalesce it with the writing
142   /// of trailing metadata, using WriteOptions \a options with
143   /// identifying tag \a tag.
144   ///
145   /// For client, WriteLast is equivalent of performing Write and
146   /// WritesDone in a single step.
147   /// For server, WriteLast buffers the \a msg. The writing of \a msg is held
148   /// until Finish is called, where \a msg and trailing metadata are coalesced
149   /// and write is initiated. Note that WriteLast can only buffer \a msg up to
150   /// the flow control window size. If \a msg size is larger than the window
151   /// size, it will be sent on wire without buffering.
152   ///
153   /// gRPC doesn't take ownership or a reference to \a msg, so it is safe to
154   /// to deallocate once Write returns.
155   ///
156   /// \param[in] msg The message to be written.
157   /// \param[in] options The WriteOptions to be used to write this message.
158   /// \param[in] tag The tag identifying the operation.
WriteLast(const W & msg,WriteOptions options,void * tag)159   void WriteLast(const W& msg, WriteOptions options, void* tag) {
160     Write(msg, options.set_last_message(), tag);
161   }
162 };
163 
164 }  // namespace internal
165 
166 template <class R>
167 class ClientAsyncReaderInterface
168     : public internal::ClientAsyncStreamingInterface,
169       public internal::AsyncReaderInterface<R> {};
170 
171 namespace internal {
172 template <class R>
173 class ClientAsyncReaderFactory {
174  public:
175   /// Create a stream object.
176   /// Write the first request out if \a start is set.
177   /// \a tag will be notified on \a cq when the call has been started and
178   /// \a request has been written out. If \a start is not set, \a tag must be
179   /// nullptr and the actual call must be initiated by StartCall
180   /// Note that \a context will be used to fill in custom initial metadata
181   /// used to send to the server when starting the call.
182   template <class W>
Create(ChannelInterface * channel,CompletionQueue * cq,const::grpc::internal::RpcMethod & method,ClientContext * context,const W & request,bool start,void * tag)183   static ClientAsyncReader<R>* Create(ChannelInterface* channel,
184                                       CompletionQueue* cq,
185                                       const ::grpc::internal::RpcMethod& method,
186                                       ClientContext* context, const W& request,
187                                       bool start, void* tag) {
188     ::grpc::internal::Call call = channel->CreateCall(method, context, cq);
189     return new (g_core_codegen_interface->grpc_call_arena_alloc(
190         call.call(), sizeof(ClientAsyncReader<R>)))
191         ClientAsyncReader<R>(call, context, request, start, tag);
192   }
193 };
194 }  // namespace internal
195 
196 /// Async client-side API for doing server-streaming RPCs,
197 /// where the incoming message stream coming from the server has
198 /// messages of type \a R.
199 template <class R>
200 class ClientAsyncReader final : public ClientAsyncReaderInterface<R> {
201  public:
202   // always allocated against a call arena, no memory free required
delete(void * ptr,std::size_t size)203   static void operator delete(void* ptr, std::size_t size) {
204     assert(size == sizeof(ClientAsyncReader));
205   }
206 
207   // This operator should never be called as the memory should be freed as part
208   // of the arena destruction. It only exists to provide a matching operator
209   // delete to the operator new so that some compilers will not complain (see
210   // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
211   // there are no tests catching the compiler warning.
delete(void *,void *)212   static void operator delete(void*, void*) { assert(0); }
213 
StartCall(void * tag)214   void StartCall(void* tag) override {
215     assert(!started_);
216     started_ = true;
217     StartCallInternal(tag);
218   }
219 
220   /// See the \a ClientAsyncStreamingInterface.ReadInitialMetadata
221   /// method for semantics.
222   ///
223   /// Side effect:
224   ///   - upon receiving initial metadata from the server,
225   ///     the \a ClientContext associated with this call is updated, and the
226   ///     calling code can access the received metadata through the
227   ///     \a ClientContext.
ReadInitialMetadata(void * tag)228   void ReadInitialMetadata(void* tag) override {
229     assert(started_);
230     GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
231 
232     meta_ops_.set_output_tag(tag);
233     meta_ops_.RecvInitialMetadata(context_);
234     call_.PerformOps(&meta_ops_);
235   }
236 
Read(R * msg,void * tag)237   void Read(R* msg, void* tag) override {
238     assert(started_);
239     read_ops_.set_output_tag(tag);
240     if (!context_->initial_metadata_received_) {
241       read_ops_.RecvInitialMetadata(context_);
242     }
243     read_ops_.RecvMessage(msg);
244     call_.PerformOps(&read_ops_);
245   }
246 
247   /// See the \a ClientAsyncStreamingInterface.Finish method for semantics.
248   ///
249   /// Side effect:
250   ///   - the \a ClientContext associated with this call is updated with
251   ///     possible initial and trailing metadata received from the server.
Finish(Status * status,void * tag)252   void Finish(Status* status, void* tag) override {
253     assert(started_);
254     finish_ops_.set_output_tag(tag);
255     if (!context_->initial_metadata_received_) {
256       finish_ops_.RecvInitialMetadata(context_);
257     }
258     finish_ops_.ClientRecvStatus(context_, status);
259     call_.PerformOps(&finish_ops_);
260   }
261 
262  private:
263   friend class internal::ClientAsyncReaderFactory<R>;
264   template <class W>
ClientAsyncReader(::grpc::internal::Call call,ClientContext * context,const W & request,bool start,void * tag)265   ClientAsyncReader(::grpc::internal::Call call, ClientContext* context,
266                     const W& request, bool start, void* tag)
267       : context_(context), call_(call), started_(start) {
268     // TODO(ctiller): don't assert
269     GPR_CODEGEN_ASSERT(init_ops_.SendMessage(request).ok());
270     init_ops_.ClientSendClose();
271     if (start) {
272       StartCallInternal(tag);
273     } else {
274       assert(tag == nullptr);
275     }
276   }
277 
StartCallInternal(void * tag)278   void StartCallInternal(void* tag) {
279     init_ops_.SendInitialMetadata(context_->send_initial_metadata_,
280                                   context_->initial_metadata_flags());
281     init_ops_.set_output_tag(tag);
282     call_.PerformOps(&init_ops_);
283   }
284 
285   ClientContext* context_;
286   ::grpc::internal::Call call_;
287   bool started_;
288   ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
289                               ::grpc::internal::CallOpSendMessage,
290                               ::grpc::internal::CallOpClientSendClose>
291       init_ops_;
292   ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata>
293       meta_ops_;
294   ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata,
295                               ::grpc::internal::CallOpRecvMessage<R>>
296       read_ops_;
297   ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata,
298                               ::grpc::internal::CallOpClientRecvStatus>
299       finish_ops_;
300 };
301 
302 /// Common interface for client side asynchronous writing.
303 template <class W>
304 class ClientAsyncWriterInterface
305     : public internal::ClientAsyncStreamingInterface,
306       public internal::AsyncWriterInterface<W> {
307  public:
308   /// Signal the client is done with the writes (half-close the client stream).
309   /// Thread-safe with respect to \a AsyncReaderInterface::Read
310   ///
311   /// \param[in] tag The tag identifying the operation.
312   virtual void WritesDone(void* tag) = 0;
313 };
314 
315 namespace internal {
316 template <class W>
317 class ClientAsyncWriterFactory {
318  public:
319   /// Create a stream object.
320   /// Start the RPC if \a start is set
321   /// \a tag will be notified on \a cq when the call has been started (i.e.
322   /// intitial metadata sent) and \a request has been written out.
323   /// If \a start is not set, \a tag must be nullptr and the actual call
324   /// must be initiated by StartCall
325   /// Note that \a context will be used to fill in custom initial metadata
326   /// used to send to the server when starting the call.
327   /// \a response will be filled in with the single expected response
328   /// message from the server upon a successful call to the \a Finish
329   /// method of this instance.
330   template <class R>
Create(ChannelInterface * channel,CompletionQueue * cq,const::grpc::internal::RpcMethod & method,ClientContext * context,R * response,bool start,void * tag)331   static ClientAsyncWriter<W>* Create(ChannelInterface* channel,
332                                       CompletionQueue* cq,
333                                       const ::grpc::internal::RpcMethod& method,
334                                       ClientContext* context, R* response,
335                                       bool start, void* tag) {
336     ::grpc::internal::Call call = channel->CreateCall(method, context, cq);
337     return new (g_core_codegen_interface->grpc_call_arena_alloc(
338         call.call(), sizeof(ClientAsyncWriter<W>)))
339         ClientAsyncWriter<W>(call, context, response, start, tag);
340   }
341 };
342 }  // namespace internal
343 
344 /// Async API on the client side for doing client-streaming RPCs,
345 /// where the outgoing message stream going to the server contains
346 /// messages of type \a W.
347 template <class W>
348 class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> {
349  public:
350   // always allocated against a call arena, no memory free required
delete(void * ptr,std::size_t size)351   static void operator delete(void* ptr, std::size_t size) {
352     assert(size == sizeof(ClientAsyncWriter));
353   }
354 
355   // This operator should never be called as the memory should be freed as part
356   // of the arena destruction. It only exists to provide a matching operator
357   // delete to the operator new so that some compilers will not complain (see
358   // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
359   // there are no tests catching the compiler warning.
delete(void *,void *)360   static void operator delete(void*, void*) { assert(0); }
361 
StartCall(void * tag)362   void StartCall(void* tag) override {
363     assert(!started_);
364     started_ = true;
365     StartCallInternal(tag);
366   }
367 
368   /// See the \a ClientAsyncStreamingInterface.ReadInitialMetadata method for
369   /// semantics.
370   ///
371   /// Side effect:
372   ///   - upon receiving initial metadata from the server, the \a ClientContext
373   ///     associated with this call is updated, and the calling code can access
374   ///     the received metadata through the \a ClientContext.
ReadInitialMetadata(void * tag)375   void ReadInitialMetadata(void* tag) override {
376     assert(started_);
377     GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
378 
379     meta_ops_.set_output_tag(tag);
380     meta_ops_.RecvInitialMetadata(context_);
381     call_.PerformOps(&meta_ops_);
382   }
383 
Write(const W & msg,void * tag)384   void Write(const W& msg, void* tag) override {
385     assert(started_);
386     write_ops_.set_output_tag(tag);
387     // TODO(ctiller): don't assert
388     GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok());
389     call_.PerformOps(&write_ops_);
390   }
391 
Write(const W & msg,WriteOptions options,void * tag)392   void Write(const W& msg, WriteOptions options, void* tag) override {
393     assert(started_);
394     write_ops_.set_output_tag(tag);
395     if (options.is_last_message()) {
396       options.set_buffer_hint();
397       write_ops_.ClientSendClose();
398     }
399     // TODO(ctiller): don't assert
400     GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
401     call_.PerformOps(&write_ops_);
402   }
403 
WritesDone(void * tag)404   void WritesDone(void* tag) override {
405     assert(started_);
406     write_ops_.set_output_tag(tag);
407     write_ops_.ClientSendClose();
408     call_.PerformOps(&write_ops_);
409   }
410 
411   /// See the \a ClientAsyncStreamingInterface.Finish method for semantics.
412   ///
413   /// Side effect:
414   ///   - the \a ClientContext associated with this call is updated with
415   ///     possible initial and trailing metadata received from the server.
416   ///   - attempts to fill in the \a response parameter passed to this class's
417   ///     constructor with the server's response message.
Finish(Status * status,void * tag)418   void Finish(Status* status, void* tag) override {
419     assert(started_);
420     finish_ops_.set_output_tag(tag);
421     if (!context_->initial_metadata_received_) {
422       finish_ops_.RecvInitialMetadata(context_);
423     }
424     finish_ops_.ClientRecvStatus(context_, status);
425     call_.PerformOps(&finish_ops_);
426   }
427 
428  private:
429   friend class internal::ClientAsyncWriterFactory<W>;
430   template <class R>
ClientAsyncWriter(::grpc::internal::Call call,ClientContext * context,R * response,bool start,void * tag)431   ClientAsyncWriter(::grpc::internal::Call call, ClientContext* context,
432                     R* response, bool start, void* tag)
433       : context_(context), call_(call), started_(start) {
434     finish_ops_.RecvMessage(response);
435     finish_ops_.AllowNoMessage();
436     if (start) {
437       StartCallInternal(tag);
438     } else {
439       assert(tag == nullptr);
440     }
441   }
442 
StartCallInternal(void * tag)443   void StartCallInternal(void* tag) {
444     write_ops_.SendInitialMetadata(context_->send_initial_metadata_,
445                                    context_->initial_metadata_flags());
446     // if corked bit is set in context, we just keep the initial metadata
447     // buffered up to coalesce with later message send. No op is performed.
448     if (!context_->initial_metadata_corked_) {
449       write_ops_.set_output_tag(tag);
450       call_.PerformOps(&write_ops_);
451     }
452   }
453 
454   ClientContext* context_;
455   ::grpc::internal::Call call_;
456   bool started_;
457   ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata>
458       meta_ops_;
459   ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
460                               ::grpc::internal::CallOpSendMessage,
461                               ::grpc::internal::CallOpClientSendClose>
462       write_ops_;
463   ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata,
464                               ::grpc::internal::CallOpGenericRecvMessage,
465                               ::grpc::internal::CallOpClientRecvStatus>
466       finish_ops_;
467 };
468 
469 /// Async client-side interface for bi-directional streaming,
470 /// where the client-to-server message stream has messages of type \a W,
471 /// and the server-to-client message stream has messages of type \a R.
472 template <class W, class R>
473 class ClientAsyncReaderWriterInterface
474     : public internal::ClientAsyncStreamingInterface,
475       public internal::AsyncWriterInterface<W>,
476       public internal::AsyncReaderInterface<R> {
477  public:
478   /// Signal the client is done with the writes (half-close the client stream).
479   /// Thread-safe with respect to \a AsyncReaderInterface::Read
480   ///
481   /// \param[in] tag The tag identifying the operation.
482   virtual void WritesDone(void* tag) = 0;
483 };
484 
485 namespace internal {
486 template <class W, class R>
487 class ClientAsyncReaderWriterFactory {
488  public:
489   /// Create a stream object.
490   /// Start the RPC request if \a start is set.
491   /// \a tag will be notified on \a cq when the call has been started (i.e.
492   /// intitial metadata sent). If \a start is not set, \a tag must be
493   /// nullptr and the actual call must be initiated by StartCall
494   /// Note that \a context will be used to fill in custom initial metadata
495   /// used to send to the server when starting the call.
Create(ChannelInterface * channel,CompletionQueue * cq,const::grpc::internal::RpcMethod & method,ClientContext * context,bool start,void * tag)496   static ClientAsyncReaderWriter<W, R>* Create(
497       ChannelInterface* channel, CompletionQueue* cq,
498       const ::grpc::internal::RpcMethod& method, ClientContext* context,
499       bool start, void* tag) {
500     ::grpc::internal::Call call = channel->CreateCall(method, context, cq);
501 
502     return new (g_core_codegen_interface->grpc_call_arena_alloc(
503         call.call(), sizeof(ClientAsyncReaderWriter<W, R>)))
504         ClientAsyncReaderWriter<W, R>(call, context, start, tag);
505   }
506 };
507 }  // namespace internal
508 
509 /// Async client-side interface for bi-directional streaming,
510 /// where the outgoing message stream going to the server
511 /// has messages of type \a W,  and the incoming message stream coming
512 /// from the server has messages of type \a R.
513 template <class W, class R>
514 class ClientAsyncReaderWriter final
515     : public ClientAsyncReaderWriterInterface<W, R> {
516  public:
517   // always allocated against a call arena, no memory free required
delete(void * ptr,std::size_t size)518   static void operator delete(void* ptr, std::size_t size) {
519     assert(size == sizeof(ClientAsyncReaderWriter));
520   }
521 
522   // This operator should never be called as the memory should be freed as part
523   // of the arena destruction. It only exists to provide a matching operator
524   // delete to the operator new so that some compilers will not complain (see
525   // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
526   // there are no tests catching the compiler warning.
delete(void *,void *)527   static void operator delete(void*, void*) { assert(0); }
528 
StartCall(void * tag)529   void StartCall(void* tag) override {
530     assert(!started_);
531     started_ = true;
532     StartCallInternal(tag);
533   }
534 
535   /// See the \a ClientAsyncStreamingInterface.ReadInitialMetadata method
536   /// for semantics of this method.
537   ///
538   /// Side effect:
539   ///   - upon receiving initial metadata from the server, the \a ClientContext
540   ///     is updated with it, and then the receiving initial metadata can
541   ///     be accessed through this \a ClientContext.
ReadInitialMetadata(void * tag)542   void ReadInitialMetadata(void* tag) override {
543     assert(started_);
544     GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
545 
546     meta_ops_.set_output_tag(tag);
547     meta_ops_.RecvInitialMetadata(context_);
548     call_.PerformOps(&meta_ops_);
549   }
550 
Read(R * msg,void * tag)551   void Read(R* msg, void* tag) override {
552     assert(started_);
553     read_ops_.set_output_tag(tag);
554     if (!context_->initial_metadata_received_) {
555       read_ops_.RecvInitialMetadata(context_);
556     }
557     read_ops_.RecvMessage(msg);
558     call_.PerformOps(&read_ops_);
559   }
560 
Write(const W & msg,void * tag)561   void Write(const W& msg, void* tag) override {
562     assert(started_);
563     write_ops_.set_output_tag(tag);
564     // TODO(ctiller): don't assert
565     GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok());
566     call_.PerformOps(&write_ops_);
567   }
568 
Write(const W & msg,WriteOptions options,void * tag)569   void Write(const W& msg, WriteOptions options, void* tag) override {
570     assert(started_);
571     write_ops_.set_output_tag(tag);
572     if (options.is_last_message()) {
573       options.set_buffer_hint();
574       write_ops_.ClientSendClose();
575     }
576     // TODO(ctiller): don't assert
577     GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
578     call_.PerformOps(&write_ops_);
579   }
580 
WritesDone(void * tag)581   void WritesDone(void* tag) override {
582     assert(started_);
583     write_ops_.set_output_tag(tag);
584     write_ops_.ClientSendClose();
585     call_.PerformOps(&write_ops_);
586   }
587 
588   /// See the \a ClientAsyncStreamingInterface.Finish method for semantics.
589   /// Side effect
590   ///   - the \a ClientContext associated with this call is updated with
591   ///     possible initial and trailing metadata sent from the server.
Finish(Status * status,void * tag)592   void Finish(Status* status, void* tag) override {
593     assert(started_);
594     finish_ops_.set_output_tag(tag);
595     if (!context_->initial_metadata_received_) {
596       finish_ops_.RecvInitialMetadata(context_);
597     }
598     finish_ops_.ClientRecvStatus(context_, status);
599     call_.PerformOps(&finish_ops_);
600   }
601 
602  private:
603   friend class internal::ClientAsyncReaderWriterFactory<W, R>;
ClientAsyncReaderWriter(::grpc::internal::Call call,ClientContext * context,bool start,void * tag)604   ClientAsyncReaderWriter(::grpc::internal::Call call, ClientContext* context,
605                           bool start, void* tag)
606       : context_(context), call_(call), started_(start) {
607     if (start) {
608       StartCallInternal(tag);
609     } else {
610       assert(tag == nullptr);
611     }
612   }
613 
StartCallInternal(void * tag)614   void StartCallInternal(void* tag) {
615     write_ops_.SendInitialMetadata(context_->send_initial_metadata_,
616                                    context_->initial_metadata_flags());
617     // if corked bit is set in context, we just keep the initial metadata
618     // buffered up to coalesce with later message send. No op is performed.
619     if (!context_->initial_metadata_corked_) {
620       write_ops_.set_output_tag(tag);
621       call_.PerformOps(&write_ops_);
622     }
623   }
624 
625   ClientContext* context_;
626   ::grpc::internal::Call call_;
627   bool started_;
628   ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata>
629       meta_ops_;
630   ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata,
631                               ::grpc::internal::CallOpRecvMessage<R>>
632       read_ops_;
633   ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
634                               ::grpc::internal::CallOpSendMessage,
635                               ::grpc::internal::CallOpClientSendClose>
636       write_ops_;
637   ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata,
638                               ::grpc::internal::CallOpClientRecvStatus>
639       finish_ops_;
640 };
641 
642 template <class W, class R>
643 class ServerAsyncReaderInterface
644     : public internal::ServerAsyncStreamingInterface,
645       public internal::AsyncReaderInterface<R> {
646  public:
647   /// Indicate that the stream is to be finished with a certain status code
648   /// and also send out \a msg response to the client.
649   /// Request notification for when the server has sent the response and the
650   /// appropriate signals to the client to end the call.
651   /// Should not be used concurrently with other operations.
652   ///
653   /// It is appropriate to call this method when:
654   ///   * all messages from the client have been received (either known
655   ///     implictly, or explicitly because a previous
656   ///     \a AsyncReaderInterface::Read operation with a non-ok result,
657   ///     e.g., cq->Next(&read_tag, &ok) filled in 'ok' with 'false').
658   ///
659   /// This operation will end when the server has finished sending out initial
660   /// metadata (if not sent already), response message, and status, or if
661   /// some failure occurred when trying to do so.
662   ///
663   /// gRPC doesn't take ownership or a reference to \a msg or \a status, so it
664   /// is safe to to deallocate once Finish returns.
665   ///
666   /// \param[in] tag Tag identifying this request.
667   /// \param[in] status To be sent to the client as the result of this call.
668   /// \param[in] msg To be sent to the client as the response for this call.
669   virtual void Finish(const W& msg, const Status& status, void* tag) = 0;
670 
671   /// Indicate that the stream is to be finished with a certain
672   /// non-OK status code.
673   /// Request notification for when the server has sent the appropriate
674   /// signals to the client to end the call.
675   /// Should not be used concurrently with other operations.
676   ///
677   /// This call is meant to end the call with some error, and can be called at
678   /// any point that the server would like to "fail" the call (though note
679   /// this shouldn't be called concurrently with any other "sending" call, like
680   /// \a AsyncWriterInterface::Write).
681   ///
682   /// This operation will end when the server has finished sending out initial
683   /// metadata (if not sent already), and status, or if some failure occurred
684   /// when trying to do so.
685   ///
686   /// gRPC doesn't take ownership or a reference to \a status, so it is safe to
687   /// to deallocate once FinishWithError returns.
688   ///
689   /// \param[in] tag Tag identifying this request.
690   /// \param[in] status To be sent to the client as the result of this call.
691   ///     - Note: \a status must have a non-OK code.
692   virtual void FinishWithError(const Status& status, void* tag) = 0;
693 };
694 
695 /// Async server-side API for doing client-streaming RPCs,
696 /// where the incoming message stream from the client has messages of type \a R,
697 /// and the single response message sent from the server is type \a W.
698 template <class W, class R>
699 class ServerAsyncReader final : public ServerAsyncReaderInterface<W, R> {
700  public:
ServerAsyncReader(ServerContext * ctx)701   explicit ServerAsyncReader(ServerContext* ctx)
702       : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
703 
704   /// See \a ServerAsyncStreamingInterface::SendInitialMetadata for semantics.
705   ///
706   /// Implicit input parameter:
707   ///   - The initial metadata that will be sent to the client from this op will
708   ///     be taken from the \a ServerContext associated with the call.
SendInitialMetadata(void * tag)709   void SendInitialMetadata(void* tag) override {
710     GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
711 
712     meta_ops_.set_output_tag(tag);
713     meta_ops_.SendInitialMetadata(ctx_->initial_metadata_,
714                                   ctx_->initial_metadata_flags());
715     if (ctx_->compression_level_set()) {
716       meta_ops_.set_compression_level(ctx_->compression_level());
717     }
718     ctx_->sent_initial_metadata_ = true;
719     call_.PerformOps(&meta_ops_);
720   }
721 
Read(R * msg,void * tag)722   void Read(R* msg, void* tag) override {
723     read_ops_.set_output_tag(tag);
724     read_ops_.RecvMessage(msg);
725     call_.PerformOps(&read_ops_);
726   }
727 
728   /// See the \a ServerAsyncReaderInterface.Read method for semantics
729   ///
730   /// Side effect:
731   ///   - also sends initial metadata if not alreay sent.
732   ///   - uses the \a ServerContext associated with this call to send possible
733   ///     initial and trailing metadata.
734   ///
735   /// Note: \a msg is not sent if \a status has a non-OK code.
736   ///
737   /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it
738   /// is safe to to deallocate once Finish returns.
Finish(const W & msg,const Status & status,void * tag)739   void Finish(const W& msg, const Status& status, void* tag) override {
740     finish_ops_.set_output_tag(tag);
741     if (!ctx_->sent_initial_metadata_) {
742       finish_ops_.SendInitialMetadata(ctx_->initial_metadata_,
743                                       ctx_->initial_metadata_flags());
744       if (ctx_->compression_level_set()) {
745         finish_ops_.set_compression_level(ctx_->compression_level());
746       }
747       ctx_->sent_initial_metadata_ = true;
748     }
749     // The response is dropped if the status is not OK.
750     if (status.ok()) {
751       finish_ops_.ServerSendStatus(ctx_->trailing_metadata_,
752                                    finish_ops_.SendMessage(msg));
753     } else {
754       finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
755     }
756     call_.PerformOps(&finish_ops_);
757   }
758 
759   /// See the \a ServerAsyncReaderInterface.Read method for semantics
760   ///
761   /// Side effect:
762   ///   - also sends initial metadata if not alreay sent.
763   ///   - uses the \a ServerContext associated with this call to send possible
764   ///     initial and trailing metadata.
765   ///
766   /// gRPC doesn't take ownership or a reference to \a status, so it is safe to
767   /// to deallocate once FinishWithError returns.
FinishWithError(const Status & status,void * tag)768   void FinishWithError(const Status& status, void* tag) override {
769     GPR_CODEGEN_ASSERT(!status.ok());
770     finish_ops_.set_output_tag(tag);
771     if (!ctx_->sent_initial_metadata_) {
772       finish_ops_.SendInitialMetadata(ctx_->initial_metadata_,
773                                       ctx_->initial_metadata_flags());
774       if (ctx_->compression_level_set()) {
775         finish_ops_.set_compression_level(ctx_->compression_level());
776       }
777       ctx_->sent_initial_metadata_ = true;
778     }
779     finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
780     call_.PerformOps(&finish_ops_);
781   }
782 
783  private:
BindCall(::grpc::internal::Call * call)784   void BindCall(::grpc::internal::Call* call) override { call_ = *call; }
785 
786   ::grpc::internal::Call call_;
787   ServerContext* ctx_;
788   ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
789       meta_ops_;
790   ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvMessage<R>> read_ops_;
791   ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
792                               ::grpc::internal::CallOpSendMessage,
793                               ::grpc::internal::CallOpServerSendStatus>
794       finish_ops_;
795 };
796 
797 template <class W>
798 class ServerAsyncWriterInterface
799     : public internal::ServerAsyncStreamingInterface,
800       public internal::AsyncWriterInterface<W> {
801  public:
802   /// Indicate that the stream is to be finished with a certain status code.
803   /// Request notification for when the server has sent the appropriate
804   /// signals to the client to end the call.
805   /// Should not be used concurrently with other operations.
806   ///
807   /// It is appropriate to call this method when either:
808   ///   * all messages from the client have been received (either known
809   ///     implictly, or explicitly because a previous \a
810   ///     AsyncReaderInterface::Read operation with a non-ok
811   ///     result (e.g., cq->Next(&read_tag, &ok) filled in 'ok' with 'false'.
812   ///   * it is desired to end the call early with some non-OK status code.
813   ///
814   /// This operation will end when the server has finished sending out initial
815   /// metadata (if not sent already), response message, and status, or if
816   /// some failure occurred when trying to do so.
817   ///
818   /// gRPC doesn't take ownership or a reference to \a status, so it is safe to
819   /// to deallocate once Finish returns.
820   ///
821   /// \param[in] tag Tag identifying this request.
822   /// \param[in] status To be sent to the client as the result of this call.
823   virtual void Finish(const Status& status, void* tag) = 0;
824 
825   /// Request the writing of \a msg and coalesce it with trailing metadata which
826   /// contains \a status, using WriteOptions options with
827   /// identifying tag \a tag.
828   ///
829   /// WriteAndFinish is equivalent of performing WriteLast and Finish
830   /// in a single step.
831   ///
832   /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it
833   /// is safe to to deallocate once WriteAndFinish returns.
834   ///
835   /// \param[in] msg The message to be written.
836   /// \param[in] options The WriteOptions to be used to write this message.
837   /// \param[in] status The Status that server returns to client.
838   /// \param[in] tag The tag identifying the operation.
839   virtual void WriteAndFinish(const W& msg, WriteOptions options,
840                               const Status& status, void* tag) = 0;
841 };
842 
843 /// Async server-side API for doing server streaming RPCs,
844 /// where the outgoing message stream from the server has messages of type \a W.
845 template <class W>
846 class ServerAsyncWriter final : public ServerAsyncWriterInterface<W> {
847  public:
ServerAsyncWriter(ServerContext * ctx)848   explicit ServerAsyncWriter(ServerContext* ctx)
849       : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
850 
851   /// See \a ServerAsyncStreamingInterface::SendInitialMetadata for semantics.
852   ///
853   /// Implicit input parameter:
854   ///   - The initial metadata that will be sent to the client from this op will
855   ///     be taken from the \a ServerContext associated with the call.
856   ///
857   /// \param[in] tag Tag identifying this request.
SendInitialMetadata(void * tag)858   void SendInitialMetadata(void* tag) override {
859     GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
860 
861     meta_ops_.set_output_tag(tag);
862     meta_ops_.SendInitialMetadata(ctx_->initial_metadata_,
863                                   ctx_->initial_metadata_flags());
864     if (ctx_->compression_level_set()) {
865       meta_ops_.set_compression_level(ctx_->compression_level());
866     }
867     ctx_->sent_initial_metadata_ = true;
868     call_.PerformOps(&meta_ops_);
869   }
870 
Write(const W & msg,void * tag)871   void Write(const W& msg, void* tag) override {
872     write_ops_.set_output_tag(tag);
873     EnsureInitialMetadataSent(&write_ops_);
874     // TODO(ctiller): don't assert
875     GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok());
876     call_.PerformOps(&write_ops_);
877   }
878 
Write(const W & msg,WriteOptions options,void * tag)879   void Write(const W& msg, WriteOptions options, void* tag) override {
880     write_ops_.set_output_tag(tag);
881     if (options.is_last_message()) {
882       options.set_buffer_hint();
883     }
884 
885     EnsureInitialMetadataSent(&write_ops_);
886     // TODO(ctiller): don't assert
887     GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
888     call_.PerformOps(&write_ops_);
889   }
890 
891   /// See the \a ServerAsyncWriterInterface.WriteAndFinish method for semantics.
892   ///
893   /// Implicit input parameter:
894   ///   - the \a ServerContext associated with this call is used
895   ///     for sending trailing (and initial) metadata to the client.
896   ///
897   /// Note: \a status must have an OK code.
898   ///
899   /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it
900   /// is safe to to deallocate once WriteAndFinish returns.
WriteAndFinish(const W & msg,WriteOptions options,const Status & status,void * tag)901   void WriteAndFinish(const W& msg, WriteOptions options, const Status& status,
902                       void* tag) override {
903     write_ops_.set_output_tag(tag);
904     EnsureInitialMetadataSent(&write_ops_);
905     options.set_buffer_hint();
906     GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
907     write_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
908     call_.PerformOps(&write_ops_);
909   }
910 
911   /// See the \a ServerAsyncWriterInterface.Finish method for semantics.
912   ///
913   /// Implicit input parameter:
914   ///   - the \a ServerContext associated with this call is used for sending
915   ///     trailing (and initial if not already sent) metadata to the client.
916   ///
917   /// Note: there are no restrictions are the code of
918   /// \a status,it may be non-OK
919   ///
920   /// gRPC doesn't take ownership or a reference to \a status, so it is safe to
921   /// to deallocate once Finish returns.
Finish(const Status & status,void * tag)922   void Finish(const Status& status, void* tag) override {
923     finish_ops_.set_output_tag(tag);
924     EnsureInitialMetadataSent(&finish_ops_);
925     finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
926     call_.PerformOps(&finish_ops_);
927   }
928 
929  private:
BindCall(::grpc::internal::Call * call)930   void BindCall(::grpc::internal::Call* call) override { call_ = *call; }
931 
932   template <class T>
EnsureInitialMetadataSent(T * ops)933   void EnsureInitialMetadataSent(T* ops) {
934     if (!ctx_->sent_initial_metadata_) {
935       ops->SendInitialMetadata(ctx_->initial_metadata_,
936                                ctx_->initial_metadata_flags());
937       if (ctx_->compression_level_set()) {
938         ops->set_compression_level(ctx_->compression_level());
939       }
940       ctx_->sent_initial_metadata_ = true;
941     }
942   }
943 
944   ::grpc::internal::Call call_;
945   ServerContext* ctx_;
946   ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
947       meta_ops_;
948   ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
949                               ::grpc::internal::CallOpSendMessage,
950                               ::grpc::internal::CallOpServerSendStatus>
951       write_ops_;
952   ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
953                               ::grpc::internal::CallOpServerSendStatus>
954       finish_ops_;
955 };
956 
957 /// Server-side interface for asynchronous bi-directional streaming.
958 template <class W, class R>
959 class ServerAsyncReaderWriterInterface
960     : public internal::ServerAsyncStreamingInterface,
961       public internal::AsyncWriterInterface<W>,
962       public internal::AsyncReaderInterface<R> {
963  public:
964   /// Indicate that the stream is to be finished with a certain status code.
965   /// Request notification for when the server has sent the appropriate
966   /// signals to the client to end the call.
967   /// Should not be used concurrently with other operations.
968   ///
969   /// It is appropriate to call this method when either:
970   ///   * all messages from the client have been received (either known
971   ///     implictly, or explicitly because a previous \a
972   ///     AsyncReaderInterface::Read operation
973   ///     with a non-ok result (e.g., cq->Next(&read_tag, &ok) filled in 'ok'
974   ///     with 'false'.
975   ///   * it is desired to end the call early with some non-OK status code.
976   ///
977   /// This operation will end when the server has finished sending out initial
978   /// metadata (if not sent already), response message, and status, or if some
979   /// failure occurred when trying to do so.
980   ///
981   /// gRPC doesn't take ownership or a reference to \a status, so it is safe to
982   /// to deallocate once Finish returns.
983   ///
984   /// \param[in] tag Tag identifying this request.
985   /// \param[in] status To be sent to the client as the result of this call.
986   virtual void Finish(const Status& status, void* tag) = 0;
987 
988   /// Request the writing of \a msg and coalesce it with trailing metadata which
989   /// contains \a status, using WriteOptions options with
990   /// identifying tag \a tag.
991   ///
992   /// WriteAndFinish is equivalent of performing WriteLast and Finish in a
993   /// single step.
994   ///
995   /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it
996   /// is safe to to deallocate once WriteAndFinish returns.
997   ///
998   /// \param[in] msg The message to be written.
999   /// \param[in] options The WriteOptions to be used to write this message.
1000   /// \param[in] status The Status that server returns to client.
1001   /// \param[in] tag The tag identifying the operation.
1002   virtual void WriteAndFinish(const W& msg, WriteOptions options,
1003                               const Status& status, void* tag) = 0;
1004 };
1005 
1006 /// Async server-side API for doing bidirectional streaming RPCs,
1007 /// where the incoming message stream coming from the client has messages of
1008 /// type \a R, and the outgoing message stream coming from the server has
1009 /// messages of type \a W.
1010 template <class W, class R>
1011 class ServerAsyncReaderWriter final
1012     : public ServerAsyncReaderWriterInterface<W, R> {
1013  public:
ServerAsyncReaderWriter(ServerContext * ctx)1014   explicit ServerAsyncReaderWriter(ServerContext* ctx)
1015       : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
1016 
1017   /// See \a ServerAsyncStreamingInterface::SendInitialMetadata for semantics.
1018   ///
1019   /// Implicit input parameter:
1020   ///   - The initial metadata that will be sent to the client from this op will
1021   ///     be taken from the \a ServerContext associated with the call.
1022   ///
1023   /// \param[in] tag Tag identifying this request.
SendInitialMetadata(void * tag)1024   void SendInitialMetadata(void* tag) override {
1025     GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
1026 
1027     meta_ops_.set_output_tag(tag);
1028     meta_ops_.SendInitialMetadata(ctx_->initial_metadata_,
1029                                   ctx_->initial_metadata_flags());
1030     if (ctx_->compression_level_set()) {
1031       meta_ops_.set_compression_level(ctx_->compression_level());
1032     }
1033     ctx_->sent_initial_metadata_ = true;
1034     call_.PerformOps(&meta_ops_);
1035   }
1036 
Read(R * msg,void * tag)1037   void Read(R* msg, void* tag) override {
1038     read_ops_.set_output_tag(tag);
1039     read_ops_.RecvMessage(msg);
1040     call_.PerformOps(&read_ops_);
1041   }
1042 
Write(const W & msg,void * tag)1043   void Write(const W& msg, void* tag) override {
1044     write_ops_.set_output_tag(tag);
1045     EnsureInitialMetadataSent(&write_ops_);
1046     // TODO(ctiller): don't assert
1047     GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg).ok());
1048     call_.PerformOps(&write_ops_);
1049   }
1050 
Write(const W & msg,WriteOptions options,void * tag)1051   void Write(const W& msg, WriteOptions options, void* tag) override {
1052     write_ops_.set_output_tag(tag);
1053     if (options.is_last_message()) {
1054       options.set_buffer_hint();
1055     }
1056     EnsureInitialMetadataSent(&write_ops_);
1057     GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
1058     call_.PerformOps(&write_ops_);
1059   }
1060 
1061   /// See the \a ServerAsyncReaderWriterInterface.WriteAndFinish
1062   /// method for semantics.
1063   ///
1064   /// Implicit input parameter:
1065   ///   - the \a ServerContext associated with this call is used
1066   ///     for sending trailing (and initial) metadata to the client.
1067   ///
1068   /// Note: \a status must have an OK code.
1069   //
1070   /// gRPC doesn't take ownership or a reference to \a msg and \a status, so it
1071   /// is safe to to deallocate once WriteAndFinish returns.
WriteAndFinish(const W & msg,WriteOptions options,const Status & status,void * tag)1072   void WriteAndFinish(const W& msg, WriteOptions options, const Status& status,
1073                       void* tag) override {
1074     write_ops_.set_output_tag(tag);
1075     EnsureInitialMetadataSent(&write_ops_);
1076     options.set_buffer_hint();
1077     GPR_CODEGEN_ASSERT(write_ops_.SendMessage(msg, options).ok());
1078     write_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
1079     call_.PerformOps(&write_ops_);
1080   }
1081 
1082   /// See the \a ServerAsyncReaderWriterInterface.Finish method for semantics.
1083   ///
1084   /// Implicit input parameter:
1085   ///   - the \a ServerContext associated with this call is used for sending
1086   ///     trailing (and initial if not already sent) metadata to the client.
1087   ///
1088   /// Note: there are no restrictions are the code of \a status,
1089   /// it may be non-OK
1090   //
1091   /// gRPC doesn't take ownership or a reference to \a status, so it is safe to
1092   /// to deallocate once Finish returns.
Finish(const Status & status,void * tag)1093   void Finish(const Status& status, void* tag) override {
1094     finish_ops_.set_output_tag(tag);
1095     EnsureInitialMetadataSent(&finish_ops_);
1096 
1097     finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
1098     call_.PerformOps(&finish_ops_);
1099   }
1100 
1101  private:
1102   friend class ::grpc::Server;
1103 
BindCall(::grpc::internal::Call * call)1104   void BindCall(::grpc::internal::Call* call) override { call_ = *call; }
1105 
1106   template <class T>
EnsureInitialMetadataSent(T * ops)1107   void EnsureInitialMetadataSent(T* ops) {
1108     if (!ctx_->sent_initial_metadata_) {
1109       ops->SendInitialMetadata(ctx_->initial_metadata_,
1110                                ctx_->initial_metadata_flags());
1111       if (ctx_->compression_level_set()) {
1112         ops->set_compression_level(ctx_->compression_level());
1113       }
1114       ctx_->sent_initial_metadata_ = true;
1115     }
1116   }
1117 
1118   ::grpc::internal::Call call_;
1119   ServerContext* ctx_;
1120   ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
1121       meta_ops_;
1122   ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvMessage<R>> read_ops_;
1123   ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
1124                               ::grpc::internal::CallOpSendMessage,
1125                               ::grpc::internal::CallOpServerSendStatus>
1126       write_ops_;
1127   ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
1128                               ::grpc::internal::CallOpServerSendStatus>
1129       finish_ops_;
1130 };
1131 
1132 }  // namespace grpc
1133 
1134 #endif  // GRPCPP_IMPL_CODEGEN_ASYNC_STREAM_H
1135