//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

#ifndef GRPC_CORE_LIB_SURFACE_SERVER_H
#define GRPC_CORE_LIB_SURFACE_SERVER_H

#include <grpc/support/port_platform.h>

#include <list>
#include <vector>

#include "absl/status/statusor.h"
#include "absl/types/optional.h"

#include <grpc/grpc.h>

#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/atomic.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/surface/completion_queue.h"
#include "src/core/lib/transport/transport.h"

namespace grpc_core {

extern TraceFlag grpc_server_channel_trace;

class Server : public InternallyRefCounted<Server> {
 public:
  // Filter vtable.
  static const grpc_channel_filter kServerTopFilter;

  // Opaque type used for registered methods.
  struct RegisteredMethod;

  // An object to represent the most relevant characteristics of a
  // newly-allocated call object when using an AllocatingRequestMatcherBatch.
  struct BatchCallAllocation {
    void* tag;
    grpc_call** call;
    grpc_metadata_array* initial_metadata;
    grpc_call_details* details;
    grpc_completion_queue* cq;
  };

  // An object to represent the most relevant characteristics of a
  // newly-allocated call object when using an
  // AllocatingRequestMatcherRegistered.
  struct RegisteredCallAllocation {
    void* tag;
    grpc_call** call;
    grpc_metadata_array* initial_metadata;
    gpr_timespec* deadline;
    grpc_byte_buffer** optional_payload;
    grpc_completion_queue* cq;
  };

  /// Interface for listeners.
  /// Implementations must override the Orphan() method, which should stop
  /// listening and initiate destruction of the listener.
  class ListenerInterface : public Orphanable {
   public:
    ~ListenerInterface() override = default;

    /// Starts listening. This listener may refer to the pollset object beyond
    /// this call, so it is a pointer rather than a reference.
    virtual void Start(Server* server,
                       const std::vector<grpc_pollset*>* pollsets) = 0;

    /// Returns the channelz node for the listen socket, or null if not
    /// supported.
    virtual channelz::ListenSocketNode* channelz_listen_socket_node() const = 0;

    /// Sets a closure to be invoked by the listener when its destruction
    /// is complete.
    virtual void SetOnDestroyDone(grpc_closure* on_destroy_done) = 0;
  };

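  // Illustrative sketch (not part of this header's API): a minimal listener
  // might be structured roughly as follows. MyListener and on_destroy_done_
  // are hypothetical names used only for this example.
  //
  //   class MyListener : public Server::ListenerInterface {
  //    public:
  //     void Start(Server* server,
  //                const std::vector<grpc_pollset*>* pollsets) override {
  //       // Begin accepting connections; for each accepted connection,
  //       // create a transport and hand it to server->SetupTransport().
  //     }
  //     channelz::ListenSocketNode* channelz_listen_socket_node()
  //         const override {
  //       return nullptr;  // channelz reporting not supported in this sketch.
  //     }
  //     void SetOnDestroyDone(grpc_closure* on_destroy_done) override {
  //       on_destroy_done_ = on_destroy_done;
  //     }
  //     void Orphan() override {
  //       // Stop listening, then schedule on_destroy_done_ and delete this.
  //     }
  //
  //    private:
  //     grpc_closure* on_destroy_done_ = nullptr;
  //   };
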
  explicit Server(const grpc_channel_args* args);
  ~Server() override;

  void Orphan() ABSL_LOCKS_EXCLUDED(mu_global_) override;

  const grpc_channel_args* channel_args() const { return channel_args_; }
  grpc_resource_user* default_resource_user() const {
    return default_resource_user_;
  }
  channelz::ServerNode* channelz_node() const { return channelz_node_.get(); }

  // Do not call this before Start(). Returns the pollsets. The
  // vector itself is immutable, but the pollsets inside are mutable. The
  // result is valid for the lifetime of the server.
  const std::vector<grpc_pollset*>& pollsets() const { return pollsets_; }

  grpc_server_config_fetcher* config_fetcher() const {
    return config_fetcher_.get();
  }

  void set_config_fetcher(
      std::unique_ptr<grpc_server_config_fetcher> config_fetcher) {
    config_fetcher_ = std::move(config_fetcher);
  }

  bool HasOpenConnections() ABSL_LOCKS_EXCLUDED(mu_global_);

  // Adds a listener to the server.  When the server starts, it will call
  // the listener's Start() method, and when it shuts down, it will orphan
  // the listener.
  void AddListener(OrphanablePtr<ListenerInterface> listener);

  // Starts listening for connections.
  void Start() ABSL_LOCKS_EXCLUDED(mu_global_);

  // Sets up a transport.  Creates a channel stack and binds the transport to
  // the server.  Called from the listener when a new connection is accepted.
  grpc_error_handle SetupTransport(
      grpc_transport* transport, grpc_pollset* accepting_pollset,
      const grpc_channel_args* args,
      const RefCountedPtr<channelz::SocketNode>& socket_node,
      grpc_resource_user* resource_user = nullptr);

  void RegisterCompletionQueue(grpc_completion_queue* cq);

  // Functions to specify that a specific registered method or the unregistered
  // collection should use a specific allocator for request matching.
  void SetRegisteredMethodAllocator(
      grpc_completion_queue* cq, void* method_tag,
      std::function<RegisteredCallAllocation()> allocator);
  void SetBatchMethodAllocator(grpc_completion_queue* cq,
                               std::function<BatchCallAllocation()> allocator);

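  // Illustrative sketch only: an allocating matcher invokes the supplied
  // std::function whenever it needs storage for an incoming call, rather than
  // drawing from calls requested by the application. PendingCall below is a
  // hypothetical caller-owned holder whose fields match BatchCallAllocation:
  //
  //   server->SetBatchMethodAllocator(cq, [cq]() {
  //     auto* p = new PendingCall;  // freed when the tag completes on cq
  //     return Server::BatchCallAllocation{p /*tag*/, &p->call,
  //                                        &p->initial_metadata, &p->details,
  //                                        cq};
  //   });
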
  RegisteredMethod* RegisterMethod(
      const char* method, const char* host,
      grpc_server_register_method_payload_handling payload_handling,
      uint32_t flags);

  grpc_call_error RequestCall(grpc_call** call, grpc_call_details* details,
                              grpc_metadata_array* request_metadata,
                              grpc_completion_queue* cq_bound_to_call,
                              grpc_completion_queue* cq_for_notification,
                              void* tag);

  grpc_call_error RequestRegisteredCall(
      RegisteredMethod* rm, grpc_call** call, gpr_timespec* deadline,
      grpc_metadata_array* request_metadata,
      grpc_byte_buffer** optional_payload,
      grpc_completion_queue* cq_bound_to_call,
      grpc_completion_queue* cq_for_notification, void* tag_new);

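  // Illustrative note: these two methods are the core-server counterparts of
  // the public C surface calls grpc_server_request_call() and
  // grpc_server_request_registered_call() declared in <grpc/grpc.h>; the C
  // entry points are expected to forward here via grpc_server::core_server.
  // Hypothetical application-side usage of the C API:
  //
  //   grpc_call* call;
  //   grpc_call_details details;
  //   grpc_metadata_array metadata;
  //   grpc_call_details_init(&details);
  //   grpc_metadata_array_init(&metadata);
  //   GPR_ASSERT(grpc_server_request_call(server, &call, &details, &metadata,
  //                                       cq, cq, tag) == GRPC_CALL_OK);
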
  void ShutdownAndNotify(grpc_completion_queue* cq, void* tag)
      ABSL_LOCKS_EXCLUDED(mu_global_, mu_call_);

  void CancelAllCalls() ABSL_LOCKS_EXCLUDED(mu_global_);

 private:
  struct RequestedCall;

  struct ChannelRegisteredMethod {
    RegisteredMethod* server_registered_method = nullptr;
    uint32_t flags;
    bool has_host;
    ExternallyManagedSlice method;
    ExternallyManagedSlice host;
  };

  class RequestMatcherInterface;
  class RealRequestMatcher;
  class AllocatingRequestMatcherBase;
  class AllocatingRequestMatcherBatch;
  class AllocatingRequestMatcherRegistered;

  class ChannelData {
   public:
    ChannelData() = default;
    ~ChannelData();

    void InitTransport(RefCountedPtr<Server> server, grpc_channel* channel,
                       size_t cq_idx, grpc_transport* transport,
                       intptr_t channelz_socket_uuid);

    RefCountedPtr<Server> server() const { return server_; }
    grpc_channel* channel() const { return channel_; }
    size_t cq_idx() const { return cq_idx_; }

    ChannelRegisteredMethod* GetRegisteredMethod(const grpc_slice& host,
                                                 const grpc_slice& path,
                                                 bool is_idempotent);

    // Filter vtable functions.
    static grpc_error_handle InitChannelElement(
        grpc_channel_element* elem, grpc_channel_element_args* args);
    static void DestroyChannelElement(grpc_channel_element* elem);

   private:
    class ConnectivityWatcher;

    static void AcceptStream(void* arg, grpc_transport* /*transport*/,
                             const void* transport_server_data);

    void Destroy() ABSL_EXCLUSIVE_LOCKS_REQUIRED(server_->mu_global_);

    static void FinishDestroy(void* arg, grpc_error_handle error);

    RefCountedPtr<Server> server_;
    grpc_channel* channel_;
    // The index into Server::cqs_ of the CQ used as a starting point for
    // where to publish new incoming calls.
    size_t cq_idx_;
    absl::optional<std::list<ChannelData*>::iterator> list_position_;
    // A hash-table of the methods and hosts of the registered methods.
    // TODO(vjpai): Convert this to an STL map type as opposed to a direct
    // bucket implementation. (Consider performance impact, hash function to
    // use, etc.)
    std::unique_ptr<std::vector<ChannelRegisteredMethod>> registered_methods_;
    uint32_t registered_method_max_probes_;
    grpc_closure finish_destroy_channel_closure_;
    intptr_t channelz_socket_uuid_;
  };

  class CallData {
   public:
    enum class CallState {
      NOT_STARTED,  // Waiting for metadata.
      PENDING,      // Initial metadata read, not flow controlled in yet.
      ACTIVATED,    // Flow controlled in, on completion queue.
      ZOMBIED,      // Cancelled before being queued.
    };

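    // Typical progression (inferred from the comments above): NOT_STARTED ->
    // PENDING once initial metadata has been read, then PENDING -> ACTIVATED
    // when the call is matched to a requested call and published to a
    // completion queue; a call cancelled before being queued moves to ZOMBIED
    // instead.
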
    CallData(grpc_call_element* elem, const grpc_call_element_args& args,
             RefCountedPtr<Server> server);
    ~CallData();

    // Starts the recv_initial_metadata batch on the call.
    // Invoked from ChannelData::AcceptStream().
    void Start(grpc_call_element* elem);

    void SetState(CallState state);

    // Attempts to move from PENDING to ACTIVATED state.  Returns true
    // on success.
    bool MaybeActivate();

    // Publishes an incoming call to the application after it has been
    // matched.
    void Publish(size_t cq_idx, RequestedCall* rc);

    void KillZombie();

    void FailCallCreation();

    // Filter vtable functions.
    static grpc_error_handle InitCallElement(
        grpc_call_element* elem, const grpc_call_element_args* args);
    static void DestroyCallElement(grpc_call_element* elem,
                                   const grpc_call_final_info* /*final_info*/,
                                   grpc_closure* /*ignored*/);
    static void StartTransportStreamOpBatch(
        grpc_call_element* elem, grpc_transport_stream_op_batch* batch);

   private:
    // Helper functions for handling calls at the top of the call stack.
    static void RecvInitialMetadataBatchComplete(void* arg,
                                                 grpc_error_handle error);
    void StartNewRpc(grpc_call_element* elem);
    static void PublishNewRpc(void* arg, grpc_error_handle error);

    // Functions used inside the call stack.
    void StartTransportStreamOpBatchImpl(grpc_call_element* elem,
                                         grpc_transport_stream_op_batch* batch);
    static void RecvInitialMetadataReady(void* arg, grpc_error_handle error);
    static void RecvTrailingMetadataReady(void* arg, grpc_error_handle error);

    RefCountedPtr<Server> server_;

    grpc_call* call_;

    Atomic<CallState> state_{CallState::NOT_STARTED};

    absl::optional<grpc_slice> path_;
    absl::optional<grpc_slice> host_;
    grpc_millis deadline_ = GRPC_MILLIS_INF_FUTURE;

    grpc_completion_queue* cq_new_ = nullptr;

    RequestMatcherInterface* matcher_ = nullptr;
    grpc_byte_buffer* payload_ = nullptr;

    grpc_closure kill_zombie_closure_;

    grpc_metadata_array initial_metadata_ =
        grpc_metadata_array();  // Zero-initialize the C struct.
    grpc_closure recv_initial_metadata_batch_complete_;

    grpc_metadata_batch* recv_initial_metadata_ = nullptr;
    uint32_t recv_initial_metadata_flags_ = 0;
    grpc_closure recv_initial_metadata_ready_;
    grpc_closure* original_recv_initial_metadata_ready_;
    grpc_error_handle recv_initial_metadata_error_ = GRPC_ERROR_NONE;

    bool seen_recv_trailing_metadata_ready_ = false;
    grpc_closure recv_trailing_metadata_ready_;
    grpc_closure* original_recv_trailing_metadata_ready_;
    grpc_error_handle recv_trailing_metadata_error_ = GRPC_ERROR_NONE;

    grpc_closure publish_;

    CallCombiner* call_combiner_;
  };

  struct Listener {
    explicit Listener(OrphanablePtr<ListenerInterface> l)
        : listener(std::move(l)) {}
    OrphanablePtr<ListenerInterface> listener;
    grpc_closure destroy_done;
  };

  struct ShutdownTag {
    ShutdownTag(void* tag_arg, grpc_completion_queue* cq_arg)
        : tag(tag_arg), cq(cq_arg) {}
    void* const tag;
    grpc_completion_queue* const cq;
    grpc_cq_completion completion;
  };

  static void ListenerDestroyDone(void* arg, grpc_error_handle error);

  static void DoneShutdownEvent(void* server,
                                grpc_cq_completion* /*completion*/) {
    static_cast<Server*>(server)->Unref();
  }

  static void DoneRequestEvent(void* req, grpc_cq_completion* completion);

  void FailCall(size_t cq_idx, RequestedCall* rc, grpc_error_handle error);
  grpc_call_error QueueRequestedCall(size_t cq_idx, RequestedCall* rc);

  void MaybeFinishShutdown() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_global_)
      ABSL_LOCKS_EXCLUDED(mu_call_);

  void KillPendingWorkLocked(grpc_error_handle error)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_call_);

  static grpc_call_error ValidateServerRequest(
      grpc_completion_queue* cq_for_notification, void* tag,
      grpc_byte_buffer** optional_payload, RegisteredMethod* rm);
  grpc_call_error ValidateServerRequestAndCq(
      size_t* cq_idx, grpc_completion_queue* cq_for_notification, void* tag,
      grpc_byte_buffer** optional_payload, RegisteredMethod* rm);

  std::vector<grpc_channel*> GetChannelsLocked() const;

  // Takes a shutdown ref for a request (increments the counter by 2) and
  // returns true if shutdown has not yet been called.
  bool ShutdownRefOnRequest() {
    int old_value = shutdown_refs_.FetchAdd(2, MemoryOrder::ACQ_REL);
    return (old_value & 1) != 0;
  }

  // Decrement the shutdown ref counter by either 1 (for shutdown call) or 2
  // (for in-flight request) and possibly call MaybeFinishShutdown if
  // appropriate.
  void ShutdownUnrefOnRequest() ABSL_LOCKS_EXCLUDED(mu_global_) {
    if (shutdown_refs_.FetchSub(2, MemoryOrder::ACQ_REL) == 2) {
      MutexLock lock(&mu_global_);
      MaybeFinishShutdown();
    }
  }
  void ShutdownUnrefOnShutdownCall() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_global_) {
    if (shutdown_refs_.FetchSub(1, MemoryOrder::ACQ_REL) == 1) {
      MaybeFinishShutdown();
    }
  }

  bool ShutdownCalled() const {
    return (shutdown_refs_.Load(MemoryOrder::ACQUIRE) & 1) == 0;
  }

  // Returns whether there are no more shutdown refs, which means that shutdown
  // has been called and all accepted requests have been published if using an
  // AllocatingRequestMatcher.
  bool ShutdownReady() const {
    return shutdown_refs_.Load(MemoryOrder::ACQUIRE) == 0;
  }

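  // Worked example of the counter (derived from the methods above): the
  // counter starts at 1; two calls accepted by an AllocatingRequestMatcher
  // but not yet published raise it to 5; the shutdown call subtracts 1,
  // giving 4 (low bit now 0, so ShutdownCalled() is true); publishing each
  // accepted call subtracts 2, reaching 0, at which point ShutdownReady()
  // holds and MaybeFinishShutdown() can notify the shutdown tags.
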
  grpc_channel_args* const channel_args_;
  grpc_resource_user* default_resource_user_ = nullptr;
  RefCountedPtr<channelz::ServerNode> channelz_node_;
  std::unique_ptr<grpc_server_config_fetcher> config_fetcher_;

  std::vector<grpc_completion_queue*> cqs_;
  std::vector<grpc_pollset*> pollsets_;
  bool started_ = false;

  // The two following mutexes control access to server state.
  // mu_global_ controls access to non-call-related state (e.g., channel state).
  // mu_call_ controls access to call-related state (e.g., the call lists).
  //
  // If they are ever required to be nested, you must lock mu_global_
  // before mu_call_. This is currently used in shutdown processing
  // (ShutdownAndNotify() and MaybeFinishShutdown()).
  Mutex mu_global_;  // mutex for server and channel state
  Mutex mu_call_;    // mutex for call-specific state

  // startup synchronization: flag is protected by mu_global_, signals whether
  // we are doing the listener start routine or not.
  bool starting_ = false;
  CondVar starting_cv_;

  std::vector<std::unique_ptr<RegisteredMethod>> registered_methods_;

  // Request matcher for unregistered methods.
  std::unique_ptr<RequestMatcherInterface> unregistered_request_matcher_;

  // The shutdown refs counter tracks whether or not shutdown has been called
  // and whether there are any AllocatingRequestMatcher requests that have been
  // accepted but not yet started (+2 on each one). If shutdown has been called,
  // the lowest bit will be 0 (defaults to 1) and the counter will be even. The
  // server should not notify on shutdown until the counter is 0 (shutdown is
  // called and there are no requests that are accepted but not started).
  Atomic<int> shutdown_refs_{1};
  bool shutdown_published_ ABSL_GUARDED_BY(mu_global_) = false;
  std::vector<ShutdownTag> shutdown_tags_ ABSL_GUARDED_BY(mu_global_);

  std::list<ChannelData*> channels_;

  std::list<Listener> listeners_;
  size_t listeners_destroyed_ = 0;

  // The last time we printed a shutdown progress message.
  gpr_timespec last_shutdown_message_time_;
};


}  // namespace grpc_core

struct grpc_server {
  grpc_core::OrphanablePtr<grpc_core::Server> core_server;
};

// TODO(roth): Eventually, will need a way to modify configuration even after
// a connection is established (e.g., to change things like L7 rate
// limiting, RBAC, and fault injection configs).  One possible option
// would be to do something like ServiceConfig and ConfigSelector, but
// that might add unnecessary per-call overhead.  Need to consider other
// approaches here.
struct grpc_server_config_fetcher {
 public:
  class ConnectionManager : public grpc_core::RefCounted<ConnectionManager> {
   public:
    // Ownership of \a args is transferred.
    virtual absl::StatusOr<grpc_channel_args*> UpdateChannelArgsForConnection(
        grpc_channel_args* args, grpc_endpoint* tcp) = 0;
  };

  class WatcherInterface {
   public:
    virtual ~WatcherInterface() = default;
    // UpdateConnectionManager() is invoked by the config fetcher when a new
    // config is available. Implementations should update the connection manager
    // and start serving if not already serving.
    virtual void UpdateConnectionManager(
        grpc_core::RefCountedPtr<ConnectionManager> manager) = 0;
    // Implementations should stop serving when this is called. Serving should
    // only resume when UpdateConnectionManager() is invoked again.
    virtual void StopServing() = 0;
  };

  virtual ~grpc_server_config_fetcher() = default;

  // Ownership of \a args is transferred.
  virtual void StartWatch(std::string listening_address,
                          grpc_channel_args* args,
                          std::unique_ptr<WatcherInterface> watcher) = 0;
  virtual void CancelWatch(WatcherInterface* watcher) = 0;
  virtual grpc_pollset_set* interested_parties() = 0;
};
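
// Illustrative summary of the fetcher/watcher flow, as implied by the
// interfaces above (names in the snippet are hypothetical): a watcher is
// registered via StartWatch() for a listening address; whenever the fetcher
// obtains a new config it calls WatcherInterface::UpdateConnectionManager(),
// and the serving side then applies
// ConnectionManager::UpdateChannelArgsForConnection() to each newly accepted
// connection; StopServing() pauses serving until another
// UpdateConnectionManager() arrives, and CancelWatch() ends the subscription.
//
//   // Hypothetical wiring inside a listener implementation:
//   config_fetcher->StartWatch(listening_address, args,
//                              absl::make_unique<MyWatcher>(this));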

#endif /* GRPC_CORE_LIB_SURFACE_SERVER_H */