• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "absl/strings/match.h"
22 #include "absl/strings/str_join.h"
23 #include "absl/strings/str_split.h"
24 #include "re2/re2.h"
25 
26 #include "src/core/ext/filters/client_channel/config_selector.h"
27 #include "src/core/ext/filters/client_channel/resolver_registry.h"
28 #include "src/core/ext/xds/xds_client.h"
29 #include "src/core/lib/channel/channel_args.h"
30 #include "src/core/lib/iomgr/closure.h"
31 #include "src/core/lib/iomgr/exec_ctx.h"
32 #include "src/core/lib/transport/timeout_encoding.h"
33 
34 namespace grpc_core {
35 
36 TraceFlag grpc_xds_resolver_trace(false, "xds_resolver");
37 
38 const char* kXdsClusterAttribute = "xds_cluster_name";
39 
40 namespace {
41 
42 //
43 // XdsResolver
44 //
45 
46 class XdsResolver : public Resolver {
47  public:
XdsResolver(ResolverArgs args)48   explicit XdsResolver(ResolverArgs args)
49       : Resolver(std::move(args.work_serializer),
50                  std::move(args.result_handler)),
51         server_name_(absl::StripPrefix(args.uri.path(), "/")),
52         args_(grpc_channel_args_copy(args.args)),
53         interested_parties_(args.pollset_set) {
54     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
55       gpr_log(GPR_INFO, "[xds_resolver %p] created for server name %s", this,
56               server_name_.c_str());
57     }
58   }
59 
~XdsResolver()60   ~XdsResolver() override {
61     grpc_channel_args_destroy(args_);
62     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
63       gpr_log(GPR_INFO, "[xds_resolver %p] destroyed", this);
64     }
65   }
66 
67   void StartLocked() override;
68 
69   void ShutdownLocked() override;
70 
71  private:
72   class Notifier {
73    public:
74     Notifier(RefCountedPtr<XdsResolver> resolver, XdsApi::LdsUpdate update);
75     Notifier(RefCountedPtr<XdsResolver> resolver, XdsApi::RdsUpdate update);
76     Notifier(RefCountedPtr<XdsResolver> resolver, grpc_error* error);
77     explicit Notifier(RefCountedPtr<XdsResolver> resolver);
78 
79    private:
80     enum Type { kLdsUpdate, kRdsUpdate, kError, kDoesNotExist };
81 
82     static void RunInExecCtx(void* arg, grpc_error* error);
83     void RunInWorkSerializer(grpc_error* error);
84 
85     RefCountedPtr<XdsResolver> resolver_;
86     grpc_closure closure_;
87     XdsApi::LdsUpdate update_;
88     Type type_;
89   };
90 
91   class ListenerWatcher : public XdsClient::ListenerWatcherInterface {
92    public:
ListenerWatcher(RefCountedPtr<XdsResolver> resolver)93     explicit ListenerWatcher(RefCountedPtr<XdsResolver> resolver)
94         : resolver_(std::move(resolver)) {}
OnListenerChanged(XdsApi::LdsUpdate listener)95     void OnListenerChanged(XdsApi::LdsUpdate listener) override {
96       new Notifier(resolver_, std::move(listener));
97     }
OnError(grpc_error * error)98     void OnError(grpc_error* error) override { new Notifier(resolver_, error); }
OnResourceDoesNotExist()99     void OnResourceDoesNotExist() override { new Notifier(resolver_); }
100 
101    private:
102     RefCountedPtr<XdsResolver> resolver_;
103   };
104 
105   class RouteConfigWatcher : public XdsClient::RouteConfigWatcherInterface {
106    public:
RouteConfigWatcher(RefCountedPtr<XdsResolver> resolver)107     explicit RouteConfigWatcher(RefCountedPtr<XdsResolver> resolver)
108         : resolver_(std::move(resolver)) {}
OnRouteConfigChanged(XdsApi::RdsUpdate route_config)109     void OnRouteConfigChanged(XdsApi::RdsUpdate route_config) override {
110       new Notifier(resolver_, std::move(route_config));
111     }
OnError(grpc_error * error)112     void OnError(grpc_error* error) override { new Notifier(resolver_, error); }
OnResourceDoesNotExist()113     void OnResourceDoesNotExist() override { new Notifier(resolver_); }
114 
115    private:
116     RefCountedPtr<XdsResolver> resolver_;
117   };
118 
119   class ClusterState
120       : public RefCounted<ClusterState, PolymorphicRefCount, false> {
121    public:
122     using ClusterStateMap =
123         std::map<std::string, std::unique_ptr<ClusterState>>;
124 
ClusterState(const std::string & cluster_name,ClusterStateMap * cluster_state_map)125     ClusterState(const std::string& cluster_name,
126                  ClusterStateMap* cluster_state_map)
127         : it_(cluster_state_map
128                   ->emplace(cluster_name, std::unique_ptr<ClusterState>(this))
129                   .first) {}
cluster() const130     const std::string& cluster() const { return it_->first; }
131 
132    private:
133     ClusterStateMap::iterator it_;
134   };
135 
136   class XdsConfigSelector : public ConfigSelector {
137    public:
138     XdsConfigSelector(RefCountedPtr<XdsResolver> resolver,
139                       const std::vector<XdsApi::Route>& routes,
140                       grpc_error* error);
141     ~XdsConfigSelector() override;
142 
name() const143     const char* name() const override { return "XdsConfigSelector"; }
144 
Equals(const ConfigSelector * other) const145     bool Equals(const ConfigSelector* other) const override {
146       const auto* other_xds = static_cast<const XdsConfigSelector*>(other);
147       // Don't need to compare resolver_, since that will always be the same.
148       return route_table_ == other_xds->route_table_ &&
149              clusters_ == other_xds->clusters_;
150     }
151 
152     CallConfig GetCallConfig(GetCallConfigArgs args) override;
153 
154    private:
155     struct Route {
156       XdsApi::Route route;
157       absl::InlinedVector<std::pair<uint32_t, absl::string_view>, 2>
158           weighted_cluster_state;
159       RefCountedPtr<ServiceConfig> method_config;
operator ==grpc_core::__anonead745500111::XdsResolver::XdsConfigSelector::Route160       bool operator==(const Route& other) const {
161         return route == other.route &&
162                weighted_cluster_state == other.weighted_cluster_state;
163       }
164     };
165     using RouteTable = std::vector<Route>;
166 
167     void MaybeAddCluster(const std::string& name);
168     grpc_error* CreateMethodConfig(RefCountedPtr<ServiceConfig>* method_config,
169                                    const XdsApi::Route& route);
170 
171     RefCountedPtr<XdsResolver> resolver_;
172     RouteTable route_table_;
173     std::map<absl::string_view, RefCountedPtr<ClusterState>> clusters_;
174   };
175 
176   void OnListenerUpdate(XdsApi::LdsUpdate listener);
177   void OnRouteConfigUpdate(XdsApi::RdsUpdate rds_update);
178   void OnError(grpc_error* error);
179   void OnResourceDoesNotExist();
180 
181   grpc_error* CreateServiceConfig(RefCountedPtr<ServiceConfig>* service_config);
182   void GenerateResult();
183   void MaybeRemoveUnusedClusters();
184 
185   std::string server_name_;
186   const grpc_channel_args* args_;
187   grpc_pollset_set* interested_parties_;
188   RefCountedPtr<XdsClient> xds_client_;
189   XdsClient::ListenerWatcherInterface* listener_watcher_ = nullptr;
190   std::string route_config_name_;
191   XdsClient::RouteConfigWatcherInterface* route_config_watcher_ = nullptr;
192   ClusterState::ClusterStateMap cluster_state_map_;
193   std::vector<XdsApi::Route> current_update_;
194   XdsApi::Duration http_max_stream_duration_;
195 };
196 
197 //
198 // XdsResolver::Notifier
199 //
200 
Notifier(RefCountedPtr<XdsResolver> resolver,XdsApi::LdsUpdate update)201 XdsResolver::Notifier::Notifier(RefCountedPtr<XdsResolver> resolver,
202                                 XdsApi::LdsUpdate update)
203     : resolver_(std::move(resolver)),
204       update_(std::move(update)),
205       type_(kLdsUpdate) {
206   GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
207   ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
208 }
209 
Notifier(RefCountedPtr<XdsResolver> resolver,XdsApi::RdsUpdate update)210 XdsResolver::Notifier::Notifier(RefCountedPtr<XdsResolver> resolver,
211                                 XdsApi::RdsUpdate update)
212     : resolver_(std::move(resolver)), type_(kRdsUpdate) {
213   update_.rds_update = std::move(update);
214   GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
215   ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
216 }
217 
Notifier(RefCountedPtr<XdsResolver> resolver,grpc_error * error)218 XdsResolver::Notifier::Notifier(RefCountedPtr<XdsResolver> resolver,
219                                 grpc_error* error)
220     : resolver_(std::move(resolver)), type_(kError) {
221   GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
222   ExecCtx::Run(DEBUG_LOCATION, &closure_, error);
223 }
224 
Notifier(RefCountedPtr<XdsResolver> resolver)225 XdsResolver::Notifier::Notifier(RefCountedPtr<XdsResolver> resolver)
226     : resolver_(std::move(resolver)), type_(kDoesNotExist) {
227   GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr);
228   ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
229 }
230 
RunInExecCtx(void * arg,grpc_error * error)231 void XdsResolver::Notifier::RunInExecCtx(void* arg, grpc_error* error) {
232   Notifier* self = static_cast<Notifier*>(arg);
233   GRPC_ERROR_REF(error);
234   self->resolver_->work_serializer()->Run(
235       [self, error]() { self->RunInWorkSerializer(error); }, DEBUG_LOCATION);
236 }
237 
RunInWorkSerializer(grpc_error * error)238 void XdsResolver::Notifier::RunInWorkSerializer(grpc_error* error) {
239   if (resolver_->xds_client_ == nullptr) {
240     GRPC_ERROR_UNREF(error);
241     delete this;
242     return;
243   }
244   switch (type_) {
245     case kLdsUpdate:
246       resolver_->OnListenerUpdate(std::move(update_));
247       break;
248     case kRdsUpdate:
249       resolver_->OnRouteConfigUpdate(std::move(*update_.rds_update));
250       break;
251     case kError:
252       resolver_->OnError(error);
253       break;
254     case kDoesNotExist:
255       resolver_->OnResourceDoesNotExist();
256       break;
257   };
258   delete this;
259 }
260 
261 //
262 // XdsResolver::XdsConfigSelector
263 //
264 
XdsConfigSelector(RefCountedPtr<XdsResolver> resolver,const std::vector<XdsApi::Route> & routes,grpc_error * error)265 XdsResolver::XdsConfigSelector::XdsConfigSelector(
266     RefCountedPtr<XdsResolver> resolver,
267     const std::vector<XdsApi::Route>& routes, grpc_error* error)
268     : resolver_(std::move(resolver)) {
269   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
270     gpr_log(GPR_INFO, "[xds_resolver %p] creating XdsConfigSelector %p",
271             resolver_.get(), this);
272   }
273   // 1. Construct the route table
274   // 2  Update resolver's cluster state map
275   // 3. Construct cluster list to hold on to entries in the cluster state
276   // map.
277   // Reserve the necessary entries up-front to avoid reallocation as we add
278   // elements. This is necessary because the string_view in the entry's
279   // weighted_cluster_state field points to the memory in the route field, so
280   // moving the entry in a reallocation will cause the string_view to point to
281   // invalid data.
282   route_table_.reserve(routes.size());
283   for (auto& route : routes) {
284     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
285       gpr_log(GPR_INFO, "[xds_resolver %p] XdsConfigSelector %p: route: %s",
286               resolver_.get(), this, route.ToString().c_str());
287     }
288     route_table_.emplace_back();
289     auto& route_entry = route_table_.back();
290     route_entry.route = route;
291     // If the route doesn't specify a timeout, set its timeout to the global
292     // one.
293     if (!route.max_stream_duration.has_value()) {
294       route_entry.route.max_stream_duration =
295           resolver_->http_max_stream_duration_;
296     }
297     error = CreateMethodConfig(&route_entry.method_config, route_entry.route);
298     if (route.weighted_clusters.empty()) {
299       MaybeAddCluster(route.cluster_name);
300     } else {
301       uint32_t end = 0;
302       for (const auto& weighted_cluster : route_entry.route.weighted_clusters) {
303         MaybeAddCluster(weighted_cluster.name);
304         end += weighted_cluster.weight;
305         route_entry.weighted_cluster_state.emplace_back(end,
306                                                         weighted_cluster.name);
307       }
308     }
309   }
310 }
311 
CreateMethodConfig(RefCountedPtr<ServiceConfig> * method_config,const XdsApi::Route & route)312 grpc_error* XdsResolver::XdsConfigSelector::CreateMethodConfig(
313     RefCountedPtr<ServiceConfig>* method_config, const XdsApi::Route& route) {
314   grpc_error* error = GRPC_ERROR_NONE;
315   std::vector<std::string> fields;
316   if (route.max_stream_duration.has_value() &&
317       (route.max_stream_duration->seconds != 0 ||
318        route.max_stream_duration->nanos != 0)) {
319     fields.emplace_back(absl::StrFormat("    \"timeout\": \"%d.%09ds\"",
320                                         route.max_stream_duration->seconds,
321                                         route.max_stream_duration->nanos));
322   }
323   if (!fields.empty()) {
324     std::string json = absl::StrCat(
325         "{\n"
326         "  \"methodConfig\": [ {\n"
327         "    \"name\": [\n"
328         "      {}\n"
329         "    ],\n"
330         "    ",
331         absl::StrJoin(fields, ",\n"),
332         "\n  } ]\n"
333         "}");
334     *method_config =
335         ServiceConfig::Create(resolver_->args_, json.c_str(), &error);
336   }
337   return error;
338 }
339 
~XdsConfigSelector()340 XdsResolver::XdsConfigSelector::~XdsConfigSelector() {
341   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
342     gpr_log(GPR_INFO, "[xds_resolver %p] destroying XdsConfigSelector %p",
343             resolver_.get(), this);
344   }
345   clusters_.clear();
346   resolver_->MaybeRemoveUnusedClusters();
347 }
348 
MaybeAddCluster(const std::string & name)349 void XdsResolver::XdsConfigSelector::MaybeAddCluster(const std::string& name) {
350   if (clusters_.find(name) == clusters_.end()) {
351     auto it = resolver_->cluster_state_map_.find(name);
352     if (it == resolver_->cluster_state_map_.end()) {
353       auto new_cluster_state =
354           MakeRefCounted<ClusterState>(name, &resolver_->cluster_state_map_);
355       clusters_[new_cluster_state->cluster()] = std::move(new_cluster_state);
356     } else {
357       clusters_[it->second->cluster()] = it->second->Ref();
358     }
359   }
360 }
361 
PathMatch(const absl::string_view & path,const XdsApi::Route::Matchers::PathMatcher & path_matcher)362 bool PathMatch(const absl::string_view& path,
363                const XdsApi::Route::Matchers::PathMatcher& path_matcher) {
364   switch (path_matcher.type) {
365     case XdsApi::Route::Matchers::PathMatcher::PathMatcherType::PREFIX:
366       return path_matcher.case_sensitive
367                  ? absl::StartsWith(path, path_matcher.string_matcher)
368                  : absl::StartsWithIgnoreCase(path,
369                                               path_matcher.string_matcher);
370     case XdsApi::Route::Matchers::PathMatcher::PathMatcherType::PATH:
371       return path_matcher.case_sensitive
372                  ? path == path_matcher.string_matcher
373                  : absl::EqualsIgnoreCase(path, path_matcher.string_matcher);
374     case XdsApi::Route::Matchers::PathMatcher::PathMatcherType::REGEX:
375       // Note: Case-sensitive option will already have been set appropriately
376       // in path_matcher.regex_matcher when it was constructed, so no
377       // need to check it here.
378       return RE2::FullMatch(path.data(), *path_matcher.regex_matcher);
379     default:
380       return false;
381   }
382 }
383 
GetMetadataValue(const std::string & target_key,grpc_metadata_batch * initial_metadata,std::string * concatenated_value)384 absl::optional<absl::string_view> GetMetadataValue(
385     const std::string& target_key, grpc_metadata_batch* initial_metadata,
386     std::string* concatenated_value) {
387   // Find all values for the specified key.
388   GPR_DEBUG_ASSERT(initial_metadata != nullptr);
389   absl::InlinedVector<absl::string_view, 1> values;
390   for (grpc_linked_mdelem* md = initial_metadata->list.head; md != nullptr;
391        md = md->next) {
392     absl::string_view key = StringViewFromSlice(GRPC_MDKEY(md->md));
393     absl::string_view value = StringViewFromSlice(GRPC_MDVALUE(md->md));
394     if (target_key == key) values.push_back(value);
395   }
396   // If none found, no match.
397   if (values.empty()) return absl::nullopt;
398   // If exactly one found, return it as-is.
399   if (values.size() == 1) return values.front();
400   // If more than one found, concatenate the values, using
401   // *concatenated_values as a temporary holding place for the
402   // concatenated string.
403   *concatenated_value = absl::StrJoin(values, ",");
404   return *concatenated_value;
405 }
406 
HeaderMatchHelper(const XdsApi::Route::Matchers::HeaderMatcher & header_matcher,grpc_metadata_batch * initial_metadata)407 bool HeaderMatchHelper(
408     const XdsApi::Route::Matchers::HeaderMatcher& header_matcher,
409     grpc_metadata_batch* initial_metadata) {
410   std::string concatenated_value;
411   absl::optional<absl::string_view> value;
412   // Note: If we ever allow binary headers here, we still need to
413   // special-case ignore "grpc-tags-bin" and "grpc-trace-bin", since
414   // they are not visible to the LB policy in grpc-go.
415   if (absl::EndsWith(header_matcher.name, "-bin") ||
416       header_matcher.name == "grpc-previous-rpc-attempts") {
417     value = absl::nullopt;
418   } else if (header_matcher.name == "content-type") {
419     value = "application/grpc";
420   } else {
421     value = GetMetadataValue(header_matcher.name, initial_metadata,
422                              &concatenated_value);
423   }
424   if (!value.has_value()) {
425     if (header_matcher.type ==
426         XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::PRESENT) {
427       return !header_matcher.present_match;
428     } else {
429       // For all other header matcher types, we need the header value to
430       // exist to consider matches.
431       return false;
432     }
433   }
434   switch (header_matcher.type) {
435     case XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::EXACT:
436       return value.value() == header_matcher.string_matcher;
437     case XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::REGEX:
438       return RE2::FullMatch(value.value().data(), *header_matcher.regex_match);
439     case XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::RANGE:
440       int64_t int_value;
441       if (!absl::SimpleAtoi(value.value(), &int_value)) {
442         return false;
443       }
444       return int_value >= header_matcher.range_start &&
445              int_value < header_matcher.range_end;
446     case XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::PREFIX:
447       return absl::StartsWith(value.value(), header_matcher.string_matcher);
448     case XdsApi::Route::Matchers::HeaderMatcher::HeaderMatcherType::SUFFIX:
449       return absl::EndsWith(value.value(), header_matcher.string_matcher);
450     default:
451       return false;
452   }
453 }
454 
HeadersMatch(const std::vector<XdsApi::Route::Matchers::HeaderMatcher> & header_matchers,grpc_metadata_batch * initial_metadata)455 bool HeadersMatch(
456     const std::vector<XdsApi::Route::Matchers::HeaderMatcher>& header_matchers,
457     grpc_metadata_batch* initial_metadata) {
458   for (const auto& header_matcher : header_matchers) {
459     bool match = HeaderMatchHelper(header_matcher, initial_metadata);
460     if (header_matcher.invert_match) match = !match;
461     if (!match) return false;
462   }
463   return true;
464 }
465 
UnderFraction(const uint32_t fraction_per_million)466 bool UnderFraction(const uint32_t fraction_per_million) {
467   // Generate a random number in [0, 1000000).
468   const uint32_t random_number = rand() % 1000000;
469   return random_number < fraction_per_million;
470 }
471 
GetCallConfig(GetCallConfigArgs args)472 ConfigSelector::CallConfig XdsResolver::XdsConfigSelector::GetCallConfig(
473     GetCallConfigArgs args) {
474   for (const auto& entry : route_table_) {
475     // Path matching.
476     if (!PathMatch(StringViewFromSlice(*args.path),
477                    entry.route.matchers.path_matcher)) {
478       continue;
479     }
480     // Header Matching.
481     if (!HeadersMatch(entry.route.matchers.header_matchers,
482                       args.initial_metadata)) {
483       continue;
484     }
485     // Match fraction check
486     if (entry.route.matchers.fraction_per_million.has_value() &&
487         !UnderFraction(entry.route.matchers.fraction_per_million.value())) {
488       continue;
489     }
490     // Found a route match
491     absl::string_view cluster_name;
492     if (entry.route.weighted_clusters.empty()) {
493       cluster_name = entry.route.cluster_name;
494     } else {
495       const uint32_t key =
496           rand() %
497           entry.weighted_cluster_state[entry.weighted_cluster_state.size() - 1]
498               .first;
499       // Find the index in weighted clusters corresponding to key.
500       size_t mid = 0;
501       size_t start_index = 0;
502       size_t end_index = entry.weighted_cluster_state.size() - 1;
503       size_t index = 0;
504       while (end_index > start_index) {
505         mid = (start_index + end_index) / 2;
506         if (entry.weighted_cluster_state[mid].first > key) {
507           end_index = mid;
508         } else if (entry.weighted_cluster_state[mid].first < key) {
509           start_index = mid + 1;
510         } else {
511           index = mid + 1;
512           break;
513         }
514       }
515       if (index == 0) index = start_index;
516       GPR_ASSERT(entry.weighted_cluster_state[index].first > key);
517       cluster_name = entry.weighted_cluster_state[index].second;
518     }
519     auto it = clusters_.find(cluster_name);
520     GPR_ASSERT(it != clusters_.end());
521     XdsResolver* resolver =
522         static_cast<XdsResolver*>(resolver_->Ref().release());
523     ClusterState* cluster_state = it->second->Ref().release();
524     CallConfig call_config;
525     if (entry.method_config != nullptr) {
526       call_config.service_config = entry.method_config;
527       call_config.method_configs =
528           entry.method_config->GetMethodParsedConfigVector(grpc_empty_slice());
529     }
530     call_config.call_attributes[kXdsClusterAttribute] = it->first;
531     call_config.on_call_committed = [resolver, cluster_state]() {
532       cluster_state->Unref();
533       ExecCtx::Run(
534           // TODO(roth): This hop into the ExecCtx is being done to avoid
535           // entering the WorkSerializer while holding the client channel data
536           // plane mutex, since that can lead to deadlocks. However, we should
537           // not have to solve this problem in each individual ConfigSelector
538           // implementation. When we have time, we should fix the client channel
539           // code to avoid this by not invoking the
540           // CallConfig::on_call_committed callback until after it has released
541           // the data plane mutex.
542           DEBUG_LOCATION,
543           GRPC_CLOSURE_CREATE(
544               [](void* arg, grpc_error* /*error*/) {
545                 auto* resolver = static_cast<XdsResolver*>(arg);
546                 resolver->work_serializer()->Run(
547                     [resolver]() {
548                       resolver->MaybeRemoveUnusedClusters();
549                       resolver->Unref();
550                     },
551                     DEBUG_LOCATION);
552               },
553               resolver, nullptr),
554           GRPC_ERROR_NONE);
555     };
556     return call_config;
557   }
558   return CallConfig();
559 }
560 
561 //
562 // XdsResolver
563 //
564 
StartLocked()565 void XdsResolver::StartLocked() {
566   grpc_error* error = GRPC_ERROR_NONE;
567   xds_client_ = XdsClient::GetOrCreate(&error);
568   if (error != GRPC_ERROR_NONE) {
569     gpr_log(GPR_ERROR,
570             "Failed to create xds client -- channel will remain in "
571             "TRANSIENT_FAILURE: %s",
572             grpc_error_string(error));
573     result_handler()->ReturnError(error);
574     return;
575   }
576   grpc_pollset_set_add_pollset_set(xds_client_->interested_parties(),
577                                    interested_parties_);
578   channelz::ChannelNode* parent_channelz_node =
579       grpc_channel_args_find_pointer<channelz::ChannelNode>(
580           args_, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
581   if (parent_channelz_node != nullptr) {
582     xds_client_->AddChannelzLinkage(parent_channelz_node);
583   }
584   auto watcher = absl::make_unique<ListenerWatcher>(Ref());
585   listener_watcher_ = watcher.get();
586   xds_client_->WatchListenerData(server_name_, std::move(watcher));
587 }
588 
ShutdownLocked()589 void XdsResolver::ShutdownLocked() {
590   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
591     gpr_log(GPR_INFO, "[xds_resolver %p] shutting down", this);
592   }
593   if (xds_client_ != nullptr) {
594     if (listener_watcher_ != nullptr) {
595       xds_client_->CancelListenerDataWatch(server_name_, listener_watcher_,
596                                            /*delay_unsubscription=*/false);
597     }
598     if (route_config_watcher_ != nullptr) {
599       xds_client_->CancelRouteConfigDataWatch(
600           server_name_, route_config_watcher_, /*delay_unsubscription=*/false);
601     }
602     channelz::ChannelNode* parent_channelz_node =
603         grpc_channel_args_find_pointer<channelz::ChannelNode>(
604             args_, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
605     if (parent_channelz_node != nullptr) {
606       xds_client_->RemoveChannelzLinkage(parent_channelz_node);
607     }
608     grpc_pollset_set_del_pollset_set(xds_client_->interested_parties(),
609                                      interested_parties_);
610     xds_client_.reset();
611   }
612 }
613 
OnListenerUpdate(XdsApi::LdsUpdate listener)614 void XdsResolver::OnListenerUpdate(XdsApi::LdsUpdate listener) {
615   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
616     gpr_log(GPR_INFO, "[xds_resolver %p] received updated listener data", this);
617   }
618   if (listener.route_config_name != route_config_name_) {
619     if (route_config_watcher_ != nullptr) {
620       xds_client_->CancelRouteConfigDataWatch(
621           route_config_name_, route_config_watcher_,
622           /*delay_unsubscription=*/!listener.route_config_name.empty());
623       route_config_watcher_ = nullptr;
624     }
625     route_config_name_ = std::move(listener.route_config_name);
626     if (!route_config_name_.empty()) {
627       auto watcher = absl::make_unique<RouteConfigWatcher>(Ref());
628       route_config_watcher_ = watcher.get();
629       xds_client_->WatchRouteConfigData(route_config_name_, std::move(watcher));
630     }
631   }
632   http_max_stream_duration_ = listener.http_max_stream_duration;
633   if (route_config_name_.empty()) {
634     GPR_ASSERT(listener.rds_update.has_value());
635     OnRouteConfigUpdate(std::move(*listener.rds_update));
636   }
637 }
638 
OnRouteConfigUpdate(XdsApi::RdsUpdate rds_update)639 void XdsResolver::OnRouteConfigUpdate(XdsApi::RdsUpdate rds_update) {
640   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
641     gpr_log(GPR_INFO, "[xds_resolver %p] received updated route config", this);
642   }
643   // Find the relevant VirtualHost from the RouteConfiguration.
644   XdsApi::RdsUpdate::VirtualHost* vhost =
645       rds_update.FindVirtualHostForDomain(server_name_);
646   if (vhost == nullptr) {
647     OnError(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
648         absl::StrCat("could not find VirtualHost for ", server_name_,
649                      " in RouteConfiguration")
650             .c_str()));
651     return;
652   }
653   // Save the list of routes in the resolver.
654   current_update_ = std::move(vhost->routes);
655   // Send a new result to the channel.
656   GenerateResult();
657 }
658 
OnError(grpc_error * error)659 void XdsResolver::OnError(grpc_error* error) {
660   gpr_log(GPR_ERROR, "[xds_resolver %p] received error from XdsClient: %s",
661           this, grpc_error_string(error));
662   Result result;
663   result.args = grpc_channel_args_copy(args_);
664   result.service_config_error = error;
665   result_handler()->ReturnResult(std::move(result));
666 }
667 
OnResourceDoesNotExist()668 void XdsResolver::OnResourceDoesNotExist() {
669   gpr_log(GPR_ERROR,
670           "[xds_resolver %p] LDS/RDS resource does not exist -- clearing "
671           "update and returning empty service config",
672           this);
673   current_update_.clear();
674   Result result;
675   result.service_config =
676       ServiceConfig::Create(args_, "{}", &result.service_config_error);
677   GPR_ASSERT(result.service_config != nullptr);
678   result.args = grpc_channel_args_copy(args_);
679   result_handler()->ReturnResult(std::move(result));
680 }
681 
CreateServiceConfig(RefCountedPtr<ServiceConfig> * service_config)682 grpc_error* XdsResolver::CreateServiceConfig(
683     RefCountedPtr<ServiceConfig>* service_config) {
684   std::vector<std::string> clusters;
685   for (const auto& cluster : cluster_state_map_) {
686     clusters.push_back(
687         absl::StrFormat("      \"%s\":{\n"
688                         "        \"childPolicy\":[ {\n"
689                         "          \"cds_experimental\":{\n"
690                         "            \"cluster\": \"%s\"\n"
691                         "          }\n"
692                         "        } ]\n"
693                         "       }",
694                         cluster.first, cluster.first));
695   }
696   std::vector<std::string> config_parts;
697   config_parts.push_back(
698       "{\n"
699       "  \"loadBalancingConfig\":[\n"
700       "    { \"xds_cluster_manager_experimental\":{\n"
701       "      \"children\":{\n");
702   config_parts.push_back(absl::StrJoin(clusters, ",\n"));
703   config_parts.push_back(
704       "    }\n"
705       "    } }\n"
706       "  ]\n"
707       "}");
708   std::string json = absl::StrJoin(config_parts, "");
709   grpc_error* error = GRPC_ERROR_NONE;
710   *service_config = ServiceConfig::Create(args_, json.c_str(), &error);
711   return error;
712 }
713 
GenerateResult()714 void XdsResolver::GenerateResult() {
715   if (current_update_.empty()) return;
716   // First create XdsConfigSelector, which may add new entries to the cluster
717   // state map, and then CreateServiceConfig for LB policies.
718   grpc_error* error = GRPC_ERROR_NONE;
719   auto config_selector =
720       MakeRefCounted<XdsConfigSelector>(Ref(), current_update_, error);
721   if (error != GRPC_ERROR_NONE) {
722     OnError(error);
723     return;
724   }
725   Result result;
726   error = CreateServiceConfig(&result.service_config);
727   if (error != GRPC_ERROR_NONE) {
728     OnError(error);
729     return;
730   }
731   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) {
732     gpr_log(GPR_INFO, "[xds_resolver %p] generated service config: %s", this,
733             result.service_config->json_string().c_str());
734   }
735   grpc_arg new_arg = config_selector->MakeChannelArg();
736   result.args = grpc_channel_args_copy_and_add(args_, &new_arg, 1);
737   result_handler()->ReturnResult(std::move(result));
738 }
739 
MaybeRemoveUnusedClusters()740 void XdsResolver::MaybeRemoveUnusedClusters() {
741   bool update_needed = false;
742   for (auto it = cluster_state_map_.begin(); it != cluster_state_map_.end();) {
743     RefCountedPtr<ClusterState> cluster_state = it->second->RefIfNonZero();
744     if (cluster_state != nullptr) {
745       ++it;
746     } else {
747       update_needed = true;
748       it = cluster_state_map_.erase(it);
749     }
750   }
751   if (update_needed && xds_client_ != nullptr) {
752     // Send a new result to the channel.
753     GenerateResult();
754   }
755 }
756 
757 //
758 // Factory
759 //
760 
761 class XdsResolverFactory : public ResolverFactory {
762  public:
IsValidUri(const URI & uri) const763   bool IsValidUri(const URI& uri) const override {
764     if (GPR_UNLIKELY(!uri.authority().empty())) {
765       gpr_log(GPR_ERROR, "URI authority not supported");
766       return false;
767     }
768     return true;
769   }
770 
CreateResolver(ResolverArgs args) const771   OrphanablePtr<Resolver> CreateResolver(ResolverArgs args) const override {
772     if (!IsValidUri(args.uri)) return nullptr;
773     return MakeOrphanable<XdsResolver>(std::move(args));
774   }
775 
scheme() const776   const char* scheme() const override { return "xds"; }
777 };
778 
779 }  // namespace
780 
781 }  // namespace grpc_core
782 
grpc_resolver_xds_init()783 void grpc_resolver_xds_init() {
784   grpc_core::ResolverRegistry::Builder::RegisterResolverFactory(
785       absl::make_unique<grpc_core::XdsResolverFactory>());
786 }
787 
grpc_resolver_xds_shutdown()788 void grpc_resolver_xds_shutdown() {}
789