1 // 2 // Copyright 2019 gRPC authors. 3 // 4 // Licensed under the Apache License, Version 2.0 (the "License"); 5 // you may not use this file except in compliance with the License. 6 // You may obtain a copy of the License at 7 // 8 // http://www.apache.org/licenses/LICENSE-2.0 9 // 10 // Unless required by applicable law or agreed to in writing, software 11 // distributed under the License is distributed on an "AS IS" BASIS, 12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 // See the License for the specific language governing permissions and 14 // limitations under the License. 15 // 16 17 #ifndef GRPC_CORE_EXT_XDS_XDS_CLIENT_H 18 #define GRPC_CORE_EXT_XDS_XDS_CLIENT_H 19 20 #include <grpc/support/port_platform.h> 21 22 #include <set> 23 #include <vector> 24 25 #include "absl/strings/string_view.h" 26 #include "absl/types/optional.h" 27 28 #include "src/core/ext/xds/xds_api.h" 29 #include "src/core/ext/xds/xds_bootstrap.h" 30 #include "src/core/ext/xds/xds_client_stats.h" 31 #include "src/core/lib/channel/channelz.h" 32 #include "src/core/lib/gprpp/dual_ref_counted.h" 33 #include "src/core/lib/gprpp/memory.h" 34 #include "src/core/lib/gprpp/orphanable.h" 35 #include "src/core/lib/gprpp/ref_counted.h" 36 #include "src/core/lib/gprpp/ref_counted_ptr.h" 37 #include "src/core/lib/gprpp/sync.h" 38 39 namespace grpc_core { 40 41 extern TraceFlag grpc_xds_client_trace; 42 extern TraceFlag grpc_xds_client_refcount_trace; 43 44 class XdsClient : public DualRefCounted<XdsClient> { 45 public: 46 // Listener data watcher interface. Implemented by callers. 47 class ListenerWatcherInterface { 48 public: 49 virtual ~ListenerWatcherInterface() = default; 50 virtual void OnListenerChanged(XdsApi::LdsUpdate listener) = 0; 51 virtual void OnError(grpc_error_handle error) = 0; 52 virtual void OnResourceDoesNotExist() = 0; 53 }; 54 55 // RouteConfiguration data watcher interface. Implemented by callers. 56 class RouteConfigWatcherInterface { 57 public: 58 virtual ~RouteConfigWatcherInterface() = default; 59 virtual void OnRouteConfigChanged(XdsApi::RdsUpdate route_config) = 0; 60 virtual void OnError(grpc_error_handle error) = 0; 61 virtual void OnResourceDoesNotExist() = 0; 62 }; 63 64 // Cluster data watcher interface. Implemented by callers. 65 class ClusterWatcherInterface { 66 public: 67 virtual ~ClusterWatcherInterface() = default; 68 virtual void OnClusterChanged(XdsApi::CdsUpdate cluster_data) = 0; 69 virtual void OnError(grpc_error_handle error) = 0; 70 virtual void OnResourceDoesNotExist() = 0; 71 }; 72 73 // Endpoint data watcher interface. Implemented by callers. 74 class EndpointWatcherInterface { 75 public: 76 virtual ~EndpointWatcherInterface() = default; 77 virtual void OnEndpointChanged(XdsApi::EdsUpdate update) = 0; 78 virtual void OnError(grpc_error_handle error) = 0; 79 virtual void OnResourceDoesNotExist() = 0; 80 }; 81 82 // Factory function to get or create the global XdsClient instance. 83 // If *error is not GRPC_ERROR_NONE upon return, then there was 84 // an error initializing the client. 85 static RefCountedPtr<XdsClient> GetOrCreate(const grpc_channel_args* args, 86 grpc_error_handle* error); 87 88 // Most callers should not instantiate directly. Use GetOrCreate() instead. 89 XdsClient(std::unique_ptr<XdsBootstrap> bootstrap, 90 const grpc_channel_args* args); 91 ~XdsClient() override; 92 bootstrap()93 const XdsBootstrap& bootstrap() const { 94 // bootstrap_ is guaranteed to be non-null since XdsClient::GetOrCreate() 95 // would return a null object if bootstrap_ was null. 96 return *bootstrap_; 97 } 98 certificate_provider_store()99 CertificateProviderStore& certificate_provider_store() { 100 return *certificate_provider_store_; 101 } 102 interested_parties()103 grpc_pollset_set* interested_parties() const { return interested_parties_; } 104 105 // TODO(roth): When we add federation, there will be multiple channels 106 // inside the XdsClient, and the set of channels may change over time, 107 // but not every channel may use every one of the child channels, so 108 // this API will need to change. At minumum, we will need to hold a 109 // ref to the parent channelz node so that we can update its list of 110 // children as the set of xDS channels changes. However, we may also 111 // want to make this a bit more selective such that only those 112 // channels on which a given parent channel is actually requesting 113 // resources will actually be marked as its children. 114 void AddChannelzLinkage(channelz::ChannelNode* parent_channelz_node); 115 void RemoveChannelzLinkage(channelz::ChannelNode* parent_channelz_node); 116 117 void Orphan() override; 118 119 // Start and cancel listener data watch for a listener. 120 // The XdsClient takes ownership of the watcher, but the caller may 121 // keep a raw pointer to the watcher, which may be used only for 122 // cancellation. (Because the caller does not own the watcher, the 123 // pointer must not be used for any other purpose.) 124 // If the caller is going to start a new watch after cancelling the 125 // old one, it should set delay_unsubscription to true. 126 void WatchListenerData(absl::string_view listener_name, 127 std::unique_ptr<ListenerWatcherInterface> watcher); 128 void CancelListenerDataWatch(absl::string_view listener_name, 129 ListenerWatcherInterface* watcher, 130 bool delay_unsubscription = false); 131 132 // Start and cancel route config data watch for a listener. 133 // The XdsClient takes ownership of the watcher, but the caller may 134 // keep a raw pointer to the watcher, which may be used only for 135 // cancellation. (Because the caller does not own the watcher, the 136 // pointer must not be used for any other purpose.) 137 // If the caller is going to start a new watch after cancelling the 138 // old one, it should set delay_unsubscription to true. 139 void WatchRouteConfigData( 140 absl::string_view route_config_name, 141 std::unique_ptr<RouteConfigWatcherInterface> watcher); 142 void CancelRouteConfigDataWatch(absl::string_view route_config_name, 143 RouteConfigWatcherInterface* watcher, 144 bool delay_unsubscription = false); 145 146 // Start and cancel cluster data watch for a cluster. 147 // The XdsClient takes ownership of the watcher, but the caller may 148 // keep a raw pointer to the watcher, which may be used only for 149 // cancellation. (Because the caller does not own the watcher, the 150 // pointer must not be used for any other purpose.) 151 // If the caller is going to start a new watch after cancelling the 152 // old one, it should set delay_unsubscription to true. 153 void WatchClusterData(absl::string_view cluster_name, 154 std::unique_ptr<ClusterWatcherInterface> watcher); 155 void CancelClusterDataWatch(absl::string_view cluster_name, 156 ClusterWatcherInterface* watcher, 157 bool delay_unsubscription = false); 158 159 // Start and cancel endpoint data watch for a cluster. 160 // The XdsClient takes ownership of the watcher, but the caller may 161 // keep a raw pointer to the watcher, which may be used only for 162 // cancellation. (Because the caller does not own the watcher, the 163 // pointer must not be used for any other purpose.) 164 // If the caller is going to start a new watch after cancelling the 165 // old one, it should set delay_unsubscription to true. 166 void WatchEndpointData(absl::string_view eds_service_name, 167 std::unique_ptr<EndpointWatcherInterface> watcher); 168 void CancelEndpointDataWatch(absl::string_view eds_service_name, 169 EndpointWatcherInterface* watcher, 170 bool delay_unsubscription = false); 171 172 // Adds and removes drop stats for cluster_name and eds_service_name. 173 RefCountedPtr<XdsClusterDropStats> AddClusterDropStats( 174 absl::string_view lrs_server, absl::string_view cluster_name, 175 absl::string_view eds_service_name); 176 void RemoveClusterDropStats(absl::string_view /*lrs_server*/, 177 absl::string_view cluster_name, 178 absl::string_view eds_service_name, 179 XdsClusterDropStats* cluster_drop_stats); 180 181 // Adds and removes locality stats for cluster_name and eds_service_name 182 // for the specified locality. 183 RefCountedPtr<XdsClusterLocalityStats> AddClusterLocalityStats( 184 absl::string_view lrs_server, absl::string_view cluster_name, 185 absl::string_view eds_service_name, 186 RefCountedPtr<XdsLocalityName> locality); 187 void RemoveClusterLocalityStats( 188 absl::string_view /*lrs_server*/, absl::string_view cluster_name, 189 absl::string_view eds_service_name, 190 const RefCountedPtr<XdsLocalityName>& locality, 191 XdsClusterLocalityStats* cluster_locality_stats); 192 193 // Resets connection backoff state. 194 void ResetBackoff(); 195 196 // Dumps the active xDS config in JSON format. 197 // Individual xDS resource is encoded as envoy.admin.v3.*ConfigDump. Returns 198 // envoy.service.status.v3.ClientConfig which also includes the config 199 // status (e.g., CLIENT_REQUESTED, CLIENT_ACKED, CLIENT_NACKED). 200 // 201 // Expected to be invoked by wrapper languages in their CSDS service 202 // implementation. 203 std::string DumpClientConfigBinary(); 204 205 // Helpers for encoding the XdsClient object in channel args. 206 grpc_arg MakeChannelArg() const; 207 static RefCountedPtr<XdsClient> GetFromChannelArgs( 208 const grpc_channel_args& args); 209 210 private: 211 // Contains a channel to the xds server and all the data related to the 212 // channel. Holds a ref to the xds client object. 213 // 214 // Currently, there is only one ChannelState object per XdsClient 215 // object, and it has essentially the same lifetime. But in the 216 // future, when we add federation support, a single XdsClient may have 217 // multiple underlying channels to talk to different xDS servers. 218 class ChannelState : public InternallyRefCounted<ChannelState> { 219 public: 220 template <typename T> 221 class RetryableCall; 222 223 class AdsCallState; 224 class LrsCallState; 225 226 ChannelState(WeakRefCountedPtr<XdsClient> xds_client, 227 const XdsBootstrap::XdsServer& server); 228 ~ChannelState() override; 229 230 void Orphan() override; 231 channel()232 grpc_channel* channel() const { return channel_; } xds_client()233 XdsClient* xds_client() const { return xds_client_.get(); } 234 AdsCallState* ads_calld() const; 235 LrsCallState* lrs_calld() const; 236 237 void MaybeStartLrsCall(); 238 void StopLrsCall(); 239 240 bool HasAdsCall() const; 241 bool HasActiveAdsCall() const; 242 243 void StartConnectivityWatchLocked(); 244 void CancelConnectivityWatchLocked(); 245 246 void SubscribeLocked(const std::string& type_url, const std::string& name) 247 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); 248 void UnsubscribeLocked(const std::string& type_url, const std::string& name, 249 bool delay_unsubscription) 250 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); 251 252 private: 253 class StateWatcher; 254 255 // The owning xds client. 256 WeakRefCountedPtr<XdsClient> xds_client_; 257 258 const XdsBootstrap::XdsServer& server_; 259 260 // The channel and its status. 261 grpc_channel* channel_; 262 bool shutting_down_ = false; 263 StateWatcher* watcher_ = nullptr; 264 265 // The retryable XDS calls. 266 OrphanablePtr<RetryableCall<AdsCallState>> ads_calld_; 267 OrphanablePtr<RetryableCall<LrsCallState>> lrs_calld_; 268 }; 269 270 struct ListenerState { 271 std::map<ListenerWatcherInterface*, 272 std::unique_ptr<ListenerWatcherInterface>> 273 watchers; 274 // The latest data seen from LDS. 275 absl::optional<XdsApi::LdsUpdate> update; 276 XdsApi::ResourceMetadata meta; 277 }; 278 279 struct RouteConfigState { 280 std::map<RouteConfigWatcherInterface*, 281 std::unique_ptr<RouteConfigWatcherInterface>> 282 watchers; 283 // The latest data seen from RDS. 284 absl::optional<XdsApi::RdsUpdate> update; 285 XdsApi::ResourceMetadata meta; 286 }; 287 288 struct ClusterState { 289 std::map<ClusterWatcherInterface*, std::unique_ptr<ClusterWatcherInterface>> 290 watchers; 291 // The latest data seen from CDS. 292 absl::optional<XdsApi::CdsUpdate> update; 293 XdsApi::ResourceMetadata meta; 294 }; 295 296 struct EndpointState { 297 std::map<EndpointWatcherInterface*, 298 std::unique_ptr<EndpointWatcherInterface>> 299 watchers; 300 // The latest data seen from EDS. 301 absl::optional<XdsApi::EdsUpdate> update; 302 XdsApi::ResourceMetadata meta; 303 }; 304 305 struct LoadReportState { 306 struct LocalityState { 307 XdsClusterLocalityStats* locality_stats = nullptr; 308 XdsClusterLocalityStats::Snapshot deleted_locality_stats; 309 }; 310 311 XdsClusterDropStats* drop_stats = nullptr; 312 XdsClusterDropStats::Snapshot deleted_drop_stats; 313 std::map<RefCountedPtr<XdsLocalityName>, LocalityState, 314 XdsLocalityName::Less> 315 locality_stats; 316 grpc_millis last_report_time = ExecCtx::Get()->Now(); 317 }; 318 319 // Sends an error notification to all watchers. 320 void NotifyOnErrorLocked(grpc_error_handle error) 321 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); 322 323 XdsApi::ClusterLoadReportMap BuildLoadReportSnapshotLocked( 324 bool send_all_clusters, const std::set<std::string>& clusters) 325 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); 326 327 void UpdateResourceMetadataWithFailedParseResultLocked( 328 grpc_millis update_time, const XdsApi::AdsParseResult& result) 329 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); 330 331 std::unique_ptr<XdsBootstrap> bootstrap_; 332 grpc_channel_args* args_; 333 const grpc_millis request_timeout_; 334 grpc_pollset_set* interested_parties_; 335 OrphanablePtr<CertificateProviderStore> certificate_provider_store_; 336 XdsApi api_; 337 338 Mutex mu_; 339 340 // The channel for communicating with the xds server. 341 OrphanablePtr<ChannelState> chand_ ABSL_GUARDED_BY(mu_); 342 343 // One entry for each watched LDS resource. 344 std::map<std::string /*listener_name*/, ListenerState> listener_map_ 345 ABSL_GUARDED_BY(mu_); 346 // One entry for each watched RDS resource. 347 std::map<std::string /*route_config_name*/, RouteConfigState> 348 route_config_map_ ABSL_GUARDED_BY(mu_); 349 // One entry for each watched CDS resource. 350 std::map<std::string /*cluster_name*/, ClusterState> cluster_map_ 351 ABSL_GUARDED_BY(mu_); 352 // One entry for each watched EDS resource. 353 std::map<std::string /*eds_service_name*/, EndpointState> endpoint_map_ 354 ABSL_GUARDED_BY(mu_); 355 356 // Load report data. 357 std::map< 358 std::pair<std::string /*cluster_name*/, std::string /*eds_service_name*/>, 359 LoadReportState> 360 load_report_map_ ABSL_GUARDED_BY(mu_); 361 362 // Stores the most recent accepted resource version for each resource type. 363 std::map<std::string /*type*/, std::string /*version*/> resource_version_map_ 364 ABSL_GUARDED_BY(mu_); 365 366 bool shutting_down_ ABSL_GUARDED_BY(mu_) = false; 367 }; 368 369 namespace internal { 370 void SetXdsChannelArgsForTest(grpc_channel_args* args); 371 void UnsetGlobalXdsClientForTest(); 372 // Sets bootstrap config to be used when no env var is set. 373 // Does not take ownership of config. 374 void SetXdsFallbackBootstrapConfig(const char* config); 375 } // namespace internal 376 377 } // namespace grpc_core 378 379 #endif // GRPC_CORE_EXT_XDS_XDS_CLIENT_H 380