• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2019 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_XDS_XDS_CLIENT_H
18 #define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_XDS_XDS_CLIENT_H
19 
20 #include <grpc/support/port_platform.h>
21 
22 #include <set>
23 
24 #include "absl/strings/string_view.h"
25 #include "absl/types/optional.h"
26 
27 #include "src/core/ext/filters/client_channel/service_config.h"
28 #include "src/core/ext/filters/client_channel/xds/xds_api.h"
29 #include "src/core/ext/filters/client_channel/xds/xds_bootstrap.h"
30 #include "src/core/ext/filters/client_channel/xds/xds_client_stats.h"
31 #include "src/core/lib/gprpp/map.h"
32 #include "src/core/lib/gprpp/memory.h"
33 #include "src/core/lib/gprpp/orphanable.h"
34 #include "src/core/lib/gprpp/ref_counted.h"
35 #include "src/core/lib/gprpp/ref_counted_ptr.h"
36 #include "src/core/lib/iomgr/work_serializer.h"
37 
38 namespace grpc_core {
39 
40 extern TraceFlag xds_client_trace;
41 
42 class XdsClient : public InternallyRefCounted<XdsClient> {
43  public:
44   // Service config watcher interface.  Implemented by callers.
45   class ServiceConfigWatcherInterface {
46    public:
47     virtual ~ServiceConfigWatcherInterface() = default;
48 
49     virtual void OnServiceConfigChanged(
50         RefCountedPtr<ServiceConfig> service_config) = 0;
51 
52     virtual void OnError(grpc_error* error) = 0;
53 
54     virtual void OnResourceDoesNotExist() = 0;
55   };
56 
57   // Cluster data watcher interface.  Implemented by callers.
58   class ClusterWatcherInterface {
59    public:
60     virtual ~ClusterWatcherInterface() = default;
61 
62     virtual void OnClusterChanged(XdsApi::CdsUpdate cluster_data) = 0;
63 
64     virtual void OnError(grpc_error* error) = 0;
65 
66     virtual void OnResourceDoesNotExist() = 0;
67   };
68 
69   // Endpoint data watcher interface.  Implemented by callers.
70   class EndpointWatcherInterface {
71    public:
72     virtual ~EndpointWatcherInterface() = default;
73 
74     virtual void OnEndpointChanged(XdsApi::EdsUpdate update) = 0;
75 
76     virtual void OnError(grpc_error* error) = 0;
77 
78     virtual void OnResourceDoesNotExist() = 0;
79   };
80 
81   // If *error is not GRPC_ERROR_NONE after construction, then there was
82   // an error initializing the client.
83   XdsClient(std::shared_ptr<WorkSerializer> work_serializer,
84             grpc_pollset_set* interested_parties, absl::string_view server_name,
85             std::unique_ptr<ServiceConfigWatcherInterface> watcher,
86             const grpc_channel_args& channel_args, grpc_error** error);
87   ~XdsClient();
88 
89   void Orphan() override;
90 
91   // Start and cancel cluster data watch for a cluster.
92   // The XdsClient takes ownership of the watcher, but the caller may
93   // keep a raw pointer to the watcher, which may be used only for
94   // cancellation.  (Because the caller does not own the watcher, the
95   // pointer must not be used for any other purpose.)
96   // If the caller is going to start a new watch after cancelling the
97   // old one, it should set delay_unsubscription to true.
98   void WatchClusterData(absl::string_view cluster_name,
99                         std::unique_ptr<ClusterWatcherInterface> watcher);
100   void CancelClusterDataWatch(absl::string_view cluster_name,
101                               ClusterWatcherInterface* watcher,
102                               bool delay_unsubscription = false);
103 
104   // Start and cancel endpoint data watch for a cluster.
105   // The XdsClient takes ownership of the watcher, but the caller may
106   // keep a raw pointer to the watcher, which may be used only for
107   // cancellation.  (Because the caller does not own the watcher, the
108   // pointer must not be used for any other purpose.)
109   // If the caller is going to start a new watch after cancelling the
110   // old one, it should set delay_unsubscription to true.
111   void WatchEndpointData(absl::string_view eds_service_name,
112                          std::unique_ptr<EndpointWatcherInterface> watcher);
113   void CancelEndpointDataWatch(absl::string_view eds_service_name,
114                                EndpointWatcherInterface* watcher,
115                                bool delay_unsubscription = false);
116 
117   // Adds and removes drop stats for cluster_name and eds_service_name.
118   RefCountedPtr<XdsClusterDropStats> AddClusterDropStats(
119       absl::string_view lrs_server, absl::string_view cluster_name,
120       absl::string_view eds_service_name);
121   void RemoveClusterDropStats(absl::string_view /*lrs_server*/,
122                               absl::string_view cluster_name,
123                               absl::string_view eds_service_name,
124                               XdsClusterDropStats* cluster_drop_stats);
125 
126   // Adds and removes locality stats for cluster_name and eds_service_name
127   // for the specified locality.
128   RefCountedPtr<XdsClusterLocalityStats> AddClusterLocalityStats(
129       absl::string_view lrs_server, absl::string_view cluster_name,
130       absl::string_view eds_service_name,
131       RefCountedPtr<XdsLocalityName> locality);
132   void RemoveClusterLocalityStats(
133       absl::string_view /*lrs_server*/, absl::string_view cluster_name,
134       absl::string_view eds_service_name,
135       const RefCountedPtr<XdsLocalityName>& locality,
136       XdsClusterLocalityStats* cluster_locality_stats);
137 
138   // Resets connection backoff state.
139   void ResetBackoff();
140 
141   // Helpers for encoding the XdsClient object in channel args.
142   grpc_arg MakeChannelArg() const;
143   static RefCountedPtr<XdsClient> GetFromChannelArgs(
144       const grpc_channel_args& args);
145   static grpc_channel_args* RemoveFromChannelArgs(
146       const grpc_channel_args& args);
147 
148  private:
149   // Contains a channel to the xds server and all the data related to the
150   // channel.  Holds a ref to the xds client object.
151   // TODO(roth): This is separate from the XdsClient object because it was
152   // originally designed to be able to swap itself out in case the
153   // balancer name changed.  Now that the balancer name is going to be
154   // coming from the bootstrap file, we don't really need this level of
155   // indirection unless we decide to support watching the bootstrap file
156   // for changes.  At some point, if we decide that we're never going to
157   // need to do that, then we can eliminate this class and move its
158   // contents directly into the XdsClient class.
159   class ChannelState : public InternallyRefCounted<ChannelState> {
160    public:
161     template <typename T>
162     class RetryableCall;
163 
164     class AdsCallState;
165     class LrsCallState;
166 
167     ChannelState(RefCountedPtr<XdsClient> xds_client, grpc_channel* channel);
168     ~ChannelState();
169 
170     void Orphan() override;
171 
channel()172     grpc_channel* channel() const { return channel_; }
xds_client()173     XdsClient* xds_client() const { return xds_client_.get(); }
174     AdsCallState* ads_calld() const;
175     LrsCallState* lrs_calld() const;
176 
177     void MaybeStartLrsCall();
178     void StopLrsCall();
179 
180     bool HasActiveAdsCall() const;
181 
182     void StartConnectivityWatchLocked();
183     void CancelConnectivityWatchLocked();
184 
185     void Subscribe(const std::string& type_url, const std::string& name);
186     void Unsubscribe(const std::string& type_url, const std::string& name,
187                      bool delay_unsubscription);
188 
189    private:
190     class StateWatcher;
191 
192     // The owning xds client.
193     RefCountedPtr<XdsClient> xds_client_;
194 
195     // The channel and its status.
196     grpc_channel* channel_;
197     bool shutting_down_ = false;
198     StateWatcher* watcher_ = nullptr;
199 
200     // The retryable XDS calls.
201     OrphanablePtr<RetryableCall<AdsCallState>> ads_calld_;
202     OrphanablePtr<RetryableCall<LrsCallState>> lrs_calld_;
203   };
204 
205   struct ClusterState {
206     std::map<ClusterWatcherInterface*, std::unique_ptr<ClusterWatcherInterface>>
207         watchers;
208     // The latest data seen from CDS.
209     absl::optional<XdsApi::CdsUpdate> update;
210   };
211 
212   struct EndpointState {
213     std::map<EndpointWatcherInterface*,
214              std::unique_ptr<EndpointWatcherInterface>>
215         watchers;
216     // The latest data seen from EDS.
217     absl::optional<XdsApi::EdsUpdate> update;
218   };
219 
220   struct LoadReportState {
221     struct LocalityState {
222       std::set<XdsClusterLocalityStats*> locality_stats;
223       std::vector<XdsClusterLocalityStats::Snapshot> deleted_locality_stats;
224     };
225 
226     std::set<XdsClusterDropStats*> drop_stats;
227     XdsClusterDropStats::DroppedRequestsMap deleted_drop_stats;
228     std::map<RefCountedPtr<XdsLocalityName>, LocalityState,
229              XdsLocalityName::Less>
230         locality_stats;
231     grpc_millis last_report_time = ExecCtx::Get()->Now();
232   };
233 
234   // Sends an error notification to all watchers.
235   void NotifyOnError(grpc_error* error);
236 
237   // Returns the weighted_clusters action name to use from
238   // weighted_cluster_index_map_ for a WeightedClusters route action.
239   std::string WeightedClustersActionName(
240       const std::vector<XdsApi::RdsUpdate::RdsRoute::ClusterWeight>&
241           weighted_clusters);
242 
243   // Updates weighted_cluster_index_map_ that will
244   // determine the names of the WeightedCluster actions for the current update.
245   void UpdateWeightedClusterIndexMap(const XdsApi::RdsUpdate& rds_update);
246 
247   // Create the service config generated by the RdsUpdate.
248   grpc_error* CreateServiceConfig(const XdsApi::RdsUpdate& rds_update,
249                                   RefCountedPtr<ServiceConfig>* service_config);
250 
251   XdsApi::ClusterLoadReportMap BuildLoadReportSnapshot(
252       bool send_all_clusters, const std::set<std::string>& clusters);
253 
254   // Channel arg vtable functions.
255   static void* ChannelArgCopy(void* p);
256   static void ChannelArgDestroy(void* p);
257   static int ChannelArgCmp(void* p, void* q);
258 
259   static const grpc_arg_pointer_vtable kXdsClientVtable;
260 
261   const grpc_millis request_timeout_;
262 
263   std::shared_ptr<WorkSerializer> work_serializer_;
264   grpc_pollset_set* interested_parties_;
265 
266   std::unique_ptr<XdsBootstrap> bootstrap_;
267   XdsApi api_;
268 
269   const std::string server_name_;
270 
271   std::unique_ptr<ServiceConfigWatcherInterface> service_config_watcher_;
272 
273   // The channel for communicating with the xds server.
274   OrphanablePtr<ChannelState> chand_;
275 
276   absl::optional<XdsApi::LdsUpdate> lds_result_;
277   absl::optional<XdsApi::RdsUpdate> rds_result_;
278 
279   // One entry for each watched CDS resource.
280   std::map<std::string /*cluster_name*/, ClusterState> cluster_map_;
281   // One entry for each watched EDS resource.
282   std::map<std::string /*eds_service_name*/, EndpointState> endpoint_map_;
283   std::map<
284       std::pair<std::string /*cluster_name*/, std::string /*eds_service_name*/>,
285       LoadReportState>
286       load_report_map_;
287 
288   // 2-level map to store WeightedCluster action names.
289   // Top level map is keyed by cluster names without weight like a_b_c; bottom
290   // level map is keyed by cluster names + weights like a10_b50_c40.
291   struct ClusterNamesInfo {
292     uint64_t next_index = 0;
293     std::map<std::string /*cluster names + weights*/,
294              uint64_t /*policy index number*/>
295         cluster_weights_map;
296   };
297   using WeightedClusterIndexMap =
298       std::map<std::string /*cluster names*/, ClusterNamesInfo>;
299 
300   // Cache of action names for WeightedCluster targets in the current
301   // service config.
302   WeightedClusterIndexMap weighted_cluster_index_map_;
303 
304   bool shutting_down_ = false;
305 };
306 
307 }  // namespace grpc_core
308 
309 #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_XDS_XDS_CLIENT_H */
310