• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #ifndef GRPC_SRC_CORE_LOAD_BALANCING_HEALTH_CHECK_CLIENT_INTERNAL_H
18 #define GRPC_SRC_CORE_LOAD_BALANCING_HEALTH_CHECK_CLIENT_INTERNAL_H
19 
20 #include <grpc/impl/connectivity_state.h>
21 #include <grpc/support/port_platform.h>
22 
23 #include <map>
24 #include <memory>
25 #include <set>
26 #include <string>
27 #include <utility>
28 
29 #include "absl/base/thread_annotations.h"
30 #include "absl/status/status.h"
31 #include "absl/strings/string_view.h"
32 #include "absl/types/optional.h"
33 #include "src/core/client_channel/subchannel.h"
34 #include "src/core/client_channel/subchannel_interface_internal.h"
35 #include "src/core/client_channel/subchannel_stream_client.h"
36 #include "src/core/lib/iomgr/iomgr_fwd.h"
37 #include "src/core/lib/iomgr/pollset_set.h"
38 #include "src/core/load_balancing/subchannel_interface.h"
39 #include "src/core/util/orphanable.h"
40 #include "src/core/util/ref_counted_ptr.h"
41 #include "src/core/util/sync.h"
42 #include "src/core/util/unique_type_name.h"
43 #include "src/core/util/work_serializer.h"
44 
45 namespace grpc_core {
46 
47 class HealthWatcher;
48 
49 // This producer is registered with a subchannel.  It creates a streaming
50 // health watch call for each health check service name that is being
51 // watched and reports the resulting connectivity state to all
52 // registered watchers.
53 class HealthProducer final : public Subchannel::DataProducerInterface {
54  public:
HealthProducer()55   HealthProducer() : interested_parties_(grpc_pollset_set_create()) {}
~HealthProducer()56   ~HealthProducer() override { grpc_pollset_set_destroy(interested_parties_); }
57 
58   void Start(RefCountedPtr<Subchannel> subchannel);
59 
Type()60   static UniqueTypeName Type() {
61     static UniqueTypeName::Factory kFactory("health_check");
62     return kFactory.Create();
63   }
64 
type()65   UniqueTypeName type() const override { return Type(); }
66 
67   void AddWatcher(HealthWatcher* watcher,
68                   const absl::optional<std::string>& health_check_service_name);
69   void RemoveWatcher(
70       HealthWatcher* watcher,
71       const absl::optional<std::string>& health_check_service_name);
72 
73  private:
74   class ConnectivityWatcher;
75 
76   // Health checker for a given health check service name.  Contains the
77   // health check client and the list of watchers.
78   class HealthChecker final : public InternallyRefCounted<HealthChecker> {
79    public:
80     HealthChecker(WeakRefCountedPtr<HealthProducer> producer,
81                   absl::string_view health_check_service_name);
82 
83     // Disable thread-safety analysis because this method is called via
84     // OrphanablePtr<>, but there's no way to pass the lock annotation
85     // through there.
86     void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS;
87 
88     void AddWatcherLocked(HealthWatcher* watcher)
89         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&HealthProducer::mu_);
90 
91     // Returns true if this was the last watcher.
92     bool RemoveWatcherLocked(HealthWatcher* watcher)
93         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&HealthProducer::mu_);
94 
95     // Called when the subchannel's connectivity state changes.
96     void OnConnectivityStateChangeLocked(grpc_connectivity_state state,
97                                          const absl::Status& status)
98         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&HealthProducer::mu_);
99 
100    private:
101     class HealthStreamEventHandler;
102 
103     // Starts a new stream if we have a connected subchannel.
104     // Called whenever the subchannel transitions to state READY or when a
105     // watcher is added.
106     void StartHealthStreamLocked()
107         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&HealthProducer::mu_);
108 
109     // Notifies watchers of a new state.
110     // Called while holding the SubchannelStreamClient lock and possibly
111     // the producer lock, so must notify asynchronously, but in guaranteed
112     // order (hence the use of WorkSerializer).
113     void NotifyWatchersLocked(grpc_connectivity_state state,
114                               absl::Status status)
115         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&HealthProducer::mu_);
116 
117     // Called by the health check client when receiving an update.
118     void OnHealthWatchStatusChange(grpc_connectivity_state state,
119                                    const absl::Status& status);
120 
121     WeakRefCountedPtr<HealthProducer> producer_;
122     absl::string_view health_check_service_name_;
123     std::shared_ptr<WorkSerializer> work_serializer_ =
124         std::make_shared<WorkSerializer>(
125             producer_->subchannel_->event_engine());
126 
127     absl::optional<grpc_connectivity_state> state_
128         ABSL_GUARDED_BY(&HealthProducer::mu_);
129     absl::Status status_ ABSL_GUARDED_BY(&HealthProducer::mu_);
130     OrphanablePtr<SubchannelStreamClient> stream_client_
131         ABSL_GUARDED_BY(&HealthProducer::mu_);
132     std::set<HealthWatcher*> watchers_ ABSL_GUARDED_BY(&HealthProducer::mu_);
133   };
134 
135   // Handles a connectivity state change on the subchannel.
136   void OnConnectivityStateChange(grpc_connectivity_state state,
137                                  const absl::Status& status);
138   void Orphaned() override;
139 
140   RefCountedPtr<Subchannel> subchannel_;
141   ConnectivityWatcher* connectivity_watcher_;
142   grpc_pollset_set* interested_parties_;
143 
144   Mutex mu_;
145   absl::optional<grpc_connectivity_state> state_ ABSL_GUARDED_BY(&mu_);
146   absl::Status status_ ABSL_GUARDED_BY(&mu_);
147   RefCountedPtr<ConnectedSubchannel> connected_subchannel_
148       ABSL_GUARDED_BY(&mu_);
149   std::map<std::string /*health_check_service_name*/,
150            OrphanablePtr<HealthChecker>>
151       health_checkers_ ABSL_GUARDED_BY(&mu_);
152   std::set<HealthWatcher*> non_health_watchers_ ABSL_GUARDED_BY(&mu_);
153 };
154 
155 // A data watcher that handles health checking.
156 class HealthWatcher final : public InternalSubchannelDataWatcherInterface {
157  public:
HealthWatcher(std::shared_ptr<WorkSerializer> work_serializer,absl::optional<std::string> health_check_service_name,std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface> watcher)158   HealthWatcher(
159       std::shared_ptr<WorkSerializer> work_serializer,
160       absl::optional<std::string> health_check_service_name,
161       std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
162           watcher)
163       : work_serializer_(std::move(work_serializer)),
164         health_check_service_name_(std::move(health_check_service_name)),
165         watcher_(std::move(watcher)) {}
166   ~HealthWatcher() override;
167 
type()168   UniqueTypeName type() const override { return HealthProducer::Type(); }
169 
170   // When the client channel sees this wrapper, it will pass it the real
171   // subchannel to use.
172   void SetSubchannel(Subchannel* subchannel) override;
173 
174   // For intercepting the watcher before it gets up to the real subchannel.
175   std::shared_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
TakeWatcher()176   TakeWatcher() {
177     return std::move(watcher_);
178   }
SetWatcher(std::shared_ptr<SubchannelInterface::ConnectivityStateWatcherInterface> watcher)179   void SetWatcher(
180       std::shared_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
181           watcher) {
182     watcher_ = std::move(watcher);
183   }
184 
185   void Notify(grpc_connectivity_state state, absl::Status status);
186 
interested_parties()187   grpc_pollset_set* interested_parties() const {
188     return watcher_->interested_parties();
189   }
190 
191  private:
192   std::shared_ptr<WorkSerializer> work_serializer_;
193   absl::optional<std::string> health_check_service_name_;
194   std::shared_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
195       watcher_;
196   RefCountedPtr<HealthProducer> producer_;
197 };
198 
199 }  // namespace grpc_core
200 
201 #endif  // GRPC_SRC_CORE_LOAD_BALANCING_HEALTH_CHECK_CLIENT_INTERNAL_H
202