• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
#include <grpc/support/port_platform.h>

#include <algorithm>

#include "absl/strings/match.h"
#include "absl/strings/str_join.h"
#include "absl/strings/str_split.h"
#include "re2/re2.h"

#include "src/core/ext/filters/client_channel/config_selector.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/ext/xds/xds_client.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/transport/timeout_encoding.h"
33 
34 namespace grpc_core {
35 
// Trace flag checked before the "[xds_resolver %p] ..." informational log
// statements in this file. Constructed with (false, "xds_resolver").
TraceFlag grpc_xds_resolver_trace(false, "xds_resolver");

// Key under which XdsConfigSelector::GetCallConfig() records the chosen
// cluster name in CallConfig::call_attributes.
const char* kXdsClusterAttribute = "xds_cluster_name";
39 
40 namespace {
41 
42 //
43 // XdsResolver
44 //
45 
46 class XdsResolver : public Resolver {
47  public:
XdsResolver(ResolverArgs args)48   explicit XdsResolver(ResolverArgs args)
49       : work_serializer_(std::move(args.work_serializer)),
50         result_handler_(std::move(args.result_handler)),
51         server_name_(absl::StripPrefix(args.uri.path(), "/")),
52         args_(grpc_channel_args_copy(args.args)),
53         interested_parties_(args.pollset_set) {
54     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
55       gpr_log(GPR_INFO, "[xds_resolver %p] created for server name %s", this,
56               server_name_.c_str());
57     }
58   }
59 
~XdsResolver()60   ~XdsResolver() override {
61     grpc_channel_args_destroy(args_);
62     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
63       gpr_log(GPR_INFO, "[xds_resolver %p] destroyed", this);
64     }
65   }
66 
67   void StartLocked() override;
68 
69   void ShutdownLocked() override;
70 
71  private:
72   class Notifier {
73    public:
74     Notifier(RefCountedPtr<XdsResolver> resolver, XdsApi::LdsUpdate update);
75     Notifier(RefCountedPtr<XdsResolver> resolver, XdsApi::RdsUpdate update);
76     Notifier(RefCountedPtr<XdsResolver> resolver, grpc_error* error);
77     explicit Notifier(RefCountedPtr<XdsResolver> resolver);
78 
79    private:
80     enum Type { kLdsUpdate, kRdsUpdate, kError, kDoesNotExist };
81 
82     static void RunInExecCtx(void* arg, grpc_error* error);
83     void RunInWorkSerializer(grpc_error* error);
84 
85     RefCountedPtr<XdsResolver> resolver_;
86     grpc_closure closure_;
87     XdsApi::LdsUpdate update_;
88     Type type_;
89   };
90 
91   class ListenerWatcher : public XdsClient::ListenerWatcherInterface {
92    public:
ListenerWatcher(RefCountedPtr<XdsResolver> resolver)93     explicit ListenerWatcher(RefCountedPtr<XdsResolver> resolver)
94         : resolver_(std::move(resolver)) {}
OnListenerChanged(XdsApi::LdsUpdate listener)95     void OnListenerChanged(XdsApi::LdsUpdate listener) override {
96       new Notifier(resolver_, std::move(listener));
97     }
OnError(grpc_error * error)98     void OnError(grpc_error* error) override { new Notifier(resolver_, error); }
OnResourceDoesNotExist()99     void OnResourceDoesNotExist() override { new Notifier(resolver_); }
100 
101    private:
102     RefCountedPtr<XdsResolver> resolver_;
103   };
104 
105   class RouteConfigWatcher : public XdsClient::RouteConfigWatcherInterface {
106    public:
RouteConfigWatcher(RefCountedPtr<XdsResolver> resolver)107     explicit RouteConfigWatcher(RefCountedPtr<XdsResolver> resolver)
108         : resolver_(std::move(resolver)) {}
OnRouteConfigChanged(XdsApi::RdsUpdate route_config)109     void OnRouteConfigChanged(XdsApi::RdsUpdate route_config) override {
110       new Notifier(resolver_, std::move(route_config));
111     }
OnError(grpc_error * error)112     void OnError(grpc_error* error) override { new Notifier(resolver_, error); }
OnResourceDoesNotExist()113     void OnResourceDoesNotExist() override { new Notifier(resolver_); }
114 
115    private:
116     RefCountedPtr<XdsResolver> resolver_;
117   };
118 
119   class ClusterState
120       : public RefCounted<ClusterState, PolymorphicRefCount, false> {
121    public:
122     using ClusterStateMap =
123         std::map<std::string, std::unique_ptr<ClusterState>>;
124 
ClusterState(const std::string & cluster_name,ClusterStateMap * cluster_state_map)125     ClusterState(const std::string& cluster_name,
126                  ClusterStateMap* cluster_state_map)
127         : it_(cluster_state_map
128                   ->emplace(cluster_name, std::unique_ptr<ClusterState>(this))
129                   .first) {}
cluster() const130     const std::string& cluster() const { return it_->first; }
131 
132    private:
133     ClusterStateMap::iterator it_;
134   };
135 
136   class XdsConfigSelector : public ConfigSelector {
137    public:
138     XdsConfigSelector(RefCountedPtr<XdsResolver> resolver,
139                       const std::vector<XdsApi::Route>& routes,
140                       grpc_error* error);
141     ~XdsConfigSelector() override;
142 
name() const143     const char* name() const override { return "XdsConfigSelector"; }
144 
Equals(const ConfigSelector * other) const145     bool Equals(const ConfigSelector* other) const override {
146       const auto* other_xds = static_cast<const XdsConfigSelector*>(other);
147       // Don't need to compare resolver_, since that will always be the same.
148       return route_table_ == other_xds->route_table_ &&
149              clusters_ == other_xds->clusters_;
150     }
151 
152     CallConfig GetCallConfig(GetCallConfigArgs args) override;
153 
154    private:
155     struct Route {
156       XdsApi::Route route;
157       absl::InlinedVector<std::pair<uint32_t, absl::string_view>, 2>
158           weighted_cluster_state;
159       RefCountedPtr<ServiceConfig> method_config;
operator ==grpc_core::__anon0bd98fcd0111::XdsResolver::XdsConfigSelector::Route160       bool operator==(const Route& other) const {
161         return route == other.route &&
162                weighted_cluster_state == other.weighted_cluster_state;
163       }
164     };
165     using RouteTable = std::vector<Route>;
166 
167     void MaybeAddCluster(const std::string& name);
168     grpc_error* CreateMethodConfig(RefCountedPtr<ServiceConfig>* method_config,
169                                    const XdsApi::Route& route);
170 
171     RefCountedPtr<XdsResolver> resolver_;
172     RouteTable route_table_;
173     std::map<absl::string_view, RefCountedPtr<ClusterState>> clusters_;
174   };
175 
176   void OnListenerUpdate(XdsApi::LdsUpdate listener);
177   void OnRouteConfigUpdate(XdsApi::RdsUpdate rds_update);
178   void OnError(grpc_error* error);
179   void OnResourceDoesNotExist();
180 
181   grpc_error* CreateServiceConfig(RefCountedPtr<ServiceConfig>* service_config);
182   void GenerateResult();
183   void MaybeRemoveUnusedClusters();
184 
185   std::shared_ptr<WorkSerializer> work_serializer_;
186   std::unique_ptr<ResultHandler> result_handler_;
187   std::string server_name_;
188   const grpc_channel_args* args_;
189   grpc_pollset_set* interested_parties_;
190   RefCountedPtr<XdsClient> xds_client_;
191   XdsClient::ListenerWatcherInterface* listener_watcher_ = nullptr;
192   std::string route_config_name_;
193   XdsClient::RouteConfigWatcherInterface* route_config_watcher_ = nullptr;
194   ClusterState::ClusterStateMap cluster_state_map_;
195   std::vector<XdsApi::Route> current_update_;
196   XdsApi::Duration http_max_stream_duration_;
197 };
198 
199 //
200 // XdsResolver::Notifier
201 //
202 
Notifier(RefCountedPtr<XdsResolver> resolver,XdsApi::LdsUpdate update)203 XdsResolver::Notifier::Notifier(RefCountedPtr<XdsResolver> resolver,
204                                 XdsApi::LdsUpdate update)
205     : resolver_(std::move(resolver)),
206       update_(std::move(update)),
207       type_(kLdsUpdate) {
208   GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
209   ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
210 }
211 
Notifier(RefCountedPtr<XdsResolver> resolver,XdsApi::RdsUpdate update)212 XdsResolver::Notifier::Notifier(RefCountedPtr<XdsResolver> resolver,
213                                 XdsApi::RdsUpdate update)
214     : resolver_(std::move(resolver)), type_(kRdsUpdate) {
215   update_.rds_update = std::move(update);
216   GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
217   ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
218 }
219 
Notifier(RefCountedPtr<XdsResolver> resolver,grpc_error * error)220 XdsResolver::Notifier::Notifier(RefCountedPtr<XdsResolver> resolver,
221                                 grpc_error* error)
222     : resolver_(std::move(resolver)), type_(kError) {
223   GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
224   ExecCtx::Run(DEBUG_LOCATION, &closure_, error);
225 }
226 
Notifier(RefCountedPtr<XdsResolver> resolver)227 XdsResolver::Notifier::Notifier(RefCountedPtr<XdsResolver> resolver)
228     : resolver_(std::move(resolver)), type_(kDoesNotExist) {
229   GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
230   ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
231 }
232 
RunInExecCtx(void * arg,grpc_error * error)233 void XdsResolver::Notifier::RunInExecCtx(void* arg, grpc_error* error) {
234   Notifier* self = static_cast<Notifier*>(arg);
235   GRPC_ERROR_REF(error);
236   self->resolver_->work_serializer_->Run(
237       [self, error]() { self->RunInWorkSerializer(error); }, DEBUG_LOCATION);
238 }
239 
RunInWorkSerializer(grpc_error * error)240 void XdsResolver::Notifier::RunInWorkSerializer(grpc_error* error) {
241   if (resolver_->xds_client_ == nullptr) {
242     GRPC_ERROR_UNREF(error);
243     delete this;
244     return;
245   }
246   switch (type_) {
247     case kLdsUpdate:
248       resolver_->OnListenerUpdate(std::move(update_));
249       break;
250     case kRdsUpdate:
251       resolver_->OnRouteConfigUpdate(std::move(*update_.rds_update));
252       break;
253     case kError:
254       resolver_->OnError(error);
255       break;
256     case kDoesNotExist:
257       resolver_->OnResourceDoesNotExist();
258       break;
259   };
260   delete this;
261 }
262 
263 //
264 // XdsResolver::XdsConfigSelector
265 //
266 
XdsConfigSelector(RefCountedPtr<XdsResolver> resolver,const std::vector<XdsApi::Route> & routes,grpc_error * error)267 XdsResolver::XdsConfigSelector::XdsConfigSelector(
268     RefCountedPtr<XdsResolver> resolver,
269     const std::vector<XdsApi::Route>& routes, grpc_error* error)
270     : resolver_(std::move(resolver)) {
271   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
272     gpr_log(GPR_INFO, "[xds_resolver %p] creating XdsConfigSelector %p",
273             resolver_.get(), this);
274   }
275   // 1. Construct the route table
276   // 2  Update resolver's cluster state map
277   // 3. Construct cluster list to hold on to entries in the cluster state
278   // map.
279   // Reserve the necessary entries up-front to avoid reallocation as we add
280   // elements. This is necessary because the string_view in the entry's
281   // weighted_cluster_state field points to the memory in the route field, so
282   // moving the entry in a reallocation will cause the string_view to point to
283   // invalid data.
284   route_table_.reserve(routes.size());
285   for (auto& route : routes) {
286     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
287       gpr_log(GPR_INFO, "[xds_resolver %p] XdsConfigSelector %p: route: %s",
288               resolver_.get(), this, route.ToString().c_str());
289     }
290     route_table_.emplace_back();
291     auto& route_entry = route_table_.back();
292     route_entry.route = route;
293     // If the route doesn't specify a timeout, set its timeout to the global
294     // one.
295     if (!route.max_stream_duration.has_value()) {
296       route_entry.route.max_stream_duration =
297           resolver_->http_max_stream_duration_;
298     }
299     error = CreateMethodConfig(&route_entry.method_config, route_entry.route);
300     if (route.weighted_clusters.empty()) {
301       MaybeAddCluster(route.cluster_name);
302     } else {
303       uint32_t end = 0;
304       for (const auto& weighted_cluster : route_entry.route.weighted_clusters) {
305         MaybeAddCluster(weighted_cluster.name);
306         end += weighted_cluster.weight;
307         route_entry.weighted_cluster_state.emplace_back(end,
308                                                         weighted_cluster.name);
309       }
310     }
311   }
312 }
313 
CreateMethodConfig(RefCountedPtr<ServiceConfig> * method_config,const XdsApi::Route & route)314 grpc_error* XdsResolver::XdsConfigSelector::CreateMethodConfig(
315     RefCountedPtr<ServiceConfig>* method_config, const XdsApi::Route& route) {
316   grpc_error* error = GRPC_ERROR_NONE;
317   std::vector<std::string> fields;
318   if (route.max_stream_duration.has_value() &&
319       (route.max_stream_duration->seconds != 0 ||
320        route.max_stream_duration->nanos != 0)) {
321     fields.emplace_back(absl::StrFormat("    \"timeout\": \"%d.%09ds\"",
322                                         route.max_stream_duration->seconds,
323                                         route.max_stream_duration->nanos));
324   }
325   if (!fields.empty()) {
326     std::string json = absl::StrCat(
327         "{\n"
328         "  \"methodConfig\": [ {\n"
329         "    \"name\": [\n"
330         "      {}\n"
331         "    ],\n"
332         "    ",
333         absl::StrJoin(fields, ",\n"),
334         "\n  } ]\n"
335         "}");
336     *method_config =
337         ServiceConfig::Create(resolver_->args_, json.c_str(), &error);
338   }
339   return error;
340 }
341 
~XdsConfigSelector()342 XdsResolver::XdsConfigSelector::~XdsConfigSelector() {
343   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
344     gpr_log(GPR_INFO, "[xds_resolver %p] destroying XdsConfigSelector %p",
345             resolver_.get(), this);
346   }
347   clusters_.clear();
348   resolver_->MaybeRemoveUnusedClusters();
349 }
350 
MaybeAddCluster(const std::string & name)351 void XdsResolver::XdsConfigSelector::MaybeAddCluster(const std::string& name) {
352   if (clusters_.find(name) == clusters_.end()) {
353     auto it = resolver_->cluster_state_map_.find(name);
354     if (it == resolver_->cluster_state_map_.end()) {
355       auto new_cluster_state =
356           MakeRefCounted<ClusterState>(name, &resolver_->cluster_state_map_);
357       clusters_[new_cluster_state->cluster()] = std::move(new_cluster_state);
358     } else {
359       clusters_[it->second->cluster()] = it->second->Ref();
360     }
361   }
362 }
363 
GetMetadataValue(const std::string & target_key,grpc_metadata_batch * initial_metadata,std::string * concatenated_value)364 absl::optional<absl::string_view> GetMetadataValue(
365     const std::string& target_key, grpc_metadata_batch* initial_metadata,
366     std::string* concatenated_value) {
367   // Find all values for the specified key.
368   GPR_DEBUG_ASSERT(initial_metadata != nullptr);
369   absl::InlinedVector<absl::string_view, 1> values;
370   for (grpc_linked_mdelem* md = initial_metadata->list.head; md != nullptr;
371        md = md->next) {
372     absl::string_view key = StringViewFromSlice(GRPC_MDKEY(md->md));
373     absl::string_view value = StringViewFromSlice(GRPC_MDVALUE(md->md));
374     if (target_key == key) values.push_back(value);
375   }
376   // If none found, no match.
377   if (values.empty()) return absl::nullopt;
378   // If exactly one found, return it as-is.
379   if (values.size() == 1) return values.front();
380   // If more than one found, concatenate the values, using
381   // *concatenated_values as a temporary holding place for the
382   // concatenated string.
383   *concatenated_value = absl::StrJoin(values, ",");
384   return *concatenated_value;
385 }
386 
HeaderMatchHelper(const HeaderMatcher & header_matcher,grpc_metadata_batch * initial_metadata)387 bool HeaderMatchHelper(const HeaderMatcher& header_matcher,
388                        grpc_metadata_batch* initial_metadata) {
389   std::string concatenated_value;
390   absl::optional<absl::string_view> value;
391   // Note: If we ever allow binary headers here, we still need to
392   // special-case ignore "grpc-tags-bin" and "grpc-trace-bin", since
393   // they are not visible to the LB policy in grpc-go.
394   if (absl::EndsWith(header_matcher.name(), "-bin") ||
395       header_matcher.name() == "grpc-previous-rpc-attempts") {
396     value = absl::nullopt;
397   } else if (header_matcher.name() == "content-type") {
398     value = "application/grpc";
399   } else {
400     value = GetMetadataValue(header_matcher.name(), initial_metadata,
401                              &concatenated_value);
402   }
403   return header_matcher.Match(value);
404 }
405 
HeadersMatch(const std::vector<HeaderMatcher> & header_matchers,grpc_metadata_batch * initial_metadata)406 bool HeadersMatch(const std::vector<HeaderMatcher>& header_matchers,
407                   grpc_metadata_batch* initial_metadata) {
408   for (const auto& header_matcher : header_matchers) {
409     if (!HeaderMatchHelper(header_matcher, initial_metadata)) return false;
410   }
411   return true;
412 }
413 
// Draws rand() % 1000000, a value in [0, 1000000), and reports whether it is
// strictly below `fraction_per_million`. Consequently 0 never passes and
// 1000000 (or more) always passes.
// NOTE(review): on platforms where RAND_MAX is below 999999 the draw cannot
// cover the whole range -- confirm this is acceptable for supported targets.
bool UnderFraction(const uint32_t fraction_per_million) {
  return static_cast<uint32_t>(rand() % 1000000) < fraction_per_million;
}
419 
GetCallConfig(GetCallConfigArgs args)420 ConfigSelector::CallConfig XdsResolver::XdsConfigSelector::GetCallConfig(
421     GetCallConfigArgs args) {
422   for (const auto& entry : route_table_) {
423     // Path matching.
424     if (!entry.route.matchers.path_matcher.Match(
425             StringViewFromSlice(*args.path))) {
426       continue;
427     }
428     // Header Matching.
429     if (!HeadersMatch(entry.route.matchers.header_matchers,
430                       args.initial_metadata)) {
431       continue;
432     }
433     // Match fraction check
434     if (entry.route.matchers.fraction_per_million.has_value() &&
435         !UnderFraction(entry.route.matchers.fraction_per_million.value())) {
436       continue;
437     }
438     // Found a route match
439     absl::string_view cluster_name;
440     if (entry.route.weighted_clusters.empty()) {
441       cluster_name = entry.route.cluster_name;
442     } else {
443       const uint32_t key =
444           rand() %
445           entry.weighted_cluster_state[entry.weighted_cluster_state.size() - 1]
446               .first;
447       // Find the index in weighted clusters corresponding to key.
448       size_t mid = 0;
449       size_t start_index = 0;
450       size_t end_index = entry.weighted_cluster_state.size() - 1;
451       size_t index = 0;
452       while (end_index > start_index) {
453         mid = (start_index + end_index) / 2;
454         if (entry.weighted_cluster_state[mid].first > key) {
455           end_index = mid;
456         } else if (entry.weighted_cluster_state[mid].first < key) {
457           start_index = mid + 1;
458         } else {
459           index = mid + 1;
460           break;
461         }
462       }
463       if (index == 0) index = start_index;
464       GPR_ASSERT(entry.weighted_cluster_state[index].first > key);
465       cluster_name = entry.weighted_cluster_state[index].second;
466     }
467     auto it = clusters_.find(cluster_name);
468     GPR_ASSERT(it != clusters_.end());
469     XdsResolver* resolver =
470         static_cast<XdsResolver*>(resolver_->Ref().release());
471     ClusterState* cluster_state = it->second->Ref().release();
472     CallConfig call_config;
473     if (entry.method_config != nullptr) {
474       call_config.service_config = entry.method_config;
475       call_config.method_configs =
476           entry.method_config->GetMethodParsedConfigVector(grpc_empty_slice());
477     }
478     call_config.call_attributes[kXdsClusterAttribute] = it->first;
479     call_config.on_call_committed = [resolver, cluster_state]() {
480       cluster_state->Unref();
481       ExecCtx::Run(
482           // TODO(roth): This hop into the ExecCtx is being done to avoid
483           // entering the WorkSerializer while holding the client channel data
484           // plane mutex, since that can lead to deadlocks. However, we should
485           // not have to solve this problem in each individual ConfigSelector
486           // implementation. When we have time, we should fix the client channel
487           // code to avoid this by not invoking the
488           // CallConfig::on_call_committed callback until after it has released
489           // the data plane mutex.
490           DEBUG_LOCATION,
491           GRPC_CLOSURE_CREATE(
492               [](void* arg, grpc_error* /*error*/) {
493                 auto* resolver = static_cast<XdsResolver*>(arg);
494                 resolver->work_serializer_->Run(
495                     [resolver]() {
496                       resolver->MaybeRemoveUnusedClusters();
497                       resolver->Unref();
498                     },
499                     DEBUG_LOCATION);
500               },
501               resolver, nullptr),
502           GRPC_ERROR_NONE);
503     };
504     return call_config;
505   }
506   return CallConfig();
507 }
508 
509 //
510 // XdsResolver
511 //
512 
StartLocked()513 void XdsResolver::StartLocked() {
514   grpc_error* error = GRPC_ERROR_NONE;
515   xds_client_ = XdsClient::GetOrCreate(&error);
516   if (error != GRPC_ERROR_NONE) {
517     gpr_log(GPR_ERROR,
518             "Failed to create xds client -- channel will remain in "
519             "TRANSIENT_FAILURE: %s",
520             grpc_error_string(error));
521     result_handler_->ReturnError(error);
522     return;
523   }
524   grpc_pollset_set_add_pollset_set(xds_client_->interested_parties(),
525                                    interested_parties_);
526   channelz::ChannelNode* parent_channelz_node =
527       grpc_channel_args_find_pointer<channelz::ChannelNode>(
528           args_, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
529   if (parent_channelz_node != nullptr) {
530     xds_client_->AddChannelzLinkage(parent_channelz_node);
531   }
532   auto watcher = absl::make_unique<ListenerWatcher>(Ref());
533   listener_watcher_ = watcher.get();
534   xds_client_->WatchListenerData(server_name_, std::move(watcher));
535 }
536 
ShutdownLocked()537 void XdsResolver::ShutdownLocked() {
538   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
539     gpr_log(GPR_INFO, "[xds_resolver %p] shutting down", this);
540   }
541   if (xds_client_ != nullptr) {
542     if (listener_watcher_ != nullptr) {
543       xds_client_->CancelListenerDataWatch(server_name_, listener_watcher_,
544                                            /*delay_unsubscription=*/false);
545     }
546     if (route_config_watcher_ != nullptr) {
547       xds_client_->CancelRouteConfigDataWatch(
548           server_name_, route_config_watcher_, /*delay_unsubscription=*/false);
549     }
550     channelz::ChannelNode* parent_channelz_node =
551         grpc_channel_args_find_pointer<channelz::ChannelNode>(
552             args_, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
553     if (parent_channelz_node != nullptr) {
554       xds_client_->RemoveChannelzLinkage(parent_channelz_node);
555     }
556     grpc_pollset_set_del_pollset_set(xds_client_->interested_parties(),
557                                      interested_parties_);
558     xds_client_.reset();
559   }
560 }
561 
OnListenerUpdate(XdsApi::LdsUpdate listener)562 void XdsResolver::OnListenerUpdate(XdsApi::LdsUpdate listener) {
563   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
564     gpr_log(GPR_INFO, "[xds_resolver %p] received updated listener data", this);
565   }
566   if (listener.route_config_name != route_config_name_) {
567     if (route_config_watcher_ != nullptr) {
568       xds_client_->CancelRouteConfigDataWatch(
569           route_config_name_, route_config_watcher_,
570           /*delay_unsubscription=*/!listener.route_config_name.empty());
571       route_config_watcher_ = nullptr;
572     }
573     route_config_name_ = std::move(listener.route_config_name);
574     if (!route_config_name_.empty()) {
575       auto watcher = absl::make_unique<RouteConfigWatcher>(Ref());
576       route_config_watcher_ = watcher.get();
577       xds_client_->WatchRouteConfigData(route_config_name_, std::move(watcher));
578     }
579   }
580   http_max_stream_duration_ = listener.http_max_stream_duration;
581   if (route_config_name_.empty()) {
582     GPR_ASSERT(listener.rds_update.has_value());
583     OnRouteConfigUpdate(std::move(*listener.rds_update));
584   }
585 }
586 
OnRouteConfigUpdate(XdsApi::RdsUpdate rds_update)587 void XdsResolver::OnRouteConfigUpdate(XdsApi::RdsUpdate rds_update) {
588   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
589     gpr_log(GPR_INFO, "[xds_resolver %p] received updated route config", this);
590   }
591   // Find the relevant VirtualHost from the RouteConfiguration.
592   XdsApi::RdsUpdate::VirtualHost* vhost =
593       rds_update.FindVirtualHostForDomain(server_name_);
594   if (vhost == nullptr) {
595     OnError(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
596         absl::StrCat("could not find VirtualHost for ", server_name_,
597                      " in RouteConfiguration")
598             .c_str()));
599     return;
600   }
601   // Save the list of routes in the resolver.
602   current_update_ = std::move(vhost->routes);
603   // Send a new result to the channel.
604   GenerateResult();
605 }
606 
OnError(grpc_error * error)607 void XdsResolver::OnError(grpc_error* error) {
608   gpr_log(GPR_ERROR, "[xds_resolver %p] received error from XdsClient: %s",
609           this, grpc_error_string(error));
610   Result result;
611   result.args = grpc_channel_args_copy(args_);
612   result.service_config_error = error;
613   result_handler_->ReturnResult(std::move(result));
614 }
615 
OnResourceDoesNotExist()616 void XdsResolver::OnResourceDoesNotExist() {
617   gpr_log(GPR_ERROR,
618           "[xds_resolver %p] LDS/RDS resource does not exist -- clearing "
619           "update and returning empty service config",
620           this);
621   current_update_.clear();
622   Result result;
623   result.service_config =
624       ServiceConfig::Create(args_, "{}", &result.service_config_error);
625   GPR_ASSERT(result.service_config != nullptr);
626   result.args = grpc_channel_args_copy(args_);
627   result_handler_->ReturnResult(std::move(result));
628 }
629 
CreateServiceConfig(RefCountedPtr<ServiceConfig> * service_config)630 grpc_error* XdsResolver::CreateServiceConfig(
631     RefCountedPtr<ServiceConfig>* service_config) {
632   std::vector<std::string> clusters;
633   for (const auto& cluster : cluster_state_map_) {
634     clusters.push_back(
635         absl::StrFormat("      \"%s\":{\n"
636                         "        \"childPolicy\":[ {\n"
637                         "          \"cds_experimental\":{\n"
638                         "            \"cluster\": \"%s\"\n"
639                         "          }\n"
640                         "        } ]\n"
641                         "       }",
642                         cluster.first, cluster.first));
643   }
644   std::vector<std::string> config_parts;
645   config_parts.push_back(
646       "{\n"
647       "  \"loadBalancingConfig\":[\n"
648       "    { \"xds_cluster_manager_experimental\":{\n"
649       "      \"children\":{\n");
650   config_parts.push_back(absl::StrJoin(clusters, ",\n"));
651   config_parts.push_back(
652       "    }\n"
653       "    } }\n"
654       "  ]\n"
655       "}");
656   std::string json = absl::StrJoin(config_parts, "");
657   grpc_error* error = GRPC_ERROR_NONE;
658   *service_config = ServiceConfig::Create(args_, json.c_str(), &error);
659   return error;
660 }
661 
GenerateResult()662 void XdsResolver::GenerateResult() {
663   if (current_update_.empty()) return;
664   // First create XdsConfigSelector, which may add new entries to the cluster
665   // state map, and then CreateServiceConfig for LB policies.
666   grpc_error* error = GRPC_ERROR_NONE;
667   auto config_selector =
668       MakeRefCounted<XdsConfigSelector>(Ref(), current_update_, error);
669   if (error != GRPC_ERROR_NONE) {
670     OnError(error);
671     return;
672   }
673   Result result;
674   error = CreateServiceConfig(&result.service_config);
675   if (error != GRPC_ERROR_NONE) {
676     OnError(error);
677     return;
678   }
679   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
680     gpr_log(GPR_INFO, "[xds_resolver %p] generated service config: %s", this,
681             result.service_config->json_string().c_str());
682   }
683   grpc_arg new_arg = config_selector->MakeChannelArg();
684   result.args = grpc_channel_args_copy_and_add(args_, &new_arg, 1);
685   result_handler_->ReturnResult(std::move(result));
686 }
687 
MaybeRemoveUnusedClusters()688 void XdsResolver::MaybeRemoveUnusedClusters() {
689   bool update_needed = false;
690   for (auto it = cluster_state_map_.begin(); it != cluster_state_map_.end();) {
691     RefCountedPtr<ClusterState> cluster_state = it->second->RefIfNonZero();
692     if (cluster_state != nullptr) {
693       ++it;
694     } else {
695       update_needed = true;
696       it = cluster_state_map_.erase(it);
697     }
698   }
699   if (update_needed && xds_client_ != nullptr) {
700     // Send a new result to the channel.
701     GenerateResult();
702   }
703 }
704 
705 //
706 // Factory
707 //
708 
709 class XdsResolverFactory : public ResolverFactory {
710  public:
IsValidUri(const URI & uri) const711   bool IsValidUri(const URI& uri) const override {
712     if (GPR_UNLIKELY(!uri.authority().empty())) {
713       gpr_log(GPR_ERROR, "URI authority not supported");
714       return false;
715     }
716     return true;
717   }
718 
CreateResolver(ResolverArgs args) const719   OrphanablePtr<Resolver> CreateResolver(ResolverArgs args) const override {
720     if (!IsValidUri(args.uri)) return nullptr;
721     return MakeOrphanable<XdsResolver>(std::move(args));
722   }
723 
scheme() const724   const char* scheme() const override { return "xds"; }
725 };
726 
727 }  // namespace
728 
729 }  // namespace grpc_core
730 
grpc_resolver_xds_init()731 void grpc_resolver_xds_init() {
732   grpc_core::ResolverRegistry::Builder::RegisterResolverFactory(
733       absl::make_unique<grpc_core::XdsResolverFactory>());
734 }
735 
// Counterpart of grpc_resolver_xds_init(); there is nothing to tear down.
void grpc_resolver_xds_shutdown() {
}
737