• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/impl/connectivity_state.h>
18 #include <grpc/support/port_platform.h>
19 #include <stddef.h>
20 #include <stdint.h>
21 
22 #include <atomic>
23 #include <map>
24 #include <memory>
25 #include <string>
26 #include <utility>
27 #include <vector>
28 
29 #include "absl/base/thread_annotations.h"
30 #include "absl/log/check.h"
31 #include "absl/log/log.h"
32 #include "absl/status/status.h"
33 #include "absl/status/statusor.h"
34 #include "absl/strings/str_cat.h"
35 #include "absl/strings/string_view.h"
36 #include "absl/types/optional.h"
37 #include "absl/types/variant.h"
38 #include "src/core/client_channel/client_channel_internal.h"
39 #include "src/core/config/core_configuration.h"
40 #include "src/core/lib/channel/channel_args.h"
41 #include "src/core/lib/debug/trace.h"
42 #include "src/core/lib/iomgr/pollset_set.h"
43 #include "src/core/lib/iomgr/resolved_address.h"
44 #include "src/core/lib/security/credentials/xds/xds_credentials.h"
45 #include "src/core/lib/transport/connectivity_state.h"
46 #include "src/core/load_balancing/backend_metric_data.h"
47 #include "src/core/load_balancing/child_policy_handler.h"
48 #include "src/core/load_balancing/delegating_helper.h"
49 #include "src/core/load_balancing/lb_policy.h"
50 #include "src/core/load_balancing/lb_policy_factory.h"
51 #include "src/core/load_balancing/lb_policy_registry.h"
52 #include "src/core/load_balancing/subchannel_interface.h"
53 #include "src/core/load_balancing/xds/xds_channel_args.h"
54 #include "src/core/resolver/endpoint_addresses.h"
55 #include "src/core/resolver/xds/xds_config.h"
56 #include "src/core/resolver/xds/xds_resolver_attributes.h"
57 #include "src/core/telemetry/call_tracer.h"
58 #include "src/core/util/debug_location.h"
59 #include "src/core/util/json/json.h"
60 #include "src/core/util/json/json_args.h"
61 #include "src/core/util/json/json_object_loader.h"
62 #include "src/core/util/match.h"
63 #include "src/core/util/orphanable.h"
64 #include "src/core/util/ref_counted.h"
65 #include "src/core/util/ref_counted_ptr.h"
66 #include "src/core/util/ref_counted_string.h"
67 #include "src/core/util/sync.h"
68 #include "src/core/util/validation_errors.h"
69 #include "src/core/xds/grpc/xds_bootstrap_grpc.h"
70 #include "src/core/xds/grpc/xds_client_grpc.h"
71 #include "src/core/xds/grpc/xds_endpoint.h"
72 #include "src/core/xds/xds_client/xds_bootstrap.h"
73 #include "src/core/xds/xds_client/xds_client.h"
74 #include "src/core/xds/xds_client/xds_locality.h"
75 
76 namespace grpc_core {
77 
78 namespace {
79 
80 //
81 // global circuit breaker atomic map
82 //
83 
84 class CircuitBreakerCallCounterMap final {
85  public:
86   using Key =
87       std::pair<std::string /*cluster*/, std::string /*eds_service_name*/>;
88 
89   class CallCounter final : public RefCounted<CallCounter> {
90    public:
CallCounter(Key key)91     explicit CallCounter(Key key) : key_(std::move(key)) {}
92     ~CallCounter() override;
93 
Load()94     uint32_t Load() {
95       return concurrent_requests_.load(std::memory_order_seq_cst);
96     }
Increment()97     uint32_t Increment() { return concurrent_requests_.fetch_add(1); }
Decrement()98     void Decrement() { concurrent_requests_.fetch_sub(1); }
99 
100    private:
101     Key key_;
102     std::atomic<uint32_t> concurrent_requests_{0};
103   };
104 
105   RefCountedPtr<CallCounter> GetOrCreate(const std::string& cluster,
106                                          const std::string& eds_service_name);
107 
108  private:
109   Mutex mu_;
110   std::map<Key, CallCounter*> map_ ABSL_GUARDED_BY(mu_);
111 };
112 
113 CircuitBreakerCallCounterMap* const g_call_counter_map =
114     new CircuitBreakerCallCounterMap;
115 
116 RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter>
GetOrCreate(const std::string & cluster,const std::string & eds_service_name)117 CircuitBreakerCallCounterMap::GetOrCreate(const std::string& cluster,
118                                           const std::string& eds_service_name) {
119   Key key(cluster, eds_service_name);
120   RefCountedPtr<CallCounter> result;
121   MutexLock lock(&mu_);
122   auto it = map_.find(key);
123   if (it == map_.end()) {
124     it = map_.insert({key, nullptr}).first;
125   } else {
126     result = it->second->RefIfNonZero();
127   }
128   if (result == nullptr) {
129     result = MakeRefCounted<CallCounter>(std::move(key));
130     it->second = result.get();
131   }
132   return result;
133 }
134 
~CallCounter()135 CircuitBreakerCallCounterMap::CallCounter::~CallCounter() {
136   MutexLock lock(&g_call_counter_map->mu_);
137   auto it = g_call_counter_map->map_.find(key_);
138   if (it != g_call_counter_map->map_.end() && it->second == this) {
139     g_call_counter_map->map_.erase(it);
140   }
141 }
142 
143 //
144 // LB policy
145 //
146 
147 constexpr absl::string_view kXdsClusterImpl = "xds_cluster_impl_experimental";
148 
149 // Config for xDS Cluster Impl LB policy.
150 class XdsClusterImplLbConfig final : public LoadBalancingPolicy::Config {
151  public:
152   XdsClusterImplLbConfig() = default;
153 
154   XdsClusterImplLbConfig(const XdsClusterImplLbConfig&) = delete;
155   XdsClusterImplLbConfig& operator=(const XdsClusterImplLbConfig&) = delete;
156 
157   XdsClusterImplLbConfig(XdsClusterImplLbConfig&& other) = delete;
158   XdsClusterImplLbConfig& operator=(XdsClusterImplLbConfig&& other) = delete;
159 
name() const160   absl::string_view name() const override { return kXdsClusterImpl; }
161 
cluster_name() const162   const std::string& cluster_name() const { return cluster_name_; }
child_policy() const163   RefCountedPtr<LoadBalancingPolicy::Config> child_policy() const {
164     return child_policy_;
165   }
166 
167   static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
168   void JsonPostLoad(const Json& json, const JsonArgs& args,
169                     ValidationErrors* errors);
170 
171  private:
172   std::string cluster_name_;
173   RefCountedPtr<LoadBalancingPolicy::Config> child_policy_;
174 };
175 
176 // xDS Cluster Impl LB policy.
177 class XdsClusterImplLb final : public LoadBalancingPolicy {
178  public:
179   XdsClusterImplLb(RefCountedPtr<GrpcXdsClient> xds_client, Args args);
180 
name() const181   absl::string_view name() const override { return kXdsClusterImpl; }
182 
183   absl::Status UpdateLocked(UpdateArgs args) override;
184   void ExitIdleLocked() override;
185   void ResetBackoffLocked() override;
186 
187  private:
188   class StatsSubchannelWrapper final : public DelegatingSubchannel {
189    public:
190     // If load reporting is enabled and we have a ClusterLocalityStats
191     // object, that object already contains the locality label.  We
192     // need to store the locality label directly only in the case where
193     // load reporting is disabled.
194     using LocalityData = absl::variant<
195         RefCountedStringValue /*locality*/,
196         RefCountedPtr<LrsClient::ClusterLocalityStats> /*locality_stats*/>;
197 
StatsSubchannelWrapper(RefCountedPtr<SubchannelInterface> wrapped_subchannel,LocalityData locality_data,absl::string_view hostname)198     StatsSubchannelWrapper(
199         RefCountedPtr<SubchannelInterface> wrapped_subchannel,
200         LocalityData locality_data, absl::string_view hostname)
201         : DelegatingSubchannel(std::move(wrapped_subchannel)),
202           locality_data_(std::move(locality_data)),
203           hostname_(grpc_event_engine::experimental::Slice::FromCopiedString(
204               hostname)) {}
205 
locality() const206     RefCountedStringValue locality() const {
207       return Match(
208           locality_data_,
209           [](RefCountedStringValue locality) { return locality; },
210           [](const RefCountedPtr<LrsClient::ClusterLocalityStats>&
211                  locality_stats) {
212             return locality_stats->locality_name()->human_readable_string();
213           });
214     }
215 
locality_stats() const216     LrsClient::ClusterLocalityStats* locality_stats() const {
217       return Match(
218           locality_data_,
219           [](const RefCountedStringValue&) {
220             return static_cast<LrsClient::ClusterLocalityStats*>(nullptr);
221           },
222           [](const RefCountedPtr<LrsClient::ClusterLocalityStats>&
223                  locality_stats) { return locality_stats.get(); });
224     }
225 
hostname() const226     const grpc_event_engine::experimental::Slice& hostname() const {
227       return hostname_;
228     }
229 
230    private:
231     LocalityData locality_data_;
232     grpc_event_engine::experimental::Slice hostname_;
233   };
234 
235   // A picker that wraps the picker from the child to perform drops.
236   class Picker final : public SubchannelPicker {
237    public:
238     Picker(XdsClusterImplLb* xds_cluster_impl_lb,
239            RefCountedPtr<SubchannelPicker> picker);
240 
241     PickResult Pick(PickArgs args) override;
242 
243    private:
244     class SubchannelCallTracker;
245 
246     RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter> call_counter_;
247     uint32_t max_concurrent_requests_;
248     RefCountedStringValue service_telemetry_label_;
249     RefCountedStringValue namespace_telemetry_label_;
250     RefCountedPtr<XdsEndpointResource::DropConfig> drop_config_;
251     RefCountedPtr<LrsClient::ClusterDropStats> drop_stats_;
252     RefCountedPtr<SubchannelPicker> picker_;
253   };
254 
255   class Helper final
256       : public ParentOwningDelegatingChannelControlHelper<XdsClusterImplLb> {
257    public:
Helper(RefCountedPtr<XdsClusterImplLb> xds_cluster_impl_policy)258     explicit Helper(RefCountedPtr<XdsClusterImplLb> xds_cluster_impl_policy)
259         : ParentOwningDelegatingChannelControlHelper(
260               std::move(xds_cluster_impl_policy)) {}
261 
262     RefCountedPtr<SubchannelInterface> CreateSubchannel(
263         const grpc_resolved_address& address,
264         const ChannelArgs& per_address_args, const ChannelArgs& args) override;
265     void UpdateState(grpc_connectivity_state state, const absl::Status& status,
266                      RefCountedPtr<SubchannelPicker> picker) override;
267   };
268 
269   ~XdsClusterImplLb() override;
270 
271   void ShutdownLocked() override;
272 
273   void ResetState();
274   void ReportTransientFailure(absl::Status status);
275 
276   OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
277       const ChannelArgs& args);
278   absl::Status UpdateChildPolicyLocked(
279       absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses,
280       std::string resolution_note, const ChannelArgs& args);
281 
282   absl::StatusOr<RefCountedPtr<XdsCertificateProvider>>
283   MaybeCreateCertificateProviderLocked(
284       const XdsClusterResource& cluster_resource) const;
285 
286   void MaybeUpdatePickerLocked();
287 
288   // Current config from the resolver.
289   RefCountedPtr<XdsClusterImplLbConfig> config_;
290   std::shared_ptr<const XdsClusterResource> cluster_resource_;
291   RefCountedStringValue service_telemetry_label_;
292   RefCountedStringValue namespace_telemetry_label_;
293   RefCountedPtr<XdsEndpointResource::DropConfig> drop_config_;
294 
295   // Current concurrent number of requests.
296   RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter> call_counter_;
297 
298   // Internal state.
299   bool shutting_down_ = false;
300 
301   // The xds client.
302   RefCountedPtr<GrpcXdsClient> xds_client_;
303 
304   // The stats for client-side load reporting.
305   RefCountedPtr<LrsClient::ClusterDropStats> drop_stats_;
306 
307   OrphanablePtr<LoadBalancingPolicy> child_policy_;
308 
309   // Latest state and picker reported by the child policy.
310   grpc_connectivity_state state_ = GRPC_CHANNEL_IDLE;
311   absl::Status status_;
312   RefCountedPtr<SubchannelPicker> picker_;
313 };
314 
315 //
316 // XdsClusterImplLb::Picker::SubchannelCallTracker
317 //
318 
319 class XdsClusterImplLb::Picker::SubchannelCallTracker final
320     : public LoadBalancingPolicy::SubchannelCallTrackerInterface {
321  public:
SubchannelCallTracker(std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface> original_subchannel_call_tracker,RefCountedPtr<LrsClient::ClusterLocalityStats> locality_stats,RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter> call_counter)322   SubchannelCallTracker(
323       std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>
324           original_subchannel_call_tracker,
325       RefCountedPtr<LrsClient::ClusterLocalityStats> locality_stats,
326       RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter> call_counter)
327       : original_subchannel_call_tracker_(
328             std::move(original_subchannel_call_tracker)),
329         locality_stats_(std::move(locality_stats)),
330         call_counter_(std::move(call_counter)) {}
331 
~SubchannelCallTracker()332   ~SubchannelCallTracker() override {
333     locality_stats_.reset(DEBUG_LOCATION, "SubchannelCallTracker");
334     call_counter_.reset(DEBUG_LOCATION, "SubchannelCallTracker");
335 #ifndef NDEBUG
336     DCHECK(!started_);
337 #endif
338   }
339 
Start()340   void Start() override {
341     // Increment number of calls in flight.
342     call_counter_->Increment();
343     // Record a call started.
344     if (locality_stats_ != nullptr) {
345       locality_stats_->AddCallStarted();
346     }
347     // Delegate if needed.
348     if (original_subchannel_call_tracker_ != nullptr) {
349       original_subchannel_call_tracker_->Start();
350     }
351 #ifndef NDEBUG
352     started_ = true;
353 #endif
354   }
355 
Finish(FinishArgs args)356   void Finish(FinishArgs args) override {
357     // Delegate if needed.
358     if (original_subchannel_call_tracker_ != nullptr) {
359       original_subchannel_call_tracker_->Finish(args);
360     }
361     // Record call completion for load reporting.
362     if (locality_stats_ != nullptr) {
363       locality_stats_->AddCallFinished(
364           args.backend_metric_accessor->GetBackendMetricData(),
365           !args.status.ok());
366     }
367     // Decrement number of calls in flight.
368     call_counter_->Decrement();
369 #ifndef NDEBUG
370     started_ = false;
371 #endif
372   }
373 
374  private:
375   std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>
376       original_subchannel_call_tracker_;
377   RefCountedPtr<LrsClient::ClusterLocalityStats> locality_stats_;
378   RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter> call_counter_;
379 #ifndef NDEBUG
380   bool started_ = false;
381 #endif
382 };
383 
384 //
385 // XdsClusterImplLb::Picker
386 //
387 
Picker(XdsClusterImplLb * xds_cluster_impl_lb,RefCountedPtr<SubchannelPicker> picker)388 XdsClusterImplLb::Picker::Picker(XdsClusterImplLb* xds_cluster_impl_lb,
389                                  RefCountedPtr<SubchannelPicker> picker)
390     : call_counter_(xds_cluster_impl_lb->call_counter_),
391       max_concurrent_requests_(
392           xds_cluster_impl_lb->cluster_resource_->max_concurrent_requests),
393       service_telemetry_label_(xds_cluster_impl_lb->service_telemetry_label_),
394       namespace_telemetry_label_(
395           xds_cluster_impl_lb->namespace_telemetry_label_),
396       drop_config_(xds_cluster_impl_lb->drop_config_),
397       drop_stats_(xds_cluster_impl_lb->drop_stats_),
398       picker_(std::move(picker)) {
399   GRPC_TRACE_LOG(xds_cluster_impl_lb, INFO)
400       << "[xds_cluster_impl_lb " << xds_cluster_impl_lb
401       << "] constructed new picker " << this;
402 }
403 
Pick(LoadBalancingPolicy::PickArgs args)404 LoadBalancingPolicy::PickResult XdsClusterImplLb::Picker::Pick(
405     LoadBalancingPolicy::PickArgs args) {
406   auto* call_state = static_cast<ClientChannelLbCallState*>(args.call_state);
407   auto* call_attempt_tracer = call_state->GetCallAttemptTracer();
408   if (call_attempt_tracer != nullptr) {
409     call_attempt_tracer->SetOptionalLabel(
410         ClientCallTracer::CallAttemptTracer::OptionalLabelKey::kXdsServiceName,
411         service_telemetry_label_);
412     call_attempt_tracer->SetOptionalLabel(
413         ClientCallTracer::CallAttemptTracer::OptionalLabelKey::
414             kXdsServiceNamespace,
415         namespace_telemetry_label_);
416   }
417   // Handle EDS drops.
418   const std::string* drop_category;
419   if (drop_config_ != nullptr && drop_config_->ShouldDrop(&drop_category)) {
420     if (drop_stats_ != nullptr) drop_stats_->AddCallDropped(*drop_category);
421     return PickResult::Drop(absl::UnavailableError(
422         absl::StrCat("EDS-configured drop: ", *drop_category)));
423   }
424   // Check if we exceeded the max concurrent requests circuit breaking limit.
425   // Note: We check the value here, but we don't actually increment the
426   // counter for the current request until the channel calls the subchannel
427   // call tracker's Start() method.  This means that we may wind up
428   // allowing more concurrent requests than the configured limit.
429   if (call_counter_->Load() >= max_concurrent_requests_) {
430     if (drop_stats_ != nullptr) drop_stats_->AddUncategorizedDrops();
431     return PickResult::Drop(absl::UnavailableError("circuit breaker drop"));
432   }
433   // If we're not dropping the call, we should always have a child picker.
434   if (picker_ == nullptr) {  // Should never happen.
435     return PickResult::Fail(absl::InternalError(
436         "xds_cluster_impl picker not given any child picker"));
437   }
438   // Not dropping, so delegate to child picker.
439   PickResult result = picker_->Pick(args);
440   auto* complete_pick = absl::get_if<PickResult::Complete>(&result.result);
441   if (complete_pick != nullptr) {
442     auto* subchannel_wrapper =
443         static_cast<StatsSubchannelWrapper*>(complete_pick->subchannel.get());
444     // Add locality label to per-call metrics if needed.
445     if (call_attempt_tracer != nullptr) {
446       call_attempt_tracer->SetOptionalLabel(
447           ClientCallTracer::CallAttemptTracer::OptionalLabelKey::kLocality,
448           subchannel_wrapper->locality());
449     }
450     // Handle load reporting.
451     RefCountedPtr<LrsClient::ClusterLocalityStats> locality_stats;
452     if (subchannel_wrapper->locality_stats() != nullptr) {
453       locality_stats = subchannel_wrapper->locality_stats()->Ref(
454           DEBUG_LOCATION, "SubchannelCallTracker");
455     }
456     // Handle authority rewriting if needed.
457     if (!subchannel_wrapper->hostname().empty()) {
458       auto* route_state_attribute =
459           call_state->GetCallAttribute<XdsRouteStateAttribute>();
460       if (route_state_attribute != nullptr) {
461         auto* route_action =
462             absl::get_if<XdsRouteConfigResource::Route::RouteAction>(
463                 &route_state_attribute->route().action);
464         if (route_action != nullptr && route_action->auto_host_rewrite) {
465           complete_pick->authority_override =
466               subchannel_wrapper->hostname().Ref();
467         }
468       }
469     }
470     // Unwrap subchannel to pass back up the stack.
471     complete_pick->subchannel = subchannel_wrapper->wrapped_subchannel();
472     // Inject subchannel call tracker to record call completion.
473     complete_pick->subchannel_call_tracker =
474         std::make_unique<SubchannelCallTracker>(
475             std::move(complete_pick->subchannel_call_tracker),
476             std::move(locality_stats),
477             call_counter_->Ref(DEBUG_LOCATION, "SubchannelCallTracker"));
478   } else {
479     // TODO(roth): We should ideally also record call failures here in the case
480     // where a pick fails.  This is challenging, because we don't know which
481     // picks are for wait_for_ready RPCs or how many times we'll return a
482     // failure for the same wait_for_ready RPC.
483   }
484   return result;
485 }
486 
487 //
488 // XdsClusterImplLb
489 //
490 
XdsClusterImplLb(RefCountedPtr<GrpcXdsClient> xds_client,Args args)491 XdsClusterImplLb::XdsClusterImplLb(RefCountedPtr<GrpcXdsClient> xds_client,
492                                    Args args)
493     : LoadBalancingPolicy(std::move(args)), xds_client_(std::move(xds_client)) {
494   GRPC_TRACE_LOG(xds_cluster_impl_lb, INFO)
495       << "[xds_cluster_impl_lb " << this << "] created -- using xds client "
496       << xds_client_.get();
497 }
498 
~XdsClusterImplLb()499 XdsClusterImplLb::~XdsClusterImplLb() {
500   GRPC_TRACE_LOG(xds_cluster_impl_lb, INFO)
501       << "[xds_cluster_impl_lb " << this
502       << "] destroying xds_cluster_impl LB policy";
503 }
504 
ShutdownLocked()505 void XdsClusterImplLb::ShutdownLocked() {
506   GRPC_TRACE_LOG(xds_cluster_impl_lb, INFO)
507       << "[xds_cluster_impl_lb " << this << "] shutting down";
508   shutting_down_ = true;
509   ResetState();
510   xds_client_.reset(DEBUG_LOCATION, "XdsClusterImpl");
511 }
512 
ResetState()513 void XdsClusterImplLb::ResetState() {
514   // Remove the child policy's interested_parties pollset_set from the
515   // xDS policy.
516   if (child_policy_ != nullptr) {
517     grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
518                                      interested_parties());
519     child_policy_.reset();
520   }
521   // Drop our ref to the child's picker, in case it's holding a ref to
522   // the child.
523   picker_.reset();
524   drop_stats_.reset();
525 }
526 
ReportTransientFailure(absl::Status status)527 void XdsClusterImplLb::ReportTransientFailure(absl::Status status) {
528   GRPC_TRACE_LOG(xds_cluster_impl_lb, INFO)
529       << "[xds_cluster_impl_lb " << this
530       << "] reporting TRANSIENT_FAILURE: " << status;
531   ResetState();
532   channel_control_helper()->UpdateState(
533       GRPC_CHANNEL_TRANSIENT_FAILURE, status,
534       MakeRefCounted<TransientFailurePicker>(status));
535 }
536 
ExitIdleLocked()537 void XdsClusterImplLb::ExitIdleLocked() {
538   if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
539 }
540 
ResetBackoffLocked()541 void XdsClusterImplLb::ResetBackoffLocked() {
542   // The XdsClient will have its backoff reset by the xds resolver, so we
543   // don't need to do it here.
544   if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
545 }
546 
GetEdsResourceName(const XdsClusterResource & cluster_resource)547 std::string GetEdsResourceName(const XdsClusterResource& cluster_resource) {
548   auto* eds = absl::get_if<XdsClusterResource::Eds>(&cluster_resource.type);
549   if (eds == nullptr) return "";
550   return eds->eds_service_name;
551 }
552 
UpdateLocked(UpdateArgs args)553 absl::Status XdsClusterImplLb::UpdateLocked(UpdateArgs args) {
554   GRPC_TRACE_LOG(xds_cluster_impl_lb, INFO)
555       << "[xds_cluster_impl_lb " << this << "] Received update";
556   // Grab new LB policy config.
557   auto new_config = args.config.TakeAsSubclass<XdsClusterImplLbConfig>();
558   // Cluster name should never change, because the cds policy will assign a
559   // different priority child name if that happens, which means that this
560   // policy instance will get replaced instead of being updated.
561   if (config_ != nullptr) {
562     CHECK(config_->cluster_name() == new_config->cluster_name());
563   }
564   // Get xDS config.
565   auto new_xds_config = args.args.GetObjectRef<XdsConfig>();
566   if (new_xds_config == nullptr) {
567     // Should never happen.
568     absl::Status status = absl::InternalError(
569         "xDS config not passed to xds_cluster_impl LB policy");
570     ReportTransientFailure(status);
571     return status;
572   }
573   auto it = new_xds_config->clusters.find(new_config->cluster_name());
574   if (it == new_xds_config->clusters.end() || !it->second.ok() ||
575       it->second->cluster == nullptr) {
576     // Should never happen.
577     absl::Status status = absl::InternalError(absl::StrCat(
578         "xDS config has no entry for cluster ", new_config->cluster_name()));
579     ReportTransientFailure(status);
580     return status;
581   }
582   auto& new_cluster_config = *it->second;
583   auto* endpoint_config =
584       absl::get_if<XdsConfig::ClusterConfig::EndpointConfig>(
585           &new_cluster_config.children);
586   if (endpoint_config == nullptr) {
587     // Should never happen.
588     absl::Status status = absl::InternalError(
589         absl::StrCat("cluster config for ", new_config->cluster_name(),
590                      " has no endpoint config"));
591     ReportTransientFailure(status);
592     return status;
593   }
594   auto xds_cert_provider =
595       MaybeCreateCertificateProviderLocked(*new_cluster_config.cluster);
596   if (!xds_cert_provider.ok()) {
597     // Should never happen.
598     ReportTransientFailure(xds_cert_provider.status());
599     return xds_cert_provider.status();
600   }
601   if (*xds_cert_provider != nullptr) {
602     args.args = args.args.SetObject(std::move(*xds_cert_provider));
603   }
604   // Now we've verified the new config is good.
605   // Get new and old (if any) EDS service name.
606   std::string new_eds_service_name =
607       GetEdsResourceName(*new_cluster_config.cluster);
608   std::string old_eds_service_name =
609       cluster_resource_ == nullptr ? ""
610                                    : GetEdsResourceName(*cluster_resource_);
611   // Update drop stats if needed.
612   // Note: We need a drop stats object whenever load reporting is enabled,
613   // even if we have no EDS drop config, because we also use it when
614   // reporting circuit breaker drops.
615   if (new_cluster_config.cluster->lrs_load_reporting_server == nullptr) {
616     drop_stats_.reset();
617   } else if (cluster_resource_ == nullptr ||
618              old_eds_service_name != new_eds_service_name ||
619              !LrsServersEqual(
620                  cluster_resource_->lrs_load_reporting_server,
621                  new_cluster_config.cluster->lrs_load_reporting_server)) {
622     drop_stats_ = xds_client_->lrs_client().AddClusterDropStats(
623         new_cluster_config.cluster->lrs_load_reporting_server,
624         new_config->cluster_name(), new_eds_service_name);
625     if (drop_stats_ == nullptr) {
626       LOG(ERROR)
627           << "[xds_cluster_impl_lb " << this
628           << "] Failed to get cluster drop stats for LRS server "
629           << new_cluster_config.cluster->lrs_load_reporting_server->server_uri()
630           << ", cluster " << new_config->cluster_name() << ", EDS service name "
631           << new_eds_service_name
632           << ", load reporting for drops will not be done.";
633     }
634   }
635   // Update call counter if needed.
636   if (cluster_resource_ == nullptr ||
637       old_eds_service_name != new_eds_service_name) {
638     call_counter_ = g_call_counter_map->GetOrCreate(new_config->cluster_name(),
639                                                     new_eds_service_name);
640   }
641   // Update config state, now that we're done comparing old and new fields.
642   config_ = std::move(new_config);
643   cluster_resource_ = new_cluster_config.cluster;
644   const XdsMetadataValue* metadata_value =
645       cluster_resource_->metadata.Find("com.google.csm.telemetry_labels");
646   if (metadata_value != nullptr &&
647       metadata_value->type() == XdsStructMetadataValue::Type()) {
648     const Json::Object& json_object =
649         DownCast<const XdsStructMetadataValue*>(metadata_value)
650             ->json()
651             .object();
652     auto it = json_object.find("service_name");
653     if (it != json_object.end() && it->second.type() == Json::Type::kString) {
654       service_telemetry_label_ = RefCountedStringValue(it->second.string());
655     }
656     it = json_object.find("service_namespace");
657     if (it != json_object.end() && it->second.type() == Json::Type::kString) {
658       namespace_telemetry_label_ = RefCountedStringValue(it->second.string());
659     }
660   }
661   drop_config_ = endpoint_config->endpoints != nullptr
662                      ? endpoint_config->endpoints->drop_config
663                      : nullptr;
664   // Update picker in case some dependent config field changed.
665   MaybeUpdatePickerLocked();
666   // Update child policy.
667   return UpdateChildPolicyLocked(std::move(args.addresses),
668                                  std::move(args.resolution_note), args.args);
669 }
670 
671 absl::StatusOr<RefCountedPtr<XdsCertificateProvider>>
MaybeCreateCertificateProviderLocked(const XdsClusterResource & cluster_resource) const672 XdsClusterImplLb::MaybeCreateCertificateProviderLocked(
673     const XdsClusterResource& cluster_resource) const {
674   // If the channel is not using XdsCreds, do nothing.
675   auto channel_credentials = channel_control_helper()->GetChannelCredentials();
676   if (channel_credentials == nullptr ||
677       channel_credentials->type() != XdsCredentials::Type()) {
678     return nullptr;
679   }
680   // Configure root cert.
681   absl::string_view root_cert_name;
682   RefCountedPtr<grpc_tls_certificate_provider> root_cert_provider;
683   bool use_system_root_certs = false;
684   absl::Status status = Match(
685       cluster_resource.common_tls_context.certificate_validation_context
686           .ca_certs,
687       [](const absl::monostate&) {
688         // No root cert configured.
689         return absl::OkStatus();
690       },
691       [&](const CommonTlsContext::CertificateProviderPluginInstance&
692               cert_provider) {
693         root_cert_name = cert_provider.certificate_name;
694         root_cert_provider =
695             xds_client_->certificate_provider_store()
696                 .CreateOrGetCertificateProvider(cert_provider.instance_name);
697         if (root_cert_provider == nullptr) {
698           return absl::InternalError(
699               absl::StrCat("Certificate provider instance name: \"",
700                            cert_provider.instance_name, "\" not recognized."));
701         }
702         return absl::OkStatus();
703       },
704       [&](const CommonTlsContext::CertificateValidationContext::
705               SystemRootCerts&) {
706         use_system_root_certs = true;
707         return absl::OkStatus();
708       });
709   if (!status.ok()) return status;
710   // Configure identity cert.
711   absl::string_view identity_provider_instance_name =
712       cluster_resource.common_tls_context.tls_certificate_provider_instance
713           .instance_name;
714   absl::string_view identity_cert_name =
715       cluster_resource.common_tls_context.tls_certificate_provider_instance
716           .certificate_name;
717   RefCountedPtr<grpc_tls_certificate_provider> identity_cert_provider;
718   if (!identity_provider_instance_name.empty()) {
719     identity_cert_provider =
720         xds_client_->certificate_provider_store()
721             .CreateOrGetCertificateProvider(identity_provider_instance_name);
722     if (identity_cert_provider == nullptr) {
723       return absl::InternalError(
724           absl::StrCat("Certificate provider instance name: \"",
725                        identity_provider_instance_name, "\" not recognized."));
726     }
727   }
728   // Configure SAN matchers.
729   const std::vector<StringMatcher>& san_matchers =
730       cluster_resource.common_tls_context.certificate_validation_context
731           .match_subject_alt_names;
732   // Create xds cert provider.
733   return MakeRefCounted<XdsCertificateProvider>(
734       std::move(root_cert_provider), root_cert_name, use_system_root_certs,
735       std::move(identity_cert_provider), identity_cert_name, san_matchers);
736 }
737 
MaybeUpdatePickerLocked()738 void XdsClusterImplLb::MaybeUpdatePickerLocked() {
739   // If we're dropping all calls, report READY, regardless of what (or
740   // whether) the child has reported.
741   if (drop_config_ != nullptr && drop_config_->drop_all()) {
742     auto drop_picker = MakeRefCounted<Picker>(this, picker_);
743     GRPC_TRACE_LOG(xds_cluster_impl_lb, INFO)
744         << "[xds_cluster_impl_lb " << this
745         << "] updating connectivity (drop all): state=READY picker="
746         << drop_picker.get();
747     channel_control_helper()->UpdateState(GRPC_CHANNEL_READY, absl::Status(),
748                                           std::move(drop_picker));
749     return;
750   }
751   // Otherwise, update only if we have a child picker.
752   if (picker_ != nullptr) {
753     auto drop_picker = MakeRefCounted<Picker>(this, picker_);
754     GRPC_TRACE_LOG(xds_cluster_impl_lb, INFO)
755         << "[xds_cluster_impl_lb " << this
756         << "] updating connectivity: state=" << ConnectivityStateName(state_)
757         << " status=(" << status_ << ") picker=" << drop_picker.get();
758     channel_control_helper()->UpdateState(state_, status_,
759                                           std::move(drop_picker));
760   }
761 }
762 
CreateChildPolicyLocked(const ChannelArgs & args)763 OrphanablePtr<LoadBalancingPolicy> XdsClusterImplLb::CreateChildPolicyLocked(
764     const ChannelArgs& args) {
765   LoadBalancingPolicy::Args lb_policy_args;
766   lb_policy_args.work_serializer = work_serializer();
767   lb_policy_args.args = args;
768   lb_policy_args.channel_control_helper = std::make_unique<Helper>(
769       RefAsSubclass<XdsClusterImplLb>(DEBUG_LOCATION, "Helper"));
770   OrphanablePtr<LoadBalancingPolicy> lb_policy =
771       MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
772                                          &xds_cluster_impl_lb_trace);
773   GRPC_TRACE_LOG(xds_cluster_impl_lb, INFO)
774       << "[xds_cluster_impl_lb " << this
775       << "] Created new child policy handler " << lb_policy.get();
776   // Add our interested_parties pollset_set to that of the newly created
777   // child policy. This will make the child policy progress upon activity on
778   // this policy, which in turn is tied to the application's call.
779   grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
780                                    interested_parties());
781   return lb_policy;
782 }
783 
UpdateChildPolicyLocked(absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses,std::string resolution_note,const ChannelArgs & args)784 absl::Status XdsClusterImplLb::UpdateChildPolicyLocked(
785     absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses,
786     std::string resolution_note, const ChannelArgs& args) {
787   // Create policy if needed.
788   if (child_policy_ == nullptr) {
789     child_policy_ = CreateChildPolicyLocked(args);
790   }
791   // Construct update args.
792   UpdateArgs update_args;
793   update_args.addresses = std::move(addresses);
794   update_args.resolution_note = std::move(resolution_note);
795   update_args.config = config_->child_policy();
796   update_args.args =
797       args.Set(GRPC_ARG_XDS_CLUSTER_NAME, config_->cluster_name());
798   // Update the policy.
799   GRPC_TRACE_LOG(xds_cluster_impl_lb, INFO)
800       << "[xds_cluster_impl_lb " << this << "] Updating child policy handler "
801       << child_policy_.get();
802   return child_policy_->UpdateLocked(std::move(update_args));
803 }
804 
805 //
806 // XdsClusterImplLb::Helper
807 //
808 
CreateSubchannel(const grpc_resolved_address & address,const ChannelArgs & per_address_args,const ChannelArgs & args)809 RefCountedPtr<SubchannelInterface> XdsClusterImplLb::Helper::CreateSubchannel(
810     const grpc_resolved_address& address, const ChannelArgs& per_address_args,
811     const ChannelArgs& args) {
812   if (parent()->shutting_down_) return nullptr;
813   // Wrap the subchannel so that we pass along the locality label and
814   // (if load reporting is enabled) the locality stats object, which
815   // will be used by the picker.
816   auto locality_name = per_address_args.GetObjectRef<XdsLocalityName>();
817   RefCountedPtr<LrsClient::ClusterLocalityStats> locality_stats;
818   if (parent()->cluster_resource_->lrs_load_reporting_server != nullptr) {
819     locality_stats =
820         parent()->xds_client_->lrs_client().AddClusterLocalityStats(
821             parent()->cluster_resource_->lrs_load_reporting_server,
822             parent()->config_->cluster_name(),
823             GetEdsResourceName(*parent()->cluster_resource_), locality_name,
824             parent()->cluster_resource_->lrs_backend_metric_propagation);
825     if (locality_stats == nullptr) {
826       LOG(ERROR)
827           << "[xds_cluster_impl_lb " << parent()
828           << "] Failed to get locality stats object for LRS server "
829           << parent()
830                  ->cluster_resource_->lrs_load_reporting_server->server_uri()
831           << ", cluster " << parent()->config_->cluster_name()
832           << ", EDS service name "
833           << GetEdsResourceName(*parent()->cluster_resource_)
834           << "; load reports will not be generated";
835     }
836   }
837   StatsSubchannelWrapper::LocalityData locality_data;
838   if (locality_stats != nullptr) {
839     locality_data = std::move(locality_stats);
840   } else {
841     locality_data = locality_name->human_readable_string();
842   }
843   return MakeRefCounted<StatsSubchannelWrapper>(
844       parent()->channel_control_helper()->CreateSubchannel(
845           address, per_address_args, args),
846       std::move(locality_data),
847       per_address_args.GetString(GRPC_ARG_ADDRESS_NAME).value_or(""));
848 }
849 
UpdateState(grpc_connectivity_state state,const absl::Status & status,RefCountedPtr<SubchannelPicker> picker)850 void XdsClusterImplLb::Helper::UpdateState(
851     grpc_connectivity_state state, const absl::Status& status,
852     RefCountedPtr<SubchannelPicker> picker) {
853   if (parent()->shutting_down_) return;
854   GRPC_TRACE_LOG(xds_cluster_impl_lb, INFO)
855       << "[xds_cluster_impl_lb " << parent()
856       << "] child connectivity state update: state="
857       << ConnectivityStateName(state) << " (" << status
858       << ") picker=" << picker.get();
859   // Save the state and picker.
860   parent()->state_ = state;
861   parent()->status_ = status;
862   parent()->picker_ = std::move(picker);
863   // Wrap the picker and return it to the channel.
864   parent()->MaybeUpdatePickerLocked();
865 }
866 
867 //
868 // factory
869 //
870 
JsonLoader(const JsonArgs &)871 const JsonLoaderInterface* XdsClusterImplLbConfig::JsonLoader(const JsonArgs&) {
872   static const auto* loader =
873       JsonObjectLoader<XdsClusterImplLbConfig>()
874           // Note: Some fields require custom processing, so they are
875           // handled in JsonPostLoad() instead.
876           .Field("clusterName", &XdsClusterImplLbConfig::cluster_name_)
877           .Finish();
878   return loader;
879 }
880 
JsonPostLoad(const Json & json,const JsonArgs &,ValidationErrors * errors)881 void XdsClusterImplLbConfig::JsonPostLoad(const Json& json, const JsonArgs&,
882                                           ValidationErrors* errors) {
883   // Parse "childPolicy" field.
884   ValidationErrors::ScopedField field(errors, ".childPolicy");
885   auto it = json.object().find("childPolicy");
886   if (it == json.object().end()) {
887     errors->AddError("field not present");
888   } else {
889     auto lb_config =
890         CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
891             it->second);
892     if (!lb_config.ok()) {
893       errors->AddError(lb_config.status().message());
894     } else {
895       child_policy_ = std::move(*lb_config);
896     }
897   }
898 }
899 
900 class XdsClusterImplLbFactory final : public LoadBalancingPolicyFactory {
901  public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const902   OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
903       LoadBalancingPolicy::Args args) const override {
904     auto xds_client = args.args.GetObjectRef<GrpcXdsClient>(DEBUG_LOCATION,
905                                                             "XdsClusterImplLb");
906     if (xds_client == nullptr) {
907       LOG(ERROR) << "XdsClient not present in channel args -- cannot "
908                     "instantiate xds_cluster_impl LB policy";
909       return nullptr;
910     }
911     return MakeOrphanable<XdsClusterImplLb>(std::move(xds_client),
912                                             std::move(args));
913   }
914 
name() const915   absl::string_view name() const override { return kXdsClusterImpl; }
916 
917   absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const Json & json) const918   ParseLoadBalancingConfig(const Json& json) const override {
919     return LoadFromJson<RefCountedPtr<XdsClusterImplLbConfig>>(
920         json, JsonArgs(),
921         "errors validating xds_cluster_impl LB policy config");
922   }
923 };
924 
925 }  // namespace
926 
RegisterXdsClusterImplLbPolicy(CoreConfiguration::Builder * builder)927 void RegisterXdsClusterImplLbPolicy(CoreConfiguration::Builder* builder) {
928   builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
929       std::make_unique<XdsClusterImplLbFactory>());
930 }
931 
932 }  // namespace grpc_core
933