1 //
2 // Copyright 2019 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include "src/core/resolver/xds/xds_dependency_manager.h"
18
19 #include <set>
20
21 #include "absl/log/check.h"
22 #include "absl/log/log.h"
23 #include "absl/strings/str_join.h"
24 #include "src/core/config/core_configuration.h"
25 #include "src/core/load_balancing/xds/xds_channel_args.h"
26 #include "src/core/resolver/fake/fake_resolver.h"
27 #include "src/core/util/match.h"
28 #include "src/core/xds/grpc/xds_cluster_parser.h"
29 #include "src/core/xds/grpc/xds_endpoint_parser.h"
30 #include "src/core/xds/grpc/xds_listener_parser.h"
31 #include "src/core/xds/grpc/xds_route_config_parser.h"
32 #include "src/core/xds/grpc/xds_routing.h"
33
34 namespace grpc_core {
35
namespace {

// Deepest nesting level allowed when recursively expanding aggregate
// clusters into their underlying clusters. Reaching this depth is reported
// as an error ("aggregate cluster graph exceeds max depth").
constexpr int kMaxXdsAggregateClusterRecursionDepth{16};

}  // namespace
42
43 //
44 // XdsDependencyManager::ListenerWatcher
45 //
46
47 class XdsDependencyManager::ListenerWatcher final
48 : public XdsListenerResourceType::WatcherInterface {
49 public:
ListenerWatcher(RefCountedPtr<XdsDependencyManager> dependency_mgr)50 explicit ListenerWatcher(RefCountedPtr<XdsDependencyManager> dependency_mgr)
51 : dependency_mgr_(std::move(dependency_mgr)) {}
52
OnResourceChanged(absl::StatusOr<std::shared_ptr<const XdsListenerResource>> listener,RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle)53 void OnResourceChanged(
54 absl::StatusOr<std::shared_ptr<const XdsListenerResource>> listener,
55 RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle) override {
56 dependency_mgr_->work_serializer_->Run(
57 [dependency_mgr = dependency_mgr_, listener = std::move(listener),
58 read_delay_handle = std::move(read_delay_handle)]() mutable {
59 dependency_mgr->OnListenerUpdate(std::move(listener));
60 },
61 DEBUG_LOCATION);
62 }
63
OnAmbientError(absl::Status status,RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle)64 void OnAmbientError(
65 absl::Status status,
66 RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle) override {
67 dependency_mgr_->work_serializer_->Run(
68 [dependency_mgr = dependency_mgr_, status = std::move(status),
69 read_delay_handle = std::move(read_delay_handle)]() mutable {
70 dependency_mgr->OnListenerAmbientError(std::move(status));
71 },
72 DEBUG_LOCATION);
73 }
74
75 private:
76 RefCountedPtr<XdsDependencyManager> dependency_mgr_;
77 };
78
79 //
80 // XdsDependencyManager::RouteConfigWatcher
81 //
82
83 class XdsDependencyManager::RouteConfigWatcher final
84 : public XdsRouteConfigResourceType::WatcherInterface {
85 public:
RouteConfigWatcher(RefCountedPtr<XdsDependencyManager> dependency_mgr,std::string name)86 RouteConfigWatcher(RefCountedPtr<XdsDependencyManager> dependency_mgr,
87 std::string name)
88 : dependency_mgr_(std::move(dependency_mgr)), name_(std::move(name)) {}
89
OnResourceChanged(absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>> route_config,RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle)90 void OnResourceChanged(
91 absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>>
92 route_config,
93 RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle) override {
94 dependency_mgr_->work_serializer_->Run(
95 [self = RefAsSubclass<RouteConfigWatcher>(),
96 route_config = std::move(route_config),
97 read_delay_handle = std::move(read_delay_handle)]() mutable {
98 self->dependency_mgr_->OnRouteConfigUpdate(self->name_,
99 std::move(route_config));
100 },
101 DEBUG_LOCATION);
102 }
103
OnAmbientError(absl::Status status,RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle)104 void OnAmbientError(
105 absl::Status status,
106 RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle) override {
107 dependency_mgr_->work_serializer_->Run(
108 [self = RefAsSubclass<RouteConfigWatcher>(), status = std::move(status),
109 read_delay_handle = std::move(read_delay_handle)]() mutable {
110 self->dependency_mgr_->OnRouteConfigAmbientError(self->name_,
111 std::move(status));
112 },
113 DEBUG_LOCATION);
114 }
115
116 private:
117 RefCountedPtr<XdsDependencyManager> dependency_mgr_;
118 std::string name_;
119 };
120
121 //
122 // XdsDependencyManager::ClusterWatcher
123 //
124
125 class XdsDependencyManager::ClusterWatcher final
126 : public XdsClusterResourceType::WatcherInterface {
127 public:
ClusterWatcher(RefCountedPtr<XdsDependencyManager> dependency_mgr,absl::string_view name)128 ClusterWatcher(RefCountedPtr<XdsDependencyManager> dependency_mgr,
129 absl::string_view name)
130 : dependency_mgr_(std::move(dependency_mgr)), name_(name) {}
131
OnResourceChanged(absl::StatusOr<std::shared_ptr<const XdsClusterResource>> cluster,RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle)132 void OnResourceChanged(
133 absl::StatusOr<std::shared_ptr<const XdsClusterResource>> cluster,
134 RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle) override {
135 dependency_mgr_->work_serializer_->Run(
136 [self = RefAsSubclass<ClusterWatcher>(), cluster = std::move(cluster),
137 read_delay_handle = std::move(read_delay_handle)]() mutable {
138 self->dependency_mgr_->OnClusterUpdate(self->name_,
139 std::move(cluster));
140 },
141 DEBUG_LOCATION);
142 }
143
OnAmbientError(absl::Status status,RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle)144 void OnAmbientError(
145 absl::Status status,
146 RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle) override {
147 dependency_mgr_->work_serializer_->Run(
148 [self = RefAsSubclass<ClusterWatcher>(), status = std::move(status),
149 read_delay_handle = std::move(read_delay_handle)]() mutable {
150 self->dependency_mgr_->OnClusterAmbientError(self->name_,
151 std::move(status));
152 },
153 DEBUG_LOCATION);
154 }
155
156 private:
157 RefCountedPtr<XdsDependencyManager> dependency_mgr_;
158 std::string name_;
159 };
160
161 //
162 // XdsDependencyManager::EndpointWatcher
163 //
164
165 class XdsDependencyManager::EndpointWatcher final
166 : public XdsEndpointResourceType::WatcherInterface {
167 public:
EndpointWatcher(RefCountedPtr<XdsDependencyManager> dependency_mgr,absl::string_view name)168 EndpointWatcher(RefCountedPtr<XdsDependencyManager> dependency_mgr,
169 absl::string_view name)
170 : dependency_mgr_(std::move(dependency_mgr)), name_(name) {}
171
OnResourceChanged(absl::StatusOr<std::shared_ptr<const XdsEndpointResource>> endpoint,RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle)172 void OnResourceChanged(
173 absl::StatusOr<std::shared_ptr<const XdsEndpointResource>> endpoint,
174 RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle) override {
175 dependency_mgr_->work_serializer_->Run(
176 [self = RefAsSubclass<EndpointWatcher>(),
177 endpoint = std::move(endpoint),
178 read_delay_handle = std::move(read_delay_handle)]() mutable {
179 self->dependency_mgr_->OnEndpointUpdate(self->name_,
180 std::move(endpoint));
181 },
182 DEBUG_LOCATION);
183 }
184
OnAmbientError(absl::Status status,RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle)185 void OnAmbientError(
186 absl::Status status,
187 RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle) override {
188 dependency_mgr_->work_serializer_->Run(
189 [self = RefAsSubclass<EndpointWatcher>(), status = std::move(status),
190 read_delay_handle = std::move(read_delay_handle)]() mutable {
191 self->dependency_mgr_->OnEndpointAmbientError(self->name_,
192 std::move(status));
193 },
194 DEBUG_LOCATION);
195 }
196
197 private:
198 RefCountedPtr<XdsDependencyManager> dependency_mgr_;
199 std::string name_;
200 };
201
202 //
203 // XdsDependencyManager::DnsResultHandler
204 //
205
206 class XdsDependencyManager::DnsResultHandler final
207 : public Resolver::ResultHandler {
208 public:
DnsResultHandler(RefCountedPtr<XdsDependencyManager> dependency_mgr,std::string name)209 DnsResultHandler(RefCountedPtr<XdsDependencyManager> dependency_mgr,
210 std::string name)
211 : dependency_mgr_(std::move(dependency_mgr)), name_(std::move(name)) {}
212
ReportResult(Resolver::Result result)213 void ReportResult(Resolver::Result result) override {
214 dependency_mgr_->work_serializer_->Run(
215 [dependency_mgr = dependency_mgr_, name = name_,
216 result = std::move(result)]() mutable {
217 dependency_mgr->OnDnsResult(name, std::move(result));
218 },
219 DEBUG_LOCATION);
220 }
221
222 private:
223 RefCountedPtr<XdsDependencyManager> dependency_mgr_;
224 std::string name_;
225 };
226
227 //
228 // XdsDependencyManager::ClusterSubscription
229 //
230
Orphaned()231 void XdsDependencyManager::ClusterSubscription::Orphaned() {
232 dependency_mgr_->work_serializer_->Run(
233 [self = WeakRef()]() {
234 self->dependency_mgr_->OnClusterSubscriptionUnref(self->cluster_name_,
235 self.get());
236 },
237 DEBUG_LOCATION);
238 }
239
240 //
241 // XdsDependencyManager
242 //
243
XdsDependencyManager(RefCountedPtr<GrpcXdsClient> xds_client,std::shared_ptr<WorkSerializer> work_serializer,std::unique_ptr<Watcher> watcher,std::string data_plane_authority,std::string listener_resource_name,ChannelArgs args,grpc_pollset_set * interested_parties)244 XdsDependencyManager::XdsDependencyManager(
245 RefCountedPtr<GrpcXdsClient> xds_client,
246 std::shared_ptr<WorkSerializer> work_serializer,
247 std::unique_ptr<Watcher> watcher, std::string data_plane_authority,
248 std::string listener_resource_name, ChannelArgs args,
249 grpc_pollset_set* interested_parties)
250 : xds_client_(std::move(xds_client)),
251 work_serializer_(std::move(work_serializer)),
252 watcher_(std::move(watcher)),
253 data_plane_authority_(std::move(data_plane_authority)),
254 listener_resource_name_(std::move(listener_resource_name)),
255 args_(std::move(args)),
256 interested_parties_(interested_parties) {
257 GRPC_TRACE_LOG(xds_resolver, INFO)
258 << "[XdsDependencyManager " << this << "] starting watch for listener "
259 << listener_resource_name_;
260 auto listener_watcher = MakeRefCounted<ListenerWatcher>(Ref());
261 listener_watcher_ = listener_watcher.get();
262 XdsListenerResourceType::StartWatch(
263 xds_client_.get(), listener_resource_name_, std::move(listener_watcher));
264 }
265
Orphan()266 void XdsDependencyManager::Orphan() {
267 GRPC_TRACE_LOG(xds_resolver, INFO)
268 << "[XdsDependencyManager " << this << "] shutting down";
269 if (listener_watcher_ != nullptr) {
270 XdsListenerResourceType::CancelWatch(
271 xds_client_.get(), listener_resource_name_, listener_watcher_,
272 /*delay_unsubscription=*/false);
273 }
274 if (route_config_watcher_ != nullptr) {
275 XdsRouteConfigResourceType::CancelWatch(
276 xds_client_.get(), route_config_name_, route_config_watcher_,
277 /*delay_unsubscription=*/false);
278 }
279 for (const auto& p : cluster_watchers_) {
280 XdsClusterResourceType::CancelWatch(xds_client_.get(), p.first,
281 p.second.watcher,
282 /*delay_unsubscription=*/false);
283 }
284 for (const auto& p : endpoint_watchers_) {
285 XdsEndpointResourceType::CancelWatch(xds_client_.get(), p.first,
286 p.second.watcher,
287 /*delay_unsubscription=*/false);
288 }
289 cluster_subscriptions_.clear();
290 xds_client_.reset();
291 for (auto& p : dns_resolvers_) {
292 p.second.resolver.reset();
293 }
294 Unref();
295 }
296
RequestReresolution()297 void XdsDependencyManager::RequestReresolution() {
298 for (const auto& p : dns_resolvers_) {
299 p.second.resolver->RequestReresolutionLocked();
300 }
301 }
302
ResetBackoff()303 void XdsDependencyManager::ResetBackoff() {
304 for (const auto& p : dns_resolvers_) {
305 p.second.resolver->ResetBackoffLocked();
306 }
307 }
308
OnListenerUpdate(absl::StatusOr<std::shared_ptr<const XdsListenerResource>> listener)309 void XdsDependencyManager::OnListenerUpdate(
310 absl::StatusOr<std::shared_ptr<const XdsListenerResource>> listener) {
311 GRPC_TRACE_LOG(xds_resolver, INFO)
312 << "[XdsDependencyManager " << this << "] received Listener update";
313 if (xds_client_ == nullptr) return;
314 if (!listener.ok()) {
315 current_listener_.reset();
316 return ReportError("LDS", listener_resource_name_,
317 listener.status().message());
318 }
319 const auto* hcm = absl::get_if<XdsListenerResource::HttpConnectionManager>(
320 &(*listener)->listener);
321 if (hcm == nullptr) {
322 current_listener_.reset();
323 return ReportError("LDS", listener_resource_name_, "not an API listener");
324 }
325 current_listener_ = *std::move(listener);
326 lds_resolution_note_.clear();
327 Match(
328 hcm->route_config,
329 // RDS resource name
330 [&](const std::string& rds_name) {
331 // If the RDS name changed, update the RDS watcher.
332 // Note that this will be true on the initial update, because
333 // route_config_name_ will be empty.
334 if (route_config_name_ != rds_name) {
335 // If we already had a watch (i.e., if the previous config had
336 // a different RDS name), stop the previous watch.
337 // There will be no previous watch if either (a) this is the
338 // initial resource update or (b) the previous Listener had an
339 // inlined RouteConfig.
340 if (route_config_watcher_ != nullptr) {
341 XdsRouteConfigResourceType::CancelWatch(
342 xds_client_.get(), route_config_name_, route_config_watcher_,
343 /*delay_unsubscription=*/true);
344 route_config_watcher_ = nullptr;
345 }
346 // Start watch for the new RDS resource name.
347 route_config_name_ = rds_name;
348 GRPC_TRACE_LOG(xds_resolver, INFO)
349 << "[XdsDependencyManager " << this
350 << "] starting watch for route config " << route_config_name_;
351 auto watcher =
352 MakeRefCounted<RouteConfigWatcher>(Ref(), route_config_name_);
353 route_config_watcher_ = watcher.get();
354 XdsRouteConfigResourceType::StartWatch(
355 xds_client_.get(), route_config_name_, std::move(watcher));
356 } else {
357 // RDS resource name has not changed, so no watch needs to be
358 // updated, but we still need to propagate any changes in the
359 // HCM config (e.g., the list of HTTP filters).
360 MaybeReportUpdate();
361 }
362 },
363 // inlined RouteConfig
364 [&](const std::shared_ptr<const XdsRouteConfigResource>& route_config) {
365 // If the previous update specified an RDS resource instead of
366 // having an inlined RouteConfig, we need to cancel the RDS watch.
367 if (route_config_watcher_ != nullptr) {
368 XdsRouteConfigResourceType::CancelWatch(
369 xds_client_.get(), route_config_name_, route_config_watcher_);
370 route_config_watcher_ = nullptr;
371 route_config_name_.clear();
372 }
373 OnRouteConfigUpdate("", route_config);
374 });
375 }
376
OnListenerAmbientError(absl::Status status)377 void XdsDependencyManager::OnListenerAmbientError(absl::Status status) {
378 GRPC_TRACE_LOG(xds_resolver, INFO)
379 << "[XdsDependencyManager " << this
380 << "] received Listener error: " << listener_resource_name_ << ": "
381 << status;
382 if (xds_client_ == nullptr) return;
383 if (status.ok()) {
384 lds_resolution_note_.clear();
385 } else {
386 lds_resolution_note_ = absl::StrCat(
387 "LDS resource ", listener_resource_name_, ": ", status.message());
388 }
389 MaybeReportUpdate();
390 }
391
392 namespace {
393
394 class XdsVirtualHostListIterator final
395 : public XdsRouting::VirtualHostListIterator {
396 public:
XdsVirtualHostListIterator(const std::vector<XdsRouteConfigResource::VirtualHost> * virtual_hosts)397 explicit XdsVirtualHostListIterator(
398 const std::vector<XdsRouteConfigResource::VirtualHost>* virtual_hosts)
399 : virtual_hosts_(virtual_hosts) {}
400
Size() const401 size_t Size() const override { return virtual_hosts_->size(); }
402
GetDomainsForVirtualHost(size_t index) const403 const std::vector<std::string>& GetDomainsForVirtualHost(
404 size_t index) const override {
405 return (*virtual_hosts_)[index].domains;
406 }
407
408 private:
409 const std::vector<XdsRouteConfigResource::VirtualHost>* virtual_hosts_;
410 };
411
412 // Gets the set of clusters referenced in the specified virtual host.
GetClustersFromVirtualHost(const XdsRouteConfigResource::VirtualHost & virtual_host)413 absl::flat_hash_set<absl::string_view> GetClustersFromVirtualHost(
414 const XdsRouteConfigResource::VirtualHost& virtual_host) {
415 absl::flat_hash_set<absl::string_view> clusters;
416 for (auto& route : virtual_host.routes) {
417 auto* route_action =
418 absl::get_if<XdsRouteConfigResource::Route::RouteAction>(&route.action);
419 if (route_action == nullptr) continue;
420 Match(
421 route_action->action,
422 // cluster name
423 [&](const XdsRouteConfigResource::Route::RouteAction::ClusterName&
424 cluster_name) { clusters.insert(cluster_name.cluster_name); },
425 // WeightedClusters
426 [&](const std::vector<
427 XdsRouteConfigResource::Route::RouteAction::ClusterWeight>&
428 weighted_clusters) {
429 for (const auto& weighted_cluster : weighted_clusters) {
430 clusters.insert(weighted_cluster.name);
431 }
432 },
433 // ClusterSpecifierPlugin
434 [&](const XdsRouteConfigResource::Route::RouteAction::
435 ClusterSpecifierPluginName&) {
436 // Clusters are determined dynamically in this case, so we
437 // can't add any clusters here.
438 });
439 }
440 return clusters;
441 }
442
443 } // namespace
444
OnRouteConfigUpdate(const std::string & name,absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>> route_config)445 void XdsDependencyManager::OnRouteConfigUpdate(
446 const std::string& name,
447 absl::StatusOr<std::shared_ptr<const XdsRouteConfigResource>>
448 route_config) {
449 GRPC_TRACE_LOG(xds_resolver, INFO) << "[XdsDependencyManager " << this
450 << "] received RouteConfig update for "
451 << (name.empty() ? "<inline>" : name);
452 if (xds_client_ == nullptr) return;
453 if (!route_config.ok()) {
454 current_virtual_host_ = nullptr;
455 ReportError(route_config_name_.empty() ? "LDS" : "RDS",
456 route_config_name_.empty() ? listener_resource_name_
457 : route_config_name_,
458 route_config.status().message());
459 return;
460 }
461 // Ignore updates for stale names.
462 if (name.empty()) {
463 if (!route_config_name_.empty()) return;
464 } else {
465 if (name != route_config_name_) return;
466 }
467 // Find the relevant VirtualHost from the RouteConfiguration.
468 // If the resource doesn't have the right vhost, fail.
469 auto vhost_index = XdsRouting::FindVirtualHostForDomain(
470 XdsVirtualHostListIterator(&(*route_config)->virtual_hosts),
471 data_plane_authority_);
472 if (!vhost_index.has_value()) {
473 current_virtual_host_ = nullptr;
474 ReportError(route_config_name_.empty() ? "LDS" : "RDS",
475 route_config_name_.empty() ? listener_resource_name_
476 : route_config_name_,
477 absl::StrCat("could not find VirtualHost for ",
478 data_plane_authority_, " in RouteConfiguration"));
479 return;
480 }
481 // Update our data.
482 current_route_config_ = std::move(*route_config);
483 current_virtual_host_ = ¤t_route_config_->virtual_hosts[*vhost_index];
484 clusters_from_route_config_ =
485 GetClustersFromVirtualHost(*current_virtual_host_);
486 rds_resolution_note_.clear();
487 MaybeReportUpdate();
488 }
489
OnRouteConfigAmbientError(std::string resource_name,absl::Status status)490 void XdsDependencyManager::OnRouteConfigAmbientError(std::string resource_name,
491 absl::Status status) {
492 GRPC_TRACE_LOG(xds_resolver, INFO)
493 << "[XdsDependencyManager " << this
494 << "] received RouteConfig error: " << resource_name << ": " << status;
495 if (xds_client_ == nullptr) return;
496 if (status.ok()) {
497 rds_resolution_note_.clear();
498 } else {
499 rds_resolution_note_ =
500 absl::StrCat("RDS resource ", resource_name, ": ", status.message());
501 }
502 MaybeReportUpdate();
503 }
504
OnClusterUpdate(const std::string & name,absl::StatusOr<std::shared_ptr<const XdsClusterResource>> cluster)505 void XdsDependencyManager::OnClusterUpdate(
506 const std::string& name,
507 absl::StatusOr<std::shared_ptr<const XdsClusterResource>> cluster) {
508 GRPC_TRACE_LOG(xds_resolver, INFO) << "[XdsDependencyManager " << this
509 << "] received Cluster update: " << name;
510 if (xds_client_ == nullptr) return;
511 if (!cluster.ok()) {
512 cluster = absl::UnavailableError(
513 absl::StrCat("CDS resource ", name, ": ", cluster.status().message()));
514 }
515 auto it = cluster_watchers_.find(name);
516 if (it == cluster_watchers_.end()) return;
517 it->second.update = std::move(cluster);
518 it->second.resolution_note.clear();
519 MaybeReportUpdate();
520 }
521
OnClusterAmbientError(const std::string & name,absl::Status status)522 void XdsDependencyManager::OnClusterAmbientError(const std::string& name,
523 absl::Status status) {
524 GRPC_TRACE_LOG(xds_resolver, INFO)
525 << "[XdsDependencyManager " << this
526 << "] received Cluster error: " << name << " " << status;
527 if (xds_client_ == nullptr) return;
528 auto it = cluster_watchers_.find(name);
529 if (it == cluster_watchers_.end()) return;
530 if (status.ok()) {
531 it->second.resolution_note.clear();
532 } else {
533 it->second.resolution_note =
534 absl::StrCat("CDS resource ", name, ": ", status.message());
535 }
536 MaybeReportUpdate();
537 }
538
OnEndpointUpdate(const std::string & name,absl::StatusOr<std::shared_ptr<const XdsEndpointResource>> endpoint)539 void XdsDependencyManager::OnEndpointUpdate(
540 const std::string& name,
541 absl::StatusOr<std::shared_ptr<const XdsEndpointResource>> endpoint) {
542 GRPC_TRACE_LOG(xds_resolver, INFO) << "[XdsDependencyManager " << this
543 << "] received Endpoint update: " << name;
544 if (xds_client_ == nullptr) return;
545 auto it = endpoint_watchers_.find(name);
546 if (it == endpoint_watchers_.end()) return;
547 if (!endpoint.ok()) {
548 it->second.update.endpoints.reset();
549 it->second.update.resolution_note =
550 absl::StrCat("EDS resource ", name, ": ", endpoint.status().message());
551 } else {
552 if ((*endpoint)->priorities.empty()) {
553 it->second.update.resolution_note =
554 absl::StrCat("EDS resource ", name, ": contains no localities");
555 } else {
556 std::set<absl::string_view> empty_localities;
557 for (const auto& priority : (*endpoint)->priorities) {
558 for (const auto& p : priority.localities) {
559 if (p.second.endpoints.empty()) {
560 empty_localities.insert(
561 p.first->human_readable_string().as_string_view());
562 }
563 }
564 }
565 if (!empty_localities.empty()) {
566 it->second.update.resolution_note = absl::StrCat(
567 "EDS resource ", name, ": contains empty localities: [",
568 absl::StrJoin(empty_localities, "; "), "]");
569 } else {
570 it->second.update.resolution_note.clear();
571 }
572 }
573 it->second.update.endpoints = std::move(*endpoint);
574 }
575 MaybeReportUpdate();
576 }
577
OnEndpointAmbientError(const std::string & name,absl::Status status)578 void XdsDependencyManager::OnEndpointAmbientError(const std::string& name,
579 absl::Status status) {
580 GRPC_TRACE_LOG(xds_resolver, INFO)
581 << "[XdsDependencyManager " << this
582 << "] received Endpoint error: " << name << " " << status;
583 if (xds_client_ == nullptr) return;
584 auto it = endpoint_watchers_.find(name);
585 if (it == endpoint_watchers_.end()) return;
586 if (status.ok()) {
587 it->second.update.resolution_note.clear();
588 } else {
589 it->second.update.resolution_note =
590 absl::StrCat("EDS resource ", name, ": ", status.message());
591 }
592 MaybeReportUpdate();
593 }
594
OnDnsResult(const std::string & dns_name,Resolver::Result result)595 void XdsDependencyManager::OnDnsResult(const std::string& dns_name,
596 Resolver::Result result) {
597 GRPC_TRACE_LOG(xds_resolver, INFO) << "[XdsDependencyManager " << this
598 << "] received DNS update: " << dns_name;
599 if (xds_client_ == nullptr) return;
600 auto it = dns_resolvers_.find(dns_name);
601 if (it == dns_resolvers_.end()) return;
602 PopulateDnsUpdate(dns_name, std::move(result), &it->second);
603 MaybeReportUpdate();
604 }
605
PopulateDnsUpdate(const std::string & dns_name,Resolver::Result result,DnsState * dns_state)606 void XdsDependencyManager::PopulateDnsUpdate(const std::string& dns_name,
607 Resolver::Result result,
608 DnsState* dns_state) {
609 // Convert resolver result to EDS update.
610 XdsEndpointResource::Priority::Locality locality;
611 locality.name = MakeRefCounted<XdsLocalityName>("", "", "");
612 locality.lb_weight = 1;
613 if (result.addresses.ok()) {
614 for (const auto& address : *result.addresses) {
615 locality.endpoints.emplace_back(
616 address.addresses(),
617 address.args().Set(GRPC_ARG_ADDRESS_NAME, dns_name));
618 }
619 dns_state->update.resolution_note = std::move(result.resolution_note);
620 } else if (result.resolution_note.empty()) {
621 dns_state->update.resolution_note =
622 absl::StrCat("DNS resolution failed for ", dns_name, ": ",
623 result.addresses.status().ToString());
624 }
625 XdsEndpointResource::Priority priority;
626 priority.localities.emplace(locality.name.get(), std::move(locality));
627 auto resource = std::make_shared<XdsEndpointResource>();
628 resource->priorities.emplace_back(std::move(priority));
629 dns_state->update.endpoints = std::move(resource);
630 }
631
// Recursively builds the config for cluster `name` (and, for aggregate
// clusters, its underlying clusters) into `cluster_config_map`. CDS and EDS
// watches and DNS resolvers are started on demand along the way.
//
// Parameters:
//   name: cluster to process.
//   depth: recursion depth; 0 for a cluster referenced directly by the
//     route config, incremented for each aggregate-cluster level.
//   cluster_config_map: output map from cluster name to its config or
//     error status. A cluster already present is not processed again.
//   eds_resources_seen: output; receives the EDS resource name of each EDS
//     cluster visited.
//   dns_names_seen: output; receives the hostname of each LOGICAL_DNS
//     cluster visited.
//   leaf_clusters: may be null only when depth == 0 (CHECKed otherwise).
//     Non-EDS/DNS (leaf) clusters are appended to it as they are fully
//     populated; set to an error status if the max depth is exceeded.
//
// Returns false if some needed resource has not been received yet (e.g.,
// a watch or resolver was just started); returns true once this cluster's
// entry is final, whether it holds a config or an error.
bool XdsDependencyManager::PopulateClusterConfigMap(
    absl::string_view name, int depth,
    absl::flat_hash_map<std::string, absl::StatusOr<XdsConfig::ClusterConfig>>*
        cluster_config_map,
    std::set<absl::string_view>* eds_resources_seen,
    std::set<absl::string_view>* dns_names_seen,
    absl::StatusOr<std::vector<absl::string_view>>* leaf_clusters) {
  if (depth > 0) CHECK_NE(leaf_clusters, nullptr);
  // Since depth > 0 here, leaf_clusters is known to be non-null.
  if (depth == kMaxXdsAggregateClusterRecursionDepth) {
    *leaf_clusters =
        absl::UnavailableError("aggregate cluster graph exceeds max depth");
    return true;
  }
  // Don't process the cluster again if we've already seen it in some
  // other branch of the recursion tree. We populate it with a non-OK
  // status here, since we need an entry in the map to avoid incorrectly
  // stopping the CDS watch, but we'll overwrite this below if we actually
  // have the data for the cluster.
  // Note that an already-seen cluster returns before being appended to
  // leaf_clusters, so each leaf is listed at most once.
  auto p = cluster_config_map->emplace(
      name, absl::InternalError("cluster data not yet available"));
  if (!p.second) return true;
  auto& cluster_config = p.first->second;
  auto& state = cluster_watchers_[name];
  // Create a new watcher if needed.
  if (state.watcher == nullptr) {
    auto watcher = MakeRefCounted<ClusterWatcher>(Ref(), name);
    GRPC_TRACE_LOG(xds_resolver, INFO)
        << "[XdsDependencyManager " << this << "] starting watch for cluster "
        << name;
    state.watcher = watcher.get();
    XdsClusterResourceType::StartWatch(xds_client_.get(), name,
                                       std::move(watcher));
    return false;
  }
  // If there was an error fetching the CDS resource, report the error.
  if (!state.update.ok()) {
    cluster_config = state.update.status();
    return true;
  }
  // If we don't have the resource yet, we can't return a config yet.
  if (*state.update == nullptr) return false;
  // Populate endpoint info based on cluster type.
  return Match(
      (*state.update)->type,
      // EDS cluster.
      [&](const XdsClusterResource::Eds& eds) {
        // The EDS resource name defaults to the cluster name.
        absl::string_view eds_resource_name =
            eds.eds_service_name.empty() ? name : eds.eds_service_name;
        eds_resources_seen->insert(eds_resource_name);
        // Start EDS watch if needed.
        auto& eds_state = endpoint_watchers_[eds_resource_name];
        if (eds_state.watcher == nullptr) {
          GRPC_TRACE_LOG(xds_resolver, INFO)
              << "[XdsDependencyManager " << this
              << "] starting watch for endpoint " << eds_resource_name;
          auto watcher =
              MakeRefCounted<EndpointWatcher>(Ref(), eds_resource_name);
          eds_state.watcher = watcher.get();
          XdsEndpointResourceType::StartWatch(
              xds_client_.get(), eds_resource_name, std::move(watcher));
          return false;
        }
        // Check if EDS resource has been returned.
        // (A fetch error leaves endpoints null but sets a note, so it counts
        // as "returned".)
        if (eds_state.update.endpoints == nullptr &&
            eds_state.update.resolution_note.empty()) {
          return false;
        }
        // Populate cluster config.
        // The resolution note joins every non-empty note from LDS, RDS, CDS
        // and EDS; if there are none, it falls back to the xDS node ID (when
        // the bootstrap has a node).
        std::array<absl::string_view, 4> notes = {
            lds_resolution_note_, rds_resolution_note_, state.resolution_note,
            eds_state.update.resolution_note};
        std::vector<absl::string_view> resolution_notes;
        for (const auto& note : notes) {
          if (!note.empty()) resolution_notes.push_back(note);
        }
        // Owns the node-ID string that resolution_notes may point into.
        std::string node_id_buffer;
        if (resolution_notes.empty()) {
          const XdsBootstrap::Node* node =
              DownCast<const GrpcXdsBootstrap&>(xds_client_->bootstrap())
                  .node();
          if (node != nullptr) {
            node_id_buffer = absl::StrCat("xDS node ID:", node->id());
            resolution_notes.push_back(node_id_buffer);
          }
        }
        cluster_config.emplace(*state.update, eds_state.update.endpoints,
                               absl::StrJoin(resolution_notes, "; "));
        if (leaf_clusters != nullptr) (*leaf_clusters)->push_back(name);
        return true;
      },
      // LOGICAL_DNS cluster.
      [&](const XdsClusterResource::LogicalDns& logical_dns) {
        dns_names_seen->insert(logical_dns.hostname);
        // Start DNS resolver if needed.
        auto& dns_state = dns_resolvers_[logical_dns.hostname];
        if (dns_state.resolver == nullptr) {
          GRPC_TRACE_LOG(xds_resolver, INFO)
              << "[XdsDependencyManager " << this
              << "] starting DNS resolver for " << logical_dns.hostname;
          // If the channel args carry a fake resolver response generator,
          // use the "fake:" resolver scheme instead of "dns:".
          auto* fake_resolver_response_generator = args_.GetPointer<
              FakeResolverResponseGenerator>(
              GRPC_ARG_XDS_LOGICAL_DNS_CLUSTER_FAKE_RESOLVER_RESPONSE_GENERATOR);
          ChannelArgs args = args_;
          std::string target;
          if (fake_resolver_response_generator != nullptr) {
            target = absl::StrCat("fake:", logical_dns.hostname);
            args = args.SetObject(fake_resolver_response_generator->Ref());
          } else {
            target = absl::StrCat("dns:", logical_dns.hostname);
          }
          dns_state.resolver =
              CoreConfiguration::Get().resolver_registry().CreateResolver(
                  target, args, interested_parties_, work_serializer_,
                  std::make_unique<DnsResultHandler>(Ref(),
                                                     logical_dns.hostname));
          if (dns_state.resolver == nullptr) {
            // Resolver creation failed: synthesize an empty result with an
            // explanatory note so the cluster still gets a config. The map
            // entry keeps a null resolver in this case.
            Resolver::Result result;
            result.addresses.emplace();  // Empty list.
            result.resolution_note = absl::StrCat(
                "failed to create DNS resolver for ", logical_dns.hostname);
            PopulateDnsUpdate(logical_dns.hostname, std::move(result),
                              &dns_state);
          } else {
            dns_state.resolver->StartLocked();
            return false;
          }
        }
        // Check if result has been returned.
        if (dns_state.update.endpoints == nullptr &&
            dns_state.update.resolution_note.empty()) {
          return false;
        }
        // Populate cluster config.
        cluster_config.emplace(*state.update, dns_state.update.endpoints,
                               dns_state.update.resolution_note);
        if (leaf_clusters != nullptr) (*leaf_clusters)->push_back(name);
        return true;
      },
      // Aggregate cluster. Recursively expand to child clusters.
      [&](const XdsClusterResource::Aggregate& aggregate) {
        // Grab a ref to the CDS resource for the aggregate cluster here,
        // since our reference into cluster_watchers_ will be invalidated
        // when we recursively call ourselves and add entries to the
        // map for underlying clusters.
        auto cluster_resource = *state.update;
        // Recursively expand leaf clusters.
        absl::StatusOr<std::vector<absl::string_view>> child_leaf_clusters;
        child_leaf_clusters.emplace();
        // Becomes false if any child is still missing resources; the
        // remaining children are still visited so their watches start.
        bool have_all_resources = true;
        for (const std::string& child_name :
             aggregate.prioritized_cluster_names) {
          have_all_resources &= PopulateClusterConfigMap(
              child_name, depth + 1, cluster_config_map, eds_resources_seen,
              dns_names_seen, &child_leaf_clusters);
          // Stop expanding once the max-depth error has been hit.
          if (!child_leaf_clusters.ok()) break;
        }
        // Note that we cannot use the cluster_config reference we
        // created above, because it may have been invalidated by map
        // insertions when we recursively called ourselves, so we have
        // to do the lookup in cluster_config_map again.
        auto& aggregate_cluster_config = (*cluster_config_map)[name];
        // If we exceeded max recursion depth, report an error for the
        // cluster, and propagate the error up if needed.
        if (!child_leaf_clusters.ok()) {
          aggregate_cluster_config = child_leaf_clusters.status();
          if (leaf_clusters != nullptr) {
            *leaf_clusters = child_leaf_clusters.status();
          }
          return true;
        }
        // If needed, propagate leaf cluster list up the tree.
        if (leaf_clusters != nullptr) {
          (*leaf_clusters)
              ->insert((*leaf_clusters)->end(), child_leaf_clusters->begin(),
                       child_leaf_clusters->end());
        }
        // If there are no leaf clusters, report an error for the cluster.
        if (have_all_resources && child_leaf_clusters->empty()) {
          aggregate_cluster_config = absl::UnavailableError(
              absl::StrCat("aggregate cluster dependency graph for ", name,
                           " has no leaf clusters"));
          return true;
        }
        // Populate cluster config.
        // Note that we do this even for aggregate clusters that are not
        // at the root of the tree, because we need to make sure the list
        // of underlying cluster names stays alive so that the leaf cluster
        // list of the root aggregate cluster can point to those strings.
        aggregate_cluster_config.emplace(std::move(cluster_resource),
                                         std::move(*child_leaf_clusters));
        return have_all_resources;
      });
}
825
826 RefCountedPtr<XdsDependencyManager::ClusterSubscription>
GetClusterSubscription(absl::string_view cluster_name)827 XdsDependencyManager::GetClusterSubscription(absl::string_view cluster_name) {
828 auto it = cluster_subscriptions_.find(cluster_name);
829 if (it != cluster_subscriptions_.end()) {
830 auto subscription = it->second->RefIfNonZero();
831 if (subscription != nullptr) return subscription;
832 }
833 auto subscription = MakeRefCounted<ClusterSubscription>(cluster_name, Ref());
834 cluster_subscriptions_.emplace(subscription->cluster_name(),
835 subscription->WeakRef());
836 // If the cluster is not already subscribed to by virtue of being
837 // referenced in the route config, then trigger the CDS watch.
838 if (!clusters_from_route_config_.contains(cluster_name)) {
839 MaybeReportUpdate();
840 }
841 return subscription;
842 }
843
OnClusterSubscriptionUnref(absl::string_view cluster_name,ClusterSubscription * subscription)844 void XdsDependencyManager::OnClusterSubscriptionUnref(
845 absl::string_view cluster_name, ClusterSubscription* subscription) {
846 auto it = cluster_subscriptions_.find(cluster_name);
847 // Shouldn't happen, but ignore if it does.
848 if (it == cluster_subscriptions_.end()) return;
849 // Do nothing if the subscription has already been replaced.
850 if (it->second != subscription) return;
851 // Remove the entry.
852 cluster_subscriptions_.erase(it);
853 // If this cluster is not already subscribed to by virtue of being
854 // referenced in the route config, then update watches and generate a
855 // new update.
856 if (!clusters_from_route_config_.contains(cluster_name)) {
857 MaybeReportUpdate();
858 }
859 }
860
MaybeReportUpdate()861 void XdsDependencyManager::MaybeReportUpdate() {
862 // Populate Listener and RouteConfig fields.
863 if (current_listener_ == nullptr || current_virtual_host_ == nullptr) return;
864 auto config = MakeRefCounted<XdsConfig>();
865 config->listener = current_listener_;
866 config->route_config = current_route_config_;
867 config->virtual_host = current_virtual_host_;
868 // Determine the set of clusters we should be watching.
869 std::set<absl::string_view> clusters_to_watch;
870 for (const absl::string_view& cluster : clusters_from_route_config_) {
871 clusters_to_watch.insert(cluster);
872 }
873 for (const auto& p : cluster_subscriptions_) {
874 clusters_to_watch.insert(p.first);
875 }
876 // Populate Cluster map.
877 // We traverse the entire graph even if we don't yet have all of the
878 // resources we need to ensure that the right set of watches are active.
879 std::set<absl::string_view> eds_resources_seen;
880 std::set<absl::string_view> dns_names_seen;
881 bool have_all_resources = true;
882 for (const absl::string_view& cluster : clusters_to_watch) {
883 have_all_resources &= PopulateClusterConfigMap(
884 cluster, 0, &config->clusters, &eds_resources_seen, &dns_names_seen);
885 }
886 // Remove entries in cluster_watchers_ for any clusters not in
887 // config->clusters.
888 for (auto it = cluster_watchers_.begin(); it != cluster_watchers_.end();) {
889 const std::string& cluster_name = it->first;
890 if (config->clusters.contains(cluster_name)) {
891 ++it;
892 continue;
893 }
894 GRPC_TRACE_LOG(xds_resolver, INFO)
895 << "[XdsDependencyManager " << this << "] cancelling watch for cluster "
896 << cluster_name;
897 XdsClusterResourceType::CancelWatch(xds_client_.get(), cluster_name,
898 it->second.watcher,
899 /*delay_unsubscription=*/false);
900 cluster_watchers_.erase(it++);
901 }
902 // Remove entries in endpoint_watchers_ for any EDS resources not in
903 // eds_resources_seen.
904 for (auto it = endpoint_watchers_.begin(); it != endpoint_watchers_.end();) {
905 const std::string& eds_resource_name = it->first;
906 if (eds_resources_seen.find(eds_resource_name) !=
907 eds_resources_seen.end()) {
908 ++it;
909 continue;
910 }
911 GRPC_TRACE_LOG(xds_resolver, INFO)
912 << "[XdsDependencyManager " << this
913 << "] cancelling watch for EDS resource " << eds_resource_name;
914 XdsEndpointResourceType::CancelWatch(xds_client_.get(), eds_resource_name,
915 it->second.watcher,
916 /*delay_unsubscription=*/false);
917 endpoint_watchers_.erase(it++);
918 }
919 // Remove entries in dns_resolvers_ for any DNS name not in
920 // eds_resources_seen.
921 for (auto it = dns_resolvers_.begin(); it != dns_resolvers_.end();) {
922 const std::string& dns_name = it->first;
923 if (dns_names_seen.find(dns_name) != dns_names_seen.end()) {
924 ++it;
925 continue;
926 }
927 GRPC_TRACE_LOG(xds_resolver, INFO)
928 << "[XdsDependencyManager " << this
929 << "] shutting down DNS resolver for " << dns_name;
930 dns_resolvers_.erase(it++);
931 }
932 // If we have all the data we need, then send an update.
933 if (!have_all_resources) {
934 GRPC_TRACE_LOG(xds_resolver, INFO)
935 << "[XdsDependencyManager " << this
936 << "] missing data -- NOT returning config";
937 return;
938 }
939 GRPC_TRACE_LOG(xds_resolver, INFO)
940 << "[XdsDependencyManager " << this
941 << "] returning config: " << config->ToString();
942 watcher_->OnUpdate(std::move(config));
943 }
944
ReportError(absl::string_view resource_type,absl::string_view resource_name,absl::string_view error)945 void XdsDependencyManager::ReportError(absl::string_view resource_type,
946 absl::string_view resource_name,
947 absl::string_view error) {
948 watcher_->OnUpdate(absl::UnavailableError(
949 absl::StrCat(resource_type, " resource ", resource_name, ": ", error)));
950 }
951
952 } // namespace grpc_core
953