1 /*
2 *
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "absl/strings/match.h"
22 #include "absl/strings/str_join.h"
23 #include "absl/strings/str_split.h"
24 #include "re2/re2.h"
25
26 #include "src/core/ext/filters/client_channel/config_selector.h"
27 #include "src/core/ext/filters/client_channel/resolver_registry.h"
28 #include "src/core/ext/xds/xds_client.h"
29 #include "src/core/lib/channel/channel_args.h"
30 #include "src/core/lib/iomgr/closure.h"
31 #include "src/core/lib/iomgr/exec_ctx.h"
32 #include "src/core/lib/transport/timeout_encoding.h"
33
34 namespace grpc_core {
35
36 TraceFlag grpc_xds_resolver_trace(false, "xds_resolver");
37
38 const char* kXdsClusterAttribute = "xds_cluster_name";
39
40 namespace {
41
42 //
43 // XdsResolver
44 //
45
46 class XdsResolver : public Resolver {
47 public:
XdsResolver(ResolverArgs args)48 explicit XdsResolver(ResolverArgs args)
49 : work_serializer_(std::move(args.work_serializer)),
50 result_handler_(std::move(args.result_handler)),
51 server_name_(absl::StripPrefix(args.uri.path(), "/")),
52 args_(grpc_channel_args_copy(args.args)),
53 interested_parties_(args.pollset_set) {
54 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
55 gpr_log(GPR_INFO, "[xds_resolver %p] created for server name %s", this,
56 server_name_.c_str());
57 }
58 }
59
~XdsResolver()60 ~XdsResolver() override {
61 grpc_channel_args_destroy(args_);
62 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
63 gpr_log(GPR_INFO, "[xds_resolver %p] destroyed", this);
64 }
65 }
66
67 void StartLocked() override;
68
69 void ShutdownLocked() override;
70
71 private:
72 class Notifier {
73 public:
74 Notifier(RefCountedPtr<XdsResolver> resolver, XdsApi::LdsUpdate update);
75 Notifier(RefCountedPtr<XdsResolver> resolver, XdsApi::RdsUpdate update);
76 Notifier(RefCountedPtr<XdsResolver> resolver, grpc_error* error);
77 explicit Notifier(RefCountedPtr<XdsResolver> resolver);
78
79 private:
80 enum Type { kLdsUpdate, kRdsUpdate, kError, kDoesNotExist };
81
82 static void RunInExecCtx(void* arg, grpc_error* error);
83 void RunInWorkSerializer(grpc_error* error);
84
85 RefCountedPtr<XdsResolver> resolver_;
86 grpc_closure closure_;
87 XdsApi::LdsUpdate update_;
88 Type type_;
89 };
90
91 class ListenerWatcher : public XdsClient::ListenerWatcherInterface {
92 public:
ListenerWatcher(RefCountedPtr<XdsResolver> resolver)93 explicit ListenerWatcher(RefCountedPtr<XdsResolver> resolver)
94 : resolver_(std::move(resolver)) {}
OnListenerChanged(XdsApi::LdsUpdate listener)95 void OnListenerChanged(XdsApi::LdsUpdate listener) override {
96 new Notifier(resolver_, std::move(listener));
97 }
OnError(grpc_error * error)98 void OnError(grpc_error* error) override { new Notifier(resolver_, error); }
OnResourceDoesNotExist()99 void OnResourceDoesNotExist() override { new Notifier(resolver_); }
100
101 private:
102 RefCountedPtr<XdsResolver> resolver_;
103 };
104
105 class RouteConfigWatcher : public XdsClient::RouteConfigWatcherInterface {
106 public:
RouteConfigWatcher(RefCountedPtr<XdsResolver> resolver)107 explicit RouteConfigWatcher(RefCountedPtr<XdsResolver> resolver)
108 : resolver_(std::move(resolver)) {}
OnRouteConfigChanged(XdsApi::RdsUpdate route_config)109 void OnRouteConfigChanged(XdsApi::RdsUpdate route_config) override {
110 new Notifier(resolver_, std::move(route_config));
111 }
OnError(grpc_error * error)112 void OnError(grpc_error* error) override { new Notifier(resolver_, error); }
OnResourceDoesNotExist()113 void OnResourceDoesNotExist() override { new Notifier(resolver_); }
114
115 private:
116 RefCountedPtr<XdsResolver> resolver_;
117 };
118
119 class ClusterState
120 : public RefCounted<ClusterState, PolymorphicRefCount, false> {
121 public:
122 using ClusterStateMap =
123 std::map<std::string, std::unique_ptr<ClusterState>>;
124
ClusterState(const std::string & cluster_name,ClusterStateMap * cluster_state_map)125 ClusterState(const std::string& cluster_name,
126 ClusterStateMap* cluster_state_map)
127 : it_(cluster_state_map
128 ->emplace(cluster_name, std::unique_ptr<ClusterState>(this))
129 .first) {}
cluster() const130 const std::string& cluster() const { return it_->first; }
131
132 private:
133 ClusterStateMap::iterator it_;
134 };
135
136 class XdsConfigSelector : public ConfigSelector {
137 public:
138 XdsConfigSelector(RefCountedPtr<XdsResolver> resolver,
139 const std::vector<XdsApi::Route>& routes,
140 grpc_error* error);
141 ~XdsConfigSelector() override;
142
name() const143 const char* name() const override { return "XdsConfigSelector"; }
144
Equals(const ConfigSelector * other) const145 bool Equals(const ConfigSelector* other) const override {
146 const auto* other_xds = static_cast<const XdsConfigSelector*>(other);
147 // Don't need to compare resolver_, since that will always be the same.
148 return route_table_ == other_xds->route_table_ &&
149 clusters_ == other_xds->clusters_;
150 }
151
152 CallConfig GetCallConfig(GetCallConfigArgs args) override;
153
154 private:
155 struct Route {
156 XdsApi::Route route;
157 absl::InlinedVector<std::pair<uint32_t, absl::string_view>, 2>
158 weighted_cluster_state;
159 RefCountedPtr<ServiceConfig> method_config;
operator ==grpc_core::__anon0bd98fcd0111::XdsResolver::XdsConfigSelector::Route160 bool operator==(const Route& other) const {
161 return route == other.route &&
162 weighted_cluster_state == other.weighted_cluster_state;
163 }
164 };
165 using RouteTable = std::vector<Route>;
166
167 void MaybeAddCluster(const std::string& name);
168 grpc_error* CreateMethodConfig(RefCountedPtr<ServiceConfig>* method_config,
169 const XdsApi::Route& route);
170
171 RefCountedPtr<XdsResolver> resolver_;
172 RouteTable route_table_;
173 std::map<absl::string_view, RefCountedPtr<ClusterState>> clusters_;
174 };
175
176 void OnListenerUpdate(XdsApi::LdsUpdate listener);
177 void OnRouteConfigUpdate(XdsApi::RdsUpdate rds_update);
178 void OnError(grpc_error* error);
179 void OnResourceDoesNotExist();
180
181 grpc_error* CreateServiceConfig(RefCountedPtr<ServiceConfig>* service_config);
182 void GenerateResult();
183 void MaybeRemoveUnusedClusters();
184
185 std::shared_ptr<WorkSerializer> work_serializer_;
186 std::unique_ptr<ResultHandler> result_handler_;
187 std::string server_name_;
188 const grpc_channel_args* args_;
189 grpc_pollset_set* interested_parties_;
190 RefCountedPtr<XdsClient> xds_client_;
191 XdsClient::ListenerWatcherInterface* listener_watcher_ = nullptr;
192 std::string route_config_name_;
193 XdsClient::RouteConfigWatcherInterface* route_config_watcher_ = nullptr;
194 ClusterState::ClusterStateMap cluster_state_map_;
195 std::vector<XdsApi::Route> current_update_;
196 XdsApi::Duration http_max_stream_duration_;
197 };
198
199 //
200 // XdsResolver::Notifier
201 //
202
Notifier(RefCountedPtr<XdsResolver> resolver,XdsApi::LdsUpdate update)203 XdsResolver::Notifier::Notifier(RefCountedPtr<XdsResolver> resolver,
204 XdsApi::LdsUpdate update)
205 : resolver_(std::move(resolver)),
206 update_(std::move(update)),
207 type_(kLdsUpdate) {
208 GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
209 ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
210 }
211
Notifier(RefCountedPtr<XdsResolver> resolver,XdsApi::RdsUpdate update)212 XdsResolver::Notifier::Notifier(RefCountedPtr<XdsResolver> resolver,
213 XdsApi::RdsUpdate update)
214 : resolver_(std::move(resolver)), type_(kRdsUpdate) {
215 update_.rds_update = std::move(update);
216 GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
217 ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
218 }
219
Notifier(RefCountedPtr<XdsResolver> resolver,grpc_error * error)220 XdsResolver::Notifier::Notifier(RefCountedPtr<XdsResolver> resolver,
221 grpc_error* error)
222 : resolver_(std::move(resolver)), type_(kError) {
223 GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
224 ExecCtx::Run(DEBUG_LOCATION, &closure_, error);
225 }
226
Notifier(RefCountedPtr<XdsResolver> resolver)227 XdsResolver::Notifier::Notifier(RefCountedPtr<XdsResolver> resolver)
228 : resolver_(std::move(resolver)), type_(kDoesNotExist) {
229 GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
230 ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
231 }
232
RunInExecCtx(void * arg,grpc_error * error)233 void XdsResolver::Notifier::RunInExecCtx(void* arg, grpc_error* error) {
234 Notifier* self = static_cast<Notifier*>(arg);
235 GRPC_ERROR_REF(error);
236 self->resolver_->work_serializer_->Run(
237 [self, error]() { self->RunInWorkSerializer(error); }, DEBUG_LOCATION);
238 }
239
RunInWorkSerializer(grpc_error * error)240 void XdsResolver::Notifier::RunInWorkSerializer(grpc_error* error) {
241 if (resolver_->xds_client_ == nullptr) {
242 GRPC_ERROR_UNREF(error);
243 delete this;
244 return;
245 }
246 switch (type_) {
247 case kLdsUpdate:
248 resolver_->OnListenerUpdate(std::move(update_));
249 break;
250 case kRdsUpdate:
251 resolver_->OnRouteConfigUpdate(std::move(*update_.rds_update));
252 break;
253 case kError:
254 resolver_->OnError(error);
255 break;
256 case kDoesNotExist:
257 resolver_->OnResourceDoesNotExist();
258 break;
259 };
260 delete this;
261 }
262
263 //
264 // XdsResolver::XdsConfigSelector
265 //
266
XdsConfigSelector(RefCountedPtr<XdsResolver> resolver,const std::vector<XdsApi::Route> & routes,grpc_error * error)267 XdsResolver::XdsConfigSelector::XdsConfigSelector(
268 RefCountedPtr<XdsResolver> resolver,
269 const std::vector<XdsApi::Route>& routes, grpc_error* error)
270 : resolver_(std::move(resolver)) {
271 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
272 gpr_log(GPR_INFO, "[xds_resolver %p] creating XdsConfigSelector %p",
273 resolver_.get(), this);
274 }
275 // 1. Construct the route table
276 // 2 Update resolver's cluster state map
277 // 3. Construct cluster list to hold on to entries in the cluster state
278 // map.
279 // Reserve the necessary entries up-front to avoid reallocation as we add
280 // elements. This is necessary because the string_view in the entry's
281 // weighted_cluster_state field points to the memory in the route field, so
282 // moving the entry in a reallocation will cause the string_view to point to
283 // invalid data.
284 route_table_.reserve(routes.size());
285 for (auto& route : routes) {
286 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
287 gpr_log(GPR_INFO, "[xds_resolver %p] XdsConfigSelector %p: route: %s",
288 resolver_.get(), this, route.ToString().c_str());
289 }
290 route_table_.emplace_back();
291 auto& route_entry = route_table_.back();
292 route_entry.route = route;
293 // If the route doesn't specify a timeout, set its timeout to the global
294 // one.
295 if (!route.max_stream_duration.has_value()) {
296 route_entry.route.max_stream_duration =
297 resolver_->http_max_stream_duration_;
298 }
299 error = CreateMethodConfig(&route_entry.method_config, route_entry.route);
300 if (route.weighted_clusters.empty()) {
301 MaybeAddCluster(route.cluster_name);
302 } else {
303 uint32_t end = 0;
304 for (const auto& weighted_cluster : route_entry.route.weighted_clusters) {
305 MaybeAddCluster(weighted_cluster.name);
306 end += weighted_cluster.weight;
307 route_entry.weighted_cluster_state.emplace_back(end,
308 weighted_cluster.name);
309 }
310 }
311 }
312 }
313
CreateMethodConfig(RefCountedPtr<ServiceConfig> * method_config,const XdsApi::Route & route)314 grpc_error* XdsResolver::XdsConfigSelector::CreateMethodConfig(
315 RefCountedPtr<ServiceConfig>* method_config, const XdsApi::Route& route) {
316 grpc_error* error = GRPC_ERROR_NONE;
317 std::vector<std::string> fields;
318 if (route.max_stream_duration.has_value() &&
319 (route.max_stream_duration->seconds != 0 ||
320 route.max_stream_duration->nanos != 0)) {
321 fields.emplace_back(absl::StrFormat(" \"timeout\": \"%d.%09ds\"",
322 route.max_stream_duration->seconds,
323 route.max_stream_duration->nanos));
324 }
325 if (!fields.empty()) {
326 std::string json = absl::StrCat(
327 "{\n"
328 " \"methodConfig\": [ {\n"
329 " \"name\": [\n"
330 " {}\n"
331 " ],\n"
332 " ",
333 absl::StrJoin(fields, ",\n"),
334 "\n } ]\n"
335 "}");
336 *method_config =
337 ServiceConfig::Create(resolver_->args_, json.c_str(), &error);
338 }
339 return error;
340 }
341
~XdsConfigSelector()342 XdsResolver::XdsConfigSelector::~XdsConfigSelector() {
343 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
344 gpr_log(GPR_INFO, "[xds_resolver %p] destroying XdsConfigSelector %p",
345 resolver_.get(), this);
346 }
347 clusters_.clear();
348 resolver_->MaybeRemoveUnusedClusters();
349 }
350
MaybeAddCluster(const std::string & name)351 void XdsResolver::XdsConfigSelector::MaybeAddCluster(const std::string& name) {
352 if (clusters_.find(name) == clusters_.end()) {
353 auto it = resolver_->cluster_state_map_.find(name);
354 if (it == resolver_->cluster_state_map_.end()) {
355 auto new_cluster_state =
356 MakeRefCounted<ClusterState>(name, &resolver_->cluster_state_map_);
357 clusters_[new_cluster_state->cluster()] = std::move(new_cluster_state);
358 } else {
359 clusters_[it->second->cluster()] = it->second->Ref();
360 }
361 }
362 }
363
GetMetadataValue(const std::string & target_key,grpc_metadata_batch * initial_metadata,std::string * concatenated_value)364 absl::optional<absl::string_view> GetMetadataValue(
365 const std::string& target_key, grpc_metadata_batch* initial_metadata,
366 std::string* concatenated_value) {
367 // Find all values for the specified key.
368 GPR_DEBUG_ASSERT(initial_metadata != nullptr);
369 absl::InlinedVector<absl::string_view, 1> values;
370 for (grpc_linked_mdelem* md = initial_metadata->list.head; md != nullptr;
371 md = md->next) {
372 absl::string_view key = StringViewFromSlice(GRPC_MDKEY(md->md));
373 absl::string_view value = StringViewFromSlice(GRPC_MDVALUE(md->md));
374 if (target_key == key) values.push_back(value);
375 }
376 // If none found, no match.
377 if (values.empty()) return absl::nullopt;
378 // If exactly one found, return it as-is.
379 if (values.size() == 1) return values.front();
380 // If more than one found, concatenate the values, using
381 // *concatenated_values as a temporary holding place for the
382 // concatenated string.
383 *concatenated_value = absl::StrJoin(values, ",");
384 return *concatenated_value;
385 }
386
HeaderMatchHelper(const HeaderMatcher & header_matcher,grpc_metadata_batch * initial_metadata)387 bool HeaderMatchHelper(const HeaderMatcher& header_matcher,
388 grpc_metadata_batch* initial_metadata) {
389 std::string concatenated_value;
390 absl::optional<absl::string_view> value;
391 // Note: If we ever allow binary headers here, we still need to
392 // special-case ignore "grpc-tags-bin" and "grpc-trace-bin", since
393 // they are not visible to the LB policy in grpc-go.
394 if (absl::EndsWith(header_matcher.name(), "-bin") ||
395 header_matcher.name() == "grpc-previous-rpc-attempts") {
396 value = absl::nullopt;
397 } else if (header_matcher.name() == "content-type") {
398 value = "application/grpc";
399 } else {
400 value = GetMetadataValue(header_matcher.name(), initial_metadata,
401 &concatenated_value);
402 }
403 return header_matcher.Match(value);
404 }
405
HeadersMatch(const std::vector<HeaderMatcher> & header_matchers,grpc_metadata_batch * initial_metadata)406 bool HeadersMatch(const std::vector<HeaderMatcher>& header_matchers,
407 grpc_metadata_batch* initial_metadata) {
408 for (const auto& header_matcher : header_matchers) {
409 if (!HeaderMatchHelper(header_matcher, initial_metadata)) return false;
410 }
411 return true;
412 }
413
UnderFraction(const uint32_t fraction_per_million)414 bool UnderFraction(const uint32_t fraction_per_million) {
415 // Generate a random number in [0, 1000000).
416 const uint32_t random_number = rand() % 1000000;
417 return random_number < fraction_per_million;
418 }
419
GetCallConfig(GetCallConfigArgs args)420 ConfigSelector::CallConfig XdsResolver::XdsConfigSelector::GetCallConfig(
421 GetCallConfigArgs args) {
422 for (const auto& entry : route_table_) {
423 // Path matching.
424 if (!entry.route.matchers.path_matcher.Match(
425 StringViewFromSlice(*args.path))) {
426 continue;
427 }
428 // Header Matching.
429 if (!HeadersMatch(entry.route.matchers.header_matchers,
430 args.initial_metadata)) {
431 continue;
432 }
433 // Match fraction check
434 if (entry.route.matchers.fraction_per_million.has_value() &&
435 !UnderFraction(entry.route.matchers.fraction_per_million.value())) {
436 continue;
437 }
438 // Found a route match
439 absl::string_view cluster_name;
440 if (entry.route.weighted_clusters.empty()) {
441 cluster_name = entry.route.cluster_name;
442 } else {
443 const uint32_t key =
444 rand() %
445 entry.weighted_cluster_state[entry.weighted_cluster_state.size() - 1]
446 .first;
447 // Find the index in weighted clusters corresponding to key.
448 size_t mid = 0;
449 size_t start_index = 0;
450 size_t end_index = entry.weighted_cluster_state.size() - 1;
451 size_t index = 0;
452 while (end_index > start_index) {
453 mid = (start_index + end_index) / 2;
454 if (entry.weighted_cluster_state[mid].first > key) {
455 end_index = mid;
456 } else if (entry.weighted_cluster_state[mid].first < key) {
457 start_index = mid + 1;
458 } else {
459 index = mid + 1;
460 break;
461 }
462 }
463 if (index == 0) index = start_index;
464 GPR_ASSERT(entry.weighted_cluster_state[index].first > key);
465 cluster_name = entry.weighted_cluster_state[index].second;
466 }
467 auto it = clusters_.find(cluster_name);
468 GPR_ASSERT(it != clusters_.end());
469 XdsResolver* resolver =
470 static_cast<XdsResolver*>(resolver_->Ref().release());
471 ClusterState* cluster_state = it->second->Ref().release();
472 CallConfig call_config;
473 if (entry.method_config != nullptr) {
474 call_config.service_config = entry.method_config;
475 call_config.method_configs =
476 entry.method_config->GetMethodParsedConfigVector(grpc_empty_slice());
477 }
478 call_config.call_attributes[kXdsClusterAttribute] = it->first;
479 call_config.on_call_committed = [resolver, cluster_state]() {
480 cluster_state->Unref();
481 ExecCtx::Run(
482 // TODO(roth): This hop into the ExecCtx is being done to avoid
483 // entering the WorkSerializer while holding the client channel data
484 // plane mutex, since that can lead to deadlocks. However, we should
485 // not have to solve this problem in each individual ConfigSelector
486 // implementation. When we have time, we should fix the client channel
487 // code to avoid this by not invoking the
488 // CallConfig::on_call_committed callback until after it has released
489 // the data plane mutex.
490 DEBUG_LOCATION,
491 GRPC_CLOSURE_CREATE(
492 [](void* arg, grpc_error* /*error*/) {
493 auto* resolver = static_cast<XdsResolver*>(arg);
494 resolver->work_serializer_->Run(
495 [resolver]() {
496 resolver->MaybeRemoveUnusedClusters();
497 resolver->Unref();
498 },
499 DEBUG_LOCATION);
500 },
501 resolver, nullptr),
502 GRPC_ERROR_NONE);
503 };
504 return call_config;
505 }
506 return CallConfig();
507 }
508
509 //
510 // XdsResolver
511 //
512
StartLocked()513 void XdsResolver::StartLocked() {
514 grpc_error* error = GRPC_ERROR_NONE;
515 xds_client_ = XdsClient::GetOrCreate(&error);
516 if (error != GRPC_ERROR_NONE) {
517 gpr_log(GPR_ERROR,
518 "Failed to create xds client -- channel will remain in "
519 "TRANSIENT_FAILURE: %s",
520 grpc_error_string(error));
521 result_handler_->ReturnError(error);
522 return;
523 }
524 grpc_pollset_set_add_pollset_set(xds_client_->interested_parties(),
525 interested_parties_);
526 channelz::ChannelNode* parent_channelz_node =
527 grpc_channel_args_find_pointer<channelz::ChannelNode>(
528 args_, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
529 if (parent_channelz_node != nullptr) {
530 xds_client_->AddChannelzLinkage(parent_channelz_node);
531 }
532 auto watcher = absl::make_unique<ListenerWatcher>(Ref());
533 listener_watcher_ = watcher.get();
534 xds_client_->WatchListenerData(server_name_, std::move(watcher));
535 }
536
ShutdownLocked()537 void XdsResolver::ShutdownLocked() {
538 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
539 gpr_log(GPR_INFO, "[xds_resolver %p] shutting down", this);
540 }
541 if (xds_client_ != nullptr) {
542 if (listener_watcher_ != nullptr) {
543 xds_client_->CancelListenerDataWatch(server_name_, listener_watcher_,
544 /*delay_unsubscription=*/false);
545 }
546 if (route_config_watcher_ != nullptr) {
547 xds_client_->CancelRouteConfigDataWatch(
548 server_name_, route_config_watcher_, /*delay_unsubscription=*/false);
549 }
550 channelz::ChannelNode* parent_channelz_node =
551 grpc_channel_args_find_pointer<channelz::ChannelNode>(
552 args_, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
553 if (parent_channelz_node != nullptr) {
554 xds_client_->RemoveChannelzLinkage(parent_channelz_node);
555 }
556 grpc_pollset_set_del_pollset_set(xds_client_->interested_parties(),
557 interested_parties_);
558 xds_client_.reset();
559 }
560 }
561
OnListenerUpdate(XdsApi::LdsUpdate listener)562 void XdsResolver::OnListenerUpdate(XdsApi::LdsUpdate listener) {
563 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
564 gpr_log(GPR_INFO, "[xds_resolver %p] received updated listener data", this);
565 }
566 if (listener.route_config_name != route_config_name_) {
567 if (route_config_watcher_ != nullptr) {
568 xds_client_->CancelRouteConfigDataWatch(
569 route_config_name_, route_config_watcher_,
570 /*delay_unsubscription=*/!listener.route_config_name.empty());
571 route_config_watcher_ = nullptr;
572 }
573 route_config_name_ = std::move(listener.route_config_name);
574 if (!route_config_name_.empty()) {
575 auto watcher = absl::make_unique<RouteConfigWatcher>(Ref());
576 route_config_watcher_ = watcher.get();
577 xds_client_->WatchRouteConfigData(route_config_name_, std::move(watcher));
578 }
579 }
580 http_max_stream_duration_ = listener.http_max_stream_duration;
581 if (route_config_name_.empty()) {
582 GPR_ASSERT(listener.rds_update.has_value());
583 OnRouteConfigUpdate(std::move(*listener.rds_update));
584 }
585 }
586
OnRouteConfigUpdate(XdsApi::RdsUpdate rds_update)587 void XdsResolver::OnRouteConfigUpdate(XdsApi::RdsUpdate rds_update) {
588 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
589 gpr_log(GPR_INFO, "[xds_resolver %p] received updated route config", this);
590 }
591 // Find the relevant VirtualHost from the RouteConfiguration.
592 XdsApi::RdsUpdate::VirtualHost* vhost =
593 rds_update.FindVirtualHostForDomain(server_name_);
594 if (vhost == nullptr) {
595 OnError(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
596 absl::StrCat("could not find VirtualHost for ", server_name_,
597 " in RouteConfiguration")
598 .c_str()));
599 return;
600 }
601 // Save the list of routes in the resolver.
602 current_update_ = std::move(vhost->routes);
603 // Send a new result to the channel.
604 GenerateResult();
605 }
606
OnError(grpc_error * error)607 void XdsResolver::OnError(grpc_error* error) {
608 gpr_log(GPR_ERROR, "[xds_resolver %p] received error from XdsClient: %s",
609 this, grpc_error_string(error));
610 Result result;
611 result.args = grpc_channel_args_copy(args_);
612 result.service_config_error = error;
613 result_handler_->ReturnResult(std::move(result));
614 }
615
OnResourceDoesNotExist()616 void XdsResolver::OnResourceDoesNotExist() {
617 gpr_log(GPR_ERROR,
618 "[xds_resolver %p] LDS/RDS resource does not exist -- clearing "
619 "update and returning empty service config",
620 this);
621 current_update_.clear();
622 Result result;
623 result.service_config =
624 ServiceConfig::Create(args_, "{}", &result.service_config_error);
625 GPR_ASSERT(result.service_config != nullptr);
626 result.args = grpc_channel_args_copy(args_);
627 result_handler_->ReturnResult(std::move(result));
628 }
629
CreateServiceConfig(RefCountedPtr<ServiceConfig> * service_config)630 grpc_error* XdsResolver::CreateServiceConfig(
631 RefCountedPtr<ServiceConfig>* service_config) {
632 std::vector<std::string> clusters;
633 for (const auto& cluster : cluster_state_map_) {
634 clusters.push_back(
635 absl::StrFormat(" \"%s\":{\n"
636 " \"childPolicy\":[ {\n"
637 " \"cds_experimental\":{\n"
638 " \"cluster\": \"%s\"\n"
639 " }\n"
640 " } ]\n"
641 " }",
642 cluster.first, cluster.first));
643 }
644 std::vector<std::string> config_parts;
645 config_parts.push_back(
646 "{\n"
647 " \"loadBalancingConfig\":[\n"
648 " { \"xds_cluster_manager_experimental\":{\n"
649 " \"children\":{\n");
650 config_parts.push_back(absl::StrJoin(clusters, ",\n"));
651 config_parts.push_back(
652 " }\n"
653 " } }\n"
654 " ]\n"
655 "}");
656 std::string json = absl::StrJoin(config_parts, "");
657 grpc_error* error = GRPC_ERROR_NONE;
658 *service_config = ServiceConfig::Create(args_, json.c_str(), &error);
659 return error;
660 }
661
GenerateResult()662 void XdsResolver::GenerateResult() {
663 if (current_update_.empty()) return;
664 // First create XdsConfigSelector, which may add new entries to the cluster
665 // state map, and then CreateServiceConfig for LB policies.
666 grpc_error* error = GRPC_ERROR_NONE;
667 auto config_selector =
668 MakeRefCounted<XdsConfigSelector>(Ref(), current_update_, error);
669 if (error != GRPC_ERROR_NONE) {
670 OnError(error);
671 return;
672 }
673 Result result;
674 error = CreateServiceConfig(&result.service_config);
675 if (error != GRPC_ERROR_NONE) {
676 OnError(error);
677 return;
678 }
679 if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
680 gpr_log(GPR_INFO, "[xds_resolver %p] generated service config: %s", this,
681 result.service_config->json_string().c_str());
682 }
683 grpc_arg new_arg = config_selector->MakeChannelArg();
684 result.args = grpc_channel_args_copy_and_add(args_, &new_arg, 1);
685 result_handler_->ReturnResult(std::move(result));
686 }
687
MaybeRemoveUnusedClusters()688 void XdsResolver::MaybeRemoveUnusedClusters() {
689 bool update_needed = false;
690 for (auto it = cluster_state_map_.begin(); it != cluster_state_map_.end();) {
691 RefCountedPtr<ClusterState> cluster_state = it->second->RefIfNonZero();
692 if (cluster_state != nullptr) {
693 ++it;
694 } else {
695 update_needed = true;
696 it = cluster_state_map_.erase(it);
697 }
698 }
699 if (update_needed && xds_client_ != nullptr) {
700 // Send a new result to the channel.
701 GenerateResult();
702 }
703 }
704
705 //
706 // Factory
707 //
708
709 class XdsResolverFactory : public ResolverFactory {
710 public:
IsValidUri(const URI & uri) const711 bool IsValidUri(const URI& uri) const override {
712 if (GPR_UNLIKELY(!uri.authority().empty())) {
713 gpr_log(GPR_ERROR, "URI authority not supported");
714 return false;
715 }
716 return true;
717 }
718
CreateResolver(ResolverArgs args) const719 OrphanablePtr<Resolver> CreateResolver(ResolverArgs args) const override {
720 if (!IsValidUri(args.uri)) return nullptr;
721 return MakeOrphanable<XdsResolver>(std::move(args));
722 }
723
scheme() const724 const char* scheme() const override { return "xds"; }
725 };
726
727 } // namespace
728
729 } // namespace grpc_core
730
grpc_resolver_xds_init()731 void grpc_resolver_xds_init() {
732 grpc_core::ResolverRegistry::Builder::RegisterResolverFactory(
733 absl::make_unique<grpc_core::XdsResolverFactory>());
734 }
735
grpc_resolver_xds_shutdown()736 void grpc_resolver_xds_shutdown() {}
737