1 // 2 // Copyright 2015 gRPC authors. 3 // 4 // Licensed under the Apache License, Version 2.0 (the "License"); 5 // you may not use this file except in compliance with the License. 6 // You may obtain a copy of the License at 7 // 8 // http://www.apache.org/licenses/LICENSE-2.0 9 // 10 // Unless required by applicable law or agreed to in writing, software 11 // distributed under the License is distributed on an "AS IS" BASIS, 12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 // See the License for the specific language governing permissions and 14 // limitations under the License. 15 // 16 17 #ifndef GRPC_SRC_CORE_SERVER_SERVER_H 18 #define GRPC_SRC_CORE_SERVER_SERVER_H 19 20 #include <grpc/compression.h> 21 #include <grpc/grpc.h> 22 #include <grpc/passive_listener.h> 23 #include <grpc/slice.h> 24 #include <grpc/support/port_platform.h> 25 #include <grpc/support/time.h> 26 #include <stddef.h> 27 #include <stdint.h> 28 29 #include <algorithm> 30 #include <atomic> 31 #include <functional> 32 #include <list> 33 #include <memory> 34 #include <string> 35 #include <utility> 36 #include <vector> 37 38 #include "absl/base/thread_annotations.h" 39 #include "absl/container/flat_hash_map.h" 40 #include "absl/container/flat_hash_set.h" 41 #include "absl/hash/hash.h" 42 #include "absl/random/random.h" 43 #include "absl/status/statusor.h" 44 #include "absl/strings/string_view.h" 45 #include "absl/types/optional.h" 46 #include "src/core/channelz/channelz.h" 47 #include "src/core/lib/channel/channel_args.h" 48 #include "src/core/lib/channel/channel_fwd.h" 49 #include "src/core/lib/channel/channel_stack.h" 50 #include "src/core/lib/debug/trace.h" 51 #include "src/core/lib/iomgr/call_combiner.h" 52 #include "src/core/lib/iomgr/closure.h" 53 #include "src/core/lib/iomgr/endpoint.h" 54 #include "src/core/lib/iomgr/error.h" 55 #include "src/core/lib/iomgr/iomgr_fwd.h" 56 #include "src/core/lib/iomgr/resolved_address.h" 57 #include "src/core/lib/promise/arena_promise.h" 58 #include "src/core/lib/resource_quota/connection_quota.h" 59 #include "src/core/lib/slice/slice.h" 60 #include "src/core/lib/surface/channel.h" 61 #include "src/core/lib/surface/completion_queue.h" 62 #include "src/core/lib/transport/metadata_batch.h" 63 #include "src/core/lib/transport/transport.h" 64 #include "src/core/server/server_interface.h" 65 #include "src/core/telemetry/call_tracer.h" 66 #include "src/core/util/cpp_impl_of.h" 67 #include "src/core/util/dual_ref_counted.h" 68 #include "src/core/util/orphanable.h" 69 #include "src/core/util/random_early_detection.h" 70 #include "src/core/util/ref_counted_ptr.h" 71 #include "src/core/util/sync.h" 72 #include "src/core/util/time.h" 73 74 #define GRPC_ARG_SERVER_MAX_PENDING_REQUESTS "grpc.server.max_pending_requests" 75 #define GRPC_ARG_SERVER_MAX_PENDING_REQUESTS_HARD_LIMIT \ 76 "grpc.server.max_pending_requests_hard_limit" 77 78 namespace grpc_core { 79 80 class ServerConfigFetcher 81 : public CppImplOf<ServerConfigFetcher, grpc_server_config_fetcher> { 82 public: 83 class ConnectionManager 84 : public grpc_core::DualRefCounted<ConnectionManager> { 85 public: 86 virtual absl::StatusOr<grpc_core::ChannelArgs> 87 UpdateChannelArgsForConnection(const grpc_core::ChannelArgs& args, 88 grpc_endpoint* tcp) = 0; 89 }; 90 91 class WatcherInterface { 92 public: 93 virtual ~WatcherInterface() = default; 94 // UpdateConnectionManager() is invoked by the config fetcher when a new 95 // config is available. Implementations should update the connection manager 96 // and start serving if not already serving. 97 virtual void UpdateConnectionManager( 98 grpc_core::RefCountedPtr<ConnectionManager> manager) = 0; 99 // Implementations should stop serving when this is called. Serving should 100 // only resume when UpdateConfig() is invoked. 101 virtual void StopServing() = 0; 102 }; 103 104 virtual ~ServerConfigFetcher() = default; 105 106 virtual void StartWatch(std::string listening_address, 107 std::unique_ptr<WatcherInterface> watcher) = 0; 108 virtual void CancelWatch(WatcherInterface* watcher) = 0; 109 virtual grpc_pollset_set* interested_parties() = 0; 110 }; 111 112 namespace experimental { 113 class PassiveListenerImpl; 114 } // namespace experimental 115 116 namespace testing { 117 class ServerTestPeer; 118 class ListenerStateTestPeer; 119 } // namespace testing 120 121 class Server : public ServerInterface, 122 public InternallyRefCounted<Server>, 123 public CppImplOf<Server, grpc_server> { 124 public: 125 // Filter vtable. 126 static const grpc_channel_filter kServerTopFilter; 127 128 // Opaque type used for registered methods. 129 struct RegisteredMethod; 130 131 // An object to represent the most relevant characteristics of a 132 // newly-allocated call object when using an AllocatingRequestMatcherBatch. 133 struct BatchCallAllocation { 134 void* tag; 135 grpc_call** call; 136 grpc_metadata_array* initial_metadata; 137 grpc_call_details* details; 138 grpc_completion_queue* cq; 139 }; 140 141 // An object to represent the most relevant characteristics of a 142 // newly-allocated call object when using an 143 // AllocatingRequestMatcherRegistered. 144 struct RegisteredCallAllocation { 145 void* tag; 146 grpc_call** call; 147 grpc_metadata_array* initial_metadata; 148 gpr_timespec* deadline; 149 grpc_byte_buffer** optional_payload; 150 grpc_completion_queue* cq; 151 }; 152 153 class ListenerState; 154 155 /// Interface for listeners. 156 class ListenerInterface : public InternallyRefCounted<ListenerInterface> { 157 public: 158 // State for a connection that is being managed by this listener. 159 // The LogicalConnection interface helps the server keep track of 160 // connections during handshake. If the server uses a config fetcher, the 161 // connection continues to be tracked by the server to drain connections on 162 // a config update. If not, the server stops the tracking after handshake is 163 // done. As such, implementations of `LogicalConnection` should cancel the 164 // handshake on `Orphan` if still in progress, but not close down the 165 // transport. 166 // Implementations are responsible for informing ListenerState about the 167 // following stages of a connection - 168 // 1) Invoke AddLogicalConnection() on accepting a new connection. Do not 169 // invoke if the connection is going to be closed immediately. 170 // 2) Invoke OnHandshakeDone() (irrespective of error) once handshake is 171 // done. No need to invoke if `RemoveLogicalConnection()` has already been 172 // invoked. 173 // 3) Invoke RemoveLogicalConnection() when the connection is closed. Do not 174 // invoke if the connection was never added. 175 // TODO(yashykt): In the case where there is no config fetcher, we remove 176 // the connection from our map and instead use `ChannelData` to keep track 177 // of the connections. This is much cheaper (8 bytes per connection) as 178 // compared to implementations of LogicalConnection which can be more than 179 // 24 bytes based on the chttp2 implementation. This complexity causes 180 // weirdness for our interfaces. Figure out a way to combine these two 181 // tracking systems, without increasing memory utilization. 182 class LogicalConnection : public InternallyRefCounted<LogicalConnection> { 183 public: 184 ~LogicalConnection() override = default; 185 186 // The following two methods are called in the context of a server config 187 // event. 188 virtual void SendGoAway() = 0; 189 virtual void DisconnectImmediately() = 0; 190 }; 191 192 ~ListenerInterface() override = default; 193 194 /// Starts listening. 195 virtual void Start() = 0; 196 197 /// Returns the channelz node for the listen socket, or null if not 198 /// supported. 199 virtual channelz::ListenSocketNode* channelz_listen_socket_node() const = 0; 200 201 virtual void SetServerListenerState( 202 RefCountedPtr<ListenerState> listener_state) = 0; 203 204 virtual const grpc_resolved_address* resolved_address() const = 0; 205 206 /// Sets a closure to be invoked by the listener when its destruction 207 /// is complete. 208 virtual void SetOnDestroyDone(grpc_closure* on_destroy_done) = 0; 209 }; 210 211 // Implements the connection management and config fetching mechanism for 212 // listeners. 213 // Note that an alternative implementation would have been to combine the 214 // ListenerInterface and ListenerState into a single parent class, but 215 // they are being separated to make code simpler to understand. 216 class ListenerState : public RefCounted<ListenerState> { 217 public: 218 explicit ListenerState(RefCountedPtr<Server> server, 219 OrphanablePtr<ListenerInterface> l); 220 221 void Start(); 222 223 void Stop(); 224 listener()225 ListenerInterface* listener() { return listener_.get(); } 226 server()227 Server* server() const { return server_.get(); } 228 229 // Adds a LogicalConnection to the listener and updates the channel args if 230 // needed, and returns ChannelArgs if successful. 231 absl::optional<ChannelArgs> AddLogicalConnection( 232 OrphanablePtr<ListenerInterface::LogicalConnection> connection, 233 const ChannelArgs& args, grpc_endpoint* endpoint) 234 ABSL_LOCKS_EXCLUDED(mu_); 235 236 void OnHandshakeDone(ListenerInterface::LogicalConnection* connection); 237 238 // Removes the logical connection from being tracked. This could happen for 239 // reasons such as the connection being closed, or the connection has been 240 // established (including handshake) and doesn't have a server config 241 // fetcher. 242 void RemoveLogicalConnection( 243 ListenerInterface::LogicalConnection* connection); 244 memory_quota()245 const MemoryQuotaRefPtr& memory_quota() const { return memory_quota_; } 246 connection_quota()247 const ConnectionQuotaRefPtr& connection_quota() const { 248 return connection_quota_; 249 } 250 event_engine()251 grpc_event_engine::experimental::EventEngine* event_engine() const { 252 return event_engine_; 253 } 254 255 private: 256 friend class grpc_core::testing::ListenerStateTestPeer; 257 258 class ConfigFetcherWatcher : public ServerConfigFetcher::WatcherInterface { 259 public: ConfigFetcherWatcher(ListenerState * listener_state)260 explicit ConfigFetcherWatcher(ListenerState* listener_state) 261 : listener_state_(listener_state) {} 262 263 void UpdateConnectionManager( 264 RefCountedPtr<ServerConfigFetcher::ConnectionManager> 265 connection_manager) override; 266 267 void StopServing() override; 268 269 private: 270 // This doesn't need to be ref-counted since we start and stop config 271 // fetching as part of starting and stopping the listener. 272 ListenerState* const listener_state_; 273 }; 274 275 struct ConnectionsToBeDrained { 276 absl::flat_hash_set<OrphanablePtr<ListenerInterface::LogicalConnection>> 277 connections; 278 grpc_core::Timestamp timestamp; 279 }; 280 281 void DrainConnectionsLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); 282 283 void OnDrainGraceTimer(); 284 285 void MaybeStartNewGraceTimerLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); 286 287 void RemoveConnectionsToBeDrainedOnEmptyLocked( 288 std::deque<ConnectionsToBeDrained>::iterator it) 289 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); 290 291 RefCountedPtr<Server> const server_; 292 MemoryQuotaRefPtr const memory_quota_; 293 ConnectionQuotaRefPtr connection_quota_; 294 grpc_event_engine::experimental::EventEngine* const event_engine_; 295 OrphanablePtr<ListenerInterface> listener_; 296 grpc_closure destroy_done_; 297 ConfigFetcherWatcher* config_fetcher_watcher_ = nullptr; 298 Mutex mu_; // We could share this mutex with Listener implementations. It's 299 // a tradeoff between increased memory requirement and more 300 // granular critical regions. 301 RefCountedPtr<ServerConfigFetcher::ConnectionManager> connection_manager_ 302 ABSL_GUARDED_BY(mu_); 303 bool is_serving_ ABSL_GUARDED_BY(mu_) = false; 304 bool started_ ABSL_GUARDED_BY(mu_) = false; 305 absl::flat_hash_set<OrphanablePtr<ListenerInterface::LogicalConnection>> 306 connections_ ABSL_GUARDED_BY(mu_); 307 std::deque<ConnectionsToBeDrained> connections_to_be_drained_list_ 308 ABSL_GUARDED_BY(mu_); 309 grpc_event_engine::experimental::EventEngine::TaskHandle 310 drain_grace_timer_handle_ ABSL_GUARDED_BY(mu_) = 311 grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid; 312 }; 313 314 explicit Server(const ChannelArgs& args); 315 ~Server() override; 316 317 void Orphan() ABSL_LOCKS_EXCLUDED(mu_global_) override; 318 channel_args()319 const ChannelArgs& channel_args() const override { return channel_args_; } channelz_node()320 channelz::ServerNode* channelz_node() const override { 321 return channelz_node_.get(); 322 } 323 324 // Do not call this before Start(). Returns the pollsets. The 325 // vector itself is immutable, but the pollsets inside are mutable. The 326 // result is valid for the lifetime of the server. pollsets()327 const std::vector<grpc_pollset*>& pollsets() const { return pollsets_; } 328 config_fetcher()329 ServerConfigFetcher* config_fetcher() const { return config_fetcher_.get(); } 330 server_call_tracer_factory()331 ServerCallTracerFactory* server_call_tracer_factory() const override { 332 return server_call_tracer_factory_; 333 } 334 set_config_fetcher(std::unique_ptr<ServerConfigFetcher> config_fetcher)335 void set_config_fetcher(std::unique_ptr<ServerConfigFetcher> config_fetcher) { 336 config_fetcher_ = std::move(config_fetcher); 337 } 338 339 bool HasOpenConnections() ABSL_LOCKS_EXCLUDED(mu_global_); 340 341 // Adds a listener to the server. When the server starts, it will call 342 // the listener's Start() method, and when it shuts down, it will orphan 343 // the listener. 344 void AddListener(OrphanablePtr<ListenerInterface> listener); 345 346 // Starts listening for connections. 347 void Start() ABSL_LOCKS_EXCLUDED(mu_global_); 348 349 // Sets up a transport. Creates a channel stack and binds the transport to 350 // the server. Called from the listener when a new connection is accepted. 351 // Takes ownership of a ref on resource_user from the caller. 352 grpc_error_handle SetupTransport( 353 Transport* transport, grpc_pollset* accepting_pollset, 354 const ChannelArgs& args, 355 const RefCountedPtr<channelz::SocketNode>& socket_node) 356 ABSL_LOCKS_EXCLUDED(mu_global_); 357 358 void RegisterCompletionQueue(grpc_completion_queue* cq); 359 360 // Functions to specify that a specific registered method or the unregistered 361 // collection should use a specific allocator for request matching. 362 void SetRegisteredMethodAllocator( 363 grpc_completion_queue* cq, void* method_tag, 364 std::function<RegisteredCallAllocation()> allocator); 365 void SetBatchMethodAllocator(grpc_completion_queue* cq, 366 std::function<BatchCallAllocation()> allocator); 367 368 RegisteredMethod* RegisterMethod( 369 const char* method, const char* host, 370 grpc_server_register_method_payload_handling payload_handling, 371 uint32_t flags); 372 373 grpc_call_error RequestCall(grpc_call** call, grpc_call_details* details, 374 grpc_metadata_array* request_metadata, 375 grpc_completion_queue* cq_bound_to_call, 376 grpc_completion_queue* cq_for_notification, 377 void* tag); 378 379 grpc_call_error RequestRegisteredCall( 380 RegisteredMethod* rm, grpc_call** call, gpr_timespec* deadline, 381 grpc_metadata_array* request_metadata, 382 grpc_byte_buffer** optional_payload, 383 grpc_completion_queue* cq_bound_to_call, 384 grpc_completion_queue* cq_for_notification, void* tag_new); 385 386 void ShutdownAndNotify(grpc_completion_queue* cq, void* tag) 387 ABSL_LOCKS_EXCLUDED(mu_global_, mu_call_); 388 389 void StopListening(); 390 391 void CancelAllCalls() ABSL_LOCKS_EXCLUDED(mu_global_); 392 393 void SendGoaways() ABSL_LOCKS_EXCLUDED(mu_global_, mu_call_); 394 compression_options()395 grpc_compression_options compression_options() const override { 396 return compression_options_; 397 } 398 399 private: 400 // note: the grpc_core::Server redundant namespace qualification is 401 // required for older gcc versions. 402 // TODO(yashykt): eliminate this friend statement as part of your upcoming 403 // server listener refactoring. 404 friend absl::Status(::grpc_server_add_passive_listener)( 405 grpc_core::Server* server, grpc_server_credentials* credentials, 406 std::shared_ptr<grpc_core::experimental::PassiveListenerImpl> 407 passive_listener); 408 409 friend class grpc_core::testing::ServerTestPeer; 410 411 struct RequestedCall; 412 413 class RequestMatcherInterface; 414 class RealRequestMatcher; 415 class AllocatingRequestMatcherBase; 416 class AllocatingRequestMatcherBatch; 417 class AllocatingRequestMatcherRegistered; 418 419 class ChannelData final { 420 public: 421 ChannelData() = default; 422 ~ChannelData(); 423 424 void InitTransport(RefCountedPtr<Server> server, 425 RefCountedPtr<Channel> channel, size_t cq_idx, 426 Transport* transport, intptr_t channelz_socket_uuid); 427 server()428 RefCountedPtr<Server> server() const { return server_; } channel()429 Channel* channel() const { return channel_.get(); } cq_idx()430 size_t cq_idx() const { return cq_idx_; } 431 432 // Filter vtable functions. 433 static grpc_error_handle InitChannelElement( 434 grpc_channel_element* elem, grpc_channel_element_args* args); 435 static void DestroyChannelElement(grpc_channel_element* elem); 436 437 private: 438 class ConnectivityWatcher; 439 440 static void AcceptStream(void* arg, Transport* /*transport*/, 441 const void* transport_server_data); 442 443 void Destroy() ABSL_EXCLUSIVE_LOCKS_REQUIRED(server_->mu_global_); 444 445 static void FinishDestroy(void* arg, grpc_error_handle error); 446 447 RefCountedPtr<Server> server_; 448 RefCountedPtr<Channel> channel_; 449 // The index into Server::cqs_ of the CQ used as a starting point for 450 // where to publish new incoming calls. 451 size_t cq_idx_; 452 absl::optional<std::list<ChannelData*>::iterator> list_position_; 453 grpc_closure finish_destroy_channel_closure_; 454 intptr_t channelz_socket_uuid_; 455 }; 456 457 class CallData { 458 public: 459 enum class CallState { 460 NOT_STARTED, // Waiting for metadata. 461 PENDING, // Initial metadata read, not flow controlled in yet. 462 ACTIVATED, // Flow controlled in, on completion queue. 463 ZOMBIED, // Cancelled before being queued. 464 }; 465 466 CallData(grpc_call_element* elem, const grpc_call_element_args& args, 467 RefCountedPtr<Server> server); 468 ~CallData(); 469 470 // Starts the recv_initial_metadata batch on the call. 471 // Invoked from ChannelData::AcceptStream(). 472 void Start(grpc_call_element* elem); 473 474 void SetState(CallState state); 475 476 // Attempts to move from PENDING to ACTIVATED state. Returns true 477 // on success. 478 bool MaybeActivate(); 479 480 // Publishes an incoming call to the application after it has been 481 // matched. 482 void Publish(size_t cq_idx, RequestedCall* rc); 483 484 void KillZombie(); 485 486 void FailCallCreation(); 487 488 // Filter vtable functions. 489 static grpc_error_handle InitCallElement( 490 grpc_call_element* elem, const grpc_call_element_args* args); 491 static void DestroyCallElement(grpc_call_element* elem, 492 const grpc_call_final_info* /*final_info*/, 493 grpc_closure* /*ignored*/); 494 static void StartTransportStreamOpBatch( 495 grpc_call_element* elem, grpc_transport_stream_op_batch* batch); 496 497 private: 498 // Helper functions for handling calls at the top of the call stack. 499 static void RecvInitialMetadataBatchComplete(void* arg, 500 grpc_error_handle error); 501 void StartNewRpc(grpc_call_element* elem); 502 static void PublishNewRpc(void* arg, grpc_error_handle error); 503 504 // Functions used inside the call stack. 505 void StartTransportStreamOpBatchImpl(grpc_call_element* elem, 506 grpc_transport_stream_op_batch* batch); 507 static void RecvInitialMetadataReady(void* arg, grpc_error_handle error); 508 static void RecvTrailingMetadataReady(void* arg, grpc_error_handle error); 509 510 RefCountedPtr<Server> server_; 511 512 grpc_call* call_; 513 514 std::atomic<CallState> state_{CallState::NOT_STARTED}; 515 516 absl::optional<Slice> path_; 517 absl::optional<Slice> host_; 518 Timestamp deadline_ = Timestamp::InfFuture(); 519 520 grpc_completion_queue* cq_new_ = nullptr; 521 522 RequestMatcherInterface* matcher_ = nullptr; 523 grpc_byte_buffer* payload_ = nullptr; 524 525 grpc_closure kill_zombie_closure_; 526 527 grpc_metadata_array initial_metadata_ = 528 grpc_metadata_array(); // Zero-initialize the C struct. 529 grpc_closure recv_initial_metadata_batch_complete_; 530 531 grpc_metadata_batch* recv_initial_metadata_ = nullptr; 532 grpc_closure recv_initial_metadata_ready_; 533 grpc_closure* original_recv_initial_metadata_ready_; 534 grpc_error_handle recv_initial_metadata_error_; 535 536 bool seen_recv_trailing_metadata_ready_ = false; 537 grpc_closure recv_trailing_metadata_ready_; 538 grpc_closure* original_recv_trailing_metadata_ready_; 539 grpc_error_handle recv_trailing_metadata_error_; 540 541 grpc_closure publish_; 542 543 CallCombiner* call_combiner_; 544 }; 545 546 struct ShutdownTag { ShutdownTagShutdownTag547 ShutdownTag(void* tag_arg, grpc_completion_queue* cq_arg) 548 : tag(tag_arg), cq(cq_arg) {} 549 void* const tag; 550 grpc_completion_queue* const cq; 551 grpc_cq_completion completion; 552 }; 553 554 struct StringViewStringViewPairHash 555 : absl::flat_hash_set< 556 std::pair<absl::string_view, absl::string_view>>::hasher { 557 using is_transparent = void; 558 }; 559 560 struct StringViewStringViewPairEq 561 : std::equal_to<std::pair<absl::string_view, absl::string_view>> { 562 using is_transparent = void; 563 }; 564 565 class TransportConnectivityWatcher; 566 567 RegisteredMethod* GetRegisteredMethod(const absl::string_view& host, 568 const absl::string_view& path); 569 void SetRegisteredMethodOnMetadata(ClientMetadata& metadata); 570 571 static void ListenerDestroyDone(void* arg, grpc_error_handle error); 572 DoneShutdownEvent(void * server,grpc_cq_completion *)573 static void DoneShutdownEvent(void* server, 574 grpc_cq_completion* /*completion*/) { 575 static_cast<Server*>(server)->Unref(); 576 } 577 578 static void DoneRequestEvent(void* req, grpc_cq_completion* completion); 579 580 void FailCall(size_t cq_idx, RequestedCall* rc, grpc_error_handle error); 581 grpc_call_error QueueRequestedCall(size_t cq_idx, RequestedCall* rc); 582 583 void MaybeFinishShutdown() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_global_) 584 ABSL_LOCKS_EXCLUDED(mu_call_); 585 586 void KillPendingWorkLocked(grpc_error_handle error) 587 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_call_); 588 589 static grpc_call_error ValidateServerRequest( 590 grpc_completion_queue* cq_for_notification, void* tag, 591 grpc_byte_buffer** optional_payload, RegisteredMethod* rm); 592 grpc_call_error ValidateServerRequestAndCq( 593 size_t* cq_idx, grpc_completion_queue* cq_for_notification, void* tag, 594 grpc_byte_buffer** optional_payload, RegisteredMethod* rm); 595 596 std::vector<RefCountedPtr<Channel>> GetChannelsLocked() const; 597 598 // Take a shutdown ref for a request (increment by 2) and return if shutdown 599 // has not been called. ShutdownRefOnRequest()600 bool ShutdownRefOnRequest() { 601 int old_value = shutdown_refs_.fetch_add(2, std::memory_order_acq_rel); 602 return (old_value & 1) != 0; 603 } 604 605 // Decrement the shutdown ref counter by either 1 (for shutdown call) or 2 606 // (for in-flight request) and possibly call MaybeFinishShutdown if 607 // appropriate. ShutdownUnrefOnRequest()608 void ShutdownUnrefOnRequest() ABSL_LOCKS_EXCLUDED(mu_global_) { 609 if (shutdown_refs_.fetch_sub(2, std::memory_order_acq_rel) == 2) { 610 MutexLock lock(&mu_global_); 611 MaybeFinishShutdown(); 612 } 613 } ShutdownUnrefOnShutdownCall()614 void ShutdownUnrefOnShutdownCall() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_global_) { 615 if (shutdown_refs_.fetch_sub(1, std::memory_order_acq_rel) == 1) { 616 // There is no request in-flight. 617 MaybeFinishShutdown(); 618 } 619 } 620 ShutdownCalled()621 bool ShutdownCalled() const { 622 return (shutdown_refs_.load(std::memory_order_acquire) & 1) == 0; 623 } 624 625 // Returns whether there are no more shutdown refs, which means that shutdown 626 // has been called and all accepted requests have been published if using an 627 // AllocatingRequestMatcher. ShutdownReady()628 bool ShutdownReady() const { 629 return shutdown_refs_.load(std::memory_order_acquire) == 0; 630 } 631 632 auto MatchAndPublishCall(CallHandler call_handler); 633 absl::StatusOr<RefCountedPtr<UnstartedCallDestination>> MakeCallDestination( 634 const ChannelArgs& args); 635 636 ChannelArgs const channel_args_; 637 RefCountedPtr<channelz::ServerNode> channelz_node_; 638 std::unique_ptr<ServerConfigFetcher> config_fetcher_; 639 ServerCallTracerFactory* const server_call_tracer_factory_; 640 641 std::vector<grpc_completion_queue*> cqs_; 642 std::vector<grpc_pollset*> pollsets_; 643 bool started_ = false; 644 const grpc_compression_options compression_options_; 645 646 // The two following mutexes control access to server-state. 647 // mu_global_ controls access to non-call-related state (e.g., channel state). 648 // mu_call_ controls access to call-related state (e.g., the call lists). 649 // 650 // If they are ever required to be nested, you must lock mu_global_ 651 // before mu_call_. This is currently used in shutdown processing 652 // (ShutdownAndNotify() and MaybeFinishShutdown()). 653 Mutex mu_global_; // mutex for server and channel state 654 Mutex mu_call_; // mutex for call-specific state 655 656 // startup synchronization: flag, signals whether we are doing the listener 657 // start routine or not. 658 bool starting_ ABSL_GUARDED_BY(mu_global_) = false; 659 CondVar starting_cv_; 660 661 // Map of registered methods. 662 absl::flat_hash_map<std::pair<std::string, std::string> /*host, method*/, 663 std::unique_ptr<RegisteredMethod>, 664 StringViewStringViewPairHash, StringViewStringViewPairEq> 665 registered_methods_; 666 667 // Request matcher for unregistered methods. 668 std::unique_ptr<RequestMatcherInterface> unregistered_request_matcher_; 669 670 // The shutdown refs counter tracks whether or not shutdown has been called 671 // and whether there are any AllocatingRequestMatcher requests that have been 672 // accepted but not yet started (+2 on each one). If shutdown has been called, 673 // the lowest bit will be 0 (defaults to 1) and the counter will be even. The 674 // server should not notify on shutdown until the counter is 0 (shutdown is 675 // called and there are no requests that are accepted but not started). 676 std::atomic<int> shutdown_refs_{1}; 677 bool shutdown_published_ ABSL_GUARDED_BY(mu_global_) = false; 678 std::vector<ShutdownTag> shutdown_tags_ ABSL_GUARDED_BY(mu_global_); 679 ABSL_GUARDED_BY(mu_call_)680 RandomEarlyDetection pending_backlog_protector_ ABSL_GUARDED_BY(mu_call_){ 681 static_cast<uint64_t>( 682 std::max(0, channel_args_.GetInt(GRPC_ARG_SERVER_MAX_PENDING_REQUESTS) 683 .value_or(1000))), 684 static_cast<uint64_t>(std::max( 685 0, 686 channel_args_.GetInt(GRPC_ARG_SERVER_MAX_PENDING_REQUESTS_HARD_LIMIT) 687 .value_or(3000)))}; 688 const Duration max_time_in_pending_queue_; 689 absl::BitGen bitgen_ ABSL_GUARDED_BY(mu_call_); 690 691 std::list<ChannelData*> channels_; 692 absl::flat_hash_set<OrphanablePtr<ServerTransport>> connections_ 693 ABSL_GUARDED_BY(mu_global_); 694 RefCountedPtr<ServerConfigFetcher::ConnectionManager> connection_manager_ 695 ABSL_GUARDED_BY(mu_global_); 696 size_t connections_open_ ABSL_GUARDED_BY(mu_global_) = 0; 697 698 std::list<RefCountedPtr<ListenerState>> listener_states_; 699 size_t listeners_destroyed_ = 0; 700 701 // The last time we printed a shutdown progress message. 702 gpr_timespec last_shutdown_message_time_; 703 }; 704 705 } // namespace grpc_core 706 707 #endif // GRPC_SRC_CORE_SERVER_SERVER_H 708