1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/event_engine/event_engine.h>
18 #include <grpc/impl/connectivity_state.h>
19 #include <grpc/support/port_platform.h>
20 #include <stddef.h>
21
22 #include <algorithm>
23 #include <functional>
24 #include <map>
25 #include <memory>
26 #include <string>
27 #include <type_traits>
28 #include <utility>
29 #include <vector>
30
31 #include "absl/log/log.h"
32 #include "absl/status/status.h"
33 #include "absl/status/statusor.h"
34 #include "absl/strings/str_cat.h"
35 #include "absl/strings/str_join.h"
36 #include "absl/strings/string_view.h"
37 #include "absl/types/optional.h"
38 #include "src/core/client_channel/client_channel_internal.h"
39 #include "src/core/config/core_configuration.h"
40 #include "src/core/lib/channel/channel_args.h"
41 #include "src/core/lib/debug/trace.h"
42 #include "src/core/lib/iomgr/exec_ctx.h"
43 #include "src/core/lib/iomgr/pollset_set.h"
44 #include "src/core/lib/transport/connectivity_state.h"
45 #include "src/core/load_balancing/child_policy_handler.h"
46 #include "src/core/load_balancing/delegating_helper.h"
47 #include "src/core/load_balancing/lb_policy.h"
48 #include "src/core/load_balancing/lb_policy_factory.h"
49 #include "src/core/load_balancing/lb_policy_registry.h"
50 #include "src/core/resolver/endpoint_addresses.h"
51 #include "src/core/resolver/xds/xds_resolver_attributes.h"
52 #include "src/core/util/debug_location.h"
53 #include "src/core/util/json/json.h"
54 #include "src/core/util/json/json_args.h"
55 #include "src/core/util/json/json_object_loader.h"
56 #include "src/core/util/orphanable.h"
57 #include "src/core/util/ref_counted_ptr.h"
58 #include "src/core/util/time.h"
59 #include "src/core/util/validation_errors.h"
60 #include "src/core/util/work_serializer.h"
61
62 namespace grpc_core {
63
64 namespace {
65
using ::grpc_event_engine::experimental::EventEngine;

// Delay handed to the delayed-removal timer that is started when a child is
// deactivated (see ClusterChild::DeactivateLocked()).
constexpr Duration kChildRetentionInterval = Duration::Minutes(15);
// LB policy name returned by name() on the config, the policy, and the
// factory defined in this file.
constexpr absl::string_view kXdsClusterManager =
    "xds_cluster_manager_experimental";
71
72 // Config for xds_cluster_manager LB policy.
73 class XdsClusterManagerLbConfig final : public LoadBalancingPolicy::Config {
74 public:
75 struct Child {
76 RefCountedPtr<LoadBalancingPolicy::Config> config;
77
78 static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
79 void JsonPostLoad(const Json& json, const JsonArgs&,
80 ValidationErrors* errors);
81 };
82
83 XdsClusterManagerLbConfig() = default;
84
85 XdsClusterManagerLbConfig(const XdsClusterManagerLbConfig&) = delete;
86 XdsClusterManagerLbConfig& operator=(const XdsClusterManagerLbConfig&) =
87 delete;
88
89 XdsClusterManagerLbConfig(XdsClusterManagerLbConfig&& other) = delete;
90 XdsClusterManagerLbConfig& operator=(XdsClusterManagerLbConfig&& other) =
91 delete;
92
name() const93 absl::string_view name() const override { return kXdsClusterManager; }
94
cluster_map() const95 const std::map<std::string, Child>& cluster_map() const {
96 return cluster_map_;
97 }
98
99 static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
100
101 private:
102 std::map<std::string, Child> cluster_map_;
103 };
104
// xds_cluster_manager LB policy.
//
// Owns one ClusterChild per cluster named in the config and hands the channel
// a ClusterPicker that dispatches each pick to the picker of the cluster named
// in the call's XdsClusterAttribute.
class XdsClusterManagerLb final : public LoadBalancingPolicy {
 public:
  explicit XdsClusterManagerLb(Args args);

  absl::string_view name() const override { return kXdsClusterManager; }

  absl::Status UpdateLocked(UpdateArgs args) override;
  void ExitIdleLocked() override;
  void ResetBackoffLocked() override;

 private:
  // Looks up the cluster name carried in the call's XdsClusterAttribute
  // (exact-match map lookup) and delegates the pick to that cluster's picker.
  // Picks for a name that is not in the map fail.
  class ClusterPicker final : public SubchannelPicker {
   public:
    // Maintains a map of cluster names to pickers.  The transparent
    // comparator (std::less<>) lets Pick() look up by absl::string_view.
    using ClusterMap = std::map<std::string /*cluster_name*/,
                                RefCountedPtr<SubchannelPicker>, std::less<>>;

    // Takes ownership of cluster_map; the keys are std::string values owned
    // by the map itself.
    explicit ClusterPicker(ClusterMap cluster_map)
        : cluster_map_(std::move(cluster_map)) {}

    PickResult Pick(PickArgs args) override;

   private:
    ClusterMap cluster_map_;
  };

  // Each ClusterChild holds a ref to its parent XdsClusterManagerLb.
  // A ClusterChild wraps the child policy for one cluster and caches the
  // most recent picker and connectivity state reported by that policy.
  class ClusterChild final : public InternallyRefCounted<ClusterChild> {
   public:
    ClusterChild(RefCountedPtr<XdsClusterManagerLb> xds_cluster_manager_policy,
                 const std::string& name);
    ~ClusterChild() override;

    void Orphan() override;

    // Creates the child policy if it does not exist yet and forwards the
    // update to it; returns the child policy's status.
    absl::Status UpdateLocked(
        RefCountedPtr<LoadBalancingPolicy::Config> config,
        const absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>>&
            addresses,
        const ChannelArgs& args);
    void ExitIdleLocked();
    void ResetBackoffLocked();
    // Starts the delayed-removal timer unless one is already pending.
    void DeactivateLocked();

    grpc_connectivity_state connectivity_state() const {
      return connectivity_state_;
    }
    RefCountedPtr<SubchannelPicker> picker() const { return picker_; }

   private:
    // Channel control helper given to the child policy.  UpdateState() is
    // handled here; parent_helper() exposes the owning policy's helper to the
    // DelegatingChannelControlHelper base.
    class Helper final : public DelegatingChannelControlHelper {
     public:
      explicit Helper(RefCountedPtr<ClusterChild> xds_cluster_manager_child)
          : xds_cluster_manager_child_(std::move(xds_cluster_manager_child)) {}

      ~Helper() override {
        // Release the ref with the same reason string it was taken with.
        xds_cluster_manager_child_.reset(DEBUG_LOCATION, "Helper");
      }

      void UpdateState(grpc_connectivity_state state,
                       const absl::Status& status,
                       RefCountedPtr<SubchannelPicker> picker) override;

     private:
      ChannelControlHelper* parent_helper() const override {
        return xds_cluster_manager_child_->xds_cluster_manager_policy_
            ->channel_control_helper();
      }

      RefCountedPtr<ClusterChild> xds_cluster_manager_child_;
    };

    // Methods for dealing with the child policy.
    OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
        const ChannelArgs& args);

    // Runs in the work serializer after the delayed-removal timer fires.
    void OnDelayedRemovalTimerLocked();

    // The owning LB policy.
    RefCountedPtr<XdsClusterManagerLb> xds_cluster_manager_policy_;

    // Cluster name of this child (an owned copy); it is also the key under
    // which the child is stored in the parent's children_ map.
    const std::string name_;

    OrphanablePtr<LoadBalancingPolicy> child_policy_;

    // Last picker and (aggregation) state reported through Helper.
    RefCountedPtr<SubchannelPicker> picker_;
    grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_CONNECTING;

    // States for delayed removal.
    // The handle is set while a removal timer started by DeactivateLocked()
    // is considered pending; shutdown_ is set by Orphan() and checked by
    // OnDelayedRemovalTimerLocked().
    absl::optional<EventEngine::TaskHandle> delayed_removal_timer_handle_;
    bool shutdown_ = false;
  };

  ~XdsClusterManagerLb() override;

  void ShutdownLocked() override;

  // Recomputes the aggregated connectivity state and picker and reports them
  // to the channel; a no-op while update_in_progress_ is set.
  void UpdateStateLocked();

  // Current config from the resolver.
  RefCountedPtr<XdsClusterManagerLbConfig> config_;

  // Internal state.
  // shutting_down_ is set by ShutdownLocked(); update_in_progress_ is set
  // while UpdateLocked() is pushing an update to the children.
  bool shutting_down_ = false;
  bool update_in_progress_ = false;

  // Children, keyed by cluster name.
  std::map<std::string, OrphanablePtr<ClusterChild>> children_;
};
220
221 //
222 // XdsClusterManagerLb::ClusterPicker
223 //
224
Pick(PickArgs args)225 XdsClusterManagerLb::PickResult XdsClusterManagerLb::ClusterPicker::Pick(
226 PickArgs args) {
227 auto* call_state = static_cast<ClientChannelLbCallState*>(args.call_state);
228 auto* cluster_name_attribute =
229 call_state->GetCallAttribute<XdsClusterAttribute>();
230 absl::string_view cluster_name;
231 if (cluster_name_attribute != nullptr) {
232 cluster_name = cluster_name_attribute->cluster();
233 }
234 auto it = cluster_map_.find(cluster_name);
235 if (it != cluster_map_.end()) {
236 return it->second->Pick(args);
237 }
238 return PickResult::Fail(absl::InternalError(absl::StrCat(
239 "xds cluster manager picker: unknown cluster \"", cluster_name, "\"")));
240 }
241
242 //
243 // XdsClusterManagerLb
244 //
245
// Hands the construction args to the LoadBalancingPolicy base.  No children
// exist until the first UpdateLocked() creates them.
XdsClusterManagerLb::XdsClusterManagerLb(Args args)
    : LoadBalancingPolicy(std::move(args)) {}
248
// Only emits a trace log; members are released by their own destructors.
XdsClusterManagerLb::~XdsClusterManagerLb() {
  GRPC_TRACE_LOG(xds_cluster_manager_lb, INFO)
      << "[xds_cluster_manager_lb " << this
      << "] destroying xds_cluster_manager LB policy";
}
254
// Marks the policy as shutting down and releases every child.
void XdsClusterManagerLb::ShutdownLocked() {
  GRPC_TRACE_LOG(xds_cluster_manager_lb, INFO)
      << "[xds_cluster_manager_lb " << this << "] shutting down";
  // Checked by UpdateLocked(), ClusterChild::UpdateLocked() and
  // Helper::UpdateState() to ignore further work.
  shutting_down_ = true;
  // Drops the OrphanablePtr held for each ClusterChild.
  children_.clear();
}
261
ExitIdleLocked()262 void XdsClusterManagerLb::ExitIdleLocked() {
263 for (auto& p : children_) p.second->ExitIdleLocked();
264 }
265
ResetBackoffLocked()266 void XdsClusterManagerLb::ResetBackoffLocked() {
267 for (auto& p : children_) p.second->ResetBackoffLocked();
268 }
269
UpdateLocked(UpdateArgs args)270 absl::Status XdsClusterManagerLb::UpdateLocked(UpdateArgs args) {
271 if (shutting_down_) return absl::OkStatus();
272 GRPC_TRACE_LOG(xds_cluster_manager_lb, INFO)
273 << "[xds_cluster_manager_lb " << this << "] Received update";
274 update_in_progress_ = true;
275 // Update config.
276 config_ = args.config.TakeAsSubclass<XdsClusterManagerLbConfig>();
277 // Deactivate the children not in the new config.
278 for (const auto& p : children_) {
279 const std::string& name = p.first;
280 ClusterChild* child = p.second.get();
281 if (config_->cluster_map().find(name) == config_->cluster_map().end()) {
282 child->DeactivateLocked();
283 }
284 }
285 // Add or update the children in the new config.
286 std::vector<std::string> errors;
287 for (const auto& p : config_->cluster_map()) {
288 const std::string& name = p.first;
289 const RefCountedPtr<LoadBalancingPolicy::Config>& config = p.second.config;
290 auto& child = children_[name];
291 if (child == nullptr) {
292 child = MakeOrphanable<ClusterChild>(
293 RefAsSubclass<XdsClusterManagerLb>(DEBUG_LOCATION, "ClusterChild"),
294 name);
295 }
296 absl::Status status =
297 child->UpdateLocked(config, args.addresses, args.args);
298 if (!status.ok()) {
299 errors.emplace_back(
300 absl::StrCat("child ", name, ": ", status.ToString()));
301 }
302 }
303 update_in_progress_ = false;
304 UpdateStateLocked();
305 // Return status.
306 if (!errors.empty()) {
307 return absl::UnavailableError(absl::StrCat(
308 "errors from children: [", absl::StrJoin(errors, "; "), "]"));
309 }
310 return absl::OkStatus();
311 }
312
UpdateStateLocked()313 void XdsClusterManagerLb::UpdateStateLocked() {
314 // If we're in the process of propagating an update from our parent to
315 // our children, ignore any updates that come from the children. We
316 // will instead return a new picker once the update has been seen by
317 // all children. This avoids unnecessary picker churn while an update
318 // is being propagated to our children.
319 if (update_in_progress_) return;
320 // Also count the number of children in each state, to determine the
321 // overall state.
322 size_t num_ready = 0;
323 size_t num_connecting = 0;
324 size_t num_idle = 0;
325 for (const auto& p : children_) {
326 const auto& child_name = p.first;
327 const ClusterChild* child = p.second.get();
328 // Skip the children that are not in the latest update.
329 if (config_->cluster_map().find(child_name) ==
330 config_->cluster_map().end()) {
331 continue;
332 }
333 switch (child->connectivity_state()) {
334 case GRPC_CHANNEL_READY: {
335 ++num_ready;
336 break;
337 }
338 case GRPC_CHANNEL_CONNECTING: {
339 ++num_connecting;
340 break;
341 }
342 case GRPC_CHANNEL_IDLE: {
343 ++num_idle;
344 break;
345 }
346 case GRPC_CHANNEL_TRANSIENT_FAILURE: {
347 break;
348 }
349 default:
350 GPR_UNREACHABLE_CODE(return);
351 }
352 }
353 // Determine aggregated connectivity state.
354 grpc_connectivity_state connectivity_state;
355 if (num_ready > 0) {
356 connectivity_state = GRPC_CHANNEL_READY;
357 } else if (num_connecting > 0) {
358 connectivity_state = GRPC_CHANNEL_CONNECTING;
359 } else if (num_idle > 0) {
360 connectivity_state = GRPC_CHANNEL_IDLE;
361 } else {
362 connectivity_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
363 }
364 GRPC_TRACE_LOG(xds_cluster_manager_lb, INFO)
365 << "[xds_cluster_manager_lb " << this << "] connectivity changed to "
366 << ConnectivityStateName(connectivity_state);
367 ClusterPicker::ClusterMap cluster_map;
368 for (const auto& p : config_->cluster_map()) {
369 const std::string& cluster_name = p.first;
370 RefCountedPtr<SubchannelPicker>& child_picker = cluster_map[cluster_name];
371 child_picker = children_[cluster_name]->picker();
372 if (child_picker == nullptr) {
373 GRPC_TRACE_LOG(xds_cluster_manager_lb, INFO)
374 << "[xds_cluster_manager_lb " << this << "] child " << cluster_name
375 << " has not yet returned a picker; creating a QueuePicker.";
376 child_picker =
377 MakeRefCounted<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker"));
378 }
379 }
380 auto picker = MakeRefCounted<ClusterPicker>(std::move(cluster_map));
381 absl::Status status;
382 if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
383 status = absl::Status(absl::StatusCode::kUnavailable,
384 "TRANSIENT_FAILURE from XdsClusterManagerLb");
385 }
386 channel_control_helper()->UpdateState(connectivity_state, status,
387 std::move(picker));
388 }
389
390 //
391 // XdsClusterManagerLb::ClusterChild
392 //
393
// Stores the ref to the owning policy and a copy of the cluster name.  The
// child starts out with a QueuePicker constructed with a null argument; the
// child policy itself is created later by UpdateLocked().
XdsClusterManagerLb::ClusterChild::ClusterChild(
    RefCountedPtr<XdsClusterManagerLb> xds_cluster_manager_policy,
    const std::string& name)
    : xds_cluster_manager_policy_(std::move(xds_cluster_manager_policy)),
      name_(name),
      picker_(MakeRefCounted<QueuePicker>(nullptr)) {
  GRPC_TRACE_LOG(xds_cluster_manager_lb, INFO)
      << "[xds_cluster_manager_lb " << xds_cluster_manager_policy_.get()
      << "] created ClusterChild " << this << " for " << name_;
}
404
// Logs, then releases the ref on the owning policy using the same reason
// string ("ClusterChild") the ref was taken with.
XdsClusterManagerLb::ClusterChild::~ClusterChild() {
  GRPC_TRACE_LOG(xds_cluster_manager_lb, INFO)
      << "[xds_cluster_manager_lb " << xds_cluster_manager_policy_.get()
      << "] ClusterChild " << this << ": destroying child";
  xds_cluster_manager_policy_.reset(DEBUG_LOCATION, "ClusterChild");
}
411
// Tears the child down: detaches and destroys the child policy, drops the
// cached picker, cancels a pending removal timer, and releases one ref on
// this object.
void XdsClusterManagerLb::ClusterChild::Orphan() {
  GRPC_TRACE_LOG(xds_cluster_manager_lb, INFO)
      << "[xds_cluster_manager_lb " << xds_cluster_manager_policy_.get()
      << "] ClusterChild " << this << " " << name_ << ": shutting down child";
  // Remove the child policy's interested_parties pollset_set from the
  // xDS policy.
  // NOTE(review): child_policy_ is dereferenced unconditionally here; this
  // relies on UpdateLocked() having created it before the child is orphaned.
  grpc_pollset_set_del_pollset_set(
      child_policy_->interested_parties(),
      xds_cluster_manager_policy_->interested_parties());
  child_policy_.reset();
  // Drop our ref to the child's picker, in case it's holding a ref to
  // the child.
  picker_.reset();
  // The result of Cancel() is ignored: if the timer callback still runs, the
  // shutdown_ flag set below makes OnDelayedRemovalTimerLocked() skip the
  // erase.
  if (delayed_removal_timer_handle_.has_value()) {
    xds_cluster_manager_policy_->channel_control_helper()
        ->GetEventEngine()
        ->Cancel(*delayed_removal_timer_handle_);
  }
  shutdown_ = true;
  // Must stay last: this may release the final ref on the object.
  Unref();
}
433
434 OrphanablePtr<LoadBalancingPolicy>
CreateChildPolicyLocked(const ChannelArgs & args)435 XdsClusterManagerLb::ClusterChild::CreateChildPolicyLocked(
436 const ChannelArgs& args) {
437 LoadBalancingPolicy::Args lb_policy_args;
438 lb_policy_args.work_serializer =
439 xds_cluster_manager_policy_->work_serializer();
440 lb_policy_args.args = args;
441 lb_policy_args.channel_control_helper =
442 std::make_unique<Helper>(this->Ref(DEBUG_LOCATION, "Helper"));
443 OrphanablePtr<LoadBalancingPolicy> lb_policy =
444 MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
445 &xds_cluster_manager_lb_trace);
446 GRPC_TRACE_LOG(xds_cluster_manager_lb, INFO)
447 << "[xds_cluster_manager_lb " << xds_cluster_manager_policy_.get()
448 << "] ClusterChild " << this << " " << name_
449 << ": Created new child policy handler " << lb_policy.get();
450 // Add the xDS's interested_parties pollset_set to that of the newly created
451 // child policy. This will make the child policy progress upon activity on
452 // xDS LB, which in turn is tied to the application's call.
453 grpc_pollset_set_add_pollset_set(
454 lb_policy->interested_parties(),
455 xds_cluster_manager_policy_->interested_parties());
456 return lb_policy;
457 }
458
UpdateLocked(RefCountedPtr<LoadBalancingPolicy::Config> config,const absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> & addresses,const ChannelArgs & args)459 absl::Status XdsClusterManagerLb::ClusterChild::UpdateLocked(
460 RefCountedPtr<LoadBalancingPolicy::Config> config,
461 const absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>>& addresses,
462 const ChannelArgs& args) {
463 if (xds_cluster_manager_policy_->shutting_down_) return absl::OkStatus();
464 // Update child weight.
465 // Reactivate if needed.
466 if (delayed_removal_timer_handle_.has_value() &&
467 xds_cluster_manager_policy_->channel_control_helper()
468 ->GetEventEngine()
469 ->Cancel(*delayed_removal_timer_handle_)) {
470 delayed_removal_timer_handle_.reset();
471 }
472 // Create child policy if needed.
473 if (child_policy_ == nullptr) {
474 child_policy_ = CreateChildPolicyLocked(args);
475 }
476 // Construct update args.
477 UpdateArgs update_args;
478 update_args.config = std::move(config);
479 update_args.addresses = addresses;
480 update_args.args = args;
481 // Update the policy.
482 GRPC_TRACE_LOG(xds_cluster_manager_lb, INFO)
483 << "[xds_cluster_manager_lb " << xds_cluster_manager_policy_.get()
484 << "] ClusterChild " << this << " " << name_
485 << ": Updating child policy handler " << child_policy_.get();
486 return child_policy_->UpdateLocked(std::move(update_args));
487 }
488
// Forwards the exit-idle request to the child policy.  child_policy_ is
// dereferenced without a null check.
void XdsClusterManagerLb::ClusterChild::ExitIdleLocked() {
  child_policy_->ExitIdleLocked();
}
492
// Forwards the reset-backoff request to the child policy.  child_policy_ is
// dereferenced without a null check.
void XdsClusterManagerLb::ClusterChild::ResetBackoffLocked() {
  child_policy_->ResetBackoffLocked();
}
496
// Schedules removal of this child after kChildRetentionInterval.  Calling it
// again while a timer handle is still stored is a no-op.
void XdsClusterManagerLb::ClusterChild::DeactivateLocked() {
  // If already deactivated, don't do that again.
  if (delayed_removal_timer_handle_.has_value()) return;
  // Start a timer to delete the child.
  // The timer callback captures a ref to this child and does not touch state
  // directly: it re-posts onto the owning policy's work serializer, where
  // OnDelayedRemovalTimerLocked() does the actual work.
  delayed_removal_timer_handle_ =
      xds_cluster_manager_policy_->channel_control_helper()
          ->GetEventEngine()
          ->RunAfter(
              kChildRetentionInterval,
              [self = Ref(DEBUG_LOCATION, "ClusterChild+timer")]() mutable {
                ApplicationCallbackExecCtx application_exec_ctx;
                ExecCtx exec_ctx;
                // The raw pointer is taken before `self` is moved into the
                // inner lambda, since the move and the use appear in the same
                // call expression.
                auto* self_ptr = self.get();  // Avoid use-after-move problem.
                self_ptr->xds_cluster_manager_policy_->work_serializer()->Run(
                    [self = std::move(self)]() {
                      self->OnDelayedRemovalTimerLocked();
                    },
                    DEBUG_LOCATION);
              });
}
517
OnDelayedRemovalTimerLocked()518 void XdsClusterManagerLb::ClusterChild::OnDelayedRemovalTimerLocked() {
519 delayed_removal_timer_handle_.reset();
520 if (!shutdown_) {
521 xds_cluster_manager_policy_->children_.erase(name_);
522 }
523 }
524
525 //
526 // XdsClusterManagerLb::ClusterChild::Helper
527 //
528
UpdateState(grpc_connectivity_state state,const absl::Status & status,RefCountedPtr<SubchannelPicker> picker)529 void XdsClusterManagerLb::ClusterChild::Helper::UpdateState(
530 grpc_connectivity_state state, const absl::Status& status,
531 RefCountedPtr<SubchannelPicker> picker) {
532 GRPC_TRACE_LOG(xds_cluster_manager_lb, INFO)
533 << "[xds_cluster_manager_lb "
534 << xds_cluster_manager_child_->xds_cluster_manager_policy_.get()
535 << "] child " << xds_cluster_manager_child_->name_
536 << ": received update: state=" << ConnectivityStateName(state) << " ("
537 << status << ") picker=" << picker.get();
538 if (xds_cluster_manager_child_->xds_cluster_manager_policy_->shutting_down_) {
539 return;
540 }
541 // Cache the picker in the ClusterChild.
542 xds_cluster_manager_child_->picker_ = std::move(picker);
543 // Decide what state to report for aggregation purposes.
544 // If the last recorded state was TRANSIENT_FAILURE and the new state
545 // is something other than READY, don't change the state.
546 if (xds_cluster_manager_child_->connectivity_state_ !=
547 GRPC_CHANNEL_TRANSIENT_FAILURE ||
548 state == GRPC_CHANNEL_READY) {
549 xds_cluster_manager_child_->connectivity_state_ = state;
550 }
551 // Notify the LB policy.
552 xds_cluster_manager_child_->xds_cluster_manager_policy_->UpdateStateLocked();
553 }
554
555 //
556 // factory
557 //
558
// Returns the JSON loader for Child.  The loader declares no fields; it is
// built once and reused on every call.
const JsonLoaderInterface* XdsClusterManagerLbConfig::Child::JsonLoader(
    const JsonArgs&) {
  // Note: The "childPolicy" field requires custom processing, so
  // it's handled in JsonPostLoad() instead.
  static const auto* loader = JsonObjectLoader<Child>().Finish();
  return loader;
}
566
JsonPostLoad(const Json & json,const JsonArgs &,ValidationErrors * errors)567 void XdsClusterManagerLbConfig::Child::JsonPostLoad(const Json& json,
568 const JsonArgs&,
569 ValidationErrors* errors) {
570 ValidationErrors::ScopedField field(errors, ".childPolicy");
571 auto it = json.object().find("childPolicy");
572 if (it == json.object().end()) {
573 errors->AddError("field not present");
574 return;
575 }
576 auto lb_config =
577 CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
578 it->second);
579 if (!lb_config.ok()) {
580 errors->AddError(lb_config.status().message());
581 return;
582 }
583 config = std::move(*lb_config);
584 }
585
// Returns the JSON loader for the policy config: the "children" field is
// loaded into cluster_map_.  The loader is built once and reused on every
// call.
const JsonLoaderInterface* XdsClusterManagerLbConfig::JsonLoader(
    const JsonArgs&) {
  static const auto* loader =
      JsonObjectLoader<XdsClusterManagerLbConfig>()
          .Field("children", &XdsClusterManagerLbConfig::cluster_map_)
          .Finish();
  return loader;
}
594
595 class XdsClusterManagerLbFactory final : public LoadBalancingPolicyFactory {
596 public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const597 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
598 LoadBalancingPolicy::Args args) const override {
599 return MakeOrphanable<XdsClusterManagerLb>(std::move(args));
600 }
601
name() const602 absl::string_view name() const override { return kXdsClusterManager; }
603
604 absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const Json & json) const605 ParseLoadBalancingConfig(const Json& json) const override {
606 return LoadFromJson<RefCountedPtr<XdsClusterManagerLbConfig>>(
607 json, JsonArgs(),
608 "errors validating xds_cluster_manager LB policy config");
609 }
610 };
611
612 } // namespace
613
RegisterXdsClusterManagerLbPolicy(CoreConfiguration::Builder * builder)614 void RegisterXdsClusterManagerLbPolicy(CoreConfiguration::Builder* builder) {
615 builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
616 std::make_unique<XdsClusterManagerLbFactory>());
617 }
618
619 } // namespace grpc_core
620