1 /* 2 * 3 * Copyright 2019 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H 19 #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H 20 21 #include <grpcpp/impl/codegen/message_allocator.h> 22 #include <grpcpp/impl/codegen/rpc_service_method.h> 23 #include <grpcpp/impl/codegen/server_callback_impl.h> 24 #include <grpcpp/impl/codegen/server_context_impl.h> 25 #include <grpcpp/impl/codegen/status.h> 26 27 namespace grpc_impl { 28 namespace internal { 29 30 template <class RequestType, class ResponseType> 31 class CallbackUnaryHandler : public ::grpc::internal::MethodHandler { 32 public: CallbackUnaryHandler(std::function<ServerUnaryReactor * (::grpc_impl::CallbackServerContext *,const RequestType *,ResponseType *)> get_reactor)33 explicit CallbackUnaryHandler( 34 std::function<ServerUnaryReactor*(::grpc_impl::CallbackServerContext*, 35 const RequestType*, ResponseType*)> 36 get_reactor) 37 : get_reactor_(std::move(get_reactor)) {} 38 SetMessageAllocator(::grpc::experimental::MessageAllocator<RequestType,ResponseType> * allocator)39 void SetMessageAllocator( 40 ::grpc::experimental::MessageAllocator<RequestType, ResponseType>* 41 allocator) { 42 allocator_ = allocator; 43 } 44 RunHandler(const HandlerParameter & param)45 void RunHandler(const HandlerParameter& param) final { 46 // Arena allocate a controller structure (that includes request/response) 47 ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call()); 48 auto* allocator_state = static_cast< 49 ::grpc::experimental::MessageHolder<RequestType, ResponseType>*>( 50 param.internal_data); 51 52 auto* call = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( 53 param.call->call(), sizeof(ServerCallbackUnaryImpl))) 54 ServerCallbackUnaryImpl( 55 static_cast<::grpc_impl::CallbackServerContext*>( 56 param.server_context), 57 param.call, allocator_state, std::move(param.call_requester)); 58 param.server_context->BeginCompletionOp( 59 param.call, [call](bool) { call->MaybeDone(); }, call); 60 61 ServerUnaryReactor* reactor = nullptr; 62 if (param.status.ok()) { 63 reactor = ::grpc::internal::CatchingReactorGetter<ServerUnaryReactor>( 64 get_reactor_, 65 static_cast<::grpc_impl::CallbackServerContext*>( 66 param.server_context), 67 call->request(), call->response()); 68 } 69 70 if (reactor == nullptr) { 71 // if deserialization or reactor creator failed, we need to fail the call 72 reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( 73 param.call->call(), sizeof(UnimplementedUnaryReactor))) 74 UnimplementedUnaryReactor( 75 ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "")); 76 } 77 78 /// Invoke SetupReactor as the last part of the handler 79 call->SetupReactor(reactor); 80 } 81 Deserialize(grpc_call * call,grpc_byte_buffer * req,::grpc::Status * status,void ** handler_data)82 void* Deserialize(grpc_call* call, grpc_byte_buffer* req, 83 ::grpc::Status* status, void** handler_data) final { 84 ::grpc::ByteBuffer buf; 85 buf.set_buffer(req); 86 RequestType* request = nullptr; 87 ::grpc::experimental::MessageHolder<RequestType, ResponseType>* 88 allocator_state = nullptr; 89 if (allocator_ != nullptr) { 90 allocator_state = allocator_->AllocateMessages(); 91 } else { 92 allocator_state = 93 new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( 94 call, sizeof(DefaultMessageHolder<RequestType, ResponseType>))) 95 DefaultMessageHolder<RequestType, ResponseType>(); 96 } 97 *handler_data = allocator_state; 98 request = allocator_state->request(); 99 *status = 100 ::grpc::SerializationTraits<RequestType>::Deserialize(&buf, request); 101 buf.Release(); 102 if (status->ok()) { 103 return request; 104 } 105 // Clean up on deserialization failure. 106 allocator_state->Release(); 107 return nullptr; 108 } 109 110 private: 111 std::function<ServerUnaryReactor*(::grpc_impl::CallbackServerContext*, 112 const RequestType*, ResponseType*)> 113 get_reactor_; 114 ::grpc::experimental::MessageAllocator<RequestType, ResponseType>* 115 allocator_ = nullptr; 116 117 class ServerCallbackUnaryImpl : public ServerCallbackUnary { 118 public: Finish(::grpc::Status s)119 void Finish(::grpc::Status s) override { 120 // A callback that only contains a call to MaybeDone can be run as an 121 // inline callback regardless of whether or not OnDone is inlineable 122 // because if the actual OnDone callback needs to be scheduled, MaybeDone 123 // is responsible for dispatching to an executor thread if needed. Thus, 124 // when setting up the finish_tag_, we can set its own callback to 125 // inlineable. 126 finish_tag_.Set( 127 call_.call(), 128 [this](bool) { 129 this->MaybeDone( 130 reactor_.load(std::memory_order_relaxed)->InternalInlineable()); 131 }, 132 &finish_ops_, /*can_inline=*/true); 133 finish_ops_.set_core_cq_tag(&finish_tag_); 134 135 if (!ctx_->sent_initial_metadata_) { 136 finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 137 ctx_->initial_metadata_flags()); 138 if (ctx_->compression_level_set()) { 139 finish_ops_.set_compression_level(ctx_->compression_level()); 140 } 141 ctx_->sent_initial_metadata_ = true; 142 } 143 // The response is dropped if the status is not OK. 144 if (s.ok()) { 145 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, 146 finish_ops_.SendMessagePtr(response())); 147 } else { 148 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s); 149 } 150 finish_ops_.set_core_cq_tag(&finish_tag_); 151 call_.PerformOps(&finish_ops_); 152 } 153 SendInitialMetadata()154 void SendInitialMetadata() override { 155 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); 156 this->Ref(); 157 // The callback for this function should not be marked inline because it 158 // is directly invoking a user-controlled reaction 159 // (OnSendInitialMetadataDone). Thus it must be dispatched to an executor 160 // thread. However, any OnDone needed after that can be inlined because it 161 // is already running on an executor thread. 162 meta_tag_.Set(call_.call(), 163 [this](bool ok) { 164 ServerUnaryReactor* reactor = 165 reactor_.load(std::memory_order_relaxed); 166 reactor->OnSendInitialMetadataDone(ok); 167 this->MaybeDone(/*inlineable_ondone=*/true); 168 }, 169 &meta_ops_, /*can_inline=*/false); 170 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 171 ctx_->initial_metadata_flags()); 172 if (ctx_->compression_level_set()) { 173 meta_ops_.set_compression_level(ctx_->compression_level()); 174 } 175 ctx_->sent_initial_metadata_ = true; 176 meta_ops_.set_core_cq_tag(&meta_tag_); 177 call_.PerformOps(&meta_ops_); 178 } 179 180 private: 181 friend class CallbackUnaryHandler<RequestType, ResponseType>; 182 ServerCallbackUnaryImpl(::grpc_impl::CallbackServerContext * ctx,::grpc::internal::Call * call,::grpc::experimental::MessageHolder<RequestType,ResponseType> * allocator_state,std::function<void ()> call_requester)183 ServerCallbackUnaryImpl( 184 ::grpc_impl::CallbackServerContext* ctx, ::grpc::internal::Call* call, 185 ::grpc::experimental::MessageHolder<RequestType, ResponseType>* 186 allocator_state, 187 std::function<void()> call_requester) 188 : ctx_(ctx), 189 call_(*call), 190 allocator_state_(allocator_state), 191 call_requester_(std::move(call_requester)) { 192 ctx_->set_message_allocator_state(allocator_state); 193 } 194 195 /// SetupReactor binds the reactor (which also releases any queued 196 /// operations), maybe calls OnCancel if possible/needed, and maybe marks 197 /// the completion of the RPC. This should be the last component of the 198 /// handler. SetupReactor(ServerUnaryReactor * reactor)199 void SetupReactor(ServerUnaryReactor* reactor) { 200 reactor_.store(reactor, std::memory_order_relaxed); 201 this->BindReactor(reactor); 202 this->MaybeCallOnCancel(reactor); 203 this->MaybeDone(reactor->InternalInlineable()); 204 } 205 request()206 const RequestType* request() { return allocator_state_->request(); } response()207 ResponseType* response() { return allocator_state_->response(); } 208 CallOnDone()209 void CallOnDone() override { 210 reactor_.load(std::memory_order_relaxed)->OnDone(); 211 grpc_call* call = call_.call(); 212 auto call_requester = std::move(call_requester_); 213 allocator_state_->Release(); 214 this->~ServerCallbackUnaryImpl(); // explicitly call destructor 215 ::grpc::g_core_codegen_interface->grpc_call_unref(call); 216 call_requester(); 217 } 218 reactor()219 ServerReactor* reactor() override { 220 return reactor_.load(std::memory_order_relaxed); 221 } 222 223 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> 224 meta_ops_; 225 ::grpc::internal::CallbackWithSuccessTag meta_tag_; 226 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 227 ::grpc::internal::CallOpSendMessage, 228 ::grpc::internal::CallOpServerSendStatus> 229 finish_ops_; 230 ::grpc::internal::CallbackWithSuccessTag finish_tag_; 231 232 ::grpc_impl::CallbackServerContext* const ctx_; 233 ::grpc::internal::Call call_; 234 ::grpc::experimental::MessageHolder<RequestType, ResponseType>* const 235 allocator_state_; 236 std::function<void()> call_requester_; 237 // reactor_ can always be loaded/stored with relaxed memory ordering because 238 // its value is only set once, independently of other data in the object, 239 // and the loads that use it will always actually come provably later even 240 // though they are from different threads since they are triggered by 241 // actions initiated only by the setting up of the reactor_ variable. In 242 // a sense, it's a delayed "const": it gets its value from the SetupReactor 243 // method (not the constructor, so it's not a true const), but it doesn't 244 // change after that and it only gets used by actions caused, directly or 245 // indirectly, by that setup. This comment also applies to the reactor_ 246 // variables of the other streaming objects in this file. 247 std::atomic<ServerUnaryReactor*> reactor_; 248 // callbacks_outstanding_ follows a refcount pattern 249 std::atomic<intptr_t> callbacks_outstanding_{ 250 3}; // reserve for start, Finish, and CompletionOp 251 }; 252 }; 253 254 template <class RequestType, class ResponseType> 255 class CallbackClientStreamingHandler : public ::grpc::internal::MethodHandler { 256 public: CallbackClientStreamingHandler(std::function<ServerReadReactor<RequestType> * (::grpc_impl::CallbackServerContext *,ResponseType *)> get_reactor)257 explicit CallbackClientStreamingHandler( 258 std::function<ServerReadReactor<RequestType>*( 259 ::grpc_impl::CallbackServerContext*, ResponseType*)> 260 get_reactor) 261 : get_reactor_(std::move(get_reactor)) {} RunHandler(const HandlerParameter & param)262 void RunHandler(const HandlerParameter& param) final { 263 // Arena allocate a reader structure (that includes response) 264 ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call()); 265 266 auto* reader = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( 267 param.call->call(), sizeof(ServerCallbackReaderImpl))) 268 ServerCallbackReaderImpl( 269 static_cast<::grpc_impl::CallbackServerContext*>( 270 param.server_context), 271 param.call, std::move(param.call_requester)); 272 // Inlineable OnDone can be false in the CompletionOp callback because there 273 // is no read reactor that has an inlineable OnDone; this only applies to 274 // the DefaultReactor (which is unary). 275 param.server_context->BeginCompletionOp( 276 param.call, 277 [reader](bool) { reader->MaybeDone(/*inlineable_ondone=*/false); }, 278 reader); 279 280 ServerReadReactor<RequestType>* reactor = nullptr; 281 if (param.status.ok()) { 282 reactor = ::grpc::internal::CatchingReactorGetter< 283 ServerReadReactor<RequestType>>( 284 get_reactor_, 285 static_cast<::grpc_impl::CallbackServerContext*>( 286 param.server_context), 287 reader->response()); 288 } 289 290 if (reactor == nullptr) { 291 // if deserialization or reactor creator failed, we need to fail the call 292 reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( 293 param.call->call(), sizeof(UnimplementedReadReactor<RequestType>))) 294 UnimplementedReadReactor<RequestType>( 295 ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "")); 296 } 297 298 reader->SetupReactor(reactor); 299 } 300 301 private: 302 std::function<ServerReadReactor<RequestType>*( 303 ::grpc_impl::CallbackServerContext*, ResponseType*)> 304 get_reactor_; 305 306 class ServerCallbackReaderImpl : public ServerCallbackReader<RequestType> { 307 public: Finish(::grpc::Status s)308 void Finish(::grpc::Status s) override { 309 // A finish tag with only MaybeDone can have its callback inlined 310 // regardless even if OnDone is not inlineable because this callback just 311 // checks a ref and then decides whether or not to dispatch OnDone. 312 finish_tag_.Set(call_.call(), 313 [this](bool) { 314 // Inlineable OnDone can be false here because there is 315 // no read reactor that has an inlineable OnDone; this 316 // only applies to the DefaultReactor (which is unary). 317 this->MaybeDone(/*inlineable_ondone=*/false); 318 }, 319 &finish_ops_, /*can_inline=*/true); 320 if (!ctx_->sent_initial_metadata_) { 321 finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 322 ctx_->initial_metadata_flags()); 323 if (ctx_->compression_level_set()) { 324 finish_ops_.set_compression_level(ctx_->compression_level()); 325 } 326 ctx_->sent_initial_metadata_ = true; 327 } 328 // The response is dropped if the status is not OK. 329 if (s.ok()) { 330 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, 331 finish_ops_.SendMessagePtr(&resp_)); 332 } else { 333 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s); 334 } 335 finish_ops_.set_core_cq_tag(&finish_tag_); 336 call_.PerformOps(&finish_ops_); 337 } 338 SendInitialMetadata()339 void SendInitialMetadata() override { 340 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); 341 this->Ref(); 342 // The callback for this function should not be inlined because it invokes 343 // a user-controlled reaction, but any resulting OnDone can be inlined in 344 // the executor to which this callback is dispatched. 345 meta_tag_.Set(call_.call(), 346 [this](bool ok) { 347 ServerReadReactor<RequestType>* reactor = 348 reactor_.load(std::memory_order_relaxed); 349 reactor->OnSendInitialMetadataDone(ok); 350 this->MaybeDone(/*inlineable_ondone=*/true); 351 }, 352 &meta_ops_, /*can_inline=*/false); 353 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 354 ctx_->initial_metadata_flags()); 355 if (ctx_->compression_level_set()) { 356 meta_ops_.set_compression_level(ctx_->compression_level()); 357 } 358 ctx_->sent_initial_metadata_ = true; 359 meta_ops_.set_core_cq_tag(&meta_tag_); 360 call_.PerformOps(&meta_ops_); 361 } 362 Read(RequestType * req)363 void Read(RequestType* req) override { 364 this->Ref(); 365 read_ops_.RecvMessage(req); 366 call_.PerformOps(&read_ops_); 367 } 368 369 private: 370 friend class CallbackClientStreamingHandler<RequestType, ResponseType>; 371 ServerCallbackReaderImpl(::grpc_impl::CallbackServerContext * ctx,::grpc::internal::Call * call,std::function<void ()> call_requester)372 ServerCallbackReaderImpl(::grpc_impl::CallbackServerContext* ctx, 373 ::grpc::internal::Call* call, 374 std::function<void()> call_requester) 375 : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {} 376 SetupReactor(ServerReadReactor<RequestType> * reactor)377 void SetupReactor(ServerReadReactor<RequestType>* reactor) { 378 reactor_.store(reactor, std::memory_order_relaxed); 379 // The callback for this function should not be inlined because it invokes 380 // a user-controlled reaction, but any resulting OnDone can be inlined in 381 // the executor to which this callback is dispatched. 382 read_tag_.Set(call_.call(), 383 [this, reactor](bool ok) { 384 reactor->OnReadDone(ok); 385 this->MaybeDone(/*inlineable_ondone=*/true); 386 }, 387 &read_ops_, /*can_inline=*/false); 388 read_ops_.set_core_cq_tag(&read_tag_); 389 this->BindReactor(reactor); 390 this->MaybeCallOnCancel(reactor); 391 // Inlineable OnDone can be false here because there is no read 392 // reactor that has an inlineable OnDone; this only applies to the 393 // DefaultReactor (which is unary). 394 this->MaybeDone(/*inlineable_ondone=*/false); 395 } 396 ~ServerCallbackReaderImpl()397 ~ServerCallbackReaderImpl() {} 398 response()399 ResponseType* response() { return &resp_; } 400 CallOnDone()401 void CallOnDone() override { 402 reactor_.load(std::memory_order_relaxed)->OnDone(); 403 grpc_call* call = call_.call(); 404 auto call_requester = std::move(call_requester_); 405 this->~ServerCallbackReaderImpl(); // explicitly call destructor 406 ::grpc::g_core_codegen_interface->grpc_call_unref(call); 407 call_requester(); 408 } 409 reactor()410 ServerReactor* reactor() override { 411 return reactor_.load(std::memory_order_relaxed); 412 } 413 414 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> 415 meta_ops_; 416 ::grpc::internal::CallbackWithSuccessTag meta_tag_; 417 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 418 ::grpc::internal::CallOpSendMessage, 419 ::grpc::internal::CallOpServerSendStatus> 420 finish_ops_; 421 ::grpc::internal::CallbackWithSuccessTag finish_tag_; 422 ::grpc::internal::CallOpSet< 423 ::grpc::internal::CallOpRecvMessage<RequestType>> 424 read_ops_; 425 ::grpc::internal::CallbackWithSuccessTag read_tag_; 426 427 ::grpc_impl::CallbackServerContext* const ctx_; 428 ::grpc::internal::Call call_; 429 ResponseType resp_; 430 std::function<void()> call_requester_; 431 // The memory ordering of reactor_ follows ServerCallbackUnaryImpl. 432 std::atomic<ServerReadReactor<RequestType>*> reactor_; 433 // callbacks_outstanding_ follows a refcount pattern 434 std::atomic<intptr_t> callbacks_outstanding_{ 435 3}; // reserve for OnStarted, Finish, and CompletionOp 436 }; 437 }; 438 439 template <class RequestType, class ResponseType> 440 class CallbackServerStreamingHandler : public ::grpc::internal::MethodHandler { 441 public: CallbackServerStreamingHandler(std::function<ServerWriteReactor<ResponseType> * (::grpc_impl::CallbackServerContext *,const RequestType *)> get_reactor)442 explicit CallbackServerStreamingHandler( 443 std::function<ServerWriteReactor<ResponseType>*( 444 ::grpc_impl::CallbackServerContext*, const RequestType*)> 445 get_reactor) 446 : get_reactor_(std::move(get_reactor)) {} RunHandler(const HandlerParameter & param)447 void RunHandler(const HandlerParameter& param) final { 448 // Arena allocate a writer structure 449 ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call()); 450 451 auto* writer = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( 452 param.call->call(), sizeof(ServerCallbackWriterImpl))) 453 ServerCallbackWriterImpl( 454 static_cast<::grpc_impl::CallbackServerContext*>( 455 param.server_context), 456 param.call, static_cast<RequestType*>(param.request), 457 std::move(param.call_requester)); 458 // Inlineable OnDone can be false in the CompletionOp callback because there 459 // is no write reactor that has an inlineable OnDone; this only applies to 460 // the DefaultReactor (which is unary). 461 param.server_context->BeginCompletionOp( 462 param.call, 463 [writer](bool) { writer->MaybeDone(/*inlineable_ondone=*/false); }, 464 writer); 465 466 ServerWriteReactor<ResponseType>* reactor = nullptr; 467 if (param.status.ok()) { 468 reactor = ::grpc::internal::CatchingReactorGetter< 469 ServerWriteReactor<ResponseType>>( 470 get_reactor_, 471 static_cast<::grpc_impl::CallbackServerContext*>( 472 param.server_context), 473 writer->request()); 474 } 475 if (reactor == nullptr) { 476 // if deserialization or reactor creator failed, we need to fail the call 477 reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( 478 param.call->call(), sizeof(UnimplementedWriteReactor<ResponseType>))) 479 UnimplementedWriteReactor<ResponseType>( 480 ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "")); 481 } 482 483 writer->SetupReactor(reactor); 484 } 485 Deserialize(grpc_call * call,grpc_byte_buffer * req,::grpc::Status * status,void **)486 void* Deserialize(grpc_call* call, grpc_byte_buffer* req, 487 ::grpc::Status* status, void** /*handler_data*/) final { 488 ::grpc::ByteBuffer buf; 489 buf.set_buffer(req); 490 auto* request = 491 new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( 492 call, sizeof(RequestType))) RequestType(); 493 *status = 494 ::grpc::SerializationTraits<RequestType>::Deserialize(&buf, request); 495 buf.Release(); 496 if (status->ok()) { 497 return request; 498 } 499 request->~RequestType(); 500 return nullptr; 501 } 502 503 private: 504 std::function<ServerWriteReactor<ResponseType>*( 505 ::grpc_impl::CallbackServerContext*, const RequestType*)> 506 get_reactor_; 507 508 class ServerCallbackWriterImpl : public ServerCallbackWriter<ResponseType> { 509 public: Finish(::grpc::Status s)510 void Finish(::grpc::Status s) override { 511 // A finish tag with only MaybeDone can have its callback inlined 512 // regardless even if OnDone is not inlineable because this callback just 513 // checks a ref and then decides whether or not to dispatch OnDone. 514 finish_tag_.Set(call_.call(), 515 [this](bool) { 516 // Inlineable OnDone can be false here because there is 517 // no write reactor that has an inlineable OnDone; this 518 // only applies to the DefaultReactor (which is unary). 519 this->MaybeDone(/*inlineable_ondone=*/false); 520 }, 521 &finish_ops_, /*can_inline=*/true); 522 finish_ops_.set_core_cq_tag(&finish_tag_); 523 524 if (!ctx_->sent_initial_metadata_) { 525 finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 526 ctx_->initial_metadata_flags()); 527 if (ctx_->compression_level_set()) { 528 finish_ops_.set_compression_level(ctx_->compression_level()); 529 } 530 ctx_->sent_initial_metadata_ = true; 531 } 532 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s); 533 call_.PerformOps(&finish_ops_); 534 } 535 SendInitialMetadata()536 void SendInitialMetadata() override { 537 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); 538 this->Ref(); 539 // The callback for this function should not be inlined because it invokes 540 // a user-controlled reaction, but any resulting OnDone can be inlined in 541 // the executor to which this callback is dispatched. 542 meta_tag_.Set(call_.call(), 543 [this](bool ok) { 544 ServerWriteReactor<ResponseType>* reactor = 545 reactor_.load(std::memory_order_relaxed); 546 reactor->OnSendInitialMetadataDone(ok); 547 this->MaybeDone(/*inlineable_ondone=*/true); 548 }, 549 &meta_ops_, /*can_inline=*/false); 550 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 551 ctx_->initial_metadata_flags()); 552 if (ctx_->compression_level_set()) { 553 meta_ops_.set_compression_level(ctx_->compression_level()); 554 } 555 ctx_->sent_initial_metadata_ = true; 556 meta_ops_.set_core_cq_tag(&meta_tag_); 557 call_.PerformOps(&meta_ops_); 558 } 559 Write(const ResponseType * resp,::grpc::WriteOptions options)560 void Write(const ResponseType* resp, 561 ::grpc::WriteOptions options) override { 562 this->Ref(); 563 if (options.is_last_message()) { 564 options.set_buffer_hint(); 565 } 566 if (!ctx_->sent_initial_metadata_) { 567 write_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 568 ctx_->initial_metadata_flags()); 569 if (ctx_->compression_level_set()) { 570 write_ops_.set_compression_level(ctx_->compression_level()); 571 } 572 ctx_->sent_initial_metadata_ = true; 573 } 574 // TODO(vjpai): don't assert 575 GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok()); 576 call_.PerformOps(&write_ops_); 577 } 578 WriteAndFinish(const ResponseType * resp,::grpc::WriteOptions options,::grpc::Status s)579 void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options, 580 ::grpc::Status s) override { 581 // This combines the write into the finish callback 582 // TODO(vjpai): don't assert 583 GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok()); 584 Finish(std::move(s)); 585 } 586 587 private: 588 friend class CallbackServerStreamingHandler<RequestType, ResponseType>; 589 ServerCallbackWriterImpl(::grpc_impl::CallbackServerContext * ctx,::grpc::internal::Call * call,const RequestType * req,std::function<void ()> call_requester)590 ServerCallbackWriterImpl(::grpc_impl::CallbackServerContext* ctx, 591 ::grpc::internal::Call* call, 592 const RequestType* req, 593 std::function<void()> call_requester) 594 : ctx_(ctx), 595 call_(*call), 596 req_(req), 597 call_requester_(std::move(call_requester)) {} 598 SetupReactor(ServerWriteReactor<ResponseType> * reactor)599 void SetupReactor(ServerWriteReactor<ResponseType>* reactor) { 600 reactor_.store(reactor, std::memory_order_relaxed); 601 // The callback for this function should not be inlined because it invokes 602 // a user-controlled reaction, but any resulting OnDone can be inlined in 603 // the executor to which this callback is dispatched. 604 write_tag_.Set(call_.call(), 605 [this, reactor](bool ok) { 606 reactor->OnWriteDone(ok); 607 this->MaybeDone(/*inlineable_ondone=*/true); 608 }, 609 &write_ops_, /*can_inline=*/false); 610 write_ops_.set_core_cq_tag(&write_tag_); 611 this->BindReactor(reactor); 612 this->MaybeCallOnCancel(reactor); 613 // Inlineable OnDone can be false here because there is no write 614 // reactor that has an inlineable OnDone; this only applies to the 615 // DefaultReactor (which is unary). 616 this->MaybeDone(/*inlineable_ondone=*/false); 617 } ~ServerCallbackWriterImpl()618 ~ServerCallbackWriterImpl() { req_->~RequestType(); } 619 request()620 const RequestType* request() { return req_; } 621 CallOnDone()622 void CallOnDone() override { 623 reactor_.load(std::memory_order_relaxed)->OnDone(); 624 grpc_call* call = call_.call(); 625 auto call_requester = std::move(call_requester_); 626 this->~ServerCallbackWriterImpl(); // explicitly call destructor 627 ::grpc::g_core_codegen_interface->grpc_call_unref(call); 628 call_requester(); 629 } 630 reactor()631 ServerReactor* reactor() override { 632 return reactor_.load(std::memory_order_relaxed); 633 } 634 635 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> 636 meta_ops_; 637 ::grpc::internal::CallbackWithSuccessTag meta_tag_; 638 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 639 ::grpc::internal::CallOpSendMessage, 640 ::grpc::internal::CallOpServerSendStatus> 641 finish_ops_; 642 ::grpc::internal::CallbackWithSuccessTag finish_tag_; 643 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 644 ::grpc::internal::CallOpSendMessage> 645 write_ops_; 646 ::grpc::internal::CallbackWithSuccessTag write_tag_; 647 648 ::grpc_impl::CallbackServerContext* const ctx_; 649 ::grpc::internal::Call call_; 650 const RequestType* req_; 651 std::function<void()> call_requester_; 652 // The memory ordering of reactor_ follows ServerCallbackUnaryImpl. 653 std::atomic<ServerWriteReactor<ResponseType>*> reactor_; 654 // callbacks_outstanding_ follows a refcount pattern 655 std::atomic<intptr_t> callbacks_outstanding_{ 656 3}; // reserve for OnStarted, Finish, and CompletionOp 657 }; 658 }; 659 660 template <class RequestType, class ResponseType> 661 class CallbackBidiHandler : public ::grpc::internal::MethodHandler { 662 public: CallbackBidiHandler(std::function<ServerBidiReactor<RequestType,ResponseType> * (::grpc_impl::CallbackServerContext *)> get_reactor)663 explicit CallbackBidiHandler( 664 std::function<ServerBidiReactor<RequestType, ResponseType>*( 665 ::grpc_impl::CallbackServerContext*)> 666 get_reactor) 667 : get_reactor_(std::move(get_reactor)) {} RunHandler(const HandlerParameter & param)668 void RunHandler(const HandlerParameter& param) final { 669 ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call()); 670 671 auto* stream = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( 672 param.call->call(), sizeof(ServerCallbackReaderWriterImpl))) 673 ServerCallbackReaderWriterImpl( 674 static_cast<::grpc_impl::CallbackServerContext*>( 675 param.server_context), 676 param.call, std::move(param.call_requester)); 677 // Inlineable OnDone can be false in the CompletionOp callback because there 678 // is no bidi reactor that has an inlineable OnDone; this only applies to 679 // the DefaultReactor (which is unary). 680 param.server_context->BeginCompletionOp( 681 param.call, 682 [stream](bool) { stream->MaybeDone(/*inlineable_ondone=*/false); }, 683 stream); 684 685 ServerBidiReactor<RequestType, ResponseType>* reactor = nullptr; 686 if (param.status.ok()) { 687 reactor = ::grpc::internal::CatchingReactorGetter< 688 ServerBidiReactor<RequestType, ResponseType>>( 689 get_reactor_, static_cast<::grpc_impl::CallbackServerContext*>( 690 param.server_context)); 691 } 692 693 if (reactor == nullptr) { 694 // if deserialization or reactor creator failed, we need to fail the call 695 reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( 696 param.call->call(), 697 sizeof(UnimplementedBidiReactor<RequestType, ResponseType>))) 698 UnimplementedBidiReactor<RequestType, ResponseType>( 699 ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "")); 700 } 701 702 stream->SetupReactor(reactor); 703 } 704 705 private: 706 std::function<ServerBidiReactor<RequestType, ResponseType>*( 707 ::grpc_impl::CallbackServerContext*)> 708 get_reactor_; 709 710 class ServerCallbackReaderWriterImpl 711 : public ServerCallbackReaderWriter<RequestType, ResponseType> { 712 public: Finish(::grpc::Status s)713 void Finish(::grpc::Status s) override { 714 // A finish tag with only MaybeDone can have its callback inlined 715 // regardless even if OnDone is not inlineable because this callback just 716 // checks a ref and then decides whether or not to dispatch OnDone. 717 finish_tag_.Set(call_.call(), 718 [this](bool) { 719 // Inlineable OnDone can be false here because there is 720 // no bidi reactor that has an inlineable OnDone; this 721 // only applies to the DefaultReactor (which is unary). 722 this->MaybeDone(/*inlineable_ondone=*/false); 723 }, 724 &finish_ops_, /*can_inline=*/true); 725 finish_ops_.set_core_cq_tag(&finish_tag_); 726 727 if (!ctx_->sent_initial_metadata_) { 728 finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 729 ctx_->initial_metadata_flags()); 730 if (ctx_->compression_level_set()) { 731 finish_ops_.set_compression_level(ctx_->compression_level()); 732 } 733 ctx_->sent_initial_metadata_ = true; 734 } 735 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s); 736 call_.PerformOps(&finish_ops_); 737 } 738 SendInitialMetadata()739 void SendInitialMetadata() override { 740 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); 741 this->Ref(); 742 // The callback for this function should not be inlined because it invokes 743 // a user-controlled reaction, but any resulting OnDone can be inlined in 744 // the executor to which this callback is dispatched. 745 meta_tag_.Set(call_.call(), 746 [this](bool ok) { 747 ServerBidiReactor<RequestType, ResponseType>* reactor = 748 reactor_.load(std::memory_order_relaxed); 749 reactor->OnSendInitialMetadataDone(ok); 750 this->MaybeDone(/*inlineable_ondone=*/true); 751 }, 752 &meta_ops_, /*can_inline=*/false); 753 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 754 ctx_->initial_metadata_flags()); 755 if (ctx_->compression_level_set()) { 756 meta_ops_.set_compression_level(ctx_->compression_level()); 757 } 758 ctx_->sent_initial_metadata_ = true; 759 meta_ops_.set_core_cq_tag(&meta_tag_); 760 call_.PerformOps(&meta_ops_); 761 } 762 Write(const ResponseType * resp,::grpc::WriteOptions options)763 void Write(const ResponseType* resp, 764 ::grpc::WriteOptions options) override { 765 this->Ref(); 766 if (options.is_last_message()) { 767 options.set_buffer_hint(); 768 } 769 if (!ctx_->sent_initial_metadata_) { 770 write_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 771 ctx_->initial_metadata_flags()); 772 if (ctx_->compression_level_set()) { 773 write_ops_.set_compression_level(ctx_->compression_level()); 774 } 775 ctx_->sent_initial_metadata_ = true; 776 } 777 // TODO(vjpai): don't assert 778 GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok()); 779 call_.PerformOps(&write_ops_); 780 } 781 WriteAndFinish(const ResponseType * resp,::grpc::WriteOptions options,::grpc::Status s)782 void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options, 783 ::grpc::Status s) override { 784 // TODO(vjpai): don't assert 785 GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok()); 786 Finish(std::move(s)); 787 } 788 Read(RequestType * req)789 void Read(RequestType* req) override { 790 this->Ref(); 791 read_ops_.RecvMessage(req); 792 call_.PerformOps(&read_ops_); 793 } 794 795 private: 796 friend class CallbackBidiHandler<RequestType, ResponseType>; 797 ServerCallbackReaderWriterImpl(::grpc_impl::CallbackServerContext * ctx,::grpc::internal::Call * call,std::function<void ()> call_requester)798 ServerCallbackReaderWriterImpl(::grpc_impl::CallbackServerContext* ctx, 799 ::grpc::internal::Call* call, 800 std::function<void()> call_requester) 801 : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {} 802 SetupReactor(ServerBidiReactor<RequestType,ResponseType> * reactor)803 void SetupReactor(ServerBidiReactor<RequestType, ResponseType>* reactor) { 804 reactor_.store(reactor, std::memory_order_relaxed); 805 // The callbacks for these functions should not be inlined because they 806 // invoke user-controlled reactions, but any resulting OnDones can be 807 // inlined in the executor to which a callback is dispatched. 808 write_tag_.Set(call_.call(), 809 [this, reactor](bool ok) { 810 reactor->OnWriteDone(ok); 811 this->MaybeDone(/*inlineable_ondone=*/true); 812 }, 813 &write_ops_, /*can_inline=*/false); 814 write_ops_.set_core_cq_tag(&write_tag_); 815 read_tag_.Set(call_.call(), 816 [this, reactor](bool ok) { 817 reactor->OnReadDone(ok); 818 this->MaybeDone(/*inlineable_ondone=*/true); 819 }, 820 &read_ops_, /*can_inline=*/false); 821 read_ops_.set_core_cq_tag(&read_tag_); 822 this->BindReactor(reactor); 823 this->MaybeCallOnCancel(reactor); 824 // Inlineable OnDone can be false here because there is no bidi 825 // reactor that has an inlineable OnDone; this only applies to the 826 // DefaultReactor (which is unary). 827 this->MaybeDone(/*inlineable_ondone=*/false); 828 } 829 CallOnDone()830 void CallOnDone() override { 831 reactor_.load(std::memory_order_relaxed)->OnDone(); 832 grpc_call* call = call_.call(); 833 auto call_requester = std::move(call_requester_); 834 this->~ServerCallbackReaderWriterImpl(); // explicitly call destructor 835 ::grpc::g_core_codegen_interface->grpc_call_unref(call); 836 call_requester(); 837 } 838 reactor()839 ServerReactor* reactor() override { 840 return reactor_.load(std::memory_order_relaxed); 841 } 842 843 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> 844 meta_ops_; 845 ::grpc::internal::CallbackWithSuccessTag meta_tag_; 846 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 847 ::grpc::internal::CallOpSendMessage, 848 ::grpc::internal::CallOpServerSendStatus> 849 finish_ops_; 850 ::grpc::internal::CallbackWithSuccessTag finish_tag_; 851 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 852 ::grpc::internal::CallOpSendMessage> 853 write_ops_; 854 ::grpc::internal::CallbackWithSuccessTag write_tag_; 855 ::grpc::internal::CallOpSet< 856 ::grpc::internal::CallOpRecvMessage<RequestType>> 857 read_ops_; 858 ::grpc::internal::CallbackWithSuccessTag read_tag_; 859 860 ::grpc_impl::CallbackServerContext* const ctx_; 861 ::grpc::internal::Call call_; 862 std::function<void()> call_requester_; 863 // The memory ordering of reactor_ follows ServerCallbackUnaryImpl. 864 std::atomic<ServerBidiReactor<RequestType, ResponseType>*> reactor_; 865 // callbacks_outstanding_ follows a refcount pattern 866 std::atomic<intptr_t> callbacks_outstanding_{ 867 3}; // reserve for OnStarted, Finish, and CompletionOp 868 }; 869 }; 870 871 } // namespace internal 872 } // namespace grpc_impl 873 874 #endif // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H 875