1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/support/port_platform.h>
18
19 #include <inttypes.h>
20 #include <limits.h>
21 #include <string.h>
22
23 #include "absl/container/inlined_vector.h"
24 #include "absl/strings/str_cat.h"
25
26 #include <grpc/grpc.h>
27
28 #include "src/core/ext/filters/client_channel/lb_policy.h"
29 #include "src/core/ext/filters/client_channel/lb_policy/address_filtering.h"
30 #include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
31 #include "src/core/ext/filters/client_channel/lb_policy_factory.h"
32 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
33 #include "src/core/lib/channel/channel_args.h"
34 #include "src/core/lib/gpr/string.h"
35 #include "src/core/lib/gprpp/orphanable.h"
36 #include "src/core/lib/gprpp/ref_counted_ptr.h"
37 #include "src/core/lib/iomgr/timer.h"
38 #include "src/core/lib/iomgr/work_serializer.h"
39 #include "src/core/lib/transport/error_utils.h"
40
41 namespace grpc_core {
42
43 TraceFlag grpc_lb_weighted_target_trace(false, "weighted_target_lb");
44
45 namespace {
46
47 constexpr char kWeightedTarget[] = "weighted_target_experimental";
48
49 // How long we keep a child around for after it has been removed from
50 // the config.
51 constexpr int kChildRetentionIntervalMs = 15 * 60 * 1000;
52
53 // Config for weighted_target LB policy.
54 class WeightedTargetLbConfig : public LoadBalancingPolicy::Config {
55 public:
56 struct ChildConfig {
57 uint32_t weight;
58 RefCountedPtr<LoadBalancingPolicy::Config> config;
59 };
60
61 using TargetMap = std::map<std::string, ChildConfig>;
62
WeightedTargetLbConfig(TargetMap target_map)63 explicit WeightedTargetLbConfig(TargetMap target_map)
64 : target_map_(std::move(target_map)) {}
65
name() const66 const char* name() const override { return kWeightedTarget; }
67
target_map() const68 const TargetMap& target_map() const { return target_map_; }
69
70 private:
71 TargetMap target_map_;
72 };
73
74 // weighted_target LB policy.
75 class WeightedTargetLb : public LoadBalancingPolicy {
76 public:
77 explicit WeightedTargetLb(Args args);
78
name() const79 const char* name() const override { return kWeightedTarget; }
80
81 void UpdateLocked(UpdateArgs args) override;
82 void ResetBackoffLocked() override;
83
84 private:
85 // A simple wrapper for ref-counting a picker from the child policy.
86 class ChildPickerWrapper : public RefCounted<ChildPickerWrapper> {
87 public:
ChildPickerWrapper(std::unique_ptr<SubchannelPicker> picker)88 explicit ChildPickerWrapper(std::unique_ptr<SubchannelPicker> picker)
89 : picker_(std::move(picker)) {}
Pick(PickArgs args)90 PickResult Pick(PickArgs args) { return picker_->Pick(args); }
91
92 private:
93 std::unique_ptr<SubchannelPicker> picker_;
94 };
95
96 // Picks a child using stateless WRR and then delegates to that
97 // child's picker.
98 class WeightedPicker : public SubchannelPicker {
99 public:
100 // Maintains a weighted list of pickers from each child that is in
101 // ready state. The first element in the pair represents the end of a
102 // range proportional to the child's weight. The start of the range
103 // is the previous value in the vector and is 0 for the first element.
104 using PickerList = absl::InlinedVector<
105 std::pair<uint32_t, RefCountedPtr<ChildPickerWrapper>>, 1>;
106
WeightedPicker(PickerList pickers)107 explicit WeightedPicker(PickerList pickers)
108 : pickers_(std::move(pickers)) {}
109
110 PickResult Pick(PickArgs args) override;
111
112 private:
113 PickerList pickers_;
114 };
115
116 // Each WeightedChild holds a ref to its parent WeightedTargetLb.
117 class WeightedChild : public InternallyRefCounted<WeightedChild> {
118 public:
119 WeightedChild(RefCountedPtr<WeightedTargetLb> weighted_target_policy,
120 const std::string& name);
121 ~WeightedChild() override;
122
123 void Orphan() override;
124
125 void UpdateLocked(const WeightedTargetLbConfig::ChildConfig& config,
126 ServerAddressList addresses,
127 const grpc_channel_args* args);
128 void ResetBackoffLocked();
129 void DeactivateLocked();
130
weight() const131 uint32_t weight() const { return weight_; }
connectivity_state() const132 grpc_connectivity_state connectivity_state() const {
133 return connectivity_state_;
134 }
picker_wrapper() const135 RefCountedPtr<ChildPickerWrapper> picker_wrapper() const {
136 return picker_wrapper_;
137 }
138
139 private:
140 class Helper : public ChannelControlHelper {
141 public:
Helper(RefCountedPtr<WeightedChild> weighted_child)142 explicit Helper(RefCountedPtr<WeightedChild> weighted_child)
143 : weighted_child_(std::move(weighted_child)) {}
144
~Helper()145 ~Helper() override { weighted_child_.reset(DEBUG_LOCATION, "Helper"); }
146
147 RefCountedPtr<SubchannelInterface> CreateSubchannel(
148 ServerAddress address, const grpc_channel_args& args) override;
149 void UpdateState(grpc_connectivity_state state,
150 const absl::Status& status,
151 std::unique_ptr<SubchannelPicker> picker) override;
152 void RequestReresolution() override;
153 void AddTraceEvent(TraceSeverity severity,
154 absl::string_view message) override;
155
156 private:
157 RefCountedPtr<WeightedChild> weighted_child_;
158 };
159
160 // Methods for dealing with the child policy.
161 OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
162 const grpc_channel_args* args);
163
164 void OnConnectivityStateUpdateLocked(
165 grpc_connectivity_state state, const absl::Status& status,
166 std::unique_ptr<SubchannelPicker> picker);
167
168 static void OnDelayedRemovalTimer(void* arg, grpc_error* error);
169 void OnDelayedRemovalTimerLocked(grpc_error* error);
170
171 // The owning LB policy.
172 RefCountedPtr<WeightedTargetLb> weighted_target_policy_;
173
174 const std::string name_;
175
176 uint32_t weight_;
177
178 OrphanablePtr<LoadBalancingPolicy> child_policy_;
179
180 RefCountedPtr<ChildPickerWrapper> picker_wrapper_;
181 grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_CONNECTING;
182 bool seen_failure_since_ready_ = false;
183
184 // States for delayed removal.
185 grpc_timer delayed_removal_timer_;
186 grpc_closure on_delayed_removal_timer_;
187 bool delayed_removal_timer_callback_pending_ = false;
188 bool shutdown_ = false;
189 };
190
191 ~WeightedTargetLb() override;
192
193 void ShutdownLocked() override;
194
195 void UpdateStateLocked();
196
197 // Current config from the resolver.
198 RefCountedPtr<WeightedTargetLbConfig> config_;
199
200 // Internal state.
201 bool shutting_down_ = false;
202
203 // Children.
204 std::map<std::string, OrphanablePtr<WeightedChild>> targets_;
205 };
206
207 //
208 // WeightedTargetLb::WeightedPicker
209 //
210
Pick(PickArgs args)211 WeightedTargetLb::PickResult WeightedTargetLb::WeightedPicker::Pick(
212 PickArgs args) {
213 // Generate a random number in [0, total weight).
214 const uint32_t key = rand() % pickers_[pickers_.size() - 1].first;
215 // Find the index in pickers_ corresponding to key.
216 size_t mid = 0;
217 size_t start_index = 0;
218 size_t end_index = pickers_.size() - 1;
219 size_t index = 0;
220 while (end_index > start_index) {
221 mid = (start_index + end_index) / 2;
222 if (pickers_[mid].first > key) {
223 end_index = mid;
224 } else if (pickers_[mid].first < key) {
225 start_index = mid + 1;
226 } else {
227 index = mid + 1;
228 break;
229 }
230 }
231 if (index == 0) index = start_index;
232 GPR_ASSERT(pickers_[index].first > key);
233 // Delegate to the child picker.
234 return pickers_[index].second->Pick(args);
235 }
236
237 //
238 // WeightedTargetLb
239 //
240
WeightedTargetLb(Args args)241 WeightedTargetLb::WeightedTargetLb(Args args)
242 : LoadBalancingPolicy(std::move(args)) {
243 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
244 gpr_log(GPR_INFO, "[weighted_target_lb %p] created", this);
245 }
246 }
247
~WeightedTargetLb()248 WeightedTargetLb::~WeightedTargetLb() {
249 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
250 gpr_log(GPR_INFO,
251 "[weighted_target_lb %p] destroying weighted_target LB policy",
252 this);
253 }
254 }
255
ShutdownLocked()256 void WeightedTargetLb::ShutdownLocked() {
257 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
258 gpr_log(GPR_INFO, "[weighted_target_lb %p] shutting down", this);
259 }
260 shutting_down_ = true;
261 targets_.clear();
262 }
263
ResetBackoffLocked()264 void WeightedTargetLb::ResetBackoffLocked() {
265 for (auto& p : targets_) p.second->ResetBackoffLocked();
266 }
267
UpdateLocked(UpdateArgs args)268 void WeightedTargetLb::UpdateLocked(UpdateArgs args) {
269 if (shutting_down_) return;
270 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
271 gpr_log(GPR_INFO, "[weighted_target_lb %p] Received update", this);
272 }
273 // Update config.
274 config_ = std::move(args.config);
275 // Deactivate the targets not in the new config.
276 for (const auto& p : targets_) {
277 const std::string& name = p.first;
278 WeightedChild* child = p.second.get();
279 if (config_->target_map().find(name) == config_->target_map().end()) {
280 child->DeactivateLocked();
281 }
282 }
283 // Create any children that don't already exist.
284 // Note that we add all children before updating any of them, because
285 // an update may trigger a child to immediately update its
286 // connectivity state (e.g., reporting TRANSIENT_FAILURE immediately when
287 // receiving an empty address list), and we don't want to return an
288 // overall state with incomplete data.
289 for (const auto& p : config_->target_map()) {
290 const std::string& name = p.first;
291 auto it = targets_.find(name);
292 if (it == targets_.end()) {
293 targets_.emplace(name, MakeOrphanable<WeightedChild>(
294 Ref(DEBUG_LOCATION, "WeightedChild"), name));
295 }
296 }
297 // Update all children.
298 HierarchicalAddressMap address_map =
299 MakeHierarchicalAddressMap(args.addresses);
300 for (const auto& p : config_->target_map()) {
301 const std::string& name = p.first;
302 const WeightedTargetLbConfig::ChildConfig& config = p.second;
303 targets_[name]->UpdateLocked(config, std::move(address_map[name]),
304 args.args);
305 }
306 UpdateStateLocked();
307 }
308
UpdateStateLocked()309 void WeightedTargetLb::UpdateStateLocked() {
310 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
311 gpr_log(GPR_INFO,
312 "[weighted_target_lb %p] scanning children to determine "
313 "connectivity state",
314 this);
315 }
316 // Construct a new picker which maintains a map of all child pickers
317 // that are ready. Each child is represented by a portion of the range
318 // proportional to its weight, such that the total range is the sum of the
319 // weights of all children.
320 WeightedPicker::PickerList picker_list;
321 uint32_t end = 0;
322 // Also count the number of children in each state, to determine the
323 // overall state.
324 size_t num_connecting = 0;
325 size_t num_idle = 0;
326 size_t num_transient_failures = 0;
327 for (const auto& p : targets_) {
328 const std::string& child_name = p.first;
329 const WeightedChild* child = p.second.get();
330 // Skip the targets that are not in the latest update.
331 if (config_->target_map().find(child_name) == config_->target_map().end()) {
332 continue;
333 }
334 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
335 gpr_log(GPR_INFO,
336 "[weighted_target_lb %p] child=%s state=%s weight=%d picker=%p",
337 this, child_name.c_str(),
338 ConnectivityStateName(child->connectivity_state()),
339 child->weight(), child->picker_wrapper().get());
340 }
341 switch (child->connectivity_state()) {
342 case GRPC_CHANNEL_READY: {
343 end += child->weight();
344 picker_list.push_back(std::make_pair(end, child->picker_wrapper()));
345 break;
346 }
347 case GRPC_CHANNEL_CONNECTING: {
348 ++num_connecting;
349 break;
350 }
351 case GRPC_CHANNEL_IDLE: {
352 ++num_idle;
353 break;
354 }
355 case GRPC_CHANNEL_TRANSIENT_FAILURE: {
356 ++num_transient_failures;
357 break;
358 }
359 default:
360 GPR_UNREACHABLE_CODE(return );
361 }
362 }
363 // Determine aggregated connectivity state.
364 grpc_connectivity_state connectivity_state;
365 if (!picker_list.empty()) {
366 connectivity_state = GRPC_CHANNEL_READY;
367 } else if (num_connecting > 0) {
368 connectivity_state = GRPC_CHANNEL_CONNECTING;
369 } else if (num_idle > 0) {
370 connectivity_state = GRPC_CHANNEL_IDLE;
371 } else {
372 connectivity_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
373 }
374 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
375 gpr_log(GPR_INFO, "[weighted_target_lb %p] connectivity changed to %s",
376 this, ConnectivityStateName(connectivity_state));
377 }
378 std::unique_ptr<SubchannelPicker> picker;
379 absl::Status status;
380 switch (connectivity_state) {
381 case GRPC_CHANNEL_READY:
382 picker = absl::make_unique<WeightedPicker>(std::move(picker_list));
383 break;
384 case GRPC_CHANNEL_CONNECTING:
385 case GRPC_CHANNEL_IDLE:
386 picker =
387 absl::make_unique<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker"));
388 break;
389 default:
390 grpc_error* error = grpc_error_set_int(
391 GRPC_ERROR_CREATE_FROM_STATIC_STRING(
392 "weighted_target: all children report state TRANSIENT_FAILURE"),
393 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
394 status = grpc_error_to_absl_status(error);
395 picker = absl::make_unique<TransientFailurePicker>(error);
396 }
397 channel_control_helper()->UpdateState(connectivity_state, status,
398 std::move(picker));
399 }
400
401 //
402 // WeightedTargetLb::WeightedChild
403 //
404
WeightedChild(RefCountedPtr<WeightedTargetLb> weighted_target_policy,const std::string & name)405 WeightedTargetLb::WeightedChild::WeightedChild(
406 RefCountedPtr<WeightedTargetLb> weighted_target_policy,
407 const std::string& name)
408 : weighted_target_policy_(std::move(weighted_target_policy)), name_(name) {
409 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
410 gpr_log(GPR_INFO, "[weighted_target_lb %p] created WeightedChild %p for %s",
411 weighted_target_policy_.get(), this, name_.c_str());
412 }
413 GRPC_CLOSURE_INIT(&on_delayed_removal_timer_, OnDelayedRemovalTimer, this,
414 grpc_schedule_on_exec_ctx);
415 }
416
~WeightedChild()417 WeightedTargetLb::WeightedChild::~WeightedChild() {
418 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
419 gpr_log(GPR_INFO,
420 "[weighted_target_lb %p] WeightedChild %p %s: destroying child",
421 weighted_target_policy_.get(), this, name_.c_str());
422 }
423 weighted_target_policy_.reset(DEBUG_LOCATION, "WeightedChild");
424 }
425
Orphan()426 void WeightedTargetLb::WeightedChild::Orphan() {
427 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
428 gpr_log(GPR_INFO,
429 "[weighted_target_lb %p] WeightedChild %p %s: shutting down child",
430 weighted_target_policy_.get(), this, name_.c_str());
431 }
432 // Remove the child policy's interested_parties pollset_set from the
433 // xDS policy.
434 grpc_pollset_set_del_pollset_set(
435 child_policy_->interested_parties(),
436 weighted_target_policy_->interested_parties());
437 child_policy_.reset();
438 // Drop our ref to the child's picker, in case it's holding a ref to
439 // the child.
440 picker_wrapper_.reset();
441 if (delayed_removal_timer_callback_pending_) {
442 delayed_removal_timer_callback_pending_ = false;
443 grpc_timer_cancel(&delayed_removal_timer_);
444 }
445 shutdown_ = true;
446 Unref();
447 }
448
449 OrphanablePtr<LoadBalancingPolicy>
CreateChildPolicyLocked(const grpc_channel_args * args)450 WeightedTargetLb::WeightedChild::CreateChildPolicyLocked(
451 const grpc_channel_args* args) {
452 LoadBalancingPolicy::Args lb_policy_args;
453 lb_policy_args.work_serializer = weighted_target_policy_->work_serializer();
454 lb_policy_args.args = args;
455 lb_policy_args.channel_control_helper =
456 absl::make_unique<Helper>(this->Ref(DEBUG_LOCATION, "Helper"));
457 OrphanablePtr<LoadBalancingPolicy> lb_policy =
458 MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
459 &grpc_lb_weighted_target_trace);
460 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
461 gpr_log(GPR_INFO,
462 "[weighted_target_lb %p] WeightedChild %p %s: Created new child "
463 "policy handler %p",
464 weighted_target_policy_.get(), this, name_.c_str(),
465 lb_policy.get());
466 }
467 // Add the xDS's interested_parties pollset_set to that of the newly created
468 // child policy. This will make the child policy progress upon activity on
469 // xDS LB, which in turn is tied to the application's call.
470 grpc_pollset_set_add_pollset_set(
471 lb_policy->interested_parties(),
472 weighted_target_policy_->interested_parties());
473 return lb_policy;
474 }
475
UpdateLocked(const WeightedTargetLbConfig::ChildConfig & config,ServerAddressList addresses,const grpc_channel_args * args)476 void WeightedTargetLb::WeightedChild::UpdateLocked(
477 const WeightedTargetLbConfig::ChildConfig& config,
478 ServerAddressList addresses, const grpc_channel_args* args) {
479 if (weighted_target_policy_->shutting_down_) return;
480 // Update child weight.
481 weight_ = config.weight;
482 // Reactivate if needed.
483 if (delayed_removal_timer_callback_pending_) {
484 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
485 gpr_log(GPR_INFO,
486 "[weighted_target_lb %p] WeightedChild %p %s: reactivating",
487 weighted_target_policy_.get(), this, name_.c_str());
488 }
489 delayed_removal_timer_callback_pending_ = false;
490 grpc_timer_cancel(&delayed_removal_timer_);
491 }
492 // Create child policy if needed.
493 if (child_policy_ == nullptr) {
494 child_policy_ = CreateChildPolicyLocked(args);
495 }
496 // Construct update args.
497 UpdateArgs update_args;
498 update_args.config = config.config;
499 update_args.addresses = std::move(addresses);
500 update_args.args = grpc_channel_args_copy(args);
501 // Update the policy.
502 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
503 gpr_log(GPR_INFO,
504 "[weighted_target_lb %p] WeightedChild %p %s: Updating child "
505 "policy handler %p",
506 weighted_target_policy_.get(), this, name_.c_str(),
507 child_policy_.get());
508 }
509 child_policy_->UpdateLocked(std::move(update_args));
510 }
511
ResetBackoffLocked()512 void WeightedTargetLb::WeightedChild::ResetBackoffLocked() {
513 child_policy_->ResetBackoffLocked();
514 }
515
OnConnectivityStateUpdateLocked(grpc_connectivity_state state,const absl::Status & status,std::unique_ptr<SubchannelPicker> picker)516 void WeightedTargetLb::WeightedChild::OnConnectivityStateUpdateLocked(
517 grpc_connectivity_state state, const absl::Status& status,
518 std::unique_ptr<SubchannelPicker> picker) {
519 // Cache the picker in the WeightedChild.
520 picker_wrapper_ = MakeRefCounted<ChildPickerWrapper>(std::move(picker));
521 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
522 gpr_log(GPR_INFO,
523 "[weighted_target_lb %p] WeightedChild %p %s: connectivity "
524 "state update: state=%s (%s) picker_wrapper=%p",
525 weighted_target_policy_.get(), this, name_.c_str(),
526 ConnectivityStateName(state), status.ToString().c_str(),
527 picker_wrapper_.get());
528 }
529 // If the child reports IDLE, immediately tell it to exit idle.
530 if (state == GRPC_CHANNEL_IDLE) child_policy_->ExitIdleLocked();
531 // Decide what state to report for aggregation purposes.
532 // If we haven't seen a failure since the last time we were in state
533 // READY, then we report the state change as-is. However, once we do see
534 // a failure, we report TRANSIENT_FAILURE and ignore any subsequent state
535 // changes until we go back into state READY.
536 if (!seen_failure_since_ready_) {
537 if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
538 seen_failure_since_ready_ = true;
539 }
540 } else {
541 if (state != GRPC_CHANNEL_READY) return;
542 seen_failure_since_ready_ = false;
543 }
544 connectivity_state_ = state;
545 // Notify the LB policy.
546 weighted_target_policy_->UpdateStateLocked();
547 }
548
DeactivateLocked()549 void WeightedTargetLb::WeightedChild::DeactivateLocked() {
550 // If already deactivated, don't do that again.
551 if (weight_ == 0) return;
552 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_weighted_target_trace)) {
553 gpr_log(GPR_INFO,
554 "[weighted_target_lb %p] WeightedChild %p %s: deactivating",
555 weighted_target_policy_.get(), this, name_.c_str());
556 }
557 // Set the child weight to 0 so that future picker won't contain this child.
558 weight_ = 0;
559 // Start a timer to delete the child.
560 Ref(DEBUG_LOCATION, "WeightedChild+timer").release();
561 delayed_removal_timer_callback_pending_ = true;
562 grpc_timer_init(&delayed_removal_timer_,
563 ExecCtx::Get()->Now() + kChildRetentionIntervalMs,
564 &on_delayed_removal_timer_);
565 }
566
OnDelayedRemovalTimer(void * arg,grpc_error * error)567 void WeightedTargetLb::WeightedChild::OnDelayedRemovalTimer(void* arg,
568 grpc_error* error) {
569 WeightedChild* self = static_cast<WeightedChild*>(arg);
570 GRPC_ERROR_REF(error); // ref owned by lambda
571 self->weighted_target_policy_->work_serializer()->Run(
572 [self, error]() { self->OnDelayedRemovalTimerLocked(error); },
573 DEBUG_LOCATION);
574 }
575
OnDelayedRemovalTimerLocked(grpc_error * error)576 void WeightedTargetLb::WeightedChild::OnDelayedRemovalTimerLocked(
577 grpc_error* error) {
578 if (error == GRPC_ERROR_NONE && delayed_removal_timer_callback_pending_ &&
579 !shutdown_ && weight_ == 0) {
580 delayed_removal_timer_callback_pending_ = false;
581 weighted_target_policy_->targets_.erase(name_);
582 }
583 Unref(DEBUG_LOCATION, "WeightedChild+timer");
584 GRPC_ERROR_UNREF(error);
585 }
586
587 //
588 // WeightedTargetLb::WeightedChild::Helper
589 //
590
591 RefCountedPtr<SubchannelInterface>
CreateSubchannel(ServerAddress address,const grpc_channel_args & args)592 WeightedTargetLb::WeightedChild::Helper::CreateSubchannel(
593 ServerAddress address, const grpc_channel_args& args) {
594 if (weighted_child_->weighted_target_policy_->shutting_down_) return nullptr;
595 return weighted_child_->weighted_target_policy_->channel_control_helper()
596 ->CreateSubchannel(std::move(address), args);
597 }
598
UpdateState(grpc_connectivity_state state,const absl::Status & status,std::unique_ptr<SubchannelPicker> picker)599 void WeightedTargetLb::WeightedChild::Helper::UpdateState(
600 grpc_connectivity_state state, const absl::Status& status,
601 std::unique_ptr<SubchannelPicker> picker) {
602 if (weighted_child_->weighted_target_policy_->shutting_down_) return;
603 weighted_child_->OnConnectivityStateUpdateLocked(state, status,
604 std::move(picker));
605 }
606
RequestReresolution()607 void WeightedTargetLb::WeightedChild::Helper::RequestReresolution() {
608 if (weighted_child_->weighted_target_policy_->shutting_down_) return;
609 weighted_child_->weighted_target_policy_->channel_control_helper()
610 ->RequestReresolution();
611 }
612
AddTraceEvent(TraceSeverity severity,absl::string_view message)613 void WeightedTargetLb::WeightedChild::Helper::AddTraceEvent(
614 TraceSeverity severity, absl::string_view message) {
615 if (weighted_child_->weighted_target_policy_->shutting_down_) return;
616 weighted_child_->weighted_target_policy_->channel_control_helper()
617 ->AddTraceEvent(severity, message);
618 }
619
620 //
621 // factory
622 //
623
624 class WeightedTargetLbFactory : public LoadBalancingPolicyFactory {
625 public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const626 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
627 LoadBalancingPolicy::Args args) const override {
628 return MakeOrphanable<WeightedTargetLb>(std::move(args));
629 }
630
name() const631 const char* name() const override { return kWeightedTarget; }
632
ParseLoadBalancingConfig(const Json & json,grpc_error ** error) const633 RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
634 const Json& json, grpc_error** error) const override {
635 GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
636 if (json.type() == Json::Type::JSON_NULL) {
637 // weighted_target was mentioned as a policy in the deprecated
638 // loadBalancingPolicy field or in the client API.
639 *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
640 "field:loadBalancingPolicy error:weighted_target policy requires "
641 "configuration. Please use loadBalancingConfig field of service "
642 "config instead.");
643 return nullptr;
644 }
645 std::vector<grpc_error*> error_list;
646 // Weight map.
647 WeightedTargetLbConfig::TargetMap target_map;
648 auto it = json.object_value().find("targets");
649 if (it == json.object_value().end()) {
650 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
651 "field:targets error:required field not present"));
652 } else if (it->second.type() != Json::Type::OBJECT) {
653 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
654 "field:targets error:type should be object"));
655 } else {
656 for (const auto& p : it->second.object_value()) {
657 WeightedTargetLbConfig::ChildConfig child_config;
658 std::vector<grpc_error*> child_errors =
659 ParseChildConfig(p.second, &child_config);
660 if (!child_errors.empty()) {
661 // Can't use GRPC_ERROR_CREATE_FROM_VECTOR() here, because the error
662 // string is not static in this case.
663 grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(
664 absl::StrCat("field:targets key:", p.first).c_str());
665 for (grpc_error* child_error : child_errors) {
666 error = grpc_error_add_child(error, child_error);
667 }
668 error_list.push_back(error);
669 } else {
670 target_map[p.first] = std::move(child_config);
671 }
672 }
673 }
674 if (!error_list.empty()) {
675 *error = GRPC_ERROR_CREATE_FROM_VECTOR(
676 "weighted_target_experimental LB policy config", &error_list);
677 return nullptr;
678 }
679 return MakeRefCounted<WeightedTargetLbConfig>(std::move(target_map));
680 }
681
682 private:
ParseChildConfig(const Json & json,WeightedTargetLbConfig::ChildConfig * child_config)683 static std::vector<grpc_error*> ParseChildConfig(
684 const Json& json, WeightedTargetLbConfig::ChildConfig* child_config) {
685 std::vector<grpc_error*> error_list;
686 if (json.type() != Json::Type::OBJECT) {
687 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
688 "value should be of type object"));
689 return error_list;
690 }
691 // Weight.
692 auto it = json.object_value().find("weight");
693 if (it == json.object_value().end()) {
694 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
695 "required field \"weight\" not specified"));
696 } else if (it->second.type() != Json::Type::NUMBER) {
697 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
698 "field:weight error:must be of type number"));
699 } else {
700 int weight = gpr_parse_nonnegative_int(it->second.string_value().c_str());
701 if (weight == -1) {
702 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
703 "field:weight error:unparseable value"));
704 } else if (weight == 0) {
705 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
706 "field:weight error:value must be greater than zero"));
707 } else {
708 child_config->weight = weight;
709 }
710 }
711 // Child policy.
712 it = json.object_value().find("childPolicy");
713 if (it != json.object_value().end()) {
714 grpc_error* parse_error = GRPC_ERROR_NONE;
715 child_config->config =
716 LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(it->second,
717 &parse_error);
718 if (child_config->config == nullptr) {
719 GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE);
720 std::vector<grpc_error*> child_errors;
721 child_errors.push_back(parse_error);
722 error_list.push_back(
723 GRPC_ERROR_CREATE_FROM_VECTOR("field:childPolicy", &child_errors));
724 }
725 }
726 return error_list;
727 }
728 };
729
730 } // namespace
731
732 } // namespace grpc_core
733
734 //
735 // Plugin registration
736 //
737
grpc_lb_policy_weighted_target_init()738 void grpc_lb_policy_weighted_target_init() {
739 grpc_core::LoadBalancingPolicyRegistry::Builder::
740 RegisterLoadBalancingPolicyFactory(
741 absl::make_unique<grpc_core::WeightedTargetLbFactory>());
742 }
743
grpc_lb_policy_weighted_target_shutdown()744 void grpc_lb_policy_weighted_target_shutdown() {}
745