1 /* 2 * 3 * Copyright 2019 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H 19 #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H 20 21 #include <atomic> 22 #include <functional> 23 #include <type_traits> 24 25 #include <grpcpp/impl/codegen/call.h> 26 #include <grpcpp/impl/codegen/call_op_set.h> 27 #include <grpcpp/impl/codegen/callback_common.h> 28 #include <grpcpp/impl/codegen/config.h> 29 #include <grpcpp/impl/codegen/core_codegen_interface.h> 30 #include <grpcpp/impl/codegen/message_allocator.h> 31 #include <grpcpp/impl/codegen/status.h> 32 #include <grpcpp/impl/codegen/sync.h> 33 34 namespace grpc { 35 36 // Declare base class of all reactors as internal 37 namespace internal { 38 39 // Forward declarations 40 template <class Request, class Response> 41 class CallbackUnaryHandler; 42 template <class Request, class Response> 43 class CallbackClientStreamingHandler; 44 template <class Request, class Response> 45 class CallbackServerStreamingHandler; 46 template <class Request, class Response> 47 class CallbackBidiHandler; 48 49 class ServerReactor { 50 public: 51 virtual ~ServerReactor() = default; 52 virtual void OnDone() = 0; 53 virtual void OnCancel() = 0; 54 55 // The following is not API. It is for internal use only and specifies whether 56 // all reactions of this Reactor can be run without an extra executor 57 // scheduling. This should only be used for internally-defined reactors with 58 // trivial reactions. InternalInlineable()59 virtual bool InternalInlineable() { return false; } 60 61 private: 62 template <class Request, class Response> 63 friend class CallbackUnaryHandler; 64 template <class Request, class Response> 65 friend class CallbackClientStreamingHandler; 66 template <class Request, class Response> 67 friend class CallbackServerStreamingHandler; 68 template <class Request, class Response> 69 friend class CallbackBidiHandler; 70 }; 71 72 /// The base class of ServerCallbackUnary etc. 73 class ServerCallbackCall { 74 public: ~ServerCallbackCall()75 virtual ~ServerCallbackCall() {} 76 77 // This object is responsible for tracking when it is safe to call OnDone and 78 // OnCancel. OnDone should not be called until the method handler is complete, 79 // Finish has been called, the ServerContext CompletionOp (which tracks 80 // cancellation or successful completion) has completed, and all outstanding 81 // Read/Write actions have seen their reactions. OnCancel should not be called 82 // until after the method handler is done and the RPC has completed with a 83 // cancellation. This is tracked by counting how many of these conditions have 84 // been met and calling OnCancel when none remain unmet. 85 86 // Public versions of MaybeDone: one where we don't know the reactor in 87 // advance (used for the ServerContext CompletionOp), and one for where we 88 // know the inlineability of the OnDone reaction. You should set the inline 89 // flag to true if either the Reactor is InternalInlineable() or if this 90 // callback is already being forced to run dispatched to an executor 91 // (typically because it contains additional work than just the MaybeDone). 92 MaybeDone()93 void MaybeDone() { 94 if (GPR_UNLIKELY(Unref() == 1)) { 95 ScheduleOnDone(reactor()->InternalInlineable()); 96 } 97 } 98 MaybeDone(bool inline_ondone)99 void MaybeDone(bool inline_ondone) { 100 if (GPR_UNLIKELY(Unref() == 1)) { 101 ScheduleOnDone(inline_ondone); 102 } 103 } 104 105 // Fast version called with known reactor passed in, used from derived 106 // classes, typically in non-cancel case MaybeCallOnCancel(ServerReactor * reactor)107 void MaybeCallOnCancel(ServerReactor* reactor) { 108 if (GPR_UNLIKELY(UnblockCancellation())) { 109 CallOnCancel(reactor); 110 } 111 } 112 113 // Slower version called from object that doesn't know the reactor a priori 114 // (such as the ServerContext CompletionOp which is formed before the 115 // reactor). This is used in cancel cases only, so it's ok to be slower and 116 // invoke a virtual function. MaybeCallOnCancel()117 void MaybeCallOnCancel() { 118 if (GPR_UNLIKELY(UnblockCancellation())) { 119 CallOnCancel(reactor()); 120 } 121 } 122 123 protected: 124 /// Increases the reference count Ref()125 void Ref() { callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); } 126 127 private: 128 virtual ServerReactor* reactor() = 0; 129 130 // CallOnDone performs the work required at completion of the RPC: invoking 131 // the OnDone function and doing all necessary cleanup. This function is only 132 // ever invoked on a fully-Unref'fed ServerCallbackCall. 133 virtual void CallOnDone() = 0; 134 135 // If the OnDone reaction is inlineable, execute it inline. Otherwise send it 136 // to an executor. 137 void ScheduleOnDone(bool inline_ondone); 138 139 // If the OnCancel reaction is inlineable, execute it inline. Otherwise send 140 // it to an executor. 141 void CallOnCancel(ServerReactor* reactor); 142 143 // Implement the cancellation constraint counter. Return true if OnCancel 144 // should be called, false otherwise. UnblockCancellation()145 bool UnblockCancellation() { 146 return on_cancel_conditions_remaining_.fetch_sub( 147 1, std::memory_order_acq_rel) == 1; 148 } 149 150 /// Decreases the reference count and returns the previous value Unref()151 int Unref() { 152 return callbacks_outstanding_.fetch_sub(1, std::memory_order_acq_rel); 153 } 154 155 std::atomic_int on_cancel_conditions_remaining_{2}; 156 std::atomic_int callbacks_outstanding_{ 157 3}; // reserve for start, Finish, and CompletionOp 158 }; 159 160 template <class Request, class Response> 161 class DefaultMessageHolder 162 : public ::grpc::experimental::MessageHolder<Request, Response> { 163 public: DefaultMessageHolder()164 DefaultMessageHolder() { 165 this->set_request(&request_obj_); 166 this->set_response(&response_obj_); 167 } Release()168 void Release() override { 169 // the object is allocated in the call arena. 170 this->~DefaultMessageHolder<Request, Response>(); 171 } 172 173 private: 174 Request request_obj_; 175 Response response_obj_; 176 }; 177 178 } // namespace internal 179 180 // Forward declarations 181 class ServerUnaryReactor; 182 template <class Request> 183 class ServerReadReactor; 184 template <class Response> 185 class ServerWriteReactor; 186 template <class Request, class Response> 187 class ServerBidiReactor; 188 189 // NOTE: The actual call/stream object classes are provided as API only to 190 // support mocking. There are no implementations of these class interfaces in 191 // the API. 192 class ServerCallbackUnary : public internal::ServerCallbackCall { 193 public: ~ServerCallbackUnary()194 ~ServerCallbackUnary() override {} 195 virtual void Finish(::grpc::Status s) = 0; 196 virtual void SendInitialMetadata() = 0; 197 198 protected: 199 // Use a template rather than explicitly specifying ServerUnaryReactor to 200 // delay binding and avoid a circular forward declaration issue 201 template <class Reactor> BindReactor(Reactor * reactor)202 void BindReactor(Reactor* reactor) { 203 reactor->InternalBindCall(this); 204 } 205 }; 206 207 template <class Request> 208 class ServerCallbackReader : public internal::ServerCallbackCall { 209 public: ~ServerCallbackReader()210 ~ServerCallbackReader() override {} 211 virtual void Finish(::grpc::Status s) = 0; 212 virtual void SendInitialMetadata() = 0; 213 virtual void Read(Request* msg) = 0; 214 215 protected: BindReactor(ServerReadReactor<Request> * reactor)216 void BindReactor(ServerReadReactor<Request>* reactor) { 217 reactor->InternalBindReader(this); 218 } 219 }; 220 221 template <class Response> 222 class ServerCallbackWriter : public internal::ServerCallbackCall { 223 public: ~ServerCallbackWriter()224 ~ServerCallbackWriter() override {} 225 226 virtual void Finish(::grpc::Status s) = 0; 227 virtual void SendInitialMetadata() = 0; 228 virtual void Write(const Response* msg, ::grpc::WriteOptions options) = 0; 229 virtual void WriteAndFinish(const Response* msg, ::grpc::WriteOptions options, 230 ::grpc::Status s) = 0; 231 232 protected: BindReactor(ServerWriteReactor<Response> * reactor)233 void BindReactor(ServerWriteReactor<Response>* reactor) { 234 reactor->InternalBindWriter(this); 235 } 236 }; 237 238 template <class Request, class Response> 239 class ServerCallbackReaderWriter : public internal::ServerCallbackCall { 240 public: ~ServerCallbackReaderWriter()241 ~ServerCallbackReaderWriter() override {} 242 243 virtual void Finish(::grpc::Status s) = 0; 244 virtual void SendInitialMetadata() = 0; 245 virtual void Read(Request* msg) = 0; 246 virtual void Write(const Response* msg, ::grpc::WriteOptions options) = 0; 247 virtual void WriteAndFinish(const Response* msg, ::grpc::WriteOptions options, 248 ::grpc::Status s) = 0; 249 250 protected: BindReactor(ServerBidiReactor<Request,Response> * reactor)251 void BindReactor(ServerBidiReactor<Request, Response>* reactor) { 252 reactor->InternalBindStream(this); 253 } 254 }; 255 256 // The following classes are the reactor interfaces that are to be implemented 257 // by the user, returned as the output parameter of the method handler for a 258 // callback method. Note that none of the classes are pure; all reactions have a 259 // default empty reaction so that the user class only needs to override those 260 // reactions that it cares about. The reaction methods will be invoked by the 261 // library in response to the completion of various operations. Reactions must 262 // not include blocking operations (such as blocking I/O, starting synchronous 263 // RPCs, or waiting on condition variables). Reactions may be invoked 264 // concurrently, except that OnDone is called after all others (assuming proper 265 // API usage). The reactor may not be deleted until OnDone is called. 266 267 /// \a ServerBidiReactor is the interface for a bidirectional streaming RPC. 268 template <class Request, class Response> 269 class ServerBidiReactor : public internal::ServerReactor { 270 public: 271 // NOTE: Initializing stream_ as a constructor initializer rather than a 272 // default initializer because gcc-4.x requires a copy constructor for 273 // default initializing a templated member, which isn't ok for atomic. 274 // TODO(vjpai): Switch to default constructor and default initializer when 275 // gcc-4.x is no longer supported ServerBidiReactor()276 ServerBidiReactor() : stream_(nullptr) {} 277 ~ServerBidiReactor() override = default; 278 279 /// Send any initial metadata stored in the RPC context. If not invoked, 280 /// any initial metadata will be passed along with the first Write or the 281 /// Finish (if there are no writes). StartSendInitialMetadata()282 void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(stream_mu_) { 283 ServerCallbackReaderWriter<Request, Response>* stream = 284 stream_.load(std::memory_order_acquire); 285 if (stream == nullptr) { 286 grpc::internal::MutexLock l(&stream_mu_); 287 stream = stream_.load(std::memory_order_relaxed); 288 if (stream == nullptr) { 289 backlog_.send_initial_metadata_wanted = true; 290 return; 291 } 292 } 293 stream->SendInitialMetadata(); 294 } 295 296 /// Initiate a read operation. 297 /// 298 /// \param[out] req Where to eventually store the read message. Valid when 299 /// the library calls OnReadDone StartRead(Request * req)300 void StartRead(Request* req) ABSL_LOCKS_EXCLUDED(stream_mu_) { 301 ServerCallbackReaderWriter<Request, Response>* stream = 302 stream_.load(std::memory_order_acquire); 303 if (stream == nullptr) { 304 grpc::internal::MutexLock l(&stream_mu_); 305 stream = stream_.load(std::memory_order_relaxed); 306 if (stream == nullptr) { 307 backlog_.read_wanted = req; 308 return; 309 } 310 } 311 stream->Read(req); 312 } 313 314 /// Initiate a write operation. 315 /// 316 /// \param[in] resp The message to be written. The library does not take 317 /// ownership but the caller must ensure that the message is 318 /// not deleted or modified until OnWriteDone is called. StartWrite(const Response * resp)319 void StartWrite(const Response* resp) { 320 StartWrite(resp, ::grpc::WriteOptions()); 321 } 322 323 /// Initiate a write operation with specified options. 324 /// 325 /// \param[in] resp The message to be written. The library does not take 326 /// ownership but the caller must ensure that the message is 327 /// not deleted or modified until OnWriteDone is called. 328 /// \param[in] options The WriteOptions to use for writing this message StartWrite(const Response * resp,::grpc::WriteOptions options)329 void StartWrite(const Response* resp, ::grpc::WriteOptions options) 330 ABSL_LOCKS_EXCLUDED(stream_mu_) { 331 ServerCallbackReaderWriter<Request, Response>* stream = 332 stream_.load(std::memory_order_acquire); 333 if (stream == nullptr) { 334 grpc::internal::MutexLock l(&stream_mu_); 335 stream = stream_.load(std::memory_order_relaxed); 336 if (stream == nullptr) { 337 backlog_.write_wanted = resp; 338 backlog_.write_options_wanted = options; 339 return; 340 } 341 } 342 stream->Write(resp, options); 343 } 344 345 /// Initiate a write operation with specified options and final RPC Status, 346 /// which also causes any trailing metadata for this RPC to be sent out. 347 /// StartWriteAndFinish is like merging StartWriteLast and Finish into a 348 /// single step. A key difference, though, is that this operation doesn't have 349 /// an OnWriteDone reaction - it is considered complete only when OnDone is 350 /// available. An RPC can either have StartWriteAndFinish or Finish, but not 351 /// both. 352 /// 353 /// \param[in] resp The message to be written. The library does not take 354 /// ownership but the caller must ensure that the message is 355 /// not deleted or modified until OnDone is called. 356 /// \param[in] options The WriteOptions to use for writing this message 357 /// \param[in] s The status outcome of this RPC StartWriteAndFinish(const Response * resp,::grpc::WriteOptions options,::grpc::Status s)358 void StartWriteAndFinish(const Response* resp, ::grpc::WriteOptions options, 359 ::grpc::Status s) ABSL_LOCKS_EXCLUDED(stream_mu_) { 360 ServerCallbackReaderWriter<Request, Response>* stream = 361 stream_.load(std::memory_order_acquire); 362 if (stream == nullptr) { 363 grpc::internal::MutexLock l(&stream_mu_); 364 stream = stream_.load(std::memory_order_relaxed); 365 if (stream == nullptr) { 366 backlog_.write_and_finish_wanted = true; 367 backlog_.write_wanted = resp; 368 backlog_.write_options_wanted = options; 369 backlog_.status_wanted = std::move(s); 370 return; 371 } 372 } 373 stream->WriteAndFinish(resp, options, std::move(s)); 374 } 375 376 /// Inform system of a planned write operation with specified options, but 377 /// allow the library to schedule the actual write coalesced with the writing 378 /// of trailing metadata (which takes place on a Finish call). 379 /// 380 /// \param[in] resp The message to be written. The library does not take 381 /// ownership but the caller must ensure that the message is 382 /// not deleted or modified until OnWriteDone is called. 383 /// \param[in] options The WriteOptions to use for writing this message StartWriteLast(const Response * resp,::grpc::WriteOptions options)384 void StartWriteLast(const Response* resp, ::grpc::WriteOptions options) { 385 StartWrite(resp, options.set_last_message()); 386 } 387 388 /// Indicate that the stream is to be finished and the trailing metadata and 389 /// RPC status are to be sent. Every RPC MUST be finished using either Finish 390 /// or StartWriteAndFinish (but not both), even if the RPC is already 391 /// cancelled. 392 /// 393 /// \param[in] s The status outcome of this RPC Finish(::grpc::Status s)394 void Finish(::grpc::Status s) ABSL_LOCKS_EXCLUDED(stream_mu_) { 395 ServerCallbackReaderWriter<Request, Response>* stream = 396 stream_.load(std::memory_order_acquire); 397 if (stream == nullptr) { 398 grpc::internal::MutexLock l(&stream_mu_); 399 stream = stream_.load(std::memory_order_relaxed); 400 if (stream == nullptr) { 401 backlog_.finish_wanted = true; 402 backlog_.status_wanted = std::move(s); 403 return; 404 } 405 } 406 stream->Finish(std::move(s)); 407 } 408 409 /// Notifies the application that an explicit StartSendInitialMetadata 410 /// operation completed. Not used when the sending of initial metadata 411 /// piggybacks onto the first write. 412 /// 413 /// \param[in] ok Was it successful? If false, no further write-side operation 414 /// will succeed. OnSendInitialMetadataDone(bool)415 virtual void OnSendInitialMetadataDone(bool /*ok*/) {} 416 417 /// Notifies the application that a StartRead operation completed. 418 /// 419 /// \param[in] ok Was it successful? If false, no further read-side operation 420 /// will succeed. OnReadDone(bool)421 virtual void OnReadDone(bool /*ok*/) {} 422 423 /// Notifies the application that a StartWrite (or StartWriteLast) operation 424 /// completed. 425 /// 426 /// \param[in] ok Was it successful? If false, no further write-side operation 427 /// will succeed. OnWriteDone(bool)428 virtual void OnWriteDone(bool /*ok*/) {} 429 430 /// Notifies the application that all operations associated with this RPC 431 /// have completed. This is an override (from the internal base class) but 432 /// still abstract, so derived classes MUST override it to be instantiated. 433 void OnDone() override = 0; 434 435 /// Notifies the application that this RPC has been cancelled. This is an 436 /// override (from the internal base class) but not final, so derived classes 437 /// should override it if they want to take action. OnCancel()438 void OnCancel() override {} 439 440 private: 441 friend class ServerCallbackReaderWriter<Request, Response>; 442 // May be overridden by internal implementation details. This is not a public 443 // customization point. InternalBindStream(ServerCallbackReaderWriter<Request,Response> * stream)444 virtual void InternalBindStream( 445 ServerCallbackReaderWriter<Request, Response>* stream) { 446 grpc::internal::MutexLock l(&stream_mu_); 447 448 if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) { 449 stream->SendInitialMetadata(); 450 } 451 if (GPR_UNLIKELY(backlog_.read_wanted != nullptr)) { 452 stream->Read(backlog_.read_wanted); 453 } 454 if (GPR_UNLIKELY(backlog_.write_and_finish_wanted)) { 455 stream->WriteAndFinish(backlog_.write_wanted, 456 std::move(backlog_.write_options_wanted), 457 std::move(backlog_.status_wanted)); 458 } else { 459 if (GPR_UNLIKELY(backlog_.write_wanted != nullptr)) { 460 stream->Write(backlog_.write_wanted, 461 std::move(backlog_.write_options_wanted)); 462 } 463 if (GPR_UNLIKELY(backlog_.finish_wanted)) { 464 stream->Finish(std::move(backlog_.status_wanted)); 465 } 466 } 467 // Set stream_ last so that other functions can use it lock-free 468 stream_.store(stream, std::memory_order_release); 469 } 470 471 grpc::internal::Mutex stream_mu_; 472 // TODO(vjpai): Make stream_or_backlog_ into a std::variant or absl::variant 473 // once C++17 or ABSL is supported since stream and backlog are 474 // mutually exclusive in this class. Do likewise with the 475 // remaining reactor classes and their backlogs as well. 476 std::atomic<ServerCallbackReaderWriter<Request, Response>*> stream_{nullptr}; 477 struct PreBindBacklog { 478 bool send_initial_metadata_wanted = false; 479 bool write_and_finish_wanted = false; 480 bool finish_wanted = false; 481 Request* read_wanted = nullptr; 482 const Response* write_wanted = nullptr; 483 ::grpc::WriteOptions write_options_wanted; 484 ::grpc::Status status_wanted; 485 }; 486 PreBindBacklog backlog_ ABSL_GUARDED_BY(stream_mu_); 487 }; 488 489 /// \a ServerReadReactor is the interface for a client-streaming RPC. 490 template <class Request> 491 class ServerReadReactor : public internal::ServerReactor { 492 public: ServerReadReactor()493 ServerReadReactor() : reader_(nullptr) {} 494 ~ServerReadReactor() override = default; 495 496 /// The following operation initiations are exactly like ServerBidiReactor. StartSendInitialMetadata()497 void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(reader_mu_) { 498 ServerCallbackReader<Request>* reader = 499 reader_.load(std::memory_order_acquire); 500 if (reader == nullptr) { 501 grpc::internal::MutexLock l(&reader_mu_); 502 reader = reader_.load(std::memory_order_relaxed); 503 if (reader == nullptr) { 504 backlog_.send_initial_metadata_wanted = true; 505 return; 506 } 507 } 508 reader->SendInitialMetadata(); 509 } StartRead(Request * req)510 void StartRead(Request* req) ABSL_LOCKS_EXCLUDED(reader_mu_) { 511 ServerCallbackReader<Request>* reader = 512 reader_.load(std::memory_order_acquire); 513 if (reader == nullptr) { 514 grpc::internal::MutexLock l(&reader_mu_); 515 reader = reader_.load(std::memory_order_relaxed); 516 if (reader == nullptr) { 517 backlog_.read_wanted = req; 518 return; 519 } 520 } 521 reader->Read(req); 522 } Finish(::grpc::Status s)523 void Finish(::grpc::Status s) ABSL_LOCKS_EXCLUDED(reader_mu_) { 524 ServerCallbackReader<Request>* reader = 525 reader_.load(std::memory_order_acquire); 526 if (reader == nullptr) { 527 grpc::internal::MutexLock l(&reader_mu_); 528 reader = reader_.load(std::memory_order_relaxed); 529 if (reader == nullptr) { 530 backlog_.finish_wanted = true; 531 backlog_.status_wanted = std::move(s); 532 return; 533 } 534 } 535 reader->Finish(std::move(s)); 536 } 537 538 /// The following notifications are exactly like ServerBidiReactor. OnSendInitialMetadataDone(bool)539 virtual void OnSendInitialMetadataDone(bool /*ok*/) {} OnReadDone(bool)540 virtual void OnReadDone(bool /*ok*/) {} 541 void OnDone() override = 0; OnCancel()542 void OnCancel() override {} 543 544 private: 545 friend class ServerCallbackReader<Request>; 546 547 // May be overridden by internal implementation details. This is not a public 548 // customization point. InternalBindReader(ServerCallbackReader<Request> * reader)549 virtual void InternalBindReader(ServerCallbackReader<Request>* reader) 550 ABSL_LOCKS_EXCLUDED(reader_mu_) { 551 grpc::internal::MutexLock l(&reader_mu_); 552 553 if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) { 554 reader->SendInitialMetadata(); 555 } 556 if (GPR_UNLIKELY(backlog_.read_wanted != nullptr)) { 557 reader->Read(backlog_.read_wanted); 558 } 559 if (GPR_UNLIKELY(backlog_.finish_wanted)) { 560 reader->Finish(std::move(backlog_.status_wanted)); 561 } 562 // Set reader_ last so that other functions can use it lock-free 563 reader_.store(reader, std::memory_order_release); 564 } 565 566 grpc::internal::Mutex reader_mu_; 567 std::atomic<ServerCallbackReader<Request>*> reader_{nullptr}; 568 struct PreBindBacklog { 569 bool send_initial_metadata_wanted = false; 570 bool finish_wanted = false; 571 Request* read_wanted = nullptr; 572 ::grpc::Status status_wanted; 573 }; 574 PreBindBacklog backlog_ ABSL_GUARDED_BY(reader_mu_); 575 }; 576 577 /// \a ServerWriteReactor is the interface for a server-streaming RPC. 578 template <class Response> 579 class ServerWriteReactor : public internal::ServerReactor { 580 public: ServerWriteReactor()581 ServerWriteReactor() : writer_(nullptr) {} 582 ~ServerWriteReactor() override = default; 583 584 /// The following operation initiations are exactly like ServerBidiReactor. StartSendInitialMetadata()585 void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(writer_mu_) { 586 ServerCallbackWriter<Response>* writer = 587 writer_.load(std::memory_order_acquire); 588 if (writer == nullptr) { 589 grpc::internal::MutexLock l(&writer_mu_); 590 writer = writer_.load(std::memory_order_relaxed); 591 if (writer == nullptr) { 592 backlog_.send_initial_metadata_wanted = true; 593 return; 594 } 595 } 596 writer->SendInitialMetadata(); 597 } StartWrite(const Response * resp)598 void StartWrite(const Response* resp) { 599 StartWrite(resp, ::grpc::WriteOptions()); 600 } StartWrite(const Response * resp,::grpc::WriteOptions options)601 void StartWrite(const Response* resp, ::grpc::WriteOptions options) 602 ABSL_LOCKS_EXCLUDED(writer_mu_) { 603 ServerCallbackWriter<Response>* writer = 604 writer_.load(std::memory_order_acquire); 605 if (writer == nullptr) { 606 grpc::internal::MutexLock l(&writer_mu_); 607 writer = writer_.load(std::memory_order_relaxed); 608 if (writer == nullptr) { 609 backlog_.write_wanted = resp; 610 backlog_.write_options_wanted = options; 611 return; 612 } 613 } 614 writer->Write(resp, options); 615 } StartWriteAndFinish(const Response * resp,::grpc::WriteOptions options,::grpc::Status s)616 void StartWriteAndFinish(const Response* resp, ::grpc::WriteOptions options, 617 ::grpc::Status s) ABSL_LOCKS_EXCLUDED(writer_mu_) { 618 ServerCallbackWriter<Response>* writer = 619 writer_.load(std::memory_order_acquire); 620 if (writer == nullptr) { 621 grpc::internal::MutexLock l(&writer_mu_); 622 writer = writer_.load(std::memory_order_relaxed); 623 if (writer == nullptr) { 624 backlog_.write_and_finish_wanted = true; 625 backlog_.write_wanted = resp; 626 backlog_.write_options_wanted = options; 627 backlog_.status_wanted = std::move(s); 628 return; 629 } 630 } 631 writer->WriteAndFinish(resp, options, std::move(s)); 632 } StartWriteLast(const Response * resp,::grpc::WriteOptions options)633 void StartWriteLast(const Response* resp, ::grpc::WriteOptions options) { 634 StartWrite(resp, options.set_last_message()); 635 } Finish(::grpc::Status s)636 void Finish(::grpc::Status s) ABSL_LOCKS_EXCLUDED(writer_mu_) { 637 ServerCallbackWriter<Response>* writer = 638 writer_.load(std::memory_order_acquire); 639 if (writer == nullptr) { 640 grpc::internal::MutexLock l(&writer_mu_); 641 writer = writer_.load(std::memory_order_relaxed); 642 if (writer == nullptr) { 643 backlog_.finish_wanted = true; 644 backlog_.status_wanted = std::move(s); 645 return; 646 } 647 } 648 writer->Finish(std::move(s)); 649 } 650 651 /// The following notifications are exactly like ServerBidiReactor. OnSendInitialMetadataDone(bool)652 virtual void OnSendInitialMetadataDone(bool /*ok*/) {} OnWriteDone(bool)653 virtual void OnWriteDone(bool /*ok*/) {} 654 void OnDone() override = 0; OnCancel()655 void OnCancel() override {} 656 657 private: 658 friend class ServerCallbackWriter<Response>; 659 // May be overridden by internal implementation details. This is not a public 660 // customization point. InternalBindWriter(ServerCallbackWriter<Response> * writer)661 virtual void InternalBindWriter(ServerCallbackWriter<Response>* writer) 662 ABSL_LOCKS_EXCLUDED(writer_mu_) { 663 grpc::internal::MutexLock l(&writer_mu_); 664 665 if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) { 666 writer->SendInitialMetadata(); 667 } 668 if (GPR_UNLIKELY(backlog_.write_and_finish_wanted)) { 669 writer->WriteAndFinish(backlog_.write_wanted, 670 std::move(backlog_.write_options_wanted), 671 std::move(backlog_.status_wanted)); 672 } else { 673 if (GPR_UNLIKELY(backlog_.write_wanted != nullptr)) { 674 writer->Write(backlog_.write_wanted, 675 std::move(backlog_.write_options_wanted)); 676 } 677 if (GPR_UNLIKELY(backlog_.finish_wanted)) { 678 writer->Finish(std::move(backlog_.status_wanted)); 679 } 680 } 681 // Set writer_ last so that other functions can use it lock-free 682 writer_.store(writer, std::memory_order_release); 683 } 684 685 grpc::internal::Mutex writer_mu_; 686 std::atomic<ServerCallbackWriter<Response>*> writer_{nullptr}; 687 struct PreBindBacklog { 688 bool send_initial_metadata_wanted = false; 689 bool write_and_finish_wanted = false; 690 bool finish_wanted = false; 691 const Response* write_wanted = nullptr; 692 ::grpc::WriteOptions write_options_wanted; 693 ::grpc::Status status_wanted; 694 }; 695 PreBindBacklog backlog_ ABSL_GUARDED_BY(writer_mu_); 696 }; 697 698 class ServerUnaryReactor : public internal::ServerReactor { 699 public: ServerUnaryReactor()700 ServerUnaryReactor() : call_(nullptr) {} 701 ~ServerUnaryReactor() override = default; 702 703 /// StartSendInitialMetadata is exactly like ServerBidiReactor. StartSendInitialMetadata()704 void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(call_mu_) { 705 ServerCallbackUnary* call = call_.load(std::memory_order_acquire); 706 if (call == nullptr) { 707 grpc::internal::MutexLock l(&call_mu_); 708 call = call_.load(std::memory_order_relaxed); 709 if (call == nullptr) { 710 backlog_.send_initial_metadata_wanted = true; 711 return; 712 } 713 } 714 call->SendInitialMetadata(); 715 } 716 /// Finish is similar to ServerBidiReactor except for one detail. 717 /// If the status is non-OK, any message will not be sent. Instead, 718 /// the client will only receive the status and any trailing metadata. Finish(::grpc::Status s)719 void Finish(::grpc::Status s) ABSL_LOCKS_EXCLUDED(call_mu_) { 720 ServerCallbackUnary* call = call_.load(std::memory_order_acquire); 721 if (call == nullptr) { 722 grpc::internal::MutexLock l(&call_mu_); 723 call = call_.load(std::memory_order_relaxed); 724 if (call == nullptr) { 725 backlog_.finish_wanted = true; 726 backlog_.status_wanted = std::move(s); 727 return; 728 } 729 } 730 call->Finish(std::move(s)); 731 } 732 733 /// The following notifications are exactly like ServerBidiReactor. OnSendInitialMetadataDone(bool)734 virtual void OnSendInitialMetadataDone(bool /*ok*/) {} 735 void OnDone() override = 0; OnCancel()736 void OnCancel() override {} 737 738 private: 739 friend class ServerCallbackUnary; 740 // May be overridden by internal implementation details. This is not a public 741 // customization point. InternalBindCall(ServerCallbackUnary * call)742 virtual void InternalBindCall(ServerCallbackUnary* call) 743 ABSL_LOCKS_EXCLUDED(call_mu_) { 744 grpc::internal::MutexLock l(&call_mu_); 745 746 if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) { 747 call->SendInitialMetadata(); 748 } 749 if (GPR_UNLIKELY(backlog_.finish_wanted)) { 750 call->Finish(std::move(backlog_.status_wanted)); 751 } 752 // Set call_ last so that other functions can use it lock-free 753 call_.store(call, std::memory_order_release); 754 } 755 756 grpc::internal::Mutex call_mu_; 757 std::atomic<ServerCallbackUnary*> call_{nullptr}; 758 struct PreBindBacklog { 759 bool send_initial_metadata_wanted = false; 760 bool finish_wanted = false; 761 ::grpc::Status status_wanted; 762 }; 763 PreBindBacklog backlog_ ABSL_GUARDED_BY(call_mu_); 764 }; 765 766 namespace internal { 767 768 template <class Base> 769 class FinishOnlyReactor : public Base { 770 public: FinishOnlyReactor(::grpc::Status s)771 explicit FinishOnlyReactor(::grpc::Status s) { this->Finish(std::move(s)); } OnDone()772 void OnDone() override { this->~FinishOnlyReactor(); } 773 }; 774 775 using UnimplementedUnaryReactor = FinishOnlyReactor<ServerUnaryReactor>; 776 template <class Request> 777 using UnimplementedReadReactor = FinishOnlyReactor<ServerReadReactor<Request>>; 778 template <class Response> 779 using UnimplementedWriteReactor = 780 FinishOnlyReactor<ServerWriteReactor<Response>>; 781 template <class Request, class Response> 782 using UnimplementedBidiReactor = 783 FinishOnlyReactor<ServerBidiReactor<Request, Response>>; 784 785 } // namespace internal 786 787 // TODO(vjpai): Remove namespace experimental when de-experimentalized fully. 788 namespace experimental { 789 790 template <class Request> 791 using ServerReadReactor = ::grpc::ServerReadReactor<Request>; 792 793 template <class Response> 794 using ServerWriteReactor = ::grpc::ServerWriteReactor<Response>; 795 796 template <class Request, class Response> 797 using ServerBidiReactor = ::grpc::ServerBidiReactor<Request, Response>; 798 799 using ServerUnaryReactor = ::grpc::ServerUnaryReactor; 800 801 } // namespace experimental 802 803 } // namespace grpc 804 805 #endif // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H 806