• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2018 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #ifndef GRPCPP_SUPPORT_SERVER_CALLBACK_H
20 #define GRPCPP_SUPPORT_SERVER_CALLBACK_H
21 
22 #include <atomic>
23 #include <functional>
24 #include <type_traits>
25 
26 #include <grpcpp/impl/call.h>
27 #include <grpcpp/impl/call_op_set.h>
28 #include <grpcpp/impl/sync.h>
29 #include <grpcpp/support/callback_common.h>
30 #include <grpcpp/support/config.h>
31 #include <grpcpp/support/message_allocator.h>
32 #include <grpcpp/support/status.h>
33 
34 namespace grpc {
35 
36 // Declare base class of all reactors as internal
37 namespace internal {
38 
39 // Forward declarations
40 template <class Request, class Response>
41 class CallbackUnaryHandler;
42 template <class Request, class Response>
43 class CallbackClientStreamingHandler;
44 template <class Request, class Response>
45 class CallbackServerStreamingHandler;
46 template <class Request, class Response>
47 class CallbackBidiHandler;
48 
// Root of every server-side reactor type. It carries the two reactions that
// all RPC shapes share plus an internal-only inlining hint.
class ServerReactor {
  // The callback method handlers are granted access to reactor internals.
  template <class Req, class Resp>
  friend class CallbackUnaryHandler;
  template <class Req, class Resp>
  friend class CallbackClientStreamingHandler;
  template <class Req, class Resp>
  friend class CallbackServerStreamingHandler;
  template <class Req, class Resp>
  friend class CallbackBidiHandler;

 public:
  virtual ~ServerReactor() = default;

  // Reaction run once the RPC is completely finished.
  virtual void OnDone() = 0;

  // Reaction run when the RPC is cancelled.
  virtual void OnCancel() = 0;

  // Not part of the API; internal use only. Reports whether every reaction of
  // this reactor may be run directly, without an extra executor scheduling
  // step. Only internally-defined reactors with trivial reactions should
  // return true; the default answer is false.
  virtual bool InternalInlineable() {
    return false;
  }
};
71 
72 /// The base class of ServerCallbackUnary etc.
73 class ServerCallbackCall {
74  public:
~ServerCallbackCall()75   virtual ~ServerCallbackCall() {}
76 
77   // This object is responsible for tracking when it is safe to call OnDone and
78   // OnCancel. OnDone should not be called until the method handler is complete,
79   // Finish has been called, the ServerContext CompletionOp (which tracks
80   // cancellation or successful completion) has completed, and all outstanding
81   // Read/Write actions have seen their reactions. OnCancel should not be called
82   // until after the method handler is done and the RPC has completed with a
83   // cancellation. This is tracked by counting how many of these conditions have
84   // been met and calling OnCancel when none remain unmet.
85 
86   // Public versions of MaybeDone: one where we don't know the reactor in
87   // advance (used for the ServerContext CompletionOp), and one for where we
88   // know the inlineability of the OnDone reaction. You should set the inline
89   // flag to true if either the Reactor is InternalInlineable() or if this
90   // callback is already being forced to run dispatched to an executor
91   // (typically because it contains additional work than just the MaybeDone).
92 
MaybeDone()93   void MaybeDone() {
94     if (GPR_UNLIKELY(Unref() == 1)) {
95       ScheduleOnDone(reactor()->InternalInlineable());
96     }
97   }
98 
MaybeDone(bool inline_ondone)99   void MaybeDone(bool inline_ondone) {
100     if (GPR_UNLIKELY(Unref() == 1)) {
101       ScheduleOnDone(inline_ondone);
102     }
103   }
104 
105   // Fast version called with known reactor passed in, used from derived
106   // classes, typically in non-cancel case
MaybeCallOnCancel(ServerReactor * reactor)107   void MaybeCallOnCancel(ServerReactor* reactor) {
108     if (GPR_UNLIKELY(UnblockCancellation())) {
109       CallOnCancel(reactor);
110     }
111   }
112 
113   // Slower version called from object that doesn't know the reactor a priori
114   // (such as the ServerContext CompletionOp which is formed before the
115   // reactor). This is used in cancel cases only, so it's ok to be slower and
116   // invoke a virtual function.
MaybeCallOnCancel()117   void MaybeCallOnCancel() {
118     if (GPR_UNLIKELY(UnblockCancellation())) {
119       CallOnCancel(reactor());
120     }
121   }
122 
123  protected:
124   /// Increases the reference count
Ref()125   void Ref() { callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); }
126 
127  private:
128   virtual ServerReactor* reactor() = 0;
129 
130   // CallOnDone performs the work required at completion of the RPC: invoking
131   // the OnDone function and doing all necessary cleanup. This function is only
132   // ever invoked on a fully-Unref'fed ServerCallbackCall.
133   virtual void CallOnDone() = 0;
134 
135   // If the OnDone reaction is inlineable, execute it inline. Otherwise send it
136   // to an executor.
137   void ScheduleOnDone(bool inline_ondone);
138 
139   // If the OnCancel reaction is inlineable, execute it inline. Otherwise send
140   // it to an executor.
141   void CallOnCancel(ServerReactor* reactor);
142 
143   // Implement the cancellation constraint counter. Return true if OnCancel
144   // should be called, false otherwise.
UnblockCancellation()145   bool UnblockCancellation() {
146     return on_cancel_conditions_remaining_.fetch_sub(
147                1, std::memory_order_acq_rel) == 1;
148   }
149 
150   /// Decreases the reference count and returns the previous value
Unref()151   int Unref() {
152     return callbacks_outstanding_.fetch_sub(1, std::memory_order_acq_rel);
153   }
154 
155   std::atomic_int on_cancel_conditions_remaining_{2};
156   std::atomic_int callbacks_outstanding_{
157       3};  // reserve for start, Finish, and CompletionOp
158 };
159 
160 template <class Request, class Response>
161 class DefaultMessageHolder : public MessageHolder<Request, Response> {
162  public:
DefaultMessageHolder()163   DefaultMessageHolder() {
164     this->set_request(&request_obj_);
165     this->set_response(&response_obj_);
166   }
Release()167   void Release() override {
168     // the object is allocated in the call arena.
169     this->~DefaultMessageHolder<Request, Response>();
170   }
171 
172  private:
173   Request request_obj_;
174   Response response_obj_;
175 };
176 
177 }  // namespace internal
178 
179 // Forward declarations
180 class ServerUnaryReactor;
181 template <class Request>
182 class ServerReadReactor;
183 template <class Response>
184 class ServerWriteReactor;
185 template <class Request, class Response>
186 class ServerBidiReactor;
187 
188 // NOTE: The actual call/stream object classes are provided as API only to
189 // support mocking. There are no implementations of these class interfaces in
190 // the API.
191 class ServerCallbackUnary : public internal::ServerCallbackCall {
192  public:
~ServerCallbackUnary()193   ~ServerCallbackUnary() override {}
194   virtual void Finish(grpc::Status s) = 0;
195   virtual void SendInitialMetadata() = 0;
196 
197  protected:
198   // Use a template rather than explicitly specifying ServerUnaryReactor to
199   // delay binding and avoid a circular forward declaration issue
200   template <class Reactor>
BindReactor(Reactor * reactor)201   void BindReactor(Reactor* reactor) {
202     reactor->InternalBindCall(this);
203   }
204 };
205 
206 template <class Request>
207 class ServerCallbackReader : public internal::ServerCallbackCall {
208  public:
~ServerCallbackReader()209   ~ServerCallbackReader() override {}
210   virtual void Finish(grpc::Status s) = 0;
211   virtual void SendInitialMetadata() = 0;
212   virtual void Read(Request* msg) = 0;
213 
214  protected:
BindReactor(ServerReadReactor<Request> * reactor)215   void BindReactor(ServerReadReactor<Request>* reactor) {
216     reactor->InternalBindReader(this);
217   }
218 };
219 
220 template <class Response>
221 class ServerCallbackWriter : public internal::ServerCallbackCall {
222  public:
~ServerCallbackWriter()223   ~ServerCallbackWriter() override {}
224 
225   virtual void Finish(grpc::Status s) = 0;
226   virtual void SendInitialMetadata() = 0;
227   virtual void Write(const Response* msg, grpc::WriteOptions options) = 0;
228   virtual void WriteAndFinish(const Response* msg, grpc::WriteOptions options,
229                               grpc::Status s) = 0;
230 
231  protected:
BindReactor(ServerWriteReactor<Response> * reactor)232   void BindReactor(ServerWriteReactor<Response>* reactor) {
233     reactor->InternalBindWriter(this);
234   }
235 };
236 
237 template <class Request, class Response>
238 class ServerCallbackReaderWriter : public internal::ServerCallbackCall {
239  public:
~ServerCallbackReaderWriter()240   ~ServerCallbackReaderWriter() override {}
241 
242   virtual void Finish(grpc::Status s) = 0;
243   virtual void SendInitialMetadata() = 0;
244   virtual void Read(Request* msg) = 0;
245   virtual void Write(const Response* msg, grpc::WriteOptions options) = 0;
246   virtual void WriteAndFinish(const Response* msg, grpc::WriteOptions options,
247                               grpc::Status s) = 0;
248 
249  protected:
BindReactor(ServerBidiReactor<Request,Response> * reactor)250   void BindReactor(ServerBidiReactor<Request, Response>* reactor) {
251     reactor->InternalBindStream(this);
252   }
253 };
254 
// The following classes are the reactor interfaces that are to be implemented
// by the user, returned as the output parameter of the method handler for a
// callback method. Apart from OnDone, which is pure virtual and must be
// overridden, every reaction has a default empty implementation so that the
// user class only needs to override those reactions that it cares about. The
// reaction methods will be invoked by the library in response to the
// completion of various operations. Reactions must not include blocking
// operations (such as blocking I/O, starting synchronous RPCs, or waiting on
// condition variables). Reactions may be invoked concurrently, except that
// OnDone is called after all others (assuming proper API usage). The reactor
// may not be deleted until OnDone is called.
265 
266 /// \a ServerBidiReactor is the interface for a bidirectional streaming RPC.
267 template <class Request, class Response>
268 class ServerBidiReactor : public internal::ServerReactor {
269  public:
270   // NOTE: Initializing stream_ as a constructor initializer rather than a
271   //       default initializer because gcc-4.x requires a copy constructor for
272   //       default initializing a templated member, which isn't ok for atomic.
273   // TODO(vjpai): Switch to default constructor and default initializer when
274   //              gcc-4.x is no longer supported
ServerBidiReactor()275   ServerBidiReactor() : stream_(nullptr) {}
276   ~ServerBidiReactor() override = default;
277 
278   /// Send any initial metadata stored in the RPC context. If not invoked,
279   /// any initial metadata will be passed along with the first Write or the
280   /// Finish (if there are no writes).
StartSendInitialMetadata()281   void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(stream_mu_) {
282     ServerCallbackReaderWriter<Request, Response>* stream =
283         stream_.load(std::memory_order_acquire);
284     if (stream == nullptr) {
285       grpc::internal::MutexLock l(&stream_mu_);
286       stream = stream_.load(std::memory_order_relaxed);
287       if (stream == nullptr) {
288         backlog_.send_initial_metadata_wanted = true;
289         return;
290       }
291     }
292     stream->SendInitialMetadata();
293   }
294 
295   /// Initiate a read operation.
296   ///
297   /// \param[out] req Where to eventually store the read message. Valid when
298   ///                 the library calls OnReadDone
StartRead(Request * req)299   void StartRead(Request* req) ABSL_LOCKS_EXCLUDED(stream_mu_) {
300     ServerCallbackReaderWriter<Request, Response>* stream =
301         stream_.load(std::memory_order_acquire);
302     if (stream == nullptr) {
303       grpc::internal::MutexLock l(&stream_mu_);
304       stream = stream_.load(std::memory_order_relaxed);
305       if (stream == nullptr) {
306         backlog_.read_wanted = req;
307         return;
308       }
309     }
310     stream->Read(req);
311   }
312 
313   /// Initiate a write operation.
314   ///
315   /// \param[in] resp The message to be written. The library does not take
316   ///                 ownership but the caller must ensure that the message is
317   ///                 not deleted or modified until OnWriteDone is called.
StartWrite(const Response * resp)318   void StartWrite(const Response* resp) {
319     StartWrite(resp, grpc::WriteOptions());
320   }
321 
322   /// Initiate a write operation with specified options.
323   ///
324   /// \param[in] resp The message to be written. The library does not take
325   ///                 ownership but the caller must ensure that the message is
326   ///                 not deleted or modified until OnWriteDone is called.
327   /// \param[in] options The WriteOptions to use for writing this message
StartWrite(const Response * resp,grpc::WriteOptions options)328   void StartWrite(const Response* resp, grpc::WriteOptions options)
329       ABSL_LOCKS_EXCLUDED(stream_mu_) {
330     ServerCallbackReaderWriter<Request, Response>* stream =
331         stream_.load(std::memory_order_acquire);
332     if (stream == nullptr) {
333       grpc::internal::MutexLock l(&stream_mu_);
334       stream = stream_.load(std::memory_order_relaxed);
335       if (stream == nullptr) {
336         backlog_.write_wanted = resp;
337         backlog_.write_options_wanted = options;
338         return;
339       }
340     }
341     stream->Write(resp, options);
342   }
343 
344   /// Initiate a write operation with specified options and final RPC Status,
345   /// which also causes any trailing metadata for this RPC to be sent out.
346   /// StartWriteAndFinish is like merging StartWriteLast and Finish into a
347   /// single step. A key difference, though, is that this operation doesn't have
348   /// an OnWriteDone reaction - it is considered complete only when OnDone is
349   /// available. An RPC can either have StartWriteAndFinish or Finish, but not
350   /// both.
351   ///
352   /// \param[in] resp The message to be written. The library does not take
353   ///                 ownership but the caller must ensure that the message is
354   ///                 not deleted or modified until OnDone is called.
355   /// \param[in] options The WriteOptions to use for writing this message
356   /// \param[in] s The status outcome of this RPC
StartWriteAndFinish(const Response * resp,grpc::WriteOptions options,grpc::Status s)357   void StartWriteAndFinish(const Response* resp, grpc::WriteOptions options,
358                            grpc::Status s) ABSL_LOCKS_EXCLUDED(stream_mu_) {
359     ServerCallbackReaderWriter<Request, Response>* stream =
360         stream_.load(std::memory_order_acquire);
361     if (stream == nullptr) {
362       grpc::internal::MutexLock l(&stream_mu_);
363       stream = stream_.load(std::memory_order_relaxed);
364       if (stream == nullptr) {
365         backlog_.write_and_finish_wanted = true;
366         backlog_.write_wanted = resp;
367         backlog_.write_options_wanted = options;
368         backlog_.status_wanted = std::move(s);
369         return;
370       }
371     }
372     stream->WriteAndFinish(resp, options, std::move(s));
373   }
374 
375   /// Inform system of a planned write operation with specified options, but
376   /// allow the library to schedule the actual write coalesced with the writing
377   /// of trailing metadata (which takes place on a Finish call).
378   ///
379   /// \param[in] resp The message to be written. The library does not take
380   ///                 ownership but the caller must ensure that the message is
381   ///                 not deleted or modified until OnWriteDone is called.
382   /// \param[in] options The WriteOptions to use for writing this message
StartWriteLast(const Response * resp,grpc::WriteOptions options)383   void StartWriteLast(const Response* resp, grpc::WriteOptions options) {
384     StartWrite(resp, options.set_last_message());
385   }
386 
387   /// Indicate that the stream is to be finished and the trailing metadata and
388   /// RPC status are to be sent. Every RPC MUST be finished using either Finish
389   /// or StartWriteAndFinish (but not both), even if the RPC is already
390   /// cancelled.
391   ///
392   /// \param[in] s The status outcome of this RPC
Finish(grpc::Status s)393   void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(stream_mu_) {
394     ServerCallbackReaderWriter<Request, Response>* stream =
395         stream_.load(std::memory_order_acquire);
396     if (stream == nullptr) {
397       grpc::internal::MutexLock l(&stream_mu_);
398       stream = stream_.load(std::memory_order_relaxed);
399       if (stream == nullptr) {
400         backlog_.finish_wanted = true;
401         backlog_.status_wanted = std::move(s);
402         return;
403       }
404     }
405     stream->Finish(std::move(s));
406   }
407 
408   /// Notifies the application that an explicit StartSendInitialMetadata
409   /// operation completed. Not used when the sending of initial metadata
410   /// piggybacks onto the first write.
411   ///
412   /// \param[in] ok Was it successful? If false, no further write-side operation
413   ///               will succeed.
OnSendInitialMetadataDone(bool)414   virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
415 
416   /// Notifies the application that a StartRead operation completed.
417   ///
418   /// \param[in] ok Was it successful? If false, no further read-side operation
419   ///               will succeed.
OnReadDone(bool)420   virtual void OnReadDone(bool /*ok*/) {}
421 
422   /// Notifies the application that a StartWrite (or StartWriteLast) operation
423   /// completed.
424   ///
425   /// \param[in] ok Was it successful? If false, no further write-side operation
426   ///               will succeed.
OnWriteDone(bool)427   virtual void OnWriteDone(bool /*ok*/) {}
428 
429   /// Notifies the application that all operations associated with this RPC
430   /// have completed. This is an override (from the internal base class) but
431   /// still abstract, so derived classes MUST override it to be instantiated.
432   void OnDone() override = 0;
433 
434   /// Notifies the application that this RPC has been cancelled. This is an
435   /// override (from the internal base class) but not final, so derived classes
436   /// should override it if they want to take action.
OnCancel()437   void OnCancel() override {}
438 
439  private:
440   friend class ServerCallbackReaderWriter<Request, Response>;
441   // May be overridden by internal implementation details. This is not a public
442   // customization point.
InternalBindStream(ServerCallbackReaderWriter<Request,Response> * stream)443   virtual void InternalBindStream(
444       ServerCallbackReaderWriter<Request, Response>* stream) {
445     grpc::internal::MutexLock l(&stream_mu_);
446 
447     if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
448       stream->SendInitialMetadata();
449     }
450     if (GPR_UNLIKELY(backlog_.read_wanted != nullptr)) {
451       stream->Read(backlog_.read_wanted);
452     }
453     if (GPR_UNLIKELY(backlog_.write_and_finish_wanted)) {
454       stream->WriteAndFinish(backlog_.write_wanted,
455                              std::move(backlog_.write_options_wanted),
456                              std::move(backlog_.status_wanted));
457     } else {
458       if (GPR_UNLIKELY(backlog_.write_wanted != nullptr)) {
459         stream->Write(backlog_.write_wanted,
460                       std::move(backlog_.write_options_wanted));
461       }
462       if (GPR_UNLIKELY(backlog_.finish_wanted)) {
463         stream->Finish(std::move(backlog_.status_wanted));
464       }
465     }
466     // Set stream_ last so that other functions can use it lock-free
467     stream_.store(stream, std::memory_order_release);
468   }
469 
470   grpc::internal::Mutex stream_mu_;
471   // TODO(vjpai): Make stream_or_backlog_ into a std::variant or absl::variant
472   //              once C++17 or ABSL is supported since stream and backlog are
473   //              mutually exclusive in this class. Do likewise with the
474   //              remaining reactor classes and their backlogs as well.
475   std::atomic<ServerCallbackReaderWriter<Request, Response>*> stream_{nullptr};
476   struct PreBindBacklog {
477     bool send_initial_metadata_wanted = false;
478     bool write_and_finish_wanted = false;
479     bool finish_wanted = false;
480     Request* read_wanted = nullptr;
481     const Response* write_wanted = nullptr;
482     grpc::WriteOptions write_options_wanted;
483     grpc::Status status_wanted;
484   };
485   PreBindBacklog backlog_ ABSL_GUARDED_BY(stream_mu_);
486 };
487 
488 /// \a ServerReadReactor is the interface for a client-streaming RPC.
489 template <class Request>
490 class ServerReadReactor : public internal::ServerReactor {
491  public:
ServerReadReactor()492   ServerReadReactor() : reader_(nullptr) {}
493   ~ServerReadReactor() override = default;
494 
495   /// The following operation initiations are exactly like ServerBidiReactor.
StartSendInitialMetadata()496   void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(reader_mu_) {
497     ServerCallbackReader<Request>* reader =
498         reader_.load(std::memory_order_acquire);
499     if (reader == nullptr) {
500       grpc::internal::MutexLock l(&reader_mu_);
501       reader = reader_.load(std::memory_order_relaxed);
502       if (reader == nullptr) {
503         backlog_.send_initial_metadata_wanted = true;
504         return;
505       }
506     }
507     reader->SendInitialMetadata();
508   }
StartRead(Request * req)509   void StartRead(Request* req) ABSL_LOCKS_EXCLUDED(reader_mu_) {
510     ServerCallbackReader<Request>* reader =
511         reader_.load(std::memory_order_acquire);
512     if (reader == nullptr) {
513       grpc::internal::MutexLock l(&reader_mu_);
514       reader = reader_.load(std::memory_order_relaxed);
515       if (reader == nullptr) {
516         backlog_.read_wanted = req;
517         return;
518       }
519     }
520     reader->Read(req);
521   }
Finish(grpc::Status s)522   void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(reader_mu_) {
523     ServerCallbackReader<Request>* reader =
524         reader_.load(std::memory_order_acquire);
525     if (reader == nullptr) {
526       grpc::internal::MutexLock l(&reader_mu_);
527       reader = reader_.load(std::memory_order_relaxed);
528       if (reader == nullptr) {
529         backlog_.finish_wanted = true;
530         backlog_.status_wanted = std::move(s);
531         return;
532       }
533     }
534     reader->Finish(std::move(s));
535   }
536 
537   /// The following notifications are exactly like ServerBidiReactor.
OnSendInitialMetadataDone(bool)538   virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
OnReadDone(bool)539   virtual void OnReadDone(bool /*ok*/) {}
540   void OnDone() override = 0;
OnCancel()541   void OnCancel() override {}
542 
543  private:
544   friend class ServerCallbackReader<Request>;
545 
546   // May be overridden by internal implementation details. This is not a public
547   // customization point.
InternalBindReader(ServerCallbackReader<Request> * reader)548   virtual void InternalBindReader(ServerCallbackReader<Request>* reader)
549       ABSL_LOCKS_EXCLUDED(reader_mu_) {
550     grpc::internal::MutexLock l(&reader_mu_);
551 
552     if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
553       reader->SendInitialMetadata();
554     }
555     if (GPR_UNLIKELY(backlog_.read_wanted != nullptr)) {
556       reader->Read(backlog_.read_wanted);
557     }
558     if (GPR_UNLIKELY(backlog_.finish_wanted)) {
559       reader->Finish(std::move(backlog_.status_wanted));
560     }
561     // Set reader_ last so that other functions can use it lock-free
562     reader_.store(reader, std::memory_order_release);
563   }
564 
565   grpc::internal::Mutex reader_mu_;
566   std::atomic<ServerCallbackReader<Request>*> reader_{nullptr};
567   struct PreBindBacklog {
568     bool send_initial_metadata_wanted = false;
569     bool finish_wanted = false;
570     Request* read_wanted = nullptr;
571     grpc::Status status_wanted;
572   };
573   PreBindBacklog backlog_ ABSL_GUARDED_BY(reader_mu_);
574 };
575 
576 /// \a ServerWriteReactor is the interface for a server-streaming RPC.
577 template <class Response>
578 class ServerWriteReactor : public internal::ServerReactor {
579  public:
ServerWriteReactor()580   ServerWriteReactor() : writer_(nullptr) {}
581   ~ServerWriteReactor() override = default;
582 
583   /// The following operation initiations are exactly like ServerBidiReactor.
StartSendInitialMetadata()584   void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(writer_mu_) {
585     ServerCallbackWriter<Response>* writer =
586         writer_.load(std::memory_order_acquire);
587     if (writer == nullptr) {
588       grpc::internal::MutexLock l(&writer_mu_);
589       writer = writer_.load(std::memory_order_relaxed);
590       if (writer == nullptr) {
591         backlog_.send_initial_metadata_wanted = true;
592         return;
593       }
594     }
595     writer->SendInitialMetadata();
596   }
StartWrite(const Response * resp)597   void StartWrite(const Response* resp) {
598     StartWrite(resp, grpc::WriteOptions());
599   }
StartWrite(const Response * resp,grpc::WriteOptions options)600   void StartWrite(const Response* resp, grpc::WriteOptions options)
601       ABSL_LOCKS_EXCLUDED(writer_mu_) {
602     ServerCallbackWriter<Response>* writer =
603         writer_.load(std::memory_order_acquire);
604     if (writer == nullptr) {
605       grpc::internal::MutexLock l(&writer_mu_);
606       writer = writer_.load(std::memory_order_relaxed);
607       if (writer == nullptr) {
608         backlog_.write_wanted = resp;
609         backlog_.write_options_wanted = options;
610         return;
611       }
612     }
613     writer->Write(resp, options);
614   }
StartWriteAndFinish(const Response * resp,grpc::WriteOptions options,grpc::Status s)615   void StartWriteAndFinish(const Response* resp, grpc::WriteOptions options,
616                            grpc::Status s) ABSL_LOCKS_EXCLUDED(writer_mu_) {
617     ServerCallbackWriter<Response>* writer =
618         writer_.load(std::memory_order_acquire);
619     if (writer == nullptr) {
620       grpc::internal::MutexLock l(&writer_mu_);
621       writer = writer_.load(std::memory_order_relaxed);
622       if (writer == nullptr) {
623         backlog_.write_and_finish_wanted = true;
624         backlog_.write_wanted = resp;
625         backlog_.write_options_wanted = options;
626         backlog_.status_wanted = std::move(s);
627         return;
628       }
629     }
630     writer->WriteAndFinish(resp, options, std::move(s));
631   }
StartWriteLast(const Response * resp,grpc::WriteOptions options)632   void StartWriteLast(const Response* resp, grpc::WriteOptions options) {
633     StartWrite(resp, options.set_last_message());
634   }
Finish(grpc::Status s)635   void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(writer_mu_) {
636     ServerCallbackWriter<Response>* writer =
637         writer_.load(std::memory_order_acquire);
638     if (writer == nullptr) {
639       grpc::internal::MutexLock l(&writer_mu_);
640       writer = writer_.load(std::memory_order_relaxed);
641       if (writer == nullptr) {
642         backlog_.finish_wanted = true;
643         backlog_.status_wanted = std::move(s);
644         return;
645       }
646     }
647     writer->Finish(std::move(s));
648   }
649 
650   /// The following notifications are exactly like ServerBidiReactor.
OnSendInitialMetadataDone(bool)651   virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
OnWriteDone(bool)652   virtual void OnWriteDone(bool /*ok*/) {}
653   void OnDone() override = 0;
OnCancel()654   void OnCancel() override {}
655 
656  private:
657   friend class ServerCallbackWriter<Response>;
658   // May be overridden by internal implementation details. This is not a public
659   // customization point.
InternalBindWriter(ServerCallbackWriter<Response> * writer)660   virtual void InternalBindWriter(ServerCallbackWriter<Response>* writer)
661       ABSL_LOCKS_EXCLUDED(writer_mu_) {
662     grpc::internal::MutexLock l(&writer_mu_);
663 
664     if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
665       writer->SendInitialMetadata();
666     }
667     if (GPR_UNLIKELY(backlog_.write_and_finish_wanted)) {
668       writer->WriteAndFinish(backlog_.write_wanted,
669                              std::move(backlog_.write_options_wanted),
670                              std::move(backlog_.status_wanted));
671     } else {
672       if (GPR_UNLIKELY(backlog_.write_wanted != nullptr)) {
673         writer->Write(backlog_.write_wanted,
674                       std::move(backlog_.write_options_wanted));
675       }
676       if (GPR_UNLIKELY(backlog_.finish_wanted)) {
677         writer->Finish(std::move(backlog_.status_wanted));
678       }
679     }
680     // Set writer_ last so that other functions can use it lock-free
681     writer_.store(writer, std::memory_order_release);
682   }
683 
684   grpc::internal::Mutex writer_mu_;
685   std::atomic<ServerCallbackWriter<Response>*> writer_{nullptr};
686   struct PreBindBacklog {
687     bool send_initial_metadata_wanted = false;
688     bool write_and_finish_wanted = false;
689     bool finish_wanted = false;
690     const Response* write_wanted = nullptr;
691     grpc::WriteOptions write_options_wanted;
692     grpc::Status status_wanted;
693   };
694   PreBindBacklog backlog_ ABSL_GUARDED_BY(writer_mu_);
695 };
696 
697 class ServerUnaryReactor : public internal::ServerReactor {
698  public:
ServerUnaryReactor()699   ServerUnaryReactor() : call_(nullptr) {}
700   ~ServerUnaryReactor() override = default;
701 
702   /// StartSendInitialMetadata is exactly like ServerBidiReactor.
StartSendInitialMetadata()703   void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(call_mu_) {
704     ServerCallbackUnary* call = call_.load(std::memory_order_acquire);
705     if (call == nullptr) {
706       grpc::internal::MutexLock l(&call_mu_);
707       call = call_.load(std::memory_order_relaxed);
708       if (call == nullptr) {
709         backlog_.send_initial_metadata_wanted = true;
710         return;
711       }
712     }
713     call->SendInitialMetadata();
714   }
715   /// Finish is similar to ServerBidiReactor except for one detail.
716   /// If the status is non-OK, any message will not be sent. Instead,
717   /// the client will only receive the status and any trailing metadata.
Finish(grpc::Status s)718   void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(call_mu_) {
719     ServerCallbackUnary* call = call_.load(std::memory_order_acquire);
720     if (call == nullptr) {
721       grpc::internal::MutexLock l(&call_mu_);
722       call = call_.load(std::memory_order_relaxed);
723       if (call == nullptr) {
724         backlog_.finish_wanted = true;
725         backlog_.status_wanted = std::move(s);
726         return;
727       }
728     }
729     call->Finish(std::move(s));
730   }
731 
732   /// The following notifications are exactly like ServerBidiReactor.
OnSendInitialMetadataDone(bool)733   virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
734   void OnDone() override = 0;
OnCancel()735   void OnCancel() override {}
736 
737  private:
738   friend class ServerCallbackUnary;
739   // May be overridden by internal implementation details. This is not a public
740   // customization point.
InternalBindCall(ServerCallbackUnary * call)741   virtual void InternalBindCall(ServerCallbackUnary* call)
742       ABSL_LOCKS_EXCLUDED(call_mu_) {
743     grpc::internal::MutexLock l(&call_mu_);
744 
745     if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
746       call->SendInitialMetadata();
747     }
748     if (GPR_UNLIKELY(backlog_.finish_wanted)) {
749       call->Finish(std::move(backlog_.status_wanted));
750     }
751     // Set call_ last so that other functions can use it lock-free
752     call_.store(call, std::memory_order_release);
753   }
754 
755   grpc::internal::Mutex call_mu_;
756   std::atomic<ServerCallbackUnary*> call_{nullptr};
757   struct PreBindBacklog {
758     bool send_initial_metadata_wanted = false;
759     bool finish_wanted = false;
760     grpc::Status status_wanted;
761   };
762   PreBindBacklog backlog_ ABSL_GUARDED_BY(call_mu_);
763 };
764 
765 namespace internal {
766 
767 template <class Base>
768 class FinishOnlyReactor : public Base {
769  public:
FinishOnlyReactor(grpc::Status s)770   explicit FinishOnlyReactor(grpc::Status s) { this->Finish(std::move(s)); }
OnDone()771   void OnDone() override { this->~FinishOnlyReactor(); }
772 };
773 
774 using UnimplementedUnaryReactor = FinishOnlyReactor<ServerUnaryReactor>;
775 template <class Request>
776 using UnimplementedReadReactor = FinishOnlyReactor<ServerReadReactor<Request>>;
777 template <class Response>
778 using UnimplementedWriteReactor =
779     FinishOnlyReactor<ServerWriteReactor<Response>>;
780 template <class Request, class Response>
781 using UnimplementedBidiReactor =
782     FinishOnlyReactor<ServerBidiReactor<Request, Response>>;
783 
784 }  // namespace internal
785 
786 // TODO(vjpai): Remove namespace experimental when last known users are migrated
787 // off.
788 namespace experimental {
789 
790 template <class Request, class Response>
791 using ServerBidiReactor = ::grpc::ServerBidiReactor<Request, Response>;
792 
793 }  // namespace experimental
794 
795 }  // namespace grpc
796 
797 #endif  // GRPCPP_SUPPORT_SERVER_CALLBACK_H
798