• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2019 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #ifndef GRPC_CORE_EXT_XDS_XDS_CLIENT_H
18 #define GRPC_CORE_EXT_XDS_XDS_CLIENT_H
19 
20 #include <grpc/support/port_platform.h>
21 
22 #include <set>
23 #include <vector>
24 
25 #include "absl/strings/string_view.h"
26 #include "absl/types/optional.h"
27 
28 #include "src/core/ext/xds/xds_api.h"
29 #include "src/core/ext/xds/xds_bootstrap.h"
30 #include "src/core/ext/xds/xds_client_stats.h"
31 #include "src/core/lib/channel/channelz.h"
32 #include "src/core/lib/gprpp/dual_ref_counted.h"
33 #include "src/core/lib/gprpp/memory.h"
34 #include "src/core/lib/gprpp/orphanable.h"
35 #include "src/core/lib/gprpp/ref_counted.h"
36 #include "src/core/lib/gprpp/ref_counted_ptr.h"
37 #include "src/core/lib/gprpp/sync.h"
38 
39 namespace grpc_core {
40 
41 extern TraceFlag grpc_xds_client_trace;  // Declaration only (extern); defined elsewhere.
42 extern TraceFlag grpc_xds_client_refcount_trace;  // Declaration only (extern); defined elsewhere.
43 
44 class XdsClient : public DualRefCounted<XdsClient> {  // xDS client: resource watches and load-report stats.
45  public:
46   // Listener (LDS) data watcher interface.  Implemented by callers.
47   class ListenerWatcherInterface {
48    public:
49     virtual ~ListenerWatcherInterface() = default;
50     virtual void OnListenerChanged(XdsApi::LdsUpdate listener) = 0;  // Update is passed by value.
51     virtual void OnError(grpc_error_handle error) = 0;
52     virtual void OnResourceDoesNotExist() = 0;
53   };
54 
55   // RouteConfiguration (RDS) data watcher interface.  Implemented by callers.
56   class RouteConfigWatcherInterface {
57    public:
58     virtual ~RouteConfigWatcherInterface() = default;
59     virtual void OnRouteConfigChanged(XdsApi::RdsUpdate route_config) = 0;  // Update is passed by value.
60     virtual void OnError(grpc_error_handle error) = 0;
61     virtual void OnResourceDoesNotExist() = 0;
62   };
63 
64   // Cluster (CDS) data watcher interface.  Implemented by callers.
65   class ClusterWatcherInterface {
66    public:
67     virtual ~ClusterWatcherInterface() = default;
68     virtual void OnClusterChanged(XdsApi::CdsUpdate cluster_data) = 0;  // Update is passed by value.
69     virtual void OnError(grpc_error_handle error) = 0;
70     virtual void OnResourceDoesNotExist() = 0;
71   };
72 
73   // Endpoint (EDS) data watcher interface.  Implemented by callers.
74   class EndpointWatcherInterface {
75    public:
76     virtual ~EndpointWatcherInterface() = default;
77     virtual void OnEndpointChanged(XdsApi::EdsUpdate update) = 0;  // Update is passed by value.
78     virtual void OnError(grpc_error_handle error) = 0;
79     virtual void OnResourceDoesNotExist() = 0;
80   };
81 
82   // Factory function to get or create the global XdsClient instance.
83   // If *error is not GRPC_ERROR_NONE upon return, then there was
84   // an error initializing the client.
85   static RefCountedPtr<XdsClient> GetOrCreate(const grpc_channel_args* args,
86                                               grpc_error_handle* error);
87 
88   // Most callers should not instantiate directly.  Use GetOrCreate() instead.
89   XdsClient(std::unique_ptr<XdsBootstrap> bootstrap,
90             const grpc_channel_args* args);
91   ~XdsClient() override;
92 
bootstrap()93   const XdsBootstrap& bootstrap() const {
94     // bootstrap_ is guaranteed to be non-null since XdsClient::GetOrCreate()
95     // would return a null object if bootstrap_ was null.
96     return *bootstrap_;
97   }
98 
certificate_provider_store()99   CertificateProviderStore& certificate_provider_store() {
100     return *certificate_provider_store_;  // Dereferenced without a null check here.
101   }
102 
interested_parties()103   grpc_pollset_set* interested_parties() const { return interested_parties_; }
104 
105   // TODO(roth): When we add federation, there will be multiple channels
106   // inside the XdsClient, and the set of channels may change over time,
107   // but not every channel may use every one of the child channels, so
108   // this API will need to change.  At minimum, we will need to hold a
109   // ref to the parent channelz node so that we can update its list of
110   // children as the set of xDS channels changes.  However, we may also
111   // want to make this a bit more selective such that only those
112   // channels on which a given parent channel is actually requesting
113   // resources will actually be marked as its children.
114   void AddChannelzLinkage(channelz::ChannelNode* parent_channelz_node);
115   void RemoveChannelzLinkage(channelz::ChannelNode* parent_channelz_node);
116 
117   void Orphan() override;
118 
119   // Start and cancel listener data watch for a listener.
120   // The XdsClient takes ownership of the watcher, but the caller may
121   // keep a raw pointer to the watcher, which may be used only for
122   // cancellation.  (Because the caller does not own the watcher, the
123   // pointer must not be used for any other purpose.)
124   // If the caller is going to start a new watch after cancelling the
125   // old one, it should pass delay_unsubscription=true (default: false).
126   void WatchListenerData(absl::string_view listener_name,
127                          std::unique_ptr<ListenerWatcherInterface> watcher);
128   void CancelListenerDataWatch(absl::string_view listener_name,
129                                ListenerWatcherInterface* watcher,
130                                bool delay_unsubscription = false);
131 
132   // Start and cancel route config data watch for a route configuration.
133   // The XdsClient takes ownership of the watcher, but the caller may
134   // keep a raw pointer to the watcher, which may be used only for
135   // cancellation.  (Because the caller does not own the watcher, the
136   // pointer must not be used for any other purpose.)
137   // If the caller is going to start a new watch after cancelling the
138   // old one, it should pass delay_unsubscription=true (default: false).
139   void WatchRouteConfigData(
140       absl::string_view route_config_name,
141       std::unique_ptr<RouteConfigWatcherInterface> watcher);
142   void CancelRouteConfigDataWatch(absl::string_view route_config_name,
143                                   RouteConfigWatcherInterface* watcher,
144                                   bool delay_unsubscription = false);
145 
146   // Start and cancel cluster data watch for a cluster.
147   // The XdsClient takes ownership of the watcher, but the caller may
148   // keep a raw pointer to the watcher, which may be used only for
149   // cancellation.  (Because the caller does not own the watcher, the
150   // pointer must not be used for any other purpose.)
151   // If the caller is going to start a new watch after cancelling the
152   // old one, it should pass delay_unsubscription=true (default: false).
153   void WatchClusterData(absl::string_view cluster_name,
154                         std::unique_ptr<ClusterWatcherInterface> watcher);
155   void CancelClusterDataWatch(absl::string_view cluster_name,
156                               ClusterWatcherInterface* watcher,
157                               bool delay_unsubscription = false);
158 
159   // Start and cancel endpoint data watch for an EDS service name.
160   // The XdsClient takes ownership of the watcher, but the caller may
161   // keep a raw pointer to the watcher, which may be used only for
162   // cancellation.  (Because the caller does not own the watcher, the
163   // pointer must not be used for any other purpose.)
164   // If the caller is going to start a new watch after cancelling the
165   // old one, it should pass delay_unsubscription=true (default: false).
166   void WatchEndpointData(absl::string_view eds_service_name,
167                          std::unique_ptr<EndpointWatcherInterface> watcher);
168   void CancelEndpointDataWatch(absl::string_view eds_service_name,
169                                EndpointWatcherInterface* watcher,
170                                bool delay_unsubscription = false);
171 
172   // Adds and removes drop stats for cluster_name and eds_service_name.
173   RefCountedPtr<XdsClusterDropStats> AddClusterDropStats(
174       absl::string_view lrs_server, absl::string_view cluster_name,
175       absl::string_view eds_service_name);
176   void RemoveClusterDropStats(absl::string_view /*lrs_server*/,
177                               absl::string_view cluster_name,
178                               absl::string_view eds_service_name,
179                               XdsClusterDropStats* cluster_drop_stats);
180 
181   // Adds and removes locality stats for cluster_name and eds_service_name
182   // for the specified locality.
183   RefCountedPtr<XdsClusterLocalityStats> AddClusterLocalityStats(
184       absl::string_view lrs_server, absl::string_view cluster_name,
185       absl::string_view eds_service_name,
186       RefCountedPtr<XdsLocalityName> locality);
187   void RemoveClusterLocalityStats(
188       absl::string_view /*lrs_server*/, absl::string_view cluster_name,
189       absl::string_view eds_service_name,
190       const RefCountedPtr<XdsLocalityName>& locality,
191       XdsClusterLocalityStats* cluster_locality_stats);
192 
193   // Resets connection backoff state.
194   void ResetBackoff();
195 
196   // Dumps the active xDS config in JSON format.
197   // Individual xDS resource is encoded as envoy.admin.v3.*ConfigDump. Returns
198   // envoy.service.status.v3.ClientConfig which also includes the config
199   // status (e.g., CLIENT_REQUESTED, CLIENT_ACKED, CLIENT_NACKED).
200   //
201   // Expected to be invoked by wrapper languages in their CSDS service
202   // implementation.
203   std::string DumpClientConfigBinary();  // NOTE(review): name says "Binary", comment says JSON -- confirm.
204 
205   // Helpers for encoding the XdsClient object in channel args.
206   grpc_arg MakeChannelArg() const;
207   static RefCountedPtr<XdsClient> GetFromChannelArgs(
208       const grpc_channel_args& args);
209 
210  private:
211   // Contains a channel to the xds server and all the data related to the
212   // channel.  Holds a ref to the xds client object.
213   //
214   // Currently, there is only one ChannelState object per XdsClient
215   // object, and it has essentially the same lifetime.  But in the
216   // future, when we add federation support, a single XdsClient may have
217   // multiple underlying channels to talk to different xDS servers.
218   class ChannelState : public InternallyRefCounted<ChannelState> {
219    public:
220     template <typename T>
221     class RetryableCall;
222 
223     class AdsCallState;
224     class LrsCallState;
225 
226     ChannelState(WeakRefCountedPtr<XdsClient> xds_client,
227                  const XdsBootstrap::XdsServer& server);
228     ~ChannelState() override;
229 
230     void Orphan() override;
231 
channel()232     grpc_channel* channel() const { return channel_; }
xds_client()233     XdsClient* xds_client() const { return xds_client_.get(); }
234     AdsCallState* ads_calld() const;
235     LrsCallState* lrs_calld() const;
236 
237     void MaybeStartLrsCall();
238     void StopLrsCall();
239 
240     bool HasAdsCall() const;
241     bool HasActiveAdsCall() const;
242 
243     void StartConnectivityWatchLocked();
244     void CancelConnectivityWatchLocked();
245     // The next two methods are annotated as requiring XdsClient::mu_.
246     void SubscribeLocked(const std::string& type_url, const std::string& name)
247         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
248     void UnsubscribeLocked(const std::string& type_url, const std::string& name,
249                            bool delay_unsubscription)
250         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);
251 
252    private:
253     class StateWatcher;
254 
255     // The owning xds client (held through a weak ref-counted pointer).
256     WeakRefCountedPtr<XdsClient> xds_client_;
257 
258     const XdsBootstrap::XdsServer& server_;  // Held by reference, not copied.
259 
260     // The channel and its status.
261     grpc_channel* channel_;
262     bool shutting_down_ = false;
263     StateWatcher* watcher_ = nullptr;
264 
265     // The retryable XDS calls.
266     OrphanablePtr<RetryableCall<AdsCallState>> ads_calld_;
267     OrphanablePtr<RetryableCall<LrsCallState>> lrs_calld_;
268   };
269 
270   struct ListenerState {
271     std::map<ListenerWatcherInterface*,
272              std::unique_ptr<ListenerWatcherInterface>>
273         watchers;  // Keyed by raw watcher pointer; the mapped values own the watchers.
274     // The latest data seen from LDS.
275     absl::optional<XdsApi::LdsUpdate> update;
276     XdsApi::ResourceMetadata meta;
277   };
278 
279   struct RouteConfigState {
280     std::map<RouteConfigWatcherInterface*,
281              std::unique_ptr<RouteConfigWatcherInterface>>
282         watchers;  // Keyed by raw watcher pointer; the mapped values own the watchers.
283     // The latest data seen from RDS.
284     absl::optional<XdsApi::RdsUpdate> update;
285     XdsApi::ResourceMetadata meta;
286   };
287 
288   struct ClusterState {
289     std::map<ClusterWatcherInterface*, std::unique_ptr<ClusterWatcherInterface>>
290         watchers;  // Keyed by raw watcher pointer; the mapped values own the watchers.
291     // The latest data seen from CDS.
292     absl::optional<XdsApi::CdsUpdate> update;
293     XdsApi::ResourceMetadata meta;
294   };
295 
296   struct EndpointState {
297     std::map<EndpointWatcherInterface*,
298              std::unique_ptr<EndpointWatcherInterface>>
299         watchers;  // Keyed by raw watcher pointer; the mapped values own the watchers.
300     // The latest data seen from EDS.
301     absl::optional<XdsApi::EdsUpdate> update;
302     XdsApi::ResourceMetadata meta;
303   };
304 
305   struct LoadReportState {
306     struct LocalityState {
307       XdsClusterLocalityStats* locality_stats = nullptr;
308       XdsClusterLocalityStats::Snapshot deleted_locality_stats;
309     };
310 
311     XdsClusterDropStats* drop_stats = nullptr;
312     XdsClusterDropStats::Snapshot deleted_drop_stats;
313     std::map<RefCountedPtr<XdsLocalityName>, LocalityState,
314              XdsLocalityName::Less>
315         locality_stats;
316     grpc_millis last_report_time = ExecCtx::Get()->Now();  // Default: ExecCtx time when constructed.
317   };
318 
319   // Sends an error notification to all watchers.
320   void NotifyOnErrorLocked(grpc_error_handle error)
321       ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
322   // Returns a load report snapshot map; annotated as requiring mu_.
323   XdsApi::ClusterLoadReportMap BuildLoadReportSnapshotLocked(
324       bool send_all_clusters, const std::set<std::string>& clusters)
325       ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
326   // Takes an update time and an ADS parse result; annotated as requiring mu_.
327   void UpdateResourceMetadataWithFailedParseResultLocked(
328       grpc_millis update_time, const XdsApi::AdsParseResult& result)
329       ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
330 
331   std::unique_ptr<XdsBootstrap> bootstrap_;
332   grpc_channel_args* args_;
333   const grpc_millis request_timeout_;
334   grpc_pollset_set* interested_parties_;
335   OrphanablePtr<CertificateProviderStore> certificate_provider_store_;
336   XdsApi api_;
337 
338   Mutex mu_;  // Protects the members annotated ABSL_GUARDED_BY(mu_) below.
339 
340   // The channel for communicating with the xds server.
341   OrphanablePtr<ChannelState> chand_ ABSL_GUARDED_BY(mu_);
342 
343   // One entry for each watched LDS resource.
344   std::map<std::string /*listener_name*/, ListenerState> listener_map_
345       ABSL_GUARDED_BY(mu_);
346   // One entry for each watched RDS resource.
347   std::map<std::string /*route_config_name*/, RouteConfigState>
348       route_config_map_ ABSL_GUARDED_BY(mu_);
349   // One entry for each watched CDS resource.
350   std::map<std::string /*cluster_name*/, ClusterState> cluster_map_
351       ABSL_GUARDED_BY(mu_);
352   // One entry for each watched EDS resource.
353   std::map<std::string /*eds_service_name*/, EndpointState> endpoint_map_
354       ABSL_GUARDED_BY(mu_);
355 
356   // Load report data, keyed by (cluster_name, eds_service_name).
357   std::map<
358       std::pair<std::string /*cluster_name*/, std::string /*eds_service_name*/>,
359       LoadReportState>
360       load_report_map_ ABSL_GUARDED_BY(mu_);
361 
362   // Stores the most recent accepted resource version for each resource type.
363   std::map<std::string /*type*/, std::string /*version*/> resource_version_map_
364       ABSL_GUARDED_BY(mu_);
365 
366   bool shutting_down_ ABSL_GUARDED_BY(mu_) = false;
367 };
368 
369 namespace internal {
370 void SetXdsChannelArgsForTest(grpc_channel_args* args);  // Test-only hook, per its name.
371 void UnsetGlobalXdsClientForTest();  // Test-only hook, per its name.
372 // Sets bootstrap config to be used when no env var is set.
373 // Does not take ownership of config.
374 void SetXdsFallbackBootstrapConfig(const char* config);
375 }  // namespace internal
376 
377 }  // namespace grpc_core
378 
379 #endif  // GRPC_CORE_EXT_XDS_XDS_CLIENT_H
380