• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1  /*
2   *
3   * Copyright 2019 gRPC authors.
4   *
5   * Licensed under the Apache License, Version 2.0 (the "License");
6   * you may not use this file except in compliance with the License.
7   * You may obtain a copy of the License at
8   *
9   *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_IMPL_H
19  #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_IMPL_H
20  
21  #include <atomic>
22  #include <functional>
23  #include <type_traits>
24  
25  #include <grpcpp/impl/codegen/call.h>
26  #include <grpcpp/impl/codegen/call_op_set.h>
27  #include <grpcpp/impl/codegen/callback_common.h>
28  #include <grpcpp/impl/codegen/config.h>
29  #include <grpcpp/impl/codegen/core_codegen_interface.h>
30  #include <grpcpp/impl/codegen/message_allocator.h>
31  #include <grpcpp/impl/codegen/status.h>
32  
33  namespace grpc_impl {
34  
35  // Declare base class of all reactors as internal
36  namespace internal {
37  
38  // Forward declarations
39  template <class Request, class Response>
40  class CallbackUnaryHandler;
41  template <class Request, class Response>
42  class CallbackClientStreamingHandler;
43  template <class Request, class Response>
44  class CallbackServerStreamingHandler;
45  template <class Request, class Response>
46  class CallbackBidiHandler;
47  
48  class ServerReactor {
49   public:
50    virtual ~ServerReactor() = default;
51    virtual void OnDone() = 0;
52    virtual void OnCancel() = 0;
53  
54    // The following is not API. It is for internal use only and specifies whether
55    // all reactions of this Reactor can be run without an extra executor
56    // scheduling. This should only be used for internally-defined reactors with
57    // trivial reactions.
InternalInlineable()58    virtual bool InternalInlineable() { return false; }
59  
60   private:
61    template <class Request, class Response>
62    friend class CallbackUnaryHandler;
63    template <class Request, class Response>
64    friend class CallbackClientStreamingHandler;
65    template <class Request, class Response>
66    friend class CallbackServerStreamingHandler;
67    template <class Request, class Response>
68    friend class CallbackBidiHandler;
69  };
70  
71  /// The base class of ServerCallbackUnary etc.
72  class ServerCallbackCall {
73   public:
~ServerCallbackCall()74    virtual ~ServerCallbackCall() {}
75  
76    // This object is responsible for tracking when it is safe to call OnDone and
77    // OnCancel. OnDone should not be called until the method handler is complete,
78    // Finish has been called, the ServerContext CompletionOp (which tracks
79    // cancellation or successful completion) has completed, and all outstanding
80    // Read/Write actions have seen their reactions. OnCancel should not be called
81    // until after the method handler is done and the RPC has completed with a
82    // cancellation. This is tracked by counting how many of these conditions have
83    // been met and calling OnCancel when none remain unmet.
84  
85    // Public versions of MaybeDone: one where we don't know the reactor in
86    // advance (used for the ServerContext CompletionOp), and one for where we
87    // know the inlineability of the OnDone reaction. You should set the inline
88    // flag to true if either the Reactor is InternalInlineable() or if this
89    // callback is already being forced to run dispatched to an executor
90    // (typically because it contains additional work than just the MaybeDone).
91  
MaybeDone()92    void MaybeDone() {
93      if (GPR_UNLIKELY(Unref() == 1)) {
94        ScheduleOnDone(reactor()->InternalInlineable());
95      }
96    }
97  
MaybeDone(bool inline_ondone)98    void MaybeDone(bool inline_ondone) {
99      if (GPR_UNLIKELY(Unref() == 1)) {
100        ScheduleOnDone(inline_ondone);
101      }
102    }
103  
104    // Fast version called with known reactor passed in, used from derived
105    // classes, typically in non-cancel case
MaybeCallOnCancel(ServerReactor * reactor)106    void MaybeCallOnCancel(ServerReactor* reactor) {
107      if (GPR_UNLIKELY(UnblockCancellation())) {
108        CallOnCancel(reactor);
109      }
110    }
111  
112    // Slower version called from object that doesn't know the reactor a priori
113    // (such as the ServerContext CompletionOp which is formed before the
114    // reactor). This is used in cancel cases only, so it's ok to be slower and
115    // invoke a virtual function.
MaybeCallOnCancel()116    void MaybeCallOnCancel() {
117      if (GPR_UNLIKELY(UnblockCancellation())) {
118        CallOnCancel(reactor());
119      }
120    }
121  
122   protected:
123    /// Increases the reference count
Ref()124    void Ref() { callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); }
125  
126   private:
127    virtual ServerReactor* reactor() = 0;
128  
129    // CallOnDone performs the work required at completion of the RPC: invoking
130    // the OnDone function and doing all necessary cleanup. This function is only
131    // ever invoked on a fully-Unref'fed ServerCallbackCall.
132    virtual void CallOnDone() = 0;
133  
134    // If the OnDone reaction is inlineable, execute it inline. Otherwise send it
135    // to an executor.
136    void ScheduleOnDone(bool inline_ondone);
137  
138    // If the OnCancel reaction is inlineable, execute it inline. Otherwise send
139    // it to an executor.
140    void CallOnCancel(ServerReactor* reactor);
141  
142    // Implement the cancellation constraint counter. Return true if OnCancel
143    // should be called, false otherwise.
UnblockCancellation()144    bool UnblockCancellation() {
145      return on_cancel_conditions_remaining_.fetch_sub(
146                 1, std::memory_order_acq_rel) == 1;
147    }
148  
149    /// Decreases the reference count and returns the previous value
Unref()150    int Unref() {
151      return callbacks_outstanding_.fetch_sub(1, std::memory_order_acq_rel);
152    }
153  
154    std::atomic_int on_cancel_conditions_remaining_{2};
155    std::atomic_int callbacks_outstanding_{
156        3};  // reserve for start, Finish, and CompletionOp
157  };
158  
159  template <class Request, class Response>
160  class DefaultMessageHolder
161      : public ::grpc::experimental::MessageHolder<Request, Response> {
162   public:
DefaultMessageHolder()163    DefaultMessageHolder() {
164      this->set_request(&request_obj_);
165      this->set_response(&response_obj_);
166    }
Release()167    void Release() override {
168      // the object is allocated in the call arena.
169      this->~DefaultMessageHolder<Request, Response>();
170    }
171  
172   private:
173    Request request_obj_;
174    Response response_obj_;
175  };
176  
177  }  // namespace internal
178  
179  // Forward declarations
180  class ServerUnaryReactor;
181  template <class Request>
182  class ServerReadReactor;
183  template <class Response>
184  class ServerWriteReactor;
185  template <class Request, class Response>
186  class ServerBidiReactor;
187  
188  // NOTE: The actual call/stream object classes are provided as API only to
189  // support mocking. There are no implementations of these class interfaces in
190  // the API.
191  class ServerCallbackUnary : public internal::ServerCallbackCall {
192   public:
~ServerCallbackUnary()193    virtual ~ServerCallbackUnary() {}
194    virtual void Finish(::grpc::Status s) = 0;
195    virtual void SendInitialMetadata() = 0;
196  
197   protected:
198    // Use a template rather than explicitly specifying ServerUnaryReactor to
199    // delay binding and avoid a circular forward declaration issue
200    template <class Reactor>
BindReactor(Reactor * reactor)201    void BindReactor(Reactor* reactor) {
202      reactor->InternalBindCall(this);
203    }
204  };
205  
206  template <class Request>
207  class ServerCallbackReader : public internal::ServerCallbackCall {
208   public:
~ServerCallbackReader()209    virtual ~ServerCallbackReader() {}
210    virtual void Finish(::grpc::Status s) = 0;
211    virtual void SendInitialMetadata() = 0;
212    virtual void Read(Request* msg) = 0;
213  
214   protected:
BindReactor(ServerReadReactor<Request> * reactor)215    void BindReactor(ServerReadReactor<Request>* reactor) {
216      reactor->InternalBindReader(this);
217    }
218  };
219  
220  template <class Response>
221  class ServerCallbackWriter : public internal::ServerCallbackCall {
222   public:
~ServerCallbackWriter()223    virtual ~ServerCallbackWriter() {}
224  
225    virtual void Finish(::grpc::Status s) = 0;
226    virtual void SendInitialMetadata() = 0;
227    virtual void Write(const Response* msg, ::grpc::WriteOptions options) = 0;
228    virtual void WriteAndFinish(const Response* msg, ::grpc::WriteOptions options,
229                                ::grpc::Status s) = 0;
230  
231   protected:
BindReactor(ServerWriteReactor<Response> * reactor)232    void BindReactor(ServerWriteReactor<Response>* reactor) {
233      reactor->InternalBindWriter(this);
234    }
235  };
236  
237  template <class Request, class Response>
238  class ServerCallbackReaderWriter : public internal::ServerCallbackCall {
239   public:
~ServerCallbackReaderWriter()240    virtual ~ServerCallbackReaderWriter() {}
241  
242    virtual void Finish(::grpc::Status s) = 0;
243    virtual void SendInitialMetadata() = 0;
244    virtual void Read(Request* msg) = 0;
245    virtual void Write(const Response* msg, ::grpc::WriteOptions options) = 0;
246    virtual void WriteAndFinish(const Response* msg, ::grpc::WriteOptions options,
247                                ::grpc::Status s) = 0;
248  
249   protected:
BindReactor(ServerBidiReactor<Request,Response> * reactor)250    void BindReactor(ServerBidiReactor<Request, Response>* reactor) {
251      reactor->InternalBindStream(this);
252    }
253  };
254  
255  // The following classes are the reactor interfaces that are to be implemented
256  // by the user, returned as the output parameter of the method handler for a
257  // callback method. Note that none of the classes are pure; all reactions have a
258  // default empty reaction so that the user class only needs to override those
259  // classes that it cares about.
260  
261  /// \a ServerBidiReactor is the interface for a bidirectional streaming RPC.
262  template <class Request, class Response>
263  class ServerBidiReactor : public internal::ServerReactor {
264   public:
265    // NOTE: Initializing stream_ as a constructor initializer rather than a
266    //       default initializer because gcc-4.x requires a copy constructor for
267    //       default initializing a templated member, which isn't ok for atomic.
268    // TODO(vjpai): Switch to default constructor and default initializer when
269    //              gcc-4.x is no longer supported
ServerBidiReactor()270    ServerBidiReactor() : stream_(nullptr) {}
271    ~ServerBidiReactor() = default;
272  
273    /// Send any initial metadata stored in the RPC context. If not invoked,
274    /// any initial metadata will be passed along with the first Write or the
275    /// Finish (if there are no writes).
StartSendInitialMetadata()276    void StartSendInitialMetadata() {
277      ServerCallbackReaderWriter<Request, Response>* stream =
278          stream_.load(std::memory_order_acquire);
279      if (stream == nullptr) {
280        grpc::internal::MutexLock l(&stream_mu_);
281        stream = stream_.load(std::memory_order_relaxed);
282        if (stream == nullptr) {
283          backlog_.send_initial_metadata_wanted = true;
284          return;
285        }
286      }
287      stream->SendInitialMetadata();
288    }
289  
290    /// Initiate a read operation.
291    ///
292    /// \param[out] req Where to eventually store the read message. Valid when
293    ///                 the library calls OnReadDone
StartRead(Request * req)294    void StartRead(Request* req) {
295      ServerCallbackReaderWriter<Request, Response>* stream =
296          stream_.load(std::memory_order_acquire);
297      if (stream == nullptr) {
298        grpc::internal::MutexLock l(&stream_mu_);
299        stream = stream_.load(std::memory_order_relaxed);
300        if (stream == nullptr) {
301          backlog_.read_wanted = req;
302          return;
303        }
304      }
305      stream->Read(req);
306    }
307  
308    /// Initiate a write operation.
309    ///
310    /// \param[in] resp The message to be written. The library does not take
311    ///                 ownership but the caller must ensure that the message is
312    ///                 not deleted or modified until OnWriteDone is called.
StartWrite(const Response * resp)313    void StartWrite(const Response* resp) {
314      StartWrite(resp, ::grpc::WriteOptions());
315    }
316  
317    /// Initiate a write operation with specified options.
318    ///
319    /// \param[in] resp The message to be written. The library does not take
320    ///                 ownership but the caller must ensure that the message is
321    ///                 not deleted or modified until OnWriteDone is called.
322    /// \param[in] options The WriteOptions to use for writing this message
StartWrite(const Response * resp,::grpc::WriteOptions options)323    void StartWrite(const Response* resp, ::grpc::WriteOptions options) {
324      ServerCallbackReaderWriter<Request, Response>* stream =
325          stream_.load(std::memory_order_acquire);
326      if (stream == nullptr) {
327        grpc::internal::MutexLock l(&stream_mu_);
328        stream = stream_.load(std::memory_order_relaxed);
329        if (stream == nullptr) {
330          backlog_.write_wanted = resp;
331          backlog_.write_options_wanted = std::move(options);
332          return;
333        }
334      }
335      stream->Write(resp, std::move(options));
336    }
337  
338    /// Initiate a write operation with specified options and final RPC Status,
339    /// which also causes any trailing metadata for this RPC to be sent out.
340    /// StartWriteAndFinish is like merging StartWriteLast and Finish into a
341    /// single step. A key difference, though, is that this operation doesn't have
342    /// an OnWriteDone reaction - it is considered complete only when OnDone is
343    /// available. An RPC can either have StartWriteAndFinish or Finish, but not
344    /// both.
345    ///
346    /// \param[in] resp The message to be written. The library does not take
347    ///                 ownership but the caller must ensure that the message is
348    ///                 not deleted or modified until OnDone is called.
349    /// \param[in] options The WriteOptions to use for writing this message
350    /// \param[in] s The status outcome of this RPC
StartWriteAndFinish(const Response * resp,::grpc::WriteOptions options,::grpc::Status s)351    void StartWriteAndFinish(const Response* resp, ::grpc::WriteOptions options,
352                             ::grpc::Status s) {
353      ServerCallbackReaderWriter<Request, Response>* stream =
354          stream_.load(std::memory_order_acquire);
355      if (stream == nullptr) {
356        grpc::internal::MutexLock l(&stream_mu_);
357        stream = stream_.load(std::memory_order_relaxed);
358        if (stream == nullptr) {
359          backlog_.write_and_finish_wanted = true;
360          backlog_.write_wanted = resp;
361          backlog_.write_options_wanted = std::move(options);
362          backlog_.status_wanted = std::move(s);
363          return;
364        }
365      }
366      stream->WriteAndFinish(resp, std::move(options), std::move(s));
367    }
368  
369    /// Inform system of a planned write operation with specified options, but
370    /// allow the library to schedule the actual write coalesced with the writing
371    /// of trailing metadata (which takes place on a Finish call).
372    ///
373    /// \param[in] resp The message to be written. The library does not take
374    ///                 ownership but the caller must ensure that the message is
375    ///                 not deleted or modified until OnWriteDone is called.
376    /// \param[in] options The WriteOptions to use for writing this message
StartWriteLast(const Response * resp,::grpc::WriteOptions options)377    void StartWriteLast(const Response* resp, ::grpc::WriteOptions options) {
378      StartWrite(resp, std::move(options.set_last_message()));
379    }
380  
381    /// Indicate that the stream is to be finished and the trailing metadata and
382    /// RPC status are to be sent. Every RPC MUST be finished using either Finish
383    /// or StartWriteAndFinish (but not both), even if the RPC is already
384    /// cancelled.
385    ///
386    /// \param[in] s The status outcome of this RPC
Finish(::grpc::Status s)387    void Finish(::grpc::Status s) {
388      ServerCallbackReaderWriter<Request, Response>* stream =
389          stream_.load(std::memory_order_acquire);
390      if (stream == nullptr) {
391        grpc::internal::MutexLock l(&stream_mu_);
392        stream = stream_.load(std::memory_order_relaxed);
393        if (stream == nullptr) {
394          backlog_.finish_wanted = true;
395          backlog_.status_wanted = std::move(s);
396          return;
397        }
398      }
399      stream->Finish(std::move(s));
400    }
401  
402    /// Notifies the application that an explicit StartSendInitialMetadata
403    /// operation completed. Not used when the sending of initial metadata
404    /// piggybacks onto the first write.
405    ///
406    /// \param[in] ok Was it successful? If false, no further write-side operation
407    ///               will succeed.
OnSendInitialMetadataDone(bool)408    virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
409  
410    /// Notifies the application that a StartRead operation completed.
411    ///
412    /// \param[in] ok Was it successful? If false, no further read-side operation
413    ///               will succeed.
OnReadDone(bool)414    virtual void OnReadDone(bool /*ok*/) {}
415  
416    /// Notifies the application that a StartWrite (or StartWriteLast) operation
417    /// completed.
418    ///
419    /// \param[in] ok Was it successful? If false, no further write-side operation
420    ///               will succeed.
OnWriteDone(bool)421    virtual void OnWriteDone(bool /*ok*/) {}
422  
423    /// Notifies the application that all operations associated with this RPC
424    /// have completed. This is an override (from the internal base class) but
425    /// still abstract, so derived classes MUST override it to be instantiated.
426    void OnDone() override = 0;
427  
428    /// Notifies the application that this RPC has been cancelled. This is an
429    /// override (from the internal base class) but not final, so derived classes
430    /// should override it if they want to take action.
OnCancel()431    void OnCancel() override {}
432  
433   private:
434    friend class ServerCallbackReaderWriter<Request, Response>;
435    // May be overridden by internal implementation details. This is not a public
436    // customization point.
InternalBindStream(ServerCallbackReaderWriter<Request,Response> * stream)437    virtual void InternalBindStream(
438        ServerCallbackReaderWriter<Request, Response>* stream) {
439      // TODO(vjpai): When stream_or_backlog_ becomes a variant (see below), use
440      // a scoped MutexLock and std::swap stream_or_backlog_ with a variant that
441      // has stream, then std::get<PreBindBacklog> out of that after the lock.
442      // Do likewise with the remaining InternalBind* functions as well.
443      grpc::internal::ReleasableMutexLock l(&stream_mu_);
444      PreBindBacklog ops(std::move(backlog_));
445      stream_.store(stream, std::memory_order_release);
446      l.Unlock();
447  
448      if (ops.send_initial_metadata_wanted) {
449        stream->SendInitialMetadata();
450      }
451      if (ops.read_wanted != nullptr) {
452        stream->Read(ops.read_wanted);
453      }
454      if (ops.write_and_finish_wanted) {
455        stream->WriteAndFinish(ops.write_wanted,
456                               std::move(ops.write_options_wanted),
457                               std::move(ops.status_wanted));
458      } else {
459        if (ops.write_wanted != nullptr) {
460          stream->Write(ops.write_wanted, std::move(ops.write_options_wanted));
461        }
462        if (ops.finish_wanted) {
463          stream->Finish(std::move(ops.status_wanted));
464        }
465      }
466    }
467  
468    grpc::internal::Mutex stream_mu_;
469    // TODO(vjpai): Make stream_or_backlog_ into a std::variant or absl::variant
470    //              once C++17 or ABSL is supported since stream and backlog are
471    //              mutually exclusive in this class. Do likewise with the
472    //              remaining reactor classes and their backlogs as well.
473    std::atomic<ServerCallbackReaderWriter<Request, Response>*> stream_{nullptr};
474    struct PreBindBacklog {
475      bool send_initial_metadata_wanted = false;
476      bool write_and_finish_wanted = false;
477      bool finish_wanted = false;
478      Request* read_wanted = nullptr;
479      const Response* write_wanted = nullptr;
480      ::grpc::WriteOptions write_options_wanted;
481      ::grpc::Status status_wanted;
482    };
483    PreBindBacklog backlog_ /* GUARDED_BY(stream_mu_) */;
484  };
485  
486  /// \a ServerReadReactor is the interface for a client-streaming RPC.
487  template <class Request>
488  class ServerReadReactor : public internal::ServerReactor {
489   public:
ServerReadReactor()490    ServerReadReactor() : reader_(nullptr) {}
491    ~ServerReadReactor() = default;
492  
493    /// The following operation initiations are exactly like ServerBidiReactor.
StartSendInitialMetadata()494    void StartSendInitialMetadata() {
495      ServerCallbackReader<Request>* reader =
496          reader_.load(std::memory_order_acquire);
497      if (reader == nullptr) {
498        grpc::internal::MutexLock l(&reader_mu_);
499        reader = reader_.load(std::memory_order_relaxed);
500        if (reader == nullptr) {
501          backlog_.send_initial_metadata_wanted = true;
502          return;
503        }
504      }
505      reader->SendInitialMetadata();
506    }
StartRead(Request * req)507    void StartRead(Request* req) {
508      ServerCallbackReader<Request>* reader =
509          reader_.load(std::memory_order_acquire);
510      if (reader == nullptr) {
511        grpc::internal::MutexLock l(&reader_mu_);
512        reader = reader_.load(std::memory_order_relaxed);
513        if (reader == nullptr) {
514          backlog_.read_wanted = req;
515          return;
516        }
517      }
518      reader->Read(req);
519    }
Finish(::grpc::Status s)520    void Finish(::grpc::Status s) {
521      ServerCallbackReader<Request>* reader =
522          reader_.load(std::memory_order_acquire);
523      if (reader == nullptr) {
524        grpc::internal::MutexLock l(&reader_mu_);
525        reader = reader_.load(std::memory_order_relaxed);
526        if (reader == nullptr) {
527          backlog_.finish_wanted = true;
528          backlog_.status_wanted = std::move(s);
529          return;
530        }
531      }
532      reader->Finish(std::move(s));
533    }
534  
535    /// The following notifications are exactly like ServerBidiReactor.
OnSendInitialMetadataDone(bool)536    virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
OnReadDone(bool)537    virtual void OnReadDone(bool /*ok*/) {}
538    void OnDone() override = 0;
OnCancel()539    void OnCancel() override {}
540  
541   private:
542    friend class ServerCallbackReader<Request>;
543  
544    // May be overridden by internal implementation details. This is not a public
545    // customization point.
InternalBindReader(ServerCallbackReader<Request> * reader)546    virtual void InternalBindReader(ServerCallbackReader<Request>* reader) {
547      grpc::internal::ReleasableMutexLock l(&reader_mu_);
548      PreBindBacklog ops(std::move(backlog_));
549      reader_.store(reader, std::memory_order_release);
550      l.Unlock();
551  
552      if (ops.send_initial_metadata_wanted) {
553        reader->SendInitialMetadata();
554      }
555      if (ops.read_wanted != nullptr) {
556        reader->Read(ops.read_wanted);
557      }
558      if (ops.finish_wanted) {
559        reader->Finish(std::move(ops.status_wanted));
560      }
561    }
562  
563    grpc::internal::Mutex reader_mu_;
564    std::atomic<ServerCallbackReader<Request>*> reader_{nullptr};
565    struct PreBindBacklog {
566      bool send_initial_metadata_wanted = false;
567      bool finish_wanted = false;
568      Request* read_wanted = nullptr;
569      ::grpc::Status status_wanted;
570    };
571    PreBindBacklog backlog_ /* GUARDED_BY(reader_mu_) */;
572  };
573  
574  /// \a ServerWriteReactor is the interface for a server-streaming RPC.
575  template <class Response>
576  class ServerWriteReactor : public internal::ServerReactor {
577   public:
ServerWriteReactor()578    ServerWriteReactor() : writer_(nullptr) {}
579    ~ServerWriteReactor() = default;
580  
581    /// The following operation initiations are exactly like ServerBidiReactor.
StartSendInitialMetadata()582    void StartSendInitialMetadata() {
583      ServerCallbackWriter<Response>* writer =
584          writer_.load(std::memory_order_acquire);
585      if (writer == nullptr) {
586        grpc::internal::MutexLock l(&writer_mu_);
587        writer = writer_.load(std::memory_order_relaxed);
588        if (writer == nullptr) {
589          backlog_.send_initial_metadata_wanted = true;
590          return;
591        }
592      }
593      writer->SendInitialMetadata();
594    }
StartWrite(const Response * resp)595    void StartWrite(const Response* resp) {
596      StartWrite(resp, ::grpc::WriteOptions());
597    }
StartWrite(const Response * resp,::grpc::WriteOptions options)598    void StartWrite(const Response* resp, ::grpc::WriteOptions options) {
599      ServerCallbackWriter<Response>* writer =
600          writer_.load(std::memory_order_acquire);
601      if (writer == nullptr) {
602        grpc::internal::MutexLock l(&writer_mu_);
603        writer = writer_.load(std::memory_order_relaxed);
604        if (writer == nullptr) {
605          backlog_.write_wanted = resp;
606          backlog_.write_options_wanted = std::move(options);
607          return;
608        }
609      }
610      writer->Write(resp, std::move(options));
611    }
StartWriteAndFinish(const Response * resp,::grpc::WriteOptions options,::grpc::Status s)612    void StartWriteAndFinish(const Response* resp, ::grpc::WriteOptions options,
613                             ::grpc::Status s) {
614      ServerCallbackWriter<Response>* writer =
615          writer_.load(std::memory_order_acquire);
616      if (writer == nullptr) {
617        grpc::internal::MutexLock l(&writer_mu_);
618        writer = writer_.load(std::memory_order_relaxed);
619        if (writer == nullptr) {
620          backlog_.write_and_finish_wanted = true;
621          backlog_.write_wanted = resp;
622          backlog_.write_options_wanted = std::move(options);
623          backlog_.status_wanted = std::move(s);
624          return;
625        }
626      }
627      writer->WriteAndFinish(resp, std::move(options), std::move(s));
628    }
StartWriteLast(const Response * resp,::grpc::WriteOptions options)629    void StartWriteLast(const Response* resp, ::grpc::WriteOptions options) {
630      StartWrite(resp, std::move(options.set_last_message()));
631    }
Finish(::grpc::Status s)632    void Finish(::grpc::Status s) {
633      ServerCallbackWriter<Response>* writer =
634          writer_.load(std::memory_order_acquire);
635      if (writer == nullptr) {
636        grpc::internal::MutexLock l(&writer_mu_);
637        writer = writer_.load(std::memory_order_relaxed);
638        if (writer == nullptr) {
639          backlog_.finish_wanted = true;
640          backlog_.status_wanted = std::move(s);
641          return;
642        }
643      }
644      writer->Finish(std::move(s));
645    }
646  
647    /// The following notifications are exactly like ServerBidiReactor.
OnSendInitialMetadataDone(bool)648    virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
OnWriteDone(bool)649    virtual void OnWriteDone(bool /*ok*/) {}
650    void OnDone() override = 0;
OnCancel()651    void OnCancel() override {}
652  
653   private:
654    friend class ServerCallbackWriter<Response>;
655    // May be overridden by internal implementation details. This is not a public
656    // customization point.
InternalBindWriter(ServerCallbackWriter<Response> * writer)657    virtual void InternalBindWriter(ServerCallbackWriter<Response>* writer) {
658      grpc::internal::ReleasableMutexLock l(&writer_mu_);
659      PreBindBacklog ops(std::move(backlog_));
660      writer_.store(writer, std::memory_order_release);
661      l.Unlock();
662  
663      if (ops.send_initial_metadata_wanted) {
664        writer->SendInitialMetadata();
665      }
666      if (ops.write_and_finish_wanted) {
667        writer->WriteAndFinish(ops.write_wanted,
668                               std::move(ops.write_options_wanted),
669                               std::move(ops.status_wanted));
670      } else {
671        if (ops.write_wanted != nullptr) {
672          writer->Write(ops.write_wanted, std::move(ops.write_options_wanted));
673        }
674        if (ops.finish_wanted) {
675          writer->Finish(std::move(ops.status_wanted));
676        }
677      }
678    }
679  
680    grpc::internal::Mutex writer_mu_;
681    std::atomic<ServerCallbackWriter<Response>*> writer_{nullptr};
682    struct PreBindBacklog {
683      bool send_initial_metadata_wanted = false;
684      bool write_and_finish_wanted = false;
685      bool finish_wanted = false;
686      const Response* write_wanted = nullptr;
687      ::grpc::WriteOptions write_options_wanted;
688      ::grpc::Status status_wanted;
689    };
690    PreBindBacklog backlog_ /* GUARDED_BY(writer_mu_) */;
691  };
692  
693  class ServerUnaryReactor : public internal::ServerReactor {
694   public:
ServerUnaryReactor()695    ServerUnaryReactor() : call_(nullptr) {}
696    ~ServerUnaryReactor() = default;
697  
698    /// StartSendInitialMetadata is exactly like ServerBidiReactor.
StartSendInitialMetadata()699    void StartSendInitialMetadata() {
700      ServerCallbackUnary* call = call_.load(std::memory_order_acquire);
701      if (call == nullptr) {
702        grpc::internal::MutexLock l(&call_mu_);
703        call = call_.load(std::memory_order_relaxed);
704        if (call == nullptr) {
705          backlog_.send_initial_metadata_wanted = true;
706          return;
707        }
708      }
709      call->SendInitialMetadata();
710    }
711    /// Finish is similar to ServerBidiReactor except for one detail.
712    /// If the status is non-OK, any message will not be sent. Instead,
713    /// the client will only receive the status and any trailing metadata.
Finish(::grpc::Status s)714    void Finish(::grpc::Status s) {
715      ServerCallbackUnary* call = call_.load(std::memory_order_acquire);
716      if (call == nullptr) {
717        grpc::internal::MutexLock l(&call_mu_);
718        call = call_.load(std::memory_order_relaxed);
719        if (call == nullptr) {
720          backlog_.finish_wanted = true;
721          backlog_.status_wanted = std::move(s);
722          return;
723        }
724      }
725      call->Finish(std::move(s));
726    }
727  
728    /// The following notifications are exactly like ServerBidiReactor.
OnSendInitialMetadataDone(bool)729    virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
730    void OnDone() override = 0;
OnCancel()731    void OnCancel() override {}
732  
733   private:
734    friend class ServerCallbackUnary;
735    // May be overridden by internal implementation details. This is not a public
736    // customization point.
InternalBindCall(ServerCallbackUnary * call)737    virtual void InternalBindCall(ServerCallbackUnary* call) {
738      grpc::internal::ReleasableMutexLock l(&call_mu_);
739      PreBindBacklog ops(std::move(backlog_));
740      call_.store(call, std::memory_order_release);
741      l.Unlock();
742  
743      if (ops.send_initial_metadata_wanted) {
744        call->SendInitialMetadata();
745      }
746      if (ops.finish_wanted) {
747        call->Finish(std::move(ops.status_wanted));
748      }
749    }
750  
751    grpc::internal::Mutex call_mu_;
752    std::atomic<ServerCallbackUnary*> call_{nullptr};
753    struct PreBindBacklog {
754      bool send_initial_metadata_wanted = false;
755      bool finish_wanted = false;
756      ::grpc::Status status_wanted;
757    };
758    PreBindBacklog backlog_ /* GUARDED_BY(call_mu_) */;
759  };
760  
761  namespace internal {
762  
763  template <class Base>
764  class FinishOnlyReactor : public Base {
765   public:
FinishOnlyReactor(::grpc::Status s)766    explicit FinishOnlyReactor(::grpc::Status s) { this->Finish(std::move(s)); }
OnDone()767    void OnDone() override { this->~FinishOnlyReactor(); }
768  };
769  
770  using UnimplementedUnaryReactor = FinishOnlyReactor<ServerUnaryReactor>;
771  template <class Request>
772  using UnimplementedReadReactor = FinishOnlyReactor<ServerReadReactor<Request>>;
773  template <class Response>
774  using UnimplementedWriteReactor =
775      FinishOnlyReactor<ServerWriteReactor<Response>>;
776  template <class Request, class Response>
777  using UnimplementedBidiReactor =
778      FinishOnlyReactor<ServerBidiReactor<Request, Response>>;
779  
780  }  // namespace internal
781  }  // namespace grpc_impl
782  
783  #endif  // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_IMPL_H
784