• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_IMPL_H
19 #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_IMPL_H
20 
21 #include <atomic>
22 #include <functional>
23 #include <type_traits>
24 
25 #include <grpcpp/impl/codegen/call.h>
26 #include <grpcpp/impl/codegen/call_op_set.h>
27 #include <grpcpp/impl/codegen/callback_common.h>
28 #include <grpcpp/impl/codegen/config.h>
29 #include <grpcpp/impl/codegen/core_codegen_interface.h>
30 #include <grpcpp/impl/codegen/message_allocator.h>
31 #include <grpcpp/impl/codegen/status.h>
32 
33 namespace grpc_impl {
34 
35 // Declare base class of all reactors as internal
36 namespace internal {
37 
// The handler templates are only named here so that ServerReactor (below) can
// declare them as friends; this header does not define them.
template <typename Request, typename Response>
class CallbackUnaryHandler;

template <typename Request, typename Response>
class CallbackClientStreamingHandler;

template <typename Request, typename Response>
class CallbackServerStreamingHandler;

template <typename Request, typename Response>
class CallbackBidiHandler;
47 
// Common base of every server-side reactor. It declares the two reactions that
// all reactor kinds share (OnDone and OnCancel) plus an internal-only query
// about whether the reactions may be run inline.
class ServerReactor {
 public:
  virtual ~ServerReactor() = default;

  // Completion reaction. Pure virtual at this level.
  virtual void OnDone() = 0;

  // Cancellation reaction. Pure virtual at this level.
  virtual void OnCancel() = 0;

  // NOT part of the API; for internal use only. Tells the library whether
  // every reaction of this reactor can be run without an extra executor
  // scheduling step. Only internally-defined reactors whose reactions are
  // trivial should answer true; the default answer is false.
  virtual bool InternalInlineable() { return false; }

 private:
  // The four callback method handlers are granted access to private members.
  template <typename Request, typename Response>
  friend class CallbackUnaryHandler;
  template <typename Request, typename Response>
  friend class CallbackClientStreamingHandler;
  template <typename Request, typename Response>
  friend class CallbackServerStreamingHandler;
  template <typename Request, typename Response>
  friend class CallbackBidiHandler;
};
70 
71 /// The base class of ServerCallbackUnary etc.
72 class ServerCallbackCall {
73  public:
~ServerCallbackCall()74   virtual ~ServerCallbackCall() {}
75 
76   // This object is responsible for tracking when it is safe to call OnDone and
77   // OnCancel. OnDone should not be called until the method handler is complete,
78   // Finish has been called, the ServerContext CompletionOp (which tracks
79   // cancellation or successful completion) has completed, and all outstanding
80   // Read/Write actions have seen their reactions. OnCancel should not be called
81   // until after the method handler is done and the RPC has completed with a
82   // cancellation. This is tracked by counting how many of these conditions have
83   // been met and calling OnCancel when none remain unmet.
84 
85   // Public versions of MaybeDone: one where we don't know the reactor in
86   // advance (used for the ServerContext CompletionOp), and one for where we
87   // know the inlineability of the OnDone reaction. You should set the inline
88   // flag to true if either the Reactor is InternalInlineable() or if this
89   // callback is already being forced to run dispatched to an executor
90   // (typically because it contains additional work than just the MaybeDone).
91 
MaybeDone()92   void MaybeDone() {
93     if (GPR_UNLIKELY(Unref() == 1)) {
94       ScheduleOnDone(reactor()->InternalInlineable());
95     }
96   }
97 
MaybeDone(bool inline_ondone)98   void MaybeDone(bool inline_ondone) {
99     if (GPR_UNLIKELY(Unref() == 1)) {
100       ScheduleOnDone(inline_ondone);
101     }
102   }
103 
104   // Fast version called with known reactor passed in, used from derived
105   // classes, typically in non-cancel case
MaybeCallOnCancel(ServerReactor * reactor)106   void MaybeCallOnCancel(ServerReactor* reactor) {
107     if (GPR_UNLIKELY(UnblockCancellation())) {
108       CallOnCancel(reactor);
109     }
110   }
111 
112   // Slower version called from object that doesn't know the reactor a priori
113   // (such as the ServerContext CompletionOp which is formed before the
114   // reactor). This is used in cancel cases only, so it's ok to be slower and
115   // invoke a virtual function.
MaybeCallOnCancel()116   void MaybeCallOnCancel() {
117     if (GPR_UNLIKELY(UnblockCancellation())) {
118       CallOnCancel(reactor());
119     }
120   }
121 
122  protected:
123   /// Increases the reference count
Ref()124   void Ref() { callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); }
125 
126  private:
127   virtual ServerReactor* reactor() = 0;
128 
129   // CallOnDone performs the work required at completion of the RPC: invoking
130   // the OnDone function and doing all necessary cleanup. This function is only
131   // ever invoked on a fully-Unref'fed ServerCallbackCall.
132   virtual void CallOnDone() = 0;
133 
134   // If the OnDone reaction is inlineable, execute it inline. Otherwise send it
135   // to an executor.
136   void ScheduleOnDone(bool inline_ondone);
137 
138   // If the OnCancel reaction is inlineable, execute it inline. Otherwise send
139   // it to an executor.
140   void CallOnCancel(ServerReactor* reactor);
141 
142   // Implement the cancellation constraint counter. Return true if OnCancel
143   // should be called, false otherwise.
UnblockCancellation()144   bool UnblockCancellation() {
145     return on_cancel_conditions_remaining_.fetch_sub(
146                1, std::memory_order_acq_rel) == 1;
147   }
148 
149   /// Decreases the reference count and returns the previous value
Unref()150   int Unref() {
151     return callbacks_outstanding_.fetch_sub(1, std::memory_order_acq_rel);
152   }
153 
154   std::atomic_int on_cancel_conditions_remaining_{2};
155   std::atomic_int callbacks_outstanding_{
156       3};  // reserve for start, Finish, and CompletionOp
157 };
158 
159 template <class Request, class Response>
160 class DefaultMessageHolder
161     : public ::grpc::experimental::MessageHolder<Request, Response> {
162  public:
DefaultMessageHolder()163   DefaultMessageHolder() {
164     this->set_request(&request_obj_);
165     this->set_response(&response_obj_);
166   }
Release()167   void Release() override {
168     // the object is allocated in the call arena.
169     this->~DefaultMessageHolder<Request, Response>();
170   }
171 
172  private:
173   Request request_obj_;
174   Response response_obj_;
175 };
176 
177 }  // namespace internal
178 
// The reactor types are declared up front because the call/stream classes
// below mention them before their definitions appear.
class ServerUnaryReactor;

template <typename Request>
class ServerReadReactor;

template <typename Response>
class ServerWriteReactor;

template <typename Request, typename Response>
class ServerBidiReactor;
187 
188 // NOTE: The actual call/stream object classes are provided as API only to
189 // support mocking. There are no implementations of these class interfaces in
190 // the API.
191 class ServerCallbackUnary : public internal::ServerCallbackCall {
192  public:
~ServerCallbackUnary()193   virtual ~ServerCallbackUnary() {}
194   virtual void Finish(::grpc::Status s) = 0;
195   virtual void SendInitialMetadata() = 0;
196 
197  protected:
198   // Use a template rather than explicitly specifying ServerUnaryReactor to
199   // delay binding and avoid a circular forward declaration issue
200   template <class Reactor>
BindReactor(Reactor * reactor)201   void BindReactor(Reactor* reactor) {
202     reactor->InternalBindCall(this);
203   }
204 };
205 
206 template <class Request>
207 class ServerCallbackReader : public internal::ServerCallbackCall {
208  public:
~ServerCallbackReader()209   virtual ~ServerCallbackReader() {}
210   virtual void Finish(::grpc::Status s) = 0;
211   virtual void SendInitialMetadata() = 0;
212   virtual void Read(Request* msg) = 0;
213 
214  protected:
BindReactor(ServerReadReactor<Request> * reactor)215   void BindReactor(ServerReadReactor<Request>* reactor) {
216     reactor->InternalBindReader(this);
217   }
218 };
219 
220 template <class Response>
221 class ServerCallbackWriter : public internal::ServerCallbackCall {
222  public:
~ServerCallbackWriter()223   virtual ~ServerCallbackWriter() {}
224 
225   virtual void Finish(::grpc::Status s) = 0;
226   virtual void SendInitialMetadata() = 0;
227   virtual void Write(const Response* msg, ::grpc::WriteOptions options) = 0;
228   virtual void WriteAndFinish(const Response* msg, ::grpc::WriteOptions options,
229                               ::grpc::Status s) = 0;
230 
231  protected:
BindReactor(ServerWriteReactor<Response> * reactor)232   void BindReactor(ServerWriteReactor<Response>* reactor) {
233     reactor->InternalBindWriter(this);
234   }
235 };
236 
237 template <class Request, class Response>
238 class ServerCallbackReaderWriter : public internal::ServerCallbackCall {
239  public:
~ServerCallbackReaderWriter()240   virtual ~ServerCallbackReaderWriter() {}
241 
242   virtual void Finish(::grpc::Status s) = 0;
243   virtual void SendInitialMetadata() = 0;
244   virtual void Read(Request* msg) = 0;
245   virtual void Write(const Response* msg, ::grpc::WriteOptions options) = 0;
246   virtual void WriteAndFinish(const Response* msg, ::grpc::WriteOptions options,
247                               ::grpc::Status s) = 0;
248 
249  protected:
BindReactor(ServerBidiReactor<Request,Response> * reactor)250   void BindReactor(ServerBidiReactor<Request, Response>* reactor) {
251     reactor->InternalBindStream(this);
252   }
253 };
254 
// The following classes are the reactor interfaces that are to be implemented
// by the user, returned as the output parameter of the method handler for a
// callback method. Every reaction except OnDone has a default empty
// implementation, so the user class only needs to override OnDone (which is
// pure virtual) plus whichever other reactions it cares about.
260 
261 /// \a ServerBidiReactor is the interface for a bidirectional streaming RPC.
262 template <class Request, class Response>
263 class ServerBidiReactor : public internal::ServerReactor {
264  public:
265   // NOTE: Initializing stream_ as a constructor initializer rather than a
266   //       default initializer because gcc-4.x requires a copy constructor for
267   //       default initializing a templated member, which isn't ok for atomic.
268   // TODO(vjpai): Switch to default constructor and default initializer when
269   //              gcc-4.x is no longer supported
ServerBidiReactor()270   ServerBidiReactor() : stream_(nullptr) {}
271   ~ServerBidiReactor() = default;
272 
273   /// Send any initial metadata stored in the RPC context. If not invoked,
274   /// any initial metadata will be passed along with the first Write or the
275   /// Finish (if there are no writes).
StartSendInitialMetadata()276   void StartSendInitialMetadata() {
277     ServerCallbackReaderWriter<Request, Response>* stream =
278         stream_.load(std::memory_order_acquire);
279     if (stream == nullptr) {
280       grpc::internal::MutexLock l(&stream_mu_);
281       stream = stream_.load(std::memory_order_relaxed);
282       if (stream == nullptr) {
283         backlog_.send_initial_metadata_wanted = true;
284         return;
285       }
286     }
287     stream->SendInitialMetadata();
288   }
289 
290   /// Initiate a read operation.
291   ///
292   /// \param[out] req Where to eventually store the read message. Valid when
293   ///                 the library calls OnReadDone
StartRead(Request * req)294   void StartRead(Request* req) {
295     ServerCallbackReaderWriter<Request, Response>* stream =
296         stream_.load(std::memory_order_acquire);
297     if (stream == nullptr) {
298       grpc::internal::MutexLock l(&stream_mu_);
299       stream = stream_.load(std::memory_order_relaxed);
300       if (stream == nullptr) {
301         backlog_.read_wanted = req;
302         return;
303       }
304     }
305     stream->Read(req);
306   }
307 
308   /// Initiate a write operation.
309   ///
310   /// \param[in] resp The message to be written. The library does not take
311   ///                 ownership but the caller must ensure that the message is
312   ///                 not deleted or modified until OnWriteDone is called.
StartWrite(const Response * resp)313   void StartWrite(const Response* resp) {
314     StartWrite(resp, ::grpc::WriteOptions());
315   }
316 
317   /// Initiate a write operation with specified options.
318   ///
319   /// \param[in] resp The message to be written. The library does not take
320   ///                 ownership but the caller must ensure that the message is
321   ///                 not deleted or modified until OnWriteDone is called.
322   /// \param[in] options The WriteOptions to use for writing this message
StartWrite(const Response * resp,::grpc::WriteOptions options)323   void StartWrite(const Response* resp, ::grpc::WriteOptions options) {
324     ServerCallbackReaderWriter<Request, Response>* stream =
325         stream_.load(std::memory_order_acquire);
326     if (stream == nullptr) {
327       grpc::internal::MutexLock l(&stream_mu_);
328       stream = stream_.load(std::memory_order_relaxed);
329       if (stream == nullptr) {
330         backlog_.write_wanted = resp;
331         backlog_.write_options_wanted = std::move(options);
332         return;
333       }
334     }
335     stream->Write(resp, std::move(options));
336   }
337 
338   /// Initiate a write operation with specified options and final RPC Status,
339   /// which also causes any trailing metadata for this RPC to be sent out.
340   /// StartWriteAndFinish is like merging StartWriteLast and Finish into a
341   /// single step. A key difference, though, is that this operation doesn't have
342   /// an OnWriteDone reaction - it is considered complete only when OnDone is
343   /// available. An RPC can either have StartWriteAndFinish or Finish, but not
344   /// both.
345   ///
346   /// \param[in] resp The message to be written. The library does not take
347   ///                 ownership but the caller must ensure that the message is
348   ///                 not deleted or modified until OnDone is called.
349   /// \param[in] options The WriteOptions to use for writing this message
350   /// \param[in] s The status outcome of this RPC
StartWriteAndFinish(const Response * resp,::grpc::WriteOptions options,::grpc::Status s)351   void StartWriteAndFinish(const Response* resp, ::grpc::WriteOptions options,
352                            ::grpc::Status s) {
353     ServerCallbackReaderWriter<Request, Response>* stream =
354         stream_.load(std::memory_order_acquire);
355     if (stream == nullptr) {
356       grpc::internal::MutexLock l(&stream_mu_);
357       stream = stream_.load(std::memory_order_relaxed);
358       if (stream == nullptr) {
359         backlog_.write_and_finish_wanted = true;
360         backlog_.write_wanted = resp;
361         backlog_.write_options_wanted = std::move(options);
362         backlog_.status_wanted = std::move(s);
363         return;
364       }
365     }
366     stream->WriteAndFinish(resp, std::move(options), std::move(s));
367   }
368 
369   /// Inform system of a planned write operation with specified options, but
370   /// allow the library to schedule the actual write coalesced with the writing
371   /// of trailing metadata (which takes place on a Finish call).
372   ///
373   /// \param[in] resp The message to be written. The library does not take
374   ///                 ownership but the caller must ensure that the message is
375   ///                 not deleted or modified until OnWriteDone is called.
376   /// \param[in] options The WriteOptions to use for writing this message
StartWriteLast(const Response * resp,::grpc::WriteOptions options)377   void StartWriteLast(const Response* resp, ::grpc::WriteOptions options) {
378     StartWrite(resp, std::move(options.set_last_message()));
379   }
380 
381   /// Indicate that the stream is to be finished and the trailing metadata and
382   /// RPC status are to be sent. Every RPC MUST be finished using either Finish
383   /// or StartWriteAndFinish (but not both), even if the RPC is already
384   /// cancelled.
385   ///
386   /// \param[in] s The status outcome of this RPC
Finish(::grpc::Status s)387   void Finish(::grpc::Status s) {
388     ServerCallbackReaderWriter<Request, Response>* stream =
389         stream_.load(std::memory_order_acquire);
390     if (stream == nullptr) {
391       grpc::internal::MutexLock l(&stream_mu_);
392       stream = stream_.load(std::memory_order_relaxed);
393       if (stream == nullptr) {
394         backlog_.finish_wanted = true;
395         backlog_.status_wanted = std::move(s);
396         return;
397       }
398     }
399     stream->Finish(std::move(s));
400   }
401 
402   /// Notifies the application that an explicit StartSendInitialMetadata
403   /// operation completed. Not used when the sending of initial metadata
404   /// piggybacks onto the first write.
405   ///
406   /// \param[in] ok Was it successful? If false, no further write-side operation
407   ///               will succeed.
OnSendInitialMetadataDone(bool)408   virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
409 
410   /// Notifies the application that a StartRead operation completed.
411   ///
412   /// \param[in] ok Was it successful? If false, no further read-side operation
413   ///               will succeed.
OnReadDone(bool)414   virtual void OnReadDone(bool /*ok*/) {}
415 
416   /// Notifies the application that a StartWrite (or StartWriteLast) operation
417   /// completed.
418   ///
419   /// \param[in] ok Was it successful? If false, no further write-side operation
420   ///               will succeed.
OnWriteDone(bool)421   virtual void OnWriteDone(bool /*ok*/) {}
422 
423   /// Notifies the application that all operations associated with this RPC
424   /// have completed. This is an override (from the internal base class) but
425   /// still abstract, so derived classes MUST override it to be instantiated.
426   void OnDone() override = 0;
427 
428   /// Notifies the application that this RPC has been cancelled. This is an
429   /// override (from the internal base class) but not final, so derived classes
430   /// should override it if they want to take action.
OnCancel()431   void OnCancel() override {}
432 
433  private:
434   friend class ServerCallbackReaderWriter<Request, Response>;
435   // May be overridden by internal implementation details. This is not a public
436   // customization point.
InternalBindStream(ServerCallbackReaderWriter<Request,Response> * stream)437   virtual void InternalBindStream(
438       ServerCallbackReaderWriter<Request, Response>* stream) {
439     // TODO(vjpai): When stream_or_backlog_ becomes a variant (see below), use
440     // a scoped MutexLock and std::swap stream_or_backlog_ with a variant that
441     // has stream, then std::get<PreBindBacklog> out of that after the lock.
442     // Do likewise with the remaining InternalBind* functions as well.
443     grpc::internal::ReleasableMutexLock l(&stream_mu_);
444     PreBindBacklog ops(std::move(backlog_));
445     stream_.store(stream, std::memory_order_release);
446     l.Unlock();
447 
448     if (ops.send_initial_metadata_wanted) {
449       stream->SendInitialMetadata();
450     }
451     if (ops.read_wanted != nullptr) {
452       stream->Read(ops.read_wanted);
453     }
454     if (ops.write_and_finish_wanted) {
455       stream->WriteAndFinish(ops.write_wanted,
456                              std::move(ops.write_options_wanted),
457                              std::move(ops.status_wanted));
458     } else {
459       if (ops.write_wanted != nullptr) {
460         stream->Write(ops.write_wanted, std::move(ops.write_options_wanted));
461       }
462       if (ops.finish_wanted) {
463         stream->Finish(std::move(ops.status_wanted));
464       }
465     }
466   }
467 
468   grpc::internal::Mutex stream_mu_;
469   // TODO(vjpai): Make stream_or_backlog_ into a std::variant or absl::variant
470   //              once C++17 or ABSL is supported since stream and backlog are
471   //              mutually exclusive in this class. Do likewise with the
472   //              remaining reactor classes and their backlogs as well.
473   std::atomic<ServerCallbackReaderWriter<Request, Response>*> stream_{nullptr};
474   struct PreBindBacklog {
475     bool send_initial_metadata_wanted = false;
476     bool write_and_finish_wanted = false;
477     bool finish_wanted = false;
478     Request* read_wanted = nullptr;
479     const Response* write_wanted = nullptr;
480     ::grpc::WriteOptions write_options_wanted;
481     ::grpc::Status status_wanted;
482   };
483   PreBindBacklog backlog_ /* GUARDED_BY(stream_mu_) */;
484 };
485 
486 /// \a ServerReadReactor is the interface for a client-streaming RPC.
487 template <class Request>
488 class ServerReadReactor : public internal::ServerReactor {
489  public:
ServerReadReactor()490   ServerReadReactor() : reader_(nullptr) {}
491   ~ServerReadReactor() = default;
492 
493   /// The following operation initiations are exactly like ServerBidiReactor.
StartSendInitialMetadata()494   void StartSendInitialMetadata() {
495     ServerCallbackReader<Request>* reader =
496         reader_.load(std::memory_order_acquire);
497     if (reader == nullptr) {
498       grpc::internal::MutexLock l(&reader_mu_);
499       reader = reader_.load(std::memory_order_relaxed);
500       if (reader == nullptr) {
501         backlog_.send_initial_metadata_wanted = true;
502         return;
503       }
504     }
505     reader->SendInitialMetadata();
506   }
StartRead(Request * req)507   void StartRead(Request* req) {
508     ServerCallbackReader<Request>* reader =
509         reader_.load(std::memory_order_acquire);
510     if (reader == nullptr) {
511       grpc::internal::MutexLock l(&reader_mu_);
512       reader = reader_.load(std::memory_order_relaxed);
513       if (reader == nullptr) {
514         backlog_.read_wanted = req;
515         return;
516       }
517     }
518     reader->Read(req);
519   }
Finish(::grpc::Status s)520   void Finish(::grpc::Status s) {
521     ServerCallbackReader<Request>* reader =
522         reader_.load(std::memory_order_acquire);
523     if (reader == nullptr) {
524       grpc::internal::MutexLock l(&reader_mu_);
525       reader = reader_.load(std::memory_order_relaxed);
526       if (reader == nullptr) {
527         backlog_.finish_wanted = true;
528         backlog_.status_wanted = std::move(s);
529         return;
530       }
531     }
532     reader->Finish(std::move(s));
533   }
534 
535   /// The following notifications are exactly like ServerBidiReactor.
OnSendInitialMetadataDone(bool)536   virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
OnReadDone(bool)537   virtual void OnReadDone(bool /*ok*/) {}
538   void OnDone() override = 0;
OnCancel()539   void OnCancel() override {}
540 
541  private:
542   friend class ServerCallbackReader<Request>;
543 
544   // May be overridden by internal implementation details. This is not a public
545   // customization point.
InternalBindReader(ServerCallbackReader<Request> * reader)546   virtual void InternalBindReader(ServerCallbackReader<Request>* reader) {
547     grpc::internal::ReleasableMutexLock l(&reader_mu_);
548     PreBindBacklog ops(std::move(backlog_));
549     reader_.store(reader, std::memory_order_release);
550     l.Unlock();
551 
552     if (ops.send_initial_metadata_wanted) {
553       reader->SendInitialMetadata();
554     }
555     if (ops.read_wanted != nullptr) {
556       reader->Read(ops.read_wanted);
557     }
558     if (ops.finish_wanted) {
559       reader->Finish(std::move(ops.status_wanted));
560     }
561   }
562 
563   grpc::internal::Mutex reader_mu_;
564   std::atomic<ServerCallbackReader<Request>*> reader_{nullptr};
565   struct PreBindBacklog {
566     bool send_initial_metadata_wanted = false;
567     bool finish_wanted = false;
568     Request* read_wanted = nullptr;
569     ::grpc::Status status_wanted;
570   };
571   PreBindBacklog backlog_ /* GUARDED_BY(reader_mu_) */;
572 };
573 
574 /// \a ServerWriteReactor is the interface for a server-streaming RPC.
575 template <class Response>
576 class ServerWriteReactor : public internal::ServerReactor {
577  public:
ServerWriteReactor()578   ServerWriteReactor() : writer_(nullptr) {}
579   ~ServerWriteReactor() = default;
580 
581   /// The following operation initiations are exactly like ServerBidiReactor.
StartSendInitialMetadata()582   void StartSendInitialMetadata() {
583     ServerCallbackWriter<Response>* writer =
584         writer_.load(std::memory_order_acquire);
585     if (writer == nullptr) {
586       grpc::internal::MutexLock l(&writer_mu_);
587       writer = writer_.load(std::memory_order_relaxed);
588       if (writer == nullptr) {
589         backlog_.send_initial_metadata_wanted = true;
590         return;
591       }
592     }
593     writer->SendInitialMetadata();
594   }
StartWrite(const Response * resp)595   void StartWrite(const Response* resp) {
596     StartWrite(resp, ::grpc::WriteOptions());
597   }
StartWrite(const Response * resp,::grpc::WriteOptions options)598   void StartWrite(const Response* resp, ::grpc::WriteOptions options) {
599     ServerCallbackWriter<Response>* writer =
600         writer_.load(std::memory_order_acquire);
601     if (writer == nullptr) {
602       grpc::internal::MutexLock l(&writer_mu_);
603       writer = writer_.load(std::memory_order_relaxed);
604       if (writer == nullptr) {
605         backlog_.write_wanted = resp;
606         backlog_.write_options_wanted = std::move(options);
607         return;
608       }
609     }
610     writer->Write(resp, std::move(options));
611   }
StartWriteAndFinish(const Response * resp,::grpc::WriteOptions options,::grpc::Status s)612   void StartWriteAndFinish(const Response* resp, ::grpc::WriteOptions options,
613                            ::grpc::Status s) {
614     ServerCallbackWriter<Response>* writer =
615         writer_.load(std::memory_order_acquire);
616     if (writer == nullptr) {
617       grpc::internal::MutexLock l(&writer_mu_);
618       writer = writer_.load(std::memory_order_relaxed);
619       if (writer == nullptr) {
620         backlog_.write_and_finish_wanted = true;
621         backlog_.write_wanted = resp;
622         backlog_.write_options_wanted = std::move(options);
623         backlog_.status_wanted = std::move(s);
624         return;
625       }
626     }
627     writer->WriteAndFinish(resp, std::move(options), std::move(s));
628   }
StartWriteLast(const Response * resp,::grpc::WriteOptions options)629   void StartWriteLast(const Response* resp, ::grpc::WriteOptions options) {
630     StartWrite(resp, std::move(options.set_last_message()));
631   }
Finish(::grpc::Status s)632   void Finish(::grpc::Status s) {
633     ServerCallbackWriter<Response>* writer =
634         writer_.load(std::memory_order_acquire);
635     if (writer == nullptr) {
636       grpc::internal::MutexLock l(&writer_mu_);
637       writer = writer_.load(std::memory_order_relaxed);
638       if (writer == nullptr) {
639         backlog_.finish_wanted = true;
640         backlog_.status_wanted = std::move(s);
641         return;
642       }
643     }
644     writer->Finish(std::move(s));
645   }
646 
647   /// The following notifications are exactly like ServerBidiReactor.
OnSendInitialMetadataDone(bool)648   virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
OnWriteDone(bool)649   virtual void OnWriteDone(bool /*ok*/) {}
650   void OnDone() override = 0;
OnCancel()651   void OnCancel() override {}
652 
653  private:
654   friend class ServerCallbackWriter<Response>;
655   // May be overridden by internal implementation details. This is not a public
656   // customization point.
InternalBindWriter(ServerCallbackWriter<Response> * writer)657   virtual void InternalBindWriter(ServerCallbackWriter<Response>* writer) {
658     grpc::internal::ReleasableMutexLock l(&writer_mu_);
659     PreBindBacklog ops(std::move(backlog_));
660     writer_.store(writer, std::memory_order_release);
661     l.Unlock();
662 
663     if (ops.send_initial_metadata_wanted) {
664       writer->SendInitialMetadata();
665     }
666     if (ops.write_and_finish_wanted) {
667       writer->WriteAndFinish(ops.write_wanted,
668                              std::move(ops.write_options_wanted),
669                              std::move(ops.status_wanted));
670     } else {
671       if (ops.write_wanted != nullptr) {
672         writer->Write(ops.write_wanted, std::move(ops.write_options_wanted));
673       }
674       if (ops.finish_wanted) {
675         writer->Finish(std::move(ops.status_wanted));
676       }
677     }
678   }
679 
680   grpc::internal::Mutex writer_mu_;
681   std::atomic<ServerCallbackWriter<Response>*> writer_{nullptr};
682   struct PreBindBacklog {
683     bool send_initial_metadata_wanted = false;
684     bool write_and_finish_wanted = false;
685     bool finish_wanted = false;
686     const Response* write_wanted = nullptr;
687     ::grpc::WriteOptions write_options_wanted;
688     ::grpc::Status status_wanted;
689   };
690   PreBindBacklog backlog_ /* GUARDED_BY(writer_mu_) */;
691 };
692 
693 class ServerUnaryReactor : public internal::ServerReactor {
694  public:
ServerUnaryReactor()695   ServerUnaryReactor() : call_(nullptr) {}
696   ~ServerUnaryReactor() = default;
697 
698   /// StartSendInitialMetadata is exactly like ServerBidiReactor.
StartSendInitialMetadata()699   void StartSendInitialMetadata() {
700     ServerCallbackUnary* call = call_.load(std::memory_order_acquire);
701     if (call == nullptr) {
702       grpc::internal::MutexLock l(&call_mu_);
703       call = call_.load(std::memory_order_relaxed);
704       if (call == nullptr) {
705         backlog_.send_initial_metadata_wanted = true;
706         return;
707       }
708     }
709     call->SendInitialMetadata();
710   }
711   /// Finish is similar to ServerBidiReactor except for one detail.
712   /// If the status is non-OK, any message will not be sent. Instead,
713   /// the client will only receive the status and any trailing metadata.
Finish(::grpc::Status s)714   void Finish(::grpc::Status s) {
715     ServerCallbackUnary* call = call_.load(std::memory_order_acquire);
716     if (call == nullptr) {
717       grpc::internal::MutexLock l(&call_mu_);
718       call = call_.load(std::memory_order_relaxed);
719       if (call == nullptr) {
720         backlog_.finish_wanted = true;
721         backlog_.status_wanted = std::move(s);
722         return;
723       }
724     }
725     call->Finish(std::move(s));
726   }
727 
728   /// The following notifications are exactly like ServerBidiReactor.
OnSendInitialMetadataDone(bool)729   virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
730   void OnDone() override = 0;
OnCancel()731   void OnCancel() override {}
732 
733  private:
734   friend class ServerCallbackUnary;
735   // May be overridden by internal implementation details. This is not a public
736   // customization point.
InternalBindCall(ServerCallbackUnary * call)737   virtual void InternalBindCall(ServerCallbackUnary* call) {
738     grpc::internal::ReleasableMutexLock l(&call_mu_);
739     PreBindBacklog ops(std::move(backlog_));
740     call_.store(call, std::memory_order_release);
741     l.Unlock();
742 
743     if (ops.send_initial_metadata_wanted) {
744       call->SendInitialMetadata();
745     }
746     if (ops.finish_wanted) {
747       call->Finish(std::move(ops.status_wanted));
748     }
749   }
750 
751   grpc::internal::Mutex call_mu_;
752   std::atomic<ServerCallbackUnary*> call_{nullptr};
753   struct PreBindBacklog {
754     bool send_initial_metadata_wanted = false;
755     bool finish_wanted = false;
756     ::grpc::Status status_wanted;
757   };
758   PreBindBacklog backlog_ /* GUARDED_BY(call_mu_) */;
759 };
760 
761 namespace internal {
762 
763 template <class Base>
764 class FinishOnlyReactor : public Base {
765  public:
FinishOnlyReactor(::grpc::Status s)766   explicit FinishOnlyReactor(::grpc::Status s) { this->Finish(std::move(s)); }
OnDone()767   void OnDone() override { this->~FinishOnlyReactor(); }
768 };
769 
770 using UnimplementedUnaryReactor = FinishOnlyReactor<ServerUnaryReactor>;
771 template <class Request>
772 using UnimplementedReadReactor = FinishOnlyReactor<ServerReadReactor<Request>>;
773 template <class Response>
774 using UnimplementedWriteReactor =
775     FinishOnlyReactor<ServerWriteReactor<Response>>;
776 template <class Request, class Response>
777 using UnimplementedBidiReactor =
778     FinishOnlyReactor<ServerBidiReactor<Request, Response>>;
779 
780 }  // namespace internal
781 }  // namespace grpc_impl
782 
783 #endif  // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_IMPL_H
784