• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2019 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 
18 #ifndef GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H
19 #define GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H
20 
21 #include <grpc/grpc.h>
22 #include <grpc/support/log.h>
23 #include <grpcpp/impl/rpc_service_method.h>
24 #include <grpcpp/server_context.h>
25 #include <grpcpp/support/message_allocator.h>
26 #include <grpcpp/support/server_callback.h>
27 #include <grpcpp/support/status.h>
28 
29 namespace grpc {
30 namespace internal {
31 
32 template <class RequestType, class ResponseType>
33 class CallbackUnaryHandler : public grpc::internal::MethodHandler {
34  public:
CallbackUnaryHandler(std::function<ServerUnaryReactor * (grpc::CallbackServerContext *,const RequestType *,ResponseType *)> get_reactor)35   explicit CallbackUnaryHandler(
36       std::function<ServerUnaryReactor*(grpc::CallbackServerContext*,
37                                         const RequestType*, ResponseType*)>
38           get_reactor)
39       : get_reactor_(std::move(get_reactor)) {}
40 
SetMessageAllocator(MessageAllocator<RequestType,ResponseType> * allocator)41   void SetMessageAllocator(
42       MessageAllocator<RequestType, ResponseType>* allocator) {
43     allocator_ = allocator;
44   }
45 
RunHandler(const HandlerParameter & param)46   void RunHandler(const HandlerParameter& param) final {
47     // Arena allocate a controller structure (that includes request/response)
48     grpc_call_ref(param.call->call());
49     auto* allocator_state =
50         static_cast<MessageHolder<RequestType, ResponseType>*>(
51             param.internal_data);
52 
53     auto* call = new (grpc_call_arena_alloc(param.call->call(),
54                                             sizeof(ServerCallbackUnaryImpl)))
55         ServerCallbackUnaryImpl(
56             static_cast<grpc::CallbackServerContext*>(param.server_context),
57             param.call, allocator_state, param.call_requester);
58     param.server_context->BeginCompletionOp(
59         param.call, [call](bool) { call->MaybeDone(); }, call);
60 
61     ServerUnaryReactor* reactor = nullptr;
62     if (param.status.ok()) {
63       reactor = grpc::internal::CatchingReactorGetter<ServerUnaryReactor>(
64           get_reactor_,
65           static_cast<grpc::CallbackServerContext*>(param.server_context),
66           call->request(), call->response());
67     }
68 
69     if (reactor == nullptr) {
70       // if deserialization or reactor creator failed, we need to fail the call
71       reactor = new (grpc_call_arena_alloc(param.call->call(),
72                                            sizeof(UnimplementedUnaryReactor)))
73           UnimplementedUnaryReactor(
74               grpc::Status(grpc::StatusCode::UNIMPLEMENTED, ""));
75     }
76 
77     /// Invoke SetupReactor as the last part of the handler
78     call->SetupReactor(reactor);
79   }
80 
Deserialize(grpc_call * call,grpc_byte_buffer * req,grpc::Status * status,void ** handler_data)81   void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
82                     grpc::Status* status, void** handler_data) final {
83     grpc::ByteBuffer buf;
84     buf.set_buffer(req);
85     RequestType* request = nullptr;
86     MessageHolder<RequestType, ResponseType>* allocator_state;
87     if (allocator_ != nullptr) {
88       allocator_state = allocator_->AllocateMessages();
89     } else {
90       allocator_state = new (grpc_call_arena_alloc(
91           call, sizeof(DefaultMessageHolder<RequestType, ResponseType>)))
92           DefaultMessageHolder<RequestType, ResponseType>();
93     }
94     *handler_data = allocator_state;
95     request = allocator_state->request();
96     *status =
97         grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
98     buf.Release();
99     if (status->ok()) {
100       return request;
101     }
102     return nullptr;
103   }
104 
105  private:
106   std::function<ServerUnaryReactor*(grpc::CallbackServerContext*,
107                                     const RequestType*, ResponseType*)>
108       get_reactor_;
109   MessageAllocator<RequestType, ResponseType>* allocator_ = nullptr;
110 
111   class ServerCallbackUnaryImpl : public ServerCallbackUnary {
112    public:
Finish(grpc::Status s)113     void Finish(grpc::Status s) override {
114       // A callback that only contains a call to MaybeDone can be run as an
115       // inline callback regardless of whether or not OnDone is inlineable
116       // because if the actual OnDone callback needs to be scheduled, MaybeDone
117       // is responsible for dispatching to an executor thread if needed. Thus,
118       // when setting up the finish_tag_, we can set its own callback to
119       // inlineable.
120       finish_tag_.Set(
121           call_.call(),
122           [this](bool) {
123             this->MaybeDone(
124                 reactor_.load(std::memory_order_relaxed)->InternalInlineable());
125           },
126           &finish_ops_, /*can_inline=*/true);
127       finish_ops_.set_core_cq_tag(&finish_tag_);
128 
129       if (!ctx_->sent_initial_metadata_) {
130         finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
131                                         ctx_->initial_metadata_flags());
132         if (ctx_->compression_level_set()) {
133           finish_ops_.set_compression_level(ctx_->compression_level());
134         }
135         ctx_->sent_initial_metadata_ = true;
136       }
137       // The response is dropped if the status is not OK.
138       if (s.ok()) {
139         finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
140                                      finish_ops_.SendMessagePtr(response()));
141       } else {
142         finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
143       }
144       finish_ops_.set_core_cq_tag(&finish_tag_);
145       call_.PerformOps(&finish_ops_);
146     }
147 
SendInitialMetadata()148     void SendInitialMetadata() override {
149       GPR_ASSERT(!ctx_->sent_initial_metadata_);
150       this->Ref();
151       // The callback for this function should not be marked inline because it
152       // is directly invoking a user-controlled reaction
153       // (OnSendInitialMetadataDone). Thus it must be dispatched to an executor
154       // thread. However, any OnDone needed after that can be inlined because it
155       // is already running on an executor thread.
156       meta_tag_.Set(
157           call_.call(),
158           [this](bool ok) {
159             ServerUnaryReactor* reactor =
160                 reactor_.load(std::memory_order_relaxed);
161             reactor->OnSendInitialMetadataDone(ok);
162             this->MaybeDone(/*inlineable_ondone=*/true);
163           },
164           &meta_ops_, /*can_inline=*/false);
165       meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
166                                     ctx_->initial_metadata_flags());
167       if (ctx_->compression_level_set()) {
168         meta_ops_.set_compression_level(ctx_->compression_level());
169       }
170       ctx_->sent_initial_metadata_ = true;
171       meta_ops_.set_core_cq_tag(&meta_tag_);
172       call_.PerformOps(&meta_ops_);
173     }
174 
175    private:
176     friend class CallbackUnaryHandler<RequestType, ResponseType>;
177 
ServerCallbackUnaryImpl(grpc::CallbackServerContext * ctx,grpc::internal::Call * call,MessageHolder<RequestType,ResponseType> * allocator_state,std::function<void ()> call_requester)178     ServerCallbackUnaryImpl(
179         grpc::CallbackServerContext* ctx, grpc::internal::Call* call,
180         MessageHolder<RequestType, ResponseType>* allocator_state,
181         std::function<void()> call_requester)
182         : ctx_(ctx),
183           call_(*call),
184           allocator_state_(allocator_state),
185           call_requester_(std::move(call_requester)) {
186       ctx_->set_message_allocator_state(allocator_state);
187     }
188 
189     /// SetupReactor binds the reactor (which also releases any queued
190     /// operations), maybe calls OnCancel if possible/needed, and maybe marks
191     /// the completion of the RPC. This should be the last component of the
192     /// handler.
SetupReactor(ServerUnaryReactor * reactor)193     void SetupReactor(ServerUnaryReactor* reactor) {
194       reactor_.store(reactor, std::memory_order_relaxed);
195       this->BindReactor(reactor);
196       this->MaybeCallOnCancel(reactor);
197       this->MaybeDone(reactor->InternalInlineable());
198     }
199 
request()200     const RequestType* request() { return allocator_state_->request(); }
response()201     ResponseType* response() { return allocator_state_->response(); }
202 
CallOnDone()203     void CallOnDone() override {
204       reactor_.load(std::memory_order_relaxed)->OnDone();
205       grpc_call* call = call_.call();
206       auto call_requester = std::move(call_requester_);
207       allocator_state_->Release();
208       if (ctx_->context_allocator() != nullptr) {
209         ctx_->context_allocator()->Release(ctx_);
210       }
211       this->~ServerCallbackUnaryImpl();  // explicitly call destructor
212       grpc_call_unref(call);
213       call_requester();
214     }
215 
reactor()216     ServerReactor* reactor() override {
217       return reactor_.load(std::memory_order_relaxed);
218     }
219 
220     grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata>
221         meta_ops_;
222     grpc::internal::CallbackWithSuccessTag meta_tag_;
223     grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
224                               grpc::internal::CallOpSendMessage,
225                               grpc::internal::CallOpServerSendStatus>
226         finish_ops_;
227     grpc::internal::CallbackWithSuccessTag finish_tag_;
228 
229     grpc::CallbackServerContext* const ctx_;
230     grpc::internal::Call call_;
231     MessageHolder<RequestType, ResponseType>* const allocator_state_;
232     std::function<void()> call_requester_;
233     // reactor_ can always be loaded/stored with relaxed memory ordering because
234     // its value is only set once, independently of other data in the object,
235     // and the loads that use it will always actually come provably later even
236     // though they are from different threads since they are triggered by
237     // actions initiated only by the setting up of the reactor_ variable. In
238     // a sense, it's a delayed "const": it gets its value from the SetupReactor
239     // method (not the constructor, so it's not a true const), but it doesn't
240     // change after that and it only gets used by actions caused, directly or
241     // indirectly, by that setup. This comment also applies to the reactor_
242     // variables of the other streaming objects in this file.
243     std::atomic<ServerUnaryReactor*> reactor_;
244     // callbacks_outstanding_ follows a refcount pattern
245     std::atomic<intptr_t> callbacks_outstanding_{
246         3};  // reserve for start, Finish, and CompletionOp
247   };
248 };
249 
250 template <class RequestType, class ResponseType>
251 class CallbackClientStreamingHandler : public grpc::internal::MethodHandler {
252  public:
CallbackClientStreamingHandler(std::function<ServerReadReactor<RequestType> * (grpc::CallbackServerContext *,ResponseType *)> get_reactor)253   explicit CallbackClientStreamingHandler(
254       std::function<ServerReadReactor<RequestType>*(
255           grpc::CallbackServerContext*, ResponseType*)>
256           get_reactor)
257       : get_reactor_(std::move(get_reactor)) {}
RunHandler(const HandlerParameter & param)258   void RunHandler(const HandlerParameter& param) final {
259     // Arena allocate a reader structure (that includes response)
260     grpc_call_ref(param.call->call());
261 
262     auto* reader = new (grpc_call_arena_alloc(param.call->call(),
263                                               sizeof(ServerCallbackReaderImpl)))
264         ServerCallbackReaderImpl(
265             static_cast<grpc::CallbackServerContext*>(param.server_context),
266             param.call, param.call_requester);
267     // Inlineable OnDone can be false in the CompletionOp callback because there
268     // is no read reactor that has an inlineable OnDone; this only applies to
269     // the DefaultReactor (which is unary).
270     param.server_context->BeginCompletionOp(
271         param.call,
272         [reader](bool) { reader->MaybeDone(/*inlineable_ondone=*/false); },
273         reader);
274 
275     ServerReadReactor<RequestType>* reactor = nullptr;
276     if (param.status.ok()) {
277       reactor =
278           grpc::internal::CatchingReactorGetter<ServerReadReactor<RequestType>>(
279               get_reactor_,
280               static_cast<grpc::CallbackServerContext*>(param.server_context),
281               reader->response());
282     }
283 
284     if (reactor == nullptr) {
285       // if deserialization or reactor creator failed, we need to fail the call
286       reactor = new (grpc_call_arena_alloc(
287           param.call->call(), sizeof(UnimplementedReadReactor<RequestType>)))
288           UnimplementedReadReactor<RequestType>(
289               grpc::Status(grpc::StatusCode::UNIMPLEMENTED, ""));
290     }
291 
292     reader->SetupReactor(reactor);
293   }
294 
295  private:
296   std::function<ServerReadReactor<RequestType>*(grpc::CallbackServerContext*,
297                                                 ResponseType*)>
298       get_reactor_;
299 
300   class ServerCallbackReaderImpl : public ServerCallbackReader<RequestType> {
301    public:
Finish(grpc::Status s)302     void Finish(grpc::Status s) override {
303       // A finish tag with only MaybeDone can have its callback inlined
304       // regardless even if OnDone is not inlineable because this callback just
305       // checks a ref and then decides whether or not to dispatch OnDone.
306       finish_tag_.Set(
307           call_.call(),
308           [this](bool) {
309             // Inlineable OnDone can be false here because there is
310             // no read reactor that has an inlineable OnDone; this
311             // only applies to the DefaultReactor (which is unary).
312             this->MaybeDone(/*inlineable_ondone=*/false);
313           },
314           &finish_ops_, /*can_inline=*/true);
315       if (!ctx_->sent_initial_metadata_) {
316         finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
317                                         ctx_->initial_metadata_flags());
318         if (ctx_->compression_level_set()) {
319           finish_ops_.set_compression_level(ctx_->compression_level());
320         }
321         ctx_->sent_initial_metadata_ = true;
322       }
323       // The response is dropped if the status is not OK.
324       if (s.ok()) {
325         finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
326                                      finish_ops_.SendMessagePtr(&resp_));
327       } else {
328         finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
329       }
330       finish_ops_.set_core_cq_tag(&finish_tag_);
331       call_.PerformOps(&finish_ops_);
332     }
333 
SendInitialMetadata()334     void SendInitialMetadata() override {
335       GPR_ASSERT(!ctx_->sent_initial_metadata_);
336       this->Ref();
337       // The callback for this function should not be inlined because it invokes
338       // a user-controlled reaction, but any resulting OnDone can be inlined in
339       // the executor to which this callback is dispatched.
340       meta_tag_.Set(
341           call_.call(),
342           [this](bool ok) {
343             ServerReadReactor<RequestType>* reactor =
344                 reactor_.load(std::memory_order_relaxed);
345             reactor->OnSendInitialMetadataDone(ok);
346             this->MaybeDone(/*inlineable_ondone=*/true);
347           },
348           &meta_ops_, /*can_inline=*/false);
349       meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
350                                     ctx_->initial_metadata_flags());
351       if (ctx_->compression_level_set()) {
352         meta_ops_.set_compression_level(ctx_->compression_level());
353       }
354       ctx_->sent_initial_metadata_ = true;
355       meta_ops_.set_core_cq_tag(&meta_tag_);
356       call_.PerformOps(&meta_ops_);
357     }
358 
Read(RequestType * req)359     void Read(RequestType* req) override {
360       this->Ref();
361       read_ops_.RecvMessage(req);
362       call_.PerformOps(&read_ops_);
363     }
364 
365    private:
366     friend class CallbackClientStreamingHandler<RequestType, ResponseType>;
367 
ServerCallbackReaderImpl(grpc::CallbackServerContext * ctx,grpc::internal::Call * call,std::function<void ()> call_requester)368     ServerCallbackReaderImpl(grpc::CallbackServerContext* ctx,
369                              grpc::internal::Call* call,
370                              std::function<void()> call_requester)
371         : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
372 
SetupReactor(ServerReadReactor<RequestType> * reactor)373     void SetupReactor(ServerReadReactor<RequestType>* reactor) {
374       reactor_.store(reactor, std::memory_order_relaxed);
375       // The callback for this function should not be inlined because it invokes
376       // a user-controlled reaction, but any resulting OnDone can be inlined in
377       // the executor to which this callback is dispatched.
378       read_tag_.Set(
379           call_.call(),
380           [this, reactor](bool ok) {
381             if (GPR_UNLIKELY(!ok)) {
382               ctx_->MaybeMarkCancelledOnRead();
383             }
384             reactor->OnReadDone(ok);
385             this->MaybeDone(/*inlineable_ondone=*/true);
386           },
387           &read_ops_, /*can_inline=*/false);
388       read_ops_.set_core_cq_tag(&read_tag_);
389       this->BindReactor(reactor);
390       this->MaybeCallOnCancel(reactor);
391       // Inlineable OnDone can be false here because there is no read
392       // reactor that has an inlineable OnDone; this only applies to the
393       // DefaultReactor (which is unary).
394       this->MaybeDone(/*inlineable_ondone=*/false);
395     }
396 
~ServerCallbackReaderImpl()397     ~ServerCallbackReaderImpl() {}
398 
response()399     ResponseType* response() { return &resp_; }
400 
CallOnDone()401     void CallOnDone() override {
402       reactor_.load(std::memory_order_relaxed)->OnDone();
403       grpc_call* call = call_.call();
404       auto call_requester = std::move(call_requester_);
405       if (ctx_->context_allocator() != nullptr) {
406         ctx_->context_allocator()->Release(ctx_);
407       }
408       this->~ServerCallbackReaderImpl();  // explicitly call destructor
409       grpc_call_unref(call);
410       call_requester();
411     }
412 
reactor()413     ServerReactor* reactor() override {
414       return reactor_.load(std::memory_order_relaxed);
415     }
416 
417     grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata>
418         meta_ops_;
419     grpc::internal::CallbackWithSuccessTag meta_tag_;
420     grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
421                               grpc::internal::CallOpSendMessage,
422                               grpc::internal::CallOpServerSendStatus>
423         finish_ops_;
424     grpc::internal::CallbackWithSuccessTag finish_tag_;
425     grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<RequestType>>
426         read_ops_;
427     grpc::internal::CallbackWithSuccessTag read_tag_;
428 
429     grpc::CallbackServerContext* const ctx_;
430     grpc::internal::Call call_;
431     ResponseType resp_;
432     std::function<void()> call_requester_;
433     // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
434     std::atomic<ServerReadReactor<RequestType>*> reactor_;
435     // callbacks_outstanding_ follows a refcount pattern
436     std::atomic<intptr_t> callbacks_outstanding_{
437         3};  // reserve for OnStarted, Finish, and CompletionOp
438   };
439 };
440 
441 template <class RequestType, class ResponseType>
442 class CallbackServerStreamingHandler : public grpc::internal::MethodHandler {
443  public:
CallbackServerStreamingHandler(std::function<ServerWriteReactor<ResponseType> * (grpc::CallbackServerContext *,const RequestType *)> get_reactor)444   explicit CallbackServerStreamingHandler(
445       std::function<ServerWriteReactor<ResponseType>*(
446           grpc::CallbackServerContext*, const RequestType*)>
447           get_reactor)
448       : get_reactor_(std::move(get_reactor)) {}
RunHandler(const HandlerParameter & param)449   void RunHandler(const HandlerParameter& param) final {
450     // Arena allocate a writer structure
451     grpc_call_ref(param.call->call());
452 
453     auto* writer = new (grpc_call_arena_alloc(param.call->call(),
454                                               sizeof(ServerCallbackWriterImpl)))
455         ServerCallbackWriterImpl(
456             static_cast<grpc::CallbackServerContext*>(param.server_context),
457             param.call, static_cast<RequestType*>(param.request),
458             param.call_requester);
459     // Inlineable OnDone can be false in the CompletionOp callback because there
460     // is no write reactor that has an inlineable OnDone; this only applies to
461     // the DefaultReactor (which is unary).
462     param.server_context->BeginCompletionOp(
463         param.call,
464         [writer](bool) { writer->MaybeDone(/*inlineable_ondone=*/false); },
465         writer);
466 
467     ServerWriteReactor<ResponseType>* reactor = nullptr;
468     if (param.status.ok()) {
469       reactor = grpc::internal::CatchingReactorGetter<
470           ServerWriteReactor<ResponseType>>(
471           get_reactor_,
472           static_cast<grpc::CallbackServerContext*>(param.server_context),
473           writer->request());
474     }
475     if (reactor == nullptr) {
476       // if deserialization or reactor creator failed, we need to fail the call
477       reactor = new (grpc_call_arena_alloc(
478           param.call->call(), sizeof(UnimplementedWriteReactor<ResponseType>)))
479           UnimplementedWriteReactor<ResponseType>(
480               grpc::Status(grpc::StatusCode::UNIMPLEMENTED, ""));
481     }
482 
483     writer->SetupReactor(reactor);
484   }
485 
Deserialize(grpc_call * call,grpc_byte_buffer * req,grpc::Status * status,void **)486   void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
487                     grpc::Status* status, void** /*handler_data*/) final {
488     grpc::ByteBuffer buf;
489     buf.set_buffer(req);
490     auto* request =
491         new (grpc_call_arena_alloc(call, sizeof(RequestType))) RequestType();
492     *status =
493         grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
494     buf.Release();
495     if (status->ok()) {
496       return request;
497     }
498     request->~RequestType();
499     return nullptr;
500   }
501 
502  private:
503   std::function<ServerWriteReactor<ResponseType>*(grpc::CallbackServerContext*,
504                                                   const RequestType*)>
505       get_reactor_;
506 
507   class ServerCallbackWriterImpl : public ServerCallbackWriter<ResponseType> {
508    public:
Finish(grpc::Status s)509     void Finish(grpc::Status s) override {
510       // A finish tag with only MaybeDone can have its callback inlined
511       // regardless even if OnDone is not inlineable because this callback just
512       // checks a ref and then decides whether or not to dispatch OnDone.
513       finish_tag_.Set(
514           call_.call(),
515           [this](bool) {
516             // Inlineable OnDone can be false here because there is
517             // no write reactor that has an inlineable OnDone; this
518             // only applies to the DefaultReactor (which is unary).
519             this->MaybeDone(/*inlineable_ondone=*/false);
520           },
521           &finish_ops_, /*can_inline=*/true);
522       finish_ops_.set_core_cq_tag(&finish_tag_);
523 
524       if (!ctx_->sent_initial_metadata_) {
525         finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
526                                         ctx_->initial_metadata_flags());
527         if (ctx_->compression_level_set()) {
528           finish_ops_.set_compression_level(ctx_->compression_level());
529         }
530         ctx_->sent_initial_metadata_ = true;
531       }
532       finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
533       call_.PerformOps(&finish_ops_);
534     }
535 
SendInitialMetadata()536     void SendInitialMetadata() override {
537       GPR_ASSERT(!ctx_->sent_initial_metadata_);
538       this->Ref();
539       // The callback for this function should not be inlined because it invokes
540       // a user-controlled reaction, but any resulting OnDone can be inlined in
541       // the executor to which this callback is dispatched.
542       meta_tag_.Set(
543           call_.call(),
544           [this](bool ok) {
545             ServerWriteReactor<ResponseType>* reactor =
546                 reactor_.load(std::memory_order_relaxed);
547             reactor->OnSendInitialMetadataDone(ok);
548             this->MaybeDone(/*inlineable_ondone=*/true);
549           },
550           &meta_ops_, /*can_inline=*/false);
551       meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
552                                     ctx_->initial_metadata_flags());
553       if (ctx_->compression_level_set()) {
554         meta_ops_.set_compression_level(ctx_->compression_level());
555       }
556       ctx_->sent_initial_metadata_ = true;
557       meta_ops_.set_core_cq_tag(&meta_tag_);
558       call_.PerformOps(&meta_ops_);
559     }
560 
Write(const ResponseType * resp,grpc::WriteOptions options)561     void Write(const ResponseType* resp, grpc::WriteOptions options) override {
562       this->Ref();
563       if (options.is_last_message()) {
564         options.set_buffer_hint();
565       }
566       if (!ctx_->sent_initial_metadata_) {
567         write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
568                                        ctx_->initial_metadata_flags());
569         if (ctx_->compression_level_set()) {
570           write_ops_.set_compression_level(ctx_->compression_level());
571         }
572         ctx_->sent_initial_metadata_ = true;
573       }
574       // TODO(vjpai): don't assert
575       GPR_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
576       call_.PerformOps(&write_ops_);
577     }
578 
WriteAndFinish(const ResponseType * resp,grpc::WriteOptions options,grpc::Status s)579     void WriteAndFinish(const ResponseType* resp, grpc::WriteOptions options,
580                         grpc::Status s) override {
581       // This combines the write into the finish callback
582       // TODO(vjpai): don't assert
583       GPR_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
584       Finish(std::move(s));
585     }
586 
587    private:
588     friend class CallbackServerStreamingHandler<RequestType, ResponseType>;
589 
ServerCallbackWriterImpl(grpc::CallbackServerContext * ctx,grpc::internal::Call * call,const RequestType * req,std::function<void ()> call_requester)590     ServerCallbackWriterImpl(grpc::CallbackServerContext* ctx,
591                              grpc::internal::Call* call, const RequestType* req,
592                              std::function<void()> call_requester)
593         : ctx_(ctx),
594           call_(*call),
595           req_(req),
596           call_requester_(std::move(call_requester)) {}
597 
SetupReactor(ServerWriteReactor<ResponseType> * reactor)598     void SetupReactor(ServerWriteReactor<ResponseType>* reactor) {
599       reactor_.store(reactor, std::memory_order_relaxed);
600       // The callback for this function should not be inlined because it invokes
601       // a user-controlled reaction, but any resulting OnDone can be inlined in
602       // the executor to which this callback is dispatched.
603       write_tag_.Set(
604           call_.call(),
605           [this, reactor](bool ok) {
606             reactor->OnWriteDone(ok);
607             this->MaybeDone(/*inlineable_ondone=*/true);
608           },
609           &write_ops_, /*can_inline=*/false);
610       write_ops_.set_core_cq_tag(&write_tag_);
611       this->BindReactor(reactor);
612       this->MaybeCallOnCancel(reactor);
613       // Inlineable OnDone can be false here because there is no write
614       // reactor that has an inlineable OnDone; this only applies to the
615       // DefaultReactor (which is unary).
616       this->MaybeDone(/*inlineable_ondone=*/false);
617     }
~ServerCallbackWriterImpl()618     ~ServerCallbackWriterImpl() {
619       if (req_ != nullptr) {
620         req_->~RequestType();
621       }
622     }
623 
request()624     const RequestType* request() { return req_; }
625 
CallOnDone()626     void CallOnDone() override {
627       reactor_.load(std::memory_order_relaxed)->OnDone();
628       grpc_call* call = call_.call();
629       auto call_requester = std::move(call_requester_);
630       if (ctx_->context_allocator() != nullptr) {
631         ctx_->context_allocator()->Release(ctx_);
632       }
633       this->~ServerCallbackWriterImpl();  // explicitly call destructor
634       grpc_call_unref(call);
635       call_requester();
636     }
637 
reactor()638     ServerReactor* reactor() override {
639       return reactor_.load(std::memory_order_relaxed);
640     }
641 
642     grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata>
643         meta_ops_;
644     grpc::internal::CallbackWithSuccessTag meta_tag_;
645     grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
646                               grpc::internal::CallOpSendMessage,
647                               grpc::internal::CallOpServerSendStatus>
648         finish_ops_;
649     grpc::internal::CallbackWithSuccessTag finish_tag_;
650     grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
651                               grpc::internal::CallOpSendMessage>
652         write_ops_;
653     grpc::internal::CallbackWithSuccessTag write_tag_;
654 
655     grpc::CallbackServerContext* const ctx_;
656     grpc::internal::Call call_;
657     const RequestType* req_;
658     std::function<void()> call_requester_;
659     // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
660     std::atomic<ServerWriteReactor<ResponseType>*> reactor_;
661     // callbacks_outstanding_ follows a refcount pattern
662     std::atomic<intptr_t> callbacks_outstanding_{
663         3};  // reserve for OnStarted, Finish, and CompletionOp
664   };
665 };
666 
667 template <class RequestType, class ResponseType>
668 class CallbackBidiHandler : public grpc::internal::MethodHandler {
669  public:
CallbackBidiHandler(std::function<ServerBidiReactor<RequestType,ResponseType> * (grpc::CallbackServerContext *)> get_reactor)670   explicit CallbackBidiHandler(
671       std::function<ServerBidiReactor<RequestType, ResponseType>*(
672           grpc::CallbackServerContext*)>
673           get_reactor)
674       : get_reactor_(std::move(get_reactor)) {}
RunHandler(const HandlerParameter & param)675   void RunHandler(const HandlerParameter& param) final {
676     grpc_call_ref(param.call->call());
677 
678     auto* stream = new (grpc_call_arena_alloc(
679         param.call->call(), sizeof(ServerCallbackReaderWriterImpl)))
680         ServerCallbackReaderWriterImpl(
681             static_cast<grpc::CallbackServerContext*>(param.server_context),
682             param.call, param.call_requester);
683     // Inlineable OnDone can be false in the CompletionOp callback because there
684     // is no bidi reactor that has an inlineable OnDone; this only applies to
685     // the DefaultReactor (which is unary).
686     param.server_context->BeginCompletionOp(
687         param.call,
688         [stream](bool) { stream->MaybeDone(/*inlineable_ondone=*/false); },
689         stream);
690 
691     ServerBidiReactor<RequestType, ResponseType>* reactor = nullptr;
692     if (param.status.ok()) {
693       reactor = grpc::internal::CatchingReactorGetter<
694           ServerBidiReactor<RequestType, ResponseType>>(
695           get_reactor_,
696           static_cast<grpc::CallbackServerContext*>(param.server_context));
697     }
698 
699     if (reactor == nullptr) {
700       // if deserialization or reactor creator failed, we need to fail the call
701       reactor = new (grpc_call_arena_alloc(
702           param.call->call(),
703           sizeof(UnimplementedBidiReactor<RequestType, ResponseType>)))
704           UnimplementedBidiReactor<RequestType, ResponseType>(
705               grpc::Status(grpc::StatusCode::UNIMPLEMENTED, ""));
706     }
707 
708     stream->SetupReactor(reactor);
709   }
710 
711  private:
712   std::function<ServerBidiReactor<RequestType, ResponseType>*(
713       grpc::CallbackServerContext*)>
714       get_reactor_;
715 
716   class ServerCallbackReaderWriterImpl
717       : public ServerCallbackReaderWriter<RequestType, ResponseType> {
718    public:
Finish(grpc::Status s)719     void Finish(grpc::Status s) override {
720       // A finish tag with only MaybeDone can have its callback inlined
721       // regardless even if OnDone is not inlineable because this callback just
722       // checks a ref and then decides whether or not to dispatch OnDone.
723       finish_tag_.Set(
724           call_.call(),
725           [this](bool) {
726             // Inlineable OnDone can be false here because there is
727             // no bidi reactor that has an inlineable OnDone; this
728             // only applies to the DefaultReactor (which is unary).
729             this->MaybeDone(/*inlineable_ondone=*/false);
730           },
731           &finish_ops_, /*can_inline=*/true);
732       finish_ops_.set_core_cq_tag(&finish_tag_);
733 
734       if (!ctx_->sent_initial_metadata_) {
735         finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
736                                         ctx_->initial_metadata_flags());
737         if (ctx_->compression_level_set()) {
738           finish_ops_.set_compression_level(ctx_->compression_level());
739         }
740         ctx_->sent_initial_metadata_ = true;
741       }
742       finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
743       call_.PerformOps(&finish_ops_);
744     }
745 
SendInitialMetadata()746     void SendInitialMetadata() override {
747       GPR_ASSERT(!ctx_->sent_initial_metadata_);
748       this->Ref();
749       // The callback for this function should not be inlined because it invokes
750       // a user-controlled reaction, but any resulting OnDone can be inlined in
751       // the executor to which this callback is dispatched.
752       meta_tag_.Set(
753           call_.call(),
754           [this](bool ok) {
755             ServerBidiReactor<RequestType, ResponseType>* reactor =
756                 reactor_.load(std::memory_order_relaxed);
757             reactor->OnSendInitialMetadataDone(ok);
758             this->MaybeDone(/*inlineable_ondone=*/true);
759           },
760           &meta_ops_, /*can_inline=*/false);
761       meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
762                                     ctx_->initial_metadata_flags());
763       if (ctx_->compression_level_set()) {
764         meta_ops_.set_compression_level(ctx_->compression_level());
765       }
766       ctx_->sent_initial_metadata_ = true;
767       meta_ops_.set_core_cq_tag(&meta_tag_);
768       call_.PerformOps(&meta_ops_);
769     }
770 
Write(const ResponseType * resp,grpc::WriteOptions options)771     void Write(const ResponseType* resp, grpc::WriteOptions options) override {
772       this->Ref();
773       if (options.is_last_message()) {
774         options.set_buffer_hint();
775       }
776       if (!ctx_->sent_initial_metadata_) {
777         write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
778                                        ctx_->initial_metadata_flags());
779         if (ctx_->compression_level_set()) {
780           write_ops_.set_compression_level(ctx_->compression_level());
781         }
782         ctx_->sent_initial_metadata_ = true;
783       }
784       // TODO(vjpai): don't assert
785       GPR_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
786       call_.PerformOps(&write_ops_);
787     }
788 
WriteAndFinish(const ResponseType * resp,grpc::WriteOptions options,grpc::Status s)789     void WriteAndFinish(const ResponseType* resp, grpc::WriteOptions options,
790                         grpc::Status s) override {
791       // TODO(vjpai): don't assert
792       GPR_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
793       Finish(std::move(s));
794     }
795 
Read(RequestType * req)796     void Read(RequestType* req) override {
797       this->Ref();
798       read_ops_.RecvMessage(req);
799       call_.PerformOps(&read_ops_);
800     }
801 
802    private:
803     friend class CallbackBidiHandler<RequestType, ResponseType>;
804 
ServerCallbackReaderWriterImpl(grpc::CallbackServerContext * ctx,grpc::internal::Call * call,std::function<void ()> call_requester)805     ServerCallbackReaderWriterImpl(grpc::CallbackServerContext* ctx,
806                                    grpc::internal::Call* call,
807                                    std::function<void()> call_requester)
808         : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}
809 
SetupReactor(ServerBidiReactor<RequestType,ResponseType> * reactor)810     void SetupReactor(ServerBidiReactor<RequestType, ResponseType>* reactor) {
811       reactor_.store(reactor, std::memory_order_relaxed);
812       // The callbacks for these functions should not be inlined because they
813       // invoke user-controlled reactions, but any resulting OnDones can be
814       // inlined in the executor to which a callback is dispatched.
815       write_tag_.Set(
816           call_.call(),
817           [this, reactor](bool ok) {
818             reactor->OnWriteDone(ok);
819             this->MaybeDone(/*inlineable_ondone=*/true);
820           },
821           &write_ops_, /*can_inline=*/false);
822       write_ops_.set_core_cq_tag(&write_tag_);
823       read_tag_.Set(
824           call_.call(),
825           [this, reactor](bool ok) {
826             if (GPR_UNLIKELY(!ok)) {
827               ctx_->MaybeMarkCancelledOnRead();
828             }
829             reactor->OnReadDone(ok);
830             this->MaybeDone(/*inlineable_ondone=*/true);
831           },
832           &read_ops_, /*can_inline=*/false);
833       read_ops_.set_core_cq_tag(&read_tag_);
834       this->BindReactor(reactor);
835       this->MaybeCallOnCancel(reactor);
836       // Inlineable OnDone can be false here because there is no bidi
837       // reactor that has an inlineable OnDone; this only applies to the
838       // DefaultReactor (which is unary).
839       this->MaybeDone(/*inlineable_ondone=*/false);
840     }
841 
CallOnDone()842     void CallOnDone() override {
843       reactor_.load(std::memory_order_relaxed)->OnDone();
844       grpc_call* call = call_.call();
845       auto call_requester = std::move(call_requester_);
846       if (ctx_->context_allocator() != nullptr) {
847         ctx_->context_allocator()->Release(ctx_);
848       }
849       this->~ServerCallbackReaderWriterImpl();  // explicitly call destructor
850       grpc_call_unref(call);
851       call_requester();
852     }
853 
reactor()854     ServerReactor* reactor() override {
855       return reactor_.load(std::memory_order_relaxed);
856     }
857 
858     grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata>
859         meta_ops_;
860     grpc::internal::CallbackWithSuccessTag meta_tag_;
861     grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
862                               grpc::internal::CallOpSendMessage,
863                               grpc::internal::CallOpServerSendStatus>
864         finish_ops_;
865     grpc::internal::CallbackWithSuccessTag finish_tag_;
866     grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
867                               grpc::internal::CallOpSendMessage>
868         write_ops_;
869     grpc::internal::CallbackWithSuccessTag write_tag_;
870     grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<RequestType>>
871         read_ops_;
872     grpc::internal::CallbackWithSuccessTag read_tag_;
873 
874     grpc::CallbackServerContext* const ctx_;
875     grpc::internal::Call call_;
876     std::function<void()> call_requester_;
877     // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
878     std::atomic<ServerBidiReactor<RequestType, ResponseType>*> reactor_;
879     // callbacks_outstanding_ follows a refcount pattern
880     std::atomic<intptr_t> callbacks_outstanding_{
881         3};  // reserve for OnStarted, Finish, and CompletionOp
882   };
883 };
884 
885 }  // namespace internal
886 }  // namespace grpc
887 
888 #endif  // GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H
889