• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include "src/core/xds/xds_client/lrs_client.h"
18 
19 #include <grpc/event_engine/event_engine.h>
20 
21 #include <memory>
22 #include <set>
23 #include <string>
24 #include <vector>
25 
26 #include "absl/cleanup/cleanup.h"
27 #include "absl/log/check.h"
28 #include "absl/log/log.h"
29 #include "absl/strings/string_view.h"
30 #include "absl/types/optional.h"
31 #include "envoy/config/core/v3/base.upb.h"
32 #include "envoy/config/endpoint/v3/load_report.upb.h"
33 #include "envoy/service/load_stats/v3/lrs.upb.h"
34 #include "envoy/service/load_stats/v3/lrs.upbdefs.h"
35 #include "google/protobuf/duration.upb.h"
36 #include "src/core/lib/debug/trace.h"
37 #include "src/core/lib/iomgr/exec_ctx.h"
38 #include "src/core/util/backoff.h"
39 #include "src/core/util/debug_location.h"
40 #include "src/core/util/env.h"
41 #include "src/core/util/orphanable.h"
42 #include "src/core/util/ref_counted_ptr.h"
43 #include "src/core/util/string.h"
44 #include "src/core/util/sync.h"
45 #include "src/core/util/upb_utils.h"
46 #include "src/core/util/uri.h"
47 #include "src/core/xds/xds_client/xds_api.h"
48 #include "src/core/xds/xds_client/xds_bootstrap.h"
49 #include "src/core/xds/xds_client/xds_locality.h"
50 #include "upb/base/string_view.h"
51 #include "upb/mem/arena.h"
52 #include "upb/reflection/def.h"
53 #include "upb/text/encode.h"
54 
55 #define GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS 1
56 #define GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER 1.6
57 #define GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS 120
58 #define GRPC_XDS_RECONNECT_JITTER 0.2
59 #define GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS 1000
60 
61 namespace grpc_core {
62 
63 using ::grpc_event_engine::experimental::EventEngine;
64 
65 // TODO(roth): Remove this once the feature passes interop tests.
XdsOrcaLrsPropagationChangesEnabled()66 bool XdsOrcaLrsPropagationChangesEnabled() {
67   auto value = GetEnv("GRPC_EXPERIMENTAL_XDS_ORCA_LRS_PROPAGATION");
68   if (!value.has_value()) return false;
69   bool parsed_value;
70   bool parse_succeeded = gpr_parse_bool_value(value->c_str(), &parsed_value);
71   return parse_succeeded && parsed_value;
72 }
73 
74 namespace {
75 
// Atomically swaps zero into *from and returns the value that was stored
// there beforehand.
uint64_t GetAndResetCounter(std::atomic<uint64_t>* from) {
  const uint64_t previous = from->exchange(0, std::memory_order_relaxed);
  return previous;
}
79 
80 }  // namespace
81 
82 //
83 // LrsClient::ClusterDropStats
84 //
85 
ClusterDropStats(RefCountedPtr<LrsClient> lrs_client,absl::string_view lrs_server,absl::string_view cluster_name,absl::string_view eds_service_name)86 LrsClient::ClusterDropStats::ClusterDropStats(
87     RefCountedPtr<LrsClient> lrs_client, absl::string_view lrs_server,
88     absl::string_view cluster_name, absl::string_view eds_service_name)
89     : RefCounted(GRPC_TRACE_FLAG_ENABLED(xds_client_refcount)
90                      ? "ClusterDropStats"
91                      : nullptr),
92       lrs_client_(std::move(lrs_client)),
93       lrs_server_(lrs_server),
94       cluster_name_(cluster_name),
95       eds_service_name_(eds_service_name) {
96   GRPC_TRACE_LOG(xds_client, INFO)
97       << "[lrs_client " << lrs_client_.get() << "] created drop stats " << this
98       << " for {" << lrs_server_ << ", " << cluster_name_ << ", "
99       << eds_service_name_ << "}";
100 }
101 
~ClusterDropStats()102 LrsClient::ClusterDropStats::~ClusterDropStats() {
103   GRPC_TRACE_LOG(xds_client, INFO)
104       << "[lrs_client " << lrs_client_.get() << "] destroying drop stats "
105       << this << " for {" << lrs_server_ << ", " << cluster_name_ << ", "
106       << eds_service_name_ << "}";
107   lrs_client_->RemoveClusterDropStats(lrs_server_, cluster_name_,
108                                       eds_service_name_, this);
109   lrs_client_.reset(DEBUG_LOCATION, "ClusterDropStats");
110 }
111 
112 LrsClient::ClusterDropStats::Snapshot
GetSnapshotAndReset()113 LrsClient::ClusterDropStats::GetSnapshotAndReset() {
114   Snapshot snapshot;
115   snapshot.uncategorized_drops = GetAndResetCounter(&uncategorized_drops_);
116   MutexLock lock(&mu_);
117   snapshot.categorized_drops = std::move(categorized_drops_);
118   return snapshot;
119 }
120 
AddUncategorizedDrops()121 void LrsClient::ClusterDropStats::AddUncategorizedDrops() {
122   uncategorized_drops_.fetch_add(1);
123 }
124 
AddCallDropped(const std::string & category)125 void LrsClient::ClusterDropStats::AddCallDropped(const std::string& category) {
126   MutexLock lock(&mu_);
127   ++categorized_drops_[category];
128 }
129 
130 //
131 // LrsClient::ClusterLocalityStats
132 //
133 
ClusterLocalityStats(RefCountedPtr<LrsClient> lrs_client,absl::string_view lrs_server,absl::string_view cluster_name,absl::string_view eds_service_name,RefCountedPtr<XdsLocalityName> name,RefCountedPtr<const BackendMetricPropagation> backend_metric_propagation)134 LrsClient::ClusterLocalityStats::ClusterLocalityStats(
135     RefCountedPtr<LrsClient> lrs_client, absl::string_view lrs_server,
136     absl::string_view cluster_name, absl::string_view eds_service_name,
137     RefCountedPtr<XdsLocalityName> name,
138     RefCountedPtr<const BackendMetricPropagation> backend_metric_propagation)
139     : RefCounted(GRPC_TRACE_FLAG_ENABLED(xds_client_refcount)
140                      ? "ClusterLocalityStats"
141                      : nullptr),
142       lrs_client_(std::move(lrs_client)),
143       lrs_server_(lrs_server),
144       cluster_name_(cluster_name),
145       eds_service_name_(eds_service_name),
146       name_(std::move(name)),
147       backend_metric_propagation_(std::move(backend_metric_propagation)) {
148   GRPC_TRACE_LOG(xds_client, INFO)
149       << "[lrs_client " << lrs_client_.get() << "] created locality stats "
150       << this << " for {" << lrs_server_ << ", " << cluster_name_ << ", "
151       << eds_service_name_ << ", "
152       << (name_ == nullptr ? "<none>" : name_->human_readable_string().c_str())
153       << ", propagation=" << backend_metric_propagation_->AsString() << "}";
154 }
155 
~ClusterLocalityStats()156 LrsClient::ClusterLocalityStats::~ClusterLocalityStats() {
157   GRPC_TRACE_LOG(xds_client, INFO)
158       << "[lrs_client " << lrs_client_.get() << "] destroying locality stats "
159       << this << " for {" << lrs_server_ << ", " << cluster_name_ << ", "
160       << eds_service_name_ << ", "
161       << (name_ == nullptr ? "<none>" : name_->human_readable_string().c_str())
162       << ", propagation=" << backend_metric_propagation_->AsString() << "}";
163   lrs_client_->RemoveClusterLocalityStats(lrs_server_, cluster_name_,
164                                           eds_service_name_, name_,
165                                           backend_metric_propagation_, this);
166   lrs_client_.reset(DEBUG_LOCATION, "ClusterLocalityStats");
167 }
168 
169 LrsClient::ClusterLocalityStats::Snapshot
GetSnapshotAndReset()170 LrsClient::ClusterLocalityStats::GetSnapshotAndReset() {
171   Snapshot snapshot;
172   for (auto& percpu_stats : stats_) {
173     Snapshot percpu_snapshot = {
174         GetAndResetCounter(&percpu_stats.total_successful_requests),
175         // Don't reset total_requests_in_progress because it's
176         // not related to a single reporting interval.
177         percpu_stats.total_requests_in_progress.load(std::memory_order_relaxed),
178         GetAndResetCounter(&percpu_stats.total_error_requests),
179         GetAndResetCounter(&percpu_stats.total_issued_requests),
180         {},
181         {},
182         {},
183         {}};
184     {
185       MutexLock lock(&percpu_stats.backend_metrics_mu);
186       percpu_snapshot.cpu_utilization = std::move(percpu_stats.cpu_utilization);
187       percpu_snapshot.mem_utilization = std::move(percpu_stats.mem_utilization);
188       percpu_snapshot.application_utilization =
189           std::move(percpu_stats.application_utilization);
190       percpu_snapshot.backend_metrics = std::move(percpu_stats.backend_metrics);
191     }
192     snapshot += percpu_snapshot;
193   }
194   return snapshot;
195 }
196 
AddCallStarted()197 void LrsClient::ClusterLocalityStats::AddCallStarted() {
198   Stats& stats = stats_.this_cpu();
199   stats.total_issued_requests.fetch_add(1, std::memory_order_relaxed);
200   stats.total_requests_in_progress.fetch_add(1, std::memory_order_relaxed);
201 }
202 
AddCallFinished(const BackendMetricData * backend_metrics,bool fail)203 void LrsClient::ClusterLocalityStats::AddCallFinished(
204     const BackendMetricData* backend_metrics, bool fail) {
205   Stats& stats = stats_.this_cpu();
206   std::atomic<uint64_t>& to_increment =
207       fail ? stats.total_error_requests : stats.total_successful_requests;
208   to_increment.fetch_add(1, std::memory_order_relaxed);
209   stats.total_requests_in_progress.fetch_add(-1, std::memory_order_acq_rel);
210   if (backend_metrics == nullptr) return;
211   MutexLock lock(&stats.backend_metrics_mu);
212   if (!XdsOrcaLrsPropagationChangesEnabled()) {
213     for (const auto& m : backend_metrics->named_metrics) {
214       stats.backend_metrics[std::string(m.first)] += BackendMetric(1, m.second);
215     }
216     return;
217   }
218   if (backend_metric_propagation_->propagation_bits &
219       BackendMetricPropagation::kCpuUtilization) {
220     stats.cpu_utilization += BackendMetric(1, backend_metrics->cpu_utilization);
221   }
222   if (backend_metric_propagation_->propagation_bits &
223       BackendMetricPropagation::kMemUtilization) {
224     stats.mem_utilization += BackendMetric(1, backend_metrics->mem_utilization);
225   }
226   if (backend_metric_propagation_->propagation_bits &
227       BackendMetricPropagation::kApplicationUtilization) {
228     stats.application_utilization +=
229         BackendMetric(1, backend_metrics->application_utilization);
230   }
231   if (backend_metric_propagation_->propagation_bits &
232           BackendMetricPropagation::kNamedMetricsAll ||
233       !backend_metric_propagation_->named_metric_keys.empty()) {
234     for (const auto& m : backend_metrics->named_metrics) {
235       if (backend_metric_propagation_->propagation_bits &
236               BackendMetricPropagation::kNamedMetricsAll ||
237           backend_metric_propagation_->named_metric_keys.contains(m.first)) {
238         stats.backend_metrics[absl::StrCat("named_metrics.", m.first)] +=
239             BackendMetric(1, m.second);
240       }
241     }
242   }
243 }
244 
245 //
246 // Internal class declarations
247 //
248 
249 // A call wrapper that can restart a call upon failure.
250 // The template parameter is the kind of wrapped call.
251 // TODO(roth): This is basically the same code as in XdsClient, and
252 // probably very similar to many other places in the codebase.
253 // Consider refactoring this into a common utility library somehow.
254 template <typename T>
255 class LrsClient::LrsChannel::RetryableCall final
256     : public InternallyRefCounted<RetryableCall<T>> {
257  public:
258   explicit RetryableCall(WeakRefCountedPtr<LrsChannel> lrs_channel);
259 
260   // Disable thread-safety analysis because this method is called via
261   // OrphanablePtr<>, but there's no way to pass the lock annotation
262   // through there.
263   void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS;
264 
265   void OnCallFinishedLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&LrsClient::mu_);
266 
call() const267   T* call() const { return call_.get(); }
lrs_channel() const268   LrsChannel* lrs_channel() const { return lrs_channel_.get(); }
269 
270   bool IsCurrentCallOnChannel() const;
271 
272  private:
273   void StartNewCallLocked();
274   void StartRetryTimerLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&LrsClient::mu_);
275 
276   void OnRetryTimer();
277 
278   // The wrapped xds call that talks to the xds server. It's instantiated
279   // every time we start a new call. It's null during call retry backoff.
280   OrphanablePtr<T> call_;
281   // The owning xds channel.
282   WeakRefCountedPtr<LrsChannel> lrs_channel_;
283 
284   // Retry state.
285   BackOff backoff_;
286   absl::optional<EventEngine::TaskHandle> timer_handle_
287       ABSL_GUARDED_BY(&LrsClient::mu_);
288 
289   bool shutting_down_ = false;
290 };
291 
292 // An LRS call to the LRS server.
293 class LrsClient::LrsChannel::LrsCall final
294     : public InternallyRefCounted<LrsCall> {
295  public:
296   // The ctor and dtor should not be used directly.
297   explicit LrsCall(RefCountedPtr<RetryableCall<LrsCall>> retryable_call);
298 
299   void Orphan() override;
300 
retryable_call()301   RetryableCall<LrsCall>* retryable_call() { return retryable_call_.get(); }
lrs_channel() const302   LrsChannel* lrs_channel() const { return retryable_call_->lrs_channel(); }
lrs_client() const303   LrsClient* lrs_client() const { return lrs_channel()->lrs_client(); }
seen_response() const304   bool seen_response() const { return seen_response_; }
305 
306  private:
307   class StreamEventHandler final
308       : public XdsTransportFactory::XdsTransport::StreamingCall::EventHandler {
309    public:
StreamEventHandler(RefCountedPtr<LrsCall> lrs_call)310     explicit StreamEventHandler(RefCountedPtr<LrsCall> lrs_call)
311         : lrs_call_(std::move(lrs_call)) {}
312 
OnRequestSent(bool)313     void OnRequestSent(bool /*ok*/) override { lrs_call_->OnRequestSent(); }
OnRecvMessage(absl::string_view payload)314     void OnRecvMessage(absl::string_view payload) override {
315       lrs_call_->OnRecvMessage(payload);
316     }
OnStatusReceived(absl::Status status)317     void OnStatusReceived(absl::Status status) override {
318       lrs_call_->OnStatusReceived(std::move(status));
319     }
320 
321    private:
322     RefCountedPtr<LrsCall> lrs_call_;
323   };
324 
325   // A repeating timer for a particular duration.
326   class Timer final : public InternallyRefCounted<Timer> {
327    public:
Timer(RefCountedPtr<LrsCall> lrs_call)328     explicit Timer(RefCountedPtr<LrsCall> lrs_call)
329         : lrs_call_(std::move(lrs_call)) {}
~Timer()330     ~Timer() override { lrs_call_.reset(DEBUG_LOCATION, "LRS timer"); }
331 
332     // Disable thread-safety analysis because this method is called via
333     // OrphanablePtr<>, but there's no way to pass the lock annotation
334     // through there.
335     void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS;
336 
337     void ScheduleNextReportLocked()
338         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&LrsClient::mu_);
339 
340    private:
IsCurrentTimerOnCall() const341     bool IsCurrentTimerOnCall() const {
342       return this == lrs_call_->timer_.get();
343     }
lrs_client() const344     LrsClient* lrs_client() const { return lrs_call_->lrs_client(); }
345 
346     void OnNextReportTimer();
347 
348     // The owning LRS call.
349     RefCountedPtr<LrsCall> lrs_call_;
350 
351     absl::optional<EventEngine::TaskHandle> timer_handle_
352         ABSL_GUARDED_BY(&LrsClient::mu_);
353   };
354 
355   void MaybeScheduleNextReportLocked()
356       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&LrsClient::mu_);
357 
358   void SendReportLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&LrsClient::mu_);
359 
360   void SendMessageLocked(std::string payload)
361       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&LrsClient::mu_);
362 
363   void OnRequestSent();
364   void OnRecvMessage(absl::string_view payload);
365   void OnStatusReceived(absl::Status status);
366 
367   bool IsCurrentCallOnChannel() const;
368 
369   // The owning RetryableCall<>.
370   RefCountedPtr<RetryableCall<LrsCall>> retryable_call_;
371 
372   OrphanablePtr<XdsTransportFactory::XdsTransport::StreamingCall>
373       streaming_call_;
374 
375   bool seen_response_ = false;
376   bool send_message_pending_ ABSL_GUARDED_BY(&LrsClient::mu_) = false;
377 
378   // Load reporting state.
379   bool send_all_clusters_ = false;
380   std::set<std::string> cluster_names_;  // Asked for by the LRS server.
381   Duration load_reporting_interval_;
382   bool last_report_counters_were_zero_ = false;
383   OrphanablePtr<Timer> timer_;
384 };
385 
386 //
387 // LrsClient::LrsChannel
388 //
389 
LrsChannel(WeakRefCountedPtr<LrsClient> lrs_client,std::shared_ptr<const XdsBootstrap::XdsServer> server)390 LrsClient::LrsChannel::LrsChannel(
391     WeakRefCountedPtr<LrsClient> lrs_client,
392     std::shared_ptr<const XdsBootstrap::XdsServer> server)
393     : DualRefCounted<LrsChannel>(GRPC_TRACE_FLAG_ENABLED(xds_client_refcount)
394                                      ? "LrsChannel"
395                                      : nullptr),
396       lrs_client_(std::move(lrs_client)),
397       server_(std::move(server)) {
398   GRPC_TRACE_LOG(xds_client, INFO)
399       << "[lrs_client " << lrs_client_.get() << "] creating channel " << this
400       << " for server " << server_->server_uri();
401   absl::Status status;
402   transport_ = lrs_client_->transport_factory_->GetTransport(*server_, &status);
403   CHECK(transport_ != nullptr);
404   if (!status.ok()) {
405     LOG(ERROR) << "Error creating LRS channel to " << server_->server_uri()
406                << ": " << status;
407   }
408 }
409 
~LrsChannel()410 LrsClient::LrsChannel::~LrsChannel() {
411   GRPC_TRACE_LOG(xds_client, INFO)
412       << "[lrs_client " << lrs_client() << "] destroying lrs channel " << this
413       << " for server " << server_->server_uri();
414   lrs_client_.reset(DEBUG_LOCATION, "LrsChannel");
415 }
416 
417 // This method should only ever be called when holding the lock, but we can't
418 // use a ABSL_EXCLUSIVE_LOCKS_REQUIRED annotation, because Orphan() will be
419 // called from DualRefCounted::Unref(), which cannot have a lock annotation for
420 // a lock in this subclass.
Orphaned()421 void LrsClient::LrsChannel::Orphaned() ABSL_NO_THREAD_SAFETY_ANALYSIS {
422   GRPC_TRACE_LOG(xds_client, INFO)
423       << "[lrs_client " << lrs_client() << "] orphaning lrs channel " << this
424       << " for server " << server_->server_uri();
425   transport_.reset();
426   // At this time, all strong refs are removed, remove from channel map to
427   // prevent subsequent subscription from trying to use this LrsChannel as
428   // it is shutting down.
429   lrs_client_->lrs_channel_map_.erase(server_->Key());
430   lrs_call_.reset();
431 }
432 
ResetBackoff()433 void LrsClient::LrsChannel::ResetBackoff() { transport_->ResetBackoff(); }
434 
MaybeStartLrsCall()435 void LrsClient::LrsChannel::MaybeStartLrsCall() {
436   if (lrs_call_ != nullptr) return;
437   lrs_call_ = MakeOrphanable<RetryableCall<LrsCall>>(
438       WeakRef(DEBUG_LOCATION, "LrsCall"));
439 }
440 
StopLrsCallLocked()441 void LrsClient::LrsChannel::StopLrsCallLocked() {
442   lrs_client_->load_report_map_.erase(server_->Key());
443   lrs_call_.reset();
444 }
445 
446 //
447 // LrsClient::LrsChannel::RetryableCall<>
448 //
449 
450 template <typename T>
RetryableCall(WeakRefCountedPtr<LrsChannel> lrs_channel)451 LrsClient::LrsChannel::RetryableCall<T>::RetryableCall(
452     WeakRefCountedPtr<LrsChannel> lrs_channel)
453     : lrs_channel_(std::move(lrs_channel)),
454       backoff_(BackOff::Options()
455                    .set_initial_backoff(Duration::Seconds(
456                        GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS))
457                    .set_multiplier(GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER)
458                    .set_jitter(GRPC_XDS_RECONNECT_JITTER)
459                    .set_max_backoff(Duration::Seconds(
460                        GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS))) {
461   StartNewCallLocked();
462 }
463 
464 template <typename T>
Orphan()465 void LrsClient::LrsChannel::RetryableCall<T>::Orphan() {
466   shutting_down_ = true;
467   call_.reset();
468   if (timer_handle_.has_value()) {
469     lrs_channel()->lrs_client()->engine()->Cancel(*timer_handle_);
470     timer_handle_.reset();
471   }
472   this->Unref(DEBUG_LOCATION, "RetryableCall+orphaned");
473 }
474 
475 template <typename T>
OnCallFinishedLocked()476 void LrsClient::LrsChannel::RetryableCall<T>::OnCallFinishedLocked() {
477   // If we saw a response on the current stream, reset backoff.
478   if (call_->seen_response()) backoff_.Reset();
479   call_.reset();
480   // Start retry timer.
481   StartRetryTimerLocked();
482 }
483 
484 template <typename T>
StartNewCallLocked()485 void LrsClient::LrsChannel::RetryableCall<T>::StartNewCallLocked() {
486   if (shutting_down_) return;
487   CHECK(lrs_channel_->transport_ != nullptr);
488   CHECK(call_ == nullptr);
489   GRPC_TRACE_LOG(xds_client, INFO)
490       << "[lrs_client " << lrs_channel()->lrs_client() << "] lrs server "
491       << lrs_channel()->server_->server_uri()
492       << ": start new call from retryable call " << this;
493   call_ = MakeOrphanable<T>(
494       this->Ref(DEBUG_LOCATION, "RetryableCall+start_new_call"));
495 }
496 
497 template <typename T>
StartRetryTimerLocked()498 void LrsClient::LrsChannel::RetryableCall<T>::StartRetryTimerLocked() {
499   if (shutting_down_) return;
500   const Duration delay = backoff_.NextAttemptDelay();
501   GRPC_TRACE_LOG(xds_client, INFO)
502       << "[lrs_client " << lrs_channel()->lrs_client() << "] lrs server "
503       << lrs_channel()->server_->server_uri()
504       << ": call attempt failed; retry timer will fire in " << delay.millis()
505       << "ms.";
506   timer_handle_ = lrs_channel()->lrs_client()->engine()->RunAfter(
507       delay,
508       [self = this->Ref(DEBUG_LOCATION, "RetryableCall+retry_timer_start")]() {
509         ApplicationCallbackExecCtx callback_exec_ctx;
510         ExecCtx exec_ctx;
511         self->OnRetryTimer();
512       });
513 }
514 
515 template <typename T>
OnRetryTimer()516 void LrsClient::LrsChannel::RetryableCall<T>::OnRetryTimer() {
517   MutexLock lock(&lrs_channel_->lrs_client()->mu_);
518   if (timer_handle_.has_value()) {
519     timer_handle_.reset();
520     if (shutting_down_) return;
521     GRPC_TRACE_LOG(xds_client, INFO)
522         << "[lrs_client " << lrs_channel()->lrs_client() << "] lrs server "
523         << lrs_channel()->server_->server_uri()
524         << ": retry timer fired (retryable call: " << this << ")";
525     StartNewCallLocked();
526   }
527 }
528 
529 //
530 // LrsClient::LrsChannel::LrsCall::Timer
531 //
532 
Orphan()533 void LrsClient::LrsChannel::LrsCall::Timer::Orphan() {
534   if (timer_handle_.has_value()) {
535     lrs_client()->engine()->Cancel(*timer_handle_);
536     timer_handle_.reset();
537   }
538   Unref(DEBUG_LOCATION, "Orphan");
539 }
540 
ScheduleNextReportLocked()541 void LrsClient::LrsChannel::LrsCall::Timer::ScheduleNextReportLocked() {
542   GRPC_TRACE_LOG(xds_client, INFO)
543       << "[lrs_client " << lrs_client() << "] lrs server "
544       << lrs_call_->lrs_channel()->server_->server_uri()
545       << ": scheduling next load report in "
546       << lrs_call_->load_reporting_interval_;
547   timer_handle_ = lrs_client()->engine()->RunAfter(
548       lrs_call_->load_reporting_interval_,
549       [self = Ref(DEBUG_LOCATION, "timer")]() {
550         ApplicationCallbackExecCtx callback_exec_ctx;
551         ExecCtx exec_ctx;
552         self->OnNextReportTimer();
553       });
554 }
555 
OnNextReportTimer()556 void LrsClient::LrsChannel::LrsCall::Timer::OnNextReportTimer() {
557   MutexLock lock(&lrs_client()->mu_);
558   timer_handle_.reset();
559   if (IsCurrentTimerOnCall()) lrs_call_->SendReportLocked();
560 }
561 
562 //
563 // LrsClient::LrsChannel::LrsCall
564 //
565 
LrsCall(RefCountedPtr<RetryableCall<LrsCall>> retryable_call)566 LrsClient::LrsChannel::LrsCall::LrsCall(
567     RefCountedPtr<RetryableCall<LrsCall>> retryable_call)
568     : InternallyRefCounted<LrsCall>(
569           GRPC_TRACE_FLAG_ENABLED(xds_client_refcount) ? "LrsCall" : nullptr),
570       retryable_call_(std::move(retryable_call)) {
571   // Init the LRS call. Note that the call will progress every time there's
572   // activity in lrs_client()->interested_parties_, which is comprised of
573   // the polling entities from client_channel.
574   CHECK_NE(lrs_client(), nullptr);
575   const char* method =
576       "/envoy.service.load_stats.v3.LoadReportingService/StreamLoadStats";
577   streaming_call_ = lrs_channel()->transport_->CreateStreamingCall(
578       method, std::make_unique<StreamEventHandler>(
579                   // Passing the initial ref here.  This ref will go away when
580                   // the StreamEventHandler is destroyed.
581                   RefCountedPtr<LrsCall>(this)));
582   CHECK(streaming_call_ != nullptr);
583   // Start the call.
584   GRPC_TRACE_LOG(xds_client, INFO)
585       << "[lrs_client " << lrs_client() << "] lrs server "
586       << lrs_channel()->server_->server_uri()
587       << ": starting LRS call (lrs_call=" << this
588       << ", streaming_call=" << streaming_call_.get() << ")";
589   // Send the initial request.
590   std::string serialized_payload = lrs_client()->CreateLrsInitialRequest();
591   SendMessageLocked(std::move(serialized_payload));
592   // Read initial response.
593   streaming_call_->StartRecvMessage();
594 }
595 
Orphan()596 void LrsClient::LrsChannel::LrsCall::Orphan() {
597   timer_.reset();
598   // Note that the initial ref is held by the StreamEventHandler, which
599   // will be destroyed when streaming_call_ is destroyed, which may not happen
600   // here, since there may be other refs held to streaming_call_ by internal
601   // callbacks.
602   streaming_call_.reset();
603 }
604 
MaybeScheduleNextReportLocked()605 void LrsClient::LrsChannel::LrsCall::MaybeScheduleNextReportLocked() {
606   // If there are no more registered stats to report, cancel the call.
607   auto it = lrs_client()->load_report_map_.find(lrs_channel()->server_->Key());
608   if (it == lrs_client()->load_report_map_.end() ||
609       it->second.load_report_map.empty()) {
610     it->second.lrs_channel->StopLrsCallLocked();
611     return;
612   }
613   // Don't start if the previous send_message op hasn't completed yet.
614   // If this happens, we'll be called again from OnRequestSent().
615   if (send_message_pending_) return;
616   // Don't start if no LRS response has arrived.
617   if (!seen_response()) return;
618   // If there is no timer, create one.
619   // This happens on the initial response and whenever the interval changes.
620   if (timer_ == nullptr) {
621     timer_ = MakeOrphanable<Timer>(Ref(DEBUG_LOCATION, "LRS timer"));
622   }
623   // Schedule the next load report.
624   timer_->ScheduleNextReportLocked();
625 }
626 
LoadReportCountersAreZero(const ClusterLoadReportMap & snapshot)627 bool LrsClient::LoadReportCountersAreZero(
628     const ClusterLoadReportMap& snapshot) {
629   for (const auto& p : snapshot) {
630     const ClusterLoadReport& cluster_snapshot = p.second;
631     if (!cluster_snapshot.dropped_requests.IsZero()) return false;
632     for (const auto& q : cluster_snapshot.locality_stats) {
633       const ClusterLocalityStats::Snapshot& locality_snapshot = q.second;
634       if (!locality_snapshot.IsZero()) return false;
635     }
636   }
637   return true;
638 }
639 
SendReportLocked()640 void LrsClient::LrsChannel::LrsCall::SendReportLocked() {
641   // Construct snapshot from all reported stats.
642   ClusterLoadReportMap snapshot = lrs_client()->BuildLoadReportSnapshotLocked(
643       *lrs_channel()->server_, send_all_clusters_, cluster_names_);
644   // Skip client load report if the counters were all zero in the last
645   // report and they are still zero in this one.
646   const bool old_val = last_report_counters_were_zero_;
647   last_report_counters_were_zero_ = LoadReportCountersAreZero(snapshot);
648   if (old_val && last_report_counters_were_zero_) {
649     MaybeScheduleNextReportLocked();
650     return;
651   }
652   // Send a request that contains the snapshot.
653   std::string serialized_payload =
654       lrs_client()->CreateLrsRequest(std::move(snapshot));
655   SendMessageLocked(std::move(serialized_payload));
656 }
657 
SendMessageLocked(std::string payload)658 void LrsClient::LrsChannel::LrsCall::SendMessageLocked(std::string payload) {
659   send_message_pending_ = true;
660   streaming_call_->SendMessage(std::move(payload));
661 }
662 
OnRequestSent()663 void LrsClient::LrsChannel::LrsCall::OnRequestSent() {
664   MutexLock lock(&lrs_client()->mu_);
665   send_message_pending_ = false;
666   if (IsCurrentCallOnChannel()) MaybeScheduleNextReportLocked();
667 }
668 
OnRecvMessage(absl::string_view payload)669 void LrsClient::LrsChannel::LrsCall::OnRecvMessage(absl::string_view payload) {
670   MutexLock lock(&lrs_client()->mu_);
671   // If we're no longer the current call, ignore the result.
672   if (!IsCurrentCallOnChannel()) return;
673   // Start recv after any code branch
674   auto cleanup = absl::MakeCleanup(
675       [call = streaming_call_.get()]() { call->StartRecvMessage(); });
676   // Parse the response.
677   bool send_all_clusters = false;
678   std::set<std::string> new_cluster_names;
679   Duration new_load_reporting_interval;
680   absl::Status status = lrs_client()->ParseLrsResponse(
681       payload, &send_all_clusters, &new_cluster_names,
682       &new_load_reporting_interval);
683   if (!status.ok()) {
684     LOG(ERROR) << "[lrs_client " << lrs_client() << "] lrs server "
685                << lrs_channel()->server_->server_uri()
686                << ": LRS response parsing failed: " << status;
687     return;
688   }
689   seen_response_ = true;
690   if (GRPC_TRACE_FLAG_ENABLED(xds_client)) {
691     LOG(INFO) << "[lrs_client " << lrs_client() << "] lrs server "
692               << lrs_channel()->server_->server_uri()
693               << ": LRS response received, " << new_cluster_names.size()
694               << " cluster names, send_all_clusters=" << send_all_clusters
695               << ", load_report_interval="
696               << new_load_reporting_interval.millis() << "ms";
697     size_t i = 0;
698     for (const auto& name : new_cluster_names) {
699       LOG(INFO) << "[lrs_client " << lrs_client() << "] cluster_name " << i++
700                 << ": " << name;
701     }
702   }
703   if (new_load_reporting_interval <
704       Duration::Milliseconds(GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS)) {
705     new_load_reporting_interval =
706         Duration::Milliseconds(GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS);
707     GRPC_TRACE_LOG(xds_client, INFO)
708         << "[lrs_client " << lrs_client() << "] lrs server "
709         << lrs_channel()->server_->server_uri()
710         << ": increased load_report_interval to minimum value "
711         << GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS << "ms";
712   }
713   // Ignore identical update.
714   if (send_all_clusters == send_all_clusters_ &&
715       cluster_names_ == new_cluster_names &&
716       load_reporting_interval_ == new_load_reporting_interval) {
717     GRPC_TRACE_LOG(xds_client, INFO)
718         << "[lrs_client " << lrs_client() << "] lrs server "
719         << lrs_channel()->server_->server_uri()
720         << ": incoming LRS response identical to current, ignoring.";
721     return;
722   }
723   // If the interval has changed, we'll need to restart the timer below.
724   const bool restart_timer =
725       load_reporting_interval_ != new_load_reporting_interval;
726   // Record the new config.
727   send_all_clusters_ = send_all_clusters;
728   cluster_names_ = std::move(new_cluster_names);
729   load_reporting_interval_ = new_load_reporting_interval;
730   // Restart timer if needed.
731   if (restart_timer) {
732     timer_.reset();
733     MaybeScheduleNextReportLocked();
734   }
735 }
736 
OnStatusReceived(absl::Status status)737 void LrsClient::LrsChannel::LrsCall::OnStatusReceived(absl::Status status) {
738   MutexLock lock(&lrs_client()->mu_);
739   GRPC_TRACE_LOG(xds_client, INFO)
740       << "[lrs_client " << lrs_client() << "] lrs server "
741       << lrs_channel()->server_->server_uri()
742       << ": LRS call status received (lrs_channel=" << lrs_channel()
743       << ", lrs_call=" << this << ", streaming_call=" << streaming_call_.get()
744       << "): " << status;
745   // Ignore status from a stale call.
746   if (IsCurrentCallOnChannel()) {
747     // Try to restart the call.
748     retryable_call_->OnCallFinishedLocked();
749   }
750 }
751 
IsCurrentCallOnChannel() const752 bool LrsClient::LrsChannel::LrsCall::IsCurrentCallOnChannel() const {
753   // If the retryable LRS call is null (which only happens when the lrs
754   // channel is shutting down), all the LRS calls are stale.
755   if (lrs_channel()->lrs_call_ == nullptr) return false;
756   return this == lrs_channel()->lrs_call_->call();
757 }
758 
759 //
760 // LrsClient
761 //
762 
LrsClient(std::shared_ptr<XdsBootstrap> bootstrap,std::string user_agent_name,std::string user_agent_version,RefCountedPtr<XdsTransportFactory> transport_factory,std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine)763 LrsClient::LrsClient(
764     std::shared_ptr<XdsBootstrap> bootstrap, std::string user_agent_name,
765     std::string user_agent_version,
766     RefCountedPtr<XdsTransportFactory> transport_factory,
767     std::shared_ptr<grpc_event_engine::experimental::EventEngine> engine)
768     : DualRefCounted<LrsClient>(
769           GRPC_TRACE_FLAG_ENABLED(xds_client_refcount) ? "LrsClient" : nullptr),
770       bootstrap_(std::move(bootstrap)),
771       user_agent_name_(std::move(user_agent_name)),
772       user_agent_version_(std::move(user_agent_version)),
773       transport_factory_(std::move(transport_factory)),
774       engine_(std::move(engine)) {
775   GRPC_TRACE_LOG(xds_client, INFO)
776       << "[lrs_client " << this << "] creating lrs client";
777 }
778 
~LrsClient()779 LrsClient::~LrsClient() {
780   GRPC_TRACE_LOG(xds_client, INFO)
781       << "[lrs_client " << this << "] destroying lrs client";
782 }
783 
Orphaned()784 void LrsClient::Orphaned() {
785   GRPC_TRACE_LOG(xds_client, INFO)
786       << "[lrs_client " << this << "] shutting down lrs client";
787   MutexLock lock(&mu_);
788   // We may still be sending lingering queued load report data, so don't
789   // just clear the load reporting map, but we do want to clear the refs
790   // we're holding to the LrsChannel objects, to make sure that
791   // everything shuts down properly.
792   for (auto& p : load_report_map_) {
793     p.second.lrs_channel.reset(DEBUG_LOCATION, "LrsClient::Orphan()");
794   }
795 }
796 
GetOrCreateLrsChannelLocked(std::shared_ptr<const XdsBootstrap::XdsServer> server,const char * reason)797 RefCountedPtr<LrsClient::LrsChannel> LrsClient::GetOrCreateLrsChannelLocked(
798     std::shared_ptr<const XdsBootstrap::XdsServer> server, const char* reason) {
799   std::string key = server->Key();
800   auto it = lrs_channel_map_.find(key);
801   if (it != lrs_channel_map_.end()) {
802     return it->second->Ref(DEBUG_LOCATION, reason);
803   }
804   // Channel not found, so create a new one.
805   auto lrs_channel = MakeRefCounted<LrsChannel>(
806       WeakRef(DEBUG_LOCATION, "LrsChannel"), std::move(server));
807   lrs_channel_map_[std::move(key)] = lrs_channel.get();
808   return lrs_channel;
809 }
810 
AddClusterDropStats(std::shared_ptr<const XdsBootstrap::XdsServer> lrs_server,absl::string_view cluster_name,absl::string_view eds_service_name)811 RefCountedPtr<LrsClient::ClusterDropStats> LrsClient::AddClusterDropStats(
812     std::shared_ptr<const XdsBootstrap::XdsServer> lrs_server,
813     absl::string_view cluster_name, absl::string_view eds_service_name) {
814   auto key =
815       std::make_pair(std::string(cluster_name), std::string(eds_service_name));
816   RefCountedPtr<ClusterDropStats> cluster_drop_stats;
817   {
818     MutexLock lock(&mu_);
819     // We jump through some hoops here to make sure that the
820     // absl::string_views stored in the ClusterDropStats object point
821     // to the strings in the load_report_map_ keys, so that
822     // they have the same lifetime.
823     auto server_it =
824         load_report_map_.emplace(lrs_server->Key(), LoadReportServer()).first;
825     if (server_it->second.lrs_channel == nullptr) {
826       server_it->second.lrs_channel = GetOrCreateLrsChannelLocked(
827           lrs_server, "load report map (drop stats)");
828     }
829     auto load_report_it = server_it->second.load_report_map
830                               .emplace(std::move(key), LoadReportState())
831                               .first;
832     LoadReportState& load_report_state = load_report_it->second;
833     if (load_report_state.drop_stats != nullptr) {
834       cluster_drop_stats = load_report_state.drop_stats->RefIfNonZero();
835     }
836     if (cluster_drop_stats == nullptr) {
837       if (load_report_state.drop_stats != nullptr) {
838         load_report_state.deleted_drop_stats +=
839             load_report_state.drop_stats->GetSnapshotAndReset();
840       }
841       cluster_drop_stats = MakeRefCounted<ClusterDropStats>(
842           Ref(DEBUG_LOCATION, "DropStats"), server_it->first /*lrs_server*/,
843           load_report_it->first.first /*cluster_name*/,
844           load_report_it->first.second /*eds_service_name*/);
845       load_report_state.drop_stats = cluster_drop_stats.get();
846     }
847     server_it->second.lrs_channel->MaybeStartLrsCall();
848   }
849   return cluster_drop_stats;
850 }
851 
RemoveClusterDropStats(absl::string_view lrs_server_key,absl::string_view cluster_name,absl::string_view eds_service_name,LrsClient::ClusterDropStats * cluster_drop_stats)852 void LrsClient::RemoveClusterDropStats(
853     absl::string_view lrs_server_key, absl::string_view cluster_name,
854     absl::string_view eds_service_name,
855     LrsClient::ClusterDropStats* cluster_drop_stats) {
856   MutexLock lock(&mu_);
857   auto server_it = load_report_map_.find(lrs_server_key);
858   if (server_it == load_report_map_.end()) return;
859   auto load_report_it = server_it->second.load_report_map.find(
860       std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
861   if (load_report_it == server_it->second.load_report_map.end()) return;
862   LoadReportState& load_report_state = load_report_it->second;
863   if (load_report_state.drop_stats == cluster_drop_stats) {
864     // Record final snapshot in deleted_drop_stats, which will be
865     // added to the next load report.
866     load_report_state.deleted_drop_stats +=
867         load_report_state.drop_stats->GetSnapshotAndReset();
868     load_report_state.drop_stats = nullptr;
869   }
870 }
871 
872 RefCountedPtr<LrsClient::ClusterLocalityStats>
AddClusterLocalityStats(std::shared_ptr<const XdsBootstrap::XdsServer> lrs_server,absl::string_view cluster_name,absl::string_view eds_service_name,RefCountedPtr<XdsLocalityName> locality,RefCountedPtr<const BackendMetricPropagation> backend_metric_propagation)873 LrsClient::AddClusterLocalityStats(
874     std::shared_ptr<const XdsBootstrap::XdsServer> lrs_server,
875     absl::string_view cluster_name, absl::string_view eds_service_name,
876     RefCountedPtr<XdsLocalityName> locality,
877     RefCountedPtr<const BackendMetricPropagation> backend_metric_propagation) {
878   auto key =
879       std::make_pair(std::string(cluster_name), std::string(eds_service_name));
880   RefCountedPtr<ClusterLocalityStats> cluster_locality_stats;
881   {
882     MutexLock lock(&mu_);
883     // We jump through some hoops here to make sure that the
884     // absl::string_views stored in the ClusterLocalityStats object point
885     // to the strings in the load_report_map_ keys, so that
886     // they have the same lifetime.
887     auto server_it =
888         load_report_map_.emplace(lrs_server->Key(), LoadReportServer()).first;
889     if (server_it->second.lrs_channel == nullptr) {
890       server_it->second.lrs_channel = GetOrCreateLrsChannelLocked(
891           std::move(lrs_server), "load report map (locality stats)");
892     }
893     auto load_report_it = server_it->second.load_report_map
894                               .emplace(std::move(key), LoadReportState())
895                               .first;
896     LoadReportState& load_report_state = load_report_it->second;
897     LoadReportState::LocalityState& locality_state =
898         load_report_state.locality_stats[locality];
899     ClusterLocalityStats*& locality_stats =
900         locality_state.propagation_stats[backend_metric_propagation];
901     if (locality_stats != nullptr) {
902       cluster_locality_stats = locality_stats->RefIfNonZero();
903     }
904     if (cluster_locality_stats == nullptr) {
905       if (locality_stats != nullptr) {
906         locality_state.deleted_locality_stats +=
907             locality_stats->GetSnapshotAndReset();
908       }
909       cluster_locality_stats = MakeRefCounted<ClusterLocalityStats>(
910           Ref(DEBUG_LOCATION, "LocalityStats"), server_it->first /*lrs_server*/,
911           load_report_it->first.first /*cluster_name*/,
912           load_report_it->first.second /*eds_service_name*/,
913           std::move(locality), std::move(backend_metric_propagation));
914       locality_stats = cluster_locality_stats.get();
915     }
916     server_it->second.lrs_channel->MaybeStartLrsCall();
917   }
918   return cluster_locality_stats;
919 }
920 
RemoveClusterLocalityStats(absl::string_view lrs_server_key,absl::string_view cluster_name,absl::string_view eds_service_name,const RefCountedPtr<XdsLocalityName> & locality,const RefCountedPtr<const BackendMetricPropagation> & backend_metric_propagation,ClusterLocalityStats * cluster_locality_stats)921 void LrsClient::RemoveClusterLocalityStats(
922     absl::string_view lrs_server_key, absl::string_view cluster_name,
923     absl::string_view eds_service_name,
924     const RefCountedPtr<XdsLocalityName>& locality,
925     const RefCountedPtr<const BackendMetricPropagation>&
926         backend_metric_propagation,
927     ClusterLocalityStats* cluster_locality_stats) {
928   MutexLock lock(&mu_);
929   auto server_it = load_report_map_.find(lrs_server_key);
930   if (server_it == load_report_map_.end()) return;
931   auto load_report_it = server_it->second.load_report_map.find(
932       std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
933   if (load_report_it == server_it->second.load_report_map.end()) return;
934   LoadReportState& load_report_state = load_report_it->second;
935   auto locality_it = load_report_state.locality_stats.find(locality);
936   if (locality_it == load_report_state.locality_stats.end()) return;
937   LoadReportState::LocalityState& locality_state = locality_it->second;
938   auto propagation_it =
939       locality_state.propagation_stats.find(backend_metric_propagation);
940   if (propagation_it == locality_state.propagation_stats.end()) return;
941   ClusterLocalityStats* locality_stats = propagation_it->second;
942   if (locality_stats == cluster_locality_stats) {
943     // Record final snapshot in deleted_locality_stats, which will be
944     // added to the next load report.
945     locality_state.deleted_locality_stats +=
946         locality_stats->GetSnapshotAndReset();
947     locality_state.propagation_stats.erase(propagation_it);
948   }
949 }
950 
ResetBackoff()951 void LrsClient::ResetBackoff() {
952   MutexLock lock(&mu_);
953   for (auto& p : lrs_channel_map_) {
954     p.second->ResetBackoff();
955   }
956 }
957 
BuildLoadReportSnapshotLocked(const XdsBootstrap::XdsServer & lrs_server,bool send_all_clusters,const std::set<std::string> & clusters)958 LrsClient::ClusterLoadReportMap LrsClient::BuildLoadReportSnapshotLocked(
959     const XdsBootstrap::XdsServer& lrs_server, bool send_all_clusters,
960     const std::set<std::string>& clusters) {
961   GRPC_TRACE_LOG(xds_client, INFO)
962       << "[lrs_client " << this << "] start building load report";
963   ClusterLoadReportMap snapshot_map;
964   auto server_it = load_report_map_.find(lrs_server.Key());
965   if (server_it == load_report_map_.end()) return snapshot_map;
966   auto& load_report_map = server_it->second.load_report_map;
967   for (auto load_report_it = load_report_map.begin();
968        load_report_it != load_report_map.end();) {
969     // Cluster key is cluster and EDS service name.
970     const auto& cluster_key = load_report_it->first;
971     LoadReportState& load_report = load_report_it->second;
972     // If the CDS response for a cluster indicates to use LRS but the
973     // LRS server does not say that it wants reports for this cluster,
974     // then we'll have stats objects here whose data we're not going to
975     // include in the load report.  However, we still need to clear out
976     // the data from the stats objects, so that if the LRS server starts
977     // asking for the data in the future, we don't incorrectly include
978     // data from previous reporting intervals in that future report.
979     const bool record_stats =
980         send_all_clusters || clusters.find(cluster_key.first) != clusters.end();
981     ClusterLoadReport snapshot;
982     // Aggregate drop stats.
983     snapshot.dropped_requests = std::move(load_report.deleted_drop_stats);
984     if (load_report.drop_stats != nullptr) {
985       snapshot.dropped_requests +=
986           load_report.drop_stats->GetSnapshotAndReset();
987       GRPC_TRACE_LOG(xds_client, INFO)
988           << "[lrs_client " << this << "] cluster=" << cluster_key.first
989           << " eds_service_name=" << cluster_key.second
990           << " drop_stats=" << load_report.drop_stats;
991     }
992     // Aggregate locality stats.
993     for (auto it = load_report.locality_stats.begin();
994          it != load_report.locality_stats.end();) {
995       const RefCountedPtr<XdsLocalityName>& locality_name = it->first;
996       auto& locality_state = it->second;
997       ClusterLocalityStats::Snapshot& locality_snapshot =
998           snapshot.locality_stats[locality_name];
999       locality_snapshot = std::move(locality_state.deleted_locality_stats);
1000       for (const auto& p : locality_state.propagation_stats) {
1001         ClusterLocalityStats* locality_stats = p.second;
1002         if (locality_stats != nullptr) {
1003           locality_snapshot += locality_stats->GetSnapshotAndReset();
1004           GRPC_TRACE_LOG(xds_client, INFO)
1005               << "[lrs_client " << this
1006               << "] cluster=" << cluster_key.first.c_str()
1007               << " eds_service_name=" << cluster_key.second.c_str()
1008               << " locality=" << locality_name->human_readable_string().c_str()
1009               << " propagation=" << p.first->AsString()
1010               << " locality_stats=" << locality_stats;
1011         }
1012       }
1013       // If the only thing left in this entry was final snapshots from
1014       // deleted locality stats objects, remove the entry.
1015       if (locality_state.propagation_stats.empty()) {
1016         it = load_report.locality_stats.erase(it);
1017       } else {
1018         ++it;
1019       }
1020     }
1021     // Compute load report interval.
1022     const Timestamp now = Timestamp::Now();
1023     snapshot.load_report_interval = now - load_report.last_report_time;
1024     load_report.last_report_time = now;
1025     // Record snapshot.
1026     if (record_stats) {
1027       snapshot_map[cluster_key] = std::move(snapshot);
1028     }
1029     // If the only thing left in this entry was final snapshots from
1030     // deleted stats objects, remove the entry.
1031     if (load_report.locality_stats.empty() &&
1032         load_report.drop_stats == nullptr) {
1033       load_report_it = load_report_map.erase(load_report_it);
1034     } else {
1035       ++load_report_it;
1036     }
1037   }
1038   return snapshot_map;
1039 }
1040 
1041 namespace {
1042 
1043 struct LrsApiContext {
1044   LrsClient* client;
1045   upb_DefPool* def_pool;
1046   upb_Arena* arena;
1047 };
1048 
MaybeLogLrsRequest(const LrsApiContext & context,const envoy_service_load_stats_v3_LoadStatsRequest * request)1049 void MaybeLogLrsRequest(
1050     const LrsApiContext& context,
1051     const envoy_service_load_stats_v3_LoadStatsRequest* request) {
1052   if (GRPC_TRACE_FLAG_ENABLED(xds_client) && ABSL_VLOG_IS_ON(2)) {
1053     const upb_MessageDef* msg_type =
1054         envoy_service_load_stats_v3_LoadStatsRequest_getmsgdef(
1055             context.def_pool);
1056     char buf[10240];
1057     upb_TextEncode(reinterpret_cast<const upb_Message*>(request), msg_type,
1058                    nullptr, 0, buf, sizeof(buf));
1059     VLOG(2) << "[lrs_client " << context.client
1060             << "] constructed LRS request: " << buf;
1061   }
1062 }
1063 
SerializeLrsRequest(const LrsApiContext & context,const envoy_service_load_stats_v3_LoadStatsRequest * request)1064 std::string SerializeLrsRequest(
1065     const LrsApiContext& context,
1066     const envoy_service_load_stats_v3_LoadStatsRequest* request) {
1067   size_t output_length;
1068   char* output = envoy_service_load_stats_v3_LoadStatsRequest_serialize(
1069       request, context.arena, &output_length);
1070   return std::string(output, output_length);
1071 }
1072 
1073 }  // namespace
1074 
CreateLrsInitialRequest()1075 std::string LrsClient::CreateLrsInitialRequest() {
1076   upb::Arena arena;
1077   const LrsApiContext context = {this, def_pool_.ptr(), arena.ptr()};
1078   // Create a request.
1079   envoy_service_load_stats_v3_LoadStatsRequest* request =
1080       envoy_service_load_stats_v3_LoadStatsRequest_new(arena.ptr());
1081   // Populate node.
1082   envoy_config_core_v3_Node* node_msg =
1083       envoy_service_load_stats_v3_LoadStatsRequest_mutable_node(request,
1084                                                                 arena.ptr());
1085   PopulateXdsNode(bootstrap_->node(), user_agent_name_, user_agent_version_,
1086                   node_msg, arena.ptr());
1087   envoy_config_core_v3_Node_add_client_features(
1088       node_msg,
1089       upb_StringView_FromString("envoy.lrs.supports_send_all_clusters"),
1090       arena.ptr());
1091   MaybeLogLrsRequest(context, request);
1092   return SerializeLrsRequest(context, request);
1093 }
1094 
1095 namespace {
1096 
MaybeAddUnnamedMetric(const LrsApiContext & context,const LrsClient::ClusterLocalityStats::BackendMetric & backend_metric,envoy_config_endpoint_v3_UnnamedEndpointLoadMetricStats * (* add_field)(envoy_config_endpoint_v3_UpstreamLocalityStats *,upb_Arena *),envoy_config_endpoint_v3_UpstreamLocalityStats * output)1097 void MaybeAddUnnamedMetric(
1098     const LrsApiContext& context,
1099     const LrsClient::ClusterLocalityStats::BackendMetric& backend_metric,
1100     envoy_config_endpoint_v3_UnnamedEndpointLoadMetricStats* (*add_field)(
1101         envoy_config_endpoint_v3_UpstreamLocalityStats*, upb_Arena*),
1102     envoy_config_endpoint_v3_UpstreamLocalityStats* output) {
1103   if (backend_metric.IsZero()) return;
1104   auto* metric_proto = add_field(output, context.arena);
1105   envoy_config_endpoint_v3_UnnamedEndpointLoadMetricStats_set_num_requests_finished_with_metric(
1106       metric_proto, backend_metric.num_requests_finished_with_metric);
1107   envoy_config_endpoint_v3_UnnamedEndpointLoadMetricStats_set_total_metric_value(
1108       metric_proto, backend_metric.total_metric_value);
1109 }
1110 
LocalityStatsPopulate(const LrsApiContext & context,envoy_config_endpoint_v3_UpstreamLocalityStats * output,const XdsLocalityName & locality_name,const LrsClient::ClusterLocalityStats::Snapshot & snapshot)1111 void LocalityStatsPopulate(
1112     const LrsApiContext& context,
1113     envoy_config_endpoint_v3_UpstreamLocalityStats* output,
1114     const XdsLocalityName& locality_name,
1115     const LrsClient::ClusterLocalityStats::Snapshot& snapshot) {
1116   // Set locality.
1117   envoy_config_core_v3_Locality* locality =
1118       envoy_config_endpoint_v3_UpstreamLocalityStats_mutable_locality(
1119           output, context.arena);
1120   if (!locality_name.region().empty()) {
1121     envoy_config_core_v3_Locality_set_region(
1122         locality, StdStringToUpbString(locality_name.region()));
1123   }
1124   if (!locality_name.zone().empty()) {
1125     envoy_config_core_v3_Locality_set_zone(
1126         locality, StdStringToUpbString(locality_name.zone()));
1127   }
1128   if (!locality_name.sub_zone().empty()) {
1129     envoy_config_core_v3_Locality_set_sub_zone(
1130         locality, StdStringToUpbString(locality_name.sub_zone()));
1131   }
1132   // Set total counts.
1133   envoy_config_endpoint_v3_UpstreamLocalityStats_set_total_successful_requests(
1134       output, snapshot.total_successful_requests);
1135   envoy_config_endpoint_v3_UpstreamLocalityStats_set_total_requests_in_progress(
1136       output, snapshot.total_requests_in_progress);
1137   envoy_config_endpoint_v3_UpstreamLocalityStats_set_total_error_requests(
1138       output, snapshot.total_error_requests);
1139   envoy_config_endpoint_v3_UpstreamLocalityStats_set_total_issued_requests(
1140       output, snapshot.total_issued_requests);
1141   // Add backend metrics.
1142   MaybeAddUnnamedMetric(
1143       context, snapshot.cpu_utilization,
1144       envoy_config_endpoint_v3_UpstreamLocalityStats_mutable_cpu_utilization,
1145       output);
1146   MaybeAddUnnamedMetric(
1147       context, snapshot.mem_utilization,
1148       envoy_config_endpoint_v3_UpstreamLocalityStats_mutable_mem_utilization,
1149       output);
1150   MaybeAddUnnamedMetric(
1151       context, snapshot.application_utilization,
1152       envoy_config_endpoint_v3_UpstreamLocalityStats_mutable_application_utilization,
1153       output);
1154   for (const auto& p : snapshot.backend_metrics) {
1155     const std::string& metric_name = p.first;
1156     const LrsClient::ClusterLocalityStats::BackendMetric& metric_value =
1157         p.second;
1158     envoy_config_endpoint_v3_EndpointLoadMetricStats* load_metric =
1159         envoy_config_endpoint_v3_UpstreamLocalityStats_add_load_metric_stats(
1160             output, context.arena);
1161     envoy_config_endpoint_v3_EndpointLoadMetricStats_set_metric_name(
1162         load_metric, StdStringToUpbString(metric_name));
1163     envoy_config_endpoint_v3_EndpointLoadMetricStats_set_num_requests_finished_with_metric(
1164         load_metric, metric_value.num_requests_finished_with_metric);
1165     envoy_config_endpoint_v3_EndpointLoadMetricStats_set_total_metric_value(
1166         load_metric, metric_value.total_metric_value);
1167   }
1168 }
1169 
1170 }  // namespace
1171 
CreateLrsRequest(ClusterLoadReportMap cluster_load_report_map)1172 std::string LrsClient::CreateLrsRequest(
1173     ClusterLoadReportMap cluster_load_report_map) {
1174   upb::Arena arena;
1175   const LrsApiContext context = {this, def_pool_.ptr(), arena.ptr()};
1176   // Create a request.
1177   envoy_service_load_stats_v3_LoadStatsRequest* request =
1178       envoy_service_load_stats_v3_LoadStatsRequest_new(arena.ptr());
1179   for (auto& p : cluster_load_report_map) {
1180     const std::string& cluster_name = p.first.first;
1181     const std::string& eds_service_name = p.first.second;
1182     const ClusterLoadReport& load_report = p.second;
1183     // Add cluster stats.
1184     envoy_config_endpoint_v3_ClusterStats* cluster_stats =
1185         envoy_service_load_stats_v3_LoadStatsRequest_add_cluster_stats(
1186             request, arena.ptr());
1187     // Set the cluster name.
1188     envoy_config_endpoint_v3_ClusterStats_set_cluster_name(
1189         cluster_stats, StdStringToUpbString(cluster_name));
1190     // Set EDS service name, if non-empty.
1191     if (!eds_service_name.empty()) {
1192       envoy_config_endpoint_v3_ClusterStats_set_cluster_service_name(
1193           cluster_stats, StdStringToUpbString(eds_service_name));
1194     }
1195     // Add locality stats.
1196     for (const auto& p : load_report.locality_stats) {
1197       const XdsLocalityName& locality_name = *p.first;
1198       const auto& snapshot = p.second;
1199       envoy_config_endpoint_v3_UpstreamLocalityStats* locality_stats =
1200           envoy_config_endpoint_v3_ClusterStats_add_upstream_locality_stats(
1201               cluster_stats, arena.ptr());
1202       LocalityStatsPopulate(context, locality_stats, locality_name, snapshot);
1203     }
1204     // Add dropped requests.
1205     uint64_t total_dropped_requests = 0;
1206     for (const auto& p : load_report.dropped_requests.categorized_drops) {
1207       const std::string& category = p.first;
1208       const uint64_t count = p.second;
1209       envoy_config_endpoint_v3_ClusterStats_DroppedRequests* dropped_requests =
1210           envoy_config_endpoint_v3_ClusterStats_add_dropped_requests(
1211               cluster_stats, arena.ptr());
1212       envoy_config_endpoint_v3_ClusterStats_DroppedRequests_set_category(
1213           dropped_requests, StdStringToUpbString(category));
1214       envoy_config_endpoint_v3_ClusterStats_DroppedRequests_set_dropped_count(
1215           dropped_requests, count);
1216       total_dropped_requests += count;
1217     }
1218     total_dropped_requests += load_report.dropped_requests.uncategorized_drops;
1219     // Set total dropped requests.
1220     envoy_config_endpoint_v3_ClusterStats_set_total_dropped_requests(
1221         cluster_stats, total_dropped_requests);
1222     // Set real load report interval.
1223     gpr_timespec timespec = load_report.load_report_interval.as_timespec();
1224     google_protobuf_Duration* load_report_interval =
1225         envoy_config_endpoint_v3_ClusterStats_mutable_load_report_interval(
1226             cluster_stats, arena.ptr());
1227     google_protobuf_Duration_set_seconds(load_report_interval, timespec.tv_sec);
1228     google_protobuf_Duration_set_nanos(load_report_interval, timespec.tv_nsec);
1229   }
1230   MaybeLogLrsRequest(context, request);
1231   return SerializeLrsRequest(context, request);
1232 }
1233 
1234 namespace {
1235 
MaybeLogLrsResponse(const LrsApiContext & context,const envoy_service_load_stats_v3_LoadStatsResponse * response)1236 void MaybeLogLrsResponse(
1237     const LrsApiContext& context,
1238     const envoy_service_load_stats_v3_LoadStatsResponse* response) {
1239   if (GRPC_TRACE_FLAG_ENABLED(xds_client) && ABSL_VLOG_IS_ON(2)) {
1240     const upb_MessageDef* msg_type =
1241         envoy_service_load_stats_v3_LoadStatsResponse_getmsgdef(
1242             context.def_pool);
1243     char buf[10240];
1244     upb_TextEncode(reinterpret_cast<const upb_Message*>(response), msg_type,
1245                    nullptr, 0, buf, sizeof(buf));
1246     VLOG(2) << "[lrs_client " << context.client
1247             << "] received LRS response: " << buf;
1248   }
1249 }
1250 
1251 }  // namespace
1252 
ParseLrsResponse(absl::string_view encoded_response,bool * send_all_clusters,std::set<std::string> * cluster_names,Duration * load_reporting_interval)1253 absl::Status LrsClient::ParseLrsResponse(absl::string_view encoded_response,
1254                                          bool* send_all_clusters,
1255                                          std::set<std::string>* cluster_names,
1256                                          Duration* load_reporting_interval) {
1257   upb::Arena arena;
1258   // Decode the response.
1259   const envoy_service_load_stats_v3_LoadStatsResponse* decoded_response =
1260       envoy_service_load_stats_v3_LoadStatsResponse_parse(
1261           encoded_response.data(), encoded_response.size(), arena.ptr());
1262   // Parse the response.
1263   if (decoded_response == nullptr) {
1264     return absl::UnavailableError("Can't decode response.");
1265   }
1266   const LrsApiContext context = {this, def_pool_.ptr(), arena.ptr()};
1267   MaybeLogLrsResponse(context, decoded_response);
1268   // Check send_all_clusters.
1269   if (envoy_service_load_stats_v3_LoadStatsResponse_send_all_clusters(
1270           decoded_response)) {
1271     *send_all_clusters = true;
1272   } else {
1273     // Store the cluster names.
1274     size_t size;
1275     const upb_StringView* clusters =
1276         envoy_service_load_stats_v3_LoadStatsResponse_clusters(decoded_response,
1277                                                                &size);
1278     for (size_t i = 0; i < size; ++i) {
1279       cluster_names->emplace(UpbStringToStdString(clusters[i]));
1280     }
1281   }
1282   // Get the load report interval.
1283   const google_protobuf_Duration* load_reporting_interval_duration =
1284       envoy_service_load_stats_v3_LoadStatsResponse_load_reporting_interval(
1285           decoded_response);
1286   *load_reporting_interval = Duration::FromSecondsAndNanoseconds(
1287       google_protobuf_Duration_seconds(load_reporting_interval_duration),
1288       google_protobuf_Duration_nanos(load_reporting_interval_duration));
1289   return absl::OkStatus();
1290 }
1291 
1292 }  // namespace grpc_core
1293