1 // 2 // Copyright 2019 gRPC authors. 3 // 4 // Licensed under the Apache License, Version 2.0 (the "License"); 5 // you may not use this file except in compliance with the License. 6 // You may obtain a copy of the License at 7 // 8 // http://www.apache.org/licenses/LICENSE-2.0 9 // 10 // Unless required by applicable law or agreed to in writing, software 11 // distributed under the License is distributed on an "AS IS" BASIS, 12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 // See the License for the specific language governing permissions and 14 // limitations under the License. 15 // 16 17 #ifndef GRPC_CORE_EXT_XDS_XDS_CLIENT_H 18 #define GRPC_CORE_EXT_XDS_XDS_CLIENT_H 19 20 #include <grpc/support/port_platform.h> 21 22 #include <set> 23 #include <vector> 24 25 #include "absl/strings/string_view.h" 26 #include "absl/types/optional.h" 27 28 #include "src/core/ext/xds/xds_api.h" 29 #include "src/core/ext/xds/xds_bootstrap.h" 30 #include "src/core/ext/xds/xds_client_stats.h" 31 #include "src/core/lib/channel/channelz.h" 32 #include "src/core/lib/gprpp/dual_ref_counted.h" 33 #include "src/core/lib/gprpp/memory.h" 34 #include "src/core/lib/gprpp/orphanable.h" 35 #include "src/core/lib/gprpp/ref_counted.h" 36 #include "src/core/lib/gprpp/ref_counted_ptr.h" 37 #include "src/core/lib/gprpp/sync.h" 38 39 namespace grpc_core { 40 41 extern TraceFlag grpc_xds_client_trace; 42 extern TraceFlag grpc_xds_client_refcount_trace; 43 44 class XdsClient : public DualRefCounted<XdsClient> { 45 public: 46 // Listener data watcher interface. Implemented by callers. 47 class ListenerWatcherInterface { 48 public: 49 virtual ~ListenerWatcherInterface() = default; 50 virtual void OnListenerChanged(XdsApi::LdsUpdate listener) = 0; 51 virtual void OnError(grpc_error* error) = 0; 52 virtual void OnResourceDoesNotExist() = 0; 53 }; 54 55 // RouteConfiguration data watcher interface. Implemented by callers. 56 class RouteConfigWatcherInterface { 57 public: 58 virtual ~RouteConfigWatcherInterface() = default; 59 virtual void OnRouteConfigChanged(XdsApi::RdsUpdate route_config) = 0; 60 virtual void OnError(grpc_error* error) = 0; 61 virtual void OnResourceDoesNotExist() = 0; 62 }; 63 64 // Cluster data watcher interface. Implemented by callers. 65 class ClusterWatcherInterface { 66 public: 67 virtual ~ClusterWatcherInterface() = default; 68 virtual void OnClusterChanged(XdsApi::CdsUpdate cluster_data) = 0; 69 virtual void OnError(grpc_error* error) = 0; 70 virtual void OnResourceDoesNotExist() = 0; 71 }; 72 73 // Endpoint data watcher interface. Implemented by callers. 74 class EndpointWatcherInterface { 75 public: 76 virtual ~EndpointWatcherInterface() = default; 77 virtual void OnEndpointChanged(XdsApi::EdsUpdate update) = 0; 78 virtual void OnError(grpc_error* error) = 0; 79 virtual void OnResourceDoesNotExist() = 0; 80 }; 81 82 // Factory function to get or create the global XdsClient instance. 83 // If *error is not GRPC_ERROR_NONE upon return, then there was 84 // an error initializing the client. 85 static RefCountedPtr<XdsClient> GetOrCreate(grpc_error** error); 86 87 // Callers should not instantiate directly. Use GetOrCreate() instead. 88 explicit XdsClient(grpc_error** error); 89 ~XdsClient() override; 90 certificate_provider_store()91 CertificateProviderStore& certificate_provider_store() { 92 return *certificate_provider_store_; 93 } 94 interested_parties()95 grpc_pollset_set* interested_parties() const { return interested_parties_; } 96 97 // TODO(roth): When we add federation, there will be multiple channels 98 // inside the XdsClient, and the set of channels may change over time, 99 // but not every channel may use every one of the child channels, so 100 // this API will need to change. At minumum, we will need to hold a 101 // ref to the parent channelz node so that we can update its list of 102 // children as the set of xDS channels changes. However, we may also 103 // want to make this a bit more selective such that only those 104 // channels on which a given parent channel is actually requesting 105 // resources will actually be marked as its children. 106 void AddChannelzLinkage(channelz::ChannelNode* parent_channelz_node); 107 void RemoveChannelzLinkage(channelz::ChannelNode* parent_channelz_node); 108 109 void Orphan() override; 110 111 // Start and cancel listener data watch for a listener. 112 // The XdsClient takes ownership of the watcher, but the caller may 113 // keep a raw pointer to the watcher, which may be used only for 114 // cancellation. (Because the caller does not own the watcher, the 115 // pointer must not be used for any other purpose.) 116 // If the caller is going to start a new watch after cancelling the 117 // old one, it should set delay_unsubscription to true. 118 void WatchListenerData(absl::string_view listener_name, 119 std::unique_ptr<ListenerWatcherInterface> watcher); 120 void CancelListenerDataWatch(absl::string_view listener_name, 121 ListenerWatcherInterface* watcher, 122 bool delay_unsubscription = false); 123 124 // Start and cancel route config data watch for a listener. 125 // The XdsClient takes ownership of the watcher, but the caller may 126 // keep a raw pointer to the watcher, which may be used only for 127 // cancellation. (Because the caller does not own the watcher, the 128 // pointer must not be used for any other purpose.) 129 // If the caller is going to start a new watch after cancelling the 130 // old one, it should set delay_unsubscription to true. 131 void WatchRouteConfigData( 132 absl::string_view route_config_name, 133 std::unique_ptr<RouteConfigWatcherInterface> watcher); 134 void CancelRouteConfigDataWatch(absl::string_view route_config_name, 135 RouteConfigWatcherInterface* watcher, 136 bool delay_unsubscription = false); 137 138 // Start and cancel cluster data watch for a cluster. 139 // The XdsClient takes ownership of the watcher, but the caller may 140 // keep a raw pointer to the watcher, which may be used only for 141 // cancellation. (Because the caller does not own the watcher, the 142 // pointer must not be used for any other purpose.) 143 // If the caller is going to start a new watch after cancelling the 144 // old one, it should set delay_unsubscription to true. 145 void WatchClusterData(absl::string_view cluster_name, 146 std::unique_ptr<ClusterWatcherInterface> watcher); 147 void CancelClusterDataWatch(absl::string_view cluster_name, 148 ClusterWatcherInterface* watcher, 149 bool delay_unsubscription = false); 150 151 // Start and cancel endpoint data watch for a cluster. 152 // The XdsClient takes ownership of the watcher, but the caller may 153 // keep a raw pointer to the watcher, which may be used only for 154 // cancellation. (Because the caller does not own the watcher, the 155 // pointer must not be used for any other purpose.) 156 // If the caller is going to start a new watch after cancelling the 157 // old one, it should set delay_unsubscription to true. 158 void WatchEndpointData(absl::string_view eds_service_name, 159 std::unique_ptr<EndpointWatcherInterface> watcher); 160 void CancelEndpointDataWatch(absl::string_view eds_service_name, 161 EndpointWatcherInterface* watcher, 162 bool delay_unsubscription = false); 163 164 // Adds and removes drop stats for cluster_name and eds_service_name. 165 RefCountedPtr<XdsClusterDropStats> AddClusterDropStats( 166 absl::string_view lrs_server, absl::string_view cluster_name, 167 absl::string_view eds_service_name); 168 void RemoveClusterDropStats(absl::string_view /*lrs_server*/, 169 absl::string_view cluster_name, 170 absl::string_view eds_service_name, 171 XdsClusterDropStats* cluster_drop_stats); 172 173 // Adds and removes locality stats for cluster_name and eds_service_name 174 // for the specified locality. 175 RefCountedPtr<XdsClusterLocalityStats> AddClusterLocalityStats( 176 absl::string_view lrs_server, absl::string_view cluster_name, 177 absl::string_view eds_service_name, 178 RefCountedPtr<XdsLocalityName> locality); 179 void RemoveClusterLocalityStats( 180 absl::string_view /*lrs_server*/, absl::string_view cluster_name, 181 absl::string_view eds_service_name, 182 const RefCountedPtr<XdsLocalityName>& locality, 183 XdsClusterLocalityStats* cluster_locality_stats); 184 185 // Resets connection backoff state. 186 void ResetBackoff(); 187 188 private: 189 // Contains a channel to the xds server and all the data related to the 190 // channel. Holds a ref to the xds client object. 191 // 192 // Currently, there is only one ChannelState object per XdsClient 193 // object, and it has essentially the same lifetime. But in the 194 // future, when we add federation support, a single XdsClient may have 195 // multiple underlying channels to talk to different xDS servers. 196 class ChannelState : public InternallyRefCounted<ChannelState> { 197 public: 198 template <typename T> 199 class RetryableCall; 200 201 class AdsCallState; 202 class LrsCallState; 203 204 ChannelState(WeakRefCountedPtr<XdsClient> xds_client, 205 const XdsBootstrap::XdsServer& server); 206 ~ChannelState() override; 207 208 void Orphan() override; 209 channel()210 grpc_channel* channel() const { return channel_; } xds_client()211 XdsClient* xds_client() const { return xds_client_.get(); } 212 AdsCallState* ads_calld() const; 213 LrsCallState* lrs_calld() const; 214 215 void MaybeStartLrsCall(); 216 void StopLrsCall(); 217 218 bool HasActiveAdsCall() const; 219 220 void StartConnectivityWatchLocked(); 221 void CancelConnectivityWatchLocked(); 222 223 void Subscribe(const std::string& type_url, const std::string& name); 224 void Unsubscribe(const std::string& type_url, const std::string& name, 225 bool delay_unsubscription); 226 227 private: 228 class StateWatcher; 229 230 // The owning xds client. 231 WeakRefCountedPtr<XdsClient> xds_client_; 232 233 const XdsBootstrap::XdsServer& server_; 234 235 // The channel and its status. 236 grpc_channel* channel_; 237 bool shutting_down_ = false; 238 StateWatcher* watcher_ = nullptr; 239 240 // The retryable XDS calls. 241 OrphanablePtr<RetryableCall<AdsCallState>> ads_calld_; 242 OrphanablePtr<RetryableCall<LrsCallState>> lrs_calld_; 243 }; 244 245 struct ListenerState { 246 std::map<ListenerWatcherInterface*, 247 std::unique_ptr<ListenerWatcherInterface>> 248 watchers; 249 // The latest data seen from LDS. 250 absl::optional<XdsApi::LdsUpdate> update; 251 }; 252 253 struct RouteConfigState { 254 std::map<RouteConfigWatcherInterface*, 255 std::unique_ptr<RouteConfigWatcherInterface>> 256 watchers; 257 // The latest data seen from RDS. 258 absl::optional<XdsApi::RdsUpdate> update; 259 }; 260 261 struct ClusterState { 262 std::map<ClusterWatcherInterface*, std::unique_ptr<ClusterWatcherInterface>> 263 watchers; 264 // The latest data seen from CDS. 265 absl::optional<XdsApi::CdsUpdate> update; 266 }; 267 268 struct EndpointState { 269 std::map<EndpointWatcherInterface*, 270 std::unique_ptr<EndpointWatcherInterface>> 271 watchers; 272 // The latest data seen from EDS. 273 absl::optional<XdsApi::EdsUpdate> update; 274 }; 275 276 struct LoadReportState { 277 struct LocalityState { 278 XdsClusterLocalityStats* locality_stats = nullptr; 279 XdsClusterLocalityStats::Snapshot deleted_locality_stats; 280 }; 281 282 XdsClusterDropStats* drop_stats = nullptr; 283 XdsClusterDropStats::Snapshot deleted_drop_stats; 284 std::map<RefCountedPtr<XdsLocalityName>, LocalityState, 285 XdsLocalityName::Less> 286 locality_stats; 287 grpc_millis last_report_time = ExecCtx::Get()->Now(); 288 }; 289 290 // Sends an error notification to all watchers. 291 void NotifyOnErrorLocked(grpc_error* error); 292 293 XdsApi::ClusterLoadReportMap BuildLoadReportSnapshotLocked( 294 bool send_all_clusters, const std::set<std::string>& clusters); 295 296 const grpc_millis request_timeout_; 297 grpc_pollset_set* interested_parties_; 298 std::unique_ptr<XdsBootstrap> bootstrap_; 299 OrphanablePtr<CertificateProviderStore> certificate_provider_store_; 300 XdsApi api_; 301 302 Mutex mu_; 303 304 // The channel for communicating with the xds server. 305 OrphanablePtr<ChannelState> chand_; 306 307 // One entry for each watched LDS resource. 308 std::map<std::string /*listener_name*/, ListenerState> listener_map_; 309 // One entry for each watched RDS resource. 310 std::map<std::string /*route_config_name*/, RouteConfigState> 311 route_config_map_; 312 // One entry for each watched CDS resource. 313 std::map<std::string /*cluster_name*/, ClusterState> cluster_map_; 314 // One entry for each watched EDS resource. 315 std::map<std::string /*eds_service_name*/, EndpointState> endpoint_map_; 316 317 // Load report data. 318 std::map< 319 std::pair<std::string /*cluster_name*/, std::string /*eds_service_name*/>, 320 LoadReportState> 321 load_report_map_; 322 323 // Stores the most recent accepted resource version for each resource type. 324 std::map<std::string /*type*/, std::string /*version*/> resource_version_map_; 325 326 bool shutting_down_ = false; 327 }; 328 329 namespace internal { 330 void SetXdsChannelArgsForTest(grpc_channel_args* args); 331 void UnsetGlobalXdsClientForTest(); 332 } // namespace internal 333 334 } // namespace grpc_core 335 336 #endif /* GRPC_CORE_EXT_XDS_XDS_CLIENT_H */ 337