1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/impl/channel_arg_names.h>
18 #include <grpc/impl/connectivity_state.h>
19 #include <grpc/slice.h>
20 #include <grpc/status.h>
21 #include <grpc/support/port_platform.h>
22 #include <stdint.h>
23 #include <string.h>
24
25 #include <map>
26 #include <memory>
27 #include <set>
28 #include <string>
29 #include <type_traits>
30 #include <utility>
31
32 #include "absl/log/check.h"
33 #include "absl/log/log.h"
34 #include "absl/status/status.h"
35 #include "absl/status/statusor.h"
36 #include "absl/strings/str_cat.h"
37 #include "absl/strings/string_view.h"
38 #include "absl/types/optional.h"
39 #include "src/core/channelz/channel_trace.h"
40 #include "src/core/client_channel/client_channel_internal.h"
41 #include "src/core/client_channel/subchannel.h"
42 #include "src/core/client_channel/subchannel_stream_client.h"
43 #include "src/core/lib/address_utils/sockaddr_utils.h"
44 #include "src/core/lib/channel/channel_args.h"
45 #include "src/core/lib/debug/trace.h"
46 #include "src/core/lib/iomgr/closure.h"
47 #include "src/core/lib/iomgr/error.h"
48 #include "src/core/lib/iomgr/exec_ctx.h"
49 #include "src/core/lib/iomgr/iomgr_fwd.h"
50 #include "src/core/lib/iomgr/pollset_set.h"
51 #include "src/core/lib/slice/slice.h"
52 #include "src/core/lib/transport/connectivity_state.h"
53 #include "src/core/load_balancing/health_check_client_internal.h"
54 #include "src/core/load_balancing/subchannel_interface.h"
55 #include "src/core/util/debug_location.h"
56 #include "src/core/util/orphanable.h"
57 #include "src/core/util/ref_counted_ptr.h"
58 #include "src/core/util/sync.h"
59 #include "src/core/util/work_serializer.h"
60 #include "src/proto/grpc/health/v1/health.upb.h"
61 #include "upb/base/string_view.h"
62 #include "upb/mem/arena.hpp"
63
64 namespace grpc_core {
65
66 namespace {
67
68 // A fire-and-forget class to asynchronously drain a WorkSerializer queue.
69 class AsyncWorkSerializerDrainer final {
70 public:
AsyncWorkSerializerDrainer(std::shared_ptr<WorkSerializer> work_serializer)71 explicit AsyncWorkSerializerDrainer(
72 std::shared_ptr<WorkSerializer> work_serializer)
73 : work_serializer_(std::move(work_serializer)) {
74 GRPC_CLOSURE_INIT(&closure_, RunInExecCtx, this, nullptr);
75 ExecCtx::Run(DEBUG_LOCATION, &closure_, absl::OkStatus());
76 }
77
78 private:
RunInExecCtx(void * arg,grpc_error_handle)79 static void RunInExecCtx(void* arg, grpc_error_handle) {
80 auto* self = static_cast<AsyncWorkSerializerDrainer*>(arg);
81 self->work_serializer_->DrainQueue();
82 delete self;
83 }
84
85 std::shared_ptr<WorkSerializer> work_serializer_;
86 grpc_closure closure_;
87 };
88
89 } // namespace
90
91 //
92 // HealthProducer::HealthChecker
93 //
94
HealthChecker(WeakRefCountedPtr<HealthProducer> producer,absl::string_view health_check_service_name)95 HealthProducer::HealthChecker::HealthChecker(
96 WeakRefCountedPtr<HealthProducer> producer,
97 absl::string_view health_check_service_name)
98 : producer_(std::move(producer)),
99 health_check_service_name_(health_check_service_name),
100 state_(producer_->state_ == GRPC_CHANNEL_READY ? GRPC_CHANNEL_CONNECTING
101 : producer_->state_),
102 status_(producer_->status_) {
103 // If the subchannel is already connected, start health checking.
104 if (producer_->state_ == GRPC_CHANNEL_READY) StartHealthStreamLocked();
105 }
106
Orphan()107 void HealthProducer::HealthChecker::Orphan() {
108 stream_client_.reset();
109 Unref();
110 }
111
AddWatcherLocked(HealthWatcher * watcher)112 void HealthProducer::HealthChecker::AddWatcherLocked(HealthWatcher* watcher) {
113 watchers_.insert(watcher);
114 if (state_.has_value()) watcher->Notify(*state_, status_);
115 }
116
RemoveWatcherLocked(HealthWatcher * watcher)117 bool HealthProducer::HealthChecker::RemoveWatcherLocked(
118 HealthWatcher* watcher) {
119 watchers_.erase(watcher);
120 return watchers_.empty();
121 }
122
OnConnectivityStateChangeLocked(grpc_connectivity_state state,const absl::Status & status)123 void HealthProducer::HealthChecker::OnConnectivityStateChangeLocked(
124 grpc_connectivity_state state, const absl::Status& status) {
125 if (state == GRPC_CHANNEL_READY) {
126 // We should already be in CONNECTING, and we don't want to change
127 // that until we see the initial response on the stream.
128 if (!state_.has_value()) {
129 state_ = GRPC_CHANNEL_CONNECTING;
130 status_ = absl::OkStatus();
131 } else {
132 CHECK(state_ == GRPC_CHANNEL_CONNECTING);
133 }
134 // Start the health watch stream.
135 StartHealthStreamLocked();
136 } else {
137 state_ = state;
138 status_ = status;
139 NotifyWatchersLocked(*state_, status_);
140 // We're not connected, so stop health checking.
141 stream_client_.reset();
142 }
143 }
144
NotifyWatchersLocked(grpc_connectivity_state state,absl::Status status)145 void HealthProducer::HealthChecker::NotifyWatchersLocked(
146 grpc_connectivity_state state, absl::Status status) {
147 GRPC_TRACE_LOG(health_check_client, INFO)
148 << "HealthProducer " << producer_.get() << " HealthChecker " << this
149 << ": reporting state " << ConnectivityStateName(state) << " to watchers";
150 work_serializer_->Schedule(
151 [self = Ref(), state, status = std::move(status)]() {
152 MutexLock lock(&self->producer_->mu_);
153 for (HealthWatcher* watcher : self->watchers_) {
154 watcher->Notify(state, status);
155 }
156 },
157 DEBUG_LOCATION);
158 new AsyncWorkSerializerDrainer(work_serializer_);
159 }
160
OnHealthWatchStatusChange(grpc_connectivity_state state,const absl::Status & status)161 void HealthProducer::HealthChecker::OnHealthWatchStatusChange(
162 grpc_connectivity_state state, const absl::Status& status) {
163 if (state == GRPC_CHANNEL_SHUTDOWN) return;
164 // Prepend the subchannel's address to the status if needed.
165 absl::Status use_status;
166 if (!status.ok()) {
167 use_status = absl::Status(
168 status.code(), absl::StrCat(producer_->subchannel_->address(), ": ",
169 status.message()));
170 }
171 work_serializer_->Schedule(
172 [self = Ref(), state, status = std::move(use_status)]() mutable {
173 MutexLock lock(&self->producer_->mu_);
174 if (self->stream_client_ != nullptr) {
175 self->state_ = state;
176 self->status_ = std::move(status);
177 for (HealthWatcher* watcher : self->watchers_) {
178 watcher->Notify(state, self->status_);
179 }
180 }
181 },
182 DEBUG_LOCATION);
183 new AsyncWorkSerializerDrainer(work_serializer_);
184 }
185
186 //
187 // HealthProducer::HealthChecker::HealthStreamEventHandler
188 //
189
190 class HealthProducer::HealthChecker::HealthStreamEventHandler final
191 : public SubchannelStreamClient::CallEventHandler {
192 public:
HealthStreamEventHandler(RefCountedPtr<HealthChecker> health_checker)193 explicit HealthStreamEventHandler(RefCountedPtr<HealthChecker> health_checker)
194 : health_checker_(std::move(health_checker)) {}
195
GetPathLocked()196 Slice GetPathLocked() override {
197 return Slice::FromStaticString("/grpc.health.v1.Health/Watch");
198 }
199
OnCallStartLocked(SubchannelStreamClient * client)200 void OnCallStartLocked(SubchannelStreamClient* client) override {
201 SetHealthStatusLocked(client, GRPC_CHANNEL_CONNECTING,
202 "starting health watch");
203 }
204
OnRetryTimerStartLocked(SubchannelStreamClient * client)205 void OnRetryTimerStartLocked(SubchannelStreamClient* client) override {
206 SetHealthStatusLocked(client, GRPC_CHANNEL_TRANSIENT_FAILURE,
207 "health check call failed; will retry after backoff");
208 }
209
EncodeSendMessageLocked()210 grpc_slice EncodeSendMessageLocked() override {
211 upb::Arena arena;
212 grpc_health_v1_HealthCheckRequest* request_struct =
213 grpc_health_v1_HealthCheckRequest_new(arena.ptr());
214 grpc_health_v1_HealthCheckRequest_set_service(
215 request_struct,
216 upb_StringView_FromDataAndSize(
217 health_checker_->health_check_service_name_.data(),
218 health_checker_->health_check_service_name_.size()));
219 size_t buf_length;
220 char* buf = grpc_health_v1_HealthCheckRequest_serialize(
221 request_struct, arena.ptr(), &buf_length);
222 grpc_slice request_slice = GRPC_SLICE_MALLOC(buf_length);
223 memcpy(GRPC_SLICE_START_PTR(request_slice), buf, buf_length);
224 return request_slice;
225 }
226
RecvMessageReadyLocked(SubchannelStreamClient * client,absl::string_view serialized_message)227 absl::Status RecvMessageReadyLocked(
228 SubchannelStreamClient* client,
229 absl::string_view serialized_message) override {
230 auto healthy = DecodeResponse(serialized_message);
231 if (!healthy.ok()) {
232 SetHealthStatusLocked(client, GRPC_CHANNEL_TRANSIENT_FAILURE,
233 healthy.status().ToString().c_str());
234 return healthy.status();
235 }
236 if (!*healthy) {
237 SetHealthStatusLocked(client, GRPC_CHANNEL_TRANSIENT_FAILURE,
238 "backend unhealthy");
239 } else {
240 SetHealthStatusLocked(client, GRPC_CHANNEL_READY, "OK");
241 }
242 return absl::OkStatus();
243 }
244
RecvTrailingMetadataReadyLocked(SubchannelStreamClient * client,grpc_status_code status)245 void RecvTrailingMetadataReadyLocked(SubchannelStreamClient* client,
246 grpc_status_code status) override {
247 if (status == GRPC_STATUS_UNIMPLEMENTED) {
248 static const char kErrorMessage[] =
249 "health checking Watch method returned UNIMPLEMENTED; "
250 "disabling health checks but assuming server is healthy";
251 LOG(ERROR) << kErrorMessage;
252 auto* channelz_node =
253 health_checker_->producer_->subchannel_->channelz_node();
254 if (channelz_node != nullptr) {
255 channelz_node->AddTraceEvent(
256 channelz::ChannelTrace::Error,
257 grpc_slice_from_static_string(kErrorMessage));
258 }
259 SetHealthStatusLocked(client, GRPC_CHANNEL_READY, kErrorMessage);
260 }
261 }
262
263 private:
264 // Returns true if healthy.
DecodeResponse(absl::string_view serialized_message)265 static absl::StatusOr<bool> DecodeResponse(
266 absl::string_view serialized_message) {
267 // Deserialize message.
268 upb::Arena arena;
269 auto* response = grpc_health_v1_HealthCheckResponse_parse(
270 serialized_message.data(), serialized_message.size(), arena.ptr());
271 if (response == nullptr) {
272 // Can't parse message; assume unhealthy.
273 return absl::InvalidArgumentError("cannot parse health check response");
274 }
275 int32_t status = grpc_health_v1_HealthCheckResponse_status(response);
276 return status == grpc_health_v1_HealthCheckResponse_SERVING;
277 }
278
SetHealthStatusLocked(SubchannelStreamClient * client,grpc_connectivity_state state,const char * reason)279 void SetHealthStatusLocked(SubchannelStreamClient* client,
280 grpc_connectivity_state state,
281 const char* reason) {
282 GRPC_TRACE_LOG(health_check_client, INFO)
283 << "HealthCheckClient " << client
284 << ": setting state=" << ConnectivityStateName(state)
285 << " reason=" << reason;
286 health_checker_->OnHealthWatchStatusChange(
287 state, state == GRPC_CHANNEL_TRANSIENT_FAILURE
288 ? absl::UnavailableError(reason)
289 : absl::OkStatus());
290 }
291
292 RefCountedPtr<HealthChecker> health_checker_;
293 };
294
StartHealthStreamLocked()295 void HealthProducer::HealthChecker::StartHealthStreamLocked() {
296 GRPC_TRACE_LOG(health_check_client, INFO)
297 << "HealthProducer " << producer_.get() << " HealthChecker " << this
298 << ": creating HealthClient for \"" << health_check_service_name_ << "\"";
299 stream_client_ = MakeOrphanable<SubchannelStreamClient>(
300 producer_->connected_subchannel_, producer_->subchannel_->pollset_set(),
301 std::make_unique<HealthStreamEventHandler>(Ref()),
302 GRPC_TRACE_FLAG_ENABLED(health_check_client) ? "HealthClient" : nullptr);
303 }
304
305 //
306 // HealthProducer::ConnectivityWatcher
307 //
308
309 class HealthProducer::ConnectivityWatcher final
310 : public Subchannel::ConnectivityStateWatcherInterface {
311 public:
ConnectivityWatcher(WeakRefCountedPtr<HealthProducer> producer)312 explicit ConnectivityWatcher(WeakRefCountedPtr<HealthProducer> producer)
313 : producer_(std::move(producer)) {}
314
OnConnectivityStateChange(RefCountedPtr<ConnectivityStateWatcherInterface> self,grpc_connectivity_state state,const absl::Status & status)315 void OnConnectivityStateChange(
316 RefCountedPtr<ConnectivityStateWatcherInterface> self,
317 grpc_connectivity_state state, const absl::Status& status) override {
318 producer_->OnConnectivityStateChange(state, status);
319 self.reset();
320 }
321
interested_parties()322 grpc_pollset_set* interested_parties() override {
323 return producer_->interested_parties_;
324 }
325
326 private:
327 WeakRefCountedPtr<HealthProducer> producer_;
328 };
329
330 //
331 // HealthProducer
332 //
333
Start(RefCountedPtr<Subchannel> subchannel)334 void HealthProducer::Start(RefCountedPtr<Subchannel> subchannel) {
335 GRPC_TRACE_LOG(health_check_client, INFO)
336 << "HealthProducer " << this << ": starting with subchannel "
337 << subchannel.get();
338 subchannel_ = std::move(subchannel);
339 {
340 MutexLock lock(&mu_);
341 connected_subchannel_ = subchannel_->connected_subchannel();
342 }
343 auto connectivity_watcher =
344 MakeRefCounted<ConnectivityWatcher>(WeakRefAsSubclass<HealthProducer>());
345 connectivity_watcher_ = connectivity_watcher.get();
346 subchannel_->WatchConnectivityState(std::move(connectivity_watcher));
347 }
348
Orphaned()349 void HealthProducer::Orphaned() {
350 GRPC_TRACE_LOG(health_check_client, INFO)
351 << "HealthProducer " << this << ": shutting down";
352 {
353 MutexLock lock(&mu_);
354 health_checkers_.clear();
355 }
356 subchannel_->CancelConnectivityStateWatch(connectivity_watcher_);
357 subchannel_->RemoveDataProducer(this);
358 }
359
AddWatcher(HealthWatcher * watcher,const absl::optional<std::string> & health_check_service_name)360 void HealthProducer::AddWatcher(
361 HealthWatcher* watcher,
362 const absl::optional<std::string>& health_check_service_name) {
363 MutexLock lock(&mu_);
364 grpc_pollset_set_add_pollset_set(interested_parties_,
365 watcher->interested_parties());
366 if (!health_check_service_name.has_value()) {
367 if (state_.has_value()) watcher->Notify(*state_, status_);
368 non_health_watchers_.insert(watcher);
369 } else {
370 auto it =
371 health_checkers_.emplace(*health_check_service_name, nullptr).first;
372 auto& health_checker = it->second;
373 if (health_checker == nullptr) {
374 health_checker = MakeOrphanable<HealthChecker>(
375 WeakRefAsSubclass<HealthProducer>(), it->first);
376 }
377 health_checker->AddWatcherLocked(watcher);
378 }
379 }
380
RemoveWatcher(HealthWatcher * watcher,const absl::optional<std::string> & health_check_service_name)381 void HealthProducer::RemoveWatcher(
382 HealthWatcher* watcher,
383 const absl::optional<std::string>& health_check_service_name) {
384 MutexLock lock(&mu_);
385 grpc_pollset_set_del_pollset_set(interested_parties_,
386 watcher->interested_parties());
387 if (!health_check_service_name.has_value()) {
388 non_health_watchers_.erase(watcher);
389 } else {
390 auto it = health_checkers_.find(*health_check_service_name);
391 if (it == health_checkers_.end()) return;
392 const bool empty = it->second->RemoveWatcherLocked(watcher);
393 if (empty) health_checkers_.erase(it);
394 }
395 }
396
OnConnectivityStateChange(grpc_connectivity_state state,const absl::Status & status)397 void HealthProducer::OnConnectivityStateChange(grpc_connectivity_state state,
398 const absl::Status& status) {
399 GRPC_TRACE_LOG(health_check_client, INFO)
400 << "HealthProducer " << this
401 << ": subchannel state update: state=" << ConnectivityStateName(state)
402 << " status=" << status;
403 MutexLock lock(&mu_);
404 if (state == GRPC_CHANNEL_READY) {
405 connected_subchannel_ = subchannel_->connected_subchannel();
406 // If the subchannel became disconnected again before we got this
407 // notification, then just ignore the READY notification. We should
408 // get another notification shortly indicating a different state.
409 if (connected_subchannel_ == nullptr) return;
410 } else {
411 connected_subchannel_.reset();
412 }
413 state_ = state;
414 status_ = status;
415 for (const auto& p : health_checkers_) {
416 p.second->OnConnectivityStateChangeLocked(state, status);
417 }
418 for (HealthWatcher* watcher : non_health_watchers_) {
419 watcher->Notify(state, status);
420 }
421 }
422
423 //
424 // HealthWatcher
425 //
426
~HealthWatcher()427 HealthWatcher::~HealthWatcher() {
428 GRPC_TRACE_LOG(health_check_client, INFO)
429 << "HealthWatcher " << this << ": unregistering from producer "
430 << producer_.get() << " (health_check_service_name=\""
431 << health_check_service_name_.value_or("N/A") << "\")";
432 if (producer_ != nullptr) {
433 producer_->RemoveWatcher(this, health_check_service_name_);
434 }
435 }
436
SetSubchannel(Subchannel * subchannel)437 void HealthWatcher::SetSubchannel(Subchannel* subchannel) {
438 bool created = false;
439 // Check if our producer is already registered with the subchannel.
440 // If not, create a new one.
441 subchannel->GetOrAddDataProducer(
442 HealthProducer::Type(),
443 [&](Subchannel::DataProducerInterface** producer) {
444 if (*producer != nullptr) {
445 producer_ =
446 (*producer)->RefIfNonZero().TakeAsSubclass<HealthProducer>();
447 }
448 if (producer_ == nullptr) {
449 producer_ = MakeRefCounted<HealthProducer>();
450 *producer = producer_.get();
451 created = true;
452 }
453 });
454 // If we just created the producer, start it.
455 // This needs to be done outside of the lambda passed to
456 // GetOrAddDataProducer() to avoid deadlocking by re-acquiring the
457 // subchannel lock while already holding it.
458 if (created) producer_->Start(subchannel->Ref());
459 // Register ourself with the producer.
460 producer_->AddWatcher(this, health_check_service_name_);
461 GRPC_TRACE_LOG(health_check_client, INFO)
462 << "HealthWatcher " << this << ": registered with producer "
463 << producer_.get() << " (created=" << created
464 << ", health_check_service_name=\""
465 << health_check_service_name_.value_or("N/A") << "\")";
466 }
467
Notify(grpc_connectivity_state state,absl::Status status)468 void HealthWatcher::Notify(grpc_connectivity_state state, absl::Status status) {
469 work_serializer_->Schedule(
470 [watcher = watcher_, state, status = std::move(status)]() mutable {
471 watcher->OnConnectivityStateChange(state, std::move(status));
472 },
473 DEBUG_LOCATION);
474 new AsyncWorkSerializerDrainer(work_serializer_);
475 }
476
477 //
478 // External API
479 //
480
481 std::unique_ptr<SubchannelInterface::DataWatcherInterface>
MakeHealthCheckWatcher(std::shared_ptr<WorkSerializer> work_serializer,const ChannelArgs & args,std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface> watcher)482 MakeHealthCheckWatcher(
483 std::shared_ptr<WorkSerializer> work_serializer, const ChannelArgs& args,
484 std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
485 watcher) {
486 absl::optional<std::string> health_check_service_name;
487 if (!args.GetBool(GRPC_ARG_INHIBIT_HEALTH_CHECKING).value_or(false)) {
488 health_check_service_name =
489 args.GetOwnedString(GRPC_ARG_HEALTH_CHECK_SERVICE_NAME);
490 }
491 GRPC_TRACE_LOG(health_check_client, INFO)
492 << "creating HealthWatcher -- health_check_service_name=\""
493 << health_check_service_name.value_or("N/A") << "\"";
494 return std::make_unique<HealthWatcher>(std::move(work_serializer),
495 std::move(health_check_service_name),
496 std::move(watcher));
497 }
498
499 } // namespace grpc_core
500