1 //
2 //
3 // Copyright 2016 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include "src/cpp/server/health/default_health_check_service.h"
20
21 #include <grpc/slice.h>
22 #include <grpcpp/impl/rpc_method.h>
23 #include <grpcpp/impl/rpc_service_method.h>
24 #include <grpcpp/impl/server_callback_handlers.h>
25 #include <grpcpp/support/slice.h>
26 #include <stdint.h>
27
28 #include <memory>
29 #include <utility>
30
31 #include "absl/log/check.h"
32 #include "absl/log/log.h"
33 #include "src/proto/grpc/health/v1/health.upb.h"
34 #include "upb/base/string_view.h"
35 #include "upb/mem/arena.hpp"
36
// Longest service name that DecodeRequest() accepts; a request whose decoded
// service field is larger than this is rejected as unparseable.
#define MAX_SERVICE_NAME_LENGTH 200
38
39 namespace grpc {
40
41 //
42 // DefaultHealthCheckService
43 //
44
DefaultHealthCheckService()45 DefaultHealthCheckService::DefaultHealthCheckService() {
46 services_map_[""].SetServingStatus(SERVING);
47 }
48
SetServingStatus(const std::string & service_name,bool serving)49 void DefaultHealthCheckService::SetServingStatus(
50 const std::string& service_name, bool serving) {
51 grpc::internal::MutexLock lock(&mu_);
52 if (shutdown_) {
53 // Set to NOT_SERVING in case service_name is not in the map.
54 serving = false;
55 }
56 services_map_[service_name].SetServingStatus(serving ? SERVING : NOT_SERVING);
57 }
58
SetServingStatus(bool serving)59 void DefaultHealthCheckService::SetServingStatus(bool serving) {
60 const ServingStatus status = serving ? SERVING : NOT_SERVING;
61 grpc::internal::MutexLock lock(&mu_);
62 if (shutdown_) return;
63 for (auto& p : services_map_) {
64 ServiceData& service_data = p.second;
65 service_data.SetServingStatus(status);
66 }
67 }
68
Shutdown()69 void DefaultHealthCheckService::Shutdown() {
70 grpc::internal::MutexLock lock(&mu_);
71 if (shutdown_) return;
72 shutdown_ = true;
73 for (auto& p : services_map_) {
74 ServiceData& service_data = p.second;
75 service_data.SetServingStatus(NOT_SERVING);
76 }
77 }
78
79 DefaultHealthCheckService::ServingStatus
GetServingStatus(const std::string & service_name) const80 DefaultHealthCheckService::GetServingStatus(
81 const std::string& service_name) const {
82 grpc::internal::MutexLock lock(&mu_);
83 auto it = services_map_.find(service_name);
84 if (it == services_map_.end()) return NOT_FOUND;
85 const ServiceData& service_data = it->second;
86 return service_data.GetServingStatus();
87 }
88
RegisterWatch(const std::string & service_name,grpc_core::RefCountedPtr<HealthCheckServiceImpl::WatchReactor> watcher)89 void DefaultHealthCheckService::RegisterWatch(
90 const std::string& service_name,
91 grpc_core::RefCountedPtr<HealthCheckServiceImpl::WatchReactor> watcher) {
92 grpc::internal::MutexLock lock(&mu_);
93 ServiceData& service_data = services_map_[service_name];
94 watcher->SendHealth(service_data.GetServingStatus());
95 service_data.AddWatch(std::move(watcher));
96 }
97
UnregisterWatch(const std::string & service_name,HealthCheckServiceImpl::WatchReactor * watcher)98 void DefaultHealthCheckService::UnregisterWatch(
99 const std::string& service_name,
100 HealthCheckServiceImpl::WatchReactor* watcher) {
101 grpc::internal::MutexLock lock(&mu_);
102 auto it = services_map_.find(service_name);
103 if (it == services_map_.end()) return;
104 ServiceData& service_data = it->second;
105 service_data.RemoveWatch(watcher);
106 if (service_data.Unused()) services_map_.erase(it);
107 }
108
109 DefaultHealthCheckService::HealthCheckServiceImpl*
GetHealthCheckService()110 DefaultHealthCheckService::GetHealthCheckService() {
111 CHECK(impl_ == nullptr);
112 impl_ = std::make_unique<HealthCheckServiceImpl>(this);
113 return impl_.get();
114 }
115
116 //
117 // DefaultHealthCheckService::ServiceData
118 //
119
SetServingStatus(ServingStatus status)120 void DefaultHealthCheckService::ServiceData::SetServingStatus(
121 ServingStatus status) {
122 status_ = status;
123 for (const auto& p : watchers_) {
124 p.first->SendHealth(status);
125 }
126 }
127
AddWatch(grpc_core::RefCountedPtr<HealthCheckServiceImpl::WatchReactor> watcher)128 void DefaultHealthCheckService::ServiceData::AddWatch(
129 grpc_core::RefCountedPtr<HealthCheckServiceImpl::WatchReactor> watcher) {
130 watchers_[watcher.get()] = std::move(watcher);
131 }
132
RemoveWatch(HealthCheckServiceImpl::WatchReactor * watcher)133 void DefaultHealthCheckService::ServiceData::RemoveWatch(
134 HealthCheckServiceImpl::WatchReactor* watcher) {
135 watchers_.erase(watcher);
136 }
137
138 //
139 // DefaultHealthCheckService::HealthCheckServiceImpl
140 //
141
namespace {
// Full method paths under which the two health RPCs are registered in the
// HealthCheckServiceImpl constructor.
constexpr char kHealthCheckMethodName[] = "/grpc.health.v1.Health/Check";
constexpr char kHealthWatchMethodName[] = "/grpc.health.v1.Health/Watch";
}  // namespace
146
HealthCheckServiceImpl(DefaultHealthCheckService * database)147 DefaultHealthCheckService::HealthCheckServiceImpl::HealthCheckServiceImpl(
148 DefaultHealthCheckService* database)
149 : database_(database) {
150 // Add Check() method.
151 AddMethod(new internal::RpcServiceMethod(
152 kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, nullptr));
153 MarkMethodCallback(
154 0, new internal::CallbackUnaryHandler<ByteBuffer, ByteBuffer>(
155 [database](CallbackServerContext* context,
156 const ByteBuffer* request, ByteBuffer* response) {
157 return HandleCheckRequest(database, context, request, response);
158 }));
159 // Add Watch() method.
160 AddMethod(new internal::RpcServiceMethod(
161 kHealthWatchMethodName, internal::RpcMethod::SERVER_STREAMING, nullptr));
162 MarkMethodCallback(
163 1, new internal::CallbackServerStreamingHandler<ByteBuffer, ByteBuffer>(
164 [this](CallbackServerContext* /*ctx*/, const ByteBuffer* request) {
165 return new WatchReactor(this, request);
166 }));
167 }
168
~HealthCheckServiceImpl()169 DefaultHealthCheckService::HealthCheckServiceImpl::~HealthCheckServiceImpl() {
170 grpc::internal::MutexLock lock(&mu_);
171 shutdown_ = true;
172 while (num_watches_ > 0) {
173 shutdown_condition_.Wait(&mu_);
174 }
175 }
176
177 ServerUnaryReactor*
HandleCheckRequest(DefaultHealthCheckService * database,CallbackServerContext * context,const ByteBuffer * request,ByteBuffer * response)178 DefaultHealthCheckService::HealthCheckServiceImpl::HandleCheckRequest(
179 DefaultHealthCheckService* database, CallbackServerContext* context,
180 const ByteBuffer* request, ByteBuffer* response) {
181 auto* reactor = context->DefaultReactor();
182 std::string service_name;
183 if (!DecodeRequest(*request, &service_name)) {
184 reactor->Finish(
185 Status(StatusCode::INVALID_ARGUMENT, "could not parse request"));
186 return reactor;
187 }
188 ServingStatus serving_status = database->GetServingStatus(service_name);
189 if (serving_status == NOT_FOUND) {
190 reactor->Finish(Status(StatusCode::NOT_FOUND, "service name unknown"));
191 return reactor;
192 }
193 if (!EncodeResponse(serving_status, response)) {
194 reactor->Finish(Status(StatusCode::INTERNAL, "could not encode response"));
195 return reactor;
196 }
197 reactor->Finish(Status::OK);
198 return reactor;
199 }
200
DecodeRequest(const ByteBuffer & request,std::string * service_name)201 bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest(
202 const ByteBuffer& request, std::string* service_name) {
203 Slice slice;
204 if (!request.DumpToSingleSlice(&slice).ok()) return false;
205 uint8_t* request_bytes = nullptr;
206 size_t request_size = 0;
207 request_bytes = const_cast<uint8_t*>(slice.begin());
208 request_size = slice.size();
209 upb::Arena arena;
210 grpc_health_v1_HealthCheckRequest* request_struct =
211 grpc_health_v1_HealthCheckRequest_parse(
212 reinterpret_cast<char*>(request_bytes), request_size, arena.ptr());
213 if (request_struct == nullptr) {
214 return false;
215 }
216 upb_StringView service =
217 grpc_health_v1_HealthCheckRequest_service(request_struct);
218 if (service.size > MAX_SERVICE_NAME_LENGTH) {
219 return false;
220 }
221 service_name->assign(service.data, service.size);
222 return true;
223 }
224
EncodeResponse(ServingStatus status,ByteBuffer * response)225 bool DefaultHealthCheckService::HealthCheckServiceImpl::EncodeResponse(
226 ServingStatus status, ByteBuffer* response) {
227 upb::Arena arena;
228 grpc_health_v1_HealthCheckResponse* response_struct =
229 grpc_health_v1_HealthCheckResponse_new(arena.ptr());
230 grpc_health_v1_HealthCheckResponse_set_status(
231 response_struct,
232 status == NOT_FOUND ? grpc_health_v1_HealthCheckResponse_SERVICE_UNKNOWN
233 : status == SERVING ? grpc_health_v1_HealthCheckResponse_SERVING
234 : grpc_health_v1_HealthCheckResponse_NOT_SERVING);
235 size_t buf_length;
236 char* buf = grpc_health_v1_HealthCheckResponse_serialize(
237 response_struct, arena.ptr(), &buf_length);
238 if (buf == nullptr) {
239 return false;
240 }
241 grpc_slice response_slice = grpc_slice_from_copied_buffer(buf, buf_length);
242 Slice encoded_response(response_slice, Slice::STEAL_REF);
243 ByteBuffer response_buffer(&encoded_response, 1);
244 response->Swap(&response_buffer);
245 return true;
246 }
247
248 //
249 // DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor
250 //
251
WatchReactor(HealthCheckServiceImpl * service,const ByteBuffer * request)252 DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::WatchReactor(
253 HealthCheckServiceImpl* service, const ByteBuffer* request)
254 : service_(service) {
255 {
256 grpc::internal::MutexLock lock(&service_->mu_);
257 ++service_->num_watches_;
258 }
259 bool success = DecodeRequest(*request, &service_name_);
260 VLOG(2) << "[HCS " << service_ << "] watcher " << this << " \""
261 << service_name_ << "\": watch call started";
262 if (!success) {
263 MaybeFinishLocked(Status(StatusCode::INTERNAL, "could not parse request"));
264 return;
265 }
266 // Register the call for updates to the service.
267 service_->database_->RegisterWatch(service_name_, Ref());
268 }
269
270 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::
SendHealth(ServingStatus status)271 SendHealth(ServingStatus status) {
272 VLOG(2) << "[HCS " << service_ << "] watcher " << this << " \""
273 << service_name_ << "\": SendHealth() for ServingStatus " << status;
274 grpc::internal::MutexLock lock(&mu_);
275 // If there's already a send in flight, cache the new status, and
276 // we'll start a new send for it when the one in flight completes.
277 if (write_pending_) {
278 VLOG(2) << "[HCS " << service_ << "] watcher " << this << " \""
279 << service_name_ << "\": queuing write";
280 pending_status_ = status;
281 return;
282 }
283 // Start a send.
284 SendHealthLocked(status);
285 }
286
287 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::
SendHealthLocked(ServingStatus status)288 SendHealthLocked(ServingStatus status) {
289 // Do nothing if Finish() has already been called.
290 if (finish_called_) return;
291 // Check if we're shutting down.
292 {
293 grpc::internal::MutexLock lock(&service_->mu_);
294 if (service_->shutdown_) {
295 MaybeFinishLocked(
296 Status(StatusCode::CANCELLED, "not writing due to shutdown"));
297 return;
298 }
299 }
300 // Send response.
301 bool success = EncodeResponse(status, &response_);
302 if (!success) {
303 MaybeFinishLocked(
304 Status(StatusCode::INTERNAL, "could not encode response"));
305 return;
306 }
307 VLOG(2) << "[HCS " << service_ << "] watcher " << this << " \""
308 << service_name_ << "\": starting write for ServingStatus " << status;
309 write_pending_ = true;
310 StartWrite(&response_);
311 }
312
313 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::
OnWriteDone(bool ok)314 OnWriteDone(bool ok) {
315 VLOG(2) << "[HCS " << service_ << "] watcher " << this << " \""
316 << service_name_ << "\": OnWriteDone(): ok=" << ok;
317 response_.Clear();
318 grpc::internal::MutexLock lock(&mu_);
319 if (!ok) {
320 MaybeFinishLocked(Status(StatusCode::CANCELLED, "OnWriteDone() ok=false"));
321 return;
322 }
323 write_pending_ = false;
324 // If we got a new status since we started the last send, start a
325 // new send for it.
326 if (pending_status_ != NOT_FOUND) {
327 auto status = pending_status_;
328 pending_status_ = NOT_FOUND;
329 SendHealthLocked(status);
330 }
331 }
332
333 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::
OnCancel()334 OnCancel() {
335 grpc::internal::MutexLock lock(&mu_);
336 MaybeFinishLocked(Status(StatusCode::UNKNOWN, "OnCancel()"));
337 }
338
OnDone()339 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::OnDone() {
340 VLOG(2) << "[HCS " << service_ << "] watcher " << this << " \""
341 << service_name_ << "\": OnDone()";
342 service_->database_->UnregisterWatch(service_name_, this);
343 {
344 grpc::internal::MutexLock lock(&service_->mu_);
345 if (--service_->num_watches_ == 0 && service_->shutdown_) {
346 service_->shutdown_condition_.Signal();
347 }
348 }
349 // Free the initial ref from instantiation.
350 Unref();
351 }
352
353 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::
MaybeFinishLocked(Status status)354 MaybeFinishLocked(Status status) {
355 VLOG(2) << "[HCS " << service_ << "] watcher " << this << " \""
356 << service_name_
357 << "\": MaybeFinishLocked() with code=" << status.error_code()
358 << " msg=" << status.error_message();
359 if (!finish_called_) {
360 VLOG(2) << "[HCS " << service_ << "] watcher " << this << " \""
361 << service_name_ << "\": actually calling Finish()";
362 finish_called_ = true;
363 Finish(status);
364 }
365 }
366
367 } // namespace grpc
368