• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #ifndef GRPCPP_SERVER_INTERFACE_H
20 #define GRPCPP_SERVER_INTERFACE_H
21 
22 #include <grpc/grpc.h>
23 #include <grpc/impl/grpc_types.h>
24 #include <grpc/support/port_platform.h>
25 #include <grpc/support/time.h>
26 #include <grpcpp/impl/call.h>
27 #include <grpcpp/impl/call_hook.h>
28 #include <grpcpp/impl/codegen/interceptor_common.h>
29 #include <grpcpp/impl/completion_queue_tag.h>
30 #include <grpcpp/impl/rpc_service_method.h>
31 #include <grpcpp/server_context.h>
32 #include <grpcpp/support/byte_buffer.h>
33 
34 #include "absl/log/absl_check.h"
35 
36 namespace grpc {
37 
38 class AsyncGenericService;
39 class Channel;
40 class CompletionQueue;
41 class GenericServerContext;
42 class ServerCompletionQueue;
43 class ServerCredentials;
44 class Service;
45 
46 /// Models a gRPC server.
47 ///
48 /// Servers are configured and started via \a grpc::ServerBuilder.
49 namespace internal {
50 class ServerAsyncStreamingInterface;
51 }  // namespace internal
52 
53 class CallbackGenericService;
54 
55 namespace experimental {
56 class ServerInterceptorFactoryInterface;
57 class ServerMetricRecorder;
58 }  // namespace experimental
59 
60 class ServerInterface : public internal::CallHook {
61  public:
~ServerInterface()62   ~ServerInterface() override {}
63 
64   /// \a Shutdown does the following things:
65   ///
66   /// 1. Shutdown the server: deactivate all listening ports, mark it in
67   ///    "shutdown mode" so that further call Request's or incoming RPC matches
68   ///    are no longer allowed. Also return all Request'ed-but-not-yet-active
69   ///    calls as failed (!ok). This refers to calls that have been requested
70   ///    at the server by the server-side library or application code but that
71   ///    have not yet been matched to incoming RPCs from the client. Note that
72   ///    this would even include default calls added automatically by the gRPC
73   ///    C++ API without the user's input (e.g., "Unimplemented RPC method")
74   ///
75   /// 2. Block until all rpc method handlers invoked automatically by the sync
76   ///    API finish.
77   ///
78   /// 3. If all pending calls complete (and all their operations are
79   ///    retrieved by Next) before \a deadline expires, this finishes
80   ///    gracefully. Otherwise, forcefully cancel all pending calls associated
81   ///    with the server after \a deadline expires. In the case of the sync API,
82   ///    if the RPC function for a streaming call has already been started and
83   ///    takes a week to complete, the RPC function won't be forcefully
84   ///    terminated (since that would leave state corrupt and incomplete) and
85   ///    the method handler will just keep running (which will prevent the
86   ///    server from completing the "join" operation that it needs to do at
87   ///    shutdown time).
88   ///
89   /// All completion queues associated with the server (for example, for async
90   /// serving) must be shutdown *after* this method has returned:
91   /// See \a ServerBuilder::AddCompletionQueue for details.
92   /// They must also be drained (by repeated Next) after being shutdown.
93   ///
94   /// \param deadline How long to wait until pending rpcs are forcefully
95   /// terminated.
96   template <class T>
Shutdown(const T & deadline)97   void Shutdown(const T& deadline) {
98     ShutdownInternal(TimePoint<T>(deadline).raw_time());
99   }
100 
101   /// Shutdown the server without a deadline and forced cancellation.
102   ///
103   /// All completion queues associated with the server (for example, for async
104   /// serving) must be shutdown *after* this method has returned:
105   /// See \a ServerBuilder::AddCompletionQueue for details.
Shutdown()106   void Shutdown() { ShutdownInternal(gpr_inf_future(GPR_CLOCK_MONOTONIC)); }
107 
108   /// Block waiting for all work to complete.
109   ///
110   /// \warning The server must be either shutting down or some other thread must
111   /// call \a Shutdown for this function to ever return.
112   virtual void Wait() = 0;
113 
114  protected:
115   friend class grpc::Service;
116 
117   /// Register a service. This call does not take ownership of the service.
118   /// The service must exist for the lifetime of the Server instance.
119   virtual bool RegisterService(const std::string* host, Service* service) = 0;
120 
121   /// Register a generic service. This call does not take ownership of the
122   /// service. The service must exist for the lifetime of the Server instance.
123   virtual void RegisterAsyncGenericService(AsyncGenericService* service) = 0;
124 
125   /// Register a callback generic service. This call does not take ownership of
126   /// the service. The service must exist for the lifetime of the Server
127   /// instance. May not be abstract since this is a post-1.0 API addition.
128 
RegisterCallbackGenericService(CallbackGenericService *)129   virtual void RegisterCallbackGenericService(CallbackGenericService*
130                                               /*service*/) {}
131 
132   /// Tries to bind \a server to the given \a addr.
133   ///
134   /// It can be invoked multiple times.
135   ///
136   /// \param addr The address to try to bind to the server (eg, localhost:1234,
137   /// 192.168.1.1:31416, [::1]:27182, etc.).
138   /// \param creds The credentials associated with the server.
139   ///
140   /// \return bound port number on success, 0 on failure.
141   ///
142   /// \warning It's an error to call this method on an already started server.
143   virtual int AddListeningPort(const std::string& addr,
144                                ServerCredentials* creds) = 0;
145 
146   /// Start the server.
147   ///
148   /// \param cqs Completion queues for handling asynchronous services. The
149   /// caller is required to keep all completion queues live until the server is
150   /// destroyed.
151   /// \param num_cqs How many completion queues does \a cqs hold.
152   virtual void Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) = 0;
153 
154   virtual void ShutdownInternal(gpr_timespec deadline) = 0;
155 
156   virtual int max_receive_message_size() const = 0;
157 
158   virtual grpc_server* server() = 0;
159 
160   void PerformOpsOnCall(internal::CallOpSetInterface* ops,
161                         internal::Call* call) override = 0;
162 
163   class BaseAsyncRequest : public internal::CompletionQueueTag {
164    public:
165     BaseAsyncRequest(ServerInterface* server, grpc::ServerContext* context,
166                      internal::ServerAsyncStreamingInterface* stream,
167                      grpc::CompletionQueue* call_cq,
168                      grpc::ServerCompletionQueue* notification_cq, void* tag,
169                      bool delete_on_finalize);
170     ~BaseAsyncRequest() override;
171 
172     bool FinalizeResult(void** tag, bool* status) override;
173 
174    private:
175     void ContinueFinalizeResultAfterInterception();
176 
177    protected:
178     ServerInterface* const server_;
179     grpc::ServerContext* const context_;
180     internal::ServerAsyncStreamingInterface* const stream_;
181     grpc::CompletionQueue* const call_cq_;
182     grpc::ServerCompletionQueue* const notification_cq_;
183     void* const tag_;
184     const bool delete_on_finalize_;
185     grpc_call* call_;
186     internal::Call call_wrapper_;
187     internal::InterceptorBatchMethodsImpl interceptor_methods_;
188     bool done_intercepting_;
189     bool call_metric_recording_enabled_;
190     experimental::ServerMetricRecorder* server_metric_recorder_;
191   };
192 
193   /// RegisteredAsyncRequest is not part of the C++ API
194   class RegisteredAsyncRequest : public BaseAsyncRequest {
195    public:
196     RegisteredAsyncRequest(ServerInterface* server,
197                            grpc::ServerContext* context,
198                            internal::ServerAsyncStreamingInterface* stream,
199                            grpc::CompletionQueue* call_cq,
200                            grpc::ServerCompletionQueue* notification_cq,
201                            void* tag, const char* name,
202                            internal::RpcMethod::RpcType type);
203 
FinalizeResult(void ** tag,bool * status)204     bool FinalizeResult(void** tag, bool* status) override {
205       // If we are done intercepting, then there is nothing more for us to do
206       if (done_intercepting_) {
207         return BaseAsyncRequest::FinalizeResult(tag, status);
208       }
209       call_wrapper_ = grpc::internal::Call(
210           call_, server_, call_cq_, server_->max_receive_message_size(),
211           context_->set_server_rpc_info(name_, type_,
212                                         *server_->interceptor_creators()));
213       return BaseAsyncRequest::FinalizeResult(tag, status);
214     }
215 
216    protected:
217     void IssueRequest(void* registered_method, grpc_byte_buffer** payload,
218                       grpc::ServerCompletionQueue* notification_cq);
219     const char* name_;
220     const internal::RpcMethod::RpcType type_;
221   };
222 
223   class NoPayloadAsyncRequest final : public RegisteredAsyncRequest {
224    public:
NoPayloadAsyncRequest(internal::RpcServiceMethod * registered_method,ServerInterface * server,grpc::ServerContext * context,internal::ServerAsyncStreamingInterface * stream,grpc::CompletionQueue * call_cq,grpc::ServerCompletionQueue * notification_cq,void * tag)225     NoPayloadAsyncRequest(internal::RpcServiceMethod* registered_method,
226                           ServerInterface* server, grpc::ServerContext* context,
227                           internal::ServerAsyncStreamingInterface* stream,
228                           grpc::CompletionQueue* call_cq,
229                           grpc::ServerCompletionQueue* notification_cq,
230                           void* tag)
231         : RegisteredAsyncRequest(
232               server, context, stream, call_cq, notification_cq, tag,
233               registered_method->name(), registered_method->method_type()) {
234       IssueRequest(registered_method->server_tag(), nullptr, notification_cq);
235     }
236 
237     // uses RegisteredAsyncRequest::FinalizeResult
238   };
239 
240   template <class Message>
241   class PayloadAsyncRequest final : public RegisteredAsyncRequest {
242    public:
PayloadAsyncRequest(internal::RpcServiceMethod * registered_method,ServerInterface * server,grpc::ServerContext * context,internal::ServerAsyncStreamingInterface * stream,grpc::CompletionQueue * call_cq,grpc::ServerCompletionQueue * notification_cq,void * tag,Message * request)243     PayloadAsyncRequest(internal::RpcServiceMethod* registered_method,
244                         ServerInterface* server, grpc::ServerContext* context,
245                         internal::ServerAsyncStreamingInterface* stream,
246                         grpc::CompletionQueue* call_cq,
247                         grpc::ServerCompletionQueue* notification_cq, void* tag,
248                         Message* request)
249         : RegisteredAsyncRequest(
250               server, context, stream, call_cq, notification_cq, tag,
251               registered_method->name(), registered_method->method_type()),
252           registered_method_(registered_method),
253           request_(request) {
254       IssueRequest(registered_method->server_tag(), payload_.bbuf_ptr(),
255                    notification_cq);
256     }
257 
~PayloadAsyncRequest()258     ~PayloadAsyncRequest() override {
259       payload_.Release();  // We do not own the payload_
260     }
261 
FinalizeResult(void ** tag,bool * status)262     bool FinalizeResult(void** tag, bool* status) override {
263       // If we are done intercepting, then there is nothing more for us to do
264       if (done_intercepting_) {
265         return RegisteredAsyncRequest::FinalizeResult(tag, status);
266       }
267       if (*status) {
268         if (!payload_.Valid() || !SerializationTraits<Message>::Deserialize(
269                                       payload_.bbuf_ptr(), request_)
270                                       .ok()) {
271           // If deserialization fails, we cancel the call and instantiate
272           // a new instance of ourselves to request another call.  We then
273           // return false, which prevents the call from being returned to
274           // the application.
275           grpc_call_cancel_with_status(call_, GRPC_STATUS_INTERNAL,
276                                        "Unable to parse request", nullptr);
277           grpc_call_unref(call_);
278           new PayloadAsyncRequest(registered_method_, server_, context_,
279                                   stream_, call_cq_, notification_cq_, tag_,
280                                   request_);
281           delete this;
282           return false;
283         }
284       }
285       // Set interception point for recv message
286       interceptor_methods_.AddInterceptionHookPoint(
287           experimental::InterceptionHookPoints::POST_RECV_MESSAGE);
288       interceptor_methods_.SetRecvMessage(request_, nullptr);
289       return RegisteredAsyncRequest::FinalizeResult(tag, status);
290     }
291 
292    private:
293     internal::RpcServiceMethod* const registered_method_;
294     Message* const request_;
295     ByteBuffer payload_;
296   };
297 
298   class GenericAsyncRequest : public BaseAsyncRequest {
299    public:
300     GenericAsyncRequest(ServerInterface* server, GenericServerContext* context,
301                         internal::ServerAsyncStreamingInterface* stream,
302                         grpc::CompletionQueue* call_cq,
303                         grpc::ServerCompletionQueue* notification_cq, void* tag,
304                         bool delete_on_finalize, bool issue_request = true);
305 
306     bool FinalizeResult(void** tag, bool* status) override;
307 
308    protected:
309     void IssueRequest();
310 
311    private:
312     grpc_call_details call_details_;
313   };
314 
315   template <class Message>
RequestAsyncCall(internal::RpcServiceMethod * method,grpc::ServerContext * context,internal::ServerAsyncStreamingInterface * stream,grpc::CompletionQueue * call_cq,grpc::ServerCompletionQueue * notification_cq,void * tag,Message * message)316   void RequestAsyncCall(internal::RpcServiceMethod* method,
317                         grpc::ServerContext* context,
318                         internal::ServerAsyncStreamingInterface* stream,
319                         grpc::CompletionQueue* call_cq,
320                         grpc::ServerCompletionQueue* notification_cq, void* tag,
321                         Message* message) {
322     ABSL_CHECK(method);
323     new PayloadAsyncRequest<Message>(method, this, context, stream, call_cq,
324                                      notification_cq, tag, message);
325   }
326 
RequestAsyncCall(internal::RpcServiceMethod * method,grpc::ServerContext * context,internal::ServerAsyncStreamingInterface * stream,grpc::CompletionQueue * call_cq,grpc::ServerCompletionQueue * notification_cq,void * tag)327   void RequestAsyncCall(internal::RpcServiceMethod* method,
328                         grpc::ServerContext* context,
329                         internal::ServerAsyncStreamingInterface* stream,
330                         grpc::CompletionQueue* call_cq,
331                         grpc::ServerCompletionQueue* notification_cq,
332                         void* tag) {
333     ABSL_CHECK(method);
334     new NoPayloadAsyncRequest(method, this, context, stream, call_cq,
335                               notification_cq, tag);
336   }
337 
RequestAsyncGenericCall(GenericServerContext * context,internal::ServerAsyncStreamingInterface * stream,grpc::CompletionQueue * call_cq,grpc::ServerCompletionQueue * notification_cq,void * tag)338   void RequestAsyncGenericCall(GenericServerContext* context,
339                                internal::ServerAsyncStreamingInterface* stream,
340                                grpc::CompletionQueue* call_cq,
341                                grpc::ServerCompletionQueue* notification_cq,
342                                void* tag) {
343     new GenericAsyncRequest(this, context, stream, call_cq, notification_cq,
344                             tag, true);
345   }
346 
347  private:
348   // EXPERIMENTAL
349   // Getter method for the vector of interceptor factory objects.
350   // Returns a nullptr (rather than being pure) since this is a post-1.0 method
351   // and adding a new pure method to an interface would be a breaking change
352   // (even though this is private and non-API)
353   virtual std::vector<
354       std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>*
interceptor_creators()355   interceptor_creators() {
356     return nullptr;
357   }
358 
359   // Whether per-call load reporting is enabled.
360   virtual bool call_metric_recording_enabled() const = 0;
361 
362   // Interface to read or update server-wide metrics. Returns null when not set.
363   virtual experimental::ServerMetricRecorder* server_metric_recorder()
364       const = 0;
365 
366   // A method to get the callbackable completion queue associated with this
367   // server. If the return value is nullptr, this server doesn't support
368   // callback operations.
369   // TODO(vjpai): Consider a better default like using a global CQ
370   // Returns nullptr (rather than being pure) since this is a post-1.0 method
371   // and adding a new pure method to an interface would be a breaking change
372   // (even though this is private and non-API)
CallbackCQ()373   virtual grpc::CompletionQueue* CallbackCQ() { return nullptr; }
374 };
375 
376 }  // namespace grpc
377 
378 #endif  // GRPCPP_SERVER_INTERFACE_H
379