• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPCPP_SERVER_H
20 #define GRPCPP_SERVER_H
21 
22 #include <list>
23 #include <memory>
24 #include <vector>
25 
26 #include <grpc/impl/codegen/port_platform.h>
27 
28 #include <grpc/compression.h>
29 #include <grpc/support/atm.h>
30 #include <grpcpp/channel.h>
31 #include <grpcpp/completion_queue.h>
32 #include <grpcpp/health_check_service_interface.h>
33 #include <grpcpp/impl/call.h>
34 #include <grpcpp/impl/codegen/client_interceptor.h>
35 #include <grpcpp/impl/codegen/completion_queue.h>
36 #include <grpcpp/impl/codegen/grpc_library.h>
37 #include <grpcpp/impl/codegen/server_interface.h>
38 #include <grpcpp/impl/rpc_service_method.h>
39 #include <grpcpp/security/server_credentials.h>
40 #include <grpcpp/support/channel_arguments.h>
41 #include <grpcpp/support/config.h>
42 #include <grpcpp/support/status.h>
43 
44 struct grpc_server;
45 
46 namespace grpc {
47 class AsyncGenericService;
48 class ServerContext;
49 class ServerInitializer;
50 
51 namespace internal {
52 class ExternalConnectionAcceptorImpl;
53 }  // namespace internal
54 
55 /// Represents a gRPC server.
56 ///
57 /// Use a \a grpc::ServerBuilder to create, configure, and start
58 /// \a Server instances.
59 class Server : public ServerInterface, private GrpcLibraryCodegen {
60  public:
61   ~Server() ABSL_LOCKS_EXCLUDED(mu_) override;
62 
63   /// Block until the server shuts down.
64   ///
65   /// \warning The server must be either shutting down or some other thread must
66   /// call \a Shutdown for this function to ever return.
67   void Wait() ABSL_LOCKS_EXCLUDED(mu_) override;
68 
69   /// Global callbacks are a set of hooks that are called when server
70   /// events occur.  \a SetGlobalCallbacks method is used to register
71   /// the hooks with gRPC.  Note that
72   /// the \a GlobalCallbacks instance will be shared among all
73   /// \a Server instances in an application and can be set exactly
74   /// once per application.
75   class GlobalCallbacks {
76    public:
~GlobalCallbacks()77     virtual ~GlobalCallbacks() {}
78     /// Called before server is created.
UpdateArguments(ChannelArguments *)79     virtual void UpdateArguments(ChannelArguments* /*args*/) {}
80     /// Called before application callback for each synchronous server request
81     virtual void PreSynchronousRequest(ServerContext* context) = 0;
82     /// Called after application callback for each synchronous server request
83     virtual void PostSynchronousRequest(ServerContext* context) = 0;
84     /// Called before server is started.
PreServerStart(Server *)85     virtual void PreServerStart(Server* /*server*/) {}
86     /// Called after a server port is added.
AddPort(Server *,const std::string &,ServerCredentials *,int)87     virtual void AddPort(Server* /*server*/, const std::string& /*addr*/,
88                          ServerCredentials* /*creds*/, int /*port*/) {}
89   };
90   /// Set the global callback object. Can only be called once per application.
91   /// Does not take ownership of callbacks, and expects the pointed to object
92   /// to be alive until all server objects in the process have been destroyed.
93   /// The same \a GlobalCallbacks object will be used throughout the
94   /// application and is shared among all \a Server objects.
95   static void SetGlobalCallbacks(GlobalCallbacks* callbacks);
96 
97   /// Returns a \em raw pointer to the underlying \a grpc_server instance.
98   /// EXPERIMENTAL:  for internal/test use only
99   grpc_server* c_server();
100 
101   /// Returns the health check service.
GetHealthCheckService()102   HealthCheckServiceInterface* GetHealthCheckService() const {
103     return health_check_service_.get();
104   }
105 
106   /// Establish a channel for in-process communication
107   std::shared_ptr<Channel> InProcessChannel(const ChannelArguments& args);
108 
109   /// NOTE: class experimental_type is not part of the public API of this class.
110   /// TODO(yashykt): Integrate into public API when this is no longer
111   /// experimental.
112   class experimental_type {
113    public:
experimental_type(Server * server)114     explicit experimental_type(Server* server) : server_(server) {}
115 
116     /// Establish a channel for in-process communication with client
117     /// interceptors
118     std::shared_ptr<Channel> InProcessChannelWithInterceptors(
119         const ChannelArguments& args,
120         std::vector<
121             std::unique_ptr<experimental::ClientInterceptorFactoryInterface>>
122             interceptor_creators);
123 
124    private:
125     Server* server_;
126   };
127 
128   /// NOTE: The function experimental() is not stable public API. It is a view
129   /// to the experimental components of this class. It may be changed or removed
130   /// at any time.
experimental()131   experimental_type experimental() { return experimental_type(this); }
132 
133  protected:
134   /// Register a service. This call does not take ownership of the service.
135   /// The service must exist for the lifetime of the Server instance.
136   bool RegisterService(const std::string* addr, Service* service) override;
137 
138   /// Try binding the server to the given \a addr endpoint
139   /// (port, and optionally including IP address to bind to).
140   ///
141   /// It can be invoked multiple times. Should be used before
142   /// starting the server.
143   ///
144   /// \param addr The address to try to bind to the server (eg, localhost:1234,
145   /// 192.168.1.1:31416, [::1]:27182, etc.).
146   /// \param creds The credentials associated with the server.
147   ///
148   /// \return bound port number on success, 0 on failure.
149   ///
150   /// \warning It is an error to call this method on an already started server.
151   int AddListeningPort(const std::string& addr,
152                        ServerCredentials* creds) override;
153 
154   /// NOTE: This is *NOT* a public API. The server constructors are supposed to
155   /// be used by \a ServerBuilder class only. The constructor will be made
156   /// 'private' very soon.
157   ///
158   /// Server constructors. To be used by \a ServerBuilder only.
159   ///
160   /// \param args The channel args
161   ///
162   /// \param sync_server_cqs The completion queues to use if the server is a
163   /// synchronous server (or a hybrid server). The server polls for new RPCs on
164   /// these queues
165   ///
166   /// \param min_pollers The minimum number of polling threads per server
167   /// completion queue (in param sync_server_cqs) to use for listening to
168   /// incoming requests (used only in case of sync server)
169   ///
170   /// \param max_pollers The maximum number of polling threads per server
171   /// completion queue (in param sync_server_cqs) to use for listening to
172   /// incoming requests (used only in case of sync server)
173   ///
174   /// \param sync_cq_timeout_msec The timeout to use when calling AsyncNext() on
175   /// server completion queues passed via sync_server_cqs param.
176   Server(ChannelArguments* args,
177          std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
178              sync_server_cqs,
179          int min_pollers, int max_pollers, int sync_cq_timeout_msec,
180          std::vector<std::shared_ptr<internal::ExternalConnectionAcceptorImpl>>
181              acceptors,
182          grpc_server_config_fetcher* server_config_fetcher = nullptr,
183          grpc_resource_quota* server_rq = nullptr,
184          std::vector<
185              std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
186              interceptor_creators = std::vector<std::unique_ptr<
187                  experimental::ServerInterceptorFactoryInterface>>());
188 
189   /// Start the server.
190   ///
191   /// \param cqs Completion queues for handling asynchronous services. The
192   /// caller is required to keep all completion queues live until the server is
193   /// destroyed.
194   /// \param num_cqs How many completion queues does \a cqs hold.
195   void Start(ServerCompletionQueue** cqs, size_t num_cqs) override;
196 
server()197   grpc_server* server() override { return server_; }
198 
199  protected:
200   /// NOTE: This method is not part of the public API for this class.
set_health_check_service(std::unique_ptr<HealthCheckServiceInterface> service)201   void set_health_check_service(
202       std::unique_ptr<HealthCheckServiceInterface> service) {
203     health_check_service_ = std::move(service);
204   }
205 
context_allocator()206   ContextAllocator* context_allocator() { return context_allocator_.get(); }
207 
208   /// NOTE: This method is not part of the public API for this class.
health_check_service_disabled()209   bool health_check_service_disabled() const {
210     return health_check_service_disabled_;
211   }
212 
213  private:
214   std::vector<std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>*
interceptor_creators()215   interceptor_creators() override {
216     return &interceptor_creators_;
217   }
218 
219   friend class AsyncGenericService;
220   friend class ServerBuilder;
221   friend class ServerInitializer;
222 
223   class SyncRequest;
224   class CallbackRequestBase;
225   template <class ServerContextType>
226   class CallbackRequest;
227   class UnimplementedAsyncRequest;
228   class UnimplementedAsyncResponse;
229 
230   /// SyncRequestThreadManager is an implementation of ThreadManager. This class
231   /// is responsible for polling for incoming RPCs and calling the RPC handlers.
232   /// This is only used in case of a Sync server (i.e a server exposing a sync
233   /// interface)
234   class SyncRequestThreadManager;
235 
236   /// Register a generic service. This call does not take ownership of the
237   /// service. The service must exist for the lifetime of the Server instance.
238   void RegisterAsyncGenericService(AsyncGenericService* service) override;
239 
240 #ifdef GRPC_CALLBACK_API_NONEXPERIMENTAL
241   /// Register a callback-based generic service. This call does not take
242   /// ownership of theservice. The service must exist for the lifetime of the
243   /// Server instance.
244   void RegisterCallbackGenericService(CallbackGenericService* service) override;
245 
RegisterContextAllocator(std::unique_ptr<ContextAllocator> context_allocator)246   void RegisterContextAllocator(
247       std::unique_ptr<ContextAllocator> context_allocator) {
248     context_allocator_ = std::move(context_allocator);
249   }
250 
251 #else
252   /// NOTE: class experimental_registration_type is not part of the public API
253   /// of this class
254   /// TODO(vjpai): Move these contents to the public API of Server when
255   ///              they are no longer experimental
256   class experimental_registration_type final
257       : public experimental_registration_interface {
258    public:
experimental_registration_type(Server * server)259     explicit experimental_registration_type(Server* server) : server_(server) {}
RegisterCallbackGenericService(experimental::CallbackGenericService * service)260     void RegisterCallbackGenericService(
261         experimental::CallbackGenericService* service) override {
262       server_->RegisterCallbackGenericService(service);
263     }
264 
RegisterContextAllocator(std::unique_ptr<ContextAllocator> context_allocator)265     void RegisterContextAllocator(
266         std::unique_ptr<ContextAllocator> context_allocator) override {
267       server_->context_allocator_ = std::move(context_allocator);
268     }
269 
270    private:
271     Server* server_;
272   };
273 
274   /// TODO(vjpai): Mark this override when experimental type above is deleted
275   void RegisterCallbackGenericService(
276       experimental::CallbackGenericService* service);
277 
278   /// NOTE: The function experimental_registration() is not stable public API.
279   /// It is a view to the experimental components of this class. It may be
280   /// changed or removed at any time.
experimental_registration()281   experimental_registration_interface* experimental_registration() override {
282     return &experimental_registration_;
283   }
284 #endif
285 
286   void PerformOpsOnCall(internal::CallOpSetInterface* ops,
287                         internal::Call* call) override;
288 
289   void ShutdownInternal(gpr_timespec deadline)
290       ABSL_LOCKS_EXCLUDED(mu_) override;
291 
max_receive_message_size()292   int max_receive_message_size() const override {
293     return max_receive_message_size_;
294   }
295 
296   CompletionQueue* CallbackCQ() ABSL_LOCKS_EXCLUDED(mu_) override;
297 
298   ServerInitializer* initializer();
299 
300   // Functions to manage the server shutdown ref count. Things that increase
301   // the ref count are the running state of the server (take a ref at start and
302   // drop it at shutdown) and each running callback RPC.
303   void Ref();
304   void UnrefWithPossibleNotify() ABSL_LOCKS_EXCLUDED(mu_);
305   void UnrefAndWaitLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
306 
307   std::vector<std::shared_ptr<internal::ExternalConnectionAcceptorImpl>>
308       acceptors_;
309 
310   // A vector of interceptor factory objects.
311   // This should be destroyed after health_check_service_ and this requirement
312   // is satisfied by declaring interceptor_creators_ before
313   // health_check_service_. (C++ mandates that member objects be destroyed in
314   // the reverse order of initialization.)
315   std::vector<std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
316       interceptor_creators_;
317 
318   int max_receive_message_size_;
319 
320   /// The following completion queues are ONLY used in case of Sync API
321   /// i.e. if the server has any services with sync methods. The server uses
322   /// these completion queues to poll for new RPCs
323   std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
324       sync_server_cqs_;
325 
326   /// List of \a ThreadManager instances (one for each cq in
327   /// the \a sync_server_cqs)
328   std::vector<std::unique_ptr<SyncRequestThreadManager>> sync_req_mgrs_;
329 
330 #ifndef GRPC_CALLBACK_API_NONEXPERIMENTAL
331   // For registering experimental callback generic service; remove when that
332   // method longer experimental
333   experimental_registration_type experimental_registration_{this};
334 #endif
335 
336   // Server status
337   internal::Mutex mu_;
338   bool started_;
339   bool shutdown_ ABSL_GUARDED_BY(mu_);
340   bool shutdown_notified_
341       ABSL_GUARDED_BY(mu_);  // Was notify called on the shutdown_cv_
342   internal::CondVar shutdown_done_cv_;
343   bool shutdown_done_ ABSL_GUARDED_BY(mu_) = false;
344   std::atomic_int shutdown_refs_outstanding_{1};
345 
346   internal::CondVar shutdown_cv_;
347 
348   std::shared_ptr<GlobalCallbacks> global_callbacks_;
349 
350   std::vector<std::string> services_;
351   bool has_async_generic_service_ = false;
352   bool has_callback_generic_service_ = false;
353   bool has_callback_methods_ = false;
354 
355   // Pointer to the wrapped grpc_server.
356   grpc_server* server_;
357 
358   std::unique_ptr<ServerInitializer> server_initializer_;
359 
360   std::unique_ptr<ContextAllocator> context_allocator_;
361 
362   std::unique_ptr<HealthCheckServiceInterface> health_check_service_;
363   bool health_check_service_disabled_;
364 
365   // When appropriate, use a default callback generic service to handle
366   // unimplemented methods
367 #ifdef GRPC_CALLBACK_API_NONEXPERIMENTAL
368   std::unique_ptr<CallbackGenericService> unimplemented_service_;
369 #else
370   std::unique_ptr<experimental::CallbackGenericService> unimplemented_service_;
371 #endif
372 
373   // A special handler for resource exhausted in sync case
374   std::unique_ptr<internal::MethodHandler> resource_exhausted_handler_;
375 
376   // Handler for callback generic service, if any
377   std::unique_ptr<internal::MethodHandler> generic_handler_;
378 
379   // callback_cq_ references the callbackable completion queue associated
380   // with this server (if any). It is set on the first call to CallbackCQ().
381   // It is _not owned_ by the server; ownership belongs with its internal
382   // shutdown callback tag (invoked when the CQ is fully shutdown).
383   std::atomic<CompletionQueue*> callback_cq_{nullptr};
384 
385   // List of CQs passed in by user that must be Shutdown only after Server is
386   // Shutdown.  Even though this is only used with NDEBUG, instantiate it in all
387   // cases since otherwise the size will be inconsistent.
388   std::vector<CompletionQueue*> cq_list_;
389 };
390 
391 }  // namespace grpc
392 
393 #endif  // GRPCPP_SERVER_H
394