• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 #ifndef GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_IMPL_H
19 #define GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_IMPL_H
#include <atomic>
#include <functional>
#include <utility>

#include <grpcpp/impl/codegen/call.h>
#include <grpcpp/impl/codegen/call_op_set.h>
#include <grpcpp/impl/codegen/callback_common.h>
#include <grpcpp/impl/codegen/channel_interface.h>
#include <grpcpp/impl/codegen/config.h>
#include <grpcpp/impl/codegen/core_codegen_interface.h>
#include <grpcpp/impl/codegen/status.h>
30 
31 namespace grpc {
32 namespace internal {
33 class RpcMethod;
34 }  // namespace internal
35 }  // namespace grpc
36 
37 namespace grpc_impl {
38 class Channel;
39 class ClientContext;
40 
41 namespace internal {
42 
43 /// Perform a callback-based unary call
44 /// TODO(vjpai): Combine as much as possible with the blocking unary call code
45 template <class InputMessage, class OutputMessage>
CallbackUnaryCall(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc_impl::ClientContext * context,const InputMessage * request,OutputMessage * result,std::function<void (::grpc::Status)> on_completion)46 void CallbackUnaryCall(::grpc::ChannelInterface* channel,
47                        const ::grpc::internal::RpcMethod& method,
48                        ::grpc_impl::ClientContext* context,
49                        const InputMessage* request, OutputMessage* result,
50                        std::function<void(::grpc::Status)> on_completion) {
51   CallbackUnaryCallImpl<InputMessage, OutputMessage> x(
52       channel, method, context, request, result, on_completion);
53 }
54 
55 template <class InputMessage, class OutputMessage>
56 class CallbackUnaryCallImpl {
57  public:
CallbackUnaryCallImpl(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc_impl::ClientContext * context,const InputMessage * request,OutputMessage * result,std::function<void (::grpc::Status)> on_completion)58   CallbackUnaryCallImpl(::grpc::ChannelInterface* channel,
59                         const ::grpc::internal::RpcMethod& method,
60                         ::grpc_impl::ClientContext* context,
61                         const InputMessage* request, OutputMessage* result,
62                         std::function<void(::grpc::Status)> on_completion) {
63     ::grpc_impl::CompletionQueue* cq = channel->CallbackCQ();
64     GPR_CODEGEN_ASSERT(cq != nullptr);
65     grpc::internal::Call call(channel->CreateCall(method, context, cq));
66 
67     using FullCallOpSet = grpc::internal::CallOpSet<
68         ::grpc::internal::CallOpSendInitialMetadata,
69         grpc::internal::CallOpSendMessage,
70         grpc::internal::CallOpRecvInitialMetadata,
71         grpc::internal::CallOpRecvMessage<OutputMessage>,
72         grpc::internal::CallOpClientSendClose,
73         grpc::internal::CallOpClientRecvStatus>;
74 
75     struct OpSetAndTag {
76       FullCallOpSet opset;
77       grpc::internal::CallbackWithStatusTag tag;
78     };
79     const size_t alloc_sz = sizeof(OpSetAndTag);
80     auto* const alloced = static_cast<OpSetAndTag*>(
81         ::grpc::g_core_codegen_interface->grpc_call_arena_alloc(call.call(),
82                                                                 alloc_sz));
83     auto* ops = new (&alloced->opset) FullCallOpSet;
84     auto* tag = new (&alloced->tag)
85         grpc::internal::CallbackWithStatusTag(call.call(), on_completion, ops);
86 
87     // TODO(vjpai): Unify code with sync API as much as possible
88     ::grpc::Status s = ops->SendMessagePtr(request);
89     if (!s.ok()) {
90       tag->force_run(s);
91       return;
92     }
93     ops->SendInitialMetadata(&context->send_initial_metadata_,
94                              context->initial_metadata_flags());
95     ops->RecvInitialMetadata(context);
96     ops->RecvMessage(result);
97     ops->AllowNoMessage();
98     ops->ClientSendClose();
99     ops->ClientRecvStatus(context, tag->status_ptr());
100     ops->set_core_cq_tag(tag);
101     call.PerformOps(ops);
102   }
103 };
104 
105 // Base class for public API classes.
106 class ClientReactor {
107  public:
108   /// Called by the library when all operations associated with this RPC have
109   /// completed and all Holds have been removed. OnDone provides the RPC status
110   /// outcome for both successful and failed RPCs. If it is never called on an
111   /// RPC, it indicates an application-level problem (like failure to remove a
112   /// hold).
113   ///
114   /// \param[in] s The status outcome of this RPC
115   virtual void OnDone(const ::grpc::Status& /*s*/) = 0;
116 
117   /// InternalScheduleOnDone is not part of the API and is not meant to be
118   /// overridden. It is virtual to allow successful builds for certain bazel
119   /// build users that only want to depend on gRPC codegen headers and not the
120   /// full library (although this is not a generally-supported option). Although
121   /// the virtual call is slower than a direct call, this function is
122   /// heavyweight and the cost of the virtual call is not much in comparison.
123   /// This function may be removed or devirtualized in the future.
124   virtual void InternalScheduleOnDone(::grpc::Status s);
125 };
126 
127 }  // namespace internal
128 
129 // Forward declarations
130 template <class Request, class Response>
131 class ClientBidiReactor;
132 template <class Response>
133 class ClientReadReactor;
134 template <class Request>
135 class ClientWriteReactor;
136 class ClientUnaryReactor;
137 
138 // NOTE: The streaming objects are not actually implemented in the public API.
139 //       These interfaces are provided for mocking only. Typical applications
140 //       will interact exclusively with the reactors that they define.
141 template <class Request, class Response>
142 class ClientCallbackReaderWriter {
143  public:
~ClientCallbackReaderWriter()144   virtual ~ClientCallbackReaderWriter() {}
145   virtual void StartCall() = 0;
146   virtual void Write(const Request* req, ::grpc::WriteOptions options) = 0;
147   virtual void WritesDone() = 0;
148   virtual void Read(Response* resp) = 0;
149   virtual void AddHold(int holds) = 0;
150   virtual void RemoveHold() = 0;
151 
152  protected:
BindReactor(ClientBidiReactor<Request,Response> * reactor)153   void BindReactor(ClientBidiReactor<Request, Response>* reactor) {
154     reactor->BindStream(this);
155   }
156 };
157 
158 template <class Response>
159 class ClientCallbackReader {
160  public:
~ClientCallbackReader()161   virtual ~ClientCallbackReader() {}
162   virtual void StartCall() = 0;
163   virtual void Read(Response* resp) = 0;
164   virtual void AddHold(int holds) = 0;
165   virtual void RemoveHold() = 0;
166 
167  protected:
BindReactor(ClientReadReactor<Response> * reactor)168   void BindReactor(ClientReadReactor<Response>* reactor) {
169     reactor->BindReader(this);
170   }
171 };
172 
173 template <class Request>
174 class ClientCallbackWriter {
175  public:
~ClientCallbackWriter()176   virtual ~ClientCallbackWriter() {}
177   virtual void StartCall() = 0;
Write(const Request * req)178   void Write(const Request* req) { Write(req, ::grpc::WriteOptions()); }
179   virtual void Write(const Request* req, ::grpc::WriteOptions options) = 0;
WriteLast(const Request * req,::grpc::WriteOptions options)180   void WriteLast(const Request* req, ::grpc::WriteOptions options) {
181     Write(req, options.set_last_message());
182   }
183   virtual void WritesDone() = 0;
184 
185   virtual void AddHold(int holds) = 0;
186   virtual void RemoveHold() = 0;
187 
188  protected:
BindReactor(ClientWriteReactor<Request> * reactor)189   void BindReactor(ClientWriteReactor<Request>* reactor) {
190     reactor->BindWriter(this);
191   }
192 };
193 
194 class ClientCallbackUnary {
195  public:
~ClientCallbackUnary()196   virtual ~ClientCallbackUnary() {}
197   virtual void StartCall() = 0;
198 
199  protected:
200   void BindReactor(ClientUnaryReactor* reactor);
201 };
202 
// The following classes are the reactor interfaces that are to be implemented
// by the user. They are passed in to the library as an argument to a call on a
// stub (either a codegen-ed call or a generic call). The streaming RPC is
// activated by calling StartCall, possibly after initiating StartRead,
// StartWrite, or AddHold operations on the streaming object. Note that none of
// the classes are pure; all reactions have a default empty reaction so that the
// user class only needs to override those reactions that it cares about.
// The reactor must be passed to the stub invocation before any of the below
// operations can be called.
212 
213 /// \a ClientBidiReactor is the interface for a bidirectional streaming RPC.
214 template <class Request, class Response>
215 class ClientBidiReactor : public internal::ClientReactor {
216  public:
~ClientBidiReactor()217   virtual ~ClientBidiReactor() {}
218 
219   /// Activate the RPC and initiate any reads or writes that have been Start'ed
220   /// before this call. All streaming RPCs issued by the client MUST have
221   /// StartCall invoked on them (even if they are canceled) as this call is the
222   /// activation of their lifecycle.
StartCall()223   void StartCall() { stream_->StartCall(); }
224 
225   /// Initiate a read operation (or post it for later initiation if StartCall
226   /// has not yet been invoked).
227   ///
228   /// \param[out] resp Where to eventually store the read message. Valid when
229   ///                  the library calls OnReadDone
StartRead(Response * resp)230   void StartRead(Response* resp) { stream_->Read(resp); }
231 
232   /// Initiate a write operation (or post it for later initiation if StartCall
233   /// has not yet been invoked).
234   ///
235   /// \param[in] req The message to be written. The library does not take
236   ///                ownership but the caller must ensure that the message is
237   ///                not deleted or modified until OnWriteDone is called.
StartWrite(const Request * req)238   void StartWrite(const Request* req) {
239     StartWrite(req, ::grpc::WriteOptions());
240   }
241 
242   /// Initiate/post a write operation with specified options.
243   ///
244   /// \param[in] req The message to be written. The library does not take
245   ///                ownership but the caller must ensure that the message is
246   ///                not deleted or modified until OnWriteDone is called.
247   /// \param[in] options The WriteOptions to use for writing this message
StartWrite(const Request * req,::grpc::WriteOptions options)248   void StartWrite(const Request* req, ::grpc::WriteOptions options) {
249     stream_->Write(req, std::move(options));
250   }
251 
252   /// Initiate/post a write operation with specified options and an indication
253   /// that this is the last write (like StartWrite and StartWritesDone, merged).
254   /// Note that calling this means that no more calls to StartWrite,
255   /// StartWriteLast, or StartWritesDone are allowed.
256   ///
257   /// \param[in] req The message to be written. The library does not take
258   ///                ownership but the caller must ensure that the message is
259   ///                not deleted or modified until OnWriteDone is called.
260   /// \param[in] options The WriteOptions to use for writing this message
StartWriteLast(const Request * req,::grpc::WriteOptions options)261   void StartWriteLast(const Request* req, ::grpc::WriteOptions options) {
262     StartWrite(req, std::move(options.set_last_message()));
263   }
264 
265   /// Indicate that the RPC will have no more write operations. This can only be
266   /// issued once for a given RPC. This is not required or allowed if
267   /// StartWriteLast is used since that already has the same implication.
268   /// Note that calling this means that no more calls to StartWrite,
269   /// StartWriteLast, or StartWritesDone are allowed.
StartWritesDone()270   void StartWritesDone() { stream_->WritesDone(); }
271 
272   /// Holds are needed if (and only if) this stream has operations that take
273   /// place on it after StartCall but from outside one of the reactions
274   /// (OnReadDone, etc). This is _not_ a common use of the streaming API.
275   ///
276   /// Holds must be added before calling StartCall. If a stream still has a hold
277   /// in place, its resources will not be destroyed even if the status has
278   /// already come in from the wire and there are currently no active callbacks
279   /// outstanding. Similarly, the stream will not call OnDone if there are still
280   /// holds on it.
281   ///
282   /// For example, if a StartRead or StartWrite operation is going to be
283   /// initiated from elsewhere in the application, the application should call
284   /// AddHold or AddMultipleHolds before StartCall.  If there is going to be,
285   /// for example, a read-flow and a write-flow taking place outside the
286   /// reactions, then call AddMultipleHolds(2) before StartCall. When the
287   /// application knows that it won't issue any more read operations (such as
288   /// when a read comes back as not ok), it should issue a RemoveHold(). It
289   /// should also call RemoveHold() again after it does StartWriteLast or
290   /// StartWritesDone that indicates that there will be no more write ops.
291   /// The number of RemoveHold calls must match the total number of AddHold
292   /// calls plus the number of holds added by AddMultipleHolds.
293   /// The argument to AddMultipleHolds must be positive.
AddHold()294   void AddHold() { AddMultipleHolds(1); }
AddMultipleHolds(int holds)295   void AddMultipleHolds(int holds) {
296     GPR_CODEGEN_DEBUG_ASSERT(holds > 0);
297     stream_->AddHold(holds);
298   }
RemoveHold()299   void RemoveHold() { stream_->RemoveHold(); }
300 
301   /// Notifies the application that all operations associated with this RPC
302   /// have completed and all Holds have been removed. OnDone provides the RPC
303   /// status outcome for both successful and failed RPCs and will be called in
304   /// all cases. If it is not called, it indicates an application-level problem
305   /// (like failure to remove a hold).
306   ///
307   /// \param[in] s The status outcome of this RPC
OnDone(const::grpc::Status &)308   void OnDone(const ::grpc::Status& /*s*/) override {}
309 
310   /// Notifies the application that a read of initial metadata from the
311   /// server is done. If the application chooses not to implement this method,
312   /// it can assume that the initial metadata has been read before the first
313   /// call of OnReadDone or OnDone.
314   ///
315   /// \param[in] ok Was the initial metadata read successfully? If false, no
316   ///               new read/write operation will succeed, and any further
317   ///               Start* operations should not be called.
OnReadInitialMetadataDone(bool)318   virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
319 
320   /// Notifies the application that a StartRead operation completed.
321   ///
322   /// \param[in] ok Was it successful? If false, no new read/write operation
323   ///               will succeed, and any further Start* should not be called.
OnReadDone(bool)324   virtual void OnReadDone(bool /*ok*/) {}
325 
326   /// Notifies the application that a StartWrite or StartWriteLast operation
327   /// completed.
328   ///
329   /// \param[in] ok Was it successful? If false, no new read/write operation
330   ///               will succeed, and any further Start* should not be called.
OnWriteDone(bool)331   virtual void OnWriteDone(bool /*ok*/) {}
332 
333   /// Notifies the application that a StartWritesDone operation completed. Note
334   /// that this is only used on explicit StartWritesDone operations and not for
335   /// those that are implicitly invoked as part of a StartWriteLast.
336   ///
337   /// \param[in] ok Was it successful? If false, the application will later see
338   ///               the failure reflected as a bad status in OnDone and no
339   ///               further Start* should be called.
OnWritesDoneDone(bool)340   virtual void OnWritesDoneDone(bool /*ok*/) {}
341 
342  private:
343   friend class ClientCallbackReaderWriter<Request, Response>;
BindStream(ClientCallbackReaderWriter<Request,Response> * stream)344   void BindStream(ClientCallbackReaderWriter<Request, Response>* stream) {
345     stream_ = stream;
346   }
347   ClientCallbackReaderWriter<Request, Response>* stream_;
348 };
349 
350 /// \a ClientReadReactor is the interface for a server-streaming RPC.
351 /// All public methods behave as in ClientBidiReactor.
352 template <class Response>
353 class ClientReadReactor : public internal::ClientReactor {
354  public:
~ClientReadReactor()355   virtual ~ClientReadReactor() {}
356 
StartCall()357   void StartCall() { reader_->StartCall(); }
StartRead(Response * resp)358   void StartRead(Response* resp) { reader_->Read(resp); }
359 
AddHold()360   void AddHold() { AddMultipleHolds(1); }
AddMultipleHolds(int holds)361   void AddMultipleHolds(int holds) {
362     GPR_CODEGEN_DEBUG_ASSERT(holds > 0);
363     reader_->AddHold(holds);
364   }
RemoveHold()365   void RemoveHold() { reader_->RemoveHold(); }
366 
OnDone(const::grpc::Status &)367   void OnDone(const ::grpc::Status& /*s*/) override {}
OnReadInitialMetadataDone(bool)368   virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
OnReadDone(bool)369   virtual void OnReadDone(bool /*ok*/) {}
370 
371  private:
372   friend class ClientCallbackReader<Response>;
BindReader(ClientCallbackReader<Response> * reader)373   void BindReader(ClientCallbackReader<Response>* reader) { reader_ = reader; }
374   ClientCallbackReader<Response>* reader_;
375 };
376 
377 /// \a ClientWriteReactor is the interface for a client-streaming RPC.
378 /// All public methods behave as in ClientBidiReactor.
379 template <class Request>
380 class ClientWriteReactor : public internal::ClientReactor {
381  public:
~ClientWriteReactor()382   virtual ~ClientWriteReactor() {}
383 
StartCall()384   void StartCall() { writer_->StartCall(); }
StartWrite(const Request * req)385   void StartWrite(const Request* req) {
386     StartWrite(req, ::grpc::WriteOptions());
387   }
StartWrite(const Request * req,::grpc::WriteOptions options)388   void StartWrite(const Request* req, ::grpc::WriteOptions options) {
389     writer_->Write(req, std::move(options));
390   }
StartWriteLast(const Request * req,::grpc::WriteOptions options)391   void StartWriteLast(const Request* req, ::grpc::WriteOptions options) {
392     StartWrite(req, std::move(options.set_last_message()));
393   }
StartWritesDone()394   void StartWritesDone() { writer_->WritesDone(); }
395 
AddHold()396   void AddHold() { AddMultipleHolds(1); }
AddMultipleHolds(int holds)397   void AddMultipleHolds(int holds) {
398     GPR_CODEGEN_DEBUG_ASSERT(holds > 0);
399     writer_->AddHold(holds);
400   }
RemoveHold()401   void RemoveHold() { writer_->RemoveHold(); }
402 
OnDone(const::grpc::Status &)403   void OnDone(const ::grpc::Status& /*s*/) override {}
OnReadInitialMetadataDone(bool)404   virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
OnWriteDone(bool)405   virtual void OnWriteDone(bool /*ok*/) {}
OnWritesDoneDone(bool)406   virtual void OnWritesDoneDone(bool /*ok*/) {}
407 
408  private:
409   friend class ClientCallbackWriter<Request>;
BindWriter(ClientCallbackWriter<Request> * writer)410   void BindWriter(ClientCallbackWriter<Request>* writer) { writer_ = writer; }
411 
412   ClientCallbackWriter<Request>* writer_;
413 };
414 
415 /// \a ClientUnaryReactor is a reactor-style interface for a unary RPC.
416 /// This is _not_ a common way of invoking a unary RPC. In practice, this
417 /// option should be used only if the unary RPC wants to receive initial
418 /// metadata without waiting for the response to complete. Most deployments of
419 /// RPC systems do not use this option, but it is needed for generality.
420 /// All public methods behave as in ClientBidiReactor.
421 /// StartCall is included for consistency with the other reactor flavors: even
422 /// though there are no StartRead or StartWrite operations to queue before the
423 /// call (that is part of the unary call itself) and there is no reactor object
424 /// being created as a result of this call, we keep a consistent 2-phase
425 /// initiation API among all the reactor flavors.
426 class ClientUnaryReactor : public internal::ClientReactor {
427  public:
~ClientUnaryReactor()428   virtual ~ClientUnaryReactor() {}
429 
StartCall()430   void StartCall() { call_->StartCall(); }
OnDone(const::grpc::Status &)431   void OnDone(const ::grpc::Status& /*s*/) override {}
OnReadInitialMetadataDone(bool)432   virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
433 
434  private:
435   friend class ClientCallbackUnary;
BindCall(ClientCallbackUnary * call)436   void BindCall(ClientCallbackUnary* call) { call_ = call; }
437   ClientCallbackUnary* call_;
438 };
439 
440 // Define function out-of-line from class to avoid forward declaration issue
BindReactor(ClientUnaryReactor * reactor)441 inline void ClientCallbackUnary::BindReactor(ClientUnaryReactor* reactor) {
442   reactor->BindCall(this);
443 }
444 
445 namespace internal {
446 
447 // Forward declare factory classes for friendship
448 template <class Request, class Response>
449 class ClientCallbackReaderWriterFactory;
450 template <class Response>
451 class ClientCallbackReaderFactory;
452 template <class Request>
453 class ClientCallbackWriterFactory;
454 
455 template <class Request, class Response>
456 class ClientCallbackReaderWriterImpl
457     : public ClientCallbackReaderWriter<Request, Response> {
458  public:
459   // always allocated against a call arena, no memory free required
delete(void *,std::size_t size)460   static void operator delete(void* /*ptr*/, std::size_t size) {
461     GPR_CODEGEN_ASSERT(size == sizeof(ClientCallbackReaderWriterImpl));
462   }
463 
464   // This operator should never be called as the memory should be freed as part
465   // of the arena destruction. It only exists to provide a matching operator
466   // delete to the operator new so that some compilers will not complain (see
467   // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
468   // there are no tests catching the compiler warning.
delete(void *,void *)469   static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
470 
StartCall()471   void StartCall() override {
472     // This call initiates two batches, plus any backlog, each with a callback
473     // 1. Send initial metadata (unless corked) + recv initial metadata
474     // 2. Any read backlog
475     // 3. Any write backlog
476     // 4. Recv trailing metadata (unless corked)
477     if (!start_corked_) {
478       start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
479                                      context_->initial_metadata_flags());
480     }
481 
482     call_.PerformOps(&start_ops_);
483 
484     {
485       grpc::internal::MutexLock lock(&start_mu_);
486 
487       if (backlog_.read_ops) {
488         call_.PerformOps(&read_ops_);
489       }
490       if (backlog_.write_ops) {
491         call_.PerformOps(&write_ops_);
492       }
493       if (backlog_.writes_done_ops) {
494         call_.PerformOps(&writes_done_ops_);
495       }
496       call_.PerformOps(&finish_ops_);
497       // The last thing in this critical section is to set started_ so that it
498       // can be used lock-free as well.
499       started_.store(true, std::memory_order_release);
500     }
501     // MaybeFinish outside the lock to make sure that destruction of this object
502     // doesn't take place while holding the lock (which would cause the lock to
503     // be released after destruction)
504     this->MaybeFinish(/*from_reaction=*/false);
505   }
506 
Read(Response * msg)507   void Read(Response* msg) override {
508     read_ops_.RecvMessage(msg);
509     callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
510     if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
511       grpc::internal::MutexLock lock(&start_mu_);
512       if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
513         backlog_.read_ops = true;
514         return;
515       }
516     }
517     call_.PerformOps(&read_ops_);
518   }
519 
Write(const Request * msg,::grpc::WriteOptions options)520   void Write(const Request* msg, ::grpc::WriteOptions options) override {
521     if (options.is_last_message()) {
522       options.set_buffer_hint();
523       write_ops_.ClientSendClose();
524     }
525     // TODO(vjpai): don't assert
526     GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(msg, options).ok());
527     callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
528     if (GPR_UNLIKELY(corked_write_needed_)) {
529       write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
530                                      context_->initial_metadata_flags());
531       corked_write_needed_ = false;
532     }
533 
534     if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
535       grpc::internal::MutexLock lock(&start_mu_);
536       if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
537         backlog_.write_ops = true;
538         return;
539       }
540     }
541     call_.PerformOps(&write_ops_);
542   }
WritesDone()543   void WritesDone() override {
544     writes_done_ops_.ClientSendClose();
545     writes_done_tag_.Set(call_.call(),
546                          [this](bool ok) {
547                            reactor_->OnWritesDoneDone(ok);
548                            MaybeFinish(/*from_reaction=*/true);
549                          },
550                          &writes_done_ops_, /*can_inline=*/false);
551     writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
552     callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
553     if (GPR_UNLIKELY(corked_write_needed_)) {
554       writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
555                                            context_->initial_metadata_flags());
556       corked_write_needed_ = false;
557     }
558     if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
559       grpc::internal::MutexLock lock(&start_mu_);
560       if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
561         backlog_.writes_done_ops = true;
562         return;
563       }
564     }
565     call_.PerformOps(&writes_done_ops_);
566   }
567 
AddHold(int holds)568   void AddHold(int holds) override {
569     callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
570   }
RemoveHold()571   void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
572 
573  private:
574   friend class ClientCallbackReaderWriterFactory<Request, Response>;
575 
ClientCallbackReaderWriterImpl(grpc::internal::Call call,::grpc_impl::ClientContext * context,ClientBidiReactor<Request,Response> * reactor)576   ClientCallbackReaderWriterImpl(grpc::internal::Call call,
577                                  ::grpc_impl::ClientContext* context,
578                                  ClientBidiReactor<Request, Response>* reactor)
579       : context_(context),
580         call_(call),
581         reactor_(reactor),
582         start_corked_(context_->initial_metadata_corked_),
583         corked_write_needed_(start_corked_) {
584     this->BindReactor(reactor);
585 
586     // Set up the unchanging parts of the start, read, and write tags and ops.
587     start_tag_.Set(call_.call(),
588                    [this](bool ok) {
589                      reactor_->OnReadInitialMetadataDone(ok);
590                      MaybeFinish(/*from_reaction=*/true);
591                    },
592                    &start_ops_, /*can_inline=*/false);
593     start_ops_.RecvInitialMetadata(context_);
594     start_ops_.set_core_cq_tag(&start_tag_);
595 
596     write_tag_.Set(call_.call(),
597                    [this](bool ok) {
598                      reactor_->OnWriteDone(ok);
599                      MaybeFinish(/*from_reaction=*/true);
600                    },
601                    &write_ops_, /*can_inline=*/false);
602     write_ops_.set_core_cq_tag(&write_tag_);
603 
604     read_tag_.Set(call_.call(),
605                   [this](bool ok) {
606                     reactor_->OnReadDone(ok);
607                     MaybeFinish(/*from_reaction=*/true);
608                   },
609                   &read_ops_, /*can_inline=*/false);
610     read_ops_.set_core_cq_tag(&read_tag_);
611 
612     // Also set up the Finish tag and op set.
613     finish_tag_.Set(
614         call_.call(),
615         [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
616         &finish_ops_,
617         /*can_inline=*/false);
618     finish_ops_.ClientRecvStatus(context_, &finish_status_);
619     finish_ops_.set_core_cq_tag(&finish_tag_);
620   }
621 
622   // MaybeFinish can be called from reactions or from user-initiated operations
623   // like StartCall or RemoveHold. If this is the last operation or hold on this
624   // object, it will invoke the OnDone reaction. If MaybeFinish was called from
625   // a reaction, it can call OnDone directly. If not, it would need to schedule
626   // OnDone onto an executor thread to avoid the possibility of deadlocking with
627   // any locks in the user code that invoked it.
MaybeFinish(bool from_reaction)628   void MaybeFinish(bool from_reaction) {
629     if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
630                          1, std::memory_order_acq_rel) == 1)) {
631       ::grpc::Status s = std::move(finish_status_);
632       auto* reactor = reactor_;
633       auto* call = call_.call();
634       this->~ClientCallbackReaderWriterImpl();
635       ::grpc::g_core_codegen_interface->grpc_call_unref(call);
636       if (GPR_LIKELY(from_reaction)) {
637         reactor->OnDone(s);
638       } else {
639         reactor->InternalScheduleOnDone(std::move(s));
640       }
641     }
642   }
643 
644   ::grpc_impl::ClientContext* const context_;
645   grpc::internal::Call call_;
646   ClientBidiReactor<Request, Response>* const reactor_;
647 
648   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
649                             grpc::internal::CallOpRecvInitialMetadata>
650       start_ops_;
651   grpc::internal::CallbackWithSuccessTag start_tag_;
652   const bool start_corked_;
653   bool corked_write_needed_;  // no lock needed since only accessed in
654                               // Write/WritesDone which cannot be concurrent
655 
656   grpc::internal::CallOpSet<grpc::internal::CallOpClientRecvStatus> finish_ops_;
657   grpc::internal::CallbackWithSuccessTag finish_tag_;
658   ::grpc::Status finish_status_;
659 
660   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
661                             grpc::internal::CallOpSendMessage,
662                             grpc::internal::CallOpClientSendClose>
663       write_ops_;
664   grpc::internal::CallbackWithSuccessTag write_tag_;
665 
666   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
667                             grpc::internal::CallOpClientSendClose>
668       writes_done_ops_;
669   grpc::internal::CallbackWithSuccessTag writes_done_tag_;
670 
671   grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<Response>>
672       read_ops_;
673   grpc::internal::CallbackWithSuccessTag read_tag_;
674 
675   struct StartCallBacklog {
676     bool write_ops = false;
677     bool writes_done_ops = false;
678     bool read_ops = false;
679   };
680   StartCallBacklog backlog_ /* GUARDED_BY(start_mu_) */;
681 
682   // Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish
683   std::atomic<intptr_t> callbacks_outstanding_{3};
684   std::atomic_bool started_{false};
685   grpc::internal::Mutex start_mu_;
686 };
687 
688 template <class Request, class Response>
689 class ClientCallbackReaderWriterFactory {
690  public:
Create(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc_impl::ClientContext * context,ClientBidiReactor<Request,Response> * reactor)691   static void Create(::grpc::ChannelInterface* channel,
692                      const ::grpc::internal::RpcMethod& method,
693                      ::grpc_impl::ClientContext* context,
694                      ClientBidiReactor<Request, Response>* reactor) {
695     grpc::internal::Call call =
696         channel->CreateCall(method, context, channel->CallbackCQ());
697 
698     ::grpc::g_core_codegen_interface->grpc_call_ref(call.call());
699     new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
700         call.call(), sizeof(ClientCallbackReaderWriterImpl<Request, Response>)))
701         ClientCallbackReaderWriterImpl<Request, Response>(call, context,
702                                                           reactor);
703   }
704 };
705 
706 template <class Response>
707 class ClientCallbackReaderImpl : public ClientCallbackReader<Response> {
708  public:
709   // always allocated against a call arena, no memory free required
delete(void *,std::size_t size)710   static void operator delete(void* /*ptr*/, std::size_t size) {
711     GPR_CODEGEN_ASSERT(size == sizeof(ClientCallbackReaderImpl));
712   }
713 
714   // This operator should never be called as the memory should be freed as part
715   // of the arena destruction. It only exists to provide a matching operator
716   // delete to the operator new so that some compilers will not complain (see
717   // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
718   // there are no tests catching the compiler warning.
delete(void *,void *)719   static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
720 
StartCall()721   void StartCall() override {
722     // This call initiates two batches, plus any backlog, each with a callback
723     // 1. Send initial metadata (unless corked) + recv initial metadata
724     // 2. Any backlog
725     // 3. Recv trailing metadata
726 
727     start_tag_.Set(call_.call(),
728                    [this](bool ok) {
729                      reactor_->OnReadInitialMetadataDone(ok);
730                      MaybeFinish(/*from_reaction=*/true);
731                    },
732                    &start_ops_, /*can_inline=*/false);
733     start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
734                                    context_->initial_metadata_flags());
735     start_ops_.RecvInitialMetadata(context_);
736     start_ops_.set_core_cq_tag(&start_tag_);
737     call_.PerformOps(&start_ops_);
738 
739     // Also set up the read tag so it doesn't have to be set up each time
740     read_tag_.Set(call_.call(),
741                   [this](bool ok) {
742                     reactor_->OnReadDone(ok);
743                     MaybeFinish(/*from_reaction=*/true);
744                   },
745                   &read_ops_, /*can_inline=*/false);
746     read_ops_.set_core_cq_tag(&read_tag_);
747 
748     {
749       grpc::internal::MutexLock lock(&start_mu_);
750       if (backlog_.read_ops) {
751         call_.PerformOps(&read_ops_);
752       }
753       started_.store(true, std::memory_order_release);
754     }
755 
756     finish_tag_.Set(
757         call_.call(),
758         [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
759         &finish_ops_, /*can_inline=*/false);
760     finish_ops_.ClientRecvStatus(context_, &finish_status_);
761     finish_ops_.set_core_cq_tag(&finish_tag_);
762     call_.PerformOps(&finish_ops_);
763   }
764 
Read(Response * msg)765   void Read(Response* msg) override {
766     read_ops_.RecvMessage(msg);
767     callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
768     if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
769       grpc::internal::MutexLock lock(&start_mu_);
770       if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
771         backlog_.read_ops = true;
772         return;
773       }
774     }
775     call_.PerformOps(&read_ops_);
776   }
777 
AddHold(int holds)778   void AddHold(int holds) override {
779     callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
780   }
RemoveHold()781   void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
782 
783  private:
784   friend class ClientCallbackReaderFactory<Response>;
785 
786   template <class Request>
ClientCallbackReaderImpl(::grpc::internal::Call call,::grpc_impl::ClientContext * context,Request * request,ClientReadReactor<Response> * reactor)787   ClientCallbackReaderImpl(::grpc::internal::Call call,
788                            ::grpc_impl::ClientContext* context,
789                            Request* request,
790                            ClientReadReactor<Response>* reactor)
791       : context_(context), call_(call), reactor_(reactor) {
792     this->BindReactor(reactor);
793     // TODO(vjpai): don't assert
794     GPR_CODEGEN_ASSERT(start_ops_.SendMessagePtr(request).ok());
795     start_ops_.ClientSendClose();
796   }
797 
798   // MaybeFinish behaves as in ClientCallbackReaderWriterImpl.
MaybeFinish(bool from_reaction)799   void MaybeFinish(bool from_reaction) {
800     if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
801                          1, std::memory_order_acq_rel) == 1)) {
802       ::grpc::Status s = std::move(finish_status_);
803       auto* reactor = reactor_;
804       auto* call = call_.call();
805       this->~ClientCallbackReaderImpl();
806       ::grpc::g_core_codegen_interface->grpc_call_unref(call);
807       if (GPR_LIKELY(from_reaction)) {
808         reactor->OnDone(s);
809       } else {
810         reactor->InternalScheduleOnDone(std::move(s));
811       }
812     }
813   }
814 
815   ::grpc_impl::ClientContext* const context_;
816   grpc::internal::Call call_;
817   ClientReadReactor<Response>* const reactor_;
818 
819   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
820                             grpc::internal::CallOpSendMessage,
821                             grpc::internal::CallOpClientSendClose,
822                             grpc::internal::CallOpRecvInitialMetadata>
823       start_ops_;
824   grpc::internal::CallbackWithSuccessTag start_tag_;
825 
826   grpc::internal::CallOpSet<grpc::internal::CallOpClientRecvStatus> finish_ops_;
827   grpc::internal::CallbackWithSuccessTag finish_tag_;
828   ::grpc::Status finish_status_;
829 
830   grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<Response>>
831       read_ops_;
832   grpc::internal::CallbackWithSuccessTag read_tag_;
833 
834   struct StartCallBacklog {
835     bool read_ops = false;
836   };
837   StartCallBacklog backlog_ /* GUARDED_BY(start_mu_) */;
838 
839   // Minimum of 2 callbacks to pre-register for start and finish
840   std::atomic<intptr_t> callbacks_outstanding_{2};
841   std::atomic_bool started_{false};
842   grpc::internal::Mutex start_mu_;
843 };
844 
845 template <class Response>
846 class ClientCallbackReaderFactory {
847  public:
848   template <class Request>
Create(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc_impl::ClientContext * context,const Request * request,ClientReadReactor<Response> * reactor)849   static void Create(::grpc::ChannelInterface* channel,
850                      const ::grpc::internal::RpcMethod& method,
851                      ::grpc_impl::ClientContext* context,
852                      const Request* request,
853                      ClientReadReactor<Response>* reactor) {
854     grpc::internal::Call call =
855         channel->CreateCall(method, context, channel->CallbackCQ());
856 
857     ::grpc::g_core_codegen_interface->grpc_call_ref(call.call());
858     new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
859         call.call(), sizeof(ClientCallbackReaderImpl<Response>)))
860         ClientCallbackReaderImpl<Response>(call, context, request, reactor);
861   }
862 };
863 
864 template <class Request>
865 class ClientCallbackWriterImpl : public ClientCallbackWriter<Request> {
866  public:
867   // always allocated against a call arena, no memory free required
delete(void *,std::size_t size)868   static void operator delete(void* /*ptr*/, std::size_t size) {
869     GPR_CODEGEN_ASSERT(size == sizeof(ClientCallbackWriterImpl));
870   }
871 
872   // This operator should never be called as the memory should be freed as part
873   // of the arena destruction. It only exists to provide a matching operator
874   // delete to the operator new so that some compilers will not complain (see
875   // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
876   // there are no tests catching the compiler warning.
delete(void *,void *)877   static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
878 
StartCall()879   void StartCall() override {
880     // This call initiates two batches, plus any backlog, each with a callback
881     // 1. Send initial metadata (unless corked) + recv initial metadata
882     // 2. Any backlog
883     // 3. Recv trailing metadata
884 
885     if (!start_corked_) {
886       start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
887                                      context_->initial_metadata_flags());
888     }
889     call_.PerformOps(&start_ops_);
890 
891     {
892       grpc::internal::MutexLock lock(&start_mu_);
893 
894       if (backlog_.write_ops) {
895         call_.PerformOps(&write_ops_);
896       }
897       if (backlog_.writes_done_ops) {
898         call_.PerformOps(&writes_done_ops_);
899       }
900       call_.PerformOps(&finish_ops_);
901       // The last thing in this critical section is to set started_ so that it
902       // can be used lock-free as well.
903       started_.store(true, std::memory_order_release);
904     }
905     // MaybeFinish outside the lock to make sure that destruction of this object
906     // doesn't take place while holding the lock (which would cause the lock to
907     // be released after destruction)
908     this->MaybeFinish(/*from_reaction=*/false);
909   }
910 
Write(const Request * msg,::grpc::WriteOptions options)911   void Write(const Request* msg, ::grpc::WriteOptions options) override {
912     if (GPR_UNLIKELY(options.is_last_message())) {
913       options.set_buffer_hint();
914       write_ops_.ClientSendClose();
915     }
916     // TODO(vjpai): don't assert
917     GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(msg, options).ok());
918     callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
919 
920     if (GPR_UNLIKELY(corked_write_needed_)) {
921       write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
922                                      context_->initial_metadata_flags());
923       corked_write_needed_ = false;
924     }
925 
926     if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
927       grpc::internal::MutexLock lock(&start_mu_);
928       if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
929         backlog_.write_ops = true;
930         return;
931       }
932     }
933     call_.PerformOps(&write_ops_);
934   }
935 
WritesDone()936   void WritesDone() override {
937     writes_done_ops_.ClientSendClose();
938     writes_done_tag_.Set(call_.call(),
939                          [this](bool ok) {
940                            reactor_->OnWritesDoneDone(ok);
941                            MaybeFinish(/*from_reaction=*/true);
942                          },
943                          &writes_done_ops_, /*can_inline=*/false);
944     writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
945     callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
946 
947     if (GPR_UNLIKELY(corked_write_needed_)) {
948       writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
949                                            context_->initial_metadata_flags());
950       corked_write_needed_ = false;
951     }
952 
953     if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
954       grpc::internal::MutexLock lock(&start_mu_);
955       if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
956         backlog_.writes_done_ops = true;
957         return;
958       }
959     }
960     call_.PerformOps(&writes_done_ops_);
961   }
962 
AddHold(int holds)963   void AddHold(int holds) override {
964     callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
965   }
RemoveHold()966   void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
967 
968  private:
969   friend class ClientCallbackWriterFactory<Request>;
970 
971   template <class Response>
ClientCallbackWriterImpl(::grpc::internal::Call call,::grpc_impl::ClientContext * context,Response * response,ClientWriteReactor<Request> * reactor)972   ClientCallbackWriterImpl(::grpc::internal::Call call,
973                            ::grpc_impl::ClientContext* context,
974                            Response* response,
975                            ClientWriteReactor<Request>* reactor)
976       : context_(context),
977         call_(call),
978         reactor_(reactor),
979         start_corked_(context_->initial_metadata_corked_),
980         corked_write_needed_(start_corked_) {
981     this->BindReactor(reactor);
982 
983     // Set up the unchanging parts of the start and write tags and ops.
984     start_tag_.Set(call_.call(),
985                    [this](bool ok) {
986                      reactor_->OnReadInitialMetadataDone(ok);
987                      MaybeFinish(/*from_reaction=*/true);
988                    },
989                    &start_ops_, /*can_inline=*/false);
990     start_ops_.RecvInitialMetadata(context_);
991     start_ops_.set_core_cq_tag(&start_tag_);
992 
993     write_tag_.Set(call_.call(),
994                    [this](bool ok) {
995                      reactor_->OnWriteDone(ok);
996                      MaybeFinish(/*from_reaction=*/true);
997                    },
998                    &write_ops_, /*can_inline=*/false);
999     write_ops_.set_core_cq_tag(&write_tag_);
1000 
1001     // Also set up the Finish tag and op set.
1002     finish_ops_.RecvMessage(response);
1003     finish_ops_.AllowNoMessage();
1004     finish_tag_.Set(
1005         call_.call(),
1006         [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
1007         &finish_ops_,
1008         /*can_inline=*/false);
1009     finish_ops_.ClientRecvStatus(context_, &finish_status_);
1010     finish_ops_.set_core_cq_tag(&finish_tag_);
1011   }
1012 
1013   // MaybeFinish behaves as in ClientCallbackReaderWriterImpl.
MaybeFinish(bool from_reaction)1014   void MaybeFinish(bool from_reaction) {
1015     if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
1016                          1, std::memory_order_acq_rel) == 1)) {
1017       ::grpc::Status s = std::move(finish_status_);
1018       auto* reactor = reactor_;
1019       auto* call = call_.call();
1020       this->~ClientCallbackWriterImpl();
1021       ::grpc::g_core_codegen_interface->grpc_call_unref(call);
1022       if (GPR_LIKELY(from_reaction)) {
1023         reactor->OnDone(s);
1024       } else {
1025         reactor->InternalScheduleOnDone(std::move(s));
1026       }
1027     }
1028   }
1029 
1030   ::grpc_impl::ClientContext* const context_;
1031   grpc::internal::Call call_;
1032   ClientWriteReactor<Request>* const reactor_;
1033 
1034   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
1035                             grpc::internal::CallOpRecvInitialMetadata>
1036       start_ops_;
1037   grpc::internal::CallbackWithSuccessTag start_tag_;
1038   const bool start_corked_;
1039   bool corked_write_needed_;  // no lock needed since only accessed in
1040                               // Write/WritesDone which cannot be concurrent
1041 
1042   grpc::internal::CallOpSet<grpc::internal::CallOpGenericRecvMessage,
1043                             grpc::internal::CallOpClientRecvStatus>
1044       finish_ops_;
1045   grpc::internal::CallbackWithSuccessTag finish_tag_;
1046   ::grpc::Status finish_status_;
1047 
1048   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
1049                             grpc::internal::CallOpSendMessage,
1050                             grpc::internal::CallOpClientSendClose>
1051       write_ops_;
1052   grpc::internal::CallbackWithSuccessTag write_tag_;
1053 
1054   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
1055                             grpc::internal::CallOpClientSendClose>
1056       writes_done_ops_;
1057   grpc::internal::CallbackWithSuccessTag writes_done_tag_;
1058 
1059   struct StartCallBacklog {
1060     bool write_ops = false;
1061     bool writes_done_ops = false;
1062   };
1063   StartCallBacklog backlog_ /* GUARDED_BY(start_mu_) */;
1064 
1065   // Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish
1066   std::atomic<intptr_t> callbacks_outstanding_{3};
1067   std::atomic_bool started_{false};
1068   grpc::internal::Mutex start_mu_;
1069 };
1070 
1071 template <class Request>
1072 class ClientCallbackWriterFactory {
1073  public:
1074   template <class Response>
Create(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc_impl::ClientContext * context,Response * response,ClientWriteReactor<Request> * reactor)1075   static void Create(::grpc::ChannelInterface* channel,
1076                      const ::grpc::internal::RpcMethod& method,
1077                      ::grpc_impl::ClientContext* context, Response* response,
1078                      ClientWriteReactor<Request>* reactor) {
1079     grpc::internal::Call call =
1080         channel->CreateCall(method, context, channel->CallbackCQ());
1081 
1082     ::grpc::g_core_codegen_interface->grpc_call_ref(call.call());
1083     new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
1084         call.call(), sizeof(ClientCallbackWriterImpl<Request>)))
1085         ClientCallbackWriterImpl<Request>(call, context, response, reactor);
1086   }
1087 };
1088 
1089 class ClientCallbackUnaryImpl final : public ClientCallbackUnary {
1090  public:
1091   // always allocated against a call arena, no memory free required
delete(void *,std::size_t size)1092   static void operator delete(void* /*ptr*/, std::size_t size) {
1093     GPR_CODEGEN_ASSERT(size == sizeof(ClientCallbackUnaryImpl));
1094   }
1095 
1096   // This operator should never be called as the memory should be freed as part
1097   // of the arena destruction. It only exists to provide a matching operator
1098   // delete to the operator new so that some compilers will not complain (see
1099   // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
1100   // there are no tests catching the compiler warning.
delete(void *,void *)1101   static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
1102 
StartCall()1103   void StartCall() override {
1104     // This call initiates two batches, each with a callback
1105     // 1. Send initial metadata + write + writes done + recv initial metadata
1106     // 2. Read message, recv trailing metadata
1107 
1108     start_tag_.Set(call_.call(),
1109                    [this](bool ok) {
1110                      reactor_->OnReadInitialMetadataDone(ok);
1111                      MaybeFinish();
1112                    },
1113                    &start_ops_, /*can_inline=*/false);
1114     start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
1115                                    context_->initial_metadata_flags());
1116     start_ops_.RecvInitialMetadata(context_);
1117     start_ops_.set_core_cq_tag(&start_tag_);
1118     call_.PerformOps(&start_ops_);
1119 
1120     finish_tag_.Set(call_.call(), [this](bool /*ok*/) { MaybeFinish(); },
1121                     &finish_ops_,
1122                     /*can_inline=*/false);
1123     finish_ops_.ClientRecvStatus(context_, &finish_status_);
1124     finish_ops_.set_core_cq_tag(&finish_tag_);
1125     call_.PerformOps(&finish_ops_);
1126   }
1127 
1128  private:
1129   friend class ClientCallbackUnaryFactory;
1130 
1131   template <class Request, class Response>
ClientCallbackUnaryImpl(::grpc::internal::Call call,::grpc_impl::ClientContext * context,Request * request,Response * response,ClientUnaryReactor * reactor)1132   ClientCallbackUnaryImpl(::grpc::internal::Call call,
1133                           ::grpc_impl::ClientContext* context, Request* request,
1134                           Response* response, ClientUnaryReactor* reactor)
1135       : context_(context), call_(call), reactor_(reactor) {
1136     this->BindReactor(reactor);
1137     // TODO(vjpai): don't assert
1138     GPR_CODEGEN_ASSERT(start_ops_.SendMessagePtr(request).ok());
1139     start_ops_.ClientSendClose();
1140     finish_ops_.RecvMessage(response);
1141     finish_ops_.AllowNoMessage();
1142   }
1143 
1144   // In the unary case, MaybeFinish is only ever invoked from a
1145   // library-initiated reaction, so it will just directly call OnDone if this is
1146   // the last reaction for this RPC.
MaybeFinish()1147   void MaybeFinish() {
1148     if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
1149                          1, std::memory_order_acq_rel) == 1)) {
1150       ::grpc::Status s = std::move(finish_status_);
1151       auto* reactor = reactor_;
1152       auto* call = call_.call();
1153       this->~ClientCallbackUnaryImpl();
1154       ::grpc::g_core_codegen_interface->grpc_call_unref(call);
1155       reactor->OnDone(s);
1156     }
1157   }
1158 
1159   ::grpc_impl::ClientContext* const context_;
1160   grpc::internal::Call call_;
1161   ClientUnaryReactor* const reactor_;
1162 
1163   grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
1164                             grpc::internal::CallOpSendMessage,
1165                             grpc::internal::CallOpClientSendClose,
1166                             grpc::internal::CallOpRecvInitialMetadata>
1167       start_ops_;
1168   grpc::internal::CallbackWithSuccessTag start_tag_;
1169 
1170   grpc::internal::CallOpSet<grpc::internal::CallOpGenericRecvMessage,
1171                             grpc::internal::CallOpClientRecvStatus>
1172       finish_ops_;
1173   grpc::internal::CallbackWithSuccessTag finish_tag_;
1174   ::grpc::Status finish_status_;
1175 
1176   // This call will have 2 callbacks: start and finish
1177   std::atomic<intptr_t> callbacks_outstanding_{2};
1178 };
1179 
1180 class ClientCallbackUnaryFactory {
1181  public:
1182   template <class Request, class Response>
Create(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc_impl::ClientContext * context,const Request * request,Response * response,ClientUnaryReactor * reactor)1183   static void Create(::grpc::ChannelInterface* channel,
1184                      const ::grpc::internal::RpcMethod& method,
1185                      ::grpc_impl::ClientContext* context,
1186                      const Request* request, Response* response,
1187                      ClientUnaryReactor* reactor) {
1188     grpc::internal::Call call =
1189         channel->CreateCall(method, context, channel->CallbackCQ());
1190 
1191     ::grpc::g_core_codegen_interface->grpc_call_ref(call.call());
1192 
1193     new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
1194         call.call(), sizeof(ClientCallbackUnaryImpl)))
1195         ClientCallbackUnaryImpl(call, context, request, response, reactor);
1196   }
1197 };
1198 
1199 }  // namespace internal
1200 }  // namespace grpc_impl
1201 #endif  // GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_IMPL_H
1202