• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
//
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
16 
17 #include <grpc/impl/channel_arg_names.h>
18 #include <grpc/impl/connectivity_state.h>
19 #include <grpc/slice.h>
20 #include <grpc/status.h>
21 #include <grpc/support/port_platform.h>
22 #include <stdint.h>
23 #include <string.h>
24 
25 #include <map>
26 #include <memory>
27 #include <set>
28 #include <string>
29 #include <type_traits>
30 #include <utility>
31 
32 #include "absl/log/check.h"
33 #include "absl/log/log.h"
34 #include "absl/status/status.h"
35 #include "absl/status/statusor.h"
36 #include "absl/strings/str_cat.h"
37 #include "absl/strings/string_view.h"
38 #include "absl/types/optional.h"
39 #include "src/core/channelz/channel_trace.h"
40 #include "src/core/client_channel/client_channel_internal.h"
41 #include "src/core/client_channel/subchannel.h"
42 #include "src/core/client_channel/subchannel_stream_client.h"
43 #include "src/core/lib/address_utils/sockaddr_utils.h"
44 #include "src/core/lib/channel/channel_args.h"
45 #include "src/core/lib/debug/trace.h"
46 #include "src/core/lib/iomgr/closure.h"
47 #include "src/core/lib/iomgr/error.h"
48 #include "src/core/lib/iomgr/exec_ctx.h"
49 #include "src/core/lib/iomgr/iomgr_fwd.h"
50 #include "src/core/lib/iomgr/pollset_set.h"
51 #include "src/core/lib/slice/slice.h"
52 #include "src/core/lib/transport/connectivity_state.h"
53 #include "src/core/load_balancing/health_check_client_internal.h"
54 #include "src/core/load_balancing/subchannel_interface.h"
55 #include "src/core/util/debug_location.h"
56 #include "src/core/util/orphanable.h"
57 #include "src/core/util/ref_counted_ptr.h"
58 #include "src/core/util/sync.h"
59 #include "src/core/util/work_serializer.h"
60 #include "src/proto/grpc/health/v1/health.upb.h"
61 #include "upb/base/string_view.h"
62 #include "upb/mem/arena.hpp"
63 
64 namespace grpc_core {
65 
66 namespace {
67 
68 // A fire-and-forget class to asynchronously drain a WorkSerializer queue.
69 class AsyncWorkSerializerDrainer final {
70  public:
AsyncWorkSerializerDrainer(std::shared_ptr<WorkSerializer> work_serializer)71   explicit AsyncWorkSerializerDrainer(
72       std::shared_ptr<WorkSerializer> work_serializer)
73       : work_serializer_(std::move(work_serializer)) {
74     GRPC_CLOSURE_INIT(&closure_, RunInExecCtx, this, nullptr);
75     ExecCtx::Run(DEBUG_LOCATION, &closure_, absl::OkStatus());
76   }
77 
78  private:
RunInExecCtx(void * arg,grpc_error_handle)79   static void RunInExecCtx(void* arg, grpc_error_handle) {
80     auto* self = static_cast<AsyncWorkSerializerDrainer*>(arg);
81     self->work_serializer_->DrainQueue();
82     delete self;
83   }
84 
85   std::shared_ptr<WorkSerializer> work_serializer_;
86   grpc_closure closure_;
87 };
88 
89 }  // namespace
90 
//
// HealthProducer::HealthChecker
//
94 
HealthChecker(WeakRefCountedPtr<HealthProducer> producer,absl::string_view health_check_service_name)95 HealthProducer::HealthChecker::HealthChecker(
96     WeakRefCountedPtr<HealthProducer> producer,
97     absl::string_view health_check_service_name)
98     : producer_(std::move(producer)),
99       health_check_service_name_(health_check_service_name),
100       state_(producer_->state_ == GRPC_CHANNEL_READY ? GRPC_CHANNEL_CONNECTING
101                                                      : producer_->state_),
102       status_(producer_->status_) {
103   // If the subchannel is already connected, start health checking.
104   if (producer_->state_ == GRPC_CHANNEL_READY) StartHealthStreamLocked();
105 }
106 
Orphan()107 void HealthProducer::HealthChecker::Orphan() {
108   stream_client_.reset();
109   Unref();
110 }
111 
AddWatcherLocked(HealthWatcher * watcher)112 void HealthProducer::HealthChecker::AddWatcherLocked(HealthWatcher* watcher) {
113   watchers_.insert(watcher);
114   if (state_.has_value()) watcher->Notify(*state_, status_);
115 }
116 
RemoveWatcherLocked(HealthWatcher * watcher)117 bool HealthProducer::HealthChecker::RemoveWatcherLocked(
118     HealthWatcher* watcher) {
119   watchers_.erase(watcher);
120   return watchers_.empty();
121 }
122 
OnConnectivityStateChangeLocked(grpc_connectivity_state state,const absl::Status & status)123 void HealthProducer::HealthChecker::OnConnectivityStateChangeLocked(
124     grpc_connectivity_state state, const absl::Status& status) {
125   if (state == GRPC_CHANNEL_READY) {
126     // We should already be in CONNECTING, and we don't want to change
127     // that until we see the initial response on the stream.
128     if (!state_.has_value()) {
129       state_ = GRPC_CHANNEL_CONNECTING;
130       status_ = absl::OkStatus();
131     } else {
132       CHECK(state_ == GRPC_CHANNEL_CONNECTING);
133     }
134     // Start the health watch stream.
135     StartHealthStreamLocked();
136   } else {
137     state_ = state;
138     status_ = status;
139     NotifyWatchersLocked(*state_, status_);
140     // We're not connected, so stop health checking.
141     stream_client_.reset();
142   }
143 }
144 
NotifyWatchersLocked(grpc_connectivity_state state,absl::Status status)145 void HealthProducer::HealthChecker::NotifyWatchersLocked(
146     grpc_connectivity_state state, absl::Status status) {
147   GRPC_TRACE_LOG(health_check_client, INFO)
148       << "HealthProducer " << producer_.get() << " HealthChecker " << this
149       << ": reporting state " << ConnectivityStateName(state) << " to watchers";
150   work_serializer_->Schedule(
151       [self = Ref(), state, status = std::move(status)]() {
152         MutexLock lock(&self->producer_->mu_);
153         for (HealthWatcher* watcher : self->watchers_) {
154           watcher->Notify(state, status);
155         }
156       },
157       DEBUG_LOCATION);
158   new AsyncWorkSerializerDrainer(work_serializer_);
159 }
160 
OnHealthWatchStatusChange(grpc_connectivity_state state,const absl::Status & status)161 void HealthProducer::HealthChecker::OnHealthWatchStatusChange(
162     grpc_connectivity_state state, const absl::Status& status) {
163   if (state == GRPC_CHANNEL_SHUTDOWN) return;
164   // Prepend the subchannel's address to the status if needed.
165   absl::Status use_status;
166   if (!status.ok()) {
167     use_status = absl::Status(
168         status.code(), absl::StrCat(producer_->subchannel_->address(), ": ",
169                                     status.message()));
170   }
171   work_serializer_->Schedule(
172       [self = Ref(), state, status = std::move(use_status)]() mutable {
173         MutexLock lock(&self->producer_->mu_);
174         if (self->stream_client_ != nullptr) {
175           self->state_ = state;
176           self->status_ = std::move(status);
177           for (HealthWatcher* watcher : self->watchers_) {
178             watcher->Notify(state, self->status_);
179           }
180         }
181       },
182       DEBUG_LOCATION);
183   new AsyncWorkSerializerDrainer(work_serializer_);
184 }
185 
//
// HealthProducer::HealthChecker::HealthStreamEventHandler
//
189 
190 class HealthProducer::HealthChecker::HealthStreamEventHandler final
191     : public SubchannelStreamClient::CallEventHandler {
192  public:
HealthStreamEventHandler(RefCountedPtr<HealthChecker> health_checker)193   explicit HealthStreamEventHandler(RefCountedPtr<HealthChecker> health_checker)
194       : health_checker_(std::move(health_checker)) {}
195 
GetPathLocked()196   Slice GetPathLocked() override {
197     return Slice::FromStaticString("/grpc.health.v1.Health/Watch");
198   }
199 
OnCallStartLocked(SubchannelStreamClient * client)200   void OnCallStartLocked(SubchannelStreamClient* client) override {
201     SetHealthStatusLocked(client, GRPC_CHANNEL_CONNECTING,
202                           "starting health watch");
203   }
204 
OnRetryTimerStartLocked(SubchannelStreamClient * client)205   void OnRetryTimerStartLocked(SubchannelStreamClient* client) override {
206     SetHealthStatusLocked(client, GRPC_CHANNEL_TRANSIENT_FAILURE,
207                           "health check call failed; will retry after backoff");
208   }
209 
EncodeSendMessageLocked()210   grpc_slice EncodeSendMessageLocked() override {
211     upb::Arena arena;
212     grpc_health_v1_HealthCheckRequest* request_struct =
213         grpc_health_v1_HealthCheckRequest_new(arena.ptr());
214     grpc_health_v1_HealthCheckRequest_set_service(
215         request_struct,
216         upb_StringView_FromDataAndSize(
217             health_checker_->health_check_service_name_.data(),
218             health_checker_->health_check_service_name_.size()));
219     size_t buf_length;
220     char* buf = grpc_health_v1_HealthCheckRequest_serialize(
221         request_struct, arena.ptr(), &buf_length);
222     grpc_slice request_slice = GRPC_SLICE_MALLOC(buf_length);
223     memcpy(GRPC_SLICE_START_PTR(request_slice), buf, buf_length);
224     return request_slice;
225   }
226 
RecvMessageReadyLocked(SubchannelStreamClient * client,absl::string_view serialized_message)227   absl::Status RecvMessageReadyLocked(
228       SubchannelStreamClient* client,
229       absl::string_view serialized_message) override {
230     auto healthy = DecodeResponse(serialized_message);
231     if (!healthy.ok()) {
232       SetHealthStatusLocked(client, GRPC_CHANNEL_TRANSIENT_FAILURE,
233                             healthy.status().ToString().c_str());
234       return healthy.status();
235     }
236     if (!*healthy) {
237       SetHealthStatusLocked(client, GRPC_CHANNEL_TRANSIENT_FAILURE,
238                             "backend unhealthy");
239     } else {
240       SetHealthStatusLocked(client, GRPC_CHANNEL_READY, "OK");
241     }
242     return absl::OkStatus();
243   }
244 
RecvTrailingMetadataReadyLocked(SubchannelStreamClient * client,grpc_status_code status)245   void RecvTrailingMetadataReadyLocked(SubchannelStreamClient* client,
246                                        grpc_status_code status) override {
247     if (status == GRPC_STATUS_UNIMPLEMENTED) {
248       static const char kErrorMessage[] =
249           "health checking Watch method returned UNIMPLEMENTED; "
250           "disabling health checks but assuming server is healthy";
251       LOG(ERROR) << kErrorMessage;
252       auto* channelz_node =
253           health_checker_->producer_->subchannel_->channelz_node();
254       if (channelz_node != nullptr) {
255         channelz_node->AddTraceEvent(
256             channelz::ChannelTrace::Error,
257             grpc_slice_from_static_string(kErrorMessage));
258       }
259       SetHealthStatusLocked(client, GRPC_CHANNEL_READY, kErrorMessage);
260     }
261   }
262 
263  private:
264   // Returns true if healthy.
DecodeResponse(absl::string_view serialized_message)265   static absl::StatusOr<bool> DecodeResponse(
266       absl::string_view serialized_message) {
267     // Deserialize message.
268     upb::Arena arena;
269     auto* response = grpc_health_v1_HealthCheckResponse_parse(
270         serialized_message.data(), serialized_message.size(), arena.ptr());
271     if (response == nullptr) {
272       // Can't parse message; assume unhealthy.
273       return absl::InvalidArgumentError("cannot parse health check response");
274     }
275     int32_t status = grpc_health_v1_HealthCheckResponse_status(response);
276     return status == grpc_health_v1_HealthCheckResponse_SERVING;
277   }
278 
SetHealthStatusLocked(SubchannelStreamClient * client,grpc_connectivity_state state,const char * reason)279   void SetHealthStatusLocked(SubchannelStreamClient* client,
280                              grpc_connectivity_state state,
281                              const char* reason) {
282     GRPC_TRACE_LOG(health_check_client, INFO)
283         << "HealthCheckClient " << client
284         << ": setting state=" << ConnectivityStateName(state)
285         << " reason=" << reason;
286     health_checker_->OnHealthWatchStatusChange(
287         state, state == GRPC_CHANNEL_TRANSIENT_FAILURE
288                    ? absl::UnavailableError(reason)
289                    : absl::OkStatus());
290   }
291 
292   RefCountedPtr<HealthChecker> health_checker_;
293 };
294 
StartHealthStreamLocked()295 void HealthProducer::HealthChecker::StartHealthStreamLocked() {
296   GRPC_TRACE_LOG(health_check_client, INFO)
297       << "HealthProducer " << producer_.get() << " HealthChecker " << this
298       << ": creating HealthClient for \"" << health_check_service_name_ << "\"";
299   stream_client_ = MakeOrphanable<SubchannelStreamClient>(
300       producer_->connected_subchannel_, producer_->subchannel_->pollset_set(),
301       std::make_unique<HealthStreamEventHandler>(Ref()),
302       GRPC_TRACE_FLAG_ENABLED(health_check_client) ? "HealthClient" : nullptr);
303 }
304 
//
// HealthProducer::ConnectivityWatcher
//
308 
309 class HealthProducer::ConnectivityWatcher final
310     : public Subchannel::ConnectivityStateWatcherInterface {
311  public:
ConnectivityWatcher(WeakRefCountedPtr<HealthProducer> producer)312   explicit ConnectivityWatcher(WeakRefCountedPtr<HealthProducer> producer)
313       : producer_(std::move(producer)) {}
314 
OnConnectivityStateChange(RefCountedPtr<ConnectivityStateWatcherInterface> self,grpc_connectivity_state state,const absl::Status & status)315   void OnConnectivityStateChange(
316       RefCountedPtr<ConnectivityStateWatcherInterface> self,
317       grpc_connectivity_state state, const absl::Status& status) override {
318     producer_->OnConnectivityStateChange(state, status);
319     self.reset();
320   }
321 
interested_parties()322   grpc_pollset_set* interested_parties() override {
323     return producer_->interested_parties_;
324   }
325 
326  private:
327   WeakRefCountedPtr<HealthProducer> producer_;
328 };
329 
//
// HealthProducer
//
333 
Start(RefCountedPtr<Subchannel> subchannel)334 void HealthProducer::Start(RefCountedPtr<Subchannel> subchannel) {
335   GRPC_TRACE_LOG(health_check_client, INFO)
336       << "HealthProducer " << this << ": starting with subchannel "
337       << subchannel.get();
338   subchannel_ = std::move(subchannel);
339   {
340     MutexLock lock(&mu_);
341     connected_subchannel_ = subchannel_->connected_subchannel();
342   }
343   auto connectivity_watcher =
344       MakeRefCounted<ConnectivityWatcher>(WeakRefAsSubclass<HealthProducer>());
345   connectivity_watcher_ = connectivity_watcher.get();
346   subchannel_->WatchConnectivityState(std::move(connectivity_watcher));
347 }
348 
Orphaned()349 void HealthProducer::Orphaned() {
350   GRPC_TRACE_LOG(health_check_client, INFO)
351       << "HealthProducer " << this << ": shutting down";
352   {
353     MutexLock lock(&mu_);
354     health_checkers_.clear();
355   }
356   subchannel_->CancelConnectivityStateWatch(connectivity_watcher_);
357   subchannel_->RemoveDataProducer(this);
358 }
359 
AddWatcher(HealthWatcher * watcher,const absl::optional<std::string> & health_check_service_name)360 void HealthProducer::AddWatcher(
361     HealthWatcher* watcher,
362     const absl::optional<std::string>& health_check_service_name) {
363   MutexLock lock(&mu_);
364   grpc_pollset_set_add_pollset_set(interested_parties_,
365                                    watcher->interested_parties());
366   if (!health_check_service_name.has_value()) {
367     if (state_.has_value()) watcher->Notify(*state_, status_);
368     non_health_watchers_.insert(watcher);
369   } else {
370     auto it =
371         health_checkers_.emplace(*health_check_service_name, nullptr).first;
372     auto& health_checker = it->second;
373     if (health_checker == nullptr) {
374       health_checker = MakeOrphanable<HealthChecker>(
375           WeakRefAsSubclass<HealthProducer>(), it->first);
376     }
377     health_checker->AddWatcherLocked(watcher);
378   }
379 }
380 
RemoveWatcher(HealthWatcher * watcher,const absl::optional<std::string> & health_check_service_name)381 void HealthProducer::RemoveWatcher(
382     HealthWatcher* watcher,
383     const absl::optional<std::string>& health_check_service_name) {
384   MutexLock lock(&mu_);
385   grpc_pollset_set_del_pollset_set(interested_parties_,
386                                    watcher->interested_parties());
387   if (!health_check_service_name.has_value()) {
388     non_health_watchers_.erase(watcher);
389   } else {
390     auto it = health_checkers_.find(*health_check_service_name);
391     if (it == health_checkers_.end()) return;
392     const bool empty = it->second->RemoveWatcherLocked(watcher);
393     if (empty) health_checkers_.erase(it);
394   }
395 }
396 
OnConnectivityStateChange(grpc_connectivity_state state,const absl::Status & status)397 void HealthProducer::OnConnectivityStateChange(grpc_connectivity_state state,
398                                                const absl::Status& status) {
399   GRPC_TRACE_LOG(health_check_client, INFO)
400       << "HealthProducer " << this
401       << ": subchannel state update: state=" << ConnectivityStateName(state)
402       << " status=" << status;
403   MutexLock lock(&mu_);
404   if (state == GRPC_CHANNEL_READY) {
405     connected_subchannel_ = subchannel_->connected_subchannel();
406     // If the subchannel became disconnected again before we got this
407     // notification, then just ignore the READY notification.  We should
408     // get another notification shortly indicating a different state.
409     if (connected_subchannel_ == nullptr) return;
410   } else {
411     connected_subchannel_.reset();
412   }
413   state_ = state;
414   status_ = status;
415   for (const auto& p : health_checkers_) {
416     p.second->OnConnectivityStateChangeLocked(state, status);
417   }
418   for (HealthWatcher* watcher : non_health_watchers_) {
419     watcher->Notify(state, status);
420   }
421 }
422 
//
// HealthWatcher
//
426 
~HealthWatcher()427 HealthWatcher::~HealthWatcher() {
428   GRPC_TRACE_LOG(health_check_client, INFO)
429       << "HealthWatcher " << this << ": unregistering from producer "
430       << producer_.get() << " (health_check_service_name=\""
431       << health_check_service_name_.value_or("N/A") << "\")";
432   if (producer_ != nullptr) {
433     producer_->RemoveWatcher(this, health_check_service_name_);
434   }
435 }
436 
SetSubchannel(Subchannel * subchannel)437 void HealthWatcher::SetSubchannel(Subchannel* subchannel) {
438   bool created = false;
439   // Check if our producer is already registered with the subchannel.
440   // If not, create a new one.
441   subchannel->GetOrAddDataProducer(
442       HealthProducer::Type(),
443       [&](Subchannel::DataProducerInterface** producer) {
444         if (*producer != nullptr) {
445           producer_ =
446               (*producer)->RefIfNonZero().TakeAsSubclass<HealthProducer>();
447         }
448         if (producer_ == nullptr) {
449           producer_ = MakeRefCounted<HealthProducer>();
450           *producer = producer_.get();
451           created = true;
452         }
453       });
454   // If we just created the producer, start it.
455   // This needs to be done outside of the lambda passed to
456   // GetOrAddDataProducer() to avoid deadlocking by re-acquiring the
457   // subchannel lock while already holding it.
458   if (created) producer_->Start(subchannel->Ref());
459   // Register ourself with the producer.
460   producer_->AddWatcher(this, health_check_service_name_);
461   GRPC_TRACE_LOG(health_check_client, INFO)
462       << "HealthWatcher " << this << ": registered with producer "
463       << producer_.get() << " (created=" << created
464       << ", health_check_service_name=\""
465       << health_check_service_name_.value_or("N/A") << "\")";
466 }
467 
Notify(grpc_connectivity_state state,absl::Status status)468 void HealthWatcher::Notify(grpc_connectivity_state state, absl::Status status) {
469   work_serializer_->Schedule(
470       [watcher = watcher_, state, status = std::move(status)]() mutable {
471         watcher->OnConnectivityStateChange(state, std::move(status));
472       },
473       DEBUG_LOCATION);
474   new AsyncWorkSerializerDrainer(work_serializer_);
475 }
476 
//
// External API
//
480 
481 std::unique_ptr<SubchannelInterface::DataWatcherInterface>
MakeHealthCheckWatcher(std::shared_ptr<WorkSerializer> work_serializer,const ChannelArgs & args,std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface> watcher)482 MakeHealthCheckWatcher(
483     std::shared_ptr<WorkSerializer> work_serializer, const ChannelArgs& args,
484     std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
485         watcher) {
486   absl::optional<std::string> health_check_service_name;
487   if (!args.GetBool(GRPC_ARG_INHIBIT_HEALTH_CHECKING).value_or(false)) {
488     health_check_service_name =
489         args.GetOwnedString(GRPC_ARG_HEALTH_CHECK_SERVICE_NAME);
490   }
491   GRPC_TRACE_LOG(health_check_client, INFO)
492       << "creating HealthWatcher -- health_check_service_name=\""
493       << health_check_service_name.value_or("N/A") << "\"";
494   return std::make_unique<HealthWatcher>(std::move(work_serializer),
495                                          std::move(health_check_service_name),
496                                          std::move(watcher));
497 }
498 
499 }  // namespace grpc_core
500