• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPCPP_IMPL_CODEGEN_CALL_H
20 #define GRPCPP_IMPL_CODEGEN_CALL_H
21 
22 #include <assert.h>
23 #include <cstring>
24 #include <functional>
25 #include <map>
26 #include <memory>
27 
28 #include <grpcpp/impl/codegen/byte_buffer.h>
29 #include <grpcpp/impl/codegen/call_hook.h>
30 #include <grpcpp/impl/codegen/client_context.h>
31 #include <grpcpp/impl/codegen/completion_queue_tag.h>
32 #include <grpcpp/impl/codegen/config.h>
33 #include <grpcpp/impl/codegen/core_codegen_interface.h>
34 #include <grpcpp/impl/codegen/serialization_traits.h>
35 #include <grpcpp/impl/codegen/slice.h>
36 #include <grpcpp/impl/codegen/status.h>
37 #include <grpcpp/impl/codegen/string_ref.h>
38 
39 #include <grpc/impl/codegen/atm.h>
40 #include <grpc/impl/codegen/compression_types.h>
41 #include <grpc/impl/codegen/grpc_types.h>
42 
43 namespace grpc {
44 
45 class ByteBuffer;
46 class CompletionQueue;
47 extern CoreCodegenInterface* g_core_codegen_interface;
48 
49 namespace internal {
50 class Call;
51 class CallHook;
52 
53 // TODO(yangg) if the map is changed before we send, the pointers will be a
54 // mess. Make sure it does not happen.
FillMetadataArray(const std::multimap<grpc::string,grpc::string> & metadata,size_t * metadata_count,const grpc::string & optional_error_details)55 inline grpc_metadata* FillMetadataArray(
56     const std::multimap<grpc::string, grpc::string>& metadata,
57     size_t* metadata_count, const grpc::string& optional_error_details) {
58   *metadata_count = metadata.size() + (optional_error_details.empty() ? 0 : 1);
59   if (*metadata_count == 0) {
60     return nullptr;
61   }
62   grpc_metadata* metadata_array =
63       (grpc_metadata*)(g_core_codegen_interface->gpr_malloc(
64           (*metadata_count) * sizeof(grpc_metadata)));
65   size_t i = 0;
66   for (auto iter = metadata.cbegin(); iter != metadata.cend(); ++iter, ++i) {
67     metadata_array[i].key = SliceReferencingString(iter->first);
68     metadata_array[i].value = SliceReferencingString(iter->second);
69   }
70   if (!optional_error_details.empty()) {
71     metadata_array[i].key =
72         g_core_codegen_interface->grpc_slice_from_static_buffer(
73             kBinaryErrorDetailsKey, sizeof(kBinaryErrorDetailsKey) - 1);
74     metadata_array[i].value = SliceReferencingString(optional_error_details);
75   }
76   return metadata_array;
77 }
78 }  // namespace internal
79 
80 /// Per-message write options.
81 class WriteOptions {
82  public:
WriteOptions()83   WriteOptions() : flags_(0), last_message_(false) {}
WriteOptions(const WriteOptions & other)84   WriteOptions(const WriteOptions& other)
85       : flags_(other.flags_), last_message_(other.last_message_) {}
86 
87   /// Clear all flags.
Clear()88   inline void Clear() { flags_ = 0; }
89 
90   /// Returns raw flags bitset.
flags()91   inline uint32_t flags() const { return flags_; }
92 
93   /// Sets flag for the disabling of compression for the next message write.
94   ///
95   /// \sa GRPC_WRITE_NO_COMPRESS
set_no_compression()96   inline WriteOptions& set_no_compression() {
97     SetBit(GRPC_WRITE_NO_COMPRESS);
98     return *this;
99   }
100 
101   /// Clears flag for the disabling of compression for the next message write.
102   ///
103   /// \sa GRPC_WRITE_NO_COMPRESS
clear_no_compression()104   inline WriteOptions& clear_no_compression() {
105     ClearBit(GRPC_WRITE_NO_COMPRESS);
106     return *this;
107   }
108 
109   /// Get value for the flag indicating whether compression for the next
110   /// message write is forcefully disabled.
111   ///
112   /// \sa GRPC_WRITE_NO_COMPRESS
get_no_compression()113   inline bool get_no_compression() const {
114     return GetBit(GRPC_WRITE_NO_COMPRESS);
115   }
116 
117   /// Sets flag indicating that the write may be buffered and need not go out on
118   /// the wire immediately.
119   ///
120   /// \sa GRPC_WRITE_BUFFER_HINT
set_buffer_hint()121   inline WriteOptions& set_buffer_hint() {
122     SetBit(GRPC_WRITE_BUFFER_HINT);
123     return *this;
124   }
125 
126   /// Clears flag indicating that the write may be buffered and need not go out
127   /// on the wire immediately.
128   ///
129   /// \sa GRPC_WRITE_BUFFER_HINT
clear_buffer_hint()130   inline WriteOptions& clear_buffer_hint() {
131     ClearBit(GRPC_WRITE_BUFFER_HINT);
132     return *this;
133   }
134 
135   /// Get value for the flag indicating that the write may be buffered and need
136   /// not go out on the wire immediately.
137   ///
138   /// \sa GRPC_WRITE_BUFFER_HINT
get_buffer_hint()139   inline bool get_buffer_hint() const { return GetBit(GRPC_WRITE_BUFFER_HINT); }
140 
141   /// corked bit: aliases set_buffer_hint currently, with the intent that
142   /// set_buffer_hint will be removed in the future
set_corked()143   inline WriteOptions& set_corked() {
144     SetBit(GRPC_WRITE_BUFFER_HINT);
145     return *this;
146   }
147 
clear_corked()148   inline WriteOptions& clear_corked() {
149     ClearBit(GRPC_WRITE_BUFFER_HINT);
150     return *this;
151   }
152 
is_corked()153   inline bool is_corked() const { return GetBit(GRPC_WRITE_BUFFER_HINT); }
154 
155   /// last-message bit: indicates this is the last message in a stream
156   /// client-side:  makes Write the equivalent of performing Write, WritesDone
157   /// in a single step
158   /// server-side:  hold the Write until the service handler returns (sync api)
159   /// or until Finish is called (async api)
set_last_message()160   inline WriteOptions& set_last_message() {
161     last_message_ = true;
162     return *this;
163   }
164 
165   /// Clears flag indicating that this is the last message in a stream,
166   /// disabling coalescing.
clear_last_message()167   inline WriteOptions& clear_last_message() {
168     last_message_ = false;
169     return *this;
170   }
171 
172   /// Guarantee that all bytes have been written to the socket before completing
173   /// this write (usually writes are completed when they pass flow control).
set_write_through()174   inline WriteOptions& set_write_through() {
175     SetBit(GRPC_WRITE_THROUGH);
176     return *this;
177   }
178 
is_write_through()179   inline bool is_write_through() const { return GetBit(GRPC_WRITE_THROUGH); }
180 
181   /// Get value for the flag indicating that this is the last message, and
182   /// should be coalesced with trailing metadata.
183   ///
184   /// \sa GRPC_WRITE_LAST_MESSAGE
is_last_message()185   bool is_last_message() const { return last_message_; }
186 
187   WriteOptions& operator=(const WriteOptions& rhs) {
188     flags_ = rhs.flags_;
189     return *this;
190   }
191 
192  private:
SetBit(const uint32_t mask)193   void SetBit(const uint32_t mask) { flags_ |= mask; }
194 
ClearBit(const uint32_t mask)195   void ClearBit(const uint32_t mask) { flags_ &= ~mask; }
196 
GetBit(const uint32_t mask)197   bool GetBit(const uint32_t mask) const { return (flags_ & mask) != 0; }
198 
199   uint32_t flags_;
200   bool last_message_;
201 };
202 
203 namespace internal {
204 /// Default argument for CallOpSet. I is unused by the class, but can be
205 /// used for generating multiple names for the same thing.
206 template <int I>
207 class CallNoOp {
208  protected:
AddOp(grpc_op * ops,size_t * nops)209   void AddOp(grpc_op* ops, size_t* nops) {}
FinishOp(bool * status)210   void FinishOp(bool* status) {}
211 };
212 
213 class CallOpSendInitialMetadata {
214  public:
CallOpSendInitialMetadata()215   CallOpSendInitialMetadata() : send_(false) {
216     maybe_compression_level_.is_set = false;
217   }
218 
SendInitialMetadata(const std::multimap<grpc::string,grpc::string> & metadata,uint32_t flags)219   void SendInitialMetadata(
220       const std::multimap<grpc::string, grpc::string>& metadata,
221       uint32_t flags) {
222     maybe_compression_level_.is_set = false;
223     send_ = true;
224     flags_ = flags;
225     initial_metadata_ =
226         FillMetadataArray(metadata, &initial_metadata_count_, "");
227   }
228 
set_compression_level(grpc_compression_level level)229   void set_compression_level(grpc_compression_level level) {
230     maybe_compression_level_.is_set = true;
231     maybe_compression_level_.level = level;
232   }
233 
234  protected:
AddOp(grpc_op * ops,size_t * nops)235   void AddOp(grpc_op* ops, size_t* nops) {
236     if (!send_) return;
237     grpc_op* op = &ops[(*nops)++];
238     op->op = GRPC_OP_SEND_INITIAL_METADATA;
239     op->flags = flags_;
240     op->reserved = NULL;
241     op->data.send_initial_metadata.count = initial_metadata_count_;
242     op->data.send_initial_metadata.metadata = initial_metadata_;
243     op->data.send_initial_metadata.maybe_compression_level.is_set =
244         maybe_compression_level_.is_set;
245     if (maybe_compression_level_.is_set) {
246       op->data.send_initial_metadata.maybe_compression_level.level =
247           maybe_compression_level_.level;
248     }
249   }
FinishOp(bool * status)250   void FinishOp(bool* status) {
251     if (!send_) return;
252     g_core_codegen_interface->gpr_free(initial_metadata_);
253     send_ = false;
254   }
255 
256   bool send_;
257   uint32_t flags_;
258   size_t initial_metadata_count_;
259   grpc_metadata* initial_metadata_;
260   struct {
261     bool is_set;
262     grpc_compression_level level;
263   } maybe_compression_level_;
264 };
265 
266 class CallOpSendMessage {
267  public:
CallOpSendMessage()268   CallOpSendMessage() : send_buf_() {}
269 
270   /// Send \a message using \a options for the write. The \a options are cleared
271   /// after use.
272   template <class M>
273   Status SendMessage(const M& message,
274                      WriteOptions options) GRPC_MUST_USE_RESULT;
275 
276   template <class M>
277   Status SendMessage(const M& message) GRPC_MUST_USE_RESULT;
278 
279  protected:
AddOp(grpc_op * ops,size_t * nops)280   void AddOp(grpc_op* ops, size_t* nops) {
281     if (!send_buf_.Valid()) return;
282     grpc_op* op = &ops[(*nops)++];
283     op->op = GRPC_OP_SEND_MESSAGE;
284     op->flags = write_options_.flags();
285     op->reserved = NULL;
286     op->data.send_message.send_message = send_buf_.c_buffer();
287     // Flags are per-message: clear them after use.
288     write_options_.Clear();
289   }
FinishOp(bool * status)290   void FinishOp(bool* status) { send_buf_.Clear(); }
291 
292  private:
293   ByteBuffer send_buf_;
294   WriteOptions write_options_;
295 };
296 
297 template <class M>
SendMessage(const M & message,WriteOptions options)298 Status CallOpSendMessage::SendMessage(const M& message, WriteOptions options) {
299   write_options_ = options;
300   bool own_buf;
301   // TODO(vjpai): Remove the void below when possible
302   // The void in the template parameter below should not be needed
303   // (since it should be implicit) but is needed due to an observed
304   // difference in behavior between clang and gcc for certain internal users
305   Status result = SerializationTraits<M, void>::Serialize(
306       message, send_buf_.bbuf_ptr(), &own_buf);
307   if (!own_buf) {
308     send_buf_.Duplicate();
309   }
310   return result;
311 }
312 
313 template <class M>
SendMessage(const M & message)314 Status CallOpSendMessage::SendMessage(const M& message) {
315   return SendMessage(message, WriteOptions());
316 }
317 
318 template <class R>
319 class CallOpRecvMessage {
320  public:
CallOpRecvMessage()321   CallOpRecvMessage()
322       : got_message(false),
323         message_(nullptr),
324         allow_not_getting_message_(false) {}
325 
RecvMessage(R * message)326   void RecvMessage(R* message) { message_ = message; }
327 
328   // Do not change status if no message is received.
AllowNoMessage()329   void AllowNoMessage() { allow_not_getting_message_ = true; }
330 
331   bool got_message;
332 
333  protected:
AddOp(grpc_op * ops,size_t * nops)334   void AddOp(grpc_op* ops, size_t* nops) {
335     if (message_ == nullptr) return;
336     grpc_op* op = &ops[(*nops)++];
337     op->op = GRPC_OP_RECV_MESSAGE;
338     op->flags = 0;
339     op->reserved = NULL;
340     op->data.recv_message.recv_message = recv_buf_.c_buffer_ptr();
341   }
342 
FinishOp(bool * status)343   void FinishOp(bool* status) {
344     if (message_ == nullptr) return;
345     if (recv_buf_.Valid()) {
346       if (*status) {
347         got_message = *status =
348             SerializationTraits<R>::Deserialize(recv_buf_.bbuf_ptr(), message_)
349                 .ok();
350         recv_buf_.Release();
351       } else {
352         got_message = false;
353         recv_buf_.Clear();
354       }
355     } else {
356       got_message = false;
357       if (!allow_not_getting_message_) {
358         *status = false;
359       }
360     }
361     message_ = nullptr;
362   }
363 
364  private:
365   R* message_;
366   ByteBuffer recv_buf_;
367   bool allow_not_getting_message_;
368 };
369 
370 class DeserializeFunc {
371  public:
372   virtual Status Deserialize(ByteBuffer* buf) = 0;
~DeserializeFunc()373   virtual ~DeserializeFunc() {}
374 };
375 
376 template <class R>
377 class DeserializeFuncType final : public DeserializeFunc {
378  public:
DeserializeFuncType(R * message)379   DeserializeFuncType(R* message) : message_(message) {}
Deserialize(ByteBuffer * buf)380   Status Deserialize(ByteBuffer* buf) override {
381     return SerializationTraits<R>::Deserialize(buf->bbuf_ptr(), message_);
382   }
383 
~DeserializeFuncType()384   ~DeserializeFuncType() override {}
385 
386  private:
387   R* message_;  // Not a managed pointer because management is external to this
388 };
389 
390 class CallOpGenericRecvMessage {
391  public:
CallOpGenericRecvMessage()392   CallOpGenericRecvMessage()
393       : got_message(false), allow_not_getting_message_(false) {}
394 
395   template <class R>
RecvMessage(R * message)396   void RecvMessage(R* message) {
397     // Use an explicit base class pointer to avoid resolution error in the
398     // following unique_ptr::reset for some old implementations.
399     DeserializeFunc* func = new DeserializeFuncType<R>(message);
400     deserialize_.reset(func);
401   }
402 
403   // Do not change status if no message is received.
AllowNoMessage()404   void AllowNoMessage() { allow_not_getting_message_ = true; }
405 
406   bool got_message;
407 
408  protected:
AddOp(grpc_op * ops,size_t * nops)409   void AddOp(grpc_op* ops, size_t* nops) {
410     if (!deserialize_) return;
411     grpc_op* op = &ops[(*nops)++];
412     op->op = GRPC_OP_RECV_MESSAGE;
413     op->flags = 0;
414     op->reserved = NULL;
415     op->data.recv_message.recv_message = recv_buf_.c_buffer_ptr();
416   }
417 
FinishOp(bool * status)418   void FinishOp(bool* status) {
419     if (!deserialize_) return;
420     if (recv_buf_.Valid()) {
421       if (*status) {
422         got_message = true;
423         *status = deserialize_->Deserialize(&recv_buf_).ok();
424         recv_buf_.Release();
425       } else {
426         got_message = false;
427         recv_buf_.Clear();
428       }
429     } else {
430       got_message = false;
431       if (!allow_not_getting_message_) {
432         *status = false;
433       }
434     }
435     deserialize_.reset();
436   }
437 
438  private:
439   std::unique_ptr<DeserializeFunc> deserialize_;
440   ByteBuffer recv_buf_;
441   bool allow_not_getting_message_;
442 };
443 
444 class CallOpClientSendClose {
445  public:
CallOpClientSendClose()446   CallOpClientSendClose() : send_(false) {}
447 
ClientSendClose()448   void ClientSendClose() { send_ = true; }
449 
450  protected:
AddOp(grpc_op * ops,size_t * nops)451   void AddOp(grpc_op* ops, size_t* nops) {
452     if (!send_) return;
453     grpc_op* op = &ops[(*nops)++];
454     op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
455     op->flags = 0;
456     op->reserved = NULL;
457   }
FinishOp(bool * status)458   void FinishOp(bool* status) { send_ = false; }
459 
460  private:
461   bool send_;
462 };
463 
464 class CallOpServerSendStatus {
465  public:
CallOpServerSendStatus()466   CallOpServerSendStatus() : send_status_available_(false) {}
467 
ServerSendStatus(const std::multimap<grpc::string,grpc::string> & trailing_metadata,const Status & status)468   void ServerSendStatus(
469       const std::multimap<grpc::string, grpc::string>& trailing_metadata,
470       const Status& status) {
471     send_error_details_ = status.error_details();
472     trailing_metadata_ = FillMetadataArray(
473         trailing_metadata, &trailing_metadata_count_, send_error_details_);
474     send_status_available_ = true;
475     send_status_code_ = static_cast<grpc_status_code>(status.error_code());
476     send_error_message_ = status.error_message();
477   }
478 
479  protected:
AddOp(grpc_op * ops,size_t * nops)480   void AddOp(grpc_op* ops, size_t* nops) {
481     if (!send_status_available_) return;
482     grpc_op* op = &ops[(*nops)++];
483     op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
484     op->data.send_status_from_server.trailing_metadata_count =
485         trailing_metadata_count_;
486     op->data.send_status_from_server.trailing_metadata = trailing_metadata_;
487     op->data.send_status_from_server.status = send_status_code_;
488     error_message_slice_ = SliceReferencingString(send_error_message_);
489     op->data.send_status_from_server.status_details =
490         send_error_message_.empty() ? nullptr : &error_message_slice_;
491     op->flags = 0;
492     op->reserved = NULL;
493   }
494 
FinishOp(bool * status)495   void FinishOp(bool* status) {
496     if (!send_status_available_) return;
497     g_core_codegen_interface->gpr_free(trailing_metadata_);
498     send_status_available_ = false;
499   }
500 
501  private:
502   bool send_status_available_;
503   grpc_status_code send_status_code_;
504   grpc::string send_error_details_;
505   grpc::string send_error_message_;
506   size_t trailing_metadata_count_;
507   grpc_metadata* trailing_metadata_;
508   grpc_slice error_message_slice_;
509 };
510 
511 class CallOpRecvInitialMetadata {
512  public:
CallOpRecvInitialMetadata()513   CallOpRecvInitialMetadata() : metadata_map_(nullptr) {}
514 
RecvInitialMetadata(ClientContext * context)515   void RecvInitialMetadata(ClientContext* context) {
516     context->initial_metadata_received_ = true;
517     metadata_map_ = &context->recv_initial_metadata_;
518   }
519 
520  protected:
AddOp(grpc_op * ops,size_t * nops)521   void AddOp(grpc_op* ops, size_t* nops) {
522     if (metadata_map_ == nullptr) return;
523     grpc_op* op = &ops[(*nops)++];
524     op->op = GRPC_OP_RECV_INITIAL_METADATA;
525     op->data.recv_initial_metadata.recv_initial_metadata = metadata_map_->arr();
526     op->flags = 0;
527     op->reserved = NULL;
528   }
529 
FinishOp(bool * status)530   void FinishOp(bool* status) {
531     if (metadata_map_ == nullptr) return;
532     metadata_map_ = nullptr;
533   }
534 
535  private:
536   MetadataMap* metadata_map_;
537 };
538 
539 class CallOpClientRecvStatus {
540  public:
CallOpClientRecvStatus()541   CallOpClientRecvStatus()
542       : recv_status_(nullptr), debug_error_string_(nullptr) {}
543 
ClientRecvStatus(ClientContext * context,Status * status)544   void ClientRecvStatus(ClientContext* context, Status* status) {
545     client_context_ = context;
546     metadata_map_ = &client_context_->trailing_metadata_;
547     recv_status_ = status;
548     error_message_ = g_core_codegen_interface->grpc_empty_slice();
549   }
550 
551  protected:
AddOp(grpc_op * ops,size_t * nops)552   void AddOp(grpc_op* ops, size_t* nops) {
553     if (recv_status_ == nullptr) return;
554     grpc_op* op = &ops[(*nops)++];
555     op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
556     op->data.recv_status_on_client.trailing_metadata = metadata_map_->arr();
557     op->data.recv_status_on_client.status = &status_code_;
558     op->data.recv_status_on_client.status_details = &error_message_;
559     op->data.recv_status_on_client.error_string = &debug_error_string_;
560     op->flags = 0;
561     op->reserved = NULL;
562   }
563 
FinishOp(bool * status)564   void FinishOp(bool* status) {
565     if (recv_status_ == nullptr) return;
566     grpc::string binary_error_details = metadata_map_->GetBinaryErrorDetails();
567     *recv_status_ =
568         Status(static_cast<StatusCode>(status_code_),
569                GRPC_SLICE_IS_EMPTY(error_message_)
570                    ? grpc::string()
571                    : grpc::string(GRPC_SLICE_START_PTR(error_message_),
572                                   GRPC_SLICE_END_PTR(error_message_)),
573                binary_error_details);
574     client_context_->set_debug_error_string(
575         debug_error_string_ != nullptr ? debug_error_string_ : "");
576     g_core_codegen_interface->grpc_slice_unref(error_message_);
577     if (debug_error_string_ != nullptr) {
578       g_core_codegen_interface->gpr_free((void*)debug_error_string_);
579     }
580     recv_status_ = nullptr;
581   }
582 
583  private:
584   ClientContext* client_context_;
585   MetadataMap* metadata_map_;
586   Status* recv_status_;
587   const char* debug_error_string_;
588   grpc_status_code status_code_;
589   grpc_slice error_message_;
590 };
591 
592 /// An abstract collection of call ops, used to generate the
593 /// grpc_call_op structure to pass down to the lower layers,
594 /// and as it is-a CompletionQueueTag, also massages the final
595 /// completion into the correct form for consumption in the C++
596 /// API.
597 class CallOpSetInterface : public CompletionQueueTag {
598  public:
599   /// Fills in grpc_op, starting from ops[*nops] and moving
600   /// upwards.
601   virtual void FillOps(grpc_call* call, grpc_op* ops, size_t* nops) = 0;
602 
603   /// Get the tag to be used at the core completion queue. Generally, the
604   /// value of cq_tag will be "this". However, it can be overridden if we
605   /// want core to process the tag differently (e.g., as a core callback)
606   virtual void* cq_tag() = 0;
607 };
608 
609 /// Primary implementation of CallOpSetInterface.
610 /// Since we cannot use variadic templates, we declare slots up to
611 /// the maximum count of ops we'll need in a set. We leverage the
612 /// empty base class optimization to slim this class (especially
613 /// when there are many unused slots used). To avoid duplicate base classes,
614 /// the template parmeter for CallNoOp is varied by argument position.
615 template <class Op1 = CallNoOp<1>, class Op2 = CallNoOp<2>,
616           class Op3 = CallNoOp<3>, class Op4 = CallNoOp<4>,
617           class Op5 = CallNoOp<5>, class Op6 = CallNoOp<6>>
618 class CallOpSet : public CallOpSetInterface,
619                   public Op1,
620                   public Op2,
621                   public Op3,
622                   public Op4,
623                   public Op5,
624                   public Op6 {
625  public:
CallOpSet()626   CallOpSet() : cq_tag_(this), return_tag_(this), call_(nullptr) {}
FillOps(grpc_call * call,grpc_op * ops,size_t * nops)627   void FillOps(grpc_call* call, grpc_op* ops, size_t* nops) override {
628     this->Op1::AddOp(ops, nops);
629     this->Op2::AddOp(ops, nops);
630     this->Op3::AddOp(ops, nops);
631     this->Op4::AddOp(ops, nops);
632     this->Op5::AddOp(ops, nops);
633     this->Op6::AddOp(ops, nops);
634     g_core_codegen_interface->grpc_call_ref(call);
635     call_ = call;
636   }
637 
FinalizeResult(void ** tag,bool * status)638   bool FinalizeResult(void** tag, bool* status) override {
639     this->Op1::FinishOp(status);
640     this->Op2::FinishOp(status);
641     this->Op3::FinishOp(status);
642     this->Op4::FinishOp(status);
643     this->Op5::FinishOp(status);
644     this->Op6::FinishOp(status);
645     *tag = return_tag_;
646 
647     g_core_codegen_interface->grpc_call_unref(call_);
648     return true;
649   }
650 
set_output_tag(void * return_tag)651   void set_output_tag(void* return_tag) { return_tag_ = return_tag; }
652 
cq_tag()653   void* cq_tag() override { return cq_tag_; }
654 
655   /// set_cq_tag is used to provide a different core CQ tag than "this".
656   /// This is used for callback-based tags, where the core tag is the core
657   /// callback function. It does not change the use or behavior of any other
658   /// function (such as FinalizeResult)
set_cq_tag(void * cq_tag)659   void set_cq_tag(void* cq_tag) { cq_tag_ = cq_tag; }
660 
661  private:
662   void* cq_tag_;
663   void* return_tag_;
664   grpc_call* call_;
665 };
666 
667 /// Straightforward wrapping of the C call object
668 class Call final {
669  public:
670   /** call is owned by the caller */
Call(grpc_call * call,CallHook * call_hook,CompletionQueue * cq)671   Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq)
672       : call_hook_(call_hook),
673         cq_(cq),
674         call_(call),
675         max_receive_message_size_(-1) {}
676 
Call(grpc_call * call,CallHook * call_hook,CompletionQueue * cq,int max_receive_message_size)677   Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq,
678        int max_receive_message_size)
679       : call_hook_(call_hook),
680         cq_(cq),
681         call_(call),
682         max_receive_message_size_(max_receive_message_size) {}
683 
PerformOps(CallOpSetInterface * ops)684   void PerformOps(CallOpSetInterface* ops) {
685     call_hook_->PerformOpsOnCall(ops, this);
686   }
687 
call()688   grpc_call* call() const { return call_; }
cq()689   CompletionQueue* cq() const { return cq_; }
690 
max_receive_message_size()691   int max_receive_message_size() const { return max_receive_message_size_; }
692 
693  private:
694   CallHook* call_hook_;
695   CompletionQueue* cq_;
696   grpc_call* call_;
697   int max_receive_message_size_;
698 };
699 }  // namespace internal
700 }  // namespace grpc
701 
702 #endif  // GRPCPP_IMPL_CODEGEN_CALL_H
703