//
//
// Copyright 2019 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

#ifndef GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H
#define GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H

#include <grpc/grpc.h>
#include <grpc/impl/call.h>
#include <grpcpp/impl/rpc_service_method.h>
#include <grpcpp/server_context.h>
#include <grpcpp/support/message_allocator.h>
#include <grpcpp/support/server_callback.h>
#include <grpcpp/support/status.h>

#include "absl/log/absl_check.h"

namespace grpc {
namespace internal {

template <class RequestType, class ResponseType>
class CallbackUnaryHandler : public grpc::internal::MethodHandler {
 public:
  explicit CallbackUnaryHandler(
      std::function<ServerUnaryReactor*(grpc::CallbackServerContext*,
                                        const RequestType*, ResponseType*)>
          get_reactor)
      : get_reactor_(std::move(get_reactor)) {}

  void SetMessageAllocator(
      MessageAllocator<RequestType, ResponseType>* allocator) {
    allocator_ = allocator;
  }

  void RunHandler(const HandlerParameter& param) final {
    // Arena allocate a controller structure (that includes request/response)
    grpc_call_ref(param.call->call());
    auto* allocator_state =
        static_cast<MessageHolder<RequestType, ResponseType>*>(
            param.internal_data);

    auto* call = new (grpc_call_arena_alloc(param.call->call(),
                                            sizeof(ServerCallbackUnaryImpl)))
        ServerCallbackUnaryImpl(
            static_cast<grpc::CallbackServerContext*>(param.server_context),
            param.call, allocator_state, param.call_requester);
    param.server_context->BeginCompletionOp(
        param.call, [call](bool) { call->MaybeDone(); }, call);

    ServerUnaryReactor* reactor = nullptr;
    if (param.status.ok()) {
      reactor = grpc::internal::CatchingReactorGetter<ServerUnaryReactor>(
          get_reactor_,
          static_cast<grpc::CallbackServerContext*>(param.server_context),
          call->request(), call->response());
    }

    if (reactor == nullptr) {
      // if deserialization or reactor creator failed, we need to fail the call
      reactor = new (grpc_call_arena_alloc(param.call->call(),
                                           sizeof(UnimplementedUnaryReactor)))
          UnimplementedUnaryReactor(
              grpc::Status(grpc::StatusCode::UNIMPLEMENTED, ""));
    }

    /// Invoke SetupReactor as the last part of the handler
    call->SetupReactor(reactor);
  }

  void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
                    grpc::Status* status, void** handler_data) final {
    grpc::ByteBuffer buf;
    buf.set_buffer(req);
    RequestType* request = nullptr;
    MessageHolder<RequestType, ResponseType>* allocator_state;
    if (allocator_ != nullptr) {
      allocator_state = allocator_->AllocateMessages();
    } else {
      allocator_state = new (grpc_call_arena_alloc(
          call, sizeof(DefaultMessageHolder<RequestType, ResponseType>)))
          DefaultMessageHolder<RequestType, ResponseType>();
    }
    *handler_data = allocator_state;
    request = allocator_state->request();
    *status =
        grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
    buf.Release();
    if (status->ok()) {
      return request;
    }
    return nullptr;
  }

 private:
  std::function<ServerUnaryReactor*(grpc::CallbackServerContext*,
                                    const RequestType*, ResponseType*)>
      get_reactor_;
  MessageAllocator<RequestType, ResponseType>* allocator_ = nullptr;

  class ServerCallbackUnaryImpl : public ServerCallbackUnary {
   public:
    void Finish(grpc::Status s) override {
      // A callback that only contains a call to MaybeDone can be run as an
      // inline callback regardless of whether or not OnDone is inlineable
      // because if the actual OnDone callback needs to be scheduled, MaybeDone
      // is responsible for dispatching to an executor thread if needed. Thus,
      // when setting up the finish_tag_, we can set its own callback to
      // inlineable.
      finish_tag_.Set(
          call_.call(),
          [this](bool) {
            this->MaybeDone(
                reactor_.load(std::memory_order_relaxed)->InternalInlineable());
          },
          &finish_ops_, /*can_inline=*/true);
      finish_ops_.set_core_cq_tag(&finish_tag_);

      if (!ctx_->sent_initial_metadata_) {
        finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
                                        ctx_->initial_metadata_flags());
        if (ctx_->compression_level_set()) {
          finish_ops_.set_compression_level(ctx_->compression_level());
        }
        ctx_->sent_initial_metadata_ = true;
      }
      // The response is dropped if the status is not OK.
      if (s.ok()) {
        finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
                                     finish_ops_.SendMessagePtr(response()));
      } else {
        finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
      }
      finish_ops_.set_core_cq_tag(&finish_tag_);
      call_.PerformOps(&finish_ops_);
    }

    void SendInitialMetadata() override {
      ABSL_CHECK(!ctx_->sent_initial_metadata_);
      this->Ref();
      // The callback for this function should not be marked inline because it
      // is directly invoking a user-controlled reaction
      // (OnSendInitialMetadataDone). Thus it must be dispatched to an executor
      // thread. However, any OnDone needed after that can be inlined because it
      // is already running on an executor thread.
      meta_tag_.Set(
          call_.call(),
          [this](bool ok) {
            ServerUnaryReactor* reactor =
                reactor_.load(std::memory_order_relaxed);
            reactor->OnSendInitialMetadataDone(ok);
            this->MaybeDone(/*inlineable_ondone=*/true);
          },
          &meta_ops_, /*can_inline=*/false);
      meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
                                    ctx_->initial_metadata_flags());
      if (ctx_->compression_level_set()) {
        meta_ops_.set_compression_level(ctx_->compression_level());
      }
      ctx_->sent_initial_metadata_ = true;
      meta_ops_.set_core_cq_tag(&meta_tag_);
      call_.PerformOps(&meta_ops_);
    }

   private:
    friend class CallbackUnaryHandler<RequestType, ResponseType>;

    ServerCallbackUnaryImpl(
        grpc::CallbackServerContext* ctx, grpc::internal::Call* call,
        MessageHolder<RequestType, ResponseType>* allocator_state,
        std::function<void()> call_requester)
        : ctx_(ctx),
          call_(*call),
          allocator_state_(allocator_state),
          call_requester_(std::move(call_requester)) {
      ctx_->set_message_allocator_state(allocator_state);
    }

    grpc_call* call() override { return call_.call(); }

    /// SetupReactor binds the reactor (which also releases any queued
    /// operations), maybe calls OnCancel if possible/needed, and maybe marks
    /// the completion of the RPC. This should be the last component of the
    /// handler.
    void SetupReactor(ServerUnaryReactor* reactor) {
      reactor_.store(reactor, std::memory_order_relaxed);
      this->BindReactor(reactor);
      this->MaybeCallOnCancel(reactor);
      this->MaybeDone(reactor->InternalInlineable());
    }

    const RequestType* request() { return allocator_state_->request(); }
    ResponseType* response() { return allocator_state_->response(); }

    void CallOnDone() override {
      reactor_.load(std::memory_order_relaxed)->OnDone();
      grpc_call* call = call_.call();
      auto call_requester = std::move(call_requester_);
      allocator_state_->Release();
      if (ctx_->context_allocator() != nullptr) {
        ctx_->context_allocator()->Release(ctx_);
      }
      this->~ServerCallbackUnaryImpl();  // explicitly call destructor
      grpc_call_unref(call);
      call_requester();
    }

    ServerReactor* reactor() override {
      return reactor_.load(std::memory_order_relaxed);
    }

    grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata>
        meta_ops_;
    grpc::internal::CallbackWithSuccessTag meta_tag_;
    grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
                              grpc::internal::CallOpSendMessage,
                              grpc::internal::CallOpServerSendStatus>
        finish_ops_;
    grpc::internal::CallbackWithSuccessTag finish_tag_;

    grpc::CallbackServerContext* const ctx_;
    grpc::internal::Call call_;
    MessageHolder<RequestType, ResponseType>* const allocator_state_;
    std::function<void()> call_requester_;
    // reactor_ can always be loaded/stored with relaxed memory ordering because
    // its value is only set once, independently of other data in the object,
    // and the loads that use it will always actually come provably later even
    // though they are from different threads since they are triggered by
    // actions initiated only by the setting up of the reactor_ variable. In
    // a sense, it's a delayed "const": it gets its value from the SetupReactor
    // method (not the constructor, so it's not a true const), but it doesn't
    // change after that and it only gets used by actions caused, directly or
    // indirectly, by that setup. This comment also applies to the reactor_
    // variables of the other streaming objects in this file.
    std::atomic<ServerUnaryReactor*> reactor_;
    // callbacks_outstanding_ follows a refcount pattern
    std::atomic<intptr_t> callbacks_outstanding_{
        3};  // reserve for start, Finish, and CompletionOp
  };
};

template <class RequestType, class ResponseType>
class CallbackClientStreamingHandler : public grpc::internal::MethodHandler {
 public:
  explicit CallbackClientStreamingHandler(
      std::function<ServerReadReactor<RequestType>*(
          grpc::CallbackServerContext*, ResponseType*)>
          get_reactor)
      : get_reactor_(std::move(get_reactor)) {}

  void RunHandler(const HandlerParameter& param) final {
    // Arena allocate a reader structure (that includes response)
    grpc_call_ref(param.call->call());

    auto* reader = new (grpc_call_arena_alloc(param.call->call(),
                                              sizeof(ServerCallbackReaderImpl)))
        ServerCallbackReaderImpl(
            static_cast<grpc::CallbackServerContext*>(param.server_context),
            param.call, param.call_requester);
    // Inlineable OnDone can be false in the CompletionOp callback because there
    // is no read reactor that has an inlineable OnDone; this only applies to
    // the DefaultReactor (which is unary).
    param.server_context->BeginCompletionOp(
        param.call,
        [reader](bool) { reader->MaybeDone(/*inlineable_ondone=*/false); },
        reader);

    ServerReadReactor<RequestType>* reactor = nullptr;
    if (param.status.ok()) {
      reactor =
          grpc::internal::CatchingReactorGetter<ServerReadReactor<RequestType>>(
              get_reactor_,
              static_cast<grpc::CallbackServerContext*>(param.server_context),
              reader->response());
    }

    if (reactor == nullptr) {
      // if deserialization or reactor creator failed, we need to fail the call
      reactor = new (grpc_call_arena_alloc(
          param.call->call(), sizeof(UnimplementedReadReactor<RequestType>)))
          UnimplementedReadReactor<RequestType>(
              grpc::Status(grpc::StatusCode::UNIMPLEMENTED, ""));
    }

    reader->SetupReactor(reactor);
  }

 private:
  std::function<ServerReadReactor<RequestType>*(grpc::CallbackServerContext*,
                                                ResponseType*)>
      get_reactor_;

  class ServerCallbackReaderImpl : public ServerCallbackReader<RequestType> {
   public:
    void Finish(grpc::Status s) override {
      // A finish tag with only MaybeDone can have its callback inlined
      // regardless even if OnDone is not inlineable because this callback just
      // checks a ref and then decides whether or not to dispatch OnDone.
      finish_tag_.Set(
          call_.call(),
          [this](bool) {
            // Inlineable OnDone can be false here because there is
            // no read reactor that has an inlineable OnDone; this
            // only applies to the DefaultReactor (which is unary).
            this->MaybeDone(/*inlineable_ondone=*/false);
          },
          &finish_ops_, /*can_inline=*/true);
      if (!ctx_->sent_initial_metadata_) {
        finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
                                        ctx_->initial_metadata_flags());
        if (ctx_->compression_level_set()) {
          finish_ops_.set_compression_level(ctx_->compression_level());
        }
        ctx_->sent_initial_metadata_ = true;
      }
      // The response is dropped if the status is not OK.
      if (s.ok()) {
        finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_,
                                     finish_ops_.SendMessagePtr(&resp_));
      } else {
        finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
      }
      finish_ops_.set_core_cq_tag(&finish_tag_);
      call_.PerformOps(&finish_ops_);
    }

    void SendInitialMetadata() override {
      ABSL_CHECK(!ctx_->sent_initial_metadata_);
      this->Ref();
      // The callback for this function should not be inlined because it invokes
      // a user-controlled reaction, but any resulting OnDone can be inlined in
      // the executor to which this callback is dispatched.
      meta_tag_.Set(
          call_.call(),
          [this](bool ok) {
            ServerReadReactor<RequestType>* reactor =
                reactor_.load(std::memory_order_relaxed);
            reactor->OnSendInitialMetadataDone(ok);
            this->MaybeDone(/*inlineable_ondone=*/true);
          },
          &meta_ops_, /*can_inline=*/false);
      meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
                                    ctx_->initial_metadata_flags());
      if (ctx_->compression_level_set()) {
        meta_ops_.set_compression_level(ctx_->compression_level());
      }
      ctx_->sent_initial_metadata_ = true;
      meta_ops_.set_core_cq_tag(&meta_tag_);
      call_.PerformOps(&meta_ops_);
    }

    void Read(RequestType* req) override {
      this->Ref();
      read_ops_.RecvMessage(req);
      call_.PerformOps(&read_ops_);
    }

   private:
    friend class CallbackClientStreamingHandler<RequestType, ResponseType>;

    ServerCallbackReaderImpl(grpc::CallbackServerContext* ctx,
                             grpc::internal::Call* call,
                             std::function<void()> call_requester)
        : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}

    grpc_call* call() override { return call_.call(); }

    void SetupReactor(ServerReadReactor<RequestType>* reactor) {
      reactor_.store(reactor, std::memory_order_relaxed);
      // The callback for this function should not be inlined because it invokes
      // a user-controlled reaction, but any resulting OnDone can be inlined in
      // the executor to which this callback is dispatched.
      read_tag_.Set(
          call_.call(),
          [this, reactor](bool ok) {
            if (GPR_UNLIKELY(!ok)) {
              ctx_->MaybeMarkCancelledOnRead();
            }
            reactor->OnReadDone(ok);
            this->MaybeDone(/*inlineable_ondone=*/true);
          },
          &read_ops_, /*can_inline=*/false);
      read_ops_.set_core_cq_tag(&read_tag_);
      this->BindReactor(reactor);
      this->MaybeCallOnCancel(reactor);
      // Inlineable OnDone can be false here because there is no read
      // reactor that has an inlineable OnDone; this only applies to the
      // DefaultReactor (which is unary).
      this->MaybeDone(/*inlineable_ondone=*/false);
    }

    ~ServerCallbackReaderImpl() {}

    ResponseType* response() { return &resp_; }

    void CallOnDone() override {
      reactor_.load(std::memory_order_relaxed)->OnDone();
      grpc_call* call = call_.call();
      auto call_requester = std::move(call_requester_);
      if (ctx_->context_allocator() != nullptr) {
        ctx_->context_allocator()->Release(ctx_);
      }
      this->~ServerCallbackReaderImpl();  // explicitly call destructor
      grpc_call_unref(call);
      call_requester();
    }

    ServerReactor* reactor() override {
      return reactor_.load(std::memory_order_relaxed);
    }

    grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata>
        meta_ops_;
    grpc::internal::CallbackWithSuccessTag meta_tag_;
    grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
                              grpc::internal::CallOpSendMessage,
                              grpc::internal::CallOpServerSendStatus>
        finish_ops_;
    grpc::internal::CallbackWithSuccessTag finish_tag_;
    grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<RequestType>>
        read_ops_;
    grpc::internal::CallbackWithSuccessTag read_tag_;

    grpc::CallbackServerContext* const ctx_;
    grpc::internal::Call call_;
    ResponseType resp_;
    std::function<void()> call_requester_;
    // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
    std::atomic<ServerReadReactor<RequestType>*> reactor_;
    // callbacks_outstanding_ follows a refcount pattern
    std::atomic<intptr_t> callbacks_outstanding_{
        3};  // reserve for OnStarted, Finish, and CompletionOp
  };
};

template <class RequestType, class ResponseType>
class CallbackServerStreamingHandler : public grpc::internal::MethodHandler {
 public:
  explicit CallbackServerStreamingHandler(
      std::function<ServerWriteReactor<ResponseType>*(
          grpc::CallbackServerContext*, const RequestType*)>
          get_reactor)
      : get_reactor_(std::move(get_reactor)) {}

  void RunHandler(const HandlerParameter& param) final {
    // Arena allocate a writer structure
    grpc_call_ref(param.call->call());

    auto* writer = new (grpc_call_arena_alloc(param.call->call(),
                                              sizeof(ServerCallbackWriterImpl)))
        ServerCallbackWriterImpl(
            static_cast<grpc::CallbackServerContext*>(param.server_context),
            param.call, static_cast<RequestType*>(param.request),
            param.call_requester);
    // Inlineable OnDone can be false in the CompletionOp callback because there
    // is no write reactor that has an inlineable OnDone; this only applies to
    // the DefaultReactor (which is unary).
    param.server_context->BeginCompletionOp(
        param.call,
        [writer](bool) { writer->MaybeDone(/*inlineable_ondone=*/false); },
        writer);

    ServerWriteReactor<ResponseType>* reactor = nullptr;
    if (param.status.ok()) {
      reactor = grpc::internal::CatchingReactorGetter<
          ServerWriteReactor<ResponseType>>(
          get_reactor_,
          static_cast<grpc::CallbackServerContext*>(param.server_context),
          writer->request());
    }
    if (reactor == nullptr) {
      // if deserialization or reactor creator failed, we need to fail the call
      reactor = new (grpc_call_arena_alloc(
          param.call->call(), sizeof(UnimplementedWriteReactor<ResponseType>)))
          UnimplementedWriteReactor<ResponseType>(
              grpc::Status(grpc::StatusCode::UNIMPLEMENTED, ""));
    }

    writer->SetupReactor(reactor);
  }

  void* Deserialize(grpc_call* call, grpc_byte_buffer* req,
                    grpc::Status* status, void** /*handler_data*/) final {
    grpc::ByteBuffer buf;
    buf.set_buffer(req);
    auto* request =
        new (grpc_call_arena_alloc(call, sizeof(RequestType))) RequestType();
    *status =
        grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
    buf.Release();
    if (status->ok()) {
      return request;
    }
    request->~RequestType();
    return nullptr;
  }

 private:
  std::function<ServerWriteReactor<ResponseType>*(grpc::CallbackServerContext*,
                                                  const RequestType*)>
      get_reactor_;

  class ServerCallbackWriterImpl : public ServerCallbackWriter<ResponseType> {
   public:
    void Finish(grpc::Status s) override {
      // A finish tag with only MaybeDone can have its callback inlined
      // regardless even if OnDone is not inlineable because this callback just
      // checks a ref and then decides whether or not to dispatch OnDone.
      finish_tag_.Set(
          call_.call(),
          [this](bool) {
            // Inlineable OnDone can be false here because there is
            // no write reactor that has an inlineable OnDone; this
            // only applies to the DefaultReactor (which is unary).
            this->MaybeDone(/*inlineable_ondone=*/false);
          },
          &finish_ops_, /*can_inline=*/true);
      finish_ops_.set_core_cq_tag(&finish_tag_);

      if (!ctx_->sent_initial_metadata_) {
        finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
                                        ctx_->initial_metadata_flags());
        if (ctx_->compression_level_set()) {
          finish_ops_.set_compression_level(ctx_->compression_level());
        }
        ctx_->sent_initial_metadata_ = true;
      }
      finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
      call_.PerformOps(&finish_ops_);
    }

    void SendInitialMetadata() override {
      ABSL_CHECK(!ctx_->sent_initial_metadata_);
      this->Ref();
      // The callback for this function should not be inlined because it invokes
      // a user-controlled reaction, but any resulting OnDone can be inlined in
      // the executor to which this callback is dispatched.
      meta_tag_.Set(
          call_.call(),
          [this](bool ok) {
            ServerWriteReactor<ResponseType>* reactor =
                reactor_.load(std::memory_order_relaxed);
            reactor->OnSendInitialMetadataDone(ok);
            this->MaybeDone(/*inlineable_ondone=*/true);
          },
          &meta_ops_, /*can_inline=*/false);
      meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
                                    ctx_->initial_metadata_flags());
      if (ctx_->compression_level_set()) {
        meta_ops_.set_compression_level(ctx_->compression_level());
      }
      ctx_->sent_initial_metadata_ = true;
      meta_ops_.set_core_cq_tag(&meta_tag_);
      call_.PerformOps(&meta_ops_);
    }

    void Write(const ResponseType* resp, grpc::WriteOptions options) override {
      this->Ref();
      if (options.is_last_message()) {
        options.set_buffer_hint();
      }
      if (!ctx_->sent_initial_metadata_) {
        write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
                                       ctx_->initial_metadata_flags());
        if (ctx_->compression_level_set()) {
          write_ops_.set_compression_level(ctx_->compression_level());
        }
        ctx_->sent_initial_metadata_ = true;
      }
      // TODO(vjpai): don't assert
      ABSL_CHECK(write_ops_.SendMessagePtr(resp, options).ok());
      call_.PerformOps(&write_ops_);
    }

    void WriteAndFinish(const ResponseType* resp, grpc::WriteOptions options,
                        grpc::Status s) override {
      // This combines the write into the finish callback
      // TODO(vjpai): don't assert
      ABSL_CHECK(finish_ops_.SendMessagePtr(resp, options).ok());
      Finish(std::move(s));
    }

   private:
    friend class CallbackServerStreamingHandler<RequestType, ResponseType>;

    ServerCallbackWriterImpl(grpc::CallbackServerContext* ctx,
                             grpc::internal::Call* call, const RequestType* req,
                             std::function<void()> call_requester)
        : ctx_(ctx),
          call_(*call),
          req_(req),
          call_requester_(std::move(call_requester)) {}

    grpc_call* call() override { return call_.call(); }

    void SetupReactor(ServerWriteReactor<ResponseType>* reactor) {
      reactor_.store(reactor, std::memory_order_relaxed);
      // The callback for this function should not be inlined because it invokes
      // a user-controlled reaction, but any resulting OnDone can be inlined in
      // the executor to which this callback is dispatched.
      write_tag_.Set(
          call_.call(),
          [this, reactor](bool ok) {
            reactor->OnWriteDone(ok);
            this->MaybeDone(/*inlineable_ondone=*/true);
          },
          &write_ops_, /*can_inline=*/false);
      write_ops_.set_core_cq_tag(&write_tag_);
      this->BindReactor(reactor);
      this->MaybeCallOnCancel(reactor);
      // Inlineable OnDone can be false here because there is no write
      // reactor that has an inlineable OnDone; this only applies to the
      // DefaultReactor (which is unary).
      this->MaybeDone(/*inlineable_ondone=*/false);
    }

    ~ServerCallbackWriterImpl() {
      if (req_ != nullptr) {
        req_->~RequestType();
      }
    }

    const RequestType* request() { return req_; }

    void CallOnDone() override {
      reactor_.load(std::memory_order_relaxed)->OnDone();
      grpc_call* call = call_.call();
      auto call_requester = std::move(call_requester_);
      if (ctx_->context_allocator() != nullptr) {
        ctx_->context_allocator()->Release(ctx_);
      }
      this->~ServerCallbackWriterImpl();  // explicitly call destructor
      grpc_call_unref(call);
      call_requester();
    }

    ServerReactor* reactor() override {
      return reactor_.load(std::memory_order_relaxed);
    }

    grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata>
        meta_ops_;
    grpc::internal::CallbackWithSuccessTag meta_tag_;
    grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
                              grpc::internal::CallOpSendMessage,
                              grpc::internal::CallOpServerSendStatus>
        finish_ops_;
    grpc::internal::CallbackWithSuccessTag finish_tag_;
    grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
                              grpc::internal::CallOpSendMessage>
        write_ops_;
    grpc::internal::CallbackWithSuccessTag write_tag_;

    grpc::CallbackServerContext* const ctx_;
    grpc::internal::Call call_;
    const RequestType* req_;
    std::function<void()> call_requester_;
    // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
    std::atomic<ServerWriteReactor<ResponseType>*> reactor_;
    // callbacks_outstanding_ follows a refcount pattern
    std::atomic<intptr_t> callbacks_outstanding_{
        3};  // reserve for OnStarted, Finish, and CompletionOp
  };
};

template <class RequestType, class ResponseType>
class CallbackBidiHandler : public grpc::internal::MethodHandler {
 public:
  explicit CallbackBidiHandler(
      std::function<ServerBidiReactor<RequestType, ResponseType>*(
          grpc::CallbackServerContext*)>
          get_reactor)
      : get_reactor_(std::move(get_reactor)) {}

  void RunHandler(const HandlerParameter& param) final {
    grpc_call_ref(param.call->call());

    auto* stream = new (grpc_call_arena_alloc(
        param.call->call(), sizeof(ServerCallbackReaderWriterImpl)))
        ServerCallbackReaderWriterImpl(
            static_cast<grpc::CallbackServerContext*>(param.server_context),
            param.call, param.call_requester);
    // Inlineable OnDone can be false in the CompletionOp callback because there
    // is no bidi reactor that has an inlineable OnDone; this only applies to
    // the DefaultReactor (which is unary).
    param.server_context->BeginCompletionOp(
        param.call,
        [stream](bool) { stream->MaybeDone(/*inlineable_ondone=*/false); },
        stream);

    ServerBidiReactor<RequestType, ResponseType>* reactor = nullptr;
    if (param.status.ok()) {
      reactor = grpc::internal::CatchingReactorGetter<
          ServerBidiReactor<RequestType, ResponseType>>(
          get_reactor_,
          static_cast<grpc::CallbackServerContext*>(param.server_context));
    }

    if (reactor == nullptr) {
      // if deserialization or reactor creator failed, we need to fail the call
      reactor = new (grpc_call_arena_alloc(
          param.call->call(),
          sizeof(UnimplementedBidiReactor<RequestType, ResponseType>)))
          UnimplementedBidiReactor<RequestType, ResponseType>(
              grpc::Status(grpc::StatusCode::UNIMPLEMENTED, ""));
    }

    stream->SetupReactor(reactor);
  }

 private:
  std::function<ServerBidiReactor<RequestType, ResponseType>*(
      grpc::CallbackServerContext*)>
      get_reactor_;

  class ServerCallbackReaderWriterImpl
      : public ServerCallbackReaderWriter<RequestType, ResponseType> {
   public:
    void Finish(grpc::Status s) override {
      // A finish tag with only MaybeDone can have its callback inlined
      // regardless even if OnDone is not inlineable because this callback just
      // checks a ref and then decides whether or not to dispatch OnDone.
      finish_tag_.Set(
          call_.call(),
          [this](bool) {
            // Inlineable OnDone can be false here because there is
            // no bidi reactor that has an inlineable OnDone; this
            // only applies to the DefaultReactor (which is unary).
            this->MaybeDone(/*inlineable_ondone=*/false);
          },
          &finish_ops_, /*can_inline=*/true);
      finish_ops_.set_core_cq_tag(&finish_tag_);

      if (!ctx_->sent_initial_metadata_) {
        finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
                                        ctx_->initial_metadata_flags());
        if (ctx_->compression_level_set()) {
          finish_ops_.set_compression_level(ctx_->compression_level());
        }
        ctx_->sent_initial_metadata_ = true;
      }
      finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s);
      call_.PerformOps(&finish_ops_);
    }

    void SendInitialMetadata() override {
      ABSL_CHECK(!ctx_->sent_initial_metadata_);
      this->Ref();
      // The callback for this function should not be inlined because it invokes
      // a user-controlled reaction, but any resulting OnDone can be inlined in
      // the executor to which this callback is dispatched.
      meta_tag_.Set(
          call_.call(),
          [this](bool ok) {
            ServerBidiReactor<RequestType, ResponseType>* reactor =
                reactor_.load(std::memory_order_relaxed);
            reactor->OnSendInitialMetadataDone(ok);
            this->MaybeDone(/*inlineable_ondone=*/true);
          },
          &meta_ops_, /*can_inline=*/false);
      meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
                                    ctx_->initial_metadata_flags());
      if (ctx_->compression_level_set()) {
        meta_ops_.set_compression_level(ctx_->compression_level());
      }
      ctx_->sent_initial_metadata_ = true;
      meta_ops_.set_core_cq_tag(&meta_tag_);
      call_.PerformOps(&meta_ops_);
    }

    void Write(const ResponseType* resp, grpc::WriteOptions options) override {
      this->Ref();
      if (options.is_last_message()) {
        options.set_buffer_hint();
      }
      if (!ctx_->sent_initial_metadata_) {
        write_ops_.SendInitialMetadata(&ctx_->initial_metadata_,
                                       ctx_->initial_metadata_flags());
        if (ctx_->compression_level_set()) {
          write_ops_.set_compression_level(ctx_->compression_level());
        }
        ctx_->sent_initial_metadata_ = true;
      }
      // TODO(vjpai): don't assert
      ABSL_CHECK(write_ops_.SendMessagePtr(resp, options).ok());
      call_.PerformOps(&write_ops_);
    }

    void WriteAndFinish(const ResponseType* resp, grpc::WriteOptions options,
                        grpc::Status s) override {
      // TODO(vjpai): don't assert
      ABSL_CHECK(finish_ops_.SendMessagePtr(resp, options).ok());
      Finish(std::move(s));
    }

    void Read(RequestType* req) override {
      this->Ref();
      read_ops_.RecvMessage(req);
      call_.PerformOps(&read_ops_);
    }

   private:
    friend class CallbackBidiHandler<RequestType, ResponseType>;

    ServerCallbackReaderWriterImpl(grpc::CallbackServerContext* ctx,
                                   grpc::internal::Call* call,
                                   std::function<void()> call_requester)
        : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {}

    grpc_call* call() override { return call_.call(); }

    void SetupReactor(ServerBidiReactor<RequestType, ResponseType>* reactor) {
      reactor_.store(reactor, std::memory_order_relaxed);
      // The callbacks for these functions should not be inlined because they
      // invoke user-controlled reactions, but any resulting OnDones can be
      // inlined in the executor to which a callback is dispatched.
      write_tag_.Set(
          call_.call(),
          [this, reactor](bool ok) {
            reactor->OnWriteDone(ok);
            this->MaybeDone(/*inlineable_ondone=*/true);
          },
          &write_ops_, /*can_inline=*/false);
      write_ops_.set_core_cq_tag(&write_tag_);
      read_tag_.Set(
          call_.call(),
          [this, reactor](bool ok) {
            if (GPR_UNLIKELY(!ok)) {
              ctx_->MaybeMarkCancelledOnRead();
            }
            reactor->OnReadDone(ok);
            this->MaybeDone(/*inlineable_ondone=*/true);
          },
          &read_ops_, /*can_inline=*/false);
      read_ops_.set_core_cq_tag(&read_tag_);
      this->BindReactor(reactor);
      this->MaybeCallOnCancel(reactor);
      // Inlineable OnDone can be false here because there is no bidi
      // reactor that has an inlineable OnDone; this only applies to the
      // DefaultReactor (which is unary).
      this->MaybeDone(/*inlineable_ondone=*/false);
    }

    void CallOnDone() override {
      reactor_.load(std::memory_order_relaxed)->OnDone();
      grpc_call* call = call_.call();
      auto call_requester = std::move(call_requester_);
      if (ctx_->context_allocator() != nullptr) {
        ctx_->context_allocator()->Release(ctx_);
      }
      this->~ServerCallbackReaderWriterImpl();  // explicitly call destructor
      grpc_call_unref(call);
      call_requester();
    }

    ServerReactor* reactor() override {
      return reactor_.load(std::memory_order_relaxed);
    }

    grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata>
        meta_ops_;
    grpc::internal::CallbackWithSuccessTag meta_tag_;
    grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
                              grpc::internal::CallOpSendMessage,
                              grpc::internal::CallOpServerSendStatus>
        finish_ops_;
    grpc::internal::CallbackWithSuccessTag finish_tag_;
    grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata,
                              grpc::internal::CallOpSendMessage>
        write_ops_;
    grpc::internal::CallbackWithSuccessTag write_tag_;
    grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<RequestType>>
        read_ops_;
    grpc::internal::CallbackWithSuccessTag read_tag_;

    grpc::CallbackServerContext* const ctx_;
    grpc::internal::Call call_;
    std::function<void()> call_requester_;
    // The memory ordering of reactor_ follows ServerCallbackUnaryImpl.
    std::atomic<ServerBidiReactor<RequestType, ResponseType>*> reactor_;
    // callbacks_outstanding_ follows a refcount pattern
    std::atomic<intptr_t> callbacks_outstanding_{
        3};  // reserve for OnStarted, Finish, and CompletionOp
  };
};

}  // namespace internal
}  // namespace grpc

#endif  // GRPCPP_IMPL_SERVER_CALLBACK_HANDLERS_H
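
// ---------------------------------------------------------------------------
// Usage sketch (illustrative only, kept entirely in comments; not part of the
// header's interface). These handlers are normally instantiated by grpc++
// generated code rather than written by hand. The pattern below mirrors that
// generated registration under assumed names: "Echo", "EchoRequest", and
// "EchoResponse" are hypothetical types from a user-defined proto service.
//
//   // In a generated CallbackService constructor, each callback method is
//   // registered with a CallbackUnaryHandler whose reactor getter forwards
//   // to the user-overridable virtual method:
//   ::grpc::Service::MarkMethodCallback(
//       /*index=*/0,
//       new ::grpc::internal::CallbackUnaryHandler<EchoRequest, EchoResponse>(
//           [this](::grpc::CallbackServerContext* context,
//                  const EchoRequest* request, EchoResponse* response) {
//             return this->Echo(context, request, response);
//           }));
//
//   // A user implementation then returns a ServerUnaryReactor; RunHandler
//   // and SetupReactor above drive it to completion and release the arena-
//   // allocated call object once all outstanding callbacks have finished:
//   ::grpc::ServerUnaryReactor* Echo(::grpc::CallbackServerContext* context,
//                                    const EchoRequest* request,
//                                    EchoResponse* response) /* override */ {
//     auto* reactor = context->DefaultReactor();
//     response->set_message(request->message());  // hypothetical proto fields
//     reactor->Finish(::grpc::Status::OK);
//     return reactor;
//   }
// ---------------------------------------------------------------------------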