//
// Copyright 2019 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

#ifndef GRPC_SRC_CORE_XDS_XDS_CLIENT_LRS_CLIENT_H
#define GRPC_SRC_CORE_XDS_XDS_CLIENT_LRS_CLIENT_H

#include <grpc/event_engine/event_engine.h>

#include <atomic>
#include <cstdint>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <utility>

#include "absl/base/thread_annotations.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/load_balancing/backend_metric_data.h"
#include "src/core/util/dual_ref_counted.h"
#include "src/core/util/orphanable.h"
#include "src/core/util/per_cpu.h"
#include "src/core/util/ref_counted.h"
#include "src/core/util/ref_counted_ptr.h"
#include "src/core/util/sync.h"
#include "src/core/util/time.h"
#include "src/core/util/uri.h"
#include "src/core/util/work_serializer.h"
#include "src/core/xds/xds_client/xds_api.h"
#include "src/core/xds/xds_client/xds_backend_metric_propagation.h"
#include "src/core/xds/xds_client/xds_bootstrap.h"
#include "src/core/xds/xds_client/xds_locality.h"
#include "src/core/xds/xds_client/xds_metrics.h"
#include "src/core/xds/xds_client/xds_resource_type.h"
#include "src/core/xds/xds_client/xds_transport.h"
#include "upb/reflection/def.hpp"

namespace grpc_core {

bool XdsOrcaLrsPropagationChangesEnabled(); 56 57 class LrsClient : public DualRefCounted<LrsClient> { 58 public: 59 // Drop stats for an xds cluster. 60 class ClusterDropStats final : public RefCounted<ClusterDropStats> { 61 public: 62 // The total number of requests dropped for any reason is the sum of 63 // uncategorized_drops, and dropped_requests map. 64 using CategorizedDropsMap = std::map<std::string /* category */, uint64_t>; 65 struct Snapshot { 66 uint64_t uncategorized_drops = 0; 67 // The number of requests dropped for the specific drop categories 68 // outlined in the drop_overloads field in the EDS response. 69 CategorizedDropsMap categorized_drops; 70 71 Snapshot& operator+=(const Snapshot& other) { 72 uncategorized_drops += other.uncategorized_drops; 73 for (const auto& p : other.categorized_drops) { 74 categorized_drops[p.first] += p.second; 75 } 76 return *this; 77 } 78 IsZeroSnapshot79 bool IsZero() const { 80 if (uncategorized_drops != 0) return false; 81 for (const auto& p : categorized_drops) { 82 if (p.second != 0) return false; 83 } 84 return true; 85 } 86 }; 87 88 ClusterDropStats(RefCountedPtr<LrsClient> lrs_client, 89 absl::string_view lrs_server, 90 absl::string_view cluster_name, 91 absl::string_view eds_service_name); 92 ~ClusterDropStats() override; 93 94 // Returns a snapshot of this instance and resets all the counters. 95 Snapshot GetSnapshotAndReset(); 96 97 void AddUncategorizedDrops(); 98 void AddCallDropped(const std::string& category); 99 100 private: 101 RefCountedPtr<LrsClient> lrs_client_; 102 absl::string_view lrs_server_; 103 absl::string_view cluster_name_; 104 absl::string_view eds_service_name_; 105 std::atomic<uint64_t> uncategorized_drops_{0}; 106 // Protects categorized_drops_. A mutex is necessary because the length of 107 // dropped_requests can be accessed by both the picker (from data plane 108 // mutex) and the load reporting thread (from the control plane combiner). 
109 Mutex mu_; 110 CategorizedDropsMap categorized_drops_ ABSL_GUARDED_BY(mu_); 111 }; 112 113 // Locality stats for an xds cluster. 114 class ClusterLocalityStats final : public RefCounted<ClusterLocalityStats> { 115 public: 116 struct BackendMetric { 117 uint64_t num_requests_finished_with_metric = 0; 118 double total_metric_value = 0; 119 120 BackendMetric() = default; 121 BackendMetricBackendMetric122 BackendMetric(uint64_t num_requests_finished, double value) 123 : num_requests_finished_with_metric(num_requests_finished), 124 total_metric_value(value) {} 125 BackendMetricBackendMetric126 BackendMetric(BackendMetric&& other) noexcept 127 : num_requests_finished_with_metric( 128 std::exchange(other.num_requests_finished_with_metric, 0)), 129 total_metric_value(std::exchange(other.total_metric_value, 0)) {} 130 131 BackendMetric& operator=(BackendMetric&& other) noexcept { 132 num_requests_finished_with_metric = 133 std::exchange(other.num_requests_finished_with_metric, 0); 134 total_metric_value = std::exchange(other.total_metric_value, 0); 135 return *this; 136 } 137 138 BackendMetric& operator+=(const BackendMetric& other) { 139 num_requests_finished_with_metric += 140 other.num_requests_finished_with_metric; 141 total_metric_value += other.total_metric_value; 142 return *this; 143 } 144 IsZeroBackendMetric145 bool IsZero() const { 146 return num_requests_finished_with_metric == 0 && 147 total_metric_value == 0; 148 } 149 }; 150 151 struct Snapshot { 152 uint64_t total_successful_requests = 0; 153 uint64_t total_requests_in_progress = 0; 154 uint64_t total_error_requests = 0; 155 uint64_t total_issued_requests = 0; 156 BackendMetric cpu_utilization; 157 BackendMetric mem_utilization; 158 BackendMetric application_utilization; 159 std::map<std::string, BackendMetric> backend_metrics; 160 161 Snapshot& operator+=(const Snapshot& other) { 162 total_successful_requests += other.total_successful_requests; 163 total_requests_in_progress += 
other.total_requests_in_progress; 164 total_error_requests += other.total_error_requests; 165 total_issued_requests += other.total_issued_requests; 166 cpu_utilization += other.cpu_utilization; 167 mem_utilization += other.mem_utilization; 168 application_utilization += other.application_utilization; 169 for (const auto& p : other.backend_metrics) { 170 backend_metrics[p.first] += p.second; 171 } 172 return *this; 173 } 174 IsZeroSnapshot175 bool IsZero() const { 176 if (total_successful_requests != 0 || total_requests_in_progress != 0 || 177 total_error_requests != 0 || total_issued_requests != 0 || 178 !cpu_utilization.IsZero() || !mem_utilization.IsZero() || 179 !application_utilization.IsZero()) { 180 return false; 181 } 182 for (const auto& p : backend_metrics) { 183 if (!p.second.IsZero()) return false; 184 } 185 return true; 186 } 187 }; 188 189 ClusterLocalityStats(RefCountedPtr<LrsClient> lrs_client, 190 absl::string_view lrs_server, 191 absl::string_view cluster_name, 192 absl::string_view eds_service_name, 193 RefCountedPtr<XdsLocalityName> name, 194 RefCountedPtr<const BackendMetricPropagation> 195 backend_metric_propagation); 196 ~ClusterLocalityStats() override; 197 198 // Returns a snapshot of this instance and resets all the counters. 
199 Snapshot GetSnapshotAndReset(); 200 201 void AddCallStarted(); 202 void AddCallFinished(const BackendMetricData* backend_metrics, 203 bool fail = false); 204 locality_name()205 XdsLocalityName* locality_name() const { return name_.get(); } 206 207 private: 208 struct Stats { 209 std::atomic<uint64_t> total_successful_requests{0}; 210 std::atomic<uint64_t> total_requests_in_progress{0}; 211 std::atomic<uint64_t> total_error_requests{0}; 212 std::atomic<uint64_t> total_issued_requests{0}; 213 214 Mutex backend_metrics_mu; 215 BackendMetric cpu_utilization ABSL_GUARDED_BY(backend_metrics_mu); 216 BackendMetric mem_utilization ABSL_GUARDED_BY(backend_metrics_mu); 217 BackendMetric application_utilization ABSL_GUARDED_BY(backend_metrics_mu); 218 std::map<std::string, BackendMetric> backend_metrics 219 ABSL_GUARDED_BY(backend_metrics_mu); 220 }; 221 222 RefCountedPtr<LrsClient> lrs_client_; 223 absl::string_view lrs_server_; 224 absl::string_view cluster_name_; 225 absl::string_view eds_service_name_; 226 RefCountedPtr<XdsLocalityName> name_; 227 RefCountedPtr<const BackendMetricPropagation> backend_metric_propagation_; 228 PerCpu<Stats> stats_{PerCpuOptions().SetMaxShards(32).SetCpusPerShard(4)}; 229 }; 230 231 LrsClient( 232 std::shared_ptr<XdsBootstrap> bootstrap, std::string user_agent_name, 233 std::string user_agent_version, 234 RefCountedPtr<XdsTransportFactory> transport_factory, 235 std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine); 236 ~LrsClient() override; 237 238 // Adds drop stats for cluster_name and eds_service_name. 239 RefCountedPtr<ClusterDropStats> AddClusterDropStats( 240 std::shared_ptr<const XdsBootstrap::XdsServer> lrs_server, 241 absl::string_view cluster_name, absl::string_view eds_service_name); 242 243 // Adds locality stats for cluster_name and eds_service_name for the 244 // specified locality with the specified backend metric propagation. 
245 RefCountedPtr<ClusterLocalityStats> AddClusterLocalityStats( 246 std::shared_ptr<const XdsBootstrap::XdsServer> lrs_server, 247 absl::string_view cluster_name, absl::string_view eds_service_name, 248 RefCountedPtr<XdsLocalityName> locality, 249 RefCountedPtr<const BackendMetricPropagation> backend_metric_propagation); 250 251 // Resets connection backoff state. 252 void ResetBackoff(); 253 transport_factory()254 XdsTransportFactory* transport_factory() const { 255 return transport_factory_.get(); 256 } 257 engine()258 grpc_event_engine::experimental::EventEngine* engine() { 259 return engine_.get(); 260 } 261 262 private: 263 // Contains a channel to the LRS server and all the data related to the 264 // channel. 265 class LrsChannel final : public DualRefCounted<LrsChannel> { 266 public: 267 template <typename T> 268 class RetryableCall; 269 270 class LrsCall; 271 272 LrsChannel(WeakRefCountedPtr<LrsClient> lrs_client, 273 std::shared_ptr<const XdsBootstrap::XdsServer> server); 274 ~LrsChannel() override; 275 lrs_client()276 LrsClient* lrs_client() const { return lrs_client_.get(); } 277 278 void ResetBackoff(); 279 280 void MaybeStartLrsCall(); 281 server_uri()282 absl::string_view server_uri() const { return server_->server_uri(); } 283 284 private: 285 void Orphaned() override; 286 287 void StopLrsCallLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&LrsClient::mu_); 288 289 // The owning LrsClient. 290 WeakRefCountedPtr<LrsClient> lrs_client_; 291 292 std::shared_ptr<const XdsBootstrap::XdsServer> server_; 293 294 RefCountedPtr<XdsTransportFactory::XdsTransport> transport_; 295 296 // The retryable LRS call. 
297 OrphanablePtr<RetryableCall<LrsCall>> lrs_call_; 298 }; 299 300 struct LoadReportState { 301 struct LocalityState { 302 std::map<RefCountedPtr<const BackendMetricPropagation>, 303 ClusterLocalityStats*, BackendMetricPropagation::Less> 304 propagation_stats; 305 ClusterLocalityStats::Snapshot deleted_locality_stats; 306 }; 307 308 ClusterDropStats* drop_stats = nullptr; 309 ClusterDropStats::Snapshot deleted_drop_stats; 310 std::map<RefCountedPtr<XdsLocalityName>, LocalityState, 311 XdsLocalityName::Less> 312 locality_stats; 313 Timestamp last_report_time = Timestamp::Now(); 314 }; 315 316 // Load report data. 317 using LoadReportMap = std::map< 318 std::pair<std::string /*cluster_name*/, std::string /*eds_service_name*/>, 319 LoadReportState>; 320 321 struct LoadReportServer { 322 RefCountedPtr<LrsChannel> lrs_channel; 323 LoadReportMap load_report_map; 324 }; 325 326 struct ClusterLoadReport { 327 ClusterDropStats::Snapshot dropped_requests; 328 std::map<RefCountedPtr<XdsLocalityName>, ClusterLocalityStats::Snapshot, 329 XdsLocalityName::Less> 330 locality_stats; 331 Duration load_report_interval; 332 }; 333 using ClusterLoadReportMap = std::map< 334 std::pair<std::string /*cluster_name*/, std::string /*eds_service_name*/>, 335 ClusterLoadReport>; 336 337 void Orphaned() override; 338 339 ClusterLoadReportMap BuildLoadReportSnapshotLocked( 340 const XdsBootstrap::XdsServer& lrs_server, bool send_all_clusters, 341 const std::set<std::string>& clusters) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); 342 343 RefCountedPtr<LrsChannel> GetOrCreateLrsChannelLocked( 344 std::shared_ptr<const XdsBootstrap::XdsServer> server, const char* reason) 345 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); 346 347 static bool LoadReportCountersAreZero(const ClusterLoadReportMap& snapshot); 348 349 void RemoveClusterDropStats(absl::string_view lrs_server, 350 absl::string_view cluster_name, 351 absl::string_view eds_service_name, 352 ClusterDropStats* cluster_drop_stats); 353 354 void 
RemoveClusterLocalityStats( 355 absl::string_view lrs_server, absl::string_view cluster_name, 356 absl::string_view eds_service_name, 357 const RefCountedPtr<XdsLocalityName>& locality, 358 const RefCountedPtr<const BackendMetricPropagation>& 359 backend_metric_propagation, 360 ClusterLocalityStats* cluster_locality_stats); 361 362 // Creates an initial LRS request. 363 std::string CreateLrsInitialRequest() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_); 364 365 // Creates an LRS request sending a client-side load report. 366 std::string CreateLrsRequest(ClusterLoadReportMap cluster_load_report_map) 367 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_); 368 369 // Parses the LRS response and populates send_all_clusters, 370 // cluster_names, and load_reporting_interval. 371 absl::Status ParseLrsResponse(absl::string_view encoded_response, 372 bool* send_all_clusters, 373 std::set<std::string>* cluster_names, 374 Duration* load_reporting_interval) 375 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_); 376 377 std::shared_ptr<XdsBootstrap> bootstrap_; 378 const std::string user_agent_name_; 379 const std::string user_agent_version_; 380 RefCountedPtr<XdsTransportFactory> transport_factory_; 381 std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine_; 382 383 Mutex mu_; 384 upb::DefPool def_pool_ ABSL_GUARDED_BY(mu_); 385 // Map of existing LRS channels. 386 std::map<std::string /*XdsServer key*/, LrsChannel*> lrs_channel_map_ 387 ABSL_GUARDED_BY(mu_); 388 std::map<std::string /*XdsServer key*/, LoadReportServer, std::less<>> 389 load_report_map_ ABSL_GUARDED_BY(mu_); 390 }; 391 392 } // namespace grpc_core 393 394 #endif // GRPC_SRC_CORE_XDS_XDS_CLIENT_LRS_CLIENT_H 395