• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include <set>
20 #include <string>
21 #include <vector>
22 
23 #include "absl/status/status.h"
24 #include "absl/strings/str_cat.h"
25 #include "absl/strings/string_view.h"
26 
27 #include <grpc/grpc.h>
28 
29 #include "src/core/ext/filters/client_channel/lb_policy.h"
30 #include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
31 #include "src/core/ext/filters/client_channel/lb_policy_factory.h"
32 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
33 #include "src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h"
34 #include "src/core/lib/channel/channel_args.h"
35 #include "src/core/lib/gpr/string.h"
36 #include "src/core/lib/gprpp/orphanable.h"
37 #include "src/core/lib/gprpp/ref_counted_ptr.h"
38 #include "src/core/lib/iomgr/timer.h"
39 #include "src/core/lib/iomgr/work_serializer.h"
40 #include "src/core/lib/transport/error_utils.h"
41 
// Delay, in milliseconds (15 minutes), added to the current time when arming
// the delayed-removal timer of a deactivated child.
#define GRPC_XDS_CLUSTER_MANAGER_CHILD_RETENTION_INTERVAL_MS (15 * 60 * 1000)
43 
44 namespace grpc_core {
45 
// Trace flag checked with GRPC_TRACE_FLAG_ENABLED() before each log statement
// in this file; constructed with the name "xds_cluster_manager_lb".
TraceFlag grpc_xds_cluster_manager_lb_trace(false, "xds_cluster_manager_lb");
47 
48 namespace {
49 
// Policy name returned by name() on the config, the policy and the factory.
constexpr char kXdsClusterManager[] = "xds_cluster_manager_experimental";
51 
52 // Config for xds_cluster_manager LB policy.
53 class XdsClusterManagerLbConfig : public LoadBalancingPolicy::Config {
54  public:
55   using ClusterMap =
56       std::map<std::string, RefCountedPtr<LoadBalancingPolicy::Config>>;
57 
XdsClusterManagerLbConfig(ClusterMap cluster_map)58   explicit XdsClusterManagerLbConfig(ClusterMap cluster_map)
59       : cluster_map_(std::move(cluster_map)) {}
60 
name() const61   const char* name() const override { return kXdsClusterManager; }
62 
cluster_map() const63   const ClusterMap& cluster_map() const { return cluster_map_; }
64 
65  private:
66   ClusterMap cluster_map_;
67 };
68 
69 // xds_cluster_manager LB policy.
70 class XdsClusterManagerLb : public LoadBalancingPolicy {
71  public:
72   explicit XdsClusterManagerLb(Args args);
73 
name() const74   const char* name() const override { return kXdsClusterManager; }
75 
76   void UpdateLocked(UpdateArgs args) override;
77   void ExitIdleLocked() override;
78   void ResetBackoffLocked() override;
79 
80  private:
81   // A simple wrapper for ref-counting a picker from the child policy.
82   class ChildPickerWrapper : public RefCounted<ChildPickerWrapper> {
83    public:
ChildPickerWrapper(std::string name,std::unique_ptr<SubchannelPicker> picker)84     ChildPickerWrapper(std::string name,
85                        std::unique_ptr<SubchannelPicker> picker)
86         : name_(std::move(name)), picker_(std::move(picker)) {}
Pick(PickArgs args)87     PickResult Pick(PickArgs args) { return picker_->Pick(args); }
88 
name() const89     const std::string& name() const { return name_; }
90 
91    private:
92     std::string name_;
93     std::unique_ptr<SubchannelPicker> picker_;
94   };
95 
96   // Picks a child using prefix or path matching and then delegates to that
97   // child's picker.
98   class ClusterPicker : public SubchannelPicker {
99    public:
100     // Maintains a map of cluster names to pickers.
101     using ClusterMap = std::map<absl::string_view /*cluster_name*/,
102                                 RefCountedPtr<ChildPickerWrapper>>;
103 
104     // It is required that the keys of cluster_map have to live at least as long
105     // as the ClusterPicker instance.
ClusterPicker(ClusterMap cluster_map)106     explicit ClusterPicker(ClusterMap cluster_map)
107         : cluster_map_(std::move(cluster_map)) {}
108 
109     PickResult Pick(PickArgs args) override;
110 
111    private:
112     ClusterMap cluster_map_;
113   };
114 
115   // Each ClusterChild holds a ref to its parent XdsClusterManagerLb.
116   class ClusterChild : public InternallyRefCounted<ClusterChild> {
117    public:
118     ClusterChild(RefCountedPtr<XdsClusterManagerLb> xds_cluster_manager_policy,
119                  const std::string& name);
120     ~ClusterChild() override;
121 
122     void Orphan() override;
123 
124     void UpdateLocked(RefCountedPtr<LoadBalancingPolicy::Config> config,
125                       const ServerAddressList& addresses,
126                       const grpc_channel_args* args);
127     void ExitIdleLocked();
128     void ResetBackoffLocked();
129     void DeactivateLocked();
130 
connectivity_state() const131     grpc_connectivity_state connectivity_state() const {
132       return connectivity_state_;
133     }
picker_wrapper() const134     RefCountedPtr<ChildPickerWrapper> picker_wrapper() const {
135       return picker_wrapper_;
136     }
137 
138    private:
139     class Helper : public ChannelControlHelper {
140      public:
Helper(RefCountedPtr<ClusterChild> xds_cluster_manager_child)141       explicit Helper(RefCountedPtr<ClusterChild> xds_cluster_manager_child)
142           : xds_cluster_manager_child_(std::move(xds_cluster_manager_child)) {}
143 
~Helper()144       ~Helper() override {
145         xds_cluster_manager_child_.reset(DEBUG_LOCATION, "Helper");
146       }
147 
148       RefCountedPtr<SubchannelInterface> CreateSubchannel(
149           ServerAddress address, const grpc_channel_args& args) override;
150       void UpdateState(grpc_connectivity_state state,
151                        const absl::Status& status,
152                        std::unique_ptr<SubchannelPicker> picker) override;
153       void RequestReresolution() override;
154       void AddTraceEvent(TraceSeverity severity,
155                          absl::string_view message) override;
156 
157      private:
158       RefCountedPtr<ClusterChild> xds_cluster_manager_child_;
159     };
160 
161     // Methods for dealing with the child policy.
162     OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
163         const grpc_channel_args* args);
164 
165     static void OnDelayedRemovalTimer(void* arg, grpc_error* error);
166     void OnDelayedRemovalTimerLocked(grpc_error* error);
167 
168     // The owning LB policy.
169     RefCountedPtr<XdsClusterManagerLb> xds_cluster_manager_policy_;
170 
171     // Points to the corresponding key in children map.
172     const std::string name_;
173 
174     OrphanablePtr<LoadBalancingPolicy> child_policy_;
175 
176     RefCountedPtr<ChildPickerWrapper> picker_wrapper_;
177     grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_IDLE;
178     bool seen_failure_since_ready_ = false;
179 
180     // States for delayed removal.
181     grpc_timer delayed_removal_timer_;
182     grpc_closure on_delayed_removal_timer_;
183     bool delayed_removal_timer_callback_pending_ = false;
184     bool shutdown_ = false;
185   };
186 
187   ~XdsClusterManagerLb() override;
188 
189   void ShutdownLocked() override;
190 
191   void UpdateStateLocked();
192 
193   // Current config from the resolver.
194   RefCountedPtr<XdsClusterManagerLbConfig> config_;
195 
196   // Internal state.
197   bool shutting_down_ = false;
198 
199   // Children.
200   std::map<std::string, OrphanablePtr<ClusterChild>> children_;
201 };
202 
203 //
204 // XdsClusterManagerLb::ClusterPicker
205 //
206 
Pick(PickArgs args)207 XdsClusterManagerLb::PickResult XdsClusterManagerLb::ClusterPicker::Pick(
208     PickArgs args) {
209   auto cluster_name =
210       args.call_state->ExperimentalGetCallAttribute(kXdsClusterAttribute);
211   auto it = cluster_map_.find(cluster_name);
212   if (it != cluster_map_.end()) {
213     return it->second->Pick(args);
214   }
215   PickResult result;
216   result.type = PickResult::PICK_FAILED;
217   result.error = grpc_error_set_int(
218       GRPC_ERROR_CREATE_FROM_COPIED_STRING(
219           absl::StrCat("xds cluster manager picker: unknown cluster \"",
220                        cluster_name, "\"")
221               .c_str()),
222       GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL);
223   return result;
224 }
225 
226 //
227 // XdsClusterManagerLb
228 //
229 
// Forwards the construction arguments to the LoadBalancingPolicy base class.
// children_ starts empty; entries are only added by UpdateLocked().
XdsClusterManagerLb::XdsClusterManagerLb(Args args)
    : LoadBalancingPolicy(std::move(args)) {}
232 
// Destructor.  Does no explicit cleanup of its own; it only emits a trace log
// when tracing is enabled.
XdsClusterManagerLb::~XdsClusterManagerLb() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
    gpr_log(
        GPR_INFO,
        "[xds_cluster_manager_lb %p] destroying xds_cluster_manager LB policy",
        this);
  }
}
241 
// Marks the policy as shutting down and releases every child.
void XdsClusterManagerLb::ShutdownLocked() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
    gpr_log(GPR_INFO, "[xds_cluster_manager_lb %p] shutting down", this);
  }
  // Set the flag first: the children and their helpers check it and turn
  // into no-ops once it is set.
  shutting_down_ = true;
  // Destroys the OrphanablePtrs held by the map.
  children_.clear();
}
249 
ExitIdleLocked()250 void XdsClusterManagerLb::ExitIdleLocked() {
251   for (auto& p : children_) p.second->ExitIdleLocked();
252 }
253 
ResetBackoffLocked()254 void XdsClusterManagerLb::ResetBackoffLocked() {
255   for (auto& p : children_) p.second->ResetBackoffLocked();
256 }
257 
UpdateLocked(UpdateArgs args)258 void XdsClusterManagerLb::UpdateLocked(UpdateArgs args) {
259   if (shutting_down_) return;
260   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
261     gpr_log(GPR_INFO, "[xds_cluster_manager_lb %p] Received update", this);
262   }
263   // Update config.
264   config_ = std::move(args.config);
265   // Deactivate the children not in the new config.
266   for (const auto& p : children_) {
267     const std::string& name = p.first;
268     ClusterChild* child = p.second.get();
269     if (config_->cluster_map().find(name) == config_->cluster_map().end()) {
270       child->DeactivateLocked();
271     }
272   }
273   // Add or update the children in the new config.
274   for (const auto& p : config_->cluster_map()) {
275     const std::string& name = p.first;
276     const RefCountedPtr<LoadBalancingPolicy::Config>& config = p.second;
277     auto it = children_.find(name);
278     if (it == children_.end()) {
279       it = children_
280                .emplace(name, MakeOrphanable<ClusterChild>(
281                                   Ref(DEBUG_LOCATION, "ClusterChild"), name))
282                .first;
283     }
284     it->second->UpdateLocked(config, args.addresses, args.args);
285   }
286   UpdateStateLocked();
287 }
288 
UpdateStateLocked()289 void XdsClusterManagerLb::UpdateStateLocked() {
290   // Also count the number of children in each state, to determine the
291   // overall state.
292   size_t num_ready = 0;
293   size_t num_connecting = 0;
294   size_t num_idle = 0;
295   size_t num_transient_failures = 0;
296   for (const auto& p : children_) {
297     const auto& child_name = p.first;
298     const ClusterChild* child = p.second.get();
299     // Skip the children that are not in the latest update.
300     if (config_->cluster_map().find(child_name) ==
301         config_->cluster_map().end()) {
302       continue;
303     }
304     switch (child->connectivity_state()) {
305       case GRPC_CHANNEL_READY: {
306         ++num_ready;
307         break;
308       }
309       case GRPC_CHANNEL_CONNECTING: {
310         ++num_connecting;
311         break;
312       }
313       case GRPC_CHANNEL_IDLE: {
314         ++num_idle;
315         break;
316       }
317       case GRPC_CHANNEL_TRANSIENT_FAILURE: {
318         ++num_transient_failures;
319         break;
320       }
321       default:
322         GPR_UNREACHABLE_CODE(return );
323     }
324   }
325   // Determine aggregated connectivity state.
326   grpc_connectivity_state connectivity_state;
327   if (num_ready > 0) {
328     connectivity_state = GRPC_CHANNEL_READY;
329   } else if (num_connecting > 0) {
330     connectivity_state = GRPC_CHANNEL_CONNECTING;
331   } else if (num_idle > 0) {
332     connectivity_state = GRPC_CHANNEL_IDLE;
333   } else {
334     connectivity_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
335   }
336   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
337     gpr_log(GPR_INFO, "[xds_cluster_manager_lb %p] connectivity changed to %s",
338             this, ConnectivityStateName(connectivity_state));
339   }
340   std::unique_ptr<SubchannelPicker> picker;
341   absl::Status status;
342   switch (connectivity_state) {
343     case GRPC_CHANNEL_READY: {
344       ClusterPicker::ClusterMap cluster_map;
345       for (const auto& p : config_->cluster_map()) {
346         const std::string& cluster_name = p.first;
347         RefCountedPtr<ChildPickerWrapper>& child_picker =
348             cluster_map[cluster_name];
349         child_picker = children_[cluster_name]->picker_wrapper();
350         if (child_picker == nullptr) {
351           if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
352             gpr_log(
353                 GPR_INFO,
354                 "[xds_cluster_manager_lb %p] child %s has not yet returned a "
355                 "picker; creating a QueuePicker.",
356                 this, cluster_name.c_str());
357           }
358           child_picker = MakeRefCounted<ChildPickerWrapper>(
359               cluster_name, absl::make_unique<QueuePicker>(
360                                 Ref(DEBUG_LOCATION, "QueuePicker")));
361         }
362       }
363       picker = absl::make_unique<ClusterPicker>(std::move(cluster_map));
364       break;
365     }
366     case GRPC_CHANNEL_CONNECTING:
367     case GRPC_CHANNEL_IDLE:
368       picker =
369           absl::make_unique<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker"));
370       break;
371     default:
372       grpc_error* error = grpc_error_set_int(
373           GRPC_ERROR_CREATE_FROM_STATIC_STRING(
374               "TRANSIENT_FAILURE from XdsClusterManagerLb"),
375           GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
376       status = grpc_error_to_absl_status(error);
377       picker = absl::make_unique<TransientFailurePicker>(error);
378   }
379   channel_control_helper()->UpdateState(connectivity_state, status,
380                                         std::move(picker));
381 }
382 
383 //
384 // XdsClusterManagerLb::ClusterChild
385 //
386 
// Creates the per-cluster state.  Takes over the given ref to the owning
// policy and stores a copy of the cluster name.  The child policy itself is
// not created here; UpdateLocked() creates it on first use.
XdsClusterManagerLb::ClusterChild::ClusterChild(
    RefCountedPtr<XdsClusterManagerLb> xds_cluster_manager_policy,
    const std::string& name)
    : xds_cluster_manager_policy_(std::move(xds_cluster_manager_policy)),
      name_(name) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
    gpr_log(GPR_INFO,
            "[xds_cluster_manager_lb %p] created ClusterChild %p for %s",
            xds_cluster_manager_policy_.get(), this, name_.c_str());
  }
  // Prepare the closure that the delayed-removal timer runs; it is armed
  // later, in DeactivateLocked().
  GRPC_CLOSURE_INIT(&on_delayed_removal_timer_, OnDelayedRemovalTimer, this,
                    grpc_schedule_on_exec_ctx);
}
400 
// Destructor: logs when tracing is enabled and drops the ref to the owning
// policy that was taken at construction.
XdsClusterManagerLb::ClusterChild::~ClusterChild() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
    gpr_log(GPR_INFO,
            "[xds_cluster_manager_lb %p] ClusterChild %p: destroying "
            "child",
            xds_cluster_manager_policy_.get(), this);
  }
  xds_cluster_manager_policy_.reset(DEBUG_LOCATION, "ClusterChild");
}
410 
// Shuts this child down: detaches and releases the child policy, drops the
// cached picker, cancels a pending removal timer and releases one ref on
// this object.  Dereferences child_policy_ without a null check.
void XdsClusterManagerLb::ClusterChild::Orphan() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
    gpr_log(GPR_INFO,
            "[xds_cluster_manager_lb %p] ClusterChild %p %s: "
            "shutting down child",
            xds_cluster_manager_policy_.get(), this, name_.c_str());
  }
  // Remove the child policy's interested_parties pollset_set from the
  // xDS policy.
  grpc_pollset_set_del_pollset_set(
      child_policy_->interested_parties(),
      xds_cluster_manager_policy_->interested_parties());
  child_policy_.reset();
  // Drop our ref to the child's picker, in case it's holding a ref to
  // the child.
  picker_wrapper_.reset();
  // A removal timer is armed if this child was deactivated earlier.
  if (delayed_removal_timer_callback_pending_) {
    grpc_timer_cancel(&delayed_removal_timer_);
  }
  // Tells a timer callback that runs after this point not to erase this
  // child from the parent's children_ map.
  shutdown_ = true;
  // NOTE(review): presumably releases the ref held on behalf of the owning
  // OrphanablePtr -- confirm against InternallyRefCounted.
  Unref();
}
433 
434 OrphanablePtr<LoadBalancingPolicy>
CreateChildPolicyLocked(const grpc_channel_args * args)435 XdsClusterManagerLb::ClusterChild::CreateChildPolicyLocked(
436     const grpc_channel_args* args) {
437   LoadBalancingPolicy::Args lb_policy_args;
438   lb_policy_args.work_serializer =
439       xds_cluster_manager_policy_->work_serializer();
440   lb_policy_args.args = args;
441   lb_policy_args.channel_control_helper =
442       absl::make_unique<Helper>(this->Ref(DEBUG_LOCATION, "Helper"));
443   OrphanablePtr<LoadBalancingPolicy> lb_policy =
444       MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
445                                          &grpc_xds_cluster_manager_lb_trace);
446   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
447     gpr_log(GPR_INFO,
448             "[xds_cluster_manager_lb %p] ClusterChild %p %s: Created "
449             "new child "
450             "policy handler %p",
451             xds_cluster_manager_policy_.get(), this, name_.c_str(),
452             lb_policy.get());
453   }
454   // Add the xDS's interested_parties pollset_set to that of the newly created
455   // child policy. This will make the child policy progress upon activity on
456   // xDS LB, which in turn is tied to the application's call.
457   grpc_pollset_set_add_pollset_set(
458       lb_policy->interested_parties(),
459       xds_cluster_manager_policy_->interested_parties());
460   return lb_policy;
461 }
462 
UpdateLocked(RefCountedPtr<LoadBalancingPolicy::Config> config,const ServerAddressList & addresses,const grpc_channel_args * args)463 void XdsClusterManagerLb::ClusterChild::UpdateLocked(
464     RefCountedPtr<LoadBalancingPolicy::Config> config,
465     const ServerAddressList& addresses, const grpc_channel_args* args) {
466   if (xds_cluster_manager_policy_->shutting_down_) return;
467   // Update child weight.
468   // Reactivate if needed.
469   if (delayed_removal_timer_callback_pending_) {
470     delayed_removal_timer_callback_pending_ = false;
471     grpc_timer_cancel(&delayed_removal_timer_);
472   }
473   // Create child policy if needed.
474   if (child_policy_ == nullptr) {
475     child_policy_ = CreateChildPolicyLocked(args);
476   }
477   // Construct update args.
478   UpdateArgs update_args;
479   update_args.config = std::move(config);
480   update_args.addresses = addresses;
481   update_args.args = grpc_channel_args_copy(args);
482   // Update the policy.
483   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
484     gpr_log(GPR_INFO,
485             "[xds_cluster_manager_lb %p] ClusterChild %p %s: "
486             "Updating child "
487             "policy handler %p",
488             xds_cluster_manager_policy_.get(), this, name_.c_str(),
489             child_policy_.get());
490   }
491   child_policy_->UpdateLocked(std::move(update_args));
492 }
493 
// Forwards the exit-idle request to the child policy.  child_policy_ is
// dereferenced without a null check.
void XdsClusterManagerLb::ClusterChild::ExitIdleLocked() {
  child_policy_->ExitIdleLocked();
}
497 
// Forwards the reset-backoff request to the child policy.  child_policy_ is
// dereferenced without a null check.
void XdsClusterManagerLb::ClusterChild::ResetBackoffLocked() {
  child_policy_->ResetBackoffLocked();
}
501 
DeactivateLocked()502 void XdsClusterManagerLb::ClusterChild::DeactivateLocked() {
503   // If already deactivated, don't do that again.
504   if (delayed_removal_timer_callback_pending_ == true) return;
505   // Set the child weight to 0 so that future picker won't contain this child.
506   // Start a timer to delete the child.
507   Ref(DEBUG_LOCATION, "ClusterChild+timer").release();
508   grpc_timer_init(&delayed_removal_timer_,
509                   ExecCtx::Get()->Now() +
510                       GRPC_XDS_CLUSTER_MANAGER_CHILD_RETENTION_INTERVAL_MS,
511                   &on_delayed_removal_timer_);
512   delayed_removal_timer_callback_pending_ = true;
513 }
514 
OnDelayedRemovalTimer(void * arg,grpc_error * error)515 void XdsClusterManagerLb::ClusterChild::OnDelayedRemovalTimer(
516     void* arg, grpc_error* error) {
517   ClusterChild* self = static_cast<ClusterChild*>(arg);
518   GRPC_ERROR_REF(error);  // Ref owned by the lambda
519   self->xds_cluster_manager_policy_->work_serializer()->Run(
520       [self, error]() { self->OnDelayedRemovalTimerLocked(error); },
521       DEBUG_LOCATION);
522 }
523 
OnDelayedRemovalTimerLocked(grpc_error * error)524 void XdsClusterManagerLb::ClusterChild::OnDelayedRemovalTimerLocked(
525     grpc_error* error) {
526   delayed_removal_timer_callback_pending_ = false;
527   if (error == GRPC_ERROR_NONE && !shutdown_) {
528     xds_cluster_manager_policy_->children_.erase(name_);
529   }
530   Unref(DEBUG_LOCATION, "ClusterChild+timer");
531   GRPC_ERROR_UNREF(error);
532 }
533 
534 //
535 // XdsClusterManagerLb::ClusterChild::Helper
536 //
537 
538 RefCountedPtr<SubchannelInterface>
CreateSubchannel(ServerAddress address,const grpc_channel_args & args)539 XdsClusterManagerLb::ClusterChild::Helper::CreateSubchannel(
540     ServerAddress address, const grpc_channel_args& args) {
541   if (xds_cluster_manager_child_->xds_cluster_manager_policy_->shutting_down_) {
542     return nullptr;
543   }
544   return xds_cluster_manager_child_->xds_cluster_manager_policy_
545       ->channel_control_helper()
546       ->CreateSubchannel(std::move(address), args);
547 }
548 
UpdateState(grpc_connectivity_state state,const absl::Status & status,std::unique_ptr<SubchannelPicker> picker)549 void XdsClusterManagerLb::ClusterChild::Helper::UpdateState(
550     grpc_connectivity_state state, const absl::Status& status,
551     std::unique_ptr<SubchannelPicker> picker) {
552   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
553     gpr_log(
554         GPR_INFO,
555         "[xds_cluster_manager_lb %p] child %s: received update: state=%s (%s) "
556         "picker=%p",
557         xds_cluster_manager_child_->xds_cluster_manager_policy_.get(),
558         xds_cluster_manager_child_->name_.c_str(), ConnectivityStateName(state),
559         status.ToString().c_str(), picker.get());
560   }
561   if (xds_cluster_manager_child_->xds_cluster_manager_policy_->shutting_down_) {
562     return;
563   }
564   // Cache the picker in the ClusterChild.
565   xds_cluster_manager_child_->picker_wrapper_ =
566       MakeRefCounted<ChildPickerWrapper>(xds_cluster_manager_child_->name_,
567                                          std::move(picker));
568   // Decide what state to report for aggregation purposes.
569   // If we haven't seen a failure since the last time we were in state
570   // READY, then we report the state change as-is.  However, once we do see
571   // a failure, we report TRANSIENT_FAILURE and ignore any subsequent state
572   // changes until we go back into state READY.
573   if (!xds_cluster_manager_child_->seen_failure_since_ready_) {
574     if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
575       xds_cluster_manager_child_->seen_failure_since_ready_ = true;
576     }
577   } else {
578     if (state != GRPC_CHANNEL_READY) return;
579     xds_cluster_manager_child_->seen_failure_since_ready_ = false;
580   }
581   xds_cluster_manager_child_->connectivity_state_ = state;
582   // Notify the LB policy.
583   xds_cluster_manager_child_->xds_cluster_manager_policy_->UpdateStateLocked();
584 }
585 
RequestReresolution()586 void XdsClusterManagerLb::ClusterChild::Helper::RequestReresolution() {
587   if (xds_cluster_manager_child_->xds_cluster_manager_policy_->shutting_down_) {
588     return;
589   }
590   xds_cluster_manager_child_->xds_cluster_manager_policy_
591       ->channel_control_helper()
592       ->RequestReresolution();
593 }
594 
AddTraceEvent(TraceSeverity severity,absl::string_view message)595 void XdsClusterManagerLb::ClusterChild::Helper::AddTraceEvent(
596     TraceSeverity severity, absl::string_view message) {
597   if (xds_cluster_manager_child_->xds_cluster_manager_policy_->shutting_down_) {
598     return;
599   }
600   xds_cluster_manager_child_->xds_cluster_manager_policy_
601       ->channel_control_helper()
602       ->AddTraceEvent(severity, message);
603 }
604 
605 //
606 // factory
607 //
608 
609 class XdsClusterManagerLbFactory : public LoadBalancingPolicyFactory {
610  public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const611   OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
612       LoadBalancingPolicy::Args args) const override {
613     return MakeOrphanable<XdsClusterManagerLb>(std::move(args));
614   }
615 
name() const616   const char* name() const override { return kXdsClusterManager; }
617 
ParseLoadBalancingConfig(const Json & json,grpc_error ** error) const618   RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
619       const Json& json, grpc_error** error) const override {
620     GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
621     if (json.type() == Json::Type::JSON_NULL) {
622       // xds_cluster_manager was mentioned as a policy in the deprecated
623       // loadBalancingPolicy field or in the client API.
624       *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
625           "field:loadBalancingPolicy error:xds_cluster_manager policy requires "
626           "configuration.  Please use loadBalancingConfig field of service "
627           "config instead.");
628       return nullptr;
629     }
630     std::vector<grpc_error*> error_list;
631     XdsClusterManagerLbConfig::ClusterMap cluster_map;
632     std::set<std::string /*cluster_name*/> clusters_to_be_used;
633     auto it = json.object_value().find("children");
634     if (it == json.object_value().end()) {
635       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
636           "field:children error:required field not present"));
637     } else if (it->second.type() != Json::Type::OBJECT) {
638       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
639           "field:children error:type should be object"));
640     } else {
641       for (const auto& p : it->second.object_value()) {
642         const std::string& child_name = p.first;
643         if (child_name.empty()) {
644           error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
645               "field:children element error: name cannot be empty"));
646           continue;
647         }
648         RefCountedPtr<LoadBalancingPolicy::Config> child_config;
649         std::vector<grpc_error*> child_errors =
650             ParseChildConfig(p.second, &child_config);
651         if (!child_errors.empty()) {
652           // Can't use GRPC_ERROR_CREATE_FROM_VECTOR() here, because the error
653           // string is not static in this case.
654           grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(
655               absl::StrCat("field:children name:", child_name).c_str());
656           for (grpc_error* child_error : child_errors) {
657             error = grpc_error_add_child(error, child_error);
658           }
659           error_list.push_back(error);
660         } else {
661           cluster_map[child_name] = std::move(child_config);
662           clusters_to_be_used.insert(child_name);
663         }
664       }
665     }
666     if (cluster_map.empty()) {
667       error_list.push_back(
668           GRPC_ERROR_CREATE_FROM_STATIC_STRING("no valid children configured"));
669     }
670     if (!error_list.empty()) {
671       *error = GRPC_ERROR_CREATE_FROM_VECTOR(
672           "xds_cluster_manager_experimental LB policy config", &error_list);
673       return nullptr;
674     }
675     return MakeRefCounted<XdsClusterManagerLbConfig>(std::move(cluster_map));
676   }
677 
678  private:
ParseChildConfig(const Json & json,RefCountedPtr<LoadBalancingPolicy::Config> * child_config)679   static std::vector<grpc_error*> ParseChildConfig(
680       const Json& json,
681       RefCountedPtr<LoadBalancingPolicy::Config>* child_config) {
682     std::vector<grpc_error*> error_list;
683     if (json.type() != Json::Type::OBJECT) {
684       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
685           "value should be of type object"));
686       return error_list;
687     }
688     auto it = json.object_value().find("childPolicy");
689     if (it == json.object_value().end()) {
690       error_list.push_back(
691           GRPC_ERROR_CREATE_FROM_STATIC_STRING("did not find childPolicy"));
692     } else {
693       grpc_error* parse_error = GRPC_ERROR_NONE;
694       *child_config = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
695           it->second, &parse_error);
696       if (*child_config == nullptr) {
697         GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE);
698         std::vector<grpc_error*> child_errors;
699         child_errors.push_back(parse_error);
700         error_list.push_back(
701             GRPC_ERROR_CREATE_FROM_VECTOR("field:childPolicy", &child_errors));
702       }
703     }
704     return error_list;
705   }
706 };
707 
708 }  // namespace
709 
710 }  // namespace grpc_core
711 
712 //
713 // Plugin registration
714 //
715 
// Plugin init hook: registers the xds_cluster_manager factory with the LB
// policy registry builder.
void grpc_lb_policy_xds_cluster_manager_init() {
  grpc_core::LoadBalancingPolicyRegistry::Builder::
      RegisterLoadBalancingPolicyFactory(
          absl::make_unique<grpc_core::XdsClusterManagerLbFactory>());
}
721 
// Plugin shutdown hook: intentionally empty.
void grpc_lb_policy_xds_cluster_manager_shutdown() {}
723