• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2019 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 
18 #ifndef GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H
19 #define GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H
20 
21 #include <grpc/grpc.h>
22 #include <grpc/impl/call.h>
23 #include <grpcpp/impl/rpc_service_method.h>
24 #include <grpcpp/server_context.h>
25 #include <grpcpp/support/message_allocator.h>
26 #include <grpcpp/support/server_callback.h>
27 #include <grpcpp/support/status.h>
28 
29 #include "absl/log/absl_check.h"
30 
31 namespace grpc {
32 namespace internal {
33 
34 template <class RequestType, class ResponseType>
35 class CallbackUnaryHandler : public grpc::internal::MethodHandler {
36  public:
CallbackUnaryHandler(std::function<ServerUnaryReactor * (grpc::CallbackServerContext *,const RequestType *,ResponseType *)> get_reactor)37   explicit CallbackUnaryHandler(
38       std::function<ServerUnaryReactor*(grpc::CallbackServerContext*,
39                                         const RequestType*, ResponseType*)>
40           get_reactor)
41       : get_reactor_(std::move(get_reactor)) {}
42 
SetMessageAllocator(MessageAllocator<RequestType,ResponseType> * allocator)43   void SetMessageAllocator(
44       MessageAllocator<RequestType, ResponseType>* allocator) {
45     allocator_ = allocator;
46   }
47 
RunHandler(const HandlerParameter & param)48   void RunHandler(const HandlerParameter& param) final {
49     // Arena allocate a controller structure (that includes request/response)
50     grpc_call_ref(param.call->call());
51     auto* allocator_state =
52         static_cast<MessageHolder<RequestType, ResponseType>*>(
53             param.internal_data);
54 
55     auto* call = new (grpc_call_arena_alloc(param.call->call(),
56                                             sizeof(ServerCallbackUnaryImpl)))
57         ServerCallbackUnaryImpl(
58             static_cast<grpc::CallbackServerContext*>(param.server_context),
59             param.call, allocator_state, param.call_requester);
60     param.server_context->BeginCompletionOp(
61         param.call, [call](bool) { call->MaybeDone(); }, call);
62 
63     ServerUnaryReactor* reactor = nullptr;
64     if (param.status.ok()) {
65       reactor = grpc::internal::CatchingReactorGetter<ServerUnaryReactor>(
66           get_reactor_,
67           static_cast<grpc::CallbackServerContext*>(param.server_context),
68           call->request(), call->response());
69     }
70 
71     if (reactor == nullptr) {
72       // if deserialization or reactor creator failed, we need to fail the call
73       reactor = new (grpc_call_arena_alloc(param.call->call(),
74                                            sizeof(UnimplementedUnaryReactor)))
75           UnimplementedUnaryReactor(
76               grpc::Status(grpc::StatusCode::UNIMPLEMENTED, ""));
77     }
78 
79     /// Invoke SetupReactor as the last part of the handler
80     call->SetupReactor(reactor);
81   }
82 
Deserialize(grpc_call * call,grpc_byte_buffer * req,grpc::Status * status,void ** handler_data)83   void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
84                     grpc::Status* status, void** handler_data) final {
85     grpc::ByteBuffer buf;
86     buf.set_buffer(req);
87     RequestType* request = nullptr;
88     MessageHolder<RequestType, ResponseType>* allocator_state;
89     if (allocator_ != nullptr) {
90       allocator_state = allocator_->AllocateMessages();
91     } else {
92       allocator_state = new (grpc_call_arena_alloc(
93           call, sizeof(DefaultMessageHolder<RequestType, ResponseType>)))
94           DefaultMessageHolder<RequestType, ResponseType>();
95     }
96     *handler_data = allocator_state;
97     request = allocator_state->request();
98     *status =
99         grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
100     buf.Release();
101     if (status->ok()) {
102       return request;
103     }
104     return nullptr;
105   }
106 
107  private:
108   std::function<ServerUnaryReactor*(grpc::CallbackServerContext*,
109                                     const RequestType*, ResponseType*)>
110       get_reactor_;
111   MessageAllocator<RequestType, ResponseType>* allocator_ = nullptr;
112 
113   class ServerCallbackUnaryImpl : public ServerCallbackUnary {
114    public:
Finish(grpc::Status s)115     void Finish(grpc::Status s) override {
116       // A callback that only contains a call to MaybeDone can be run as an
117       // inline callback regardless of whether or not OnDone is inlineable
118       // because if the actual OnDone callback needs to be scheduled, MaybeDone
119       // is responsible for dispatching to an executor thread if needed. Thus,
120       // when setting up the finish_tag_, we can set its own callback to
121       // inlineable.
122       finish_tag_.Set(
123           call_.call(),
124           [this](bool) {
125             this->MaybeDone(
126                 reactor_.load(std::memory_order_relaxed)->InternalInlineable());
127           },
128           &finish_ops_, /*can_inline=*/true);
129       finish_ops_.set_core_cq_tag(&finish_tag_);
130 
131       if (!ctx_->sent_initial_metadata_) {
132         finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
133                                         ctx_->initial_metadata_flags());
134         if (ctx_->compression_level_set()) {
135           finish_ops_.set_compression_level(ctx_->compression_level());
136         }
137         ctx_->sent_initial_metadata_ = true;
138       }
139       // The response is dropped if the status is not OK.
140       if (s.ok()) {
141         finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
142                                      finish_ops_.SendMessagePtr(response()));
143       } else {
144         finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
145       }
146       finish_ops_.set_core_cq_tag(&finish_tag_);
147       call_.PerformOps(&finish_ops_);
148     }
149 
SendInitialMetadata()150     void SendInitialMetadata() override {
151       ABSL_CHECK(!ctx_->sent_initial_metadata_);
152       this->Ref();
153       // The callback for this function should not be marked inline because it
154       // is directly invoking a user-controlled reaction
155       // (OnSendInitialMetadataDone). Thus it must be dispatched to an executor
156       // thread. However, any OnDone needed after that can be inlined because it
157       // is already running on an executor thread.
158       meta_tag_.Set(
159           call_.call(),
160           [this](bool ok) {
161             ServerUnaryReactor* reactor =
162                 reactor_.load(std::memory_order_relaxed);
163             reactor->OnSendInitialMetadataDone(ok);
164             this->MaybeDone(/*inlineable_ondone=*/true);
165           },
166           &meta_ops_, /*can_inline=*/false);
167       meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
168                                     ctx_->initial_metadata_flags());
169       if (ctx_->compression_level_set()) {
170         meta_ops_.set_compression_level(ctx_->compression_level());
171       }
172       ctx_->sent_initial_metadata_ = true;
173       meta_ops_.set_core_cq_tag(&meta_tag_);
174       call_.PerformOps(&meta_ops_);
175     }
176 
177    private:
178     friend class CallbackUnaryHandler<RequestType, ResponseType>;
179 
ServerCallbackUnaryImpl(grpc::CallbackServerContext * ctx,grpc::internal::Call * call,MessageHolder<RequestType,ResponseType> * allocator_state,std::function<void ()> call_requester)180     ServerCallbackUnaryImpl(
181         grpc::CallbackServerContext* ctx, grpc::internal::Call* call,
182         MessageHolder<RequestType, ResponseType>* allocator_state,
183         std::function<void()> call_requester)
184         : ctx_(ctx),
185           call_(*call),
186           allocator_state_(allocator_state),
187           call_requester_(std::move(call_requester)) {
188       ctx_->set_message_allocator_state(allocator_state);
189     }
190 
call()191     grpc_call* call() override { return call_.call(); }
192 
193     /// SetupReactor binds the reactor (which also releases any queued
194     /// operations), maybe calls OnCancel if possible/needed, and maybe marks
195     /// the completion of the RPC. This should be the last component of the
196     /// handler.
SetupReactor(ServerUnaryReactor * reactor)197     void SetupReactor(ServerUnaryReactor* reactor) {
198       reactor_.store(reactor, std::memory_order_relaxed);
199       this->BindReactor(reactor);
200       this->MaybeCallOnCancel(reactor);
201       this->MaybeDone(reactor->InternalInlineable());
202     }
203 
request()204     const RequestType* request() { return allocator_state_->request(); }
response()205     ResponseType* response() { return allocator_state_->response(); }
206 
CallOnDone()207     void CallOnDone() override {
208       reactor_.load(std::memory_order_relaxed)->OnDone();
209       grpc_call* call = call_.call();
210       auto call_requester = std::move(call_requester_);
211       allocator_state_->Release();
212       if (ctx_->context_allocator() != nullptr) {
213         ctx_->context_allocator()->Release(ctx_);
214       }
215       this->~ServerCallbackUnaryImpl();  // explicitly call destructor
216       grpc_call_unref(call);
217       call_requester();
218     }
219 
reactor()220     ServerReactor* reactor() override {
221       return reactor_.load(std::memory_order_relaxed);
222     }
223 
224     grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata>
225         meta_ops_;
226     grpc::internal::CallbackWithSuccessTag meta_tag_;
227     grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
228                               grpc::internal::CallOpSendMessage,
229                               grpc::internal::CallOpServerSendStatus>
230         finish_ops_;
231     grpc::internal::CallbackWithSuccessTag finish_tag_;
232 
233     grpc::CallbackServerContext* const ctx_;
234     grpc::internal::Call call_;
235     MessageHolder<RequestType, ResponseType>* const allocator_state_;
236     std::function<void()> call_requester_;
237     // reactor_ can always be loaded/stored with relaxed memory ordering because
238     // its value is only set once, independently of other data in the object,
239     // and the loads that use it will always actually come provably later even
240     // though they are from different threads since they are triggered by
241     // actions initiated only by the setting up of the reactor_ variable. In
242     // a sense, it's a delayed "const": it gets its value from the SetupReactor
243     // method (not the constructor, so it's not a true const), but it doesn't
244     // change after that and it only gets used by actions caused, directly or
245     // indirectly, by that setup. This comment also applies to the reactor_
246     // variables of the other streaming objects in this file.
247     std::atomic<ServerUnaryReactor*> reactor_;
248     // callbacks_outstanding_ follows a refcount pattern
249     std::atomic<intptr_t> callbacks_outstanding_{
250         3};  // reserve for start, Finish, and CompletionOp
251   };
252 };
253 
254 template <class RequestType, class ResponseType>
255 class CallbackClientStreamingHandler : public grpc::internal::MethodHandler {
256  public:
CallbackClientStreamingHandler(std::function<ServerReadReactor<RequestType> * (grpc::CallbackServerContext *,ResponseType *)> get_reactor)257   explicit CallbackClientStreamingHandler(
258       std::function<ServerReadReactor<RequestType>*(
259           grpc::CallbackServerContext*, ResponseType*)>
260           get_reactor)
261       : get_reactor_(std::move(get_reactor)) {}
RunHandler(const HandlerParameter & param)262   void RunHandler(const HandlerParameter& param) final {
263     // Arena allocate a reader structure (that includes response)
264     grpc_call_ref(param.call->call());
265 
266     auto* reader = new (grpc_call_arena_alloc(param.call->call(),
267                                               sizeof(ServerCallbackReaderImpl)))
268         ServerCallbackReaderImpl(
269             static_cast<grpc::CallbackServerContext*>(param.server_context),
270             param.call, param.call_requester);
271     // Inlineable OnDone can be false in the CompletionOp callback because there
272     // is no read reactor that has an inlineable OnDone; this only applies to
273     // the DefaultReactor (which is unary).
274     param.server_context->BeginCompletionOp(
275         param.call,
276         [reader](bool) { reader->MaybeDone(/*inlineable_ondone=*/false); },
277         reader);
278 
279     ServerReadReactor<RequestType>* reactor = nullptr;
280     if (param.status.ok()) {
281       reactor =
282           grpc::internal::CatchingReactorGetter<ServerReadReactor<RequestType>>(
283               get_reactor_,
284               static_cast<grpc::CallbackServerContext*>(param.server_context),
285               reader->response());
286     }
287 
288     if (reactor == nullptr) {
289       // if deserialization or reactor creator failed, we need to fail the call
290       reactor = new (grpc_call_arena_alloc(
291           param.call->call(), sizeof(UnimplementedReadReactor<RequestType>)))
292           UnimplementedReadReactor<RequestType>(
293               grpc::Status(grpc::StatusCode::UNIMPLEMENTED, ""));
294     }
295 
296     reader->SetupReactor(reactor);
297   }
298 
299  private:
300   std::function<ServerReadReactor<RequestType>*(grpc::CallbackServerContext*,
301                                                 ResponseType*)>
302       get_reactor_;
303 
304   class ServerCallbackReaderImpl : public ServerCallbackReader<RequestType> {
305    public:
Finish(grpc::Status s)306     void Finish(grpc::Status s) override {
307       // A finish tag with only MaybeDone can have its callback inlined
308       // regardless even if OnDone is not inlineable because this callback just
309       // checks a ref and then decides whether or not to dispatch OnDone.
310       finish_tag_.Set(
311           call_.call(),
312           [this](bool) {
313             // Inlineable OnDone can be false here because there is
314             // no read reactor that has an inlineable OnDone; this
315             // only applies to the DefaultReactor (which is unary).
316             this->MaybeDone(/*inlineable_ondone=*/false);
317           },
318           &finish_ops_, /*can_inline=*/true);
319       if (!ctx_->sent_initial_metadata_) {
320         finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
321                                         ctx_->initial_metadata_flags());
322         if (ctx_->compression_level_set()) {
323           finish_ops_.set_compression_level(ctx_->compression_level());
324         }
325         ctx_->sent_initial_metadata_ = true;
326       }
327       // The response is dropped if the status is not OK.
328       if (s.ok()) {
329         finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
330                                      finish_ops_.SendMessagePtr(&resp_));
331       } else {
332         finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
333       }
334       finish_ops_.set_core_cq_tag(&finish_tag_);
335       call_.PerformOps(&finish_ops_);
336     }
337 
SendInitialMetadata()338     void SendInitialMetadata() override {
339       ABSL_CHECK(!ctx_->sent_initial_metadata_);
340       this->Ref();
341       // The callback for this function should not be inlined because it invokes
342       // a user-controlled reaction, but any resulting OnDone can be inlined in
343       // the executor to which this callback is dispatched.
344       meta_tag_.Set(
345           call_.call(),
346           [this](bool ok) {
347             ServerReadReactor<RequestType>* reactor =
348                 reactor_.load(std::memory_order_relaxed);
349             reactor->OnSendInitialMetadataDone(ok);
350             this->MaybeDone(/*inlineable_ondone=*/true);
351           },
352           &meta_ops_, /*can_inline=*/false);
353       meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
354                                     ctx_->initial_metadata_flags());
355       if (ctx_->compression_level_set()) {
356         meta_ops_.set_compression_level(ctx_->compression_level());
357       }
358       ctx_->sent_initial_metadata_ = true;
359       meta_ops_.set_core_cq_tag(&meta_tag_);
360       call_.PerformOps(&meta_ops_);
361     }
362 
Read(RequestType * req)363     void Read(RequestType* req) override {
364       this->Ref();
365       read_ops_.RecvMessage(req);
366       call_.PerformOps(&read_ops_);
367     }
368 
369    private:
370     friend class CallbackClientStreamingHandler<RequestType, ResponseType>;
371 
ServerCallbackReaderImpl(grpc::CallbackServerContext * ctx,grpc::internal::Call * call,std::function<void ()> call_requester)372     ServerCallbackReaderImpl(grpc::CallbackServerContext* ctx,
373                              grpc::internal::Call* call,
374                              std::function<void()> call_requester)
375         : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
376 
call()377     grpc_call* call() override { return call_.call(); }
378 
SetupReactor(ServerReadReactor<RequestType> * reactor)379     void SetupReactor(ServerReadReactor<RequestType>* reactor) {
380       reactor_.store(reactor, std::memory_order_relaxed);
381       // The callback for this function should not be inlined because it invokes
382       // a user-controlled reaction, but any resulting OnDone can be inlined in
383       // the executor to which this callback is dispatched.
384       read_tag_.Set(
385           call_.call(),
386           [this, reactor](bool ok) {
387             if (GPR_UNLIKELY(!ok)) {
388               ctx_->MaybeMarkCancelledOnRead();
389             }
390             reactor->OnReadDone(ok);
391             this->MaybeDone(/*inlineable_ondone=*/true);
392           },
393           &read_ops_, /*can_inline=*/false);
394       read_ops_.set_core_cq_tag(&read_tag_);
395       this->BindReactor(reactor);
396       this->MaybeCallOnCancel(reactor);
397       // Inlineable OnDone can be false here because there is no read
398       // reactor that has an inlineable OnDone; this only applies to the
399       // DefaultReactor (which is unary).
400       this->MaybeDone(/*inlineable_ondone=*/false);
401     }
402 
~ServerCallbackReaderImpl()403     ~ServerCallbackReaderImpl() {}
404 
response()405     ResponseType* response() { return &resp_; }
406 
CallOnDone()407     void CallOnDone() override {
408       reactor_.load(std::memory_order_relaxed)->OnDone();
409       grpc_call* call = call_.call();
410       auto call_requester = std::move(call_requester_);
411       if (ctx_->context_allocator() != nullptr) {
412         ctx_->context_allocator()->Release(ctx_);
413       }
414       this->~ServerCallbackReaderImpl();  // explicitly call destructor
415       grpc_call_unref(call);
416       call_requester();
417     }
418 
reactor()419     ServerReactor* reactor() override {
420       return reactor_.load(std::memory_order_relaxed);
421     }
422 
423     grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata>
424         meta_ops_;
425     grpc::internal::CallbackWithSuccessTag meta_tag_;
426     grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
427                               grpc::internal::CallOpSendMessage,
428                               grpc::internal::CallOpServerSendStatus>
429         finish_ops_;
430     grpc::internal::CallbackWithSuccessTag finish_tag_;
431     grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<RequestType>>
432         read_ops_;
433     grpc::internal::CallbackWithSuccessTag read_tag_;
434 
435     grpc::CallbackServerContext* const ctx_;
436     grpc::internal::Call call_;
437     ResponseType resp_;
438     std::function<void()> call_requester_;
439     // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
440     std::atomic<ServerReadReactor<RequestType>*> reactor_;
441     // callbacks_outstanding_ follows a refcount pattern
442     std::atomic<intptr_t> callbacks_outstanding_{
443         3};  // reserve for OnStarted, Finish, and CompletionOp
444   };
445 };
446 
447 template <class RequestType, class ResponseType>
448 class CallbackServerStreamingHandler : public grpc::internal::MethodHandler {
449  public:
CallbackServerStreamingHandler(std::function<ServerWriteReactor<ResponseType> * (grpc::CallbackServerContext *,const RequestType *)> get_reactor)450   explicit CallbackServerStreamingHandler(
451       std::function<ServerWriteReactor<ResponseType>*(
452           grpc::CallbackServerContext*, const RequestType*)>
453           get_reactor)
454       : get_reactor_(std::move(get_reactor)) {}
RunHandler(const HandlerParameter & param)455   void RunHandler(const HandlerParameter& param) final {
456     // Arena allocate a writer structure
457     grpc_call_ref(param.call->call());
458 
459     auto* writer = new (grpc_call_arena_alloc(param.call->call(),
460                                               sizeof(ServerCallbackWriterImpl)))
461         ServerCallbackWriterImpl(
462             static_cast<grpc::CallbackServerContext*>(param.server_context),
463             param.call, static_cast<RequestType*>(param.request),
464             param.call_requester);
465     // Inlineable OnDone can be false in the CompletionOp callback because there
466     // is no write reactor that has an inlineable OnDone; this only applies to
467     // the DefaultReactor (which is unary).
468     param.server_context->BeginCompletionOp(
469         param.call,
470         [writer](bool) { writer->MaybeDone(/*inlineable_ondone=*/false); },
471         writer);
472 
473     ServerWriteReactor<ResponseType>* reactor = nullptr;
474     if (param.status.ok()) {
475       reactor = grpc::internal::CatchingReactorGetter<
476           ServerWriteReactor<ResponseType>>(
477           get_reactor_,
478           static_cast<grpc::CallbackServerContext*>(param.server_context),
479           writer->request());
480     }
481     if (reactor == nullptr) {
482       // if deserialization or reactor creator failed, we need to fail the call
483       reactor = new (grpc_call_arena_alloc(
484           param.call->call(), sizeof(UnimplementedWriteReactor<ResponseType>)))
485           UnimplementedWriteReactor<ResponseType>(
486               grpc::Status(grpc::StatusCode::UNIMPLEMENTED, ""));
487     }
488 
489     writer->SetupReactor(reactor);
490   }
491 
Deserialize(grpc_call * call,grpc_byte_buffer * req,grpc::Status * status,void **)492   void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
493                     grpc::Status* status, void** /*handler_data*/) final {
494     grpc::ByteBuffer buf;
495     buf.set_buffer(req);
496     auto* request =
497         new (grpc_call_arena_alloc(call, sizeof(RequestType))) RequestType();
498     *status =
499         grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
500     buf.Release();
501     if (status->ok()) {
502       return request;
503     }
504     request->~RequestType();
505     return nullptr;
506   }
507 
508  private:
509   std::function<ServerWriteReactor<ResponseType>*(grpc::CallbackServerContext*,
510                                                   const RequestType*)>
511       get_reactor_;
512 
513   class ServerCallbackWriterImpl : public ServerCallbackWriter<ResponseType> {
514    public:
Finish(grpc::Status s)515     void Finish(grpc::Status s) override {
516       // A finish tag with only MaybeDone can have its callback inlined
517       // regardless even if OnDone is not inlineable because this callback just
518       // checks a ref and then decides whether or not to dispatch OnDone.
519       finish_tag_.Set(
520           call_.call(),
521           [this](bool) {
522             // Inlineable OnDone can be false here because there is
523             // no write reactor that has an inlineable OnDone; this
524             // only applies to the DefaultReactor (which is unary).
525             this->MaybeDone(/*inlineable_ondone=*/false);
526           },
527           &finish_ops_, /*can_inline=*/true);
528       finish_ops_.set_core_cq_tag(&finish_tag_);
529 
530       if (!ctx_->sent_initial_metadata_) {
531         finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
532                                         ctx_->initial_metadata_flags());
533         if (ctx_->compression_level_set()) {
534           finish_ops_.set_compression_level(ctx_->compression_level());
535         }
536         ctx_->sent_initial_metadata_ = true;
537       }
538       finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
539       call_.PerformOps(&finish_ops_);
540     }
541 
SendInitialMetadata()542     void SendInitialMetadata() override {
543       ABSL_CHECK(!ctx_->sent_initial_metadata_);
544       this->Ref();
545       // The callback for this function should not be inlined because it invokes
546       // a user-controlled reaction, but any resulting OnDone can be inlined in
547       // the executor to which this callback is dispatched.
548       meta_tag_.Set(
549           call_.call(),
550           [this](bool ok) {
551             ServerWriteReactor<ResponseType>* reactor =
552                 reactor_.load(std::memory_order_relaxed);
553             reactor->OnSendInitialMetadataDone(ok);
554             this->MaybeDone(/*inlineable_ondone=*/true);
555           },
556           &meta_ops_, /*can_inline=*/false);
557       meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
558                                     ctx_->initial_metadata_flags());
559       if (ctx_->compression_level_set()) {
560         meta_ops_.set_compression_level(ctx_->compression_level());
561       }
562       ctx_->sent_initial_metadata_ = true;
563       meta_ops_.set_core_cq_tag(&meta_tag_);
564       call_.PerformOps(&meta_ops_);
565     }
566 
Write(const ResponseType * resp,grpc::WriteOptions options)567     void Write(const ResponseType* resp, grpc::WriteOptions options) override {
568       this->Ref();
569       if (options.is_last_message()) {
570         options.set_buffer_hint();
571       }
572       if (!ctx_->sent_initial_metadata_) {
573         write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
574                                        ctx_->initial_metadata_flags());
575         if (ctx_->compression_level_set()) {
576           write_ops_.set_compression_level(ctx_->compression_level());
577         }
578         ctx_->sent_initial_metadata_ = true;
579       }
580       // TODO(vjpai): don't assert
581       ABSL_CHECK(write_ops_.SendMessagePtr(resp, options).ok());
582       call_.PerformOps(&write_ops_);
583     }
584 
WriteAndFinish(const ResponseType * resp,grpc::WriteOptions options,grpc::Status s)585     void WriteAndFinish(const ResponseType* resp, grpc::WriteOptions options,
586                         grpc::Status s) override {
587       // This combines the write into the finish callback
588       // TODO(vjpai): don't assert
589       ABSL_CHECK(finish_ops_.SendMessagePtr(resp, options).ok());
590       Finish(std::move(s));
591     }
592 
593    private:
594     friend class CallbackServerStreamingHandler<RequestType, ResponseType>;
595 
ServerCallbackWriterImpl(grpc::CallbackServerContext * ctx,grpc::internal::Call * call,const RequestType * req,std::function<void ()> call_requester)596     ServerCallbackWriterImpl(grpc::CallbackServerContext* ctx,
597                              grpc::internal::Call* call, const RequestType* req,
598                              std::function<void()> call_requester)
599         : ctx_(ctx),
600           call_(*call),
601           req_(req),
602           call_requester_(std::move(call_requester)) {}
603 
call()604     grpc_call* call() override { return call_.call(); }
605 
SetupReactor(ServerWriteReactor<ResponseType> * reactor)606     void SetupReactor(ServerWriteReactor<ResponseType>* reactor) {
607       reactor_.store(reactor, std::memory_order_relaxed);
608       // The callback for this function should not be inlined because it invokes
609       // a user-controlled reaction, but any resulting OnDone can be inlined in
610       // the executor to which this callback is dispatched.
611       write_tag_.Set(
612           call_.call(),
613           [this, reactor](bool ok) {
614             reactor->OnWriteDone(ok);
615             this->MaybeDone(/*inlineable_ondone=*/true);
616           },
617           &write_ops_, /*can_inline=*/false);
618       write_ops_.set_core_cq_tag(&write_tag_);
619       this->BindReactor(reactor);
620       this->MaybeCallOnCancel(reactor);
621       // Inlineable OnDone can be false here because there is no write
622       // reactor that has an inlineable OnDone; this only applies to the
623       // DefaultReactor (which is unary).
624       this->MaybeDone(/*inlineable_ondone=*/false);
625     }
~ServerCallbackWriterImpl()626     ~ServerCallbackWriterImpl() {
627       if (req_ != nullptr) {
628         req_->~RequestType();
629       }
630     }
631 
request()632     const RequestType* request() { return req_; }
633 
CallOnDone()634     void CallOnDone() override {
635       reactor_.load(std::memory_order_relaxed)->OnDone();
636       grpc_call* call = call_.call();
637       auto call_requester = std::move(call_requester_);
638       if (ctx_->context_allocator() != nullptr) {
639         ctx_->context_allocator()->Release(ctx_);
640       }
641       this->~ServerCallbackWriterImpl();  // explicitly call destructor
642       grpc_call_unref(call);
643       call_requester();
644     }
645 
reactor()646     ServerReactor* reactor() override {
647       return reactor_.load(std::memory_order_relaxed);
648     }
649 
650     grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata>
651         meta_ops_;
652     grpc::internal::CallbackWithSuccessTag meta_tag_;
653     grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
654                               grpc::internal::CallOpSendMessage,
655                               grpc::internal::CallOpServerSendStatus>
656         finish_ops_;
657     grpc::internal::CallbackWithSuccessTag finish_tag_;
658     grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
659                               grpc::internal::CallOpSendMessage>
660         write_ops_;
661     grpc::internal::CallbackWithSuccessTag write_tag_;
662 
663     grpc::CallbackServerContext* const ctx_;
664     grpc::internal::Call call_;
665     const RequestType* req_;
666     std::function<void()> call_requester_;
667     // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
668     std::atomic<ServerWriteReactor<ResponseType>*> reactor_;
669     // callbacks_outstanding_ follows a refcount pattern
670     std::atomic<intptr_t> callbacks_outstanding_{
671         3};  // reserve for OnStarted, Finish, and CompletionOp
672   };
673 };
674 
675 template <class RequestType, class ResponseType>
676 class CallbackBidiHandler : public grpc::internal::MethodHandler {
677  public:
CallbackBidiHandler(std::function<ServerBidiReactor<RequestType,ResponseType> * (grpc::CallbackServerContext *)> get_reactor)678   explicit CallbackBidiHandler(
679       std::function<ServerBidiReactor<RequestType, ResponseType>*(
680           grpc::CallbackServerContext*)>
681           get_reactor)
682       : get_reactor_(std::move(get_reactor)) {}
RunHandler(const HandlerParameter & param)683   void RunHandler(const HandlerParameter& param) final {
684     grpc_call_ref(param.call->call());
685 
686     auto* stream = new (grpc_call_arena_alloc(
687         param.call->call(), sizeof(ServerCallbackReaderWriterImpl)))
688         ServerCallbackReaderWriterImpl(
689             static_cast<grpc::CallbackServerContext*>(param.server_context),
690             param.call, param.call_requester);
691     // Inlineable OnDone can be false in the CompletionOp callback because there
692     // is no bidi reactor that has an inlineable OnDone; this only applies to
693     // the DefaultReactor (which is unary).
694     param.server_context->BeginCompletionOp(
695         param.call,
696         [stream](bool) { stream->MaybeDone(/*inlineable_ondone=*/false); },
697         stream);
698 
699     ServerBidiReactor<RequestType, ResponseType>* reactor = nullptr;
700     if (param.status.ok()) {
701       reactor = grpc::internal::CatchingReactorGetter<
702           ServerBidiReactor<RequestType, ResponseType>>(
703           get_reactor_,
704           static_cast<grpc::CallbackServerContext*>(param.server_context));
705     }
706 
707     if (reactor == nullptr) {
708       // if deserialization or reactor creator failed, we need to fail the call
709       reactor = new (grpc_call_arena_alloc(
710           param.call->call(),
711           sizeof(UnimplementedBidiReactor<RequestType, ResponseType>)))
712           UnimplementedBidiReactor<RequestType, ResponseType>(
713               grpc::Status(grpc::StatusCode::UNIMPLEMENTED, ""));
714     }
715 
716     stream->SetupReactor(reactor);
717   }
718 
719  private:
720   std::function<ServerBidiReactor<RequestType, ResponseType>*(
721       grpc::CallbackServerContext*)>
722       get_reactor_;
723 
724   class ServerCallbackReaderWriterImpl
725       : public ServerCallbackReaderWriter<RequestType, ResponseType> {
726    public:
Finish(grpc::Status s)727     void Finish(grpc::Status s) override {
728       // A finish tag with only MaybeDone can have its callback inlined
729       // regardless even if OnDone is not inlineable because this callback just
730       // checks a ref and then decides whether or not to dispatch OnDone.
731       finish_tag_.Set(
732           call_.call(),
733           [this](bool) {
734             // Inlineable OnDone can be false here because there is
735             // no bidi reactor that has an inlineable OnDone; this
736             // only applies to the DefaultReactor (which is unary).
737             this->MaybeDone(/*inlineable_ondone=*/false);
738           },
739           &finish_ops_, /*can_inline=*/true);
740       finish_ops_.set_core_cq_tag(&finish_tag_);
741 
742       if (!ctx_->sent_initial_metadata_) {
743         finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
744                                         ctx_->initial_metadata_flags());
745         if (ctx_->compression_level_set()) {
746           finish_ops_.set_compression_level(ctx_->compression_level());
747         }
748         ctx_->sent_initial_metadata_ = true;
749       }
750       finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
751       call_.PerformOps(&finish_ops_);
752     }
753 
SendInitialMetadata()754     void SendInitialMetadata() override {
755       ABSL_CHECK(!ctx_->sent_initial_metadata_);
756       this->Ref();
757       // The callback for this function should not be inlined because it invokes
758       // a user-controlled reaction, but any resulting OnDone can be inlined in
759       // the executor to which this callback is dispatched.
760       meta_tag_.Set(
761           call_.call(),
762           [this](bool ok) {
763             ServerBidiReactor<RequestType, ResponseType>* reactor =
764                 reactor_.load(std::memory_order_relaxed);
765             reactor->OnSendInitialMetadataDone(ok);
766             this->MaybeDone(/*inlineable_ondone=*/true);
767           },
768           &meta_ops_, /*can_inline=*/false);
769       meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
770                                     ctx_->initial_metadata_flags());
771       if (ctx_->compression_level_set()) {
772         meta_ops_.set_compression_level(ctx_->compression_level());
773       }
774       ctx_->sent_initial_metadata_ = true;
775       meta_ops_.set_core_cq_tag(&meta_tag_);
776       call_.PerformOps(&meta_ops_);
777     }
778 
Write(const ResponseType * resp,grpc::WriteOptions options)779     void Write(const ResponseType* resp, grpc::WriteOptions options) override {
780       this->Ref();
781       if (options.is_last_message()) {
782         options.set_buffer_hint();
783       }
784       if (!ctx_->sent_initial_metadata_) {
785         write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
786                                        ctx_->initial_metadata_flags());
787         if (ctx_->compression_level_set()) {
788           write_ops_.set_compression_level(ctx_->compression_level());
789         }
790         ctx_->sent_initial_metadata_ = true;
791       }
792       // TODO(vjpai): don't assert
793       ABSL_CHECK(write_ops_.SendMessagePtr(resp, options).ok());
794       call_.PerformOps(&write_ops_);
795     }
796 
WriteAndFinish(const ResponseType * resp,grpc::WriteOptions options,grpc::Status s)797     void WriteAndFinish(const ResponseType* resp, grpc::WriteOptions options,
798                         grpc::Status s) override {
799       // TODO(vjpai): don't assert
800       ABSL_CHECK(finish_ops_.SendMessagePtr(resp, options).ok());
801       Finish(std::move(s));
802     }
803 
Read(RequestType * req)804     void Read(RequestType* req) override {
805       this->Ref();
806       read_ops_.RecvMessage(req);
807       call_.PerformOps(&read_ops_);
808     }
809 
810    private:
811     friend class CallbackBidiHandler<RequestType, ResponseType>;
812 
ServerCallbackReaderWriterImpl(grpc::CallbackServerContext * ctx,grpc::internal::Call * call,std::function<void ()> call_requester)813     ServerCallbackReaderWriterImpl(grpc::CallbackServerContext* ctx,
814                                    grpc::internal::Call* call,
815                                    std::function<void()> call_requester)
816         : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
817 
call()818     grpc_call* call() override { return call_.call(); }
819 
SetupReactor(ServerBidiReactor<RequestType,ResponseType> * reactor)820     void SetupReactor(ServerBidiReactor<RequestType, ResponseType>* reactor) {
821       reactor_.store(reactor, std::memory_order_relaxed);
822       // The callbacks for these functions should not be inlined because they
823       // invoke user-controlled reactions, but any resulting OnDones can be
824       // inlined in the executor to which a callback is dispatched.
825       write_tag_.Set(
826           call_.call(),
827           [this, reactor](bool ok) {
828             reactor->OnWriteDone(ok);
829             this->MaybeDone(/*inlineable_ondone=*/true);
830           },
831           &write_ops_, /*can_inline=*/false);
832       write_ops_.set_core_cq_tag(&write_tag_);
833       read_tag_.Set(
834           call_.call(),
835           [this, reactor](bool ok) {
836             if (GPR_UNLIKELY(!ok)) {
837               ctx_->MaybeMarkCancelledOnRead();
838             }
839             reactor->OnReadDone(ok);
840             this->MaybeDone(/*inlineable_ondone=*/true);
841           },
842           &read_ops_, /*can_inline=*/false);
843       read_ops_.set_core_cq_tag(&read_tag_);
844       this->BindReactor(reactor);
845       this->MaybeCallOnCancel(reactor);
846       // Inlineable OnDone can be false here because there is no bidi
847       // reactor that has an inlineable OnDone; this only applies to the
848       // DefaultReactor (which is unary).
849       this->MaybeDone(/*inlineable_ondone=*/false);
850     }
851 
CallOnDone()852     void CallOnDone() override {
853       reactor_.load(std::memory_order_relaxed)->OnDone();
854       grpc_call* call = call_.call();
855       auto call_requester = std::move(call_requester_);
856       if (ctx_->context_allocator() != nullptr) {
857         ctx_->context_allocator()->Release(ctx_);
858       }
859       this->~ServerCallbackReaderWriterImpl();  // explicitly call destructor
860       grpc_call_unref(call);
861       call_requester();
862     }
863 
reactor()864     ServerReactor* reactor() override {
865       return reactor_.load(std::memory_order_relaxed);
866     }
867 
868     grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata>
869         meta_ops_;
870     grpc::internal::CallbackWithSuccessTag meta_tag_;
871     grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
872                               grpc::internal::CallOpSendMessage,
873                               grpc::internal::CallOpServerSendStatus>
874         finish_ops_;
875     grpc::internal::CallbackWithSuccessTag finish_tag_;
876     grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
877                               grpc::internal::CallOpSendMessage>
878         write_ops_;
879     grpc::internal::CallbackWithSuccessTag write_tag_;
880     grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<RequestType>>
881         read_ops_;
882     grpc::internal::CallbackWithSuccessTag read_tag_;
883 
884     grpc::CallbackServerContext* const ctx_;
885     grpc::internal::Call call_;
886     std::function<void()> call_requester_;
887     // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
888     std::atomic<ServerBidiReactor<RequestType, ResponseType>*> reactor_;
889     // callbacks_outstanding_ follows a refcount pattern
890     std::atomic<intptr_t> callbacks_outstanding_{
891         3};  // reserve for OnStarted, Finish, and CompletionOp
892   };
893 };
894 
895 }  // namespace internal
896 }  // namespace grpc
897 
898 #endif  // GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H
899