• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H
19 #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H
20 
21 #include <atomic>
22 #include <functional>
23 #include <type_traits>
24 
25 #include <grpcpp/impl/codegen/call.h>
26 #include <grpcpp/impl/codegen/call_op_set.h>
27 #include <grpcpp/impl/codegen/callback_common.h>
28 #include <grpcpp/impl/codegen/config.h>
29 #include <grpcpp/impl/codegen/core_codegen_interface.h>
30 #include <grpcpp/impl/codegen/message_allocator.h>
31 #include <grpcpp/impl/codegen/status.h>
32 #include <grpcpp/impl/codegen/sync.h>
33 
34 namespace grpc {
35 
36 // Declare base class of all reactors as internal
37 namespace internal {
38 
// Forward declarations of the callback method handler templates. Each of
// them is named as a friend of ServerReactor later in this header.
template <class Request, class Response>
class CallbackUnaryHandler;
template <class Request, class Response>
class CallbackClientStreamingHandler;
template <class Request, class Response>
class CallbackServerStreamingHandler;
template <class Request, class Response>
class CallbackBidiHandler;
48 
/// Base class of all server-side reactors. It declares the two reactions
/// that every reactor has, OnDone and OnCancel; both are pure virtual here.
class ServerReactor {
 public:
  virtual ~ServerReactor() = default;

  /// Pure virtual at this level; must be provided further down the hierarchy.
  virtual void OnDone() = 0;

  /// Pure virtual at this level; must be provided further down the hierarchy.
  virtual void OnCancel() = 0;

  // The following is not API. It is for internal use only and specifies whether
  // all reactions of this Reactor can be run without an extra executor
  // scheduling. This should only be used for internally-defined reactors with
  // trivial reactions.
  virtual bool InternalInlineable() { return false; }

 private:
  // The callback method handlers are granted access to private members.
  template <class Request, class Response>
  friend class CallbackUnaryHandler;
  template <class Request, class Response>
  friend class CallbackClientStreamingHandler;
  template <class Request, class Response>
  friend class CallbackServerStreamingHandler;
  template <class Request, class Response>
  friend class CallbackBidiHandler;
};
71 
72 /// The base class of ServerCallbackUnary etc.
73 class ServerCallbackCall {
74  public:
~ServerCallbackCall()75   virtual ~ServerCallbackCall() {}
76 
77   // This object is responsible for tracking when it is safe to call OnDone and
78   // OnCancel. OnDone should not be called until the method handler is complete,
79   // Finish has been called, the ServerContext CompletionOp (which tracks
80   // cancellation or successful completion) has completed, and all outstanding
81   // Read/Write actions have seen their reactions. OnCancel should not be called
82   // until after the method handler is done and the RPC has completed with a
83   // cancellation. This is tracked by counting how many of these conditions have
84   // been met and calling OnCancel when none remain unmet.
85 
86   // Public versions of MaybeDone: one where we don't know the reactor in
87   // advance (used for the ServerContext CompletionOp), and one for where we
88   // know the inlineability of the OnDone reaction. You should set the inline
89   // flag to true if either the Reactor is InternalInlineable() or if this
90   // callback is already being forced to run dispatched to an executor
91   // (typically because it contains additional work than just the MaybeDone).
92 
MaybeDone()93   void MaybeDone() {
94     if (GPR_UNLIKELY(Unref() == 1)) {
95       ScheduleOnDone(reactor()->InternalInlineable());
96     }
97   }
98 
MaybeDone(bool inline_ondone)99   void MaybeDone(bool inline_ondone) {
100     if (GPR_UNLIKELY(Unref() == 1)) {
101       ScheduleOnDone(inline_ondone);
102     }
103   }
104 
105   // Fast version called with known reactor passed in, used from derived
106   // classes, typically in non-cancel case
MaybeCallOnCancel(ServerReactor * reactor)107   void MaybeCallOnCancel(ServerReactor* reactor) {
108     if (GPR_UNLIKELY(UnblockCancellation())) {
109       CallOnCancel(reactor);
110     }
111   }
112 
113   // Slower version called from object that doesn't know the reactor a priori
114   // (such as the ServerContext CompletionOp which is formed before the
115   // reactor). This is used in cancel cases only, so it's ok to be slower and
116   // invoke a virtual function.
MaybeCallOnCancel()117   void MaybeCallOnCancel() {
118     if (GPR_UNLIKELY(UnblockCancellation())) {
119       CallOnCancel(reactor());
120     }
121   }
122 
123  protected:
124   /// Increases the reference count
Ref()125   void Ref() { callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); }
126 
127  private:
128   virtual ServerReactor* reactor() = 0;
129 
130   // CallOnDone performs the work required at completion of the RPC: invoking
131   // the OnDone function and doing all necessary cleanup. This function is only
132   // ever invoked on a fully-Unref'fed ServerCallbackCall.
133   virtual void CallOnDone() = 0;
134 
135   // If the OnDone reaction is inlineable, execute it inline. Otherwise send it
136   // to an executor.
137   void ScheduleOnDone(bool inline_ondone);
138 
139   // If the OnCancel reaction is inlineable, execute it inline. Otherwise send
140   // it to an executor.
141   void CallOnCancel(ServerReactor* reactor);
142 
143   // Implement the cancellation constraint counter. Return true if OnCancel
144   // should be called, false otherwise.
UnblockCancellation()145   bool UnblockCancellation() {
146     return on_cancel_conditions_remaining_.fetch_sub(
147                1, std::memory_order_acq_rel) == 1;
148   }
149 
150   /// Decreases the reference count and returns the previous value
Unref()151   int Unref() {
152     return callbacks_outstanding_.fetch_sub(1, std::memory_order_acq_rel);
153   }
154 
155   std::atomic_int on_cancel_conditions_remaining_{2};
156   std::atomic_int callbacks_outstanding_{
157       3};  // reserve for start, Finish, and CompletionOp
158 };
159 
160 template <class Request, class Response>
161 class DefaultMessageHolder
162     : public ::grpc::experimental::MessageHolder<Request, Response> {
163  public:
DefaultMessageHolder()164   DefaultMessageHolder() {
165     this->set_request(&request_obj_);
166     this->set_response(&response_obj_);
167   }
Release()168   void Release() override {
169     // the object is allocated in the call arena.
170     this->~DefaultMessageHolder<Request, Response>();
171   }
172 
173  private:
174   Request request_obj_;
175   Response response_obj_;
176 };
177 
178 }  // namespace internal
179 
// Forward declarations of the reactor classes so that the call/stream
// interfaces declared next can refer to them before they are defined.
class ServerUnaryReactor;
template <class Request>
class ServerReadReactor;
template <class Response>
class ServerWriteReactor;
template <class Request, class Response>
class ServerBidiReactor;
188 
189 // NOTE: The actual call/stream object classes are provided as API only to
190 // support mocking. There are no implementations of these class interfaces in
191 // the API.
192 class ServerCallbackUnary : public internal::ServerCallbackCall {
193  public:
~ServerCallbackUnary()194   ~ServerCallbackUnary() override {}
195   virtual void Finish(::grpc::Status s) = 0;
196   virtual void SendInitialMetadata() = 0;
197 
198  protected:
199   // Use a template rather than explicitly specifying ServerUnaryReactor to
200   // delay binding and avoid a circular forward declaration issue
201   template <class Reactor>
BindReactor(Reactor * reactor)202   void BindReactor(Reactor* reactor) {
203     reactor->InternalBindCall(this);
204   }
205 };
206 
207 template <class Request>
208 class ServerCallbackReader : public internal::ServerCallbackCall {
209  public:
~ServerCallbackReader()210   ~ServerCallbackReader() override {}
211   virtual void Finish(::grpc::Status s) = 0;
212   virtual void SendInitialMetadata() = 0;
213   virtual void Read(Request* msg) = 0;
214 
215  protected:
BindReactor(ServerReadReactor<Request> * reactor)216   void BindReactor(ServerReadReactor<Request>* reactor) {
217     reactor->InternalBindReader(this);
218   }
219 };
220 
221 template <class Response>
222 class ServerCallbackWriter : public internal::ServerCallbackCall {
223  public:
~ServerCallbackWriter()224   ~ServerCallbackWriter() override {}
225 
226   virtual void Finish(::grpc::Status s) = 0;
227   virtual void SendInitialMetadata() = 0;
228   virtual void Write(const Response* msg, ::grpc::WriteOptions options) = 0;
229   virtual void WriteAndFinish(const Response* msg, ::grpc::WriteOptions options,
230                               ::grpc::Status s) = 0;
231 
232  protected:
BindReactor(ServerWriteReactor<Response> * reactor)233   void BindReactor(ServerWriteReactor<Response>* reactor) {
234     reactor->InternalBindWriter(this);
235   }
236 };
237 
238 template <class Request, class Response>
239 class ServerCallbackReaderWriter : public internal::ServerCallbackCall {
240  public:
~ServerCallbackReaderWriter()241   ~ServerCallbackReaderWriter() override {}
242 
243   virtual void Finish(::grpc::Status s) = 0;
244   virtual void SendInitialMetadata() = 0;
245   virtual void Read(Request* msg) = 0;
246   virtual void Write(const Response* msg, ::grpc::WriteOptions options) = 0;
247   virtual void WriteAndFinish(const Response* msg, ::grpc::WriteOptions options,
248                               ::grpc::Status s) = 0;
249 
250  protected:
BindReactor(ServerBidiReactor<Request,Response> * reactor)251   void BindReactor(ServerBidiReactor<Request, Response>* reactor) {
252     reactor->InternalBindStream(this);
253   }
254 };
255 
// The following classes are the reactor interfaces that are to be implemented
// by the user, returned as the output parameter of the method handler for a
// callback method. OnDone is the only pure virtual reaction and must always be
// overridden; every other reaction has a default empty implementation so that
// the user class only needs to override those reactions that it cares about.
// The reaction methods will be invoked by the library in response to the
// completion of various operations. Reactions must not include blocking
// operations (such as blocking I/O, starting synchronous RPCs, or waiting on
// condition variables). Reactions may be invoked concurrently, except that
// OnDone is called after all others (assuming proper API usage). The reactor
// may not be deleted until OnDone is called.
266 
267 /// \a ServerBidiReactor is the interface for a bidirectional streaming RPC.
268 template <class Request, class Response>
269 class ServerBidiReactor : public internal::ServerReactor {
270  public:
271   // NOTE: Initializing stream_ as a constructor initializer rather than a
272   //       default initializer because gcc-4.x requires a copy constructor for
273   //       default initializing a templated member, which isn't ok for atomic.
274   // TODO(vjpai): Switch to default constructor and default initializer when
275   //              gcc-4.x is no longer supported
ServerBidiReactor()276   ServerBidiReactor() : stream_(nullptr) {}
277   ~ServerBidiReactor() override = default;
278 
279   /// Send any initial metadata stored in the RPC context. If not invoked,
280   /// any initial metadata will be passed along with the first Write or the
281   /// Finish (if there are no writes).
StartSendInitialMetadata()282   void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(stream_mu_) {
283     ServerCallbackReaderWriter<Request, Response>* stream =
284         stream_.load(std::memory_order_acquire);
285     if (stream == nullptr) {
286       grpc::internal::MutexLock l(&stream_mu_);
287       stream = stream_.load(std::memory_order_relaxed);
288       if (stream == nullptr) {
289         backlog_.send_initial_metadata_wanted = true;
290         return;
291       }
292     }
293     stream->SendInitialMetadata();
294   }
295 
296   /// Initiate a read operation.
297   ///
298   /// \param[out] req Where to eventually store the read message. Valid when
299   ///                 the library calls OnReadDone
StartRead(Request * req)300   void StartRead(Request* req) ABSL_LOCKS_EXCLUDED(stream_mu_) {
301     ServerCallbackReaderWriter<Request, Response>* stream =
302         stream_.load(std::memory_order_acquire);
303     if (stream == nullptr) {
304       grpc::internal::MutexLock l(&stream_mu_);
305       stream = stream_.load(std::memory_order_relaxed);
306       if (stream == nullptr) {
307         backlog_.read_wanted = req;
308         return;
309       }
310     }
311     stream->Read(req);
312   }
313 
314   /// Initiate a write operation.
315   ///
316   /// \param[in] resp The message to be written. The library does not take
317   ///                 ownership but the caller must ensure that the message is
318   ///                 not deleted or modified until OnWriteDone is called.
StartWrite(const Response * resp)319   void StartWrite(const Response* resp) {
320     StartWrite(resp, ::grpc::WriteOptions());
321   }
322 
323   /// Initiate a write operation with specified options.
324   ///
325   /// \param[in] resp The message to be written. The library does not take
326   ///                 ownership but the caller must ensure that the message is
327   ///                 not deleted or modified until OnWriteDone is called.
328   /// \param[in] options The WriteOptions to use for writing this message
StartWrite(const Response * resp,::grpc::WriteOptions options)329   void StartWrite(const Response* resp, ::grpc::WriteOptions options)
330       ABSL_LOCKS_EXCLUDED(stream_mu_) {
331     ServerCallbackReaderWriter<Request, Response>* stream =
332         stream_.load(std::memory_order_acquire);
333     if (stream == nullptr) {
334       grpc::internal::MutexLock l(&stream_mu_);
335       stream = stream_.load(std::memory_order_relaxed);
336       if (stream == nullptr) {
337         backlog_.write_wanted = resp;
338         backlog_.write_options_wanted = options;
339         return;
340       }
341     }
342     stream->Write(resp, options);
343   }
344 
345   /// Initiate a write operation with specified options and final RPC Status,
346   /// which also causes any trailing metadata for this RPC to be sent out.
347   /// StartWriteAndFinish is like merging StartWriteLast and Finish into a
348   /// single step. A key difference, though, is that this operation doesn't have
349   /// an OnWriteDone reaction - it is considered complete only when OnDone is
350   /// available. An RPC can either have StartWriteAndFinish or Finish, but not
351   /// both.
352   ///
353   /// \param[in] resp The message to be written. The library does not take
354   ///                 ownership but the caller must ensure that the message is
355   ///                 not deleted or modified until OnDone is called.
356   /// \param[in] options The WriteOptions to use for writing this message
357   /// \param[in] s The status outcome of this RPC
StartWriteAndFinish(const Response * resp,::grpc::WriteOptions options,::grpc::Status s)358   void StartWriteAndFinish(const Response* resp, ::grpc::WriteOptions options,
359                            ::grpc::Status s) ABSL_LOCKS_EXCLUDED(stream_mu_) {
360     ServerCallbackReaderWriter<Request, Response>* stream =
361         stream_.load(std::memory_order_acquire);
362     if (stream == nullptr) {
363       grpc::internal::MutexLock l(&stream_mu_);
364       stream = stream_.load(std::memory_order_relaxed);
365       if (stream == nullptr) {
366         backlog_.write_and_finish_wanted = true;
367         backlog_.write_wanted = resp;
368         backlog_.write_options_wanted = options;
369         backlog_.status_wanted = std::move(s);
370         return;
371       }
372     }
373     stream->WriteAndFinish(resp, options, std::move(s));
374   }
375 
376   /// Inform system of a planned write operation with specified options, but
377   /// allow the library to schedule the actual write coalesced with the writing
378   /// of trailing metadata (which takes place on a Finish call).
379   ///
380   /// \param[in] resp The message to be written. The library does not take
381   ///                 ownership but the caller must ensure that the message is
382   ///                 not deleted or modified until OnWriteDone is called.
383   /// \param[in] options The WriteOptions to use for writing this message
StartWriteLast(const Response * resp,::grpc::WriteOptions options)384   void StartWriteLast(const Response* resp, ::grpc::WriteOptions options) {
385     StartWrite(resp, options.set_last_message());
386   }
387 
388   /// Indicate that the stream is to be finished and the trailing metadata and
389   /// RPC status are to be sent. Every RPC MUST be finished using either Finish
390   /// or StartWriteAndFinish (but not both), even if the RPC is already
391   /// cancelled.
392   ///
393   /// \param[in] s The status outcome of this RPC
Finish(::grpc::Status s)394   void Finish(::grpc::Status s) ABSL_LOCKS_EXCLUDED(stream_mu_) {
395     ServerCallbackReaderWriter<Request, Response>* stream =
396         stream_.load(std::memory_order_acquire);
397     if (stream == nullptr) {
398       grpc::internal::MutexLock l(&stream_mu_);
399       stream = stream_.load(std::memory_order_relaxed);
400       if (stream == nullptr) {
401         backlog_.finish_wanted = true;
402         backlog_.status_wanted = std::move(s);
403         return;
404       }
405     }
406     stream->Finish(std::move(s));
407   }
408 
409   /// Notifies the application that an explicit StartSendInitialMetadata
410   /// operation completed. Not used when the sending of initial metadata
411   /// piggybacks onto the first write.
412   ///
413   /// \param[in] ok Was it successful? If false, no further write-side operation
414   ///               will succeed.
OnSendInitialMetadataDone(bool)415   virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
416 
417   /// Notifies the application that a StartRead operation completed.
418   ///
419   /// \param[in] ok Was it successful? If false, no further read-side operation
420   ///               will succeed.
OnReadDone(bool)421   virtual void OnReadDone(bool /*ok*/) {}
422 
423   /// Notifies the application that a StartWrite (or StartWriteLast) operation
424   /// completed.
425   ///
426   /// \param[in] ok Was it successful? If false, no further write-side operation
427   ///               will succeed.
OnWriteDone(bool)428   virtual void OnWriteDone(bool /*ok*/) {}
429 
430   /// Notifies the application that all operations associated with this RPC
431   /// have completed. This is an override (from the internal base class) but
432   /// still abstract, so derived classes MUST override it to be instantiated.
433   void OnDone() override = 0;
434 
435   /// Notifies the application that this RPC has been cancelled. This is an
436   /// override (from the internal base class) but not final, so derived classes
437   /// should override it if they want to take action.
OnCancel()438   void OnCancel() override {}
439 
440  private:
441   friend class ServerCallbackReaderWriter<Request, Response>;
442   // May be overridden by internal implementation details. This is not a public
443   // customization point.
InternalBindStream(ServerCallbackReaderWriter<Request,Response> * stream)444   virtual void InternalBindStream(
445       ServerCallbackReaderWriter<Request, Response>* stream) {
446     grpc::internal::MutexLock l(&stream_mu_);
447 
448     if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
449       stream->SendInitialMetadata();
450     }
451     if (GPR_UNLIKELY(backlog_.read_wanted != nullptr)) {
452       stream->Read(backlog_.read_wanted);
453     }
454     if (GPR_UNLIKELY(backlog_.write_and_finish_wanted)) {
455       stream->WriteAndFinish(backlog_.write_wanted,
456                              std::move(backlog_.write_options_wanted),
457                              std::move(backlog_.status_wanted));
458     } else {
459       if (GPR_UNLIKELY(backlog_.write_wanted != nullptr)) {
460         stream->Write(backlog_.write_wanted,
461                       std::move(backlog_.write_options_wanted));
462       }
463       if (GPR_UNLIKELY(backlog_.finish_wanted)) {
464         stream->Finish(std::move(backlog_.status_wanted));
465       }
466     }
467     // Set stream_ last so that other functions can use it lock-free
468     stream_.store(stream, std::memory_order_release);
469   }
470 
471   grpc::internal::Mutex stream_mu_;
472   // TODO(vjpai): Make stream_or_backlog_ into a std::variant or absl::variant
473   //              once C++17 or ABSL is supported since stream and backlog are
474   //              mutually exclusive in this class. Do likewise with the
475   //              remaining reactor classes and their backlogs as well.
476   std::atomic<ServerCallbackReaderWriter<Request, Response>*> stream_{nullptr};
477   struct PreBindBacklog {
478     bool send_initial_metadata_wanted = false;
479     bool write_and_finish_wanted = false;
480     bool finish_wanted = false;
481     Request* read_wanted = nullptr;
482     const Response* write_wanted = nullptr;
483     ::grpc::WriteOptions write_options_wanted;
484     ::grpc::Status status_wanted;
485   };
486   PreBindBacklog backlog_ ABSL_GUARDED_BY(stream_mu_);
487 };
488 
489 /// \a ServerReadReactor is the interface for a client-streaming RPC.
490 template <class Request>
491 class ServerReadReactor : public internal::ServerReactor {
492  public:
ServerReadReactor()493   ServerReadReactor() : reader_(nullptr) {}
494   ~ServerReadReactor() override = default;
495 
496   /// The following operation initiations are exactly like ServerBidiReactor.
StartSendInitialMetadata()497   void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(reader_mu_) {
498     ServerCallbackReader<Request>* reader =
499         reader_.load(std::memory_order_acquire);
500     if (reader == nullptr) {
501       grpc::internal::MutexLock l(&reader_mu_);
502       reader = reader_.load(std::memory_order_relaxed);
503       if (reader == nullptr) {
504         backlog_.send_initial_metadata_wanted = true;
505         return;
506       }
507     }
508     reader->SendInitialMetadata();
509   }
StartRead(Request * req)510   void StartRead(Request* req) ABSL_LOCKS_EXCLUDED(reader_mu_) {
511     ServerCallbackReader<Request>* reader =
512         reader_.load(std::memory_order_acquire);
513     if (reader == nullptr) {
514       grpc::internal::MutexLock l(&reader_mu_);
515       reader = reader_.load(std::memory_order_relaxed);
516       if (reader == nullptr) {
517         backlog_.read_wanted = req;
518         return;
519       }
520     }
521     reader->Read(req);
522   }
Finish(::grpc::Status s)523   void Finish(::grpc::Status s) ABSL_LOCKS_EXCLUDED(reader_mu_) {
524     ServerCallbackReader<Request>* reader =
525         reader_.load(std::memory_order_acquire);
526     if (reader == nullptr) {
527       grpc::internal::MutexLock l(&reader_mu_);
528       reader = reader_.load(std::memory_order_relaxed);
529       if (reader == nullptr) {
530         backlog_.finish_wanted = true;
531         backlog_.status_wanted = std::move(s);
532         return;
533       }
534     }
535     reader->Finish(std::move(s));
536   }
537 
538   /// The following notifications are exactly like ServerBidiReactor.
OnSendInitialMetadataDone(bool)539   virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
OnReadDone(bool)540   virtual void OnReadDone(bool /*ok*/) {}
541   void OnDone() override = 0;
OnCancel()542   void OnCancel() override {}
543 
544  private:
545   friend class ServerCallbackReader<Request>;
546 
547   // May be overridden by internal implementation details. This is not a public
548   // customization point.
InternalBindReader(ServerCallbackReader<Request> * reader)549   virtual void InternalBindReader(ServerCallbackReader<Request>* reader)
550       ABSL_LOCKS_EXCLUDED(reader_mu_) {
551     grpc::internal::MutexLock l(&reader_mu_);
552 
553     if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
554       reader->SendInitialMetadata();
555     }
556     if (GPR_UNLIKELY(backlog_.read_wanted != nullptr)) {
557       reader->Read(backlog_.read_wanted);
558     }
559     if (GPR_UNLIKELY(backlog_.finish_wanted)) {
560       reader->Finish(std::move(backlog_.status_wanted));
561     }
562     // Set reader_ last so that other functions can use it lock-free
563     reader_.store(reader, std::memory_order_release);
564   }
565 
566   grpc::internal::Mutex reader_mu_;
567   std::atomic<ServerCallbackReader<Request>*> reader_{nullptr};
568   struct PreBindBacklog {
569     bool send_initial_metadata_wanted = false;
570     bool finish_wanted = false;
571     Request* read_wanted = nullptr;
572     ::grpc::Status status_wanted;
573   };
574   PreBindBacklog backlog_ ABSL_GUARDED_BY(reader_mu_);
575 };
576 
577 /// \a ServerWriteReactor is the interface for a server-streaming RPC.
578 template <class Response>
579 class ServerWriteReactor : public internal::ServerReactor {
580  public:
ServerWriteReactor()581   ServerWriteReactor() : writer_(nullptr) {}
582   ~ServerWriteReactor() override = default;
583 
584   /// The following operation initiations are exactly like ServerBidiReactor.
StartSendInitialMetadata()585   void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(writer_mu_) {
586     ServerCallbackWriter<Response>* writer =
587         writer_.load(std::memory_order_acquire);
588     if (writer == nullptr) {
589       grpc::internal::MutexLock l(&writer_mu_);
590       writer = writer_.load(std::memory_order_relaxed);
591       if (writer == nullptr) {
592         backlog_.send_initial_metadata_wanted = true;
593         return;
594       }
595     }
596     writer->SendInitialMetadata();
597   }
StartWrite(const Response * resp)598   void StartWrite(const Response* resp) {
599     StartWrite(resp, ::grpc::WriteOptions());
600   }
StartWrite(const Response * resp,::grpc::WriteOptions options)601   void StartWrite(const Response* resp, ::grpc::WriteOptions options)
602       ABSL_LOCKS_EXCLUDED(writer_mu_) {
603     ServerCallbackWriter<Response>* writer =
604         writer_.load(std::memory_order_acquire);
605     if (writer == nullptr) {
606       grpc::internal::MutexLock l(&writer_mu_);
607       writer = writer_.load(std::memory_order_relaxed);
608       if (writer == nullptr) {
609         backlog_.write_wanted = resp;
610         backlog_.write_options_wanted = options;
611         return;
612       }
613     }
614     writer->Write(resp, options);
615   }
StartWriteAndFinish(const Response * resp,::grpc::WriteOptions options,::grpc::Status s)616   void StartWriteAndFinish(const Response* resp, ::grpc::WriteOptions options,
617                            ::grpc::Status s) ABSL_LOCKS_EXCLUDED(writer_mu_) {
618     ServerCallbackWriter<Response>* writer =
619         writer_.load(std::memory_order_acquire);
620     if (writer == nullptr) {
621       grpc::internal::MutexLock l(&writer_mu_);
622       writer = writer_.load(std::memory_order_relaxed);
623       if (writer == nullptr) {
624         backlog_.write_and_finish_wanted = true;
625         backlog_.write_wanted = resp;
626         backlog_.write_options_wanted = options;
627         backlog_.status_wanted = std::move(s);
628         return;
629       }
630     }
631     writer->WriteAndFinish(resp, options, std::move(s));
632   }
StartWriteLast(const Response * resp,::grpc::WriteOptions options)633   void StartWriteLast(const Response* resp, ::grpc::WriteOptions options) {
634     StartWrite(resp, options.set_last_message());
635   }
Finish(::grpc::Status s)636   void Finish(::grpc::Status s) ABSL_LOCKS_EXCLUDED(writer_mu_) {
637     ServerCallbackWriter<Response>* writer =
638         writer_.load(std::memory_order_acquire);
639     if (writer == nullptr) {
640       grpc::internal::MutexLock l(&writer_mu_);
641       writer = writer_.load(std::memory_order_relaxed);
642       if (writer == nullptr) {
643         backlog_.finish_wanted = true;
644         backlog_.status_wanted = std::move(s);
645         return;
646       }
647     }
648     writer->Finish(std::move(s));
649   }
650 
651   /// The following notifications are exactly like ServerBidiReactor.
OnSendInitialMetadataDone(bool)652   virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
OnWriteDone(bool)653   virtual void OnWriteDone(bool /*ok*/) {}
654   void OnDone() override = 0;
OnCancel()655   void OnCancel() override {}
656 
657  private:
658   friend class ServerCallbackWriter<Response>;
659   // May be overridden by internal implementation details. This is not a public
660   // customization point.
InternalBindWriter(ServerCallbackWriter<Response> * writer)661   virtual void InternalBindWriter(ServerCallbackWriter<Response>* writer)
662       ABSL_LOCKS_EXCLUDED(writer_mu_) {
663     grpc::internal::MutexLock l(&writer_mu_);
664 
665     if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
666       writer->SendInitialMetadata();
667     }
668     if (GPR_UNLIKELY(backlog_.write_and_finish_wanted)) {
669       writer->WriteAndFinish(backlog_.write_wanted,
670                              std::move(backlog_.write_options_wanted),
671                              std::move(backlog_.status_wanted));
672     } else {
673       if (GPR_UNLIKELY(backlog_.write_wanted != nullptr)) {
674         writer->Write(backlog_.write_wanted,
675                       std::move(backlog_.write_options_wanted));
676       }
677       if (GPR_UNLIKELY(backlog_.finish_wanted)) {
678         writer->Finish(std::move(backlog_.status_wanted));
679       }
680     }
681     // Set writer_ last so that other functions can use it lock-free
682     writer_.store(writer, std::memory_order_release);
683   }
684 
685   grpc::internal::Mutex writer_mu_;
686   std::atomic<ServerCallbackWriter<Response>*> writer_{nullptr};
687   struct PreBindBacklog {
688     bool send_initial_metadata_wanted = false;
689     bool write_and_finish_wanted = false;
690     bool finish_wanted = false;
691     const Response* write_wanted = nullptr;
692     ::grpc::WriteOptions write_options_wanted;
693     ::grpc::Status status_wanted;
694   };
695   PreBindBacklog backlog_ ABSL_GUARDED_BY(writer_mu_);
696 };
697 
698 class ServerUnaryReactor : public internal::ServerReactor {
699  public:
ServerUnaryReactor()700   ServerUnaryReactor() : call_(nullptr) {}
701   ~ServerUnaryReactor() override = default;
702 
703   /// StartSendInitialMetadata is exactly like ServerBidiReactor.
StartSendInitialMetadata()704   void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(call_mu_) {
705     ServerCallbackUnary* call = call_.load(std::memory_order_acquire);
706     if (call == nullptr) {
707       grpc::internal::MutexLock l(&call_mu_);
708       call = call_.load(std::memory_order_relaxed);
709       if (call == nullptr) {
710         backlog_.send_initial_metadata_wanted = true;
711         return;
712       }
713     }
714     call->SendInitialMetadata();
715   }
716   /// Finish is similar to ServerBidiReactor except for one detail.
717   /// If the status is non-OK, any message will not be sent. Instead,
718   /// the client will only receive the status and any trailing metadata.
Finish(::grpc::Status s)719   void Finish(::grpc::Status s) ABSL_LOCKS_EXCLUDED(call_mu_) {
720     ServerCallbackUnary* call = call_.load(std::memory_order_acquire);
721     if (call == nullptr) {
722       grpc::internal::MutexLock l(&call_mu_);
723       call = call_.load(std::memory_order_relaxed);
724       if (call == nullptr) {
725         backlog_.finish_wanted = true;
726         backlog_.status_wanted = std::move(s);
727         return;
728       }
729     }
730     call->Finish(std::move(s));
731   }
732 
733   /// The following notifications are exactly like ServerBidiReactor.
OnSendInitialMetadataDone(bool)734   virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
735   void OnDone() override = 0;
OnCancel()736   void OnCancel() override {}
737 
738  private:
739   friend class ServerCallbackUnary;
740   // May be overridden by internal implementation details. This is not a public
741   // customization point.
InternalBindCall(ServerCallbackUnary * call)742   virtual void InternalBindCall(ServerCallbackUnary* call)
743       ABSL_LOCKS_EXCLUDED(call_mu_) {
744     grpc::internal::MutexLock l(&call_mu_);
745 
746     if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
747       call->SendInitialMetadata();
748     }
749     if (GPR_UNLIKELY(backlog_.finish_wanted)) {
750       call->Finish(std::move(backlog_.status_wanted));
751     }
752     // Set call_ last so that other functions can use it lock-free
753     call_.store(call, std::memory_order_release);
754   }
755 
756   grpc::internal::Mutex call_mu_;
757   std::atomic<ServerCallbackUnary*> call_{nullptr};
758   struct PreBindBacklog {
759     bool send_initial_metadata_wanted = false;
760     bool finish_wanted = false;
761     ::grpc::Status status_wanted;
762   };
763   PreBindBacklog backlog_ ABSL_GUARDED_BY(call_mu_);
764 };
765 
766 namespace internal {
767 
768 template <class Base>
769 class FinishOnlyReactor : public Base {
770  public:
FinishOnlyReactor(::grpc::Status s)771   explicit FinishOnlyReactor(::grpc::Status s) { this->Finish(std::move(s)); }
OnDone()772   void OnDone() override { this->~FinishOnlyReactor(); }
773 };
774 
775 using UnimplementedUnaryReactor = FinishOnlyReactor<ServerUnaryReactor>;
776 template <class Request>
777 using UnimplementedReadReactor = FinishOnlyReactor<ServerReadReactor<Request>>;
778 template <class Response>
779 using UnimplementedWriteReactor =
780     FinishOnlyReactor<ServerWriteReactor<Response>>;
781 template <class Request, class Response>
782 using UnimplementedBidiReactor =
783     FinishOnlyReactor<ServerBidiReactor<Request, Response>>;
784 
785 }  // namespace internal
786 
787 // TODO(vjpai): Remove namespace experimental when de-experimentalized fully.
788 namespace experimental {
789 
790 template <class Request>
791 using ServerReadReactor = ::grpc::ServerReadReactor<Request>;
792 
793 template <class Response>
794 using ServerWriteReactor = ::grpc::ServerWriteReactor<Response>;
795 
796 template <class Request, class Response>
797 using ServerBidiReactor = ::grpc::ServerBidiReactor<Request, Response>;
798 
799 using ServerUnaryReactor = ::grpc::ServerUnaryReactor;
800 
801 }  // namespace experimental
802 
803 }  // namespace grpc
804 
805 #endif  // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H
806