• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include <inttypes.h>
20 #include <limits.h>
21 
22 #include "absl/strings/str_cat.h"
23 #include "absl/types/optional.h"
24 
25 #include <grpc/grpc.h>
26 
27 #include "src/core/ext/filters/client_channel/client_channel.h"
28 #include "src/core/ext/filters/client_channel/lb_policy.h"
29 #include "src/core/ext/filters/client_channel/lb_policy/address_filtering.h"
30 #include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
31 #include "src/core/ext/filters/client_channel/lb_policy/xds/xds.h"
32 #include "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h"
33 #include "src/core/ext/filters/client_channel/lb_policy_factory.h"
34 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
35 #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
36 #include "src/core/ext/filters/client_channel/resolver_registry.h"
37 #include "src/core/ext/filters/client_channel/server_address.h"
38 #include "src/core/ext/xds/xds_channel_args.h"
39 #include "src/core/ext/xds/xds_client.h"
40 #include "src/core/ext/xds/xds_client_stats.h"
41 #include "src/core/lib/channel/channel_args.h"
42 #include "src/core/lib/gpr/string.h"
43 #include "src/core/lib/gprpp/orphanable.h"
44 #include "src/core/lib/gprpp/ref_counted_ptr.h"
45 #include "src/core/lib/iomgr/work_serializer.h"
46 #include "src/core/lib/transport/error_utils.h"
47 #include "src/core/lib/uri/uri_parser.h"
48 
// Default fallback timeout value (10000; presumably milliseconds — TODO
// confirm). NOTE(review): this macro is not referenced in the visible portion
// of this file; verify it is still used before keeping it.
#define GRPC_EDS_DEFAULT_FALLBACK_TIMEOUT 10000
50 
51 namespace grpc_core {
52 
53 TraceFlag grpc_lb_xds_cluster_resolver_trace(false, "xds_cluster_resolver_lb");
54 
// Attribute key under which the xDS locality name is stored.
const char* kXdsLocalityNameAttributeKey{"xds_locality_name"};
56 
57 namespace {
58 
// Registered name of this LB policy; also returned by name() on both the
// policy and its config.
constexpr char kXdsClusterResolver[]{"xds_cluster_resolver_experimental"};
60 
61 // Config for EDS LB policy.
62 class XdsClusterResolverLbConfig : public LoadBalancingPolicy::Config {
63  public:
64   struct DiscoveryMechanism {
65     std::string cluster_name;
66     absl::optional<std::string> lrs_load_reporting_server_name;
67     uint32_t max_concurrent_requests;
68     enum DiscoveryMechanismType {
69       EDS,
70       LOGICAL_DNS,
71     };
72     DiscoveryMechanismType type;
73     std::string eds_service_name;
74 
operator ==grpc_core::__anon9699b2f90111::XdsClusterResolverLbConfig::DiscoveryMechanism75     bool operator==(const DiscoveryMechanism& other) const {
76       return (cluster_name == other.cluster_name &&
77               lrs_load_reporting_server_name ==
78                   other.lrs_load_reporting_server_name &&
79               max_concurrent_requests == other.max_concurrent_requests &&
80               type == other.type && eds_service_name == other.eds_service_name);
81     }
82   };
83 
XdsClusterResolverLbConfig(std::vector<DiscoveryMechanism> discovery_mechanisms,Json xds_lb_policy)84   XdsClusterResolverLbConfig(
85       std::vector<DiscoveryMechanism> discovery_mechanisms, Json xds_lb_policy)
86       : discovery_mechanisms_(std::move(discovery_mechanisms)),
87         xds_lb_policy_(std::move(xds_lb_policy)) {}
88 
name() const89   const char* name() const override { return kXdsClusterResolver; }
discovery_mechanisms() const90   const std::vector<DiscoveryMechanism>& discovery_mechanisms() const {
91     return discovery_mechanisms_;
92   }
93 
xds_lb_policy() const94   const Json& xds_lb_policy() const { return xds_lb_policy_; }
95 
96  private:
97   std::vector<DiscoveryMechanism> discovery_mechanisms_;
98   Json xds_lb_policy_;
99 };
100 
101 // Xds Cluster Resolver LB policy.
102 class XdsClusterResolverLb : public LoadBalancingPolicy {
103  public:
104   XdsClusterResolverLb(RefCountedPtr<XdsClient> xds_client, Args args);
105 
name() const106   const char* name() const override { return kXdsClusterResolver; }
107 
108   void UpdateLocked(UpdateArgs args) override;
109   void ResetBackoffLocked() override;
110   void ExitIdleLocked() override;
111 
112  private:
113   // Discovery Mechanism Base class
114   //
115   // Implemented by EDS and LOGICAL_DNS.
116   //
117   // Implementations are responsible for calling the LB policy's
118   // OnEndpointChanged(), OnError(), and OnResourceDoesNotExist()
119   // methods when the corresponding events occur.
120   //
121   // Must implement Orphan() method to cancel the watchers.
122   class DiscoveryMechanism : public InternallyRefCounted<DiscoveryMechanism> {
123    public:
DiscoveryMechanism(RefCountedPtr<XdsClusterResolverLb> xds_cluster_resolver_lb,size_t index)124     DiscoveryMechanism(
125         RefCountedPtr<XdsClusterResolverLb> xds_cluster_resolver_lb,
126         size_t index)
127         : parent_(std::move(xds_cluster_resolver_lb)), index_(index) {}
128     virtual void Start() = 0;
129     void Orphan() override = 0;
130     virtual Json::Array override_child_policy() = 0;
131     virtual bool disable_reresolution() = 0;
132 
133     // Caller must ensure that config_ is set before calling.
GetXdsClusterResolverResourceName() const134     const absl::string_view GetXdsClusterResolverResourceName() const {
135       if (!parent_->is_xds_uri_) return parent_->server_name_;
136       if (!parent_->config_->discovery_mechanisms()[index_]
137                .eds_service_name.empty()) {
138         return parent_->config_->discovery_mechanisms()[index_]
139             .eds_service_name;
140       }
141       return parent_->config_->discovery_mechanisms()[index_].cluster_name;
142     }
143 
144     // Returns a pair containing the cluster and eds_service_name
145     // to use for LRS load reporting. Caller must ensure that config_ is set
146     // before calling.
GetLrsClusterKey() const147     std::pair<absl::string_view, absl::string_view> GetLrsClusterKey() const {
148       if (!parent_->is_xds_uri_) return {parent_->server_name_, nullptr};
149       return {
150           parent_->config_->discovery_mechanisms()[index_].cluster_name,
151           parent_->config_->discovery_mechanisms()[index_].eds_service_name};
152     }
153 
154    protected:
parent() const155     XdsClusterResolverLb* parent() const { return parent_.get(); }
index() const156     size_t index() const { return index_; }
157 
158    private:
159     RefCountedPtr<XdsClusterResolverLb> parent_;
160     // Stores its own index in the vector of DiscoveryMechanism.
161     size_t index_;
162   };
163 
164   class EdsDiscoveryMechanism : public DiscoveryMechanism {
165    public:
EdsDiscoveryMechanism(RefCountedPtr<XdsClusterResolverLb> xds_cluster_resolver_lb,size_t index)166     EdsDiscoveryMechanism(
167         RefCountedPtr<XdsClusterResolverLb> xds_cluster_resolver_lb,
168         size_t index)
169         : DiscoveryMechanism(std::move(xds_cluster_resolver_lb), index) {}
170     void Start() override;
171     void Orphan() override;
override_child_policy()172     Json::Array override_child_policy() override { return Json::Array{}; }
disable_reresolution()173     bool disable_reresolution() override { return true; }
174 
175    private:
176     class EndpointWatcher : public XdsClient::EndpointWatcherInterface {
177      public:
EndpointWatcher(RefCountedPtr<EdsDiscoveryMechanism> discovery_mechanism)178       explicit EndpointWatcher(
179           RefCountedPtr<EdsDiscoveryMechanism> discovery_mechanism)
180           : discovery_mechanism_(std::move(discovery_mechanism)) {}
~EndpointWatcher()181       ~EndpointWatcher() override {
182         discovery_mechanism_.reset(DEBUG_LOCATION, "EndpointWatcher");
183       }
OnEndpointChanged(XdsApi::EdsUpdate update)184       void OnEndpointChanged(XdsApi::EdsUpdate update) override {
185         new Notifier(discovery_mechanism_, std::move(update));
186       }
OnError(grpc_error * error)187       void OnError(grpc_error* error) override {
188         new Notifier(discovery_mechanism_, error);
189       }
OnResourceDoesNotExist()190       void OnResourceDoesNotExist() override {
191         new Notifier(discovery_mechanism_);
192       }
193 
194      private:
195       class Notifier {
196        public:
197         Notifier(RefCountedPtr<EdsDiscoveryMechanism> discovery_mechanism,
198                  XdsApi::EdsUpdate update);
199         Notifier(RefCountedPtr<EdsDiscoveryMechanism> discovery_mechanism,
200                  grpc_error* error);
201         explicit Notifier(
202             RefCountedPtr<EdsDiscoveryMechanism> discovery_mechanism);
~Notifier()203         ~Notifier() { discovery_mechanism_.reset(DEBUG_LOCATION, "Notifier"); }
204 
205        private:
206         enum Type { kUpdate, kError, kDoesNotExist };
207 
208         static void RunInExecCtx(void* arg, grpc_error* error);
209         void RunInWorkSerializer(grpc_error* error);
210 
211         RefCountedPtr<EdsDiscoveryMechanism> discovery_mechanism_;
212         grpc_closure closure_;
213         XdsApi::EdsUpdate update_;
214         Type type_;
215       };
216 
217       RefCountedPtr<EdsDiscoveryMechanism> discovery_mechanism_;
218     };
219 
220     // Note that this is not owned, so this pointer must never be dereferenced.
221     EndpointWatcher* watcher_ = nullptr;
222   };
223 
224   class LogicalDNSDiscoveryMechanism : public DiscoveryMechanism {
225    public:
LogicalDNSDiscoveryMechanism(RefCountedPtr<XdsClusterResolverLb> xds_cluster_resolver_lb,size_t index)226     LogicalDNSDiscoveryMechanism(
227         RefCountedPtr<XdsClusterResolverLb> xds_cluster_resolver_lb,
228         size_t index)
229         : DiscoveryMechanism(std::move(xds_cluster_resolver_lb), index) {}
230     void Start() override;
231     void Orphan() override;
override_child_policy()232     Json::Array override_child_policy() override {
233       return Json::Array{
234           Json::Object{
235               {"pick_first", Json::Object()},
236           },
237       };
238     }
disable_reresolution()239     bool disable_reresolution() override { return false; };
240 
241    private:
242     class ResolverResultHandler : public Resolver::ResultHandler {
243      public:
ResolverResultHandler(RefCountedPtr<LogicalDNSDiscoveryMechanism> discovery_mechanism)244       explicit ResolverResultHandler(
245           RefCountedPtr<LogicalDNSDiscoveryMechanism> discovery_mechanism)
246           : discovery_mechanism_(std::move(discovery_mechanism)) {}
247 
~ResolverResultHandler()248       ~ResolverResultHandler() override {}
249 
250       void ReturnResult(Resolver::Result result) override;
251 
252       void ReturnError(grpc_error* error) override;
253 
254      private:
255       RefCountedPtr<LogicalDNSDiscoveryMechanism> discovery_mechanism_;
256     };
257     // This is only necessary because of a bug in msvc where nested class cannot
258     // access protected member in base class.
259     friend class ResolverResultHandler;
260     OrphanablePtr<Resolver> resolver_;
261   };
262 
263   struct DiscoveryMechanismEntry {
264     OrphanablePtr<DiscoveryMechanism> discovery_mechanism;
265     bool first_update_received = false;
266     // Number of priorities this mechanism has contributed to priority_list_.
267     // (The sum of this across all discovery mechanisms should always equal
268     // the number of priorities in priority_list_.)
269     uint32_t num_priorities = 0;
270     RefCountedPtr<XdsApi::EdsUpdate::DropConfig> drop_config;
271     // Populated only when an update has been delivered by the mechanism
272     // but has not yet been applied to the LB policy's combined priority_list_.
273     absl::optional<XdsApi::EdsUpdate::PriorityList> pending_priority_list;
274   };
275 
276   class Helper : public ChannelControlHelper {
277    public:
Helper(RefCountedPtr<XdsClusterResolverLb> xds_cluster_resolver_policy)278     explicit Helper(
279         RefCountedPtr<XdsClusterResolverLb> xds_cluster_resolver_policy)
280         : xds_cluster_resolver_policy_(std::move(xds_cluster_resolver_policy)) {
281     }
282 
~Helper()283     ~Helper() override {
284       xds_cluster_resolver_policy_.reset(DEBUG_LOCATION, "Helper");
285     }
286 
287     RefCountedPtr<SubchannelInterface> CreateSubchannel(
288         ServerAddress address, const grpc_channel_args& args) override;
289     void UpdateState(grpc_connectivity_state state, const absl::Status& status,
290                      std::unique_ptr<SubchannelPicker> picker) override;
291     // This is a no-op, because we get the addresses from the xds
292     // client, which is a watch-based API.
RequestReresolution()293     void RequestReresolution() override {}
294     void AddTraceEvent(TraceSeverity severity,
295                        absl::string_view message) override;
296 
297    private:
298     RefCountedPtr<XdsClusterResolverLb> xds_cluster_resolver_policy_;
299   };
300 
301   ~XdsClusterResolverLb() override;
302 
303   void ShutdownLocked() override;
304 
305   void OnEndpointChanged(size_t index, XdsApi::EdsUpdate update);
306   void OnError(size_t index, grpc_error* error);
307   void OnResourceDoesNotExist(size_t index);
308 
309   void MaybeDestroyChildPolicyLocked();
310 
311   void UpdatePriorityList(XdsApi::EdsUpdate::PriorityList priority_list);
312   void UpdateChildPolicyLocked();
313   OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
314       const grpc_channel_args* args);
315   ServerAddressList CreateChildPolicyAddressesLocked();
316   RefCountedPtr<Config> CreateChildPolicyConfigLocked();
317   grpc_channel_args* CreateChildPolicyArgsLocked(
318       const grpc_channel_args* args_in);
319 
320   // Server name from target URI.
321   std::string server_name_;
322   bool is_xds_uri_;
323 
324   // Current channel args and config from the resolver.
325   const grpc_channel_args* args_ = nullptr;
326   RefCountedPtr<XdsClusterResolverLbConfig> config_;
327 
328   // Internal state.
329   bool shutting_down_ = false;
330 
331   // The xds client and endpoint watcher.
332   RefCountedPtr<XdsClient> xds_client_;
333 
334   // Vector of discovery mechansism entries in priority order.
335   std::vector<DiscoveryMechanismEntry> discovery_mechanisms_;
336 
337   // The latest data from the endpoint watcher.
338   XdsApi::EdsUpdate::PriorityList priority_list_;
339   // State used to retain child policy names for priority policy.
340   std::vector<size_t /*child_number*/> priority_child_numbers_;
341 
342   OrphanablePtr<LoadBalancingPolicy> child_policy_;
343 };
344 
345 //
346 // XdsClusterResolverLb::Helper
347 //
348 
349 RefCountedPtr<SubchannelInterface>
CreateSubchannel(ServerAddress address,const grpc_channel_args & args)350 XdsClusterResolverLb::Helper::CreateSubchannel(ServerAddress address,
351                                                const grpc_channel_args& args) {
352   if (xds_cluster_resolver_policy_->shutting_down_) return nullptr;
353   return xds_cluster_resolver_policy_->channel_control_helper()
354       ->CreateSubchannel(std::move(address), args);
355 }
356 
UpdateState(grpc_connectivity_state state,const absl::Status & status,std::unique_ptr<SubchannelPicker> picker)357 void XdsClusterResolverLb::Helper::UpdateState(
358     grpc_connectivity_state state, const absl::Status& status,
359     std::unique_ptr<SubchannelPicker> picker) {
360   if (xds_cluster_resolver_policy_->shutting_down_ ||
361       xds_cluster_resolver_policy_->child_policy_ == nullptr) {
362     return;
363   }
364   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) {
365     gpr_log(GPR_INFO,
366             "[xds_cluster_resolver_lb %p] child policy updated state=%s (%s) "
367             "picker=%p",
368             xds_cluster_resolver_policy_.get(), ConnectivityStateName(state),
369             status.ToString().c_str(), picker.get());
370   }
371   xds_cluster_resolver_policy_->channel_control_helper()->UpdateState(
372       state, status, std::move(picker));
373 }
374 
AddTraceEvent(TraceSeverity severity,absl::string_view message)375 void XdsClusterResolverLb::Helper::AddTraceEvent(TraceSeverity severity,
376                                                  absl::string_view message) {
377   if (xds_cluster_resolver_policy_->shutting_down_) return;
378   xds_cluster_resolver_policy_->channel_control_helper()->AddTraceEvent(
379       severity, message);
380 }
381 
382 //
383 // XdsClusterResolverLb::EdsDiscoveryMechanism
384 //
385 
Start()386 void XdsClusterResolverLb::EdsDiscoveryMechanism::Start() {
387   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) {
388     gpr_log(GPR_INFO,
389             "[xds_cluster_resolver_lb %p] eds discovery mechanism %" PRIuPTR
390             ":%p starting xds watch for %s",
391             parent(), index(), this,
392             std::string(GetXdsClusterResolverResourceName()).c_str());
393   }
394   auto watcher = absl::make_unique<EndpointWatcher>(
395       Ref(DEBUG_LOCATION, "EdsDiscoveryMechanism"));
396   watcher_ = watcher.get();
397   parent()->xds_client_->WatchEndpointData(GetXdsClusterResolverResourceName(),
398                                            std::move(watcher));
399 }
400 
Orphan()401 void XdsClusterResolverLb::EdsDiscoveryMechanism::Orphan() {
402   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) {
403     gpr_log(GPR_INFO,
404             "[xds_cluster_resolver_lb %p] eds discovery mechanism %" PRIuPTR
405             ":%p cancelling xds watch for %s",
406             parent(), index(), this,
407             std::string(GetXdsClusterResolverResourceName()).c_str());
408   }
409   parent()->xds_client_->CancelEndpointDataWatch(
410       GetXdsClusterResolverResourceName(), watcher_);
411   Unref();
412 }
413 
414 //
// XdsClusterResolverLb::EdsDiscoveryMechanism::EndpointWatcher::Notifier
416 //
417 
418 XdsClusterResolverLb::EdsDiscoveryMechanism::EndpointWatcher::Notifier::
Notifier(RefCountedPtr<XdsClusterResolverLb::EdsDiscoveryMechanism> discovery_mechanism,XdsApi::EdsUpdate update)419     Notifier(RefCountedPtr<XdsClusterResolverLb::EdsDiscoveryMechanism>
420                  discovery_mechanism,
421              XdsApi::EdsUpdate update)
422     : discovery_mechanism_(std::move(discovery_mechanism)),
423       update_(std::move(update)),
424       type_(kUpdate) {
425   GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
426   ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
427 }
428 
429 XdsClusterResolverLb::EdsDiscoveryMechanism::EndpointWatcher::Notifier::
Notifier(RefCountedPtr<XdsClusterResolverLb::EdsDiscoveryMechanism> discovery_mechanism,grpc_error * error)430     Notifier(RefCountedPtr<XdsClusterResolverLb::EdsDiscoveryMechanism>
431                  discovery_mechanism,
432              grpc_error* error)
433     : discovery_mechanism_(std::move(discovery_mechanism)), type_(kError) {
434   GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
435   ExecCtx::Run(DEBUG_LOCATION, &closure_, error);
436 }
437 
438 XdsClusterResolverLb::EdsDiscoveryMechanism::EndpointWatcher::Notifier::
Notifier(RefCountedPtr<XdsClusterResolverLb::EdsDiscoveryMechanism> discovery_mechanism)439     Notifier(RefCountedPtr<XdsClusterResolverLb::EdsDiscoveryMechanism>
440                  discovery_mechanism)
441     : discovery_mechanism_(std::move(discovery_mechanism)),
442       type_(kDoesNotExist) {
443   GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
444   ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
445 }
446 
447 void XdsClusterResolverLb::EdsDiscoveryMechanism::EndpointWatcher::Notifier::
RunInExecCtx(void * arg,grpc_error * error)448     RunInExecCtx(void* arg, grpc_error* error) {
449   Notifier* self = static_cast<Notifier*>(arg);
450   GRPC_ERROR_REF(error);
451   self->discovery_mechanism_->parent()->work_serializer()->Run(
452       [self, error]() { self->RunInWorkSerializer(error); }, DEBUG_LOCATION);
453 }
454 
455 void XdsClusterResolverLb::EdsDiscoveryMechanism::EndpointWatcher::Notifier::
RunInWorkSerializer(grpc_error * error)456     RunInWorkSerializer(grpc_error* error) {
457   switch (type_) {
458     case kUpdate:
459       discovery_mechanism_->parent()->OnEndpointChanged(
460           discovery_mechanism_->index(), std::move(update_));
461       break;
462     case kError:
463       discovery_mechanism_->parent()->OnError(discovery_mechanism_->index(),
464                                               error);
465       break;
466     case kDoesNotExist:
467       discovery_mechanism_->parent()->OnResourceDoesNotExist(
468           discovery_mechanism_->index());
469       break;
470   };
471   delete this;
472 }
473 
474 //
475 // XdsClusterResolverLb::LogicalDNSDiscoveryMechanism
476 //
477 
Start()478 void XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::Start() {
479   std::string target = parent()->server_name_;
480   grpc_channel_args* args = nullptr;
481   FakeResolverResponseGenerator* fake_resolver_response_generator =
482       grpc_channel_args_find_pointer<FakeResolverResponseGenerator>(
483           parent()->args_,
484           GRPC_ARG_XDS_LOGICAL_DNS_CLUSTER_FAKE_RESOLVER_RESPONSE_GENERATOR);
485   if (fake_resolver_response_generator != nullptr) {
486     target = absl::StrCat("fake:", target);
487     grpc_arg new_arg = FakeResolverResponseGenerator::MakeChannelArg(
488         fake_resolver_response_generator);
489     args = grpc_channel_args_copy_and_add(parent()->args_, &new_arg, 1);
490   } else {
491     args = grpc_channel_args_copy(parent()->args_);
492   }
493   resolver_ = ResolverRegistry::CreateResolver(
494       target.c_str(), args, parent()->interested_parties(),
495       parent()->work_serializer(),
496       absl::make_unique<ResolverResultHandler>(
497           Ref(DEBUG_LOCATION, "LogicalDNSDiscoveryMechanism")));
498   grpc_channel_args_destroy(args);
499   if (resolver_ == nullptr) {
500     parent()->OnResourceDoesNotExist(index());
501     return;
502   }
503   resolver_->StartLocked();
504   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) {
505     gpr_log(GPR_INFO,
506             "[xds_cluster_resolver_lb %p] logical DNS discovery mechanism "
507             "%" PRIuPTR ":%p starting dns resolver %p",
508             parent(), index(), this, resolver_.get());
509   }
510 }
511 
Orphan()512 void XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::Orphan() {
513   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) {
514     gpr_log(
515         GPR_INFO,
516         "[xds_cluster_resolver_lb %p] logical DNS discovery mechanism %" PRIuPTR
517         ":%p shutting down dns resolver %p",
518         parent(), index(), this, resolver_.get());
519   }
520   resolver_.reset();
521   Unref();
522 }
523 
524 //
525 // XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::ResolverResultHandler
526 //
527 
528 void XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::ResolverResultHandler::
ReturnResult(Resolver::Result result)529     ReturnResult(Resolver::Result result) {
530   // convert result to eds update
531   XdsApi::EdsUpdate update;
532   XdsApi::EdsUpdate::Priority::Locality locality;
533   locality.name = MakeRefCounted<XdsLocalityName>("", "", "");
534   locality.lb_weight = 1;
535   locality.endpoints = std::move(result.addresses);
536   XdsApi::EdsUpdate::Priority priority;
537   priority.localities.emplace(locality.name.get(), std::move(locality));
538   update.priorities.emplace_back(std::move(priority));
539   discovery_mechanism_->parent()->OnEndpointChanged(
540       discovery_mechanism_->index(), std::move(update));
541 }
542 
543 void XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::ResolverResultHandler::
ReturnError(grpc_error * error)544     ReturnError(grpc_error* error) {
545   discovery_mechanism_->parent()->OnError(discovery_mechanism_->index(), error);
546 }
547 
548 //
549 // XdsClusterResolverLb public methods
550 //
551 
XdsClusterResolverLb(RefCountedPtr<XdsClient> xds_client,Args args)552 XdsClusterResolverLb::XdsClusterResolverLb(RefCountedPtr<XdsClient> xds_client,
553                                            Args args)
554     : LoadBalancingPolicy(std::move(args)), xds_client_(std::move(xds_client)) {
555   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) {
556     gpr_log(GPR_INFO,
557             "[xds_cluster_resolver_lb %p] created -- using xds client %p", this,
558             xds_client_.get());
559   }
560   // Record server name.
561   const char* server_uri =
562       grpc_channel_args_find_string(args.args, GRPC_ARG_SERVER_URI);
563   GPR_ASSERT(server_uri != nullptr);
564   absl::StatusOr<URI> uri = URI::Parse(server_uri);
565   GPR_ASSERT(uri.ok() && !uri->path().empty());
566   server_name_ = std::string(absl::StripPrefix(uri->path(), "/"));
567   is_xds_uri_ = uri->scheme() == "xds";
568   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) {
569     gpr_log(GPR_INFO,
570             "[xds_cluster_resolver_lb %p] server name from channel "
571             "(is_xds_uri=%d): %s",
572             this, is_xds_uri_, server_name_.c_str());
573   }
574   // EDS-only flow.
575   if (!is_xds_uri_) {
576     // Setup channelz linkage.
577     channelz::ChannelNode* parent_channelz_node =
578         grpc_channel_args_find_pointer<channelz::ChannelNode>(
579             args.args, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
580     if (parent_channelz_node != nullptr) {
581       xds_client_->AddChannelzLinkage(parent_channelz_node);
582     }
583     // Couple polling.
584     grpc_pollset_set_add_pollset_set(xds_client_->interested_parties(),
585                                      interested_parties());
586   }
587 }
588 
~XdsClusterResolverLb()589 XdsClusterResolverLb::~XdsClusterResolverLb() {
590   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) {
591     gpr_log(GPR_INFO,
592             "[xds_cluster_resolver_lb %p] destroying xds_cluster_resolver LB "
593             "policy",
594             this);
595   }
596 }
597 
ShutdownLocked()598 void XdsClusterResolverLb::ShutdownLocked() {
599   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) {
600     gpr_log(GPR_INFO, "[xds_cluster_resolver_lb %p] shutting down", this);
601   }
602   shutting_down_ = true;
603   MaybeDestroyChildPolicyLocked();
604   discovery_mechanisms_.clear();
605   if (!is_xds_uri_) {
606     // Remove channelz linkage.
607     channelz::ChannelNode* parent_channelz_node =
608         grpc_channel_args_find_pointer<channelz::ChannelNode>(
609             args_, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
610     if (parent_channelz_node != nullptr) {
611       xds_client_->RemoveChannelzLinkage(parent_channelz_node);
612     }
613     // Decouple polling.
614     grpc_pollset_set_del_pollset_set(xds_client_->interested_parties(),
615                                      interested_parties());
616   }
617   xds_client_.reset(DEBUG_LOCATION, "XdsClusterResolverLb");
618   // Destroy channel args.
619   grpc_channel_args_destroy(args_);
620   args_ = nullptr;
621 }
622 
MaybeDestroyChildPolicyLocked()623 void XdsClusterResolverLb::MaybeDestroyChildPolicyLocked() {
624   if (child_policy_ != nullptr) {
625     grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
626                                      interested_parties());
627     child_policy_.reset();
628   }
629 }
630 
UpdateLocked(UpdateArgs args)631 void XdsClusterResolverLb::UpdateLocked(UpdateArgs args) {
632   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) {
633     gpr_log(GPR_INFO, "[xds_cluster_resolver_lb %p] Received update", this);
634   }
635   const bool is_initial_update = args_ == nullptr;
636   // Update config.
637   auto old_config = std::move(config_);
638   config_ = std::move(args.config);
639   // Update args.
640   grpc_channel_args_destroy(args_);
641   args_ = args.args;
642   args.args = nullptr;
643   // Update child policy if needed.
644   if (child_policy_ != nullptr) UpdateChildPolicyLocked();
645   // Create endpoint watcher if needed.
646   if (is_initial_update) {
647     for (const auto& config : config_->discovery_mechanisms()) {
648       DiscoveryMechanismEntry entry;
649       if (config.type == XdsClusterResolverLbConfig::DiscoveryMechanism::
650                              DiscoveryMechanismType::EDS) {
651         entry.discovery_mechanism =
652             grpc_core::MakeOrphanable<EdsDiscoveryMechanism>(
653                 Ref(DEBUG_LOCATION, "EdsDiscoveryMechanism"),
654                 discovery_mechanisms_.size());
655       } else if (config.type == XdsClusterResolverLbConfig::DiscoveryMechanism::
656                                     DiscoveryMechanismType::LOGICAL_DNS) {
657         entry.discovery_mechanism =
658             grpc_core::MakeOrphanable<LogicalDNSDiscoveryMechanism>(
659                 Ref(DEBUG_LOCATION, "LogicalDNSDiscoveryMechanism"),
660                 discovery_mechanisms_.size());
661       } else {
662         GPR_ASSERT(0);
663       }
664       discovery_mechanisms_.push_back(std::move(entry));
665     }
666     // Call start() on all discovery mechanisms after creation.
667     for (const auto& discovery_mechanism : discovery_mechanisms_) {
668       discovery_mechanism.discovery_mechanism->Start();
669     }
670   }
671 }
672 
ResetBackoffLocked()673 void XdsClusterResolverLb::ResetBackoffLocked() {
674   // When the XdsClient is instantiated in the resolver instead of in this
675   // LB policy, this is done via the resolver, so we don't need to do it here.
676   if (!is_xds_uri_ && xds_client_ != nullptr) xds_client_->ResetBackoff();
677   if (child_policy_ != nullptr) {
678     child_policy_->ResetBackoffLocked();
679   }
680 }
681 
ExitIdleLocked()682 void XdsClusterResolverLb::ExitIdleLocked() {
683   if (child_policy_ != nullptr) child_policy_->ExitIdleLocked();
684 }
685 
OnEndpointChanged(size_t index,XdsApi::EdsUpdate update)686 void XdsClusterResolverLb::OnEndpointChanged(size_t index,
687                                              XdsApi::EdsUpdate update) {
688   if (shutting_down_) return;
689   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) {
690     gpr_log(GPR_INFO,
691             "[xds_cluster_resolver_lb %p] Received update from xds client"
692             " for discovery mechanism %" PRIuPTR "",
693             this, index);
694   }
695   // We need at least one priority for each discovery mechanism, just so that we
696   // have a child in which to create the xds_cluster_impl policy.  This ensures
697   // that we properly handle the case of a discovery mechanism dropping 100% of
698   // calls, the OnError() case, and the OnResourceDoesNotExist() case.
699   if (update.priorities.empty()) update.priorities.emplace_back();
700   discovery_mechanisms_[index].drop_config = std::move(update.drop_config);
701   discovery_mechanisms_[index].pending_priority_list =
702       std::move(update.priorities);
703   discovery_mechanisms_[index].first_update_received = true;
704   // If any discovery mechanism has not received its first update,
705   // wait until that happens before creating the child policy.
706   // TODO(roth): If this becomes problematic in the future (e.g., a
707   // secondary discovery mechanism delaying us from starting up at all),
708   // we can consider some sort of optimization whereby we can create the
709   // priority policy with only a subset of its children.  But we need to
710   // make sure not to get into a situation where the priority policy
711   // will put the channel into TRANSIENT_FAILURE instead of CONNECTING
712   // while we're still waiting for the other discovery mechanism(s).
713   for (DiscoveryMechanismEntry& mechanism : discovery_mechanisms_) {
714     if (!mechanism.first_update_received) return;
715   }
716   // Construct new priority list.
717   XdsApi::EdsUpdate::PriorityList priority_list;
718   size_t priority_index = 0;
719   for (DiscoveryMechanismEntry& mechanism : discovery_mechanisms_) {
720     // If the mechanism has a pending update, use that.
721     // Otherwise, use the priorities that it previously contributed to the
722     // combined list.
723     if (mechanism.pending_priority_list.has_value()) {
724       priority_list.insert(priority_list.end(),
725                            mechanism.pending_priority_list->begin(),
726                            mechanism.pending_priority_list->end());
727       priority_index += mechanism.num_priorities;
728       mechanism.num_priorities = mechanism.pending_priority_list->size();
729       mechanism.pending_priority_list.reset();
730     } else {
731       priority_list.insert(
732           priority_list.end(), priority_list_.begin() + priority_index,
733           priority_list_.begin() + priority_index + mechanism.num_priorities);
734       priority_index += mechanism.num_priorities;
735     }
736   }
737   // Update child policy.
738   UpdatePriorityList(std::move(priority_list));
739 }
740 
OnError(size_t index,grpc_error * error)741 void XdsClusterResolverLb::OnError(size_t index, grpc_error* error) {
742   gpr_log(GPR_ERROR,
743           "[xds_cluster_resolver_lb %p] discovery mechanism %" PRIuPTR
744           " xds watcher reported error: %s",
745           this, index, grpc_error_string(error));
746   GRPC_ERROR_UNREF(error);
747   if (shutting_down_) return;
748   if (!discovery_mechanisms_[index].first_update_received) {
749     // Call OnEndpointChanged with an empty update just like
750     // OnResourceDoesNotExist.
751     OnEndpointChanged(index, XdsApi::EdsUpdate());
752   }
753 }
754 
OnResourceDoesNotExist(size_t index)755 void XdsClusterResolverLb::OnResourceDoesNotExist(size_t index) {
756   gpr_log(GPR_ERROR,
757           "[xds_cluster_resolver_lb %p] discovery mechanism %" PRIuPTR
758           " resource does not exist",
759           this, index);
760   if (shutting_down_) return;
761   // Call OnEndpointChanged with an empty update.
762   OnEndpointChanged(index, XdsApi::EdsUpdate());
763 }
764 
765 //
766 // child policy-related methods
767 //
768 
UpdatePriorityList(XdsApi::EdsUpdate::PriorityList priority_list)769 void XdsClusterResolverLb::UpdatePriorityList(
770     XdsApi::EdsUpdate::PriorityList priority_list) {
771   // Build some maps from locality to child number and the reverse from
772   // the old data in priority_list_ and priority_child_numbers_.
773   std::map<XdsLocalityName*, size_t /*child_number*/, XdsLocalityName::Less>
774       locality_child_map;
775   std::map<size_t, std::set<XdsLocalityName*>> child_locality_map;
776   for (size_t priority = 0; priority < priority_list_.size(); ++priority) {
777     size_t child_number = priority_child_numbers_[priority];
778     const auto& localities = priority_list_[priority].localities;
779     for (const auto& p : localities) {
780       XdsLocalityName* locality_name = p.first;
781       locality_child_map[locality_name] = child_number;
782       child_locality_map[child_number].insert(locality_name);
783     }
784   }
785   // Construct new list of children.
786   std::vector<size_t> priority_child_numbers;
787   for (size_t priority = 0; priority < priority_list.size(); ++priority) {
788     const auto& localities = priority_list[priority].localities;
789     absl::optional<size_t> child_number;
790     // If one of the localities in this priority already existed, reuse its
791     // child number.
792     for (const auto& p : localities) {
793       XdsLocalityName* locality_name = p.first;
794       if (!child_number.has_value()) {
795         auto it = locality_child_map.find(locality_name);
796         if (it != locality_child_map.end()) {
797           child_number = it->second;
798           locality_child_map.erase(it);
799           // Remove localities that *used* to be in this child number, so
800           // that we don't incorrectly reuse this child number for a
801           // subsequent priority.
802           for (XdsLocalityName* old_locality :
803                child_locality_map[*child_number]) {
804             locality_child_map.erase(old_locality);
805           }
806         }
807       } else {
808         // Remove all localities that are now in this child number, so
809         // that we don't accidentally reuse this child number for a
810         // subsequent priority.
811         locality_child_map.erase(locality_name);
812       }
813     }
814     // If we didn't find an existing child number, assign a new one.
815     if (!child_number.has_value()) {
816       for (child_number = 0;
817            child_locality_map.find(*child_number) != child_locality_map.end();
818            ++(*child_number)) {
819       }
820       // Add entry so we know that the child number is in use.
821       // (Don't need to add the list of localities, since we won't use them.)
822       child_locality_map[*child_number];
823     }
824     priority_child_numbers.push_back(*child_number);
825   }
826   // Save update.
827   priority_list_ = std::move(priority_list);
828   priority_child_numbers_ = std::move(priority_child_numbers);
829   // Update child policy.
830   UpdateChildPolicyLocked();
831 }
832 
CreateChildPolicyAddressesLocked()833 ServerAddressList XdsClusterResolverLb::CreateChildPolicyAddressesLocked() {
834   ServerAddressList addresses;
835   for (size_t priority = 0; priority < priority_list_.size(); ++priority) {
836     const auto& localities = priority_list_[priority].localities;
837     std::string priority_child_name =
838         absl::StrCat("child", priority_child_numbers_[priority]);
839     for (const auto& p : localities) {
840       const auto& locality_name = p.first;
841       const auto& locality = p.second;
842       std::vector<std::string> hierarchical_path = {
843           priority_child_name, locality_name->AsHumanReadableString()};
844       for (const auto& endpoint : locality.endpoints) {
845         addresses.emplace_back(
846             endpoint
847                 .WithAttribute(kHierarchicalPathAttributeKey,
848                                MakeHierarchicalPathAttribute(hierarchical_path))
849                 .WithAttribute(kXdsLocalityNameAttributeKey,
850                                absl::make_unique<XdsLocalityAttribute>(
851                                    locality_name->Ref()))
852                 .WithAttribute(ServerAddressWeightAttribute::
853                                    kServerAddressWeightAttributeKey,
854                                absl::make_unique<ServerAddressWeightAttribute>(
855                                    locality.lb_weight)));
856       }
857     }
858   }
859   return addresses;
860 }
861 
862 RefCountedPtr<LoadBalancingPolicy::Config>
CreateChildPolicyConfigLocked()863 XdsClusterResolverLb::CreateChildPolicyConfigLocked() {
864   Json::Object priority_children;
865   Json::Array priority_priorities;
866   // Setting up index to iterate through the discovery mechanisms and keeping
867   // track the discovery_mechanism each priority belongs to.
868   size_t discovery_index = 0;
869   // Setting up num_priorities_remaining to track the priorities in each
870   // discovery_mechanism.
871   size_t num_priorities_remaining_in_discovery =
872       discovery_mechanisms_[discovery_index].num_priorities;
873   for (size_t priority = 0; priority < priority_list_.size(); ++priority) {
874     Json child_policy;
875     if (!discovery_mechanisms_[discovery_index]
876              .discovery_mechanism->override_child_policy()
877              .empty()) {
878       child_policy = discovery_mechanisms_[discovery_index]
879                          .discovery_mechanism->override_child_policy();
880     } else {
881       const auto& xds_lb_policy = config_->xds_lb_policy().object_value();
882       if (xds_lb_policy.find("ROUND_ROBIN") != xds_lb_policy.end()) {
883         const auto& localities = priority_list_[priority].localities;
884         Json::Object weighted_targets;
885         for (const auto& p : localities) {
886           XdsLocalityName* locality_name = p.first;
887           const auto& locality = p.second;
888           // Construct JSON object containing locality name.
889           Json::Object locality_name_json;
890           if (!locality_name->region().empty()) {
891             locality_name_json["region"] = locality_name->region();
892           }
893           if (!locality_name->zone().empty()) {
894             locality_name_json["zone"] = locality_name->zone();
895           }
896           if (!locality_name->sub_zone().empty()) {
897             locality_name_json["subzone"] = locality_name->sub_zone();
898           }
899           // Add weighted target entry.
900           weighted_targets[locality_name->AsHumanReadableString()] =
901               Json::Object{
902                   {"weight", locality.lb_weight},
903                   {"childPolicy",
904                    Json::Array{
905                        Json::Object{
906                            {"round_robin", Json::Object()},
907                        },
908                    }},
909               };
910         }
911         // Construct locality-picking policy.
912         // Start with field from our config and add the "targets" field.
913         child_policy = Json::Array{
914             Json::Object{
915                 {"weighted_target_experimental",
916                  Json::Object{
917                      {"targets", Json::Object()},
918                  }},
919             },
920         };
921         Json::Object& config =
922             *(*child_policy.mutable_array())[0].mutable_object();
923         auto it = config.begin();
924         GPR_ASSERT(it != config.end());
925         (*it->second.mutable_object())["targets"] = std::move(weighted_targets);
926       } else {
927         auto it = xds_lb_policy.find("RING_HASH");
928         GPR_ASSERT(it != xds_lb_policy.end());
929         Json::Object ring_hash_experimental_policy = it->second.object_value();
930         child_policy = Json::Array{
931             Json::Object{
932                 {"ring_hash_experimental", ring_hash_experimental_policy},
933             },
934         };
935       }
936     }
937     // Wrap it in the drop policy.
938     Json::Array drop_categories;
939     if (discovery_mechanisms_[discovery_index].drop_config != nullptr) {
940       for (const auto& category : discovery_mechanisms_[discovery_index]
941                                       .drop_config->drop_category_list()) {
942         drop_categories.push_back(Json::Object{
943             {"category", category.name},
944             {"requests_per_million", category.parts_per_million},
945         });
946       }
947     }
948     const auto lrs_key = discovery_mechanisms_[discovery_index]
949                              .discovery_mechanism->GetLrsClusterKey();
950     Json::Object xds_cluster_impl_config = {
951         {"clusterName", std::string(lrs_key.first)},
952         {"childPolicy", std::move(child_policy)},
953         {"dropCategories", std::move(drop_categories)},
954         {"maxConcurrentRequests",
955          config_->discovery_mechanisms()[discovery_index]
956              .max_concurrent_requests},
957     };
958     if (!lrs_key.second.empty()) {
959       xds_cluster_impl_config["edsServiceName"] = std::string(lrs_key.second);
960     }
961     if (config_->discovery_mechanisms()[discovery_index]
962             .lrs_load_reporting_server_name.has_value()) {
963       xds_cluster_impl_config["lrsLoadReportingServerName"] =
964           config_->discovery_mechanisms()[discovery_index]
965               .lrs_load_reporting_server_name.value();
966     }
967     Json locality_picking_policy = Json::Array{Json::Object{
968         {"xds_cluster_impl_experimental", std::move(xds_cluster_impl_config)},
969     }};
970     // Add priority entry.
971     const size_t child_number = priority_child_numbers_[priority];
972     std::string child_name = absl::StrCat("child", child_number);
973     priority_priorities.emplace_back(child_name);
974     Json::Object child_config = {
975         {"config", std::move(locality_picking_policy)},
976     };
977     if (discovery_mechanisms_[discovery_index]
978             .discovery_mechanism->disable_reresolution()) {
979       child_config["ignore_reresolution_requests"] = true;
980     }
981     priority_children[child_name] = std::move(child_config);
982     // Each priority in the priority_list_ should correspond to a priority in a
983     // discovery mechanism in discovery_mechanisms_ (both in the same order).
984     // Keeping track of the discovery_mechanism each priority belongs to.
985     --num_priorities_remaining_in_discovery;
986     while (num_priorities_remaining_in_discovery == 0 &&
987            discovery_index < discovery_mechanisms_.size() - 1) {
988       ++discovery_index;
989       num_priorities_remaining_in_discovery =
990           discovery_mechanisms_[discovery_index].num_priorities;
991     }
992   }
993   // There should be matching number of priorities in discovery_mechanisms_ and
994   // in priority_list_; therefore at the end of looping through all the
995   // priorities, num_priorities_remaining should be down to 0, and index should
996   // be the last index in discovery_mechanisms_.
997   GPR_ASSERT(num_priorities_remaining_in_discovery == 0);
998   GPR_ASSERT(discovery_index == discovery_mechanisms_.size() - 1);
999   Json json = Json::Array{Json::Object{
1000       {"priority_experimental",
1001        Json::Object{
1002            {"children", std::move(priority_children)},
1003            {"priorities", std::move(priority_priorities)},
1004        }},
1005   }};
1006   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) {
1007     std::string json_str = json.Dump(/*indent=*/1);
1008     gpr_log(
1009         GPR_INFO,
1010         "[xds_cluster_resolver_lb %p] generated config for child policy: %s",
1011         this, json_str.c_str());
1012   }
1013   grpc_error* error = GRPC_ERROR_NONE;
1014   RefCountedPtr<LoadBalancingPolicy::Config> config =
1015       LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(json, &error);
1016   if (error != GRPC_ERROR_NONE) {
1017     // This should never happen, but if it does, we basically have no
1018     // way to fix it, so we put the channel in TRANSIENT_FAILURE.
1019     gpr_log(GPR_ERROR,
1020             "[xds_cluster_resolver_lb %p] error parsing generated child policy "
1021             "config -- "
1022             "will put channel in TRANSIENT_FAILURE: %s",
1023             this, grpc_error_string(error));
1024     error = grpc_error_set_int(
1025         grpc_error_add_child(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1026                                  "xds_cluster_resolver LB policy: error "
1027                                  "parsing generated child policy config"),
1028                              error),
1029         GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL);
1030     channel_control_helper()->UpdateState(
1031         GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error),
1032         absl::make_unique<TransientFailurePicker>(error));
1033     return nullptr;
1034   }
1035   return config;
1036 }
1037 
UpdateChildPolicyLocked()1038 void XdsClusterResolverLb::UpdateChildPolicyLocked() {
1039   if (shutting_down_) return;
1040   UpdateArgs update_args;
1041   update_args.config = CreateChildPolicyConfigLocked();
1042   if (update_args.config == nullptr) return;
1043   update_args.addresses = CreateChildPolicyAddressesLocked();
1044   update_args.args = CreateChildPolicyArgsLocked(args_);
1045   if (child_policy_ == nullptr) {
1046     child_policy_ = CreateChildPolicyLocked(update_args.args);
1047   }
1048   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) {
1049     gpr_log(GPR_INFO, "[xds_cluster_resolver_lb %p] Updating child policy %p",
1050             this, child_policy_.get());
1051   }
1052   child_policy_->UpdateLocked(std::move(update_args));
1053 }
1054 
// Returns a newly allocated copy of `args` with client-side health checking
// inhibited (GRPC_ARG_INHIBIT_HEALTH_CHECKING = 1), since the balancer does
// this for us.
grpc_channel_args* XdsClusterResolverLb::CreateChildPolicyArgsLocked(
    const grpc_channel_args* args) {
  const grpc_arg inhibit_health_checking = grpc_channel_arg_integer_create(
      const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1);
  return grpc_channel_args_copy_and_add(args, &inhibit_health_checking, 1);
}
1062 
1063 OrphanablePtr<LoadBalancingPolicy>
CreateChildPolicyLocked(const grpc_channel_args * args)1064 XdsClusterResolverLb::CreateChildPolicyLocked(const grpc_channel_args* args) {
1065   LoadBalancingPolicy::Args lb_policy_args;
1066   lb_policy_args.work_serializer = work_serializer();
1067   lb_policy_args.args = args;
1068   lb_policy_args.channel_control_helper =
1069       absl::make_unique<Helper>(Ref(DEBUG_LOCATION, "Helper"));
1070   OrphanablePtr<LoadBalancingPolicy> lb_policy =
1071       LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
1072           "priority_experimental", std::move(lb_policy_args));
1073   if (GPR_UNLIKELY(lb_policy == nullptr)) {
1074     gpr_log(GPR_ERROR,
1075             "[xds_cluster_resolver_lb %p] failure creating child policy", this);
1076     return nullptr;
1077   }
1078   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) {
1079     gpr_log(GPR_INFO,
1080             "[xds_cluster_resolver_lb %p]: Created new child policy %p", this,
1081             lb_policy.get());
1082   }
1083   // Add our interested_parties pollset_set to that of the newly created
1084   // child policy. This will make the child policy progress upon activity on
1085   // this policy, which in turn is tied to the application's call.
1086   grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
1087                                    interested_parties());
1088   return lb_policy;
1089 }
1090 
1091 //
1092 // factory
1093 //
1094 
1095 class XdsClusterResolverLbFactory : public LoadBalancingPolicyFactory {
1096  public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const1097   OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
1098       LoadBalancingPolicy::Args args) const override {
1099     grpc_error* error = GRPC_ERROR_NONE;
1100     RefCountedPtr<XdsClient> xds_client = XdsClient::GetOrCreate(&error);
1101     if (error != GRPC_ERROR_NONE) {
1102       gpr_log(GPR_ERROR,
1103               "cannot get XdsClient to instantiate xds_cluster_resolver LB "
1104               "policy: %s",
1105               grpc_error_string(error));
1106       GRPC_ERROR_UNREF(error);
1107       return nullptr;
1108     }
1109     return MakeOrphanable<XdsClusterResolverChildHandler>(std::move(xds_client),
1110                                                           std::move(args));
1111   }
1112 
name() const1113   const char* name() const override { return kXdsClusterResolver; }
1114 
ParseLoadBalancingConfig(const Json & json,grpc_error ** error) const1115   RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
1116       const Json& json, grpc_error** error) const override {
1117     GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
1118     if (json.type() == Json::Type::JSON_NULL) {
1119       // xds_cluster_resolver was mentioned as a policy in the deprecated
1120       // loadBalancingPolicy field or in the client API.
1121       *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1122           "field:loadBalancingPolicy error:xds_cluster_resolver policy "
1123           "requires configuration. "
1124           "Please use loadBalancingConfig field of service config instead.");
1125       return nullptr;
1126     }
1127     std::vector<grpc_error*> error_list;
1128     std::vector<XdsClusterResolverLbConfig::DiscoveryMechanism>
1129         discovery_mechanisms;
1130     auto it = json.object_value().find("discoveryMechanisms");
1131     if (it == json.object_value().end()) {
1132       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1133           "field:discoveryMechanisms error:required field missing"));
1134     } else if (it->second.type() != Json::Type::ARRAY) {
1135       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1136           "field:discoveryMechanisms error:type should be array"));
1137     } else {
1138       const Json::Array& array = it->second.array_value();
1139       for (size_t i = 0; i < array.size(); ++i) {
1140         XdsClusterResolverLbConfig::DiscoveryMechanism discovery_mechanism;
1141         std::vector<grpc_error*> discovery_mechanism_errors =
1142             ParseDiscoveryMechanism(array[i], &discovery_mechanism);
1143         if (!discovery_mechanism_errors.empty()) {
1144           grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(
1145               absl::StrCat("field:discovery_mechanism element: ", i, " error")
1146                   .c_str());
1147           for (grpc_error* discovery_mechanism_error :
1148                discovery_mechanism_errors) {
1149             error = grpc_error_add_child(error, discovery_mechanism_error);
1150           }
1151           error_list.push_back(error);
1152         }
1153         discovery_mechanisms.emplace_back(std::move(discovery_mechanism));
1154       }
1155     }
1156     if (discovery_mechanisms.empty()) {
1157       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1158           "field:discovery_mechanism error:list is missing or empty"));
1159     }
1160     Json xds_lb_policy = Json::Object{
1161         {"ROUND_ROBIN", Json::Object()},
1162     };
1163     it = json.object_value().find("xdsLbPolicy");
1164     if (it != json.object_value().end()) {
1165       if (it->second.type() != Json::Type::ARRAY) {
1166         error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1167             "field:xdsLbPolicy error:type should be array"));
1168       } else {
1169         const Json::Array& array = it->second.array_value();
1170         for (size_t i = 0; i < array.size(); ++i) {
1171           if (array[i].type() != Json::Type::OBJECT) {
1172             error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1173                 "field:xdsLbPolicy error:element should be of type object"));
1174             continue;
1175           }
1176           const Json::Object& policy = array[i].object_value();
1177           auto policy_it = policy.find("ROUND_ROBIN");
1178           if (policy_it != policy.end()) {
1179             if (policy_it->second.type() != Json::Type::OBJECT) {
1180               error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1181                   "field:ROUND_ROBIN error:type should be object"));
1182             }
1183             break;
1184           }
1185           policy_it = policy.find("RING_HASH");
1186           if (policy_it != policy.end()) {
1187             if (policy_it->second.type() != Json::Type::OBJECT) {
1188               error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1189                   "field:RING_HASH error:type should be object"));
1190               continue;
1191             }
1192             // TODO(donnadionne): Move this to a method in
1193             // ring_hash_experimental and call it here.
1194             const Json::Object& ring_hash = policy_it->second.object_value();
1195             xds_lb_policy = array[i];
1196             size_t min_ring_size = 1024;
1197             size_t max_ring_size = 8388608;
1198             auto ring_hash_it = ring_hash.find("min_ring_size");
1199             if (ring_hash_it == ring_hash.end()) {
1200               error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1201                   "field:min_ring_size missing"));
1202             } else if (ring_hash_it->second.type() != Json::Type::NUMBER) {
1203               error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1204                   "field:min_ring_size error: should be of "
1205                   "number"));
1206             } else {
1207               min_ring_size = gpr_parse_nonnegative_int(
1208                   ring_hash_it->second.string_value().c_str());
1209             }
1210             ring_hash_it = ring_hash.find("max_ring_size");
1211             if (ring_hash_it == ring_hash.end()) {
1212               error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1213                   "field:max_ring_size missing"));
1214             } else if (ring_hash_it->second.type() != Json::Type::NUMBER) {
1215               error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1216                   "field:max_ring_size error: should be of "
1217                   "number"));
1218             } else {
1219               max_ring_size = gpr_parse_nonnegative_int(
1220                   ring_hash_it->second.string_value().c_str());
1221             }
1222             if (min_ring_size <= 0 || min_ring_size > 8388608 ||
1223                 max_ring_size <= 0 || max_ring_size > 8388608 ||
1224                 min_ring_size > max_ring_size) {
1225               error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1226                   "field:max_ring_size and or min_ring_size error: "
1227                   "values need to be in the range of 1 to 8388608 "
1228                   "and max_ring_size cannot be smaller than "
1229                   "min_ring_size"));
1230             }
1231             ring_hash_it = ring_hash.find("hash_function");
1232             if (ring_hash_it == ring_hash.end()) {
1233               error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1234                   "field:hash_function missing"));
1235             } else if (ring_hash_it->second.type() != Json::Type::STRING) {
1236               error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1237                   "field:hash_function error: should be a "
1238                   "string"));
1239             } else if (ring_hash_it->second.string_value() != "XX_HASH" &&
1240                        ring_hash_it->second.string_value() != "MURMUR_HASH_2") {
1241               error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1242                   "field:hash_function error: unsupported "
1243                   "hash_function"));
1244             }
1245             break;
1246           }
1247         }
1248       }
1249     }
1250     // Construct config.
1251     if (error_list.empty()) {
1252       return MakeRefCounted<XdsClusterResolverLbConfig>(
1253           std::move(discovery_mechanisms), std::move(xds_lb_policy));
1254     } else {
1255       *error = GRPC_ERROR_CREATE_FROM_VECTOR(
1256           "xds_cluster_resolver_experimental LB policy config", &error_list);
1257       return nullptr;
1258     }
1259   }
1260 
1261  private:
ParseDiscoveryMechanism(const Json & json,XdsClusterResolverLbConfig::DiscoveryMechanism * discovery_mechanism)1262   static std::vector<grpc_error*> ParseDiscoveryMechanism(
1263       const Json& json,
1264       XdsClusterResolverLbConfig::DiscoveryMechanism* discovery_mechanism) {
1265     std::vector<grpc_error*> error_list;
1266     if (json.type() != Json::Type::OBJECT) {
1267       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1268           "value should be of type object"));
1269       return error_list;
1270     }
1271     // Cluster name.
1272     auto it = json.object_value().find("clusterName");
1273     if (it == json.object_value().end()) {
1274       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1275           "field:clusterName error:required field missing"));
1276     } else if (it->second.type() != Json::Type::STRING) {
1277       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1278           "field:clusterName error:type should be string"));
1279     } else {
1280       discovery_mechanism->cluster_name = it->second.string_value();
1281     }
1282     // LRS load reporting server name.
1283     it = json.object_value().find("lrsLoadReportingServerName");
1284     if (it != json.object_value().end()) {
1285       if (it->second.type() != Json::Type::STRING) {
1286         error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1287             "field:lrsLoadReportingServerName error:type should be string"));
1288       } else {
1289         discovery_mechanism->lrs_load_reporting_server_name.emplace(
1290             it->second.string_value());
1291       }
1292     }
1293     // Max concurrent requests.
1294     discovery_mechanism->max_concurrent_requests = 1024;
1295     it = json.object_value().find("max_concurrent_requests");
1296     if (it != json.object_value().end()) {
1297       if (it->second.type() != Json::Type::NUMBER) {
1298         error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1299             "field:max_concurrent_requests error:must be of type number"));
1300       } else {
1301         discovery_mechanism->max_concurrent_requests =
1302             gpr_parse_nonnegative_int(it->second.string_value().c_str());
1303       }
1304     }
1305     // Discovery Mechanism type
1306     it = json.object_value().find("type");
1307     if (it == json.object_value().end()) {
1308       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1309           "field:type error:required field missing"));
1310     } else if (it->second.type() != Json::Type::STRING) {
1311       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1312           "field:type error:type should be string"));
1313     } else {
1314       if (it->second.string_value() == "EDS") {
1315         discovery_mechanism->type = XdsClusterResolverLbConfig::
1316             DiscoveryMechanism::DiscoveryMechanismType::EDS;
1317       } else if (it->second.string_value() == "LOGICAL_DNS") {
1318         discovery_mechanism->type = XdsClusterResolverLbConfig::
1319             DiscoveryMechanism::DiscoveryMechanismType::LOGICAL_DNS;
1320       } else {
1321         error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1322             "field:type error:invalid type"));
1323       }
1324     }
1325     // EDS service name.
1326     it = json.object_value().find("edsServiceName");
1327     if (it != json.object_value().end()) {
1328       if (it->second.type() != Json::Type::STRING) {
1329         error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1330             "field:xds_cluster_resolverServiceName error:type should be "
1331             "string"));
1332       } else {
1333         discovery_mechanism->eds_service_name = it->second.string_value();
1334       }
1335     }
1336     return error_list;
1337   }
1338 
1339   class XdsClusterResolverChildHandler : public ChildPolicyHandler {
1340    public:
XdsClusterResolverChildHandler(RefCountedPtr<XdsClient> xds_client,Args args)1341     XdsClusterResolverChildHandler(RefCountedPtr<XdsClient> xds_client,
1342                                    Args args)
1343         : ChildPolicyHandler(std::move(args),
1344                              &grpc_lb_xds_cluster_resolver_trace),
1345           xds_client_(std::move(xds_client)) {}
1346 
ConfigChangeRequiresNewPolicyInstance(LoadBalancingPolicy::Config * old_config,LoadBalancingPolicy::Config * new_config) const1347     bool ConfigChangeRequiresNewPolicyInstance(
1348         LoadBalancingPolicy::Config* old_config,
1349         LoadBalancingPolicy::Config* new_config) const override {
1350       GPR_ASSERT(old_config->name() == kXdsClusterResolver);
1351       GPR_ASSERT(new_config->name() == kXdsClusterResolver);
1352       XdsClusterResolverLbConfig* old_xds_cluster_resolver_config =
1353           static_cast<XdsClusterResolverLbConfig*>(old_config);
1354       XdsClusterResolverLbConfig* new_xds_cluster_resolver_config =
1355           static_cast<XdsClusterResolverLbConfig*>(new_config);
1356       return old_xds_cluster_resolver_config->discovery_mechanisms() !=
1357              new_xds_cluster_resolver_config->discovery_mechanisms();
1358     }
1359 
CreateLoadBalancingPolicy(const char *,LoadBalancingPolicy::Args args) const1360     OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
1361         const char* /*name*/, LoadBalancingPolicy::Args args) const override {
1362       return MakeOrphanable<XdsClusterResolverLb>(xds_client_, std::move(args));
1363     }
1364 
1365    private:
1366     RefCountedPtr<XdsClient> xds_client_;
1367   };
1368 };
1369 
1370 }  // namespace
1371 
1372 }  // namespace grpc_core
1373 
1374 //
1375 // Plugin registration
1376 //
1377 
grpc_lb_policy_xds_cluster_resolver_init()1378 void grpc_lb_policy_xds_cluster_resolver_init() {
1379   grpc_core::LoadBalancingPolicyRegistry::Builder::
1380       RegisterLoadBalancingPolicyFactory(
1381           absl::make_unique<grpc_core::XdsClusterResolverLbFactory>());
1382 }
1383 
// Plugin shutdown hook for the xds_cluster_resolver LB policy; it has nothing
// to tear down.
void grpc_lb_policy_xds_cluster_resolver_shutdown() {
  // Intentionally empty.
}
1385