1 /* 2 * 3 * Copyright 2019 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H 19 #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H 20 21 #include <grpcpp/impl/codegen/message_allocator.h> 22 #include <grpcpp/impl/codegen/rpc_service_method.h> 23 #include <grpcpp/impl/codegen/server_callback.h> 24 #include <grpcpp/impl/codegen/server_context.h> 25 #include <grpcpp/impl/codegen/status.h> 26 27 namespace grpc { 28 namespace internal { 29 30 template <class RequestType, class ResponseType> 31 class CallbackUnaryHandler : public ::grpc::internal::MethodHandler { 32 public: CallbackUnaryHandler(std::function<ServerUnaryReactor * (::grpc::CallbackServerContext *,const RequestType *,ResponseType *)> get_reactor)33 explicit CallbackUnaryHandler( 34 std::function<ServerUnaryReactor*(::grpc::CallbackServerContext*, 35 const RequestType*, ResponseType*)> 36 get_reactor) 37 : get_reactor_(std::move(get_reactor)) {} 38 SetMessageAllocator(::grpc::experimental::MessageAllocator<RequestType,ResponseType> * allocator)39 void SetMessageAllocator( 40 ::grpc::experimental::MessageAllocator<RequestType, ResponseType>* 41 allocator) { 42 allocator_ = allocator; 43 } 44 RunHandler(const HandlerParameter & param)45 void RunHandler(const HandlerParameter& param) final { 46 // Arena allocate a controller structure (that includes request/response) 47 ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call()); 48 auto* allocator_state = static_cast< 49 ::grpc::experimental::MessageHolder<RequestType, ResponseType>*>( 50 param.internal_data); 51 52 auto* call = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( 53 param.call->call(), sizeof(ServerCallbackUnaryImpl))) 54 ServerCallbackUnaryImpl( 55 static_cast<::grpc::CallbackServerContext*>(param.server_context), 56 param.call, allocator_state, param.call_requester); 57 param.server_context->BeginCompletionOp( 58 param.call, [call](bool) { call->MaybeDone(); }, call); 59 60 ServerUnaryReactor* reactor = nullptr; 61 if (param.status.ok()) { 62 reactor = ::grpc::internal::CatchingReactorGetter<ServerUnaryReactor>( 63 get_reactor_, 64 static_cast<::grpc::CallbackServerContext*>(param.server_context), 65 call->request(), call->response()); 66 } 67 68 if (reactor == nullptr) { 69 // if deserialization or reactor creator failed, we need to fail the call 70 reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( 71 param.call->call(), sizeof(UnimplementedUnaryReactor))) 72 UnimplementedUnaryReactor( 73 ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "")); 74 } 75 76 /// Invoke SetupReactor as the last part of the handler 77 call->SetupReactor(reactor); 78 } 79 Deserialize(grpc_call * call,grpc_byte_buffer * req,::grpc::Status * status,void ** handler_data)80 void* Deserialize(grpc_call* call, grpc_byte_buffer* req, 81 ::grpc::Status* status, void** handler_data) final { 82 ::grpc::ByteBuffer buf; 83 buf.set_buffer(req); 84 RequestType* request = nullptr; 85 ::grpc::experimental::MessageHolder<RequestType, ResponseType>* 86 allocator_state = nullptr; 87 if (allocator_ != nullptr) { 88 allocator_state = allocator_->AllocateMessages(); 89 } else { 90 allocator_state = 91 new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( 92 call, sizeof(DefaultMessageHolder<RequestType, ResponseType>))) 93 DefaultMessageHolder<RequestType, ResponseType>(); 94 } 95 *handler_data = allocator_state; 96 request = allocator_state->request(); 97 *status = 98 ::grpc::SerializationTraits<RequestType>::Deserialize(&buf, request); 99 buf.Release(); 100 if (status->ok()) { 101 return request; 102 } 103 // Clean up on deserialization failure. 104 allocator_state->Release(); 105 return nullptr; 106 } 107 108 private: 109 std::function<ServerUnaryReactor*(::grpc::CallbackServerContext*, 110 const RequestType*, ResponseType*)> 111 get_reactor_; 112 ::grpc::experimental::MessageAllocator<RequestType, ResponseType>* 113 allocator_ = nullptr; 114 115 class ServerCallbackUnaryImpl : public ServerCallbackUnary { 116 public: Finish(::grpc::Status s)117 void Finish(::grpc::Status s) override { 118 // A callback that only contains a call to MaybeDone can be run as an 119 // inline callback regardless of whether or not OnDone is inlineable 120 // because if the actual OnDone callback needs to be scheduled, MaybeDone 121 // is responsible for dispatching to an executor thread if needed. Thus, 122 // when setting up the finish_tag_, we can set its own callback to 123 // inlineable. 124 finish_tag_.Set( 125 call_.call(), 126 [this](bool) { 127 this->MaybeDone( 128 reactor_.load(std::memory_order_relaxed)->InternalInlineable()); 129 }, 130 &finish_ops_, /*can_inline=*/true); 131 finish_ops_.set_core_cq_tag(&finish_tag_); 132 133 if (!ctx_->sent_initial_metadata_) { 134 finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 135 ctx_->initial_metadata_flags()); 136 if (ctx_->compression_level_set()) { 137 finish_ops_.set_compression_level(ctx_->compression_level()); 138 } 139 ctx_->sent_initial_metadata_ = true; 140 } 141 // The response is dropped if the status is not OK. 142 if (s.ok()) { 143 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, 144 finish_ops_.SendMessagePtr(response())); 145 } else { 146 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s); 147 } 148 finish_ops_.set_core_cq_tag(&finish_tag_); 149 call_.PerformOps(&finish_ops_); 150 } 151 SendInitialMetadata()152 void SendInitialMetadata() override { 153 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); 154 this->Ref(); 155 // The callback for this function should not be marked inline because it 156 // is directly invoking a user-controlled reaction 157 // (OnSendInitialMetadataDone). Thus it must be dispatched to an executor 158 // thread. However, any OnDone needed after that can be inlined because it 159 // is already running on an executor thread. 160 meta_tag_.Set( 161 call_.call(), 162 [this](bool ok) { 163 ServerUnaryReactor* reactor = 164 reactor_.load(std::memory_order_relaxed); 165 reactor->OnSendInitialMetadataDone(ok); 166 this->MaybeDone(/*inlineable_ondone=*/true); 167 }, 168 &meta_ops_, /*can_inline=*/false); 169 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 170 ctx_->initial_metadata_flags()); 171 if (ctx_->compression_level_set()) { 172 meta_ops_.set_compression_level(ctx_->compression_level()); 173 } 174 ctx_->sent_initial_metadata_ = true; 175 meta_ops_.set_core_cq_tag(&meta_tag_); 176 call_.PerformOps(&meta_ops_); 177 } 178 179 private: 180 friend class CallbackUnaryHandler<RequestType, ResponseType>; 181 ServerCallbackUnaryImpl(::grpc::CallbackServerContext * ctx,::grpc::internal::Call * call,::grpc::experimental::MessageHolder<RequestType,ResponseType> * allocator_state,std::function<void ()> call_requester)182 ServerCallbackUnaryImpl( 183 ::grpc::CallbackServerContext* ctx, ::grpc::internal::Call* call, 184 ::grpc::experimental::MessageHolder<RequestType, ResponseType>* 185 allocator_state, 186 std::function<void()> call_requester) 187 : ctx_(ctx), 188 call_(*call), 189 allocator_state_(allocator_state), 190 call_requester_(std::move(call_requester)) { 191 ctx_->set_message_allocator_state(allocator_state); 192 } 193 194 /// SetupReactor binds the reactor (which also releases any queued 195 /// operations), maybe calls OnCancel if possible/needed, and maybe marks 196 /// the completion of the RPC. This should be the last component of the 197 /// handler. SetupReactor(ServerUnaryReactor * reactor)198 void SetupReactor(ServerUnaryReactor* reactor) { 199 reactor_.store(reactor, std::memory_order_relaxed); 200 this->BindReactor(reactor); 201 this->MaybeCallOnCancel(reactor); 202 this->MaybeDone(reactor->InternalInlineable()); 203 } 204 request()205 const RequestType* request() { return allocator_state_->request(); } response()206 ResponseType* response() { return allocator_state_->response(); } 207 CallOnDone()208 void CallOnDone() override { 209 reactor_.load(std::memory_order_relaxed)->OnDone(); 210 grpc_call* call = call_.call(); 211 auto call_requester = std::move(call_requester_); 212 allocator_state_->Release(); 213 this->~ServerCallbackUnaryImpl(); // explicitly call destructor 214 ::grpc::g_core_codegen_interface->grpc_call_unref(call); 215 call_requester(); 216 } 217 reactor()218 ServerReactor* reactor() override { 219 return reactor_.load(std::memory_order_relaxed); 220 } 221 222 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> 223 meta_ops_; 224 ::grpc::internal::CallbackWithSuccessTag meta_tag_; 225 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 226 ::grpc::internal::CallOpSendMessage, 227 ::grpc::internal::CallOpServerSendStatus> 228 finish_ops_; 229 ::grpc::internal::CallbackWithSuccessTag finish_tag_; 230 231 ::grpc::CallbackServerContext* const ctx_; 232 ::grpc::internal::Call call_; 233 ::grpc::experimental::MessageHolder<RequestType, ResponseType>* const 234 allocator_state_; 235 std::function<void()> call_requester_; 236 // reactor_ can always be loaded/stored with relaxed memory ordering because 237 // its value is only set once, independently of other data in the object, 238 // and the loads that use it will always actually come provably later even 239 // though they are from different threads since they are triggered by 240 // actions initiated only by the setting up of the reactor_ variable. In 241 // a sense, it's a delayed "const": it gets its value from the SetupReactor 242 // method (not the constructor, so it's not a true const), but it doesn't 243 // change after that and it only gets used by actions caused, directly or 244 // indirectly, by that setup. This comment also applies to the reactor_ 245 // variables of the other streaming objects in this file. 246 std::atomic<ServerUnaryReactor*> reactor_; 247 // callbacks_outstanding_ follows a refcount pattern 248 std::atomic<intptr_t> callbacks_outstanding_{ 249 3}; // reserve for start, Finish, and CompletionOp 250 }; 251 }; 252 253 template <class RequestType, class ResponseType> 254 class CallbackClientStreamingHandler : public ::grpc::internal::MethodHandler { 255 public: CallbackClientStreamingHandler(std::function<ServerReadReactor<RequestType> * (::grpc::CallbackServerContext *,ResponseType *)> get_reactor)256 explicit CallbackClientStreamingHandler( 257 std::function<ServerReadReactor<RequestType>*( 258 ::grpc::CallbackServerContext*, ResponseType*)> 259 get_reactor) 260 : get_reactor_(std::move(get_reactor)) {} RunHandler(const HandlerParameter & param)261 void RunHandler(const HandlerParameter& param) final { 262 // Arena allocate a reader structure (that includes response) 263 ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call()); 264 265 auto* reader = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( 266 param.call->call(), sizeof(ServerCallbackReaderImpl))) 267 ServerCallbackReaderImpl( 268 static_cast<::grpc::CallbackServerContext*>(param.server_context), 269 param.call, param.call_requester); 270 // Inlineable OnDone can be false in the CompletionOp callback because there 271 // is no read reactor that has an inlineable OnDone; this only applies to 272 // the DefaultReactor (which is unary). 273 param.server_context->BeginCompletionOp( 274 param.call, 275 [reader](bool) { reader->MaybeDone(/*inlineable_ondone=*/false); }, 276 reader); 277 278 ServerReadReactor<RequestType>* reactor = nullptr; 279 if (param.status.ok()) { 280 reactor = ::grpc::internal::CatchingReactorGetter< 281 ServerReadReactor<RequestType>>( 282 get_reactor_, 283 static_cast<::grpc::CallbackServerContext*>(param.server_context), 284 reader->response()); 285 } 286 287 if (reactor == nullptr) { 288 // if deserialization or reactor creator failed, we need to fail the call 289 reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( 290 param.call->call(), sizeof(UnimplementedReadReactor<RequestType>))) 291 UnimplementedReadReactor<RequestType>( 292 ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "")); 293 } 294 295 reader->SetupReactor(reactor); 296 } 297 298 private: 299 std::function<ServerReadReactor<RequestType>*(::grpc::CallbackServerContext*, 300 ResponseType*)> 301 get_reactor_; 302 303 class ServerCallbackReaderImpl : public ServerCallbackReader<RequestType> { 304 public: Finish(::grpc::Status s)305 void Finish(::grpc::Status s) override { 306 // A finish tag with only MaybeDone can have its callback inlined 307 // regardless even if OnDone is not inlineable because this callback just 308 // checks a ref and then decides whether or not to dispatch OnDone. 309 finish_tag_.Set( 310 call_.call(), 311 [this](bool) { 312 // Inlineable OnDone can be false here because there is 313 // no read reactor that has an inlineable OnDone; this 314 // only applies to the DefaultReactor (which is unary). 315 this->MaybeDone(/*inlineable_ondone=*/false); 316 }, 317 &finish_ops_, /*can_inline=*/true); 318 if (!ctx_->sent_initial_metadata_) { 319 finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 320 ctx_->initial_metadata_flags()); 321 if (ctx_->compression_level_set()) { 322 finish_ops_.set_compression_level(ctx_->compression_level()); 323 } 324 ctx_->sent_initial_metadata_ = true; 325 } 326 // The response is dropped if the status is not OK. 327 if (s.ok()) { 328 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, 329 finish_ops_.SendMessagePtr(&resp_)); 330 } else { 331 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s); 332 } 333 finish_ops_.set_core_cq_tag(&finish_tag_); 334 call_.PerformOps(&finish_ops_); 335 } 336 SendInitialMetadata()337 void SendInitialMetadata() override { 338 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); 339 this->Ref(); 340 // The callback for this function should not be inlined because it invokes 341 // a user-controlled reaction, but any resulting OnDone can be inlined in 342 // the executor to which this callback is dispatched. 343 meta_tag_.Set( 344 call_.call(), 345 [this](bool ok) { 346 ServerReadReactor<RequestType>* reactor = 347 reactor_.load(std::memory_order_relaxed); 348 reactor->OnSendInitialMetadataDone(ok); 349 this->MaybeDone(/*inlineable_ondone=*/true); 350 }, 351 &meta_ops_, /*can_inline=*/false); 352 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 353 ctx_->initial_metadata_flags()); 354 if (ctx_->compression_level_set()) { 355 meta_ops_.set_compression_level(ctx_->compression_level()); 356 } 357 ctx_->sent_initial_metadata_ = true; 358 meta_ops_.set_core_cq_tag(&meta_tag_); 359 call_.PerformOps(&meta_ops_); 360 } 361 Read(RequestType * req)362 void Read(RequestType* req) override { 363 this->Ref(); 364 read_ops_.RecvMessage(req); 365 call_.PerformOps(&read_ops_); 366 } 367 368 private: 369 friend class CallbackClientStreamingHandler<RequestType, ResponseType>; 370 ServerCallbackReaderImpl(::grpc::CallbackServerContext * ctx,::grpc::internal::Call * call,std::function<void ()> call_requester)371 ServerCallbackReaderImpl(::grpc::CallbackServerContext* ctx, 372 ::grpc::internal::Call* call, 373 std::function<void()> call_requester) 374 : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {} 375 SetupReactor(ServerReadReactor<RequestType> * reactor)376 void SetupReactor(ServerReadReactor<RequestType>* reactor) { 377 reactor_.store(reactor, std::memory_order_relaxed); 378 // The callback for this function should not be inlined because it invokes 379 // a user-controlled reaction, but any resulting OnDone can be inlined in 380 // the executor to which this callback is dispatched. 381 read_tag_.Set( 382 call_.call(), 383 [this, reactor](bool ok) { 384 reactor->OnReadDone(ok); 385 this->MaybeDone(/*inlineable_ondone=*/true); 386 }, 387 &read_ops_, /*can_inline=*/false); 388 read_ops_.set_core_cq_tag(&read_tag_); 389 this->BindReactor(reactor); 390 this->MaybeCallOnCancel(reactor); 391 // Inlineable OnDone can be false here because there is no read 392 // reactor that has an inlineable OnDone; this only applies to the 393 // DefaultReactor (which is unary). 394 this->MaybeDone(/*inlineable_ondone=*/false); 395 } 396 ~ServerCallbackReaderImpl()397 ~ServerCallbackReaderImpl() {} 398 response()399 ResponseType* response() { return &resp_; } 400 CallOnDone()401 void CallOnDone() override { 402 reactor_.load(std::memory_order_relaxed)->OnDone(); 403 grpc_call* call = call_.call(); 404 auto call_requester = std::move(call_requester_); 405 this->~ServerCallbackReaderImpl(); // explicitly call destructor 406 ::grpc::g_core_codegen_interface->grpc_call_unref(call); 407 call_requester(); 408 } 409 reactor()410 ServerReactor* reactor() override { 411 return reactor_.load(std::memory_order_relaxed); 412 } 413 414 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> 415 meta_ops_; 416 ::grpc::internal::CallbackWithSuccessTag meta_tag_; 417 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 418 ::grpc::internal::CallOpSendMessage, 419 ::grpc::internal::CallOpServerSendStatus> 420 finish_ops_; 421 ::grpc::internal::CallbackWithSuccessTag finish_tag_; 422 ::grpc::internal::CallOpSet< 423 ::grpc::internal::CallOpRecvMessage<RequestType>> 424 read_ops_; 425 ::grpc::internal::CallbackWithSuccessTag read_tag_; 426 427 ::grpc::CallbackServerContext* const ctx_; 428 ::grpc::internal::Call call_; 429 ResponseType resp_; 430 std::function<void()> call_requester_; 431 // The memory ordering of reactor_ follows ServerCallbackUnaryImpl. 432 std::atomic<ServerReadReactor<RequestType>*> reactor_; 433 // callbacks_outstanding_ follows a refcount pattern 434 std::atomic<intptr_t> callbacks_outstanding_{ 435 3}; // reserve for OnStarted, Finish, and CompletionOp 436 }; 437 }; 438 439 template <class RequestType, class ResponseType> 440 class CallbackServerStreamingHandler : public ::grpc::internal::MethodHandler { 441 public: CallbackServerStreamingHandler(std::function<ServerWriteReactor<ResponseType> * (::grpc::CallbackServerContext *,const RequestType *)> get_reactor)442 explicit CallbackServerStreamingHandler( 443 std::function<ServerWriteReactor<ResponseType>*( 444 ::grpc::CallbackServerContext*, const RequestType*)> 445 get_reactor) 446 : get_reactor_(std::move(get_reactor)) {} RunHandler(const HandlerParameter & param)447 void RunHandler(const HandlerParameter& param) final { 448 // Arena allocate a writer structure 449 ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call()); 450 451 auto* writer = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( 452 param.call->call(), sizeof(ServerCallbackWriterImpl))) 453 ServerCallbackWriterImpl( 454 static_cast<::grpc::CallbackServerContext*>(param.server_context), 455 param.call, static_cast<RequestType*>(param.request), 456 param.call_requester); 457 // Inlineable OnDone can be false in the CompletionOp callback because there 458 // is no write reactor that has an inlineable OnDone; this only applies to 459 // the DefaultReactor (which is unary). 460 param.server_context->BeginCompletionOp( 461 param.call, 462 [writer](bool) { writer->MaybeDone(/*inlineable_ondone=*/false); }, 463 writer); 464 465 ServerWriteReactor<ResponseType>* reactor = nullptr; 466 if (param.status.ok()) { 467 reactor = ::grpc::internal::CatchingReactorGetter< 468 ServerWriteReactor<ResponseType>>( 469 get_reactor_, 470 static_cast<::grpc::CallbackServerContext*>(param.server_context), 471 writer->request()); 472 } 473 if (reactor == nullptr) { 474 // if deserialization or reactor creator failed, we need to fail the call 475 reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( 476 param.call->call(), sizeof(UnimplementedWriteReactor<ResponseType>))) 477 UnimplementedWriteReactor<ResponseType>( 478 ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "")); 479 } 480 481 writer->SetupReactor(reactor); 482 } 483 Deserialize(grpc_call * call,grpc_byte_buffer * req,::grpc::Status * status,void **)484 void* Deserialize(grpc_call* call, grpc_byte_buffer* req, 485 ::grpc::Status* status, void** /*handler_data*/) final { 486 ::grpc::ByteBuffer buf; 487 buf.set_buffer(req); 488 auto* request = 489 new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( 490 call, sizeof(RequestType))) RequestType(); 491 *status = 492 ::grpc::SerializationTraits<RequestType>::Deserialize(&buf, request); 493 buf.Release(); 494 if (status->ok()) { 495 return request; 496 } 497 request->~RequestType(); 498 return nullptr; 499 } 500 501 private: 502 std::function<ServerWriteReactor<ResponseType>*( 503 ::grpc::CallbackServerContext*, const RequestType*)> 504 get_reactor_; 505 506 class ServerCallbackWriterImpl : public ServerCallbackWriter<ResponseType> { 507 public: Finish(::grpc::Status s)508 void Finish(::grpc::Status s) override { 509 // A finish tag with only MaybeDone can have its callback inlined 510 // regardless even if OnDone is not inlineable because this callback just 511 // checks a ref and then decides whether or not to dispatch OnDone. 512 finish_tag_.Set( 513 call_.call(), 514 [this](bool) { 515 // Inlineable OnDone can be false here because there is 516 // no write reactor that has an inlineable OnDone; this 517 // only applies to the DefaultReactor (which is unary). 518 this->MaybeDone(/*inlineable_ondone=*/false); 519 }, 520 &finish_ops_, /*can_inline=*/true); 521 finish_ops_.set_core_cq_tag(&finish_tag_); 522 523 if (!ctx_->sent_initial_metadata_) { 524 finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 525 ctx_->initial_metadata_flags()); 526 if (ctx_->compression_level_set()) { 527 finish_ops_.set_compression_level(ctx_->compression_level()); 528 } 529 ctx_->sent_initial_metadata_ = true; 530 } 531 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s); 532 call_.PerformOps(&finish_ops_); 533 } 534 SendInitialMetadata()535 void SendInitialMetadata() override { 536 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); 537 this->Ref(); 538 // The callback for this function should not be inlined because it invokes 539 // a user-controlled reaction, but any resulting OnDone can be inlined in 540 // the executor to which this callback is dispatched. 541 meta_tag_.Set( 542 call_.call(), 543 [this](bool ok) { 544 ServerWriteReactor<ResponseType>* reactor = 545 reactor_.load(std::memory_order_relaxed); 546 reactor->OnSendInitialMetadataDone(ok); 547 this->MaybeDone(/*inlineable_ondone=*/true); 548 }, 549 &meta_ops_, /*can_inline=*/false); 550 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 551 ctx_->initial_metadata_flags()); 552 if (ctx_->compression_level_set()) { 553 meta_ops_.set_compression_level(ctx_->compression_level()); 554 } 555 ctx_->sent_initial_metadata_ = true; 556 meta_ops_.set_core_cq_tag(&meta_tag_); 557 call_.PerformOps(&meta_ops_); 558 } 559 Write(const ResponseType * resp,::grpc::WriteOptions options)560 void Write(const ResponseType* resp, 561 ::grpc::WriteOptions options) override { 562 this->Ref(); 563 if (options.is_last_message()) { 564 options.set_buffer_hint(); 565 } 566 if (!ctx_->sent_initial_metadata_) { 567 write_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 568 ctx_->initial_metadata_flags()); 569 if (ctx_->compression_level_set()) { 570 write_ops_.set_compression_level(ctx_->compression_level()); 571 } 572 ctx_->sent_initial_metadata_ = true; 573 } 574 // TODO(vjpai): don't assert 575 GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok()); 576 call_.PerformOps(&write_ops_); 577 } 578 WriteAndFinish(const ResponseType * resp,::grpc::WriteOptions options,::grpc::Status s)579 void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options, 580 ::grpc::Status s) override { 581 // This combines the write into the finish callback 582 // TODO(vjpai): don't assert 583 GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok()); 584 Finish(std::move(s)); 585 } 586 587 private: 588 friend class CallbackServerStreamingHandler<RequestType, ResponseType>; 589 ServerCallbackWriterImpl(::grpc::CallbackServerContext * ctx,::grpc::internal::Call * call,const RequestType * req,std::function<void ()> call_requester)590 ServerCallbackWriterImpl(::grpc::CallbackServerContext* ctx, 591 ::grpc::internal::Call* call, 592 const RequestType* req, 593 std::function<void()> call_requester) 594 : ctx_(ctx), 595 call_(*call), 596 req_(req), 597 call_requester_(std::move(call_requester)) {} 598 SetupReactor(ServerWriteReactor<ResponseType> * reactor)599 void SetupReactor(ServerWriteReactor<ResponseType>* reactor) { 600 reactor_.store(reactor, std::memory_order_relaxed); 601 // The callback for this function should not be inlined because it invokes 602 // a user-controlled reaction, but any resulting OnDone can be inlined in 603 // the executor to which this callback is dispatched. 604 write_tag_.Set( 605 call_.call(), 606 [this, reactor](bool ok) { 607 reactor->OnWriteDone(ok); 608 this->MaybeDone(/*inlineable_ondone=*/true); 609 }, 610 &write_ops_, /*can_inline=*/false); 611 write_ops_.set_core_cq_tag(&write_tag_); 612 this->BindReactor(reactor); 613 this->MaybeCallOnCancel(reactor); 614 // Inlineable OnDone can be false here because there is no write 615 // reactor that has an inlineable OnDone; this only applies to the 616 // DefaultReactor (which is unary). 617 this->MaybeDone(/*inlineable_ondone=*/false); 618 } ~ServerCallbackWriterImpl()619 ~ServerCallbackWriterImpl() { req_->~RequestType(); } 620 request()621 const RequestType* request() { return req_; } 622 CallOnDone()623 void CallOnDone() override { 624 reactor_.load(std::memory_order_relaxed)->OnDone(); 625 grpc_call* call = call_.call(); 626 auto call_requester = std::move(call_requester_); 627 this->~ServerCallbackWriterImpl(); // explicitly call destructor 628 ::grpc::g_core_codegen_interface->grpc_call_unref(call); 629 call_requester(); 630 } 631 reactor()632 ServerReactor* reactor() override { 633 return reactor_.load(std::memory_order_relaxed); 634 } 635 636 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> 637 meta_ops_; 638 ::grpc::internal::CallbackWithSuccessTag meta_tag_; 639 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 640 ::grpc::internal::CallOpSendMessage, 641 ::grpc::internal::CallOpServerSendStatus> 642 finish_ops_; 643 ::grpc::internal::CallbackWithSuccessTag finish_tag_; 644 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 645 ::grpc::internal::CallOpSendMessage> 646 write_ops_; 647 ::grpc::internal::CallbackWithSuccessTag write_tag_; 648 649 ::grpc::CallbackServerContext* const ctx_; 650 ::grpc::internal::Call call_; 651 const RequestType* req_; 652 std::function<void()> call_requester_; 653 // The memory ordering of reactor_ follows ServerCallbackUnaryImpl. 654 std::atomic<ServerWriteReactor<ResponseType>*> reactor_; 655 // callbacks_outstanding_ follows a refcount pattern 656 std::atomic<intptr_t> callbacks_outstanding_{ 657 3}; // reserve for OnStarted, Finish, and CompletionOp 658 }; 659 }; 660 661 template <class RequestType, class ResponseType> 662 class CallbackBidiHandler : public ::grpc::internal::MethodHandler { 663 public: CallbackBidiHandler(std::function<ServerBidiReactor<RequestType,ResponseType> * (::grpc::CallbackServerContext *)> get_reactor)664 explicit CallbackBidiHandler( 665 std::function<ServerBidiReactor<RequestType, ResponseType>*( 666 ::grpc::CallbackServerContext*)> 667 get_reactor) 668 : get_reactor_(std::move(get_reactor)) {} RunHandler(const HandlerParameter & param)669 void RunHandler(const HandlerParameter& param) final { 670 ::grpc::g_core_codegen_interface->grpc_call_ref(param.call->call()); 671 672 auto* stream = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( 673 param.call->call(), sizeof(ServerCallbackReaderWriterImpl))) 674 ServerCallbackReaderWriterImpl( 675 static_cast<::grpc::CallbackServerContext*>(param.server_context), 676 param.call, param.call_requester); 677 // Inlineable OnDone can be false in the CompletionOp callback because there 678 // is no bidi reactor that has an inlineable OnDone; this only applies to 679 // the DefaultReactor (which is unary). 680 param.server_context->BeginCompletionOp( 681 param.call, 682 [stream](bool) { stream->MaybeDone(/*inlineable_ondone=*/false); }, 683 stream); 684 685 ServerBidiReactor<RequestType, ResponseType>* reactor = nullptr; 686 if (param.status.ok()) { 687 reactor = ::grpc::internal::CatchingReactorGetter< 688 ServerBidiReactor<RequestType, ResponseType>>( 689 get_reactor_, 690 static_cast<::grpc::CallbackServerContext*>(param.server_context)); 691 } 692 693 if (reactor == nullptr) { 694 // if deserialization or reactor creator failed, we need to fail the call 695 reactor = new (::grpc::g_core_codegen_interface->grpc_call_arena_alloc( 696 param.call->call(), 697 sizeof(UnimplementedBidiReactor<RequestType, ResponseType>))) 698 UnimplementedBidiReactor<RequestType, ResponseType>( 699 ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "")); 700 } 701 702 stream->SetupReactor(reactor); 703 } 704 705 private: 706 std::function<ServerBidiReactor<RequestType, ResponseType>*( 707 ::grpc::CallbackServerContext*)> 708 get_reactor_; 709 710 class ServerCallbackReaderWriterImpl 711 : public ServerCallbackReaderWriter<RequestType, ResponseType> { 712 public: Finish(::grpc::Status s)713 void Finish(::grpc::Status s) override { 714 // A finish tag with only MaybeDone can have its callback inlined 715 // regardless even if OnDone is not inlineable because this callback just 716 // checks a ref and then decides whether or not to dispatch OnDone. 717 finish_tag_.Set( 718 call_.call(), 719 [this](bool) { 720 // Inlineable OnDone can be false here because there is 721 // no bidi reactor that has an inlineable OnDone; this 722 // only applies to the DefaultReactor (which is unary). 723 this->MaybeDone(/*inlineable_ondone=*/false); 724 }, 725 &finish_ops_, /*can_inline=*/true); 726 finish_ops_.set_core_cq_tag(&finish_tag_); 727 728 if (!ctx_->sent_initial_metadata_) { 729 finish_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 730 ctx_->initial_metadata_flags()); 731 if (ctx_->compression_level_set()) { 732 finish_ops_.set_compression_level(ctx_->compression_level()); 733 } 734 ctx_->sent_initial_metadata_ = true; 735 } 736 finish_ops_.ServerSendStatus(&ctx_->trailing_metadata_, s); 737 call_.PerformOps(&finish_ops_); 738 } 739 SendInitialMetadata()740 void SendInitialMetadata() override { 741 GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_); 742 this->Ref(); 743 // The callback for this function should not be inlined because it invokes 744 // a user-controlled reaction, but any resulting OnDone can be inlined in 745 // the executor to which this callback is dispatched. 746 meta_tag_.Set( 747 call_.call(), 748 [this](bool ok) { 749 ServerBidiReactor<RequestType, ResponseType>* reactor = 750 reactor_.load(std::memory_order_relaxed); 751 reactor->OnSendInitialMetadataDone(ok); 752 this->MaybeDone(/*inlineable_ondone=*/true); 753 }, 754 &meta_ops_, /*can_inline=*/false); 755 meta_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 756 ctx_->initial_metadata_flags()); 757 if (ctx_->compression_level_set()) { 758 meta_ops_.set_compression_level(ctx_->compression_level()); 759 } 760 ctx_->sent_initial_metadata_ = true; 761 meta_ops_.set_core_cq_tag(&meta_tag_); 762 call_.PerformOps(&meta_ops_); 763 } 764 Write(const ResponseType * resp,::grpc::WriteOptions options)765 void Write(const ResponseType* resp, 766 ::grpc::WriteOptions options) override { 767 this->Ref(); 768 if (options.is_last_message()) { 769 options.set_buffer_hint(); 770 } 771 if (!ctx_->sent_initial_metadata_) { 772 write_ops_.SendInitialMetadata(&ctx_->initial_metadata_, 773 ctx_->initial_metadata_flags()); 774 if (ctx_->compression_level_set()) { 775 write_ops_.set_compression_level(ctx_->compression_level()); 776 } 777 ctx_->sent_initial_metadata_ = true; 778 } 779 // TODO(vjpai): don't assert 780 GPR_CODEGEN_ASSERT(write_ops_.SendMessagePtr(resp, options).ok()); 781 call_.PerformOps(&write_ops_); 782 } 783 WriteAndFinish(const ResponseType * resp,::grpc::WriteOptions options,::grpc::Status s)784 void WriteAndFinish(const ResponseType* resp, ::grpc::WriteOptions options, 785 ::grpc::Status s) override { 786 // TODO(vjpai): don't assert 787 GPR_CODEGEN_ASSERT(finish_ops_.SendMessagePtr(resp, options).ok()); 788 Finish(std::move(s)); 789 } 790 Read(RequestType * req)791 void Read(RequestType* req) override { 792 this->Ref(); 793 read_ops_.RecvMessage(req); 794 call_.PerformOps(&read_ops_); 795 } 796 797 private: 798 friend class CallbackBidiHandler<RequestType, ResponseType>; 799 ServerCallbackReaderWriterImpl(::grpc::CallbackServerContext * ctx,::grpc::internal::Call * call,std::function<void ()> call_requester)800 ServerCallbackReaderWriterImpl(::grpc::CallbackServerContext* ctx, 801 ::grpc::internal::Call* call, 802 std::function<void()> call_requester) 803 : ctx_(ctx), call_(*call), call_requester_(std::move(call_requester)) {} 804 SetupReactor(ServerBidiReactor<RequestType,ResponseType> * reactor)805 void SetupReactor(ServerBidiReactor<RequestType, ResponseType>* reactor) { 806 reactor_.store(reactor, std::memory_order_relaxed); 807 // The callbacks for these functions should not be inlined because they 808 // invoke user-controlled reactions, but any resulting OnDones can be 809 // inlined in the executor to which a callback is dispatched. 810 write_tag_.Set( 811 call_.call(), 812 [this, reactor](bool ok) { 813 reactor->OnWriteDone(ok); 814 this->MaybeDone(/*inlineable_ondone=*/true); 815 }, 816 &write_ops_, /*can_inline=*/false); 817 write_ops_.set_core_cq_tag(&write_tag_); 818 read_tag_.Set( 819 call_.call(), 820 [this, reactor](bool ok) { 821 reactor->OnReadDone(ok); 822 this->MaybeDone(/*inlineable_ondone=*/true); 823 }, 824 &read_ops_, /*can_inline=*/false); 825 read_ops_.set_core_cq_tag(&read_tag_); 826 this->BindReactor(reactor); 827 this->MaybeCallOnCancel(reactor); 828 // Inlineable OnDone can be false here because there is no bidi 829 // reactor that has an inlineable OnDone; this only applies to the 830 // DefaultReactor (which is unary). 831 this->MaybeDone(/*inlineable_ondone=*/false); 832 } 833 CallOnDone()834 void CallOnDone() override { 835 reactor_.load(std::memory_order_relaxed)->OnDone(); 836 grpc_call* call = call_.call(); 837 auto call_requester = std::move(call_requester_); 838 this->~ServerCallbackReaderWriterImpl(); // explicitly call destructor 839 ::grpc::g_core_codegen_interface->grpc_call_unref(call); 840 call_requester(); 841 } 842 reactor()843 ServerReactor* reactor() override { 844 return reactor_.load(std::memory_order_relaxed); 845 } 846 847 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata> 848 meta_ops_; 849 ::grpc::internal::CallbackWithSuccessTag meta_tag_; 850 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 851 ::grpc::internal::CallOpSendMessage, 852 ::grpc::internal::CallOpServerSendStatus> 853 finish_ops_; 854 ::grpc::internal::CallbackWithSuccessTag finish_tag_; 855 ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata, 856 ::grpc::internal::CallOpSendMessage> 857 write_ops_; 858 ::grpc::internal::CallbackWithSuccessTag write_tag_; 859 ::grpc::internal::CallOpSet< 860 ::grpc::internal::CallOpRecvMessage<RequestType>> 861 read_ops_; 862 ::grpc::internal::CallbackWithSuccessTag read_tag_; 863 864 ::grpc::CallbackServerContext* const ctx_; 865 ::grpc::internal::Call call_; 866 std::function<void()> call_requester_; 867 // The memory ordering of reactor_ follows ServerCallbackUnaryImpl. 868 std::atomic<ServerBidiReactor<RequestType, ResponseType>*> reactor_; 869 // callbacks_outstanding_ follows a refcount pattern 870 std::atomic<intptr_t> callbacks_outstanding_{ 871 3}; // reserve for OnStarted, Finish, and CompletionOp 872 }; 873 }; 874 875 } // namespace internal 876 } // namespace grpc 877 878 #endif // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_HANDLERS_H 879