1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/support/port_platform.h>
18
19 #include <inttypes.h>
20 #include <limits.h>
21
22 #include "absl/strings/str_cat.h"
23 #include "absl/types/optional.h"
24
25 #include <grpc/grpc.h>
26
27 #include "src/core/ext/filters/client_channel/client_channel.h"
28 #include "src/core/ext/filters/client_channel/lb_policy.h"
29 #include "src/core/ext/filters/client_channel/lb_policy/address_filtering.h"
30 #include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
31 #include "src/core/ext/filters/client_channel/lb_policy/xds/xds.h"
32 #include "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h"
33 #include "src/core/ext/filters/client_channel/lb_policy_factory.h"
34 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
35 #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
36 #include "src/core/ext/filters/client_channel/resolver_registry.h"
37 #include "src/core/ext/filters/client_channel/server_address.h"
38 #include "src/core/ext/xds/xds_channel_args.h"
39 #include "src/core/ext/xds/xds_client.h"
40 #include "src/core/ext/xds/xds_client_stats.h"
41 #include "src/core/lib/channel/channel_args.h"
42 #include "src/core/lib/gpr/string.h"
43 #include "src/core/lib/gprpp/orphanable.h"
44 #include "src/core/lib/gprpp/ref_counted_ptr.h"
45 #include "src/core/lib/iomgr/work_serializer.h"
46 #include "src/core/lib/transport/error_utils.h"
47 #include "src/core/lib/uri/uri_parser.h"
48
// Default fallback timeout for the EDS flow.
// NOTE(review): not referenced in the visible portion of this file; the unit
// is presumably milliseconds -- confirm before relying on it.
#define GRPC_EDS_DEFAULT_FALLBACK_TIMEOUT 10000
50
51 namespace grpc_core {
52
// Trace flag named "xds_cluster_resolver_lb"; it gates every
// GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace) log in this file.
TraceFlag grpc_lb_xds_cluster_resolver_trace(false, "xds_cluster_resolver_lb");

// Attribute key for an xDS locality name.
// NOTE(review): not used in the visible portion of this file; presumably
// attached to addresses handed to the child policy -- confirm.
const char* kXdsLocalityNameAttributeKey = "xds_locality_name";
56
57 namespace {
58
// Policy name returned by name() on both XdsClusterResolverLbConfig and
// XdsClusterResolverLb.
constexpr char kXdsClusterResolver[] = "xds_cluster_resolver_experimental";
60
61 // Config for EDS LB policy.
62 class XdsClusterResolverLbConfig : public LoadBalancingPolicy::Config {
63 public:
64 struct DiscoveryMechanism {
65 std::string cluster_name;
66 absl::optional<std::string> lrs_load_reporting_server_name;
67 uint32_t max_concurrent_requests;
68 enum DiscoveryMechanismType {
69 EDS,
70 LOGICAL_DNS,
71 };
72 DiscoveryMechanismType type;
73 std::string eds_service_name;
74
operator ==grpc_core::__anon9699b2f90111::XdsClusterResolverLbConfig::DiscoveryMechanism75 bool operator==(const DiscoveryMechanism& other) const {
76 return (cluster_name == other.cluster_name &&
77 lrs_load_reporting_server_name ==
78 other.lrs_load_reporting_server_name &&
79 max_concurrent_requests == other.max_concurrent_requests &&
80 type == other.type && eds_service_name == other.eds_service_name);
81 }
82 };
83
XdsClusterResolverLbConfig(std::vector<DiscoveryMechanism> discovery_mechanisms,Json xds_lb_policy)84 XdsClusterResolverLbConfig(
85 std::vector<DiscoveryMechanism> discovery_mechanisms, Json xds_lb_policy)
86 : discovery_mechanisms_(std::move(discovery_mechanisms)),
87 xds_lb_policy_(std::move(xds_lb_policy)) {}
88
name() const89 const char* name() const override { return kXdsClusterResolver; }
discovery_mechanisms() const90 const std::vector<DiscoveryMechanism>& discovery_mechanisms() const {
91 return discovery_mechanisms_;
92 }
93
xds_lb_policy() const94 const Json& xds_lb_policy() const { return xds_lb_policy_; }
95
96 private:
97 std::vector<DiscoveryMechanism> discovery_mechanisms_;
98 Json xds_lb_policy_;
99 };
100
101 // Xds Cluster Resolver LB policy.
102 class XdsClusterResolverLb : public LoadBalancingPolicy {
103 public:
104 XdsClusterResolverLb(RefCountedPtr<XdsClient> xds_client, Args args);
105
name() const106 const char* name() const override { return kXdsClusterResolver; }
107
108 void UpdateLocked(UpdateArgs args) override;
109 void ResetBackoffLocked() override;
110 void ExitIdleLocked() override;
111
112 private:
113 // Discovery Mechanism Base class
114 //
115 // Implemented by EDS and LOGICAL_DNS.
116 //
117 // Implementations are responsible for calling the LB policy's
118 // OnEndpointChanged(), OnError(), and OnResourceDoesNotExist()
119 // methods when the corresponding events occur.
120 //
121 // Must implement Orphan() method to cancel the watchers.
122 class DiscoveryMechanism : public InternallyRefCounted<DiscoveryMechanism> {
123 public:
DiscoveryMechanism(RefCountedPtr<XdsClusterResolverLb> xds_cluster_resolver_lb,size_t index)124 DiscoveryMechanism(
125 RefCountedPtr<XdsClusterResolverLb> xds_cluster_resolver_lb,
126 size_t index)
127 : parent_(std::move(xds_cluster_resolver_lb)), index_(index) {}
128 virtual void Start() = 0;
129 void Orphan() override = 0;
130 virtual Json::Array override_child_policy() = 0;
131 virtual bool disable_reresolution() = 0;
132
133 // Caller must ensure that config_ is set before calling.
GetXdsClusterResolverResourceName() const134 const absl::string_view GetXdsClusterResolverResourceName() const {
135 if (!parent_->is_xds_uri_) return parent_->server_name_;
136 if (!parent_->config_->discovery_mechanisms()[index_]
137 .eds_service_name.empty()) {
138 return parent_->config_->discovery_mechanisms()[index_]
139 .eds_service_name;
140 }
141 return parent_->config_->discovery_mechanisms()[index_].cluster_name;
142 }
143
144 // Returns a pair containing the cluster and eds_service_name
145 // to use for LRS load reporting. Caller must ensure that config_ is set
146 // before calling.
GetLrsClusterKey() const147 std::pair<absl::string_view, absl::string_view> GetLrsClusterKey() const {
148 if (!parent_->is_xds_uri_) return {parent_->server_name_, nullptr};
149 return {
150 parent_->config_->discovery_mechanisms()[index_].cluster_name,
151 parent_->config_->discovery_mechanisms()[index_].eds_service_name};
152 }
153
154 protected:
parent() const155 XdsClusterResolverLb* parent() const { return parent_.get(); }
index() const156 size_t index() const { return index_; }
157
158 private:
159 RefCountedPtr<XdsClusterResolverLb> parent_;
160 // Stores its own index in the vector of DiscoveryMechanism.
161 size_t index_;
162 };
163
164 class EdsDiscoveryMechanism : public DiscoveryMechanism {
165 public:
EdsDiscoveryMechanism(RefCountedPtr<XdsClusterResolverLb> xds_cluster_resolver_lb,size_t index)166 EdsDiscoveryMechanism(
167 RefCountedPtr<XdsClusterResolverLb> xds_cluster_resolver_lb,
168 size_t index)
169 : DiscoveryMechanism(std::move(xds_cluster_resolver_lb), index) {}
170 void Start() override;
171 void Orphan() override;
override_child_policy()172 Json::Array override_child_policy() override { return Json::Array{}; }
disable_reresolution()173 bool disable_reresolution() override { return true; }
174
175 private:
176 class EndpointWatcher : public XdsClient::EndpointWatcherInterface {
177 public:
EndpointWatcher(RefCountedPtr<EdsDiscoveryMechanism> discovery_mechanism)178 explicit EndpointWatcher(
179 RefCountedPtr<EdsDiscoveryMechanism> discovery_mechanism)
180 : discovery_mechanism_(std::move(discovery_mechanism)) {}
~EndpointWatcher()181 ~EndpointWatcher() override {
182 discovery_mechanism_.reset(DEBUG_LOCATION, "EndpointWatcher");
183 }
OnEndpointChanged(XdsApi::EdsUpdate update)184 void OnEndpointChanged(XdsApi::EdsUpdate update) override {
185 new Notifier(discovery_mechanism_, std::move(update));
186 }
OnError(grpc_error * error)187 void OnError(grpc_error* error) override {
188 new Notifier(discovery_mechanism_, error);
189 }
OnResourceDoesNotExist()190 void OnResourceDoesNotExist() override {
191 new Notifier(discovery_mechanism_);
192 }
193
194 private:
195 class Notifier {
196 public:
197 Notifier(RefCountedPtr<EdsDiscoveryMechanism> discovery_mechanism,
198 XdsApi::EdsUpdate update);
199 Notifier(RefCountedPtr<EdsDiscoveryMechanism> discovery_mechanism,
200 grpc_error* error);
201 explicit Notifier(
202 RefCountedPtr<EdsDiscoveryMechanism> discovery_mechanism);
~Notifier()203 ~Notifier() { discovery_mechanism_.reset(DEBUG_LOCATION, "Notifier"); }
204
205 private:
206 enum Type { kUpdate, kError, kDoesNotExist };
207
208 static void RunInExecCtx(void* arg, grpc_error* error);
209 void RunInWorkSerializer(grpc_error* error);
210
211 RefCountedPtr<EdsDiscoveryMechanism> discovery_mechanism_;
212 grpc_closure closure_;
213 XdsApi::EdsUpdate update_;
214 Type type_;
215 };
216
217 RefCountedPtr<EdsDiscoveryMechanism> discovery_mechanism_;
218 };
219
220 // Note that this is not owned, so this pointer must never be dereferenced.
221 EndpointWatcher* watcher_ = nullptr;
222 };
223
224 class LogicalDNSDiscoveryMechanism : public DiscoveryMechanism {
225 public:
LogicalDNSDiscoveryMechanism(RefCountedPtr<XdsClusterResolverLb> xds_cluster_resolver_lb,size_t index)226 LogicalDNSDiscoveryMechanism(
227 RefCountedPtr<XdsClusterResolverLb> xds_cluster_resolver_lb,
228 size_t index)
229 : DiscoveryMechanism(std::move(xds_cluster_resolver_lb), index) {}
230 void Start() override;
231 void Orphan() override;
override_child_policy()232 Json::Array override_child_policy() override {
233 return Json::Array{
234 Json::Object{
235 {"pick_first", Json::Object()},
236 },
237 };
238 }
disable_reresolution()239 bool disable_reresolution() override { return false; };
240
241 private:
242 class ResolverResultHandler : public Resolver::ResultHandler {
243 public:
ResolverResultHandler(RefCountedPtr<LogicalDNSDiscoveryMechanism> discovery_mechanism)244 explicit ResolverResultHandler(
245 RefCountedPtr<LogicalDNSDiscoveryMechanism> discovery_mechanism)
246 : discovery_mechanism_(std::move(discovery_mechanism)) {}
247
~ResolverResultHandler()248 ~ResolverResultHandler() override {}
249
250 void ReturnResult(Resolver::Result result) override;
251
252 void ReturnError(grpc_error* error) override;
253
254 private:
255 RefCountedPtr<LogicalDNSDiscoveryMechanism> discovery_mechanism_;
256 };
257 // This is only necessary because of a bug in msvc where nested class cannot
258 // access protected member in base class.
259 friend class ResolverResultHandler;
260 OrphanablePtr<Resolver> resolver_;
261 };
262
263 struct DiscoveryMechanismEntry {
264 OrphanablePtr<DiscoveryMechanism> discovery_mechanism;
265 bool first_update_received = false;
266 // Number of priorities this mechanism has contributed to priority_list_.
267 // (The sum of this across all discovery mechanisms should always equal
268 // the number of priorities in priority_list_.)
269 uint32_t num_priorities = 0;
270 RefCountedPtr<XdsApi::EdsUpdate::DropConfig> drop_config;
271 // Populated only when an update has been delivered by the mechanism
272 // but has not yet been applied to the LB policy's combined priority_list_.
273 absl::optional<XdsApi::EdsUpdate::PriorityList> pending_priority_list;
274 };
275
276 class Helper : public ChannelControlHelper {
277 public:
Helper(RefCountedPtr<XdsClusterResolverLb> xds_cluster_resolver_policy)278 explicit Helper(
279 RefCountedPtr<XdsClusterResolverLb> xds_cluster_resolver_policy)
280 : xds_cluster_resolver_policy_(std::move(xds_cluster_resolver_policy)) {
281 }
282
~Helper()283 ~Helper() override {
284 xds_cluster_resolver_policy_.reset(DEBUG_LOCATION, "Helper");
285 }
286
287 RefCountedPtr<SubchannelInterface> CreateSubchannel(
288 ServerAddress address, const grpc_channel_args& args) override;
289 void UpdateState(grpc_connectivity_state state, const absl::Status& status,
290 std::unique_ptr<SubchannelPicker> picker) override;
291 // This is a no-op, because we get the addresses from the xds
292 // client, which is a watch-based API.
RequestReresolution()293 void RequestReresolution() override {}
294 void AddTraceEvent(TraceSeverity severity,
295 absl::string_view message) override;
296
297 private:
298 RefCountedPtr<XdsClusterResolverLb> xds_cluster_resolver_policy_;
299 };
300
301 ~XdsClusterResolverLb() override;
302
303 void ShutdownLocked() override;
304
305 void OnEndpointChanged(size_t index, XdsApi::EdsUpdate update);
306 void OnError(size_t index, grpc_error* error);
307 void OnResourceDoesNotExist(size_t index);
308
309 void MaybeDestroyChildPolicyLocked();
310
311 void UpdatePriorityList(XdsApi::EdsUpdate::PriorityList priority_list);
312 void UpdateChildPolicyLocked();
313 OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
314 const grpc_channel_args* args);
315 ServerAddressList CreateChildPolicyAddressesLocked();
316 RefCountedPtr<Config> CreateChildPolicyConfigLocked();
317 grpc_channel_args* CreateChildPolicyArgsLocked(
318 const grpc_channel_args* args_in);
319
320 // Server name from target URI.
321 std::string server_name_;
322 bool is_xds_uri_;
323
324 // Current channel args and config from the resolver.
325 const grpc_channel_args* args_ = nullptr;
326 RefCountedPtr<XdsClusterResolverLbConfig> config_;
327
328 // Internal state.
329 bool shutting_down_ = false;
330
331 // The xds client and endpoint watcher.
332 RefCountedPtr<XdsClient> xds_client_;
333
334 // Vector of discovery mechansism entries in priority order.
335 std::vector<DiscoveryMechanismEntry> discovery_mechanisms_;
336
337 // The latest data from the endpoint watcher.
338 XdsApi::EdsUpdate::PriorityList priority_list_;
339 // State used to retain child policy names for priority policy.
340 std::vector<size_t /*child_number*/> priority_child_numbers_;
341
342 OrphanablePtr<LoadBalancingPolicy> child_policy_;
343 };
344
345 //
346 // XdsClusterResolverLb::Helper
347 //
348
349 RefCountedPtr<SubchannelInterface>
CreateSubchannel(ServerAddress address,const grpc_channel_args & args)350 XdsClusterResolverLb::Helper::CreateSubchannel(ServerAddress address,
351 const grpc_channel_args& args) {
352 if (xds_cluster_resolver_policy_->shutting_down_) return nullptr;
353 return xds_cluster_resolver_policy_->channel_control_helper()
354 ->CreateSubchannel(std::move(address), args);
355 }
356
UpdateState(grpc_connectivity_state state,const absl::Status & status,std::unique_ptr<SubchannelPicker> picker)357 void XdsClusterResolverLb::Helper::UpdateState(
358 grpc_connectivity_state state, const absl::Status& status,
359 std::unique_ptr<SubchannelPicker> picker) {
360 if (xds_cluster_resolver_policy_->shutting_down_ ||
361 xds_cluster_resolver_policy_->child_policy_ == nullptr) {
362 return;
363 }
364 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) {
365 gpr_log(GPR_INFO,
366 "[xds_cluster_resolver_lb %p] child policy updated state=%s (%s) "
367 "picker=%p",
368 xds_cluster_resolver_policy_.get(), ConnectivityStateName(state),
369 status.ToString().c_str(), picker.get());
370 }
371 xds_cluster_resolver_policy_->channel_control_helper()->UpdateState(
372 state, status, std::move(picker));
373 }
374
AddTraceEvent(TraceSeverity severity,absl::string_view message)375 void XdsClusterResolverLb::Helper::AddTraceEvent(TraceSeverity severity,
376 absl::string_view message) {
377 if (xds_cluster_resolver_policy_->shutting_down_) return;
378 xds_cluster_resolver_policy_->channel_control_helper()->AddTraceEvent(
379 severity, message);
380 }
381
382 //
383 // XdsClusterResolverLb::EdsDiscoveryMechanism
384 //
385
Start()386 void XdsClusterResolverLb::EdsDiscoveryMechanism::Start() {
387 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) {
388 gpr_log(GPR_INFO,
389 "[xds_cluster_resolver_lb %p] eds discovery mechanism %" PRIuPTR
390 ":%p starting xds watch for %s",
391 parent(), index(), this,
392 std::string(GetXdsClusterResolverResourceName()).c_str());
393 }
394 auto watcher = absl::make_unique<EndpointWatcher>(
395 Ref(DEBUG_LOCATION, "EdsDiscoveryMechanism"));
396 watcher_ = watcher.get();
397 parent()->xds_client_->WatchEndpointData(GetXdsClusterResolverResourceName(),
398 std::move(watcher));
399 }
400
Orphan()401 void XdsClusterResolverLb::EdsDiscoveryMechanism::Orphan() {
402 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) {
403 gpr_log(GPR_INFO,
404 "[xds_cluster_resolver_lb %p] eds discovery mechanism %" PRIuPTR
405 ":%p cancelling xds watch for %s",
406 parent(), index(), this,
407 std::string(GetXdsClusterResolverResourceName()).c_str());
408 }
409 parent()->xds_client_->CancelEndpointDataWatch(
410 GetXdsClusterResolverResourceName(), watcher_);
411 Unref();
412 }
413
414 //
415 // XdsClusterResolverLb::EndpointWatcher::Notifier
416 //
417
418 XdsClusterResolverLb::EdsDiscoveryMechanism::EndpointWatcher::Notifier::
Notifier(RefCountedPtr<XdsClusterResolverLb::EdsDiscoveryMechanism> discovery_mechanism,XdsApi::EdsUpdate update)419 Notifier(RefCountedPtr<XdsClusterResolverLb::EdsDiscoveryMechanism>
420 discovery_mechanism,
421 XdsApi::EdsUpdate update)
422 : discovery_mechanism_(std::move(discovery_mechanism)),
423 update_(std::move(update)),
424 type_(kUpdate) {
425 GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
426 ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
427 }
428
429 XdsClusterResolverLb::EdsDiscoveryMechanism::EndpointWatcher::Notifier::
Notifier(RefCountedPtr<XdsClusterResolverLb::EdsDiscoveryMechanism> discovery_mechanism,grpc_error * error)430 Notifier(RefCountedPtr<XdsClusterResolverLb::EdsDiscoveryMechanism>
431 discovery_mechanism,
432 grpc_error* error)
433 : discovery_mechanism_(std::move(discovery_mechanism)), type_(kError) {
434 GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
435 ExecCtx::Run(DEBUG_LOCATION, &closure_, error);
436 }
437
438 XdsClusterResolverLb::EdsDiscoveryMechanism::EndpointWatcher::Notifier::
Notifier(RefCountedPtr<XdsClusterResolverLb::EdsDiscoveryMechanism> discovery_mechanism)439 Notifier(RefCountedPtr<XdsClusterResolverLb::EdsDiscoveryMechanism>
440 discovery_mechanism)
441 : discovery_mechanism_(std::move(discovery_mechanism)),
442 type_(kDoesNotExist) {
443 GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
444 ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
445 }
446
447 void XdsClusterResolverLb::EdsDiscoveryMechanism::EndpointWatcher::Notifier::
RunInExecCtx(void * arg,grpc_error * error)448 RunInExecCtx(void* arg, grpc_error* error) {
449 Notifier* self = static_cast<Notifier*>(arg);
450 GRPC_ERROR_REF(error);
451 self->discovery_mechanism_->parent()->work_serializer()->Run(
452 [self, error]() { self->RunInWorkSerializer(error); }, DEBUG_LOCATION);
453 }
454
455 void XdsClusterResolverLb::EdsDiscoveryMechanism::EndpointWatcher::Notifier::
RunInWorkSerializer(grpc_error * error)456 RunInWorkSerializer(grpc_error* error) {
457 switch (type_) {
458 case kUpdate:
459 discovery_mechanism_->parent()->OnEndpointChanged(
460 discovery_mechanism_->index(), std::move(update_));
461 break;
462 case kError:
463 discovery_mechanism_->parent()->OnError(discovery_mechanism_->index(),
464 error);
465 break;
466 case kDoesNotExist:
467 discovery_mechanism_->parent()->OnResourceDoesNotExist(
468 discovery_mechanism_->index());
469 break;
470 };
471 delete this;
472 }
473
474 //
475 // XdsClusterResolverLb::LogicalDNSDiscoveryMechanism
476 //
477
Start()478 void XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::Start() {
479 std::string target = parent()->server_name_;
480 grpc_channel_args* args = nullptr;
481 FakeResolverResponseGenerator* fake_resolver_response_generator =
482 grpc_channel_args_find_pointer<FakeResolverResponseGenerator>(
483 parent()->args_,
484 GRPC_ARG_XDS_LOGICAL_DNS_CLUSTER_FAKE_RESOLVER_RESPONSE_GENERATOR);
485 if (fake_resolver_response_generator != nullptr) {
486 target = absl::StrCat("fake:", target);
487 grpc_arg new_arg = FakeResolverResponseGenerator::MakeChannelArg(
488 fake_resolver_response_generator);
489 args = grpc_channel_args_copy_and_add(parent()->args_, &new_arg, 1);
490 } else {
491 args = grpc_channel_args_copy(parent()->args_);
492 }
493 resolver_ = ResolverRegistry::CreateResolver(
494 target.c_str(), args, parent()->interested_parties(),
495 parent()->work_serializer(),
496 absl::make_unique<ResolverResultHandler>(
497 Ref(DEBUG_LOCATION, "LogicalDNSDiscoveryMechanism")));
498 grpc_channel_args_destroy(args);
499 if (resolver_ == nullptr) {
500 parent()->OnResourceDoesNotExist(index());
501 return;
502 }
503 resolver_->StartLocked();
504 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) {
505 gpr_log(GPR_INFO,
506 "[xds_cluster_resolver_lb %p] logical DNS discovery mechanism "
507 "%" PRIuPTR ":%p starting dns resolver %p",
508 parent(), index(), this, resolver_.get());
509 }
510 }
511
Orphan()512 void XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::Orphan() {
513 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) {
514 gpr_log(
515 GPR_INFO,
516 "[xds_cluster_resolver_lb %p] logical DNS discovery mechanism %" PRIuPTR
517 ":%p shutting down dns resolver %p",
518 parent(), index(), this, resolver_.get());
519 }
520 resolver_.reset();
521 Unref();
522 }
523
524 //
525 // XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::ResolverResultHandler
526 //
527
528 void XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::ResolverResultHandler::
ReturnResult(Resolver::Result result)529 ReturnResult(Resolver::Result result) {
530 // convert result to eds update
531 XdsApi::EdsUpdate update;
532 XdsApi::EdsUpdate::Priority::Locality locality;
533 locality.name = MakeRefCounted<XdsLocalityName>("", "", "");
534 locality.lb_weight = 1;
535 locality.endpoints = std::move(result.addresses);
536 XdsApi::EdsUpdate::Priority priority;
537 priority.localities.emplace(locality.name.get(), std::move(locality));
538 update.priorities.emplace_back(std::move(priority));
539 discovery_mechanism_->parent()->OnEndpointChanged(
540 discovery_mechanism_->index(), std::move(update));
541 }
542
543 void XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::ResolverResultHandler::
ReturnError(grpc_error * error)544 ReturnError(grpc_error* error) {
545 discovery_mechanism_->parent()->OnError(discovery_mechanism_->index(), error);
546 }
547
548 //
549 // XdsClusterResolverLb public methods
550 //
551
XdsClusterResolverLb(RefCountedPtr<XdsClient> xds_client,Args args)552 XdsClusterResolverLb::XdsClusterResolverLb(RefCountedPtr<XdsClient> xds_client,
553 Args args)
554 : LoadBalancingPolicy(std::move(args)), xds_client_(std::move(xds_client)) {
555 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) {
556 gpr_log(GPR_INFO,
557 "[xds_cluster_resolver_lb %p] created -- using xds client %p", this,
558 xds_client_.get());
559 }
560 // Record server name.
561 const char* server_uri =
562 grpc_channel_args_find_string(args.args, GRPC_ARG_SERVER_URI);
563 GPR_ASSERT(server_uri != nullptr);
564 absl::StatusOr<URI> uri = URI::Parse(server_uri);
565 GPR_ASSERT(uri.ok() && !uri->path().empty());
566 server_name_ = std::string(absl::StripPrefix(uri->path(), "/"));
567 is_xds_uri_ = uri->scheme() == "xds";
568 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) {
569 gpr_log(GPR_INFO,
570 "[xds_cluster_resolver_lb %p] server name from channel "
571 "(is_xds_uri=%d): %s",
572 this, is_xds_uri_, server_name_.c_str());
573 }
574 // EDS-only flow.
575 if (!is_xds_uri_) {
576 // Setup channelz linkage.
577 channelz::ChannelNode* parent_channelz_node =
578 grpc_channel_args_find_pointer<channelz::ChannelNode>(
579 args.args, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
580 if (parent_channelz_node != nullptr) {
581 xds_client_->AddChannelzLinkage(parent_channelz_node);
582 }
583 // Couple polling.
584 grpc_pollset_set_add_pollset_set(xds_client_->interested_parties(),
585 interested_parties());
586 }
587 }
588
~XdsClusterResolverLb()589 XdsClusterResolverLb::~XdsClusterResolverLb() {
590 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) {
591 gpr_log(GPR_INFO,
592 "[xds_cluster_resolver_lb %p] destroying xds_cluster_resolver LB "
593 "policy",
594 this);
595 }
596 }
597
ShutdownLocked()598 void XdsClusterResolverLb::ShutdownLocked() {
599 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) {
600 gpr_log(GPR_INFO, "[xds_cluster_resolver_lb %p] shutting down", this);
601 }
602 shutting_down_ = true;
603 MaybeDestroyChildPolicyLocked();
604 discovery_mechanisms_.clear();
605 if (!is_xds_uri_) {
606 // Remove channelz linkage.
607 channelz::ChannelNode* parent_channelz_node =
608 grpc_channel_args_find_pointer<channelz::ChannelNode>(
609 args_, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
610 if (parent_channelz_node != nullptr) {
611 xds_client_->RemoveChannelzLinkage(parent_channelz_node);
612 }
613 // Decouple polling.
614 grpc_pollset_set_del_pollset_set(xds_client_->interested_parties(),
615 interested_parties());
616 }
617 xds_client_.reset(DEBUG_LOCATION, "XdsClusterResolverLb");
618 // Destroy channel args.
619 grpc_channel_args_destroy(args_);
620 args_ = nullptr;
621 }
622
MaybeDestroyChildPolicyLocked()623 void XdsClusterResolverLb::MaybeDestroyChildPolicyLocked() {
624 if (child_policy_ != nullptr) {
625 grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
626 interested_parties());
627 child_policy_.reset();
628 }
629 }
630
UpdateLocked(UpdateArgs args)631 void XdsClusterResolverLb::UpdateLocked(UpdateArgs args) {
632 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) {
633 gpr_log(GPR_INFO, "[xds_cluster_resolver_lb %p] Received update", this);
634 }
635 const bool is_initial_update = args_ == nullptr;
636 // Update config.
637 auto old_config = std::move(config_);
638 config_ = std::move(args.config);
639 // Update args.
640 grpc_channel_args_destroy(args_);
641 args_ = args.args;
642 args.args = nullptr;
643 // Update child policy if needed.
644 if (child_policy_ != nullptr) UpdateChildPolicyLocked();
645 // Create endpoint watcher if needed.
646 if (is_initial_update) {
647 for (const auto& config : config_->discovery_mechanisms()) {
648 DiscoveryMechanismEntry entry;
649 if (config.type == XdsClusterResolverLbConfig::DiscoveryMechanism::
650 DiscoveryMechanismType::EDS) {
651 entry.discovery_mechanism =
652 grpc_core::MakeOrphanable<EdsDiscoveryMechanism>(
653 Ref(DEBUG_LOCATION, "EdsDiscoveryMechanism"),
654 discovery_mechanisms_.size());
655 } else if (config.type == XdsClusterResolverLbConfig::DiscoveryMechanism::
656 DiscoveryMechanismType::LOGICAL_DNS) {
657 entry.discovery_mechanism =
658 grpc_core::MakeOrphanable<LogicalDNSDiscoveryMechanism>(
659 Ref(DEBUG_LOCATION, "LogicalDNSDiscoveryMechanism"),
660 discovery_mechanisms_.size());
661 } else {
662 GPR_ASSERT(0);
663 }
664 discovery_mechanisms_.push_back(std::move(entry));
665 }
666 // Call start() on all discovery mechanisms after creation.
667 for (const auto& discovery_mechanism : discovery_mechanisms_) {
668 discovery_mechanism.discovery_mechanism->Start();
669 }
670 }
671 }
672
ResetBackoffLocked()673 void XdsClusterResolverLb::ResetBackoffLocked() {
674 // When the XdsClient is instantiated in the resolver instead of in this
675 // LB policy, this is done via the resolver, so we don't need to do it here.
676 if (!is_xds_uri_ && xds_client_ != nullptr) xds_client_->ResetBackoff();
677 if (child_policy_ != nullptr) {
678 child_policy_->ResetBackoffLocked();
679 }
680 }
681
ExitIdleLocked()682 void XdsClusterResolverLb::ExitIdleLocked() {
683 if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
684 }
685
OnEndpointChanged(size_t index,XdsApi::EdsUpdate update)686 void XdsClusterResolverLb::OnEndpointChanged(size_t index,
687 XdsApi::EdsUpdate update) {
688 if (shutting_down_) return;
689 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) {
690 gpr_log(GPR_INFO,
691 "[xds_cluster_resolver_lb %p] Received update from xds client"
692 " for discovery mechanism %" PRIuPTR "",
693 this, index);
694 }
695 // We need at least one priority for each discovery mechanism, just so that we
696 // have a child in which to create the xds_cluster_impl policy. This ensures
697 // that we properly handle the case of a discovery mechanism dropping 100% of
698 // calls, the OnError() case, and the OnResourceDoesNotExist() case.
699 if (update.priorities.empty()) update.priorities.emplace_back();
700 discovery_mechanisms_[index].drop_config = std::move(update.drop_config);
701 discovery_mechanisms_[index].pending_priority_list =
702 std::move(update.priorities);
703 discovery_mechanisms_[index].first_update_received = true;
704 // If any discovery mechanism has not received its first update,
705 // wait until that happens before creating the child policy.
706 // TODO(roth): If this becomes problematic in the future (e.g., a
707 // secondary discovery mechanism delaying us from starting up at all),
708 // we can consider some sort of optimization whereby we can create the
709 // priority policy with only a subset of its children. But we need to
710 // make sure not to get into a situation where the priority policy
711 // will put the channel into TRANSIENT_FAILURE instead of CONNECTING
712 // while we're still waiting for the other discovery mechanism(s).
713 for (DiscoveryMechanismEntry& mechanism : discovery_mechanisms_) {
714 if (!mechanism.first_update_received) return;
715 }
716 // Construct new priority list.
717 XdsApi::EdsUpdate::PriorityList priority_list;
718 size_t priority_index = 0;
719 for (DiscoveryMechanismEntry& mechanism : discovery_mechanisms_) {
720 // If the mechanism has a pending update, use that.
721 // Otherwise, use the priorities that it previously contributed to the
722 // combined list.
723 if (mechanism.pending_priority_list.has_value()) {
724 priority_list.insert(priority_list.end(),
725 mechanism.pending_priority_list->begin(),
726 mechanism.pending_priority_list->end());
727 priority_index += mechanism.num_priorities;
728 mechanism.num_priorities = mechanism.pending_priority_list->size();
729 mechanism.pending_priority_list.reset();
730 } else {
731 priority_list.insert(
732 priority_list.end(), priority_list_.begin() + priority_index,
733 priority_list_.begin() + priority_index + mechanism.num_priorities);
734 priority_index += mechanism.num_priorities;
735 }
736 }
737 // Update child policy.
738 UpdatePriorityList(std::move(priority_list));
739 }
740
OnError(size_t index,grpc_error * error)741 void XdsClusterResolverLb::OnError(size_t index, grpc_error* error) {
742 gpr_log(GPR_ERROR,
743 "[xds_cluster_resolver_lb %p] discovery mechanism %" PRIuPTR
744 " xds watcher reported error: %s",
745 this, index, grpc_error_string(error));
746 GRPC_ERROR_UNREF(error);
747 if (shutting_down_) return;
748 if (!discovery_mechanisms_[index].first_update_received) {
749 // Call OnEndpointChanged with an empty update just like
750 // OnResourceDoesNotExist.
751 OnEndpointChanged(index, XdsApi::EdsUpdate());
752 }
753 }
754
OnResourceDoesNotExist(size_t index)755 void XdsClusterResolverLb::OnResourceDoesNotExist(size_t index) {
756 gpr_log(GPR_ERROR,
757 "[xds_cluster_resolver_lb %p] discovery mechanism %" PRIuPTR
758 " resource does not exist",
759 this, index);
760 if (shutting_down_) return;
761 // Call OnEndpointChanged with an empty update.
762 OnEndpointChanged(index, XdsApi::EdsUpdate());
763 }
764
765 //
766 // child policy-related methods
767 //
768
UpdatePriorityList(XdsApi::EdsUpdate::PriorityList priority_list)769 void XdsClusterResolverLb::UpdatePriorityList(
770 XdsApi::EdsUpdate::PriorityList priority_list) {
771 // Build some maps from locality to child number and the reverse from
772 // the old data in priority_list_ and priority_child_numbers_.
773 std::map<XdsLocalityName*, size_t /*child_number*/, XdsLocalityName::Less>
774 locality_child_map;
775 std::map<size_t, std::set<XdsLocalityName*>> child_locality_map;
776 for (size_t priority = 0; priority < priority_list_.size(); ++priority) {
777 size_t child_number = priority_child_numbers_[priority];
778 const auto& localities = priority_list_[priority].localities;
779 for (const auto& p : localities) {
780 XdsLocalityName* locality_name = p.first;
781 locality_child_map[locality_name] = child_number;
782 child_locality_map[child_number].insert(locality_name);
783 }
784 }
785 // Construct new list of children.
786 std::vector<size_t> priority_child_numbers;
787 for (size_t priority = 0; priority < priority_list.size(); ++priority) {
788 const auto& localities = priority_list[priority].localities;
789 absl::optional<size_t> child_number;
790 // If one of the localities in this priority already existed, reuse its
791 // child number.
792 for (const auto& p : localities) {
793 XdsLocalityName* locality_name = p.first;
794 if (!child_number.has_value()) {
795 auto it = locality_child_map.find(locality_name);
796 if (it != locality_child_map.end()) {
797 child_number = it->second;
798 locality_child_map.erase(it);
799 // Remove localities that *used* to be in this child number, so
800 // that we don't incorrectly reuse this child number for a
801 // subsequent priority.
802 for (XdsLocalityName* old_locality :
803 child_locality_map[*child_number]) {
804 locality_child_map.erase(old_locality);
805 }
806 }
807 } else {
808 // Remove all localities that are now in this child number, so
809 // that we don't accidentally reuse this child number for a
810 // subsequent priority.
811 locality_child_map.erase(locality_name);
812 }
813 }
814 // If we didn't find an existing child number, assign a new one.
815 if (!child_number.has_value()) {
816 for (child_number = 0;
817 child_locality_map.find(*child_number) != child_locality_map.end();
818 ++(*child_number)) {
819 }
820 // Add entry so we know that the child number is in use.
821 // (Don't need to add the list of localities, since we won't use them.)
822 child_locality_map[*child_number];
823 }
824 priority_child_numbers.push_back(*child_number);
825 }
826 // Save update.
827 priority_list_ = std::move(priority_list);
828 priority_child_numbers_ = std::move(priority_child_numbers);
829 // Update child policy.
830 UpdateChildPolicyLocked();
831 }
832
CreateChildPolicyAddressesLocked()833 ServerAddressList XdsClusterResolverLb::CreateChildPolicyAddressesLocked() {
834 ServerAddressList addresses;
835 for (size_t priority = 0; priority < priority_list_.size(); ++priority) {
836 const auto& localities = priority_list_[priority].localities;
837 std::string priority_child_name =
838 absl::StrCat("child", priority_child_numbers_[priority]);
839 for (const auto& p : localities) {
840 const auto& locality_name = p.first;
841 const auto& locality = p.second;
842 std::vector<std::string> hierarchical_path = {
843 priority_child_name, locality_name->AsHumanReadableString()};
844 for (const auto& endpoint : locality.endpoints) {
845 addresses.emplace_back(
846 endpoint
847 .WithAttribute(kHierarchicalPathAttributeKey,
848 MakeHierarchicalPathAttribute(hierarchical_path))
849 .WithAttribute(kXdsLocalityNameAttributeKey,
850 absl::make_unique<XdsLocalityAttribute>(
851 locality_name->Ref()))
852 .WithAttribute(ServerAddressWeightAttribute::
853 kServerAddressWeightAttributeKey,
854 absl::make_unique<ServerAddressWeightAttribute>(
855 locality.lb_weight)));
856 }
857 }
858 }
859 return addresses;
860 }
861
862 RefCountedPtr<LoadBalancingPolicy::Config>
CreateChildPolicyConfigLocked()863 XdsClusterResolverLb::CreateChildPolicyConfigLocked() {
864 Json::Object priority_children;
865 Json::Array priority_priorities;
866 // Setting up index to iterate through the discovery mechanisms and keeping
867 // track the discovery_mechanism each priority belongs to.
868 size_t discovery_index = 0;
869 // Setting up num_priorities_remaining to track the priorities in each
870 // discovery_mechanism.
871 size_t num_priorities_remaining_in_discovery =
872 discovery_mechanisms_[discovery_index].num_priorities;
873 for (size_t priority = 0; priority < priority_list_.size(); ++priority) {
874 Json child_policy;
875 if (!discovery_mechanisms_[discovery_index]
876 .discovery_mechanism->override_child_policy()
877 .empty()) {
878 child_policy = discovery_mechanisms_[discovery_index]
879 .discovery_mechanism->override_child_policy();
880 } else {
881 const auto& xds_lb_policy = config_->xds_lb_policy().object_value();
882 if (xds_lb_policy.find("ROUND_ROBIN") != xds_lb_policy.end()) {
883 const auto& localities = priority_list_[priority].localities;
884 Json::Object weighted_targets;
885 for (const auto& p : localities) {
886 XdsLocalityName* locality_name = p.first;
887 const auto& locality = p.second;
888 // Construct JSON object containing locality name.
889 Json::Object locality_name_json;
890 if (!locality_name->region().empty()) {
891 locality_name_json["region"] = locality_name->region();
892 }
893 if (!locality_name->zone().empty()) {
894 locality_name_json["zone"] = locality_name->zone();
895 }
896 if (!locality_name->sub_zone().empty()) {
897 locality_name_json["subzone"] = locality_name->sub_zone();
898 }
899 // Add weighted target entry.
900 weighted_targets[locality_name->AsHumanReadableString()] =
901 Json::Object{
902 {"weight", locality.lb_weight},
903 {"childPolicy",
904 Json::Array{
905 Json::Object{
906 {"round_robin", Json::Object()},
907 },
908 }},
909 };
910 }
911 // Construct locality-picking policy.
912 // Start with field from our config and add the "targets" field.
913 child_policy = Json::Array{
914 Json::Object{
915 {"weighted_target_experimental",
916 Json::Object{
917 {"targets", Json::Object()},
918 }},
919 },
920 };
921 Json::Object& config =
922 *(*child_policy.mutable_array())[0].mutable_object();
923 auto it = config.begin();
924 GPR_ASSERT(it != config.end());
925 (*it->second.mutable_object())["targets"] = std::move(weighted_targets);
926 } else {
927 auto it = xds_lb_policy.find("RING_HASH");
928 GPR_ASSERT(it != xds_lb_policy.end());
929 Json::Object ring_hash_experimental_policy = it->second.object_value();
930 child_policy = Json::Array{
931 Json::Object{
932 {"ring_hash_experimental", ring_hash_experimental_policy},
933 },
934 };
935 }
936 }
937 // Wrap it in the drop policy.
938 Json::Array drop_categories;
939 if (discovery_mechanisms_[discovery_index].drop_config != nullptr) {
940 for (const auto& category : discovery_mechanisms_[discovery_index]
941 .drop_config->drop_category_list()) {
942 drop_categories.push_back(Json::Object{
943 {"category", category.name},
944 {"requests_per_million", category.parts_per_million},
945 });
946 }
947 }
948 const auto lrs_key = discovery_mechanisms_[discovery_index]
949 .discovery_mechanism->GetLrsClusterKey();
950 Json::Object xds_cluster_impl_config = {
951 {"clusterName", std::string(lrs_key.first)},
952 {"childPolicy", std::move(child_policy)},
953 {"dropCategories", std::move(drop_categories)},
954 {"maxConcurrentRequests",
955 config_->discovery_mechanisms()[discovery_index]
956 .max_concurrent_requests},
957 };
958 if (!lrs_key.second.empty()) {
959 xds_cluster_impl_config["edsServiceName"] = std::string(lrs_key.second);
960 }
961 if (config_->discovery_mechanisms()[discovery_index]
962 .lrs_load_reporting_server_name.has_value()) {
963 xds_cluster_impl_config["lrsLoadReportingServerName"] =
964 config_->discovery_mechanisms()[discovery_index]
965 .lrs_load_reporting_server_name.value();
966 }
967 Json locality_picking_policy = Json::Array{Json::Object{
968 {"xds_cluster_impl_experimental", std::move(xds_cluster_impl_config)},
969 }};
970 // Add priority entry.
971 const size_t child_number = priority_child_numbers_[priority];
972 std::string child_name = absl::StrCat("child", child_number);
973 priority_priorities.emplace_back(child_name);
974 Json::Object child_config = {
975 {"config", std::move(locality_picking_policy)},
976 };
977 if (discovery_mechanisms_[discovery_index]
978 .discovery_mechanism->disable_reresolution()) {
979 child_config["ignore_reresolution_requests"] = true;
980 }
981 priority_children[child_name] = std::move(child_config);
982 // Each priority in the priority_list_ should correspond to a priority in a
983 // discovery mechanism in discovery_mechanisms_ (both in the same order).
984 // Keeping track of the discovery_mechanism each priority belongs to.
985 --num_priorities_remaining_in_discovery;
986 while (num_priorities_remaining_in_discovery == 0 &&
987 discovery_index < discovery_mechanisms_.size() - 1) {
988 ++discovery_index;
989 num_priorities_remaining_in_discovery =
990 discovery_mechanisms_[discovery_index].num_priorities;
991 }
992 }
993 // There should be matching number of priorities in discovery_mechanisms_ and
994 // in priority_list_; therefore at the end of looping through all the
995 // priorities, num_priorities_remaining should be down to 0, and index should
996 // be the last index in discovery_mechanisms_.
997 GPR_ASSERT(num_priorities_remaining_in_discovery == 0);
998 GPR_ASSERT(discovery_index == discovery_mechanisms_.size() - 1);
999 Json json = Json::Array{Json::Object{
1000 {"priority_experimental",
1001 Json::Object{
1002 {"children", std::move(priority_children)},
1003 {"priorities", std::move(priority_priorities)},
1004 }},
1005 }};
1006 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) {
1007 std::string json_str = json.Dump(/*indent=*/1);
1008 gpr_log(
1009 GPR_INFO,
1010 "[xds_cluster_resolver_lb %p] generated config for child policy: %s",
1011 this, json_str.c_str());
1012 }
1013 grpc_error* error = GRPC_ERROR_NONE;
1014 RefCountedPtr<LoadBalancingPolicy::Config> config =
1015 LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(json, &error);
1016 if (error != GRPC_ERROR_NONE) {
1017 // This should never happen, but if it does, we basically have no
1018 // way to fix it, so we put the channel in TRANSIENT_FAILURE.
1019 gpr_log(GPR_ERROR,
1020 "[xds_cluster_resolver_lb %p] error parsing generated child policy "
1021 "config -- "
1022 "will put channel in TRANSIENT_FAILURE: %s",
1023 this, grpc_error_string(error));
1024 error = grpc_error_set_int(
1025 grpc_error_add_child(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1026 "xds_cluster_resolver LB policy: error "
1027 "parsing generated child policy config"),
1028 error),
1029 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL);
1030 channel_control_helper()->UpdateState(
1031 GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error),
1032 absl::make_unique<TransientFailurePicker>(error));
1033 return nullptr;
1034 }
1035 return config;
1036 }
1037
UpdateChildPolicyLocked()1038 void XdsClusterResolverLb::UpdateChildPolicyLocked() {
1039 if (shutting_down_) return;
1040 UpdateArgs update_args;
1041 update_args.config = CreateChildPolicyConfigLocked();
1042 if (update_args.config == nullptr) return;
1043 update_args.addresses = CreateChildPolicyAddressesLocked();
1044 update_args.args = CreateChildPolicyArgsLocked(args_);
1045 if (child_policy_ == nullptr) {
1046 child_policy_ = CreateChildPolicyLocked(update_args.args);
1047 }
1048 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) {
1049 gpr_log(GPR_INFO, "[xds_cluster_resolver_lb %p] Updating child policy %p",
1050 this, child_policy_.get());
1051 }
1052 child_policy_->UpdateLocked(std::move(update_args));
1053 }
1054
// Returns a new copy of `args` for the child policy, with
// GRPC_ARG_INHIBIT_HEALTH_CHECKING set to 1. Client-side health checking is
// inhibited because the balancer does this for us.
grpc_channel_args* XdsClusterResolverLb::CreateChildPolicyArgsLocked(
    const grpc_channel_args* args) {
  grpc_arg args_to_add[] = {grpc_channel_arg_integer_create(
      const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1)};
  return grpc_channel_args_copy_and_add(args, args_to_add, 1);
}
1062
1063 OrphanablePtr<LoadBalancingPolicy>
CreateChildPolicyLocked(const grpc_channel_args * args)1064 XdsClusterResolverLb::CreateChildPolicyLocked(const grpc_channel_args* args) {
1065 LoadBalancingPolicy::Args lb_policy_args;
1066 lb_policy_args.work_serializer = work_serializer();
1067 lb_policy_args.args = args;
1068 lb_policy_args.channel_control_helper =
1069 absl::make_unique<Helper>(Ref(DEBUG_LOCATION, "Helper"));
1070 OrphanablePtr<LoadBalancingPolicy> lb_policy =
1071 LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
1072 "priority_experimental", std::move(lb_policy_args));
1073 if (GPR_UNLIKELY(lb_policy == nullptr)) {
1074 gpr_log(GPR_ERROR,
1075 "[xds_cluster_resolver_lb %p] failure creating child policy", this);
1076 return nullptr;
1077 }
1078 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) {
1079 gpr_log(GPR_INFO,
1080 "[xds_cluster_resolver_lb %p]: Created new child policy %p", this,
1081 lb_policy.get());
1082 }
1083 // Add our interested_parties pollset_set to that of the newly created
1084 // child policy. This will make the child policy progress upon activity on
1085 // this policy, which in turn is tied to the application's call.
1086 grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
1087 interested_parties());
1088 return lb_policy;
1089 }
1090
1091 //
1092 // factory
1093 //
1094
1095 class XdsClusterResolverLbFactory : public LoadBalancingPolicyFactory {
1096 public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const1097 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
1098 LoadBalancingPolicy::Args args) const override {
1099 grpc_error* error = GRPC_ERROR_NONE;
1100 RefCountedPtr<XdsClient> xds_client = XdsClient::GetOrCreate(&error);
1101 if (error != GRPC_ERROR_NONE) {
1102 gpr_log(GPR_ERROR,
1103 "cannot get XdsClient to instantiate xds_cluster_resolver LB "
1104 "policy: %s",
1105 grpc_error_string(error));
1106 GRPC_ERROR_UNREF(error);
1107 return nullptr;
1108 }
1109 return MakeOrphanable<XdsClusterResolverChildHandler>(std::move(xds_client),
1110 std::move(args));
1111 }
1112
name() const1113 const char* name() const override { return kXdsClusterResolver; }
1114
ParseLoadBalancingConfig(const Json & json,grpc_error ** error) const1115 RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
1116 const Json& json, grpc_error** error) const override {
1117 GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
1118 if (json.type() == Json::Type::JSON_NULL) {
1119 // xds_cluster_resolver was mentioned as a policy in the deprecated
1120 // loadBalancingPolicy field or in the client API.
1121 *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1122 "field:loadBalancingPolicy error:xds_cluster_resolver policy "
1123 "requires configuration. "
1124 "Please use loadBalancingConfig field of service config instead.");
1125 return nullptr;
1126 }
1127 std::vector<grpc_error*> error_list;
1128 std::vector<XdsClusterResolverLbConfig::DiscoveryMechanism>
1129 discovery_mechanisms;
1130 auto it = json.object_value().find("discoveryMechanisms");
1131 if (it == json.object_value().end()) {
1132 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1133 "field:discoveryMechanisms error:required field missing"));
1134 } else if (it->second.type() != Json::Type::ARRAY) {
1135 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1136 "field:discoveryMechanisms error:type should be array"));
1137 } else {
1138 const Json::Array& array = it->second.array_value();
1139 for (size_t i = 0; i < array.size(); ++i) {
1140 XdsClusterResolverLbConfig::DiscoveryMechanism discovery_mechanism;
1141 std::vector<grpc_error*> discovery_mechanism_errors =
1142 ParseDiscoveryMechanism(array[i], &discovery_mechanism);
1143 if (!discovery_mechanism_errors.empty()) {
1144 grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(
1145 absl::StrCat("field:discovery_mechanism element: ", i, " error")
1146 .c_str());
1147 for (grpc_error* discovery_mechanism_error :
1148 discovery_mechanism_errors) {
1149 error = grpc_error_add_child(error, discovery_mechanism_error);
1150 }
1151 error_list.push_back(error);
1152 }
1153 discovery_mechanisms.emplace_back(std::move(discovery_mechanism));
1154 }
1155 }
1156 if (discovery_mechanisms.empty()) {
1157 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1158 "field:discovery_mechanism error:list is missing or empty"));
1159 }
1160 Json xds_lb_policy = Json::Object{
1161 {"ROUND_ROBIN", Json::Object()},
1162 };
1163 it = json.object_value().find("xdsLbPolicy");
1164 if (it != json.object_value().end()) {
1165 if (it->second.type() != Json::Type::ARRAY) {
1166 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1167 "field:xdsLbPolicy error:type should be array"));
1168 } else {
1169 const Json::Array& array = it->second.array_value();
1170 for (size_t i = 0; i < array.size(); ++i) {
1171 if (array[i].type() != Json::Type::OBJECT) {
1172 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1173 "field:xdsLbPolicy error:element should be of type object"));
1174 continue;
1175 }
1176 const Json::Object& policy = array[i].object_value();
1177 auto policy_it = policy.find("ROUND_ROBIN");
1178 if (policy_it != policy.end()) {
1179 if (policy_it->second.type() != Json::Type::OBJECT) {
1180 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1181 "field:ROUND_ROBIN error:type should be object"));
1182 }
1183 break;
1184 }
1185 policy_it = policy.find("RING_HASH");
1186 if (policy_it != policy.end()) {
1187 if (policy_it->second.type() != Json::Type::OBJECT) {
1188 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1189 "field:RING_HASH error:type should be object"));
1190 continue;
1191 }
1192 // TODO(donnadionne): Move this to a method in
1193 // ring_hash_experimental and call it here.
1194 const Json::Object& ring_hash = policy_it->second.object_value();
1195 xds_lb_policy = array[i];
1196 size_t min_ring_size = 1024;
1197 size_t max_ring_size = 8388608;
1198 auto ring_hash_it = ring_hash.find("min_ring_size");
1199 if (ring_hash_it == ring_hash.end()) {
1200 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1201 "field:min_ring_size missing"));
1202 } else if (ring_hash_it->second.type() != Json::Type::NUMBER) {
1203 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1204 "field:min_ring_size error: should be of "
1205 "number"));
1206 } else {
1207 min_ring_size = gpr_parse_nonnegative_int(
1208 ring_hash_it->second.string_value().c_str());
1209 }
1210 ring_hash_it = ring_hash.find("max_ring_size");
1211 if (ring_hash_it == ring_hash.end()) {
1212 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1213 "field:max_ring_size missing"));
1214 } else if (ring_hash_it->second.type() != Json::Type::NUMBER) {
1215 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1216 "field:max_ring_size error: should be of "
1217 "number"));
1218 } else {
1219 max_ring_size = gpr_parse_nonnegative_int(
1220 ring_hash_it->second.string_value().c_str());
1221 }
1222 if (min_ring_size <= 0 || min_ring_size > 8388608 ||
1223 max_ring_size <= 0 || max_ring_size > 8388608 ||
1224 min_ring_size > max_ring_size) {
1225 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1226 "field:max_ring_size and or min_ring_size error: "
1227 "values need to be in the range of 1 to 8388608 "
1228 "and max_ring_size cannot be smaller than "
1229 "min_ring_size"));
1230 }
1231 ring_hash_it = ring_hash.find("hash_function");
1232 if (ring_hash_it == ring_hash.end()) {
1233 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1234 "field:hash_function missing"));
1235 } else if (ring_hash_it->second.type() != Json::Type::STRING) {
1236 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1237 "field:hash_function error: should be a "
1238 "string"));
1239 } else if (ring_hash_it->second.string_value() != "XX_HASH" &&
1240 ring_hash_it->second.string_value() != "MURMUR_HASH_2") {
1241 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1242 "field:hash_function error: unsupported "
1243 "hash_function"));
1244 }
1245 break;
1246 }
1247 }
1248 }
1249 }
1250 // Construct config.
1251 if (error_list.empty()) {
1252 return MakeRefCounted<XdsClusterResolverLbConfig>(
1253 std::move(discovery_mechanisms), std::move(xds_lb_policy));
1254 } else {
1255 *error = GRPC_ERROR_CREATE_FROM_VECTOR(
1256 "xds_cluster_resolver_experimental LB policy config", &error_list);
1257 return nullptr;
1258 }
1259 }
1260
1261 private:
ParseDiscoveryMechanism(const Json & json,XdsClusterResolverLbConfig::DiscoveryMechanism * discovery_mechanism)1262 static std::vector<grpc_error*> ParseDiscoveryMechanism(
1263 const Json& json,
1264 XdsClusterResolverLbConfig::DiscoveryMechanism* discovery_mechanism) {
1265 std::vector<grpc_error*> error_list;
1266 if (json.type() != Json::Type::OBJECT) {
1267 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1268 "value should be of type object"));
1269 return error_list;
1270 }
1271 // Cluster name.
1272 auto it = json.object_value().find("clusterName");
1273 if (it == json.object_value().end()) {
1274 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1275 "field:clusterName error:required field missing"));
1276 } else if (it->second.type() != Json::Type::STRING) {
1277 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1278 "field:clusterName error:type should be string"));
1279 } else {
1280 discovery_mechanism->cluster_name = it->second.string_value();
1281 }
1282 // LRS load reporting server name.
1283 it = json.object_value().find("lrsLoadReportingServerName");
1284 if (it != json.object_value().end()) {
1285 if (it->second.type() != Json::Type::STRING) {
1286 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1287 "field:lrsLoadReportingServerName error:type should be string"));
1288 } else {
1289 discovery_mechanism->lrs_load_reporting_server_name.emplace(
1290 it->second.string_value());
1291 }
1292 }
1293 // Max concurrent requests.
1294 discovery_mechanism->max_concurrent_requests = 1024;
1295 it = json.object_value().find("max_concurrent_requests");
1296 if (it != json.object_value().end()) {
1297 if (it->second.type() != Json::Type::NUMBER) {
1298 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1299 "field:max_concurrent_requests error:must be of type number"));
1300 } else {
1301 discovery_mechanism->max_concurrent_requests =
1302 gpr_parse_nonnegative_int(it->second.string_value().c_str());
1303 }
1304 }
1305 // Discovery Mechanism type
1306 it = json.object_value().find("type");
1307 if (it == json.object_value().end()) {
1308 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1309 "field:type error:required field missing"));
1310 } else if (it->second.type() != Json::Type::STRING) {
1311 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1312 "field:type error:type should be string"));
1313 } else {
1314 if (it->second.string_value() == "EDS") {
1315 discovery_mechanism->type = XdsClusterResolverLbConfig::
1316 DiscoveryMechanism::DiscoveryMechanismType::EDS;
1317 } else if (it->second.string_value() == "LOGICAL_DNS") {
1318 discovery_mechanism->type = XdsClusterResolverLbConfig::
1319 DiscoveryMechanism::DiscoveryMechanismType::LOGICAL_DNS;
1320 } else {
1321 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1322 "field:type error:invalid type"));
1323 }
1324 }
1325 // EDS service name.
1326 it = json.object_value().find("edsServiceName");
1327 if (it != json.object_value().end()) {
1328 if (it->second.type() != Json::Type::STRING) {
1329 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1330 "field:xds_cluster_resolverServiceName error:type should be "
1331 "string"));
1332 } else {
1333 discovery_mechanism->eds_service_name = it->second.string_value();
1334 }
1335 }
1336 return error_list;
1337 }
1338
1339 class XdsClusterResolverChildHandler : public ChildPolicyHandler {
1340 public:
XdsClusterResolverChildHandler(RefCountedPtr<XdsClient> xds_client,Args args)1341 XdsClusterResolverChildHandler(RefCountedPtr<XdsClient> xds_client,
1342 Args args)
1343 : ChildPolicyHandler(std::move(args),
1344 &grpc_lb_xds_cluster_resolver_trace),
1345 xds_client_(std::move(xds_client)) {}
1346
ConfigChangeRequiresNewPolicyInstance(LoadBalancingPolicy::Config * old_config,LoadBalancingPolicy::Config * new_config) const1347 bool ConfigChangeRequiresNewPolicyInstance(
1348 LoadBalancingPolicy::Config* old_config,
1349 LoadBalancingPolicy::Config* new_config) const override {
1350 GPR_ASSERT(old_config->name() == kXdsClusterResolver);
1351 GPR_ASSERT(new_config->name() == kXdsClusterResolver);
1352 XdsClusterResolverLbConfig* old_xds_cluster_resolver_config =
1353 static_cast<XdsClusterResolverLbConfig*>(old_config);
1354 XdsClusterResolverLbConfig* new_xds_cluster_resolver_config =
1355 static_cast<XdsClusterResolverLbConfig*>(new_config);
1356 return old_xds_cluster_resolver_config->discovery_mechanisms() !=
1357 new_xds_cluster_resolver_config->discovery_mechanisms();
1358 }
1359
CreateLoadBalancingPolicy(const char *,LoadBalancingPolicy::Args args) const1360 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
1361 const char* /*name*/, LoadBalancingPolicy::Args args) const override {
1362 return MakeOrphanable<XdsClusterResolverLb>(xds_client_, std::move(args));
1363 }
1364
1365 private:
1366 RefCountedPtr<XdsClient> xds_client_;
1367 };
1368 };
1369
1370 } // namespace
1371
1372 } // namespace grpc_core
1373
1374 //
1375 // Plugin registration
1376 //
1377
grpc_lb_policy_xds_cluster_resolver_init()1378 void grpc_lb_policy_xds_cluster_resolver_init() {
1379 grpc_core::LoadBalancingPolicyRegistry::Builder::
1380 RegisterLoadBalancingPolicyFactory(
1381 absl::make_unique<grpc_core::XdsClusterResolverLbFactory>());
1382 }
1383
// Plugin shutdown hook. It intentionally does nothing: the only state set up
// by grpc_lb_policy_xds_cluster_resolver_init() is the factory handed to the
// LB policy registry.
void grpc_lb_policy_xds_cluster_resolver_shutdown() {
  // Nothing to release.
}
1385