/*
 *
 * Copyright 2016 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
18 
#ifndef GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H
#define GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H

#include <atomic>
#include <functional>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <utility>

#include <grpc/support/log.h>
#include <grpcpp/grpcpp.h>
#include <grpcpp/health_check_service_interface.h>
#include <grpcpp/impl/codegen/async_generic_service.h>
#include <grpcpp/impl/codegen/async_unary_call.h>
#include <grpcpp/impl/codegen/completion_queue.h>
#include <grpcpp/impl/codegen/service_type.h>
#include <grpcpp/support/byte_buffer.h>

#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/thd.h"
36 
37 namespace grpc {
38 
39 // Default implementation of HealthCheckServiceInterface. Server will create and
40 // own it.
41 class DefaultHealthCheckService final : public HealthCheckServiceInterface {
42  public:
43   enum ServingStatus { NOT_FOUND, SERVING, NOT_SERVING };
44 
45   // The service impl to register with the server.
46   class HealthCheckServiceImpl : public Service {
47    public:
48     // Base class for call handlers.
49     class CallHandler {
50      public:
51       virtual ~CallHandler() = default;
52       virtual void SendHealth(std::shared_ptr<CallHandler> self,
53                               ServingStatus status) = 0;
54     };
55 
56     HealthCheckServiceImpl(DefaultHealthCheckService* database,
57                            std::unique_ptr<ServerCompletionQueue> cq);
58 
59     ~HealthCheckServiceImpl() override;
60 
61     void StartServingThread();
62 
63    private:
64     // A tag that can be called with a bool argument. It's tailored for
65     // CallHandler's use. Before being used, it should be constructed with a
66     // method of CallHandler and a shared pointer to the handler. The
67     // shared pointer will be moved to the invoked function and the function
68     // can only be invoked once. That makes ref counting of the handler easier,
69     // because the shared pointer is not bound to the function and can be gone
70     // once the invoked function returns (if not used any more).
71     class CallableTag {
72      public:
73       using HandlerFunction =
74           std::function<void(std::shared_ptr<CallHandler>, bool)>;
75 
CallableTag()76       CallableTag() {}
77 
CallableTag(HandlerFunction func,std::shared_ptr<CallHandler> handler)78       CallableTag(HandlerFunction func, std::shared_ptr<CallHandler> handler)
79           : handler_function_(std::move(func)), handler_(std::move(handler)) {
80         GPR_ASSERT(handler_function_ != nullptr);
81         GPR_ASSERT(handler_ != nullptr);
82       }
83 
84       // Runs the tag. This should be called only once. The handler is no
85       // longer owned by this tag after this method is invoked.
Run(bool ok)86       void Run(bool ok) {
87         GPR_ASSERT(handler_function_ != nullptr);
88         GPR_ASSERT(handler_ != nullptr);
89         handler_function_(std::move(handler_), ok);
90       }
91 
92       // Releases and returns the shared pointer to the handler.
ReleaseHandler()93       std::shared_ptr<CallHandler> ReleaseHandler() {
94         return std::move(handler_);
95       }
96 
97      private:
98       HandlerFunction handler_function_ = nullptr;
99       std::shared_ptr<CallHandler> handler_;
100     };
101 
102     // Call handler for Check method.
103     // Each handler takes care of one call. It contains per-call data and it
104     // will access the members of the parent class (i.e.,
105     // DefaultHealthCheckService) for per-service health data.
106     class CheckCallHandler : public CallHandler {
107      public:
108       // Instantiates a CheckCallHandler and requests the next health check
109       // call. The handler object will manage its own lifetime, so no action is
110       // needed from the caller any more regarding that object.
111       static void CreateAndStart(ServerCompletionQueue* cq,
112                                  DefaultHealthCheckService* database,
113                                  HealthCheckServiceImpl* service);
114 
115       // This ctor is public because we want to use std::make_shared<> in
116       // CreateAndStart(). This ctor shouldn't be used elsewhere.
117       CheckCallHandler(ServerCompletionQueue* cq,
118                        DefaultHealthCheckService* database,
119                        HealthCheckServiceImpl* service);
120 
121       // Not used for Check.
SendHealth(std::shared_ptr<CallHandler>,ServingStatus)122       void SendHealth(std::shared_ptr<CallHandler> /*self*/,
123                       ServingStatus /*status*/) override {}
124 
125      private:
126       // Called when we receive a call.
127       // Spawns a new handler so that we can keep servicing future calls.
128       void OnCallReceived(std::shared_ptr<CallHandler> self, bool ok);
129 
130       // Called when Finish() is done.
131       void OnFinishDone(std::shared_ptr<CallHandler> self, bool ok);
132 
133       // The members passed down from HealthCheckServiceImpl.
134       ServerCompletionQueue* cq_;
135       DefaultHealthCheckService* database_;
136       HealthCheckServiceImpl* service_;
137 
138       ByteBuffer request_;
139       GenericServerAsyncResponseWriter writer_;
140       ServerContext ctx_;
141 
142       CallableTag next_;
143     };
144 
145     // Call handler for Watch method.
146     // Each handler takes care of one call. It contains per-call data and it
147     // will access the members of the parent class (i.e.,
148     // DefaultHealthCheckService) for per-service health data.
149     class WatchCallHandler : public CallHandler {
150      public:
151       // Instantiates a WatchCallHandler and requests the next health check
152       // call. The handler object will manage its own lifetime, so no action is
153       // needed from the caller any more regarding that object.
154       static void CreateAndStart(ServerCompletionQueue* cq,
155                                  DefaultHealthCheckService* database,
156                                  HealthCheckServiceImpl* service);
157 
158       // This ctor is public because we want to use std::make_shared<> in
159       // CreateAndStart(). This ctor shouldn't be used elsewhere.
160       WatchCallHandler(ServerCompletionQueue* cq,
161                        DefaultHealthCheckService* database,
162                        HealthCheckServiceImpl* service);
163 
164       void SendHealth(std::shared_ptr<CallHandler> self,
165                       ServingStatus status) override;
166 
167      private:
168       // Called when we receive a call.
169       // Spawns a new handler so that we can keep servicing future calls.
170       void OnCallReceived(std::shared_ptr<CallHandler> self, bool ok);
171 
172       // Requires holding send_mu_.
173       void SendHealthLocked(std::shared_ptr<CallHandler> self,
174                             ServingStatus status);
175 
176       // When sending a health result finishes.
177       void OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok);
178 
179       void SendFinish(std::shared_ptr<CallHandler> self, const Status& status);
180 
181       // Requires holding service_->cq_shutdown_mu_.
182       void SendFinishLocked(std::shared_ptr<CallHandler> self,
183                             const Status& status);
184 
185       // Called when Finish() is done.
186       void OnFinishDone(std::shared_ptr<CallHandler> self, bool ok);
187 
188       // Called when AsyncNotifyWhenDone() notifies us.
189       void OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok);
190 
191       // The members passed down from HealthCheckServiceImpl.
192       ServerCompletionQueue* cq_;
193       DefaultHealthCheckService* database_;
194       HealthCheckServiceImpl* service_;
195 
196       ByteBuffer request_;
197       std::string service_name_;
198       GenericServerAsyncWriter stream_;
199       ServerContext ctx_;
200 
201       grpc_core::Mutex send_mu_;
202       bool send_in_flight_ = false;               // Guarded by mu_.
203       ServingStatus pending_status_ = NOT_FOUND;  // Guarded by mu_.
204 
205       bool finish_called_ = false;
206       CallableTag next_;
207       CallableTag on_done_notified_;
208       CallableTag on_finish_done_;
209     };
210 
211     // Handles the incoming requests and drives the completion queue in a loop.
212     static void Serve(void* arg);
213 
214     // Returns true on success.
215     static bool DecodeRequest(const ByteBuffer& request,
216                               std::string* service_name);
217     static bool EncodeResponse(ServingStatus status, ByteBuffer* response);
218 
219     // Needed to appease Windows compilers, which don't seem to allow
220     // nested classes to access protected members in the parent's
221     // superclass.
222     using Service::RequestAsyncServerStreaming;
223     using Service::RequestAsyncUnary;
224 
225     DefaultHealthCheckService* database_;
226     std::unique_ptr<ServerCompletionQueue> cq_;
227 
228     // To synchronize the operations related to shutdown state of cq_, so that
229     // we don't enqueue new tags into cq_ after it is already shut down.
230     grpc_core::Mutex cq_shutdown_mu_;
231     std::atomic_bool shutdown_{false};
232     std::unique_ptr<::grpc_core::Thread> thread_;
233   };
234 
235   DefaultHealthCheckService();
236 
237   void SetServingStatus(const std::string& service_name, bool serving) override;
238   void SetServingStatus(bool serving) override;
239 
240   void Shutdown() override;
241 
242   ServingStatus GetServingStatus(const std::string& service_name) const;
243 
244   HealthCheckServiceImpl* GetHealthCheckService(
245       std::unique_ptr<ServerCompletionQueue> cq);
246 
247  private:
248   // Stores the current serving status of a service and any call
249   // handlers registered for updates when the service's status changes.
250   class ServiceData {
251    public:
252     void SetServingStatus(ServingStatus status);
GetServingStatus()253     ServingStatus GetServingStatus() const { return status_; }
254     void AddCallHandler(
255         std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler);
256     void RemoveCallHandler(
257         const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler);
Unused()258     bool Unused() const {
259       return call_handlers_.empty() && status_ == NOT_FOUND;
260     }
261 
262    private:
263     ServingStatus status_ = NOT_FOUND;
264     std::set<std::shared_ptr<HealthCheckServiceImpl::CallHandler>>
265         call_handlers_;
266   };
267 
268   void RegisterCallHandler(
269       const std::string& service_name,
270       std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler);
271 
272   void UnregisterCallHandler(
273       const std::string& service_name,
274       const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler);
275 
276   mutable grpc_core::Mutex mu_;
277   bool shutdown_ = false;                            // Guarded by mu_.
278   std::map<std::string, ServiceData> services_map_;  // Guarded by mu_.
279   std::unique_ptr<HealthCheckServiceImpl> impl_;
280 };
281 
282 }  // namespace grpc
283 
#endif  // GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H