//
//
// Copyright 2019 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

#ifndef GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H
#define GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H

#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include <grpcpp/impl/rpc_service_method.h>
#include <grpcpp/server_context.h>
#include <grpcpp/support/message_allocator.h>
#include <grpcpp/support/server_callback.h>
#include <grpcpp/support/status.h>

namespace grpc {
namespace internal {

template <class RequestType, class ResponseType>
class CallbackUnaryHandler : public grpc::internal::MethodHandler {
 public:
  explicit CallbackUnaryHandler(
      std::function<ServerUnaryReactor*(grpc::CallbackServerContext*,
                                        const RequestType*, ResponseType*)>
          get_reactor)
      : get_reactor_(std::move(get_reactor)) {}

  void SetMessageAllocator(
      MessageAllocator<RequestType, ResponseType>* allocator) {
    allocator_ = allocator;
  }

  void RunHandler(const HandlerParameter& param) final {
    // Arena allocate a controller structure (that includes request/response)
    grpc_call_ref(param.call->call());
    auto* allocator_state =
        static_cast<MessageHolder<RequestType, ResponseType>*>(
            param.internal_data);

    auto* call = new (grpc_call_arena_alloc(param.call->call(),
                                            sizeof(ServerCallbackUnaryImpl)))
        ServerCallbackUnaryImpl(
            static_cast<grpc::CallbackServerContext*>(param.server_context),
            param.call, allocator_state, param.call_requester);
    param.server_context->BeginCompletionOp(
        param.call, [call](bool) { call->MaybeDone(); }, call);

    ServerUnaryReactor* reactor = nullptr;
    if (param.status.ok()) {
      reactor = grpc::internal::CatchingReactorGetter<ServerUnaryReactor>(
          get_reactor_,
          static_cast<grpc::CallbackServerContext*>(param.server_context),
          call->request(), call->response());
    }

    if (reactor == nullptr) {
      // if deserialization or reactor creator failed, we need to fail the call
      reactor = new (grpc_call_arena_alloc(param.call->call(),
                                           sizeof(UnimplementedUnaryReactor)))
          UnimplementedUnaryReactor(
              grpc::Status(grpc::StatusCode::UNIMPLEMENTED, ""));
    }

    /// Invoke SetupReactor as the last part of the handler
    call->SetupReactor(reactor);
  }

  void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
                    grpc::Status* status, void** handler_data) final {
    grpc::ByteBuffer buf;
    buf.set_buffer(req);
    RequestType* request = nullptr;
    MessageHolder<RequestType, ResponseType>* allocator_state;
    if (allocator_ != nullptr) {
      allocator_state = allocator_->AllocateMessages();
    } else {
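      // No custom message allocator was supplied, so arena-allocate a
      // DefaultMessageHolder that owns both the request and the response; its
      // memory is reclaimed along with the call arena.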
      allocator_state = new (grpc_call_arena_alloc(
          call, sizeof(DefaultMessageHolder<RequestType, ResponseType>)))
          DefaultMessageHolder<RequestType, ResponseType>();
    }
    *handler_data = allocator_state;
    request = allocator_state->request();
    *status =
        grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
    buf.Release();
    if (status->ok()) {
      return request;
    }
    return nullptr;
  }

 private:
  std::function<ServerUnaryReactor*(grpc::CallbackServerContext*,
                                    const RequestType*, ResponseType*)>
      get_reactor_;
  MessageAllocator<RequestType, ResponseType>* allocator_ = nullptr;

  class ServerCallbackUnaryImpl : public ServerCallbackUnary {
   public:
    void Finish(grpc::Status s) override {
      // A callback that only contains a call to MaybeDone can be run as an
      // inline callback regardless of whether or not OnDone is inlineable
      // because if the actual OnDone callback needs to be scheduled, MaybeDone
      // is responsible for dispatching to an executor thread if needed. Thus,
      // when setting up the finish_tag_, we can set its own callback to
      // inlineable.
      finish_tag_.Set(
          call_.call(),
          [this](bool) {
            this->MaybeDone(
                reactor_.load(std::memory_order_relaxed)->InternalInlineable());
          },
          &finish_ops_, /*can_inline=*/true);
      finish_ops_.set_core_cq_tag(&finish_tag_);

      if (!ctx_->sent_initial_metadata_) {
        finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
                                        ctx_->initial_metadata_flags());
        if (ctx_->compression_level_set()) {
          finish_ops_.set_compression_level(ctx_->compression_level());
        }
        ctx_->sent_initial_metadata_ = true;
      }
      // The response is dropped if the status is not OK.
      if (s.ok()) {
        finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
                                     finish_ops_.SendMessagePtr(response()));
      } else {
        finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
      }
      finish_ops_.set_core_cq_tag(&finish_tag_);
      call_.PerformOps(&finish_ops_);
    }

    void SendInitialMetadata() override {
      GPR_ASSERT(!ctx_->sent_initial_metadata_);
      this->Ref();
      // The callback for this function should not be marked inline because it
      // is directly invoking a user-controlled reaction
      // (OnSendInitialMetadataDone). Thus it must be dispatched to an executor
      // thread. However, any OnDone needed after that can be inlined because it
      // is already running on an executor thread.
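      // meta_tag_ fires when the initial-metadata batch completes; its lambda
      // forwards the result to the user's OnSendInitialMetadataDone reaction.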
      meta_tag_.Set(
          call_.call(),
          [this](bool ok) {
            ServerUnaryReactor* reactor =
                reactor_.load(std::memory_order_relaxed);
            reactor->OnSendInitialMetadataDone(ok);
            this->MaybeDone(/*inlineable_ondone=*/true);
          },
          &meta_ops_, /*can_inline=*/false);
      meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
                                    ctx_->initial_metadata_flags());
      if (ctx_->compression_level_set()) {
        meta_ops_.set_compression_level(ctx_->compression_level());
      }
      ctx_->sent_initial_metadata_ = true;
      meta_ops_.set_core_cq_tag(&meta_tag_);
      call_.PerformOps(&meta_ops_);
    }

   private:
    friend class CallbackUnaryHandler<RequestType, ResponseType>;

    ServerCallbackUnaryImpl(
        grpc::CallbackServerContext* ctx, grpc::internal::Call* call,
        MessageHolder<RequestType, ResponseType>* allocator_state,
        std::function<void()> call_requester)
        : ctx_(ctx),
          call_(*call),
          allocator_state_(allocator_state),
          call_requester_(std::move(call_requester)) {
      ctx_->set_message_allocator_state(allocator_state);
    }

    /// SetupReactor binds the reactor (which also releases any queued
    /// operations), maybe calls OnCancel if possible/needed, and maybe marks
    /// the completion of the RPC. This should be the last component of the
    /// handler.
    void SetupReactor(ServerUnaryReactor* reactor) {
      reactor_.store(reactor, std::memory_order_relaxed);
      this->BindReactor(reactor);
      this->MaybeCallOnCancel(reactor);
      this->MaybeDone(reactor->InternalInlineable());
    }

    const RequestType* request() { return allocator_state_->request(); }
    ResponseType* response() { return allocator_state_->response(); }

    void CallOnDone() override {
      reactor_.load(std::memory_order_relaxed)->OnDone();
      grpc_call* call = call_.call();
      auto call_requester = std::move(call_requester_);
      allocator_state_->Release();
      if (ctx_->context_allocator() != nullptr) {
        ctx_->context_allocator()->Release(ctx_);
      }
      this->~ServerCallbackUnaryImpl();  // explicitly call destructor
      grpc_call_unref(call);
      call_requester();
    }

    ServerReactor* reactor() override {
      return reactor_.load(std::memory_order_relaxed);
    }

    grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata>
        meta_ops_;
    grpc::internal::CallbackWithSuccessTag meta_tag_;
    grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
                              grpc::internal::CallOpSendMessage,
                              grpc::internal::CallOpServerSendStatus>
        finish_ops_;
    grpc::internal::CallbackWithSuccessTag finish_tag_;

    grpc::CallbackServerContext* const ctx_;
    grpc::internal::Call call_;
    MessageHolder<RequestType, ResponseType>* const allocator_state_;
    std::function<void()> call_requester_;
    // reactor_ can always be loaded/stored with relaxed memory ordering because
    // its value is only set once, independently of other data in the object,
    // and the loads that use it will always actually come provably later even
    // though they are from different threads since they are triggered by
    // actions initiated only by the setting up of the reactor_ variable.
    // In a sense, it's a delayed "const": it gets its value from the
    // SetupReactor method (not the constructor, so it's not a true const), but
    // it doesn't change after that and it only gets used by actions caused,
    // directly or indirectly, by that setup. This comment also applies to the
    // reactor_ variables of the other streaming objects in this file.
    std::atomic<ServerUnaryReactor*> reactor_;
    // callbacks_outstanding_ follows a refcount pattern
    std::atomic<intptr_t> callbacks_outstanding_{
        3};  // reserve for start, Finish, and CompletionOp
  };
};

template <class RequestType, class ResponseType>
class CallbackClientStreamingHandler : public grpc::internal::MethodHandler {
 public:
  explicit CallbackClientStreamingHandler(
      std::function<ServerReadReactor<RequestType>*(
          grpc::CallbackServerContext*, ResponseType*)>
          get_reactor)
      : get_reactor_(std::move(get_reactor)) {}
  void RunHandler(const HandlerParameter& param) final {
    // Arena allocate a reader structure (that includes response)
    grpc_call_ref(param.call->call());

    auto* reader = new (grpc_call_arena_alloc(param.call->call(),
                                              sizeof(ServerCallbackReaderImpl)))
        ServerCallbackReaderImpl(
            static_cast<grpc::CallbackServerContext*>(param.server_context),
            param.call, param.call_requester);
    // Inlineable OnDone can be false in the CompletionOp callback because there
    // is no read reactor that has an inlineable OnDone; this only applies to
    // the DefaultReactor (which is unary).
    param.server_context->BeginCompletionOp(
        param.call,
        [reader](bool) { reader->MaybeDone(/*inlineable_ondone=*/false); },
        reader);

    ServerReadReactor<RequestType>* reactor = nullptr;
    if (param.status.ok()) {
      reactor =
          grpc::internal::CatchingReactorGetter<ServerReadReactor<RequestType>>(
              get_reactor_,
              static_cast<grpc::CallbackServerContext*>(param.server_context),
              reader->response());
    }

    if (reactor == nullptr) {
      // if deserialization or reactor creator failed, we need to fail the call
      reactor = new (grpc_call_arena_alloc(
          param.call->call(), sizeof(UnimplementedReadReactor<RequestType>)))
          UnimplementedReadReactor<RequestType>(
              grpc::Status(grpc::StatusCode::UNIMPLEMENTED, ""));
    }

    reader->SetupReactor(reactor);
  }

 private:
  std::function<ServerReadReactor<RequestType>*(grpc::CallbackServerContext*,
                                                ResponseType*)>
      get_reactor_;

  class ServerCallbackReaderImpl : public ServerCallbackReader<RequestType> {
   public:
    void Finish(grpc::Status s) override {
      // A finish tag with only MaybeDone can have its callback inlined even if
      // OnDone is not inlineable because this callback just checks a ref and
      // then decides whether or not to dispatch OnDone.
      finish_tag_.Set(
          call_.call(),
          [this](bool) {
            // Inlineable OnDone can be false here because there is
            // no read reactor that has an inlineable OnDone; this
            // only applies to the DefaultReactor (which is unary).
            this->MaybeDone(/*inlineable_ondone=*/false);
          },
          &finish_ops_, /*can_inline=*/true);
      if (!ctx_->sent_initial_metadata_) {
        finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
                                        ctx_->initial_metadata_flags());
        if (ctx_->compression_level_set()) {
          finish_ops_.set_compression_level(ctx_->compression_level());
        }
        ctx_->sent_initial_metadata_ = true;
      }
      // The response is dropped if the status is not OK.
      if (s.ok()) {
        finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
                                     finish_ops_.SendMessagePtr(&resp_));
      } else {
        finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
      }
      finish_ops_.set_core_cq_tag(&finish_tag_);
      call_.PerformOps(&finish_ops_);
    }

    void SendInitialMetadata() override {
      GPR_ASSERT(!ctx_->sent_initial_metadata_);
      this->Ref();
      // The callback for this function should not be inlined because it invokes
      // a user-controlled reaction, but any resulting OnDone can be inlined in
      // the executor to which this callback is dispatched.
      meta_tag_.Set(
          call_.call(),
          [this](bool ok) {
            ServerReadReactor<RequestType>* reactor =
                reactor_.load(std::memory_order_relaxed);
            reactor->OnSendInitialMetadataDone(ok);
            this->MaybeDone(/*inlineable_ondone=*/true);
          },
          &meta_ops_, /*can_inline=*/false);
      meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
                                    ctx_->initial_metadata_flags());
      if (ctx_->compression_level_set()) {
        meta_ops_.set_compression_level(ctx_->compression_level());
      }
      ctx_->sent_initial_metadata_ = true;
      meta_ops_.set_core_cq_tag(&meta_tag_);
      call_.PerformOps(&meta_ops_);
    }

    void Read(RequestType* req) override {
      this->Ref();
      read_ops_.RecvMessage(req);
      call_.PerformOps(&read_ops_);
    }

   private:
    friend class CallbackClientStreamingHandler<RequestType, ResponseType>;

    ServerCallbackReaderImpl(grpc::CallbackServerContext* ctx,
                             grpc::internal::Call* call,
                             std::function<void()> call_requester)
        : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}

    void SetupReactor(ServerReadReactor<RequestType>* reactor) {
      reactor_.store(reactor, std::memory_order_relaxed);
      // The callback for this function should not be inlined because it invokes
      // a user-controlled reaction, but any resulting OnDone can be inlined in
      // the executor to which this callback is dispatched.
      read_tag_.Set(
          call_.call(),
          [this, reactor](bool ok) {
            if (GPR_UNLIKELY(!ok)) {
              ctx_->MaybeMarkCancelledOnRead();
            }
            reactor->OnReadDone(ok);
            this->MaybeDone(/*inlineable_ondone=*/true);
          },
          &read_ops_, /*can_inline=*/false);
      read_ops_.set_core_cq_tag(&read_tag_);
      this->BindReactor(reactor);
      this->MaybeCallOnCancel(reactor);
      // Inlineable OnDone can be false here because there is no read
      // reactor that has an inlineable OnDone; this only applies to the
      // DefaultReactor (which is unary).
      this->MaybeDone(/*inlineable_ondone=*/false);
    }

    ~ServerCallbackReaderImpl() {}

    ResponseType* response() { return &resp_; }

    void CallOnDone() override {
      reactor_.load(std::memory_order_relaxed)->OnDone();
      grpc_call* call = call_.call();
      auto call_requester = std::move(call_requester_);
      if (ctx_->context_allocator() != nullptr) {
        ctx_->context_allocator()->Release(ctx_);
      }
      this->~ServerCallbackReaderImpl();  // explicitly call destructor
      grpc_call_unref(call);
      call_requester();
    }

    ServerReactor* reactor() override {
      return reactor_.load(std::memory_order_relaxed);
    }

    grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata>
        meta_ops_;
    grpc::internal::CallbackWithSuccessTag meta_tag_;
    grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
                              grpc::internal::CallOpSendMessage,
                              grpc::internal::CallOpServerSendStatus>
        finish_ops_;
    grpc::internal::CallbackWithSuccessTag finish_tag_;
    grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<RequestType>>
        read_ops_;
    grpc::internal::CallbackWithSuccessTag read_tag_;

    grpc::CallbackServerContext* const ctx_;
    grpc::internal::Call call_;
    ResponseType resp_;
    std::function<void()> call_requester_;
    // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
    std::atomic<ServerReadReactor<RequestType>*> reactor_;
    // callbacks_outstanding_ follows a refcount pattern
    std::atomic<intptr_t> callbacks_outstanding_{
        3};  // reserve for OnStarted, Finish, and CompletionOp
  };
};

template <class RequestType, class ResponseType>
class CallbackServerStreamingHandler : public grpc::internal::MethodHandler {
 public:
  explicit CallbackServerStreamingHandler(
      std::function<ServerWriteReactor<ResponseType>*(
          grpc::CallbackServerContext*, const RequestType*)>
          get_reactor)
      : get_reactor_(std::move(get_reactor)) {}
  void RunHandler(const HandlerParameter& param) final {
    // Arena allocate a writer structure
    grpc_call_ref(param.call->call());

    auto* writer = new (grpc_call_arena_alloc(param.call->call(),
                                              sizeof(ServerCallbackWriterImpl)))
        ServerCallbackWriterImpl(
            static_cast<grpc::CallbackServerContext*>(param.server_context),
            param.call, static_cast<RequestType*>(param.request),
            param.call_requester);
    // Inlineable OnDone can be false in the CompletionOp callback because there
    // is no write reactor that has an inlineable OnDone; this only applies to
    // the DefaultReactor (which is unary).
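    // The completion op's callback runs when the RPC completes (including
    // cancellation) and releases the reservation held for it in
    // callbacks_outstanding_ via MaybeDone.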
    param.server_context->BeginCompletionOp(
        param.call,
        [writer](bool) { writer->MaybeDone(/*inlineable_ondone=*/false); },
        writer);

    ServerWriteReactor<ResponseType>* reactor = nullptr;
    if (param.status.ok()) {
      reactor = grpc::internal::CatchingReactorGetter<
          ServerWriteReactor<ResponseType>>(
          get_reactor_,
          static_cast<grpc::CallbackServerContext*>(param.server_context),
          writer->request());
    }
    if (reactor == nullptr) {
      // if deserialization or reactor creator failed, we need to fail the call
      reactor = new (grpc_call_arena_alloc(
          param.call->call(), sizeof(UnimplementedWriteReactor<ResponseType>)))
          UnimplementedWriteReactor<ResponseType>(
              grpc::Status(grpc::StatusCode::UNIMPLEMENTED, ""));
    }

    writer->SetupReactor(reactor);
  }

  void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
                    grpc::Status* status, void** /*handler_data*/) final {
    grpc::ByteBuffer buf;
    buf.set_buffer(req);
    auto* request =
        new (grpc_call_arena_alloc(call, sizeof(RequestType))) RequestType();
    *status =
        grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
    buf.Release();
    if (status->ok()) {
      return request;
    }
    request->~RequestType();
    return nullptr;
  }

 private:
  std::function<ServerWriteReactor<ResponseType>*(grpc::CallbackServerContext*,
                                                  const RequestType*)>
      get_reactor_;

  class ServerCallbackWriterImpl : public ServerCallbackWriter<ResponseType> {
   public:
    void Finish(grpc::Status s) override {
      // A finish tag with only MaybeDone can have its callback inlined even if
      // OnDone is not inlineable because this callback just checks a ref and
      // then decides whether or not to dispatch OnDone.
      finish_tag_.Set(
          call_.call(),
          [this](bool) {
            // Inlineable OnDone can be false here because there is
            // no write reactor that has an inlineable OnDone; this
            // only applies to the DefaultReactor (which is unary).
            this->MaybeDone(/*inlineable_ondone=*/false);
          },
          &finish_ops_, /*can_inline=*/true);
      finish_ops_.set_core_cq_tag(&finish_tag_);

      if (!ctx_->sent_initial_metadata_) {
        finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
                                        ctx_->initial_metadata_flags());
        if (ctx_->compression_level_set()) {
          finish_ops_.set_compression_level(ctx_->compression_level());
        }
        ctx_->sent_initial_metadata_ = true;
      }
      finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
      call_.PerformOps(&finish_ops_);
    }

    void SendInitialMetadata() override {
      GPR_ASSERT(!ctx_->sent_initial_metadata_);
      this->Ref();
      // The callback for this function should not be inlined because it invokes
      // a user-controlled reaction, but any resulting OnDone can be inlined in
      // the executor to which this callback is dispatched.
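      // As elsewhere, the reactor is loaded with relaxed memory ordering; see
      // the comment on reactor_ in ServerCallbackUnaryImpl.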
      meta_tag_.Set(
          call_.call(),
          [this](bool ok) {
            ServerWriteReactor<ResponseType>* reactor =
                reactor_.load(std::memory_order_relaxed);
            reactor->OnSendInitialMetadataDone(ok);
            this->MaybeDone(/*inlineable_ondone=*/true);
          },
          &meta_ops_, /*can_inline=*/false);
      meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
                                    ctx_->initial_metadata_flags());
      if (ctx_->compression_level_set()) {
        meta_ops_.set_compression_level(ctx_->compression_level());
      }
      ctx_->sent_initial_metadata_ = true;
      meta_ops_.set_core_cq_tag(&meta_tag_);
      call_.PerformOps(&meta_ops_);
    }

    void Write(const ResponseType* resp, grpc::WriteOptions options) override {
      this->Ref();
      if (options.is_last_message()) {
        options.set_buffer_hint();
      }
      if (!ctx_->sent_initial_metadata_) {
        write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
                                       ctx_->initial_metadata_flags());
        if (ctx_->compression_level_set()) {
          write_ops_.set_compression_level(ctx_->compression_level());
        }
        ctx_->sent_initial_metadata_ = true;
      }
      // TODO(vjpai): don't assert
      GPR_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
      call_.PerformOps(&write_ops_);
    }

    void WriteAndFinish(const ResponseType* resp, grpc::WriteOptions options,
                        grpc::Status s) override {
      // This combines the write into the finish callback
      // TODO(vjpai): don't assert
      GPR_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
      Finish(std::move(s));
    }

   private:
    friend class CallbackServerStreamingHandler<RequestType, ResponseType>;

    ServerCallbackWriterImpl(grpc::CallbackServerContext* ctx,
                             grpc::internal::Call* call, const RequestType* req,
                             std::function<void()> call_requester)
        : ctx_(ctx),
          call_(*call),
          req_(req),
          call_requester_(std::move(call_requester)) {}

    void SetupReactor(ServerWriteReactor<ResponseType>* reactor) {
      reactor_.store(reactor, std::memory_order_relaxed);
      // The callback for this function should not be inlined because it invokes
      // a user-controlled reaction, but any resulting OnDone can be inlined in
      // the executor to which this callback is dispatched.
      write_tag_.Set(
          call_.call(),
          [this, reactor](bool ok) {
            reactor->OnWriteDone(ok);
            this->MaybeDone(/*inlineable_ondone=*/true);
          },
          &write_ops_, /*can_inline=*/false);
      write_ops_.set_core_cq_tag(&write_tag_);
      this->BindReactor(reactor);
      this->MaybeCallOnCancel(reactor);
      // Inlineable OnDone can be false here because there is no write
      // reactor that has an inlineable OnDone; this only applies to the
      // DefaultReactor (which is unary).
      this->MaybeDone(/*inlineable_ondone=*/false);
    }

    ~ServerCallbackWriterImpl() {
      if (req_ != nullptr) {
        req_->~RequestType();
      }
    }

    const RequestType* request() { return req_; }

    void CallOnDone() override {
      reactor_.load(std::memory_order_relaxed)->OnDone();
      grpc_call* call = call_.call();
      auto call_requester = std::move(call_requester_);
      if (ctx_->context_allocator() != nullptr) {
        ctx_->context_allocator()->Release(ctx_);
      }
      this->~ServerCallbackWriterImpl();  // explicitly call destructor
      grpc_call_unref(call);
      call_requester();
    }

    ServerReactor* reactor() override {
      return reactor_.load(std::memory_order_relaxed);
    }

    grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata>
        meta_ops_;
    grpc::internal::CallbackWithSuccessTag meta_tag_;
    grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
                              grpc::internal::CallOpSendMessage,
                              grpc::internal::CallOpServerSendStatus>
        finish_ops_;
    grpc::internal::CallbackWithSuccessTag finish_tag_;
    grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
                              grpc::internal::CallOpSendMessage>
        write_ops_;
    grpc::internal::CallbackWithSuccessTag write_tag_;

    grpc::CallbackServerContext* const ctx_;
    grpc::internal::Call call_;
    const RequestType* req_;
    std::function<void()> call_requester_;
    // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
    std::atomic<ServerWriteReactor<ResponseType>*> reactor_;
    // callbacks_outstanding_ follows a refcount pattern
    std::atomic<intptr_t> callbacks_outstanding_{
        3};  // reserve for OnStarted, Finish, and CompletionOp
  };
};

template <class RequestType, class ResponseType>
class CallbackBidiHandler : public grpc::internal::MethodHandler {
 public:
  explicit CallbackBidiHandler(
      std::function<ServerBidiReactor<RequestType, ResponseType>*(
          grpc::CallbackServerContext*)>
          get_reactor)
      : get_reactor_(std::move(get_reactor)) {}
  void RunHandler(const HandlerParameter& param) final {
    grpc_call_ref(param.call->call());

    auto* stream = new (grpc_call_arena_alloc(
        param.call->call(), sizeof(ServerCallbackReaderWriterImpl)))
        ServerCallbackReaderWriterImpl(
            static_cast<grpc::CallbackServerContext*>(param.server_context),
            param.call, param.call_requester);
    // Inlineable OnDone can be false in the CompletionOp callback because there
    // is no bidi reactor that has an inlineable OnDone; this only applies to
    // the DefaultReactor (which is unary).
    param.server_context->BeginCompletionOp(
        param.call,
        [stream](bool) { stream->MaybeDone(/*inlineable_ondone=*/false); },
        stream);

    ServerBidiReactor<RequestType, ResponseType>* reactor = nullptr;
    if (param.status.ok()) {
      reactor = grpc::internal::CatchingReactorGetter<
          ServerBidiReactor<RequestType, ResponseType>>(
          get_reactor_,
          static_cast<grpc::CallbackServerContext*>(param.server_context));
    }

    if (reactor == nullptr) {
      // if deserialization or reactor creator failed, we need to fail the call
      reactor = new (grpc_call_arena_alloc(
          param.call->call(),
          sizeof(UnimplementedBidiReactor<RequestType, ResponseType>)))
          UnimplementedBidiReactor<RequestType, ResponseType>(
              grpc::Status(grpc::StatusCode::UNIMPLEMENTED, ""));
    }

    stream->SetupReactor(reactor);
  }

 private:
  std::function<ServerBidiReactor<RequestType, ResponseType>*(
      grpc::CallbackServerContext*)>
      get_reactor_;

  class ServerCallbackReaderWriterImpl
      : public ServerCallbackReaderWriter<RequestType, ResponseType> {
   public:
    void Finish(grpc::Status s) override {
      // A finish tag with only MaybeDone can have its callback inlined even if
      // OnDone is not inlineable because this callback just checks a ref and
      // then decides whether or not to dispatch OnDone.
      finish_tag_.Set(
          call_.call(),
          [this](bool) {
            // Inlineable OnDone can be false here because there is
            // no bidi reactor that has an inlineable OnDone; this
            // only applies to the DefaultReactor (which is unary).
            this->MaybeDone(/*inlineable_ondone=*/false);
          },
          &finish_ops_, /*can_inline=*/true);
      finish_ops_.set_core_cq_tag(&finish_tag_);

      if (!ctx_->sent_initial_metadata_) {
        finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
                                        ctx_->initial_metadata_flags());
        if (ctx_->compression_level_set()) {
          finish_ops_.set_compression_level(ctx_->compression_level());
        }
        ctx_->sent_initial_metadata_ = true;
      }
      finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
      call_.PerformOps(&finish_ops_);
    }

    void SendInitialMetadata() override {
      GPR_ASSERT(!ctx_->sent_initial_metadata_);
      this->Ref();
      // The callback for this function should not be inlined because it invokes
      // a user-controlled reaction, but any resulting OnDone can be inlined in
      // the executor to which this callback is dispatched.
      meta_tag_.Set(
          call_.call(),
          [this](bool ok) {
            ServerBidiReactor<RequestType, ResponseType>* reactor =
                reactor_.load(std::memory_order_relaxed);
            reactor->OnSendInitialMetadataDone(ok);
            this->MaybeDone(/*inlineable_ondone=*/true);
          },
          &meta_ops_, /*can_inline=*/false);
      meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
                                    ctx_->initial_metadata_flags());
      if (ctx_->compression_level_set()) {
        meta_ops_.set_compression_level(ctx_->compression_level());
      }
      ctx_->sent_initial_metadata_ = true;
      meta_ops_.set_core_cq_tag(&meta_tag_);
      call_.PerformOps(&meta_ops_);
    }

    void Write(const ResponseType* resp, grpc::WriteOptions options) override {
      this->Ref();
      if (options.is_last_message()) {
        options.set_buffer_hint();
      }
      if (!ctx_->sent_initial_metadata_) {
        write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
                                       ctx_->initial_metadata_flags());
        if (ctx_->compression_level_set()) {
          write_ops_.set_compression_level(ctx_->compression_level());
        }
        ctx_->sent_initial_metadata_ = true;
      }
      // TODO(vjpai): don't assert
      GPR_ASSERT(write_ops_.SendMessagePtr(resp, options).ok());
      call_.PerformOps(&write_ops_);
    }

    void WriteAndFinish(const ResponseType* resp, grpc::WriteOptions options,
                        grpc::Status s) override {
      // TODO(vjpai): don't assert
      GPR_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok());
      Finish(std::move(s));
    }

    void Read(RequestType* req) override {
      this->Ref();
      read_ops_.RecvMessage(req);
      call_.PerformOps(&read_ops_);
    }

   private:
    friend class CallbackBidiHandler<RequestType, ResponseType>;

    ServerCallbackReaderWriterImpl(grpc::CallbackServerContext* ctx,
                                   grpc::internal::Call* call,
                                   std::function<void()> call_requester)
        : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}

    void SetupReactor(ServerBidiReactor<RequestType, ResponseType>* reactor) {
      reactor_.store(reactor, std::memory_order_relaxed);
      // The callbacks for these functions should not be inlined because they
      // invoke user-controlled reactions, but any resulting OnDones can be
      // inlined in the executor to which a callback is dispatched.
      write_tag_.Set(
          call_.call(),
          [this, reactor](bool ok) {
            reactor->OnWriteDone(ok);
            this->MaybeDone(/*inlineable_ondone=*/true);
          },
          &write_ops_, /*can_inline=*/false);
      write_ops_.set_core_cq_tag(&write_tag_);
      read_tag_.Set(
          call_.call(),
          [this, reactor](bool ok) {
            if (GPR_UNLIKELY(!ok)) {
              ctx_->MaybeMarkCancelledOnRead();
            }
            reactor->OnReadDone(ok);
            this->MaybeDone(/*inlineable_ondone=*/true);
          },
          &read_ops_, /*can_inline=*/false);
      read_ops_.set_core_cq_tag(&read_tag_);
      this->BindReactor(reactor);
      this->MaybeCallOnCancel(reactor);
      // Inlineable OnDone can be false here because there is no bidi
      // reactor that has an inlineable OnDone; this only applies to the
      // DefaultReactor (which is unary).
      this->MaybeDone(/*inlineable_ondone=*/false);
    }

    void CallOnDone() override {
      reactor_.load(std::memory_order_relaxed)->OnDone();
      grpc_call* call = call_.call();
      auto call_requester = std::move(call_requester_);
      if (ctx_->context_allocator() != nullptr) {
        ctx_->context_allocator()->Release(ctx_);
      }
      this->~ServerCallbackReaderWriterImpl();  // explicitly call destructor
      grpc_call_unref(call);
      call_requester();
    }

    ServerReactor* reactor() override {
      return reactor_.load(std::memory_order_relaxed);
    }

    grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata>
        meta_ops_;
    grpc::internal::CallbackWithSuccessTag meta_tag_;
    grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
                              grpc::internal::CallOpSendMessage,
                              grpc::internal::CallOpServerSendStatus>
        finish_ops_;
    grpc::internal::CallbackWithSuccessTag finish_tag_;
    grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
                              grpc::internal::CallOpSendMessage>
        write_ops_;
    grpc::internal::CallbackWithSuccessTag write_tag_;
    grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<RequestType>>
        read_ops_;
    grpc::internal::CallbackWithSuccessTag read_tag_;

    grpc::CallbackServerContext* const ctx_;
    grpc::internal::Call call_;
    std::function<void()> call_requester_;
    // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
    std::atomic<ServerBidiReactor<RequestType, ResponseType>*> reactor_;
    // callbacks_outstanding_ follows a refcount pattern
    std::atomic<intptr_t> callbacks_outstanding_{
        3};  // reserve for OnStarted, Finish, and CompletionOp
  };
};

}  // namespace internal
}  // namespace grpc

#endif  // GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H
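
// Usage sketch (illustrative only; EchoRequest, EchoResponse, and the Echo
// method below are hypothetical): these handlers are normally instantiated by
// gRPC's generated code rather than by application code, roughly like:
//
//   service->MarkMethodCallback(
//       /*index=*/0,
//       new grpc::internal::CallbackUnaryHandler<EchoRequest, EchoResponse>(
//           [svc](grpc::CallbackServerContext* ctx, const EchoRequest* req,
//                 EchoResponse* resp) -> grpc::ServerUnaryReactor* {
//             return svc->Echo(ctx, req, resp);
//           }));
//
// The handler arena-allocates the per-RPC state, obtains a reactor from the
// factory (falling back to an UNIMPLEMENTED reactor on failure), and drives it
// through the reactions declared in grpcpp/support/server_callback.h.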