• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include "src/core/load_balancing/weighted_target/weighted_target.h"
18 
19 #include <grpc/event_engine/event_engine.h>
20 #include <grpc/impl/connectivity_state.h>
21 #include <grpc/support/port_platform.h>
22 #include <string.h>
23 
24 #include <algorithm>
25 #include <cstdint>
26 #include <map>
27 #include <memory>
28 #include <string>
29 #include <utility>
30 #include <vector>
31 
32 #include "absl/base/thread_annotations.h"
33 #include "absl/log/check.h"
34 #include "absl/log/log.h"
35 #include "absl/meta/type_traits.h"
36 #include "absl/random/random.h"
37 #include "absl/status/status.h"
38 #include "absl/status/statusor.h"
39 #include "absl/strings/str_cat.h"
40 #include "absl/strings/str_join.h"
41 #include "absl/strings/string_view.h"
42 #include "absl/types/optional.h"
43 #include "src/core/config/core_configuration.h"
44 #include "src/core/lib/channel/channel_args.h"
45 #include "src/core/lib/debug/trace.h"
46 #include "src/core/lib/iomgr/exec_ctx.h"
47 #include "src/core/lib/iomgr/pollset_set.h"
48 #include "src/core/lib/transport/connectivity_state.h"
49 #include "src/core/load_balancing/address_filtering.h"
50 #include "src/core/load_balancing/child_policy_handler.h"
51 #include "src/core/load_balancing/delegating_helper.h"
52 #include "src/core/load_balancing/lb_policy.h"
53 #include "src/core/load_balancing/lb_policy_factory.h"
54 #include "src/core/load_balancing/lb_policy_registry.h"
55 #include "src/core/resolver/endpoint_addresses.h"
56 #include "src/core/util/debug_location.h"
57 #include "src/core/util/json/json.h"
58 #include "src/core/util/json/json_args.h"
59 #include "src/core/util/json/json_object_loader.h"
60 #include "src/core/util/orphanable.h"
61 #include "src/core/util/ref_counted_ptr.h"
62 #include "src/core/util/sync.h"
63 #include "src/core/util/time.h"
64 #include "src/core/util/validation_errors.h"
65 #include "src/core/util/work_serializer.h"
66 
67 // IWYU pragma: no_include <type_traits>
68 
69 namespace grpc_core {
70 
71 namespace {
72 
73 using ::grpc_event_engine::experimental::EventEngine;
74 
75 constexpr absl::string_view kWeightedTarget = "weighted_target_experimental";
76 
77 // How long we keep a child around for after it has been removed from
78 // the config.
79 constexpr Duration kChildRetentionInterval = Duration::Minutes(15);
80 
81 // Config for weighted_target LB policy.
82 class WeightedTargetLbConfig final : public LoadBalancingPolicy::Config {
83  public:
84   struct ChildConfig {
85     uint32_t weight;
86     RefCountedPtr<LoadBalancingPolicy::Config> config;
87 
88     static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
89     void JsonPostLoad(const Json& json, const JsonArgs&,
90                       ValidationErrors* errors);
91   };
92 
93   using TargetMap = std::map<std::string, ChildConfig>;
94 
95   WeightedTargetLbConfig() = default;
96 
97   WeightedTargetLbConfig(const WeightedTargetLbConfig&) = delete;
98   WeightedTargetLbConfig& operator=(const WeightedTargetLbConfig&) = delete;
99 
100   WeightedTargetLbConfig(WeightedTargetLbConfig&& other) = delete;
101   WeightedTargetLbConfig& operator=(WeightedTargetLbConfig&& other) = delete;
102 
name() const103   absl::string_view name() const override { return kWeightedTarget; }
104 
target_map() const105   const TargetMap& target_map() const { return target_map_; }
106 
107   static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
108 
109  private:
110   TargetMap target_map_;
111 };
112 
113 // weighted_target LB policy.
114 class WeightedTargetLb final : public LoadBalancingPolicy {
115  public:
116   explicit WeightedTargetLb(Args args);
117 
name() const118   absl::string_view name() const override { return kWeightedTarget; }
119 
120   absl::Status UpdateLocked(UpdateArgs args) override;
121   void ResetBackoffLocked() override;
122 
123  private:
124   // Picks a child using stateless WRR and then delegates to that
125   // child's picker.
126   class WeightedPicker final : public SubchannelPicker {
127    public:
128     // Maintains a weighted list of pickers from each child that is in
129     // ready state. The first element in the pair represents the end of a
130     // range proportional to the child's weight. The start of the range
131     // is the previous value in the vector and is 0 for the first element.
132     using PickerList =
133         std::vector<std::pair<uint64_t, RefCountedPtr<SubchannelPicker>>>;
134 
WeightedPicker(PickerList pickers)135     explicit WeightedPicker(PickerList pickers)
136         : pickers_(std::move(pickers)) {}
137 
138     PickResult Pick(PickArgs args) override;
139 
140    private:
141     PickerList pickers_;
142 
143     // TODO(roth): Consider using a separate thread-local BitGen for each CPU
144     // to avoid the need for this mutex.
145     Mutex mu_;
146     absl::BitGen bit_gen_ ABSL_GUARDED_BY(&mu_);
147   };
148 
149   // Each WeightedChild holds a ref to its parent WeightedTargetLb.
150   class WeightedChild final : public InternallyRefCounted<WeightedChild> {
151    public:
152     WeightedChild(RefCountedPtr<WeightedTargetLb> weighted_target_policy,
153                   const std::string& name);
154     ~WeightedChild() override;
155 
156     void Orphan() override;
157 
158     absl::Status UpdateLocked(
159         const WeightedTargetLbConfig::ChildConfig& config,
160         absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses,
161         const std::string& resolution_note, ChannelArgs args);
162     void ResetBackoffLocked();
163     void DeactivateLocked();
164 
weight() const165     uint32_t weight() const { return weight_; }
connectivity_state() const166     grpc_connectivity_state connectivity_state() const {
167       return connectivity_state_;
168     }
picker() const169     RefCountedPtr<SubchannelPicker> picker() const { return picker_; }
170 
171    private:
172     class Helper final : public DelegatingChannelControlHelper {
173      public:
Helper(RefCountedPtr<WeightedChild> weighted_child)174       explicit Helper(RefCountedPtr<WeightedChild> weighted_child)
175           : weighted_child_(std::move(weighted_child)) {}
176 
~Helper()177       ~Helper() override { weighted_child_.reset(DEBUG_LOCATION, "Helper"); }
178 
179       void UpdateState(grpc_connectivity_state state,
180                        const absl::Status& status,
181                        RefCountedPtr<SubchannelPicker> picker) override;
182 
183      private:
parent_helper() const184       ChannelControlHelper* parent_helper() const override {
185         return weighted_child_->weighted_target_policy_
186             ->channel_control_helper();
187       }
188 
189       RefCountedPtr<WeightedChild> weighted_child_;
190     };
191 
192     class DelayedRemovalTimer final
193         : public InternallyRefCounted<DelayedRemovalTimer> {
194      public:
195       explicit DelayedRemovalTimer(RefCountedPtr<WeightedChild> weighted_child);
196 
197       void Orphan() override;
198 
199      private:
200       void OnTimerLocked();
201 
202       RefCountedPtr<WeightedChild> weighted_child_;
203       absl::optional<EventEngine::TaskHandle> timer_handle_;
204     };
205 
206     // Methods for dealing with the child policy.
207     OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
208         const ChannelArgs& args);
209 
210     void OnConnectivityStateUpdateLocked(
211         grpc_connectivity_state state, const absl::Status& status,
212         RefCountedPtr<SubchannelPicker> picker);
213 
214     // The owning LB policy.
215     RefCountedPtr<WeightedTargetLb> weighted_target_policy_;
216 
217     const std::string name_;
218 
219     uint32_t weight_ = 0;
220 
221     OrphanablePtr<LoadBalancingPolicy> child_policy_;
222 
223     RefCountedPtr<SubchannelPicker> picker_;
224     grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_CONNECTING;
225 
226     OrphanablePtr<DelayedRemovalTimer> delayed_removal_timer_;
227   };
228 
229   ~WeightedTargetLb() override;
230 
231   void ShutdownLocked() override;
232 
233   void UpdateStateLocked();
234 
235   // Current config from the resolver.
236   RefCountedPtr<WeightedTargetLbConfig> config_;
237 
238   // Internal state.
239   bool shutting_down_ = false;
240   bool update_in_progress_ = false;
241 
242   // Children.
243   std::map<std::string, OrphanablePtr<WeightedChild>> targets_;
244 };
245 
246 //
247 // WeightedTargetLb::WeightedPicker
248 //
249 
Pick(PickArgs args)250 WeightedTargetLb::PickResult WeightedTargetLb::WeightedPicker::Pick(
251     PickArgs args) {
252   // Generate a random number in [0, total weight).
253   const uint64_t key = [&]() {
254     MutexLock lock(&mu_);
255     return absl::Uniform<uint64_t>(bit_gen_, 0, pickers_.back().first);
256   }();
257   // Find the index in pickers_ corresponding to key.
258   size_t mid = 0;
259   size_t start_index = 0;
260   size_t end_index = pickers_.size() - 1;
261   size_t index = 0;
262   while (end_index > start_index) {
263     mid = (start_index + end_index) / 2;
264     if (pickers_[mid].first > key) {
265       end_index = mid;
266     } else if (pickers_[mid].first < key) {
267       start_index = mid + 1;
268     } else {
269       index = mid + 1;
270       break;
271     }
272   }
273   if (index == 0) index = start_index;
274   CHECK(pickers_[index].first > key);
275   // Delegate to the child picker.
276   return pickers_[index].second->Pick(args);
277 }
278 
279 //
280 // WeightedTargetLb
281 //
282 
WeightedTargetLb(Args args)283 WeightedTargetLb::WeightedTargetLb(Args args)
284     : LoadBalancingPolicy(std::move(args)) {
285   GRPC_TRACE_LOG(weighted_target_lb, INFO)
286       << "[weighted_target_lb " << this << "] created";
287 }
288 
~WeightedTargetLb()289 WeightedTargetLb::~WeightedTargetLb() {
290   GRPC_TRACE_LOG(weighted_target_lb, INFO)
291       << "[weighted_target_lb " << this
292       << "] destroying weighted_target LB policy";
293 }
294 
ShutdownLocked()295 void WeightedTargetLb::ShutdownLocked() {
296   GRPC_TRACE_LOG(weighted_target_lb, INFO)
297       << "[weighted_target_lb " << this << "] shutting down";
298   shutting_down_ = true;
299   targets_.clear();
300 }
301 
ResetBackoffLocked()302 void WeightedTargetLb::ResetBackoffLocked() {
303   for (auto& p : targets_) p.second->ResetBackoffLocked();
304 }
305 
UpdateLocked(UpdateArgs args)306 absl::Status WeightedTargetLb::UpdateLocked(UpdateArgs args) {
307   if (shutting_down_) return absl::OkStatus();
308   GRPC_TRACE_LOG(weighted_target_lb, INFO)
309       << "[weighted_target_lb " << this << "] received update";
310   update_in_progress_ = true;
311   // Update config.
312   config_ = args.config.TakeAsSubclass<WeightedTargetLbConfig>();
313   // Deactivate the targets not in the new config.
314   for (const auto& p : targets_) {
315     const std::string& name = p.first;
316     WeightedChild* child = p.second.get();
317     if (config_->target_map().find(name) == config_->target_map().end()) {
318       child->DeactivateLocked();
319     }
320   }
321   // Update all children.
322   absl::StatusOr<HierarchicalAddressMap> address_map =
323       MakeHierarchicalAddressMap(args.addresses);
324   std::vector<std::string> errors;
325   for (const auto& p : config_->target_map()) {
326     const std::string& name = p.first;
327     const WeightedTargetLbConfig::ChildConfig& config = p.second;
328     auto& target = targets_[name];
329     // Create child if it does not already exist.
330     if (target == nullptr) {
331       target = MakeOrphanable<WeightedChild>(
332           RefAsSubclass<WeightedTargetLb>(DEBUG_LOCATION, "WeightedChild"),
333           name);
334     }
335     absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses;
336     if (address_map.ok()) {
337       auto it = address_map->find(name);
338       if (it == address_map->end()) {
339         addresses = std::make_shared<EndpointAddressesListIterator>(
340             EndpointAddressesList());
341       } else {
342         addresses = std::move(it->second);
343       }
344     } else {
345       addresses = address_map.status();
346     }
347     absl::Status status = target->UpdateLocked(config, std::move(addresses),
348                                                args.resolution_note, args.args);
349     if (!status.ok()) {
350       errors.emplace_back(
351           absl::StrCat("child ", name, ": ", status.ToString()));
352     }
353   }
354   update_in_progress_ = false;
355   if (config_->target_map().empty()) {
356     absl::Status status = absl::UnavailableError(absl::StrCat(
357         "no children in weighted_target policy: ", args.resolution_note));
358     channel_control_helper()->UpdateState(
359         GRPC_CHANNEL_TRANSIENT_FAILURE, status,
360         MakeRefCounted<TransientFailurePicker>(status));
361     return absl::OkStatus();
362   }
363   UpdateStateLocked();
364   // Return status.
365   if (!errors.empty()) {
366     return absl::UnavailableError(absl::StrCat(
367         "errors from children: [", absl::StrJoin(errors, "; "), "]"));
368   }
369   return absl::OkStatus();
370 }
371 
UpdateStateLocked()372 void WeightedTargetLb::UpdateStateLocked() {
373   // If we're in the process of propagating an update from our parent to
374   // our children, ignore any updates that come from the children.  We
375   // will instead return a new picker once the update has been seen by
376   // all children.  This avoids unnecessary picker churn while an update
377   // is being propagated to our children.
378   if (update_in_progress_) return;
379   GRPC_TRACE_LOG(weighted_target_lb, INFO)
380       << "[weighted_target_lb " << this
381       << "] scanning children to determine connectivity state";
382   // Construct lists of child pickers with associated weights, one for
383   // children that are in state READY and another for children that are
384   // in state TRANSIENT_FAILURE.  Each child is represented by a portion of
385   // the range proportional to its weight, such that the total range is the
386   // sum of the weights of all children.
387   WeightedPicker::PickerList ready_picker_list;
388   uint64_t ready_end = 0;
389   WeightedPicker::PickerList tf_picker_list;
390   uint64_t tf_end = 0;
391   // Also count the number of children in CONNECTING and IDLE, to determine
392   // the aggregated state.
393   size_t num_connecting = 0;
394   size_t num_idle = 0;
395   for (const auto& p : targets_) {
396     const std::string& child_name = p.first;
397     const WeightedChild* child = p.second.get();
398     // Skip the targets that are not in the latest update.
399     if (config_->target_map().find(child_name) == config_->target_map().end()) {
400       continue;
401     }
402     auto child_picker = child->picker();
403     GRPC_TRACE_LOG(weighted_target_lb, INFO)
404         << "[weighted_target_lb " << this << "]   child=" << child_name
405         << " state=" << ConnectivityStateName(child->connectivity_state())
406         << " weight=" << child->weight() << " picker=" << child_picker.get();
407     switch (child->connectivity_state()) {
408       case GRPC_CHANNEL_READY: {
409         CHECK_GT(child->weight(), 0u);
410         ready_end += child->weight();
411         ready_picker_list.emplace_back(ready_end, std::move(child_picker));
412         break;
413       }
414       case GRPC_CHANNEL_CONNECTING: {
415         ++num_connecting;
416         break;
417       }
418       case GRPC_CHANNEL_IDLE: {
419         ++num_idle;
420         break;
421       }
422       case GRPC_CHANNEL_TRANSIENT_FAILURE: {
423         CHECK_GT(child->weight(), 0u);
424         tf_end += child->weight();
425         tf_picker_list.emplace_back(tf_end, std::move(child_picker));
426         break;
427       }
428       default:
429         GPR_UNREACHABLE_CODE(return);
430     }
431   }
432   // Determine aggregated connectivity state.
433   grpc_connectivity_state connectivity_state;
434   if (!ready_picker_list.empty()) {
435     connectivity_state = GRPC_CHANNEL_READY;
436   } else if (num_connecting > 0) {
437     connectivity_state = GRPC_CHANNEL_CONNECTING;
438   } else if (num_idle > 0) {
439     connectivity_state = GRPC_CHANNEL_IDLE;
440   } else {
441     connectivity_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
442   }
443   GRPC_TRACE_LOG(weighted_target_lb, INFO)
444       << "[weighted_target_lb " << this << "] connectivity changed to "
445       << ConnectivityStateName(connectivity_state);
446   RefCountedPtr<SubchannelPicker> picker;
447   absl::Status status;
448   switch (connectivity_state) {
449     case GRPC_CHANNEL_READY:
450       picker = MakeRefCounted<WeightedPicker>(std::move(ready_picker_list));
451       break;
452     case GRPC_CHANNEL_CONNECTING:
453     case GRPC_CHANNEL_IDLE:
454       picker = MakeRefCounted<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker"));
455       break;
456     default:
457       picker = MakeRefCounted<WeightedPicker>(std::move(tf_picker_list));
458   }
459   channel_control_helper()->UpdateState(connectivity_state, status,
460                                         std::move(picker));
461 }
462 
463 //
464 // WeightedTargetLb::WeightedChild::DelayedRemovalTimer
465 //
466 
DelayedRemovalTimer(RefCountedPtr<WeightedTargetLb::WeightedChild> weighted_child)467 WeightedTargetLb::WeightedChild::DelayedRemovalTimer::DelayedRemovalTimer(
468     RefCountedPtr<WeightedTargetLb::WeightedChild> weighted_child)
469     : weighted_child_(std::move(weighted_child)) {
470   timer_handle_ =
471       weighted_child_->weighted_target_policy_->channel_control_helper()
472           ->GetEventEngine()
473           ->RunAfter(kChildRetentionInterval, [self = Ref()]() mutable {
474             ApplicationCallbackExecCtx app_exec_ctx;
475             ExecCtx exec_ctx;
476             auto* self_ptr = self.get();  // Avoid use-after-move problem.
477             self_ptr->weighted_child_->weighted_target_policy_
478                 ->work_serializer()
479                 ->Run([self = std::move(self)] { self->OnTimerLocked(); },
480                       DEBUG_LOCATION);
481           });
482 }
483 
Orphan()484 void WeightedTargetLb::WeightedChild::DelayedRemovalTimer::Orphan() {
485   if (timer_handle_.has_value()) {
486     GRPC_TRACE_LOG(weighted_target_lb, INFO)
487         << "[weighted_target_lb "
488         << weighted_child_->weighted_target_policy_.get() << "] WeightedChild "
489         << weighted_child_.get() << " " << weighted_child_->name_
490         << ": cancelling delayed removal timer";
491     weighted_child_->weighted_target_policy_->channel_control_helper()
492         ->GetEventEngine()
493         ->Cancel(*timer_handle_);
494   }
495   Unref();
496 }
497 
OnTimerLocked()498 void WeightedTargetLb::WeightedChild::DelayedRemovalTimer::OnTimerLocked() {
499   CHECK(timer_handle_.has_value());
500   timer_handle_.reset();
501   weighted_child_->weighted_target_policy_->targets_.erase(
502       weighted_child_->name_);
503 }
504 
505 //
506 // WeightedTargetLb::WeightedChild
507 //
508 
WeightedChild(RefCountedPtr<WeightedTargetLb> weighted_target_policy,const std::string & name)509 WeightedTargetLb::WeightedChild::WeightedChild(
510     RefCountedPtr<WeightedTargetLb> weighted_target_policy,
511     const std::string& name)
512     : weighted_target_policy_(std::move(weighted_target_policy)),
513       name_(name),
514       picker_(MakeRefCounted<QueuePicker>(nullptr)) {
515   GRPC_TRACE_LOG(weighted_target_lb, INFO)
516       << "[weighted_target_lb " << weighted_target_policy_.get()
517       << "] created WeightedChild " << this << " for " << name_;
518 }
519 
~WeightedChild()520 WeightedTargetLb::WeightedChild::~WeightedChild() {
521   GRPC_TRACE_LOG(weighted_target_lb, INFO)
522       << "[weighted_target_lb " << weighted_target_policy_.get()
523       << "] WeightedChild " << this << " " << name_ << ": destroying child";
524   weighted_target_policy_.reset(DEBUG_LOCATION, "WeightedChild");
525 }
526 
Orphan()527 void WeightedTargetLb::WeightedChild::Orphan() {
528   GRPC_TRACE_LOG(weighted_target_lb, INFO)
529       << "[weighted_target_lb " << weighted_target_policy_.get()
530       << "] WeightedChild " << this << " " << name_ << ": shutting down child";
531   // Remove the child policy's interested_parties pollset_set from the
532   // xDS policy.
533   grpc_pollset_set_del_pollset_set(
534       child_policy_->interested_parties(),
535       weighted_target_policy_->interested_parties());
536   child_policy_.reset();
537   // Drop our ref to the child's picker, in case it's holding a ref to
538   // the child.
539   picker_.reset();
540   delayed_removal_timer_.reset();
541   Unref();
542 }
543 
544 OrphanablePtr<LoadBalancingPolicy>
CreateChildPolicyLocked(const ChannelArgs & args)545 WeightedTargetLb::WeightedChild::CreateChildPolicyLocked(
546     const ChannelArgs& args) {
547   LoadBalancingPolicy::Args lb_policy_args;
548   lb_policy_args.work_serializer = weighted_target_policy_->work_serializer();
549   lb_policy_args.args = args;
550   lb_policy_args.channel_control_helper =
551       std::make_unique<Helper>(this->Ref(DEBUG_LOCATION, "Helper"));
552   OrphanablePtr<LoadBalancingPolicy> lb_policy =
553       MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
554                                          &weighted_target_lb_trace);
555   GRPC_TRACE_LOG(weighted_target_lb, INFO)
556       << "[weighted_target_lb " << weighted_target_policy_.get()
557       << "] WeightedChild " << this << " " << name_
558       << ": created new child policy handler " << lb_policy.get();
559   // Add the xDS's interested_parties pollset_set to that of the newly created
560   // child policy. This will make the child policy progress upon activity on
561   // xDS LB, which in turn is tied to the application's call.
562   grpc_pollset_set_add_pollset_set(
563       lb_policy->interested_parties(),
564       weighted_target_policy_->interested_parties());
565   return lb_policy;
566 }
567 
UpdateLocked(const WeightedTargetLbConfig::ChildConfig & config,absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses,const std::string & resolution_note,ChannelArgs args)568 absl::Status WeightedTargetLb::WeightedChild::UpdateLocked(
569     const WeightedTargetLbConfig::ChildConfig& config,
570     absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses,
571     const std::string& resolution_note, ChannelArgs args) {
572   if (weighted_target_policy_->shutting_down_) return absl::OkStatus();
573   // Update child weight.
574   if (weight_ != config.weight && GRPC_TRACE_FLAG_ENABLED(weighted_target_lb)) {
575     LOG(INFO) << "[weighted_target_lb " << weighted_target_policy_.get()
576               << "] WeightedChild " << this << " " << name_
577               << ": weight=" << config.weight;
578   }
579   weight_ = config.weight;
580   // Reactivate if needed.
581   if (delayed_removal_timer_ != nullptr) {
582     GRPC_TRACE_LOG(weighted_target_lb, INFO)
583         << "[weighted_target_lb " << weighted_target_policy_.get()
584         << "] WeightedChild " << this << " " << name_ << ": reactivating";
585     delayed_removal_timer_.reset();
586   }
587   // Create child policy if needed.
588   args = args.Set(GRPC_ARG_LB_WEIGHTED_TARGET_CHILD, name_);
589   if (child_policy_ == nullptr) {
590     child_policy_ = CreateChildPolicyLocked(args);
591   }
592   // Construct update args.
593   UpdateArgs update_args;
594   update_args.config = config.config;
595   update_args.addresses = std::move(addresses);
596   update_args.resolution_note = resolution_note;
597   update_args.args = std::move(args);
598   // Update the policy.
599   GRPC_TRACE_LOG(weighted_target_lb, INFO)
600       << "[weighted_target_lb " << weighted_target_policy_.get()
601       << "] WeightedChild " << this << " " << name_
602       << ": updating child policy handler " << child_policy_.get();
603   return child_policy_->UpdateLocked(std::move(update_args));
604 }
605 
ResetBackoffLocked()606 void WeightedTargetLb::WeightedChild::ResetBackoffLocked() {
607   child_policy_->ResetBackoffLocked();
608 }
609 
OnConnectivityStateUpdateLocked(grpc_connectivity_state state,const absl::Status & status,RefCountedPtr<SubchannelPicker> picker)610 void WeightedTargetLb::WeightedChild::OnConnectivityStateUpdateLocked(
611     grpc_connectivity_state state, const absl::Status& status,
612     RefCountedPtr<SubchannelPicker> picker) {
613   // Cache the picker in the WeightedChild.
614   picker_ = std::move(picker);
615   GRPC_TRACE_LOG(weighted_target_lb, INFO)
616       << "[weighted_target_lb " << weighted_target_policy_.get()
617       << "] WeightedChild " << this << " " << name_
618       << ": connectivity state update: state=" << ConnectivityStateName(state)
619       << " (" << status << ") picker=" << picker_.get();
620   // If the child reports IDLE, immediately tell it to exit idle.
621   if (state == GRPC_CHANNEL_IDLE) child_policy_->ExitIdleLocked();
622   // Decide what state to report for aggregation purposes.
623   // If the last recorded state was TRANSIENT_FAILURE and the new state
624   // is something other than READY, don't change the state.
625   if (connectivity_state_ != GRPC_CHANNEL_TRANSIENT_FAILURE ||
626       state == GRPC_CHANNEL_READY) {
627     connectivity_state_ = state;
628   }
629   // Update the LB policy's state if this child is not deactivated.
630   if (weight_ != 0) weighted_target_policy_->UpdateStateLocked();
631 }
632 
DeactivateLocked()633 void WeightedTargetLb::WeightedChild::DeactivateLocked() {
634   // If already deactivated, don't do that again.
635   if (weight_ == 0) return;
636   GRPC_TRACE_LOG(weighted_target_lb, INFO)
637       << "[weighted_target_lb " << weighted_target_policy_.get()
638       << "] WeightedChild " << this << " " << name_ << ": deactivating";
639   // Set the child weight to 0 so that future picker won't contain this child.
640   weight_ = 0;
641   // Start a timer to delete the child.
642   delayed_removal_timer_ = MakeOrphanable<DelayedRemovalTimer>(
643       Ref(DEBUG_LOCATION, "DelayedRemovalTimer"));
644 }
645 
646 //
647 // WeightedTargetLb::WeightedChild::Helper
648 //
649 
UpdateState(grpc_connectivity_state state,const absl::Status & status,RefCountedPtr<SubchannelPicker> picker)650 void WeightedTargetLb::WeightedChild::Helper::UpdateState(
651     grpc_connectivity_state state, const absl::Status& status,
652     RefCountedPtr<SubchannelPicker> picker) {
653   if (weighted_child_->weighted_target_policy_->shutting_down_) return;
654   weighted_child_->OnConnectivityStateUpdateLocked(state, status,
655                                                    std::move(picker));
656 }
657 
658 //
659 // factory
660 //
661 
JsonLoader(const JsonArgs &)662 const JsonLoaderInterface* WeightedTargetLbConfig::ChildConfig::JsonLoader(
663     const JsonArgs&) {
664   static const auto* loader =
665       JsonObjectLoader<ChildConfig>()
666           // Note: The config field requires custom parsing, so it's
667           // handled in JsonPostLoad() instead.
668           .Field("weight", &ChildConfig::weight)
669           .Finish();
670   return loader;
671 }
672 
JsonPostLoad(const Json & json,const JsonArgs &,ValidationErrors * errors)673 void WeightedTargetLbConfig::ChildConfig::JsonPostLoad(
674     const Json& json, const JsonArgs&, ValidationErrors* errors) {
675   ValidationErrors::ScopedField field(errors, ".childPolicy");
676   auto it = json.object().find("childPolicy");
677   if (it == json.object().end()) {
678     errors->AddError("field not present");
679     return;
680   }
681   auto lb_config =
682       CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
683           it->second);
684   if (!lb_config.ok()) {
685     errors->AddError(lb_config.status().message());
686     return;
687   }
688   config = std::move(*lb_config);
689 }
690 
JsonLoader(const JsonArgs &)691 const JsonLoaderInterface* WeightedTargetLbConfig::JsonLoader(const JsonArgs&) {
692   static const auto* loader =
693       JsonObjectLoader<WeightedTargetLbConfig>()
694           .Field("targets", &WeightedTargetLbConfig::target_map_)
695           .Finish();
696   return loader;
697 }
698 
699 class WeightedTargetLbFactory final : public LoadBalancingPolicyFactory {
700  public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const701   OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
702       LoadBalancingPolicy::Args args) const override {
703     return MakeOrphanable<WeightedTargetLb>(std::move(args));
704   }
705 
name() const706   absl::string_view name() const override { return kWeightedTarget; }
707 
708   absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const Json & json) const709   ParseLoadBalancingConfig(const Json& json) const override {
710     return LoadFromJson<RefCountedPtr<WeightedTargetLbConfig>>(
711         json, JsonArgs(), "errors validating weighted_target LB policy config");
712   }
713 };
714 
715 }  // namespace
716 
RegisterWeightedTargetLbPolicy(CoreConfiguration::Builder * builder)717 void RegisterWeightedTargetLbPolicy(CoreConfiguration::Builder* builder) {
718   builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
719       std::make_unique<WeightedTargetLbFactory>());
720 }
721 
722 }  // namespace grpc_core
723