• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include <inttypes.h>
20 #include <limits.h>
21 #include <string.h>
22 
23 #include "absl/container/inlined_vector.h"
24 #include "absl/strings/str_cat.h"
25 
26 #include <grpc/grpc.h>
27 
28 #include "src/core/ext/filters/client_channel/lb_policy.h"
29 #include "src/core/ext/filters/client_channel/lb_policy/address_filtering.h"
30 #include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
31 #include "src/core/ext/filters/client_channel/lb_policy_factory.h"
32 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
33 #include "src/core/lib/channel/channel_args.h"
34 #include "src/core/lib/gpr/string.h"
35 #include "src/core/lib/gprpp/orphanable.h"
36 #include "src/core/lib/gprpp/ref_counted_ptr.h"
37 #include "src/core/lib/iomgr/timer.h"
38 #include "src/core/lib/iomgr/work_serializer.h"
39 #include "src/core/lib/transport/error_utils.h"
40 
41 namespace grpc_core {
42 
43 TraceFlag grpc_lb_weighted_target_trace(false, "weighted_target_lb");
44 
45 namespace {
46 
47 constexpr char kWeightedTarget[] = "weighted_target_experimental";
48 
49 // How long we keep a child around for after it has been removed from
50 // the config.
51 constexpr int kChildRetentionIntervalMs = 15 * 60 * 1000;
52 
53 // Config for weighted_target LB policy.
54 class WeightedTargetLbConfig : public LoadBalancingPolicy::Config {
55  public:
56   struct ChildConfig {
57     uint32_t weight;
58     RefCountedPtr<LoadBalancingPolicy::Config> config;
59   };
60 
61   using TargetMap = std::map<std::string, ChildConfig>;
62 
WeightedTargetLbConfig(TargetMap target_map)63   explicit WeightedTargetLbConfig(TargetMap target_map)
64       : target_map_(std::move(target_map)) {}
65 
name() const66   const char* name() const override { return kWeightedTarget; }
67 
target_map() const68   const TargetMap& target_map() const { return target_map_; }
69 
70  private:
71   TargetMap target_map_;
72 };
73 
74 // weighted_target LB policy.
75 class WeightedTargetLb : public LoadBalancingPolicy {
76  public:
77   explicit WeightedTargetLb(Args args);
78 
name() const79   const char* name() const override { return kWeightedTarget; }
80 
81   void UpdateLocked(UpdateArgs args) override;
82   void ResetBackoffLocked() override;
83 
84  private:
85   // A simple wrapper for ref-counting a picker from the child policy.
86   class ChildPickerWrapper : public RefCounted<ChildPickerWrapper> {
87    public:
ChildPickerWrapper(std::unique_ptr<SubchannelPicker> picker)88     explicit ChildPickerWrapper(std::unique_ptr<SubchannelPicker> picker)
89         : picker_(std::move(picker)) {}
Pick(PickArgs args)90     PickResult Pick(PickArgs args) { return picker_->Pick(args); }
91 
92    private:
93     std::unique_ptr<SubchannelPicker> picker_;
94   };
95 
96   // Picks a child using stateless WRR and then delegates to that
97   // child's picker.
98   class WeightedPicker : public SubchannelPicker {
99    public:
100     // Maintains a weighted list of pickers from each child that is in
101     // ready state. The first element in the pair represents the end of a
102     // range proportional to the child's weight. The start of the range
103     // is the previous value in the vector and is 0 for the first element.
104     using PickerList = absl::InlinedVector<
105         std::pair<uint32_t, RefCountedPtr<ChildPickerWrapper>>, 1>;
106 
WeightedPicker(PickerList pickers)107     explicit WeightedPicker(PickerList pickers)
108         : pickers_(std::move(pickers)) {}
109 
110     PickResult Pick(PickArgs args) override;
111 
112    private:
113     PickerList pickers_;
114   };
115 
116   // Each WeightedChild holds a ref to its parent WeightedTargetLb.
117   class WeightedChild : public InternallyRefCounted<WeightedChild> {
118    public:
119     WeightedChild(RefCountedPtr<WeightedTargetLb> weighted_target_policy,
120                   const std::string& name);
121     ~WeightedChild() override;
122 
123     void Orphan() override;
124 
125     void UpdateLocked(const WeightedTargetLbConfig::ChildConfig& config,
126                       ServerAddressList addresses,
127                       const grpc_channel_args* args);
128     void ResetBackoffLocked();
129     void DeactivateLocked();
130 
weight() const131     uint32_t weight() const { return weight_; }
connectivity_state() const132     grpc_connectivity_state connectivity_state() const {
133       return connectivity_state_;
134     }
picker_wrapper() const135     RefCountedPtr<ChildPickerWrapper> picker_wrapper() const {
136       return picker_wrapper_;
137     }
138 
139    private:
140     class Helper : public ChannelControlHelper {
141      public:
Helper(RefCountedPtr<WeightedChild> weighted_child)142       explicit Helper(RefCountedPtr<WeightedChild> weighted_child)
143           : weighted_child_(std::move(weighted_child)) {}
144 
~Helper()145       ~Helper() override { weighted_child_.reset(DEBUG_LOCATION, "Helper"); }
146 
147       RefCountedPtr<SubchannelInterface> CreateSubchannel(
148           ServerAddress address, const grpc_channel_args& args) override;
149       void UpdateState(grpc_connectivity_state state,
150                        const absl::Status& status,
151                        std::unique_ptr<SubchannelPicker> picker) override;
152       void RequestReresolution() override;
153       void AddTraceEvent(TraceSeverity severity,
154                          absl::string_view message) override;
155 
156      private:
157       RefCountedPtr<WeightedChild> weighted_child_;
158     };
159 
160     // Methods for dealing with the child policy.
161     OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
162         const grpc_channel_args* args);
163 
164     void OnConnectivityStateUpdateLocked(
165         grpc_connectivity_state state, const absl::Status& status,
166         std::unique_ptr<SubchannelPicker> picker);
167 
168     static void OnDelayedRemovalTimer(void* arg, grpc_error* error);
169     void OnDelayedRemovalTimerLocked(grpc_error* error);
170 
171     // The owning LB policy.
172     RefCountedPtr<WeightedTargetLb> weighted_target_policy_;
173 
174     const std::string name_;
175 
176     uint32_t weight_;
177 
178     OrphanablePtr<LoadBalancingPolicy> child_policy_;
179 
180     RefCountedPtr<ChildPickerWrapper> picker_wrapper_;
181     grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_CONNECTING;
182     bool seen_failure_since_ready_ = false;
183 
184     // States for delayed removal.
185     grpc_timer delayed_removal_timer_;
186     grpc_closure on_delayed_removal_timer_;
187     bool delayed_removal_timer_callback_pending_ = false;
188     bool shutdown_ = false;
189   };
190 
191   ~WeightedTargetLb() override;
192 
193   void ShutdownLocked() override;
194 
195   void UpdateStateLocked();
196 
197   // Current config from the resolver.
198   RefCountedPtr<WeightedTargetLbConfig> config_;
199 
200   // Internal state.
201   bool shutting_down_ = false;
202 
203   // Children.
204   std::map<std::string, OrphanablePtr<WeightedChild>> targets_;
205 };
206 
207 //
208 // WeightedTargetLb::WeightedPicker
209 //
210 
Pick(PickArgs args)211 WeightedTargetLb::PickResult WeightedTargetLb::WeightedPicker::Pick(
212     PickArgs args) {
213   // Generate a random number in [0, total weight).
214   const uint32_t key = rand() % pickers_[pickers_.size() - 1].first;
215   // Find the index in pickers_ corresponding to key.
216   size_t mid = 0;
217   size_t start_index = 0;
218   size_t end_index = pickers_.size() - 1;
219   size_t index = 0;
220   while (end_index > start_index) {
221     mid = (start_index + end_index) / 2;
222     if (pickers_[mid].first > key) {
223       end_index = mid;
224     } else if (pickers_[mid].first < key) {
225       start_index = mid + 1;
226     } else {
227       index = mid + 1;
228       break;
229     }
230   }
231   if (index == 0) index = start_index;
232   GPR_ASSERT(pickers_[index].first > key);
233   // Delegate to the child picker.
234   return pickers_[index].second->Pick(args);
235 }
236 
237 //
238 // WeightedTargetLb
239 //
240 
WeightedTargetLb(Args args)241 WeightedTargetLb::WeightedTargetLb(Args args)
242     : LoadBalancingPolicy(std::move(args)) {
243   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
244     gpr_log(GPR_INFO, "[weighted_target_lb %p] created", this);
245   }
246 }
247 
~WeightedTargetLb()248 WeightedTargetLb::~WeightedTargetLb() {
249   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
250     gpr_log(GPR_INFO,
251             "[weighted_target_lb %p] destroying weighted_target LB policy",
252             this);
253   }
254 }
255 
ShutdownLocked()256 void WeightedTargetLb::ShutdownLocked() {
257   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
258     gpr_log(GPR_INFO, "[weighted_target_lb %p] shutting down", this);
259   }
260   shutting_down_ = true;
261   targets_.clear();
262 }
263 
ResetBackoffLocked()264 void WeightedTargetLb::ResetBackoffLocked() {
265   for (auto& p : targets_) p.second->ResetBackoffLocked();
266 }
267 
UpdateLocked(UpdateArgs args)268 void WeightedTargetLb::UpdateLocked(UpdateArgs args) {
269   if (shutting_down_) return;
270   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
271     gpr_log(GPR_INFO, "[weighted_target_lb %p] Received update", this);
272   }
273   // Update config.
274   config_ = std::move(args.config);
275   // Deactivate the targets not in the new config.
276   for (const auto& p : targets_) {
277     const std::string& name = p.first;
278     WeightedChild* child = p.second.get();
279     if (config_->target_map().find(name) == config_->target_map().end()) {
280       child->DeactivateLocked();
281     }
282   }
283   // Create any children that don't already exist.
284   // Note that we add all children before updating any of them, because
285   // an update may trigger a child to immediately update its
286   // connectivity state (e.g., reporting TRANSIENT_FAILURE immediately when
287   // receiving an empty address list), and we don't want to return an
288   // overall state with incomplete data.
289   for (const auto& p : config_->target_map()) {
290     const std::string& name = p.first;
291     auto it = targets_.find(name);
292     if (it == targets_.end()) {
293       targets_.emplace(name, MakeOrphanable<WeightedChild>(
294                                  Ref(DEBUG_LOCATION, "WeightedChild"), name));
295     }
296   }
297   // Update all children.
298   HierarchicalAddressMap address_map =
299       MakeHierarchicalAddressMap(args.addresses);
300   for (const auto& p : config_->target_map()) {
301     const std::string& name = p.first;
302     const WeightedTargetLbConfig::ChildConfig& config = p.second;
303     targets_[name]->UpdateLocked(config, std::move(address_map[name]),
304                                  args.args);
305   }
306   UpdateStateLocked();
307 }
308 
UpdateStateLocked()309 void WeightedTargetLb::UpdateStateLocked() {
310   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
311     gpr_log(GPR_INFO,
312             "[weighted_target_lb %p] scanning children to determine "
313             "connectivity state",
314             this);
315   }
316   // Construct a new picker which maintains a map of all child pickers
317   // that are ready. Each child is represented by a portion of the range
318   // proportional to its weight, such that the total range is the sum of the
319   // weights of all children.
320   WeightedPicker::PickerList picker_list;
321   uint32_t end = 0;
322   // Also count the number of children in each state, to determine the
323   // overall state.
324   size_t num_connecting = 0;
325   size_t num_idle = 0;
326   size_t num_transient_failures = 0;
327   for (const auto& p : targets_) {
328     const std::string& child_name = p.first;
329     const WeightedChild* child = p.second.get();
330     // Skip the targets that are not in the latest update.
331     if (config_->target_map().find(child_name) == config_->target_map().end()) {
332       continue;
333     }
334     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
335       gpr_log(GPR_INFO,
336               "[weighted_target_lb %p]   child=%s state=%s weight=%d picker=%p",
337               this, child_name.c_str(),
338               ConnectivityStateName(child->connectivity_state()),
339               child->weight(), child->picker_wrapper().get());
340     }
341     switch (child->connectivity_state()) {
342       case GRPC_CHANNEL_READY: {
343         end += child->weight();
344         picker_list.push_back(std::make_pair(end, child->picker_wrapper()));
345         break;
346       }
347       case GRPC_CHANNEL_CONNECTING: {
348         ++num_connecting;
349         break;
350       }
351       case GRPC_CHANNEL_IDLE: {
352         ++num_idle;
353         break;
354       }
355       case GRPC_CHANNEL_TRANSIENT_FAILURE: {
356         ++num_transient_failures;
357         break;
358       }
359       default:
360         GPR_UNREACHABLE_CODE(return );
361     }
362   }
363   // Determine aggregated connectivity state.
364   grpc_connectivity_state connectivity_state;
365   if (!picker_list.empty()) {
366     connectivity_state = GRPC_CHANNEL_READY;
367   } else if (num_connecting > 0) {
368     connectivity_state = GRPC_CHANNEL_CONNECTING;
369   } else if (num_idle > 0) {
370     connectivity_state = GRPC_CHANNEL_IDLE;
371   } else {
372     connectivity_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
373   }
374   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
375     gpr_log(GPR_INFO, "[weighted_target_lb %p] connectivity changed to %s",
376             this, ConnectivityStateName(connectivity_state));
377   }
378   std::unique_ptr<SubchannelPicker> picker;
379   absl::Status status;
380   switch (connectivity_state) {
381     case GRPC_CHANNEL_READY:
382       picker = absl::make_unique<WeightedPicker>(std::move(picker_list));
383       break;
384     case GRPC_CHANNEL_CONNECTING:
385     case GRPC_CHANNEL_IDLE:
386       picker =
387           absl::make_unique<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker"));
388       break;
389     default:
390       grpc_error* error = grpc_error_set_int(
391           GRPC_ERROR_CREATE_FROM_STATIC_STRING(
392               "weighted_target: all children report state TRANSIENT_FAILURE"),
393           GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
394       status = grpc_error_to_absl_status(error);
395       picker = absl::make_unique<TransientFailurePicker>(error);
396   }
397   channel_control_helper()->UpdateState(connectivity_state, status,
398                                         std::move(picker));
399 }
400 
401 //
402 // WeightedTargetLb::WeightedChild
403 //
404 
WeightedChild(RefCountedPtr<WeightedTargetLb> weighted_target_policy,const std::string & name)405 WeightedTargetLb::WeightedChild::WeightedChild(
406     RefCountedPtr<WeightedTargetLb> weighted_target_policy,
407     const std::string& name)
408     : weighted_target_policy_(std::move(weighted_target_policy)), name_(name) {
409   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
410     gpr_log(GPR_INFO, "[weighted_target_lb %p] created WeightedChild %p for %s",
411             weighted_target_policy_.get(), this, name_.c_str());
412   }
413   GRPC_CLOSURE_INIT(&on_delayed_removal_timer_, OnDelayedRemovalTimer, this,
414                     grpc_schedule_on_exec_ctx);
415 }
416 
~WeightedChild()417 WeightedTargetLb::WeightedChild::~WeightedChild() {
418   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
419     gpr_log(GPR_INFO,
420             "[weighted_target_lb %p] WeightedChild %p %s: destroying child",
421             weighted_target_policy_.get(), this, name_.c_str());
422   }
423   weighted_target_policy_.reset(DEBUG_LOCATION, "WeightedChild");
424 }
425 
Orphan()426 void WeightedTargetLb::WeightedChild::Orphan() {
427   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
428     gpr_log(GPR_INFO,
429             "[weighted_target_lb %p] WeightedChild %p %s: shutting down child",
430             weighted_target_policy_.get(), this, name_.c_str());
431   }
432   // Remove the child policy's interested_parties pollset_set from the
433   // xDS policy.
434   grpc_pollset_set_del_pollset_set(
435       child_policy_->interested_parties(),
436       weighted_target_policy_->interested_parties());
437   child_policy_.reset();
438   // Drop our ref to the child's picker, in case it's holding a ref to
439   // the child.
440   picker_wrapper_.reset();
441   if (delayed_removal_timer_callback_pending_) {
442     delayed_removal_timer_callback_pending_ = false;
443     grpc_timer_cancel(&delayed_removal_timer_);
444   }
445   shutdown_ = true;
446   Unref();
447 }
448 
449 OrphanablePtr<LoadBalancingPolicy>
CreateChildPolicyLocked(const grpc_channel_args * args)450 WeightedTargetLb::WeightedChild::CreateChildPolicyLocked(
451     const grpc_channel_args* args) {
452   LoadBalancingPolicy::Args lb_policy_args;
453   lb_policy_args.work_serializer = weighted_target_policy_->work_serializer();
454   lb_policy_args.args = args;
455   lb_policy_args.channel_control_helper =
456       absl::make_unique<Helper>(this->Ref(DEBUG_LOCATION, "Helper"));
457   OrphanablePtr<LoadBalancingPolicy> lb_policy =
458       MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
459                                          &grpc_lb_weighted_target_trace);
460   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
461     gpr_log(GPR_INFO,
462             "[weighted_target_lb %p] WeightedChild %p %s: Created new child "
463             "policy handler %p",
464             weighted_target_policy_.get(), this, name_.c_str(),
465             lb_policy.get());
466   }
467   // Add the xDS's interested_parties pollset_set to that of the newly created
468   // child policy. This will make the child policy progress upon activity on
469   // xDS LB, which in turn is tied to the application's call.
470   grpc_pollset_set_add_pollset_set(
471       lb_policy->interested_parties(),
472       weighted_target_policy_->interested_parties());
473   return lb_policy;
474 }
475 
UpdateLocked(const WeightedTargetLbConfig::ChildConfig & config,ServerAddressList addresses,const grpc_channel_args * args)476 void WeightedTargetLb::WeightedChild::UpdateLocked(
477     const WeightedTargetLbConfig::ChildConfig& config,
478     ServerAddressList addresses, const grpc_channel_args* args) {
479   if (weighted_target_policy_->shutting_down_) return;
480   // Update child weight.
481   weight_ = config.weight;
482   // Reactivate if needed.
483   if (delayed_removal_timer_callback_pending_) {
484     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
485       gpr_log(GPR_INFO,
486               "[weighted_target_lb %p] WeightedChild %p %s: reactivating",
487               weighted_target_policy_.get(), this, name_.c_str());
488     }
489     delayed_removal_timer_callback_pending_ = false;
490     grpc_timer_cancel(&delayed_removal_timer_);
491   }
492   // Create child policy if needed.
493   if (child_policy_ == nullptr) {
494     child_policy_ = CreateChildPolicyLocked(args);
495   }
496   // Construct update args.
497   UpdateArgs update_args;
498   update_args.config = config.config;
499   update_args.addresses = std::move(addresses);
500   update_args.args = grpc_channel_args_copy(args);
501   // Update the policy.
502   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
503     gpr_log(GPR_INFO,
504             "[weighted_target_lb %p] WeightedChild %p %s: Updating child "
505             "policy handler %p",
506             weighted_target_policy_.get(), this, name_.c_str(),
507             child_policy_.get());
508   }
509   child_policy_->UpdateLocked(std::move(update_args));
510 }
511 
ResetBackoffLocked()512 void WeightedTargetLb::WeightedChild::ResetBackoffLocked() {
513   child_policy_->ResetBackoffLocked();
514 }
515 
OnConnectivityStateUpdateLocked(grpc_connectivity_state state,const absl::Status & status,std::unique_ptr<SubchannelPicker> picker)516 void WeightedTargetLb::WeightedChild::OnConnectivityStateUpdateLocked(
517     grpc_connectivity_state state, const absl::Status& status,
518     std::unique_ptr<SubchannelPicker> picker) {
519   // Cache the picker in the WeightedChild.
520   picker_wrapper_ = MakeRefCounted<ChildPickerWrapper>(std::move(picker));
521   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
522     gpr_log(GPR_INFO,
523             "[weighted_target_lb %p] WeightedChild %p %s: connectivity "
524             "state update: state=%s (%s) picker_wrapper=%p",
525             weighted_target_policy_.get(), this, name_.c_str(),
526             ConnectivityStateName(state), status.ToString().c_str(),
527             picker_wrapper_.get());
528   }
529   // If the child reports IDLE, immediately tell it to exit idle.
530   if (state == GRPC_CHANNEL_IDLE) child_policy_->ExitIdleLocked();
531   // Decide what state to report for aggregation purposes.
532   // If we haven't seen a failure since the last time we were in state
533   // READY, then we report the state change as-is.  However, once we do see
534   // a failure, we report TRANSIENT_FAILURE and ignore any subsequent state
535   // changes until we go back into state READY.
536   if (!seen_failure_since_ready_) {
537     if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
538       seen_failure_since_ready_ = true;
539     }
540   } else {
541     if (state != GRPC_CHANNEL_READY) return;
542     seen_failure_since_ready_ = false;
543   }
544   connectivity_state_ = state;
545   // Notify the LB policy.
546   weighted_target_policy_->UpdateStateLocked();
547 }
548 
DeactivateLocked()549 void WeightedTargetLb::WeightedChild::DeactivateLocked() {
550   // If already deactivated, don't do that again.
551   if (weight_ == 0) return;
552   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
553     gpr_log(GPR_INFO,
554             "[weighted_target_lb %p] WeightedChild %p %s: deactivating",
555             weighted_target_policy_.get(), this, name_.c_str());
556   }
557   // Set the child weight to 0 so that future picker won't contain this child.
558   weight_ = 0;
559   // Start a timer to delete the child.
560   Ref(DEBUG_LOCATION, "WeightedChild+timer").release();
561   delayed_removal_timer_callback_pending_ = true;
562   grpc_timer_init(&delayed_removal_timer_,
563                   ExecCtx::Get()->Now() + kChildRetentionIntervalMs,
564                   &on_delayed_removal_timer_);
565 }
566 
OnDelayedRemovalTimer(void * arg,grpc_error * error)567 void WeightedTargetLb::WeightedChild::OnDelayedRemovalTimer(void* arg,
568                                                             grpc_error* error) {
569   WeightedChild* self = static_cast<WeightedChild*>(arg);
570   GRPC_ERROR_REF(error);  // ref owned by lambda
571   self->weighted_target_policy_->work_serializer()->Run(
572       [self, error]() { self->OnDelayedRemovalTimerLocked(error); },
573       DEBUG_LOCATION);
574 }
575 
OnDelayedRemovalTimerLocked(grpc_error * error)576 void WeightedTargetLb::WeightedChild::OnDelayedRemovalTimerLocked(
577     grpc_error* error) {
578   if (error == GRPC_ERROR_NONE && delayed_removal_timer_callback_pending_ &&
579       !shutdown_ && weight_ == 0) {
580     delayed_removal_timer_callback_pending_ = false;
581     weighted_target_policy_->targets_.erase(name_);
582   }
583   Unref(DEBUG_LOCATION, "WeightedChild+timer");
584   GRPC_ERROR_UNREF(error);
585 }
586 
587 //
588 // WeightedTargetLb::WeightedChild::Helper
589 //
590 
591 RefCountedPtr<SubchannelInterface>
CreateSubchannel(ServerAddress address,const grpc_channel_args & args)592 WeightedTargetLb::WeightedChild::Helper::CreateSubchannel(
593     ServerAddress address, const grpc_channel_args& args) {
594   if (weighted_child_->weighted_target_policy_->shutting_down_) return nullptr;
595   return weighted_child_->weighted_target_policy_->channel_control_helper()
596       ->CreateSubchannel(std::move(address), args);
597 }
598 
UpdateState(grpc_connectivity_state state,const absl::Status & status,std::unique_ptr<SubchannelPicker> picker)599 void WeightedTargetLb::WeightedChild::Helper::UpdateState(
600     grpc_connectivity_state state, const absl::Status& status,
601     std::unique_ptr<SubchannelPicker> picker) {
602   if (weighted_child_->weighted_target_policy_->shutting_down_) return;
603   weighted_child_->OnConnectivityStateUpdateLocked(state, status,
604                                                    std::move(picker));
605 }
606 
RequestReresolution()607 void WeightedTargetLb::WeightedChild::Helper::RequestReresolution() {
608   if (weighted_child_->weighted_target_policy_->shutting_down_) return;
609   weighted_child_->weighted_target_policy_->channel_control_helper()
610       ->RequestReresolution();
611 }
612 
AddTraceEvent(TraceSeverity severity,absl::string_view message)613 void WeightedTargetLb::WeightedChild::Helper::AddTraceEvent(
614     TraceSeverity severity, absl::string_view message) {
615   if (weighted_child_->weighted_target_policy_->shutting_down_) return;
616   weighted_child_->weighted_target_policy_->channel_control_helper()
617       ->AddTraceEvent(severity, message);
618 }
619 
620 //
621 // factory
622 //
623 
624 class WeightedTargetLbFactory : public LoadBalancingPolicyFactory {
625  public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const626   OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
627       LoadBalancingPolicy::Args args) const override {
628     return MakeOrphanable<WeightedTargetLb>(std::move(args));
629   }
630 
name() const631   const char* name() const override { return kWeightedTarget; }
632 
ParseLoadBalancingConfig(const Json & json,grpc_error ** error) const633   RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
634       const Json& json, grpc_error** error) const override {
635     GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
636     if (json.type() == Json::Type::JSON_NULL) {
637       // weighted_target was mentioned as a policy in the deprecated
638       // loadBalancingPolicy field or in the client API.
639       *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
640           "field:loadBalancingPolicy error:weighted_target policy requires "
641           "configuration.  Please use loadBalancingConfig field of service "
642           "config instead.");
643       return nullptr;
644     }
645     std::vector<grpc_error*> error_list;
646     // Weight map.
647     WeightedTargetLbConfig::TargetMap target_map;
648     auto it = json.object_value().find("targets");
649     if (it == json.object_value().end()) {
650       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
651           "field:targets error:required field not present"));
652     } else if (it->second.type() != Json::Type::OBJECT) {
653       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
654           "field:targets error:type should be object"));
655     } else {
656       for (const auto& p : it->second.object_value()) {
657         WeightedTargetLbConfig::ChildConfig child_config;
658         std::vector<grpc_error*> child_errors =
659             ParseChildConfig(p.second, &child_config);
660         if (!child_errors.empty()) {
661           // Can't use GRPC_ERROR_CREATE_FROM_VECTOR() here, because the error
662           // string is not static in this case.
663           grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(
664               absl::StrCat("field:targets key:", p.first).c_str());
665           for (grpc_error* child_error : child_errors) {
666             error = grpc_error_add_child(error, child_error);
667           }
668           error_list.push_back(error);
669         } else {
670           target_map[p.first] = std::move(child_config);
671         }
672       }
673     }
674     if (!error_list.empty()) {
675       *error = GRPC_ERROR_CREATE_FROM_VECTOR(
676           "weighted_target_experimental LB policy config", &error_list);
677       return nullptr;
678     }
679     return MakeRefCounted<WeightedTargetLbConfig>(std::move(target_map));
680   }
681 
682  private:
ParseChildConfig(const Json & json,WeightedTargetLbConfig::ChildConfig * child_config)683   static std::vector<grpc_error*> ParseChildConfig(
684       const Json& json, WeightedTargetLbConfig::ChildConfig* child_config) {
685     std::vector<grpc_error*> error_list;
686     if (json.type() != Json::Type::OBJECT) {
687       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
688           "value should be of type object"));
689       return error_list;
690     }
691     // Weight.
692     auto it = json.object_value().find("weight");
693     if (it == json.object_value().end()) {
694       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
695           "required field \"weight\" not specified"));
696     } else if (it->second.type() != Json::Type::NUMBER) {
697       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
698           "field:weight error:must be of type number"));
699     } else {
700       int weight = gpr_parse_nonnegative_int(it->second.string_value().c_str());
701       if (weight == -1) {
702         error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
703             "field:weight error:unparseable value"));
704       } else if (weight == 0) {
705         error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
706             "field:weight error:value must be greater than zero"));
707       } else {
708         child_config->weight = weight;
709       }
710     }
711     // Child policy.
712     it = json.object_value().find("childPolicy");
713     if (it != json.object_value().end()) {
714       grpc_error* parse_error = GRPC_ERROR_NONE;
715       child_config->config =
716           LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(it->second,
717                                                                 &parse_error);
718       if (child_config->config == nullptr) {
719         GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE);
720         std::vector<grpc_error*> child_errors;
721         child_errors.push_back(parse_error);
722         error_list.push_back(
723             GRPC_ERROR_CREATE_FROM_VECTOR("field:childPolicy", &child_errors));
724       }
725     }
726     return error_list;
727   }
728 };
729 
730 }  // namespace
731 
732 }  // namespace grpc_core
733 
734 //
735 // Plugin registration
736 //
737 
grpc_lb_policy_weighted_target_init()738 void grpc_lb_policy_weighted_target_init() {
739   grpc_core::LoadBalancingPolicyRegistry::Builder::
740       RegisterLoadBalancingPolicyFactory(
741           absl::make_unique<grpc_core::WeightedTargetLbFactory>());
742 }
743 
grpc_lb_policy_weighted_target_shutdown()744 void grpc_lb_policy_weighted_target_shutdown() {}
745