1 //
2 // Copyright 2019 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/grpc_security.h>
18 #include <grpc/impl/connectivity_state.h>
19 #include <grpc/support/json.h>
20 #include <grpc/support/port_platform.h>
21
22 #include <algorithm>
23 #include <map>
24 #include <memory>
25 #include <set>
26 #include <string>
27 #include <type_traits>
28 #include <utility>
29 #include <vector>
30
31 #include "absl/log/check.h"
32 #include "absl/log/log.h"
33 #include "absl/status/status.h"
34 #include "absl/status/statusor.h"
35 #include "absl/strings/str_cat.h"
36 #include "absl/strings/string_view.h"
37 #include "absl/types/optional.h"
38 #include "absl/types/variant.h"
39 #include "src/core/config/core_configuration.h"
40 #include "src/core/lib/channel/channel_args.h"
41 #include "src/core/lib/debug/trace.h"
42 #include "src/core/lib/iomgr/pollset_set.h"
43 #include "src/core/load_balancing/address_filtering.h"
44 #include "src/core/load_balancing/delegating_helper.h"
45 #include "src/core/load_balancing/lb_policy.h"
46 #include "src/core/load_balancing/lb_policy_factory.h"
47 #include "src/core/load_balancing/lb_policy_registry.h"
48 #include "src/core/load_balancing/outlier_detection/outlier_detection.h"
49 #include "src/core/load_balancing/xds/xds_channel_args.h"
50 #include "src/core/resolver/xds/xds_dependency_manager.h"
51 #include "src/core/util/debug_location.h"
52 #include "src/core/util/env.h"
53 #include "src/core/util/json/json.h"
54 #include "src/core/util/json/json_args.h"
55 #include "src/core/util/json/json_object_loader.h"
56 #include "src/core/util/json/json_writer.h"
57 #include "src/core/util/match.h"
58 #include "src/core/util/orphanable.h"
59 #include "src/core/util/ref_counted_ptr.h"
60 #include "src/core/util/time.h"
61 #include "src/core/util/unique_type_name.h"
62 #include "src/core/util/work_serializer.h"
63 #include "src/core/xds/grpc/xds_cluster.h"
64 #include "src/core/xds/grpc/xds_common_types.h"
65 #include "src/core/xds/grpc/xds_health_status.h"
66
67 namespace grpc_core {
68
69 namespace {
70
71 // TODO(roth): Remove this after the 1.63 release.
XdsAggregateClusterBackwardCompatibilityEnabled()72 bool XdsAggregateClusterBackwardCompatibilityEnabled() {
73 auto value = GetEnv("GRPC_XDS_AGGREGATE_CLUSTER_BACKWARD_COMPAT");
74 if (!value.has_value()) return false;
75 bool parsed_value;
76 bool parse_succeeded = gpr_parse_bool_value(value->c_str(), &parsed_value);
77 return parse_succeeded && parsed_value;
78 }
79
80 constexpr absl::string_view kCds = "cds_experimental";
81
82 // Config for this LB policy.
83 class CdsLbConfig final : public LoadBalancingPolicy::Config {
84 public:
85 CdsLbConfig() = default;
86
87 CdsLbConfig(const CdsLbConfig&) = delete;
88 CdsLbConfig& operator=(const CdsLbConfig&) = delete;
89
90 CdsLbConfig(CdsLbConfig&& other) = delete;
91 CdsLbConfig& operator=(CdsLbConfig&& other) = delete;
92
name() const93 absl::string_view name() const override { return kCds; }
94
cluster() const95 const std::string& cluster() const { return cluster_; }
is_dynamic() const96 bool is_dynamic() const { return is_dynamic_; }
97
JsonLoader(const JsonArgs &)98 static const JsonLoaderInterface* JsonLoader(const JsonArgs&) {
99 static const auto* loader =
100 JsonObjectLoader<CdsLbConfig>()
101 .Field("cluster", &CdsLbConfig::cluster_)
102 .OptionalField("isDynamic", &CdsLbConfig::is_dynamic_)
103 .Finish();
104 return loader;
105 }
106
107 private:
108 std::string cluster_;
109 bool is_dynamic_ = false;
110 };
111
// CDS LB policy.
//
// Reads the XdsConfig passed via channel args. For the cluster named in its
// CdsLbConfig, it creates and updates a single child policy:
//   - leaf cluster: outlier_detection_experimental, which wraps
//     xds_cluster_impl_experimental, then xds_override_host_experimental,
//     then priority_experimental;
//   - aggregate cluster: priority_experimental with one cds_experimental
//     child per leaf cluster.
// Unexpected conditions are reported as TRANSIENT_FAILURE.
class CdsLb final : public LoadBalancingPolicy {
 public:
  explicit CdsLb(Args args);

  absl::string_view name() const override { return kCds; }

  absl::Status UpdateLocked(UpdateArgs args) override;
  void ResetBackoffLocked() override;
  void ExitIdleLocked() override;

 private:
  // Delegating helper to be passed to child policy.
  using Helper = ParentOwningDelegatingChannelControlHelper<CdsLb>;

  // State used to retain child policy names for the priority policy.
  struct ChildNameState {
    // Child number assigned to each priority, indexed by priority.
    std::vector<size_t /*child_number*/> priority_child_numbers;
    // Lowest child number considered when a new number must be assigned.
    size_t next_available_child_number = 0;

    // Drops all assignments and restarts numbering at 0.
    void Reset() {
      priority_child_numbers.clear();
      next_available_child_number = 0;
    }
  };

  ~CdsLb() override;

  void ShutdownLocked() override;

  // Computes child numbers for new_cluster, reusing child numbers
  // from old_cluster and child_name_state_ in an intelligent
  // way to avoid unnecessary churn.
  ChildNameState ComputeChildNames(
      const XdsConfig::ClusterConfig* old_cluster,
      const XdsConfig::ClusterConfig& new_cluster,
      const XdsConfig::ClusterConfig::EndpointConfig& endpoint_config) const;

  // NOTE(review): declared but no definition is visible in this file.
  std::string GetChildPolicyName(const std::string& cluster, size_t priority);

  // Builds the child policy JSON for a leaf (EDS / LOGICAL_DNS) cluster.
  Json CreateChildPolicyConfigForLeafCluster(
      const XdsConfig::ClusterConfig& new_cluster,
      const XdsConfig::ClusterConfig::EndpointConfig& endpoint_config,
      const XdsClusterResource* aggregate_cluster_resource);
  // Builds the child policy JSON for an aggregate cluster.
  Json CreateChildPolicyConfigForAggregateCluster(
      const XdsConfig::ClusterConfig::AggregateConfig& aggregate_config);

  // Clears cluster name, cached XdsConfig, child names, and child policy.
  void ResetState();

  // Calls ResetState() and reports TRANSIENT_FAILURE to the parent.
  void ReportTransientFailure(absl::Status status);

  // Cluster name from the first config received; UpdateLocked() CHECKs that
  // it never changes. Cleared by ResetState().
  std::string cluster_name_;
  // XdsConfig from the last update that was applied. Used to locate the
  // previous config for this cluster.
  RefCountedPtr<const XdsConfig> xds_config_;

  // Cluster subscription, for dynamic clusters (e.g., RLS).
  RefCountedPtr<XdsDependencyManager::ClusterSubscription> subscription_;

  // Child numbers for the priority policy's children (leaf clusters only).
  ChildNameState child_name_state_;

  // Child LB policy.
  OrphanablePtr<LoadBalancingPolicy> child_policy_;

  // Internal state.
  // NOTE(review): set in ShutdownLocked() but never read in this file.
  bool shutting_down_ = false;
};
177
178 //
179 // CdsLb
180 //
181
CdsLb(Args args)182 CdsLb::CdsLb(Args args) : LoadBalancingPolicy(std::move(args)) {
183 GRPC_TRACE_LOG(cds_lb, INFO) << "[cdslb " << this << "] created";
184 }
185
~CdsLb()186 CdsLb::~CdsLb() {
187 GRPC_TRACE_LOG(cds_lb, INFO)
188 << "[cdslb " << this << "] destroying cds LB policy";
189 }
190
ShutdownLocked()191 void CdsLb::ShutdownLocked() {
192 GRPC_TRACE_LOG(cds_lb, INFO) << "[cdslb " << this << "] shutting down";
193 shutting_down_ = true;
194 ResetState();
195 }
196
ResetBackoffLocked()197 void CdsLb::ResetBackoffLocked() {
198 if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
199 }
200
ExitIdleLocked()201 void CdsLb::ExitIdleLocked() {
202 if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
203 }
204
205 // We need at least one priority for each discovery mechanism, just so that we
206 // have a child in which to create the xds_cluster_impl policy. This ensures
207 // that we properly handle the case of a discovery mechanism dropping 100% of
208 // calls, the OnError() case, and the OnResourceDoesNotExist() case.
GetUpdatePriorityList(const XdsEndpointResource * update)209 const XdsEndpointResource::PriorityList& GetUpdatePriorityList(
210 const XdsEndpointResource* update) {
211 static const NoDestruct<XdsEndpointResource::PriorityList>
212 kPriorityListWithEmptyPriority(1);
213 if (update == nullptr || update->priorities.empty()) {
214 return *kPriorityListWithEmptyPriority;
215 }
216 return update->priorities;
217 }
218
MakeChildPolicyName(absl::string_view cluster,size_t child_number)219 std::string MakeChildPolicyName(absl::string_view cluster,
220 size_t child_number) {
221 return absl::StrCat("{cluster=", cluster, ", child_number=", child_number,
222 "}");
223 }
224
225 class PriorityEndpointIterator final : public EndpointAddressesIterator {
226 public:
PriorityEndpointIterator(std::string cluster_name,bool use_http_connect,std::shared_ptr<const XdsEndpointResource> endpoints,std::vector<size_t> priority_child_numbers)227 PriorityEndpointIterator(
228 std::string cluster_name, bool use_http_connect,
229 std::shared_ptr<const XdsEndpointResource> endpoints,
230 std::vector<size_t /*child_number*/> priority_child_numbers)
231 : cluster_name_(std::move(cluster_name)),
232 use_http_connect_(use_http_connect),
233 endpoints_(std::move(endpoints)),
234 priority_child_numbers_(std::move(priority_child_numbers)) {}
235
ForEach(absl::FunctionRef<void (const EndpointAddresses &)> callback) const236 void ForEach(absl::FunctionRef<void(const EndpointAddresses&)> callback)
237 const override {
238 const auto& priority_list = GetUpdatePriorityList(endpoints_.get());
239 for (size_t priority = 0; priority < priority_list.size(); ++priority) {
240 const auto& priority_entry = priority_list[priority];
241 std::string priority_child_name =
242 MakeChildPolicyName(cluster_name_, priority_child_numbers_[priority]);
243 for (const auto& p : priority_entry.localities) {
244 const auto& locality_name = p.first;
245 const auto& locality = p.second;
246 std::vector<RefCountedStringValue> hierarchical_path = {
247 RefCountedStringValue(priority_child_name),
248 locality_name->human_readable_string()};
249 auto hierarchical_path_attr =
250 MakeRefCounted<HierarchicalPathArg>(std::move(hierarchical_path));
251 for (const auto& endpoint : locality.endpoints) {
252 uint32_t endpoint_weight =
253 locality.lb_weight *
254 endpoint.args().GetInt(GRPC_ARG_ADDRESS_WEIGHT).value_or(1);
255 ChannelArgs args =
256 endpoint.args()
257 .SetObject(hierarchical_path_attr)
258 .Set(GRPC_ARG_ADDRESS_WEIGHT, endpoint_weight)
259 .SetObject(locality_name->Ref())
260 .Set(GRPC_ARG_XDS_LOCALITY_WEIGHT, locality.lb_weight);
261 if (!use_http_connect_) args = args.Remove(GRPC_ARG_XDS_HTTP_PROXY);
262 callback(EndpointAddresses(endpoint.addresses(), args));
263 }
264 }
265 }
266 }
267
268 private:
269 std::string cluster_name_;
270 bool use_http_connect_;
271 std::shared_ptr<const XdsEndpointResource> endpoints_;
272 std::vector<size_t /*child_number*/> priority_child_numbers_;
273 };
274
UpdateLocked(UpdateArgs args)275 absl::Status CdsLb::UpdateLocked(UpdateArgs args) {
276 // Get new config.
277 auto new_config = args.config.TakeAsSubclass<CdsLbConfig>();
278 GRPC_TRACE_LOG(cds_lb, INFO)
279 << "[cdslb " << this
280 << "] received update: cluster=" << new_config->cluster()
281 << " is_dynamic=" << new_config->is_dynamic();
282 CHECK(new_config != nullptr);
283 // Cluster name should never change, because we should use a different
284 // child name in xds_cluster_manager in that case.
285 if (cluster_name_.empty()) {
286 cluster_name_ = new_config->cluster();
287 } else {
288 CHECK(cluster_name_ == new_config->cluster());
289 }
290 // Start dynamic subscription if needed.
291 if (new_config->is_dynamic() && subscription_ == nullptr) {
292 GRPC_TRACE_LOG(cds_lb, INFO)
293 << "[cdslb " << this << "] obtaining dynamic subscription for cluster "
294 << cluster_name_;
295 auto* dependency_mgr = args.args.GetObject<XdsDependencyManager>();
296 if (dependency_mgr == nullptr) {
297 // Should never happen.
298 absl::Status status =
299 absl::InternalError("xDS dependency mgr not passed to CDS LB policy");
300 ReportTransientFailure(status);
301 return status;
302 }
303 subscription_ = dependency_mgr->GetClusterSubscription(cluster_name_);
304 }
305 // Get xDS config.
306 auto new_xds_config = args.args.GetObjectRef<XdsConfig>();
307 if (new_xds_config == nullptr) {
308 // Should never happen.
309 absl::Status status =
310 absl::InternalError("xDS config not passed to CDS LB policy");
311 ReportTransientFailure(status);
312 return status;
313 }
314 auto it = new_xds_config->clusters.find(cluster_name_);
315 if (it == new_xds_config->clusters.end()) {
316 // Cluster not present.
317 if (new_config->is_dynamic()) {
318 // If we are already subscribed, it's possible that we just
319 // recently subscribed but another update came through before we
320 // got the new cluster, in which case it will still be missing.
321 GRPC_TRACE_LOG(cds_lb, INFO)
322 << "[cdslb " << this
323 << "] xDS config has no entry for dynamic cluster " << cluster_name_
324 << ", waiting for subsequent update";
325 // Stay in CONNECTING until we get an update that has the cluster.
326 return absl::OkStatus();
327 }
328 // Not a dynamic cluster. This should never happen.
329 absl::Status status = absl::UnavailableError(absl::StrCat(
330 "xDS config has no entry for static cluster ", cluster_name_));
331 ReportTransientFailure(status);
332 return status;
333 }
334 auto& new_cluster_config = it->second;
335 // If new list is not OK, report TRANSIENT_FAILURE.
336 if (!new_cluster_config.ok()) {
337 ReportTransientFailure(new_cluster_config.status());
338 return new_cluster_config.status();
339 }
340 CHECK_NE(new_cluster_config->cluster, nullptr);
341 // Find old cluster, if any.
342 const XdsConfig::ClusterConfig* old_cluster_config = nullptr;
343 if (xds_config_ != nullptr) {
344 auto it_old = xds_config_->clusters.find(cluster_name_);
345 if (it_old != xds_config_->clusters.end() && it_old->second.ok()) {
346 old_cluster_config = &*it_old->second;
347 // If nothing changed for a leaf cluster, then ignore the update.
348 // Can't do this for an aggregate cluster, because even if the aggregate
349 // cluster itself didn't change, the leaf clusters may have changed.
350 if (*new_cluster_config == *old_cluster_config &&
351 absl::holds_alternative<XdsConfig::ClusterConfig::EndpointConfig>(
352 new_cluster_config->children)) {
353 return absl::OkStatus();
354 }
355 }
356 }
357 // TODO(roth): Remove this after the 1.63 release.
358 const XdsClusterResource* aggregate_cluster_resource = nullptr;
359 static constexpr absl::string_view kArgXdsAggregateClusterName =
360 GRPC_ARG_NO_SUBCHANNEL_PREFIX "xds_aggregate_cluster_name";
361 if (XdsAggregateClusterBackwardCompatibilityEnabled()) {
362 if (absl::holds_alternative<XdsConfig::ClusterConfig::EndpointConfig>(
363 new_cluster_config->children)) {
364 auto aggregate_cluster = args.args.GetString(kArgXdsAggregateClusterName);
365 if (aggregate_cluster.has_value()) {
366 auto it = new_xds_config->clusters.find(*aggregate_cluster);
367 if (it == new_xds_config->clusters.end()) {
368 // Cluster not present. This should never happen.
369 absl::Status status = absl::UnavailableError(
370 absl::StrCat("xDS config has no entry for aggregate cluster ",
371 *aggregate_cluster));
372 ReportTransientFailure(status);
373 return status;
374 }
375 auto& aggregate_cluster_config = it->second;
376 if (!aggregate_cluster_config.ok()) {
377 ReportTransientFailure(aggregate_cluster_config.status());
378 return aggregate_cluster_config.status();
379 }
380 CHECK_NE(aggregate_cluster_config->cluster, nullptr);
381 aggregate_cluster_resource = aggregate_cluster_config->cluster.get();
382 }
383 } else {
384 args.args = args.args.Set(kArgXdsAggregateClusterName, cluster_name_);
385 }
386 }
387 // Construct child policy config and update state based on the cluster type.
388 Json child_policy_config_json;
389 UpdateArgs update_args;
390 Match(
391 new_cluster_config->children,
392 // Leaf cluster.
393 [&](const XdsConfig::ClusterConfig::EndpointConfig& endpoint_config) {
394 // Compute new child numbers.
395 child_name_state_ = ComputeChildNames(
396 old_cluster_config, *new_cluster_config, endpoint_config);
397 // Populate addresses and resolution_note for child policy.
398 update_args.addresses = std::make_shared<PriorityEndpointIterator>(
399 cluster_name_, new_cluster_config->cluster->use_http_connect,
400 endpoint_config.endpoints,
401 child_name_state_.priority_child_numbers);
402 update_args.resolution_note = endpoint_config.resolution_note;
403 // Construct child policy config.
404 child_policy_config_json = CreateChildPolicyConfigForLeafCluster(
405 *new_cluster_config, endpoint_config, aggregate_cluster_resource);
406 },
407 // Aggregate cluster.
408 [&](const XdsConfig::ClusterConfig::AggregateConfig& aggregate_config) {
409 child_name_state_.Reset();
410 // Construct child policy config.
411 child_policy_config_json =
412 CreateChildPolicyConfigForAggregateCluster(aggregate_config);
413 });
414 // Swap in new xDS config, now that we're done with the old one.
415 xds_config_ = std::move(new_xds_config);
416 // Validate child policy config.
417 auto child_config =
418 CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
419 child_policy_config_json);
420 if (!child_config.ok()) {
421 // Should never happen.
422 absl::Status status = absl::InternalError(
423 absl::StrCat(cluster_name_, ": error parsing child policy config: ",
424 child_config.status().message()));
425 ReportTransientFailure(status);
426 return status;
427 }
428 // Create child policy if not already present.
429 if (child_policy_ == nullptr) {
430 LoadBalancingPolicy::Args lb_args;
431 lb_args.work_serializer = work_serializer();
432 lb_args.args = args.args;
433 lb_args.channel_control_helper =
434 std::make_unique<Helper>(RefAsSubclass<CdsLb>());
435 child_policy_ =
436 CoreConfiguration::Get().lb_policy_registry().CreateLoadBalancingPolicy(
437 (*child_config)->name(), std::move(lb_args));
438 if (child_policy_ == nullptr) {
439 // Should never happen.
440 absl::Status status = absl::UnavailableError(
441 absl::StrCat(cluster_name_, ": failed to create child policy"));
442 ReportTransientFailure(status);
443 return status;
444 }
445 grpc_pollset_set_add_pollset_set(child_policy_->interested_parties(),
446 interested_parties());
447 GRPC_TRACE_LOG(cds_lb, INFO)
448 << "[cdslb " << this << "] created child policy "
449 << (*child_config)->name() << " (" << child_policy_.get() << ")";
450 }
451 // Update child policy.
452 update_args.config = std::move(*child_config);
453 update_args.args = args.args;
454 return child_policy_->UpdateLocked(std::move(update_args));
455 }
456
// Assigns a child number to each priority of new_cluster's endpoints.
//
// Child numbers name the priority policy's children (via
// MakeChildPolicyName()). To limit churn, a new priority reuses the child
// number that one of its localities had in old_cluster, as recorded in
// child_name_state_. Once a number is reused it cannot be claimed again by
// a later priority. A priority that cannot reuse a number gets the next
// number, starting from child_name_state_.next_available_child_number,
// that is not already in use.
//
// Params:
//   old_cluster: previous config for this cluster, or null if none. When it
//     is a leaf cluster, its priority list is assumed to line up index-for-
//     index with child_name_state_.priority_child_numbers.
//   new_cluster: new config; must not be an aggregate cluster (CHECKed).
//   endpoint_config: the endpoint config of new_cluster.
// Returns the new ChildNameState. This is a const method, so
// child_name_state_ itself is not modified.
CdsLb::ChildNameState CdsLb::ComputeChildNames(
    const XdsConfig::ClusterConfig* old_cluster,
    const XdsConfig::ClusterConfig& new_cluster,
    const XdsConfig::ClusterConfig::EndpointConfig& endpoint_config) const {
  CHECK(!absl::holds_alternative<XdsConfig::ClusterConfig::AggregateConfig>(
      new_cluster.children));
  // First, build some maps from locality to child number and the reverse
  // from old_cluster and child_name_state_.
  std::map<XdsLocalityName*, size_t /*child_number*/, XdsLocalityName::Less>
      locality_child_map;
  std::map<size_t, std::set<XdsLocalityName*, XdsLocalityName::Less>>
      child_locality_map;
  if (old_cluster != nullptr) {
    // Null when the old cluster was an aggregate cluster; in that case
    // nothing can be reused.
    auto* old_endpoint_config =
        absl::get_if<XdsConfig::ClusterConfig::EndpointConfig>(
            &old_cluster->children);
    if (old_endpoint_config != nullptr) {
      const auto& prev_priority_list =
          GetUpdatePriorityList(old_endpoint_config->endpoints.get());
      for (size_t priority = 0; priority < prev_priority_list.size();
           ++priority) {
        size_t child_number =
            child_name_state_.priority_child_numbers[priority];
        const auto& localities = prev_priority_list[priority].localities;
        for (const auto& p : localities) {
          XdsLocalityName* locality_name = p.first;
          locality_child_map[locality_name] = child_number;
          child_locality_map[child_number].insert(locality_name);
        }
      }
    }
  }
  // Now construct new state containing priority child numbers for the new
  // cluster based on the maps constructed above.
  ChildNameState new_child_name_state;
  new_child_name_state.next_available_child_number =
      child_name_state_.next_available_child_number;
  const XdsEndpointResource::PriorityList& priority_list =
      GetUpdatePriorityList(endpoint_config.endpoints.get());
  for (size_t priority = 0; priority < priority_list.size(); ++priority) {
    const auto& localities = priority_list[priority].localities;
    absl::optional<size_t> child_number;
    // If one of the localities in this priority already existed, reuse its
    // child number.
    for (const auto& p : localities) {
      XdsLocalityName* locality_name = p.first;
      if (!child_number.has_value()) {
        auto it = locality_child_map.find(locality_name);
        if (it != locality_child_map.end()) {
          child_number = it->second;
          locality_child_map.erase(it);
          // Remove localities that *used* to be in this child number, so
          // that we don't incorrectly reuse this child number for a
          // subsequent priority.
          for (XdsLocalityName* old_locality :
               child_locality_map[*child_number]) {
            locality_child_map.erase(old_locality);
          }
        }
      } else {
        // Remove all localities that are now in this child number, so
        // that we don't accidentally reuse this child number for a
        // subsequent priority.
        locality_child_map.erase(locality_name);
      }
    }
    // If we didn't find an existing child number, assign a new one.
    if (!child_number.has_value()) {
      // Scan upward from next_available_child_number, skipping any number
      // that already has an entry in child_locality_map.
      for (child_number = new_child_name_state.next_available_child_number;
           child_locality_map.find(*child_number) != child_locality_map.end();
           ++(*child_number)) {
      }
      new_child_name_state.next_available_child_number = *child_number + 1;
      // Add entry so we know that the child number is in use.
      // (Don't need to add the list of localities, since we won't use them.)
      child_locality_map[*child_number];
    }
    new_child_name_state.priority_child_numbers.push_back(*child_number);
  }
  return new_child_name_state;
}
538
CreateChildPolicyConfigForLeafCluster(const XdsConfig::ClusterConfig & new_cluster,const XdsConfig::ClusterConfig::EndpointConfig & endpoint_config,const XdsClusterResource * aggregate_cluster_resource)539 Json CdsLb::CreateChildPolicyConfigForLeafCluster(
540 const XdsConfig::ClusterConfig& new_cluster,
541 const XdsConfig::ClusterConfig::EndpointConfig& endpoint_config,
542 const XdsClusterResource* aggregate_cluster_resource) {
543 const auto& cluster_resource = *new_cluster.cluster;
544 const bool is_logical_dns =
545 absl::holds_alternative<XdsClusterResource::LogicalDns>(
546 cluster_resource.type);
547 // Determine what xDS LB policy to use.
548 Json xds_lb_policy;
549 if (is_logical_dns) {
550 xds_lb_policy = Json::FromArray({
551 Json::FromObject({
552 {"pick_first", Json::FromObject({})},
553 }),
554 });
555 }
556 // TODO(roth): Remove this "else if" block after the 1.63 release.
557 else if (XdsAggregateClusterBackwardCompatibilityEnabled() &&
558 aggregate_cluster_resource != nullptr) {
559 xds_lb_policy =
560 Json::FromArray(aggregate_cluster_resource->lb_policy_config);
561 } else {
562 xds_lb_policy = Json::FromArray(new_cluster.cluster->lb_policy_config);
563 }
564 // Wrap it in the priority policy.
565 Json::Object priority_children;
566 Json::Array priority_priorities;
567 const auto& priority_list =
568 GetUpdatePriorityList(endpoint_config.endpoints.get());
569 for (size_t priority = 0; priority < priority_list.size(); ++priority) {
570 // Add priority entry, with the appropriate child name.
571 std::string child_name = MakeChildPolicyName(
572 cluster_name_, child_name_state_.priority_child_numbers[priority]);
573 priority_priorities.emplace_back(Json::FromString(child_name));
574 Json::Object child_config = {{"config", xds_lb_policy}};
575 if (!is_logical_dns) {
576 child_config["ignore_reresolution_requests"] = Json::FromBool(true);
577 }
578 priority_children[child_name] = Json::FromObject(std::move(child_config));
579 }
580 Json priority_policy = Json::FromArray({Json::FromObject({
581 {"priority_experimental",
582 Json::FromObject({
583 {"children", Json::FromObject(std::move(priority_children))},
584 {"priorities", Json::FromArray(std::move(priority_priorities))},
585 })},
586 })});
587 // Wrap the priority policy in the xds_override_host policy.
588 Json xds_override_host_policy = Json::FromArray({Json::FromObject({
589 {"xds_override_host_experimental",
590 Json::FromObject({
591 {"clusterName", Json::FromString(cluster_name_)},
592 {"childPolicy", std::move(priority_policy)},
593 })},
594 })});
595 // Wrap the xds_override_host policy in the xds_cluster_impl policy.
596 Json xds_cluster_impl_policy = Json::FromArray({Json::FromObject({
597 {"xds_cluster_impl_experimental",
598 Json::FromObject({
599 {"clusterName", Json::FromString(cluster_name_)},
600 {"childPolicy", std::move(xds_override_host_policy)},
601 })},
602 })});
603 // Wrap the xds_cluster_impl policy in the outlier_detection policy.
604 Json::Object outlier_detection_config = {
605 {"childPolicy", std::move(xds_cluster_impl_policy)},
606 };
607 if (cluster_resource.outlier_detection.has_value()) {
608 auto& outlier_detection_update = *cluster_resource.outlier_detection;
609 outlier_detection_config["interval"] =
610 Json::FromString(outlier_detection_update.interval.ToJsonString());
611 outlier_detection_config["baseEjectionTime"] = Json::FromString(
612 outlier_detection_update.base_ejection_time.ToJsonString());
613 outlier_detection_config["maxEjectionTime"] = Json::FromString(
614 outlier_detection_update.max_ejection_time.ToJsonString());
615 outlier_detection_config["maxEjectionPercent"] =
616 Json::FromNumber(outlier_detection_update.max_ejection_percent);
617 if (outlier_detection_update.success_rate_ejection.has_value()) {
618 outlier_detection_config["successRateEjection"] = Json::FromObject({
619 {"stdevFactor",
620 Json::FromNumber(
621 outlier_detection_update.success_rate_ejection->stdev_factor)},
622 {"enforcementPercentage",
623 Json::FromNumber(outlier_detection_update.success_rate_ejection
624 ->enforcement_percentage)},
625 {"minimumHosts",
626 Json::FromNumber(
627 outlier_detection_update.success_rate_ejection->minimum_hosts)},
628 {"requestVolume",
629 Json::FromNumber(
630 outlier_detection_update.success_rate_ejection->request_volume)},
631 });
632 }
633 if (outlier_detection_update.failure_percentage_ejection.has_value()) {
634 outlier_detection_config["failurePercentageEjection"] = Json::FromObject({
635 {"threshold",
636 Json::FromNumber(outlier_detection_update
637 .failure_percentage_ejection->threshold)},
638 {"enforcementPercentage",
639 Json::FromNumber(
640 outlier_detection_update.failure_percentage_ejection
641 ->enforcement_percentage)},
642 {"minimumHosts",
643 Json::FromNumber(outlier_detection_update
644 .failure_percentage_ejection->minimum_hosts)},
645 {"requestVolume",
646 Json::FromNumber(outlier_detection_update
647 .failure_percentage_ejection->request_volume)},
648 });
649 }
650 }
651 Json outlier_detection_policy = Json::FromArray({Json::FromObject({
652 {"outlier_detection_experimental",
653 Json::FromObject(std::move(outlier_detection_config))},
654 })});
655 GRPC_TRACE_LOG(cds_lb, INFO)
656 << "[cdslb " << this << "] generated config for child policy: "
657 << JsonDump(outlier_detection_policy, /*indent=*/1);
658 return outlier_detection_policy;
659 }
660
CreateChildPolicyConfigForAggregateCluster(const XdsConfig::ClusterConfig::AggregateConfig & aggregate_config)661 Json CdsLb::CreateChildPolicyConfigForAggregateCluster(
662 const XdsConfig::ClusterConfig::AggregateConfig& aggregate_config) {
663 Json::Object priority_children;
664 Json::Array priority_priorities;
665 for (const absl::string_view& leaf_cluster : aggregate_config.leaf_clusters) {
666 priority_children[std::string(leaf_cluster)] = Json::FromObject({
667 {"config",
668 Json::FromArray({
669 Json::FromObject({
670 {"cds_experimental",
671 Json::FromObject({
672 {"cluster", Json::FromString(std::string(leaf_cluster))},
673 })},
674 }),
675 })},
676 });
677 priority_priorities.emplace_back(
678 Json::FromString(std::string(leaf_cluster)));
679 }
680 Json json = Json::FromArray({Json::FromObject({
681 {"priority_experimental",
682 Json::FromObject({
683 {"children", Json::FromObject(std::move(priority_children))},
684 {"priorities", Json::FromArray(std::move(priority_priorities))},
685 })},
686 })});
687 GRPC_TRACE_LOG(cds_lb, INFO)
688 << "[cdslb " << this << "] generated config for child policy: "
689 << JsonDump(json, /*indent=*/1);
690 return json;
691 }
692
ResetState()693 void CdsLb::ResetState() {
694 cluster_name_.clear();
695 xds_config_.reset();
696 child_name_state_.Reset();
697 if (child_policy_ != nullptr) {
698 grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
699 interested_parties());
700 child_policy_.reset();
701 }
702 }
703
ReportTransientFailure(absl::Status status)704 void CdsLb::ReportTransientFailure(absl::Status status) {
705 GRPC_TRACE_LOG(cds_lb, INFO)
706 << "[cdslb " << this << "] reporting TRANSIENT_FAILURE: " << status;
707 ResetState();
708 channel_control_helper()->UpdateState(
709 GRPC_CHANNEL_TRANSIENT_FAILURE, status,
710 MakeRefCounted<TransientFailurePicker>(status));
711 }
712
713 //
714 // factory
715 //
716
717 class CdsLbFactory final : public LoadBalancingPolicyFactory {
718 public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const719 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
720 LoadBalancingPolicy::Args args) const override {
721 return MakeOrphanable<CdsLb>(std::move(args));
722 }
723
name() const724 absl::string_view name() const override { return kCds; }
725
726 absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const Json & json) const727 ParseLoadBalancingConfig(const Json& json) const override {
728 return LoadFromJson<RefCountedPtr<CdsLbConfig>>(
729 json, JsonArgs(), "errors validating cds LB policy config");
730 }
731 };
732
733 } // namespace
734
RegisterCdsLbPolicy(CoreConfiguration::Builder * builder)735 void RegisterCdsLbPolicy(CoreConfiguration::Builder* builder) {
736 builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
737 std::make_unique<CdsLbFactory>());
738 }
739
740 } // namespace grpc_core
741