1 /* 2 * 3 * Copyright 2016 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #ifndef GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H 20 #define GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H 21 22 #include <atomic> 23 #include <set> 24 25 #include <grpc/support/log.h> 26 #include <grpcpp/grpcpp.h> 27 #include <grpcpp/health_check_service_interface.h> 28 #include <grpcpp/impl/codegen/async_generic_service.h> 29 #include <grpcpp/impl/codegen/async_unary_call.h> 30 #include <grpcpp/impl/codegen/completion_queue.h> 31 #include <grpcpp/impl/codegen/service_type.h> 32 #include <grpcpp/support/byte_buffer.h> 33 34 #include "src/core/lib/gprpp/sync.h" 35 #include "src/core/lib/gprpp/thd.h" 36 37 namespace grpc { 38 39 // Default implementation of HealthCheckServiceInterface. Server will create and 40 // own it. 41 class DefaultHealthCheckService final : public HealthCheckServiceInterface { 42 public: 43 enum ServingStatus { NOT_FOUND, SERVING, NOT_SERVING }; 44 45 // The service impl to register with the server. 46 class HealthCheckServiceImpl : public Service { 47 public: 48 // Base class for call handlers. 49 class CallHandler { 50 public: 51 virtual ~CallHandler() = default; 52 virtual void SendHealth(std::shared_ptr<CallHandler> self, 53 ServingStatus status) = 0; 54 }; 55 56 HealthCheckServiceImpl(DefaultHealthCheckService* database, 57 std::unique_ptr<ServerCompletionQueue> cq); 58 59 ~HealthCheckServiceImpl() override; 60 61 void StartServingThread(); 62 63 private: 64 // A tag that can be called with a bool argument. It's tailored for 65 // CallHandler's use. Before being used, it should be constructed with a 66 // method of CallHandler and a shared pointer to the handler. The 67 // shared pointer will be moved to the invoked function and the function 68 // can only be invoked once. That makes ref counting of the handler easier, 69 // because the shared pointer is not bound to the function and can be gone 70 // once the invoked function returns (if not used any more). 71 class CallableTag { 72 public: 73 using HandlerFunction = 74 std::function<void(std::shared_ptr<CallHandler>, bool)>; 75 CallableTag()76 CallableTag() {} 77 CallableTag(HandlerFunction func,std::shared_ptr<CallHandler> handler)78 CallableTag(HandlerFunction func, std::shared_ptr<CallHandler> handler) 79 : handler_function_(std::move(func)), handler_(std::move(handler)) { 80 GPR_ASSERT(handler_function_ != nullptr); 81 GPR_ASSERT(handler_ != nullptr); 82 } 83 84 // Runs the tag. This should be called only once. The handler is no 85 // longer owned by this tag after this method is invoked. Run(bool ok)86 void Run(bool ok) { 87 GPR_ASSERT(handler_function_ != nullptr); 88 GPR_ASSERT(handler_ != nullptr); 89 handler_function_(std::move(handler_), ok); 90 } 91 92 // Releases and returns the shared pointer to the handler. ReleaseHandler()93 std::shared_ptr<CallHandler> ReleaseHandler() { 94 return std::move(handler_); 95 } 96 97 private: 98 HandlerFunction handler_function_ = nullptr; 99 std::shared_ptr<CallHandler> handler_; 100 }; 101 102 // Call handler for Check method. 103 // Each handler takes care of one call. It contains per-call data and it 104 // will access the members of the parent class (i.e., 105 // DefaultHealthCheckService) for per-service health data. 106 class CheckCallHandler : public CallHandler { 107 public: 108 // Instantiates a CheckCallHandler and requests the next health check 109 // call. The handler object will manage its own lifetime, so no action is 110 // needed from the caller any more regarding that object. 111 static void CreateAndStart(ServerCompletionQueue* cq, 112 DefaultHealthCheckService* database, 113 HealthCheckServiceImpl* service); 114 115 // This ctor is public because we want to use std::make_shared<> in 116 // CreateAndStart(). This ctor shouldn't be used elsewhere. 117 CheckCallHandler(ServerCompletionQueue* cq, 118 DefaultHealthCheckService* database, 119 HealthCheckServiceImpl* service); 120 121 // Not used for Check. SendHealth(std::shared_ptr<CallHandler>,ServingStatus)122 void SendHealth(std::shared_ptr<CallHandler> /*self*/, 123 ServingStatus /*status*/) override {} 124 125 private: 126 // Called when we receive a call. 127 // Spawns a new handler so that we can keep servicing future calls. 128 void OnCallReceived(std::shared_ptr<CallHandler> self, bool ok); 129 130 // Called when Finish() is done. 131 void OnFinishDone(std::shared_ptr<CallHandler> self, bool ok); 132 133 // The members passed down from HealthCheckServiceImpl. 134 ServerCompletionQueue* cq_; 135 DefaultHealthCheckService* database_; 136 HealthCheckServiceImpl* service_; 137 138 ByteBuffer request_; 139 GenericServerAsyncResponseWriter writer_; 140 ServerContext ctx_; 141 142 CallableTag next_; 143 }; 144 145 // Call handler for Watch method. 146 // Each handler takes care of one call. It contains per-call data and it 147 // will access the members of the parent class (i.e., 148 // DefaultHealthCheckService) for per-service health data. 149 class WatchCallHandler : public CallHandler { 150 public: 151 // Instantiates a WatchCallHandler and requests the next health check 152 // call. The handler object will manage its own lifetime, so no action is 153 // needed from the caller any more regarding that object. 154 static void CreateAndStart(ServerCompletionQueue* cq, 155 DefaultHealthCheckService* database, 156 HealthCheckServiceImpl* service); 157 158 // This ctor is public because we want to use std::make_shared<> in 159 // CreateAndStart(). This ctor shouldn't be used elsewhere. 160 WatchCallHandler(ServerCompletionQueue* cq, 161 DefaultHealthCheckService* database, 162 HealthCheckServiceImpl* service); 163 164 void SendHealth(std::shared_ptr<CallHandler> self, 165 ServingStatus status) override; 166 167 private: 168 // Called when we receive a call. 169 // Spawns a new handler so that we can keep servicing future calls. 170 void OnCallReceived(std::shared_ptr<CallHandler> self, bool ok); 171 172 // Requires holding send_mu_. 173 void SendHealthLocked(std::shared_ptr<CallHandler> self, 174 ServingStatus status); 175 176 // When sending a health result finishes. 177 void OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok); 178 179 void SendFinish(std::shared_ptr<CallHandler> self, const Status& status); 180 181 // Requires holding service_->cq_shutdown_mu_. 182 void SendFinishLocked(std::shared_ptr<CallHandler> self, 183 const Status& status); 184 185 // Called when Finish() is done. 186 void OnFinishDone(std::shared_ptr<CallHandler> self, bool ok); 187 188 // Called when AsyncNotifyWhenDone() notifies us. 189 void OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok); 190 191 // The members passed down from HealthCheckServiceImpl. 192 ServerCompletionQueue* cq_; 193 DefaultHealthCheckService* database_; 194 HealthCheckServiceImpl* service_; 195 196 ByteBuffer request_; 197 std::string service_name_; 198 GenericServerAsyncWriter stream_; 199 ServerContext ctx_; 200 201 grpc_core::Mutex send_mu_; 202 bool send_in_flight_ = false; // Guarded by mu_. 203 ServingStatus pending_status_ = NOT_FOUND; // Guarded by mu_. 204 205 bool finish_called_ = false; 206 CallableTag next_; 207 CallableTag on_done_notified_; 208 CallableTag on_finish_done_; 209 }; 210 211 // Handles the incoming requests and drives the completion queue in a loop. 212 static void Serve(void* arg); 213 214 // Returns true on success. 215 static bool DecodeRequest(const ByteBuffer& request, 216 std::string* service_name); 217 static bool EncodeResponse(ServingStatus status, ByteBuffer* response); 218 219 // Needed to appease Windows compilers, which don't seem to allow 220 // nested classes to access protected members in the parent's 221 // superclass. 222 using Service::RequestAsyncServerStreaming; 223 using Service::RequestAsyncUnary; 224 225 DefaultHealthCheckService* database_; 226 std::unique_ptr<ServerCompletionQueue> cq_; 227 228 // To synchronize the operations related to shutdown state of cq_, so that 229 // we don't enqueue new tags into cq_ after it is already shut down. 230 grpc_core::Mutex cq_shutdown_mu_; 231 std::atomic_bool shutdown_{false}; 232 std::unique_ptr<::grpc_core::Thread> thread_; 233 }; 234 235 DefaultHealthCheckService(); 236 237 void SetServingStatus(const std::string& service_name, bool serving) override; 238 void SetServingStatus(bool serving) override; 239 240 void Shutdown() override; 241 242 ServingStatus GetServingStatus(const std::string& service_name) const; 243 244 HealthCheckServiceImpl* GetHealthCheckService( 245 std::unique_ptr<ServerCompletionQueue> cq); 246 247 private: 248 // Stores the current serving status of a service and any call 249 // handlers registered for updates when the service's status changes. 250 class ServiceData { 251 public: 252 void SetServingStatus(ServingStatus status); GetServingStatus()253 ServingStatus GetServingStatus() const { return status_; } 254 void AddCallHandler( 255 std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler); 256 void RemoveCallHandler( 257 const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler); Unused()258 bool Unused() const { 259 return call_handlers_.empty() && status_ == NOT_FOUND; 260 } 261 262 private: 263 ServingStatus status_ = NOT_FOUND; 264 std::set<std::shared_ptr<HealthCheckServiceImpl::CallHandler>> 265 call_handlers_; 266 }; 267 268 void RegisterCallHandler( 269 const std::string& service_name, 270 std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler); 271 272 void UnregisterCallHandler( 273 const std::string& service_name, 274 const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler); 275 276 mutable grpc_core::Mutex mu_; 277 bool shutdown_ = false; // Guarded by mu_. 278 std::map<std::string, ServiceData> services_map_; // Guarded by mu_. 279 std::unique_ptr<HealthCheckServiceImpl> impl_; 280 }; 281 282 } // namespace grpc 283 284 #endif // GRPC_INTERNAL_CPP_SERVER_DEFAULT_HEALTH_CHECK_SERVICE_H 285