1 /* 2 * 3 * Copyright 2015 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_INTERFACE_H 20 #define GRPCPP_IMPL_CODEGEN_SERVER_INTERFACE_H 21 22 #include <grpc/impl/codegen/port_platform.h> 23 24 #include <grpc/impl/codegen/grpc_types.h> 25 #include <grpcpp/impl/codegen/byte_buffer.h> 26 #include <grpcpp/impl/codegen/call.h> 27 #include <grpcpp/impl/codegen/call_hook.h> 28 #include <grpcpp/impl/codegen/completion_queue_tag.h> 29 #include <grpcpp/impl/codegen/core_codegen_interface.h> 30 #include <grpcpp/impl/codegen/interceptor_common.h> 31 #include <grpcpp/impl/codegen/rpc_service_method.h> 32 #include <grpcpp/impl/codegen/server_context.h> 33 34 namespace grpc { 35 36 class AsyncGenericService; 37 class Channel; 38 class CompletionQueue; 39 class GenericServerContext; 40 class ServerCompletionQueue; 41 class ServerCredentials; 42 class Service; 43 44 extern CoreCodegenInterface* g_core_codegen_interface; 45 46 /// Models a gRPC server. 47 /// 48 /// Servers are configured and started via \a grpc::ServerBuilder. 49 namespace internal { 50 class ServerAsyncStreamingInterface; 51 } // namespace internal 52 53 #ifndef GRPC_CALLBACK_API_NONEXPERIMENTAL 54 namespace experimental { 55 #endif 56 class CallbackGenericService; 57 #ifndef GRPC_CALLBACK_API_NONEXPERIMENTAL 58 } // namespace experimental 59 #endif 60 61 namespace experimental { 62 class ServerInterceptorFactoryInterface; 63 } // namespace experimental 64 65 class ServerInterface : public internal::CallHook { 66 public: ~ServerInterface()67 ~ServerInterface() override {} 68 69 /// \a Shutdown does the following things: 70 /// 71 /// 1. Shutdown the server: deactivate all listening ports, mark it in 72 /// "shutdown mode" so that further call Request's or incoming RPC matches 73 /// are no longer allowed. Also return all Request'ed-but-not-yet-active 74 /// calls as failed (!ok). This refers to calls that have been requested 75 /// at the server by the server-side library or application code but that 76 /// have not yet been matched to incoming RPCs from the client. Note that 77 /// this would even include default calls added automatically by the gRPC 78 /// C++ API without the user's input (e.g., "Unimplemented RPC method") 79 /// 80 /// 2. Block until all rpc method handlers invoked automatically by the sync 81 /// API finish. 82 /// 83 /// 3. If all pending calls complete (and all their operations are 84 /// retrieved by Next) before \a deadline expires, this finishes 85 /// gracefully. Otherwise, forcefully cancel all pending calls associated 86 /// with the server after \a deadline expires. In the case of the sync API, 87 /// if the RPC function for a streaming call has already been started and 88 /// takes a week to complete, the RPC function won't be forcefully 89 /// terminated (since that would leave state corrupt and incomplete) and 90 /// the method handler will just keep running (which will prevent the 91 /// server from completing the "join" operation that it needs to do at 92 /// shutdown time). 93 /// 94 /// All completion queue associated with the server (for example, for async 95 /// serving) must be shutdown *after* this method has returned: 96 /// See \a ServerBuilder::AddCompletionQueue for details. 97 /// They must also be drained (by repeated Next) after being shutdown. 98 /// 99 /// \param deadline How long to wait until pending rpcs are forcefully 100 /// terminated. 101 template <class T> Shutdown(const T & deadline)102 void Shutdown(const T& deadline) { 103 ShutdownInternal(TimePoint<T>(deadline).raw_time()); 104 } 105 106 /// Shutdown the server without a deadline and forced cancellation. 107 /// 108 /// All completion queue associated with the server (for example, for async 109 /// serving) must be shutdown *after* this method has returned: 110 /// See \a ServerBuilder::AddCompletionQueue for details. Shutdown()111 void Shutdown() { 112 ShutdownInternal( 113 g_core_codegen_interface->gpr_inf_future(GPR_CLOCK_MONOTONIC)); 114 } 115 116 /// Block waiting for all work to complete. 117 /// 118 /// \warning The server must be either shutting down or some other thread must 119 /// call \a Shutdown for this function to ever return. 120 virtual void Wait() = 0; 121 122 protected: 123 friend class ::grpc::Service; 124 125 /// Register a service. This call does not take ownership of the service. 126 /// The service must exist for the lifetime of the Server instance. 127 virtual bool RegisterService(const std::string* host, Service* service) = 0; 128 129 /// Register a generic service. This call does not take ownership of the 130 /// service. The service must exist for the lifetime of the Server instance. 131 virtual void RegisterAsyncGenericService(AsyncGenericService* service) = 0; 132 133 #ifdef GRPC_CALLBACK_API_NONEXPERIMENTAL 134 /// Register a callback generic service. This call does not take ownership of 135 /// the service. The service must exist for the lifetime of the Server 136 /// instance. May not be abstract since this is a post-1.0 API addition. 137 RegisterCallbackGenericService(CallbackGenericService *)138 virtual void RegisterCallbackGenericService(CallbackGenericService* 139 /*service*/) {} 140 #else 141 /// NOTE: class experimental_registration_interface is not part of the public 142 /// API of this class 143 /// TODO(vjpai): Move these contents to public API when no longer experimental 144 class experimental_registration_interface { 145 public: ~experimental_registration_interface()146 virtual ~experimental_registration_interface() {} 147 /// May not be abstract since this is a post-1.0 API addition RegisterCallbackGenericService(experimental::CallbackGenericService *)148 virtual void RegisterCallbackGenericService( 149 experimental::CallbackGenericService* /*service*/) {} RegisterContextAllocator(std::unique_ptr<ContextAllocator> context_allocator)150 virtual void RegisterContextAllocator( 151 std::unique_ptr<ContextAllocator> context_allocator) {} 152 }; 153 154 /// NOTE: The function experimental_registration() is not stable public API. 155 /// It is a view to the experimental components of this class. It may be 156 /// changed or removed at any time. May not be abstract since this is a 157 /// post-1.0 API addition experimental_registration()158 virtual experimental_registration_interface* experimental_registration() { 159 return nullptr; 160 } 161 #endif 162 163 /// Tries to bind \a server to the given \a addr. 164 /// 165 /// It can be invoked multiple times. 166 /// 167 /// \param addr The address to try to bind to the server (eg, localhost:1234, 168 /// 192.168.1.1:31416, [::1]:27182, etc.). 169 /// \params creds The credentials associated with the server. 170 /// 171 /// \return bound port number on success, 0 on failure. 172 /// 173 /// \warning It's an error to call this method on an already started server. 174 virtual int AddListeningPort(const std::string& addr, 175 ServerCredentials* creds) = 0; 176 177 /// Start the server. 178 /// 179 /// \param cqs Completion queues for handling asynchronous services. The 180 /// caller is required to keep all completion queues live until the server is 181 /// destroyed. 182 /// \param num_cqs How many completion queues does \a cqs hold. 183 virtual void Start(::grpc::ServerCompletionQueue** cqs, size_t num_cqs) = 0; 184 185 virtual void ShutdownInternal(gpr_timespec deadline) = 0; 186 187 virtual int max_receive_message_size() const = 0; 188 189 virtual grpc_server* server() = 0; 190 191 void PerformOpsOnCall(internal::CallOpSetInterface* ops, 192 internal::Call* call) override = 0; 193 194 class BaseAsyncRequest : public internal::CompletionQueueTag { 195 public: 196 BaseAsyncRequest(ServerInterface* server, ::grpc::ServerContext* context, 197 internal::ServerAsyncStreamingInterface* stream, 198 ::grpc::CompletionQueue* call_cq, 199 ::grpc::ServerCompletionQueue* notification_cq, void* tag, 200 bool delete_on_finalize); 201 ~BaseAsyncRequest() override; 202 203 bool FinalizeResult(void** tag, bool* status) override; 204 205 private: 206 void ContinueFinalizeResultAfterInterception(); 207 208 protected: 209 ServerInterface* const server_; 210 ::grpc::ServerContext* const context_; 211 internal::ServerAsyncStreamingInterface* const stream_; 212 ::grpc::CompletionQueue* const call_cq_; 213 ::grpc::ServerCompletionQueue* const notification_cq_; 214 void* const tag_; 215 const bool delete_on_finalize_; 216 grpc_call* call_; 217 internal::Call call_wrapper_; 218 internal::InterceptorBatchMethodsImpl interceptor_methods_; 219 bool done_intercepting_; 220 }; 221 222 /// RegisteredAsyncRequest is not part of the C++ API 223 class RegisteredAsyncRequest : public BaseAsyncRequest { 224 public: 225 RegisteredAsyncRequest(ServerInterface* server, 226 ::grpc::ServerContext* context, 227 internal::ServerAsyncStreamingInterface* stream, 228 ::grpc::CompletionQueue* call_cq, 229 ::grpc::ServerCompletionQueue* notification_cq, 230 void* tag, const char* name, 231 internal::RpcMethod::RpcType type); 232 FinalizeResult(void ** tag,bool * status)233 bool FinalizeResult(void** tag, bool* status) override { 234 /* If we are done intercepting, then there is nothing more for us to do */ 235 if (done_intercepting_) { 236 return BaseAsyncRequest::FinalizeResult(tag, status); 237 } 238 call_wrapper_ = ::grpc::internal::Call( 239 call_, server_, call_cq_, server_->max_receive_message_size(), 240 context_->set_server_rpc_info(name_, type_, 241 *server_->interceptor_creators())); 242 return BaseAsyncRequest::FinalizeResult(tag, status); 243 } 244 245 protected: 246 void IssueRequest(void* registered_method, grpc_byte_buffer** payload, 247 ::grpc::ServerCompletionQueue* notification_cq); 248 const char* name_; 249 const internal::RpcMethod::RpcType type_; 250 }; 251 252 class NoPayloadAsyncRequest final : public RegisteredAsyncRequest { 253 public: NoPayloadAsyncRequest(internal::RpcServiceMethod * registered_method,ServerInterface * server,::grpc::ServerContext * context,internal::ServerAsyncStreamingInterface * stream,::grpc::CompletionQueue * call_cq,::grpc::ServerCompletionQueue * notification_cq,void * tag)254 NoPayloadAsyncRequest(internal::RpcServiceMethod* registered_method, 255 ServerInterface* server, 256 ::grpc::ServerContext* context, 257 internal::ServerAsyncStreamingInterface* stream, 258 ::grpc::CompletionQueue* call_cq, 259 ::grpc::ServerCompletionQueue* notification_cq, 260 void* tag) 261 : RegisteredAsyncRequest( 262 server, context, stream, call_cq, notification_cq, tag, 263 registered_method->name(), registered_method->method_type()) { 264 IssueRequest(registered_method->server_tag(), nullptr, notification_cq); 265 } 266 267 // uses RegisteredAsyncRequest::FinalizeResult 268 }; 269 270 template <class Message> 271 class PayloadAsyncRequest final : public RegisteredAsyncRequest { 272 public: PayloadAsyncRequest(internal::RpcServiceMethod * registered_method,ServerInterface * server,::grpc::ServerContext * context,internal::ServerAsyncStreamingInterface * stream,::grpc::CompletionQueue * call_cq,::grpc::ServerCompletionQueue * notification_cq,void * tag,Message * request)273 PayloadAsyncRequest(internal::RpcServiceMethod* registered_method, 274 ServerInterface* server, ::grpc::ServerContext* context, 275 internal::ServerAsyncStreamingInterface* stream, 276 ::grpc::CompletionQueue* call_cq, 277 ::grpc::ServerCompletionQueue* notification_cq, 278 void* tag, Message* request) 279 : RegisteredAsyncRequest( 280 server, context, stream, call_cq, notification_cq, tag, 281 registered_method->name(), registered_method->method_type()), 282 registered_method_(registered_method), 283 request_(request) { 284 IssueRequest(registered_method->server_tag(), payload_.bbuf_ptr(), 285 notification_cq); 286 } 287 ~PayloadAsyncRequest()288 ~PayloadAsyncRequest() override { 289 payload_.Release(); // We do not own the payload_ 290 } 291 FinalizeResult(void ** tag,bool * status)292 bool FinalizeResult(void** tag, bool* status) override { 293 /* If we are done intercepting, then there is nothing more for us to do */ 294 if (done_intercepting_) { 295 return RegisteredAsyncRequest::FinalizeResult(tag, status); 296 } 297 if (*status) { 298 if (!payload_.Valid() || !SerializationTraits<Message>::Deserialize( 299 payload_.bbuf_ptr(), request_) 300 .ok()) { 301 // If deserialization fails, we cancel the call and instantiate 302 // a new instance of ourselves to request another call. We then 303 // return false, which prevents the call from being returned to 304 // the application. 305 g_core_codegen_interface->grpc_call_cancel_with_status( 306 call_, GRPC_STATUS_INTERNAL, "Unable to parse request", nullptr); 307 g_core_codegen_interface->grpc_call_unref(call_); 308 new PayloadAsyncRequest(registered_method_, server_, context_, 309 stream_, call_cq_, notification_cq_, tag_, 310 request_); 311 delete this; 312 return false; 313 } 314 } 315 /* Set interception point for recv message */ 316 interceptor_methods_.AddInterceptionHookPoint( 317 experimental::InterceptionHookPoints::POST_RECV_MESSAGE); 318 interceptor_methods_.SetRecvMessage(request_, nullptr); 319 return RegisteredAsyncRequest::FinalizeResult(tag, status); 320 } 321 322 private: 323 internal::RpcServiceMethod* const registered_method_; 324 Message* const request_; 325 ByteBuffer payload_; 326 }; 327 328 class GenericAsyncRequest : public BaseAsyncRequest { 329 public: 330 GenericAsyncRequest(ServerInterface* server, GenericServerContext* context, 331 internal::ServerAsyncStreamingInterface* stream, 332 ::grpc::CompletionQueue* call_cq, 333 ::grpc::ServerCompletionQueue* notification_cq, 334 void* tag, bool delete_on_finalize); 335 336 bool FinalizeResult(void** tag, bool* status) override; 337 338 private: 339 grpc_call_details call_details_; 340 }; 341 342 template <class Message> RequestAsyncCall(internal::RpcServiceMethod * method,::grpc::ServerContext * context,internal::ServerAsyncStreamingInterface * stream,::grpc::CompletionQueue * call_cq,::grpc::ServerCompletionQueue * notification_cq,void * tag,Message * message)343 void RequestAsyncCall(internal::RpcServiceMethod* method, 344 ::grpc::ServerContext* context, 345 internal::ServerAsyncStreamingInterface* stream, 346 ::grpc::CompletionQueue* call_cq, 347 ::grpc::ServerCompletionQueue* notification_cq, 348 void* tag, Message* message) { 349 GPR_CODEGEN_ASSERT(method); 350 new PayloadAsyncRequest<Message>(method, this, context, stream, call_cq, 351 notification_cq, tag, message); 352 } 353 RequestAsyncCall(internal::RpcServiceMethod * method,::grpc::ServerContext * context,internal::ServerAsyncStreamingInterface * stream,::grpc::CompletionQueue * call_cq,::grpc::ServerCompletionQueue * notification_cq,void * tag)354 void RequestAsyncCall(internal::RpcServiceMethod* method, 355 ::grpc::ServerContext* context, 356 internal::ServerAsyncStreamingInterface* stream, 357 ::grpc::CompletionQueue* call_cq, 358 ::grpc::ServerCompletionQueue* notification_cq, 359 void* tag) { 360 GPR_CODEGEN_ASSERT(method); 361 new NoPayloadAsyncRequest(method, this, context, stream, call_cq, 362 notification_cq, tag); 363 } 364 RequestAsyncGenericCall(GenericServerContext * context,internal::ServerAsyncStreamingInterface * stream,::grpc::CompletionQueue * call_cq,::grpc::ServerCompletionQueue * notification_cq,void * tag)365 void RequestAsyncGenericCall(GenericServerContext* context, 366 internal::ServerAsyncStreamingInterface* stream, 367 ::grpc::CompletionQueue* call_cq, 368 ::grpc::ServerCompletionQueue* notification_cq, 369 void* tag) { 370 new GenericAsyncRequest(this, context, stream, call_cq, notification_cq, 371 tag, true); 372 } 373 374 private: 375 // EXPERIMENTAL 376 // Getter method for the vector of interceptor factory objects. 377 // Returns a nullptr (rather than being pure) since this is a post-1.0 method 378 // and adding a new pure method to an interface would be a breaking change 379 // (even though this is private and non-API) 380 virtual std::vector< 381 std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>* interceptor_creators()382 interceptor_creators() { 383 return nullptr; 384 } 385 386 // EXPERIMENTAL 387 // A method to get the callbackable completion queue associated with this 388 // server. If the return value is nullptr, this server doesn't support 389 // callback operations. 390 // TODO(vjpai): Consider a better default like using a global CQ 391 // Returns nullptr (rather than being pure) since this is a post-1.0 method 392 // and adding a new pure method to an interface would be a breaking change 393 // (even though this is private and non-API) CallbackCQ()394 virtual ::grpc::CompletionQueue* CallbackCQ() { return nullptr; } 395 }; 396 397 } // namespace grpc 398 399 #endif // GRPCPP_IMPL_CODEGEN_SERVER_INTERFACE_H 400