• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2015 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #ifndef GRPC_SRC_CORE_LIB_SURFACE_SERVER_H
18 #define GRPC_SRC_CORE_LIB_SURFACE_SERVER_H
19 
20 #include <grpc/support/port_platform.h>
21 
22 #include <stddef.h>
23 #include <stdint.h>
24 
25 #include <algorithm>
26 #include <atomic>
27 #include <functional>
28 #include <list>
29 #include <memory>
30 #include <string>
31 #include <utility>
32 #include <vector>
33 
34 #include "absl/base/thread_annotations.h"
35 #include "absl/container/flat_hash_map.h"
36 #include "absl/container/flat_hash_set.h"
37 #include "absl/hash/hash.h"
38 #include "absl/random/random.h"
39 #include "absl/status/statusor.h"
40 #include "absl/strings/string_view.h"
41 #include "absl/types/optional.h"
42 
43 #include <grpc/grpc.h>
44 #include <grpc/slice.h>
45 #include <grpc/support/time.h>
46 
47 #include "src/core/lib/backoff/random_early_detection.h"
48 #include "src/core/lib/channel/call_tracer.h"
49 #include "src/core/lib/channel/channel_args.h"
50 #include "src/core/lib/channel/channel_fwd.h"
51 #include "src/core/lib/channel/channel_stack.h"
52 #include "src/core/lib/channel/channelz.h"
53 #include "src/core/lib/debug/trace.h"
54 #include "src/core/lib/gprpp/cpp_impl_of.h"
55 #include "src/core/lib/gprpp/dual_ref_counted.h"
56 #include "src/core/lib/gprpp/orphanable.h"
57 #include "src/core/lib/gprpp/ref_counted_ptr.h"
58 #include "src/core/lib/gprpp/sync.h"
59 #include "src/core/lib/gprpp/time.h"
60 #include "src/core/lib/iomgr/call_combiner.h"
61 #include "src/core/lib/iomgr/closure.h"
62 #include "src/core/lib/iomgr/endpoint.h"
63 #include "src/core/lib/iomgr/error.h"
64 #include "src/core/lib/iomgr/iomgr_fwd.h"
65 #include "src/core/lib/promise/arena_promise.h"
66 #include "src/core/lib/slice/slice.h"
67 #include "src/core/lib/surface/channel.h"
68 #include "src/core/lib/surface/completion_queue.h"
69 #include "src/core/lib/surface/server_interface.h"
70 #include "src/core/lib/transport/metadata_batch.h"
71 #include "src/core/lib/transport/transport.h"
72 
// Channel-arg keys that size the server's pending-request backlog protection.
// In this header they are read when Server::pending_backlog_protector_ is
// constructed; when an arg is absent the fallbacks used there are 1000 (for
// max_pending_requests) and 3000 (for the hard limit).
#define GRPC_ARG_SERVER_MAX_PENDING_REQUESTS "grpc.server.max_pending_requests"
#define GRPC_ARG_SERVER_MAX_PENDING_REQUESTS_HARD_LIMIT \
  "grpc.server.max_pending_requests_hard_limit"
76 
77 namespace grpc_core {
78 
// Trace flag declared here and defined in another translation unit.
// NOTE(review): presumably gates server-channel trace logging — confirm at the
// definition site.
extern TraceFlag grpc_server_channel_trace;
80 
81 class Server : public ServerInterface,
82                public InternallyRefCounted<Server>,
83                public CppImplOf<Server, grpc_server> {
84  public:
  // Filter vtable.
  // The static functions on ChannelData and CallData that are commented as
  // "Filter vtable functions" are presumably the entries of this table.
  static const grpc_channel_filter kServerTopFilter;

  // Opaque type used for registered methods.
  // Only forward-declared in this header; it is passed around by pointer (see
  // RegisterMethod() and RequestRegisteredCall()).
  struct RegisteredMethod;
90 
  // An object to represent the most relevant characteristics of a
  // newly-allocated call object when using an AllocatingRequestMatcherBatch.
  // This is the value type produced by the allocator callback passed to
  // SetBatchMethodAllocator().
  // NOTE(review): the fields appear to mirror the arguments of RequestCall()
  // (tag, call, request metadata, details, completion queue) — confirm.
  struct BatchCallAllocation {
    void* tag;                              // Opaque application tag.
    grpc_call** call;                       // Out-slot for the call handle.
    grpc_metadata_array* initial_metadata;  // Metadata array for the call.
    grpc_call_details* details;             // Call details for the call.
    grpc_completion_queue* cq;              // Completion queue for the call.
  };
100 
  // An object to represent the most relevant characteristics of a
  // newly-allocated call object when using an
  // AllocatingRequestMatcherRegistered.
  // This is the value type produced by the allocator callback passed to
  // SetRegisteredMethodAllocator().
  // NOTE(review): the fields appear to mirror the arguments of
  // RequestRegisteredCall() — confirm.
  struct RegisteredCallAllocation {
    void* tag;                              // Opaque application tag.
    grpc_call** call;                       // Out-slot for the call handle.
    grpc_metadata_array* initial_metadata;  // Metadata array for the call.
    gpr_timespec* deadline;                 // Out-slot for the call deadline.
    grpc_byte_buffer** optional_payload;    // Out-slot for a payload, if any.
    grpc_completion_queue* cq;              // Completion queue for the call.
  };
112 
  /// Interface for listeners.
  /// Implementations must override the Orphan() method, which should stop
  /// listening and initiate destruction of the listener.
  /// The server holds listeners through OrphanablePtr (see AddListener()).
  class ListenerInterface : public Orphanable {
   public:
    ~ListenerInterface() override = default;

    /// Starts listening. This listener may refer to the pollset object beyond
    /// this call, so it is a pointer rather than a reference.
    /// \a server is the server this listener is being started for.
    virtual void Start(Server* server,
                       const std::vector<grpc_pollset*>* pollsets) = 0;

    /// Returns the channelz node for the listen socket, or null if not
    /// supported. The result is a raw (non-owning) pointer.
    virtual channelz::ListenSocketNode* channelz_listen_socket_node() const = 0;

    /// Sets a closure to be invoked by the listener when its destruction
    /// is complete.
    virtual void SetOnDestroyDone(grpc_closure* on_destroy_done) = 0;
  };
133 
  // Constructs a server from the given channel args. Marked explicit so a
  // ChannelArgs value is never implicitly converted into a Server.
  explicit Server(const ChannelArgs& args);
  ~Server() override;

  // Orphan() override. Per the lock annotation, callers must not hold
  // mu_global_ when invoking it.
  void Orphan() ABSL_LOCKS_EXCLUDED(mu_global_) override;
138 
  // Returns the server's channel args; channel_args_ is a const member, so
  // the referenced object does not change after construction.
  const ChannelArgs& channel_args() const override { return channel_args_; }
  // Returns a raw, non-owning pointer to the channelz node held in
  // channelz_node_ (whatever RefCountedPtr::get() yields, possibly null).
  channelz::ServerNode* channelz_node() const override {
    return channelz_node_.get();
  }

  // Do not call this before Start(). Returns the pollsets. The
  // vector itself is immutable, but the pollsets inside are mutable. The
  // result is valid for the lifetime of the server.
  const std::vector<grpc_pollset*>& pollsets() const { return pollsets_; }

  // Returns a raw, non-owning pointer to the config fetcher; ownership stays
  // with the server's unique_ptr. Null when no fetcher is held.
  grpc_server_config_fetcher* config_fetcher() const {
    return config_fetcher_.get();
  }

  // Returns the tracer-factory pointer stored in server_call_tracer_factory_
  // (a const pointer member, so the same value for the server's lifetime).
  ServerCallTracerFactory* server_call_tracer_factory() const override {
    return server_call_tracer_factory_;
  }
156 
  // Takes ownership of `config_fetcher`. Defined inline near the end of this
  // header, after grpc_server_config_fetcher has been fully declared.
  void set_config_fetcher(
      std::unique_ptr<grpc_server_config_fetcher> config_fetcher);

  // Per the lock annotation, must be called without mu_global_ held.
  // NOTE(review): presumably reports whether any channel is still tracked by
  // the server — confirm against the definition.
  bool HasOpenConnections() ABSL_LOCKS_EXCLUDED(mu_global_);

  // Adds a listener to the server.  When the server starts, it will call
  // the listener's Start() method, and when it shuts down, it will orphan
  // the listener.
  void AddListener(OrphanablePtr<ListenerInterface> listener);

  // Starts listening for connections.
  // Per the lock annotation, must be called without mu_global_ held.
  void Start() ABSL_LOCKS_EXCLUDED(mu_global_);
169 
  // Sets up a transport.  Creates a channel stack and binds the transport to
  // the server.  Called from the listener when a new connection is accepted.
  // The outcome is reported through the returned grpc_error_handle.
  // NOTE(review): this comment used to state that the call "takes ownership
  // of a ref on resource_user from the caller", but the signature has no such
  // parameter; that sentence looks stale — confirm and drop it.
  grpc_error_handle SetupTransport(
      Transport* transport, grpc_pollset* accepting_pollset,
      const ChannelArgs& args,
      const RefCountedPtr<channelz::SocketNode>& socket_node);
177 
  // Registers a completion queue with the server.
  // NOTE(review): presumably appends to cqs_, which ChannelData::cq_idx()
  // indexes into — confirm against the definition.
  void RegisterCompletionQueue(grpc_completion_queue* cq);

  // Functions to specify that a specific registered method or the unregistered
  // collection should use a specific allocator for request matching.
  // Each allocator is a callable that yields one allocation record
  // (RegisteredCallAllocation / BatchCallAllocation) per invocation.
  void SetRegisteredMethodAllocator(
      grpc_completion_queue* cq, void* method_tag,
      std::function<RegisteredCallAllocation()> allocator);
  void SetBatchMethodAllocator(grpc_completion_queue* cq,
                               std::function<BatchCallAllocation()> allocator);
187 
  // Registers a method for the given method name and host and returns an
  // opaque handle to it; the handle is what RequestRegisteredCall() accepts.
  // NOTE(review): null/duplicate handling is not visible from this header.
  RegisteredMethod* RegisterMethod(
      const char* method, const char* host,
      grpc_server_register_method_payload_handling payload_handling,
      uint32_t flags);

  // Requests notification of a new call on an unregistered method. Results
  // are delivered through the out-parameters (`call`, `details`,
  // `request_metadata`); `tag` is the application's opaque identifier for the
  // request. Returns a grpc_call_error status.
  grpc_call_error RequestCall(grpc_call** call, grpc_call_details* details,
                              grpc_metadata_array* request_metadata,
                              grpc_completion_queue* cq_bound_to_call,
                              grpc_completion_queue* cq_for_notification,
                              void* tag);

  // Same as RequestCall() but for a method previously returned by
  // RegisterMethod(); additionally exposes out-slots for the deadline and an
  // optional payload. Returns a grpc_call_error status.
  grpc_call_error RequestRegisteredCall(
      RegisteredMethod* rm, grpc_call** call, gpr_timespec* deadline,
      grpc_metadata_array* request_metadata,
      grpc_byte_buffer** optional_payload,
      grpc_completion_queue* cq_bound_to_call,
      grpc_completion_queue* cq_for_notification, void* tag_new);
205 
  // Begins server shutdown; `cq` and `tag` identify where and how completion
  // is to be reported. Per the lock annotation, the caller must hold neither
  // mu_global_ nor mu_call_.
  void ShutdownAndNotify(grpc_completion_queue* cq, void* tag)
      ABSL_LOCKS_EXCLUDED(mu_global_, mu_call_);

  // NOTE(review): presumably orphans the registered listeners — confirm
  // against the definition.
  void StopListening();

  // Per the lock annotation, must be called without mu_global_ held.
  void CancelAllCalls() ABSL_LOCKS_EXCLUDED(mu_global_);

  // Per the lock annotation, the caller must hold neither mu_global_ nor
  // mu_call_.
  void SendGoaways() ABSL_LOCKS_EXCLUDED(mu_global_, mu_call_);
214 
215  private:
  // Forward declarations of private helper types; their definitions are not
  // in this header.
  struct RequestedCall;

  // Request-matcher hierarchy. The server holds matchers through
  // std::unique_ptr<RequestMatcherInterface> (see
  // unregistered_request_matcher_).
  class RequestMatcherInterface;
  class RealRequestMatcherFilterStack;
  class RealRequestMatcherPromises;
  class AllocatingRequestMatcherBase;
  class AllocatingRequestMatcherBatch;
  class AllocatingRequestMatcherRegistered;
224 
225   class ChannelData final : public ServerTransport::Acceptor {
226    public:
227     ChannelData() = default;
228     ~ChannelData();
229 
230     void InitTransport(RefCountedPtr<Server> server,
231                        OrphanablePtr<Channel> channel, size_t cq_idx,
232                        Transport* transport, intptr_t channelz_socket_uuid);
233 
server()234     RefCountedPtr<Server> server() const { return server_; }
channel()235     Channel* channel() const { return channel_.get(); }
cq_idx()236     size_t cq_idx() const { return cq_idx_; }
237 
238     RegisteredMethod* GetRegisteredMethod(const absl::string_view& host,
239                                           const absl::string_view& path);
240     // Filter vtable functions.
241     static grpc_error_handle InitChannelElement(
242         grpc_channel_element* elem, grpc_channel_element_args* args);
243     static void DestroyChannelElement(grpc_channel_element* elem);
244     static ArenaPromise<ServerMetadataHandle> MakeCallPromise(
245         grpc_channel_element* elem, CallArgs call_args, NextPromiseFactory);
246     void InitCall(RefCountedPtr<CallSpineInterface> call);
247 
248     Arena* CreateArena() override;
249     absl::StatusOr<CallInitiator> CreateCall(
250         ClientMetadata& client_initial_metadata, Arena* arena) override;
251 
252    private:
253     class ConnectivityWatcher;
254 
255     static void AcceptStream(void* arg, Transport* /*transport*/,
256                              const void* transport_server_data);
257     void SetRegisteredMethodOnMetadata(ClientMetadata& metadata);
258 
259     void Destroy() ABSL_EXCLUSIVE_LOCKS_REQUIRED(server_->mu_global_);
260 
261     static void FinishDestroy(void* arg, grpc_error_handle error);
262 
263     RefCountedPtr<Server> server_;
264     OrphanablePtr<Channel> channel_;
265     // The index into Server::cqs_ of the CQ used as a starting point for
266     // where to publish new incoming calls.
267     size_t cq_idx_;
268     absl::optional<std::list<ChannelData*>::iterator> list_position_;
269     grpc_closure finish_destroy_channel_closure_;
270     intptr_t channelz_socket_uuid_;
271   };
272 
273   class CallData {
274    public:
275     enum class CallState {
276       NOT_STARTED,  // Waiting for metadata.
277       PENDING,      // Initial metadata read, not flow controlled in yet.
278       ACTIVATED,    // Flow controlled in, on completion queue.
279       ZOMBIED,      // Cancelled before being queued.
280     };
281 
282     CallData(grpc_call_element* elem, const grpc_call_element_args& args,
283              RefCountedPtr<Server> server);
284     ~CallData();
285 
286     // Starts the recv_initial_metadata batch on the call.
287     // Invoked from ChannelData::AcceptStream().
288     void Start(grpc_call_element* elem);
289 
290     void SetState(CallState state);
291 
292     // Attempts to move from PENDING to ACTIVATED state.  Returns true
293     // on success.
294     bool MaybeActivate();
295 
296     // Publishes an incoming call to the application after it has been
297     // matched.
298     void Publish(size_t cq_idx, RequestedCall* rc);
299 
300     void KillZombie();
301 
302     void FailCallCreation();
303 
304     // Filter vtable functions.
305     static grpc_error_handle InitCallElement(
306         grpc_call_element* elem, const grpc_call_element_args* args);
307     static void DestroyCallElement(grpc_call_element* elem,
308                                    const grpc_call_final_info* /*final_info*/,
309                                    grpc_closure* /*ignored*/);
310     static void StartTransportStreamOpBatch(
311         grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
312 
313    private:
314     // Helper functions for handling calls at the top of the call stack.
315     static void RecvInitialMetadataBatchComplete(void* arg,
316                                                  grpc_error_handle error);
317     void StartNewRpc(grpc_call_element* elem);
318     static void PublishNewRpc(void* arg, grpc_error_handle error);
319 
320     // Functions used inside the call stack.
321     void StartTransportStreamOpBatchImpl(grpc_call_element* elem,
322                                          grpc_transport_stream_op_batch* batch);
323     static void RecvInitialMetadataReady(void* arg, grpc_error_handle error);
324     static void RecvTrailingMetadataReady(void* arg, grpc_error_handle error);
325 
326     RefCountedPtr<Server> server_;
327 
328     grpc_call* call_;
329 
330     std::atomic<CallState> state_{CallState::NOT_STARTED};
331 
332     absl::optional<Slice> path_;
333     absl::optional<Slice> host_;
334     Timestamp deadline_ = Timestamp::InfFuture();
335 
336     grpc_completion_queue* cq_new_ = nullptr;
337 
338     RequestMatcherInterface* matcher_ = nullptr;
339     grpc_byte_buffer* payload_ = nullptr;
340 
341     grpc_closure kill_zombie_closure_;
342 
343     grpc_metadata_array initial_metadata_ =
344         grpc_metadata_array();  // Zero-initialize the C struct.
345     grpc_closure recv_initial_metadata_batch_complete_;
346 
347     grpc_metadata_batch* recv_initial_metadata_ = nullptr;
348     grpc_closure recv_initial_metadata_ready_;
349     grpc_closure* original_recv_initial_metadata_ready_;
350     grpc_error_handle recv_initial_metadata_error_;
351 
352     bool seen_recv_trailing_metadata_ready_ = false;
353     grpc_closure recv_trailing_metadata_ready_;
354     grpc_closure* original_recv_trailing_metadata_ready_;
355     grpc_error_handle recv_trailing_metadata_error_;
356 
357     grpc_closure publish_;
358 
359     CallCombiner* call_combiner_;
360   };
361 
  // One entry of listeners_: an owned listener plus the closure storage
  // associated with its destruction.
  struct Listener {
    // Takes ownership of `l`.
    explicit Listener(OrphanablePtr<ListenerInterface> l)
        : listener(std::move(l)) {}
    OrphanablePtr<ListenerInterface> listener;
    // NOTE(review): presumably handed to ListenerInterface::SetOnDestroyDone()
    // and run via ListenerDestroyDone() — confirm in the .cc file.
    grpc_closure destroy_done;
  };
368 
  // One entry of shutdown_tags_: an immutable (tag, completion queue) pair
  // plus the grpc_cq_completion storage that goes with it.
  // NOTE(review): presumably one entry per ShutdownAndNotify() call — confirm.
  struct ShutdownTag {
    ShutdownTag(void* tag_arg, grpc_completion_queue* cq_arg)
        : tag(tag_arg), cq(cq_arg) {}
    void* const tag;
    grpc_completion_queue* const cq;
    grpc_cq_completion completion;
  };
376 
  // Hash functor for registered_methods_. It reuses the hasher that
  // absl::flat_hash_set selects for pair<string_view, string_view> and adds
  // the is_transparent tag, which opts the functor in to heterogeneous lookup
  // (probing the std::string-keyed map with string_view pairs).
  struct StringViewStringViewPairHash
      : absl::flat_hash_set<
            std::pair<absl::string_view, absl::string_view>>::hasher {
    using is_transparent = void;
  };
382 
  // Equality functor for registered_methods_. Compares keys as
  // pair<string_view, string_view>; the is_transparent tag opts the functor in
  // to heterogeneous lookup alongside StringViewStringViewPairHash.
  struct StringViewStringViewPairEq
      : std::equal_to<std::pair<absl::string_view, absl::string_view>> {
    using is_transparent = void;
  };
387 
  // Closure-style callback (void* arg + error).
  // NOTE(review): presumably the target of Listener::destroy_done, with `arg`
  // being the Server — confirm in the .cc file.
  static void ListenerDestroyDone(void* arg, grpc_error_handle error);
389 
DoneShutdownEvent(void * server,grpc_cq_completion *)390   static void DoneShutdownEvent(void* server,
391                                 grpc_cq_completion* /*completion*/) {
392     static_cast<Server*>(server)->Unref();
393   }
394 
  // Completion callback; `req` is an opaque pointer.
  // NOTE(review): presumably a RequestedCall* — confirm in the .cc file.
  static void DoneRequestEvent(void* req, grpc_cq_completion* completion);

  // Fails the requested call `rc` with `error`; `cq_idx` is an index into
  // the server's completion queues.
  void FailCall(size_t cq_idx, RequestedCall* rc, grpc_error_handle error);
  // Queues `rc` for matching; returns a grpc_call_error status.
  grpc_call_error QueueRequestedCall(size_t cq_idx, RequestedCall* rc);

  // Must be called with mu_global_ held and mu_call_ NOT held (see the lock
  // ordering note on the mutex members).
  void MaybeFinishShutdown() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_global_)
      ABSL_LOCKS_EXCLUDED(mu_call_);

  // Must be called with mu_call_ held.
  void KillPendingWorkLocked(grpc_error_handle error)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_call_);

  // Argument validation for the Request*Call() entry points. The static
  // variant needs no server state; the member variant additionally reports a
  // completion-queue index through `cq_idx`.
  static grpc_call_error ValidateServerRequest(
      grpc_completion_queue* cq_for_notification, void* tag,
      grpc_byte_buffer** optional_payload, RegisteredMethod* rm);
  grpc_call_error ValidateServerRequestAndCq(
      size_t* cq_idx, grpc_completion_queue* cq_for_notification, void* tag,
      grpc_byte_buffer** optional_payload, RegisteredMethod* rm);

  // Returns strong references to channels.
  // NOTE(review): the "Locked" suffix suggests a mutex (likely mu_global_)
  // must be held, but no lock annotation is present — confirm.
  std::vector<RefCountedPtr<Channel>> GetChannelsLocked() const;
414 
415   // Take a shutdown ref for a request (increment by 2) and return if shutdown
416   // has not been called.
ShutdownRefOnRequest()417   bool ShutdownRefOnRequest() {
418     int old_value = shutdown_refs_.fetch_add(2, std::memory_order_acq_rel);
419     return (old_value & 1) != 0;
420   }
421 
422   // Decrement the shutdown ref counter by either 1 (for shutdown call) or 2
423   // (for in-flight request) and possibly call MaybeFinishShutdown if
424   // appropriate.
ShutdownUnrefOnRequest()425   void ShutdownUnrefOnRequest() ABSL_LOCKS_EXCLUDED(mu_global_) {
426     if (shutdown_refs_.fetch_sub(2, std::memory_order_acq_rel) == 2) {
427       MutexLock lock(&mu_global_);
428       MaybeFinishShutdown();
429     }
430   }
ShutdownUnrefOnShutdownCall()431   void ShutdownUnrefOnShutdownCall() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_global_) {
432     if (shutdown_refs_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
433       // There is no request in-flight.
434       MaybeFinishShutdown();
435     }
436   }
437 
ShutdownCalled()438   bool ShutdownCalled() const {
439     return (shutdown_refs_.load(std::memory_order_acquire) & 1) == 0;
440   }
441 
442   // Returns whether there are no more shutdown refs, which means that shutdown
443   // has been called and all accepted requests have been published if using an
444   // AllocatingRequestMatcher.
ShutdownReady()445   bool ShutdownReady() const {
446     return shutdown_refs_.load(std::memory_order_acquire) == 0;
447   }
448 
  // Immutable after construction. Declared before
  // pending_backlog_protector_, whose initializer reads it; members are
  // initialized in declaration order, so keep this first.
  ChannelArgs const channel_args_;
  RefCountedPtr<channelz::ServerNode> channelz_node_;
  // Owned config fetcher; replaced via set_config_fetcher().
  std::unique_ptr<grpc_server_config_fetcher> config_fetcher_;
  // Non-owning; the pointer itself never changes after construction.
  ServerCallTracerFactory* const server_call_tracer_factory_;

  // Completion queues known to the server; ChannelData::cq_idx() is an index
  // into this vector.
  std::vector<grpc_completion_queue*> cqs_;
  // Exposed through pollsets(); see the usage note on that accessor.
  std::vector<grpc_pollset*> pollsets_;
  bool started_ = false;
457 
  // The two following mutexes control access to server-state.
  // mu_global_ controls access to non-call-related state (e.g., channel state).
  // mu_call_ controls access to call-related state (e.g., the call lists).
  //
  // If they are ever required to be nested, you must lock mu_global_
  // before mu_call_. This is currently used in shutdown processing
  // (ShutdownAndNotify() and MaybeFinishShutdown()).
  Mutex mu_global_;  // mutex for server and channel state
  Mutex mu_call_;    // mutex for call-specific state

  // startup synchronization: flag, signals whether we are doing the listener
  // start routine or not.
  bool starting_ ABSL_GUARDED_BY(mu_global_) = false;
  // NOTE(review): presumably signalled when starting_ flips back to false,
  // with waiters holding mu_global_ — confirm in the .cc file.
  CondVar starting_cv_;
472 
  // Map of registered methods.
  // Keys own their strings (pair<std::string, std::string>); the custom hash
  // and equality functors are tagged is_transparent so the map can be probed
  // with pair<string_view, string_view>.
  absl::flat_hash_map<std::pair<std::string, std::string> /*host, method*/,
                      std::unique_ptr<RegisteredMethod>,
                      StringViewStringViewPairHash, StringViewStringViewPairEq>
      registered_methods_;

  // Request matcher for unregistered methods.
  std::unique_ptr<RequestMatcherInterface> unregistered_request_matcher_;
481 
  // The shutdown refs counter tracks whether or not shutdown has been called
  // and whether there are any AllocatingRequestMatcher requests that have been
  // accepted but not yet started (+2 on each one). If shutdown has been called,
  // the lowest bit will be 0 (defaults to 1) and the counter will be even. The
  // server should not notify on shutdown until the counter is 0 (shutdown is
  // called and there are no requests that are accepted but not started).
  // Manipulated only through the ShutdownRef*/ShutdownUnref* helpers above.
  std::atomic<int> shutdown_refs_{1};
  bool shutdown_published_ ABSL_GUARDED_BY(mu_global_) = false;
  // One entry per recorded (tag, cq) pair; see ShutdownTag.
  std::vector<ShutdownTag> shutdown_tags_ ABSL_GUARDED_BY(mu_global_);
491 
  // Random-early-detection state for the pending-request backlog. The two
  // constructor arguments come from GRPC_ARG_SERVER_MAX_PENDING_REQUESTS
  // (fallback 1000) and GRPC_ARG_SERVER_MAX_PENDING_REQUESTS_HARD_LIMIT
  // (fallback 3000); std::max(0, ...) clamps negative settings to zero before
  // the conversion to uint64_t. The initializer reads channel_args_, which is
  // declared (and therefore initialized) earlier in the class.
  RandomEarlyDetection pending_backlog_protector_ ABSL_GUARDED_BY(mu_call_){
      static_cast<uint64_t>(
          std::max(0, channel_args_.GetInt(GRPC_ARG_SERVER_MAX_PENDING_REQUESTS)
                          .value_or(1000))),
      static_cast<uint64_t>(std::max(
          0,
          channel_args_.GetInt(GRPC_ARG_SERVER_MAX_PENDING_REQUESTS_HARD_LIMIT)
              .value_or(3000)))};
  // Set once at construction (const); the initializer is not in this header.
  const Duration max_time_in_pending_queue_;
  // Random bit source, guarded by mu_call_.
  // NOTE(review): presumably feeds pending_backlog_protector_ — confirm.
  absl::BitGen bitgen_ ABSL_GUARDED_BY(mu_call_);
502 
  // Non-owning list of live per-channel objects; ChannelData::list_position_
  // stores an iterator of this list type.
  // NOTE(review): no ABSL_GUARDED_BY annotation here although the mutex
  // comment says mu_global_ covers channel state — confirm intended locking.
  std::list<ChannelData*> channels_;

  std::list<Listener> listeners_;
  // NOTE(review): presumably counts listeners whose destroy-done callback
  // has fired — confirm in the .cc file.
  size_t listeners_destroyed_ = 0;

  // The last time we printed a shutdown progress message.
  // NOTE(review): has no default initializer; it must be assigned before it
  // is first read.
  gpr_timespec last_shutdown_message_time_;
510 };
511 
512 }  // namespace grpc_core
513 
// Abstract source of server configuration. A watcher is registered per
// listening address and is handed ConnectionManager objects as configuration
// becomes available.
struct grpc_server_config_fetcher {
 public:
  // Ref-counted hook that can adjust the channel args for an individual
  // incoming connection.
  class ConnectionManager
      : public grpc_core::DualRefCounted<ConnectionManager> {
   public:
    // Ownership of \a args is transferred.
    // NOTE(review): \a args is taken by const reference in this signature, so
    // the ownership statement above may be stale — confirm.
    // Returns the channel args to use for the connection, or an error status.
    virtual absl::StatusOr<grpc_core::ChannelArgs>
    UpdateChannelArgsForConnection(const grpc_core::ChannelArgs& args,
                                   grpc_endpoint* tcp) = 0;
  };

  // Callback interface through which the fetcher delivers updates.
  class WatcherInterface {
   public:
    virtual ~WatcherInterface() = default;
    // UpdateConnectionManager() is invoked by the config fetcher when a new
    // config is available. Implementations should update the connection manager
    // and start serving if not already serving.
    virtual void UpdateConnectionManager(
        grpc_core::RefCountedPtr<ConnectionManager> manager) = 0;
    // Implementations should stop serving when this is called. Serving should
    // only resume when UpdateConfig() is invoked.
    // NOTE(review): no UpdateConfig() is declared on this interface; the
    // sentence above presumably means UpdateConnectionManager() — confirm.
    virtual void StopServing() = 0;
  };

  virtual ~grpc_server_config_fetcher() = default;

  // Starts watching configuration for `listening_address`; takes ownership of
  // `watcher`.
  virtual void StartWatch(std::string listening_address,
                          std::unique_ptr<WatcherInterface> watcher) = 0;
  // Cancels a watch; the watcher is identified by raw pointer.
  virtual void CancelWatch(WatcherInterface* watcher) = 0;
  virtual grpc_pollset_set* interested_parties() = 0;
};
545 
546 namespace grpc_core {
547 
set_config_fetcher(std::unique_ptr<grpc_server_config_fetcher> config_fetcher)548 inline void Server::set_config_fetcher(
549     std::unique_ptr<grpc_server_config_fetcher> config_fetcher) {
550   config_fetcher_ = std::move(config_fetcher);
551 }
552 
553 }  // namespace grpc_core
554 
555 #endif  // GRPC_SRC_CORE_LIB_SURFACE_SERVER_H
556