1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include "src/core/load_balancing/weighted_target/weighted_target.h"
18
19 #include <grpc/event_engine/event_engine.h>
20 #include <grpc/impl/connectivity_state.h>
21 #include <grpc/support/port_platform.h>
22 #include <string.h>
23
24 #include <algorithm>
25 #include <cstdint>
26 #include <map>
27 #include <memory>
28 #include <string>
29 #include <utility>
30 #include <vector>
31
32 #include "absl/base/thread_annotations.h"
33 #include "absl/log/check.h"
34 #include "absl/log/log.h"
35 #include "absl/meta/type_traits.h"
36 #include "absl/random/random.h"
37 #include "absl/status/status.h"
38 #include "absl/status/statusor.h"
39 #include "absl/strings/str_cat.h"
40 #include "absl/strings/str_join.h"
41 #include "absl/strings/string_view.h"
42 #include "absl/types/optional.h"
43 #include "src/core/config/core_configuration.h"
44 #include "src/core/lib/channel/channel_args.h"
45 #include "src/core/lib/debug/trace.h"
46 #include "src/core/lib/iomgr/exec_ctx.h"
47 #include "src/core/lib/iomgr/pollset_set.h"
48 #include "src/core/lib/transport/connectivity_state.h"
49 #include "src/core/load_balancing/address_filtering.h"
50 #include "src/core/load_balancing/child_policy_handler.h"
51 #include "src/core/load_balancing/delegating_helper.h"
52 #include "src/core/load_balancing/lb_policy.h"
53 #include "src/core/load_balancing/lb_policy_factory.h"
54 #include "src/core/load_balancing/lb_policy_registry.h"
55 #include "src/core/resolver/endpoint_addresses.h"
56 #include "src/core/util/debug_location.h"
57 #include "src/core/util/json/json.h"
58 #include "src/core/util/json/json_args.h"
59 #include "src/core/util/json/json_object_loader.h"
60 #include "src/core/util/orphanable.h"
61 #include "src/core/util/ref_counted_ptr.h"
62 #include "src/core/util/sync.h"
63 #include "src/core/util/time.h"
64 #include "src/core/util/validation_errors.h"
65 #include "src/core/util/work_serializer.h"
66
67 // IWYU pragma: no_include <type_traits>
68
69 namespace grpc_core {
70
71 namespace {
72
73 using ::grpc_event_engine::experimental::EventEngine;
74
75 constexpr absl::string_view kWeightedTarget = "weighted_target_experimental";
76
77 // How long we keep a child around for after it has been removed from
78 // the config.
79 constexpr Duration kChildRetentionInterval = Duration::Minutes(15);
80
81 // Config for weighted_target LB policy.
82 class WeightedTargetLbConfig final : public LoadBalancingPolicy::Config {
83 public:
84 struct ChildConfig {
85 uint32_t weight;
86 RefCountedPtr<LoadBalancingPolicy::Config> config;
87
88 static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
89 void JsonPostLoad(const Json& json, const JsonArgs&,
90 ValidationErrors* errors);
91 };
92
93 using TargetMap = std::map<std::string, ChildConfig>;
94
95 WeightedTargetLbConfig() = default;
96
97 WeightedTargetLbConfig(const WeightedTargetLbConfig&) = delete;
98 WeightedTargetLbConfig& operator=(const WeightedTargetLbConfig&) = delete;
99
100 WeightedTargetLbConfig(WeightedTargetLbConfig&& other) = delete;
101 WeightedTargetLbConfig& operator=(WeightedTargetLbConfig&& other) = delete;
102
name() const103 absl::string_view name() const override { return kWeightedTarget; }
104
target_map() const105 const TargetMap& target_map() const { return target_map_; }
106
107 static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
108
109 private:
110 TargetMap target_map_;
111 };
112
113 // weighted_target LB policy.
114 class WeightedTargetLb final : public LoadBalancingPolicy {
115 public:
116 explicit WeightedTargetLb(Args args);
117
name() const118 absl::string_view name() const override { return kWeightedTarget; }
119
120 absl::Status UpdateLocked(UpdateArgs args) override;
121 void ResetBackoffLocked() override;
122
123 private:
124 // Picks a child using stateless WRR and then delegates to that
125 // child's picker.
126 class WeightedPicker final : public SubchannelPicker {
127 public:
128 // Maintains a weighted list of pickers from each child that is in
129 // ready state. The first element in the pair represents the end of a
130 // range proportional to the child's weight. The start of the range
131 // is the previous value in the vector and is 0 for the first element.
132 using PickerList =
133 std::vector<std::pair<uint64_t, RefCountedPtr<SubchannelPicker>>>;
134
WeightedPicker(PickerList pickers)135 explicit WeightedPicker(PickerList pickers)
136 : pickers_(std::move(pickers)) {}
137
138 PickResult Pick(PickArgs args) override;
139
140 private:
141 PickerList pickers_;
142
143 // TODO(roth): Consider using a separate thread-local BitGen for each CPU
144 // to avoid the need for this mutex.
145 Mutex mu_;
146 absl::BitGen bit_gen_ ABSL_GUARDED_BY(&mu_);
147 };
148
149 // Each WeightedChild holds a ref to its parent WeightedTargetLb.
150 class WeightedChild final : public InternallyRefCounted<WeightedChild> {
151 public:
152 WeightedChild(RefCountedPtr<WeightedTargetLb> weighted_target_policy,
153 const std::string& name);
154 ~WeightedChild() override;
155
156 void Orphan() override;
157
158 absl::Status UpdateLocked(
159 const WeightedTargetLbConfig::ChildConfig& config,
160 absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses,
161 const std::string& resolution_note, ChannelArgs args);
162 void ResetBackoffLocked();
163 void DeactivateLocked();
164
weight() const165 uint32_t weight() const { return weight_; }
connectivity_state() const166 grpc_connectivity_state connectivity_state() const {
167 return connectivity_state_;
168 }
picker() const169 RefCountedPtr<SubchannelPicker> picker() const { return picker_; }
170
171 private:
172 class Helper final : public DelegatingChannelControlHelper {
173 public:
Helper(RefCountedPtr<WeightedChild> weighted_child)174 explicit Helper(RefCountedPtr<WeightedChild> weighted_child)
175 : weighted_child_(std::move(weighted_child)) {}
176
~Helper()177 ~Helper() override { weighted_child_.reset(DEBUG_LOCATION, "Helper"); }
178
179 void UpdateState(grpc_connectivity_state state,
180 const absl::Status& status,
181 RefCountedPtr<SubchannelPicker> picker) override;
182
183 private:
parent_helper() const184 ChannelControlHelper* parent_helper() const override {
185 return weighted_child_->weighted_target_policy_
186 ->channel_control_helper();
187 }
188
189 RefCountedPtr<WeightedChild> weighted_child_;
190 };
191
192 class DelayedRemovalTimer final
193 : public InternallyRefCounted<DelayedRemovalTimer> {
194 public:
195 explicit DelayedRemovalTimer(RefCountedPtr<WeightedChild> weighted_child);
196
197 void Orphan() override;
198
199 private:
200 void OnTimerLocked();
201
202 RefCountedPtr<WeightedChild> weighted_child_;
203 absl::optional<EventEngine::TaskHandle> timer_handle_;
204 };
205
206 // Methods for dealing with the child policy.
207 OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
208 const ChannelArgs& args);
209
210 void OnConnectivityStateUpdateLocked(
211 grpc_connectivity_state state, const absl::Status& status,
212 RefCountedPtr<SubchannelPicker> picker);
213
214 // The owning LB policy.
215 RefCountedPtr<WeightedTargetLb> weighted_target_policy_;
216
217 const std::string name_;
218
219 uint32_t weight_ = 0;
220
221 OrphanablePtr<LoadBalancingPolicy> child_policy_;
222
223 RefCountedPtr<SubchannelPicker> picker_;
224 grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_CONNECTING;
225
226 OrphanablePtr<DelayedRemovalTimer> delayed_removal_timer_;
227 };
228
229 ~WeightedTargetLb() override;
230
231 void ShutdownLocked() override;
232
233 void UpdateStateLocked();
234
235 // Current config from the resolver.
236 RefCountedPtr<WeightedTargetLbConfig> config_;
237
238 // Internal state.
239 bool shutting_down_ = false;
240 bool update_in_progress_ = false;
241
242 // Children.
243 std::map<std::string, OrphanablePtr<WeightedChild>> targets_;
244 };
245
246 //
247 // WeightedTargetLb::WeightedPicker
248 //
249
Pick(PickArgs args)250 WeightedTargetLb::PickResult WeightedTargetLb::WeightedPicker::Pick(
251 PickArgs args) {
252 // Generate a random number in [0, total weight).
253 const uint64_t key = [&]() {
254 MutexLock lock(&mu_);
255 return absl::Uniform<uint64_t>(bit_gen_, 0, pickers_.back().first);
256 }();
257 // Find the index in pickers_ corresponding to key.
258 size_t mid = 0;
259 size_t start_index = 0;
260 size_t end_index = pickers_.size() - 1;
261 size_t index = 0;
262 while (end_index > start_index) {
263 mid = (start_index + end_index) / 2;
264 if (pickers_[mid].first > key) {
265 end_index = mid;
266 } else if (pickers_[mid].first < key) {
267 start_index = mid + 1;
268 } else {
269 index = mid + 1;
270 break;
271 }
272 }
273 if (index == 0) index = start_index;
274 CHECK(pickers_[index].first > key);
275 // Delegate to the child picker.
276 return pickers_[index].second->Pick(args);
277 }
278
279 //
280 // WeightedTargetLb
281 //
282
WeightedTargetLb(Args args)283 WeightedTargetLb::WeightedTargetLb(Args args)
284 : LoadBalancingPolicy(std::move(args)) {
285 GRPC_TRACE_LOG(weighted_target_lb, INFO)
286 << "[weighted_target_lb " << this << "] created";
287 }
288
~WeightedTargetLb()289 WeightedTargetLb::~WeightedTargetLb() {
290 GRPC_TRACE_LOG(weighted_target_lb, INFO)
291 << "[weighted_target_lb " << this
292 << "] destroying weighted_target LB policy";
293 }
294
ShutdownLocked()295 void WeightedTargetLb::ShutdownLocked() {
296 GRPC_TRACE_LOG(weighted_target_lb, INFO)
297 << "[weighted_target_lb " << this << "] shutting down";
298 shutting_down_ = true;
299 targets_.clear();
300 }
301
ResetBackoffLocked()302 void WeightedTargetLb::ResetBackoffLocked() {
303 for (auto& p : targets_) p.second->ResetBackoffLocked();
304 }
305
UpdateLocked(UpdateArgs args)306 absl::Status WeightedTargetLb::UpdateLocked(UpdateArgs args) {
307 if (shutting_down_) return absl::OkStatus();
308 GRPC_TRACE_LOG(weighted_target_lb, INFO)
309 << "[weighted_target_lb " << this << "] received update";
310 update_in_progress_ = true;
311 // Update config.
312 config_ = args.config.TakeAsSubclass<WeightedTargetLbConfig>();
313 // Deactivate the targets not in the new config.
314 for (const auto& p : targets_) {
315 const std::string& name = p.first;
316 WeightedChild* child = p.second.get();
317 if (config_->target_map().find(name) == config_->target_map().end()) {
318 child->DeactivateLocked();
319 }
320 }
321 // Update all children.
322 absl::StatusOr<HierarchicalAddressMap> address_map =
323 MakeHierarchicalAddressMap(args.addresses);
324 std::vector<std::string> errors;
325 for (const auto& p : config_->target_map()) {
326 const std::string& name = p.first;
327 const WeightedTargetLbConfig::ChildConfig& config = p.second;
328 auto& target = targets_[name];
329 // Create child if it does not already exist.
330 if (target == nullptr) {
331 target = MakeOrphanable<WeightedChild>(
332 RefAsSubclass<WeightedTargetLb>(DEBUG_LOCATION, "WeightedChild"),
333 name);
334 }
335 absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses;
336 if (address_map.ok()) {
337 auto it = address_map->find(name);
338 if (it == address_map->end()) {
339 addresses = std::make_shared<EndpointAddressesListIterator>(
340 EndpointAddressesList());
341 } else {
342 addresses = std::move(it->second);
343 }
344 } else {
345 addresses = address_map.status();
346 }
347 absl::Status status = target->UpdateLocked(config, std::move(addresses),
348 args.resolution_note, args.args);
349 if (!status.ok()) {
350 errors.emplace_back(
351 absl::StrCat("child ", name, ": ", status.ToString()));
352 }
353 }
354 update_in_progress_ = false;
355 if (config_->target_map().empty()) {
356 absl::Status status = absl::UnavailableError(absl::StrCat(
357 "no children in weighted_target policy: ", args.resolution_note));
358 channel_control_helper()->UpdateState(
359 GRPC_CHANNEL_TRANSIENT_FAILURE, status,
360 MakeRefCounted<TransientFailurePicker>(status));
361 return absl::OkStatus();
362 }
363 UpdateStateLocked();
364 // Return status.
365 if (!errors.empty()) {
366 return absl::UnavailableError(absl::StrCat(
367 "errors from children: [", absl::StrJoin(errors, "; "), "]"));
368 }
369 return absl::OkStatus();
370 }
371
UpdateStateLocked()372 void WeightedTargetLb::UpdateStateLocked() {
373 // If we're in the process of propagating an update from our parent to
374 // our children, ignore any updates that come from the children. We
375 // will instead return a new picker once the update has been seen by
376 // all children. This avoids unnecessary picker churn while an update
377 // is being propagated to our children.
378 if (update_in_progress_) return;
379 GRPC_TRACE_LOG(weighted_target_lb, INFO)
380 << "[weighted_target_lb " << this
381 << "] scanning children to determine connectivity state";
382 // Construct lists of child pickers with associated weights, one for
383 // children that are in state READY and another for children that are
384 // in state TRANSIENT_FAILURE. Each child is represented by a portion of
385 // the range proportional to its weight, such that the total range is the
386 // sum of the weights of all children.
387 WeightedPicker::PickerList ready_picker_list;
388 uint64_t ready_end = 0;
389 WeightedPicker::PickerList tf_picker_list;
390 uint64_t tf_end = 0;
391 // Also count the number of children in CONNECTING and IDLE, to determine
392 // the aggregated state.
393 size_t num_connecting = 0;
394 size_t num_idle = 0;
395 for (const auto& p : targets_) {
396 const std::string& child_name = p.first;
397 const WeightedChild* child = p.second.get();
398 // Skip the targets that are not in the latest update.
399 if (config_->target_map().find(child_name) == config_->target_map().end()) {
400 continue;
401 }
402 auto child_picker = child->picker();
403 GRPC_TRACE_LOG(weighted_target_lb, INFO)
404 << "[weighted_target_lb " << this << "] child=" << child_name
405 << " state=" << ConnectivityStateName(child->connectivity_state())
406 << " weight=" << child->weight() << " picker=" << child_picker.get();
407 switch (child->connectivity_state()) {
408 case GRPC_CHANNEL_READY: {
409 CHECK_GT(child->weight(), 0u);
410 ready_end += child->weight();
411 ready_picker_list.emplace_back(ready_end, std::move(child_picker));
412 break;
413 }
414 case GRPC_CHANNEL_CONNECTING: {
415 ++num_connecting;
416 break;
417 }
418 case GRPC_CHANNEL_IDLE: {
419 ++num_idle;
420 break;
421 }
422 case GRPC_CHANNEL_TRANSIENT_FAILURE: {
423 CHECK_GT(child->weight(), 0u);
424 tf_end += child->weight();
425 tf_picker_list.emplace_back(tf_end, std::move(child_picker));
426 break;
427 }
428 default:
429 GPR_UNREACHABLE_CODE(return);
430 }
431 }
432 // Determine aggregated connectivity state.
433 grpc_connectivity_state connectivity_state;
434 if (!ready_picker_list.empty()) {
435 connectivity_state = GRPC_CHANNEL_READY;
436 } else if (num_connecting > 0) {
437 connectivity_state = GRPC_CHANNEL_CONNECTING;
438 } else if (num_idle > 0) {
439 connectivity_state = GRPC_CHANNEL_IDLE;
440 } else {
441 connectivity_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
442 }
443 GRPC_TRACE_LOG(weighted_target_lb, INFO)
444 << "[weighted_target_lb " << this << "] connectivity changed to "
445 << ConnectivityStateName(connectivity_state);
446 RefCountedPtr<SubchannelPicker> picker;
447 absl::Status status;
448 switch (connectivity_state) {
449 case GRPC_CHANNEL_READY:
450 picker = MakeRefCounted<WeightedPicker>(std::move(ready_picker_list));
451 break;
452 case GRPC_CHANNEL_CONNECTING:
453 case GRPC_CHANNEL_IDLE:
454 picker = MakeRefCounted<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker"));
455 break;
456 default:
457 picker = MakeRefCounted<WeightedPicker>(std::move(tf_picker_list));
458 }
459 channel_control_helper()->UpdateState(connectivity_state, status,
460 std::move(picker));
461 }
462
463 //
464 // WeightedTargetLb::WeightedChild::DelayedRemovalTimer
465 //
466
DelayedRemovalTimer(RefCountedPtr<WeightedTargetLb::WeightedChild> weighted_child)467 WeightedTargetLb::WeightedChild::DelayedRemovalTimer::DelayedRemovalTimer(
468 RefCountedPtr<WeightedTargetLb::WeightedChild> weighted_child)
469 : weighted_child_(std::move(weighted_child)) {
470 timer_handle_ =
471 weighted_child_->weighted_target_policy_->channel_control_helper()
472 ->GetEventEngine()
473 ->RunAfter(kChildRetentionInterval, [self = Ref()]() mutable {
474 ApplicationCallbackExecCtx app_exec_ctx;
475 ExecCtx exec_ctx;
476 auto* self_ptr = self.get(); // Avoid use-after-move problem.
477 self_ptr->weighted_child_->weighted_target_policy_
478 ->work_serializer()
479 ->Run([self = std::move(self)] { self->OnTimerLocked(); },
480 DEBUG_LOCATION);
481 });
482 }
483
Orphan()484 void WeightedTargetLb::WeightedChild::DelayedRemovalTimer::Orphan() {
485 if (timer_handle_.has_value()) {
486 GRPC_TRACE_LOG(weighted_target_lb, INFO)
487 << "[weighted_target_lb "
488 << weighted_child_->weighted_target_policy_.get() << "] WeightedChild "
489 << weighted_child_.get() << " " << weighted_child_->name_
490 << ": cancelling delayed removal timer";
491 weighted_child_->weighted_target_policy_->channel_control_helper()
492 ->GetEventEngine()
493 ->Cancel(*timer_handle_);
494 }
495 Unref();
496 }
497
OnTimerLocked()498 void WeightedTargetLb::WeightedChild::DelayedRemovalTimer::OnTimerLocked() {
499 CHECK(timer_handle_.has_value());
500 timer_handle_.reset();
501 weighted_child_->weighted_target_policy_->targets_.erase(
502 weighted_child_->name_);
503 }
504
505 //
506 // WeightedTargetLb::WeightedChild
507 //
508
WeightedChild(RefCountedPtr<WeightedTargetLb> weighted_target_policy,const std::string & name)509 WeightedTargetLb::WeightedChild::WeightedChild(
510 RefCountedPtr<WeightedTargetLb> weighted_target_policy,
511 const std::string& name)
512 : weighted_target_policy_(std::move(weighted_target_policy)),
513 name_(name),
514 picker_(MakeRefCounted<QueuePicker>(nullptr)) {
515 GRPC_TRACE_LOG(weighted_target_lb, INFO)
516 << "[weighted_target_lb " << weighted_target_policy_.get()
517 << "] created WeightedChild " << this << " for " << name_;
518 }
519
~WeightedChild()520 WeightedTargetLb::WeightedChild::~WeightedChild() {
521 GRPC_TRACE_LOG(weighted_target_lb, INFO)
522 << "[weighted_target_lb " << weighted_target_policy_.get()
523 << "] WeightedChild " << this << " " << name_ << ": destroying child";
524 weighted_target_policy_.reset(DEBUG_LOCATION, "WeightedChild");
525 }
526
Orphan()527 void WeightedTargetLb::WeightedChild::Orphan() {
528 GRPC_TRACE_LOG(weighted_target_lb, INFO)
529 << "[weighted_target_lb " << weighted_target_policy_.get()
530 << "] WeightedChild " << this << " " << name_ << ": shutting down child";
531 // Remove the child policy's interested_parties pollset_set from the
532 // xDS policy.
533 grpc_pollset_set_del_pollset_set(
534 child_policy_->interested_parties(),
535 weighted_target_policy_->interested_parties());
536 child_policy_.reset();
537 // Drop our ref to the child's picker, in case it's holding a ref to
538 // the child.
539 picker_.reset();
540 delayed_removal_timer_.reset();
541 Unref();
542 }
543
544 OrphanablePtr<LoadBalancingPolicy>
CreateChildPolicyLocked(const ChannelArgs & args)545 WeightedTargetLb::WeightedChild::CreateChildPolicyLocked(
546 const ChannelArgs& args) {
547 LoadBalancingPolicy::Args lb_policy_args;
548 lb_policy_args.work_serializer = weighted_target_policy_->work_serializer();
549 lb_policy_args.args = args;
550 lb_policy_args.channel_control_helper =
551 std::make_unique<Helper>(this->Ref(DEBUG_LOCATION, "Helper"));
552 OrphanablePtr<LoadBalancingPolicy> lb_policy =
553 MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
554 &weighted_target_lb_trace);
555 GRPC_TRACE_LOG(weighted_target_lb, INFO)
556 << "[weighted_target_lb " << weighted_target_policy_.get()
557 << "] WeightedChild " << this << " " << name_
558 << ": created new child policy handler " << lb_policy.get();
559 // Add the xDS's interested_parties pollset_set to that of the newly created
560 // child policy. This will make the child policy progress upon activity on
561 // xDS LB, which in turn is tied to the application's call.
562 grpc_pollset_set_add_pollset_set(
563 lb_policy->interested_parties(),
564 weighted_target_policy_->interested_parties());
565 return lb_policy;
566 }
567
UpdateLocked(const WeightedTargetLbConfig::ChildConfig & config,absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses,const std::string & resolution_note,ChannelArgs args)568 absl::Status WeightedTargetLb::WeightedChild::UpdateLocked(
569 const WeightedTargetLbConfig::ChildConfig& config,
570 absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses,
571 const std::string& resolution_note, ChannelArgs args) {
572 if (weighted_target_policy_->shutting_down_) return absl::OkStatus();
573 // Update child weight.
574 if (weight_ != config.weight && GRPC_TRACE_FLAG_ENABLED(weighted_target_lb)) {
575 LOG(INFO) << "[weighted_target_lb " << weighted_target_policy_.get()
576 << "] WeightedChild " << this << " " << name_
577 << ": weight=" << config.weight;
578 }
579 weight_ = config.weight;
580 // Reactivate if needed.
581 if (delayed_removal_timer_ != nullptr) {
582 GRPC_TRACE_LOG(weighted_target_lb, INFO)
583 << "[weighted_target_lb " << weighted_target_policy_.get()
584 << "] WeightedChild " << this << " " << name_ << ": reactivating";
585 delayed_removal_timer_.reset();
586 }
587 // Create child policy if needed.
588 args = args.Set(GRPC_ARG_LB_WEIGHTED_TARGET_CHILD, name_);
589 if (child_policy_ == nullptr) {
590 child_policy_ = CreateChildPolicyLocked(args);
591 }
592 // Construct update args.
593 UpdateArgs update_args;
594 update_args.config = config.config;
595 update_args.addresses = std::move(addresses);
596 update_args.resolution_note = resolution_note;
597 update_args.args = std::move(args);
598 // Update the policy.
599 GRPC_TRACE_LOG(weighted_target_lb, INFO)
600 << "[weighted_target_lb " << weighted_target_policy_.get()
601 << "] WeightedChild " << this << " " << name_
602 << ": updating child policy handler " << child_policy_.get();
603 return child_policy_->UpdateLocked(std::move(update_args));
604 }
605
ResetBackoffLocked()606 void WeightedTargetLb::WeightedChild::ResetBackoffLocked() {
607 child_policy_->ResetBackoffLocked();
608 }
609
OnConnectivityStateUpdateLocked(grpc_connectivity_state state,const absl::Status & status,RefCountedPtr<SubchannelPicker> picker)610 void WeightedTargetLb::WeightedChild::OnConnectivityStateUpdateLocked(
611 grpc_connectivity_state state, const absl::Status& status,
612 RefCountedPtr<SubchannelPicker> picker) {
613 // Cache the picker in the WeightedChild.
614 picker_ = std::move(picker);
615 GRPC_TRACE_LOG(weighted_target_lb, INFO)
616 << "[weighted_target_lb " << weighted_target_policy_.get()
617 << "] WeightedChild " << this << " " << name_
618 << ": connectivity state update: state=" << ConnectivityStateName(state)
619 << " (" << status << ") picker=" << picker_.get();
620 // If the child reports IDLE, immediately tell it to exit idle.
621 if (state == GRPC_CHANNEL_IDLE) child_policy_->ExitIdleLocked();
622 // Decide what state to report for aggregation purposes.
623 // If the last recorded state was TRANSIENT_FAILURE and the new state
624 // is something other than READY, don't change the state.
625 if (connectivity_state_ != GRPC_CHANNEL_TRANSIENT_FAILURE ||
626 state == GRPC_CHANNEL_READY) {
627 connectivity_state_ = state;
628 }
629 // Update the LB policy's state if this child is not deactivated.
630 if (weight_ != 0) weighted_target_policy_->UpdateStateLocked();
631 }
632
DeactivateLocked()633 void WeightedTargetLb::WeightedChild::DeactivateLocked() {
634 // If already deactivated, don't do that again.
635 if (weight_ == 0) return;
636 GRPC_TRACE_LOG(weighted_target_lb, INFO)
637 << "[weighted_target_lb " << weighted_target_policy_.get()
638 << "] WeightedChild " << this << " " << name_ << ": deactivating";
639 // Set the child weight to 0 so that future picker won't contain this child.
640 weight_ = 0;
641 // Start a timer to delete the child.
642 delayed_removal_timer_ = MakeOrphanable<DelayedRemovalTimer>(
643 Ref(DEBUG_LOCATION, "DelayedRemovalTimer"));
644 }
645
646 //
647 // WeightedTargetLb::WeightedChild::Helper
648 //
649
UpdateState(grpc_connectivity_state state,const absl::Status & status,RefCountedPtr<SubchannelPicker> picker)650 void WeightedTargetLb::WeightedChild::Helper::UpdateState(
651 grpc_connectivity_state state, const absl::Status& status,
652 RefCountedPtr<SubchannelPicker> picker) {
653 if (weighted_child_->weighted_target_policy_->shutting_down_) return;
654 weighted_child_->OnConnectivityStateUpdateLocked(state, status,
655 std::move(picker));
656 }
657
658 //
659 // factory
660 //
661
JsonLoader(const JsonArgs &)662 const JsonLoaderInterface* WeightedTargetLbConfig::ChildConfig::JsonLoader(
663 const JsonArgs&) {
664 static const auto* loader =
665 JsonObjectLoader<ChildConfig>()
666 // Note: The config field requires custom parsing, so it's
667 // handled in JsonPostLoad() instead.
668 .Field("weight", &ChildConfig::weight)
669 .Finish();
670 return loader;
671 }
672
JsonPostLoad(const Json & json,const JsonArgs &,ValidationErrors * errors)673 void WeightedTargetLbConfig::ChildConfig::JsonPostLoad(
674 const Json& json, const JsonArgs&, ValidationErrors* errors) {
675 ValidationErrors::ScopedField field(errors, ".childPolicy");
676 auto it = json.object().find("childPolicy");
677 if (it == json.object().end()) {
678 errors->AddError("field not present");
679 return;
680 }
681 auto lb_config =
682 CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
683 it->second);
684 if (!lb_config.ok()) {
685 errors->AddError(lb_config.status().message());
686 return;
687 }
688 config = std::move(*lb_config);
689 }
690
JsonLoader(const JsonArgs &)691 const JsonLoaderInterface* WeightedTargetLbConfig::JsonLoader(const JsonArgs&) {
692 static const auto* loader =
693 JsonObjectLoader<WeightedTargetLbConfig>()
694 .Field("targets", &WeightedTargetLbConfig::target_map_)
695 .Finish();
696 return loader;
697 }
698
699 class WeightedTargetLbFactory final : public LoadBalancingPolicyFactory {
700 public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const701 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
702 LoadBalancingPolicy::Args args) const override {
703 return MakeOrphanable<WeightedTargetLb>(std::move(args));
704 }
705
name() const706 absl::string_view name() const override { return kWeightedTarget; }
707
708 absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const Json & json) const709 ParseLoadBalancingConfig(const Json& json) const override {
710 return LoadFromJson<RefCountedPtr<WeightedTargetLbConfig>>(
711 json, JsonArgs(), "errors validating weighted_target LB policy config");
712 }
713 };
714
715 } // namespace
716
RegisterWeightedTargetLbPolicy(CoreConfiguration::Builder * builder)717 void RegisterWeightedTargetLbPolicy(CoreConfiguration::Builder* builder) {
718 builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
719 std::make_unique<WeightedTargetLbFactory>());
720 }
721
722 } // namespace grpc_core
723