1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/support/port_platform.h>
18
19 #include <set>
20 #include <string>
21 #include <vector>
22
23 #include "absl/status/status.h"
24 #include "absl/strings/str_cat.h"
25 #include "absl/strings/string_view.h"
26
27 #include <grpc/grpc.h>
28
29 #include "src/core/ext/filters/client_channel/lb_policy.h"
30 #include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
31 #include "src/core/ext/filters/client_channel/lb_policy_factory.h"
32 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
33 #include "src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h"
34 #include "src/core/lib/channel/channel_args.h"
35 #include "src/core/lib/gpr/string.h"
36 #include "src/core/lib/gprpp/orphanable.h"
37 #include "src/core/lib/gprpp/ref_counted_ptr.h"
38 #include "src/core/lib/iomgr/timer.h"
39 #include "src/core/lib/iomgr/work_serializer.h"
40 #include "src/core/lib/transport/error_utils.h"
41
// Delay, in milliseconds (15 minutes), between a child being deactivated and
// its delayed-removal timer firing.
#define GRPC_XDS_CLUSTER_MANAGER_CHILD_RETENTION_INTERVAL_MS (15 * 60 * 1000)
43
44 namespace grpc_core {
45
46 TraceFlag grpc_xds_cluster_manager_lb_trace(false, "xds_cluster_manager_lb");
47
48 namespace {
49
// Policy name returned by name() on the config, the policy and the factory.
constexpr char kXdsClusterManager[] = "xds_cluster_manager_experimental";
51
52 // Config for xds_cluster_manager LB policy.
53 class XdsClusterManagerLbConfig : public LoadBalancingPolicy::Config {
54 public:
55 using ClusterMap =
56 std::map<std::string, RefCountedPtr<LoadBalancingPolicy::Config>>;
57
XdsClusterManagerLbConfig(ClusterMap cluster_map)58 explicit XdsClusterManagerLbConfig(ClusterMap cluster_map)
59 : cluster_map_(std::move(cluster_map)) {}
60
name() const61 const char* name() const override { return kXdsClusterManager; }
62
cluster_map() const63 const ClusterMap& cluster_map() const { return cluster_map_; }
64
65 private:
66 ClusterMap cluster_map_;
67 };
68
69 // xds_cluster_manager LB policy.
70 class XdsClusterManagerLb : public LoadBalancingPolicy {
71 public:
72 explicit XdsClusterManagerLb(Args args);
73
name() const74 const char* name() const override { return kXdsClusterManager; }
75
76 void UpdateLocked(UpdateArgs args) override;
77 void ExitIdleLocked() override;
78 void ResetBackoffLocked() override;
79
80 private:
81 // A simple wrapper for ref-counting a picker from the child policy.
82 class ChildPickerWrapper : public RefCounted<ChildPickerWrapper> {
83 public:
ChildPickerWrapper(std::string name,std::unique_ptr<SubchannelPicker> picker)84 ChildPickerWrapper(std::string name,
85 std::unique_ptr<SubchannelPicker> picker)
86 : name_(std::move(name)), picker_(std::move(picker)) {}
Pick(PickArgs args)87 PickResult Pick(PickArgs args) { return picker_->Pick(args); }
88
name() const89 const std::string& name() const { return name_; }
90
91 private:
92 std::string name_;
93 std::unique_ptr<SubchannelPicker> picker_;
94 };
95
96 // Picks a child using prefix or path matching and then delegates to that
97 // child's picker.
98 class ClusterPicker : public SubchannelPicker {
99 public:
100 // Maintains a map of cluster names to pickers.
101 using ClusterMap = std::map<absl::string_view /*cluster_name*/,
102 RefCountedPtr<ChildPickerWrapper>>;
103
104 // It is required that the keys of cluster_map have to live at least as long
105 // as the ClusterPicker instance.
ClusterPicker(ClusterMap cluster_map)106 explicit ClusterPicker(ClusterMap cluster_map)
107 : cluster_map_(std::move(cluster_map)) {}
108
109 PickResult Pick(PickArgs args) override;
110
111 private:
112 ClusterMap cluster_map_;
113 };
114
115 // Each ClusterChild holds a ref to its parent XdsClusterManagerLb.
116 class ClusterChild : public InternallyRefCounted<ClusterChild> {
117 public:
118 ClusterChild(RefCountedPtr<XdsClusterManagerLb> xds_cluster_manager_policy,
119 const std::string& name);
120 ~ClusterChild() override;
121
122 void Orphan() override;
123
124 void UpdateLocked(RefCountedPtr<LoadBalancingPolicy::Config> config,
125 const ServerAddressList& addresses,
126 const grpc_channel_args* args);
127 void ExitIdleLocked();
128 void ResetBackoffLocked();
129 void DeactivateLocked();
130
connectivity_state() const131 grpc_connectivity_state connectivity_state() const {
132 return connectivity_state_;
133 }
picker_wrapper() const134 RefCountedPtr<ChildPickerWrapper> picker_wrapper() const {
135 return picker_wrapper_;
136 }
137
138 private:
139 class Helper : public ChannelControlHelper {
140 public:
Helper(RefCountedPtr<ClusterChild> xds_cluster_manager_child)141 explicit Helper(RefCountedPtr<ClusterChild> xds_cluster_manager_child)
142 : xds_cluster_manager_child_(std::move(xds_cluster_manager_child)) {}
143
~Helper()144 ~Helper() override {
145 xds_cluster_manager_child_.reset(DEBUG_LOCATION, "Helper");
146 }
147
148 RefCountedPtr<SubchannelInterface> CreateSubchannel(
149 ServerAddress address, const grpc_channel_args& args) override;
150 void UpdateState(grpc_connectivity_state state,
151 const absl::Status& status,
152 std::unique_ptr<SubchannelPicker> picker) override;
153 void RequestReresolution() override;
154 void AddTraceEvent(TraceSeverity severity,
155 absl::string_view message) override;
156
157 private:
158 RefCountedPtr<ClusterChild> xds_cluster_manager_child_;
159 };
160
161 // Methods for dealing with the child policy.
162 OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
163 const grpc_channel_args* args);
164
165 static void OnDelayedRemovalTimer(void* arg, grpc_error* error);
166 void OnDelayedRemovalTimerLocked(grpc_error* error);
167
168 // The owning LB policy.
169 RefCountedPtr<XdsClusterManagerLb> xds_cluster_manager_policy_;
170
171 // Points to the corresponding key in children map.
172 const std::string name_;
173
174 OrphanablePtr<LoadBalancingPolicy> child_policy_;
175
176 RefCountedPtr<ChildPickerWrapper> picker_wrapper_;
177 grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_IDLE;
178 bool seen_failure_since_ready_ = false;
179
180 // States for delayed removal.
181 grpc_timer delayed_removal_timer_;
182 grpc_closure on_delayed_removal_timer_;
183 bool delayed_removal_timer_callback_pending_ = false;
184 bool shutdown_ = false;
185 };
186
187 ~XdsClusterManagerLb() override;
188
189 void ShutdownLocked() override;
190
191 void UpdateStateLocked();
192
193 // Current config from the resolver.
194 RefCountedPtr<XdsClusterManagerLbConfig> config_;
195
196 // Internal state.
197 bool shutting_down_ = false;
198
199 // Children.
200 std::map<std::string, OrphanablePtr<ClusterChild>> children_;
201 };
202
203 //
204 // XdsClusterManagerLb::ClusterPicker
205 //
206
Pick(PickArgs args)207 XdsClusterManagerLb::PickResult XdsClusterManagerLb::ClusterPicker::Pick(
208 PickArgs args) {
209 auto cluster_name =
210 args.call_state->ExperimentalGetCallAttribute(kXdsClusterAttribute);
211 auto it = cluster_map_.find(cluster_name);
212 if (it != cluster_map_.end()) {
213 return it->second->Pick(args);
214 }
215 PickResult result;
216 result.type = PickResult::PICK_FAILED;
217 result.error = grpc_error_set_int(
218 GRPC_ERROR_CREATE_FROM_COPIED_STRING(
219 absl::StrCat("xds cluster manager picker: unknown cluster \"",
220 cluster_name, "\"")
221 .c_str()),
222 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL);
223 return result;
224 }
225
226 //
227 // XdsClusterManagerLb
228 //
229
XdsClusterManagerLb(Args args)230 XdsClusterManagerLb::XdsClusterManagerLb(Args args)
231 : LoadBalancingPolicy(std::move(args)) {}
232
~XdsClusterManagerLb()233 XdsClusterManagerLb::~XdsClusterManagerLb() {
234 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
235 gpr_log(
236 GPR_INFO,
237 "[xds_cluster_manager_lb %p] destroying xds_cluster_manager LB policy",
238 this);
239 }
240 }
241
ShutdownLocked()242 void XdsClusterManagerLb::ShutdownLocked() {
243 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
244 gpr_log(GPR_INFO, "[xds_cluster_manager_lb %p] shutting down", this);
245 }
246 shutting_down_ = true;
247 children_.clear();
248 }
249
ExitIdleLocked()250 void XdsClusterManagerLb::ExitIdleLocked() {
251 for (auto& p : children_) p.second->ExitIdleLocked();
252 }
253
ResetBackoffLocked()254 void XdsClusterManagerLb::ResetBackoffLocked() {
255 for (auto& p : children_) p.second->ResetBackoffLocked();
256 }
257
UpdateLocked(UpdateArgs args)258 void XdsClusterManagerLb::UpdateLocked(UpdateArgs args) {
259 if (shutting_down_) return;
260 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
261 gpr_log(GPR_INFO, "[xds_cluster_manager_lb %p] Received update", this);
262 }
263 // Update config.
264 config_ = std::move(args.config);
265 // Deactivate the children not in the new config.
266 for (const auto& p : children_) {
267 const std::string& name = p.first;
268 ClusterChild* child = p.second.get();
269 if (config_->cluster_map().find(name) == config_->cluster_map().end()) {
270 child->DeactivateLocked();
271 }
272 }
273 // Add or update the children in the new config.
274 for (const auto& p : config_->cluster_map()) {
275 const std::string& name = p.first;
276 const RefCountedPtr<LoadBalancingPolicy::Config>& config = p.second;
277 auto it = children_.find(name);
278 if (it == children_.end()) {
279 it = children_
280 .emplace(name, MakeOrphanable<ClusterChild>(
281 Ref(DEBUG_LOCATION, "ClusterChild"), name))
282 .first;
283 }
284 it->second->UpdateLocked(config, args.addresses, args.args);
285 }
286 UpdateStateLocked();
287 }
288
UpdateStateLocked()289 void XdsClusterManagerLb::UpdateStateLocked() {
290 // Also count the number of children in each state, to determine the
291 // overall state.
292 size_t num_ready = 0;
293 size_t num_connecting = 0;
294 size_t num_idle = 0;
295 size_t num_transient_failures = 0;
296 for (const auto& p : children_) {
297 const auto& child_name = p.first;
298 const ClusterChild* child = p.second.get();
299 // Skip the children that are not in the latest update.
300 if (config_->cluster_map().find(child_name) ==
301 config_->cluster_map().end()) {
302 continue;
303 }
304 switch (child->connectivity_state()) {
305 case GRPC_CHANNEL_READY: {
306 ++num_ready;
307 break;
308 }
309 case GRPC_CHANNEL_CONNECTING: {
310 ++num_connecting;
311 break;
312 }
313 case GRPC_CHANNEL_IDLE: {
314 ++num_idle;
315 break;
316 }
317 case GRPC_CHANNEL_TRANSIENT_FAILURE: {
318 ++num_transient_failures;
319 break;
320 }
321 default:
322 GPR_UNREACHABLE_CODE(return );
323 }
324 }
325 // Determine aggregated connectivity state.
326 grpc_connectivity_state connectivity_state;
327 if (num_ready > 0) {
328 connectivity_state = GRPC_CHANNEL_READY;
329 } else if (num_connecting > 0) {
330 connectivity_state = GRPC_CHANNEL_CONNECTING;
331 } else if (num_idle > 0) {
332 connectivity_state = GRPC_CHANNEL_IDLE;
333 } else {
334 connectivity_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
335 }
336 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
337 gpr_log(GPR_INFO, "[xds_cluster_manager_lb %p] connectivity changed to %s",
338 this, ConnectivityStateName(connectivity_state));
339 }
340 std::unique_ptr<SubchannelPicker> picker;
341 absl::Status status;
342 switch (connectivity_state) {
343 case GRPC_CHANNEL_READY: {
344 ClusterPicker::ClusterMap cluster_map;
345 for (const auto& p : config_->cluster_map()) {
346 const std::string& cluster_name = p.first;
347 RefCountedPtr<ChildPickerWrapper>& child_picker =
348 cluster_map[cluster_name];
349 child_picker = children_[cluster_name]->picker_wrapper();
350 if (child_picker == nullptr) {
351 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
352 gpr_log(
353 GPR_INFO,
354 "[xds_cluster_manager_lb %p] child %s has not yet returned a "
355 "picker; creating a QueuePicker.",
356 this, cluster_name.c_str());
357 }
358 child_picker = MakeRefCounted<ChildPickerWrapper>(
359 cluster_name, absl::make_unique<QueuePicker>(
360 Ref(DEBUG_LOCATION, "QueuePicker")));
361 }
362 }
363 picker = absl::make_unique<ClusterPicker>(std::move(cluster_map));
364 break;
365 }
366 case GRPC_CHANNEL_CONNECTING:
367 case GRPC_CHANNEL_IDLE:
368 picker =
369 absl::make_unique<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker"));
370 break;
371 default:
372 grpc_error* error = grpc_error_set_int(
373 GRPC_ERROR_CREATE_FROM_STATIC_STRING(
374 "TRANSIENT_FAILURE from XdsClusterManagerLb"),
375 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
376 status = grpc_error_to_absl_status(error);
377 picker = absl::make_unique<TransientFailurePicker>(error);
378 }
379 channel_control_helper()->UpdateState(connectivity_state, status,
380 std::move(picker));
381 }
382
383 //
384 // XdsClusterManagerLb::ClusterChild
385 //
386
ClusterChild(RefCountedPtr<XdsClusterManagerLb> xds_cluster_manager_policy,const std::string & name)387 XdsClusterManagerLb::ClusterChild::ClusterChild(
388 RefCountedPtr<XdsClusterManagerLb> xds_cluster_manager_policy,
389 const std::string& name)
390 : xds_cluster_manager_policy_(std::move(xds_cluster_manager_policy)),
391 name_(name) {
392 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
393 gpr_log(GPR_INFO,
394 "[xds_cluster_manager_lb %p] created ClusterChild %p for %s",
395 xds_cluster_manager_policy_.get(), this, name_.c_str());
396 }
397 GRPC_CLOSURE_INIT(&on_delayed_removal_timer_, OnDelayedRemovalTimer, this,
398 grpc_schedule_on_exec_ctx);
399 }
400
~ClusterChild()401 XdsClusterManagerLb::ClusterChild::~ClusterChild() {
402 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
403 gpr_log(GPR_INFO,
404 "[xds_cluster_manager_lb %p] ClusterChild %p: destroying "
405 "child",
406 xds_cluster_manager_policy_.get(), this);
407 }
408 xds_cluster_manager_policy_.reset(DEBUG_LOCATION, "ClusterChild");
409 }
410
Orphan()411 void XdsClusterManagerLb::ClusterChild::Orphan() {
412 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
413 gpr_log(GPR_INFO,
414 "[xds_cluster_manager_lb %p] ClusterChild %p %s: "
415 "shutting down child",
416 xds_cluster_manager_policy_.get(), this, name_.c_str());
417 }
418 // Remove the child policy's interested_parties pollset_set from the
419 // xDS policy.
420 grpc_pollset_set_del_pollset_set(
421 child_policy_->interested_parties(),
422 xds_cluster_manager_policy_->interested_parties());
423 child_policy_.reset();
424 // Drop our ref to the child's picker, in case it's holding a ref to
425 // the child.
426 picker_wrapper_.reset();
427 if (delayed_removal_timer_callback_pending_) {
428 grpc_timer_cancel(&delayed_removal_timer_);
429 }
430 shutdown_ = true;
431 Unref();
432 }
433
434 OrphanablePtr<LoadBalancingPolicy>
CreateChildPolicyLocked(const grpc_channel_args * args)435 XdsClusterManagerLb::ClusterChild::CreateChildPolicyLocked(
436 const grpc_channel_args* args) {
437 LoadBalancingPolicy::Args lb_policy_args;
438 lb_policy_args.work_serializer =
439 xds_cluster_manager_policy_->work_serializer();
440 lb_policy_args.args = args;
441 lb_policy_args.channel_control_helper =
442 absl::make_unique<Helper>(this->Ref(DEBUG_LOCATION, "Helper"));
443 OrphanablePtr<LoadBalancingPolicy> lb_policy =
444 MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
445 &grpc_xds_cluster_manager_lb_trace);
446 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
447 gpr_log(GPR_INFO,
448 "[xds_cluster_manager_lb %p] ClusterChild %p %s: Created "
449 "new child "
450 "policy handler %p",
451 xds_cluster_manager_policy_.get(), this, name_.c_str(),
452 lb_policy.get());
453 }
454 // Add the xDS's interested_parties pollset_set to that of the newly created
455 // child policy. This will make the child policy progress upon activity on
456 // xDS LB, which in turn is tied to the application's call.
457 grpc_pollset_set_add_pollset_set(
458 lb_policy->interested_parties(),
459 xds_cluster_manager_policy_->interested_parties());
460 return lb_policy;
461 }
462
UpdateLocked(RefCountedPtr<LoadBalancingPolicy::Config> config,const ServerAddressList & addresses,const grpc_channel_args * args)463 void XdsClusterManagerLb::ClusterChild::UpdateLocked(
464 RefCountedPtr<LoadBalancingPolicy::Config> config,
465 const ServerAddressList& addresses, const grpc_channel_args* args) {
466 if (xds_cluster_manager_policy_->shutting_down_) return;
467 // Update child weight.
468 // Reactivate if needed.
469 if (delayed_removal_timer_callback_pending_) {
470 delayed_removal_timer_callback_pending_ = false;
471 grpc_timer_cancel(&delayed_removal_timer_);
472 }
473 // Create child policy if needed.
474 if (child_policy_ == nullptr) {
475 child_policy_ = CreateChildPolicyLocked(args);
476 }
477 // Construct update args.
478 UpdateArgs update_args;
479 update_args.config = std::move(config);
480 update_args.addresses = addresses;
481 update_args.args = grpc_channel_args_copy(args);
482 // Update the policy.
483 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
484 gpr_log(GPR_INFO,
485 "[xds_cluster_manager_lb %p] ClusterChild %p %s: "
486 "Updating child "
487 "policy handler %p",
488 xds_cluster_manager_policy_.get(), this, name_.c_str(),
489 child_policy_.get());
490 }
491 child_policy_->UpdateLocked(std::move(update_args));
492 }
493
ExitIdleLocked()494 void XdsClusterManagerLb::ClusterChild::ExitIdleLocked() {
495 child_policy_->ExitIdleLocked();
496 }
497
ResetBackoffLocked()498 void XdsClusterManagerLb::ClusterChild::ResetBackoffLocked() {
499 child_policy_->ResetBackoffLocked();
500 }
501
DeactivateLocked()502 void XdsClusterManagerLb::ClusterChild::DeactivateLocked() {
503 // If already deactivated, don't do that again.
504 if (delayed_removal_timer_callback_pending_ == true) return;
505 // Set the child weight to 0 so that future picker won't contain this child.
506 // Start a timer to delete the child.
507 Ref(DEBUG_LOCATION, "ClusterChild+timer").release();
508 grpc_timer_init(&delayed_removal_timer_,
509 ExecCtx::Get()->Now() +
510 GRPC_XDS_CLUSTER_MANAGER_CHILD_RETENTION_INTERVAL_MS,
511 &on_delayed_removal_timer_);
512 delayed_removal_timer_callback_pending_ = true;
513 }
514
OnDelayedRemovalTimer(void * arg,grpc_error * error)515 void XdsClusterManagerLb::ClusterChild::OnDelayedRemovalTimer(
516 void* arg, grpc_error* error) {
517 ClusterChild* self = static_cast<ClusterChild*>(arg);
518 GRPC_ERROR_REF(error); // Ref owned by the lambda
519 self->xds_cluster_manager_policy_->work_serializer()->Run(
520 [self, error]() { self->OnDelayedRemovalTimerLocked(error); },
521 DEBUG_LOCATION);
522 }
523
OnDelayedRemovalTimerLocked(grpc_error * error)524 void XdsClusterManagerLb::ClusterChild::OnDelayedRemovalTimerLocked(
525 grpc_error* error) {
526 delayed_removal_timer_callback_pending_ = false;
527 if (error == GRPC_ERROR_NONE && !shutdown_) {
528 xds_cluster_manager_policy_->children_.erase(name_);
529 }
530 Unref(DEBUG_LOCATION, "ClusterChild+timer");
531 GRPC_ERROR_UNREF(error);
532 }
533
534 //
535 // XdsClusterManagerLb::ClusterChild::Helper
536 //
537
538 RefCountedPtr<SubchannelInterface>
CreateSubchannel(ServerAddress address,const grpc_channel_args & args)539 XdsClusterManagerLb::ClusterChild::Helper::CreateSubchannel(
540 ServerAddress address, const grpc_channel_args& args) {
541 if (xds_cluster_manager_child_->xds_cluster_manager_policy_->shutting_down_) {
542 return nullptr;
543 }
544 return xds_cluster_manager_child_->xds_cluster_manager_policy_
545 ->channel_control_helper()
546 ->CreateSubchannel(std::move(address), args);
547 }
548
UpdateState(grpc_connectivity_state state,const absl::Status & status,std::unique_ptr<SubchannelPicker> picker)549 void XdsClusterManagerLb::ClusterChild::Helper::UpdateState(
550 grpc_connectivity_state state, const absl::Status& status,
551 std::unique_ptr<SubchannelPicker> picker) {
552 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) {
553 gpr_log(
554 GPR_INFO,
555 "[xds_cluster_manager_lb %p] child %s: received update: state=%s (%s) "
556 "picker=%p",
557 xds_cluster_manager_child_->xds_cluster_manager_policy_.get(),
558 xds_cluster_manager_child_->name_.c_str(), ConnectivityStateName(state),
559 status.ToString().c_str(), picker.get());
560 }
561 if (xds_cluster_manager_child_->xds_cluster_manager_policy_->shutting_down_) {
562 return;
563 }
564 // Cache the picker in the ClusterChild.
565 xds_cluster_manager_child_->picker_wrapper_ =
566 MakeRefCounted<ChildPickerWrapper>(xds_cluster_manager_child_->name_,
567 std::move(picker));
568 // Decide what state to report for aggregation purposes.
569 // If we haven't seen a failure since the last time we were in state
570 // READY, then we report the state change as-is. However, once we do see
571 // a failure, we report TRANSIENT_FAILURE and ignore any subsequent state
572 // changes until we go back into state READY.
573 if (!xds_cluster_manager_child_->seen_failure_since_ready_) {
574 if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
575 xds_cluster_manager_child_->seen_failure_since_ready_ = true;
576 }
577 } else {
578 if (state != GRPC_CHANNEL_READY) return;
579 xds_cluster_manager_child_->seen_failure_since_ready_ = false;
580 }
581 xds_cluster_manager_child_->connectivity_state_ = state;
582 // Notify the LB policy.
583 xds_cluster_manager_child_->xds_cluster_manager_policy_->UpdateStateLocked();
584 }
585
RequestReresolution()586 void XdsClusterManagerLb::ClusterChild::Helper::RequestReresolution() {
587 if (xds_cluster_manager_child_->xds_cluster_manager_policy_->shutting_down_) {
588 return;
589 }
590 xds_cluster_manager_child_->xds_cluster_manager_policy_
591 ->channel_control_helper()
592 ->RequestReresolution();
593 }
594
AddTraceEvent(TraceSeverity severity,absl::string_view message)595 void XdsClusterManagerLb::ClusterChild::Helper::AddTraceEvent(
596 TraceSeverity severity, absl::string_view message) {
597 if (xds_cluster_manager_child_->xds_cluster_manager_policy_->shutting_down_) {
598 return;
599 }
600 xds_cluster_manager_child_->xds_cluster_manager_policy_
601 ->channel_control_helper()
602 ->AddTraceEvent(severity, message);
603 }
604
605 //
606 // factory
607 //
608
609 class XdsClusterManagerLbFactory : public LoadBalancingPolicyFactory {
610 public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const611 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
612 LoadBalancingPolicy::Args args) const override {
613 return MakeOrphanable<XdsClusterManagerLb>(std::move(args));
614 }
615
name() const616 const char* name() const override { return kXdsClusterManager; }
617
ParseLoadBalancingConfig(const Json & json,grpc_error ** error) const618 RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
619 const Json& json, grpc_error** error) const override {
620 GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
621 if (json.type() == Json::Type::JSON_NULL) {
622 // xds_cluster_manager was mentioned as a policy in the deprecated
623 // loadBalancingPolicy field or in the client API.
624 *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
625 "field:loadBalancingPolicy error:xds_cluster_manager policy requires "
626 "configuration. Please use loadBalancingConfig field of service "
627 "config instead.");
628 return nullptr;
629 }
630 std::vector<grpc_error*> error_list;
631 XdsClusterManagerLbConfig::ClusterMap cluster_map;
632 std::set<std::string /*cluster_name*/> clusters_to_be_used;
633 auto it = json.object_value().find("children");
634 if (it == json.object_value().end()) {
635 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
636 "field:children error:required field not present"));
637 } else if (it->second.type() != Json::Type::OBJECT) {
638 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
639 "field:children error:type should be object"));
640 } else {
641 for (const auto& p : it->second.object_value()) {
642 const std::string& child_name = p.first;
643 if (child_name.empty()) {
644 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
645 "field:children element error: name cannot be empty"));
646 continue;
647 }
648 RefCountedPtr<LoadBalancingPolicy::Config> child_config;
649 std::vector<grpc_error*> child_errors =
650 ParseChildConfig(p.second, &child_config);
651 if (!child_errors.empty()) {
652 // Can't use GRPC_ERROR_CREATE_FROM_VECTOR() here, because the error
653 // string is not static in this case.
654 grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(
655 absl::StrCat("field:children name:", child_name).c_str());
656 for (grpc_error* child_error : child_errors) {
657 error = grpc_error_add_child(error, child_error);
658 }
659 error_list.push_back(error);
660 } else {
661 cluster_map[child_name] = std::move(child_config);
662 clusters_to_be_used.insert(child_name);
663 }
664 }
665 }
666 if (cluster_map.empty()) {
667 error_list.push_back(
668 GRPC_ERROR_CREATE_FROM_STATIC_STRING("no valid children configured"));
669 }
670 if (!error_list.empty()) {
671 *error = GRPC_ERROR_CREATE_FROM_VECTOR(
672 "xds_cluster_manager_experimental LB policy config", &error_list);
673 return nullptr;
674 }
675 return MakeRefCounted<XdsClusterManagerLbConfig>(std::move(cluster_map));
676 }
677
678 private:
ParseChildConfig(const Json & json,RefCountedPtr<LoadBalancingPolicy::Config> * child_config)679 static std::vector<grpc_error*> ParseChildConfig(
680 const Json& json,
681 RefCountedPtr<LoadBalancingPolicy::Config>* child_config) {
682 std::vector<grpc_error*> error_list;
683 if (json.type() != Json::Type::OBJECT) {
684 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
685 "value should be of type object"));
686 return error_list;
687 }
688 auto it = json.object_value().find("childPolicy");
689 if (it == json.object_value().end()) {
690 error_list.push_back(
691 GRPC_ERROR_CREATE_FROM_STATIC_STRING("did not find childPolicy"));
692 } else {
693 grpc_error* parse_error = GRPC_ERROR_NONE;
694 *child_config = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
695 it->second, &parse_error);
696 if (*child_config == nullptr) {
697 GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE);
698 std::vector<grpc_error*> child_errors;
699 child_errors.push_back(parse_error);
700 error_list.push_back(
701 GRPC_ERROR_CREATE_FROM_VECTOR("field:childPolicy", &child_errors));
702 }
703 }
704 return error_list;
705 }
706 };
707
708 } // namespace
709
710 } // namespace grpc_core
711
712 //
713 // Plugin registration
714 //
715
grpc_lb_policy_xds_cluster_manager_init()716 void grpc_lb_policy_xds_cluster_manager_init() {
717 grpc_core::LoadBalancingPolicyRegistry::Builder::
718 RegisterLoadBalancingPolicyFactory(
719 absl::make_unique<grpc_core::XdsClusterManagerLbFactory>());
720 }
721
// Counterpart of grpc_lb_policy_xds_cluster_manager_init(); intentionally
// does nothing.
void grpc_lb_policy_xds_cluster_manager_shutdown() {}
723