1 /* 2 * 3 * Copyright 2019 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H 19 #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H 20 21 #include <atomic> 22 #include <functional> 23 #include <type_traits> 24 25 #include <grpcpp/impl/codegen/call.h> 26 #include <grpcpp/impl/codegen/call_op_set.h> 27 #include <grpcpp/impl/codegen/callback_common.h> 28 #include <grpcpp/impl/codegen/config.h> 29 #include <grpcpp/impl/codegen/core_codegen_interface.h> 30 #include <grpcpp/impl/codegen/message_allocator.h> 31 #include <grpcpp/impl/codegen/status.h> 32 33 namespace grpc { 34 35 // Declare base class of all reactors as internal 36 namespace internal { 37 38 // Forward declarations 39 template <class Request, class Response> 40 class CallbackUnaryHandler; 41 template <class Request, class Response> 42 class CallbackClientStreamingHandler; 43 template <class Request, class Response> 44 class CallbackServerStreamingHandler; 45 template <class Request, class Response> 46 class CallbackBidiHandler; 47 48 class ServerReactor { 49 public: 50 virtual ~ServerReactor() = default; 51 virtual void OnDone() = 0; 52 virtual void OnCancel() = 0; 53 54 // The following is not API. It is for internal use only and specifies whether 55 // all reactions of this Reactor can be run without an extra executor 56 // scheduling. This should only be used for internally-defined reactors with 57 // trivial reactions. InternalInlineable()58 virtual bool InternalInlineable() { return false; } 59 60 private: 61 template <class Request, class Response> 62 friend class CallbackUnaryHandler; 63 template <class Request, class Response> 64 friend class CallbackClientStreamingHandler; 65 template <class Request, class Response> 66 friend class CallbackServerStreamingHandler; 67 template <class Request, class Response> 68 friend class CallbackBidiHandler; 69 }; 70 71 /// The base class of ServerCallbackUnary etc. 72 class ServerCallbackCall { 73 public: ~ServerCallbackCall()74 virtual ~ServerCallbackCall() {} 75 76 // This object is responsible for tracking when it is safe to call OnDone and 77 // OnCancel. OnDone should not be called until the method handler is complete, 78 // Finish has been called, the ServerContext CompletionOp (which tracks 79 // cancellation or successful completion) has completed, and all outstanding 80 // Read/Write actions have seen their reactions. OnCancel should not be called 81 // until after the method handler is done and the RPC has completed with a 82 // cancellation. This is tracked by counting how many of these conditions have 83 // been met and calling OnCancel when none remain unmet. 84 85 // Public versions of MaybeDone: one where we don't know the reactor in 86 // advance (used for the ServerContext CompletionOp), and one for where we 87 // know the inlineability of the OnDone reaction. You should set the inline 88 // flag to true if either the Reactor is InternalInlineable() or if this 89 // callback is already being forced to run dispatched to an executor 90 // (typically because it contains additional work than just the MaybeDone). 91 MaybeDone()92 void MaybeDone() { 93 if (GPR_UNLIKELY(Unref() == 1)) { 94 ScheduleOnDone(reactor()->InternalInlineable()); 95 } 96 } 97 MaybeDone(bool inline_ondone)98 void MaybeDone(bool inline_ondone) { 99 if (GPR_UNLIKELY(Unref() == 1)) { 100 ScheduleOnDone(inline_ondone); 101 } 102 } 103 104 // Fast version called with known reactor passed in, used from derived 105 // classes, typically in non-cancel case MaybeCallOnCancel(ServerReactor * reactor)106 void MaybeCallOnCancel(ServerReactor* reactor) { 107 if (GPR_UNLIKELY(UnblockCancellation())) { 108 CallOnCancel(reactor); 109 } 110 } 111 112 // Slower version called from object that doesn't know the reactor a priori 113 // (such as the ServerContext CompletionOp which is formed before the 114 // reactor). This is used in cancel cases only, so it's ok to be slower and 115 // invoke a virtual function. MaybeCallOnCancel()116 void MaybeCallOnCancel() { 117 if (GPR_UNLIKELY(UnblockCancellation())) { 118 CallOnCancel(reactor()); 119 } 120 } 121 122 protected: 123 /// Increases the reference count Ref()124 void Ref() { callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); } 125 126 private: 127 virtual ServerReactor* reactor() = 0; 128 129 // CallOnDone performs the work required at completion of the RPC: invoking 130 // the OnDone function and doing all necessary cleanup. This function is only 131 // ever invoked on a fully-Unref'fed ServerCallbackCall. 132 virtual void CallOnDone() = 0; 133 134 // If the OnDone reaction is inlineable, execute it inline. Otherwise send it 135 // to an executor. 136 void ScheduleOnDone(bool inline_ondone); 137 138 // If the OnCancel reaction is inlineable, execute it inline. Otherwise send 139 // it to an executor. 140 void CallOnCancel(ServerReactor* reactor); 141 142 // Implement the cancellation constraint counter. Return true if OnCancel 143 // should be called, false otherwise. UnblockCancellation()144 bool UnblockCancellation() { 145 return on_cancel_conditions_remaining_.fetch_sub( 146 1, std::memory_order_acq_rel) == 1; 147 } 148 149 /// Decreases the reference count and returns the previous value Unref()150 int Unref() { 151 return callbacks_outstanding_.fetch_sub(1, std::memory_order_acq_rel); 152 } 153 154 std::atomic_int on_cancel_conditions_remaining_{2}; 155 std::atomic_int callbacks_outstanding_{ 156 3}; // reserve for start, Finish, and CompletionOp 157 }; 158 159 template <class Request, class Response> 160 class DefaultMessageHolder 161 : public ::grpc::experimental::MessageHolder<Request, Response> { 162 public: DefaultMessageHolder()163 DefaultMessageHolder() { 164 this->set_request(&request_obj_); 165 this->set_response(&response_obj_); 166 } Release()167 void Release() override { 168 // the object is allocated in the call arena. 169 this->~DefaultMessageHolder<Request, Response>(); 170 } 171 172 private: 173 Request request_obj_; 174 Response response_obj_; 175 }; 176 177 } // namespace internal 178 179 // Forward declarations 180 class ServerUnaryReactor; 181 template <class Request> 182 class ServerReadReactor; 183 template <class Response> 184 class ServerWriteReactor; 185 template <class Request, class Response> 186 class ServerBidiReactor; 187 188 // NOTE: The actual call/stream object classes are provided as API only to 189 // support mocking. There are no implementations of these class interfaces in 190 // the API. 191 class ServerCallbackUnary : public internal::ServerCallbackCall { 192 public: ~ServerCallbackUnary()193 ~ServerCallbackUnary() override {} 194 virtual void Finish(::grpc::Status s) = 0; 195 virtual void SendInitialMetadata() = 0; 196 197 protected: 198 // Use a template rather than explicitly specifying ServerUnaryReactor to 199 // delay binding and avoid a circular forward declaration issue 200 template <class Reactor> BindReactor(Reactor * reactor)201 void BindReactor(Reactor* reactor) { 202 reactor->InternalBindCall(this); 203 } 204 }; 205 206 template <class Request> 207 class ServerCallbackReader : public internal::ServerCallbackCall { 208 public: ~ServerCallbackReader()209 ~ServerCallbackReader() override {} 210 virtual void Finish(::grpc::Status s) = 0; 211 virtual void SendInitialMetadata() = 0; 212 virtual void Read(Request* msg) = 0; 213 214 protected: BindReactor(ServerReadReactor<Request> * reactor)215 void BindReactor(ServerReadReactor<Request>* reactor) { 216 reactor->InternalBindReader(this); 217 } 218 }; 219 220 template <class Response> 221 class ServerCallbackWriter : public internal::ServerCallbackCall { 222 public: ~ServerCallbackWriter()223 ~ServerCallbackWriter() override {} 224 225 virtual void Finish(::grpc::Status s) = 0; 226 virtual void SendInitialMetadata() = 0; 227 virtual void Write(const Response* msg, ::grpc::WriteOptions options) = 0; 228 virtual void WriteAndFinish(const Response* msg, ::grpc::WriteOptions options, 229 ::grpc::Status s) = 0; 230 231 protected: BindReactor(ServerWriteReactor<Response> * reactor)232 void BindReactor(ServerWriteReactor<Response>* reactor) { 233 reactor->InternalBindWriter(this); 234 } 235 }; 236 237 template <class Request, class Response> 238 class ServerCallbackReaderWriter : public internal::ServerCallbackCall { 239 public: ~ServerCallbackReaderWriter()240 ~ServerCallbackReaderWriter() override {} 241 242 virtual void Finish(::grpc::Status s) = 0; 243 virtual void SendInitialMetadata() = 0; 244 virtual void Read(Request* msg) = 0; 245 virtual void Write(const Response* msg, ::grpc::WriteOptions options) = 0; 246 virtual void WriteAndFinish(const Response* msg, ::grpc::WriteOptions options, 247 ::grpc::Status s) = 0; 248 249 protected: BindReactor(ServerBidiReactor<Request,Response> * reactor)250 void BindReactor(ServerBidiReactor<Request, Response>* reactor) { 251 reactor->InternalBindStream(this); 252 } 253 }; 254 255 // The following classes are the reactor interfaces that are to be implemented 256 // by the user, returned as the output parameter of the method handler for a 257 // callback method. Note that none of the classes are pure; all reactions have a 258 // default empty reaction so that the user class only needs to override those 259 // classes that it cares about. 260 261 /// \a ServerBidiReactor is the interface for a bidirectional streaming RPC. 262 template <class Request, class Response> 263 class ServerBidiReactor : public internal::ServerReactor { 264 public: 265 // NOTE: Initializing stream_ as a constructor initializer rather than a 266 // default initializer because gcc-4.x requires a copy constructor for 267 // default initializing a templated member, which isn't ok for atomic. 268 // TODO(vjpai): Switch to default constructor and default initializer when 269 // gcc-4.x is no longer supported ServerBidiReactor()270 ServerBidiReactor() : stream_(nullptr) {} 271 ~ServerBidiReactor() override = default; 272 273 /// Send any initial metadata stored in the RPC context. If not invoked, 274 /// any initial metadata will be passed along with the first Write or the 275 /// Finish (if there are no writes). StartSendInitialMetadata()276 void StartSendInitialMetadata() { 277 ServerCallbackReaderWriter<Request, Response>* stream = 278 stream_.load(std::memory_order_acquire); 279 if (stream == nullptr) { 280 grpc::internal::MutexLock l(&stream_mu_); 281 stream = stream_.load(std::memory_order_relaxed); 282 if (stream == nullptr) { 283 backlog_.send_initial_metadata_wanted = true; 284 return; 285 } 286 } 287 stream->SendInitialMetadata(); 288 } 289 290 /// Initiate a read operation. 291 /// 292 /// \param[out] req Where to eventually store the read message. Valid when 293 /// the library calls OnReadDone StartRead(Request * req)294 void StartRead(Request* req) { 295 ServerCallbackReaderWriter<Request, Response>* stream = 296 stream_.load(std::memory_order_acquire); 297 if (stream == nullptr) { 298 grpc::internal::MutexLock l(&stream_mu_); 299 stream = stream_.load(std::memory_order_relaxed); 300 if (stream == nullptr) { 301 backlog_.read_wanted = req; 302 return; 303 } 304 } 305 stream->Read(req); 306 } 307 308 /// Initiate a write operation. 309 /// 310 /// \param[in] resp The message to be written. The library does not take 311 /// ownership but the caller must ensure that the message is 312 /// not deleted or modified until OnWriteDone is called. StartWrite(const Response * resp)313 void StartWrite(const Response* resp) { 314 StartWrite(resp, ::grpc::WriteOptions()); 315 } 316 317 /// Initiate a write operation with specified options. 318 /// 319 /// \param[in] resp The message to be written. The library does not take 320 /// ownership but the caller must ensure that the message is 321 /// not deleted or modified until OnWriteDone is called. 322 /// \param[in] options The WriteOptions to use for writing this message StartWrite(const Response * resp,::grpc::WriteOptions options)323 void StartWrite(const Response* resp, ::grpc::WriteOptions options) { 324 ServerCallbackReaderWriter<Request, Response>* stream = 325 stream_.load(std::memory_order_acquire); 326 if (stream == nullptr) { 327 grpc::internal::MutexLock l(&stream_mu_); 328 stream = stream_.load(std::memory_order_relaxed); 329 if (stream == nullptr) { 330 backlog_.write_wanted = resp; 331 backlog_.write_options_wanted = options; 332 return; 333 } 334 } 335 stream->Write(resp, options); 336 } 337 338 /// Initiate a write operation with specified options and final RPC Status, 339 /// which also causes any trailing metadata for this RPC to be sent out. 340 /// StartWriteAndFinish is like merging StartWriteLast and Finish into a 341 /// single step. A key difference, though, is that this operation doesn't have 342 /// an OnWriteDone reaction - it is considered complete only when OnDone is 343 /// available. An RPC can either have StartWriteAndFinish or Finish, but not 344 /// both. 345 /// 346 /// \param[in] resp The message to be written. The library does not take 347 /// ownership but the caller must ensure that the message is 348 /// not deleted or modified until OnDone is called. 349 /// \param[in] options The WriteOptions to use for writing this message 350 /// \param[in] s The status outcome of this RPC StartWriteAndFinish(const Response * resp,::grpc::WriteOptions options,::grpc::Status s)351 void StartWriteAndFinish(const Response* resp, ::grpc::WriteOptions options, 352 ::grpc::Status s) { 353 ServerCallbackReaderWriter<Request, Response>* stream = 354 stream_.load(std::memory_order_acquire); 355 if (stream == nullptr) { 356 grpc::internal::MutexLock l(&stream_mu_); 357 stream = stream_.load(std::memory_order_relaxed); 358 if (stream == nullptr) { 359 backlog_.write_and_finish_wanted = true; 360 backlog_.write_wanted = resp; 361 backlog_.write_options_wanted = options; 362 backlog_.status_wanted = std::move(s); 363 return; 364 } 365 } 366 stream->WriteAndFinish(resp, options, std::move(s)); 367 } 368 369 /// Inform system of a planned write operation with specified options, but 370 /// allow the library to schedule the actual write coalesced with the writing 371 /// of trailing metadata (which takes place on a Finish call). 372 /// 373 /// \param[in] resp The message to be written. The library does not take 374 /// ownership but the caller must ensure that the message is 375 /// not deleted or modified until OnWriteDone is called. 376 /// \param[in] options The WriteOptions to use for writing this message StartWriteLast(const Response * resp,::grpc::WriteOptions options)377 void StartWriteLast(const Response* resp, ::grpc::WriteOptions options) { 378 StartWrite(resp, options.set_last_message()); 379 } 380 381 /// Indicate that the stream is to be finished and the trailing metadata and 382 /// RPC status are to be sent. Every RPC MUST be finished using either Finish 383 /// or StartWriteAndFinish (but not both), even if the RPC is already 384 /// cancelled. 385 /// 386 /// \param[in] s The status outcome of this RPC Finish(::grpc::Status s)387 void Finish(::grpc::Status s) { 388 ServerCallbackReaderWriter<Request, Response>* stream = 389 stream_.load(std::memory_order_acquire); 390 if (stream == nullptr) { 391 grpc::internal::MutexLock l(&stream_mu_); 392 stream = stream_.load(std::memory_order_relaxed); 393 if (stream == nullptr) { 394 backlog_.finish_wanted = true; 395 backlog_.status_wanted = std::move(s); 396 return; 397 } 398 } 399 stream->Finish(std::move(s)); 400 } 401 402 /// Notifies the application that an explicit StartSendInitialMetadata 403 /// operation completed. Not used when the sending of initial metadata 404 /// piggybacks onto the first write. 405 /// 406 /// \param[in] ok Was it successful? If false, no further write-side operation 407 /// will succeed. OnSendInitialMetadataDone(bool)408 virtual void OnSendInitialMetadataDone(bool /*ok*/) {} 409 410 /// Notifies the application that a StartRead operation completed. 411 /// 412 /// \param[in] ok Was it successful? If false, no further read-side operation 413 /// will succeed. OnReadDone(bool)414 virtual void OnReadDone(bool /*ok*/) {} 415 416 /// Notifies the application that a StartWrite (or StartWriteLast) operation 417 /// completed. 418 /// 419 /// \param[in] ok Was it successful? If false, no further write-side operation 420 /// will succeed. OnWriteDone(bool)421 virtual void OnWriteDone(bool /*ok*/) {} 422 423 /// Notifies the application that all operations associated with this RPC 424 /// have completed. This is an override (from the internal base class) but 425 /// still abstract, so derived classes MUST override it to be instantiated. 426 void OnDone() override = 0; 427 428 /// Notifies the application that this RPC has been cancelled. This is an 429 /// override (from the internal base class) but not final, so derived classes 430 /// should override it if they want to take action. OnCancel()431 void OnCancel() override {} 432 433 private: 434 friend class ServerCallbackReaderWriter<Request, Response>; 435 // May be overridden by internal implementation details. This is not a public 436 // customization point. InternalBindStream(ServerCallbackReaderWriter<Request,Response> * stream)437 virtual void InternalBindStream( 438 ServerCallbackReaderWriter<Request, Response>* stream) { 439 grpc::internal::MutexLock l(&stream_mu_); 440 441 if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) { 442 stream->SendInitialMetadata(); 443 } 444 if (GPR_UNLIKELY(backlog_.read_wanted != nullptr)) { 445 stream->Read(backlog_.read_wanted); 446 } 447 if (GPR_UNLIKELY(backlog_.write_and_finish_wanted)) { 448 stream->WriteAndFinish(backlog_.write_wanted, 449 std::move(backlog_.write_options_wanted), 450 std::move(backlog_.status_wanted)); 451 } else { 452 if (GPR_UNLIKELY(backlog_.write_wanted != nullptr)) { 453 stream->Write(backlog_.write_wanted, 454 std::move(backlog_.write_options_wanted)); 455 } 456 if (GPR_UNLIKELY(backlog_.finish_wanted)) { 457 stream->Finish(std::move(backlog_.status_wanted)); 458 } 459 } 460 // Set stream_ last so that other functions can use it lock-free 461 stream_.store(stream, std::memory_order_release); 462 } 463 464 grpc::internal::Mutex stream_mu_; 465 // TODO(vjpai): Make stream_or_backlog_ into a std::variant or absl::variant 466 // once C++17 or ABSL is supported since stream and backlog are 467 // mutually exclusive in this class. Do likewise with the 468 // remaining reactor classes and their backlogs as well. 469 std::atomic<ServerCallbackReaderWriter<Request, Response>*> stream_{nullptr}; 470 struct PreBindBacklog { 471 bool send_initial_metadata_wanted = false; 472 bool write_and_finish_wanted = false; 473 bool finish_wanted = false; 474 Request* read_wanted = nullptr; 475 const Response* write_wanted = nullptr; 476 ::grpc::WriteOptions write_options_wanted; 477 ::grpc::Status status_wanted; 478 }; 479 PreBindBacklog backlog_ /* GUARDED_BY(stream_mu_) */; 480 }; 481 482 /// \a ServerReadReactor is the interface for a client-streaming RPC. 483 template <class Request> 484 class ServerReadReactor : public internal::ServerReactor { 485 public: ServerReadReactor()486 ServerReadReactor() : reader_(nullptr) {} 487 ~ServerReadReactor() override = default; 488 489 /// The following operation initiations are exactly like ServerBidiReactor. StartSendInitialMetadata()490 void StartSendInitialMetadata() { 491 ServerCallbackReader<Request>* reader = 492 reader_.load(std::memory_order_acquire); 493 if (reader == nullptr) { 494 grpc::internal::MutexLock l(&reader_mu_); 495 reader = reader_.load(std::memory_order_relaxed); 496 if (reader == nullptr) { 497 backlog_.send_initial_metadata_wanted = true; 498 return; 499 } 500 } 501 reader->SendInitialMetadata(); 502 } StartRead(Request * req)503 void StartRead(Request* req) { 504 ServerCallbackReader<Request>* reader = 505 reader_.load(std::memory_order_acquire); 506 if (reader == nullptr) { 507 grpc::internal::MutexLock l(&reader_mu_); 508 reader = reader_.load(std::memory_order_relaxed); 509 if (reader == nullptr) { 510 backlog_.read_wanted = req; 511 return; 512 } 513 } 514 reader->Read(req); 515 } Finish(::grpc::Status s)516 void Finish(::grpc::Status s) { 517 ServerCallbackReader<Request>* reader = 518 reader_.load(std::memory_order_acquire); 519 if (reader == nullptr) { 520 grpc::internal::MutexLock l(&reader_mu_); 521 reader = reader_.load(std::memory_order_relaxed); 522 if (reader == nullptr) { 523 backlog_.finish_wanted = true; 524 backlog_.status_wanted = std::move(s); 525 return; 526 } 527 } 528 reader->Finish(std::move(s)); 529 } 530 531 /// The following notifications are exactly like ServerBidiReactor. OnSendInitialMetadataDone(bool)532 virtual void OnSendInitialMetadataDone(bool /*ok*/) {} OnReadDone(bool)533 virtual void OnReadDone(bool /*ok*/) {} 534 void OnDone() override = 0; OnCancel()535 void OnCancel() override {} 536 537 private: 538 friend class ServerCallbackReader<Request>; 539 540 // May be overridden by internal implementation details. This is not a public 541 // customization point. InternalBindReader(ServerCallbackReader<Request> * reader)542 virtual void InternalBindReader(ServerCallbackReader<Request>* reader) { 543 grpc::internal::MutexLock l(&reader_mu_); 544 545 if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) { 546 reader->SendInitialMetadata(); 547 } 548 if (GPR_UNLIKELY(backlog_.read_wanted != nullptr)) { 549 reader->Read(backlog_.read_wanted); 550 } 551 if (GPR_UNLIKELY(backlog_.finish_wanted)) { 552 reader->Finish(std::move(backlog_.status_wanted)); 553 } 554 // Set reader_ last so that other functions can use it lock-free 555 reader_.store(reader, std::memory_order_release); 556 } 557 558 grpc::internal::Mutex reader_mu_; 559 std::atomic<ServerCallbackReader<Request>*> reader_{nullptr}; 560 struct PreBindBacklog { 561 bool send_initial_metadata_wanted = false; 562 bool finish_wanted = false; 563 Request* read_wanted = nullptr; 564 ::grpc::Status status_wanted; 565 }; 566 PreBindBacklog backlog_ /* GUARDED_BY(reader_mu_) */; 567 }; 568 569 /// \a ServerWriteReactor is the interface for a server-streaming RPC. 570 template <class Response> 571 class ServerWriteReactor : public internal::ServerReactor { 572 public: ServerWriteReactor()573 ServerWriteReactor() : writer_(nullptr) {} 574 ~ServerWriteReactor() override = default; 575 576 /// The following operation initiations are exactly like ServerBidiReactor. StartSendInitialMetadata()577 void StartSendInitialMetadata() { 578 ServerCallbackWriter<Response>* writer = 579 writer_.load(std::memory_order_acquire); 580 if (writer == nullptr) { 581 grpc::internal::MutexLock l(&writer_mu_); 582 writer = writer_.load(std::memory_order_relaxed); 583 if (writer == nullptr) { 584 backlog_.send_initial_metadata_wanted = true; 585 return; 586 } 587 } 588 writer->SendInitialMetadata(); 589 } StartWrite(const Response * resp)590 void StartWrite(const Response* resp) { 591 StartWrite(resp, ::grpc::WriteOptions()); 592 } StartWrite(const Response * resp,::grpc::WriteOptions options)593 void StartWrite(const Response* resp, ::grpc::WriteOptions options) { 594 ServerCallbackWriter<Response>* writer = 595 writer_.load(std::memory_order_acquire); 596 if (writer == nullptr) { 597 grpc::internal::MutexLock l(&writer_mu_); 598 writer = writer_.load(std::memory_order_relaxed); 599 if (writer == nullptr) { 600 backlog_.write_wanted = resp; 601 backlog_.write_options_wanted = options; 602 return; 603 } 604 } 605 writer->Write(resp, options); 606 } StartWriteAndFinish(const Response * resp,::grpc::WriteOptions options,::grpc::Status s)607 void StartWriteAndFinish(const Response* resp, ::grpc::WriteOptions options, 608 ::grpc::Status s) { 609 ServerCallbackWriter<Response>* writer = 610 writer_.load(std::memory_order_acquire); 611 if (writer == nullptr) { 612 grpc::internal::MutexLock l(&writer_mu_); 613 writer = writer_.load(std::memory_order_relaxed); 614 if (writer == nullptr) { 615 backlog_.write_and_finish_wanted = true; 616 backlog_.write_wanted = resp; 617 backlog_.write_options_wanted = options; 618 backlog_.status_wanted = std::move(s); 619 return; 620 } 621 } 622 writer->WriteAndFinish(resp, options, std::move(s)); 623 } StartWriteLast(const Response * resp,::grpc::WriteOptions options)624 void StartWriteLast(const Response* resp, ::grpc::WriteOptions options) { 625 StartWrite(resp, options.set_last_message()); 626 } Finish(::grpc::Status s)627 void Finish(::grpc::Status s) { 628 ServerCallbackWriter<Response>* writer = 629 writer_.load(std::memory_order_acquire); 630 if (writer == nullptr) { 631 grpc::internal::MutexLock l(&writer_mu_); 632 writer = writer_.load(std::memory_order_relaxed); 633 if (writer == nullptr) { 634 backlog_.finish_wanted = true; 635 backlog_.status_wanted = std::move(s); 636 return; 637 } 638 } 639 writer->Finish(std::move(s)); 640 } 641 642 /// The following notifications are exactly like ServerBidiReactor. OnSendInitialMetadataDone(bool)643 virtual void OnSendInitialMetadataDone(bool /*ok*/) {} OnWriteDone(bool)644 virtual void OnWriteDone(bool /*ok*/) {} 645 void OnDone() override = 0; OnCancel()646 void OnCancel() override {} 647 648 private: 649 friend class ServerCallbackWriter<Response>; 650 // May be overridden by internal implementation details. This is not a public 651 // customization point. InternalBindWriter(ServerCallbackWriter<Response> * writer)652 virtual void InternalBindWriter(ServerCallbackWriter<Response>* writer) { 653 grpc::internal::MutexLock l(&writer_mu_); 654 655 if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) { 656 writer->SendInitialMetadata(); 657 } 658 if (GPR_UNLIKELY(backlog_.write_and_finish_wanted)) { 659 writer->WriteAndFinish(backlog_.write_wanted, 660 std::move(backlog_.write_options_wanted), 661 std::move(backlog_.status_wanted)); 662 } else { 663 if (GPR_UNLIKELY(backlog_.write_wanted != nullptr)) { 664 writer->Write(backlog_.write_wanted, 665 std::move(backlog_.write_options_wanted)); 666 } 667 if (GPR_UNLIKELY(backlog_.finish_wanted)) { 668 writer->Finish(std::move(backlog_.status_wanted)); 669 } 670 } 671 // Set writer_ last so that other functions can use it lock-free 672 writer_.store(writer, std::memory_order_release); 673 } 674 675 grpc::internal::Mutex writer_mu_; 676 std::atomic<ServerCallbackWriter<Response>*> writer_{nullptr}; 677 struct PreBindBacklog { 678 bool send_initial_metadata_wanted = false; 679 bool write_and_finish_wanted = false; 680 bool finish_wanted = false; 681 const Response* write_wanted = nullptr; 682 ::grpc::WriteOptions write_options_wanted; 683 ::grpc::Status status_wanted; 684 }; 685 PreBindBacklog backlog_ /* GUARDED_BY(writer_mu_) */; 686 }; 687 688 class ServerUnaryReactor : public internal::ServerReactor { 689 public: ServerUnaryReactor()690 ServerUnaryReactor() : call_(nullptr) {} 691 ~ServerUnaryReactor() override = default; 692 693 /// StartSendInitialMetadata is exactly like ServerBidiReactor. StartSendInitialMetadata()694 void StartSendInitialMetadata() { 695 ServerCallbackUnary* call = call_.load(std::memory_order_acquire); 696 if (call == nullptr) { 697 grpc::internal::MutexLock l(&call_mu_); 698 call = call_.load(std::memory_order_relaxed); 699 if (call == nullptr) { 700 backlog_.send_initial_metadata_wanted = true; 701 return; 702 } 703 } 704 call->SendInitialMetadata(); 705 } 706 /// Finish is similar to ServerBidiReactor except for one detail. 707 /// If the status is non-OK, any message will not be sent. Instead, 708 /// the client will only receive the status and any trailing metadata. Finish(::grpc::Status s)709 void Finish(::grpc::Status s) { 710 ServerCallbackUnary* call = call_.load(std::memory_order_acquire); 711 if (call == nullptr) { 712 grpc::internal::MutexLock l(&call_mu_); 713 call = call_.load(std::memory_order_relaxed); 714 if (call == nullptr) { 715 backlog_.finish_wanted = true; 716 backlog_.status_wanted = std::move(s); 717 return; 718 } 719 } 720 call->Finish(std::move(s)); 721 } 722 723 /// The following notifications are exactly like ServerBidiReactor. OnSendInitialMetadataDone(bool)724 virtual void OnSendInitialMetadataDone(bool /*ok*/) {} 725 void OnDone() override = 0; OnCancel()726 void OnCancel() override {} 727 728 private: 729 friend class ServerCallbackUnary; 730 // May be overridden by internal implementation details. This is not a public 731 // customization point. InternalBindCall(ServerCallbackUnary * call)732 virtual void InternalBindCall(ServerCallbackUnary* call) { 733 grpc::internal::MutexLock l(&call_mu_); 734 735 if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) { 736 call->SendInitialMetadata(); 737 } 738 if (GPR_UNLIKELY(backlog_.finish_wanted)) { 739 call->Finish(std::move(backlog_.status_wanted)); 740 } 741 // Set call_ last so that other functions can use it lock-free 742 call_.store(call, std::memory_order_release); 743 } 744 745 grpc::internal::Mutex call_mu_; 746 std::atomic<ServerCallbackUnary*> call_{nullptr}; 747 struct PreBindBacklog { 748 bool send_initial_metadata_wanted = false; 749 bool finish_wanted = false; 750 ::grpc::Status status_wanted; 751 }; 752 PreBindBacklog backlog_ /* GUARDED_BY(call_mu_) */; 753 }; 754 755 namespace internal { 756 757 template <class Base> 758 class FinishOnlyReactor : public Base { 759 public: FinishOnlyReactor(::grpc::Status s)760 explicit FinishOnlyReactor(::grpc::Status s) { this->Finish(std::move(s)); } OnDone()761 void OnDone() override { this->~FinishOnlyReactor(); } 762 }; 763 764 using UnimplementedUnaryReactor = FinishOnlyReactor<ServerUnaryReactor>; 765 template <class Request> 766 using UnimplementedReadReactor = FinishOnlyReactor<ServerReadReactor<Request>>; 767 template <class Response> 768 using UnimplementedWriteReactor = 769 FinishOnlyReactor<ServerWriteReactor<Response>>; 770 template <class Request, class Response> 771 using UnimplementedBidiReactor = 772 FinishOnlyReactor<ServerBidiReactor<Request, Response>>; 773 774 } // namespace internal 775 776 // TODO(vjpai): Remove namespace experimental when de-experimentalized fully. 777 namespace experimental { 778 779 template <class Request> 780 using ServerReadReactor = ::grpc::ServerReadReactor<Request>; 781 782 template <class Response> 783 using ServerWriteReactor = ::grpc::ServerWriteReactor<Response>; 784 785 template <class Request, class Response> 786 using ServerBidiReactor = ::grpc::ServerBidiReactor<Request, Response>; 787 788 using ServerUnaryReactor = ::grpc::ServerUnaryReactor; 789 790 } // namespace experimental 791 792 } // namespace grpc 793 794 #endif // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H 795