• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2019 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #ifndef GRPC_SRC_CORE_XDS_XDS_CLIENT_LRS_CLIENT_H
18 #define GRPC_SRC_CORE_XDS_XDS_CLIENT_LRS_CLIENT_H
19 
20 #include <grpc/event_engine/event_engine.h>
21 
22 #include <atomic>
23 #include <map>
24 #include <memory>
25 #include <set>
26 #include <string>
27 #include <utility>
28 
29 #include "absl/base/thread_annotations.h"
30 #include "absl/status/status.h"
31 #include "absl/status/statusor.h"
32 #include "absl/strings/string_view.h"
33 #include "src/core/lib/debug/trace.h"
34 #include "src/core/load_balancing/backend_metric_data.h"
35 #include "src/core/util/dual_ref_counted.h"
36 #include "src/core/util/orphanable.h"
37 #include "src/core/util/per_cpu.h"
38 #include "src/core/util/ref_counted.h"
39 #include "src/core/util/ref_counted_ptr.h"
40 #include "src/core/util/sync.h"
41 #include "src/core/util/time.h"
42 #include "src/core/util/uri.h"
43 #include "src/core/util/work_serializer.h"
44 #include "src/core/xds/xds_client/xds_api.h"
45 #include "src/core/xds/xds_client/xds_backend_metric_propagation.h"
46 #include "src/core/xds/xds_client/xds_bootstrap.h"
47 #include "src/core/xds/xds_client/xds_locality.h"
48 #include "src/core/xds/xds_client/xds_metrics.h"
49 #include "src/core/xds/xds_client/xds_resource_type.h"
50 #include "src/core/xds/xds_client/xds_transport.h"
51 #include "upb/reflection/def.hpp"
52 
53 namespace grpc_core {
54 
// Declared here and defined elsewhere.
// NOTE(review): presumably reports whether the ORCA-to-LRS backend metric
// propagation changes are enabled -- confirm against the definition.
bool XdsOrcaLrsPropagationChangesEnabled();
56 
57 class LrsClient : public DualRefCounted<LrsClient> {
58  public:
59   // Drop stats for an xds cluster.
60   class ClusterDropStats final : public RefCounted<ClusterDropStats> {
61    public:
62     // The total number of requests dropped for any reason is the sum of
63     // uncategorized_drops, and dropped_requests map.
64     using CategorizedDropsMap = std::map<std::string /* category */, uint64_t>;
65     struct Snapshot {
66       uint64_t uncategorized_drops = 0;
67       // The number of requests dropped for the specific drop categories
68       // outlined in the drop_overloads field in the EDS response.
69       CategorizedDropsMap categorized_drops;
70 
71       Snapshot& operator+=(const Snapshot& other) {
72         uncategorized_drops += other.uncategorized_drops;
73         for (const auto& p : other.categorized_drops) {
74           categorized_drops[p.first] += p.second;
75         }
76         return *this;
77       }
78 
IsZeroSnapshot79       bool IsZero() const {
80         if (uncategorized_drops != 0) return false;
81         for (const auto& p : categorized_drops) {
82           if (p.second != 0) return false;
83         }
84         return true;
85       }
86     };
87 
88     ClusterDropStats(RefCountedPtr<LrsClient> lrs_client,
89                      absl::string_view lrs_server,
90                      absl::string_view cluster_name,
91                      absl::string_view eds_service_name);
92     ~ClusterDropStats() override;
93 
94     // Returns a snapshot of this instance and resets all the counters.
95     Snapshot GetSnapshotAndReset();
96 
97     void AddUncategorizedDrops();
98     void AddCallDropped(const std::string& category);
99 
100    private:
101     RefCountedPtr<LrsClient> lrs_client_;
102     absl::string_view lrs_server_;
103     absl::string_view cluster_name_;
104     absl::string_view eds_service_name_;
105     std::atomic<uint64_t> uncategorized_drops_{0};
106     // Protects categorized_drops_. A mutex is necessary because the length of
107     // dropped_requests can be accessed by both the picker (from data plane
108     // mutex) and the load reporting thread (from the control plane combiner).
109     Mutex mu_;
110     CategorizedDropsMap categorized_drops_ ABSL_GUARDED_BY(mu_);
111   };
112 
113   // Locality stats for an xds cluster.
114   class ClusterLocalityStats final : public RefCounted<ClusterLocalityStats> {
115    public:
116     struct BackendMetric {
117       uint64_t num_requests_finished_with_metric = 0;
118       double total_metric_value = 0;
119 
120       BackendMetric() = default;
121 
BackendMetricBackendMetric122       BackendMetric(uint64_t num_requests_finished, double value)
123           : num_requests_finished_with_metric(num_requests_finished),
124             total_metric_value(value) {}
125 
BackendMetricBackendMetric126       BackendMetric(BackendMetric&& other) noexcept
127           : num_requests_finished_with_metric(
128                 std::exchange(other.num_requests_finished_with_metric, 0)),
129             total_metric_value(std::exchange(other.total_metric_value, 0)) {}
130 
131       BackendMetric& operator=(BackendMetric&& other) noexcept {
132         num_requests_finished_with_metric =
133             std::exchange(other.num_requests_finished_with_metric, 0);
134         total_metric_value = std::exchange(other.total_metric_value, 0);
135         return *this;
136       }
137 
138       BackendMetric& operator+=(const BackendMetric& other) {
139         num_requests_finished_with_metric +=
140             other.num_requests_finished_with_metric;
141         total_metric_value += other.total_metric_value;
142         return *this;
143       }
144 
IsZeroBackendMetric145       bool IsZero() const {
146         return num_requests_finished_with_metric == 0 &&
147                total_metric_value == 0;
148       }
149     };
150 
151     struct Snapshot {
152       uint64_t total_successful_requests = 0;
153       uint64_t total_requests_in_progress = 0;
154       uint64_t total_error_requests = 0;
155       uint64_t total_issued_requests = 0;
156       BackendMetric cpu_utilization;
157       BackendMetric mem_utilization;
158       BackendMetric application_utilization;
159       std::map<std::string, BackendMetric> backend_metrics;
160 
161       Snapshot& operator+=(const Snapshot& other) {
162         total_successful_requests += other.total_successful_requests;
163         total_requests_in_progress += other.total_requests_in_progress;
164         total_error_requests += other.total_error_requests;
165         total_issued_requests += other.total_issued_requests;
166         cpu_utilization += other.cpu_utilization;
167         mem_utilization += other.mem_utilization;
168         application_utilization += other.application_utilization;
169         for (const auto& p : other.backend_metrics) {
170           backend_metrics[p.first] += p.second;
171         }
172         return *this;
173       }
174 
IsZeroSnapshot175       bool IsZero() const {
176         if (total_successful_requests != 0 || total_requests_in_progress != 0 ||
177             total_error_requests != 0 || total_issued_requests != 0 ||
178             !cpu_utilization.IsZero() || !mem_utilization.IsZero() ||
179             !application_utilization.IsZero()) {
180           return false;
181         }
182         for (const auto& p : backend_metrics) {
183           if (!p.second.IsZero()) return false;
184         }
185         return true;
186       }
187     };
188 
189     ClusterLocalityStats(RefCountedPtr<LrsClient> lrs_client,
190                          absl::string_view lrs_server,
191                          absl::string_view cluster_name,
192                          absl::string_view eds_service_name,
193                          RefCountedPtr<XdsLocalityName> name,
194                          RefCountedPtr<const BackendMetricPropagation>
195                              backend_metric_propagation);
196     ~ClusterLocalityStats() override;
197 
198     // Returns a snapshot of this instance and resets all the counters.
199     Snapshot GetSnapshotAndReset();
200 
201     void AddCallStarted();
202     void AddCallFinished(const BackendMetricData* backend_metrics,
203                          bool fail = false);
204 
locality_name()205     XdsLocalityName* locality_name() const { return name_.get(); }
206 
207    private:
208     struct Stats {
209       std::atomic<uint64_t> total_successful_requests{0};
210       std::atomic<uint64_t> total_requests_in_progress{0};
211       std::atomic<uint64_t> total_error_requests{0};
212       std::atomic<uint64_t> total_issued_requests{0};
213 
214       Mutex backend_metrics_mu;
215       BackendMetric cpu_utilization ABSL_GUARDED_BY(backend_metrics_mu);
216       BackendMetric mem_utilization ABSL_GUARDED_BY(backend_metrics_mu);
217       BackendMetric application_utilization ABSL_GUARDED_BY(backend_metrics_mu);
218       std::map<std::string, BackendMetric> backend_metrics
219           ABSL_GUARDED_BY(backend_metrics_mu);
220     };
221 
222     RefCountedPtr<LrsClient> lrs_client_;
223     absl::string_view lrs_server_;
224     absl::string_view cluster_name_;
225     absl::string_view eds_service_name_;
226     RefCountedPtr<XdsLocalityName> name_;
227     RefCountedPtr<const BackendMetricPropagation> backend_metric_propagation_;
228     PerCpu<Stats> stats_{PerCpuOptions().SetMaxShards(32).SetCpusPerShard(4)};
229   };
230 
231   LrsClient(
232       std::shared_ptr<XdsBootstrap> bootstrap, std::string user_agent_name,
233       std::string user_agent_version,
234       RefCountedPtr<XdsTransportFactory> transport_factory,
235       std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine);
236   ~LrsClient() override;
237 
238   // Adds drop stats for cluster_name and eds_service_name.
239   RefCountedPtr<ClusterDropStats> AddClusterDropStats(
240       std::shared_ptr<const XdsBootstrap::XdsServer> lrs_server,
241       absl::string_view cluster_name, absl::string_view eds_service_name);
242 
243   // Adds locality stats for cluster_name and eds_service_name for the
244   // specified locality with the specified backend metric propagation.
245   RefCountedPtr<ClusterLocalityStats> AddClusterLocalityStats(
246       std::shared_ptr<const XdsBootstrap::XdsServer> lrs_server,
247       absl::string_view cluster_name, absl::string_view eds_service_name,
248       RefCountedPtr<XdsLocalityName> locality,
249       RefCountedPtr<const BackendMetricPropagation> backend_metric_propagation);
250 
251   // Resets connection backoff state.
252   void ResetBackoff();
253 
transport_factory()254   XdsTransportFactory* transport_factory() const {
255     return transport_factory_.get();
256   }
257 
engine()258   grpc_event_engine::experimental::EventEngine* engine() {
259     return engine_.get();
260   }
261 
262  private:
263   // Contains a channel to the LRS server and all the data related to the
264   // channel.
265   class LrsChannel final : public DualRefCounted<LrsChannel> {
266    public:
267     template <typename T>
268     class RetryableCall;
269 
270     class LrsCall;
271 
272     LrsChannel(WeakRefCountedPtr<LrsClient> lrs_client,
273                std::shared_ptr<const XdsBootstrap::XdsServer> server);
274     ~LrsChannel() override;
275 
lrs_client()276     LrsClient* lrs_client() const { return lrs_client_.get(); }
277 
278     void ResetBackoff();
279 
280     void MaybeStartLrsCall();
281 
server_uri()282     absl::string_view server_uri() const { return server_->server_uri(); }
283 
284    private:
285     void Orphaned() override;
286 
287     void StopLrsCallLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&LrsClient::mu_);
288 
289     // The owning LrsClient.
290     WeakRefCountedPtr<LrsClient> lrs_client_;
291 
292     std::shared_ptr<const XdsBootstrap::XdsServer> server_;
293 
294     RefCountedPtr<XdsTransportFactory::XdsTransport> transport_;
295 
296     // The retryable LRS call.
297     OrphanablePtr<RetryableCall<LrsCall>> lrs_call_;
298   };
299 
300   struct LoadReportState {
301     struct LocalityState {
302       std::map<RefCountedPtr<const BackendMetricPropagation>,
303                ClusterLocalityStats*, BackendMetricPropagation::Less>
304           propagation_stats;
305       ClusterLocalityStats::Snapshot deleted_locality_stats;
306     };
307 
308     ClusterDropStats* drop_stats = nullptr;
309     ClusterDropStats::Snapshot deleted_drop_stats;
310     std::map<RefCountedPtr<XdsLocalityName>, LocalityState,
311              XdsLocalityName::Less>
312         locality_stats;
313     Timestamp last_report_time = Timestamp::Now();
314   };
315 
316   // Load report data.
317   using LoadReportMap = std::map<
318       std::pair<std::string /*cluster_name*/, std::string /*eds_service_name*/>,
319       LoadReportState>;
320 
321   struct LoadReportServer {
322     RefCountedPtr<LrsChannel> lrs_channel;
323     LoadReportMap load_report_map;
324   };
325 
326   struct ClusterLoadReport {
327     ClusterDropStats::Snapshot dropped_requests;
328     std::map<RefCountedPtr<XdsLocalityName>, ClusterLocalityStats::Snapshot,
329              XdsLocalityName::Less>
330         locality_stats;
331     Duration load_report_interval;
332   };
333   using ClusterLoadReportMap = std::map<
334       std::pair<std::string /*cluster_name*/, std::string /*eds_service_name*/>,
335       ClusterLoadReport>;
336 
337   void Orphaned() override;
338 
339   ClusterLoadReportMap BuildLoadReportSnapshotLocked(
340       const XdsBootstrap::XdsServer& lrs_server, bool send_all_clusters,
341       const std::set<std::string>& clusters) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
342 
343   RefCountedPtr<LrsChannel> GetOrCreateLrsChannelLocked(
344       std::shared_ptr<const XdsBootstrap::XdsServer> server, const char* reason)
345       ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
346 
347   static bool LoadReportCountersAreZero(const ClusterLoadReportMap& snapshot);
348 
349   void RemoveClusterDropStats(absl::string_view lrs_server,
350                               absl::string_view cluster_name,
351                               absl::string_view eds_service_name,
352                               ClusterDropStats* cluster_drop_stats);
353 
354   void RemoveClusterLocalityStats(
355       absl::string_view lrs_server, absl::string_view cluster_name,
356       absl::string_view eds_service_name,
357       const RefCountedPtr<XdsLocalityName>& locality,
358       const RefCountedPtr<const BackendMetricPropagation>&
359           backend_metric_propagation,
360       ClusterLocalityStats* cluster_locality_stats);
361 
362   // Creates an initial LRS request.
363   std::string CreateLrsInitialRequest() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_);
364 
365   // Creates an LRS request sending a client-side load report.
366   std::string CreateLrsRequest(ClusterLoadReportMap cluster_load_report_map)
367       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_);
368 
369   // Parses the LRS response and populates send_all_clusters,
370   // cluster_names, and load_reporting_interval.
371   absl::Status ParseLrsResponse(absl::string_view encoded_response,
372                                 bool* send_all_clusters,
373                                 std::set<std::string>* cluster_names,
374                                 Duration* load_reporting_interval)
375       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_);
376 
377   std::shared_ptr<XdsBootstrap> bootstrap_;
378   const std::string user_agent_name_;
379   const std::string user_agent_version_;
380   RefCountedPtr<XdsTransportFactory> transport_factory_;
381   std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine_;
382 
383   Mutex mu_;
384   upb::DefPool def_pool_ ABSL_GUARDED_BY(mu_);
385   // Map of existing LRS channels.
386   std::map<std::string /*XdsServer key*/, LrsChannel*> lrs_channel_map_
387       ABSL_GUARDED_BY(mu_);
388   std::map<std::string /*XdsServer key*/, LoadReportServer, std::less<>>
389       load_report_map_ ABSL_GUARDED_BY(mu_);
390 };
391 
392 }  // namespace grpc_core
393 
394 #endif  // GRPC_SRC_CORE_XDS_XDS_CLIENT_LRS_CLIENT_H
395