• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_INTERFACE_H
20 #define GRPCPP_IMPL_CODEGEN_SERVER_INTERFACE_H
21 
22 #include <grpc/impl/codegen/port_platform.h>
23 
24 #include <grpc/impl/codegen/grpc_types.h>
25 #include <grpcpp/impl/codegen/byte_buffer.h>
26 #include <grpcpp/impl/codegen/call.h>
27 #include <grpcpp/impl/codegen/call_hook.h>
28 #include <grpcpp/impl/codegen/completion_queue_tag.h>
29 #include <grpcpp/impl/codegen/core_codegen_interface.h>
30 #include <grpcpp/impl/codegen/interceptor_common.h>
31 #include <grpcpp/impl/codegen/rpc_service_method.h>
32 #include <grpcpp/impl/codegen/server_context_impl.h>
33 
34 namespace grpc_impl {
35 
36 class Channel;
37 class CompletionQueue;
38 class ServerCompletionQueue;
39 class ServerCredentials;
40 }  // namespace grpc_impl
41 namespace grpc {
42 
43 class AsyncGenericService;
44 class GenericServerContext;
45 class Service;
46 
47 extern CoreCodegenInterface* g_core_codegen_interface;
48 
49 /// Models a gRPC server.
50 ///
51 /// Servers are configured and started via \a grpc::ServerBuilder.
52 namespace internal {
53 class ServerAsyncStreamingInterface;
54 }  // namespace internal
55 
56 #ifndef GRPC_CALLBACK_API_NONEXPERIMENTAL
57 namespace experimental {
58 #endif
59 class CallbackGenericService;
60 #ifndef GRPC_CALLBACK_API_NONEXPERIMENTAL
61 }  // namespace experimental
62 #endif
63 
64 namespace experimental {
65 class ServerInterceptorFactoryInterface;
66 }  // namespace experimental
67 
68 class ServerInterface : public internal::CallHook {
69  public:
~ServerInterface()70   virtual ~ServerInterface() {}
71 
72   /// \a Shutdown does the following things:
73   ///
74   /// 1. Shutdown the server: deactivate all listening ports, mark it in
75   ///    "shutdown mode" so that further call Request's or incoming RPC matches
76   ///    are no longer allowed. Also return all Request'ed-but-not-yet-active
77   ///    calls as failed (!ok). This refers to calls that have been requested
78   ///    at the server by the server-side library or application code but that
79   ///    have not yet been matched to incoming RPCs from the client. Note that
80   ///    this would even include default calls added automatically by the gRPC
81   ///    C++ API without the user's input (e.g., "Unimplemented RPC method")
82   ///
83   /// 2. Block until all rpc method handlers invoked automatically by the sync
84   ///    API finish.
85   ///
86   /// 3. If all pending calls complete (and all their operations are
87   ///    retrieved by Next) before \a deadline expires, this finishes
88   ///    gracefully. Otherwise, forcefully cancel all pending calls associated
89   ///    with the server after \a deadline expires. In the case of the sync API,
90   ///    if the RPC function for a streaming call has already been started and
91   ///    takes a week to complete, the RPC function won't be forcefully
92   ///    terminated (since that would leave state corrupt and incomplete) and
93   ///    the method handler will just keep running (which will prevent the
94   ///    server from completing the "join" operation that it needs to do at
95   ///    shutdown time).
96   ///
97   /// All completion queue associated with the server (for example, for async
98   /// serving) must be shutdown *after* this method has returned:
99   /// See \a ServerBuilder::AddCompletionQueue for details.
100   /// They must also be drained (by repeated Next) after being shutdown.
101   ///
102   /// \param deadline How long to wait until pending rpcs are forcefully
103   /// terminated.
104   template <class T>
Shutdown(const T & deadline)105   void Shutdown(const T& deadline) {
106     ShutdownInternal(TimePoint<T>(deadline).raw_time());
107   }
108 
109   /// Shutdown the server without a deadline and forced cancellation.
110   ///
111   /// All completion queue associated with the server (for example, for async
112   /// serving) must be shutdown *after* this method has returned:
113   /// See \a ServerBuilder::AddCompletionQueue for details.
Shutdown()114   void Shutdown() {
115     ShutdownInternal(
116         g_core_codegen_interface->gpr_inf_future(GPR_CLOCK_MONOTONIC));
117   }
118 
119   /// Block waiting for all work to complete.
120   ///
121   /// \warning The server must be either shutting down or some other thread must
122   /// call \a Shutdown for this function to ever return.
123   virtual void Wait() = 0;
124 
125  protected:
126   friend class ::grpc::Service;
127 
128   /// Register a service. This call does not take ownership of the service.
129   /// The service must exist for the lifetime of the Server instance.
130   virtual bool RegisterService(const std::string* host, Service* service) = 0;
131 
132   /// Register a generic service. This call does not take ownership of the
133   /// service. The service must exist for the lifetime of the Server instance.
134   virtual void RegisterAsyncGenericService(AsyncGenericService* service) = 0;
135 
136 #ifdef GRPC_CALLBACK_API_NONEXPERIMENTAL
137   /// Register a callback generic service. This call does not take ownership of
138   /// the  service. The service must exist for the lifetime of the Server
139   /// instance. May not be abstract since this is a post-1.0 API addition.
140 
RegisterCallbackGenericService(CallbackGenericService *)141   virtual void RegisterCallbackGenericService(CallbackGenericService*
142                                               /*service*/) {}
143 #else
144   /// NOTE: class experimental_registration_interface is not part of the public
145   /// API of this class
146   /// TODO(vjpai): Move these contents to public API when no longer experimental
147   class experimental_registration_interface {
148    public:
~experimental_registration_interface()149     virtual ~experimental_registration_interface() {}
150     /// May not be abstract since this is a post-1.0 API addition
RegisterCallbackGenericService(experimental::CallbackGenericService *)151     virtual void RegisterCallbackGenericService(
152         experimental::CallbackGenericService* /*service*/) {}
153   };
154 
155   /// NOTE: The function experimental_registration() is not stable public API.
156   /// It is a view to the experimental components of this class. It may be
157   /// changed or removed at any time. May not be abstract since this is a
158   /// post-1.0 API addition
experimental_registration()159   virtual experimental_registration_interface* experimental_registration() {
160     return nullptr;
161   }
162 #endif
163 
164   /// Tries to bind \a server to the given \a addr.
165   ///
166   /// It can be invoked multiple times.
167   ///
168   /// \param addr The address to try to bind to the server (eg, localhost:1234,
169   /// 192.168.1.1:31416, [::1]:27182, etc.).
170   /// \params creds The credentials associated with the server.
171   ///
172   /// \return bound port number on success, 0 on failure.
173   ///
174   /// \warning It's an error to call this method on an already started server.
175   virtual int AddListeningPort(const std::string& addr,
176                                grpc_impl::ServerCredentials* creds) = 0;
177 
178   /// Start the server.
179   ///
180   /// \param cqs Completion queues for handling asynchronous services. The
181   /// caller is required to keep all completion queues live until the server is
182   /// destroyed.
183   /// \param num_cqs How many completion queues does \a cqs hold.
184   virtual void Start(::grpc_impl::ServerCompletionQueue** cqs,
185                      size_t num_cqs) = 0;
186 
187   virtual void ShutdownInternal(gpr_timespec deadline) = 0;
188 
189   virtual int max_receive_message_size() const = 0;
190 
191   virtual grpc_server* server() = 0;
192 
193   virtual void PerformOpsOnCall(internal::CallOpSetInterface* ops,
194                                 internal::Call* call) = 0;
195 
196   class BaseAsyncRequest : public internal::CompletionQueueTag {
197    public:
198     BaseAsyncRequest(ServerInterface* server,
199                      ::grpc_impl::ServerContext* context,
200                      internal::ServerAsyncStreamingInterface* stream,
201                      ::grpc_impl::CompletionQueue* call_cq,
202                      ::grpc_impl::ServerCompletionQueue* notification_cq,
203                      void* tag, bool delete_on_finalize);
204     virtual ~BaseAsyncRequest();
205 
206     bool FinalizeResult(void** tag, bool* status) override;
207 
208    private:
209     void ContinueFinalizeResultAfterInterception();
210 
211    protected:
212     ServerInterface* const server_;
213     ::grpc_impl::ServerContext* const context_;
214     internal::ServerAsyncStreamingInterface* const stream_;
215     ::grpc_impl::CompletionQueue* const call_cq_;
216     ::grpc_impl::ServerCompletionQueue* const notification_cq_;
217     void* const tag_;
218     const bool delete_on_finalize_;
219     grpc_call* call_;
220     internal::Call call_wrapper_;
221     internal::InterceptorBatchMethodsImpl interceptor_methods_;
222     bool done_intercepting_;
223   };
224 
225   /// RegisteredAsyncRequest is not part of the C++ API
226   class RegisteredAsyncRequest : public BaseAsyncRequest {
227    public:
228     RegisteredAsyncRequest(ServerInterface* server,
229                            ::grpc_impl::ServerContext* context,
230                            internal::ServerAsyncStreamingInterface* stream,
231                            ::grpc_impl::CompletionQueue* call_cq,
232                            ::grpc_impl::ServerCompletionQueue* notification_cq,
233                            void* tag, const char* name,
234                            internal::RpcMethod::RpcType type);
235 
FinalizeResult(void ** tag,bool * status)236     virtual bool FinalizeResult(void** tag, bool* status) override {
237       /* If we are done intercepting, then there is nothing more for us to do */
238       if (done_intercepting_) {
239         return BaseAsyncRequest::FinalizeResult(tag, status);
240       }
241       call_wrapper_ = ::grpc::internal::Call(
242           call_, server_, call_cq_, server_->max_receive_message_size(),
243           context_->set_server_rpc_info(name_, type_,
244                                         *server_->interceptor_creators()));
245       return BaseAsyncRequest::FinalizeResult(tag, status);
246     }
247 
248    protected:
249     void IssueRequest(void* registered_method, grpc_byte_buffer** payload,
250                       ::grpc_impl::ServerCompletionQueue* notification_cq);
251     const char* name_;
252     const internal::RpcMethod::RpcType type_;
253   };
254 
255   class NoPayloadAsyncRequest final : public RegisteredAsyncRequest {
256    public:
NoPayloadAsyncRequest(internal::RpcServiceMethod * registered_method,ServerInterface * server,::grpc_impl::ServerContext * context,internal::ServerAsyncStreamingInterface * stream,::grpc_impl::CompletionQueue * call_cq,::grpc_impl::ServerCompletionQueue * notification_cq,void * tag)257     NoPayloadAsyncRequest(internal::RpcServiceMethod* registered_method,
258                           ServerInterface* server,
259                           ::grpc_impl::ServerContext* context,
260                           internal::ServerAsyncStreamingInterface* stream,
261                           ::grpc_impl::CompletionQueue* call_cq,
262                           ::grpc_impl::ServerCompletionQueue* notification_cq,
263                           void* tag)
264         : RegisteredAsyncRequest(
265               server, context, stream, call_cq, notification_cq, tag,
266               registered_method->name(), registered_method->method_type()) {
267       IssueRequest(registered_method->server_tag(), nullptr, notification_cq);
268     }
269 
270     // uses RegisteredAsyncRequest::FinalizeResult
271   };
272 
273   template <class Message>
274   class PayloadAsyncRequest final : public RegisteredAsyncRequest {
275    public:
PayloadAsyncRequest(internal::RpcServiceMethod * registered_method,ServerInterface * server,::grpc_impl::ServerContext * context,internal::ServerAsyncStreamingInterface * stream,::grpc_impl::CompletionQueue * call_cq,::grpc_impl::ServerCompletionQueue * notification_cq,void * tag,Message * request)276     PayloadAsyncRequest(internal::RpcServiceMethod* registered_method,
277                         ServerInterface* server,
278                         ::grpc_impl::ServerContext* context,
279                         internal::ServerAsyncStreamingInterface* stream,
280                         ::grpc_impl::CompletionQueue* call_cq,
281                         ::grpc_impl::ServerCompletionQueue* notification_cq,
282                         void* tag, Message* request)
283         : RegisteredAsyncRequest(
284               server, context, stream, call_cq, notification_cq, tag,
285               registered_method->name(), registered_method->method_type()),
286           registered_method_(registered_method),
287           request_(request) {
288       IssueRequest(registered_method->server_tag(), payload_.bbuf_ptr(),
289                    notification_cq);
290     }
291 
~PayloadAsyncRequest()292     ~PayloadAsyncRequest() {
293       payload_.Release();  // We do not own the payload_
294     }
295 
FinalizeResult(void ** tag,bool * status)296     bool FinalizeResult(void** tag, bool* status) override {
297       /* If we are done intercepting, then there is nothing more for us to do */
298       if (done_intercepting_) {
299         return RegisteredAsyncRequest::FinalizeResult(tag, status);
300       }
301       if (*status) {
302         if (!payload_.Valid() || !SerializationTraits<Message>::Deserialize(
303                                       payload_.bbuf_ptr(), request_)
304                                       .ok()) {
305           // If deserialization fails, we cancel the call and instantiate
306           // a new instance of ourselves to request another call.  We then
307           // return false, which prevents the call from being returned to
308           // the application.
309           g_core_codegen_interface->grpc_call_cancel_with_status(
310               call_, GRPC_STATUS_INTERNAL, "Unable to parse request", nullptr);
311           g_core_codegen_interface->grpc_call_unref(call_);
312           new PayloadAsyncRequest(registered_method_, server_, context_,
313                                   stream_, call_cq_, notification_cq_, tag_,
314                                   request_);
315           delete this;
316           return false;
317         }
318       }
319       /* Set interception point for recv message */
320       interceptor_methods_.AddInterceptionHookPoint(
321           experimental::InterceptionHookPoints::POST_RECV_MESSAGE);
322       interceptor_methods_.SetRecvMessage(request_, nullptr);
323       return RegisteredAsyncRequest::FinalizeResult(tag, status);
324     }
325 
326    private:
327     internal::RpcServiceMethod* const registered_method_;
328     Message* const request_;
329     ByteBuffer payload_;
330   };
331 
332   class GenericAsyncRequest : public BaseAsyncRequest {
333    public:
334     GenericAsyncRequest(ServerInterface* server, GenericServerContext* context,
335                         internal::ServerAsyncStreamingInterface* stream,
336                         ::grpc_impl::CompletionQueue* call_cq,
337                         ::grpc_impl::ServerCompletionQueue* notification_cq,
338                         void* tag, bool delete_on_finalize);
339 
340     bool FinalizeResult(void** tag, bool* status) override;
341 
342    private:
343     grpc_call_details call_details_;
344   };
345 
346   template <class Message>
RequestAsyncCall(internal::RpcServiceMethod * method,::grpc_impl::ServerContext * context,internal::ServerAsyncStreamingInterface * stream,::grpc_impl::CompletionQueue * call_cq,::grpc_impl::ServerCompletionQueue * notification_cq,void * tag,Message * message)347   void RequestAsyncCall(internal::RpcServiceMethod* method,
348                         ::grpc_impl::ServerContext* context,
349                         internal::ServerAsyncStreamingInterface* stream,
350                         ::grpc_impl::CompletionQueue* call_cq,
351                         ::grpc_impl::ServerCompletionQueue* notification_cq,
352                         void* tag, Message* message) {
353     GPR_CODEGEN_ASSERT(method);
354     new PayloadAsyncRequest<Message>(method, this, context, stream, call_cq,
355                                      notification_cq, tag, message);
356   }
357 
RequestAsyncCall(internal::RpcServiceMethod * method,::grpc_impl::ServerContext * context,internal::ServerAsyncStreamingInterface * stream,::grpc_impl::CompletionQueue * call_cq,::grpc_impl::ServerCompletionQueue * notification_cq,void * tag)358   void RequestAsyncCall(internal::RpcServiceMethod* method,
359                         ::grpc_impl::ServerContext* context,
360                         internal::ServerAsyncStreamingInterface* stream,
361                         ::grpc_impl::CompletionQueue* call_cq,
362                         ::grpc_impl::ServerCompletionQueue* notification_cq,
363                         void* tag) {
364     GPR_CODEGEN_ASSERT(method);
365     new NoPayloadAsyncRequest(method, this, context, stream, call_cq,
366                               notification_cq, tag);
367   }
368 
RequestAsyncGenericCall(GenericServerContext * context,internal::ServerAsyncStreamingInterface * stream,::grpc_impl::CompletionQueue * call_cq,::grpc_impl::ServerCompletionQueue * notification_cq,void * tag)369   void RequestAsyncGenericCall(
370       GenericServerContext* context,
371       internal::ServerAsyncStreamingInterface* stream,
372       ::grpc_impl::CompletionQueue* call_cq,
373       ::grpc_impl::ServerCompletionQueue* notification_cq, void* tag) {
374     new GenericAsyncRequest(this, context, stream, call_cq, notification_cq,
375                             tag, true);
376   }
377 
378  private:
379   // EXPERIMENTAL
380   // Getter method for the vector of interceptor factory objects.
381   // Returns a nullptr (rather than being pure) since this is a post-1.0 method
382   // and adding a new pure method to an interface would be a breaking change
383   // (even though this is private and non-API)
384   virtual std::vector<
385       std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>*
interceptor_creators()386   interceptor_creators() {
387     return nullptr;
388   }
389 
390   // EXPERIMENTAL
391   // A method to get the callbackable completion queue associated with this
392   // server. If the return value is nullptr, this server doesn't support
393   // callback operations.
394   // TODO(vjpai): Consider a better default like using a global CQ
395   // Returns nullptr (rather than being pure) since this is a post-1.0 method
396   // and adding a new pure method to an interface would be a breaking change
397   // (even though this is private and non-API)
CallbackCQ()398   virtual ::grpc_impl::CompletionQueue* CallbackCQ() { return nullptr; }
399 };
400 
401 }  // namespace grpc
402 
403 #endif  // GRPCPP_IMPL_CODEGEN_SERVER_INTERFACE_H
404