• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H
19 #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H
20 
21 #include <atomic>
22 #include <functional>
23 #include <type_traits>
24 
25 #include <grpcpp/impl/codegen/call.h>
26 #include <grpcpp/impl/codegen/call_op_set.h>
27 #include <grpcpp/impl/codegen/callback_common.h>
28 #include <grpcpp/impl/codegen/config.h>
29 #include <grpcpp/impl/codegen/core_codegen_interface.h>
30 #include <grpcpp/impl/codegen/message_allocator.h>
31 #include <grpcpp/impl/codegen/status.h>
32 
33 namespace grpc {
34 
35 // Declare base class of all reactors as internal
36 namespace internal {
37 
38 // Forward declarations
39 template <class Request, class Response>
40 class CallbackUnaryHandler;
41 template <class Request, class Response>
42 class CallbackClientStreamingHandler;
43 template <class Request, class Response>
44 class CallbackServerStreamingHandler;
45 template <class Request, class Response>
46 class CallbackBidiHandler;
47 
48 class ServerReactor {
49  public:
50   virtual ~ServerReactor() = default;
51   virtual void OnDone() = 0;
52   virtual void OnCancel() = 0;
53 
54   // The following is not API. It is for internal use only and specifies whether
55   // all reactions of this Reactor can be run without an extra executor
56   // scheduling. This should only be used for internally-defined reactors with
57   // trivial reactions.
InternalInlineable()58   virtual bool InternalInlineable() { return false; }
59 
60  private:
61   template <class Request, class Response>
62   friend class CallbackUnaryHandler;
63   template <class Request, class Response>
64   friend class CallbackClientStreamingHandler;
65   template <class Request, class Response>
66   friend class CallbackServerStreamingHandler;
67   template <class Request, class Response>
68   friend class CallbackBidiHandler;
69 };
70 
71 /// The base class of ServerCallbackUnary etc.
72 class ServerCallbackCall {
73  public:
~ServerCallbackCall()74   virtual ~ServerCallbackCall() {}
75 
76   // This object is responsible for tracking when it is safe to call OnDone and
77   // OnCancel. OnDone should not be called until the method handler is complete,
78   // Finish has been called, the ServerContext CompletionOp (which tracks
79   // cancellation or successful completion) has completed, and all outstanding
80   // Read/Write actions have seen their reactions. OnCancel should not be called
81   // until after the method handler is done and the RPC has completed with a
82   // cancellation. This is tracked by counting how many of these conditions have
83   // been met and calling OnCancel when none remain unmet.
84 
85   // Public versions of MaybeDone: one where we don't know the reactor in
86   // advance (used for the ServerContext CompletionOp), and one for where we
87   // know the inlineability of the OnDone reaction. You should set the inline
88   // flag to true if either the Reactor is InternalInlineable() or if this
89   // callback is already being forced to run dispatched to an executor
90   // (typically because it contains additional work than just the MaybeDone).
91 
MaybeDone()92   void MaybeDone() {
93     if (GPR_UNLIKELY(Unref() == 1)) {
94       ScheduleOnDone(reactor()->InternalInlineable());
95     }
96   }
97 
MaybeDone(bool inline_ondone)98   void MaybeDone(bool inline_ondone) {
99     if (GPR_UNLIKELY(Unref() == 1)) {
100       ScheduleOnDone(inline_ondone);
101     }
102   }
103 
104   // Fast version called with known reactor passed in, used from derived
105   // classes, typically in non-cancel case
MaybeCallOnCancel(ServerReactor * reactor)106   void MaybeCallOnCancel(ServerReactor* reactor) {
107     if (GPR_UNLIKELY(UnblockCancellation())) {
108       CallOnCancel(reactor);
109     }
110   }
111 
112   // Slower version called from object that doesn't know the reactor a priori
113   // (such as the ServerContext CompletionOp which is formed before the
114   // reactor). This is used in cancel cases only, so it's ok to be slower and
115   // invoke a virtual function.
MaybeCallOnCancel()116   void MaybeCallOnCancel() {
117     if (GPR_UNLIKELY(UnblockCancellation())) {
118       CallOnCancel(reactor());
119     }
120   }
121 
122  protected:
123   /// Increases the reference count
Ref()124   void Ref() { callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); }
125 
126  private:
127   virtual ServerReactor* reactor() = 0;
128 
129   // CallOnDone performs the work required at completion of the RPC: invoking
130   // the OnDone function and doing all necessary cleanup. This function is only
131   // ever invoked on a fully-Unref'fed ServerCallbackCall.
132   virtual void CallOnDone() = 0;
133 
134   // If the OnDone reaction is inlineable, execute it inline. Otherwise send it
135   // to an executor.
136   void ScheduleOnDone(bool inline_ondone);
137 
138   // If the OnCancel reaction is inlineable, execute it inline. Otherwise send
139   // it to an executor.
140   void CallOnCancel(ServerReactor* reactor);
141 
142   // Implement the cancellation constraint counter. Return true if OnCancel
143   // should be called, false otherwise.
UnblockCancellation()144   bool UnblockCancellation() {
145     return on_cancel_conditions_remaining_.fetch_sub(
146                1, std::memory_order_acq_rel) == 1;
147   }
148 
149   /// Decreases the reference count and returns the previous value
Unref()150   int Unref() {
151     return callbacks_outstanding_.fetch_sub(1, std::memory_order_acq_rel);
152   }
153 
154   std::atomic_int on_cancel_conditions_remaining_{2};
155   std::atomic_int callbacks_outstanding_{
156       3};  // reserve for start, Finish, and CompletionOp
157 };
158 
159 template <class Request, class Response>
160 class DefaultMessageHolder
161     : public ::grpc::experimental::MessageHolder<Request, Response> {
162  public:
DefaultMessageHolder()163   DefaultMessageHolder() {
164     this->set_request(&request_obj_);
165     this->set_response(&response_obj_);
166   }
Release()167   void Release() override {
168     // the object is allocated in the call arena.
169     this->~DefaultMessageHolder<Request, Response>();
170   }
171 
172  private:
173   Request request_obj_;
174   Response response_obj_;
175 };
176 
177 }  // namespace internal
178 
179 // Forward declarations
180 class ServerUnaryReactor;
181 template <class Request>
182 class ServerReadReactor;
183 template <class Response>
184 class ServerWriteReactor;
185 template <class Request, class Response>
186 class ServerBidiReactor;
187 
188 // NOTE: The actual call/stream object classes are provided as API only to
189 // support mocking. There are no implementations of these class interfaces in
190 // the API.
191 class ServerCallbackUnary : public internal::ServerCallbackCall {
192  public:
~ServerCallbackUnary()193   ~ServerCallbackUnary() override {}
194   virtual void Finish(::grpc::Status s) = 0;
195   virtual void SendInitialMetadata() = 0;
196 
197  protected:
198   // Use a template rather than explicitly specifying ServerUnaryReactor to
199   // delay binding and avoid a circular forward declaration issue
200   template <class Reactor>
BindReactor(Reactor * reactor)201   void BindReactor(Reactor* reactor) {
202     reactor->InternalBindCall(this);
203   }
204 };
205 
206 template <class Request>
207 class ServerCallbackReader : public internal::ServerCallbackCall {
208  public:
~ServerCallbackReader()209   ~ServerCallbackReader() override {}
210   virtual void Finish(::grpc::Status s) = 0;
211   virtual void SendInitialMetadata() = 0;
212   virtual void Read(Request* msg) = 0;
213 
214  protected:
BindReactor(ServerReadReactor<Request> * reactor)215   void BindReactor(ServerReadReactor<Request>* reactor) {
216     reactor->InternalBindReader(this);
217   }
218 };
219 
220 template <class Response>
221 class ServerCallbackWriter : public internal::ServerCallbackCall {
222  public:
~ServerCallbackWriter()223   ~ServerCallbackWriter() override {}
224 
225   virtual void Finish(::grpc::Status s) = 0;
226   virtual void SendInitialMetadata() = 0;
227   virtual void Write(const Response* msg, ::grpc::WriteOptions options) = 0;
228   virtual void WriteAndFinish(const Response* msg, ::grpc::WriteOptions options,
229                               ::grpc::Status s) = 0;
230 
231  protected:
BindReactor(ServerWriteReactor<Response> * reactor)232   void BindReactor(ServerWriteReactor<Response>* reactor) {
233     reactor->InternalBindWriter(this);
234   }
235 };
236 
237 template <class Request, class Response>
238 class ServerCallbackReaderWriter : public internal::ServerCallbackCall {
239  public:
~ServerCallbackReaderWriter()240   ~ServerCallbackReaderWriter() override {}
241 
242   virtual void Finish(::grpc::Status s) = 0;
243   virtual void SendInitialMetadata() = 0;
244   virtual void Read(Request* msg) = 0;
245   virtual void Write(const Response* msg, ::grpc::WriteOptions options) = 0;
246   virtual void WriteAndFinish(const Response* msg, ::grpc::WriteOptions options,
247                               ::grpc::Status s) = 0;
248 
249  protected:
BindReactor(ServerBidiReactor<Request,Response> * reactor)250   void BindReactor(ServerBidiReactor<Request, Response>* reactor) {
251     reactor->InternalBindStream(this);
252   }
253 };
254 
255 // The following classes are the reactor interfaces that are to be implemented
256 // by the user, returned as the output parameter of the method handler for a
257 // callback method. Note that none of the classes are pure; all reactions have a
258 // default empty reaction so that the user class only needs to override those
259 // classes that it cares about.
260 
261 /// \a ServerBidiReactor is the interface for a bidirectional streaming RPC.
262 template <class Request, class Response>
263 class ServerBidiReactor : public internal::ServerReactor {
264  public:
265   // NOTE: Initializing stream_ as a constructor initializer rather than a
266   //       default initializer because gcc-4.x requires a copy constructor for
267   //       default initializing a templated member, which isn't ok for atomic.
268   // TODO(vjpai): Switch to default constructor and default initializer when
269   //              gcc-4.x is no longer supported
ServerBidiReactor()270   ServerBidiReactor() : stream_(nullptr) {}
271   ~ServerBidiReactor() override = default;
272 
273   /// Send any initial metadata stored in the RPC context. If not invoked,
274   /// any initial metadata will be passed along with the first Write or the
275   /// Finish (if there are no writes).
StartSendInitialMetadata()276   void StartSendInitialMetadata() {
277     ServerCallbackReaderWriter<Request, Response>* stream =
278         stream_.load(std::memory_order_acquire);
279     if (stream == nullptr) {
280       grpc::internal::MutexLock l(&stream_mu_);
281       stream = stream_.load(std::memory_order_relaxed);
282       if (stream == nullptr) {
283         backlog_.send_initial_metadata_wanted = true;
284         return;
285       }
286     }
287     stream->SendInitialMetadata();
288   }
289 
290   /// Initiate a read operation.
291   ///
292   /// \param[out] req Where to eventually store the read message. Valid when
293   ///                 the library calls OnReadDone
StartRead(Request * req)294   void StartRead(Request* req) {
295     ServerCallbackReaderWriter<Request, Response>* stream =
296         stream_.load(std::memory_order_acquire);
297     if (stream == nullptr) {
298       grpc::internal::MutexLock l(&stream_mu_);
299       stream = stream_.load(std::memory_order_relaxed);
300       if (stream == nullptr) {
301         backlog_.read_wanted = req;
302         return;
303       }
304     }
305     stream->Read(req);
306   }
307 
308   /// Initiate a write operation.
309   ///
310   /// \param[in] resp The message to be written. The library does not take
311   ///                 ownership but the caller must ensure that the message is
312   ///                 not deleted or modified until OnWriteDone is called.
StartWrite(const Response * resp)313   void StartWrite(const Response* resp) {
314     StartWrite(resp, ::grpc::WriteOptions());
315   }
316 
317   /// Initiate a write operation with specified options.
318   ///
319   /// \param[in] resp The message to be written. The library does not take
320   ///                 ownership but the caller must ensure that the message is
321   ///                 not deleted or modified until OnWriteDone is called.
322   /// \param[in] options The WriteOptions to use for writing this message
StartWrite(const Response * resp,::grpc::WriteOptions options)323   void StartWrite(const Response* resp, ::grpc::WriteOptions options) {
324     ServerCallbackReaderWriter<Request, Response>* stream =
325         stream_.load(std::memory_order_acquire);
326     if (stream == nullptr) {
327       grpc::internal::MutexLock l(&stream_mu_);
328       stream = stream_.load(std::memory_order_relaxed);
329       if (stream == nullptr) {
330         backlog_.write_wanted = resp;
331         backlog_.write_options_wanted = options;
332         return;
333       }
334     }
335     stream->Write(resp, options);
336   }
337 
338   /// Initiate a write operation with specified options and final RPC Status,
339   /// which also causes any trailing metadata for this RPC to be sent out.
340   /// StartWriteAndFinish is like merging StartWriteLast and Finish into a
341   /// single step. A key difference, though, is that this operation doesn't have
342   /// an OnWriteDone reaction - it is considered complete only when OnDone is
343   /// available. An RPC can either have StartWriteAndFinish or Finish, but not
344   /// both.
345   ///
346   /// \param[in] resp The message to be written. The library does not take
347   ///                 ownership but the caller must ensure that the message is
348   ///                 not deleted or modified until OnDone is called.
349   /// \param[in] options The WriteOptions to use for writing this message
350   /// \param[in] s The status outcome of this RPC
StartWriteAndFinish(const Response * resp,::grpc::WriteOptions options,::grpc::Status s)351   void StartWriteAndFinish(const Response* resp, ::grpc::WriteOptions options,
352                            ::grpc::Status s) {
353     ServerCallbackReaderWriter<Request, Response>* stream =
354         stream_.load(std::memory_order_acquire);
355     if (stream == nullptr) {
356       grpc::internal::MutexLock l(&stream_mu_);
357       stream = stream_.load(std::memory_order_relaxed);
358       if (stream == nullptr) {
359         backlog_.write_and_finish_wanted = true;
360         backlog_.write_wanted = resp;
361         backlog_.write_options_wanted = options;
362         backlog_.status_wanted = std::move(s);
363         return;
364       }
365     }
366     stream->WriteAndFinish(resp, options, std::move(s));
367   }
368 
369   /// Inform system of a planned write operation with specified options, but
370   /// allow the library to schedule the actual write coalesced with the writing
371   /// of trailing metadata (which takes place on a Finish call).
372   ///
373   /// \param[in] resp The message to be written. The library does not take
374   ///                 ownership but the caller must ensure that the message is
375   ///                 not deleted or modified until OnWriteDone is called.
376   /// \param[in] options The WriteOptions to use for writing this message
StartWriteLast(const Response * resp,::grpc::WriteOptions options)377   void StartWriteLast(const Response* resp, ::grpc::WriteOptions options) {
378     StartWrite(resp, options.set_last_message());
379   }
380 
381   /// Indicate that the stream is to be finished and the trailing metadata and
382   /// RPC status are to be sent. Every RPC MUST be finished using either Finish
383   /// or StartWriteAndFinish (but not both), even if the RPC is already
384   /// cancelled.
385   ///
386   /// \param[in] s The status outcome of this RPC
Finish(::grpc::Status s)387   void Finish(::grpc::Status s) {
388     ServerCallbackReaderWriter<Request, Response>* stream =
389         stream_.load(std::memory_order_acquire);
390     if (stream == nullptr) {
391       grpc::internal::MutexLock l(&stream_mu_);
392       stream = stream_.load(std::memory_order_relaxed);
393       if (stream == nullptr) {
394         backlog_.finish_wanted = true;
395         backlog_.status_wanted = std::move(s);
396         return;
397       }
398     }
399     stream->Finish(std::move(s));
400   }
401 
402   /// Notifies the application that an explicit StartSendInitialMetadata
403   /// operation completed. Not used when the sending of initial metadata
404   /// piggybacks onto the first write.
405   ///
406   /// \param[in] ok Was it successful? If false, no further write-side operation
407   ///               will succeed.
OnSendInitialMetadataDone(bool)408   virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
409 
410   /// Notifies the application that a StartRead operation completed.
411   ///
412   /// \param[in] ok Was it successful? If false, no further read-side operation
413   ///               will succeed.
OnReadDone(bool)414   virtual void OnReadDone(bool /*ok*/) {}
415 
416   /// Notifies the application that a StartWrite (or StartWriteLast) operation
417   /// completed.
418   ///
419   /// \param[in] ok Was it successful? If false, no further write-side operation
420   ///               will succeed.
OnWriteDone(bool)421   virtual void OnWriteDone(bool /*ok*/) {}
422 
423   /// Notifies the application that all operations associated with this RPC
424   /// have completed. This is an override (from the internal base class) but
425   /// still abstract, so derived classes MUST override it to be instantiated.
426   void OnDone() override = 0;
427 
428   /// Notifies the application that this RPC has been cancelled. This is an
429   /// override (from the internal base class) but not final, so derived classes
430   /// should override it if they want to take action.
OnCancel()431   void OnCancel() override {}
432 
433  private:
434   friend class ServerCallbackReaderWriter<Request, Response>;
435   // May be overridden by internal implementation details. This is not a public
436   // customization point.
InternalBindStream(ServerCallbackReaderWriter<Request,Response> * stream)437   virtual void InternalBindStream(
438       ServerCallbackReaderWriter<Request, Response>* stream) {
439     grpc::internal::MutexLock l(&stream_mu_);
440 
441     if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
442       stream->SendInitialMetadata();
443     }
444     if (GPR_UNLIKELY(backlog_.read_wanted != nullptr)) {
445       stream->Read(backlog_.read_wanted);
446     }
447     if (GPR_UNLIKELY(backlog_.write_and_finish_wanted)) {
448       stream->WriteAndFinish(backlog_.write_wanted,
449                              std::move(backlog_.write_options_wanted),
450                              std::move(backlog_.status_wanted));
451     } else {
452       if (GPR_UNLIKELY(backlog_.write_wanted != nullptr)) {
453         stream->Write(backlog_.write_wanted,
454                       std::move(backlog_.write_options_wanted));
455       }
456       if (GPR_UNLIKELY(backlog_.finish_wanted)) {
457         stream->Finish(std::move(backlog_.status_wanted));
458       }
459     }
460     // Set stream_ last so that other functions can use it lock-free
461     stream_.store(stream, std::memory_order_release);
462   }
463 
464   grpc::internal::Mutex stream_mu_;
465   // TODO(vjpai): Make stream_or_backlog_ into a std::variant or absl::variant
466   //              once C++17 or ABSL is supported since stream and backlog are
467   //              mutually exclusive in this class. Do likewise with the
468   //              remaining reactor classes and their backlogs as well.
469   std::atomic<ServerCallbackReaderWriter<Request, Response>*> stream_{nullptr};
470   struct PreBindBacklog {
471     bool send_initial_metadata_wanted = false;
472     bool write_and_finish_wanted = false;
473     bool finish_wanted = false;
474     Request* read_wanted = nullptr;
475     const Response* write_wanted = nullptr;
476     ::grpc::WriteOptions write_options_wanted;
477     ::grpc::Status status_wanted;
478   };
479   PreBindBacklog backlog_ /* GUARDED_BY(stream_mu_) */;
480 };
481 
482 /// \a ServerReadReactor is the interface for a client-streaming RPC.
483 template <class Request>
484 class ServerReadReactor : public internal::ServerReactor {
485  public:
ServerReadReactor()486   ServerReadReactor() : reader_(nullptr) {}
487   ~ServerReadReactor() override = default;
488 
489   /// The following operation initiations are exactly like ServerBidiReactor.
StartSendInitialMetadata()490   void StartSendInitialMetadata() {
491     ServerCallbackReader<Request>* reader =
492         reader_.load(std::memory_order_acquire);
493     if (reader == nullptr) {
494       grpc::internal::MutexLock l(&reader_mu_);
495       reader = reader_.load(std::memory_order_relaxed);
496       if (reader == nullptr) {
497         backlog_.send_initial_metadata_wanted = true;
498         return;
499       }
500     }
501     reader->SendInitialMetadata();
502   }
StartRead(Request * req)503   void StartRead(Request* req) {
504     ServerCallbackReader<Request>* reader =
505         reader_.load(std::memory_order_acquire);
506     if (reader == nullptr) {
507       grpc::internal::MutexLock l(&reader_mu_);
508       reader = reader_.load(std::memory_order_relaxed);
509       if (reader == nullptr) {
510         backlog_.read_wanted = req;
511         return;
512       }
513     }
514     reader->Read(req);
515   }
Finish(::grpc::Status s)516   void Finish(::grpc::Status s) {
517     ServerCallbackReader<Request>* reader =
518         reader_.load(std::memory_order_acquire);
519     if (reader == nullptr) {
520       grpc::internal::MutexLock l(&reader_mu_);
521       reader = reader_.load(std::memory_order_relaxed);
522       if (reader == nullptr) {
523         backlog_.finish_wanted = true;
524         backlog_.status_wanted = std::move(s);
525         return;
526       }
527     }
528     reader->Finish(std::move(s));
529   }
530 
531   /// The following notifications are exactly like ServerBidiReactor.
OnSendInitialMetadataDone(bool)532   virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
OnReadDone(bool)533   virtual void OnReadDone(bool /*ok*/) {}
534   void OnDone() override = 0;
OnCancel()535   void OnCancel() override {}
536 
537  private:
538   friend class ServerCallbackReader<Request>;
539 
540   // May be overridden by internal implementation details. This is not a public
541   // customization point.
InternalBindReader(ServerCallbackReader<Request> * reader)542   virtual void InternalBindReader(ServerCallbackReader<Request>* reader) {
543     grpc::internal::MutexLock l(&reader_mu_);
544 
545     if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
546       reader->SendInitialMetadata();
547     }
548     if (GPR_UNLIKELY(backlog_.read_wanted != nullptr)) {
549       reader->Read(backlog_.read_wanted);
550     }
551     if (GPR_UNLIKELY(backlog_.finish_wanted)) {
552       reader->Finish(std::move(backlog_.status_wanted));
553     }
554     // Set reader_ last so that other functions can use it lock-free
555     reader_.store(reader, std::memory_order_release);
556   }
557 
558   grpc::internal::Mutex reader_mu_;
559   std::atomic<ServerCallbackReader<Request>*> reader_{nullptr};
560   struct PreBindBacklog {
561     bool send_initial_metadata_wanted = false;
562     bool finish_wanted = false;
563     Request* read_wanted = nullptr;
564     ::grpc::Status status_wanted;
565   };
566   PreBindBacklog backlog_ /* GUARDED_BY(reader_mu_) */;
567 };
568 
569 /// \a ServerWriteReactor is the interface for a server-streaming RPC.
570 template <class Response>
571 class ServerWriteReactor : public internal::ServerReactor {
572  public:
ServerWriteReactor()573   ServerWriteReactor() : writer_(nullptr) {}
574   ~ServerWriteReactor() override = default;
575 
576   /// The following operation initiations are exactly like ServerBidiReactor.
StartSendInitialMetadata()577   void StartSendInitialMetadata() {
578     ServerCallbackWriter<Response>* writer =
579         writer_.load(std::memory_order_acquire);
580     if (writer == nullptr) {
581       grpc::internal::MutexLock l(&writer_mu_);
582       writer = writer_.load(std::memory_order_relaxed);
583       if (writer == nullptr) {
584         backlog_.send_initial_metadata_wanted = true;
585         return;
586       }
587     }
588     writer->SendInitialMetadata();
589   }
StartWrite(const Response * resp)590   void StartWrite(const Response* resp) {
591     StartWrite(resp, ::grpc::WriteOptions());
592   }
StartWrite(const Response * resp,::grpc::WriteOptions options)593   void StartWrite(const Response* resp, ::grpc::WriteOptions options) {
594     ServerCallbackWriter<Response>* writer =
595         writer_.load(std::memory_order_acquire);
596     if (writer == nullptr) {
597       grpc::internal::MutexLock l(&writer_mu_);
598       writer = writer_.load(std::memory_order_relaxed);
599       if (writer == nullptr) {
600         backlog_.write_wanted = resp;
601         backlog_.write_options_wanted = options;
602         return;
603       }
604     }
605     writer->Write(resp, options);
606   }
StartWriteAndFinish(const Response * resp,::grpc::WriteOptions options,::grpc::Status s)607   void StartWriteAndFinish(const Response* resp, ::grpc::WriteOptions options,
608                            ::grpc::Status s) {
609     ServerCallbackWriter<Response>* writer =
610         writer_.load(std::memory_order_acquire);
611     if (writer == nullptr) {
612       grpc::internal::MutexLock l(&writer_mu_);
613       writer = writer_.load(std::memory_order_relaxed);
614       if (writer == nullptr) {
615         backlog_.write_and_finish_wanted = true;
616         backlog_.write_wanted = resp;
617         backlog_.write_options_wanted = options;
618         backlog_.status_wanted = std::move(s);
619         return;
620       }
621     }
622     writer->WriteAndFinish(resp, options, std::move(s));
623   }
StartWriteLast(const Response * resp,::grpc::WriteOptions options)624   void StartWriteLast(const Response* resp, ::grpc::WriteOptions options) {
625     StartWrite(resp, options.set_last_message());
626   }
Finish(::grpc::Status s)627   void Finish(::grpc::Status s) {
628     ServerCallbackWriter<Response>* writer =
629         writer_.load(std::memory_order_acquire);
630     if (writer == nullptr) {
631       grpc::internal::MutexLock l(&writer_mu_);
632       writer = writer_.load(std::memory_order_relaxed);
633       if (writer == nullptr) {
634         backlog_.finish_wanted = true;
635         backlog_.status_wanted = std::move(s);
636         return;
637       }
638     }
639     writer->Finish(std::move(s));
640   }
641 
642   /// The following notifications are exactly like ServerBidiReactor.
OnSendInitialMetadataDone(bool)643   virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
OnWriteDone(bool)644   virtual void OnWriteDone(bool /*ok*/) {}
645   void OnDone() override = 0;
OnCancel()646   void OnCancel() override {}
647 
648  private:
649   friend class ServerCallbackWriter<Response>;
650   // May be overridden by internal implementation details. This is not a public
651   // customization point.
InternalBindWriter(ServerCallbackWriter<Response> * writer)652   virtual void InternalBindWriter(ServerCallbackWriter<Response>* writer) {
653     grpc::internal::MutexLock l(&writer_mu_);
654 
655     if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
656       writer->SendInitialMetadata();
657     }
658     if (GPR_UNLIKELY(backlog_.write_and_finish_wanted)) {
659       writer->WriteAndFinish(backlog_.write_wanted,
660                              std::move(backlog_.write_options_wanted),
661                              std::move(backlog_.status_wanted));
662     } else {
663       if (GPR_UNLIKELY(backlog_.write_wanted != nullptr)) {
664         writer->Write(backlog_.write_wanted,
665                       std::move(backlog_.write_options_wanted));
666       }
667       if (GPR_UNLIKELY(backlog_.finish_wanted)) {
668         writer->Finish(std::move(backlog_.status_wanted));
669       }
670     }
671     // Set writer_ last so that other functions can use it lock-free
672     writer_.store(writer, std::memory_order_release);
673   }
674 
675   grpc::internal::Mutex writer_mu_;
676   std::atomic<ServerCallbackWriter<Response>*> writer_{nullptr};
677   struct PreBindBacklog {
678     bool send_initial_metadata_wanted = false;
679     bool write_and_finish_wanted = false;
680     bool finish_wanted = false;
681     const Response* write_wanted = nullptr;
682     ::grpc::WriteOptions write_options_wanted;
683     ::grpc::Status status_wanted;
684   };
685   PreBindBacklog backlog_ /* GUARDED_BY(writer_mu_) */;
686 };
687 
688 class ServerUnaryReactor : public internal::ServerReactor {
689  public:
ServerUnaryReactor()690   ServerUnaryReactor() : call_(nullptr) {}
691   ~ServerUnaryReactor() override = default;
692 
693   /// StartSendInitialMetadata is exactly like ServerBidiReactor.
StartSendInitialMetadata()694   void StartSendInitialMetadata() {
695     ServerCallbackUnary* call = call_.load(std::memory_order_acquire);
696     if (call == nullptr) {
697       grpc::internal::MutexLock l(&call_mu_);
698       call = call_.load(std::memory_order_relaxed);
699       if (call == nullptr) {
700         backlog_.send_initial_metadata_wanted = true;
701         return;
702       }
703     }
704     call->SendInitialMetadata();
705   }
706   /// Finish is similar to ServerBidiReactor except for one detail.
707   /// If the status is non-OK, any message will not be sent. Instead,
708   /// the client will only receive the status and any trailing metadata.
Finish(::grpc::Status s)709   void Finish(::grpc::Status s) {
710     ServerCallbackUnary* call = call_.load(std::memory_order_acquire);
711     if (call == nullptr) {
712       grpc::internal::MutexLock l(&call_mu_);
713       call = call_.load(std::memory_order_relaxed);
714       if (call == nullptr) {
715         backlog_.finish_wanted = true;
716         backlog_.status_wanted = std::move(s);
717         return;
718       }
719     }
720     call->Finish(std::move(s));
721   }
722 
723   /// The following notifications are exactly like ServerBidiReactor.
OnSendInitialMetadataDone(bool)724   virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
725   void OnDone() override = 0;
OnCancel()726   void OnCancel() override {}
727 
728  private:
729   friend class ServerCallbackUnary;
730   // May be overridden by internal implementation details. This is not a public
731   // customization point.
InternalBindCall(ServerCallbackUnary * call)732   virtual void InternalBindCall(ServerCallbackUnary* call) {
733     grpc::internal::MutexLock l(&call_mu_);
734 
735     if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
736       call->SendInitialMetadata();
737     }
738     if (GPR_UNLIKELY(backlog_.finish_wanted)) {
739       call->Finish(std::move(backlog_.status_wanted));
740     }
741     // Set call_ last so that other functions can use it lock-free
742     call_.store(call, std::memory_order_release);
743   }
744 
745   grpc::internal::Mutex call_mu_;
746   std::atomic<ServerCallbackUnary*> call_{nullptr};
747   struct PreBindBacklog {
748     bool send_initial_metadata_wanted = false;
749     bool finish_wanted = false;
750     ::grpc::Status status_wanted;
751   };
752   PreBindBacklog backlog_ /* GUARDED_BY(call_mu_) */;
753 };
754 
755 namespace internal {
756 
757 template <class Base>
758 class FinishOnlyReactor : public Base {
759  public:
FinishOnlyReactor(::grpc::Status s)760   explicit FinishOnlyReactor(::grpc::Status s) { this->Finish(std::move(s)); }
OnDone()761   void OnDone() override { this->~FinishOnlyReactor(); }
762 };
763 
764 using UnimplementedUnaryReactor = FinishOnlyReactor<ServerUnaryReactor>;
765 template <class Request>
766 using UnimplementedReadReactor = FinishOnlyReactor<ServerReadReactor<Request>>;
767 template <class Response>
768 using UnimplementedWriteReactor =
769     FinishOnlyReactor<ServerWriteReactor<Response>>;
770 template <class Request, class Response>
771 using UnimplementedBidiReactor =
772     FinishOnlyReactor<ServerBidiReactor<Request, Response>>;
773 
774 }  // namespace internal
775 
776 // TODO(vjpai): Remove namespace experimental when de-experimentalized fully.
777 namespace experimental {
778 
779 template <class Request>
780 using ServerReadReactor = ::grpc::ServerReadReactor<Request>;
781 
782 template <class Response>
783 using ServerWriteReactor = ::grpc::ServerWriteReactor<Response>;
784 
785 template <class Request, class Response>
786 using ServerBidiReactor = ::grpc::ServerBidiReactor<Request, Response>;
787 
788 using ServerUnaryReactor = ::grpc::ServerUnaryReactor;
789 
790 }  // namespace experimental
791 
792 }  // namespace grpc
793 
794 #endif  // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H
795