/*
 *
 * Copyright 2016 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
18
19 #include <memory>
20 #include <mutex>
21
22 #include <grpc/slice.h>
23 #include <grpc/support/alloc.h>
24 #include <grpc/support/log.h>
25 #include <grpcpp/impl/codegen/method_handler_impl.h>
26
27 #include "pb_decode.h"
28 #include "pb_encode.h"
29 #include "src/cpp/server/health/default_health_check_service.h"
30 #include "src/cpp/server/health/health.pb.h"
31
32 namespace grpc {
namespace {
// Full method path under which HealthCheckServiceImpl registers its Check
// handler.
constexpr char kHealthCheckMethodName[] = "/grpc.health.v1.Health/Check";
}  // namespace
36
HealthCheckServiceImpl(DefaultHealthCheckService * service)37 DefaultHealthCheckService::HealthCheckServiceImpl::HealthCheckServiceImpl(
38 DefaultHealthCheckService* service)
39 : service_(service), method_(nullptr) {
40 internal::MethodHandler* handler =
41 new internal::RpcMethodHandler<HealthCheckServiceImpl, ByteBuffer,
42 ByteBuffer>(
43 std::mem_fn(&HealthCheckServiceImpl::Check), this);
44 method_ = new internal::RpcServiceMethod(
45 kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, handler);
46 AddMethod(method_);
47 }
48
Check(ServerContext * context,const ByteBuffer * request,ByteBuffer * response)49 Status DefaultHealthCheckService::HealthCheckServiceImpl::Check(
50 ServerContext* context, const ByteBuffer* request, ByteBuffer* response) {
51 // Decode request.
52 std::vector<Slice> slices;
53 if (!request->Dump(&slices).ok()) {
54 return Status(StatusCode::INVALID_ARGUMENT, "");
55 }
56 uint8_t* request_bytes = nullptr;
57 bool request_bytes_owned = false;
58 size_t request_size = 0;
59 grpc_health_v1_HealthCheckRequest request_struct;
60 if (slices.empty()) {
61 request_struct.has_service = false;
62 } else if (slices.size() == 1) {
63 request_bytes = const_cast<uint8_t*>(slices[0].begin());
64 request_size = slices[0].size();
65 } else {
66 request_bytes_owned = true;
67 request_bytes = static_cast<uint8_t*>(gpr_malloc(request->Length()));
68 uint8_t* copy_to = request_bytes;
69 for (size_t i = 0; i < slices.size(); i++) {
70 memcpy(copy_to, slices[i].begin(), slices[i].size());
71 copy_to += slices[i].size();
72 }
73 }
74
75 if (request_bytes != nullptr) {
76 pb_istream_t istream = pb_istream_from_buffer(request_bytes, request_size);
77 bool decode_status = pb_decode(
78 &istream, grpc_health_v1_HealthCheckRequest_fields, &request_struct);
79 if (request_bytes_owned) {
80 gpr_free(request_bytes);
81 }
82 if (!decode_status) {
83 return Status(StatusCode::INVALID_ARGUMENT, "");
84 }
85 }
86
87 // Check status from the associated default health checking service.
88 DefaultHealthCheckService::ServingStatus serving_status =
89 service_->GetServingStatus(
90 request_struct.has_service ? request_struct.service : "");
91 if (serving_status == DefaultHealthCheckService::NOT_FOUND) {
92 return Status(StatusCode::NOT_FOUND, "");
93 }
94
95 // Encode response
96 grpc_health_v1_HealthCheckResponse response_struct;
97 response_struct.has_status = true;
98 response_struct.status =
99 serving_status == DefaultHealthCheckService::SERVING
100 ? grpc_health_v1_HealthCheckResponse_ServingStatus_SERVING
101 : grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING;
102 pb_ostream_t ostream;
103 memset(&ostream, 0, sizeof(ostream));
104 pb_encode(&ostream, grpc_health_v1_HealthCheckResponse_fields,
105 &response_struct);
106 grpc_slice response_slice = grpc_slice_malloc(ostream.bytes_written);
107 ostream = pb_ostream_from_buffer(GRPC_SLICE_START_PTR(response_slice),
108 GRPC_SLICE_LENGTH(response_slice));
109 bool encode_status = pb_encode(
110 &ostream, grpc_health_v1_HealthCheckResponse_fields, &response_struct);
111 if (!encode_status) {
112 return Status(StatusCode::INTERNAL, "Failed to encode response.");
113 }
114 Slice encoded_response(response_slice, Slice::STEAL_REF);
115 ByteBuffer response_buffer(&encoded_response, 1);
116 response->Swap(&response_buffer);
117 return Status::OK;
118 }
119
DefaultHealthCheckService()120 DefaultHealthCheckService::DefaultHealthCheckService() {
121 services_map_.emplace("", true);
122 }
123
SetServingStatus(const grpc::string & service_name,bool serving)124 void DefaultHealthCheckService::SetServingStatus(
125 const grpc::string& service_name, bool serving) {
126 std::lock_guard<std::mutex> lock(mu_);
127 services_map_[service_name] = serving;
128 }
129
SetServingStatus(bool serving)130 void DefaultHealthCheckService::SetServingStatus(bool serving) {
131 std::lock_guard<std::mutex> lock(mu_);
132 for (auto iter = services_map_.begin(); iter != services_map_.end(); ++iter) {
133 iter->second = serving;
134 }
135 }
136
137 DefaultHealthCheckService::ServingStatus
GetServingStatus(const grpc::string & service_name) const138 DefaultHealthCheckService::GetServingStatus(
139 const grpc::string& service_name) const {
140 std::lock_guard<std::mutex> lock(mu_);
141 const auto& iter = services_map_.find(service_name);
142 if (iter == services_map_.end()) {
143 return NOT_FOUND;
144 }
145 return iter->second ? SERVING : NOT_SERVING;
146 }
147
148 DefaultHealthCheckService::HealthCheckServiceImpl*
GetHealthCheckService()149 DefaultHealthCheckService::GetHealthCheckService() {
150 GPR_ASSERT(impl_ == nullptr);
151 impl_.reset(new HealthCheckServiceImpl(this));
152 return impl_.get();
153 }
154
155 } // namespace grpc
156