1 /* 2 * 3 * Copyright 2015 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_INTERFACE_H 20 #define GRPCPP_IMPL_CODEGEN_SERVER_INTERFACE_H 21 22 #include <grpc/impl/codegen/grpc_types.h> 23 #include <grpcpp/impl/codegen/byte_buffer.h> 24 #include <grpcpp/impl/codegen/call_hook.h> 25 #include <grpcpp/impl/codegen/completion_queue_tag.h> 26 #include <grpcpp/impl/codegen/core_codegen_interface.h> 27 #include <grpcpp/impl/codegen/rpc_service_method.h> 28 29 namespace grpc { 30 31 class AsyncGenericService; 32 class Channel; 33 class GenericServerContext; 34 class ServerCompletionQueue; 35 class ServerContext; 36 class ServerCredentials; 37 class Service; 38 39 extern CoreCodegenInterface* g_core_codegen_interface; 40 41 /// Models a gRPC server. 42 /// 43 /// Servers are configured and started via \a grpc::ServerBuilder. 44 namespace internal { 45 class ServerAsyncStreamingInterface; 46 } // namespace internal 47 48 class ServerInterface : public internal::CallHook { 49 public: ~ServerInterface()50 virtual ~ServerInterface() {} 51 52 /// \a Shutdown does the following things: 53 /// 54 /// 1. Shutdown the server: deactivate all listening ports, mark it in 55 /// "shutdown mode" so that further call Request's or incoming RPC matches 56 /// are no longer allowed. Also return all Request'ed-but-not-yet-active 57 /// calls as failed (!ok). This refers to calls that have been requested 58 /// at the server by the server-side library or application code but that 59 /// have not yet been matched to incoming RPCs from the client. Note that 60 /// this would even include default calls added automatically by the gRPC 61 /// C++ API without the user's input (e.g., "Unimplemented RPC method") 62 /// 63 /// 2. Block until all rpc method handlers invoked automatically by the sync 64 /// API finish. 65 /// 66 /// 3. If all pending calls complete (and all their operations are 67 /// retrieved by Next) before \a deadline expires, this finishes 68 /// gracefully. Otherwise, forcefully cancel all pending calls associated 69 /// with the server after \a deadline expires. In the case of the sync API, 70 /// if the RPC function for a streaming call has already been started and 71 /// takes a week to complete, the RPC function won't be forcefully 72 /// terminated (since that would leave state corrupt and incomplete) and 73 /// the method handler will just keep running (which will prevent the 74 /// server from completing the "join" operation that it needs to do at 75 /// shutdown time). 76 /// 77 /// All completion queue associated with the server (for example, for async 78 /// serving) must be shutdown *after* this method has returned: 79 /// See \a ServerBuilder::AddCompletionQueue for details. 80 /// They must also be drained (by repeated Next) after being shutdown. 81 /// 82 /// \param deadline How long to wait until pending rpcs are forcefully 83 /// terminated. 84 template <class T> Shutdown(const T & deadline)85 void Shutdown(const T& deadline) { 86 ShutdownInternal(TimePoint<T>(deadline).raw_time()); 87 } 88 89 /// Shutdown the server without a deadline and forced cancellation. 90 /// 91 /// All completion queue associated with the server (for example, for async 92 /// serving) must be shutdown *after* this method has returned: 93 /// See \a ServerBuilder::AddCompletionQueue for details. Shutdown()94 void Shutdown() { 95 ShutdownInternal( 96 g_core_codegen_interface->gpr_inf_future(GPR_CLOCK_MONOTONIC)); 97 } 98 99 /// Block waiting for all work to complete. 100 /// 101 /// \warning The server must be either shutting down or some other thread must 102 /// call \a Shutdown for this function to ever return. 103 virtual void Wait() = 0; 104 105 protected: 106 friend class ::grpc::Service; 107 108 /// Register a service. This call does not take ownership of the service. 109 /// The service must exist for the lifetime of the Server instance. 110 virtual bool RegisterService(const grpc::string* host, Service* service) = 0; 111 112 /// Register a generic service. This call does not take ownership of the 113 /// service. The service must exist for the lifetime of the Server instance. 114 virtual void RegisterAsyncGenericService(AsyncGenericService* service) = 0; 115 116 /// Tries to bind \a server to the given \a addr. 117 /// 118 /// It can be invoked multiple times. 119 /// 120 /// \param addr The address to try to bind to the server (eg, localhost:1234, 121 /// 192.168.1.1:31416, [::1]:27182, etc.). 122 /// \params creds The credentials associated with the server. 123 /// 124 /// \return bound port number on sucess, 0 on failure. 125 /// 126 /// \warning It's an error to call this method on an already started server. 127 virtual int AddListeningPort(const grpc::string& addr, 128 ServerCredentials* creds) = 0; 129 130 /// Start the server. 131 /// 132 /// \param cqs Completion queues for handling asynchronous services. The 133 /// caller is required to keep all completion queues live until the server is 134 /// destroyed. 135 /// \param num_cqs How many completion queues does \a cqs hold. 136 virtual void Start(ServerCompletionQueue** cqs, size_t num_cqs) = 0; 137 138 virtual void ShutdownInternal(gpr_timespec deadline) = 0; 139 140 virtual int max_receive_message_size() const = 0; 141 142 virtual grpc_server* server() = 0; 143 144 virtual void PerformOpsOnCall(internal::CallOpSetInterface* ops, 145 internal::Call* call) = 0; 146 147 class BaseAsyncRequest : public internal::CompletionQueueTag { 148 public: 149 BaseAsyncRequest(ServerInterface* server, ServerContext* context, 150 internal::ServerAsyncStreamingInterface* stream, 151 CompletionQueue* call_cq, void* tag, 152 bool delete_on_finalize); 153 virtual ~BaseAsyncRequest(); 154 155 bool FinalizeResult(void** tag, bool* status) override; 156 157 protected: 158 ServerInterface* const server_; 159 ServerContext* const context_; 160 internal::ServerAsyncStreamingInterface* const stream_; 161 CompletionQueue* const call_cq_; 162 void* const tag_; 163 const bool delete_on_finalize_; 164 grpc_call* call_; 165 }; 166 167 class RegisteredAsyncRequest : public BaseAsyncRequest { 168 public: 169 RegisteredAsyncRequest(ServerInterface* server, ServerContext* context, 170 internal::ServerAsyncStreamingInterface* stream, 171 CompletionQueue* call_cq, void* tag); 172 173 // uses BaseAsyncRequest::FinalizeResult 174 175 protected: 176 void IssueRequest(void* registered_method, grpc_byte_buffer** payload, 177 ServerCompletionQueue* notification_cq); 178 }; 179 180 class NoPayloadAsyncRequest final : public RegisteredAsyncRequest { 181 public: NoPayloadAsyncRequest(void * registered_method,ServerInterface * server,ServerContext * context,internal::ServerAsyncStreamingInterface * stream,CompletionQueue * call_cq,ServerCompletionQueue * notification_cq,void * tag)182 NoPayloadAsyncRequest(void* registered_method, ServerInterface* server, 183 ServerContext* context, 184 internal::ServerAsyncStreamingInterface* stream, 185 CompletionQueue* call_cq, 186 ServerCompletionQueue* notification_cq, void* tag) 187 : RegisteredAsyncRequest(server, context, stream, call_cq, tag) { 188 IssueRequest(registered_method, nullptr, notification_cq); 189 } 190 191 // uses RegisteredAsyncRequest::FinalizeResult 192 }; 193 194 template <class Message> 195 class PayloadAsyncRequest final : public RegisteredAsyncRequest { 196 public: PayloadAsyncRequest(void * registered_method,ServerInterface * server,ServerContext * context,internal::ServerAsyncStreamingInterface * stream,CompletionQueue * call_cq,ServerCompletionQueue * notification_cq,void * tag,Message * request)197 PayloadAsyncRequest(void* registered_method, ServerInterface* server, 198 ServerContext* context, 199 internal::ServerAsyncStreamingInterface* stream, 200 CompletionQueue* call_cq, 201 ServerCompletionQueue* notification_cq, void* tag, 202 Message* request) 203 : RegisteredAsyncRequest(server, context, stream, call_cq, tag), 204 registered_method_(registered_method), 205 server_(server), 206 context_(context), 207 stream_(stream), 208 call_cq_(call_cq), 209 notification_cq_(notification_cq), 210 tag_(tag), 211 request_(request) { 212 IssueRequest(registered_method, payload_.bbuf_ptr(), notification_cq); 213 } 214 ~PayloadAsyncRequest()215 ~PayloadAsyncRequest() { 216 payload_.Release(); // We do not own the payload_ 217 } 218 FinalizeResult(void ** tag,bool * status)219 bool FinalizeResult(void** tag, bool* status) override { 220 if (*status) { 221 if (!payload_.Valid() || !SerializationTraits<Message>::Deserialize( 222 payload_.bbuf_ptr(), request_) 223 .ok()) { 224 // If deserialization fails, we cancel the call and instantiate 225 // a new instance of ourselves to request another call. We then 226 // return false, which prevents the call from being returned to 227 // the application. 228 g_core_codegen_interface->grpc_call_cancel_with_status( 229 call_, GRPC_STATUS_INTERNAL, "Unable to parse request", nullptr); 230 g_core_codegen_interface->grpc_call_unref(call_); 231 new PayloadAsyncRequest(registered_method_, server_, context_, 232 stream_, call_cq_, notification_cq_, tag_, 233 request_); 234 delete this; 235 return false; 236 } 237 } 238 return RegisteredAsyncRequest::FinalizeResult(tag, status); 239 } 240 241 private: 242 void* const registered_method_; 243 ServerInterface* const server_; 244 ServerContext* const context_; 245 internal::ServerAsyncStreamingInterface* const stream_; 246 CompletionQueue* const call_cq_; 247 ServerCompletionQueue* const notification_cq_; 248 void* const tag_; 249 Message* const request_; 250 ByteBuffer payload_; 251 }; 252 253 class GenericAsyncRequest : public BaseAsyncRequest { 254 public: 255 GenericAsyncRequest(ServerInterface* server, GenericServerContext* context, 256 internal::ServerAsyncStreamingInterface* stream, 257 CompletionQueue* call_cq, 258 ServerCompletionQueue* notification_cq, void* tag, 259 bool delete_on_finalize); 260 261 bool FinalizeResult(void** tag, bool* status) override; 262 263 private: 264 grpc_call_details call_details_; 265 }; 266 267 template <class Message> RequestAsyncCall(internal::RpcServiceMethod * method,ServerContext * context,internal::ServerAsyncStreamingInterface * stream,CompletionQueue * call_cq,ServerCompletionQueue * notification_cq,void * tag,Message * message)268 void RequestAsyncCall(internal::RpcServiceMethod* method, 269 ServerContext* context, 270 internal::ServerAsyncStreamingInterface* stream, 271 CompletionQueue* call_cq, 272 ServerCompletionQueue* notification_cq, void* tag, 273 Message* message) { 274 GPR_CODEGEN_ASSERT(method); 275 new PayloadAsyncRequest<Message>(method->server_tag(), this, context, 276 stream, call_cq, notification_cq, tag, 277 message); 278 } 279 RequestAsyncCall(internal::RpcServiceMethod * method,ServerContext * context,internal::ServerAsyncStreamingInterface * stream,CompletionQueue * call_cq,ServerCompletionQueue * notification_cq,void * tag)280 void RequestAsyncCall(internal::RpcServiceMethod* method, 281 ServerContext* context, 282 internal::ServerAsyncStreamingInterface* stream, 283 CompletionQueue* call_cq, 284 ServerCompletionQueue* notification_cq, void* tag) { 285 GPR_CODEGEN_ASSERT(method); 286 new NoPayloadAsyncRequest(method->server_tag(), this, context, stream, 287 call_cq, notification_cq, tag); 288 } 289 RequestAsyncGenericCall(GenericServerContext * context,internal::ServerAsyncStreamingInterface * stream,CompletionQueue * call_cq,ServerCompletionQueue * notification_cq,void * tag)290 void RequestAsyncGenericCall(GenericServerContext* context, 291 internal::ServerAsyncStreamingInterface* stream, 292 CompletionQueue* call_cq, 293 ServerCompletionQueue* notification_cq, 294 void* tag) { 295 new GenericAsyncRequest(this, context, stream, call_cq, notification_cq, 296 tag, true); 297 } 298 }; 299 300 } // namespace grpc 301 302 #endif // GRPCPP_IMPL_CODEGEN_SERVER_INTERFACE_H 303