/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#ifndef GRPCPP_SERVER_IMPL_H
#define GRPCPP_SERVER_IMPL_H

#include <atomic>
#include <list>
#include <memory>
#include <string>
#include <vector>

#include <grpc/impl/codegen/port_platform.h>

#include <grpc/compression.h>
#include <grpc/support/atm.h>
#include <grpcpp/channel_impl.h>
#include <grpcpp/completion_queue_impl.h>
#include <grpcpp/health_check_service_interface.h>
#include <grpcpp/impl/call.h>
#include <grpcpp/impl/codegen/client_interceptor.h>
#include <grpcpp/impl/codegen/completion_queue_impl.h>
#include <grpcpp/impl/codegen/grpc_library.h>
#include <grpcpp/impl/codegen/server_interface.h>
#include <grpcpp/impl/rpc_service_method.h>
#include <grpcpp/security/server_credentials.h>
#include <grpcpp/support/channel_arguments_impl.h>
#include <grpcpp/support/config.h>
#include <grpcpp/support/status.h>

struct grpc_server;

namespace grpc {
class AsyncGenericService;

namespace internal {
class ExternalConnectionAcceptorImpl;
}  // namespace internal

}  // namespace grpc

namespace grpc_impl {
class HealthCheckServiceInterface;
class ServerContext;
class ServerInitializer;

/// Represents a gRPC server.
///
/// Use a \a grpc::ServerBuilder to create, configure, and start
/// \a Server instances.
64 class Server : public grpc::ServerInterface, private grpc::GrpcLibraryCodegen { 65 public: 66 ~Server(); 67 68 /// Block until the server shuts down. 69 /// 70 /// \warning The server must be either shutting down or some other thread must 71 /// call \a Shutdown for this function to ever return. 72 void Wait() override; 73 74 /// Global callbacks are a set of hooks that are called when server 75 /// events occur. \a SetGlobalCallbacks method is used to register 76 /// the hooks with gRPC. Note that 77 /// the \a GlobalCallbacks instance will be shared among all 78 /// \a Server instances in an application and can be set exactly 79 /// once per application. 80 class GlobalCallbacks { 81 public: ~GlobalCallbacks()82 virtual ~GlobalCallbacks() {} 83 /// Called before server is created. UpdateArguments(ChannelArguments *)84 virtual void UpdateArguments(ChannelArguments* /*args*/) {} 85 /// Called before application callback for each synchronous server request 86 virtual void PreSynchronousRequest(grpc_impl::ServerContext* context) = 0; 87 /// Called after application callback for each synchronous server request 88 virtual void PostSynchronousRequest(grpc_impl::ServerContext* context) = 0; 89 /// Called before server is started. PreServerStart(Server *)90 virtual void PreServerStart(Server* /*server*/) {} 91 /// Called after a server port is added. AddPort(Server *,const std::string &,grpc::ServerCredentials *,int)92 virtual void AddPort(Server* /*server*/, const std::string& /*addr*/, 93 grpc::ServerCredentials* /*creds*/, int /*port*/) {} 94 }; 95 /// Set the global callback object. Can only be called once per application. 96 /// Does not take ownership of callbacks, and expects the pointed to object 97 /// to be alive until all server objects in the process have been destroyed. 98 /// The same \a GlobalCallbacks object will be used throughout the 99 /// application and is shared among all \a Server objects. 
100 static void SetGlobalCallbacks(GlobalCallbacks* callbacks); 101 102 /// Returns a \em raw pointer to the underlying \a grpc_server instance. 103 /// EXPERIMENTAL: for internal/test use only 104 grpc_server* c_server(); 105 106 /// Returns the health check service. GetHealthCheckService()107 grpc::HealthCheckServiceInterface* GetHealthCheckService() const { 108 return health_check_service_.get(); 109 } 110 111 /// Establish a channel for in-process communication 112 std::shared_ptr<Channel> InProcessChannel(const ChannelArguments& args); 113 114 /// NOTE: class experimental_type is not part of the public API of this class. 115 /// TODO(yashykt): Integrate into public API when this is no longer 116 /// experimental. 117 class experimental_type { 118 public: experimental_type(Server * server)119 explicit experimental_type(Server* server) : server_(server) {} 120 121 /// Establish a channel for in-process communication with client 122 /// interceptors 123 std::shared_ptr<Channel> InProcessChannelWithInterceptors( 124 const ChannelArguments& args, 125 std::vector<std::unique_ptr< 126 grpc::experimental::ClientInterceptorFactoryInterface>> 127 interceptor_creators); 128 129 private: 130 Server* server_; 131 }; 132 133 /// NOTE: The function experimental() is not stable public API. It is a view 134 /// to the experimental components of this class. It may be changed or removed 135 /// at any time. experimental()136 experimental_type experimental() { return experimental_type(this); } 137 138 protected: 139 /// Register a service. This call does not take ownership of the service. 140 /// The service must exist for the lifetime of the Server instance. 141 bool RegisterService(const std::string* host, 142 grpc::Service* service) override; 143 144 /// Try binding the server to the given \a addr endpoint 145 /// (port, and optionally including IP address to bind to). 146 /// 147 /// It can be invoked multiple times. Should be used before 148 /// starting the server. 
149 /// 150 /// \param addr The address to try to bind to the server (eg, localhost:1234, 151 /// 192.168.1.1:31416, [::1]:27182, etc.). 152 /// \param creds The credentials associated with the server. 153 /// 154 /// \return bound port number on success, 0 on failure. 155 /// 156 /// \warning It is an error to call this method on an already started server. 157 int AddListeningPort(const std::string& addr, 158 grpc::ServerCredentials* creds) override; 159 160 /// NOTE: This is *NOT* a public API. The server constructors are supposed to 161 /// be used by \a ServerBuilder class only. The constructor will be made 162 /// 'private' very soon. 163 /// 164 /// Server constructors. To be used by \a ServerBuilder only. 165 /// 166 /// \param args The channel args 167 /// 168 /// \param sync_server_cqs The completion queues to use if the server is a 169 /// synchronous server (or a hybrid server). The server polls for new RPCs on 170 /// these queues 171 /// 172 /// \param min_pollers The minimum number of polling threads per server 173 /// completion queue (in param sync_server_cqs) to use for listening to 174 /// incoming requests (used only in case of sync server) 175 /// 176 /// \param max_pollers The maximum number of polling threads per server 177 /// completion queue (in param sync_server_cqs) to use for listening to 178 /// incoming requests (used only in case of sync server) 179 /// 180 /// \param sync_cq_timeout_msec The timeout to use when calling AsyncNext() on 181 /// server completion queues passed via sync_server_cqs param. 
182 Server(ChannelArguments* args, 183 std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>> 184 sync_server_cqs, 185 int min_pollers, int max_pollers, int sync_cq_timeout_msec, 186 std::vector< 187 std::shared_ptr<grpc::internal::ExternalConnectionAcceptorImpl>> 188 acceptors, 189 grpc_resource_quota* server_rq = nullptr, 190 std::vector<std::unique_ptr< 191 grpc::experimental::ServerInterceptorFactoryInterface>> 192 interceptor_creators = std::vector<std::unique_ptr< 193 grpc::experimental::ServerInterceptorFactoryInterface>>()); 194 195 /// Start the server. 196 /// 197 /// \param cqs Completion queues for handling asynchronous services. The 198 /// caller is required to keep all completion queues live until the server is 199 /// destroyed. 200 /// \param num_cqs How many completion queues does \a cqs hold. 201 void Start(ServerCompletionQueue** cqs, size_t num_cqs) override; 202 server()203 grpc_server* server() override { return server_; } 204 205 protected: 206 /// NOTE: This method is not part of the public API for this class. set_health_check_service(std::unique_ptr<grpc::HealthCheckServiceInterface> service)207 void set_health_check_service( 208 std::unique_ptr<grpc::HealthCheckServiceInterface> service) { 209 health_check_service_ = std::move(service); 210 } 211 212 /// NOTE: This method is not part of the public API for this class. 
health_check_service_disabled()213 bool health_check_service_disabled() const { 214 return health_check_service_disabled_; 215 } 216 217 private: 218 std::vector< 219 std::unique_ptr<grpc::experimental::ServerInterceptorFactoryInterface>>* interceptor_creators()220 interceptor_creators() override { 221 return &interceptor_creators_; 222 } 223 224 friend class grpc::AsyncGenericService; 225 friend class grpc::ServerBuilder; 226 friend class grpc_impl::ServerInitializer; 227 228 class SyncRequest; 229 class CallbackRequestBase; 230 template <class ServerContextType> 231 class CallbackRequest; 232 class UnimplementedAsyncRequest; 233 class UnimplementedAsyncResponse; 234 235 /// SyncRequestThreadManager is an implementation of ThreadManager. This class 236 /// is responsible for polling for incoming RPCs and calling the RPC handlers. 237 /// This is only used in case of a Sync server (i.e a server exposing a sync 238 /// interface) 239 class SyncRequestThreadManager; 240 241 /// Register a generic service. This call does not take ownership of the 242 /// service. The service must exist for the lifetime of the Server instance. 243 void RegisterAsyncGenericService(grpc::AsyncGenericService* service) override; 244 245 #ifdef GRPC_CALLBACK_API_NONEXPERIMENTAL 246 /// Register a callback-based generic service. This call does not take 247 /// ownership of theservice. The service must exist for the lifetime of the 248 /// Server instance. 
249 void RegisterCallbackGenericService( 250 grpc::CallbackGenericService* service) override; 251 #else 252 /// NOTE: class experimental_registration_type is not part of the public API 253 /// of this class 254 /// TODO(vjpai): Move these contents to the public API of Server when 255 /// they are no longer experimental 256 class experimental_registration_type final 257 : public experimental_registration_interface { 258 public: experimental_registration_type(Server * server)259 explicit experimental_registration_type(Server* server) : server_(server) {} RegisterCallbackGenericService(grpc::experimental::CallbackGenericService * service)260 void RegisterCallbackGenericService( 261 grpc::experimental::CallbackGenericService* service) override { 262 server_->RegisterCallbackGenericService(service); 263 } 264 265 private: 266 Server* server_; 267 }; 268 269 /// TODO(vjpai): Mark this override when experimental type above is deleted 270 void RegisterCallbackGenericService( 271 grpc::experimental::CallbackGenericService* service); 272 273 /// NOTE: The function experimental_registration() is not stable public API. 274 /// It is a view to the experimental components of this class. It may be 275 /// changed or removed at any time. experimental_registration()276 experimental_registration_interface* experimental_registration() override { 277 return &experimental_registration_; 278 } 279 #endif 280 281 void PerformOpsOnCall(grpc::internal::CallOpSetInterface* ops, 282 grpc::internal::Call* call) override; 283 284 void ShutdownInternal(gpr_timespec deadline) override; 285 max_receive_message_size()286 int max_receive_message_size() const override { 287 return max_receive_message_size_; 288 } 289 290 CompletionQueue* CallbackCQ() override; 291 292 grpc_impl::ServerInitializer* initializer(); 293 294 // Functions to manage the server shutdown ref count. 
Things that increase 295 // the ref count are the running state of the server (take a ref at start and 296 // drop it at shutdown) and each running callback RPC. 297 void Ref(); 298 void UnrefWithPossibleNotify() /* LOCKS_EXCLUDED(mu_) */; 299 void UnrefAndWaitLocked() /* EXCLUSIVE_LOCKS_REQUIRED(mu_) */; 300 301 std::vector<std::shared_ptr<grpc::internal::ExternalConnectionAcceptorImpl>> 302 acceptors_; 303 304 // A vector of interceptor factory objects. 305 // This should be destroyed after health_check_service_ and this requirement 306 // is satisfied by declaring interceptor_creators_ before 307 // health_check_service_. (C++ mandates that member objects be destroyed in 308 // the reverse order of initialization.) 309 std::vector< 310 std::unique_ptr<grpc::experimental::ServerInterceptorFactoryInterface>> 311 interceptor_creators_; 312 313 int max_receive_message_size_; 314 315 /// The following completion queues are ONLY used in case of Sync API 316 /// i.e. if the server has any services with sync methods. 
The server uses 317 /// these completion queues to poll for new RPCs 318 std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>> 319 sync_server_cqs_; 320 321 /// List of \a ThreadManager instances (one for each cq in 322 /// the \a sync_server_cqs) 323 std::vector<std::unique_ptr<SyncRequestThreadManager>> sync_req_mgrs_; 324 325 #ifndef GRPC_CALLBACK_API_NONEXPERIMENTAL 326 // For registering experimental callback generic service; remove when that 327 // method longer experimental 328 experimental_registration_type experimental_registration_{this}; 329 #endif 330 331 // Server status 332 grpc::internal::Mutex mu_; 333 bool started_; 334 bool shutdown_; 335 bool shutdown_notified_; // Was notify called on the shutdown_cv_ 336 grpc::internal::CondVar shutdown_done_cv_; 337 bool shutdown_done_ = false; 338 std::atomic_int shutdown_refs_outstanding_{1}; 339 340 grpc::internal::CondVar shutdown_cv_; 341 342 std::shared_ptr<GlobalCallbacks> global_callbacks_; 343 344 std::vector<std::string> services_; 345 bool has_async_generic_service_ = false; 346 bool has_callback_generic_service_ = false; 347 bool has_callback_methods_ = false; 348 349 // Pointer to the wrapped grpc_server. 
350 grpc_server* server_; 351 352 std::unique_ptr<grpc_impl::ServerInitializer> server_initializer_; 353 354 std::unique_ptr<grpc::HealthCheckServiceInterface> health_check_service_; 355 bool health_check_service_disabled_; 356 357 // When appropriate, use a default callback generic service to handle 358 // unimplemented methods 359 #ifdef GRPC_CALLBACK_API_NONEXPERIMENTAL 360 std::unique_ptr<grpc::CallbackGenericService> unimplemented_service_; 361 #else 362 std::unique_ptr<grpc::experimental::CallbackGenericService> 363 unimplemented_service_; 364 #endif 365 366 // A special handler for resource exhausted in sync case 367 std::unique_ptr<grpc::internal::MethodHandler> resource_exhausted_handler_; 368 369 // Handler for callback generic service, if any 370 std::unique_ptr<grpc::internal::MethodHandler> generic_handler_; 371 372 // callback_cq_ references the callbackable completion queue associated 373 // with this server (if any). It is set on the first call to CallbackCQ(). 374 // It is _not owned_ by the server; ownership belongs with its internal 375 // shutdown callback tag (invoked when the CQ is fully shutdown). 376 CompletionQueue* callback_cq_ /* GUARDED_BY(mu_) */ = nullptr; 377 378 // List of CQs passed in by user that must be Shutdown only after Server is 379 // Shutdown. Even though this is only used with NDEBUG, instantiate it in all 380 // cases since otherwise the size will be inconsistent. 381 std::vector<CompletionQueue*> cq_list_; 382 }; 383 384 } // namespace grpc_impl 385 386 #endif // GRPCPP_SERVER_IMPL_H 387