• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2016 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include "src/cpp/server/health/default_health_check_service.h"
20 
21 #include <grpc/slice.h>
22 #include <grpcpp/impl/rpc_method.h>
23 #include <grpcpp/impl/rpc_service_method.h>
24 #include <grpcpp/impl/server_callback_handlers.h>
25 #include <grpcpp/support/slice.h>
26 #include <stdint.h>
27 
28 #include <memory>
29 #include <utility>
30 
31 #include "absl/log/check.h"
32 #include "absl/log/log.h"
33 #include "src/proto/grpc/health/v1/health.upb.h"
34 #include "upb/base/string_view.h"
35 #include "upb/mem/arena.hpp"
36 
37 #define MAX_SERVICE_NAME_LENGTH 200
38 
39 namespace grpc {
40 
41 //
42 // DefaultHealthCheckService
43 //
44 
DefaultHealthCheckService()45 DefaultHealthCheckService::DefaultHealthCheckService() {
46   services_map_[""].SetServingStatus(SERVING);
47 }
48 
SetServingStatus(const std::string & service_name,bool serving)49 void DefaultHealthCheckService::SetServingStatus(
50     const std::string& service_name, bool serving) {
51   grpc::internal::MutexLock lock(&mu_);
52   if (shutdown_) {
53     // Set to NOT_SERVING in case service_name is not in the map.
54     serving = false;
55   }
56   services_map_[service_name].SetServingStatus(serving ? SERVING : NOT_SERVING);
57 }
58 
SetServingStatus(bool serving)59 void DefaultHealthCheckService::SetServingStatus(bool serving) {
60   const ServingStatus status = serving ? SERVING : NOT_SERVING;
61   grpc::internal::MutexLock lock(&mu_);
62   if (shutdown_) return;
63   for (auto& p : services_map_) {
64     ServiceData& service_data = p.second;
65     service_data.SetServingStatus(status);
66   }
67 }
68 
Shutdown()69 void DefaultHealthCheckService::Shutdown() {
70   grpc::internal::MutexLock lock(&mu_);
71   if (shutdown_) return;
72   shutdown_ = true;
73   for (auto& p : services_map_) {
74     ServiceData& service_data = p.second;
75     service_data.SetServingStatus(NOT_SERVING);
76   }
77 }
78 
79 DefaultHealthCheckService::ServingStatus
GetServingStatus(const std::string & service_name) const80 DefaultHealthCheckService::GetServingStatus(
81     const std::string& service_name) const {
82   grpc::internal::MutexLock lock(&mu_);
83   auto it = services_map_.find(service_name);
84   if (it == services_map_.end()) return NOT_FOUND;
85   const ServiceData& service_data = it->second;
86   return service_data.GetServingStatus();
87 }
88 
RegisterWatch(const std::string & service_name,grpc_core::RefCountedPtr<HealthCheckServiceImpl::WatchReactor> watcher)89 void DefaultHealthCheckService::RegisterWatch(
90     const std::string& service_name,
91     grpc_core::RefCountedPtr<HealthCheckServiceImpl::WatchReactor> watcher) {
92   grpc::internal::MutexLock lock(&mu_);
93   ServiceData& service_data = services_map_[service_name];
94   watcher->SendHealth(service_data.GetServingStatus());
95   service_data.AddWatch(std::move(watcher));
96 }
97 
UnregisterWatch(const std::string & service_name,HealthCheckServiceImpl::WatchReactor * watcher)98 void DefaultHealthCheckService::UnregisterWatch(
99     const std::string& service_name,
100     HealthCheckServiceImpl::WatchReactor* watcher) {
101   grpc::internal::MutexLock lock(&mu_);
102   auto it = services_map_.find(service_name);
103   if (it == services_map_.end()) return;
104   ServiceData& service_data = it->second;
105   service_data.RemoveWatch(watcher);
106   if (service_data.Unused()) services_map_.erase(it);
107 }
108 
109 DefaultHealthCheckService::HealthCheckServiceImpl*
GetHealthCheckService()110 DefaultHealthCheckService::GetHealthCheckService() {
111   CHECK(impl_ == nullptr);
112   impl_ = std::make_unique<HealthCheckServiceImpl>(this);
113   return impl_.get();
114 }
115 
116 //
117 // DefaultHealthCheckService::ServiceData
118 //
119 
SetServingStatus(ServingStatus status)120 void DefaultHealthCheckService::ServiceData::SetServingStatus(
121     ServingStatus status) {
122   status_ = status;
123   for (const auto& p : watchers_) {
124     p.first->SendHealth(status);
125   }
126 }
127 
AddWatch(grpc_core::RefCountedPtr<HealthCheckServiceImpl::WatchReactor> watcher)128 void DefaultHealthCheckService::ServiceData::AddWatch(
129     grpc_core::RefCountedPtr<HealthCheckServiceImpl::WatchReactor> watcher) {
130   watchers_[watcher.get()] = std::move(watcher);
131 }
132 
RemoveWatch(HealthCheckServiceImpl::WatchReactor * watcher)133 void DefaultHealthCheckService::ServiceData::RemoveWatch(
134     HealthCheckServiceImpl::WatchReactor* watcher) {
135   watchers_.erase(watcher);
136 }
137 
138 //
139 // DefaultHealthCheckService::HealthCheckServiceImpl
140 //
141 
namespace {
// Fully-qualified method names passed to RpcServiceMethod when the Check and
// Watch methods are registered.
constexpr char kHealthCheckMethodName[] = "/grpc.health.v1.Health/Check";
constexpr char kHealthWatchMethodName[] = "/grpc.health.v1.Health/Watch";
}  // namespace
146 
HealthCheckServiceImpl(DefaultHealthCheckService * database)147 DefaultHealthCheckService::HealthCheckServiceImpl::HealthCheckServiceImpl(
148     DefaultHealthCheckService* database)
149     : database_(database) {
150   // Add Check() method.
151   AddMethod(new internal::RpcServiceMethod(
152       kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, nullptr));
153   MarkMethodCallback(
154       0, new internal::CallbackUnaryHandler<ByteBuffer, ByteBuffer>(
155              [database](CallbackServerContext* context,
156                         const ByteBuffer* request, ByteBuffer* response) {
157                return HandleCheckRequest(database, context, request, response);
158              }));
159   // Add Watch() method.
160   AddMethod(new internal::RpcServiceMethod(
161       kHealthWatchMethodName, internal::RpcMethod::SERVER_STREAMING, nullptr));
162   MarkMethodCallback(
163       1, new internal::CallbackServerStreamingHandler<ByteBuffer, ByteBuffer>(
164              [this](CallbackServerContext* /*ctx*/, const ByteBuffer* request) {
165                return new WatchReactor(this, request);
166              }));
167 }
168 
~HealthCheckServiceImpl()169 DefaultHealthCheckService::HealthCheckServiceImpl::~HealthCheckServiceImpl() {
170   grpc::internal::MutexLock lock(&mu_);
171   shutdown_ = true;
172   while (num_watches_ > 0) {
173     shutdown_condition_.Wait(&mu_);
174   }
175 }
176 
177 ServerUnaryReactor*
HandleCheckRequest(DefaultHealthCheckService * database,CallbackServerContext * context,const ByteBuffer * request,ByteBuffer * response)178 DefaultHealthCheckService::HealthCheckServiceImpl::HandleCheckRequest(
179     DefaultHealthCheckService* database, CallbackServerContext* context,
180     const ByteBuffer* request, ByteBuffer* response) {
181   auto* reactor = context->DefaultReactor();
182   std::string service_name;
183   if (!DecodeRequest(*request, &service_name)) {
184     reactor->Finish(
185         Status(StatusCode::INVALID_ARGUMENT, "could not parse request"));
186     return reactor;
187   }
188   ServingStatus serving_status = database->GetServingStatus(service_name);
189   if (serving_status == NOT_FOUND) {
190     reactor->Finish(Status(StatusCode::NOT_FOUND, "service name unknown"));
191     return reactor;
192   }
193   if (!EncodeResponse(serving_status, response)) {
194     reactor->Finish(Status(StatusCode::INTERNAL, "could not encode response"));
195     return reactor;
196   }
197   reactor->Finish(Status::OK);
198   return reactor;
199 }
200 
DecodeRequest(const ByteBuffer & request,std::string * service_name)201 bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest(
202     const ByteBuffer& request, std::string* service_name) {
203   Slice slice;
204   if (!request.DumpToSingleSlice(&slice).ok()) return false;
205   uint8_t* request_bytes = nullptr;
206   size_t request_size = 0;
207   request_bytes = const_cast<uint8_t*>(slice.begin());
208   request_size = slice.size();
209   upb::Arena arena;
210   grpc_health_v1_HealthCheckRequest* request_struct =
211       grpc_health_v1_HealthCheckRequest_parse(
212           reinterpret_cast<char*>(request_bytes), request_size, arena.ptr());
213   if (request_struct == nullptr) {
214     return false;
215   }
216   upb_StringView service =
217       grpc_health_v1_HealthCheckRequest_service(request_struct);
218   if (service.size > MAX_SERVICE_NAME_LENGTH) {
219     return false;
220   }
221   service_name->assign(service.data, service.size);
222   return true;
223 }
224 
EncodeResponse(ServingStatus status,ByteBuffer * response)225 bool DefaultHealthCheckService::HealthCheckServiceImpl::EncodeResponse(
226     ServingStatus status, ByteBuffer* response) {
227   upb::Arena arena;
228   grpc_health_v1_HealthCheckResponse* response_struct =
229       grpc_health_v1_HealthCheckResponse_new(arena.ptr());
230   grpc_health_v1_HealthCheckResponse_set_status(
231       response_struct,
232       status == NOT_FOUND ? grpc_health_v1_HealthCheckResponse_SERVICE_UNKNOWN
233       : status == SERVING ? grpc_health_v1_HealthCheckResponse_SERVING
234                           : grpc_health_v1_HealthCheckResponse_NOT_SERVING);
235   size_t buf_length;
236   char* buf = grpc_health_v1_HealthCheckResponse_serialize(
237       response_struct, arena.ptr(), &buf_length);
238   if (buf == nullptr) {
239     return false;
240   }
241   grpc_slice response_slice = grpc_slice_from_copied_buffer(buf, buf_length);
242   Slice encoded_response(response_slice, Slice::STEAL_REF);
243   ByteBuffer response_buffer(&encoded_response, 1);
244   response->Swap(&response_buffer);
245   return true;
246 }
247 
248 //
249 // DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor
250 //
251 
WatchReactor(HealthCheckServiceImpl * service,const ByteBuffer * request)252 DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::WatchReactor(
253     HealthCheckServiceImpl* service, const ByteBuffer* request)
254     : service_(service) {
255   {
256     grpc::internal::MutexLock lock(&service_->mu_);
257     ++service_->num_watches_;
258   }
259   bool success = DecodeRequest(*request, &service_name_);
260   VLOG(2) << "[HCS " << service_ << "] watcher " << this << " \""
261           << service_name_ << "\": watch call started";
262   if (!success) {
263     MaybeFinishLocked(Status(StatusCode::INTERNAL, "could not parse request"));
264     return;
265   }
266   // Register the call for updates to the service.
267   service_->database_->RegisterWatch(service_name_, Ref());
268 }
269 
270 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::
SendHealth(ServingStatus status)271     SendHealth(ServingStatus status) {
272   VLOG(2) << "[HCS " << service_ << "] watcher " << this << " \""
273           << service_name_ << "\": SendHealth() for ServingStatus " << status;
274   grpc::internal::MutexLock lock(&mu_);
275   // If there's already a send in flight, cache the new status, and
276   // we'll start a new send for it when the one in flight completes.
277   if (write_pending_) {
278     VLOG(2) << "[HCS " << service_ << "] watcher " << this << " \""
279             << service_name_ << "\": queuing write";
280     pending_status_ = status;
281     return;
282   }
283   // Start a send.
284   SendHealthLocked(status);
285 }
286 
287 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::
SendHealthLocked(ServingStatus status)288     SendHealthLocked(ServingStatus status) {
289   // Do nothing if Finish() has already been called.
290   if (finish_called_) return;
291   // Check if we're shutting down.
292   {
293     grpc::internal::MutexLock lock(&service_->mu_);
294     if (service_->shutdown_) {
295       MaybeFinishLocked(
296           Status(StatusCode::CANCELLED, "not writing due to shutdown"));
297       return;
298     }
299   }
300   // Send response.
301   bool success = EncodeResponse(status, &response_);
302   if (!success) {
303     MaybeFinishLocked(
304         Status(StatusCode::INTERNAL, "could not encode response"));
305     return;
306   }
307   VLOG(2) << "[HCS " << service_ << "] watcher " << this << " \""
308           << service_name_ << "\": starting write for ServingStatus " << status;
309   write_pending_ = true;
310   StartWrite(&response_);
311 }
312 
313 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::
OnWriteDone(bool ok)314     OnWriteDone(bool ok) {
315   VLOG(2) << "[HCS " << service_ << "] watcher " << this << " \""
316           << service_name_ << "\": OnWriteDone(): ok=" << ok;
317   response_.Clear();
318   grpc::internal::MutexLock lock(&mu_);
319   if (!ok) {
320     MaybeFinishLocked(Status(StatusCode::CANCELLED, "OnWriteDone() ok=false"));
321     return;
322   }
323   write_pending_ = false;
324   // If we got a new status since we started the last send, start a
325   // new send for it.
326   if (pending_status_ != NOT_FOUND) {
327     auto status = pending_status_;
328     pending_status_ = NOT_FOUND;
329     SendHealthLocked(status);
330   }
331 }
332 
333 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::
OnCancel()334     OnCancel() {
335   grpc::internal::MutexLock lock(&mu_);
336   MaybeFinishLocked(Status(StatusCode::UNKNOWN, "OnCancel()"));
337 }
338 
OnDone()339 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::OnDone() {
340   VLOG(2) << "[HCS " << service_ << "] watcher " << this << " \""
341           << service_name_ << "\": OnDone()";
342   service_->database_->UnregisterWatch(service_name_, this);
343   {
344     grpc::internal::MutexLock lock(&service_->mu_);
345     if (--service_->num_watches_ == 0 && service_->shutdown_) {
346       service_->shutdown_condition_.Signal();
347     }
348   }
349   // Free the initial ref from instantiation.
350   Unref();
351 }
352 
353 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchReactor::
MaybeFinishLocked(Status status)354     MaybeFinishLocked(Status status) {
355   VLOG(2) << "[HCS " << service_ << "] watcher " << this << " \""
356           << service_name_
357           << "\": MaybeFinishLocked() with code=" << status.error_code()
358           << " msg=" << status.error_message();
359   if (!finish_called_) {
360     VLOG(2) << "[HCS " << service_ << "] watcher " << this << " \""
361             << service_name_ << "\": actually calling Finish()";
362     finish_called_ = true;
363     Finish(status);
364   }
365 }
366 
367 }  // namespace grpc
368