• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include "absl/strings/string_view.h"
20 
21 #include <grpc/grpc.h>
22 
23 #include "src/core/ext/filters/client_channel/lb_policy.h"
24 #include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
25 #include "src/core/ext/filters/client_channel/lb_policy/xds/xds.h"
26 #include "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h"
27 #include "src/core/ext/filters/client_channel/lb_policy_factory.h"
28 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
29 #include "src/core/ext/xds/xds_client.h"
30 #include "src/core/ext/xds/xds_client_stats.h"
31 #include "src/core/lib/channel/channel_args.h"
32 #include "src/core/lib/gpr/env.h"
33 #include "src/core/lib/gpr/string.h"
34 #include "src/core/lib/gprpp/orphanable.h"
35 #include "src/core/lib/gprpp/ref_counted_ptr.h"
36 #include "src/core/lib/gprpp/sync.h"
37 #include "src/core/lib/iomgr/work_serializer.h"
38 
39 namespace grpc_core {
40 
41 TraceFlag grpc_xds_cluster_impl_lb_trace(false, "xds_cluster_impl_lb");
42 
43 namespace {
44 
45 //
46 // global circuit breaker atomic map
47 //
48 
49 class CircuitBreakerCallCounterMap {
50  public:
51   using Key =
52       std::pair<std::string /*cluster*/, std::string /*eds_service_name*/>;
53 
54   class CallCounter : public RefCounted<CallCounter> {
55    public:
CallCounter(Key key)56     explicit CallCounter(Key key) : key_(std::move(key)) {}
57     ~CallCounter() override;
58 
Increment()59     uint32_t Increment() { return concurrent_requests_.FetchAdd(1); }
Decrement()60     void Decrement() { concurrent_requests_.FetchSub(1); }
61 
62    private:
63     Key key_;
64     Atomic<uint32_t> concurrent_requests_{0};
65   };
66 
67   RefCountedPtr<CallCounter> GetOrCreate(const std::string& cluster,
68                                          const std::string& eds_service_name);
69 
70  private:
71   Mutex mu_;
72   std::map<Key, CallCounter*> map_;
73 };
74 
75 CircuitBreakerCallCounterMap* g_call_counter_map = nullptr;
76 
77 RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter>
GetOrCreate(const std::string & cluster,const std::string & eds_service_name)78 CircuitBreakerCallCounterMap::GetOrCreate(const std::string& cluster,
79                                           const std::string& eds_service_name) {
80   Key key(cluster, eds_service_name);
81   RefCountedPtr<CallCounter> result;
82   MutexLock lock(&mu_);
83   auto it = map_.find(key);
84   if (it == map_.end()) {
85     it = map_.insert({key, nullptr}).first;
86   } else {
87     result = it->second->RefIfNonZero();
88   }
89   if (result == nullptr) {
90     result = MakeRefCounted<CallCounter>(std::move(key));
91     it->second = result.get();
92   }
93   return result;
94 }
95 
~CallCounter()96 CircuitBreakerCallCounterMap::CallCounter::~CallCounter() {
97   MutexLock lock(&g_call_counter_map->mu_);
98   auto it = g_call_counter_map->map_.find(key_);
99   if (it != g_call_counter_map->map_.end() && it->second == this) {
100     g_call_counter_map->map_.erase(it);
101   }
102 }
103 
104 //
105 // LB policy
106 //
107 
// Policy name; returned by name() on the policy, its config and its factory.
constexpr const char kXdsClusterImpl[] = "xds_cluster_impl_experimental";
109 
110 // TODO (donnadionne): Check to see if circuit breaking is enabled, this will be
111 // removed once circuit breaking feature is fully integrated and enabled by
112 // default.
XdsCircuitBreakingEnabled()113 bool XdsCircuitBreakingEnabled() {
114   char* value = gpr_getenv("GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING");
115   bool parsed_value;
116   bool parse_succeeded = gpr_parse_bool_value(value, &parsed_value);
117   gpr_free(value);
118   return parse_succeeded && parsed_value;
119 }
120 
121 // Config for xDS Cluster Impl LB policy.
122 class XdsClusterImplLbConfig : public LoadBalancingPolicy::Config {
123  public:
XdsClusterImplLbConfig(RefCountedPtr<LoadBalancingPolicy::Config> child_policy,std::string cluster_name,std::string eds_service_name,absl::optional<std::string> lrs_load_reporting_server_name,uint32_t max_concurrent_requests,RefCountedPtr<XdsApi::EdsUpdate::DropConfig> drop_config)124   XdsClusterImplLbConfig(
125       RefCountedPtr<LoadBalancingPolicy::Config> child_policy,
126       std::string cluster_name, std::string eds_service_name,
127       absl::optional<std::string> lrs_load_reporting_server_name,
128       uint32_t max_concurrent_requests,
129       RefCountedPtr<XdsApi::EdsUpdate::DropConfig> drop_config)
130       : child_policy_(std::move(child_policy)),
131         cluster_name_(std::move(cluster_name)),
132         eds_service_name_(std::move(eds_service_name)),
133         lrs_load_reporting_server_name_(
134             std::move(lrs_load_reporting_server_name)),
135         max_concurrent_requests_(max_concurrent_requests),
136         drop_config_(std::move(drop_config)) {}
137 
name() const138   const char* name() const override { return kXdsClusterImpl; }
139 
child_policy() const140   RefCountedPtr<LoadBalancingPolicy::Config> child_policy() const {
141     return child_policy_;
142   }
cluster_name() const143   const std::string& cluster_name() const { return cluster_name_; }
eds_service_name() const144   const std::string& eds_service_name() const { return eds_service_name_; }
lrs_load_reporting_server_name() const145   const absl::optional<std::string>& lrs_load_reporting_server_name() const {
146     return lrs_load_reporting_server_name_;
147   };
max_concurrent_requests() const148   uint32_t max_concurrent_requests() const { return max_concurrent_requests_; }
drop_config() const149   RefCountedPtr<XdsApi::EdsUpdate::DropConfig> drop_config() const {
150     return drop_config_;
151   }
152 
153  private:
154   RefCountedPtr<LoadBalancingPolicy::Config> child_policy_;
155   std::string cluster_name_;
156   std::string eds_service_name_;
157   absl::optional<std::string> lrs_load_reporting_server_name_;
158   uint32_t max_concurrent_requests_;
159   RefCountedPtr<XdsApi::EdsUpdate::DropConfig> drop_config_;
160 };
161 
162 // xDS Cluster Impl LB policy.
163 class XdsClusterImplLb : public LoadBalancingPolicy {
164  public:
165   XdsClusterImplLb(RefCountedPtr<XdsClient> xds_client, Args args);
166 
name() const167   const char* name() const override { return kXdsClusterImpl; }
168 
169   void UpdateLocked(UpdateArgs args) override;
170   void ExitIdleLocked() override;
171   void ResetBackoffLocked() override;
172 
173  private:
174   class StatsSubchannelWrapper : public DelegatingSubchannel {
175    public:
StatsSubchannelWrapper(RefCountedPtr<SubchannelInterface> wrapped_subchannel,RefCountedPtr<XdsClusterLocalityStats> locality_stats)176     StatsSubchannelWrapper(
177         RefCountedPtr<SubchannelInterface> wrapped_subchannel,
178         RefCountedPtr<XdsClusterLocalityStats> locality_stats)
179         : DelegatingSubchannel(std::move(wrapped_subchannel)),
180           locality_stats_(std::move(locality_stats)) {}
181 
locality_stats() const182     XdsClusterLocalityStats* locality_stats() const {
183       return locality_stats_.get();
184     }
185 
186    private:
187     RefCountedPtr<XdsClusterLocalityStats> locality_stats_;
188   };
189 
190   // A simple wrapper for ref-counting a picker from the child policy.
191   class RefCountedPicker : public RefCounted<RefCountedPicker> {
192    public:
RefCountedPicker(std::unique_ptr<SubchannelPicker> picker)193     explicit RefCountedPicker(std::unique_ptr<SubchannelPicker> picker)
194         : picker_(std::move(picker)) {}
Pick(PickArgs args)195     PickResult Pick(PickArgs args) { return picker_->Pick(args); }
196 
197    private:
198     std::unique_ptr<SubchannelPicker> picker_;
199   };
200 
201   // A picker that wraps the picker from the child to perform drops.
202   class Picker : public SubchannelPicker {
203    public:
204     Picker(XdsClusterImplLb* xds_cluster_impl_lb,
205            RefCountedPtr<RefCountedPicker> picker);
206 
207     PickResult Pick(PickArgs args) override;
208 
209    private:
210     RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter> call_counter_;
211     bool xds_circuit_breaking_enabled_;
212     uint32_t max_concurrent_requests_;
213     RefCountedPtr<XdsApi::EdsUpdate::DropConfig> drop_config_;
214     RefCountedPtr<XdsClusterDropStats> drop_stats_;
215     RefCountedPtr<RefCountedPicker> picker_;
216   };
217 
218   class Helper : public ChannelControlHelper {
219    public:
Helper(RefCountedPtr<XdsClusterImplLb> xds_cluster_impl_policy)220     explicit Helper(RefCountedPtr<XdsClusterImplLb> xds_cluster_impl_policy)
221         : xds_cluster_impl_policy_(std::move(xds_cluster_impl_policy)) {}
222 
~Helper()223     ~Helper() override {
224       xds_cluster_impl_policy_.reset(DEBUG_LOCATION, "Helper");
225     }
226 
227     RefCountedPtr<SubchannelInterface> CreateSubchannel(
228         ServerAddress address, const grpc_channel_args& args) override;
229     void UpdateState(grpc_connectivity_state state, const absl::Status& status,
230                      std::unique_ptr<SubchannelPicker> picker) override;
231     void RequestReresolution() override;
232     void AddTraceEvent(TraceSeverity severity,
233                        absl::string_view message) override;
234 
235    private:
236     RefCountedPtr<XdsClusterImplLb> xds_cluster_impl_policy_;
237   };
238 
239   ~XdsClusterImplLb() override;
240 
241   void ShutdownLocked() override;
242 
243   OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
244       const grpc_channel_args* args);
245   void UpdateChildPolicyLocked(ServerAddressList addresses,
246                                const grpc_channel_args* args);
247 
248   void MaybeUpdatePickerLocked();
249 
250   // Current config from the resolver.
251   RefCountedPtr<XdsClusterImplLbConfig> config_;
252 
253   // Current concurrent number of requests.
254   RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter> call_counter_;
255 
256   // Internal state.
257   bool shutting_down_ = false;
258 
259   // The xds client.
260   RefCountedPtr<XdsClient> xds_client_;
261 
262   // The stats for client-side load reporting.
263   RefCountedPtr<XdsClusterDropStats> drop_stats_;
264 
265   OrphanablePtr<LoadBalancingPolicy> child_policy_;
266 
267   // Latest state and picker reported by the child policy.
268   grpc_connectivity_state state_ = GRPC_CHANNEL_IDLE;
269   absl::Status status_;
270   RefCountedPtr<RefCountedPicker> picker_;
271 };
272 
273 //
274 // XdsClusterImplLb::Picker
275 //
276 
Picker(XdsClusterImplLb * xds_cluster_impl_lb,RefCountedPtr<RefCountedPicker> picker)277 XdsClusterImplLb::Picker::Picker(XdsClusterImplLb* xds_cluster_impl_lb,
278                                  RefCountedPtr<RefCountedPicker> picker)
279     : call_counter_(xds_cluster_impl_lb->call_counter_),
280       xds_circuit_breaking_enabled_(XdsCircuitBreakingEnabled()),
281       max_concurrent_requests_(
282           xds_cluster_impl_lb->config_->max_concurrent_requests()),
283       drop_config_(xds_cluster_impl_lb->config_->drop_config()),
284       drop_stats_(xds_cluster_impl_lb->drop_stats_),
285       picker_(std::move(picker)) {
286   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
287     gpr_log(GPR_INFO, "[xds_cluster_impl_lb %p] constructed new picker %p",
288             xds_cluster_impl_lb, this);
289   }
290 }
291 
Pick(LoadBalancingPolicy::PickArgs args)292 LoadBalancingPolicy::PickResult XdsClusterImplLb::Picker::Pick(
293     LoadBalancingPolicy::PickArgs args) {
294   // Handle EDS drops.
295   const std::string* drop_category;
296   if (drop_config_->ShouldDrop(&drop_category)) {
297     if (drop_stats_ != nullptr) drop_stats_->AddCallDropped(*drop_category);
298     PickResult result;
299     result.type = PickResult::PICK_COMPLETE;
300     return result;
301   }
302   // Handle circuit breaking.
303   uint32_t current = call_counter_->Increment();
304   if (xds_circuit_breaking_enabled_) {
305     // Check and see if we exceeded the max concurrent requests count.
306     if (current >= max_concurrent_requests_) {
307       call_counter_->Decrement();
308       if (drop_stats_ != nullptr) drop_stats_->AddUncategorizedDrops();
309       PickResult result;
310       result.type = PickResult::PICK_COMPLETE;
311       return result;
312     }
313   }
314   // If we're not dropping the call, we should always have a child picker.
315   if (picker_ == nullptr) {  // Should never happen.
316     PickResult result;
317     result.type = PickResult::PICK_FAILED;
318     result.error = grpc_error_set_int(
319         GRPC_ERROR_CREATE_FROM_STATIC_STRING(
320             "xds_cluster_impl picker not given any child picker"),
321         GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL);
322     call_counter_->Decrement();
323     return result;
324   }
325   // Not dropping, so delegate to child picker.
326   PickResult result = picker_->Pick(args);
327   if (result.type == result.PICK_COMPLETE && result.subchannel != nullptr) {
328     XdsClusterLocalityStats* locality_stats = nullptr;
329     if (drop_stats_ != nullptr) {  // If load reporting is enabled.
330       auto* subchannel_wrapper =
331           static_cast<StatsSubchannelWrapper*>(result.subchannel.get());
332       // Handle load reporting.
333       locality_stats = subchannel_wrapper->locality_stats()->Ref().release();
334       // Record a call started.
335       locality_stats->AddCallStarted();
336       // Unwrap subchannel to pass back up the stack.
337       result.subchannel = subchannel_wrapper->wrapped_subchannel();
338     }
339     // Intercept the recv_trailing_metadata op to record call completion.
340     auto* call_counter = call_counter_->Ref(DEBUG_LOCATION, "call").release();
341     auto original_recv_trailing_metadata_ready =
342         result.recv_trailing_metadata_ready;
343     result.recv_trailing_metadata_ready =
344         // Note: This callback does not run in either the control plane
345         // work serializer or in the data plane mutex.
346         [locality_stats, original_recv_trailing_metadata_ready, call_counter](
347             grpc_error* error, MetadataInterface* metadata,
348             CallState* call_state) {
349           // Record call completion for load reporting.
350           if (locality_stats != nullptr) {
351             const bool call_failed = error != GRPC_ERROR_NONE;
352             locality_stats->AddCallFinished(call_failed);
353             locality_stats->Unref(DEBUG_LOCATION, "LocalityStats+call");
354           }
355           // Decrement number of calls in flight.
356           call_counter->Decrement();
357           call_counter->Unref(DEBUG_LOCATION, "call");
358           // Invoke the original recv_trailing_metadata_ready callback, if any.
359           if (original_recv_trailing_metadata_ready != nullptr) {
360             original_recv_trailing_metadata_ready(error, metadata, call_state);
361           }
362         };
363   } else {
364     // TODO(roth): We should ideally also record call failures here in the case
365     // where a pick fails.  This is challenging, because we don't know which
366     // picks are for wait_for_ready RPCs or how many times we'll return a
367     // failure for the same wait_for_ready RPC.
368     call_counter_->Decrement();
369   }
370   return result;
371 }
372 
373 //
374 // XdsClusterImplLb
375 //
376 
XdsClusterImplLb(RefCountedPtr<XdsClient> xds_client,Args args)377 XdsClusterImplLb::XdsClusterImplLb(RefCountedPtr<XdsClient> xds_client,
378                                    Args args)
379     : LoadBalancingPolicy(std::move(args)), xds_client_(std::move(xds_client)) {
380   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
381     gpr_log(GPR_INFO, "[xds_cluster_impl_lb %p] created -- using xds client %p",
382             this, xds_client_.get());
383   }
384 }
385 
~XdsClusterImplLb()386 XdsClusterImplLb::~XdsClusterImplLb() {
387   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
388     gpr_log(GPR_INFO,
389             "[xds_cluster_impl_lb %p] destroying xds_cluster_impl LB policy",
390             this);
391   }
392 }
393 
ShutdownLocked()394 void XdsClusterImplLb::ShutdownLocked() {
395   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
396     gpr_log(GPR_INFO, "[xds_cluster_impl_lb %p] shutting down", this);
397   }
398   shutting_down_ = true;
399   // Remove the child policy's interested_parties pollset_set from the
400   // xDS policy.
401   if (child_policy_ != nullptr) {
402     grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
403                                      interested_parties());
404     child_policy_.reset();
405   }
406   // Drop our ref to the child's picker, in case it's holding a ref to
407   // the child.
408   picker_.reset();
409   drop_stats_.reset();
410   xds_client_.reset();
411 }
412 
ExitIdleLocked()413 void XdsClusterImplLb::ExitIdleLocked() {
414   if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
415 }
416 
ResetBackoffLocked()417 void XdsClusterImplLb::ResetBackoffLocked() {
418   // The XdsClient will have its backoff reset by the xds resolver, so we
419   // don't need to do it here.
420   if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
421 }
422 
UpdateLocked(UpdateArgs args)423 void XdsClusterImplLb::UpdateLocked(UpdateArgs args) {
424   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
425     gpr_log(GPR_INFO, "[xds_cluster_impl_lb %p] Received update", this);
426   }
427   // Update config.
428   const bool is_initial_update = config_ == nullptr;
429   auto old_config = std::move(config_);
430   config_ = std::move(args.config);
431   // On initial update, create drop stats.
432   if (is_initial_update) {
433     if (config_->lrs_load_reporting_server_name().has_value()) {
434       drop_stats_ = xds_client_->AddClusterDropStats(
435           config_->lrs_load_reporting_server_name().value(),
436           config_->cluster_name(), config_->eds_service_name());
437     }
438     call_counter_ = g_call_counter_map->GetOrCreate(
439         config_->cluster_name(), config_->eds_service_name());
440   } else {
441     // Cluster name, EDS service name, and LRS server name should never
442     // change, because the EDS policy above us should be swapped out if
443     // that happens.
444     GPR_ASSERT(config_->cluster_name() == old_config->cluster_name());
445     GPR_ASSERT(config_->eds_service_name() == old_config->eds_service_name());
446     GPR_ASSERT(config_->lrs_load_reporting_server_name() ==
447                old_config->lrs_load_reporting_server_name());
448   }
449   // Update picker if max_concurrent_requests has changed.
450   if (is_initial_update || config_->max_concurrent_requests() !=
451                                old_config->max_concurrent_requests()) {
452     MaybeUpdatePickerLocked();
453   }
454   // Update child policy.
455   UpdateChildPolicyLocked(std::move(args.addresses), args.args);
456 }
457 
MaybeUpdatePickerLocked()458 void XdsClusterImplLb::MaybeUpdatePickerLocked() {
459   // If we're dropping all calls, report READY, regardless of what (or
460   // whether) the child has reported.
461   if (config_->drop_config() != nullptr && config_->drop_config()->drop_all()) {
462     auto drop_picker = absl::make_unique<Picker>(this, picker_);
463     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
464       gpr_log(GPR_INFO,
465               "[xds_cluster_impl_lb %p] updating connectivity (drop all): "
466               "state=READY "
467               "picker=%p",
468               this, drop_picker.get());
469     }
470     channel_control_helper()->UpdateState(GRPC_CHANNEL_READY, absl::Status(),
471                                           std::move(drop_picker));
472     return;
473   }
474   // Otherwise, update only if we have a child picker.
475   if (picker_ != nullptr) {
476     auto drop_picker = absl::make_unique<Picker>(this, picker_);
477     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
478       gpr_log(GPR_INFO,
479               "[xds_cluster_impl_lb %p] updating connectivity: state=%s "
480               "status=(%s) "
481               "picker=%p",
482               this, ConnectivityStateName(state_), status_.ToString().c_str(),
483               drop_picker.get());
484     }
485     channel_control_helper()->UpdateState(state_, status_,
486                                           std::move(drop_picker));
487   }
488 }
489 
CreateChildPolicyLocked(const grpc_channel_args * args)490 OrphanablePtr<LoadBalancingPolicy> XdsClusterImplLb::CreateChildPolicyLocked(
491     const grpc_channel_args* args) {
492   LoadBalancingPolicy::Args lb_policy_args;
493   lb_policy_args.work_serializer = work_serializer();
494   lb_policy_args.args = args;
495   lb_policy_args.channel_control_helper =
496       absl::make_unique<Helper>(Ref(DEBUG_LOCATION, "Helper"));
497   OrphanablePtr<LoadBalancingPolicy> lb_policy =
498       MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
499                                          &grpc_xds_cluster_impl_lb_trace);
500   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
501     gpr_log(GPR_INFO,
502             "[xds_cluster_impl_lb %p] Created new child policy handler %p",
503             this, lb_policy.get());
504   }
505   // Add our interested_parties pollset_set to that of the newly created
506   // child policy. This will make the child policy progress upon activity on
507   // this policy, which in turn is tied to the application's call.
508   grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
509                                    interested_parties());
510   return lb_policy;
511 }
512 
UpdateChildPolicyLocked(ServerAddressList addresses,const grpc_channel_args * args)513 void XdsClusterImplLb::UpdateChildPolicyLocked(ServerAddressList addresses,
514                                                const grpc_channel_args* args) {
515   // Create policy if needed.
516   if (child_policy_ == nullptr) {
517     child_policy_ = CreateChildPolicyLocked(args);
518   }
519   // Construct update args.
520   UpdateArgs update_args;
521   update_args.addresses = std::move(addresses);
522   update_args.config = config_->child_policy();
523   grpc_arg cluster_arg = grpc_channel_arg_string_create(
524       const_cast<char*>(GRPC_ARG_XDS_CLUSTER_NAME),
525       const_cast<char*>(config_->cluster_name().c_str()));
526   update_args.args = grpc_channel_args_copy_and_add(args, &cluster_arg, 1);
527   // Update the policy.
528   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
529     gpr_log(GPR_INFO,
530             "[xds_cluster_impl_lb %p] Updating child policy handler %p", this,
531             child_policy_.get());
532   }
533   child_policy_->UpdateLocked(std::move(update_args));
534 }
535 
536 //
537 // XdsClusterImplLb::Helper
538 //
539 
CreateSubchannel(ServerAddress address,const grpc_channel_args & args)540 RefCountedPtr<SubchannelInterface> XdsClusterImplLb::Helper::CreateSubchannel(
541     ServerAddress address, const grpc_channel_args& args) {
542   if (xds_cluster_impl_policy_->shutting_down_) return nullptr;
543   // If load reporting is enabled, wrap the subchannel such that it
544   // includes the locality stats object, which will be used by the EdsPicker.
545   if (xds_cluster_impl_policy_->config_->lrs_load_reporting_server_name()
546           .has_value()) {
547     RefCountedPtr<XdsLocalityName> locality_name;
548     auto* attribute = address.GetAttribute(kXdsLocalityNameAttributeKey);
549     if (attribute != nullptr) {
550       const auto* locality_attr =
551           static_cast<const XdsLocalityAttribute*>(attribute);
552       locality_name = locality_attr->locality_name();
553     }
554     RefCountedPtr<XdsClusterLocalityStats> locality_stats =
555         xds_cluster_impl_policy_->xds_client_->AddClusterLocalityStats(
556             *xds_cluster_impl_policy_->config_
557                  ->lrs_load_reporting_server_name(),
558             xds_cluster_impl_policy_->config_->cluster_name(),
559             xds_cluster_impl_policy_->config_->eds_service_name(),
560             std::move(locality_name));
561     return MakeRefCounted<StatsSubchannelWrapper>(
562         xds_cluster_impl_policy_->channel_control_helper()->CreateSubchannel(
563             std::move(address), args),
564         std::move(locality_stats));
565   }
566   // Load reporting not enabled, so don't wrap the subchannel.
567   return xds_cluster_impl_policy_->channel_control_helper()->CreateSubchannel(
568       std::move(address), args);
569 }
570 
UpdateState(grpc_connectivity_state state,const absl::Status & status,std::unique_ptr<SubchannelPicker> picker)571 void XdsClusterImplLb::Helper::UpdateState(
572     grpc_connectivity_state state, const absl::Status& status,
573     std::unique_ptr<SubchannelPicker> picker) {
574   if (xds_cluster_impl_policy_->shutting_down_) return;
575   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
576     gpr_log(GPR_INFO,
577             "[xds_cluster_impl_lb %p] child connectivity state update: "
578             "state=%s (%s) "
579             "picker=%p",
580             xds_cluster_impl_policy_.get(), ConnectivityStateName(state),
581             status.ToString().c_str(), picker.get());
582   }
583   // Save the state and picker.
584   xds_cluster_impl_policy_->state_ = state;
585   xds_cluster_impl_policy_->status_ = status;
586   xds_cluster_impl_policy_->picker_ =
587       MakeRefCounted<RefCountedPicker>(std::move(picker));
588   // Wrap the picker and return it to the channel.
589   xds_cluster_impl_policy_->MaybeUpdatePickerLocked();
590 }
591 
RequestReresolution()592 void XdsClusterImplLb::Helper::RequestReresolution() {
593   if (xds_cluster_impl_policy_->shutting_down_) return;
594   xds_cluster_impl_policy_->channel_control_helper()->RequestReresolution();
595 }
596 
AddTraceEvent(TraceSeverity severity,absl::string_view message)597 void XdsClusterImplLb::Helper::AddTraceEvent(TraceSeverity severity,
598                                              absl::string_view message) {
599   if (xds_cluster_impl_policy_->shutting_down_) return;
600   xds_cluster_impl_policy_->channel_control_helper()->AddTraceEvent(severity,
601                                                                     message);
602 }
603 
604 //
605 // factory
606 //
607 
608 class XdsClusterImplLbFactory : public LoadBalancingPolicyFactory {
609  public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const610   OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
611       LoadBalancingPolicy::Args args) const override {
612     grpc_error* error = GRPC_ERROR_NONE;
613     RefCountedPtr<XdsClient> xds_client = XdsClient::GetOrCreate(&error);
614     if (error != GRPC_ERROR_NONE) {
615       gpr_log(
616           GPR_ERROR,
617           "cannot get XdsClient to instantiate xds_cluster_impl LB policy: %s",
618           grpc_error_string(error));
619       GRPC_ERROR_UNREF(error);
620       return nullptr;
621     }
622     return MakeOrphanable<XdsClusterImplLb>(std::move(xds_client),
623                                             std::move(args));
624   }
625 
name() const626   const char* name() const override { return kXdsClusterImpl; }
627 
ParseLoadBalancingConfig(const Json & json,grpc_error ** error) const628   RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
629       const Json& json, grpc_error** error) const override {
630     GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
631     if (json.type() == Json::Type::JSON_NULL) {
632       // This policy was configured in the deprecated loadBalancingPolicy
633       // field or in the client API.
634       *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
635           "field:loadBalancingPolicy error:xds_cluster_impl policy requires "
636           "configuration. Please use loadBalancingConfig field of service "
637           "config instead.");
638       return nullptr;
639     }
640     std::vector<grpc_error*> error_list;
641     // Child policy.
642     RefCountedPtr<LoadBalancingPolicy::Config> child_policy;
643     auto it = json.object_value().find("childPolicy");
644     if (it == json.object_value().end()) {
645       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
646           "field:childPolicy error:required field missing"));
647     } else {
648       grpc_error* parse_error = GRPC_ERROR_NONE;
649       child_policy = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
650           it->second, &parse_error);
651       if (child_policy == nullptr) {
652         GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE);
653         std::vector<grpc_error*> child_errors;
654         child_errors.push_back(parse_error);
655         error_list.push_back(
656             GRPC_ERROR_CREATE_FROM_VECTOR("field:childPolicy", &child_errors));
657       }
658     }
659     // Cluster name.
660     std::string cluster_name;
661     it = json.object_value().find("clusterName");
662     if (it == json.object_value().end()) {
663       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
664           "field:clusterName error:required field missing"));
665     } else if (it->second.type() != Json::Type::STRING) {
666       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
667           "field:clusterName error:type should be string"));
668     } else {
669       cluster_name = it->second.string_value();
670     }
671     // EDS service name.
672     std::string eds_service_name;
673     it = json.object_value().find("edsServiceName");
674     if (it != json.object_value().end()) {
675       if (it->second.type() != Json::Type::STRING) {
676         error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
677             "field:edsServiceName error:type should be string"));
678       } else {
679         eds_service_name = it->second.string_value();
680       }
681     }
682     // LRS load reporting server name.
683     absl::optional<std::string> lrs_load_reporting_server_name;
684     it = json.object_value().find("lrsLoadReportingServerName");
685     if (it != json.object_value().end()) {
686       if (it->second.type() != Json::Type::STRING) {
687         error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
688             "field:lrsLoadReportingServerName error:type should be string"));
689       } else {
690         lrs_load_reporting_server_name = it->second.string_value();
691       }
692     }
693     // Max concurrent requests.
694     uint32_t max_concurrent_requests = 1024;
695     it = json.object_value().find("maxConcurrentRequests");
696     if (it != json.object_value().end()) {
697       if (it->second.type() != Json::Type::NUMBER) {
698         error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
699             "field:max_concurrent_requests error:must be of type number"));
700       } else {
701         max_concurrent_requests =
702             gpr_parse_nonnegative_int(it->second.string_value().c_str());
703       }
704     }
705     // Drop config.
706     auto drop_config = MakeRefCounted<XdsApi::EdsUpdate::DropConfig>();
707     it = json.object_value().find("dropCategories");
708     if (it == json.object_value().end()) {
709       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
710           "field:dropCategories error:required field missing"));
711     } else {
712       std::vector<grpc_error*> child_errors =
713           ParseDropCategories(it->second, drop_config.get());
714       if (!child_errors.empty()) {
715         error_list.push_back(GRPC_ERROR_CREATE_FROM_VECTOR(
716             "field:dropCategories", &child_errors));
717       }
718     }
719     if (!error_list.empty()) {
720       *error = GRPC_ERROR_CREATE_FROM_VECTOR(
721           "xds_cluster_impl_experimental LB policy config", &error_list);
722       return nullptr;
723     }
724     return MakeRefCounted<XdsClusterImplLbConfig>(
725         std::move(child_policy), std::move(cluster_name),
726         std::move(eds_service_name), std::move(lrs_load_reporting_server_name),
727         max_concurrent_requests, std::move(drop_config));
728   }
729 
730  private:
ParseDropCategories(const Json & json,XdsApi::EdsUpdate::DropConfig * drop_config)731   static std::vector<grpc_error*> ParseDropCategories(
732       const Json& json, XdsApi::EdsUpdate::DropConfig* drop_config) {
733     std::vector<grpc_error*> error_list;
734     if (json.type() != Json::Type::ARRAY) {
735       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
736           "dropCategories field is not an array"));
737       return error_list;
738     }
739     for (size_t i = 0; i < json.array_value().size(); ++i) {
740       const Json& entry = json.array_value()[i];
741       std::vector<grpc_error*> child_errors =
742           ParseDropCategory(entry, drop_config);
743       if (!child_errors.empty()) {
744         grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(
745             absl::StrCat("errors parsing index ", i).c_str());
746         for (size_t i = 0; i < child_errors.size(); ++i) {
747           error = grpc_error_add_child(error, child_errors[i]);
748         }
749         error_list.push_back(error);
750       }
751     }
752     return error_list;
753   }
754 
ParseDropCategory(const Json & json,XdsApi::EdsUpdate::DropConfig * drop_config)755   static std::vector<grpc_error*> ParseDropCategory(
756       const Json& json, XdsApi::EdsUpdate::DropConfig* drop_config) {
757     std::vector<grpc_error*> error_list;
758     if (json.type() != Json::Type::OBJECT) {
759       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
760           "dropCategories entry is not an object"));
761       return error_list;
762     }
763     std::string category;
764     auto it = json.object_value().find("category");
765     if (it == json.object_value().end()) {
766       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
767           "\"category\" field not present"));
768     } else if (it->second.type() != Json::Type::STRING) {
769       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
770           "\"category\" field is not a string"));
771     } else {
772       category = it->second.string_value();
773     }
774     uint32_t requests_per_million = 0;
775     it = json.object_value().find("requests_per_million");
776     if (it == json.object_value().end()) {
777       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
778           "\"requests_per_million\" field is not present"));
779     } else if (it->second.type() != Json::Type::NUMBER) {
780       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
781           "\"requests_per_million\" field is not a number"));
782     } else {
783       requests_per_million =
784           gpr_parse_nonnegative_int(it->second.string_value().c_str());
785     }
786     if (error_list.empty()) {
787       drop_config->AddCategory(std::move(category), requests_per_million);
788     }
789     return error_list;
790   }
791 };
792 
793 }  // namespace
794 
795 }  // namespace grpc_core
796 
797 //
798 // Plugin registration
799 //
800 
grpc_lb_policy_xds_cluster_impl_init()801 void grpc_lb_policy_xds_cluster_impl_init() {
802   grpc_core::g_call_counter_map = new grpc_core::CircuitBreakerCallCounterMap();
803   grpc_core::LoadBalancingPolicyRegistry::Builder::
804       RegisterLoadBalancingPolicyFactory(
805           absl::make_unique<grpc_core::XdsClusterImplLbFactory>());
806 }
807 
grpc_lb_policy_xds_cluster_impl_shutdown()808 void grpc_lb_policy_xds_cluster_impl_shutdown() {
809   delete grpc_core::g_call_counter_map;
810 }
811