• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2019 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include "src/core/resolver/xds/xds_dependency_manager.h"
18 
19 #include <set>
20 
21 #include "absl/log/check.h"
22 #include "absl/log/log.h"
23 #include "absl/strings/str_join.h"
24 #include "src/core/config/core_configuration.h"
25 #include "src/core/load_balancing/xds/xds_channel_args.h"
26 #include "src/core/resolver/fake/fake_resolver.h"
27 #include "src/core/util/match.h"
28 #include "src/core/xds/grpc/xds_cluster_parser.h"
29 #include "src/core/xds/grpc/xds_endpoint_parser.h"
30 #include "src/core/xds/grpc/xds_listener_parser.h"
31 #include "src/core/xds/grpc/xds_route_config_parser.h"
32 #include "src/core/xds/grpc/xds_routing.h"
33 
34 namespace grpc_core {
35 
36 namespace {
37 
38 // Max depth of aggregate cluster dependency graph.
39 constexpr int kMaxXdsAggregateClusterRecursionDepth = 16;
40 
41 }  // namespace
42 
43 //
44 // XdsDependencyManager::ListenerWatcher
45 //
46 
47 class XdsDependencyManager::ListenerWatcher final
48     : public XdsListenerResourceType::WatcherInterface {
49  public:
ListenerWatcher(RefCountedPtr<XdsDependencyManager> dependency_mgr)50   explicit ListenerWatcher(RefCountedPtr<XdsDependencyManager> dependency_mgr)
51       : dependency_mgr_(std::move(dependency_mgr)) {}
52 
OnResourceChanged(absl::StatusOr<std::shared_ptr<const XdsListenerResource>> listener,RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle)53   void OnResourceChanged(
54       absl::StatusOr<std::shared_ptr<const XdsListenerResource>> listener,
55       RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle) override {
56     dependency_mgr_->work_serializer_->Run(
57         [dependency_mgr = dependency_mgr_, listener = std::move(listener),
58          read_delay_handle = std::move(read_delay_handle)]() mutable {
59           dependency_mgr->OnListenerUpdate(std::move(listener));
60         },
61         DEBUG_LOCATION);
62   }
63 
OnAmbientError(absl::Status status,RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle)64   void OnAmbientError(
65       absl::Status status,
66       RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle) override {
67     dependency_mgr_->work_serializer_->Run(
68         [dependency_mgr = dependency_mgr_, status = std::move(status),
69          read_delay_handle = std::move(read_delay_handle)]() mutable {
70           dependency_mgr->OnListenerAmbientError(std::move(status));
71         },
72         DEBUG_LOCATION);
73   }
74 
75  private:
76   RefCountedPtr<XdsDependencyManager> dependency_mgr_;
77 };
78 
79 //
80 // XdsDependencyManager::RouteConfigWatcher
81 //
82 
83 class XdsDependencyManager::RouteConfigWatcher final
84     : public XdsRouteConfigResourceType::WatcherInterface {
85  public:
RouteConfigWatcher(RefCountedPtr<XdsDependencyManager> dependency_mgr,std::string name)86   RouteConfigWatcher(RefCountedPtr<XdsDependencyManager> dependency_mgr,
87                      std::string name)
88       : dependency_mgr_(std::move(dependency_mgr)), name_(std::move(name)) {}
89 
OnResourceChanged(absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>> route_config,RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle)90   void OnResourceChanged(
91       absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>>
92           route_config,
93       RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle) override {
94     dependency_mgr_->work_serializer_->Run(
95         [self = RefAsSubclass<RouteConfigWatcher>(),
96          route_config = std::move(route_config),
97          read_delay_handle = std::move(read_delay_handle)]() mutable {
98           self->dependency_mgr_->OnRouteConfigUpdate(self->name_,
99                                                      std::move(route_config));
100         },
101         DEBUG_LOCATION);
102   }
103 
OnAmbientError(absl::Status status,RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle)104   void OnAmbientError(
105       absl::Status status,
106       RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle) override {
107     dependency_mgr_->work_serializer_->Run(
108         [self = RefAsSubclass<RouteConfigWatcher>(), status = std::move(status),
109          read_delay_handle = std::move(read_delay_handle)]() mutable {
110           self->dependency_mgr_->OnRouteConfigAmbientError(self->name_,
111                                                            std::move(status));
112         },
113         DEBUG_LOCATION);
114   }
115 
116  private:
117   RefCountedPtr<XdsDependencyManager> dependency_mgr_;
118   std::string name_;
119 };
120 
121 //
122 // XdsDependencyManager::ClusterWatcher
123 //
124 
125 class XdsDependencyManager::ClusterWatcher final
126     : public XdsClusterResourceType::WatcherInterface {
127  public:
ClusterWatcher(RefCountedPtr<XdsDependencyManager> dependency_mgr,absl::string_view name)128   ClusterWatcher(RefCountedPtr<XdsDependencyManager> dependency_mgr,
129                  absl::string_view name)
130       : dependency_mgr_(std::move(dependency_mgr)), name_(name) {}
131 
OnResourceChanged(absl::StatusOr<std::shared_ptr<const XdsClusterResource>> cluster,RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle)132   void OnResourceChanged(
133       absl::StatusOr<std::shared_ptr<const XdsClusterResource>> cluster,
134       RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle) override {
135     dependency_mgr_->work_serializer_->Run(
136         [self = RefAsSubclass<ClusterWatcher>(), cluster = std::move(cluster),
137          read_delay_handle = std::move(read_delay_handle)]() mutable {
138           self->dependency_mgr_->OnClusterUpdate(self->name_,
139                                                  std::move(cluster));
140         },
141         DEBUG_LOCATION);
142   }
143 
OnAmbientError(absl::Status status,RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle)144   void OnAmbientError(
145       absl::Status status,
146       RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle) override {
147     dependency_mgr_->work_serializer_->Run(
148         [self = RefAsSubclass<ClusterWatcher>(), status = std::move(status),
149          read_delay_handle = std::move(read_delay_handle)]() mutable {
150           self->dependency_mgr_->OnClusterAmbientError(self->name_,
151                                                        std::move(status));
152         },
153         DEBUG_LOCATION);
154   }
155 
156  private:
157   RefCountedPtr<XdsDependencyManager> dependency_mgr_;
158   std::string name_;
159 };
160 
161 //
162 // XdsDependencyManager::EndpointWatcher
163 //
164 
165 class XdsDependencyManager::EndpointWatcher final
166     : public XdsEndpointResourceType::WatcherInterface {
167  public:
EndpointWatcher(RefCountedPtr<XdsDependencyManager> dependency_mgr,absl::string_view name)168   EndpointWatcher(RefCountedPtr<XdsDependencyManager> dependency_mgr,
169                   absl::string_view name)
170       : dependency_mgr_(std::move(dependency_mgr)), name_(name) {}
171 
OnResourceChanged(absl::StatusOr<std::shared_ptr<const XdsEndpointResource>> endpoint,RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle)172   void OnResourceChanged(
173       absl::StatusOr<std::shared_ptr<const XdsEndpointResource>> endpoint,
174       RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle) override {
175     dependency_mgr_->work_serializer_->Run(
176         [self = RefAsSubclass<EndpointWatcher>(),
177          endpoint = std::move(endpoint),
178          read_delay_handle = std::move(read_delay_handle)]() mutable {
179           self->dependency_mgr_->OnEndpointUpdate(self->name_,
180                                                   std::move(endpoint));
181         },
182         DEBUG_LOCATION);
183   }
184 
OnAmbientError(absl::Status status,RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle)185   void OnAmbientError(
186       absl::Status status,
187       RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle) override {
188     dependency_mgr_->work_serializer_->Run(
189         [self = RefAsSubclass<EndpointWatcher>(), status = std::move(status),
190          read_delay_handle = std::move(read_delay_handle)]() mutable {
191           self->dependency_mgr_->OnEndpointAmbientError(self->name_,
192                                                         std::move(status));
193         },
194         DEBUG_LOCATION);
195   }
196 
197  private:
198   RefCountedPtr<XdsDependencyManager> dependency_mgr_;
199   std::string name_;
200 };
201 
202 //
203 // XdsDependencyManager::DnsResultHandler
204 //
205 
206 class XdsDependencyManager::DnsResultHandler final
207     : public Resolver::ResultHandler {
208  public:
DnsResultHandler(RefCountedPtr<XdsDependencyManager> dependency_mgr,std::string name)209   DnsResultHandler(RefCountedPtr<XdsDependencyManager> dependency_mgr,
210                    std::string name)
211       : dependency_mgr_(std::move(dependency_mgr)), name_(std::move(name)) {}
212 
ReportResult(Resolver::Result result)213   void ReportResult(Resolver::Result result) override {
214     dependency_mgr_->work_serializer_->Run(
215         [dependency_mgr = dependency_mgr_, name = name_,
216          result = std::move(result)]() mutable {
217           dependency_mgr->OnDnsResult(name, std::move(result));
218         },
219         DEBUG_LOCATION);
220   }
221 
222  private:
223   RefCountedPtr<XdsDependencyManager> dependency_mgr_;
224   std::string name_;
225 };
226 
227 //
228 // XdsDependencyManager::ClusterSubscription
229 //
230 
Orphaned()231 void XdsDependencyManager::ClusterSubscription::Orphaned() {
232   dependency_mgr_->work_serializer_->Run(
233       [self = WeakRef()]() {
234         self->dependency_mgr_->OnClusterSubscriptionUnref(self->cluster_name_,
235                                                           self.get());
236       },
237       DEBUG_LOCATION);
238 }
239 
240 //
241 // XdsDependencyManager
242 //
243 
XdsDependencyManager(RefCountedPtr<GrpcXdsClient> xds_client,std::shared_ptr<WorkSerializer> work_serializer,std::unique_ptr<Watcher> watcher,std::string data_plane_authority,std::string listener_resource_name,ChannelArgs args,grpc_pollset_set * interested_parties)244 XdsDependencyManager::XdsDependencyManager(
245     RefCountedPtr<GrpcXdsClient> xds_client,
246     std::shared_ptr<WorkSerializer> work_serializer,
247     std::unique_ptr<Watcher> watcher, std::string data_plane_authority,
248     std::string listener_resource_name, ChannelArgs args,
249     grpc_pollset_set* interested_parties)
250     : xds_client_(std::move(xds_client)),
251       work_serializer_(std::move(work_serializer)),
252       watcher_(std::move(watcher)),
253       data_plane_authority_(std::move(data_plane_authority)),
254       listener_resource_name_(std::move(listener_resource_name)),
255       args_(std::move(args)),
256       interested_parties_(interested_parties) {
257   GRPC_TRACE_LOG(xds_resolver, INFO)
258       << "[XdsDependencyManager " << this << "] starting watch for listener "
259       << listener_resource_name_;
260   auto listener_watcher = MakeRefCounted<ListenerWatcher>(Ref());
261   listener_watcher_ = listener_watcher.get();
262   XdsListenerResourceType::StartWatch(
263       xds_client_.get(), listener_resource_name_, std::move(listener_watcher));
264 }
265 
Orphan()266 void XdsDependencyManager::Orphan() {
267   GRPC_TRACE_LOG(xds_resolver, INFO)
268       << "[XdsDependencyManager " << this << "] shutting down";
269   if (listener_watcher_ != nullptr) {
270     XdsListenerResourceType::CancelWatch(
271         xds_client_.get(), listener_resource_name_, listener_watcher_,
272         /*delay_unsubscription=*/false);
273   }
274   if (route_config_watcher_ != nullptr) {
275     XdsRouteConfigResourceType::CancelWatch(
276         xds_client_.get(), route_config_name_, route_config_watcher_,
277         /*delay_unsubscription=*/false);
278   }
279   for (const auto& p : cluster_watchers_) {
280     XdsClusterResourceType::CancelWatch(xds_client_.get(), p.first,
281                                         p.second.watcher,
282                                         /*delay_unsubscription=*/false);
283   }
284   for (const auto& p : endpoint_watchers_) {
285     XdsEndpointResourceType::CancelWatch(xds_client_.get(), p.first,
286                                          p.second.watcher,
287                                          /*delay_unsubscription=*/false);
288   }
289   cluster_subscriptions_.clear();
290   xds_client_.reset();
291   for (auto& p : dns_resolvers_) {
292     p.second.resolver.reset();
293   }
294   Unref();
295 }
296 
RequestReresolution()297 void XdsDependencyManager::RequestReresolution() {
298   for (const auto& p : dns_resolvers_) {
299     p.second.resolver->RequestReresolutionLocked();
300   }
301 }
302 
ResetBackoff()303 void XdsDependencyManager::ResetBackoff() {
304   for (const auto& p : dns_resolvers_) {
305     p.second.resolver->ResetBackoffLocked();
306   }
307 }
308 
OnListenerUpdate(absl::StatusOr<std::shared_ptr<const XdsListenerResource>> listener)309 void XdsDependencyManager::OnListenerUpdate(
310     absl::StatusOr<std::shared_ptr<const XdsListenerResource>> listener) {
311   GRPC_TRACE_LOG(xds_resolver, INFO)
312       << "[XdsDependencyManager " << this << "] received Listener update";
313   if (xds_client_ == nullptr) return;
314   if (!listener.ok()) {
315     current_listener_.reset();
316     return ReportError("LDS", listener_resource_name_,
317                        listener.status().message());
318   }
319   const auto* hcm = absl::get_if<XdsListenerResource::HttpConnectionManager>(
320       &(*listener)->listener);
321   if (hcm == nullptr) {
322     current_listener_.reset();
323     return ReportError("LDS", listener_resource_name_, "not an API listener");
324   }
325   current_listener_ = *std::move(listener);
326   lds_resolution_note_.clear();
327   Match(
328       hcm->route_config,
329       // RDS resource name
330       [&](const std::string& rds_name) {
331         // If the RDS name changed, update the RDS watcher.
332         // Note that this will be true on the initial update, because
333         // route_config_name_ will be empty.
334         if (route_config_name_ != rds_name) {
335           // If we already had a watch (i.e., if the previous config had
336           // a different RDS name), stop the previous watch.
337           // There will be no previous watch if either (a) this is the
338           // initial resource update or (b) the previous Listener had an
339           // inlined RouteConfig.
340           if (route_config_watcher_ != nullptr) {
341             XdsRouteConfigResourceType::CancelWatch(
342                 xds_client_.get(), route_config_name_, route_config_watcher_,
343                 /*delay_unsubscription=*/true);
344             route_config_watcher_ = nullptr;
345           }
346           // Start watch for the new RDS resource name.
347           route_config_name_ = rds_name;
348           GRPC_TRACE_LOG(xds_resolver, INFO)
349               << "[XdsDependencyManager " << this
350               << "] starting watch for route config " << route_config_name_;
351           auto watcher =
352               MakeRefCounted<RouteConfigWatcher>(Ref(), route_config_name_);
353           route_config_watcher_ = watcher.get();
354           XdsRouteConfigResourceType::StartWatch(
355               xds_client_.get(), route_config_name_, std::move(watcher));
356         } else {
357           // RDS resource name has not changed, so no watch needs to be
358           // updated, but we still need to propagate any changes in the
359           // HCM config (e.g., the list of HTTP filters).
360           MaybeReportUpdate();
361         }
362       },
363       // inlined RouteConfig
364       [&](const std::shared_ptr<const XdsRouteConfigResource>& route_config) {
365         // If the previous update specified an RDS resource instead of
366         // having an inlined RouteConfig, we need to cancel the RDS watch.
367         if (route_config_watcher_ != nullptr) {
368           XdsRouteConfigResourceType::CancelWatch(
369               xds_client_.get(), route_config_name_, route_config_watcher_);
370           route_config_watcher_ = nullptr;
371           route_config_name_.clear();
372         }
373         OnRouteConfigUpdate("", route_config);
374       });
375 }
376 
OnListenerAmbientError(absl::Status status)377 void XdsDependencyManager::OnListenerAmbientError(absl::Status status) {
378   GRPC_TRACE_LOG(xds_resolver, INFO)
379       << "[XdsDependencyManager " << this
380       << "] received Listener error: " << listener_resource_name_ << ": "
381       << status;
382   if (xds_client_ == nullptr) return;
383   if (status.ok()) {
384     lds_resolution_note_.clear();
385   } else {
386     lds_resolution_note_ = absl::StrCat(
387         "LDS resource ", listener_resource_name_, ": ", status.message());
388   }
389   MaybeReportUpdate();
390 }
391 
392 namespace {
393 
394 class XdsVirtualHostListIterator final
395     : public XdsRouting::VirtualHostListIterator {
396  public:
XdsVirtualHostListIterator(const std::vector<XdsRouteConfigResource::VirtualHost> * virtual_hosts)397   explicit XdsVirtualHostListIterator(
398       const std::vector<XdsRouteConfigResource::VirtualHost>* virtual_hosts)
399       : virtual_hosts_(virtual_hosts) {}
400 
Size() const401   size_t Size() const override { return virtual_hosts_->size(); }
402 
GetDomainsForVirtualHost(size_t index) const403   const std::vector<std::string>& GetDomainsForVirtualHost(
404       size_t index) const override {
405     return (*virtual_hosts_)[index].domains;
406   }
407 
408  private:
409   const std::vector<XdsRouteConfigResource::VirtualHost>* virtual_hosts_;
410 };
411 
412 // Gets the set of clusters referenced in the specified virtual host.
GetClustersFromVirtualHost(const XdsRouteConfigResource::VirtualHost & virtual_host)413 absl::flat_hash_set<absl::string_view> GetClustersFromVirtualHost(
414     const XdsRouteConfigResource::VirtualHost& virtual_host) {
415   absl::flat_hash_set<absl::string_view> clusters;
416   for (auto& route : virtual_host.routes) {
417     auto* route_action =
418         absl::get_if<XdsRouteConfigResource::Route::RouteAction>(&route.action);
419     if (route_action == nullptr) continue;
420     Match(
421         route_action->action,
422         // cluster name
423         [&](const XdsRouteConfigResource::Route::RouteAction::ClusterName&
424                 cluster_name) { clusters.insert(cluster_name.cluster_name); },
425         // WeightedClusters
426         [&](const std::vector<
427             XdsRouteConfigResource::Route::RouteAction::ClusterWeight>&
428                 weighted_clusters) {
429           for (const auto& weighted_cluster : weighted_clusters) {
430             clusters.insert(weighted_cluster.name);
431           }
432         },
433         // ClusterSpecifierPlugin
434         [&](const XdsRouteConfigResource::Route::RouteAction::
435                 ClusterSpecifierPluginName&) {
436           // Clusters are determined dynamically in this case, so we
437           // can't add any clusters here.
438         });
439   }
440   return clusters;
441 }
442 
443 }  // namespace
444 
OnRouteConfigUpdate(const std::string & name,absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>> route_config)445 void XdsDependencyManager::OnRouteConfigUpdate(
446     const std::string& name,
447     absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>>
448         route_config) {
449   GRPC_TRACE_LOG(xds_resolver, INFO) << "[XdsDependencyManager " << this
450                                      << "] received RouteConfig update for "
451                                      << (name.empty() ? "<inline>" : name);
452   if (xds_client_ == nullptr) return;
453   if (!route_config.ok()) {
454     current_virtual_host_ = nullptr;
455     ReportError(route_config_name_.empty() ? "LDS" : "RDS",
456                 route_config_name_.empty() ? listener_resource_name_
457                                            : route_config_name_,
458                 route_config.status().message());
459     return;
460   }
461   // Ignore updates for stale names.
462   if (name.empty()) {
463     if (!route_config_name_.empty()) return;
464   } else {
465     if (name != route_config_name_) return;
466   }
467   // Find the relevant VirtualHost from the RouteConfiguration.
468   // If the resource doesn't have the right vhost, fail.
469   auto vhost_index = XdsRouting::FindVirtualHostForDomain(
470       XdsVirtualHostListIterator(&(*route_config)->virtual_hosts),
471       data_plane_authority_);
472   if (!vhost_index.has_value()) {
473     current_virtual_host_ = nullptr;
474     ReportError(route_config_name_.empty() ? "LDS" : "RDS",
475                 route_config_name_.empty() ? listener_resource_name_
476                                            : route_config_name_,
477                 absl::StrCat("could not find VirtualHost for ",
478                              data_plane_authority_, " in RouteConfiguration"));
479     return;
480   }
481   // Update our data.
482   current_route_config_ = std::move(*route_config);
483   current_virtual_host_ = &current_route_config_->virtual_hosts[*vhost_index];
484   clusters_from_route_config_ =
485       GetClustersFromVirtualHost(*current_virtual_host_);
486   rds_resolution_note_.clear();
487   MaybeReportUpdate();
488 }
489 
OnRouteConfigAmbientError(std::string resource_name,absl::Status status)490 void XdsDependencyManager::OnRouteConfigAmbientError(std::string resource_name,
491                                                      absl::Status status) {
492   GRPC_TRACE_LOG(xds_resolver, INFO)
493       << "[XdsDependencyManager " << this
494       << "] received RouteConfig error: " << resource_name << ": " << status;
495   if (xds_client_ == nullptr) return;
496   if (status.ok()) {
497     rds_resolution_note_.clear();
498   } else {
499     rds_resolution_note_ =
500         absl::StrCat("RDS resource ", resource_name, ": ", status.message());
501   }
502   MaybeReportUpdate();
503 }
504 
OnClusterUpdate(const std::string & name,absl::StatusOr<std::shared_ptr<const XdsClusterResource>> cluster)505 void XdsDependencyManager::OnClusterUpdate(
506     const std::string& name,
507     absl::StatusOr<std::shared_ptr<const XdsClusterResource>> cluster) {
508   GRPC_TRACE_LOG(xds_resolver, INFO) << "[XdsDependencyManager " << this
509                                      << "] received Cluster update: " << name;
510   if (xds_client_ == nullptr) return;
511   if (!cluster.ok()) {
512     cluster = absl::UnavailableError(
513         absl::StrCat("CDS resource ", name, ": ", cluster.status().message()));
514   }
515   auto it = cluster_watchers_.find(name);
516   if (it == cluster_watchers_.end()) return;
517   it->second.update = std::move(cluster);
518   it->second.resolution_note.clear();
519   MaybeReportUpdate();
520 }
521 
OnClusterAmbientError(const std::string & name,absl::Status status)522 void XdsDependencyManager::OnClusterAmbientError(const std::string& name,
523                                                  absl::Status status) {
524   GRPC_TRACE_LOG(xds_resolver, INFO)
525       << "[XdsDependencyManager " << this
526       << "] received Cluster error: " << name << " " << status;
527   if (xds_client_ == nullptr) return;
528   auto it = cluster_watchers_.find(name);
529   if (it == cluster_watchers_.end()) return;
530   if (status.ok()) {
531     it->second.resolution_note.clear();
532   } else {
533     it->second.resolution_note =
534         absl::StrCat("CDS resource ", name, ": ", status.message());
535   }
536   MaybeReportUpdate();
537 }
538 
OnEndpointUpdate(const std::string & name,absl::StatusOr<std::shared_ptr<const XdsEndpointResource>> endpoint)539 void XdsDependencyManager::OnEndpointUpdate(
540     const std::string& name,
541     absl::StatusOr<std::shared_ptr<const XdsEndpointResource>> endpoint) {
542   GRPC_TRACE_LOG(xds_resolver, INFO) << "[XdsDependencyManager " << this
543                                      << "] received Endpoint update: " << name;
544   if (xds_client_ == nullptr) return;
545   auto it = endpoint_watchers_.find(name);
546   if (it == endpoint_watchers_.end()) return;
547   if (!endpoint.ok()) {
548     it->second.update.endpoints.reset();
549     it->second.update.resolution_note =
550         absl::StrCat("EDS resource ", name, ": ", endpoint.status().message());
551   } else {
552     if ((*endpoint)->priorities.empty()) {
553       it->second.update.resolution_note =
554           absl::StrCat("EDS resource ", name, ": contains no localities");
555     } else {
556       std::set<absl::string_view> empty_localities;
557       for (const auto& priority : (*endpoint)->priorities) {
558         for (const auto& p : priority.localities) {
559           if (p.second.endpoints.empty()) {
560             empty_localities.insert(
561                 p.first->human_readable_string().as_string_view());
562           }
563         }
564       }
565       if (!empty_localities.empty()) {
566         it->second.update.resolution_note = absl::StrCat(
567             "EDS resource ", name, ": contains empty localities: [",
568             absl::StrJoin(empty_localities, "; "), "]");
569       } else {
570         it->second.update.resolution_note.clear();
571       }
572     }
573     it->second.update.endpoints = std::move(*endpoint);
574   }
575   MaybeReportUpdate();
576 }
577 
OnEndpointAmbientError(const std::string & name,absl::Status status)578 void XdsDependencyManager::OnEndpointAmbientError(const std::string& name,
579                                                   absl::Status status) {
580   GRPC_TRACE_LOG(xds_resolver, INFO)
581       << "[XdsDependencyManager " << this
582       << "] received Endpoint error: " << name << " " << status;
583   if (xds_client_ == nullptr) return;
584   auto it = endpoint_watchers_.find(name);
585   if (it == endpoint_watchers_.end()) return;
586   if (status.ok()) {
587     it->second.update.resolution_note.clear();
588   } else {
589     it->second.update.resolution_note =
590         absl::StrCat("EDS resource ", name, ": ", status.message());
591   }
592   MaybeReportUpdate();
593 }
594 
OnDnsResult(const std::string & dns_name,Resolver::Result result)595 void XdsDependencyManager::OnDnsResult(const std::string& dns_name,
596                                        Resolver::Result result) {
597   GRPC_TRACE_LOG(xds_resolver, INFO) << "[XdsDependencyManager " << this
598                                      << "] received DNS update: " << dns_name;
599   if (xds_client_ == nullptr) return;
600   auto it = dns_resolvers_.find(dns_name);
601   if (it == dns_resolvers_.end()) return;
602   PopulateDnsUpdate(dns_name, std::move(result), &it->second);
603   MaybeReportUpdate();
604 }
605 
PopulateDnsUpdate(const std::string & dns_name,Resolver::Result result,DnsState * dns_state)606 void XdsDependencyManager::PopulateDnsUpdate(const std::string& dns_name,
607                                              Resolver::Result result,
608                                              DnsState* dns_state) {
609   // Convert resolver result to EDS update.
610   XdsEndpointResource::Priority::Locality locality;
611   locality.name = MakeRefCounted<XdsLocalityName>("", "", "");
612   locality.lb_weight = 1;
613   if (result.addresses.ok()) {
614     for (const auto& address : *result.addresses) {
615       locality.endpoints.emplace_back(
616           address.addresses(),
617           address.args().Set(GRPC_ARG_ADDRESS_NAME, dns_name));
618     }
619     dns_state->update.resolution_note = std::move(result.resolution_note);
620   } else if (result.resolution_note.empty()) {
621     dns_state->update.resolution_note =
622         absl::StrCat("DNS resolution failed for ", dns_name, ": ",
623                      result.addresses.status().ToString());
624   }
625   XdsEndpointResource::Priority priority;
626   priority.localities.emplace(locality.name.get(), std::move(locality));
627   auto resource = std::make_shared<XdsEndpointResource>();
628   resource->priorities.emplace_back(std::move(priority));
629   dns_state->update.endpoints = std::move(resource);
630 }
631 
PopulateClusterConfigMap(absl::string_view name,int depth,absl::flat_hash_map<std::string,absl::StatusOr<XdsConfig::ClusterConfig>> * cluster_config_map,std::set<absl::string_view> * eds_resources_seen,std::set<absl::string_view> * dns_names_seen,absl::StatusOr<std::vector<absl::string_view>> * leaf_clusters)632 bool XdsDependencyManager::PopulateClusterConfigMap(
633     absl::string_view name, int depth,
634     absl::flat_hash_map<std::string, absl::StatusOr<XdsConfig::ClusterConfig>>*
635         cluster_config_map,
636     std::set<absl::string_view>* eds_resources_seen,
637     std::set<absl::string_view>* dns_names_seen,
638     absl::StatusOr<std::vector<absl::string_view>>* leaf_clusters) {
639   if (depth > 0) CHECK_NE(leaf_clusters, nullptr);
640   if (depth == kMaxXdsAggregateClusterRecursionDepth) {
641     *leaf_clusters =
642         absl::UnavailableError("aggregate cluster graph exceeds max depth");
643     return true;
644   }
645   // Don't process the cluster again if we've already seen it in some
646   // other branch of the recursion tree.  We populate it with a non-OK
647   // status here, since we need an entry in the map to avoid incorrectly
648   // stopping the CDS watch, but we'll overwrite this below if we actually
649   // have the data for the cluster.
650   auto p = cluster_config_map->emplace(
651       name, absl::InternalError("cluster data not yet available"));
652   if (!p.second) return true;
653   auto& cluster_config = p.first->second;
654   auto& state = cluster_watchers_[name];
655   // Create a new watcher if needed.
656   if (state.watcher == nullptr) {
657     auto watcher = MakeRefCounted<ClusterWatcher>(Ref(), name);
658     GRPC_TRACE_LOG(xds_resolver, INFO)
659         << "[XdsDependencyManager " << this << "] starting watch for cluster "
660         << name;
661     state.watcher = watcher.get();
662     XdsClusterResourceType::StartWatch(xds_client_.get(), name,
663                                        std::move(watcher));
664     return false;
665   }
666   // If there was an error fetching the CDS resource, report the error.
667   if (!state.update.ok()) {
668     cluster_config = state.update.status();
669     return true;
670   }
671   // If we don't have the resource yet, we can't return a config yet.
672   if (*state.update == nullptr) return false;
673   // Populate endpoint info based on cluster type.
674   return Match(
675       (*state.update)->type,
676       // EDS cluster.
677       [&](const XdsClusterResource::Eds& eds) {
678         absl::string_view eds_resource_name =
679             eds.eds_service_name.empty() ? name : eds.eds_service_name;
680         eds_resources_seen->insert(eds_resource_name);
681         // Start EDS watch if needed.
682         auto& eds_state = endpoint_watchers_[eds_resource_name];
683         if (eds_state.watcher == nullptr) {
684           GRPC_TRACE_LOG(xds_resolver, INFO)
685               << "[XdsDependencyManager " << this
686               << "] starting watch for endpoint " << eds_resource_name;
687           auto watcher =
688               MakeRefCounted<EndpointWatcher>(Ref(), eds_resource_name);
689           eds_state.watcher = watcher.get();
690           XdsEndpointResourceType::StartWatch(
691               xds_client_.get(), eds_resource_name, std::move(watcher));
692           return false;
693         }
694         // Check if EDS resource has been returned.
695         if (eds_state.update.endpoints == nullptr &&
696             eds_state.update.resolution_note.empty()) {
697           return false;
698         }
699         // Populate cluster config.
700         std::array<absl::string_view, 4> notes = {
701             lds_resolution_note_, rds_resolution_note_, state.resolution_note,
702             eds_state.update.resolution_note};
703         std::vector<absl::string_view> resolution_notes;
704         for (const auto& note : notes) {
705           if (!note.empty()) resolution_notes.push_back(note);
706         }
707         std::string node_id_buffer;
708         if (resolution_notes.empty()) {
709           const XdsBootstrap::Node* node =
710               DownCast<const GrpcXdsBootstrap&>(xds_client_->bootstrap())
711                   .node();
712           if (node != nullptr) {
713             node_id_buffer = absl::StrCat("xDS node ID:", node->id());
714             resolution_notes.push_back(node_id_buffer);
715           }
716         }
717         cluster_config.emplace(*state.update, eds_state.update.endpoints,
718                                absl::StrJoin(resolution_notes, "; "));
719         if (leaf_clusters != nullptr) (*leaf_clusters)->push_back(name);
720         return true;
721       },
722       // LOGICAL_DNS cluster.
723       [&](const XdsClusterResource::LogicalDns& logical_dns) {
724         dns_names_seen->insert(logical_dns.hostname);
725         // Start DNS resolver if needed.
726         auto& dns_state = dns_resolvers_[logical_dns.hostname];
727         if (dns_state.resolver == nullptr) {
728           GRPC_TRACE_LOG(xds_resolver, INFO)
729               << "[XdsDependencyManager " << this
730               << "] starting DNS resolver for " << logical_dns.hostname;
731           auto* fake_resolver_response_generator = args_.GetPointer<
732               FakeResolverResponseGenerator>(
733               GRPC_ARG_XDS_LOGICAL_DNS_CLUSTER_FAKE_RESOLVER_RESPONSE_GENERATOR);
734           ChannelArgs args = args_;
735           std::string target;
736           if (fake_resolver_response_generator != nullptr) {
737             target = absl::StrCat("fake:", logical_dns.hostname);
738             args = args.SetObject(fake_resolver_response_generator->Ref());
739           } else {
740             target = absl::StrCat("dns:", logical_dns.hostname);
741           }
742           dns_state.resolver =
743               CoreConfiguration::Get().resolver_registry().CreateResolver(
744                   target, args, interested_parties_, work_serializer_,
745                   std::make_unique<DnsResultHandler>(Ref(),
746                                                      logical_dns.hostname));
747           if (dns_state.resolver == nullptr) {
748             Resolver::Result result;
749             result.addresses.emplace();  // Empty list.
750             result.resolution_note = absl::StrCat(
751                 "failed to create DNS resolver for ", logical_dns.hostname);
752             PopulateDnsUpdate(logical_dns.hostname, std::move(result),
753                               &dns_state);
754           } else {
755             dns_state.resolver->StartLocked();
756             return false;
757           }
758         }
759         // Check if result has been returned.
760         if (dns_state.update.endpoints == nullptr &&
761             dns_state.update.resolution_note.empty()) {
762           return false;
763         }
764         // Populate cluster config.
765         cluster_config.emplace(*state.update, dns_state.update.endpoints,
766                                dns_state.update.resolution_note);
767         if (leaf_clusters != nullptr) (*leaf_clusters)->push_back(name);
768         return true;
769       },
770       // Aggregate cluster.  Recursively expand to child clusters.
771       [&](const XdsClusterResource::Aggregate& aggregate) {
772         // Grab a ref to the CDS resource for the aggregate cluster here,
773         // since our reference into cluster_watchers_ will be invalidated
774         // when we recursively call ourselves and add entries to the
775         // map for underlying clusters.
776         auto cluster_resource = *state.update;
777         // Recursively expand leaf clusters.
778         absl::StatusOr<std::vector<absl::string_view>> child_leaf_clusters;
779         child_leaf_clusters.emplace();
780         bool have_all_resources = true;
781         for (const std::string& child_name :
782              aggregate.prioritized_cluster_names) {
783           have_all_resources &= PopulateClusterConfigMap(
784               child_name, depth + 1, cluster_config_map, eds_resources_seen,
785               dns_names_seen, &child_leaf_clusters);
786           if (!child_leaf_clusters.ok()) break;
787         }
788         // Note that we cannot use the cluster_config reference we
789         // created above, because it may have been invalidated by map
790         // insertions when we recursively called ourselves, so we have
791         // to do the lookup in cluster_config_map again.
792         auto& aggregate_cluster_config = (*cluster_config_map)[name];
793         // If we exceeded max recursion depth, report an error for the
794         // cluster, and propagate the error up if needed.
795         if (!child_leaf_clusters.ok()) {
796           aggregate_cluster_config = child_leaf_clusters.status();
797           if (leaf_clusters != nullptr) {
798             *leaf_clusters = child_leaf_clusters.status();
799           }
800           return true;
801         }
802         // If needed, propagate leaf cluster list up the tree.
803         if (leaf_clusters != nullptr) {
804           (*leaf_clusters)
805               ->insert((*leaf_clusters)->end(), child_leaf_clusters->begin(),
806                        child_leaf_clusters->end());
807         }
808         // If there are no leaf clusters, report an error for the cluster.
809         if (have_all_resources && child_leaf_clusters->empty()) {
810           aggregate_cluster_config = absl::UnavailableError(
811               absl::StrCat("aggregate cluster dependency graph for ", name,
812                            " has no leaf clusters"));
813           return true;
814         }
815         // Populate cluster config.
816         // Note that we do this even for aggregate clusters that are not
817         // at the root of the tree, because we need to make sure the list
818         // of underlying cluster names stays alive so that the leaf cluster
819         // list of the root aggregate cluster can point to those strings.
820         aggregate_cluster_config.emplace(std::move(cluster_resource),
821                                          std::move(*child_leaf_clusters));
822         return have_all_resources;
823       });
824 }
825 
826 RefCountedPtr<XdsDependencyManager::ClusterSubscription>
GetClusterSubscription(absl::string_view cluster_name)827 XdsDependencyManager::GetClusterSubscription(absl::string_view cluster_name) {
828   auto it = cluster_subscriptions_.find(cluster_name);
829   if (it != cluster_subscriptions_.end()) {
830     auto subscription = it->second->RefIfNonZero();
831     if (subscription != nullptr) return subscription;
832   }
833   auto subscription = MakeRefCounted<ClusterSubscription>(cluster_name, Ref());
834   cluster_subscriptions_.emplace(subscription->cluster_name(),
835                                  subscription->WeakRef());
836   // If the cluster is not already subscribed to by virtue of being
837   // referenced in the route config, then trigger the CDS watch.
838   if (!clusters_from_route_config_.contains(cluster_name)) {
839     MaybeReportUpdate();
840   }
841   return subscription;
842 }
843 
OnClusterSubscriptionUnref(absl::string_view cluster_name,ClusterSubscription * subscription)844 void XdsDependencyManager::OnClusterSubscriptionUnref(
845     absl::string_view cluster_name, ClusterSubscription* subscription) {
846   auto it = cluster_subscriptions_.find(cluster_name);
847   // Shouldn't happen, but ignore if it does.
848   if (it == cluster_subscriptions_.end()) return;
849   // Do nothing if the subscription has already been replaced.
850   if (it->second != subscription) return;
851   // Remove the entry.
852   cluster_subscriptions_.erase(it);
853   // If this cluster is not already subscribed to by virtue of being
854   // referenced in the route config, then update watches and generate a
855   // new update.
856   if (!clusters_from_route_config_.contains(cluster_name)) {
857     MaybeReportUpdate();
858   }
859 }
860 
MaybeReportUpdate()861 void XdsDependencyManager::MaybeReportUpdate() {
862   // Populate Listener and RouteConfig fields.
863   if (current_listener_ == nullptr || current_virtual_host_ == nullptr) return;
864   auto config = MakeRefCounted<XdsConfig>();
865   config->listener = current_listener_;
866   config->route_config = current_route_config_;
867   config->virtual_host = current_virtual_host_;
868   // Determine the set of clusters we should be watching.
869   std::set<absl::string_view> clusters_to_watch;
870   for (const absl::string_view& cluster : clusters_from_route_config_) {
871     clusters_to_watch.insert(cluster);
872   }
873   for (const auto& p : cluster_subscriptions_) {
874     clusters_to_watch.insert(p.first);
875   }
876   // Populate Cluster map.
877   // We traverse the entire graph even if we don't yet have all of the
878   // resources we need to ensure that the right set of watches are active.
879   std::set<absl::string_view> eds_resources_seen;
880   std::set<absl::string_view> dns_names_seen;
881   bool have_all_resources = true;
882   for (const absl::string_view& cluster : clusters_to_watch) {
883     have_all_resources &= PopulateClusterConfigMap(
884         cluster, 0, &config->clusters, &eds_resources_seen, &dns_names_seen);
885   }
886   // Remove entries in cluster_watchers_ for any clusters not in
887   // config->clusters.
888   for (auto it = cluster_watchers_.begin(); it != cluster_watchers_.end();) {
889     const std::string& cluster_name = it->first;
890     if (config->clusters.contains(cluster_name)) {
891       ++it;
892       continue;
893     }
894     GRPC_TRACE_LOG(xds_resolver, INFO)
895         << "[XdsDependencyManager " << this << "] cancelling watch for cluster "
896         << cluster_name;
897     XdsClusterResourceType::CancelWatch(xds_client_.get(), cluster_name,
898                                         it->second.watcher,
899                                         /*delay_unsubscription=*/false);
900     cluster_watchers_.erase(it++);
901   }
902   // Remove entries in endpoint_watchers_ for any EDS resources not in
903   // eds_resources_seen.
904   for (auto it = endpoint_watchers_.begin(); it != endpoint_watchers_.end();) {
905     const std::string& eds_resource_name = it->first;
906     if (eds_resources_seen.find(eds_resource_name) !=
907         eds_resources_seen.end()) {
908       ++it;
909       continue;
910     }
911     GRPC_TRACE_LOG(xds_resolver, INFO)
912         << "[XdsDependencyManager " << this
913         << "] cancelling watch for EDS resource " << eds_resource_name;
914     XdsEndpointResourceType::CancelWatch(xds_client_.get(), eds_resource_name,
915                                          it->second.watcher,
916                                          /*delay_unsubscription=*/false);
917     endpoint_watchers_.erase(it++);
918   }
919   // Remove entries in dns_resolvers_ for any DNS name not in
920   // eds_resources_seen.
921   for (auto it = dns_resolvers_.begin(); it != dns_resolvers_.end();) {
922     const std::string& dns_name = it->first;
923     if (dns_names_seen.find(dns_name) != dns_names_seen.end()) {
924       ++it;
925       continue;
926     }
927     GRPC_TRACE_LOG(xds_resolver, INFO)
928         << "[XdsDependencyManager " << this
929         << "] shutting down DNS resolver for " << dns_name;
930     dns_resolvers_.erase(it++);
931   }
932   // If we have all the data we need, then send an update.
933   if (!have_all_resources) {
934     GRPC_TRACE_LOG(xds_resolver, INFO)
935         << "[XdsDependencyManager " << this
936         << "] missing data -- NOT returning config";
937     return;
938   }
939   GRPC_TRACE_LOG(xds_resolver, INFO)
940       << "[XdsDependencyManager " << this
941       << "] returning config: " << config->ToString();
942   watcher_->OnUpdate(std::move(config));
943 }
944 
ReportError(absl::string_view resource_type,absl::string_view resource_name,absl::string_view error)945 void XdsDependencyManager::ReportError(absl::string_view resource_type,
946                                        absl::string_view resource_name,
947                                        absl::string_view error) {
948   watcher_->OnUpdate(absl::UnavailableError(
949       absl::StrCat(resource_type, " resource ", resource_name, ": ", error)));
950 }
951 
952 }  // namespace grpc_core
953