1 // 2 // Copyright 2019 gRPC authors. 3 // 4 // Licensed under the Apache License, Version 2.0 (the "License"); 5 // you may not use this file except in compliance with the License. 6 // You may obtain a copy of the License at 7 // 8 // http://www.apache.org/licenses/LICENSE-2.0 9 // 10 // Unless required by applicable law or agreed to in writing, software 11 // distributed under the License is distributed on an "AS IS" BASIS, 12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 // See the License for the specific language governing permissions and 14 // limitations under the License. 15 // 16 17 #ifndef GRPC_SRC_CORE_XDS_XDS_CLIENT_XDS_CLIENT_H 18 #define GRPC_SRC_CORE_XDS_XDS_CLIENT_XDS_CLIENT_H 19 20 #include <grpc/event_engine/event_engine.h> 21 #include <grpc/support/port_platform.h> 22 23 #include <map> 24 #include <memory> 25 #include <set> 26 #include <string> 27 #include <utility> 28 #include <vector> 29 30 #include "absl/base/thread_annotations.h" 31 #include "absl/container/flat_hash_set.h" 32 #include "absl/status/status.h" 33 #include "absl/status/statusor.h" 34 #include "absl/strings/string_view.h" 35 #include "envoy/admin/v3/config_dump_shared.upb.h" 36 #include "envoy/service/status/v3/csds.upb.h" 37 #include "src/core/lib/debug/trace.h" 38 #include "src/core/util/dual_ref_counted.h" 39 #include "src/core/util/orphanable.h" 40 #include "src/core/util/ref_counted.h" 41 #include "src/core/util/ref_counted_ptr.h" 42 #include "src/core/util/sync.h" 43 #include "src/core/util/time.h" 44 #include "src/core/util/uri.h" 45 #include "src/core/util/work_serializer.h" 46 #include "src/core/xds/xds_client/xds_api.h" 47 #include "src/core/xds/xds_client/xds_bootstrap.h" 48 #include "src/core/xds/xds_client/xds_locality.h" 49 #include "src/core/xds/xds_client/xds_metrics.h" 50 #include "src/core/xds/xds_client/xds_resource_type.h" 51 #include "src/core/xds/xds_client/xds_transport.h" 52 #include "upb/reflection/def.hpp" 53 54 namespace grpc_core { 55 56 namespace testing { 57 class XdsClientTestPeer; 58 } 59 60 class XdsClient : public DualRefCounted<XdsClient> { 61 public: 62 // The authority reported for old-style (non-xdstp) resource names. 63 static constexpr absl::string_view kOldStyleAuthority = "#old"; 64 65 class ReadDelayHandle : public RefCounted<ReadDelayHandle> { 66 public: NoWait()67 static RefCountedPtr<ReadDelayHandle> NoWait() { return nullptr; } 68 }; 69 70 // Resource watcher interface. Implemented by callers. 71 // Note: Most callers will not use this API directly but rather via a 72 // resource-type-specific wrapper API provided by the relevant 73 // XdsResourceType implementation. 74 class ResourceWatcherInterface : public RefCounted<ResourceWatcherInterface> { 75 public: 76 virtual void OnGenericResourceChanged( 77 absl::StatusOr<std::shared_ptr<const XdsResourceType::ResourceData>> 78 resource, 79 RefCountedPtr<ReadDelayHandle> read_delay_handle) 80 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0; 81 virtual void OnAmbientError( 82 absl::Status status, RefCountedPtr<ReadDelayHandle> read_delay_handle) 83 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0; 84 }; 85 86 XdsClient( 87 std::shared_ptr<XdsBootstrap> bootstrap, 88 RefCountedPtr<XdsTransportFactory> transport_factory, 89 std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine, 90 std::unique_ptr<XdsMetricsReporter> metrics_reporter, 91 std::string user_agent_name, std::string user_agent_version, 92 Duration resource_request_timeout = Duration::Seconds(15)); 93 ~XdsClient() override; 94 95 // Start and cancel watch for a resource. 96 // 97 // The XdsClient takes ownership of the watcher, but the caller may 98 // keep a raw pointer to the watcher, which may be used only for 99 // cancellation. (Because the caller does not own the watcher, the 100 // pointer must not be used for any other purpose.) 101 // If the caller is going to start a new watch after cancelling the 102 // old one, it should set delay_unsubscription to true. 103 // 104 // The resource type object must be a global singleton, since the first 105 // time the XdsClient sees a particular resource type object, it will 106 // store the pointer to that object as the authoritative implementation for 107 // its type URLs. The resource type object must outlive the XdsClient object, 108 // and it is illegal to start a subsequent watch for the same type URLs using 109 // a different resource type object. 110 // 111 // Note: Most callers will not use this API directly but rather via a 112 // resource-type-specific wrapper API provided by the relevant 113 // XdsResourceType implementation. 114 void WatchResource(const XdsResourceType* type, absl::string_view name, 115 RefCountedPtr<ResourceWatcherInterface> watcher); 116 void CancelResourceWatch(const XdsResourceType* type, 117 absl::string_view listener_name, 118 ResourceWatcherInterface* watcher, 119 bool delay_unsubscription = false); 120 121 // Resets connection backoff state. 122 virtual void ResetBackoff(); 123 bootstrap()124 const XdsBootstrap& bootstrap() const { 125 return *bootstrap_; // ctor asserts that it is non-null 126 } 127 transport_factory()128 XdsTransportFactory* transport_factory() const { 129 return transport_factory_.get(); 130 } 131 engine()132 grpc_event_engine::experimental::EventEngine* engine() { 133 return engine_.get(); 134 } 135 136 protected: 137 void Orphaned() override; 138 mu()139 Mutex* mu() ABSL_LOCK_RETURNED(&mu_) { return &mu_; } 140 141 // Dumps the active xDS config to the provided 142 // envoy.service.status.v3.ClientConfig message. 143 void DumpClientConfig(std::set<std::string>* string_pool, upb_Arena* arena, 144 envoy_service_status_v3_ClientConfig* client_config) 145 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_); 146 147 // Invokes func once for each combination of labels to report the 148 // resource count for those labels. 149 struct ResourceCountLabels { 150 absl::string_view xds_authority; 151 absl::string_view resource_type; 152 absl::string_view cache_state; 153 }; 154 void ReportResourceCounts( 155 absl::FunctionRef<void(const ResourceCountLabels&, uint64_t)> func) 156 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_); 157 158 // Invokes func once for each xDS server to report whether the 159 // connection to that server is working. 160 void ReportServerConnections( 161 absl::FunctionRef<void(absl::string_view /*xds_server*/, bool)> func) 162 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_); 163 164 private: 165 friend testing::XdsClientTestPeer; 166 167 struct XdsResourceKey { 168 std::string id; 169 std::vector<URI::QueryParam> query_params; 170 171 bool operator<(const XdsResourceKey& other) const { 172 int c = id.compare(other.id); 173 if (c != 0) return c < 0; 174 return query_params < other.query_params; 175 } 176 }; 177 178 struct AuthorityState; 179 180 struct XdsResourceName { 181 std::string authority; 182 XdsResourceKey key; 183 }; 184 185 // Contains a channel to the xds server and all the data related to the 186 // channel. Holds a ref to the xds client object. 187 class XdsChannel final : public DualRefCounted<XdsChannel> { 188 public: 189 template <typename T> 190 class RetryableCall; 191 192 class AdsCall; 193 194 XdsChannel(WeakRefCountedPtr<XdsClient> xds_client, 195 const XdsBootstrap::XdsServer& server); 196 ~XdsChannel() override; 197 xds_client()198 XdsClient* xds_client() const { return xds_client_.get(); } 199 AdsCall* ads_call() const; 200 201 void ResetBackoff(); 202 203 // Returns non-OK if there has been an error since the last time the 204 // ADS stream saw a response. status()205 const absl::Status& status() const { return status_; } 206 207 void SubscribeLocked(const XdsResourceType* type, 208 const XdsResourceName& name) 209 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); 210 void UnsubscribeLocked(const XdsResourceType* type, 211 const XdsResourceName& name, 212 bool delay_unsubscription) 213 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); 214 server_uri()215 absl::string_view server_uri() const { return server_.server_uri(); } 216 217 private: 218 class ConnectivityFailureWatcher; 219 220 // Attempts to find a suitable Xds fallback server. Returns true if 221 // a connection to a suitable server had been established. 222 bool MaybeFallbackLocked(const std::string& authority, 223 XdsClient::AuthorityState& authority_state) 224 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); 225 void SetHealthyLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); 226 void Orphaned() override; 227 228 void OnConnectivityFailure(absl::Status status); 229 230 // Enqueues error notifications to watchers. Caller must drain 231 // XdsClient::work_serializer_ after releasing the lock. 232 void SetChannelStatusLocked(absl::Status status) 233 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_); 234 235 // The owning xds client. 236 WeakRefCountedPtr<XdsClient> xds_client_; 237 238 const XdsBootstrap::XdsServer& server_; // Owned by bootstrap. 239 240 RefCountedPtr<XdsTransportFactory::XdsTransport> transport_; 241 RefCountedPtr<XdsTransportFactory::XdsTransport::ConnectivityFailureWatcher> 242 failure_watcher_; 243 244 bool shutting_down_ = false; 245 246 // The retryable ADS and LRS calls. 247 OrphanablePtr<RetryableCall<AdsCall>> ads_call_; 248 249 // Stores the most recent accepted resource version for each resource type. 250 std::map<const XdsResourceType*, std::string /*version*/> 251 resource_type_version_map_; 252 253 absl::Status status_; 254 }; 255 256 using WatcherSet = 257 absl::flat_hash_set<RefCountedPtr<ResourceWatcherInterface>, 258 RefCountedPtrHash<ResourceWatcherInterface>, 259 RefCountedPtrEq<ResourceWatcherInterface>>; 260 261 class ResourceState { 262 public: 263 // Resource status from the view of a xDS client, which tells the 264 // synchronization status between the xDS client and the xDS server. 265 enum ClientResourceStatus { 266 // Client requested this resource but hasn't received any update from 267 // management server. The client will not fail requests, but will queue 268 // them until update arrives or the client times out waiting for the 269 // resource. 270 REQUESTED = 1, 271 // This resource has been requested by the client but has either not been 272 // delivered by the server or was previously delivered by the server and 273 // then subsequently removed from resources provided by the server. 274 DOES_NOT_EXIST, 275 // Client received this resource and replied with ACK. 276 ACKED, 277 // Client received this resource and replied with NACK. 278 NACKED, 279 }; 280 static_assert(static_cast<ClientResourceStatus>(envoy_admin_v3_REQUESTED) == 281 ClientResourceStatus::REQUESTED); 282 static_assert( 283 static_cast<ClientResourceStatus>(envoy_admin_v3_DOES_NOT_EXIST) == 284 ClientResourceStatus::DOES_NOT_EXIST); 285 static_assert(static_cast<ClientResourceStatus>(envoy_admin_v3_ACKED) == 286 ClientResourceStatus::ACKED); 287 static_assert(static_cast<ClientResourceStatus>(envoy_admin_v3_NACKED) == 288 ClientResourceStatus::NACKED); 289 AddWatcher(RefCountedPtr<ResourceWatcherInterface> watcher)290 void AddWatcher(RefCountedPtr<ResourceWatcherInterface> watcher) { 291 watchers_.insert(std::move(watcher)); 292 } RemoveWatcher(ResourceWatcherInterface * watcher)293 void RemoveWatcher(ResourceWatcherInterface* watcher) { 294 watchers_.erase(watcher); 295 } HasWatchers()296 bool HasWatchers() const { return !watchers_.empty(); } watchers()297 const WatcherSet& watchers() const { return watchers_; } 298 299 void SetAcked(std::shared_ptr<const XdsResourceType::ResourceData> resource, 300 std::string serialized_proto, std::string version, 301 Timestamp update_time); 302 void SetNacked(const std::string& version, const std::string& details, 303 Timestamp update_time); 304 void SetDoesNotExist(); 305 set_ignored_deletion(bool value)306 void set_ignored_deletion(bool value) { ignored_deletion_ = value; } ignored_deletion()307 bool ignored_deletion() const { return ignored_deletion_; } 308 client_status()309 ClientResourceStatus client_status() const { return client_status_; } 310 absl::string_view CacheStateString() const; 311 HasResource()312 bool HasResource() const { return resource_ != nullptr; } resource()313 std::shared_ptr<const XdsResourceType::ResourceData> resource() const { 314 return resource_; 315 } 316 failed_details()317 absl::string_view failed_details() const { return failed_details_; } 318 319 void FillGenericXdsConfig( 320 upb_StringView type_url, upb_StringView resource_name, upb_Arena* arena, 321 envoy_service_status_v3_ClientConfig_GenericXdsConfig* entry) const; 322 323 private: 324 WatcherSet watchers_; 325 // The latest data seen for the resource. 326 std::shared_ptr<const XdsResourceType::ResourceData> resource_; 327 // Cache state. 328 ClientResourceStatus client_status_ = REQUESTED; 329 // The serialized bytes of the last successfully updated raw xDS resource. 330 std::string serialized_proto_; 331 // The timestamp when the resource was last successfully updated. 332 Timestamp update_time_; 333 // The last successfully updated version of the resource. 334 std::string version_; 335 // The rejected version string of the last failed update attempt. 336 std::string failed_version_; 337 // Details about the last failed update attempt. 338 std::string failed_details_; 339 // Timestamp of the last failed update attempt. 340 Timestamp failed_update_time_; 341 // If we've ignored deletion. 342 bool ignored_deletion_ = false; 343 }; 344 345 struct AuthorityState { 346 std::vector<RefCountedPtr<XdsChannel>> xds_channels; 347 std::map<const XdsResourceType*, std::map<XdsResourceKey, ResourceState>> 348 resource_map; 349 }; 350 351 absl::Status AppendNodeToStatus(const absl::Status& status) const; 352 353 // Sends an OnResourceChanged() notification to a specific set of watchers. 354 void NotifyWatchersOnResourceChanged( 355 absl::StatusOr<std::shared_ptr<const XdsResourceType::ResourceData>> 356 resource, 357 WatcherSet watchers, RefCountedPtr<ReadDelayHandle> read_delay_handle); 358 // Sends an OnAmbientError() notification to a specific set of watchers. 359 void NotifyWatchersOnAmbientError( 360 absl::Status status, WatcherSet watchers, 361 RefCountedPtr<ReadDelayHandle> read_delay_handle); 362 363 void MaybeRegisterResourceTypeLocked(const XdsResourceType* resource_type) 364 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); 365 366 // Gets the type for resource_type, or null if the type is unknown. 367 const XdsResourceType* GetResourceTypeLocked(absl::string_view resource_type) 368 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); 369 370 bool HasUncachedResources(const AuthorityState& authority_state); 371 372 absl::StatusOr<XdsResourceName> ParseXdsResourceName( 373 absl::string_view name, const XdsResourceType* type); 374 static std::string ConstructFullXdsResourceName( 375 absl::string_view authority, absl::string_view resource_type, 376 const XdsResourceKey& key); 377 378 RefCountedPtr<XdsChannel> GetOrCreateXdsChannelLocked( 379 const XdsBootstrap::XdsServer& server, const char* reason) 380 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); 381 382 std::shared_ptr<XdsBootstrap> bootstrap_; 383 const std::string user_agent_name_; 384 const std::string user_agent_version_; 385 RefCountedPtr<XdsTransportFactory> transport_factory_; 386 const Duration request_timeout_; 387 const bool xds_federation_enabled_; 388 WorkSerializer work_serializer_; 389 std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine_; 390 std::unique_ptr<XdsMetricsReporter> metrics_reporter_; 391 392 Mutex mu_; 393 394 // Stores resource type objects seen by type URL. 395 std::map<absl::string_view /*resource_type*/, const XdsResourceType*> 396 resource_types_ ABSL_GUARDED_BY(mu_); 397 upb::DefPool def_pool_ ABSL_GUARDED_BY(mu_); 398 399 // Map of existing xDS server channels. 400 std::map<std::string /*XdsServer key*/, XdsChannel*> xds_channel_map_ 401 ABSL_GUARDED_BY(mu_); 402 403 std::map<std::string /*authority*/, AuthorityState> authority_state_map_ 404 ABSL_GUARDED_BY(mu_); 405 406 // Stores started watchers whose resource name was not parsed successfully, 407 // waiting to be cancelled or reset in Orphan(). 408 WatcherSet invalid_watchers_ ABSL_GUARDED_BY(mu_); 409 410 bool shutting_down_ ABSL_GUARDED_BY(mu_) = false; 411 }; 412 413 } // namespace grpc_core 414 415 #endif // GRPC_SRC_CORE_XDS_XDS_CLIENT_XDS_CLIENT_H 416