• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
19 #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
20 
21 #include <grpcpp/impl/codegen/message_allocator.h>
22 #include <grpcpp/impl/codegen/rpc_service_method.h>
23 #include <grpcpp/impl/codegen/server_callback.h>
24 #include <grpcpp/impl/codegen/server_context.h>
25 #include <grpcpp/impl/codegen/status.h>
26 
27 namespace grpc {
28 namespace internal {
29 
30 template <class RequestType, class ResponseType>
31 class CallbackUnaryHandler : public ::grpc::internal::MethodHandler {
32  public:
CallbackUnaryHandler(std::function<ServerUnaryReactor * (::grpc::CallbackServerContext *,const RequestType *,ResponseType *)> get_reactor)33   explicit CallbackUnaryHandler(
34       std::function<ServerUnaryReactor*(::grpc::CallbackServerContext*,
35                                         const RequestType*, ResponseType*)>
36           get_reactor)
37       : get_reactor_(std::move(get_reactor)) {}
38 
SetMessageAllocator(::grpc::experimental::MessageAllocator<RequestType,ResponseType> * allocator)39   void SetMessageAllocator(
40       ::grpc::experimental::MessageAllocator<RequestType, ResponseType>*
41           allocator) {
42     allocator_ = allocator;
43   }
44 
RunHandler(const HandlerParameter & param)45   void RunHandler(const HandlerParameter& param) final {
46     // Arena allocate a controller structure (that includes request/response)
47     ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
48     auto* allocator_state = static_cast<
49         ::grpc::experimental::MessageHolder<RequestType, ResponseType>*>(
50         param.internal_data);
51 
52     auto* call = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
53         param.call->call(), sizeof(ServerCallbackUnaryImpl)))
54         ServerCallbackUnaryImpl(
55             static_cast<::grpc::CallbackServerContext*>(param.server_context),
56             param.call, allocator_state, param.call_requester);
57     param.server_context->BeginCompletionOp(
58         param.call, [call](bool) { call->MaybeDone(); }, call);
59 
60     ServerUnaryReactor* reactor = nullptr;
61     if (param.status.ok()) {
62       reactor = ::grpc::internal::CatchingReactorGetter<ServerUnaryReactor>(
63           get_reactor_,
64           static_cast<::grpc::CallbackServerContext*>(param.server_context),
65           call->request(), call->response());
66     }
67 
68     if (reactor == nullptr) {
69       // if deserialization or reactor creator failed, we need to fail the call
70       reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
71           param.call->call(), sizeof(UnimplementedUnaryReactor)))
72           UnimplementedUnaryReactor(
73               ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
74     }
75 
76     /// Invoke SetupReactor as the last part of the handler
77     call->SetupReactor(reactor);
78   }
79 
Deserialize(grpc_call * call,grpc_byte_buffer * req,::grpc::Status * status,void ** handler_data)80   void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
81                     ::grpc::Status* status, void** handler_data) final {
82     ::grpc::ByteBuffer buf;
83     buf.set_buffer(req);
84     RequestType* request = nullptr;
85     ::grpc::experimental::MessageHolder<RequestType, ResponseType>*
86         allocator_state = nullptr;
87     if (allocator_ != nullptr) {
88       allocator_state = allocator_->AllocateMessages();
89     } else {
90       allocator_state =
91           new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
92               call, sizeof(DefaultMessageHolder<RequestType, ResponseType>)))
93               DefaultMessageHolder<RequestType, ResponseType>();
94     }
95     *handler_data = allocator_state;
96     request = allocator_state->request();
97     *status =
98         ::grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
99     buf.Release();
100     if (status->ok()) {
101       return request;
102     }
103     // Clean up on deserialization failure.
104     allocator_state->Release();
105     return nullptr;
106   }
107 
108  private:
109   std::function<ServerUnaryReactor*(::grpc::CallbackServerContext*,
110                                     const RequestType*, ResponseType*)>
111       get_reactor_;
112   ::grpc::experimental::MessageAllocator<RequestType, ResponseType>*
113       allocator_ = nullptr;
114 
115   class ServerCallbackUnaryImpl : public ServerCallbackUnary {
116    public:
Finish(::grpc::Status s)117     void Finish(::grpc::Status s) override {
118       // A callback that only contains a call to MaybeDone can be run as an
119       // inline callback regardless of whether or not OnDone is inlineable
120       // because if the actual OnDone callback needs to be scheduled, MaybeDone
121       // is responsible for dispatching to an executor thread if needed. Thus,
122       // when setting up the finish_tag_, we can set its own callback to
123       // inlineable.
124       finish_tag_.Set(
125           call_.call(),
126           [this](bool) {
127             this->MaybeDone(
128                 reactor_.load(std::memory_order_relaxed)->InternalInlineable());
129           },
130           &finish_ops_, /*can_inline=*/true);
131       finish_ops_.set_core_cq_tag(&finish_tag_);
132 
133       if (!ctx_->sent_initial_metadata_) {
134         finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
135                                         ctx_->initial_metadata_flags());
136         if (ctx_->compression_level_set()) {
137           finish_ops_.set_compression_level(ctx_->compression_level());
138         }
139         ctx_->sent_initial_metadata_ = true;
140       }
141       // The response is dropped if the status is not OK.
142       if (s.ok()) {
143         finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
144                                      finish_ops_.SendMessagePtr(response()));
145       } else {
146         finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
147       }
148       finish_ops_.set_core_cq_tag(&finish_tag_);
149       call_.PerformOps(&finish_ops_);
150     }
151 
SendInitialMetadata()152     void SendInitialMetadata() override {
153       GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
154       this->Ref();
155       // The callback for this function should not be marked inline because it
156       // is directly invoking a user-controlled reaction
157       // (OnSendInitialMetadataDone). Thus it must be dispatched to an executor
158       // thread. However, any OnDone needed after that can be inlined because it
159       // is already running on an executor thread.
160       meta_tag_.Set(
161           call_.call(),
162           [this](bool ok) {
163             ServerUnaryReactor* reactor =
164                 reactor_.load(std::memory_order_relaxed);
165             reactor->OnSendInitialMetadataDone(ok);
166             this->MaybeDone(/*inlineable_ondone=*/true);
167           },
168           &meta_ops_, /*can_inline=*/false);
169       meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
170                                     ctx_->initial_metadata_flags());
171       if (ctx_->compression_level_set()) {
172         meta_ops_.set_compression_level(ctx_->compression_level());
173       }
174       ctx_->sent_initial_metadata_ = true;
175       meta_ops_.set_core_cq_tag(&meta_tag_);
176       call_.PerformOps(&meta_ops_);
177     }
178 
179    private:
180     friend class CallbackUnaryHandler<RequestType, ResponseType>;
181 
ServerCallbackUnaryImpl(::grpc::CallbackServerContext * ctx,::grpc::internal::Call * call,::grpc::experimental::MessageHolder<RequestType,ResponseType> * allocator_state,std::function<void ()> call_requester)182     ServerCallbackUnaryImpl(
183         ::grpc::CallbackServerContext* ctx, ::grpc::internal::Call* call,
184         ::grpc::experimental::MessageHolder<RequestType, ResponseType>*
185             allocator_state,
186         std::function<void()> call_requester)
187         : ctx_(ctx),
188           call_(*call),
189           allocator_state_(allocator_state),
190           call_requester_(std::move(call_requester)) {
191       ctx_->set_message_allocator_state(allocator_state);
192     }
193 
194     /// SetupReactor binds the reactor (which also releases any queued
195     /// operations), maybe calls OnCancel if possible/needed, and maybe marks
196     /// the completion of the RPC. This should be the last component of the
197     /// handler.
SetupReactor(ServerUnaryReactor * reactor)198     void SetupReactor(ServerUnaryReactor* reactor) {
199       reactor_.store(reactor, std::memory_order_relaxed);
200       this->BindReactor(reactor);
201       this->MaybeCallOnCancel(reactor);
202       this->MaybeDone(reactor->InternalInlineable());
203     }
204 
request()205     const RequestType* request() { return allocator_state_->request(); }
response()206     ResponseType* response() { return allocator_state_->response(); }
207 
CallOnDone()208     void CallOnDone() override {
209       reactor_.load(std::memory_order_relaxed)->OnDone();
210       grpc_call* call = call_.call();
211       auto call_requester = std::move(call_requester_);
212       allocator_state_->Release();
213       if (ctx_->context_allocator() != nullptr) {
214         ctx_->context_allocator()->Release(ctx_);
215       }
216       this->~ServerCallbackUnaryImpl();  // explicitly call destructor
217       ::grpc::g_core_codegen_interface->grpc_call_unref(call);
218       call_requester();
219     }
220 
reactor()221     ServerReactor* reactor() override {
222       return reactor_.load(std::memory_order_relaxed);
223     }
224 
225     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
226         meta_ops_;
227     ::grpc::internal::CallbackWithSuccessTag meta_tag_;
228     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
229                                 ::grpc::internal::CallOpSendMessage,
230                                 ::grpc::internal::CallOpServerSendStatus>
231         finish_ops_;
232     ::grpc::internal::CallbackWithSuccessTag finish_tag_;
233 
234     ::grpc::CallbackServerContext* const ctx_;
235     ::grpc::internal::Call call_;
236     ::grpc::experimental::MessageHolder<RequestType, ResponseType>* const
237         allocator_state_;
238     std::function<void()> call_requester_;
239     // reactor_ can always be loaded/stored with relaxed memory ordering because
240     // its value is only set once, independently of other data in the object,
241     // and the loads that use it will always actually come provably later even
242     // though they are from different threads since they are triggered by
243     // actions initiated only by the setting up of the reactor_ variable. In
244     // a sense, it's a delayed "const": it gets its value from the SetupReactor
245     // method (not the constructor, so it's not a true const), but it doesn't
246     // change after that and it only gets used by actions caused, directly or
247     // indirectly, by that setup. This comment also applies to the reactor_
248     // variables of the other streaming objects in this file.
249     std::atomic<ServerUnaryReactor*> reactor_;
250     // callbacks_outstanding_ follows a refcount pattern
251     std::atomic<intptr_t> callbacks_outstanding_{
252         3};  // reserve for start, Finish, and CompletionOp
253   };
254 };
255 
256 template <class RequestType, class ResponseType>
257 class CallbackClientStreamingHandler : public ::grpc::internal::MethodHandler {
258  public:
CallbackClientStreamingHandler(std::function<ServerReadReactor<RequestType> * (::grpc::CallbackServerContext *,ResponseType *)> get_reactor)259   explicit CallbackClientStreamingHandler(
260       std::function<ServerReadReactor<RequestType>*(
261           ::grpc::CallbackServerContext*, ResponseType*)>
262           get_reactor)
263       : get_reactor_(std::move(get_reactor)) {}
RunHandler(const HandlerParameter & param)264   void RunHandler(const HandlerParameter& param) final {
265     // Arena allocate a reader structure (that includes response)
266     ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
267 
268     auto* reader = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
269         param.call->call(), sizeof(ServerCallbackReaderImpl)))
270         ServerCallbackReaderImpl(
271             static_cast<::grpc::CallbackServerContext*>(param.server_context),
272             param.call, param.call_requester);
273     // Inlineable OnDone can be false in the CompletionOp callback because there
274     // is no read reactor that has an inlineable OnDone; this only applies to
275     // the DefaultReactor (which is unary).
276     param.server_context->BeginCompletionOp(
277         param.call,
278         [reader](bool) { reader->MaybeDone(/*inlineable_ondone=*/false); },
279         reader);
280 
281     ServerReadReactor<RequestType>* reactor = nullptr;
282     if (param.status.ok()) {
283       reactor = ::grpc::internal::CatchingReactorGetter<
284           ServerReadReactor<RequestType>>(
285           get_reactor_,
286           static_cast<::grpc::CallbackServerContext*>(param.server_context),
287           reader->response());
288     }
289 
290     if (reactor == nullptr) {
291       // if deserialization or reactor creator failed, we need to fail the call
292       reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
293           param.call->call(), sizeof(UnimplementedReadReactor<RequestType>)))
294           UnimplementedReadReactor<RequestType>(
295               ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
296     }
297 
298     reader->SetupReactor(reactor);
299   }
300 
301  private:
302   std::function<ServerReadReactor<RequestType>*(::grpc::CallbackServerContext*,
303                                                 ResponseType*)>
304       get_reactor_;
305 
306   class ServerCallbackReaderImpl : public ServerCallbackReader<RequestType> {
307    public:
Finish(::grpc::Status s)308     void Finish(::grpc::Status s) override {
309       // A finish tag with only MaybeDone can have its callback inlined
310       // regardless even if OnDone is not inlineable because this callback just
311       // checks a ref and then decides whether or not to dispatch OnDone.
312       finish_tag_.Set(
313           call_.call(),
314           [this](bool) {
315             // Inlineable OnDone can be false here because there is
316             // no read reactor that has an inlineable OnDone; this
317             // only applies to the DefaultReactor (which is unary).
318             this->MaybeDone(/*inlineable_ondone=*/false);
319           },
320           &finish_ops_, /*can_inline=*/true);
321       if (!ctx_->sent_initial_metadata_) {
322         finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
323                                         ctx_->initial_metadata_flags());
324         if (ctx_->compression_level_set()) {
325           finish_ops_.set_compression_level(ctx_->compression_level());
326         }
327         ctx_->sent_initial_metadata_ = true;
328       }
329       // The response is dropped if the status is not OK.
330       if (s.ok()) {
331         finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
332                                      finish_ops_.SendMessagePtr(&resp_));
333       } else {
334         finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
335       }
336       finish_ops_.set_core_cq_tag(&finish_tag_);
337       call_.PerformOps(&finish_ops_);
338     }
339 
SendInitialMetadata()340     void SendInitialMetadata() override {
341       GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
342       this->Ref();
343       // The callback for this function should not be inlined because it invokes
344       // a user-controlled reaction, but any resulting OnDone can be inlined in
345       // the executor to which this callback is dispatched.
346       meta_tag_.Set(
347           call_.call(),
348           [this](bool ok) {
349             ServerReadReactor<RequestType>* reactor =
350                 reactor_.load(std::memory_order_relaxed);
351             reactor->OnSendInitialMetadataDone(ok);
352             this->MaybeDone(/*inlineable_ondone=*/true);
353           },
354           &meta_ops_, /*can_inline=*/false);
355       meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
356                                     ctx_->initial_metadata_flags());
357       if (ctx_->compression_level_set()) {
358         meta_ops_.set_compression_level(ctx_->compression_level());
359       }
360       ctx_->sent_initial_metadata_ = true;
361       meta_ops_.set_core_cq_tag(&meta_tag_);
362       call_.PerformOps(&meta_ops_);
363     }
364 
Read(RequestType * req)365     void Read(RequestType* req) override {
366       this->Ref();
367       read_ops_.RecvMessage(req);
368       call_.PerformOps(&read_ops_);
369     }
370 
371    private:
372     friend class CallbackClientStreamingHandler<RequestType, ResponseType>;
373 
ServerCallbackReaderImpl(::grpc::CallbackServerContext * ctx,::grpc::internal::Call * call,std::function<void ()> call_requester)374     ServerCallbackReaderImpl(::grpc::CallbackServerContext* ctx,
375                              ::grpc::internal::Call* call,
376                              std::function<void()> call_requester)
377         : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
378 
SetupReactor(ServerReadReactor<RequestType> * reactor)379     void SetupReactor(ServerReadReactor<RequestType>* reactor) {
380       reactor_.store(reactor, std::memory_order_relaxed);
381       // The callback for this function should not be inlined because it invokes
382       // a user-controlled reaction, but any resulting OnDone can be inlined in
383       // the executor to which this callback is dispatched.
384       read_tag_.Set(
385           call_.call(),
386           [this, reactor](bool ok) {
387             reactor->OnReadDone(ok);
388             this->MaybeDone(/*inlineable_ondone=*/true);
389           },
390           &read_ops_, /*can_inline=*/false);
391       read_ops_.set_core_cq_tag(&read_tag_);
392       this->BindReactor(reactor);
393       this->MaybeCallOnCancel(reactor);
394       // Inlineable OnDone can be false here because there is no read
395       // reactor that has an inlineable OnDone; this only applies to the
396       // DefaultReactor (which is unary).
397       this->MaybeDone(/*inlineable_ondone=*/false);
398     }
399 
~ServerCallbackReaderImpl()400     ~ServerCallbackReaderImpl() {}
401 
response()402     ResponseType* response() { return &resp_; }
403 
CallOnDone()404     void CallOnDone() override {
405       reactor_.load(std::memory_order_relaxed)->OnDone();
406       grpc_call* call = call_.call();
407       auto call_requester = std::move(call_requester_);
408       if (ctx_->context_allocator() != nullptr) {
409         ctx_->context_allocator()->Release(ctx_);
410       }
411       this->~ServerCallbackReaderImpl();  // explicitly call destructor
412       ::grpc::g_core_codegen_interface->grpc_call_unref(call);
413       call_requester();
414     }
415 
reactor()416     ServerReactor* reactor() override {
417       return reactor_.load(std::memory_order_relaxed);
418     }
419 
420     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
421         meta_ops_;
422     ::grpc::internal::CallbackWithSuccessTag meta_tag_;
423     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
424                                 ::grpc::internal::CallOpSendMessage,
425                                 ::grpc::internal::CallOpServerSendStatus>
426         finish_ops_;
427     ::grpc::internal::CallbackWithSuccessTag finish_tag_;
428     ::grpc::internal::CallOpSet<
429         ::grpc::internal::CallOpRecvMessage<RequestType>>
430         read_ops_;
431     ::grpc::internal::CallbackWithSuccessTag read_tag_;
432 
433     ::grpc::CallbackServerContext* const ctx_;
434     ::grpc::internal::Call call_;
435     ResponseType resp_;
436     std::function<void()> call_requester_;
437     // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
438     std::atomic<ServerReadReactor<RequestType>*> reactor_;
439     // callbacks_outstanding_ follows a refcount pattern
440     std::atomic<intptr_t> callbacks_outstanding_{
441         3};  // reserve for OnStarted, Finish, and CompletionOp
442   };
443 };
444 
445 template <class RequestType, class ResponseType>
446 class CallbackServerStreamingHandler : public ::grpc::internal::MethodHandler {
447  public:
CallbackServerStreamingHandler(std::function<ServerWriteReactor<ResponseType> * (::grpc::CallbackServerContext *,const RequestType *)> get_reactor)448   explicit CallbackServerStreamingHandler(
449       std::function<ServerWriteReactor<ResponseType>*(
450           ::grpc::CallbackServerContext*, const RequestType*)>
451           get_reactor)
452       : get_reactor_(std::move(get_reactor)) {}
RunHandler(const HandlerParameter & param)453   void RunHandler(const HandlerParameter& param) final {
454     // Arena allocate a writer structure
455     ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
456 
457     auto* writer = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
458         param.call->call(), sizeof(ServerCallbackWriterImpl)))
459         ServerCallbackWriterImpl(
460             static_cast<::grpc::CallbackServerContext*>(param.server_context),
461             param.call, static_cast<RequestType*>(param.request),
462             param.call_requester);
463     // Inlineable OnDone can be false in the CompletionOp callback because there
464     // is no write reactor that has an inlineable OnDone; this only applies to
465     // the DefaultReactor (which is unary).
466     param.server_context->BeginCompletionOp(
467         param.call,
468         [writer](bool) { writer->MaybeDone(/*inlineable_ondone=*/false); },
469         writer);
470 
471     ServerWriteReactor<ResponseType>* reactor = nullptr;
472     if (param.status.ok()) {
473       reactor = ::grpc::internal::CatchingReactorGetter<
474           ServerWriteReactor<ResponseType>>(
475           get_reactor_,
476           static_cast<::grpc::CallbackServerContext*>(param.server_context),
477           writer->request());
478     }
479     if (reactor == nullptr) {
480       // if deserialization or reactor creator failed, we need to fail the call
481       reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
482           param.call->call(), sizeof(UnimplementedWriteReactor<ResponseType>)))
483           UnimplementedWriteReactor<ResponseType>(
484               ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
485     }
486 
487     writer->SetupReactor(reactor);
488   }
489 
Deserialize(grpc_call * call,grpc_byte_buffer * req,::grpc::Status * status,void **)490   void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
491                     ::grpc::Status* status, void** /*handler_data*/) final {
492     ::grpc::ByteBuffer buf;
493     buf.set_buffer(req);
494     auto* request =
495         new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
496             call, sizeof(RequestType))) RequestType();
497     *status =
498         ::grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
499     buf.Release();
500     if (status->ok()) {
501       return request;
502     }
503     request->~RequestType();
504     return nullptr;
505   }
506 
507  private:
508   std::function<ServerWriteReactor<ResponseType>*(
509       ::grpc::CallbackServerContext*, const RequestType*)>
510       get_reactor_;
511 
512   class ServerCallbackWriterImpl : public ServerCallbackWriter<ResponseType> {
513    public:
Finish(::grpc::Status s)514     void Finish(::grpc::Status s) override {
515       // A finish tag with only MaybeDone can have its callback inlined
516       // regardless even if OnDone is not inlineable because this callback just
517       // checks a ref and then decides whether or not to dispatch OnDone.
518       finish_tag_.Set(
519           call_.call(),
520           [this](bool) {
521             // Inlineable OnDone can be false here because there is
522             // no write reactor that has an inlineable OnDone; this
523             // only applies to the DefaultReactor (which is unary).
524             this->MaybeDone(/*inlineable_ondone=*/false);
525           },
526           &finish_ops_, /*can_inline=*/true);
527       finish_ops_.set_core_cq_tag(&finish_tag_);
528 
529       if (!ctx_->sent_initial_metadata_) {
530         finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
531                                         ctx_->initial_metadata_flags());
532         if (ctx_->compression_level_set()) {
533           finish_ops_.set_compression_level(ctx_->compression_level());
534         }
535         ctx_->sent_initial_metadata_ = true;
536       }
537       finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
538       call_.PerformOps(&finish_ops_);
539     }
540 
SendInitialMetadata()541     void SendInitialMetadata() override {
542       GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
543       this->Ref();
544       // The callback for this function should not be inlined because it invokes
545       // a user-controlled reaction, but any resulting OnDone can be inlined in
546       // the executor to which this callback is dispatched.
547       meta_tag_.Set(
548           call_.call(),
549           [this](bool ok) {
550             ServerWriteReactor<ResponseType>* reactor =
551                 reactor_.load(std::memory_order_relaxed);
552             reactor->OnSendInitialMetadataDone(ok);
553             this->MaybeDone(/*inlineable_ondone=*/true);
554           },
555           &meta_ops_, /*can_inline=*/false);
556       meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
557                                     ctx_->initial_metadata_flags());
558       if (ctx_->compression_level_set()) {
559         meta_ops_.set_compression_level(ctx_->compression_level());
560       }
561       ctx_->sent_initial_metadata_ = true;
562       meta_ops_.set_core_cq_tag(&meta_tag_);
563       call_.PerformOps(&meta_ops_);
564     }
565 
Write(const ResponseType * resp,::grpc::WriteOptions options)566     void Write(const ResponseType* resp,
567                ::grpc::WriteOptions options) override {
568       this->Ref();
569       if (options.is_last_message()) {
570         options.set_buffer_hint();
571       }
572       if (!ctx_->sent_initial_metadata_) {
573         write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
574                                        ctx_->initial_metadata_flags());
575         if (ctx_->compression_level_set()) {
576           write_ops_.set_compression_level(ctx_->compression_level());
577         }
578         ctx_->sent_initial_metadata_ = true;
579       }
580       // TODO(vjpai): don't assert
581       GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
582       call_.PerformOps(&write_ops_);
583     }
584 
WriteAndFinish(const ResponseType * resp,::grpc::WriteOptions options,::grpc::Status s)585     void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options,
586                         ::grpc::Status s) override {
587       // This combines the write into the finish callback
588       // TODO(vjpai): don't assert
589       GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
590       Finish(std::move(s));
591     }
592 
593    private:
594     friend class CallbackServerStreamingHandler<RequestType, ResponseType>;
595 
ServerCallbackWriterImpl(::grpc::CallbackServerContext * ctx,::grpc::internal::Call * call,const RequestType * req,std::function<void ()> call_requester)596     ServerCallbackWriterImpl(::grpc::CallbackServerContext* ctx,
597                              ::grpc::internal::Call* call,
598                              const RequestType* req,
599                              std::function<void()> call_requester)
600         : ctx_(ctx),
601           call_(*call),
602           req_(req),
603           call_requester_(std::move(call_requester)) {}
604 
SetupReactor(ServerWriteReactor<ResponseType> * reactor)605     void SetupReactor(ServerWriteReactor<ResponseType>* reactor) {
606       reactor_.store(reactor, std::memory_order_relaxed);
607       // The callback for this function should not be inlined because it invokes
608       // a user-controlled reaction, but any resulting OnDone can be inlined in
609       // the executor to which this callback is dispatched.
610       write_tag_.Set(
611           call_.call(),
612           [this, reactor](bool ok) {
613             reactor->OnWriteDone(ok);
614             this->MaybeDone(/*inlineable_ondone=*/true);
615           },
616           &write_ops_, /*can_inline=*/false);
617       write_ops_.set_core_cq_tag(&write_tag_);
618       this->BindReactor(reactor);
619       this->MaybeCallOnCancel(reactor);
620       // Inlineable OnDone can be false here because there is no write
621       // reactor that has an inlineable OnDone; this only applies to the
622       // DefaultReactor (which is unary).
623       this->MaybeDone(/*inlineable_ondone=*/false);
624     }
~ServerCallbackWriterImpl()625     ~ServerCallbackWriterImpl() {
626       if (req_ != nullptr) {
627         req_->~RequestType();
628       }
629     }
630 
request()631     const RequestType* request() { return req_; }
632 
CallOnDone()633     void CallOnDone() override {
634       reactor_.load(std::memory_order_relaxed)->OnDone();
635       grpc_call* call = call_.call();
636       auto call_requester = std::move(call_requester_);
637       if (ctx_->context_allocator() != nullptr) {
638         ctx_->context_allocator()->Release(ctx_);
639       }
640       this->~ServerCallbackWriterImpl();  // explicitly call destructor
641       ::grpc::g_core_codegen_interface->grpc_call_unref(call);
642       call_requester();
643     }
644 
reactor()645     ServerReactor* reactor() override {
646       return reactor_.load(std::memory_order_relaxed);
647     }
648 
649     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
650         meta_ops_;
651     ::grpc::internal::CallbackWithSuccessTag meta_tag_;
652     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
653                                 ::grpc::internal::CallOpSendMessage,
654                                 ::grpc::internal::CallOpServerSendStatus>
655         finish_ops_;
656     ::grpc::internal::CallbackWithSuccessTag finish_tag_;
657     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
658                                 ::grpc::internal::CallOpSendMessage>
659         write_ops_;
660     ::grpc::internal::CallbackWithSuccessTag write_tag_;
661 
662     ::grpc::CallbackServerContext* const ctx_;
663     ::grpc::internal::Call call_;
664     const RequestType* req_;
665     std::function<void()> call_requester_;
666     // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
667     std::atomic<ServerWriteReactor<ResponseType>*> reactor_;
668     // callbacks_outstanding_ follows a refcount pattern
669     std::atomic<intptr_t> callbacks_outstanding_{
670         3};  // reserve for OnStarted, Finish, and CompletionOp
671   };
672 };
673 
674 template <class RequestType, class ResponseType>
675 class CallbackBidiHandler : public ::grpc::internal::MethodHandler {
676  public:
CallbackBidiHandler(std::function<ServerBidiReactor<RequestType,ResponseType> * (::grpc::CallbackServerContext *)> get_reactor)677   explicit CallbackBidiHandler(
678       std::function<ServerBidiReactor<RequestType, ResponseType>*(
679           ::grpc::CallbackServerContext*)>
680           get_reactor)
681       : get_reactor_(std::move(get_reactor)) {}
RunHandler(const HandlerParameter & param)682   void RunHandler(const HandlerParameter& param) final {
683     ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call());
684 
685     auto* stream = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
686         param.call->call(), sizeof(ServerCallbackReaderWriterImpl)))
687         ServerCallbackReaderWriterImpl(
688             static_cast<::grpc::CallbackServerContext*>(param.server_context),
689             param.call, param.call_requester);
690     // Inlineable OnDone can be false in the CompletionOp callback because there
691     // is no bidi reactor that has an inlineable OnDone; this only applies to
692     // the DefaultReactor (which is unary).
693     param.server_context->BeginCompletionOp(
694         param.call,
695         [stream](bool) { stream->MaybeDone(/*inlineable_ondone=*/false); },
696         stream);
697 
698     ServerBidiReactor<RequestType, ResponseType>* reactor = nullptr;
699     if (param.status.ok()) {
700       reactor = ::grpc::internal::CatchingReactorGetter<
701           ServerBidiReactor<RequestType, ResponseType>>(
702           get_reactor_,
703           static_cast<::grpc::CallbackServerContext*>(param.server_context));
704     }
705 
706     if (reactor == nullptr) {
707       // if deserialization or reactor creator failed, we need to fail the call
708       reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc(
709           param.call->call(),
710           sizeof(UnimplementedBidiReactor<RequestType, ResponseType>)))
711           UnimplementedBidiReactor<RequestType, ResponseType>(
712               ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, ""));
713     }
714 
715     stream->SetupReactor(reactor);
716   }
717 
718  private:
719   std::function<ServerBidiReactor<RequestType, ResponseType>*(
720       ::grpc::CallbackServerContext*)>
721       get_reactor_;
722 
723   class ServerCallbackReaderWriterImpl
724       : public ServerCallbackReaderWriter<RequestType, ResponseType> {
725    public:
Finish(::grpc::Status s)726     void Finish(::grpc::Status s) override {
727       // A finish tag with only MaybeDone can have its callback inlined
728       // regardless even if OnDone is not inlineable because this callback just
729       // checks a ref and then decides whether or not to dispatch OnDone.
730       finish_tag_.Set(
731           call_.call(),
732           [this](bool) {
733             // Inlineable OnDone can be false here because there is
734             // no bidi reactor that has an inlineable OnDone; this
735             // only applies to the DefaultReactor (which is unary).
736             this->MaybeDone(/*inlineable_ondone=*/false);
737           },
738           &finish_ops_, /*can_inline=*/true);
739       finish_ops_.set_core_cq_tag(&finish_tag_);
740 
741       if (!ctx_->sent_initial_metadata_) {
742         finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
743                                         ctx_->initial_metadata_flags());
744         if (ctx_->compression_level_set()) {
745           finish_ops_.set_compression_level(ctx_->compression_level());
746         }
747         ctx_->sent_initial_metadata_ = true;
748       }
749       finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
750       call_.PerformOps(&finish_ops_);
751     }
752 
SendInitialMetadata()753     void SendInitialMetadata() override {
754       GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
755       this->Ref();
756       // The callback for this function should not be inlined because it invokes
757       // a user-controlled reaction, but any resulting OnDone can be inlined in
758       // the executor to which this callback is dispatched.
759       meta_tag_.Set(
760           call_.call(),
761           [this](bool ok) {
762             ServerBidiReactor<RequestType, ResponseType>* reactor =
763                 reactor_.load(std::memory_order_relaxed);
764             reactor->OnSendInitialMetadataDone(ok);
765             this->MaybeDone(/*inlineable_ondone=*/true);
766           },
767           &meta_ops_, /*can_inline=*/false);
768       meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
769                                     ctx_->initial_metadata_flags());
770       if (ctx_->compression_level_set()) {
771         meta_ops_.set_compression_level(ctx_->compression_level());
772       }
773       ctx_->sent_initial_metadata_ = true;
774       meta_ops_.set_core_cq_tag(&meta_tag_);
775       call_.PerformOps(&meta_ops_);
776     }
777 
Write(const ResponseType * resp,::grpc::WriteOptions options)778     void Write(const ResponseType* resp,
779                ::grpc::WriteOptions options) override {
780       this->Ref();
781       if (options.is_last_message()) {
782         options.set_buffer_hint();
783       }
784       if (!ctx_->sent_initial_metadata_) {
785         write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
786                                        ctx_->initial_metadata_flags());
787         if (ctx_->compression_level_set()) {
788           write_ops_.set_compression_level(ctx_->compression_level());
789         }
790         ctx_->sent_initial_metadata_ = true;
791       }
792       // TODO(vjpai): don't assert
793       GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
794       call_.PerformOps(&write_ops_);
795     }
796 
WriteAndFinish(const ResponseType * resp,::grpc::WriteOptions options,::grpc::Status s)797     void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options,
798                         ::grpc::Status s) override {
799       // TODO(vjpai): don't assert
800       GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
801       Finish(std::move(s));
802     }
803 
Read(RequestType * req)804     void Read(RequestType* req) override {
805       this->Ref();
806       read_ops_.RecvMessage(req);
807       call_.PerformOps(&read_ops_);
808     }
809 
810    private:
811     friend class CallbackBidiHandler<RequestType, ResponseType>;
812 
ServerCallbackReaderWriterImpl(::grpc::CallbackServerContext * ctx,::grpc::internal::Call * call,std::function<void ()> call_requester)813     ServerCallbackReaderWriterImpl(::grpc::CallbackServerContext* ctx,
814                                    ::grpc::internal::Call* call,
815                                    std::function<void()> call_requester)
816         : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
817 
SetupReactor(ServerBidiReactor<RequestType,ResponseType> * reactor)818     void SetupReactor(ServerBidiReactor<RequestType, ResponseType>* reactor) {
819       reactor_.store(reactor, std::memory_order_relaxed);
820       // The callbacks for these functions should not be inlined because they
821       // invoke user-controlled reactions, but any resulting OnDones can be
822       // inlined in the executor to which a callback is dispatched.
823       write_tag_.Set(
824           call_.call(),
825           [this, reactor](bool ok) {
826             reactor->OnWriteDone(ok);
827             this->MaybeDone(/*inlineable_ondone=*/true);
828           },
829           &write_ops_, /*can_inline=*/false);
830       write_ops_.set_core_cq_tag(&write_tag_);
831       read_tag_.Set(
832           call_.call(),
833           [this, reactor](bool ok) {
834             reactor->OnReadDone(ok);
835             this->MaybeDone(/*inlineable_ondone=*/true);
836           },
837           &read_ops_, /*can_inline=*/false);
838       read_ops_.set_core_cq_tag(&read_tag_);
839       this->BindReactor(reactor);
840       this->MaybeCallOnCancel(reactor);
841       // Inlineable OnDone can be false here because there is no bidi
842       // reactor that has an inlineable OnDone; this only applies to the
843       // DefaultReactor (which is unary).
844       this->MaybeDone(/*inlineable_ondone=*/false);
845     }
846 
CallOnDone()847     void CallOnDone() override {
848       reactor_.load(std::memory_order_relaxed)->OnDone();
849       grpc_call* call = call_.call();
850       auto call_requester = std::move(call_requester_);
851       if (ctx_->context_allocator() != nullptr) {
852         ctx_->context_allocator()->Release(ctx_);
853       }
854       this->~ServerCallbackReaderWriterImpl();  // explicitly call destructor
855       ::grpc::g_core_codegen_interface->grpc_call_unref(call);
856       call_requester();
857     }
858 
reactor()859     ServerReactor* reactor() override {
860       return reactor_.load(std::memory_order_relaxed);
861     }
862 
863     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
864         meta_ops_;
865     ::grpc::internal::CallbackWithSuccessTag meta_tag_;
866     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
867                                 ::grpc::internal::CallOpSendMessage,
868                                 ::grpc::internal::CallOpServerSendStatus>
869         finish_ops_;
870     ::grpc::internal::CallbackWithSuccessTag finish_tag_;
871     ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
872                                 ::grpc::internal::CallOpSendMessage>
873         write_ops_;
874     ::grpc::internal::CallbackWithSuccessTag write_tag_;
875     ::grpc::internal::CallOpSet<
876         ::grpc::internal::CallOpRecvMessage<RequestType>>
877         read_ops_;
878     ::grpc::internal::CallbackWithSuccessTag read_tag_;
879 
880     ::grpc::CallbackServerContext* const ctx_;
881     ::grpc::internal::Call call_;
882     std::function<void()> call_requester_;
883     // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
884     std::atomic<ServerBidiReactor<RequestType, ResponseType>*> reactor_;
885     // callbacks_outstanding_ follows a refcount pattern
886     std::atomic<intptr_t> callbacks_outstanding_{
887         3};  // reserve for OnStarted, Finish, and CompletionOp
888   };
889 };
890 
891 }  // namespace internal
892 }  // namespace grpc
893 
894 #endif  // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H
895