• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2019 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/grpc_security.h>
18 #include <grpc/impl/connectivity_state.h>
19 #include <grpc/support/json.h>
20 #include <grpc/support/port_platform.h>
21 
22 #include <algorithm>
23 #include <map>
24 #include <memory>
25 #include <set>
26 #include <string>
27 #include <type_traits>
28 #include <utility>
29 #include <vector>
30 
31 #include "absl/log/check.h"
32 #include "absl/log/log.h"
33 #include "absl/status/status.h"
34 #include "absl/status/statusor.h"
35 #include "absl/strings/str_cat.h"
36 #include "absl/strings/string_view.h"
37 #include "absl/types/optional.h"
38 #include "absl/types/variant.h"
39 #include "src/core/config/core_configuration.h"
40 #include "src/core/lib/channel/channel_args.h"
41 #include "src/core/lib/debug/trace.h"
42 #include "src/core/lib/iomgr/pollset_set.h"
43 #include "src/core/load_balancing/address_filtering.h"
44 #include "src/core/load_balancing/delegating_helper.h"
45 #include "src/core/load_balancing/lb_policy.h"
46 #include "src/core/load_balancing/lb_policy_factory.h"
47 #include "src/core/load_balancing/lb_policy_registry.h"
48 #include "src/core/load_balancing/outlier_detection/outlier_detection.h"
49 #include "src/core/load_balancing/xds/xds_channel_args.h"
50 #include "src/core/resolver/xds/xds_dependency_manager.h"
51 #include "src/core/util/debug_location.h"
52 #include "src/core/util/env.h"
53 #include "src/core/util/json/json.h"
54 #include "src/core/util/json/json_args.h"
55 #include "src/core/util/json/json_object_loader.h"
56 #include "src/core/util/json/json_writer.h"
57 #include "src/core/util/match.h"
58 #include "src/core/util/orphanable.h"
59 #include "src/core/util/ref_counted_ptr.h"
60 #include "src/core/util/time.h"
61 #include "src/core/util/unique_type_name.h"
62 #include "src/core/util/work_serializer.h"
63 #include "src/core/xds/grpc/xds_cluster.h"
64 #include "src/core/xds/grpc/xds_common_types.h"
65 #include "src/core/xds/grpc/xds_health_status.h"
66 
67 namespace grpc_core {
68 
69 namespace {
70 
71 // TODO(roth): Remove this after the 1.63 release.
XdsAggregateClusterBackwardCompatibilityEnabled()72 bool XdsAggregateClusterBackwardCompatibilityEnabled() {
73   auto value = GetEnv("GRPC_XDS_AGGREGATE_CLUSTER_BACKWARD_COMPAT");
74   if (!value.has_value()) return false;
75   bool parsed_value;
76   bool parse_succeeded = gpr_parse_bool_value(value->c_str(), &parsed_value);
77   return parse_succeeded && parsed_value;
78 }
79 
80 constexpr absl::string_view kCds = "cds_experimental";
81 
82 // Config for this LB policy.
83 class CdsLbConfig final : public LoadBalancingPolicy::Config {
84  public:
85   CdsLbConfig() = default;
86 
87   CdsLbConfig(const CdsLbConfig&) = delete;
88   CdsLbConfig& operator=(const CdsLbConfig&) = delete;
89 
90   CdsLbConfig(CdsLbConfig&& other) = delete;
91   CdsLbConfig& operator=(CdsLbConfig&& other) = delete;
92 
name() const93   absl::string_view name() const override { return kCds; }
94 
cluster() const95   const std::string& cluster() const { return cluster_; }
is_dynamic() const96   bool is_dynamic() const { return is_dynamic_; }
97 
JsonLoader(const JsonArgs &)98   static const JsonLoaderInterface* JsonLoader(const JsonArgs&) {
99     static const auto* loader =
100         JsonObjectLoader<CdsLbConfig>()
101             .Field("cluster", &CdsLbConfig::cluster_)
102             .OptionalField("isDynamic", &CdsLbConfig::is_dynamic_)
103             .Finish();
104     return loader;
105   }
106 
107  private:
108   std::string cluster_;
109   bool is_dynamic_ = false;
110 };
111 
112 // CDS LB policy.
113 class CdsLb final : public LoadBalancingPolicy {
114  public:
115   explicit CdsLb(Args args);
116 
name() const117   absl::string_view name() const override { return kCds; }
118 
119   absl::Status UpdateLocked(UpdateArgs args) override;
120   void ResetBackoffLocked() override;
121   void ExitIdleLocked() override;
122 
123  private:
124   // Delegating helper to be passed to child policy.
125   using Helper = ParentOwningDelegatingChannelControlHelper<CdsLb>;
126 
127   // State used to retain child policy names for the priority policy.
128   struct ChildNameState {
129     std::vector<size_t /*child_number*/> priority_child_numbers;
130     size_t next_available_child_number = 0;
131 
Resetgrpc_core::__anon80268f070111::CdsLb::ChildNameState132     void Reset() {
133       priority_child_numbers.clear();
134       next_available_child_number = 0;
135     }
136   };
137 
138   ~CdsLb() override;
139 
140   void ShutdownLocked() override;
141 
142   // Computes child numbers for new_cluster, reusing child numbers
143   // from old_cluster and child_name_state_ in an intelligent
144   // way to avoid unnecessary churn.
145   ChildNameState ComputeChildNames(
146       const XdsConfig::ClusterConfig* old_cluster,
147       const XdsConfig::ClusterConfig& new_cluster,
148       const XdsConfig::ClusterConfig::EndpointConfig& endpoint_config) const;
149 
150   std::string GetChildPolicyName(const std::string& cluster, size_t priority);
151 
152   Json CreateChildPolicyConfigForLeafCluster(
153       const XdsConfig::ClusterConfig& new_cluster,
154       const XdsConfig::ClusterConfig::EndpointConfig& endpoint_config,
155       const XdsClusterResource* aggregate_cluster_resource);
156   Json CreateChildPolicyConfigForAggregateCluster(
157       const XdsConfig::ClusterConfig::AggregateConfig& aggregate_config);
158 
159   void ResetState();
160 
161   void ReportTransientFailure(absl::Status status);
162 
163   std::string cluster_name_;
164   RefCountedPtr<const XdsConfig> xds_config_;
165 
166   // Cluster subscription, for dynamic clusters (e.g., RLS).
167   RefCountedPtr<XdsDependencyManager::ClusterSubscription> subscription_;
168 
169   ChildNameState child_name_state_;
170 
171   // Child LB policy.
172   OrphanablePtr<LoadBalancingPolicy> child_policy_;
173 
174   // Internal state.
175   bool shutting_down_ = false;
176 };
177 
178 //
179 // CdsLb
180 //
181 
CdsLb(Args args)182 CdsLb::CdsLb(Args args) : LoadBalancingPolicy(std::move(args)) {
183   GRPC_TRACE_LOG(cds_lb, INFO) << "[cdslb " << this << "] created";
184 }
185 
~CdsLb()186 CdsLb::~CdsLb() {
187   GRPC_TRACE_LOG(cds_lb, INFO)
188       << "[cdslb " << this << "] destroying cds LB policy";
189 }
190 
ShutdownLocked()191 void CdsLb::ShutdownLocked() {
192   GRPC_TRACE_LOG(cds_lb, INFO) << "[cdslb " << this << "] shutting down";
193   shutting_down_ = true;
194   ResetState();
195 }
196 
ResetBackoffLocked()197 void CdsLb::ResetBackoffLocked() {
198   if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
199 }
200 
ExitIdleLocked()201 void CdsLb::ExitIdleLocked() {
202   if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
203 }
204 
205 // We need at least one priority for each discovery mechanism, just so that we
206 // have a child in which to create the xds_cluster_impl policy.  This ensures
207 // that we properly handle the case of a discovery mechanism dropping 100% of
208 // calls, the OnError() case, and the OnResourceDoesNotExist() case.
GetUpdatePriorityList(const XdsEndpointResource * update)209 const XdsEndpointResource::PriorityList& GetUpdatePriorityList(
210     const XdsEndpointResource* update) {
211   static const NoDestruct<XdsEndpointResource::PriorityList>
212       kPriorityListWithEmptyPriority(1);
213   if (update == nullptr || update->priorities.empty()) {
214     return *kPriorityListWithEmptyPriority;
215   }
216   return update->priorities;
217 }
218 
MakeChildPolicyName(absl::string_view cluster,size_t child_number)219 std::string MakeChildPolicyName(absl::string_view cluster,
220                                 size_t child_number) {
221   return absl::StrCat("{cluster=", cluster, ", child_number=", child_number,
222                       "}");
223 }
224 
225 class PriorityEndpointIterator final : public EndpointAddressesIterator {
226  public:
PriorityEndpointIterator(std::string cluster_name,bool use_http_connect,std::shared_ptr<const XdsEndpointResource> endpoints,std::vector<size_t> priority_child_numbers)227   PriorityEndpointIterator(
228       std::string cluster_name, bool use_http_connect,
229       std::shared_ptr<const XdsEndpointResource> endpoints,
230       std::vector<size_t /*child_number*/> priority_child_numbers)
231       : cluster_name_(std::move(cluster_name)),
232         use_http_connect_(use_http_connect),
233         endpoints_(std::move(endpoints)),
234         priority_child_numbers_(std::move(priority_child_numbers)) {}
235 
ForEach(absl::FunctionRef<void (const EndpointAddresses &)> callback) const236   void ForEach(absl::FunctionRef<void(const EndpointAddresses&)> callback)
237       const override {
238     const auto& priority_list = GetUpdatePriorityList(endpoints_.get());
239     for (size_t priority = 0; priority < priority_list.size(); ++priority) {
240       const auto& priority_entry = priority_list[priority];
241       std::string priority_child_name =
242           MakeChildPolicyName(cluster_name_, priority_child_numbers_[priority]);
243       for (const auto& p : priority_entry.localities) {
244         const auto& locality_name = p.first;
245         const auto& locality = p.second;
246         std::vector<RefCountedStringValue> hierarchical_path = {
247             RefCountedStringValue(priority_child_name),
248             locality_name->human_readable_string()};
249         auto hierarchical_path_attr =
250             MakeRefCounted<HierarchicalPathArg>(std::move(hierarchical_path));
251         for (const auto& endpoint : locality.endpoints) {
252           uint32_t endpoint_weight =
253               locality.lb_weight *
254               endpoint.args().GetInt(GRPC_ARG_ADDRESS_WEIGHT).value_or(1);
255           ChannelArgs args =
256               endpoint.args()
257                   .SetObject(hierarchical_path_attr)
258                   .Set(GRPC_ARG_ADDRESS_WEIGHT, endpoint_weight)
259                   .SetObject(locality_name->Ref())
260                   .Set(GRPC_ARG_XDS_LOCALITY_WEIGHT, locality.lb_weight);
261           if (!use_http_connect_) args = args.Remove(GRPC_ARG_XDS_HTTP_PROXY);
262           callback(EndpointAddresses(endpoint.addresses(), args));
263         }
264       }
265     }
266   }
267 
268  private:
269   std::string cluster_name_;
270   bool use_http_connect_;
271   std::shared_ptr<const XdsEndpointResource> endpoints_;
272   std::vector<size_t /*child_number*/> priority_child_numbers_;
273 };
274 
UpdateLocked(UpdateArgs args)275 absl::Status CdsLb::UpdateLocked(UpdateArgs args) {
276   // Get new config.
277   auto new_config = args.config.TakeAsSubclass<CdsLbConfig>();
278   GRPC_TRACE_LOG(cds_lb, INFO)
279       << "[cdslb " << this
280       << "] received update: cluster=" << new_config->cluster()
281       << " is_dynamic=" << new_config->is_dynamic();
282   CHECK(new_config != nullptr);
283   // Cluster name should never change, because we should use a different
284   // child name in xds_cluster_manager in that case.
285   if (cluster_name_.empty()) {
286     cluster_name_ = new_config->cluster();
287   } else {
288     CHECK(cluster_name_ == new_config->cluster());
289   }
290   // Start dynamic subscription if needed.
291   if (new_config->is_dynamic() && subscription_ == nullptr) {
292     GRPC_TRACE_LOG(cds_lb, INFO)
293         << "[cdslb " << this << "] obtaining dynamic subscription for cluster "
294         << cluster_name_;
295     auto* dependency_mgr = args.args.GetObject<XdsDependencyManager>();
296     if (dependency_mgr == nullptr) {
297       // Should never happen.
298       absl::Status status =
299           absl::InternalError("xDS dependency mgr not passed to CDS LB policy");
300       ReportTransientFailure(status);
301       return status;
302     }
303     subscription_ = dependency_mgr->GetClusterSubscription(cluster_name_);
304   }
305   // Get xDS config.
306   auto new_xds_config = args.args.GetObjectRef<XdsConfig>();
307   if (new_xds_config == nullptr) {
308     // Should never happen.
309     absl::Status status =
310         absl::InternalError("xDS config not passed to CDS LB policy");
311     ReportTransientFailure(status);
312     return status;
313   }
314   auto it = new_xds_config->clusters.find(cluster_name_);
315   if (it == new_xds_config->clusters.end()) {
316     // Cluster not present.
317     if (new_config->is_dynamic()) {
318       // If we are already subscribed, it's possible that we just
319       // recently subscribed but another update came through before we
320       // got the new cluster, in which case it will still be missing.
321       GRPC_TRACE_LOG(cds_lb, INFO)
322           << "[cdslb " << this
323           << "] xDS config has no entry for dynamic cluster " << cluster_name_
324           << ", waiting for subsequent update";
325       // Stay in CONNECTING until we get an update that has the cluster.
326       return absl::OkStatus();
327     }
328     // Not a dynamic cluster.  This should never happen.
329     absl::Status status = absl::UnavailableError(absl::StrCat(
330         "xDS config has no entry for static cluster ", cluster_name_));
331     ReportTransientFailure(status);
332     return status;
333   }
334   auto& new_cluster_config = it->second;
335   // If new list is not OK, report TRANSIENT_FAILURE.
336   if (!new_cluster_config.ok()) {
337     ReportTransientFailure(new_cluster_config.status());
338     return new_cluster_config.status();
339   }
340   CHECK_NE(new_cluster_config->cluster, nullptr);
341   // Find old cluster, if any.
342   const XdsConfig::ClusterConfig* old_cluster_config = nullptr;
343   if (xds_config_ != nullptr) {
344     auto it_old = xds_config_->clusters.find(cluster_name_);
345     if (it_old != xds_config_->clusters.end() && it_old->second.ok()) {
346       old_cluster_config = &*it_old->second;
347       // If nothing changed for a leaf cluster, then ignore the update.
348       // Can't do this for an aggregate cluster, because even if the aggregate
349       // cluster itself didn't change, the leaf clusters may have changed.
350       if (*new_cluster_config == *old_cluster_config &&
351           absl::holds_alternative<XdsConfig::ClusterConfig::EndpointConfig>(
352               new_cluster_config->children)) {
353         return absl::OkStatus();
354       }
355     }
356   }
357   // TODO(roth): Remove this after the 1.63 release.
358   const XdsClusterResource* aggregate_cluster_resource = nullptr;
359   static constexpr absl::string_view kArgXdsAggregateClusterName =
360       GRPC_ARG_NO_SUBCHANNEL_PREFIX "xds_aggregate_cluster_name";
361   if (XdsAggregateClusterBackwardCompatibilityEnabled()) {
362     if (absl::holds_alternative<XdsConfig::ClusterConfig::EndpointConfig>(
363             new_cluster_config->children)) {
364       auto aggregate_cluster = args.args.GetString(kArgXdsAggregateClusterName);
365       if (aggregate_cluster.has_value()) {
366         auto it = new_xds_config->clusters.find(*aggregate_cluster);
367         if (it == new_xds_config->clusters.end()) {
368           // Cluster not present.  This should never happen.
369           absl::Status status = absl::UnavailableError(
370               absl::StrCat("xDS config has no entry for aggregate cluster ",
371                            *aggregate_cluster));
372           ReportTransientFailure(status);
373           return status;
374         }
375         auto& aggregate_cluster_config = it->second;
376         if (!aggregate_cluster_config.ok()) {
377           ReportTransientFailure(aggregate_cluster_config.status());
378           return aggregate_cluster_config.status();
379         }
380         CHECK_NE(aggregate_cluster_config->cluster, nullptr);
381         aggregate_cluster_resource = aggregate_cluster_config->cluster.get();
382       }
383     } else {
384       args.args = args.args.Set(kArgXdsAggregateClusterName, cluster_name_);
385     }
386   }
387   // Construct child policy config and update state based on the cluster type.
388   Json child_policy_config_json;
389   UpdateArgs update_args;
390   Match(
391       new_cluster_config->children,
392       // Leaf cluster.
393       [&](const XdsConfig::ClusterConfig::EndpointConfig& endpoint_config) {
394         // Compute new child numbers.
395         child_name_state_ = ComputeChildNames(
396             old_cluster_config, *new_cluster_config, endpoint_config);
397         // Populate addresses and resolution_note for child policy.
398         update_args.addresses = std::make_shared<PriorityEndpointIterator>(
399             cluster_name_, new_cluster_config->cluster->use_http_connect,
400             endpoint_config.endpoints,
401             child_name_state_.priority_child_numbers);
402         update_args.resolution_note = endpoint_config.resolution_note;
403         // Construct child policy config.
404         child_policy_config_json = CreateChildPolicyConfigForLeafCluster(
405             *new_cluster_config, endpoint_config, aggregate_cluster_resource);
406       },
407       // Aggregate cluster.
408       [&](const XdsConfig::ClusterConfig::AggregateConfig& aggregate_config) {
409         child_name_state_.Reset();
410         // Construct child policy config.
411         child_policy_config_json =
412             CreateChildPolicyConfigForAggregateCluster(aggregate_config);
413       });
414   // Swap in new xDS config, now that we're done with the old one.
415   xds_config_ = std::move(new_xds_config);
416   // Validate child policy config.
417   auto child_config =
418       CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
419           child_policy_config_json);
420   if (!child_config.ok()) {
421     // Should never happen.
422     absl::Status status = absl::InternalError(
423         absl::StrCat(cluster_name_, ": error parsing child policy config: ",
424                      child_config.status().message()));
425     ReportTransientFailure(status);
426     return status;
427   }
428   // Create child policy if not already present.
429   if (child_policy_ == nullptr) {
430     LoadBalancingPolicy::Args lb_args;
431     lb_args.work_serializer = work_serializer();
432     lb_args.args = args.args;
433     lb_args.channel_control_helper =
434         std::make_unique<Helper>(RefAsSubclass<CdsLb>());
435     child_policy_ =
436         CoreConfiguration::Get().lb_policy_registry().CreateLoadBalancingPolicy(
437             (*child_config)->name(), std::move(lb_args));
438     if (child_policy_ == nullptr) {
439       // Should never happen.
440       absl::Status status = absl::UnavailableError(
441           absl::StrCat(cluster_name_, ": failed to create child policy"));
442       ReportTransientFailure(status);
443       return status;
444     }
445     grpc_pollset_set_add_pollset_set(child_policy_->interested_parties(),
446                                      interested_parties());
447     GRPC_TRACE_LOG(cds_lb, INFO)
448         << "[cdslb " << this << "] created child policy "
449         << (*child_config)->name() << " (" << child_policy_.get() << ")";
450   }
451   // Update child policy.
452   update_args.config = std::move(*child_config);
453   update_args.args = args.args;
454   return child_policy_->UpdateLocked(std::move(update_args));
455 }
456 
ComputeChildNames(const XdsConfig::ClusterConfig * old_cluster,const XdsConfig::ClusterConfig & new_cluster,const XdsConfig::ClusterConfig::EndpointConfig & endpoint_config) const457 CdsLb::ChildNameState CdsLb::ComputeChildNames(
458     const XdsConfig::ClusterConfig* old_cluster,
459     const XdsConfig::ClusterConfig& new_cluster,
460     const XdsConfig::ClusterConfig::EndpointConfig& endpoint_config) const {
461   CHECK(!absl::holds_alternative<XdsConfig::ClusterConfig::AggregateConfig>(
462       new_cluster.children));
463   // First, build some maps from locality to child number and the reverse
464   // from old_cluster and child_name_state_.
465   std::map<XdsLocalityName*, size_t /*child_number*/, XdsLocalityName::Less>
466       locality_child_map;
467   std::map<size_t, std::set<XdsLocalityName*, XdsLocalityName::Less>>
468       child_locality_map;
469   if (old_cluster != nullptr) {
470     auto* old_endpoint_config =
471         absl::get_if<XdsConfig::ClusterConfig::EndpointConfig>(
472             &old_cluster->children);
473     if (old_endpoint_config != nullptr) {
474       const auto& prev_priority_list =
475           GetUpdatePriorityList(old_endpoint_config->endpoints.get());
476       for (size_t priority = 0; priority < prev_priority_list.size();
477            ++priority) {
478         size_t child_number =
479             child_name_state_.priority_child_numbers[priority];
480         const auto& localities = prev_priority_list[priority].localities;
481         for (const auto& p : localities) {
482           XdsLocalityName* locality_name = p.first;
483           locality_child_map[locality_name] = child_number;
484           child_locality_map[child_number].insert(locality_name);
485         }
486       }
487     }
488   }
489   // Now construct new state containing priority child numbers for the new
490   // cluster based on the maps constructed above.
491   ChildNameState new_child_name_state;
492   new_child_name_state.next_available_child_number =
493       child_name_state_.next_available_child_number;
494   const XdsEndpointResource::PriorityList& priority_list =
495       GetUpdatePriorityList(endpoint_config.endpoints.get());
496   for (size_t priority = 0; priority < priority_list.size(); ++priority) {
497     const auto& localities = priority_list[priority].localities;
498     absl::optional<size_t> child_number;
499     // If one of the localities in this priority already existed, reuse its
500     // child number.
501     for (const auto& p : localities) {
502       XdsLocalityName* locality_name = p.first;
503       if (!child_number.has_value()) {
504         auto it = locality_child_map.find(locality_name);
505         if (it != locality_child_map.end()) {
506           child_number = it->second;
507           locality_child_map.erase(it);
508           // Remove localities that *used* to be in this child number, so
509           // that we don't incorrectly reuse this child number for a
510           // subsequent priority.
511           for (XdsLocalityName* old_locality :
512                child_locality_map[*child_number]) {
513             locality_child_map.erase(old_locality);
514           }
515         }
516       } else {
517         // Remove all localities that are now in this child number, so
518         // that we don't accidentally reuse this child number for a
519         // subsequent priority.
520         locality_child_map.erase(locality_name);
521       }
522     }
523     // If we didn't find an existing child number, assign a new one.
524     if (!child_number.has_value()) {
525       for (child_number = new_child_name_state.next_available_child_number;
526            child_locality_map.find(*child_number) != child_locality_map.end();
527            ++(*child_number)) {
528       }
529       new_child_name_state.next_available_child_number = *child_number + 1;
530       // Add entry so we know that the child number is in use.
531       // (Don't need to add the list of localities, since we won't use them.)
532       child_locality_map[*child_number];
533     }
534     new_child_name_state.priority_child_numbers.push_back(*child_number);
535   }
536   return new_child_name_state;
537 }
538 
CreateChildPolicyConfigForLeafCluster(const XdsConfig::ClusterConfig & new_cluster,const XdsConfig::ClusterConfig::EndpointConfig & endpoint_config,const XdsClusterResource * aggregate_cluster_resource)539 Json CdsLb::CreateChildPolicyConfigForLeafCluster(
540     const XdsConfig::ClusterConfig& new_cluster,
541     const XdsConfig::ClusterConfig::EndpointConfig& endpoint_config,
542     const XdsClusterResource* aggregate_cluster_resource) {
543   const auto& cluster_resource = *new_cluster.cluster;
544   const bool is_logical_dns =
545       absl::holds_alternative<XdsClusterResource::LogicalDns>(
546           cluster_resource.type);
547   // Determine what xDS LB policy to use.
548   Json xds_lb_policy;
549   if (is_logical_dns) {
550     xds_lb_policy = Json::FromArray({
551         Json::FromObject({
552             {"pick_first", Json::FromObject({})},
553         }),
554     });
555   }
556   // TODO(roth): Remove this "else if" block after the 1.63 release.
557   else if (XdsAggregateClusterBackwardCompatibilityEnabled() &&
558            aggregate_cluster_resource != nullptr) {
559     xds_lb_policy =
560         Json::FromArray(aggregate_cluster_resource->lb_policy_config);
561   } else {
562     xds_lb_policy = Json::FromArray(new_cluster.cluster->lb_policy_config);
563   }
564   // Wrap it in the priority policy.
565   Json::Object priority_children;
566   Json::Array priority_priorities;
567   const auto& priority_list =
568       GetUpdatePriorityList(endpoint_config.endpoints.get());
569   for (size_t priority = 0; priority < priority_list.size(); ++priority) {
570     // Add priority entry, with the appropriate child name.
571     std::string child_name = MakeChildPolicyName(
572         cluster_name_, child_name_state_.priority_child_numbers[priority]);
573     priority_priorities.emplace_back(Json::FromString(child_name));
574     Json::Object child_config = {{"config", xds_lb_policy}};
575     if (!is_logical_dns) {
576       child_config["ignore_reresolution_requests"] = Json::FromBool(true);
577     }
578     priority_children[child_name] = Json::FromObject(std::move(child_config));
579   }
580   Json priority_policy = Json::FromArray({Json::FromObject({
581       {"priority_experimental",
582        Json::FromObject({
583            {"children", Json::FromObject(std::move(priority_children))},
584            {"priorities", Json::FromArray(std::move(priority_priorities))},
585        })},
586   })});
587   // Wrap the priority policy in the xds_override_host policy.
588   Json xds_override_host_policy = Json::FromArray({Json::FromObject({
589       {"xds_override_host_experimental",
590        Json::FromObject({
591            {"clusterName", Json::FromString(cluster_name_)},
592            {"childPolicy", std::move(priority_policy)},
593        })},
594   })});
595   // Wrap the xds_override_host policy in the xds_cluster_impl policy.
596   Json xds_cluster_impl_policy = Json::FromArray({Json::FromObject({
597       {"xds_cluster_impl_experimental",
598        Json::FromObject({
599            {"clusterName", Json::FromString(cluster_name_)},
600            {"childPolicy", std::move(xds_override_host_policy)},
601        })},
602   })});
603   // Wrap the xds_cluster_impl policy in the outlier_detection policy.
604   Json::Object outlier_detection_config = {
605       {"childPolicy", std::move(xds_cluster_impl_policy)},
606   };
607   if (cluster_resource.outlier_detection.has_value()) {
608     auto& outlier_detection_update = *cluster_resource.outlier_detection;
609     outlier_detection_config["interval"] =
610         Json::FromString(outlier_detection_update.interval.ToJsonString());
611     outlier_detection_config["baseEjectionTime"] = Json::FromString(
612         outlier_detection_update.base_ejection_time.ToJsonString());
613     outlier_detection_config["maxEjectionTime"] = Json::FromString(
614         outlier_detection_update.max_ejection_time.ToJsonString());
615     outlier_detection_config["maxEjectionPercent"] =
616         Json::FromNumber(outlier_detection_update.max_ejection_percent);
617     if (outlier_detection_update.success_rate_ejection.has_value()) {
618       outlier_detection_config["successRateEjection"] = Json::FromObject({
619           {"stdevFactor",
620            Json::FromNumber(
621                outlier_detection_update.success_rate_ejection->stdev_factor)},
622           {"enforcementPercentage",
623            Json::FromNumber(outlier_detection_update.success_rate_ejection
624                                 ->enforcement_percentage)},
625           {"minimumHosts",
626            Json::FromNumber(
627                outlier_detection_update.success_rate_ejection->minimum_hosts)},
628           {"requestVolume",
629            Json::FromNumber(
630                outlier_detection_update.success_rate_ejection->request_volume)},
631       });
632     }
633     if (outlier_detection_update.failure_percentage_ejection.has_value()) {
634       outlier_detection_config["failurePercentageEjection"] = Json::FromObject({
635           {"threshold",
636            Json::FromNumber(outlier_detection_update
637                                 .failure_percentage_ejection->threshold)},
638           {"enforcementPercentage",
639            Json::FromNumber(
640                outlier_detection_update.failure_percentage_ejection
641                    ->enforcement_percentage)},
642           {"minimumHosts",
643            Json::FromNumber(outlier_detection_update
644                                 .failure_percentage_ejection->minimum_hosts)},
645           {"requestVolume",
646            Json::FromNumber(outlier_detection_update
647                                 .failure_percentage_ejection->request_volume)},
648       });
649     }
650   }
651   Json outlier_detection_policy = Json::FromArray({Json::FromObject({
652       {"outlier_detection_experimental",
653        Json::FromObject(std::move(outlier_detection_config))},
654   })});
655   GRPC_TRACE_LOG(cds_lb, INFO)
656       << "[cdslb " << this << "] generated config for child policy: "
657       << JsonDump(outlier_detection_policy, /*indent=*/1);
658   return outlier_detection_policy;
659 }
660 
CreateChildPolicyConfigForAggregateCluster(const XdsConfig::ClusterConfig::AggregateConfig & aggregate_config)661 Json CdsLb::CreateChildPolicyConfigForAggregateCluster(
662     const XdsConfig::ClusterConfig::AggregateConfig& aggregate_config) {
663   Json::Object priority_children;
664   Json::Array priority_priorities;
665   for (const absl::string_view& leaf_cluster : aggregate_config.leaf_clusters) {
666     priority_children[std::string(leaf_cluster)] = Json::FromObject({
667         {"config",
668          Json::FromArray({
669              Json::FromObject({
670                  {"cds_experimental",
671                   Json::FromObject({
672                       {"cluster", Json::FromString(std::string(leaf_cluster))},
673                   })},
674              }),
675          })},
676     });
677     priority_priorities.emplace_back(
678         Json::FromString(std::string(leaf_cluster)));
679   }
680   Json json = Json::FromArray({Json::FromObject({
681       {"priority_experimental",
682        Json::FromObject({
683            {"children", Json::FromObject(std::move(priority_children))},
684            {"priorities", Json::FromArray(std::move(priority_priorities))},
685        })},
686   })});
687   GRPC_TRACE_LOG(cds_lb, INFO)
688       << "[cdslb " << this << "] generated config for child policy: "
689       << JsonDump(json, /*indent=*/1);
690   return json;
691 }
692 
ResetState()693 void CdsLb::ResetState() {
694   cluster_name_.clear();
695   xds_config_.reset();
696   child_name_state_.Reset();
697   if (child_policy_ != nullptr) {
698     grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
699                                      interested_parties());
700     child_policy_.reset();
701   }
702 }
703 
ReportTransientFailure(absl::Status status)704 void CdsLb::ReportTransientFailure(absl::Status status) {
705   GRPC_TRACE_LOG(cds_lb, INFO)
706       << "[cdslb " << this << "] reporting TRANSIENT_FAILURE: " << status;
707   ResetState();
708   channel_control_helper()->UpdateState(
709       GRPC_CHANNEL_TRANSIENT_FAILURE, status,
710       MakeRefCounted<TransientFailurePicker>(status));
711 }
712 
713 //
714 // factory
715 //
716 
717 class CdsLbFactory final : public LoadBalancingPolicyFactory {
718  public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const719   OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
720       LoadBalancingPolicy::Args args) const override {
721     return MakeOrphanable<CdsLb>(std::move(args));
722   }
723 
name() const724   absl::string_view name() const override { return kCds; }
725 
726   absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const Json & json) const727   ParseLoadBalancingConfig(const Json& json) const override {
728     return LoadFromJson<RefCountedPtr<CdsLbConfig>>(
729         json, JsonArgs(), "errors validating cds LB policy config");
730   }
731 };
732 
733 }  // namespace
734 
RegisterCdsLbPolicy(CoreConfiguration::Builder * builder)735 void RegisterCdsLbPolicy(CoreConfiguration::Builder* builder) {
736   builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
737       std::make_unique<CdsLbFactory>());
738 }
739 
740 }  // namespace grpc_core
741