1 // 2 // 3 // Copyright 2015 gRPC authors. 4 // 5 // Licensed under the Apache License, Version 2.0 (the "License"); 6 // you may not use this file except in compliance with the License. 7 // You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, software 12 // distributed under the License is distributed on an "AS IS" BASIS, 13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 // See the License for the specific language governing permissions and 15 // limitations under the License. 16 // 17 // 18 19 #ifndef GRPCPP_SERVER_INTERFACE_H 20 #define GRPCPP_SERVER_INTERFACE_H 21 22 #include <grpc/grpc.h> 23 #include <grpc/impl/grpc_types.h> 24 #include <grpc/support/port_platform.h> 25 #include <grpc/support/time.h> 26 #include <grpcpp/impl/call.h> 27 #include <grpcpp/impl/call_hook.h> 28 #include <grpcpp/impl/codegen/interceptor_common.h> 29 #include <grpcpp/impl/completion_queue_tag.h> 30 #include <grpcpp/impl/rpc_service_method.h> 31 #include <grpcpp/server_context.h> 32 #include <grpcpp/support/byte_buffer.h> 33 34 #include "absl/log/absl_check.h" 35 36 namespace grpc { 37 38 class AsyncGenericService; 39 class Channel; 40 class CompletionQueue; 41 class GenericServerContext; 42 class ServerCompletionQueue; 43 class ServerCredentials; 44 class Service; 45 46 /// Models a gRPC server. 47 /// 48 /// Servers are configured and started via \a grpc::ServerBuilder. 49 namespace internal { 50 class ServerAsyncStreamingInterface; 51 } // namespace internal 52 53 class CallbackGenericService; 54 55 namespace experimental { 56 class ServerInterceptorFactoryInterface; 57 class ServerMetricRecorder; 58 } // namespace experimental 59 60 class ServerInterface : public internal::CallHook { 61 public: ~ServerInterface()62 ~ServerInterface() override {} 63 64 /// \a Shutdown does the following things: 65 /// 66 /// 1. Shutdown the server: deactivate all listening ports, mark it in 67 /// "shutdown mode" so that further call Request's or incoming RPC matches 68 /// are no longer allowed. Also return all Request'ed-but-not-yet-active 69 /// calls as failed (!ok). This refers to calls that have been requested 70 /// at the server by the server-side library or application code but that 71 /// have not yet been matched to incoming RPCs from the client. Note that 72 /// this would even include default calls added automatically by the gRPC 73 /// C++ API without the user's input (e.g., "Unimplemented RPC method") 74 /// 75 /// 2. Block until all rpc method handlers invoked automatically by the sync 76 /// API finish. 77 /// 78 /// 3. If all pending calls complete (and all their operations are 79 /// retrieved by Next) before \a deadline expires, this finishes 80 /// gracefully. Otherwise, forcefully cancel all pending calls associated 81 /// with the server after \a deadline expires. In the case of the sync API, 82 /// if the RPC function for a streaming call has already been started and 83 /// takes a week to complete, the RPC function won't be forcefully 84 /// terminated (since that would leave state corrupt and incomplete) and 85 /// the method handler will just keep running (which will prevent the 86 /// server from completing the "join" operation that it needs to do at 87 /// shutdown time). 88 /// 89 /// All completion queue associated with the server (for example, for async 90 /// serving) must be shutdown *after* this method has returned: 91 /// See \a ServerBuilder::AddCompletionQueue for details. 92 /// They must also be drained (by repeated Next) after being shutdown. 93 /// 94 /// \param deadline How long to wait until pending rpcs are forcefully 95 /// terminated. 96 template <class T> Shutdown(const T & deadline)97 void Shutdown(const T& deadline) { 98 ShutdownInternal(TimePoint<T>(deadline).raw_time()); 99 } 100 101 /// Shutdown the server without a deadline and forced cancellation. 102 /// 103 /// All completion queue associated with the server (for example, for async 104 /// serving) must be shutdown *after* this method has returned: 105 /// See \a ServerBuilder::AddCompletionQueue for details. Shutdown()106 void Shutdown() { ShutdownInternal(gpr_inf_future(GPR_CLOCK_MONOTONIC)); } 107 108 /// Block waiting for all work to complete. 109 /// 110 /// \warning The server must be either shutting down or some other thread must 111 /// call \a Shutdown for this function to ever return. 112 virtual void Wait() = 0; 113 114 protected: 115 friend class grpc::Service; 116 117 /// Register a service. This call does not take ownership of the service. 118 /// The service must exist for the lifetime of the Server instance. 119 virtual bool RegisterService(const std::string* host, Service* service) = 0; 120 121 /// Register a generic service. This call does not take ownership of the 122 /// service. The service must exist for the lifetime of the Server instance. 123 virtual void RegisterAsyncGenericService(AsyncGenericService* service) = 0; 124 125 /// Register a callback generic service. This call does not take ownership of 126 /// the service. The service must exist for the lifetime of the Server 127 /// instance. May not be abstract since this is a post-1.0 API addition. 128 RegisterCallbackGenericService(CallbackGenericService *)129 virtual void RegisterCallbackGenericService(CallbackGenericService* 130 /*service*/) {} 131 132 /// Tries to bind \a server to the given \a addr. 133 /// 134 /// It can be invoked multiple times. 135 /// 136 /// \param addr The address to try to bind to the server (eg, localhost:1234, 137 /// 192.168.1.1:31416, [::1]:27182, etc.). 138 /// \params creds The credentials associated with the server. 139 /// 140 /// \return bound port number on success, 0 on failure. 141 /// 142 /// \warning It's an error to call this method on an already started server. 143 virtual int AddListeningPort(const std::string& addr, 144 ServerCredentials* creds) = 0; 145 146 /// Start the server. 147 /// 148 /// \param cqs Completion queues for handling asynchronous services. The 149 /// caller is required to keep all completion queues live until the server is 150 /// destroyed. 151 /// \param num_cqs How many completion queues does \a cqs hold. 152 virtual void Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) = 0; 153 154 virtual void ShutdownInternal(gpr_timespec deadline) = 0; 155 156 virtual int max_receive_message_size() const = 0; 157 158 virtual grpc_server* server() = 0; 159 160 void PerformOpsOnCall(internal::CallOpSetInterface* ops, 161 internal::Call* call) override = 0; 162 163 class BaseAsyncRequest : public internal::CompletionQueueTag { 164 public: 165 BaseAsyncRequest(ServerInterface* server, grpc::ServerContext* context, 166 internal::ServerAsyncStreamingInterface* stream, 167 grpc::CompletionQueue* call_cq, 168 grpc::ServerCompletionQueue* notification_cq, void* tag, 169 bool delete_on_finalize); 170 ~BaseAsyncRequest() override; 171 172 bool FinalizeResult(void** tag, bool* status) override; 173 174 private: 175 void ContinueFinalizeResultAfterInterception(); 176 177 protected: 178 ServerInterface* const server_; 179 grpc::ServerContext* const context_; 180 internal::ServerAsyncStreamingInterface* const stream_; 181 grpc::CompletionQueue* const call_cq_; 182 grpc::ServerCompletionQueue* const notification_cq_; 183 void* const tag_; 184 const bool delete_on_finalize_; 185 grpc_call* call_; 186 internal::Call call_wrapper_; 187 internal::InterceptorBatchMethodsImpl interceptor_methods_; 188 bool done_intercepting_; 189 bool call_metric_recording_enabled_; 190 experimental::ServerMetricRecorder* server_metric_recorder_; 191 }; 192 193 /// RegisteredAsyncRequest is not part of the C++ API 194 class RegisteredAsyncRequest : public BaseAsyncRequest { 195 public: 196 RegisteredAsyncRequest(ServerInterface* server, 197 grpc::ServerContext* context, 198 internal::ServerAsyncStreamingInterface* stream, 199 grpc::CompletionQueue* call_cq, 200 grpc::ServerCompletionQueue* notification_cq, 201 void* tag, const char* name, 202 internal::RpcMethod::RpcType type); 203 FinalizeResult(void ** tag,bool * status)204 bool FinalizeResult(void** tag, bool* status) override { 205 // If we are done intercepting, then there is nothing more for us to do 206 if (done_intercepting_) { 207 return BaseAsyncRequest::FinalizeResult(tag, status); 208 } 209 call_wrapper_ = grpc::internal::Call( 210 call_, server_, call_cq_, server_->max_receive_message_size(), 211 context_->set_server_rpc_info(name_, type_, 212 *server_->interceptor_creators())); 213 return BaseAsyncRequest::FinalizeResult(tag, status); 214 } 215 216 protected: 217 void IssueRequest(void* registered_method, grpc_byte_buffer** payload, 218 grpc::ServerCompletionQueue* notification_cq); 219 const char* name_; 220 const internal::RpcMethod::RpcType type_; 221 }; 222 223 class NoPayloadAsyncRequest final : public RegisteredAsyncRequest { 224 public: NoPayloadAsyncRequest(internal::RpcServiceMethod * registered_method,ServerInterface * server,grpc::ServerContext * context,internal::ServerAsyncStreamingInterface * stream,grpc::CompletionQueue * call_cq,grpc::ServerCompletionQueue * notification_cq,void * tag)225 NoPayloadAsyncRequest(internal::RpcServiceMethod* registered_method, 226 ServerInterface* server, grpc::ServerContext* context, 227 internal::ServerAsyncStreamingInterface* stream, 228 grpc::CompletionQueue* call_cq, 229 grpc::ServerCompletionQueue* notification_cq, 230 void* tag) 231 : RegisteredAsyncRequest( 232 server, context, stream, call_cq, notification_cq, tag, 233 registered_method->name(), registered_method->method_type()) { 234 IssueRequest(registered_method->server_tag(), nullptr, notification_cq); 235 } 236 237 // uses RegisteredAsyncRequest::FinalizeResult 238 }; 239 240 template <class Message> 241 class PayloadAsyncRequest final : public RegisteredAsyncRequest { 242 public: PayloadAsyncRequest(internal::RpcServiceMethod * registered_method,ServerInterface * server,grpc::ServerContext * context,internal::ServerAsyncStreamingInterface * stream,grpc::CompletionQueue * call_cq,grpc::ServerCompletionQueue * notification_cq,void * tag,Message * request)243 PayloadAsyncRequest(internal::RpcServiceMethod* registered_method, 244 ServerInterface* server, grpc::ServerContext* context, 245 internal::ServerAsyncStreamingInterface* stream, 246 grpc::CompletionQueue* call_cq, 247 grpc::ServerCompletionQueue* notification_cq, void* tag, 248 Message* request) 249 : RegisteredAsyncRequest( 250 server, context, stream, call_cq, notification_cq, tag, 251 registered_method->name(), registered_method->method_type()), 252 registered_method_(registered_method), 253 request_(request) { 254 IssueRequest(registered_method->server_tag(), payload_.bbuf_ptr(), 255 notification_cq); 256 } 257 ~PayloadAsyncRequest()258 ~PayloadAsyncRequest() override { 259 payload_.Release(); // We do not own the payload_ 260 } 261 FinalizeResult(void ** tag,bool * status)262 bool FinalizeResult(void** tag, bool* status) override { 263 // If we are done intercepting, then there is nothing more for us to do 264 if (done_intercepting_) { 265 return RegisteredAsyncRequest::FinalizeResult(tag, status); 266 } 267 if (*status) { 268 if (!payload_.Valid() || !SerializationTraits<Message>::Deserialize( 269 payload_.bbuf_ptr(), request_) 270 .ok()) { 271 // If deserialization fails, we cancel the call and instantiate 272 // a new instance of ourselves to request another call. We then 273 // return false, which prevents the call from being returned to 274 // the application. 275 grpc_call_cancel_with_status(call_, GRPC_STATUS_INTERNAL, 276 "Unable to parse request", nullptr); 277 grpc_call_unref(call_); 278 new PayloadAsyncRequest(registered_method_, server_, context_, 279 stream_, call_cq_, notification_cq_, tag_, 280 request_); 281 delete this; 282 return false; 283 } 284 } 285 // Set interception point for recv message 286 interceptor_methods_.AddInterceptionHookPoint( 287 experimental::InterceptionHookPoints::POST_RECV_MESSAGE); 288 interceptor_methods_.SetRecvMessage(request_, nullptr); 289 return RegisteredAsyncRequest::FinalizeResult(tag, status); 290 } 291 292 private: 293 internal::RpcServiceMethod* const registered_method_; 294 Message* const request_; 295 ByteBuffer payload_; 296 }; 297 298 class GenericAsyncRequest : public BaseAsyncRequest { 299 public: 300 GenericAsyncRequest(ServerInterface* server, GenericServerContext* context, 301 internal::ServerAsyncStreamingInterface* stream, 302 grpc::CompletionQueue* call_cq, 303 grpc::ServerCompletionQueue* notification_cq, void* tag, 304 bool delete_on_finalize, bool issue_request = true); 305 306 bool FinalizeResult(void** tag, bool* status) override; 307 308 protected: 309 void IssueRequest(); 310 311 private: 312 grpc_call_details call_details_; 313 }; 314 315 template <class Message> RequestAsyncCall(internal::RpcServiceMethod * method,grpc::ServerContext * context,internal::ServerAsyncStreamingInterface * stream,grpc::CompletionQueue * call_cq,grpc::ServerCompletionQueue * notification_cq,void * tag,Message * message)316 void RequestAsyncCall(internal::RpcServiceMethod* method, 317 grpc::ServerContext* context, 318 internal::ServerAsyncStreamingInterface* stream, 319 grpc::CompletionQueue* call_cq, 320 grpc::ServerCompletionQueue* notification_cq, void* tag, 321 Message* message) { 322 ABSL_CHECK(method); 323 new PayloadAsyncRequest<Message>(method, this, context, stream, call_cq, 324 notification_cq, tag, message); 325 } 326 RequestAsyncCall(internal::RpcServiceMethod * method,grpc::ServerContext * context,internal::ServerAsyncStreamingInterface * stream,grpc::CompletionQueue * call_cq,grpc::ServerCompletionQueue * notification_cq,void * tag)327 void RequestAsyncCall(internal::RpcServiceMethod* method, 328 grpc::ServerContext* context, 329 internal::ServerAsyncStreamingInterface* stream, 330 grpc::CompletionQueue* call_cq, 331 grpc::ServerCompletionQueue* notification_cq, 332 void* tag) { 333 ABSL_CHECK(method); 334 new NoPayloadAsyncRequest(method, this, context, stream, call_cq, 335 notification_cq, tag); 336 } 337 RequestAsyncGenericCall(GenericServerContext * context,internal::ServerAsyncStreamingInterface * stream,grpc::CompletionQueue * call_cq,grpc::ServerCompletionQueue * notification_cq,void * tag)338 void RequestAsyncGenericCall(GenericServerContext* context, 339 internal::ServerAsyncStreamingInterface* stream, 340 grpc::CompletionQueue* call_cq, 341 grpc::ServerCompletionQueue* notification_cq, 342 void* tag) { 343 new GenericAsyncRequest(this, context, stream, call_cq, notification_cq, 344 tag, true); 345 } 346 347 private: 348 // EXPERIMENTAL 349 // Getter method for the vector of interceptor factory objects. 350 // Returns a nullptr (rather than being pure) since this is a post-1.0 method 351 // and adding a new pure method to an interface would be a breaking change 352 // (even though this is private and non-API) 353 virtual std::vector< 354 std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>* interceptor_creators()355 interceptor_creators() { 356 return nullptr; 357 } 358 359 // Whether per-call load reporting is enabled. 360 virtual bool call_metric_recording_enabled() const = 0; 361 362 // Interface to read or update server-wide metrics. Returns null when not set. 363 virtual experimental::ServerMetricRecorder* server_metric_recorder() 364 const = 0; 365 366 // A method to get the callbackable completion queue associated with this 367 // server. If the return value is nullptr, this server doesn't support 368 // callback operations. 369 // TODO(vjpai): Consider a better default like using a global CQ 370 // Returns nullptr (rather than being pure) since this is a post-1.0 method 371 // and adding a new pure method to an interface would be a breaking change 372 // (even though this is private and non-API) CallbackCQ()373 virtual grpc::CompletionQueue* CallbackCQ() { return nullptr; } 374 }; 375 376 } // namespace grpc 377 378 #endif // GRPCPP_SERVER_INTERFACE_H 379