• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2015 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #ifndef GRPC_CORE_LIB_SURFACE_SERVER_H
18 #define GRPC_CORE_LIB_SURFACE_SERVER_H
19 
20 #include <grpc/support/port_platform.h>
21 
22 #include <list>
23 #include <vector>
24 
25 #include "absl/types/optional.h"
26 
27 #include <grpc/grpc.h>
28 
29 #include "src/core/lib/channel/channel_args.h"
30 #include "src/core/lib/channel/channel_stack.h"
31 #include "src/core/lib/channel/channelz.h"
32 #include "src/core/lib/debug/trace.h"
33 #include "src/core/lib/gprpp/atomic.h"
34 #include "src/core/lib/surface/completion_queue.h"
35 #include "src/core/lib/transport/transport.h"
36 
37 namespace grpc_core {
38 
39 extern TraceFlag grpc_server_channel_trace;
40 
41 class Server : public InternallyRefCounted<Server> {
42  public:
43   // Filter vtable.
44   static const grpc_channel_filter kServerTopFilter;
45 
46   // Opaque type used for registered methods.
47   struct RegisteredMethod;
48 
49   // An object to represent the most relevant characteristics of a
50   // newly-allocated call object when using an AllocatingRequestMatcherBatch.
51   struct BatchCallAllocation {
52     grpc_experimental_completion_queue_functor* tag;
53     grpc_call** call;
54     grpc_metadata_array* initial_metadata;
55     grpc_call_details* details;
56   };
57 
58   // An object to represent the most relevant characteristics of a
59   // newly-allocated call object when using an
60   // AllocatingRequestMatcherRegistered.
61   struct RegisteredCallAllocation {
62     grpc_experimental_completion_queue_functor* tag;
63     grpc_call** call;
64     grpc_metadata_array* initial_metadata;
65     gpr_timespec* deadline;
66     grpc_byte_buffer** optional_payload;
67   };
68 
69   /// Interface for listeners.
70   /// Implementations must override the Orphan() method, which should stop
71   /// listening and initiate destruction of the listener.
72   class ListenerInterface : public Orphanable {
73    public:
74     ~ListenerInterface() override = default;
75 
76     /// Starts listening. This listener may refer to the pollset object beyond
77     /// this call, so it is a pointer rather than a reference.
78     virtual void Start(Server* server,
79                        const std::vector<grpc_pollset*>* pollsets) = 0;
80 
81     /// Returns the channelz node for the listen socket, or null if not
82     /// supported.
83     virtual channelz::ListenSocketNode* channelz_listen_socket_node() const = 0;
84 
85     /// Sets a closure to be invoked by the listener when its destruction
86     /// is complete.
87     virtual void SetOnDestroyDone(grpc_closure* on_destroy_done) = 0;
88   };
89 
90   explicit Server(const grpc_channel_args* args);
91   ~Server() override;
92 
93   void Orphan() override;
94 
channel_args()95   const grpc_channel_args* channel_args() const { return channel_args_; }
default_resource_user()96   grpc_resource_user* default_resource_user() const {
97     return default_resource_user_;
98   }
channelz_node()99   channelz::ServerNode* channelz_node() const { return channelz_node_.get(); }
100 
101   // Do not call this before Start(). Returns the pollsets. The
102   // vector itself is immutable, but the pollsets inside are mutable. The
103   // result is valid for the lifetime of the server.
pollsets()104   const std::vector<grpc_pollset*>& pollsets() const { return pollsets_; }
105 
config_fetcher()106   grpc_server_config_fetcher* config_fetcher() const {
107     return config_fetcher_.get();
108   }
109 
set_config_fetcher(std::unique_ptr<grpc_server_config_fetcher> config_fetcher)110   void set_config_fetcher(
111       std::unique_ptr<grpc_server_config_fetcher> config_fetcher) {
112     config_fetcher_ = std::move(config_fetcher);
113   }
114 
115   bool HasOpenConnections();
116 
117   // Adds a listener to the server.  When the server starts, it will call
118   // the listener's Start() method, and when it shuts down, it will orphan
119   // the listener.
120   void AddListener(OrphanablePtr<ListenerInterface> listener);
121 
122   // Starts listening for connections.
123   void Start();
124 
125   // Sets up a transport.  Creates a channel stack and binds the transport to
126   // the server.  Called from the listener when a new connection is accepted.
127   grpc_error* SetupTransport(
128       grpc_transport* transport, grpc_pollset* accepting_pollset,
129       const grpc_channel_args* args,
130       const RefCountedPtr<channelz::SocketNode>& socket_node,
131       grpc_resource_user* resource_user = nullptr);
132 
133   void RegisterCompletionQueue(grpc_completion_queue* cq);
134 
135   // Functions to specify that a specific registered method or the unregistered
136   // collection should use a specific allocator for request matching.
137   void SetRegisteredMethodAllocator(
138       grpc_completion_queue* cq, void* method_tag,
139       std::function<RegisteredCallAllocation()> allocator);
140   void SetBatchMethodAllocator(grpc_completion_queue* cq,
141                                std::function<BatchCallAllocation()> allocator);
142 
143   RegisteredMethod* RegisterMethod(
144       const char* method, const char* host,
145       grpc_server_register_method_payload_handling payload_handling,
146       uint32_t flags);
147 
148   grpc_call_error RequestCall(grpc_call** call, grpc_call_details* details,
149                               grpc_metadata_array* request_metadata,
150                               grpc_completion_queue* cq_bound_to_call,
151                               grpc_completion_queue* cq_for_notification,
152                               void* tag);
153 
154   grpc_call_error RequestRegisteredCall(
155       RegisteredMethod* rm, grpc_call** call, gpr_timespec* deadline,
156       grpc_metadata_array* request_metadata,
157       grpc_byte_buffer** optional_payload,
158       grpc_completion_queue* cq_bound_to_call,
159       grpc_completion_queue* cq_for_notification, void* tag_new);
160 
161   void ShutdownAndNotify(grpc_completion_queue* cq, void* tag);
162 
163   void CancelAllCalls();
164 
165  private:
166   struct RequestedCall;
167 
168   struct ChannelRegisteredMethod {
169     RegisteredMethod* server_registered_method = nullptr;
170     uint32_t flags;
171     bool has_host;
172     ExternallyManagedSlice method;
173     ExternallyManagedSlice host;
174   };
175 
176   class RequestMatcherInterface;
177   class RealRequestMatcher;
178   class AllocatingRequestMatcherBase;
179   class AllocatingRequestMatcherBatch;
180   class AllocatingRequestMatcherRegistered;
181 
182   class ChannelData {
183    public:
184     ChannelData() = default;
185     ~ChannelData();
186 
187     void InitTransport(RefCountedPtr<Server> server, grpc_channel* channel,
188                        size_t cq_idx, grpc_transport* transport,
189                        intptr_t channelz_socket_uuid);
190 
server()191     RefCountedPtr<Server> server() const { return server_; }
channel()192     grpc_channel* channel() const { return channel_; }
cq_idx()193     size_t cq_idx() const { return cq_idx_; }
194 
195     ChannelRegisteredMethod* GetRegisteredMethod(const grpc_slice& host,
196                                                  const grpc_slice& path,
197                                                  bool is_idempotent);
198 
199     // Filter vtable functions.
200     static grpc_error* InitChannelElement(grpc_channel_element* elem,
201                                           grpc_channel_element_args* args);
202     static void DestroyChannelElement(grpc_channel_element* elem);
203 
204    private:
205     class ConnectivityWatcher;
206 
207     static void AcceptStream(void* arg, grpc_transport* /*transport*/,
208                              const void* transport_server_data);
209 
210     void Destroy();
211 
212     static void FinishDestroy(void* arg, grpc_error* error);
213 
214     RefCountedPtr<Server> server_;
215     grpc_channel* channel_;
216     // The index into Server::cqs_ of the CQ used as a starting point for
217     // where to publish new incoming calls.
218     size_t cq_idx_;
219     absl::optional<std::list<ChannelData*>::iterator> list_position_;
220     // A hash-table of the methods and hosts of the registered methods.
221     // TODO(vjpai): Convert this to an STL map type as opposed to a direct
222     // bucket implementation. (Consider performance impact, hash function to
223     // use, etc.)
224     std::unique_ptr<std::vector<ChannelRegisteredMethod>> registered_methods_;
225     uint32_t registered_method_max_probes_;
226     grpc_closure finish_destroy_channel_closure_;
227     intptr_t channelz_socket_uuid_;
228   };
229 
230   class CallData {
231    public:
232     enum class CallState {
233       NOT_STARTED,  // Waiting for metadata.
234       PENDING,      // Initial metadata read, not flow controlled in yet.
235       ACTIVATED,    // Flow controlled in, on completion queue.
236       ZOMBIED,      // Cancelled before being queued.
237     };
238 
239     CallData(grpc_call_element* elem, const grpc_call_element_args& args,
240              RefCountedPtr<Server> server);
241     ~CallData();
242 
243     // Starts the recv_initial_metadata batch on the call.
244     // Invoked from ChannelData::AcceptStream().
245     void Start(grpc_call_element* elem);
246 
247     void SetState(CallState state);
248 
249     // Attempts to move from PENDING to ACTIVATED state.  Returns true
250     // on success.
251     bool MaybeActivate();
252 
253     // Publishes an incoming call to the application after it has been
254     // matched.
255     void Publish(size_t cq_idx, RequestedCall* rc);
256 
257     void KillZombie();
258 
259     void FailCallCreation();
260 
261     // Filter vtable functions.
262     static grpc_error* InitCallElement(grpc_call_element* elem,
263                                        const grpc_call_element_args* args);
264     static void DestroyCallElement(grpc_call_element* elem,
265                                    const grpc_call_final_info* /*final_info*/,
266                                    grpc_closure* /*ignored*/);
267     static void StartTransportStreamOpBatch(
268         grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
269 
270    private:
271     // Helper functions for handling calls at the top of the call stack.
272     static void RecvInitialMetadataBatchComplete(void* arg, grpc_error* error);
273     void StartNewRpc(grpc_call_element* elem);
274     static void PublishNewRpc(void* arg, grpc_error* error);
275 
276     // Functions used inside the call stack.
277     void StartTransportStreamOpBatchImpl(grpc_call_element* elem,
278                                          grpc_transport_stream_op_batch* batch);
279     static void RecvInitialMetadataReady(void* arg, grpc_error* error);
280     static void RecvTrailingMetadataReady(void* arg, grpc_error* error);
281 
282     RefCountedPtr<Server> server_;
283 
284     grpc_call* call_;
285 
286     Atomic<CallState> state_{CallState::NOT_STARTED};
287 
288     absl::optional<grpc_slice> path_;
289     absl::optional<grpc_slice> host_;
290     grpc_millis deadline_ = GRPC_MILLIS_INF_FUTURE;
291 
292     grpc_completion_queue* cq_new_ = nullptr;
293 
294     RequestMatcherInterface* matcher_ = nullptr;
295     grpc_byte_buffer* payload_ = nullptr;
296 
297     grpc_closure kill_zombie_closure_;
298 
299     grpc_metadata_array initial_metadata_ =
300         grpc_metadata_array();  // Zero-initialize the C struct.
301     grpc_closure recv_initial_metadata_batch_complete_;
302 
303     grpc_metadata_batch* recv_initial_metadata_ = nullptr;
304     uint32_t recv_initial_metadata_flags_ = 0;
305     grpc_closure recv_initial_metadata_ready_;
306     grpc_closure* original_recv_initial_metadata_ready_;
307     grpc_error* recv_initial_metadata_error_ = GRPC_ERROR_NONE;
308 
309     bool seen_recv_trailing_metadata_ready_ = false;
310     grpc_closure recv_trailing_metadata_ready_;
311     grpc_closure* original_recv_trailing_metadata_ready_;
312     grpc_error* recv_trailing_metadata_error_ = GRPC_ERROR_NONE;
313 
314     grpc_closure publish_;
315 
316     CallCombiner* call_combiner_;
317   };
318 
319   struct Listener {
ListenerListener320     explicit Listener(OrphanablePtr<ListenerInterface> l)
321         : listener(std::move(l)) {}
322     OrphanablePtr<ListenerInterface> listener;
323     grpc_closure destroy_done;
324   };
325 
326   struct ShutdownTag {
ShutdownTagShutdownTag327     ShutdownTag(void* tag_arg, grpc_completion_queue* cq_arg)
328         : tag(tag_arg), cq(cq_arg) {}
329     void* const tag;
330     grpc_completion_queue* const cq;
331     grpc_cq_completion completion;
332   };
333 
334   static void ListenerDestroyDone(void* arg, grpc_error* error);
335 
DoneShutdownEvent(void * server,grpc_cq_completion *)336   static void DoneShutdownEvent(void* server,
337                                 grpc_cq_completion* /*completion*/) {
338     static_cast<Server*>(server)->Unref();
339   }
340 
341   static void DoneRequestEvent(void* req, grpc_cq_completion* completion);
342 
343   void FailCall(size_t cq_idx, RequestedCall* rc, grpc_error* error);
344   grpc_call_error QueueRequestedCall(size_t cq_idx, RequestedCall* rc);
345 
346   void MaybeFinishShutdown();
347 
348   void KillPendingWorkLocked(grpc_error* error);
349 
350   static grpc_call_error ValidateServerRequest(
351       grpc_completion_queue* cq_for_notification, void* tag,
352       grpc_byte_buffer** optional_payload, RegisteredMethod* rm);
353   grpc_call_error ValidateServerRequestAndCq(
354       size_t* cq_idx, grpc_completion_queue* cq_for_notification, void* tag,
355       grpc_byte_buffer** optional_payload, RegisteredMethod* rm);
356 
357   std::vector<grpc_channel*> GetChannelsLocked() const;
358 
359   grpc_channel_args* const channel_args_;
360   grpc_resource_user* default_resource_user_ = nullptr;
361   RefCountedPtr<channelz::ServerNode> channelz_node_;
362   std::unique_ptr<grpc_server_config_fetcher> config_fetcher_;
363 
364   std::vector<grpc_completion_queue*> cqs_;
365   std::vector<grpc_pollset*> pollsets_;
366   bool started_ = false;
367 
368   // The two following mutexes control access to server-state.
369   // mu_global_ controls access to non-call-related state (e.g., channel state).
370   // mu_call_ controls access to call-related state (e.g., the call lists).
371   //
372   // If they are ever required to be nested, you must lock mu_global_
373   // before mu_call_. This is currently used in shutdown processing
374   // (ShutdownAndNotify() and MaybeFinishShutdown()).
375   Mutex mu_global_;  // mutex for server and channel state
376   Mutex mu_call_;    // mutex for call-specific state
377 
378   // startup synchronization: flag is protected by mu_global_, signals whether
379   // we are doing the listener start routine or not.
380   bool starting_ = false;
381   CondVar starting_cv_;
382 
383   std::vector<std::unique_ptr<RegisteredMethod>> registered_methods_;
384 
385   // Request matcher for unregistered methods.
386   std::unique_ptr<RequestMatcherInterface> unregistered_request_matcher_;
387 
388   std::atomic_bool shutdown_flag_{false};
389   bool shutdown_published_ = false;
390   std::vector<ShutdownTag> shutdown_tags_;
391 
392   std::list<ChannelData*> channels_;
393 
394   std::list<Listener> listeners_;
395   size_t listeners_destroyed_ = 0;
396 
397   // The last time we printed a shutdown progress message.
398   gpr_timespec last_shutdown_message_time_;
399 };
400 
401 }  // namespace grpc_core
402 
403 struct grpc_server {
404   grpc_core::OrphanablePtr<grpc_core::Server> core_server;
405 };
406 
407 // TODO(roth): Eventually, will need a way to modify configuration even after
408 // a connection is established (e.g., to change things like L7 rate
409 // limiting, RBAC, and fault injection configs).  One possible option
410 // would be to do something like ServiceConfig and ConfigSelector, but
411 // that might add unnecessary per-call overhead.  Need to consider other
412 // approaches here.
413 struct grpc_server_config_fetcher {
414  public:
415   class WatcherInterface {
416    public:
417     virtual ~WatcherInterface() = default;
418     // Ownership of \a args is transferred.
419     virtual void UpdateConfig(grpc_channel_args* args) = 0;
420   };
421 
422   virtual ~grpc_server_config_fetcher() = default;
423 
424   // Ownership of \a args is transferred.
425   virtual void StartWatch(std::string listening_address,
426                           grpc_channel_args* args,
427                           std::unique_ptr<WatcherInterface> watcher) = 0;
428   virtual void CancelWatch(WatcherInterface* watcher) = 0;
429   virtual grpc_pollset_set* interested_parties() = 0;
430 };
431 
432 #endif /* GRPC_CORE_LIB_SURFACE_SERVER_H */
433