• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/event_engine/event_engine.h>
18 #include <grpc/impl/connectivity_state.h>
19 #include <grpc/support/port_platform.h>
20 #include <stddef.h>
21 
22 #include <algorithm>
23 #include <functional>
24 #include <map>
25 #include <memory>
26 #include <string>
27 #include <type_traits>
28 #include <utility>
29 #include <vector>
30 
31 #include "absl/log/log.h"
32 #include "absl/status/status.h"
33 #include "absl/status/statusor.h"
34 #include "absl/strings/str_cat.h"
35 #include "absl/strings/str_join.h"
36 #include "absl/strings/string_view.h"
37 #include "absl/types/optional.h"
38 #include "src/core/client_channel/client_channel_internal.h"
39 #include "src/core/config/core_configuration.h"
40 #include "src/core/lib/channel/channel_args.h"
41 #include "src/core/lib/debug/trace.h"
42 #include "src/core/lib/iomgr/exec_ctx.h"
43 #include "src/core/lib/iomgr/pollset_set.h"
44 #include "src/core/lib/transport/connectivity_state.h"
45 #include "src/core/load_balancing/child_policy_handler.h"
46 #include "src/core/load_balancing/delegating_helper.h"
47 #include "src/core/load_balancing/lb_policy.h"
48 #include "src/core/load_balancing/lb_policy_factory.h"
49 #include "src/core/load_balancing/lb_policy_registry.h"
50 #include "src/core/resolver/endpoint_addresses.h"
51 #include "src/core/resolver/xds/xds_resolver_attributes.h"
52 #include "src/core/util/debug_location.h"
53 #include "src/core/util/json/json.h"
54 #include "src/core/util/json/json_args.h"
55 #include "src/core/util/json/json_object_loader.h"
56 #include "src/core/util/orphanable.h"
57 #include "src/core/util/ref_counted_ptr.h"
58 #include "src/core/util/time.h"
59 #include "src/core/util/validation_errors.h"
60 #include "src/core/util/work_serializer.h"
61 
62 namespace grpc_core {
63 
64 namespace {
65 
66 using ::grpc_event_engine::experimental::EventEngine;
67 
68 constexpr Duration kChildRetentionInterval = Duration::Minutes(15);
69 constexpr absl::string_view kXdsClusterManager =
70     "xds_cluster_manager_experimental";
71 
72 // Config for xds_cluster_manager LB policy.
73 class XdsClusterManagerLbConfig final : public LoadBalancingPolicy::Config {
74  public:
75   struct Child {
76     RefCountedPtr<LoadBalancingPolicy::Config> config;
77 
78     static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
79     void JsonPostLoad(const Json& json, const JsonArgs&,
80                       ValidationErrors* errors);
81   };
82 
83   XdsClusterManagerLbConfig() = default;
84 
85   XdsClusterManagerLbConfig(const XdsClusterManagerLbConfig&) = delete;
86   XdsClusterManagerLbConfig& operator=(const XdsClusterManagerLbConfig&) =
87       delete;
88 
89   XdsClusterManagerLbConfig(XdsClusterManagerLbConfig&& other) = delete;
90   XdsClusterManagerLbConfig& operator=(XdsClusterManagerLbConfig&& other) =
91       delete;
92 
name() const93   absl::string_view name() const override { return kXdsClusterManager; }
94 
cluster_map() const95   const std::map<std::string, Child>& cluster_map() const {
96     return cluster_map_;
97   }
98 
99   static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
100 
101  private:
102   std::map<std::string, Child> cluster_map_;
103 };
104 
105 // xds_cluster_manager LB policy.
106 class XdsClusterManagerLb final : public LoadBalancingPolicy {
107  public:
108   explicit XdsClusterManagerLb(Args args);
109 
name() const110   absl::string_view name() const override { return kXdsClusterManager; }
111 
112   absl::Status UpdateLocked(UpdateArgs args) override;
113   void ExitIdleLocked() override;
114   void ResetBackoffLocked() override;
115 
116  private:
117   // Picks a child using prefix or path matching and then delegates to that
118   // child's picker.
119   class ClusterPicker final : public SubchannelPicker {
120    public:
121     // Maintains a map of cluster names to pickers.
122     using ClusterMap = std::map<std::string /*cluster_name*/,
123                                 RefCountedPtr<SubchannelPicker>, std::less<>>;
124 
125     // It is required that the keys of cluster_map have to live at least as long
126     // as the ClusterPicker instance.
ClusterPicker(ClusterMap cluster_map)127     explicit ClusterPicker(ClusterMap cluster_map)
128         : cluster_map_(std::move(cluster_map)) {}
129 
130     PickResult Pick(PickArgs args) override;
131 
132    private:
133     ClusterMap cluster_map_;
134   };
135 
136   // Each ClusterChild holds a ref to its parent XdsClusterManagerLb.
137   class ClusterChild final : public InternallyRefCounted<ClusterChild> {
138    public:
139     ClusterChild(RefCountedPtr<XdsClusterManagerLb> xds_cluster_manager_policy,
140                  const std::string& name);
141     ~ClusterChild() override;
142 
143     void Orphan() override;
144 
145     absl::Status UpdateLocked(
146         RefCountedPtr<LoadBalancingPolicy::Config> config,
147         const absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>>&
148             addresses,
149         const ChannelArgs& args);
150     void ExitIdleLocked();
151     void ResetBackoffLocked();
152     void DeactivateLocked();
153 
connectivity_state() const154     grpc_connectivity_state connectivity_state() const {
155       return connectivity_state_;
156     }
picker() const157     RefCountedPtr<SubchannelPicker> picker() const { return picker_; }
158 
159    private:
160     class Helper final : public DelegatingChannelControlHelper {
161      public:
Helper(RefCountedPtr<ClusterChild> xds_cluster_manager_child)162       explicit Helper(RefCountedPtr<ClusterChild> xds_cluster_manager_child)
163           : xds_cluster_manager_child_(std::move(xds_cluster_manager_child)) {}
164 
~Helper()165       ~Helper() override {
166         xds_cluster_manager_child_.reset(DEBUG_LOCATION, "Helper");
167       }
168 
169       void UpdateState(grpc_connectivity_state state,
170                        const absl::Status& status,
171                        RefCountedPtr<SubchannelPicker> picker) override;
172 
173      private:
parent_helper() const174       ChannelControlHelper* parent_helper() const override {
175         return xds_cluster_manager_child_->xds_cluster_manager_policy_
176             ->channel_control_helper();
177       }
178 
179       RefCountedPtr<ClusterChild> xds_cluster_manager_child_;
180     };
181 
182     // Methods for dealing with the child policy.
183     OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
184         const ChannelArgs& args);
185 
186     void OnDelayedRemovalTimerLocked();
187 
188     // The owning LB policy.
189     RefCountedPtr<XdsClusterManagerLb> xds_cluster_manager_policy_;
190 
191     // Points to the corresponding key in children map.
192     const std::string name_;
193 
194     OrphanablePtr<LoadBalancingPolicy> child_policy_;
195 
196     RefCountedPtr<SubchannelPicker> picker_;
197     grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_CONNECTING;
198 
199     // States for delayed removal.
200     absl::optional<EventEngine::TaskHandle> delayed_removal_timer_handle_;
201     bool shutdown_ = false;
202   };
203 
204   ~XdsClusterManagerLb() override;
205 
206   void ShutdownLocked() override;
207 
208   void UpdateStateLocked();
209 
210   // Current config from the resolver.
211   RefCountedPtr<XdsClusterManagerLbConfig> config_;
212 
213   // Internal state.
214   bool shutting_down_ = false;
215   bool update_in_progress_ = false;
216 
217   // Children.
218   std::map<std::string, OrphanablePtr<ClusterChild>> children_;
219 };
220 
221 //
222 // XdsClusterManagerLb::ClusterPicker
223 //
224 
Pick(PickArgs args)225 XdsClusterManagerLb::PickResult XdsClusterManagerLb::ClusterPicker::Pick(
226     PickArgs args) {
227   auto* call_state = static_cast<ClientChannelLbCallState*>(args.call_state);
228   auto* cluster_name_attribute =
229       call_state->GetCallAttribute<XdsClusterAttribute>();
230   absl::string_view cluster_name;
231   if (cluster_name_attribute != nullptr) {
232     cluster_name = cluster_name_attribute->cluster();
233   }
234   auto it = cluster_map_.find(cluster_name);
235   if (it != cluster_map_.end()) {
236     return it->second->Pick(args);
237   }
238   return PickResult::Fail(absl::InternalError(absl::StrCat(
239       "xds cluster manager picker: unknown cluster \"", cluster_name, "\"")));
240 }
241 
242 //
243 // XdsClusterManagerLb
244 //
245 
XdsClusterManagerLb(Args args)246 XdsClusterManagerLb::XdsClusterManagerLb(Args args)
247     : LoadBalancingPolicy(std::move(args)) {}
248 
~XdsClusterManagerLb()249 XdsClusterManagerLb::~XdsClusterManagerLb() {
250   GRPC_TRACE_LOG(xds_cluster_manager_lb, INFO)
251       << "[xds_cluster_manager_lb " << this
252       << "] destroying xds_cluster_manager LB policy";
253 }
254 
ShutdownLocked()255 void XdsClusterManagerLb::ShutdownLocked() {
256   GRPC_TRACE_LOG(xds_cluster_manager_lb, INFO)
257       << "[xds_cluster_manager_lb " << this << "] shutting down";
258   shutting_down_ = true;
259   children_.clear();
260 }
261 
ExitIdleLocked()262 void XdsClusterManagerLb::ExitIdleLocked() {
263   for (auto& p : children_) p.second->ExitIdleLocked();
264 }
265 
ResetBackoffLocked()266 void XdsClusterManagerLb::ResetBackoffLocked() {
267   for (auto& p : children_) p.second->ResetBackoffLocked();
268 }
269 
UpdateLocked(UpdateArgs args)270 absl::Status XdsClusterManagerLb::UpdateLocked(UpdateArgs args) {
271   if (shutting_down_) return absl::OkStatus();
272   GRPC_TRACE_LOG(xds_cluster_manager_lb, INFO)
273       << "[xds_cluster_manager_lb " << this << "] Received update";
274   update_in_progress_ = true;
275   // Update config.
276   config_ = args.config.TakeAsSubclass<XdsClusterManagerLbConfig>();
277   // Deactivate the children not in the new config.
278   for (const auto& p : children_) {
279     const std::string& name = p.first;
280     ClusterChild* child = p.second.get();
281     if (config_->cluster_map().find(name) == config_->cluster_map().end()) {
282       child->DeactivateLocked();
283     }
284   }
285   // Add or update the children in the new config.
286   std::vector<std::string> errors;
287   for (const auto& p : config_->cluster_map()) {
288     const std::string& name = p.first;
289     const RefCountedPtr<LoadBalancingPolicy::Config>& config = p.second.config;
290     auto& child = children_[name];
291     if (child == nullptr) {
292       child = MakeOrphanable<ClusterChild>(
293           RefAsSubclass<XdsClusterManagerLb>(DEBUG_LOCATION, "ClusterChild"),
294           name);
295     }
296     absl::Status status =
297         child->UpdateLocked(config, args.addresses, args.args);
298     if (!status.ok()) {
299       errors.emplace_back(
300           absl::StrCat("child ", name, ": ", status.ToString()));
301     }
302   }
303   update_in_progress_ = false;
304   UpdateStateLocked();
305   // Return status.
306   if (!errors.empty()) {
307     return absl::UnavailableError(absl::StrCat(
308         "errors from children: [", absl::StrJoin(errors, "; "), "]"));
309   }
310   return absl::OkStatus();
311 }
312 
UpdateStateLocked()313 void XdsClusterManagerLb::UpdateStateLocked() {
314   // If we're in the process of propagating an update from our parent to
315   // our children, ignore any updates that come from the children.  We
316   // will instead return a new picker once the update has been seen by
317   // all children.  This avoids unnecessary picker churn while an update
318   // is being propagated to our children.
319   if (update_in_progress_) return;
320   // Also count the number of children in each state, to determine the
321   // overall state.
322   size_t num_ready = 0;
323   size_t num_connecting = 0;
324   size_t num_idle = 0;
325   for (const auto& p : children_) {
326     const auto& child_name = p.first;
327     const ClusterChild* child = p.second.get();
328     // Skip the children that are not in the latest update.
329     if (config_->cluster_map().find(child_name) ==
330         config_->cluster_map().end()) {
331       continue;
332     }
333     switch (child->connectivity_state()) {
334       case GRPC_CHANNEL_READY: {
335         ++num_ready;
336         break;
337       }
338       case GRPC_CHANNEL_CONNECTING: {
339         ++num_connecting;
340         break;
341       }
342       case GRPC_CHANNEL_IDLE: {
343         ++num_idle;
344         break;
345       }
346       case GRPC_CHANNEL_TRANSIENT_FAILURE: {
347         break;
348       }
349       default:
350         GPR_UNREACHABLE_CODE(return);
351     }
352   }
353   // Determine aggregated connectivity state.
354   grpc_connectivity_state connectivity_state;
355   if (num_ready > 0) {
356     connectivity_state = GRPC_CHANNEL_READY;
357   } else if (num_connecting > 0) {
358     connectivity_state = GRPC_CHANNEL_CONNECTING;
359   } else if (num_idle > 0) {
360     connectivity_state = GRPC_CHANNEL_IDLE;
361   } else {
362     connectivity_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
363   }
364   GRPC_TRACE_LOG(xds_cluster_manager_lb, INFO)
365       << "[xds_cluster_manager_lb " << this << "] connectivity changed to "
366       << ConnectivityStateName(connectivity_state);
367   ClusterPicker::ClusterMap cluster_map;
368   for (const auto& p : config_->cluster_map()) {
369     const std::string& cluster_name = p.first;
370     RefCountedPtr<SubchannelPicker>& child_picker = cluster_map[cluster_name];
371     child_picker = children_[cluster_name]->picker();
372     if (child_picker == nullptr) {
373       GRPC_TRACE_LOG(xds_cluster_manager_lb, INFO)
374           << "[xds_cluster_manager_lb " << this << "] child " << cluster_name
375           << " has not yet returned a picker; creating a QueuePicker.";
376       child_picker =
377           MakeRefCounted<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker"));
378     }
379   }
380   auto picker = MakeRefCounted<ClusterPicker>(std::move(cluster_map));
381   absl::Status status;
382   if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
383     status = absl::Status(absl::StatusCode::kUnavailable,
384                           "TRANSIENT_FAILURE from XdsClusterManagerLb");
385   }
386   channel_control_helper()->UpdateState(connectivity_state, status,
387                                         std::move(picker));
388 }
389 
390 //
391 // XdsClusterManagerLb::ClusterChild
392 //
393 
ClusterChild(RefCountedPtr<XdsClusterManagerLb> xds_cluster_manager_policy,const std::string & name)394 XdsClusterManagerLb::ClusterChild::ClusterChild(
395     RefCountedPtr<XdsClusterManagerLb> xds_cluster_manager_policy,
396     const std::string& name)
397     : xds_cluster_manager_policy_(std::move(xds_cluster_manager_policy)),
398       name_(name),
399       picker_(MakeRefCounted<QueuePicker>(nullptr)) {
400   GRPC_TRACE_LOG(xds_cluster_manager_lb, INFO)
401       << "[xds_cluster_manager_lb " << xds_cluster_manager_policy_.get()
402       << "] created ClusterChild " << this << " for " << name_;
403 }
404 
~ClusterChild()405 XdsClusterManagerLb::ClusterChild::~ClusterChild() {
406   GRPC_TRACE_LOG(xds_cluster_manager_lb, INFO)
407       << "[xds_cluster_manager_lb " << xds_cluster_manager_policy_.get()
408       << "] ClusterChild " << this << ": destroying child";
409   xds_cluster_manager_policy_.reset(DEBUG_LOCATION, "ClusterChild");
410 }
411 
Orphan()412 void XdsClusterManagerLb::ClusterChild::Orphan() {
413   GRPC_TRACE_LOG(xds_cluster_manager_lb, INFO)
414       << "[xds_cluster_manager_lb " << xds_cluster_manager_policy_.get()
415       << "] ClusterChild " << this << " " << name_ << ": shutting down child";
416   // Remove the child policy's interested_parties pollset_set from the
417   // xDS policy.
418   grpc_pollset_set_del_pollset_set(
419       child_policy_->interested_parties(),
420       xds_cluster_manager_policy_->interested_parties());
421   child_policy_.reset();
422   // Drop our ref to the child's picker, in case it's holding a ref to
423   // the child.
424   picker_.reset();
425   if (delayed_removal_timer_handle_.has_value()) {
426     xds_cluster_manager_policy_->channel_control_helper()
427         ->GetEventEngine()
428         ->Cancel(*delayed_removal_timer_handle_);
429   }
430   shutdown_ = true;
431   Unref();
432 }
433 
434 OrphanablePtr<LoadBalancingPolicy>
CreateChildPolicyLocked(const ChannelArgs & args)435 XdsClusterManagerLb::ClusterChild::CreateChildPolicyLocked(
436     const ChannelArgs& args) {
437   LoadBalancingPolicy::Args lb_policy_args;
438   lb_policy_args.work_serializer =
439       xds_cluster_manager_policy_->work_serializer();
440   lb_policy_args.args = args;
441   lb_policy_args.channel_control_helper =
442       std::make_unique<Helper>(this->Ref(DEBUG_LOCATION, "Helper"));
443   OrphanablePtr<LoadBalancingPolicy> lb_policy =
444       MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
445                                          &xds_cluster_manager_lb_trace);
446   GRPC_TRACE_LOG(xds_cluster_manager_lb, INFO)
447       << "[xds_cluster_manager_lb " << xds_cluster_manager_policy_.get()
448       << "] ClusterChild " << this << " " << name_
449       << ": Created new child policy handler " << lb_policy.get();
450   // Add the xDS's interested_parties pollset_set to that of the newly created
451   // child policy. This will make the child policy progress upon activity on
452   // xDS LB, which in turn is tied to the application's call.
453   grpc_pollset_set_add_pollset_set(
454       lb_policy->interested_parties(),
455       xds_cluster_manager_policy_->interested_parties());
456   return lb_policy;
457 }
458 
UpdateLocked(RefCountedPtr<LoadBalancingPolicy::Config> config,const absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> & addresses,const ChannelArgs & args)459 absl::Status XdsClusterManagerLb::ClusterChild::UpdateLocked(
460     RefCountedPtr<LoadBalancingPolicy::Config> config,
461     const absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>>& addresses,
462     const ChannelArgs& args) {
463   if (xds_cluster_manager_policy_->shutting_down_) return absl::OkStatus();
464   // Update child weight.
465   // Reactivate if needed.
466   if (delayed_removal_timer_handle_.has_value() &&
467       xds_cluster_manager_policy_->channel_control_helper()
468           ->GetEventEngine()
469           ->Cancel(*delayed_removal_timer_handle_)) {
470     delayed_removal_timer_handle_.reset();
471   }
472   // Create child policy if needed.
473   if (child_policy_ == nullptr) {
474     child_policy_ = CreateChildPolicyLocked(args);
475   }
476   // Construct update args.
477   UpdateArgs update_args;
478   update_args.config = std::move(config);
479   update_args.addresses = addresses;
480   update_args.args = args;
481   // Update the policy.
482   GRPC_TRACE_LOG(xds_cluster_manager_lb, INFO)
483       << "[xds_cluster_manager_lb " << xds_cluster_manager_policy_.get()
484       << "] ClusterChild " << this << " " << name_
485       << ": Updating child policy handler " << child_policy_.get();
486   return child_policy_->UpdateLocked(std::move(update_args));
487 }
488 
ExitIdleLocked()489 void XdsClusterManagerLb::ClusterChild::ExitIdleLocked() {
490   child_policy_->ExitIdleLocked();
491 }
492 
ResetBackoffLocked()493 void XdsClusterManagerLb::ClusterChild::ResetBackoffLocked() {
494   child_policy_->ResetBackoffLocked();
495 }
496 
DeactivateLocked()497 void XdsClusterManagerLb::ClusterChild::DeactivateLocked() {
498   // If already deactivated, don't do that again.
499   if (delayed_removal_timer_handle_.has_value()) return;
500   // Start a timer to delete the child.
501   delayed_removal_timer_handle_ =
502       xds_cluster_manager_policy_->channel_control_helper()
503           ->GetEventEngine()
504           ->RunAfter(
505               kChildRetentionInterval,
506               [self = Ref(DEBUG_LOCATION, "ClusterChild+timer")]() mutable {
507                 ApplicationCallbackExecCtx application_exec_ctx;
508                 ExecCtx exec_ctx;
509                 auto* self_ptr = self.get();  // Avoid use-after-move problem.
510                 self_ptr->xds_cluster_manager_policy_->work_serializer()->Run(
511                     [self = std::move(self)]() {
512                       self->OnDelayedRemovalTimerLocked();
513                     },
514                     DEBUG_LOCATION);
515               });
516 }
517 
OnDelayedRemovalTimerLocked()518 void XdsClusterManagerLb::ClusterChild::OnDelayedRemovalTimerLocked() {
519   delayed_removal_timer_handle_.reset();
520   if (!shutdown_) {
521     xds_cluster_manager_policy_->children_.erase(name_);
522   }
523 }
524 
525 //
526 // XdsClusterManagerLb::ClusterChild::Helper
527 //
528 
UpdateState(grpc_connectivity_state state,const absl::Status & status,RefCountedPtr<SubchannelPicker> picker)529 void XdsClusterManagerLb::ClusterChild::Helper::UpdateState(
530     grpc_connectivity_state state, const absl::Status& status,
531     RefCountedPtr<SubchannelPicker> picker) {
532   GRPC_TRACE_LOG(xds_cluster_manager_lb, INFO)
533       << "[xds_cluster_manager_lb "
534       << xds_cluster_manager_child_->xds_cluster_manager_policy_.get()
535       << "] child " << xds_cluster_manager_child_->name_
536       << ": received update: state=" << ConnectivityStateName(state) << " ("
537       << status << ") picker=" << picker.get();
538   if (xds_cluster_manager_child_->xds_cluster_manager_policy_->shutting_down_) {
539     return;
540   }
541   // Cache the picker in the ClusterChild.
542   xds_cluster_manager_child_->picker_ = std::move(picker);
543   // Decide what state to report for aggregation purposes.
544   // If the last recorded state was TRANSIENT_FAILURE and the new state
545   // is something other than READY, don't change the state.
546   if (xds_cluster_manager_child_->connectivity_state_ !=
547           GRPC_CHANNEL_TRANSIENT_FAILURE ||
548       state == GRPC_CHANNEL_READY) {
549     xds_cluster_manager_child_->connectivity_state_ = state;
550   }
551   // Notify the LB policy.
552   xds_cluster_manager_child_->xds_cluster_manager_policy_->UpdateStateLocked();
553 }
554 
555 //
556 // factory
557 //
558 
JsonLoader(const JsonArgs &)559 const JsonLoaderInterface* XdsClusterManagerLbConfig::Child::JsonLoader(
560     const JsonArgs&) {
561   // Note: The "childPolicy" field requires custom processing, so
562   // it's handled in JsonPostLoad() instead.
563   static const auto* loader = JsonObjectLoader<Child>().Finish();
564   return loader;
565 }
566 
JsonPostLoad(const Json & json,const JsonArgs &,ValidationErrors * errors)567 void XdsClusterManagerLbConfig::Child::JsonPostLoad(const Json& json,
568                                                     const JsonArgs&,
569                                                     ValidationErrors* errors) {
570   ValidationErrors::ScopedField field(errors, ".childPolicy");
571   auto it = json.object().find("childPolicy");
572   if (it == json.object().end()) {
573     errors->AddError("field not present");
574     return;
575   }
576   auto lb_config =
577       CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
578           it->second);
579   if (!lb_config.ok()) {
580     errors->AddError(lb_config.status().message());
581     return;
582   }
583   config = std::move(*lb_config);
584 }
585 
JsonLoader(const JsonArgs &)586 const JsonLoaderInterface* XdsClusterManagerLbConfig::JsonLoader(
587     const JsonArgs&) {
588   static const auto* loader =
589       JsonObjectLoader<XdsClusterManagerLbConfig>()
590           .Field("children", &XdsClusterManagerLbConfig::cluster_map_)
591           .Finish();
592   return loader;
593 }
594 
595 class XdsClusterManagerLbFactory final : public LoadBalancingPolicyFactory {
596  public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const597   OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
598       LoadBalancingPolicy::Args args) const override {
599     return MakeOrphanable<XdsClusterManagerLb>(std::move(args));
600   }
601 
name() const602   absl::string_view name() const override { return kXdsClusterManager; }
603 
604   absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const Json & json) const605   ParseLoadBalancingConfig(const Json& json) const override {
606     return LoadFromJson<RefCountedPtr<XdsClusterManagerLbConfig>>(
607         json, JsonArgs(),
608         "errors validating xds_cluster_manager LB policy config");
609   }
610 };
611 
612 }  // namespace
613 
RegisterXdsClusterManagerLbPolicy(CoreConfiguration::Builder * builder)614 void RegisterXdsClusterManagerLbPolicy(CoreConfiguration::Builder* builder) {
615   builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
616       std::make_unique<XdsClusterManagerLbFactory>());
617 }
618 
619 }  // namespace grpc_core
620