1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/impl/connectivity_state.h>
18 #include <grpc/support/port_platform.h>
19 #include <stddef.h>
20 #include <stdint.h>
21
22 #include <atomic>
23 #include <map>
24 #include <memory>
25 #include <string>
26 #include <utility>
27 #include <vector>
28
29 #include "absl/base/thread_annotations.h"
30 #include "absl/log/check.h"
31 #include "absl/log/log.h"
32 #include "absl/status/status.h"
33 #include "absl/status/statusor.h"
34 #include "absl/strings/str_cat.h"
35 #include "absl/strings/string_view.h"
36 #include "absl/types/optional.h"
37 #include "absl/types/variant.h"
38 #include "src/core/client_channel/client_channel_internal.h"
39 #include "src/core/config/core_configuration.h"
40 #include "src/core/lib/channel/channel_args.h"
41 #include "src/core/lib/debug/trace.h"
42 #include "src/core/lib/iomgr/pollset_set.h"
43 #include "src/core/lib/iomgr/resolved_address.h"
44 #include "src/core/lib/security/credentials/xds/xds_credentials.h"
45 #include "src/core/lib/transport/connectivity_state.h"
46 #include "src/core/load_balancing/backend_metric_data.h"
47 #include "src/core/load_balancing/child_policy_handler.h"
48 #include "src/core/load_balancing/delegating_helper.h"
49 #include "src/core/load_balancing/lb_policy.h"
50 #include "src/core/load_balancing/lb_policy_factory.h"
51 #include "src/core/load_balancing/lb_policy_registry.h"
52 #include "src/core/load_balancing/subchannel_interface.h"
53 #include "src/core/load_balancing/xds/xds_channel_args.h"
54 #include "src/core/resolver/endpoint_addresses.h"
55 #include "src/core/resolver/xds/xds_config.h"
56 #include "src/core/resolver/xds/xds_resolver_attributes.h"
57 #include "src/core/telemetry/call_tracer.h"
58 #include "src/core/util/debug_location.h"
59 #include "src/core/util/json/json.h"
60 #include "src/core/util/json/json_args.h"
61 #include "src/core/util/json/json_object_loader.h"
62 #include "src/core/util/match.h"
63 #include "src/core/util/orphanable.h"
64 #include "src/core/util/ref_counted.h"
65 #include "src/core/util/ref_counted_ptr.h"
66 #include "src/core/util/ref_counted_string.h"
67 #include "src/core/util/sync.h"
68 #include "src/core/util/validation_errors.h"
69 #include "src/core/xds/grpc/xds_bootstrap_grpc.h"
70 #include "src/core/xds/grpc/xds_client_grpc.h"
71 #include "src/core/xds/grpc/xds_endpoint.h"
72 #include "src/core/xds/xds_client/xds_bootstrap.h"
73 #include "src/core/xds/xds_client/xds_client.h"
74 #include "src/core/xds/xds_client/xds_locality.h"
75
76 namespace grpc_core {
77
78 namespace {
79
80 //
81 // global circuit breaker atomic map
82 //
83
84 class CircuitBreakerCallCounterMap final {
85 public:
86 using Key =
87 std::pair<std::string /*cluster*/, std::string /*eds_service_name*/>;
88
89 class CallCounter final : public RefCounted<CallCounter> {
90 public:
CallCounter(Key key)91 explicit CallCounter(Key key) : key_(std::move(key)) {}
92 ~CallCounter() override;
93
Load()94 uint32_t Load() {
95 return concurrent_requests_.load(std::memory_order_seq_cst);
96 }
Increment()97 uint32_t Increment() { return concurrent_requests_.fetch_add(1); }
Decrement()98 void Decrement() { concurrent_requests_.fetch_sub(1); }
99
100 private:
101 Key key_;
102 std::atomic<uint32_t> concurrent_requests_{0};
103 };
104
105 RefCountedPtr<CallCounter> GetOrCreate(const std::string& cluster,
106 const std::string& eds_service_name);
107
108 private:
109 Mutex mu_;
110 std::map<Key, CallCounter*> map_ ABSL_GUARDED_BY(mu_);
111 };
112
113 CircuitBreakerCallCounterMap* const g_call_counter_map =
114 new CircuitBreakerCallCounterMap;
115
116 RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter>
GetOrCreate(const std::string & cluster,const std::string & eds_service_name)117 CircuitBreakerCallCounterMap::GetOrCreate(const std::string& cluster,
118 const std::string& eds_service_name) {
119 Key key(cluster, eds_service_name);
120 RefCountedPtr<CallCounter> result;
121 MutexLock lock(&mu_);
122 auto it = map_.find(key);
123 if (it == map_.end()) {
124 it = map_.insert({key, nullptr}).first;
125 } else {
126 result = it->second->RefIfNonZero();
127 }
128 if (result == nullptr) {
129 result = MakeRefCounted<CallCounter>(std::move(key));
130 it->second = result.get();
131 }
132 return result;
133 }
134
~CallCounter()135 CircuitBreakerCallCounterMap::CallCounter::~CallCounter() {
136 MutexLock lock(&g_call_counter_map->mu_);
137 auto it = g_call_counter_map->map_.find(key_);
138 if (it != g_call_counter_map->map_.end() && it->second == this) {
139 g_call_counter_map->map_.erase(it);
140 }
141 }
142
143 //
144 // LB policy
145 //
146
147 constexpr absl::string_view kXdsClusterImpl = "xds_cluster_impl_experimental";
148
149 // Config for xDS Cluster Impl LB policy.
150 class XdsClusterImplLbConfig final : public LoadBalancingPolicy::Config {
151 public:
152 XdsClusterImplLbConfig() = default;
153
154 XdsClusterImplLbConfig(const XdsClusterImplLbConfig&) = delete;
155 XdsClusterImplLbConfig& operator=(const XdsClusterImplLbConfig&) = delete;
156
157 XdsClusterImplLbConfig(XdsClusterImplLbConfig&& other) = delete;
158 XdsClusterImplLbConfig& operator=(XdsClusterImplLbConfig&& other) = delete;
159
name() const160 absl::string_view name() const override { return kXdsClusterImpl; }
161
cluster_name() const162 const std::string& cluster_name() const { return cluster_name_; }
child_policy() const163 RefCountedPtr<LoadBalancingPolicy::Config> child_policy() const {
164 return child_policy_;
165 }
166
167 static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
168 void JsonPostLoad(const Json& json, const JsonArgs& args,
169 ValidationErrors* errors);
170
171 private:
172 std::string cluster_name_;
173 RefCountedPtr<LoadBalancingPolicy::Config> child_policy_;
174 };
175
176 // xDS Cluster Impl LB policy.
177 class XdsClusterImplLb final : public LoadBalancingPolicy {
178 public:
179 XdsClusterImplLb(RefCountedPtr<GrpcXdsClient> xds_client, Args args);
180
name() const181 absl::string_view name() const override { return kXdsClusterImpl; }
182
183 absl::Status UpdateLocked(UpdateArgs args) override;
184 void ExitIdleLocked() override;
185 void ResetBackoffLocked() override;
186
187 private:
188 class StatsSubchannelWrapper final : public DelegatingSubchannel {
189 public:
190 // If load reporting is enabled and we have a ClusterLocalityStats
191 // object, that object already contains the locality label. We
192 // need to store the locality label directly only in the case where
193 // load reporting is disabled.
194 using LocalityData = absl::variant<
195 RefCountedStringValue /*locality*/,
196 RefCountedPtr<LrsClient::ClusterLocalityStats> /*locality_stats*/>;
197
StatsSubchannelWrapper(RefCountedPtr<SubchannelInterface> wrapped_subchannel,LocalityData locality_data,absl::string_view hostname)198 StatsSubchannelWrapper(
199 RefCountedPtr<SubchannelInterface> wrapped_subchannel,
200 LocalityData locality_data, absl::string_view hostname)
201 : DelegatingSubchannel(std::move(wrapped_subchannel)),
202 locality_data_(std::move(locality_data)),
203 hostname_(grpc_event_engine::experimental::Slice::FromCopiedString(
204 hostname)) {}
205
locality() const206 RefCountedStringValue locality() const {
207 return Match(
208 locality_data_,
209 [](RefCountedStringValue locality) { return locality; },
210 [](const RefCountedPtr<LrsClient::ClusterLocalityStats>&
211 locality_stats) {
212 return locality_stats->locality_name()->human_readable_string();
213 });
214 }
215
locality_stats() const216 LrsClient::ClusterLocalityStats* locality_stats() const {
217 return Match(
218 locality_data_,
219 [](const RefCountedStringValue&) {
220 return static_cast<LrsClient::ClusterLocalityStats*>(nullptr);
221 },
222 [](const RefCountedPtr<LrsClient::ClusterLocalityStats>&
223 locality_stats) { return locality_stats.get(); });
224 }
225
hostname() const226 const grpc_event_engine::experimental::Slice& hostname() const {
227 return hostname_;
228 }
229
230 private:
231 LocalityData locality_data_;
232 grpc_event_engine::experimental::Slice hostname_;
233 };
234
235 // A picker that wraps the picker from the child to perform drops.
236 class Picker final : public SubchannelPicker {
237 public:
238 Picker(XdsClusterImplLb* xds_cluster_impl_lb,
239 RefCountedPtr<SubchannelPicker> picker);
240
241 PickResult Pick(PickArgs args) override;
242
243 private:
244 class SubchannelCallTracker;
245
246 RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter> call_counter_;
247 uint32_t max_concurrent_requests_;
248 RefCountedStringValue service_telemetry_label_;
249 RefCountedStringValue namespace_telemetry_label_;
250 RefCountedPtr<XdsEndpointResource::DropConfig> drop_config_;
251 RefCountedPtr<LrsClient::ClusterDropStats> drop_stats_;
252 RefCountedPtr<SubchannelPicker> picker_;
253 };
254
255 class Helper final
256 : public ParentOwningDelegatingChannelControlHelper<XdsClusterImplLb> {
257 public:
Helper(RefCountedPtr<XdsClusterImplLb> xds_cluster_impl_policy)258 explicit Helper(RefCountedPtr<XdsClusterImplLb> xds_cluster_impl_policy)
259 : ParentOwningDelegatingChannelControlHelper(
260 std::move(xds_cluster_impl_policy)) {}
261
262 RefCountedPtr<SubchannelInterface> CreateSubchannel(
263 const grpc_resolved_address& address,
264 const ChannelArgs& per_address_args, const ChannelArgs& args) override;
265 void UpdateState(grpc_connectivity_state state, const absl::Status& status,
266 RefCountedPtr<SubchannelPicker> picker) override;
267 };
268
269 ~XdsClusterImplLb() override;
270
271 void ShutdownLocked() override;
272
273 void ResetState();
274 void ReportTransientFailure(absl::Status status);
275
276 OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
277 const ChannelArgs& args);
278 absl::Status UpdateChildPolicyLocked(
279 absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses,
280 std::string resolution_note, const ChannelArgs& args);
281
282 absl::StatusOr<RefCountedPtr<XdsCertificateProvider>>
283 MaybeCreateCertificateProviderLocked(
284 const XdsClusterResource& cluster_resource) const;
285
286 void MaybeUpdatePickerLocked();
287
288 // Current config from the resolver.
289 RefCountedPtr<XdsClusterImplLbConfig> config_;
290 std::shared_ptr<const XdsClusterResource> cluster_resource_;
291 RefCountedStringValue service_telemetry_label_;
292 RefCountedStringValue namespace_telemetry_label_;
293 RefCountedPtr<XdsEndpointResource::DropConfig> drop_config_;
294
295 // Current concurrent number of requests.
296 RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter> call_counter_;
297
298 // Internal state.
299 bool shutting_down_ = false;
300
301 // The xds client.
302 RefCountedPtr<GrpcXdsClient> xds_client_;
303
304 // The stats for client-side load reporting.
305 RefCountedPtr<LrsClient::ClusterDropStats> drop_stats_;
306
307 OrphanablePtr<LoadBalancingPolicy> child_policy_;
308
309 // Latest state and picker reported by the child policy.
310 grpc_connectivity_state state_ = GRPC_CHANNEL_IDLE;
311 absl::Status status_;
312 RefCountedPtr<SubchannelPicker> picker_;
313 };
314
315 //
316 // XdsClusterImplLb::Picker::SubchannelCallTracker
317 //
318
319 class XdsClusterImplLb::Picker::SubchannelCallTracker final
320 : public LoadBalancingPolicy::SubchannelCallTrackerInterface {
321 public:
SubchannelCallTracker(std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface> original_subchannel_call_tracker,RefCountedPtr<LrsClient::ClusterLocalityStats> locality_stats,RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter> call_counter)322 SubchannelCallTracker(
323 std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>
324 original_subchannel_call_tracker,
325 RefCountedPtr<LrsClient::ClusterLocalityStats> locality_stats,
326 RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter> call_counter)
327 : original_subchannel_call_tracker_(
328 std::move(original_subchannel_call_tracker)),
329 locality_stats_(std::move(locality_stats)),
330 call_counter_(std::move(call_counter)) {}
331
~SubchannelCallTracker()332 ~SubchannelCallTracker() override {
333 locality_stats_.reset(DEBUG_LOCATION, "SubchannelCallTracker");
334 call_counter_.reset(DEBUG_LOCATION, "SubchannelCallTracker");
335 #ifndef NDEBUG
336 DCHECK(!started_);
337 #endif
338 }
339
Start()340 void Start() override {
341 // Increment number of calls in flight.
342 call_counter_->Increment();
343 // Record a call started.
344 if (locality_stats_ != nullptr) {
345 locality_stats_->AddCallStarted();
346 }
347 // Delegate if needed.
348 if (original_subchannel_call_tracker_ != nullptr) {
349 original_subchannel_call_tracker_->Start();
350 }
351 #ifndef NDEBUG
352 started_ = true;
353 #endif
354 }
355
Finish(FinishArgs args)356 void Finish(FinishArgs args) override {
357 // Delegate if needed.
358 if (original_subchannel_call_tracker_ != nullptr) {
359 original_subchannel_call_tracker_->Finish(args);
360 }
361 // Record call completion for load reporting.
362 if (locality_stats_ != nullptr) {
363 locality_stats_->AddCallFinished(
364 args.backend_metric_accessor->GetBackendMetricData(),
365 !args.status.ok());
366 }
367 // Decrement number of calls in flight.
368 call_counter_->Decrement();
369 #ifndef NDEBUG
370 started_ = false;
371 #endif
372 }
373
374 private:
375 std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>
376 original_subchannel_call_tracker_;
377 RefCountedPtr<LrsClient::ClusterLocalityStats> locality_stats_;
378 RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter> call_counter_;
379 #ifndef NDEBUG
380 bool started_ = false;
381 #endif
382 };
383
384 //
385 // XdsClusterImplLb::Picker
386 //
387
Picker(XdsClusterImplLb * xds_cluster_impl_lb,RefCountedPtr<SubchannelPicker> picker)388 XdsClusterImplLb::Picker::Picker(XdsClusterImplLb* xds_cluster_impl_lb,
389 RefCountedPtr<SubchannelPicker> picker)
390 : call_counter_(xds_cluster_impl_lb->call_counter_),
391 max_concurrent_requests_(
392 xds_cluster_impl_lb->cluster_resource_->max_concurrent_requests),
393 service_telemetry_label_(xds_cluster_impl_lb->service_telemetry_label_),
394 namespace_telemetry_label_(
395 xds_cluster_impl_lb->namespace_telemetry_label_),
396 drop_config_(xds_cluster_impl_lb->drop_config_),
397 drop_stats_(xds_cluster_impl_lb->drop_stats_),
398 picker_(std::move(picker)) {
399 GRPC_TRACE_LOG(xds_cluster_impl_lb, INFO)
400 << "[xds_cluster_impl_lb " << xds_cluster_impl_lb
401 << "] constructed new picker " << this;
402 }
403
Pick(LoadBalancingPolicy::PickArgs args)404 LoadBalancingPolicy::PickResult XdsClusterImplLb::Picker::Pick(
405 LoadBalancingPolicy::PickArgs args) {
406 auto* call_state = static_cast<ClientChannelLbCallState*>(args.call_state);
407 auto* call_attempt_tracer = call_state->GetCallAttemptTracer();
408 if (call_attempt_tracer != nullptr) {
409 call_attempt_tracer->SetOptionalLabel(
410 ClientCallTracer::CallAttemptTracer::OptionalLabelKey::kXdsServiceName,
411 service_telemetry_label_);
412 call_attempt_tracer->SetOptionalLabel(
413 ClientCallTracer::CallAttemptTracer::OptionalLabelKey::
414 kXdsServiceNamespace,
415 namespace_telemetry_label_);
416 }
417 // Handle EDS drops.
418 const std::string* drop_category;
419 if (drop_config_ != nullptr && drop_config_->ShouldDrop(&drop_category)) {
420 if (drop_stats_ != nullptr) drop_stats_->AddCallDropped(*drop_category);
421 return PickResult::Drop(absl::UnavailableError(
422 absl::StrCat("EDS-configured drop: ", *drop_category)));
423 }
424 // Check if we exceeded the max concurrent requests circuit breaking limit.
425 // Note: We check the value here, but we don't actually increment the
426 // counter for the current request until the channel calls the subchannel
427 // call tracker's Start() method. This means that we may wind up
428 // allowing more concurrent requests than the configured limit.
429 if (call_counter_->Load() >= max_concurrent_requests_) {
430 if (drop_stats_ != nullptr) drop_stats_->AddUncategorizedDrops();
431 return PickResult::Drop(absl::UnavailableError("circuit breaker drop"));
432 }
433 // If we're not dropping the call, we should always have a child picker.
434 if (picker_ == nullptr) { // Should never happen.
435 return PickResult::Fail(absl::InternalError(
436 "xds_cluster_impl picker not given any child picker"));
437 }
438 // Not dropping, so delegate to child picker.
439 PickResult result = picker_->Pick(args);
440 auto* complete_pick = absl::get_if<PickResult::Complete>(&result.result);
441 if (complete_pick != nullptr) {
442 auto* subchannel_wrapper =
443 static_cast<StatsSubchannelWrapper*>(complete_pick->subchannel.get());
444 // Add locality label to per-call metrics if needed.
445 if (call_attempt_tracer != nullptr) {
446 call_attempt_tracer->SetOptionalLabel(
447 ClientCallTracer::CallAttemptTracer::OptionalLabelKey::kLocality,
448 subchannel_wrapper->locality());
449 }
450 // Handle load reporting.
451 RefCountedPtr<LrsClient::ClusterLocalityStats> locality_stats;
452 if (subchannel_wrapper->locality_stats() != nullptr) {
453 locality_stats = subchannel_wrapper->locality_stats()->Ref(
454 DEBUG_LOCATION, "SubchannelCallTracker");
455 }
456 // Handle authority rewriting if needed.
457 if (!subchannel_wrapper->hostname().empty()) {
458 auto* route_state_attribute =
459 call_state->GetCallAttribute<XdsRouteStateAttribute>();
460 if (route_state_attribute != nullptr) {
461 auto* route_action =
462 absl::get_if<XdsRouteConfigResource::Route::RouteAction>(
463 &route_state_attribute->route().action);
464 if (route_action != nullptr && route_action->auto_host_rewrite) {
465 complete_pick->authority_override =
466 subchannel_wrapper->hostname().Ref();
467 }
468 }
469 }
470 // Unwrap subchannel to pass back up the stack.
471 complete_pick->subchannel = subchannel_wrapper->wrapped_subchannel();
472 // Inject subchannel call tracker to record call completion.
473 complete_pick->subchannel_call_tracker =
474 std::make_unique<SubchannelCallTracker>(
475 std::move(complete_pick->subchannel_call_tracker),
476 std::move(locality_stats),
477 call_counter_->Ref(DEBUG_LOCATION, "SubchannelCallTracker"));
478 } else {
479 // TODO(roth): We should ideally also record call failures here in the case
480 // where a pick fails. This is challenging, because we don't know which
481 // picks are for wait_for_ready RPCs or how many times we'll return a
482 // failure for the same wait_for_ready RPC.
483 }
484 return result;
485 }
486
487 //
488 // XdsClusterImplLb
489 //
490
XdsClusterImplLb(RefCountedPtr<GrpcXdsClient> xds_client,Args args)491 XdsClusterImplLb::XdsClusterImplLb(RefCountedPtr<GrpcXdsClient> xds_client,
492 Args args)
493 : LoadBalancingPolicy(std::move(args)), xds_client_(std::move(xds_client)) {
494 GRPC_TRACE_LOG(xds_cluster_impl_lb, INFO)
495 << "[xds_cluster_impl_lb " << this << "] created -- using xds client "
496 << xds_client_.get();
497 }
498
~XdsClusterImplLb()499 XdsClusterImplLb::~XdsClusterImplLb() {
500 GRPC_TRACE_LOG(xds_cluster_impl_lb, INFO)
501 << "[xds_cluster_impl_lb " << this
502 << "] destroying xds_cluster_impl LB policy";
503 }
504
ShutdownLocked()505 void XdsClusterImplLb::ShutdownLocked() {
506 GRPC_TRACE_LOG(xds_cluster_impl_lb, INFO)
507 << "[xds_cluster_impl_lb " << this << "] shutting down";
508 shutting_down_ = true;
509 ResetState();
510 xds_client_.reset(DEBUG_LOCATION, "XdsClusterImpl");
511 }
512
ResetState()513 void XdsClusterImplLb::ResetState() {
514 // Remove the child policy's interested_parties pollset_set from the
515 // xDS policy.
516 if (child_policy_ != nullptr) {
517 grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
518 interested_parties());
519 child_policy_.reset();
520 }
521 // Drop our ref to the child's picker, in case it's holding a ref to
522 // the child.
523 picker_.reset();
524 drop_stats_.reset();
525 }
526
ReportTransientFailure(absl::Status status)527 void XdsClusterImplLb::ReportTransientFailure(absl::Status status) {
528 GRPC_TRACE_LOG(xds_cluster_impl_lb, INFO)
529 << "[xds_cluster_impl_lb " << this
530 << "] reporting TRANSIENT_FAILURE: " << status;
531 ResetState();
532 channel_control_helper()->UpdateState(
533 GRPC_CHANNEL_TRANSIENT_FAILURE, status,
534 MakeRefCounted<TransientFailurePicker>(status));
535 }
536
ExitIdleLocked()537 void XdsClusterImplLb::ExitIdleLocked() {
538 if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
539 }
540
ResetBackoffLocked()541 void XdsClusterImplLb::ResetBackoffLocked() {
542 // The XdsClient will have its backoff reset by the xds resolver, so we
543 // don't need to do it here.
544 if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
545 }
546
GetEdsResourceName(const XdsClusterResource & cluster_resource)547 std::string GetEdsResourceName(const XdsClusterResource& cluster_resource) {
548 auto* eds = absl::get_if<XdsClusterResource::Eds>(&cluster_resource.type);
549 if (eds == nullptr) return "";
550 return eds->eds_service_name;
551 }
552
UpdateLocked(UpdateArgs args)553 absl::Status XdsClusterImplLb::UpdateLocked(UpdateArgs args) {
554 GRPC_TRACE_LOG(xds_cluster_impl_lb, INFO)
555 << "[xds_cluster_impl_lb " << this << "] Received update";
556 // Grab new LB policy config.
557 auto new_config = args.config.TakeAsSubclass<XdsClusterImplLbConfig>();
558 // Cluster name should never change, because the cds policy will assign a
559 // different priority child name if that happens, which means that this
560 // policy instance will get replaced instead of being updated.
561 if (config_ != nullptr) {
562 CHECK(config_->cluster_name() == new_config->cluster_name());
563 }
564 // Get xDS config.
565 auto new_xds_config = args.args.GetObjectRef<XdsConfig>();
566 if (new_xds_config == nullptr) {
567 // Should never happen.
568 absl::Status status = absl::InternalError(
569 "xDS config not passed to xds_cluster_impl LB policy");
570 ReportTransientFailure(status);
571 return status;
572 }
573 auto it = new_xds_config->clusters.find(new_config->cluster_name());
574 if (it == new_xds_config->clusters.end() || !it->second.ok() ||
575 it->second->cluster == nullptr) {
576 // Should never happen.
577 absl::Status status = absl::InternalError(absl::StrCat(
578 "xDS config has no entry for cluster ", new_config->cluster_name()));
579 ReportTransientFailure(status);
580 return status;
581 }
582 auto& new_cluster_config = *it->second;
583 auto* endpoint_config =
584 absl::get_if<XdsConfig::ClusterConfig::EndpointConfig>(
585 &new_cluster_config.children);
586 if (endpoint_config == nullptr) {
587 // Should never happen.
588 absl::Status status = absl::InternalError(
589 absl::StrCat("cluster config for ", new_config->cluster_name(),
590 " has no endpoint config"));
591 ReportTransientFailure(status);
592 return status;
593 }
594 auto xds_cert_provider =
595 MaybeCreateCertificateProviderLocked(*new_cluster_config.cluster);
596 if (!xds_cert_provider.ok()) {
597 // Should never happen.
598 ReportTransientFailure(xds_cert_provider.status());
599 return xds_cert_provider.status();
600 }
601 if (*xds_cert_provider != nullptr) {
602 args.args = args.args.SetObject(std::move(*xds_cert_provider));
603 }
604 // Now we've verified the new config is good.
605 // Get new and old (if any) EDS service name.
606 std::string new_eds_service_name =
607 GetEdsResourceName(*new_cluster_config.cluster);
608 std::string old_eds_service_name =
609 cluster_resource_ == nullptr ? ""
610 : GetEdsResourceName(*cluster_resource_);
611 // Update drop stats if needed.
612 // Note: We need a drop stats object whenever load reporting is enabled,
613 // even if we have no EDS drop config, because we also use it when
614 // reporting circuit breaker drops.
615 if (new_cluster_config.cluster->lrs_load_reporting_server == nullptr) {
616 drop_stats_.reset();
617 } else if (cluster_resource_ == nullptr ||
618 old_eds_service_name != new_eds_service_name ||
619 !LrsServersEqual(
620 cluster_resource_->lrs_load_reporting_server,
621 new_cluster_config.cluster->lrs_load_reporting_server)) {
622 drop_stats_ = xds_client_->lrs_client().AddClusterDropStats(
623 new_cluster_config.cluster->lrs_load_reporting_server,
624 new_config->cluster_name(), new_eds_service_name);
625 if (drop_stats_ == nullptr) {
626 LOG(ERROR)
627 << "[xds_cluster_impl_lb " << this
628 << "] Failed to get cluster drop stats for LRS server "
629 << new_cluster_config.cluster->lrs_load_reporting_server->server_uri()
630 << ", cluster " << new_config->cluster_name() << ", EDS service name "
631 << new_eds_service_name
632 << ", load reporting for drops will not be done.";
633 }
634 }
635 // Update call counter if needed.
636 if (cluster_resource_ == nullptr ||
637 old_eds_service_name != new_eds_service_name) {
638 call_counter_ = g_call_counter_map->GetOrCreate(new_config->cluster_name(),
639 new_eds_service_name);
640 }
641 // Update config state, now that we're done comparing old and new fields.
642 config_ = std::move(new_config);
643 cluster_resource_ = new_cluster_config.cluster;
644 const XdsMetadataValue* metadata_value =
645 cluster_resource_->metadata.Find("com.google.csm.telemetry_labels");
646 if (metadata_value != nullptr &&
647 metadata_value->type() == XdsStructMetadataValue::Type()) {
648 const Json::Object& json_object =
649 DownCast<const XdsStructMetadataValue*>(metadata_value)
650 ->json()
651 .object();
652 auto it = json_object.find("service_name");
653 if (it != json_object.end() && it->second.type() == Json::Type::kString) {
654 service_telemetry_label_ = RefCountedStringValue(it->second.string());
655 }
656 it = json_object.find("service_namespace");
657 if (it != json_object.end() && it->second.type() == Json::Type::kString) {
658 namespace_telemetry_label_ = RefCountedStringValue(it->second.string());
659 }
660 }
661 drop_config_ = endpoint_config->endpoints != nullptr
662 ? endpoint_config->endpoints->drop_config
663 : nullptr;
664 // Update picker in case some dependent config field changed.
665 MaybeUpdatePickerLocked();
666 // Update child policy.
667 return UpdateChildPolicyLocked(std::move(args.addresses),
668 std::move(args.resolution_note), args.args);
669 }
670
671 absl::StatusOr<RefCountedPtr<XdsCertificateProvider>>
MaybeCreateCertificateProviderLocked(const XdsClusterResource & cluster_resource) const672 XdsClusterImplLb::MaybeCreateCertificateProviderLocked(
673 const XdsClusterResource& cluster_resource) const {
674 // If the channel is not using XdsCreds, do nothing.
675 auto channel_credentials = channel_control_helper()->GetChannelCredentials();
676 if (channel_credentials == nullptr ||
677 channel_credentials->type() != XdsCredentials::Type()) {
678 return nullptr;
679 }
680 // Configure root cert.
681 absl::string_view root_cert_name;
682 RefCountedPtr<grpc_tls_certificate_provider> root_cert_provider;
683 bool use_system_root_certs = false;
684 absl::Status status = Match(
685 cluster_resource.common_tls_context.certificate_validation_context
686 .ca_certs,
687 [](const absl::monostate&) {
688 // No root cert configured.
689 return absl::OkStatus();
690 },
691 [&](const CommonTlsContext::CertificateProviderPluginInstance&
692 cert_provider) {
693 root_cert_name = cert_provider.certificate_name;
694 root_cert_provider =
695 xds_client_->certificate_provider_store()
696 .CreateOrGetCertificateProvider(cert_provider.instance_name);
697 if (root_cert_provider == nullptr) {
698 return absl::InternalError(
699 absl::StrCat("Certificate provider instance name: \"",
700 cert_provider.instance_name, "\" not recognized."));
701 }
702 return absl::OkStatus();
703 },
704 [&](const CommonTlsContext::CertificateValidationContext::
705 SystemRootCerts&) {
706 use_system_root_certs = true;
707 return absl::OkStatus();
708 });
709 if (!status.ok()) return status;
710 // Configure identity cert.
711 absl::string_view identity_provider_instance_name =
712 cluster_resource.common_tls_context.tls_certificate_provider_instance
713 .instance_name;
714 absl::string_view identity_cert_name =
715 cluster_resource.common_tls_context.tls_certificate_provider_instance
716 .certificate_name;
717 RefCountedPtr<grpc_tls_certificate_provider> identity_cert_provider;
718 if (!identity_provider_instance_name.empty()) {
719 identity_cert_provider =
720 xds_client_->certificate_provider_store()
721 .CreateOrGetCertificateProvider(identity_provider_instance_name);
722 if (identity_cert_provider == nullptr) {
723 return absl::InternalError(
724 absl::StrCat("Certificate provider instance name: \"",
725 identity_provider_instance_name, "\" not recognized."));
726 }
727 }
728 // Configure SAN matchers.
729 const std::vector<StringMatcher>& san_matchers =
730 cluster_resource.common_tls_context.certificate_validation_context
731 .match_subject_alt_names;
732 // Create xds cert provider.
733 return MakeRefCounted<XdsCertificateProvider>(
734 std::move(root_cert_provider), root_cert_name, use_system_root_certs,
735 std::move(identity_cert_provider), identity_cert_name, san_matchers);
736 }
737
MaybeUpdatePickerLocked()738 void XdsClusterImplLb::MaybeUpdatePickerLocked() {
739 // If we're dropping all calls, report READY, regardless of what (or
740 // whether) the child has reported.
741 if (drop_config_ != nullptr && drop_config_->drop_all()) {
742 auto drop_picker = MakeRefCounted<Picker>(this, picker_);
743 GRPC_TRACE_LOG(xds_cluster_impl_lb, INFO)
744 << "[xds_cluster_impl_lb " << this
745 << "] updating connectivity (drop all): state=READY picker="
746 << drop_picker.get();
747 channel_control_helper()->UpdateState(GRPC_CHANNEL_READY, absl::Status(),
748 std::move(drop_picker));
749 return;
750 }
751 // Otherwise, update only if we have a child picker.
752 if (picker_ != nullptr) {
753 auto drop_picker = MakeRefCounted<Picker>(this, picker_);
754 GRPC_TRACE_LOG(xds_cluster_impl_lb, INFO)
755 << "[xds_cluster_impl_lb " << this
756 << "] updating connectivity: state=" << ConnectivityStateName(state_)
757 << " status=(" << status_ << ") picker=" << drop_picker.get();
758 channel_control_helper()->UpdateState(state_, status_,
759 std::move(drop_picker));
760 }
761 }
762
CreateChildPolicyLocked(const ChannelArgs & args)763 OrphanablePtr<LoadBalancingPolicy> XdsClusterImplLb::CreateChildPolicyLocked(
764 const ChannelArgs& args) {
765 LoadBalancingPolicy::Args lb_policy_args;
766 lb_policy_args.work_serializer = work_serializer();
767 lb_policy_args.args = args;
768 lb_policy_args.channel_control_helper = std::make_unique<Helper>(
769 RefAsSubclass<XdsClusterImplLb>(DEBUG_LOCATION, "Helper"));
770 OrphanablePtr<LoadBalancingPolicy> lb_policy =
771 MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
772 &xds_cluster_impl_lb_trace);
773 GRPC_TRACE_LOG(xds_cluster_impl_lb, INFO)
774 << "[xds_cluster_impl_lb " << this
775 << "] Created new child policy handler " << lb_policy.get();
776 // Add our interested_parties pollset_set to that of the newly created
777 // child policy. This will make the child policy progress upon activity on
778 // this policy, which in turn is tied to the application's call.
779 grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
780 interested_parties());
781 return lb_policy;
782 }
783
UpdateChildPolicyLocked(absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses,std::string resolution_note,const ChannelArgs & args)784 absl::Status XdsClusterImplLb::UpdateChildPolicyLocked(
785 absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses,
786 std::string resolution_note, const ChannelArgs& args) {
787 // Create policy if needed.
788 if (child_policy_ == nullptr) {
789 child_policy_ = CreateChildPolicyLocked(args);
790 }
791 // Construct update args.
792 UpdateArgs update_args;
793 update_args.addresses = std::move(addresses);
794 update_args.resolution_note = std::move(resolution_note);
795 update_args.config = config_->child_policy();
796 update_args.args =
797 args.Set(GRPC_ARG_XDS_CLUSTER_NAME, config_->cluster_name());
798 // Update the policy.
799 GRPC_TRACE_LOG(xds_cluster_impl_lb, INFO)
800 << "[xds_cluster_impl_lb " << this << "] Updating child policy handler "
801 << child_policy_.get();
802 return child_policy_->UpdateLocked(std::move(update_args));
803 }
804
805 //
806 // XdsClusterImplLb::Helper
807 //
808
CreateSubchannel(const grpc_resolved_address & address,const ChannelArgs & per_address_args,const ChannelArgs & args)809 RefCountedPtr<SubchannelInterface> XdsClusterImplLb::Helper::CreateSubchannel(
810 const grpc_resolved_address& address, const ChannelArgs& per_address_args,
811 const ChannelArgs& args) {
812 if (parent()->shutting_down_) return nullptr;
813 // Wrap the subchannel so that we pass along the locality label and
814 // (if load reporting is enabled) the locality stats object, which
815 // will be used by the picker.
816 auto locality_name = per_address_args.GetObjectRef<XdsLocalityName>();
817 RefCountedPtr<LrsClient::ClusterLocalityStats> locality_stats;
818 if (parent()->cluster_resource_->lrs_load_reporting_server != nullptr) {
819 locality_stats =
820 parent()->xds_client_->lrs_client().AddClusterLocalityStats(
821 parent()->cluster_resource_->lrs_load_reporting_server,
822 parent()->config_->cluster_name(),
823 GetEdsResourceName(*parent()->cluster_resource_), locality_name,
824 parent()->cluster_resource_->lrs_backend_metric_propagation);
825 if (locality_stats == nullptr) {
826 LOG(ERROR)
827 << "[xds_cluster_impl_lb " << parent()
828 << "] Failed to get locality stats object for LRS server "
829 << parent()
830 ->cluster_resource_->lrs_load_reporting_server->server_uri()
831 << ", cluster " << parent()->config_->cluster_name()
832 << ", EDS service name "
833 << GetEdsResourceName(*parent()->cluster_resource_)
834 << "; load reports will not be generated";
835 }
836 }
837 StatsSubchannelWrapper::LocalityData locality_data;
838 if (locality_stats != nullptr) {
839 locality_data = std::move(locality_stats);
840 } else {
841 locality_data = locality_name->human_readable_string();
842 }
843 return MakeRefCounted<StatsSubchannelWrapper>(
844 parent()->channel_control_helper()->CreateSubchannel(
845 address, per_address_args, args),
846 std::move(locality_data),
847 per_address_args.GetString(GRPC_ARG_ADDRESS_NAME).value_or(""));
848 }
849
UpdateState(grpc_connectivity_state state,const absl::Status & status,RefCountedPtr<SubchannelPicker> picker)850 void XdsClusterImplLb::Helper::UpdateState(
851 grpc_connectivity_state state, const absl::Status& status,
852 RefCountedPtr<SubchannelPicker> picker) {
853 if (parent()->shutting_down_) return;
854 GRPC_TRACE_LOG(xds_cluster_impl_lb, INFO)
855 << "[xds_cluster_impl_lb " << parent()
856 << "] child connectivity state update: state="
857 << ConnectivityStateName(state) << " (" << status
858 << ") picker=" << picker.get();
859 // Save the state and picker.
860 parent()->state_ = state;
861 parent()->status_ = status;
862 parent()->picker_ = std::move(picker);
863 // Wrap the picker and return it to the channel.
864 parent()->MaybeUpdatePickerLocked();
865 }
866
867 //
868 // factory
869 //
870
JsonLoader(const JsonArgs &)871 const JsonLoaderInterface* XdsClusterImplLbConfig::JsonLoader(const JsonArgs&) {
872 static const auto* loader =
873 JsonObjectLoader<XdsClusterImplLbConfig>()
874 // Note: Some fields require custom processing, so they are
875 // handled in JsonPostLoad() instead.
876 .Field("clusterName", &XdsClusterImplLbConfig::cluster_name_)
877 .Finish();
878 return loader;
879 }
880
JsonPostLoad(const Json & json,const JsonArgs &,ValidationErrors * errors)881 void XdsClusterImplLbConfig::JsonPostLoad(const Json& json, const JsonArgs&,
882 ValidationErrors* errors) {
883 // Parse "childPolicy" field.
884 ValidationErrors::ScopedField field(errors, ".childPolicy");
885 auto it = json.object().find("childPolicy");
886 if (it == json.object().end()) {
887 errors->AddError("field not present");
888 } else {
889 auto lb_config =
890 CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
891 it->second);
892 if (!lb_config.ok()) {
893 errors->AddError(lb_config.status().message());
894 } else {
895 child_policy_ = std::move(*lb_config);
896 }
897 }
898 }
899
900 class XdsClusterImplLbFactory final : public LoadBalancingPolicyFactory {
901 public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const902 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
903 LoadBalancingPolicy::Args args) const override {
904 auto xds_client = args.args.GetObjectRef<GrpcXdsClient>(DEBUG_LOCATION,
905 "XdsClusterImplLb");
906 if (xds_client == nullptr) {
907 LOG(ERROR) << "XdsClient not present in channel args -- cannot "
908 "instantiate xds_cluster_impl LB policy";
909 return nullptr;
910 }
911 return MakeOrphanable<XdsClusterImplLb>(std::move(xds_client),
912 std::move(args));
913 }
914
name() const915 absl::string_view name() const override { return kXdsClusterImpl; }
916
917 absl::StatusOr<RefCountedPtr<LoadBalancingPolicy::Config>>
ParseLoadBalancingConfig(const Json & json) const918 ParseLoadBalancingConfig(const Json& json) const override {
919 return LoadFromJson<RefCountedPtr<XdsClusterImplLbConfig>>(
920 json, JsonArgs(),
921 "errors validating xds_cluster_impl LB policy config");
922 }
923 };
924
925 } // namespace
926
RegisterXdsClusterImplLbPolicy(CoreConfiguration::Builder * builder)927 void RegisterXdsClusterImplLbPolicy(CoreConfiguration::Builder* builder) {
928 builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory(
929 std::make_unique<XdsClusterImplLbFactory>());
930 }
931
932 } // namespace grpc_core
933