1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/support/port_platform.h>
18
19 #include <grpc/grpc.h>
20
21 #include "src/core/ext/filters/client_channel/lb_policy.h"
22 #include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
23 #include "src/core/ext/filters/client_channel/lb_policy_factory.h"
24 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
25 #include "src/core/ext/filters/client_channel/xds/xds_client.h"
26 #include "src/core/ext/filters/client_channel/xds/xds_client_stats.h"
27 #include "src/core/lib/channel/channel_args.h"
28 #include "src/core/lib/gprpp/orphanable.h"
29 #include "src/core/lib/gprpp/ref_counted_ptr.h"
30 #include "src/core/lib/iomgr/work_serializer.h"
31
32 namespace grpc_core {
33
34 TraceFlag grpc_lb_lrs_trace(false, "lrs_lb");
35
36 namespace {
37
// Policy name returned by name() on the LRS config, policy, and factory.
constexpr char kLrs[]{"lrs_experimental"};
39
40 // Config for LRS LB policy.
41 class LrsLbConfig : public LoadBalancingPolicy::Config {
42 public:
LrsLbConfig(RefCountedPtr<LoadBalancingPolicy::Config> child_policy,std::string cluster_name,std::string eds_service_name,std::string lrs_load_reporting_server_name,RefCountedPtr<XdsLocalityName> locality_name)43 LrsLbConfig(RefCountedPtr<LoadBalancingPolicy::Config> child_policy,
44 std::string cluster_name, std::string eds_service_name,
45 std::string lrs_load_reporting_server_name,
46 RefCountedPtr<XdsLocalityName> locality_name)
47 : child_policy_(std::move(child_policy)),
48 cluster_name_(std::move(cluster_name)),
49 eds_service_name_(std::move(eds_service_name)),
50 lrs_load_reporting_server_name_(
51 std::move(lrs_load_reporting_server_name)),
52 locality_name_(std::move(locality_name)) {}
53
name() const54 const char* name() const override { return kLrs; }
55
child_policy() const56 RefCountedPtr<LoadBalancingPolicy::Config> child_policy() const {
57 return child_policy_;
58 }
cluster_name() const59 const std::string& cluster_name() const { return cluster_name_; }
eds_service_name() const60 const std::string& eds_service_name() const { return eds_service_name_; }
lrs_load_reporting_server_name() const61 const std::string& lrs_load_reporting_server_name() const {
62 return lrs_load_reporting_server_name_;
63 };
locality_name() const64 RefCountedPtr<XdsLocalityName> locality_name() const {
65 return locality_name_;
66 }
67
68 private:
69 RefCountedPtr<LoadBalancingPolicy::Config> child_policy_;
70 std::string cluster_name_;
71 std::string eds_service_name_;
72 std::string lrs_load_reporting_server_name_;
73 RefCountedPtr<XdsLocalityName> locality_name_;
74 };
75
76 // LRS LB policy.
77 class LrsLb : public LoadBalancingPolicy {
78 public:
79 LrsLb(RefCountedPtr<XdsClient> xds_client, Args args);
80
name() const81 const char* name() const override { return kLrs; }
82
83 void UpdateLocked(UpdateArgs args) override;
84 void ExitIdleLocked() override;
85 void ResetBackoffLocked() override;
86
87 private:
88 // A simple wrapper for ref-counting a picker from the child policy.
89 class RefCountedPicker : public RefCounted<RefCountedPicker> {
90 public:
RefCountedPicker(std::unique_ptr<SubchannelPicker> picker)91 explicit RefCountedPicker(std::unique_ptr<SubchannelPicker> picker)
92 : picker_(std::move(picker)) {}
Pick(PickArgs args)93 PickResult Pick(PickArgs args) { return picker_->Pick(args); }
94
95 private:
96 std::unique_ptr<SubchannelPicker> picker_;
97 };
98
99 // A picker that wraps the picker from the child to perform load reporting.
100 class LoadReportingPicker : public SubchannelPicker {
101 public:
LoadReportingPicker(RefCountedPtr<RefCountedPicker> picker,RefCountedPtr<XdsClusterLocalityStats> locality_stats)102 LoadReportingPicker(RefCountedPtr<RefCountedPicker> picker,
103 RefCountedPtr<XdsClusterLocalityStats> locality_stats)
104 : picker_(std::move(picker)),
105 locality_stats_(std::move(locality_stats)) {}
106
107 PickResult Pick(PickArgs args);
108
109 private:
110 RefCountedPtr<RefCountedPicker> picker_;
111 RefCountedPtr<XdsClusterLocalityStats> locality_stats_;
112 };
113
114 class Helper : public ChannelControlHelper {
115 public:
Helper(RefCountedPtr<LrsLb> lrs_policy)116 explicit Helper(RefCountedPtr<LrsLb> lrs_policy)
117 : lrs_policy_(std::move(lrs_policy)) {}
118
~Helper()119 ~Helper() { lrs_policy_.reset(DEBUG_LOCATION, "Helper"); }
120
121 RefCountedPtr<SubchannelInterface> CreateSubchannel(
122 const grpc_channel_args& args) override;
123 void UpdateState(grpc_connectivity_state state,
124 std::unique_ptr<SubchannelPicker> picker) override;
125 void RequestReresolution() override;
126 void AddTraceEvent(TraceSeverity severity,
127 absl::string_view message) override;
128
129 private:
130 RefCountedPtr<LrsLb> lrs_policy_;
131 };
132
133 ~LrsLb();
134
135 void ShutdownLocked() override;
136
137 OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
138 const grpc_channel_args* args);
139 void UpdateChildPolicyLocked(ServerAddressList addresses,
140 const grpc_channel_args* args);
141
142 void MaybeUpdatePickerLocked();
143
144 // Current config from the resolver.
145 RefCountedPtr<LrsLbConfig> config_;
146
147 // Internal state.
148 bool shutting_down_ = false;
149
150 // The xds client.
151 RefCountedPtr<XdsClient> xds_client_;
152
153 // The stats for client-side load reporting.
154 RefCountedPtr<XdsClusterLocalityStats> locality_stats_;
155
156 OrphanablePtr<LoadBalancingPolicy> child_policy_;
157
158 // Latest state and picker reported by the child policy.
159 grpc_connectivity_state state_ = GRPC_CHANNEL_IDLE;
160 RefCountedPtr<RefCountedPicker> picker_;
161 };
162
163 //
164 // LrsLb::LoadReportingPicker
165 //
166
Pick(LoadBalancingPolicy::PickArgs args)167 LoadBalancingPolicy::PickResult LrsLb::LoadReportingPicker::Pick(
168 LoadBalancingPolicy::PickArgs args) {
169 // Forward the pick to the picker returned from the child policy.
170 PickResult result = picker_->Pick(args);
171 if (result.type == PickResult::PICK_COMPLETE &&
172 result.subchannel != nullptr) {
173 // Record a call started.
174 locality_stats_->AddCallStarted();
175 // Intercept the recv_trailing_metadata op to record call completion.
176 XdsClusterLocalityStats* locality_stats =
177 locality_stats_->Ref(DEBUG_LOCATION, "LocalityStats+call").release();
178 result.recv_trailing_metadata_ready =
179 // Note: This callback does not run in either the control plane
180 // work serializer or in the data plane mutex.
181 [locality_stats](grpc_error* error, MetadataInterface* /*metadata*/,
182 CallState* /*call_state*/) {
183 const bool call_failed = error != GRPC_ERROR_NONE;
184 locality_stats->AddCallFinished(call_failed);
185 locality_stats->Unref(DEBUG_LOCATION, "LocalityStats+call");
186 };
187 }
188 return result;
189 }
190
191 //
192 // LrsLb
193 //
194
LrsLb(RefCountedPtr<XdsClient> xds_client,Args args)195 LrsLb::LrsLb(RefCountedPtr<XdsClient> xds_client, Args args)
196 : LoadBalancingPolicy(std::move(args)), xds_client_(std::move(xds_client)) {
197 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_lrs_trace)) {
198 gpr_log(GPR_INFO, "[lrs_lb %p] created -- using xds client %p from channel",
199 this, xds_client_.get());
200 }
201 }
202
~LrsLb()203 LrsLb::~LrsLb() {
204 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_lrs_trace)) {
205 gpr_log(GPR_INFO, "[lrs_lb %p] destroying xds LB policy", this);
206 }
207 }
208
ShutdownLocked()209 void LrsLb::ShutdownLocked() {
210 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_lrs_trace)) {
211 gpr_log(GPR_INFO, "[lrs_lb %p] shutting down", this);
212 }
213 shutting_down_ = true;
214 // Remove the child policy's interested_parties pollset_set from the
215 // xDS policy.
216 if (child_policy_ != nullptr) {
217 grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
218 interested_parties());
219 child_policy_.reset();
220 }
221 // Drop our ref to the child's picker, in case it's holding a ref to
222 // the child.
223 picker_.reset();
224 locality_stats_.reset();
225 xds_client_.reset();
226 }
227
ExitIdleLocked()228 void LrsLb::ExitIdleLocked() {
229 if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
230 }
231
ResetBackoffLocked()232 void LrsLb::ResetBackoffLocked() {
233 // The XdsClient will have its backoff reset by the xds resolver, so we
234 // don't need to do it here.
235 if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
236 }
237
UpdateLocked(UpdateArgs args)238 void LrsLb::UpdateLocked(UpdateArgs args) {
239 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_lrs_trace)) {
240 gpr_log(GPR_INFO, "[lrs_lb %p] Received update", this);
241 }
242 // Update config.
243 auto old_config = std::move(config_);
244 config_ = std::move(args.config);
245 // Update load reporting if needed.
246 if (old_config == nullptr ||
247 config_->lrs_load_reporting_server_name() !=
248 old_config->lrs_load_reporting_server_name() ||
249 config_->cluster_name() != old_config->cluster_name() ||
250 config_->eds_service_name() != old_config->eds_service_name() ||
251 *config_->locality_name() != *old_config->locality_name()) {
252 locality_stats_ = xds_client_->AddClusterLocalityStats(
253 config_->lrs_load_reporting_server_name(), config_->cluster_name(),
254 config_->eds_service_name(), config_->locality_name());
255 MaybeUpdatePickerLocked();
256 }
257 // Remove XdsClient from channel args, so that its presence doesn't
258 // prevent us from sharing subchannels between channels.
259 grpc_channel_args* new_args = XdsClient::RemoveFromChannelArgs(*args.args);
260 // Update child policy.
261 UpdateChildPolicyLocked(std::move(args.addresses), new_args);
262 }
263
MaybeUpdatePickerLocked()264 void LrsLb::MaybeUpdatePickerLocked() {
265 if (picker_ != nullptr) {
266 auto lrs_picker =
267 absl::make_unique<LoadReportingPicker>(picker_, locality_stats_);
268 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_lrs_trace)) {
269 gpr_log(GPR_INFO, "[lrs_lb %p] updating connectivity: state=%s picker=%p",
270 this, ConnectivityStateName(state_), lrs_picker.get());
271 }
272 channel_control_helper()->UpdateState(state_, std::move(lrs_picker));
273 }
274 }
275
CreateChildPolicyLocked(const grpc_channel_args * args)276 OrphanablePtr<LoadBalancingPolicy> LrsLb::CreateChildPolicyLocked(
277 const grpc_channel_args* args) {
278 LoadBalancingPolicy::Args lb_policy_args;
279 lb_policy_args.work_serializer = work_serializer();
280 lb_policy_args.args = args;
281 lb_policy_args.channel_control_helper =
282 absl::make_unique<Helper>(Ref(DEBUG_LOCATION, "Helper"));
283 OrphanablePtr<LoadBalancingPolicy> lb_policy =
284 MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
285 &grpc_lb_lrs_trace);
286 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_lrs_trace)) {
287 gpr_log(GPR_INFO, "[lrs_lb %p] Created new child policy handler %p", this,
288 lb_policy.get());
289 }
290 // Add our interested_parties pollset_set to that of the newly created
291 // child policy. This will make the child policy progress upon activity on
292 // this policy, which in turn is tied to the application's call.
293 grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
294 interested_parties());
295 return lb_policy;
296 }
297
UpdateChildPolicyLocked(ServerAddressList addresses,const grpc_channel_args * args)298 void LrsLb::UpdateChildPolicyLocked(ServerAddressList addresses,
299 const grpc_channel_args* args) {
300 // Create policy if needed.
301 if (child_policy_ == nullptr) {
302 child_policy_ = CreateChildPolicyLocked(args);
303 }
304 // Construct update args.
305 UpdateArgs update_args;
306 update_args.addresses = std::move(addresses);
307 update_args.config = config_->child_policy();
308 update_args.args = args;
309 // Update the policy.
310 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_lrs_trace)) {
311 gpr_log(GPR_INFO, "[lrs_lb %p] Updating child policy handler %p", this,
312 child_policy_.get());
313 }
314 child_policy_->UpdateLocked(std::move(update_args));
315 }
316
317 //
318 // LrsLb::Helper
319 //
320
CreateSubchannel(const grpc_channel_args & args)321 RefCountedPtr<SubchannelInterface> LrsLb::Helper::CreateSubchannel(
322 const grpc_channel_args& args) {
323 if (lrs_policy_->shutting_down_) return nullptr;
324 return lrs_policy_->channel_control_helper()->CreateSubchannel(args);
325 }
326
UpdateState(grpc_connectivity_state state,std::unique_ptr<SubchannelPicker> picker)327 void LrsLb::Helper::UpdateState(grpc_connectivity_state state,
328 std::unique_ptr<SubchannelPicker> picker) {
329 if (lrs_policy_->shutting_down_) return;
330 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_lrs_trace)) {
331 gpr_log(GPR_INFO,
332 "[lrs_lb %p] child connectivity state update: state=%s picker=%p",
333 lrs_policy_.get(), ConnectivityStateName(state), picker.get());
334 }
335 // Save the state and picker.
336 lrs_policy_->state_ = state;
337 lrs_policy_->picker_ = MakeRefCounted<RefCountedPicker>(std::move(picker));
338 // Wrap the picker and return it to the channel.
339 lrs_policy_->MaybeUpdatePickerLocked();
340 }
341
RequestReresolution()342 void LrsLb::Helper::RequestReresolution() {
343 if (lrs_policy_->shutting_down_) return;
344 lrs_policy_->channel_control_helper()->RequestReresolution();
345 }
346
AddTraceEvent(TraceSeverity severity,absl::string_view message)347 void LrsLb::Helper::AddTraceEvent(TraceSeverity severity,
348 absl::string_view message) {
349 if (lrs_policy_->shutting_down_) return;
350 lrs_policy_->channel_control_helper()->AddTraceEvent(severity, message);
351 }
352
353 //
354 // factory
355 //
356
357 class LrsLbFactory : public LoadBalancingPolicyFactory {
358 public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const359 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
360 LoadBalancingPolicy::Args args) const override {
361 RefCountedPtr<XdsClient> xds_client =
362 XdsClient::GetFromChannelArgs(*args.args);
363 if (xds_client == nullptr) {
364 gpr_log(GPR_ERROR,
365 "XdsClient not present in channel args -- cannot instantiate "
366 "lrs LB policy");
367 return nullptr;
368 }
369 return MakeOrphanable<LrsLb>(std::move(xds_client), std::move(args));
370 }
371
name() const372 const char* name() const override { return kLrs; }
373
ParseLoadBalancingConfig(const Json & json,grpc_error ** error) const374 RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
375 const Json& json, grpc_error** error) const override {
376 GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
377 if (json.type() == Json::Type::JSON_NULL) {
378 // lrs was mentioned as a policy in the deprecated loadBalancingPolicy
379 // field or in the client API.
380 *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
381 "field:loadBalancingPolicy error:lrs policy requires configuration. "
382 "Please use loadBalancingConfig field of service config instead.");
383 return nullptr;
384 }
385 std::vector<grpc_error*> error_list;
386 // Child policy.
387 RefCountedPtr<LoadBalancingPolicy::Config> child_policy;
388 auto it = json.object_value().find("childPolicy");
389 if (it == json.object_value().end()) {
390 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
391 "field:childPolicy error:required field missing"));
392 } else {
393 grpc_error* parse_error = GRPC_ERROR_NONE;
394 child_policy = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
395 it->second, &parse_error);
396 if (child_policy == nullptr) {
397 GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE);
398 std::vector<grpc_error*> child_errors;
399 child_errors.push_back(parse_error);
400 error_list.push_back(
401 GRPC_ERROR_CREATE_FROM_VECTOR("field:childPolicy", &child_errors));
402 }
403 }
404 // Cluster name.
405 std::string cluster_name;
406 it = json.object_value().find("clusterName");
407 if (it == json.object_value().end()) {
408 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
409 "field:clusterName error:required field missing"));
410 } else if (it->second.type() != Json::Type::STRING) {
411 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
412 "field:clusterName error:type should be string"));
413 } else {
414 cluster_name = it->second.string_value();
415 }
416 // EDS service name.
417 std::string eds_service_name;
418 it = json.object_value().find("edsServiceName");
419 if (it != json.object_value().end()) {
420 if (it->second.type() != Json::Type::STRING) {
421 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
422 "field:edsServiceName error:type should be string"));
423 } else {
424 eds_service_name = it->second.string_value();
425 }
426 }
427 // Locality.
428 RefCountedPtr<XdsLocalityName> locality_name;
429 it = json.object_value().find("locality");
430 if (it == json.object_value().end()) {
431 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
432 "field:locality error:required field missing"));
433 } else {
434 std::vector<grpc_error*> child_errors =
435 ParseLocality(it->second, &locality_name);
436 if (!child_errors.empty()) {
437 error_list.push_back(
438 GRPC_ERROR_CREATE_FROM_VECTOR("field:locality", &child_errors));
439 }
440 }
441 // LRS load reporting server name.
442 std::string lrs_load_reporting_server_name;
443 it = json.object_value().find("lrsLoadReportingServerName");
444 if (it == json.object_value().end()) {
445 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
446 "field:lrsLoadReportingServerName error:required field missing"));
447 } else if (it->second.type() != Json::Type::STRING) {
448 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
449 "field:lrsLoadReportingServerName error:type should be string"));
450 } else {
451 lrs_load_reporting_server_name = it->second.string_value();
452 }
453 if (!error_list.empty()) {
454 *error = GRPC_ERROR_CREATE_FROM_VECTOR(
455 "lrs_experimental LB policy config", &error_list);
456 return nullptr;
457 }
458 return MakeRefCounted<LrsLbConfig>(
459 std::move(child_policy), std::move(cluster_name),
460 std::move(eds_service_name), std::move(lrs_load_reporting_server_name),
461 std::move(locality_name));
462 }
463
464 private:
ParseLocality(const Json & json,RefCountedPtr<XdsLocalityName> * name)465 static std::vector<grpc_error*> ParseLocality(
466 const Json& json, RefCountedPtr<XdsLocalityName>* name) {
467 std::vector<grpc_error*> error_list;
468 if (json.type() != Json::Type::OBJECT) {
469 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
470 "locality field is not an object"));
471 return error_list;
472 }
473 std::string region;
474 auto it = json.object_value().find("region");
475 if (it != json.object_value().end()) {
476 if (it->second.type() != Json::Type::STRING) {
477 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
478 "\"region\" field is not a string"));
479 } else {
480 region = it->second.string_value();
481 }
482 }
483 std::string zone;
484 it = json.object_value().find("zone");
485 if (it != json.object_value().end()) {
486 if (it->second.type() != Json::Type::STRING) {
487 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
488 "\"zone\" field is not a string"));
489 } else {
490 zone = it->second.string_value();
491 }
492 }
493 std::string subzone;
494 it = json.object_value().find("subzone");
495 if (it != json.object_value().end()) {
496 if (it->second.type() != Json::Type::STRING) {
497 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
498 "\"subzone\" field is not a string"));
499 } else {
500 subzone = it->second.string_value();
501 }
502 }
503 if (region.empty() && zone.empty() && subzone.empty()) {
504 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
505 "at least one of region, zone, or subzone must be set"));
506 }
507 if (error_list.empty()) {
508 *name = MakeRefCounted<XdsLocalityName>(region, zone, subzone);
509 }
510 return error_list;
511 }
512 };
513
514 } // namespace
515
516 } // namespace grpc_core
517
518 //
519 // Plugin registration
520 //
521
grpc_lb_policy_lrs_init()522 void grpc_lb_policy_lrs_init() {
523 grpc_core::LoadBalancingPolicyRegistry::Builder::
524 RegisterLoadBalancingPolicyFactory(
525 absl::make_unique<grpc_core::LrsLbFactory>());
526 }
527
// Shutdown counterpart of grpc_lb_policy_lrs_init().
void grpc_lb_policy_lrs_shutdown() {
  // Intentionally empty: this function performs no work.
}
529