1 /* 2 * 3 * Copyright 2019 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H 19 #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H 20 21 #include <grpcpp/impl/codegen/message_allocator.h> 22 #include <grpcpp/impl/codegen/rpc_service_method.h> 23 #include <grpcpp/impl/codegen/server_callback.h> 24 #include <grpcpp/impl/codegen/server_context.h> 25 #include <grpcpp/impl/codegen/status.h> 26 27 namespace grpc { 28 namespace internal { 29 30 template <class RequestType, class ResponseType> 31 class CallbackUnaryHandler : public ::grpc::internal::MethodHandler { 32 public: CallbackUnaryHandler(std::function<ServerUnaryReactor * (::grpc::CallbackServerContext *,const RequestType *,ResponseType *)> get_reactor)33 explicit CallbackUnaryHandler( 34 std::function<ServerUnaryReactor*(::grpc::CallbackServerContext*, 35 const RequestType*, ResponseType*)> 36 get_reactor) 37 : get_reactor_(std::move(get_reactor)) {} 38 SetMessageAllocator(::grpc::experimental::MessageAllocator<RequestType,ResponseType> * allocator)39 void SetMessageAllocator( 40 ::grpc::experimental::MessageAllocator<RequestType, ResponseType>* 41 allocator) { 42 allocator_ = allocator; 43 } 44 RunHandler(const HandlerParameter & param)45 void RunHandler(const HandlerParameter& param) final { 46 // Arena allocate a controller structure (that includes request/response) 47 ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call()); 48 auto* allocator_state = static_cast< 49 ::grpc::experimental::MessageHolder<RequestType, ResponseType>*>( 50 param.internal_data); 51 52 auto* call = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( 53 param.call->call(), sizeof(ServerCallbackUnaryImpl))) 54 ServerCallbackUnaryImpl( 55 static_cast<::grpc::CallbackServerContext*>(param.server_context), 56 param.call, allocator_state, param.call_requester); 57 param.server_context->BeginCompletionOp( 58 param.call, [call](bool) { call->MaybeDone(); }, call); 59 60 ServerUnaryReactor* reactor = nullptr; 61 if (param.status.ok()) { 62 reactor = ::grpc::internal::CatchingReactorGetter<ServerUnaryReactor>( 63 get_reactor_, 64 static_cast<::grpc::CallbackServerContext*>(param.server_context), 65 call->request(), call->response()); 66 } 67 68 if (reactor == nullptr) { 69 // if deserialization or reactor creator failed, we need to fail the call 70 reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( 71 param.call->call(), sizeof(UnimplementedUnaryReactor))) 72 UnimplementedUnaryReactor( 73 ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "")); 74 } 75 76 /// Invoke SetupReactor as the last part of the handler 77 call->SetupReactor(reactor); 78 } 79 Deserialize(grpc_call * call,grpc_byte_buffer * req,::grpc::Status * status,void ** handler_data)80 void* Deserialize(grpc_call* call, grpc_byte_buffer* req, 81 ::grpc::Status* status, void** handler_data) final { 82 ::grpc::ByteBuffer buf; 83 buf.set_buffer(req); 84 RequestType* request = nullptr; 85 ::grpc::experimental::MessageHolder<RequestType, ResponseType>* 86 allocator_state = nullptr; 87 if (allocator_ != nullptr) { 88 allocator_state = allocator_->AllocateMessages(); 89 } else { 90 allocator_state = 91 new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( 92 call, sizeof(DefaultMessageHolder<RequestType, ResponseType>))) 93 DefaultMessageHolder<RequestType, ResponseType>(); 94 } 95 *handler_data = allocator_state; 96 request = allocator_state->request(); 97 *status = 98 ::grpc::SerializationTraits<RequestType>::Deserialize(&buf, request); 99 buf.Release(); 100 if (status->ok()) { 101 return request; 102 } 103 // Clean up on deserialization failure. 104 allocator_state->Release(); 105 return nullptr; 106 } 107 108 private: 109 std::function<ServerUnaryReactor*(::grpc::CallbackServerContext*, 110 const RequestType*, ResponseType*)> 111 get_reactor_; 112 ::grpc::experimental::MessageAllocator<RequestType, ResponseType>* 113 allocator_ = nullptr; 114 115 class ServerCallbackUnaryImpl : public ServerCallbackUnary { 116 public: Finish(::grpc::Status s)117 void Finish(::grpc::Status s) override { 118 // A callback that only contains a call to MaybeDone can be run as an 119 // inline callback regardless of whether or not OnDone is inlineable 120 // because if the actual OnDone callback needs to be scheduled, MaybeDone 121 // is responsible for dispatching to an executor thread if needed. Thus, 122 // when setting up the finish_tag_, we can set its own callback to 123 // inlineable. 124 finish_tag_.Set( 125 call_.call(), 126 [this](bool) { 127 this->MaybeDone( 128 reactor_.load(std::memory_order_relaxed)->InternalInlineable()); 129 }, 130 &finish_ops_, /*can_inline=*/true); 131 finish_ops_.set_core_cq_tag(&finish_tag_); 132 133 if (!ctx_->sent_initial_metadata_) { 134 finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 135 ctx_->initial_metadata_flags()); 136 if (ctx_->compression_level_set()) { 137 finish_ops_.set_compression_level(ctx_->compression_level()); 138 } 139 ctx_->sent_initial_metadata_ = true; 140 } 141 // The response is dropped if the status is not OK. 142 if (s.ok()) { 143 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, 144 finish_ops_.SendMessagePtr(response())); 145 } else { 146 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s); 147 } 148 finish_ops_.set_core_cq_tag(&finish_tag_); 149 call_.PerformOps(&finish_ops_); 150 } 151 SendInitialMetadata()152 void SendInitialMetadata() override { 153 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); 154 this->Ref(); 155 // The callback for this function should not be marked inline because it 156 // is directly invoking a user-controlled reaction 157 // (OnSendInitialMetadataDone). Thus it must be dispatched to an executor 158 // thread. However, any OnDone needed after that can be inlined because it 159 // is already running on an executor thread. 160 meta_tag_.Set( 161 call_.call(), 162 [this](bool ok) { 163 ServerUnaryReactor* reactor = 164 reactor_.load(std::memory_order_relaxed); 165 reactor->OnSendInitialMetadataDone(ok); 166 this->MaybeDone(/*inlineable_ondone=*/true); 167 }, 168 &meta_ops_, /*can_inline=*/false); 169 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 170 ctx_->initial_metadata_flags()); 171 if (ctx_->compression_level_set()) { 172 meta_ops_.set_compression_level(ctx_->compression_level()); 173 } 174 ctx_->sent_initial_metadata_ = true; 175 meta_ops_.set_core_cq_tag(&meta_tag_); 176 call_.PerformOps(&meta_ops_); 177 } 178 179 private: 180 friend class CallbackUnaryHandler<RequestType, ResponseType>; 181 ServerCallbackUnaryImpl(::grpc::CallbackServerContext * ctx,::grpc::internal::Call * call,::grpc::experimental::MessageHolder<RequestType,ResponseType> * allocator_state,std::function<void ()> call_requester)182 ServerCallbackUnaryImpl( 183 ::grpc::CallbackServerContext* ctx, ::grpc::internal::Call* call, 184 ::grpc::experimental::MessageHolder<RequestType, ResponseType>* 185 allocator_state, 186 std::function<void()> call_requester) 187 : ctx_(ctx), 188 call_(*call), 189 allocator_state_(allocator_state), 190 call_requester_(std::move(call_requester)) { 191 ctx_->set_message_allocator_state(allocator_state); 192 } 193 194 /// SetupReactor binds the reactor (which also releases any queued 195 /// operations), maybe calls OnCancel if possible/needed, and maybe marks 196 /// the completion of the RPC. This should be the last component of the 197 /// handler. SetupReactor(ServerUnaryReactor * reactor)198 void SetupReactor(ServerUnaryReactor* reactor) { 199 reactor_.store(reactor, std::memory_order_relaxed); 200 this->BindReactor(reactor); 201 this->MaybeCallOnCancel(reactor); 202 this->MaybeDone(reactor->InternalInlineable()); 203 } 204 request()205 const RequestType* request() { return allocator_state_->request(); } response()206 ResponseType* response() { return allocator_state_->response(); } 207 CallOnDone()208 void CallOnDone() override { 209 reactor_.load(std::memory_order_relaxed)->OnDone(); 210 grpc_call* call = call_.call(); 211 auto call_requester = std::move(call_requester_); 212 allocator_state_->Release(); 213 if (ctx_->context_allocator() != nullptr) { 214 ctx_->context_allocator()->Release(ctx_); 215 } 216 this->~ServerCallbackUnaryImpl(); // explicitly call destructor 217 ::grpc::g_core_codegen_interface->grpc_call_unref(call); 218 call_requester(); 219 } 220 reactor()221 ServerReactor* reactor() override { 222 return reactor_.load(std::memory_order_relaxed); 223 } 224 225 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> 226 meta_ops_; 227 ::grpc::internal::CallbackWithSuccessTag meta_tag_; 228 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 229 ::grpc::internal::CallOpSendMessage, 230 ::grpc::internal::CallOpServerSendStatus> 231 finish_ops_; 232 ::grpc::internal::CallbackWithSuccessTag finish_tag_; 233 234 ::grpc::CallbackServerContext* const ctx_; 235 ::grpc::internal::Call call_; 236 ::grpc::experimental::MessageHolder<RequestType, ResponseType>* const 237 allocator_state_; 238 std::function<void()> call_requester_; 239 // reactor_ can always be loaded/stored with relaxed memory ordering because 240 // its value is only set once, independently of other data in the object, 241 // and the loads that use it will always actually come provably later even 242 // though they are from different threads since they are triggered by 243 // actions initiated only by the setting up of the reactor_ variable. In 244 // a sense, it's a delayed "const": it gets its value from the SetupReactor 245 // method (not the constructor, so it's not a true const), but it doesn't 246 // change after that and it only gets used by actions caused, directly or 247 // indirectly, by that setup. This comment also applies to the reactor_ 248 // variables of the other streaming objects in this file. 249 std::atomic<ServerUnaryReactor*> reactor_; 250 // callbacks_outstanding_ follows a refcount pattern 251 std::atomic<intptr_t> callbacks_outstanding_{ 252 3}; // reserve for start, Finish, and CompletionOp 253 }; 254 }; 255 256 template <class RequestType, class ResponseType> 257 class CallbackClientStreamingHandler : public ::grpc::internal::MethodHandler { 258 public: CallbackClientStreamingHandler(std::function<ServerReadReactor<RequestType> * (::grpc::CallbackServerContext *,ResponseType *)> get_reactor)259 explicit CallbackClientStreamingHandler( 260 std::function<ServerReadReactor<RequestType>*( 261 ::grpc::CallbackServerContext*, ResponseType*)> 262 get_reactor) 263 : get_reactor_(std::move(get_reactor)) {} RunHandler(const HandlerParameter & param)264 void RunHandler(const HandlerParameter& param) final { 265 // Arena allocate a reader structure (that includes response) 266 ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call()); 267 268 auto* reader = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( 269 param.call->call(), sizeof(ServerCallbackReaderImpl))) 270 ServerCallbackReaderImpl( 271 static_cast<::grpc::CallbackServerContext*>(param.server_context), 272 param.call, param.call_requester); 273 // Inlineable OnDone can be false in the CompletionOp callback because there 274 // is no read reactor that has an inlineable OnDone; this only applies to 275 // the DefaultReactor (which is unary). 276 param.server_context->BeginCompletionOp( 277 param.call, 278 [reader](bool) { reader->MaybeDone(/*inlineable_ondone=*/false); }, 279 reader); 280 281 ServerReadReactor<RequestType>* reactor = nullptr; 282 if (param.status.ok()) { 283 reactor = ::grpc::internal::CatchingReactorGetter< 284 ServerReadReactor<RequestType>>( 285 get_reactor_, 286 static_cast<::grpc::CallbackServerContext*>(param.server_context), 287 reader->response()); 288 } 289 290 if (reactor == nullptr) { 291 // if deserialization or reactor creator failed, we need to fail the call 292 reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( 293 param.call->call(), sizeof(UnimplementedReadReactor<RequestType>))) 294 UnimplementedReadReactor<RequestType>( 295 ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "")); 296 } 297 298 reader->SetupReactor(reactor); 299 } 300 301 private: 302 std::function<ServerReadReactor<RequestType>*(::grpc::CallbackServerContext*, 303 ResponseType*)> 304 get_reactor_; 305 306 class ServerCallbackReaderImpl : public ServerCallbackReader<RequestType> { 307 public: Finish(::grpc::Status s)308 void Finish(::grpc::Status s) override { 309 // A finish tag with only MaybeDone can have its callback inlined 310 // regardless even if OnDone is not inlineable because this callback just 311 // checks a ref and then decides whether or not to dispatch OnDone. 312 finish_tag_.Set( 313 call_.call(), 314 [this](bool) { 315 // Inlineable OnDone can be false here because there is 316 // no read reactor that has an inlineable OnDone; this 317 // only applies to the DefaultReactor (which is unary). 318 this->MaybeDone(/*inlineable_ondone=*/false); 319 }, 320 &finish_ops_, /*can_inline=*/true); 321 if (!ctx_->sent_initial_metadata_) { 322 finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 323 ctx_->initial_metadata_flags()); 324 if (ctx_->compression_level_set()) { 325 finish_ops_.set_compression_level(ctx_->compression_level()); 326 } 327 ctx_->sent_initial_metadata_ = true; 328 } 329 // The response is dropped if the status is not OK. 330 if (s.ok()) { 331 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, 332 finish_ops_.SendMessagePtr(&resp_)); 333 } else { 334 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s); 335 } 336 finish_ops_.set_core_cq_tag(&finish_tag_); 337 call_.PerformOps(&finish_ops_); 338 } 339 SendInitialMetadata()340 void SendInitialMetadata() override { 341 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); 342 this->Ref(); 343 // The callback for this function should not be inlined because it invokes 344 // a user-controlled reaction, but any resulting OnDone can be inlined in 345 // the executor to which this callback is dispatched. 346 meta_tag_.Set( 347 call_.call(), 348 [this](bool ok) { 349 ServerReadReactor<RequestType>* reactor = 350 reactor_.load(std::memory_order_relaxed); 351 reactor->OnSendInitialMetadataDone(ok); 352 this->MaybeDone(/*inlineable_ondone=*/true); 353 }, 354 &meta_ops_, /*can_inline=*/false); 355 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 356 ctx_->initial_metadata_flags()); 357 if (ctx_->compression_level_set()) { 358 meta_ops_.set_compression_level(ctx_->compression_level()); 359 } 360 ctx_->sent_initial_metadata_ = true; 361 meta_ops_.set_core_cq_tag(&meta_tag_); 362 call_.PerformOps(&meta_ops_); 363 } 364 Read(RequestType * req)365 void Read(RequestType* req) override { 366 this->Ref(); 367 read_ops_.RecvMessage(req); 368 call_.PerformOps(&read_ops_); 369 } 370 371 private: 372 friend class CallbackClientStreamingHandler<RequestType, ResponseType>; 373 ServerCallbackReaderImpl(::grpc::CallbackServerContext * ctx,::grpc::internal::Call * call,std::function<void ()> call_requester)374 ServerCallbackReaderImpl(::grpc::CallbackServerContext* ctx, 375 ::grpc::internal::Call* call, 376 std::function<void()> call_requester) 377 : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {} 378 SetupReactor(ServerReadReactor<RequestType> * reactor)379 void SetupReactor(ServerReadReactor<RequestType>* reactor) { 380 reactor_.store(reactor, std::memory_order_relaxed); 381 // The callback for this function should not be inlined because it invokes 382 // a user-controlled reaction, but any resulting OnDone can be inlined in 383 // the executor to which this callback is dispatched. 384 read_tag_.Set( 385 call_.call(), 386 [this, reactor](bool ok) { 387 reactor->OnReadDone(ok); 388 this->MaybeDone(/*inlineable_ondone=*/true); 389 }, 390 &read_ops_, /*can_inline=*/false); 391 read_ops_.set_core_cq_tag(&read_tag_); 392 this->BindReactor(reactor); 393 this->MaybeCallOnCancel(reactor); 394 // Inlineable OnDone can be false here because there is no read 395 // reactor that has an inlineable OnDone; this only applies to the 396 // DefaultReactor (which is unary). 397 this->MaybeDone(/*inlineable_ondone=*/false); 398 } 399 ~ServerCallbackReaderImpl()400 ~ServerCallbackReaderImpl() {} 401 response()402 ResponseType* response() { return &resp_; } 403 CallOnDone()404 void CallOnDone() override { 405 reactor_.load(std::memory_order_relaxed)->OnDone(); 406 grpc_call* call = call_.call(); 407 auto call_requester = std::move(call_requester_); 408 if (ctx_->context_allocator() != nullptr) { 409 ctx_->context_allocator()->Release(ctx_); 410 } 411 this->~ServerCallbackReaderImpl(); // explicitly call destructor 412 ::grpc::g_core_codegen_interface->grpc_call_unref(call); 413 call_requester(); 414 } 415 reactor()416 ServerReactor* reactor() override { 417 return reactor_.load(std::memory_order_relaxed); 418 } 419 420 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> 421 meta_ops_; 422 ::grpc::internal::CallbackWithSuccessTag meta_tag_; 423 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 424 ::grpc::internal::CallOpSendMessage, 425 ::grpc::internal::CallOpServerSendStatus> 426 finish_ops_; 427 ::grpc::internal::CallbackWithSuccessTag finish_tag_; 428 ::grpc::internal::CallOpSet< 429 ::grpc::internal::CallOpRecvMessage<RequestType>> 430 read_ops_; 431 ::grpc::internal::CallbackWithSuccessTag read_tag_; 432 433 ::grpc::CallbackServerContext* const ctx_; 434 ::grpc::internal::Call call_; 435 ResponseType resp_; 436 std::function<void()> call_requester_; 437 // The memory ordering of reactor_ follows ServerCallbackUnaryImpl. 438 std::atomic<ServerReadReactor<RequestType>*> reactor_; 439 // callbacks_outstanding_ follows a refcount pattern 440 std::atomic<intptr_t> callbacks_outstanding_{ 441 3}; // reserve for OnStarted, Finish, and CompletionOp 442 }; 443 }; 444 445 template <class RequestType, class ResponseType> 446 class CallbackServerStreamingHandler : public ::grpc::internal::MethodHandler { 447 public: CallbackServerStreamingHandler(std::function<ServerWriteReactor<ResponseType> * (::grpc::CallbackServerContext *,const RequestType *)> get_reactor)448 explicit CallbackServerStreamingHandler( 449 std::function<ServerWriteReactor<ResponseType>*( 450 ::grpc::CallbackServerContext*, const RequestType*)> 451 get_reactor) 452 : get_reactor_(std::move(get_reactor)) {} RunHandler(const HandlerParameter & param)453 void RunHandler(const HandlerParameter& param) final { 454 // Arena allocate a writer structure 455 ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call()); 456 457 auto* writer = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( 458 param.call->call(), sizeof(ServerCallbackWriterImpl))) 459 ServerCallbackWriterImpl( 460 static_cast<::grpc::CallbackServerContext*>(param.server_context), 461 param.call, static_cast<RequestType*>(param.request), 462 param.call_requester); 463 // Inlineable OnDone can be false in the CompletionOp callback because there 464 // is no write reactor that has an inlineable OnDone; this only applies to 465 // the DefaultReactor (which is unary). 466 param.server_context->BeginCompletionOp( 467 param.call, 468 [writer](bool) { writer->MaybeDone(/*inlineable_ondone=*/false); }, 469 writer); 470 471 ServerWriteReactor<ResponseType>* reactor = nullptr; 472 if (param.status.ok()) { 473 reactor = ::grpc::internal::CatchingReactorGetter< 474 ServerWriteReactor<ResponseType>>( 475 get_reactor_, 476 static_cast<::grpc::CallbackServerContext*>(param.server_context), 477 writer->request()); 478 } 479 if (reactor == nullptr) { 480 // if deserialization or reactor creator failed, we need to fail the call 481 reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( 482 param.call->call(), sizeof(UnimplementedWriteReactor<ResponseType>))) 483 UnimplementedWriteReactor<ResponseType>( 484 ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "")); 485 } 486 487 writer->SetupReactor(reactor); 488 } 489 Deserialize(grpc_call * call,grpc_byte_buffer * req,::grpc::Status * status,void **)490 void* Deserialize(grpc_call* call, grpc_byte_buffer* req, 491 ::grpc::Status* status, void** /*handler_data*/) final { 492 ::grpc::ByteBuffer buf; 493 buf.set_buffer(req); 494 auto* request = 495 new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( 496 call, sizeof(RequestType))) RequestType(); 497 *status = 498 ::grpc::SerializationTraits<RequestType>::Deserialize(&buf, request); 499 buf.Release(); 500 if (status->ok()) { 501 return request; 502 } 503 request->~RequestType(); 504 return nullptr; 505 } 506 507 private: 508 std::function<ServerWriteReactor<ResponseType>*( 509 ::grpc::CallbackServerContext*, const RequestType*)> 510 get_reactor_; 511 512 class ServerCallbackWriterImpl : public ServerCallbackWriter<ResponseType> { 513 public: Finish(::grpc::Status s)514 void Finish(::grpc::Status s) override { 515 // A finish tag with only MaybeDone can have its callback inlined 516 // regardless even if OnDone is not inlineable because this callback just 517 // checks a ref and then decides whether or not to dispatch OnDone. 518 finish_tag_.Set( 519 call_.call(), 520 [this](bool) { 521 // Inlineable OnDone can be false here because there is 522 // no write reactor that has an inlineable OnDone; this 523 // only applies to the DefaultReactor (which is unary). 524 this->MaybeDone(/*inlineable_ondone=*/false); 525 }, 526 &finish_ops_, /*can_inline=*/true); 527 finish_ops_.set_core_cq_tag(&finish_tag_); 528 529 if (!ctx_->sent_initial_metadata_) { 530 finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 531 ctx_->initial_metadata_flags()); 532 if (ctx_->compression_level_set()) { 533 finish_ops_.set_compression_level(ctx_->compression_level()); 534 } 535 ctx_->sent_initial_metadata_ = true; 536 } 537 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s); 538 call_.PerformOps(&finish_ops_); 539 } 540 SendInitialMetadata()541 void SendInitialMetadata() override { 542 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); 543 this->Ref(); 544 // The callback for this function should not be inlined because it invokes 545 // a user-controlled reaction, but any resulting OnDone can be inlined in 546 // the executor to which this callback is dispatched. 547 meta_tag_.Set( 548 call_.call(), 549 [this](bool ok) { 550 ServerWriteReactor<ResponseType>* reactor = 551 reactor_.load(std::memory_order_relaxed); 552 reactor->OnSendInitialMetadataDone(ok); 553 this->MaybeDone(/*inlineable_ondone=*/true); 554 }, 555 &meta_ops_, /*can_inline=*/false); 556 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 557 ctx_->initial_metadata_flags()); 558 if (ctx_->compression_level_set()) { 559 meta_ops_.set_compression_level(ctx_->compression_level()); 560 } 561 ctx_->sent_initial_metadata_ = true; 562 meta_ops_.set_core_cq_tag(&meta_tag_); 563 call_.PerformOps(&meta_ops_); 564 } 565 Write(const ResponseType * resp,::grpc::WriteOptions options)566 void Write(const ResponseType* resp, 567 ::grpc::WriteOptions options) override { 568 this->Ref(); 569 if (options.is_last_message()) { 570 options.set_buffer_hint(); 571 } 572 if (!ctx_->sent_initial_metadata_) { 573 write_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 574 ctx_->initial_metadata_flags()); 575 if (ctx_->compression_level_set()) { 576 write_ops_.set_compression_level(ctx_->compression_level()); 577 } 578 ctx_->sent_initial_metadata_ = true; 579 } 580 // TODO(vjpai): don't assert 581 GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok()); 582 call_.PerformOps(&write_ops_); 583 } 584 WriteAndFinish(const ResponseType * resp,::grpc::WriteOptions options,::grpc::Status s)585 void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options, 586 ::grpc::Status s) override { 587 // This combines the write into the finish callback 588 // TODO(vjpai): don't assert 589 GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok()); 590 Finish(std::move(s)); 591 } 592 593 private: 594 friend class CallbackServerStreamingHandler<RequestType, ResponseType>; 595 ServerCallbackWriterImpl(::grpc::CallbackServerContext * ctx,::grpc::internal::Call * call,const RequestType * req,std::function<void ()> call_requester)596 ServerCallbackWriterImpl(::grpc::CallbackServerContext* ctx, 597 ::grpc::internal::Call* call, 598 const RequestType* req, 599 std::function<void()> call_requester) 600 : ctx_(ctx), 601 call_(*call), 602 req_(req), 603 call_requester_(std::move(call_requester)) {} 604 SetupReactor(ServerWriteReactor<ResponseType> * reactor)605 void SetupReactor(ServerWriteReactor<ResponseType>* reactor) { 606 reactor_.store(reactor, std::memory_order_relaxed); 607 // The callback for this function should not be inlined because it invokes 608 // a user-controlled reaction, but any resulting OnDone can be inlined in 609 // the executor to which this callback is dispatched. 610 write_tag_.Set( 611 call_.call(), 612 [this, reactor](bool ok) { 613 reactor->OnWriteDone(ok); 614 this->MaybeDone(/*inlineable_ondone=*/true); 615 }, 616 &write_ops_, /*can_inline=*/false); 617 write_ops_.set_core_cq_tag(&write_tag_); 618 this->BindReactor(reactor); 619 this->MaybeCallOnCancel(reactor); 620 // Inlineable OnDone can be false here because there is no write 621 // reactor that has an inlineable OnDone; this only applies to the 622 // DefaultReactor (which is unary). 623 this->MaybeDone(/*inlineable_ondone=*/false); 624 } ~ServerCallbackWriterImpl()625 ~ServerCallbackWriterImpl() { 626 if (req_ != nullptr) { 627 req_->~RequestType(); 628 } 629 } 630 request()631 const RequestType* request() { return req_; } 632 CallOnDone()633 void CallOnDone() override { 634 reactor_.load(std::memory_order_relaxed)->OnDone(); 635 grpc_call* call = call_.call(); 636 auto call_requester = std::move(call_requester_); 637 if (ctx_->context_allocator() != nullptr) { 638 ctx_->context_allocator()->Release(ctx_); 639 } 640 this->~ServerCallbackWriterImpl(); // explicitly call destructor 641 ::grpc::g_core_codegen_interface->grpc_call_unref(call); 642 call_requester(); 643 } 644 reactor()645 ServerReactor* reactor() override { 646 return reactor_.load(std::memory_order_relaxed); 647 } 648 649 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> 650 meta_ops_; 651 ::grpc::internal::CallbackWithSuccessTag meta_tag_; 652 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 653 ::grpc::internal::CallOpSendMessage, 654 ::grpc::internal::CallOpServerSendStatus> 655 finish_ops_; 656 ::grpc::internal::CallbackWithSuccessTag finish_tag_; 657 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 658 ::grpc::internal::CallOpSendMessage> 659 write_ops_; 660 ::grpc::internal::CallbackWithSuccessTag write_tag_; 661 662 ::grpc::CallbackServerContext* const ctx_; 663 ::grpc::internal::Call call_; 664 const RequestType* req_; 665 std::function<void()> call_requester_; 666 // The memory ordering of reactor_ follows ServerCallbackUnaryImpl. 667 std::atomic<ServerWriteReactor<ResponseType>*> reactor_; 668 // callbacks_outstanding_ follows a refcount pattern 669 std::atomic<intptr_t> callbacks_outstanding_{ 670 3}; // reserve for OnStarted, Finish, and CompletionOp 671 }; 672 }; 673 674 template <class RequestType, class ResponseType> 675 class CallbackBidiHandler : public ::grpc::internal::MethodHandler { 676 public: CallbackBidiHandler(std::function<ServerBidiReactor<RequestType,ResponseType> * (::grpc::CallbackServerContext *)> get_reactor)677 explicit CallbackBidiHandler( 678 std::function<ServerBidiReactor<RequestType, ResponseType>*( 679 ::grpc::CallbackServerContext*)> 680 get_reactor) 681 : get_reactor_(std::move(get_reactor)) {} RunHandler(const HandlerParameter & param)682 void RunHandler(const HandlerParameter& param) final { 683 ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call()); 684 685 auto* stream = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( 686 param.call->call(), sizeof(ServerCallbackReaderWriterImpl))) 687 ServerCallbackReaderWriterImpl( 688 static_cast<::grpc::CallbackServerContext*>(param.server_context), 689 param.call, param.call_requester); 690 // Inlineable OnDone can be false in the CompletionOp callback because there 691 // is no bidi reactor that has an inlineable OnDone; this only applies to 692 // the DefaultReactor (which is unary). 693 param.server_context->BeginCompletionOp( 694 param.call, 695 [stream](bool) { stream->MaybeDone(/*inlineable_ondone=*/false); }, 696 stream); 697 698 ServerBidiReactor<RequestType, ResponseType>* reactor = nullptr; 699 if (param.status.ok()) { 700 reactor = ::grpc::internal::CatchingReactorGetter< 701 ServerBidiReactor<RequestType, ResponseType>>( 702 get_reactor_, 703 static_cast<::grpc::CallbackServerContext*>(param.server_context)); 704 } 705 706 if (reactor == nullptr) { 707 // if deserialization or reactor creator failed, we need to fail the call 708 reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( 709 param.call->call(), 710 sizeof(UnimplementedBidiReactor<RequestType, ResponseType>))) 711 UnimplementedBidiReactor<RequestType, ResponseType>( 712 ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "")); 713 } 714 715 stream->SetupReactor(reactor); 716 } 717 718 private: 719 std::function<ServerBidiReactor<RequestType, ResponseType>*( 720 ::grpc::CallbackServerContext*)> 721 get_reactor_; 722 723 class ServerCallbackReaderWriterImpl 724 : public ServerCallbackReaderWriter<RequestType, ResponseType> { 725 public: Finish(::grpc::Status s)726 void Finish(::grpc::Status s) override { 727 // A finish tag with only MaybeDone can have its callback inlined 728 // regardless even if OnDone is not inlineable because this callback just 729 // checks a ref and then decides whether or not to dispatch OnDone. 730 finish_tag_.Set( 731 call_.call(), 732 [this](bool) { 733 // Inlineable OnDone can be false here because there is 734 // no bidi reactor that has an inlineable OnDone; this 735 // only applies to the DefaultReactor (which is unary). 736 this->MaybeDone(/*inlineable_ondone=*/false); 737 }, 738 &finish_ops_, /*can_inline=*/true); 739 finish_ops_.set_core_cq_tag(&finish_tag_); 740 741 if (!ctx_->sent_initial_metadata_) { 742 finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 743 ctx_->initial_metadata_flags()); 744 if (ctx_->compression_level_set()) { 745 finish_ops_.set_compression_level(ctx_->compression_level()); 746 } 747 ctx_->sent_initial_metadata_ = true; 748 } 749 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s); 750 call_.PerformOps(&finish_ops_); 751 } 752 SendInitialMetadata()753 void SendInitialMetadata() override { 754 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); 755 this->Ref(); 756 // The callback for this function should not be inlined because it invokes 757 // a user-controlled reaction, but any resulting OnDone can be inlined in 758 // the executor to which this callback is dispatched. 759 meta_tag_.Set( 760 call_.call(), 761 [this](bool ok) { 762 ServerBidiReactor<RequestType, ResponseType>* reactor = 763 reactor_.load(std::memory_order_relaxed); 764 reactor->OnSendInitialMetadataDone(ok); 765 this->MaybeDone(/*inlineable_ondone=*/true); 766 }, 767 &meta_ops_, /*can_inline=*/false); 768 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 769 ctx_->initial_metadata_flags()); 770 if (ctx_->compression_level_set()) { 771 meta_ops_.set_compression_level(ctx_->compression_level()); 772 } 773 ctx_->sent_initial_metadata_ = true; 774 meta_ops_.set_core_cq_tag(&meta_tag_); 775 call_.PerformOps(&meta_ops_); 776 } 777 Write(const ResponseType * resp,::grpc::WriteOptions options)778 void Write(const ResponseType* resp, 779 ::grpc::WriteOptions options) override { 780 this->Ref(); 781 if (options.is_last_message()) { 782 options.set_buffer_hint(); 783 } 784 if (!ctx_->sent_initial_metadata_) { 785 write_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 786 ctx_->initial_metadata_flags()); 787 if (ctx_->compression_level_set()) { 788 write_ops_.set_compression_level(ctx_->compression_level()); 789 } 790 ctx_->sent_initial_metadata_ = true; 791 } 792 // TODO(vjpai): don't assert 793 GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok()); 794 call_.PerformOps(&write_ops_); 795 } 796 WriteAndFinish(const ResponseType * resp,::grpc::WriteOptions options,::grpc::Status s)797 void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options, 798 ::grpc::Status s) override { 799 // TODO(vjpai): don't assert 800 GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok()); 801 Finish(std::move(s)); 802 } 803 Read(RequestType * req)804 void Read(RequestType* req) override { 805 this->Ref(); 806 read_ops_.RecvMessage(req); 807 call_.PerformOps(&read_ops_); 808 } 809 810 private: 811 friend class CallbackBidiHandler<RequestType, ResponseType>; 812 ServerCallbackReaderWriterImpl(::grpc::CallbackServerContext * ctx,::grpc::internal::Call * call,std::function<void ()> call_requester)813 ServerCallbackReaderWriterImpl(::grpc::CallbackServerContext* ctx, 814 ::grpc::internal::Call* call, 815 std::function<void()> call_requester) 816 : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {} 817 SetupReactor(ServerBidiReactor<RequestType,ResponseType> * reactor)818 void SetupReactor(ServerBidiReactor<RequestType, ResponseType>* reactor) { 819 reactor_.store(reactor, std::memory_order_relaxed); 820 // The callbacks for these functions should not be inlined because they 821 // invoke user-controlled reactions, but any resulting OnDones can be 822 // inlined in the executor to which a callback is dispatched. 823 write_tag_.Set( 824 call_.call(), 825 [this, reactor](bool ok) { 826 reactor->OnWriteDone(ok); 827 this->MaybeDone(/*inlineable_ondone=*/true); 828 }, 829 &write_ops_, /*can_inline=*/false); 830 write_ops_.set_core_cq_tag(&write_tag_); 831 read_tag_.Set( 832 call_.call(), 833 [this, reactor](bool ok) { 834 reactor->OnReadDone(ok); 835 this->MaybeDone(/*inlineable_ondone=*/true); 836 }, 837 &read_ops_, /*can_inline=*/false); 838 read_ops_.set_core_cq_tag(&read_tag_); 839 this->BindReactor(reactor); 840 this->MaybeCallOnCancel(reactor); 841 // Inlineable OnDone can be false here because there is no bidi 842 // reactor that has an inlineable OnDone; this only applies to the 843 // DefaultReactor (which is unary). 844 this->MaybeDone(/*inlineable_ondone=*/false); 845 } 846 CallOnDone()847 void CallOnDone() override { 848 reactor_.load(std::memory_order_relaxed)->OnDone(); 849 grpc_call* call = call_.call(); 850 auto call_requester = std::move(call_requester_); 851 if (ctx_->context_allocator() != nullptr) { 852 ctx_->context_allocator()->Release(ctx_); 853 } 854 this->~ServerCallbackReaderWriterImpl(); // explicitly call destructor 855 ::grpc::g_core_codegen_interface->grpc_call_unref(call); 856 call_requester(); 857 } 858 reactor()859 ServerReactor* reactor() override { 860 return reactor_.load(std::memory_order_relaxed); 861 } 862 863 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> 864 meta_ops_; 865 ::grpc::internal::CallbackWithSuccessTag meta_tag_; 866 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 867 ::grpc::internal::CallOpSendMessage, 868 ::grpc::internal::CallOpServerSendStatus> 869 finish_ops_; 870 ::grpc::internal::CallbackWithSuccessTag finish_tag_; 871 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 872 ::grpc::internal::CallOpSendMessage> 873 write_ops_; 874 ::grpc::internal::CallbackWithSuccessTag write_tag_; 875 ::grpc::internal::CallOpSet< 876 ::grpc::internal::CallOpRecvMessage<RequestType>> 877 read_ops_; 878 ::grpc::internal::CallbackWithSuccessTag read_tag_; 879 880 ::grpc::CallbackServerContext* const ctx_; 881 ::grpc::internal::Call call_; 882 std::function<void()> call_requester_; 883 // The memory ordering of reactor_ follows ServerCallbackUnaryImpl. 884 std::atomic<ServerBidiReactor<RequestType, ResponseType>*> reactor_; 885 // callbacks_outstanding_ follows a refcount pattern 886 std::atomic<intptr_t> callbacks_outstanding_{ 887 3}; // reserve for OnStarted, Finish, and CompletionOp 888 }; 889 }; 890 891 } // namespace internal 892 } // namespace grpc 893 894 #endif // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H 895