1 /*
2 *
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 #ifndef GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
19 #define GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
20 #include <atomic>
21 #include <functional>
22
23 #include <grpcpp/impl/codegen/call.h>
24 #include <grpcpp/impl/codegen/call_op_set.h>
25 #include <grpcpp/impl/codegen/callback_common.h>
26 #include <grpcpp/impl/codegen/channel_interface.h>
27 #include <grpcpp/impl/codegen/config.h>
28 #include <grpcpp/impl/codegen/core_codegen_interface.h>
29 #include <grpcpp/impl/codegen/status.h>
30
31 namespace grpc {
32 class Channel;
33 class ClientContext;
34
35 namespace internal {
36 class RpcMethod;
37
38 /// Perform a callback-based unary call. May optionally specify the base
39 /// class of the Request and Response so that the internal calls and structures
40 /// below this may be based on those base classes and thus achieve code reuse
41 /// across different RPCs (e.g., for protobuf, MessageLite would be a base
42 /// class).
43 /// TODO(vjpai): Combine as much as possible with the blocking unary call code
44 template <class InputMessage, class OutputMessage,
45 class BaseInputMessage = InputMessage,
46 class BaseOutputMessage = OutputMessage>
CallbackUnaryCall(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc::ClientContext * context,const InputMessage * request,OutputMessage * result,std::function<void (::grpc::Status)> on_completion)47 void CallbackUnaryCall(::grpc::ChannelInterface* channel,
48 const ::grpc::internal::RpcMethod& method,
49 ::grpc::ClientContext* context,
50 const InputMessage* request, OutputMessage* result,
51 std::function<void(::grpc::Status)> on_completion) {
52 static_assert(std::is_base_of<BaseInputMessage, InputMessage>::value,
53 "Invalid input message specification");
54 static_assert(std::is_base_of<BaseOutputMessage, OutputMessage>::value,
55 "Invalid output message specification");
56 CallbackUnaryCallImpl<BaseInputMessage, BaseOutputMessage> x(
57 channel, method, context, request, result, on_completion);
58 }
59
60 template <class InputMessage, class OutputMessage>
61 class CallbackUnaryCallImpl {
62 public:
CallbackUnaryCallImpl(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc::ClientContext * context,const InputMessage * request,OutputMessage * result,std::function<void (::grpc::Status)> on_completion)63 CallbackUnaryCallImpl(::grpc::ChannelInterface* channel,
64 const ::grpc::internal::RpcMethod& method,
65 ::grpc::ClientContext* context,
66 const InputMessage* request, OutputMessage* result,
67 std::function<void(::grpc::Status)> on_completion) {
68 ::grpc::CompletionQueue* cq = channel->CallbackCQ();
69 GPR_CODEGEN_ASSERT(cq != nullptr);
70 grpc::internal::Call call(channel->CreateCall(method, context, cq));
71
72 using FullCallOpSet = grpc::internal::CallOpSet<
73 ::grpc::internal::CallOpSendInitialMetadata,
74 grpc::internal::CallOpSendMessage,
75 grpc::internal::CallOpRecvInitialMetadata,
76 grpc::internal::CallOpRecvMessage<OutputMessage>,
77 grpc::internal::CallOpClientSendClose,
78 grpc::internal::CallOpClientRecvStatus>;
79
80 struct OpSetAndTag {
81 FullCallOpSet opset;
82 grpc::internal::CallbackWithStatusTag tag;
83 };
84 const size_t alloc_sz = sizeof(OpSetAndTag);
85 auto* const alloced = static_cast<OpSetAndTag*>(
86 ::grpc::g_core_codegen_interface->grpc_call_arena_alloc(call.call(),
87 alloc_sz));
88 auto* ops = new (&alloced->opset) FullCallOpSet;
89 auto* tag = new (&alloced->tag)
90 grpc::internal::CallbackWithStatusTag(call.call(), on_completion, ops);
91
92 // TODO(vjpai): Unify code with sync API as much as possible
93 ::grpc::Status s = ops->SendMessagePtr(request);
94 if (!s.ok()) {
95 tag->force_run(s);
96 return;
97 }
98 ops->SendInitialMetadata(&context->send_initial_metadata_,
99 context->initial_metadata_flags());
100 ops->RecvInitialMetadata(context);
101 ops->RecvMessage(result);
102 ops->AllowNoMessage();
103 ops->ClientSendClose();
104 ops->ClientRecvStatus(context, tag->status_ptr());
105 ops->set_core_cq_tag(tag);
106 call.PerformOps(ops);
107 }
108 };
109
110 // Base class for public API classes.
111 class ClientReactor {
112 public:
113 /// Called by the library when all operations associated with this RPC have
114 /// completed and all Holds have been removed. OnDone provides the RPC status
115 /// outcome for both successful and failed RPCs. If it is never called on an
116 /// RPC, it indicates an application-level problem (like failure to remove a
117 /// hold).
118 ///
119 /// \param[in] s The status outcome of this RPC
120 virtual void OnDone(const ::grpc::Status& /*s*/) = 0;
121
122 /// InternalScheduleOnDone is not part of the API and is not meant to be
123 /// overridden. It is virtual to allow successful builds for certain bazel
124 /// build users that only want to depend on gRPC codegen headers and not the
125 /// full library (although this is not a generally-supported option). Although
126 /// the virtual call is slower than a direct call, this function is
127 /// heavyweight and the cost of the virtual call is not much in comparison.
128 /// This function may be removed or devirtualized in the future.
129 virtual void InternalScheduleOnDone(::grpc::Status s);
130 };
131
132 } // namespace internal
133
134 // Forward declarations
135 template <class Request, class Response>
136 class ClientBidiReactor;
137 template <class Response>
138 class ClientReadReactor;
139 template <class Request>
140 class ClientWriteReactor;
141 class ClientUnaryReactor;
142
143 // NOTE: The streaming objects are not actually implemented in the public API.
144 // These interfaces are provided for mocking only. Typical applications
145 // will interact exclusively with the reactors that they define.
146 template <class Request, class Response>
147 class ClientCallbackReaderWriter {
148 public:
~ClientCallbackReaderWriter()149 virtual ~ClientCallbackReaderWriter() {}
150 virtual void StartCall() = 0;
151 virtual void Write(const Request* req, ::grpc::WriteOptions options) = 0;
152 virtual void WritesDone() = 0;
153 virtual void Read(Response* resp) = 0;
154 virtual void AddHold(int holds) = 0;
155 virtual void RemoveHold() = 0;
156
157 protected:
BindReactor(ClientBidiReactor<Request,Response> * reactor)158 void BindReactor(ClientBidiReactor<Request, Response>* reactor) {
159 reactor->BindStream(this);
160 }
161 };
162
163 template <class Response>
164 class ClientCallbackReader {
165 public:
~ClientCallbackReader()166 virtual ~ClientCallbackReader() {}
167 virtual void StartCall() = 0;
168 virtual void Read(Response* resp) = 0;
169 virtual void AddHold(int holds) = 0;
170 virtual void RemoveHold() = 0;
171
172 protected:
BindReactor(ClientReadReactor<Response> * reactor)173 void BindReactor(ClientReadReactor<Response>* reactor) {
174 reactor->BindReader(this);
175 }
176 };
177
178 template <class Request>
179 class ClientCallbackWriter {
180 public:
~ClientCallbackWriter()181 virtual ~ClientCallbackWriter() {}
182 virtual void StartCall() = 0;
Write(const Request * req)183 void Write(const Request* req) { Write(req, ::grpc::WriteOptions()); }
184 virtual void Write(const Request* req, ::grpc::WriteOptions options) = 0;
WriteLast(const Request * req,::grpc::WriteOptions options)185 void WriteLast(const Request* req, ::grpc::WriteOptions options) {
186 Write(req, options.set_last_message());
187 }
188 virtual void WritesDone() = 0;
189
190 virtual void AddHold(int holds) = 0;
191 virtual void RemoveHold() = 0;
192
193 protected:
BindReactor(ClientWriteReactor<Request> * reactor)194 void BindReactor(ClientWriteReactor<Request>* reactor) {
195 reactor->BindWriter(this);
196 }
197 };
198
199 class ClientCallbackUnary {
200 public:
~ClientCallbackUnary()201 virtual ~ClientCallbackUnary() {}
202 virtual void StartCall() = 0;
203
204 protected:
205 void BindReactor(ClientUnaryReactor* reactor);
206 };
207
208 // The following classes are the reactor interfaces that are to be implemented
209 // by the user. They are passed in to the library as an argument to a call on a
210 // stub (either a codegen-ed call or a generic call). The streaming RPC is
211 // activated by calling StartCall, possibly after initiating StartRead,
212 // StartWrite, or AddHold operations on the streaming object. Note that none of
213 // the classes are pure; all reactions have a default empty reaction so that the
214 // user class only needs to override those classes that it cares about.
215 // The reactor must be passed to the stub invocation before any of the below
216 // operations can be called.
217
218 /// \a ClientBidiReactor is the interface for a bidirectional streaming RPC.
219 template <class Request, class Response>
220 class ClientBidiReactor : public internal::ClientReactor {
221 public:
~ClientBidiReactor()222 virtual ~ClientBidiReactor() {}
223
224 /// Activate the RPC and initiate any reads or writes that have been Start'ed
225 /// before this call. All streaming RPCs issued by the client MUST have
226 /// StartCall invoked on them (even if they are canceled) as this call is the
227 /// activation of their lifecycle.
StartCall()228 void StartCall() { stream_->StartCall(); }
229
230 /// Initiate a read operation (or post it for later initiation if StartCall
231 /// has not yet been invoked).
232 ///
233 /// \param[out] resp Where to eventually store the read message. Valid when
234 /// the library calls OnReadDone
StartRead(Response * resp)235 void StartRead(Response* resp) { stream_->Read(resp); }
236
237 /// Initiate a write operation (or post it for later initiation if StartCall
238 /// has not yet been invoked).
239 ///
240 /// \param[in] req The message to be written. The library does not take
241 /// ownership but the caller must ensure that the message is
242 /// not deleted or modified until OnWriteDone is called.
StartWrite(const Request * req)243 void StartWrite(const Request* req) {
244 StartWrite(req, ::grpc::WriteOptions());
245 }
246
247 /// Initiate/post a write operation with specified options.
248 ///
249 /// \param[in] req The message to be written. The library does not take
250 /// ownership but the caller must ensure that the message is
251 /// not deleted or modified until OnWriteDone is called.
252 /// \param[in] options The WriteOptions to use for writing this message
StartWrite(const Request * req,::grpc::WriteOptions options)253 void StartWrite(const Request* req, ::grpc::WriteOptions options) {
254 stream_->Write(req, options);
255 }
256
257 /// Initiate/post a write operation with specified options and an indication
258 /// that this is the last write (like StartWrite and StartWritesDone, merged).
259 /// Note that calling this means that no more calls to StartWrite,
260 /// StartWriteLast, or StartWritesDone are allowed.
261 ///
262 /// \param[in] req The message to be written. The library does not take
263 /// ownership but the caller must ensure that the message is
264 /// not deleted or modified until OnWriteDone is called.
265 /// \param[in] options The WriteOptions to use for writing this message
StartWriteLast(const Request * req,::grpc::WriteOptions options)266 void StartWriteLast(const Request* req, ::grpc::WriteOptions options) {
267 StartWrite(req, options.set_last_message());
268 }
269
270 /// Indicate that the RPC will have no more write operations. This can only be
271 /// issued once for a given RPC. This is not required or allowed if
272 /// StartWriteLast is used since that already has the same implication.
273 /// Note that calling this means that no more calls to StartWrite,
274 /// StartWriteLast, or StartWritesDone are allowed.
StartWritesDone()275 void StartWritesDone() { stream_->WritesDone(); }
276
277 /// Holds are needed if (and only if) this stream has operations that take
278 /// place on it after StartCall but from outside one of the reactions
279 /// (OnReadDone, etc). This is _not_ a common use of the streaming API.
280 ///
281 /// Holds must be added before calling StartCall. If a stream still has a hold
282 /// in place, its resources will not be destroyed even if the status has
283 /// already come in from the wire and there are currently no active callbacks
284 /// outstanding. Similarly, the stream will not call OnDone if there are still
285 /// holds on it.
286 ///
287 /// For example, if a StartRead or StartWrite operation is going to be
288 /// initiated from elsewhere in the application, the application should call
289 /// AddHold or AddMultipleHolds before StartCall. If there is going to be,
290 /// for example, a read-flow and a write-flow taking place outside the
291 /// reactions, then call AddMultipleHolds(2) before StartCall. When the
292 /// application knows that it won't issue any more read operations (such as
293 /// when a read comes back as not ok), it should issue a RemoveHold(). It
294 /// should also call RemoveHold() again after it does StartWriteLast or
295 /// StartWritesDone that indicates that there will be no more write ops.
296 /// The number of RemoveHold calls must match the total number of AddHold
297 /// calls plus the number of holds added by AddMultipleHolds.
298 /// The argument to AddMultipleHolds must be positive.
AddHold()299 void AddHold() { AddMultipleHolds(1); }
AddMultipleHolds(int holds)300 void AddMultipleHolds(int holds) {
301 GPR_CODEGEN_DEBUG_ASSERT(holds > 0);
302 stream_->AddHold(holds);
303 }
RemoveHold()304 void RemoveHold() { stream_->RemoveHold(); }
305
306 /// Notifies the application that all operations associated with this RPC
307 /// have completed and all Holds have been removed. OnDone provides the RPC
308 /// status outcome for both successful and failed RPCs and will be called in
309 /// all cases. If it is not called, it indicates an application-level problem
310 /// (like failure to remove a hold).
311 ///
312 /// \param[in] s The status outcome of this RPC
OnDone(const::grpc::Status &)313 void OnDone(const ::grpc::Status& /*s*/) override {}
314
315 /// Notifies the application that a read of initial metadata from the
316 /// server is done. If the application chooses not to implement this method,
317 /// it can assume that the initial metadata has been read before the first
318 /// call of OnReadDone or OnDone.
319 ///
320 /// \param[in] ok Was the initial metadata read successfully? If false, no
321 /// new read/write operation will succeed, and any further
322 /// Start* operations should not be called.
OnReadInitialMetadataDone(bool)323 virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
324
325 /// Notifies the application that a StartRead operation completed.
326 ///
327 /// \param[in] ok Was it successful? If false, no new read/write operation
328 /// will succeed, and any further Start* should not be called.
OnReadDone(bool)329 virtual void OnReadDone(bool /*ok*/) {}
330
331 /// Notifies the application that a StartWrite or StartWriteLast operation
332 /// completed.
333 ///
334 /// \param[in] ok Was it successful? If false, no new read/write operation
335 /// will succeed, and any further Start* should not be called.
OnWriteDone(bool)336 virtual void OnWriteDone(bool /*ok*/) {}
337
338 /// Notifies the application that a StartWritesDone operation completed. Note
339 /// that this is only used on explicit StartWritesDone operations and not for
340 /// those that are implicitly invoked as part of a StartWriteLast.
341 ///
342 /// \param[in] ok Was it successful? If false, the application will later see
343 /// the failure reflected as a bad status in OnDone and no
344 /// further Start* should be called.
OnWritesDoneDone(bool)345 virtual void OnWritesDoneDone(bool /*ok*/) {}
346
347 private:
348 friend class ClientCallbackReaderWriter<Request, Response>;
BindStream(ClientCallbackReaderWriter<Request,Response> * stream)349 void BindStream(ClientCallbackReaderWriter<Request, Response>* stream) {
350 stream_ = stream;
351 }
352 ClientCallbackReaderWriter<Request, Response>* stream_;
353 };
354
355 /// \a ClientReadReactor is the interface for a server-streaming RPC.
356 /// All public methods behave as in ClientBidiReactor.
357 template <class Response>
358 class ClientReadReactor : public internal::ClientReactor {
359 public:
~ClientReadReactor()360 virtual ~ClientReadReactor() {}
361
StartCall()362 void StartCall() { reader_->StartCall(); }
StartRead(Response * resp)363 void StartRead(Response* resp) { reader_->Read(resp); }
364
AddHold()365 void AddHold() { AddMultipleHolds(1); }
AddMultipleHolds(int holds)366 void AddMultipleHolds(int holds) {
367 GPR_CODEGEN_DEBUG_ASSERT(holds > 0);
368 reader_->AddHold(holds);
369 }
RemoveHold()370 void RemoveHold() { reader_->RemoveHold(); }
371
OnDone(const::grpc::Status &)372 void OnDone(const ::grpc::Status& /*s*/) override {}
OnReadInitialMetadataDone(bool)373 virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
OnReadDone(bool)374 virtual void OnReadDone(bool /*ok*/) {}
375
376 private:
377 friend class ClientCallbackReader<Response>;
BindReader(ClientCallbackReader<Response> * reader)378 void BindReader(ClientCallbackReader<Response>* reader) { reader_ = reader; }
379 ClientCallbackReader<Response>* reader_;
380 };
381
382 /// \a ClientWriteReactor is the interface for a client-streaming RPC.
383 /// All public methods behave as in ClientBidiReactor.
384 template <class Request>
385 class ClientWriteReactor : public internal::ClientReactor {
386 public:
~ClientWriteReactor()387 virtual ~ClientWriteReactor() {}
388
StartCall()389 void StartCall() { writer_->StartCall(); }
StartWrite(const Request * req)390 void StartWrite(const Request* req) {
391 StartWrite(req, ::grpc::WriteOptions());
392 }
StartWrite(const Request * req,::grpc::WriteOptions options)393 void StartWrite(const Request* req, ::grpc::WriteOptions options) {
394 writer_->Write(req, options);
395 }
StartWriteLast(const Request * req,::grpc::WriteOptions options)396 void StartWriteLast(const Request* req, ::grpc::WriteOptions options) {
397 StartWrite(req, options.set_last_message());
398 }
StartWritesDone()399 void StartWritesDone() { writer_->WritesDone(); }
400
AddHold()401 void AddHold() { AddMultipleHolds(1); }
AddMultipleHolds(int holds)402 void AddMultipleHolds(int holds) {
403 GPR_CODEGEN_DEBUG_ASSERT(holds > 0);
404 writer_->AddHold(holds);
405 }
RemoveHold()406 void RemoveHold() { writer_->RemoveHold(); }
407
OnDone(const::grpc::Status &)408 void OnDone(const ::grpc::Status& /*s*/) override {}
OnReadInitialMetadataDone(bool)409 virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
OnWriteDone(bool)410 virtual void OnWriteDone(bool /*ok*/) {}
OnWritesDoneDone(bool)411 virtual void OnWritesDoneDone(bool /*ok*/) {}
412
413 private:
414 friend class ClientCallbackWriter<Request>;
BindWriter(ClientCallbackWriter<Request> * writer)415 void BindWriter(ClientCallbackWriter<Request>* writer) { writer_ = writer; }
416
417 ClientCallbackWriter<Request>* writer_;
418 };
419
420 /// \a ClientUnaryReactor is a reactor-style interface for a unary RPC.
421 /// This is _not_ a common way of invoking a unary RPC. In practice, this
422 /// option should be used only if the unary RPC wants to receive initial
423 /// metadata without waiting for the response to complete. Most deployments of
424 /// RPC systems do not use this option, but it is needed for generality.
425 /// All public methods behave as in ClientBidiReactor.
426 /// StartCall is included for consistency with the other reactor flavors: even
427 /// though there are no StartRead or StartWrite operations to queue before the
428 /// call (that is part of the unary call itself) and there is no reactor object
429 /// being created as a result of this call, we keep a consistent 2-phase
430 /// initiation API among all the reactor flavors.
431 class ClientUnaryReactor : public internal::ClientReactor {
432 public:
~ClientUnaryReactor()433 virtual ~ClientUnaryReactor() {}
434
StartCall()435 void StartCall() { call_->StartCall(); }
OnDone(const::grpc::Status &)436 void OnDone(const ::grpc::Status& /*s*/) override {}
OnReadInitialMetadataDone(bool)437 virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
438
439 private:
440 friend class ClientCallbackUnary;
BindCall(ClientCallbackUnary * call)441 void BindCall(ClientCallbackUnary* call) { call_ = call; }
442 ClientCallbackUnary* call_;
443 };
444
445 // Define function out-of-line from class to avoid forward declaration issue
BindReactor(ClientUnaryReactor * reactor)446 inline void ClientCallbackUnary::BindReactor(ClientUnaryReactor* reactor) {
447 reactor->BindCall(this);
448 }
449
450 namespace internal {
451
452 // Forward declare factory classes for friendship
453 template <class Request, class Response>
454 class ClientCallbackReaderWriterFactory;
455 template <class Response>
456 class ClientCallbackReaderFactory;
457 template <class Request>
458 class ClientCallbackWriterFactory;
459
460 template <class Request, class Response>
461 class ClientCallbackReaderWriterImpl
462 : public ClientCallbackReaderWriter<Request, Response> {
463 public:
464 // always allocated against a call arena, no memory free required
delete(void *,std::size_t size)465 static void operator delete(void* /*ptr*/, std::size_t size) {
466 GPR_CODEGEN_ASSERT(size == sizeof(ClientCallbackReaderWriterImpl));
467 }
468
469 // This operator should never be called as the memory should be freed as part
470 // of the arena destruction. It only exists to provide a matching operator
471 // delete to the operator new so that some compilers will not complain (see
472 // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
473 // there are no tests catching the compiler warning.
delete(void *,void *)474 static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
475
StartCall()476 void StartCall() override {
477 // This call initiates two batches, plus any backlog, each with a callback
478 // 1. Send initial metadata (unless corked) + recv initial metadata
479 // 2. Any read backlog
480 // 3. Any write backlog
481 // 4. Recv trailing metadata (unless corked)
482 if (!start_corked_) {
483 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
484 context_->initial_metadata_flags());
485 }
486
487 call_.PerformOps(&start_ops_);
488
489 {
490 grpc::internal::MutexLock lock(&start_mu_);
491
492 if (backlog_.read_ops) {
493 call_.PerformOps(&read_ops_);
494 }
495 if (backlog_.write_ops) {
496 call_.PerformOps(&write_ops_);
497 }
498 if (backlog_.writes_done_ops) {
499 call_.PerformOps(&writes_done_ops_);
500 }
501 call_.PerformOps(&finish_ops_);
502 // The last thing in this critical section is to set started_ so that it
503 // can be used lock-free as well.
504 started_.store(true, std::memory_order_release);
505 }
506 // MaybeFinish outside the lock to make sure that destruction of this object
507 // doesn't take place while holding the lock (which would cause the lock to
508 // be released after destruction)
509 this->MaybeFinish(/*from_reaction=*/false);
510 }
511
Read(Response * msg)512 void Read(Response* msg) override {
513 read_ops_.RecvMessage(msg);
514 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
515 if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
516 grpc::internal::MutexLock lock(&start_mu_);
517 if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
518 backlog_.read_ops = true;
519 return;
520 }
521 }
522 call_.PerformOps(&read_ops_);
523 }
524
Write(const Request * msg,::grpc::WriteOptions options)525 void Write(const Request* msg, ::grpc::WriteOptions options) override {
526 if (options.is_last_message()) {
527 options.set_buffer_hint();
528 write_ops_.ClientSendClose();
529 }
530 // TODO(vjpai): don't assert
531 GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(msg, options).ok());
532 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
533 if (GPR_UNLIKELY(corked_write_needed_)) {
534 write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
535 context_->initial_metadata_flags());
536 corked_write_needed_ = false;
537 }
538
539 if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
540 grpc::internal::MutexLock lock(&start_mu_);
541 if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
542 backlog_.write_ops = true;
543 return;
544 }
545 }
546 call_.PerformOps(&write_ops_);
547 }
WritesDone()548 void WritesDone() override {
549 writes_done_ops_.ClientSendClose();
550 writes_done_tag_.Set(
551 call_.call(),
552 [this](bool ok) {
553 reactor_->OnWritesDoneDone(ok);
554 MaybeFinish(/*from_reaction=*/true);
555 },
556 &writes_done_ops_, /*can_inline=*/false);
557 writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
558 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
559 if (GPR_UNLIKELY(corked_write_needed_)) {
560 writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
561 context_->initial_metadata_flags());
562 corked_write_needed_ = false;
563 }
564 if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
565 grpc::internal::MutexLock lock(&start_mu_);
566 if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
567 backlog_.writes_done_ops = true;
568 return;
569 }
570 }
571 call_.PerformOps(&writes_done_ops_);
572 }
573
AddHold(int holds)574 void AddHold(int holds) override {
575 callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
576 }
RemoveHold()577 void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
578
579 private:
580 friend class ClientCallbackReaderWriterFactory<Request, Response>;
581
ClientCallbackReaderWriterImpl(grpc::internal::Call call,::grpc::ClientContext * context,ClientBidiReactor<Request,Response> * reactor)582 ClientCallbackReaderWriterImpl(grpc::internal::Call call,
583 ::grpc::ClientContext* context,
584 ClientBidiReactor<Request, Response>* reactor)
585 : context_(context),
586 call_(call),
587 reactor_(reactor),
588 start_corked_(context_->initial_metadata_corked_),
589 corked_write_needed_(start_corked_) {
590 this->BindReactor(reactor);
591
592 // Set up the unchanging parts of the start, read, and write tags and ops.
593 start_tag_.Set(
594 call_.call(),
595 [this](bool ok) {
596 reactor_->OnReadInitialMetadataDone(ok);
597 MaybeFinish(/*from_reaction=*/true);
598 },
599 &start_ops_, /*can_inline=*/false);
600 start_ops_.RecvInitialMetadata(context_);
601 start_ops_.set_core_cq_tag(&start_tag_);
602
603 write_tag_.Set(
604 call_.call(),
605 [this](bool ok) {
606 reactor_->OnWriteDone(ok);
607 MaybeFinish(/*from_reaction=*/true);
608 },
609 &write_ops_, /*can_inline=*/false);
610 write_ops_.set_core_cq_tag(&write_tag_);
611
612 read_tag_.Set(
613 call_.call(),
614 [this](bool ok) {
615 reactor_->OnReadDone(ok);
616 MaybeFinish(/*from_reaction=*/true);
617 },
618 &read_ops_, /*can_inline=*/false);
619 read_ops_.set_core_cq_tag(&read_tag_);
620
621 // Also set up the Finish tag and op set.
622 finish_tag_.Set(
623 call_.call(),
624 [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
625 &finish_ops_,
626 /*can_inline=*/false);
627 finish_ops_.ClientRecvStatus(context_, &finish_status_);
628 finish_ops_.set_core_cq_tag(&finish_tag_);
629 }
630
631 // MaybeFinish can be called from reactions or from user-initiated operations
632 // like StartCall or RemoveHold. If this is the last operation or hold on this
633 // object, it will invoke the OnDone reaction. If MaybeFinish was called from
634 // a reaction, it can call OnDone directly. If not, it would need to schedule
635 // OnDone onto an executor thread to avoid the possibility of deadlocking with
636 // any locks in the user code that invoked it.
MaybeFinish(bool from_reaction)637 void MaybeFinish(bool from_reaction) {
638 if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
639 1, std::memory_order_acq_rel) == 1)) {
640 ::grpc::Status s = std::move(finish_status_);
641 auto* reactor = reactor_;
642 auto* call = call_.call();
643 this->~ClientCallbackReaderWriterImpl();
644 ::grpc::g_core_codegen_interface->grpc_call_unref(call);
645 if (GPR_LIKELY(from_reaction)) {
646 reactor->OnDone(s);
647 } else {
648 reactor->InternalScheduleOnDone(std::move(s));
649 }
650 }
651 }
652
653 ::grpc::ClientContext* const context_;
654 grpc::internal::Call call_;
655 ClientBidiReactor<Request, Response>* const reactor_;
656
657 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
658 grpc::internal::CallOpRecvInitialMetadata>
659 start_ops_;
660 grpc::internal::CallbackWithSuccessTag start_tag_;
661 const bool start_corked_;
662 bool corked_write_needed_; // no lock needed since only accessed in
663 // Write/WritesDone which cannot be concurrent
664
665 grpc::internal::CallOpSet<grpc::internal::CallOpClientRecvStatus> finish_ops_;
666 grpc::internal::CallbackWithSuccessTag finish_tag_;
667 ::grpc::Status finish_status_;
668
669 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
670 grpc::internal::CallOpSendMessage,
671 grpc::internal::CallOpClientSendClose>
672 write_ops_;
673 grpc::internal::CallbackWithSuccessTag write_tag_;
674
675 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
676 grpc::internal::CallOpClientSendClose>
677 writes_done_ops_;
678 grpc::internal::CallbackWithSuccessTag writes_done_tag_;
679
680 grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<Response>>
681 read_ops_;
682 grpc::internal::CallbackWithSuccessTag read_tag_;
683
684 struct StartCallBacklog {
685 bool write_ops = false;
686 bool writes_done_ops = false;
687 bool read_ops = false;
688 };
689 StartCallBacklog backlog_ /* GUARDED_BY(start_mu_) */;
690
691 // Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish
692 std::atomic<intptr_t> callbacks_outstanding_{3};
693 std::atomic_bool started_{false};
694 grpc::internal::Mutex start_mu_;
695 };
696
697 template <class Request, class Response>
698 class ClientCallbackReaderWriterFactory {
699 public:
Create(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc::ClientContext * context,ClientBidiReactor<Request,Response> * reactor)700 static void Create(::grpc::ChannelInterface* channel,
701 const ::grpc::internal::RpcMethod& method,
702 ::grpc::ClientContext* context,
703 ClientBidiReactor<Request, Response>* reactor) {
704 grpc::internal::Call call =
705 channel->CreateCall(method, context, channel->CallbackCQ());
706
707 ::grpc::g_core_codegen_interface->grpc_call_ref(call.call());
708 new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
709 call.call(), sizeof(ClientCallbackReaderWriterImpl<Request, Response>)))
710 ClientCallbackReaderWriterImpl<Request, Response>(call, context,
711 reactor);
712 }
713 };
714
715 template <class Response>
716 class ClientCallbackReaderImpl : public ClientCallbackReader<Response> {
717 public:
718 // always allocated against a call arena, no memory free required
delete(void *,std::size_t size)719 static void operator delete(void* /*ptr*/, std::size_t size) {
720 GPR_CODEGEN_ASSERT(size == sizeof(ClientCallbackReaderImpl));
721 }
722
723 // This operator should never be called as the memory should be freed as part
724 // of the arena destruction. It only exists to provide a matching operator
725 // delete to the operator new so that some compilers will not complain (see
726 // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
727 // there are no tests catching the compiler warning.
delete(void *,void *)728 static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
729
StartCall()730 void StartCall() override {
731 // This call initiates two batches, plus any backlog, each with a callback
732 // 1. Send initial metadata (unless corked) + recv initial metadata
733 // 2. Any backlog
734 // 3. Recv trailing metadata
735
736 start_tag_.Set(
737 call_.call(),
738 [this](bool ok) {
739 reactor_->OnReadInitialMetadataDone(ok);
740 MaybeFinish(/*from_reaction=*/true);
741 },
742 &start_ops_, /*can_inline=*/false);
743 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
744 context_->initial_metadata_flags());
745 start_ops_.RecvInitialMetadata(context_);
746 start_ops_.set_core_cq_tag(&start_tag_);
747 call_.PerformOps(&start_ops_);
748
749 // Also set up the read tag so it doesn't have to be set up each time
750 read_tag_.Set(
751 call_.call(),
752 [this](bool ok) {
753 reactor_->OnReadDone(ok);
754 MaybeFinish(/*from_reaction=*/true);
755 },
756 &read_ops_, /*can_inline=*/false);
757 read_ops_.set_core_cq_tag(&read_tag_);
758
759 {
760 grpc::internal::MutexLock lock(&start_mu_);
761 if (backlog_.read_ops) {
762 call_.PerformOps(&read_ops_);
763 }
764 started_.store(true, std::memory_order_release);
765 }
766
767 finish_tag_.Set(
768 call_.call(),
769 [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
770 &finish_ops_, /*can_inline=*/false);
771 finish_ops_.ClientRecvStatus(context_, &finish_status_);
772 finish_ops_.set_core_cq_tag(&finish_tag_);
773 call_.PerformOps(&finish_ops_);
774 }
775
Read(Response * msg)776 void Read(Response* msg) override {
777 read_ops_.RecvMessage(msg);
778 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
779 if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
780 grpc::internal::MutexLock lock(&start_mu_);
781 if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
782 backlog_.read_ops = true;
783 return;
784 }
785 }
786 call_.PerformOps(&read_ops_);
787 }
788
AddHold(int holds)789 void AddHold(int holds) override {
790 callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
791 }
RemoveHold()792 void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
793
794 private:
795 friend class ClientCallbackReaderFactory<Response>;
796
797 template <class Request>
ClientCallbackReaderImpl(::grpc::internal::Call call,::grpc::ClientContext * context,Request * request,ClientReadReactor<Response> * reactor)798 ClientCallbackReaderImpl(::grpc::internal::Call call,
799 ::grpc::ClientContext* context, Request* request,
800 ClientReadReactor<Response>* reactor)
801 : context_(context), call_(call), reactor_(reactor) {
802 this->BindReactor(reactor);
803 // TODO(vjpai): don't assert
804 GPR_CODEGEN_ASSERT(start_ops_.SendMessagePtr(request).ok());
805 start_ops_.ClientSendClose();
806 }
807
808 // MaybeFinish behaves as in ClientCallbackReaderWriterImpl.
MaybeFinish(bool from_reaction)809 void MaybeFinish(bool from_reaction) {
810 if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
811 1, std::memory_order_acq_rel) == 1)) {
812 ::grpc::Status s = std::move(finish_status_);
813 auto* reactor = reactor_;
814 auto* call = call_.call();
815 this->~ClientCallbackReaderImpl();
816 ::grpc::g_core_codegen_interface->grpc_call_unref(call);
817 if (GPR_LIKELY(from_reaction)) {
818 reactor->OnDone(s);
819 } else {
820 reactor->InternalScheduleOnDone(std::move(s));
821 }
822 }
823 }
824
825 ::grpc::ClientContext* const context_;
826 grpc::internal::Call call_;
827 ClientReadReactor<Response>* const reactor_;
828
829 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
830 grpc::internal::CallOpSendMessage,
831 grpc::internal::CallOpClientSendClose,
832 grpc::internal::CallOpRecvInitialMetadata>
833 start_ops_;
834 grpc::internal::CallbackWithSuccessTag start_tag_;
835
836 grpc::internal::CallOpSet<grpc::internal::CallOpClientRecvStatus> finish_ops_;
837 grpc::internal::CallbackWithSuccessTag finish_tag_;
838 ::grpc::Status finish_status_;
839
840 grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<Response>>
841 read_ops_;
842 grpc::internal::CallbackWithSuccessTag read_tag_;
843
844 struct StartCallBacklog {
845 bool read_ops = false;
846 };
847 StartCallBacklog backlog_ /* GUARDED_BY(start_mu_) */;
848
849 // Minimum of 2 callbacks to pre-register for start and finish
850 std::atomic<intptr_t> callbacks_outstanding_{2};
851 std::atomic_bool started_{false};
852 grpc::internal::Mutex start_mu_;
853 };
854
855 template <class Response>
856 class ClientCallbackReaderFactory {
857 public:
858 template <class Request>
Create(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc::ClientContext * context,const Request * request,ClientReadReactor<Response> * reactor)859 static void Create(::grpc::ChannelInterface* channel,
860 const ::grpc::internal::RpcMethod& method,
861 ::grpc::ClientContext* context, const Request* request,
862 ClientReadReactor<Response>* reactor) {
863 grpc::internal::Call call =
864 channel->CreateCall(method, context, channel->CallbackCQ());
865
866 ::grpc::g_core_codegen_interface->grpc_call_ref(call.call());
867 new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
868 call.call(), sizeof(ClientCallbackReaderImpl<Response>)))
869 ClientCallbackReaderImpl<Response>(call, context, request, reactor);
870 }
871 };
872
873 template <class Request>
874 class ClientCallbackWriterImpl : public ClientCallbackWriter<Request> {
875 public:
876 // always allocated against a call arena, no memory free required
delete(void *,std::size_t size)877 static void operator delete(void* /*ptr*/, std::size_t size) {
878 GPR_CODEGEN_ASSERT(size == sizeof(ClientCallbackWriterImpl));
879 }
880
881 // This operator should never be called as the memory should be freed as part
882 // of the arena destruction. It only exists to provide a matching operator
883 // delete to the operator new so that some compilers will not complain (see
884 // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
885 // there are no tests catching the compiler warning.
delete(void *,void *)886 static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
887
StartCall()888 void StartCall() override {
889 // This call initiates two batches, plus any backlog, each with a callback
890 // 1. Send initial metadata (unless corked) + recv initial metadata
891 // 2. Any backlog
892 // 3. Recv trailing metadata
893
894 if (!start_corked_) {
895 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
896 context_->initial_metadata_flags());
897 }
898 call_.PerformOps(&start_ops_);
899
900 {
901 grpc::internal::MutexLock lock(&start_mu_);
902
903 if (backlog_.write_ops) {
904 call_.PerformOps(&write_ops_);
905 }
906 if (backlog_.writes_done_ops) {
907 call_.PerformOps(&writes_done_ops_);
908 }
909 call_.PerformOps(&finish_ops_);
910 // The last thing in this critical section is to set started_ so that it
911 // can be used lock-free as well.
912 started_.store(true, std::memory_order_release);
913 }
914 // MaybeFinish outside the lock to make sure that destruction of this object
915 // doesn't take place while holding the lock (which would cause the lock to
916 // be released after destruction)
917 this->MaybeFinish(/*from_reaction=*/false);
918 }
919
Write(const Request * msg,::grpc::WriteOptions options)920 void Write(const Request* msg, ::grpc::WriteOptions options) override {
921 if (GPR_UNLIKELY(options.is_last_message())) {
922 options.set_buffer_hint();
923 write_ops_.ClientSendClose();
924 }
925 // TODO(vjpai): don't assert
926 GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(msg, options).ok());
927 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
928
929 if (GPR_UNLIKELY(corked_write_needed_)) {
930 write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
931 context_->initial_metadata_flags());
932 corked_write_needed_ = false;
933 }
934
935 if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
936 grpc::internal::MutexLock lock(&start_mu_);
937 if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
938 backlog_.write_ops = true;
939 return;
940 }
941 }
942 call_.PerformOps(&write_ops_);
943 }
944
WritesDone()945 void WritesDone() override {
946 writes_done_ops_.ClientSendClose();
947 writes_done_tag_.Set(
948 call_.call(),
949 [this](bool ok) {
950 reactor_->OnWritesDoneDone(ok);
951 MaybeFinish(/*from_reaction=*/true);
952 },
953 &writes_done_ops_, /*can_inline=*/false);
954 writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
955 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
956
957 if (GPR_UNLIKELY(corked_write_needed_)) {
958 writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
959 context_->initial_metadata_flags());
960 corked_write_needed_ = false;
961 }
962
963 if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
964 grpc::internal::MutexLock lock(&start_mu_);
965 if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
966 backlog_.writes_done_ops = true;
967 return;
968 }
969 }
970 call_.PerformOps(&writes_done_ops_);
971 }
972
AddHold(int holds)973 void AddHold(int holds) override {
974 callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
975 }
RemoveHold()976 void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
977
978 private:
979 friend class ClientCallbackWriterFactory<Request>;
980
981 template <class Response>
ClientCallbackWriterImpl(::grpc::internal::Call call,::grpc::ClientContext * context,Response * response,ClientWriteReactor<Request> * reactor)982 ClientCallbackWriterImpl(::grpc::internal::Call call,
983 ::grpc::ClientContext* context, Response* response,
984 ClientWriteReactor<Request>* reactor)
985 : context_(context),
986 call_(call),
987 reactor_(reactor),
988 start_corked_(context_->initial_metadata_corked_),
989 corked_write_needed_(start_corked_) {
990 this->BindReactor(reactor);
991
992 // Set up the unchanging parts of the start and write tags and ops.
993 start_tag_.Set(
994 call_.call(),
995 [this](bool ok) {
996 reactor_->OnReadInitialMetadataDone(ok);
997 MaybeFinish(/*from_reaction=*/true);
998 },
999 &start_ops_, /*can_inline=*/false);
1000 start_ops_.RecvInitialMetadata(context_);
1001 start_ops_.set_core_cq_tag(&start_tag_);
1002
1003 write_tag_.Set(
1004 call_.call(),
1005 [this](bool ok) {
1006 reactor_->OnWriteDone(ok);
1007 MaybeFinish(/*from_reaction=*/true);
1008 },
1009 &write_ops_, /*can_inline=*/false);
1010 write_ops_.set_core_cq_tag(&write_tag_);
1011
1012 // Also set up the Finish tag and op set.
1013 finish_ops_.RecvMessage(response);
1014 finish_ops_.AllowNoMessage();
1015 finish_tag_.Set(
1016 call_.call(),
1017 [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
1018 &finish_ops_,
1019 /*can_inline=*/false);
1020 finish_ops_.ClientRecvStatus(context_, &finish_status_);
1021 finish_ops_.set_core_cq_tag(&finish_tag_);
1022 }
1023
1024 // MaybeFinish behaves as in ClientCallbackReaderWriterImpl.
MaybeFinish(bool from_reaction)1025 void MaybeFinish(bool from_reaction) {
1026 if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
1027 1, std::memory_order_acq_rel) == 1)) {
1028 ::grpc::Status s = std::move(finish_status_);
1029 auto* reactor = reactor_;
1030 auto* call = call_.call();
1031 this->~ClientCallbackWriterImpl();
1032 ::grpc::g_core_codegen_interface->grpc_call_unref(call);
1033 if (GPR_LIKELY(from_reaction)) {
1034 reactor->OnDone(s);
1035 } else {
1036 reactor->InternalScheduleOnDone(std::move(s));
1037 }
1038 }
1039 }
1040
1041 ::grpc::ClientContext* const context_;
1042 grpc::internal::Call call_;
1043 ClientWriteReactor<Request>* const reactor_;
1044
1045 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
1046 grpc::internal::CallOpRecvInitialMetadata>
1047 start_ops_;
1048 grpc::internal::CallbackWithSuccessTag start_tag_;
1049 const bool start_corked_;
1050 bool corked_write_needed_; // no lock needed since only accessed in
1051 // Write/WritesDone which cannot be concurrent
1052
1053 grpc::internal::CallOpSet<grpc::internal::CallOpGenericRecvMessage,
1054 grpc::internal::CallOpClientRecvStatus>
1055 finish_ops_;
1056 grpc::internal::CallbackWithSuccessTag finish_tag_;
1057 ::grpc::Status finish_status_;
1058
1059 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
1060 grpc::internal::CallOpSendMessage,
1061 grpc::internal::CallOpClientSendClose>
1062 write_ops_;
1063 grpc::internal::CallbackWithSuccessTag write_tag_;
1064
1065 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
1066 grpc::internal::CallOpClientSendClose>
1067 writes_done_ops_;
1068 grpc::internal::CallbackWithSuccessTag writes_done_tag_;
1069
1070 struct StartCallBacklog {
1071 bool write_ops = false;
1072 bool writes_done_ops = false;
1073 };
1074 StartCallBacklog backlog_ /* GUARDED_BY(start_mu_) */;
1075
1076 // Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish
1077 std::atomic<intptr_t> callbacks_outstanding_{3};
1078 std::atomic_bool started_{false};
1079 grpc::internal::Mutex start_mu_;
1080 };
1081
1082 template <class Request>
1083 class ClientCallbackWriterFactory {
1084 public:
1085 template <class Response>
Create(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc::ClientContext * context,Response * response,ClientWriteReactor<Request> * reactor)1086 static void Create(::grpc::ChannelInterface* channel,
1087 const ::grpc::internal::RpcMethod& method,
1088 ::grpc::ClientContext* context, Response* response,
1089 ClientWriteReactor<Request>* reactor) {
1090 grpc::internal::Call call =
1091 channel->CreateCall(method, context, channel->CallbackCQ());
1092
1093 ::grpc::g_core_codegen_interface->grpc_call_ref(call.call());
1094 new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
1095 call.call(), sizeof(ClientCallbackWriterImpl<Request>)))
1096 ClientCallbackWriterImpl<Request>(call, context, response, reactor);
1097 }
1098 };
1099
1100 class ClientCallbackUnaryImpl final : public ClientCallbackUnary {
1101 public:
1102 // always allocated against a call arena, no memory free required
delete(void *,std::size_t size)1103 static void operator delete(void* /*ptr*/, std::size_t size) {
1104 GPR_CODEGEN_ASSERT(size == sizeof(ClientCallbackUnaryImpl));
1105 }
1106
1107 // This operator should never be called as the memory should be freed as part
1108 // of the arena destruction. It only exists to provide a matching operator
1109 // delete to the operator new so that some compilers will not complain (see
1110 // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
1111 // there are no tests catching the compiler warning.
delete(void *,void *)1112 static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
1113
StartCall()1114 void StartCall() override {
1115 // This call initiates two batches, each with a callback
1116 // 1. Send initial metadata + write + writes done + recv initial metadata
1117 // 2. Read message, recv trailing metadata
1118
1119 start_tag_.Set(
1120 call_.call(),
1121 [this](bool ok) {
1122 reactor_->OnReadInitialMetadataDone(ok);
1123 MaybeFinish();
1124 },
1125 &start_ops_, /*can_inline=*/false);
1126 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
1127 context_->initial_metadata_flags());
1128 start_ops_.RecvInitialMetadata(context_);
1129 start_ops_.set_core_cq_tag(&start_tag_);
1130 call_.PerformOps(&start_ops_);
1131
1132 finish_tag_.Set(
1133 call_.call(), [this](bool /*ok*/) { MaybeFinish(); }, &finish_ops_,
1134 /*can_inline=*/false);
1135 finish_ops_.ClientRecvStatus(context_, &finish_status_);
1136 finish_ops_.set_core_cq_tag(&finish_tag_);
1137 call_.PerformOps(&finish_ops_);
1138 }
1139
1140 private:
1141 friend class ClientCallbackUnaryFactory;
1142
1143 template <class Request, class Response>
ClientCallbackUnaryImpl(::grpc::internal::Call call,::grpc::ClientContext * context,Request * request,Response * response,ClientUnaryReactor * reactor)1144 ClientCallbackUnaryImpl(::grpc::internal::Call call,
1145 ::grpc::ClientContext* context, Request* request,
1146 Response* response, ClientUnaryReactor* reactor)
1147 : context_(context), call_(call), reactor_(reactor) {
1148 this->BindReactor(reactor);
1149 // TODO(vjpai): don't assert
1150 GPR_CODEGEN_ASSERT(start_ops_.SendMessagePtr(request).ok());
1151 start_ops_.ClientSendClose();
1152 finish_ops_.RecvMessage(response);
1153 finish_ops_.AllowNoMessage();
1154 }
1155
1156 // In the unary case, MaybeFinish is only ever invoked from a
1157 // library-initiated reaction, so it will just directly call OnDone if this is
1158 // the last reaction for this RPC.
MaybeFinish()1159 void MaybeFinish() {
1160 if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
1161 1, std::memory_order_acq_rel) == 1)) {
1162 ::grpc::Status s = std::move(finish_status_);
1163 auto* reactor = reactor_;
1164 auto* call = call_.call();
1165 this->~ClientCallbackUnaryImpl();
1166 ::grpc::g_core_codegen_interface->grpc_call_unref(call);
1167 reactor->OnDone(s);
1168 }
1169 }
1170
1171 ::grpc::ClientContext* const context_;
1172 grpc::internal::Call call_;
1173 ClientUnaryReactor* const reactor_;
1174
1175 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
1176 grpc::internal::CallOpSendMessage,
1177 grpc::internal::CallOpClientSendClose,
1178 grpc::internal::CallOpRecvInitialMetadata>
1179 start_ops_;
1180 grpc::internal::CallbackWithSuccessTag start_tag_;
1181
1182 grpc::internal::CallOpSet<grpc::internal::CallOpGenericRecvMessage,
1183 grpc::internal::CallOpClientRecvStatus>
1184 finish_ops_;
1185 grpc::internal::CallbackWithSuccessTag finish_tag_;
1186 ::grpc::Status finish_status_;
1187
1188 // This call will have 2 callbacks: start and finish
1189 std::atomic<intptr_t> callbacks_outstanding_{2};
1190 };
1191
1192 class ClientCallbackUnaryFactory {
1193 public:
1194 template <class Request, class Response, class BaseRequest = Request,
1195 class BaseResponse = Response>
Create(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc::ClientContext * context,const Request * request,Response * response,ClientUnaryReactor * reactor)1196 static void Create(::grpc::ChannelInterface* channel,
1197 const ::grpc::internal::RpcMethod& method,
1198 ::grpc::ClientContext* context, const Request* request,
1199 Response* response, ClientUnaryReactor* reactor) {
1200 grpc::internal::Call call =
1201 channel->CreateCall(method, context, channel->CallbackCQ());
1202
1203 ::grpc::g_core_codegen_interface->grpc_call_ref(call.call());
1204
1205 new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
1206 call.call(), sizeof(ClientCallbackUnaryImpl)))
1207 ClientCallbackUnaryImpl(call, context,
1208 static_cast<const BaseRequest*>(request),
1209 static_cast<BaseResponse*>(response), reactor);
1210 }
1211 };
1212
1213 } // namespace internal
1214
1215 // TODO(vjpai): Remove namespace experimental when de-experimentalized fully.
1216 namespace experimental {
1217
1218 template <class Response>
1219 using ClientCallbackReader = ::grpc::ClientCallbackReader<Response>;
1220
1221 template <class Request>
1222 using ClientCallbackWriter = ::grpc::ClientCallbackWriter<Request>;
1223
1224 template <class Request, class Response>
1225 using ClientCallbackReaderWriter =
1226 ::grpc::ClientCallbackReaderWriter<Request, Response>;
1227
1228 template <class Response>
1229 using ClientReadReactor = ::grpc::ClientReadReactor<Response>;
1230
1231 template <class Request>
1232 using ClientWriteReactor = ::grpc::ClientWriteReactor<Request>;
1233
1234 template <class Request, class Response>
1235 using ClientBidiReactor = ::grpc::ClientBidiReactor<Request, Response>;
1236
1237 typedef ::grpc::ClientUnaryReactor ClientUnaryReactor;
1238
1239 } // namespace experimental
1240
1241 } // namespace grpc
1242 #endif // GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_H
1243