//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
16 
17 #ifndef GRPC_SRC_CORE_SERVER_SERVER_H
18 #define GRPC_SRC_CORE_SERVER_SERVER_H
19 
#include <grpc/compression.h>
#include <grpc/grpc.h>
#include <grpc/passive_listener.h>
#include <grpc/slice.h>
#include <grpc/support/port_platform.h>
#include <grpc/support/time.h>
#include <stddef.h>
#include <stdint.h>

#include <algorithm>
#include <atomic>
#include <deque>
#include <functional>
#include <list>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "absl/base/thread_annotations.h"
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "absl/hash/hash.h"
#include "absl/random/random.h"
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "src/core/channelz/channelz.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_fwd.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/iomgr/resolved_address.h"
#include "src/core/lib/promise/arena_promise.h"
#include "src/core/lib/resource_quota/connection_quota.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/completion_queue.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
#include "src/core/server/server_interface.h"
#include "src/core/telemetry/call_tracer.h"
#include "src/core/util/cpp_impl_of.h"
#include "src/core/util/dual_ref_counted.h"
#include "src/core/util/orphanable.h"
#include "src/core/util/random_early_detection.h"
#include "src/core/util/ref_counted_ptr.h"
#include "src/core/util/sync.h"
#include "src/core/util/time.h"
73 
// Channel-arg keys read by Server when constructing
// pending_backlog_protector_. Each is fetched with ChannelArgs::GetInt();
// when a key is absent the Server falls back to 1000 and 3000 respectively,
// and negative values are clamped to 0.
#define GRPC_ARG_SERVER_MAX_PENDING_REQUESTS "grpc.server.max_pending_requests"
#define GRPC_ARG_SERVER_MAX_PENDING_REQUESTS_HARD_LIMIT \
  "grpc.server.max_pending_requests_hard_limit"
77 
78 namespace grpc_core {
79 
80 class ServerConfigFetcher
81     : public CppImplOf<ServerConfigFetcher, grpc_server_config_fetcher> {
82  public:
83   class ConnectionManager
84       : public grpc_core::DualRefCounted<ConnectionManager> {
85    public:
86     virtual absl::StatusOr<grpc_core::ChannelArgs>
87     UpdateChannelArgsForConnection(const grpc_core::ChannelArgs& args,
88                                    grpc_endpoint* tcp) = 0;
89   };
90 
91   class WatcherInterface {
92    public:
93     virtual ~WatcherInterface() = default;
94     // UpdateConnectionManager() is invoked by the config fetcher when a new
95     // config is available. Implementations should update the connection manager
96     // and start serving if not already serving.
97     virtual void UpdateConnectionManager(
98         grpc_core::RefCountedPtr<ConnectionManager> manager) = 0;
99     // Implementations should stop serving when this is called. Serving should
100     // only resume when UpdateConfig() is invoked.
101     virtual void StopServing() = 0;
102   };
103 
104   virtual ~ServerConfigFetcher() = default;
105 
106   virtual void StartWatch(std::string listening_address,
107                           std::unique_ptr<WatcherInterface> watcher) = 0;
108   virtual void CancelWatch(WatcherInterface* watcher) = 0;
109   virtual grpc_pollset_set* interested_parties() = 0;
110 };
111 
// Forward declaration; referenced by the friend declaration of
// grpc_server_add_passive_listener() inside Server.
namespace experimental {
class PassiveListenerImpl;
}  // namespace experimental
115 
// Forward declarations of the test peers that Server and
// Server::ListenerState befriend.
namespace testing {
class ServerTestPeer;
class ListenerStateTestPeer;
}  // namespace testing
120 
121 class Server : public ServerInterface,
122                public InternallyRefCounted<Server>,
123                public CppImplOf<Server, grpc_server> {
124  public:
125   // Filter vtable.
126   static const grpc_channel_filter kServerTopFilter;
127 
128   // Opaque type used for registered methods.
129   struct RegisteredMethod;
130 
131   // An object to represent the most relevant characteristics of a
132   // newly-allocated call object when using an AllocatingRequestMatcherBatch.
133   struct BatchCallAllocation {
134     void* tag;
135     grpc_call** call;
136     grpc_metadata_array* initial_metadata;
137     grpc_call_details* details;
138     grpc_completion_queue* cq;
139   };
140 
141   // An object to represent the most relevant characteristics of a
142   // newly-allocated call object when using an
143   // AllocatingRequestMatcherRegistered.
144   struct RegisteredCallAllocation {
145     void* tag;
146     grpc_call** call;
147     grpc_metadata_array* initial_metadata;
148     gpr_timespec* deadline;
149     grpc_byte_buffer** optional_payload;
150     grpc_completion_queue* cq;
151   };
152 
153   class ListenerState;
154 
155   /// Interface for listeners.
156   class ListenerInterface : public InternallyRefCounted<ListenerInterface> {
157    public:
158     // State for a connection that is being managed by this listener.
159     // The LogicalConnection interface helps the server keep track of
160     // connections during handshake. If the server uses a config fetcher, the
161     // connection continues to be tracked by the server to drain connections on
162     // a config update. If not, the server stops the tracking after handshake is
163     // done. As such, implementations of `LogicalConnection` should cancel the
164     // handshake on `Orphan` if still in progress, but not close down the
165     // transport.
166     // Implementations are responsible for informing ListenerState about the
167     // following stages of a connection -
168     // 1) Invoke AddLogicalConnection() on accepting a new connection. Do not
169     // invoke if the connection is going to be closed immediately.
170     // 2) Invoke OnHandshakeDone() (irrespective of error) once handshake is
171     // done. No need to invoke if `RemoveLogicalConnection()` has already been
172     // invoked.
173     // 3) Invoke RemoveLogicalConnection() when the connection is closed. Do not
174     // invoke if the connection was never added.
175     // TODO(yashykt): In the case where there is no config fetcher, we remove
176     // the connection from our map and instead use `ChannelData` to keep track
177     // of the connections. This is much cheaper (8 bytes per connection) as
178     // compared to implementations of LogicalConnection which can be more than
179     // 24 bytes based on the chttp2 implementation. This complexity causes
180     // weirdness for our interfaces. Figure out a way to combine these two
181     // tracking systems, without increasing memory utilization.
182     class LogicalConnection : public InternallyRefCounted<LogicalConnection> {
183      public:
184       ~LogicalConnection() override = default;
185 
186       // The following two methods are called in the context of a server config
187       // event.
188       virtual void SendGoAway() = 0;
189       virtual void DisconnectImmediately() = 0;
190     };
191 
192     ~ListenerInterface() override = default;
193 
194     /// Starts listening.
195     virtual void Start() = 0;
196 
197     /// Returns the channelz node for the listen socket, or null if not
198     /// supported.
199     virtual channelz::ListenSocketNode* channelz_listen_socket_node() const = 0;
200 
201     virtual void SetServerListenerState(
202         RefCountedPtr<ListenerState> listener_state) = 0;
203 
204     virtual const grpc_resolved_address* resolved_address() const = 0;
205 
206     /// Sets a closure to be invoked by the listener when its destruction
207     /// is complete.
208     virtual void SetOnDestroyDone(grpc_closure* on_destroy_done) = 0;
209   };
210 
211   // Implements the connection management and config fetching mechanism for
212   // listeners.
213   // Note that an alternative implementation would have been to combine the
214   // ListenerInterface and ListenerState into a single parent class, but
215   // they are being separated to make code simpler to understand.
216   class ListenerState : public RefCounted<ListenerState> {
217    public:
218     explicit ListenerState(RefCountedPtr<Server> server,
219                            OrphanablePtr<ListenerInterface> l);
220 
221     void Start();
222 
223     void Stop();
224 
listener()225     ListenerInterface* listener() { return listener_.get(); }
226 
server()227     Server* server() const { return server_.get(); }
228 
229     // Adds a LogicalConnection to the listener and updates the channel args if
230     // needed, and returns ChannelArgs if successful.
231     absl::optional<ChannelArgs> AddLogicalConnection(
232         OrphanablePtr<ListenerInterface::LogicalConnection> connection,
233         const ChannelArgs& args, grpc_endpoint* endpoint)
234         ABSL_LOCKS_EXCLUDED(mu_);
235 
236     void OnHandshakeDone(ListenerInterface::LogicalConnection* connection);
237 
238     // Removes the logical connection from being tracked. This could happen for
239     // reasons such as the connection being closed, or the connection has been
240     // established (including handshake) and doesn't have a server config
241     // fetcher.
242     void RemoveLogicalConnection(
243         ListenerInterface::LogicalConnection* connection);
244 
memory_quota()245     const MemoryQuotaRefPtr& memory_quota() const { return memory_quota_; }
246 
connection_quota()247     const ConnectionQuotaRefPtr& connection_quota() const {
248       return connection_quota_;
249     }
250 
event_engine()251     grpc_event_engine::experimental::EventEngine* event_engine() const {
252       return event_engine_;
253     }
254 
255    private:
256     friend class grpc_core::testing::ListenerStateTestPeer;
257 
258     class ConfigFetcherWatcher : public ServerConfigFetcher::WatcherInterface {
259      public:
ConfigFetcherWatcher(ListenerState * listener_state)260       explicit ConfigFetcherWatcher(ListenerState* listener_state)
261           : listener_state_(listener_state) {}
262 
263       void UpdateConnectionManager(
264           RefCountedPtr<ServerConfigFetcher::ConnectionManager>
265               connection_manager) override;
266 
267       void StopServing() override;
268 
269      private:
270       // This doesn't need to be ref-counted since we start and stop config
271       // fetching as part of starting and stopping the listener.
272       ListenerState* const listener_state_;
273     };
274 
275     struct ConnectionsToBeDrained {
276       absl::flat_hash_set<OrphanablePtr<ListenerInterface::LogicalConnection>>
277           connections;
278       grpc_core::Timestamp timestamp;
279     };
280 
281     void DrainConnectionsLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
282 
283     void OnDrainGraceTimer();
284 
285     void MaybeStartNewGraceTimerLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
286 
287     void RemoveConnectionsToBeDrainedOnEmptyLocked(
288         std::deque<ConnectionsToBeDrained>::iterator it)
289         ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
290 
291     RefCountedPtr<Server> const server_;
292     MemoryQuotaRefPtr const memory_quota_;
293     ConnectionQuotaRefPtr connection_quota_;
294     grpc_event_engine::experimental::EventEngine* const event_engine_;
295     OrphanablePtr<ListenerInterface> listener_;
296     grpc_closure destroy_done_;
297     ConfigFetcherWatcher* config_fetcher_watcher_ = nullptr;
298     Mutex mu_;  // We could share this mutex with Listener implementations. It's
299                 // a tradeoff between increased memory requirement and more
300                 // granular critical regions.
301     RefCountedPtr<ServerConfigFetcher::ConnectionManager> connection_manager_
302         ABSL_GUARDED_BY(mu_);
303     bool is_serving_ ABSL_GUARDED_BY(mu_) = false;
304     bool started_ ABSL_GUARDED_BY(mu_) = false;
305     absl::flat_hash_set<OrphanablePtr<ListenerInterface::LogicalConnection>>
306         connections_ ABSL_GUARDED_BY(mu_);
307     std::deque<ConnectionsToBeDrained> connections_to_be_drained_list_
308         ABSL_GUARDED_BY(mu_);
309     grpc_event_engine::experimental::EventEngine::TaskHandle
310         drain_grace_timer_handle_ ABSL_GUARDED_BY(mu_) =
311             grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid;
312   };
313 
314   explicit Server(const ChannelArgs& args);
315   ~Server() override;
316 
317   void Orphan() ABSL_LOCKS_EXCLUDED(mu_global_) override;
318 
channel_args()319   const ChannelArgs& channel_args() const override { return channel_args_; }
channelz_node()320   channelz::ServerNode* channelz_node() const override {
321     return channelz_node_.get();
322   }
323 
324   // Do not call this before Start(). Returns the pollsets. The
325   // vector itself is immutable, but the pollsets inside are mutable. The
326   // result is valid for the lifetime of the server.
pollsets()327   const std::vector<grpc_pollset*>& pollsets() const { return pollsets_; }
328 
config_fetcher()329   ServerConfigFetcher* config_fetcher() const { return config_fetcher_.get(); }
330 
server_call_tracer_factory()331   ServerCallTracerFactory* server_call_tracer_factory() const override {
332     return server_call_tracer_factory_;
333   }
334 
set_config_fetcher(std::unique_ptr<ServerConfigFetcher> config_fetcher)335   void set_config_fetcher(std::unique_ptr<ServerConfigFetcher> config_fetcher) {
336     config_fetcher_ = std::move(config_fetcher);
337   }
338 
339   bool HasOpenConnections() ABSL_LOCKS_EXCLUDED(mu_global_);
340 
341   // Adds a listener to the server.  When the server starts, it will call
342   // the listener's Start() method, and when it shuts down, it will orphan
343   // the listener.
344   void AddListener(OrphanablePtr<ListenerInterface> listener);
345 
346   // Starts listening for connections.
347   void Start() ABSL_LOCKS_EXCLUDED(mu_global_);
348 
349   // Sets up a transport.  Creates a channel stack and binds the transport to
350   // the server.  Called from the listener when a new connection is accepted.
351   // Takes ownership of a ref on resource_user from the caller.
352   grpc_error_handle SetupTransport(
353       Transport* transport, grpc_pollset* accepting_pollset,
354       const ChannelArgs& args,
355       const RefCountedPtr<channelz::SocketNode>& socket_node)
356       ABSL_LOCKS_EXCLUDED(mu_global_);
357 
358   void RegisterCompletionQueue(grpc_completion_queue* cq);
359 
360   // Functions to specify that a specific registered method or the unregistered
361   // collection should use a specific allocator for request matching.
362   void SetRegisteredMethodAllocator(
363       grpc_completion_queue* cq, void* method_tag,
364       std::function<RegisteredCallAllocation()> allocator);
365   void SetBatchMethodAllocator(grpc_completion_queue* cq,
366                                std::function<BatchCallAllocation()> allocator);
367 
368   RegisteredMethod* RegisterMethod(
369       const char* method, const char* host,
370       grpc_server_register_method_payload_handling payload_handling,
371       uint32_t flags);
372 
373   grpc_call_error RequestCall(grpc_call** call, grpc_call_details* details,
374                               grpc_metadata_array* request_metadata,
375                               grpc_completion_queue* cq_bound_to_call,
376                               grpc_completion_queue* cq_for_notification,
377                               void* tag);
378 
379   grpc_call_error RequestRegisteredCall(
380       RegisteredMethod* rm, grpc_call** call, gpr_timespec* deadline,
381       grpc_metadata_array* request_metadata,
382       grpc_byte_buffer** optional_payload,
383       grpc_completion_queue* cq_bound_to_call,
384       grpc_completion_queue* cq_for_notification, void* tag_new);
385 
386   void ShutdownAndNotify(grpc_completion_queue* cq, void* tag)
387       ABSL_LOCKS_EXCLUDED(mu_global_, mu_call_);
388 
389   void StopListening();
390 
391   void CancelAllCalls() ABSL_LOCKS_EXCLUDED(mu_global_);
392 
393   void SendGoaways() ABSL_LOCKS_EXCLUDED(mu_global_, mu_call_);
394 
compression_options()395   grpc_compression_options compression_options() const override {
396     return compression_options_;
397   }
398 
399  private:
400   // note: the grpc_core::Server redundant namespace qualification is
401   // required for older gcc versions.
402   // TODO(yashykt): eliminate this friend statement as part of your upcoming
403   // server listener refactoring.
404   friend absl::Status(::grpc_server_add_passive_listener)(
405       grpc_core::Server* server, grpc_server_credentials* credentials,
406       std::shared_ptr<grpc_core::experimental::PassiveListenerImpl>
407           passive_listener);
408 
409   friend class grpc_core::testing::ServerTestPeer;
410 
411   struct RequestedCall;
412 
413   class RequestMatcherInterface;
414   class RealRequestMatcher;
415   class AllocatingRequestMatcherBase;
416   class AllocatingRequestMatcherBatch;
417   class AllocatingRequestMatcherRegistered;
418 
419   class ChannelData final {
420    public:
421     ChannelData() = default;
422     ~ChannelData();
423 
424     void InitTransport(RefCountedPtr<Server> server,
425                        RefCountedPtr<Channel> channel, size_t cq_idx,
426                        Transport* transport, intptr_t channelz_socket_uuid);
427 
server()428     RefCountedPtr<Server> server() const { return server_; }
channel()429     Channel* channel() const { return channel_.get(); }
cq_idx()430     size_t cq_idx() const { return cq_idx_; }
431 
432     // Filter vtable functions.
433     static grpc_error_handle InitChannelElement(
434         grpc_channel_element* elem, grpc_channel_element_args* args);
435     static void DestroyChannelElement(grpc_channel_element* elem);
436 
437    private:
438     class ConnectivityWatcher;
439 
440     static void AcceptStream(void* arg, Transport* /*transport*/,
441                              const void* transport_server_data);
442 
443     void Destroy() ABSL_EXCLUSIVE_LOCKS_REQUIRED(server_->mu_global_);
444 
445     static void FinishDestroy(void* arg, grpc_error_handle error);
446 
447     RefCountedPtr<Server> server_;
448     RefCountedPtr<Channel> channel_;
449     // The index into Server::cqs_ of the CQ used as a starting point for
450     // where to publish new incoming calls.
451     size_t cq_idx_;
452     absl::optional<std::list<ChannelData*>::iterator> list_position_;
453     grpc_closure finish_destroy_channel_closure_;
454     intptr_t channelz_socket_uuid_;
455   };
456 
457   class CallData {
458    public:
459     enum class CallState {
460       NOT_STARTED,  // Waiting for metadata.
461       PENDING,      // Initial metadata read, not flow controlled in yet.
462       ACTIVATED,    // Flow controlled in, on completion queue.
463       ZOMBIED,      // Cancelled before being queued.
464     };
465 
466     CallData(grpc_call_element* elem, const grpc_call_element_args& args,
467              RefCountedPtr<Server> server);
468     ~CallData();
469 
470     // Starts the recv_initial_metadata batch on the call.
471     // Invoked from ChannelData::AcceptStream().
472     void Start(grpc_call_element* elem);
473 
474     void SetState(CallState state);
475 
476     // Attempts to move from PENDING to ACTIVATED state.  Returns true
477     // on success.
478     bool MaybeActivate();
479 
480     // Publishes an incoming call to the application after it has been
481     // matched.
482     void Publish(size_t cq_idx, RequestedCall* rc);
483 
484     void KillZombie();
485 
486     void FailCallCreation();
487 
488     // Filter vtable functions.
489     static grpc_error_handle InitCallElement(
490         grpc_call_element* elem, const grpc_call_element_args* args);
491     static void DestroyCallElement(grpc_call_element* elem,
492                                    const grpc_call_final_info* /*final_info*/,
493                                    grpc_closure* /*ignored*/);
494     static void StartTransportStreamOpBatch(
495         grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
496 
497    private:
498     // Helper functions for handling calls at the top of the call stack.
499     static void RecvInitialMetadataBatchComplete(void* arg,
500                                                  grpc_error_handle error);
501     void StartNewRpc(grpc_call_element* elem);
502     static void PublishNewRpc(void* arg, grpc_error_handle error);
503 
504     // Functions used inside the call stack.
505     void StartTransportStreamOpBatchImpl(grpc_call_element* elem,
506                                          grpc_transport_stream_op_batch* batch);
507     static void RecvInitialMetadataReady(void* arg, grpc_error_handle error);
508     static void RecvTrailingMetadataReady(void* arg, grpc_error_handle error);
509 
510     RefCountedPtr<Server> server_;
511 
512     grpc_call* call_;
513 
514     std::atomic<CallState> state_{CallState::NOT_STARTED};
515 
516     absl::optional<Slice> path_;
517     absl::optional<Slice> host_;
518     Timestamp deadline_ = Timestamp::InfFuture();
519 
520     grpc_completion_queue* cq_new_ = nullptr;
521 
522     RequestMatcherInterface* matcher_ = nullptr;
523     grpc_byte_buffer* payload_ = nullptr;
524 
525     grpc_closure kill_zombie_closure_;
526 
527     grpc_metadata_array initial_metadata_ =
528         grpc_metadata_array();  // Zero-initialize the C struct.
529     grpc_closure recv_initial_metadata_batch_complete_;
530 
531     grpc_metadata_batch* recv_initial_metadata_ = nullptr;
532     grpc_closure recv_initial_metadata_ready_;
533     grpc_closure* original_recv_initial_metadata_ready_;
534     grpc_error_handle recv_initial_metadata_error_;
535 
536     bool seen_recv_trailing_metadata_ready_ = false;
537     grpc_closure recv_trailing_metadata_ready_;
538     grpc_closure* original_recv_trailing_metadata_ready_;
539     grpc_error_handle recv_trailing_metadata_error_;
540 
541     grpc_closure publish_;
542 
543     CallCombiner* call_combiner_;
544   };
545 
546   struct ShutdownTag {
ShutdownTagShutdownTag547     ShutdownTag(void* tag_arg, grpc_completion_queue* cq_arg)
548         : tag(tag_arg), cq(cq_arg) {}
549     void* const tag;
550     grpc_completion_queue* const cq;
551     grpc_cq_completion completion;
552   };
553 
554   struct StringViewStringViewPairHash
555       : absl::flat_hash_set<
556             std::pair<absl::string_view, absl::string_view>>::hasher {
557     using is_transparent = void;
558   };
559 
560   struct StringViewStringViewPairEq
561       : std::equal_to<std::pair<absl::string_view, absl::string_view>> {
562     using is_transparent = void;
563   };
564 
565   class TransportConnectivityWatcher;
566 
567   RegisteredMethod* GetRegisteredMethod(const absl::string_view& host,
568                                         const absl::string_view& path);
569   void SetRegisteredMethodOnMetadata(ClientMetadata& metadata);
570 
571   static void ListenerDestroyDone(void* arg, grpc_error_handle error);
572 
DoneShutdownEvent(void * server,grpc_cq_completion *)573   static void DoneShutdownEvent(void* server,
574                                 grpc_cq_completion* /*completion*/) {
575     static_cast<Server*>(server)->Unref();
576   }
577 
578   static void DoneRequestEvent(void* req, grpc_cq_completion* completion);
579 
580   void FailCall(size_t cq_idx, RequestedCall* rc, grpc_error_handle error);
581   grpc_call_error QueueRequestedCall(size_t cq_idx, RequestedCall* rc);
582 
583   void MaybeFinishShutdown() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_global_)
584       ABSL_LOCKS_EXCLUDED(mu_call_);
585 
586   void KillPendingWorkLocked(grpc_error_handle error)
587       ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_call_);
588 
589   static grpc_call_error ValidateServerRequest(
590       grpc_completion_queue* cq_for_notification, void* tag,
591       grpc_byte_buffer** optional_payload, RegisteredMethod* rm);
592   grpc_call_error ValidateServerRequestAndCq(
593       size_t* cq_idx, grpc_completion_queue* cq_for_notification, void* tag,
594       grpc_byte_buffer** optional_payload, RegisteredMethod* rm);
595 
596   std::vector<RefCountedPtr<Channel>> GetChannelsLocked() const;
597 
598   // Take a shutdown ref for a request (increment by 2) and return if shutdown
599   // has not been called.
ShutdownRefOnRequest()600   bool ShutdownRefOnRequest() {
601     int old_value = shutdown_refs_.fetch_add(2, std::memory_order_acq_rel);
602     return (old_value & 1) != 0;
603   }
604 
605   // Decrement the shutdown ref counter by either 1 (for shutdown call) or 2
606   // (for in-flight request) and possibly call MaybeFinishShutdown if
607   // appropriate.
ShutdownUnrefOnRequest()608   void ShutdownUnrefOnRequest() ABSL_LOCKS_EXCLUDED(mu_global_) {
609     if (shutdown_refs_.fetch_sub(2, std::memory_order_acq_rel) == 2) {
610       MutexLock lock(&mu_global_);
611       MaybeFinishShutdown();
612     }
613   }
ShutdownUnrefOnShutdownCall()614   void ShutdownUnrefOnShutdownCall() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_global_) {
615     if (shutdown_refs_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
616       // There is no request in-flight.
617       MaybeFinishShutdown();
618     }
619   }
620 
ShutdownCalled()621   bool ShutdownCalled() const {
622     return (shutdown_refs_.load(std::memory_order_acquire) & 1) == 0;
623   }
624 
625   // Returns whether there are no more shutdown refs, which means that shutdown
626   // has been called and all accepted requests have been published if using an
627   // AllocatingRequestMatcher.
ShutdownReady()628   bool ShutdownReady() const {
629     return shutdown_refs_.load(std::memory_order_acquire) == 0;
630   }
631 
632   auto MatchAndPublishCall(CallHandler call_handler);
633   absl::StatusOr<RefCountedPtr<UnstartedCallDestination>> MakeCallDestination(
634       const ChannelArgs& args);
635 
636   ChannelArgs const channel_args_;
637   RefCountedPtr<channelz::ServerNode> channelz_node_;
638   std::unique_ptr<ServerConfigFetcher> config_fetcher_;
639   ServerCallTracerFactory* const server_call_tracer_factory_;
640 
641   std::vector<grpc_completion_queue*> cqs_;
642   std::vector<grpc_pollset*> pollsets_;
643   bool started_ = false;
644   const grpc_compression_options compression_options_;
645 
646   // The two following mutexes control access to server-state.
647   // mu_global_ controls access to non-call-related state (e.g., channel state).
648   // mu_call_ controls access to call-related state (e.g., the call lists).
649   //
650   // If they are ever required to be nested, you must lock mu_global_
651   // before mu_call_. This is currently used in shutdown processing
652   // (ShutdownAndNotify() and MaybeFinishShutdown()).
653   Mutex mu_global_;  // mutex for server and channel state
654   Mutex mu_call_;    // mutex for call-specific state
655 
656   // startup synchronization: flag, signals whether we are doing the listener
657   // start routine or not.
658   bool starting_ ABSL_GUARDED_BY(mu_global_) = false;
659   CondVar starting_cv_;
660 
661   // Map of registered methods.
662   absl::flat_hash_map<std::pair<std::string, std::string> /*host, method*/,
663                       std::unique_ptr<RegisteredMethod>,
664                       StringViewStringViewPairHash, StringViewStringViewPairEq>
665       registered_methods_;
666 
667   // Request matcher for unregistered methods.
668   std::unique_ptr<RequestMatcherInterface> unregistered_request_matcher_;
669 
670   // The shutdown refs counter tracks whether or not shutdown has been called
671   // and whether there are any AllocatingRequestMatcher requests that have been
672   // accepted but not yet started (+2 on each one). If shutdown has been called,
673   // the lowest bit will be 0 (defaults to 1) and the counter will be even. The
674   // server should not notify on shutdown until the counter is 0 (shutdown is
675   // called and there are no requests that are accepted but not started).
676   std::atomic<int> shutdown_refs_{1};
677   bool shutdown_published_ ABSL_GUARDED_BY(mu_global_) = false;
678   std::vector<ShutdownTag> shutdown_tags_ ABSL_GUARDED_BY(mu_global_);
679 
ABSL_GUARDED_BY(mu_call_)680   RandomEarlyDetection pending_backlog_protector_ ABSL_GUARDED_BY(mu_call_){
681       static_cast<uint64_t>(
682           std::max(0, channel_args_.GetInt(GRPC_ARG_SERVER_MAX_PENDING_REQUESTS)
683                           .value_or(1000))),
684       static_cast<uint64_t>(std::max(
685           0,
686           channel_args_.GetInt(GRPC_ARG_SERVER_MAX_PENDING_REQUESTS_HARD_LIMIT)
687               .value_or(3000)))};
688   const Duration max_time_in_pending_queue_;
689   absl::BitGen bitgen_ ABSL_GUARDED_BY(mu_call_);
690 
691   std::list<ChannelData*> channels_;
692   absl::flat_hash_set<OrphanablePtr<ServerTransport>> connections_
693       ABSL_GUARDED_BY(mu_global_);
694   RefCountedPtr<ServerConfigFetcher::ConnectionManager> connection_manager_
695       ABSL_GUARDED_BY(mu_global_);
696   size_t connections_open_ ABSL_GUARDED_BY(mu_global_) = 0;
697 
698   std::list<RefCountedPtr<ListenerState>> listener_states_;
699   size_t listeners_destroyed_ = 0;
700 
701   // The last time we printed a shutdown progress message.
702   gpr_timespec last_shutdown_message_time_;
703 };
704 
705 }  // namespace grpc_core
706 
707 #endif  // GRPC_SRC_CORE_SERVER_SERVER_H
708