1 /* 2 * 3 * Copyright 2015 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #ifndef GRPCPP_SERVER_H 20 #define GRPCPP_SERVER_H 21 22 #include <list> 23 #include <memory> 24 #include <vector> 25 26 #include <grpc/impl/codegen/port_platform.h> 27 28 #include <grpc/compression.h> 29 #include <grpc/support/atm.h> 30 #include <grpcpp/channel.h> 31 #include <grpcpp/completion_queue.h> 32 #include <grpcpp/health_check_service_interface.h> 33 #include <grpcpp/impl/call.h> 34 #include <grpcpp/impl/codegen/client_interceptor.h> 35 #include <grpcpp/impl/codegen/completion_queue.h> 36 #include <grpcpp/impl/codegen/grpc_library.h> 37 #include <grpcpp/impl/codegen/server_interface.h> 38 #include <grpcpp/impl/rpc_service_method.h> 39 #include <grpcpp/security/server_credentials.h> 40 #include <grpcpp/support/channel_arguments.h> 41 #include <grpcpp/support/config.h> 42 #include <grpcpp/support/status.h> 43 44 struct grpc_server; 45 46 namespace grpc { 47 class AsyncGenericService; 48 class ServerContext; 49 class ServerInitializer; 50 51 namespace internal { 52 class ExternalConnectionAcceptorImpl; 53 } // namespace internal 54 55 /// Represents a gRPC server. 56 /// 57 /// Use a \a grpc::ServerBuilder to create, configure, and start 58 /// \a Server instances. 59 class Server : public ServerInterface, private GrpcLibraryCodegen { 60 public: 61 ~Server() override; 62 63 /// Block until the server shuts down. 64 /// 65 /// \warning The server must be either shutting down or some other thread must 66 /// call \a Shutdown for this function to ever return. 67 void Wait() override; 68 69 /// Global callbacks are a set of hooks that are called when server 70 /// events occur. \a SetGlobalCallbacks method is used to register 71 /// the hooks with gRPC. Note that 72 /// the \a GlobalCallbacks instance will be shared among all 73 /// \a Server instances in an application and can be set exactly 74 /// once per application. 75 class GlobalCallbacks { 76 public: ~GlobalCallbacks()77 virtual ~GlobalCallbacks() {} 78 /// Called before server is created. UpdateArguments(ChannelArguments *)79 virtual void UpdateArguments(ChannelArguments* /*args*/) {} 80 /// Called before application callback for each synchronous server request 81 virtual void PreSynchronousRequest(ServerContext* context) = 0; 82 /// Called after application callback for each synchronous server request 83 virtual void PostSynchronousRequest(ServerContext* context) = 0; 84 /// Called before server is started. PreServerStart(Server *)85 virtual void PreServerStart(Server* /*server*/) {} 86 /// Called after a server port is added. AddPort(Server *,const std::string &,ServerCredentials *,int)87 virtual void AddPort(Server* /*server*/, const std::string& /*addr*/, 88 ServerCredentials* /*creds*/, int /*port*/) {} 89 }; 90 /// Set the global callback object. Can only be called once per application. 91 /// Does not take ownership of callbacks, and expects the pointed to object 92 /// to be alive until all server objects in the process have been destroyed. 93 /// The same \a GlobalCallbacks object will be used throughout the 94 /// application and is shared among all \a Server objects. 95 static void SetGlobalCallbacks(GlobalCallbacks* callbacks); 96 97 /// Returns a \em raw pointer to the underlying \a grpc_server instance. 98 /// EXPERIMENTAL: for internal/test use only 99 grpc_server* c_server(); 100 101 /// Returns the health check service. GetHealthCheckService()102 HealthCheckServiceInterface* GetHealthCheckService() const { 103 return health_check_service_.get(); 104 } 105 106 /// Establish a channel for in-process communication 107 std::shared_ptr<Channel> InProcessChannel(const ChannelArguments& args); 108 109 /// NOTE: class experimental_type is not part of the public API of this class. 110 /// TODO(yashykt): Integrate into public API when this is no longer 111 /// experimental. 112 class experimental_type { 113 public: experimental_type(Server * server)114 explicit experimental_type(Server* server) : server_(server) {} 115 116 /// Establish a channel for in-process communication with client 117 /// interceptors 118 std::shared_ptr<Channel> InProcessChannelWithInterceptors( 119 const ChannelArguments& args, 120 std::vector< 121 std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> 122 interceptor_creators); 123 124 private: 125 Server* server_; 126 }; 127 128 /// NOTE: The function experimental() is not stable public API. It is a view 129 /// to the experimental components of this class. It may be changed or removed 130 /// at any time. experimental()131 experimental_type experimental() { return experimental_type(this); } 132 133 protected: 134 /// Register a service. This call does not take ownership of the service. 135 /// The service must exist for the lifetime of the Server instance. 136 bool RegisterService(const std::string* addr, Service* service) override; 137 138 /// Try binding the server to the given \a addr endpoint 139 /// (port, and optionally including IP address to bind to). 140 /// 141 /// It can be invoked multiple times. Should be used before 142 /// starting the server. 143 /// 144 /// \param addr The address to try to bind to the server (eg, localhost:1234, 145 /// 192.168.1.1:31416, [::1]:27182, etc.). 146 /// \param creds The credentials associated with the server. 147 /// 148 /// \return bound port number on success, 0 on failure. 149 /// 150 /// \warning It is an error to call this method on an already started server. 151 int AddListeningPort(const std::string& addr, 152 ServerCredentials* creds) override; 153 154 /// NOTE: This is *NOT* a public API. The server constructors are supposed to 155 /// be used by \a ServerBuilder class only. The constructor will be made 156 /// 'private' very soon. 157 /// 158 /// Server constructors. To be used by \a ServerBuilder only. 159 /// 160 /// \param args The channel args 161 /// 162 /// \param sync_server_cqs The completion queues to use if the server is a 163 /// synchronous server (or a hybrid server). The server polls for new RPCs on 164 /// these queues 165 /// 166 /// \param min_pollers The minimum number of polling threads per server 167 /// completion queue (in param sync_server_cqs) to use for listening to 168 /// incoming requests (used only in case of sync server) 169 /// 170 /// \param max_pollers The maximum number of polling threads per server 171 /// completion queue (in param sync_server_cqs) to use for listening to 172 /// incoming requests (used only in case of sync server) 173 /// 174 /// \param sync_cq_timeout_msec The timeout to use when calling AsyncNext() on 175 /// server completion queues passed via sync_server_cqs param. 176 Server(ChannelArguments* args, 177 std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>> 178 sync_server_cqs, 179 int min_pollers, int max_pollers, int sync_cq_timeout_msec, 180 std::vector<std::shared_ptr<internal::ExternalConnectionAcceptorImpl>> 181 acceptors, 182 grpc_server_config_fetcher* server_config_fetcher = nullptr, 183 grpc_resource_quota* server_rq = nullptr, 184 std::vector< 185 std::unique_ptr<experimental::ServerInterceptorFactoryInterface>> 186 interceptor_creators = std::vector<std::unique_ptr< 187 experimental::ServerInterceptorFactoryInterface>>()); 188 189 /// Start the server. 190 /// 191 /// \param cqs Completion queues for handling asynchronous services. The 192 /// caller is required to keep all completion queues live until the server is 193 /// destroyed. 194 /// \param num_cqs How many completion queues does \a cqs hold. 195 void Start(ServerCompletionQueue** cqs, size_t num_cqs) override; 196 server()197 grpc_server* server() override { return server_; } 198 199 protected: 200 /// NOTE: This method is not part of the public API for this class. set_health_check_service(std::unique_ptr<HealthCheckServiceInterface> service)201 void set_health_check_service( 202 std::unique_ptr<HealthCheckServiceInterface> service) { 203 health_check_service_ = std::move(service); 204 } 205 context_allocator()206 ContextAllocator* context_allocator() { return context_allocator_.get(); } 207 208 /// NOTE: This method is not part of the public API for this class. health_check_service_disabled()209 bool health_check_service_disabled() const { 210 return health_check_service_disabled_; 211 } 212 213 private: 214 std::vector<std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>* interceptor_creators()215 interceptor_creators() override { 216 return &interceptor_creators_; 217 } 218 219 friend class AsyncGenericService; 220 friend class ServerBuilder; 221 friend class ServerInitializer; 222 223 class SyncRequest; 224 class CallbackRequestBase; 225 template <class ServerContextType> 226 class CallbackRequest; 227 class UnimplementedAsyncRequest; 228 class UnimplementedAsyncResponse; 229 230 /// SyncRequestThreadManager is an implementation of ThreadManager. This class 231 /// is responsible for polling for incoming RPCs and calling the RPC handlers. 232 /// This is only used in case of a Sync server (i.e a server exposing a sync 233 /// interface) 234 class SyncRequestThreadManager; 235 236 /// Register a generic service. This call does not take ownership of the 237 /// service. The service must exist for the lifetime of the Server instance. 238 void RegisterAsyncGenericService(AsyncGenericService* service) override; 239 240 #ifdef GRPC_CALLBACK_API_NONEXPERIMENTAL 241 /// Register a callback-based generic service. This call does not take 242 /// ownership of theservice. The service must exist for the lifetime of the 243 /// Server instance. 244 void RegisterCallbackGenericService(CallbackGenericService* service) override; 245 RegisterContextAllocator(std::unique_ptr<ContextAllocator> context_allocator)246 void RegisterContextAllocator( 247 std::unique_ptr<ContextAllocator> context_allocator) { 248 context_allocator_ = std::move(context_allocator); 249 } 250 251 #else 252 /// NOTE: class experimental_registration_type is not part of the public API 253 /// of this class 254 /// TODO(vjpai): Move these contents to the public API of Server when 255 /// they are no longer experimental 256 class experimental_registration_type final 257 : public experimental_registration_interface { 258 public: experimental_registration_type(Server * server)259 explicit experimental_registration_type(Server* server) : server_(server) {} RegisterCallbackGenericService(experimental::CallbackGenericService * service)260 void RegisterCallbackGenericService( 261 experimental::CallbackGenericService* service) override { 262 server_->RegisterCallbackGenericService(service); 263 } 264 RegisterContextAllocator(std::unique_ptr<ContextAllocator> context_allocator)265 void RegisterContextAllocator( 266 std::unique_ptr<ContextAllocator> context_allocator) override { 267 server_->context_allocator_ = std::move(context_allocator); 268 } 269 270 private: 271 Server* server_; 272 }; 273 274 /// TODO(vjpai): Mark this override when experimental type above is deleted 275 void RegisterCallbackGenericService( 276 experimental::CallbackGenericService* service); 277 278 /// NOTE: The function experimental_registration() is not stable public API. 279 /// It is a view to the experimental components of this class. It may be 280 /// changed or removed at any time. experimental_registration()281 experimental_registration_interface* experimental_registration() override { 282 return &experimental_registration_; 283 } 284 #endif 285 286 void PerformOpsOnCall(internal::CallOpSetInterface* ops, 287 internal::Call* call) override; 288 289 void ShutdownInternal(gpr_timespec deadline) override; 290 max_receive_message_size()291 int max_receive_message_size() const override { 292 return max_receive_message_size_; 293 } 294 295 CompletionQueue* CallbackCQ() override; 296 297 ServerInitializer* initializer(); 298 299 // Functions to manage the server shutdown ref count. Things that increase 300 // the ref count are the running state of the server (take a ref at start and 301 // drop it at shutdown) and each running callback RPC. 302 void Ref(); 303 void UnrefWithPossibleNotify() /* LOCKS_EXCLUDED(mu_) */; 304 void UnrefAndWaitLocked() /* EXCLUSIVE_LOCKS_REQUIRED(mu_) */; 305 306 std::vector<std::shared_ptr<internal::ExternalConnectionAcceptorImpl>> 307 acceptors_; 308 309 // A vector of interceptor factory objects. 310 // This should be destroyed after health_check_service_ and this requirement 311 // is satisfied by declaring interceptor_creators_ before 312 // health_check_service_. (C++ mandates that member objects be destroyed in 313 // the reverse order of initialization.) 314 std::vector<std::unique_ptr<experimental::ServerInterceptorFactoryInterface>> 315 interceptor_creators_; 316 317 int max_receive_message_size_; 318 319 /// The following completion queues are ONLY used in case of Sync API 320 /// i.e. if the server has any services with sync methods. The server uses 321 /// these completion queues to poll for new RPCs 322 std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>> 323 sync_server_cqs_; 324 325 /// List of \a ThreadManager instances (one for each cq in 326 /// the \a sync_server_cqs) 327 std::vector<std::unique_ptr<SyncRequestThreadManager>> sync_req_mgrs_; 328 329 #ifndef GRPC_CALLBACK_API_NONEXPERIMENTAL 330 // For registering experimental callback generic service; remove when that 331 // method longer experimental 332 experimental_registration_type experimental_registration_{this}; 333 #endif 334 335 // Server status 336 internal::Mutex mu_; 337 bool started_; 338 bool shutdown_; 339 bool shutdown_notified_; // Was notify called on the shutdown_cv_ 340 internal::CondVar shutdown_done_cv_; 341 bool shutdown_done_ = false; 342 std::atomic_int shutdown_refs_outstanding_{1}; 343 344 internal::CondVar shutdown_cv_; 345 346 std::shared_ptr<GlobalCallbacks> global_callbacks_; 347 348 std::vector<std::string> services_; 349 bool has_async_generic_service_ = false; 350 bool has_callback_generic_service_ = false; 351 bool has_callback_methods_ = false; 352 353 // Pointer to the wrapped grpc_server. 354 grpc_server* server_; 355 356 std::unique_ptr<ServerInitializer> server_initializer_; 357 358 std::unique_ptr<ContextAllocator> context_allocator_; 359 360 std::unique_ptr<HealthCheckServiceInterface> health_check_service_; 361 bool health_check_service_disabled_; 362 363 // When appropriate, use a default callback generic service to handle 364 // unimplemented methods 365 #ifdef GRPC_CALLBACK_API_NONEXPERIMENTAL 366 std::unique_ptr<CallbackGenericService> unimplemented_service_; 367 #else 368 std::unique_ptr<experimental::CallbackGenericService> unimplemented_service_; 369 #endif 370 371 // A special handler for resource exhausted in sync case 372 std::unique_ptr<internal::MethodHandler> resource_exhausted_handler_; 373 374 // Handler for callback generic service, if any 375 std::unique_ptr<internal::MethodHandler> generic_handler_; 376 377 // callback_cq_ references the callbackable completion queue associated 378 // with this server (if any). It is set on the first call to CallbackCQ(). 379 // It is _not owned_ by the server; ownership belongs with its internal 380 // shutdown callback tag (invoked when the CQ is fully shutdown). 381 CompletionQueue* callback_cq_ /* GUARDED_BY(mu_) */ = nullptr; 382 383 // List of CQs passed in by user that must be Shutdown only after Server is 384 // Shutdown. Even though this is only used with NDEBUG, instantiate it in all 385 // cases since otherwise the size will be inconsistent. 386 std::vector<CompletionQueue*> cq_list_; 387 }; 388 389 } // namespace grpc 390 391 #endif // GRPCPP_SERVER_H 392