//
// Copyright 2018 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include <grpc/grpc.h>
20 
21 #include "src/core/ext/filters/client_channel/lb_policy.h"
22 #include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
23 #include "src/core/ext/filters/client_channel/lb_policy_factory.h"
24 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
25 #include "src/core/ext/filters/client_channel/xds/xds_client.h"
26 #include "src/core/ext/filters/client_channel/xds/xds_client_stats.h"
27 #include "src/core/lib/channel/channel_args.h"
28 #include "src/core/lib/gprpp/orphanable.h"
29 #include "src/core/lib/gprpp/ref_counted_ptr.h"
30 #include "src/core/lib/iomgr/work_serializer.h"
31 
32 namespace grpc_core {
33 
34 TraceFlag grpc_lb_lrs_trace(false, "lrs_lb");
35 
36 namespace {
37 
// Name of the LRS LB policy, as returned by the name() methods of the
// config, policy, and factory classes in this file.
constexpr char kLrs[] = "lrs_experimental";
39 
40 // Config for LRS LB policy.
41 class LrsLbConfig : public LoadBalancingPolicy::Config {
42  public:
LrsLbConfig(RefCountedPtr<LoadBalancingPolicy::Config> child_policy,std::string cluster_name,std::string eds_service_name,std::string lrs_load_reporting_server_name,RefCountedPtr<XdsLocalityName> locality_name)43   LrsLbConfig(RefCountedPtr<LoadBalancingPolicy::Config> child_policy,
44               std::string cluster_name, std::string eds_service_name,
45               std::string lrs_load_reporting_server_name,
46               RefCountedPtr<XdsLocalityName> locality_name)
47       : child_policy_(std::move(child_policy)),
48         cluster_name_(std::move(cluster_name)),
49         eds_service_name_(std::move(eds_service_name)),
50         lrs_load_reporting_server_name_(
51             std::move(lrs_load_reporting_server_name)),
52         locality_name_(std::move(locality_name)) {}
53 
name() const54   const char* name() const override { return kLrs; }
55 
child_policy() const56   RefCountedPtr<LoadBalancingPolicy::Config> child_policy() const {
57     return child_policy_;
58   }
cluster_name() const59   const std::string& cluster_name() const { return cluster_name_; }
eds_service_name() const60   const std::string& eds_service_name() const { return eds_service_name_; }
lrs_load_reporting_server_name() const61   const std::string& lrs_load_reporting_server_name() const {
62     return lrs_load_reporting_server_name_;
63   };
locality_name() const64   RefCountedPtr<XdsLocalityName> locality_name() const {
65     return locality_name_;
66   }
67 
68  private:
69   RefCountedPtr<LoadBalancingPolicy::Config> child_policy_;
70   std::string cluster_name_;
71   std::string eds_service_name_;
72   std::string lrs_load_reporting_server_name_;
73   RefCountedPtr<XdsLocalityName> locality_name_;
74 };
75 
76 // LRS LB policy.
77 class LrsLb : public LoadBalancingPolicy {
78  public:
79   LrsLb(RefCountedPtr<XdsClient> xds_client, Args args);
80 
name() const81   const char* name() const override { return kLrs; }
82 
83   void UpdateLocked(UpdateArgs args) override;
84   void ExitIdleLocked() override;
85   void ResetBackoffLocked() override;
86 
87  private:
88   // A simple wrapper for ref-counting a picker from the child policy.
89   class RefCountedPicker : public RefCounted<RefCountedPicker> {
90    public:
RefCountedPicker(std::unique_ptr<SubchannelPicker> picker)91     explicit RefCountedPicker(std::unique_ptr<SubchannelPicker> picker)
92         : picker_(std::move(picker)) {}
Pick(PickArgs args)93     PickResult Pick(PickArgs args) { return picker_->Pick(args); }
94 
95    private:
96     std::unique_ptr<SubchannelPicker> picker_;
97   };
98 
99   // A picker that wraps the picker from the child to perform load reporting.
100   class LoadReportingPicker : public SubchannelPicker {
101    public:
LoadReportingPicker(RefCountedPtr<RefCountedPicker> picker,RefCountedPtr<XdsClusterLocalityStats> locality_stats)102     LoadReportingPicker(RefCountedPtr<RefCountedPicker> picker,
103                         RefCountedPtr<XdsClusterLocalityStats> locality_stats)
104         : picker_(std::move(picker)),
105           locality_stats_(std::move(locality_stats)) {}
106 
107     PickResult Pick(PickArgs args);
108 
109    private:
110     RefCountedPtr<RefCountedPicker> picker_;
111     RefCountedPtr<XdsClusterLocalityStats> locality_stats_;
112   };
113 
114   class Helper : public ChannelControlHelper {
115    public:
Helper(RefCountedPtr<LrsLb> lrs_policy)116     explicit Helper(RefCountedPtr<LrsLb> lrs_policy)
117         : lrs_policy_(std::move(lrs_policy)) {}
118 
~Helper()119     ~Helper() { lrs_policy_.reset(DEBUG_LOCATION, "Helper"); }
120 
121     RefCountedPtr<SubchannelInterface> CreateSubchannel(
122         const grpc_channel_args& args) override;
123     void UpdateState(grpc_connectivity_state state,
124                      std::unique_ptr<SubchannelPicker> picker) override;
125     void RequestReresolution() override;
126     void AddTraceEvent(TraceSeverity severity,
127                        absl::string_view message) override;
128 
129    private:
130     RefCountedPtr<LrsLb> lrs_policy_;
131   };
132 
133   ~LrsLb();
134 
135   void ShutdownLocked() override;
136 
137   OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
138       const grpc_channel_args* args);
139   void UpdateChildPolicyLocked(ServerAddressList addresses,
140                                const grpc_channel_args* args);
141 
142   void MaybeUpdatePickerLocked();
143 
144   // Current config from the resolver.
145   RefCountedPtr<LrsLbConfig> config_;
146 
147   // Internal state.
148   bool shutting_down_ = false;
149 
150   // The xds client.
151   RefCountedPtr<XdsClient> xds_client_;
152 
153   // The stats for client-side load reporting.
154   RefCountedPtr<XdsClusterLocalityStats> locality_stats_;
155 
156   OrphanablePtr<LoadBalancingPolicy> child_policy_;
157 
158   // Latest state and picker reported by the child policy.
159   grpc_connectivity_state state_ = GRPC_CHANNEL_IDLE;
160   RefCountedPtr<RefCountedPicker> picker_;
161 };
162 
163 //
164 // LrsLb::LoadReportingPicker
165 //
166 
Pick(LoadBalancingPolicy::PickArgs args)167 LoadBalancingPolicy::PickResult LrsLb::LoadReportingPicker::Pick(
168     LoadBalancingPolicy::PickArgs args) {
169   // Forward the pick to the picker returned from the child policy.
170   PickResult result = picker_->Pick(args);
171   if (result.type == PickResult::PICK_COMPLETE &&
172       result.subchannel != nullptr) {
173     // Record a call started.
174     locality_stats_->AddCallStarted();
175     // Intercept the recv_trailing_metadata op to record call completion.
176     XdsClusterLocalityStats* locality_stats =
177         locality_stats_->Ref(DEBUG_LOCATION, "LocalityStats+call").release();
178     result.recv_trailing_metadata_ready =
179         // Note: This callback does not run in either the control plane
180         // work serializer or in the data plane mutex.
181         [locality_stats](grpc_error* error, MetadataInterface* /*metadata*/,
182                          CallState* /*call_state*/) {
183           const bool call_failed = error != GRPC_ERROR_NONE;
184           locality_stats->AddCallFinished(call_failed);
185           locality_stats->Unref(DEBUG_LOCATION, "LocalityStats+call");
186         };
187   }
188   return result;
189 }
190 
191 //
192 // LrsLb
193 //
194 
LrsLb(RefCountedPtr<XdsClient> xds_client,Args args)195 LrsLb::LrsLb(RefCountedPtr<XdsClient> xds_client, Args args)
196     : LoadBalancingPolicy(std::move(args)), xds_client_(std::move(xds_client)) {
197   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_lrs_trace)) {
198     gpr_log(GPR_INFO, "[lrs_lb %p] created -- using xds client %p from channel",
199             this, xds_client_.get());
200   }
201 }
202 
~LrsLb()203 LrsLb::~LrsLb() {
204   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_lrs_trace)) {
205     gpr_log(GPR_INFO, "[lrs_lb %p] destroying xds LB policy", this);
206   }
207 }
208 
ShutdownLocked()209 void LrsLb::ShutdownLocked() {
210   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_lrs_trace)) {
211     gpr_log(GPR_INFO, "[lrs_lb %p] shutting down", this);
212   }
213   shutting_down_ = true;
214   // Remove the child policy's interested_parties pollset_set from the
215   // xDS policy.
216   if (child_policy_ != nullptr) {
217     grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
218                                      interested_parties());
219     child_policy_.reset();
220   }
221   // Drop our ref to the child's picker, in case it's holding a ref to
222   // the child.
223   picker_.reset();
224   locality_stats_.reset();
225   xds_client_.reset();
226 }
227 
ExitIdleLocked()228 void LrsLb::ExitIdleLocked() {
229   if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
230 }
231 
ResetBackoffLocked()232 void LrsLb::ResetBackoffLocked() {
233   // The XdsClient will have its backoff reset by the xds resolver, so we
234   // don't need to do it here.
235   if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
236 }
237 
UpdateLocked(UpdateArgs args)238 void LrsLb::UpdateLocked(UpdateArgs args) {
239   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_lrs_trace)) {
240     gpr_log(GPR_INFO, "[lrs_lb %p] Received update", this);
241   }
242   // Update config.
243   auto old_config = std::move(config_);
244   config_ = std::move(args.config);
245   // Update load reporting if needed.
246   if (old_config == nullptr ||
247       config_->lrs_load_reporting_server_name() !=
248           old_config->lrs_load_reporting_server_name() ||
249       config_->cluster_name() != old_config->cluster_name() ||
250       config_->eds_service_name() != old_config->eds_service_name() ||
251       *config_->locality_name() != *old_config->locality_name()) {
252     locality_stats_ = xds_client_->AddClusterLocalityStats(
253         config_->lrs_load_reporting_server_name(), config_->cluster_name(),
254         config_->eds_service_name(), config_->locality_name());
255     MaybeUpdatePickerLocked();
256   }
257   // Remove XdsClient from channel args, so that its presence doesn't
258   // prevent us from sharing subchannels between channels.
259   grpc_channel_args* new_args = XdsClient::RemoveFromChannelArgs(*args.args);
260   // Update child policy.
261   UpdateChildPolicyLocked(std::move(args.addresses), new_args);
262 }
263 
MaybeUpdatePickerLocked()264 void LrsLb::MaybeUpdatePickerLocked() {
265   if (picker_ != nullptr) {
266     auto lrs_picker =
267         absl::make_unique<LoadReportingPicker>(picker_, locality_stats_);
268     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_lrs_trace)) {
269       gpr_log(GPR_INFO, "[lrs_lb %p] updating connectivity: state=%s picker=%p",
270               this, ConnectivityStateName(state_), lrs_picker.get());
271     }
272     channel_control_helper()->UpdateState(state_, std::move(lrs_picker));
273   }
274 }
275 
CreateChildPolicyLocked(const grpc_channel_args * args)276 OrphanablePtr<LoadBalancingPolicy> LrsLb::CreateChildPolicyLocked(
277     const grpc_channel_args* args) {
278   LoadBalancingPolicy::Args lb_policy_args;
279   lb_policy_args.work_serializer = work_serializer();
280   lb_policy_args.args = args;
281   lb_policy_args.channel_control_helper =
282       absl::make_unique<Helper>(Ref(DEBUG_LOCATION, "Helper"));
283   OrphanablePtr<LoadBalancingPolicy> lb_policy =
284       MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
285                                          &grpc_lb_lrs_trace);
286   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_lrs_trace)) {
287     gpr_log(GPR_INFO, "[lrs_lb %p] Created new child policy handler %p", this,
288             lb_policy.get());
289   }
290   // Add our interested_parties pollset_set to that of the newly created
291   // child policy. This will make the child policy progress upon activity on
292   // this policy, which in turn is tied to the application's call.
293   grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
294                                    interested_parties());
295   return lb_policy;
296 }
297 
UpdateChildPolicyLocked(ServerAddressList addresses,const grpc_channel_args * args)298 void LrsLb::UpdateChildPolicyLocked(ServerAddressList addresses,
299                                     const grpc_channel_args* args) {
300   // Create policy if needed.
301   if (child_policy_ == nullptr) {
302     child_policy_ = CreateChildPolicyLocked(args);
303   }
304   // Construct update args.
305   UpdateArgs update_args;
306   update_args.addresses = std::move(addresses);
307   update_args.config = config_->child_policy();
308   update_args.args = args;
309   // Update the policy.
310   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_lrs_trace)) {
311     gpr_log(GPR_INFO, "[lrs_lb %p] Updating child policy handler %p", this,
312             child_policy_.get());
313   }
314   child_policy_->UpdateLocked(std::move(update_args));
315 }
316 
317 //
318 // LrsLb::Helper
319 //
320 
CreateSubchannel(const grpc_channel_args & args)321 RefCountedPtr<SubchannelInterface> LrsLb::Helper::CreateSubchannel(
322     const grpc_channel_args& args) {
323   if (lrs_policy_->shutting_down_) return nullptr;
324   return lrs_policy_->channel_control_helper()->CreateSubchannel(args);
325 }
326 
UpdateState(grpc_connectivity_state state,std::unique_ptr<SubchannelPicker> picker)327 void LrsLb::Helper::UpdateState(grpc_connectivity_state state,
328                                 std::unique_ptr<SubchannelPicker> picker) {
329   if (lrs_policy_->shutting_down_) return;
330   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_lrs_trace)) {
331     gpr_log(GPR_INFO,
332             "[lrs_lb %p] child connectivity state update: state=%s picker=%p",
333             lrs_policy_.get(), ConnectivityStateName(state), picker.get());
334   }
335   // Save the state and picker.
336   lrs_policy_->state_ = state;
337   lrs_policy_->picker_ = MakeRefCounted<RefCountedPicker>(std::move(picker));
338   // Wrap the picker and return it to the channel.
339   lrs_policy_->MaybeUpdatePickerLocked();
340 }
341 
RequestReresolution()342 void LrsLb::Helper::RequestReresolution() {
343   if (lrs_policy_->shutting_down_) return;
344   lrs_policy_->channel_control_helper()->RequestReresolution();
345 }
346 
AddTraceEvent(TraceSeverity severity,absl::string_view message)347 void LrsLb::Helper::AddTraceEvent(TraceSeverity severity,
348                                   absl::string_view message) {
349   if (lrs_policy_->shutting_down_) return;
350   lrs_policy_->channel_control_helper()->AddTraceEvent(severity, message);
351 }
352 
353 //
354 // factory
355 //
356 
357 class LrsLbFactory : public LoadBalancingPolicyFactory {
358  public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const359   OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
360       LoadBalancingPolicy::Args args) const override {
361     RefCountedPtr<XdsClient> xds_client =
362         XdsClient::GetFromChannelArgs(*args.args);
363     if (xds_client == nullptr) {
364       gpr_log(GPR_ERROR,
365               "XdsClient not present in channel args -- cannot instantiate "
366               "lrs LB policy");
367       return nullptr;
368     }
369     return MakeOrphanable<LrsLb>(std::move(xds_client), std::move(args));
370   }
371 
name() const372   const char* name() const override { return kLrs; }
373 
ParseLoadBalancingConfig(const Json & json,grpc_error ** error) const374   RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
375       const Json& json, grpc_error** error) const override {
376     GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
377     if (json.type() == Json::Type::JSON_NULL) {
378       // lrs was mentioned as a policy in the deprecated loadBalancingPolicy
379       // field or in the client API.
380       *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
381           "field:loadBalancingPolicy error:lrs policy requires configuration. "
382           "Please use loadBalancingConfig field of service config instead.");
383       return nullptr;
384     }
385     std::vector<grpc_error*> error_list;
386     // Child policy.
387     RefCountedPtr<LoadBalancingPolicy::Config> child_policy;
388     auto it = json.object_value().find("childPolicy");
389     if (it == json.object_value().end()) {
390       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
391           "field:childPolicy error:required field missing"));
392     } else {
393       grpc_error* parse_error = GRPC_ERROR_NONE;
394       child_policy = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
395           it->second, &parse_error);
396       if (child_policy == nullptr) {
397         GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE);
398         std::vector<grpc_error*> child_errors;
399         child_errors.push_back(parse_error);
400         error_list.push_back(
401             GRPC_ERROR_CREATE_FROM_VECTOR("field:childPolicy", &child_errors));
402       }
403     }
404     // Cluster name.
405     std::string cluster_name;
406     it = json.object_value().find("clusterName");
407     if (it == json.object_value().end()) {
408       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
409           "field:clusterName error:required field missing"));
410     } else if (it->second.type() != Json::Type::STRING) {
411       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
412           "field:clusterName error:type should be string"));
413     } else {
414       cluster_name = it->second.string_value();
415     }
416     // EDS service name.
417     std::string eds_service_name;
418     it = json.object_value().find("edsServiceName");
419     if (it != json.object_value().end()) {
420       if (it->second.type() != Json::Type::STRING) {
421         error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
422             "field:edsServiceName error:type should be string"));
423       } else {
424         eds_service_name = it->second.string_value();
425       }
426     }
427     // Locality.
428     RefCountedPtr<XdsLocalityName> locality_name;
429     it = json.object_value().find("locality");
430     if (it == json.object_value().end()) {
431       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
432           "field:locality error:required field missing"));
433     } else {
434       std::vector<grpc_error*> child_errors =
435           ParseLocality(it->second, &locality_name);
436       if (!child_errors.empty()) {
437         error_list.push_back(
438             GRPC_ERROR_CREATE_FROM_VECTOR("field:locality", &child_errors));
439       }
440     }
441     // LRS load reporting server name.
442     std::string lrs_load_reporting_server_name;
443     it = json.object_value().find("lrsLoadReportingServerName");
444     if (it == json.object_value().end()) {
445       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
446           "field:lrsLoadReportingServerName error:required field missing"));
447     } else if (it->second.type() != Json::Type::STRING) {
448       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
449           "field:lrsLoadReportingServerName error:type should be string"));
450     } else {
451       lrs_load_reporting_server_name = it->second.string_value();
452     }
453     if (!error_list.empty()) {
454       *error = GRPC_ERROR_CREATE_FROM_VECTOR(
455           "lrs_experimental LB policy config", &error_list);
456       return nullptr;
457     }
458     return MakeRefCounted<LrsLbConfig>(
459         std::move(child_policy), std::move(cluster_name),
460         std::move(eds_service_name), std::move(lrs_load_reporting_server_name),
461         std::move(locality_name));
462   }
463 
464  private:
ParseLocality(const Json & json,RefCountedPtr<XdsLocalityName> * name)465   static std::vector<grpc_error*> ParseLocality(
466       const Json& json, RefCountedPtr<XdsLocalityName>* name) {
467     std::vector<grpc_error*> error_list;
468     if (json.type() != Json::Type::OBJECT) {
469       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
470           "locality field is not an object"));
471       return error_list;
472     }
473     std::string region;
474     auto it = json.object_value().find("region");
475     if (it != json.object_value().end()) {
476       if (it->second.type() != Json::Type::STRING) {
477         error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
478             "\"region\" field is not a string"));
479       } else {
480         region = it->second.string_value();
481       }
482     }
483     std::string zone;
484     it = json.object_value().find("zone");
485     if (it != json.object_value().end()) {
486       if (it->second.type() != Json::Type::STRING) {
487         error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
488             "\"zone\" field is not a string"));
489       } else {
490         zone = it->second.string_value();
491       }
492     }
493     std::string subzone;
494     it = json.object_value().find("subzone");
495     if (it != json.object_value().end()) {
496       if (it->second.type() != Json::Type::STRING) {
497         error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
498             "\"subzone\" field is not a string"));
499       } else {
500         subzone = it->second.string_value();
501       }
502     }
503     if (region.empty() && zone.empty() && subzone.empty()) {
504       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
505           "at least one of region, zone, or subzone must be set"));
506     }
507     if (error_list.empty()) {
508       *name = MakeRefCounted<XdsLocalityName>(region, zone, subzone);
509     }
510     return error_list;
511   }
512 };
513 
514 }  // namespace
515 
516 }  // namespace grpc_core
517 
518 //
519 // Plugin registration
520 //
521 
grpc_lb_policy_lrs_init()522 void grpc_lb_policy_lrs_init() {
523   grpc_core::LoadBalancingPolicyRegistry::Builder::
524       RegisterLoadBalancingPolicyFactory(
525           absl::make_unique<grpc_core::LrsLbFactory>());
526 }
527 
// Shutdown counterpart of grpc_lb_policy_lrs_init(); intentionally a no-op.
void grpc_lb_policy_lrs_shutdown() {}
529