1 /*
2 *
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 #ifndef GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_IMPL_H
19 #define GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_IMPL_H
#include <atomic>
#include <functional>
#include <utility>

#include <grpcpp/impl/codegen/call.h>
#include <grpcpp/impl/codegen/call_op_set.h>
#include <grpcpp/impl/codegen/callback_common.h>
#include <grpcpp/impl/codegen/channel_interface.h>
#include <grpcpp/impl/codegen/config.h>
#include <grpcpp/impl/codegen/core_codegen_interface.h>
#include <grpcpp/impl/codegen/status.h>
30
31 namespace grpc {
32 namespace internal {
33 class RpcMethod;
34 } // namespace internal
35 } // namespace grpc
36
37 namespace grpc_impl {
38 class Channel;
39 class ClientContext;
40
41 namespace internal {
42
43 /// Perform a callback-based unary call
44 /// TODO(vjpai): Combine as much as possible with the blocking unary call code
45 template <class InputMessage, class OutputMessage>
CallbackUnaryCall(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc_impl::ClientContext * context,const InputMessage * request,OutputMessage * result,std::function<void (::grpc::Status)> on_completion)46 void CallbackUnaryCall(::grpc::ChannelInterface* channel,
47 const ::grpc::internal::RpcMethod& method,
48 ::grpc_impl::ClientContext* context,
49 const InputMessage* request, OutputMessage* result,
50 std::function<void(::grpc::Status)> on_completion) {
51 CallbackUnaryCallImpl<InputMessage, OutputMessage> x(
52 channel, method, context, request, result, on_completion);
53 }
54
55 template <class InputMessage, class OutputMessage>
56 class CallbackUnaryCallImpl {
57 public:
CallbackUnaryCallImpl(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc_impl::ClientContext * context,const InputMessage * request,OutputMessage * result,std::function<void (::grpc::Status)> on_completion)58 CallbackUnaryCallImpl(::grpc::ChannelInterface* channel,
59 const ::grpc::internal::RpcMethod& method,
60 ::grpc_impl::ClientContext* context,
61 const InputMessage* request, OutputMessage* result,
62 std::function<void(::grpc::Status)> on_completion) {
63 ::grpc_impl::CompletionQueue* cq = channel->CallbackCQ();
64 GPR_CODEGEN_ASSERT(cq != nullptr);
65 grpc::internal::Call call(channel->CreateCall(method, context, cq));
66
67 using FullCallOpSet = grpc::internal::CallOpSet<
68 ::grpc::internal::CallOpSendInitialMetadata,
69 grpc::internal::CallOpSendMessage,
70 grpc::internal::CallOpRecvInitialMetadata,
71 grpc::internal::CallOpRecvMessage<OutputMessage>,
72 grpc::internal::CallOpClientSendClose,
73 grpc::internal::CallOpClientRecvStatus>;
74
75 struct OpSetAndTag {
76 FullCallOpSet opset;
77 grpc::internal::CallbackWithStatusTag tag;
78 };
79 const size_t alloc_sz = sizeof(OpSetAndTag);
80 auto* const alloced = static_cast<OpSetAndTag*>(
81 ::grpc::g_core_codegen_interface->grpc_call_arena_alloc(call.call(),
82 alloc_sz));
83 auto* ops = new (&alloced->opset) FullCallOpSet;
84 auto* tag = new (&alloced->tag)
85 grpc::internal::CallbackWithStatusTag(call.call(), on_completion, ops);
86
87 // TODO(vjpai): Unify code with sync API as much as possible
88 ::grpc::Status s = ops->SendMessagePtr(request);
89 if (!s.ok()) {
90 tag->force_run(s);
91 return;
92 }
93 ops->SendInitialMetadata(&context->send_initial_metadata_,
94 context->initial_metadata_flags());
95 ops->RecvInitialMetadata(context);
96 ops->RecvMessage(result);
97 ops->AllowNoMessage();
98 ops->ClientSendClose();
99 ops->ClientRecvStatus(context, tag->status_ptr());
100 ops->set_core_cq_tag(tag);
101 call.PerformOps(ops);
102 }
103 };
104
105 // Base class for public API classes.
106 class ClientReactor {
107 public:
108 /// Called by the library when all operations associated with this RPC have
109 /// completed and all Holds have been removed. OnDone provides the RPC status
110 /// outcome for both successful and failed RPCs. If it is never called on an
111 /// RPC, it indicates an application-level problem (like failure to remove a
112 /// hold).
113 ///
114 /// \param[in] s The status outcome of this RPC
115 virtual void OnDone(const ::grpc::Status& /*s*/) = 0;
116
117 /// InternalScheduleOnDone is not part of the API and is not meant to be
118 /// overridden. It is virtual to allow successful builds for certain bazel
119 /// build users that only want to depend on gRPC codegen headers and not the
120 /// full library (although this is not a generally-supported option). Although
121 /// the virtual call is slower than a direct call, this function is
122 /// heavyweight and the cost of the virtual call is not much in comparison.
123 /// This function may be removed or devirtualized in the future.
124 virtual void InternalScheduleOnDone(::grpc::Status s);
125 };
126
127 } // namespace internal
128
129 // Forward declarations
130 template <class Request, class Response>
131 class ClientBidiReactor;
132 template <class Response>
133 class ClientReadReactor;
134 template <class Request>
135 class ClientWriteReactor;
136 class ClientUnaryReactor;
137
138 // NOTE: The streaming objects are not actually implemented in the public API.
139 // These interfaces are provided for mocking only. Typical applications
140 // will interact exclusively with the reactors that they define.
141 template <class Request, class Response>
142 class ClientCallbackReaderWriter {
143 public:
~ClientCallbackReaderWriter()144 virtual ~ClientCallbackReaderWriter() {}
145 virtual void StartCall() = 0;
146 virtual void Write(const Request* req, ::grpc::WriteOptions options) = 0;
147 virtual void WritesDone() = 0;
148 virtual void Read(Response* resp) = 0;
149 virtual void AddHold(int holds) = 0;
150 virtual void RemoveHold() = 0;
151
152 protected:
BindReactor(ClientBidiReactor<Request,Response> * reactor)153 void BindReactor(ClientBidiReactor<Request, Response>* reactor) {
154 reactor->BindStream(this);
155 }
156 };
157
158 template <class Response>
159 class ClientCallbackReader {
160 public:
~ClientCallbackReader()161 virtual ~ClientCallbackReader() {}
162 virtual void StartCall() = 0;
163 virtual void Read(Response* resp) = 0;
164 virtual void AddHold(int holds) = 0;
165 virtual void RemoveHold() = 0;
166
167 protected:
BindReactor(ClientReadReactor<Response> * reactor)168 void BindReactor(ClientReadReactor<Response>* reactor) {
169 reactor->BindReader(this);
170 }
171 };
172
173 template <class Request>
174 class ClientCallbackWriter {
175 public:
~ClientCallbackWriter()176 virtual ~ClientCallbackWriter() {}
177 virtual void StartCall() = 0;
Write(const Request * req)178 void Write(const Request* req) { Write(req, ::grpc::WriteOptions()); }
179 virtual void Write(const Request* req, ::grpc::WriteOptions options) = 0;
WriteLast(const Request * req,::grpc::WriteOptions options)180 void WriteLast(const Request* req, ::grpc::WriteOptions options) {
181 Write(req, options.set_last_message());
182 }
183 virtual void WritesDone() = 0;
184
185 virtual void AddHold(int holds) = 0;
186 virtual void RemoveHold() = 0;
187
188 protected:
BindReactor(ClientWriteReactor<Request> * reactor)189 void BindReactor(ClientWriteReactor<Request>* reactor) {
190 reactor->BindWriter(this);
191 }
192 };
193
194 class ClientCallbackUnary {
195 public:
~ClientCallbackUnary()196 virtual ~ClientCallbackUnary() {}
197 virtual void StartCall() = 0;
198
199 protected:
200 void BindReactor(ClientUnaryReactor* reactor);
201 };
202
203 // The following classes are the reactor interfaces that are to be implemented
204 // by the user. They are passed in to the library as an argument to a call on a
205 // stub (either a codegen-ed call or a generic call). The streaming RPC is
206 // activated by calling StartCall, possibly after initiating StartRead,
207 // StartWrite, or AddHold operations on the streaming object. Note that none of
// the classes are pure; all reactions have a default empty reaction so that the
// user class only needs to override those reactions that it cares about.
210 // The reactor must be passed to the stub invocation before any of the below
211 // operations can be called.
212
213 /// \a ClientBidiReactor is the interface for a bidirectional streaming RPC.
214 template <class Request, class Response>
215 class ClientBidiReactor : public internal::ClientReactor {
216 public:
~ClientBidiReactor()217 virtual ~ClientBidiReactor() {}
218
219 /// Activate the RPC and initiate any reads or writes that have been Start'ed
220 /// before this call. All streaming RPCs issued by the client MUST have
221 /// StartCall invoked on them (even if they are canceled) as this call is the
222 /// activation of their lifecycle.
StartCall()223 void StartCall() { stream_->StartCall(); }
224
225 /// Initiate a read operation (or post it for later initiation if StartCall
226 /// has not yet been invoked).
227 ///
228 /// \param[out] resp Where to eventually store the read message. Valid when
229 /// the library calls OnReadDone
StartRead(Response * resp)230 void StartRead(Response* resp) { stream_->Read(resp); }
231
232 /// Initiate a write operation (or post it for later initiation if StartCall
233 /// has not yet been invoked).
234 ///
235 /// \param[in] req The message to be written. The library does not take
236 /// ownership but the caller must ensure that the message is
237 /// not deleted or modified until OnWriteDone is called.
StartWrite(const Request * req)238 void StartWrite(const Request* req) {
239 StartWrite(req, ::grpc::WriteOptions());
240 }
241
242 /// Initiate/post a write operation with specified options.
243 ///
244 /// \param[in] req The message to be written. The library does not take
245 /// ownership but the caller must ensure that the message is
246 /// not deleted or modified until OnWriteDone is called.
247 /// \param[in] options The WriteOptions to use for writing this message
StartWrite(const Request * req,::grpc::WriteOptions options)248 void StartWrite(const Request* req, ::grpc::WriteOptions options) {
249 stream_->Write(req, std::move(options));
250 }
251
252 /// Initiate/post a write operation with specified options and an indication
253 /// that this is the last write (like StartWrite and StartWritesDone, merged).
254 /// Note that calling this means that no more calls to StartWrite,
255 /// StartWriteLast, or StartWritesDone are allowed.
256 ///
257 /// \param[in] req The message to be written. The library does not take
258 /// ownership but the caller must ensure that the message is
259 /// not deleted or modified until OnWriteDone is called.
260 /// \param[in] options The WriteOptions to use for writing this message
StartWriteLast(const Request * req,::grpc::WriteOptions options)261 void StartWriteLast(const Request* req, ::grpc::WriteOptions options) {
262 StartWrite(req, std::move(options.set_last_message()));
263 }
264
265 /// Indicate that the RPC will have no more write operations. This can only be
266 /// issued once for a given RPC. This is not required or allowed if
267 /// StartWriteLast is used since that already has the same implication.
268 /// Note that calling this means that no more calls to StartWrite,
269 /// StartWriteLast, or StartWritesDone are allowed.
StartWritesDone()270 void StartWritesDone() { stream_->WritesDone(); }
271
272 /// Holds are needed if (and only if) this stream has operations that take
273 /// place on it after StartCall but from outside one of the reactions
274 /// (OnReadDone, etc). This is _not_ a common use of the streaming API.
275 ///
276 /// Holds must be added before calling StartCall. If a stream still has a hold
277 /// in place, its resources will not be destroyed even if the status has
278 /// already come in from the wire and there are currently no active callbacks
279 /// outstanding. Similarly, the stream will not call OnDone if there are still
280 /// holds on it.
281 ///
282 /// For example, if a StartRead or StartWrite operation is going to be
283 /// initiated from elsewhere in the application, the application should call
284 /// AddHold or AddMultipleHolds before StartCall. If there is going to be,
285 /// for example, a read-flow and a write-flow taking place outside the
286 /// reactions, then call AddMultipleHolds(2) before StartCall. When the
287 /// application knows that it won't issue any more read operations (such as
288 /// when a read comes back as not ok), it should issue a RemoveHold(). It
289 /// should also call RemoveHold() again after it does StartWriteLast or
290 /// StartWritesDone that indicates that there will be no more write ops.
291 /// The number of RemoveHold calls must match the total number of AddHold
292 /// calls plus the number of holds added by AddMultipleHolds.
293 /// The argument to AddMultipleHolds must be positive.
AddHold()294 void AddHold() { AddMultipleHolds(1); }
AddMultipleHolds(int holds)295 void AddMultipleHolds(int holds) {
296 GPR_CODEGEN_DEBUG_ASSERT(holds > 0);
297 stream_->AddHold(holds);
298 }
RemoveHold()299 void RemoveHold() { stream_->RemoveHold(); }
300
301 /// Notifies the application that all operations associated with this RPC
302 /// have completed and all Holds have been removed. OnDone provides the RPC
303 /// status outcome for both successful and failed RPCs and will be called in
304 /// all cases. If it is not called, it indicates an application-level problem
305 /// (like failure to remove a hold).
306 ///
307 /// \param[in] s The status outcome of this RPC
OnDone(const::grpc::Status &)308 void OnDone(const ::grpc::Status& /*s*/) override {}
309
310 /// Notifies the application that a read of initial metadata from the
311 /// server is done. If the application chooses not to implement this method,
312 /// it can assume that the initial metadata has been read before the first
313 /// call of OnReadDone or OnDone.
314 ///
315 /// \param[in] ok Was the initial metadata read successfully? If false, no
316 /// new read/write operation will succeed, and any further
317 /// Start* operations should not be called.
OnReadInitialMetadataDone(bool)318 virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
319
320 /// Notifies the application that a StartRead operation completed.
321 ///
322 /// \param[in] ok Was it successful? If false, no new read/write operation
323 /// will succeed, and any further Start* should not be called.
OnReadDone(bool)324 virtual void OnReadDone(bool /*ok*/) {}
325
326 /// Notifies the application that a StartWrite or StartWriteLast operation
327 /// completed.
328 ///
329 /// \param[in] ok Was it successful? If false, no new read/write operation
330 /// will succeed, and any further Start* should not be called.
OnWriteDone(bool)331 virtual void OnWriteDone(bool /*ok*/) {}
332
333 /// Notifies the application that a StartWritesDone operation completed. Note
334 /// that this is only used on explicit StartWritesDone operations and not for
335 /// those that are implicitly invoked as part of a StartWriteLast.
336 ///
337 /// \param[in] ok Was it successful? If false, the application will later see
338 /// the failure reflected as a bad status in OnDone and no
339 /// further Start* should be called.
OnWritesDoneDone(bool)340 virtual void OnWritesDoneDone(bool /*ok*/) {}
341
342 private:
343 friend class ClientCallbackReaderWriter<Request, Response>;
BindStream(ClientCallbackReaderWriter<Request,Response> * stream)344 void BindStream(ClientCallbackReaderWriter<Request, Response>* stream) {
345 stream_ = stream;
346 }
347 ClientCallbackReaderWriter<Request, Response>* stream_;
348 };
349
350 /// \a ClientReadReactor is the interface for a server-streaming RPC.
351 /// All public methods behave as in ClientBidiReactor.
352 template <class Response>
353 class ClientReadReactor : public internal::ClientReactor {
354 public:
~ClientReadReactor()355 virtual ~ClientReadReactor() {}
356
StartCall()357 void StartCall() { reader_->StartCall(); }
StartRead(Response * resp)358 void StartRead(Response* resp) { reader_->Read(resp); }
359
AddHold()360 void AddHold() { AddMultipleHolds(1); }
AddMultipleHolds(int holds)361 void AddMultipleHolds(int holds) {
362 GPR_CODEGEN_DEBUG_ASSERT(holds > 0);
363 reader_->AddHold(holds);
364 }
RemoveHold()365 void RemoveHold() { reader_->RemoveHold(); }
366
OnDone(const::grpc::Status &)367 void OnDone(const ::grpc::Status& /*s*/) override {}
OnReadInitialMetadataDone(bool)368 virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
OnReadDone(bool)369 virtual void OnReadDone(bool /*ok*/) {}
370
371 private:
372 friend class ClientCallbackReader<Response>;
BindReader(ClientCallbackReader<Response> * reader)373 void BindReader(ClientCallbackReader<Response>* reader) { reader_ = reader; }
374 ClientCallbackReader<Response>* reader_;
375 };
376
377 /// \a ClientWriteReactor is the interface for a client-streaming RPC.
378 /// All public methods behave as in ClientBidiReactor.
379 template <class Request>
380 class ClientWriteReactor : public internal::ClientReactor {
381 public:
~ClientWriteReactor()382 virtual ~ClientWriteReactor() {}
383
StartCall()384 void StartCall() { writer_->StartCall(); }
StartWrite(const Request * req)385 void StartWrite(const Request* req) {
386 StartWrite(req, ::grpc::WriteOptions());
387 }
StartWrite(const Request * req,::grpc::WriteOptions options)388 void StartWrite(const Request* req, ::grpc::WriteOptions options) {
389 writer_->Write(req, std::move(options));
390 }
StartWriteLast(const Request * req,::grpc::WriteOptions options)391 void StartWriteLast(const Request* req, ::grpc::WriteOptions options) {
392 StartWrite(req, std::move(options.set_last_message()));
393 }
StartWritesDone()394 void StartWritesDone() { writer_->WritesDone(); }
395
AddHold()396 void AddHold() { AddMultipleHolds(1); }
AddMultipleHolds(int holds)397 void AddMultipleHolds(int holds) {
398 GPR_CODEGEN_DEBUG_ASSERT(holds > 0);
399 writer_->AddHold(holds);
400 }
RemoveHold()401 void RemoveHold() { writer_->RemoveHold(); }
402
OnDone(const::grpc::Status &)403 void OnDone(const ::grpc::Status& /*s*/) override {}
OnReadInitialMetadataDone(bool)404 virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
OnWriteDone(bool)405 virtual void OnWriteDone(bool /*ok*/) {}
OnWritesDoneDone(bool)406 virtual void OnWritesDoneDone(bool /*ok*/) {}
407
408 private:
409 friend class ClientCallbackWriter<Request>;
BindWriter(ClientCallbackWriter<Request> * writer)410 void BindWriter(ClientCallbackWriter<Request>* writer) { writer_ = writer; }
411
412 ClientCallbackWriter<Request>* writer_;
413 };
414
415 /// \a ClientUnaryReactor is a reactor-style interface for a unary RPC.
416 /// This is _not_ a common way of invoking a unary RPC. In practice, this
417 /// option should be used only if the unary RPC wants to receive initial
418 /// metadata without waiting for the response to complete. Most deployments of
419 /// RPC systems do not use this option, but it is needed for generality.
420 /// All public methods behave as in ClientBidiReactor.
421 /// StartCall is included for consistency with the other reactor flavors: even
422 /// though there are no StartRead or StartWrite operations to queue before the
423 /// call (that is part of the unary call itself) and there is no reactor object
424 /// being created as a result of this call, we keep a consistent 2-phase
425 /// initiation API among all the reactor flavors.
426 class ClientUnaryReactor : public internal::ClientReactor {
427 public:
~ClientUnaryReactor()428 virtual ~ClientUnaryReactor() {}
429
StartCall()430 void StartCall() { call_->StartCall(); }
OnDone(const::grpc::Status &)431 void OnDone(const ::grpc::Status& /*s*/) override {}
OnReadInitialMetadataDone(bool)432 virtual void OnReadInitialMetadataDone(bool /*ok*/) {}
433
434 private:
435 friend class ClientCallbackUnary;
BindCall(ClientCallbackUnary * call)436 void BindCall(ClientCallbackUnary* call) { call_ = call; }
437 ClientCallbackUnary* call_;
438 };
439
440 // Define function out-of-line from class to avoid forward declaration issue
BindReactor(ClientUnaryReactor * reactor)441 inline void ClientCallbackUnary::BindReactor(ClientUnaryReactor* reactor) {
442 reactor->BindCall(this);
443 }
444
445 namespace internal {
446
447 // Forward declare factory classes for friendship
448 template <class Request, class Response>
449 class ClientCallbackReaderWriterFactory;
450 template <class Response>
451 class ClientCallbackReaderFactory;
452 template <class Request>
453 class ClientCallbackWriterFactory;
454
455 template <class Request, class Response>
456 class ClientCallbackReaderWriterImpl
457 : public ClientCallbackReaderWriter<Request, Response> {
458 public:
459 // always allocated against a call arena, no memory free required
delete(void *,std::size_t size)460 static void operator delete(void* /*ptr*/, std::size_t size) {
461 GPR_CODEGEN_ASSERT(size == sizeof(ClientCallbackReaderWriterImpl));
462 }
463
464 // This operator should never be called as the memory should be freed as part
465 // of the arena destruction. It only exists to provide a matching operator
466 // delete to the operator new so that some compilers will not complain (see
467 // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
468 // there are no tests catching the compiler warning.
delete(void *,void *)469 static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
470
StartCall()471 void StartCall() override {
472 // This call initiates two batches, plus any backlog, each with a callback
473 // 1. Send initial metadata (unless corked) + recv initial metadata
474 // 2. Any read backlog
475 // 3. Any write backlog
476 // 4. Recv trailing metadata (unless corked)
477 if (!start_corked_) {
478 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
479 context_->initial_metadata_flags());
480 }
481
482 call_.PerformOps(&start_ops_);
483
484 {
485 grpc::internal::MutexLock lock(&start_mu_);
486
487 if (backlog_.read_ops) {
488 call_.PerformOps(&read_ops_);
489 }
490 if (backlog_.write_ops) {
491 call_.PerformOps(&write_ops_);
492 }
493 if (backlog_.writes_done_ops) {
494 call_.PerformOps(&writes_done_ops_);
495 }
496 call_.PerformOps(&finish_ops_);
497 // The last thing in this critical section is to set started_ so that it
498 // can be used lock-free as well.
499 started_.store(true, std::memory_order_release);
500 }
501 // MaybeFinish outside the lock to make sure that destruction of this object
502 // doesn't take place while holding the lock (which would cause the lock to
503 // be released after destruction)
504 this->MaybeFinish(/*from_reaction=*/false);
505 }
506
Read(Response * msg)507 void Read(Response* msg) override {
508 read_ops_.RecvMessage(msg);
509 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
510 if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
511 grpc::internal::MutexLock lock(&start_mu_);
512 if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
513 backlog_.read_ops = true;
514 return;
515 }
516 }
517 call_.PerformOps(&read_ops_);
518 }
519
Write(const Request * msg,::grpc::WriteOptions options)520 void Write(const Request* msg, ::grpc::WriteOptions options) override {
521 if (options.is_last_message()) {
522 options.set_buffer_hint();
523 write_ops_.ClientSendClose();
524 }
525 // TODO(vjpai): don't assert
526 GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(msg, options).ok());
527 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
528 if (GPR_UNLIKELY(corked_write_needed_)) {
529 write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
530 context_->initial_metadata_flags());
531 corked_write_needed_ = false;
532 }
533
534 if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
535 grpc::internal::MutexLock lock(&start_mu_);
536 if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
537 backlog_.write_ops = true;
538 return;
539 }
540 }
541 call_.PerformOps(&write_ops_);
542 }
WritesDone()543 void WritesDone() override {
544 writes_done_ops_.ClientSendClose();
545 writes_done_tag_.Set(call_.call(),
546 [this](bool ok) {
547 reactor_->OnWritesDoneDone(ok);
548 MaybeFinish(/*from_reaction=*/true);
549 },
550 &writes_done_ops_, /*can_inline=*/false);
551 writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
552 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
553 if (GPR_UNLIKELY(corked_write_needed_)) {
554 writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
555 context_->initial_metadata_flags());
556 corked_write_needed_ = false;
557 }
558 if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
559 grpc::internal::MutexLock lock(&start_mu_);
560 if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
561 backlog_.writes_done_ops = true;
562 return;
563 }
564 }
565 call_.PerformOps(&writes_done_ops_);
566 }
567
AddHold(int holds)568 void AddHold(int holds) override {
569 callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
570 }
RemoveHold()571 void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
572
573 private:
574 friend class ClientCallbackReaderWriterFactory<Request, Response>;
575
ClientCallbackReaderWriterImpl(grpc::internal::Call call,::grpc_impl::ClientContext * context,ClientBidiReactor<Request,Response> * reactor)576 ClientCallbackReaderWriterImpl(grpc::internal::Call call,
577 ::grpc_impl::ClientContext* context,
578 ClientBidiReactor<Request, Response>* reactor)
579 : context_(context),
580 call_(call),
581 reactor_(reactor),
582 start_corked_(context_->initial_metadata_corked_),
583 corked_write_needed_(start_corked_) {
584 this->BindReactor(reactor);
585
586 // Set up the unchanging parts of the start, read, and write tags and ops.
587 start_tag_.Set(call_.call(),
588 [this](bool ok) {
589 reactor_->OnReadInitialMetadataDone(ok);
590 MaybeFinish(/*from_reaction=*/true);
591 },
592 &start_ops_, /*can_inline=*/false);
593 start_ops_.RecvInitialMetadata(context_);
594 start_ops_.set_core_cq_tag(&start_tag_);
595
596 write_tag_.Set(call_.call(),
597 [this](bool ok) {
598 reactor_->OnWriteDone(ok);
599 MaybeFinish(/*from_reaction=*/true);
600 },
601 &write_ops_, /*can_inline=*/false);
602 write_ops_.set_core_cq_tag(&write_tag_);
603
604 read_tag_.Set(call_.call(),
605 [this](bool ok) {
606 reactor_->OnReadDone(ok);
607 MaybeFinish(/*from_reaction=*/true);
608 },
609 &read_ops_, /*can_inline=*/false);
610 read_ops_.set_core_cq_tag(&read_tag_);
611
612 // Also set up the Finish tag and op set.
613 finish_tag_.Set(
614 call_.call(),
615 [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
616 &finish_ops_,
617 /*can_inline=*/false);
618 finish_ops_.ClientRecvStatus(context_, &finish_status_);
619 finish_ops_.set_core_cq_tag(&finish_tag_);
620 }
621
622 // MaybeFinish can be called from reactions or from user-initiated operations
623 // like StartCall or RemoveHold. If this is the last operation or hold on this
624 // object, it will invoke the OnDone reaction. If MaybeFinish was called from
625 // a reaction, it can call OnDone directly. If not, it would need to schedule
626 // OnDone onto an executor thread to avoid the possibility of deadlocking with
627 // any locks in the user code that invoked it.
MaybeFinish(bool from_reaction)628 void MaybeFinish(bool from_reaction) {
629 if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
630 1, std::memory_order_acq_rel) == 1)) {
631 ::grpc::Status s = std::move(finish_status_);
632 auto* reactor = reactor_;
633 auto* call = call_.call();
634 this->~ClientCallbackReaderWriterImpl();
635 ::grpc::g_core_codegen_interface->grpc_call_unref(call);
636 if (GPR_LIKELY(from_reaction)) {
637 reactor->OnDone(s);
638 } else {
639 reactor->InternalScheduleOnDone(std::move(s));
640 }
641 }
642 }
643
644 ::grpc_impl::ClientContext* const context_;
645 grpc::internal::Call call_;
646 ClientBidiReactor<Request, Response>* const reactor_;
647
648 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
649 grpc::internal::CallOpRecvInitialMetadata>
650 start_ops_;
651 grpc::internal::CallbackWithSuccessTag start_tag_;
652 const bool start_corked_;
653 bool corked_write_needed_; // no lock needed since only accessed in
654 // Write/WritesDone which cannot be concurrent
655
656 grpc::internal::CallOpSet<grpc::internal::CallOpClientRecvStatus> finish_ops_;
657 grpc::internal::CallbackWithSuccessTag finish_tag_;
658 ::grpc::Status finish_status_;
659
660 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
661 grpc::internal::CallOpSendMessage,
662 grpc::internal::CallOpClientSendClose>
663 write_ops_;
664 grpc::internal::CallbackWithSuccessTag write_tag_;
665
666 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
667 grpc::internal::CallOpClientSendClose>
668 writes_done_ops_;
669 grpc::internal::CallbackWithSuccessTag writes_done_tag_;
670
671 grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<Response>>
672 read_ops_;
673 grpc::internal::CallbackWithSuccessTag read_tag_;
674
675 struct StartCallBacklog {
676 bool write_ops = false;
677 bool writes_done_ops = false;
678 bool read_ops = false;
679 };
680 StartCallBacklog backlog_ /* GUARDED_BY(start_mu_) */;
681
682 // Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish
683 std::atomic<intptr_t> callbacks_outstanding_{3};
684 std::atomic_bool started_{false};
685 grpc::internal::Mutex start_mu_;
686 };
687
688 template <class Request, class Response>
689 class ClientCallbackReaderWriterFactory {
690 public:
Create(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc_impl::ClientContext * context,ClientBidiReactor<Request,Response> * reactor)691 static void Create(::grpc::ChannelInterface* channel,
692 const ::grpc::internal::RpcMethod& method,
693 ::grpc_impl::ClientContext* context,
694 ClientBidiReactor<Request, Response>* reactor) {
695 grpc::internal::Call call =
696 channel->CreateCall(method, context, channel->CallbackCQ());
697
698 ::grpc::g_core_codegen_interface->grpc_call_ref(call.call());
699 new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
700 call.call(), sizeof(ClientCallbackReaderWriterImpl<Request, Response>)))
701 ClientCallbackReaderWriterImpl<Request, Response>(call, context,
702 reactor);
703 }
704 };
705
706 template <class Response>
707 class ClientCallbackReaderImpl : public ClientCallbackReader<Response> {
708 public:
709 // always allocated against a call arena, no memory free required
delete(void *,std::size_t size)710 static void operator delete(void* /*ptr*/, std::size_t size) {
711 GPR_CODEGEN_ASSERT(size == sizeof(ClientCallbackReaderImpl));
712 }
713
714 // This operator should never be called as the memory should be freed as part
715 // of the arena destruction. It only exists to provide a matching operator
716 // delete to the operator new so that some compilers will not complain (see
717 // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
718 // there are no tests catching the compiler warning.
delete(void *,void *)719 static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
720
StartCall()721 void StartCall() override {
722 // This call initiates two batches, plus any backlog, each with a callback
723 // 1. Send initial metadata (unless corked) + recv initial metadata
724 // 2. Any backlog
725 // 3. Recv trailing metadata
726
727 start_tag_.Set(call_.call(),
728 [this](bool ok) {
729 reactor_->OnReadInitialMetadataDone(ok);
730 MaybeFinish(/*from_reaction=*/true);
731 },
732 &start_ops_, /*can_inline=*/false);
733 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
734 context_->initial_metadata_flags());
735 start_ops_.RecvInitialMetadata(context_);
736 start_ops_.set_core_cq_tag(&start_tag_);
737 call_.PerformOps(&start_ops_);
738
739 // Also set up the read tag so it doesn't have to be set up each time
740 read_tag_.Set(call_.call(),
741 [this](bool ok) {
742 reactor_->OnReadDone(ok);
743 MaybeFinish(/*from_reaction=*/true);
744 },
745 &read_ops_, /*can_inline=*/false);
746 read_ops_.set_core_cq_tag(&read_tag_);
747
748 {
749 grpc::internal::MutexLock lock(&start_mu_);
750 if (backlog_.read_ops) {
751 call_.PerformOps(&read_ops_);
752 }
753 started_.store(true, std::memory_order_release);
754 }
755
756 finish_tag_.Set(
757 call_.call(),
758 [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
759 &finish_ops_, /*can_inline=*/false);
760 finish_ops_.ClientRecvStatus(context_, &finish_status_);
761 finish_ops_.set_core_cq_tag(&finish_tag_);
762 call_.PerformOps(&finish_ops_);
763 }
764
Read(Response * msg)765 void Read(Response* msg) override {
766 read_ops_.RecvMessage(msg);
767 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
768 if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
769 grpc::internal::MutexLock lock(&start_mu_);
770 if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
771 backlog_.read_ops = true;
772 return;
773 }
774 }
775 call_.PerformOps(&read_ops_);
776 }
777
AddHold(int holds)778 void AddHold(int holds) override {
779 callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
780 }
RemoveHold()781 void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
782
783 private:
784 friend class ClientCallbackReaderFactory<Response>;
785
786 template <class Request>
ClientCallbackReaderImpl(::grpc::internal::Call call,::grpc_impl::ClientContext * context,Request * request,ClientReadReactor<Response> * reactor)787 ClientCallbackReaderImpl(::grpc::internal::Call call,
788 ::grpc_impl::ClientContext* context,
789 Request* request,
790 ClientReadReactor<Response>* reactor)
791 : context_(context), call_(call), reactor_(reactor) {
792 this->BindReactor(reactor);
793 // TODO(vjpai): don't assert
794 GPR_CODEGEN_ASSERT(start_ops_.SendMessagePtr(request).ok());
795 start_ops_.ClientSendClose();
796 }
797
798 // MaybeFinish behaves as in ClientCallbackReaderWriterImpl.
MaybeFinish(bool from_reaction)799 void MaybeFinish(bool from_reaction) {
800 if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
801 1, std::memory_order_acq_rel) == 1)) {
802 ::grpc::Status s = std::move(finish_status_);
803 auto* reactor = reactor_;
804 auto* call = call_.call();
805 this->~ClientCallbackReaderImpl();
806 ::grpc::g_core_codegen_interface->grpc_call_unref(call);
807 if (GPR_LIKELY(from_reaction)) {
808 reactor->OnDone(s);
809 } else {
810 reactor->InternalScheduleOnDone(std::move(s));
811 }
812 }
813 }
814
815 ::grpc_impl::ClientContext* const context_;
816 grpc::internal::Call call_;
817 ClientReadReactor<Response>* const reactor_;
818
819 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
820 grpc::internal::CallOpSendMessage,
821 grpc::internal::CallOpClientSendClose,
822 grpc::internal::CallOpRecvInitialMetadata>
823 start_ops_;
824 grpc::internal::CallbackWithSuccessTag start_tag_;
825
826 grpc::internal::CallOpSet<grpc::internal::CallOpClientRecvStatus> finish_ops_;
827 grpc::internal::CallbackWithSuccessTag finish_tag_;
828 ::grpc::Status finish_status_;
829
830 grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<Response>>
831 read_ops_;
832 grpc::internal::CallbackWithSuccessTag read_tag_;
833
834 struct StartCallBacklog {
835 bool read_ops = false;
836 };
837 StartCallBacklog backlog_ /* GUARDED_BY(start_mu_) */;
838
839 // Minimum of 2 callbacks to pre-register for start and finish
840 std::atomic<intptr_t> callbacks_outstanding_{2};
841 std::atomic_bool started_{false};
842 grpc::internal::Mutex start_mu_;
843 };
844
845 template <class Response>
846 class ClientCallbackReaderFactory {
847 public:
848 template <class Request>
Create(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc_impl::ClientContext * context,const Request * request,ClientReadReactor<Response> * reactor)849 static void Create(::grpc::ChannelInterface* channel,
850 const ::grpc::internal::RpcMethod& method,
851 ::grpc_impl::ClientContext* context,
852 const Request* request,
853 ClientReadReactor<Response>* reactor) {
854 grpc::internal::Call call =
855 channel->CreateCall(method, context, channel->CallbackCQ());
856
857 ::grpc::g_core_codegen_interface->grpc_call_ref(call.call());
858 new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
859 call.call(), sizeof(ClientCallbackReaderImpl<Response>)))
860 ClientCallbackReaderImpl<Response>(call, context, request, reactor);
861 }
862 };
863
864 template <class Request>
865 class ClientCallbackWriterImpl : public ClientCallbackWriter<Request> {
866 public:
867 // always allocated against a call arena, no memory free required
delete(void *,std::size_t size)868 static void operator delete(void* /*ptr*/, std::size_t size) {
869 GPR_CODEGEN_ASSERT(size == sizeof(ClientCallbackWriterImpl));
870 }
871
872 // This operator should never be called as the memory should be freed as part
873 // of the arena destruction. It only exists to provide a matching operator
874 // delete to the operator new so that some compilers will not complain (see
875 // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
876 // there are no tests catching the compiler warning.
delete(void *,void *)877 static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
878
StartCall()879 void StartCall() override {
880 // This call initiates two batches, plus any backlog, each with a callback
881 // 1. Send initial metadata (unless corked) + recv initial metadata
882 // 2. Any backlog
883 // 3. Recv trailing metadata
884
885 if (!start_corked_) {
886 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
887 context_->initial_metadata_flags());
888 }
889 call_.PerformOps(&start_ops_);
890
891 {
892 grpc::internal::MutexLock lock(&start_mu_);
893
894 if (backlog_.write_ops) {
895 call_.PerformOps(&write_ops_);
896 }
897 if (backlog_.writes_done_ops) {
898 call_.PerformOps(&writes_done_ops_);
899 }
900 call_.PerformOps(&finish_ops_);
901 // The last thing in this critical section is to set started_ so that it
902 // can be used lock-free as well.
903 started_.store(true, std::memory_order_release);
904 }
905 // MaybeFinish outside the lock to make sure that destruction of this object
906 // doesn't take place while holding the lock (which would cause the lock to
907 // be released after destruction)
908 this->MaybeFinish(/*from_reaction=*/false);
909 }
910
Write(const Request * msg,::grpc::WriteOptions options)911 void Write(const Request* msg, ::grpc::WriteOptions options) override {
912 if (GPR_UNLIKELY(options.is_last_message())) {
913 options.set_buffer_hint();
914 write_ops_.ClientSendClose();
915 }
916 // TODO(vjpai): don't assert
917 GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(msg, options).ok());
918 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
919
920 if (GPR_UNLIKELY(corked_write_needed_)) {
921 write_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
922 context_->initial_metadata_flags());
923 corked_write_needed_ = false;
924 }
925
926 if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
927 grpc::internal::MutexLock lock(&start_mu_);
928 if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
929 backlog_.write_ops = true;
930 return;
931 }
932 }
933 call_.PerformOps(&write_ops_);
934 }
935
WritesDone()936 void WritesDone() override {
937 writes_done_ops_.ClientSendClose();
938 writes_done_tag_.Set(call_.call(),
939 [this](bool ok) {
940 reactor_->OnWritesDoneDone(ok);
941 MaybeFinish(/*from_reaction=*/true);
942 },
943 &writes_done_ops_, /*can_inline=*/false);
944 writes_done_ops_.set_core_cq_tag(&writes_done_tag_);
945 callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
946
947 if (GPR_UNLIKELY(corked_write_needed_)) {
948 writes_done_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
949 context_->initial_metadata_flags());
950 corked_write_needed_ = false;
951 }
952
953 if (GPR_UNLIKELY(!started_.load(std::memory_order_acquire))) {
954 grpc::internal::MutexLock lock(&start_mu_);
955 if (GPR_LIKELY(!started_.load(std::memory_order_relaxed))) {
956 backlog_.writes_done_ops = true;
957 return;
958 }
959 }
960 call_.PerformOps(&writes_done_ops_);
961 }
962
AddHold(int holds)963 void AddHold(int holds) override {
964 callbacks_outstanding_.fetch_add(holds, std::memory_order_relaxed);
965 }
RemoveHold()966 void RemoveHold() override { MaybeFinish(/*from_reaction=*/false); }
967
968 private:
969 friend class ClientCallbackWriterFactory<Request>;
970
971 template <class Response>
ClientCallbackWriterImpl(::grpc::internal::Call call,::grpc_impl::ClientContext * context,Response * response,ClientWriteReactor<Request> * reactor)972 ClientCallbackWriterImpl(::grpc::internal::Call call,
973 ::grpc_impl::ClientContext* context,
974 Response* response,
975 ClientWriteReactor<Request>* reactor)
976 : context_(context),
977 call_(call),
978 reactor_(reactor),
979 start_corked_(context_->initial_metadata_corked_),
980 corked_write_needed_(start_corked_) {
981 this->BindReactor(reactor);
982
983 // Set up the unchanging parts of the start and write tags and ops.
984 start_tag_.Set(call_.call(),
985 [this](bool ok) {
986 reactor_->OnReadInitialMetadataDone(ok);
987 MaybeFinish(/*from_reaction=*/true);
988 },
989 &start_ops_, /*can_inline=*/false);
990 start_ops_.RecvInitialMetadata(context_);
991 start_ops_.set_core_cq_tag(&start_tag_);
992
993 write_tag_.Set(call_.call(),
994 [this](bool ok) {
995 reactor_->OnWriteDone(ok);
996 MaybeFinish(/*from_reaction=*/true);
997 },
998 &write_ops_, /*can_inline=*/false);
999 write_ops_.set_core_cq_tag(&write_tag_);
1000
1001 // Also set up the Finish tag and op set.
1002 finish_ops_.RecvMessage(response);
1003 finish_ops_.AllowNoMessage();
1004 finish_tag_.Set(
1005 call_.call(),
1006 [this](bool /*ok*/) { MaybeFinish(/*from_reaction=*/true); },
1007 &finish_ops_,
1008 /*can_inline=*/false);
1009 finish_ops_.ClientRecvStatus(context_, &finish_status_);
1010 finish_ops_.set_core_cq_tag(&finish_tag_);
1011 }
1012
1013 // MaybeFinish behaves as in ClientCallbackReaderWriterImpl.
MaybeFinish(bool from_reaction)1014 void MaybeFinish(bool from_reaction) {
1015 if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
1016 1, std::memory_order_acq_rel) == 1)) {
1017 ::grpc::Status s = std::move(finish_status_);
1018 auto* reactor = reactor_;
1019 auto* call = call_.call();
1020 this->~ClientCallbackWriterImpl();
1021 ::grpc::g_core_codegen_interface->grpc_call_unref(call);
1022 if (GPR_LIKELY(from_reaction)) {
1023 reactor->OnDone(s);
1024 } else {
1025 reactor->InternalScheduleOnDone(std::move(s));
1026 }
1027 }
1028 }
1029
1030 ::grpc_impl::ClientContext* const context_;
1031 grpc::internal::Call call_;
1032 ClientWriteReactor<Request>* const reactor_;
1033
1034 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
1035 grpc::internal::CallOpRecvInitialMetadata>
1036 start_ops_;
1037 grpc::internal::CallbackWithSuccessTag start_tag_;
1038 const bool start_corked_;
1039 bool corked_write_needed_; // no lock needed since only accessed in
1040 // Write/WritesDone which cannot be concurrent
1041
1042 grpc::internal::CallOpSet<grpc::internal::CallOpGenericRecvMessage,
1043 grpc::internal::CallOpClientRecvStatus>
1044 finish_ops_;
1045 grpc::internal::CallbackWithSuccessTag finish_tag_;
1046 ::grpc::Status finish_status_;
1047
1048 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
1049 grpc::internal::CallOpSendMessage,
1050 grpc::internal::CallOpClientSendClose>
1051 write_ops_;
1052 grpc::internal::CallbackWithSuccessTag write_tag_;
1053
1054 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
1055 grpc::internal::CallOpClientSendClose>
1056 writes_done_ops_;
1057 grpc::internal::CallbackWithSuccessTag writes_done_tag_;
1058
1059 struct StartCallBacklog {
1060 bool write_ops = false;
1061 bool writes_done_ops = false;
1062 };
1063 StartCallBacklog backlog_ /* GUARDED_BY(start_mu_) */;
1064
1065 // Minimum of 3 callbacks to pre-register for start ops, StartCall, and finish
1066 std::atomic<intptr_t> callbacks_outstanding_{3};
1067 std::atomic_bool started_{false};
1068 grpc::internal::Mutex start_mu_;
1069 };
1070
1071 template <class Request>
1072 class ClientCallbackWriterFactory {
1073 public:
1074 template <class Response>
Create(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc_impl::ClientContext * context,Response * response,ClientWriteReactor<Request> * reactor)1075 static void Create(::grpc::ChannelInterface* channel,
1076 const ::grpc::internal::RpcMethod& method,
1077 ::grpc_impl::ClientContext* context, Response* response,
1078 ClientWriteReactor<Request>* reactor) {
1079 grpc::internal::Call call =
1080 channel->CreateCall(method, context, channel->CallbackCQ());
1081
1082 ::grpc::g_core_codegen_interface->grpc_call_ref(call.call());
1083 new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
1084 call.call(), sizeof(ClientCallbackWriterImpl<Request>)))
1085 ClientCallbackWriterImpl<Request>(call, context, response, reactor);
1086 }
1087 };
1088
1089 class ClientCallbackUnaryImpl final : public ClientCallbackUnary {
1090 public:
1091 // always allocated against a call arena, no memory free required
delete(void *,std::size_t size)1092 static void operator delete(void* /*ptr*/, std::size_t size) {
1093 GPR_CODEGEN_ASSERT(size == sizeof(ClientCallbackUnaryImpl));
1094 }
1095
1096 // This operator should never be called as the memory should be freed as part
1097 // of the arena destruction. It only exists to provide a matching operator
1098 // delete to the operator new so that some compilers will not complain (see
1099 // https://github.com/grpc/grpc/issues/11301) Note at the time of adding this
1100 // there are no tests catching the compiler warning.
delete(void *,void *)1101 static void operator delete(void*, void*) { GPR_CODEGEN_ASSERT(false); }
1102
StartCall()1103 void StartCall() override {
1104 // This call initiates two batches, each with a callback
1105 // 1. Send initial metadata + write + writes done + recv initial metadata
1106 // 2. Read message, recv trailing metadata
1107
1108 start_tag_.Set(call_.call(),
1109 [this](bool ok) {
1110 reactor_->OnReadInitialMetadataDone(ok);
1111 MaybeFinish();
1112 },
1113 &start_ops_, /*can_inline=*/false);
1114 start_ops_.SendInitialMetadata(&context_->send_initial_metadata_,
1115 context_->initial_metadata_flags());
1116 start_ops_.RecvInitialMetadata(context_);
1117 start_ops_.set_core_cq_tag(&start_tag_);
1118 call_.PerformOps(&start_ops_);
1119
1120 finish_tag_.Set(call_.call(), [this](bool /*ok*/) { MaybeFinish(); },
1121 &finish_ops_,
1122 /*can_inline=*/false);
1123 finish_ops_.ClientRecvStatus(context_, &finish_status_);
1124 finish_ops_.set_core_cq_tag(&finish_tag_);
1125 call_.PerformOps(&finish_ops_);
1126 }
1127
1128 private:
1129 friend class ClientCallbackUnaryFactory;
1130
1131 template <class Request, class Response>
ClientCallbackUnaryImpl(::grpc::internal::Call call,::grpc_impl::ClientContext * context,Request * request,Response * response,ClientUnaryReactor * reactor)1132 ClientCallbackUnaryImpl(::grpc::internal::Call call,
1133 ::grpc_impl::ClientContext* context, Request* request,
1134 Response* response, ClientUnaryReactor* reactor)
1135 : context_(context), call_(call), reactor_(reactor) {
1136 this->BindReactor(reactor);
1137 // TODO(vjpai): don't assert
1138 GPR_CODEGEN_ASSERT(start_ops_.SendMessagePtr(request).ok());
1139 start_ops_.ClientSendClose();
1140 finish_ops_.RecvMessage(response);
1141 finish_ops_.AllowNoMessage();
1142 }
1143
1144 // In the unary case, MaybeFinish is only ever invoked from a
1145 // library-initiated reaction, so it will just directly call OnDone if this is
1146 // the last reaction for this RPC.
MaybeFinish()1147 void MaybeFinish() {
1148 if (GPR_UNLIKELY(callbacks_outstanding_.fetch_sub(
1149 1, std::memory_order_acq_rel) == 1)) {
1150 ::grpc::Status s = std::move(finish_status_);
1151 auto* reactor = reactor_;
1152 auto* call = call_.call();
1153 this->~ClientCallbackUnaryImpl();
1154 ::grpc::g_core_codegen_interface->grpc_call_unref(call);
1155 reactor->OnDone(s);
1156 }
1157 }
1158
1159 ::grpc_impl::ClientContext* const context_;
1160 grpc::internal::Call call_;
1161 ClientUnaryReactor* const reactor_;
1162
1163 grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
1164 grpc::internal::CallOpSendMessage,
1165 grpc::internal::CallOpClientSendClose,
1166 grpc::internal::CallOpRecvInitialMetadata>
1167 start_ops_;
1168 grpc::internal::CallbackWithSuccessTag start_tag_;
1169
1170 grpc::internal::CallOpSet<grpc::internal::CallOpGenericRecvMessage,
1171 grpc::internal::CallOpClientRecvStatus>
1172 finish_ops_;
1173 grpc::internal::CallbackWithSuccessTag finish_tag_;
1174 ::grpc::Status finish_status_;
1175
1176 // This call will have 2 callbacks: start and finish
1177 std::atomic<intptr_t> callbacks_outstanding_{2};
1178 };
1179
1180 class ClientCallbackUnaryFactory {
1181 public:
1182 template <class Request, class Response>
Create(::grpc::ChannelInterface * channel,const::grpc::internal::RpcMethod & method,::grpc_impl::ClientContext * context,const Request * request,Response * response,ClientUnaryReactor * reactor)1183 static void Create(::grpc::ChannelInterface* channel,
1184 const ::grpc::internal::RpcMethod& method,
1185 ::grpc_impl::ClientContext* context,
1186 const Request* request, Response* response,
1187 ClientUnaryReactor* reactor) {
1188 grpc::internal::Call call =
1189 channel->CreateCall(method, context, channel->CallbackCQ());
1190
1191 ::grpc::g_core_codegen_interface->grpc_call_ref(call.call());
1192
1193 new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
1194 call.call(), sizeof(ClientCallbackUnaryImpl)))
1195 ClientCallbackUnaryImpl(call, context, request, response, reactor);
1196 }
1197 };
1198
1199 } // namespace internal
1200 } // namespace grpc_impl
1201 #endif // GRPCPP_IMPL_CODEGEN_CLIENT_CALLBACK_IMPL_H
1202