1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include "src/core/xds/xds_client/lrs_client.h"
18
19 #include <grpc/event_engine/event_engine.h>
20
21 #include <memory>
22 #include <set>
23 #include <string>
24 #include <vector>
25
26 #include "absl/cleanup/cleanup.h"
27 #include "absl/log/check.h"
28 #include "absl/log/log.h"
29 #include "absl/strings/string_view.h"
30 #include "absl/types/optional.h"
31 #include "envoy/config/core/v3/base.upb.h"
32 #include "envoy/config/endpoint/v3/load_report.upb.h"
33 #include "envoy/service/load_stats/v3/lrs.upb.h"
34 #include "envoy/service/load_stats/v3/lrs.upbdefs.h"
35 #include "google/protobuf/duration.upb.h"
36 #include "src/core/lib/debug/trace.h"
37 #include "src/core/lib/iomgr/exec_ctx.h"
38 #include "src/core/util/backoff.h"
39 #include "src/core/util/debug_location.h"
40 #include "src/core/util/env.h"
41 #include "src/core/util/orphanable.h"
42 #include "src/core/util/ref_counted_ptr.h"
43 #include "src/core/util/string.h"
44 #include "src/core/util/sync.h"
45 #include "src/core/util/upb_utils.h"
46 #include "src/core/util/uri.h"
47 #include "src/core/xds/xds_client/xds_api.h"
48 #include "src/core/xds/xds_client/xds_bootstrap.h"
49 #include "src/core/xds/xds_client/xds_locality.h"
50 #include "upb/base/string_view.h"
51 #include "upb/mem/arena.h"
52 #include "upb/reflection/def.h"
53 #include "upb/text/encode.h"
54
55 #define GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS 1
56 #define GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER 1.6
57 #define GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS 120
58 #define GRPC_XDS_RECONNECT_JITTER 0.2
59 #define GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS 1000
60
61 namespace grpc_core {
62
63 using ::grpc_event_engine::experimental::EventEngine;
64
65 // TODO(roth): Remove this once the feature passes interop tests.
XdsOrcaLrsPropagationChangesEnabled()66 bool XdsOrcaLrsPropagationChangesEnabled() {
67 auto value = GetEnv("GRPC_EXPERIMENTAL_XDS_ORCA_LRS_PROPAGATION");
68 if (!value.has_value()) return false;
69 bool parsed_value;
70 bool parse_succeeded = gpr_parse_bool_value(value->c_str(), &parsed_value);
71 return parse_succeeded && parsed_value;
72 }
73
namespace {

// Atomically swaps the counter at `from` to zero and returns the value it
// held just before the swap. Uses relaxed memory ordering.
uint64_t GetAndResetCounter(std::atomic<uint64_t>* from) {
  return std::atomic_exchange_explicit(from, uint64_t{0},
                                       std::memory_order_relaxed);
}

}  // namespace
81
82 //
83 // LrsClient::ClusterDropStats
84 //
85
ClusterDropStats(RefCountedPtr<LrsClient> lrs_client,absl::string_view lrs_server,absl::string_view cluster_name,absl::string_view eds_service_name)86 LrsClient::ClusterDropStats::ClusterDropStats(
87 RefCountedPtr<LrsClient> lrs_client, absl::string_view lrs_server,
88 absl::string_view cluster_name, absl::string_view eds_service_name)
89 : RefCounted(GRPC_TRACE_FLAG_ENABLED(xds_client_refcount)
90 ? "ClusterDropStats"
91 : nullptr),
92 lrs_client_(std::move(lrs_client)),
93 lrs_server_(lrs_server),
94 cluster_name_(cluster_name),
95 eds_service_name_(eds_service_name) {
96 GRPC_TRACE_LOG(xds_client, INFO)
97 << "[lrs_client " << lrs_client_.get() << "] created drop stats " << this
98 << " for {" << lrs_server_ << ", " << cluster_name_ << ", "
99 << eds_service_name_ << "}";
100 }
101
~ClusterDropStats()102 LrsClient::ClusterDropStats::~ClusterDropStats() {
103 GRPC_TRACE_LOG(xds_client, INFO)
104 << "[lrs_client " << lrs_client_.get() << "] destroying drop stats "
105 << this << " for {" << lrs_server_ << ", " << cluster_name_ << ", "
106 << eds_service_name_ << "}";
107 lrs_client_->RemoveClusterDropStats(lrs_server_, cluster_name_,
108 eds_service_name_, this);
109 lrs_client_.reset(DEBUG_LOCATION, "ClusterDropStats");
110 }
111
112 LrsClient::ClusterDropStats::Snapshot
GetSnapshotAndReset()113 LrsClient::ClusterDropStats::GetSnapshotAndReset() {
114 Snapshot snapshot;
115 snapshot.uncategorized_drops = GetAndResetCounter(&uncategorized_drops_);
116 MutexLock lock(&mu_);
117 snapshot.categorized_drops = std::move(categorized_drops_);
118 return snapshot;
119 }
120
AddUncategorizedDrops()121 void LrsClient::ClusterDropStats::AddUncategorizedDrops() {
122 uncategorized_drops_.fetch_add(1);
123 }
124
AddCallDropped(const std::string & category)125 void LrsClient::ClusterDropStats::AddCallDropped(const std::string& category) {
126 MutexLock lock(&mu_);
127 ++categorized_drops_[category];
128 }
129
130 //
131 // LrsClient::ClusterLocalityStats
132 //
133
ClusterLocalityStats(RefCountedPtr<LrsClient> lrs_client,absl::string_view lrs_server,absl::string_view cluster_name,absl::string_view eds_service_name,RefCountedPtr<XdsLocalityName> name,RefCountedPtr<const BackendMetricPropagation> backend_metric_propagation)134 LrsClient::ClusterLocalityStats::ClusterLocalityStats(
135 RefCountedPtr<LrsClient> lrs_client, absl::string_view lrs_server,
136 absl::string_view cluster_name, absl::string_view eds_service_name,
137 RefCountedPtr<XdsLocalityName> name,
138 RefCountedPtr<const BackendMetricPropagation> backend_metric_propagation)
139 : RefCounted(GRPC_TRACE_FLAG_ENABLED(xds_client_refcount)
140 ? "ClusterLocalityStats"
141 : nullptr),
142 lrs_client_(std::move(lrs_client)),
143 lrs_server_(lrs_server),
144 cluster_name_(cluster_name),
145 eds_service_name_(eds_service_name),
146 name_(std::move(name)),
147 backend_metric_propagation_(std::move(backend_metric_propagation)) {
148 GRPC_TRACE_LOG(xds_client, INFO)
149 << "[lrs_client " << lrs_client_.get() << "] created locality stats "
150 << this << " for {" << lrs_server_ << ", " << cluster_name_ << ", "
151 << eds_service_name_ << ", "
152 << (name_ == nullptr ? "<none>" : name_->human_readable_string().c_str())
153 << ", propagation=" << backend_metric_propagation_->AsString() << "}";
154 }
155
~ClusterLocalityStats()156 LrsClient::ClusterLocalityStats::~ClusterLocalityStats() {
157 GRPC_TRACE_LOG(xds_client, INFO)
158 << "[lrs_client " << lrs_client_.get() << "] destroying locality stats "
159 << this << " for {" << lrs_server_ << ", " << cluster_name_ << ", "
160 << eds_service_name_ << ", "
161 << (name_ == nullptr ? "<none>" : name_->human_readable_string().c_str())
162 << ", propagation=" << backend_metric_propagation_->AsString() << "}";
163 lrs_client_->RemoveClusterLocalityStats(lrs_server_, cluster_name_,
164 eds_service_name_, name_,
165 backend_metric_propagation_, this);
166 lrs_client_.reset(DEBUG_LOCATION, "ClusterLocalityStats");
167 }
168
169 LrsClient::ClusterLocalityStats::Snapshot
GetSnapshotAndReset()170 LrsClient::ClusterLocalityStats::GetSnapshotAndReset() {
171 Snapshot snapshot;
172 for (auto& percpu_stats : stats_) {
173 Snapshot percpu_snapshot = {
174 GetAndResetCounter(&percpu_stats.total_successful_requests),
175 // Don't reset total_requests_in_progress because it's
176 // not related to a single reporting interval.
177 percpu_stats.total_requests_in_progress.load(std::memory_order_relaxed),
178 GetAndResetCounter(&percpu_stats.total_error_requests),
179 GetAndResetCounter(&percpu_stats.total_issued_requests),
180 {},
181 {},
182 {},
183 {}};
184 {
185 MutexLock lock(&percpu_stats.backend_metrics_mu);
186 percpu_snapshot.cpu_utilization = std::move(percpu_stats.cpu_utilization);
187 percpu_snapshot.mem_utilization = std::move(percpu_stats.mem_utilization);
188 percpu_snapshot.application_utilization =
189 std::move(percpu_stats.application_utilization);
190 percpu_snapshot.backend_metrics = std::move(percpu_stats.backend_metrics);
191 }
192 snapshot += percpu_snapshot;
193 }
194 return snapshot;
195 }
196
AddCallStarted()197 void LrsClient::ClusterLocalityStats::AddCallStarted() {
198 Stats& stats = stats_.this_cpu();
199 stats.total_issued_requests.fetch_add(1, std::memory_order_relaxed);
200 stats.total_requests_in_progress.fetch_add(1, std::memory_order_relaxed);
201 }
202
AddCallFinished(const BackendMetricData * backend_metrics,bool fail)203 void LrsClient::ClusterLocalityStats::AddCallFinished(
204 const BackendMetricData* backend_metrics, bool fail) {
205 Stats& stats = stats_.this_cpu();
206 std::atomic<uint64_t>& to_increment =
207 fail ? stats.total_error_requests : stats.total_successful_requests;
208 to_increment.fetch_add(1, std::memory_order_relaxed);
209 stats.total_requests_in_progress.fetch_add(-1, std::memory_order_acq_rel);
210 if (backend_metrics == nullptr) return;
211 MutexLock lock(&stats.backend_metrics_mu);
212 if (!XdsOrcaLrsPropagationChangesEnabled()) {
213 for (const auto& m : backend_metrics->named_metrics) {
214 stats.backend_metrics[std::string(m.first)] += BackendMetric(1, m.second);
215 }
216 return;
217 }
218 if (backend_metric_propagation_->propagation_bits &
219 BackendMetricPropagation::kCpuUtilization) {
220 stats.cpu_utilization += BackendMetric(1, backend_metrics->cpu_utilization);
221 }
222 if (backend_metric_propagation_->propagation_bits &
223 BackendMetricPropagation::kMemUtilization) {
224 stats.mem_utilization += BackendMetric(1, backend_metrics->mem_utilization);
225 }
226 if (backend_metric_propagation_->propagation_bits &
227 BackendMetricPropagation::kApplicationUtilization) {
228 stats.application_utilization +=
229 BackendMetric(1, backend_metrics->application_utilization);
230 }
231 if (backend_metric_propagation_->propagation_bits &
232 BackendMetricPropagation::kNamedMetricsAll ||
233 !backend_metric_propagation_->named_metric_keys.empty()) {
234 for (const auto& m : backend_metrics->named_metrics) {
235 if (backend_metric_propagation_->propagation_bits &
236 BackendMetricPropagation::kNamedMetricsAll ||
237 backend_metric_propagation_->named_metric_keys.contains(m.first)) {
238 stats.backend_metrics[absl::StrCat("named_metrics.", m.first)] +=
239 BackendMetric(1, m.second);
240 }
241 }
242 }
243 }
244
245 //
246 // Internal class declarations
247 //
248
// A call wrapper that can restart a call upon failure.
// The template parameter is the kind of wrapped call (LrsCall in this file).
// When the wrapped call finishes, a new one is started after a backoff
// delay. The backoff is reset first if the finished call saw a response.
// TODO(roth): This is basically the same code as in XdsClient, and
// probably very similar to many other places in the codebase.
// Consider refactoring this into a common utility library somehow.
template <typename T>
class LrsClient::LrsChannel::RetryableCall final
    : public InternallyRefCounted<RetryableCall<T>> {
 public:
  // Starts the first call immediately.
  explicit RetryableCall(WeakRefCountedPtr<LrsChannel> lrs_channel);

  // Stops retrying: marks the wrapper as shutting down, drops the current
  // call, cancels any pending retry timer, and releases the initial ref.
  // Disable thread-safety analysis because this method is called via
  // OrphanablePtr<>, but there's no way to pass the lock annotation
  // through there.
  void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS;

  // Invoked by the wrapped call when it has finished; drops it and
  // schedules a retry.
  void OnCallFinishedLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&LrsClient::mu_);

  // The current wrapped call, or null while waiting to retry.
  T* call() const { return call_.get(); }
  LrsChannel* lrs_channel() const { return lrs_channel_.get(); }

  // NOTE(review): no definition is visible in this section of the file;
  // confirm it is defined (or used) elsewhere.
  bool IsCurrentCallOnChannel() const;

 private:
  // Creates a new wrapped call unless shutting down.
  void StartNewCallLocked();
  // Arms the retry timer with the next backoff delay unless shutting down.
  void StartRetryTimerLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&LrsClient::mu_);

  // Retry-timer callback; acquires LrsClient::mu_ itself.
  void OnRetryTimer();

  // The wrapped LRS call that talks to the LRS server. It's instantiated
  // every time we start a new call. It's null during call retry backoff.
  OrphanablePtr<T> call_;
  // The owning LRS channel (held as a weak ref).
  WeakRefCountedPtr<LrsChannel> lrs_channel_;

  // Retry state.
  BackOff backoff_;
  // Set while a retry timer is pending; cleared when it fires or is
  // cancelled in Orphan().
  absl::optional<EventEngine::TaskHandle> timer_handle_
      ABSL_GUARDED_BY(&LrsClient::mu_);

  // Set by Orphan(); prevents starting new calls or retry timers.
  bool shutting_down_ = false;
};
291
// An LRS call to the LRS server. Each instance wraps one streaming call. It
// is created, and recreated after failures, by its owning RetryableCall<>.
class LrsClient::LrsChannel::LrsCall final
    : public InternallyRefCounted<LrsCall> {
 public:
  // The ctor and dtor should not be used directly.
  explicit LrsCall(RefCountedPtr<RetryableCall<LrsCall>> retryable_call);

  // Drops the report timer and releases the streaming call.
  void Orphan() override;

  RetryableCall<LrsCall>* retryable_call() { return retryable_call_.get(); }
  LrsChannel* lrs_channel() const { return retryable_call_->lrs_channel(); }
  LrsClient* lrs_client() const { return lrs_channel()->lrs_client(); }
  // True once a response has been successfully parsed on this call. The
  // RetryableCall uses this to decide whether to reset its backoff.
  bool seen_response() const { return seen_response_; }

 private:
  // Forwards streaming-call events to the owning LrsCall. It is handed the
  // LrsCall's initial ref in the LrsCall ctor, so that ref is released when
  // the handler is destroyed.
  class StreamEventHandler final
      : public XdsTransportFactory::XdsTransport::StreamingCall::EventHandler {
   public:
    explicit StreamEventHandler(RefCountedPtr<LrsCall> lrs_call)
        : lrs_call_(std::move(lrs_call)) {}

    void OnRequestSent(bool /*ok*/) override { lrs_call_->OnRequestSent(); }
    void OnRecvMessage(absl::string_view payload) override {
      lrs_call_->OnRecvMessage(payload);
    }
    void OnStatusReceived(absl::Status status) override {
      lrs_call_->OnStatusReceived(std::move(status));
    }

   private:
    RefCountedPtr<LrsCall> lrs_call_;
  };

  // Drives periodic load reports. Each ScheduleNextReportLocked() call arms
  // a one-shot EventEngine timer for the call's load_reporting_interval_.
  // When the timer fires, a report is sent only if this object is still the
  // call's current timer.
  class Timer final : public InternallyRefCounted<Timer> {
   public:
    explicit Timer(RefCountedPtr<LrsCall> lrs_call)
        : lrs_call_(std::move(lrs_call)) {}
    ~Timer() override { lrs_call_.reset(DEBUG_LOCATION, "LRS timer"); }

    // Cancels any pending timer and releases the initial ref.
    // Disable thread-safety analysis because this method is called via
    // OrphanablePtr<>, but there's no way to pass the lock annotation
    // through there.
    void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS;

    void ScheduleNextReportLocked()
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(&LrsClient::mu_);

   private:
    // False once the call has replaced or dropped this timer.
    bool IsCurrentTimerOnCall() const {
      return this == lrs_call_->timer_.get();
    }
    LrsClient* lrs_client() const { return lrs_call_->lrs_client(); }

    void OnNextReportTimer();

    // The owning LRS call.
    RefCountedPtr<LrsCall> lrs_call_;

    // Set while a report timer is pending.
    absl::optional<EventEngine::TaskHandle> timer_handle_
        ABSL_GUARDED_BY(&LrsClient::mu_);
  };

  // Stops the call if there is nothing left to report. Otherwise (re)arms
  // the report timer, once a response has been seen and no send is pending.
  void MaybeScheduleNextReportLocked()
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&LrsClient::mu_);

  // Builds and sends a load report, skipping consecutive all-zero reports.
  void SendReportLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&LrsClient::mu_);

  void SendMessageLocked(std::string payload)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&LrsClient::mu_);

  // Stream event callbacks; each acquires LrsClient::mu_ itself.
  void OnRequestSent();
  void OnRecvMessage(absl::string_view payload);
  void OnStatusReceived(absl::Status status);

  // True iff this is the call currently owned by the channel's
  // RetryableCall; stale calls ignore their events.
  bool IsCurrentCallOnChannel() const;

  // The owning RetryableCall<>.
  RefCountedPtr<RetryableCall<LrsCall>> retryable_call_;

  OrphanablePtr<XdsTransportFactory::XdsTransport::StreamingCall>
      streaming_call_;

  bool seen_response_ = false;
  // True while a SendMessage() is outstanding; OnRequestSent() clears it.
  bool send_message_pending_ ABSL_GUARDED_BY(&LrsClient::mu_) = false;

  // Load reporting state.
  bool send_all_clusters_ = false;
  std::set<std::string> cluster_names_;  // Asked for by the LRS server.
  Duration load_reporting_interval_;
  // Lets SendReportLocked() skip a report when both it and the previous one
  // are all zeros.
  bool last_report_counters_were_zero_ = false;
  // Null until the first report is scheduled; reset when the reporting
  // interval changes.
  OrphanablePtr<Timer> timer_;
};
385
386 //
387 // LrsClient::LrsChannel
388 //
389
LrsChannel(WeakRefCountedPtr<LrsClient> lrs_client,std::shared_ptr<const XdsBootstrap::XdsServer> server)390 LrsClient::LrsChannel::LrsChannel(
391 WeakRefCountedPtr<LrsClient> lrs_client,
392 std::shared_ptr<const XdsBootstrap::XdsServer> server)
393 : DualRefCounted<LrsChannel>(GRPC_TRACE_FLAG_ENABLED(xds_client_refcount)
394 ? "LrsChannel"
395 : nullptr),
396 lrs_client_(std::move(lrs_client)),
397 server_(std::move(server)) {
398 GRPC_TRACE_LOG(xds_client, INFO)
399 << "[lrs_client " << lrs_client_.get() << "] creating channel " << this
400 << " for server " << server_->server_uri();
401 absl::Status status;
402 transport_ = lrs_client_->transport_factory_->GetTransport(*server_, &status);
403 CHECK(transport_ != nullptr);
404 if (!status.ok()) {
405 LOG(ERROR) << "Error creating LRS channel to " << server_->server_uri()
406 << ": " << status;
407 }
408 }
409
~LrsChannel()410 LrsClient::LrsChannel::~LrsChannel() {
411 GRPC_TRACE_LOG(xds_client, INFO)
412 << "[lrs_client " << lrs_client() << "] destroying lrs channel " << this
413 << " for server " << server_->server_uri();
414 lrs_client_.reset(DEBUG_LOCATION, "LrsChannel");
415 }
416
// Invoked once the last strong ref to this channel is gone. It releases the
// transport, unregisters the channel from the LrsClient's channel map, and
// stops the LRS call.
// This method should only ever be called when holding the lock (presumably
// LrsClient::mu_, which guards the maps touched here), but we can't use an
// ABSL_EXCLUSIVE_LOCKS_REQUIRED annotation, because Orphaned() will be
// called from DualRefCounted::Unref(), which cannot have a lock annotation
// for a lock in this subclass.
void LrsClient::LrsChannel::Orphaned() ABSL_NO_THREAD_SAFETY_ANALYSIS {
  GRPC_TRACE_LOG(xds_client, INFO)
      << "[lrs_client " << lrs_client() << "] orphaning lrs channel " << this
      << " for server " << server_->server_uri();
  transport_.reset();
  // At this time, all strong refs are removed, remove from channel map to
  // prevent subsequent subscription from trying to use this LrsChannel as
  // it is shutting down.
  lrs_client_->lrs_channel_map_.erase(server_->Key());
  // Orphans the RetryableCall, which drops its current call and cancels any
  // pending retry timer.
  lrs_call_.reset();
}
432
ResetBackoff()433 void LrsClient::LrsChannel::ResetBackoff() { transport_->ResetBackoff(); }
434
MaybeStartLrsCall()435 void LrsClient::LrsChannel::MaybeStartLrsCall() {
436 if (lrs_call_ != nullptr) return;
437 lrs_call_ = MakeOrphanable<RetryableCall<LrsCall>>(
438 WeakRef(DEBUG_LOCATION, "LrsCall"));
439 }
440
StopLrsCallLocked()441 void LrsClient::LrsChannel::StopLrsCallLocked() {
442 lrs_client_->load_report_map_.erase(server_->Key());
443 lrs_call_.reset();
444 }
445
446 //
447 // LrsClient::LrsChannel::RetryableCall<>
448 //
449
// Configures the retry backoff from the GRPC_XDS_* constants defined at the
// top of this file and immediately starts the first call.
template <typename T>
LrsClient::LrsChannel::RetryableCall<T>::RetryableCall(
    WeakRefCountedPtr<LrsChannel> lrs_channel)
    : lrs_channel_(std::move(lrs_channel)),
      backoff_(BackOff::Options()
                   .set_initial_backoff(Duration::Seconds(
                       GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS))
                   .set_multiplier(GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER)
                   .set_jitter(GRPC_XDS_RECONNECT_JITTER)
                   .set_max_backoff(Duration::Seconds(
                       GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS))) {
  StartNewCallLocked();
}
463
// Shuts down the retry loop. It marks the wrapper as shutting down (so no
// new call or retry timer is started), drops the current call, cancels a
// pending retry timer if there is one, and releases the initial ref.
template <typename T>
void LrsClient::LrsChannel::RetryableCall<T>::Orphan() {
  shutting_down_ = true;
  call_.reset();
  if (timer_handle_.has_value()) {
    lrs_channel()->lrs_client()->engine()->Cancel(*timer_handle_);
    // Clearing the handle also makes a racing OnRetryTimer() a no-op.
    timer_handle_.reset();
  }
  this->Unref(DEBUG_LOCATION, "RetryableCall+orphaned");
}
474
475 template <typename T>
OnCallFinishedLocked()476 void LrsClient::LrsChannel::RetryableCall<T>::OnCallFinishedLocked() {
477 // If we saw a response on the current stream, reset backoff.
478 if (call_->seen_response()) backoff_.Reset();
479 call_.reset();
480 // Start retry timer.
481 StartRetryTimerLocked();
482 }
483
484 template <typename T>
StartNewCallLocked()485 void LrsClient::LrsChannel::RetryableCall<T>::StartNewCallLocked() {
486 if (shutting_down_) return;
487 CHECK(lrs_channel_->transport_ != nullptr);
488 CHECK(call_ == nullptr);
489 GRPC_TRACE_LOG(xds_client, INFO)
490 << "[lrs_client " << lrs_channel()->lrs_client() << "] lrs server "
491 << lrs_channel()->server_->server_uri()
492 << ": start new call from retryable call " << this;
493 call_ = MakeOrphanable<T>(
494 this->Ref(DEBUG_LOCATION, "RetryableCall+start_new_call"));
495 }
496
497 template <typename T>
StartRetryTimerLocked()498 void LrsClient::LrsChannel::RetryableCall<T>::StartRetryTimerLocked() {
499 if (shutting_down_) return;
500 const Duration delay = backoff_.NextAttemptDelay();
501 GRPC_TRACE_LOG(xds_client, INFO)
502 << "[lrs_client " << lrs_channel()->lrs_client() << "] lrs server "
503 << lrs_channel()->server_->server_uri()
504 << ": call attempt failed; retry timer will fire in " << delay.millis()
505 << "ms.";
506 timer_handle_ = lrs_channel()->lrs_client()->engine()->RunAfter(
507 delay,
508 [self = this->Ref(DEBUG_LOCATION, "RetryableCall+retry_timer_start")]() {
509 ApplicationCallbackExecCtx callback_exec_ctx;
510 ExecCtx exec_ctx;
511 self->OnRetryTimer();
512 });
513 }
514
515 template <typename T>
OnRetryTimer()516 void LrsClient::LrsChannel::RetryableCall<T>::OnRetryTimer() {
517 MutexLock lock(&lrs_channel_->lrs_client()->mu_);
518 if (timer_handle_.has_value()) {
519 timer_handle_.reset();
520 if (shutting_down_) return;
521 GRPC_TRACE_LOG(xds_client, INFO)
522 << "[lrs_client " << lrs_channel()->lrs_client() << "] lrs server "
523 << lrs_channel()->server_->server_uri()
524 << ": retry timer fired (retryable call: " << this << ")";
525 StartNewCallLocked();
526 }
527 }
528
529 //
530 // LrsClient::LrsChannel::LrsCall::Timer
531 //
532
Orphan()533 void LrsClient::LrsChannel::LrsCall::Timer::Orphan() {
534 if (timer_handle_.has_value()) {
535 lrs_client()->engine()->Cancel(*timer_handle_);
536 timer_handle_.reset();
537 }
538 Unref(DEBUG_LOCATION, "Orphan");
539 }
540
ScheduleNextReportLocked()541 void LrsClient::LrsChannel::LrsCall::Timer::ScheduleNextReportLocked() {
542 GRPC_TRACE_LOG(xds_client, INFO)
543 << "[lrs_client " << lrs_client() << "] lrs server "
544 << lrs_call_->lrs_channel()->server_->server_uri()
545 << ": scheduling next load report in "
546 << lrs_call_->load_reporting_interval_;
547 timer_handle_ = lrs_client()->engine()->RunAfter(
548 lrs_call_->load_reporting_interval_,
549 [self = Ref(DEBUG_LOCATION, "timer")]() {
550 ApplicationCallbackExecCtx callback_exec_ctx;
551 ExecCtx exec_ctx;
552 self->OnNextReportTimer();
553 });
554 }
555
OnNextReportTimer()556 void LrsClient::LrsChannel::LrsCall::Timer::OnNextReportTimer() {
557 MutexLock lock(&lrs_client()->mu_);
558 timer_handle_.reset();
559 if (IsCurrentTimerOnCall()) lrs_call_->SendReportLocked();
560 }
561
562 //
563 // LrsClient::LrsChannel::LrsCall
564 //
565
LrsCall(RefCountedPtr<RetryableCall<LrsCall>> retryable_call)566 LrsClient::LrsChannel::LrsCall::LrsCall(
567 RefCountedPtr<RetryableCall<LrsCall>> retryable_call)
568 : InternallyRefCounted<LrsCall>(
569 GRPC_TRACE_FLAG_ENABLED(xds_client_refcount) ? "LrsCall" : nullptr),
570 retryable_call_(std::move(retryable_call)) {
571 // Init the LRS call. Note that the call will progress every time there's
572 // activity in lrs_client()->interested_parties_, which is comprised of
573 // the polling entities from client_channel.
574 CHECK_NE(lrs_client(), nullptr);
575 const char* method =
576 "/envoy.service.load_stats.v3.LoadReportingService/StreamLoadStats";
577 streaming_call_ = lrs_channel()->transport_->CreateStreamingCall(
578 method, std::make_unique<StreamEventHandler>(
579 // Passing the initial ref here. This ref will go away when
580 // the StreamEventHandler is destroyed.
581 RefCountedPtr<LrsCall>(this)));
582 CHECK(streaming_call_ != nullptr);
583 // Start the call.
584 GRPC_TRACE_LOG(xds_client, INFO)
585 << "[lrs_client " << lrs_client() << "] lrs server "
586 << lrs_channel()->server_->server_uri()
587 << ": starting LRS call (lrs_call=" << this
588 << ", streaming_call=" << streaming_call_.get() << ")";
589 // Send the initial request.
590 std::string serialized_payload = lrs_client()->CreateLrsInitialRequest();
591 SendMessageLocked(std::move(serialized_payload));
592 // Read initial response.
593 streaming_call_->StartRecvMessage();
594 }
595
Orphan()596 void LrsClient::LrsChannel::LrsCall::Orphan() {
597 timer_.reset();
598 // Note that the initial ref is held by the StreamEventHandler, which
599 // will be destroyed when streaming_call_ is destroyed, which may not happen
600 // here, since there may be other refs held to streaming_call_ by internal
601 // callbacks.
602 streaming_call_.reset();
603 }
604
MaybeScheduleNextReportLocked()605 void LrsClient::LrsChannel::LrsCall::MaybeScheduleNextReportLocked() {
606 // If there are no more registered stats to report, cancel the call.
607 auto it = lrs_client()->load_report_map_.find(lrs_channel()->server_->Key());
608 if (it == lrs_client()->load_report_map_.end() ||
609 it->second.load_report_map.empty()) {
610 it->second.lrs_channel->StopLrsCallLocked();
611 return;
612 }
613 // Don't start if the previous send_message op hasn't completed yet.
614 // If this happens, we'll be called again from OnRequestSent().
615 if (send_message_pending_) return;
616 // Don't start if no LRS response has arrived.
617 if (!seen_response()) return;
618 // If there is no timer, create one.
619 // This happens on the initial response and whenever the interval changes.
620 if (timer_ == nullptr) {
621 timer_ = MakeOrphanable<Timer>(Ref(DEBUG_LOCATION, "LRS timer"));
622 }
623 // Schedule the next load report.
624 timer_->ScheduleNextReportLocked();
625 }
626
LoadReportCountersAreZero(const ClusterLoadReportMap & snapshot)627 bool LrsClient::LoadReportCountersAreZero(
628 const ClusterLoadReportMap& snapshot) {
629 for (const auto& p : snapshot) {
630 const ClusterLoadReport& cluster_snapshot = p.second;
631 if (!cluster_snapshot.dropped_requests.IsZero()) return false;
632 for (const auto& q : cluster_snapshot.locality_stats) {
633 const ClusterLocalityStats::Snapshot& locality_snapshot = q.second;
634 if (!locality_snapshot.IsZero()) return false;
635 }
636 }
637 return true;
638 }
639
SendReportLocked()640 void LrsClient::LrsChannel::LrsCall::SendReportLocked() {
641 // Construct snapshot from all reported stats.
642 ClusterLoadReportMap snapshot = lrs_client()->BuildLoadReportSnapshotLocked(
643 *lrs_channel()->server_, send_all_clusters_, cluster_names_);
644 // Skip client load report if the counters were all zero in the last
645 // report and they are still zero in this one.
646 const bool old_val = last_report_counters_were_zero_;
647 last_report_counters_were_zero_ = LoadReportCountersAreZero(snapshot);
648 if (old_val && last_report_counters_were_zero_) {
649 MaybeScheduleNextReportLocked();
650 return;
651 }
652 // Send a request that contains the snapshot.
653 std::string serialized_payload =
654 lrs_client()->CreateLrsRequest(std::move(snapshot));
655 SendMessageLocked(std::move(serialized_payload));
656 }
657
SendMessageLocked(std::string payload)658 void LrsClient::LrsChannel::LrsCall::SendMessageLocked(std::string payload) {
659 send_message_pending_ = true;
660 streaming_call_->SendMessage(std::move(payload));
661 }
662
OnRequestSent()663 void LrsClient::LrsChannel::LrsCall::OnRequestSent() {
664 MutexLock lock(&lrs_client()->mu_);
665 send_message_pending_ = false;
666 if (IsCurrentCallOnChannel()) MaybeScheduleNextReportLocked();
667 }
668
OnRecvMessage(absl::string_view payload)669 void LrsClient::LrsChannel::LrsCall::OnRecvMessage(absl::string_view payload) {
670 MutexLock lock(&lrs_client()->mu_);
671 // If we're no longer the current call, ignore the result.
672 if (!IsCurrentCallOnChannel()) return;
673 // Start recv after any code branch
674 auto cleanup = absl::MakeCleanup(
675 [call = streaming_call_.get()]() { call->StartRecvMessage(); });
676 // Parse the response.
677 bool send_all_clusters = false;
678 std::set<std::string> new_cluster_names;
679 Duration new_load_reporting_interval;
680 absl::Status status = lrs_client()->ParseLrsResponse(
681 payload, &send_all_clusters, &new_cluster_names,
682 &new_load_reporting_interval);
683 if (!status.ok()) {
684 LOG(ERROR) << "[lrs_client " << lrs_client() << "] lrs server "
685 << lrs_channel()->server_->server_uri()
686 << ": LRS response parsing failed: " << status;
687 return;
688 }
689 seen_response_ = true;
690 if (GRPC_TRACE_FLAG_ENABLED(xds_client)) {
691 LOG(INFO) << "[lrs_client " << lrs_client() << "] lrs server "
692 << lrs_channel()->server_->server_uri()
693 << ": LRS response received, " << new_cluster_names.size()
694 << " cluster names, send_all_clusters=" << send_all_clusters
695 << ", load_report_interval="
696 << new_load_reporting_interval.millis() << "ms";
697 size_t i = 0;
698 for (const auto& name : new_cluster_names) {
699 LOG(INFO) << "[lrs_client " << lrs_client() << "] cluster_name " << i++
700 << ": " << name;
701 }
702 }
703 if (new_load_reporting_interval <
704 Duration::Milliseconds(GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS)) {
705 new_load_reporting_interval =
706 Duration::Milliseconds(GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS);
707 GRPC_TRACE_LOG(xds_client, INFO)
708 << "[lrs_client " << lrs_client() << "] lrs server "
709 << lrs_channel()->server_->server_uri()
710 << ": increased load_report_interval to minimum value "
711 << GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS << "ms";
712 }
713 // Ignore identical update.
714 if (send_all_clusters == send_all_clusters_ &&
715 cluster_names_ == new_cluster_names &&
716 load_reporting_interval_ == new_load_reporting_interval) {
717 GRPC_TRACE_LOG(xds_client, INFO)
718 << "[lrs_client " << lrs_client() << "] lrs server "
719 << lrs_channel()->server_->server_uri()
720 << ": incoming LRS response identical to current, ignoring.";
721 return;
722 }
723 // If the interval has changed, we'll need to restart the timer below.
724 const bool restart_timer =
725 load_reporting_interval_ != new_load_reporting_interval;
726 // Record the new config.
727 send_all_clusters_ = send_all_clusters;
728 cluster_names_ = std::move(new_cluster_names);
729 load_reporting_interval_ = new_load_reporting_interval;
730 // Restart timer if needed.
731 if (restart_timer) {
732 timer_.reset();
733 MaybeScheduleNextReportLocked();
734 }
735 }
736
OnStatusReceived(absl::Status status)737 void LrsClient::LrsChannel::LrsCall::OnStatusReceived(absl::Status status) {
738 MutexLock lock(&lrs_client()->mu_);
739 GRPC_TRACE_LOG(xds_client, INFO)
740 << "[lrs_client " << lrs_client() << "] lrs server "
741 << lrs_channel()->server_->server_uri()
742 << ": LRS call status received (lrs_channel=" << lrs_channel()
743 << ", lrs_call=" << this << ", streaming_call=" << streaming_call_.get()
744 << "): " << status;
745 // Ignore status from a stale call.
746 if (IsCurrentCallOnChannel()) {
747 // Try to restart the call.
748 retryable_call_->OnCallFinishedLocked();
749 }
750 }
751
IsCurrentCallOnChannel() const752 bool LrsClient::LrsChannel::LrsCall::IsCurrentCallOnChannel() const {
753 // If the retryable LRS call is null (which only happens when the lrs
754 // channel is shutting down), all the LRS calls are stale.
755 if (lrs_channel()->lrs_call_ == nullptr) return false;
756 return this == lrs_channel()->lrs_call_->call();
757 }
758
759 //
760 // LrsClient
761 //
762
LrsClient(std::shared_ptr<XdsBootstrap> bootstrap,std::string user_agent_name,std::string user_agent_version,RefCountedPtr<XdsTransportFactory> transport_factory,std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine)763 LrsClient::LrsClient(
764 std::shared_ptr<XdsBootstrap> bootstrap, std::string user_agent_name,
765 std::string user_agent_version,
766 RefCountedPtr<XdsTransportFactory> transport_factory,
767 std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine)
768 : DualRefCounted<LrsClient>(
769 GRPC_TRACE_FLAG_ENABLED(xds_client_refcount) ? "LrsClient" : nullptr),
770 bootstrap_(std::move(bootstrap)),
771 user_agent_name_(std::move(user_agent_name)),
772 user_agent_version_(std::move(user_agent_version)),
773 transport_factory_(std::move(transport_factory)),
774 engine_(std::move(engine)) {
775 GRPC_TRACE_LOG(xds_client, INFO)
776 << "[lrs_client " << this << "] creating lrs client";
777 }
778
~LrsClient()779 LrsClient::~LrsClient() {
780 GRPC_TRACE_LOG(xds_client, INFO)
781 << "[lrs_client " << this << "] destroying lrs client";
782 }
783
Orphaned()784 void LrsClient::Orphaned() {
785 GRPC_TRACE_LOG(xds_client, INFO)
786 << "[lrs_client " << this << "] shutting down lrs client";
787 MutexLock lock(&mu_);
788 // We may still be sending lingering queued load report data, so don't
789 // just clear the load reporting map, but we do want to clear the refs
790 // we're holding to the LrsChannel objects, to make sure that
791 // everything shuts down properly.
792 for (auto& p : load_report_map_) {
793 p.second.lrs_channel.reset(DEBUG_LOCATION, "LrsClient::Orphan()");
794 }
795 }
796
GetOrCreateLrsChannelLocked(std::shared_ptr<const XdsBootstrap::XdsServer> server,const char * reason)797 RefCountedPtr<LrsClient::LrsChannel> LrsClient::GetOrCreateLrsChannelLocked(
798 std::shared_ptr<const XdsBootstrap::XdsServer> server, const char* reason) {
799 std::string key = server->Key();
800 auto it = lrs_channel_map_.find(key);
801 if (it != lrs_channel_map_.end()) {
802 return it->second->Ref(DEBUG_LOCATION, reason);
803 }
804 // Channel not found, so create a new one.
805 auto lrs_channel = MakeRefCounted<LrsChannel>(
806 WeakRef(DEBUG_LOCATION, "LrsChannel"), std::move(server));
807 lrs_channel_map_[std::move(key)] = lrs_channel.get();
808 return lrs_channel;
809 }
810
AddClusterDropStats(std::shared_ptr<const XdsBootstrap::XdsServer> lrs_server,absl::string_view cluster_name,absl::string_view eds_service_name)811 RefCountedPtr<LrsClient::ClusterDropStats> LrsClient::AddClusterDropStats(
812 std::shared_ptr<const XdsBootstrap::XdsServer> lrs_server,
813 absl::string_view cluster_name, absl::string_view eds_service_name) {
814 auto key =
815 std::make_pair(std::string(cluster_name), std::string(eds_service_name));
816 RefCountedPtr<ClusterDropStats> cluster_drop_stats;
817 {
818 MutexLock lock(&mu_);
819 // We jump through some hoops here to make sure that the
820 // absl::string_views stored in the ClusterDropStats object point
821 // to the strings in the load_report_map_ keys, so that
822 // they have the same lifetime.
823 auto server_it =
824 load_report_map_.emplace(lrs_server->Key(), LoadReportServer()).first;
825 if (server_it->second.lrs_channel == nullptr) {
826 server_it->second.lrs_channel = GetOrCreateLrsChannelLocked(
827 lrs_server, "load report map (drop stats)");
828 }
829 auto load_report_it = server_it->second.load_report_map
830 .emplace(std::move(key), LoadReportState())
831 .first;
832 LoadReportState& load_report_state = load_report_it->second;
833 if (load_report_state.drop_stats != nullptr) {
834 cluster_drop_stats = load_report_state.drop_stats->RefIfNonZero();
835 }
836 if (cluster_drop_stats == nullptr) {
837 if (load_report_state.drop_stats != nullptr) {
838 load_report_state.deleted_drop_stats +=
839 load_report_state.drop_stats->GetSnapshotAndReset();
840 }
841 cluster_drop_stats = MakeRefCounted<ClusterDropStats>(
842 Ref(DEBUG_LOCATION, "DropStats"), server_it->first /*lrs_server*/,
843 load_report_it->first.first /*cluster_name*/,
844 load_report_it->first.second /*eds_service_name*/);
845 load_report_state.drop_stats = cluster_drop_stats.get();
846 }
847 server_it->second.lrs_channel->MaybeStartLrsCall();
848 }
849 return cluster_drop_stats;
850 }
851
RemoveClusterDropStats(absl::string_view lrs_server_key,absl::string_view cluster_name,absl::string_view eds_service_name,LrsClient::ClusterDropStats * cluster_drop_stats)852 void LrsClient::RemoveClusterDropStats(
853 absl::string_view lrs_server_key, absl::string_view cluster_name,
854 absl::string_view eds_service_name,
855 LrsClient::ClusterDropStats* cluster_drop_stats) {
856 MutexLock lock(&mu_);
857 auto server_it = load_report_map_.find(lrs_server_key);
858 if (server_it == load_report_map_.end()) return;
859 auto load_report_it = server_it->second.load_report_map.find(
860 std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
861 if (load_report_it == server_it->second.load_report_map.end()) return;
862 LoadReportState& load_report_state = load_report_it->second;
863 if (load_report_state.drop_stats == cluster_drop_stats) {
864 // Record final snapshot in deleted_drop_stats, which will be
865 // added to the next load report.
866 load_report_state.deleted_drop_stats +=
867 load_report_state.drop_stats->GetSnapshotAndReset();
868 load_report_state.drop_stats = nullptr;
869 }
870 }
871
872 RefCountedPtr<LrsClient::ClusterLocalityStats>
AddClusterLocalityStats(std::shared_ptr<const XdsBootstrap::XdsServer> lrs_server,absl::string_view cluster_name,absl::string_view eds_service_name,RefCountedPtr<XdsLocalityName> locality,RefCountedPtr<const BackendMetricPropagation> backend_metric_propagation)873 LrsClient::AddClusterLocalityStats(
874 std::shared_ptr<const XdsBootstrap::XdsServer> lrs_server,
875 absl::string_view cluster_name, absl::string_view eds_service_name,
876 RefCountedPtr<XdsLocalityName> locality,
877 RefCountedPtr<const BackendMetricPropagation> backend_metric_propagation) {
878 auto key =
879 std::make_pair(std::string(cluster_name), std::string(eds_service_name));
880 RefCountedPtr<ClusterLocalityStats> cluster_locality_stats;
881 {
882 MutexLock lock(&mu_);
883 // We jump through some hoops here to make sure that the
884 // absl::string_views stored in the ClusterLocalityStats object point
885 // to the strings in the load_report_map_ keys, so that
886 // they have the same lifetime.
887 auto server_it =
888 load_report_map_.emplace(lrs_server->Key(), LoadReportServer()).first;
889 if (server_it->second.lrs_channel == nullptr) {
890 server_it->second.lrs_channel = GetOrCreateLrsChannelLocked(
891 std::move(lrs_server), "load report map (locality stats)");
892 }
893 auto load_report_it = server_it->second.load_report_map
894 .emplace(std::move(key), LoadReportState())
895 .first;
896 LoadReportState& load_report_state = load_report_it->second;
897 LoadReportState::LocalityState& locality_state =
898 load_report_state.locality_stats[locality];
899 ClusterLocalityStats*& locality_stats =
900 locality_state.propagation_stats[backend_metric_propagation];
901 if (locality_stats != nullptr) {
902 cluster_locality_stats = locality_stats->RefIfNonZero();
903 }
904 if (cluster_locality_stats == nullptr) {
905 if (locality_stats != nullptr) {
906 locality_state.deleted_locality_stats +=
907 locality_stats->GetSnapshotAndReset();
908 }
909 cluster_locality_stats = MakeRefCounted<ClusterLocalityStats>(
910 Ref(DEBUG_LOCATION, "LocalityStats"), server_it->first /*lrs_server*/,
911 load_report_it->first.first /*cluster_name*/,
912 load_report_it->first.second /*eds_service_name*/,
913 std::move(locality), std::move(backend_metric_propagation));
914 locality_stats = cluster_locality_stats.get();
915 }
916 server_it->second.lrs_channel->MaybeStartLrsCall();
917 }
918 return cluster_locality_stats;
919 }
920
RemoveClusterLocalityStats(absl::string_view lrs_server_key,absl::string_view cluster_name,absl::string_view eds_service_name,const RefCountedPtr<XdsLocalityName> & locality,const RefCountedPtr<const BackendMetricPropagation> & backend_metric_propagation,ClusterLocalityStats * cluster_locality_stats)921 void LrsClient::RemoveClusterLocalityStats(
922 absl::string_view lrs_server_key, absl::string_view cluster_name,
923 absl::string_view eds_service_name,
924 const RefCountedPtr<XdsLocalityName>& locality,
925 const RefCountedPtr<const BackendMetricPropagation>&
926 backend_metric_propagation,
927 ClusterLocalityStats* cluster_locality_stats) {
928 MutexLock lock(&mu_);
929 auto server_it = load_report_map_.find(lrs_server_key);
930 if (server_it == load_report_map_.end()) return;
931 auto load_report_it = server_it->second.load_report_map.find(
932 std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
933 if (load_report_it == server_it->second.load_report_map.end()) return;
934 LoadReportState& load_report_state = load_report_it->second;
935 auto locality_it = load_report_state.locality_stats.find(locality);
936 if (locality_it == load_report_state.locality_stats.end()) return;
937 LoadReportState::LocalityState& locality_state = locality_it->second;
938 auto propagation_it =
939 locality_state.propagation_stats.find(backend_metric_propagation);
940 if (propagation_it == locality_state.propagation_stats.end()) return;
941 ClusterLocalityStats* locality_stats = propagation_it->second;
942 if (locality_stats == cluster_locality_stats) {
943 // Record final snapshot in deleted_locality_stats, which will be
944 // added to the next load report.
945 locality_state.deleted_locality_stats +=
946 locality_stats->GetSnapshotAndReset();
947 locality_state.propagation_stats.erase(propagation_it);
948 }
949 }
950
ResetBackoff()951 void LrsClient::ResetBackoff() {
952 MutexLock lock(&mu_);
953 for (auto& p : lrs_channel_map_) {
954 p.second->ResetBackoff();
955 }
956 }
957
BuildLoadReportSnapshotLocked(const XdsBootstrap::XdsServer & lrs_server,bool send_all_clusters,const std::set<std::string> & clusters)958 LrsClient::ClusterLoadReportMap LrsClient::BuildLoadReportSnapshotLocked(
959 const XdsBootstrap::XdsServer& lrs_server, bool send_all_clusters,
960 const std::set<std::string>& clusters) {
961 GRPC_TRACE_LOG(xds_client, INFO)
962 << "[lrs_client " << this << "] start building load report";
963 ClusterLoadReportMap snapshot_map;
964 auto server_it = load_report_map_.find(lrs_server.Key());
965 if (server_it == load_report_map_.end()) return snapshot_map;
966 auto& load_report_map = server_it->second.load_report_map;
967 for (auto load_report_it = load_report_map.begin();
968 load_report_it != load_report_map.end();) {
969 // Cluster key is cluster and EDS service name.
970 const auto& cluster_key = load_report_it->first;
971 LoadReportState& load_report = load_report_it->second;
972 // If the CDS response for a cluster indicates to use LRS but the
973 // LRS server does not say that it wants reports for this cluster,
974 // then we'll have stats objects here whose data we're not going to
975 // include in the load report. However, we still need to clear out
976 // the data from the stats objects, so that if the LRS server starts
977 // asking for the data in the future, we don't incorrectly include
978 // data from previous reporting intervals in that future report.
979 const bool record_stats =
980 send_all_clusters || clusters.find(cluster_key.first) != clusters.end();
981 ClusterLoadReport snapshot;
982 // Aggregate drop stats.
983 snapshot.dropped_requests = std::move(load_report.deleted_drop_stats);
984 if (load_report.drop_stats != nullptr) {
985 snapshot.dropped_requests +=
986 load_report.drop_stats->GetSnapshotAndReset();
987 GRPC_TRACE_LOG(xds_client, INFO)
988 << "[lrs_client " << this << "] cluster=" << cluster_key.first
989 << " eds_service_name=" << cluster_key.second
990 << " drop_stats=" << load_report.drop_stats;
991 }
992 // Aggregate locality stats.
993 for (auto it = load_report.locality_stats.begin();
994 it != load_report.locality_stats.end();) {
995 const RefCountedPtr<XdsLocalityName>& locality_name = it->first;
996 auto& locality_state = it->second;
997 ClusterLocalityStats::Snapshot& locality_snapshot =
998 snapshot.locality_stats[locality_name];
999 locality_snapshot = std::move(locality_state.deleted_locality_stats);
1000 for (const auto& p : locality_state.propagation_stats) {
1001 ClusterLocalityStats* locality_stats = p.second;
1002 if (locality_stats != nullptr) {
1003 locality_snapshot += locality_stats->GetSnapshotAndReset();
1004 GRPC_TRACE_LOG(xds_client, INFO)
1005 << "[lrs_client " << this
1006 << "] cluster=" << cluster_key.first.c_str()
1007 << " eds_service_name=" << cluster_key.second.c_str()
1008 << " locality=" << locality_name->human_readable_string().c_str()
1009 << " propagation=" << p.first->AsString()
1010 << " locality_stats=" << locality_stats;
1011 }
1012 }
1013 // If the only thing left in this entry was final snapshots from
1014 // deleted locality stats objects, remove the entry.
1015 if (locality_state.propagation_stats.empty()) {
1016 it = load_report.locality_stats.erase(it);
1017 } else {
1018 ++it;
1019 }
1020 }
1021 // Compute load report interval.
1022 const Timestamp now = Timestamp::Now();
1023 snapshot.load_report_interval = now - load_report.last_report_time;
1024 load_report.last_report_time = now;
1025 // Record snapshot.
1026 if (record_stats) {
1027 snapshot_map[cluster_key] = std::move(snapshot);
1028 }
1029 // If the only thing left in this entry was final snapshots from
1030 // deleted stats objects, remove the entry.
1031 if (load_report.locality_stats.empty() &&
1032 load_report.drop_stats == nullptr) {
1033 load_report_it = load_report_map.erase(load_report_it);
1034 } else {
1035 ++load_report_it;
1036 }
1037 }
1038 return snapshot_map;
1039 }
1040
1041 namespace {
1042
1043 struct LrsApiContext {
1044 LrsClient* client;
1045 upb_DefPool* def_pool;
1046 upb_Arena* arena;
1047 };
1048
MaybeLogLrsRequest(const LrsApiContext & context,const envoy_service_load_stats_v3_LoadStatsRequest * request)1049 void MaybeLogLrsRequest(
1050 const LrsApiContext& context,
1051 const envoy_service_load_stats_v3_LoadStatsRequest* request) {
1052 if (GRPC_TRACE_FLAG_ENABLED(xds_client) && ABSL_VLOG_IS_ON(2)) {
1053 const upb_MessageDef* msg_type =
1054 envoy_service_load_stats_v3_LoadStatsRequest_getmsgdef(
1055 context.def_pool);
1056 char buf[10240];
1057 upb_TextEncode(reinterpret_cast<const upb_Message*>(request), msg_type,
1058 nullptr, 0, buf, sizeof(buf));
1059 VLOG(2) << "[lrs_client " << context.client
1060 << "] constructed LRS request: " << buf;
1061 }
1062 }
1063
SerializeLrsRequest(const LrsApiContext & context,const envoy_service_load_stats_v3_LoadStatsRequest * request)1064 std::string SerializeLrsRequest(
1065 const LrsApiContext& context,
1066 const envoy_service_load_stats_v3_LoadStatsRequest* request) {
1067 size_t output_length;
1068 char* output = envoy_service_load_stats_v3_LoadStatsRequest_serialize(
1069 request, context.arena, &output_length);
1070 return std::string(output, output_length);
1071 }
1072
1073 } // namespace
1074
CreateLrsInitialRequest()1075 std::string LrsClient::CreateLrsInitialRequest() {
1076 upb::Arena arena;
1077 const LrsApiContext context = {this, def_pool_.ptr(), arena.ptr()};
1078 // Create a request.
1079 envoy_service_load_stats_v3_LoadStatsRequest* request =
1080 envoy_service_load_stats_v3_LoadStatsRequest_new(arena.ptr());
1081 // Populate node.
1082 envoy_config_core_v3_Node* node_msg =
1083 envoy_service_load_stats_v3_LoadStatsRequest_mutable_node(request,
1084 arena.ptr());
1085 PopulateXdsNode(bootstrap_->node(), user_agent_name_, user_agent_version_,
1086 node_msg, arena.ptr());
1087 envoy_config_core_v3_Node_add_client_features(
1088 node_msg,
1089 upb_StringView_FromString("envoy.lrs.supports_send_all_clusters"),
1090 arena.ptr());
1091 MaybeLogLrsRequest(context, request);
1092 return SerializeLrsRequest(context, request);
1093 }
1094
1095 namespace {
1096
MaybeAddUnnamedMetric(const LrsApiContext & context,const LrsClient::ClusterLocalityStats::BackendMetric & backend_metric,envoy_config_endpoint_v3_UnnamedEndpointLoadMetricStats * (* add_field)(envoy_config_endpoint_v3_UpstreamLocalityStats *,upb_Arena *),envoy_config_endpoint_v3_UpstreamLocalityStats * output)1097 void MaybeAddUnnamedMetric(
1098 const LrsApiContext& context,
1099 const LrsClient::ClusterLocalityStats::BackendMetric& backend_metric,
1100 envoy_config_endpoint_v3_UnnamedEndpointLoadMetricStats* (*add_field)(
1101 envoy_config_endpoint_v3_UpstreamLocalityStats*, upb_Arena*),
1102 envoy_config_endpoint_v3_UpstreamLocalityStats* output) {
1103 if (backend_metric.IsZero()) return;
1104 auto* metric_proto = add_field(output, context.arena);
1105 envoy_config_endpoint_v3_UnnamedEndpointLoadMetricStats_set_num_requests_finished_with_metric(
1106 metric_proto, backend_metric.num_requests_finished_with_metric);
1107 envoy_config_endpoint_v3_UnnamedEndpointLoadMetricStats_set_total_metric_value(
1108 metric_proto, backend_metric.total_metric_value);
1109 }
1110
LocalityStatsPopulate(const LrsApiContext & context,envoy_config_endpoint_v3_UpstreamLocalityStats * output,const XdsLocalityName & locality_name,const LrsClient::ClusterLocalityStats::Snapshot & snapshot)1111 void LocalityStatsPopulate(
1112 const LrsApiContext& context,
1113 envoy_config_endpoint_v3_UpstreamLocalityStats* output,
1114 const XdsLocalityName& locality_name,
1115 const LrsClient::ClusterLocalityStats::Snapshot& snapshot) {
1116 // Set locality.
1117 envoy_config_core_v3_Locality* locality =
1118 envoy_config_endpoint_v3_UpstreamLocalityStats_mutable_locality(
1119 output, context.arena);
1120 if (!locality_name.region().empty()) {
1121 envoy_config_core_v3_Locality_set_region(
1122 locality, StdStringToUpbString(locality_name.region()));
1123 }
1124 if (!locality_name.zone().empty()) {
1125 envoy_config_core_v3_Locality_set_zone(
1126 locality, StdStringToUpbString(locality_name.zone()));
1127 }
1128 if (!locality_name.sub_zone().empty()) {
1129 envoy_config_core_v3_Locality_set_sub_zone(
1130 locality, StdStringToUpbString(locality_name.sub_zone()));
1131 }
1132 // Set total counts.
1133 envoy_config_endpoint_v3_UpstreamLocalityStats_set_total_successful_requests(
1134 output, snapshot.total_successful_requests);
1135 envoy_config_endpoint_v3_UpstreamLocalityStats_set_total_requests_in_progress(
1136 output, snapshot.total_requests_in_progress);
1137 envoy_config_endpoint_v3_UpstreamLocalityStats_set_total_error_requests(
1138 output, snapshot.total_error_requests);
1139 envoy_config_endpoint_v3_UpstreamLocalityStats_set_total_issued_requests(
1140 output, snapshot.total_issued_requests);
1141 // Add backend metrics.
1142 MaybeAddUnnamedMetric(
1143 context, snapshot.cpu_utilization,
1144 envoy_config_endpoint_v3_UpstreamLocalityStats_mutable_cpu_utilization,
1145 output);
1146 MaybeAddUnnamedMetric(
1147 context, snapshot.mem_utilization,
1148 envoy_config_endpoint_v3_UpstreamLocalityStats_mutable_mem_utilization,
1149 output);
1150 MaybeAddUnnamedMetric(
1151 context, snapshot.application_utilization,
1152 envoy_config_endpoint_v3_UpstreamLocalityStats_mutable_application_utilization,
1153 output);
1154 for (const auto& p : snapshot.backend_metrics) {
1155 const std::string& metric_name = p.first;
1156 const LrsClient::ClusterLocalityStats::BackendMetric& metric_value =
1157 p.second;
1158 envoy_config_endpoint_v3_EndpointLoadMetricStats* load_metric =
1159 envoy_config_endpoint_v3_UpstreamLocalityStats_add_load_metric_stats(
1160 output, context.arena);
1161 envoy_config_endpoint_v3_EndpointLoadMetricStats_set_metric_name(
1162 load_metric, StdStringToUpbString(metric_name));
1163 envoy_config_endpoint_v3_EndpointLoadMetricStats_set_num_requests_finished_with_metric(
1164 load_metric, metric_value.num_requests_finished_with_metric);
1165 envoy_config_endpoint_v3_EndpointLoadMetricStats_set_total_metric_value(
1166 load_metric, metric_value.total_metric_value);
1167 }
1168 }
1169
1170 } // namespace
1171
CreateLrsRequest(ClusterLoadReportMap cluster_load_report_map)1172 std::string LrsClient::CreateLrsRequest(
1173 ClusterLoadReportMap cluster_load_report_map) {
1174 upb::Arena arena;
1175 const LrsApiContext context = {this, def_pool_.ptr(), arena.ptr()};
1176 // Create a request.
1177 envoy_service_load_stats_v3_LoadStatsRequest* request =
1178 envoy_service_load_stats_v3_LoadStatsRequest_new(arena.ptr());
1179 for (auto& p : cluster_load_report_map) {
1180 const std::string& cluster_name = p.first.first;
1181 const std::string& eds_service_name = p.first.second;
1182 const ClusterLoadReport& load_report = p.second;
1183 // Add cluster stats.
1184 envoy_config_endpoint_v3_ClusterStats* cluster_stats =
1185 envoy_service_load_stats_v3_LoadStatsRequest_add_cluster_stats(
1186 request, arena.ptr());
1187 // Set the cluster name.
1188 envoy_config_endpoint_v3_ClusterStats_set_cluster_name(
1189 cluster_stats, StdStringToUpbString(cluster_name));
1190 // Set EDS service name, if non-empty.
1191 if (!eds_service_name.empty()) {
1192 envoy_config_endpoint_v3_ClusterStats_set_cluster_service_name(
1193 cluster_stats, StdStringToUpbString(eds_service_name));
1194 }
1195 // Add locality stats.
1196 for (const auto& p : load_report.locality_stats) {
1197 const XdsLocalityName& locality_name = *p.first;
1198 const auto& snapshot = p.second;
1199 envoy_config_endpoint_v3_UpstreamLocalityStats* locality_stats =
1200 envoy_config_endpoint_v3_ClusterStats_add_upstream_locality_stats(
1201 cluster_stats, arena.ptr());
1202 LocalityStatsPopulate(context, locality_stats, locality_name, snapshot);
1203 }
1204 // Add dropped requests.
1205 uint64_t total_dropped_requests = 0;
1206 for (const auto& p : load_report.dropped_requests.categorized_drops) {
1207 const std::string& category = p.first;
1208 const uint64_t count = p.second;
1209 envoy_config_endpoint_v3_ClusterStats_DroppedRequests* dropped_requests =
1210 envoy_config_endpoint_v3_ClusterStats_add_dropped_requests(
1211 cluster_stats, arena.ptr());
1212 envoy_config_endpoint_v3_ClusterStats_DroppedRequests_set_category(
1213 dropped_requests, StdStringToUpbString(category));
1214 envoy_config_endpoint_v3_ClusterStats_DroppedRequests_set_dropped_count(
1215 dropped_requests, count);
1216 total_dropped_requests += count;
1217 }
1218 total_dropped_requests += load_report.dropped_requests.uncategorized_drops;
1219 // Set total dropped requests.
1220 envoy_config_endpoint_v3_ClusterStats_set_total_dropped_requests(
1221 cluster_stats, total_dropped_requests);
1222 // Set real load report interval.
1223 gpr_timespec timespec = load_report.load_report_interval.as_timespec();
1224 google_protobuf_Duration* load_report_interval =
1225 envoy_config_endpoint_v3_ClusterStats_mutable_load_report_interval(
1226 cluster_stats, arena.ptr());
1227 google_protobuf_Duration_set_seconds(load_report_interval, timespec.tv_sec);
1228 google_protobuf_Duration_set_nanos(load_report_interval, timespec.tv_nsec);
1229 }
1230 MaybeLogLrsRequest(context, request);
1231 return SerializeLrsRequest(context, request);
1232 }
1233
1234 namespace {
1235
MaybeLogLrsResponse(const LrsApiContext & context,const envoy_service_load_stats_v3_LoadStatsResponse * response)1236 void MaybeLogLrsResponse(
1237 const LrsApiContext& context,
1238 const envoy_service_load_stats_v3_LoadStatsResponse* response) {
1239 if (GRPC_TRACE_FLAG_ENABLED(xds_client) && ABSL_VLOG_IS_ON(2)) {
1240 const upb_MessageDef* msg_type =
1241 envoy_service_load_stats_v3_LoadStatsResponse_getmsgdef(
1242 context.def_pool);
1243 char buf[10240];
1244 upb_TextEncode(reinterpret_cast<const upb_Message*>(response), msg_type,
1245 nullptr, 0, buf, sizeof(buf));
1246 VLOG(2) << "[lrs_client " << context.client
1247 << "] received LRS response: " << buf;
1248 }
1249 }
1250
1251 } // namespace
1252
ParseLrsResponse(absl::string_view encoded_response,bool * send_all_clusters,std::set<std::string> * cluster_names,Duration * load_reporting_interval)1253 absl::Status LrsClient::ParseLrsResponse(absl::string_view encoded_response,
1254 bool* send_all_clusters,
1255 std::set<std::string>* cluster_names,
1256 Duration* load_reporting_interval) {
1257 upb::Arena arena;
1258 // Decode the response.
1259 const envoy_service_load_stats_v3_LoadStatsResponse* decoded_response =
1260 envoy_service_load_stats_v3_LoadStatsResponse_parse(
1261 encoded_response.data(), encoded_response.size(), arena.ptr());
1262 // Parse the response.
1263 if (decoded_response == nullptr) {
1264 return absl::UnavailableError("Can't decode response.");
1265 }
1266 const LrsApiContext context = {this, def_pool_.ptr(), arena.ptr()};
1267 MaybeLogLrsResponse(context, decoded_response);
1268 // Check send_all_clusters.
1269 if (envoy_service_load_stats_v3_LoadStatsResponse_send_all_clusters(
1270 decoded_response)) {
1271 *send_all_clusters = true;
1272 } else {
1273 // Store the cluster names.
1274 size_t size;
1275 const upb_StringView* clusters =
1276 envoy_service_load_stats_v3_LoadStatsResponse_clusters(decoded_response,
1277 &size);
1278 for (size_t i = 0; i < size; ++i) {
1279 cluster_names->emplace(UpbStringToStdString(clusters[i]));
1280 }
1281 }
1282 // Get the load report interval.
1283 const google_protobuf_Duration* load_reporting_interval_duration =
1284 envoy_service_load_stats_v3_LoadStatsResponse_load_reporting_interval(
1285 decoded_response);
1286 *load_reporting_interval = Duration::FromSecondsAndNanoseconds(
1287 google_protobuf_Duration_seconds(load_reporting_interval_duration),
1288 google_protobuf_Duration_nanos(load_reporting_interval_duration));
1289 return absl::OkStatus();
1290 }
1291
1292 } // namespace grpc_core
1293