1 /* 2 * 3 * Copyright 2015 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #ifndef GRPCPP_SERVER_H 20 #define GRPCPP_SERVER_H 21 22 #include <condition_variable> 23 #include <list> 24 #include <memory> 25 #include <mutex> 26 #include <vector> 27 28 #include <grpc/compression.h> 29 #include <grpcpp/completion_queue.h> 30 #include <grpcpp/impl/call.h> 31 #include <grpcpp/impl/codegen/grpc_library.h> 32 #include <grpcpp/impl/codegen/server_interface.h> 33 #include <grpcpp/impl/rpc_service_method.h> 34 #include <grpcpp/security/server_credentials.h> 35 #include <grpcpp/support/channel_arguments.h> 36 #include <grpcpp/support/config.h> 37 #include <grpcpp/support/status.h> 38 39 struct grpc_server; 40 41 namespace grpc { 42 43 class AsyncGenericService; 44 class HealthCheckServiceInterface; 45 class ServerContext; 46 class ServerInitializer; 47 48 /// Represents a gRPC server. 49 /// 50 /// Use a \a grpc::ServerBuilder to create, configure, and start 51 /// \a Server instances. 52 class Server : public ServerInterface, private GrpcLibraryCodegen { 53 public: 54 ~Server(); 55 56 /// Block until the server shuts down. 57 /// 58 /// \warning The server must be either shutting down or some other thread must 59 /// call \a Shutdown for this function to ever return. 60 void Wait() override; 61 62 /// Global callbacks are a set of hooks that are called when server 63 /// events occur. \a SetGlobalCallbacks method is used to register 64 /// the hooks with gRPC. Note that 65 /// the \a GlobalCallbacks instance will be shared among all 66 /// \a Server instances in an application and can be set exactly 67 /// once per application. 68 class GlobalCallbacks { 69 public: ~GlobalCallbacks()70 virtual ~GlobalCallbacks() {} 71 /// Called before server is created. UpdateArguments(ChannelArguments * args)72 virtual void UpdateArguments(ChannelArguments* args) {} 73 /// Called before application callback for each synchronous server request 74 virtual void PreSynchronousRequest(ServerContext* context) = 0; 75 /// Called after application callback for each synchronous server request 76 virtual void PostSynchronousRequest(ServerContext* context) = 0; 77 /// Called before server is started. PreServerStart(Server * server)78 virtual void PreServerStart(Server* server) {} 79 /// Called after a server port is added. AddPort(Server * server,const grpc::string & addr,ServerCredentials * creds,int port)80 virtual void AddPort(Server* server, const grpc::string& addr, 81 ServerCredentials* creds, int port) {} 82 }; 83 /// Set the global callback object. Can only be called once per application. 84 /// Does not take ownership of callbacks, and expects the pointed to object 85 /// to be alive until all server objects in the process have been destroyed. 86 /// The same \a GlobalCallbacks object will be used throughout the 87 /// application and is shared among all \a Server objects. 88 static void SetGlobalCallbacks(GlobalCallbacks* callbacks); 89 90 /// Returns a \em raw pointer to the underlying \a grpc_server instance. 91 /// EXPERIMENTAL: for internal/test use only 92 grpc_server* c_server(); 93 94 /// Returns the health check service. GetHealthCheckService()95 HealthCheckServiceInterface* GetHealthCheckService() const { 96 return health_check_service_.get(); 97 } 98 99 /// Establish a channel for in-process communication 100 std::shared_ptr<Channel> InProcessChannel(const ChannelArguments& args); 101 102 protected: 103 /// Register a service. This call does not take ownership of the service. 104 /// The service must exist for the lifetime of the Server instance. 105 bool RegisterService(const grpc::string* host, Service* service) override; 106 107 /// Try binding the server to the given \a addr endpoint 108 /// (port, and optionally including IP address to bind to). 109 /// 110 /// It can be invoked multiple times. Should be used before 111 /// starting the server. 112 /// 113 /// \param addr The address to try to bind to the server (eg, localhost:1234, 114 /// 192.168.1.1:31416, [::1]:27182, etc.). 115 /// \param creds The credentials associated with the server. 116 /// 117 /// \return bound port number on success, 0 on failure. 118 /// 119 /// \warning It is an error to call this method on an already started server. 120 int AddListeningPort(const grpc::string& addr, 121 ServerCredentials* creds) override; 122 123 /// NOTE: This is *NOT* a public API. The server constructors are supposed to 124 /// be used by \a ServerBuilder class only. The constructor will be made 125 /// 'private' very soon. 126 /// 127 /// Server constructors. To be used by \a ServerBuilder only. 128 /// 129 /// \param max_message_size Maximum message length that the channel can 130 /// receive. 131 /// 132 /// \param args The channel args 133 /// 134 /// \param sync_server_cqs The completion queues to use if the server is a 135 /// synchronous server (or a hybrid server). The server polls for new RPCs on 136 /// these queues 137 /// 138 /// \param min_pollers The minimum number of polling threads per server 139 /// completion queue (in param sync_server_cqs) to use for listening to 140 /// incoming requests (used only in case of sync server) 141 /// 142 /// \param max_pollers The maximum number of polling threads per server 143 /// completion queue (in param sync_server_cqs) to use for listening to 144 /// incoming requests (used only in case of sync server) 145 /// 146 /// \param sync_cq_timeout_msec The timeout to use when calling AsyncNext() on 147 /// server completion queues passed via sync_server_cqs param. 148 Server(int max_message_size, ChannelArguments* args, 149 std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>> 150 sync_server_cqs, 151 int min_pollers, int max_pollers, int sync_cq_timeout_msec, 152 grpc_resource_quota* server_rq = nullptr); 153 154 /// Start the server. 155 /// 156 /// \param cqs Completion queues for handling asynchronous services. The 157 /// caller is required to keep all completion queues live until the server is 158 /// destroyed. 159 /// \param num_cqs How many completion queues does \a cqs hold. 160 void Start(ServerCompletionQueue** cqs, size_t num_cqs) override; 161 server()162 grpc_server* server() override { return server_; }; 163 164 private: 165 friend class AsyncGenericService; 166 friend class ServerBuilder; 167 friend class ServerInitializer; 168 169 class SyncRequest; 170 class UnimplementedAsyncRequest; 171 class UnimplementedAsyncResponse; 172 173 /// SyncRequestThreadManager is an implementation of ThreadManager. This class 174 /// is responsible for polling for incoming RPCs and calling the RPC handlers. 175 /// This is only used in case of a Sync server (i.e a server exposing a sync 176 /// interface) 177 class SyncRequestThreadManager; 178 179 /// Register a generic service. This call does not take ownership of the 180 /// service. The service must exist for the lifetime of the Server instance. 181 void RegisterAsyncGenericService(AsyncGenericService* service) override; 182 183 void PerformOpsOnCall(internal::CallOpSetInterface* ops, 184 internal::Call* call) override; 185 186 void ShutdownInternal(gpr_timespec deadline) override; 187 max_receive_message_size()188 int max_receive_message_size() const override { 189 return max_receive_message_size_; 190 }; 191 192 ServerInitializer* initializer(); 193 194 const int max_receive_message_size_; 195 196 /// The following completion queues are ONLY used in case of Sync API 197 /// i.e. if the server has any services with sync methods. The server uses 198 /// these completion queues to poll for new RPCs 199 std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>> 200 sync_server_cqs_; 201 202 /// List of \a ThreadManager instances (one for each cq in 203 /// the \a sync_server_cqs) 204 std::vector<std::unique_ptr<SyncRequestThreadManager>> sync_req_mgrs_; 205 206 // Server status 207 std::mutex mu_; 208 bool started_; 209 bool shutdown_; 210 bool shutdown_notified_; // Was notify called on the shutdown_cv_ 211 212 std::condition_variable shutdown_cv_; 213 214 std::shared_ptr<GlobalCallbacks> global_callbacks_; 215 216 std::vector<grpc::string> services_; 217 bool has_generic_service_; 218 219 // Pointer to the wrapped grpc_server. 220 grpc_server* server_; 221 222 std::unique_ptr<ServerInitializer> server_initializer_; 223 224 std::unique_ptr<HealthCheckServiceInterface> health_check_service_; 225 bool health_check_service_disabled_; 226 227 // A special handler for resource exhausted in sync case 228 std::unique_ptr<internal::MethodHandler> resource_exhausted_handler_; 229 }; 230 231 } // namespace grpc 232 233 #endif // GRPCPP_SERVER_H 234