• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2019 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #ifndef GRPC_SRC_CORE_XDS_XDS_CLIENT_XDS_CLIENT_H
18 #define GRPC_SRC_CORE_XDS_XDS_CLIENT_XDS_CLIENT_H
19 
20 #include <grpc/event_engine/event_engine.h>
21 #include <grpc/support/port_platform.h>
22 
23 #include <map>
24 #include <memory>
25 #include <set>
26 #include <string>
27 #include <utility>
28 #include <vector>
29 
30 #include "absl/base/thread_annotations.h"
31 #include "absl/container/flat_hash_set.h"
32 #include "absl/status/status.h"
33 #include "absl/status/statusor.h"
34 #include "absl/strings/string_view.h"
35 #include "envoy/admin/v3/config_dump_shared.upb.h"
36 #include "envoy/service/status/v3/csds.upb.h"
37 #include "src/core/lib/debug/trace.h"
38 #include "src/core/util/dual_ref_counted.h"
39 #include "src/core/util/orphanable.h"
40 #include "src/core/util/ref_counted.h"
41 #include "src/core/util/ref_counted_ptr.h"
42 #include "src/core/util/sync.h"
43 #include "src/core/util/time.h"
44 #include "src/core/util/uri.h"
45 #include "src/core/util/work_serializer.h"
46 #include "src/core/xds/xds_client/xds_api.h"
47 #include "src/core/xds/xds_client/xds_bootstrap.h"
48 #include "src/core/xds/xds_client/xds_locality.h"
49 #include "src/core/xds/xds_client/xds_metrics.h"
50 #include "src/core/xds/xds_client/xds_resource_type.h"
51 #include "src/core/xds/xds_client/xds_transport.h"
52 #include "upb/reflection/def.hpp"
53 
54 namespace grpc_core {
55 
56 namespace testing {
57 class XdsClientTestPeer;  // Forward declaration; befriended by XdsClient.
58 }
59 
60 class XdsClient : public DualRefCounted<XdsClient> {
61  public:
62   // The authority reported for old-style (non-xdstp) resource names.
     // NOTE(review): presumably this is the value surfaced as the
     // xds_authority label in ResourceCountLabels -- confirm.
63   static constexpr absl::string_view kOldStyleAuthority = "#old";
64 
65   class ReadDelayHandle : public RefCounted<ReadDelayHandle> {
     // Ref-counted token that is passed to both ResourceWatcherInterface
     // callbacks.
     // NOTE(review): presumably further reads on the xDS stream are held back
     // until the last ref is released -- confirm against the implementation.
66    public:
       // Returns a null handle.
NoWait()67     static RefCountedPtr<ReadDelayHandle> NoWait() { return nullptr; }
68   };
69 
70   // Resource watcher interface.  Implemented by callers.
71   // Note: Most callers will not use this API directly but rather via a
72   // resource-type-specific wrapper API provided by the relevant
73   // XdsResourceType implementation.
     // Both callbacks are annotated as requiring work_serializer_ and both
     // receive a ReadDelayHandle ref alongside their payload.
74   class ResourceWatcherInterface : public RefCounted<ResourceWatcherInterface> {
75    public:
       // Delivers either the resource data (as a shared pointer to const
       // ResourceData) or a non-OK status for the watched resource.
76     virtual void OnGenericResourceChanged(
77         absl::StatusOr<std::shared_ptr<const XdsResourceType::ResourceData>>
78             resource,
79         RefCountedPtr<ReadDelayHandle> read_delay_handle)
80         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0;
       // Delivers an error status.
       // NOTE(review): presumably an "ambient" error leaves any previously
       // delivered resource usable -- confirm against the implementation.
81     virtual void OnAmbientError(
82         absl::Status status, RefCountedPtr<ReadDelayHandle> read_delay_handle)
83         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0;
84   };
85 
86   XdsClient(
87       std::shared_ptr<XdsBootstrap> bootstrap,  // Must be non-null.
88       RefCountedPtr<XdsTransportFactory> transport_factory,
89       std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine,
90       std::unique_ptr<XdsMetricsReporter> metrics_reporter,
91       std::string user_agent_name, std::string user_agent_version,
         // Defaults to 15 seconds when the caller does not supply a value.
92       Duration resource_request_timeout = Duration::Seconds(15));
93   ~XdsClient() override;
94 
95   // Start and cancel watch for a resource.
96   //
97   // The XdsClient takes ownership of the watcher, but the caller may
98   // keep a raw pointer to the watcher, which may be used only for
99   // cancellation.  (Because the caller does not own the watcher, the
100   // pointer must not be used for any other purpose.)
101   // If the caller is going to start a new watch after cancelling the
102   // old one, it should set delay_unsubscription to true.
103   //
104   // The resource type object must be a global singleton, since the first
105   // time the XdsClient sees a particular resource type object, it will
106   // store the pointer to that object as the authoritative implementation for
107   // its type URLs.  The resource type object must outlive the XdsClient object,
108   // and it is illegal to start a subsequent watch for the same type URLs using
109   // a different resource type object.
110   //
111   // Note: Most callers will not use this API directly but rather via a
112   // resource-type-specific wrapper API provided by the relevant
113   // XdsResourceType implementation.
114   void WatchResource(const XdsResourceType* type, absl::string_view name,
115                      RefCountedPtr<ResourceWatcherInterface> watcher);
      // CancelResourceWatch(): watcher is the raw pointer the caller retained
      // when it handed ownership to WatchResource().  delay_unsubscription
      // defaults to false.
      // NOTE(review): the name parameter is spelled listener_name here but
      // name in WatchResource(); presumably both denote the resource name --
      // confirm and consider aligning the spelling.
116   void CancelResourceWatch(const XdsResourceType* type,
117                            absl::string_view listener_name,
118                            ResourceWatcherInterface* watcher,
119                            bool delay_unsubscription = false);
120 
121   // Resets connection backoff state.  Declared virtual, so subclasses may
      // override it.
122   virtual void ResetBackoff();
123 
bootstrap()124   const XdsBootstrap& bootstrap() const {
125     return *bootstrap_;  // Safe to dereference: ctor asserts it is non-null.
126   }
127 
      // Returns a non-owning pointer to the transport factory held in
      // transport_factory_.
transport_factory()128   XdsTransportFactory* transport_factory() const {
129     return transport_factory_.get();
130   }
131 
      // Returns a non-owning pointer to the EventEngine held in engine_.
      // NOTE(review): unlike the two accessors above, this one is not
      // const-qualified.
engine()132   grpc_event_engine::experimental::EventEngine* engine() {
133     return engine_.get();
134   }
135 
136  protected:
137   void Orphaned() override;  // Overrides the DualRefCounted<XdsClient> hook.
138 
      // Exposes mu_ to subclasses so that they can hold it while calling the
      // methods below that are annotated as requiring mu_.
mu()139   Mutex* mu() ABSL_LOCK_RETURNED(&mu_) { return &mu_; }
140 
141   // Dumps the active xDS config to the provided
142   // envoy.service.status.v3.ClientConfig message.  Caller must hold mu_.
      // NOTE(review): string_pool presumably keeps alive strings that the upb
      // message refers to without copying -- confirm against the definition.
143   void DumpClientConfig(std::set<std::string>* string_pool, upb_Arena* arena,
144                         envoy_service_status_v3_ClientConfig* client_config)
145       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_);
146 
147   // Label set passed to the ReportResourceCounts() callback.  All fields
148   // are non-owning string views.
149   struct ResourceCountLabels {
150     absl::string_view xds_authority;
151     absl::string_view resource_type;
152     absl::string_view cache_state;
153   };
      // Invokes func once for each combination of labels to report the
      // resource count for those labels.  Caller must hold mu_.
154   void ReportResourceCounts(
155       absl::FunctionRef<void(const ResourceCountLabels&, uint64_t)> func)
156       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_);
157 
158   // Invokes func once for each xDS server to report whether the
159   // connection to that server is working.  The first callback argument is
      // the xDS server; the bool carries the "working" state.  Caller must
      // hold mu_.
160   void ReportServerConnections(
161       absl::FunctionRef<void(absl::string_view /*xds_server*/, bool)> func)
162       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_);
163 
164  private:
165   friend testing::XdsClientTestPeer;  // Gives the test peer private access.
166 
167   struct XdsResourceKey {
      // Identifies a resource within one authority and resource type; used as
      // the std::map key in AuthorityState::resource_map.
168     std::string id;
169     std::vector<URI::QueryParam> query_params;
170 
        // Strict ordering required by std::map: compares id first and falls
        // back to query_params only when the ids are equal.
171     bool operator<(const XdsResourceKey& other) const {
172       int c = id.compare(other.id);
173       if (c != 0) return c < 0;
174       return query_params < other.query_params;
175     }
176   };
177 
178   struct AuthorityState;  // Defined below; XdsChannel refers to it first.
179 
180   struct XdsResourceName {
      // A resource name split into its authority and its key; this is the
      // value type produced by ParseXdsResourceName().
181     std::string authority;
182     XdsResourceKey key;
183   };
184 
185   // Contains a channel to the xds server and all the data related to the
186   // channel.  Holds a weak ref to the xds client object.
187   class XdsChannel final : public DualRefCounted<XdsChannel> {
188    public:
        // Nested call types; only declared here.
189     template <typename T>
190     class RetryableCall;
191 
192     class AdsCall;
193 
        // server is stored by reference (see server_ below), so it must
        // outlive this object.
194     XdsChannel(WeakRefCountedPtr<XdsClient> xds_client,
195                const XdsBootstrap::XdsServer& server);
196     ~XdsChannel() override;
197 
        // Returns a non-owning pointer to the owning XdsClient.
xds_client()198     XdsClient* xds_client() const { return xds_client_.get(); }
199     AdsCall* ads_call() const;
200 
201     void ResetBackoff();
202 
203     // Returns non-OK if there has been an error since the last time the
204     // ADS stream saw a response.
status()205     const absl::Status& status() const { return status_; }
206 
        // Subscribe/unsubscribe for a single resource.  Caller must hold
        // XdsClient::mu_.
207     void SubscribeLocked(const XdsResourceType* type,
208                          const XdsResourceName& name)
209         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
210     void UnsubscribeLocked(const XdsResourceType* type,
211                            const XdsResourceName& name,
212                            bool delay_unsubscription)
213         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
214 
        // Returns the URI of the server this channel was created for.
server_uri()215     absl::string_view server_uri() const { return server_.server_uri(); }
216 
217    private:
218     class ConnectivityFailureWatcher;
219 
220     // Attempts to find a suitable Xds fallback server. Returns true if
221     // a connection to a suitable server has been established.
222     bool MaybeFallbackLocked(const std::string& authority,
223                              XdsClient::AuthorityState& authority_state)
224         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
225     void SetHealthyLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
226     void Orphaned() override;
227 
228     void OnConnectivityFailure(absl::Status status);
229 
230     // Enqueues error notifications to watchers.  Caller must drain
231     // XdsClient::work_serializer_ after releasing the lock.
232     void SetChannelStatusLocked(absl::Status status)
233         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
234 
235     // The owning xds client.
236     WeakRefCountedPtr<XdsClient> xds_client_;
237 
238     const XdsBootstrap::XdsServer& server_;  // Owned by bootstrap.
239 
240     RefCountedPtr<XdsTransportFactory::XdsTransport> transport_;
241     RefCountedPtr<XdsTransportFactory::XdsTransport::ConnectivityFailureWatcher>
242         failure_watcher_;
243 
244     bool shutting_down_ = false;
245 
246     // The retryable ADS call.
247     OrphanablePtr<RetryableCall<AdsCall>> ads_call_;
248 
249     // Stores the most recent accepted resource version for each resource type.
250     std::map<const XdsResourceType*, std::string /*version*/>
251         resource_type_version_map_;
252 
        // Value returned by status().
253     absl::Status status_;
254   };
255 
256   using WatcherSet =
      // Hash set of owning watcher refs.
      // NOTE(review): ResourceState::RemoveWatcher() erases by raw pointer, so
      // the hash/eq functors presumably support raw-pointer lookup -- confirm.
257       absl::flat_hash_set<RefCountedPtr<ResourceWatcherInterface>,
258                           RefCountedPtrHash<ResourceWatcherInterface>,
259                           RefCountedPtrEq<ResourceWatcherInterface>>;
260 
261   class ResourceState {
      // Cached state for one resource: its watchers, the latest data seen, and
      // the bookkeeping for the last successful and last failed update.
262    public:
263     // Resource status from the view of a xDS client, which tells the
264     // synchronization status between the xDS client and the xDS server.
265     enum ClientResourceStatus {
266       // Client requested this resource but hasn't received any update from
267       // management server. The client will not fail requests, but will queue
268       // them until update arrives or the client times out waiting for the
269       // resource.
270       REQUESTED = 1,
271       // This resource has been requested by the client but has either not been
272       // delivered by the server or was previously delivered by the server and
273       // then subsequently removed from resources provided by the server.
274       DOES_NOT_EXIST,
275       // Client received this resource and replied with ACK.
276       ACKED,
277       // Client received this resource and replied with NACK.
278       NACKED,
279     };
        // Compile-time checks that each enumerator has the same numeric value
        // as the corresponding envoy_admin_v3 constant.
280     static_assert(static_cast<ClientResourceStatus>(envoy_admin_v3_REQUESTED) ==
281                   ClientResourceStatus::REQUESTED);
282     static_assert(
283         static_cast<ClientResourceStatus>(envoy_admin_v3_DOES_NOT_EXIST) ==
284         ClientResourceStatus::DOES_NOT_EXIST);
285     static_assert(static_cast<ClientResourceStatus>(envoy_admin_v3_ACKED) ==
286                   ClientResourceStatus::ACKED);
287     static_assert(static_cast<ClientResourceStatus>(envoy_admin_v3_NACKED) ==
288                   ClientResourceStatus::NACKED);
289 
        // Takes ownership of the watcher ref and stores it in the set.
AddWatcher(RefCountedPtr<ResourceWatcherInterface> watcher)290     void AddWatcher(RefCountedPtr<ResourceWatcherInterface> watcher) {
291       watchers_.insert(std::move(watcher));
292     }
        // Removes the entry for the given raw watcher pointer.
RemoveWatcher(ResourceWatcherInterface * watcher)293     void RemoveWatcher(ResourceWatcherInterface* watcher) {
294       watchers_.erase(watcher);
295     }
HasWatchers()296     bool HasWatchers() const { return !watchers_.empty(); }
watchers()297     const WatcherSet& watchers() const { return watchers_; }
298 
        // State transitions; defined out of line.
        // NOTE(review): presumably these update client_status_ and the
        // matching version/time/details fields below -- confirm.
299     void SetAcked(std::shared_ptr<const XdsResourceType::ResourceData> resource,
300                   std::string serialized_proto, std::string version,
301                   Timestamp update_time);
302     void SetNacked(const std::string& version, const std::string& details,
303                    Timestamp update_time);
304     void SetDoesNotExist();
305 
set_ignored_deletion(bool value)306     void set_ignored_deletion(bool value) { ignored_deletion_ = value; }
ignored_deletion()307     bool ignored_deletion() const { return ignored_deletion_; }
308 
client_status()309     ClientResourceStatus client_status() const { return client_status_; }
310     absl::string_view CacheStateString() const;
311 
        // True once resource_ holds data.
HasResource()312     bool HasResource() const { return resource_ != nullptr; }
        // Returns a copy of the shared pointer (may be null).
resource()313     std::shared_ptr<const XdsResourceType::ResourceData> resource() const {
314       return resource_;
315     }
316 
        // Returns a view into failed_details_; valid only while this object is
        // alive and the field is not modified.
failed_details()317     absl::string_view failed_details() const { return failed_details_; }
318 
        // Populates entry for this resource; defined out of line.
319     void FillGenericXdsConfig(
320         upb_StringView type_url, upb_StringView resource_name, upb_Arena* arena,
321         envoy_service_status_v3_ClientConfig_GenericXdsConfig* entry) const;
322 
323    private:
324     WatcherSet watchers_;
325     // The latest data seen for the resource.
326     std::shared_ptr<const XdsResourceType::ResourceData> resource_;
327     // Cache state.  Starts out as REQUESTED.
328     ClientResourceStatus client_status_ = REQUESTED;
329     // The serialized bytes of the last successfully updated raw xDS resource.
330     std::string serialized_proto_;
331     // The timestamp when the resource was last successfully updated.
332     Timestamp update_time_;
333     // The last successfully updated version of the resource.
334     std::string version_;
335     // The rejected version string of the last failed update attempt.
336     std::string failed_version_;
337     // Details about the last failed update attempt.
338     std::string failed_details_;
339     // Timestamp of the last failed update attempt.
340     Timestamp failed_update_time_;
341     // If we've ignored deletion.
342     bool ignored_deletion_ = false;
343   };
344 
345   struct AuthorityState {
      // Per-authority state, stored by authority name in authority_state_map_.
      // Holds owning refs to the channels associated with the authority.
      // NOTE(review): presumably ordered with fallback servers after the
      // primary -- confirm.
346     std::vector<RefCountedPtr<XdsChannel>> xds_channels;
      // Resource cache, keyed first by resource type and then by resource key.
347     std::map<const XdsResourceType*, std::map<XdsResourceKey, ResourceState>>
348         resource_map;
349   };
350 
351   absl::Status AppendNodeToStatus(const absl::Status& status) const;
      // AppendNodeToStatus() NOTE(review): presumably returns a copy of status
      // annotated with the bootstrap node identity -- confirm.
352 
353   // Sends an OnGenericResourceChanged() notification to a specific set of
      // watchers.  The watcher set is taken by value.
354   void NotifyWatchersOnResourceChanged(
355       absl::StatusOr<std::shared_ptr<const XdsResourceType::ResourceData>>
356           resource,
357       WatcherSet watchers, RefCountedPtr<ReadDelayHandle> read_delay_handle);
358   // Sends an OnAmbientError() notification to a specific set of watchers.
      // The watcher set is taken by value.
359   void NotifyWatchersOnAmbientError(
360       absl::Status status, WatcherSet watchers,
361       RefCountedPtr<ReadDelayHandle> read_delay_handle);
362 
363   void MaybeRegisterResourceTypeLocked(const XdsResourceType* resource_type)
364       ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
      // MaybeRegisterResourceTypeLocked() NOTE(review): presumably records
      // resource_type in resource_types_ the first time it is seen -- confirm.
      // Caller must hold mu_.
365 
366   // Gets the type for resource_type, or null if the type is unknown.
      // Caller must hold mu_.
367   const XdsResourceType* GetResourceTypeLocked(absl::string_view resource_type)
368       ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
369 
      // NOTE(review): carries no lock annotation, unlike the two methods above
      // -- confirm whether callers are expected to hold mu_.
370   bool HasUncachedResources(const AuthorityState& authority_state);
371 
372   absl::StatusOr<XdsResourceName> ParseXdsResourceName(
373       absl::string_view name, const XdsResourceType* type);
      // ParseXdsResourceName() yields either the parsed authority/key pair or
      // a non-OK status.  ConstructFullXdsResourceName() below is static and
      // builds a name string from an authority, a resource type and a key.
374   static std::string ConstructFullXdsResourceName(
375       absl::string_view authority, absl::string_view resource_type,
376       const XdsResourceKey& key);
377 
      // Returns an owning ref to the channel for server.  Caller must hold mu_.
      // NOTE(review): reason is presumably used only for logging -- confirm.
378   RefCountedPtr<XdsChannel> GetOrCreateXdsChannelLocked(
379       const XdsBootstrap::XdsServer& server, const char* reason)
380       ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
381 
382   std::shared_ptr<XdsBootstrap> bootstrap_;
383   const std::string user_agent_name_;
384   const std::string user_agent_version_;
385   RefCountedPtr<XdsTransportFactory> transport_factory_;
386   const Duration request_timeout_;
387   const bool xds_federation_enabled_;
388   WorkSerializer work_serializer_;
389   std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine_;
390   std::unique_ptr<XdsMetricsReporter> metrics_reporter_;
391 
      // Guards every member below that is annotated ABSL_GUARDED_BY(mu_).
392   Mutex mu_;
393 
394   // Stores resource type objects seen by type URL.
      // NOTE(review): the keys are string views, so the storage they point at
      // (presumably owned by the resource type objects) must outlive this map
      // -- confirm.
395   std::map<absl::string_view /*resource_type*/, const XdsResourceType*>
396       resource_types_ ABSL_GUARDED_BY(mu_);
397   upb::DefPool def_pool_ ABSL_GUARDED_BY(mu_);
398 
399   // Map of existing xDS server channels.  The values are raw pointers;
      // owning refs are held elsewhere, e.g. in AuthorityState::xds_channels.
400   std::map<std::string /*XdsServer key*/, XdsChannel*> xds_channel_map_
401       ABSL_GUARDED_BY(mu_);
402 
403   std::map<std::string /*authority*/, AuthorityState> authority_state_map_
404       ABSL_GUARDED_BY(mu_);
405 
406   // Stores started watchers whose resource name was not parsed successfully,
407   // waiting to be cancelled or reset in Orphaned().
408   WatcherSet invalid_watchers_ ABSL_GUARDED_BY(mu_);
409 
410   bool shutting_down_ ABSL_GUARDED_BY(mu_) = false;
411 };
412 
413 }  // namespace grpc_core
414 
415 #endif  // GRPC_SRC_CORE_XDS_XDS_CLIENT_XDS_CLIENT_H
416