1 // 2 // 3 // Copyright 2018 gRPC authors. 4 // 5 // Licensed under the Apache License, Version 2.0 (the "License"); 6 // you may not use this file except in compliance with the License. 7 // You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, software 12 // distributed under the License is distributed on an "AS IS" BASIS, 13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 // See the License for the specific language governing permissions and 15 // limitations under the License. 16 // 17 // 18 19 #ifndef GRPCPP_SUPPORT_SERVER_CALLBACK_H 20 #define GRPCPP_SUPPORT_SERVER_CALLBACK_H 21 22 #include <atomic> 23 #include <functional> 24 #include <type_traits> 25 26 #include <grpcpp/impl/call.h> 27 #include <grpcpp/impl/call_op_set.h> 28 #include <grpcpp/impl/sync.h> 29 #include <grpcpp/support/callback_common.h> 30 #include <grpcpp/support/config.h> 31 #include <grpcpp/support/message_allocator.h> 32 #include <grpcpp/support/status.h> 33 34 namespace grpc { 35 36 // Declare base class of all reactors as internal 37 namespace internal { 38 39 // Forward declarations 40 template <class Request, class Response> 41 class CallbackUnaryHandler; 42 template <class Request, class Response> 43 class CallbackClientStreamingHandler; 44 template <class Request, class Response> 45 class CallbackServerStreamingHandler; 46 template <class Request, class Response> 47 class CallbackBidiHandler; 48 49 class ServerReactor { 50 public: 51 virtual ~ServerReactor() = default; 52 virtual void OnDone() = 0; 53 virtual void OnCancel() = 0; 54 55 // The following is not API. It is for internal use only and specifies whether 56 // all reactions of this Reactor can be run without an extra executor 57 // scheduling. This should only be used for internally-defined reactors with 58 // trivial reactions. InternalInlineable()59 virtual bool InternalInlineable() { return false; } 60 61 private: 62 template <class Request, class Response> 63 friend class CallbackUnaryHandler; 64 template <class Request, class Response> 65 friend class CallbackClientStreamingHandler; 66 template <class Request, class Response> 67 friend class CallbackServerStreamingHandler; 68 template <class Request, class Response> 69 friend class CallbackBidiHandler; 70 }; 71 72 /// The base class of ServerCallbackUnary etc. 73 class ServerCallbackCall { 74 public: ~ServerCallbackCall()75 virtual ~ServerCallbackCall() {} 76 77 // This object is responsible for tracking when it is safe to call OnDone and 78 // OnCancel. OnDone should not be called until the method handler is complete, 79 // Finish has been called, the ServerContext CompletionOp (which tracks 80 // cancellation or successful completion) has completed, and all outstanding 81 // Read/Write actions have seen their reactions. OnCancel should not be called 82 // until after the method handler is done and the RPC has completed with a 83 // cancellation. This is tracked by counting how many of these conditions have 84 // been met and calling OnCancel when none remain unmet. 85 86 // Public versions of MaybeDone: one where we don't know the reactor in 87 // advance (used for the ServerContext CompletionOp), and one for where we 88 // know the inlineability of the OnDone reaction. You should set the inline 89 // flag to true if either the Reactor is InternalInlineable() or if this 90 // callback is already being forced to run dispatched to an executor 91 // (typically because it contains additional work than just the MaybeDone). 92 MaybeDone()93 void MaybeDone() { 94 if (GPR_UNLIKELY(Unref() == 1)) { 95 ScheduleOnDone(reactor()->InternalInlineable()); 96 } 97 } 98 MaybeDone(bool inline_ondone)99 void MaybeDone(bool inline_ondone) { 100 if (GPR_UNLIKELY(Unref() == 1)) { 101 ScheduleOnDone(inline_ondone); 102 } 103 } 104 105 // Fast version called with known reactor passed in, used from derived 106 // classes, typically in non-cancel case MaybeCallOnCancel(ServerReactor * reactor)107 void MaybeCallOnCancel(ServerReactor* reactor) { 108 if (GPR_UNLIKELY(UnblockCancellation())) { 109 CallOnCancel(reactor); 110 } 111 } 112 113 // Slower version called from object that doesn't know the reactor a priori 114 // (such as the ServerContext CompletionOp which is formed before the 115 // reactor). This is used in cancel cases only, so it's ok to be slower and 116 // invoke a virtual function. MaybeCallOnCancel()117 void MaybeCallOnCancel() { 118 if (GPR_UNLIKELY(UnblockCancellation())) { 119 CallOnCancel(reactor()); 120 } 121 } 122 123 protected: 124 /// Increases the reference count Ref()125 void Ref() { callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); } 126 127 private: 128 virtual ServerReactor* reactor() = 0; 129 130 // CallOnDone performs the work required at completion of the RPC: invoking 131 // the OnDone function and doing all necessary cleanup. This function is only 132 // ever invoked on a fully-Unref'fed ServerCallbackCall. 133 virtual void CallOnDone() = 0; 134 135 // If the OnDone reaction is inlineable, execute it inline. Otherwise send it 136 // to an executor. 137 void ScheduleOnDone(bool inline_ondone); 138 139 // If the OnCancel reaction is inlineable, execute it inline. Otherwise send 140 // it to an executor. 141 void CallOnCancel(ServerReactor* reactor); 142 143 // Implement the cancellation constraint counter. Return true if OnCancel 144 // should be called, false otherwise. UnblockCancellation()145 bool UnblockCancellation() { 146 return on_cancel_conditions_remaining_.fetch_sub( 147 1, std::memory_order_acq_rel) == 1; 148 } 149 150 /// Decreases the reference count and returns the previous value Unref()151 int Unref() { 152 return callbacks_outstanding_.fetch_sub(1, std::memory_order_acq_rel); 153 } 154 155 std::atomic_int on_cancel_conditions_remaining_{2}; 156 std::atomic_int callbacks_outstanding_{ 157 3}; // reserve for start, Finish, and CompletionOp 158 }; 159 160 template <class Request, class Response> 161 class DefaultMessageHolder : public MessageHolder<Request, Response> { 162 public: DefaultMessageHolder()163 DefaultMessageHolder() { 164 this->set_request(&request_obj_); 165 this->set_response(&response_obj_); 166 } Release()167 void Release() override { 168 // the object is allocated in the call arena. 169 this->~DefaultMessageHolder<Request, Response>(); 170 } 171 172 private: 173 Request request_obj_; 174 Response response_obj_; 175 }; 176 177 } // namespace internal 178 179 // Forward declarations 180 class ServerUnaryReactor; 181 template <class Request> 182 class ServerReadReactor; 183 template <class Response> 184 class ServerWriteReactor; 185 template <class Request, class Response> 186 class ServerBidiReactor; 187 188 // NOTE: The actual call/stream object classes are provided as API only to 189 // support mocking. There are no implementations of these class interfaces in 190 // the API. 191 class ServerCallbackUnary : public internal::ServerCallbackCall { 192 public: ~ServerCallbackUnary()193 ~ServerCallbackUnary() override {} 194 virtual void Finish(grpc::Status s) = 0; 195 virtual void SendInitialMetadata() = 0; 196 197 protected: 198 // Use a template rather than explicitly specifying ServerUnaryReactor to 199 // delay binding and avoid a circular forward declaration issue 200 template <class Reactor> BindReactor(Reactor * reactor)201 void BindReactor(Reactor* reactor) { 202 reactor->InternalBindCall(this); 203 } 204 }; 205 206 template <class Request> 207 class ServerCallbackReader : public internal::ServerCallbackCall { 208 public: ~ServerCallbackReader()209 ~ServerCallbackReader() override {} 210 virtual void Finish(grpc::Status s) = 0; 211 virtual void SendInitialMetadata() = 0; 212 virtual void Read(Request* msg) = 0; 213 214 protected: BindReactor(ServerReadReactor<Request> * reactor)215 void BindReactor(ServerReadReactor<Request>* reactor) { 216 reactor->InternalBindReader(this); 217 } 218 }; 219 220 template <class Response> 221 class ServerCallbackWriter : public internal::ServerCallbackCall { 222 public: ~ServerCallbackWriter()223 ~ServerCallbackWriter() override {} 224 225 virtual void Finish(grpc::Status s) = 0; 226 virtual void SendInitialMetadata() = 0; 227 virtual void Write(const Response* msg, grpc::WriteOptions options) = 0; 228 virtual void WriteAndFinish(const Response* msg, grpc::WriteOptions options, 229 grpc::Status s) = 0; 230 231 protected: BindReactor(ServerWriteReactor<Response> * reactor)232 void BindReactor(ServerWriteReactor<Response>* reactor) { 233 reactor->InternalBindWriter(this); 234 } 235 }; 236 237 template <class Request, class Response> 238 class ServerCallbackReaderWriter : public internal::ServerCallbackCall { 239 public: ~ServerCallbackReaderWriter()240 ~ServerCallbackReaderWriter() override {} 241 242 virtual void Finish(grpc::Status s) = 0; 243 virtual void SendInitialMetadata() = 0; 244 virtual void Read(Request* msg) = 0; 245 virtual void Write(const Response* msg, grpc::WriteOptions options) = 0; 246 virtual void WriteAndFinish(const Response* msg, grpc::WriteOptions options, 247 grpc::Status s) = 0; 248 249 protected: BindReactor(ServerBidiReactor<Request,Response> * reactor)250 void BindReactor(ServerBidiReactor<Request, Response>* reactor) { 251 reactor->InternalBindStream(this); 252 } 253 }; 254 255 // The following classes are the reactor interfaces that are to be implemented 256 // by the user, returned as the output parameter of the method handler for a 257 // callback method. Note that none of the classes are pure; all reactions have a 258 // default empty reaction so that the user class only needs to override those 259 // reactions that it cares about. The reaction methods will be invoked by the 260 // library in response to the completion of various operations. Reactions must 261 // not include blocking operations (such as blocking I/O, starting synchronous 262 // RPCs, or waiting on condition variables). Reactions may be invoked 263 // concurrently, except that OnDone is called after all others (assuming proper 264 // API usage). The reactor may not be deleted until OnDone is called. 265 266 /// \a ServerBidiReactor is the interface for a bidirectional streaming RPC. 267 template <class Request, class Response> 268 class ServerBidiReactor : public internal::ServerReactor { 269 public: 270 // NOTE: Initializing stream_ as a constructor initializer rather than a 271 // default initializer because gcc-4.x requires a copy constructor for 272 // default initializing a templated member, which isn't ok for atomic. 273 // TODO(vjpai): Switch to default constructor and default initializer when 274 // gcc-4.x is no longer supported ServerBidiReactor()275 ServerBidiReactor() : stream_(nullptr) {} 276 ~ServerBidiReactor() override = default; 277 278 /// Send any initial metadata stored in the RPC context. If not invoked, 279 /// any initial metadata will be passed along with the first Write or the 280 /// Finish (if there are no writes). StartSendInitialMetadata()281 void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(stream_mu_) { 282 ServerCallbackReaderWriter<Request, Response>* stream = 283 stream_.load(std::memory_order_acquire); 284 if (stream == nullptr) { 285 grpc::internal::MutexLock l(&stream_mu_); 286 stream = stream_.load(std::memory_order_relaxed); 287 if (stream == nullptr) { 288 backlog_.send_initial_metadata_wanted = true; 289 return; 290 } 291 } 292 stream->SendInitialMetadata(); 293 } 294 295 /// Initiate a read operation. 296 /// 297 /// \param[out] req Where to eventually store the read message. Valid when 298 /// the library calls OnReadDone StartRead(Request * req)299 void StartRead(Request* req) ABSL_LOCKS_EXCLUDED(stream_mu_) { 300 ServerCallbackReaderWriter<Request, Response>* stream = 301 stream_.load(std::memory_order_acquire); 302 if (stream == nullptr) { 303 grpc::internal::MutexLock l(&stream_mu_); 304 stream = stream_.load(std::memory_order_relaxed); 305 if (stream == nullptr) { 306 backlog_.read_wanted = req; 307 return; 308 } 309 } 310 stream->Read(req); 311 } 312 313 /// Initiate a write operation. 314 /// 315 /// \param[in] resp The message to be written. The library does not take 316 /// ownership but the caller must ensure that the message is 317 /// not deleted or modified until OnWriteDone is called. StartWrite(const Response * resp)318 void StartWrite(const Response* resp) { 319 StartWrite(resp, grpc::WriteOptions()); 320 } 321 322 /// Initiate a write operation with specified options. 323 /// 324 /// \param[in] resp The message to be written. The library does not take 325 /// ownership but the caller must ensure that the message is 326 /// not deleted or modified until OnWriteDone is called. 327 /// \param[in] options The WriteOptions to use for writing this message StartWrite(const Response * resp,grpc::WriteOptions options)328 void StartWrite(const Response* resp, grpc::WriteOptions options) 329 ABSL_LOCKS_EXCLUDED(stream_mu_) { 330 ServerCallbackReaderWriter<Request, Response>* stream = 331 stream_.load(std::memory_order_acquire); 332 if (stream == nullptr) { 333 grpc::internal::MutexLock l(&stream_mu_); 334 stream = stream_.load(std::memory_order_relaxed); 335 if (stream == nullptr) { 336 backlog_.write_wanted = resp; 337 backlog_.write_options_wanted = options; 338 return; 339 } 340 } 341 stream->Write(resp, options); 342 } 343 344 /// Initiate a write operation with specified options and final RPC Status, 345 /// which also causes any trailing metadata for this RPC to be sent out. 346 /// StartWriteAndFinish is like merging StartWriteLast and Finish into a 347 /// single step. A key difference, though, is that this operation doesn't have 348 /// an OnWriteDone reaction - it is considered complete only when OnDone is 349 /// available. An RPC can either have StartWriteAndFinish or Finish, but not 350 /// both. 351 /// 352 /// \param[in] resp The message to be written. The library does not take 353 /// ownership but the caller must ensure that the message is 354 /// not deleted or modified until OnDone is called. 355 /// \param[in] options The WriteOptions to use for writing this message 356 /// \param[in] s The status outcome of this RPC StartWriteAndFinish(const Response * resp,grpc::WriteOptions options,grpc::Status s)357 void StartWriteAndFinish(const Response* resp, grpc::WriteOptions options, 358 grpc::Status s) ABSL_LOCKS_EXCLUDED(stream_mu_) { 359 ServerCallbackReaderWriter<Request, Response>* stream = 360 stream_.load(std::memory_order_acquire); 361 if (stream == nullptr) { 362 grpc::internal::MutexLock l(&stream_mu_); 363 stream = stream_.load(std::memory_order_relaxed); 364 if (stream == nullptr) { 365 backlog_.write_and_finish_wanted = true; 366 backlog_.write_wanted = resp; 367 backlog_.write_options_wanted = options; 368 backlog_.status_wanted = std::move(s); 369 return; 370 } 371 } 372 stream->WriteAndFinish(resp, options, std::move(s)); 373 } 374 375 /// Inform system of a planned write operation with specified options, but 376 /// allow the library to schedule the actual write coalesced with the writing 377 /// of trailing metadata (which takes place on a Finish call). 378 /// 379 /// \param[in] resp The message to be written. The library does not take 380 /// ownership but the caller must ensure that the message is 381 /// not deleted or modified until OnWriteDone is called. 382 /// \param[in] options The WriteOptions to use for writing this message StartWriteLast(const Response * resp,grpc::WriteOptions options)383 void StartWriteLast(const Response* resp, grpc::WriteOptions options) { 384 StartWrite(resp, options.set_last_message()); 385 } 386 387 /// Indicate that the stream is to be finished and the trailing metadata and 388 /// RPC status are to be sent. Every RPC MUST be finished using either Finish 389 /// or StartWriteAndFinish (but not both), even if the RPC is already 390 /// cancelled. 391 /// 392 /// \param[in] s The status outcome of this RPC Finish(grpc::Status s)393 void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(stream_mu_) { 394 ServerCallbackReaderWriter<Request, Response>* stream = 395 stream_.load(std::memory_order_acquire); 396 if (stream == nullptr) { 397 grpc::internal::MutexLock l(&stream_mu_); 398 stream = stream_.load(std::memory_order_relaxed); 399 if (stream == nullptr) { 400 backlog_.finish_wanted = true; 401 backlog_.status_wanted = std::move(s); 402 return; 403 } 404 } 405 stream->Finish(std::move(s)); 406 } 407 408 /// Notifies the application that an explicit StartSendInitialMetadata 409 /// operation completed. Not used when the sending of initial metadata 410 /// piggybacks onto the first write. 411 /// 412 /// \param[in] ok Was it successful? If false, no further write-side operation 413 /// will succeed. OnSendInitialMetadataDone(bool)414 virtual void OnSendInitialMetadataDone(bool /*ok*/) {} 415 416 /// Notifies the application that a StartRead operation completed. 417 /// 418 /// \param[in] ok Was it successful? If false, no further read-side operation 419 /// will succeed. OnReadDone(bool)420 virtual void OnReadDone(bool /*ok*/) {} 421 422 /// Notifies the application that a StartWrite (or StartWriteLast) operation 423 /// completed. 424 /// 425 /// \param[in] ok Was it successful? If false, no further write-side operation 426 /// will succeed. OnWriteDone(bool)427 virtual void OnWriteDone(bool /*ok*/) {} 428 429 /// Notifies the application that all operations associated with this RPC 430 /// have completed. This is an override (from the internal base class) but 431 /// still abstract, so derived classes MUST override it to be instantiated. 432 void OnDone() override = 0; 433 434 /// Notifies the application that this RPC has been cancelled. This is an 435 /// override (from the internal base class) but not final, so derived classes 436 /// should override it if they want to take action. OnCancel()437 void OnCancel() override {} 438 439 private: 440 friend class ServerCallbackReaderWriter<Request, Response>; 441 // May be overridden by internal implementation details. This is not a public 442 // customization point. InternalBindStream(ServerCallbackReaderWriter<Request,Response> * stream)443 virtual void InternalBindStream( 444 ServerCallbackReaderWriter<Request, Response>* stream) { 445 grpc::internal::MutexLock l(&stream_mu_); 446 447 if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) { 448 stream->SendInitialMetadata(); 449 } 450 if (GPR_UNLIKELY(backlog_.read_wanted != nullptr)) { 451 stream->Read(backlog_.read_wanted); 452 } 453 if (GPR_UNLIKELY(backlog_.write_and_finish_wanted)) { 454 stream->WriteAndFinish(backlog_.write_wanted, 455 std::move(backlog_.write_options_wanted), 456 std::move(backlog_.status_wanted)); 457 } else { 458 if (GPR_UNLIKELY(backlog_.write_wanted != nullptr)) { 459 stream->Write(backlog_.write_wanted, 460 std::move(backlog_.write_options_wanted)); 461 } 462 if (GPR_UNLIKELY(backlog_.finish_wanted)) { 463 stream->Finish(std::move(backlog_.status_wanted)); 464 } 465 } 466 // Set stream_ last so that other functions can use it lock-free 467 stream_.store(stream, std::memory_order_release); 468 } 469 470 grpc::internal::Mutex stream_mu_; 471 // TODO(vjpai): Make stream_or_backlog_ into a std::variant or absl::variant 472 // once C++17 or ABSL is supported since stream and backlog are 473 // mutually exclusive in this class. Do likewise with the 474 // remaining reactor classes and their backlogs as well. 475 std::atomic<ServerCallbackReaderWriter<Request, Response>*> stream_{nullptr}; 476 struct PreBindBacklog { 477 bool send_initial_metadata_wanted = false; 478 bool write_and_finish_wanted = false; 479 bool finish_wanted = false; 480 Request* read_wanted = nullptr; 481 const Response* write_wanted = nullptr; 482 grpc::WriteOptions write_options_wanted; 483 grpc::Status status_wanted; 484 }; 485 PreBindBacklog backlog_ ABSL_GUARDED_BY(stream_mu_); 486 }; 487 488 /// \a ServerReadReactor is the interface for a client-streaming RPC. 489 template <class Request> 490 class ServerReadReactor : public internal::ServerReactor { 491 public: ServerReadReactor()492 ServerReadReactor() : reader_(nullptr) {} 493 ~ServerReadReactor() override = default; 494 495 /// The following operation initiations are exactly like ServerBidiReactor. StartSendInitialMetadata()496 void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(reader_mu_) { 497 ServerCallbackReader<Request>* reader = 498 reader_.load(std::memory_order_acquire); 499 if (reader == nullptr) { 500 grpc::internal::MutexLock l(&reader_mu_); 501 reader = reader_.load(std::memory_order_relaxed); 502 if (reader == nullptr) { 503 backlog_.send_initial_metadata_wanted = true; 504 return; 505 } 506 } 507 reader->SendInitialMetadata(); 508 } StartRead(Request * req)509 void StartRead(Request* req) ABSL_LOCKS_EXCLUDED(reader_mu_) { 510 ServerCallbackReader<Request>* reader = 511 reader_.load(std::memory_order_acquire); 512 if (reader == nullptr) { 513 grpc::internal::MutexLock l(&reader_mu_); 514 reader = reader_.load(std::memory_order_relaxed); 515 if (reader == nullptr) { 516 backlog_.read_wanted = req; 517 return; 518 } 519 } 520 reader->Read(req); 521 } Finish(grpc::Status s)522 void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(reader_mu_) { 523 ServerCallbackReader<Request>* reader = 524 reader_.load(std::memory_order_acquire); 525 if (reader == nullptr) { 526 grpc::internal::MutexLock l(&reader_mu_); 527 reader = reader_.load(std::memory_order_relaxed); 528 if (reader == nullptr) { 529 backlog_.finish_wanted = true; 530 backlog_.status_wanted = std::move(s); 531 return; 532 } 533 } 534 reader->Finish(std::move(s)); 535 } 536 537 /// The following notifications are exactly like ServerBidiReactor. OnSendInitialMetadataDone(bool)538 virtual void OnSendInitialMetadataDone(bool /*ok*/) {} OnReadDone(bool)539 virtual void OnReadDone(bool /*ok*/) {} 540 void OnDone() override = 0; OnCancel()541 void OnCancel() override {} 542 543 private: 544 friend class ServerCallbackReader<Request>; 545 546 // May be overridden by internal implementation details. This is not a public 547 // customization point. InternalBindReader(ServerCallbackReader<Request> * reader)548 virtual void InternalBindReader(ServerCallbackReader<Request>* reader) 549 ABSL_LOCKS_EXCLUDED(reader_mu_) { 550 grpc::internal::MutexLock l(&reader_mu_); 551 552 if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) { 553 reader->SendInitialMetadata(); 554 } 555 if (GPR_UNLIKELY(backlog_.read_wanted != nullptr)) { 556 reader->Read(backlog_.read_wanted); 557 } 558 if (GPR_UNLIKELY(backlog_.finish_wanted)) { 559 reader->Finish(std::move(backlog_.status_wanted)); 560 } 561 // Set reader_ last so that other functions can use it lock-free 562 reader_.store(reader, std::memory_order_release); 563 } 564 565 grpc::internal::Mutex reader_mu_; 566 std::atomic<ServerCallbackReader<Request>*> reader_{nullptr}; 567 struct PreBindBacklog { 568 bool send_initial_metadata_wanted = false; 569 bool finish_wanted = false; 570 Request* read_wanted = nullptr; 571 grpc::Status status_wanted; 572 }; 573 PreBindBacklog backlog_ ABSL_GUARDED_BY(reader_mu_); 574 }; 575 576 /// \a ServerWriteReactor is the interface for a server-streaming RPC. 577 template <class Response> 578 class ServerWriteReactor : public internal::ServerReactor { 579 public: ServerWriteReactor()580 ServerWriteReactor() : writer_(nullptr) {} 581 ~ServerWriteReactor() override = default; 582 583 /// The following operation initiations are exactly like ServerBidiReactor. StartSendInitialMetadata()584 void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(writer_mu_) { 585 ServerCallbackWriter<Response>* writer = 586 writer_.load(std::memory_order_acquire); 587 if (writer == nullptr) { 588 grpc::internal::MutexLock l(&writer_mu_); 589 writer = writer_.load(std::memory_order_relaxed); 590 if (writer == nullptr) { 591 backlog_.send_initial_metadata_wanted = true; 592 return; 593 } 594 } 595 writer->SendInitialMetadata(); 596 } StartWrite(const Response * resp)597 void StartWrite(const Response* resp) { 598 StartWrite(resp, grpc::WriteOptions()); 599 } StartWrite(const Response * resp,grpc::WriteOptions options)600 void StartWrite(const Response* resp, grpc::WriteOptions options) 601 ABSL_LOCKS_EXCLUDED(writer_mu_) { 602 ServerCallbackWriter<Response>* writer = 603 writer_.load(std::memory_order_acquire); 604 if (writer == nullptr) { 605 grpc::internal::MutexLock l(&writer_mu_); 606 writer = writer_.load(std::memory_order_relaxed); 607 if (writer == nullptr) { 608 backlog_.write_wanted = resp; 609 backlog_.write_options_wanted = options; 610 return; 611 } 612 } 613 writer->Write(resp, options); 614 } StartWriteAndFinish(const Response * resp,grpc::WriteOptions options,grpc::Status s)615 void StartWriteAndFinish(const Response* resp, grpc::WriteOptions options, 616 grpc::Status s) ABSL_LOCKS_EXCLUDED(writer_mu_) { 617 ServerCallbackWriter<Response>* writer = 618 writer_.load(std::memory_order_acquire); 619 if (writer == nullptr) { 620 grpc::internal::MutexLock l(&writer_mu_); 621 writer = writer_.load(std::memory_order_relaxed); 622 if (writer == nullptr) { 623 backlog_.write_and_finish_wanted = true; 624 backlog_.write_wanted = resp; 625 backlog_.write_options_wanted = options; 626 backlog_.status_wanted = std::move(s); 627 return; 628 } 629 } 630 writer->WriteAndFinish(resp, options, std::move(s)); 631 } StartWriteLast(const Response * resp,grpc::WriteOptions options)632 void StartWriteLast(const Response* resp, grpc::WriteOptions options) { 633 StartWrite(resp, options.set_last_message()); 634 } Finish(grpc::Status s)635 void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(writer_mu_) { 636 ServerCallbackWriter<Response>* writer = 637 writer_.load(std::memory_order_acquire); 638 if (writer == nullptr) { 639 grpc::internal::MutexLock l(&writer_mu_); 640 writer = writer_.load(std::memory_order_relaxed); 641 if (writer == nullptr) { 642 backlog_.finish_wanted = true; 643 backlog_.status_wanted = std::move(s); 644 return; 645 } 646 } 647 writer->Finish(std::move(s)); 648 } 649 650 /// The following notifications are exactly like ServerBidiReactor. OnSendInitialMetadataDone(bool)651 virtual void OnSendInitialMetadataDone(bool /*ok*/) {} OnWriteDone(bool)652 virtual void OnWriteDone(bool /*ok*/) {} 653 void OnDone() override = 0; OnCancel()654 void OnCancel() override {} 655 656 private: 657 friend class ServerCallbackWriter<Response>; 658 // May be overridden by internal implementation details. This is not a public 659 // customization point. InternalBindWriter(ServerCallbackWriter<Response> * writer)660 virtual void InternalBindWriter(ServerCallbackWriter<Response>* writer) 661 ABSL_LOCKS_EXCLUDED(writer_mu_) { 662 grpc::internal::MutexLock l(&writer_mu_); 663 664 if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) { 665 writer->SendInitialMetadata(); 666 } 667 if (GPR_UNLIKELY(backlog_.write_and_finish_wanted)) { 668 writer->WriteAndFinish(backlog_.write_wanted, 669 std::move(backlog_.write_options_wanted), 670 std::move(backlog_.status_wanted)); 671 } else { 672 if (GPR_UNLIKELY(backlog_.write_wanted != nullptr)) { 673 writer->Write(backlog_.write_wanted, 674 std::move(backlog_.write_options_wanted)); 675 } 676 if (GPR_UNLIKELY(backlog_.finish_wanted)) { 677 writer->Finish(std::move(backlog_.status_wanted)); 678 } 679 } 680 // Set writer_ last so that other functions can use it lock-free 681 writer_.store(writer, std::memory_order_release); 682 } 683 684 grpc::internal::Mutex writer_mu_; 685 std::atomic<ServerCallbackWriter<Response>*> writer_{nullptr}; 686 struct PreBindBacklog { 687 bool send_initial_metadata_wanted = false; 688 bool write_and_finish_wanted = false; 689 bool finish_wanted = false; 690 const Response* write_wanted = nullptr; 691 grpc::WriteOptions write_options_wanted; 692 grpc::Status status_wanted; 693 }; 694 PreBindBacklog backlog_ ABSL_GUARDED_BY(writer_mu_); 695 }; 696 697 class ServerUnaryReactor : public internal::ServerReactor { 698 public: ServerUnaryReactor()699 ServerUnaryReactor() : call_(nullptr) {} 700 ~ServerUnaryReactor() override = default; 701 702 /// StartSendInitialMetadata is exactly like ServerBidiReactor. StartSendInitialMetadata()703 void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(call_mu_) { 704 ServerCallbackUnary* call = call_.load(std::memory_order_acquire); 705 if (call == nullptr) { 706 grpc::internal::MutexLock l(&call_mu_); 707 call = call_.load(std::memory_order_relaxed); 708 if (call == nullptr) { 709 backlog_.send_initial_metadata_wanted = true; 710 return; 711 } 712 } 713 call->SendInitialMetadata(); 714 } 715 /// Finish is similar to ServerBidiReactor except for one detail. 716 /// If the status is non-OK, any message will not be sent. Instead, 717 /// the client will only receive the status and any trailing metadata. Finish(grpc::Status s)718 void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(call_mu_) { 719 ServerCallbackUnary* call = call_.load(std::memory_order_acquire); 720 if (call == nullptr) { 721 grpc::internal::MutexLock l(&call_mu_); 722 call = call_.load(std::memory_order_relaxed); 723 if (call == nullptr) { 724 backlog_.finish_wanted = true; 725 backlog_.status_wanted = std::move(s); 726 return; 727 } 728 } 729 call->Finish(std::move(s)); 730 } 731 732 /// The following notifications are exactly like ServerBidiReactor. OnSendInitialMetadataDone(bool)733 virtual void OnSendInitialMetadataDone(bool /*ok*/) {} 734 void OnDone() override = 0; OnCancel()735 void OnCancel() override {} 736 737 private: 738 friend class ServerCallbackUnary; 739 // May be overridden by internal implementation details. This is not a public 740 // customization point. InternalBindCall(ServerCallbackUnary * call)741 virtual void InternalBindCall(ServerCallbackUnary* call) 742 ABSL_LOCKS_EXCLUDED(call_mu_) { 743 grpc::internal::MutexLock l(&call_mu_); 744 745 if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) { 746 call->SendInitialMetadata(); 747 } 748 if (GPR_UNLIKELY(backlog_.finish_wanted)) { 749 call->Finish(std::move(backlog_.status_wanted)); 750 } 751 // Set call_ last so that other functions can use it lock-free 752 call_.store(call, std::memory_order_release); 753 } 754 755 grpc::internal::Mutex call_mu_; 756 std::atomic<ServerCallbackUnary*> call_{nullptr}; 757 struct PreBindBacklog { 758 bool send_initial_metadata_wanted = false; 759 bool finish_wanted = false; 760 grpc::Status status_wanted; 761 }; 762 PreBindBacklog backlog_ ABSL_GUARDED_BY(call_mu_); 763 }; 764 765 namespace internal { 766 767 template <class Base> 768 class FinishOnlyReactor : public Base { 769 public: FinishOnlyReactor(grpc::Status s)770 explicit FinishOnlyReactor(grpc::Status s) { this->Finish(std::move(s)); } OnDone()771 void OnDone() override { this->~FinishOnlyReactor(); } 772 }; 773 774 using UnimplementedUnaryReactor = FinishOnlyReactor<ServerUnaryReactor>; 775 template <class Request> 776 using UnimplementedReadReactor = FinishOnlyReactor<ServerReadReactor<Request>>; 777 template <class Response> 778 using UnimplementedWriteReactor = 779 FinishOnlyReactor<ServerWriteReactor<Response>>; 780 template <class Request, class Response> 781 using UnimplementedBidiReactor = 782 FinishOnlyReactor<ServerBidiReactor<Request, Response>>; 783 784 } // namespace internal 785 786 // TODO(vjpai): Remove namespace experimental when last known users are migrated 787 // off. 788 namespace experimental { 789 790 template <class Request, class Response> 791 using ServerBidiReactor = ::grpc::ServerBidiReactor<Request, Response>; 792 793 } // namespace experimental 794 795 } // namespace grpc 796 797 #endif // GRPCPP_SUPPORT_SERVER_CALLBACK_H 798