1 // 2 // Copyright 2022 gRPC authors. 3 // 4 // Licensed under the Apache License, Version 2.0 (the "License"); 5 // you may not use this file except in compliance with the License. 6 // You may obtain a copy of the License at 7 // 8 // http://www.apache.org/licenses/LICENSE-2.0 9 // 10 // Unless required by applicable law or agreed to in writing, software 11 // distributed under the License is distributed on an "AS IS" BASIS, 12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 // See the License for the specific language governing permissions and 14 // limitations under the License. 15 // 16 17 #ifndef GRPC_SRC_CORE_LOAD_BALANCING_HEALTH_CHECK_CLIENT_INTERNAL_H 18 #define GRPC_SRC_CORE_LOAD_BALANCING_HEALTH_CHECK_CLIENT_INTERNAL_H 19 20 #include <grpc/impl/connectivity_state.h> 21 #include <grpc/support/port_platform.h> 22 23 #include <map> 24 #include <memory> 25 #include <set> 26 #include <string> 27 #include <utility> 28 29 #include "absl/base/thread_annotations.h" 30 #include "absl/status/status.h" 31 #include "absl/strings/string_view.h" 32 #include "absl/types/optional.h" 33 #include "src/core/client_channel/subchannel.h" 34 #include "src/core/client_channel/subchannel_interface_internal.h" 35 #include "src/core/client_channel/subchannel_stream_client.h" 36 #include "src/core/lib/iomgr/iomgr_fwd.h" 37 #include "src/core/lib/iomgr/pollset_set.h" 38 #include "src/core/load_balancing/subchannel_interface.h" 39 #include "src/core/util/orphanable.h" 40 #include "src/core/util/ref_counted_ptr.h" 41 #include "src/core/util/sync.h" 42 #include "src/core/util/unique_type_name.h" 43 #include "src/core/util/work_serializer.h" 44 45 namespace grpc_core { 46 47 class HealthWatcher; 48 49 // This producer is registered with a subchannel. It creates a streaming 50 // health watch call for each health check service name that is being 51 // watched and reports the resulting connectivity state to all 52 // registered watchers. 53 class HealthProducer final : public Subchannel::DataProducerInterface { 54 public: HealthProducer()55 HealthProducer() : interested_parties_(grpc_pollset_set_create()) {} ~HealthProducer()56 ~HealthProducer() override { grpc_pollset_set_destroy(interested_parties_); } 57 58 void Start(RefCountedPtr<Subchannel> subchannel); 59 Type()60 static UniqueTypeName Type() { 61 static UniqueTypeName::Factory kFactory("health_check"); 62 return kFactory.Create(); 63 } 64 type()65 UniqueTypeName type() const override { return Type(); } 66 67 void AddWatcher(HealthWatcher* watcher, 68 const absl::optional<std::string>& health_check_service_name); 69 void RemoveWatcher( 70 HealthWatcher* watcher, 71 const absl::optional<std::string>& health_check_service_name); 72 73 private: 74 class ConnectivityWatcher; 75 76 // Health checker for a given health check service name. Contains the 77 // health check client and the list of watchers. 78 class HealthChecker final : public InternallyRefCounted<HealthChecker> { 79 public: 80 HealthChecker(WeakRefCountedPtr<HealthProducer> producer, 81 absl::string_view health_check_service_name); 82 83 // Disable thread-safety analysis because this method is called via 84 // OrphanablePtr<>, but there's no way to pass the lock annotation 85 // through there. 86 void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS; 87 88 void AddWatcherLocked(HealthWatcher* watcher) 89 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&HealthProducer::mu_); 90 91 // Returns true if this was the last watcher. 92 bool RemoveWatcherLocked(HealthWatcher* watcher) 93 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&HealthProducer::mu_); 94 95 // Called when the subchannel's connectivity state changes. 96 void OnConnectivityStateChangeLocked(grpc_connectivity_state state, 97 const absl::Status& status) 98 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&HealthProducer::mu_); 99 100 private: 101 class HealthStreamEventHandler; 102 103 // Starts a new stream if we have a connected subchannel. 104 // Called whenever the subchannel transitions to state READY or when a 105 // watcher is added. 106 void StartHealthStreamLocked() 107 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&HealthProducer::mu_); 108 109 // Notifies watchers of a new state. 110 // Called while holding the SubchannelStreamClient lock and possibly 111 // the producer lock, so must notify asynchronously, but in guaranteed 112 // order (hence the use of WorkSerializer). 113 void NotifyWatchersLocked(grpc_connectivity_state state, 114 absl::Status status) 115 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&HealthProducer::mu_); 116 117 // Called by the health check client when receiving an update. 118 void OnHealthWatchStatusChange(grpc_connectivity_state state, 119 const absl::Status& status); 120 121 WeakRefCountedPtr<HealthProducer> producer_; 122 absl::string_view health_check_service_name_; 123 std::shared_ptr<WorkSerializer> work_serializer_ = 124 std::make_shared<WorkSerializer>( 125 producer_->subchannel_->event_engine()); 126 127 absl::optional<grpc_connectivity_state> state_ 128 ABSL_GUARDED_BY(&HealthProducer::mu_); 129 absl::Status status_ ABSL_GUARDED_BY(&HealthProducer::mu_); 130 OrphanablePtr<SubchannelStreamClient> stream_client_ 131 ABSL_GUARDED_BY(&HealthProducer::mu_); 132 std::set<HealthWatcher*> watchers_ ABSL_GUARDED_BY(&HealthProducer::mu_); 133 }; 134 135 // Handles a connectivity state change on the subchannel. 136 void OnConnectivityStateChange(grpc_connectivity_state state, 137 const absl::Status& status); 138 void Orphaned() override; 139 140 RefCountedPtr<Subchannel> subchannel_; 141 ConnectivityWatcher* connectivity_watcher_; 142 grpc_pollset_set* interested_parties_; 143 144 Mutex mu_; 145 absl::optional<grpc_connectivity_state> state_ ABSL_GUARDED_BY(&mu_); 146 absl::Status status_ ABSL_GUARDED_BY(&mu_); 147 RefCountedPtr<ConnectedSubchannel> connected_subchannel_ 148 ABSL_GUARDED_BY(&mu_); 149 std::map<std::string /*health_check_service_name*/, 150 OrphanablePtr<HealthChecker>> 151 health_checkers_ ABSL_GUARDED_BY(&mu_); 152 std::set<HealthWatcher*> non_health_watchers_ ABSL_GUARDED_BY(&mu_); 153 }; 154 155 // A data watcher that handles health checking. 156 class HealthWatcher final : public InternalSubchannelDataWatcherInterface { 157 public: HealthWatcher(std::shared_ptr<WorkSerializer> work_serializer,absl::optional<std::string> health_check_service_name,std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface> watcher)158 HealthWatcher( 159 std::shared_ptr<WorkSerializer> work_serializer, 160 absl::optional<std::string> health_check_service_name, 161 std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface> 162 watcher) 163 : work_serializer_(std::move(work_serializer)), 164 health_check_service_name_(std::move(health_check_service_name)), 165 watcher_(std::move(watcher)) {} 166 ~HealthWatcher() override; 167 type()168 UniqueTypeName type() const override { return HealthProducer::Type(); } 169 170 // When the client channel sees this wrapper, it will pass it the real 171 // subchannel to use. 172 void SetSubchannel(Subchannel* subchannel) override; 173 174 // For intercepting the watcher before it gets up to the real subchannel. 175 std::shared_ptr<SubchannelInterface::ConnectivityStateWatcherInterface> TakeWatcher()176 TakeWatcher() { 177 return std::move(watcher_); 178 } SetWatcher(std::shared_ptr<SubchannelInterface::ConnectivityStateWatcherInterface> watcher)179 void SetWatcher( 180 std::shared_ptr<SubchannelInterface::ConnectivityStateWatcherInterface> 181 watcher) { 182 watcher_ = std::move(watcher); 183 } 184 185 void Notify(grpc_connectivity_state state, absl::Status status); 186 interested_parties()187 grpc_pollset_set* interested_parties() const { 188 return watcher_->interested_parties(); 189 } 190 191 private: 192 std::shared_ptr<WorkSerializer> work_serializer_; 193 absl::optional<std::string> health_check_service_name_; 194 std::shared_ptr<SubchannelInterface::ConnectivityStateWatcherInterface> 195 watcher_; 196 RefCountedPtr<HealthProducer> producer_; 197 }; 198 199 } // namespace grpc_core 200 201 #endif // GRPC_SRC_CORE_LOAD_BALANCING_HEALTH_CHECK_CLIENT_INTERNAL_H 202