• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <memory>
20 
21 #include "absl/memory/memory.h"
22 #include "upb/upb.hpp"
23 
24 #include <grpc/slice.h>
25 #include <grpc/support/alloc.h>
26 #include <grpc/support/log.h>
27 #include <grpcpp/impl/codegen/method_handler.h>
28 
29 #include "src/cpp/server/health/default_health_check_service.h"
30 #include "src/proto/grpc/health/v1/health.upb.h"
31 #include "upb/upb.hpp"
32 
33 #define MAX_SERVICE_NAME_LENGTH 200
34 
35 namespace grpc {
36 
37 //
38 // DefaultHealthCheckService
39 //
40 
DefaultHealthCheckService()41 DefaultHealthCheckService::DefaultHealthCheckService() {
42   services_map_[""].SetServingStatus(SERVING);
43 }
44 
SetServingStatus(const std::string & service_name,bool serving)45 void DefaultHealthCheckService::SetServingStatus(
46     const std::string& service_name, bool serving) {
47   grpc_core::MutexLock lock(&mu_);
48   if (shutdown_) {
49     // Set to NOT_SERVING in case service_name is not in the map.
50     serving = false;
51   }
52   services_map_[service_name].SetServingStatus(serving ? SERVING : NOT_SERVING);
53 }
54 
SetServingStatus(bool serving)55 void DefaultHealthCheckService::SetServingStatus(bool serving) {
56   const ServingStatus status = serving ? SERVING : NOT_SERVING;
57   grpc_core::MutexLock lock(&mu_);
58   if (shutdown_) {
59     return;
60   }
61   for (auto& p : services_map_) {
62     ServiceData& service_data = p.second;
63     service_data.SetServingStatus(status);
64   }
65 }
66 
Shutdown()67 void DefaultHealthCheckService::Shutdown() {
68   grpc_core::MutexLock lock(&mu_);
69   if (shutdown_) {
70     return;
71   }
72   shutdown_ = true;
73   for (auto& p : services_map_) {
74     ServiceData& service_data = p.second;
75     service_data.SetServingStatus(NOT_SERVING);
76   }
77 }
78 
79 DefaultHealthCheckService::ServingStatus
GetServingStatus(const std::string & service_name) const80 DefaultHealthCheckService::GetServingStatus(
81     const std::string& service_name) const {
82   grpc_core::MutexLock lock(&mu_);
83   auto it = services_map_.find(service_name);
84   if (it == services_map_.end()) {
85     return NOT_FOUND;
86   }
87   const ServiceData& service_data = it->second;
88   return service_data.GetServingStatus();
89 }
90 
RegisterCallHandler(const std::string & service_name,std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler)91 void DefaultHealthCheckService::RegisterCallHandler(
92     const std::string& service_name,
93     std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
94   grpc_core::MutexLock lock(&mu_);
95   ServiceData& service_data = services_map_[service_name];
96   service_data.AddCallHandler(handler /* copies ref */);
97   HealthCheckServiceImpl::CallHandler* h = handler.get();
98   h->SendHealth(std::move(handler), service_data.GetServingStatus());
99 }
100 
UnregisterCallHandler(const std::string & service_name,const std::shared_ptr<HealthCheckServiceImpl::CallHandler> & handler)101 void DefaultHealthCheckService::UnregisterCallHandler(
102     const std::string& service_name,
103     const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler) {
104   grpc_core::MutexLock lock(&mu_);
105   auto it = services_map_.find(service_name);
106   if (it == services_map_.end()) return;
107   ServiceData& service_data = it->second;
108   service_data.RemoveCallHandler(handler);
109   if (service_data.Unused()) {
110     services_map_.erase(it);
111   }
112 }
113 
114 DefaultHealthCheckService::HealthCheckServiceImpl*
GetHealthCheckService(std::unique_ptr<ServerCompletionQueue> cq)115 DefaultHealthCheckService::GetHealthCheckService(
116     std::unique_ptr<ServerCompletionQueue> cq) {
117   GPR_ASSERT(impl_ == nullptr);
118   impl_ = absl::make_unique<HealthCheckServiceImpl>(this, std::move(cq));
119   return impl_.get();
120 }
121 
122 //
123 // DefaultHealthCheckService::ServiceData
124 //
125 
SetServingStatus(ServingStatus status)126 void DefaultHealthCheckService::ServiceData::SetServingStatus(
127     ServingStatus status) {
128   status_ = status;
129   for (auto& call_handler : call_handlers_) {
130     call_handler->SendHealth(call_handler /* copies ref */, status);
131   }
132 }
133 
AddCallHandler(std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler)134 void DefaultHealthCheckService::ServiceData::AddCallHandler(
135     std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
136   call_handlers_.insert(std::move(handler));
137 }
138 
RemoveCallHandler(const std::shared_ptr<HealthCheckServiceImpl::CallHandler> & handler)139 void DefaultHealthCheckService::ServiceData::RemoveCallHandler(
140     const std::shared_ptr<HealthCheckServiceImpl::CallHandler>& handler) {
141   call_handlers_.erase(handler);
142 }
143 
144 //
145 // DefaultHealthCheckService::HealthCheckServiceImpl
146 //
147 
148 namespace {
149 const char kHealthCheckMethodName[] = "/grpc.health.v1.Health/Check";
150 const char kHealthWatchMethodName[] = "/grpc.health.v1.Health/Watch";
151 }  // namespace
152 
HealthCheckServiceImpl(DefaultHealthCheckService * database,std::unique_ptr<ServerCompletionQueue> cq)153 DefaultHealthCheckService::HealthCheckServiceImpl::HealthCheckServiceImpl(
154     DefaultHealthCheckService* database,
155     std::unique_ptr<ServerCompletionQueue> cq)
156     : database_(database), cq_(std::move(cq)) {
157   // Add Check() method.
158   AddMethod(new internal::RpcServiceMethod(
159       kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, nullptr));
160   // Add Watch() method.
161   AddMethod(new internal::RpcServiceMethod(
162       kHealthWatchMethodName, internal::RpcMethod::SERVER_STREAMING, nullptr));
163   // Create serving thread.
164   thread_ = absl::make_unique<::grpc_core::Thread>("grpc_health_check_service",
165                                                    Serve, this);
166 }
167 
~HealthCheckServiceImpl()168 DefaultHealthCheckService::HealthCheckServiceImpl::~HealthCheckServiceImpl() {
169   // We will reach here after the server starts shutting down.
170   shutdown_ = true;
171   {
172     grpc_core::MutexLock lock(&cq_shutdown_mu_);
173     cq_->Shutdown();
174   }
175   thread_->Join();
176 }
177 
StartServingThread()178 void DefaultHealthCheckService::HealthCheckServiceImpl::StartServingThread() {
179   // Request the calls we're interested in.
180   // We do this before starting the serving thread, so that we know it's
181   // done before server startup is complete.
182   CheckCallHandler::CreateAndStart(cq_.get(), database_, this);
183   WatchCallHandler::CreateAndStart(cq_.get(), database_, this);
184   // Start serving thread.
185   thread_->Start();
186 }
187 
Serve(void * arg)188 void DefaultHealthCheckService::HealthCheckServiceImpl::Serve(void* arg) {
189   HealthCheckServiceImpl* service = static_cast<HealthCheckServiceImpl*>(arg);
190   void* tag;
191   bool ok;
192   while (true) {
193     if (!service->cq_->Next(&tag, &ok)) {
194       // The completion queue is shutting down.
195       GPR_ASSERT(service->shutdown_);
196       break;
197     }
198     auto* next_step = static_cast<CallableTag*>(tag);
199     next_step->Run(ok);
200   }
201 }
202 
DecodeRequest(const ByteBuffer & request,std::string * service_name)203 bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest(
204     const ByteBuffer& request, std::string* service_name) {
205   std::vector<Slice> slices;
206   if (!request.Dump(&slices).ok()) return false;
207   uint8_t* request_bytes = nullptr;
208   size_t request_size = 0;
209   if (slices.size() == 1) {
210     request_bytes = const_cast<uint8_t*>(slices[0].begin());
211     request_size = slices[0].size();
212   } else if (slices.size() > 1) {
213     request_bytes = static_cast<uint8_t*>(gpr_malloc(request.Length()));
214     uint8_t* copy_to = request_bytes;
215     for (size_t i = 0; i < slices.size(); i++) {
216       memcpy(copy_to, slices[i].begin(), slices[i].size());
217       copy_to += slices[i].size();
218     }
219   }
220   upb::Arena arena;
221   grpc_health_v1_HealthCheckRequest* request_struct =
222       grpc_health_v1_HealthCheckRequest_parse(
223           reinterpret_cast<char*>(request_bytes), request_size, arena.ptr());
224   if (slices.size() > 1) {
225     gpr_free(request_bytes);
226   }
227   if (request_struct == nullptr) {
228     return false;
229   }
230   upb_strview service =
231       grpc_health_v1_HealthCheckRequest_service(request_struct);
232   if (service.size > MAX_SERVICE_NAME_LENGTH) {
233     return false;
234   }
235   service_name->assign(service.data, service.size);
236   return true;
237 }
238 
EncodeResponse(ServingStatus status,ByteBuffer * response)239 bool DefaultHealthCheckService::HealthCheckServiceImpl::EncodeResponse(
240     ServingStatus status, ByteBuffer* response) {
241   upb::Arena arena;
242   grpc_health_v1_HealthCheckResponse* response_struct =
243       grpc_health_v1_HealthCheckResponse_new(arena.ptr());
244   grpc_health_v1_HealthCheckResponse_set_status(
245       response_struct,
246       status == NOT_FOUND
247           ? grpc_health_v1_HealthCheckResponse_SERVICE_UNKNOWN
248           : status == SERVING ? grpc_health_v1_HealthCheckResponse_SERVING
249                               : grpc_health_v1_HealthCheckResponse_NOT_SERVING);
250   size_t buf_length;
251   char* buf = grpc_health_v1_HealthCheckResponse_serialize(
252       response_struct, arena.ptr(), &buf_length);
253   if (buf == nullptr) {
254     return false;
255   }
256   grpc_slice response_slice = grpc_slice_from_copied_buffer(buf, buf_length);
257   Slice encoded_response(response_slice, Slice::STEAL_REF);
258   ByteBuffer response_buffer(&encoded_response, 1);
259   response->Swap(&response_buffer);
260   return true;
261 }
262 
263 //
264 // DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler
265 //
266 
267 void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
CreateAndStart(ServerCompletionQueue * cq,DefaultHealthCheckService * database,HealthCheckServiceImpl * service)268     CreateAndStart(ServerCompletionQueue* cq,
269                    DefaultHealthCheckService* database,
270                    HealthCheckServiceImpl* service) {
271   std::shared_ptr<CallHandler> self =
272       std::make_shared<CheckCallHandler>(cq, database, service);
273   CheckCallHandler* handler = static_cast<CheckCallHandler*>(self.get());
274   {
275     grpc_core::MutexLock lock(&service->cq_shutdown_mu_);
276     if (service->shutdown_) return;
277     // Request a Check() call.
278     handler->next_ =
279         CallableTag(std::bind(&CheckCallHandler::OnCallReceived, handler,
280                               std::placeholders::_1, std::placeholders::_2),
281                     std::move(self));
282     service->RequestAsyncUnary(0, &handler->ctx_, &handler->request_,
283                                &handler->writer_, cq, cq, &handler->next_);
284   }
285 }
286 
287 DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
CheckCallHandler(ServerCompletionQueue * cq,DefaultHealthCheckService * database,HealthCheckServiceImpl * service)288     CheckCallHandler(ServerCompletionQueue* cq,
289                      DefaultHealthCheckService* database,
290                      HealthCheckServiceImpl* service)
291     : cq_(cq), database_(database), service_(service), writer_(&ctx_) {}
292 
293 void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
OnCallReceived(std::shared_ptr<CallHandler> self,bool ok)294     OnCallReceived(std::shared_ptr<CallHandler> self, bool ok) {
295   if (!ok) {
296     // The value of ok being false means that the server is shutting down.
297     return;
298   }
299   // Spawn a new handler instance to serve the next new client. Every handler
300   // instance will deallocate itself when it's done.
301   CreateAndStart(cq_, database_, service_);
302   // Process request.
303   gpr_log(GPR_DEBUG, "[HCS %p] Health check started for handler %p", service_,
304           this);
305   std::string service_name;
306   grpc::Status status = Status::OK;
307   ByteBuffer response;
308   if (!service_->DecodeRequest(request_, &service_name)) {
309     status = Status(StatusCode::INVALID_ARGUMENT, "could not parse request");
310   } else {
311     ServingStatus serving_status = database_->GetServingStatus(service_name);
312     if (serving_status == NOT_FOUND) {
313       status = Status(StatusCode::NOT_FOUND, "service name unknown");
314     } else if (!service_->EncodeResponse(serving_status, &response)) {
315       status = Status(StatusCode::INTERNAL, "could not encode response");
316     }
317   }
318   // Send response.
319   {
320     grpc_core::MutexLock lock(&service_->cq_shutdown_mu_);
321     if (!service_->shutdown_) {
322       next_ =
323           CallableTag(std::bind(&CheckCallHandler::OnFinishDone, this,
324                                 std::placeholders::_1, std::placeholders::_2),
325                       std::move(self));
326       if (status.ok()) {
327         writer_.Finish(response, status, &next_);
328       } else {
329         writer_.FinishWithError(status, &next_);
330       }
331     }
332   }
333 }
334 
335 void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
OnFinishDone(std::shared_ptr<CallHandler> self,bool ok)336     OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) {
337   if (ok) {
338     gpr_log(GPR_DEBUG, "[HCS %p] Health check call finished for handler %p",
339             service_, this);
340   }
341   self.reset();  // To appease clang-tidy.
342 }
343 
344 //
345 // DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler
346 //
347 
348 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
CreateAndStart(ServerCompletionQueue * cq,DefaultHealthCheckService * database,HealthCheckServiceImpl * service)349     CreateAndStart(ServerCompletionQueue* cq,
350                    DefaultHealthCheckService* database,
351                    HealthCheckServiceImpl* service) {
352   std::shared_ptr<CallHandler> self =
353       std::make_shared<WatchCallHandler>(cq, database, service);
354   WatchCallHandler* handler = static_cast<WatchCallHandler*>(self.get());
355   {
356     grpc_core::MutexLock lock(&service->cq_shutdown_mu_);
357     if (service->shutdown_) return;
358     // Request AsyncNotifyWhenDone().
359     handler->on_done_notified_ =
360         CallableTag(std::bind(&WatchCallHandler::OnDoneNotified, handler,
361                               std::placeholders::_1, std::placeholders::_2),
362                     self /* copies ref */);
363     handler->ctx_.AsyncNotifyWhenDone(&handler->on_done_notified_);
364     // Request a Watch() call.
365     handler->next_ =
366         CallableTag(std::bind(&WatchCallHandler::OnCallReceived, handler,
367                               std::placeholders::_1, std::placeholders::_2),
368                     std::move(self));
369     service->RequestAsyncServerStreaming(1, &handler->ctx_, &handler->request_,
370                                          &handler->stream_, cq, cq,
371                                          &handler->next_);
372   }
373 }
374 
375 DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
WatchCallHandler(ServerCompletionQueue * cq,DefaultHealthCheckService * database,HealthCheckServiceImpl * service)376     WatchCallHandler(ServerCompletionQueue* cq,
377                      DefaultHealthCheckService* database,
378                      HealthCheckServiceImpl* service)
379     : cq_(cq), database_(database), service_(service), stream_(&ctx_) {}
380 
381 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
OnCallReceived(std::shared_ptr<CallHandler> self,bool ok)382     OnCallReceived(std::shared_ptr<CallHandler> self, bool ok) {
383   if (!ok) {
384     // Server shutting down.
385     //
386     // AsyncNotifyWhenDone() needs to be called before the call starts, but the
387     // tag will not pop out if the call never starts (
388     // https://github.com/grpc/grpc/issues/10136). So we need to manually
389     // release the ownership of the handler in this case.
390     GPR_ASSERT(on_done_notified_.ReleaseHandler() != nullptr);
391     return;
392   }
393   // Spawn a new handler instance to serve the next new client. Every handler
394   // instance will deallocate itself when it's done.
395   CreateAndStart(cq_, database_, service_);
396   // Parse request.
397   if (!service_->DecodeRequest(request_, &service_name_)) {
398     SendFinish(std::move(self),
399                Status(StatusCode::INVALID_ARGUMENT, "could not parse request"));
400     return;
401   }
402   // Register the call for updates to the service.
403   gpr_log(GPR_DEBUG,
404           "[HCS %p] Health watch started for service \"%s\" (handler: %p)",
405           service_, service_name_.c_str(), this);
406   database_->RegisterCallHandler(service_name_, std::move(self));
407 }
408 
409 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
SendHealth(std::shared_ptr<CallHandler> self,ServingStatus status)410     SendHealth(std::shared_ptr<CallHandler> self, ServingStatus status) {
411   grpc_core::MutexLock lock(&send_mu_);
412   // If there's already a send in flight, cache the new status, and
413   // we'll start a new send for it when the one in flight completes.
414   if (send_in_flight_) {
415     pending_status_ = status;
416     return;
417   }
418   // Start a send.
419   SendHealthLocked(std::move(self), status);
420 }
421 
422 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
SendHealthLocked(std::shared_ptr<CallHandler> self,ServingStatus status)423     SendHealthLocked(std::shared_ptr<CallHandler> self, ServingStatus status) {
424   send_in_flight_ = true;
425   // Construct response.
426   ByteBuffer response;
427   bool success = service_->EncodeResponse(status, &response);
428   // Grab shutdown lock and send response.
429   grpc_core::MutexLock cq_lock(&service_->cq_shutdown_mu_);
430   if (service_->shutdown_) {
431     SendFinishLocked(std::move(self), Status::CANCELLED);
432     return;
433   }
434   if (!success) {
435     SendFinishLocked(std::move(self),
436                      Status(StatusCode::INTERNAL, "could not encode response"));
437     return;
438   }
439   next_ = CallableTag(std::bind(&WatchCallHandler::OnSendHealthDone, this,
440                                 std::placeholders::_1, std::placeholders::_2),
441                       std::move(self));
442   stream_.Write(response, &next_);
443 }
444 
445 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
OnSendHealthDone(std::shared_ptr<CallHandler> self,bool ok)446     OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok) {
447   if (!ok) {
448     SendFinish(std::move(self), Status::CANCELLED);
449     return;
450   }
451   grpc_core::MutexLock lock(&send_mu_);
452   send_in_flight_ = false;
453   // If we got a new status since we started the last send, start a
454   // new send for it.
455   if (pending_status_ != NOT_FOUND) {
456     auto status = pending_status_;
457     pending_status_ = NOT_FOUND;
458     SendHealthLocked(std::move(self), status);
459   }
460 }
461 
462 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
SendFinish(std::shared_ptr<CallHandler> self,const Status & status)463     SendFinish(std::shared_ptr<CallHandler> self, const Status& status) {
464   if (finish_called_) return;
465   grpc_core::MutexLock cq_lock(&service_->cq_shutdown_mu_);
466   if (service_->shutdown_) return;
467   SendFinishLocked(std::move(self), status);
468 }
469 
470 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
SendFinishLocked(std::shared_ptr<CallHandler> self,const Status & status)471     SendFinishLocked(std::shared_ptr<CallHandler> self, const Status& status) {
472   on_finish_done_ =
473       CallableTag(std::bind(&WatchCallHandler::OnFinishDone, this,
474                             std::placeholders::_1, std::placeholders::_2),
475                   std::move(self));
476   stream_.Finish(status, &on_finish_done_);
477   finish_called_ = true;
478 }
479 
480 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
OnFinishDone(std::shared_ptr<CallHandler> self,bool ok)481     OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) {
482   if (ok) {
483     gpr_log(GPR_DEBUG,
484             "[HCS %p] Health watch call finished (service_name: \"%s\", "
485             "handler: %p).",
486             service_, service_name_.c_str(), this);
487   }
488   self.reset();  // To appease clang-tidy.
489 }
490 
491 // TODO(roth): This method currently assumes that there will be only one
492 // thread polling the cq and invoking the corresponding callbacks.  If
493 // that changes, we will need to add synchronization here.
494 void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
OnDoneNotified(std::shared_ptr<CallHandler> self,bool ok)495     OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok) {
496   GPR_ASSERT(ok);
497   gpr_log(GPR_DEBUG,
498           "[HCS %p] Health watch call is notified done (handler: %p, "
499           "is_cancelled: %d).",
500           service_, this, static_cast<int>(ctx_.IsCancelled()));
501   database_->UnregisterCallHandler(service_name_, self);
502   SendFinish(std::move(self), Status::CANCELLED);
503 }
504 
505 }  // namespace grpc
506